Nivel 15: demo_level15.py

Este es el nivel 15 del tour de aprendizaje.

Código Fuente

"""
DEMO LEVEL 15: Resumption after Breakdown (Resume)
--------------------------------------------------
Adds: Automatic recovery from the last checkpoint.
Accumulates: Persistence (L14).

DIAGRAM:
[Breakdown detected?] -> [Load last Checkpoint]
      |
      v
(Skip steps already done) -> (Continue from point of failure)
"""

import random
from typing import Any, Dict

from wpipe import CheckpointManager, Pipeline, step

@step(name="preparation_phase")
def preparation_phase(data: Any) -> Dict[str, str]:
    """Preparation phase step.

    Args:
        data: Input data for the step.

    Returns:
        Dict[str, str]: Current location.
    """
    print("🏠 Leaving home (Step already done in the past)...")
    return {"location": "road"}

@step(name="critical_phase")
def critical_phase(data: Any) -> Dict[str, str]:
    """Critical phase step with potential failure.

    Args:
        data: Input data for the step.

    Returns:
        Dict[str, str]: Trip status.

    Raises:
        RuntimeError: If a breakdown occurs.
    """
    if random.random() < 0.7:
        print("💥 ELECTRICAL BREAKDOWN! The system is shutting down.")
        raise RuntimeError("Battery failure")
    print("🏁 Arrival at final destination.")
    return {"status": "Arrived"}

if __name__ == "__main__":
    ck_mgr = CheckpointManager("output/trip_emergency.db")
    session = "return_trip"

    pipe = Pipeline(pipeline_name="trip_l15_recovery", verbose=True)
    pipe.set_steps([preparation_phase, critical_phase])

    print(f">>> Can we resume previous trip? {ck_mgr.can_resume(session)}")
    try:
        pipe.run({}, checkpoint_mgr=ck_mgr, checkpoint_id=session)
    except RuntimeError:
        print("\n[!] The car has stopped. Run again to resume.")

Resultado de Ejecución


>>> Can we resume previous trip? False
🏠 Leaving home (Step already done in the past)...
💥 ELECTRICAL BREAKDOWN! The system is shutting down.
trip_l15_recovery ━━━━━━━━━━━━━━━━━━━━                      50% -:--:--
Traceback (most recent call last):
  File "/home/william.rodriguez/miniconda3/lib/python3.13/site-packages/wpipe/pipe/pipe.py", line 708, in _task_invoke
    result = _run()
  File "/home/william.rodriguez/miniconda3/lib/python3.13/site-packages/wpipe/pipe/pipe.py", line 701, in _run
    return func(*args, **kwargs)
  File "/home/william.rodriguez/miniconda3/lib/python3.13/site-packages/wpipe/decorators/step.py", line 209, in wrapper
    return func(*args, **kwargs)
  File "/home/william.rodriguez/Documents/wpipe/examples/00_honey_pot/03_yield/demo_level15.py", line 47, in critical_phase
    raise RuntimeError("Battery failure")
RuntimeError: Battery failure

The above exception was the direct cause of the following exception:

Traceback (most recent call last):
File “/home/william.rodriguez/miniconda3/lib/python3.13/site-packages/wpipe/pipe/pipe.py”, line 459, in _pipeline_run_with_report

result = self._pipeline_run(*args, **kwargs)

File “/home/william.rodriguez/miniconda3/lib/python3.13/site-packages/wpipe/pipe/pipe.py”, line 1290, in _pipeline_run
data, error_msg, error_step = self._execute_all_pipeline_steps(

~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~^

File “/home/william.rodriguez/miniconda3/lib/python3.13/site-packages/wpipe/pipe/pipe.py”, line 1200, in _execute_all_pipeline_steps

data = self._execute_step(item, data, **step_kwargs)

File “/home/william.rodriguez/miniconda3/lib/python3.13/site-packages/wpipe/pipe/pipe.py”, line 801, in _execute_step
return self._execute_task_step(item, data, parent_step_id, parallel_group, **kwargs)

~~~~~~~~~~~~~~~~~~~~~~~^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^

File “/home/william.rodriguez/miniconda3/lib/python3.13/site-packages/wpipe/pipe/pipe.py”, line 967, in _execute_task_step

result_data = self._task_invoke(func, name, data, __step_meta__=step_meta, **kwargs)

File “/home/william.rodriguez/miniconda3/lib/python3.13/site-packages/wpipe/pipe/pipe.py”, line 747, in _task_invoke

raise TaskError(str(e), Codes.TASK_FAILED) from e

wpipe.exception.api_error.TaskError: [Error Code: 502] Battery failure

The above exception was the direct cause of the following exception:

Traceback (most recent call last):
File “/home/william.rodriguez/Documents/wpipe/examples/00_honey_pot/03_yield/demo_level15.py”, line 60, in <module>

pipe.run({}, checkpoint_mgr=ck_mgr, checkpoint_id=session) ~~~~~~~~^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^

File “/home/william.rodriguez/miniconda3/lib/python3.13/site-packages/wpipe/pipe/pipe.py”, line 1327, in run

result = self._pipeline_run_with_report(*args, **kwargs)

File “/home/william.rodriguez/miniconda3/lib/python3.13/site-packages/wpipe/pipe/pipe.py”, line 477, in _pipeline_run_with_report

raise ProcessError(str(te), Codes.TASK_FAILED) from te

wpipe.exception.api_error.ProcessError: [Error Code: 502] Battery failure