Nivel 40: demo_level40.py

Este es el nivel 40 del tour de aprendizaje.

Código Fuente

"""
DEMO LEVEL 40: THE GRAND FINALE (Total Autonomy)
------------------------------------------------
Summary: Integration of the 40 functionalities in a real trip.
Accumulates: ABSOLUTELY EVERYTHING.

FINAL DIAGRAM:
[Start] -> (Load YAML) -> (Checkpoints) -> For(Route) {
   Parallel(Multiprocess AI) -> Condition(Obstacle) -> Pydantic(Validate) -> Metric(Fuel)
} -> [Destination] -> (Shutdown Hooks) -> (Export CSV)
"""

import os
import random
from typing import Any, Dict

from pydantic import BaseModel, Field

from wpipe import (
    CheckpointManager,
    Condition,
    For,
    Metric,
    Parallel,
    Pipeline,
    PipelineExporter,
    step,
    to_obj,
)

# 1. Secure Data Definition
class CarStatus(BaseModel):
    """Pydantic model for vehicle status validation."""
    fuel: float = Field(..., ge=0, le=100)
    speed: float = Field(..., ge=0, le=200)

# 2. Vision Intelligence
@step(name="ai_vision_360")
def ai_vision_360(data: Any) -> Dict[str, Any]:
    """AI Vision 360 processing step.

    Args:
        data: Input data for the step.

    Returns:
        Dict[str, Any]: Detection status and distance.
    """
    danger = random.random() < 0.2
    return {"obstacle": danger, "distance": random.randint(2, 50)}

# 3. Automatic Response
@step(name="emergency_braking")
def emergency_braking(data: Any) -> Dict[str, bool]:
    """Emergency braking step.

    Args:
        data: Input data for the step.

    Returns:
        Dict[str, bool]: Braking status.
    """
    print("🚨 ADAS: EMERGENCY BRAKING ACTIVATED!")
    return {"braking": True}

# Validation and Metrics
@to_obj(CarStatus)
def validate_telemetry(ctx: CarStatus) -> Dict[str, bool]:
    """Validates telemetry and records speed metric.

    Args:
        ctx: Validated CarStatus context.

    Returns:
        Dict[str, bool]: Validation status.
    """
    Metric.record("trip_speed", ctx.speed, "km/h")
    return {"v_ok": True}

if __name__ == "__main__":
    # Infrastructure preparation
    os.makedirs("output", exist_ok=True)
    TRACKING_DB = "output/grand_trip_2026.db"
    CK_MGR = CheckpointManager("output/trip_control_points.db")

    # THE SUPER PIPELINE: Configured with the full arsenal
    trip = Pipeline(
        pipeline_name="total_autonomous_trip",
        tracking_db=TRACKING_DB,
        collect_system_metrics=True,
        continue_on_error=True,
        verbose=True,
    )

    # Master flow definition
    trip.set_steps(
        [
            # Start and Preparation
            (lambda d: {"fuel": 100, "speed": 120}, "initial_start", "v1.0"),
            # Main Driving Loop
            For(
                iterations=3,
                steps=[
                    # Look in parallel using multiple cores (Multiprocess)
                    Parallel(steps=[ai_vision_360] * 3, use_processes=True, max_workers=3),
                    # Logical decision
                    Condition(expression="obstacle == True", branch_true=[emergency_braking]),
                    validate_telemetry,
                ],
            ),
            (
                lambda d: print("🏁 DESTINATION REACHED: The car has arrived on its own."),
                "finish",
                "v1.0",
            ),
        ]
    )

    # Event and Hook Registration
    trip.add_event(
        event_type="log", event_name="Departure", message="Starting Madrid-Valencia route"
    )
    trip.add_event(
        event_type="hook",
        event_name="Final Protocol",
        steps=[(lambda d: print("🔌 Systems disconnected."), "shutdown", "v1.0")],
    )

    # MASTER EXECUTION
    print("\n🚀 STARTING THE GRAND TRIP (Integration of 40 levels)...\n")
    trip.run({}, checkpoint_mgr=CK_MGR, checkpoint_id="final_autonomous_trip")

    # Exporting the final report for the owner
    PipelineExporter(TRACKING_DB).export_pipeline_logs(
        "output/owner_report.csv", export_format="csv"
    )
    print("\n✅ LEARNING TOUR COMPLETED. 40 LEVELS OF WPIPE MASTERY.")

Resultado de Ejecución


🚀 STARTING THE GRAND TRIP (Integration of 40 levels)…

[PIPELINE STATUS] Registered: PIPE-4D2FBA2B [PARALLEL] Executing 3 steps using PROCESSES (workers=3)

[ERROR] Loop broken at iteration 0 due to: Can’t pickle <function <lambda> at 0x7ce967c96020>: attribute lookup <lambda> on __main__ failed | Can’t pickle <function <lambda> at 0x7ce967c96020>: attribute lookup <lambda> on __main__ failed | Can’t pickle <function <lambda> at 0x7ce967c96020>: attribute lookup <lambda> on __main__ failed

🏁 DESTINATION REACHED: The car has arrived on its own. total_autonomous_trip ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━ 100% 0:00:00

[HOOKS] Executing post-run tasks… 🔌 Systems disconnected. [PIPELINE STATUS] PIPE-4D2FBA2B: COMPLETED

✅ LEARNING TOUR COMPLETED. 40 LEVELS OF WPIPE MASTERY.