Nivel 31: demo_level31.py

Este es el nivel 31 del tour de aprendizaje.

Código Fuente

"""
DEMO LEVEL 31: Safety Scan (Validators)
---------------------------------------
Adds: Custom Pydantic validators for physical safety.
Accumulates: Pydantic Telemetry (L16).

DIAGRAM:
(read_pressure) -> [pressure: 1.0 bar]
      |
      v
[Validator] -- [Is < 1.5?] -- ERROR! --> (Alert: Flat tire)
"""

import random
from typing import Any, Dict

from pydantic import BaseModel, Field, validator

from wpipe import Pipeline, step, to_obj

class SafetyCheck(BaseModel):
    """Pydantic model for tire pressure safety check."""
    pressure: float = Field(..., ge=1.0, le=4.0)

    @validator("pressure")
    def low_pressure_alert(cls, v: float) -> float:
        """Alerts if tire pressure is low.

        Args:
            v: The pressure value.

        Returns:
            float: The validated pressure value.
        """
        if v < 2.0:
            print("⚠️ WARNING: Low pressure detected. Inflation recommended.")
        return v

@step(name="pressure_sensor")
def pressure_sensor(data: Any) -> Dict[str, float]:
    """Pressure sensor reading step.

    Args:
        data: Input data for the step.

    Returns:
        Dict[str, float]: Pressure reading.
    """
    # Simulate reading from a low-air tire
    random_pressure = round(random.randint(10, 30) / 10, 2)
    return {"pressure": random_pressure}

@step(name="verify_integrity")
@to_obj(SafetyCheck)
def verify_integrity(ctx: SafetyCheck) -> Dict[str, bool]:
    """Verifies physical integrity based on tire pressure.

    Args:
        ctx: Validated safety check context.

    Returns:
        Dict[str, bool]: Integrity status.
    """
    print(f"🛞  Tires: {ctx.pressure} bar. Physical integrity confirmed.")
    return {"tires_ok": True}

if __name__ == "__main__":
    pipe = Pipeline(pipeline_name="safety_scan_l31", verbose=True)
    pipe.set_steps([pressure_sensor, verify_integrity])
    pipe.run({})

Resultado de Ejecución


🛞 Tires: 2.8 bar. Physical integrity confirmed. safety_scan_l31 ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━ 100% 0:00:00 /home/william.rodriguez/Documents/wpipe/examples/00_honey_pot/03_yield/demo_level31.py:25: PydanticDeprecatedSince20: Pydantic V1 style @validator validators are deprecated. You should migrate to Pydantic V2 style @field_validator validators, see the migration guide for more details. Deprecated in Pydantic V2.0 to be removed in V3.0. See Pydantic V2 Migration Guide at https://errors.pydantic.dev/2.13/migration/

@validator(“pressure”)