feat: enhance telemetry metrics with target FPS tracking and logging

This commit is contained in:
2026-05-12 21:49:27 +02:00
parent b238f0d9b4
commit aec286c5ec
5 changed files with 185 additions and 85 deletions

View File

@@ -27,12 +27,15 @@ class CameraService(QObject):
camera_started() — camera successfully opened and streaming camera_started() — camera successfully opened and streaming
camera_stopped() — camera stopped (clean shutdown) camera_stopped() — camera stopped (clean shutdown)
camera_error(str) — camera error description camera_error(str) — camera error description
format_changed(float) — actual FPS after format was applied
(emitted after camera restarts with new format)
""" """
frame_ready = Signal(QVideoFrame) frame_ready = Signal(QVideoFrame)
camera_started = Signal() camera_started = Signal()
camera_stopped = Signal() camera_stopped = Signal()
camera_error = Signal(str) camera_error = Signal(str)
format_changed = Signal(float) # actual FPS delivered by camera after format change
def __init__(self, parent: QObject | None = None) -> None: def __init__(self, parent: QObject | None = None) -> None:
super().__init__(parent) super().__init__(parent)
@@ -42,6 +45,11 @@ class CameraService(QObject):
self._sink = QVideoSink(self) self._sink = QVideoSink(self)
self._current_info: CameraInfo | None = None self._current_info: CameraInfo | None = None
# Desired format — applied on next (re)start
self._desired_width: int = DEFAULT_WIDTH
self._desired_height: int = DEFAULT_HEIGHT
self._desired_fps: float = float(DEFAULT_FPS)
self._session.setVideoSink(self._sink) self._session.setVideoSink(self._sink)
self._sink.videoFrameChanged.connect(self._on_frame) self._sink.videoFrameChanged.connect(self._on_frame)
@@ -51,7 +59,7 @@ class CameraService(QObject):
def start(self, camera_info: CameraInfo) -> None: def start(self, camera_info: CameraInfo) -> None:
"""Start streaming from the given camera device.""" """Start streaming from the given camera device."""
self.stop() self._stop_camera()
self._current_info = camera_info self._current_info = camera_info
self._camera = QCamera(camera_info.device, self) self._camera = QCamera(camera_info.device, self)
@@ -59,22 +67,17 @@ class CameraService(QObject):
self._camera.activeChanged.connect(self._on_active_changed) self._camera.activeChanged.connect(self._on_active_changed)
self._session.setCamera(self._camera) self._session.setCamera(self._camera)
self._apply_best_format(camera_info) self._apply_format()
self._camera.start() self._camera.start()
logger.info("Camera start requested: %s", camera_info.name) logger.info("Camera start requested: %s", camera_info.name)
def stop(self) -> None: def stop(self) -> None:
"""Stop the current camera.""" """Stop the current camera and forget the device."""
if self._camera is not None: self._stop_camera()
self._camera.stop() self._current_info = None
self._camera.errorOccurred.disconnect()
self._camera.activeChanged.disconnect()
self._camera = None
self._current_info = None
logger.info("Camera stopped")
def reconnect(self) -> None: def reconnect(self) -> None:
"""Restart the current camera after an error or disconnect.""" """Restart the current camera (e.g. after an error or disconnect)."""
if self._current_info is not None: if self._current_info is not None:
logger.info("Reconnecting camera: %s", self._current_info.name) logger.info("Reconnecting camera: %s", self._current_info.name)
self.start(self._current_info) self.start(self._current_info)
@@ -82,19 +85,29 @@ class CameraService(QObject):
logger.warning("Reconnect requested but no camera was previously started") logger.warning("Reconnect requested but no camera was previously started")
def set_resolution(self, width: int, height: int) -> None: def set_resolution(self, width: int, height: int) -> None:
"""Request a specific resolution. Effective on next start() if camera is active.""" """
if self._camera is None: Request a new resolution.
return
self._set_format(width, height, fps=None) The camera is stopped and restarted so the backend reliably applies
the new format (QCamera.setCameraFormat on an active camera is often
silently ignored by Media Foundation on Windows).
"""
self._desired_width = width
self._desired_height = height
if self._current_info is not None:
logger.info("Resolution change requested: %dx%d — restarting camera", width, height)
self.start(self._current_info)
def set_fps(self, fps: float) -> None: def set_fps(self, fps: float) -> None:
"""Request a specific frame rate.""" """
if self._camera is None or self._current_info is None: Request a new frame rate.
return
# Get current resolution from active format Same stop+start strategy as set_resolution().
fmt = self._camera.cameraFormat() """
res = fmt.resolution() self._desired_fps = fps
self._set_format(res.width(), res.height(), fps=fps) if self._current_info is not None:
logger.info("FPS change requested: %.1f — restarting camera", fps)
self.start(self._current_info)
@property @property
def is_active(self) -> bool: def is_active(self) -> bool:
@@ -105,28 +118,37 @@ class CameraService(QObject):
return self._current_info return self._current_info
# ------------------------------------------------------------------ # ------------------------------------------------------------------
# Video output accessor for direct QVideoWidget connection # Internal video output accessors (kept for future use)
# ------------------------------------------------------------------ # ------------------------------------------------------------------
def video_sink(self) -> QVideoSink: def video_sink(self) -> QVideoSink:
"""Return the internal QVideoSink (used by VideoRenderer)."""
return self._sink return self._sink
def capture_session(self) -> QMediaCaptureSession: def capture_session(self) -> QMediaCaptureSession:
"""Return the capture session (can be connected to QVideoWidget directly)."""
return self._session return self._session
# ------------------------------------------------------------------ # ------------------------------------------------------------------
# Private helpers # Private helpers
# ------------------------------------------------------------------ # ------------------------------------------------------------------
def _apply_best_format(self, info: CameraInfo) -> None: def _stop_camera(self) -> None:
"""Pick the best matching format: prefer DEFAULT_WIDTH x DEFAULT_HEIGHT at DEFAULT_FPS.""" """Stop and destroy the QCamera object without clearing _current_info."""
if not info.formats: if self._camera is not None:
return self._camera.stop()
self._set_format(DEFAULT_WIDTH, DEFAULT_HEIGHT, fps=float(DEFAULT_FPS)) self._camera.errorOccurred.disconnect()
self._camera.activeChanged.disconnect()
self._camera = None
logger.debug("Camera stopped (internal)")
def _set_format(self, width: int, height: int, fps: float | None) -> None: def _apply_format(self) -> None:
"""
Select the best matching QCameraFormat and apply it before start().
The format is chosen by score:
+1000 exact resolution match
+100 exact FPS match (within 1 fps)
-|Δpixels| area proximity (tie-breaker)
"""
if self._camera is None or self._current_info is None: if self._camera is None or self._current_info is None:
return return
@@ -138,11 +160,11 @@ class CameraService(QObject):
w, h = res.width(), res.height() w, h = res.width(), res.height()
f = fmt.maxFrameRate() f = fmt.maxFrameRate()
res_match = int(w == width and h == height) * 1000 score = (
fps_match = int(fps is not None and abs(f - fps) < 1) * 100 int(w == self._desired_width and h == self._desired_height) * 1000
area_score = -(abs(w * h - width * height)) + int(abs(f - self._desired_fps) < 1) * 100
- abs(w * h - self._desired_width * self._desired_height)
score = res_match + fps_match + area_score )
if score > best_score: if score > best_score:
best_score = score best_score = score
best = fmt best = fmt
@@ -151,12 +173,29 @@ class CameraService(QObject):
self._camera.setCameraFormat(best) self._camera.setCameraFormat(best)
res = best.resolution() res = best.resolution()
logger.info( logger.info(
"Camera format set: %dx%d @ %.1f fps", "Camera format requested: %dx%d @ %.1f fps",
res.width(), res.width(), res.height(), best.maxFrameRate(),
res.height(),
best.maxFrameRate(),
) )
def _log_actual_format(self) -> None:
"""Log the format the camera actually started with and emit format_changed."""
if self._camera is None:
return
fmt = self._camera.cameraFormat()
res = fmt.resolution()
actual_fps = fmt.maxFrameRate()
logger.info(
"Camera format ACTUAL: %dx%d @ %.1f fps",
res.width(), res.height(), actual_fps,
)
if actual_fps != self._desired_fps:
logger.warning(
"Requested %.1f fps but camera is delivering %.1f fps "
"(camera may not support this combination)",
self._desired_fps, actual_fps,
)
self.format_changed.emit(actual_fps)
def _on_frame(self, frame: QVideoFrame) -> None: def _on_frame(self, frame: QVideoFrame) -> None:
if frame.isValid(): if frame.isValid():
self.frame_ready.emit(frame) self.frame_ready.emit(frame)
@@ -167,7 +206,9 @@ class CameraService(QObject):
def _on_active_changed(self, active: bool) -> None: def _on_active_changed(self, active: bool) -> None:
if active: if active:
logger.info("Camera active: %s", self._current_info.name if self._current_info else "?") name = self._current_info.name if self._current_info else "?"
logger.info("Camera active: %s", name)
self._log_actual_format() # report what the camera actually accepted
self.camera_started.emit() self.camera_started.emit()
else: else:
logger.info("Camera inactive") logger.info("Camera inactive")

View File

@@ -24,6 +24,15 @@ class TelemetryOverlay(IOverlayLayer):
overlay = TelemetryOverlay() overlay = TelemetryOverlay()
camera_view.add_overlay_layer(overlay) camera_view.add_overlay_layer(overlay)
telemetry_collector.metrics_updated.connect(overlay.on_metrics_updated) telemetry_collector.metrics_updated.connect(overlay.on_metrics_updated)
Display format:
FPS req 60.0 ← what was requested from camera
FPS got 30.2 ← what camera actually delivered
Frame 33.1 ms
Drop 0
CPU sys 14.8 % ← normalised by cpu_count (matches Task Manager)
CPU core 118.4 % ← per single core (can exceed 100%)
Mem 68 MB
""" """
def __init__(self) -> None: def __init__(self) -> None:
@@ -60,7 +69,6 @@ class TelemetryOverlay(IOverlayLayer):
box_w = max_width + OVERLAY_PADDING * 2 box_w = max_width + OVERLAY_PADDING * 2
box_h = line_height * len(lines) + OVERLAY_PADDING * 2 box_h = line_height * len(lines) + OVERLAY_PADDING * 2
# Position relative to the actual video area, not the full widget
x = video_rect.left() + OVERLAY_MARGIN x = video_rect.left() + OVERLAY_MARGIN
y = video_rect.top() + OVERLAY_MARGIN y = video_rect.top() + OVERLAY_MARGIN
@@ -83,12 +91,19 @@ class TelemetryOverlay(IOverlayLayer):
@staticmethod @staticmethod
def _format_lines(snap: TelemetrySnapshot) -> list[str]: def _format_lines(snap: TelemetrySnapshot) -> list[str]:
lines = [ lines: list[str] = []
f"FPS {snap.fps:>6.1f}",
f"Frame {snap.frame_time_ms:>6.1f} ms", # FPS — show target if known, then actual
f"Drop {snap.dropped_frames:>6d}", if snap.target_fps is not None:
f"CPU {snap.cpu_percent:>5.1f} %", lines.append(f"FPS req {snap.target_fps:>6.1f}")
] lines.append(f"FPS got {snap.fps:>6.1f}")
lines.append(f"Frame {snap.frame_time_ms:>6.1f} ms")
lines.append(f"Drop {snap.dropped_frames:>6d}")
lines.append(f"CPU sys {snap.cpu_percent_sys:>5.1f} %")
lines.append(f"CPU core {snap.cpu_percent_core:>5.1f} %")
if snap.memory_mb is not None: if snap.memory_mb is not None:
lines.append(f"Mem {snap.memory_mb:>5.0f} MB") lines.append(f"Mem {snap.memory_mb:>5.0f} MB")
return lines return lines

View File

@@ -17,10 +17,13 @@ from app.config import TELEMETRY_UPDATE_INTERVAL_MS
class TelemetrySnapshot: class TelemetrySnapshot:
"""Immutable snapshot of current performance metrics.""" """Immutable snapshot of current performance metrics."""
fps: float fps: float # actual frames received in the last second
target_fps: float | None # FPS requested from the camera (None = unknown)
frame_time_ms: float # average inter-frame time in ms frame_time_ms: float # average inter-frame time in ms
dropped_frames: int # cumulative dropped frames detected dropped_frames: int # cumulative dropped frames detected
cpu_percent: float # this process CPU usage (0100, all cores) cpu_percent_sys: float # process CPU as % of total system capacity
# (divided by cpu_count) — matches Task Manager
cpu_percent_core: float # process CPU per single core — can exceed 100%
memory_mb: float | None # process private working set in MB memory_mb: float | None # process private working set in MB
timestamp: float # time.perf_counter() when snapshot was taken timestamp: float # time.perf_counter() when snapshot was taken
@@ -32,6 +35,9 @@ class TelemetryCollector(QObject):
Connect to FrameDispatcher: Connect to FrameDispatcher:
dispatcher.subscribe(collector.on_frame, drop_if_busy=False) dispatcher.subscribe(collector.on_frame, drop_if_busy=False)
Receive target FPS updates from CameraService:
camera_service.format_changed.connect(collector.set_target_fps)
Listen to metrics updates: Listen to metrics updates:
collector.metrics_updated.connect(my_slot) collector.metrics_updated.connect(my_slot)
""" """
@@ -46,6 +52,7 @@ class TelemetryCollector(QObject):
super().__init__(parent) super().__init__(parent)
self._update_interval_ms = update_interval_ms self._update_interval_ms = update_interval_ms
self._target_fps: float | None = None
# frame timing ring-buffer (last 120 samples) # frame timing ring-buffer (last 120 samples)
self._frame_times: deque[float] = deque(maxlen=120) self._frame_times: deque[float] = deque(maxlen=120)
@@ -54,12 +61,13 @@ class TelemetryCollector(QObject):
self._dropped_frames: int = 0 self._dropped_frames: int = 0
# FPS window — count frames in the last second # FPS window — count frames in the last second
self._fps_window: deque[float] = deque() # timestamps of recent frames self._fps_window: deque[float] = deque()
self._fps_window_size_s: float = 1.0 self._fps_window_size_s: float = 1.0
# psutil process reference — call cpu_percent once to initialise the baseline # psutil — initialise baseline so first real reading is non-zero
self._process = psutil.Process() self._process = psutil.Process()
self._process.cpu_percent() # first call always returns 0.0; discard it self._process.cpu_percent() # first call always returns 0.0; discard
self._cpu_count: int = max(psutil.cpu_count(logical=True) or 1, 1)
# periodic snapshot timer # periodic snapshot timer
self._timer = QTimer(self) self._timer = QTimer(self)
@@ -67,9 +75,16 @@ class TelemetryCollector(QObject):
self._timer.timeout.connect(self._emit_snapshot) self._timer.timeout.connect(self._emit_snapshot)
self._timer.start() self._timer.start()
# latest snapshot (available synchronously)
self._latest: TelemetrySnapshot = self._make_empty_snapshot() self._latest: TelemetrySnapshot = self._make_empty_snapshot()
# ------------------------------------------------------------------
# Configuration
# ------------------------------------------------------------------
def set_target_fps(self, fps: float | None) -> None:
"""Record the FPS that was requested from the camera."""
self._target_fps = fps
# ------------------------------------------------------------------ # ------------------------------------------------------------------
# Frame subscriber callback # Frame subscriber callback
# ------------------------------------------------------------------ # ------------------------------------------------------------------
@@ -78,12 +93,11 @@ class TelemetryCollector(QObject):
"""Called by FrameDispatcher for every frame. Must be fast.""" """Called by FrameDispatcher for every frame. Must be fast."""
now = time.perf_counter() now = time.perf_counter()
# inter-frame time
if self._last_frame_time > 0: if self._last_frame_time > 0:
delta = now - self._last_frame_time delta = now - self._last_frame_time
self._frame_times.append(delta) self._frame_times.append(delta)
# drop detection: if delta > 2.5× the rolling average, count as drop # drop detection: gap > 2.5× rolling average
if len(self._frame_times) >= 5: if len(self._frame_times) >= 5:
avg = sum(self._frame_times) / len(self._frame_times) avg = sum(self._frame_times) / len(self._frame_times)
if delta > avg * 2.5: if delta > avg * 2.5:
@@ -92,9 +106,7 @@ class TelemetryCollector(QObject):
self._last_frame_time = now self._last_frame_time = now
self._total_frames += 1 self._total_frames += 1
# FPS window
self._fps_window.append(now) self._fps_window.append(now)
# prune old entries
cutoff = now - self._fps_window_size_s cutoff = now - self._fps_window_size_s
while self._fps_window and self._fps_window[0] < cutoff: while self._fps_window and self._fps_window[0] < cutoff:
self._fps_window.popleft() self._fps_window.popleft()
@@ -104,7 +116,6 @@ class TelemetryCollector(QObject):
# ------------------------------------------------------------------ # ------------------------------------------------------------------
def latest_snapshot(self) -> TelemetrySnapshot: def latest_snapshot(self) -> TelemetrySnapshot:
"""Return the most recently computed snapshot."""
return self._latest return self._latest
def reset_counters(self) -> None: def reset_counters(self) -> None:
@@ -131,46 +142,49 @@ class TelemetryCollector(QObject):
cutoff = now - self._fps_window_size_s cutoff = now - self._fps_window_size_s
while self._fps_window and self._fps_window[0] < cutoff: while self._fps_window and self._fps_window[0] < cutoff:
self._fps_window.popleft() self._fps_window.popleft()
fps = float(len(self._fps_window)) # frames in the last second fps = float(len(self._fps_window))
# average frame time # average frame time
if self._frame_times: avg_frame_time_ms = (
avg_frame_time_ms = (sum(self._frame_times) / len(self._frame_times)) * 1000.0 (sum(self._frame_times) / len(self._frame_times)) * 1000.0
else: if self._frame_times
avg_frame_time_ms = 0.0 else 0.0
)
# CPU — this process only, cumulative since last call (non-blocking) # CPU — per-core reading, then derive system-normalised value
try: try:
cpu = self._process.cpu_percent() cpu_core = self._process.cpu_percent()
except Exception: except Exception:
cpu = 0.0 cpu_core = 0.0
cpu_sys = cpu_core / self._cpu_count
# Memory — private working set (Windows) or RSS (macOS/Linux) # Memory — private working set (Windows) or RSS (macOS/Linux)
# This excludes shared DLLs/frameworks and matches Task Manager "Private"
try: try:
mem_info = self._process.memory_info() mem_info = self._process.memory_info()
# wset = Windows Working Set (private); rss on macOS/Linux
mem_bytes = getattr(mem_info, "wset", None) or mem_info.rss mem_bytes = getattr(mem_info, "wset", None) or mem_info.rss
mem_mb = mem_bytes / (1024 * 1024) mem_mb: float | None = mem_bytes / (1024 * 1024)
except Exception: except Exception:
mem_mb = None mem_mb = None
return TelemetrySnapshot( return TelemetrySnapshot(
fps=round(fps, 1), fps=round(fps, 1),
target_fps=self._target_fps,
frame_time_ms=round(avg_frame_time_ms, 2), frame_time_ms=round(avg_frame_time_ms, 2),
dropped_frames=self._dropped_frames, dropped_frames=self._dropped_frames,
cpu_percent=round(cpu, 1), cpu_percent_sys=round(cpu_sys, 1),
cpu_percent_core=round(cpu_core, 1),
memory_mb=round(mem_mb, 1) if mem_mb is not None else None, memory_mb=round(mem_mb, 1) if mem_mb is not None else None,
timestamp=now, timestamp=now,
) )
@staticmethod def _make_empty_snapshot(self) -> TelemetrySnapshot:
def _make_empty_snapshot() -> TelemetrySnapshot:
return TelemetrySnapshot( return TelemetrySnapshot(
fps=0.0, fps=0.0,
target_fps=self._target_fps,
frame_time_ms=0.0, frame_time_ms=0.0,
dropped_frames=0, dropped_frames=0,
cpu_percent=0.0, cpu_percent_sys=0.0,
cpu_percent_core=0.0,
memory_mb=None, memory_mb=None,
timestamp=time.perf_counter(), timestamp=time.perf_counter(),
) )

View File

@@ -120,6 +120,9 @@ class MainWindow(QMainWindow):
# TelemetryCollector → TelemetryOverlay (data only, no repaint trigger here) # TelemetryCollector → TelemetryOverlay (data only, no repaint trigger here)
self._telemetry.metrics_updated.connect(self._telemetry_overlay.on_metrics_updated) self._telemetry.metrics_updated.connect(self._telemetry_overlay.on_metrics_updated)
# CameraService → TelemetryCollector: keep target FPS in sync
self._camera_service.format_changed.connect(self._telemetry.set_target_fps)
# CameraService status # CameraService status
self._camera_service.camera_started.connect(self._on_camera_started) self._camera_service.camera_started.connect(self._on_camera_started)
self._camera_service.camera_stopped.connect(self._on_camera_stopped) self._camera_service.camera_stopped.connect(self._on_camera_stopped)

View File

@@ -10,7 +10,7 @@ from unittest.mock import MagicMock, patch
class TestTelemetryCollector: class TestTelemetryCollector:
"""Test telemetry calculations in isolation (no Qt event loop required).""" """Test telemetry calculations in isolation (no Qt event loop required)."""
def _make_collector(self): def _make_collector(self, cpu_count: int = 8):
"""Construct a TelemetryCollector bypassing Qt machinery.""" """Construct a TelemetryCollector bypassing Qt machinery."""
from app.telemetry.telemetry_collector import TelemetryCollector from app.telemetry.telemetry_collector import TelemetryCollector
@@ -23,8 +23,11 @@ class TestTelemetryCollector:
col._dropped_frames = 0 col._dropped_frames = 0
col._fps_window = deque() col._fps_window = deque()
col._fps_window_size_s = 1.0 col._fps_window_size_s = 1.0
col._target_fps = None
col._cpu_count = cpu_count
col._process = MagicMock() col._process = MagicMock()
# Simulate Windows: wset is present and takes priority over rss # Simulate Windows: wset takes priority over rss
mem_info = MagicMock() mem_info = MagicMock()
mem_info.wset = 50 * 1024 * 1024 # 50 MB private working set mem_info.wset = 50 * 1024 * 1024 # 50 MB private working set
mem_info.rss = 70 * 1024 * 1024 # RSS (larger, includes shared) mem_info.rss = 70 * 1024 * 1024 # RSS (larger, includes shared)
@@ -40,7 +43,6 @@ class TestTelemetryCollector:
def test_fps_counts_frames_in_window(self): def test_fps_counts_frames_in_window(self):
col = self._make_collector() col = self._make_collector()
now = time.perf_counter() now = time.perf_counter()
# Simulate 30 frames within the last second
for i in range(30): for i in range(30):
col._fps_window.append(now - 0.9 + i * 0.03) col._fps_window.append(now - 0.9 + i * 0.03)
snap = col._compute_snapshot() snap = col._compute_snapshot()
@@ -60,7 +62,6 @@ class TestTelemetryCollector:
def test_frame_time_average(self): def test_frame_time_average(self):
col = self._make_collector() col = self._make_collector()
# 10 frames at 33.3 ms each
interval = 0.0333 interval = 0.0333
for _ in range(10): for _ in range(10):
col._frame_times.append(interval) col._frame_times.append(interval)
@@ -69,7 +70,6 @@ class TestTelemetryCollector:
def test_drop_detection(self): def test_drop_detection(self):
col = self._make_collector() col = self._make_collector()
# Seed with 10 normal frames at ~16 ms
normal_interval = 0.016 normal_interval = 0.016
now = time.perf_counter() now = time.perf_counter()
col._last_frame_time = now - normal_interval * 10 col._last_frame_time = now - normal_interval * 10
@@ -77,15 +77,11 @@ class TestTelemetryCollector:
col._last_frame_time += normal_interval col._last_frame_time += normal_interval
col._frame_times.append(normal_interval) col._frame_times.append(normal_interval)
# Simulate a big gap (3× normal) — should trigger drop detection col._last_frame_time = now
col._last_frame_time = now - normal_interval * 10 # reset base
# Manually call on_frame-like logic
with patch("app.telemetry.telemetry_collector.time") as mock_time: with patch("app.telemetry.telemetry_collector.time") as mock_time:
# Set last_frame_time to something reasonable
col._last_frame_time = now col._last_frame_time = now
big_delta = normal_interval * 5 # 5× average → drop big_delta = normal_interval * 5 # 5× average → drop
mock_time.perf_counter.return_value = now + big_delta mock_time.perf_counter.return_value = now + big_delta
# Replicate the drop detection logic
delta = big_delta delta = big_delta
col._frame_times.append(delta) col._frame_times.append(delta)
avg = sum(col._frame_times) / len(col._frame_times) avg = sum(col._frame_times) / len(col._frame_times)
@@ -110,3 +106,34 @@ class TestTelemetryCollector:
col = self._make_collector() col = self._make_collector()
snap = col._compute_snapshot() snap = col._compute_snapshot()
assert snap.memory_mb == 50.0 assert snap.memory_mb == 50.0
def test_cpu_sys_is_core_divided_by_cpu_count(self):
col = self._make_collector(cpu_count=8)
col._process.cpu_percent.return_value = 80.0 # 80% of one core
snap = col._compute_snapshot()
assert snap.cpu_percent_core == 80.0
assert snap.cpu_percent_sys == round(80.0 / 8, 1)
def test_cpu_sys_never_exceeds_100_on_single_core_machine(self):
col = self._make_collector(cpu_count=1)
col._process.cpu_percent.return_value = 95.0
snap = col._compute_snapshot()
assert snap.cpu_percent_sys == snap.cpu_percent_core == 95.0
def test_cpu_sys_le_cpu_core(self):
"""cpu_percent_sys must always be <= cpu_percent_core."""
col = self._make_collector(cpu_count=4)
col._process.cpu_percent.return_value = 150.0
snap = col._compute_snapshot()
assert snap.cpu_percent_sys <= snap.cpu_percent_core
def test_target_fps_none_by_default(self):
col = self._make_collector()
snap = col._compute_snapshot()
assert snap.target_fps is None
def test_set_target_fps_reflected_in_snapshot(self):
col = self._make_collector()
col.set_target_fps(60.0)
snap = col._compute_snapshot()
assert snap.target_fps == 60.0