Nivel 52: demo_level52.py

Este es el nivel 52 del tour de aprendizaje.

Código Fuente

import os
import random
import json
import asyncio
from pathlib import Path
from dto.car import Car
from states.car_info_printer import CarInfoPrinter
from states.change_oil import change_oil
from states.drive import drive
from states.deflate_tires import deflate_tires
from states.preparation import preparation_phase
from states.refuel import refuel
from states.inflate_tires import inflate_tires
from states.print_fuel_level import print_fuel_level
from states.car_info_printer import nested_step

from wpipe import (
    Condition, 
    For, 
    Metric, 
    PipelineAsync, 
    Severity, 
    auto_dict_input, 
    PipelineExporter, 
    TaskTimer, 
    ResourceMonitor,
    step,
    Parallel,
    PipelineContext,
    object_to_dict
)

# Definimos el esquema de la Bodega (Contexto)
class ViajeContext(PipelineContext):
    marca: str
    modelo: str
    nivel_gasolina: str
    nivel_aceite: str
    nivel_neumaticos: str

# Definimos check_lights para que el bloque Parallel no falle por NameError
@step(name="check_lights", version="v1.0")
async def check_lights(d): 
    print("     * [ASYNC] Revisando lights traseras y delanteras... OK")
    await asyncio.sleep(0.01) # Simulación de trabajo I/O
    return d

@step(name="random_flat_tire", version="v1.0", retry_count=10, retry_delay=0)
async def random_flat_tire(d):
    if random.random() < 0.5:
        raise RuntimeError("Pinchazo aleatorio")
    return d

@step(name="notify_telegram_error", version="v1.0")
async def notify_telegram_error(context, error):
    print("\n" + "!" * 60)
    print("🚨 [ASYNC] ALERTA DE SISTEMA: ERROR DETECTADO")
    print("!" * 60)
    print(f"📍 ESTADO FALLIDO: {error['step_name']}")
    print(f"⚠️ MENSAJE: {error['error_message']}")
    print("-" * 60)
    return context

db_path = "wpipe_dashboard_async.db"

async def get_viaje_pipeline_async():
    trip = PipelineAsync(
        pipeline_name="viaje_async",
        verbose=True,
        tracking_db=db_path,
        max_retries=3,
        retry_delay=0,
        retry_on_exceptions=(RuntimeError,),
        collect_system_metrics=True,
        show_progress=True,
    )

    # Registro del capturador de errores
    trip.add_error_capture([notify_telegram_error])

    # Alerta de pipeline lento (>500ms)
    trip.tracker.add_alert_threshold(
        metric=Metric.PIPELINE_DURATION,
        expression=">500",
        severity=Severity.CRITICAL,
        steps=[CarInfoPrinter(">>> [ALERTA] Protocolo de rendimiento global activado")],
    )

    trip.add_event(
        event_type="notification",
        event_name="authorized_person",
        message="Results sent to external APIs",
        steps=[
            CarInfoPrinter(">>> [HOOK] El trip ha terminado, enviando resumen final..."),
        ],
    )

    trip.add_checkpoint(
        checkpoint_name="trip_start",
        expression="True",
        steps=[
            CarInfoPrinter(">>> [CHECKPOINT] Inicio del trip"),
        ],
    )

    trip.set_steps(
        [
            preparation_phase,
            For(
                iterations=3,
                steps=[
                    CarInfoPrinter(f"--- Nuevo trip asíncrono ---", "_loop_iteration"),
                    Parallel(
                        steps=[
                            refuel,
                            change_oil,
                            check_lights
                        ],
                        max_workers=3
                    ),
                    CarInfoPrinter("Resumen post-paralelo"),
                    (print_fuel_level, "Mostrar fuel", "v1.0"),
                    For(
                        validation_expression="nivel_gasolina != 'Vacío'",
                        steps=[
                            drive,
                            Condition(
                                expression="nivel_neumaticos == 'Bajo'",
                                branch_true=[
                                    nested_step,
                                    inflate_tires,
                                ],
                                branch_false=[deflate_tires],
                            ),
                            (print_fuel_level, "Mostrar fuel", "v1.0"),
                            random_flat_tire,
                        ],
                    ),
                ],
            ),
        ],
    )
    return trip

async def main():
    # Usamos ResourceMonitor para measure el consumption de hardware (RAM/CPU)
    with ResourceMonitor("Viaje_Completo_Async") as monitor:
        # Usamos TaskTimer para control de tiempos de ejecución
        with TaskTimer("viaje_pipeline_async", timeout_seconds=30) as timer:
            trip = await get_viaje_pipeline_async()

            @auto_dict_input
            async def run_pipeline(car_dict):
                return await trip.run(car_dict)

            car = Car(make="Toyota", model="Corolla")
            print(f"Carro inicial: {car.fuel_level}\n")
            results = await run_pipeline(car)

    # Resumen de recursos al terminar
    print(f"\nResource Summary (Async):")
    summary = monitor.get_summary()
    print(f"  - Peak RAM: {summary['peak_ram_mb']} MB")
    print(f"  - Avg CPU: {summary['avg_cpu_percent']}%")
    print(f"✓ Total time monitored: {timer.elapsed_seconds:.2f}s")

    print(f"\nViajes completados: {results.get('_loop_iteration')}")
    
    # --- ANÁLISIS DE DATOS ---
    analysis = trip.tracker.analysis
    stats = analysis.get_stats()
    print("\n" + "=" * 70)
    print("📊 ANÁLISIS DE RENDIMIENTO ASÍNCRONO")
    print("=" * 70)
    print(f"  - Total Ejecuciones: {stats['total_pipelines']}")
    print(f"  - Tasa de Éxito: {stats['success_rate']}%")

if __name__ == "__main__":
    asyncio.run(main())

Resultado de Ejecución


Carro inicial: Medium

[ASYNC CHECKPOINT REACHED] trip_start >>> [CHECKPOINT] Inicio del trip

[ASYNC STATUS] PIPE-D438CE14: COMPLETED

Resource Summary (Async):
  • Peak RAM: 51.2 MB

  • Avg CPU: 0.0%

✓ Total time monitored: 0.01s

Viajes completados: None

  • Total Ejecuciones: 2

  • Tasa de Éxito: 0.0%