Compare commits

..

2 Commits

37 changed files with 943 additions and 1787 deletions

View File

@@ -1,80 +0,0 @@
"""Camera enumeration — discovers available video input devices."""
from __future__ import annotations
from dataclasses import dataclass, field
from PySide6.QtMultimedia import QCameraDevice, QMediaDevices
@dataclass
class CameraInfo:
"""Lightweight descriptor of a detected camera."""
device: QCameraDevice
name: str
id: str
formats: list[tuple[int, int, float]] = field(default_factory=list)
# formats: list of (width, height, max_fps)
def __str__(self) -> str:
return f"{self.name} [{self.id}]"
class CameraEnumerator:
"""Discovers available video input devices via QMediaDevices."""
@staticmethod
def list_cameras() -> list[CameraInfo]:
"""Return all available camera devices with their supported formats."""
devices = QMediaDevices.videoInputs()
cameras: list[CameraInfo] = []
for device in devices:
formats: list[tuple[int, int, float]] = []
for fmt in device.videoFormats():
res = fmt.resolution()
fps = fmt.maxFrameRate()
formats.append((res.width(), res.height(), fps))
# deduplicate and sort: largest resolution first, then fps descending
seen: set[tuple[int, int, float]] = set()
unique_formats: list[tuple[int, int, float]] = []
for f in sorted(formats, key=lambda x: (x[0] * x[1], x[2]), reverse=True):
if f not in seen:
seen.add(f)
unique_formats.append(f)
cameras.append(
CameraInfo(
device=device,
name=device.description(),
id=device.id().toStdString()
if hasattr(device.id(), "toStdString")
else device.id().data().decode("utf-8", errors="replace"),
formats=unique_formats,
)
)
return cameras
@staticmethod
def default_camera() -> CameraInfo | None:
"""Return the system default camera, or None if no camera is available."""
device = QMediaDevices.defaultVideoInput()
if device.isNull():
return None
cameras = CameraEnumerator.list_cameras()
# find by id match
default_id = (
device.id().toStdString()
if hasattr(device.id(), "toStdString")
else device.id().data().decode("utf-8", errors="replace")
)
for cam in cameras:
if cam.id == default_id:
return cam
# fallback: wrap directly
return cameras[0] if cameras else None

View File

@@ -1,174 +0,0 @@
"""Camera Service — manages QCamera lifecycle and frame acquisition."""
from __future__ import annotations
import logging
from PySide6.QtCore import QObject, Signal
from PySide6.QtMultimedia import (
QCamera,
QMediaCaptureSession,
QVideoFrame,
QVideoSink,
)
from app.camera.camera_enumerator import CameraInfo
from app.config import DEFAULT_FPS, DEFAULT_HEIGHT, DEFAULT_WIDTH
logger = logging.getLogger(__name__)
class CameraService(QObject):
"""
Manages camera initialisation, configuration and frame delivery.
Emits:
frame_ready(QVideoFrame) — new frame available from the camera
camera_started() — camera successfully opened and streaming
camera_stopped() — camera stopped (clean shutdown)
camera_error(str) — camera error description
"""
frame_ready = Signal(QVideoFrame)
camera_started = Signal()
camera_stopped = Signal()
camera_error = Signal(str)
def __init__(self, parent: QObject | None = None) -> None:
super().__init__(parent)
self._camera: QCamera | None = None
self._session = QMediaCaptureSession(self)
self._sink = QVideoSink(self)
self._current_info: CameraInfo | None = None
self._session.setVideoSink(self._sink)
self._sink.videoFrameChanged.connect(self._on_frame)
# ------------------------------------------------------------------
# Public API
# ------------------------------------------------------------------
def start(self, camera_info: CameraInfo) -> None:
"""Start streaming from the given camera device."""
self.stop()
self._current_info = camera_info
self._camera = QCamera(camera_info.device, self)
self._camera.errorOccurred.connect(self._on_error)
self._camera.activeChanged.connect(self._on_active_changed)
self._session.setCamera(self._camera)
self._apply_best_format(camera_info)
self._camera.start()
logger.info("Camera start requested: %s", camera_info.name)
def stop(self) -> None:
"""Stop the current camera."""
if self._camera is not None:
self._camera.stop()
self._camera.errorOccurred.disconnect()
self._camera.activeChanged.disconnect()
self._camera = None
self._current_info = None
logger.info("Camera stopped")
def reconnect(self) -> None:
"""Restart the current camera after an error or disconnect."""
if self._current_info is not None:
logger.info("Reconnecting camera: %s", self._current_info.name)
self.start(self._current_info)
else:
logger.warning("Reconnect requested but no camera was previously started")
def set_resolution(self, width: int, height: int) -> None:
"""Request a specific resolution. Effective on next start() if camera is active."""
if self._camera is None:
return
self._set_format(width, height, fps=None)
def set_fps(self, fps: float) -> None:
"""Request a specific frame rate."""
if self._camera is None or self._current_info is None:
return
# Get current resolution from active format
fmt = self._camera.cameraFormat()
res = fmt.resolution()
self._set_format(res.width(), res.height(), fps=fps)
@property
def is_active(self) -> bool:
return self._camera is not None and self._camera.isActive()
@property
def current_camera(self) -> CameraInfo | None:
return self._current_info
# ------------------------------------------------------------------
# Video output accessor for direct QVideoWidget connection
# ------------------------------------------------------------------
def video_sink(self) -> QVideoSink:
"""Return the internal QVideoSink (used by VideoRenderer)."""
return self._sink
def capture_session(self) -> QMediaCaptureSession:
"""Return the capture session (can be connected to QVideoWidget directly)."""
return self._session
# ------------------------------------------------------------------
# Private helpers
# ------------------------------------------------------------------
def _apply_best_format(self, info: CameraInfo) -> None:
"""Pick the best matching format: prefer DEFAULT_WIDTH x DEFAULT_HEIGHT at DEFAULT_FPS."""
if not info.formats:
return
self._set_format(DEFAULT_WIDTH, DEFAULT_HEIGHT, fps=float(DEFAULT_FPS))
def _set_format(self, width: int, height: int, fps: float | None) -> None:
if self._camera is None or self._current_info is None:
return
best = None
best_score = -1
for fmt in self._current_info.device.videoFormats():
res = fmt.resolution()
w, h = res.width(), res.height()
f = fmt.maxFrameRate()
res_match = int(w == width and h == height) * 1000
fps_match = int(fps is not None and abs(f - fps) < 1) * 100
area_score = -(abs(w * h - width * height))
score = res_match + fps_match + area_score
if score > best_score:
best_score = score
best = fmt
if best is not None:
self._camera.setCameraFormat(best)
res = best.resolution()
logger.info(
"Camera format set: %dx%d @ %.1f fps",
res.width(),
res.height(),
best.maxFrameRate(),
)
def _on_frame(self, frame: QVideoFrame) -> None:
if frame.isValid():
self.frame_ready.emit(frame)
def _on_error(self, error: QCamera.Error, error_string: str) -> None:
logger.error("Camera error %s: %s", error, error_string)
self.camera_error.emit(error_string)
def _on_active_changed(self, active: bool) -> None:
if active:
logger.info("Camera active: %s", self._current_info.name if self._current_info else "?")
self.camera_started.emit()
else:
logger.info("Camera inactive")
self.camera_stopped.emit()

View File

@@ -1,22 +0,0 @@
"""Application-wide constants and default settings."""
APP_NAME = "Duck Preview"
APP_VERSION = "0.1.0"
# Default camera settings
DEFAULT_FPS = 30
DEFAULT_WIDTH = 1280
DEFAULT_HEIGHT = 720
# Telemetry
TELEMETRY_UPDATE_INTERVAL_MS = 500 # how often the metrics snapshot is refreshed
# Overlay
OVERLAY_BG_COLOR = (0, 0, 0, 160) # RGBA
OVERLAY_TEXT_COLOR = (255, 255, 255, 255)
OVERLAY_FONT_SIZE = 13
OVERLAY_PADDING = 10
OVERLAY_MARGIN = 10
# Frame dispatcher
DISPATCHER_MAX_QUEUE_SIZE = 2 # max pending frames per slow subscriber before drop

View File

@@ -1,35 +0,0 @@
"""Application entry point."""
from __future__ import annotations
import logging
import sys
from PySide6.QtCore import Qt
from PySide6.QtWidgets import QApplication
from app.config import APP_NAME
from app.ui.main_window import MainWindow
def main() -> None:
# Basic logging — WARNING by default; Debug menu toggles to DEBUG
logging.basicConfig(
level=logging.WARNING,
format="%(asctime)s [%(levelname)s] %(name)s: %(message)s",
)
app = QApplication(sys.argv)
app.setApplicationName(APP_NAME)
app.setHighDpiScaleFactorRoundingPolicy(
Qt.HighDpiScaleFactorRoundingPolicy.PassThrough
)
window = MainWindow()
window.show()
sys.exit(app.exec())
if __name__ == "__main__":
main()

View File

@@ -1,54 +0,0 @@
"""IOverlayLayer — interface for pluggable overlay layers drawn on CameraView."""
from __future__ import annotations
from abc import ABC, abstractmethod
from PySide6.QtCore import QRect
from PySide6.QtGui import QPainter
class IOverlayLayer(ABC):
"""
Interface for a single overlay layer drawn over the camera frame.
Each layer receives the active QPainter and the video rect (the letterboxed
area where the camera image was drawn) so it can position elements relative
to the actual video content if needed.
To add a new overlay (e.g. YOLO bboxes):
1. Subclass IOverlayLayer.
2. Implement paint().
3. Register with CameraView.add_overlay_layer().
No Qt subclassing is required — layers are plain Python objects.
"""
@property
def name(self) -> str:
"""Human-readable identifier used in menus / debug output."""
return type(self).__name__
@property
def visible(self) -> bool:
"""Whether this layer should be drawn."""
return self._visible
@visible.setter
def visible(self, value: bool) -> None:
self._visible = value
def __init__(self) -> None:
self._visible: bool = True
@abstractmethod
def paint(self, painter: QPainter, video_rect: QRect) -> None:
"""
Draw this layer.
Args:
painter: Active QPainter on the CameraView widget.
Caller saves/restores painter state around each layer.
video_rect: The QRect where the camera image was drawn (letterboxed).
Use this to position overlays relative to the video image.
"""

View File

@@ -1,94 +0,0 @@
"""TelemetryOverlay — draws the performance metrics box on the camera view."""
from __future__ import annotations
from PySide6.QtCore import QRect, Qt, Slot
from PySide6.QtGui import QColor, QFont, QPainter, QPen
from app.config import (
OVERLAY_BG_COLOR,
OVERLAY_FONT_SIZE,
OVERLAY_MARGIN,
OVERLAY_PADDING,
OVERLAY_TEXT_COLOR,
)
from app.overlay.overlay_layer import IOverlayLayer
from app.telemetry.telemetry_collector import TelemetrySnapshot
class TelemetryOverlay(IOverlayLayer):
"""
Renders a semi-transparent metrics box in the top-left corner.
Usage:
overlay = TelemetryOverlay()
camera_view.add_overlay_layer(overlay)
telemetry_collector.metrics_updated.connect(overlay.on_metrics_updated)
"""
def __init__(self) -> None:
super().__init__()
self._snapshot: TelemetrySnapshot | None = None
self._font = QFont("Monospace")
self._font.setStyleHint(QFont.StyleHint.TypeWriter)
self._font.setPointSize(OVERLAY_FONT_SIZE)
self._font.setBold(False)
@Slot(object)
def on_metrics_updated(self, snapshot: TelemetrySnapshot) -> None:
"""Receive a new snapshot from TelemetryCollector."""
self._snapshot = snapshot
# ------------------------------------------------------------------
# IOverlayLayer
# ------------------------------------------------------------------
def paint(self, painter: QPainter, video_rect: QRect) -> None:
if self._snapshot is None:
return
lines = self._format_lines(self._snapshot)
if not lines:
return
painter.setFont(self._font)
fm = painter.fontMetrics()
line_height = fm.height()
max_width = max(fm.horizontalAdvance(line) for line in lines)
box_w = max_width + OVERLAY_PADDING * 2
box_h = line_height * len(lines) + OVERLAY_PADDING * 2
# Position relative to the actual video area, not the full widget
x = video_rect.left() + OVERLAY_MARGIN
y = video_rect.top() + OVERLAY_MARGIN
# Background
painter.setBrush(QColor(*OVERLAY_BG_COLOR))
painter.setPen(Qt.PenStyle.NoPen)
painter.drawRoundedRect(QRect(x, y, box_w, box_h), 6, 6)
# Text
painter.setPen(QPen(QColor(*OVERLAY_TEXT_COLOR)))
text_x = x + OVERLAY_PADDING
text_y = y + OVERLAY_PADDING + fm.ascent()
for line in lines:
painter.drawText(text_x, text_y, line)
text_y += line_height
# ------------------------------------------------------------------
# Private
# ------------------------------------------------------------------
@staticmethod
def _format_lines(snap: TelemetrySnapshot) -> list[str]:
lines = [
f"FPS {snap.fps:>6.1f}",
f"Frame {snap.frame_time_ms:>6.1f} ms",
f"Drop {snap.dropped_frames:>6d}",
f"CPU {snap.cpu_percent:>5.1f} %",
]
if snap.memory_mb is not None:
lines.append(f"Mem {snap.memory_mb:>5.0f} MB")
return lines

View File

@@ -1,111 +0,0 @@
"""Frame Dispatcher — distributes QVideoFrames to registered subscribers."""
from __future__ import annotations
import logging
import time
from collections.abc import Callable
from dataclasses import dataclass, field
from PySide6.QtCore import QObject, Slot
from PySide6.QtMultimedia import QVideoFrame
logger = logging.getLogger(__name__)
FrameCallback = Callable[[QVideoFrame], None]
@dataclass
class _Subscriber:
callback: FrameCallback
drop_if_busy: bool = True
_busy: bool = field(default=False, init=False, repr=False)
class FrameDispatcher(QObject):
"""
Receives frames from CameraService and fans them out to all subscribers.
Each subscriber is a callable (QVideoFrame) -> None.
Subscribers that set drop_if_busy=True will skip a frame if they are still
processing the previous one (non-blocking). Subscribers with drop_if_busy=False
always receive every frame.
All dispatch happens in the GUI thread (Qt signal/slot), so subscribers
must NOT perform heavy work directly — they should queue to a worker thread
if processing is needed.
"""
def __init__(self, parent: QObject | None = None) -> None:
super().__init__(parent)
self._subscribers: list[_Subscriber] = []
self._frame_count: int = 0
self._last_dispatch_time: float = 0.0
# ------------------------------------------------------------------
# Subscription API
# ------------------------------------------------------------------
def subscribe(self, callback: FrameCallback, *, drop_if_busy: bool = True) -> None:
"""Register a frame callback.
Args:
callback: Callable that receives QVideoFrame.
drop_if_busy: When True, frame is skipped if subscriber is still
marked busy from last call (default True).
"""
for sub in self._subscribers:
if sub.callback is callback:
logger.warning("Subscriber %r already registered", callback)
return
self._subscribers.append(_Subscriber(callback=callback, drop_if_busy=drop_if_busy))
logger.debug("Subscriber added: %r (drop_if_busy=%s)", callback, drop_if_busy)
def unsubscribe(self, callback: FrameCallback) -> None:
"""Remove a previously registered callback."""
before = len(self._subscribers)
self._subscribers = [s for s in self._subscribers if s.callback is not callback]
if len(self._subscribers) < before:
logger.debug("Subscriber removed: %r", callback)
else:
logger.warning("Subscriber not found for removal: %r", callback)
def subscriber_count(self) -> int:
return len(self._subscribers)
# ------------------------------------------------------------------
# Frame intake — connect CameraService.frame_ready to this slot
# ------------------------------------------------------------------
@Slot(QVideoFrame)
def dispatch(self, frame: QVideoFrame) -> None:
"""Distribute the frame to all registered subscribers."""
self._frame_count += 1
now = time.perf_counter()
self._last_dispatch_time = now
for sub in self._subscribers:
if sub.drop_if_busy and sub._busy:
logger.debug("Dropping frame for busy subscriber %r", sub.callback)
continue
sub._busy = True
try:
sub.callback(frame)
except Exception:
logger.exception("Error in frame subscriber %r", sub.callback)
finally:
sub._busy = False
# ------------------------------------------------------------------
# Stats
# ------------------------------------------------------------------
@property
def frame_count(self) -> int:
return self._frame_count
@property
def last_dispatch_time(self) -> float:
"""perf_counter timestamp of the last dispatched frame."""
return self._last_dispatch_time

View File

@@ -1,176 +0,0 @@
"""Telemetry Collector — measures video pipeline performance metrics."""
from __future__ import annotations
import time
from collections import deque
from dataclasses import dataclass
import psutil
from PySide6.QtCore import QObject, QTimer, Signal
from PySide6.QtMultimedia import QVideoFrame
from app.config import TELEMETRY_UPDATE_INTERVAL_MS
@dataclass
class TelemetrySnapshot:
"""Immutable snapshot of current performance metrics."""
fps: float
frame_time_ms: float # average inter-frame time in ms
dropped_frames: int # cumulative dropped frames detected
cpu_percent: float # this process CPU usage (0100, all cores)
memory_mb: float | None # process private working set in MB
timestamp: float # time.perf_counter() when snapshot was taken
class TelemetryCollector(QObject):
"""
Frame subscriber that measures pipeline performance.
Connect to FrameDispatcher:
dispatcher.subscribe(collector.on_frame, drop_if_busy=False)
Listen to metrics updates:
collector.metrics_updated.connect(my_slot)
"""
metrics_updated = Signal(object) # emits TelemetrySnapshot
def __init__(
self,
update_interval_ms: int = TELEMETRY_UPDATE_INTERVAL_MS,
parent: QObject | None = None,
) -> None:
super().__init__(parent)
self._update_interval_ms = update_interval_ms
# frame timing ring-buffer (last 120 samples)
self._frame_times: deque[float] = deque(maxlen=120)
self._last_frame_time: float = 0.0
self._total_frames: int = 0
self._dropped_frames: int = 0
# FPS window — count frames in the last second
self._fps_window: deque[float] = deque() # timestamps of recent frames
self._fps_window_size_s: float = 1.0
# psutil process reference — call cpu_percent once to initialise the baseline
self._process = psutil.Process()
self._process.cpu_percent() # first call always returns 0.0; discard it
# periodic snapshot timer
self._timer = QTimer(self)
self._timer.setInterval(update_interval_ms)
self._timer.timeout.connect(self._emit_snapshot)
self._timer.start()
# latest snapshot (available synchronously)
self._latest: TelemetrySnapshot = self._make_empty_snapshot()
# ------------------------------------------------------------------
# Frame subscriber callback
# ------------------------------------------------------------------
def on_frame(self, frame: QVideoFrame) -> None:
"""Called by FrameDispatcher for every frame. Must be fast."""
now = time.perf_counter()
# inter-frame time
if self._last_frame_time > 0:
delta = now - self._last_frame_time
self._frame_times.append(delta)
# drop detection: if delta > 2.5× the rolling average, count as drop
if len(self._frame_times) >= 5:
avg = sum(self._frame_times) / len(self._frame_times)
if delta > avg * 2.5:
self._dropped_frames += 1
self._last_frame_time = now
self._total_frames += 1
# FPS window
self._fps_window.append(now)
# prune old entries
cutoff = now - self._fps_window_size_s
while self._fps_window and self._fps_window[0] < cutoff:
self._fps_window.popleft()
# ------------------------------------------------------------------
# Snapshot
# ------------------------------------------------------------------
def latest_snapshot(self) -> TelemetrySnapshot:
"""Return the most recently computed snapshot."""
return self._latest
def reset_counters(self) -> None:
"""Reset cumulative counters (e.g. after camera switch)."""
self._frame_times.clear()
self._fps_window.clear()
self._last_frame_time = 0.0
self._total_frames = 0
self._dropped_frames = 0
# ------------------------------------------------------------------
# Internal
# ------------------------------------------------------------------
def _emit_snapshot(self) -> None:
snapshot = self._compute_snapshot()
self._latest = snapshot
self.metrics_updated.emit(snapshot)
def _compute_snapshot(self) -> TelemetrySnapshot:
now = time.perf_counter()
# FPS — prune stale entries before counting
cutoff = now - self._fps_window_size_s
while self._fps_window and self._fps_window[0] < cutoff:
self._fps_window.popleft()
fps = float(len(self._fps_window)) # frames in the last second
# average frame time
if self._frame_times:
avg_frame_time_ms = (sum(self._frame_times) / len(self._frame_times)) * 1000.0
else:
avg_frame_time_ms = 0.0
# CPU — this process only, cumulative since last call (non-blocking)
try:
cpu = self._process.cpu_percent()
except Exception:
cpu = 0.0
# Memory — private working set (Windows) or RSS (macOS/Linux)
# This excludes shared DLLs/frameworks and matches Task Manager "Private"
try:
mem_info = self._process.memory_info()
# wset = Windows Working Set (private); rss on macOS/Linux
mem_bytes = getattr(mem_info, "wset", None) or mem_info.rss
mem_mb = mem_bytes / (1024 * 1024)
except Exception:
mem_mb = None
return TelemetrySnapshot(
fps=round(fps, 1),
frame_time_ms=round(avg_frame_time_ms, 2),
dropped_frames=self._dropped_frames,
cpu_percent=round(cpu, 1),
memory_mb=round(mem_mb, 1) if mem_mb is not None else None,
timestamp=now,
)
@staticmethod
def _make_empty_snapshot() -> TelemetrySnapshot:
return TelemetrySnapshot(
fps=0.0,
frame_time_ms=0.0,
dropped_frames=0,
cpu_percent=0.0,
memory_mb=None,
timestamp=time.perf_counter(),
)

View File

View File

@@ -1,144 +0,0 @@
"""CameraView — QWidget that renders camera frames and composites overlay layers.
Responsibilities:
- Receive QVideoFrame, convert to QImage, schedule repaint.
- In paintEvent: draw the frame (letterboxed) then call paint() on each
registered IOverlayLayer in order.
- Know nothing about what specific overlays draw — that is entirely up to
each IOverlayLayer implementation.
Adding a new overlay (e.g. YOLO bboxes):
layer = YoloBboxOverlay()
camera_view.add_overlay_layer(layer)
yolo_processor.results_ready.connect(layer.on_results)
No modification to CameraView is needed.
"""
from __future__ import annotations
import logging
from PySide6.QtCore import QRect, Qt, Slot
from PySide6.QtGui import QImage, QPainter
from PySide6.QtMultimedia import QVideoFrame
from PySide6.QtWidgets import QWidget
from app.overlay.overlay_layer import IOverlayLayer
logger = logging.getLogger(__name__)
class CameraView(QWidget):
"""
Camera preview widget.
Frame pipeline:
CameraService.frame_ready(QVideoFrame)
→ CameraView.on_frame() — convert to QImage, schedule update()
→ paintEvent() — draw image + all overlay layers
"""
def __init__(self, parent: QWidget | None = None) -> None:
super().__init__(parent)
self.setStyleSheet("background: black;")
self._current_image: QImage | None = None
self._video_rect: QRect = QRect()
self._overlay_layers: list[IOverlayLayer] = []
# ------------------------------------------------------------------
# Overlay layer registry
# ------------------------------------------------------------------
def add_overlay_layer(self, layer: IOverlayLayer) -> None:
"""Register an overlay layer. Layers are painted in registration order."""
if layer not in self._overlay_layers:
self._overlay_layers.append(layer)
logger.debug("Overlay layer added: %s", layer.name)
def remove_overlay_layer(self, layer: IOverlayLayer) -> None:
"""Unregister an overlay layer."""
try:
self._overlay_layers.remove(layer)
logger.debug("Overlay layer removed: %s", layer.name)
except ValueError:
pass
def set_all_overlays_visible(self, visible: bool) -> None:
"""Show or hide all registered overlay layers at once."""
for layer in self._overlay_layers:
layer.visible = visible
self.update()
# ------------------------------------------------------------------
# Frame input
# ------------------------------------------------------------------
@Slot(QVideoFrame)
def on_frame(self, frame: QVideoFrame) -> None:
"""Receive a camera frame, convert to QImage, schedule repaint."""
if not frame.isValid():
return
image = frame.toImage()
if image.isNull():
return
# Convert to Format_RGB32 — QPainter's fastest path on all platforms
if image.format() != QImage.Format.Format_RGB32:
image = image.convertToFormat(QImage.Format.Format_RGB32)
self._current_image = image
self.update()
# ------------------------------------------------------------------
# Qt paint
# ------------------------------------------------------------------
def paintEvent(self, event) -> None: # noqa: N802
painter = QPainter(self)
# 1. Background
painter.fillRect(self.rect(), Qt.GlobalColor.black)
# 2. Camera frame — letterboxed
if self._current_image is not None:
img = self._current_image
self._video_rect = _letterbox_rect(
img.width(), img.height(), self.width(), self.height()
)
painter.drawImage(self._video_rect, img)
else:
self._video_rect = self.rect()
# 3. Overlay layers — each gets a clean painter state
for layer in self._overlay_layers:
if not layer.visible:
continue
painter.save()
try:
layer.paint(painter, self._video_rect)
except Exception:
logger.exception("Error painting overlay layer %s", layer.name)
finally:
painter.restore()
painter.end()
# ------------------------------------------------------------------
# Helpers
# ------------------------------------------------------------------
def _letterbox_rect(img_w: int, img_h: int, widget_w: int, widget_h: int) -> QRect:
"""Return the largest rect that fits the image while preserving aspect ratio."""
if img_w <= 0 or img_h <= 0:
return QRect(0, 0, widget_w, widget_h)
scale = min(widget_w / img_w, widget_h / img_h)
draw_w = int(img_w * scale)
draw_h = int(img_h * scale)
x = (widget_w - draw_w) // 2
y = (widget_h - draw_h) // 2
return QRect(x, y, draw_w, draw_h)

View File

@@ -1,171 +0,0 @@
"""Main application window."""
from __future__ import annotations
import logging
from PySide6.QtCore import QTimer
from PySide6.QtWidgets import QLabel, QMainWindow, QSizePolicy, QStatusBar
from app.camera.camera_enumerator import CameraEnumerator, CameraInfo
from app.camera.camera_service import CameraService
from app.config import APP_NAME, APP_VERSION
from app.overlay.telemetry_overlay import TelemetryOverlay
from app.pipeline.frame_dispatcher import FrameDispatcher
from app.telemetry.telemetry_collector import TelemetryCollector
from app.ui.camera_view import CameraView
from app.ui.menu_bar import AppMenuBar
logger = logging.getLogger(__name__)
class MainWindow(QMainWindow):
"""
Top-level application window.
Rendering architecture:
QVideoWidget is intentionally NOT used — on Windows its native HWND
surface occludes all sibling/child QWidgets regardless of z-order.
CameraView is a plain QWidget that renders frames and overlay layers
in a single paintEvent pass.
Signal flow:
CameraService.frame_ready
→ FrameDispatcher.dispatch
→ CameraView.on_frame (render frame)
→ TelemetryCollector.on_frame (measure metrics)
→ TelemetryOverlay.on_metrics_updated (feed overlay data)
(CameraView repaints and calls TelemetryOverlay.paint())
"""
def __init__(self) -> None:
super().__init__()
self.setWindowTitle(f"{APP_NAME} v{APP_VERSION}")
self.setMinimumSize(640, 480)
self.resize(1280, 720)
# --- Core pipeline components ---
self._camera_service = CameraService(self)
self._dispatcher = FrameDispatcher(self)
self._telemetry = TelemetryCollector(parent=self)
# --- Camera view (central widget) ---
self._camera_view = CameraView(self)
self._camera_view.setSizePolicy(
QSizePolicy.Policy.Expanding, QSizePolicy.Policy.Expanding
)
self.setCentralWidget(self._camera_view)
# --- Overlay layers ---
self._telemetry_overlay = TelemetryOverlay()
self._camera_view.add_overlay_layer(self._telemetry_overlay)
# --- Menu bar ---
self._menu = AppMenuBar(self)
self.setMenuBar(self._menu)
# --- Status bar ---
self._status_bar = QStatusBar(self)
self.setStatusBar(self._status_bar)
self._status_label = QLabel("Initialising…")
self._status_bar.addWidget(self._status_label)
# --- Wire signals ---
self._wire_signals()
# --- Enumerate cameras and start ---
QTimer.singleShot(0, self._initialise_cameras)
# ------------------------------------------------------------------
# Initialisation
# ------------------------------------------------------------------
def _initialise_cameras(self) -> None:
cameras = CameraEnumerator.list_cameras()
if not cameras:
self._status_label.setText("No cameras found")
logger.warning("No cameras detected")
return
self._menu.populate_cameras(cameras)
default = CameraEnumerator.default_camera()
start_cam = default if default is not None else cameras[0]
self._menu.populate_formats(start_cam)
self._start_camera(start_cam)
def _start_camera(self, cam: CameraInfo) -> None:
self._telemetry.reset_counters()
self._camera_service.start(cam)
self._menu.set_active_camera(cam)
self._status_label.setText(f"Opening: {cam.name}")
# ------------------------------------------------------------------
# Signal wiring
# ------------------------------------------------------------------
def _wire_signals(self) -> None:
# CameraService → FrameDispatcher
self._camera_service.frame_ready.connect(self._dispatcher.dispatch)
# FrameDispatcher → CameraView (render) — drop if busy: stay fluid
self._dispatcher.subscribe(self._camera_view.on_frame, drop_if_busy=True)
# FrameDispatcher → TelemetryCollector — never drop, count every frame
self._dispatcher.subscribe(self._telemetry.on_frame, drop_if_busy=False)
# TelemetryCollector → TelemetryOverlay (data only, no repaint trigger here)
self._telemetry.metrics_updated.connect(self._telemetry_overlay.on_metrics_updated)
# CameraService status
self._camera_service.camera_started.connect(self._on_camera_started)
self._camera_service.camera_stopped.connect(self._on_camera_stopped)
self._camera_service.camera_error.connect(self._on_camera_error)
# Menu signals
self._menu.camera_selected.connect(self._on_camera_selected)
self._menu.resolution_selected.connect(self._on_resolution_selected)
self._menu.fps_selected.connect(self._on_fps_selected)
self._menu.reconnect_requested.connect(self._camera_service.reconnect)
self._menu.overlay_toggled.connect(self._camera_view.set_all_overlays_visible)
# ------------------------------------------------------------------
# Camera status slots
# ------------------------------------------------------------------
def _on_camera_started(self) -> None:
cam = self._camera_service.current_camera
name = cam.name if cam else "Unknown"
self._status_label.setText(f"Streaming: {name}")
logger.info("Camera streaming: %s", name)
def _on_camera_stopped(self) -> None:
self._status_label.setText("Camera stopped")
def _on_camera_error(self, message: str) -> None:
self._status_label.setText(f"Error: {message}")
logger.error("Camera error: %s", message)
# ------------------------------------------------------------------
# Menu action slots
# ------------------------------------------------------------------
def _on_camera_selected(self, cam: CameraInfo) -> None:
self._start_camera(cam)
def _on_resolution_selected(self, width: int, height: int) -> None:
self._camera_service.set_resolution(width, height)
def _on_fps_selected(self, fps: float) -> None:
self._camera_service.set_fps(fps)
# ------------------------------------------------------------------
# Qt overrides
# ------------------------------------------------------------------
def closeEvent(self, event) -> None: # noqa: N802
self._camera_service.stop()
super().closeEvent(event)

View File

@@ -1,198 +0,0 @@
"""Menu bar — camera, video format, FPS and debug controls."""
from __future__ import annotations
import logging
from PySide6.QtCore import Signal
from PySide6.QtGui import QAction, QActionGroup
from PySide6.QtWidgets import QMenuBar, QWidget
from app.camera.camera_enumerator import CameraInfo
logger = logging.getLogger(__name__)
class AppMenuBar(QMenuBar):
"""
Application menu bar.
Signals:
camera_selected(CameraInfo) — user picked a camera
resolution_selected(int, int) — user picked (width, height)
fps_selected(float) — user picked a target FPS
reconnect_requested() — user hit Reconnect
overlay_toggled(bool) — overlay show/hide
log_toggled(bool) — console logging on/off
"""
camera_selected = Signal(object) # CameraInfo
resolution_selected = Signal(int, int)
fps_selected = Signal(float)
reconnect_requested = Signal()
overlay_toggled = Signal(bool)
log_toggled = Signal(bool)
def __init__(self, parent: QWidget | None = None) -> None:
super().__init__(parent)
self._camera_group: QActionGroup | None = None
self._resolution_group: QActionGroup | None = None
self._fps_group: QActionGroup | None = None
self._cameras: list[CameraInfo] = []
self._build_menus()
# ------------------------------------------------------------------
# Public API — called after camera enumeration
# ------------------------------------------------------------------
def populate_cameras(self, cameras: list[CameraInfo]) -> None:
"""Populate the Camera menu with discovered devices."""
self._cameras = cameras
menu = self._camera_menu
# Remove existing camera actions (keep Reconnect + separator)
for action in list(menu.actions()):
if action not in (self._reconnect_action, self._cam_separator):
menu.removeAction(action)
self._camera_group = QActionGroup(self)
self._camera_group.setExclusive(True)
for cam in cameras:
action = QAction(cam.name, self)
action.setCheckable(True)
action.setData(cam)
self._camera_group.addAction(action)
menu.insertAction(self._cam_separator, action)
action.triggered.connect(self._on_camera_action)
if cameras:
first = self._camera_group.actions()[0]
first.setChecked(True)
def populate_formats(self, camera_info: CameraInfo) -> None:
"""Populate Resolution and FPS menus based on a camera's supported formats."""
self._populate_resolutions(camera_info)
self._populate_fps(camera_info)
def set_active_camera(self, camera_info: CameraInfo) -> None:
"""Check the menu item matching camera_info."""
if self._camera_group is None:
return
for action in self._camera_group.actions():
if action.data() is camera_info:
action.setChecked(True)
return
# ------------------------------------------------------------------
# Menu construction
# ------------------------------------------------------------------
def _build_menus(self) -> None:
# --- Camera menu ---
self._camera_menu = self.addMenu("Camera")
self._cam_separator = self._camera_menu.addSeparator()
self._reconnect_action = QAction("Reconnect", self)
self._reconnect_action.triggered.connect(self.reconnect_requested)
self._camera_menu.addAction(self._reconnect_action)
# --- Video menu ---
self._video_menu = self.addMenu("Video")
self._res_menu = self._video_menu.addMenu("Resolution")
self._fps_menu = self._video_menu.addMenu("FPS")
# --- Debug menu ---
debug_menu = self.addMenu("Debug")
self._overlay_action = QAction("Show Overlay", self)
self._overlay_action.setCheckable(True)
self._overlay_action.setChecked(True)
self._overlay_action.toggled.connect(self.overlay_toggled)
debug_menu.addAction(self._overlay_action)
self._log_action = QAction("Console Logging", self)
self._log_action.setCheckable(True)
self._log_action.setChecked(False)
self._log_action.toggled.connect(self._on_log_toggled)
debug_menu.addAction(self._log_action)
def _populate_resolutions(self, camera_info: CameraInfo) -> None:
self._res_menu.clear()
self._resolution_group = QActionGroup(self)
self._resolution_group.setExclusive(True)
seen: set[tuple[int, int]] = set()
for w, h, _ in camera_info.formats:
key = (w, h)
if key in seen:
continue
seen.add(key)
action = QAction(f"{w} × {h}", self)
action.setCheckable(True)
action.setData((w, h))
self._resolution_group.addAction(action)
self._res_menu.addAction(action)
action.triggered.connect(self._on_resolution_action)
actions = self._resolution_group.actions()
if actions:
actions[0].setChecked(True)
def _populate_fps(self, camera_info: CameraInfo) -> None:
self._fps_menu.clear()
self._fps_group = QActionGroup(self)
self._fps_group.setExclusive(True)
seen: set[int] = set()
for _, _, fps in camera_info.formats:
key = round(fps)
if key in seen:
continue
seen.add(key)
action = QAction(f"{key} fps", self)
action.setCheckable(True)
action.setData(float(fps))
self._fps_group.addAction(action)
self._fps_menu.addAction(action)
action.triggered.connect(self._on_fps_action)
actions = self._fps_group.actions()
if actions:
actions[0].setChecked(True)
# ------------------------------------------------------------------
# Slots
# ------------------------------------------------------------------
def _on_camera_action(self) -> None:
action = self.sender()
if action is None:
return
cam: CameraInfo = action.data()
logger.debug("Camera selected: %s", cam.name)
self.camera_selected.emit(cam)
self._populate_resolutions(cam)
self._populate_fps(cam)
def _on_resolution_action(self) -> None:
action = self.sender()
if action is None:
return
w, h = action.data()
logger.debug("Resolution selected: %dx%d", w, h)
self.resolution_selected.emit(w, h)
def _on_fps_action(self) -> None:
action = self.sender()
if action is None:
return
fps: float = action.data()
logger.debug("FPS selected: %.1f", fps)
self.fps_selected.emit(fps)
def _on_log_toggled(self, enabled: bool) -> None:
level = logging.DEBUG if enabled else logging.WARNING
logging.getLogger().setLevel(level)
self.log_toggled.emit(enabled)

3
duck_preview/__main__.py Normal file
View File

@@ -0,0 +1,3 @@
from duck_preview.app import main
main()

47
duck_preview/app.py Normal file
View File

@@ -0,0 +1,47 @@
from __future__ import annotations
import sys
from PySide6.QtCore import QTimer
from PySide6.QtMultimedia import QVideoSink
from PySide6.QtWidgets import QApplication
from duck_preview.camera.service import CameraService
from duck_preview.dispatcher.frame_dispatcher import FrameDispatcher
from duck_preview.main_window import MainWindow
from duck_preview.rendering.overlay import OverlayWidget
from duck_preview.rendering.video_widget import VideoWidget
from duck_preview.telemetry.collector import TelemetryCollector
def main() -> None:
app = QApplication(sys.argv)
app.setApplicationName("Duck Preview")
camera = CameraService()
video_sink = QVideoSink()
dispatcher = FrameDispatcher()
telemetry = TelemetryCollector()
video_widget = VideoWidget()
overlay = OverlayWidget()
camera.set_video_widget(video_widget)
camera.set_video_sink(video_sink)
video_sink.videoFrameChanged.connect(dispatcher.on_frame)
dispatcher.subscribe(telemetry.on_frame)
window = MainWindow(camera, video_widget, overlay)
camera.error_occurred.connect(lambda msg: overlay.set_metrics({"error": msg}))
metrics_timer = QTimer()
metrics_timer.timeout.connect(lambda: overlay.set_metrics(telemetry.metrics()))
metrics_timer.start(200)
window.show()
sys.exit(app.exec())
if __name__ == "__main__":
main()

View File

@@ -0,0 +1,59 @@
from __future__ import annotations
from PySide6.QtCore import QObject, Signal
from PySide6.QtMultimedia import (
QCamera,
QCameraDevice,
QCameraFormat,
QMediaCaptureSession,
QMediaDevices,
QVideoSink,
)
from PySide6.QtMultimediaWidgets import QVideoWidget
class CameraService(QObject):
error_occurred = Signal(str)
def __init__(self, parent: QObject | None = None) -> None:
super().__init__(parent)
self._session = QMediaCaptureSession()
self._camera: QCamera | None = None
@property
def session(self) -> QMediaCaptureSession:
return self._session
def set_video_widget(self, widget: QVideoWidget) -> None:
self._session.setVideoOutput(widget)
def set_video_sink(self, sink: QVideoSink) -> None:
self._session.setVideoSink(sink)
@staticmethod
def available_cameras() -> list[QCameraDevice]:
return QMediaDevices.videoInputs()
def is_active(self) -> bool:
return self._camera is not None and self._camera.isActive()
def start(self, device: QCameraDevice) -> None:
self.stop()
self._camera = QCamera(device, self)
self._camera.errorOccurred.connect(self._on_error)
self._session.setCamera(self._camera)
self._camera.start()
def stop(self) -> None:
if self._camera is not None:
self._camera.stop()
self._session.setCamera(None)
self._camera.deleteLater()
self._camera = None
def set_camera_format(self, fmt: QCameraFormat) -> None:
if self._camera is not None:
self._camera.setCameraFormat(fmt)
def _on_error(self, error: QCamera.Error, error_string: str) -> None:
self.error_occurred.emit(error_string)

View File

@@ -0,0 +1,22 @@
from __future__ import annotations
from collections.abc import Callable
from PySide6.QtCore import QObject
from PySide6.QtMultimedia import QVideoFrame
class FrameDispatcher(QObject):
def __init__(self, parent: QObject | None = None) -> None:
super().__init__(parent)
self._subscribers: list[Callable[[QVideoFrame], None]] = []
def subscribe(self, callback: Callable[[QVideoFrame], None]) -> None:
self._subscribers.append(callback)
def unsubscribe(self, callback: Callable[[QVideoFrame], None]) -> None:
self._subscribers.remove(callback)
def on_frame(self, frame: QVideoFrame) -> None:
for cb in self._subscribers:
cb(frame)

139
duck_preview/main_window.py Normal file
View File

@@ -0,0 +1,139 @@
from __future__ import annotations
from PySide6.QtCore import QCameraPermission, Qt
from PySide6.QtGui import QAction, QCloseEvent
from PySide6.QtMultimedia import QCameraDevice, QMediaDevices
from PySide6.QtWidgets import QApplication, QGridLayout, QMainWindow, QWidget
from duck_preview.camera.service import CameraService
from duck_preview.rendering.overlay import OverlayWidget
from duck_preview.rendering.video_widget import VideoWidget
class MainWindow(QMainWindow):
def __init__(
self,
camera_service: CameraService,
video_widget: VideoWidget,
overlay_widget: OverlayWidget,
parent: QWidget | None = None,
) -> None:
super().__init__(parent)
self._camera = camera_service
self._video_widget = video_widget
self._overlay = overlay_widget
self._media_devices = QMediaDevices()
self.setWindowTitle("Duck Preview")
self.resize(1280, 720)
central = QWidget()
self.setCentralWidget(central)
layout = QGridLayout(central)
layout.setContentsMargins(0, 0, 0, 0)
layout.setSpacing(0)
layout.addWidget(video_widget, 0, 0)
layout.addWidget(overlay_widget, 0, 0)
self._setup_menus()
self._media_devices.videoInputsChanged.connect(self._rebuild_camera_menu)
def _setup_menus(self) -> None:
menu_bar = self.menuBar()
self._camera_menu = menu_bar.addMenu("Camera")
self._rebuild_camera_menu()
self._resolution_menu = menu_bar.addMenu("Resolution")
self._resolution_menu.setEnabled(False)
self._fps_menu = menu_bar.addMenu("FPS")
self._fps_menu.setEnabled(False)
debug_menu = menu_bar.addMenu("Debug")
toggle_overlay = QAction("Show Metrics", self)
toggle_overlay.setCheckable(True)
toggle_overlay.setChecked(True)
toggle_overlay.triggered.connect(self._overlay.set_visible)
debug_menu.addAction(toggle_overlay)
def _rebuild_camera_menu(self) -> None:
self._camera_menu.clear()
cameras = CameraService.available_cameras()
for device in cameras:
action = QAction(device.description(), self)
action.triggered.connect(lambda checked, d=device: self._on_camera_selected(d))
self._camera_menu.addAction(action)
if not cameras:
action = QAction("No cameras detected", self)
action.setEnabled(False)
self._camera_menu.addAction(action)
def _on_camera_selected(self, device: QCameraDevice) -> None:
self._request_camera_permission(device)
def _request_camera_permission(self, device: QCameraDevice) -> None:
perm = QCameraPermission()
match QApplication.checkPermission(perm):
case Qt.PermissionStatus.Undetermined:
QApplication.requestPermission(
perm, self, lambda: self._request_camera_permission(device)
)
case Qt.PermissionStatus.Denied:
self._overlay.set_metrics(
{
"error": (
"Camera permission denied.\n"
"Grant access in System Settings > "
"Privacy & Security > Camera"
)
}
)
case Qt.PermissionStatus.Granted:
self._camera.start(device)
self._rebuild_resolution_menu(device)
def _rebuild_resolution_menu(self, device: QCameraDevice) -> None:
self._resolution_menu.clear()
self._resolution_menu.setEnabled(True)
formats = device.videoFormats()
seen: set[tuple[int, int]] = set()
for fmt in formats:
res = fmt.resolution()
key = (res.width(), res.height())
if key not in seen:
seen.add(key)
action = QAction(f"{res.width()}x{res.height()}", self)
action.triggered.connect(
lambda checked, d=device, w=res.width(), h=res.height(): (
self._on_resolution_selected(d, w, h)
)
)
self._resolution_menu.addAction(action)
def _on_resolution_selected(self, device: QCameraDevice, width: int, height: int) -> None:
self._rebuild_fps_menu(device, width, height)
def _rebuild_fps_menu(self, device: QCameraDevice, width: int, height: int) -> None:
self._fps_menu.clear()
self._fps_menu.setEnabled(True)
formats = device.videoFormats()
seen_labels: set[str] = set()
for fmt in formats:
res = fmt.resolution()
if res.width() == width and res.height() == height:
min_fps = round(fmt.minFrameRate())
max_fps = round(fmt.maxFrameRate())
label = f"{max_fps} FPS" if min_fps == max_fps else f"{min_fps}-{max_fps} FPS"
if label not in seen_labels:
seen_labels.add(label)
action = QAction(label, self)
action.triggered.connect(
lambda checked, f=fmt: self._camera.set_camera_format(f)
)
self._fps_menu.addAction(action)
def closeEvent(self, event: QCloseEvent) -> None: # noqa: N802
self._camera.stop()
super().closeEvent(event)

View File

@@ -0,0 +1,73 @@
from __future__ import annotations
from PySide6.QtCore import Qt
from PySide6.QtGui import QColor, QFont, QFontMetrics, QPainter
from PySide6.QtWidgets import QWidget
class OverlayWidget(QWidget):
def __init__(self, parent: QWidget | None = None) -> None:
super().__init__(parent)
self.setAttribute(Qt.WA_TransparentForMouseEvents)
self.setAttribute(Qt.WA_NoSystemBackground)
self._visible = True
self._metrics: dict[str, float | int | str] = {}
def set_visible(self, visible: bool) -> None:
self._visible = visible
self.update()
def set_metrics(self, metrics: dict[str, float | int | str]) -> None:
self._metrics = metrics
self.update()
def paintEvent(self, event) -> None: # noqa: N802
if not self._visible or not self._metrics:
return
painter = QPainter(self)
painter.setRenderHint(QPainter.TextAntialiasing)
if "error" in self._metrics:
self._paint_error(painter)
else:
self._paint_metrics(painter)
def _paint_metrics(self, painter: QPainter) -> None:
lines = [
f"FPS: {self._metrics.get('fps', 0):>7}",
f"Frame: {self._metrics.get('frame_time_ms', 0):>7.1f} ms",
f"Frames: {self._metrics.get('frame_count', 0):>7}",
]
font = QFont("monospace", 11)
painter.setFont(font)
fm = QFontMetrics(font)
text_width = max(fm.horizontalAdvance(line) for line in lines) + 24
text_height = len(lines) * (fm.height() + 6) + 12
painter.fillRect(8, 8, text_width, text_height, QColor(0, 0, 0, 160))
painter.setPen(QColor(0, 255, 0))
for i, line in enumerate(lines):
y = 22 + i * (fm.height() + 6)
painter.drawText(16, y, line)
def _paint_error(self, painter: QPainter) -> None:
msg = str(self._metrics.get("error", "Unknown error"))
lines = msg.split("\n")
font = QFont("monospace", 12)
painter.setFont(font)
fm = QFontMetrics(font)
text_width = max(fm.horizontalAdvance(line) for line in lines) + 24
text_height = len(lines) * (fm.height() + 6) + 12
painter.fillRect(8, 8, text_width, text_height, QColor(0, 0, 0, 200))
painter.setPen(QColor(200, 50, 50))
for i, line in enumerate(lines):
y = 22 + i * (fm.height() + 6)
painter.drawText(16, y, line)

View File

@@ -0,0 +1,10 @@
from __future__ import annotations
from PySide6.QtMultimediaWidgets import QVideoWidget
from PySide6.QtWidgets import QWidget
class VideoWidget(QVideoWidget):
def __init__(self, parent: QWidget | None = None) -> None:
super().__init__(parent)
self.setMinimumSize(320, 240)

View File

@@ -0,0 +1,39 @@
from __future__ import annotations
import time
from collections import deque
from PySide6.QtCore import QObject
from PySide6.QtMultimedia import QVideoFrame
class TelemetryCollector(QObject):
def __init__(self, parent: QObject | None = None) -> None:
super().__init__(parent)
self._timestamps: deque[float] = deque(maxlen=500)
def on_frame(self, frame: QVideoFrame) -> None:
self._timestamps.append(time.perf_counter())
def metrics(self) -> dict[str, float | int]:
if not self._timestamps:
return {"fps": 0, "frame_time_ms": 0.0, "frame_count": 0}
now = time.perf_counter()
cutoff = now - 1.0
recent = [t for t in self._timestamps if t >= cutoff]
fps = len(recent)
frame_time_ms = 0.0
if len(self._timestamps) >= 2:
diffs = [
self._timestamps[i] - self._timestamps[i - 1]
for i in range(1, len(self._timestamps))
]
frame_time_ms = (sum(diffs) / len(diffs)) * 1000 if diffs else 0.0
return {
"fps": fps,
"frame_time_ms": round(frame_time_ms, 2),
"frame_count": len(self._timestamps),
}

View File

@@ -1,324 +1,150 @@
# Plan działania — MVP Camera Preview (PySide6)
# MVP Implementation Plan — Duck Preview
## Środowisko
## Stack
| Element | Wartość |
|---|---|
| Python | 3.12.10 (venv: `.venv-win`) |
| Framework GUI | PySide6 6.11.0 |
| Dev platform | Windows 11 |
| Target platform | Mac Mini (Intel i7, macOS Ventura) |
| Kamera docelowa | ELP USB Camera |
| Narzędzia | pytest, ruff, colorama |
- **Language:** Python 3.12
- **GUI/Framework:** PySide6 6.11 (QtMultimedia + QtWidgets)
- **Camera backend:** QCamera + QMediaCaptureSession + QVideoSink (native GPU)
- **Rendering:** QWidget (paintEvent) z QPainter — manual render z QVideoFrame → QImage
- **Testing:** pytest
- **Linting/Formatting:** ruff
## Architecture
```
CameraService
└─ QVideoSink.videoFrame ──→ FrameDispatcher.on_frame
├─ VideoWidget.on_frame (render klatki)
├─ TelemetryCollector.on_frame (timestamp)
└─ [Future AI subscribers]
TelemetryCollector.metrics() ──→ OverlayWidget.set_metrics()
(polled co 200ms przez QTimer w app.py)
```
## Kolejność implementacji
### 0. Project scaffolding
`pyproject.toml` — projekt, zależność PySide6, entry point, ruff config, pytest
`duck_preview/__init__.py`, `__main__.py` — entry point `python -m duck_preview`
Podkatalogi: `camera/`, `dispatcher/`, `rendering/`, `telemetry/`
### 1. CameraService (`camera/service.py`)
- Wrapper na `QCamera` + `QMediaCaptureSession` + `QVideoSink`
- `available_cameras()` — static, zwraca listę `QCameraDevice` z `QMediaDevices`
- `start(device)` / `stop()` — zarządzanie cyklem życia kamery
- `set_camera_format(fmt)` — zmiana rozdzielczości/FPS
- `sink` property — QVideoSink do podpięcia dispatchera
- `error_occurred` Signal — propagacja błędów kamery
### 2. FrameDispatcher (`dispatcher/frame_dispatcher.py`)
- Prosty pub/sub: `subscribe(cb)` / `unsubscribe(cb)`
- `on_frame(frame)` — wołany z sygnału QVideoSink, iteruje wszystkich subskrybentów
- Frame dropping: na razie brak (wszystkie callbacki szybkie)
### 3. TelemetryCollector (`telemetry/collector.py`)
- Subskrybent dispatchera
- Zbiera timestampy (`deque`, maxlen=500)
- `metrics()` → dict: fps (klatki z ostatniej sekundy), frame_time_ms (średnia delta między klatkami), frame_count
- Brak CPU usage (out of scope na MVP)
### 4. VideoWidget (`rendering/video_widget.py`)
- `QWidget` z `WA_OpaquePaintEvent`
- `on_frame(frame)``frame.toImage()` → zapis QImage → `update()`
- `paintEvent` — skalowanie z zachowaniem proporcji, centrowanie, letterboxing
- Brak klatek → wyświetla "No camera feed"
### 5. OverlayWidget (`rendering/overlay.py`)
- Przezroczysty QWidget (`WA_TransparentForMouseEvents`)
- Nakładka na video, rysuje tekst metryk (FPS, frame time, frame count)
- Semi-transparentne tło, zielona czcionka Consolas
- Toggle widoczności przez menu Debug
### 6. MainWindow (`main_window.py`)
- `QMainWindow` z menu barem:
- **Camera** — lista dostępnych kamer (dynamiczna, reaguje na `videoInputsChanged`)
- **Resolution** — dostępne rozdzielczości dla wybranej kamery
- **FPS** — dostępne FPS dla wybranej rozdzielczości
- **Debug** — toggle nakładki
- Central widget: GridLayout z VideoWidget + OverlayWidget (stacked)
### 7. App (`app.py`)
- `main()` — tworzy QApplication, instancjonuje i łączy zależności (ręczne DI)
- Wire: camera.sink → dispatcher.on_frame
- Wire: dispatcher → telemetry.on_frame, video_widget.on_frame
- QTimer 200ms: telemetry.metrics() → overlay.set_metrics()
- `__main__.py``from duck_preview.app import main; main()`
---
## Fazy realizacji
### Faza 0 — Projekt i scaffolding
Cel: ustalenie struktury katalogów i modułów przed napisaniem pierwszej linii logiki.
#### 0.1 Struktura projektu
## DI Wiring (ręczne, w app.py)
```
duck-preview2/
├── app/
app = QApplication(sys.argv)
camera = CameraService()
dispatcher = FrameDispatcher()
telemetry = TelemetryCollector()
video_widget = VideoWidget()
overlay = OverlayWidget()
camera.sink.videoFrame.connect(dispatcher.on_frame)
dispatcher.subscribe(telemetry.on_frame)
dispatcher.subscribe(video_widget.on_frame)
window = MainWindow(camera, video_widget, overlay)
# Poll telemetry co 200ms → overlay
metrics_timer = QTimer()
metrics_timer.timeout.connect(lambda: overlay.set_metrics(telemetry.metrics()))
metrics_timer.start(200)
window.show()
app.exec()
```
---
## Struktura plików
```
duck-preview/
├── pyproject.toml
├── notes/
│ ├── 01-mvp-preview.md
│ └── 01-mvp-plan.md
├── duck_preview/
│ ├── __init__.py
│ ├── main.py # entry point
│ ├── config.py # stałe, domyślne ustawienia
│ ├── __main__.py
│ ├── app.py
│ ├── main_window.py
│ ├── camera/
│ │ ├── __init__.py
│ │ ── camera_service.py # QCamera + QMediaCaptureSession
│ └── camera_enumerator.py # wykrywanie dostępnych kamer
│ ├── pipeline/
│ │ ── service.py
├── dispatcher/
│ │ ├── __init__.py
│ │ └── frame_dispatcher.py # dystrybucja klatek do subskrybentów
│ ├── telemetry/
│ │ └── frame_dispatcher.py
│ ├── rendering/
│ │ ├── __init__.py
│ │ ── telemetry_collector.py # zbieranie metryk FPS/frame time/CPU
── overlay/
│ ├── __init__.py
│ │ └── overlay_widget.py # przezroczysta warstwa QWidget
│ └── ui/
│ │ ── video_widget.py
│ └── overlay.py
└── telemetry/
│ ├── __init__.py
── main_window.py # główne okno aplikacji
│ └── menu_bar.py # menu: kamera, rozdzielczość, FPS, debug
├── tests/
├── __init__.py
── test_camera_enumerator.py
│ └── test_telemetry_collector.py
├── notes/
├── requirements.txt
├── requirements-dev.txt
└── pyproject.toml # konfiguracja ruff + pytest
```
#### 0.2 Pliki konfiguracyjne
- `pyproject.toml` — konfiguracja ruff (linter/formatter) i pytest
- `requirements.txt` — zależności produkcyjne (PySide6)
- `requirements-dev.txt` — zależności deweloperskie (pytest, ruff)
- `.gitignore` — aktualizacja o artefakty Pythona
---
### Faza 1 — Camera Service
Cel: stabilne pobranie obrazu z kamery przez QtMultimedia.
#### 1.1 Camera Enumerator
- `QMediaDevices.videoInputs()` — lista dostępnych kamer
- Zwraca listę `QCameraDevice` z nazwą, id i obsługiwanymi formatami
- Obsługa braku kamer (komunikat, nie crash)
- Test jednostkowy: mockowanie `QMediaDevices`
#### 1.2 Camera Service
- Opakowuje `QCamera` + `QMediaCaptureSession`
- API:
- `start(device: QCameraDevice)` — uruchamia kamerę
- `stop()` — zatrzymuje kamerę
- `set_resolution(width, height)` — ustawia format
- `set_fps(fps)` — ustawia docelowy FPS
- `reconnect()` — restart po błędzie
- `QVideoSink` jako punkt odbioru klatek
- Sygnał `frame_ready(QVideoFrame)` do Frame Dispatcher
- Obsługa błędów kamery (`QCamera.errorOccurred`)
#### 1.3 Uwagi platformowe
| Aspekt | Windows 11 (dev) | macOS Ventura (target) |
|---|---|---|
| Backend | DirectShow / Media Foundation | AVFoundation |
| Kamera ELP | USB, standardowy UVC driver | USB, UVC |
| Format klatek | YUYV / MJPEG | YUYV / MJPEG |
| GPU rendering | ANGLE (OpenGL ES) | Metal |
---
### Faza 2 — Frame Dispatcher
Cel: dystrybucja klatek do wielu odbiorców bez blokowania akwizycji.
#### 2.1 Frame Dispatcher
- Wzorzec: publish-subscribe (lista callbacków)
- `subscribe(callback: Callable[[QVideoFrame], None])`
- `unsubscribe(callback)`
- `dispatch(frame: QVideoFrame)` — wywołuje wszystkich subskrybentów
- Klatki NIE są kopiowane — subskrybenci działają na referencji
- Subskrybenci mogą **pominąć klatkę** (tryb drop-if-busy)
- Wywołanie `dispatch` następuje w wątku GUI (slot połączony z `frame_ready`)
#### 2.2 Subskrybenci w Fazie 1
| Subskrybent | Działanie |
|---|---|
| Video Renderer | przekazuje klatkę do `QVideoSink` / `QVideoWidget` |
| Telemetry Collector | mierzy czas, zlicza klatki |
---
### Faza 3 — Video Renderer
Cel: renderowanie klatki w GUI bez zbędnych kopii.
#### 3.1 Podejście
- `QVideoWidget` jako główny widget podglądu
- `QMediaCaptureSession.setVideoOutput(QVideoWidget)` — ścieżka bezpośrednia, zero kopii
- Alternatywnie: `QVideoSink``QGraphicsVideoItem` dla przyszłych overlayów
- Domyślnie: `QVideoWidget` (prosta, niska latencja)
#### 3.2 Wymagania
- Preview nie blokuje wątku GUI
- Obsługa aspect ratio (letter/pillarbox)
- Resize okna bez migotania
---
### Faza 4 — Telemetry Collector
Cel: dokładne metryki pipeline'u wideo.
#### 4.1 Zbierane metryki
| Metryka | Metoda pomiaru |
|---|---|
| Realtime FPS | licznik klatek / okno 1 s |
| Frame time | `time.perf_counter()` między klatkami |
| Frame acquisition time | timestamp wejście frame_ready → dispatch |
| Rendering time | czas `QVideoWidget.update()` (opcjonalnie) |
| Dropped frames | detekcja przez numerację lub timestamp gap |
| CPU usage | `psutil.cpu_percent()` (dodać do requirements) |
| Memory usage | `psutil.virtual_memory()` (opcjonalnie) |
#### 4.2 API
- `TelemetryCollector` — subskrybent Frame Dispatcher
- `on_frame(frame: QVideoFrame)` — rejestruje timestamp klatki
- `get_snapshot() -> TelemetrySnapshot` — aktualny stan metryk (dataclass)
- `update_interval_ms: int` — jak często odświeżać snapshot (domyślnie 500 ms)
- Sygnał `metrics_updated(TelemetrySnapshot)` — emitowany co `update_interval_ms`
#### 4.3 TelemetrySnapshot (dataclass)
```python
@dataclass
class TelemetrySnapshot:
fps: float
frame_time_ms: float
dropped_frames: int
cpu_percent: float
memory_mb: float | None
timestamp: float
── collector.py
└── tests/
├── __init__.py
├── test_dispatcher.py
── test_collector.py
```
---
### Faza 5 — Overlay System
## Edge cases / uwagi
Cel: wyświetlanie metryk na przezroczystej warstwie nad podglądem.
#### 5.1 Architektura
- `OverlayWidget(QWidget)` — przezroczysty widget (`WA_TransparentForMouseEvents`)
- Pozycjonowany absolutnie nad `QVideoWidget` (ten sam parent, wyższy z-index)
- `paintEvent` rysuje semi-przezroczysty prostokąt + tekst z metrykami
- Połączony z sygnałem `metrics_updated` — odświeża tylko gdy dane się zmienią
#### 5.2 Zawartość overlaya (MVP)
```
FPS: 60.0
Frame: 16.7 ms
Drop: 0
CPU: 12.3 %
```
#### 5.3 Sterowalność
- Widoczność overlaya: toggle przez menu Debug
- Pozycja: lewy górny róg (stała w MVP)
- Kolor tła: `rgba(0, 0, 0, 160)`
---
### Faza 6 — GUI / Main Window
Cel: minimalne, funkcjonalne okno aplikacji.
#### 6.1 MainWindow
- `QMainWindow` z `QVideoWidget` jako central widget
- `OverlayWidget` nałożony na video
- Obsługa resize → reposition overlay
- Tytuł okna: `Duck Preview`
#### 6.2 MenuBar
Menu **Camera**:
- Lista wykrytych kamer (radio-style)
- Separator
- Reconnect
Menu **Video**:
- Resolution submenu (pobierane dynamicznie z `QCameraDevice.videoFormats()`)
- FPS submenu
Menu **Debug**:
- Toggle overlay metryk
- Logowanie do konsoli (toggle)
#### 6.3 Startup flow
```
main.py
→ QApplication
→ CameraEnumerator.list_cameras()
→ MainWindow(cameras)
→ CameraService.start(cameras[0]) # pierwsza kamera lub ELP
→ FrameDispatcher.subscribe(telemetry, renderer)
→ app.exec()
```
---
### Faza 7 — Testy i walidacja
#### 7.1 Testy jednostkowe
| Moduł | Co testować |
|---|---|
| `CameraEnumerator` | lista kamer, brak kamer, format danych |
| `TelemetryCollector` | obliczenia FPS, wykrywanie dropów |
| `FrameDispatcher` | subskrypcja, odsubskrypcja, dispatch |
| `TelemetrySnapshot` | poprawność dataclass |
#### 7.2 Testy manualne (Windows dev)
- [ ] Uruchomienie z kamerą laptopa / USB webcam
- [ ] Przełączanie kamer
- [ ] Zmiana rozdzielczości
- [ ] Zmiana FPS
- [ ] Toggle overlay
- [ ] Reconnect po odłączeniu kamery
#### 7.3 Testy na Mac Mini (target)
- [ ] Wykrycie kamery ELP
- [ ] Poprawny format YUYV/MJPEG
- [ ] Wydajność AVFoundation vs DirectShow
- [ ] GPU rendering przez Metal
#### 7.4 Kryteria sukcesu (z PRD)
- Preview stabilny i płynny
- Latencja renderowania niska
- Dane telemetrii dokładne
- GUI responsywne
- Overlay działa poprawnie
- Architektura gotowa na subskrybentów AI
---
## Kolejność implementacji (sprint order)
```
Sprint 1: Faza 0 — scaffolding, pyproject.toml, requirements
Sprint 2: Faza 1 — CameraEnumerator + CameraService (bez GUI)
Sprint 3: Faza 3 — VideoRenderer + MainWindow (preview działa)
Sprint 4: Faza 2 — FrameDispatcher (refactor pipeline)
Sprint 5: Faza 4 — TelemetryCollector
Sprint 6: Faza 5 — OverlayWidget
Sprint 7: Faza 6 — MenuBar (camera/resolution/fps switch)
Sprint 8: Faza 7 — Testy, poprawki, walidacja na Mac Mini
```
---
## Zależności do dodania
```
# requirements.txt
PySide6>=6.7
psutil>=6.0
# requirements-dev.txt
pytest>=8.0
ruff>=0.4
```
---
## Uwagi cross-platform
1. **ELP camera** — kamera UVC, powinna działać bez dodatkowych sterowników na obu platformach. Sprawdzić obsługiwane rozdzielczości i FPS przez `QCameraDevice.videoFormats()`.
2. **Ścieżki absolutne** — unikać `os.path` na korzyść `pathlib.Path`.
3. **Threading** — wszystkie operacje Qt muszą odbywać się w wątku GUI. `TelemetryCollector` może używać `QTimer` zamiast osobnego wątku.
4. **Format klatek** — na macOS AVFoundation preferuje `BGRA` lub `NV12`. Konwersja powinna być leniwa i tylko gdy potrzebna (nie w hot path renderowania).
5. **High DPI** — włączyć `QApplication.setHighDpiScaleFactorRoundingPolicy` dla konsistencji Windows/Mac.
6. **Testowanie bez kamery**`CameraEnumerator` powinien umożliwiać dependency injection / mock dla środowisk CI.
- Brak kamer → menu Camera pokazuje "No cameras detected"
- Brak klatek → VideoWidget pokazuje ciemne tło + napis
- Błąd kamery → CameraService emituje `error_occurred` (na razie tylko log)
- Zamknięcie okna → `closeEvent``camera.stop()`
- QVideoFrame.toImage() kopiuje dane — akceptowalne dla MVP
- Wszystkie obiekty żyją w main thread — brak problemów z threadingiem

View File

@@ -253,7 +253,6 @@ Architecture must support future additions:
* snapshots,
* streaming,
* remote sinks.
* play video files
Without major redesign.

212
notes/02-mvp-mac-plan.md Normal file
View File

@@ -0,0 +1,212 @@
# Plan — macOS fix + QVideoWidget + QVideoSink dual output
## Cel
Przywrócić działanie kamery na macOS (Elgato) przez:
1. Przejście z manualnego `QPainter` renderowania na natywny `QVideoWidget`
2. Dodanie `QVideoSink` jako drugiego wyjścia (frame access dla telemetrii)
3. Obsługa `QCameraPermission` (Qt 6.5+)
4. Dokumentacja packagingu przez `pyside6-deploy` na macOS
## Kolejność implementacji
### 1. camera/service.py
```python
class CameraService(QObject):
error_occurred = Signal(str)
def __init__(self, parent=None):
super().__init__(parent)
self._session = QMediaCaptureSession()
self._camera: QCamera | None = None
def set_video_widget(self, widget: QVideoWidget) -> None:
self._session.setVideoOutput(widget)
def set_video_sink(self, sink: QVideoSink) -> None:
self._session.setVideoSink(sink)
@property
def session(self) -> QMediaCaptureSession:
return self._session
@staticmethod
def available_cameras() -> list[QCameraDevice]:
return QMediaDevices.videoInputs()
def is_active(self) -> bool:
return self._camera is not None and self._camera.isActive()
def start(self, device: QCameraDevice) -> None:
self.stop()
self._camera = QCamera(device, self)
self._camera.errorOccurred.connect(self._on_error)
self._session.setCamera(self._camera)
self._camera.start()
def stop(self) -> None:
if self._camera is not None:
self._camera.stop()
self._session.setCamera(None)
self._camera.deleteLater()
self._camera = None
def set_camera_format(self, fmt: QCameraFormat) -> None:
if self._camera is not None:
self._camera.setCameraFormat(fmt)
def _on_error(self, error, error_string):
self.error_occurred.emit(error_string)
```
Zmiany:
- Usunięto `self._sink` z `__init__`
- Usunięto `self._session.setVideoOutput(self._sink)`
- Dodano `set_video_widget(widget)``session.setVideoOutput(widget)`
- Dodano `set_video_sink(sink)``session.setVideoSink(sink)`
### 2. rendering/video_widget.py
```python
from __future__ import annotations
from PySide6.QtMultimediaWidgets import QVideoWidget
class VideoWidget(QVideoWidget):
def __init__(self, parent=None):
super().__init__(parent)
self.setMinimumSize(320, 240)
```
Zmiany:
- Dziedziczy po `QVideoWidget` zamiast `QWidget`
- Usunięto `on_frame(frame)` — nie potrzeba, renderowanie natywne
- Usunięto `paintEvent` — nie potrzeba, robi to Qt
- Usunięto `QImage`, `QPainter`, `SmoothPixmapTransform`
### 3. rendering/overlay.py
```python
font = QFont("monospace", 11) # zamiast "Consolas"
```
Dodatkowo: obsługa błędu w `paintEvent`:
- Jeśli `self._metrics` ma klucz `"error"` → pomiń normalne metryki, narysuj czerwony komunikat błędu
- Użyj `QColor(180, 40, 40)` zamiast zielonego
### 4. app.py — nowe DI wiring
```python
def main():
app = QApplication(sys.argv)
app.setApplicationName("Duck Preview")
camera = CameraService()
video_sink = QVideoSink()
dispatcher = FrameDispatcher()
telemetry = TelemetryCollector()
video_widget = VideoWidget()
overlay = OverlayWidget()
camera.set_video_widget(video_widget)
camera.set_video_sink(video_sink)
video_sink.videoFrameChanged.connect(dispatcher.on_frame)
dispatcher.subscribe(telemetry.on_frame)
window = MainWindow(camera, video_widget, overlay)
# Wire error → overlay
camera.error_occurred.connect(
lambda msg: overlay.set_metrics({"error": msg})
)
# Poll telemetry → overlay
metrics_timer = QTimer()
metrics_timer.timeout.connect(
lambda: overlay.set_metrics(telemetry.metrics())
)
metrics_timer.start(200)
window.show()
sys.exit(app.exec())
```
Zmiany:
- Tworzy `QVideoSink` jawnie
- `camera.set_video_widget(video_widget)` + `camera.set_video_sink(video_sink)`
- `video_sink.videoFrameChanged → dispatcher.on_frame`
- Usunięto `camera.sink` (property nie istnieje)
- `camera.error_occurred → overlay.set_metrics({"error": msg})`
### 5. main_window.py — permission flow
```python
# Importy
from PySide6.QtCore import QCameraPermission, Qt
from PySide6.QtWidgets import QApplication
# W _on_camera_selected:
def _on_camera_selected(self, device: QCameraDevice) -> None:
self._request_camera_permission(device)
def _request_camera_permission(self, device: QCameraDevice) -> None:
perm = QCameraPermission()
match QApplication.checkPermission(perm):
case Qt.PermissionStatus.Undetermined:
QApplication.requestPermission(perm, self,
lambda: self._request_camera_permission(device))
case Qt.PermissionStatus.Denied:
self._overlay.set_metrics({
"error": "Camera permission denied.\n"
"Grant access in System Settings > "
"Privacy & Security > Camera"
})
case Qt.PermissionStatus.Granted:
self._camera.start(device)
self._rebuild_resolution_menu(device)
```
### 6. pyside6-deploy.toml
```toml
[app]
script = "duck_preview/__main__.py"
name = "Duck Preview"
bundle_identifier = "com.bartool.duck-preview"
categories = "public.app-category.photography"
platforms = ["macos"]
```
### 7. Po zmianach
- `ruff check .` — czysty
- `ruff format .` — sformatowany
- `pytest -q` — 5 passed
---
## Podsumowanie architektury końcowej
```
CameraService
├─ session.setVideoOutput(VideoWidget) → natywne GPU rendering
└─ session.setVideoSink(VideoSink) → frame access
└─ videoFrameChanged
→ FrameDispatcher
├─ TelemetryCollector
└─ [future AI]
MainWindow
├─ _on_camera_selected
│ → QCameraPermission check (Undetermined/Denied/Granted)
│ → CameraService.start(device)
├─ error_occurred → OverlayWidget.set_metrics({"error": ...})
└─ QTimer 200ms → TelemetryCollector.metrics() → OverlayWidget
OverlayWidget
├─ normal: zielone metryki (FPS, frame time, frame count)
└─ error: czerwony komunikat (np. brak permisji)
```

126
notes/02-mvp-mac.md Normal file
View File

@@ -0,0 +1,126 @@
# macOS — Camera on macOS with PySide6
## Problem
Aplikacja uruchomiona w interpreted mode (`python -m duck_preview`) na macOS z kamerą Elgato nie wyświetla obrazu.
## Przyczyna
Oficjalny przykład PySide6 (`camera.qml` / examples/multimedia) zawiera kod:
```python
if sys.platform == "darwin":
is_nuitka = "__compiled__" in globals()
if not is_nuitka and sys.platform == "darwin":
print("This example does not work on macOS when Python is run "
"in interpreted mode. For this example to work on macOS, "
"package the example using pyside6-deploy")
sys.exit(0)
```
**macOS → AVFoundation → wymaga spakowania przez Nuitka/pyside6-deploy.**
`QCamera` na macOS potrzebuje properly bundled app structure (Info.plist, entitlements, code signing). W interpreted mode Python nie ma tego contextu.
## Rozwiązanie
Pakować aplikację przez `pyside6-deploy` (Nuitka) na macOS. Na Windows działa bez pakowania.
---
## Qt Permission API (`QCameraPermission`)
Od Qt 6.5+ dostępne jest `QCameraPermission` — nowoczesne API do proszenia o zgodę kamery.
### API
```python
from PySide6.QtCore import QCameraPermission, Qt
from PySide6.QtWidgets import QApplication
perm = QCameraPermission()
match QApplication.checkPermission(perm):
case Qt.PermissionStatus.Undetermined:
# Prosimy o zgodę → callback wywoła init ponownie
QApplication.requestPermission(perm, parent, callback)
return
case Qt.PermissionStatus.Denied:
# overlay: "Camera permission denied"
case Qt.PermissionStatus.Granted:
# uruchom kamerę
```
### Permission flow
```
User → wybiera kamerę w menu
→ MainWindow._on_camera_selected(device)
→ checkPermission(QCameraPermission)
├─ Undetermined → requestPermission(camera, parent, callback)
│ └─ callback → _on_camera_selected ponownie
├─ Denied → overlay: "Camera permission denied. Grant access in System Settings > Privacy & Security > Camera"
└─ Granted → CameraService.start(device)
```
---
## QVideoWidget + QVideoSink — dual output
PySide6 6.11 ma osobne metody w `QMediaCaptureSession`:
| Metoda | Przeznaczenie |
|--------|---------------|
| `setVideoOutput(QVideoWidget*)` | Natywne renderowanie (GPU) |
| `setVideoSink(QVideoSink*)` | Dostęp do klatek (telemetria, AI) |
Oba działają **równolegle** — nie ma konfliktu.
```
QMediaCaptureSession
├─ setVideoOutput(QVideoWidget) → natywne renderowanie
└─ setVideoSink(QVideoSink) → frame access
└─ videoFrameChanged → FrameDispatcher
```
QVideoSink dostarcza sygnał `videoFrameChanged(QVideoFrame)` — na to podpina się FrameDispatcher, a ten rozsyła do TelemetryCollector i przyszłych AI subscriberów.
---
## macOS — packaging
### pyside6-deploy
```bash
pip install nuitka
pyside6-deploy duck_preview/__main__.py --name "Duck Preview"
```
### pyside6-deploy.toml
```toml
[app]
script = "duck_preview/__main__.py"
name = "Duck Preview"
bundle_identifier = "com.bartool.duck-preview"
categories = "public.app-category.photography"
platforms = ["macos"]
```
Po spakowaniu powstaje `Duck Preview.app` — standalone bundle z dostępem do AVFoundation.
### Windows
Na Windows działa bez pakowania — `python -m duck_preview` w venv.
---
## Font fallback
`Consolas` nie istnieje na macOS. Używać `"monospace"` (generic font family — Qt mapuje na Menlo na macOS, Consolas na Windows).
## Error state w Overlay
Overlay wspiera wyświetlanie błędów (np. brak permisji):
- Normalny stan → zielone metryki (FPS, frame times)
- Error state → czerwony komunikat
- Przełączanie przez `overlay.set_metrics({"error": "message"})`

View File

@@ -1,43 +1,21 @@
[project]
name = "duck-preview"
version = "0.1.0"
description = "Realtime camera preview application with telemetry"
description = "Realtime camera preview application with performance monitoring"
requires-python = ">=3.12"
dependencies = [
"PySide6>=6.7",
"psutil>=6.0",
"PySide6>=6.11",
]
[project.scripts]
duck-preview = "app.main:main"
[tool.pytest.ini_options]
testpaths = ["tests"]
python_files = ["test_*.py"]
python_functions = ["test_*"]
addopts = "-v"
duck-preview = "duck_preview.app:main"
[tool.ruff]
target-version = "py312"
line-length = 100
[tool.ruff.lint]
select = [
"E", # pycodestyle errors
"W", # pycodestyle warnings
"F", # pyflakes
"I", # isort
"UP", # pyupgrade
"B", # flake8-bugbear
"N", # pep8-naming
]
ignore = [
"B008", # do not perform function calls in default arguments
]
select = ["E", "F", "I", "N", "W"]
[tool.ruff.lint.isort]
known-first-party = ["app"]
[tool.ruff.format]
quote-style = "double"
indent-style = "space"
[tool.pytest.ini_options]
testpaths = ["tests"]

6
pyside6-deploy.toml Normal file
View File

@@ -0,0 +1,6 @@
[app]
script = "duck_preview/__main__.py"
name = "Duck Preview"
bundle_identifier = "com.bartool.duck-preview"
categories = "public.app-category.photography"
platforms = ["macos"]

View File

@@ -1,3 +0,0 @@
-r requirements.txt
pytest>=8.0
ruff>=0.4

View File

@@ -1,2 +0,0 @@
PySide6>=6.7
psutil>=6.0

22
tests/test_collector.py Normal file
View File

@@ -0,0 +1,22 @@
import time
from unittest.mock import MagicMock
from duck_preview.telemetry.collector import TelemetryCollector
def test_metrics_empty():
collector = TelemetryCollector()
metrics = collector.metrics()
assert metrics == {"fps": 0, "frame_time_ms": 0.0, "frame_count": 0}
def test_metrics_after_frames():
collector = TelemetryCollector()
mock = MagicMock()
for _ in range(30):
collector.on_frame(mock)
time.sleep(0.002)
metrics = collector.metrics()
assert metrics["frame_count"] >= 30
assert metrics["frame_time_ms"] > 0

47
tests/test_dispatcher.py Normal file
View File

@@ -0,0 +1,47 @@
from duck_preview.dispatcher.frame_dispatcher import FrameDispatcher
def test_subscribe_notified():
dispatcher = FrameDispatcher()
received = []
def cb(frame):
received.append(frame)
dispatcher.subscribe(cb)
dispatcher.on_frame("test")
assert received == ["test"]
def test_unsubscribe_not_notified():
dispatcher = FrameDispatcher()
received = []
def cb(frame):
received.append(frame)
dispatcher.subscribe(cb)
dispatcher.unsubscribe(cb)
dispatcher.on_frame("test")
assert received == []
def test_multiple_subscribers():
dispatcher = FrameDispatcher()
received_1 = []
received_2 = []
def cb1(frame):
received_1.append(frame)
def cb2(frame):
received_2.append(frame)
dispatcher.subscribe(cb1)
dispatcher.subscribe(cb2)
dispatcher.on_frame("test")
assert received_1 == ["test"]
assert received_2 == ["test"]

View File

@@ -1,76 +0,0 @@
"""Tests for FrameDispatcher."""
from __future__ import annotations
from unittest.mock import MagicMock, patch
from app.pipeline.frame_dispatcher import FrameDispatcher
def _make_frame():
frame = MagicMock()
frame.isValid.return_value = True
return frame
class TestFrameDispatcher:
def setup_method(self):
# FrameDispatcher is a QObject — needs QApplication
# Use minimal mock to avoid Qt dependency in unit tests
with patch("app.pipeline.frame_dispatcher.QObject.__init__", return_value=None):
self.dispatcher = FrameDispatcher.__new__(FrameDispatcher)
self.dispatcher._subscribers = []
self.dispatcher._frame_count = 0
self.dispatcher._last_dispatch_time = 0.0
def test_subscribe_adds_subscriber(self):
cb = MagicMock()
self.dispatcher.subscribe(cb)
assert self.dispatcher.subscriber_count() == 1
def test_subscribe_same_callback_twice_is_noop(self):
cb = MagicMock()
self.dispatcher.subscribe(cb)
self.dispatcher.subscribe(cb)
assert self.dispatcher.subscriber_count() == 1
def test_unsubscribe_removes_subscriber(self):
cb = MagicMock()
self.dispatcher.subscribe(cb)
self.dispatcher.unsubscribe(cb)
assert self.dispatcher.subscriber_count() == 0
def test_unsubscribe_nonexistent_does_not_raise(self):
cb = MagicMock()
self.dispatcher.unsubscribe(cb) # should not raise
def test_dispatch_calls_all_subscribers(self):
cb1 = MagicMock()
cb2 = MagicMock()
self.dispatcher.subscribe(cb1)
self.dispatcher.subscribe(cb2)
frame = _make_frame()
self.dispatcher.dispatch(frame)
cb1.assert_called_once_with(frame)
cb2.assert_called_once_with(frame)
def test_dispatch_increments_frame_count(self):
frame = _make_frame()
self.dispatcher.dispatch(frame)
self.dispatcher.dispatch(frame)
assert self.dispatcher.frame_count == 2
def test_dispatch_with_no_subscribers_does_not_raise(self):
frame = _make_frame()
self.dispatcher.dispatch(frame) # should not raise
def test_subscriber_exception_does_not_stop_others(self):
def bad_cb(f):
raise RuntimeError("boom")
good_cb = MagicMock()
self.dispatcher.subscribe(bad_cb)
self.dispatcher.subscribe(good_cb)
frame = _make_frame()
self.dispatcher.dispatch(frame) # should not raise
good_cb.assert_called_once_with(frame)

View File

@@ -1,112 +0,0 @@
"""Tests for TelemetryCollector."""
from __future__ import annotations
import time
from collections import deque
from unittest.mock import MagicMock, patch
class TestTelemetryCollector:
"""Test telemetry calculations in isolation (no Qt event loop required)."""
def _make_collector(self):
"""Construct a TelemetryCollector bypassing Qt machinery."""
from app.telemetry.telemetry_collector import TelemetryCollector
with patch.object(TelemetryCollector, "__init__", return_value=None):
col = TelemetryCollector.__new__(TelemetryCollector)
col._frame_times = deque(maxlen=120)
col._last_frame_time = 0.0
col._total_frames = 0
col._dropped_frames = 0
col._fps_window = deque()
col._fps_window_size_s = 1.0
col._process = MagicMock()
# Simulate Windows: wset is present and takes priority over rss
mem_info = MagicMock()
mem_info.wset = 50 * 1024 * 1024 # 50 MB private working set
mem_info.rss = 70 * 1024 * 1024 # RSS (larger, includes shared)
col._process.memory_info.return_value = mem_info
col._process.cpu_percent.return_value = 0.0
return col
def test_initial_snapshot_has_zero_fps(self):
col = self._make_collector()
snap = col._compute_snapshot()
assert snap.fps == 0.0
def test_fps_counts_frames_in_window(self):
col = self._make_collector()
now = time.perf_counter()
# Simulate 30 frames within the last second
for i in range(30):
col._fps_window.append(now - 0.9 + i * 0.03)
snap = col._compute_snapshot()
assert snap.fps == 30.0
def test_fps_excludes_old_frames(self):
col = self._make_collector()
now = time.perf_counter()
# 10 frames older than 1 second — should not count
for i in range(10):
col._fps_window.append(now - 2.0 + i * 0.05)
# 5 frames within the last second
for i in range(5):
col._fps_window.append(now - 0.4 + i * 0.05)
snap = col._compute_snapshot()
assert snap.fps == 5.0
def test_frame_time_average(self):
col = self._make_collector()
# 10 frames at 33.3 ms each
interval = 0.0333
for _ in range(10):
col._frame_times.append(interval)
snap = col._compute_snapshot()
assert abs(snap.frame_time_ms - interval * 1000) < 0.1
def test_drop_detection(self):
col = self._make_collector()
# Seed with 10 normal frames at ~16 ms
normal_interval = 0.016
now = time.perf_counter()
col._last_frame_time = now - normal_interval * 10
for _ in range(9):
col._last_frame_time += normal_interval
col._frame_times.append(normal_interval)
# Simulate a big gap (3× normal) — should trigger drop detection
col._last_frame_time = now - normal_interval * 10 # reset base
# Manually call on_frame-like logic
with patch("app.telemetry.telemetry_collector.time") as mock_time:
# Set last_frame_time to something reasonable
col._last_frame_time = now
big_delta = normal_interval * 5 # 5× average → drop
mock_time.perf_counter.return_value = now + big_delta
# Replicate the drop detection logic
delta = big_delta
col._frame_times.append(delta)
avg = sum(col._frame_times) / len(col._frame_times)
if delta > avg * 2.5:
col._dropped_frames += 1
assert col._dropped_frames == 1
def test_reset_counters(self):
col = self._make_collector()
col._total_frames = 100
col._dropped_frames = 5
col._frame_times.append(0.016)
col._fps_window.append(time.perf_counter())
col.reset_counters()
assert col._total_frames == 0
assert col._dropped_frames == 0
assert len(col._frame_times) == 0
assert len(col._fps_window) == 0
def test_snapshot_memory_mb(self):
col = self._make_collector()
snap = col._compute_snapshot()
assert snap.memory_mb == 50.0