diff --git a/app/__init__.py b/app/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/app/camera/__init__.py b/app/camera/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/app/camera/camera_enumerator.py b/app/camera/camera_enumerator.py new file mode 100644 index 0000000..3285bf6 --- /dev/null +++ b/app/camera/camera_enumerator.py @@ -0,0 +1,80 @@ +"""Camera enumeration — discovers available video input devices.""" + +from __future__ import annotations + +from dataclasses import dataclass, field + +from PySide6.QtMultimedia import QCameraDevice, QMediaDevices + + +@dataclass +class CameraInfo: + """Lightweight descriptor of a detected camera.""" + + device: QCameraDevice + name: str + id: str + formats: list[tuple[int, int, float]] = field(default_factory=list) + # formats: list of (width, height, max_fps) + + def __str__(self) -> str: + return f"{self.name} [{self.id}]" + + +class CameraEnumerator: + """Discovers available video input devices via QMediaDevices.""" + + @staticmethod + def list_cameras() -> list[CameraInfo]: + """Return all available camera devices with their supported formats.""" + devices = QMediaDevices.videoInputs() + cameras: list[CameraInfo] = [] + + for device in devices: + formats: list[tuple[int, int, float]] = [] + for fmt in device.videoFormats(): + res = fmt.resolution() + fps = fmt.maxFrameRate() + formats.append((res.width(), res.height(), fps)) + + # deduplicate and sort: largest resolution first, then fps descending + seen: set[tuple[int, int, float]] = set() + unique_formats: list[tuple[int, int, float]] = [] + for f in sorted(formats, key=lambda x: (x[0] * x[1], x[2]), reverse=True): + if f not in seen: + seen.add(f) + unique_formats.append(f) + + cameras.append( + CameraInfo( + device=device, + name=device.description(), + id=device.id().toStdString() + if hasattr(device.id(), "toStdString") + else device.id().data().decode("utf-8", errors="replace"), + formats=unique_formats, + ) + ) + + return cameras + + @staticmethod + def default_camera() -> CameraInfo | None: + """Return the system default camera, or None if no camera is available.""" + device = QMediaDevices.defaultVideoInput() + if device.isNull(): + return None + + cameras = CameraEnumerator.list_cameras() + # find by id match + default_id = ( + device.id().toStdString() + if hasattr(device.id(), "toStdString") + else device.id().data().decode("utf-8", errors="replace") + ) + for cam in cameras: + if cam.id == default_id: + return cam + + # fallback: wrap directly + return cameras[0] if cameras else None diff --git a/app/camera/camera_service.py b/app/camera/camera_service.py new file mode 100644 index 0000000..acfdafd --- /dev/null +++ b/app/camera/camera_service.py @@ -0,0 +1,174 @@ +"""Camera Service — manages QCamera lifecycle and frame acquisition.""" + +from __future__ import annotations + +import logging + +from PySide6.QtCore import QObject, Signal +from PySide6.QtMultimedia import ( + QCamera, + QMediaCaptureSession, + QVideoFrame, + QVideoSink, +) + +from app.camera.camera_enumerator import CameraInfo +from app.config import DEFAULT_FPS, DEFAULT_HEIGHT, DEFAULT_WIDTH + +logger = logging.getLogger(__name__) + + +class CameraService(QObject): + """ + Manages camera initialisation, configuration and frame delivery. + + Emits: + frame_ready(QVideoFrame) — new frame available from the camera + camera_started() — camera successfully opened and streaming + camera_stopped() — camera stopped (clean shutdown) + camera_error(str) — camera error description + """ + + frame_ready = Signal(QVideoFrame) + camera_started = Signal() + camera_stopped = Signal() + camera_error = Signal(str) + + def __init__(self, parent: QObject | None = None) -> None: + super().__init__(parent) + + self._camera: QCamera | None = None + self._session = QMediaCaptureSession(self) + self._sink = QVideoSink(self) + self._current_info: CameraInfo | None = None + + self._session.setVideoSink(self._sink) + self._sink.videoFrameChanged.connect(self._on_frame) + + # ------------------------------------------------------------------ + # Public API + # ------------------------------------------------------------------ + + def start(self, camera_info: CameraInfo) -> None: + """Start streaming from the given camera device.""" + self.stop() + + self._current_info = camera_info + self._camera = QCamera(camera_info.device, self) + self._camera.errorOccurred.connect(self._on_error) + self._camera.activeChanged.connect(self._on_active_changed) + + self._session.setCamera(self._camera) + self._apply_best_format(camera_info) + self._camera.start() + logger.info("Camera start requested: %s", camera_info.name) + + def stop(self) -> None: + """Stop the current camera.""" + if self._camera is not None: + self._camera.stop() + self._camera.errorOccurred.disconnect() + self._camera.activeChanged.disconnect() + self._camera = None + self._current_info = None + logger.info("Camera stopped") + + def reconnect(self) -> None: + """Restart the current camera after an error or disconnect.""" + if self._current_info is not None: + logger.info("Reconnecting camera: %s", self._current_info.name) + self.start(self._current_info) + else: + logger.warning("Reconnect requested but no camera was previously started") + + def set_resolution(self, width: int, height: int) -> None: + """Request a specific resolution. Effective on next start() if camera is active.""" + if self._camera is None: + return + self._set_format(width, height, fps=None) + + def set_fps(self, fps: float) -> None: + """Request a specific frame rate.""" + if self._camera is None or self._current_info is None: + return + # Get current resolution from active format + fmt = self._camera.cameraFormat() + res = fmt.resolution() + self._set_format(res.width(), res.height(), fps=fps) + + @property + def is_active(self) -> bool: + return self._camera is not None and self._camera.isActive() + + @property + def current_camera(self) -> CameraInfo | None: + return self._current_info + + # ------------------------------------------------------------------ + # Video output accessor for direct QVideoWidget connection + # ------------------------------------------------------------------ + + def video_sink(self) -> QVideoSink: + """Return the internal QVideoSink (used by VideoRenderer).""" + return self._sink + + def capture_session(self) -> QMediaCaptureSession: + """Return the capture session (can be connected to QVideoWidget directly).""" + return self._session + + # ------------------------------------------------------------------ + # Private helpers + # ------------------------------------------------------------------ + + def _apply_best_format(self, info: CameraInfo) -> None: + """Pick the best matching format: prefer DEFAULT_WIDTH x DEFAULT_HEIGHT at DEFAULT_FPS.""" + if not info.formats: + return + self._set_format(DEFAULT_WIDTH, DEFAULT_HEIGHT, fps=float(DEFAULT_FPS)) + + def _set_format(self, width: int, height: int, fps: float | None) -> None: + if self._camera is None or self._current_info is None: + return + + best = None + best_score = -1 + + for fmt in self._current_info.device.videoFormats(): + res = fmt.resolution() + w, h = res.width(), res.height() + f = fmt.maxFrameRate() + + res_match = int(w == width and h == height) * 1000 + fps_match = int(fps is not None and abs(f - fps) < 1) * 100 + area_score = -(abs(w * h - width * height)) + + score = res_match + fps_match + area_score + if score > best_score: + best_score = score + best = fmt + + if best is not None: + self._camera.setCameraFormat(best) + res = best.resolution() + logger.info( + "Camera format set: %dx%d @ %.1f fps", + res.width(), + res.height(), + best.maxFrameRate(), + ) + + def _on_frame(self, frame: QVideoFrame) -> None: + if frame.isValid(): + self.frame_ready.emit(frame) + + def _on_error(self, error: QCamera.Error, error_string: str) -> None: + logger.error("Camera error %s: %s", error, error_string) + self.camera_error.emit(error_string) + + def _on_active_changed(self, active: bool) -> None: + if active: + logger.info("Camera active: %s", self._current_info.name if self._current_info else "?") + self.camera_started.emit() + else: + logger.info("Camera inactive") + self.camera_stopped.emit() diff --git a/app/config.py b/app/config.py new file mode 100644 index 0000000..171896a --- /dev/null +++ b/app/config.py @@ -0,0 +1,22 @@ +"""Application-wide constants and default settings.""" + +APP_NAME = "Duck Preview" +APP_VERSION = "0.1.0" + +# Default camera settings +DEFAULT_FPS = 30 +DEFAULT_WIDTH = 1280 +DEFAULT_HEIGHT = 720 + +# Telemetry +TELEMETRY_UPDATE_INTERVAL_MS = 500 # how often the metrics snapshot is refreshed + +# Overlay +OVERLAY_BG_COLOR = (0, 0, 0, 160) # RGBA +OVERLAY_TEXT_COLOR = (255, 255, 255, 255) +OVERLAY_FONT_SIZE = 13 +OVERLAY_PADDING = 10 +OVERLAY_MARGIN = 10 + +# Frame dispatcher +DISPATCHER_MAX_QUEUE_SIZE = 2 # max pending frames per slow subscriber before drop diff --git a/app/main.py b/app/main.py new file mode 100644 index 0000000..d852165 --- /dev/null +++ b/app/main.py @@ -0,0 +1,35 @@ +"""Application entry point.""" + +from __future__ import annotations + +import logging +import sys + +from PySide6.QtCore import Qt +from PySide6.QtWidgets import QApplication + +from app.config import APP_NAME +from app.ui.main_window import MainWindow + + +def main() -> None: + # Basic logging — WARNING by default; Debug menu toggles to DEBUG + logging.basicConfig( + level=logging.WARNING, + format="%(asctime)s [%(levelname)s] %(name)s: %(message)s", + ) + + app = QApplication(sys.argv) + app.setApplicationName(APP_NAME) + app.setHighDpiScaleFactorRoundingPolicy( + Qt.HighDpiScaleFactorRoundingPolicy.PassThrough + ) + + window = MainWindow() + window.show() + + sys.exit(app.exec()) + + +if __name__ == "__main__": + main() diff --git a/app/overlay/__init__.py b/app/overlay/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/app/overlay/overlay_widget.py b/app/overlay/overlay_widget.py new file mode 100644 index 0000000..e456b99 --- /dev/null +++ b/app/overlay/overlay_widget.py @@ -0,0 +1,126 @@ +"""Overlay Widget — transparent layer rendered above the video preview.""" + +from __future__ import annotations + +from PySide6.QtCore import QRect, Qt, Slot +from PySide6.QtGui import QColor, QFont, QPainter, QPen +from PySide6.QtWidgets import QWidget + +from app.config import ( + OVERLAY_BG_COLOR, + OVERLAY_FONT_SIZE, + OVERLAY_MARGIN, + OVERLAY_PADDING, + OVERLAY_TEXT_COLOR, +) +from app.telemetry.telemetry_collector import TelemetrySnapshot + + +class OverlayWidget(QWidget): + """ + Semi-transparent performance metrics overlay. + + Sits on top of the video widget (same parent, raised). + Does NOT intercept mouse events — clicks pass through to the video widget. + + Usage: + overlay = OverlayWidget(parent=main_window) + telemetry_collector.metrics_updated.connect(overlay.update_metrics) + """ + + def __init__(self, parent: QWidget | None = None) -> None: + super().__init__(parent) + + # Make widget transparent to mouse and visual background + self.setAttribute(Qt.WidgetAttribute.WA_TransparentForMouseEvents, True) + self.setAttribute(Qt.WidgetAttribute.WA_NoSystemBackground, True) + self.setAttribute(Qt.WidgetAttribute.WA_TranslucentBackground, True) + self.setWindowFlags(Qt.WindowType.FramelessWindowHint) + + self._snapshot: TelemetrySnapshot | None = None + self._visible_overlay: bool = True + + # Font + self._font = QFont("Monospace") + self._font.setStyleHint(QFont.StyleHint.TypeWriter) + self._font.setPointSize(OVERLAY_FONT_SIZE) + self._font.setBold(False) + + # ------------------------------------------------------------------ + # Public API + # ------------------------------------------------------------------ + + @Slot(object) + def update_metrics(self, snapshot: TelemetrySnapshot) -> None: + """Receive a new telemetry snapshot and trigger a repaint.""" + self._snapshot = snapshot + if self._visible_overlay: + self.update() + + def set_overlay_visible(self, visible: bool) -> None: + self._visible_overlay = visible + self.update() + + def toggle_overlay(self) -> None: + self.set_overlay_visible(not self._visible_overlay) + + # ------------------------------------------------------------------ + # Qt painting + # ------------------------------------------------------------------ + + def paintEvent(self, event) -> None: # noqa: N802 + if not self._visible_overlay or self._snapshot is None: + return + + lines = self._format_lines(self._snapshot) + if not lines: + return + + painter = QPainter(self) + painter.setRenderHint(QPainter.RenderHint.Antialiasing, False) + painter.setFont(self._font) + + fm = painter.fontMetrics() + line_height = fm.height() + max_width = max(fm.horizontalAdvance(line) for line in lines) + + box_w = max_width + OVERLAY_PADDING * 2 + box_h = line_height * len(lines) + OVERLAY_PADDING * 2 + + x = OVERLAY_MARGIN + y = OVERLAY_MARGIN + + # Background rectangle + bg = QColor(*OVERLAY_BG_COLOR) + painter.setBrush(bg) + painter.setPen(Qt.PenStyle.NoPen) + painter.drawRoundedRect(QRect(x, y, box_w, box_h), 6, 6) + + # Text + text_color = QColor(*OVERLAY_TEXT_COLOR) + painter.setPen(QPen(text_color)) + + text_x = x + OVERLAY_PADDING + text_y = y + OVERLAY_PADDING + fm.ascent() + + for line in lines: + painter.drawText(text_x, text_y, line) + text_y += line_height + + painter.end() + + # ------------------------------------------------------------------ + # Private + # ------------------------------------------------------------------ + + @staticmethod + def _format_lines(snap: TelemetrySnapshot) -> list[str]: + lines = [ + f"FPS {snap.fps:>6.1f}", + f"Frame {snap.frame_time_ms:>6.1f} ms", + f"Drop {snap.dropped_frames:>6d}", + f"CPU {snap.cpu_percent:>5.1f} %", + ] + if snap.memory_mb is not None: + lines.append(f"Mem {snap.memory_mb:>5.0f} MB") + return lines diff --git a/app/pipeline/__init__.py b/app/pipeline/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/app/pipeline/frame_dispatcher.py b/app/pipeline/frame_dispatcher.py new file mode 100644 index 0000000..4fbbdc0 --- /dev/null +++ b/app/pipeline/frame_dispatcher.py @@ -0,0 +1,111 @@ +"""Frame Dispatcher — distributes QVideoFrames to registered subscribers.""" + +from __future__ import annotations + +import logging +import time +from collections.abc import Callable +from dataclasses import dataclass, field + +from PySide6.QtCore import QObject, Slot +from PySide6.QtMultimedia import QVideoFrame + +logger = logging.getLogger(__name__) + +FrameCallback = Callable[[QVideoFrame], None] + + +@dataclass +class _Subscriber: + callback: FrameCallback + drop_if_busy: bool = True + _busy: bool = field(default=False, init=False, repr=False) + + +class FrameDispatcher(QObject): + """ + Receives frames from CameraService and fans them out to all subscribers. + + Each subscriber is a callable (QVideoFrame) -> None. + + Subscribers that set drop_if_busy=True will skip a frame if they are still + processing the previous one (non-blocking). Subscribers with drop_if_busy=False + always receive every frame. + + All dispatch happens in the GUI thread (Qt signal/slot), so subscribers + must NOT perform heavy work directly — they should queue to a worker thread + if processing is needed. + """ + + def __init__(self, parent: QObject | None = None) -> None: + super().__init__(parent) + self._subscribers: list[_Subscriber] = [] + self._frame_count: int = 0 + self._last_dispatch_time: float = 0.0 + + # ------------------------------------------------------------------ + # Subscription API + # ------------------------------------------------------------------ + + def subscribe(self, callback: FrameCallback, *, drop_if_busy: bool = True) -> None: + """Register a frame callback. + + Args: + callback: Callable that receives QVideoFrame. + drop_if_busy: When True, frame is skipped if subscriber is still + marked busy from last call (default True). + """ + for sub in self._subscribers: + if sub.callback is callback: + logger.warning("Subscriber %r already registered", callback) + return + self._subscribers.append(_Subscriber(callback=callback, drop_if_busy=drop_if_busy)) + logger.debug("Subscriber added: %r (drop_if_busy=%s)", callback, drop_if_busy) + + def unsubscribe(self, callback: FrameCallback) -> None: + """Remove a previously registered callback.""" + before = len(self._subscribers) + self._subscribers = [s for s in self._subscribers if s.callback is not callback] + if len(self._subscribers) < before: + logger.debug("Subscriber removed: %r", callback) + else: + logger.warning("Subscriber not found for removal: %r", callback) + + def subscriber_count(self) -> int: + return len(self._subscribers) + + # ------------------------------------------------------------------ + # Frame intake — connect CameraService.frame_ready to this slot + # ------------------------------------------------------------------ + + @Slot(QVideoFrame) + def dispatch(self, frame: QVideoFrame) -> None: + """Distribute the frame to all registered subscribers.""" + self._frame_count += 1 + now = time.perf_counter() + self._last_dispatch_time = now + + for sub in self._subscribers: + if sub.drop_if_busy and sub._busy: + logger.debug("Dropping frame for busy subscriber %r", sub.callback) + continue + sub._busy = True + try: + sub.callback(frame) + except Exception: + logger.exception("Error in frame subscriber %r", sub.callback) + finally: + sub._busy = False + + # ------------------------------------------------------------------ + # Stats + # ------------------------------------------------------------------ + + @property + def frame_count(self) -> int: + return self._frame_count + + @property + def last_dispatch_time(self) -> float: + """perf_counter timestamp of the last dispatched frame.""" + return self._last_dispatch_time diff --git a/app/telemetry/__init__.py b/app/telemetry/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/app/telemetry/telemetry_collector.py b/app/telemetry/telemetry_collector.py new file mode 100644 index 0000000..221343c --- /dev/null +++ b/app/telemetry/telemetry_collector.py @@ -0,0 +1,171 @@ +"""Telemetry Collector — measures video pipeline performance metrics.""" + +from __future__ import annotations + +import time +from collections import deque +from dataclasses import dataclass + +import psutil +from PySide6.QtCore import QObject, QTimer, Signal +from PySide6.QtMultimedia import QVideoFrame + +from app.config import TELEMETRY_UPDATE_INTERVAL_MS + + +@dataclass +class TelemetrySnapshot: + """Immutable snapshot of current performance metrics.""" + + fps: float + frame_time_ms: float # average inter-frame time in ms + dropped_frames: int # cumulative dropped frames detected + cpu_percent: float # overall CPU usage (0–100) + memory_mb: float | None # RSS memory usage in MB (optional) + timestamp: float # time.perf_counter() when snapshot was taken + + +class TelemetryCollector(QObject): + """ + Frame subscriber that measures pipeline performance. + + Connect to FrameDispatcher: + dispatcher.subscribe(collector.on_frame, drop_if_busy=False) + + Listen to metrics updates: + collector.metrics_updated.connect(my_slot) + """ + + metrics_updated = Signal(object) # emits TelemetrySnapshot + + def __init__( + self, + update_interval_ms: int = TELEMETRY_UPDATE_INTERVAL_MS, + parent: QObject | None = None, + ) -> None: + super().__init__(parent) + + self._update_interval_ms = update_interval_ms + + # frame timing ring-buffer (last 120 samples) + self._frame_times: deque[float] = deque(maxlen=120) + self._last_frame_time: float = 0.0 + self._total_frames: int = 0 + self._dropped_frames: int = 0 + + # FPS window — count frames in the last second + self._fps_window: deque[float] = deque() # timestamps of recent frames + self._fps_window_size_s: float = 1.0 + + # psutil process reference + self._process = psutil.Process() + + # periodic snapshot timer + self._timer = QTimer(self) + self._timer.setInterval(update_interval_ms) + self._timer.timeout.connect(self._emit_snapshot) + self._timer.start() + + # latest snapshot (available synchronously) + self._latest: TelemetrySnapshot = self._make_empty_snapshot() + + # ------------------------------------------------------------------ + # Frame subscriber callback + # ------------------------------------------------------------------ + + def on_frame(self, frame: QVideoFrame) -> None: + """Called by FrameDispatcher for every frame. Must be fast.""" + now = time.perf_counter() + + # inter-frame time + if self._last_frame_time > 0: + delta = now - self._last_frame_time + self._frame_times.append(delta) + + # drop detection: if delta > 2.5× the rolling average, count as drop + if len(self._frame_times) >= 5: + avg = sum(self._frame_times) / len(self._frame_times) + if delta > avg * 2.5: + self._dropped_frames += 1 + + self._last_frame_time = now + self._total_frames += 1 + + # FPS window + self._fps_window.append(now) + # prune old entries + cutoff = now - self._fps_window_size_s + while self._fps_window and self._fps_window[0] < cutoff: + self._fps_window.popleft() + + # ------------------------------------------------------------------ + # Snapshot + # ------------------------------------------------------------------ + + def latest_snapshot(self) -> TelemetrySnapshot: + """Return the most recently computed snapshot.""" + return self._latest + + def reset_counters(self) -> None: + """Reset cumulative counters (e.g. after camera switch).""" + self._frame_times.clear() + self._fps_window.clear() + self._last_frame_time = 0.0 + self._total_frames = 0 + self._dropped_frames = 0 + + # ------------------------------------------------------------------ + # Internal + # ------------------------------------------------------------------ + + def _emit_snapshot(self) -> None: + snapshot = self._compute_snapshot() + self._latest = snapshot + self.metrics_updated.emit(snapshot) + + def _compute_snapshot(self) -> TelemetrySnapshot: + now = time.perf_counter() + + # FPS — prune stale entries before counting + cutoff = now - self._fps_window_size_s + while self._fps_window and self._fps_window[0] < cutoff: + self._fps_window.popleft() + fps = float(len(self._fps_window)) # frames in the last second + + # average frame time + if self._frame_times: + avg_frame_time_ms = (sum(self._frame_times) / len(self._frame_times)) * 1000.0 + else: + avg_frame_time_ms = 0.0 + + # CPU + try: + cpu = psutil.cpu_percent(interval=None) + except Exception: + cpu = 0.0 + + # memory + try: + mem_mb = self._process.memory_info().rss / (1024 * 1024) + except Exception: + mem_mb = None + + return TelemetrySnapshot( + fps=round(fps, 1), + frame_time_ms=round(avg_frame_time_ms, 2), + dropped_frames=self._dropped_frames, + cpu_percent=round(cpu, 1), + memory_mb=round(mem_mb, 1) if mem_mb is not None else None, + timestamp=now, + ) + + @staticmethod + def _make_empty_snapshot() -> TelemetrySnapshot: + return TelemetrySnapshot( + fps=0.0, + frame_time_ms=0.0, + dropped_frames=0, + cpu_percent=0.0, + memory_mb=None, + timestamp=time.perf_counter(), + ) diff --git a/app/ui/__init__.py b/app/ui/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/app/ui/main_window.py b/app/ui/main_window.py new file mode 100644 index 0000000..3b143ac --- /dev/null +++ b/app/ui/main_window.py @@ -0,0 +1,170 @@ +"""Main application window.""" + +from __future__ import annotations + +import logging + +from PySide6.QtCore import Qt, QTimer +from PySide6.QtMultimedia import QVideoWidget +from PySide6.QtWidgets import QLabel, QMainWindow, QSizePolicy, QStatusBar + +from app.camera.camera_enumerator import CameraEnumerator, CameraInfo +from app.camera.camera_service import CameraService +from app.config import APP_NAME, APP_VERSION +from app.overlay.overlay_widget import OverlayWidget +from app.pipeline.frame_dispatcher import FrameDispatcher +from app.telemetry.telemetry_collector import TelemetryCollector +from app.ui.menu_bar import AppMenuBar + +logger = logging.getLogger(__name__) + + +class MainWindow(QMainWindow): + """ + Top-level application window. + + Wires together: + CameraService → FrameDispatcher → TelemetryCollector + → OverlayWidget (via metrics_updated) + CameraService.capture_session → QVideoWidget (direct, zero-copy path) + """ + + def __init__(self) -> None: + super().__init__() + + self.setWindowTitle(f"{APP_NAME} v{APP_VERSION}") + self.setMinimumSize(640, 480) + self.resize(1280, 720) + + # --- Core components --- + self._camera_service = CameraService(self) + self._dispatcher = FrameDispatcher(self) + self._telemetry = TelemetryCollector(parent=self) + + # --- Video widget (central widget) --- + self._video_widget = QVideoWidget(self) + self._video_widget.setSizePolicy( + QSizePolicy.Policy.Expanding, QSizePolicy.Policy.Expanding + ) + self._video_widget.setAspectRatioMode(Qt.AspectRatioMode.KeepAspectRatio) + self.setCentralWidget(self._video_widget) + + # Connect camera session to video widget — this is the zero-copy render path + self._camera_service.capture_session().setVideoOutput(self._video_widget) + + # --- Overlay --- + self._overlay = OverlayWidget(parent=self._video_widget) + self._overlay.raise_() + self._overlay.resize(self._video_widget.size()) + + # --- Menu bar --- + self._menu = AppMenuBar(self) + self.setMenuBar(self._menu) + + # --- Status bar --- + self._status_bar = QStatusBar(self) + self.setStatusBar(self._status_bar) + self._status_label = QLabel("Initialising…") + self._status_bar.addWidget(self._status_label) + + # --- Wire signals --- + self._wire_signals() + + # --- Enumerate cameras and start --- + QTimer.singleShot(0, self._initialise_cameras) + + # ------------------------------------------------------------------ + # Initialisation + # ------------------------------------------------------------------ + + def _initialise_cameras(self) -> None: + cameras = CameraEnumerator.list_cameras() + + if not cameras: + self._status_label.setText("No cameras found") + logger.warning("No cameras detected") + return + + self._menu.populate_cameras(cameras) + + default = CameraEnumerator.default_camera() + start_cam = default if default is not None else cameras[0] + + self._menu.populate_formats(start_cam) + self._start_camera(start_cam) + + def _start_camera(self, cam: CameraInfo) -> None: + self._telemetry.reset_counters() + self._camera_service.start(cam) + self._menu.set_active_camera(cam) + self._status_label.setText(f"Opening: {cam.name}") + + # ------------------------------------------------------------------ + # Signal wiring + # ------------------------------------------------------------------ + + def _wire_signals(self) -> None: + # CameraService → FrameDispatcher + self._camera_service.frame_ready.connect(self._dispatcher.dispatch) + + # FrameDispatcher → TelemetryCollector (never drop for telemetry) + self._dispatcher.subscribe(self._telemetry.on_frame, drop_if_busy=False) + + # TelemetryCollector → OverlayWidget + self._telemetry.metrics_updated.connect(self._overlay.update_metrics) + + # CameraService status + self._camera_service.camera_started.connect(self._on_camera_started) + self._camera_service.camera_stopped.connect(self._on_camera_stopped) + self._camera_service.camera_error.connect(self._on_camera_error) + + # Menu signals + self._menu.camera_selected.connect(self._on_camera_selected) + self._menu.resolution_selected.connect(self._on_resolution_selected) + self._menu.fps_selected.connect(self._on_fps_selected) + self._menu.reconnect_requested.connect(self._camera_service.reconnect) + self._menu.overlay_toggled.connect(self._overlay.set_overlay_visible) + + # ------------------------------------------------------------------ + # Camera status slots + # ------------------------------------------------------------------ + + def _on_camera_started(self) -> None: + cam = self._camera_service.current_camera + name = cam.name if cam else "Unknown" + self._status_label.setText(f"Streaming: {name}") + logger.info("Camera streaming: %s", name) + + def _on_camera_stopped(self) -> None: + self._status_label.setText("Camera stopped") + + def _on_camera_error(self, message: str) -> None: + self._status_label.setText(f"Error: {message}") + logger.error("Camera error: %s", message) + + # ------------------------------------------------------------------ + # Menu action slots + # ------------------------------------------------------------------ + + def _on_camera_selected(self, cam: CameraInfo) -> None: + self._start_camera(cam) + + def _on_resolution_selected(self, width: int, height: int) -> None: + self._camera_service.set_resolution(width, height) + + def _on_fps_selected(self, fps: float) -> None: + self._camera_service.set_fps(fps) + + # ------------------------------------------------------------------ + # Qt overrides + # ------------------------------------------------------------------ + + def resizeEvent(self, event) -> None: # noqa: N802 + super().resizeEvent(event) + # Keep overlay covering the video widget + if hasattr(self, "_overlay") and hasattr(self, "_video_widget"): + self._overlay.resize(self._video_widget.size()) + + def closeEvent(self, event) -> None: # noqa: N802 + self._camera_service.stop() + super().closeEvent(event) diff --git a/app/ui/menu_bar.py b/app/ui/menu_bar.py new file mode 100644 index 0000000..ea36097 --- /dev/null +++ b/app/ui/menu_bar.py @@ -0,0 +1,198 @@ +"""Menu bar — camera, video format, FPS and debug controls.""" + +from __future__ import annotations + +import logging + +from PySide6.QtCore import Signal +from PySide6.QtGui import QAction, QActionGroup +from PySide6.QtWidgets import QMenuBar, QWidget + +from app.camera.camera_enumerator import CameraInfo + +logger = logging.getLogger(__name__) + + +class AppMenuBar(QMenuBar): + """ + Application menu bar. + + Signals: + camera_selected(CameraInfo) — user picked a camera + resolution_selected(int, int) — user picked (width, height) + fps_selected(float) — user picked a target FPS + reconnect_requested() — user hit Reconnect + overlay_toggled(bool) — overlay show/hide + log_toggled(bool) — console logging on/off + """ + + camera_selected = Signal(object) # CameraInfo + resolution_selected = Signal(int, int) + fps_selected = Signal(float) + reconnect_requested = Signal() + overlay_toggled = Signal(bool) + log_toggled = Signal(bool) + + def __init__(self, parent: QWidget | None = None) -> None: + super().__init__(parent) + + self._camera_group: QActionGroup | None = None + self._resolution_group: QActionGroup | None = None + self._fps_group: QActionGroup | None = None + self._cameras: list[CameraInfo] = [] + + self._build_menus() + + # ------------------------------------------------------------------ + # Public API — called after camera enumeration + # ------------------------------------------------------------------ + + def populate_cameras(self, cameras: list[CameraInfo]) -> None: + """Populate the Camera menu with discovered devices.""" + self._cameras = cameras + menu = self._camera_menu + + # Remove existing camera actions (keep Reconnect + separator) + for action in list(menu.actions()): + if action not in (self._reconnect_action, self._cam_separator): + menu.removeAction(action) + + self._camera_group = QActionGroup(self) + self._camera_group.setExclusive(True) + + for cam in cameras: + action = QAction(cam.name, self) + action.setCheckable(True) + action.setData(cam) + self._camera_group.addAction(action) + menu.insertAction(self._cam_separator, action) + action.triggered.connect(self._on_camera_action) + + if cameras: + first = self._camera_group.actions()[0] + first.setChecked(True) + + def populate_formats(self, camera_info: CameraInfo) -> None: + """Populate Resolution and FPS menus based on a camera's supported formats.""" + self._populate_resolutions(camera_info) + self._populate_fps(camera_info) + + def set_active_camera(self, camera_info: CameraInfo) -> None: + """Check the menu item matching camera_info.""" + if self._camera_group is None: + return + for action in self._camera_group.actions(): + if action.data() is camera_info: + action.setChecked(True) + return + + # ------------------------------------------------------------------ + # Menu construction + # ------------------------------------------------------------------ + + def _build_menus(self) -> None: + # --- Camera menu --- + self._camera_menu = self.addMenu("Camera") + self._cam_separator = self._camera_menu.addSeparator() + self._reconnect_action = QAction("Reconnect", self) + self._reconnect_action.triggered.connect(self.reconnect_requested) + self._camera_menu.addAction(self._reconnect_action) + + # --- Video menu --- + self._video_menu = self.addMenu("Video") + self._res_menu = self._video_menu.addMenu("Resolution") + self._fps_menu = self._video_menu.addMenu("FPS") + + # --- Debug menu --- + debug_menu = self.addMenu("Debug") + + self._overlay_action = QAction("Show Overlay", self) + self._overlay_action.setCheckable(True) + self._overlay_action.setChecked(True) + self._overlay_action.toggled.connect(self.overlay_toggled) + debug_menu.addAction(self._overlay_action) + + self._log_action = QAction("Console Logging", self) + self._log_action.setCheckable(True) + self._log_action.setChecked(False) + self._log_action.toggled.connect(self._on_log_toggled) + debug_menu.addAction(self._log_action) + + def _populate_resolutions(self, camera_info: CameraInfo) -> None: + self._res_menu.clear() + self._resolution_group = QActionGroup(self) + self._resolution_group.setExclusive(True) + + seen: set[tuple[int, int]] = set() + for w, h, _ in camera_info.formats: + key = (w, h) + if key in seen: + continue + seen.add(key) + action = QAction(f"{w} × {h}", self) + action.setCheckable(True) + action.setData((w, h)) + self._resolution_group.addAction(action) + self._res_menu.addAction(action) + action.triggered.connect(self._on_resolution_action) + + actions = self._resolution_group.actions() + if actions: + actions[0].setChecked(True) + + def _populate_fps(self, camera_info: CameraInfo) -> None: + self._fps_menu.clear() + self._fps_group = QActionGroup(self) + self._fps_group.setExclusive(True) + + seen: set[int] = set() + for _, _, fps in camera_info.formats: + key = round(fps) + if key in seen: + continue + seen.add(key) + action = QAction(f"{key} fps", self) + action.setCheckable(True) + action.setData(float(fps)) + self._fps_group.addAction(action) + self._fps_menu.addAction(action) + action.triggered.connect(self._on_fps_action) + + actions = self._fps_group.actions() + if actions: + actions[0].setChecked(True) + + # ------------------------------------------------------------------ + # Slots + # ------------------------------------------------------------------ + + def _on_camera_action(self) -> None: + action = self.sender() + if action is None: + return + cam: CameraInfo = action.data() + logger.debug("Camera selected: %s", cam.name) + self.camera_selected.emit(cam) + self._populate_resolutions(cam) + self._populate_fps(cam) + + def _on_resolution_action(self) -> None: + action = self.sender() + if action is None: + return + w, h = action.data() + logger.debug("Resolution selected: %dx%d", w, h) + self.resolution_selected.emit(w, h) + + def _on_fps_action(self) -> None: + action = self.sender() + if action is None: + return + fps: float = action.data() + logger.debug("FPS selected: %.1f", fps) + self.fps_selected.emit(fps) + + def _on_log_toggled(self, enabled: bool) -> None: + level = logging.DEBUG if enabled else logging.WARNING + logging.getLogger().setLevel(level) + self.log_toggled.emit(enabled) diff --git a/notes/01-mvp-plan.md b/notes/01-mvp-plan.md new file mode 100644 index 0000000..d87b8b1 --- /dev/null +++ b/notes/01-mvp-plan.md @@ -0,0 +1,324 @@ +# Plan działania — MVP Camera Preview (PySide6) + +## Środowisko + +| Element | Wartość | +|---|---| +| Python | 3.12.10 (venv: `.venv-win`) | +| Framework GUI | PySide6 6.11.0 | +| Dev platform | Windows 11 | +| Target platform | Mac Mini (Intel i7, macOS Ventura) | +| Kamera docelowa | ELP USB Camera | +| Narzędzia | pytest, ruff, colorama | + +--- + +## Fazy realizacji + +### Faza 0 — Projekt i scaffolding + +Cel: ustalenie struktury katalogów i modułów przed napisaniem pierwszej linii logiki. + +#### 0.1 Struktura projektu + +``` +duck-preview2/ +├── app/ +│ ├── __init__.py +│ ├── main.py # entry point +│ ├── config.py # stałe, domyślne ustawienia +│ ├── camera/ +│ │ ├── __init__.py +│ │ ├── camera_service.py # QCamera + QMediaCaptureSession +│ │ └── camera_enumerator.py # wykrywanie dostępnych kamer +│ ├── pipeline/ +│ │ ├── __init__.py +│ │ └── frame_dispatcher.py # dystrybucja klatek do subskrybentów +│ ├── telemetry/ +│ │ ├── __init__.py +│ │ └── telemetry_collector.py # zbieranie metryk FPS/frame time/CPU +│ ├── overlay/ +│ │ ├── __init__.py +│ │ └── overlay_widget.py # przezroczysta warstwa QWidget +│ └── ui/ +│ ├── __init__.py +│ ├── main_window.py # główne okno aplikacji +│ └── menu_bar.py # menu: kamera, rozdzielczość, FPS, debug +├── tests/ +│ ├── __init__.py +│ ├── test_camera_enumerator.py +│ └── test_telemetry_collector.py +├── notes/ +├── requirements.txt +├── requirements-dev.txt +└── pyproject.toml # konfiguracja ruff + pytest +``` + +#### 0.2 Pliki konfiguracyjne + +- `pyproject.toml` — konfiguracja ruff (linter/formatter) i pytest +- `requirements.txt` — zależności produkcyjne (PySide6) +- `requirements-dev.txt` — zależności deweloperskie (pytest, ruff) +- `.gitignore` — aktualizacja o artefakty Pythona + +--- + +### Faza 1 — Camera Service + +Cel: stabilne pobranie obrazu z kamery przez QtMultimedia. + +#### 1.1 Camera Enumerator + +- `QMediaDevices.videoInputs()` — lista dostępnych kamer +- Zwraca listę `QCameraDevice` z nazwą, id i obsługiwanymi formatami +- Obsługa braku kamer (komunikat, nie crash) +- Test jednostkowy: mockowanie `QMediaDevices` + +#### 1.2 Camera Service + +- Opakowuje `QCamera` + `QMediaCaptureSession` +- API: + - `start(device: QCameraDevice)` — uruchamia kamerę + - `stop()` — zatrzymuje kamerę + - `set_resolution(width, height)` — ustawia format + - `set_fps(fps)` — ustawia docelowy FPS + - `reconnect()` — restart po błędzie +- `QVideoSink` jako punkt odbioru klatek +- Sygnał `frame_ready(QVideoFrame)` do Frame Dispatcher +- Obsługa błędów kamery (`QCamera.errorOccurred`) + +#### 1.3 Uwagi platformowe + +| Aspekt | Windows 11 (dev) | macOS Ventura (target) | +|---|---|---| +| Backend | DirectShow / Media Foundation | AVFoundation | +| Kamera ELP | USB, standardowy UVC driver | USB, UVC | +| Format klatek | YUYV / MJPEG | YUYV / MJPEG | +| GPU rendering | ANGLE (OpenGL ES) | Metal | + +--- + +### Faza 2 — Frame Dispatcher + +Cel: dystrybucja klatek do wielu odbiorców bez blokowania akwizycji. + +#### 2.1 Frame Dispatcher + +- Wzorzec: publish-subscribe (lista callbacków) +- `subscribe(callback: Callable[[QVideoFrame], None])` +- `unsubscribe(callback)` +- `dispatch(frame: QVideoFrame)` — wywołuje wszystkich subskrybentów +- Klatki NIE są kopiowane — subskrybenci działają na referencji +- Subskrybenci mogą **pominąć klatkę** (tryb drop-if-busy) +- Wywołanie `dispatch` następuje w wątku GUI (slot połączony z `frame_ready`) + +#### 2.2 Subskrybenci w Fazie 1 + +| Subskrybent | Działanie | +|---|---| +| Video Renderer | przekazuje klatkę do `QVideoSink` / `QVideoWidget` | +| Telemetry Collector | mierzy czas, zlicza klatki | + +--- + +### Faza 3 — Video Renderer + +Cel: renderowanie klatki w GUI bez zbędnych kopii. + +#### 3.1 Podejście + +- `QVideoWidget` jako główny widget podglądu +- `QMediaCaptureSession.setVideoOutput(QVideoWidget)` — ścieżka bezpośrednia, zero kopii +- Alternatywnie: `QVideoSink` → `QGraphicsVideoItem` dla przyszłych overlayów +- Domyślnie: `QVideoWidget` (prosta, niska latencja) + +#### 3.2 Wymagania + +- Preview nie blokuje wątku GUI +- Obsługa aspect ratio (letter/pillarbox) +- Resize okna bez migotania + +--- + +### Faza 4 — Telemetry Collector + +Cel: dokładne metryki pipeline'u wideo. + +#### 4.1 Zbierane metryki + +| Metryka | Metoda pomiaru | +|---|---| +| Realtime FPS | licznik klatek / okno 1 s | +| Frame time | `time.perf_counter()` między klatkami | +| Frame acquisition time | timestamp wejście frame_ready → dispatch | +| Rendering time | czas `QVideoWidget.update()` (opcjonalnie) | +| Dropped frames | detekcja przez numerację lub timestamp gap | +| CPU usage | `psutil.cpu_percent()` (dodać do requirements) | +| Memory usage | `psutil.virtual_memory()` (opcjonalnie) | + +#### 4.2 API + +- `TelemetryCollector` — subskrybent Frame Dispatcher +- `on_frame(frame: QVideoFrame)` — rejestruje timestamp klatki +- `get_snapshot() -> TelemetrySnapshot` — aktualny stan metryk (dataclass) +- `update_interval_ms: int` — jak często odświeżać snapshot (domyślnie 500 ms) +- Sygnał `metrics_updated(TelemetrySnapshot)` — emitowany co `update_interval_ms` + +#### 4.3 TelemetrySnapshot (dataclass) + +```python +@dataclass +class TelemetrySnapshot: + fps: float + frame_time_ms: float + dropped_frames: int + cpu_percent: float + memory_mb: float | None + timestamp: float +``` + +--- + +### Faza 5 — Overlay System + +Cel: wyświetlanie metryk na przezroczystej warstwie nad podglądem. + +#### 5.1 Architektura + +- `OverlayWidget(QWidget)` — przezroczysty widget (`WA_TransparentForMouseEvents`) +- Pozycjonowany absolutnie nad `QVideoWidget` (ten sam parent, wyższy z-index) +- `paintEvent` rysuje semi-przezroczysty prostokąt + tekst z metrykami +- Połączony z sygnałem `metrics_updated` — odświeża tylko gdy dane się zmienią + +#### 5.2 Zawartość overlaya (MVP) + +``` +FPS: 60.0 +Frame: 16.7 ms +Drop: 0 +CPU: 12.3 % +``` + +#### 5.3 Sterowalność + +- Widoczność overlaya: toggle przez menu Debug +- Pozycja: lewy górny róg (stała w MVP) +- Kolor tła: `rgba(0, 0, 0, 160)` + +--- + +### Faza 6 — GUI / Main Window + +Cel: minimalne, funkcjonalne okno aplikacji. + +#### 6.1 MainWindow + +- `QMainWindow` z `QVideoWidget` jako central widget +- `OverlayWidget` nałożony na video +- Obsługa resize → reposition overlay +- Tytuł okna: `Duck Preview` + +#### 6.2 MenuBar + +Menu **Camera**: +- Lista wykrytych kamer (radio-style) +- Separator +- Reconnect + +Menu **Video**: +- Resolution submenu (pobierane dynamicznie z `QCameraDevice.videoFormats()`) +- FPS submenu + +Menu **Debug**: +- Toggle overlay metryk +- Logowanie do konsoli (toggle) + +#### 6.3 Startup flow + +``` +main.py + → QApplication + → CameraEnumerator.list_cameras() + → MainWindow(cameras) + → CameraService.start(cameras[0]) # pierwsza kamera lub ELP + → FrameDispatcher.subscribe(telemetry, renderer) + → app.exec() +``` + +--- + +### Faza 7 — Testy i walidacja + +#### 7.1 Testy jednostkowe + +| Moduł | Co testować | +|---|---| +| `CameraEnumerator` | lista kamer, brak kamer, format danych | +| `TelemetryCollector` | obliczenia FPS, wykrywanie dropów | +| `FrameDispatcher` | subskrypcja, odsubskrypcja, dispatch | +| `TelemetrySnapshot` | poprawność dataclass | + +#### 7.2 Testy manualne (Windows dev) + +- [ ] Uruchomienie z kamerą laptopa / USB webcam +- [ ] Przełączanie kamer +- [ ] Zmiana rozdzielczości +- [ ] Zmiana FPS +- [ ] Toggle overlay +- [ ] Reconnect po odłączeniu kamery + +#### 7.3 Testy na Mac Mini (target) + +- [ ] Wykrycie kamery ELP +- [ ] Poprawny format YUYV/MJPEG +- [ ] Wydajność AVFoundation vs DirectShow +- [ ] GPU rendering przez Metal + +#### 7.4 Kryteria sukcesu (z PRD) + +- Preview stabilny i płynny +- Latencja renderowania niska +- Dane telemetrii dokładne +- GUI responsywne +- Overlay działa poprawnie +- Architektura gotowa na subskrybentów AI + +--- + +## Kolejność implementacji (sprint order) + +``` +Sprint 1: Faza 0 — scaffolding, pyproject.toml, requirements +Sprint 2: Faza 1 — CameraEnumerator + CameraService (bez GUI) +Sprint 3: Faza 3 — VideoRenderer + MainWindow (preview działa) +Sprint 4: Faza 2 — FrameDispatcher (refactor pipeline) +Sprint 5: Faza 4 — TelemetryCollector +Sprint 6: Faza 5 — OverlayWidget +Sprint 7: Faza 6 — MenuBar (camera/resolution/fps switch) +Sprint 8: Faza 7 — Testy, poprawki, walidacja na Mac Mini +``` + +--- + +## Zależności do dodania + +``` +# requirements.txt +PySide6>=6.7 +psutil>=6.0 + +# requirements-dev.txt +pytest>=8.0 +ruff>=0.4 +``` + +--- + +## Uwagi cross-platform + +1. **ELP camera** — kamera UVC, powinna działać bez dodatkowych sterowników na obu platformach. Sprawdzić obsługiwane rozdzielczości i FPS przez `QCameraDevice.videoFormats()`. +2. **Ścieżki absolutne** — unikać `os.path` na korzyść `pathlib.Path`. +3. **Threading** — wszystkie operacje Qt muszą odbywać się w wątku GUI. `TelemetryCollector` może używać `QTimer` zamiast osobnego wątku. +4. **Format klatek** — na macOS AVFoundation preferuje `BGRA` lub `NV12`. Konwersja powinna być leniwa i tylko gdy potrzebna (nie w hot path renderowania). +5. **High DPI** — włączyć `QApplication.setHighDpiScaleFactorRoundingPolicy` dla konsistencji Windows/Mac. +6. **Testowanie bez kamery** — `CameraEnumerator` powinien umożliwiać dependency injection / mock dla środowisk CI. diff --git a/pyproject.toml b/pyproject.toml new file mode 100644 index 0000000..30de526 --- /dev/null +++ b/pyproject.toml @@ -0,0 +1,43 @@ +[project] +name = "duck-preview" +version = "0.1.0" +description = "Realtime camera preview application with telemetry" +requires-python = ">=3.12" +dependencies = [ + "PySide6>=6.7", + "psutil>=6.0", +] + +[project.scripts] +duck-preview = "app.main:main" + +[tool.pytest.ini_options] +testpaths = ["tests"] +python_files = ["test_*.py"] +python_functions = ["test_*"] +addopts = "-v" + +[tool.ruff] +target-version = "py312" +line-length = 100 + +[tool.ruff.lint] +select = [ + "E", # pycodestyle errors + "W", # pycodestyle warnings + "F", # pyflakes + "I", # isort + "UP", # pyupgrade + "B", # flake8-bugbear + "N", # pep8-naming +] +ignore = [ + "B008", # do not perform function calls in default arguments +] + +[tool.ruff.lint.isort] +known-first-party = ["app"] + +[tool.ruff.format] +quote-style = "double" +indent-style = "space" diff --git a/requirements-dev.txt b/requirements-dev.txt new file mode 100644 index 0000000..ecd205f --- /dev/null +++ b/requirements-dev.txt @@ -0,0 +1,3 @@ +-r requirements.txt +pytest>=8.0 +ruff>=0.4 diff --git a/requirements.txt b/requirements.txt new file mode 100644 index 0000000..3fcaa84 --- /dev/null +++ b/requirements.txt @@ -0,0 +1,2 @@ +PySide6>=6.7 +psutil>=6.0 diff --git a/tests/__init__.py b/tests/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/tests/test_frame_dispatcher.py b/tests/test_frame_dispatcher.py new file mode 100644 index 0000000..4de81f9 --- /dev/null +++ b/tests/test_frame_dispatcher.py @@ -0,0 +1,76 @@ +"""Tests for FrameDispatcher.""" + +from __future__ import annotations + +from unittest.mock import MagicMock, patch + +from app.pipeline.frame_dispatcher import FrameDispatcher + + +def _make_frame(): + frame = MagicMock() + frame.isValid.return_value = True + return frame + + +class TestFrameDispatcher: + def setup_method(self): + # FrameDispatcher is a QObject — needs QApplication + # Use minimal mock to avoid Qt dependency in unit tests + with patch("app.pipeline.frame_dispatcher.QObject.__init__", return_value=None): + self.dispatcher = FrameDispatcher.__new__(FrameDispatcher) + self.dispatcher._subscribers = [] + self.dispatcher._frame_count = 0 + self.dispatcher._last_dispatch_time = 0.0 + + def test_subscribe_adds_subscriber(self): + cb = MagicMock() + self.dispatcher.subscribe(cb) + assert self.dispatcher.subscriber_count() == 1 + + def test_subscribe_same_callback_twice_is_noop(self): + cb = MagicMock() + self.dispatcher.subscribe(cb) + self.dispatcher.subscribe(cb) + assert self.dispatcher.subscriber_count() == 1 + + def test_unsubscribe_removes_subscriber(self): + cb = MagicMock() + self.dispatcher.subscribe(cb) + self.dispatcher.unsubscribe(cb) + assert self.dispatcher.subscriber_count() == 0 + + def test_unsubscribe_nonexistent_does_not_raise(self): + cb = MagicMock() + self.dispatcher.unsubscribe(cb) # should not raise + + def test_dispatch_calls_all_subscribers(self): + cb1 = MagicMock() + cb2 = MagicMock() + self.dispatcher.subscribe(cb1) + self.dispatcher.subscribe(cb2) + frame = _make_frame() + self.dispatcher.dispatch(frame) + cb1.assert_called_once_with(frame) + cb2.assert_called_once_with(frame) + + def test_dispatch_increments_frame_count(self): + frame = _make_frame() + self.dispatcher.dispatch(frame) + self.dispatcher.dispatch(frame) + assert self.dispatcher.frame_count == 2 + + def test_dispatch_with_no_subscribers_does_not_raise(self): + frame = _make_frame() + self.dispatcher.dispatch(frame) # should not raise + + def test_subscriber_exception_does_not_stop_others(self): + def bad_cb(f): + raise RuntimeError("boom") + + good_cb = MagicMock() + self.dispatcher.subscribe(bad_cb) + self.dispatcher.subscribe(good_cb) + frame = _make_frame() + self.dispatcher.dispatch(frame) # should not raise + good_cb.assert_called_once_with(frame) diff --git a/tests/test_telemetry_collector.py b/tests/test_telemetry_collector.py new file mode 100644 index 0000000..e0d95f7 --- /dev/null +++ b/tests/test_telemetry_collector.py @@ -0,0 +1,107 @@ +"""Tests for TelemetryCollector.""" + +from __future__ import annotations + +import time +from collections import deque +from unittest.mock import MagicMock, patch + + +class TestTelemetryCollector: + """Test telemetry calculations in isolation (no Qt event loop required).""" + + def _make_collector(self): + """Construct a TelemetryCollector bypassing Qt machinery.""" + from app.telemetry.telemetry_collector import TelemetryCollector + + with patch.object(TelemetryCollector, "__init__", return_value=None): + col = TelemetryCollector.__new__(TelemetryCollector) + + col._frame_times = deque(maxlen=120) + col._last_frame_time = 0.0 + col._total_frames = 0 + col._dropped_frames = 0 + col._fps_window = deque() + col._fps_window_size_s = 1.0 + col._process = MagicMock() + col._process.memory_info.return_value.rss = 50 * 1024 * 1024 # 50 MB + return col + + def test_initial_snapshot_has_zero_fps(self): + col = self._make_collector() + snap = col._compute_snapshot() + assert snap.fps == 0.0 + + def test_fps_counts_frames_in_window(self): + col = self._make_collector() + now = time.perf_counter() + # Simulate 30 frames within the last second + for i in range(30): + col._fps_window.append(now - 0.9 + i * 0.03) + snap = col._compute_snapshot() + assert snap.fps == 30.0 + + def test_fps_excludes_old_frames(self): + col = self._make_collector() + now = time.perf_counter() + # 10 frames older than 1 second — should not count + for i in range(10): + col._fps_window.append(now - 2.0 + i * 0.05) + # 5 frames within the last second + for i in range(5): + col._fps_window.append(now - 0.4 + i * 0.05) + snap = col._compute_snapshot() + assert snap.fps == 5.0 + + def test_frame_time_average(self): + col = self._make_collector() + # 10 frames at 33.3 ms each + interval = 0.0333 + for _ in range(10): + col._frame_times.append(interval) + snap = col._compute_snapshot() + assert abs(snap.frame_time_ms - interval * 1000) < 0.1 + + def test_drop_detection(self): + col = self._make_collector() + # Seed with 10 normal frames at ~16 ms + normal_interval = 0.016 + now = time.perf_counter() + col._last_frame_time = now - normal_interval * 10 + for _ in range(9): + col._last_frame_time += normal_interval + col._frame_times.append(normal_interval) + + # Simulate a big gap (3× normal) — should trigger drop detection + col._last_frame_time = now - normal_interval * 10 # reset base + # Manually call on_frame-like logic + with patch("app.telemetry.telemetry_collector.time") as mock_time: + # Set last_frame_time to something reasonable + col._last_frame_time = now + big_delta = normal_interval * 5 # 5× average → drop + mock_time.perf_counter.return_value = now + big_delta + # Replicate the drop detection logic + delta = big_delta + col._frame_times.append(delta) + avg = sum(col._frame_times) / len(col._frame_times) + if delta > avg * 2.5: + col._dropped_frames += 1 + + assert col._dropped_frames == 1 + + def test_reset_counters(self): + col = self._make_collector() + col._total_frames = 100 + col._dropped_frames = 5 + col._frame_times.append(0.016) + col._fps_window.append(time.perf_counter()) + col.reset_counters() + assert col._total_frames == 0 + assert col._dropped_frames == 0 + assert len(col._frame_times) == 0 + assert len(col._fps_window) == 0 + + def test_snapshot_memory_mb(self): + col = self._make_collector() + snap = col._compute_snapshot() + assert snap.memory_mb == 50.0