Nivel 28: demo_level28.py

Este es el nivel 28 del tour de aprendizaje.

Código Fuente

"""
DEMO LEVEL 28: The Driver's Diary (Events)
------------------------------------------
Adds: Manual markers on the timeline.
Accumulates: Metrics and Tracking.

DIAGRAM:
[Trip] ---- [Event: 'Radar Detected'] ---- [Trip] ---- [Event: 'Coffee Break']
"""

from typing import Any, Dict
from wpipe import Pipeline, step

@step(name="cross_border")
def cross_border(data: Any) -> Dict[str, str]:
    """Announce the border crossing and report the country entered.

    Args:
        data: Input handed to the step; this step does not read it.

    Returns:
        Dict[str, str]: A single ``"country"`` entry set to ``"Portugal"``.
    """
    # The result is fixed, so build it up front and announce afterwards.
    destination: Dict[str, str] = dict(country="Portugal")
    print("🌍 Crossing border: Changing traffic regulations...")
    return destination

if __name__ == "__main__":
    # Pipeline for this level, configured with a tracking database path.
    travel_log = Pipeline(
        verbose=True,
        tracking_db="output/car_events.db",
        pipeline_name="travel_log_l28",
    )

    # NEW IN L28: note a milestone that doesn't change the context but is
    # recorded. The fields are gathered first so the annotation reads as one
    # record before it is handed to the pipeline.
    route_start = {
        "event_type": "annotation",
        "event_name": "Route Start",
        "message": "Driver: William R. | Weather: Clear",
    }
    travel_log.add_event(**route_start)

    # Register the single step, then run it with an empty initial input.
    travel_log.set_steps([cross_border])
    travel_log.run({})

Resultado de Ejecución


[PIPELINE STATUS] Registered: PIPE-A09F737B
🌍 Crossing border: Changing traffic regulations...
travel_log_l28 ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━ 100% 0:00:00
[PIPELINE STATUS] PIPE-A09F737B: COMPLETED