Nivel 81: demo_level81.py

Este es el nivel 81 del tour de aprendizaje.

Código Fuente

"""
DEMO LEVEL 81: Export a JSON
-------------------------------
Adds: Exportar resultados a JSON.
Continues: Tracking de L49.

DIAGRAM:
Pipeline --> (export) --> resultado.json
"""

import os

from pathlib import Path

from wpipe import Pipeline, PipelineExporter, step

def start(data):
    print("🔑 Motor iniciado")
    return {"motor": "on", "km": 100}

@step(name="finish")
def finish(data: dict) -> None:

    """Finish step.

    Args:

        data: Input data for the step.

    Returns:

        dict: Result of the step.

    """
    print("🏁 Viaje completado")
    return {"destino": "llegado"}

if __name__ == "__main__":
    os.makedirs("output", exist_ok=True)

    pipe = Pipeline(
        pipeline_name="viaje_l81_exportjson",
        verbose=True,
        tracking_db="output/export_test.db",
    )
    pipe.set_steps([start, finish])
    pipe.run({})

    print("\n📤 Exportando a JSON...")
    exporter = PipelineExporter("output/export_test.db")
    json_data = exporter.export_pipeline_logs(export_format="json")

    if json_data:
        Path("output/viaje81.json").write_text(json_data)
        print("✅ Exportado a output/viaje81.json")
    else:
        print("ℹ No hay datos para exportar")

Resultado de Ejecución


[PIPELINE STATUS] Registered: PIPE-888D3009 🔑 Motor iniciado 🏁 Viaje completado viaje_l81_exportjson ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━ 100% 0:00:00 [PIPELINE STATUS] PIPE-888D3009: COMPLETED

📤 Exportando a JSON… ✅ Exportado a output/viaje81.json