Compare commits

...

4 Commits

15 changed files with 828 additions and 90 deletions

View File

@@ -39,5 +39,22 @@ Istotne ustawienia:
- `detection.mode` - `best` rysuje najlepsza etykiete, `all` rysuje wszystkie wykrycia.
- `detection.frame_stride` - YOLO uruchamiany co N klatek podczas aktywnego wykrywania.
- `label_data.models`, `label_data.colors` - slowniki do walidacji tekstu z etykiety.
- `ocr.enabled`, `ocr.engine` - wlaczenie OCR i wybor silnika: `none`, `tesseract`, `paddle`.
Zdjecia trafiaja do `captures/photos`, filmy do `captures/videos`. Obok kazdego pliku media zapisywany jest JSON z aktualnym wynikiem detekcji/OCR.
## Testowanie OCR poza aplikacja
OCR mozna testowac na gotowych cropach bez uruchamiania kamery i YOLO:
```bash
.venv-lin/bin/python -m app.ocr.cli crop --engine none
.venv-lin/bin/python -m app.ocr.cli crop --engine tesseract
.venv-lin/bin/python -m app.ocr.cli crop --engine paddle --json
```
Backend PaddleOCR jest opcjonalny. Zaleznosci do testow PaddleOCR sa w osobnym pliku:
```bash
.venv-lin/bin/pip install -r requirements-ocr-paddle.txt
```

View File

@@ -41,10 +41,23 @@ DEFAULT_CONFIG: dict[str, Any] = {
}, },
"ocr": { "ocr": {
"enabled": True, "enabled": True,
"engine": "tesseract",
"language": "eng", "language": "eng",
"tesseract_cmd": None, "tesseract_cmd": None,
"psm": 6,
"margin": 0,
"threshold": True, "threshold": True,
"paddle_threshold": False,
"scale": 2.0, "scale": 2.0,
"config": "",
"use_angle_cls": True,
"paddle": {
"enable_mkldnn": False,
"lang": "en",
"use_doc_orientation_classify": False,
"use_doc_unwarping": False,
"use_textline_orientation": False,
},
}, },
"capture": { "capture": {
"photos_dir": "captures/photos", "photos_dir": "captures/photos",
@@ -56,7 +69,12 @@ DEFAULT_CONFIG: dict[str, Any] = {
"display": { "display": {
"show_fps": True, "show_fps": True,
}, },
"label_data": {"models": ["Regius", "Duvell"], "colors": ["T-NF-BLK-OUT-BST-G", "T-BLK-G"]}, "label_data": {
"model_min_score": 0.72,
"color_min_score": 0.72,
"models": ["Regius", "Duvell"],
"colors": ["T-NF-BLK-OUT-BST-G", "T-BLK-G"],
},
} }

View File

@@ -4,10 +4,10 @@ from dataclasses import dataclass, field
from pathlib import Path from pathlib import Path
from typing import Any from typing import Any
import cv2
import numpy as np import numpy as np
from app.label_parser import ParsedLabel, parse_label_text from app.label_parser import ParsedLabel, parse_label_text
from app.ocr import create_ocr_engine
@dataclass @dataclass
@@ -18,6 +18,9 @@ class DetectionResult:
raw_text: str = "" raw_text: str = ""
parsed: ParsedLabel | None = None parsed: ParsedLabel | None = None
error: str | None = None error: str | None = None
ocr_engine: str | None = None
ocr_confidence: float | None = None
ocr_elapsed_ms: float | None = None
all_boxes: list[dict[str, Any]] = field(default_factory=list) all_boxes: list[dict[str, Any]] = field(default_factory=list)
def to_metadata(self) -> dict[str, Any]: def to_metadata(self) -> dict[str, Any]:
@@ -28,6 +31,9 @@ class DetectionResult:
"raw_text": self.raw_text, "raw_text": self.raw_text,
"parsed": self.parsed.to_dict() if self.parsed else None, "parsed": self.parsed.to_dict() if self.parsed else None,
"error": self.error, "error": self.error,
"ocr_engine": self.ocr_engine,
"ocr_confidence": self.ocr_confidence,
"ocr_elapsed_ms": self.ocr_elapsed_ms,
"all_boxes": self.all_boxes, "all_boxes": self.all_boxes,
} }
@@ -72,6 +78,9 @@ class YoloLabelDetector:
boxes = [] boxes = []
names = getattr(self.model, "names", {}) names = getattr(self.model, "names", {})
for result in results: for result in results:
if result.boxes is None:
continue
for box in result.boxes: for box in result.boxes:
x1, y1, x2, y2 = [int(v) for v in box.xyxy[0].tolist()] x1, y1, x2, y2 = [int(v) for v in box.xyxy[0].tolist()]
confidence = float(box.conf[0]) confidence = float(box.conf[0])
@@ -106,78 +115,30 @@ class YoloLabelDetector:
return result return result
class TesseractOcr:
    """Read label text from a frame region with pytesseract.

    The whole application config is kept; only its ``"ocr"`` section is read.
    ``read_label`` returns a ``(text, error)`` pair instead of raising for the
    failures it handles (OCR disabled, backend missing, bad box, OCR error).
    """

    def __init__(self, config: dict[str, Any]) -> None:
        """Remember *config* and attempt to import pytesseract right away."""
        self.config = config
        self.load_error: str | None = None
        self.pytesseract = None
        self._load()

    def _load(self) -> None:
        """Import pytesseract unless OCR is disabled; keep any failure as text."""
        ocr_cfg = self.config["ocr"]
        if not ocr_cfg.get("enabled", True):
            return
        try:
            import pytesseract

            command = ocr_cfg.get("tesseract_cmd")
            if command:
                # Point pytesseract at the configured executable.
                pytesseract.pytesseract.tesseract_cmd = command
            self.pytesseract = pytesseract
        except Exception as exc:
            self.load_error = f"Nie mozna zaladowac pytesseract: {exc}"

    def read_label(self, frame_bgr: np.ndarray, bbox: tuple[int, int, int, int]) -> tuple[str, str | None]:
        """Run OCR on the *bbox* region of *frame_bgr*.

        Returns ``(text, None)`` on success and ``("", message)`` on a handled
        failure; ``("", None)`` is returned when OCR is disabled in the config.
        """
        ocr_cfg = self.config["ocr"]
        if not ocr_cfg.get("enabled", True):
            return "", None
        if self.pytesseract is None:
            return "", self.load_error or "OCR nie jest zaladowany"

        left, top, right, bottom = bbox
        frame_h, frame_w = frame_bgr.shape[:2]
        # Clamp the box to the frame before slicing.
        left, top = max(0, left), max(0, top)
        right, bottom = min(frame_w, right), min(frame_h, bottom)
        if right <= left or bottom <= top:
            return "", "Nieprawidlowy bbox OCR"

        region = frame_bgr[top:bottom, left:right]
        factor = float(ocr_cfg.get("scale", 1.0))
        if factor != 1.0:
            region = cv2.resize(region, None, fx=factor, fy=factor, interpolation=cv2.INTER_CUBIC)

        gray = cv2.cvtColor(region, cv2.COLOR_BGR2GRAY)
        if ocr_cfg.get("threshold", True):
            # Light blur followed by Otsu binarisation.
            blurred = cv2.GaussianBlur(gray, (3, 3), 0)
            gray = cv2.threshold(blurred, 0, 255, cv2.THRESH_BINARY + cv2.THRESH_OTSU)[1]

        try:
            text = self.pytesseract.image_to_string(
                gray,
                lang=ocr_cfg.get("language", "eng"),
                config="--psm 6",
            )
        except Exception as exc:
            return "", f"Blad OCR: {exc}"
        return text, None
class DetectionPipeline: class DetectionPipeline:
def __init__(self, config: dict[str, Any], app_config: Any) -> None: def __init__(self, config: dict[str, Any], app_config: Any) -> None:
self.config = config self.config = config
self.detector = YoloLabelDetector(config, app_config) self.detector = YoloLabelDetector(config, app_config)
self.ocr = TesseractOcr(config) self.ocr = create_ocr_engine(config)
def process(self, frame_bgr: np.ndarray) -> DetectionResult: def process(self, frame_bgr: np.ndarray) -> DetectionResult:
result = self.detector.detect(frame_bgr) result = self.detector.detect(frame_bgr)
if result.xyxy is None: if result.xyxy is None:
return result return result
text, ocr_error = self.ocr.read_label(frame_bgr, result.xyxy) ocr_result = self.ocr.read_label(frame_bgr, result.xyxy)
result.raw_text = text result.raw_text = ocr_result.text
result.ocr_engine = ocr_result.engine
result.ocr_confidence = ocr_result.confidence
result.ocr_elapsed_ms = ocr_result.elapsed_ms
label_cfg = self.config["label_data"]
result.parsed = parse_label_text( result.parsed = parse_label_text(
text, ocr_result.text,
self.config["label_data"].get("colors", []), label_cfg.get("colors", []),
self.config["label_data"].get("models", []), label_cfg.get("models", []),
model_min_score=float(label_cfg.get("model_min_score", 0.72)),
color_min_score=float(label_cfg.get("color_min_score", 0.72)),
) )
if ocr_error: if ocr_result.error:
result.error = ocr_error result.error = ocr_result.error
return result return result

131
app/fuzzy_match.py Normal file
View File

@@ -0,0 +1,131 @@
from __future__ import annotations
import re
import unicodedata
from dataclasses import dataclass
from difflib import SequenceMatcher
@dataclass(frozen=True)
class FuzzyMatch:
    """Immutable outcome of scoring one candidate string against OCR text."""

    # Candidate exactly as the caller supplied it (not normalized).
    value: str
    # Score assigned to the candidate; higher means a better match.
    score: float
    # Normalized fragment of the input text that produced the score.
    matched_text: str
    # Length ratio between the matched fragment and the normalized candidate.
    coverage: float
def compact_text(text: str) -> str:
    """Return *text* reduced to upper-case ASCII letters and digits.

    The input is NFKD-decomposed, every non-ASCII code point is dropped (which
    removes combining accents), and all characters outside ``A-Z0-9`` are
    stripped from the upper-cased remainder.
    """
    decomposed = unicodedata.normalize("NFKD", text)
    ascii_upper = decomposed.encode("ascii", "ignore").decode("ascii").upper()
    # Splitting on the separators and re-joining equals deleting them.
    return "".join(re.split(r"[^A-Z0-9]+", ascii_upper))
def token_text(text: str) -> list[str]:
    """Split *text* into upper-case ASCII alphanumeric tokens.

    Uses the same NFKD / ASCII-only normalization as the compact form, but
    keeps the runs of ``A-Z0-9`` as separate list items instead of joining them.
    """
    decomposed = unicodedata.normalize("NFKD", text)
    ascii_upper = decomposed.encode("ascii", "ignore").decode("ascii").upper()
    return [found.group(0) for found in re.finditer(r"[A-Z0-9]+", ascii_upper)]
def similarity(left: str, right: str) -> float:
    """Return the ``SequenceMatcher`` ratio of two strings, or 0.0 if either is empty."""
    if left and right:
        return SequenceMatcher(None, left, right).ratio()
    return 0.0
def best_fuzzy_match(text: str, candidates: list[str], min_score: float = 0.72) -> FuzzyMatch | None:
    """Pick the candidate that best matches *text*.

    Candidates whose compact form is empty are ignored. Each remaining one is
    scored against *text* and compared with the current leader. The leader is
    returned only when its score reaches *min_score*; otherwise ``None``.
    """
    winner: FuzzyMatch | None = None
    for candidate in candidates:
        normalized = compact_text(candidate)
        if normalized:
            score, fragment, coverage = best_candidate_score(text, normalized)
            contender = FuzzyMatch(
                value=candidate,
                score=score,
                matched_text=fragment,
                coverage=coverage,
            )
            if winner is None or _is_better_match(contender, winner):
                winner = contender

    if winner is None or winner.score < min_score:
        return None
    return winner
def best_candidate_score(text: str, candidate_compact: str) -> tuple[float, str, float]:
    """Score an already-compacted candidate against *text*.

    Returns ``(score, matched_fragment, coverage)``. A candidate contained
    verbatim in the compact text scores ``1.0`` with full coverage. Otherwise
    every window of the text is rated as similarity * length coverage * digit
    weight, and the first window with the highest rating wins. ``(0.0, "", 0.0)``
    is returned when no window scores above zero.
    """
    haystack = compact_text(text)
    if candidate_compact in haystack:
        return 1.0, candidate_compact, 1.0

    fragments = candidate_windows(text, len(candidate_compact))
    if not fragments and haystack:
        # Fall back to the whole compact text when no window was produced.
        fragments = [haystack]

    target_len = len(candidate_compact)
    top: tuple[float, str, float] = (0.0, "", 0.0)
    for fragment in fragments:
        shorter, longer = sorted((len(fragment), target_len))
        coverage = shorter / longer
        score = similarity(candidate_compact, fragment) * coverage * digit_match_weight(candidate_compact, fragment)
        if score > top[0]:
            top = (score, fragment, coverage)
    return top
def digit_match_weight(candidate: str, matched_text: str) -> float:
    """Return a score multiplier based on how the digits of both strings compare.

    ``1.0`` when either string has no digits, ``1.05`` when both contain the
    same digits in the same order, and ``0.7`` when their digits differ.
    """
    expected = re.findall(r"\d", candidate)
    observed = re.findall(r"\d", matched_text)
    if expected and observed:
        return 1.05 if expected == observed else 0.7
    return 1.0
def candidate_windows(text: str, candidate_length: int) -> list[str]:
    """Collect the text fragments worth comparing with a candidate of given length.

    The result is the sorted, de-duplicated union of:

    * every single token of *text*;
    * concatenations of 2 to 8 consecutive tokens accepted by
      ``_length_is_plausible``;
    * every substring of the compact text whose length lies between 65% and
      135% of *candidate_length* (at least one character).
    """
    tokens = token_text(text)
    windows: set[str] = set(tokens)

    token_count = len(tokens)
    for size in range(2, min(8, token_count) + 1):
        for start in range(token_count - size + 1):
            joined = "".join(tokens[start : start + size])
            if _length_is_plausible(joined, candidate_length):
                windows.add(joined)

    full_compact = compact_text(text)
    if full_compact:
        shortest = max(1, int(candidate_length * 0.65))
        longest = max(shortest, int(candidate_length * 1.35))
        # Windows longer than the text itself cannot exist, so cap the range.
        usable = min(longest, len(full_compact))
        for length in range(shortest, usable + 1):
            windows.update(
                full_compact[start : start + length]
                for start in range(len(full_compact) - length + 1)
            )
    return sorted(windows)
def _length_is_plausible(value: str, candidate_length: int) -> bool:
if not value:
return False
return int(candidate_length * 0.65) <= len(value) <= int(candidate_length * 1.6)
def _is_better_match(match: FuzzyMatch, best: FuzzyMatch) -> bool:
if match.score > best.score + 0.03:
return True
if match.score < best.score - 0.03:
return False
if match.coverage > best.coverage + 0.05:
return True
if match.coverage < best.coverage - 0.05:
return False
return len(compact_text(match.value)) > len(compact_text(best.value))

View File

@@ -3,8 +3,12 @@ from __future__ import annotations
import re import re
from dataclasses import dataclass, asdict from dataclasses import dataclass, asdict
from app.fuzzy_match import best_fuzzy_match
ORDER_RE = re.compile(r"\b(?P<order>\d{4}/\d{4}/(?:[1-9]|[1-9]\d))\b") ORDER_RE = re.compile(r"\b(?P<order>\d{4}/\d{4}/(?:[1-9]|[1-9]\d))\b")
DEFAULT_MODEL_MIN_SCORE = 0.72
DEFAULT_COLOR_MIN_SCORE = 0.72
@dataclass @dataclass
@@ -13,8 +17,10 @@ class ParsedLabel:
color_code: str | None color_code: str | None
product_model: str | None product_model: str | None
raw_text: str raw_text: str
color_score: float | None = None
product_model_score: float | None = None
def to_dict(self) -> dict[str, str | None]: def to_dict(self) -> dict[str, str | float | None]:
return asdict(self) return asdict(self)
@@ -22,23 +28,24 @@ def normalize_ocr_text(text: str) -> str:
return " ".join(text.replace("\n", " ").replace("\r", " ").split()) return " ".join(text.replace("\n", " ").replace("\r", " ").split())
def parse_label_text(text: str, known_colors: list[str], known_models: list[str]) -> ParsedLabel: def parse_label_text(
text: str,
known_colors: list[str],
known_models: list[str],
model_min_score: float = DEFAULT_MODEL_MIN_SCORE,
color_min_score: float = DEFAULT_COLOR_MIN_SCORE,
) -> ParsedLabel:
normalized = normalize_ocr_text(text) normalized = normalize_ocr_text(text)
order_match = ORDER_RE.search(normalized) order_match = ORDER_RE.search(normalized)
normalized_upper = normalized.upper() color_match = best_fuzzy_match(normalized, known_colors, color_min_score)
color_code = next( model_match = best_fuzzy_match(normalized, known_models, model_min_score)
(color for color in known_colors if color.upper() in normalized_upper),
None,
)
product_model = next(
(model for model in known_models if re.search(rf"\b{re.escape(model)}\b", normalized, re.I)),
None,
)
return ParsedLabel( return ParsedLabel(
order_number=order_match.group("order") if order_match else None, order_number=order_match.group("order") if order_match else None,
color_code=color_code, color_code=color_match.value if color_match else None,
product_model=product_model, product_model=model_match.value if model_match else None,
raw_text=normalized, raw_text=normalized,
color_score=color_match.score if color_match else None,
product_model_score=model_match.score if model_match else None,
) )

View File

@@ -6,12 +6,13 @@ from typing import Any
import cv2 import cv2
import numpy as np import numpy as np
from PySide6.QtCore import Qt, Slot from PySide6.QtCore import Qt, QTimer, Slot
from PySide6.QtGui import QAction, QImage, QPixmap from PySide6.QtGui import QAction, QImage, QPixmap
from PySide6.QtWidgets import ( from PySide6.QtWidgets import (
QApplication, QApplication,
QHBoxLayout, QHBoxLayout,
QLabel, QLabel,
QFileDialog,
QMainWindow, QMainWindow,
QMessageBox, QMessageBox,
QPushButton, QPushButton,
@@ -42,6 +43,10 @@ class MainWindow(QMainWindow):
self.fps_frame_count = 0 self.fps_frame_count = 0
self.fps_last_time = time.monotonic() self.fps_last_time = time.monotonic()
self.display_fps = 0.0 self.display_fps = 0.0
self.video_capture: cv2.VideoCapture | None = None
self.video_timer = QTimer(self)
self.video_timer.timeout.connect(self._read_video_frame)
self.video_playing = False
self.media_store = MediaStore(self.config, self.app_config) self.media_store = MediaStore(self.config, self.app_config)
self.video_recorder = VideoRecorder(self.config, self.app_config) self.video_recorder = VideoRecorder(self.config, self.app_config)
@@ -119,12 +124,19 @@ class MainWindow(QMainWindow):
) )
toolbar_layout = QHBoxLayout(self.toolbar) toolbar_layout = QHBoxLayout(self.toolbar)
toolbar_layout.setContentsMargins(8, 6, 8, 6) toolbar_layout.setContentsMargins(8, 6, 8, 6)
self.load_video_button = self._tool_button(QStyle.SP_DirOpenIcon, "Wczytaj film")
self.video_play_button = self._tool_button(QStyle.SP_MediaPlay, "Play/pauza filmu")
self.photo_button = self._tool_button(QStyle.SP_DialogSaveButton, "Zrob zdjecie") self.photo_button = self._tool_button(QStyle.SP_DialogSaveButton, "Zrob zdjecie")
self.record_button = self._tool_button(QStyle.SP_MediaPlay, "Start/stop nagrywania") self.record_button = self._tool_button(QStyle.SP_MediaPlay, "Start/stop nagrywania")
self.settings_button = self._tool_button(QStyle.SP_FileDialogDetailedView, "Ustawienia obrazu") self.settings_button = self._tool_button(QStyle.SP_FileDialogDetailedView, "Ustawienia obrazu")
toolbar_layout.addWidget(self.load_video_button)
toolbar_layout.addWidget(self.video_play_button)
toolbar_layout.addWidget(self.photo_button) toolbar_layout.addWidget(self.photo_button)
toolbar_layout.addWidget(self.record_button) toolbar_layout.addWidget(self.record_button)
toolbar_layout.addWidget(self.settings_button) toolbar_layout.addWidget(self.settings_button)
self.video_play_button.setEnabled(False)
self.load_video_button.clicked.connect(self.load_video)
self.video_play_button.clicked.connect(self.toggle_video_playback)
self.photo_button.clicked.connect(self.take_photo) self.photo_button.clicked.connect(self.take_photo)
self.record_button.clicked.connect(self.toggle_recording) self.record_button.clicked.connect(self.toggle_recording)
self.settings_button.clicked.connect(self.open_settings) self.settings_button.clicked.connect(self.open_settings)
@@ -158,9 +170,10 @@ class MainWindow(QMainWindow):
def closeEvent(self, event: Any) -> None: def closeEvent(self, event: Any) -> None:
if self.video_recorder.is_recording: if self.video_recorder.is_recording:
self.video_recorder.stop(self.current_metadata("video")) self.video_recorder.stop(self.current_metadata("video"))
self.worker.stop() self.video_timer.stop()
self._close_video_capture()
self._stop_camera_worker()
self.detection_worker.stop() self.detection_worker.stop()
self.worker.wait(2000)
self.detection_worker.wait(2000) self.detection_worker.wait(2000)
super().closeEvent(event) super().closeEvent(event)
@@ -223,6 +236,41 @@ class MainWindow(QMainWindow):
self.record_button.setIcon(self.style().standardIcon(QStyle.SP_MediaStop)) self.record_button.setIcon(self.style().standardIcon(QStyle.SP_MediaStop))
self.statusBar().showMessage(f"Nagrywanie: {path}", 5000) self.statusBar().showMessage(f"Nagrywanie: {path}", 5000)
def load_video(self) -> None:
    """Let the user pick a video file and switch the preview over to it.

    Nothing changes when the dialog is cancelled or the file cannot be opened.
    Otherwise an active recording is stopped, the camera worker and any
    previously loaded video are shut down, the new capture is installed in a
    paused state, and its first frame is displayed.
    """
    selected_path, _selected_filter = QFileDialog.getOpenFileName(
        self,
        "Wczytaj film",
        "",
        "Filmy (*.mp4 *.avi *.mov *.mkv *.m4v);;Wszystkie pliki (*)",
    )
    if not selected_path:
        return

    capture = cv2.VideoCapture(selected_path)
    if not capture.isOpened():
        QMessageBox.warning(self, "Film", "Nie mozna otworzyc pliku wideo")
        capture.release()
        return

    recorder = self.video_recorder
    if recorder.is_recording:
        recorder.stop(self.current_metadata("video"))
        self.record_button.setIcon(self.style().standardIcon(QStyle.SP_MediaPlay))

    # Release the previous frame sources before installing the new one.
    self._stop_camera_worker()
    self._close_video_capture()
    self.video_capture = capture
    self.video_play_button.setEnabled(True)
    self._set_video_playing(False)

    # Results from the previous source no longer apply.
    self.overlay_result = None
    self.last_detection = None
    self.result_text.setPlainText(f"Wczytano film: {selected_path}")
    self._read_video_frame()
def toggle_video_playback(self) -> None:
    """Flip between play and pause; does nothing when no video is loaded."""
    if self.video_capture is not None:
        self._set_video_playing(not self.video_playing)
def open_settings(self) -> None: def open_settings(self) -> None:
dialog = SettingsDialog(self.config, self) dialog = SettingsDialog(self.config, self)
dialog.settings_saved.connect(self.save_camera_settings) dialog.settings_saved.connect(self.save_camera_settings)
@@ -232,8 +280,55 @@ class MainWindow(QMainWindow):
def save_camera_settings(self, camera_config: dict[str, Any]) -> None: def save_camera_settings(self, camera_config: dict[str, Any]) -> None:
self.config["camera"] = camera_config self.config["camera"] = camera_config
self.app_config.save(self.config) self.app_config.save(self.config)
if self.worker is not None:
self.worker.update_camera_config(camera_config) self.worker.update_camera_config(camera_config)
def _read_video_frame(self) -> None:
if self.video_capture is None:
return
ok, frame = self.video_capture.read()
if not ok or frame is None:
self._set_video_playing(False)
self.video_capture.set(cv2.CAP_PROP_POS_FRAMES, 0)
self.statusBar().showMessage("Koniec filmu", 3000)
return
self.on_frame_ready(frame)
def _set_video_playing(self, playing: bool) -> None:
    """Record the playback state and sync the frame timer and play button with it.

    Without a loaded video the timer is stopped and the button is reset and
    disabled. When playing, the timer interval comes from the video's reported
    FPS, falling back to the camera ``fps`` setting (default 30) when the
    reported value is not positive.
    """
    self.video_playing = playing
    style = self.style

    if self.video_capture is None:
        self.video_timer.stop()
        self.video_play_button.setIcon(style().standardIcon(QStyle.SP_MediaPlay))
        self.video_play_button.setEnabled(False)
        return

    if not playing:
        self.video_timer.stop()
        self.video_play_button.setIcon(style().standardIcon(QStyle.SP_MediaPlay))
        return

    fps = self.video_capture.get(cv2.CAP_PROP_FPS)
    if fps <= 0:
        fps = float(self.config["camera"].get("fps", 30))
    # Never schedule a zero-millisecond timer.
    interval_ms = max(1, int(round(1000 / fps)))
    self.video_timer.start(interval_ms)
    self.video_play_button.setIcon(style().standardIcon(QStyle.SP_MediaPause))
def _close_video_capture(self) -> None:
self._set_video_playing(False)
if self.video_capture is not None:
self.video_capture.release()
self.video_capture = None
self.video_play_button.setEnabled(False)
def _stop_camera_worker(self) -> None:
if self.worker is None:
return
self.worker.stop()
self.worker.wait(2000)
self.worker = None
def _maybe_request_detection(self, frame: np.ndarray) -> None: def _maybe_request_detection(self, frame: np.ndarray) -> None:
if not self.detecting: if not self.detecting:
return return
@@ -277,10 +372,18 @@ class MainWindow(QMainWindow):
lines.append(f"Komunikat: {result.error}") lines.append(f"Komunikat: {result.error}")
if result.confidence is not None: if result.confidence is not None:
lines.append(f"YOLO confidence: {result.confidence:.3f}") lines.append(f"YOLO confidence: {result.confidence:.3f}")
if result.ocr_engine:
lines.append(f"OCR: {result.ocr_engine}")
if result.ocr_confidence is not None:
lines.append(f"OCR confidence: {result.ocr_confidence:.3f}")
if result.ocr_elapsed_ms is not None:
lines.append(f"OCR czas: {result.ocr_elapsed_ms:.0f} ms")
if result.parsed: if result.parsed:
lines.append(f"Zamowienie: {result.parsed.order_number or '-'}") lines.append(f"Zamowienie: {result.parsed.order_number or '-'}")
lines.append(f"Kolor: {result.parsed.color_code or '-'}") color_score = _format_score(result.parsed.color_score)
lines.append(f"Model: {result.parsed.product_model or '-'}") model_score = _format_score(result.parsed.product_model_score)
lines.append(f"Kolor: {result.parsed.color_code or '-'}{color_score}")
lines.append(f"Model: {result.parsed.product_model or '-'}{model_score}")
if result.raw_text: if result.raw_text:
lines.append("") lines.append("")
lines.append(result.raw_text) lines.append(result.raw_text)
@@ -326,7 +429,7 @@ class MainWindow(QMainWindow):
def _draw_fps(self, frame_bgr: np.ndarray) -> None: def _draw_fps(self, frame_bgr: np.ndarray) -> None:
label = f"FPS: {self.display_fps:.1f}" label = f"FPS: {self.display_fps:.1f}"
cv2.rectangle(frame_bgr, (12, 12), (122, 46), (0, 0, 0), -1) cv2.rectangle(frame_bgr, (12, 12), (142, 46), (0, 0, 0), -1)
cv2.putText( cv2.putText(
frame_bgr, frame_bgr,
label, label,
@@ -344,3 +447,9 @@ def run_app(app_config: AppConfig) -> int:
window = MainWindow(app_config) window = MainWindow(app_config)
window.show() window.show()
return app.exec() return app.exec()
def _format_score(score: float | None) -> str:
if score is None:
return ""
return f" ({score:.2f})"

4
app/ocr/__init__.py Normal file
View File

@@ -0,0 +1,4 @@
from app.ocr.base import OcrEngine, OcrLine, OcrResult
from app.ocr.factory import create_ocr_engine
__all__ = ["OcrEngine", "OcrLine", "OcrResult", "create_ocr_engine"]

54
app/ocr/base.py Normal file
View File

@@ -0,0 +1,54 @@
from __future__ import annotations
from dataclasses import dataclass, field
from typing import Protocol
import cv2
import numpy as np
@dataclass
class OcrLine:
    """One piece of text reported by an OCR engine."""

    # Recognized text of this line.
    text: str
    # Engine-reported confidence for the line, when the engine provides one.
    confidence: float | None = None
    # Outline of the line as a list of [x, y] points, when available.
    bbox: list[list[float]] | None = None
@dataclass
class OcrResult:
    """Outcome of one ``read_label`` call; every field has a neutral default."""

    # Full recognized text.
    text: str = ""
    # Overall confidence, or None when the engine does not report one.
    confidence: float | None = None
    # Per-line details; a fresh list is created for every instance.
    lines: list[OcrLine] = field(default_factory=list)
    # Error message, or None when no error was recorded.
    error: str | None = None
    # Time spent on the call, in milliseconds.
    elapsed_ms: float = 0.0
    # Name of the engine that produced the result.
    engine: str = "none"
class OcrEngine(Protocol):
    """Structural interface that every OCR backend has to satisfy."""

    # Short identifier of the backend.
    name: str

    def read_label(self, frame_bgr: np.ndarray, bbox: tuple[int, int, int, int]) -> OcrResult:
        """Recognize text inside the *bbox* region of *frame_bgr*."""
        ...
def crop_bbox(frame_bgr: np.ndarray, bbox: tuple[int, int, int, int], margin: int = 0) -> np.ndarray | None:
    """Slice the ``(x1, y1, x2, y2)`` box out of *frame_bgr*.

    The box is grown by *margin* pixels on every side and then clamped to the
    frame. ``None`` is returned when the clamped box has no area; otherwise
    the result is a slice of the input array.
    """
    left, top, right, bottom = bbox
    frame_h, frame_w = frame_bgr.shape[:2]

    left = max(0, left - margin)
    top = max(0, top - margin)
    right = min(frame_w, right + margin)
    bottom = min(frame_h, bottom + margin)

    if right > left and bottom > top:
        return frame_bgr[top:bottom, left:right]
    return None
def prepare_ocr_image(image_bgr: np.ndarray, config: dict) -> np.ndarray:
    """Apply the optional preprocessing steps selected in *config*.

    ``scale`` (default 1.0) resizes the image with cubic interpolation when it
    differs from 1.0. ``threshold`` (default off) converts the image to
    grayscale, blurs it with a 3x3 Gaussian kernel and binarizes it with Otsu's
    method; with thresholding off the (possibly resized) BGR image is returned.
    """
    factor = float(config.get("scale", 1.0))
    resized = (
        image_bgr
        if factor == 1.0
        else cv2.resize(image_bgr, None, fx=factor, fy=factor, interpolation=cv2.INTER_CUBIC)
    )

    if config.get("threshold", False):
        blurred = cv2.GaussianBlur(cv2.cvtColor(resized, cv2.COLOR_BGR2GRAY), (3, 3), 0)
        _otsu_level, binary = cv2.threshold(blurred, 0, 255, cv2.THRESH_BINARY + cv2.THRESH_OTSU)
        return binary
    return resized

106
app/ocr/cli.py Normal file
View File

@@ -0,0 +1,106 @@
from __future__ import annotations
import argparse
import json
from pathlib import Path
from typing import Any
import cv2
from app.config import AppConfig
from app.label_parser import parse_label_text
from app.ocr import create_ocr_engine
def iter_images(path: Path) -> list[Path]:
    """List the image files to process for *path*.

    A file path is returned as a one-element list whatever its extension. For
    any other path the directory's direct children are scanned and the regular
    files with a known image extension (case-insensitive) are returned sorted.
    """
    if path.is_file():
        return [path]

    extensions = {".jpg", ".jpeg", ".png", ".bmp", ".webp", ".tif", ".tiff"}
    images = [
        entry
        for entry in path.iterdir()
        if entry.is_file() and entry.suffix.lower() in extensions
    ]
    images.sort()
    return images
def result_to_dict(path: Path, result: Any, config: dict[str, Any]) -> dict[str, Any]:
    """Turn one OCR result into a JSON-serializable report entry.

    The recognized text is also run through ``parse_label_text`` with the
    colors, models and minimum scores from the ``label_data`` config section
    (both minimum scores default to 0.72), and the parse is stored under
    ``"parsed"``.
    """
    label_cfg = config.get("label_data", {})
    text = result.text
    parsed = parse_label_text(
        text,
        label_cfg.get("colors", []),
        label_cfg.get("models", []),
        model_min_score=float(label_cfg.get("model_min_score", 0.72)),
        color_min_score=float(label_cfg.get("color_min_score", 0.72)),
    )

    report: dict[str, Any] = {
        "file": str(path),
        "engine": result.engine,
        "elapsed_ms": round(result.elapsed_ms, 2),
        "confidence": result.confidence,
        "error": result.error,
        "text": result.text,
    }
    report["lines"] = [
        {"text": line.text, "confidence": line.confidence, "bbox": line.bbox}
        for line in result.lines
    ]
    report["parsed"] = parsed.to_dict()
    return report
def main() -> int:
    """Run the configured OCR engine over crop images and print the results.

    Command-line flags can override the engine, thresholding and scale taken
    from the application config. Every image is passed to the engine as a
    whole. Output is either one JSON array (``--json``) or a plain-text block
    per image. The function always returns 0.
    """
    parser = argparse.ArgumentParser(description="Test OCR backend on cropped label images.")
    parser.add_argument("path", help="Image file or directory with crop images")
    parser.add_argument("--config", default="app_config.json", help="Application config JSON path")
    parser.add_argument(
        "--engine",
        choices=["none", "tesseract", "paddle"],
        help="Override ocr.engine from config",
    )
    parser.add_argument("--no-threshold", action="store_true", help="Disable threshold preprocessing")
    parser.add_argument("--scale", type=float, help="Override OCR scale")
    parser.add_argument("--json", action="store_true", help="Print JSON output")
    args = parser.parse_args()

    config = AppConfig(Path(args.config)).data
    if args.engine:
        # Choosing "none" on the command line also switches OCR off.
        config["ocr"]["engine"] = args.engine
        config["ocr"]["enabled"] = args.engine != "none"
    if args.no_threshold:
        config["ocr"]["threshold"] = False
    if args.scale is not None:
        config["ocr"]["scale"] = args.scale

    engine = create_ocr_engine(config)
    outputs = []
    for image_path in iter_images(Path(args.path)):
        image = cv2.imread(str(image_path), cv2.IMREAD_COLOR)
        if image is None:
            outputs.append({"file": str(image_path), "error": "Nie mozna odczytac obrazu"})
            continue
        height, width = image.shape[:2]
        # The crop is the whole image, so the box spans the full frame.
        result = engine.read_label(image, (0, 0, width, height))
        outputs.append(result_to_dict(image_path, result, config))

    if args.json:
        print(json.dumps(outputs, indent=2, ensure_ascii=False))
        return 0

    for output in outputs:
        print(f"file: {output['file']}")
        for key in ("engine", "elapsed_ms", "confidence"):
            print(f"{key}: {output.get(key)}")
        if output.get("error"):
            print(f"error: {output['error']}")
        print("text:")
        print(output.get("text") or "")
        print(f"parsed: {output.get('parsed')}")
        print()
    return 0
if __name__ == "__main__":
raise SystemExit(main())

24
app/ocr/factory.py Normal file
View File

@@ -0,0 +1,24 @@
from __future__ import annotations
from typing import Any
from app.ocr.base import OcrEngine
from app.ocr.none import NoOcrEngine
from app.ocr.paddle import PaddleOcrEngine
from app.ocr.tesseract import TesseractOcrEngine
def create_ocr_engine(config: dict[str, Any]) -> OcrEngine:
    """Build the OCR engine selected by the ``ocr`` section of *config*.

    A disabled section, or an engine named ``none``/``off``/``disabled``,
    yields the no-op engine. ``tesseract`` (the default) and ``paddle`` select
    the matching backend. Engine names are compared case-insensitively.

    Raises:
        ValueError: if the engine name is not recognized.
    """
    ocr_cfg = config.get("ocr", {})
    if not ocr_cfg.get("enabled", True):
        return NoOcrEngine(ocr_cfg)

    engine = str(ocr_cfg.get("engine", "tesseract")).lower()
    factories = {
        "none": NoOcrEngine,
        "off": NoOcrEngine,
        "disabled": NoOcrEngine,
        "tesseract": TesseractOcrEngine,
        "paddle": PaddleOcrEngine,
    }
    factory = factories.get(engine)
    if factory is None:
        raise ValueError(f"Nieznany silnik OCR: {engine}")
    return factory(ocr_cfg)

15
app/ocr/none.py Normal file
View File

@@ -0,0 +1,15 @@
from __future__ import annotations
import numpy as np
from app.ocr.base import OcrResult
class NoOcrEngine:
    """Engine used when OCR is switched off: it never reads any text."""

    name = "none"

    def __init__(self, config: dict) -> None:
        """Keep *config* for parity with the other engines; it is not consulted."""
        self.config = config

    def read_label(self, frame_bgr: np.ndarray, bbox: tuple[int, int, int, int]) -> OcrResult:
        """Return a default result tagged with this engine's name; inputs are ignored."""
        return OcrResult(engine=self.name)

153
app/ocr/paddle.py Normal file
View File

@@ -0,0 +1,153 @@
from __future__ import annotations
import time
from typing import Any
import numpy as np
from app.ocr.base import OcrLine, OcrResult, crop_bbox, prepare_ocr_image
class PaddleOcrEngine:
    """OCR engine backed by the optional ``paddleocr`` package.

    Import and construction failures of PaddleOCR are stored in ``load_error``
    instead of being raised, and are reported through every ``read_label``
    result. Several raw result layouts are accepted: dict-like results with
    ``rec_texts``/``rec_scores``/``rec_polys`` style keys, and nested lists of
    ``(box, (text, score))`` items.
    """

    name = "paddle"

    def __init__(self, config: dict) -> None:
        """Store the OCR config section and try to build the PaddleOCR instance."""
        self.config = config
        self.load_error: str | None = None
        self.ocr: Any = None
        self._load()

    def _load(self) -> None:
        """Import and instantiate PaddleOCR, recording any failure in ``load_error``."""
        try:
            from paddleocr import PaddleOCR
        except Exception as exc:
            self.load_error = f"Nie mozna zaimportowac PaddleOCR: {exc}"
            return

        # Work on a copy so the fallback below never mutates the caller's config.
        paddle_cfg = dict(self.config.get("paddle", {}))
        # Without an explicit ``paddle.lang`` the shared ``language`` setting is used.
        paddle_cfg.setdefault("lang", self.config.get("language", "en"))
        try:
            self.ocr = PaddleOCR(**paddle_cfg)
        except Exception as exc:
            self.load_error = f"Nie mozna zaladowac PaddleOCR: {exc}"

    def read_label(self, frame_bgr: np.ndarray, bbox: tuple[int, int, int, int]) -> OcrResult:
        """Recognize the text inside *bbox* of *frame_bgr*.

        The region is cropped with the configured ``margin``, preprocessed
        (thresholding is controlled by ``paddle_threshold``, not ``threshold``)
        and passed to PaddleOCR. The result text joins the recognized lines
        with newlines and the confidence is the mean of the per-line values.
        A missing engine, an empty crop or an OCR exception is reported via
        ``OcrResult.error``.
        """
        started = time.perf_counter()
        if self.ocr is None:
            return self._failure(self.load_error or "PaddleOCR nie jest zaladowany", started)

        margin = int(self.config.get("margin", 0))
        roi = crop_bbox(frame_bgr, bbox, margin=margin)
        if roi is None:
            return self._failure("Nieprawidlowy bbox OCR", started)

        preprocess_config = {
            **self.config,
            "threshold": bool(self.config.get("paddle_threshold", False)),
        }
        image = prepare_ocr_image(roi, preprocess_config)
        try:
            raw_result = self._run_ocr(image)
        except Exception as exc:
            return self._failure(f"Blad PaddleOCR: {exc}", started)

        lines = self._parse_lines(raw_result)
        text = "\n".join(line.text for line in lines)
        confidences = [line.confidence for line in lines if line.confidence is not None]
        confidence = sum(confidences) / len(confidences) if confidences else None
        return OcrResult(
            text=text,
            confidence=confidence,
            lines=lines,
            elapsed_ms=self._elapsed_ms(started),
            engine=self.name,
        )

    def _failure(self, message: str, started: float) -> OcrResult:
        """Build an error result carrying *message* and the elapsed time."""
        return OcrResult(
            error=message,
            elapsed_ms=self._elapsed_ms(started),
            engine=self.name,
        )

    def _run_ocr(self, image: np.ndarray) -> Any:
        """Call ``predict`` when the engine has it, otherwise ``ocr``.

        ``ocr`` is first called with ``cls`` taken from ``use_angle_cls`` and
        retried without it when that call raises ``TypeError``.
        """
        if hasattr(self.ocr, "predict"):
            return self.ocr.predict(image)
        try:
            return self.ocr.ocr(image, cls=bool(self.config.get("use_angle_cls", True)))
        except TypeError:
            return self.ocr.ocr(image)

    def _parse_lines(self, raw_result: Any) -> list[OcrLine]:
        """Convert a raw engine result into lines, dropping blank or unparsable items."""
        if raw_result is None:
            return []
        lines: list[OcrLine] = []
        for item in self._iter_result_items(raw_result):
            parsed = self._parse_item(item)
            if parsed is not None and parsed.text.strip():
                lines.append(parsed)
        return lines

    @staticmethod
    def _first_present(mapping: dict, *keys: str) -> Any:
        """Return the first value under *keys* that has a non-zero length.

        Emptiness is tested with ``len`` rather than truthiness, because the
        truth value of a multi-element NumPy array raises ``ValueError``.
        Missing keys, ``None`` and values without a length are skipped.
        """
        for key in keys:
            value = mapping.get(key)
            try:
                if len(value) > 0:
                    return value
            except TypeError:
                continue
        return None

    def _iter_result_items(self, raw_result: Any) -> list[Any]:
        """Flatten a raw result into ``(box, (text, score))``-style items.

        Dict results are zipped from their text/score/box sequences, with
        missing scores or boxes filled in as ``None``. A list holding a single
        inner list is unwrapped. Other lists are flattened one level, expanding
        dict entries the same way. Anything else is returned as a single item.
        """
        if isinstance(raw_result, dict):
            texts = self._first_present(raw_result, "rec_texts", "texts")
            if texts is None:
                return []
            scores = self._first_present(raw_result, "rec_scores", "scores")
            boxes = self._first_present(raw_result, "rec_polys", "dt_polys", "boxes")
            score_count = 0 if scores is None else len(scores)
            box_count = 0 if boxes is None else len(boxes)
            return [
                (
                    boxes[index] if index < box_count else None,
                    (text, scores[index] if index < score_count else None),
                )
                for index, text in enumerate(texts)
            ]
        if isinstance(raw_result, list) and len(raw_result) == 1 and isinstance(raw_result[0], list):
            return raw_result[0]
        if isinstance(raw_result, list):
            items = []
            for result in raw_result:
                if isinstance(result, dict):
                    items.extend(self._iter_result_items(result))
                elif isinstance(result, list):
                    items.extend(result)
                else:
                    items.append(result)
            return items
        return [raw_result]

    def _parse_item(self, item: Any) -> OcrLine | None:
        """Build a line from ``(box, (text, score))`` or ``(text, score)``; else ``None``."""
        if not isinstance(item, (list, tuple)):
            return None
        if len(item) >= 2 and isinstance(item[1], (list, tuple)) and item[1]:
            text = str(item[1][0])
            confidence = self._to_float(item[1][1]) if len(item[1]) > 1 else None
            bbox = self._to_bbox(item[0])
            return OcrLine(text=text, confidence=confidence, bbox=bbox)
        if len(item) >= 2 and isinstance(item[0], str):
            return OcrLine(text=str(item[0]), confidence=self._to_float(item[1]))
        return None

    def _to_float(self, value: Any) -> float | None:
        """Convert *value* to ``float``, or return ``None`` when that fails."""
        try:
            return float(value)
        except (TypeError, ValueError):
            return None

    def _to_bbox(self, value: Any) -> list[list[float]] | None:
        """Convert a sequence of points to ``[[x, y], ...]`` floats, or ``None`` on failure."""
        if value is None:
            return None
        try:
            return [[float(point[0]), float(point[1])] for point in value]
        except (TypeError, ValueError, IndexError):
            return None

    def _elapsed_ms(self, started: float) -> float:
        """Return the milliseconds elapsed since the *started* ``perf_counter`` reading."""
        return (time.perf_counter() - started) * 1000.0

104
app/ocr/tesseract.py Normal file
View File

@@ -0,0 +1,104 @@
from __future__ import annotations
import time
import numpy as np
from app.ocr.base import OcrLine, OcrResult, crop_bbox, prepare_ocr_image
class TesseractOcrEngine:
    """OCR engine backed by the optional ``pytesseract`` package.

    ``pytesseract`` is imported when the engine is constructed. If that
    fails, the reason is stored in ``load_error`` and ``read_label`` returns
    it as an error result instead of raising.
    """

    name = "tesseract"

    def __init__(self, config: dict) -> None:
        """Keep ``config`` and attempt to load ``pytesseract``."""
        self.config = config
        self.load_error: str | None = None
        self.pytesseract = None
        self._load()

    def _load(self) -> None:
        """Import ``pytesseract`` and apply the ``tesseract_cmd`` override.

        Any exception is recorded in ``load_error``; ``self.pytesseract``
        then stays ``None``.
        """
        try:
            import pytesseract

            custom_cmd = self.config.get("tesseract_cmd")
            if custom_cmd:
                pytesseract.pytesseract.tesseract_cmd = custom_cmd
            self.pytesseract = pytesseract
        except Exception as exc:
            self.load_error = f"Nie mozna zaladowac pytesseract: {exc}"

    def read_label(self, frame_bgr: np.ndarray, bbox: tuple[int, int, int, int]) -> OcrResult:
        """Run Tesseract on the ``bbox`` region of ``frame_bgr``.

        The region is cropped with the configured ``margin`` and passed
        through ``prepare_ocr_image`` before recognition. Load failures, a
        rejected bbox and recognition errors are all reported through
        ``OcrResult.error``; every result carries the elapsed time and the
        engine name.
        """
        started = time.perf_counter()
        if self.pytesseract is None:
            return self._failure(
                self.load_error or "OCR Tesseract nie jest zaladowany",
                started,
            )

        roi = crop_bbox(frame_bgr, bbox, margin=int(self.config.get("margin", 0)))
        if roi is None:
            return self._failure("Nieprawidlowy bbox OCR", started)

        image = prepare_ocr_image(roi, self.config)
        language, tesseract_config = self._tesseract_options()

        try:
            text = self.pytesseract.image_to_string(
                image,
                lang=language,
                config=tesseract_config,
            )
        except Exception as exc:
            return self._failure(f"Blad OCR Tesseract: {exc}", started)

        confidence = self._mean_confidence(image, language, tesseract_config)
        # Blank and whitespace-only rows are left out of ``lines``; ``text``
        # itself is returned unmodified.
        non_blank = [OcrLine(text=row) for row in text.splitlines() if row.strip()]
        return OcrResult(
            text=text,
            confidence=confidence,
            lines=non_blank,
            elapsed_ms=self._elapsed_ms(started),
            engine=self.name,
        )

    def _failure(self, message: str, started: float) -> OcrResult:
        """Build an error-only ``OcrResult`` stamped with timing and engine name."""
        return OcrResult(
            error=message,
            elapsed_ms=self._elapsed_ms(started),
            engine=self.name,
        )

    def _tesseract_options(self):
        """Return ``(language, config_string)`` derived from the engine config.

        The config string always starts with ``--psm <psm>``; the stripped
        ``config`` entry is appended after a space when it is non-empty.
        """
        psm = int(self.config.get("psm", 6))
        language = self.config.get("language", "eng")
        extra = str(self.config.get("config", "")).strip()
        parts = [f"--psm {psm}"]
        if extra:
            parts.append(extra)
        return language, " ".join(parts)

    def _mean_confidence(self, image: np.ndarray, language: str, tesseract_config: str) -> float | None:
        """Return the mean ``conf`` value from ``image_to_data`` as a 0-1 fraction.

        Entries that are not numeric or are negative are ignored. ``None`` is
        returned when the backend is not loaded, when ``image_to_data``
        raises, or when no usable entry remains.
        """
        backend = self.pytesseract
        if backend is None:
            return None
        try:
            data = backend.image_to_data(
                image,
                lang=language,
                config=tesseract_config,
                output_type=backend.Output.DICT,
            )
        except Exception:
            return None

        fractions = [
            fraction
            for fraction in map(self._conf_fraction, data.get("conf", []))
            if fraction is not None
        ]
        if not fractions:
            return None
        return sum(fractions) / len(fractions)

    @staticmethod
    def _conf_fraction(raw_conf):
        """Scale one raw ``conf`` entry by 1/100, or return ``None`` if unusable."""
        try:
            percent = float(raw_conf)
        except (TypeError, ValueError):
            return None
        return percent / 100.0 if percent >= 0 else None

    def _elapsed_ms(self, started: float) -> float:
        """Return the milliseconds elapsed since the ``perf_counter`` reading ``started``."""
        elapsed_seconds = time.perf_counter() - started
        return elapsed_seconds * 1000.0

View File

@@ -20,19 +20,32 @@
} }
}, },
"detection": { "detection": {
"model_path": "models/best.pt", "model_path": "models/best_v1.pt",
"confidence_threshold": 0.25, "confidence_threshold": 0.25,
"mode": "best", "mode": "best",
"frame_stride": 5, "frame_stride": 30,
"image_size": 640, "image_size": 640,
"device": "cpu" "device": "cpu"
}, },
"ocr": { "ocr": {
"enabled": true, "enabled": true,
"engine": "paddle",
"language": "eng", "language": "eng",
"tesseract_cmd": null, "tesseract_cmd": null,
"psm": 6,
"margin": 0,
"threshold": true, "threshold": true,
"scale": 2.0 "paddle_threshold": false,
"scale": 2.0,
"config": "",
"use_angle_cls": true,
"paddle": {
"enable_mkldnn": false,
"lang": "en",
"use_doc_orientation_classify": false,
"use_doc_unwarping": false,
"use_textline_orientation": false
}
}, },
"capture": { "capture": {
"photos_dir": "captures/photos", "photos_dir": "captures/photos",
@@ -45,13 +58,33 @@
"show_fps": true "show_fps": true
}, },
"label_data": { "label_data": {
"model_min_score": 0.72,
"color_min_score": 0.72,
"models": [ "models": [
"Regius", "Regius 6",
"Duvell" "Regius 7",
"Duvell 6",
"Duvell 7",
"Duvell Elite 6",
"Duvell Elite 7"
], ],
"colors": [ "colors": [
"T-NF-BLK-OUT-BST-G", "T-NF-BLK-OUT-BST-G",
"T-BLK-G" "T-BLK-G",
"T-BLK-S",
"T-BLK-M",
"M-BLK-G",
"M-BLK-S",
"M-BLK-M",
"T-CST-G",
"T-CST-S",
"T-CST-M",
"T-ANTIQUE-G",
"T-ANTIQUE-S",
"T-ANTIQUE-M",
"T-NAT-G",
"T-NAT-S",
"T-NAT-M"
] ]
} }
} }

View File

@@ -0,0 +1,2 @@
paddlepaddle
paddleocr