146 lines
4.6 KiB
Python
146 lines
4.6 KiB
Python
from __future__ import annotations
|
|
|
|
import threading
|
|
import time
|
|
from typing import Any
|
|
|
|
import cv2
|
|
import numpy as np
|
|
from PySide6.QtCore import QThread, Signal, Slot
|
|
|
|
from app.detection import DetectionPipeline, DetectionResult
|
|
|
|
|
|
CV_CAP_PROPS = {
|
|
"brightness": cv2.CAP_PROP_BRIGHTNESS,
|
|
"contrast": cv2.CAP_PROP_CONTRAST,
|
|
"saturation": cv2.CAP_PROP_SATURATION,
|
|
"hue": cv2.CAP_PROP_HUE,
|
|
"gain": cv2.CAP_PROP_GAIN,
|
|
"exposure": cv2.CAP_PROP_EXPOSURE,
|
|
"sharpness": cv2.CAP_PROP_SHARPNESS,
|
|
"auto_exposure": cv2.CAP_PROP_AUTO_EXPOSURE,
|
|
"focus": cv2.CAP_PROP_FOCUS,
|
|
"auto_focus": cv2.CAP_PROP_AUTOFOCUS,
|
|
}
|
|
|
|
|
|
def backend_for_name(name: str) -> int:
|
|
if name == "avfoundation":
|
|
return cv2.CAP_AVFOUNDATION
|
|
if name == "v4l2":
|
|
return cv2.CAP_V4L2
|
|
if name == "dshow":
|
|
return cv2.CAP_DSHOW
|
|
return cv2.CAP_ANY
|
|
|
|
|
|
def rotate_frame(frame: np.ndarray, degrees: int) -> np.ndarray:
|
|
normalized = degrees % 360
|
|
if normalized == 0:
|
|
return frame
|
|
if normalized == 90:
|
|
return cv2.rotate(frame, cv2.ROTATE_90_CLOCKWISE)
|
|
if normalized == 180:
|
|
return cv2.rotate(frame, cv2.ROTATE_180)
|
|
if normalized == 270:
|
|
return cv2.rotate(frame, cv2.ROTATE_90_COUNTERCLOCKWISE)
|
|
return frame
|
|
|
|
|
|
class CameraWorker(QThread):
|
|
frame_ready = Signal(object)
|
|
detection_ready = Signal(object)
|
|
camera_error = Signal(str)
|
|
|
|
def __init__(self, config: dict[str, Any], app_config: Any) -> None:
|
|
super().__init__()
|
|
self.config = config
|
|
self.app_config = app_config
|
|
self.pipeline = DetectionPipeline(config, app_config)
|
|
self._running = threading.Event()
|
|
self._running.set()
|
|
self._detecting = False
|
|
self._accepted = False
|
|
self._frame_count = 0
|
|
self._capture: cv2.VideoCapture | None = None
|
|
self._lock = threading.Lock()
|
|
|
|
def stop(self) -> None:
|
|
self._running.clear()
|
|
|
|
@Slot()
|
|
def start_detection(self) -> None:
|
|
with self._lock:
|
|
self._detecting = True
|
|
self._accepted = False
|
|
self._frame_count = 0
|
|
|
|
@Slot()
|
|
def accept_detection(self) -> None:
|
|
with self._lock:
|
|
self._detecting = False
|
|
self._accepted = True
|
|
|
|
@Slot(dict)
|
|
def update_camera_config(self, camera_config: dict[str, Any]) -> None:
|
|
with self._lock:
|
|
self.config["camera"] = camera_config
|
|
capture = self._capture
|
|
if capture is not None:
|
|
self._apply_camera_settings(capture)
|
|
|
|
def run(self) -> None:
|
|
camera_cfg = self.config["camera"]
|
|
capture = cv2.VideoCapture(
|
|
int(camera_cfg.get("index", 0)),
|
|
backend_for_name(str(camera_cfg.get("backend", "auto"))),
|
|
)
|
|
self._capture = capture
|
|
if not capture.isOpened():
|
|
self.camera_error.emit("Nie mozna otworzyc kamery USB")
|
|
return
|
|
|
|
self._apply_camera_settings(capture)
|
|
|
|
try:
|
|
while self._running.is_set():
|
|
ok, frame = capture.read()
|
|
if not ok or frame is None:
|
|
self.camera_error.emit("Nie mozna odczytac klatki z kamery")
|
|
time.sleep(0.2)
|
|
continue
|
|
|
|
with self._lock:
|
|
rotation_degrees = int(self.config["camera"].get("rotation_degrees", 0))
|
|
frame = rotate_frame(frame, rotation_degrees)
|
|
self.frame_ready.emit(frame)
|
|
self._maybe_detect(frame)
|
|
finally:
|
|
capture.release()
|
|
self._capture = None
|
|
|
|
def _apply_camera_settings(self, capture: cv2.VideoCapture) -> None:
|
|
camera_cfg = self.config["camera"]
|
|
capture.set(cv2.CAP_PROP_FRAME_WIDTH, int(camera_cfg.get("width", 1920)))
|
|
capture.set(cv2.CAP_PROP_FRAME_HEIGHT, int(camera_cfg.get("height", 1080)))
|
|
capture.set(cv2.CAP_PROP_FPS, int(camera_cfg.get("fps", 30)))
|
|
|
|
for name, value in camera_cfg.get("properties", {}).items():
|
|
if value is None or name not in CV_CAP_PROPS:
|
|
continue
|
|
capture.set(CV_CAP_PROPS[name], float(value))
|
|
|
|
def _maybe_detect(self, frame: np.ndarray) -> None:
|
|
with self._lock:
|
|
detecting = self._detecting and not self._accepted
|
|
frame_stride = max(1, int(self.config["detection"].get("frame_stride", 5)))
|
|
self._frame_count += 1
|
|
should_detect = detecting and self._frame_count % frame_stride == 0
|
|
|
|
if not should_detect:
|
|
return
|
|
|
|
result: DetectionResult = self.pipeline.process(frame)
|
|
self.detection_ready.emit(result)
|