# Commit summary:
# - Add FrameDispatcher for distributing QVideoFrames to subscribers
# - Implement TelemetryCollector to measure video pipeline performance metrics
# - Create MainWindow as the main application interface with video rendering
# - Develop AppMenuBar for camera selection, resolution, and FPS settings
# - Establish overlay system for displaying telemetry metrics
# - Set up project structure and configuration files
# - Add unit tests for FrameDispatcher and TelemetryCollector
#
# File info: 175 lines, 6.1 KiB, Python
"""Camera Service — manages QCamera lifecycle and frame acquisition."""
|
|
|
|
from __future__ import annotations
|
|
|
|
import logging
|
|
|
|
from PySide6.QtCore import QObject, Signal
|
|
from PySide6.QtMultimedia import (
|
|
QCamera,
|
|
QMediaCaptureSession,
|
|
QVideoFrame,
|
|
QVideoSink,
|
|
)
|
|
|
|
from app.camera.camera_enumerator import CameraInfo
|
|
from app.config import DEFAULT_FPS, DEFAULT_HEIGHT, DEFAULT_WIDTH
|
|
|
|
logger = logging.getLogger(__name__)
|
|
|
|
|
|
class CameraService(QObject):
    """
    Manages camera initialisation, configuration and frame delivery.

    One ``QMediaCaptureSession`` / ``QVideoSink`` pair is created in
    ``__init__`` and reused; a new ``QCamera`` is created by every
    :meth:`start` call and released again by :meth:`stop`.

    Emits:
        frame_ready(QVideoFrame) — new frame available from the camera
        camera_started()         — camera successfully opened and streaming
        camera_stopped()         — camera stopped (clean shutdown)
        camera_error(str)        — camera error description
    """

    frame_ready = Signal(QVideoFrame)
    camera_started = Signal()
    camera_stopped = Signal()
    camera_error = Signal(str)

    def __init__(self, parent: QObject | None = None) -> None:
        """Create the capture session and sink; no camera is opened yet."""
        super().__init__(parent)

        self._camera: QCamera | None = None
        self._session = QMediaCaptureSession(self)
        self._sink = QVideoSink(self)
        self._current_info: CameraInfo | None = None

        # Frames flow: camera -> session -> sink -> _on_frame -> frame_ready.
        self._session.setVideoSink(self._sink)
        self._sink.videoFrameChanged.connect(self._on_frame)

    # ------------------------------------------------------------------
    # Public API
    # ------------------------------------------------------------------

    def start(self, camera_info: CameraInfo) -> None:
        """Start streaming from the given camera device.

        Any camera opened by a previous call is stopped and released first.
        """
        self.stop()

        self._current_info = camera_info
        self._camera = QCamera(camera_info.device, self)
        self._camera.errorOccurred.connect(self._on_error)
        self._camera.activeChanged.connect(self._on_active_changed)

        self._session.setCamera(self._camera)
        self._apply_best_format(camera_info)
        self._camera.start()
        logger.info("Camera start requested: %s", camera_info.name)

    def stop(self) -> None:
        """Stop and release the current camera; does nothing if none is open."""
        camera = self._camera
        if camera is not None:
            camera.stop()
            camera.errorOccurred.disconnect(self._on_error)
            camera.activeChanged.disconnect(self._on_active_changed)
            # The camera was created with ``self`` as parent, so dropping the
            # Python reference alone would keep it alive as a child object.
            # Detach it from the session and schedule its deletion instead.
            self._session.setCamera(None)
            camera.deleteLater()
            logger.info("Camera stopped")
        self._camera = None
        self._current_info = None

    def reconnect(self) -> None:
        """Restart the current camera after an error or disconnect.

        Only works while a camera is remembered, i.e. after :meth:`start`
        and before :meth:`stop`; otherwise a warning is logged.
        """
        if self._current_info is not None:
            logger.info("Reconnecting camera: %s", self._current_info.name)
            self.start(self._current_info)
        else:
            logger.warning("Reconnect requested but no camera was previously started")

    def set_resolution(self, width: int, height: int) -> None:
        """Request a specific resolution on the open camera.

        The best matching format is applied immediately. The frame rate of
        the currently selected format is used as a tie-breaker so that a
        resolution change does not pick an arbitrary frame rate. Ignored
        when no camera is open.
        """
        if self._camera is None:
            return
        current_fps = self._camera.cameraFormat().maxFrameRate()
        # A non-positive rate means there is no usable current format.
        self._set_format(width, height, fps=current_fps if current_fps > 0 else None)

    def set_fps(self, fps: float) -> None:
        """Request a specific frame rate, keeping the current resolution.

        Ignored when no camera is open.
        """
        if self._camera is None or self._current_info is None:
            return
        # Get current resolution from active format
        res = self._camera.cameraFormat().resolution()
        self._set_format(res.width(), res.height(), fps=fps)

    @property
    def is_active(self) -> bool:
        """True when a camera is open and reports itself as active."""
        return self._camera is not None and self._camera.isActive()

    @property
    def current_camera(self) -> CameraInfo | None:
        """The ``CameraInfo`` passed to the last :meth:`start`, or None."""
        return self._current_info

    # ------------------------------------------------------------------
    # Video output accessor for direct QVideoWidget connection
    # ------------------------------------------------------------------

    def video_sink(self) -> QVideoSink:
        """Return the internal QVideoSink (used by VideoRenderer)."""
        return self._sink

    def capture_session(self) -> QMediaCaptureSession:
        """Return the capture session (can be connected to QVideoWidget directly)."""
        return self._session

    # ------------------------------------------------------------------
    # Private helpers
    # ------------------------------------------------------------------

    def _apply_best_format(self, info: CameraInfo) -> None:
        """Pick the best matching format: prefer DEFAULT_WIDTH x DEFAULT_HEIGHT at DEFAULT_FPS."""
        if not info.formats:
            return
        self._set_format(DEFAULT_WIDTH, DEFAULT_HEIGHT, fps=float(DEFAULT_FPS))

    @staticmethod
    def _format_score(fmt, width: int, height: int, fps: float | None) -> float:
        """Score one camera format against the requested settings.

        +1000 for an exact resolution match, +100 when the format's maximum
        frame rate is within 1 fps of ``fps``, minus the absolute difference
        in pixel count. Higher is better; the result is usually negative for
        formats whose resolution does not match.
        """
        res = fmt.resolution()
        w, h = res.width(), res.height()
        exact_bonus = 1000 if (w == width and h == height) else 0
        fps_bonus = 100 if (fps is not None and abs(fmt.maxFrameRate() - fps) < 1) else 0
        return exact_bonus + fps_bonus - abs(w * h - width * height)

    def _set_format(self, width: int, height: int, fps: float | None) -> None:
        """Apply the device format that best matches the request.

        Falls back to the format with the closest pixel count when the exact
        resolution is not offered. Does nothing when no camera is open or the
        device reports no formats.
        """
        if self._camera is None or self._current_info is None:
            return

        # max() keeps the first of equally scored formats and, unlike a
        # running "best_score = -1" comparison, still returns a candidate
        # when every score is negative (no exact resolution available).
        best = max(
            self._current_info.device.videoFormats(),
            key=lambda fmt: self._format_score(fmt, width, height, fps),
            default=None,
        )
        if best is None:
            return

        self._camera.setCameraFormat(best)
        res = best.resolution()
        logger.info(
            "Camera format set: %dx%d @ %.1f fps",
            res.width(),
            res.height(),
            best.maxFrameRate(),
        )

    def _on_frame(self, frame: QVideoFrame) -> None:
        """Forward valid frames from the sink as ``frame_ready``."""
        if frame.isValid():
            self.frame_ready.emit(frame)

    def _on_error(self, error: QCamera.Error, error_string: str) -> None:
        """Log a camera error and re-emit its description as ``camera_error``."""
        logger.error("Camera error %s: %s", error, error_string)
        self.camera_error.emit(error_string)

    def _on_active_changed(self, active: bool) -> None:
        """Translate the camera's active flag into started/stopped signals."""
        if active:
            logger.info("Camera active: %s", self._current_info.name if self._current_info else "?")
            self.camera_started.emit()
        else:
            logger.info("Camera inactive")
            self.camera_stopped.emit()