"""Camera Service — manages QCamera lifecycle and frame acquisition.""" from __future__ import annotations import logging from PySide6.QtCore import QObject, Signal from PySide6.QtMultimedia import ( QCamera, QMediaCaptureSession, QVideoFrame, QVideoSink, ) from app.camera.camera_enumerator import CameraInfo from app.config import DEFAULT_FPS, DEFAULT_HEIGHT, DEFAULT_WIDTH logger = logging.getLogger(__name__) class CameraService(QObject): """ Manages camera initialisation, configuration and frame delivery. Emits: frame_ready(QVideoFrame) — new frame available from the camera camera_started() — camera successfully opened and streaming camera_stopped() — camera stopped (clean shutdown) camera_error(str) — camera error description """ frame_ready = Signal(QVideoFrame) camera_started = Signal() camera_stopped = Signal() camera_error = Signal(str) def __init__(self, parent: QObject | None = None) -> None: super().__init__(parent) self._camera: QCamera | None = None self._session = QMediaCaptureSession(self) self._sink = QVideoSink(self) self._current_info: CameraInfo | None = None self._session.setVideoSink(self._sink) self._sink.videoFrameChanged.connect(self._on_frame) # ------------------------------------------------------------------ # Public API # ------------------------------------------------------------------ def start(self, camera_info: CameraInfo) -> None: """Start streaming from the given camera device.""" self.stop() self._current_info = camera_info self._camera = QCamera(camera_info.device, self) self._camera.errorOccurred.connect(self._on_error) self._camera.activeChanged.connect(self._on_active_changed) self._session.setCamera(self._camera) self._apply_best_format(camera_info) self._camera.start() logger.info("Camera start requested: %s", camera_info.name) def stop(self) -> None: """Stop the current camera.""" if self._camera is not None: self._camera.stop() self._camera.errorOccurred.disconnect() self._camera.activeChanged.disconnect() self._camera = None self._current_info = None logger.info("Camera stopped") def reconnect(self) -> None: """Restart the current camera after an error or disconnect.""" if self._current_info is not None: logger.info("Reconnecting camera: %s", self._current_info.name) self.start(self._current_info) else: logger.warning("Reconnect requested but no camera was previously started") def set_resolution(self, width: int, height: int) -> None: """Request a specific resolution. Effective on next start() if camera is active.""" if self._camera is None: return self._set_format(width, height, fps=None) def set_fps(self, fps: float) -> None: """Request a specific frame rate.""" if self._camera is None or self._current_info is None: return # Get current resolution from active format fmt = self._camera.cameraFormat() res = fmt.resolution() self._set_format(res.width(), res.height(), fps=fps) @property def is_active(self) -> bool: return self._camera is not None and self._camera.isActive() @property def current_camera(self) -> CameraInfo | None: return self._current_info # ------------------------------------------------------------------ # Video output accessor for direct QVideoWidget connection # ------------------------------------------------------------------ def video_sink(self) -> QVideoSink: """Return the internal QVideoSink (used by VideoRenderer).""" return self._sink def capture_session(self) -> QMediaCaptureSession: """Return the capture session (can be connected to QVideoWidget directly).""" return self._session # ------------------------------------------------------------------ # Private helpers # ------------------------------------------------------------------ def _apply_best_format(self, info: CameraInfo) -> None: """Pick the best matching format: prefer DEFAULT_WIDTH x DEFAULT_HEIGHT at DEFAULT_FPS.""" if not info.formats: return self._set_format(DEFAULT_WIDTH, DEFAULT_HEIGHT, fps=float(DEFAULT_FPS)) def _set_format(self, width: int, height: int, fps: float | None) -> None: if self._camera is None or self._current_info is None: return best = None best_score = -1 for fmt in self._current_info.device.videoFormats(): res = fmt.resolution() w, h = res.width(), res.height() f = fmt.maxFrameRate() res_match = int(w == width and h == height) * 1000 fps_match = int(fps is not None and abs(f - fps) < 1) * 100 area_score = -(abs(w * h - width * height)) score = res_match + fps_match + area_score if score > best_score: best_score = score best = fmt if best is not None: self._camera.setCameraFormat(best) res = best.resolution() logger.info( "Camera format set: %dx%d @ %.1f fps", res.width(), res.height(), best.maxFrameRate(), ) def _on_frame(self, frame: QVideoFrame) -> None: if frame.isValid(): self.frame_ready.emit(frame) def _on_error(self, error: QCamera.Error, error_string: str) -> None: logger.error("Camera error %s: %s", error, error_string) self.camera_error.emit(error_string) def _on_active_changed(self, active: bool) -> None: if active: logger.info("Camera active: %s", self._current_info.name if self._current_info else "?") self.camera_started.emit() else: logger.info("Camera inactive") self.camera_stopped.emit()