Nivel 58: demo_level58.py

Este es el nivel 58 del tour de aprendizaje.

Código Fuente

"""
DEMO LEVEL 58: Retry con Excepciones Específicas
------------------------------------------------
Adds: Control granular de reintentos por tipo de excepción.
Continues: Retry de L22.

DIAGRAM:
(operacion_inestable)
      |
      +-- NetworkError   --> [retry 3 veces]
      +-- ValueError  --> [NO reintentar - failure inmediatamente]
      +-- Success    --> [continúa]
"""

import random

from wpipe import Pipeline, step

class NetworkError(Exception):
    pass

@step(
    name="operacion_inestable",
    retry_count=3,
    retry_delay=0.1,
    retry_on_exceptions=(NetworkError,),
)
def operacion_inestable(data: dict) -> None:

    """Operacion inestable step.

    Args:

        data: Input data for the step.

    Returns:

        dict: Result of the step.

    """
    val = random.random()
    if val < 0.3:
        raise NetworkError("Conexión perdue")
    elif val < 0.5:
        raise ValueError("Error de valor inválido")
    print("✅ Operación completada exitosamente")
    return {"status": "ok"}

@step(name="verificar_sistema")
def verificar_sistema(data: dict) -> None:

    """Verificar sistema step.

    Args:

        data: Input data for the step.

    Returns:

        dict: Result of the step.

    """
    print("✅ Sistema verificado")
    return {"verificado": True}

if __name__ == "__main__":
    pipe = Pipeline(pipeline_name="viaje_l58_selectiveretry", verbose=True)
    pipe.set_steps([operacion_inestable, verificar_sistema])
    print("\n>>> Probando retry selectivo...\n")
    try:
        pipe.run({})
    except Exception as e:
        print(f"Error esperado: {e}")

Resultado de Ejecución


>>> Probando retry selectivo...

✅ Operación completada exitosamente ✅ Sistema verificado viaje_l58_selectiveretry ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━ 100% 0:00:00