Nivel 14: demo_level14.py

Este es el nivel 14 del tour de aprendizaje.

Código Fuente

"""
DEMO LEVEL 14: Service Area Stop (Checkpoints)
----------------------------------------------
Adds: Saving trip status at a stop.
Accumulates: All trip telemetry.

DIAGRAM:
(Section 1: City) -> (Arrival Service Area)
      |
      v
[CHECKPOINT: 'trip_1'] --> (Saves Gasoline, Km, Destination in DB)
      |
      v
(Section 2: Highway)
"""

import os
from typing import Any, Dict

from wpipe import CheckpointManager, Pipeline, step

@step(name="drive_to_service_area")
def drive_to_service_area(data: Any) -> Dict[str, int]:
    """Simulate the first leg of the trip, up to the service area.

    Args:
        data: Incoming pipeline payload; this step does not read it.

    Returns:
        Dict[str, int]: Fixed telemetry with ``km`` set to 100 and
        ``gasoline`` set to 70.
    """
    telemetry: Dict[str, int] = {}
    print("🛣️  Driving 100km to the service area...")
    telemetry["km"] = 100
    telemetry["gasoline"] = 70
    return telemetry

@step(name="service_break")
def service_break(data: Any) -> Dict[str, bool]:
    """Simulate the rest stop taken at the service area.

    Args:
        data: Incoming pipeline payload; this step does not read it.

    Returns:
        Dict[str, bool]: Always ``{"rested": True}``.
    """
    print("☕ Taking a coffee. The car saves progress automatically.")
    outcome: Dict[str, bool] = dict(rested=True)
    return outcome

if __name__ == "__main__":
    # The checkpoint database lives under ./output, so create the folder first.
    os.makedirs("output", exist_ok=True)
    checkpoint_store = CheckpointManager("output/trip_progress.db")

    trip_pipeline = Pipeline(pipeline_name="trip_l14_checkpoints", verbose=True)
    trip_steps = [drive_to_service_area, service_break]
    trip_pipeline.set_steps(trip_steps)

    # Run both steps with the checkpoint manager attached; the run is
    # registered under checkpoint id "trip_1".
    # NOTE(review): presumably wpipe persists step progress into the DB
    # above -- confirm against the wpipe checkpoint documentation.
    initial_payload = {"trip_id": "vacation_2026"}
    trip_pipeline.run(
        initial_payload,
        checkpoint_mgr=checkpoint_store,
        checkpoint_id="trip_1",
    )

Resultado de Ejecución


🛣️  Driving 100km to the service area...
☕ Taking a coffee. The car saves progress automatically.
trip_l14_checkpoints ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━ 100% 0:00:00