Nivel 18: demo_level18.py
Este es el nivel 18 del tour de aprendizaje.
Código Fuente
"""
DEMO LEVEL 18: The Route Book (Exporting)
-----------------------------------------
Adds: PipelineExporter to save history to disk.
Accumulates: Data tracking (L14).
DIAGRAM:
(Trip Finished) -> [Tracking Database]
|
v
(PipelineExporter) -> [trip_logs.csv]
"""
import os
from typing import Any, Dict
from wpipe import Pipeline, PipelineExporter, step
@step(name="drive_section_a")
def drive_section_a(data: Any) -> Dict[str, Any]:
"""Driving section A step.
Args:
data: Input data for the step.
Returns:
Dict[str, Any]: Section details.
"""
print("🚗 Driving through Section A...")
return {"section": "A", "duration": 15}
if __name__ == "__main__":
db_path = "output/trip_history.db"
os.makedirs("output", exist_ok=True)
pipe = Pipeline(
pipeline_name="trip_l18_exporter", tracking_db=db_path, verbose=True
)
pipe.set_steps([drive_section_a])
pipe.run({})
# NEW IN L18: Exporting the trip to a human-readable format
print("\n>>> Generating CSV report for the insurance company...")
exporter = PipelineExporter(db_path)
exporter.export_pipeline_logs("output/trip_report.csv", export_format="csv")
print("✅ Report saved in output/trip_report.csv")
Resultado de Ejecución
[PIPELINE STATUS] Registered: PIPE-56977695 🚗 Driving through Section A… trip_l18_exporter ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━ 100% 0:00:00 [PIPELINE STATUS] PIPE-56977695: COMPLETED
>>> Generating CSV report for the insurance company... ✅ Report saved in output/trip_report.csv