"""Main application window."""

from __future__ import annotations

import logging
from pathlib import Path

from PySide6.QtCore import QTimer
from PySide6.QtWidgets import QLabel, QMainWindow, QMessageBox, QSizePolicy, QStatusBar

from app.camera.camera_enumerator import CameraEnumerator, CameraFormat, CameraInfo
from app.camera.camera_service import CameraService
from app.camera.uvc import make_uvc_controller
from app.camera.uvc.base import UvcControllerBase
from app.camera.uvc.stub import NullUvcController
from app.config import APP_NAME, APP_VERSION
from app.inference.bbox_overlay import BboxOverlay
from app.inference.worker_manager import InferenceManager
from app.overlay.telemetry_overlay import TelemetryOverlay
from app.pipeline.frame_dispatcher import FrameDispatcher
from app.telemetry.csv_logger import CsvTelemetryLogger
from app.telemetry.telemetry_collector import TelemetryCollector
from app.ui.camera_settings_dialog import CameraSettingsDialog
from app.ui.camera_view import CameraView
from app.ui.menu_bar import AppMenuBar
from app.video.video_player import VideoPlayer

logger = logging.getLogger(__name__)


class MainWindow(QMainWindow):
    """
    Top-level application window.

    Frame source (exclusive):
      • CameraService — live camera (default)
      • VideoPlayer   — local video file

    Exactly one of the two sources is connected to the FrameDispatcher at
    any time; ``_video_source_active`` records which one.

    Inference pipeline (optional):
      InferenceManager runs YOLO in a separate process.
      Frames submitted via FrameDispatcher subscriber (drop_if_busy).
      Results displayed by BboxOverlay.

    Signal flow:
      [CameraService | VideoPlayer].frame_ready(QVideoFrame)
        → FrameDispatcher.dispatch
            → CameraView.on_frame            (render)
            → TelemetryCollector.on_frame    (metrics)
                → TelemetryOverlay           (HUD)
                → CsvTelemetryLogger         (CSV)
            → InferenceManager.submit_frame  (drop_if_busy, optional)
                → [worker process] YOLO
                → BboxOverlay.on_detections  (draw boxes)
    """

    def __init__(self, log_path: Path | None = None) -> None:
        """Build the widgets and pipeline objects, then wire them together.

        Args:
            log_path: Optional log file path. When given, telemetry is also
                written to a CSV file at the same path with a ``.csv``
                suffix, and the resolved log path is handed to the menu bar.
        """
        super().__init__()
        self.setWindowTitle(f"{APP_NAME} v{APP_VERSION}")
        self.setMinimumSize(640, 480)
        self.resize(1280, 720)

        # --- Core pipeline ---
        self._camera_service = CameraService(self)
        self._video_player = VideoPlayer(self)
        self._dispatcher = FrameDispatcher(self)
        self._telemetry = TelemetryCollector(parent=self)
        self._inference = InferenceManager(self)

        # Track which source is active (False → camera is routed to the
        # dispatcher, True → video player is).
        self._video_source_active: bool = False
        self._current_camera: CameraInfo | None = None

        # True while InferenceManager.submit_frame is subscribed to the
        # dispatcher; keeps subscribe/unsubscribe calls idempotent.
        self._inference_subscribed: bool = False

        # --- UVC ---
        self._uvc: UvcControllerBase = NullUvcController()

        # --- CSV logger ---
        self._csv_logger: CsvTelemetryLogger | None = None
        if log_path is not None:
            csv_path = log_path.with_suffix(".csv")
            self._csv_logger = CsvTelemetryLogger(csv_path)
            logger.info("Telemetry CSV: %s", csv_path.resolve())

        # --- Camera view ---
        self._camera_view = CameraView(self)
        self._camera_view.setSizePolicy(
            QSizePolicy.Policy.Expanding, QSizePolicy.Policy.Expanding
        )
        self.setCentralWidget(self._camera_view)

        # --- Overlay layers ---
        self._telemetry_overlay = TelemetryOverlay()
        self._bbox_overlay = BboxOverlay()
        self._camera_view.add_overlay_layer(self._telemetry_overlay)
        self._camera_view.add_overlay_layer(self._bbox_overlay)
        self._bbox_overlay.visible = False  # hidden until inference enabled

        # --- Menu bar ---
        self._menu = AppMenuBar(self)
        self.setMenuBar(self._menu)
        if log_path is not None:
            self._menu.set_log_file_path(str(log_path.resolve()))

        # --- Status bar ---
        self._status_bar = QStatusBar(self)
        self.setStatusBar(self._status_bar)
        self._status_label = QLabel("Initialising\u2026")
        self._status_bar.addWidget(self._status_label)

        # --- Wire signals ---
        self._wire_signals()

        # Defer camera start-up to the first event-loop iteration so the
        # constructor returns before any camera is opened.
        QTimer.singleShot(0, self._initialise_cameras)

    # ------------------------------------------------------------------
    # Initialisation
    # ------------------------------------------------------------------

    def _initialise_cameras(self) -> None:
        """Enumerate cameras, populate the menu and start the initial one.

        The enumerator's default camera is preferred; otherwise the first
        enumerated camera is used. When no camera is found only the status
        label is updated.
        """
        cameras = CameraEnumerator.list_cameras()
        if not cameras:
            self._status_label.setText("No cameras found")
            logger.warning("No cameras detected")
            return

        self._menu.populate_cameras(cameras)
        default = CameraEnumerator.default_camera()
        start_cam = default if default is not None else cameras[0]
        self._menu.populate_formats(start_cam)
        self._start_camera(start_cam)

    def _start_camera(self, cam: CameraInfo) -> None:
        """Make *cam* the current camera and start streaming from it."""
        self._current_camera = cam
        self._telemetry.reset_counters()
        self._camera_service.start(cam)
        self._menu.set_active_camera(cam)
        self._status_label.setText(f"Opening: {cam.name}")
        self._open_uvc(cam)

    def _open_uvc(self, cam: CameraInfo) -> None:
        """Replace the current UVC controller with one for *cam*.

        Any controller that is still open is closed first. The new
        controller is opened explicitly if the factory returned it closed.
        """
        if self._uvc.is_open():
            self._uvc.close()
        ctrl = make_uvc_controller(cam.name)
        if not ctrl.is_open():
            ctrl.open(cam.name)
        self._uvc = ctrl

    # ------------------------------------------------------------------
    # Signal wiring
    # ------------------------------------------------------------------

    def _wire_signals(self) -> None:
        """Connect every static signal/slot pair. Called once from __init__."""
        # ---- Active source → dispatcher ----
        # The camera is the initial source; the routing is swapped later by
        # _switch_to_camera / _switch_to_video.
        self._camera_service.frame_ready.connect(self._dispatcher.dispatch)

        # ---- Dispatcher fans out to all consumers ----
        self._dispatcher.subscribe(self._camera_view.on_frame, drop_if_busy=True)
        self._dispatcher.subscribe(self._telemetry.on_frame, drop_if_busy=False)
        # InferenceManager subscriber added/removed dynamically on toggle

        # ---- Telemetry ----
        self._telemetry.metrics_updated.connect(
            self._telemetry_overlay.on_metrics_updated
        )
        if self._csv_logger is not None:
            self._telemetry.metrics_updated.connect(self._csv_logger.on_metrics_updated)
        self._camera_service.format_changed.connect(self._telemetry.set_target_fps)

        # ---- Camera service status ----
        self._camera_service.camera_started.connect(self._on_camera_started)
        self._camera_service.camera_stopped.connect(self._on_camera_stopped)
        self._camera_service.camera_error.connect(self._on_camera_error)

        # ---- Video player status ----
        self._video_player.playback_started.connect(self._on_playback_started)
        self._video_player.playback_stopped.connect(self._on_playback_stopped)
        self._video_player.playback_error.connect(self._on_playback_error)

        # ---- InferenceManager ----
        self._inference.detections_ready.connect(self._bbox_overlay.on_detections)
        self._inference.inference_started.connect(self._on_inference_started)
        self._inference.inference_stopped.connect(self._on_inference_stopped)
        self._inference.inference_error.connect(self._on_inference_error)

        # ---- Menu ----
        self._menu.camera_selected.connect(self._on_camera_selected)
        self._menu.format_selected.connect(self._on_format_selected)
        self._menu.reconnect_requested.connect(self._camera_service.reconnect)
        self._menu.overlay_toggled.connect(self._camera_view.set_all_overlays_visible)
        self._menu.camera_settings_requested.connect(self._on_settings_requested)
        self._menu.video_file_selected.connect(self._on_video_selected)
        self._menu.video_closed.connect(self._on_video_closed)
        self._menu.model_file_selected.connect(self._on_model_selected)
        self._menu.inference_toggled.connect(self._on_inference_toggled)

    # ------------------------------------------------------------------
    # Source switching
    # ------------------------------------------------------------------

    def _disconnect_from_dispatcher(self, frame_signal) -> None:
        """Detach *frame_signal* from the dispatcher, tolerating a missing link."""
        try:
            frame_signal.disconnect(self._dispatcher.dispatch)
        except RuntimeError:
            # Best effort: the signal was not connected in the first place.
            logger.debug("Frame source was not connected to the dispatcher")

    def _switch_to_camera(self) -> None:
        """Disconnect VideoPlayer, connect CameraService to dispatcher.

        Does not touch the connections when the camera is already the routed
        source; connecting again would deliver every frame twice.
        """
        if self._video_source_active:
            self._disconnect_from_dispatcher(self._video_player.frame_ready)
            self._camera_service.frame_ready.connect(self._dispatcher.dispatch)
            self._video_source_active = False
        self._menu.set_video_source_active(False)

    def _switch_to_video(self) -> None:
        """Disconnect CameraService, connect VideoPlayer to dispatcher.

        Does not touch the connections when the video player is already the
        routed source (e.g. a second file is opened during playback);
        connecting again would deliver every frame twice.
        """
        if not self._video_source_active:
            self._disconnect_from_dispatcher(self._camera_service.frame_ready)
            self._video_player.frame_ready.connect(self._dispatcher.dispatch)
            self._video_source_active = True
        self._menu.set_video_source_active(True)

    # ------------------------------------------------------------------
    # Camera status slots
    # ------------------------------------------------------------------

    def _on_camera_started(self) -> None:
        """Show the streaming camera's name in the status bar and log it."""
        cam = self._camera_service.current_camera
        name = cam.name if cam else "Unknown"
        self._status_label.setText(f"Streaming: {name}")
        logger.info("Camera streaming: %s", name)

    def _on_camera_stopped(self) -> None:
        """Reflect the stopped camera in the status bar."""
        self._status_label.setText("Camera stopped")

    def _on_camera_error(self, message: str) -> None:
        """Show and log a camera error message."""
        self._status_label.setText(f"Error: {message}")
        logger.error("Camera error: %s", message)

    # ------------------------------------------------------------------
    # Video player slots
    # ------------------------------------------------------------------

    def _on_playback_started(self) -> None:
        """Show the playing file's name, or "video" when no path is known."""
        path = self._video_player.current_path or ""
        name = Path(path).name if path else "video"
        self._status_label.setText(f"Playing: {name}")

    def _on_playback_stopped(self) -> None:
        """Reflect the end of playback in the status bar."""
        self._status_label.setText("Playback finished")

    def _on_playback_error(self, message: str) -> None:
        """Show and log a video playback error message."""
        self._status_label.setText(f"Video error: {message}")
        # Prefixed like the camera/inference error logs so the origin of the
        # message is clear in the log file.
        logger.error("Video error: %s", message)

    # ------------------------------------------------------------------
    # Inference slots
    # ------------------------------------------------------------------

    def _set_inference_feed(self, enabled: bool) -> None:
        """Subscribe or unsubscribe the inference manager, idempotently.

        The dispatcher is only touched when the requested state differs from
        the tracked one, so repeated toggles can neither register
        ``submit_frame`` twice nor unsubscribe a callback that was never
        registered.
        """
        if enabled == self._inference_subscribed:
            return
        if enabled:
            self._dispatcher.subscribe(self._inference.submit_frame, drop_if_busy=True)
        else:
            self._dispatcher.unsubscribe(self._inference.submit_frame)
        self._inference_subscribed = enabled

    def _on_inference_started(self) -> None:
        """Reflect a running inference manager in status bar and menu."""
        self._status_label.setText("Inference running")
        self._menu.set_inference_checked(True)

    def _on_inference_stopped(self) -> None:
        """Remove any boxes left on screen by the stopped manager."""
        self._bbox_overlay.clear()

    def _on_inference_error(self, message: str) -> None:
        """Disable inference in the UI and report the error to the user."""
        logger.error("Inference: %s", message)
        # Stop feeding frames before the modal dialog below starts blocking.
        self._set_inference_feed(False)
        self._menu.set_inference_available(False)
        self._menu.set_inference_checked(False)
        self._bbox_overlay.visible = False
        QMessageBox.critical(self, "Inference Error", message)

    # ------------------------------------------------------------------
    # Menu action slots
    # ------------------------------------------------------------------

    def _on_camera_selected(self, cam: CameraInfo) -> None:
        """Start *cam*, leaving video playback first if it is active."""
        if self._video_source_active:
            self._video_player.stop()
            self._switch_to_camera()
        self._start_camera(cam)

    def _on_format_selected(self, fmt: CameraFormat) -> None:
        """Log the chosen format and apply it to the camera service."""
        logger.info(
            "Format selected via menu: %dx%d @ %.4g fps (%s)",
            fmt.width,
            fmt.height,
            fmt.max_fps,
            fmt.pixel_format,
        )
        self._camera_service.set_format(fmt)

    def _on_settings_requested(self) -> None:
        """Open the modal camera settings dialog for the active camera."""
        qt_cam = self._camera_service.qt_camera
        if qt_cam is None:
            logger.warning("Settings requested but no camera is active")
            return
        dlg = CameraSettingsDialog(qt_cam, self._uvc, parent=self)
        dlg.exec()

    def _on_video_selected(self, path: str) -> None:
        """Switch source to video file."""
        self._camera_service.stop()
        self._switch_to_video()
        self._video_player.play(path)
        logger.info("Video source: %s", path)

    def _on_video_closed(self) -> None:
        """Return to camera source."""
        self._video_player.stop()
        self._switch_to_camera()
        if self._current_camera is not None:
            self._start_camera(self._current_camera)
        logger.info("Returned to camera source")

    def _on_model_selected(self, path: str) -> None:
        """Load YOLO model into inference manager.

        Inference is left disabled afterwards: the frame feed is removed and
        the box overlay hidden, matching the unchecked menu item.
        """
        name = Path(path).name
        logger.info("Loading model: %s", path)
        self._status_label.setText(f"Loading model: {name}\u2026")

        # Drop the feed and any boxes from a previously enabled model before
        # the manager is (re)started.
        self._set_inference_feed(False)
        self._bbox_overlay.clear()
        self._bbox_overlay.visible = False

        self._inference.start(path)
        self._menu.set_model_label(name)
        self._menu.set_inference_available(True)
        self._menu.set_inference_checked(False)  # user must explicitly enable

    def _on_inference_toggled(self, enabled: bool) -> None:
        """Enable or disable feeding frames to the inference manager."""
        if enabled:
            if not self._inference.is_running:
                # shouldn't happen but be safe
                logger.warning("Inference toggle on but manager not running")
                self._menu.set_inference_checked(False)
                return
            self._inference.resume()
            self._set_inference_feed(True)
            self._bbox_overlay.visible = True
            self._status_label.setText("Inference enabled")
            logger.info("Inference enabled")
        else:
            self._inference.pause()
            self._set_inference_feed(False)
            self._bbox_overlay.clear()
            self._bbox_overlay.visible = False
            self._status_label.setText("Inference disabled")
            logger.info("Inference disabled")

    # ------------------------------------------------------------------
    # Qt overrides
    # ------------------------------------------------------------------

    def closeEvent(self, event) -> None:  # noqa: N802
        """Stop all sources and workers, then close the telemetry CSV."""
        try:
            self._inference.stop()
            self._camera_service.stop()
            self._video_player.stop()
            if self._uvc.is_open():
                self._uvc.close()
        finally:
            # Close the CSV even if a component failed to stop, so the
            # telemetry file is not left open.
            if self._csv_logger is not None:
                logger.info(
                    "CSV telemetry: %d rows written", self._csv_logger.rows_written
                )
                self._csv_logger.close()
        super().closeEvent(event)