252 lines
9.9 KiB
Python
252 lines
9.9 KiB
Python
import numpy as np
|
|
import cv2
|
|
from logging import getLogger
|
|
from collections import deque
|
|
from cv2_enumerate_cameras import enumerate_cameras
|
|
from PySide6.QtCore import QObject, QThread, Signal, Slot
|
|
from PySide6.QtGui import QImage
|
|
from dataclasses import dataclass, field
|
|
from .worker import VideoStreamWorker
|
|
|
|
# Module-level logger, named after this module via __name__.
logger = getLogger(__name__)
|
|
|
|
|
|
@dataclass
class FrameMetrics:
    """
    Rolling timing statistics for a stream of frame reads.

    Timestamps passed to the update methods are treated as seconds (the
    snapshot returned by ``get_metrics`` multiplies them by 1000 to report
    milliseconds). Min/max values cover every frame since the last reset;
    averages cover only the most recent ``buf_size`` samples.
    """

    fr_start_delta_min: float = float('inf')  # Minimum time between frame read starts
    fr_start_delta_max: float = 0.0  # Maximum time between frame read starts
    fr_start_delta_avg: float = 0.0  # Average time between frame read starts (over the recent-sample buffer)
    fr_time_min: float = float('inf')  # Minimum time taken to read a frame
    fr_time_max: float = 0.0  # Maximum time taken to read a frame
    fr_time_avg: float = 0.0  # Average time taken to read a frame (over the recent-sample buffer)
    fps_average_1s: float = 0.0  # FPS derived from the average start delta of the recent-sample buffer
    frame_count: int = 0  # Total number of frames processed
    last_start_time: float = 0.0  # Start timestamp of the previous frame; 0.0 means "no frame seen yet"
    buf_size: int = 30  # Number of recent samples used for the averages (clamped to >= 1)
    b_fr_start_delta: deque = field(init=False, repr=False)  # Recent frame read start deltas
    total_start_delta: float = 0.0  # Running sum of the values currently in b_fr_start_delta
    b_fr_time_delta: deque = field(init=False, repr=False)  # Recent frame read durations
    total_frame_time: float = 0.0  # Running sum of the values currently in b_fr_time_delta
    frame_drops: int = 0  # Latest dropped-frame count reported to update_metrics

    def __post_init__(self):
        """Create the sample buffers, forcing the window to hold at least one sample."""
        # A zero-length window would make the "buffer full" test true on an
        # empty deque (IndexError on [0]) and the averages divide by zero;
        # a negative one is rejected by deque itself.
        self.buf_size = max(1, int(self.buf_size))
        self.b_fr_start_delta = deque(maxlen=self.buf_size)
        self.b_fr_time_delta = deque(maxlen=self.buf_size)

    def calc_start_delta_min_max_avg(self, start_time: float) -> None:
        """
        Update min/max/average of the interval between consecutive frame starts.

        The first call after construction or reset only records ``start_time``
        because there is no previous frame to measure against.
        """
        if self.last_start_time == 0.0:
            self.last_start_time = start_time
            return

        start_delta = start_time - self.last_start_time
        self.fr_start_delta_min = min(self.fr_start_delta_min, start_delta)
        self.fr_start_delta_max = max(self.fr_start_delta_max, start_delta)

        # The deque evicts its oldest sample on append once full, so remove
        # that sample from the running sum first.
        if len(self.b_fr_start_delta) == self.buf_size:
            self.total_start_delta -= self.b_fr_start_delta[0]

        self.b_fr_start_delta.append(start_delta)
        self.total_start_delta += start_delta
        self.last_start_time = start_time

        self.fr_start_delta_avg = self.total_start_delta / len(self.b_fr_start_delta)

    def calc_frame_time_min_max_avg(self, start_time: float, end_time: float) -> None:
        """Update min/max/average of the time taken to read one frame (``end_time - start_time``)."""
        fr_time = end_time - start_time
        self.fr_time_min = min(self.fr_time_min, fr_time)
        self.fr_time_max = max(self.fr_time_max, fr_time)

        # Same running-sum bookkeeping as for the start deltas.
        if len(self.b_fr_time_delta) == self.buf_size:
            self.total_frame_time -= self.b_fr_time_delta[0]

        self.b_fr_time_delta.append(fr_time)
        self.total_frame_time += fr_time

        self.fr_time_avg = self.total_frame_time / len(self.b_fr_time_delta)

    def calc_fps(self) -> None:
        """
        Derive FPS as the reciprocal of the average start delta.

        Nothing is reported until at least ``buf_size`` frames have been
        counted; a non-positive average yields 0.0 instead of dividing by zero.
        """
        if self.frame_count < self.buf_size:
            return

        self.fps_average_1s = 1.0 / self.fr_start_delta_avg if self.fr_start_delta_avg > 0 else 0.0

    def update_metrics(self, start_time: float, end_time: float, frames_dropped: int) -> None:
        """
        Record one frame read.

        Args:
            start_time: Timestamp at which the frame read started.
            end_time: Timestamp at which the frame read finished.
            frames_dropped: Dropped-frame count supplied by the caller; it is
                stored as-is (it replaces the previous value, it is not added).
        """
        self.frame_count += 1
        self.calc_start_delta_min_max_avg(start_time)
        self.calc_frame_time_min_max_avg(start_time, end_time)
        self.calc_fps()
        self.frame_drops = frames_dropped

    def reset_metrics(self) -> None:
        """Return every statistic to its initial value; ``buf_size`` is kept."""
        self.fr_start_delta_min = float('inf')
        self.fr_start_delta_max = 0.0
        self.fr_start_delta_avg = 0.0
        self.fr_time_min = float('inf')
        self.fr_time_max = 0.0
        self.fr_time_avg = 0.0
        self.fps_average_1s = 0.0
        self.frame_count = 0
        self.last_start_time = 0.0
        self.b_fr_start_delta.clear()
        self.total_start_delta = 0.0
        self.b_fr_time_delta.clear()
        self.total_frame_time = 0.0
        self.frame_drops = 0

    def get_metrics(self) -> "FrameMetrics":
        """
        Return a display-oriented snapshot of the current statistics.

        Time values are converted to milliseconds and rounded to 3 decimals;
        ``last_start_time`` is copied unchanged. The snapshot's sample buffers
        are empty, so it is meant for reading, not for further updates.
        """
        return FrameMetrics(
            fr_start_delta_min=round(self.fr_start_delta_min * 1000, 3),  # Convert to milliseconds
            fr_start_delta_max=round(self.fr_start_delta_max * 1000, 3),
            fr_start_delta_avg=round(self.fr_start_delta_avg * 1000, 3),
            fr_time_min=round(self.fr_time_min * 1000, 3),
            fr_time_max=round(self.fr_time_max * 1000, 3),
            fr_time_avg=round(self.fr_time_avg * 1000, 3),
            fps_average_1s=round(self.fps_average_1s, 3),
            frame_count=self.frame_count,
            last_start_time=self.last_start_time,
            buf_size=self.buf_size,
            total_frame_time=round(self.total_frame_time * 1000, 3),
            total_start_delta=round(self.total_start_delta * 1000, 3),
            # Without this the snapshot always reported zero dropped frames.
            frame_drops=self.frame_drops,
        )
|
|
|
|
|
|
class VideoStreamController(QObject):
    """
    Facade class managing the VideoStreamWorker and its QThread.
    Converts raw frames to QImage for the UI and can route them to other processors.
    """
    image_ready = Signal(QImage)  # Signal for the UI
    raw_frame_ready = Signal(np.ndarray)  # Signal for other processing modules
    error_occurred = Signal(str)  # Worker errors and image conversion errors
    status_changed = Signal(bool)  # True when the worker reports started, False when stopped

    def __init__(self):
        """Create the worker, move it to its own QThread, wire the signals and start the thread."""
        super().__init__()
        self._thread = QThread()
        self._worker = VideoStreamWorker()
        self._worker.moveToThread(self._thread)
        # Created when the worker reports that it has started.
        self._metrics: FrameMetrics | None = None

        # Connect internal signals
        self._worker.frame_ready.connect(self._handle_frame)
        self._worker.frame_time.connect(self._handle_frame_time)
        self._worker.error_occurred.connect(self.error_occurred)
        self._worker.started.connect(self.started)
        self._worker.stopped.connect(self.stopped)

        # Cleanup
        self._thread.finished.connect(self._worker.stop)

        self._thread.start()

    @staticmethod
    def get_available_cameras():
        """
        Returns a list of available camera devices using cv2_enumerate_cameras.
        Each item is an object with properties like 'index' and 'name'.
        """
        return list(enumerate_cameras())

    @Slot()
    def get_metrics(self) -> FrameMetrics | None:
        """
        Returns a snapshot of the current frame metrics, or None when no
        metrics object exists yet (the worker has not reported a start).
        """
        return self._metrics.get_metrics() if self._metrics is not None else None

    def reset_metrics(self):
        """
        Resets the frame metrics to initial state (no-op before the first start).
        """
        if self._metrics is not None:
            self._metrics.reset_metrics()

    @Slot(object)
    def change_source(self, source):
        """
        Change the video source (camera index or file path).
        """
        self._worker.set_source(source)

    @Slot()
    def start(self):
        """Ask the worker to start streaming."""
        logger.debug("Starting video stream worker thread.")
        # NOTE(review): this is a direct method call, so it runs in the
        # caller's thread unless the worker re-dispatches internally - confirm.
        self._worker.start()

    @Slot()
    def stop(self):
        """Ask the worker to stop streaming."""
        logger.debug("Stopping video stream worker thread.")
        self._worker.stop()

    @Slot(float, tuple)
    def started(self, fps: float, video_res: tuple):
        """
        Handle the worker's started notification.

        Creates a fresh FrameMetrics whose averaging window is ``int(fps)``
        samples and emits ``status_changed(True)``.

        Args:
            fps: Frame rate reported by the worker.
            video_res: Resolution reported by the worker; only elements 0 and
                1 are read (logged as "<0>x<1>").
        """
        logger.debug(f"Video stream worker started with FPS: {fps}, Resolution: {video_res[0]}x{video_res[1]}")
        # A reported FPS that truncates to 0 (or is negative, NaN or infinite)
        # cannot be used as a buffer length: a zero-length window makes the
        # metrics fail on the first frame. Fall back to the FrameMetrics
        # default window in that case.
        try:
            window = int(fps)
        except (ValueError, OverflowError):
            window = 0
        self._metrics = FrameMetrics(buf_size=window) if window > 0 else FrameMetrics()
        self.status_changed.emit(True)

    @Slot()
    def stopped(self):
        """Handle the worker's stopped notification: reset metrics and emit ``status_changed(False)``."""
        logger.debug("Video stream worker thread stopped.")
        if self._metrics is not None:
            self._metrics.reset_metrics()
        self.status_changed.emit(False)

    def cleanup(self):
        """
        Safely shuts down the worker and the thread.
        Must be called before the application exits.
        """
        logger.debug("Cleaning up video stream controller.")
        self._worker.stop()
        self._thread.quit()
        if not self._thread.wait(2000):  # Wait up to 2 seconds
            # The thread did not exit in time: force it and wait for completion.
            self._thread.terminate()
            self._thread.wait()

    @Slot(np.ndarray)
    def _handle_frame(self, frame):
        """
        Internal handler for frames from the worker.
        Handles conversion to QImage and routing.
        """
        # 1. Emit raw frame for other processing (OCR, Detection, etc.)
        self.raw_frame_ready.emit(frame)

        # 2. Convert to QImage for UI
        q_image = self._convert_to_qimage(frame)
        self.image_ready.emit(q_image)

    def _convert_to_qimage(self, frame):
        """
        Converts a BGR numpy array to a RGB QImage.

        Returns an empty QImage when ``frame`` is None or when the conversion
        raises; in the latter case ``error_occurred`` is emitted with the
        error text.
        """
        if frame is None:
            return QImage()

        # OpenCV uses BGR, PySide uses RGB
        try:
            frame_rgb = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)
            height, width, channel = frame_rgb.shape
            bytes_per_line = channel * width

            return QImage(
                frame_rgb.data,
                width,
                height,
                bytes_per_line,
                QImage.Format.Format_RGB888
            ).copy()  # .copy() ensures the QImage owns the data and is safe to pass across threads
        except Exception as e:
            # Boundary handler: report the failure through the signal instead
            # of letting an exception escape from a slot.
            self.error_occurred.emit(f"Image conversion error: {str(e)}")
            return QImage()

    @Slot(float, float, int)
    def _handle_frame_time(self, start_time, end_time, frames_dropped):
        """
        Optional: Handle frame timing information for performance monitoring.
        Ignored until a metrics object exists.
        """
        if self._metrics is not None:
            self._metrics.update_metrics(start_time, end_time, frames_dropped)