Nivel 83: demo_level83.py

Este es el nivel 83 del tour de aprendizaje.

Código Fuente

"""
DEMO LEVEL 83: Export Statistics
---------------------------------
Adds: Exportar estadísticas.
Continues: L82.

DIAGRAM:
Pipeline --> (export_statistics) --> stats.json
"""

import os

from pathlib import Path

from wpipe import Pipeline, PipelineExporter, step

@step(name="start")
def start(data: dict) -> None:

    """Start step.

    Args:

        data: Input data for the step.

    Returns:

        dict: Result of the step.

    """
    return {"motor": "on"}

@step(name="process")
def process(data: dict) -> None:

    """Process step.

    Args:

        data: Input data for the step.

    Returns:

        dict: Result of the step.

    """
    return {"procesado": True}

if __name__ == "__main__":
    os.makedirs("output", exist_ok=True)

    pipe = Pipeline(
        pipeline_name="viaje_l83_exportstats",
        verbose=True,
        tracking_db="output/export_stats.db",
    )
    pipe.set_steps([start, process])
    pipe.run({})

    print("\n📊 Exportando estadísticas...")
    exporter = PipelineExporter("output/export_stats.db")
    stats = exporter.export_statistics(export_format="json")

    if stats:
        Path("output/viaje83_stats.json").write_text(stats)
        print("✅ Exportado a output/viaje83_stats.json")
    else:
        print("ℹ No hay estadísticas")

Resultado de Ejecución


[PIPELINE STATUS] Registered: PIPE-F12328AC viaje_l83_exportstats ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━ 100% 0:00:00 [PIPELINE STATUS] PIPE-F12328AC: COMPLETED

📊 Exportando estadísticas… ✅ Exportado a output/viaje83_stats.json