Nivel 43: demo_level43.py

Este es el nivel 43 del tour de aprendizaje.

Código Fuente

"""
DEMO LEVEL 43: Async with Retry (Using Pipeline)
-----------------------------------------------
Adds: Retry in async pipeline.
Continues: L42.

DIAGRAM:
(connect_car) -[fail]-> retry --> [connected]
"""

import asyncio
import random
from typing import Any, Dict
from wpipe import PipelineAsync

async def connect_car(data: Any) -> Dict[str, bool]:
    """Connect car step asynchronously with potential failure.

    Args:
        data: Input data.

    Returns:
        Dict[str, bool]: Connection status.

    Raises:
        ConnectionError: If Bluetooth is unavailable.
    """
    if random.random() < 0.4:
        raise ConnectionError("Bluetooth unavailable")
    print("📱 [ASYNC] Car connected")
    return {"connected": True}

async def sync_data(data: Any) -> Dict[str, str]:
    """Synchronize data step asynchronously.

    Args:
        data: Input data.

    Returns:
        Dict[str, str]: Sync status.
    """
    print("🔄 [ASYNC] Data synchronized")
    return {"sync": "ok"}

if __name__ == "__main__":

    async def main() -> None:
        """Main async entry point."""
        pipe = PipelineAsync(
            pipeline_name="trip_l43_asyncretry",
            verbose=True,
            max_retries=3,
            retry_delay=0.1,
        )
        pipe.set_steps([connect_car, sync_data])
        print("\n>>> Testing async retry...\n")
        try:
            await pipe.run({})
        except ConnectionError as e:
            print(f"Error: {e}")

    asyncio.run(main())

Resultado de Ejecución


>>> Testing async retry...

📱 [ASYNC] Car connected 🔄 [ASYNC] Data synchronized