Nivel 8: demo_level8.py

Este es el nivel 8 del tour de aprendizaje.

Código Fuente

"""
DEMO LEVEL 8: Branching (Condition)
-----------------------------------
Adds: Use of Condition() to execute True/False branches.

DIAGRAM:
(process_frame)
      |
      v
Condition(Obstacle detected?)
      |--- [True]  -> (emergency_brake)
      |--- [False] -> (gentle_accelerate)
"""

import random
from typing import Any, Dict, Generator, Tuple
import numpy as np
from wpipe import Condition, For, Pipeline, step, to_obj


def simulate_video(
    num_frames: int = 10,
    frame_shape: Tuple[int, ...] = (100, 100, 3),
) -> Generator[Tuple[int, np.ndarray], None, None]:
    """Simulate a video stream of blank frames.

    Args:
        num_frames (int): How many frames to produce. Defaults to 10; a value
            of zero or less yields nothing.
        frame_shape (Tuple[int, ...]): Shape of each generated image array.
            Defaults to ``(100, 100, 3)``.

    Yields:
        Tuple[int, np.ndarray]: Frame ID (counting from 0) and an all-zero
        ``uint8`` image of shape ``frame_shape``.
    """
    for frame_id in range(num_frames):
        # A fresh array per frame so consumers never share a buffer.
        yield frame_id, np.zeros(frame_shape, dtype=np.uint8)


@step(name="start_camera")
def start_camera(_data: Dict[str, Any]) -> Dict[str, Any]:
    """Open the simulated camera and expose its frame stream.

    Args:
        _data (Dict[str, Any]): The current pipeline context data (unused).

    Returns:
        Dict[str, Any]: A single ``"stream"`` entry holding the generator
        returned by ``simulate_video()``.
    """
    video_stream = simulate_video()
    return {"stream": video_stream}


@step(name="process_frame")
@to_obj
def process_frame(ctx: Any) -> Dict[str, Any]:
    """Process the next frame from the stream and flag hazards.

    Args:
        ctx (Any): The context object; its ``stream`` attribute is advanced
            by one item with ``next()``.

    Returns:
        Dict[str, Any]: ``current_frame`` and ``obstacle`` for the frame that
        was read, or ``{"error": "End"}`` once the stream is exhausted.
    """
    # Keep the try body minimal: next() is the only call expected to raise
    # StopIteration here.
    try:
        frame_id, _ = next(ctx.stream)
    except StopIteration:
        return {"error": "End"}

    # Simulated detector: an obstacle is reported with probability 0.3.
    danger = random.random() < 0.3
    print(f"🖼️ Frame {frame_id} | Danger: {danger}")
    return {"current_frame": frame_id, "obstacle": danger}


@step(name="brake")
def brake(data: Dict[str, Any]) -> Dict[str, Any]:
    """Apply emergency brakes.

    Prints a notice that includes the full context data, then reports the
    action taken.

    Args:
        data (Dict[str, Any]): The current pipeline context data.

    Returns:
        Dict[str, Any]: ``{"action": "Braking"}``.
    """
    print(f"🛑 EMERGENCY BRAKE ACTIVATED! Data: {data}")
    return {"action": "Braking"}


@step(name="accelerate")
def accelerate(data: Dict[str, Any]) -> Dict[str, Any]:
    """Accelerate smoothly.

    Prints a notice that includes the full context data, then reports the
    action taken.

    Args:
        data (Dict[str, Any]): The current pipeline context data.

    Returns:
        Dict[str, Any]: ``{"action": "Accelerating"}``.
    """
    print(f"🛣️ Clear road. Accelerating... Data: {data}")
    return {"action": "Accelerating"}


if __name__ == "__main__":
    pipeline = Pipeline(pipeline_name="Trip_L8", verbose=True)

    # Branch on the "obstacle" value that process_frame returns each iteration.
    hazard_branch = Condition(
        expression="obstacle == True",
        branch_true=[brake],
        branch_false=[accelerate],
    )

    # Five passes of "read a frame, then brake or accelerate".
    frame_loop = For(iterations=5, steps=[process_frame, hazard_branch])

    pipeline.set_steps([start_camera, frame_loop])
    pipeline.run({})

Resultado de Ejecución


🖼️ Frame 0 | Danger: True
[CONDITION] Evaluating: obstacle == True
🛑 EMERGENCY BRAKE ACTIVATED! Data: {'_pipeline_start_time': '2026-04-30T13:36:32.231285', 'stream': <generator object simulate_video at 0x72efe6a249e0>, '_loop_iteration': 0, 'progress_rich': <rich.progress.Progress object at 0x72efe6a28c20>, 'current_frame': 0, 'obstacle': True}
🖼️ Frame 1 | Danger: False
[CONDITION] Evaluating: obstacle == True
🛣️ Clear road. Accelerating... Data: {'_pipeline_start_time': '2026-04-30T13:36:32.231285', 'stream': <generator object simulate_video at 0x72efe6a249e0>, '_loop_iteration': 1, 'progress_rich': <rich.progress.Progress object at 0x72efe6a28c20>, 'current_frame': 1, 'obstacle': False, 'action': 'Braking'}
🖼️ Frame 2 | Danger: False
[CONDITION] Evaluating: obstacle == True
🛣️ Clear road. Accelerating... Data: {'_pipeline_start_time': '2026-04-30T13:36:32.231285', 'stream': <generator object simulate_video at 0x72efe6a249e0>, '_loop_iteration': 2, 'progress_rich': <rich.progress.Progress object at 0x72efe6a28c20>, 'current_frame': 2, 'obstacle': False, 'action': 'Accelerating'}
🖼️ Frame 3 | Danger: True
[CONDITION] Evaluating: obstacle == True
🛑 EMERGENCY BRAKE ACTIVATED! Data: {'_pipeline_start_time': '2026-04-30T13:36:32.231285', 'stream': <generator object simulate_video at 0x72efe6a249e0>, '_loop_iteration': 3, 'progress_rich': <rich.progress.Progress object at 0x72efe6a28c20>, 'current_frame': 3, 'obstacle': True, 'action': 'Accelerating'}
🖼️ Frame 4 | Danger: True
[CONDITION] Evaluating: obstacle == True
🛑 EMERGENCY BRAKE ACTIVATED! Data: {'_pipeline_start_time': '2026-04-30T13:36:32.231285', 'stream': <generator object simulate_video at 0x72efe6a249e0>, '_loop_iteration': 4, 'progress_rich': <rich.progress.Progress object at 0x72efe6a28c20>, 'current_frame': 4, 'obstacle': True, 'action': 'Braking'}
Trip_L8 ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━ 100% 0:00:00