Initial MVP application skeleton
Add PySide6 camera UI, YOLO/Tesseract detection pipeline, capture metadata, configuration, and project gitignore.
This commit is contained in:
129
app/camera.py
Normal file
129
app/camera.py
Normal file
@@ -0,0 +1,129 @@
|
||||
from __future__ import annotations
|
||||
|
||||
import threading
|
||||
import time
|
||||
from typing import Any
|
||||
|
||||
import cv2
|
||||
import numpy as np
|
||||
from PySide6.QtCore import QThread, Signal, Slot
|
||||
|
||||
from app.detection import DetectionPipeline, DetectionResult
|
||||
|
||||
|
||||
CV_CAP_PROPS = {
|
||||
"brightness": cv2.CAP_PROP_BRIGHTNESS,
|
||||
"contrast": cv2.CAP_PROP_CONTRAST,
|
||||
"saturation": cv2.CAP_PROP_SATURATION,
|
||||
"hue": cv2.CAP_PROP_HUE,
|
||||
"gain": cv2.CAP_PROP_GAIN,
|
||||
"exposure": cv2.CAP_PROP_EXPOSURE,
|
||||
"sharpness": cv2.CAP_PROP_SHARPNESS,
|
||||
"auto_exposure": cv2.CAP_PROP_AUTO_EXPOSURE,
|
||||
"focus": cv2.CAP_PROP_FOCUS,
|
||||
"auto_focus": cv2.CAP_PROP_AUTOFOCUS,
|
||||
}
|
||||
|
||||
|
||||
def backend_for_name(name: str) -> int:
|
||||
if name == "avfoundation":
|
||||
return cv2.CAP_AVFOUNDATION
|
||||
if name == "v4l2":
|
||||
return cv2.CAP_V4L2
|
||||
if name == "dshow":
|
||||
return cv2.CAP_DSHOW
|
||||
return cv2.CAP_ANY
|
||||
|
||||
|
||||
class CameraWorker(QThread):
|
||||
frame_ready = Signal(object)
|
||||
detection_ready = Signal(object)
|
||||
camera_error = Signal(str)
|
||||
|
||||
def __init__(self, config: dict[str, Any], app_config: Any) -> None:
|
||||
super().__init__()
|
||||
self.config = config
|
||||
self.app_config = app_config
|
||||
self.pipeline = DetectionPipeline(config, app_config)
|
||||
self._running = threading.Event()
|
||||
self._running.set()
|
||||
self._detecting = False
|
||||
self._accepted = False
|
||||
self._frame_count = 0
|
||||
self._capture: cv2.VideoCapture | None = None
|
||||
self._lock = threading.Lock()
|
||||
|
||||
def stop(self) -> None:
|
||||
self._running.clear()
|
||||
|
||||
@Slot()
|
||||
def start_detection(self) -> None:
|
||||
with self._lock:
|
||||
self._detecting = True
|
||||
self._accepted = False
|
||||
self._frame_count = 0
|
||||
|
||||
@Slot()
|
||||
def accept_detection(self) -> None:
|
||||
with self._lock:
|
||||
self._detecting = False
|
||||
self._accepted = True
|
||||
|
||||
@Slot(dict)
|
||||
def update_camera_config(self, camera_config: dict[str, Any]) -> None:
|
||||
with self._lock:
|
||||
self.config["camera"] = camera_config
|
||||
capture = self._capture
|
||||
if capture is not None:
|
||||
self._apply_camera_settings(capture)
|
||||
|
||||
def run(self) -> None:
|
||||
camera_cfg = self.config["camera"]
|
||||
capture = cv2.VideoCapture(
|
||||
int(camera_cfg.get("index", 0)),
|
||||
backend_for_name(str(camera_cfg.get("backend", "auto"))),
|
||||
)
|
||||
self._capture = capture
|
||||
if not capture.isOpened():
|
||||
self.camera_error.emit("Nie mozna otworzyc kamery USB")
|
||||
return
|
||||
|
||||
self._apply_camera_settings(capture)
|
||||
|
||||
try:
|
||||
while self._running.is_set():
|
||||
ok, frame = capture.read()
|
||||
if not ok or frame is None:
|
||||
self.camera_error.emit("Nie mozna odczytac klatki z kamery")
|
||||
time.sleep(0.2)
|
||||
continue
|
||||
|
||||
self.frame_ready.emit(frame)
|
||||
self._maybe_detect(frame)
|
||||
finally:
|
||||
capture.release()
|
||||
self._capture = None
|
||||
|
||||
def _apply_camera_settings(self, capture: cv2.VideoCapture) -> None:
|
||||
camera_cfg = self.config["camera"]
|
||||
capture.set(cv2.CAP_PROP_FRAME_WIDTH, int(camera_cfg.get("width", 1920)))
|
||||
capture.set(cv2.CAP_PROP_FRAME_HEIGHT, int(camera_cfg.get("height", 1080)))
|
||||
capture.set(cv2.CAP_PROP_FPS, int(camera_cfg.get("fps", 30)))
|
||||
|
||||
for name, value in camera_cfg.get("properties", {}).items():
|
||||
if value is None or name not in CV_CAP_PROPS:
|
||||
continue
|
||||
capture.set(CV_CAP_PROPS[name], float(value))
|
||||
|
||||
def _maybe_detect(self, frame: np.ndarray) -> None:
|
||||
with self._lock:
|
||||
detecting = self._detecting and not self._accepted
|
||||
frame_stride = max(1, int(self.config["detection"].get("frame_stride", 5)))
|
||||
self._frame_count += 1
|
||||
should_detect = detecting and self._frame_count % frame_stride == 0
|
||||
|
||||
if not should_detect:
|
||||
return
|
||||
|
||||
result: DetectionResult = self.pipeline.process(frame)
|
||||
self.detection_ready.emit(result)
|
||||
Reference in New Issue
Block a user