Compare commits

...

2 Commits

6 changed files with 379 additions and 85 deletions

View File

@@ -27,12 +27,15 @@ class CameraService(QObject):
camera_started() — camera successfully opened and streaming
camera_stopped() — camera stopped (clean shutdown)
camera_error(str) — camera error description
format_changed(float) — actual FPS after format was applied
(emitted after camera restarts with new format)
"""
frame_ready = Signal(QVideoFrame)
camera_started = Signal()
camera_stopped = Signal()
camera_error = Signal(str)
format_changed = Signal(float) # actual FPS delivered by camera after format change
def __init__(self, parent: QObject | None = None) -> None:
super().__init__(parent)
@@ -42,6 +45,11 @@ class CameraService(QObject):
self._sink = QVideoSink(self)
self._current_info: CameraInfo | None = None
# Desired format — applied on next (re)start
self._desired_width: int = DEFAULT_WIDTH
self._desired_height: int = DEFAULT_HEIGHT
self._desired_fps: float = float(DEFAULT_FPS)
self._session.setVideoSink(self._sink)
self._sink.videoFrameChanged.connect(self._on_frame)
@@ -51,7 +59,7 @@ class CameraService(QObject):
def start(self, camera_info: CameraInfo) -> None:
"""Start streaming from the given camera device."""
self.stop()
self._stop_camera()
self._current_info = camera_info
self._camera = QCamera(camera_info.device, self)
@@ -59,22 +67,17 @@ class CameraService(QObject):
self._camera.activeChanged.connect(self._on_active_changed)
self._session.setCamera(self._camera)
self._apply_best_format(camera_info)
self._apply_format()
self._camera.start()
logger.info("Camera start requested: %s", camera_info.name)
def stop(self) -> None:
"""Stop the current camera."""
if self._camera is not None:
self._camera.stop()
self._camera.errorOccurred.disconnect()
self._camera.activeChanged.disconnect()
self._camera = None
self._current_info = None
logger.info("Camera stopped")
"""Stop the current camera and forget the device."""
self._stop_camera()
self._current_info = None
def reconnect(self) -> None:
"""Restart the current camera after an error or disconnect."""
"""Restart the current camera (e.g. after an error or disconnect)."""
if self._current_info is not None:
logger.info("Reconnecting camera: %s", self._current_info.name)
self.start(self._current_info)
@@ -82,19 +85,29 @@ class CameraService(QObject):
logger.warning("Reconnect requested but no camera was previously started")
def set_resolution(self, width: int, height: int) -> None:
"""Request a specific resolution. Effective on next start() if camera is active."""
if self._camera is None:
return
self._set_format(width, height, fps=None)
"""
Request a new resolution.
The camera is stopped and restarted so the backend reliably applies
the new format (QCamera.setCameraFormat on an active camera is often
silently ignored by Media Foundation on Windows).
"""
self._desired_width = width
self._desired_height = height
if self._current_info is not None:
logger.info("Resolution change requested: %dx%d — restarting camera", width, height)
self.start(self._current_info)
def set_fps(self, fps: float) -> None:
"""Request a specific frame rate."""
if self._camera is None or self._current_info is None:
return
# Get current resolution from active format
fmt = self._camera.cameraFormat()
res = fmt.resolution()
self._set_format(res.width(), res.height(), fps=fps)
"""
Request a new frame rate.
Same stop+start strategy as set_resolution().
"""
self._desired_fps = fps
if self._current_info is not None:
logger.info("FPS change requested: %.1f — restarting camera", fps)
self.start(self._current_info)
@property
def is_active(self) -> bool:
@@ -105,28 +118,37 @@ class CameraService(QObject):
return self._current_info
# ------------------------------------------------------------------
# Video output accessor for direct QVideoWidget connection
# Internal video output accessors (kept for future use)
# ------------------------------------------------------------------
def video_sink(self) -> QVideoSink:
"""Return the internal QVideoSink (used by VideoRenderer)."""
return self._sink
def capture_session(self) -> QMediaCaptureSession:
"""Return the capture session (can be connected to QVideoWidget directly)."""
return self._session
# ------------------------------------------------------------------
# Private helpers
# ------------------------------------------------------------------
def _apply_best_format(self, info: CameraInfo) -> None:
"""Pick the best matching format: prefer DEFAULT_WIDTH x DEFAULT_HEIGHT at DEFAULT_FPS."""
if not info.formats:
return
self._set_format(DEFAULT_WIDTH, DEFAULT_HEIGHT, fps=float(DEFAULT_FPS))
def _stop_camera(self) -> None:
"""Stop and destroy the QCamera object without clearing _current_info."""
if self._camera is not None:
self._camera.stop()
self._camera.errorOccurred.disconnect()
self._camera.activeChanged.disconnect()
self._camera = None
logger.debug("Camera stopped (internal)")
def _set_format(self, width: int, height: int, fps: float | None) -> None:
def _apply_format(self) -> None:
"""
Select the best matching QCameraFormat and apply it before start().
The format is chosen by score:
+1000 exact resolution match
+100 exact FPS match (within 1 fps)
-|Δpixels| area proximity (tie-breaker)
"""
if self._camera is None or self._current_info is None:
return
@@ -138,11 +160,11 @@ class CameraService(QObject):
w, h = res.width(), res.height()
f = fmt.maxFrameRate()
res_match = int(w == width and h == height) * 1000
fps_match = int(fps is not None and abs(f - fps) < 1) * 100
area_score = -(abs(w * h - width * height))
score = res_match + fps_match + area_score
score = (
int(w == self._desired_width and h == self._desired_height) * 1000
+ int(abs(f - self._desired_fps) < 1) * 100
- abs(w * h - self._desired_width * self._desired_height)
)
if score > best_score:
best_score = score
best = fmt
@@ -151,12 +173,29 @@ class CameraService(QObject):
self._camera.setCameraFormat(best)
res = best.resolution()
logger.info(
"Camera format set: %dx%d @ %.1f fps",
res.width(),
res.height(),
best.maxFrameRate(),
"Camera format requested: %dx%d @ %.1f fps",
res.width(), res.height(), best.maxFrameRate(),
)
def _log_actual_format(self) -> None:
"""Log the format the camera actually started with and emit format_changed."""
if self._camera is None:
return
fmt = self._camera.cameraFormat()
res = fmt.resolution()
actual_fps = fmt.maxFrameRate()
logger.info(
"Camera format ACTUAL: %dx%d @ %.1f fps",
res.width(), res.height(), actual_fps,
)
if actual_fps != self._desired_fps:
logger.warning(
"Requested %.1f fps but camera is delivering %.1f fps "
"(camera may not support this combination)",
self._desired_fps, actual_fps,
)
self.format_changed.emit(actual_fps)
def _on_frame(self, frame: QVideoFrame) -> None:
if frame.isValid():
self.frame_ready.emit(frame)
@@ -167,7 +206,9 @@ class CameraService(QObject):
def _on_active_changed(self, active: bool) -> None:
if active:
logger.info("Camera active: %s", self._current_info.name if self._current_info else "?")
name = self._current_info.name if self._current_info else "?"
logger.info("Camera active: %s", name)
self._log_actual_format() # report what the camera actually accepted
self.camera_started.emit()
else:
logger.info("Camera inactive")

View File

@@ -24,6 +24,15 @@ class TelemetryOverlay(IOverlayLayer):
overlay = TelemetryOverlay()
camera_view.add_overlay_layer(overlay)
telemetry_collector.metrics_updated.connect(overlay.on_metrics_updated)
Display format:
FPS req 60.0 ← what was requested from camera
FPS got 30.2 ← what camera actually delivered
Frame 33.1 ms
Drop 0
CPU sys 14.8 % ← normalised by cpu_count (matches Task Manager)
CPU core 118.4 % ← per single core (can exceed 100%)
Mem 68 MB
"""
def __init__(self) -> None:
@@ -60,7 +69,6 @@ class TelemetryOverlay(IOverlayLayer):
box_w = max_width + OVERLAY_PADDING * 2
box_h = line_height * len(lines) + OVERLAY_PADDING * 2
# Position relative to the actual video area, not the full widget
x = video_rect.left() + OVERLAY_MARGIN
y = video_rect.top() + OVERLAY_MARGIN
@@ -83,12 +91,19 @@ class TelemetryOverlay(IOverlayLayer):
@staticmethod
def _format_lines(snap: TelemetrySnapshot) -> list[str]:
lines = [
f"FPS {snap.fps:>6.1f}",
f"Frame {snap.frame_time_ms:>6.1f} ms",
f"Drop {snap.dropped_frames:>6d}",
f"CPU {snap.cpu_percent:>5.1f} %",
]
lines: list[str] = []
# FPS — show target if known, then actual
if snap.target_fps is not None:
lines.append(f"FPS req {snap.target_fps:>6.1f}")
lines.append(f"FPS got {snap.fps:>6.1f}")
lines.append(f"Frame {snap.frame_time_ms:>6.1f} ms")
lines.append(f"Drop {snap.dropped_frames:>6d}")
lines.append(f"CPU sys {snap.cpu_percent_sys:>5.1f} %")
lines.append(f"CPU core {snap.cpu_percent_core:>5.1f} %")
if snap.memory_mb is not None:
lines.append(f"Mem {snap.memory_mb:>5.0f} MB")
lines.append(f"Mem {snap.memory_mb:>5.0f} MB")
return lines

View File

@@ -17,10 +17,13 @@ from app.config import TELEMETRY_UPDATE_INTERVAL_MS
class TelemetrySnapshot:
"""Immutable snapshot of current performance metrics."""
fps: float
fps: float # actual frames received in the last second
target_fps: float | None # FPS requested from the camera (None = unknown)
frame_time_ms: float # average inter-frame time in ms
dropped_frames: int # cumulative dropped frames detected
cpu_percent: float # this process CPU usage (0100, all cores)
cpu_percent_sys: float # process CPU as % of total system capacity
# (divided by cpu_count) — matches Task Manager
cpu_percent_core: float # process CPU per single core — can exceed 100%
memory_mb: float | None # process private working set in MB
timestamp: float # time.perf_counter() when snapshot was taken
@@ -32,6 +35,9 @@ class TelemetryCollector(QObject):
Connect to FrameDispatcher:
dispatcher.subscribe(collector.on_frame, drop_if_busy=False)
Receive target FPS updates from CameraService:
camera_service.format_changed.connect(collector.set_target_fps)
Listen to metrics updates:
collector.metrics_updated.connect(my_slot)
"""
@@ -46,6 +52,7 @@ class TelemetryCollector(QObject):
super().__init__(parent)
self._update_interval_ms = update_interval_ms
self._target_fps: float | None = None
# frame timing ring-buffer (last 120 samples)
self._frame_times: deque[float] = deque(maxlen=120)
@@ -54,12 +61,13 @@ class TelemetryCollector(QObject):
self._dropped_frames: int = 0
# FPS window — count frames in the last second
self._fps_window: deque[float] = deque() # timestamps of recent frames
self._fps_window: deque[float] = deque()
self._fps_window_size_s: float = 1.0
# psutil process reference — call cpu_percent once to initialise the baseline
# psutil — initialise baseline so first real reading is non-zero
self._process = psutil.Process()
self._process.cpu_percent() # first call always returns 0.0; discard it
self._process.cpu_percent() # first call always returns 0.0; discard
self._cpu_count: int = max(psutil.cpu_count(logical=True) or 1, 1)
# periodic snapshot timer
self._timer = QTimer(self)
@@ -67,9 +75,16 @@ class TelemetryCollector(QObject):
self._timer.timeout.connect(self._emit_snapshot)
self._timer.start()
# latest snapshot (available synchronously)
self._latest: TelemetrySnapshot = self._make_empty_snapshot()
# ------------------------------------------------------------------
# Configuration
# ------------------------------------------------------------------
def set_target_fps(self, fps: float | None) -> None:
"""Record the FPS that was requested from the camera."""
self._target_fps = fps
# ------------------------------------------------------------------
# Frame subscriber callback
# ------------------------------------------------------------------
@@ -78,12 +93,11 @@ class TelemetryCollector(QObject):
"""Called by FrameDispatcher for every frame. Must be fast."""
now = time.perf_counter()
# inter-frame time
if self._last_frame_time > 0:
delta = now - self._last_frame_time
self._frame_times.append(delta)
# drop detection: if delta > 2.5× the rolling average, count as drop
# drop detection: gap > 2.5× rolling average
if len(self._frame_times) >= 5:
avg = sum(self._frame_times) / len(self._frame_times)
if delta > avg * 2.5:
@@ -92,9 +106,7 @@ class TelemetryCollector(QObject):
self._last_frame_time = now
self._total_frames += 1
# FPS window
self._fps_window.append(now)
# prune old entries
cutoff = now - self._fps_window_size_s
while self._fps_window and self._fps_window[0] < cutoff:
self._fps_window.popleft()
@@ -104,7 +116,6 @@ class TelemetryCollector(QObject):
# ------------------------------------------------------------------
def latest_snapshot(self) -> TelemetrySnapshot:
"""Return the most recently computed snapshot."""
return self._latest
def reset_counters(self) -> None:
@@ -131,46 +142,49 @@ class TelemetryCollector(QObject):
cutoff = now - self._fps_window_size_s
while self._fps_window and self._fps_window[0] < cutoff:
self._fps_window.popleft()
fps = float(len(self._fps_window)) # frames in the last second
fps = float(len(self._fps_window))
# average frame time
if self._frame_times:
avg_frame_time_ms = (sum(self._frame_times) / len(self._frame_times)) * 1000.0
else:
avg_frame_time_ms = 0.0
avg_frame_time_ms = (
(sum(self._frame_times) / len(self._frame_times)) * 1000.0
if self._frame_times
else 0.0
)
# CPU — this process only, cumulative since last call (non-blocking)
# CPU — per-core reading, then derive system-normalised value
try:
cpu = self._process.cpu_percent()
cpu_core = self._process.cpu_percent()
except Exception:
cpu = 0.0
cpu_core = 0.0
cpu_sys = cpu_core / self._cpu_count
# Memory — private working set (Windows) or RSS (macOS/Linux)
# This excludes shared DLLs/frameworks and matches Task Manager "Private"
try:
mem_info = self._process.memory_info()
# wset = Windows Working Set (private); rss on macOS/Linux
mem_bytes = getattr(mem_info, "wset", None) or mem_info.rss
mem_mb = mem_bytes / (1024 * 1024)
mem_mb: float | None = mem_bytes / (1024 * 1024)
except Exception:
mem_mb = None
return TelemetrySnapshot(
fps=round(fps, 1),
target_fps=self._target_fps,
frame_time_ms=round(avg_frame_time_ms, 2),
dropped_frames=self._dropped_frames,
cpu_percent=round(cpu, 1),
cpu_percent_sys=round(cpu_sys, 1),
cpu_percent_core=round(cpu_core, 1),
memory_mb=round(mem_mb, 1) if mem_mb is not None else None,
timestamp=now,
)
@staticmethod
def _make_empty_snapshot() -> TelemetrySnapshot:
def _make_empty_snapshot(self) -> TelemetrySnapshot:
return TelemetrySnapshot(
fps=0.0,
target_fps=self._target_fps,
frame_time_ms=0.0,
dropped_frames=0,
cpu_percent=0.0,
cpu_percent_sys=0.0,
cpu_percent_core=0.0,
memory_mb=None,
timestamp=time.perf_counter(),
)

View File

@@ -120,6 +120,9 @@ class MainWindow(QMainWindow):
# TelemetryCollector → TelemetryOverlay (data only, no repaint trigger here)
self._telemetry.metrics_updated.connect(self._telemetry_overlay.on_metrics_updated)
# CameraService → TelemetryCollector: keep target FPS in sync
self._camera_service.format_changed.connect(self._telemetry.set_target_fps)
# CameraService status
self._camera_service.camera_started.connect(self._on_camera_started)
self._camera_service.camera_stopped.connect(self._on_camera_stopped)

194
notes/02-mvp-app.md Normal file
View File

@@ -0,0 +1,194 @@
# Stan aplikacji — MVP Camera Preview
Data: 2026-05-12
Środowisko dev: Windows 11, Python 3.12.10, PySide6 6.11.0
Środowisko docelowe: Mac Mini (Intel i7, macOS Ventura), kamera ELP USB
---
## Zaimplementowane moduły
### Struktura projektu
```
duck-preview2/
├── app/
│ ├── config.py # stałe i domyślne ustawienia
│ ├── main.py # entry point (python -m app.main)
│ ├── camera/
│ │ ├── camera_enumerator.py # wykrywanie kamer przez QMediaDevices
│ │ └── camera_service.py # zarządzanie QCamera + QMediaCaptureSession
│ ├── pipeline/
│ │ └── frame_dispatcher.py # pub/sub dystrybucja QVideoFrame
│ ├── telemetry/
│ │ └── telemetry_collector.py # pomiar FPS, frame time, CPU, pamięci
│ ├── overlay/
│ │ ├── overlay_layer.py # interfejs IOverlayLayer (ABC)
│ │ └── telemetry_overlay.py # implementacja — box z metrykami
│ └── ui/
│ ├── camera_view.py # CameraView — render + kompozycja overlayów
│ ├── main_window.py # główne okno, wiring komponentów
│ └── menu_bar.py # menu: Camera / Video / Debug
├── tests/
│ ├── test_frame_dispatcher.py # 8 testów jednostkowych
│ └── test_telemetry_collector.py # 7 testów jednostkowych
├── pyproject.toml # konfiguracja pytest + ruff
├── requirements.txt # PySide6, psutil
└── requirements-dev.txt # + pytest, ruff
```
### Opis komponentów
#### CameraEnumerator (`app/camera/camera_enumerator.py`)
- Wykrywa dostępne kamery przez `QMediaDevices.videoInputs()`
- Zwraca listę `CameraInfo` z nazwą, id i listą obsługiwanych formatów (rozdzielczość × FPS)
- Formaty deduplikowane i posortowane (największa rozdzielczość pierwsza)
- Obsługa braku kamer bez crasha
#### CameraService (`app/camera/camera_service.py`)
- Opakowuje `QCamera` + `QMediaCaptureSession` + `QVideoSink`
- API: `start(CameraInfo)`, `stop()`, `reconnect()`, `set_resolution()`, `set_fps()`
- Algorytm doboru formatu: score-based (priorytet: dopasowanie rozdzielczości, potem FPS)
- Sygnały: `frame_ready(QVideoFrame)`, `camera_started`, `camera_stopped`, `camera_error`
#### FrameDispatcher (`app/pipeline/frame_dispatcher.py`)
- Pub/sub: `subscribe(callback, drop_if_busy=True)` / `unsubscribe(callback)`
- `drop_if_busy=True` — klatka pomijana jeśli subskrybent jest zajęty (render)
- `drop_if_busy=False` — każda klatka dociera (telemetria)
- Odporny na wyjątki w subskrybentach (jeden nie blokuje pozostałych)
#### TelemetryCollector (`app/telemetry/telemetry_collector.py`)
- Subskrybuje każdą klatkę (`drop_if_busy=False`)
- Mierzy: FPS (okno 1 s), średni frame time (ring-buffer 120 próbek), dropped frames (heurystyka 2.5× avg)
- CPU: `psutil.Process.cpu_percent()` — tylko nasz proces, inicjalizowany w `__init__` (warmup)
- RAM: `memory_info().wset` (Windows private working set) lub `rss` (macOS/Linux)
- Emituje `metrics_updated(TelemetrySnapshot)` co 500 ms przez `QTimer`
#### IOverlayLayer (`app/overlay/overlay_layer.py`)
- Abstrakcja (ABC) dla pluggable overlayów
- Interface: `paint(painter: QPainter, video_rect: QRect)`, `visible: bool`, `name: str`
- Nowe overlaye nie wymagają modyfikacji żadnego istniejącego kodu
#### TelemetryOverlay (`app/overlay/telemetry_overlay.py`)
- Implementacja `IOverlayLayer`
- Rysuje semi-przezroczysty box z metrykami w lewym górnym rogu video
- Slot `on_metrics_updated(TelemetrySnapshot)` — odbiera dane z TelemetryCollector
- Format: FPS, Frame time, Drop count, CPU %, Mem MB
#### CameraView (`app/ui/camera_view.py`)
- Zwykły `QWidget` (nie `QVideoWidget`) — render przez `QPainter` w `paintEvent`
- Odbiera `QVideoFrame` przez slot `on_frame()`, konwertuje do `QImage.Format_RGB32`
- Letterboxing z zachowaniem aspect ratio
- Rejestr overlayów: `add_overlay_layer()`, `remove_overlay_layer()`, `set_all_overlays_visible()`
- W `paintEvent`: rysuje klatkę → iteruje po warstwach, każda dostaje `painter.save()/restore()`
#### MainWindow (`app/ui/main_window.py`)
- Wires together wszystkie komponenty
- Minimalny status bar (nazwa kamery, błędy)
- `closeEvent``CameraService.stop()`
#### AppMenuBar (`app/ui/menu_bar.py`)
- Menu **Camera**: lista wykrytych kamer (radio), Reconnect
- Menu **Video**: Resolution submenu, FPS submenu (pobierane z `QCameraDevice.videoFormats()`)
- Menu **Debug**: Show Overlay (toggle), Console Logging (toggle poziom logowania)
---
## Co działało na Windows 11 (dev)
- Wykrycie wbudowanej kamery laptopa
- Podgląd w czasie rzeczywistym
- Przełączanie rozdzielczości i FPS z menu
- Overlay z metrykami widoczny i aktualizowany
- Toggle overlay przez Debug menu
- Logowanie do konsoli przez Debug menu
- Reconnect po zmianie kamery
- Letterboxing przy resize okna
---
## Co próbowaliśmy i nie wyszło
### QVideoWidget jako renderer (porzucone)
**Podejście:** Użycie `QVideoWidget` jako centralnego widgetu + nałożenie `OverlayWidget` (child lub sibling).
**Problem:** Na Windows `QVideoWidget` tworzy natywne okno HWND z powierzchnią D3D (Media Foundation backend). Natywna powierzchnia jest rysowana poza hierarchią Qt i zawsze przykrywa wszystkie `QWidget` — niezależnie od:
- kolejności z-order (`raise_()`)
- rodzica (child/sibling)
- flag okna
- `WA_TranslucentBackground` / `WA_NoSystemBackground`
Żaden `QWidget` nie może się wyświetlić nad `QVideoWidget` na Windows.
**Próby ratowania:**
1. `OverlayWidget` jako child `QVideoWidget` — zasłonięty przez natywną powierzchnię
2. Kontener `QWidget` z `QVideoWidget` i `OverlayWidget` jako rodzeństwo — nadal zasłonięty
3. `setWindowFlags(FramelessWindowHint)` na child widget — odrywa widget od rodzica, tworzy osobne (niewidoczne) okno top-level
**Rozwiązanie:** Porzucenie `QVideoWidget`. Własny `CameraView(QWidget)` odbiera klatki przez `QVideoSink`, konwertuje do `QImage` i rysuje przez `QPainter`. Overlay w tym samym `paintEvent` — brak konfliktu z natywnym renderowaniem.
### OverlayWidget jako osobny QWidget (porzucone)
**Podejście:** `OverlayWidget` z `WA_TransparentForMouseEvents` i `WA_TranslucentBackground` nałożony na video.
**Problem:** Poza konfliktem z `QVideoWidget`, `WA_TranslucentBackground` na child widget działa tylko gdy rodzic też jest transparentny — Qt nie komponuje dziecka z tłem rodzica innym niż własne. W praktyce overlay był niewidoczny lub zasłaniał video czarnym prostokątem.
---
## Znane ograniczenia i uwagi
### Pomiar CPU
`psutil.Process.cpu_percent()` zwraca procent **względem jednego rdzenia** (np. 50% = pół rdzenia). Na wielordzeniowym procesorze 15% w Task Managerze (uśrednione po rdzeniach) może odpowiadać 50% na jednym rdzeniu. To nie jest błąd — to różna metodologia. Task Manager pokazuje `total_cpu / num_cores`, psutil pokazuje `cpu_time / wall_time`.
Jeśli potrzebny jest widok "jak Task Manager": `process.cpu_percent() / psutil.cpu_count()`.
### Pomiar RAM
`memory_info().wset` (Windows) = Private Working Set = to co Task Manager pokazuje w kolumnie "Pamięć". RSS zawiera też współdzielone biblioteki (Qt DLLs ~40 MB) i dlatego było zawyżone. Na macOS używane jest `rss` (tam `wset` nie istnieje).
### Wydajność konwersji klatek
`QVideoFrame.toImage()` + `convertToFormat(RGB32)` wykonuje się na CPU. Przy 1080p60 to koszt ~1-3 ms/klatkę. Dla MVP akceptowalny. Przy przyszłej integracji YOLO warto rozważyć bezpośredni dostęp do danych przez `QVideoFrame.map()` i przekazywanie raw bufora do GPU.
### Brak testów integracyjnych
Testy jednostkowe pokrywają `FrameDispatcher` i `TelemetryCollector` w izolacji (bez Qt event loop). Brak testów `CameraView`, `CameraService` i `MainWindow` — wymagałyby `QApplication` i mockowania urządzeń.
---
## Stan zgodności z PRD (01-mvp-preview.md)
| Wymaganie | Status |
|---|---|
| Realtime camera preview | Zaimplementowane |
| Camera switching | Zaimplementowane |
| Resolution selection | Zaimplementowane |
| FPS selection | Zaimplementowane |
| Reconnect/restart | Zaimplementowane |
| Realtime FPS metric | Zaimplementowane |
| Frame time metric | Zaimplementowane |
| Dropped frames detection | Zaimplementowane (heurystyka) |
| CPU usage metric | Zaimplementowane |
| Memory usage metric | Zaimplementowane |
| Overlay system | Zaimplementowane (IOverlayLayer) |
| Performance metrics display | Zaimplementowane (TelemetryOverlay) |
| Minimal GUI | Zaimplementowane |
| Camera menu | Zaimplementowane |
| Resolution/FPS menu | Zaimplementowane |
| Debug/telemetry options | Zaimplementowane |
| Architecture ready for AI subscribers | Zaimplementowane (IOverlayLayer + FrameDispatcher) |
| Low latency preview | Zaimplementowane (drop-if-busy w dispatcher) |
| Non-blocking GUI thread | Zaimplementowane |
---
## Następne kroki (poza MVP)
- Walidacja na Mac Mini z kamerą ELP (AVFoundation backend, macOS Ventura)
- Snapshot / recording
- Integracja YOLO: `YoloBboxOverlay(IOverlayLayer)` + worker w osobnym procesie
- Integracja OCR
- Optymalizacja konwersji klatek (QVideoFrame.map() → numpy → GPU)
- Testy integracyjne z QApplication + mock camera

View File

@@ -10,7 +10,7 @@ from unittest.mock import MagicMock, patch
class TestTelemetryCollector:
"""Test telemetry calculations in isolation (no Qt event loop required)."""
def _make_collector(self):
def _make_collector(self, cpu_count: int = 8):
"""Construct a TelemetryCollector bypassing Qt machinery."""
from app.telemetry.telemetry_collector import TelemetryCollector
@@ -23,8 +23,11 @@ class TestTelemetryCollector:
col._dropped_frames = 0
col._fps_window = deque()
col._fps_window_size_s = 1.0
col._target_fps = None
col._cpu_count = cpu_count
col._process = MagicMock()
# Simulate Windows: wset is present and takes priority over rss
# Simulate Windows: wset takes priority over rss
mem_info = MagicMock()
mem_info.wset = 50 * 1024 * 1024 # 50 MB private working set
mem_info.rss = 70 * 1024 * 1024 # RSS (larger, includes shared)
@@ -40,7 +43,6 @@ class TestTelemetryCollector:
def test_fps_counts_frames_in_window(self):
col = self._make_collector()
now = time.perf_counter()
# Simulate 30 frames within the last second
for i in range(30):
col._fps_window.append(now - 0.9 + i * 0.03)
snap = col._compute_snapshot()
@@ -60,7 +62,6 @@ class TestTelemetryCollector:
def test_frame_time_average(self):
col = self._make_collector()
# 10 frames at 33.3 ms each
interval = 0.0333
for _ in range(10):
col._frame_times.append(interval)
@@ -69,7 +70,6 @@ class TestTelemetryCollector:
def test_drop_detection(self):
col = self._make_collector()
# Seed with 10 normal frames at ~16 ms
normal_interval = 0.016
now = time.perf_counter()
col._last_frame_time = now - normal_interval * 10
@@ -77,15 +77,11 @@ class TestTelemetryCollector:
col._last_frame_time += normal_interval
col._frame_times.append(normal_interval)
# Simulate a big gap (3× normal) — should trigger drop detection
col._last_frame_time = now - normal_interval * 10 # reset base
# Manually call on_frame-like logic
col._last_frame_time = now
with patch("app.telemetry.telemetry_collector.time") as mock_time:
# Set last_frame_time to something reasonable
col._last_frame_time = now
big_delta = normal_interval * 5 # 5× average → drop
mock_time.perf_counter.return_value = now + big_delta
# Replicate the drop detection logic
delta = big_delta
col._frame_times.append(delta)
avg = sum(col._frame_times) / len(col._frame_times)
@@ -110,3 +106,34 @@ class TestTelemetryCollector:
col = self._make_collector()
snap = col._compute_snapshot()
assert snap.memory_mb == 50.0
def test_cpu_sys_is_core_divided_by_cpu_count(self):
col = self._make_collector(cpu_count=8)
col._process.cpu_percent.return_value = 80.0 # 80% of one core
snap = col._compute_snapshot()
assert snap.cpu_percent_core == 80.0
assert snap.cpu_percent_sys == round(80.0 / 8, 1)
def test_cpu_sys_never_exceeds_100_on_single_core_machine(self):
col = self._make_collector(cpu_count=1)
col._process.cpu_percent.return_value = 95.0
snap = col._compute_snapshot()
assert snap.cpu_percent_sys == snap.cpu_percent_core == 95.0
def test_cpu_sys_le_cpu_core(self):
"""cpu_percent_sys must always be <= cpu_percent_core."""
col = self._make_collector(cpu_count=4)
col._process.cpu_percent.return_value = 150.0
snap = col._compute_snapshot()
assert snap.cpu_percent_sys <= snap.cpu_percent_core
def test_target_fps_none_by_default(self):
col = self._make_collector()
snap = col._compute_snapshot()
assert snap.target_fps is None
def test_set_target_fps_reflected_in_snapshot(self):
col = self._make_collector()
col.set_target_fps(60.0)
snap = col._compute_snapshot()
assert snap.target_fps == 60.0