wpipe
Python Pipeline Library for Industrial-Grade Orchestration
Build powerful, production-ready data processing pipelines with native parallelism, intelligent recovery, and deep observability.
No web UI required | High-Performance | Resilient by Design
wpipe 2.4.0 LTS Documentation
wpipe is an industrial-grade Python library for orchestrating data processing pipelines, with native support for parallelism, intelligent recovery, and deep observability. It now includes a 140-level Learning Tour for mastering the library step by step.
📚 Documentation
- Getting Started with wpipe
- Installation Guide
- Mastering wpipe: Usage Guide
- User Guide
- WPipe Academy: Master Your Pipelines
- API Technical Specification
- Examples Gallery
  - 01 Basic Pipeline (15 Examples)
  - 02 API Pipeline (20 Examples)
  - 03 Error Handling (15 Examples)
  - 04 Conditional Branching (12 Examples)
  - 05 Retry Logic (12 Examples)
  - 06 SQLite Integration (14 Examples)
  - 07 Nested Pipelines (14 Examples)
  - 08 YAML Configuration (14 Examples)
  - 09 Microservice (11 Examples)
  - Running Examples
  - Example Directory Structure
- System Architecture: The WPipe Engine
- Best Practices for Industrial Pipelines
- FAQ
- Glossary
- Contributing
- Changelog
🎯 Why wpipe?
Workflow tools such as Apache Airflow, Prefect, and Dagster are excellent, but they often bring significant operational complexity. wpipe offers a lighter-weight alternative: a Python library that needs no web UI and focuses on performance and resilience.
⚡ Lightning Mode
Optimized SQLite architecture with WAL mode and non-blocking monitoring. Designed for high-frequency bursts.
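For readers unfamiliar with WAL mode, the following is a minimal sketch using only Python's standard `sqlite3` module. It is not wpipe's internal code; the table name `events`, the helper `open_wal_connection`, and the `synchronous=NORMAL` setting are assumptions chosen for illustration.

```python
import os
import sqlite3
import tempfile


def open_wal_connection(path):
    """Open a SQLite database and switch it to write-ahead logging."""
    conn = sqlite3.connect(path)
    # The PRAGMA returns the journal mode that is actually in effect.
    mode = conn.execute("PRAGMA journal_mode=WAL").fetchone()[0]
    # Assumption: trade a little durability for faster commits.
    conn.execute("PRAGMA synchronous=NORMAL")
    return conn, mode


# WAL needs a real file; in-memory databases cannot use it.
db_path = os.path.join(tempfile.mkdtemp(), "events.db")
conn, journal_mode = open_wal_connection(db_path)

# Hypothetical monitoring table, written in one batch.
conn.execute("CREATE TABLE IF NOT EXISTS events (step TEXT, status TEXT)")
conn.executemany(
    "INSERT INTO events VALUES (?, ?)",
    [("load", "ok"), ("transform", "ok")],
)
conn.commit()
row_count = conn.execute("SELECT COUNT(*) FROM events").fetchone()[0]
conn.close()
```

In WAL mode readers do not block the writer and the writer does not block readers, which is why it suits monitoring that runs alongside a busy pipeline.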
🧵 Native Parallelism
Execute steps with thread or process pools using a single command. Process pools sidestep the GIL for CPU-heavy tasks, while threads suit I/O-bound work.
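The difference between thread and process pools can be sketched with the standard library's `concurrent.futures`. This is an illustration of the general technique, not wpipe's API; `run_parallel` and `square` are hypothetical names.

```python
from concurrent.futures import ThreadPoolExecutor


def square(n):
    """Stand-in for a pipeline step."""
    return n * n


def run_parallel(func, items, executor_cls=ThreadPoolExecutor, max_workers=4):
    """Apply func to every item using the given executor class.

    Results come back in input order because Executor.map preserves it.
    """
    with executor_cls(max_workers=max_workers) as pool:
        return list(pool.map(func, items))


# Threads share one interpreter, so they help most with I/O-bound steps.
thread_results = run_parallel(square, [1, 2, 3, 4])
```

Passing `concurrent.futures.ProcessPoolExecutor` as `executor_cls` runs the same function in separate interpreter processes, each with its own GIL. On platforms that spawn worker processes, that call belongs under an `if __name__ == "__main__":` guard.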
🛡️ Intelligent Checkpoints
Define milestones using logical expressions. Auto-resume exactly where you left off after system failures.
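The resume-after-failure idea can be sketched in plain Python as follows. This is not wpipe's checkpoint API: the JSON file format, the `run_with_checkpoint` helper, and the step functions are all assumptions made for illustration, and the sketch checkpoints after every step rather than at milestones defined by logical expressions.

```python
import json
import os
import tempfile


def run_with_checkpoint(steps, data, checkpoint_path):
    """Run steps in order, saving progress after each one.

    If a checkpoint file exists, execution resumes at the saved step
    with the saved data instead of starting over.
    """
    start = 0
    if os.path.exists(checkpoint_path):
        with open(checkpoint_path) as fh:
            saved = json.load(fh)
        start, data = saved["next_step"], saved["data"]
    for index in range(start, len(steps)):
        data = steps[index](data)
        with open(checkpoint_path, "w") as fh:
            json.dump({"next_step": index + 1, "data": data}, fh)
    return data


calls = []
attempts = {"n": 0}


def load(data):
    calls.append("load")
    return {**data, "rows": 3}


def flaky(data):
    """Fails on its first call to simulate a crash."""
    calls.append("flaky")
    attempts["n"] += 1
    if attempts["n"] == 1:
        raise RuntimeError("simulated crash")
    return {**data, "total": data["rows"] * 2}


path = os.path.join(tempfile.mkdtemp(), "checkpoint.json")
try:
    run_with_checkpoint([load, flaky], {}, path)
except RuntimeError:
    pass  # The first run dies after "load" has been checkpointed.

# The second run skips "load" and retries only the failed step.
final = run_with_checkpoint([load, flaky], {}, path)
```

Because the checkpoint stores both the next step index and the data, the second run never repeats `load`.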
🎓 140-Level Tour
Master the library with a guided tour of 140+ examples, covering everything from basic steps to complex DAGs.
🧬 Data Contracts
Strict schema validation for your data context using PipelineContext and TypeValidator.
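As a rough illustration of what a data contract checks, here is a minimal sketch in plain Python. It does not use or reproduce `PipelineContext` or `TypeValidator`, whose signatures are not shown on this page; `validate_context` and the dict-based schema are hypothetical.

```python
def validate_context(data, schema):
    """Return a list of contract violations (empty when data is valid).

    schema maps each required key to the type its value must have.
    """
    errors = []
    for key, expected in schema.items():
        if key not in data:
            errors.append(f"missing key: {key}")
        elif not isinstance(data[key], expected):
            actual = type(data[key]).__name__
            errors.append(f"{key}: expected {expected.__name__}, got {actual}")
    return errors


valid_errors = validate_context({"x": 10}, {"x": int})
invalid_errors = validate_context({"x": "10"}, {"x": int, "y": float})
```

Collecting every violation instead of raising on the first one lets a pipeline report the whole set of contract problems at once.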
🔄 Async/Sync Parity
Choose between Pipeline or PipelineAsync with 100% feature parity and coroutine support.
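One way a runner can accept both regular functions and coroutines is to await a step's result only when it is awaitable. The sketch below uses only `asyncio` and `inspect` from the standard library; it is not `PipelineAsync` itself, and `run_steps_async`, `double`, and `add_one` are hypothetical names. Merging each step's result into the running data is also an assumption.

```python
import asyncio
import inspect


async def run_steps_async(steps, data):
    """Run sync and async steps in order, merging each result into data."""
    for step_fn in steps:
        result = step_fn(data)
        # Coroutine functions return an awaitable; plain functions do not.
        if inspect.isawaitable(result):
            result = await result
        data = {**data, **result}
    return data


def double(data):
    """Synchronous step."""
    return {"result": data["x"] * 2}


async def add_one(data):
    """Asynchronous step; the sleep stands in for real I/O."""
    await asyncio.sleep(0)
    return {"result": data["result"] + 1}


final = asyncio.run(run_steps_async([double, add_one], {"x": 10}))
```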
🚀 Quick Start
Get up and running in under 2 minutes:
Installation
# Install wpipe from PyPI
pip install wpipe
Your First Pipeline
from wpipe import Pipeline, step


@step(name="Process", retry_count=3)
def my_step(data):
    return {"result": data.get("x", 0) * 2}


pipeline = Pipeline(verbose=True)
pipeline.set_steps([my_step])
result = pipeline.run({"x": 10})
# Output: {'x': 10, 'result': 20}