6 Commits

7 changed files with 561 additions and 343 deletions

View File

@@ -5,10 +5,13 @@ from core.media import MediaRepository
from ui.widgets.color_list_widget import ColorListWidget from ui.widgets.color_list_widget import ColorListWidget
from ui.widgets.thumbnail_list_widget import ThumbnailListWidget from ui.widgets.thumbnail_list_widget import ThumbnailListWidget
from ui.widgets.split_view_widget import SplitView from ui.widgets.split_view_widget import SplitView
from .camera_controller import CameraController # from .camera_controller import CameraController
from core.camera.camera_controller import CameraController
from core.camera.camera_manager import CameraManager
from core.camera.gphoto_camera import GPhotoCamera from core.camera.gphoto_camera import GPhotoCamera
from core.camera.camera_manager import CameraManager from core.camera.camera_controller import CameraController
class MainController: class MainController:
def __init__(self, view): def __init__(self, view):
@@ -17,9 +20,11 @@ class MainController:
self.media_repo = MediaRepository(self.db) self.media_repo = MediaRepository(self.db)
self.media_repo.sync_media() self.media_repo.sync_media()
camera = GPhotoCamera() # camera = GPhotoCamera()
self.manager = CameraManager(camera) # self.manager = CameraController(camera)
manager = CameraManager()
manager.detect_gphoto()
manager.detect_opencv()
# self.camera_controller = CameraController() # self.camera_controller = CameraController()
@@ -39,9 +44,9 @@ class MainController:
self.thumbnail_list.selectedThumbnail.connect(self.on_thumbnail_selected) self.thumbnail_list.selectedThumbnail.connect(self.on_thumbnail_selected)
# self.camera_controller.errorOccurred.connect(self.split_view.widget_start.set_info_text) # self.camera_controller.errorOccurred.connect(self.split_view.widget_start.set_info_text)
self.manager.error_occurred.connect(self.split_view.widget_start.set_info_text) # self.manager.error_occurred.connect(self.split_view.widget_start.set_info_text)
# self.camera_controller.frameReady.connect(self.split_view.set_live_image) # self.camera_controller.frameReady.connect(self.split_view.set_live_image)
self.manager.frame_ready.connect(self.split_view.set_live_image) # self.manager.frame_ready.connect(self.split_view.set_live_image)
# self.split_view.widget_start.camera_start_btn.clicked.connect(self.camera_controller.start) # self.split_view.widget_start.camera_start_btn.clicked.connect(self.camera_controller.start)
self.split_view.widget_start.camera_start_btn.clicked.connect(self.start_liveview) self.split_view.widget_start.camera_start_btn.clicked.connect(self.start_liveview)
@@ -86,8 +91,10 @@ class MainController:
self.split_view.toglle_live_view() self.split_view.toglle_live_view()
def start_liveview(self): def start_liveview(self):
self.manager.start_camera() pass
# self.manager.start_camera()
# self.manager.start_stream() # self.manager.start_stream()
def shutdown(self): def shutdown(self):
self.manager.stop() pass
# self.manager.stop()

View File

@@ -1,6 +1,16 @@
import cv2 import cv2
import numpy as np import numpy as np
GP_WIDGET_WINDOW = 0
GP_WIDGET_SECTION = 0
GP_WIDGET_TEXT = 0
GP_WIDGET_RANGE = 0
GP_WIDGET_TOGGLE = 0
GP_WIDGET_RADIO = 0
GP_WIDGET_MENU = 0
GP_WIDGET_BUTTON = 0
GP_WIDGET_DATE = 0
class GPhoto2Error(Exception): class GPhoto2Error(Exception):
pass pass
@@ -19,6 +29,65 @@ class CameraFileMock:
return self._data return self._data
return self._data, len(self._data) return self._data, len(self._data)
class CameraListMock:
def count(self):
return 1
def get_name(self, idx):
return f"mock_name {idx}"
def get_value(self, idx):
return f"mock_value {idx}"
class MockPortInfo:
def __init__(self, address):
self.address = address
class PortInfoList:
def __init__(self):
self._ports = []
def load(self):
# Dodaj przykładowe porty
self._ports = [MockPortInfo("usb:001,002"), MockPortInfo("usb:001,003")]
def lookup_path(self, port_address):
for idx, port in enumerate(self._ports):
if port.address == port_address:
return idx
raise ValueError("Port not found")
def __getitem__(self, idx):
return self._ports[idx]
class ConfigMock:
def get_id(self):
return 0
def get_name(self):
return "name"
def get_label(self):
return "label"
def get_type(self):
return 0
def get_value(self):
return "value"
def get_choices(self):
return []
def count_children(self):
return 0
def get_child(self):
return ConfigMock()
class CameraAbilitiesList:
def __init__(self) -> None:
self.abilities = []
def load(self):
return
def lookup_model(self, name):
return 1
def get_abilities(self, abilities_index):
return 0
class Camera: class Camera:
def __init__(self): def __init__(self):
@@ -62,3 +131,18 @@ class Camera:
self._frame_counter += 1 self._frame_counter += 1
return CameraFileMock(frame) return CameraFileMock(frame)
def set_port_info(self, obj):
return False
def get_config(self):
return ConfigMock()
def set_single_config(self, name, widget):
return True
def gp_camera_autodetect():
return CameraListMock()
def check_result(obj):
return obj

View File

@@ -5,8 +5,13 @@ class BaseCamera(ABC):
def __init__(self) -> None: def __init__(self) -> None:
self.error_msg = None self.error_msg = None
@staticmethod
@abstractmethod @abstractmethod
def connect(self) -> bool: def detect() -> dict:
raise NotImplementedError
@abstractmethod
def connect(self, index: int | None = None) -> bool:
raise NotImplementedError raise NotImplementedError
@abstractmethod @abstractmethod

View File

@@ -0,0 +1,115 @@
from PySide6.QtCore import QObject, QThread, QTimer, Signal, Slot, QMutex, QMutexLocker
from PySide6.QtGui import QImage, QPixmap
import cv2
from .base_camera import BaseCamera
class CameraController(QThread):
frame_ready = Signal(QPixmap)
photo_ready = Signal(QPixmap)
error_occurred = Signal(str)
_enable_timer = Signal(bool)
def __init__(self, parent: QObject | None = None) -> None:
super().__init__(parent)
self.camera = None
self.timer = None
self.fps = 15
self.is_streaming = False
self.is_connected = False
self._camera_mutex = QMutex()
self.start()
def run(self) -> None:
self.timer = QTimer()
self.timer.timeout.connect(self._update_frame)
self._enable_timer.connect(self._set_timer)
self.exec()
def stop(self):
self.stop_camera()
self.quit()
self.wait()
def set_camera(self, camera: BaseCamera, fps: int = 15) -> None:
with QMutexLocker(self._camera_mutex):
self.stop_stream()
self.stop_camera()
self.camera = camera
self.fps = fps
def start_camera(self) -> None:
if self.camera is None or self.is_connected:
return
if self.camera.connect():
self.is_connected = True
else:
self.is_connected = False
self.error_occurred.emit(self.camera.get_error_msg())
def stop_camera(self) -> None:
if self.is_streaming:
self.stop_stream()
if self.camera is not None:
self.camera.disconnect()
self.is_connected = False
def start_stream(self):
if not self.is_connected:
return
if self.is_streaming:
return
if self.timer:
self.is_streaming = True
# self.timer.start()
self._enable_timer.emit(True)
def stop_stream(self) -> None:
if self.is_streaming:
self.is_streaming = False
if self.timer:
# self.timer.stop()
self._enable_timer.emit(False)
def _update_frame(self) -> None:
with QMutexLocker(self._camera_mutex):
if self.camera is None or not self.is_connected:
return
if not self.is_streaming:
return
ret, frame = self.camera.get_frame()
if not ret:
self.error_occurred.emit(self.camera.get_error_msg())
return
if frame is not None:
rgb_image = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)
h, w, ch = rgb_image.shape
qimg = QImage(rgb_image.data, w, h, ch * w, QImage.Format.Format_RGB888)
pixmap = QPixmap.fromImage(qimg)
self.frame_ready.emit(pixmap)
def _set_timer(self, enable: bool):
if not self.timer:
return
if enable:
self.timer.setInterval(int(1000 / self.fps))
self.timer.start()
else:
self.timer.stop()

View File

@@ -1,88 +1,20 @@
from PySide6.QtCore import QObject, QThread, QTimer, Signal, Slot
from PySide6.QtGui import QImage, QPixmap
import cv2
from .base_camera import BaseCamera
class CameraManager(QThread): from .gphoto_camera import GPhotoCamera
frame_ready = Signal(QPixmap) from .opencv_camera import OpenCvCamera
photo_ready = Signal(QPixmap) from .camera_controller import CameraController
error_occurred = Signal(str)
def __init__(self, camera: BaseCamera, fps: int = 15, parent: QObject | None = None) -> None:
super().__init__(parent)
self.camera = camera
self.fps = fps
self.timer = None
self.is_streaming = False
self.is_connected = False class CameraManager:
def __init__(self) -> None:
pass
self.timer = QTimer() def detect_gphoto(self):
self.timer.setInterval(int(1000 / self.fps)) camera_list = GPhotoCamera.detect()
self.timer.timeout.connect(self._update_frame) print(camera_list)
self.start() return camera_list
def run(self) -> None: def detect_opencv(self):
# self.timer = QTimer() camera_list = OpenCvCamera.detect()
# self.timer.setInterval(int(1000 / self.fps)) print(camera_list)
# self.timer.timeout.connect(self._update_frame) return camera_list
self.exec()
def start_camera(self) -> None:
if self.is_connected:
return
if self.camera.connect():
self.is_connected = True
else:
self.is_connected = False
self.error_occurred.emit(self.camera.get_error_msg())
def stop_camera(self) -> None:
self.is_streaming = False
self.is_connected = False
if self.timer:
self.timer.stop()
self.camera.disconnect()
def start_stream(self):
if not self.is_connected:
return
if self.is_streaming:
return
if self.timer:
self.is_streaming = True
self.timer.start()
def stop_stream(self) -> None:
if self.is_streaming:
self.is_streaming = False
if self.timer:
self.timer.stop()
def _update_frame(self) -> None:
if not self.is_streaming or not self.is_connected:
return
ret, frame = self.camera.get_frame()
if not ret:
self.error_occurred.emit(self.camera.get_error_msg())
return
if frame is not None:
rgb_image = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)
h, w, ch = rgb_image.shape
qimg = QImage(rgb_image.data, w, h, ch * w, QImage.Format.Format_RGB888)
pixmap = QPixmap.fromImage(qimg)
self.frame_ready.emit(pixmap)
def stop(self):
self.stop_camera()
self.quit()
self.wait()

View File

@@ -1,11 +1,15 @@
from typing import Optional, List from typing import Optional, List
from dataclasses import dataclass, field from dataclasses import dataclass, field
import gphoto2 as gp
import cv2 import cv2
import numpy as np import numpy as np
from .base_camera import BaseCamera from .base_camera import BaseCamera
try:
import gphoto2 as gp # type: ignore
except:
import controllers.mock_gphoto as gp
camera_widget_types = { camera_widget_types = {
gp.GP_WIDGET_WINDOW: "GP_WIDGET_WINDOW", # type: ignore gp.GP_WIDGET_WINDOW: "GP_WIDGET_WINDOW", # type: ignore
gp.GP_WIDGET_SECTION: "GP_WIDGET_SECTION", # type: ignore gp.GP_WIDGET_SECTION: "GP_WIDGET_SECTION", # type: ignore
@@ -18,17 +22,64 @@ camera_widget_types = {
gp.GP_WIDGET_DATE: "GP_WIDGET_DATE", # type: ignore gp.GP_WIDGET_DATE: "GP_WIDGET_DATE", # type: ignore
} }
operations = [
("GP_OPERATION_NONE", gp.GP_OPERATION_NONE), # type: ignore
("GP_OPERATION_CAPTURE_IMAGE", gp.GP_OPERATION_CAPTURE_IMAGE), # type: ignore
("GP_OPERATION_CAPTURE_VIDEO", gp.GP_OPERATION_CAPTURE_VIDEO), # type: ignore
("GP_OPERATION_CAPTURE_AUDIO", gp.GP_OPERATION_CAPTURE_AUDIO), # type: ignore
("GP_OPERATION_CAPTURE_PREVIEW", gp.GP_OPERATION_CAPTURE_PREVIEW), # type: ignore
("GP_OPERATION_CONFIG", gp.GP_OPERATION_CONFIG), # type: ignore
("GP_OPERATION_TRIGGER_CAPTURE", gp.GP_OPERATION_TRIGGER_CAPTURE), # type: ignore
]
class GPhotoCamera(BaseCamera): class GPhotoCamera(BaseCamera):
def __init__(self) -> None: def __init__(self) -> None:
super().__init__() super().__init__()
self.camera = None self.camera = None
self.configs: List[dict] = [] self.configs: List[dict] = []
self.camera_index = 0
def connect(self) -> bool: @staticmethod
def detect() -> dict:
cameras = gp.check_result(gp.gp_camera_autodetect()) # type: ignore
# cameras = gp.Camera().autodetect()
if not cameras or cameras.count() == 0: # type: ignore
return {}
abilities_list = gp.CameraAbilitiesList() # type: ignore
abilities_list.load()
camera_list = {}
for i in range(cameras.count()): # type: ignore
name = cameras.get_name(i) # type: ignore
port = cameras.get_value(i) # type: ignore
abilities_index = abilities_list.lookup_model(name)
abilities = abilities_list.get_abilities(abilities_index)
abilities_name = []
for name, bit in operations:
if abilities.operations & bit: # type: ignore
abilities_name.append(name)
camera_list[i] = {"name": name, "port": port, "abilities": abilities_name}
return camera_list
def connect(self, index: int | None = None) -> bool:
self.error_msg = None self.error_msg = None
try:
self.camera = gp.Camera() # type: ignore self.camera = gp.Camera() # type: ignore
try:
if index:
self.camera_index = index
camera_list = GPhotoCamera.detect()
port_info_list = gp.PortInfoList()
port_info_list.load()
port_address = camera_list[index]["port"]
port_index = port_info_list.lookup_path(port_address)
self.camera.set_port_info(port_info_list[port_index])
self.camera.init() self.camera.init()
config = self.camera.get_config() config = self.camera.get_config()
self.read_config(config) self.read_config(config)
@@ -72,7 +123,7 @@ class GPhotoCamera(BaseCamera):
if value not in config['choices']: if value not in config['choices']:
return return
config['config'].set_value(value) # type: ignore config['widget'].set_value(value) # type: ignore
if self._save_config(config): if self._save_config(config):
config['value'] = value config['value'] = value
@@ -90,7 +141,7 @@ class GPhotoCamera(BaseCamera):
if not self.camera: if not self.camera:
return False return False
self.camera.set_single.config(config['name'], config['widget']) self.camera.set_single_config(config['name'], config['widget'])
return True return True
def parse_config(self, config): def parse_config(self, config):

View File

@@ -1,12 +1,11 @@
import cv2 import cv2
from cv2_enumerate_cameras import enumerate_cameras
from typing import List from typing import List
from .base_camera import BaseCamera from .base_camera import BaseCamera
class OpenCvCamera(BaseCamera):
class CvCamera(BaseCamera):
"""Kamera oparta na cv2.VideoCapture""" """Kamera oparta na cv2.VideoCapture"""
config_map = { config_map = {
@@ -18,18 +17,42 @@ class CvCamera(BaseCamera):
5: {"name": "saturation", "cv_prop": cv2.CAP_PROP_SATURATION, "default": 0.5}, 5: {"name": "saturation", "cv_prop": cv2.CAP_PROP_SATURATION, "default": 0.5},
} }
def __init__(self, device_index: int = 0) -> None: def __init__(self) -> None:
super().__init__() super().__init__()
self.device_index = device_index
self.cap = None self.cap = None
self.configs: List[dict] = [] self.configs: List[dict] = []
self.camera_list = []
self.camera_index = 0
def connect(self) -> bool: @staticmethod
def detect():
camera_list = enumerate_cameras(cv2.CAP_ANY)
result = {}
for camera in camera_list:
cap = cv2.VideoCapture(camera.index, camera.backend)
# ret, frame = cap.read()
cap.release()
# if ret and frame is not None and frame.size > 0:
result[camera.index] = {
"name": camera.name,
"port": camera.path,
"backend": camera.backend,
}
return result
def connect(self, index: int | None = None) -> bool:
self.error_msg = None self.error_msg = None
try: try:
self.cap = cv2.VideoCapture(self.device_index) if index:
self.camera_index = index
self.cap = cv2.VideoCapture(self.camera_index)
if not self.cap.isOpened(): if not self.cap.isOpened():
self.error_msg = f"[CV2] Could not open camera {self.device_index}" self.error_msg = f"[CV2] Could not open camera {self.camera_index}"
return False return False
self.configs.clear() self.configs.clear()
@@ -85,7 +108,8 @@ class CvCamera(BaseCamera):
return return
try: try:
self.cap.set(config["cv_prop"], value) self.cap.set(config["cv_prop"], value)
config["value"] = self.cap.get(config["cv_prop"]) # sprawdz co ustawiło config["value"] = self.cap.get(
config["cv_prop"]) # sprawdz co ustawiło
except Exception as e: except Exception as e:
self.error_msg = f"[CV2] {e}" self.error_msg = f"[CV2] {e}"