Nivel 38: demo_level38.py
Este es el nivel 38 del tour de aprendizaje.
Código Fuente
"""
DEMO LEVEL 38: Hybrid Resource Management
------------------------------------------
Adds: Combination of threads and processes for maximum efficiency.
Accumulates: Total Parallelism (L25).
DIAGRAM:
[Threads] -> (Air Sensors, Temperature, Humidity) -> Light
[Processes] -> (4K Object Recognition AI) -> Heavy
"""
import time
from typing import Any, Dict
from wpipe import Pipeline, step, Parallel
@step(name="heavy_ai_4k")
def heavy_ai_4k(data: Any) -> Dict[str, str]:
"""Heavy 4K video analysis step using multiprocess power.
Args:
data: Input data for the step.
Returns:
Dict[str, str]: Video analysis status.
"""
time.sleep(0.3)
return {"video_analyzed": "OK"}
@step(name="light_air_sensor")
def light_air_sensor(data: Any) -> Dict[str, str]:
"""Light air quality sensor step using threads.
Args:
data: Input data for the step.
Returns:
Dict[str, str]: Air quality status.
"""
return {"air_quality": "Excellent"}
if __name__ == "__main__":
pipe = Pipeline(pipeline_name="hybrid_power_l38", verbose=True)
pipe.set_steps(
[
# 1. Use processes for heavy video load
Parallel(steps=[heavy_ai_4k] * 2, use_processes=True, max_workers=2),
# 2. Use threads for sensors that consume minimal CPU
Parallel(steps=[light_air_sensor] * 4, use_processes=False, max_workers=4),
]
)
print(">>> Optimizing hardware: The car uses threads and processes according to the task.")
pipe.run({})
Resultado de Ejecución
>>> Optimizing hardware: The car uses threads and processes according to the task.
[PARALLEL] Executing 2 steps using PROCESSES (workers=2)
hybrid_power_l38 0% -:--:--