12 Commits

14 changed files with 666 additions and 398 deletions

View File

@@ -1,31 +1,97 @@
# import gphoto2 as gp
import numpy as np
import cv2
from PySide6.QtCore import QObject, QThread, Signal from PySide6.QtCore import QObject, QThread, Signal
from ..core.base import BaseImageSource, BaseControlSource from PySide6.QtGui import QImage, QPixmap
# try:
# import gphoto2 as gp
# except:
from . import mock_gphoto as gp
class CameraWorker(QObject):
frameReady = Signal(QPixmap)
errorOccurred = Signal(str)
def __init__(self, fps: int = 15, parent=None):
super().__init__(parent)
self.fps = fps
self.running = False
self.camera = None
def start_camera(self):
"""Uruchom kamerę i zacznij pobierać klatki"""
try:
self.camera = gp.Camera() # type: ignore
self.camera.init()
self.running = True
self._capture_loop()
except gp.GPhoto2Error as e:
self.errorOccurred.emit(f"Błąd inicjalizacji kamery: {e}")
def stop_camera(self):
"""Zatrzymaj pobieranie"""
self.running = False
if self.camera:
try:
self.camera.exit()
except gp.GPhoto2Error:
pass
self.camera = None
def _capture_loop(self):
"""Pętla odczytu klatek w osobnym wątku"""
import time
delay = 1.0 / self.fps
while self.running:
try:
file = self.camera.capture_preview() # type: ignore
data = file.get_data_and_size()
frame = np.frombuffer(data, dtype=np.uint8)
frame = cv2.imdecode(frame, cv2.IMREAD_COLOR)
if frame is not None:
rgb_image = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)
h, w, ch = rgb_image.shape
qimg = QImage(rgb_image.data, w, h, ch * w, QImage.Format.Format_RGB888)
pixmap = QPixmap.fromImage(qimg)
self.frameReady.emit(pixmap)
except gp.GPhoto2Error as e:
self.errorOccurred.emit(f"Błąd odczytu LiveView: {e}")
break
except Exception as e:
self.errorOccurred.emit(f"Nieoczekiwany błąd: {e}")
break
time.sleep(delay)
class CameraController(QObject): class CameraController(QObject):
new_frame = Signal(object) frameReady = Signal(QPixmap)
errorOccurred = Signal(str) errorOccurred = Signal(str)
def __init__(self, image_source: BaseImageSource, control_source: BaseControlSource, parent=None): def __init__(self, fps: int = 15, parent=None):
super().__init__(parent) super().__init__(parent)
self.image_source = image_source self.camera_thread = QThread()
self.control_source = control_source self.worker = CameraWorker(fps)
self.worker.moveToThread(self.camera_thread )
self.camera_thread = QThread() # sygnały z workera
self.moveToThread(self.camera_thread) self.worker.frameReady.connect(self.frameReady)
self.worker.errorOccurred.connect(self.errorOccurred)
self.image_source.moveToThread(self.camera_thread) # sygnały start/stop
self.control_source.moveToThread(self.camera_thread) self.camera_thread.started.connect(self.worker.start_camera)
self.image_source.new_frame.connect(self.new_frame)
self.image_source.errorOccurred.connect(self.errorOccurred)
self.control_source.errorOccurred.connect(self.errorOccurred)
def start(self): def start(self):
"""Start kamery w osobnym wątku"""
self.camera_thread.start() self.camera_thread.start()
self.image_source.start()
def stop(self): def stop(self):
self.image_source.stop() """Stop kamery i zakończenie wątku"""
self.worker.stop_camera()
self.camera_thread.quit() self.camera_thread.quit()
self.camera_thread.wait() self.camera_thread.wait()

View File

@@ -5,74 +5,96 @@ from core.media import MediaRepository
from ui.widgets.color_list_widget import ColorListWidget from ui.widgets.color_list_widget import ColorListWidget
from ui.widgets.thumbnail_list_widget import ThumbnailListWidget from ui.widgets.thumbnail_list_widget import ThumbnailListWidget
from ui.widgets.split_view_widget import SplitView from ui.widgets.split_view_widget import SplitView
from .camera_controller import CameraController # from .camera_controller import CameraController
from ..core.gphoto_adapter import GPhotoImageSource, GPhotoControlSource from core.camera.camera_controller import CameraController
import gphoto2 as gp from core.camera.camera_manager import CameraManager
from core.camera.gphoto_camera import GPhotoCamera
from core.camera.camera_controller import CameraController
class MainController: class MainController:
def __init__(self, view): def __init__(self, view):
self.db = DatabaseManager() self.db = DatabaseManager()
self.db.connect() self.db.connect()
self.media_repo = MediaRepository(self.db) self.media_repo = MediaRepository(self.db)
self.media_repo.sync_media() self.media_repo.sync_media()
camera = gp.Camera() # camera = GPhotoCamera()
camera.init() # self.manager = CameraController(camera)
stream = GPhotoImageSource(camera=camera, fps=15) manager = CameraManager()
controll = GPhotoControlSource(camera=camera) manager.detect_gphoto()
self.camera_controller = CameraController(stream, controll) manager.detect_opencv()
self.view = view # self.camera_controller = CameraController()
self.color_list: ColorListWidget = view.color_list_widget
self.thumbnail_list: ThumbnailListWidget = view.thumbnail_widget
self.split_view: SplitView = view.preview_widget
self.photo_button: QPushButton = view.photo_button self.view = view
self.photo_button.clicked.connect(self.take_photo) self.color_list: ColorListWidget = view.color_list_widget
self.thumbnail_list: ThumbnailListWidget = view.thumbnail_widget
self.split_view: SplitView = view.preview_widget
self.color_list.colorSelected.connect(self.on_color_selected) self.photo_button: QPushButton = view.photo_button
self.color_list.editColor.connect(self.on_edit_color) self.photo_button.clicked.connect(self.take_photo)
self.thumbnail_list.selectedThumbnail.connect(self.on_thumbnail_selected)
self.camera_controller.errorOccurred.connect(self.split_view.widget_start.set_info_text) self.record_button: QPushButton = view.record_button
self.camera_controller.new_frame.connect(self.split_view.set_live_image) # self.record_button.clicked.connect(self.fun_test)
self.split_view.widget_start.camera_start_btn.clicked.connect(self.camera_controller.start)
def start_camera(self): self.color_list.colorSelected.connect(self.on_color_selected)
pass self.color_list.editColor.connect(self.on_edit_color)
self.thumbnail_list.selectedThumbnail.connect(self.on_thumbnail_selected)
def load_colors(self) -> None: # self.camera_controller.errorOccurred.connect(self.split_view.widget_start.set_info_text)
colors = self.db.get_all_colors() # self.manager.error_occurred.connect(self.split_view.widget_start.set_info_text)
print("Loaded colors:", colors) # self.camera_controller.frameReady.connect(self.split_view.set_live_image)
self.color_list.set_colors(colors) # self.manager.frame_ready.connect(self.split_view.set_live_image)
# self.split_view.widget_start.camera_start_btn.clicked.connect(self.camera_controller.start)
self.split_view.widget_start.camera_start_btn.clicked.connect(self.start_liveview)
def on_color_selected(self, color_name: str): def start_camera(self):
print(f"Wybrano kolor: {color_name}") pass
color_id = self.db.get_color_id(color_name)
if color_id is not None:
media_items = self.db.get_media_for_color(color_id)
print(f"Media dla koloru {color_name} (ID: {color_id}):", media_items)
self.thumbnail_list.list_widget.clear() def load_colors(self) -> None:
for media in media_items: colors = self.db.get_all_colors()
if media['file_type'] == 'photo': print("Loaded colors:", colors)
file_name = Path(media['media_path']).name self.color_list.set_colors(colors)
self.thumbnail_list.add_thumbnail(media['media_path'], file_name, media['id'])
else:
print(f"Nie znaleziono koloru o nazwie: {color_name}")
def on_edit_color(self, color_name: str):
print(f"Edycja koloru: {color_name}")
def on_thumbnail_selected(self, media_id: int): def on_color_selected(self, color_name: str):
media = self.db.get_media(media_id) print(f"Wybrano kolor: {color_name}")
if media: color_id = self.db.get_color_id(color_name)
print(f"Wybrano miniaturę o ID: {media_id}, ścieżka: {media['media_path']}") if color_id is not None:
self.split_view.set_reference_image(media['media_path']) media_items = self.db.get_media_for_color(color_id)
else: print(f"Media dla koloru {color_name} (ID: {color_id}):", media_items)
print(f"Nie znaleziono mediów o ID: {media_id}")
def take_photo(self): self.thumbnail_list.list_widget.clear()
print("Robienie zdjęcia...") for media in media_items:
self.split_view.toglle_live_view() if media['file_type'] == 'photo':
file_name = Path(media['media_path']).name
self.thumbnail_list.add_thumbnail(media['media_path'], file_name, media['id'])
else:
print(f"Nie znaleziono koloru o nazwie: {color_name}")
def on_edit_color(self, color_name: str):
print(f"Edycja koloru: {color_name}")
def on_thumbnail_selected(self, media_id: int):
media = self.db.get_media(media_id)
if media:
print(f"Wybrano miniaturę o ID: {media_id}, ścieżka: {media['media_path']}")
self.split_view.set_reference_image(media['media_path'])
else:
print(f"Nie znaleziono mediów o ID: {media_id}")
def take_photo(self):
print("Robienie zdjęcia...")
self.split_view.toglle_live_view()
def start_liveview(self):
pass
# self.manager.start_camera()
# self.manager.start_stream()
def shutdown(self):
pass
# self.manager.stop()

View File

@@ -1,26 +0,0 @@
from PySide6.QtCore import QObject, Signal
from PySide6.QtGui import QPixmap
class BaseImageSource(QObject):
new_frame = Signal(QPixmap)
errorOccurred = Signal(str)
def start(self):
raise NotImplementedError
def stop(self):
raise NotImplementedError
class BaseControlSource(QObject):
errorOccurred = Signal(str)
parameterChanged = Signal(str, object)
def set_parameter(self, name: str, value):
raise NotImplementedError
def get_parameter(self, name: str):
raise NotImplementedError
def list_parameters(self) -> dict:
raise NotImplementedError

View File

@@ -0,0 +1,42 @@
from abc import ABC, abstractmethod
class BaseCamera(ABC):
def __init__(self) -> None:
self.error_msg = None
@staticmethod
@abstractmethod
def detect() -> dict:
raise NotImplementedError
@abstractmethod
def connect(self, index: int | None = None) -> bool:
raise NotImplementedError
@abstractmethod
def disconnect(self) -> None:
raise NotImplementedError
@abstractmethod
def get_frame(self):
raise NotImplementedError
@abstractmethod
def get_config_by_id(self, id: int) -> dict:
raise NotImplementedError
@abstractmethod
def get_config_by_name(self, name: str) -> dict:
raise NotImplementedError
@abstractmethod
def set_config_by_id(self, id: int, value) -> None:
raise NotImplementedError
@abstractmethod
def set_config_by_name(self, name: str, value) -> None:
raise NotImplementedError
def get_error_msg(self):
return str(self.error_msg)

View File

@@ -0,0 +1,115 @@
from PySide6.QtCore import QObject, QThread, QTimer, Signal, Slot, QMutex, QMutexLocker
from PySide6.QtGui import QImage, QPixmap
import cv2
from .base_camera import BaseCamera
class CameraController(QThread):
frame_ready = Signal(QPixmap)
photo_ready = Signal(QPixmap)
error_occurred = Signal(str)
_enable_timer = Signal(bool)
def __init__(self, parent: QObject | None = None) -> None:
super().__init__(parent)
self.camera = None
self.timer = None
self.fps = 15
self.is_streaming = False
self.is_connected = False
self._camera_mutex = QMutex()
self.start()
def run(self) -> None:
self.timer = QTimer()
self.timer.timeout.connect(self._update_frame)
self._enable_timer.connect(self._set_timer)
self.exec()
def stop(self):
self.stop_camera()
self.quit()
self.wait()
def set_camera(self, camera: BaseCamera, fps: int = 15) -> None:
with QMutexLocker(self._camera_mutex):
self.stop_stream()
self.stop_camera()
self.camera = camera
self.fps = fps
def start_camera(self) -> None:
if self.camera is None or self.is_connected:
return
if self.camera.connect():
self.is_connected = True
else:
self.is_connected = False
self.error_occurred.emit(self.camera.get_error_msg())
def stop_camera(self) -> None:
if self.is_streaming:
self.stop_stream()
if self.camera is not None:
self.camera.disconnect()
self.is_connected = False
def start_stream(self):
if not self.is_connected:
return
if self.is_streaming:
return
if self.timer:
self.is_streaming = True
# self.timer.start()
self._enable_timer.emit(True)
def stop_stream(self) -> None:
if self.is_streaming:
self.is_streaming = False
if self.timer:
# self.timer.stop()
self._enable_timer.emit(False)
def _update_frame(self) -> None:
with QMutexLocker(self._camera_mutex):
if self.camera is None or not self.is_connected:
return
if not self.is_streaming:
return
ret, frame = self.camera.get_frame()
if not ret:
self.error_occurred.emit(self.camera.get_error_msg())
return
if frame is not None:
rgb_image = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)
h, w, ch = rgb_image.shape
qimg = QImage(rgb_image.data, w, h, ch * w, QImage.Format.Format_RGB888)
pixmap = QPixmap.fromImage(qimg)
self.frame_ready.emit(pixmap)
def _set_timer(self, enable: bool):
if not self.timer:
return
if enable:
self.timer.setInterval(int(1000 / self.fps))
self.timer.start()
else:
self.timer.stop()

View File

@@ -0,0 +1,20 @@
from .gphoto_camera import GPhotoCamera
from .opencv_camera import OpenCvCamera
from .camera_controller import CameraController
class CameraManager:
def __init__(self) -> None:
pass
def detect_gphoto(self):
camera_list = GPhotoCamera.detect()
print(camera_list)
return camera_list
def detect_opencv(self):
camera_list = OpenCvCamera.detect()
print(camera_list)
return camera_list

View File

@@ -0,0 +1,149 @@
from typing import Optional, List
from dataclasses import dataclass, field
import gphoto2 as gp
import cv2
import numpy as np
from .base_camera import BaseCamera
camera_widget_types = {
gp.GP_WIDGET_WINDOW: "GP_WIDGET_WINDOW", # type: ignore
gp.GP_WIDGET_SECTION: "GP_WIDGET_SECTION", # type: ignore
gp.GP_WIDGET_TEXT: "GP_WIDGET_TEXT", # type: ignore
gp.GP_WIDGET_RANGE: "GP_WIDGET_RANGE", # type: ignore
gp.GP_WIDGET_TOGGLE: "GP_WIDGET_TOGGLE", # type: ignore
gp.GP_WIDGET_RADIO: "GP_WIDGET_RADIO", # type: ignore
gp.GP_WIDGET_MENU: "GP_WIDGET_MENU", # type: ignore
gp.GP_WIDGET_BUTTON: "GP_WIDGET_BUTTON", # type: ignore
gp.GP_WIDGET_DATE: "GP_WIDGET_DATE", # type: ignore
}
class GPhotoCamera(BaseCamera):
def __init__(self) -> None:
super().__init__()
self.camera = None
self.configs: List[dict] = []
self.camera_index = 0
@staticmethod
def detect() -> dict:
cameras = gp.check_result(gp.gp_camera_autodetect()) # type: ignore
# cameras = gp.Camera().autodetect()
if not cameras or cameras.count() == 0: # type: ignore
return {}
camera_list = {}
for i in range(cameras.count()): # type: ignore
name = cameras.get_name(i) # type: ignore
port = cameras.get_value(i) # type: ignore
camera_list[i] = {"name": name, "port": port}
return camera_list
def connect(self, index: int | None = None) -> bool:
self.error_msg = None
self.camera = gp.Camera() # type: ignore
try:
if index:
self.camera_index = index
camera_list = GPhotoCamera.detect()
port_info_list = gp.PortInfoList()
port_info_list.load()
port_address = camera_list[index]["port"]
port_index = port_info_list.lookup_path(port_address)
self.camera.set_port_info(port_info_list[port_index])
self.camera.init()
config = self.camera.get_config()
self.read_config(config)
return True
except Exception as e:
self.error_msg = f"[GPHOTO2] {e}"
self.camera = None
return False
def disconnect(self) -> None:
if self.camera:
self.camera.exit()
self.camera = None
self.configs.clear()
def get_frame(self):
self.error_msg = None
if self.camera is None:
self.error_msg = "[GPHOTO2] Camera is not initialized."
return (False, None)
try:
file = self.camera.capture_preview() # type: ignore
data = file.get_data_and_size()
frame = np.frombuffer(data, dtype=np.uint8)
frame = cv2.imdecode(frame, cv2.IMREAD_COLOR)
return (True, frame)
except Exception as e:
self.error_msg = f"[GPHOTO2] {e}"
return (False, None)
def get_config_by_id(self, id: int):
return next(w for w in self.configs if w['id'] == id)
def get_config_by_name(self, name: str):
return next(w for w in self.configs if w['name'] == name)
def set_config(self, config, value):
if value not in config['choices']:
return
config['widget'].set_value(value) # type: ignore
if self._save_config(config):
config['value'] = value
def set_config_by_id(self, id: int, value: str):
config = self.get_config_by_id(id)
self.set_config(config, value)
def set_config_by_name(self, name: str, value: str):
config = self.get_config_by_name(name)
self.set_config(config, value)
def _save_config(self, config):
if not self.camera:
return False
self.camera.set_single.config(config['name'], config['widget'])
return True
def parse_config(self, config):
new_config = {
"id": config.get_id(),
"name": config.get_name(),
"label": config.get_label(),
"type": camera_widget_types[config.get_type()],
"widget": config
}
try:
new_config["value"] = config.get_value()
except gp.GPhoto2Error:
pass
try:
new_config["choices"] = list(config.get_choices())
except gp.GPhoto2Error:
pass
return new_config
def read_config(self, config):
self.configs.append(self.parse_config(config))
for i in range(config.count_children()):
child = config.get_child(i)
self.read_config(child)

View File

@@ -0,0 +1,128 @@
import cv2
from cv2_enumerate_cameras import enumerate_cameras
from typing import List
from .base_camera import BaseCamera
class OpenCvCamera(BaseCamera):
"""Kamera oparta na cv2.VideoCapture"""
config_map = {
0: {"name": "frame_width", "cv_prop": cv2.CAP_PROP_FRAME_WIDTH, "default": 640},
1: {"name": "frame_height", "cv_prop": cv2.CAP_PROP_FRAME_HEIGHT, "default": 480},
2: {"name": "fps", "cv_prop": cv2.CAP_PROP_FPS, "default": 30},
3: {"name": "brightness", "cv_prop": cv2.CAP_PROP_BRIGHTNESS, "default": 0.5},
4: {"name": "contrast", "cv_prop": cv2.CAP_PROP_CONTRAST, "default": 0.5},
5: {"name": "saturation", "cv_prop": cv2.CAP_PROP_SATURATION, "default": 0.5},
}
def __init__(self) -> None:
super().__init__()
self.cap = None
self.configs: List[dict] = []
self.camera_list = []
self.camera_index = 0
@staticmethod
def detect():
camera_list = enumerate_cameras(cv2.CAP_ANY)
result = {}
seen_ports = set()
for camera in camera_list:
# unikamy duplikatów tego samego /dev/videoX albo tej samej ścieżki na Windows/macOS
if camera.path in seen_ports:
continue
cap = cv2.VideoCapture(camera.index, camera.backend) # próbujemy otworzyć
ret, frame = cap.read()
cap.release()
if ret and frame is not None and frame.size > 0:
result[camera.index] = {
"name": camera.name,
"port": camera.path,
"backend": camera.backend,
}
seen_ports.add(camera.path)
return result
def connect(self, index: int | None = None) -> bool:
self.error_msg = None
try:
if index:
self.camera_index = index
self.cap = cv2.VideoCapture(self.camera_index)
if not self.cap.isOpened():
self.error_msg = f"[CV2] Could not open camera {self.camera_index}"
return False
self.configs.clear()
for id, conf in self.config_map.items():
value = self.cap.get(conf["cv_prop"])
self.configs.append(
{
"id": id,
"name": conf["name"],
"label": conf["name"].capitalize(),
"value": value,
"choices": None, # brak predefiniowanych wyborów
"cv_prop": conf["cv_prop"],
}
)
return True
except Exception as e:
self.error_msg = f"[CV2] {e}"
self.cap = None
return False
def disconnect(self) -> None:
if self.cap:
self.cap.release()
self.cap = None
self.configs.clear()
def get_frame(self):
self.error_msg = None
if self.cap is None or not self.cap.isOpened():
self.error_msg = "[CV2] Camera is not initialized."
return (False, None)
try:
ret, frame = self.cap.read()
if not ret:
self.error_msg = "[CV2] Failed to read frame."
return (False, None)
return (True, frame)
except Exception as e:
self.error_msg = f"[CV2] {e}"
return (False, None)
def get_config_by_id(self, id: int):
return next(w for w in self.configs if w["id"] == id)
def get_config_by_name(self, name: str):
return next(w for w in self.configs if w["name"] == name)
def set_config(self, config, value: float):
if not self.cap:
return
try:
self.cap.set(config["cv_prop"], value)
config["value"] = self.cap.get(
config["cv_prop"]) # sprawdz co ustawiło
except Exception as e:
self.error_msg = f"[CV2] {e}"
def set_config_by_id(self, id: int, value: float):
config = self.get_config_by_id(id)
self.set_config(config, value)
def set_config_by_name(self, name: str, value: float):
config = self.get_config_by_name(name)
self.set_config(config, value)

View File

@@ -1,69 +0,0 @@
import cv2
import gphoto2 as gp
from controllers.camera_controller import CameraController
from .gphoto_adapter import GPhotoImageSource, GPhotoControlSource
from .opencv_adapter import OpenCVImageSource, OpenCVControlSource
class CameraManager:
def __init__(self):
self.devices = [] # lista wykrytych kamer
def detect_devices(self):
self.devices.clear()
# --- Wykrywanie webcamów / grabberów HDMI
for index in range(5): # sprawdź kilka indeksów
cap = cv2.VideoCapture(index)
if cap.isOpened():
self.devices.append({
"id": f"opencv:{index}",
"name": f"Webcam / HDMI Grabber #{index}",
"type": "opencv",
"index": index
})
cap.release()
# --- Wykrywanie kamer gphoto2
cameras = gp.Camera.autodetect() # type: ignore
for i, (name, addr) in enumerate(cameras):
self.devices.append({
"id": f"gphoto:{i}",
"name": f"{name} ({addr})",
"type": "gphoto",
"addr": addr
})
return self.devices
def create_controller(self, device_id, hybrid_with=None):
"""
Tworzy CameraController na podstawie id urządzenia.
Można podać hybrid_with="opencv" albo "gphoto" żeby zbudować hybrydę.
"""
device = next((d for d in self.devices if d["id"] == device_id), None)
if not device:
raise ValueError(f"Nie znaleziono urządzenia {device_id}")
# Webcam / grabber
if device["type"] == "opencv":
cap = cv2.VideoCapture(device["index"])
img = OpenCVImageSource(device["index"])
ctrl = OpenCVControlSource(cap)
return CameraController(img, ctrl)
# GPhoto camera
elif device["type"] == "gphoto":
cam = gp.Camera() # type: ignore
cam.init()
img = GPhotoImageSource(cam)
ctrl = GPhotoControlSource(cam)
return CameraController(img, ctrl)
# Hybrydowy tryb
elif device["type"] == "hybrid":
raise NotImplementedError("Tu możesz połączyć OpenCV + GPhoto w hybrydę")
else:
raise ValueError(f"Nieobsługiwany typ urządzenia: {device['type']}")

View File

@@ -1,84 +0,0 @@
import numpy as np
import cv2
from PySide6.QtCore import QObject, QThread, Signal, QTimer
from PySide6.QtGui import QImage, QPixmap
from .base import BaseImageSource, BaseControlSource
import gphoto2 as gp
# try:
# import gphoto2 as gp
# except:
# from . import mock_gphoto as gp
class GPhotoImageSource(BaseImageSource):
def __init__(self, camera: gp.Camera, fps=10, parent=None): # type: ignore
super().__init__(parent)
self.camera = camera
self.fps = fps
self.timer = None
def start(self):
self.timer = QTimer()
self.timer.timeout.connect(self._grab_frame)
self.timer.start(int(1000 / self.fps))
def stop(self):
if self.timer:
self.timer.stop()
def _grab_frame(self):
try:
file = self.camera.capture_preview()
data = file.get_data_and_size()
frame = np.frombuffer(data, dtype=np.uint8)
frame = cv2.imdecode(frame, cv2.IMREAD_COLOR)
if frame is None:
return
rgb_image = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)
h, w, ch = rgb_image.shape
qimg = QImage(rgb_image.data, w, h, ch * w, QImage.Format.Format_RGB888)
pixmap = QPixmap.fromImage(qimg)
self.new_frame.emit(pixmap)
except gp.GPhoto2Error as e:
self.errorOccurred.emit(f"GPhoto2 error: {e}")
class GPhotoControlSource(BaseControlSource):
def __init__(self, camera: gp.Camera, parent=None): # type: ignore
super().__init__(parent)
self.camera = camera
def set_parameter(self, name, value):
try:
config = self.camera.get_config()
child = config.get_child_by_name(name)
child.set_value(value)
self.camera.set_config(config)
self.parameterChanged.emit(name, value)
except gp.GPhoto2Error as e:
self.errorOccurred.emit(str(e))
def get_parameter(self, name):
try:
config = self.camera.get_config()
child = config.get_child_by_name(name)
return child.get_value()
except gp.GPhoto2Error as e:
self.errorOccurred.emit(str(e))
return None
def list_parameters(self):
params = {}
try:
config = self.camera.get_config()
for child in config.get_children():
params[child.get_name()] = child.get_value()
except gp.GPhoto2Error as e:
self.errorOccurred.emit(str(e))
return params

View File

@@ -1,64 +0,0 @@
import cv2
import numpy as np
class GPhoto2Error(Exception):
pass
class CameraFileMock:
"""Mock obiektu zwracanego przez gphoto2.Camera.capture_preview()"""
def __init__(self, frame: np.ndarray):
# Kodowanie do JPEG, żeby symulować prawdziwe dane z kamery
success, buf = cv2.imencode(".jpg", frame)
if not success:
raise GPhoto2Error("Nie udało się zakodować ramki testowej.")
self._data = buf.tobytes()
def get_data_and_size(self):
return self._data
return self._data, len(self._data)
class Camera:
def __init__(self):
self._frame_counter = 0
self._running = False
def init(self):
self._running = True
print("[my_gphoto] Kamera MOCK zainicjalizowana")
def exit(self):
self._running = False
print("[my_gphoto] Kamera MOCK wyłączona")
def capture_preview(self):
if not self._running:
raise GPhoto2Error("Kamera MOCK nie jest uruchomiona")
# przykład 1: wczytaj stały obrazek z pliku
# frame = cv2.imread("test_frame.jpg")
# if frame is None:
# raise GPhoto2Error("Nie znaleziono test_frame.jpg")
# przykład 2: wygeneruj kolorową planszę
h, w = 480, 640
color = (self._frame_counter % 255, 100, 200)
frame = np.full((h, w, 3), color, dtype=np.uint8)
# dodanie napisu
text = "OBRAZ TESTOWY"
font = cv2.FONT_HERSHEY_SIMPLEX
scale = 1.5
thickness = 3
color_text = (255, 255, 255)
(text_w, text_h), _ = cv2.getTextSize(text, font, scale, thickness)
x = (w - text_w) // 2
y = (h + text_h) // 2
cv2.putText(frame, text, (x, y), font, scale, color_text, thickness, cv2.LINE_AA)
self._frame_counter += 1
return CameraFileMock(frame)

View File

@@ -1,83 +0,0 @@
from PySide6.QtCore import QObject, Signal, QTimer
from PySide6.QtGui import QImage, QPixmap
import cv2
import numpy as np
from .base import BaseImageSource, BaseControlSource
class OpenCVImageSource(BaseImageSource):
def __init__(self, device_index=0, fps=30, parent=None):
super().__init__(parent)
self.device_index = device_index
self.fps = fps
self.cap = None
self.timer = None
def start(self):
self.cap = cv2.VideoCapture(self.device_index)
if not self.cap.isOpened():
self.errorOccurred.emit(f"Nie mogę otworzyć kamery {self.device_index}")
return
self.timer = QTimer()
self.timer.timeout.connect(self._grab_frame)
self.timer.start(int(1000 / self.fps))
def stop(self):
if self.timer:
self.timer.stop()
if self.cap:
self.cap.release()
def _grab_frame(self):
if self.cap is None:
self.errorOccurred.emit(f"Kamera niezaincjalizowana!")
return
ret, frame = self.cap.read()
if not ret:
self.errorOccurred.emit("Brak obrazu z kamery OpenCV")
return
rgb_image = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)
h, w, ch = rgb_image.shape
qimg = QImage(rgb_image.data, w, h, ch * w, QImage.Format.Format_RGB888)
pixmap = QPixmap.fromImage(qimg)
self.new_frame.emit(pixmap)
class OpenCVControlSource(BaseControlSource):
def __init__(self, cap: cv2.VideoCapture, parent=None):
super().__init__(parent)
self.cap = cap
def set_parameter(self, name, value):
prop_id = getattr(cv2, name, None)
if prop_id is None:
self.errorOccurred.emit(f"Nieznany parametr {name}")
return
self.cap.set(prop_id, value)
self.parameterChanged.emit(name, value)
def get_parameter(self, name):
prop_id = getattr(cv2, name, None)
if prop_id is None:
self.errorOccurred.emit(f"Nieznany parametr {name}")
return None
return self.cap.get(prop_id)
def list_parameters(self):
return {
"CAP_PROP_BRIGHTNESS": self.cap.get(cv2.CAP_PROP_BRIGHTNESS),
"CAP_PROP_CONTRAST": self.cap.get(cv2.CAP_PROP_CONTRAST),
"CAP_PROP_SATURATION": self.cap.get(cv2.CAP_PROP_SATURATION),
"CAP_PROP_GAIN": self.cap.get(cv2.CAP_PROP_GAIN),
"CAP_PROP_EXPOSURE": self.cap.get(cv2.CAP_PROP_EXPOSURE),
}

View File

@@ -1,6 +1,7 @@
import sys import sys
from PySide6.QtWidgets import QApplication from PySide6.QtWidgets import QApplication
from ui.main_palette import set_dark_theme
from ui.main_window import MainWindow from ui.main_window import MainWindow
from controllers.main_controller import MainController from controllers.main_controller import MainController
@@ -8,10 +9,13 @@ from controllers.main_controller import MainController
def main(): def main():
app = QApplication(sys.argv) app = QApplication(sys.argv)
app.setStyle("Fusion") set_dark_theme(app)
# app.setStyle("Fusion")
window = MainWindow() window = MainWindow()
controller = MainController(window) controller = MainController(window)
controller.load_colors() controller.load_colors()
app.aboutToQuit.connect(controller.shutdown)
window.show() window.show()
sys.exit(app.exec()) sys.exit(app.exec())

48
ui/main_palette.py Normal file
View File

@@ -0,0 +1,48 @@
import sys
from PySide6.QtWidgets import QApplication
from PySide6.QtGui import QPalette, QColor
from PySide6.QtCore import Qt
def set_dark_theme(app: QApplication):
"""Definiuje i stosuje ciemną paletę kolorów do aplikacji."""
# 1. Upewnij się, że styl jest ustawiony na "Fusion"
app.setStyle('Fusion')
# 2. Definicja kolorów dla ciemnego motywu
palette = QPalette()
# Kolory tła
DARK_GRAY = QColor(45, 45, 45) # Ogólne tło okien i widżetów (Base, Window)
LIGHT_GRAY = QColor(53, 53, 53) # Tło elementów, np. toolbara, menu (Window)
VERY_DARK_GRAY = QColor(32, 32, 32) # Kolor tła dla kontrolek (Button)
# Kolory tekstu i obramowań
WHITE = QColor(200, 200, 200) # Główny kolor tekstu (Text, WindowText)
HIGHLIGHT = QColor(66, 135, 245) # Kolor podświetlenia (Highlight)
# Ustawienie głównej palety
# palette.setColor(QPalette.ColorRole.Window, LIGHT_GRAY)
palette.setColor(QPalette.ColorRole.Window, VERY_DARK_GRAY)
palette.setColor(QPalette.ColorRole.WindowText, WHITE)
palette.setColor(QPalette.ColorRole.Base, DARK_GRAY)
palette.setColor(QPalette.ColorRole.AlternateBase, LIGHT_GRAY)
palette.setColor(QPalette.ColorRole.ToolTipBase, WHITE)
palette.setColor(QPalette.ColorRole.ToolTipText, WHITE)
palette.setColor(QPalette.ColorRole.Text, WHITE)
palette.setColor(QPalette.ColorRole.Button, VERY_DARK_GRAY)
palette.setColor(QPalette.ColorRole.ButtonText, WHITE)
palette.setColor(QPalette.ColorRole.BrightText, Qt.GlobalColor.red)
palette.setColor(QPalette.ColorRole.Link, QColor(42, 130, 218))
palette.setColor(QPalette.ColorRole.PlaceholderText, QColor(150, 150, 150))
# Kolory zaznaczenia/interakcji
palette.setColor(QPalette.ColorRole.Highlight, HIGHLIGHT)
palette.setColor(QPalette.ColorRole.HighlightedText, Qt.GlobalColor.black)
# Kontrolki wyłączone (Disabled)
# palette.setColor(QPalette.ColorRole.Disabled, QPalette.ColorGroup.Active, QPalette.ColorRole.Text, QColor(127, 127, 127))
# palette.setColor(QPalette.ColorRole.Disabled, QPalette.ColorGroup.Active, QPalette.ColorRole.ButtonText, QColor(127, 127, 127))
# 3. Zastosowanie palety do aplikacji
app.setPalette(palette)