Enhance OCR configuration and integrate fuzzy matching for label parsing
This commit is contained in:
@@ -41,10 +41,23 @@ DEFAULT_CONFIG: dict[str, Any] = {
|
||||
},
|
||||
"ocr": {
|
||||
"enabled": True,
|
||||
"engine": "tesseract",
|
||||
"language": "eng",
|
||||
"tesseract_cmd": None,
|
||||
"psm": 6,
|
||||
"margin": 0,
|
||||
"threshold": True,
|
||||
"paddle_threshold": False,
|
||||
"scale": 2.0,
|
||||
"config": "",
|
||||
"use_angle_cls": True,
|
||||
"paddle": {
|
||||
"enable_mkldnn": False,
|
||||
"lang": "en",
|
||||
"use_doc_orientation_classify": False,
|
||||
"use_doc_unwarping": False,
|
||||
"use_textline_orientation": False,
|
||||
},
|
||||
},
|
||||
"capture": {
|
||||
"photos_dir": "captures/photos",
|
||||
@@ -56,7 +69,12 @@ DEFAULT_CONFIG: dict[str, Any] = {
|
||||
"display": {
|
||||
"show_fps": True,
|
||||
},
|
||||
"label_data": {"models": ["Regius", "Duvell"], "colors": ["T-NF-BLK-OUT-BST-G", "T-BLK-G"]},
|
||||
"label_data": {
|
||||
"model_min_score": 0.72,
|
||||
"color_min_score": 0.72,
|
||||
"models": ["Regius", "Duvell"],
|
||||
"colors": ["T-NF-BLK-OUT-BST-G", "T-BLK-G"],
|
||||
},
|
||||
}
|
||||
|
||||
|
||||
|
||||
@@ -4,10 +4,10 @@ from dataclasses import dataclass, field
|
||||
from pathlib import Path
|
||||
from typing import Any
|
||||
|
||||
import cv2
|
||||
import numpy as np
|
||||
|
||||
from app.label_parser import ParsedLabel, parse_label_text
|
||||
from app.ocr import create_ocr_engine
|
||||
|
||||
|
||||
@dataclass
|
||||
@@ -18,6 +18,9 @@ class DetectionResult:
|
||||
raw_text: str = ""
|
||||
parsed: ParsedLabel | None = None
|
||||
error: str | None = None
|
||||
ocr_engine: str | None = None
|
||||
ocr_confidence: float | None = None
|
||||
ocr_elapsed_ms: float | None = None
|
||||
all_boxes: list[dict[str, Any]] = field(default_factory=list)
|
||||
|
||||
def to_metadata(self) -> dict[str, Any]:
|
||||
@@ -28,6 +31,9 @@ class DetectionResult:
|
||||
"raw_text": self.raw_text,
|
||||
"parsed": self.parsed.to_dict() if self.parsed else None,
|
||||
"error": self.error,
|
||||
"ocr_engine": self.ocr_engine,
|
||||
"ocr_confidence": self.ocr_confidence,
|
||||
"ocr_elapsed_ms": self.ocr_elapsed_ms,
|
||||
"all_boxes": self.all_boxes,
|
||||
}
|
||||
|
||||
@@ -72,6 +78,9 @@ class YoloLabelDetector:
|
||||
boxes = []
|
||||
names = getattr(self.model, "names", {})
|
||||
for result in results:
|
||||
if result.boxes is None:
|
||||
continue
|
||||
|
||||
for box in result.boxes:
|
||||
x1, y1, x2, y2 = [int(v) for v in box.xyxy[0].tolist()]
|
||||
confidence = float(box.conf[0])
|
||||
@@ -106,78 +115,30 @@ class YoloLabelDetector:
|
||||
return result
|
||||
|
||||
|
||||
class TesseractOcr:
|
||||
def __init__(self, config: dict[str, Any]) -> None:
|
||||
self.config = config
|
||||
self.load_error: str | None = None
|
||||
self.pytesseract = None
|
||||
self._load()
|
||||
|
||||
def _load(self) -> None:
|
||||
if not self.config["ocr"].get("enabled", True):
|
||||
return
|
||||
try:
|
||||
import pytesseract
|
||||
|
||||
command = self.config["ocr"].get("tesseract_cmd")
|
||||
if command:
|
||||
pytesseract.pytesseract.tesseract_cmd = command
|
||||
self.pytesseract = pytesseract
|
||||
except Exception as exc:
|
||||
self.load_error = f"Nie mozna zaladowac pytesseract: {exc}"
|
||||
|
||||
def read_label(self, frame_bgr: np.ndarray, bbox: tuple[int, int, int, int]) -> tuple[str, str | None]:
|
||||
if not self.config["ocr"].get("enabled", True):
|
||||
return "", None
|
||||
if self.pytesseract is None:
|
||||
return "", self.load_error or "OCR nie jest zaladowany"
|
||||
|
||||
x1, y1, x2, y2 = bbox
|
||||
h, w = frame_bgr.shape[:2]
|
||||
x1, y1 = max(0, x1), max(0, y1)
|
||||
x2, y2 = min(w, x2), min(h, y2)
|
||||
if x2 <= x1 or y2 <= y1:
|
||||
return "", "Nieprawidlowy bbox OCR"
|
||||
|
||||
roi = frame_bgr[y1:y2, x1:x2]
|
||||
scale = float(self.config["ocr"].get("scale", 1.0))
|
||||
if scale != 1.0:
|
||||
roi = cv2.resize(roi, None, fx=scale, fy=scale, interpolation=cv2.INTER_CUBIC)
|
||||
|
||||
gray = cv2.cvtColor(roi, cv2.COLOR_BGR2GRAY)
|
||||
if self.config["ocr"].get("threshold", True):
|
||||
gray = cv2.GaussianBlur(gray, (3, 3), 0)
|
||||
gray = cv2.threshold(gray, 0, 255, cv2.THRESH_BINARY + cv2.THRESH_OTSU)[1]
|
||||
|
||||
try:
|
||||
text = self.pytesseract.image_to_string(
|
||||
gray,
|
||||
lang=self.config["ocr"].get("language", "eng"),
|
||||
config="--psm 6",
|
||||
)
|
||||
except Exception as exc:
|
||||
return "", f"Blad OCR: {exc}"
|
||||
return text, None
|
||||
|
||||
|
||||
class DetectionPipeline:
    """Detect a label in a frame, OCR it, and parse the recognised text."""

    def __init__(self, config: dict[str, Any], app_config: Any) -> None:
        self.config = config
        self.detector = YoloLabelDetector(config, app_config)
        # The OCR backend is chosen by the factory from the configuration.
        self.ocr = create_ocr_engine(config)

    def process(self, frame_bgr: np.ndarray) -> DetectionResult:
        """Run detection, OCR and label parsing on a single frame.

        Returns the detector's result unchanged when it carries no bounding
        box (``xyxy is None``). Otherwise the result is enriched with the OCR
        text, engine name, confidence, elapsed time, the parsed label and,
        if the OCR step reported one, its error message.
        """
        result = self.detector.detect(frame_bgr)
        if result.xyxy is None:
            return result

        ocr_result = self.ocr.read_label(frame_bgr, result.xyxy)
        result.raw_text = ocr_result.text
        result.ocr_engine = ocr_result.engine
        result.ocr_confidence = ocr_result.confidence
        result.ocr_elapsed_ms = ocr_result.elapsed_ms

        label_cfg = self.config["label_data"]
        result.parsed = parse_label_text(
            ocr_result.text,
            label_cfg.get("colors", []),
            label_cfg.get("models", []),
            model_min_score=float(label_cfg.get("model_min_score", 0.72)),
            color_min_score=float(label_cfg.get("color_min_score", 0.72)),
        )
        if ocr_result.error:
            result.error = ocr_result.error
        return result
|
||||
|
||||
131
app/fuzzy_match.py
Normal file
131
app/fuzzy_match.py
Normal file
@@ -0,0 +1,131 @@
|
||||
from __future__ import annotations
|
||||
|
||||
import re
|
||||
import unicodedata
|
||||
from dataclasses import dataclass
|
||||
from difflib import SequenceMatcher
|
||||
|
||||
|
||||
@dataclass(frozen=True)
class FuzzyMatch:
    """Immutable outcome of matching OCR text against one known candidate."""

    # The candidate exactly as supplied by the caller.
    value: str
    # Match quality; compared against the caller's minimum score.
    score: float
    # The normalised text window that produced the score.
    matched_text: str
    # Ratio of the shorter to the longer of window and candidate lengths.
    coverage: float
|
||||
|
||||
|
||||
def compact_text(text: str) -> str:
    """Reduce *text* to a single run of upper-case ASCII letters and digits.

    Accented characters are decomposed (NFKD) so their base letter survives;
    anything that still is not ASCII is dropped, as is every separator.
    """
    decomposed = unicodedata.normalize("NFKD", text)
    ascii_upper = decomposed.encode("ascii", "ignore").decode("ascii").upper()
    return re.sub(r"[^A-Z0-9]+", "", ascii_upper)
|
||||
|
||||
|
||||
def token_text(text: str) -> list[str]:
|
||||
normalized = unicodedata.normalize("NFKD", text)
|
||||
ascii_text = normalized.encode("ascii", "ignore").decode("ascii")
|
||||
return re.findall(r"[A-Z0-9]+", ascii_text.upper())
|
||||
|
||||
|
||||
def similarity(left: str, right: str) -> float:
    """Return the ``SequenceMatcher`` ratio of two strings.

    An empty string on either side yields ``0.0``.
    """
    if left and right:
        return SequenceMatcher(None, left, right).ratio()
    return 0.0
|
||||
|
||||
|
||||
def best_fuzzy_match(text: str, candidates: list[str], min_score: float = 0.72) -> FuzzyMatch | None:
    """Pick the candidate that best matches *text*, if it is good enough.

    Candidates that normalise to an empty string are ignored. The remaining
    ones are scored in order and compared pairwise with ``_is_better_match``.
    Returns ``None`` when there is no usable candidate or the winner scores
    below *min_score*.
    """
    winner: FuzzyMatch | None = None
    for name in candidates:
        key = compact_text(name)
        if key:
            score, window, coverage = best_candidate_score(text, key)
            contender = FuzzyMatch(
                value=name,
                score=score,
                matched_text=window,
                coverage=coverage,
            )
            if winner is None or _is_better_match(contender, winner):
                winner = contender

    return None if winner is None or winner.score < min_score else winner
|
||||
|
||||
|
||||
def best_candidate_score(text: str, candidate_compact: str) -> tuple[float, str, float]:
    """Score how well an already-normalised candidate appears in *text*.

    Args:
        text: Raw text to search; it is normalised internally.
        candidate_compact: Candidate already passed through ``compact_text``.

    Returns:
        ``(score, matched_window, coverage)``. An exact substring hit returns
        ``(1.0, candidate_compact, 1.0)``. Otherwise the best window wins,
        scored as similarity * length coverage * digit weight and capped at
        ``1.0``. ``(0.0, "", 0.0)`` means nothing matched.
    """
    # An empty candidate is a substring of everything; never report it as a
    # perfect match.
    if not candidate_compact:
        return 0.0, "", 0.0

    full_compact = compact_text(text)
    if candidate_compact in full_compact:
        return 1.0, candidate_compact, 1.0

    windows = candidate_windows(text, len(candidate_compact))
    if not windows and full_compact:
        windows = [full_compact]

    candidate_len = len(candidate_compact)
    best_score = 0.0
    best_window = ""
    best_coverage = 0.0
    for window in windows:
        coverage = min(len(window), candidate_len) / max(len(window), candidate_len)
        raw_score = similarity(candidate_compact, window) * coverage * digit_match_weight(candidate_compact, window)
        # The digit bonus (1.05) can push a near match on a long candidate
        # above 1.0; cap it so a fuzzy hit never outranks an exact one.
        score = min(raw_score, 1.0)
        if score > best_score:
            best_score = score
            best_window = window
            best_coverage = coverage
    return best_score, best_window, best_coverage
|
||||
|
||||
|
||||
def digit_match_weight(candidate: str, matched_text: str) -> float:
    """Return a multiplier reflecting how the digits of both strings agree.

    * ``1.0``  - at least one side contains no digits (no evidence either way)
    * ``1.05`` - both contain digits and the digit sequences are identical
    * ``0.7``  - both contain digits but the sequences differ
    """
    wanted = re.findall(r"\d", candidate)
    found = re.findall(r"\d", matched_text)
    if wanted and found:
        return 1.05 if wanted == found else 0.7
    return 1.0
|
||||
|
||||
|
||||
def candidate_windows(text: str, candidate_length: int) -> list[str]:
    """Enumerate the text fragments worth comparing against a candidate.

    The result is the sorted, de-duplicated union of:

    * every individual token of *text*;
    * joins of 2 to 8 consecutive tokens whose length is plausible for the
      candidate (see ``_length_is_plausible``);
    * every substring of the compacted text whose length lies between 65%
      and 135% of *candidate_length* (at least one character).
    """
    words = token_text(text)
    found: set[str] = set(words)

    for span in range(2, min(8, len(words)) + 1):
        for start in range(len(words) - span + 1):
            glued = "".join(words[start : start + span])
            if _length_is_plausible(glued, candidate_length):
                found.add(glued)

    squashed = compact_text(text)
    if squashed:
        total = len(squashed)
        shortest = max(1, int(candidate_length * 0.65))
        longest = max(shortest, int(candidate_length * 1.35))
        # Widths longer than the text itself cannot produce a substring.
        for width in range(shortest, min(longest, total) + 1):
            found.update(squashed[start : start + width] for start in range(total - width + 1))

    return sorted(found)
|
||||
|
||||
|
||||
def _length_is_plausible(value: str, candidate_length: int) -> bool:
|
||||
if not value:
|
||||
return False
|
||||
return int(candidate_length * 0.65) <= len(value) <= int(candidate_length * 1.6)
|
||||
|
||||
|
||||
def _is_better_match(match: FuzzyMatch, best: FuzzyMatch) -> bool:
    """Decide whether *match* should replace the current *best*.

    Score is compared first with a 0.03 tolerance, then coverage with a 0.05
    tolerance. If both fall within tolerance, the candidate with the longer
    compacted value wins.
    """
    criteria = (
        (match.score, best.score, 0.03),
        (match.coverage, best.coverage, 0.05),
    )
    for mine, theirs, slack in criteria:
        if mine > theirs + slack:
            return True
        if mine < theirs - slack:
            return False
    return len(compact_text(match.value)) > len(compact_text(best.value))
|
||||
@@ -3,8 +3,12 @@ from __future__ import annotations
|
||||
import re
|
||||
from dataclasses import dataclass, asdict
|
||||
|
||||
from app.fuzzy_match import best_fuzzy_match
|
||||
|
||||
|
||||
ORDER_RE = re.compile(r"\b(?P<order>\d{4}/\d{4}/(?:[1-9]|[1-9]\d))\b")
|
||||
DEFAULT_MODEL_MIN_SCORE = 0.72
|
||||
DEFAULT_COLOR_MIN_SCORE = 0.72
|
||||
|
||||
|
||||
@dataclass
|
||||
@@ -13,8 +17,10 @@ class ParsedLabel:
|
||||
color_code: str | None
|
||||
product_model: str | None
|
||||
raw_text: str
|
||||
color_score: float | None = None
|
||||
product_model_score: float | None = None
|
||||
|
||||
def to_dict(self) -> dict[str, str | None]:
|
||||
def to_dict(self) -> dict[str, str | float | None]:
|
||||
return asdict(self)
|
||||
|
||||
|
||||
@@ -22,23 +28,24 @@ def normalize_ocr_text(text: str) -> str:
|
||||
return " ".join(text.replace("\n", " ").replace("\r", " ").split())
|
||||
|
||||
|
||||
def parse_label_text(
    text: str,
    known_colors: list[str],
    known_models: list[str],
    model_min_score: float = DEFAULT_MODEL_MIN_SCORE,
    color_min_score: float = DEFAULT_COLOR_MIN_SCORE,
) -> ParsedLabel:
    """Extract the order number, colour code and product model from OCR text.

    Args:
        text: Raw OCR output; whitespace is normalised before parsing.
        known_colors: Colour codes to fuzzy-match against the text.
        known_models: Product model names to fuzzy-match against the text.
        model_min_score: Minimum fuzzy score for a model to be accepted.
        color_min_score: Minimum fuzzy score for a colour to be accepted.

    Returns:
        A ``ParsedLabel`` whose fields are ``None`` wherever nothing matched;
        the two score fields carry the fuzzy score of the accepted match.
    """
    normalized = normalize_ocr_text(text)
    order_match = ORDER_RE.search(normalized)
    color_match = best_fuzzy_match(normalized, known_colors, color_min_score)
    model_match = best_fuzzy_match(normalized, known_models, model_min_score)

    return ParsedLabel(
        order_number=order_match.group("order") if order_match else None,
        color_code=color_match.value if color_match else None,
        product_model=model_match.value if model_match else None,
        raw_text=normalized,
        color_score=color_match.score if color_match else None,
        product_model_score=model_match.score if model_match else None,
    )
|
||||
|
||||
Reference in New Issue
Block a user