Nivel 57: demo_level57.py

Este es el nivel 57 del tour de aprendizaje.

Código Fuente

from typing import Any, Dict
"""
DEMO LEVEL 57: Checkpoints con Callbacks
---------------------------------------
Adds: Checkpoints que disparan callbacks al cumplir condición.
Continues: Eventos de L56.

DIAGRAM:
[Checkpoint: fuel < 20] --True--> [mostrar_alerta_combustible]
[Checkpoint: temp > 90]  --False--> (no se dispara)

"""

from wpipe import Pipeline, step

@step(name="start_engine")
def start_engine(data: dict) -> None:

    """Start engine step.

    Args:

        data: Input data for the step.

    Returns:

        dict: Result of the step.

    """
    print("🚀 Motor arrancado")
    return {"fuel": 15, "temp": 85}

@step(name="monitor_engine")
def monitor_engine(data: dict) -> None:

    """Monitor engine step.

    Args:

        data: Input data for the step.

    Returns:

        dict: Result of the step.

    """
    print("📊 Monitoreando motor...")
    return data

@step(name="mostrar_alerta_combustible")
def mostrar_alerta_combustible(context: Any) -> None:

    """Mostrar alert combustible step.

    Args:

        context: Input data for the step.

    Returns:

        dict: Result of the step.

    """
    print("⛽ [CHECKPOINT] ¡Alerta de combustible!")
    return {"alert": "combustible"}

@step(name="mostrar_alerta_temperatura")
def mostrar_alerta_temperatura(context: Any) -> None:

    """Mostrar alert temperatura step.

    Args:

        context: Input data for the step.

    Returns:

        dict: Result of the step.

    """
    print("🌡️ [CHECKPOINT] ¡Alerta de temperatura!")
    return {"alert": "temperatura"}

if __name__ == "__main__":
    pipe = Pipeline(pipeline_name="viaje_l57_checkpoints", verbose=True)

    pipe.add_checkpoint(
        checkpoint_name="fuel_low",
        expression="fuel < 20",
        steps=[mostrar_alerta_combustible],
    )

    pipe.add_checkpoint(
        checkpoint_name="temp_high",
        expression="temp > 90",
        steps=[mostrar_alerta_temperatura],
    )

    pipe.set_steps([start_engine, monitor_engine])
    print("\n>>> Startsndo con checkpoints...\n")
    pipe.run({})

Resultado de Ejecución


>>> Startsndo con checkpoints...

[CHECKPOINT INFO] Milestone ‘fuel_low’ skip/busy: name ‘fuel’ is not defined [CHECKPOINT INFO] Milestone ‘temp_high’ skip/busy: name ‘temp’ is not defined 🚀 Motor arrancado

[CHECKPOINT REACHED] fuel_low ⛽ [CHECKPOINT] ¡Alerta de combustible! 📊 Monitoreando motor… viaje_l57_checkpoints ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━ 100% 0:00:00