from PySide6.QtCore import QObject, QThread, QTimer, Signal, Slot, QMutex, QMutexLocker from PySide6.QtGui import QImage, QPixmap import cv2 from .base_camera import BaseCamera class CameraWorker(QObject): frame_ready = Signal(QPixmap) photo_ready = Signal(QPixmap) error_occurred = Signal(str) def __init__(self, parent: QObject | None = None) -> None: super().__init__(parent) self.camera: BaseCamera | None = None self.timer: QTimer | None = None self.fps = 15 self.is_streaming = False self.is_connected = False self._camera_mutex = QMutex() @Slot() def initialize_worker(self): """Initializes the timer in the worker's thread.""" self.timer = QTimer() self.timer.timeout.connect(self._update_frame) @Slot(BaseCamera, int) def set_camera(self, camera: BaseCamera, fps: int = 15) -> None: with QMutexLocker(self._camera_mutex): self.stop_stream() self.stop_camera() self.camera = camera self.fps = fps @Slot() def start_camera(self) -> None: if self.camera is None or self.is_connected: return if self.camera.connect(): self.is_connected = True else: self.is_connected = False self.error_occurred.emit(self.camera.get_error_msg()) @Slot() def stop_camera(self) -> None: if self.is_streaming: self.stop_stream() if self.camera is not None: self.camera.disconnect() self.is_connected = False @Slot() def start_stream(self): if not self.is_connected or self.is_streaming or self.timer is None: return self.is_streaming = True self.timer.setInterval(int(1000 / self.fps)) self.timer.start() @Slot() def stop_stream(self) -> None: if self.is_streaming and self.timer is not None: self.is_streaming = False self.timer.stop() @Slot() def _update_frame(self) -> None: with QMutexLocker(self._camera_mutex): if self.camera is None or not self.is_connected or not self.is_streaming: return ret, frame = self.camera.get_frame() if not ret: self.error_occurred.emit(self.camera.get_error_msg()) return if frame is not None: rgb_image = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB) h, w, ch = rgb_image.shape qimg = QImage(rgb_image.data, w, h, ch * w, QImage.Format.Format_RGB888) pixmap = QPixmap.fromImage(qimg) self.frame_ready.emit(pixmap) def isConnected(self): return self.is_connected class CameraController(QObject): frame_ready = Signal(QPixmap) photo_ready = Signal(QPixmap) error_occurred = Signal(str) # Signals to command the worker _set_camera_requested = Signal(BaseCamera, int) _start_camera_requested = Signal() _stop_camera_requested = Signal() _start_stream_requested = Signal() _stop_stream_requested = Signal() def __init__(self, parent: QObject | None = None) -> None: super().__init__(parent) self._thread = QThread() self._worker = CameraWorker() self._worker.moveToThread(self._thread) # Connect worker signals to controller signals self._worker.frame_ready.connect(self.frame_ready) self._worker.photo_ready.connect(self.photo_ready) self._worker.error_occurred.connect(self.error_occurred) # Connect controller's command signals to worker's slots self._set_camera_requested.connect(self._worker.set_camera) self._start_camera_requested.connect(self._worker.start_camera) self._stop_camera_requested.connect(self._worker.stop_camera) self._start_stream_requested.connect(self._worker.start_stream) self._stop_stream_requested.connect(self._worker.stop_stream) # Initialize worker when thread starts self._thread.started.connect(self._worker.initialize_worker) self._thread.start() def stop(self): self._thread.quit() self._thread.wait() def set_camera(self, camera: BaseCamera, fps: int = 15) -> None: self._set_camera_requested.emit(camera, fps) def start_camera(self) -> None: self._start_camera_requested.emit() def stop_camera(self) -> None: self._stop_camera_requested.emit() def start_stream(self): self._start_stream_requested.emit() def stop_stream(self) -> None: self._stop_stream_requested.emit() def is_connected(self): return self._worker.isConnected()