Nivel 5: demo_level5.py

Este es el nivel 5 del tour de aprendizaje.

Código Fuente

"""
DEMO LEVEL 5: Dynamic Initial Warehouse
---------------------------------------
Adds: Data injection when starting the pipeline (run).

DIAGRAM:
[Warehouse with 'weather': 'Sunny'] <--- Injected in run()
      |
      v
(Steps 1-4) --> [engine, gps, etc.]
      |
      v
(adjust_climate) -> Uses the data injected at the start!
"""

from typing import Any, Dict
from wpipe import Pipeline, step, to_obj


def start_engine(_data: Dict[str, Any]) -> Dict[str, Any]:
    """Start the engine.

    Args:
        _data (Dict[str, Any]): The current pipeline context data.

    Returns:
        Dict[str, Any]: Engine status.
    """
    return {"engine": "ON"}


@step(name="configure_gps")
class ConfigureGPS:
    """Class-based step for GPS configuration."""

    def __init__(self, destination: str):
        """Initialize with destination.

        Args:
            destination (str): Destination name.
        """
        self.destination = destination

    def __call__(self, _data: Dict[str, Any]) -> Dict[str, Any]:
        """Return the destination.

        Args:
            _data (Dict[str, Any]): The current pipeline context data.

        Returns:
            Dict[str, Any]: Destination.
        """
        return {"destination": self.destination}


@step(name="adjust_climate")
@to_obj
def adjust_climate(ctx: Any) -> Dict[str, Any]:
    """Adjust the car climate based on external weather.

    Args:
        ctx (Any): The context object.

    Returns:
        Dict[str, Any]: Internal temperature setting.
    """
    # 'weather' comes from the initial dictionary in run()
    print(f"🌡️ External weather: {ctx.weather}. Adjusting air...")
    return {"internal_climate": 22}


if __name__ == "__main__":
    pipeline = Pipeline(pipeline_name="Trip_L5", verbose=True)
    pipeline.set_steps([start_engine, ConfigureGPS("Madrid"), adjust_climate])

    # PASS INITIAL DATA HERE:
    pipeline.run({"weather": "Sunny"})

Resultado de Ejecución


🌡️ External weather: Sunny. Adjusting air… Trip_L5 ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━ 100% 0:00:00