Implement initial structure and core functionality for Duck Preview application

This commit is contained in:
2026-05-12 07:10:11 +02:00
parent be85d7ca31
commit 58fff52d31
18 changed files with 610 additions and 0 deletions

0
duck_preview/__init__.py Normal file
View File

3
duck_preview/__main__.py Normal file
View File

@@ -0,0 +1,3 @@
from duck_preview.app import main
main()

42
duck_preview/app.py Normal file
View File

@@ -0,0 +1,42 @@
from __future__ import annotations
import sys
from PySide6.QtCore import QTimer
from PySide6.QtWidgets import QApplication
from duck_preview.camera.service import CameraService
from duck_preview.dispatcher.frame_dispatcher import FrameDispatcher
from duck_preview.main_window import MainWindow
from duck_preview.rendering.overlay import OverlayWidget
from duck_preview.rendering.video_widget import VideoWidget
from duck_preview.telemetry.collector import TelemetryCollector
def main() -> None:
app = QApplication(sys.argv)
app.setApplicationName("Duck Preview")
camera = CameraService()
dispatcher = FrameDispatcher()
telemetry = TelemetryCollector()
video_widget = VideoWidget()
overlay = OverlayWidget()
camera.sink.videoFrameChanged.connect(dispatcher.on_frame)
dispatcher.subscribe(telemetry.on_frame)
dispatcher.subscribe(video_widget.on_frame)
window = MainWindow(camera, video_widget, overlay)
metrics_timer = QTimer()
metrics_timer.timeout.connect(lambda: overlay.set_metrics(telemetry.metrics()))
metrics_timer.start(200)
window.show()
sys.exit(app.exec())
if __name__ == "__main__":
main()

View File

View File

@@ -0,0 +1,58 @@
from __future__ import annotations
from PySide6.QtCore import QObject, Signal
from PySide6.QtMultimedia import (
QCamera,
QCameraDevice,
QCameraFormat,
QMediaCaptureSession,
QMediaDevices,
QVideoSink,
)
class CameraService(QObject):
error_occurred = Signal(str)
def __init__(self, parent: QObject | None = None) -> None:
super().__init__(parent)
self._session = QMediaCaptureSession()
self._sink = QVideoSink()
self._session.setVideoOutput(self._sink)
self._camera: QCamera | None = None
@property
def sink(self) -> QVideoSink:
return self._sink
@property
def session(self) -> QMediaCaptureSession:
return self._session
@staticmethod
def available_cameras() -> list[QCameraDevice]:
return QMediaDevices.videoInputs()
def is_active(self) -> bool:
return self._camera is not None and self._camera.isActive()
def start(self, device: QCameraDevice) -> None:
self.stop()
self._camera = QCamera(device, self)
self._camera.errorOccurred.connect(self._on_error)
self._session.setCamera(self._camera)
self._camera.start()
def stop(self) -> None:
if self._camera is not None:
self._camera.stop()
self._session.setCamera(None)
self._camera.deleteLater()
self._camera = None
def set_camera_format(self, fmt: QCameraFormat) -> None:
if self._camera is not None:
self._camera.setCameraFormat(fmt)
def _on_error(self, error: QCamera.Error, error_string: str) -> None:
self.error_occurred.emit(error_string)

View File

View File

@@ -0,0 +1,22 @@
from __future__ import annotations
from collections.abc import Callable
from PySide6.QtCore import QObject
from PySide6.QtMultimedia import QVideoFrame
class FrameDispatcher(QObject):
def __init__(self, parent: QObject | None = None) -> None:
super().__init__(parent)
self._subscribers: list[Callable[[QVideoFrame], None]] = []
def subscribe(self, callback: Callable[[QVideoFrame], None]) -> None:
self._subscribers.append(callback)
def unsubscribe(self, callback: Callable[[QVideoFrame], None]) -> None:
self._subscribers.remove(callback)
def on_frame(self, frame: QVideoFrame) -> None:
for cb in self._subscribers:
cb(frame)

118
duck_preview/main_window.py Normal file
View File

@@ -0,0 +1,118 @@
from __future__ import annotations
from PySide6.QtGui import QAction, QCloseEvent
from PySide6.QtMultimedia import QCameraDevice, QMediaDevices
from PySide6.QtWidgets import QGridLayout, QMainWindow, QWidget
from duck_preview.camera.service import CameraService
from duck_preview.rendering.overlay import OverlayWidget
from duck_preview.rendering.video_widget import VideoWidget
class MainWindow(QMainWindow):
def __init__(
self,
camera_service: CameraService,
video_widget: VideoWidget,
overlay_widget: OverlayWidget,
parent: QWidget | None = None,
) -> None:
super().__init__(parent)
self._camera = camera_service
self._video_widget = video_widget
self._overlay = overlay_widget
self._media_devices = QMediaDevices()
self.setWindowTitle("Duck Preview")
self.resize(1280, 720)
central = QWidget()
self.setCentralWidget(central)
layout = QGridLayout(central)
layout.setContentsMargins(0, 0, 0, 0)
layout.setSpacing(0)
layout.addWidget(video_widget, 0, 0)
layout.addWidget(overlay_widget, 0, 0)
self._setup_menus()
self._media_devices.videoInputsChanged.connect(self._rebuild_camera_menu)
def _setup_menus(self) -> None:
menu_bar = self.menuBar()
self._camera_menu = menu_bar.addMenu("Camera")
self._rebuild_camera_menu()
self._resolution_menu = menu_bar.addMenu("Resolution")
self._resolution_menu.setEnabled(False)
self._fps_menu = menu_bar.addMenu("FPS")
self._fps_menu.setEnabled(False)
debug_menu = menu_bar.addMenu("Debug")
toggle_overlay = QAction("Show Metrics", self)
toggle_overlay.setCheckable(True)
toggle_overlay.setChecked(True)
toggle_overlay.triggered.connect(self._overlay.set_visible)
debug_menu.addAction(toggle_overlay)
def _rebuild_camera_menu(self) -> None:
self._camera_menu.clear()
cameras = CameraService.available_cameras()
for device in cameras:
action = QAction(device.description(), self)
action.triggered.connect(lambda checked, d=device: self._on_camera_selected(d))
self._camera_menu.addAction(action)
if not cameras:
action = QAction("No cameras detected", self)
action.setEnabled(False)
self._camera_menu.addAction(action)
def _on_camera_selected(self, device: QCameraDevice) -> None:
self._camera.start(device)
self._rebuild_resolution_menu(device)
def _rebuild_resolution_menu(self, device: QCameraDevice) -> None:
self._resolution_menu.clear()
self._resolution_menu.setEnabled(True)
formats = device.videoFormats()
seen: set[tuple[int, int]] = set()
for fmt in formats:
res = fmt.resolution()
key = (res.width(), res.height())
if key not in seen:
seen.add(key)
action = QAction(f"{res.width()}x{res.height()}", self)
action.triggered.connect(
lambda checked, d=device, w=res.width(), h=res.height(): (
self._on_resolution_selected(d, w, h)
)
)
self._resolution_menu.addAction(action)
def _on_resolution_selected(self, device: QCameraDevice, width: int, height: int) -> None:
self._rebuild_fps_menu(device, width, height)
def _rebuild_fps_menu(self, device: QCameraDevice, width: int, height: int) -> None:
self._fps_menu.clear()
self._fps_menu.setEnabled(True)
formats = device.videoFormats()
seen_labels: set[str] = set()
for fmt in formats:
res = fmt.resolution()
if res.width() == width and res.height() == height:
min_fps = round(fmt.minFrameRate())
max_fps = round(fmt.maxFrameRate())
label = f"{max_fps} FPS" if min_fps == max_fps else f"{min_fps}-{max_fps} FPS"
if label not in seen_labels:
seen_labels.add(label)
action = QAction(label, self)
action.triggered.connect(
lambda checked, f=fmt: self._camera.set_camera_format(f)
)
self._fps_menu.addAction(action)
def closeEvent(self, event: QCloseEvent) -> None: # noqa: N802
self._camera.stop()
super().closeEvent(event)

View File

View File

@@ -0,0 +1,49 @@
from __future__ import annotations
from PySide6.QtCore import Qt
from PySide6.QtGui import QColor, QFont, QFontMetrics, QPainter
from PySide6.QtWidgets import QWidget
class OverlayWidget(QWidget):
def __init__(self, parent: QWidget | None = None) -> None:
super().__init__(parent)
self.setAttribute(Qt.WA_TransparentForMouseEvents)
self.setAttribute(Qt.WA_NoSystemBackground)
self._visible = True
self._metrics: dict[str, float | int] = {}
def set_visible(self, visible: bool) -> None:
self._visible = visible
self.update()
def set_metrics(self, metrics: dict[str, float | int]) -> None:
self._metrics = metrics
self.update()
def paintEvent(self, event) -> None: # noqa: N802
if not self._visible or not self._metrics:
return
lines = [
f"FPS: {self._metrics.get('fps', 0):>7}",
f"Frame: {self._metrics.get('frame_time_ms', 0):>7.1f} ms",
f"Frames: {self._metrics.get('frame_count', 0):>7}",
]
painter = QPainter(self)
painter.setRenderHint(QPainter.TextAntialiasing)
font = QFont("Consolas", 11)
painter.setFont(font)
fm = QFontMetrics(font)
text_width = max(fm.horizontalAdvance(line) for line in lines) + 24
text_height = len(lines) * (fm.height() + 6) + 12
painter.fillRect(8, 8, text_width, text_height, QColor(0, 0, 0, 160))
painter.setPen(QColor(0, 255, 0))
for i, line in enumerate(lines):
y = 22 + i * (fm.height() + 6)
painter.drawText(16, y, line)

View File

@@ -0,0 +1,39 @@
from __future__ import annotations
from PySide6.QtCore import Qt
from PySide6.QtGui import QColor, QImage, QPainter
from PySide6.QtMultimedia import QVideoFrame
from PySide6.QtWidgets import QWidget
class VideoWidget(QWidget):
def __init__(self, parent: QWidget | None = None) -> None:
super().__init__(parent)
self._frame: QImage | None = None
self.setAttribute(Qt.WA_OpaquePaintEvent)
self.setMinimumSize(320, 240)
def on_frame(self, frame: QVideoFrame) -> None:
image = frame.toImage()
if not image.isNull():
self._frame = image
self.update()
def paintEvent(self, event) -> None: # noqa: N802
painter = QPainter(self)
painter.setRenderHint(QPainter.SmoothPixmapTransform)
if self._frame is not None:
painter.fillRect(self.rect(), QColor(0, 0, 0))
scaled = self._frame.scaled(
self.size(),
Qt.KeepAspectRatio,
Qt.SmoothTransformation,
)
x = (self.width() - scaled.width()) // 2
y = (self.height() - scaled.height()) // 2
painter.drawImage(x, y, scaled)
else:
painter.fillRect(self.rect(), QColor(20, 20, 20))
painter.setPen(QColor(100, 100, 100))
painter.drawText(self.rect(), Qt.AlignCenter, "No camera feed")

View File

View File

@@ -0,0 +1,39 @@
from __future__ import annotations
import time
from collections import deque
from PySide6.QtCore import QObject
from PySide6.QtMultimedia import QVideoFrame
class TelemetryCollector(QObject):
def __init__(self, parent: QObject | None = None) -> None:
super().__init__(parent)
self._timestamps: deque[float] = deque(maxlen=500)
def on_frame(self, frame: QVideoFrame) -> None:
self._timestamps.append(time.perf_counter())
def metrics(self) -> dict[str, float | int]:
if not self._timestamps:
return {"fps": 0, "frame_time_ms": 0.0, "frame_count": 0}
now = time.perf_counter()
cutoff = now - 1.0
recent = [t for t in self._timestamps if t >= cutoff]
fps = len(recent)
frame_time_ms = 0.0
if len(self._timestamps) >= 2:
diffs = [
self._timestamps[i] - self._timestamps[i - 1]
for i in range(1, len(self._timestamps))
]
frame_time_ms = (sum(diffs) / len(diffs)) * 1000 if diffs else 0.0
return {
"fps": fps,
"frame_time_ms": round(frame_time_ms, 2),
"frame_count": len(self._timestamps),
}

150
notes/01-mvp-plan.md Normal file
View File

@@ -0,0 +1,150 @@
# MVP Implementation Plan — Duck Preview
## Stack
- **Language:** Python 3.12
- **GUI/Framework:** PySide6 6.11 (QtMultimedia + QtWidgets)
- **Camera backend:** QCamera + QMediaCaptureSession + QVideoSink (native GPU)
- **Rendering:** QWidget (paintEvent) z QPainter — manual render z QVideoFrame → QImage
- **Testing:** pytest
- **Linting/Formatting:** ruff
## Architecture
```
CameraService
└─ QVideoSink.videoFrame ──→ FrameDispatcher.on_frame
├─ VideoWidget.on_frame (render klatki)
├─ TelemetryCollector.on_frame (timestamp)
└─ [Future AI subscribers]
TelemetryCollector.metrics() ──→ OverlayWidget.set_metrics()
(polled co 200ms przez QTimer w app.py)
```
## Kolejność implementacji
### 0. Project scaffolding
`pyproject.toml` — projekt, zależność PySide6, entry point, ruff config, pytest
`duck_preview/__init__.py`, `__main__.py` — entry point `python -m duck_preview`
Podkatalogi: `camera/`, `dispatcher/`, `rendering/`, `telemetry/`
### 1. CameraService (`camera/service.py`)
- Wrapper na `QCamera` + `QMediaCaptureSession` + `QVideoSink`
- `available_cameras()` — static, zwraca listę `QCameraDevice` z `QMediaDevices`
- `start(device)` / `stop()` — zarządzanie cyklem życia kamery
- `set_camera_format(fmt)` — zmiana rozdzielczości/FPS
- `sink` property — QVideoSink do podpięcia dispatchera
- `error_occurred` Signal — propagacja błędów kamery
### 2. FrameDispatcher (`dispatcher/frame_dispatcher.py`)
- Prosty pub/sub: `subscribe(cb)` / `unsubscribe(cb)`
- `on_frame(frame)` — wołany z sygnału QVideoSink, iteruje wszystkich subskrybentów
- Frame dropping: na razie brak (wszystkie callbacki szybkie)
### 3. TelemetryCollector (`telemetry/collector.py`)
- Subskrybent dispatchera
- Zbiera timestampy (`deque`, maxlen=500)
- `metrics()` → dict: fps (klatki z ostatniej sekundy), frame_time_ms (średnia delta między klatkami), frame_count
- Brak CPU usage (out of scope na MVP)
### 4. VideoWidget (`rendering/video_widget.py`)
- `QWidget` z `WA_OpaquePaintEvent`
- `on_frame(frame)``frame.toImage()` → zapis QImage → `update()`
- `paintEvent` — skalowanie z zachowaniem proporcji, centrowanie, letterboxing
- Brak klatek → wyświetla "No camera feed"
### 5. OverlayWidget (`rendering/overlay.py`)
- Przezroczysty QWidget (`WA_TransparentForMouseEvents`)
- Nakładka na video, rysuje tekst metryk (FPS, frame time, frame count)
- Semi-transparentne tło, zielona czcionka Consolas
- Toggle widoczności przez menu Debug
### 6. MainWindow (`main_window.py`)
- `QMainWindow` z menu barem:
- **Camera** — lista dostępnych kamer (dynamiczna, reaguje na `videoInputsChanged`)
- **Resolution** — dostępne rozdzielczości dla wybranej kamery
- **FPS** — dostępne FPS dla wybranej rozdzielczości
- **Debug** — toggle nakładki
- Central widget: GridLayout z VideoWidget + OverlayWidget (stacked)
### 7. App (`app.py`)
- `main()` — tworzy QApplication, instancjonuje i łączy zależności (ręczne DI)
- Wire: camera.sink → dispatcher.on_frame
- Wire: dispatcher → telemetry.on_frame, video_widget.on_frame
- QTimer 200ms: telemetry.metrics() → overlay.set_metrics()
- `__main__.py``from duck_preview.app import main; main()`
---
## DI Wiring (ręczne, w app.py)
```
app = QApplication(sys.argv)
camera = CameraService()
dispatcher = FrameDispatcher()
telemetry = TelemetryCollector()
video_widget = VideoWidget()
overlay = OverlayWidget()
camera.sink.videoFrame.connect(dispatcher.on_frame)
dispatcher.subscribe(telemetry.on_frame)
dispatcher.subscribe(video_widget.on_frame)
window = MainWindow(camera, video_widget, overlay)
# Poll telemetry co 200ms → overlay
metrics_timer = QTimer()
metrics_timer.timeout.connect(lambda: overlay.set_metrics(telemetry.metrics()))
metrics_timer.start(200)
window.show()
app.exec()
```
---
## Struktura plików
```
duck-preview/
├── pyproject.toml
├── notes/
│ ├── 01-mvp-preview.md
│ └── 01-mvp-plan.md
├── duck_preview/
│ ├── __init__.py
│ ├── __main__.py
│ ├── app.py
│ ├── main_window.py
│ ├── camera/
│ │ ├── __init__.py
│ │ └── service.py
│ ├── dispatcher/
│ │ ├── __init__.py
│ │ └── frame_dispatcher.py
│ ├── rendering/
│ │ ├── __init__.py
│ │ ├── video_widget.py
│ │ └── overlay.py
│ └── telemetry/
│ ├── __init__.py
│ └── collector.py
└── tests/
├── __init__.py
├── test_dispatcher.py
└── test_collector.py
```
---
## Edge cases / uwagi
- Brak kamer → menu Camera pokazuje "No cameras detected"
- Brak klatek → VideoWidget pokazuje ciemne tło + napis
- Błąd kamery → CameraService emituje `error_occurred` (na razie tylko log)
- Zamknięcie okna → `closeEvent``camera.stop()`
- QVideoFrame.toImage() kopiuje dane — akceptowalne dla MVP
- Wszystkie obiekty żyją w main thread — brak problemów z threadingiem

21
pyproject.toml Normal file
View File

@@ -0,0 +1,21 @@
[project]
name = "duck-preview"
version = "0.1.0"
description = "Realtime camera preview application with performance monitoring"
requires-python = ">=3.12"
dependencies = [
"PySide6>=6.11",
]
[project.scripts]
duck-preview = "duck_preview.app:main"
[tool.ruff]
target-version = "py312"
line-length = 100
[tool.ruff.lint]
select = ["E", "F", "I", "N", "W"]
[tool.pytest.ini_options]
testpaths = ["tests"]

0
tests/__init__.py Normal file
View File

22
tests/test_collector.py Normal file
View File

@@ -0,0 +1,22 @@
import time
from unittest.mock import MagicMock
from duck_preview.telemetry.collector import TelemetryCollector
def test_metrics_empty():
collector = TelemetryCollector()
metrics = collector.metrics()
assert metrics == {"fps": 0, "frame_time_ms": 0.0, "frame_count": 0}
def test_metrics_after_frames():
collector = TelemetryCollector()
mock = MagicMock()
for _ in range(30):
collector.on_frame(mock)
time.sleep(0.002)
metrics = collector.metrics()
assert metrics["frame_count"] >= 30
assert metrics["frame_time_ms"] > 0

47
tests/test_dispatcher.py Normal file
View File

@@ -0,0 +1,47 @@
from duck_preview.dispatcher.frame_dispatcher import FrameDispatcher
def test_subscribe_notified():
dispatcher = FrameDispatcher()
received = []
def cb(frame):
received.append(frame)
dispatcher.subscribe(cb)
dispatcher.on_frame("test")
assert received == ["test"]
def test_unsubscribe_not_notified():
dispatcher = FrameDispatcher()
received = []
def cb(frame):
received.append(frame)
dispatcher.subscribe(cb)
dispatcher.unsubscribe(cb)
dispatcher.on_frame("test")
assert received == []
def test_multiple_subscribers():
dispatcher = FrameDispatcher()
received_1 = []
received_2 = []
def cb1(frame):
received_1.append(frame)
def cb2(frame):
received_2.append(frame)
dispatcher.subscribe(cb1)
dispatcher.subscribe(cb2)
dispatcher.on_frame("test")
assert received_1 == ["test"]
assert received_2 == ["test"]