from PySide6.QtCore import QObject, QThread, QTimer, Signal, Slot from PySide6.QtGui import QImage, QPixmap import cv2 from .base_camera import BaseCamera class CameraManager(QThread): frame_ready = Signal(QPixmap) photo_ready = Signal(QPixmap) error_occurred = Signal(str) def __init__(self, camera: BaseCamera, fps: int = 15, parent: QObject | None = None) -> None: super().__init__(parent) self.camera = camera self.fps = fps self.timer = None self.is_streaming = False self.is_connected = False self.timer = QTimer() self.timer.setInterval(int(1000 / self.fps)) self.timer.timeout.connect(self._update_frame) self.start() def run(self) -> None: # self.timer = QTimer() # self.timer.setInterval(int(1000 / self.fps)) # self.timer.timeout.connect(self._update_frame) self.exec() def start_camera(self) -> None: if self.is_connected: return if self.camera.connect(): self.is_connected = True else: self.is_connected = False self.error_occurred.emit(self.camera.get_error_msg()) def stop_camera(self) -> None: self.is_streaming = False self.is_connected = False if self.timer: self.timer.stop() self.camera.disconnect() def start_stream(self): if not self.is_connected: return if self.is_streaming: return if self.timer: self.is_streaming = True self.timer.start() def stop_stream(self) -> None: if self.is_streaming: self.is_streaming = False if self.timer: self.timer.stop() def _update_frame(self) -> None: if not self.is_streaming or not self.is_connected: return ret, frame = self.camera.get_frame() if not ret: self.error_occurred.emit(self.camera.get_error_msg()) return if frame is not None: rgb_image = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB) h, w, ch = rgb_image.shape qimg = QImage(rgb_image.data, w, h, ch * w, QImage.Format.Format_RGB888) pixmap = QPixmap.fromImage(qimg) self.frame_ready.emit(pixmap) def stop(self): self.stop_camera() self.quit() self.wait()