6 Commits

7 changed files with 561 additions and 343 deletions

View File

@@ -5,89 +5,96 @@ from core.media import MediaRepository
from ui.widgets.color_list_widget import ColorListWidget from ui.widgets.color_list_widget import ColorListWidget
from ui.widgets.thumbnail_list_widget import ThumbnailListWidget from ui.widgets.thumbnail_list_widget import ThumbnailListWidget
from ui.widgets.split_view_widget import SplitView from ui.widgets.split_view_widget import SplitView
from .camera_controller import CameraController # from .camera_controller import CameraController
from core.camera.camera_controller import CameraController
from core.camera.gphoto_camera import GPhotoCamera
from core.camera.camera_manager import CameraManager from core.camera.camera_manager import CameraManager
from core.camera.gphoto_camera import GPhotoCamera
from core.camera.camera_controller import CameraController
class MainController: class MainController:
def __init__(self, view): def __init__(self, view):
self.db = DatabaseManager() self.db = DatabaseManager()
self.db.connect() self.db.connect()
self.media_repo = MediaRepository(self.db) self.media_repo = MediaRepository(self.db)
self.media_repo.sync_media() self.media_repo.sync_media()
camera = GPhotoCamera() # camera = GPhotoCamera()
self.manager = CameraManager(camera) # self.manager = CameraController(camera)
manager = CameraManager()
manager.detect_gphoto()
manager.detect_opencv()
# self.camera_controller = CameraController()
self.view = view
self.color_list: ColorListWidget = view.color_list_widget
self.thumbnail_list: ThumbnailListWidget = view.thumbnail_widget
self.split_view: SplitView = view.preview_widget
self.photo_button: QPushButton = view.photo_button
self.photo_button.clicked.connect(self.take_photo)
self.record_button: QPushButton = view.record_button
# self.record_button.clicked.connect(self.fun_test)
self.color_list.colorSelected.connect(self.on_color_selected)
self.color_list.editColor.connect(self.on_edit_color)
self.thumbnail_list.selectedThumbnail.connect(self.on_thumbnail_selected)
# self.camera_controller.errorOccurred.connect(self.split_view.widget_start.set_info_text)
# self.manager.error_occurred.connect(self.split_view.widget_start.set_info_text)
# self.camera_controller.frameReady.connect(self.split_view.set_live_image)
# self.manager.frame_ready.connect(self.split_view.set_live_image)
# self.split_view.widget_start.camera_start_btn.clicked.connect(self.camera_controller.start)
self.split_view.widget_start.camera_start_btn.clicked.connect(self.start_liveview)
# self.camera_controller = CameraController() def start_camera(self):
pass
self.view = view def load_colors(self) -> None:
self.color_list: ColorListWidget = view.color_list_widget colors = self.db.get_all_colors()
self.thumbnail_list: ThumbnailListWidget = view.thumbnail_widget print("Loaded colors:", colors)
self.split_view: SplitView = view.preview_widget self.color_list.set_colors(colors)
self.photo_button: QPushButton = view.photo_button
self.photo_button.clicked.connect(self.take_photo)
self.record_button: QPushButton = view.record_button
# self.record_button.clicked.connect(self.fun_test)
self.color_list.colorSelected.connect(self.on_color_selected)
self.color_list.editColor.connect(self.on_edit_color)
self.thumbnail_list.selectedThumbnail.connect(self.on_thumbnail_selected)
# self.camera_controller.errorOccurred.connect(self.split_view.widget_start.set_info_text)
self.manager.error_occurred.connect(self.split_view.widget_start.set_info_text)
# self.camera_controller.frameReady.connect(self.split_view.set_live_image)
self.manager.frame_ready.connect(self.split_view.set_live_image)
# self.split_view.widget_start.camera_start_btn.clicked.connect(self.camera_controller.start)
self.split_view.widget_start.camera_start_btn.clicked.connect(self.start_liveview)
def start_camera(self): def on_color_selected(self, color_name: str):
pass print(f"Wybrano kolor: {color_name}")
color_id = self.db.get_color_id(color_name)
if color_id is not None:
media_items = self.db.get_media_for_color(color_id)
print(f"Media dla koloru {color_name} (ID: {color_id}):", media_items)
def load_colors(self) -> None: self.thumbnail_list.list_widget.clear()
colors = self.db.get_all_colors() for media in media_items:
print("Loaded colors:", colors) if media['file_type'] == 'photo':
self.color_list.set_colors(colors) file_name = Path(media['media_path']).name
self.thumbnail_list.add_thumbnail(media['media_path'], file_name, media['id'])
else:
print(f"Nie znaleziono koloru o nazwie: {color_name}")
def on_edit_color(self, color_name: str):
print(f"Edycja koloru: {color_name}")
def on_color_selected(self, color_name: str): def on_thumbnail_selected(self, media_id: int):
print(f"Wybrano kolor: {color_name}") media = self.db.get_media(media_id)
color_id = self.db.get_color_id(color_name) if media:
if color_id is not None: print(f"Wybrano miniaturę o ID: {media_id}, ścieżka: {media['media_path']}")
media_items = self.db.get_media_for_color(color_id) self.split_view.set_reference_image(media['media_path'])
print(f"Media dla koloru {color_name} (ID: {color_id}):", media_items) else:
print(f"Nie znaleziono mediów o ID: {media_id}")
self.thumbnail_list.list_widget.clear() def take_photo(self):
for media in media_items: print("Robienie zdjęcia...")
if media['file_type'] == 'photo': self.split_view.toglle_live_view()
file_name = Path(media['media_path']).name
self.thumbnail_list.add_thumbnail(media['media_path'], file_name, media['id'])
else:
print(f"Nie znaleziono koloru o nazwie: {color_name}")
def on_edit_color(self, color_name: str): def start_liveview(self):
print(f"Edycja koloru: {color_name}") pass
# self.manager.start_camera()
# self.manager.start_stream()
def on_thumbnail_selected(self, media_id: int): def shutdown(self):
media = self.db.get_media(media_id) pass
if media: # self.manager.stop()
print(f"Wybrano miniaturę o ID: {media_id}, ścieżka: {media['media_path']}")
self.split_view.set_reference_image(media['media_path'])
else:
print(f"Nie znaleziono mediów o ID: {media_id}")
def take_photo(self):
print("Robienie zdjęcia...")
self.split_view.toglle_live_view()
def start_liveview(self):
self.manager.start_camera()
# self.manager.start_stream()
def shutdown(self):
self.manager.stop()

View File

@@ -1,6 +1,16 @@
import cv2 import cv2
import numpy as np import numpy as np
GP_WIDGET_WINDOW = 0
GP_WIDGET_SECTION = 0
GP_WIDGET_TEXT = 0
GP_WIDGET_RANGE = 0
GP_WIDGET_TOGGLE = 0
GP_WIDGET_RADIO = 0
GP_WIDGET_MENU = 0
GP_WIDGET_BUTTON = 0
GP_WIDGET_DATE = 0
class GPhoto2Error(Exception): class GPhoto2Error(Exception):
pass pass
@@ -19,6 +29,65 @@ class CameraFileMock:
return self._data return self._data
return self._data, len(self._data) return self._data, len(self._data)
class CameraListMock:
def count(self):
return 1
def get_name(self, idx):
return f"mock_name {idx}"
def get_value(self, idx):
return f"mock_value {idx}"
class MockPortInfo:
def __init__(self, address):
self.address = address
class PortInfoList:
def __init__(self):
self._ports = []
def load(self):
# Dodaj przykładowe porty
self._ports = [MockPortInfo("usb:001,002"), MockPortInfo("usb:001,003")]
def lookup_path(self, port_address):
for idx, port in enumerate(self._ports):
if port.address == port_address:
return idx
raise ValueError("Port not found")
def __getitem__(self, idx):
return self._ports[idx]
class ConfigMock:
def get_id(self):
return 0
def get_name(self):
return "name"
def get_label(self):
return "label"
def get_type(self):
return 0
def get_value(self):
return "value"
def get_choices(self):
return []
def count_children(self):
return 0
def get_child(self):
return ConfigMock()
class CameraAbilitiesList:
def __init__(self) -> None:
self.abilities = []
def load(self):
return
def lookup_model(self, name):
return 1
def get_abilities(self, abilities_index):
return 0
class Camera: class Camera:
def __init__(self): def __init__(self):
@@ -62,3 +131,18 @@ class Camera:
self._frame_counter += 1 self._frame_counter += 1
return CameraFileMock(frame) return CameraFileMock(frame)
def set_port_info(self, obj):
return False
def get_config(self):
return ConfigMock()
def set_single_config(self, name, widget):
return True
def gp_camera_autodetect():
return CameraListMock()
def check_result(obj):
return obj

View File

@@ -2,36 +2,41 @@ from abc import ABC, abstractmethod
class BaseCamera(ABC): class BaseCamera(ABC):
def __init__(self) -> None: def __init__(self) -> None:
self.error_msg = None self.error_msg = None
@abstractmethod @staticmethod
def connect(self) -> bool: @abstractmethod
raise NotImplementedError def detect() -> dict:
raise NotImplementedError
@abstractmethod @abstractmethod
def disconnect(self) -> None: def connect(self, index: int | None = None) -> bool:
raise NotImplementedError raise NotImplementedError
@abstractmethod @abstractmethod
def get_frame(self): def disconnect(self) -> None:
raise NotImplementedError raise NotImplementedError
@abstractmethod @abstractmethod
def get_config_by_id(self, id: int) -> dict: def get_frame(self):
raise NotImplementedError raise NotImplementedError
@abstractmethod @abstractmethod
def get_config_by_name(self, name: str) -> dict: def get_config_by_id(self, id: int) -> dict:
raise NotImplementedError raise NotImplementedError
@abstractmethod @abstractmethod
def set_config_by_id(self, id: int, value) -> None: def get_config_by_name(self, name: str) -> dict:
raise NotImplementedError raise NotImplementedError
@abstractmethod @abstractmethod
def set_config_by_name(self, name: str, value) -> None: def set_config_by_id(self, id: int, value) -> None:
raise NotImplementedError raise NotImplementedError
def get_error_msg(self): @abstractmethod
return str(self.error_msg) def set_config_by_name(self, name: str, value) -> None:
raise NotImplementedError
def get_error_msg(self):
return str(self.error_msg)

View File

@@ -0,0 +1,115 @@
from PySide6.QtCore import QObject, QThread, QTimer, Signal, Slot, QMutex, QMutexLocker
from PySide6.QtGui import QImage, QPixmap
import cv2
from .base_camera import BaseCamera
class CameraController(QThread):
frame_ready = Signal(QPixmap)
photo_ready = Signal(QPixmap)
error_occurred = Signal(str)
_enable_timer = Signal(bool)
def __init__(self, parent: QObject | None = None) -> None:
super().__init__(parent)
self.camera = None
self.timer = None
self.fps = 15
self.is_streaming = False
self.is_connected = False
self._camera_mutex = QMutex()
self.start()
def run(self) -> None:
self.timer = QTimer()
self.timer.timeout.connect(self._update_frame)
self._enable_timer.connect(self._set_timer)
self.exec()
def stop(self):
self.stop_camera()
self.quit()
self.wait()
def set_camera(self, camera: BaseCamera, fps: int = 15) -> None:
with QMutexLocker(self._camera_mutex):
self.stop_stream()
self.stop_camera()
self.camera = camera
self.fps = fps
def start_camera(self) -> None:
if self.camera is None or self.is_connected:
return
if self.camera.connect():
self.is_connected = True
else:
self.is_connected = False
self.error_occurred.emit(self.camera.get_error_msg())
def stop_camera(self) -> None:
if self.is_streaming:
self.stop_stream()
if self.camera is not None:
self.camera.disconnect()
self.is_connected = False
def start_stream(self):
if not self.is_connected:
return
if self.is_streaming:
return
if self.timer:
self.is_streaming = True
# self.timer.start()
self._enable_timer.emit(True)
def stop_stream(self) -> None:
if self.is_streaming:
self.is_streaming = False
if self.timer:
# self.timer.stop()
self._enable_timer.emit(False)
def _update_frame(self) -> None:
with QMutexLocker(self._camera_mutex):
if self.camera is None or not self.is_connected:
return
if not self.is_streaming:
return
ret, frame = self.camera.get_frame()
if not ret:
self.error_occurred.emit(self.camera.get_error_msg())
return
if frame is not None:
rgb_image = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)
h, w, ch = rgb_image.shape
qimg = QImage(rgb_image.data, w, h, ch * w, QImage.Format.Format_RGB888)
pixmap = QPixmap.fromImage(qimg)
self.frame_ready.emit(pixmap)
def _set_timer(self, enable: bool):
if not self.timer:
return
if enable:
self.timer.setInterval(int(1000 / self.fps))
self.timer.start()
else:
self.timer.stop()

View File

@@ -1,88 +1,20 @@
from PySide6.QtCore import QObject, QThread, QTimer, Signal, Slot
from PySide6.QtGui import QImage, QPixmap
import cv2
from .base_camera import BaseCamera
class CameraManager(QThread): from .gphoto_camera import GPhotoCamera
frame_ready = Signal(QPixmap) from .opencv_camera import OpenCvCamera
photo_ready = Signal(QPixmap) from .camera_controller import CameraController
error_occurred = Signal(str)
def __init__(self, camera: BaseCamera, fps: int = 15, parent: QObject | None = None) -> None:
super().__init__(parent)
self.camera = camera
self.fps = fps
self.timer = None
self.is_streaming = False
self.is_connected = False class CameraManager:
def __init__(self) -> None:
pass
self.timer = QTimer() def detect_gphoto(self):
self.timer.setInterval(int(1000 / self.fps)) camera_list = GPhotoCamera.detect()
self.timer.timeout.connect(self._update_frame) print(camera_list)
self.start() return camera_list
def run(self) -> None: def detect_opencv(self):
# self.timer = QTimer() camera_list = OpenCvCamera.detect()
# self.timer.setInterval(int(1000 / self.fps)) print(camera_list)
# self.timer.timeout.connect(self._update_frame) return camera_list
self.exec()
def start_camera(self) -> None:
if self.is_connected:
return
if self.camera.connect():
self.is_connected = True
else:
self.is_connected = False
self.error_occurred.emit(self.camera.get_error_msg())
def stop_camera(self) -> None:
self.is_streaming = False
self.is_connected = False
if self.timer:
self.timer.stop()
self.camera.disconnect()
def start_stream(self):
if not self.is_connected:
return
if self.is_streaming:
return
if self.timer:
self.is_streaming = True
self.timer.start()
def stop_stream(self) -> None:
if self.is_streaming:
self.is_streaming = False
if self.timer:
self.timer.stop()
def _update_frame(self) -> None:
if not self.is_streaming or not self.is_connected:
return
ret, frame = self.camera.get_frame()
if not ret:
self.error_occurred.emit(self.camera.get_error_msg())
return
if frame is not None:
rgb_image = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)
h, w, ch = rgb_image.shape
qimg = QImage(rgb_image.data, w, h, ch * w, QImage.Format.Format_RGB888)
pixmap = QPixmap.fromImage(qimg)
self.frame_ready.emit(pixmap)
def stop(self):
self.stop_camera()
self.quit()
self.wait()

View File

@@ -1,122 +1,173 @@
from typing import Optional, List from typing import Optional, List
from dataclasses import dataclass, field from dataclasses import dataclass, field
import gphoto2 as gp
import cv2 import cv2
import numpy as np import numpy as np
from .base_camera import BaseCamera from .base_camera import BaseCamera
try:
import gphoto2 as gp # type: ignore
except:
import controllers.mock_gphoto as gp
camera_widget_types = { camera_widget_types = {
gp.GP_WIDGET_WINDOW: "GP_WIDGET_WINDOW", # type: ignore gp.GP_WIDGET_WINDOW: "GP_WIDGET_WINDOW", # type: ignore
gp.GP_WIDGET_SECTION: "GP_WIDGET_SECTION", # type: ignore gp.GP_WIDGET_SECTION: "GP_WIDGET_SECTION", # type: ignore
gp.GP_WIDGET_TEXT: "GP_WIDGET_TEXT", # type: ignore gp.GP_WIDGET_TEXT: "GP_WIDGET_TEXT", # type: ignore
gp.GP_WIDGET_RANGE: "GP_WIDGET_RANGE", # type: ignore gp.GP_WIDGET_RANGE: "GP_WIDGET_RANGE", # type: ignore
gp.GP_WIDGET_TOGGLE: "GP_WIDGET_TOGGLE", # type: ignore gp.GP_WIDGET_TOGGLE: "GP_WIDGET_TOGGLE", # type: ignore
gp.GP_WIDGET_RADIO: "GP_WIDGET_RADIO", # type: ignore gp.GP_WIDGET_RADIO: "GP_WIDGET_RADIO", # type: ignore
gp.GP_WIDGET_MENU: "GP_WIDGET_MENU", # type: ignore gp.GP_WIDGET_MENU: "GP_WIDGET_MENU", # type: ignore
gp.GP_WIDGET_BUTTON: "GP_WIDGET_BUTTON", # type: ignore gp.GP_WIDGET_BUTTON: "GP_WIDGET_BUTTON", # type: ignore
gp.GP_WIDGET_DATE: "GP_WIDGET_DATE", # type: ignore gp.GP_WIDGET_DATE: "GP_WIDGET_DATE", # type: ignore
} }
operations = [
("GP_OPERATION_NONE", gp.GP_OPERATION_NONE), # type: ignore
("GP_OPERATION_CAPTURE_IMAGE", gp.GP_OPERATION_CAPTURE_IMAGE), # type: ignore
("GP_OPERATION_CAPTURE_VIDEO", gp.GP_OPERATION_CAPTURE_VIDEO), # type: ignore
("GP_OPERATION_CAPTURE_AUDIO", gp.GP_OPERATION_CAPTURE_AUDIO), # type: ignore
("GP_OPERATION_CAPTURE_PREVIEW", gp.GP_OPERATION_CAPTURE_PREVIEW), # type: ignore
("GP_OPERATION_CONFIG", gp.GP_OPERATION_CONFIG), # type: ignore
("GP_OPERATION_TRIGGER_CAPTURE", gp.GP_OPERATION_TRIGGER_CAPTURE), # type: ignore
]
class GPhotoCamera(BaseCamera): class GPhotoCamera(BaseCamera):
def __init__(self) -> None: def __init__(self) -> None:
super().__init__() super().__init__()
self.camera = None self.camera = None
self.configs: List[dict] = [] self.configs: List[dict] = []
self.camera_index = 0
def connect(self) -> bool: @staticmethod
self.error_msg = None def detect() -> dict:
try: cameras = gp.check_result(gp.gp_camera_autodetect()) # type: ignore
self.camera = gp.Camera() # type: ignore # cameras = gp.Camera().autodetect()
self.camera.init() if not cameras or cameras.count() == 0: # type: ignore
config = self.camera.get_config() return {}
self.read_config(config)
return True
except Exception as e:
self.error_msg = f"[GPHOTO2] {e}"
self.camera = None
return False
def disconnect(self) -> None: abilities_list = gp.CameraAbilitiesList() # type: ignore
if self.camera: abilities_list.load()
self.camera.exit() camera_list = {}
self.camera = None for i in range(cameras.count()): # type: ignore
self.configs.clear() name = cameras.get_name(i) # type: ignore
port = cameras.get_value(i) # type: ignore
def get_frame(self): abilities_index = abilities_list.lookup_model(name)
self.error_msg = None abilities = abilities_list.get_abilities(abilities_index)
abilities_name = []
for name, bit in operations:
if abilities.operations & bit: # type: ignore
abilities_name.append(name)
if self.camera is None: camera_list[i] = {"name": name, "port": port, "abilities": abilities_name}
self.error_msg = "[GPHOTO2] Camera is not initialized." return camera_list
return (False, None)
try: def connect(self, index: int | None = None) -> bool:
file = self.camera.capture_preview() # type: ignore self.error_msg = None
data = file.get_data_and_size() self.camera = gp.Camera() # type: ignore
frame = np.frombuffer(data, dtype=np.uint8)
frame = cv2.imdecode(frame, cv2.IMREAD_COLOR)
return (True, frame) try:
except Exception as e: if index:
self.error_msg = f"[GPHOTO2] {e}" self.camera_index = index
return (False, None) camera_list = GPhotoCamera.detect()
port_info_list = gp.PortInfoList()
port_info_list.load()
def get_config_by_id(self, id: int): port_address = camera_list[index]["port"]
return next(w for w in self.configs if w['id'] == id) port_index = port_info_list.lookup_path(port_address)
def get_config_by_name(self, name: str): self.camera.set_port_info(port_info_list[port_index])
return next(w for w in self.configs if w['name'] == name)
def set_config(self, config, value): self.camera.init()
if value not in config['choices']: config = self.camera.get_config()
return self.read_config(config)
return True
except Exception as e:
self.error_msg = f"[GPHOTO2] {e}"
self.camera = None
return False
config['config'].set_value(value) # type: ignore def disconnect(self) -> None:
if self._save_config(config): if self.camera:
config['value'] = value self.camera.exit()
self.camera = None
self.configs.clear()
def set_config_by_id(self, id: int, value: str): def get_frame(self):
config = self.get_config_by_id(id) self.error_msg = None
self.set_config(config, value) if self.camera is None:
self.error_msg = "[GPHOTO2] Camera is not initialized."
return (False, None)
def set_config_by_name(self, name: str, value: str): try:
config = self.get_config_by_name(name) file = self.camera.capture_preview() # type: ignore
data = file.get_data_and_size()
frame = np.frombuffer(data, dtype=np.uint8)
frame = cv2.imdecode(frame, cv2.IMREAD_COLOR)
self.set_config(config, value) return (True, frame)
except Exception as e:
self.error_msg = f"[GPHOTO2] {e}"
return (False, None)
def _save_config(self, config): def get_config_by_id(self, id: int):
if not self.camera: return next(w for w in self.configs if w['id'] == id)
return False
self.camera.set_single.config(config['name'], config['widget']) def get_config_by_name(self, name: str):
return True return next(w for w in self.configs if w['name'] == name)
def parse_config(self, config): def set_config(self, config, value):
new_config = { if value not in config['choices']:
"id": config.get_id(), return
"name": config.get_name(),
"label": config.get_label(),
"type": camera_widget_types[config.get_type()],
"widget": config
}
try: config['widget'].set_value(value) # type: ignore
new_config["value"] = config.get_value() if self._save_config(config):
except gp.GPhoto2Error: config['value'] = value
pass
try: def set_config_by_id(self, id: int, value: str):
new_config["choices"] = list(config.get_choices()) config = self.get_config_by_id(id)
except gp.GPhoto2Error:
pass
return new_config self.set_config(config, value)
def read_config(self, config): def set_config_by_name(self, name: str, value: str):
self.configs.append(self.parse_config(config)) config = self.get_config_by_name(name)
for i in range(config.count_children()): self.set_config(config, value)
child = config.get_child(i)
self.read_config(child) def _save_config(self, config):
if not self.camera:
return False
self.camera.set_single_config(config['name'], config['widget'])
return True
def parse_config(self, config):
new_config = {
"id": config.get_id(),
"name": config.get_name(),
"label": config.get_label(),
"type": camera_widget_types[config.get_type()],
"widget": config
}
try:
new_config["value"] = config.get_value()
except gp.GPhoto2Error:
pass
try:
new_config["choices"] = list(config.get_choices())
except gp.GPhoto2Error:
pass
return new_config
def read_config(self, config):
self.configs.append(self.parse_config(config))
for i in range(config.count_children()):
child = config.get_child(i)
self.read_config(child)

View File

@@ -1,98 +1,122 @@
import cv2 import cv2
from cv2_enumerate_cameras import enumerate_cameras
from typing import List from typing import List
from .base_camera import BaseCamera from .base_camera import BaseCamera
class OpenCvCamera(BaseCamera):
"""Kamera oparta na cv2.VideoCapture"""
config_map = {
0: {"name": "frame_width", "cv_prop": cv2.CAP_PROP_FRAME_WIDTH, "default": 640},
1: {"name": "frame_height", "cv_prop": cv2.CAP_PROP_FRAME_HEIGHT, "default": 480},
2: {"name": "fps", "cv_prop": cv2.CAP_PROP_FPS, "default": 30},
3: {"name": "brightness", "cv_prop": cv2.CAP_PROP_BRIGHTNESS, "default": 0.5},
4: {"name": "contrast", "cv_prop": cv2.CAP_PROP_CONTRAST, "default": 0.5},
5: {"name": "saturation", "cv_prop": cv2.CAP_PROP_SATURATION, "default": 0.5},
}
class CvCamera(BaseCamera): def __init__(self) -> None:
"""Kamera oparta na cv2.VideoCapture""" super().__init__()
self.cap = None
self.configs: List[dict] = []
self.camera_list = []
self.camera_index = 0
config_map = { @staticmethod
0: {"name": "frame_width", "cv_prop": cv2.CAP_PROP_FRAME_WIDTH, "default": 640}, def detect():
1: {"name": "frame_height", "cv_prop": cv2.CAP_PROP_FRAME_HEIGHT, "default": 480}, camera_list = enumerate_cameras(cv2.CAP_ANY)
2: {"name": "fps", "cv_prop": cv2.CAP_PROP_FPS, "default": 30}, result = {}
3: {"name": "brightness", "cv_prop": cv2.CAP_PROP_BRIGHTNESS, "default": 0.5},
4: {"name": "contrast", "cv_prop": cv2.CAP_PROP_CONTRAST, "default": 0.5},
5: {"name": "saturation", "cv_prop": cv2.CAP_PROP_SATURATION, "default": 0.5},
}
def __init__(self, device_index: int = 0) -> None: for camera in camera_list:
super().__init__() cap = cv2.VideoCapture(camera.index, camera.backend)
self.device_index = device_index # ret, frame = cap.read()
self.cap = None cap.release()
self.configs: List[dict] = []
def connect(self) -> bool: # if ret and frame is not None and frame.size > 0:
self.error_msg = None result[camera.index] = {
try: "name": camera.name,
self.cap = cv2.VideoCapture(self.device_index) "port": camera.path,
if not self.cap.isOpened(): "backend": camera.backend,
self.error_msg = f"[CV2] Could not open camera {self.device_index}" }
return False
self.configs.clear() return result
for id, conf in self.config_map.items():
value = self.cap.get(conf["cv_prop"])
self.configs.append(
{
"id": id,
"name": conf["name"],
"label": conf["name"].capitalize(),
"value": value,
"choices": None, # brak predefiniowanych wyborów
"cv_prop": conf["cv_prop"],
}
)
return True
except Exception as e: def connect(self, index: int | None = None) -> bool:
self.error_msg = f"[CV2] {e}" self.error_msg = None
self.cap = None try:
return False if index:
self.camera_index = index
def disconnect(self) -> None: self.cap = cv2.VideoCapture(self.camera_index)
if self.cap:
self.cap.release()
self.cap = None
self.configs.clear()
def get_frame(self): if not self.cap.isOpened():
self.error_msg = None self.error_msg = f"[CV2] Could not open camera {self.camera_index}"
if self.cap is None or not self.cap.isOpened(): return False
self.error_msg = "[CV2] Camera is not initialized."
return (False, None)
try: self.configs.clear()
ret, frame = self.cap.read() for id, conf in self.config_map.items():
if not ret: value = self.cap.get(conf["cv_prop"])
self.error_msg = "[CV2] Failed to read frame." self.configs.append(
return (False, None) {
return (True, frame) "id": id,
except Exception as e: "name": conf["name"],
self.error_msg = f"[CV2] {e}" "label": conf["name"].capitalize(),
return (False, None) "value": value,
"choices": None, # brak predefiniowanych wyborów
"cv_prop": conf["cv_prop"],
}
)
return True
def get_config_by_id(self, id: int): except Exception as e:
return next(w for w in self.configs if w["id"] == id) self.error_msg = f"[CV2] {e}"
self.cap = None
return False
def get_config_by_name(self, name: str): def disconnect(self) -> None:
return next(w for w in self.configs if w["name"] == name) if self.cap:
self.cap.release()
self.cap = None
self.configs.clear()
def set_config(self, config, value: float): def get_frame(self):
if not self.cap: self.error_msg = None
return if self.cap is None or not self.cap.isOpened():
try: self.error_msg = "[CV2] Camera is not initialized."
self.cap.set(config["cv_prop"], value) return (False, None)
config["value"] = self.cap.get(config["cv_prop"]) # sprawdz co ustawiło
except Exception as e:
self.error_msg = f"[CV2] {e}"
def set_config_by_id(self, id: int, value: float): try:
config = self.get_config_by_id(id) ret, frame = self.cap.read()
self.set_config(config, value) if not ret:
self.error_msg = "[CV2] Failed to read frame."
return (False, None)
return (True, frame)
except Exception as e:
self.error_msg = f"[CV2] {e}"
return (False, None)
def set_config_by_name(self, name: str, value: float): def get_config_by_id(self, id: int):
config = self.get_config_by_name(name) return next(w for w in self.configs if w["id"] == id)
self.set_config(config, value)
def get_config_by_name(self, name: str):
return next(w for w in self.configs if w["name"] == name)
def set_config(self, config, value: float):
if not self.cap:
return
try:
self.cap.set(config["cv_prop"], value)
config["value"] = self.cap.get(
config["cv_prop"]) # sprawdz co ustawiło
except Exception as e:
self.error_msg = f"[CV2] {e}"
def set_config_by_id(self, id: int, value: float):
config = self.get_config_by_id(id)
self.set_config(config, value)
def set_config_by_name(self, name: str, value: float):
config = self.get_config_by_name(name)
self.set_config(config, value)