refactor: update camera control signals and improve live view handling

This commit is contained in:
2025-10-13 05:14:27 +02:00
parent 73b51c696e
commit c6345c569d
3 changed files with 104 additions and 147 deletions

View File

@@ -112,7 +112,7 @@ class MainController:
@Slot() @Slot()
def on_camera_started(self): def on_camera_started(self):
"""Updates UI when the camera stream starts.""" """Updates UI when the camera stream starts."""
# self.split_view.show_live_view() self.split_view.toggle_live_view()
self.welcome_view.set_button_text("Stop Camera") self.welcome_view.set_button_text("Stop Camera")
# Re-route button click to stop the camera # Re-route button click to stop the camera
self.welcome_view.camera_start_btn.clicked.disconnect() self.welcome_view.camera_start_btn.clicked.disconnect()

View File

@@ -1,4 +1,4 @@
from PySide6.QtCore import QObject, QTimer, Signal, Slot, QMutex, QMutexLocker from PySide6.QtCore import QObject, QTimer, Signal, Slot, QMutex, QMutexLocker, QThread
from PySide6.QtGui import QImage, QPixmap from PySide6.QtGui import QImage, QPixmap
import cv2 import cv2
@@ -9,6 +9,7 @@ class CameraWorker(QObject):
frame_ready = Signal(QPixmap) frame_ready = Signal(QPixmap)
photo_ready = Signal(QPixmap) photo_ready = Signal(QPixmap)
error_occurred = Signal(str) error_occurred = Signal(str)
camera_ready = Signal(bool)
def __init__(self, parent: QObject | None = None) -> None: def __init__(self, parent: QObject | None = None) -> None:
super().__init__(parent) super().__init__(parent)
@@ -44,8 +45,10 @@ class CameraWorker(QObject):
if self.camera.connect(): if self.camera.connect():
self.is_connected = True self.is_connected = True
self.camera_ready.emit(True)
else: else:
self.is_connected = False self.is_connected = False
self.camera_ready.emit(False)
self.error_occurred.emit(self.camera.get_error_msg()) self.error_occurred.emit(self.camera.get_error_msg())
@Slot() @Slot()
@@ -97,15 +100,14 @@ class CameraWorker(QObject):
qimg = QImage(rgb_image.data, w, h, ch * w, QImage.Format.Format_RGB888) qimg = QImage(rgb_image.data, w, h, ch * w, QImage.Format.Format_RGB888)
pixmap = QPixmap.fromImage(qimg) pixmap = QPixmap.fromImage(qimg)
self.frame_ready.emit(pixmap) self.frame_ready.emit(pixmap)
def isConnected(self):
return self.is_connected
class CameraController(QObject): class CameraController(QObject):
frame_ready = Signal(QPixmap) frame_ready = Signal(QPixmap)
photo_ready = Signal(QPixmap) photo_ready = Signal(QPixmap)
error_occurred = Signal(str) error_occurred = Signal(str)
camera_ready = Signal(bool)
# Signals to command the worker # Signals to command the worker
_set_camera_requested = Signal(BaseCamera, int) _set_camera_requested = Signal(BaseCamera, int)
@@ -125,6 +127,7 @@ class CameraController(QObject):
self._worker.frame_ready.connect(self.frame_ready) self._worker.frame_ready.connect(self.frame_ready)
self._worker.photo_ready.connect(self.photo_ready) self._worker.photo_ready.connect(self.photo_ready)
self._worker.error_occurred.connect(self.error_occurred) self._worker.error_occurred.connect(self.error_occurred)
self._worker.camera_ready.connect(self.camera_ready)
# Connect controller's command signals to worker's slots # Connect controller's command signals to worker's slots
self._set_camera_requested.connect(self._worker.set_camera) self._set_camera_requested.connect(self._worker.set_camera)
@@ -156,6 +159,4 @@ class CameraController(QObject):
def stop_stream(self) -> None: def stop_stream(self) -> None:
self._stop_stream_requested.emit() self._stop_stream_requested.emit()
def is_connected(self):
return self._worker.isConnected()

View File

@@ -1,4 +1,4 @@
from PySide6.QtCore import QObject, Signal, QRunnable, QThreadPool, QThread from PySide6.QtCore import QObject, Signal
from PySide6.QtGui import QPixmap from PySide6.QtGui import QPixmap
from .camera_controller import CameraController from .camera_controller import CameraController
@@ -7,159 +7,115 @@ from .opencv_camera import OpenCvCamera
from .base_camera import BaseCamera from .base_camera import BaseCamera
class CameraDetectionWorker(QRunnable):
"""
Worker thread for detecting cameras to avoid blocking the GUI.
"""
class WorkerSignals(QObject):
finished = Signal(list)
error = Signal(str)
def __init__(self):
super().__init__()
self.signals = self.WorkerSignals()
def run(self) -> None:
"""The main work of the worker."""
detected_cameras = []
try:
gphoto_cameras = GPhotoCamera.detect()
for index, info in gphoto_cameras.items():
detected_cameras.append({
"id": f"gphoto_{index}",
"name": f"{info['name']} ({info['port']})",
"type": "gphoto",
"index": index
})
except Exception as e:
self.signals.error.emit(f"Błąd podczas wykrywania kamer GPhoto: {e}")
try:
opencv_cameras = OpenCvCamera.detect()
for index, info in opencv_cameras.items():
detected_cameras.append({
"id": f"opencv_{index}",
"name": f"OpenCV: {info['name']}",
"type": "opencv",
"index": index
})
except Exception as e:
self.signals.error.emit(f"Błąd podczas wykrywania kamer OpenCV: {e}")
self.signals.finished.emit(detected_cameras)
class CameraManager(QObject): class CameraManager(QObject):
""" """
Zarządza wszystkimi operacjami związanymi z kamerami, Zarządza wszystkimi operacjami związanymi z kamerami,
stanowiąc fasadę dla reszty aplikacji. stanowiąc fasadę dla reszty aplikacji.
""" """
# --- Public API Signals --- frame_ready = Signal(QPixmap)
frame_ready = Signal(QPixmap) error_occurred = Signal(str)
error_occurred = Signal(str) cameras_detected = Signal(list)
detection_started = Signal() camera_started = Signal()
cameras_detected = Signal(list) camera_stopped = Signal()
camera_started = Signal()
camera_stopped = Signal()
# --- Internal signals to communicate with worker thread --- def __init__(self, parent: QObject | None = None) -> None:
_request_set_camera = Signal(BaseCamera, int) super().__init__(parent)
_request_start_camera = Signal() self._camera_controller = CameraController()
_request_stop_camera = Signal() self._detected_cameras: list[dict] = []
_request_start_stream = Signal() self._active_camera: BaseCamera | None = None
_request_stop_stream = Signal() self._active_camera_info: dict | None = None
def __init__(self, parent: QObject | None = None) -> None: # Przekazywanie sygnałów z kontrolera kamery na zewnątrz
super().__init__(parent) self._camera_controller.frame_ready.connect(self.frame_ready)
self._camera_controller.error_occurred.connect(self.error_occurred)
self._camera_thread = QThread() self._camera_controller.camera_ready.connect(self.start_liveview)
self._camera_controller = CameraController()
self._camera_controller.moveToThread(self._camera_thread)
# --- Connections --- def detect_cameras(self) -> None:
# Connect signals from controller to be re-emitted by manager """Wykrywa wszystkie dostępne kamery (GPhoto i OpenCV)."""
self._camera_controller.frame_ready.connect(self.frame_ready) self._detected_cameras.clear()
self._camera_controller.error_occurred.connect(self.error_occurred)
# Connect internal requests to controller slots
self._request_set_camera.connect(self._camera_controller.set_camera)
self._request_start_camera.connect(self._camera_controller.start_camera)
self._request_stop_camera.connect(self._camera_controller.stop_camera)
self._request_start_stream.connect(self._camera_controller.start_stream)
self._request_stop_stream.connect(self._camera_controller.stop_stream)
# Connect thread management # Wykryj kamery GPhoto
self._camera_thread.started.connect(self._camera_controller.run) try:
gphoto_cameras = GPhotoCamera.detect()
self._camera_thread.start() for index, info in gphoto_cameras.items():
self._detected_cameras.append({
"id": f"gphoto_{index}",
"name": f"{info['name']} ({info['port']})",
"type": "gphoto",
"index": index
})
except Exception as e:
self.error_occurred.emit(
f"Błąd podczas wykrywania kamer GPhoto: {e}")
self._detected_cameras: list[dict] = [] # Wykryj kamery OpenCV
self._active_camera_info: dict | None = None try:
self.thread_pool = QThreadPool.globalInstance() # For detection worker opencv_cameras = OpenCvCamera.detect()
for index, info in opencv_cameras.items():
self._detected_cameras.append({
"id": f"opencv_{index}",
"name": f"OpenCV: {info['name']}",
"type": "opencv",
"index": index
})
except Exception as e:
self.error_occurred.emit(
f"Błąd podczas wykrywania kamer OpenCV: {e}")
def detect_cameras(self) -> None: self.cameras_detected.emit(self._detected_cameras)
self.detection_started.emit()
worker = CameraDetectionWorker()
worker.signals.finished.connect(self._on_detection_finished)
worker.signals.error.connect(self.error_occurred)
self.thread_pool.start(worker)
def _on_detection_finished(self, detected_cameras: list): def get_detected_cameras(self) -> list[dict]:
self._detected_cameras = detected_cameras return self._detected_cameras
self.cameras_detected.emit(self._detected_cameras)
def get_detected_cameras(self) -> list[dict]: def start_camera(self, camera_id: str, fps: int = 15) -> None:
return self._detected_cameras """Uruchamia wybraną kamerę."""
if self._active_camera:
self.stop_camera()
def start_camera(self, camera_id: str, fps: int = 15) -> None: camera_info = next(
if self._active_camera_info and self._active_camera_info['id'] == camera_id: (c for c in self._detected_cameras if c['id'] == camera_id), None)
return
if self._active_camera_info: if not camera_info:
self.stop_camera() self.error_occurred.emit(
f"Nie znaleziono kamery o ID: {camera_id}")
return
camera_info = next((c for c in self._detected_cameras if c['id'] == camera_id), None) camera_type = camera_info['type']
camera_index = camera_info['index']
if not camera_info: if camera_type == "gphoto":
self.error_occurred.emit(f"Nie znaleziono kamery o ID: {camera_id}") self._active_camera = GPhotoCamera()
return elif camera_type == "opencv":
self._active_camera = OpenCvCamera()
else:
self.error_occurred.emit(f"Nieznany typ kamery: {camera_type}")
return
camera_type = camera_info['type'] self._active_camera_info = camera_info
camera_index = camera_info['index']
camera_instance: BaseCamera | None = None self._camera_controller.set_camera(self._active_camera, fps)
if camera_type == "gphoto": self._camera_controller.start_camera()
camera_instance = GPhotoCamera()
elif camera_type == "opencv":
camera_instance = OpenCvCamera()
else:
self.error_occurred.emit(f"Nieznany typ kamery: {camera_type}")
return
self._active_camera_info = camera_info
# Emit signals to trigger slots in the worker thread
self._request_set_camera.emit(camera_instance, fps)
self._request_start_camera.emit()
self._request_start_stream.emit()
self.camera_started.emit()
def stop_camera(self) -> None: def start_liveview(self, connected):
if self._active_camera_info: if connected:
# Emit signals to trigger slots in the worker thread self._camera_controller.start_stream()
self._request_stop_stream.emit() self.camera_started.emit()
self._request_stop_camera.emit() else:
self._active_camera_info = None self._active_camera = None
self.camera_stopped.emit() self._active_camera_info = None
def get_active_camera_info(self) -> dict | None: def stop_camera(self) -> None:
return self._active_camera_info """Zatrzymuje aktywną kamerę."""
if self._active_camera:
self._camera_controller.stop_camera()
self._active_camera = None
self._active_camera_info = None
self.camera_stopped.emit()
def shutdown(self) -> None: def get_active_camera_info(self) -> dict | None:
if self._camera_thread.isRunning(): return self._active_camera_info
self._camera_thread.quit()
if not self._camera_thread.wait(5000): def shutdown(self) -> None:
print("Camera thread did not finish gracefully, terminating.") """Zamyka kontroler kamery i jego wątek."""
self._camera_thread.terminate() self.stop_camera()
self._camera_thread.wait() self._camera_controller.stop()