Nivel 30: demo_level30.py
Este es el nivel 30 del tour de aprendizaje.
Código Fuente
"""
DEMO LEVEL 30: Satellite Route (External YAML)
-----------------------------------------------
Adds: Loading trip configuration from a YAML file.
Accumulates: Modularization (L19).
DIAGRAM:
[satellite.yaml] -> (read_yaml) -> [Configuration] -> Pipeline.run()
"""
import os
from typing import Any, Dict
from wpipe import Pipeline, step
from wpipe.util import escribir_yaml, leer_yaml
@step(name="show_destination")
def show_destination(data: Dict[str, Any]) -> Dict[str, str]:
"""Show destination step based on external configuration.
Args:
data: Input data for the step.
Returns:
Dict[str, str]: Estimated arrival.
"""
config = data.get("external_config", {})
print(
f"📡 Satellite: Received route to {config.get('destination')} via {config.get('route')}"
)
return {"estimated_arrival": "14:00"}
if __name__ == "__main__":
# We create a configuration file simulating the mobile app
os.makedirs("pipeline_configs", exist_ok=True)
mock_config = {"destination": "Lisbon", "route": "Toll", "stops": 2}
config_path = "pipeline_configs/satellite.yaml"
escribir_yaml(config_path, mock_config)
# NEW IN L30: We load data before starting
satellite_data = leer_yaml(config_path)
pipe = Pipeline(pipeline_name="connected_car_l30", verbose=True)
pipe.set_steps([show_destination])
print(">>> Synchronizing with mobile...")
pipe.run({"external_config": satellite_data})
Resultado de Ejecución
>>> Synchronizing with mobile...
📡 Satellite: Received route to Lisbon via Toll
connected_car_l30 ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━ 100% 0:00:00