from __future__ import annotations import threading import time from typing import Any import cv2 import numpy as np from PySide6.QtCore import QThread, Signal, Slot from app.detection import DetectionPipeline, DetectionResult CV_CAP_PROPS = { "brightness": cv2.CAP_PROP_BRIGHTNESS, "contrast": cv2.CAP_PROP_CONTRAST, "saturation": cv2.CAP_PROP_SATURATION, "hue": cv2.CAP_PROP_HUE, "gain": cv2.CAP_PROP_GAIN, "exposure": cv2.CAP_PROP_EXPOSURE, "sharpness": cv2.CAP_PROP_SHARPNESS, "auto_exposure": cv2.CAP_PROP_AUTO_EXPOSURE, "focus": cv2.CAP_PROP_FOCUS, "auto_focus": cv2.CAP_PROP_AUTOFOCUS, } def backend_for_name(name: str) -> int: if name == "avfoundation": return cv2.CAP_AVFOUNDATION if name == "v4l2": return cv2.CAP_V4L2 if name == "dshow": return cv2.CAP_DSHOW return cv2.CAP_ANY class CameraWorker(QThread): frame_ready = Signal(object) detection_ready = Signal(object) camera_error = Signal(str) def __init__(self, config: dict[str, Any], app_config: Any) -> None: super().__init__() self.config = config self.app_config = app_config self.pipeline = DetectionPipeline(config, app_config) self._running = threading.Event() self._running.set() self._detecting = False self._accepted = False self._frame_count = 0 self._capture: cv2.VideoCapture | None = None self._lock = threading.Lock() def stop(self) -> None: self._running.clear() @Slot() def start_detection(self) -> None: with self._lock: self._detecting = True self._accepted = False self._frame_count = 0 @Slot() def accept_detection(self) -> None: with self._lock: self._detecting = False self._accepted = True @Slot(dict) def update_camera_config(self, camera_config: dict[str, Any]) -> None: with self._lock: self.config["camera"] = camera_config capture = self._capture if capture is not None: self._apply_camera_settings(capture) def run(self) -> None: camera_cfg = self.config["camera"] capture = cv2.VideoCapture( int(camera_cfg.get("index", 0)), backend_for_name(str(camera_cfg.get("backend", "auto"))), ) self._capture = capture if not capture.isOpened(): self.camera_error.emit("Nie mozna otworzyc kamery USB") return self._apply_camera_settings(capture) try: while self._running.is_set(): ok, frame = capture.read() if not ok or frame is None: self.camera_error.emit("Nie mozna odczytac klatki z kamery") time.sleep(0.2) continue self.frame_ready.emit(frame) self._maybe_detect(frame) finally: capture.release() self._capture = None def _apply_camera_settings(self, capture: cv2.VideoCapture) -> None: camera_cfg = self.config["camera"] capture.set(cv2.CAP_PROP_FRAME_WIDTH, int(camera_cfg.get("width", 1920))) capture.set(cv2.CAP_PROP_FRAME_HEIGHT, int(camera_cfg.get("height", 1080))) capture.set(cv2.CAP_PROP_FPS, int(camera_cfg.get("fps", 30))) for name, value in camera_cfg.get("properties", {}).items(): if value is None or name not in CV_CAP_PROPS: continue capture.set(CV_CAP_PROPS[name], float(value)) def _maybe_detect(self, frame: np.ndarray) -> None: with self._lock: detecting = self._detecting and not self._accepted frame_stride = max(1, int(self.config["detection"].get("frame_stride", 5))) self._frame_count += 1 should_detect = detecting and self._frame_count % frame_stride == 0 if not should_detect: return result: DetectionResult = self.pipeline.process(frame) self.detection_ready.emit(result)