Nivel 34: demo_level34.py

Este es el nivel 34 del tour de aprendizaje.

Código Fuente

"""
DEMO LEVEL 34: Fault Isolation (Continue on Error)
--------------------------------------------------
Adds: continue_on_error=True for secondary systems.
Accumulates: Error Management (L11).

DIAGRAM:
(Safety_Braking) -> OK
      |
(Radio_Spotify)   -> ERROR! (No internet)
      |
(Cruise_Control)  -> STILL WORKING!
"""

from typing import Any, Dict
from wpipe import Pipeline, step

@step(name="piloting_system")
def piloting_system(data: Any) -> Dict[str, str]:
    """Main piloting system step.

    Args:
        data: Input data for the step.

    Returns:
        Dict[str, str]: Piloting status.
    """
    return {"piloting": "ACTIVE"}

@step(name="infotainment")
def infotainment(data: Any) -> None:
    """Infotainment system step with potential failure.

    Args:
        data: Input data for the step.

    Raises:
        RuntimeError: If network error occurs.
    """
    print("🎵 Radio: Trying to connect to the cloud...")
    raise RuntimeError("Network error: Streaming not available")

@step(name="maintain_distance")
def maintain_distance(data: Any) -> Dict[str, bool]:
    """Radar distance maintenance step.

    Args:
        data: Input data for the step.

    Returns:
        Dict[str, bool]: Distance status.
    """
    print("📏 Radar: Maintaining active safety distance.")
    return {"distance_ok": True}

if __name__ == "__main__":
    # NEW IN L34: A radio error does not stop navigation
    pipe = Pipeline(
        pipeline_name="isolation_system_l34", continue_on_error=True, verbose=True
    )

    pipe.set_steps([piloting_system, infotainment, maintain_distance])

    print(">>> Starting trip: Secondary failures will not affect driving.")
    pipe.run({})

Resultado de Ejecución


>>> Starting trip: Secondary failures will not affect driving.
🎵 Radio: Trying to connect to the cloud...
📏 Radar: Maintaining active safety distance.
isolation_system_l34 ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━ 100% 0:00:00