Nivel 51: demo_level51.py
Este es el nivel 51 del tour de aprendizaje.
Código Fuente
import os
from datetime import datetime
from dto.car import Car
from demo_level50 import get_trip_pipeline
from wpipe import CheckpointManager
# 1. Configuración de fiabilidad
db_path = "output/wpipe_dashboard.db"
trip = get_trip_pipeline(db_path)
trip.verbose = True
chk = CheckpointManager("checkpoints.db")
# ID estable para la demostración
ID_VIAJE = "vacaciones_lts_2026"
print(f"\n" + "="*60)
print(f"🚀 PIPELINE RESUMIBLE: {ID_VIAJE}")
print("="*60 + "\n")
# 2. Datos iniciales
car = Car(make="Toyota", model="Corolla").__dict__
try:
if not chk.can_resume(ID_VIAJE):
print("⊘ No se encontró punto de control previo. Startsndo trip desde el garaje...")
print("\n[!] PASO 1: Ejecución inicial con caída simulada.")
print("-" * 60)
# Inyectamos una excepción que ocurrirá DESPUÉS del primer paso exitoso (preparation_phase)
# pero antes de completar el bucle de viajes.
# Para esta demo, simplemente lanzamos el error manualmente tras una ejecución parcial.
# Ejecutamos la pipeline con el gestor de checkpoints activo
trip.run(car, checkpoint_mgr=chk, checkpoint_id=ID_VIAJE)
# Si por algún motivo termina sin errores, forzamos la caída para la demo
if not os.path.exists("checkpoints.db"):
print("ℹ Nota: No se generaron checkpoints. Asegúrate de que el trip sea lo suficientemente largo.")
raise RuntimeError("🔌 FALLO ELÉCTRICO CRÍTICO: El sistema se ha apagado inesperadamente.")
else:
print("⟲ ¡SISTEMA RECUPERADO! Detectado punto de control anterior.")
last = chk.get_last_checkpoint(ID_VIAJE)
print(f" 📍 Último hito guardado: {last['step_name']}")
print(f" 🔢 Índice del paso: {last['step_order']}")
print(f" 🕒 Fecha del guardado: {last['created_at']}")
print(f" 📦 Datos recuperados: {len(last['data'])} llaves en bodega")
print("\n>>> PASO 2: Reanudando trip automáticamente desde el punto exacto...")
print("-" * 60)
# Al pasar el mismo ID_VIAJE y el gestor, la pipeline saltará los pasos ya hechos
res = trip.run(car, checkpoint_mgr=chk, checkpoint_id=ID_VIAJE)
print("\n" + "✓" * 60)
print("🏁 ¡VIAJE COMPLETADO CON ÉXITO TRAS LA REANUDACIÓN!")
print("✓" * 60)
# Limpiamos para que se pueda volver a probar la demo desde cero
chk.clear_checkpoints(ID_VIAJE)
if os.path.exists("checkpoints.db"):
os.remove("checkpoints.db")
except Exception as e:
print(f"\n" + "!" * 60)
print(f"✘ SISTEMA CAÍDO: {e}")
print("!" * 60)
print("\n>>> INSTRUCCIONES: Ejecuta este script una vez más para ver cómo WPipe")
print(">>> reanuda el trip saltándose la fase de preparación.")
Resultado de Ejecución
⊘ No se encontró punto de control previo. Startsndo trip desde el garaje…
[PIPELINE STATUS] Registered: PIPE-E4419507
[CHECKPOINT REACHED] trip_start >>> [CHECKPOINT] Trip start — New trip —_loop_iteration [PARALLEL] Executing 3 steps using THREADS (workers=3)
Checking front and rear lights… OK
[CONDITION] Evaluating: tire_level == ‘Low’ [CONDITION] Evaluating: tire_level == ‘Low’
[ERROR CAPTURE] Processing error in state ‘random_flat_tire’…
📍 FAILED STATE: random_flat_tire 📄 FILE: /home/william.rodriguez/Documents/wpipe/examples/00_honey_pot/03_yield/demo_level50.py 🔢 LINE: 72 ⚠️ MESSAGE: Random puncture 🔄 ATTEMPT: 1 🕒 TIMESTAMP: 2026-04-30T13:36:49.655922 ———————————————————— [RETRY] random_flat_tire failed (attempt 1): Random puncture [non_serializable_obj]: None non_serializable_objv1.0 — New trip —_loop_iteration [PARALLEL] Executing 3 steps using THREADS (workers=3)
Checking front and rear lights… OK
[non_serializable_obj]: None non_serializable_objv1.0 — New trip —_loop_iteration [PARALLEL] Executing 3 steps using THREADS (workers=3)
Checking front and rear lights… OK
[non_serializable_obj]: None non_serializable_objv1.0 trip ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━ 100% 0:00:00
[HOOKS] Executing post-run tasks… >>> [HOOK] Trip finished sending final summary… [PIPELINE STATUS] PIPE-E4419507: COMPLETED
>>> INSTRUCCIONES: Ejecuta este script una vez más para ver cómo WPipe >>> reanuda el trip saltándose la fase de preparación.