Nivel 82: demo_level82.py

Este es el nivel 82 del tour de aprendizaje.

Código Fuente

"""
DEMO LEVEL 82: Export a CSV
-------------------------------
Adds: Exportar resultados a CSV.
Continues: L81.

DIAGRAM:
Pipeline --> (export) --> resultado.csv
"""

import os

from pathlib import Path

from wpipe import Pipeline, PipelineExporter, step

def start(data):
    print("🔑 Motor iniciado")
    return {"motor": "on"}

@step(name="finish")
def finish(data: dict) -> None:

    """Finish step.

    Args:

        data: Input data for the step.

    Returns:

        dict: Result of the step.

    """
    print("🏁 Viaje completado")
    return {"destino": "llegado"}

if __name__ == "__main__":
    os.makedirs("output", exist_ok=True)

    pipe = Pipeline(
        pipeline_name="viaje_l82_exportcsv",
        verbose=True,
        tracking_db="output/export_csv.db",
    )
    pipe.set_steps([start, finish])
    pipe.run({})

    print("\n📤 Exportando a CSV...")
    exporter = PipelineExporter("output/export_csv.db")
    csv_data = exporter.export_pipeline_logs(export_format="csv")

    if csv_data:
        Path("output/viaje82.csv").write_text(csv_data)
        print("✅ Exportado a output/viaje82.csv")
    else:
        print("ℹ No hay datos para exportar")

Resultado de Ejecución


[PIPELINE STATUS] Registered: PIPE-507953F8 🔑 Motor iniciado 🏁 Viaje completado viaje_l82_exportcsv ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━ 100% 0:00:00 [PIPELINE STATUS] PIPE-507953F8: COMPLETED

📤 Exportando a CSV… ✅ Exportado a output/viaje82.csv