Compare commits

..

2 Commits

38 changed files with 943 additions and 2081 deletions

View File

@@ -1,80 +0,0 @@
"""Camera enumeration — discovers available video input devices."""
from __future__ import annotations
from dataclasses import dataclass, field
from PySide6.QtMultimedia import QCameraDevice, QMediaDevices
@dataclass
class CameraInfo:
"""Lightweight descriptor of a detected camera."""
device: QCameraDevice
name: str
id: str
formats: list[tuple[int, int, float]] = field(default_factory=list)
# formats: list of (width, height, max_fps)
def __str__(self) -> str:
return f"{self.name} [{self.id}]"
class CameraEnumerator:
"""Discovers available video input devices via QMediaDevices."""
@staticmethod
def list_cameras() -> list[CameraInfo]:
"""Return all available camera devices with their supported formats."""
devices = QMediaDevices.videoInputs()
cameras: list[CameraInfo] = []
for device in devices:
formats: list[tuple[int, int, float]] = []
for fmt in device.videoFormats():
res = fmt.resolution()
fps = fmt.maxFrameRate()
formats.append((res.width(), res.height(), fps))
# deduplicate and sort: largest resolution first, then fps descending
seen: set[tuple[int, int, float]] = set()
unique_formats: list[tuple[int, int, float]] = []
for f in sorted(formats, key=lambda x: (x[0] * x[1], x[2]), reverse=True):
if f not in seen:
seen.add(f)
unique_formats.append(f)
cameras.append(
CameraInfo(
device=device,
name=device.description(),
id=device.id().toStdString()
if hasattr(device.id(), "toStdString")
else device.id().data().decode("utf-8", errors="replace"),
formats=unique_formats,
)
)
return cameras
@staticmethod
def default_camera() -> CameraInfo | None:
"""Return the system default camera, or None if no camera is available."""
device = QMediaDevices.defaultVideoInput()
if device.isNull():
return None
cameras = CameraEnumerator.list_cameras()
# find by id match
default_id = (
device.id().toStdString()
if hasattr(device.id(), "toStdString")
else device.id().data().decode("utf-8", errors="replace")
)
for cam in cameras:
if cam.id == default_id:
return cam
# fallback: wrap directly
return cameras[0] if cameras else None

View File

@@ -1,215 +0,0 @@
"""Camera Service — manages QCamera lifecycle and frame acquisition."""
from __future__ import annotations
import logging
from PySide6.QtCore import QObject, Signal
from PySide6.QtMultimedia import (
QCamera,
QMediaCaptureSession,
QVideoFrame,
QVideoSink,
)
from app.camera.camera_enumerator import CameraInfo
from app.config import DEFAULT_FPS, DEFAULT_HEIGHT, DEFAULT_WIDTH
logger = logging.getLogger(__name__)
class CameraService(QObject):
"""
Manages camera initialisation, configuration and frame delivery.
Emits:
frame_ready(QVideoFrame) — new frame available from the camera
camera_started() — camera successfully opened and streaming
camera_stopped() — camera stopped (clean shutdown)
camera_error(str) — camera error description
format_changed(float) — actual FPS after format was applied
(emitted after camera restarts with new format)
"""
frame_ready = Signal(QVideoFrame)
camera_started = Signal()
camera_stopped = Signal()
camera_error = Signal(str)
format_changed = Signal(float) # actual FPS delivered by camera after format change
def __init__(self, parent: QObject | None = None) -> None:
super().__init__(parent)
self._camera: QCamera | None = None
self._session = QMediaCaptureSession(self)
self._sink = QVideoSink(self)
self._current_info: CameraInfo | None = None
# Desired format — applied on next (re)start
self._desired_width: int = DEFAULT_WIDTH
self._desired_height: int = DEFAULT_HEIGHT
self._desired_fps: float = float(DEFAULT_FPS)
self._session.setVideoSink(self._sink)
self._sink.videoFrameChanged.connect(self._on_frame)
# ------------------------------------------------------------------
# Public API
# ------------------------------------------------------------------
def start(self, camera_info: CameraInfo) -> None:
"""Start streaming from the given camera device."""
self._stop_camera()
self._current_info = camera_info
self._camera = QCamera(camera_info.device, self)
self._camera.errorOccurred.connect(self._on_error)
self._camera.activeChanged.connect(self._on_active_changed)
self._session.setCamera(self._camera)
self._apply_format()
self._camera.start()
logger.info("Camera start requested: %s", camera_info.name)
def stop(self) -> None:
"""Stop the current camera and forget the device."""
self._stop_camera()
self._current_info = None
def reconnect(self) -> None:
"""Restart the current camera (e.g. after an error or disconnect)."""
if self._current_info is not None:
logger.info("Reconnecting camera: %s", self._current_info.name)
self.start(self._current_info)
else:
logger.warning("Reconnect requested but no camera was previously started")
def set_resolution(self, width: int, height: int) -> None:
"""
Request a new resolution.
The camera is stopped and restarted so the backend reliably applies
the new format (QCamera.setCameraFormat on an active camera is often
silently ignored by Media Foundation on Windows).
"""
self._desired_width = width
self._desired_height = height
if self._current_info is not None:
logger.info("Resolution change requested: %dx%d — restarting camera", width, height)
self.start(self._current_info)
def set_fps(self, fps: float) -> None:
"""
Request a new frame rate.
Same stop+start strategy as set_resolution().
"""
self._desired_fps = fps
if self._current_info is not None:
logger.info("FPS change requested: %.1f — restarting camera", fps)
self.start(self._current_info)
@property
def is_active(self) -> bool:
return self._camera is not None and self._camera.isActive()
@property
def current_camera(self) -> CameraInfo | None:
return self._current_info
# ------------------------------------------------------------------
# Internal video output accessors (kept for future use)
# ------------------------------------------------------------------
def video_sink(self) -> QVideoSink:
return self._sink
def capture_session(self) -> QMediaCaptureSession:
return self._session
# ------------------------------------------------------------------
# Private helpers
# ------------------------------------------------------------------
def _stop_camera(self) -> None:
"""Stop and destroy the QCamera object without clearing _current_info."""
if self._camera is not None:
self._camera.stop()
self._camera.errorOccurred.disconnect()
self._camera.activeChanged.disconnect()
self._camera = None
logger.debug("Camera stopped (internal)")
def _apply_format(self) -> None:
"""
Select the best matching QCameraFormat and apply it before start().
The format is chosen by score:
+1000 exact resolution match
+100 exact FPS match (within 1 fps)
-|Δpixels| area proximity (tie-breaker)
"""
if self._camera is None or self._current_info is None:
return
best = None
best_score = -1
for fmt in self._current_info.device.videoFormats():
res = fmt.resolution()
w, h = res.width(), res.height()
f = fmt.maxFrameRate()
score = (
int(w == self._desired_width and h == self._desired_height) * 1000
+ int(abs(f - self._desired_fps) < 1) * 100
- abs(w * h - self._desired_width * self._desired_height)
)
if score > best_score:
best_score = score
best = fmt
if best is not None:
self._camera.setCameraFormat(best)
res = best.resolution()
logger.info(
"Camera format requested: %dx%d @ %.1f fps",
res.width(), res.height(), best.maxFrameRate(),
)
def _log_actual_format(self) -> None:
"""Log the format the camera actually started with and emit format_changed."""
if self._camera is None:
return
fmt = self._camera.cameraFormat()
res = fmt.resolution()
actual_fps = fmt.maxFrameRate()
logger.info(
"Camera format ACTUAL: %dx%d @ %.1f fps",
res.width(), res.height(), actual_fps,
)
if actual_fps != self._desired_fps:
logger.warning(
"Requested %.1f fps but camera is delivering %.1f fps "
"(camera may not support this combination)",
self._desired_fps, actual_fps,
)
self.format_changed.emit(actual_fps)
def _on_frame(self, frame: QVideoFrame) -> None:
if frame.isValid():
self.frame_ready.emit(frame)
def _on_error(self, error: QCamera.Error, error_string: str) -> None:
logger.error("Camera error %s: %s", error, error_string)
self.camera_error.emit(error_string)
def _on_active_changed(self, active: bool) -> None:
if active:
name = self._current_info.name if self._current_info else "?"
logger.info("Camera active: %s", name)
self._log_actual_format() # report what the camera actually accepted
self.camera_started.emit()
else:
logger.info("Camera inactive")
self.camera_stopped.emit()

View File

@@ -1,22 +0,0 @@
"""Application-wide constants and default settings."""
APP_NAME = "Duck Preview"
APP_VERSION = "0.1.0"
# Default camera settings
DEFAULT_FPS = 30
DEFAULT_WIDTH = 1280
DEFAULT_HEIGHT = 720
# Telemetry
TELEMETRY_UPDATE_INTERVAL_MS = 500 # how often the metrics snapshot is refreshed
# Overlay
OVERLAY_BG_COLOR = (0, 0, 0, 160) # RGBA
OVERLAY_TEXT_COLOR = (255, 255, 255, 255)
OVERLAY_FONT_SIZE = 13
OVERLAY_PADDING = 10
OVERLAY_MARGIN = 10
# Frame dispatcher
DISPATCHER_MAX_QUEUE_SIZE = 2 # max pending frames per slow subscriber before drop

View File

@@ -1,35 +0,0 @@
"""Application entry point."""
from __future__ import annotations
import logging
import sys
from PySide6.QtCore import Qt
from PySide6.QtWidgets import QApplication
from app.config import APP_NAME
from app.ui.main_window import MainWindow
def main() -> None:
# Basic logging — WARNING by default; Debug menu toggles to DEBUG
logging.basicConfig(
level=logging.WARNING,
format="%(asctime)s [%(levelname)s] %(name)s: %(message)s",
)
app = QApplication(sys.argv)
app.setApplicationName(APP_NAME)
app.setHighDpiScaleFactorRoundingPolicy(
Qt.HighDpiScaleFactorRoundingPolicy.PassThrough
)
window = MainWindow()
window.show()
sys.exit(app.exec())
if __name__ == "__main__":
main()

View File

@@ -1,54 +0,0 @@
"""IOverlayLayer — interface for pluggable overlay layers drawn on CameraView."""
from __future__ import annotations
from abc import ABC, abstractmethod
from PySide6.QtCore import QRect
from PySide6.QtGui import QPainter
class IOverlayLayer(ABC):
"""
Interface for a single overlay layer drawn over the camera frame.
Each layer receives the active QPainter and the video rect (the letterboxed
area where the camera image was drawn) so it can position elements relative
to the actual video content if needed.
To add a new overlay (e.g. YOLO bboxes):
1. Subclass IOverlayLayer.
2. Implement paint().
3. Register with CameraView.add_overlay_layer().
No Qt subclassing is required — layers are plain Python objects.
"""
@property
def name(self) -> str:
"""Human-readable identifier used in menus / debug output."""
return type(self).__name__
@property
def visible(self) -> bool:
"""Whether this layer should be drawn."""
return self._visible
@visible.setter
def visible(self, value: bool) -> None:
self._visible = value
def __init__(self) -> None:
self._visible: bool = True
@abstractmethod
def paint(self, painter: QPainter, video_rect: QRect) -> None:
"""
Draw this layer.
Args:
painter: Active QPainter on the CameraView widget.
Caller saves/restores painter state around each layer.
video_rect: The QRect where the camera image was drawn (letterboxed).
Use this to position overlays relative to the video image.
"""

View File

@@ -1,109 +0,0 @@
"""TelemetryOverlay — draws the performance metrics box on the camera view."""
from __future__ import annotations
from PySide6.QtCore import QRect, Qt, Slot
from PySide6.QtGui import QColor, QFont, QPainter, QPen
from app.config import (
OVERLAY_BG_COLOR,
OVERLAY_FONT_SIZE,
OVERLAY_MARGIN,
OVERLAY_PADDING,
OVERLAY_TEXT_COLOR,
)
from app.overlay.overlay_layer import IOverlayLayer
from app.telemetry.telemetry_collector import TelemetrySnapshot
class TelemetryOverlay(IOverlayLayer):
"""
Renders a semi-transparent metrics box in the top-left corner.
Usage:
overlay = TelemetryOverlay()
camera_view.add_overlay_layer(overlay)
telemetry_collector.metrics_updated.connect(overlay.on_metrics_updated)
Display format:
FPS req 60.0 ← what was requested from camera
FPS got 30.2 ← what camera actually delivered
Frame 33.1 ms
Drop 0
CPU sys 14.8 % ← normalised by cpu_count (matches Task Manager)
CPU core 118.4 % ← per single core (can exceed 100%)
Mem 68 MB
"""
def __init__(self) -> None:
super().__init__()
self._snapshot: TelemetrySnapshot | None = None
self._font = QFont("Monospace")
self._font.setStyleHint(QFont.StyleHint.TypeWriter)
self._font.setPointSize(OVERLAY_FONT_SIZE)
self._font.setBold(False)
@Slot(object)
def on_metrics_updated(self, snapshot: TelemetrySnapshot) -> None:
"""Receive a new snapshot from TelemetryCollector."""
self._snapshot = snapshot
# ------------------------------------------------------------------
# IOverlayLayer
# ------------------------------------------------------------------
def paint(self, painter: QPainter, video_rect: QRect) -> None:
if self._snapshot is None:
return
lines = self._format_lines(self._snapshot)
if not lines:
return
painter.setFont(self._font)
fm = painter.fontMetrics()
line_height = fm.height()
max_width = max(fm.horizontalAdvance(line) for line in lines)
box_w = max_width + OVERLAY_PADDING * 2
box_h = line_height * len(lines) + OVERLAY_PADDING * 2
x = video_rect.left() + OVERLAY_MARGIN
y = video_rect.top() + OVERLAY_MARGIN
# Background
painter.setBrush(QColor(*OVERLAY_BG_COLOR))
painter.setPen(Qt.PenStyle.NoPen)
painter.drawRoundedRect(QRect(x, y, box_w, box_h), 6, 6)
# Text
painter.setPen(QPen(QColor(*OVERLAY_TEXT_COLOR)))
text_x = x + OVERLAY_PADDING
text_y = y + OVERLAY_PADDING + fm.ascent()
for line in lines:
painter.drawText(text_x, text_y, line)
text_y += line_height
# ------------------------------------------------------------------
# Private
# ------------------------------------------------------------------
@staticmethod
def _format_lines(snap: TelemetrySnapshot) -> list[str]:
lines: list[str] = []
# FPS — show target if known, then actual
if snap.target_fps is not None:
lines.append(f"FPS req {snap.target_fps:>6.1f}")
lines.append(f"FPS got {snap.fps:>6.1f}")
lines.append(f"Frame {snap.frame_time_ms:>6.1f} ms")
lines.append(f"Drop {snap.dropped_frames:>6d}")
lines.append(f"CPU sys {snap.cpu_percent_sys:>5.1f} %")
lines.append(f"CPU core {snap.cpu_percent_core:>5.1f} %")
if snap.memory_mb is not None:
lines.append(f"Mem {snap.memory_mb:>5.0f} MB")
return lines

View File

@@ -1,111 +0,0 @@
"""Frame Dispatcher — distributes QVideoFrames to registered subscribers."""
from __future__ import annotations
import logging
import time
from collections.abc import Callable
from dataclasses import dataclass, field
from PySide6.QtCore import QObject, Slot
from PySide6.QtMultimedia import QVideoFrame
logger = logging.getLogger(__name__)
FrameCallback = Callable[[QVideoFrame], None]
@dataclass
class _Subscriber:
callback: FrameCallback
drop_if_busy: bool = True
_busy: bool = field(default=False, init=False, repr=False)
class FrameDispatcher(QObject):
"""
Receives frames from CameraService and fans them out to all subscribers.
Each subscriber is a callable (QVideoFrame) -> None.
Subscribers that set drop_if_busy=True will skip a frame if they are still
processing the previous one (non-blocking). Subscribers with drop_if_busy=False
always receive every frame.
All dispatch happens in the GUI thread (Qt signal/slot), so subscribers
must NOT perform heavy work directly — they should queue to a worker thread
if processing is needed.
"""
def __init__(self, parent: QObject | None = None) -> None:
super().__init__(parent)
self._subscribers: list[_Subscriber] = []
self._frame_count: int = 0
self._last_dispatch_time: float = 0.0
# ------------------------------------------------------------------
# Subscription API
# ------------------------------------------------------------------
def subscribe(self, callback: FrameCallback, *, drop_if_busy: bool = True) -> None:
"""Register a frame callback.
Args:
callback: Callable that receives QVideoFrame.
drop_if_busy: When True, frame is skipped if subscriber is still
marked busy from last call (default True).
"""
for sub in self._subscribers:
if sub.callback is callback:
logger.warning("Subscriber %r already registered", callback)
return
self._subscribers.append(_Subscriber(callback=callback, drop_if_busy=drop_if_busy))
logger.debug("Subscriber added: %r (drop_if_busy=%s)", callback, drop_if_busy)
def unsubscribe(self, callback: FrameCallback) -> None:
"""Remove a previously registered callback."""
before = len(self._subscribers)
self._subscribers = [s for s in self._subscribers if s.callback is not callback]
if len(self._subscribers) < before:
logger.debug("Subscriber removed: %r", callback)
else:
logger.warning("Subscriber not found for removal: %r", callback)
def subscriber_count(self) -> int:
return len(self._subscribers)
# ------------------------------------------------------------------
# Frame intake — connect CameraService.frame_ready to this slot
# ------------------------------------------------------------------
@Slot(QVideoFrame)
def dispatch(self, frame: QVideoFrame) -> None:
"""Distribute the frame to all registered subscribers."""
self._frame_count += 1
now = time.perf_counter()
self._last_dispatch_time = now
for sub in self._subscribers:
if sub.drop_if_busy and sub._busy:
logger.debug("Dropping frame for busy subscriber %r", sub.callback)
continue
sub._busy = True
try:
sub.callback(frame)
except Exception:
logger.exception("Error in frame subscriber %r", sub.callback)
finally:
sub._busy = False
# ------------------------------------------------------------------
# Stats
# ------------------------------------------------------------------
@property
def frame_count(self) -> int:
return self._frame_count
@property
def last_dispatch_time(self) -> float:
"""perf_counter timestamp of the last dispatched frame."""
return self._last_dispatch_time

View File

@@ -1,190 +0,0 @@
"""Telemetry Collector — measures video pipeline performance metrics."""
from __future__ import annotations
import time
from collections import deque
from dataclasses import dataclass
import psutil
from PySide6.QtCore import QObject, QTimer, Signal
from PySide6.QtMultimedia import QVideoFrame
from app.config import TELEMETRY_UPDATE_INTERVAL_MS
@dataclass
class TelemetrySnapshot:
"""Immutable snapshot of current performance metrics."""
fps: float # actual frames received in the last second
target_fps: float | None # FPS requested from the camera (None = unknown)
frame_time_ms: float # average inter-frame time in ms
dropped_frames: int # cumulative dropped frames detected
cpu_percent_sys: float # process CPU as % of total system capacity
# (divided by cpu_count) — matches Task Manager
cpu_percent_core: float # process CPU per single core — can exceed 100%
memory_mb: float | None # process private working set in MB
timestamp: float # time.perf_counter() when snapshot was taken
class TelemetryCollector(QObject):
"""
Frame subscriber that measures pipeline performance.
Connect to FrameDispatcher:
dispatcher.subscribe(collector.on_frame, drop_if_busy=False)
Receive target FPS updates from CameraService:
camera_service.format_changed.connect(collector.set_target_fps)
Listen to metrics updates:
collector.metrics_updated.connect(my_slot)
"""
metrics_updated = Signal(object) # emits TelemetrySnapshot
def __init__(
self,
update_interval_ms: int = TELEMETRY_UPDATE_INTERVAL_MS,
parent: QObject | None = None,
) -> None:
super().__init__(parent)
self._update_interval_ms = update_interval_ms
self._target_fps: float | None = None
# frame timing ring-buffer (last 120 samples)
self._frame_times: deque[float] = deque(maxlen=120)
self._last_frame_time: float = 0.0
self._total_frames: int = 0
self._dropped_frames: int = 0
# FPS window — count frames in the last second
self._fps_window: deque[float] = deque()
self._fps_window_size_s: float = 1.0
# psutil — initialise baseline so first real reading is non-zero
self._process = psutil.Process()
self._process.cpu_percent() # first call always returns 0.0; discard
self._cpu_count: int = max(psutil.cpu_count(logical=True) or 1, 1)
# periodic snapshot timer
self._timer = QTimer(self)
self._timer.setInterval(update_interval_ms)
self._timer.timeout.connect(self._emit_snapshot)
self._timer.start()
self._latest: TelemetrySnapshot = self._make_empty_snapshot()
# ------------------------------------------------------------------
# Configuration
# ------------------------------------------------------------------
def set_target_fps(self, fps: float | None) -> None:
"""Record the FPS that was requested from the camera."""
self._target_fps = fps
# ------------------------------------------------------------------
# Frame subscriber callback
# ------------------------------------------------------------------
def on_frame(self, frame: QVideoFrame) -> None:
"""Called by FrameDispatcher for every frame. Must be fast."""
now = time.perf_counter()
if self._last_frame_time > 0:
delta = now - self._last_frame_time
self._frame_times.append(delta)
# drop detection: gap > 2.5× rolling average
if len(self._frame_times) >= 5:
avg = sum(self._frame_times) / len(self._frame_times)
if delta > avg * 2.5:
self._dropped_frames += 1
self._last_frame_time = now
self._total_frames += 1
self._fps_window.append(now)
cutoff = now - self._fps_window_size_s
while self._fps_window and self._fps_window[0] < cutoff:
self._fps_window.popleft()
# ------------------------------------------------------------------
# Snapshot
# ------------------------------------------------------------------
def latest_snapshot(self) -> TelemetrySnapshot:
return self._latest
def reset_counters(self) -> None:
"""Reset cumulative counters (e.g. after camera switch)."""
self._frame_times.clear()
self._fps_window.clear()
self._last_frame_time = 0.0
self._total_frames = 0
self._dropped_frames = 0
# ------------------------------------------------------------------
# Internal
# ------------------------------------------------------------------
def _emit_snapshot(self) -> None:
snapshot = self._compute_snapshot()
self._latest = snapshot
self.metrics_updated.emit(snapshot)
def _compute_snapshot(self) -> TelemetrySnapshot:
now = time.perf_counter()
# FPS — prune stale entries before counting
cutoff = now - self._fps_window_size_s
while self._fps_window and self._fps_window[0] < cutoff:
self._fps_window.popleft()
fps = float(len(self._fps_window))
# average frame time
avg_frame_time_ms = (
(sum(self._frame_times) / len(self._frame_times)) * 1000.0
if self._frame_times
else 0.0
)
# CPU — per-core reading, then derive system-normalised value
try:
cpu_core = self._process.cpu_percent()
except Exception:
cpu_core = 0.0
cpu_sys = cpu_core / self._cpu_count
# Memory — private working set (Windows) or RSS (macOS/Linux)
try:
mem_info = self._process.memory_info()
mem_bytes = getattr(mem_info, "wset", None) or mem_info.rss
mem_mb: float | None = mem_bytes / (1024 * 1024)
except Exception:
mem_mb = None
return TelemetrySnapshot(
fps=round(fps, 1),
target_fps=self._target_fps,
frame_time_ms=round(avg_frame_time_ms, 2),
dropped_frames=self._dropped_frames,
cpu_percent_sys=round(cpu_sys, 1),
cpu_percent_core=round(cpu_core, 1),
memory_mb=round(mem_mb, 1) if mem_mb is not None else None,
timestamp=now,
)
def _make_empty_snapshot(self) -> TelemetrySnapshot:
return TelemetrySnapshot(
fps=0.0,
target_fps=self._target_fps,
frame_time_ms=0.0,
dropped_frames=0,
cpu_percent_sys=0.0,
cpu_percent_core=0.0,
memory_mb=None,
timestamp=time.perf_counter(),
)

View File

View File

@@ -1,144 +0,0 @@
"""CameraView — QWidget that renders camera frames and composites overlay layers.
Responsibilities:
- Receive QVideoFrame, convert to QImage, schedule repaint.
- In paintEvent: draw the frame (letterboxed) then call paint() on each
registered IOverlayLayer in order.
- Know nothing about what specific overlays draw — that is entirely up to
each IOverlayLayer implementation.
Adding a new overlay (e.g. YOLO bboxes):
layer = YoloBboxOverlay()
camera_view.add_overlay_layer(layer)
yolo_processor.results_ready.connect(layer.on_results)
No modification to CameraView is needed.
"""
from __future__ import annotations
import logging
from PySide6.QtCore import QRect, Qt, Slot
from PySide6.QtGui import QImage, QPainter
from PySide6.QtMultimedia import QVideoFrame
from PySide6.QtWidgets import QWidget
from app.overlay.overlay_layer import IOverlayLayer
logger = logging.getLogger(__name__)
class CameraView(QWidget):
"""
Camera preview widget.
Frame pipeline:
CameraService.frame_ready(QVideoFrame)
→ CameraView.on_frame() — convert to QImage, schedule update()
→ paintEvent() — draw image + all overlay layers
"""
def __init__(self, parent: QWidget | None = None) -> None:
super().__init__(parent)
self.setStyleSheet("background: black;")
self._current_image: QImage | None = None
self._video_rect: QRect = QRect()
self._overlay_layers: list[IOverlayLayer] = []
# ------------------------------------------------------------------
# Overlay layer registry
# ------------------------------------------------------------------
def add_overlay_layer(self, layer: IOverlayLayer) -> None:
"""Register an overlay layer. Layers are painted in registration order."""
if layer not in self._overlay_layers:
self._overlay_layers.append(layer)
logger.debug("Overlay layer added: %s", layer.name)
def remove_overlay_layer(self, layer: IOverlayLayer) -> None:
"""Unregister an overlay layer."""
try:
self._overlay_layers.remove(layer)
logger.debug("Overlay layer removed: %s", layer.name)
except ValueError:
pass
def set_all_overlays_visible(self, visible: bool) -> None:
"""Show or hide all registered overlay layers at once."""
for layer in self._overlay_layers:
layer.visible = visible
self.update()
# ------------------------------------------------------------------
# Frame input
# ------------------------------------------------------------------
@Slot(QVideoFrame)
def on_frame(self, frame: QVideoFrame) -> None:
"""Receive a camera frame, convert to QImage, schedule repaint."""
if not frame.isValid():
return
image = frame.toImage()
if image.isNull():
return
# Convert to Format_RGB32 — QPainter's fastest path on all platforms
if image.format() != QImage.Format.Format_RGB32:
image = image.convertToFormat(QImage.Format.Format_RGB32)
self._current_image = image
self.update()
# ------------------------------------------------------------------
# Qt paint
# ------------------------------------------------------------------
def paintEvent(self, event) -> None: # noqa: N802
painter = QPainter(self)
# 1. Background
painter.fillRect(self.rect(), Qt.GlobalColor.black)
# 2. Camera frame — letterboxed
if self._current_image is not None:
img = self._current_image
self._video_rect = _letterbox_rect(
img.width(), img.height(), self.width(), self.height()
)
painter.drawImage(self._video_rect, img)
else:
self._video_rect = self.rect()
# 3. Overlay layers — each gets a clean painter state
for layer in self._overlay_layers:
if not layer.visible:
continue
painter.save()
try:
layer.paint(painter, self._video_rect)
except Exception:
logger.exception("Error painting overlay layer %s", layer.name)
finally:
painter.restore()
painter.end()
# ------------------------------------------------------------------
# Helpers
# ------------------------------------------------------------------
def _letterbox_rect(img_w: int, img_h: int, widget_w: int, widget_h: int) -> QRect:
"""Return the largest rect that fits the image while preserving aspect ratio."""
if img_w <= 0 or img_h <= 0:
return QRect(0, 0, widget_w, widget_h)
scale = min(widget_w / img_w, widget_h / img_h)
draw_w = int(img_w * scale)
draw_h = int(img_h * scale)
x = (widget_w - draw_w) // 2
y = (widget_h - draw_h) // 2
return QRect(x, y, draw_w, draw_h)

View File

@@ -1,174 +0,0 @@
"""Main application window."""
from __future__ import annotations
import logging
from PySide6.QtCore import QTimer
from PySide6.QtWidgets import QLabel, QMainWindow, QSizePolicy, QStatusBar
from app.camera.camera_enumerator import CameraEnumerator, CameraInfo
from app.camera.camera_service import CameraService
from app.config import APP_NAME, APP_VERSION
from app.overlay.telemetry_overlay import TelemetryOverlay
from app.pipeline.frame_dispatcher import FrameDispatcher
from app.telemetry.telemetry_collector import TelemetryCollector
from app.ui.camera_view import CameraView
from app.ui.menu_bar import AppMenuBar
logger = logging.getLogger(__name__)
class MainWindow(QMainWindow):
"""
Top-level application window.
Rendering architecture:
QVideoWidget is intentionally NOT used — on Windows its native HWND
surface occludes all sibling/child QWidgets regardless of z-order.
CameraView is a plain QWidget that renders frames and overlay layers
in a single paintEvent pass.
Signal flow:
CameraService.frame_ready
→ FrameDispatcher.dispatch
→ CameraView.on_frame (render frame)
→ TelemetryCollector.on_frame (measure metrics)
→ TelemetryOverlay.on_metrics_updated (feed overlay data)
(CameraView repaints and calls TelemetryOverlay.paint())
"""
def __init__(self) -> None:
super().__init__()
self.setWindowTitle(f"{APP_NAME} v{APP_VERSION}")
self.setMinimumSize(640, 480)
self.resize(1280, 720)
# --- Core pipeline components ---
self._camera_service = CameraService(self)
self._dispatcher = FrameDispatcher(self)
self._telemetry = TelemetryCollector(parent=self)
# --- Camera view (central widget) ---
self._camera_view = CameraView(self)
self._camera_view.setSizePolicy(
QSizePolicy.Policy.Expanding, QSizePolicy.Policy.Expanding
)
self.setCentralWidget(self._camera_view)
# --- Overlay layers ---
self._telemetry_overlay = TelemetryOverlay()
self._camera_view.add_overlay_layer(self._telemetry_overlay)
# --- Menu bar ---
self._menu = AppMenuBar(self)
self.setMenuBar(self._menu)
# --- Status bar ---
self._status_bar = QStatusBar(self)
self.setStatusBar(self._status_bar)
self._status_label = QLabel("Initialising…")
self._status_bar.addWidget(self._status_label)
# --- Wire signals ---
self._wire_signals()
# --- Enumerate cameras and start ---
QTimer.singleShot(0, self._initialise_cameras)
# ------------------------------------------------------------------
# Initialisation
# ------------------------------------------------------------------
def _initialise_cameras(self) -> None:
cameras = CameraEnumerator.list_cameras()
if not cameras:
self._status_label.setText("No cameras found")
logger.warning("No cameras detected")
return
self._menu.populate_cameras(cameras)
default = CameraEnumerator.default_camera()
start_cam = default if default is not None else cameras[0]
self._menu.populate_formats(start_cam)
self._start_camera(start_cam)
def _start_camera(self, cam: CameraInfo) -> None:
self._telemetry.reset_counters()
self._camera_service.start(cam)
self._menu.set_active_camera(cam)
self._status_label.setText(f"Opening: {cam.name}")
# ------------------------------------------------------------------
# Signal wiring
# ------------------------------------------------------------------
def _wire_signals(self) -> None:
# CameraService → FrameDispatcher
self._camera_service.frame_ready.connect(self._dispatcher.dispatch)
# FrameDispatcher → CameraView (render) — drop if busy: stay fluid
self._dispatcher.subscribe(self._camera_view.on_frame, drop_if_busy=True)
# FrameDispatcher → TelemetryCollector — never drop, count every frame
self._dispatcher.subscribe(self._telemetry.on_frame, drop_if_busy=False)
# TelemetryCollector → TelemetryOverlay (data only, no repaint trigger here)
self._telemetry.metrics_updated.connect(self._telemetry_overlay.on_metrics_updated)
# CameraService → TelemetryCollector: keep target FPS in sync
self._camera_service.format_changed.connect(self._telemetry.set_target_fps)
# CameraService status
self._camera_service.camera_started.connect(self._on_camera_started)
self._camera_service.camera_stopped.connect(self._on_camera_stopped)
self._camera_service.camera_error.connect(self._on_camera_error)
# Menu signals
self._menu.camera_selected.connect(self._on_camera_selected)
self._menu.resolution_selected.connect(self._on_resolution_selected)
self._menu.fps_selected.connect(self._on_fps_selected)
self._menu.reconnect_requested.connect(self._camera_service.reconnect)
self._menu.overlay_toggled.connect(self._camera_view.set_all_overlays_visible)
# ------------------------------------------------------------------
# Camera status slots
# ------------------------------------------------------------------
def _on_camera_started(self) -> None:
cam = self._camera_service.current_camera
name = cam.name if cam else "Unknown"
self._status_label.setText(f"Streaming: {name}")
logger.info("Camera streaming: %s", name)
def _on_camera_stopped(self) -> None:
self._status_label.setText("Camera stopped")
def _on_camera_error(self, message: str) -> None:
self._status_label.setText(f"Error: {message}")
logger.error("Camera error: %s", message)
# ------------------------------------------------------------------
# Menu action slots
# ------------------------------------------------------------------
def _on_camera_selected(self, cam: CameraInfo) -> None:
self._start_camera(cam)
def _on_resolution_selected(self, width: int, height: int) -> None:
self._camera_service.set_resolution(width, height)
def _on_fps_selected(self, fps: float) -> None:
self._camera_service.set_fps(fps)
# ------------------------------------------------------------------
# Qt overrides
# ------------------------------------------------------------------
def closeEvent(self, event) -> None: # noqa: N802
self._camera_service.stop()
super().closeEvent(event)

View File

@@ -1,198 +0,0 @@
"""Menu bar — camera, video format, FPS and debug controls."""
from __future__ import annotations
import logging
from PySide6.QtCore import Signal
from PySide6.QtGui import QAction, QActionGroup
from PySide6.QtWidgets import QMenuBar, QWidget
from app.camera.camera_enumerator import CameraInfo
logger = logging.getLogger(__name__)
class AppMenuBar(QMenuBar):
"""
Application menu bar.
Signals:
camera_selected(CameraInfo) — user picked a camera
resolution_selected(int, int) — user picked (width, height)
fps_selected(float) — user picked a target FPS
reconnect_requested() — user hit Reconnect
overlay_toggled(bool) — overlay show/hide
log_toggled(bool) — console logging on/off
"""
camera_selected = Signal(object) # CameraInfo
resolution_selected = Signal(int, int)
fps_selected = Signal(float)
reconnect_requested = Signal()
overlay_toggled = Signal(bool)
log_toggled = Signal(bool)
def __init__(self, parent: QWidget | None = None) -> None:
super().__init__(parent)
self._camera_group: QActionGroup | None = None
self._resolution_group: QActionGroup | None = None
self._fps_group: QActionGroup | None = None
self._cameras: list[CameraInfo] = []
self._build_menus()
# ------------------------------------------------------------------
# Public API — called after camera enumeration
# ------------------------------------------------------------------
def populate_cameras(self, cameras: list[CameraInfo]) -> None:
"""Populate the Camera menu with discovered devices."""
self._cameras = cameras
menu = self._camera_menu
# Remove existing camera actions (keep Reconnect + separator)
for action in list(menu.actions()):
if action not in (self._reconnect_action, self._cam_separator):
menu.removeAction(action)
self._camera_group = QActionGroup(self)
self._camera_group.setExclusive(True)
for cam in cameras:
action = QAction(cam.name, self)
action.setCheckable(True)
action.setData(cam)
self._camera_group.addAction(action)
menu.insertAction(self._cam_separator, action)
action.triggered.connect(self._on_camera_action)
if cameras:
first = self._camera_group.actions()[0]
first.setChecked(True)
def populate_formats(self, camera_info: CameraInfo) -> None:
"""Populate Resolution and FPS menus based on a camera's supported formats."""
self._populate_resolutions(camera_info)
self._populate_fps(camera_info)
def set_active_camera(self, camera_info: CameraInfo) -> None:
"""Check the menu item matching camera_info."""
if self._camera_group is None:
return
for action in self._camera_group.actions():
if action.data() is camera_info:
action.setChecked(True)
return
# ------------------------------------------------------------------
# Menu construction
# ------------------------------------------------------------------
def _build_menus(self) -> None:
# --- Camera menu ---
self._camera_menu = self.addMenu("Camera")
self._cam_separator = self._camera_menu.addSeparator()
self._reconnect_action = QAction("Reconnect", self)
self._reconnect_action.triggered.connect(self.reconnect_requested)
self._camera_menu.addAction(self._reconnect_action)
# --- Video menu ---
self._video_menu = self.addMenu("Video")
self._res_menu = self._video_menu.addMenu("Resolution")
self._fps_menu = self._video_menu.addMenu("FPS")
# --- Debug menu ---
debug_menu = self.addMenu("Debug")
self._overlay_action = QAction("Show Overlay", self)
self._overlay_action.setCheckable(True)
self._overlay_action.setChecked(True)
self._overlay_action.toggled.connect(self.overlay_toggled)
debug_menu.addAction(self._overlay_action)
self._log_action = QAction("Console Logging", self)
self._log_action.setCheckable(True)
self._log_action.setChecked(False)
self._log_action.toggled.connect(self._on_log_toggled)
debug_menu.addAction(self._log_action)
def _populate_resolutions(self, camera_info: CameraInfo) -> None:
self._res_menu.clear()
self._resolution_group = QActionGroup(self)
self._resolution_group.setExclusive(True)
seen: set[tuple[int, int]] = set()
for w, h, _ in camera_info.formats:
key = (w, h)
if key in seen:
continue
seen.add(key)
action = QAction(f"{w} × {h}", self)
action.setCheckable(True)
action.setData((w, h))
self._resolution_group.addAction(action)
self._res_menu.addAction(action)
action.triggered.connect(self._on_resolution_action)
actions = self._resolution_group.actions()
if actions:
actions[0].setChecked(True)
def _populate_fps(self, camera_info: CameraInfo) -> None:
self._fps_menu.clear()
self._fps_group = QActionGroup(self)
self._fps_group.setExclusive(True)
seen: set[int] = set()
for _, _, fps in camera_info.formats:
key = round(fps)
if key in seen:
continue
seen.add(key)
action = QAction(f"{key} fps", self)
action.setCheckable(True)
action.setData(float(fps))
self._fps_group.addAction(action)
self._fps_menu.addAction(action)
action.triggered.connect(self._on_fps_action)
actions = self._fps_group.actions()
if actions:
actions[0].setChecked(True)
# ------------------------------------------------------------------
# Slots
# ------------------------------------------------------------------
def _on_camera_action(self) -> None:
action = self.sender()
if action is None:
return
cam: CameraInfo = action.data()
logger.debug("Camera selected: %s", cam.name)
self.camera_selected.emit(cam)
self._populate_resolutions(cam)
self._populate_fps(cam)
def _on_resolution_action(self) -> None:
action = self.sender()
if action is None:
return
w, h = action.data()
logger.debug("Resolution selected: %dx%d", w, h)
self.resolution_selected.emit(w, h)
def _on_fps_action(self) -> None:
action = self.sender()
if action is None:
return
fps: float = action.data()
logger.debug("FPS selected: %.1f", fps)
self.fps_selected.emit(fps)
def _on_log_toggled(self, enabled: bool) -> None:
level = logging.DEBUG if enabled else logging.WARNING
logging.getLogger().setLevel(level)
self.log_toggled.emit(enabled)

3
duck_preview/__main__.py Normal file
View File

@@ -0,0 +1,3 @@
from duck_preview.app import main
main()

47
duck_preview/app.py Normal file
View File

@@ -0,0 +1,47 @@
from __future__ import annotations
import sys
from PySide6.QtCore import QTimer
from PySide6.QtMultimedia import QVideoSink
from PySide6.QtWidgets import QApplication
from duck_preview.camera.service import CameraService
from duck_preview.dispatcher.frame_dispatcher import FrameDispatcher
from duck_preview.main_window import MainWindow
from duck_preview.rendering.overlay import OverlayWidget
from duck_preview.rendering.video_widget import VideoWidget
from duck_preview.telemetry.collector import TelemetryCollector
def main() -> None:
app = QApplication(sys.argv)
app.setApplicationName("Duck Preview")
camera = CameraService()
video_sink = QVideoSink()
dispatcher = FrameDispatcher()
telemetry = TelemetryCollector()
video_widget = VideoWidget()
overlay = OverlayWidget()
camera.set_video_widget(video_widget)
camera.set_video_sink(video_sink)
video_sink.videoFrameChanged.connect(dispatcher.on_frame)
dispatcher.subscribe(telemetry.on_frame)
window = MainWindow(camera, video_widget, overlay)
camera.error_occurred.connect(lambda msg: overlay.set_metrics({"error": msg}))
metrics_timer = QTimer()
metrics_timer.timeout.connect(lambda: overlay.set_metrics(telemetry.metrics()))
metrics_timer.start(200)
window.show()
sys.exit(app.exec())
if __name__ == "__main__":
main()

View File

@@ -0,0 +1,59 @@
from __future__ import annotations
from PySide6.QtCore import QObject, Signal
from PySide6.QtMultimedia import (
QCamera,
QCameraDevice,
QCameraFormat,
QMediaCaptureSession,
QMediaDevices,
QVideoSink,
)
from PySide6.QtMultimediaWidgets import QVideoWidget
class CameraService(QObject):
error_occurred = Signal(str)
def __init__(self, parent: QObject | None = None) -> None:
super().__init__(parent)
self._session = QMediaCaptureSession()
self._camera: QCamera | None = None
@property
def session(self) -> QMediaCaptureSession:
return self._session
def set_video_widget(self, widget: QVideoWidget) -> None:
self._session.setVideoOutput(widget)
def set_video_sink(self, sink: QVideoSink) -> None:
self._session.setVideoSink(sink)
@staticmethod
def available_cameras() -> list[QCameraDevice]:
return QMediaDevices.videoInputs()
def is_active(self) -> bool:
return self._camera is not None and self._camera.isActive()
def start(self, device: QCameraDevice) -> None:
self.stop()
self._camera = QCamera(device, self)
self._camera.errorOccurred.connect(self._on_error)
self._session.setCamera(self._camera)
self._camera.start()
def stop(self) -> None:
if self._camera is not None:
self._camera.stop()
self._session.setCamera(None)
self._camera.deleteLater()
self._camera = None
def set_camera_format(self, fmt: QCameraFormat) -> None:
if self._camera is not None:
self._camera.setCameraFormat(fmt)
def _on_error(self, error: QCamera.Error, error_string: str) -> None:
self.error_occurred.emit(error_string)

View File

@@ -0,0 +1,22 @@
from __future__ import annotations
from collections.abc import Callable
from PySide6.QtCore import QObject
from PySide6.QtMultimedia import QVideoFrame
class FrameDispatcher(QObject):
def __init__(self, parent: QObject | None = None) -> None:
super().__init__(parent)
self._subscribers: list[Callable[[QVideoFrame], None]] = []
def subscribe(self, callback: Callable[[QVideoFrame], None]) -> None:
self._subscribers.append(callback)
def unsubscribe(self, callback: Callable[[QVideoFrame], None]) -> None:
self._subscribers.remove(callback)
def on_frame(self, frame: QVideoFrame) -> None:
for cb in self._subscribers:
cb(frame)

139
duck_preview/main_window.py Normal file
View File

@@ -0,0 +1,139 @@
from __future__ import annotations
from PySide6.QtCore import QCameraPermission, Qt
from PySide6.QtGui import QAction, QCloseEvent
from PySide6.QtMultimedia import QCameraDevice, QMediaDevices
from PySide6.QtWidgets import QApplication, QGridLayout, QMainWindow, QWidget
from duck_preview.camera.service import CameraService
from duck_preview.rendering.overlay import OverlayWidget
from duck_preview.rendering.video_widget import VideoWidget
class MainWindow(QMainWindow):
def __init__(
self,
camera_service: CameraService,
video_widget: VideoWidget,
overlay_widget: OverlayWidget,
parent: QWidget | None = None,
) -> None:
super().__init__(parent)
self._camera = camera_service
self._video_widget = video_widget
self._overlay = overlay_widget
self._media_devices = QMediaDevices()
self.setWindowTitle("Duck Preview")
self.resize(1280, 720)
central = QWidget()
self.setCentralWidget(central)
layout = QGridLayout(central)
layout.setContentsMargins(0, 0, 0, 0)
layout.setSpacing(0)
layout.addWidget(video_widget, 0, 0)
layout.addWidget(overlay_widget, 0, 0)
self._setup_menus()
self._media_devices.videoInputsChanged.connect(self._rebuild_camera_menu)
def _setup_menus(self) -> None:
menu_bar = self.menuBar()
self._camera_menu = menu_bar.addMenu("Camera")
self._rebuild_camera_menu()
self._resolution_menu = menu_bar.addMenu("Resolution")
self._resolution_menu.setEnabled(False)
self._fps_menu = menu_bar.addMenu("FPS")
self._fps_menu.setEnabled(False)
debug_menu = menu_bar.addMenu("Debug")
toggle_overlay = QAction("Show Metrics", self)
toggle_overlay.setCheckable(True)
toggle_overlay.setChecked(True)
toggle_overlay.triggered.connect(self._overlay.set_visible)
debug_menu.addAction(toggle_overlay)
def _rebuild_camera_menu(self) -> None:
self._camera_menu.clear()
cameras = CameraService.available_cameras()
for device in cameras:
action = QAction(device.description(), self)
action.triggered.connect(lambda checked, d=device: self._on_camera_selected(d))
self._camera_menu.addAction(action)
if not cameras:
action = QAction("No cameras detected", self)
action.setEnabled(False)
self._camera_menu.addAction(action)
def _on_camera_selected(self, device: QCameraDevice) -> None:
self._request_camera_permission(device)
def _request_camera_permission(self, device: QCameraDevice) -> None:
perm = QCameraPermission()
match QApplication.checkPermission(perm):
case Qt.PermissionStatus.Undetermined:
QApplication.requestPermission(
perm, self, lambda: self._request_camera_permission(device)
)
case Qt.PermissionStatus.Denied:
self._overlay.set_metrics(
{
"error": (
"Camera permission denied.\n"
"Grant access in System Settings > "
"Privacy & Security > Camera"
)
}
)
case Qt.PermissionStatus.Granted:
self._camera.start(device)
self._rebuild_resolution_menu(device)
def _rebuild_resolution_menu(self, device: QCameraDevice) -> None:
self._resolution_menu.clear()
self._resolution_menu.setEnabled(True)
formats = device.videoFormats()
seen: set[tuple[int, int]] = set()
for fmt in formats:
res = fmt.resolution()
key = (res.width(), res.height())
if key not in seen:
seen.add(key)
action = QAction(f"{res.width()}x{res.height()}", self)
action.triggered.connect(
lambda checked, d=device, w=res.width(), h=res.height(): (
self._on_resolution_selected(d, w, h)
)
)
self._resolution_menu.addAction(action)
def _on_resolution_selected(self, device: QCameraDevice, width: int, height: int) -> None:
self._rebuild_fps_menu(device, width, height)
def _rebuild_fps_menu(self, device: QCameraDevice, width: int, height: int) -> None:
self._fps_menu.clear()
self._fps_menu.setEnabled(True)
formats = device.videoFormats()
seen_labels: set[str] = set()
for fmt in formats:
res = fmt.resolution()
if res.width() == width and res.height() == height:
min_fps = round(fmt.minFrameRate())
max_fps = round(fmt.maxFrameRate())
label = f"{max_fps} FPS" if min_fps == max_fps else f"{min_fps}-{max_fps} FPS"
if label not in seen_labels:
seen_labels.add(label)
action = QAction(label, self)
action.triggered.connect(
lambda checked, f=fmt: self._camera.set_camera_format(f)
)
self._fps_menu.addAction(action)
def closeEvent(self, event: QCloseEvent) -> None: # noqa: N802
self._camera.stop()
super().closeEvent(event)

View File

@@ -0,0 +1,73 @@
from __future__ import annotations
from PySide6.QtCore import Qt
from PySide6.QtGui import QColor, QFont, QFontMetrics, QPainter
from PySide6.QtWidgets import QWidget
class OverlayWidget(QWidget):
def __init__(self, parent: QWidget | None = None) -> None:
super().__init__(parent)
self.setAttribute(Qt.WA_TransparentForMouseEvents)
self.setAttribute(Qt.WA_NoSystemBackground)
self._visible = True
self._metrics: dict[str, float | int | str] = {}
def set_visible(self, visible: bool) -> None:
self._visible = visible
self.update()
def set_metrics(self, metrics: dict[str, float | int | str]) -> None:
self._metrics = metrics
self.update()
def paintEvent(self, event) -> None: # noqa: N802
if not self._visible or not self._metrics:
return
painter = QPainter(self)
painter.setRenderHint(QPainter.TextAntialiasing)
if "error" in self._metrics:
self._paint_error(painter)
else:
self._paint_metrics(painter)
def _paint_metrics(self, painter: QPainter) -> None:
lines = [
f"FPS: {self._metrics.get('fps', 0):>7}",
f"Frame: {self._metrics.get('frame_time_ms', 0):>7.1f} ms",
f"Frames: {self._metrics.get('frame_count', 0):>7}",
]
font = QFont("monospace", 11)
painter.setFont(font)
fm = QFontMetrics(font)
text_width = max(fm.horizontalAdvance(line) for line in lines) + 24
text_height = len(lines) * (fm.height() + 6) + 12
painter.fillRect(8, 8, text_width, text_height, QColor(0, 0, 0, 160))
painter.setPen(QColor(0, 255, 0))
for i, line in enumerate(lines):
y = 22 + i * (fm.height() + 6)
painter.drawText(16, y, line)
def _paint_error(self, painter: QPainter) -> None:
msg = str(self._metrics.get("error", "Unknown error"))
lines = msg.split("\n")
font = QFont("monospace", 12)
painter.setFont(font)
fm = QFontMetrics(font)
text_width = max(fm.horizontalAdvance(line) for line in lines) + 24
text_height = len(lines) * (fm.height() + 6) + 12
painter.fillRect(8, 8, text_width, text_height, QColor(0, 0, 0, 200))
painter.setPen(QColor(200, 50, 50))
for i, line in enumerate(lines):
y = 22 + i * (fm.height() + 6)
painter.drawText(16, y, line)

View File

@@ -0,0 +1,10 @@
from __future__ import annotations
from PySide6.QtMultimediaWidgets import QVideoWidget
from PySide6.QtWidgets import QWidget
class VideoWidget(QVideoWidget):
def __init__(self, parent: QWidget | None = None) -> None:
super().__init__(parent)
self.setMinimumSize(320, 240)

View File

@@ -0,0 +1,39 @@
from __future__ import annotations
import time
from collections import deque
from PySide6.QtCore import QObject
from PySide6.QtMultimedia import QVideoFrame
class TelemetryCollector(QObject):
def __init__(self, parent: QObject | None = None) -> None:
super().__init__(parent)
self._timestamps: deque[float] = deque(maxlen=500)
def on_frame(self, frame: QVideoFrame) -> None:
self._timestamps.append(time.perf_counter())
def metrics(self) -> dict[str, float | int]:
if not self._timestamps:
return {"fps": 0, "frame_time_ms": 0.0, "frame_count": 0}
now = time.perf_counter()
cutoff = now - 1.0
recent = [t for t in self._timestamps if t >= cutoff]
fps = len(recent)
frame_time_ms = 0.0
if len(self._timestamps) >= 2:
diffs = [
self._timestamps[i] - self._timestamps[i - 1]
for i in range(1, len(self._timestamps))
]
frame_time_ms = (sum(diffs) / len(diffs)) * 1000 if diffs else 0.0
return {
"fps": fps,
"frame_time_ms": round(frame_time_ms, 2),
"frame_count": len(self._timestamps),
}

View File

@@ -1,324 +1,150 @@
# Plan działania — MVP Camera Preview (PySide6)
# MVP Implementation Plan — Duck Preview
## Środowisko
## Stack
| Element | Wartość |
|---|---|
| Python | 3.12.10 (venv: `.venv-win`) |
| Framework GUI | PySide6 6.11.0 |
| Dev platform | Windows 11 |
| Target platform | Mac Mini (Intel i7, macOS Ventura) |
| Kamera docelowa | ELP USB Camera |
| Narzędzia | pytest, ruff, colorama |
- **Language:** Python 3.12
- **GUI/Framework:** PySide6 6.11 (QtMultimedia + QtWidgets)
- **Camera backend:** QCamera + QMediaCaptureSession + QVideoSink (native GPU)
- **Rendering:** QWidget (paintEvent) z QPainter — manual render z QVideoFrame → QImage
- **Testing:** pytest
- **Linting/Formatting:** ruff
## Architecture
```
CameraService
└─ QVideoSink.videoFrame ──→ FrameDispatcher.on_frame
├─ VideoWidget.on_frame (render klatki)
├─ TelemetryCollector.on_frame (timestamp)
└─ [Future AI subscribers]
TelemetryCollector.metrics() ──→ OverlayWidget.set_metrics()
(polled co 200ms przez QTimer w app.py)
```
## Kolejność implementacji
### 0. Project scaffolding
`pyproject.toml` — projekt, zależność PySide6, entry point, ruff config, pytest
`duck_preview/__init__.py`, `__main__.py` — entry point `python -m duck_preview`
Podkatalogi: `camera/`, `dispatcher/`, `rendering/`, `telemetry/`
### 1. CameraService (`camera/service.py`)
- Wrapper na `QCamera` + `QMediaCaptureSession` + `QVideoSink`
- `available_cameras()` — static, zwraca listę `QCameraDevice` z `QMediaDevices`
- `start(device)` / `stop()` — zarządzanie cyklem życia kamery
- `set_camera_format(fmt)` — zmiana rozdzielczości/FPS
- `sink` property — QVideoSink do podpięcia dispatchera
- `error_occurred` Signal — propagacja błędów kamery
### 2. FrameDispatcher (`dispatcher/frame_dispatcher.py`)
- Prosty pub/sub: `subscribe(cb)` / `unsubscribe(cb)`
- `on_frame(frame)` — wołany z sygnału QVideoSink, iteruje wszystkich subskrybentów
- Frame dropping: na razie brak (wszystkie callbacki szybkie)
### 3. TelemetryCollector (`telemetry/collector.py`)
- Subskrybent dispatchera
- Zbiera timestampy (`deque`, maxlen=500)
- `metrics()` → dict: fps (klatki z ostatniej sekundy), frame_time_ms (średnia delta między klatkami), frame_count
- Brak CPU usage (out of scope na MVP)
### 4. VideoWidget (`rendering/video_widget.py`)
- `QWidget` z `WA_OpaquePaintEvent`
- `on_frame(frame)``frame.toImage()` → zapis QImage → `update()`
- `paintEvent` — skalowanie z zachowaniem proporcji, centrowanie, letterboxing
- Brak klatek → wyświetla "No camera feed"
### 5. OverlayWidget (`rendering/overlay.py`)
- Przezroczysty QWidget (`WA_TransparentForMouseEvents`)
- Nakładka na video, rysuje tekst metryk (FPS, frame time, frame count)
- Semi-transparentne tło, zielona czcionka Consolas
- Toggle widoczności przez menu Debug
### 6. MainWindow (`main_window.py`)
- `QMainWindow` z menu barem:
- **Camera** — lista dostępnych kamer (dynamiczna, reaguje na `videoInputsChanged`)
- **Resolution** — dostępne rozdzielczości dla wybranej kamery
- **FPS** — dostępne FPS dla wybranej rozdzielczości
- **Debug** — toggle nakładki
- Central widget: GridLayout z VideoWidget + OverlayWidget (stacked)
### 7. App (`app.py`)
- `main()` — tworzy QApplication, instancjonuje i łączy zależności (ręczne DI)
- Wire: camera.sink → dispatcher.on_frame
- Wire: dispatcher → telemetry.on_frame, video_widget.on_frame
- QTimer 200ms: telemetry.metrics() → overlay.set_metrics()
- `__main__.py``from duck_preview.app import main; main()`
---
## Fazy realizacji
### Faza 0 — Projekt i scaffolding
Cel: ustalenie struktury katalogów i modułów przed napisaniem pierwszej linii logiki.
#### 0.1 Struktura projektu
## DI Wiring (ręczne, w app.py)
```
duck-preview2/
├── app/
app = QApplication(sys.argv)
camera = CameraService()
dispatcher = FrameDispatcher()
telemetry = TelemetryCollector()
video_widget = VideoWidget()
overlay = OverlayWidget()
camera.sink.videoFrame.connect(dispatcher.on_frame)
dispatcher.subscribe(telemetry.on_frame)
dispatcher.subscribe(video_widget.on_frame)
window = MainWindow(camera, video_widget, overlay)
# Poll telemetry co 200ms → overlay
metrics_timer = QTimer()
metrics_timer.timeout.connect(lambda: overlay.set_metrics(telemetry.metrics()))
metrics_timer.start(200)
window.show()
app.exec()
```
---
## Struktura plików
```
duck-preview/
├── pyproject.toml
├── notes/
│ ├── 01-mvp-preview.md
│ └── 01-mvp-plan.md
├── duck_preview/
│ ├── __init__.py
│ ├── main.py # entry point
│ ├── config.py # stałe, domyślne ustawienia
│ ├── __main__.py
│ ├── app.py
│ ├── main_window.py
│ ├── camera/
│ │ ├── __init__.py
│ │ ── camera_service.py # QCamera + QMediaCaptureSession
│ └── camera_enumerator.py # wykrywanie dostępnych kamer
│ ├── pipeline/
│ │ ── service.py
├── dispatcher/
│ │ ├── __init__.py
│ │ └── frame_dispatcher.py # dystrybucja klatek do subskrybentów
│ ├── telemetry/
│ │ └── frame_dispatcher.py
│ ├── rendering/
│ │ ├── __init__.py
│ │ ── telemetry_collector.py # zbieranie metryk FPS/frame time/CPU
── overlay/
│ ├── __init__.py
│ │ └── overlay_widget.py # przezroczysta warstwa QWidget
│ └── ui/
│ │ ── video_widget.py
│ └── overlay.py
└── telemetry/
│ ├── __init__.py
── main_window.py # główne okno aplikacji
│ └── menu_bar.py # menu: kamera, rozdzielczość, FPS, debug
├── tests/
├── __init__.py
── test_camera_enumerator.py
│ └── test_telemetry_collector.py
├── notes/
├── requirements.txt
├── requirements-dev.txt
└── pyproject.toml # konfiguracja ruff + pytest
```
#### 0.2 Pliki konfiguracyjne
- `pyproject.toml` — konfiguracja ruff (linter/formatter) i pytest
- `requirements.txt` — zależności produkcyjne (PySide6)
- `requirements-dev.txt` — zależności deweloperskie (pytest, ruff)
- `.gitignore` — aktualizacja o artefakty Pythona
---
### Faza 1 — Camera Service
Cel: stabilne pobranie obrazu z kamery przez QtMultimedia.
#### 1.1 Camera Enumerator
- `QMediaDevices.videoInputs()` — lista dostępnych kamer
- Zwraca listę `QCameraDevice` z nazwą, id i obsługiwanymi formatami
- Obsługa braku kamer (komunikat, nie crash)
- Test jednostkowy: mockowanie `QMediaDevices`
#### 1.2 Camera Service
- Opakowuje `QCamera` + `QMediaCaptureSession`
- API:
- `start(device: QCameraDevice)` — uruchamia kamerę
- `stop()` — zatrzymuje kamerę
- `set_resolution(width, height)` — ustawia format
- `set_fps(fps)` — ustawia docelowy FPS
- `reconnect()` — restart po błędzie
- `QVideoSink` jako punkt odbioru klatek
- Sygnał `frame_ready(QVideoFrame)` do Frame Dispatcher
- Obsługa błędów kamery (`QCamera.errorOccurred`)
#### 1.3 Uwagi platformowe
| Aspekt | Windows 11 (dev) | macOS Ventura (target) |
|---|---|---|
| Backend | DirectShow / Media Foundation | AVFoundation |
| Kamera ELP | USB, standardowy UVC driver | USB, UVC |
| Format klatek | YUYV / MJPEG | YUYV / MJPEG |
| GPU rendering | ANGLE (OpenGL ES) | Metal |
---
### Faza 2 — Frame Dispatcher
Cel: dystrybucja klatek do wielu odbiorców bez blokowania akwizycji.
#### 2.1 Frame Dispatcher
- Wzorzec: publish-subscribe (lista callbacków)
- `subscribe(callback: Callable[[QVideoFrame], None])`
- `unsubscribe(callback)`
- `dispatch(frame: QVideoFrame)` — wywołuje wszystkich subskrybentów
- Klatki NIE są kopiowane — subskrybenci działają na referencji
- Subskrybenci mogą **pominąć klatkę** (tryb drop-if-busy)
- Wywołanie `dispatch` następuje w wątku GUI (slot połączony z `frame_ready`)
#### 2.2 Subskrybenci w Fazie 1
| Subskrybent | Działanie |
|---|---|
| Video Renderer | przekazuje klatkę do `QVideoSink` / `QVideoWidget` |
| Telemetry Collector | mierzy czas, zlicza klatki |
---
### Faza 3 — Video Renderer
Cel: renderowanie klatki w GUI bez zbędnych kopii.
#### 3.1 Podejście
- `QVideoWidget` jako główny widget podglądu
- `QMediaCaptureSession.setVideoOutput(QVideoWidget)` — ścieżka bezpośrednia, zero kopii
- Alternatywnie: `QVideoSink``QGraphicsVideoItem` dla przyszłych overlayów
- Domyślnie: `QVideoWidget` (prosta, niska latencja)
#### 3.2 Wymagania
- Preview nie blokuje wątku GUI
- Obsługa aspect ratio (letter/pillarbox)
- Resize okna bez migotania
---
### Faza 4 — Telemetry Collector
Cel: dokładne metryki pipeline'u wideo.
#### 4.1 Zbierane metryki
| Metryka | Metoda pomiaru |
|---|---|
| Realtime FPS | licznik klatek / okno 1 s |
| Frame time | `time.perf_counter()` między klatkami |
| Frame acquisition time | timestamp wejście frame_ready → dispatch |
| Rendering time | czas `QVideoWidget.update()` (opcjonalnie) |
| Dropped frames | detekcja przez numerację lub timestamp gap |
| CPU usage | `psutil.cpu_percent()` (dodać do requirements) |
| Memory usage | `psutil.virtual_memory()` (opcjonalnie) |
#### 4.2 API
- `TelemetryCollector` — subskrybent Frame Dispatcher
- `on_frame(frame: QVideoFrame)` — rejestruje timestamp klatki
- `get_snapshot() -> TelemetrySnapshot` — aktualny stan metryk (dataclass)
- `update_interval_ms: int` — jak często odświeżać snapshot (domyślnie 500 ms)
- Sygnał `metrics_updated(TelemetrySnapshot)` — emitowany co `update_interval_ms`
#### 4.3 TelemetrySnapshot (dataclass)
```python
@dataclass
class TelemetrySnapshot:
fps: float
frame_time_ms: float
dropped_frames: int
cpu_percent: float
memory_mb: float | None
timestamp: float
── collector.py
└── tests/
├── __init__.py
├── test_dispatcher.py
── test_collector.py
```
---
### Faza 5 — Overlay System
## Edge cases / uwagi
Cel: wyświetlanie metryk na przezroczystej warstwie nad podglądem.
#### 5.1 Architektura
- `OverlayWidget(QWidget)` — przezroczysty widget (`WA_TransparentForMouseEvents`)
- Pozycjonowany absolutnie nad `QVideoWidget` (ten sam parent, wyższy z-index)
- `paintEvent` rysuje semi-przezroczysty prostokąt + tekst z metrykami
- Połączony z sygnałem `metrics_updated` — odświeża tylko gdy dane się zmienią
#### 5.2 Zawartość overlaya (MVP)
```
FPS: 60.0
Frame: 16.7 ms
Drop: 0
CPU: 12.3 %
```
#### 5.3 Sterowalność
- Widoczność overlaya: toggle przez menu Debug
- Pozycja: lewy górny róg (stała w MVP)
- Kolor tła: `rgba(0, 0, 0, 160)`
---
### Faza 6 — GUI / Main Window
Cel: minimalne, funkcjonalne okno aplikacji.
#### 6.1 MainWindow
- `QMainWindow` z `QVideoWidget` jako central widget
- `OverlayWidget` nałożony na video
- Obsługa resize → reposition overlay
- Tytuł okna: `Duck Preview`
#### 6.2 MenuBar
Menu **Camera**:
- Lista wykrytych kamer (radio-style)
- Separator
- Reconnect
Menu **Video**:
- Resolution submenu (pobierane dynamicznie z `QCameraDevice.videoFormats()`)
- FPS submenu
Menu **Debug**:
- Toggle overlay metryk
- Logowanie do konsoli (toggle)
#### 6.3 Startup flow
```
main.py
→ QApplication
→ CameraEnumerator.list_cameras()
→ MainWindow(cameras)
→ CameraService.start(cameras[0]) # pierwsza kamera lub ELP
→ FrameDispatcher.subscribe(telemetry, renderer)
→ app.exec()
```
---
### Faza 7 — Testy i walidacja
#### 7.1 Testy jednostkowe
| Moduł | Co testować |
|---|---|
| `CameraEnumerator` | lista kamer, brak kamer, format danych |
| `TelemetryCollector` | obliczenia FPS, wykrywanie dropów |
| `FrameDispatcher` | subskrypcja, odsubskrypcja, dispatch |
| `TelemetrySnapshot` | poprawność dataclass |
#### 7.2 Testy manualne (Windows dev)
- [ ] Uruchomienie z kamerą laptopa / USB webcam
- [ ] Przełączanie kamer
- [ ] Zmiana rozdzielczości
- [ ] Zmiana FPS
- [ ] Toggle overlay
- [ ] Reconnect po odłączeniu kamery
#### 7.3 Testy na Mac Mini (target)
- [ ] Wykrycie kamery ELP
- [ ] Poprawny format YUYV/MJPEG
- [ ] Wydajność AVFoundation vs DirectShow
- [ ] GPU rendering przez Metal
#### 7.4 Kryteria sukcesu (z PRD)
- Preview stabilny i płynny
- Latencja renderowania niska
- Dane telemetrii dokładne
- GUI responsywne
- Overlay działa poprawnie
- Architektura gotowa na subskrybentów AI
---
## Kolejność implementacji (sprint order)
```
Sprint 1: Faza 0 — scaffolding, pyproject.toml, requirements
Sprint 2: Faza 1 — CameraEnumerator + CameraService (bez GUI)
Sprint 3: Faza 3 — VideoRenderer + MainWindow (preview działa)
Sprint 4: Faza 2 — FrameDispatcher (refactor pipeline)
Sprint 5: Faza 4 — TelemetryCollector
Sprint 6: Faza 5 — OverlayWidget
Sprint 7: Faza 6 — MenuBar (camera/resolution/fps switch)
Sprint 8: Faza 7 — Testy, poprawki, walidacja na Mac Mini
```
---
## Zależności do dodania
```
# requirements.txt
PySide6>=6.7
psutil>=6.0
# requirements-dev.txt
pytest>=8.0
ruff>=0.4
```
---
## Uwagi cross-platform
1. **ELP camera** — kamera UVC, powinna działać bez dodatkowych sterowników na obu platformach. Sprawdzić obsługiwane rozdzielczości i FPS przez `QCameraDevice.videoFormats()`.
2. **Ścieżki absolutne** — unikać `os.path` na korzyść `pathlib.Path`.
3. **Threading** — wszystkie operacje Qt muszą odbywać się w wątku GUI. `TelemetryCollector` może używać `QTimer` zamiast osobnego wątku.
4. **Format klatek** — na macOS AVFoundation preferuje `BGRA` lub `NV12`. Konwersja powinna być leniwa i tylko gdy potrzebna (nie w hot path renderowania).
5. **High DPI** — włączyć `QApplication.setHighDpiScaleFactorRoundingPolicy` dla konsistencji Windows/Mac.
6. **Testowanie bez kamery**`CameraEnumerator` powinien umożliwiać dependency injection / mock dla środowisk CI.
- Brak kamer → menu Camera pokazuje "No cameras detected"
- Brak klatek → VideoWidget pokazuje ciemne tło + napis
- Błąd kamery → CameraService emituje `error_occurred` (na razie tylko log)
- Zamknięcie okna → `closeEvent``camera.stop()`
- QVideoFrame.toImage() kopiuje dane — akceptowalne dla MVP
- Wszystkie obiekty żyją w main thread — brak problemów z threadingiem

View File

@@ -253,7 +253,6 @@ Architecture must support future additions:
* snapshots,
* streaming,
* remote sinks.
* play video files
Without major redesign.

View File

@@ -1,194 +0,0 @@
# Stan aplikacji — MVP Camera Preview
Data: 2026-05-12
Środowisko dev: Windows 11, Python 3.12.10, PySide6 6.11.0
Środowisko docelowe: Mac Mini (Intel i7, macOS Ventura), kamera ELP USB
---
## Zaimplementowane moduły
### Struktura projektu
```
duck-preview2/
├── app/
│ ├── config.py # stałe i domyślne ustawienia
│ ├── main.py # entry point (python -m app.main)
│ ├── camera/
│ │ ├── camera_enumerator.py # wykrywanie kamer przez QMediaDevices
│ │ └── camera_service.py # zarządzanie QCamera + QMediaCaptureSession
│ ├── pipeline/
│ │ └── frame_dispatcher.py # pub/sub dystrybucja QVideoFrame
│ ├── telemetry/
│ │ └── telemetry_collector.py # pomiar FPS, frame time, CPU, pamięci
│ ├── overlay/
│ │ ├── overlay_layer.py # interfejs IOverlayLayer (ABC)
│ │ └── telemetry_overlay.py # implementacja — box z metrykami
│ └── ui/
│ ├── camera_view.py # CameraView — render + kompozycja overlayów
│ ├── main_window.py # główne okno, wiring komponentów
│ └── menu_bar.py # menu: Camera / Video / Debug
├── tests/
│ ├── test_frame_dispatcher.py # 8 testów jednostkowych
│ └── test_telemetry_collector.py # 7 testów jednostkowych
├── pyproject.toml # konfiguracja pytest + ruff
├── requirements.txt # PySide6, psutil
└── requirements-dev.txt # + pytest, ruff
```
### Opis komponentów
#### CameraEnumerator (`app/camera/camera_enumerator.py`)
- Wykrywa dostępne kamery przez `QMediaDevices.videoInputs()`
- Zwraca listę `CameraInfo` z nazwą, id i listą obsługiwanych formatów (rozdzielczość × FPS)
- Formaty deduplikowane i posortowane (największa rozdzielczość pierwsza)
- Obsługa braku kamer bez crasha
#### CameraService (`app/camera/camera_service.py`)
- Opakowuje `QCamera` + `QMediaCaptureSession` + `QVideoSink`
- API: `start(CameraInfo)`, `stop()`, `reconnect()`, `set_resolution()`, `set_fps()`
- Algorytm doboru formatu: score-based (priorytet: dopasowanie rozdzielczości, potem FPS)
- Sygnały: `frame_ready(QVideoFrame)`, `camera_started`, `camera_stopped`, `camera_error`
#### FrameDispatcher (`app/pipeline/frame_dispatcher.py`)
- Pub/sub: `subscribe(callback, drop_if_busy=True)` / `unsubscribe(callback)`
- `drop_if_busy=True` — klatka pomijana jeśli subskrybent jest zajęty (render)
- `drop_if_busy=False` — każda klatka dociera (telemetria)
- Odporny na wyjątki w subskrybentach (jeden nie blokuje pozostałych)
#### TelemetryCollector (`app/telemetry/telemetry_collector.py`)
- Subskrybuje każdą klatkę (`drop_if_busy=False`)
- Mierzy: FPS (okno 1 s), średni frame time (ring-buffer 120 próbek), dropped frames (heurystyka 2.5× avg)
- CPU: `psutil.Process.cpu_percent()` — tylko nasz proces, inicjalizowany w `__init__` (warmup)
- RAM: `memory_info().wset` (Windows private working set) lub `rss` (macOS/Linux)
- Emituje `metrics_updated(TelemetrySnapshot)` co 500 ms przez `QTimer`
#### IOverlayLayer (`app/overlay/overlay_layer.py`)
- Abstrakcja (ABC) dla pluggable overlayów
- Interface: `paint(painter: QPainter, video_rect: QRect)`, `visible: bool`, `name: str`
- Nowe overlaye nie wymagają modyfikacji żadnego istniejącego kodu
#### TelemetryOverlay (`app/overlay/telemetry_overlay.py`)
- Implementacja `IOverlayLayer`
- Rysuje semi-przezroczysty box z metrykami w lewym górnym rogu video
- Slot `on_metrics_updated(TelemetrySnapshot)` — odbiera dane z TelemetryCollector
- Format: FPS, Frame time, Drop count, CPU %, Mem MB
#### CameraView (`app/ui/camera_view.py`)
- Zwykły `QWidget` (nie `QVideoWidget`) — render przez `QPainter` w `paintEvent`
- Odbiera `QVideoFrame` przez slot `on_frame()`, konwertuje do `QImage.Format_RGB32`
- Letterboxing z zachowaniem aspect ratio
- Rejestr overlayów: `add_overlay_layer()`, `remove_overlay_layer()`, `set_all_overlays_visible()`
- W `paintEvent`: rysuje klatkę → iteruje po warstwach, każda dostaje `painter.save()/restore()`
#### MainWindow (`app/ui/main_window.py`)
- Wires together wszystkie komponenty
- Minimalny status bar (nazwa kamery, błędy)
- `closeEvent``CameraService.stop()`
#### AppMenuBar (`app/ui/menu_bar.py`)
- Menu **Camera**: lista wykrytych kamer (radio), Reconnect
- Menu **Video**: Resolution submenu, FPS submenu (pobierane z `QCameraDevice.videoFormats()`)
- Menu **Debug**: Show Overlay (toggle), Console Logging (toggle poziom logowania)
---
## Co działało na Windows 11 (dev)
- Wykrycie wbudowanej kamery laptopa
- Podgląd w czasie rzeczywistym
- Przełączanie rozdzielczości i FPS z menu
- Overlay z metrykami widoczny i aktualizowany
- Toggle overlay przez Debug menu
- Logowanie do konsoli przez Debug menu
- Reconnect po zmianie kamery
- Letterboxing przy resize okna
---
## Co próbowaliśmy i nie wyszło
### QVideoWidget jako renderer (porzucone)
**Podejście:** Użycie `QVideoWidget` jako centralnego widgetu + nałożenie `OverlayWidget` (child lub sibling).
**Problem:** Na Windows `QVideoWidget` tworzy natywne okno HWND z powierzchnią D3D (Media Foundation backend). Natywna powierzchnia jest rysowana poza hierarchią Qt i zawsze przykrywa wszystkie `QWidget` — niezależnie od:
- kolejności z-order (`raise_()`)
- rodzica (child/sibling)
- flag okna
- `WA_TranslucentBackground` / `WA_NoSystemBackground`
Żaden `QWidget` nie może się wyświetlić nad `QVideoWidget` na Windows.
**Próby ratowania:**
1. `OverlayWidget` jako child `QVideoWidget` — zasłonięty przez natywną powierzchnię
2. Kontener `QWidget` z `QVideoWidget` i `OverlayWidget` jako rodzeństwo — nadal zasłonięty
3. `setWindowFlags(FramelessWindowHint)` na child widget — odrywa widget od rodzica, tworzy osobne (niewidoczne) okno top-level
**Rozwiązanie:** Porzucenie `QVideoWidget`. Własny `CameraView(QWidget)` odbiera klatki przez `QVideoSink`, konwertuje do `QImage` i rysuje przez `QPainter`. Overlay w tym samym `paintEvent` — brak konfliktu z natywnym renderowaniem.
### OverlayWidget jako osobny QWidget (porzucone)
**Podejście:** `OverlayWidget` z `WA_TransparentForMouseEvents` i `WA_TranslucentBackground` nałożony na video.
**Problem:** Poza konfliktem z `QVideoWidget`, `WA_TranslucentBackground` na child widget działa tylko gdy rodzic też jest transparentny — Qt nie komponuje dziecka z tłem rodzica innym niż własne. W praktyce overlay był niewidoczny lub zasłaniał video czarnym prostokątem.
---
## Znane ograniczenia i uwagi
### Pomiar CPU
`psutil.Process.cpu_percent()` zwraca procent **względem jednego rdzenia** (np. 50% = pół rdzenia). Na wielordzeniowym procesorze 15% w Task Managerze (uśrednione po rdzeniach) może odpowiadać 50% na jednym rdzeniu. To nie jest błąd — to różna metodologia. Task Manager pokazuje `total_cpu / num_cores`, psutil pokazuje `cpu_time / wall_time`.
Jeśli potrzebny jest widok "jak Task Manager": `process.cpu_percent() / psutil.cpu_count()`.
### Pomiar RAM
`memory_info().wset` (Windows) = Private Working Set = to co Task Manager pokazuje w kolumnie "Pamięć". RSS zawiera też współdzielone biblioteki (Qt DLLs ~40 MB) i dlatego było zawyżone. Na macOS używane jest `rss` (tam `wset` nie istnieje).
### Wydajność konwersji klatek
`QVideoFrame.toImage()` + `convertToFormat(RGB32)` wykonuje się na CPU. Przy 1080p60 to koszt ~1-3 ms/klatkę. Dla MVP akceptowalny. Przy przyszłej integracji YOLO warto rozważyć bezpośredni dostęp do danych przez `QVideoFrame.map()` i przekazywanie raw bufora do GPU.
### Brak testów integracyjnych
Testy jednostkowe pokrywają `FrameDispatcher` i `TelemetryCollector` w izolacji (bez Qt event loop). Brak testów `CameraView`, `CameraService` i `MainWindow` — wymagałyby `QApplication` i mockowania urządzeń.
---
## Stan zgodności z PRD (01-mvp-preview.md)
| Wymaganie | Status |
|---|---|
| Realtime camera preview | Zaimplementowane |
| Camera switching | Zaimplementowane |
| Resolution selection | Zaimplementowane |
| FPS selection | Zaimplementowane |
| Reconnect/restart | Zaimplementowane |
| Realtime FPS metric | Zaimplementowane |
| Frame time metric | Zaimplementowane |
| Dropped frames detection | Zaimplementowane (heurystyka) |
| CPU usage metric | Zaimplementowane |
| Memory usage metric | Zaimplementowane |
| Overlay system | Zaimplementowane (IOverlayLayer) |
| Performance metrics display | Zaimplementowane (TelemetryOverlay) |
| Minimal GUI | Zaimplementowane |
| Camera menu | Zaimplementowane |
| Resolution/FPS menu | Zaimplementowane |
| Debug/telemetry options | Zaimplementowane |
| Architecture ready for AI subscribers | Zaimplementowane (IOverlayLayer + FrameDispatcher) |
| Low latency preview | Zaimplementowane (drop-if-busy w dispatcher) |
| Non-blocking GUI thread | Zaimplementowane |
---
## Następne kroki (poza MVP)
- Walidacja na Mac Mini z kamerą ELP (AVFoundation backend, macOS Ventura)
- Snapshot / recording
- Integracja YOLO: `YoloBboxOverlay(IOverlayLayer)` + worker w osobnym procesie
- Integracja OCR
- Optymalizacja konwersji klatek (QVideoFrame.map() → numpy → GPU)
- Testy integracyjne z QApplication + mock camera

212
notes/02-mvp-mac-plan.md Normal file
View File

@@ -0,0 +1,212 @@
# Plan — macOS fix + QVideoWidget + QVideoSink dual output
## Cel
Przywrócić działanie kamery na macOS (Elgato) przez:
1. Przejście z manualnego `QPainter` renderowania na natywny `QVideoWidget`
2. Dodanie `QVideoSink` jako drugiego wyjścia (frame access dla telemetrii)
3. Obsługa `QCameraPermission` (Qt 6.5+)
4. Dokumentacja packagingu przez `pyside6-deploy` na macOS
## Kolejność implementacji
### 1. camera/service.py
```python
class CameraService(QObject):
error_occurred = Signal(str)
def __init__(self, parent=None):
super().__init__(parent)
self._session = QMediaCaptureSession()
self._camera: QCamera | None = None
def set_video_widget(self, widget: QVideoWidget) -> None:
self._session.setVideoOutput(widget)
def set_video_sink(self, sink: QVideoSink) -> None:
self._session.setVideoSink(sink)
@property
def session(self) -> QMediaCaptureSession:
return self._session
@staticmethod
def available_cameras() -> list[QCameraDevice]:
return QMediaDevices.videoInputs()
def is_active(self) -> bool:
return self._camera is not None and self._camera.isActive()
def start(self, device: QCameraDevice) -> None:
self.stop()
self._camera = QCamera(device, self)
self._camera.errorOccurred.connect(self._on_error)
self._session.setCamera(self._camera)
self._camera.start()
def stop(self) -> None:
if self._camera is not None:
self._camera.stop()
self._session.setCamera(None)
self._camera.deleteLater()
self._camera = None
def set_camera_format(self, fmt: QCameraFormat) -> None:
if self._camera is not None:
self._camera.setCameraFormat(fmt)
def _on_error(self, error, error_string):
self.error_occurred.emit(error_string)
```
Zmiany:
- Usunięto `self._sink` z `__init__`
- Usunięto `self._session.setVideoOutput(self._sink)`
- Dodano `set_video_widget(widget)``session.setVideoOutput(widget)`
- Dodano `set_video_sink(sink)``session.setVideoSink(sink)`
### 2. rendering/video_widget.py
```python
from __future__ import annotations
from PySide6.QtMultimediaWidgets import QVideoWidget
class VideoWidget(QVideoWidget):
def __init__(self, parent=None):
super().__init__(parent)
self.setMinimumSize(320, 240)
```
Zmiany:
- Dziedziczy po `QVideoWidget` zamiast `QWidget`
- Usunięto `on_frame(frame)` — nie potrzeba, renderowanie natywne
- Usunięto `paintEvent` — nie potrzeba, robi to Qt
- Usunięto `QImage`, `QPainter`, `SmoothPixmapTransform`
### 3. rendering/overlay.py
```python
font = QFont("monospace", 11) # zamiast "Consolas"
```
Dodatkowo: obsługa błędu w `paintEvent`:
- Jeśli `self._metrics` ma klucz `"error"` → pomiń normalne metryki, narysuj czerwony komunikat błędu
- Użyj `QColor(180, 40, 40)` zamiast zielonego
### 4. app.py — nowe DI wiring
```python
def main():
app = QApplication(sys.argv)
app.setApplicationName("Duck Preview")
camera = CameraService()
video_sink = QVideoSink()
dispatcher = FrameDispatcher()
telemetry = TelemetryCollector()
video_widget = VideoWidget()
overlay = OverlayWidget()
camera.set_video_widget(video_widget)
camera.set_video_sink(video_sink)
video_sink.videoFrameChanged.connect(dispatcher.on_frame)
dispatcher.subscribe(telemetry.on_frame)
window = MainWindow(camera, video_widget, overlay)
# Wire error → overlay
camera.error_occurred.connect(
lambda msg: overlay.set_metrics({"error": msg})
)
# Poll telemetry → overlay
metrics_timer = QTimer()
metrics_timer.timeout.connect(
lambda: overlay.set_metrics(telemetry.metrics())
)
metrics_timer.start(200)
window.show()
sys.exit(app.exec())
```
Zmiany:
- Tworzy `QVideoSink` jawnie
- `camera.set_video_widget(video_widget)` + `camera.set_video_sink(video_sink)`
- `video_sink.videoFrameChanged → dispatcher.on_frame`
- Usunięto `camera.sink` (property nie istnieje)
- `camera.error_occurred → overlay.set_metrics({"error": msg})`
### 5. main_window.py — permission flow
```python
# Importy
from PySide6.QtCore import QCameraPermission, Qt
from PySide6.QtWidgets import QApplication
# W _on_camera_selected:
def _on_camera_selected(self, device: QCameraDevice) -> None:
self._request_camera_permission(device)
def _request_camera_permission(self, device: QCameraDevice) -> None:
perm = QCameraPermission()
match QApplication.checkPermission(perm):
case Qt.PermissionStatus.Undetermined:
QApplication.requestPermission(perm, self,
lambda: self._request_camera_permission(device))
case Qt.PermissionStatus.Denied:
self._overlay.set_metrics({
"error": "Camera permission denied.\n"
"Grant access in System Settings > "
"Privacy & Security > Camera"
})
case Qt.PermissionStatus.Granted:
self._camera.start(device)
self._rebuild_resolution_menu(device)
```
### 6. pyside6-deploy.toml
```toml
[app]
script = "duck_preview/__main__.py"
name = "Duck Preview"
bundle_identifier = "com.bartool.duck-preview"
categories = "public.app-category.photography"
platforms = ["macos"]
```
### 7. Po zmianach
- `ruff check .` — czysty
- `ruff format .` — sformatowany
- `pytest -q` — 5 passed
---
## Podsumowanie architektury końcowej
```
CameraService
├─ session.setVideoOutput(VideoWidget) → natywne GPU rendering
└─ session.setVideoSink(VideoSink) → frame access
└─ videoFrameChanged
→ FrameDispatcher
├─ TelemetryCollector
└─ [future AI]
MainWindow
├─ _on_camera_selected
│ → QCameraPermission check (Undetermined/Denied/Granted)
│ → CameraService.start(device)
├─ error_occurred → OverlayWidget.set_metrics({"error": ...})
└─ QTimer 200ms → TelemetryCollector.metrics() → OverlayWidget
OverlayWidget
├─ normal: zielone metryki (FPS, frame time, frame count)
└─ error: czerwony komunikat (np. brak permisji)
```

126
notes/02-mvp-mac.md Normal file
View File

@@ -0,0 +1,126 @@
# macOS — Camera on macOS with PySide6
## Problem
Aplikacja uruchomiona w interpreted mode (`python -m duck_preview`) na macOS z kamerą Elgato nie wyświetla obrazu.
## Przyczyna
Oficjalny przykład PySide6 (`camera.qml` / examples/multimedia) zawiera kod:
```python
if sys.platform == "darwin":
is_nuitka = "__compiled__" in globals()
if not is_nuitka and sys.platform == "darwin":
print("This example does not work on macOS when Python is run "
"in interpreted mode. For this example to work on macOS, "
"package the example using pyside6-deploy")
sys.exit(0)
```
**macOS → AVFoundation → wymaga spakowania przez Nuitka/pyside6-deploy.**
`QCamera` na macOS potrzebuje properly bundled app structure (Info.plist, entitlements, code signing). W interpreted mode Python nie ma tego contextu.
## Rozwiązanie
Pakować aplikację przez `pyside6-deploy` (Nuitka) na macOS. Na Windows działa bez pakowania.
---
## Qt Permission API (`QCameraPermission`)
Od Qt 6.5+ dostępne jest `QCameraPermission` — nowoczesne API do proszenia o zgodę kamery.
### API
```python
from PySide6.QtCore import QCameraPermission, Qt
from PySide6.QtWidgets import QApplication
perm = QCameraPermission()
match QApplication.checkPermission(perm):
case Qt.PermissionStatus.Undetermined:
# Prosimy o zgodę → callback wywoła init ponownie
QApplication.requestPermission(perm, parent, callback)
return
case Qt.PermissionStatus.Denied:
# overlay: "Camera permission denied"
case Qt.PermissionStatus.Granted:
# uruchom kamerę
```
### Permission flow
```
User → wybiera kamerę w menu
→ MainWindow._on_camera_selected(device)
→ checkPermission(QCameraPermission)
├─ Undetermined → requestPermission(camera, parent, callback)
│ └─ callback → _on_camera_selected ponownie
├─ Denied → overlay: "Camera permission denied. Grant access in System Settings > Privacy & Security > Camera"
└─ Granted → CameraService.start(device)
```
---
## QVideoWidget + QVideoSink — dual output
PySide6 6.11 ma osobne metody w `QMediaCaptureSession`:
| Metoda | Przeznaczenie |
|--------|---------------|
| `setVideoOutput(QVideoWidget*)` | Natywne renderowanie (GPU) |
| `setVideoSink(QVideoSink*)` | Dostęp do klatek (telemetria, AI) |
Oba działają **równolegle** — nie ma konfliktu.
```
QMediaCaptureSession
├─ setVideoOutput(QVideoWidget) → natywne renderowanie
└─ setVideoSink(QVideoSink) → frame access
└─ videoFrameChanged → FrameDispatcher
```
QVideoSink dostarcza sygnał `videoFrameChanged(QVideoFrame)` — na to podpina się FrameDispatcher, a ten rozsyła do TelemetryCollector i przyszłych AI subscriberów.
---
## macOS — packaging
### pyside6-deploy
```bash
pip install nuitka
pyside6-deploy duck_preview/__main__.py --name "Duck Preview"
```
### pyside6-deploy.toml
```toml
[app]
script = "duck_preview/__main__.py"
name = "Duck Preview"
bundle_identifier = "com.bartool.duck-preview"
categories = "public.app-category.photography"
platforms = ["macos"]
```
Po spakowaniu powstaje `Duck Preview.app` — standalone bundle z dostępem do AVFoundation.
### Windows
Na Windows działa bez pakowania — `python -m duck_preview` w venv.
---
## Font fallback
`Consolas` nie istnieje na macOS. Używać `"monospace"` (generic font family — Qt mapuje na Menlo na macOS, Consolas na Windows).
## Error state w Overlay
Overlay wspiera wyświetlanie błędów (np. brak permisji):
- Normalny stan → zielone metryki (FPS, frame times)
- Error state → czerwony komunikat
- Przełączanie przez `overlay.set_metrics({"error": "message"})`

View File

@@ -1,43 +1,21 @@
[project]
name = "duck-preview"
version = "0.1.0"
description = "Realtime camera preview application with telemetry"
description = "Realtime camera preview application with performance monitoring"
requires-python = ">=3.12"
dependencies = [
"PySide6>=6.7",
"psutil>=6.0",
"PySide6>=6.11",
]
[project.scripts]
duck-preview = "app.main:main"
[tool.pytest.ini_options]
testpaths = ["tests"]
python_files = ["test_*.py"]
python_functions = ["test_*"]
addopts = "-v"
duck-preview = "duck_preview.app:main"
[tool.ruff]
target-version = "py312"
line-length = 100
[tool.ruff.lint]
select = [
"E", # pycodestyle errors
"W", # pycodestyle warnings
"F", # pyflakes
"I", # isort
"UP", # pyupgrade
"B", # flake8-bugbear
"N", # pep8-naming
]
ignore = [
"B008", # do not perform function calls in default arguments
]
select = ["E", "F", "I", "N", "W"]
[tool.ruff.lint.isort]
known-first-party = ["app"]
[tool.ruff.format]
quote-style = "double"
indent-style = "space"
[tool.pytest.ini_options]
testpaths = ["tests"]

6
pyside6-deploy.toml Normal file
View File

@@ -0,0 +1,6 @@
[app]
script = "duck_preview/__main__.py"
name = "Duck Preview"
bundle_identifier = "com.bartool.duck-preview"
categories = "public.app-category.photography"
platforms = ["macos"]

View File

@@ -1,3 +0,0 @@
-r requirements.txt
pytest>=8.0
ruff>=0.4

View File

@@ -1,2 +0,0 @@
PySide6>=6.7
psutil>=6.0

22
tests/test_collector.py Normal file
View File

@@ -0,0 +1,22 @@
import time
from unittest.mock import MagicMock
from duck_preview.telemetry.collector import TelemetryCollector
def test_metrics_empty():
collector = TelemetryCollector()
metrics = collector.metrics()
assert metrics == {"fps": 0, "frame_time_ms": 0.0, "frame_count": 0}
def test_metrics_after_frames():
collector = TelemetryCollector()
mock = MagicMock()
for _ in range(30):
collector.on_frame(mock)
time.sleep(0.002)
metrics = collector.metrics()
assert metrics["frame_count"] >= 30
assert metrics["frame_time_ms"] > 0

47
tests/test_dispatcher.py Normal file
View File

@@ -0,0 +1,47 @@
from duck_preview.dispatcher.frame_dispatcher import FrameDispatcher
def test_subscribe_notified():
dispatcher = FrameDispatcher()
received = []
def cb(frame):
received.append(frame)
dispatcher.subscribe(cb)
dispatcher.on_frame("test")
assert received == ["test"]
def test_unsubscribe_not_notified():
dispatcher = FrameDispatcher()
received = []
def cb(frame):
received.append(frame)
dispatcher.subscribe(cb)
dispatcher.unsubscribe(cb)
dispatcher.on_frame("test")
assert received == []
def test_multiple_subscribers():
dispatcher = FrameDispatcher()
received_1 = []
received_2 = []
def cb1(frame):
received_1.append(frame)
def cb2(frame):
received_2.append(frame)
dispatcher.subscribe(cb1)
dispatcher.subscribe(cb2)
dispatcher.on_frame("test")
assert received_1 == ["test"]
assert received_2 == ["test"]

View File

@@ -1,76 +0,0 @@
"""Tests for FrameDispatcher."""
from __future__ import annotations
from unittest.mock import MagicMock, patch
from app.pipeline.frame_dispatcher import FrameDispatcher
def _make_frame():
frame = MagicMock()
frame.isValid.return_value = True
return frame
class TestFrameDispatcher:
def setup_method(self):
# FrameDispatcher is a QObject — needs QApplication
# Use minimal mock to avoid Qt dependency in unit tests
with patch("app.pipeline.frame_dispatcher.QObject.__init__", return_value=None):
self.dispatcher = FrameDispatcher.__new__(FrameDispatcher)
self.dispatcher._subscribers = []
self.dispatcher._frame_count = 0
self.dispatcher._last_dispatch_time = 0.0
def test_subscribe_adds_subscriber(self):
cb = MagicMock()
self.dispatcher.subscribe(cb)
assert self.dispatcher.subscriber_count() == 1
def test_subscribe_same_callback_twice_is_noop(self):
cb = MagicMock()
self.dispatcher.subscribe(cb)
self.dispatcher.subscribe(cb)
assert self.dispatcher.subscriber_count() == 1
def test_unsubscribe_removes_subscriber(self):
cb = MagicMock()
self.dispatcher.subscribe(cb)
self.dispatcher.unsubscribe(cb)
assert self.dispatcher.subscriber_count() == 0
def test_unsubscribe_nonexistent_does_not_raise(self):
cb = MagicMock()
self.dispatcher.unsubscribe(cb) # should not raise
def test_dispatch_calls_all_subscribers(self):
cb1 = MagicMock()
cb2 = MagicMock()
self.dispatcher.subscribe(cb1)
self.dispatcher.subscribe(cb2)
frame = _make_frame()
self.dispatcher.dispatch(frame)
cb1.assert_called_once_with(frame)
cb2.assert_called_once_with(frame)
def test_dispatch_increments_frame_count(self):
frame = _make_frame()
self.dispatcher.dispatch(frame)
self.dispatcher.dispatch(frame)
assert self.dispatcher.frame_count == 2
def test_dispatch_with_no_subscribers_does_not_raise(self):
frame = _make_frame()
self.dispatcher.dispatch(frame) # should not raise
def test_subscriber_exception_does_not_stop_others(self):
def bad_cb(f):
raise RuntimeError("boom")
good_cb = MagicMock()
self.dispatcher.subscribe(bad_cb)
self.dispatcher.subscribe(good_cb)
frame = _make_frame()
self.dispatcher.dispatch(frame) # should not raise
good_cb.assert_called_once_with(frame)

View File

@@ -1,139 +0,0 @@
"""Tests for TelemetryCollector."""
from __future__ import annotations
import time
from collections import deque
from unittest.mock import MagicMock, patch
class TestTelemetryCollector:
"""Test telemetry calculations in isolation (no Qt event loop required)."""
def _make_collector(self, cpu_count: int = 8):
"""Construct a TelemetryCollector bypassing Qt machinery."""
from app.telemetry.telemetry_collector import TelemetryCollector
with patch.object(TelemetryCollector, "__init__", return_value=None):
col = TelemetryCollector.__new__(TelemetryCollector)
col._frame_times = deque(maxlen=120)
col._last_frame_time = 0.0
col._total_frames = 0
col._dropped_frames = 0
col._fps_window = deque()
col._fps_window_size_s = 1.0
col._target_fps = None
col._cpu_count = cpu_count
col._process = MagicMock()
# Simulate Windows: wset takes priority over rss
mem_info = MagicMock()
mem_info.wset = 50 * 1024 * 1024 # 50 MB private working set
mem_info.rss = 70 * 1024 * 1024 # RSS (larger, includes shared)
col._process.memory_info.return_value = mem_info
col._process.cpu_percent.return_value = 0.0
return col
def test_initial_snapshot_has_zero_fps(self):
col = self._make_collector()
snap = col._compute_snapshot()
assert snap.fps == 0.0
def test_fps_counts_frames_in_window(self):
col = self._make_collector()
now = time.perf_counter()
for i in range(30):
col._fps_window.append(now - 0.9 + i * 0.03)
snap = col._compute_snapshot()
assert snap.fps == 30.0
def test_fps_excludes_old_frames(self):
col = self._make_collector()
now = time.perf_counter()
# 10 frames older than 1 second — should not count
for i in range(10):
col._fps_window.append(now - 2.0 + i * 0.05)
# 5 frames within the last second
for i in range(5):
col._fps_window.append(now - 0.4 + i * 0.05)
snap = col._compute_snapshot()
assert snap.fps == 5.0
def test_frame_time_average(self):
col = self._make_collector()
interval = 0.0333
for _ in range(10):
col._frame_times.append(interval)
snap = col._compute_snapshot()
assert abs(snap.frame_time_ms - interval * 1000) < 0.1
def test_drop_detection(self):
col = self._make_collector()
normal_interval = 0.016
now = time.perf_counter()
col._last_frame_time = now - normal_interval * 10
for _ in range(9):
col._last_frame_time += normal_interval
col._frame_times.append(normal_interval)
col._last_frame_time = now
with patch("app.telemetry.telemetry_collector.time") as mock_time:
col._last_frame_time = now
big_delta = normal_interval * 5 # 5× average → drop
mock_time.perf_counter.return_value = now + big_delta
delta = big_delta
col._frame_times.append(delta)
avg = sum(col._frame_times) / len(col._frame_times)
if delta > avg * 2.5:
col._dropped_frames += 1
assert col._dropped_frames == 1
def test_reset_counters(self):
col = self._make_collector()
col._total_frames = 100
col._dropped_frames = 5
col._frame_times.append(0.016)
col._fps_window.append(time.perf_counter())
col.reset_counters()
assert col._total_frames == 0
assert col._dropped_frames == 0
assert len(col._frame_times) == 0
assert len(col._fps_window) == 0
def test_snapshot_memory_mb(self):
col = self._make_collector()
snap = col._compute_snapshot()
assert snap.memory_mb == 50.0
def test_cpu_sys_is_core_divided_by_cpu_count(self):
col = self._make_collector(cpu_count=8)
col._process.cpu_percent.return_value = 80.0 # 80% of one core
snap = col._compute_snapshot()
assert snap.cpu_percent_core == 80.0
assert snap.cpu_percent_sys == round(80.0 / 8, 1)
def test_cpu_sys_never_exceeds_100_on_single_core_machine(self):
col = self._make_collector(cpu_count=1)
col._process.cpu_percent.return_value = 95.0
snap = col._compute_snapshot()
assert snap.cpu_percent_sys == snap.cpu_percent_core == 95.0
def test_cpu_sys_le_cpu_core(self):
"""cpu_percent_sys must always be <= cpu_percent_core."""
col = self._make_collector(cpu_count=4)
col._process.cpu_percent.return_value = 150.0
snap = col._compute_snapshot()
assert snap.cpu_percent_sys <= snap.cpu_percent_core
def test_target_fps_none_by_default(self):
col = self._make_collector()
snap = col._compute_snapshot()
assert snap.target_fps is None
def test_set_target_fps_reflected_in_snapshot(self):
col = self._make_collector()
col.set_target_fps(60.0)
snap = col._compute_snapshot()
assert snap.target_fps == 60.0