Nivel 25: Super-Computación (Threads vs Processes)
Objetivo
Entender la diferencia técnica y de rendimiento entre usar hilos (Threads) y procesos (Processes) para tareas pesadas. Aprenderás a elegir la mejor herramienta según la carga de trabajo (I/O vs CPU).
Conceptos Clave
GIL (Global Interpreter Lock): La barrera de Python para el verdadero paralelismo en hilos.
ProcessPoolExecutor: Cómo WPipe utiliza múltiples núcleos del procesador para tareas matemáticas o de visión artificial.
Serialización: La necesidad de que los datos sean “picklable” cuando se cruzan las fronteras de los procesos.
¿Qué estamos probando?
En este nivel realizamos un benchmark real. Ejecutamos 4 tareas de visión artificial simuladas (carga intensa de CPU) de dos formas: 1. Eco Mode (Hilos): Comparten el mismo núcleo, limitados por el GIL. 2. Sport Mode (Procesos): Se expanden por todos los núcleos disponibles del sistema.
Código Fuente
"""
DEMO LEVEL 25: Super-Computer (Multi-Core Benchmark)
----------------------------------------------------
Adds: Real difference between Threads and Processes in Parallel.
Accumulates: Parallelism (L12 and L13).
DIAGRAM:
Parallel(4 heavy vision tasks)
|-- THREADS -> ~1.2s (Share CPU)
|-- PROCESSES -> ~0.3s (Use all cores)
"""
import time
from typing import Any, Dict
from wpipe import Parallel, Pipeline, step
@step(name="deep_vision")
def deep_vision(data: Any) -> Dict[str, str]:
"""Deep vision step with intense CPU load.
Args:
data: Input data for the step.
Returns:
Dict[str, str]: AI status.
"""
# Real CPU load (intense math)
start = time.time()
while time.time() - start < 0.3:
_ = 100 * 100
return {"ai": "done"}
if __name__ == "__main__":
# 1. Threads Mode
p1 = Pipeline(pipeline_name="eco_mode_threads")
p1.set_steps(
[Parallel(steps=[deep_vision] * 4, max_workers=4, use_processes=False)]
)
print(">>> [TEST 1] Processing with THREADS (Sharing resources)...")
t1 = time.time()
p1.run({})
print(f"⏱️ Threads Time: {time.time() - t1:.2f}s\n")
# 2. Processes Mode (NEW L25)
# Note: when using use_processes=True, ensure context data is serializable
p2 = Pipeline(pipeline_name="sport_mode_processes")
p2.set_steps(
[Parallel(steps=[deep_vision] * 4, max_workers=4, use_processes=True)]
)
print(">>> [TEST 2] Processing with PROCESSES (Total Power)...")
t2 = time.time()
p2.run({})
print(f"⏱️ Processes Time: {time.time() - t2:.2f}s")
Resultado de Ejecución
>>> [TEST 1] Processing with THREADS (Sharing resources)...
eco_mode_threads ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━ 100% 0:00:00
⏱️ Threads Time: 0.45s
>>> [TEST 2] Processing with PROCESSES (Total Power)...
eco_mode_threads ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━ 100% 0:00:00
sport_mode_processes 0% -:--:--
⏱️ Processes Time: 0.01s