Nested Pipelines Tutorial
Learn how to compose complex workflows from smaller, reusable pipelines.
1. Introduction
Nested pipelines allow you to:
Reuse common workflow patterns
Organize complex processing into logical groups
Build modular, maintainable pipelines
2. Basic Nested Pipeline
2.1 Creating Nested Pipelines
from wpipe import Pipeline
# Create inner pipeline: doubles "x", then squares the doubled value.
inner_pipeline = Pipeline(verbose=True)
inner_pipeline.set_steps([
    (lambda d: {"inner_value": d["x"] * 2}, "Multiply by 2", "v1.0"),
    (lambda d: {"squared": d["inner_value"] ** 2}, "Square", "v1.0"),
])

# Create outer pipeline: the inner pipeline's bound ``run`` method is
# registered like any other step callable.
outer_pipeline = Pipeline(verbose=True)
outer_pipeline.set_steps([
    (lambda d: {"x": 5}, "Initialize", "v1.0"),
    (inner_pipeline.run, "Run Inner Pipeline", "v1.0"),
    # .get() falls back to 0 when "squared" is not present in the data.
    (lambda d: {"final": d.get("squared", 0) + 10}, "Finalize", "v1.0"),
])

result = outer_pipeline.run({})
# Assuming each step's returned dict is merged into the running data:
# x = 5 -> 5 * 2 = 10 -> 10 ** 2 = 100 -> 100 + 10 = 110
print(result["final"])  # 110
3. Pipeline Factory Pattern
3.1 Creating Reusable Pipelines
def create_data_processing_pipeline(name: str, multiplier: float) -> "Pipeline":
    """Factory function to create processing pipelines.

    Args:
        name: Prefix used in the three step names
            ("<name> Input", "<name> Process", "<name> Output").
        multiplier: Factor applied to the incoming ``"value"`` entry;
            a missing ``"value"`` is treated as 0.

    Returns:
        A new ``Pipeline`` configured with the Input, Process and Output
        steps.
    """
    pipeline = Pipeline()
    pipeline.set_steps([
        # Input: scale the raw value (defaults to 0 when absent).
        (lambda d: {"input": d.get("value", 0) * multiplier},
         f"{name} Input", "v1.0"),
        # Process: add one to the scaled input.
        (lambda d: {"processed": d["input"] + 1},
         f"{name} Process", "v1.0"),
        # Output: double the processed value.
        (lambda d: {"output": d["processed"] * 2},
         f"{name} Output", "v1.0"),
    ])
    return pipeline
# Build two independently configured pipelines from the same factory.
pipeline_a = create_data_processing_pipeline(name="PipelineA", multiplier=2.0)
pipeline_b = create_data_processing_pipeline(name="PipelineB", multiplier=3.0)
4. Complete Example
from wpipe import Pipeline
# Define reusable sub-pipelines
def create_etl_pipeline():
    """Build a pipeline with Extract, Transform and Load steps."""
    etl = Pipeline()

    # Extract: produce a fixed sample list under "raw_data".
    extract = (lambda d: {"raw_data": [1, 2, 3, 4, 5]}, "Extract", "v1.0")
    # Transform: double every extracted item.
    transform = (
        lambda d: {"cleaned": [x * 2 for x in d["raw_data"]]},
        "Transform",
        "v1.0",
    )
    # Load: flag completion and record how many items were cleaned.
    load = (
        lambda d: {"loaded": True, "count": len(d["cleaned"])},
        "Load",
        "v1.0",
    )

    etl.set_steps([extract, transform, load])
    return etl
def create_validation_pipeline():
    """Build a two-step pipeline that sets the validation flags."""
    validation = Pipeline()
    # Both checks are placeholders that always report success.
    checks = [
        (lambda d: {"valid": True}, "Check Format", "v1.0"),
        (lambda d: {"validated": True}, "Check Rules", "v1.0"),
    ]
    validation.set_steps(checks)
    return validation
# Main pipeline: each sub-pipeline's bound ``run`` method is used as one step.
main_pipeline = Pipeline(verbose=True)
etl_stage = create_etl_pipeline()
validation_stage = create_validation_pipeline()
main_steps = [
    (lambda d: {"source": "database"}, "Initialize", "v1.0"),
    (etl_stage.run, "ETL Process", "v1.0"),
    (validation_stage.run, "Validation", "v1.0"),
    (lambda d: {"status": "completed"}, "Complete", "v1.0"),
]
main_pipeline.set_steps(main_steps)

result = main_pipeline.run({})
print(f"Result: {result}")
5. Best Practices
Keep each nested pipeline focused on a single responsibility
Use pipeline factories to create configured variants of a pipeline instead of duplicating step definitions
Test sub-pipelines independently
6. Next Steps
SQLite Integration Tutorial - Add persistence
Production Deployment Tutorial - Deploy to production