1 Commits

Commit 6d616fd15e: new camera handling concept (2025-09-18 20:14:12 +02:00)
9 changed files with 293 additions and 174 deletions

View File

@@ -1,31 +1,97 @@
# import gphoto2 as gp
import numpy as np
import cv2
from PySide6.QtCore import QObject, QThread, Signal
from ..core.base import BaseImageSource, BaseControlSource
from PySide6.QtGui import QImage, QPixmap
# try:
# import gphoto2 as gp
# except:
from . import mock_gphoto as gp
class CameraWorker(QObject):
frameReady = Signal(QPixmap)
errorOccurred = Signal(str)
def __init__(self, fps: int = 15, parent=None):
super().__init__(parent)
self.fps = fps
self.running = False
self.camera = None
def start_camera(self):
"""Uruchom kamerę i zacznij pobierać klatki"""
try:
self.camera = gp.Camera() # type: ignore
self.camera.init()
self.running = True
self._capture_loop()
except gp.GPhoto2Error as e:
self.errorOccurred.emit(f"Błąd inicjalizacji kamery: {e}")
def stop_camera(self):
"""Zatrzymaj pobieranie"""
self.running = False
if self.camera:
try:
self.camera.exit()
except gp.GPhoto2Error:
pass
self.camera = None
def _capture_loop(self):
"""Pętla odczytu klatek w osobnym wątku"""
import time
delay = 1.0 / self.fps
while self.running:
try:
file = self.camera.capture_preview() # type: ignore
data = file.get_data_and_size()
frame = np.frombuffer(data, dtype=np.uint8)
frame = cv2.imdecode(frame, cv2.IMREAD_COLOR)
if frame is not None:
rgb_image = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)
h, w, ch = rgb_image.shape
qimg = QImage(rgb_image.data, w, h, ch * w, QImage.Format.Format_RGB888)
pixmap = QPixmap.fromImage(qimg)
self.frameReady.emit(pixmap)
except gp.GPhoto2Error as e:
self.errorOccurred.emit(f"Błąd odczytu LiveView: {e}")
break
except Exception as e:
self.errorOccurred.emit(f"Nieoczekiwany błąd: {e}")
break
time.sleep(delay)
class CameraController(QObject):
new_frame = Signal(object)
frameReady = Signal(QPixmap)
errorOccurred = Signal(str)
def __init__(self, image_source: BaseImageSource, control_source: BaseControlSource, parent=None):
def __init__(self, fps: int = 15, parent=None):
super().__init__(parent)
self.image_source = image_source
self.control_source = control_source
self.camera_thread = QThread()
self.moveToThread(self.camera_thread)
self.worker = CameraWorker(fps)
self.worker.moveToThread(self.camera_thread)
self.image_source.moveToThread(self.camera_thread)
self.control_source.moveToThread(self.camera_thread)
# signals from the worker
self.worker.frameReady.connect(self.frameReady)
self.worker.errorOccurred.connect(self.errorOccurred)
self.image_source.new_frame.connect(self.new_frame)
self.image_source.errorOccurred.connect(self.errorOccurred)
self.control_source.errorOccurred.connect(self.errorOccurred)
# start/stop signals
self.camera_thread.started.connect(self.worker.start_camera)
def start(self):
"""Start kamery w osobnym wątku"""
self.camera_thread.start()
self.image_source.start()
def stop(self):
self.image_source.stop()
"""Stop kamery i zakończenie wątku"""
self.worker.stop_camera()
self.camera_thread.quit()
self.camera_thread.wait()
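
For orientation, a minimal usage sketch of the new CameraController (not part of this commit): the QLabel-based preview window below is purely illustrative, and the controllers.camera_controller module path follows the import used elsewhere in this repository.

# Illustrative sketch only; the window/label are assumptions, not project code.
import sys
from PySide6.QtWidgets import QApplication, QLabel
from controllers.camera_controller import CameraController

app = QApplication(sys.argv)
preview = QLabel("Waiting for camera...")
preview.show()

controller = CameraController(fps=15)
controller.frameReady.connect(preview.setPixmap)    # QPixmap frames go straight to the label
controller.errorOccurred.connect(preview.setText)   # show error messages in place of the preview
controller.start()

exit_code = app.exec()
controller.stop()
sys.exit(exit_code)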

View File

@@ -6,8 +6,7 @@ from ui.widgets.color_list_widget import ColorListWidget
from ui.widgets.thumbnail_list_widget import ThumbnailListWidget
from ui.widgets.split_view_widget import SplitView
from .camera_controller import CameraController
from ..core.gphoto_adapter import GPhotoImageSource, GPhotoControlSource
import gphoto2 as gp
class MainController:
def __init__(self, view):
@@ -16,11 +15,7 @@ class MainController:
self.media_repo = MediaRepository(self.db)
self.media_repo.sync_media()
camera = gp.Camera()
camera.init()
stream = GPhotoImageSource(camera=camera, fps=15)
controll = GPhotoControlSource(camera=camera)
self.camera_controller = CameraController(stream, controll)
self.camera_controller = CameraController()
self.view = view
self.color_list: ColorListWidget = view.color_list_widget
@@ -35,7 +30,7 @@ class MainController:
self.thumbnail_list.selectedThumbnail.connect(self.on_thumbnail_selected)
self.camera_controller.errorOccurred.connect(self.split_view.widget_start.set_info_text)
self.camera_controller.new_frame.connect(self.split_view.set_live_image)
self.camera_controller.frameReady.connect(self.split_view.set_live_image)
self.split_view.widget_start.camera_start_btn.clicked.connect(self.camera_controller.start)
def start_camera(self):

View File

@@ -1,26 +1,18 @@
from PySide6.QtCore import QObject, Signal
from PySide6.QtGui import QPixmap
class BaseImageSource(QObject):
new_frame = Signal(QPixmap)
frameReady = Signal(QPixmap)
errorOccurred = Signal(str)
def start(self):
raise NotImplementedError
def start(self): ...
def stop(self): ...
def stop(self):
raise NotImplementedError
class BaseControlSource(QObject):
errorOccurred = Signal(str)
parameterChanged = Signal(str, object)
def set_parameter(self, name: str, value):
raise NotImplementedError
def get_parameter(self, name: str):
raise NotImplementedError
def list_parameters(self) -> dict:
raise NotImplementedError
def set_parameter(self, name: str, value): ...
def get_parameter(self, name: str): ...
def list_parameters(self) -> dict: ...

View File

@@ -0,0 +1,49 @@
from abc import ABC, abstractmethod
class BaseCamera(ABC):
"""Interfejs wspólny dla wszystkich backendów kamer."""
@abstractmethod
def connect(self) -> bool:
"""Nawiązuje połączenie z urządzeniem."""
raise NotImplementedError
@abstractmethod
def disconnect(self):
"""Zamyka połączenie z urządzeniem."""
raise NotImplementedError
@abstractmethod
def start_stream(self):
"""Rozpocznij strumień wideo."""
raise NotImplementedError
@abstractmethod
def stop_stream(self):
"""Zatrzymaj strumień wideo."""
raise NotImplementedError
@abstractmethod
def get_frame(self):
"""Pobierz jedną klatkę liveview."""
raise NotImplementedError
@abstractmethod
def capture_photo(self):
"""Zrób zdjęcie."""
raise NotImplementedError
@abstractmethod
def record_video(self):
"""Nagraj film."""
raise NotImplementedError
@abstractmethod
def get_available_settings(self) -> dict:
"""Zwraca słownik dostępnych ustawień i ich możliwych wartości."""
raise NotImplementedError
@abstractmethod
def set_setting(self, name: str, value) -> bool:
"""Ustawia wybraną wartość dla danego ustawienia."""
raise NotImplementedError
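
The abstract interface above implies a fixed call order for any backend. A small hedged sketch of that lifecycle, assuming an arbitrary concrete subclass; the run_preview helper, its on_frame callback, and the max_frames limit are illustrative only.

def run_preview(camera: BaseCamera, on_frame, max_frames: int = 100):
    """Drive a BaseCamera backend through its expected lifecycle."""
    if not camera.connect():                # establish the device connection first
        return
    try:
        camera.start_stream()               # begin the liveview stream
        for _ in range(max_frames):
            frame = camera.get_frame()      # backend-specific frame object (e.g. a QImage)
            if frame is None:
                break
            on_frame(frame)                 # hand the frame to the UI layer
    finally:
        camera.stop_stream()
        camera.disconnect()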

View File

@@ -0,0 +1,35 @@
from .base_camera import BaseCamera
class GPhotoBackend(BaseCamera):
def __init__(self) -> None:
self.camera = None
self.context = None
self._is_streaming = False
def connect(self) -> bool:
pass
def disconnect(self):
pass
def start_stream(self):
pass
def stop_stream(self):
pass
def get_frame(self):
pass
def capture_photo(self):
pass
def record_video(self):
pass
def get_available_settings(self) -> dict:
pass
def set_setting(self, name: str, value) -> bool:
pass
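
All GPhotoBackend methods are still stubs. As a hedged sketch (an assumption, not part of this commit), connect, disconnect, and get_frame could reuse the gphoto2 calls that CameraWorker and GPhotoImageSource already use in this commit; the helper names below are illustrative.

import gphoto2 as gp
import numpy as np
import cv2
from PySide6.QtGui import QImage

def gphoto_connect():
    """Open the first detected camera; return the camera object, or None on failure."""
    try:
        camera = gp.Camera()
        camera.init()
        return camera
    except gp.GPhoto2Error:
        return None

def gphoto_disconnect(camera):
    """Release the connection to the camera."""
    try:
        camera.exit()
    except gp.GPhoto2Error:
        pass

def gphoto_get_frame(camera):
    """Grab one liveview frame and decode it into a QImage (None if decoding fails)."""
    file = camera.capture_preview()
    data = file.get_data_and_size()
    frame = cv2.imdecode(np.frombuffer(data, dtype=np.uint8), cv2.IMREAD_COLOR)
    if frame is None:
        return None
    rgb = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)
    h, w, ch = rgb.shape
    return QImage(rgb.data, w, h, ch * w, QImage.Format.Format_RGB888)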

View File

@@ -0,0 +1,104 @@
# camera/opencv_camera.py
import cv2
import time
from PySide6.QtGui import QImage, QPixmap
from .base_camera import BaseCamera
class OpenCVCamera(BaseCamera):
"""Implementacja kamery przy użyciu OpenCV."""
def __init__(self, camera_index=0):
self.camera_index = camera_index
self.video_capture = None
self._is_streaming = False
# self._live_view_thread = None # Internal thread for the live view loop
def connect(self) -> bool:
self.video_capture = cv2.VideoCapture(self.camera_index)
if not self.video_capture.isOpened():
# self.error_occurred.emit(f"Nie można otworzyć kamery OpenCV o indeksie {self.camera_index}")
self.video_capture = None
return False
# print("Kamera OpenCV połączona.")
return True
def disconnect(self):
self.stop_stream()
if self.video_capture:
self.video_capture.release()
self.video_capture = None
# print("Kamera OpenCV rozłączona.")
# self.camera_disconnected.emit()
def start_stream(self):
if not self.video_capture or not self.video_capture.isOpened():
# self.error_occurred.emit("Próba uruchomienia podglądu na niepodłączonej kamerze.")
return
if self._is_streaming:
return # Already running
self._is_streaming = True
# We start the loop in this method because the whole class already runs in a dedicated thread
# self._live_view_loop()
def stop_stream(self):
self._is_streaming = False
def get_frame(self):
if not self.video_capture:
return None
ret, frame = self.video_capture.read()
if not ret:
self.stop_stream()
return None
rgb_image = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)
h, w, ch = rgb_image.shape
bytes_per_line = ch * w
qt_image = QImage(rgb_image.data, w, h, bytes_per_line, QImage.Format.Format_RGB888)
return qt_image
def capture_photo(self, save_path: str):
if not self.video_capture or not self.video_capture.isOpened():
# self.error_occurred.emit("Nie można zrobić zdjęcia, kamera nie jest podłączona.")
return
ret, frame = self.video_capture.read()
if ret:
try:
cv2.imwrite(save_path, frame)
print(f"Zdjęcie zapisane w: {save_path}")
# self.photo_captured.emit(save_path)
except Exception as e:
    # self.error_occurred.emit(f"Photo save error: {e}")
    pass
else:
    # self.error_occurred.emit("Failed to capture a frame for the photo.")
    pass
def get_available_settings(self) -> dict:
# This is a simplified implementation
if not self.video_capture:
return {}
return {
"brightness": self.video_capture.get(cv2.CAP_PROP_BRIGHTNESS),
"contrast": self.video_capture.get(cv2.CAP_PROP_CONTRAST),
"saturation": self.video_capture.get(cv2.CAP_PROP_SATURATION),
}
def set_setting(self, name: str, value) -> bool:
if not self.video_capture:
return False
prop_map = {
"brightness": cv2.CAP_PROP_BRIGHTNESS,
"contrast": cv2.CAP_PROP_CONTRAST,
"saturation": cv2.CAP_PROP_SATURATION,
}
if name in prop_map:
return self.video_capture.set(prop_map[name], value)
return False
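
A short usage sketch for OpenCVCamera (not part of this commit): a plain polling loop that converts each returned QImage into a QPixmap for display. The preview_loop helper, its on_pixmap callback, and the 15 fps default are assumptions.

import time
from PySide6.QtGui import QPixmap

def preview_loop(camera: OpenCVCamera, on_pixmap, fps: int = 15):
    """Poll get_frame() and pass each frame to on_pixmap(QPixmap) until reading fails."""
    if not camera.connect():
        return
    camera.start_stream()
    delay = 1.0 / fps
    try:
        while True:
            qimg = camera.get_frame()       # returns a QImage, or None once a read fails
            if qimg is None:
                break
            on_pixmap(QPixmap.fromImage(qimg))
            time.sleep(delay)
    finally:
        camera.disconnect()                 # disconnect() also stops the stream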

View File

@@ -1,69 +0,0 @@
import cv2
import gphoto2 as gp
from controllers.camera_controller import CameraController
from .gphoto_adapter import GPhotoImageSource, GPhotoControlSource
from .opencv_adapter import OpenCVImageSource, OpenCVControlSource
class CameraManager:
def __init__(self):
self.devices = [] # list of detected cameras
def detect_devices(self):
self.devices.clear()
# --- Detect webcams / HDMI grabbers
for index in range(5): # check a few indices
cap = cv2.VideoCapture(index)
if cap.isOpened():
self.devices.append({
"id": f"opencv:{index}",
"name": f"Webcam / HDMI Grabber #{index}",
"type": "opencv",
"index": index
})
cap.release()
# --- Detect gphoto2 cameras
cameras = gp.Camera.autodetect() # type: ignore
for i, (name, addr) in enumerate(cameras):
self.devices.append({
"id": f"gphoto:{i}",
"name": f"{name} ({addr})",
"type": "gphoto",
"addr": addr
})
return self.devices
def create_controller(self, device_id, hybrid_with=None):
"""
Creates a CameraController based on the device id.
You can pass hybrid_with="opencv" or "gphoto" to build a hybrid setup.
"""
device = next((d for d in self.devices if d["id"] == device_id), None)
if not device:
raise ValueError(f"Nie znaleziono urządzenia {device_id}")
# Webcam / grabber
if device["type"] == "opencv":
cap = cv2.VideoCapture(device["index"])
img = OpenCVImageSource(device["index"])
ctrl = OpenCVControlSource(cap)
return CameraController(img, ctrl)
# GPhoto camera
elif device["type"] == "gphoto":
cam = gp.Camera() # type: ignore
cam.init()
img = GPhotoImageSource(cam)
ctrl = GPhotoControlSource(cam)
return CameraController(img, ctrl)
# Hybrid mode
elif device["type"] == "hybrid":
raise NotImplementedError("Tu możesz połączyć OpenCV + GPhoto w hybrydę")
else:
raise ValueError(f"Nieobsługiwany typ urządzenia: {device['type']}")

View File

@@ -1,21 +1,17 @@
import numpy as np
import cv2
from PySide6.QtCore import QObject, QThread, Signal, QTimer
from PySide6.QtGui import QImage, QPixmap
import cv2
import numpy as np
from .base import BaseImageSource, BaseControlSource
import gphoto2 as gp
from .base import BaseControlSource, BaseImageSource
# try:
# import gphoto2 as gp
# except:
# from . import mock_gphoto as gp
from . import mock_gphoto as gp
class GPhotoImageSource(BaseImageSource):
def __init__(self, camera: gp.Camera, fps=10, parent=None): # type: ignore
def __init__(self, camera: gp.Camera, fps=10, parent=None):
super().__init__(parent)
self.camera = camera
self.fps = fps
@@ -26,10 +22,6 @@ class GPhotoImageSource(BaseImageSource):
self.timer.timeout.connect(self._grab_frame)
self.timer.start(int(1000 / self.fps))
def stop(self):
if self.timer:
self.timer.stop()
def _grab_frame(self):
try:
file = self.camera.capture_preview()
@@ -43,14 +35,18 @@ class GPhotoImageSource(BaseImageSource):
h, w, ch = rgb_image.shape
qimg = QImage(rgb_image.data, w, h, ch * w, QImage.Format.Format_RGB888)
pixmap = QPixmap.fromImage(qimg)
self.new_frame.emit(pixmap)
self.frameReady.emit(pixmap)
except gp.GPhoto2Error as e:
self.errorOccurred.emit(f"GPhoto2 error: {e}")
def stop(self):
if self.timer:
self.timer.stop()
class GPhotoControlSource(BaseControlSource):
def __init__(self, camera: gp.Camera, parent=None): # type: ignore
def __init__(self, camera: gp.Camera, parent=None):
super().__init__(parent)
self.camera = camera

View File

@@ -1,59 +1,10 @@
from PySide6.QtCore import QObject, Signal, QTimer
from PySide6.QtGui import QImage, QPixmap
import cv2
import numpy as np
from .base import BaseImageSource, BaseControlSource
class OpenCVImageSource(BaseImageSource):
def __init__(self, device_index=0, fps=30, parent=None):
super().__init__(parent)
self.device_index = device_index
self.fps = fps
self.cap = None
self.timer = None
def start(self):
self.cap = cv2.VideoCapture(self.device_index)
if not self.cap.isOpened():
self.errorOccurred.emit(f"Nie mogę otworzyć kamery {self.device_index}")
return
self.timer = QTimer()
self.timer.timeout.connect(self._grab_frame)
self.timer.start(int(1000 / self.fps))
def stop(self):
if self.timer:
self.timer.stop()
if self.cap:
self.cap.release()
def _grab_frame(self):
if self.cap is None:
self.errorOccurred.emit(f"Kamera niezaincjalizowana!")
return
ret, frame = self.cap.read()
if not ret:
self.errorOccurred.emit("Brak obrazu z kamery OpenCV")
return
rgb_image = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)
h, w, ch = rgb_image.shape
qimg = QImage(rgb_image.data, w, h, ch * w, QImage.Format.Format_RGB888)
pixmap = QPixmap.fromImage(qimg)
self.new_frame.emit(pixmap)
class OpenCVControlSource(BaseControlSource):
def __init__(self, cap: cv2.VideoCapture, parent=None):
super().__init__(parent)
self.cap = cap