Nivel 2: Metadatos y Decorador @step

Objetivo

Aprender a enriquecer las tareas del pipeline utilizando el decorador @step para proporcionar nombres legibles, versionado y trazabilidad mejorada.

Conceptos Clave

  • Decorador @step: Cómo usar metadatos para identificar tareas en el Dashboard y logs.

  • Trazabilidad: Identificación única de versiones de cada paso.

  • Flujo de Datos II: Cómo el diccionario de salida de un paso se inyecta como entrada en el siguiente.

¿Qué estamos probando?

En este nivel probamos la capacidad del motor para extraer metadatos de las funciones decoradas. Validamos que el orquestador respete el nombre y la versión definidos en el decorador, y que mantenga la integridad del flujo de datos entre un paso simple y un paso decorado.

Código Fuente

"""
DEMO LEVEL 2: Metadata (@step)
------------------------------
This level introduces the use of @step to provide a name, version,
and traceability to the pipeline steps.

DIAGRAM:
[Empty Warehouse]
      |
      v
(start_engine) ----> [engine: 'ON']
      |
      v
(check_brakes @step) -> [engine: 'ON', brakes: 'OK']
"""

from typing import Any, Dict
from wpipe import Pipeline, step


def start_engine(data: Dict[str, Any]) -> Dict[str, Any]:
    """Start the car engine and initialize fuel levels.

    Args:
        data (Dict[str, Any]): The current pipeline context data.

    Returns:
        Dict[str, Any]: Updated context with engine status and fuel.
    """
    print(f"🔑 Turning key: Engine started. Input data: {data}")
    return {"engine": "ON", "fuel": 100}


@step(name="check_brakes", version="v1.0")
def check_brakes(data: Dict[str, Any]) -> Dict[str, Any]:
    """Verify the state of the brake system.

    Args:
        data (Dict[str, Any]): The current pipeline context data.

    Returns:
        Dict[str, Any]: Updated context with brake status.
    """
    print(f"👟 Testing pedals: Brakes verified. Input data: {data}")
    return {"brakes": "OK"}


if __name__ == "__main__":
    pipeline = Pipeline(pipeline_name="Trip_L2", verbose=True)
    pipeline.set_steps([start_engine, check_brakes])
    pipeline.run({})

Resultado de Ejecución


🔑 Turning key: Engine started. Input data: {‘_pipeline_start_time’: ‘2026-04-30T13:36:30.307010’, ‘progress_rich’: <rich.progress.Progress object at 0x72a03f99f230>} 👟 Testing pedals: Brakes verified. Input data: {‘_pipeline_start_time’: ‘2026-04-30T13:36:30.307010’, ‘progress_rich’: <rich.progress.Progress object at 0x72a03f99f230>, ‘engine’: ‘ON’, ‘fuel’: 100} Trip_L2 ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━ 100% 0:00:00