Nested Pipelines
Compose complex workflows by nesting pipelines within pipelines.
Overview
Nested pipelines allow you to break complex workflows into smaller, reusable components.
Basic Usage
Create Sub-Pipelines:
from wpipe import Pipeline
pipeline_a = Pipeline()
pipeline_a.set_steps([
(step1, "Step A1", "v1.0"),
(step2, "Step A2", "v1.0"),
])
pipeline_b = Pipeline()
pipeline_b.set_steps([
(step3, "Step B1", "v1.0"),
(step4, "Step B2", "v1.0"),
])
Nest in Main Pipeline:
main_pipeline = Pipeline()
main_pipeline.set_steps([
(pipeline_a, "Pipeline A", "v1.0"),
(pipeline_b, "Pipeline B", "v1.0"),
])
result = main_pipeline.run({"initial": "data"})
Data Flow in Nested Pipelines
Data is accumulated through the entire nested structure:
Input: {'x': 1}
↓
Pipeline A → {'x': 1, 'a': True}
↓
Pipeline B → {'x': 1, 'a': True, 'b': True}
↓
Output: {'x': 1, 'a': True, 'b': True}
Use Cases
Data Processing ETL:
extract_pipeline = Pipeline()
extract_pipeline.set_steps([
(fetch_from_api, "Fetch API", "v1.0"),
(parse_json, "Parse JSON", "v1.0"),
])
transform_pipeline = Pipeline()
transform_pipeline.set_steps([
(clean_data, "Clean", "v1.0"),
(validate_schema, "Validate", "v1.0"),
(enrich_data, "Enrich", "v1.0"),
])
load_pipeline = Pipeline()
load_pipeline.set_steps([
(format_for_db, "Format", "v1.0"),
(insert_records, "Insert", "v1.0"),
])
etl_pipeline = Pipeline()
etl_pipeline.set_steps([
(extract_pipeline, "Extract", "v1.0"),
(transform_pipeline, "Transform", "v1.0"),
(load_pipeline, "Load", "v1.0"),
])
Complete Example
from wpipe import Pipeline
def fetch(data):
return {"users": [{"id": 1}, {"id": 2}]}
def enrich_user(data):
user = data["current_user"]
return {"enriched": {"id": user["id"], "name": f"User {user['id']}"}}
def save_user(data):
return {"saved": True}
user_pipeline = Pipeline()
user_pipeline.set_steps([
(enrich_user, "Enrich", "v1.0"),
(save_user, "Save", "v1.0"),
])
main_pipeline = Pipeline()
main_pipeline.set_steps([
(fetch, "Fetch", "v1.0"),
(user_pipeline, "Process Users", "v1.0"),
])
Best Practices
Keep sub-pipelines focused: Single responsibility
Name nested pipelines clearly: Easy to understand structure
Test sub-pipelines independently: Ensure reliability
Document data expectations: What each nested pipeline needs/produces
Next Steps
Learn about Error Handling for error recovery
Explore Conditional Branching for conditional execution