Architecture
Core Dataflow Model
Recoco implements an incremental dataflow engine where data flows through Flows:
Sources → Transforms → Targets
- Sources ingest data (files, database rows, cloud storage)
- Transforms (‘functions’) process data (split, embed, map, filter)
- Targets persist results (vector DBs, graph DBs, databases)
The engine tracks data lineage: when source data changes, it re-executes only the affected downstream computations.
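To make the incremental behaviour concrete, here is a minimal sketch in plain Rust using only the standard library. It is not Recoco's implementation: the `ToyEngine` type, its fields, and the fingerprinting scheme are hypothetical and exist only to illustrate the idea of re-running a transform for just the source rows whose content changed.

```rust
use std::collections::hash_map::DefaultHasher;
use std::collections::HashMap;
use std::hash::{Hash, Hasher};

/// Hypothetical toy engine (not a Recoco type). It remembers a fingerprint of
/// each source row so it can skip rows that are unchanged since the last run.
#[derive(Default)]
pub struct ToyEngine {
    /// Source key -> fingerprint of the content seen on the previous run.
    fingerprints: HashMap<String, u64>,
    /// Source key -> transformed value (stands in for a target).
    pub target: HashMap<String, String>,
}

/// Hashes the row content; any stable content hash would do for this sketch.
fn fingerprint(content: &str) -> u64 {
    let mut hasher = DefaultHasher::new();
    content.hash(&mut hasher);
    hasher.finish()
}

impl ToyEngine {
    /// Runs the toy flow over `source` and returns the keys that were actually
    /// re-transformed, in sorted order.
    pub fn run(&mut self, source: &[(&str, &str)]) -> Vec<String> {
        let mut recomputed = Vec::new();
        for (key, content) in source {
            let fp = fingerprint(content);
            if self.fingerprints.get(*key) == Some(&fp) {
                continue; // unchanged: keep the existing target entry
            }
            // New or changed row: re-run the toy transform and update the target.
            self.target.insert(key.to_string(), content.to_uppercase());
            self.fingerprints.insert(key.to_string(), fp);
            recomputed.push(key.to_string());
        }
        recomputed.sort();
        recomputed
    }
}
```

Calling `run` twice with the same rows re-transforms nothing on the second call; changing one row re-transforms only that row. The sketch deliberately ignores deleted rows and multi-step lineage, which a real engine has to handle.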
Two Flow Execution Modes
- Transient Flows - In-memory execution without persistence
  - Use FlowBuilder::build_transient_flow()
  - Evaluate with execution::evaluator::evaluate_transient_flow()
  - No database tracking, ideal for testing and single-run operations
- Persisted Flows - Tracked execution with incremental updates
  - Use FlowBuilder::build_flow() to create a persisted flow spec
  - Requires database setup for state tracking
  - Enables incremental processing when data changes
Module Organization
- base/ - Core data types (schema, value, spec, json_schema)
- builder/ - Flow construction API (FlowBuilder, analysis, planning)
- execution/ - Runtime engine (evaluator, memoization, indexing, tracking)
- ops/ - Operation implementations
  - sources/ - Data ingestion (local-file, postgres, s3, azure, gdrive)
  - functions/ - Transforms (split, embed, json, detect-lang, extract-llm)
  - targets/ - Data persistence (postgres, qdrant, neo4j, kuzu)
  - interface.rs - Trait definitions for all operation types
  - registry.rs - Operation registration and lookup
  - sdk.rs - Public API for custom operations
- lib_context.rs - Global library initialization and context management
- prelude.rs - Common imports (use recoco::prelude::*)
Flow Construction Pattern
All flows follow this pattern:
    // 1. Initialize library context (loads operation registry)
    recoco::lib_context::init_lib_context(Some(Settings::default())).await?;

    // 2. Create builder
    let mut builder = FlowBuilder::new("flow_name").await?;

    // 3. Define inputs
    let input = builder.add_direct_input(
        "input_name".to_string(),
        schema::make_output_type(schema::BasicValueType::Str),
    )?;

    // 4. Add transforms (chain operations)
    let output = builder.transform(
        "OperationName".to_string(),
        json!({ "param": "value" }).as_object().unwrap().clone(),
        vec![(input, Some("arg_name".to_string()))],
        None,
        "step_name".to_string(),
    ).await?;

    // 5. Set output
    builder.set_direct_output(output)?;

    // 6. Build and execute
    let flow = builder.build_transient_flow().await?;
    let result = evaluate_transient_flow(&flow.0, &vec![Value::Basic(...)]).await?;
Custom Operation Pattern
Creating a custom operation involves three pieces:
- Executor - Runtime logic (SimpleFunctionExecutor trait)
- Factory - Analysis and construction (SimpleFunctionFactoryBase trait)
- Registration - Add to registry before use
See examples/custom_op.rs for the full implementation pattern.
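The executor / factory / registration split can be illustrated with a small standalone sketch. Everything below is hypothetical: `ToyExecutor`, `ToyFactory`, and `ToyRegistry` are simplified, synchronous stand-ins written for this page, and they do not reproduce the signatures of Recoco's SimpleFunctionExecutor or SimpleFunctionFactoryBase traits. For the real traits, refer to examples/custom_op.rs.

```rust
use std::collections::HashMap;

/// Hypothetical stand-in for an executor: holds the runtime logic.
pub trait ToyExecutor {
    fn evaluate(&self, input: &str) -> String;
}

/// Hypothetical stand-in for a factory: checks the spec and builds an executor.
pub trait ToyFactory {
    fn name(&self) -> &'static str;
    fn build(&self, spec: &HashMap<String, String>) -> Result<Box<dyn ToyExecutor>, String>;
}

/// Example operation: repeats its input `times` times.
pub struct RepeatExecutor {
    times: usize,
}

impl ToyExecutor for RepeatExecutor {
    fn evaluate(&self, input: &str) -> String {
        input.repeat(self.times)
    }
}

pub struct RepeatFactory;

impl ToyFactory for RepeatFactory {
    fn name(&self) -> &'static str {
        "Repeat"
    }

    fn build(&self, spec: &HashMap<String, String>) -> Result<Box<dyn ToyExecutor>, String> {
        // "Analysis" step: validate parameters before any data is processed.
        let raw = spec
            .get("times")
            .ok_or_else(|| "missing `times` parameter".to_string())?;
        let times = raw
            .parse::<usize>()
            .map_err(|e| format!("invalid `times`: {e}"))?;
        Ok(Box::new(RepeatExecutor { times }))
    }
}

/// Hypothetical registry: operations must be registered before they can be built.
#[derive(Default)]
pub struct ToyRegistry {
    factories: HashMap<String, Box<dyn ToyFactory>>,
}

impl ToyRegistry {
    pub fn register(&mut self, factory: Box<dyn ToyFactory>) {
        self.factories.insert(factory.name().to_string(), factory);
    }

    pub fn build(
        &self,
        name: &str,
        spec: &HashMap<String, String>,
    ) -> Result<Box<dyn ToyExecutor>, String> {
        let factory = self
            .factories
            .get(name)
            .ok_or_else(|| format!("unknown operation: {name}"))?;
        factory.build(spec)
    }
}
```

The point of the split is that parameter validation happens once in the factory, while the executor stays a small object that only runs per-row logic. Looking up an unregistered name fails early, which mirrors the "add to registry before use" requirement.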
Feature System
Recoco feature-gates all operations at the dependency level:
- Sources: source-local-file, source-postgres, source-s3, source-azure, source-gdrive
- Targets: target-postgres, target-qdrant, target-neo4j, target-kuzu
- Functions: function-split, function-embed, function-extract-llm, function-detect-lang, function-json
When adding new code:
- Check Cargo.toml features to understand which dependencies are available
- Conditional compilation uses #[cfg(feature = "...")] attributes
- The full feature enables everything (use sparingly, it’s heavy)
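As a rough illustration of how feature-gated code looks in practice, the sketch below uses the `#[cfg(feature = "...")]` attribute and the `cfg!` macro with feature names from the lists above. The functions `split_available` and `compiled_sources` are hypothetical helpers invented for this example; they are not part of Recoco.

```rust
/// Compiled only when the `function-split` feature is enabled.
#[cfg(feature = "function-split")]
pub fn split_available() -> bool {
    true
}

/// Fallback compiled when the feature is disabled, so callers still build.
#[cfg(not(feature = "function-split"))]
pub fn split_available() -> bool {
    false
}

/// Lists which of two example sources were compiled into this build.
/// `cfg!` evaluates to a compile-time `true` or `false` for each feature.
pub fn compiled_sources() -> Vec<&'static str> {
    let all = [
        ("local-file", cfg!(feature = "source-local-file")),
        ("postgres", cfg!(feature = "source-postgres")),
    ];
    all.iter()
        .filter(|(_, enabled)| *enabled)
        .map(|(name, _)| *name)
        .collect()
}
```

Use the attribute form when the gated code references a dependency that may not be present at all; use `cfg!` only when both branches compile without the optional dependency.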
Library Context
Initialize the global library context (lib_context::init_lib_context()) before creating flows. It:
- Loads the operation registry with all compiled-in operations
- Sets up authentication registries
- Initializes runtime configuration
Only call this once per application (usually in main()).
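One common way to enforce "initialize once, read everywhere" in Rust is `std::sync::OnceLock`. The sketch below shows that general pattern under stated assumptions: `ToyContext`, `init_context`, and `context` are hypothetical names, and nothing here describes how Recoco stores its context internally or how `init_lib_context` behaves when called twice.

```rust
use std::sync::OnceLock;

/// Hypothetical stand-in for a global library context (not Recoco's type).
#[derive(Debug)]
pub struct ToyContext {
    pub app_name: String,
}

/// Process-wide slot that can be written at most once.
static CONTEXT: OnceLock<ToyContext> = OnceLock::new();

/// Initializes the global context; returns an error on any later call.
pub fn init_context(app_name: &str) -> Result<(), String> {
    CONTEXT
        .set(ToyContext {
            app_name: app_name.to_string(),
        })
        .map_err(|_| "context already initialized".to_string())
}

/// Returns the context, or an error if `init_context` has not run yet.
pub fn context() -> Result<&'static ToyContext, String> {
    CONTEXT
        .get()
        .ok_or_else(|| "context not initialized".to_string())
}
```

With this shape, a second `init_context` call is rejected instead of silently replacing the first configuration, and any code that runs before initialization gets an explicit error rather than a partially set up state.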