Advanced Patterns Tutorial

Learn advanced design patterns for complex pipeline scenarios using WPipe v2.4.0 high-end features. This tutorial is intended for senior developers building distributed systems or high-throughput data processing engines.

1. DAG Scheduling (Directed Acyclic Graphs)

When your tasks don’t follow a linear sequence, you need a DAG Scheduler. WPipe allows you to define dependencies between steps, and the engine will automatically calculate the execution order.

Pattern: Dependency-Based Execution

from wpipe.parallel import ParallelExecutor, ExecutionMode

executor = ParallelExecutor(max_workers=4)

# Define steps with explicit dependencies
executor.add_step("fetch", fetch_data, mode=ExecutionMode.IO_BOUND)
executor.add_step("clean", clean_data, depends_on=["fetch"])
executor.add_step("analyze", analyze_data, depends_on=["clean"])
executor.add_step("report", generate_report, depends_on=["analyze"])

result = executor.execute({})

Why use this? - Horizontal Scaling: Independent branches run in parallel automatically. - Complexity Management: Visually map your workflow as a graph.

2. Native Parallelism (IO vs CPU)

WPipe differentiates between tasks that wait for external resources (IO) and tasks that consume CPU.

  • ThreadPoolExecutor (IO-Bound): Best for API calls, database queries, and file operations.

  • ProcessPoolExecutor (CPU-Bound): Best for heavy calculations, image processing, or data transformations that bypass the Python GIL.

from wpipe import Parallel

pipeline.set_steps([
    Parallel(
        steps=[heavy_math_1, heavy_math_2],
        use_processes=True,  # Full GIL bypass
        max_workers=2
    )
])

3. Forensic Error Capture

Standard logging often misses the context of a failure. WPipe’s forensic capture provides the exact state of the warehouse at the moment of impact.

def alert_handler(context, error):
    print(f"FAILED: {error['step_name']} at line {error['line_number']}")
    # send to slack/telegram/email

pipeline.add_error_capture([alert_handler])

Metadata available in error object: - step_name: The failing task. - error_message: The exception string. - file_path: Exact location in source code. - line_number: Where it happened. - timestamp: When the error happened.

4. High-Performance Monitoring

WPipe v2.4.0 features a non-blocking ResourceMonitor using WAL (Write-Ahead Logging) mode for SQLite. This allows you to track system health without slowing down the primary processing.

from wpipe import ResourceMonitor

with ResourceMonitor("Heavy_Process") as monitor:
    pipeline.run(large_dataset)

stats = monitor.get_summary()
print(f"Peak Memory: {stats['peak_ram_mb']} MB")

Key metrics captured: - RSS RAM: Real memory usage of the process. - CPU %: Core utilization (normalized for multi-core). - Disk IO: Read/Write intensity.

5. Dynamic Pipeline Composition

Treat pipelines as first-class citizens. You can nest pipelines within pipelines to create recursive or modular architectures.

sub_pipe = Pipeline(pipeline_name="Worker")
sub_pipe.set_steps([validate, transform])

main_pipe = Pipeline(pipeline_name="Master")
main_pipe.set_steps([
    fetch_raw,
    sub_pipe,  # Nested execution
    save_final
])

Benefits: - Encapsulation: Hide complexity inside sub-pipelines. - Reusability: Use the same sub-pipeline in multiple projects.