112 lines
4.0 KiB
Python
112 lines
4.0 KiB
Python
"""Frame Dispatcher — distributes QVideoFrames to registered subscribers."""
|
|
|
|
from __future__ import annotations
|
|
|
|
import logging
|
|
import time
|
|
from collections.abc import Callable
|
|
from dataclasses import dataclass, field
|
|
|
|
from PySide6.QtCore import QObject, Slot
|
|
from PySide6.QtMultimedia import QVideoFrame
|
|
|
|
logger = logging.getLogger(__name__)
|
|
|
|
FrameCallback = Callable[[QVideoFrame], None]
|
|
|
|
|
|
@dataclass
|
|
class _Subscriber:
|
|
callback: FrameCallback
|
|
drop_if_busy: bool = True
|
|
_busy: bool = field(default=False, init=False, repr=False)
|
|
|
|
|
|
class FrameDispatcher(QObject):
|
|
"""
|
|
Receives frames from CameraService and fans them out to all subscribers.
|
|
|
|
Each subscriber is a callable (QVideoFrame) -> None.
|
|
|
|
Subscribers that set drop_if_busy=True will skip a frame if they are still
|
|
processing the previous one (non-blocking). Subscribers with drop_if_busy=False
|
|
always receive every frame.
|
|
|
|
All dispatch happens in the GUI thread (Qt signal/slot), so subscribers
|
|
must NOT perform heavy work directly — they should queue to a worker thread
|
|
if processing is needed.
|
|
"""
|
|
|
|
def __init__(self, parent: QObject | None = None) -> None:
|
|
super().__init__(parent)
|
|
self._subscribers: list[_Subscriber] = []
|
|
self._frame_count: int = 0
|
|
self._last_dispatch_time: float = 0.0
|
|
|
|
# ------------------------------------------------------------------
|
|
# Subscription API
|
|
# ------------------------------------------------------------------
|
|
|
|
def subscribe(self, callback: FrameCallback, *, drop_if_busy: bool = True) -> None:
|
|
"""Register a frame callback.
|
|
|
|
Args:
|
|
callback: Callable that receives QVideoFrame.
|
|
drop_if_busy: When True, frame is skipped if subscriber is still
|
|
marked busy from last call (default True).
|
|
"""
|
|
for sub in self._subscribers:
|
|
if sub.callback is callback:
|
|
logger.warning("Subscriber %r already registered", callback)
|
|
return
|
|
self._subscribers.append(_Subscriber(callback=callback, drop_if_busy=drop_if_busy))
|
|
logger.debug("Subscriber added: %r (drop_if_busy=%s)", callback, drop_if_busy)
|
|
|
|
def unsubscribe(self, callback: FrameCallback) -> None:
|
|
"""Remove a previously registered callback."""
|
|
before = len(self._subscribers)
|
|
self._subscribers = [s for s in self._subscribers if s.callback is not callback]
|
|
if len(self._subscribers) < before:
|
|
logger.debug("Subscriber removed: %r", callback)
|
|
else:
|
|
logger.debug("Subscriber not found for removal: %r", callback)
|
|
|
|
def subscriber_count(self) -> int:
|
|
return len(self._subscribers)
|
|
|
|
# ------------------------------------------------------------------
|
|
# Frame intake — connect CameraService.frame_ready to this slot
|
|
# ------------------------------------------------------------------
|
|
|
|
@Slot(QVideoFrame)
|
|
def dispatch(self, frame: QVideoFrame) -> None:
|
|
"""Distribute the frame to all registered subscribers."""
|
|
self._frame_count += 1
|
|
now = time.perf_counter()
|
|
self._last_dispatch_time = now
|
|
|
|
for sub in self._subscribers:
|
|
if sub.drop_if_busy and sub._busy:
|
|
logger.debug("Dropping frame for busy subscriber %r", sub.callback)
|
|
continue
|
|
sub._busy = True
|
|
try:
|
|
sub.callback(frame)
|
|
except Exception:
|
|
logger.exception("Error in frame subscriber %r", sub.callback)
|
|
finally:
|
|
sub._busy = False
|
|
|
|
# ------------------------------------------------------------------
|
|
# Stats
|
|
# ------------------------------------------------------------------
|
|
|
|
@property
|
|
def frame_count(self) -> int:
|
|
return self._frame_count
|
|
|
|
@property
|
|
def last_dispatch_time(self) -> float:
|
|
"""perf_counter timestamp of the last dispatched frame."""
|
|
return self._last_dispatch_time
|