Nivel 29: demo_level29.py
Este es el nivel 29 del tour de aprendizaje.
Código Fuente
"""
DEMO LEVEL 29: Safety Save (Smart Checkpoints)
----------------------------------------------
Adds: Checkpoints that only fire under critical conditions.
Accumulates: Persistence (L14).
DIAGRAM:
(Trip) -> [Fuel < 15%?] -- YES --> (Save Rescue Point)
"""
import os
from typing import Any, Dict
from wpipe import CheckpointManager, Pipeline, step
@step(name="critical_consumption")
def critical_consumption(data: Any) -> Dict[str, int]:
    """Report a simulated, dangerously low fuel reading.

    Args:
        data: Incoming payload for the step; this step does not read it.

    Returns:
        Dict[str, int]: Mapping with the single key ``"fuel"`` holding the
        simulated fuel level (always 10).
    """
    # Hard-coded value standing in for a real sensor reading.
    level = 10
    result = {"fuel": level}
    print(f"⛽ Fuel Alert: {level}%")
    return result
if __name__ == "__main__":
    # The checkpoint database lives under ./output, so create the directory first.
    os.makedirs("output", exist_ok=True)

    checkpoint_manager = CheckpointManager("output/car_safety.db")
    pipeline = Pipeline(pipeline_name="safety_first_l29", verbose=True)

    # NEW IN L29: the "rescue_point" checkpoint is conditional, so the car
    # saves automatically only when fuel is low (expression "fuel < 15").
    pipeline.add_checkpoint(checkpoint_name="rescue_point", expression="fuel < 15")
    pipeline.set_steps([critical_consumption])

    # Start the trip with a full tank; the step reports the simulated drop.
    initial_state = {"fuel": 100}
    pipeline.run(
        initial_state,
        checkpoint_mgr=checkpoint_manager,
        checkpoint_id="night_trip",
    )
Resultado de Ejecución
⛽ Fuel Alert: 10%
[CHECKPOINT REACHED] rescue_point safety_first_l29 ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━ 100% 0:00:00