Nivel 84: demo_level84.py

Este es el nivel 84 del tour de aprendizaje.

Código Fuente

"""
DEMO LEVEL 84: Export Multiple Formats
------------------------------------
Adds: Export to multiple formats.
Continues: L83.

DIAGRAM:
Pipeline --> JSON + CSV + STATS
"""

import os

from pathlib import Path

from wpipe import Pipeline, PipelineExporter, step

@step(name="start")
def start(data: dict) -> dict:
    """Print the start message and return the motor state.

    Args:
        data: Input data for the step. This step does not read it.

    Returns:
        dict: ``{"motor": "on"}``.
    """
    print("🔑 Motor iniciado")
    return {"motor": "on"}

@step(name="process")
def process(data: dict) -> dict:
    """Print the processing message and return a completion flag.

    Args:
        data: Input data for the step. This step does not read it.

    Returns:
        dict: ``{"procesado": True}``.
    """
    print("📊 Procesando...")
    return {"procesado": True}

if __name__ == "__main__":
    # Run the two-step pipeline with tracking enabled, then export the
    # tracked data to JSON, CSV and a statistics file under ``output/``.
    os.makedirs("output", exist_ok=True)

    db = "output/export_multi.db"
    pipe = Pipeline(pipeline_name="viaje_l84_exportmulti", verbose=True, tracking_db=db)
    pipe.set_steps([start, process])
    pipe.run({})

    print("\n📤 Exportando en múltiples formatos...")
    # The exporter is pointed at the same database path the pipeline was
    # given as ``tracking_db``.
    exporter = PipelineExporter(db)

    # Each export is written only when the exporter returned something truthy.
    # The encoding is pinned to UTF-8 so the files do not depend on the
    # platform's locale encoding.
    json_data = exporter.export_pipeline_logs(export_format="json")
    if json_data:
        Path("output/viaje84.json").write_text(json_data, encoding="utf-8")
        print("✅ JSON exportado")

    csv_data = exporter.export_pipeline_logs(export_format="csv")
    if csv_data:
        Path("output/viaje84.csv").write_text(csv_data, encoding="utf-8")
        print("✅ CSV exportado")

    stats = exporter.export_statistics(export_format="json")
    if stats:
        Path("output/viaje84_stats.json").write_text(stats, encoding="utf-8")
        print("✅ Statistics exportado")

Resultado de Ejecución


[PIPELINE STATUS] Registered: PIPE-3B19A26C
🔑 Motor iniciado
📊 Procesando...
viaje_l84_exportmulti ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━ 100% 0:00:00
[PIPELINE STATUS] PIPE-3B19A26C: COMPLETED

📤 Exportando en múltiples formatos...
✅ JSON exportado
✅ CSV exportado
✅ Statistics exportado