- Implement main application entry point in duck-ocr.py - Create logging configuration in logging_config.py - Add video streaming functionality in camera.py - Introduce main window UI in main_window.py - Include SVG assets for UI buttons and icons - Update .gitignore to exclude log files - Add placeholder .gitkeep files for empty directories
148 lines
4.6 KiB
Python
148 lines
4.6 KiB
Python
from PySide6.QtCore import QThread, QTimer, QMutex, QWaitCondition, Signal, QObject, QElapsedTimer
|
|
import cv2
|
|
import time
|
|
import logging
|
|
from dataclasses import dataclass
|
|
# Module-level logger named after this module; used by the classes below.
logger = logging.getLogger(__name__)
|
|
|
|
@dataclass
class VideoMetrics:
    """Mutable counters and timing statistics for a video stream.

    The fields are plain values updated by whoever reads the frames;
    ``update_fps`` derives ``fps_average`` from them.
    """

    # Number of frames emitted so far.
    frames_processed: int
    # Number of frames that were read later than their scheduled time.
    frames_dropped: int
    # NOTE(review): not updated by any code visible in this file.
    last_frame_time: float
    # Seconds between the previous frame emission and the end of the
    # latest capture read.
    last_cap_time: float
    # Frames per second measured over the last completed window (>= 1 s).
    fps_average: float
    # ``time.perf_counter()`` value at which the current window started.
    fps_last_time: float
    # Value of ``frames_processed`` at the start of the current window.
    fps_frame_count: float

    def update_fps(self) -> None:
        """Refresh ``fps_average`` once at least one second has elapsed.

        Does nothing while the current measurement window is shorter than
        one second. Otherwise the average is the number of frames processed
        since the window started divided by the window length, and a new
        window is started.
        """
        now = time.perf_counter()
        elapsed = now - self.fps_last_time
        if elapsed < 1.0:
            return

        # Keep the delta in a local so ``fps_frame_count`` always holds the
        # window baseline and never a transient per-window frame count.
        frames_in_window = self.frames_processed - self.fps_frame_count
        self.fps_average = frames_in_window / elapsed
        self.fps_frame_count = self.frames_processed
        self.fps_last_time = now
|
|
|
|
|
|
class VideoStreamWorker(QObject):
    """Read frames from an OpenCV capture and emit them paced to its FPS.

    ``run`` blocks until the stream ends, fails to open, or ``stop`` is
    called, so it is meant to be executed in a worker thread. Each frame is
    published through ``_internal_frame``.
    """

    # Emitted from the worker thread; VideoStream relays it to the main thread.
    _internal_frame = Signal(object)

    def __init__(self, source):
        """Store the capture source and initialise state and metrics.

        Args:
            source: Value passed to ``cv2.VideoCapture`` when ``run`` starts.
        """
        super().__init__()
        self.source = source
        self.cap = None
        self.fps = 30.0
        self.width = 0
        self.height = 0
        self.running = False
        # Set by set_source() so the running loop reopens the capture itself.
        self._restart_requested = False
        self.metrics = VideoMetrics(
            frames_processed=0,
            frames_dropped=0,
            last_frame_time=0.0,
            last_cap_time=0.0,
            fps_average=0.0,
            fps_last_time=time.perf_counter(),
            fps_frame_count=0.0
        )

        logger.debug(f"VideoStreamWorker initialized with source: {source}")

    def set_source(self, source):
        """Switch to a new video source.

        When the capture loop is running it is asked to close the current
        capture and reopen with the new source; the loop does this itself,
        so this method never blocks and never runs the capture loop in the
        caller's thread. When the loop is not running the source is only
        stored and is used by the next ``run`` call.
        """
        logger.debug(f"Setting new video source: {source}")
        # Order matters: publish the source before raising the flag so the
        # loop (which clears the flag, then reads the source) cannot miss it.
        self.source = source
        if self.running:
            self._restart_requested = True

    def run(self):
        """Main loop of the worker thread.

        Opens the current source and emits frames until the stream ends,
        the source cannot be opened, or ``stop`` is called. A source change
        requested through ``set_source`` makes the loop reopen the capture.
        ``running`` is always reset to ``False`` on exit.
        """
        self.running = True
        try:
            while self.running:
                self._restart_requested = False
                if not self._open_capture():
                    break
                try:
                    self._pump_frames()
                finally:
                    # Release the capture even if emitting a frame raised.
                    self.cap.release()
                if not self._restart_requested:
                    break
        finally:
            self.running = False

    def _open_capture(self):
        """Open ``self.source`` and cache its FPS and frame size.

        Returns:
            ``True`` when the capture was opened, ``False`` otherwise.
        """
        source = self.source
        if source is None:
            logger.warning("No video source provided")
            return False

        self.cap = cv2.VideoCapture(source)
        if not self.cap.isOpened():
            logger.error(f"Failed to open video source: {source}")
            return False

        reported_fps = self.cap.get(cv2.CAP_PROP_FPS)
        # ``> 0`` is False for zero, negative and NaN reports alike.
        self.fps = reported_fps if reported_fps > 0 else 30.0

        self.width = int(self.cap.get(cv2.CAP_PROP_FRAME_WIDTH))
        self.height = int(self.cap.get(cv2.CAP_PROP_FRAME_HEIGHT))

        logger.debug(f"Video opened: {source} (fps: {self.fps}, size: {self.width}x{self.height})")
        return True

    def _pump_frames(self):
        """Read, pace and emit frames until stop, restart or end of stream."""
        frame_interval = 1.0 / self.fps
        frame_emit_time = time.perf_counter()

        while self.running and not self._restart_requested:
            next_frame_time = frame_emit_time + frame_interval

            ret, frame = self.cap.read()
            if not ret:
                logger.debug("End of video stream or read error")
                break

            current_time = time.perf_counter()
            self.metrics.last_cap_time = current_time - frame_emit_time

            if current_time < next_frame_time:
                # Reading was faster than the frame interval: wait it out.
                time.sleep(next_frame_time - current_time)
            else:
                # Reading took longer than one interval; the frame is late.
                self.metrics.frames_dropped += 1
                logger.debug(f"Frame drops counted: {self.metrics.frames_dropped}")

            frame_emit_time = time.perf_counter()
            self._internal_frame.emit(frame)

            self.metrics.frames_processed += 1
            self.metrics.update_fps()

    def stop(self):
        """Ask the capture loop to finish after the current frame.

        Only clears the ``running`` flag. The loop paces itself with
        ``time.sleep`` and never waits on a condition variable, so there is
        nothing to wake; the loop notices the flag within one frame interval.
        """
        self.running = False
|
|
|
|
|
|
class VideoStream(QObject):
    """Facade class intended for use from the main GUI thread.

    Owns a ``VideoStreamWorker`` moved to its own ``QThread`` and re-emits
    every frame the worker produces through ``frame_ready``.
    """

    frame_ready = Signal(object)  # This will be emitted from the main thread.

    def __init__(self, source):
        """Create the worker and its thread and wire up the signals.

        Args:
            source: Video source handed to ``VideoStreamWorker``.
        """
        super().__init__()
        self.worker = VideoStreamWorker(source)
        self.worker_thread = QThread()

        # Move the worker to a separate thread.
        self.worker.moveToThread(self.worker_thread)

        # Connect the signals.
        self.worker._internal_frame.connect(self._on_frame)
        self.worker_thread.started.connect(self.worker.run)
        self.worker_thread.finished.connect(self.worker.deleteLater)
        self.worker_thread.finished.connect(self.worker_thread.deleteLater)

    def start(self):
        """Start the worker thread; the worker's ``run`` is invoked on start."""
        self.worker_thread.start()

    def stop(self):
        """Stop the worker and wait for its thread to finish.

        The thread is asked to quit and is joined even when stopping the
        worker raises, so a failure there cannot leave the thread running;
        the exception still propagates to the caller afterwards.
        """
        try:
            self.worker.stop()
        finally:
            self.worker_thread.quit()
            self.worker_thread.wait()

    def _on_frame(self, frame):
        """Slot invoked in the main thread when a frame is received."""
        # Exposure correction can be added here if it was not done in the worker.
        self.frame_ready.emit(frame)