Compare commits

..

3 Commits

Author SHA1 Message Date
03d3332b35 fix: correct import statement for QVideoWidget in main_window.py 2026-05-12 19:52:59 +02:00
cd7f196b25 feat: implement core functionality for camera preview application
- Add FrameDispatcher for distributing QVideoFrames to subscribers
- Implement TelemetryCollector to measure video pipeline performance metrics
- Create MainWindow as the main application interface with video rendering
- Develop AppMenuBar for camera selection, resolution, and FPS settings
- Establish overlay system for displaying telemetry metrics
- Set up project structure and configuration files
- Add unit tests for FrameDispatcher and TelemetryCollector
2026-05-12 19:49:53 +02:00
65b98c352d new future - play video 2026-05-12 19:35:04 +02:00
35 changed files with 1610 additions and 943 deletions

View File

@@ -0,0 +1,80 @@
"""Camera enumeration — discovers available video input devices."""
from __future__ import annotations
from dataclasses import dataclass, field
from PySide6.QtMultimedia import QCameraDevice, QMediaDevices
@dataclass
class CameraInfo:
"""Lightweight descriptor of a detected camera."""
device: QCameraDevice
name: str
id: str
formats: list[tuple[int, int, float]] = field(default_factory=list)
# formats: list of (width, height, max_fps)
def __str__(self) -> str:
return f"{self.name} [{self.id}]"
class CameraEnumerator:
"""Discovers available video input devices via QMediaDevices."""
@staticmethod
def list_cameras() -> list[CameraInfo]:
"""Return all available camera devices with their supported formats."""
devices = QMediaDevices.videoInputs()
cameras: list[CameraInfo] = []
for device in devices:
formats: list[tuple[int, int, float]] = []
for fmt in device.videoFormats():
res = fmt.resolution()
fps = fmt.maxFrameRate()
formats.append((res.width(), res.height(), fps))
# deduplicate and sort: largest resolution first, then fps descending
seen: set[tuple[int, int, float]] = set()
unique_formats: list[tuple[int, int, float]] = []
for f in sorted(formats, key=lambda x: (x[0] * x[1], x[2]), reverse=True):
if f not in seen:
seen.add(f)
unique_formats.append(f)
cameras.append(
CameraInfo(
device=device,
name=device.description(),
id=device.id().toStdString()
if hasattr(device.id(), "toStdString")
else device.id().data().decode("utf-8", errors="replace"),
formats=unique_formats,
)
)
return cameras
@staticmethod
def default_camera() -> CameraInfo | None:
"""Return the system default camera, or None if no camera is available."""
device = QMediaDevices.defaultVideoInput()
if device.isNull():
return None
cameras = CameraEnumerator.list_cameras()
# find by id match
default_id = (
device.id().toStdString()
if hasattr(device.id(), "toStdString")
else device.id().data().decode("utf-8", errors="replace")
)
for cam in cameras:
if cam.id == default_id:
return cam
# fallback: wrap directly
return cameras[0] if cameras else None

View File

@@ -0,0 +1,174 @@
"""Camera Service — manages QCamera lifecycle and frame acquisition."""
from __future__ import annotations
import logging
from PySide6.QtCore import QObject, Signal
from PySide6.QtMultimedia import (
QCamera,
QMediaCaptureSession,
QVideoFrame,
QVideoSink,
)
from app.camera.camera_enumerator import CameraInfo
from app.config import DEFAULT_FPS, DEFAULT_HEIGHT, DEFAULT_WIDTH
logger = logging.getLogger(__name__)
class CameraService(QObject):
"""
Manages camera initialisation, configuration and frame delivery.
Emits:
frame_ready(QVideoFrame) — new frame available from the camera
camera_started() — camera successfully opened and streaming
camera_stopped() — camera stopped (clean shutdown)
camera_error(str) — camera error description
"""
frame_ready = Signal(QVideoFrame)
camera_started = Signal()
camera_stopped = Signal()
camera_error = Signal(str)
def __init__(self, parent: QObject | None = None) -> None:
super().__init__(parent)
self._camera: QCamera | None = None
self._session = QMediaCaptureSession(self)
self._sink = QVideoSink(self)
self._current_info: CameraInfo | None = None
self._session.setVideoSink(self._sink)
self._sink.videoFrameChanged.connect(self._on_frame)
# ------------------------------------------------------------------
# Public API
# ------------------------------------------------------------------
def start(self, camera_info: CameraInfo) -> None:
"""Start streaming from the given camera device."""
self.stop()
self._current_info = camera_info
self._camera = QCamera(camera_info.device, self)
self._camera.errorOccurred.connect(self._on_error)
self._camera.activeChanged.connect(self._on_active_changed)
self._session.setCamera(self._camera)
self._apply_best_format(camera_info)
self._camera.start()
logger.info("Camera start requested: %s", camera_info.name)
def stop(self) -> None:
"""Stop the current camera."""
if self._camera is not None:
self._camera.stop()
self._camera.errorOccurred.disconnect()
self._camera.activeChanged.disconnect()
self._camera = None
self._current_info = None
logger.info("Camera stopped")
def reconnect(self) -> None:
"""Restart the current camera after an error or disconnect."""
if self._current_info is not None:
logger.info("Reconnecting camera: %s", self._current_info.name)
self.start(self._current_info)
else:
logger.warning("Reconnect requested but no camera was previously started")
def set_resolution(self, width: int, height: int) -> None:
"""Request a specific resolution. Effective on next start() if camera is active."""
if self._camera is None:
return
self._set_format(width, height, fps=None)
def set_fps(self, fps: float) -> None:
"""Request a specific frame rate."""
if self._camera is None or self._current_info is None:
return
# Get current resolution from active format
fmt = self._camera.cameraFormat()
res = fmt.resolution()
self._set_format(res.width(), res.height(), fps=fps)
@property
def is_active(self) -> bool:
return self._camera is not None and self._camera.isActive()
@property
def current_camera(self) -> CameraInfo | None:
return self._current_info
# ------------------------------------------------------------------
# Video output accessor for direct QVideoWidget connection
# ------------------------------------------------------------------
def video_sink(self) -> QVideoSink:
"""Return the internal QVideoSink (used by VideoRenderer)."""
return self._sink
def capture_session(self) -> QMediaCaptureSession:
"""Return the capture session (can be connected to QVideoWidget directly)."""
return self._session
# ------------------------------------------------------------------
# Private helpers
# ------------------------------------------------------------------
def _apply_best_format(self, info: CameraInfo) -> None:
"""Pick the best matching format: prefer DEFAULT_WIDTH x DEFAULT_HEIGHT at DEFAULT_FPS."""
if not info.formats:
return
self._set_format(DEFAULT_WIDTH, DEFAULT_HEIGHT, fps=float(DEFAULT_FPS))
def _set_format(self, width: int, height: int, fps: float | None) -> None:
if self._camera is None or self._current_info is None:
return
best = None
best_score = -1
for fmt in self._current_info.device.videoFormats():
res = fmt.resolution()
w, h = res.width(), res.height()
f = fmt.maxFrameRate()
res_match = int(w == width and h == height) * 1000
fps_match = int(fps is not None and abs(f - fps) < 1) * 100
area_score = -(abs(w * h - width * height))
score = res_match + fps_match + area_score
if score > best_score:
best_score = score
best = fmt
if best is not None:
self._camera.setCameraFormat(best)
res = best.resolution()
logger.info(
"Camera format set: %dx%d @ %.1f fps",
res.width(),
res.height(),
best.maxFrameRate(),
)
def _on_frame(self, frame: QVideoFrame) -> None:
if frame.isValid():
self.frame_ready.emit(frame)
def _on_error(self, error: QCamera.Error, error_string: str) -> None:
logger.error("Camera error %s: %s", error, error_string)
self.camera_error.emit(error_string)
def _on_active_changed(self, active: bool) -> None:
if active:
logger.info("Camera active: %s", self._current_info.name if self._current_info else "?")
self.camera_started.emit()
else:
logger.info("Camera inactive")
self.camera_stopped.emit()

22
app/config.py Normal file
View File

@@ -0,0 +1,22 @@
"""Application-wide constants and default settings."""
APP_NAME = "Duck Preview"
APP_VERSION = "0.1.0"
# Default camera settings
DEFAULT_FPS = 30
DEFAULT_WIDTH = 1280
DEFAULT_HEIGHT = 720
# Telemetry
TELEMETRY_UPDATE_INTERVAL_MS = 500 # how often the metrics snapshot is refreshed
# Overlay
OVERLAY_BG_COLOR = (0, 0, 0, 160) # RGBA
OVERLAY_TEXT_COLOR = (255, 255, 255, 255)
OVERLAY_FONT_SIZE = 13
OVERLAY_PADDING = 10
OVERLAY_MARGIN = 10
# Frame dispatcher
DISPATCHER_MAX_QUEUE_SIZE = 2 # max pending frames per slow subscriber before drop

35
app/main.py Normal file
View File

@@ -0,0 +1,35 @@
"""Application entry point."""
from __future__ import annotations
import logging
import sys
from PySide6.QtCore import Qt
from PySide6.QtWidgets import QApplication
from app.config import APP_NAME
from app.ui.main_window import MainWindow
def main() -> None:
# Basic logging — WARNING by default; Debug menu toggles to DEBUG
logging.basicConfig(
level=logging.WARNING,
format="%(asctime)s [%(levelname)s] %(name)s: %(message)s",
)
app = QApplication(sys.argv)
app.setApplicationName(APP_NAME)
app.setHighDpiScaleFactorRoundingPolicy(
Qt.HighDpiScaleFactorRoundingPolicy.PassThrough
)
window = MainWindow()
window.show()
sys.exit(app.exec())
if __name__ == "__main__":
main()

View File

@@ -0,0 +1,126 @@
"""Overlay Widget — transparent layer rendered above the video preview."""
from __future__ import annotations
from PySide6.QtCore import QRect, Qt, Slot
from PySide6.QtGui import QColor, QFont, QPainter, QPen
from PySide6.QtWidgets import QWidget
from app.config import (
OVERLAY_BG_COLOR,
OVERLAY_FONT_SIZE,
OVERLAY_MARGIN,
OVERLAY_PADDING,
OVERLAY_TEXT_COLOR,
)
from app.telemetry.telemetry_collector import TelemetrySnapshot
class OverlayWidget(QWidget):
"""
Semi-transparent performance metrics overlay.
Sits on top of the video widget (same parent, raised).
Does NOT intercept mouse events — clicks pass through to the video widget.
Usage:
overlay = OverlayWidget(parent=main_window)
telemetry_collector.metrics_updated.connect(overlay.update_metrics)
"""
def __init__(self, parent: QWidget | None = None) -> None:
super().__init__(parent)
# Make widget transparent to mouse and visual background
self.setAttribute(Qt.WidgetAttribute.WA_TransparentForMouseEvents, True)
self.setAttribute(Qt.WidgetAttribute.WA_NoSystemBackground, True)
self.setAttribute(Qt.WidgetAttribute.WA_TranslucentBackground, True)
self.setWindowFlags(Qt.WindowType.FramelessWindowHint)
self._snapshot: TelemetrySnapshot | None = None
self._visible_overlay: bool = True
# Font
self._font = QFont("Monospace")
self._font.setStyleHint(QFont.StyleHint.TypeWriter)
self._font.setPointSize(OVERLAY_FONT_SIZE)
self._font.setBold(False)
# ------------------------------------------------------------------
# Public API
# ------------------------------------------------------------------
@Slot(object)
def update_metrics(self, snapshot: TelemetrySnapshot) -> None:
"""Receive a new telemetry snapshot and trigger a repaint."""
self._snapshot = snapshot
if self._visible_overlay:
self.update()
def set_overlay_visible(self, visible: bool) -> None:
self._visible_overlay = visible
self.update()
def toggle_overlay(self) -> None:
self.set_overlay_visible(not self._visible_overlay)
# ------------------------------------------------------------------
# Qt painting
# ------------------------------------------------------------------
def paintEvent(self, event) -> None: # noqa: N802
if not self._visible_overlay or self._snapshot is None:
return
lines = self._format_lines(self._snapshot)
if not lines:
return
painter = QPainter(self)
painter.setRenderHint(QPainter.RenderHint.Antialiasing, False)
painter.setFont(self._font)
fm = painter.fontMetrics()
line_height = fm.height()
max_width = max(fm.horizontalAdvance(line) for line in lines)
box_w = max_width + OVERLAY_PADDING * 2
box_h = line_height * len(lines) + OVERLAY_PADDING * 2
x = OVERLAY_MARGIN
y = OVERLAY_MARGIN
# Background rectangle
bg = QColor(*OVERLAY_BG_COLOR)
painter.setBrush(bg)
painter.setPen(Qt.PenStyle.NoPen)
painter.drawRoundedRect(QRect(x, y, box_w, box_h), 6, 6)
# Text
text_color = QColor(*OVERLAY_TEXT_COLOR)
painter.setPen(QPen(text_color))
text_x = x + OVERLAY_PADDING
text_y = y + OVERLAY_PADDING + fm.ascent()
for line in lines:
painter.drawText(text_x, text_y, line)
text_y += line_height
painter.end()
# ------------------------------------------------------------------
# Private
# ------------------------------------------------------------------
@staticmethod
def _format_lines(snap: TelemetrySnapshot) -> list[str]:
lines = [
f"FPS {snap.fps:>6.1f}",
f"Frame {snap.frame_time_ms:>6.1f} ms",
f"Drop {snap.dropped_frames:>6d}",
f"CPU {snap.cpu_percent:>5.1f} %",
]
if snap.memory_mb is not None:
lines.append(f"Mem {snap.memory_mb:>5.0f} MB")
return lines

View File

@@ -0,0 +1,111 @@
"""Frame Dispatcher — distributes QVideoFrames to registered subscribers."""
from __future__ import annotations
import logging
import time
from collections.abc import Callable
from dataclasses import dataclass, field
from PySide6.QtCore import QObject, Slot
from PySide6.QtMultimedia import QVideoFrame
logger = logging.getLogger(__name__)
FrameCallback = Callable[[QVideoFrame], None]
@dataclass
class _Subscriber:
callback: FrameCallback
drop_if_busy: bool = True
_busy: bool = field(default=False, init=False, repr=False)
class FrameDispatcher(QObject):
"""
Receives frames from CameraService and fans them out to all subscribers.
Each subscriber is a callable (QVideoFrame) -> None.
Subscribers that set drop_if_busy=True will skip a frame if they are still
processing the previous one (non-blocking). Subscribers with drop_if_busy=False
always receive every frame.
All dispatch happens in the GUI thread (Qt signal/slot), so subscribers
must NOT perform heavy work directly — they should queue to a worker thread
if processing is needed.
"""
def __init__(self, parent: QObject | None = None) -> None:
super().__init__(parent)
self._subscribers: list[_Subscriber] = []
self._frame_count: int = 0
self._last_dispatch_time: float = 0.0
# ------------------------------------------------------------------
# Subscription API
# ------------------------------------------------------------------
def subscribe(self, callback: FrameCallback, *, drop_if_busy: bool = True) -> None:
"""Register a frame callback.
Args:
callback: Callable that receives QVideoFrame.
drop_if_busy: When True, frame is skipped if subscriber is still
marked busy from last call (default True).
"""
for sub in self._subscribers:
if sub.callback is callback:
logger.warning("Subscriber %r already registered", callback)
return
self._subscribers.append(_Subscriber(callback=callback, drop_if_busy=drop_if_busy))
logger.debug("Subscriber added: %r (drop_if_busy=%s)", callback, drop_if_busy)
def unsubscribe(self, callback: FrameCallback) -> None:
"""Remove a previously registered callback."""
before = len(self._subscribers)
self._subscribers = [s for s in self._subscribers if s.callback is not callback]
if len(self._subscribers) < before:
logger.debug("Subscriber removed: %r", callback)
else:
logger.warning("Subscriber not found for removal: %r", callback)
def subscriber_count(self) -> int:
return len(self._subscribers)
# ------------------------------------------------------------------
# Frame intake — connect CameraService.frame_ready to this slot
# ------------------------------------------------------------------
@Slot(QVideoFrame)
def dispatch(self, frame: QVideoFrame) -> None:
"""Distribute the frame to all registered subscribers."""
self._frame_count += 1
now = time.perf_counter()
self._last_dispatch_time = now
for sub in self._subscribers:
if sub.drop_if_busy and sub._busy:
logger.debug("Dropping frame for busy subscriber %r", sub.callback)
continue
sub._busy = True
try:
sub.callback(frame)
except Exception:
logger.exception("Error in frame subscriber %r", sub.callback)
finally:
sub._busy = False
# ------------------------------------------------------------------
# Stats
# ------------------------------------------------------------------
@property
def frame_count(self) -> int:
return self._frame_count
@property
def last_dispatch_time(self) -> float:
"""perf_counter timestamp of the last dispatched frame."""
return self._last_dispatch_time

View File

@@ -0,0 +1,171 @@
"""Telemetry Collector — measures video pipeline performance metrics."""
from __future__ import annotations
import time
from collections import deque
from dataclasses import dataclass
import psutil
from PySide6.QtCore import QObject, QTimer, Signal
from PySide6.QtMultimedia import QVideoFrame
from app.config import TELEMETRY_UPDATE_INTERVAL_MS
@dataclass
class TelemetrySnapshot:
"""Immutable snapshot of current performance metrics."""
fps: float
frame_time_ms: float # average inter-frame time in ms
dropped_frames: int # cumulative dropped frames detected
cpu_percent: float # overall CPU usage (0100)
memory_mb: float | None # RSS memory usage in MB (optional)
timestamp: float # time.perf_counter() when snapshot was taken
class TelemetryCollector(QObject):
"""
Frame subscriber that measures pipeline performance.
Connect to FrameDispatcher:
dispatcher.subscribe(collector.on_frame, drop_if_busy=False)
Listen to metrics updates:
collector.metrics_updated.connect(my_slot)
"""
metrics_updated = Signal(object) # emits TelemetrySnapshot
def __init__(
self,
update_interval_ms: int = TELEMETRY_UPDATE_INTERVAL_MS,
parent: QObject | None = None,
) -> None:
super().__init__(parent)
self._update_interval_ms = update_interval_ms
# frame timing ring-buffer (last 120 samples)
self._frame_times: deque[float] = deque(maxlen=120)
self._last_frame_time: float = 0.0
self._total_frames: int = 0
self._dropped_frames: int = 0
# FPS window — count frames in the last second
self._fps_window: deque[float] = deque() # timestamps of recent frames
self._fps_window_size_s: float = 1.0
# psutil process reference
self._process = psutil.Process()
# periodic snapshot timer
self._timer = QTimer(self)
self._timer.setInterval(update_interval_ms)
self._timer.timeout.connect(self._emit_snapshot)
self._timer.start()
# latest snapshot (available synchronously)
self._latest: TelemetrySnapshot = self._make_empty_snapshot()
# ------------------------------------------------------------------
# Frame subscriber callback
# ------------------------------------------------------------------
def on_frame(self, frame: QVideoFrame) -> None:
"""Called by FrameDispatcher for every frame. Must be fast."""
now = time.perf_counter()
# inter-frame time
if self._last_frame_time > 0:
delta = now - self._last_frame_time
self._frame_times.append(delta)
# drop detection: if delta > 2.5× the rolling average, count as drop
if len(self._frame_times) >= 5:
avg = sum(self._frame_times) / len(self._frame_times)
if delta > avg * 2.5:
self._dropped_frames += 1
self._last_frame_time = now
self._total_frames += 1
# FPS window
self._fps_window.append(now)
# prune old entries
cutoff = now - self._fps_window_size_s
while self._fps_window and self._fps_window[0] < cutoff:
self._fps_window.popleft()
# ------------------------------------------------------------------
# Snapshot
# ------------------------------------------------------------------
def latest_snapshot(self) -> TelemetrySnapshot:
"""Return the most recently computed snapshot."""
return self._latest
def reset_counters(self) -> None:
"""Reset cumulative counters (e.g. after camera switch)."""
self._frame_times.clear()
self._fps_window.clear()
self._last_frame_time = 0.0
self._total_frames = 0
self._dropped_frames = 0
# ------------------------------------------------------------------
# Internal
# ------------------------------------------------------------------
def _emit_snapshot(self) -> None:
snapshot = self._compute_snapshot()
self._latest = snapshot
self.metrics_updated.emit(snapshot)
def _compute_snapshot(self) -> TelemetrySnapshot:
now = time.perf_counter()
# FPS — prune stale entries before counting
cutoff = now - self._fps_window_size_s
while self._fps_window and self._fps_window[0] < cutoff:
self._fps_window.popleft()
fps = float(len(self._fps_window)) # frames in the last second
# average frame time
if self._frame_times:
avg_frame_time_ms = (sum(self._frame_times) / len(self._frame_times)) * 1000.0
else:
avg_frame_time_ms = 0.0
# CPU
try:
cpu = psutil.cpu_percent(interval=None)
except Exception:
cpu = 0.0
# memory
try:
mem_mb = self._process.memory_info().rss / (1024 * 1024)
except Exception:
mem_mb = None
return TelemetrySnapshot(
fps=round(fps, 1),
frame_time_ms=round(avg_frame_time_ms, 2),
dropped_frames=self._dropped_frames,
cpu_percent=round(cpu, 1),
memory_mb=round(mem_mb, 1) if mem_mb is not None else None,
timestamp=now,
)
@staticmethod
def _make_empty_snapshot() -> TelemetrySnapshot:
return TelemetrySnapshot(
fps=0.0,
frame_time_ms=0.0,
dropped_frames=0,
cpu_percent=0.0,
memory_mb=None,
timestamp=time.perf_counter(),
)

0
app/ui/__init__.py Normal file
View File

170
app/ui/main_window.py Normal file
View File

@@ -0,0 +1,170 @@
"""Main application window."""
from __future__ import annotations
import logging
from PySide6.QtCore import Qt, QTimer
from PySide6.QtMultimediaWidgets import QVideoWidget
from PySide6.QtWidgets import QLabel, QMainWindow, QSizePolicy, QStatusBar
from app.camera.camera_enumerator import CameraEnumerator, CameraInfo
from app.camera.camera_service import CameraService
from app.config import APP_NAME, APP_VERSION
from app.overlay.overlay_widget import OverlayWidget
from app.pipeline.frame_dispatcher import FrameDispatcher
from app.telemetry.telemetry_collector import TelemetryCollector
from app.ui.menu_bar import AppMenuBar
logger = logging.getLogger(__name__)
class MainWindow(QMainWindow):
"""
Top-level application window.
Wires together:
CameraService → FrameDispatcher → TelemetryCollector
→ OverlayWidget (via metrics_updated)
CameraService.capture_session → QVideoWidget (direct, zero-copy path)
"""
def __init__(self) -> None:
super().__init__()
self.setWindowTitle(f"{APP_NAME} v{APP_VERSION}")
self.setMinimumSize(640, 480)
self.resize(1280, 720)
# --- Core components ---
self._camera_service = CameraService(self)
self._dispatcher = FrameDispatcher(self)
self._telemetry = TelemetryCollector(parent=self)
# --- Video widget (central widget) ---
self._video_widget = QVideoWidget(self)
self._video_widget.setSizePolicy(
QSizePolicy.Policy.Expanding, QSizePolicy.Policy.Expanding
)
self._video_widget.setAspectRatioMode(Qt.AspectRatioMode.KeepAspectRatio)
self.setCentralWidget(self._video_widget)
# Connect camera session to video widget — this is the zero-copy render path
self._camera_service.capture_session().setVideoOutput(self._video_widget)
# --- Overlay ---
self._overlay = OverlayWidget(parent=self._video_widget)
self._overlay.raise_()
self._overlay.resize(self._video_widget.size())
# --- Menu bar ---
self._menu = AppMenuBar(self)
self.setMenuBar(self._menu)
# --- Status bar ---
self._status_bar = QStatusBar(self)
self.setStatusBar(self._status_bar)
self._status_label = QLabel("Initialising…")
self._status_bar.addWidget(self._status_label)
# --- Wire signals ---
self._wire_signals()
# --- Enumerate cameras and start ---
QTimer.singleShot(0, self._initialise_cameras)
# ------------------------------------------------------------------
# Initialisation
# ------------------------------------------------------------------
def _initialise_cameras(self) -> None:
cameras = CameraEnumerator.list_cameras()
if not cameras:
self._status_label.setText("No cameras found")
logger.warning("No cameras detected")
return
self._menu.populate_cameras(cameras)
default = CameraEnumerator.default_camera()
start_cam = default if default is not None else cameras[0]
self._menu.populate_formats(start_cam)
self._start_camera(start_cam)
def _start_camera(self, cam: CameraInfo) -> None:
self._telemetry.reset_counters()
self._camera_service.start(cam)
self._menu.set_active_camera(cam)
self._status_label.setText(f"Opening: {cam.name}")
# ------------------------------------------------------------------
# Signal wiring
# ------------------------------------------------------------------
def _wire_signals(self) -> None:
# CameraService → FrameDispatcher
self._camera_service.frame_ready.connect(self._dispatcher.dispatch)
# FrameDispatcher → TelemetryCollector (never drop for telemetry)
self._dispatcher.subscribe(self._telemetry.on_frame, drop_if_busy=False)
# TelemetryCollector → OverlayWidget
self._telemetry.metrics_updated.connect(self._overlay.update_metrics)
# CameraService status
self._camera_service.camera_started.connect(self._on_camera_started)
self._camera_service.camera_stopped.connect(self._on_camera_stopped)
self._camera_service.camera_error.connect(self._on_camera_error)
# Menu signals
self._menu.camera_selected.connect(self._on_camera_selected)
self._menu.resolution_selected.connect(self._on_resolution_selected)
self._menu.fps_selected.connect(self._on_fps_selected)
self._menu.reconnect_requested.connect(self._camera_service.reconnect)
self._menu.overlay_toggled.connect(self._overlay.set_overlay_visible)
# ------------------------------------------------------------------
# Camera status slots
# ------------------------------------------------------------------
def _on_camera_started(self) -> None:
cam = self._camera_service.current_camera
name = cam.name if cam else "Unknown"
self._status_label.setText(f"Streaming: {name}")
logger.info("Camera streaming: %s", name)
def _on_camera_stopped(self) -> None:
self._status_label.setText("Camera stopped")
def _on_camera_error(self, message: str) -> None:
self._status_label.setText(f"Error: {message}")
logger.error("Camera error: %s", message)
# ------------------------------------------------------------------
# Menu action slots
# ------------------------------------------------------------------
def _on_camera_selected(self, cam: CameraInfo) -> None:
self._start_camera(cam)
def _on_resolution_selected(self, width: int, height: int) -> None:
self._camera_service.set_resolution(width, height)
def _on_fps_selected(self, fps: float) -> None:
self._camera_service.set_fps(fps)
# ------------------------------------------------------------------
# Qt overrides
# ------------------------------------------------------------------
def resizeEvent(self, event) -> None: # noqa: N802
super().resizeEvent(event)
# Keep overlay covering the video widget
if hasattr(self, "_overlay") and hasattr(self, "_video_widget"):
self._overlay.resize(self._video_widget.size())
def closeEvent(self, event) -> None: # noqa: N802
self._camera_service.stop()
super().closeEvent(event)

198
app/ui/menu_bar.py Normal file
View File

@@ -0,0 +1,198 @@
"""Menu bar — camera, video format, FPS and debug controls."""
from __future__ import annotations
import logging
from PySide6.QtCore import Signal
from PySide6.QtGui import QAction, QActionGroup
from PySide6.QtWidgets import QMenuBar, QWidget
from app.camera.camera_enumerator import CameraInfo
logger = logging.getLogger(__name__)
class AppMenuBar(QMenuBar):
"""
Application menu bar.
Signals:
camera_selected(CameraInfo) — user picked a camera
resolution_selected(int, int) — user picked (width, height)
fps_selected(float) — user picked a target FPS
reconnect_requested() — user hit Reconnect
overlay_toggled(bool) — overlay show/hide
log_toggled(bool) — console logging on/off
"""
camera_selected = Signal(object) # CameraInfo
resolution_selected = Signal(int, int)
fps_selected = Signal(float)
reconnect_requested = Signal()
overlay_toggled = Signal(bool)
log_toggled = Signal(bool)
def __init__(self, parent: QWidget | None = None) -> None:
super().__init__(parent)
self._camera_group: QActionGroup | None = None
self._resolution_group: QActionGroup | None = None
self._fps_group: QActionGroup | None = None
self._cameras: list[CameraInfo] = []
self._build_menus()
# ------------------------------------------------------------------
# Public API — called after camera enumeration
# ------------------------------------------------------------------
def populate_cameras(self, cameras: list[CameraInfo]) -> None:
"""Populate the Camera menu with discovered devices."""
self._cameras = cameras
menu = self._camera_menu
# Remove existing camera actions (keep Reconnect + separator)
for action in list(menu.actions()):
if action not in (self._reconnect_action, self._cam_separator):
menu.removeAction(action)
self._camera_group = QActionGroup(self)
self._camera_group.setExclusive(True)
for cam in cameras:
action = QAction(cam.name, self)
action.setCheckable(True)
action.setData(cam)
self._camera_group.addAction(action)
menu.insertAction(self._cam_separator, action)
action.triggered.connect(self._on_camera_action)
if cameras:
first = self._camera_group.actions()[0]
first.setChecked(True)
def populate_formats(self, camera_info: CameraInfo) -> None:
"""Populate Resolution and FPS menus based on a camera's supported formats."""
self._populate_resolutions(camera_info)
self._populate_fps(camera_info)
def set_active_camera(self, camera_info: CameraInfo) -> None:
"""Check the menu item matching camera_info."""
if self._camera_group is None:
return
for action in self._camera_group.actions():
if action.data() is camera_info:
action.setChecked(True)
return
# ------------------------------------------------------------------
# Menu construction
# ------------------------------------------------------------------
def _build_menus(self) -> None:
# --- Camera menu ---
self._camera_menu = self.addMenu("Camera")
self._cam_separator = self._camera_menu.addSeparator()
self._reconnect_action = QAction("Reconnect", self)
self._reconnect_action.triggered.connect(self.reconnect_requested)
self._camera_menu.addAction(self._reconnect_action)
# --- Video menu ---
self._video_menu = self.addMenu("Video")
self._res_menu = self._video_menu.addMenu("Resolution")
self._fps_menu = self._video_menu.addMenu("FPS")
# --- Debug menu ---
debug_menu = self.addMenu("Debug")
self._overlay_action = QAction("Show Overlay", self)
self._overlay_action.setCheckable(True)
self._overlay_action.setChecked(True)
self._overlay_action.toggled.connect(self.overlay_toggled)
debug_menu.addAction(self._overlay_action)
self._log_action = QAction("Console Logging", self)
self._log_action.setCheckable(True)
self._log_action.setChecked(False)
self._log_action.toggled.connect(self._on_log_toggled)
debug_menu.addAction(self._log_action)
def _populate_resolutions(self, camera_info: CameraInfo) -> None:
self._res_menu.clear()
self._resolution_group = QActionGroup(self)
self._resolution_group.setExclusive(True)
seen: set[tuple[int, int]] = set()
for w, h, _ in camera_info.formats:
key = (w, h)
if key in seen:
continue
seen.add(key)
action = QAction(f"{w} × {h}", self)
action.setCheckable(True)
action.setData((w, h))
self._resolution_group.addAction(action)
self._res_menu.addAction(action)
action.triggered.connect(self._on_resolution_action)
actions = self._resolution_group.actions()
if actions:
actions[0].setChecked(True)
def _populate_fps(self, camera_info: CameraInfo) -> None:
self._fps_menu.clear()
self._fps_group = QActionGroup(self)
self._fps_group.setExclusive(True)
seen: set[int] = set()
for _, _, fps in camera_info.formats:
key = round(fps)
if key in seen:
continue
seen.add(key)
action = QAction(f"{key} fps", self)
action.setCheckable(True)
action.setData(float(fps))
self._fps_group.addAction(action)
self._fps_menu.addAction(action)
action.triggered.connect(self._on_fps_action)
actions = self._fps_group.actions()
if actions:
actions[0].setChecked(True)
# ------------------------------------------------------------------
# Slots
# ------------------------------------------------------------------
def _on_camera_action(self) -> None:
action = self.sender()
if action is None:
return
cam: CameraInfo = action.data()
logger.debug("Camera selected: %s", cam.name)
self.camera_selected.emit(cam)
self._populate_resolutions(cam)
self._populate_fps(cam)
def _on_resolution_action(self) -> None:
action = self.sender()
if action is None:
return
w, h = action.data()
logger.debug("Resolution selected: %dx%d", w, h)
self.resolution_selected.emit(w, h)
def _on_fps_action(self) -> None:
action = self.sender()
if action is None:
return
fps: float = action.data()
logger.debug("FPS selected: %.1f", fps)
self.fps_selected.emit(fps)
def _on_log_toggled(self, enabled: bool) -> None:
level = logging.DEBUG if enabled else logging.WARNING
logging.getLogger().setLevel(level)
self.log_toggled.emit(enabled)

View File

@@ -1,3 +0,0 @@
from duck_preview.app import main
main()

View File

@@ -1,47 +0,0 @@
from __future__ import annotations
import sys
from PySide6.QtCore import QTimer
from PySide6.QtMultimedia import QVideoSink
from PySide6.QtWidgets import QApplication
from duck_preview.camera.service import CameraService
from duck_preview.dispatcher.frame_dispatcher import FrameDispatcher
from duck_preview.main_window import MainWindow
from duck_preview.rendering.overlay import OverlayWidget
from duck_preview.rendering.video_widget import VideoWidget
from duck_preview.telemetry.collector import TelemetryCollector
def main() -> None:
app = QApplication(sys.argv)
app.setApplicationName("Duck Preview")
camera = CameraService()
video_sink = QVideoSink()
dispatcher = FrameDispatcher()
telemetry = TelemetryCollector()
video_widget = VideoWidget()
overlay = OverlayWidget()
camera.set_video_widget(video_widget)
camera.set_video_sink(video_sink)
video_sink.videoFrameChanged.connect(dispatcher.on_frame)
dispatcher.subscribe(telemetry.on_frame)
window = MainWindow(camera, video_widget, overlay)
camera.error_occurred.connect(lambda msg: overlay.set_metrics({"error": msg}))
metrics_timer = QTimer()
metrics_timer.timeout.connect(lambda: overlay.set_metrics(telemetry.metrics()))
metrics_timer.start(200)
window.show()
sys.exit(app.exec())
if __name__ == "__main__":
main()

View File

@@ -1,59 +0,0 @@
from __future__ import annotations
from PySide6.QtCore import QObject, Signal
from PySide6.QtMultimedia import (
QCamera,
QCameraDevice,
QCameraFormat,
QMediaCaptureSession,
QMediaDevices,
QVideoSink,
)
from PySide6.QtMultimediaWidgets import QVideoWidget
class CameraService(QObject):
error_occurred = Signal(str)
def __init__(self, parent: QObject | None = None) -> None:
super().__init__(parent)
self._session = QMediaCaptureSession()
self._camera: QCamera | None = None
@property
def session(self) -> QMediaCaptureSession:
return self._session
def set_video_widget(self, widget: QVideoWidget) -> None:
self._session.setVideoOutput(widget)
def set_video_sink(self, sink: QVideoSink) -> None:
self._session.setVideoSink(sink)
@staticmethod
def available_cameras() -> list[QCameraDevice]:
return QMediaDevices.videoInputs()
def is_active(self) -> bool:
return self._camera is not None and self._camera.isActive()
def start(self, device: QCameraDevice) -> None:
self.stop()
self._camera = QCamera(device, self)
self._camera.errorOccurred.connect(self._on_error)
self._session.setCamera(self._camera)
self._camera.start()
def stop(self) -> None:
if self._camera is not None:
self._camera.stop()
self._session.setCamera(None)
self._camera.deleteLater()
self._camera = None
def set_camera_format(self, fmt: QCameraFormat) -> None:
if self._camera is not None:
self._camera.setCameraFormat(fmt)
def _on_error(self, error: QCamera.Error, error_string: str) -> None:
self.error_occurred.emit(error_string)

View File

@@ -1,22 +0,0 @@
from __future__ import annotations
from collections.abc import Callable
from PySide6.QtCore import QObject
from PySide6.QtMultimedia import QVideoFrame
class FrameDispatcher(QObject):
def __init__(self, parent: QObject | None = None) -> None:
super().__init__(parent)
self._subscribers: list[Callable[[QVideoFrame], None]] = []
def subscribe(self, callback: Callable[[QVideoFrame], None]) -> None:
self._subscribers.append(callback)
def unsubscribe(self, callback: Callable[[QVideoFrame], None]) -> None:
self._subscribers.remove(callback)
def on_frame(self, frame: QVideoFrame) -> None:
for cb in self._subscribers:
cb(frame)

View File

@@ -1,139 +0,0 @@
from __future__ import annotations
from PySide6.QtCore import QCameraPermission, Qt
from PySide6.QtGui import QAction, QCloseEvent
from PySide6.QtMultimedia import QCameraDevice, QMediaDevices
from PySide6.QtWidgets import QApplication, QGridLayout, QMainWindow, QWidget
from duck_preview.camera.service import CameraService
from duck_preview.rendering.overlay import OverlayWidget
from duck_preview.rendering.video_widget import VideoWidget
class MainWindow(QMainWindow):
def __init__(
self,
camera_service: CameraService,
video_widget: VideoWidget,
overlay_widget: OverlayWidget,
parent: QWidget | None = None,
) -> None:
super().__init__(parent)
self._camera = camera_service
self._video_widget = video_widget
self._overlay = overlay_widget
self._media_devices = QMediaDevices()
self.setWindowTitle("Duck Preview")
self.resize(1280, 720)
central = QWidget()
self.setCentralWidget(central)
layout = QGridLayout(central)
layout.setContentsMargins(0, 0, 0, 0)
layout.setSpacing(0)
layout.addWidget(video_widget, 0, 0)
layout.addWidget(overlay_widget, 0, 0)
self._setup_menus()
self._media_devices.videoInputsChanged.connect(self._rebuild_camera_menu)
def _setup_menus(self) -> None:
menu_bar = self.menuBar()
self._camera_menu = menu_bar.addMenu("Camera")
self._rebuild_camera_menu()
self._resolution_menu = menu_bar.addMenu("Resolution")
self._resolution_menu.setEnabled(False)
self._fps_menu = menu_bar.addMenu("FPS")
self._fps_menu.setEnabled(False)
debug_menu = menu_bar.addMenu("Debug")
toggle_overlay = QAction("Show Metrics", self)
toggle_overlay.setCheckable(True)
toggle_overlay.setChecked(True)
toggle_overlay.triggered.connect(self._overlay.set_visible)
debug_menu.addAction(toggle_overlay)
def _rebuild_camera_menu(self) -> None:
self._camera_menu.clear()
cameras = CameraService.available_cameras()
for device in cameras:
action = QAction(device.description(), self)
action.triggered.connect(lambda checked, d=device: self._on_camera_selected(d))
self._camera_menu.addAction(action)
if not cameras:
action = QAction("No cameras detected", self)
action.setEnabled(False)
self._camera_menu.addAction(action)
def _on_camera_selected(self, device: QCameraDevice) -> None:
self._request_camera_permission(device)
def _request_camera_permission(self, device: QCameraDevice) -> None:
perm = QCameraPermission()
match QApplication.checkPermission(perm):
case Qt.PermissionStatus.Undetermined:
QApplication.requestPermission(
perm, self, lambda: self._request_camera_permission(device)
)
case Qt.PermissionStatus.Denied:
self._overlay.set_metrics(
{
"error": (
"Camera permission denied.\n"
"Grant access in System Settings > "
"Privacy & Security > Camera"
)
}
)
case Qt.PermissionStatus.Granted:
self._camera.start(device)
self._rebuild_resolution_menu(device)
def _rebuild_resolution_menu(self, device: QCameraDevice) -> None:
self._resolution_menu.clear()
self._resolution_menu.setEnabled(True)
formats = device.videoFormats()
seen: set[tuple[int, int]] = set()
for fmt in formats:
res = fmt.resolution()
key = (res.width(), res.height())
if key not in seen:
seen.add(key)
action = QAction(f"{res.width()}x{res.height()}", self)
action.triggered.connect(
lambda checked, d=device, w=res.width(), h=res.height(): (
self._on_resolution_selected(d, w, h)
)
)
self._resolution_menu.addAction(action)
def _on_resolution_selected(self, device: QCameraDevice, width: int, height: int) -> None:
self._rebuild_fps_menu(device, width, height)
def _rebuild_fps_menu(self, device: QCameraDevice, width: int, height: int) -> None:
self._fps_menu.clear()
self._fps_menu.setEnabled(True)
formats = device.videoFormats()
seen_labels: set[str] = set()
for fmt in formats:
res = fmt.resolution()
if res.width() == width and res.height() == height:
min_fps = round(fmt.minFrameRate())
max_fps = round(fmt.maxFrameRate())
label = f"{max_fps} FPS" if min_fps == max_fps else f"{min_fps}-{max_fps} FPS"
if label not in seen_labels:
seen_labels.add(label)
action = QAction(label, self)
action.triggered.connect(
lambda checked, f=fmt: self._camera.set_camera_format(f)
)
self._fps_menu.addAction(action)
def closeEvent(self, event: QCloseEvent) -> None: # noqa: N802
self._camera.stop()
super().closeEvent(event)

View File

@@ -1,73 +0,0 @@
from __future__ import annotations
from PySide6.QtCore import Qt
from PySide6.QtGui import QColor, QFont, QFontMetrics, QPainter
from PySide6.QtWidgets import QWidget
class OverlayWidget(QWidget):
def __init__(self, parent: QWidget | None = None) -> None:
super().__init__(parent)
self.setAttribute(Qt.WA_TransparentForMouseEvents)
self.setAttribute(Qt.WA_NoSystemBackground)
self._visible = True
self._metrics: dict[str, float | int | str] = {}
def set_visible(self, visible: bool) -> None:
self._visible = visible
self.update()
def set_metrics(self, metrics: dict[str, float | int | str]) -> None:
self._metrics = metrics
self.update()
def paintEvent(self, event) -> None: # noqa: N802
if not self._visible or not self._metrics:
return
painter = QPainter(self)
painter.setRenderHint(QPainter.TextAntialiasing)
if "error" in self._metrics:
self._paint_error(painter)
else:
self._paint_metrics(painter)
def _paint_metrics(self, painter: QPainter) -> None:
lines = [
f"FPS: {self._metrics.get('fps', 0):>7}",
f"Frame: {self._metrics.get('frame_time_ms', 0):>7.1f} ms",
f"Frames: {self._metrics.get('frame_count', 0):>7}",
]
font = QFont("monospace", 11)
painter.setFont(font)
fm = QFontMetrics(font)
text_width = max(fm.horizontalAdvance(line) for line in lines) + 24
text_height = len(lines) * (fm.height() + 6) + 12
painter.fillRect(8, 8, text_width, text_height, QColor(0, 0, 0, 160))
painter.setPen(QColor(0, 255, 0))
for i, line in enumerate(lines):
y = 22 + i * (fm.height() + 6)
painter.drawText(16, y, line)
def _paint_error(self, painter: QPainter) -> None:
msg = str(self._metrics.get("error", "Unknown error"))
lines = msg.split("\n")
font = QFont("monospace", 12)
painter.setFont(font)
fm = QFontMetrics(font)
text_width = max(fm.horizontalAdvance(line) for line in lines) + 24
text_height = len(lines) * (fm.height() + 6) + 12
painter.fillRect(8, 8, text_width, text_height, QColor(0, 0, 0, 200))
painter.setPen(QColor(200, 50, 50))
for i, line in enumerate(lines):
y = 22 + i * (fm.height() + 6)
painter.drawText(16, y, line)

View File

@@ -1,10 +0,0 @@
from __future__ import annotations
from PySide6.QtMultimediaWidgets import QVideoWidget
from PySide6.QtWidgets import QWidget
class VideoWidget(QVideoWidget):
def __init__(self, parent: QWidget | None = None) -> None:
super().__init__(parent)
self.setMinimumSize(320, 240)

View File

@@ -1,39 +0,0 @@
from __future__ import annotations
import time
from collections import deque
from PySide6.QtCore import QObject
from PySide6.QtMultimedia import QVideoFrame
class TelemetryCollector(QObject):
def __init__(self, parent: QObject | None = None) -> None:
super().__init__(parent)
self._timestamps: deque[float] = deque(maxlen=500)
def on_frame(self, frame: QVideoFrame) -> None:
self._timestamps.append(time.perf_counter())
def metrics(self) -> dict[str, float | int]:
if not self._timestamps:
return {"fps": 0, "frame_time_ms": 0.0, "frame_count": 0}
now = time.perf_counter()
cutoff = now - 1.0
recent = [t for t in self._timestamps if t >= cutoff]
fps = len(recent)
frame_time_ms = 0.0
if len(self._timestamps) >= 2:
diffs = [
self._timestamps[i] - self._timestamps[i - 1]
for i in range(1, len(self._timestamps))
]
frame_time_ms = (sum(diffs) / len(diffs)) * 1000 if diffs else 0.0
return {
"fps": fps,
"frame_time_ms": round(frame_time_ms, 2),
"frame_count": len(self._timestamps),
}

View File

@@ -1,150 +1,324 @@
# MVP Implementation Plan — Duck Preview
# Plan działania — MVP Camera Preview (PySide6)
## Stack
## Środowisko
- **Language:** Python 3.12
- **GUI/Framework:** PySide6 6.11 (QtMultimedia + QtWidgets)
- **Camera backend:** QCamera + QMediaCaptureSession + QVideoSink (native GPU)
- **Rendering:** QWidget (paintEvent) z QPainter — manual render z QVideoFrame → QImage
- **Testing:** pytest
- **Linting/Formatting:** ruff
## Architecture
```
CameraService
└─ QVideoSink.videoFrame ──→ FrameDispatcher.on_frame
├─ VideoWidget.on_frame (render klatki)
├─ TelemetryCollector.on_frame (timestamp)
└─ [Future AI subscribers]
TelemetryCollector.metrics() ──→ OverlayWidget.set_metrics()
(polled co 200ms przez QTimer w app.py)
```
## Kolejność implementacji
### 0. Project scaffolding
`pyproject.toml` — projekt, zależność PySide6, entry point, ruff config, pytest
`duck_preview/__init__.py`, `__main__.py` — entry point `python -m duck_preview`
Podkatalogi: `camera/`, `dispatcher/`, `rendering/`, `telemetry/`
### 1. CameraService (`camera/service.py`)
- Wrapper na `QCamera` + `QMediaCaptureSession` + `QVideoSink`
- `available_cameras()` — static, zwraca listę `QCameraDevice` z `QMediaDevices`
- `start(device)` / `stop()` — zarządzanie cyklem życia kamery
- `set_camera_format(fmt)` — zmiana rozdzielczości/FPS
- `sink` property — QVideoSink do podpięcia dispatchera
- `error_occurred` Signal — propagacja błędów kamery
### 2. FrameDispatcher (`dispatcher/frame_dispatcher.py`)
- Prosty pub/sub: `subscribe(cb)` / `unsubscribe(cb)`
- `on_frame(frame)` — wołany z sygnału QVideoSink, iteruje wszystkich subskrybentów
- Frame dropping: na razie brak (wszystkie callbacki szybkie)
### 3. TelemetryCollector (`telemetry/collector.py`)
- Subskrybent dispatchera
- Zbiera timestampy (`deque`, maxlen=500)
- `metrics()` → dict: fps (klatki z ostatniej sekundy), frame_time_ms (średnia delta między klatkami), frame_count
- Brak CPU usage (out of scope na MVP)
### 4. VideoWidget (`rendering/video_widget.py`)
- `QWidget` z `WA_OpaquePaintEvent`
- `on_frame(frame)``frame.toImage()` → zapis QImage → `update()`
- `paintEvent` — skalowanie z zachowaniem proporcji, centrowanie, letterboxing
- Brak klatek → wyświetla "No camera feed"
### 5. OverlayWidget (`rendering/overlay.py`)
- Przezroczysty QWidget (`WA_TransparentForMouseEvents`)
- Nakładka na video, rysuje tekst metryk (FPS, frame time, frame count)
- Semi-transparentne tło, zielona czcionka Consolas
- Toggle widoczności przez menu Debug
### 6. MainWindow (`main_window.py`)
- `QMainWindow` z menu barem:
- **Camera** — lista dostępnych kamer (dynamiczna, reaguje na `videoInputsChanged`)
- **Resolution** — dostępne rozdzielczości dla wybranej kamery
- **FPS** — dostępne FPS dla wybranej rozdzielczości
- **Debug** — toggle nakładki
- Central widget: GridLayout z VideoWidget + OverlayWidget (stacked)
### 7. App (`app.py`)
- `main()` — tworzy QApplication, instancjonuje i łączy zależności (ręczne DI)
- Wire: camera.sink → dispatcher.on_frame
- Wire: dispatcher → telemetry.on_frame, video_widget.on_frame
- QTimer 200ms: telemetry.metrics() → overlay.set_metrics()
- `__main__.py``from duck_preview.app import main; main()`
| Element | Wartość |
|---|---|
| Python | 3.12.10 (venv: `.venv-win`) |
| Framework GUI | PySide6 6.11.0 |
| Dev platform | Windows 11 |
| Target platform | Mac Mini (Intel i7, macOS Ventura) |
| Kamera docelowa | ELP USB Camera |
| Narzędzia | pytest, ruff, colorama |
---
## DI Wiring (ręczne, w app.py)
## Fazy realizacji
### Faza 0 — Projekt i scaffolding
Cel: ustalenie struktury katalogów i modułów przed napisaniem pierwszej linii logiki.
#### 0.1 Struktura projektu
```
app = QApplication(sys.argv)
camera = CameraService()
dispatcher = FrameDispatcher()
telemetry = TelemetryCollector()
video_widget = VideoWidget()
overlay = OverlayWidget()
camera.sink.videoFrame.connect(dispatcher.on_frame)
dispatcher.subscribe(telemetry.on_frame)
dispatcher.subscribe(video_widget.on_frame)
window = MainWindow(camera, video_widget, overlay)
# Poll telemetry co 200ms → overlay
metrics_timer = QTimer()
metrics_timer.timeout.connect(lambda: overlay.set_metrics(telemetry.metrics()))
metrics_timer.start(200)
window.show()
app.exec()
```
---
## Struktura plików
```
duck-preview/
├── pyproject.toml
├── notes/
│ ├── 01-mvp-preview.md
│ └── 01-mvp-plan.md
├── duck_preview/
duck-preview2/
├── app/
│ ├── __init__.py
│ ├── __main__.py
│ ├── app.py
│ ├── main_window.py
│ ├── main.py # entry point
│ ├── config.py # stałe, domyślne ustawienia
│ ├── camera/
│ │ ├── __init__.py
│ │ ── service.py
├── dispatcher/
│ │ ── camera_service.py # QCamera + QMediaCaptureSession
│ └── camera_enumerator.py # wykrywanie dostępnych kamer
│ ├── pipeline/
│ │ ├── __init__.py
│ │ └── frame_dispatcher.py
│ ├── rendering/
│ │ └── frame_dispatcher.py # dystrybucja klatek do subskrybentów
│ ├── telemetry/
│ │ ├── __init__.py
│ │ ── video_widget.py
│ └── overlay.py
└── telemetry/
│ │ ── telemetry_collector.py # zbieranie metryk FPS/frame time/CPU
── overlay/
│ ├── __init__.py
│ │ └── overlay_widget.py # przezroczysta warstwa QWidget
│ └── ui/
│ ├── __init__.py
── collector.py
└── tests/
├── __init__.py
├── test_dispatcher.py
── test_collector.py
── main_window.py # główne okno aplikacji
│ └── menu_bar.py # menu: kamera, rozdzielczość, FPS, debug
├── tests/
├── __init__.py
── test_camera_enumerator.py
│ └── test_telemetry_collector.py
├── notes/
├── requirements.txt
├── requirements-dev.txt
└── pyproject.toml # konfiguracja ruff + pytest
```
#### 0.2 Pliki konfiguracyjne
- `pyproject.toml` — konfiguracja ruff (linter/formatter) i pytest
- `requirements.txt` — zależności produkcyjne (PySide6)
- `requirements-dev.txt` — zależności deweloperskie (pytest, ruff)
- `.gitignore` — aktualizacja o artefakty Pythona
---
### Faza 1 — Camera Service
Cel: stabilne pobranie obrazu z kamery przez QtMultimedia.
#### 1.1 Camera Enumerator
- `QMediaDevices.videoInputs()` — lista dostępnych kamer
- Zwraca listę `QCameraDevice` z nazwą, id i obsługiwanymi formatami
- Obsługa braku kamer (komunikat, nie crash)
- Test jednostkowy: mockowanie `QMediaDevices`
#### 1.2 Camera Service
- Opakowuje `QCamera` + `QMediaCaptureSession`
- API:
- `start(device: QCameraDevice)` — uruchamia kamerę
- `stop()` — zatrzymuje kamerę
- `set_resolution(width, height)` — ustawia format
- `set_fps(fps)` — ustawia docelowy FPS
- `reconnect()` — restart po błędzie
- `QVideoSink` jako punkt odbioru klatek
- Sygnał `frame_ready(QVideoFrame)` do Frame Dispatcher
- Obsługa błędów kamery (`QCamera.errorOccurred`)
#### 1.3 Uwagi platformowe
| Aspekt | Windows 11 (dev) | macOS Ventura (target) |
|---|---|---|
| Backend | DirectShow / Media Foundation | AVFoundation |
| Kamera ELP | USB, standardowy UVC driver | USB, UVC |
| Format klatek | YUYV / MJPEG | YUYV / MJPEG |
| GPU rendering | ANGLE (OpenGL ES) | Metal |
---
### Faza 2 — Frame Dispatcher
Cel: dystrybucja klatek do wielu odbiorców bez blokowania akwizycji.
#### 2.1 Frame Dispatcher
- Wzorzec: publish-subscribe (lista callbacków)
- `subscribe(callback: Callable[[QVideoFrame], None])`
- `unsubscribe(callback)`
- `dispatch(frame: QVideoFrame)` — wywołuje wszystkich subskrybentów
- Klatki NIE są kopiowane — subskrybenci działają na referencji
- Subskrybenci mogą **pominąć klatkę** (tryb drop-if-busy)
- Wywołanie `dispatch` następuje w wątku GUI (slot połączony z `frame_ready`)
#### 2.2 Subskrybenci w Fazie 1
| Subskrybent | Działanie |
|---|---|
| Video Renderer | przekazuje klatkę do `QVideoSink` / `QVideoWidget` |
| Telemetry Collector | mierzy czas, zlicza klatki |
---
### Faza 3 — Video Renderer
Cel: renderowanie klatki w GUI bez zbędnych kopii.
#### 3.1 Podejście
- `QVideoWidget` jako główny widget podglądu
- `QMediaCaptureSession.setVideoOutput(QVideoWidget)` — ścieżka bezpośrednia, zero kopii
- Alternatywnie: `QVideoSink``QGraphicsVideoItem` dla przyszłych overlayów
- Domyślnie: `QVideoWidget` (prosta, niska latencja)
#### 3.2 Wymagania
- Preview nie blokuje wątku GUI
- Obsługa aspect ratio (letter/pillarbox)
- Resize okna bez migotania
---
### Faza 4 — Telemetry Collector
Cel: dokładne metryki pipeline'u wideo.
#### 4.1 Zbierane metryki
| Metryka | Metoda pomiaru |
|---|---|
| Realtime FPS | licznik klatek / okno 1 s |
| Frame time | `time.perf_counter()` między klatkami |
| Frame acquisition time | timestamp wejście frame_ready → dispatch |
| Rendering time | czas `QVideoWidget.update()` (opcjonalnie) |
| Dropped frames | detekcja przez numerację lub timestamp gap |
| CPU usage | `psutil.cpu_percent()` (dodać do requirements) |
| Memory usage | `psutil.virtual_memory()` (opcjonalnie) |
#### 4.2 API
- `TelemetryCollector` — subskrybent Frame Dispatcher
- `on_frame(frame: QVideoFrame)` — rejestruje timestamp klatki
- `get_snapshot() -> TelemetrySnapshot` — aktualny stan metryk (dataclass)
- `update_interval_ms: int` — jak często odświeżać snapshot (domyślnie 500 ms)
- Sygnał `metrics_updated(TelemetrySnapshot)` — emitowany co `update_interval_ms`
#### 4.3 TelemetrySnapshot (dataclass)
```python
@dataclass
class TelemetrySnapshot:
fps: float
frame_time_ms: float
dropped_frames: int
cpu_percent: float
memory_mb: float | None
timestamp: float
```
---
## Edge cases / uwagi
### Faza 5 — Overlay System
- Brak kamer → menu Camera pokazuje "No cameras detected"
- Brak klatek → VideoWidget pokazuje ciemne tło + napis
- Błąd kamery → CameraService emituje `error_occurred` (na razie tylko log)
- Zamknięcie okna → `closeEvent``camera.stop()`
- QVideoFrame.toImage() kopiuje dane — akceptowalne dla MVP
- Wszystkie obiekty żyją w main thread — brak problemów z threadingiem
Cel: wyświetlanie metryk na przezroczystej warstwie nad podglądem.
#### 5.1 Architektura
- `OverlayWidget(QWidget)` — przezroczysty widget (`WA_TransparentForMouseEvents`)
- Pozycjonowany absolutnie nad `QVideoWidget` (ten sam parent, wyższy z-index)
- `paintEvent` rysuje semi-przezroczysty prostokąt + tekst z metrykami
- Połączony z sygnałem `metrics_updated` — odświeża tylko gdy dane się zmienią
#### 5.2 Zawartość overlaya (MVP)
```
FPS: 60.0
Frame: 16.7 ms
Drop: 0
CPU: 12.3 %
```
#### 5.3 Sterowalność
- Widoczność overlaya: toggle przez menu Debug
- Pozycja: lewy górny róg (stała w MVP)
- Kolor tła: `rgba(0, 0, 0, 160)`
---
### Faza 6 — GUI / Main Window
Cel: minimalne, funkcjonalne okno aplikacji.
#### 6.1 MainWindow
- `QMainWindow` z `QVideoWidget` jako central widget
- `OverlayWidget` nałożony na video
- Obsługa resize → reposition overlay
- Tytuł okna: `Duck Preview`
#### 6.2 MenuBar
Menu **Camera**:
- Lista wykrytych kamer (radio-style)
- Separator
- Reconnect
Menu **Video**:
- Resolution submenu (pobierane dynamicznie z `QCameraDevice.videoFormats()`)
- FPS submenu
Menu **Debug**:
- Toggle overlay metryk
- Logowanie do konsoli (toggle)
#### 6.3 Startup flow
```
main.py
→ QApplication
→ CameraEnumerator.list_cameras()
→ MainWindow(cameras)
→ CameraService.start(cameras[0]) # pierwsza kamera lub ELP
→ FrameDispatcher.subscribe(telemetry, renderer)
→ app.exec()
```
---
### Faza 7 — Testy i walidacja
#### 7.1 Testy jednostkowe
| Moduł | Co testować |
|---|---|
| `CameraEnumerator` | lista kamer, brak kamer, format danych |
| `TelemetryCollector` | obliczenia FPS, wykrywanie dropów |
| `FrameDispatcher` | subskrypcja, odsubskrypcja, dispatch |
| `TelemetrySnapshot` | poprawność dataclass |
#### 7.2 Testy manualne (Windows dev)
- [ ] Uruchomienie z kamerą laptopa / USB webcam
- [ ] Przełączanie kamer
- [ ] Zmiana rozdzielczości
- [ ] Zmiana FPS
- [ ] Toggle overlay
- [ ] Reconnect po odłączeniu kamery
#### 7.3 Testy na Mac Mini (target)
- [ ] Wykrycie kamery ELP
- [ ] Poprawny format YUYV/MJPEG
- [ ] Wydajność AVFoundation vs DirectShow
- [ ] GPU rendering przez Metal
#### 7.4 Kryteria sukcesu (z PRD)
- Preview stabilny i płynny
- Latencja renderowania niska
- Dane telemetrii dokładne
- GUI responsywne
- Overlay działa poprawnie
- Architektura gotowa na subskrybentów AI
---
## Kolejność implementacji (sprint order)
```
Sprint 1: Faza 0 — scaffolding, pyproject.toml, requirements
Sprint 2: Faza 1 — CameraEnumerator + CameraService (bez GUI)
Sprint 3: Faza 3 — VideoRenderer + MainWindow (preview działa)
Sprint 4: Faza 2 — FrameDispatcher (refactor pipeline)
Sprint 5: Faza 4 — TelemetryCollector
Sprint 6: Faza 5 — OverlayWidget
Sprint 7: Faza 6 — MenuBar (camera/resolution/fps switch)
Sprint 8: Faza 7 — Testy, poprawki, walidacja na Mac Mini
```
---
## Zależności do dodania
```
# requirements.txt
PySide6>=6.7
psutil>=6.0
# requirements-dev.txt
pytest>=8.0
ruff>=0.4
```
---
## Uwagi cross-platform
1. **ELP camera** — kamera UVC, powinna działać bez dodatkowych sterowników na obu platformach. Sprawdzić obsługiwane rozdzielczości i FPS przez `QCameraDevice.videoFormats()`.
2. **Ścieżki absolutne** — unikać `os.path` na korzyść `pathlib.Path`.
3. **Threading** — wszystkie operacje Qt muszą odbywać się w wątku GUI. `TelemetryCollector` może używać `QTimer` zamiast osobnego wątku.
4. **Format klatek** — na macOS AVFoundation preferuje `BGRA` lub `NV12`. Konwersja powinna być leniwa i tylko gdy potrzebna (nie w hot path renderowania).
5. **High DPI** — włączyć `QApplication.setHighDpiScaleFactorRoundingPolicy` dla konsistencji Windows/Mac.
6. **Testowanie bez kamery**`CameraEnumerator` powinien umożliwiać dependency injection / mock dla środowisk CI.

View File

@@ -253,6 +253,7 @@ Architecture must support future additions:
* snapshots,
* streaming,
* remote sinks.
* play video files
Without major redesign.

View File

@@ -1,212 +0,0 @@
# Plan — macOS fix + QVideoWidget + QVideoSink dual output
## Cel
Przywrócić działanie kamery na macOS (Elgato) przez:
1. Przejście z manualnego `QPainter` renderowania na natywny `QVideoWidget`
2. Dodanie `QVideoSink` jako drugiego wyjścia (frame access dla telemetrii)
3. Obsługa `QCameraPermission` (Qt 6.5+)
4. Dokumentacja packagingu przez `pyside6-deploy` na macOS
## Kolejność implementacji
### 1. camera/service.py
```python
class CameraService(QObject):
error_occurred = Signal(str)
def __init__(self, parent=None):
super().__init__(parent)
self._session = QMediaCaptureSession()
self._camera: QCamera | None = None
def set_video_widget(self, widget: QVideoWidget) -> None:
self._session.setVideoOutput(widget)
def set_video_sink(self, sink: QVideoSink) -> None:
self._session.setVideoSink(sink)
@property
def session(self) -> QMediaCaptureSession:
return self._session
@staticmethod
def available_cameras() -> list[QCameraDevice]:
return QMediaDevices.videoInputs()
def is_active(self) -> bool:
return self._camera is not None and self._camera.isActive()
def start(self, device: QCameraDevice) -> None:
self.stop()
self._camera = QCamera(device, self)
self._camera.errorOccurred.connect(self._on_error)
self._session.setCamera(self._camera)
self._camera.start()
def stop(self) -> None:
if self._camera is not None:
self._camera.stop()
self._session.setCamera(None)
self._camera.deleteLater()
self._camera = None
def set_camera_format(self, fmt: QCameraFormat) -> None:
if self._camera is not None:
self._camera.setCameraFormat(fmt)
def _on_error(self, error, error_string):
self.error_occurred.emit(error_string)
```
Zmiany:
- Usunięto `self._sink` z `__init__`
- Usunięto `self._session.setVideoOutput(self._sink)`
- Dodano `set_video_widget(widget)``session.setVideoOutput(widget)`
- Dodano `set_video_sink(sink)``session.setVideoSink(sink)`
### 2. rendering/video_widget.py
```python
from __future__ import annotations
from PySide6.QtMultimediaWidgets import QVideoWidget
class VideoWidget(QVideoWidget):
def __init__(self, parent=None):
super().__init__(parent)
self.setMinimumSize(320, 240)
```
Zmiany:
- Dziedziczy po `QVideoWidget` zamiast `QWidget`
- Usunięto `on_frame(frame)` — nie potrzeba, renderowanie natywne
- Usunięto `paintEvent` — nie potrzeba, robi to Qt
- Usunięto `QImage`, `QPainter`, `SmoothPixmapTransform`
### 3. rendering/overlay.py
```python
font = QFont("monospace", 11) # zamiast "Consolas"
```
Dodatkowo: obsługa błędu w `paintEvent`:
- Jeśli `self._metrics` ma klucz `"error"` → pomiń normalne metryki, narysuj czerwony komunikat błędu
- Użyj `QColor(180, 40, 40)` zamiast zielonego
### 4. app.py — nowe DI wiring
```python
def main():
app = QApplication(sys.argv)
app.setApplicationName("Duck Preview")
camera = CameraService()
video_sink = QVideoSink()
dispatcher = FrameDispatcher()
telemetry = TelemetryCollector()
video_widget = VideoWidget()
overlay = OverlayWidget()
camera.set_video_widget(video_widget)
camera.set_video_sink(video_sink)
video_sink.videoFrameChanged.connect(dispatcher.on_frame)
dispatcher.subscribe(telemetry.on_frame)
window = MainWindow(camera, video_widget, overlay)
# Wire error → overlay
camera.error_occurred.connect(
lambda msg: overlay.set_metrics({"error": msg})
)
# Poll telemetry → overlay
metrics_timer = QTimer()
metrics_timer.timeout.connect(
lambda: overlay.set_metrics(telemetry.metrics())
)
metrics_timer.start(200)
window.show()
sys.exit(app.exec())
```
Zmiany:
- Tworzy `QVideoSink` jawnie
- `camera.set_video_widget(video_widget)` + `camera.set_video_sink(video_sink)`
- `video_sink.videoFrameChanged → dispatcher.on_frame`
- Usunięto `camera.sink` (property nie istnieje)
- `camera.error_occurred → overlay.set_metrics({"error": msg})`
### 5. main_window.py — permission flow
```python
# Importy
from PySide6.QtCore import QCameraPermission, Qt
from PySide6.QtWidgets import QApplication
# W _on_camera_selected:
def _on_camera_selected(self, device: QCameraDevice) -> None:
self._request_camera_permission(device)
def _request_camera_permission(self, device: QCameraDevice) -> None:
perm = QCameraPermission()
match QApplication.checkPermission(perm):
case Qt.PermissionStatus.Undetermined:
QApplication.requestPermission(perm, self,
lambda: self._request_camera_permission(device))
case Qt.PermissionStatus.Denied:
self._overlay.set_metrics({
"error": "Camera permission denied.\n"
"Grant access in System Settings > "
"Privacy & Security > Camera"
})
case Qt.PermissionStatus.Granted:
self._camera.start(device)
self._rebuild_resolution_menu(device)
```
### 6. pyside6-deploy.toml
```toml
[app]
script = "duck_preview/__main__.py"
name = "Duck Preview"
bundle_identifier = "com.bartool.duck-preview"
categories = "public.app-category.photography"
platforms = ["macos"]
```
### 7. Po zmianach
- `ruff check .` — czysty
- `ruff format .` — sformatowany
- `pytest -q` — 5 passed
---
## Podsumowanie architektury końcowej
```
CameraService
├─ session.setVideoOutput(VideoWidget) → natywne GPU rendering
└─ session.setVideoSink(VideoSink) → frame access
└─ videoFrameChanged
→ FrameDispatcher
├─ TelemetryCollector
└─ [future AI]
MainWindow
├─ _on_camera_selected
│ → QCameraPermission check (Undetermined/Denied/Granted)
│ → CameraService.start(device)
├─ error_occurred → OverlayWidget.set_metrics({"error": ...})
└─ QTimer 200ms → TelemetryCollector.metrics() → OverlayWidget
OverlayWidget
├─ normal: zielone metryki (FPS, frame time, frame count)
└─ error: czerwony komunikat (np. brak permisji)
```

View File

@@ -1,126 +0,0 @@
# macOS — Camera on macOS with PySide6
## Problem
Aplikacja uruchomiona w interpreted mode (`python -m duck_preview`) na macOS z kamerą Elgato nie wyświetla obrazu.
## Przyczyna
Oficjalny przykład PySide6 (`camera.qml` / examples/multimedia) zawiera kod:
```python
if sys.platform == "darwin":
is_nuitka = "__compiled__" in globals()
if not is_nuitka and sys.platform == "darwin":
print("This example does not work on macOS when Python is run "
"in interpreted mode. For this example to work on macOS, "
"package the example using pyside6-deploy")
sys.exit(0)
```
**macOS → AVFoundation → wymaga spakowania przez Nuitka/pyside6-deploy.**
`QCamera` na macOS potrzebuje properly bundled app structure (Info.plist, entitlements, code signing). W interpreted mode Python nie ma tego contextu.
## Rozwiązanie
Pakować aplikację przez `pyside6-deploy` (Nuitka) na macOS. Na Windows działa bez pakowania.
---
## Qt Permission API (`QCameraPermission`)
Od Qt 6.5+ dostępne jest `QCameraPermission` — nowoczesne API do proszenia o zgodę kamery.
### API
```python
from PySide6.QtCore import QCameraPermission, Qt
from PySide6.QtWidgets import QApplication
perm = QCameraPermission()
match QApplication.checkPermission(perm):
case Qt.PermissionStatus.Undetermined:
# Prosimy o zgodę → callback wywoła init ponownie
QApplication.requestPermission(perm, parent, callback)
return
case Qt.PermissionStatus.Denied:
# overlay: "Camera permission denied"
case Qt.PermissionStatus.Granted:
# uruchom kamerę
```
### Permission flow
```
User → wybiera kamerę w menu
→ MainWindow._on_camera_selected(device)
→ checkPermission(QCameraPermission)
├─ Undetermined → requestPermission(camera, parent, callback)
│ └─ callback → _on_camera_selected ponownie
├─ Denied → overlay: "Camera permission denied. Grant access in System Settings > Privacy & Security > Camera"
└─ Granted → CameraService.start(device)
```
---
## QVideoWidget + QVideoSink — dual output
PySide6 6.11 ma osobne metody w `QMediaCaptureSession`:
| Metoda | Przeznaczenie |
|--------|---------------|
| `setVideoOutput(QVideoWidget*)` | Natywne renderowanie (GPU) |
| `setVideoSink(QVideoSink*)` | Dostęp do klatek (telemetria, AI) |
Oba działają **równolegle** — nie ma konfliktu.
```
QMediaCaptureSession
├─ setVideoOutput(QVideoWidget) → natywne renderowanie
└─ setVideoSink(QVideoSink) → frame access
└─ videoFrameChanged → FrameDispatcher
```
QVideoSink dostarcza sygnał `videoFrameChanged(QVideoFrame)` — na to podpina się FrameDispatcher, a ten rozsyła do TelemetryCollector i przyszłych AI subscriberów.
---
## macOS — packaging
### pyside6-deploy
```bash
pip install nuitka
pyside6-deploy duck_preview/__main__.py --name "Duck Preview"
```
### pyside6-deploy.toml
```toml
[app]
script = "duck_preview/__main__.py"
name = "Duck Preview"
bundle_identifier = "com.bartool.duck-preview"
categories = "public.app-category.photography"
platforms = ["macos"]
```
Po spakowaniu powstaje `Duck Preview.app` — standalone bundle z dostępem do AVFoundation.
### Windows
Na Windows działa bez pakowania — `python -m duck_preview` w venv.
---
## Font fallback
`Consolas` nie istnieje na macOS. Używać `"monospace"` (generic font family — Qt mapuje na Menlo na macOS, Consolas na Windows).
## Error state w Overlay
Overlay wspiera wyświetlanie błędów (np. brak permisji):
- Normalny stan → zielone metryki (FPS, frame times)
- Error state → czerwony komunikat
- Przełączanie przez `overlay.set_metrics({"error": "message"})`

View File

@@ -1,21 +1,43 @@
[project]
name = "duck-preview"
version = "0.1.0"
description = "Realtime camera preview application with performance monitoring"
description = "Realtime camera preview application with telemetry"
requires-python = ">=3.12"
dependencies = [
"PySide6>=6.11",
"PySide6>=6.7",
"psutil>=6.0",
]
[project.scripts]
duck-preview = "duck_preview.app:main"
duck-preview = "app.main:main"
[tool.pytest.ini_options]
testpaths = ["tests"]
python_files = ["test_*.py"]
python_functions = ["test_*"]
addopts = "-v"
[tool.ruff]
target-version = "py312"
line-length = 100
[tool.ruff.lint]
select = ["E", "F", "I", "N", "W"]
select = [
"E", # pycodestyle errors
"W", # pycodestyle warnings
"F", # pyflakes
"I", # isort
"UP", # pyupgrade
"B", # flake8-bugbear
"N", # pep8-naming
]
ignore = [
"B008", # do not perform function calls in default arguments
]
[tool.pytest.ini_options]
testpaths = ["tests"]
[tool.ruff.lint.isort]
known-first-party = ["app"]
[tool.ruff.format]
quote-style = "double"
indent-style = "space"

View File

@@ -1,6 +0,0 @@
[app]
script = "duck_preview/__main__.py"
name = "Duck Preview"
bundle_identifier = "com.bartool.duck-preview"
categories = "public.app-category.photography"
platforms = ["macos"]

3
requirements-dev.txt Normal file
View File

@@ -0,0 +1,3 @@
-r requirements.txt
pytest>=8.0
ruff>=0.4

2
requirements.txt Normal file
View File

@@ -0,0 +1,2 @@
PySide6>=6.7
psutil>=6.0

View File

@@ -1,22 +0,0 @@
import time
from unittest.mock import MagicMock
from duck_preview.telemetry.collector import TelemetryCollector
def test_metrics_empty():
collector = TelemetryCollector()
metrics = collector.metrics()
assert metrics == {"fps": 0, "frame_time_ms": 0.0, "frame_count": 0}
def test_metrics_after_frames():
collector = TelemetryCollector()
mock = MagicMock()
for _ in range(30):
collector.on_frame(mock)
time.sleep(0.002)
metrics = collector.metrics()
assert metrics["frame_count"] >= 30
assert metrics["frame_time_ms"] > 0

View File

@@ -1,47 +0,0 @@
from duck_preview.dispatcher.frame_dispatcher import FrameDispatcher
def test_subscribe_notified():
dispatcher = FrameDispatcher()
received = []
def cb(frame):
received.append(frame)
dispatcher.subscribe(cb)
dispatcher.on_frame("test")
assert received == ["test"]
def test_unsubscribe_not_notified():
dispatcher = FrameDispatcher()
received = []
def cb(frame):
received.append(frame)
dispatcher.subscribe(cb)
dispatcher.unsubscribe(cb)
dispatcher.on_frame("test")
assert received == []
def test_multiple_subscribers():
dispatcher = FrameDispatcher()
received_1 = []
received_2 = []
def cb1(frame):
received_1.append(frame)
def cb2(frame):
received_2.append(frame)
dispatcher.subscribe(cb1)
dispatcher.subscribe(cb2)
dispatcher.on_frame("test")
assert received_1 == ["test"]
assert received_2 == ["test"]

View File

@@ -0,0 +1,76 @@
"""Tests for FrameDispatcher."""
from __future__ import annotations
from unittest.mock import MagicMock, patch
from app.pipeline.frame_dispatcher import FrameDispatcher
def _make_frame():
frame = MagicMock()
frame.isValid.return_value = True
return frame
class TestFrameDispatcher:
def setup_method(self):
# FrameDispatcher is a QObject — needs QApplication
# Use minimal mock to avoid Qt dependency in unit tests
with patch("app.pipeline.frame_dispatcher.QObject.__init__", return_value=None):
self.dispatcher = FrameDispatcher.__new__(FrameDispatcher)
self.dispatcher._subscribers = []
self.dispatcher._frame_count = 0
self.dispatcher._last_dispatch_time = 0.0
def test_subscribe_adds_subscriber(self):
cb = MagicMock()
self.dispatcher.subscribe(cb)
assert self.dispatcher.subscriber_count() == 1
def test_subscribe_same_callback_twice_is_noop(self):
cb = MagicMock()
self.dispatcher.subscribe(cb)
self.dispatcher.subscribe(cb)
assert self.dispatcher.subscriber_count() == 1
def test_unsubscribe_removes_subscriber(self):
cb = MagicMock()
self.dispatcher.subscribe(cb)
self.dispatcher.unsubscribe(cb)
assert self.dispatcher.subscriber_count() == 0
def test_unsubscribe_nonexistent_does_not_raise(self):
cb = MagicMock()
self.dispatcher.unsubscribe(cb) # should not raise
def test_dispatch_calls_all_subscribers(self):
cb1 = MagicMock()
cb2 = MagicMock()
self.dispatcher.subscribe(cb1)
self.dispatcher.subscribe(cb2)
frame = _make_frame()
self.dispatcher.dispatch(frame)
cb1.assert_called_once_with(frame)
cb2.assert_called_once_with(frame)
def test_dispatch_increments_frame_count(self):
frame = _make_frame()
self.dispatcher.dispatch(frame)
self.dispatcher.dispatch(frame)
assert self.dispatcher.frame_count == 2
def test_dispatch_with_no_subscribers_does_not_raise(self):
frame = _make_frame()
self.dispatcher.dispatch(frame) # should not raise
def test_subscriber_exception_does_not_stop_others(self):
def bad_cb(f):
raise RuntimeError("boom")
good_cb = MagicMock()
self.dispatcher.subscribe(bad_cb)
self.dispatcher.subscribe(good_cb)
frame = _make_frame()
self.dispatcher.dispatch(frame) # should not raise
good_cb.assert_called_once_with(frame)

View File

@@ -0,0 +1,107 @@
"""Tests for TelemetryCollector."""
from __future__ import annotations
import time
from collections import deque
from unittest.mock import MagicMock, patch
class TestTelemetryCollector:
"""Test telemetry calculations in isolation (no Qt event loop required)."""
def _make_collector(self):
"""Construct a TelemetryCollector bypassing Qt machinery."""
from app.telemetry.telemetry_collector import TelemetryCollector
with patch.object(TelemetryCollector, "__init__", return_value=None):
col = TelemetryCollector.__new__(TelemetryCollector)
col._frame_times = deque(maxlen=120)
col._last_frame_time = 0.0
col._total_frames = 0
col._dropped_frames = 0
col._fps_window = deque()
col._fps_window_size_s = 1.0
col._process = MagicMock()
col._process.memory_info.return_value.rss = 50 * 1024 * 1024 # 50 MB
return col
def test_initial_snapshot_has_zero_fps(self):
col = self._make_collector()
snap = col._compute_snapshot()
assert snap.fps == 0.0
def test_fps_counts_frames_in_window(self):
col = self._make_collector()
now = time.perf_counter()
# Simulate 30 frames within the last second
for i in range(30):
col._fps_window.append(now - 0.9 + i * 0.03)
snap = col._compute_snapshot()
assert snap.fps == 30.0
def test_fps_excludes_old_frames(self):
col = self._make_collector()
now = time.perf_counter()
# 10 frames older than 1 second — should not count
for i in range(10):
col._fps_window.append(now - 2.0 + i * 0.05)
# 5 frames within the last second
for i in range(5):
col._fps_window.append(now - 0.4 + i * 0.05)
snap = col._compute_snapshot()
assert snap.fps == 5.0
def test_frame_time_average(self):
col = self._make_collector()
# 10 frames at 33.3 ms each
interval = 0.0333
for _ in range(10):
col._frame_times.append(interval)
snap = col._compute_snapshot()
assert abs(snap.frame_time_ms - interval * 1000) < 0.1
def test_drop_detection(self):
col = self._make_collector()
# Seed with 10 normal frames at ~16 ms
normal_interval = 0.016
now = time.perf_counter()
col._last_frame_time = now - normal_interval * 10
for _ in range(9):
col._last_frame_time += normal_interval
col._frame_times.append(normal_interval)
# Simulate a big gap (3× normal) — should trigger drop detection
col._last_frame_time = now - normal_interval * 10 # reset base
# Manually call on_frame-like logic
with patch("app.telemetry.telemetry_collector.time") as mock_time:
# Set last_frame_time to something reasonable
col._last_frame_time = now
big_delta = normal_interval * 5 # 5× average → drop
mock_time.perf_counter.return_value = now + big_delta
# Replicate the drop detection logic
delta = big_delta
col._frame_times.append(delta)
avg = sum(col._frame_times) / len(col._frame_times)
if delta > avg * 2.5:
col._dropped_frames += 1
assert col._dropped_frames == 1
def test_reset_counters(self):
col = self._make_collector()
col._total_frames = 100
col._dropped_frames = 5
col._frame_times.append(0.016)
col._fps_window.append(time.perf_counter())
col.reset_counters()
assert col._total_frames == 0
assert col._dropped_frames == 0
assert len(col._frame_times) == 0
assert len(col._fps_window) == 0
def test_snapshot_memory_mb(self):
col = self._make_collector()
snap = col._compute_snapshot()
assert snap.memory_mb == 50.0