Files
duck-stain-yolo/app/camera.py

165 lines
5.2 KiB
Python

from __future__ import annotations
import threading
import time
from typing import Any
import cv2
import numpy as np
from PySide6.QtCore import QThread, Signal, Slot
from app.detection import DetectionPipeline, DetectionResult
CV_CAP_PROPS = {
"brightness": cv2.CAP_PROP_BRIGHTNESS,
"contrast": cv2.CAP_PROP_CONTRAST,
"saturation": cv2.CAP_PROP_SATURATION,
"hue": cv2.CAP_PROP_HUE,
"gain": cv2.CAP_PROP_GAIN,
"exposure": cv2.CAP_PROP_EXPOSURE,
"sharpness": cv2.CAP_PROP_SHARPNESS,
"auto_exposure": cv2.CAP_PROP_AUTO_EXPOSURE,
"focus": cv2.CAP_PROP_FOCUS,
"auto_focus": cv2.CAP_PROP_AUTOFOCUS,
}
def backend_for_name(name: str) -> int:
if name == "avfoundation":
return cv2.CAP_AVFOUNDATION
if name == "v4l2":
return cv2.CAP_V4L2
if name == "dshow":
return cv2.CAP_DSHOW
return cv2.CAP_ANY
def rotate_frame(frame: np.ndarray, degrees: int) -> np.ndarray:
normalized = degrees % 360
if normalized == 0:
return frame
if normalized == 90:
return cv2.rotate(frame, cv2.ROTATE_90_CLOCKWISE)
if normalized == 180:
return cv2.rotate(frame, cv2.ROTATE_180)
if normalized == 270:
return cv2.rotate(frame, cv2.ROTATE_90_COUNTERCLOCKWISE)
return frame
class CameraWorker(QThread):
frame_ready = Signal(object)
camera_error = Signal(str)
def __init__(self, config: dict[str, Any], app_config: Any) -> None:
super().__init__()
self.config = config
self.app_config = app_config
self._running = threading.Event()
self._running.set()
self._capture: cv2.VideoCapture | None = None
self._lock = threading.Lock()
def stop(self) -> None:
self._running.clear()
@Slot(dict)
def update_camera_config(self, camera_config: dict[str, Any]) -> None:
with self._lock:
self.config["camera"] = camera_config
capture = self._capture
if capture is not None:
self._apply_camera_settings(capture)
def run(self) -> None:
camera_cfg = self.config["camera"]
capture = cv2.VideoCapture(
int(camera_cfg.get("index", 0)),
backend_for_name(str(camera_cfg.get("backend", "auto"))),
)
self._capture = capture
if not capture.isOpened():
self.camera_error.emit("Nie mozna otworzyc kamery USB")
return
self._apply_camera_settings(capture)
try:
while self._running.is_set():
ok, frame = capture.read()
if not ok or frame is None:
self.camera_error.emit("Nie mozna odczytac klatki z kamery")
time.sleep(0.2)
continue
with self._lock:
rotation_degrees = int(self.config["camera"].get("rotation_degrees", 0))
frame = rotate_frame(frame, rotation_degrees)
self.frame_ready.emit(frame)
finally:
capture.release()
self._capture = None
def _apply_camera_settings(self, capture: cv2.VideoCapture) -> None:
camera_cfg = self.config["camera"]
capture.set(cv2.CAP_PROP_FRAME_WIDTH, int(camera_cfg.get("width", 1920)))
capture.set(cv2.CAP_PROP_FRAME_HEIGHT, int(camera_cfg.get("height", 1080)))
capture.set(cv2.CAP_PROP_FPS, int(camera_cfg.get("fps", 30)))
for name, value in camera_cfg.get("properties", {}).items():
if value is None or name not in CV_CAP_PROPS:
continue
capture.set(CV_CAP_PROPS[name], float(value))
class DetectionWorker(QThread):
detection_ready = Signal(object)
def __init__(self, config: dict[str, Any], app_config: Any) -> None:
super().__init__()
self.config = config
self.app_config = app_config
self.pipeline: DetectionPipeline | None = None
self._running = threading.Event()
self._running.set()
self._pending = threading.Event()
self._lock = threading.Lock()
self._pending_frame: np.ndarray | None = None
self._busy = False
def stop(self) -> None:
self._running.clear()
self._pending.set()
def request_detection(self, frame: np.ndarray) -> bool:
with self._lock:
if self._busy or self._pending_frame is not None:
return False
self._pending_frame = frame.copy()
self._pending.set()
return True
def run(self) -> None:
while self._running.is_set():
self._pending.wait(0.2)
if not self._running.is_set():
break
with self._lock:
frame = self._pending_frame
self._pending_frame = None
self._pending.clear()
if frame is None:
continue
self._busy = True
try:
if self.pipeline is None:
self.pipeline = DetectionPipeline(self.config, self.app_config)
result: DetectionResult = self.pipeline.process(frame)
self.detection_ready.emit(result)
finally:
with self._lock:
self._busy = False