Compare commits

...

4 Commits

15 changed files with 828 additions and 90 deletions

View File

@@ -39,5 +39,22 @@ Istotne ustawienia:
- `detection.mode` - `best` rysuje najlepsza etykiete, `all` rysuje wszystkie wykrycia.
- `detection.frame_stride` - YOLO uruchamiany co N klatek podczas aktywnego wykrywania.
- `label_data.models`, `label_data.colors` - slowniki do walidacji tekstu z etykiety.
- `ocr.enabled`, `ocr.engine` - wlaczenie OCR i wybor silnika: `none`, `tesseract`, `paddle`.
Zdjecia trafiaja do `captures/photos`, filmy do `captures/videos`. Obok kazdego pliku media zapisywany jest JSON z aktualnym wynikiem detekcji/OCR.
## Testowanie OCR poza aplikacja
OCR mozna testowac na gotowych cropach bez uruchamiania kamery i YOLO:
```bash
.venv-lin/bin/python -m app.ocr.cli crop --engine none
.venv-lin/bin/python -m app.ocr.cli crop --engine tesseract
.venv-lin/bin/python -m app.ocr.cli crop --engine paddle --json
```
Backend PaddleOCR jest opcjonalny. Zaleznosci do testow PaddleOCR sa w osobnym pliku:
```bash
.venv-lin/bin/pip install -r requirements-ocr-paddle.txt
```

View File

@@ -41,10 +41,23 @@ DEFAULT_CONFIG: dict[str, Any] = {
},
"ocr": {
"enabled": True,
"engine": "tesseract",
"language": "eng",
"tesseract_cmd": None,
"psm": 6,
"margin": 0,
"threshold": True,
"paddle_threshold": False,
"scale": 2.0,
"config": "",
"use_angle_cls": True,
"paddle": {
"enable_mkldnn": False,
"lang": "en",
"use_doc_orientation_classify": False,
"use_doc_unwarping": False,
"use_textline_orientation": False,
},
},
"capture": {
"photos_dir": "captures/photos",
@@ -56,7 +69,12 @@ DEFAULT_CONFIG: dict[str, Any] = {
"display": {
"show_fps": True,
},
"label_data": {"models": ["Regius", "Duvell"], "colors": ["T-NF-BLK-OUT-BST-G", "T-BLK-G"]},
"label_data": {
"model_min_score": 0.72,
"color_min_score": 0.72,
"models": ["Regius", "Duvell"],
"colors": ["T-NF-BLK-OUT-BST-G", "T-BLK-G"],
},
}

View File

@@ -4,10 +4,10 @@ from dataclasses import dataclass, field
from pathlib import Path
from typing import Any
import cv2
import numpy as np
from app.label_parser import ParsedLabel, parse_label_text
from app.ocr import create_ocr_engine
@dataclass
@@ -18,6 +18,9 @@ class DetectionResult:
raw_text: str = ""
parsed: ParsedLabel | None = None
error: str | None = None
ocr_engine: str | None = None
ocr_confidence: float | None = None
ocr_elapsed_ms: float | None = None
all_boxes: list[dict[str, Any]] = field(default_factory=list)
def to_metadata(self) -> dict[str, Any]:
@@ -28,6 +31,9 @@ class DetectionResult:
"raw_text": self.raw_text,
"parsed": self.parsed.to_dict() if self.parsed else None,
"error": self.error,
"ocr_engine": self.ocr_engine,
"ocr_confidence": self.ocr_confidence,
"ocr_elapsed_ms": self.ocr_elapsed_ms,
"all_boxes": self.all_boxes,
}
@@ -72,6 +78,9 @@ class YoloLabelDetector:
boxes = []
names = getattr(self.model, "names", {})
for result in results:
if result.boxes is None:
continue
for box in result.boxes:
x1, y1, x2, y2 = [int(v) for v in box.xyxy[0].tolist()]
confidence = float(box.conf[0])
@@ -106,78 +115,30 @@ class YoloLabelDetector:
return result
class TesseractOcr:
def __init__(self, config: dict[str, Any]) -> None:
self.config = config
self.load_error: str | None = None
self.pytesseract = None
self._load()
def _load(self) -> None:
if not self.config["ocr"].get("enabled", True):
return
try:
import pytesseract
command = self.config["ocr"].get("tesseract_cmd")
if command:
pytesseract.pytesseract.tesseract_cmd = command
self.pytesseract = pytesseract
except Exception as exc:
self.load_error = f"Nie mozna zaladowac pytesseract: {exc}"
def read_label(self, frame_bgr: np.ndarray, bbox: tuple[int, int, int, int]) -> tuple[str, str | None]:
if not self.config["ocr"].get("enabled", True):
return "", None
if self.pytesseract is None:
return "", self.load_error or "OCR nie jest zaladowany"
x1, y1, x2, y2 = bbox
h, w = frame_bgr.shape[:2]
x1, y1 = max(0, x1), max(0, y1)
x2, y2 = min(w, x2), min(h, y2)
if x2 <= x1 or y2 <= y1:
return "", "Nieprawidlowy bbox OCR"
roi = frame_bgr[y1:y2, x1:x2]
scale = float(self.config["ocr"].get("scale", 1.0))
if scale != 1.0:
roi = cv2.resize(roi, None, fx=scale, fy=scale, interpolation=cv2.INTER_CUBIC)
gray = cv2.cvtColor(roi, cv2.COLOR_BGR2GRAY)
if self.config["ocr"].get("threshold", True):
gray = cv2.GaussianBlur(gray, (3, 3), 0)
gray = cv2.threshold(gray, 0, 255, cv2.THRESH_BINARY + cv2.THRESH_OTSU)[1]
try:
text = self.pytesseract.image_to_string(
gray,
lang=self.config["ocr"].get("language", "eng"),
config="--psm 6",
)
except Exception as exc:
return "", f"Blad OCR: {exc}"
return text, None
class DetectionPipeline:
def __init__(self, config: dict[str, Any], app_config: Any) -> None:
self.config = config
self.detector = YoloLabelDetector(config, app_config)
self.ocr = TesseractOcr(config)
self.ocr = create_ocr_engine(config)
def process(self, frame_bgr: np.ndarray) -> DetectionResult:
result = self.detector.detect(frame_bgr)
if result.xyxy is None:
return result
text, ocr_error = self.ocr.read_label(frame_bgr, result.xyxy)
result.raw_text = text
ocr_result = self.ocr.read_label(frame_bgr, result.xyxy)
result.raw_text = ocr_result.text
result.ocr_engine = ocr_result.engine
result.ocr_confidence = ocr_result.confidence
result.ocr_elapsed_ms = ocr_result.elapsed_ms
label_cfg = self.config["label_data"]
result.parsed = parse_label_text(
text,
self.config["label_data"].get("colors", []),
self.config["label_data"].get("models", []),
ocr_result.text,
label_cfg.get("colors", []),
label_cfg.get("models", []),
model_min_score=float(label_cfg.get("model_min_score", 0.72)),
color_min_score=float(label_cfg.get("color_min_score", 0.72)),
)
if ocr_error:
result.error = ocr_error
if ocr_result.error:
result.error = ocr_result.error
return result

131
app/fuzzy_match.py Normal file
View File

@@ -0,0 +1,131 @@
from __future__ import annotations
import re
import unicodedata
from dataclasses import dataclass
from difflib import SequenceMatcher
@dataclass(frozen=True)
class FuzzyMatch:
value: str
score: float
matched_text: str
coverage: float
def compact_text(text: str) -> str:
normalized = unicodedata.normalize("NFKD", text)
ascii_text = normalized.encode("ascii", "ignore").decode("ascii")
return re.sub(r"[^A-Z0-9]+", "", ascii_text.upper())
def token_text(text: str) -> list[str]:
normalized = unicodedata.normalize("NFKD", text)
ascii_text = normalized.encode("ascii", "ignore").decode("ascii")
return re.findall(r"[A-Z0-9]+", ascii_text.upper())
def similarity(left: str, right: str) -> float:
if not left or not right:
return 0.0
return SequenceMatcher(None, left, right).ratio()
def best_fuzzy_match(text: str, candidates: list[str], min_score: float = 0.72) -> FuzzyMatch | None:
best: FuzzyMatch | None = None
for candidate in candidates:
candidate_compact = compact_text(candidate)
if not candidate_compact:
continue
score, matched_text, coverage = best_candidate_score(text, candidate_compact)
match = FuzzyMatch(
value=candidate,
score=score,
matched_text=matched_text,
coverage=coverage,
)
if best is None or _is_better_match(match, best):
best = match
if best is None or best.score < min_score:
return None
return best
def best_candidate_score(text: str, candidate_compact: str) -> tuple[float, str, float]:
full_compact = compact_text(text)
if candidate_compact in full_compact:
return 1.0, candidate_compact, 1.0
windows = candidate_windows(text, len(candidate_compact))
if not windows and full_compact:
windows = [full_compact]
best_score = 0.0
best_window = ""
best_coverage = 0.0
for window in windows:
coverage = min(len(window), len(candidate_compact)) / max(len(window), len(candidate_compact))
score = similarity(candidate_compact, window) * coverage * digit_match_weight(candidate_compact, window)
if score > best_score:
best_score = score
best_window = window
best_coverage = coverage
return best_score, best_window, best_coverage
def digit_match_weight(candidate: str, matched_text: str) -> float:
candidate_digits = re.findall(r"\d", candidate)
matched_digits = re.findall(r"\d", matched_text)
if not candidate_digits or not matched_digits:
return 1.0
if candidate_digits == matched_digits:
return 1.05
return 0.7
def candidate_windows(text: str, candidate_length: int) -> list[str]:
tokens = token_text(text)
windows: set[str] = set()
for token in tokens:
windows.add(token)
max_ngram = min(8, len(tokens))
for size in range(2, max_ngram + 1):
for index in range(0, len(tokens) - size + 1):
joined = "".join(tokens[index : index + size])
if _length_is_plausible(joined, candidate_length):
windows.add(joined)
full_compact = compact_text(text)
if full_compact:
min_len = max(1, int(candidate_length * 0.65))
max_len = max(min_len, int(candidate_length * 1.35))
for length in range(min_len, max_len + 1):
if length > len(full_compact):
continue
for index in range(0, len(full_compact) - length + 1):
windows.add(full_compact[index : index + length])
return sorted(windows)
def _length_is_plausible(value: str, candidate_length: int) -> bool:
if not value:
return False
return int(candidate_length * 0.65) <= len(value) <= int(candidate_length * 1.6)
def _is_better_match(match: FuzzyMatch, best: FuzzyMatch) -> bool:
if match.score > best.score + 0.03:
return True
if match.score < best.score - 0.03:
return False
if match.coverage > best.coverage + 0.05:
return True
if match.coverage < best.coverage - 0.05:
return False
return len(compact_text(match.value)) > len(compact_text(best.value))

View File

@@ -3,8 +3,12 @@ from __future__ import annotations
import re
from dataclasses import dataclass, asdict
from app.fuzzy_match import best_fuzzy_match
ORDER_RE = re.compile(r"\b(?P<order>\d{4}/\d{4}/(?:[1-9]|[1-9]\d))\b")
DEFAULT_MODEL_MIN_SCORE = 0.72
DEFAULT_COLOR_MIN_SCORE = 0.72
@dataclass
@@ -13,8 +17,10 @@ class ParsedLabel:
color_code: str | None
product_model: str | None
raw_text: str
color_score: float | None = None
product_model_score: float | None = None
def to_dict(self) -> dict[str, str | None]:
def to_dict(self) -> dict[str, str | float | None]:
return asdict(self)
@@ -22,23 +28,24 @@ def normalize_ocr_text(text: str) -> str:
return " ".join(text.replace("\n", " ").replace("\r", " ").split())
def parse_label_text(text: str, known_colors: list[str], known_models: list[str]) -> ParsedLabel:
def parse_label_text(
text: str,
known_colors: list[str],
known_models: list[str],
model_min_score: float = DEFAULT_MODEL_MIN_SCORE,
color_min_score: float = DEFAULT_COLOR_MIN_SCORE,
) -> ParsedLabel:
normalized = normalize_ocr_text(text)
order_match = ORDER_RE.search(normalized)
normalized_upper = normalized.upper()
color_code = next(
(color for color in known_colors if color.upper() in normalized_upper),
None,
)
product_model = next(
(model for model in known_models if re.search(rf"\b{re.escape(model)}\b", normalized, re.I)),
None,
)
color_match = best_fuzzy_match(normalized, known_colors, color_min_score)
model_match = best_fuzzy_match(normalized, known_models, model_min_score)
return ParsedLabel(
order_number=order_match.group("order") if order_match else None,
color_code=color_code,
product_model=product_model,
color_code=color_match.value if color_match else None,
product_model=model_match.value if model_match else None,
raw_text=normalized,
color_score=color_match.score if color_match else None,
product_model_score=model_match.score if model_match else None,
)

View File

@@ -6,12 +6,13 @@ from typing import Any
import cv2
import numpy as np
from PySide6.QtCore import Qt, Slot
from PySide6.QtCore import Qt, QTimer, Slot
from PySide6.QtGui import QAction, QImage, QPixmap
from PySide6.QtWidgets import (
QApplication,
QHBoxLayout,
QLabel,
QFileDialog,
QMainWindow,
QMessageBox,
QPushButton,
@@ -42,6 +43,10 @@ class MainWindow(QMainWindow):
self.fps_frame_count = 0
self.fps_last_time = time.monotonic()
self.display_fps = 0.0
self.video_capture: cv2.VideoCapture | None = None
self.video_timer = QTimer(self)
self.video_timer.timeout.connect(self._read_video_frame)
self.video_playing = False
self.media_store = MediaStore(self.config, self.app_config)
self.video_recorder = VideoRecorder(self.config, self.app_config)
@@ -119,12 +124,19 @@ class MainWindow(QMainWindow):
)
toolbar_layout = QHBoxLayout(self.toolbar)
toolbar_layout.setContentsMargins(8, 6, 8, 6)
self.load_video_button = self._tool_button(QStyle.SP_DirOpenIcon, "Wczytaj film")
self.video_play_button = self._tool_button(QStyle.SP_MediaPlay, "Play/pauza filmu")
self.photo_button = self._tool_button(QStyle.SP_DialogSaveButton, "Zrob zdjecie")
self.record_button = self._tool_button(QStyle.SP_MediaPlay, "Start/stop nagrywania")
self.settings_button = self._tool_button(QStyle.SP_FileDialogDetailedView, "Ustawienia obrazu")
toolbar_layout.addWidget(self.load_video_button)
toolbar_layout.addWidget(self.video_play_button)
toolbar_layout.addWidget(self.photo_button)
toolbar_layout.addWidget(self.record_button)
toolbar_layout.addWidget(self.settings_button)
self.video_play_button.setEnabled(False)
self.load_video_button.clicked.connect(self.load_video)
self.video_play_button.clicked.connect(self.toggle_video_playback)
self.photo_button.clicked.connect(self.take_photo)
self.record_button.clicked.connect(self.toggle_recording)
self.settings_button.clicked.connect(self.open_settings)
@@ -158,9 +170,10 @@ class MainWindow(QMainWindow):
def closeEvent(self, event: Any) -> None:
if self.video_recorder.is_recording:
self.video_recorder.stop(self.current_metadata("video"))
self.worker.stop()
self.video_timer.stop()
self._close_video_capture()
self._stop_camera_worker()
self.detection_worker.stop()
self.worker.wait(2000)
self.detection_worker.wait(2000)
super().closeEvent(event)
@@ -223,6 +236,41 @@ class MainWindow(QMainWindow):
self.record_button.setIcon(self.style().standardIcon(QStyle.SP_MediaStop))
self.statusBar().showMessage(f"Nagrywanie: {path}", 5000)
def load_video(self) -> None:
path, _ = QFileDialog.getOpenFileName(
self,
"Wczytaj film",
"",
"Filmy (*.mp4 *.avi *.mov *.mkv *.m4v);;Wszystkie pliki (*)",
)
if not path:
return
capture = cv2.VideoCapture(path)
if not capture.isOpened():
QMessageBox.warning(self, "Film", "Nie mozna otworzyc pliku wideo")
capture.release()
return
if self.video_recorder.is_recording:
self.video_recorder.stop(self.current_metadata("video"))
self.record_button.setIcon(self.style().standardIcon(QStyle.SP_MediaPlay))
self._stop_camera_worker()
self._close_video_capture()
self.video_capture = capture
self.video_play_button.setEnabled(True)
self._set_video_playing(False)
self.overlay_result = None
self.last_detection = None
self.result_text.setPlainText(f"Wczytano film: {path}")
self._read_video_frame()
def toggle_video_playback(self) -> None:
if self.video_capture is None:
return
self._set_video_playing(not self.video_playing)
def open_settings(self) -> None:
dialog = SettingsDialog(self.config, self)
dialog.settings_saved.connect(self.save_camera_settings)
@@ -232,7 +280,54 @@ class MainWindow(QMainWindow):
def save_camera_settings(self, camera_config: dict[str, Any]) -> None:
self.config["camera"] = camera_config
self.app_config.save(self.config)
self.worker.update_camera_config(camera_config)
if self.worker is not None:
self.worker.update_camera_config(camera_config)
def _read_video_frame(self) -> None:
if self.video_capture is None:
return
ok, frame = self.video_capture.read()
if not ok or frame is None:
self._set_video_playing(False)
self.video_capture.set(cv2.CAP_PROP_POS_FRAMES, 0)
self.statusBar().showMessage("Koniec filmu", 3000)
return
self.on_frame_ready(frame)
def _set_video_playing(self, playing: bool) -> None:
self.video_playing = playing
if self.video_capture is None:
self.video_timer.stop()
self.video_play_button.setIcon(self.style().standardIcon(QStyle.SP_MediaPlay))
self.video_play_button.setEnabled(False)
return
if playing:
fps = self.video_capture.get(cv2.CAP_PROP_FPS)
if fps <= 0:
fps = float(self.config["camera"].get("fps", 30))
interval_ms = max(1, int(round(1000 / fps)))
self.video_timer.start(interval_ms)
self.video_play_button.setIcon(self.style().standardIcon(QStyle.SP_MediaPause))
else:
self.video_timer.stop()
self.video_play_button.setIcon(self.style().standardIcon(QStyle.SP_MediaPlay))
def _close_video_capture(self) -> None:
self._set_video_playing(False)
if self.video_capture is not None:
self.video_capture.release()
self.video_capture = None
self.video_play_button.setEnabled(False)
def _stop_camera_worker(self) -> None:
if self.worker is None:
return
self.worker.stop()
self.worker.wait(2000)
self.worker = None
def _maybe_request_detection(self, frame: np.ndarray) -> None:
if not self.detecting:
@@ -277,10 +372,18 @@ class MainWindow(QMainWindow):
lines.append(f"Komunikat: {result.error}")
if result.confidence is not None:
lines.append(f"YOLO confidence: {result.confidence:.3f}")
if result.ocr_engine:
lines.append(f"OCR: {result.ocr_engine}")
if result.ocr_confidence is not None:
lines.append(f"OCR confidence: {result.ocr_confidence:.3f}")
if result.ocr_elapsed_ms is not None:
lines.append(f"OCR czas: {result.ocr_elapsed_ms:.0f} ms")
if result.parsed:
lines.append(f"Zamowienie: {result.parsed.order_number or '-'}")
lines.append(f"Kolor: {result.parsed.color_code or '-'}")
lines.append(f"Model: {result.parsed.product_model or '-'}")
color_score = _format_score(result.parsed.color_score)
model_score = _format_score(result.parsed.product_model_score)
lines.append(f"Kolor: {result.parsed.color_code or '-'}{color_score}")
lines.append(f"Model: {result.parsed.product_model or '-'}{model_score}")
if result.raw_text:
lines.append("")
lines.append(result.raw_text)
@@ -326,7 +429,7 @@ class MainWindow(QMainWindow):
def _draw_fps(self, frame_bgr: np.ndarray) -> None:
label = f"FPS: {self.display_fps:.1f}"
cv2.rectangle(frame_bgr, (12, 12), (122, 46), (0, 0, 0), -1)
cv2.rectangle(frame_bgr, (12, 12), (142, 46), (0, 0, 0), -1)
cv2.putText(
frame_bgr,
label,
@@ -344,3 +447,9 @@ def run_app(app_config: AppConfig) -> int:
window = MainWindow(app_config)
window.show()
return app.exec()
def _format_score(score: float | None) -> str:
if score is None:
return ""
return f" ({score:.2f})"

4
app/ocr/__init__.py Normal file
View File

@@ -0,0 +1,4 @@
from app.ocr.base import OcrEngine, OcrLine, OcrResult
from app.ocr.factory import create_ocr_engine
__all__ = ["OcrEngine", "OcrLine", "OcrResult", "create_ocr_engine"]

54
app/ocr/base.py Normal file
View File

@@ -0,0 +1,54 @@
from __future__ import annotations
from dataclasses import dataclass, field
from typing import Protocol
import cv2
import numpy as np
@dataclass
class OcrLine:
text: str
confidence: float | None = None
bbox: list[list[float]] | None = None
@dataclass
class OcrResult:
text: str = ""
confidence: float | None = None
lines: list[OcrLine] = field(default_factory=list)
error: str | None = None
elapsed_ms: float = 0.0
engine: str = "none"
class OcrEngine(Protocol):
name: str
def read_label(self, frame_bgr: np.ndarray, bbox: tuple[int, int, int, int]) -> OcrResult:
...
def crop_bbox(frame_bgr: np.ndarray, bbox: tuple[int, int, int, int], margin: int = 0) -> np.ndarray | None:
x1, y1, x2, y2 = bbox
h, w = frame_bgr.shape[:2]
x1, y1 = max(0, x1 - margin), max(0, y1 - margin)
x2, y2 = min(w, x2 + margin), min(h, y2 + margin)
if x2 <= x1 or y2 <= y1:
return None
return frame_bgr[y1:y2, x1:x2]
def prepare_ocr_image(image_bgr: np.ndarray, config: dict) -> np.ndarray:
scale = float(config.get("scale", 1.0))
if scale != 1.0:
image_bgr = cv2.resize(image_bgr, None, fx=scale, fy=scale, interpolation=cv2.INTER_CUBIC)
if not config.get("threshold", False):
return image_bgr
gray = cv2.cvtColor(image_bgr, cv2.COLOR_BGR2GRAY)
gray = cv2.GaussianBlur(gray, (3, 3), 0)
return cv2.threshold(gray, 0, 255, cv2.THRESH_BINARY + cv2.THRESH_OTSU)[1]

106
app/ocr/cli.py Normal file
View File

@@ -0,0 +1,106 @@
from __future__ import annotations
import argparse
import json
from pathlib import Path
from typing import Any
import cv2
from app.config import AppConfig
from app.label_parser import parse_label_text
from app.ocr import create_ocr_engine
def iter_images(path: Path) -> list[Path]:
if path.is_file():
return [path]
extensions = {".jpg", ".jpeg", ".png", ".bmp", ".webp", ".tif", ".tiff"}
return sorted(item for item in path.iterdir() if item.is_file() and item.suffix.lower() in extensions)
def result_to_dict(path: Path, result: Any, config: dict[str, Any]) -> dict[str, Any]:
label_cfg = config.get("label_data", {})
parsed = parse_label_text(
result.text,
label_cfg.get("colors", []),
label_cfg.get("models", []),
model_min_score=float(label_cfg.get("model_min_score", 0.72)),
color_min_score=float(label_cfg.get("color_min_score", 0.72)),
)
return {
"file": str(path),
"engine": result.engine,
"elapsed_ms": round(result.elapsed_ms, 2),
"confidence": result.confidence,
"error": result.error,
"text": result.text,
"lines": [
{
"text": line.text,
"confidence": line.confidence,
"bbox": line.bbox,
}
for line in result.lines
],
"parsed": parsed.to_dict(),
}
def main() -> int:
parser = argparse.ArgumentParser(description="Test OCR backend on cropped label images.")
parser.add_argument("path", help="Image file or directory with crop images")
parser.add_argument("--config", default="app_config.json", help="Application config JSON path")
parser.add_argument(
"--engine",
choices=["none", "tesseract", "paddle"],
help="Override ocr.engine from config",
)
parser.add_argument("--no-threshold", action="store_true", help="Disable threshold preprocessing")
parser.add_argument("--scale", type=float, help="Override OCR scale")
parser.add_argument("--json", action="store_true", help="Print JSON output")
args = parser.parse_args()
app_config = AppConfig(Path(args.config))
config = app_config.data
if args.engine:
config["ocr"]["engine"] = args.engine
config["ocr"]["enabled"] = args.engine != "none"
if args.no_threshold:
config["ocr"]["threshold"] = False
if args.scale is not None:
config["ocr"]["scale"] = args.scale
engine = create_ocr_engine(config)
outputs = []
for image_path in iter_images(Path(args.path)):
image = cv2.imread(str(image_path), cv2.IMREAD_COLOR)
if image is None:
outputs.append({"file": str(image_path), "error": "Nie mozna odczytac obrazu"})
continue
h, w = image.shape[:2]
result = engine.read_label(image, (0, 0, w, h))
outputs.append(result_to_dict(image_path, result, config))
if args.json:
print(json.dumps(outputs, indent=2, ensure_ascii=False))
return 0
for output in outputs:
print(f"file: {output['file']}")
print(f"engine: {output.get('engine')}")
print(f"elapsed_ms: {output.get('elapsed_ms')}")
print(f"confidence: {output.get('confidence')}")
if output.get("error"):
print(f"error: {output['error']}")
print("text:")
print(output.get("text") or "")
print(f"parsed: {output.get('parsed')}")
print()
return 0
if __name__ == "__main__":
raise SystemExit(main())

24
app/ocr/factory.py Normal file
View File

@@ -0,0 +1,24 @@
from __future__ import annotations
from typing import Any
from app.ocr.base import OcrEngine
from app.ocr.none import NoOcrEngine
from app.ocr.paddle import PaddleOcrEngine
from app.ocr.tesseract import TesseractOcrEngine
def create_ocr_engine(config: dict[str, Any]) -> OcrEngine:
ocr_cfg = config.get("ocr", {})
if not ocr_cfg.get("enabled", True):
return NoOcrEngine(ocr_cfg)
engine = str(ocr_cfg.get("engine", "tesseract")).lower()
if engine in {"none", "off", "disabled"}:
return NoOcrEngine(ocr_cfg)
if engine == "tesseract":
return TesseractOcrEngine(ocr_cfg)
if engine == "paddle":
return PaddleOcrEngine(ocr_cfg)
raise ValueError(f"Nieznany silnik OCR: {engine}")

15
app/ocr/none.py Normal file
View File

@@ -0,0 +1,15 @@
from __future__ import annotations
import numpy as np
from app.ocr.base import OcrResult
class NoOcrEngine:
name = "none"
def __init__(self, config: dict) -> None:
self.config = config
def read_label(self, frame_bgr: np.ndarray, bbox: tuple[int, int, int, int]) -> OcrResult:
return OcrResult(engine=self.name)

153
app/ocr/paddle.py Normal file
View File

@@ -0,0 +1,153 @@
from __future__ import annotations
import time
from typing import Any
import numpy as np
from app.ocr.base import OcrLine, OcrResult, crop_bbox, prepare_ocr_image
class PaddleOcrEngine:
name = "paddle"
def __init__(self, config: dict) -> None:
self.config = config
self.load_error: str | None = None
self.ocr: Any = None
self._load()
def _load(self) -> None:
try:
from paddleocr import PaddleOCR
except Exception as exc:
self.load_error = f"Nie mozna zaimportowac PaddleOCR: {exc}"
return
paddle_cfg = dict(self.config.get("paddle", {}))
paddle_cfg.setdefault("lang", self.config.get("language", "en"))
try:
self.ocr = PaddleOCR(**paddle_cfg)
except Exception as exc:
self.load_error = f"Nie mozna zaladowac PaddleOCR: {exc}"
def read_label(self, frame_bgr: np.ndarray, bbox: tuple[int, int, int, int]) -> OcrResult:
started = time.perf_counter()
if self.ocr is None:
return OcrResult(
error=self.load_error or "PaddleOCR nie jest zaladowany",
elapsed_ms=self._elapsed_ms(started),
engine=self.name,
)
margin = int(self.config.get("margin", 0))
roi = crop_bbox(frame_bgr, bbox, margin=margin)
if roi is None:
return OcrResult(
error="Nieprawidlowy bbox OCR",
elapsed_ms=self._elapsed_ms(started),
engine=self.name,
)
preprocess_config = {
**self.config,
"threshold": bool(self.config.get("paddle_threshold", False)),
}
image = prepare_ocr_image(roi, preprocess_config)
try:
raw_result = self._run_ocr(image)
except Exception as exc:
return OcrResult(
error=f"Blad PaddleOCR: {exc}",
elapsed_ms=self._elapsed_ms(started),
engine=self.name,
)
lines = self._parse_lines(raw_result)
text = "\n".join(line.text for line in lines)
confidences = [line.confidence for line in lines if line.confidence is not None]
confidence = sum(confidences) / len(confidences) if confidences else None
return OcrResult(
text=text,
confidence=confidence,
lines=lines,
elapsed_ms=self._elapsed_ms(started),
engine=self.name,
)
def _run_ocr(self, image: np.ndarray) -> Any:
if hasattr(self.ocr, "predict"):
return self.ocr.predict(image)
try:
return self.ocr.ocr(image, cls=bool(self.config.get("use_angle_cls", True)))
except TypeError:
return self.ocr.ocr(image)
def _parse_lines(self, raw_result: Any) -> list[OcrLine]:
if raw_result is None:
return []
lines: list[OcrLine] = []
for item in self._iter_result_items(raw_result):
parsed = self._parse_item(item)
if parsed is not None and parsed.text.strip():
lines.append(parsed)
return lines
def _iter_result_items(self, raw_result: Any) -> list[Any]:
if isinstance(raw_result, dict):
texts = raw_result.get("rec_texts") or raw_result.get("texts")
scores = raw_result.get("rec_scores") or raw_result.get("scores") or []
boxes = raw_result.get("rec_polys") or raw_result.get("dt_polys") or raw_result.get("boxes") or []
if texts:
return [
(boxes[index] if index < len(boxes) else None, (text, scores[index] if index < len(scores) else None))
for index, text in enumerate(texts)
]
return []
if isinstance(raw_result, list) and len(raw_result) == 1 and isinstance(raw_result[0], list):
return raw_result[0]
if isinstance(raw_result, list):
items = []
for result in raw_result:
if isinstance(result, dict):
items.extend(self._iter_result_items(result))
elif isinstance(result, list):
items.extend(result)
else:
items.append(result)
return items
return [raw_result]
def _parse_item(self, item: Any) -> OcrLine | None:
if not isinstance(item, (list, tuple)):
return None
if len(item) >= 2 and isinstance(item[1], (list, tuple)) and item[1]:
text = str(item[1][0])
confidence = self._to_float(item[1][1]) if len(item[1]) > 1 else None
bbox = self._to_bbox(item[0])
return OcrLine(text=text, confidence=confidence, bbox=bbox)
if len(item) >= 2 and isinstance(item[0], str):
return OcrLine(text=str(item[0]), confidence=self._to_float(item[1]))
return None
def _to_float(self, value: Any) -> float | None:
try:
return float(value)
except (TypeError, ValueError):
return None
def _to_bbox(self, value: Any) -> list[list[float]] | None:
if value is None:
return None
try:
return [[float(point[0]), float(point[1])] for point in value]
except (TypeError, ValueError, IndexError):
return None
def _elapsed_ms(self, started: float) -> float:
return (time.perf_counter() - started) * 1000.0

104
app/ocr/tesseract.py Normal file
View File

@@ -0,0 +1,104 @@
from __future__ import annotations
import time
import numpy as np
from app.ocr.base import OcrLine, OcrResult, crop_bbox, prepare_ocr_image
class TesseractOcrEngine:
name = "tesseract"
def __init__(self, config: dict) -> None:
self.config = config
self.load_error: str | None = None
self.pytesseract = None
self._load()
def _load(self) -> None:
try:
import pytesseract
command = self.config.get("tesseract_cmd")
if command:
pytesseract.pytesseract.tesseract_cmd = command
self.pytesseract = pytesseract
except Exception as exc:
self.load_error = f"Nie mozna zaladowac pytesseract: {exc}"
def read_label(self, frame_bgr: np.ndarray, bbox: tuple[int, int, int, int]) -> OcrResult:
started = time.perf_counter()
if self.pytesseract is None:
return OcrResult(
error=self.load_error or "OCR Tesseract nie jest zaladowany",
elapsed_ms=self._elapsed_ms(started),
engine=self.name,
)
margin = int(self.config.get("margin", 0))
roi = crop_bbox(frame_bgr, bbox, margin=margin)
if roi is None:
return OcrResult(
error="Nieprawidlowy bbox OCR",
elapsed_ms=self._elapsed_ms(started),
engine=self.name,
)
image = prepare_ocr_image(roi, self.config)
psm = int(self.config.get("psm", 6))
language = self.config.get("language", "eng")
extra_config = str(self.config.get("config", "")).strip()
tesseract_config = f"--psm {psm}"
if extra_config:
tesseract_config = f"{tesseract_config} {extra_config}"
try:
text = self.pytesseract.image_to_string(
image,
lang=language,
config=tesseract_config,
)
except Exception as exc:
return OcrResult(
error=f"Blad OCR Tesseract: {exc}",
elapsed_ms=self._elapsed_ms(started),
engine=self.name,
)
confidence = self._mean_confidence(image, language, tesseract_config)
return OcrResult(
text=text,
confidence=confidence,
lines=[OcrLine(text=line) for line in text.splitlines() if line.strip()],
elapsed_ms=self._elapsed_ms(started),
engine=self.name,
)
def _mean_confidence(self, image: np.ndarray, language: str, tesseract_config: str) -> float | None:
if self.pytesseract is None:
return None
try:
data = self.pytesseract.image_to_data(
image,
lang=language,
config=tesseract_config,
output_type=self.pytesseract.Output.DICT,
)
except Exception:
return None
values = []
for raw_conf in data.get("conf", []):
try:
confidence = float(raw_conf)
except (TypeError, ValueError):
continue
if confidence >= 0:
values.append(confidence / 100.0)
if not values:
return None
return sum(values) / len(values)
def _elapsed_ms(self, started: float) -> float:
return (time.perf_counter() - started) * 1000.0

View File

@@ -20,19 +20,32 @@
}
},
"detection": {
"model_path": "models/best.pt",
"model_path": "models/best_v1.pt",
"confidence_threshold": 0.25,
"mode": "best",
"frame_stride": 5,
"frame_stride": 30,
"image_size": 640,
"device": "cpu"
},
"ocr": {
"enabled": true,
"engine": "paddle",
"language": "eng",
"tesseract_cmd": null,
"psm": 6,
"margin": 0,
"threshold": true,
"scale": 2.0
"paddle_threshold": false,
"scale": 2.0,
"config": "",
"use_angle_cls": true,
"paddle": {
"enable_mkldnn": false,
"lang": "en",
"use_doc_orientation_classify": false,
"use_doc_unwarping": false,
"use_textline_orientation": false
}
},
"capture": {
"photos_dir": "captures/photos",
@@ -45,13 +58,33 @@
"show_fps": true
},
"label_data": {
"model_min_score": 0.72,
"color_min_score": 0.72,
"models": [
"Regius",
"Duvell"
"Regius 6",
"Regius 7",
"Duvell 6",
"Duvell 7",
"Duvell Elite 6",
"Duvell Elite 7"
],
"colors": [
"T-NF-BLK-OUT-BST-G",
"T-BLK-G"
"T-BLK-G",
"T-BLK-S",
"T-BLK-M",
"M-BLK-G",
"M-BLK-S",
"M-BLK-M",
"T-CST-G",
"T-CST-S",
"T-CST-M",
"T-ANTIQUE-G",
"T-ANTIQUE-S",
"T-ANTIQUE-M",
"T-NAT-G",
"T-NAT-S",
"T-NAT-M"
]
}
}

View File

@@ -0,0 +1,2 @@
paddlepaddle
paddleocr