"""Windows UVC controller — backed by duvc-ctl (DirectShow).""" from __future__ import annotations import logging import duvc_ctl as duvc # type: ignore[import-untyped] # noqa: PLC0415 (lazy import) from app.camera.uvc.base import UvcControllerBase, UvcParam, UvcParamInfo logger = logging.getLogger(__name__) # Map our UvcParam → duvc_ctl VidProp string name _VIDPROP: dict[UvcParam, str] = { UvcParam.BRIGHTNESS: "brightness", UvcParam.CONTRAST: "contrast", UvcParam.SATURATION: "saturation", UvcParam.HUE: "hue", UvcParam.SHARPNESS: "sharpness", UvcParam.GAMMA: "gamma", UvcParam.WHITE_BALANCE: "white_balance", UvcParam.BACKLIGHT_COMPENSATION: "backlight_compensation", } # Map UvcParam → duvc_ctl CamProp string for camera-side controls _CAMPROP: dict[UvcParam, str] = { UvcParam.EXPOSURE: "exposure", } class WindowsUvcController(UvcControllerBase): """ UVC camera controls on Windows via duvc-ctl (DirectShow). duvc-ctl docs: https://allanhanan.github.io/duvc-ctl/ """ def __init__(self, device_name: str) -> None: self._device_name = device_name self._cam: duvc.CameraController | None = None self._supported: dict[UvcParam, bool] = {} # ------------------------------------------------------------------ # Lifecycle # ------------------------------------------------------------------ def open(self, device_name: str) -> bool: self.close() try: devices = duvc.list_devices() target = None for d in devices: if device_name.lower() in d.name.lower() or d.name.lower() in device_name.lower(): target = d break if target is None and devices: target = devices[0] # fallback to first device logger.warning( "UVC: camera '%s' not found by name, using '%s'", device_name, target.name, ) if target is None: logger.warning("UVC: no devices found on Windows") return False self._cam = duvc.CameraController(target) self._cam.__enter__() self._refresh_supported() logger.info("UVC: opened device '%s'", target.name) return True except Exception as exc: logger.warning("UVC: failed to open device: %s", exc) self._cam = None return False def close(self) -> None: if self._cam is not None: try: self._cam.__exit__(None, None, None) except Exception: pass self._cam = None self._supported.clear() def is_open(self) -> bool: return self._cam is not None # ------------------------------------------------------------------ # Query # ------------------------------------------------------------------ def get_param_info(self, param: UvcParam) -> UvcParamInfo: if not self.is_open() or not self._supported.get(param, False): return UvcParamInfo(param=param, supported=False) try: prop_name = _VIDPROP.get(param) or _CAMPROP.get(param) if prop_name is None: return UvcParamInfo(param=param, supported=False) rng = self._cam.get_property_range(prop_name) current = getattr(self._cam, prop_name, rng.get("default", 0)) auto_supported = param in (UvcParam.WHITE_BALANCE, UvcParam.EXPOSURE) return UvcParamInfo( param=param, supported=True, minimum=int(rng.get("min", 0)), maximum=int(rng.get("max", 100)), default=int(rng.get("default", 50)), current=int(current), step=int(rng.get("step", 1)), auto_supported=auto_supported, ) except Exception as exc: logger.debug("UVC get_param_info(%s): %s", param.name, exc) return UvcParamInfo(param=param, supported=False) def get_all_params(self) -> list[UvcParamInfo]: return [self.get_param_info(p) for p in UvcParam] # ------------------------------------------------------------------ # Set # ------------------------------------------------------------------ def set_value(self, param: UvcParam, value: int) -> bool: if not self.is_open(): return False try: prop_name = _VIDPROP.get(param) or _CAMPROP.get(param) if prop_name is None: return False setattr(self._cam, prop_name, value) logger.debug("UVC set %s = %d", param.name, value) return True except Exception as exc: logger.warning("UVC set_value(%s, %d): %s", param.name, value, exc) return False def set_auto(self, param: UvcParam, enabled: bool) -> bool: if not self.is_open(): return False try: if param == UvcParam.WHITE_BALANCE: prop = "white_balance" elif param == UvcParam.EXPOSURE: prop = "exposure" else: return False mode = duvc.CamMode.AUTO if enabled else duvc.CamMode.MANUAL self._cam.set_camera_property(prop, mode=mode) logger.debug("UVC auto %s = %s", param.name, enabled) return True except Exception as exc: logger.warning("UVC set_auto(%s, %s): %s", param.name, enabled, exc) return False # ------------------------------------------------------------------ # Helpers # ------------------------------------------------------------------ def _refresh_supported(self) -> None: if self._cam is None: return try: info = self._cam.get_supported_properties() vid = set(info.get("video", [])) cam = set(info.get("camera", [])) for param in UvcParam: prop = _VIDPROP.get(param) or _CAMPROP.get(param, "") self._supported[param] = prop in vid or prop in cam except Exception as exc: logger.debug("UVC _refresh_supported: %s", exc) for param in UvcParam: self._supported[param] = False