Nivel 33: demo_level33.py
Este es el nivel 33 del tour de aprendizaje.
Código Fuente
"""
DEMO LEVEL 33: 360° Vision Fusion (Deltas)
------------------------------------------
Adds: Process parallelism that adds data without colliding.
Accumulates: Multiprocess (L13) and Delta Merging (Library).
DIAGRAM:
Parallel(PROCESSES)
[CPU 1] -> (AI_Signals) -> Adds 'signals'
[CPU 2] -> (AI_Objects) -> Adds 'objects'
|
v
[Final Context] -> Possesses BOTH results (Intelligent fusion).
"""
from typing import Any, Dict, List
from wpipe import Parallel, Pipeline, step
@step(name="ai_signals")
def ai_signals(data: Any) -> Dict[str, List[str]]:
"""AI Traffic signal analysis step.
Args:
data: Input data for the step.
Returns:
Dict[str, List[str]]: Detected traffic signals.
"""
print("🛑 CPU-1: Analyzing traffic signals...")
return {"signals": ["Stop", "60km/h"]}
@step(name="ai_objects")
def ai_objects(data: Any) -> Dict[str, List[str]]:
"""AI Pedestrian and obstacle analysis step.
Args:
data: Input data for the step.
Returns:
Dict[str, List[str]]: Detected objects.
"""
print("🚶 CPU-2: Analyzing pedestrians and obstacles...")
return {"objects": ["Pedestrian crossing"]}
if __name__ == "__main__":
pipe = Pipeline(pipeline_name="vision_360_l33", verbose=True)
pipe.set_steps(
[Parallel(steps=[ai_signals, ai_objects], max_workers=2, use_processes=True)]
)
final_results = pipe.run({})
print(f"\n📊 FULL 360 MAP: {list(final_results.keys())}")
print(f" Detections: {final_results.get('signals')} + {final_results.get('objects')}")
Resultado de Ejecución
[PARALLEL] Executing 2 steps using PROCESSES (workers=2) vision_360_l33 0% -:–:–
- 📊 FULL 360 MAP: [‘_pipeline_start_time’, ‘error’]
Detections: None + None