Nivel 42: demo_level42.py
Este es el nivel 42 del tour de aprendizaje.
Código Fuente
"""
DEMO LEVEL 42: Step Decorator Async
-----------------------------------
Adds: Async functions within the pipeline.
Continues: L41.
DIAGRAM:
(async verify_battery) --> [battery: 85%]
|
v
(async start_system) --> [system: 'ON']
"""
import asyncio
from typing import Any, Dict
from wpipe import PipelineAsync
async def verify_battery(data: Any) -> Dict[str, int]:
"""Verify battery step asynchronously.
Args:
data: Input data.
Returns:
Dict[str, int]: Battery level.
"""
await asyncio.sleep(0.05)
print("🔋 [ASYNC] Battery at 85%")
return {"battery": 85}
async def start_system(data: Any) -> Dict[str, str]:
"""Start system step asynchronously.
Args:
data: Input data.
Returns:
Dict[str, str]: System status.
"""
await asyncio.sleep(0.05)
print("🟢 [ASYNC] System started")
return {"system": "ON"}
if __name__ == "__main__":
async def main() -> None:
"""Main async entry point."""
pipe = PipelineAsync(pipeline_name="trip_l42_asyncstep", verbose=True)
pipe.set_steps([verify_battery, start_system])
print("\n>>> Testing async functions...\n")
await pipe.run({})
asyncio.run(main())
Resultado de Ejecución
>>> Testing async functions...🔋 [ASYNC] Battery at 85% 🟢 [ASYNC] System started