import numpy as np import cv2 from logging import getLogger from collections import deque from cv2_enumerate_cameras import enumerate_cameras from PySide6.QtCore import QObject, QThread, Signal, Slot from PySide6.QtGui import QImage from dataclasses import dataclass, field from .worker import VideoStreamWorker logger = getLogger(__name__) @dataclass class FrameMetrics: fr_start_delta_min: float = float('inf') # Minimum time between frame read starts fr_start_delta_max: float = 0.0 # Maximum time between frame read starts fr_start_delta_avg: float = 0.0 # Average time between frame read starts (calculated over a buffer of recent frames) fr_time_min: float = float('inf') # Minimum time taken to read a frame fr_time_max: float = 0.0 # Maximum time taken to read a frame fr_time_avg: float = 0.0 # Average time taken to read a frame (calculated over a buffer of recent frames) fps_average_1s: float = 0.0 # Average FPS calculated over the last 1 second (or a buffer of recent frames) frame_count: int = 0 # Total number of frames processed last_start_time: float = 0.0 # Timestamp of the last frame read start (used for calculating deltas) buf_size: int = 30 # Size of the buffer for calculating average metrics b_fr_start_delta: deque = field(init=False, repr=False) # Buffer for recent frame read start deltas (used for calculating average) total_start_delta: float = 0.0 # Total accumulated frame read start delta (used for calculating average) b_fr_time_delta: deque = field(init=False, repr=False) # Buffer for recent frame read time deltas (used for calculating average) total_frame_time: float = 0.0 # Total accumulated frame read time delta (used for calculating average) def __post_init__(self): self.b_fr_start_delta = deque(maxlen=self.buf_size) self.b_fr_time_delta = deque(maxlen=self.buf_size) def calc_start_delta_min_max_avg(self, start_time: float) -> None: if self.last_start_time == 0.0: self.last_start_time = start_time return start_delta = start_time - self.last_start_time self.fr_start_delta_min = min(self.fr_start_delta_min, start_delta) self.fr_start_delta_max = max(self.fr_start_delta_max, start_delta) if len(self.b_fr_start_delta) == self.buf_size: self.total_start_delta -= self.b_fr_start_delta[0] self.b_fr_start_delta.append(start_delta) self.total_start_delta += start_delta self.last_start_time = start_time self.fr_start_delta_avg = self.total_start_delta / len(self.b_fr_start_delta) def calc_frame_time_min_max_avg(self, start_time: float, end_time: float) -> None: fr_time = end_time - start_time self.fr_time_min = min(self.fr_time_min, fr_time) self.fr_time_max = max(self.fr_time_max, fr_time) if len(self.b_fr_time_delta) == self.buf_size: self.total_frame_time -= self.b_fr_time_delta[0] self.b_fr_time_delta.append(fr_time) self.total_frame_time += fr_time self.fr_time_avg = self.total_frame_time / len(self.b_fr_time_delta) def calc_fps(self) -> None: if self.frame_count < self.buf_size: return # fps = (len(self.b_fr_start_delta)) / self.total_start_delta if self.total_start_delta > 0 else 0.0 # self.fps_average_1s = fps self.fps_average_1s = 1.0 / self.fr_start_delta_avg if self.fr_start_delta_avg > 0 else 0.0 def update_metrics(self, start_time: float, end_time: float) -> None: self.frame_count += 1 self.calc_start_delta_min_max_avg(start_time) self.calc_frame_time_min_max_avg(start_time, end_time) self.calc_fps() def reset_metrics(self) -> None: self.fr_start_delta_min = float('inf') self.fr_start_delta_max = 0.0 self.fr_start_delta_avg = 0.0 self.fr_time_min = float('inf') self.fr_time_max = 0.0 self.fr_time_avg = 0.0 self.fps_average_1s = 0.0 self.frame_count = 0 self.last_start_time = 0.0 self.b_fr_start_delta.clear() self.total_start_delta = 0.0 self.b_fr_time_delta.clear() self.total_frame_time = 0.0 def get_metrics(self) -> "FrameMetrics": return FrameMetrics( fr_start_delta_min= round(self.fr_start_delta_min * 1000, 3), # Convert to milliseconds fr_start_delta_max= round(self.fr_start_delta_max * 1000, 3), fr_start_delta_avg= round(self.fr_start_delta_avg * 1000, 3), fr_time_min= round(self.fr_time_min * 1000, 3), fr_time_max= round(self.fr_time_max * 1000, 3), fr_time_avg= round(self.fr_time_avg * 1000, 3), fps_average_1s= round(self.fps_average_1s, 3), frame_count=self.frame_count ) class VideoStreamController(QObject): """ Facade class managing the VideoStreamWorker and its QThread. Converts raw frames to QImage for the UI and can route them to other processors. """ image_ready = Signal(QImage) # Signal for the UI raw_frame_ready = Signal(np.ndarray) # Signal for other processing modules error_occurred = Signal(str) status_changed = Signal(bool) def __init__(self): super().__init__() self._thread = QThread() self._worker = VideoStreamWorker() self._worker.moveToThread(self._thread) self._metrics: FrameMetrics | None = None # Connect internal signals self._worker.frame_ready.connect(self._handle_frame) self._worker.frame_time.connect(self._handle_frame_time) self._worker.error_occurred.connect(self.error_occurred) self._worker.started.connect(self.started) self._worker.stopped.connect(self.stopped) # Cleanup self._thread.finished.connect(self._worker.stop) self._thread.start() @staticmethod def get_available_cameras(): """ Returns a list of available camera devices using cv2_enumerate_cameras. Each item is an object with properties like 'index' and 'name'. """ return list(enumerate_cameras()) @Slot() def get_metrics(self) -> FrameMetrics | None: """ Returns the current frame metrics. """ return self._metrics.get_metrics() if self._metrics else None @Slot(object) def change_source(self, source): """ Change the video source (camera index or file path). """ self._worker.set_source(source) @Slot() def start(self): logger.debug("Starting video stream worker thread.") self._worker.start() @Slot() def stop(self): logger.debug("Stopping video stream worker thread.") self._worker.stop() @Slot(float, tuple) def started(self, fps: float, video_res: tuple): logger.debug(f"Video stream worker started with FPS: {fps}, Resolution: {video_res[0]}x{video_res[1]}") self._metrics = FrameMetrics() self.status_changed.emit(True) @Slot() def stopped(self): logger.debug("Video stream worker thread stopped.") if self._metrics: self._metrics.reset_metrics() self.status_changed.emit(False) def cleanup(self): """ Safely shuts down the worker and the thread. Must be called before the application exits. """ logger.debug("Cleaning up video stream controller.") self._worker.stop() self._thread.quit() if not self._thread.wait(2000): # Wait up to 2 seconds self._thread.terminate() self._thread.wait() @Slot(np.ndarray) def _handle_frame(self, frame): """ Internal handler for frames from the worker. Handles conversion to QImage and routing. """ # 1. Emit raw frame for other processing (OCR, Detection, etc.) self.raw_frame_ready.emit(frame) # 2. Convert to QImage for UI q_image = self._convert_to_qimage(frame) self.image_ready.emit(q_image) def _convert_to_qimage(self, frame): """ Converts a BGR numpy array to a RGB QImage. """ if frame is None: return QImage() # OpenCV uses BGR, PySide uses RGB try: frame_rgb = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB) height, width, channel = frame_rgb.shape bytes_per_line = channel * width return QImage( frame_rgb.data, width, height, bytes_per_line, QImage.Format.Format_RGB888 ).copy() # .copy() ensures the QImage owns the data and is safe to pass across threads except Exception as e: self.error_occurred.emit(f"Image conversion error: {str(e)}") return QImage() @Slot(float, float) def _handle_frame_time(self, start_time, end_time): """ Optional: Handle frame timing information for performance monitoring. """ if self._metrics: self._metrics.update_metrics(start_time, end_time)