From 324ab2e016c49b5f94d79e959753e503e5ab14f0 Mon Sep 17 00:00:00 2001 From: bartool Date: Tue, 30 Sep 2025 21:50:30 +0200 Subject: [PATCH] refactor: update camera classes to improve initialization and connection handling --- core/camera/base_camera.py | 50 ++++----- core/camera/gphoto_camera.py | 198 ++++++++++++++++++++--------------- core/camera/opencv_camera.py | 171 ++++++++++++++++-------------- 3 files changed, 229 insertions(+), 190 deletions(-) diff --git a/core/camera/base_camera.py b/core/camera/base_camera.py index fb8961d..acaea6b 100644 --- a/core/camera/base_camera.py +++ b/core/camera/base_camera.py @@ -2,36 +2,36 @@ from abc import ABC, abstractmethod class BaseCamera(ABC): - def __init__(self) -> None: - self.error_msg = None + def __init__(self) -> None: + self.error_msg = None - @abstractmethod - def connect(self) -> bool: - raise NotImplementedError + @abstractmethod + def connect(self, index: int | None = None) -> bool: + raise NotImplementedError - @abstractmethod - def disconnect(self) -> None: - raise NotImplementedError + @abstractmethod + def disconnect(self) -> None: + raise NotImplementedError - @abstractmethod - def get_frame(self): - raise NotImplementedError + @abstractmethod + def get_frame(self): + raise NotImplementedError - @abstractmethod - def get_config_by_id(self, id: int) -> dict: - raise NotImplementedError + @abstractmethod + def get_config_by_id(self, id: int) -> dict: + raise NotImplementedError - @abstractmethod - def get_config_by_name(self, name: str) -> dict: - raise NotImplementedError + @abstractmethod + def get_config_by_name(self, name: str) -> dict: + raise NotImplementedError - @abstractmethod - def set_config_by_id(self, id: int, value) -> None: - raise NotImplementedError + @abstractmethod + def set_config_by_id(self, id: int, value) -> None: + raise NotImplementedError - @abstractmethod - def set_config_by_name(self, name: str, value) -> None: - raise NotImplementedError + @abstractmethod + def set_config_by_name(self, name: str, value) -> None: + raise NotImplementedError - def get_error_msg(self): - return str(self.error_msg) + def get_error_msg(self): + return str(self.error_msg) diff --git a/core/camera/gphoto_camera.py b/core/camera/gphoto_camera.py index aac0f10..a6ceb2c 100644 --- a/core/camera/gphoto_camera.py +++ b/core/camera/gphoto_camera.py @@ -7,116 +7,140 @@ import numpy as np from .base_camera import BaseCamera camera_widget_types = { - gp.GP_WIDGET_WINDOW: "GP_WIDGET_WINDOW", # type: ignore - gp.GP_WIDGET_SECTION: "GP_WIDGET_SECTION", # type: ignore - gp.GP_WIDGET_TEXT: "GP_WIDGET_TEXT", # type: ignore - gp.GP_WIDGET_RANGE: "GP_WIDGET_RANGE", # type: ignore - gp.GP_WIDGET_TOGGLE: "GP_WIDGET_TOGGLE", # type: ignore - gp.GP_WIDGET_RADIO: "GP_WIDGET_RADIO", # type: ignore - gp.GP_WIDGET_MENU: "GP_WIDGET_MENU", # type: ignore - gp.GP_WIDGET_BUTTON: "GP_WIDGET_BUTTON", # type: ignore - gp.GP_WIDGET_DATE: "GP_WIDGET_DATE", # type: ignore + gp.GP_WIDGET_WINDOW: "GP_WIDGET_WINDOW", # type: ignore + gp.GP_WIDGET_SECTION: "GP_WIDGET_SECTION", # type: ignore + gp.GP_WIDGET_TEXT: "GP_WIDGET_TEXT", # type: ignore + gp.GP_WIDGET_RANGE: "GP_WIDGET_RANGE", # type: ignore + gp.GP_WIDGET_TOGGLE: "GP_WIDGET_TOGGLE", # type: ignore + gp.GP_WIDGET_RADIO: "GP_WIDGET_RADIO", # type: ignore + gp.GP_WIDGET_MENU: "GP_WIDGET_MENU", # type: ignore + gp.GP_WIDGET_BUTTON: "GP_WIDGET_BUTTON", # type: ignore + gp.GP_WIDGET_DATE: "GP_WIDGET_DATE", # type: ignore } class GPhotoCamera(BaseCamera): - def __init__(self) -> None: - super().__init__() - self.camera = None - self.configs: List[dict] = [] + def __init__(self) -> None: + super().__init__() + self.camera = None + self.configs: List[dict] = [] + self.camera_list = [] - def connect(self) -> bool: - self.error_msg = None - try: - self.camera = gp.Camera() # type: ignore - self.camera.init() - config = self.camera.get_config() - self.read_config(config) - return True - except Exception as e: - self.error_msg = f"[GPHOTO2] {e}" - self.camera = None - return False + def detect(self) -> list: + self.camera_list.clear() + cameras = gp.check_result(gp.gp_camera_autodetect()) # type: ignore + # cameras = gp.Camera().autodetect() + if not cameras or cameras.count() == 0: # type: ignore + return [] + + for i in range(cameras.count()): # type: ignore + name = cameras.get_name(i) # type: ignore + port = cameras.get_value(i) # type: ignore + self.camera_list.append({"name": name, "port": port}) + return self.camera_list + + def connect(self, index: int | None = None) -> bool: + self.error_msg = None + self.camera = gp.Camera() # type: ignore + + try: + if index: + port_info_list = gp.PortInfoList() + port_info_list.load() - def disconnect(self) -> None: - if self.camera: - self.camera.exit() - self.camera = None - self.configs.clear() + port_address = self.camera_list[index]["port"] + port_index = port_info_list.lookup_path(port_address) + + self.camera.set_port_info(port_info_list[port_index]) + + self.camera.init() + config = self.camera.get_config() + self.read_config(config) + return True + except Exception as e: + self.error_msg = f"[GPHOTO2] {e}" + self.camera = None + return False - def get_frame(self): - self.error_msg = None + def disconnect(self) -> None: + if self.camera: + self.camera.exit() + self.camera = None + self.configs.clear() - if self.camera is None: - self.error_msg = "[GPHOTO2] Camera is not initialized." - return (False, None) + def get_frame(self): + self.error_msg = None - try: - file = self.camera.capture_preview() # type: ignore - data = file.get_data_and_size() - frame = np.frombuffer(data, dtype=np.uint8) - frame = cv2.imdecode(frame, cv2.IMREAD_COLOR) + if self.camera is None: + self.error_msg = "[GPHOTO2] Camera is not initialized." + return (False, None) - return (True, frame) - except Exception as e: - self.error_msg = f"[GPHOTO2] {e}" - return (False, None) + try: + file = self.camera.capture_preview() # type: ignore + data = file.get_data_and_size() + frame = np.frombuffer(data, dtype=np.uint8) + frame = cv2.imdecode(frame, cv2.IMREAD_COLOR) - def get_config_by_id(self, id: int): - return next(w for w in self.configs if w['id'] == id) + return (True, frame) + except Exception as e: + self.error_msg = f"[GPHOTO2] {e}" + return (False, None) - def get_config_by_name(self, name: str): - return next(w for w in self.configs if w['name'] == name) + def get_config_by_id(self, id: int): + return next(w for w in self.configs if w['id'] == id) - def set_config(self, config, value): - if value not in config['choices']: - return + def get_config_by_name(self, name: str): + return next(w for w in self.configs if w['name'] == name) - config['widget'].set_value(value) # type: ignore - if self._save_config(config): - config['value'] = value + def set_config(self, config, value): + if value not in config['choices']: + return - def set_config_by_id(self, id: int, value: str): - config = self.get_config_by_id(id) + config['widget'].set_value(value) # type: ignore + if self._save_config(config): + config['value'] = value - self.set_config(config, value) + def set_config_by_id(self, id: int, value: str): + config = self.get_config_by_id(id) - def set_config_by_name(self, name: str, value: str): - config = self.get_config_by_name(name) + self.set_config(config, value) - self.set_config(config, value) + def set_config_by_name(self, name: str, value: str): + config = self.get_config_by_name(name) - def _save_config(self, config): - if not self.camera: - return False + self.set_config(config, value) - self.camera.set_single.config(config['name'], config['widget']) - return True + def _save_config(self, config): + if not self.camera: + return False - def parse_config(self, config): - new_config = { - "id": config.get_id(), - "name": config.get_name(), - "label": config.get_label(), - "type": camera_widget_types[config.get_type()], - "widget": config - } + self.camera.set_single.config(config['name'], config['widget']) + return True - try: - new_config["value"] = config.get_value() - except gp.GPhoto2Error: - pass + def parse_config(self, config): + new_config = { + "id": config.get_id(), + "name": config.get_name(), + "label": config.get_label(), + "type": camera_widget_types[config.get_type()], + "widget": config + } - try: - new_config["choices"] = list(config.get_choices()) - except gp.GPhoto2Error: - pass + try: + new_config["value"] = config.get_value() + except gp.GPhoto2Error: + pass - return new_config + try: + new_config["choices"] = list(config.get_choices()) + except gp.GPhoto2Error: + pass - def read_config(self, config): - self.configs.append(self.parse_config(config)) + return new_config - for i in range(config.count_children()): - child = config.get_child(i) - self.read_config(child) + def read_config(self, config): + self.configs.append(self.parse_config(config)) + + for i in range(config.count_children()): + child = config.get_child(i) + self.read_config(child) diff --git a/core/camera/opencv_camera.py b/core/camera/opencv_camera.py index 07816c5..a6a20b5 100644 --- a/core/camera/opencv_camera.py +++ b/core/camera/opencv_camera.py @@ -1,98 +1,113 @@ import cv2 +from cv2_enumerate_cameras import enumerate_cameras from typing import List from .base_camera import BaseCamera - - class CvCamera(BaseCamera): - """Kamera oparta na cv2.VideoCapture""" + """Kamera oparta na cv2.VideoCapture""" - config_map = { - 0: {"name": "frame_width", "cv_prop": cv2.CAP_PROP_FRAME_WIDTH, "default": 640}, - 1: {"name": "frame_height", "cv_prop": cv2.CAP_PROP_FRAME_HEIGHT, "default": 480}, - 2: {"name": "fps", "cv_prop": cv2.CAP_PROP_FPS, "default": 30}, - 3: {"name": "brightness", "cv_prop": cv2.CAP_PROP_BRIGHTNESS, "default": 0.5}, - 4: {"name": "contrast", "cv_prop": cv2.CAP_PROP_CONTRAST, "default": 0.5}, - 5: {"name": "saturation", "cv_prop": cv2.CAP_PROP_SATURATION, "default": 0.5}, - } + config_map = { + 0: {"name": "frame_width", "cv_prop": cv2.CAP_PROP_FRAME_WIDTH, "default": 640}, + 1: {"name": "frame_height", "cv_prop": cv2.CAP_PROP_FRAME_HEIGHT, "default": 480}, + 2: {"name": "fps", "cv_prop": cv2.CAP_PROP_FPS, "default": 30}, + 3: {"name": "brightness", "cv_prop": cv2.CAP_PROP_BRIGHTNESS, "default": 0.5}, + 4: {"name": "contrast", "cv_prop": cv2.CAP_PROP_CONTRAST, "default": 0.5}, + 5: {"name": "saturation", "cv_prop": cv2.CAP_PROP_SATURATION, "default": 0.5}, + } - def __init__(self, device_index: int = 0) -> None: - super().__init__() - self.device_index = device_index - self.cap = None - self.configs: List[dict] = [] + def __init__(self) -> None: + super().__init__() + self.cap = None + self.configs: List[dict] = [] + self.camera_list = [] + self.camera_index = 0 - def connect(self) -> bool: - self.error_msg = None - try: - self.cap = cv2.VideoCapture(self.device_index) - if not self.cap.isOpened(): - self.error_msg = f"[CV2] Could not open camera {self.device_index}" - return False + def detect(self) -> list: + self.camera_list.clear() + self.camera_list = enumerate_cameras(cv2.CAP_ANY) - self.configs.clear() - for id, conf in self.config_map.items(): - value = self.cap.get(conf["cv_prop"]) - self.configs.append( - { - "id": id, - "name": conf["name"], - "label": conf["name"].capitalize(), - "value": value, - "choices": None, # brak predefiniowanych wyborów - "cv_prop": conf["cv_prop"], - } - ) - return True + result = [] + for camera in self.camera_list: + result.append({"name": camera.name, "port": camera.path}) - except Exception as e: - self.error_msg = f"[CV2] {e}" - self.cap = None - return False + return result - def disconnect(self) -> None: - if self.cap: - self.cap.release() - self.cap = None - self.configs.clear() + def connect(self, index: int | None = None) -> bool: + self.error_msg = None + try: + if index: + self.camera_index = self.camera_list[index].index - def get_frame(self): - self.error_msg = None - if self.cap is None or not self.cap.isOpened(): - self.error_msg = "[CV2] Camera is not initialized." - return (False, None) + self.cap = cv2.VideoCapture(self.camera_index) - try: - ret, frame = self.cap.read() - if not ret: - self.error_msg = "[CV2] Failed to read frame." - return (False, None) - return (True, frame) - except Exception as e: - self.error_msg = f"[CV2] {e}" - return (False, None) + if not self.cap.isOpened(): + self.error_msg = f"[CV2] Could not open camera {self.camera_index}" + return False - def get_config_by_id(self, id: int): - return next(w for w in self.configs if w["id"] == id) + self.configs.clear() + for id, conf in self.config_map.items(): + value = self.cap.get(conf["cv_prop"]) + self.configs.append( + { + "id": id, + "name": conf["name"], + "label": conf["name"].capitalize(), + "value": value, + "choices": None, # brak predefiniowanych wyborów + "cv_prop": conf["cv_prop"], + } + ) + return True - def get_config_by_name(self, name: str): - return next(w for w in self.configs if w["name"] == name) + except Exception as e: + self.error_msg = f"[CV2] {e}" + self.cap = None + return False - def set_config(self, config, value: float): - if not self.cap: - return - try: - self.cap.set(config["cv_prop"], value) - config["value"] = self.cap.get(config["cv_prop"]) # sprawdz co ustawiło - except Exception as e: - self.error_msg = f"[CV2] {e}" + def disconnect(self) -> None: + if self.cap: + self.cap.release() + self.cap = None + self.configs.clear() - def set_config_by_id(self, id: int, value: float): - config = self.get_config_by_id(id) - self.set_config(config, value) + def get_frame(self): + self.error_msg = None + if self.cap is None or not self.cap.isOpened(): + self.error_msg = "[CV2] Camera is not initialized." + return (False, None) - def set_config_by_name(self, name: str, value: float): - config = self.get_config_by_name(name) - self.set_config(config, value) + try: + ret, frame = self.cap.read() + if not ret: + self.error_msg = "[CV2] Failed to read frame." + return (False, None) + return (True, frame) + except Exception as e: + self.error_msg = f"[CV2] {e}" + return (False, None) + + def get_config_by_id(self, id: int): + return next(w for w in self.configs if w["id"] == id) + + def get_config_by_name(self, name: str): + return next(w for w in self.configs if w["name"] == name) + + def set_config(self, config, value: float): + if not self.cap: + return + try: + self.cap.set(config["cv_prop"], value) + config["value"] = self.cap.get( + config["cv_prop"]) # sprawdz co ustawiło + except Exception as e: + self.error_msg = f"[CV2] {e}" + + def set_config_by_id(self, id: int, value: float): + config = self.get_config_by_id(id) + self.set_config(config, value) + + def set_config_by_name(self, name: str, value: float): + config = self.get_config_by_name(name) + self.set_config(config, value)