from PySide6.QtCore import QObject, QThread, QTimer, Signal, Slot, QMutex, QMutexLocker from PySide6.QtGui import QImage, QPixmap import cv2 from .base_camera import BaseCamera class CameraController(QThread): frame_ready = Signal(QPixmap) photo_ready = Signal(QPixmap) error_occurred = Signal(str) _enable_timer = Signal(bool) def __init__(self, parent: QObject | None = None) -> None: super().__init__(parent) self.camera = None self.timer = None self.fps = 15 self.is_streaming = False self.is_connected = False self._camera_mutex = QMutex() self.start() def run(self) -> None: self.timer = QTimer() self.timer.timeout.connect(self._update_frame) self._enable_timer.connect(self._set_timer) self.exec() def stop(self): self.stop_camera() self.quit() self.wait() def set_camera(self, camera: BaseCamera, fps: int = 15) -> None: with QMutexLocker(self._camera_mutex): self.stop_stream() self.stop_camera() self.camera = camera self.fps = fps def start_camera(self) -> None: if self.camera is None or self.is_connected: return if self.camera.connect(): self.is_connected = True else: self.is_connected = False self.error_occurred.emit(self.camera.get_error_msg()) def stop_camera(self) -> None: if self.is_streaming: self.stop_stream() if self.camera is not None: self.camera.disconnect() self.is_connected = False def start_stream(self): if not self.is_connected: return if self.is_streaming: return if self.timer: self.is_streaming = True # self.timer.start() self._enable_timer.emit(True) def stop_stream(self) -> None: if self.is_streaming: self.is_streaming = False if self.timer: # self.timer.stop() self._enable_timer.emit(False) def _update_frame(self) -> None: with QMutexLocker(self._camera_mutex): if self.camera is None or not self.is_connected: return if not self.is_streaming: return ret, frame = self.camera.get_frame() if not ret: self.error_occurred.emit(self.camera.get_error_msg()) return if frame is not None: rgb_image = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB) h, w, ch = rgb_image.shape qimg = QImage(rgb_image.data, w, h, ch * w, QImage.Format.Format_RGB888) pixmap = QPixmap.fromImage(qimg) self.frame_ready.emit(pixmap) def _set_timer(self, enable: bool): if not self.timer: return if enable: self.timer.setInterval(int(1000 / self.fps)) self.timer.start() else: self.timer.stop()