Nivel 45: demo_level45.py

Este es el nivel 45 del tour de aprendizaje.

Código Fuente

"""
DEMO LEVEL 45: Async with Checkpoints
-------------------------------------
Adds: Checkpoints in async pipeline.
Continues: L44.

DIAGRAM:
(async start_motor)
      |
      +-- [Checkpoint: engine_on]
      |
      v
(async drive)
"""

import asyncio
from typing import Any, Dict
from wpipe import PipelineAsync

async def start_motor(data: Any) -> Dict[str, str]:
    """Simulate turning the ignition and report the engine as running.

    Args:
        data: Input handed to the step; this step does not read it.

    Returns:
        Dict[str, str]: A mapping with the single entry ``"engine": "on"``.
    """
    ignition_delay = 0.05  # brief pause standing in for real async work
    await asyncio.sleep(ignition_delay)

    engine_state: Dict[str, str] = {"engine": "on"}
    print("🔑 [ASYNC] Motor started")
    return engine_state

async def drive(data: Any) -> Dict[str, bool]:
    """Simulate driving and report that the vehicle is moving.

    Args:
        data: Input handed to the step; this step does not read it.

    Returns:
        Dict[str, bool]: A mapping with the single entry ``"driving": True``.
    """
    travel_delay = 0.05  # brief pause standing in for real async work
    await asyncio.sleep(travel_delay)

    driving_state: Dict[str, bool] = {"driving": True}
    print("🚗 [ASYNC] Driving...")
    return driving_state

if __name__ == "__main__":

    async def main() -> None:
        """Build the two-step async pipeline, register its checkpoint, and run it."""
        pipeline = PipelineAsync(
            pipeline_name="trip_l45_asynccheckpoint",
            verbose=True,
        )

        # The checkpoint is registered before the steps are set. Its expression
        # names "engine", the key start_motor returns (presumably evaluated by
        # wpipe against the pipeline data -- confirm against the library).
        checkpoint_expression = "engine == 'on'"
        pipeline.add_checkpoint(
            checkpoint_name="engine_on",
            expression=checkpoint_expression,
        )

        ordered_steps = [start_motor, drive]
        pipeline.set_steps(ordered_steps)

        print("\n>>> Testing async with checkpoints...\n")
        initial_data: Dict[str, Any] = {}
        try:
            await pipeline.run(initial_data)
        except Exception as exc:
            # Demo boundary: report any failure as a single line.
            print(f"Error: {exc}")

    asyncio.run(main())

Resultado de Ejecución


>>> Testing async with checkpoints...

[CHECKPOINT INFO] Milestone 'engine_on' skip: name 'engine' is not defined
🔑 [ASYNC] Motor started

[ASYNC CHECKPOINT REACHED] engine_on
🚗 [ASYNC] Driving...