"""Camera Service — manages QCamera lifecycle and frame acquisition.""" from __future__ import annotations import logging import platform from PySide6.QtCore import QObject, Signal from PySide6.QtMultimedia import ( QCamera, QMediaCaptureSession, QVideoFrame, QVideoSink, ) from app.camera.camera_enumerator import CameraFormat, CameraInfo, pixel_format_name logger = logging.getLogger(__name__) class CameraService(QObject): """ Manages camera initialisation, configuration and frame delivery. Emits: frame_ready(QVideoFrame) — new frame available from the camera camera_started() — camera successfully opened and streaming camera_stopped() — camera stopped (clean shutdown) camera_error(str) — camera error description format_changed(float) — actual FPS after format was applied """ frame_ready = Signal(QVideoFrame) camera_started = Signal() camera_stopped = Signal() camera_error = Signal(str) format_changed = Signal(float) def __init__(self, parent: QObject | None = None) -> None: super().__init__(parent) self._camera: QCamera | None = None self._session = QMediaCaptureSession(self) self._sink = QVideoSink(self) self._current_info: CameraInfo | None = None self._session_logged: bool = False # Desired format — applied on every (re)start self._desired_fmt: CameraFormat | None = None self._session.setVideoSink(self._sink) self._sink.videoFrameChanged.connect(self._on_frame) # ------------------------------------------------------------------ # Public API # ------------------------------------------------------------------ def start(self, camera_info: CameraInfo) -> None: """Start streaming from the given camera device.""" self._stop_camera() self._current_info = camera_info self._camera = QCamera(camera_info.device, self) self._camera.errorOccurred.connect(self._on_error) self._camera.activeChanged.connect(self._on_active_changed) self._session.setCamera(self._camera) self._apply_format() self._camera.start() logger.info("Camera start requested: %s", camera_info.name) def stop(self) -> None: """Stop the current camera and forget the device.""" self._stop_camera() self._current_info = None def reconnect(self) -> None: """Restart the current camera (e.g. after an error or disconnect).""" if self._current_info is not None: logger.info("Reconnecting camera: %s", self._current_info.name) self.start(self._current_info) else: logger.warning("Reconnect requested but no camera was previously started") def set_format(self, fmt: CameraFormat) -> None: """ Request a specific video format (resolution + fps + pixel format). Restarts the camera to apply the change reliably. """ self._desired_fmt = fmt if self._current_info is not None: logger.info( "Format change: %dx%d @ %.4g fps (%s) — restarting camera", fmt.width, fmt.height, fmt.max_fps, fmt.pixel_format, ) self.start(self._current_info) @property def is_active(self) -> bool: return self._camera is not None and self._camera.isActive() @property def current_camera(self) -> CameraInfo | None: return self._current_info @property def qt_camera(self) -> QCamera | None: """Expose underlying QCamera for settings dialog.""" return self._camera def video_sink(self) -> QVideoSink: return self._sink def capture_session(self) -> QMediaCaptureSession: return self._session # ------------------------------------------------------------------ # Private helpers # ------------------------------------------------------------------ def _stop_camera(self) -> None: if self._camera is not None: self._camera.stop() self._camera.errorOccurred.disconnect() self._camera.activeChanged.disconnect() self._camera = None logger.debug("Camera stopped (internal)") def _apply_format(self) -> None: """Select and apply the best matching QCameraFormat before start().""" if self._camera is None or self._current_info is None: return best = None best_score = -1 desired = self._desired_fmt for qfmt in self._current_info.device.videoFormats(): res = qfmt.resolution() w, h = res.width(), res.height() f = qfmt.maxFrameRate() pf = pixel_format_name(qfmt.pixelFormat()) if desired is not None: # Exact match preferred exact_res = int(w == desired.width and h == desired.height) * 1000 exact_fps = int(abs(f - desired.max_fps) < 0.5) * 100 exact_pf = int(pf == desired.pixel_format) * 500 area_diff = abs(w * h - desired.width * desired.height) score = exact_res + exact_fps + exact_pf - area_diff else: # No preference — pick largest area, then highest fps score = w * h + int(f) if score > best_score: best_score = score best = qfmt if best is not None: self._camera.setCameraFormat(best) res = best.resolution() pf = pixel_format_name(best.pixelFormat()) logger.info( "Camera format requested: %s %dx%d @ %.4g fps", pf, res.width(), res.height(), best.maxFrameRate(), ) def _log_actual_format(self) -> None: """Log the format the camera actually started with and emit format_changed.""" if self._camera is None: return fmt = self._camera.cameraFormat() res = fmt.resolution() actual_fps = fmt.maxFrameRate() pf = pixel_format_name(fmt.pixelFormat()) logger.info( "Camera format ACTUAL: %s %dx%d @ %.4g fps", pf, res.width(), res.height(), actual_fps, ) if self._desired_fmt is not None: d = self._desired_fmt if abs(actual_fps - d.max_fps) > 0.5: logger.warning( "Requested %.4g fps but camera delivering %.4g fps", d.max_fps, actual_fps, ) if res.width() != d.width or res.height() != d.height: logger.warning( "Requested %dx%d but camera delivering %dx%d", d.width, d.height, res.width(), res.height(), ) self.format_changed.emit(actual_fps) def _log_qt_backend(self) -> None: """Log the Qt multimedia backend in use (once per session).""" try: system = platform.system() backend = { "Darwin": "AVFoundation", "Windows": "Media Foundation / DirectShow", "Linux": "GStreamer / V4L2", }.get(system, system) logger.info("Qt multimedia backend: %s", backend) except Exception: pass def _on_frame(self, frame: QVideoFrame) -> None: if frame.isValid(): self.frame_ready.emit(frame) def _on_error(self, error: QCamera.Error, error_string: str) -> None: logger.error("Camera error %s: %s", error, error_string) self.camera_error.emit(error_string) def _on_active_changed(self, active: bool) -> None: if active: name = self._current_info.name if self._current_info else "?" if not self._session_logged: self._log_qt_backend() self._session_logged = True logger.info("Camera active: %s", name) self._log_actual_format() self.camera_started.emit() else: logger.info("Camera inactive") self.camera_stopped.emit()