Nivel 47: demo_level47.py
Este es el nivel 47 del tour de aprendizaje.
Código Fuente
"""
DEMO LEVEL 47: Async with Simulated For Loop
---------------------------------------------
Adds: Loop in async pipeline (simulated with sequential steps).
Continues: L46.
DIAGRAM:
Steps executed 3 times sequentially
"""
import asyncio
from typing import Any
from wpipe import PipelineAsync
async def process_frame_0(data: Any) -> None:
"""Process frame 0 step asynchronously.
Args:
data: Input data.
"""
print("🖼️ [ASYNC] Frame 0")
async def process_frame_1(data: Any) -> None:
"""Process frame 1 step asynchronously.
Args:
data: Input data.
"""
print("🖼️ [ASYNC] Frame 1")
async def process_frame_2(data: Any) -> None:
"""Process frame 2 step asynchronously.
Args:
data: Input data.
"""
print("🖼️ [ASYNC] Frame 2")
async def main() -> None:
"""Main async entry point."""
pipe = PipelineAsync(pipeline_name="trip_l47_asyncfor", verbose=True)
pipe.set_steps([process_frame_0, process_frame_1, process_frame_2])
print("\n>>> Testing async with loop...\n")
try:
await pipe.run({})
except Exception as e:
print(f"Error: {e}")
if __name__ == "__main__":
asyncio.run(main())
Resultado de Ejecución
>>> Testing async with loop...🖼️ [ASYNC] Frame 0 🖼️ [ASYNC] Frame 1 🖼️ [ASYNC] Frame 2