Nivel 1: Conceptos Básicos de Pipeline
Objetivo
Aprender a instanciar el motor de orquestación de WPipe y ejecutar una tarea atómica para verificar que el sistema está correctamente instalado y configurado.
Conceptos Clave
Instanciación: Creación del objeto Pipeline.
Verbose Mode: Uso del flag verbose=True para ver el progreso detallado en consola.
Flujo de Trabajo: Registro de funciones simples como pasos del pipeline.
¿Qué estamos probando?
En este nivel validamos la conectividad básica del motor. Estamos probando que una función estándar de Python pueda ser inyectada en el orquestador, ejecutada sin errores y que el sistema sea capaz de mostrar la barra de progreso de Rich.
Código Fuente
"""
DEMO LEVEL 1: The Beginning (Simple Functions)
----------------------------------------------
This level demonstrates the creation of a Pipeline and the execution
of a basic sequential function.
DIAGRAM:
[Empty Warehouse]
|
v
(start_engine) --> [engine: 'ON', fuel: 100]
"""
from typing import Any, Dict
from wpipe import Pipeline
def start_engine(data: Dict[str, Any]) -> Dict[str, Any]:
"""Start the car engine and initialize fuel levels.
Args:
data (Dict[str, Any]): The current pipeline context data.
Returns:
Dict[str, Any]: Updated context with engine status and fuel.
"""
print(f"🔑 Turning key: Engine started. Input data: {data}")
return {"engine": "ON", "fuel": 100}
if __name__ == "__main__":
pipeline = Pipeline(pipeline_name="Trip_L1", verbose=True)
pipeline.set_steps([start_engine])
pipeline.run({})
Resultado de Ejecución
🔑 Turning key: Engine started. Input data: {‘_pipeline_start_time’: ‘2026-04-30T13:36:30.012986’, ‘progress_rich’: <rich.progress.Progress object at 0x73979271f0e0>} Trip_L1 ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━ 100% 0:00:00