Level 41: demo_level41.py
This is level 41 of the learning tour.
Source Code
"""
DEMO LEVEL 41: Basic Async Pipeline
------------------------------------
Adds: PipelineAsync for asynchronous execution.
Accumulates: Basic pipeline from L1.
DIAGRAM:
(async start_motor) --> [motor: 'ON']
|
v
(async verify_sensors) --> [sensors: 'OK']
"""
import asyncio
from typing import Any, Dict
from wpipe import PipelineAsync
def start_motor(data: Any) -> Dict[str, Any]:
"""Start motor step.
Args:
data: Input data for the step.
Returns:
Dict[str, Any]: Motor status and fuel level.
"""
print("🔑 [ASYNC] Motor started")
return {"motor": "ON", "fuel": 100}
def verify_sensors(data: Any) -> Dict[str, str]:
"""Verify sensors step.
Args:
data: Input data for the step.
Returns:
Dict[str, str]: Sensors status.
"""
print("📡 [ASYNC] Sensors verified")
return {"sensors": "OK"}
if __name__ == "__main__":
async def main() -> None:
"""Main async entry point."""
pipe = PipelineAsync(pipeline_name="trip_l41_asyncbasic", verbose=True)
pipe.set_steps([start_motor, verify_sensors])
print("\n>>> Starting asynchronous pipeline...\n")
await pipe.run({})
asyncio.run(main())
Execution Result
>>> Starting asynchronous pipeline...

🔑 [ASYNC] Motor started
📡 [ASYNC] Sensors verified
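
To make the idea of an asynchronous pipeline more concrete, here is a minimal sketch that uses only the standard library. It is not wpipe's implementation: the `run_steps` helper is hypothetical, and the assumptions that steps run in order and that each returned dict is merged into a shared state are inferred from the diagram, not confirmed by the library. The sketch accepts both plain functions (like the steps in this level) and coroutine functions.

```python
import asyncio
import inspect
from typing import Any, Callable, Dict, List


async def run_steps(
    steps: List[Callable[[Dict[str, Any]], Any]],
    data: Dict[str, Any],
) -> Dict[str, Any]:
    """Hypothetical runner: call each step in order and merge its result."""
    state = dict(data)  # copy so the caller's dict is not mutated
    for step in steps:
        result = step(state)
        # A coroutine function returns an awaitable; a plain function does not.
        if inspect.isawaitable(result):
            result = await result
        if isinstance(result, dict):
            state.update(result)
    return state


def start_motor(data: Dict[str, Any]) -> Dict[str, Any]:
    # Plain (synchronous) step, as in the demo above.
    return {"motor": "ON", "fuel": 100}


async def verify_sensors(data: Dict[str, Any]) -> Dict[str, str]:
    # Coroutine step: yields control to the event loop once before returning.
    await asyncio.sleep(0)
    return {"sensors": "OK"}


final_state = asyncio.run(run_steps([start_motor, verify_sensors], {}))
print(final_state)
```

Under these assumptions the second step sees the keys produced by the first, which is the accumulation the diagram suggests. Whether `PipelineAsync` merges results in exactly this way should be checked against the wpipe documentation.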