98 lines
2.9 KiB
Python
98 lines
2.9 KiB
Python
# import gphoto2 as gp
|
|
import numpy as np
|
|
import cv2
|
|
|
|
from PySide6.QtCore import QObject, QThread, Signal
|
|
from PySide6.QtGui import QImage, QPixmap
|
|
|
|
# try:
|
|
# import gphoto2 as gp
|
|
# except:
|
|
from . import mock_gphoto as gp
|
|
|
|
class CameraWorker(QObject):
    """Grab live-view frames from a gphoto camera and publish them via Qt signals.

    ``start_camera`` blocks inside the capture loop until ``running`` is
    cleared (see ``stop_camera``) or a capture error occurs, so the worker is
    expected to live in its own thread.
    """

    # Emitted with each successfully decoded preview frame.
    frameReady = Signal(QPixmap)
    # Emitted with a human-readable message when initialisation or capture fails.
    errorOccurred = Signal(str)

    def __init__(self, fps: int = 15, parent=None):
        """Store the target frame rate; the camera itself is opened lazily.

        Args:
            fps: Upper bound on preview frames per second. A non-positive
                value disables throttling.
            parent: Optional Qt parent object.
        """
        super().__init__(parent)
        self.fps = fps
        self.running = False
        self.camera = None
        # True while start_camera() is executing; during that time the camera
        # handle belongs to that call and must not be released from elsewhere.
        self._capturing = False

    def start_camera(self):
        """Open the camera and pump preview frames until stopped.

        Blocks for the whole capture session. Initialisation failures are
        reported through ``errorOccurred``. The camera is always released and
        ``running`` reset when this method returns, including after a capture
        error, so a later call can start from a clean state.
        """
        self._capturing = True
        try:
            self.camera = gp.Camera()  # type: ignore
            self.camera.init()
            self.running = True
            self._capture_loop()
        except gp.GPhoto2Error as e:
            self.errorOccurred.emit(f"Błąd inicjalizacji kamery: {e}")
        finally:
            # Release on the same thread that used the camera, then hand
            # ownership back so stop_camera() may clean up again if needed.
            self._release_camera()
            self.running = False
            self._capturing = False

    def stop_camera(self):
        """Request the capture loop to stop and release the camera if idle.

        While a capture session is active this only clears ``running``; the
        session releases the camera itself once the loop exits. Closing the
        camera here would pull it out from under an in-flight
        ``capture_preview()`` when called from another thread.
        """
        self.running = False
        if not self._capturing:
            self._release_camera()

    def _release_camera(self):
        """Close the camera connection, if any (best effort)."""
        camera, self.camera = self.camera, None
        if camera is None:
            return
        try:
            camera.exit()
        except gp.GPhoto2Error:
            # Nothing useful can be done if closing fails; the handle is
            # dropped either way.
            pass

    def _capture_loop(self):
        """Read preview frames and emit them until ``running`` becomes False.

        Any capture or decoding error is reported through ``errorOccurred``
        and ends the loop.
        """
        import time

        # A zero fps would raise ZeroDivisionError and a negative one would
        # make time.sleep() raise; treat both as "do not throttle".
        delay = 1.0 / self.fps if self.fps > 0 else 0.0

        while self.running:
            try:
                preview = self.camera.capture_preview()  # type: ignore
                data = preview.get_data_and_size()
                encoded = np.frombuffer(data, dtype=np.uint8)
                frame = cv2.imdecode(encoded, cv2.IMREAD_COLOR)

                # imdecode returns None for data it cannot decode; skip it.
                if frame is not None:
                    rgb_image = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)
                    h, w, ch = rgb_image.shape
                    # qimg only wraps the numpy buffer, so it is converted
                    # while rgb_image is still alive.
                    qimg = QImage(rgb_image.data, w, h, ch * w, QImage.Format.Format_RGB888)
                    # NOTE(review): Qt documents QPixmap as GUI-thread only;
                    # building it here appears to rely on the platform
                    # tolerating that. Emitting a QImage would be safer but
                    # would change the signal signature - confirm with callers.
                    pixmap = QPixmap.fromImage(qimg)
                    self.frameReady.emit(pixmap)

            except gp.GPhoto2Error as e:
                self.errorOccurred.emit(f"Błąd odczytu LiveView: {e}")
                break
            except Exception as e:
                self.errorOccurred.emit(f"Nieoczekiwany błąd: {e}")
                break

            time.sleep(delay)
|
|
|
|
|
|
class CameraController(QObject):
    """Run a ``CameraWorker`` on a dedicated ``QThread`` and re-expose its signals."""

    # Forwarded from the worker: one pixmap per decoded preview frame.
    frameReady = Signal(QPixmap)
    # Forwarded from the worker: human-readable error message.
    errorOccurred = Signal(str)

    def __init__(self, fps: int = 15, parent=None):
        """Create the worker, move it to its own thread and wire up the signals.

        Args:
            fps: Frame rate passed through to the worker.
            parent: Optional Qt parent object.
        """
        super().__init__(parent)
        self.camera_thread = QThread()
        self.worker = CameraWorker(fps)
        self.worker.moveToThread(self.camera_thread)

        # Signal-to-signal connections: worker emissions are re-emitted here.
        self.worker.frameReady.connect(self.frameReady)
        self.worker.errorOccurred.connect(self.errorOccurred)

        # Capturing begins as soon as the thread starts.
        self.camera_thread.started.connect(self.worker.start_camera)

    def start(self):
        """Start the camera thread, which in turn starts capturing."""
        self.camera_thread.start()

    def stop(self):
        """Stop capturing, shut the thread down and release the camera."""
        # Only clear the flag first: the worker's loop is still running on the
        # camera thread, and closing the camera from this thread at this point
        # would race with an in-flight capture.
        self.worker.running = False
        self.camera_thread.quit()
        self.camera_thread.wait()
        # The thread has finished, so nothing else touches the camera now and
        # it is safe to release it from here.
        self.worker.stop_camera()
|