Nivel 19: demo_level19.py

Este es el nivel 19 del tour de aprendizaje.

Código Fuente

"""
DEMO LEVEL 19: Modular Sections (Nested Pipelines)
--------------------------------------------------
Adds: Use of a Pipeline as a step of another.
Accumulates: Modularization of driving.

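DIAGRAM:
Pipeline(Full Trip) [
  (Engine Preparation)
  Pipeline(Urban_Section) [
     (Pass Traffic Lights)
     (Pedestrian Control)
  ]
  (Arrival)
]

NOTE: In this file the Urban_Section pipeline contains a single step,
cross_crosswalk (the pedestrian control); a traffic-light step is not
defined here.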
"""

from typing import Any, Dict
from wpipe import Pipeline, step

@step(name="prepare_engine")
def prepare_engine(data: Any) -> Dict[str, str]:
    """Report that the engine is ready for the trip.

    Args:
        data: Payload handed to the step; it is not read here.

    Returns:
        Dict[str, str]: A single-entry mapping, ``{"engine": "READY"}``.
    """
    engine_status: Dict[str, str] = {"engine": "READY"}
    return engine_status

# NEW IN L19: Defining an independent 'sub-trip'.
# This pipeline is created at module level so that the __main__ block can
# place it in the step list of the full trip, next to ordinary steps.
urban_section = Pipeline(pipeline_name="urban_driving")

@step(name="cross_crosswalk")
def cross_crosswalk(data: Any) -> Dict[str, bool]:
    """Announce the crosswalk stop and report that nobody is crossing.

    Args:
        data: Payload handed to the step; it is not read here.

    Returns:
        Dict[str, bool]: ``{"pedestrians_crossing": False}``.
    """
    crosswalk_state: Dict[str, bool] = {"pedestrians_crossing": False}
    print("🚶 Urban Section: Yielding to pedestrians...")
    return crosswalk_state

# The urban sub-pipeline is given exactly one step: cross_crosswalk.
urban_section.set_steps([cross_crosswalk])

if __name__ == "__main__":
    # Steps handed to the full-trip pipeline, listed in the order they
    # are declared.
    trip_steps = [
        prepare_engine,
        # The whole urban pipeline is passed as a single piece of the trip.
        urban_section,
        # Inline step supplied as a tuple; presumably
        # (callable, name, version) -- confirm against the wpipe docs.
        (lambda d: print("🏁 Trip completed."), "finish", "v1"),
    ]

    full_trip = Pipeline(pipeline_name="trip_l19_modular", verbose=True)
    full_trip.set_steps(trip_steps)

    # Start the trip with an empty initial payload.
    full_trip.run({})

Resultado de Ejecución


🚶 Urban Section: Yielding to pedestrians...
trip_l19_modular ━━━━━━━━━━━━━ 33% -:--:--
urban_driving ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━ 100% 0:00:00
🏁 Trip completed.