From 061ebf9978ddc825bed73a9de02869d552be5216 Mon Sep 17 00:00:00 2001
From: bartool
Date: Fri, 8 May 2026 07:08:48 +0200
Subject: [PATCH] Implement OCR engine architecture with base, factory, and
 specific engines

---
Note: this patch was reformatted and amended by hand; the blob ids on the
`index` lines of modified files are those of the originally generated patch.

 app/ocr/__init__.py  |   4 +
 app/ocr/base.py      |  74 +++++++++++++++++
 app/ocr/cli.py       | 116 +++++++++++++++++++++++++++
 app/ocr/factory.py   |  25 ++++++
 app/ocr/none.py      |  17 ++++
 app/ocr/paddle.py    | 185 +++++++++++++++++++++++++++++++++++++++++++
 app/ocr/tesseract.py | 113 ++++++++++++++++++++++++++
 7 files changed, 534 insertions(+)
 create mode 100644 app/ocr/__init__.py
 create mode 100644 app/ocr/base.py
 create mode 100644 app/ocr/cli.py
 create mode 100644 app/ocr/factory.py
 create mode 100644 app/ocr/none.py
 create mode 100644 app/ocr/paddle.py
 create mode 100644 app/ocr/tesseract.py

diff --git a/app/ocr/__init__.py b/app/ocr/__init__.py
new file mode 100644
index 0000000..47143fd
--- /dev/null
+++ b/app/ocr/__init__.py
@@ -0,0 +1,4 @@
+from app.ocr.base import OcrEngine, OcrLine, OcrResult
+from app.ocr.factory import create_ocr_engine
+
+__all__ = ["OcrEngine", "OcrLine", "OcrResult", "create_ocr_engine"]
diff --git a/app/ocr/base.py b/app/ocr/base.py
new file mode 100644
index 0000000..b1fdb8f
--- /dev/null
+++ b/app/ocr/base.py
@@ -0,0 +1,74 @@
+from __future__ import annotations
+
+from dataclasses import dataclass, field
+from typing import Protocol
+
+import cv2
+import numpy as np
+
+
+@dataclass
+class OcrLine:
+    """Single recognised text line with optional score and polygon."""
+
+    text: str
+    confidence: float | None = None
+    bbox: list[list[float]] | None = None
+
+
+@dataclass
+class OcrResult:
+    """Outcome of one OCR call; `error` carries the failure message, if any."""
+
+    text: str = ""
+    confidence: float | None = None
+    lines: list[OcrLine] = field(default_factory=list)
+    error: str | None = None
+    elapsed_ms: float = 0.0
+    engine: str = "none"
+
+
+class OcrEngine(Protocol):
+    """Structural interface implemented by every OCR backend."""
+
+    name: str
+
+    def read_label(self, frame_bgr: np.ndarray, bbox: tuple[int, int, int, int]) -> OcrResult:
+        """Run OCR on the `bbox` region (x1, y1, x2, y2) of `frame_bgr`."""
+        ...
+
+
+def crop_bbox(frame_bgr: np.ndarray, bbox: tuple[int, int, int, int], margin: int = 0) -> np.ndarray | None:
+    """Return the `bbox` region grown by `margin` and clamped to the frame.
+
+    Coordinates are truncated to int so float boxes can be used for slicing.
+    Returns None when the clamped region is empty.
+    """
+    x1, y1, x2, y2 = (int(value) for value in bbox)
+    h, w = frame_bgr.shape[:2]
+    x1, y1 = max(0, x1 - margin), max(0, y1 - margin)
+    x2, y2 = min(w, x2 + margin), min(h, y2 + margin)
+    if x2 <= x1 or y2 <= y1:
+        return None
+    return frame_bgr[y1:y2, x1:x2]
+
+
+def prepare_ocr_image(image_bgr: np.ndarray, config: dict) -> np.ndarray:
+    """Rescale and optionally binarise an image before OCR.
+
+    Reads `scale` (default 1.0) and `threshold` (default False) from `config`.
+    Raises ValueError for a non-positive scale.
+    """
+    scale = float(config.get("scale", 1.0))
+    if scale <= 0:
+        raise ValueError(f"Nieprawidlowa skala OCR: {scale}")
+    if scale != 1.0:
+        image_bgr = cv2.resize(image_bgr, None, fx=scale, fy=scale, interpolation=cv2.INTER_CUBIC)
+
+    if not config.get("threshold", False):
+        return image_bgr
+
+    # A 2-D array is already single-channel; BGR2GRAY would reject it.
+    gray = image_bgr if image_bgr.ndim == 2 else cv2.cvtColor(image_bgr, cv2.COLOR_BGR2GRAY)
+    gray = cv2.GaussianBlur(gray, (3, 3), 0)
+    return cv2.threshold(gray, 0, 255, cv2.THRESH_BINARY + cv2.THRESH_OTSU)[1]
diff --git a/app/ocr/cli.py b/app/ocr/cli.py
new file mode 100644
index 0000000..2e3cf97
--- /dev/null
+++ b/app/ocr/cli.py
@@ -0,0 +1,116 @@
+from __future__ import annotations
+
+import argparse
+import json
+from pathlib import Path
+from typing import Any
+
+import cv2
+
+from app.config import AppConfig
+from app.label_parser import parse_label_text
+from app.ocr import create_ocr_engine
+
+
+def iter_images(path: Path) -> list[Path]:
+    """Return `path` itself if it is a file, else its image files, sorted."""
+    if path.is_file():
+        return [path]
+
+    extensions = {".jpg", ".jpeg", ".png", ".bmp", ".webp", ".tif", ".tiff"}
+    return sorted(item for item in path.iterdir() if item.is_file() and item.suffix.lower() in extensions)
+
+
+def result_to_dict(path: Path, result: Any, config: dict[str, Any]) -> dict[str, Any]:
+    """Flatten an OCR result and its parsed label into a JSON-ready dict."""
+    label_cfg = config.get("label_data", {})
+    parsed = parse_label_text(
+        result.text,
+        label_cfg.get("colors", []),
+        label_cfg.get("models", []),
+        model_min_score=float(label_cfg.get("model_min_score", 0.72)),
+        color_min_score=float(label_cfg.get("color_min_score", 0.72)),
+    )
+    return {
+        "file": str(path),
+        "engine": result.engine,
+        "elapsed_ms": round(result.elapsed_ms, 2),
+        "confidence": result.confidence,
+        "error": result.error,
+        "text": result.text,
+        "lines": [
+            {
+                "text": line.text,
+                "confidence": line.confidence,
+                "bbox": line.bbox,
+            }
+            for line in result.lines
+        ],
+        "parsed": parsed.to_dict(),
+    }
+
+
+def main() -> int:
+    """Run the configured OCR engine over one image or a directory of crops."""
+    parser = argparse.ArgumentParser(description="Test OCR backend on cropped label images.")
+    parser.add_argument("path", help="Image file or directory with crop images")
+    parser.add_argument("--config", default="app_config.json", help="Application config JSON path")
+    parser.add_argument(
+        "--engine",
+        choices=["none", "tesseract", "paddle"],
+        help="Override ocr.engine from config",
+    )
+    parser.add_argument("--no-threshold", action="store_true", help="Disable threshold preprocessing")
+    parser.add_argument("--scale", type=float, help="Override OCR scale")
+    parser.add_argument("--json", action="store_true", help="Print JSON output")
+    args = parser.parse_args()
+
+    target = Path(args.path)
+    if not target.exists():
+        # Fail with a usage error instead of a traceback from iterdir().
+        parser.error(f"path does not exist: {target}")
+
+    app_config = AppConfig(Path(args.config))
+    config = app_config.data
+    # The overrides must also work when the config file has no "ocr" section.
+    ocr_cfg = config.setdefault("ocr", {})
+    if args.engine:
+        ocr_cfg["engine"] = args.engine
+        ocr_cfg["enabled"] = args.engine != "none"
+    if args.no_threshold:
+        ocr_cfg["threshold"] = False
+    if args.scale is not None:
+        ocr_cfg["scale"] = args.scale
+
+    engine = create_ocr_engine(config)
+    outputs = []
+    for image_path in iter_images(target):
+        image = cv2.imread(str(image_path), cv2.IMREAD_COLOR)
+        if image is None:
+            outputs.append({"file": str(image_path), "error": "Nie mozna odczytac obrazu"})
+            continue
+
+        h, w = image.shape[:2]
+        result = engine.read_label(image, (0, 0, w, h))
+        outputs.append(result_to_dict(image_path, result, config))
+
+    if args.json:
+        print(json.dumps(outputs, indent=2, ensure_ascii=False))
+        return 0
+
+    for output in outputs:
+        print(f"file: {output['file']}")
+        print(f"engine: {output.get('engine')}")
+        print(f"elapsed_ms: {output.get('elapsed_ms')}")
+        print(f"confidence: {output.get('confidence')}")
+        if output.get("error"):
+            print(f"error: {output['error']}")
+        print("text:")
+        print(output.get("text") or "")
+        print(f"parsed: {output.get('parsed')}")
+        print()
+    return 0
+
+
+if __name__ == "__main__":
+    raise SystemExit(main())
diff --git a/app/ocr/factory.py b/app/ocr/factory.py
new file mode 100644
index 0000000..cf45e66
--- /dev/null
+++ b/app/ocr/factory.py
@@ -0,0 +1,25 @@
+from __future__ import annotations
+
+from typing import Any
+
+from app.ocr.base import OcrEngine
+from app.ocr.none import NoOcrEngine
+from app.ocr.paddle import PaddleOcrEngine
+from app.ocr.tesseract import TesseractOcrEngine
+
+
+def create_ocr_engine(config: dict[str, Any]) -> OcrEngine:
+    """Build the engine named by config["ocr"]; ValueError if unknown."""
+    ocr_cfg = config.get("ocr", {})
+    if not ocr_cfg.get("enabled", True):
+        return NoOcrEngine(ocr_cfg)
+
+    engine = str(ocr_cfg.get("engine", "tesseract")).lower()
+    if engine in {"none", "off", "disabled"}:
+        return NoOcrEngine(ocr_cfg)
+    if engine == "tesseract":
+        return TesseractOcrEngine(ocr_cfg)
+    if engine == "paddle":
+        return PaddleOcrEngine(ocr_cfg)
+
+    raise ValueError(f"Nieznany silnik OCR: {engine}")
diff --git a/app/ocr/none.py b/app/ocr/none.py
new file mode 100644
index 0000000..9a91dbe
--- /dev/null
+++ b/app/ocr/none.py
@@ -0,0 +1,17 @@
+from __future__ import annotations
+
+import numpy as np
+
+from app.ocr.base import OcrResult
+
+
+class NoOcrEngine:
+    """Engine used when OCR is disabled; always returns an empty result."""
+
+    name = "none"
+
+    def __init__(self, config: dict) -> None:
+        self.config = config
+
+    def read_label(self, frame_bgr: np.ndarray, bbox: tuple[int, int, int, int]) -> OcrResult:
+        return OcrResult(engine=self.name)
diff --git a/app/ocr/paddle.py b/app/ocr/paddle.py
new file mode 100644
index 0000000..e195508
--- /dev/null
+++ b/app/ocr/paddle.py
@@ -0,0 +1,185 @@
+from __future__ import annotations
+
+import time
+from typing import Any
+
+import numpy as np
+
+from app.ocr.base import OcrLine, OcrResult, crop_bbox, prepare_ocr_image
+
+
+class PaddleOcrEngine:
+    """OCR backend built on PaddleOCR; load and run errors land in OcrResult."""
+
+    name = "paddle"
+
+    def __init__(self, config: dict) -> None:
+        self.config = config
+        self.load_error: str | None = None
+        self.ocr: Any = None
+        self._load()
+
+    def _load(self) -> None:
+        """Import and construct PaddleOCR, recording any failure in load_error."""
+        try:
+            from paddleocr import PaddleOCR
+        except Exception as exc:
+            self.load_error = f"Nie mozna zaimportowac PaddleOCR: {exc}"
+            return
+
+        paddle_cfg = dict(self.config.get("paddle", {}))
+        paddle_cfg.setdefault("lang", self.config.get("language", "en"))
+        try:
+            self.ocr = PaddleOCR(**paddle_cfg)
+        except Exception as exc:
+            self.load_error = f"Nie mozna zaladowac PaddleOCR: {exc}"
+
+    def read_label(self, frame_bgr: np.ndarray, bbox: tuple[int, int, int, int]) -> OcrResult:
+        """Crop `bbox` from the frame, run PaddleOCR and collect the lines."""
+        started = time.perf_counter()
+        if self.ocr is None:
+            return OcrResult(
+                error=self.load_error or "PaddleOCR nie jest zaladowany",
+                elapsed_ms=self._elapsed_ms(started),
+                engine=self.name,
+            )
+
+        margin = int(self.config.get("margin", 0))
+        roi = crop_bbox(frame_bgr, bbox, margin=margin)
+        if roi is None:
+            return OcrResult(
+                error="Nieprawidlowy bbox OCR",
+                elapsed_ms=self._elapsed_ms(started),
+                engine=self.name,
+            )
+
+        # Thresholding is opt-in for Paddle via its own `paddle_threshold` key.
+        preprocess_config = {
+            **self.config,
+            "threshold": bool(self.config.get("paddle_threshold", False)),
+        }
+        try:
+            # Preprocessing sits inside the try so a bad scale or image is
+            # reported through OcrResult.error like any other OCR failure.
+            image = prepare_ocr_image(roi, preprocess_config)
+            raw_result = self._run_ocr(image)
+        except Exception as exc:
+            return OcrResult(
+                error=f"Blad PaddleOCR: {exc}",
+                elapsed_ms=self._elapsed_ms(started),
+                engine=self.name,
+            )
+
+        lines = self._parse_lines(raw_result)
+        text = "\n".join(line.text for line in lines)
+        confidences = [line.confidence for line in lines if line.confidence is not None]
+        confidence = sum(confidences) / len(confidences) if confidences else None
+        return OcrResult(
+            text=text,
+            confidence=confidence,
+            lines=lines,
+            elapsed_ms=self._elapsed_ms(started),
+            engine=self.name,
+        )
+
+    def _run_ocr(self, image: np.ndarray) -> Any:
+        """Call `predict` when the loaded object has it, else `ocr`."""
+        if hasattr(self.ocr, "predict"):
+            return self.ocr.predict(image)
+        try:
+            return self.ocr.ocr(image, cls=bool(self.config.get("use_angle_cls", True)))
+        except TypeError:
+            # Presumably for ocr() signatures without the `cls` keyword.
+            return self.ocr.ocr(image)
+
+    def _parse_lines(self, raw_result: Any) -> list[OcrLine]:
+        """Convert a raw PaddleOCR return value into non-blank OcrLine items."""
+        if raw_result is None:
+            return []
+
+        lines: list[OcrLine] = []
+        for item in self._iter_result_items(raw_result):
+            parsed = self._parse_item(item)
+            if parsed is not None and parsed.text.strip():
+                lines.append(parsed)
+        return lines
+
+    @staticmethod
+    def _first_non_empty(mapping: dict, *keys: str) -> Any:
+        """Return the first non-empty sized value among `keys`, else [].
+
+        Emptiness is tested with len() rather than truthiness because the
+        truth value of a multi-element NumPy array is ambiguous and raises.
+        """
+        for key in keys:
+            value = mapping.get(key)
+            try:
+                if value is not None and len(value) > 0:
+                    return value
+            except TypeError:
+                # Unsized value (e.g. a scalar): treat it as missing.
+                continue
+        return []
+
+    def _iter_result_items(self, raw_result: Any) -> list[Any]:
+        """Normalise supported result layouts to `(box, (text, score))` items.
+
+        Handles a dict of parallel `rec_*` sequences, a list of such dicts,
+        and nested lists of items, which are flattened by one level.
+        """
+        if isinstance(raw_result, dict):
+            texts = self._first_non_empty(raw_result, "rec_texts", "texts")
+            scores = self._first_non_empty(raw_result, "rec_scores", "scores")
+            boxes = self._first_non_empty(raw_result, "rec_polys", "dt_polys", "boxes")
+            return [
+                (boxes[index] if index < len(boxes) else None, (text, scores[index] if index < len(scores) else None))
+                for index, text in enumerate(texts)
+            ]
+
+        if isinstance(raw_result, list):
+            items = []
+            for result in raw_result:
+                if isinstance(result, dict):
+                    items.extend(self._iter_result_items(result))
+                elif isinstance(result, list):
+                    items.extend(result)
+                else:
+                    items.append(result)
+            return items
+        return [raw_result]
+
+    def _parse_item(self, item: Any) -> OcrLine | None:
+        """Build an OcrLine from `(box, (text, score))` or `(text, score)`."""
+        if not isinstance(item, (list, tuple)):
+            return None
+
+        if len(item) >= 2 and isinstance(item[1], (list, tuple)) and item[1]:
+            text = str(item[1][0])
+            confidence = self._to_float(item[1][1]) if len(item[1]) > 1 else None
+            bbox = self._to_bbox(item[0])
+            return OcrLine(text=text, confidence=confidence, bbox=bbox)
+
+        if len(item) >= 2 and isinstance(item[0], str):
+            return OcrLine(text=str(item[0]), confidence=self._to_float(item[1]))
+
+        return None
+
+    def _to_float(self, value: Any) -> float | None:
+        """Return float(value), or None when it cannot be converted."""
+        try:
+            return float(value)
+        except (TypeError, ValueError):
+            return None
+
+    def _to_bbox(self, value: Any) -> list[list[float]] | None:
+        """Return the polygon as [[x, y], ...] floats, or None if malformed."""
+        if value is None:
+            return None
+        try:
+            return [[float(point[0]), float(point[1])] for point in value]
+        except (TypeError, ValueError, IndexError):
+            return None
+
+    def _elapsed_ms(self, started: float) -> float:
+        """Milliseconds elapsed since the perf_counter reading `started`."""
+        return (time.perf_counter() - started) * 1000.0
diff --git a/app/ocr/tesseract.py b/app/ocr/tesseract.py
new file mode 100644
index 0000000..e3723f6
--- /dev/null
+++ b/app/ocr/tesseract.py
@@ -0,0 +1,113 @@
+from __future__ import annotations
+
+import time
+
+import numpy as np
+
+from app.ocr.base import OcrLine, OcrResult, crop_bbox, prepare_ocr_image
+
+
+class TesseractOcrEngine:
+    """OCR backend built on pytesseract; load and run errors land in OcrResult."""
+
+    name = "tesseract"
+
+    def __init__(self, config: dict) -> None:
+        self.config = config
+        self.load_error: str | None = None
+        self.pytesseract = None
+        self._load()
+
+    def _load(self) -> None:
+        """Import pytesseract and apply the optional `tesseract_cmd` override."""
+        try:
+            import pytesseract
+
+            command = self.config.get("tesseract_cmd")
+            if command:
+                pytesseract.pytesseract.tesseract_cmd = command
+            self.pytesseract = pytesseract
+        except Exception as exc:
+            self.load_error = f"Nie mozna zaladowac pytesseract: {exc}"
+
+    def read_label(self, frame_bgr: np.ndarray, bbox: tuple[int, int, int, int]) -> OcrResult:
+        """Crop `bbox` from the frame, run Tesseract and return text and score."""
+        started = time.perf_counter()
+        if self.pytesseract is None:
+            return OcrResult(
+                error=self.load_error or "OCR Tesseract nie jest zaladowany",
+                elapsed_ms=self._elapsed_ms(started),
+                engine=self.name,
+            )
+
+        margin = int(self.config.get("margin", 0))
+        roi = crop_bbox(frame_bgr, bbox, margin=margin)
+        if roi is None:
+            return OcrResult(
+                error="Nieprawidlowy bbox OCR",
+                elapsed_ms=self._elapsed_ms(started),
+                engine=self.name,
+            )
+
+        psm = int(self.config.get("psm", 6))
+        language = self.config.get("language", "eng")
+        extra_config = str(self.config.get("config", "")).strip()
+        tesseract_config = f"--psm {psm}"
+        if extra_config:
+            tesseract_config = f"{tesseract_config} {extra_config}"
+
+        try:
+            # Preprocessing sits inside the try so a bad scale or image is
+            # reported through OcrResult.error like any other OCR failure.
+            image = prepare_ocr_image(roi, self.config)
+            text = self.pytesseract.image_to_string(
+                image,
+                lang=language,
+                config=tesseract_config,
+            )
+        except Exception as exc:
+            return OcrResult(
+                error=f"Blad OCR Tesseract: {exc}",
+                elapsed_ms=self._elapsed_ms(started),
+                engine=self.name,
+            )
+
+        confidence = self._mean_confidence(image, language, tesseract_config)
+        return OcrResult(
+            text=text,
+            confidence=confidence,
+            lines=[OcrLine(text=line) for line in text.splitlines() if line.strip()],
+            elapsed_ms=self._elapsed_ms(started),
+            engine=self.name,
+        )
+
+    def _mean_confidence(self, image: np.ndarray, language: str, tesseract_config: str) -> float | None:
+        """Mean of non-negative `conf` values scaled to 0..1 (second OCR pass)."""
+        if self.pytesseract is None:
+            return None
+        try:
+            data = self.pytesseract.image_to_data(
+                image,
+                lang=language,
+                config=tesseract_config,
+                output_type=self.pytesseract.Output.DICT,
+            )
+        except Exception:
+            # Confidence is best-effort; the recognised text is still returned.
+            return None
+
+        values = []
+        for raw_conf in data.get("conf", []):
+            try:
+                confidence = float(raw_conf)
+            except (TypeError, ValueError):
+                continue
+            if confidence >= 0:
+                values.append(confidence / 100.0)
+        if not values:
+            return None
+        return sum(values) / len(values)
+
+    def _elapsed_ms(self, started: float) -> float:
+        """Milliseconds elapsed since the perf_counter reading `started`."""
+        return (time.perf_counter() - started) * 1000.0