from PySide6.QtCore import QObject, QTimer, Signal, Slot, QMutex, QMutexLocker from PySide6.QtGui import QImage, QPixmap import cv2 from .base_camera import BaseCamera class CameraController(QObject): """ A QObject worker for handling camera operations in a separate thread. This object should be moved to a QThread. """ frame_ready = Signal(QPixmap) photo_ready = Signal(QPixmap) error_occurred = Signal(str) def __init__(self, parent: QObject | None = None) -> None: super().__init__(parent) self.camera: BaseCamera | None = None self.timer: QTimer | None = None self.fps = 15 self.is_streaming = False self.is_connected = False self._camera_mutex = QMutex() @Slot() def run(self): """ Initializes resources in the worker thread. This should be connected to the QThread.started signal. """ self.timer = QTimer() self.timer.timeout.connect(self._update_frame) @Slot(BaseCamera, int) def set_camera(self, camera: BaseCamera, fps: int = 15) -> None: with QMutexLocker(self._camera_mutex): if self.is_streaming: self.stop_stream() if self.is_connected: self.stop_camera() self.camera = camera self.fps = fps @Slot() def start_camera(self) -> None: with QMutexLocker(self._camera_mutex): if self.camera is None or self.is_connected: return if self.camera.connect(): self.is_connected = True else: self.is_connected = False self.error_occurred.emit(self.camera.get_error_msg()) @Slot() def stop_camera(self) -> None: with QMutexLocker(self._camera_mutex): if self.is_streaming: self.stop_stream() if self.camera is not None and self.is_connected: self.camera.disconnect() self.is_connected = False @Slot() def start_stream(self): with QMutexLocker(self._camera_mutex): if not self.is_connected or self.is_streaming or self.timer is None: return self.is_streaming = True self.timer.setInterval(int(1000 / self.fps)) self.timer.start() @Slot() def stop_stream(self) -> None: with QMutexLocker(self._camera_mutex): if not self.is_streaming or self.timer is None: return self.is_streaming = False self.timer.stop() def _update_frame(self) -> None: # This method is called by the timer, which is in the same thread. # A mutex is still good practice for accessing the shared camera object. with QMutexLocker(self._camera_mutex): if self.camera is None or not self.is_connected or not self.is_streaming: return ret, frame = self.camera.get_frame() if not ret: error_msg = self.camera.get_error_msg() if error_msg: self.error_occurred.emit(error_msg) return if frame is not None: # Process the frame and emit it. rgb_image = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB) h, w, ch = rgb_image.shape qimg = QImage(rgb_image.data, w, h, ch * w, QImage.Format.Format_RGB888) pixmap = QPixmap.fromImage(qimg) self.frame_ready.emit(pixmap)