| Crates.io | recoco-core |
| lib.rs | recoco-core |
| version | 0.2.1 |
| created_at | 2026-01-25 19:29:51.974887+00 |
| updated_at | 2026-01-25 21:00:29.39563+00 |
| description | Recoco-core is the core library of Recoco; it's nearly identical to the main ReCoco crate, which is a simple wrapper around recoco-core and other sub-crates. |
| homepage | |
| repository | https://github.com/knitli/recoco |
| max_upload_size | |
| id | 2069393 |
| size | 1,253,271 |
ReCoco is a pure Rust fork of the excellent CocoIndex, a high-performance, incremental ETL and data processing framework.
I decided to create a Rust-only fork of CocoIndex for a couple of reasons:
CocoIndex is not a Rust library. It's written in Rust, but it doesn't expose a Rust API: its packaging, documentation, and examples focus entirely on Python, and the API it does expose through its Rust extensions is more limited. It isn't even released on crates.io.
CocoIndex is heavy. It carries several very heavy dependencies, most of which you probably don't need: Google/AWS/Azure components, Qdrant/Postgres/Neo4j clients, and more.
For Knitli, I particularly needed dependency control. I wanted to use CocoIndex as an ETL engine for Thread, but Thread needs to be edge-deployable. The dependencies were way too heavy and would never compile to WASM. Thread, of course, is also a Rust project, so pulling in a lot of Python dependencies didn't make sense for me.
> [!NOTE]
> Knitli and ReCoco have no official relationship with CocoIndex, and this project is not endorsed by them. We will contribute as much as we can upstream; our contribution guidelines encourage you to submit PRs and issues affecting shared code upstream to help both projects.
ReCoco fully exposes a Rust API. You can use ReCoco to support your Rust ETL projects directly. Build on it.
Every target, source, and function is independently feature-gated. Use only what you want.
The minimum install now uses about 200 fewer crates (820 -> 620)
We will regularly merge in upstream fixes and changes, particularly sources, targets, and functions.
Faster hashing (blake2 -> blake3) to make ReCoco as fast as possible. A workspace split into `recoco`, `recoco-utils`, and `recoco-splitters` crates. Optional persistence (behind the `persistence` feature flag).

ReCoco, like CocoIndex, enables scalable data pipelines with intelligent incremental processing for many use cases.
Add recoco to your Cargo.toml. Since ReCoco uses a modular feature system, you should enable only the features you need.
```toml
[dependencies]
recoco = { version = "0.2", default-features = false, features = ["source-local-file", "function-split"] }
```
| Feature | Description | Dependencies |
|---|---|---|
| `source-local-file` | Read files from the local filesystem | ✅ Default - lightweight |
| `source-postgres` | Read from PostgreSQL (Change Data Capture) | 📦 PostgreSQL driver |
| `source-s3` | Read from Amazon S3 | 📦 AWS SDK |
| `source-azure` | Read from Azure Blob Storage | 📦 Azure SDK |
| `source-gdrive` | Read from Google Drive | 📦 Google APIs |
| Feature | Description | Dependencies |
|---|---|---|
| `target-postgres` | Write to PostgreSQL | 📦 PostgreSQL driver |
| `target-qdrant` | Write to Qdrant vector DB | 📦 Qdrant client |
| `target-neo4j` | Write to Neo4j graph DB | 📦 Neo4j driver |
| `target-kuzu` | Write to Kùzu embedded graph DB | 📦 Kùzu bindings |
| Feature | Description | Dependencies |
|---|---|---|
| `function-split` | Text splitting utilities (recursive, semantic) | ✅ Lightweight |
| `function-embed` | Text embedding (OpenAI, Vertex AI, Voyage) | 📦 LLM APIs |
| `function-extract-llm` | Use an LLM to extract data | 📦 LLM APIs |
| `function-detect-lang` | Programming language detection | ✅ Lightweight |
| `function-json` | JSON/JSON5 parsing and manipulation | ✅ Lightweight |
An LLM provider feature is required for `function-embed` and `function-extract-llm`.
| Feature | Description | Dependencies |
|---|---|---|
| `provider-anthropic` | Anthropic (Claude) | 📦 reqwest |
| `provider-azure` | Azure OpenAI | 📦 async-openai |
| `provider-bedrock` | AWS Bedrock | 📦 reqwest |
| `provider-gemini` | Google Gemini | 📦 google-cloud-aiplatform |
| `provider-litellm` | LiteLLM: many agents/models | 📦 async-openai |
| `provider-ollama` | Ollama (local LLMs) | 📦 reqwest |
| `provider-openai` | OpenAI (GPT-5, etc.) | 📦 async-openai |
| `provider-openrouter` | OpenRouter: many agents/models | 📦 async-openai |
| `provider-voyage` | Voyage AI | 📦 reqwest |
| `provider-vllm` | vLLM: many agents | 📦 async-openai |
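To use the LLM-backed functions, pair the function feature with at least one provider feature. A sketch of what that `Cargo.toml` entry might look like (feature names are from the tables above; pin whatever version you actually use):

```toml
[dependencies]
recoco = { version = "0.2", default-features = false, features = [
    "function-embed",   # embedding function
    "provider-openai",  # backing LLM provider
] }
```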
When using function-split, you can enable specific Tree-sitter grammars to reduce binary size.
| Feature | Description |
|---|---|
| `splitter-language-rust` | Rust grammar |
| `splitter-language-python` | Python grammar |
| `splitter-language-javascript` | JavaScript grammar |
| `splitter-language-markdown` | Markdown grammar |
| ... and many more | See Cargo.toml for the full list (c, cpp, go, java, etc.) |
| Feature | Description | Dependencies |
|---|---|---|
| `persistence` | SQLx-based state tracking & DB metadata | 📦 sqlx |
| `server` | Axum-based HTTP server components | 📦 axum, tower |
| `json-schema` | JSON Schema generation support | 📦 schemars |
| Feature | Description |
|---|---|
| `all-sources` | Enable all source connectors |
| `all-targets` | Enable all target connectors |
| `all-functions` | Enable all transform functions |
| `all-llm-providers` | Enable all LLM providers |
| `all-splitter-languages` | Enable all Tree-sitter grammars |
| `full` | Enable everything (⚠️ heavy dependencies) |
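Bundles compose with individual features, so you can start broad and trim later. For example (a hedged sketch; adjust the version to your pin):

```toml
[dependencies]
recoco = { version = "0.2", default-features = false, features = [
    "all-functions",      # every transform function
    "source-local-file",  # but only the local-file source
] }
```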
Here's a simple example that processes a string using a transient flow (in-memory, no persistence):
```rust
use recoco::prelude::*;
use recoco::builder::FlowBuilder;
use recoco::execution::evaluator::evaluate_transient_flow;
use serde_json::json;

#[tokio::main]
async fn main() -> anyhow::Result<()> {
    // 1. Initialize library context (loads operation registry)
    recoco::lib_context::init_lib_context(Some(recoco::settings::Settings::default())).await?;

    // 2. Create a flow builder
    let mut builder = FlowBuilder::new("hello_world").await?;

    // 3. Define input schema
    let input = builder.add_direct_input(
        "text".to_string(),
        schema::make_output_type(schema::BasicValueType::Str),
    )?;

    // 4. Add a text splitting transformation
    let output = builder.transform(
        "SplitBySeparators".to_string(),
        json!({ "separators_regex": [" "] }).as_object().unwrap().clone(),
        vec![(input, Some("text".to_string()))],
        None,
        "splitter".to_string(),
    ).await?;

    // 5. Set the output of the flow
    builder.set_direct_output(output)?;

    // 6. Build and execute the flow
    let flow = builder.build_transient_flow().await?;
    let result = evaluate_transient_flow(
        &flow.0,
        &vec![value::Value::Basic("Hello ReCoco".into())],
    ).await?;

    println!("Result: {:?}", result);
    Ok(())
}
```
You can define custom operations by implementing the SimpleFunctionExecutor and SimpleFunctionFactoryBase traits:
```rust
use recoco::ops::sdk::*;

struct UpperCaseExecutor;

#[async_trait::async_trait]
impl SimpleFunctionExecutor for UpperCaseExecutor {
    async fn execute(&self, inputs: Vec<Value>) -> Result<Vec<Value>, ExecutionError> {
        let input_str = inputs[0].as_basic_str()
            .ok_or_else(|| ExecutionError::InvalidInput("Expected string".into()))?;
        Ok(vec![Value::Basic(input_str.to_uppercase().into())])
    }
}

// Register your custom operation.
// See examples/custom_op.rs for a complete implementation.
```
All flows follow this consistent pattern:
```rust
// 1. Initialize library context
recoco::lib_context::init_lib_context(None).await?;

// 2. Create builder
let mut builder = FlowBuilder::new("my_flow").await?;

// 3. Define inputs
let input = builder.add_direct_input(/* ... */)?;

// 4. Chain transformations
let step1 = builder.transform(/* ... */).await?;
let step2 = builder.transform(/* ... */).await?;

// 5. Set outputs
builder.set_direct_output(step2)?;

// 6. Build and execute
let flow = builder.build_transient_flow().await?;
let result = evaluate_transient_flow(&flow.0, &inputs).await?;
```
ReCoco is configured via the Settings struct passed to init_lib_context.
| Variable | Description | Default |
|---|---|---|
| `RUST_LOG` | Controls logging verbosity (e.g., `info`, `debug`, `recoco=trace`) | `info` |
The recoco::settings::Settings struct controls global behavior:
```rust
use recoco::settings::{Settings, DatabaseConnectionSpec, GlobalExecutionOptions};

let settings = Settings {
    // Database configuration for persisted flows
    database: Some(DatabaseConnectionSpec {
        url: "postgres://user:pass@localhost:5432/recoco_db".to_string(),
        user: Some("user".to_string()),
        password: Some("pass".to_string()),
        max_connections: 10,
        min_connections: 1,
    }),

    // Concurrency controls
    global_execution_options: GlobalExecutionOptions {
        source_max_inflight_rows: Some(1000),
        source_max_inflight_bytes: Some(10 * 1024 * 1024), // 10 MB
    },

    // Other options
    app_namespace: "my_app".to_string(),
    ignore_target_drop_failures: false,
};

recoco::lib_context::init_lib_context(Some(settings)).await?;
```
Check out the examples/ directory for more usage patterns:
- `transient.rs`: Basic Hello World
- `file_processing.rs`: Line-by-line file processing
- `custom_op.rs`: Defining and registering custom Rust operations
- `detect_lang.rs`: Using built-in functions

Run examples with the required features:
```sh
# Basic transient flow
cargo run -p recoco --example transient --features function-split

# File processing
cargo run -p recoco --example file_processing --features function-split

# Custom operations
cargo run -p recoco --example custom_op

# Language detection
cargo run -p recoco --example detect_lang --features function-detect-lang
```
```sh
# Build with default features (source-local-file only)
cargo build

# Build with specific features
cargo build --features "function-split,source-postgres"

# Build with all features (includes all heavy dependencies)
cargo build --features full

# Build specific feature bundles
cargo build --features all-sources    # All source connectors
cargo build --features all-targets    # All target connectors
cargo build --features all-functions  # All transform functions
```
```sh
# Run all tests with default features
cargo test

# Run tests with specific features
cargo test --features "function-split,source-postgres"

# Run tests with all features
cargo test --features full

# Run a specific test with output
cargo test test_name -- --nocapture
```
```sh
# Check code formatting
cargo fmt --all -- --check

# Format code
cargo fmt

# Run clippy with all features
cargo clippy --all-features -- -D warnings

# Run clippy for specific workspace member
cargo clippy -p recoco --all-features
```
ReCoco implements an incremental dataflow engine where data flows through Flows:
```
Sources → Transforms → Targets
```
The engine tracks data lineage - when source data changes, only affected downstream computations are re-executed. This makes ReCoco highly efficient for processing large datasets that change incrementally.
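The lineage-tracking idea can be illustrated with a toy memoization cache. This is a standalone sketch, not ReCoco's actual engine or API: results are keyed by a fingerprint of their input, so re-running a pipeline only recomputes transforms whose inputs actually changed.

```rust
use std::collections::HashMap;

/// Toy illustration of change-driven recomputation: each transform result is
/// memoized under a fingerprint of its input, so re-running the pipeline only
/// recomputes steps whose inputs actually changed.
struct MemoCache {
    cache: HashMap<u64, String>,
    recomputations: usize,
}

/// FNV-1a hash of the input bytes (a real engine would use a strong
/// content hash such as blake3).
fn fingerprint(input: &str) -> u64 {
    let mut h: u64 = 0xcbf29ce484222325;
    for b in input.bytes() {
        h ^= b as u64;
        h = h.wrapping_mul(0x100000001b3);
    }
    h
}

impl MemoCache {
    fn new() -> Self {
        MemoCache { cache: HashMap::new(), recomputations: 0 }
    }

    /// Return the cached result for `input`, or run `transform` and cache it.
    fn eval(&mut self, input: &str, transform: fn(&str) -> String) -> String {
        let key = fingerprint(input);
        if let Some(out) = self.cache.get(&key) {
            return out.clone(); // input unchanged: skip recomputation
        }
        self.recomputations += 1;
        let out = transform(input);
        self.cache.insert(key, out.clone());
        out
    }
}

fn main() {
    let mut memo = MemoCache::new();
    memo.eval("hello", |s| s.to_uppercase()); // computed
    memo.eval("hello", |s| s.to_uppercase()); // served from cache
    memo.eval("world", |s| s.to_uppercase()); // new input: computed
    println!("recomputations = {}", memo.recomputations); // prints 2
}
```

ReCoco applies the same principle at the level of whole flows, additionally persisting the tracking state (the `persistence` feature) so that incremental behavior survives across runs.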
**Transient Flows** - In-memory execution without persistence

- Built via `FlowBuilder::build_transient_flow()`
- Executed via `execution::evaluator::evaluate_transient_flow()`

**Persisted Flows** - Tracked execution with incremental updates

- Use `FlowBuilder::build_flow()` to create a persisted flow spec

```
recoco/
├── base/            - Core data types (schema, value, spec, json_schema)
├── builder/         - Flow construction API (FlowBuilder, analysis, planning)
├── execution/       - Runtime engine (evaluator, memoization, indexing, tracking)
├── ops/             - Operation implementations
│   ├── sources/     - Data ingestion (local-file, postgres, s3, azure, gdrive)
│   ├── functions/   - Transforms (split, embed, json, detect-lang, extract-llm)
│   ├── targets/     - Data persistence (postgres, qdrant, neo4j, kuzu)
│   ├── interface.rs - Trait definitions for all operation types
│   ├── registry.rs  - Operation registration and lookup
│   └── sdk.rs       - Public API for custom operations
├── lib_context.rs   - Global library initialization and context management
└── prelude.rs       - Common imports (use recoco::prelude::*)
```
Contributions are welcome! Here's how to get started:
1. Create a feature branch: `git checkout -b feature/my-new-feature`
2. Run the tests: `cargo test --features full`
3. Format your code: `cargo fmt --all`
4. Lint: `cargo clippy --all-features -- -D warnings`

Use conventional commit prefixes:

- `feat:` for new features
- `fix:` for bug fixes
- `docs:` for documentation
- `chore:` for maintenance tasks
- `refactor:` for code restructuring

Please see CONTRIBUTING.md for detailed guidelines.
More examples live in `crates/recoco/examples/`.

ReCoco is a fork of CocoIndex:
| Aspect | CocoIndex (Upstream) | ReCoco (Fork) |
|---|---|---|
| Primary Language | Python with Rust core | Pure Rust |
| API Surface | Python-only | Full Rust API |
| Distribution | Not on crates.io | Published to crates.io |
| Dependencies | All bundled together | Feature-gated and modular |
| Target Audience | Python developers | Rust developers |
| License | Apache-2.0 | Apache-2.0 |
We aim to maintain compatibility with CocoIndex's core dataflow engine to allow porting upstream improvements, while diverging significantly in the API surface and dependency management to better serve Rust users.
Code headers maintain dual copyright (CocoIndex upstream, Knitli Inc. for ReCoco modifications) under Apache-2.0.
Apache License 2.0; see NOTICE for full license text.
This project is REUSE 3.3 compliant.
ReCoco is built on the excellent foundation provided by CocoIndex. We're grateful to the CocoIndex team for creating such a powerful and well-designed dataflow engine.
Built with 🦀 by Knitli Inc.
Documentation • Crates.io • GitHub • Issues
In-memory operations aren't quite as feature-rich; you lose incremental indexing, for example. At least for now. ↩