Nivel 54: demo_level54.pyļ
Este es el nivel 54 del tour de aprendizaje.
Código Fuenteļ
from typing import Any, Dict
import os
import random
import time
import cv2
from genericpath import exists
from extras.wsqlite_test import test_Wsqlite
from wpipe import (
Condition,
For,
Metric,
Parallel,
Pipeline,
PipelineContext,
PipelineExporter,
ResourceMonitor,
Severity,
TaskTimer,
auto_dict_input,
object_to_dict,
step,
to_obj,
)
SIZE_CAPTURE = 150
def generator_capture():
image = cv2.imread("images.jpeg")
for i in range(SIZE_CAPTURE):
image_tmp = image.copy()
cv2.putText(
image_tmp, str(i), (10, 30), cv2.FONT_HERSHEY_SIMPLEX, 1, (0, 0, 255), 2
)
yield (i, image_tmp)
@step(name="start_capture", version="v1.0")
@to_obj
def start_capture(context: object):
cap = generator_capture()
return {
"cap": cap,
"video_size": SIZE_CAPTURE,
"process_complete": int(False),
}
@step(name="create_batch", version="v1.0")
class Create_batch:
def __init__(self, size=2):
self.size = size
@to_obj
def __call__(self, context: object):
print("Running Create_batch step...", flush=True)
batch = []
process_complete = False
for _ in range(self.size):
frame_id, frame = next(context.cap)
batch.append((frame_id, frame))
if frame_id >= context.video_size:
process_complete = True
return {"batch": batch, "process_complete": process_complete}
@step(name="notify_telegram_error", version="v1.0")
def notify_telegram_error(context: Any, error: Any) -> dict:
"""Notify telegram error step.
Args:
context: Input data for the step.
Returns:
dict: Result of the step.
"""
"""
Simula el envĆo de una notificación detallada a Telegram.
Recibe el contexto y los detalles tƩcnicos del error.
"""
print("\n" + "!" * 60)
print("šØ ALERTA DE SISTEMA: ERROR DETECTADO")
print("!" * 60)
print(f"š ESTADO FALLIDO: {error['step_name']}")
print(f"š ARCHIVO: {error['file_path']}")
print(f"š¢ LĆNEA: {error['line_number']}")
print(f"ā ļø MENSAJE: {error['error_message']}")
print("-" * 60)
# AquĆ podrĆas usar requests.post para enviar el mensaje real
return context
def random_mask() -> dict:
"""Notify telegram error step.
Args:
context: Input data for the step.
Returns:
dict: Result of the step.
"""
return [
# tupla de dos valores de 0 a 255, de valor aleatorio
(random.randint(0, 255), random.randint(0, 255))
for _ in range(random.randint(30, 80))
]
TEMPLATE = {
"class": "0",
"class_name": "demo",
"confidence": 0.2,
"mask": random_mask(),
}
CLASS_NAMES_A = [f"A_{i}" for i in range(30)]
CLASS_NAMES_B = [f"B_{i}" for i in range(30)]
CLASS_NAMES_C = [f"C_{i}" for i in range(30)]
@step(name="simulated_inference", version="v1.0")
class Simulated_yolo_inference:
_counter = 0
def __init__(self, ref_class_names, sub_name):
self.ref_class_names = ref_class_names
self.sub_name = sub_name
self._instance_id = Simulated_yolo_inference._counter
Simulated_yolo_inference._counter += 1
@to_obj
def __call__(self, context: object):
batch_ids = [id_ for id_, _ in context.batch]
batch_images = [image for _, image in context.batch]
# simulate if there are or there aren't predictions
SELECTION = 50
predictions = []
for frame_id, frame in zip(batch_ids, batch_images):
# simulated wait of predictions
time.sleep(0.001)
random_value = random.randint(0, 100)
there_predictions = True if random_value > SELECTION else False
if not there_predictions:
predictions.append((frame_id, None))
continue
template_tmp = TEMPLATE.copy()
template_tmp["class_name"] = random.choice(self.ref_class_names)
template_tmp["confidence"] = round(random.random(), 2)
template_tmp["mask"] = random_mask()
predictions.append((frame_id, template_tmp))
return {f"{self.sub_name}_predictions": predictions}
def put_text(frame, text, y=80):
cv2.putText(frame, text, (10, y), cv2.FONT_HERSHEY_SIMPLEX, 1, (0, 0, 255), 2)
return frame
@step(name="draw_ia", version="v1.0")
@to_obj
def draw_ia(context: object):
batch_ids = [id_ for id_, _ in context.batch]
batch_images = [image for _, image in context.batch]
a_predictions = context.A_predictions
b_predictions = context.B_predictions
c_predictions = context.C_predictions
print(f"[DEBUG draw_ia] batch_ids: {batch_ids}", flush=True)
print(f"[DEBUG draw_ia] batch has: {len(context.batch)} items", flush=True)
new_batch_images = []
for frame_id, frame, (A_id, A_predict), (B_id, B_predict), (C_id, C_predict) in zip(
batch_ids, batch_images, a_predictions, b_predictions, c_predictions
):
if frame_id != A_id != B_id != C_id:
raise Exception("Error: desncronizacion")
if A_predict is not None:
A_text = (
str(A_predict["class_name"]) + "-" + str(A_predict.get("confidence"))
)
else:
A_text = "-o-"
if B_predict is not None:
B_text = (
str(B_predict["class_name"]) + "-" + str(B_predict.get("confidence"))
)
else:
B_text = "-o-"
if C_predict is not None:
C_text = (
str(C_predict["class_name"]) + "-" + str(C_predict.get("confidence"))
)
else:
C_text = "-o-"
frame = put_text(frame, A_text, y=80)
frame = put_text(frame, B_text, y=120)
frame = put_text(frame, C_text, y=160)
new_batch_images.append((frame_id, frame))
return {"batch": new_batch_images}
@step(name="save_images", version="v1.0")
@to_obj
def save_images(context: object):
import sys
print("save_images called!", flush=True)
batch_ids = [id_ for id_, _ in context.batch]
batch_images = [image for _, image in context.batch]
print(f"[save_images] Saving {len(batch_ids)} images", flush=True)
path_to_save = "output/images"
for frame_id, frame in zip(batch_ids, batch_images):
cv2.imwrite(f"{path_to_save}/{frame_id}.jpg", frame)
return {"saved": True}
os.makedirs("output/images", exist_ok=True)
db_path = "output/wpipe_dashboard.db"
pipe = Pipeline(
pipeline_name="viaje_tmp",
verbose=False,
show_progress=False,
tracking_db=db_path,
)
pipe.add_error_capture([notify_telegram_error])
pipe.set_steps(
[
start_capture,
For(
iterations=150,
steps=[
Create_batch(2),
# option 2
Parallel(
steps=[
Simulated_yolo_inference(CLASS_NAMES_A, "A"),
Simulated_yolo_inference(CLASS_NAMES_B, "B"),
Simulated_yolo_inference(CLASS_NAMES_C, "C"),
],
max_workers=3,
use_processes=True,
),
draw_ia,
save_images,
],
),
],
)
pipe.run({})
Resultado de Ejecuciónļ
Running Create_batch step⦠[DEBUG draw_ia] batch_ids: [0, 1] [DEBUG draw_ia] batch has: 2 items save_images called! [save_images] Saving 2 images Running Create_batch step⦠[DEBUG draw_ia] batch_ids: [2, 3] [DEBUG draw_ia] batch has: 2 items save_images called! [save_images] Saving 2 images Running Create_batch step⦠[DEBUG draw_ia] batch_ids: [4, 5] [DEBUG draw_ia] batch has: 2 items save_images called! [save_images] Saving 2 images Running Create_batch step⦠[DEBUG draw_ia] batch_ids: [6, 7] [DEBUG draw_ia] batch has: 2 items save_images called! [save_images] Saving 2 images Running Create_batch step⦠[DEBUG draw_ia] batch_ids: [8, 9] [DEBUG draw_ia] batch has: 2 items save_images called! [save_images] Saving 2 images Running Create_batch step⦠[DEBUG draw_ia] batch_ids: [10, 11] [DEBUG draw_ia] batch has: 2 items save_images called! [save_images] Saving 2 images Running Create_batch step⦠[DEBUG draw_ia] batch_ids: [12, 13] [DEBUG draw_ia] batch has: 2 items save_images called! [save_images] Saving 2 images Running Create_batch step⦠[DEBUG draw_ia] batch_ids: [14, 15] [DEBUG draw_ia] batch has: 2 items save_images called! [save_images] Saving 2 images Running Create_batch step⦠[DEBUG draw_ia] batch_ids: [16, 17] [DEBUG draw_ia] batch has: 2 items save_images called! [save_images] Saving 2 images Running Create_batch step⦠[DEBUG draw_ia] batch_ids: [18, 19] [DEBUG draw_ia] batch has: 2 items save_images called! [save_images] Saving 2 images Running Create_batch step⦠[DEBUG draw_ia] batch_ids: [20, 21] [DEBUG draw_ia] batch has: 2 items save_images called! [save_images] Saving 2 images Running Create_batch step⦠[DEBUG draw_ia] batch_ids: [22, 23] [DEBUG draw_ia] batch has: 2 items save_images called! [save_images] Saving 2 images Running Create_batch step⦠[DEBUG draw_ia] batch_ids: [24, 25] [DEBUG draw_ia] batch has: 2 items save_images called! [save_images] Saving 2 images Running Create_batch step⦠[DEBUG draw_ia] batch_ids: [26, 27] [DEBUG draw_ia] batch has: 2 items save_images called! [save_images] Saving 2 images Running Create_batch step⦠[DEBUG draw_ia] batch_ids: [28, 29] [DEBUG draw_ia] batch has: 2 items save_images called! [save_images] Saving 2 images Running Create_batch step⦠[DEBUG draw_ia] batch_ids: [30, 31] [DEBUG draw_ia] batch has: 2 items save_images called! [save_images] Saving 2 images Running Create_batch step⦠[DEBUG draw_ia] batch_ids: [32, 33] [DEBUG draw_ia] batch has: 2 items save_images called! [save_images] Saving 2 images Running Create_batch step⦠[DEBUG draw_ia] batch_ids: [34, 35] [DEBUG draw_ia] batch has: 2 items save_images called! [save_images] Saving 2 images Running Create_batch step⦠[DEBUG draw_ia] batch_ids: [36, 37] [DEBUG draw_ia] batch has: 2 items save_images called! [save_images] Saving 2 images Running Create_batch step⦠[DEBUG draw_ia] batch_ids: [38, 39] [DEBUG draw_ia] batch has: 2 items save_images called! [save_images] Saving 2 images Running Create_batch step⦠[DEBUG draw_ia] batch_ids: [40, 41] [DEBUG draw_ia] batch has: 2 items
š ESTADO FALLIDO: draw_ia š ARCHIVO: /home/william.rodriguez/Documents/wpipe/examples/00_honey_pot/03_yield/demo_level54.py š¢ LĆNEA: 192 ā ļø MENSAJE: Error: desncronizacion āāāāāāāāāāāāāāāāāāāā
[ERROR] Loop broken at iteration 20 due to: [Error Code: 502] Error: desncronizacion