From c38e71dec42d48348f8c62149914aa1c57545df0 Mon Sep 17 00:00:00 2001 From: bartool Date: Sun, 10 May 2026 21:35:48 +0200 Subject: [PATCH] Add VideoStreamController and VideoStreamWorker classes for video processing Co-authored-by: Copilot --- app/video_stream/__init__.py | 3 + app/video_stream/controller.py | 238 +++++++++++++++++++++++++++++++++ app/video_stream/worker.py | 118 ++++++++++++++++ 3 files changed, 359 insertions(+) create mode 100644 app/video_stream/__init__.py create mode 100644 app/video_stream/controller.py create mode 100644 app/video_stream/worker.py diff --git a/app/video_stream/__init__.py b/app/video_stream/__init__.py new file mode 100644 index 0000000..a214bd3 --- /dev/null +++ b/app/video_stream/__init__.py @@ -0,0 +1,3 @@ +from .controller import VideoStreamController + +__all__ = ['VideoStreamController'] \ No newline at end of file diff --git a/app/video_stream/controller.py b/app/video_stream/controller.py new file mode 100644 index 0000000..a2687f0 --- /dev/null +++ b/app/video_stream/controller.py @@ -0,0 +1,238 @@ +import numpy as np +import cv2 +from logging import getLogger +from collections import deque +from cv2_enumerate_cameras import enumerate_cameras +from PySide6.QtCore import QObject, QThread, Signal, Slot +from PySide6.QtGui import QImage +from dataclasses import dataclass, field +from .worker import VideoStreamWorker + +logger = getLogger(__name__) + + +@dataclass +class FrameMetrics: + fr_start_delta_min: float = float('inf') # Minimum time between frame read starts + fr_start_delta_max: float = 0.0 # Maximum time between frame read starts + fr_start_delta_avg: float = 0.0 # Average time between frame read starts (calculated over a buffer of recent frames) + fr_time_min: float = float('inf') # Minimum time taken to read a frame + fr_time_max: float = 0.0 # Maximum time taken to read a frame + fr_time_avg: float = 0.0 # Average time taken to read a frame (calculated over a buffer of recent frames) + fps_average_1s: float = 0.0 # Average FPS calculated over the last 1 second (or a buffer of recent frames) + frame_count: int = 0 # Total number of frames processed + last_start_time: float = 0.0 # Timestamp of the last frame read start (used for calculating deltas) + buf_size: int = 30 # Size of the buffer for calculating average metrics + b_fr_start_delta: deque = field(init=False, repr=False) # Buffer for recent frame read start deltas (used for calculating average) + total_start_delta: float = 0.0 # Total accumulated frame read start delta (used for calculating average) + b_fr_time_delta: deque = field(init=False, repr=False) # Buffer for recent frame read time deltas (used for calculating average) + total_frame_time: float = 0.0 # Total accumulated frame read time delta (used for calculating average) + + def __post_init__(self): + self.b_fr_start_delta = deque(maxlen=self.buf_size) + self.b_fr_time_delta = deque(maxlen=self.buf_size) + + def calc_start_delta_min_max_avg(self, start_time: float) -> None: + if self.last_start_time == 0.0: + self.last_start_time = start_time + return + + start_delta = start_time - self.last_start_time + self.fr_start_delta_min = min(self.fr_start_delta_min, start_delta) + self.fr_start_delta_max = max(self.fr_start_delta_max, start_delta) + + if len(self.b_fr_start_delta) == self.buf_size: + self.total_start_delta -= self.b_fr_start_delta[0] + + self.b_fr_start_delta.append(start_delta) + self.total_start_delta += start_delta + self.last_start_time = start_time + + self.fr_start_delta_avg = self.total_start_delta / len(self.b_fr_start_delta) + + def calc_frame_time_min_max_avg(self, start_time: float, end_time: float) -> None: + fr_time = end_time - start_time + self.fr_time_min = min(self.fr_time_min, fr_time) + self.fr_time_max = max(self.fr_time_max, fr_time) + + if len(self.b_fr_time_delta) == self.buf_size: + self.total_frame_time -= self.b_fr_time_delta[0] + + self.b_fr_time_delta.append(fr_time) + self.total_frame_time += fr_time + + self.fr_time_avg = self.total_frame_time / len(self.b_fr_time_delta) + + def calc_fps(self) -> None: + if self.frame_count < self.buf_size: + return + + # fps = (len(self.b_fr_start_delta)) / self.total_start_delta if self.total_start_delta > 0 else 0.0 + # self.fps_average_1s = fps + + self.fps_average_1s = 1.0 / self.fr_start_delta_avg if self.fr_start_delta_avg > 0 else 0.0 + + def update_metrics(self, start_time: float, end_time: float) -> None: + self.frame_count += 1 + self.calc_start_delta_min_max_avg(start_time) + self.calc_frame_time_min_max_avg(start_time, end_time) + self.calc_fps() + + def reset_metrics(self) -> None: + self.fr_start_delta_min = float('inf') + self.fr_start_delta_max = 0.0 + self.fr_start_delta_avg = 0.0 + self.fr_time_min = float('inf') + self.fr_time_max = 0.0 + self.fr_time_avg = 0.0 + self.fps_average_1s = 0.0 + self.frame_count = 0 + self.last_start_time = 0.0 + self.b_fr_start_delta.clear() + self.total_start_delta = 0.0 + self.b_fr_time_delta.clear() + self.total_frame_time = 0.0 + + def get_metrics(self) -> "FrameMetrics": + return FrameMetrics( + fr_start_delta_min= round(self.fr_start_delta_min * 1000, 3), # Convert to milliseconds + fr_start_delta_max= round(self.fr_start_delta_max * 1000, 3), + fr_start_delta_avg= round(self.fr_start_delta_avg * 1000, 3), + fr_time_min= round(self.fr_time_min * 1000, 3), + fr_time_max= round(self.fr_time_max * 1000, 3), + fr_time_avg= round(self.fr_time_avg * 1000, 3), + fps_average_1s= round(self.fps_average_1s, 3), + frame_count=self.frame_count + ) + + +class VideoStreamController(QObject): + """ + Facade class managing the VideoStreamWorker and its QThread. + Converts raw frames to QImage for the UI and can route them to other processors. + """ + image_ready = Signal(QImage) # Signal for the UI + raw_frame_ready = Signal(np.ndarray) # Signal for other processing modules + error_occurred = Signal(str) + status_changed = Signal(bool) + + def __init__(self): + super().__init__() + self._thread = QThread() + self._worker = VideoStreamWorker() + self._worker.moveToThread(self._thread) + self._metrics: FrameMetrics | None = None + + # Connect internal signals + self._worker.frame_ready.connect(self._handle_frame) + self._worker.frame_time.connect(self._handle_frame_time) + self._worker.error_occurred.connect(self.error_occurred) + self._worker.started.connect(self.started) + self._worker.stopped.connect(self.stopped) + + # Cleanup + self._thread.finished.connect(self._worker.stop) + + self._thread.start() + + @staticmethod + def get_available_cameras(): + """ + Returns a list of available camera devices using cv2_enumerate_cameras. + Each item is an object with properties like 'index' and 'name'. + """ + return list(enumerate_cameras()) + + @Slot() + def get_metrics(self) -> FrameMetrics | None: + """ + Returns the current frame metrics. + """ + return self._metrics.get_metrics() if self._metrics else None + + @Slot(object) + def change_source(self, source): + """ + Change the video source (camera index or file path). + """ + self._worker.set_source(source) + + @Slot() + def start(self): + logger.debug("Starting video stream worker thread.") + self._worker.start() + + @Slot() + def stop(self): + logger.debug("Stopping video stream worker thread.") + self._worker.stop() + + @Slot(float, tuple) + def started(self, fps: float, video_res: tuple): + logger.debug(f"Video stream worker started with FPS: {fps}, Resolution: {video_res[0]}x{video_res[1]}") + self._metrics = FrameMetrics() + self.status_changed.emit(True) + + @Slot() + def stopped(self): + logger.debug("Video stream worker thread stopped.") + if self._metrics: + self._metrics.reset_metrics() + self.status_changed.emit(False) + + def cleanup(self): + """ + Safely shuts down the worker and the thread. + Must be called before the application exits. + """ + logger.debug("Cleaning up video stream controller.") + self._worker.stop() + self._thread.quit() + if not self._thread.wait(2000): # Wait up to 2 seconds + self._thread.terminate() + self._thread.wait() + + @Slot(np.ndarray) + def _handle_frame(self, frame): + """ + Internal handler for frames from the worker. + Handles conversion to QImage and routing. + """ + # 1. Emit raw frame for other processing (OCR, Detection, etc.) + self.raw_frame_ready.emit(frame) + + # 2. Convert to QImage for UI + q_image = self._convert_to_qimage(frame) + self.image_ready.emit(q_image) + + def _convert_to_qimage(self, frame): + """ + Converts a BGR numpy array to a RGB QImage. + """ + if frame is None: + return QImage() + + # OpenCV uses BGR, PySide uses RGB + try: + frame_rgb = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB) + height, width, channel = frame_rgb.shape + bytes_per_line = channel * width + + return QImage( + frame_rgb.data, + width, + height, + bytes_per_line, + QImage.Format.Format_RGB888 + ).copy() # .copy() ensures the QImage owns the data and is safe to pass across threads + except Exception as e: + self.error_occurred.emit(f"Image conversion error: {str(e)}") + return QImage() + + @Slot(float, float) + def _handle_frame_time(self, start_time, end_time): + """ + Optional: Handle frame timing information for performance monitoring. + """ + if self._metrics: + self._metrics.update_metrics(start_time, end_time) \ No newline at end of file diff --git a/app/video_stream/worker.py b/app/video_stream/worker.py new file mode 100644 index 0000000..db300db --- /dev/null +++ b/app/video_stream/worker.py @@ -0,0 +1,118 @@ +import cv2 +import numpy as np +import time +from PySide6.QtCore import QObject, Signal, Slot, QTimer +from logging import getLogger + +logger = getLogger(__name__) + +class VideoStreamWorker(QObject): + """ + Worker class responsible for reading frames from OpenCV VideoCapture. + Runs in a separate QThread. + """ + frame_ready = Signal(np.ndarray) + frame_time = Signal(float, float) # (start_time, end_time) + error_occurred = Signal(str) + started = Signal(float, tuple) # (fps, (width, height)) + stopped = Signal() + source_changed = Signal(object) # Nowy sygnał dla zmiany źródła + + def __init__(self): + super().__init__() + self._cap = None + self._source = 0 # Default to first camera + self._is_running = False + self._timer = QTimer() + self._timer.timeout.connect(self._process_frame) + self.source_changed.connect(self._on_source_changed) # Łączymy sygnał ze slotem + + @Slot(object) + def set_source(self, source): + """ + Sets the video source asynchronously. + source: int (camera index) or str (file path). + """ + self.source_changed.emit(source) # Emitujemy sygnał zamiast synchronicznej zmiany + + @Slot(object) + def _on_source_changed(self, source): + """ + Slot obsługujący zmianę źródła w wątku roboczym. + """ + logger.debug(f"Changing video source to: {source}") + was_running = self._is_running + if was_running: + self.stop() # Zatrzymujemy synchronicznie w tym samym wątku + self._source = source + if was_running: + self.start() # Uruchamiamy ponownie w tym samym wątku + + @Slot() + def start(self): + if self._is_running: + logger.warning("VideoStreamWorker is already running.") + return + + if self._source is None: + logger.error("No video source specified.") + self.error_occurred.emit("No source specified.") + return + + self._cap = cv2.VideoCapture(self._source) + if not self._cap.isOpened(): + logger.error(f"Could not open source: {self._source}") + self.error_occurred.emit(f"Could not open source: {self._source}") + return + + self._is_running = True + # Using a timer for consistent frame rate and to play nice with the event loop + # For files, we might want to calculate the interval from FPS. + fps = self._cap.get(cv2.CAP_PROP_FPS) + interval = int(1000 / fps) if fps > 0 else 30 + + video_res = (int(self._cap.get(cv2.CAP_PROP_FRAME_WIDTH)), int(self._cap.get(cv2.CAP_PROP_FRAME_HEIGHT))) + logger.debug(f"Video opened: {self._source} (fps: {fps}, width: {video_res[0]}, height: {video_res[1]})") + + self._timer.start(interval) + self.started.emit(fps, video_res) + + @Slot() + def stop(self): + logger.debug("Stopping VideoStreamWorker.") + self._is_running = False + self._timer.stop() + if self._cap: + self._cap.release() + self._cap = None + self.stopped.emit() + + def _process_frame(self): + if not self._is_running: + logger.debug("VideoStreamWorker is not running. Skipping frame processing.") + return + + if self._cap is None or not self._cap.isOpened(): + logger.error("VideoCapture is not initialized or opened.") + self.error_occurred.emit("VideoCapture not initialized.") + self.stop() + return + try: + read_start_time = time.perf_counter() + ret, frame = self._cap.read() + if ret: + self.frame_ready.emit(frame) + else: + # End of video file or lost connection + if isinstance(self._source, str): + self.stop() + else: + logger.error("Lost connection to camera.") + self.error_occurred.emit("Lost connection to camera.") + self.stop() + read_end_time = time.perf_counter() + self.frame_time.emit(read_start_time, read_end_time) + except Exception as e: + logger.error(f"Error reading frame: {str(e)}") + self.error_occurred.emit(f"Error reading frame: {str(e)}") +