Nivel 44: demo_level44.py

Este es el nivel 44 del tour de aprendizaje.

Código Fuente

"""
DEMO LEVEL 44: Async Parallel
-----------------------------
Adds: Parallel async steps.
Continues: L43.

DIAGRAM:
Parallel(async steps) {
  |-- (async front_camera)
  |-- (async rear_camera)
  |-- (async radar)
}
"""

import asyncio
from typing import Any, Dict
from wpipe import PipelineAsync, Parallel

async def front_camera(data: Any) -> Dict[str, str]:
    """Front camera activation step asynchronously.

    Args:
        data: Input data.

    Returns:
        Dict[str, str]: Camera status.
    """
    await asyncio.sleep(0.05)
    print("📷 [ASYNC] Front camera activated")
    return {"front": "active"}

async def rear_camera(data: Any) -> Dict[str, str]:
    """Rear camera activation step asynchronously.

    Args:
        data: Input data.

    Returns:
        Dict[str, str]: Camera status.
    """
    await asyncio.sleep(0.05)
    print("📷 [ASYNC] Rear camera activated")
    return {"rear": "active"}

async def radar(data: Any) -> Dict[str, str]:
    """Radar activation step asynchronously.

    Args:
        data: Input data.

    Returns:
        Dict[str, str]: Radar status.
    """
    await asyncio.sleep(0.05)
    print("📡 [ASYNC] Radar activated")
    return {"radar": "active"}

async def main() -> None:
    """Main async entry point."""
    pipe = PipelineAsync(pipeline_name="trip_l44_asyncparallel", verbose=True)
    pipe.set_steps(
        [Parallel(steps=[front_camera, rear_camera, radar], max_workers=3)]
    )
    print("\n>>> Testing async parallel...\n")
    await pipe.run({})

if __name__ == "__main__":
    asyncio.run(main())

Resultado de Ejecución


>>> Testing async parallel...

[PARALLEL ASYNC] Executing 3 steps concurrently