Nivel 75: Orquestación Híbrida (For + Parallel)

Objetivo

Dominar la jerarquía de orquestación. Aprenderás a anidar un bloque Parallel dentro de un bucle For, permitiendo ejecuciones masivas y estructuradas de tareas que requieren procesamiento simultáneo en cada iteración.

Conceptos Clave

  • Jerarquía de Control: La capacidad de WPipe de manejar estructuras anidadas sin perder la trazabilidad.

  • Fusión de Datos: Cómo múltiples tareas paralelas consolidan su información antes de que el bucle continúe a la siguiente iteración.

  • Escalabilidad Vertical: Optimización del tiempo de ciclo en procesos repetitivos.

¿Qué estamos probando?

Simulamos el sistema de percepción de un vehículo autónomo. En cada iteración del bucle principal (el ciclo de control del coche), debemos leer tres sensores críticos (Cámara, Radar y LiDAR) en paralelo para minimizar la latencia, y luego fusionar esos datos.

Código Fuente

"""
DEMO LEVEL 75: For con Parallel
-----------------------------------
Adds: Parallel dentro de For loop.
Continues: L74.

DIAGRAM:
For() {
    Parallel(sensores) {
        camara, radar, lidar
    }
}
"""

from wpipe import Pipeline, For, Parallel, step

@step(name="leer_camara")
def leer_camara(data: dict) -> None:

    """Leer camara step.

    Args:

        data: Input data for the step.

    Returns:

        dict: Result of the step.

    """
    print("  📷 Cámara")

@step(name="leer_radar")
def leer_radar(data: dict) -> None:

    """Leer radar step.

    Args:

        data: Input data for the step.

    Returns:

        dict: Result of the step.

    """
    print("  📡 Radar")

@step(name="leer_lidar")
def leer_lidar(data: dict) -> None:

    """Leer lidar step.

    Args:

        data: Input data for the step.

    Returns:

        dict: Result of the step.

    """
    print("  🔴 LiDAR")

@step(name="process")
def process(data: dict) -> None:

    """Process step.

    Args:

        data: Input data for the step.

    Returns:

        dict: Result of the step.

    """
    print("🧠 Fusionando datos...")
    return {"fusion": "completa"}

if __name__ == "__main__":
    pipe = Pipeline(pipeline_name="viaje_l75_forparallel", verbose=True)
    pipe.set_steps(
        [
            For(
                iterations=2,
                steps=[
                    Parallel(
                        steps=[leer_camara, leer_radar, leer_lidar], max_workers=3
                    ),
                    process,
                ],
            )
        ]
    )
    print("\n>>> Sensores en paralelo dentro de bucle...\n")
    pipe.run({})

Resultado de Ejecución


>>> Sensores en paralelo dentro de bucle...
[PARALLEL] Executing 3 steps using THREADS (workers=3)

📷 Cámara 📡 Radar 🔴 LiDAR

🧠 Fusionando datos… [PARALLEL] Executing 3 steps using THREADS (workers=3)

📷 Cámara 📡 Radar 🔴 LiDAR

🧠 Fusionando datos… viaje_l75_forparallel ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━ 100% 0:00:00