"""Camera Service — manages QCamera lifecycle and frame acquisition.""" from __future__ import annotations import logging from PySide6.QtCore import QObject, Signal from PySide6.QtMultimedia import ( QCamera, QMediaCaptureSession, QVideoFrame, QVideoSink, ) from app.camera.camera_enumerator import CameraInfo, pixel_format_name from app.config import DEFAULT_FPS, DEFAULT_HEIGHT, DEFAULT_WIDTH logger = logging.getLogger(__name__) class CameraService(QObject): """ Manages camera initialisation, configuration and frame delivery. Emits: frame_ready(QVideoFrame) — new frame available from the camera camera_started() — camera successfully opened and streaming camera_stopped() — camera stopped (clean shutdown) camera_error(str) — camera error description format_changed(float) — actual FPS after format was applied """ frame_ready = Signal(QVideoFrame) camera_started = Signal() camera_stopped = Signal() camera_error = Signal(str) format_changed = Signal(float) def __init__(self, parent: QObject | None = None) -> None: super().__init__(parent) self._camera: QCamera | None = None self._session = QMediaCaptureSession(self) self._sink = QVideoSink(self) self._current_info: CameraInfo | None = None self._session_logged: bool = False # Desired format — applied on every (re)start self._desired_width: int = DEFAULT_WIDTH self._desired_height: int = DEFAULT_HEIGHT self._desired_fps: float = float(DEFAULT_FPS) self._session.setVideoSink(self._sink) self._sink.videoFrameChanged.connect(self._on_frame) # ------------------------------------------------------------------ # Public API # ------------------------------------------------------------------ def start(self, camera_info: CameraInfo) -> None: """Start streaming from the given camera device.""" self._stop_camera() self._current_info = camera_info self._camera = QCamera(camera_info.device, self) self._camera.errorOccurred.connect(self._on_error) self._camera.activeChanged.connect(self._on_active_changed) self._session.setCamera(self._camera) self._apply_format() self._camera.start() logger.info("Camera start requested: %s", camera_info.name) def stop(self) -> None: """Stop the current camera and forget the device.""" self._stop_camera() self._current_info = None def reconnect(self) -> None: """Restart the current camera (e.g. after an error or disconnect).""" if self._current_info is not None: logger.info("Reconnecting camera: %s", self._current_info.name) self.start(self._current_info) else: logger.warning("Reconnect requested but no camera was previously started") def set_resolution(self, width: int, height: int) -> None: """Request a new resolution — restarts camera to apply reliably.""" self._desired_width = width self._desired_height = height if self._current_info is not None: logger.info("Resolution change: %dx%d — restarting camera", width, height) self.start(self._current_info) def set_fps(self, fps: float) -> None: """Request a new frame rate — restarts camera to apply reliably.""" self._desired_fps = fps if self._current_info is not None: logger.info("FPS change: %.1f — restarting camera", fps) self.start(self._current_info) @property def is_active(self) -> bool: return self._camera is not None and self._camera.isActive() @property def current_camera(self) -> CameraInfo | None: return self._current_info def video_sink(self) -> QVideoSink: return self._sink def capture_session(self) -> QMediaCaptureSession: return self._session # ------------------------------------------------------------------ # Private helpers # ------------------------------------------------------------------ def _stop_camera(self) -> None: if self._camera is not None: self._camera.stop() self._camera.errorOccurred.disconnect() self._camera.activeChanged.disconnect() self._camera = None logger.debug("Camera stopped (internal)") def _apply_format(self) -> None: """Select and apply the best matching QCameraFormat before start().""" if self._camera is None or self._current_info is None: return best = None best_score = -1 for fmt in self._current_info.device.videoFormats(): res = fmt.resolution() w, h = res.width(), res.height() f = fmt.maxFrameRate() score = ( int(w == self._desired_width and h == self._desired_height) * 1000 + int(abs(f - self._desired_fps) < 1) * 100 - abs(w * h - self._desired_width * self._desired_height) ) if score > best_score: best_score = score best = fmt if best is not None: self._camera.setCameraFormat(best) res = best.resolution() pf = pixel_format_name(best.pixelFormat()) logger.info( "Camera format requested: %s %dx%d @ %.1f fps", pf, res.width(), res.height(), best.maxFrameRate(), ) def _log_actual_format(self) -> None: """Log the format the camera actually started with and emit format_changed.""" if self._camera is None: return fmt = self._camera.cameraFormat() res = fmt.resolution() actual_fps = fmt.maxFrameRate() pf = pixel_format_name(fmt.pixelFormat()) logger.info( "Camera format ACTUAL: %s %dx%d @ %.1f fps", pf, res.width(), res.height(), actual_fps, ) if abs(actual_fps - self._desired_fps) > 0.5: logger.warning( "Requested %.1f fps but camera delivering %.1f fps " "(camera may not support this resolution+fps combination)", self._desired_fps, actual_fps, ) self.format_changed.emit(actual_fps) def _log_qt_backend(self) -> None: """Log the Qt multimedia backend in use (once per session).""" try: # QMediaDevices doesn't expose backend name directly in Qt6, # but we can infer it from platform import platform # noqa: PLC0415 system = platform.system() backend = { "Darwin": "AVFoundation", "Windows": "Media Foundation (DirectShow fallback)", "Linux": "GStreamer / V4L2", }.get(system, system) logger.info("Qt multimedia backend: %s", backend) except Exception: pass def _on_frame(self, frame: QVideoFrame) -> None: if frame.isValid(): self.frame_ready.emit(frame) def _on_error(self, error: QCamera.Error, error_string: str) -> None: logger.error("Camera error %s: %s", error, error_string) self.camera_error.emit(error_string) def _on_active_changed(self, active: bool) -> None: if active: name = self._current_info.name if self._current_info else "?" if not self._session_logged: self._log_qt_backend() self._session_logged = True logger.info("Camera active: %s", name) self._log_actual_format() self.camera_started.emit() else: logger.info("Camera inactive") self.camera_stopped.emit()