Pluggable OCR engine and fuzzy label matching.

app/config.py
  - "ocr" defaults gain "engine" ("tesseract"), "psm", "margin",
    "paddle_threshold", "config", "use_angle_cls" and a nested "paddle"
    section.
  - "label_data" gains "model_min_score" / "color_min_score" (both 0.72).

app/detection.py
  - The in-module TesseractOcr class and the cv2 import are removed;
    DetectionPipeline now obtains its engine from
    app.ocr.create_ocr_engine(config). app/ocr.py itself is not part of
    this patch.
  - DetectionResult gains ocr_engine, ocr_confidence and ocr_elapsed_ms,
    which to_metadata() also exports.
  - YoloLabelDetector skips results whose .boxes is None.

app/fuzzy_match.py (new)
  - best_fuzzy_match() scores each candidate against token and substring
    windows of the OCR text (difflib.SequenceMatcher ratio scaled by
    length coverage and a digit-agreement weight) and returns the best
    candidate at or above min_score, else None.

app/label_parser.py
  - parse_label_text() replaces exact substring/word matching with
    best_fuzzy_match(), takes optional per-field thresholds, and stores
    the match scores on ParsedLabel.

diff --git a/app/config.py b/app/config.py
index e67a1cf..ba8f9d6 100644
--- a/app/config.py
+++ b/app/config.py
@@ -41,10 +41,23 @@ DEFAULT_CONFIG: dict[str, Any] = {
     },
     "ocr": {
         "enabled": True,
+        "engine": "tesseract",
         "language": "eng",
         "tesseract_cmd": None,
+        "psm": 6,
+        "margin": 0,
         "threshold": True,
+        "paddle_threshold": False,
         "scale": 2.0,
+        "config": "",
+        "use_angle_cls": True,
+        "paddle": {
+            "enable_mkldnn": False,
+            "lang": "en",
+            "use_doc_orientation_classify": False,
+            "use_doc_unwarping": False,
+            "use_textline_orientation": False,
+        },
     },
     "capture": {
         "photos_dir": "captures/photos",
@@ -56,7 +69,12 @@ DEFAULT_CONFIG: dict[str, Any] = {
     "display": {
         "show_fps": True,
     },
-    "label_data": {"models": ["Regius", "Duvell"], "colors": ["T-NF-BLK-OUT-BST-G", "T-BLK-G"]},
+    "label_data": {
+        "model_min_score": 0.72,
+        "color_min_score": 0.72,
+        "models": ["Regius", "Duvell"],
+        "colors": ["T-NF-BLK-OUT-BST-G", "T-BLK-G"],
+    },
 }
 
 
diff --git a/app/detection.py b/app/detection.py
index 8e945d0..15d7551 100644
--- a/app/detection.py
+++ b/app/detection.py
@@ -4,10 +4,10 @@ from dataclasses import dataclass, field
 from pathlib import Path
 from typing import Any
 
-import cv2
 import numpy as np
 
 from app.label_parser import ParsedLabel, parse_label_text
+from app.ocr import create_ocr_engine
 
 
 @dataclass
@@ -18,6 +18,9 @@ class DetectionResult:
     raw_text: str = ""
     parsed: ParsedLabel | None = None
     error: str | None = None
+    ocr_engine: str | None = None
+    ocr_confidence: float | None = None
+    ocr_elapsed_ms: float | None = None
     all_boxes: list[dict[str, Any]] = field(default_factory=list)
 
     def to_metadata(self) -> dict[str, Any]:
@@ -28,6 +31,9 @@ class DetectionResult:
             "raw_text": self.raw_text,
             "parsed": self.parsed.to_dict() if self.parsed else None,
             "error": self.error,
+            "ocr_engine": self.ocr_engine,
+            "ocr_confidence": self.ocr_confidence,
+            "ocr_elapsed_ms": self.ocr_elapsed_ms,
             "all_boxes": self.all_boxes,
         }
 
@@ -72,6 +78,9 @@ class YoloLabelDetector:
         boxes = []
         names = getattr(self.model, "names", {})
         for result in results:
+            if result.boxes is None:
+                continue
+
             for box in result.boxes:
                 x1, y1, x2, y2 = [int(v) for v in box.xyxy[0].tolist()]
                 confidence = float(box.conf[0])
@@ -106,78 +115,30 @@ class YoloLabelDetector:
         return result
 
 
-class TesseractOcr:
-    def __init__(self, config: dict[str, Any]) -> None:
-        self.config = config
-        self.load_error: str | None = None
-        self.pytesseract = None
-        self._load()
-
-    def _load(self) -> None:
-        if not self.config["ocr"].get("enabled", True):
-            return
-        try:
-            import pytesseract
-
-            command = self.config["ocr"].get("tesseract_cmd")
-            if command:
-                pytesseract.pytesseract.tesseract_cmd = command
-            self.pytesseract = pytesseract
-        except Exception as exc:
-            self.load_error = f"Nie mozna zaladowac pytesseract: {exc}"
-
-    def read_label(self, frame_bgr: np.ndarray, bbox: tuple[int, int, int, int]) -> tuple[str, str | None]:
-        if not self.config["ocr"].get("enabled", True):
-            return "", None
-        if self.pytesseract is None:
-            return "", self.load_error or "OCR nie jest zaladowany"
-
-        x1, y1, x2, y2 = bbox
-        h, w = frame_bgr.shape[:2]
-        x1, y1 = max(0, x1), max(0, y1)
-        x2, y2 = min(w, x2), min(h, y2)
-        if x2 <= x1 or y2 <= y1:
-            return "", "Nieprawidlowy bbox OCR"
-
-        roi = frame_bgr[y1:y2, x1:x2]
-        scale = float(self.config["ocr"].get("scale", 1.0))
-        if scale != 1.0:
-            roi = cv2.resize(roi, None, fx=scale, fy=scale, interpolation=cv2.INTER_CUBIC)
-
-        gray = cv2.cvtColor(roi, cv2.COLOR_BGR2GRAY)
-        if self.config["ocr"].get("threshold", True):
-            gray = cv2.GaussianBlur(gray, (3, 3), 0)
-            gray = cv2.threshold(gray, 0, 255, cv2.THRESH_BINARY + cv2.THRESH_OTSU)[1]
-
-        try:
-            text = self.pytesseract.image_to_string(
-                gray,
-                lang=self.config["ocr"].get("language", "eng"),
-                config="--psm 6",
-            )
-        except Exception as exc:
-            return "", f"Blad OCR: {exc}"
-        return text, None
-
-
 class DetectionPipeline:
     def __init__(self, config: dict[str, Any], app_config: Any) -> None:
         self.config = config
         self.detector = YoloLabelDetector(config, app_config)
-        self.ocr = TesseractOcr(config)
+        self.ocr = create_ocr_engine(config)
 
     def process(self, frame_bgr: np.ndarray) -> DetectionResult:
         result = self.detector.detect(frame_bgr)
         if result.xyxy is None:
             return result
 
-        text, ocr_error = self.ocr.read_label(frame_bgr, result.xyxy)
-        result.raw_text = text
+        ocr_result = self.ocr.read_label(frame_bgr, result.xyxy)
+        result.raw_text = ocr_result.text
+        result.ocr_engine = ocr_result.engine
+        result.ocr_confidence = ocr_result.confidence
+        result.ocr_elapsed_ms = ocr_result.elapsed_ms
+        label_cfg = self.config["label_data"]
         result.parsed = parse_label_text(
-            text,
-            self.config["label_data"].get("colors", []),
-            self.config["label_data"].get("models", []),
+            ocr_result.text,
+            label_cfg.get("colors", []),
+            label_cfg.get("models", []),
+            model_min_score=float(label_cfg.get("model_min_score", 0.72)),
+            color_min_score=float(label_cfg.get("color_min_score", 0.72)),
         )
-        if ocr_error:
-            result.error = ocr_error
+        if ocr_result.error:
+            result.error = ocr_result.error
         return result
diff --git a/app/fuzzy_match.py b/app/fuzzy_match.py
new file mode 100644
index 0000000..c74e832
--- /dev/null
+++ b/app/fuzzy_match.py
@@ -0,0 +1,131 @@
+from __future__ import annotations
+
+import re
+import unicodedata
+from dataclasses import dataclass
+from difflib import SequenceMatcher
+
+
+@dataclass(frozen=True)
+class FuzzyMatch:
+    value: str
+    score: float
+    matched_text: str
+    coverage: float
+
+
+def compact_text(text: str) -> str:
+    normalized = unicodedata.normalize("NFKD", text)
+    ascii_text = normalized.encode("ascii", "ignore").decode("ascii")
+    return re.sub(r"[^A-Z0-9]+", "", ascii_text.upper())
+
+
+def token_text(text: str) -> list[str]:
+    normalized = unicodedata.normalize("NFKD", text)
+    ascii_text = normalized.encode("ascii", "ignore").decode("ascii")
+    return re.findall(r"[A-Z0-9]+", ascii_text.upper())
+
+
+def similarity(left: str, right: str) -> float:
+    if not left or not right:
+        return 0.0
+    return SequenceMatcher(None, left, right).ratio()
+
+
+def best_fuzzy_match(text: str, candidates: list[str], min_score: float = 0.72) -> FuzzyMatch | None:
+    best: FuzzyMatch | None = None
+    for candidate in candidates:
+        candidate_compact = compact_text(candidate)
+        if not candidate_compact:
+            continue
+
+        score, matched_text, coverage = best_candidate_score(text, candidate_compact)
+        match = FuzzyMatch(
+            value=candidate,
+            score=score,
+            matched_text=matched_text,
+            coverage=coverage,
+        )
+        if best is None or _is_better_match(match, best):
+            best = match
+
+    if best is None or best.score < min_score:
+        return None
+    return best
+
+
+def best_candidate_score(text: str, candidate_compact: str) -> tuple[float, str, float]:
+    full_compact = compact_text(text)
+    if candidate_compact in full_compact:
+        return 1.0, candidate_compact, 1.0
+
+    windows = candidate_windows(text, len(candidate_compact))
+    if not windows and full_compact:
+        windows = [full_compact]
+
+    best_score = 0.0
+    best_window = ""
+    best_coverage = 0.0
+    for window in windows:
+        coverage = min(len(window), len(candidate_compact)) / max(len(window), len(candidate_compact))
+        score = similarity(candidate_compact, window) * coverage * digit_match_weight(candidate_compact, window)
+        if score > best_score:
+            best_score = score
+            best_window = window
+            best_coverage = coverage
+    return best_score, best_window, best_coverage
+
+
+def digit_match_weight(candidate: str, matched_text: str) -> float:
+    candidate_digits = re.findall(r"\d", candidate)
+    matched_digits = re.findall(r"\d", matched_text)
+    if not candidate_digits or not matched_digits:
+        return 1.0
+    if candidate_digits == matched_digits:
+        return 1.05
+    return 0.7
+
+
+def candidate_windows(text: str, candidate_length: int) -> list[str]:
+    tokens = token_text(text)
+    windows: set[str] = set()
+
+    for token in tokens:
+        windows.add(token)
+
+    max_ngram = min(8, len(tokens))
+    for size in range(2, max_ngram + 1):
+        for index in range(0, len(tokens) - size + 1):
+            joined = "".join(tokens[index : index + size])
+            if _length_is_plausible(joined, candidate_length):
+                windows.add(joined)
+
+    full_compact = compact_text(text)
+    if full_compact:
+        min_len = max(1, int(candidate_length * 0.65))
+        max_len = max(min_len, int(candidate_length * 1.35))
+        for length in range(min_len, max_len + 1):
+            if length > len(full_compact):
+                continue
+            for index in range(0, len(full_compact) - length + 1):
+                windows.add(full_compact[index : index + length])
+
+    return sorted(windows)
+
+
+def _length_is_plausible(value: str, candidate_length: int) -> bool:
+    if not value:
+        return False
+    return int(candidate_length * 0.65) <= len(value) <= int(candidate_length * 1.6)
+
+
+def _is_better_match(match: FuzzyMatch, best: FuzzyMatch) -> bool:
+    if match.score > best.score + 0.03:
+        return True
+    if match.score < best.score - 0.03:
+        return False
+    if match.coverage > best.coverage + 0.05:
+        return True
+    if match.coverage < best.coverage - 0.05:
+        return False
+    return len(compact_text(match.value)) > len(compact_text(best.value))
diff --git a/app/label_parser.py b/app/label_parser.py
index 26318b0..066278c 100644
--- a/app/label_parser.py
+++ b/app/label_parser.py
@@ -3,8 +3,12 @@ from __future__ import annotations
 import re
 from dataclasses import dataclass, asdict
 
+from app.fuzzy_match import best_fuzzy_match
+
 
 ORDER_RE = re.compile(r"\b(?P<order>\d{4}/\d{4}/(?:[1-9]|[1-9]\d))\b")
+DEFAULT_MODEL_MIN_SCORE = 0.72
+DEFAULT_COLOR_MIN_SCORE = 0.72
 
 
 @dataclass
@@ -13,8 +17,10 @@ class ParsedLabel:
     color_code: str | None
     product_model: str | None
     raw_text: str
+    color_score: float | None = None
+    product_model_score: float | None = None
 
-    def to_dict(self) -> dict[str, str | None]:
+    def to_dict(self) -> dict[str, str | float | None]:
         return asdict(self)
 
 
@@ -22,23 +28,24 @@ def normalize_ocr_text(text: str) -> str:
     return " ".join(text.replace("\n", " ").replace("\r", " ").split())
 
 
-def parse_label_text(text: str, known_colors: list[str], known_models: list[str]) -> ParsedLabel:
+def parse_label_text(
+    text: str,
+    known_colors: list[str],
+    known_models: list[str],
+    model_min_score: float = DEFAULT_MODEL_MIN_SCORE,
+    color_min_score: float = DEFAULT_COLOR_MIN_SCORE,
+) -> ParsedLabel:
     normalized = normalize_ocr_text(text)
     order_match = ORDER_RE.search(normalized)
-    normalized_upper = normalized.upper()
 
-    color_code = next(
-        (color for color in known_colors if color.upper() in normalized_upper),
-        None,
-    )
-    product_model = next(
-        (model for model in known_models if re.search(rf"\b{re.escape(model)}\b", normalized, re.I)),
-        None,
-    )
+    color_match = best_fuzzy_match(normalized, known_colors, color_min_score)
+    model_match = best_fuzzy_match(normalized, known_models, model_min_score)
 
     return ParsedLabel(
         order_number=order_match.group("order") if order_match else None,
-        color_code=color_code,
-        product_model=product_model,
+        color_code=color_match.value if color_match else None,
+        product_model=model_match.value if model_match else None,
         raw_text=normalized,
+        color_score=color_match.score if color_match else None,
+        product_model_score=model_match.score if model_match else None,
     )