Nivel 37: demo_level37.py
Este es el nivel 37 del tour de aprendizaje.
Código Fuente
"""
DEMO LEVEL 37: Shutdown Protocols (Hooks)
-----------------------------------------
Adds: Events with associated steps for post-pipeline execution.
Accumulates: Error Management (L11).
DIAGRAM:
(Trip Execution) -- [Success or Failure] --> (Trigger Hooks)
                                                    |
                                                    v
                                           (send_trip_summary) -> (shutdown_ai_systems)
"""
from typing import Any, Dict
from wpipe import Pipeline, step
@step(name="drive_final_stretch")
def drive_final_stretch(data: Any) -> Dict[str, str]:
    """Announce the last leg of the trip and report the arrival state.

    Args:
        data: Input passed to the step; this function does not read it.

    Returns:
        Dict[str, str]: A single-entry mapping, ``{"arrival": "Parking"}``.
    """
    arrival_status = {"arrival": "Parking"}
    print("🚗 Driving the last few meters...")
    return arrival_status
@step(name="shutdown_protocol")
def shutdown_protocol(data: Any) -> Dict[str, bool]:
    """Print the safety-hook shutdown notice and flag the systems as asleep.

    In this demo the step only emits a message and returns a status flag;
    no actual resources are touched here.

    Args:
        data: Input passed to the step; this function does not read it.

    Returns:
        Dict[str, bool]: A single-entry mapping, ``{"systems_asleep": True}``.
    """
    shutdown_status = {"systems_asleep": True}
    print("🧹 SAFETY HOOK: Deactivating cameras and clearing temporary memory...")
    return shutdown_status
if __name__ == "__main__":
    trip_pipeline = Pipeline(pipeline_name="safety_hooks_l37", verbose=True)

    # NEW IN L37: register a "cleanup" event so that shutdown_protocol
    # runs AT THE END, after the regular steps of the pipeline.
    cleanup_steps = [shutdown_protocol]
    trip_pipeline.add_event(
        event_type="cleanup",
        event_name="End of Journey",
        steps=cleanup_steps,
    )

    # The main body of the trip consists of a single driving step.
    main_steps = [drive_final_stretch]
    trip_pipeline.set_steps(main_steps)

    print(">>> Starting trip. Safety protocols will trigger upon completion.")
    # The demo starts from an empty input payload.
    initial_data = {}
    trip_pipeline.run(initial_data)
Resultado de Ejecución
>>> Starting trip. Safety protocols will trigger upon completion.
🚗 Driving the last few meters...
safety_hooks_l37 ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━ 100% 0:00:00
[HOOKS] Executing post-run tasks…
🧹 SAFETY HOOK: Deactivating cameras and clearing temporary memory...