Mastering wpipe: Usage Guide

This guide walks through examples for every capability of the WPipe v2.4.0-LTS engine, grouped by complexity level.

All examples assume a standard installation: pip install wpipe.

1. Fundamental Step Types

A step can be any Python callable: a plain function, a callable class, or a lambda.

Plain functions are the most common way to build pipelines. They are clean, testable, and simple.

def transform(data):
    # 'data' contains everything from previous steps
    return {"cleaned": data["raw"].strip()}
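To make the data flow concrete, here is a minimal sketch in plain Python of how dict-returning steps could be chained. It assumes each step's returned dict is merged into the accumulated data before the next step runs; `run_steps` and `add_length` are hypothetical helpers for illustration, not part of the WPipe API.

```python
def transform(data):
    # Same step as above: strip whitespace from the raw input.
    return {"cleaned": data["raw"].strip()}


def add_length(data):
    # Hypothetical second step: reads a key produced by the previous step.
    return {"length": len(data["cleaned"])}


def run_steps(steps, data):
    # Assumption: each step's output is merged into the accumulated data.
    state = dict(data)
    for step in steps:
        state.update(step(state))
    return state


result = run_steps([transform, add_length], {"raw": "  hello  "})
print(result)
```

Because every step only reads and returns dictionaries, each one can be unit-tested by calling it directly with a hand-built input.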

Callable classes are ideal for stateful operations or when a step needs initialization parameters.

class Multiplier:
    def __init__(self, factor):
        self.factor = factor

    def __call__(self, data):
        return {"result": data["value"] * self.factor}

pipe.set_steps([(Multiplier(10), "Scale", "v1.0")])
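Since a callable class instance is an ordinary Python object, it can be exercised without a pipeline. The sketch below repeats the class so that it runs on its own; the input value is made up for illustration.

```python
class Multiplier:
    def __init__(self, factor):
        self.factor = factor  # configuration captured once at construction

    def __call__(self, data):
        return {"result": data["value"] * self.factor}


scale = Multiplier(10)
out = scale({"value": 4})  # the instance is called like a function
print(out)
```

The same instance can be reused across runs, which is what makes this form convenient for steps that carry configuration.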

Lambdas are handy for quick, one-line data mappings.

pipe.set_steps([
    (lambda d: {"id": d["id"].upper()}, "UpperID", "v1.0")
])

2. Advanced Control Flow

Beyond linear sequences, WPipe supports conditional branches and loops.

Execute different paths based on runtime data.

🌳 Conditional Branching
from wpipe import Condition

logic = Condition(
    expression="status == 'CRITICAL'",
    branch_true=[notify_admin],
    branch_false=[log_info]
)
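The branching idea can be sketched in plain Python as follows. This is not WPipe's implementation: it assumes the expression is evaluated with the current data keys as variables, and `evaluate` and `run_condition` are hypothetical helpers. The two branch steps are stand-ins that only record which path ran.

```python
def evaluate(expression, data):
    # Evaluate the expression with the data keys as variables.
    # Builtins are removed to limit what the expression can reach;
    # this is not a sandbox, so only use trusted expressions.
    return bool(eval(expression, {"__builtins__": {}}, dict(data)))


def notify_admin(data):
    return {"action": "notify_admin"}


def log_info(data):
    return {"action": "log_info"}


def run_condition(expression, branch_true, branch_false, data):
    # Pick one branch and run its steps in order, merging each output.
    state = dict(data)
    branch = branch_true if evaluate(expression, state) else branch_false
    for step in branch:
        state.update(step(state))
    return state


expr = "status == 'CRITICAL'"
critical = run_condition(expr, [notify_admin], [log_info], {"status": "CRITICAL"})
normal = run_condition(expr, [notify_admin], [log_info], {"status": "OK"})
print(critical["action"], normal["action"])
```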

Iterate until a condition is met or for a fixed count.

🔁 Intelligent Loops
from wpipe import For

loop = For(
    iterations=5,
    validation_expression="fuel > 0",
    steps=[drive_step]
)
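One plausible reading of these two parameters is sketched below in plain Python: the loop runs at most `iterations` times and stops early once the validation expression becomes false. That ordering is an assumption, `run_for` is a hypothetical helper, and the fuel arithmetic in `drive_step` is invented for the example.

```python
def drive_step(data):
    # Hypothetical step: each pass burns 2 units of fuel and covers 10 units.
    return {"fuel": data["fuel"] - 2, "distance": data.get("distance", 0) + 10}


def run_for(iterations, validation_expression, steps, data):
    state = dict(data)
    for _ in range(iterations):
        # Assumption: the expression is checked before every pass.
        if not eval(validation_expression, {"__builtins__": {}}, dict(state)):
            break
        for step in steps:
            state.update(step(state))
    return state


result = run_for(5, "fuel > 0", [drive_step], {"fuel": 6})
print(result)
```

With 6 units of fuel the loop stops after three passes, even though five iterations were allowed.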

3. Enterprise Resiliency

WPipe is built for production environments, where failures are expected.

Configure fine-grained retry strategies for unstable steps.

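from wpipe import step  # assumed import path; the original snippet omits it

@step(name="API_Call", retry_count=3, retry_delay=2)
def fetch_remote(data):
    # Retried up to 3 times, waiting retry_delay between attempts, on any exception
    return call_api()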
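For readers who want to see the retry mechanics, here is a minimal plain-Python sketch of a retry decorator. `with_retry` is a hypothetical stand-in, not WPipe's `@step`; it assumes `retry_count` means retries after the first attempt and that `retry_delay` is a fixed pause in seconds.

```python
import functools
import time


def with_retry(retry_count=3, retry_delay=2):
    # Hypothetical decorator: one initial attempt plus up to retry_count retries.
    def decorator(func):
        @functools.wraps(func)
        def wrapper(data):
            for attempt in range(retry_count + 1):
                try:
                    return func(data)
                except Exception:
                    if attempt == retry_count:
                        raise  # retries exhausted: surface the last error
                    time.sleep(retry_delay)
        return wrapper
    return decorator


calls = {"count": 0}


@with_retry(retry_count=3, retry_delay=0)
def flaky(data):
    # Fails twice, then succeeds, to exercise the retry path.
    calls["count"] += 1
    if calls["count"] < 3:
        raise ConnectionError("temporary failure")
    return {"ok": True}


result = flaky({})
print(result, calls["count"])
```

A fixed delay keeps the sketch short; exponential backoff is a common alternative when the remote service is rate-limited.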

WPipe can save its state to a database and resume exactly where it left off.

# Checkpoint is triggered only if the expression matches
pipeline.add_checkpoint(
    checkpoint_name="data_loaded",
    expression="len(records) > 0"
)
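The idea of a conditional checkpoint can be illustrated with the standard library alone. The sketch below is an assumption about the general technique, not WPipe's storage format: the table layout and the helpers `save_checkpoint` and `load_checkpoint` are hypothetical, and the state is serialized as JSON into an in-memory SQLite database.

```python
import json
import sqlite3


def save_checkpoint(conn, checkpoint_name, expression, state):
    # Persist the state only when the expression holds for the current data.
    # Assumption: the expression sees the state keys plus len().
    if not eval(expression, {"__builtins__": {"len": len}}, dict(state)):
        return False
    conn.execute(
        "INSERT OR REPLACE INTO checkpoints (name, state) VALUES (?, ?)",
        (checkpoint_name, json.dumps(state)),
    )
    conn.commit()
    return True


def load_checkpoint(conn, checkpoint_name):
    # Return the saved state, or None if no checkpoint exists.
    row = conn.execute(
        "SELECT state FROM checkpoints WHERE name = ?", (checkpoint_name,)
    ).fetchone()
    return json.loads(row[0]) if row else None


conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE checkpoints (name TEXT PRIMARY KEY, state TEXT)")

skipped = save_checkpoint(conn, "data_loaded", "len(records) > 0", {"records": []})
saved = save_checkpoint(conn, "data_loaded", "len(records) > 0", {"records": [1, 2]})
restored = load_checkpoint(conn, "data_loaded")
print(skipped, saved, restored)
```

On restart, a runner built this way would load the latest checkpoint and continue from the step that follows it.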

Global error handlers receive the full context and traceback when a step fails.

def slack_notifier(context, error):
    send_to_slack(f"Step {error['step_name']} failed!")

pipeline.add_error_capture([slack_notifier])
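A rough plain-Python picture of how such handlers could be invoked is shown below. Only the `step_name` key comes from the example above; the `message` and `traceback` keys, the `(callable, name)` step pairs, and `run_with_error_capture` itself are assumptions made for this sketch.

```python
import traceback


def run_with_error_capture(steps, data, handlers):
    # steps: list of (callable, name) pairs; handlers: list of callables.
    state = dict(data)
    for func, name in steps:
        try:
            state.update(func(state))
        except Exception as exc:
            error = {
                "step_name": name,
                "message": str(exc),
                "traceback": traceback.format_exc(),
            }
            for handler in handlers:
                handler(state, error)  # context first, then error details
            raise
    return state


captured = []


def record_error(context, error):
    # Stand-in for slack_notifier: collects the message instead of sending it.
    captured.append(f"Step {error['step_name']} failed: {error['message']}")


def broken(data):
    raise ValueError("bad input")


try:
    run_with_error_capture([(broken, "Parse")], {"raw": "x"}, [record_error])
except ValueError:
    pass

print(captured)
```

The sketch re-raises after notifying the handlers so that the caller still sees the failure.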

4. High Performance

Run steps in parallel or asynchronously, and monitor resource usage in real time.

Parallelism:

from wpipe import Parallel
Parallel(steps=[t1, t2], use_processes=True)

Async Engine:

from wpipe import PipelineAsync
result = await pipe.run(data)

Monitoring:

from wpipe import ResourceMonitor
with ResourceMonitor("Audit") as m:
    pipe.run(data)
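As an illustration of the parallel pattern, the sketch below runs two independent steps concurrently with the standard library's `concurrent.futures`. It uses threads rather than processes to stay short, and it assumes every step receives the same input and that outputs are merged in step order; `run_parallel` is a hypothetical helper, not WPipe's `Parallel`.

```python
from concurrent.futures import ThreadPoolExecutor


def t1(data):
    return {"total": sum(data["values"])}


def t2(data):
    return {"largest": max(data["values"])}


def run_parallel(steps, data, max_workers=2):
    # Every step receives its own copy of the input; outputs are merged
    # in step order once all of them have finished.
    state = dict(data)
    with ThreadPoolExecutor(max_workers=max_workers) as pool:
        outputs = list(pool.map(lambda step: step(dict(data)), steps))
    for output in outputs:
        state.update(output)
    return state


result = run_parallel([t1, t2], {"values": [3, 1, 4]})
print(result)
```

Threads suit I/O-bound steps; for CPU-bound work a process pool avoids contention on the interpreter lock, at the cost of requiring picklable steps and data.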

5. Operational Integration

WPipe provides a web dashboard and supports pipelines defined in YAML.

Start the web-based dashboard to see your pipelines in a timeline or graph view.

🌐 Dashboard Visualizer
from wpipe import start_dashboard
start_dashboard(db_path="tracking.db", port=5000)

Define your entire pipeline architecture in clean, version-controlled YAML files.

📋 YAML-Driven Pipelines
name: MyPipeline
steps:
  - name: Step1
    func: my_module.my_func
    version: v1.1
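A dotted path such as `my_module.my_func` has to be turned into a callable at load time. The sketch below shows one common way to do that with `importlib`; it is an assumption about the general technique, not WPipe's loader. The `config` dict stands in for a parsed YAML file, `resolve` is a hypothetical helper, and a standard-library function replaces `my_module.my_func` so the example runs without extra files.

```python
import importlib


def resolve(dotted_path):
    # Split "package.module.attr" into a module path and an attribute name.
    module_name, _, attr = dotted_path.rpartition(".")
    module = importlib.import_module(module_name)
    return getattr(module, attr)


# Stand-in for the parsed YAML document.
config = {
    "name": "MyPipeline",
    "steps": [
        {"name": "Step1", "func": "os.path.basename", "version": "v1.1"},
    ],
}

# Build (callable, name, version) tuples, the shape used with set_steps above.
steps = [(resolve(s["func"]), s["name"], s["version"]) for s in config["steps"]]
func, name, version = steps[0]
print(name, version, func("/tmp/report.csv"))
```

Resolving names at load time means a typo in the YAML fails fast with an `ImportError` or `AttributeError` before any step runs.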

Looking for more? See La Senda del Maestro: 140 Niveles de Poder (The Path of the Master: 140 Levels of Power), a 140-level guide to specialized patterns.