- Implemented CameraSettingsDialog to manage UVC and Qt camera controls. - Integrated UVC parameter sliders and auto controls for brightness, contrast, saturation, hue, sharpness, gamma, white balance, backlight compensation, and exposure. - Added functionality to change white balance and exposure settings via Qt controls. - Updated MainWindow to open CameraSettingsDialog and manage UVC controller lifecycle. - Enhanced AppMenuBar to include a Camera Settings option. - Created tests for UVC controller abstraction layer and parameter info. - Documented camera specifications and supported features in new markdown files.
226 lines
8.6 KiB
Python
226 lines
8.6 KiB
Python
"""Main application window."""
|
|
|
|
from __future__ import annotations
|
|
|
|
import logging
|
|
from pathlib import Path
|
|
|
|
from PySide6.QtCore import QTimer
|
|
from PySide6.QtWidgets import QLabel, QMainWindow, QSizePolicy, QStatusBar
|
|
|
|
from app.camera.camera_enumerator import CameraEnumerator, CameraFormat, CameraInfo
|
|
from app.camera.camera_service import CameraService
|
|
from app.camera.uvc import make_uvc_controller
|
|
from app.camera.uvc.base import UvcControllerBase
|
|
from app.camera.uvc.stub import NullUvcController
|
|
from app.config import APP_NAME, APP_VERSION
|
|
from app.overlay.telemetry_overlay import TelemetryOverlay
|
|
from app.pipeline.frame_dispatcher import FrameDispatcher
|
|
from app.telemetry.csv_logger import CsvTelemetryLogger
|
|
from app.telemetry.telemetry_collector import TelemetryCollector
|
|
from app.ui.camera_settings_dialog import CameraSettingsDialog
|
|
from app.ui.camera_view import CameraView
|
|
from app.ui.menu_bar import AppMenuBar
|
|
|
|
logger = logging.getLogger(__name__)
|
|
|
|
|
|
class MainWindow(QMainWindow):
|
|
"""
|
|
Top-level application window.
|
|
|
|
Rendering architecture:
|
|
QVideoWidget is intentionally NOT used — on Windows its native HWND
|
|
surface occludes all sibling/child QWidgets regardless of z-order.
|
|
CameraView is a plain QWidget that renders frames and overlay layers
|
|
in a single paintEvent pass.
|
|
|
|
Signal flow:
|
|
CameraService.frame_ready
|
|
→ FrameDispatcher.dispatch
|
|
→ CameraView.on_frame (render frame)
|
|
→ TelemetryCollector.on_frame (measure metrics)
|
|
→ TelemetryOverlay.on_metrics_updated (overlay data)
|
|
→ CsvTelemetryLogger.on_metrics_updated (CSV file)
|
|
"""
|
|
|
|
def __init__(self, log_path: Path | None = None) -> None:
|
|
super().__init__()
|
|
|
|
self.setWindowTitle(f"{APP_NAME} v{APP_VERSION}")
|
|
self.setMinimumSize(640, 480)
|
|
self.resize(1280, 720)
|
|
|
|
# --- Core pipeline components ---
|
|
self._camera_service = CameraService(self)
|
|
self._dispatcher = FrameDispatcher(self)
|
|
self._telemetry = TelemetryCollector(parent=self)
|
|
|
|
# --- UVC controller (platform-specific, lazy-opened per camera) ---
|
|
self._uvc: UvcControllerBase = NullUvcController()
|
|
|
|
# --- CSV telemetry logger ---
|
|
self._csv_logger: CsvTelemetryLogger | None = None
|
|
if log_path is not None:
|
|
csv_path = log_path.with_suffix(".csv")
|
|
self._csv_logger = CsvTelemetryLogger(csv_path)
|
|
logger.info("Telemetry CSV: %s", csv_path.resolve())
|
|
|
|
# --- Camera view (central widget) ---
|
|
self._camera_view = CameraView(self)
|
|
self._camera_view.setSizePolicy(
|
|
QSizePolicy.Policy.Expanding, QSizePolicy.Policy.Expanding
|
|
)
|
|
self.setCentralWidget(self._camera_view)
|
|
|
|
# --- Overlay layers ---
|
|
self._telemetry_overlay = TelemetryOverlay()
|
|
self._camera_view.add_overlay_layer(self._telemetry_overlay)
|
|
|
|
# --- Menu bar ---
|
|
self._menu = AppMenuBar(self)
|
|
self.setMenuBar(self._menu)
|
|
if log_path is not None:
|
|
self._menu.set_log_file_path(str(log_path.resolve()))
|
|
|
|
# --- Status bar ---
|
|
self._status_bar = QStatusBar(self)
|
|
self.setStatusBar(self._status_bar)
|
|
self._status_label = QLabel("Initialising\u2026")
|
|
self._status_bar.addWidget(self._status_label)
|
|
|
|
# --- Wire signals ---
|
|
self._wire_signals()
|
|
|
|
# --- Enumerate cameras and start ---
|
|
QTimer.singleShot(0, self._initialise_cameras)
|
|
|
|
# ------------------------------------------------------------------
|
|
# Initialisation
|
|
# ------------------------------------------------------------------
|
|
|
|
def _initialise_cameras(self) -> None:
|
|
cameras = CameraEnumerator.list_cameras()
|
|
|
|
if not cameras:
|
|
self._status_label.setText("No cameras found")
|
|
logger.warning("No cameras detected")
|
|
return
|
|
|
|
self._menu.populate_cameras(cameras)
|
|
|
|
default = CameraEnumerator.default_camera()
|
|
start_cam = default if default is not None else cameras[0]
|
|
|
|
self._menu.populate_formats(start_cam)
|
|
self._start_camera(start_cam)
|
|
|
|
def _start_camera(self, cam: CameraInfo) -> None:
|
|
self._telemetry.reset_counters()
|
|
self._camera_service.start(cam)
|
|
self._menu.set_active_camera(cam)
|
|
self._status_label.setText(f"Opening: {cam.name}")
|
|
self._open_uvc(cam)
|
|
|
|
def _open_uvc(self, cam: CameraInfo) -> None:
|
|
"""Open or reopen the UVC controller for the given camera."""
|
|
if self._uvc.is_open():
|
|
self._uvc.close()
|
|
ctrl = make_uvc_controller(cam.name)
|
|
if not ctrl.is_open():
|
|
# factory may return a pre-opened controller or a NullUvcController
|
|
ctrl.open(cam.name)
|
|
self._uvc = ctrl
|
|
|
|
# ------------------------------------------------------------------
|
|
# Signal wiring
|
|
# ------------------------------------------------------------------
|
|
|
|
def _wire_signals(self) -> None:
|
|
# CameraService → FrameDispatcher
|
|
self._camera_service.frame_ready.connect(self._dispatcher.dispatch)
|
|
|
|
# FrameDispatcher → CameraView (render) — drop if busy
|
|
self._dispatcher.subscribe(self._camera_view.on_frame, drop_if_busy=True)
|
|
|
|
# FrameDispatcher → TelemetryCollector — never drop
|
|
self._dispatcher.subscribe(self._telemetry.on_frame, drop_if_busy=False)
|
|
|
|
# TelemetryCollector → overlay
|
|
self._telemetry.metrics_updated.connect(
|
|
self._telemetry_overlay.on_metrics_updated
|
|
)
|
|
|
|
# TelemetryCollector → CSV logger (throttled internally)
|
|
if self._csv_logger is not None:
|
|
self._telemetry.metrics_updated.connect(self._csv_logger.on_metrics_updated)
|
|
|
|
# CameraService → TelemetryCollector: keep target FPS in sync
|
|
self._camera_service.format_changed.connect(self._telemetry.set_target_fps)
|
|
|
|
# CameraService status
|
|
self._camera_service.camera_started.connect(self._on_camera_started)
|
|
self._camera_service.camera_stopped.connect(self._on_camera_stopped)
|
|
self._camera_service.camera_error.connect(self._on_camera_error)
|
|
|
|
# Menu signals
|
|
self._menu.camera_selected.connect(self._on_camera_selected)
|
|
self._menu.format_selected.connect(self._on_format_selected)
|
|
self._menu.reconnect_requested.connect(self._camera_service.reconnect)
|
|
self._menu.overlay_toggled.connect(self._camera_view.set_all_overlays_visible)
|
|
self._menu.camera_settings_requested.connect(self._on_settings_requested)
|
|
|
|
# ------------------------------------------------------------------
|
|
# Camera status slots
|
|
# ------------------------------------------------------------------
|
|
|
|
def _on_camera_started(self) -> None:
|
|
cam = self._camera_service.current_camera
|
|
name = cam.name if cam else "Unknown"
|
|
self._status_label.setText(f"Streaming: {name}")
|
|
logger.info("Camera streaming: %s", name)
|
|
|
|
def _on_camera_stopped(self) -> None:
|
|
self._status_label.setText("Camera stopped")
|
|
|
|
def _on_camera_error(self, message: str) -> None:
|
|
self._status_label.setText(f"Error: {message}")
|
|
logger.error("Camera error: %s", message)
|
|
|
|
# ------------------------------------------------------------------
|
|
# Menu action slots
|
|
# ------------------------------------------------------------------
|
|
|
|
def _on_camera_selected(self, cam: CameraInfo) -> None:
|
|
self._start_camera(cam)
|
|
|
|
def _on_format_selected(self, fmt: CameraFormat) -> None:
|
|
logger.info(
|
|
"Format selected via menu: %dx%d @ %.4g fps (%s)",
|
|
fmt.width, fmt.height, fmt.max_fps, fmt.pixel_format,
|
|
)
|
|
self._camera_service.set_format(fmt)
|
|
|
|
def _on_settings_requested(self) -> None:
|
|
qt_cam = self._camera_service.qt_camera
|
|
if qt_cam is None:
|
|
logger.warning("Settings requested but no camera is active")
|
|
return
|
|
dlg = CameraSettingsDialog(qt_cam, self._uvc, parent=self)
|
|
dlg.exec()
|
|
|
|
# ------------------------------------------------------------------
|
|
# Qt overrides
|
|
# ------------------------------------------------------------------
|
|
|
|
def closeEvent(self, event) -> None: # noqa: N802
|
|
self._camera_service.stop()
|
|
if self._uvc.is_open():
|
|
self._uvc.close()
|
|
if self._csv_logger is not None:
|
|
logger.info(
|
|
"CSV telemetry: %d rows written", self._csv_logger.rows_written
|
|
)
|
|
self._csv_logger.close()
|
|
super().closeEvent(event)
|