Nivel 46: demo_level46.py
Este es el nivel 46 del tour de aprendizaje.
Código Fuente
"""
DEMO LEVEL 46: Async with Condition
-----------------------------------
Adds: Condition in async pipeline.
Continues: L45.
DIAGRAM:
(async evaluate_situation)
|
+-- (obstacle == True) -> [BRAKE]
+-- (obstacle == False) -> [ACCELERATE]
"""
import asyncio
import random
from typing import Any, Dict
from wpipe import PipelineAsync, Condition
async def evaluate_situation(data: Any) -> Dict[str, bool]:
"""Evaluate situation step asynchronously.
Args:
data: Input data.
Returns:
Dict[str, bool]: Obstacle presence.
"""
await asyncio.sleep(0.05)
obstacle = random.random() < 0.3
print(f"🚗 [ASYNC] Evaluation: obstacle={obstacle}")
return {"obstacle": obstacle}
async def brake(data: Any) -> Dict[str, str]:
"""Brake step asynchronously.
Args:
data: Input data.
Returns:
Dict[str, str]: Action performed.
"""
print("🛑 [ASYNC] Emergency braking")
return {"action": "brake"}
async def accelerate(data: Any) -> Dict[str, str]:
"""Accelerate step asynchronously.
Args:
data: Input data.
Returns:
Dict[str, str]: Action performed.
"""
print("🚀 [ASYNC] Accelerating")
return {"action": "accelerate"}
async def main() -> None:
"""Main async entry point."""
pipe = PipelineAsync(pipeline_name="trip_l46_asynccondition", verbose=True)
pipe.set_steps(
[
evaluate_situation,
Condition(
expression="obstacle == True",
branch_true=[brake],
branch_false=[accelerate],
),
]
)
print("\n>>> Testing async with conditions...\n")
try:
await pipe.run({})
except Exception as e:
print(f"Error: {e}")
if __name__ == "__main__":
asyncio.run(main())
Resultado de Ejecución
>>> Testing async with conditions...🚗 [ASYNC] Evaluation: obstacle=False 🚀 [ASYNC] Accelerating