6 Commits

7 changed files with 561 additions and 343 deletions

View File

@@ -5,89 +5,96 @@ from core.media import MediaRepository
from ui.widgets.color_list_widget import ColorListWidget from ui.widgets.color_list_widget import ColorListWidget
from ui.widgets.thumbnail_list_widget import ThumbnailListWidget from ui.widgets.thumbnail_list_widget import ThumbnailListWidget
from ui.widgets.split_view_widget import SplitView from ui.widgets.split_view_widget import SplitView
from .camera_controller import CameraController # from .camera_controller import CameraController
from core.camera.camera_controller import CameraController
from core.camera.gphoto_camera import GPhotoCamera
from core.camera.camera_manager import CameraManager from core.camera.camera_manager import CameraManager
from core.camera.gphoto_camera import GPhotoCamera
from core.camera.camera_controller import CameraController
class MainController: class MainController:
def __init__(self, view): def __init__(self, view):
self.db = DatabaseManager() self.db = DatabaseManager()
self.db.connect() self.db.connect()
self.media_repo = MediaRepository(self.db) self.media_repo = MediaRepository(self.db)
self.media_repo.sync_media() self.media_repo.sync_media()
camera = GPhotoCamera() # camera = GPhotoCamera()
self.manager = CameraManager(camera) # self.manager = CameraController(camera)
manager = CameraManager()
manager.detect_gphoto()
manager.detect_opencv()
# self.camera_controller = CameraController()
self.view = view
self.color_list: ColorListWidget = view.color_list_widget
self.thumbnail_list: ThumbnailListWidget = view.thumbnail_widget
self.split_view: SplitView = view.preview_widget
self.photo_button: QPushButton = view.photo_button
self.photo_button.clicked.connect(self.take_photo)
self.record_button: QPushButton = view.record_button
# self.record_button.clicked.connect(self.fun_test)
self.color_list.colorSelected.connect(self.on_color_selected)
self.color_list.editColor.connect(self.on_edit_color)
self.thumbnail_list.selectedThumbnail.connect(self.on_thumbnail_selected)
# self.camera_controller.errorOccurred.connect(self.split_view.widget_start.set_info_text)
# self.manager.error_occurred.connect(self.split_view.widget_start.set_info_text)
# self.camera_controller.frameReady.connect(self.split_view.set_live_image)
# self.manager.frame_ready.connect(self.split_view.set_live_image)
# self.split_view.widget_start.camera_start_btn.clicked.connect(self.camera_controller.start)
self.split_view.widget_start.camera_start_btn.clicked.connect(self.start_liveview)
# self.camera_controller = CameraController() def start_camera(self):
pass
self.view = view def load_colors(self) -> None:
self.color_list: ColorListWidget = view.color_list_widget colors = self.db.get_all_colors()
self.thumbnail_list: ThumbnailListWidget = view.thumbnail_widget print("Loaded colors:", colors)
self.split_view: SplitView = view.preview_widget self.color_list.set_colors(colors)
self.photo_button: QPushButton = view.photo_button
self.photo_button.clicked.connect(self.take_photo)
self.record_button: QPushButton = view.record_button
# self.record_button.clicked.connect(self.fun_test)
self.color_list.colorSelected.connect(self.on_color_selected)
self.color_list.editColor.connect(self.on_edit_color)
self.thumbnail_list.selectedThumbnail.connect(self.on_thumbnail_selected)
# self.camera_controller.errorOccurred.connect(self.split_view.widget_start.set_info_text)
self.manager.error_occurred.connect(self.split_view.widget_start.set_info_text)
# self.camera_controller.frameReady.connect(self.split_view.set_live_image)
self.manager.frame_ready.connect(self.split_view.set_live_image)
# self.split_view.widget_start.camera_start_btn.clicked.connect(self.camera_controller.start)
self.split_view.widget_start.camera_start_btn.clicked.connect(self.start_liveview)
def start_camera(self): def on_color_selected(self, color_name: str):
pass print(f"Wybrano kolor: {color_name}")
color_id = self.db.get_color_id(color_name)
if color_id is not None:
media_items = self.db.get_media_for_color(color_id)
print(f"Media dla koloru {color_name} (ID: {color_id}):", media_items)
def load_colors(self) -> None: self.thumbnail_list.list_widget.clear()
colors = self.db.get_all_colors() for media in media_items:
print("Loaded colors:", colors) if media['file_type'] == 'photo':
self.color_list.set_colors(colors) file_name = Path(media['media_path']).name
self.thumbnail_list.add_thumbnail(media['media_path'], file_name, media['id'])
else:
print(f"Nie znaleziono koloru o nazwie: {color_name}")
def on_edit_color(self, color_name: str):
print(f"Edycja koloru: {color_name}")
def on_color_selected(self, color_name: str): def on_thumbnail_selected(self, media_id: int):
print(f"Wybrano kolor: {color_name}") media = self.db.get_media(media_id)
color_id = self.db.get_color_id(color_name) if media:
if color_id is not None: print(f"Wybrano miniaturę o ID: {media_id}, ścieżka: {media['media_path']}")
media_items = self.db.get_media_for_color(color_id) self.split_view.set_reference_image(media['media_path'])
print(f"Media dla koloru {color_name} (ID: {color_id}):", media_items) else:
print(f"Nie znaleziono mediów o ID: {media_id}")
self.thumbnail_list.list_widget.clear() def take_photo(self):
for media in media_items: print("Robienie zdjęcia...")
if media['file_type'] == 'photo': self.split_view.toglle_live_view()
file_name = Path(media['media_path']).name
self.thumbnail_list.add_thumbnail(media['media_path'], file_name, media['id'])
else:
print(f"Nie znaleziono koloru o nazwie: {color_name}")
def on_edit_color(self, color_name: str): def start_liveview(self):
print(f"Edycja koloru: {color_name}") pass
# self.manager.start_camera()
# self.manager.start_stream()
def on_thumbnail_selected(self, media_id: int): def shutdown(self):
media = self.db.get_media(media_id) pass
if media: # self.manager.stop()
print(f"Wybrano miniaturę o ID: {media_id}, ścieżka: {media['media_path']}")
self.split_view.set_reference_image(media['media_path'])
else:
print(f"Nie znaleziono mediów o ID: {media_id}")
def take_photo(self):
print("Robienie zdjęcia...")
self.split_view.toglle_live_view()
def start_liveview(self):
self.manager.start_camera()
# self.manager.start_stream()
def shutdown(self):
self.manager.stop()

View File

@@ -1,6 +1,16 @@
import cv2 import cv2
import numpy as np import numpy as np
GP_WIDGET_WINDOW = 0
GP_WIDGET_SECTION = 0
GP_WIDGET_TEXT = 0
GP_WIDGET_RANGE = 0
GP_WIDGET_TOGGLE = 0
GP_WIDGET_RADIO = 0
GP_WIDGET_MENU = 0
GP_WIDGET_BUTTON = 0
GP_WIDGET_DATE = 0
class GPhoto2Error(Exception): class GPhoto2Error(Exception):
pass pass
@@ -19,6 +29,65 @@ class CameraFileMock:
return self._data return self._data
return self._data, len(self._data) return self._data, len(self._data)
class CameraListMock:
def count(self):
return 1
def get_name(self, idx):
return f"mock_name {idx}"
def get_value(self, idx):
return f"mock_value {idx}"
class MockPortInfo:
def __init__(self, address):
self.address = address
class PortInfoList:
def __init__(self):
self._ports = []
def load(self):
# Dodaj przykładowe porty
self._ports = [MockPortInfo("usb:001,002"), MockPortInfo("usb:001,003")]
def lookup_path(self, port_address):
for idx, port in enumerate(self._ports):
if port.address == port_address:
return idx
raise ValueError("Port not found")
def __getitem__(self, idx):
return self._ports[idx]
class ConfigMock:
def get_id(self):
return 0
def get_name(self):
return "name"
def get_label(self):
return "label"
def get_type(self):
return 0
def get_value(self):
return "value"
def get_choices(self):
return []
def count_children(self):
return 0
def get_child(self):
return ConfigMock()
class CameraAbilitiesList:
def __init__(self) -> None:
self.abilities = []
def load(self):
return
def lookup_model(self, name):
return 1
def get_abilities(self, abilities_index):
return 0
class Camera: class Camera:
def __init__(self): def __init__(self):
@@ -62,3 +131,18 @@ class Camera:
self._frame_counter += 1 self._frame_counter += 1
return CameraFileMock(frame) return CameraFileMock(frame)
def set_port_info(self, obj):
return False
def get_config(self):
return ConfigMock()
def set_single_config(self, name, widget):
return True
def gp_camera_autodetect():
return CameraListMock()
def check_result(obj):
return obj

View File

@@ -2,36 +2,41 @@ from abc import ABC, abstractmethod
class BaseCamera(ABC): class BaseCamera(ABC):
def __init__(self) -> None: def __init__(self) -> None:
self.error_msg = None self.error_msg = None
@abstractmethod @staticmethod
def connect(self) -> bool: @abstractmethod
raise NotImplementedError def detect() -> dict:
raise NotImplementedError
@abstractmethod @abstractmethod
def disconnect(self) -> None: def connect(self, index: int | None = None) -> bool:
raise NotImplementedError raise NotImplementedError
@abstractmethod @abstractmethod
def get_frame(self): def disconnect(self) -> None:
raise NotImplementedError raise NotImplementedError
@abstractmethod @abstractmethod
def get_config_by_id(self, id: int) -> dict: def get_frame(self):
raise NotImplementedError raise NotImplementedError
@abstractmethod @abstractmethod
def get_config_by_name(self, name: str) -> dict: def get_config_by_id(self, id: int) -> dict:
raise NotImplementedError raise NotImplementedError
@abstractmethod @abstractmethod
def set_config_by_id(self, id: int, value) -> None: def get_config_by_name(self, name: str) -> dict:
raise NotImplementedError raise NotImplementedError
@abstractmethod @abstractmethod
def set_config_by_name(self, name: str, value) -> None: def set_config_by_id(self, id: int, value) -> None:
raise NotImplementedError raise NotImplementedError
def get_error_msg(self): @abstractmethod
return str(self.error_msg) def set_config_by_name(self, name: str, value) -> None:
raise NotImplementedError
def get_error_msg(self):
return str(self.error_msg)

View File

@@ -0,0 +1,115 @@
from PySide6.QtCore import QObject, QThread, QTimer, Signal, Slot, QMutex, QMutexLocker
from PySide6.QtGui import QImage, QPixmap
import cv2
from .base_camera import BaseCamera
class CameraController(QThread):
frame_ready = Signal(QPixmap)
photo_ready = Signal(QPixmap)
error_occurred = Signal(str)
_enable_timer = Signal(bool)
def __init__(self, parent: QObject | None = None) -> None:
super().__init__(parent)
self.camera = None
self.timer = None
self.fps = 15
self.is_streaming = False
self.is_connected = False
self._camera_mutex = QMutex()
self.start()
def run(self) -> None:
self.timer = QTimer()
self.timer.timeout.connect(self._update_frame)
self._enable_timer.connect(self._set_timer)
self.exec()
def stop(self):
self.stop_camera()
self.quit()
self.wait()
def set_camera(self, camera: BaseCamera, fps: int = 15) -> None:
with QMutexLocker(self._camera_mutex):
self.stop_stream()
self.stop_camera()
self.camera = camera
self.fps = fps
def start_camera(self) -> None:
if self.camera is None or self.is_connected:
return
if self.camera.connect():
self.is_connected = True
else:
self.is_connected = False
self.error_occurred.emit(self.camera.get_error_msg())
def stop_camera(self) -> None:
if self.is_streaming:
self.stop_stream()
if self.camera is not None:
self.camera.disconnect()
self.is_connected = False
def start_stream(self):
if not self.is_connected:
return
if self.is_streaming:
return
if self.timer:
self.is_streaming = True
# self.timer.start()
self._enable_timer.emit(True)
def stop_stream(self) -> None:
if self.is_streaming:
self.is_streaming = False
if self.timer:
# self.timer.stop()
self._enable_timer.emit(False)
def _update_frame(self) -> None:
with QMutexLocker(self._camera_mutex):
if self.camera is None or not self.is_connected:
return
if not self.is_streaming:
return
ret, frame = self.camera.get_frame()
if not ret:
self.error_occurred.emit(self.camera.get_error_msg())
return
if frame is not None:
rgb_image = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)
h, w, ch = rgb_image.shape
qimg = QImage(rgb_image.data, w, h, ch * w, QImage.Format.Format_RGB888)
pixmap = QPixmap.fromImage(qimg)
self.frame_ready.emit(pixmap)
def _set_timer(self, enable: bool):
if not self.timer:
return
if enable:
self.timer.setInterval(int(1000 / self.fps))
self.timer.start()
else:
self.timer.stop()

View File

@@ -1,88 +1,20 @@
from PySide6.QtCore import QObject, QThread, QTimer, Signal, Slot
from PySide6.QtGui import QImage, QPixmap
import cv2
from .base_camera import BaseCamera
class CameraManager(QThread): from .gphoto_camera import GPhotoCamera
frame_ready = Signal(QPixmap) from .opencv_camera import OpenCvCamera
photo_ready = Signal(QPixmap) from .camera_controller import CameraController
error_occurred = Signal(str)
def __init__(self, camera: BaseCamera, fps: int = 15, parent: QObject | None = None) -> None:
super().__init__(parent)
self.camera = camera
self.fps = fps
self.timer = None
self.is_streaming = False
self.is_connected = False class CameraManager:
def __init__(self) -> None:
pass
self.timer = QTimer() def detect_gphoto(self):
self.timer.setInterval(int(1000 / self.fps)) camera_list = GPhotoCamera.detect()
self.timer.timeout.connect(self._update_frame) print(camera_list)
self.start() return camera_list
def run(self) -> None: def detect_opencv(self):
# self.timer = QTimer() camera_list = OpenCvCamera.detect()
# self.timer.setInterval(int(1000 / self.fps)) print(camera_list)
# self.timer.timeout.connect(self._update_frame) return camera_list
self.exec()
def start_camera(self) -> None:
if self.is_connected:
return
if self.camera.connect():
self.is_connected = True
else:
self.is_connected = False
self.error_occurred.emit(self.camera.get_error_msg())
def stop_camera(self) -> None:
self.is_streaming = False
self.is_connected = False
if self.timer:
self.timer.stop()
self.camera.disconnect()
def start_stream(self):
if not self.is_connected:
return
if self.is_streaming:
return
if self.timer:
self.is_streaming = True
self.timer.start()
def stop_stream(self) -> None:
if self.is_streaming:
self.is_streaming = False
if self.timer:
self.timer.stop()
def _update_frame(self) -> None:
if not self.is_streaming or not self.is_connected:
return
ret, frame = self.camera.get_frame()
if not ret:
self.error_occurred.emit(self.camera.get_error_msg())
return
if frame is not None:
rgb_image = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)
h, w, ch = rgb_image.shape
qimg = QImage(rgb_image.data, w, h, ch * w, QImage.Format.Format_RGB888)
pixmap = QPixmap.fromImage(qimg)
self.frame_ready.emit(pixmap)
def stop(self):
self.stop_camera()
self.quit()
self.wait()

View File

@@ -1,122 +1,173 @@
from typing import Optional, List from typing import Optional, List
from dataclasses import dataclass, field from dataclasses import dataclass, field
import gphoto2 as gp
import cv2 import cv2
import numpy as np import numpy as np
from .base_camera import BaseCamera from .base_camera import BaseCamera
try:
import gphoto2 as gp # type: ignore
except:
import controllers.mock_gphoto as gp
camera_widget_types = { camera_widget_types = {
gp.GP_WIDGET_WINDOW: "GP_WIDGET_WINDOW", # type: ignore gp.GP_WIDGET_WINDOW: "GP_WIDGET_WINDOW", # type: ignore
gp.GP_WIDGET_SECTION: "GP_WIDGET_SECTION", # type: ignore gp.GP_WIDGET_SECTION: "GP_WIDGET_SECTION", # type: ignore
gp.GP_WIDGET_TEXT: "GP_WIDGET_TEXT", # type: ignore gp.GP_WIDGET_TEXT: "GP_WIDGET_TEXT", # type: ignore
gp.GP_WIDGET_RANGE: "GP_WIDGET_RANGE", # type: ignore gp.GP_WIDGET_RANGE: "GP_WIDGET_RANGE", # type: ignore
gp.GP_WIDGET_TOGGLE: "GP_WIDGET_TOGGLE", # type: ignore gp.GP_WIDGET_TOGGLE: "GP_WIDGET_TOGGLE", # type: ignore
gp.GP_WIDGET_RADIO: "GP_WIDGET_RADIO", # type: ignore gp.GP_WIDGET_RADIO: "GP_WIDGET_RADIO", # type: ignore
gp.GP_WIDGET_MENU: "GP_WIDGET_MENU", # type: ignore gp.GP_WIDGET_MENU: "GP_WIDGET_MENU", # type: ignore
gp.GP_WIDGET_BUTTON: "GP_WIDGET_BUTTON", # type: ignore gp.GP_WIDGET_BUTTON: "GP_WIDGET_BUTTON", # type: ignore
gp.GP_WIDGET_DATE: "GP_WIDGET_DATE", # type: ignore gp.GP_WIDGET_DATE: "GP_WIDGET_DATE", # type: ignore
} }
operations = [
("GP_OPERATION_NONE", gp.GP_OPERATION_NONE), # type: ignore
("GP_OPERATION_CAPTURE_IMAGE", gp.GP_OPERATION_CAPTURE_IMAGE), # type: ignore
("GP_OPERATION_CAPTURE_VIDEO", gp.GP_OPERATION_CAPTURE_VIDEO), # type: ignore
("GP_OPERATION_CAPTURE_AUDIO", gp.GP_OPERATION_CAPTURE_AUDIO), # type: ignore
("GP_OPERATION_CAPTURE_PREVIEW", gp.GP_OPERATION_CAPTURE_PREVIEW), # type: ignore
("GP_OPERATION_CONFIG", gp.GP_OPERATION_CONFIG), # type: ignore
("GP_OPERATION_TRIGGER_CAPTURE", gp.GP_OPERATION_TRIGGER_CAPTURE), # type: ignore
]
class GPhotoCamera(BaseCamera): class GPhotoCamera(BaseCamera):
def __init__(self) -> None: def __init__(self) -> None:
super().__init__() super().__init__()
self.camera = None self.camera = None
self.configs: List[dict] = [] self.configs: List[dict] = []
self.camera_index = 0
def connect(self) -> bool: @staticmethod
self.error_msg = None def detect() -> dict:
try: cameras = gp.check_result(gp.gp_camera_autodetect()) # type: ignore
self.camera = gp.Camera() # type: ignore # cameras = gp.Camera().autodetect()
self.camera.init() if not cameras or cameras.count() == 0: # type: ignore
config = self.camera.get_config() return {}
self.read_config(config)
return True abilities_list = gp.CameraAbilitiesList() # type: ignore
except Exception as e: abilities_list.load()
self.error_msg = f"[GPHOTO2] {e}" camera_list = {}
self.camera = None for i in range(cameras.count()): # type: ignore
return False name = cameras.get_name(i) # type: ignore
port = cameras.get_value(i) # type: ignore
def disconnect(self) -> None: abilities_index = abilities_list.lookup_model(name)
if self.camera: abilities = abilities_list.get_abilities(abilities_index)
self.camera.exit() abilities_name = []
self.camera = None for name, bit in operations:
self.configs.clear() if abilities.operations & bit: # type: ignore
abilities_name.append(name)
def get_frame(self): camera_list[i] = {"name": name, "port": port, "abilities": abilities_name}
self.error_msg = None return camera_list
def connect(self, index: int | None = None) -> bool:
self.error_msg = None
self.camera = gp.Camera() # type: ignore
try:
if index:
self.camera_index = index
camera_list = GPhotoCamera.detect()
port_info_list = gp.PortInfoList()
port_info_list.load()
if self.camera is None: port_address = camera_list[index]["port"]
self.error_msg = "[GPHOTO2] Camera is not initialized." port_index = port_info_list.lookup_path(port_address)
return (False, None)
self.camera.set_port_info(port_info_list[port_index])
self.camera.init()
config = self.camera.get_config()
self.read_config(config)
return True
except Exception as e:
self.error_msg = f"[GPHOTO2] {e}"
self.camera = None
return False
try: def disconnect(self) -> None:
file = self.camera.capture_preview() # type: ignore if self.camera:
data = file.get_data_and_size() self.camera.exit()
frame = np.frombuffer(data, dtype=np.uint8) self.camera = None
frame = cv2.imdecode(frame, cv2.IMREAD_COLOR) self.configs.clear()
return (True, frame) def get_frame(self):
except Exception as e: self.error_msg = None
self.error_msg = f"[GPHOTO2] {e}"
return (False, None)
def get_config_by_id(self, id: int): if self.camera is None:
return next(w for w in self.configs if w['id'] == id) self.error_msg = "[GPHOTO2] Camera is not initialized."
return (False, None)
def get_config_by_name(self, name: str): try:
return next(w for w in self.configs if w['name'] == name) file = self.camera.capture_preview() # type: ignore
data = file.get_data_and_size()
frame = np.frombuffer(data, dtype=np.uint8)
frame = cv2.imdecode(frame, cv2.IMREAD_COLOR)
def set_config(self, config, value): return (True, frame)
if value not in config['choices']: except Exception as e:
return self.error_msg = f"[GPHOTO2] {e}"
return (False, None)
config['config'].set_value(value) # type: ignore def get_config_by_id(self, id: int):
if self._save_config(config): return next(w for w in self.configs if w['id'] == id)
config['value'] = value
def set_config_by_id(self, id: int, value: str): def get_config_by_name(self, name: str):
config = self.get_config_by_id(id) return next(w for w in self.configs if w['name'] == name)
self.set_config(config, value) def set_config(self, config, value):
if value not in config['choices']:
return
def set_config_by_name(self, name: str, value: str): config['widget'].set_value(value) # type: ignore
config = self.get_config_by_name(name) if self._save_config(config):
config['value'] = value
self.set_config(config, value) def set_config_by_id(self, id: int, value: str):
config = self.get_config_by_id(id)
def _save_config(self, config): self.set_config(config, value)
if not self.camera:
return False
self.camera.set_single.config(config['name'], config['widget']) def set_config_by_name(self, name: str, value: str):
return True config = self.get_config_by_name(name)
def parse_config(self, config): self.set_config(config, value)
new_config = {
"id": config.get_id(),
"name": config.get_name(),
"label": config.get_label(),
"type": camera_widget_types[config.get_type()],
"widget": config
}
try: def _save_config(self, config):
new_config["value"] = config.get_value() if not self.camera:
except gp.GPhoto2Error: return False
pass
try: self.camera.set_single_config(config['name'], config['widget'])
new_config["choices"] = list(config.get_choices()) return True
except gp.GPhoto2Error:
pass
return new_config def parse_config(self, config):
new_config = {
"id": config.get_id(),
"name": config.get_name(),
"label": config.get_label(),
"type": camera_widget_types[config.get_type()],
"widget": config
}
def read_config(self, config): try:
self.configs.append(self.parse_config(config)) new_config["value"] = config.get_value()
except gp.GPhoto2Error:
pass
for i in range(config.count_children()): try:
child = config.get_child(i) new_config["choices"] = list(config.get_choices())
self.read_config(child) except gp.GPhoto2Error:
pass
return new_config
def read_config(self, config):
self.configs.append(self.parse_config(config))
for i in range(config.count_children()):
child = config.get_child(i)
self.read_config(child)

View File

@@ -1,98 +1,122 @@
import cv2 import cv2
from cv2_enumerate_cameras import enumerate_cameras
from typing import List from typing import List
from .base_camera import BaseCamera from .base_camera import BaseCamera
class OpenCvCamera(BaseCamera):
"""Kamera oparta na cv2.VideoCapture"""
config_map = {
0: {"name": "frame_width", "cv_prop": cv2.CAP_PROP_FRAME_WIDTH, "default": 640},
1: {"name": "frame_height", "cv_prop": cv2.CAP_PROP_FRAME_HEIGHT, "default": 480},
2: {"name": "fps", "cv_prop": cv2.CAP_PROP_FPS, "default": 30},
3: {"name": "brightness", "cv_prop": cv2.CAP_PROP_BRIGHTNESS, "default": 0.5},
4: {"name": "contrast", "cv_prop": cv2.CAP_PROP_CONTRAST, "default": 0.5},
5: {"name": "saturation", "cv_prop": cv2.CAP_PROP_SATURATION, "default": 0.5},
}
class CvCamera(BaseCamera): def __init__(self) -> None:
"""Kamera oparta na cv2.VideoCapture""" super().__init__()
self.cap = None
self.configs: List[dict] = []
self.camera_list = []
self.camera_index = 0
config_map = { @staticmethod
0: {"name": "frame_width", "cv_prop": cv2.CAP_PROP_FRAME_WIDTH, "default": 640}, def detect():
1: {"name": "frame_height", "cv_prop": cv2.CAP_PROP_FRAME_HEIGHT, "default": 480}, camera_list = enumerate_cameras(cv2.CAP_ANY)
2: {"name": "fps", "cv_prop": cv2.CAP_PROP_FPS, "default": 30}, result = {}
3: {"name": "brightness", "cv_prop": cv2.CAP_PROP_BRIGHTNESS, "default": 0.5},
4: {"name": "contrast", "cv_prop": cv2.CAP_PROP_CONTRAST, "default": 0.5},
5: {"name": "saturation", "cv_prop": cv2.CAP_PROP_SATURATION, "default": 0.5},
}
def __init__(self, device_index: int = 0) -> None: for camera in camera_list:
super().__init__() cap = cv2.VideoCapture(camera.index, camera.backend)
self.device_index = device_index # ret, frame = cap.read()
self.cap = None cap.release()
self.configs: List[dict] = []
def connect(self) -> bool: # if ret and frame is not None and frame.size > 0:
self.error_msg = None result[camera.index] = {
try: "name": camera.name,
self.cap = cv2.VideoCapture(self.device_index) "port": camera.path,
if not self.cap.isOpened(): "backend": camera.backend,
self.error_msg = f"[CV2] Could not open camera {self.device_index}" }
return False
self.configs.clear() return result
for id, conf in self.config_map.items():
value = self.cap.get(conf["cv_prop"])
self.configs.append(
{
"id": id,
"name": conf["name"],
"label": conf["name"].capitalize(),
"value": value,
"choices": None, # brak predefiniowanych wyborów
"cv_prop": conf["cv_prop"],
}
)
return True
except Exception as e: def connect(self, index: int | None = None) -> bool:
self.error_msg = f"[CV2] {e}" self.error_msg = None
self.cap = None try:
return False if index:
self.camera_index = index
def disconnect(self) -> None: self.cap = cv2.VideoCapture(self.camera_index)
if self.cap:
self.cap.release()
self.cap = None
self.configs.clear()
def get_frame(self): if not self.cap.isOpened():
self.error_msg = None self.error_msg = f"[CV2] Could not open camera {self.camera_index}"
if self.cap is None or not self.cap.isOpened(): return False
self.error_msg = "[CV2] Camera is not initialized."
return (False, None)
try: self.configs.clear()
ret, frame = self.cap.read() for id, conf in self.config_map.items():
if not ret: value = self.cap.get(conf["cv_prop"])
self.error_msg = "[CV2] Failed to read frame." self.configs.append(
return (False, None) {
return (True, frame) "id": id,
except Exception as e: "name": conf["name"],
self.error_msg = f"[CV2] {e}" "label": conf["name"].capitalize(),
return (False, None) "value": value,
"choices": None, # brak predefiniowanych wyborów
"cv_prop": conf["cv_prop"],
}
)
return True
def get_config_by_id(self, id: int): except Exception as e:
return next(w for w in self.configs if w["id"] == id) self.error_msg = f"[CV2] {e}"
self.cap = None
return False
def get_config_by_name(self, name: str): def disconnect(self) -> None:
return next(w for w in self.configs if w["name"] == name) if self.cap:
self.cap.release()
self.cap = None
self.configs.clear()
def set_config(self, config, value: float): def get_frame(self):
if not self.cap: self.error_msg = None
return if self.cap is None or not self.cap.isOpened():
try: self.error_msg = "[CV2] Camera is not initialized."
self.cap.set(config["cv_prop"], value) return (False, None)
config["value"] = self.cap.get(config["cv_prop"]) # sprawdz co ustawiło
except Exception as e:
self.error_msg = f"[CV2] {e}"
def set_config_by_id(self, id: int, value: float): try:
config = self.get_config_by_id(id) ret, frame = self.cap.read()
self.set_config(config, value) if not ret:
self.error_msg = "[CV2] Failed to read frame."
return (False, None)
return (True, frame)
except Exception as e:
self.error_msg = f"[CV2] {e}"
return (False, None)
def set_config_by_name(self, name: str, value: float): def get_config_by_id(self, id: int):
config = self.get_config_by_name(name) return next(w for w in self.configs if w["id"] == id)
self.set_config(config, value)
def get_config_by_name(self, name: str):
return next(w for w in self.configs if w["name"] == name)
def set_config(self, config, value: float):
if not self.cap:
return
try:
self.cap.set(config["cv_prop"], value)
config["value"] = self.cap.get(
config["cv_prop"]) # sprawdz co ustawiło
except Exception as e:
self.error_msg = f"[CV2] {e}"
def set_config_by_id(self, id: int, value: float):
config = self.get_config_by_id(id)
self.set_config(config, value)
def set_config_by_name(self, name: str, value: float):
config = self.get_config_by_name(name)
self.set_config(config, value)