Nivel 23: demo_level23.py
Este es el nivel 23 del tour de aprendizaje.
Código Fuente
"""
DEMO LEVEL 23: Compartmentalization (Data Filtering)
----------------------------------------------------
Adds: Passing sub-contexts to protect sensitive data.
Accumulates: Modularization (L19).
DIAGRAM:
[Global Context: engine_id, position, image, speed]
|
v
(Ads_AI) <- Only receives 'position'! (Privacy)
"""
from typing import Any, Dict
from wpipe import Pipeline, step
@step(name="full_telemetry")
def full_telemetry(data: Any) -> Dict[str, Any]:
"""Full telemetry gathering step.
Args:
data: Input data.
Returns:
Dict[str, Any]: Engine ID, position, and speed.
"""
return {"engine_id": "X-100", "position": "Gran Via", "speed": 50}
# NEW IN L23: A step that only sees what we filter for it
@step(name="suggest_restaurants")
def suggest_restaurants(data: Dict[str, Any]) -> Dict[str, str]:
"""Restaurant suggestion step based on filtered position.
Args:
data: Filtered input data.
Returns:
Dict[str, str]: Suggestion details.
"""
# Verify we cannot see 'engine_id'
id_visible = "engine_id" in data
print(
f"📍 Suggestion in {data.get('position')}: Engine ID visible? {id_visible}"
)
return {"suggestion": "VIPS 200m away"}
if __name__ == "__main__":
pipe = Pipeline(pipeline_name="trip_l23_privacy", verbose=True)
pipe.set_steps(
[
full_telemetry,
# Filter the context using an intermediate lambda
(lambda d: {"position": d["position"]}, "PrivacyFilter", "v1.0"),
suggest_restaurants,
]
)
pipe.run({})
Resultado de Ejecución
📍 Suggestion in Gran Via: Engine ID visible? True trip_l23_privacy ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━ 100% 0:00:00