154 lines
5.4 KiB
Python
154 lines
5.4 KiB
Python
from __future__ import annotations
|
|
|
|
import time
|
|
from typing import Any
|
|
|
|
import numpy as np
|
|
|
|
from app.ocr.base import OcrLine, OcrResult, crop_bbox, prepare_ocr_image
|
|
|
|
|
|
class PaddleOcrEngine:
|
|
name = "paddle"
|
|
|
|
def __init__(self, config: dict) -> None:
|
|
self.config = config
|
|
self.load_error: str | None = None
|
|
self.ocr: Any = None
|
|
self._load()
|
|
|
|
def _load(self) -> None:
|
|
try:
|
|
from paddleocr import PaddleOCR
|
|
except Exception as exc:
|
|
self.load_error = f"Nie mozna zaimportowac PaddleOCR: {exc}"
|
|
return
|
|
|
|
paddle_cfg = dict(self.config.get("paddle", {}))
|
|
paddle_cfg.setdefault("lang", self.config.get("language", "en"))
|
|
try:
|
|
self.ocr = PaddleOCR(**paddle_cfg)
|
|
except Exception as exc:
|
|
self.load_error = f"Nie mozna zaladowac PaddleOCR: {exc}"
|
|
|
|
def read_label(self, frame_bgr: np.ndarray, bbox: tuple[int, int, int, int]) -> OcrResult:
|
|
started = time.perf_counter()
|
|
if self.ocr is None:
|
|
return OcrResult(
|
|
error=self.load_error or "PaddleOCR nie jest zaladowany",
|
|
elapsed_ms=self._elapsed_ms(started),
|
|
engine=self.name,
|
|
)
|
|
|
|
margin = int(self.config.get("margin", 0))
|
|
roi = crop_bbox(frame_bgr, bbox, margin=margin)
|
|
if roi is None:
|
|
return OcrResult(
|
|
error="Nieprawidlowy bbox OCR",
|
|
elapsed_ms=self._elapsed_ms(started),
|
|
engine=self.name,
|
|
)
|
|
|
|
preprocess_config = {
|
|
**self.config,
|
|
"threshold": bool(self.config.get("paddle_threshold", False)),
|
|
}
|
|
image = prepare_ocr_image(roi, preprocess_config)
|
|
try:
|
|
raw_result = self._run_ocr(image)
|
|
except Exception as exc:
|
|
return OcrResult(
|
|
error=f"Blad PaddleOCR: {exc}",
|
|
elapsed_ms=self._elapsed_ms(started),
|
|
engine=self.name,
|
|
)
|
|
|
|
lines = self._parse_lines(raw_result)
|
|
text = "\n".join(line.text for line in lines)
|
|
confidences = [line.confidence for line in lines if line.confidence is not None]
|
|
confidence = sum(confidences) / len(confidences) if confidences else None
|
|
return OcrResult(
|
|
text=text,
|
|
confidence=confidence,
|
|
lines=lines,
|
|
elapsed_ms=self._elapsed_ms(started),
|
|
engine=self.name,
|
|
)
|
|
|
|
def _run_ocr(self, image: np.ndarray) -> Any:
|
|
if hasattr(self.ocr, "predict"):
|
|
return self.ocr.predict(image)
|
|
try:
|
|
return self.ocr.ocr(image, cls=bool(self.config.get("use_angle_cls", True)))
|
|
except TypeError:
|
|
return self.ocr.ocr(image)
|
|
|
|
def _parse_lines(self, raw_result: Any) -> list[OcrLine]:
|
|
if raw_result is None:
|
|
return []
|
|
|
|
lines: list[OcrLine] = []
|
|
for item in self._iter_result_items(raw_result):
|
|
parsed = self._parse_item(item)
|
|
if parsed is not None and parsed.text.strip():
|
|
lines.append(parsed)
|
|
return lines
|
|
|
|
def _iter_result_items(self, raw_result: Any) -> list[Any]:
|
|
if isinstance(raw_result, dict):
|
|
texts = raw_result.get("rec_texts") or raw_result.get("texts")
|
|
scores = raw_result.get("rec_scores") or raw_result.get("scores") or []
|
|
boxes = raw_result.get("rec_polys") or raw_result.get("dt_polys") or raw_result.get("boxes") or []
|
|
if texts:
|
|
return [
|
|
(boxes[index] if index < len(boxes) else None, (text, scores[index] if index < len(scores) else None))
|
|
for index, text in enumerate(texts)
|
|
]
|
|
return []
|
|
|
|
if isinstance(raw_result, list) and len(raw_result) == 1 and isinstance(raw_result[0], list):
|
|
return raw_result[0]
|
|
if isinstance(raw_result, list):
|
|
items = []
|
|
for result in raw_result:
|
|
if isinstance(result, dict):
|
|
items.extend(self._iter_result_items(result))
|
|
elif isinstance(result, list):
|
|
items.extend(result)
|
|
else:
|
|
items.append(result)
|
|
return items
|
|
return [raw_result]
|
|
|
|
def _parse_item(self, item: Any) -> OcrLine | None:
|
|
if not isinstance(item, (list, tuple)):
|
|
return None
|
|
|
|
if len(item) >= 2 and isinstance(item[1], (list, tuple)) and item[1]:
|
|
text = str(item[1][0])
|
|
confidence = self._to_float(item[1][1]) if len(item[1]) > 1 else None
|
|
bbox = self._to_bbox(item[0])
|
|
return OcrLine(text=text, confidence=confidence, bbox=bbox)
|
|
|
|
if len(item) >= 2 and isinstance(item[0], str):
|
|
return OcrLine(text=str(item[0]), confidence=self._to_float(item[1]))
|
|
|
|
return None
|
|
|
|
def _to_float(self, value: Any) -> float | None:
|
|
try:
|
|
return float(value)
|
|
except (TypeError, ValueError):
|
|
return None
|
|
|
|
def _to_bbox(self, value: Any) -> list[list[float]] | None:
|
|
if value is None:
|
|
return None
|
|
try:
|
|
return [[float(point[0]), float(point[1])] for point in value]
|
|
except (TypeError, ValueError, IndexError):
|
|
return None
|
|
|
|
def _elapsed_ms(self, started: float) -> float:
|
|
return (time.perf_counter() - started) * 1000.0
|