import time

import cv2
import gphoto2 as gp
import numpy as np
from PySide6.QtCore import QObject, QThread, Signal
from PySide6.QtGui import QImage, QPixmap


class CameraWorker(QObject):
    """Grab live-view frames from a gphoto2 camera and publish them as pixmaps.

    The worker is designed to be moved to a ``QThread``: ``start_camera``
    blocks inside the capture loop until ``stop_camera`` is called or a read
    error occurs.

    Signals:
        frameReady(QPixmap): emitted for every successfully decoded frame.
        errorOccurred(str): emitted with a user-facing message when the camera
            cannot be initialised or a frame cannot be read.

    NOTE(review): Qt documents QPixmap as usable from the GUI thread only,
    yet frames are converted to QPixmap in the worker thread here. Switching
    the signal payload to QImage would change the public interface, so it is
    left as is -- confirm with the callers.
    """

    frameReady = Signal(QPixmap)
    errorOccurred = Signal(str)

    def __init__(self, fps: int = 15, parent=None):
        """Create an idle worker.

        Args:
            fps: Target number of preview frames per second; must be positive.
            parent: Optional Qt parent object.

        Raises:
            ValueError: If ``fps`` is zero or negative.
        """
        super().__init__(parent)
        if fps <= 0:
            # Fail early instead of dividing by zero later in the worker thread.
            raise ValueError(f"fps must be positive, got {fps!r}")
        self.fps = fps
        self.running = False
        self.camera = None
        # True while _capture_loop is using the camera, so that stop_camera()
        # called from another thread does not tear the camera down under it.
        self._loop_active = False

    def start_camera(self):
        """Initialise the camera and capture frames until stopped.

        Blocks for as long as the capture loop runs. Initialisation failures
        are reported through ``errorOccurred`` rather than raised. The camera
        is always released when the loop ends, whether it was stopped or it
        aborted on an error.
        """
        try:
            camera = gp.Camera()
            camera.init()
        except gp.GPhoto2Error as e:
            self.errorOccurred.emit(f"Błąd inicjalizacji kamery: {e}")
            return

        # Publish the handle only after init() succeeded.
        self._loop_active = True
        self.camera = camera
        self.running = True
        try:
            self._capture_loop()
        finally:
            self.running = False
            self._loop_active = False
            self._release_camera()

    def stop_camera(self):
        """Request the capture loop to stop.

        Safe to call from a thread other than the one running the loop: while
        the loop is active only the ``running`` flag is cleared and the loop's
        own thread releases the camera when it exits. If no loop is active,
        any leftover camera handle is released immediately.
        """
        self.running = False
        if self._loop_active:
            return
        self._release_camera()

    def _release_camera(self):
        """Close the camera connection, if there is one, and forget the handle."""
        camera, self.camera = self.camera, None
        if camera is None:
            return
        try:
            camera.exit()
        except gp.GPhoto2Error:
            # Best-effort shutdown: a failure to close the camera (for example
            # because it was unplugged) leaves nothing further to clean up.
            pass

    def _capture_loop(self):
        """Read preview frames at roughly ``self.fps`` while ``running`` is set.

        Emits ``frameReady`` for each decoded frame. Any error is reported via
        ``errorOccurred`` and ends the loop.
        """
        camera = self.camera
        delay = 1.0 / self.fps
        while self.running:
            frame_started = time.monotonic()
            try:
                preview = camera.capture_preview()  # type: ignore
                pixmap = self._decode_preview(preview.get_data_and_size())
                if pixmap is not None:
                    self.frameReady.emit(pixmap)
            except gp.GPhoto2Error as e:
                self.errorOccurred.emit(f"Błąd odczytu LiveView: {e}")
                break
            except Exception as e:
                # Thread boundary: report instead of letting the slot raise.
                self.errorOccurred.emit(f"Nieoczekiwany błąd: {e}")
                break

            # Sleep only for what is left of the frame period so that capture
            # and decode time do not push the rate below the requested fps.
            remaining = delay - (time.monotonic() - frame_started)
            if remaining > 0:
                time.sleep(remaining)

    @staticmethod
    def _decode_preview(data):
        """Decode an encoded preview image buffer into a ``QPixmap``.

        Args:
            data: Bytes-like buffer holding the encoded preview image.

        Returns:
            The decoded frame as a ``QPixmap``, or ``None`` when OpenCV cannot
            decode the buffer.
        """
        encoded = np.frombuffer(data, dtype=np.uint8)
        bgr = cv2.imdecode(encoded, cv2.IMREAD_COLOR)
        if bgr is None:
            return None

        rgb = cv2.cvtColor(bgr, cv2.COLOR_BGR2RGB)
        height, width, channels = rgb.shape
        image = QImage(
            rgb.data,
            width,
            height,
            channels * width,
            QImage.Format.Format_RGB888,
        )
        # The QImage only wraps the numpy buffer; the pixmap is created while
        # ``rgb`` is still alive so it never refers to freed memory.
        return QPixmap.fromImage(image)


class CameraController(QObject):
    """Own a ``CameraWorker`` and the ``QThread`` it runs in.

    Re-emits the worker's ``frameReady`` and ``errorOccurred`` signals so that
    callers only need to connect to the controller.
    """

    frameReady = Signal(QPixmap)
    errorOccurred = Signal(str)

    def __init__(self, fps: int = 15, parent=None):
        """Create the worker thread and wire up the signals.

        Args:
            fps: Target preview frame rate passed on to the worker.
            parent: Optional Qt parent object.
        """
        super().__init__(parent)
        self.camera_thread = QThread()
        self.worker = CameraWorker(fps)
        self.worker.moveToThread(self.camera_thread)

        # Forward the worker's signals through the controller's own signals.
        self.worker.frameReady.connect(self.frameReady)
        self.worker.errorOccurred.connect(self.errorOccurred)

        # Starting the thread starts the capture.
        self.camera_thread.started.connect(self.worker.start_camera)

    def start(self):
        """Start the camera in the worker thread."""
        self.camera_thread.start()

    def stop(self):
        """Stop the camera and wait for the worker thread to finish."""
        # Called directly rather than through a queued signal: the worker
        # thread is blocked inside start_camera() and would not process a
        # queued call until the loop had already ended.
        self.worker.stop_camera()
        self.camera_thread.quit()
        self.camera_thread.wait()