Nivel 11: demo_level11.py

Este es el nivel 11 del tour de aprendizaje.

Código Fuente

"""
DEMO LEVEL 11: Sensor Failure (Error Capture)
---------------------------------------------
Adds: Error handling when the camera gets dirty or fails.
Accumulates: Smart Windshield (L10).

DIAGRAM:
(Camera) -- [Lens dirty?] -- ERROR! --> (Automatic Windshield Wipers)
      |
      v
(Process Trip)
"""

import random
from typing import Any, Dict

from wpipe import Pipeline, step

def emergency_maintenance(context: Dict[str, Any], error: Dict[str, Any]) -> Dict[str, Any]:
    """Emergency maintenance handler when a sensor fails.

    Args:
        context: The current pipeline context.
        error: Dictionary containing error details.

    Returns:
        Dict[str, Any]: The updated context.
    """
    print(f"\n🔧 SYSTEM: Error detected in '{error['step_name']}'.")
    print("🧼 ACTION: Activating sensor self-cleaning...\n")
    return context

@step(name="verify_lens")
def verify_lens(data: Any) -> Dict[str, str]:
    """Verifies if the sensor lens is clean.

    Args:
        data: Input data for the step.

    Returns:
        Dict[str, str]: Vision status.

    Raises:
        RuntimeError: If visibility is obstructed.
    """
    if random.random() < 0.3:
        raise RuntimeError("Obstructed visibility (Dirty lens)")
    print("👀 Sensors clean. Visibility 100%.")
    return {"vision": "Clear"}

if __name__ == "__main__":
    pipe = Pipeline(pipeline_name="trip_l11_faulttolerance", verbose=True)

    # The car now knows what to do if vision fails
    pipe.add_error_capture([emergency_maintenance])

    pipe.set_steps([verify_lens])
    print(">>> Starting trip with security sensors...")
    pipe.run({})

Resultado de Ejecución


>>> Starting trip with security sensors...
👀 Sensors clean. Visibility 100%.
trip_l11_faulttolerance ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━ 100% 0:00:00