Nivel 109: demo_level109.py
Este es el nivel 109 del tour de aprendizaje.
Código Fuente
"""
DEMO LEVEL 109: Wsqlite Pipeline Integration
-------------------------------------
Adds: Wsqlite con tracking de Pipeline.
Continues: L108.
DIAGRAM:
Pipeline(tracking_db) + Wsqlite
"""
from wpipe import Pipeline, step
from wpipe.sqlite import Wsqlite
@step(name="proceso")
def proceso(data: dict) -> None:
"""Proceso step.
Args:
data: Input data for the step.
Returns:
dict: Result of the step.
"""
return {"ok": True}
if __name__ == "__main__":
print(">>> Wsqlite + Pipeline integration...")
db = "output/pipe109.db"
pipe = Pipeline(pipeline_name="viaje_l109", verbose=True, tracking_db=db)
pipe.set_steps([proceso])
pipe.run({})
with Wsqlite(db_name=db) as wdb:
wdb.details = {"pipeline": "L109", "status": "completado"}
print("✅ Datos guardados en pipeline DB")
Resultado de Ejecución
>>> Wsqlite + Pipeline integration...
[PIPELINE STATUS] Registered: PIPE-3E279443
viaje_l109 ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━ 100% 0:00:00
[PIPELINE STATUS] PIPE-3E279443: COMPLETED
✅ Datos guardados en pipeline DB