230 lines
8.1 KiB
Python
230 lines
8.1 KiB
Python
"""Camera Service — manages QCamera lifecycle and frame acquisition."""
|
|
|
|
from __future__ import annotations
|
|
|
|
import logging
|
|
import platform
|
|
|
|
from PySide6.QtCore import QObject, Signal
|
|
from PySide6.QtMultimedia import (
|
|
QCamera,
|
|
QMediaCaptureSession,
|
|
QVideoFrame,
|
|
QVideoSink,
|
|
)
|
|
|
|
from app.camera.camera_enumerator import CameraFormat, CameraInfo, pixel_format_name
|
|
|
|
logger = logging.getLogger(__name__)
|
|
|
|
|
|
class CameraService(QObject):
|
|
"""
|
|
Manages camera initialisation, configuration and frame delivery.
|
|
|
|
Emits:
|
|
frame_ready(QVideoFrame) — new frame available from the camera
|
|
camera_started() — camera successfully opened and streaming
|
|
camera_stopped() — camera stopped (clean shutdown)
|
|
camera_error(str) — camera error description
|
|
format_changed(float) — actual FPS after format was applied
|
|
"""
|
|
|
|
frame_ready = Signal(QVideoFrame)
|
|
camera_started = Signal()
|
|
camera_stopped = Signal()
|
|
camera_error = Signal(str)
|
|
format_changed = Signal(float)
|
|
|
|
def __init__(self, parent: QObject | None = None) -> None:
|
|
super().__init__(parent)
|
|
|
|
self._camera: QCamera | None = None
|
|
self._session = QMediaCaptureSession(self)
|
|
self._sink = QVideoSink(self)
|
|
self._current_info: CameraInfo | None = None
|
|
self._session_logged: bool = False
|
|
|
|
# Desired format — applied on every (re)start
|
|
self._desired_fmt: CameraFormat | None = None
|
|
self._desired_fmt = CameraFormat(width=1280, height=720, max_fps=30, pixel_format="NV12")
|
|
|
|
self._session.setVideoSink(self._sink)
|
|
self._sink.videoFrameChanged.connect(self._on_frame)
|
|
|
|
# ------------------------------------------------------------------
|
|
# Public API
|
|
# ------------------------------------------------------------------
|
|
|
|
def start(self, camera_info: CameraInfo) -> None:
|
|
"""Start streaming from the given camera device."""
|
|
self._stop_camera()
|
|
|
|
self._current_info = camera_info
|
|
self._camera = QCamera(camera_info.device, self)
|
|
self._camera.errorOccurred.connect(self._on_error)
|
|
self._camera.activeChanged.connect(self._on_active_changed)
|
|
|
|
self._session.setCamera(self._camera)
|
|
self._apply_format()
|
|
self._camera.start()
|
|
logger.info("Camera start requested: %s", camera_info.name)
|
|
|
|
def stop(self) -> None:
|
|
"""Stop the current camera and forget the device."""
|
|
self._stop_camera()
|
|
self._current_info = None
|
|
|
|
def reconnect(self) -> None:
|
|
"""Restart the current camera (e.g. after an error or disconnect)."""
|
|
if self._current_info is not None:
|
|
logger.info("Reconnecting camera: %s", self._current_info.name)
|
|
self.start(self._current_info)
|
|
else:
|
|
logger.warning("Reconnect requested but no camera was previously started")
|
|
|
|
def set_format(self, fmt: CameraFormat) -> None:
|
|
"""
|
|
Request a specific video format (resolution + fps + pixel format).
|
|
Restarts the camera to apply the change reliably.
|
|
"""
|
|
self._desired_fmt = fmt
|
|
if self._current_info is not None:
|
|
logger.info(
|
|
"Format change: %dx%d @ %.4g fps (%s) — restarting camera",
|
|
fmt.width, fmt.height, fmt.max_fps, fmt.pixel_format,
|
|
)
|
|
self.start(self._current_info)
|
|
|
|
@property
|
|
def is_active(self) -> bool:
|
|
return self._camera is not None and self._camera.isActive()
|
|
|
|
@property
|
|
def current_camera(self) -> CameraInfo | None:
|
|
return self._current_info
|
|
|
|
@property
|
|
def qt_camera(self) -> QCamera | None:
|
|
"""Expose underlying QCamera for settings dialog."""
|
|
return self._camera
|
|
|
|
def video_sink(self) -> QVideoSink:
|
|
return self._sink
|
|
|
|
def capture_session(self) -> QMediaCaptureSession:
|
|
return self._session
|
|
|
|
# ------------------------------------------------------------------
|
|
# Private helpers
|
|
# ------------------------------------------------------------------
|
|
|
|
def _stop_camera(self) -> None:
|
|
if self._camera is not None:
|
|
self._camera.stop()
|
|
self._camera.errorOccurred.disconnect()
|
|
self._camera.activeChanged.disconnect()
|
|
self._camera = None
|
|
logger.debug("Camera stopped (internal)")
|
|
|
|
def _apply_format(self) -> None:
|
|
"""Select and apply the best matching QCameraFormat before start()."""
|
|
if self._camera is None or self._current_info is None:
|
|
return
|
|
|
|
best = None
|
|
best_score = -1
|
|
|
|
desired = self._desired_fmt
|
|
|
|
for qfmt in self._current_info.device.videoFormats():
|
|
res = qfmt.resolution()
|
|
w, h = res.width(), res.height()
|
|
f = qfmt.maxFrameRate()
|
|
pf = pixel_format_name(qfmt.pixelFormat())
|
|
|
|
if desired is not None:
|
|
# Exact match preferred
|
|
exact_res = int(w == desired.width and h == desired.height) * 1000
|
|
exact_fps = int(abs(f - desired.max_fps) < 0.5) * 100
|
|
exact_pf = int(pf == desired.pixel_format) * 500
|
|
area_diff = abs(w * h - desired.width * desired.height)
|
|
score = exact_res + exact_fps + exact_pf - area_diff
|
|
else:
|
|
# No preference — pick largest area, then highest fps
|
|
score = w * h + int(f)
|
|
|
|
if score > best_score:
|
|
best_score = score
|
|
best = qfmt
|
|
|
|
if best is not None:
|
|
self._camera.setCameraFormat(best)
|
|
res = best.resolution()
|
|
pf = pixel_format_name(best.pixelFormat())
|
|
logger.info(
|
|
"Camera format requested: %s %dx%d @ %.4g fps",
|
|
pf, res.width(), res.height(), best.maxFrameRate(),
|
|
)
|
|
|
|
def _log_actual_format(self) -> None:
|
|
"""Log the format the camera actually started with and emit format_changed."""
|
|
if self._camera is None:
|
|
return
|
|
fmt = self._camera.cameraFormat()
|
|
res = fmt.resolution()
|
|
actual_fps = fmt.maxFrameRate()
|
|
pf = pixel_format_name(fmt.pixelFormat())
|
|
|
|
logger.info(
|
|
"Camera format ACTUAL: %s %dx%d @ %.4g fps",
|
|
pf, res.width(), res.height(), actual_fps,
|
|
)
|
|
if self._desired_fmt is not None:
|
|
d = self._desired_fmt
|
|
if abs(actual_fps - d.max_fps) > 0.5:
|
|
logger.warning(
|
|
"Requested %.4g fps but camera delivering %.4g fps",
|
|
d.max_fps, actual_fps,
|
|
)
|
|
if res.width() != d.width or res.height() != d.height:
|
|
logger.warning(
|
|
"Requested %dx%d but camera delivering %dx%d",
|
|
d.width, d.height, res.width(), res.height(),
|
|
)
|
|
self.format_changed.emit(actual_fps)
|
|
|
|
def _log_qt_backend(self) -> None:
|
|
"""Log the Qt multimedia backend in use (once per session)."""
|
|
try:
|
|
system = platform.system()
|
|
backend = {
|
|
"Darwin": "AVFoundation",
|
|
"Windows": "Media Foundation / DirectShow",
|
|
"Linux": "GStreamer / V4L2",
|
|
}.get(system, system)
|
|
logger.info("Qt multimedia backend: %s", backend)
|
|
except Exception:
|
|
pass
|
|
|
|
def _on_frame(self, frame: QVideoFrame) -> None:
|
|
if frame.isValid():
|
|
self.frame_ready.emit(frame)
|
|
|
|
def _on_error(self, error: QCamera.Error, error_string: str) -> None:
|
|
logger.error("Camera error %s: %s", error, error_string)
|
|
self.camera_error.emit(error_string)
|
|
|
|
def _on_active_changed(self, active: bool) -> None:
|
|
if active:
|
|
name = self._current_info.name if self._current_info else "?"
|
|
if not self._session_logged:
|
|
self._log_qt_backend()
|
|
self._session_logged = True
|
|
logger.info("Camera active: %s", name)
|
|
self._log_actual_format()
|
|
self.camera_started.emit()
|
|
else:
|
|
logger.info("Camera inactive")
|
|
self.camera_stopped.emit()
|