Mastering wpipe: Usage Guide
This guide provides deep-dive examples for every capability of the WPipe v2.4.0-LTS engine.
All examples assume you have performed a standard installation: pip install wpipe.
1. Fundamental Step Types
WPipe accepts any Python callable as a step: a plain function, an instance of a class that defines __call__, or a lambda.
Plain functions are the most common way to build pipelines: clean, testable, and simple.
def transform(data):
    # 'data' contains everything from previous steps
    return {"cleaned": data["raw"].strip()}
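The comment in the function above says that 'data' carries everything produced by earlier steps. A minimal sketch of that accumulation follows, assuming each step's returned dict is merged into one shared dict. The run_steps helper and the measure step are hypothetical and are not WPipe's implementation.

```python
def run_steps(steps, initial):
    """Hypothetical runner: merge each step's returned dict into the shared data."""
    data = dict(initial)
    for step in steps:
        result = step(data)
        if result:  # a step may return None or an empty dict
            data.update(result)
    return data


def transform(data):
    return {"cleaned": data["raw"].strip()}


def measure(data):
    # Sees both the original input and the output of transform
    return {"length": len(data["cleaned"])}


final = run_steps([transform, measure], {"raw": "  hello  "})
print(final)
```

Because each step only reads from and returns a dict, a step can be unit-tested by calling it with a hand-built dict.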
Callable classes are ideal for stateful operations or when you need initialization parameters.
class Multiplier:
    def __init__(self, factor):
        self.factor = factor
    def __call__(self, data):
        return {"result": data["value"] * self.factor}
pipe.set_steps([(Multiplier(10), "Scale", "v1.0")])
Lambdas are perfect for quick, one-line data mapping.
pipe.set_steps([
    (lambda d: {"id": d["id"].upper()}, "UpperID", "v1.0")
])
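Since every step kind is an ordinary callable, each one can be exercised directly, without building a pipeline. The sketch below reuses the Multiplier class and the lambda from this section. The sample dict and the loop are illustrative only, and the tuples simply mirror the (callable, name, version) shape passed to set_steps.

```python
class Multiplier:
    def __init__(self, factor):
        self.factor = factor

    def __call__(self, data):
        return {"result": data["value"] * self.factor}


upper_id = lambda d: {"id": d["id"].upper()}

# Each entry mirrors the (callable, name, version) tuples passed to set_steps
steps = [
    (Multiplier(10), "Scale", "v1.0"),
    (upper_id, "UpperID", "v1.0"),
]

sample = {"value": 4, "id": "ab-12"}
for func, name, version in steps:
    # Call each step on the same sample input, outside any pipeline
    print(name, version, func(sample))
```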
2. Advanced Control Flow
Beyond linear sequences, WPipe supports complex logical structures.
Conditional branching executes different paths based on runtime data.
from wpipe import Condition
logic = Condition(
    expression="status == 'CRITICAL'",
    branch_true=[notify_admin],
    branch_false=[log_info]
)
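To make the branching idea concrete, here is a minimal stand-alone sketch of choosing a branch by evaluating an expression string against the pipeline data. The choose_branch helper is hypothetical, and evaluating with eval is an assumption made for brevity. WPipe's Condition may evaluate its expression differently.

```python
def choose_branch(expression, data, branch_true, branch_false):
    """Hypothetical helper: pick a branch by evaluating the expression against data."""
    # Builtins are disabled so the expression only sees the pipeline data.
    # eval is used for brevity; do not run it on untrusted expressions.
    matched = bool(eval(expression, {"__builtins__": {}}, dict(data)))
    return branch_true if matched else branch_false


def notify_admin(data):
    return {"action": "notified"}


def log_info(data):
    return {"action": "logged"}


data = {"status": "CRITICAL"}
branch = choose_branch("status == 'CRITICAL'", data, [notify_admin], [log_info])
for step in branch:
    data.update(step(data))
print(data["action"])
```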
Loops iterate until a condition is met or for a fixed count.
from wpipe import For
loop = For(
    iterations=5,
    validation_expression="fuel > 0",
    steps=[drive_step]
)
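The sketch below shows one plausible reading of combining a fixed count with a validation expression: the loop stops after the given number of passes or as soon as the expression becomes false, whichever comes first. The run_loop helper and the body of drive_step are hypothetical, and the stopping rule is an assumption rather than documented WPipe behavior.

```python
def run_loop(iterations, validation_expression, steps, data):
    """Hypothetical loop: stop after `iterations` passes or when the expression is false."""
    data = dict(data)
    for _ in range(iterations):
        # Check the expression before each pass; eval is used for brevity only
        if not eval(validation_expression, {"__builtins__": {}}, data):
            break
        for step in steps:
            data.update(step(data))
    return data


def drive_step(data):
    return {"fuel": data["fuel"] - 1, "distance": data["distance"] + 10}


short_trip = run_loop(5, "fuel > 0", [drive_step], {"fuel": 3, "distance": 0})
long_trip = run_loop(5, "fuel > 0", [drive_step], {"fuel": 10, "distance": 0})
print(short_trip, long_trip)
```

With 3 units of fuel the expression ends the loop after three passes. With 10 units the iteration count ends it after five.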
3. Enterprise Resiliency
WPipe is built for production environments where things will fail.
Configure fine-grained retry strategies for unstable steps.
from wpipe import step  # assumption: exported at the top level, like Condition and For
@step(name="API_Call", retry_count=3, retry_delay=2)
def fetch_remote(data):
    # This will retry up to 3 times on any exception
    return call_api()
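For readers who want to see the retry semantics spelled out, here is a stand-alone sketch of a retry decorator built only on the standard library. The retry decorator and the flaky function are hypothetical. The sketch assumes that "retry up to 3 times" means one initial call plus at most three further attempts, which may not match how WPipe counts.

```python
import functools
import time


def retry(retry_count=3, retry_delay=0.0):
    """Hypothetical decorator: re-run the function after any exception."""
    def decorator(func):
        @functools.wraps(func)
        def wrapper(*args, **kwargs):
            for attempt in range(retry_count + 1):  # first call + retries
                try:
                    return func(*args, **kwargs)
                except Exception:
                    if attempt == retry_count:
                        raise  # retries exhausted: surface the last error
                    time.sleep(retry_delay)
        return wrapper
    return decorator


calls = {"count": 0}


@retry(retry_count=3, retry_delay=0)
def flaky(data):
    # Fails on the first two calls, then succeeds
    calls["count"] += 1
    if calls["count"] < 3:
        raise ConnectionError("temporary failure")
    return {"status": "ok"}


result = flaky({})
print(result, calls["count"])
```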
WPipe can save its state to a database and resume exactly where it left off.
# Checkpoint is triggered only if the expression matches
pipeline.add_checkpoint(
    checkpoint_name="data_loaded",
    expression="len(records) > 0"
)
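The idea of an expression-gated checkpoint can be illustrated with a small sqlite3 sketch. The save_checkpoint and load_checkpoint helpers, the table layout, and the JSON serialization are all assumptions made for this example. They do not describe WPipe's storage format.

```python
import json
import sqlite3


def save_checkpoint(conn, name, expression, data):
    """Hypothetical helper: persist data under `name` only if the expression holds."""
    allowed = {"__builtins__": {}, "len": len}  # expose only len to the expression
    if not eval(expression, allowed, dict(data)):
        return False
    conn.execute(
        "INSERT OR REPLACE INTO checkpoints (name, state) VALUES (?, ?)",
        (name, json.dumps(data)),
    )
    conn.commit()
    return True


def load_checkpoint(conn, name):
    """Return the saved state for `name`, or None if nothing was saved."""
    row = conn.execute(
        "SELECT state FROM checkpoints WHERE name = ?", (name,)
    ).fetchone()
    return json.loads(row[0]) if row else None


conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE checkpoints (name TEXT PRIMARY KEY, state TEXT)")

saved_empty = save_checkpoint(conn, "data_loaded", "len(records) > 0", {"records": []})
saved_full = save_checkpoint(conn, "data_loaded", "len(records) > 0", {"records": [1, 2]})
print(saved_empty, saved_full, load_checkpoint(conn, "data_loaded"))
```

A resumed run could call load_checkpoint first and skip the loading step whenever it returns a saved state.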
Register global error handlers that receive the full context and traceback.
def slack_notifier(context, error):
    send_to_slack(f"Step {error['step_name']} failed!")
pipeline.add_error_capture([slack_notifier])
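The handler signature above, (context, error), can be tried out with a small stand-alone runner. In this sketch the run_with_error_capture helper is hypothetical, and the "message" and "traceback" keys of the error dict are assumptions. Only "step_name" appears in the example above.

```python
import traceback


def run_with_error_capture(steps, data, handlers):
    """Hypothetical runner: on failure, call every handler, then re-raise."""
    context = dict(data)
    for func, name in steps:
        try:
            context.update(func(context) or {})
        except Exception as exc:
            error = {
                "step_name": name,
                "message": str(exc),
                "traceback": traceback.format_exc(),
            }
            for handler in handlers:
                handler(context, error)
            raise
    return context


captured = []


def record_failure(context, error):
    # Stand-in for a notifier such as slack_notifier
    captured.append((error["step_name"], error["message"], context["id"]))


def broken_step(data):
    raise ValueError("bad payload")


try:
    run_with_error_capture([(broken_step, "Parse")], {"id": 7}, [record_failure])
except ValueError:
    pass

print(captured)
```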
4. High Performance
Scale your pipelines horizontally and monitor them in real-time.
| Feature | Code Example |
|---|---|
| Parallelism | `from wpipe import Parallel`<br>`Parallel(steps=[t1, t2], use_processes=True)` |
| Async Engine | `from wpipe import PipelineAsync`<br>`result = await pipe.run(data)` |
| Monitoring | `from wpipe import ResourceMonitor`<br>`with ResourceMonitor("Audit") as m: pipe.run(data)` |
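As a rough illustration of what the Parallelism row implies, the sketch below runs two independent steps concurrently with the standard library and merges their outputs. The run_parallel helper and the bodies of t1 and t2 are hypothetical. A thread pool is used here for simplicity, whereas use_processes=True suggests a process-based pool in WPipe.

```python
from concurrent.futures import ThreadPoolExecutor


def run_parallel(steps, data, max_workers=None):
    """Hypothetical helper: run independent steps concurrently and merge outputs."""
    merged = dict(data)
    with ThreadPoolExecutor(max_workers=max_workers) as pool:
        # Each step receives its own shallow copy of the input
        futures = [pool.submit(step, dict(data)) for step in steps]
        for future in futures:  # collect in submission order for determinism
            merged.update(future.result())
    return merged


def t1(data):
    return {"total": sum(data["values"])}


def t2(data):
    return {"largest": max(data["values"])}


combined = run_parallel([t1, t2], {"values": [3, 1, 2]})
print(combined)
```

Steps that run in parallel should write to distinct keys. In this sketch a later step's output silently overwrites an earlier one on a key collision.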
5. Operational Integration
WPipe integrates seamlessly with your infrastructure.
Start the web-based dashboard to see your pipelines in a timeline or graph view.
from wpipe import start_dashboard
start_dashboard(db_path="tracking.db", port=5000)
Define your entire pipeline architecture in clean, version-controlled YAML files.
name: MyPipeline
steps:
  - name: Step1
    func: my_module.my_func
    version: v1.1
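A dotted path such as my_module.my_func has to be turned into a callable at load time. The sketch below shows one common way to do that with importlib. The resolve_callable helper is hypothetical, the config dict stands in for the already-parsed YAML, and json.dumps replaces my_module.my_func so the example runs without extra modules. How WPipe itself resolves func is not shown here.

```python
import importlib


def resolve_callable(dotted_path):
    """Import 'package.module.attr' and return the attribute."""
    module_name, _, attr = dotted_path.rpartition(".")
    if not module_name:
        raise ValueError(f"expected 'module.function', got {dotted_path!r}")
    module = importlib.import_module(module_name)
    return getattr(module, attr)


# Stand-in for the parsed YAML; a stdlib function replaces my_module.my_func
config = {
    "name": "MyPipeline",
    "steps": [
        {"name": "Step1", "func": "json.dumps", "version": "v1.1"},
    ],
}

# Build the same (callable, name, version) tuples used with set_steps
steps = [
    (resolve_callable(s["func"]), s["name"], s["version"])
    for s in config["steps"]
]
print(steps[0][0]({"a": 1}), steps[0][1], steps[0][2])
```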
Looking for more? See the 140-level guide La Senda del Maestro: 140 Niveles de Poder for specialized patterns.