Compare commits
4 Commits
d117be5eec
...
main
| Author | SHA1 | Date | |
|---|---|---|---|
| 711aee3334 | |||
| 30c508287a | |||
| 2b582dc732 | |||
| 061ebf9978 |
17
README.md
17
README.md
@@ -39,5 +39,22 @@ Istotne ustawienia:
|
|||||||
- `detection.mode` - `best` rysuje najlepsza etykiete, `all` rysuje wszystkie wykrycia.
|
- `detection.mode` - `best` rysuje najlepsza etykiete, `all` rysuje wszystkie wykrycia.
|
||||||
- `detection.frame_stride` - YOLO uruchamiany co N klatek podczas aktywnego wykrywania.
|
- `detection.frame_stride` - YOLO uruchamiany co N klatek podczas aktywnego wykrywania.
|
||||||
- `label_data.models`, `label_data.colors` - slowniki do walidacji tekstu z etykiety.
|
- `label_data.models`, `label_data.colors` - slowniki do walidacji tekstu z etykiety.
|
||||||
|
- `ocr.enabled`, `ocr.engine` - wlaczenie OCR i wybor silnika: `none`, `tesseract`, `paddle`.
|
||||||
|
|
||||||
Zdjecia trafiaja do `captures/photos`, filmy do `captures/videos`. Obok kazdego pliku media zapisywany jest JSON z aktualnym wynikiem detekcji/OCR.
|
Zdjecia trafiaja do `captures/photos`, filmy do `captures/videos`. Obok kazdego pliku media zapisywany jest JSON z aktualnym wynikiem detekcji/OCR.
|
||||||
|
|
||||||
|
## Testowanie OCR poza aplikacja
|
||||||
|
|
||||||
|
OCR mozna testowac na gotowych cropach bez uruchamiania kamery i YOLO:
|
||||||
|
|
||||||
|
```bash
|
||||||
|
.venv-lin/bin/python -m app.ocr.cli crop --engine none
|
||||||
|
.venv-lin/bin/python -m app.ocr.cli crop --engine tesseract
|
||||||
|
.venv-lin/bin/python -m app.ocr.cli crop --engine paddle --json
|
||||||
|
```
|
||||||
|
|
||||||
|
Backend PaddleOCR jest opcjonalny. Zaleznosci do testow PaddleOCR sa w osobnym pliku:
|
||||||
|
|
||||||
|
```bash
|
||||||
|
.venv-lin/bin/pip install -r requirements-ocr-paddle.txt
|
||||||
|
```
|
||||||
|
|||||||
@@ -41,10 +41,23 @@ DEFAULT_CONFIG: dict[str, Any] = {
|
|||||||
},
|
},
|
||||||
"ocr": {
|
"ocr": {
|
||||||
"enabled": True,
|
"enabled": True,
|
||||||
|
"engine": "tesseract",
|
||||||
"language": "eng",
|
"language": "eng",
|
||||||
"tesseract_cmd": None,
|
"tesseract_cmd": None,
|
||||||
|
"psm": 6,
|
||||||
|
"margin": 0,
|
||||||
"threshold": True,
|
"threshold": True,
|
||||||
|
"paddle_threshold": False,
|
||||||
"scale": 2.0,
|
"scale": 2.0,
|
||||||
|
"config": "",
|
||||||
|
"use_angle_cls": True,
|
||||||
|
"paddle": {
|
||||||
|
"enable_mkldnn": False,
|
||||||
|
"lang": "en",
|
||||||
|
"use_doc_orientation_classify": False,
|
||||||
|
"use_doc_unwarping": False,
|
||||||
|
"use_textline_orientation": False,
|
||||||
|
},
|
||||||
},
|
},
|
||||||
"capture": {
|
"capture": {
|
||||||
"photos_dir": "captures/photos",
|
"photos_dir": "captures/photos",
|
||||||
@@ -56,7 +69,12 @@ DEFAULT_CONFIG: dict[str, Any] = {
|
|||||||
"display": {
|
"display": {
|
||||||
"show_fps": True,
|
"show_fps": True,
|
||||||
},
|
},
|
||||||
"label_data": {"models": ["Regius", "Duvell"], "colors": ["T-NF-BLK-OUT-BST-G", "T-BLK-G"]},
|
"label_data": {
|
||||||
|
"model_min_score": 0.72,
|
||||||
|
"color_min_score": 0.72,
|
||||||
|
"models": ["Regius", "Duvell"],
|
||||||
|
"colors": ["T-NF-BLK-OUT-BST-G", "T-BLK-G"],
|
||||||
|
},
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
|
|||||||
@@ -4,10 +4,10 @@ from dataclasses import dataclass, field
|
|||||||
from pathlib import Path
|
from pathlib import Path
|
||||||
from typing import Any
|
from typing import Any
|
||||||
|
|
||||||
import cv2
|
|
||||||
import numpy as np
|
import numpy as np
|
||||||
|
|
||||||
from app.label_parser import ParsedLabel, parse_label_text
|
from app.label_parser import ParsedLabel, parse_label_text
|
||||||
|
from app.ocr import create_ocr_engine
|
||||||
|
|
||||||
|
|
||||||
@dataclass
|
@dataclass
|
||||||
@@ -18,6 +18,9 @@ class DetectionResult:
|
|||||||
raw_text: str = ""
|
raw_text: str = ""
|
||||||
parsed: ParsedLabel | None = None
|
parsed: ParsedLabel | None = None
|
||||||
error: str | None = None
|
error: str | None = None
|
||||||
|
ocr_engine: str | None = None
|
||||||
|
ocr_confidence: float | None = None
|
||||||
|
ocr_elapsed_ms: float | None = None
|
||||||
all_boxes: list[dict[str, Any]] = field(default_factory=list)
|
all_boxes: list[dict[str, Any]] = field(default_factory=list)
|
||||||
|
|
||||||
def to_metadata(self) -> dict[str, Any]:
|
def to_metadata(self) -> dict[str, Any]:
|
||||||
@@ -28,6 +31,9 @@ class DetectionResult:
|
|||||||
"raw_text": self.raw_text,
|
"raw_text": self.raw_text,
|
||||||
"parsed": self.parsed.to_dict() if self.parsed else None,
|
"parsed": self.parsed.to_dict() if self.parsed else None,
|
||||||
"error": self.error,
|
"error": self.error,
|
||||||
|
"ocr_engine": self.ocr_engine,
|
||||||
|
"ocr_confidence": self.ocr_confidence,
|
||||||
|
"ocr_elapsed_ms": self.ocr_elapsed_ms,
|
||||||
"all_boxes": self.all_boxes,
|
"all_boxes": self.all_boxes,
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -72,6 +78,9 @@ class YoloLabelDetector:
|
|||||||
boxes = []
|
boxes = []
|
||||||
names = getattr(self.model, "names", {})
|
names = getattr(self.model, "names", {})
|
||||||
for result in results:
|
for result in results:
|
||||||
|
if result.boxes is None:
|
||||||
|
continue
|
||||||
|
|
||||||
for box in result.boxes:
|
for box in result.boxes:
|
||||||
x1, y1, x2, y2 = [int(v) for v in box.xyxy[0].tolist()]
|
x1, y1, x2, y2 = [int(v) for v in box.xyxy[0].tolist()]
|
||||||
confidence = float(box.conf[0])
|
confidence = float(box.conf[0])
|
||||||
@@ -106,78 +115,30 @@ class YoloLabelDetector:
|
|||||||
return result
|
return result
|
||||||
|
|
||||||
|
|
||||||
class TesseractOcr:
|
|
||||||
def __init__(self, config: dict[str, Any]) -> None:
|
|
||||||
self.config = config
|
|
||||||
self.load_error: str | None = None
|
|
||||||
self.pytesseract = None
|
|
||||||
self._load()
|
|
||||||
|
|
||||||
def _load(self) -> None:
|
|
||||||
if not self.config["ocr"].get("enabled", True):
|
|
||||||
return
|
|
||||||
try:
|
|
||||||
import pytesseract
|
|
||||||
|
|
||||||
command = self.config["ocr"].get("tesseract_cmd")
|
|
||||||
if command:
|
|
||||||
pytesseract.pytesseract.tesseract_cmd = command
|
|
||||||
self.pytesseract = pytesseract
|
|
||||||
except Exception as exc:
|
|
||||||
self.load_error = f"Nie mozna zaladowac pytesseract: {exc}"
|
|
||||||
|
|
||||||
def read_label(self, frame_bgr: np.ndarray, bbox: tuple[int, int, int, int]) -> tuple[str, str | None]:
|
|
||||||
if not self.config["ocr"].get("enabled", True):
|
|
||||||
return "", None
|
|
||||||
if self.pytesseract is None:
|
|
||||||
return "", self.load_error or "OCR nie jest zaladowany"
|
|
||||||
|
|
||||||
x1, y1, x2, y2 = bbox
|
|
||||||
h, w = frame_bgr.shape[:2]
|
|
||||||
x1, y1 = max(0, x1), max(0, y1)
|
|
||||||
x2, y2 = min(w, x2), min(h, y2)
|
|
||||||
if x2 <= x1 or y2 <= y1:
|
|
||||||
return "", "Nieprawidlowy bbox OCR"
|
|
||||||
|
|
||||||
roi = frame_bgr[y1:y2, x1:x2]
|
|
||||||
scale = float(self.config["ocr"].get("scale", 1.0))
|
|
||||||
if scale != 1.0:
|
|
||||||
roi = cv2.resize(roi, None, fx=scale, fy=scale, interpolation=cv2.INTER_CUBIC)
|
|
||||||
|
|
||||||
gray = cv2.cvtColor(roi, cv2.COLOR_BGR2GRAY)
|
|
||||||
if self.config["ocr"].get("threshold", True):
|
|
||||||
gray = cv2.GaussianBlur(gray, (3, 3), 0)
|
|
||||||
gray = cv2.threshold(gray, 0, 255, cv2.THRESH_BINARY + cv2.THRESH_OTSU)[1]
|
|
||||||
|
|
||||||
try:
|
|
||||||
text = self.pytesseract.image_to_string(
|
|
||||||
gray,
|
|
||||||
lang=self.config["ocr"].get("language", "eng"),
|
|
||||||
config="--psm 6",
|
|
||||||
)
|
|
||||||
except Exception as exc:
|
|
||||||
return "", f"Blad OCR: {exc}"
|
|
||||||
return text, None
|
|
||||||
|
|
||||||
|
|
||||||
class DetectionPipeline:
|
class DetectionPipeline:
|
||||||
def __init__(self, config: dict[str, Any], app_config: Any) -> None:
|
def __init__(self, config: dict[str, Any], app_config: Any) -> None:
|
||||||
self.config = config
|
self.config = config
|
||||||
self.detector = YoloLabelDetector(config, app_config)
|
self.detector = YoloLabelDetector(config, app_config)
|
||||||
self.ocr = TesseractOcr(config)
|
self.ocr = create_ocr_engine(config)
|
||||||
|
|
||||||
def process(self, frame_bgr: np.ndarray) -> DetectionResult:
|
def process(self, frame_bgr: np.ndarray) -> DetectionResult:
|
||||||
result = self.detector.detect(frame_bgr)
|
result = self.detector.detect(frame_bgr)
|
||||||
if result.xyxy is None:
|
if result.xyxy is None:
|
||||||
return result
|
return result
|
||||||
|
|
||||||
text, ocr_error = self.ocr.read_label(frame_bgr, result.xyxy)
|
ocr_result = self.ocr.read_label(frame_bgr, result.xyxy)
|
||||||
result.raw_text = text
|
result.raw_text = ocr_result.text
|
||||||
|
result.ocr_engine = ocr_result.engine
|
||||||
|
result.ocr_confidence = ocr_result.confidence
|
||||||
|
result.ocr_elapsed_ms = ocr_result.elapsed_ms
|
||||||
|
label_cfg = self.config["label_data"]
|
||||||
result.parsed = parse_label_text(
|
result.parsed = parse_label_text(
|
||||||
text,
|
ocr_result.text,
|
||||||
self.config["label_data"].get("colors", []),
|
label_cfg.get("colors", []),
|
||||||
self.config["label_data"].get("models", []),
|
label_cfg.get("models", []),
|
||||||
|
model_min_score=float(label_cfg.get("model_min_score", 0.72)),
|
||||||
|
color_min_score=float(label_cfg.get("color_min_score", 0.72)),
|
||||||
)
|
)
|
||||||
if ocr_error:
|
if ocr_result.error:
|
||||||
result.error = ocr_error
|
result.error = ocr_result.error
|
||||||
return result
|
return result
|
||||||
|
|||||||
131
app/fuzzy_match.py
Normal file
131
app/fuzzy_match.py
Normal file
@@ -0,0 +1,131 @@
|
|||||||
|
from __future__ import annotations
|
||||||
|
|
||||||
|
import re
|
||||||
|
import unicodedata
|
||||||
|
from dataclasses import dataclass
|
||||||
|
from difflib import SequenceMatcher
|
||||||
|
|
||||||
|
|
||||||
|
@dataclass(frozen=True)
class FuzzyMatch:
    """Immutable outcome of comparing OCR text against one known candidate."""

    # Candidate exactly as supplied by the caller (not normalised).
    value: str
    # Score reported by ``best_candidate_score`` for this candidate.
    score: float
    # Fragment of the compacted input text that produced ``score``.
    matched_text: str
    # Length ratio between that fragment and the compacted candidate.
    coverage: float
|
||||||
|
|
||||||
|
|
||||||
|
def compact_text(text: str) -> str:
    """Reduce *text* to an upper-case run of ASCII letters and digits.

    The text is NFKD-decomposed so accented letters lose their combining
    marks, anything that still is not ASCII is discarded, and every
    remaining character outside ``A-Z0-9`` (spaces, dashes, ...) is removed.
    """
    decomposed = unicodedata.normalize("NFKD", text)
    ascii_upper = decomposed.encode("ascii", "ignore").decode("ascii").upper()
    return re.sub(r"[^A-Z0-9]+", "", ascii_upper)
|
||||||
|
|
||||||
|
|
||||||
|
def token_text(text: str) -> list[str]:
    """Split *text* into upper-case ASCII alphanumeric tokens.

    Uses the same NFKD/ASCII folding as ``compact_text`` but keeps the
    boundaries: every maximal run of ``A-Z0-9`` becomes one token.
    """
    folded = (
        unicodedata.normalize("NFKD", text)
        .encode("ascii", "ignore")
        .decode("ascii")
        .upper()
    )
    return re.findall(r"[A-Z0-9]+", folded)
|
||||||
|
|
||||||
|
|
||||||
|
def similarity(left: str, right: str) -> float:
    """Return the ``SequenceMatcher`` ratio of two strings.

    An empty string on either side yields ``0.0`` instead of being compared.
    """
    if left and right:
        return SequenceMatcher(None, left, right).ratio()
    return 0.0
|
||||||
|
|
||||||
|
|
||||||
|
def best_fuzzy_match(text: str, candidates: list[str], min_score: float = 0.72) -> FuzzyMatch | None:
    """Pick the candidate that best matches *text*, or ``None`` if none qualifies.

    Args:
        text: Raw (OCR) text to search in.
        candidates: Known values to look for; candidates that compact to an
            empty string are ignored.
        min_score: Minimum score a candidate must reach to be considered.

    Returns:
        The winning ``FuzzyMatch`` (its ``value`` is the candidate as given),
        or ``None`` when no candidate reaches ``min_score``.
    """
    best: FuzzyMatch | None = None
    for candidate in candidates:
        candidate_compact = compact_text(candidate)
        if not candidate_compact:
            continue

        score, matched_text, coverage = best_candidate_score(text, candidate_compact)
        if score < min_score:
            # Filter before ranking: ``_is_better_match`` lets a candidate with
            # a slightly lower score win on coverage, so a sub-threshold
            # candidate could otherwise displace a qualifying one and turn the
            # overall result into ``None``.
            continue

        match = FuzzyMatch(
            value=candidate,
            score=score,
            matched_text=matched_text,
            coverage=coverage,
        )
        if best is None or _is_better_match(match, best):
            best = match

    return best
|
||||||
|
|
||||||
|
|
||||||
|
def best_candidate_score(text: str, candidate_compact: str) -> tuple[float, str, float]:
    """Score how well an already-compacted candidate is present in *text*.

    Args:
        text: Raw text to search in.
        candidate_compact: Candidate already normalised with ``compact_text``.

    Returns:
        ``(score, matched_text, coverage)``. If the candidate occurs verbatim
        in the compacted text the result is ``(1.0, candidate_compact, 1.0)``.
        Otherwise each window from ``candidate_windows`` is scored as
        ``similarity * coverage * digit_match_weight`` and the best window is
        reported; with nothing to compare the result is ``(0.0, "", 0.0)``.
    """
    full_compact = compact_text(text)
    if candidate_compact in full_compact:
        return 1.0, candidate_compact, 1.0

    windows = candidate_windows(text, len(candidate_compact))
    if not windows and full_compact:
        windows = [full_compact]

    candidate_length = len(candidate_compact)
    best_score = 0.0
    best_window = ""
    best_coverage = 0.0
    for window in windows:
        window_length = len(window)
        coverage = min(window_length, candidate_length) / max(window_length, candidate_length)
        raw_score = similarity(candidate_compact, window) * coverage * digit_match_weight(candidate_compact, window)
        # The digit bonus (1.05) can lift a near-miss above 1.0; clamp so an
        # approximate match never scores higher than exact containment.
        score = min(1.0, raw_score)
        if score > best_score:
            best_score = score
            best_window = window
            best_coverage = coverage
    return best_score, best_window, best_coverage
|
||||||
|
|
||||||
|
|
||||||
|
def digit_match_weight(candidate: str, matched_text: str) -> float:
    """Return a score multiplier based on the digits of both strings.

    * ``1.0``  - at least one of the strings contains no digits,
    * ``1.05`` - both contain digits and the digit sequences are identical,
    * ``0.7``  - both contain digits but the sequences differ.
    """
    expected_digits = re.findall(r"\d", candidate)
    found_digits = re.findall(r"\d", matched_text)
    if expected_digits and found_digits:
        return 1.05 if expected_digits == found_digits else 0.7
    return 1.0
|
||||||
|
|
||||||
|
|
||||||
|
def candidate_windows(text: str, candidate_length: int) -> list[str]:
    """Collect the text fragments worth comparing with a candidate.

    The result is a sorted, duplicate-free list made of:

    * every single token of *text*,
    * joined runs of 2 to 8 consecutive tokens whose length passes
      ``_length_is_plausible``,
    * every substring of the compacted text whose length lies between 65%
      and 135% of ``candidate_length`` (at least one character).
    """
    tokens = token_text(text)
    windows: set[str] = set(tokens)

    token_count = len(tokens)
    for size in range(2, min(8, token_count) + 1):
        for start in range(token_count - size + 1):
            joined = "".join(tokens[start : start + size])
            if _length_is_plausible(joined, candidate_length):
                windows.add(joined)

    full_compact = compact_text(text)
    if full_compact:
        total = len(full_compact)
        shortest = max(1, int(candidate_length * 0.65))
        longest = max(shortest, int(candidate_length * 1.35))
        # Lengths beyond the text itself cannot yield any substring.
        for length in range(shortest, min(longest, total) + 1):
            windows.update(
                full_compact[start : start + length]
                for start in range(total - length + 1)
            )

    return sorted(windows)
|
||||||
|
|
||||||
|
|
||||||
|
def _length_is_plausible(value: str, candidate_length: int) -> bool:
|
||||||
|
if not value:
|
||||||
|
return False
|
||||||
|
return int(candidate_length * 0.65) <= len(value) <= int(candidate_length * 1.6)
|
||||||
|
|
||||||
|
|
||||||
|
def _is_better_match(match: FuzzyMatch, best: FuzzyMatch) -> bool:
|
||||||
|
if match.score > best.score + 0.03:
|
||||||
|
return True
|
||||||
|
if match.score < best.score - 0.03:
|
||||||
|
return False
|
||||||
|
if match.coverage > best.coverage + 0.05:
|
||||||
|
return True
|
||||||
|
if match.coverage < best.coverage - 0.05:
|
||||||
|
return False
|
||||||
|
return len(compact_text(match.value)) > len(compact_text(best.value))
|
||||||
@@ -3,8 +3,12 @@ from __future__ import annotations
|
|||||||
import re
|
import re
|
||||||
from dataclasses import dataclass, asdict
|
from dataclasses import dataclass, asdict
|
||||||
|
|
||||||
|
from app.fuzzy_match import best_fuzzy_match
|
||||||
|
|
||||||
|
|
||||||
ORDER_RE = re.compile(r"\b(?P<order>\d{4}/\d{4}/(?:[1-9]|[1-9]\d))\b")
|
ORDER_RE = re.compile(r"\b(?P<order>\d{4}/\d{4}/(?:[1-9]|[1-9]\d))\b")
|
||||||
|
DEFAULT_MODEL_MIN_SCORE = 0.72
|
||||||
|
DEFAULT_COLOR_MIN_SCORE = 0.72
|
||||||
|
|
||||||
|
|
||||||
@dataclass
|
@dataclass
|
||||||
@@ -13,8 +17,10 @@ class ParsedLabel:
|
|||||||
color_code: str | None
|
color_code: str | None
|
||||||
product_model: str | None
|
product_model: str | None
|
||||||
raw_text: str
|
raw_text: str
|
||||||
|
color_score: float | None = None
|
||||||
|
product_model_score: float | None = None
|
||||||
|
|
||||||
def to_dict(self) -> dict[str, str | None]:
|
def to_dict(self) -> dict[str, str | float | None]:
|
||||||
return asdict(self)
|
return asdict(self)
|
||||||
|
|
||||||
|
|
||||||
@@ -22,23 +28,24 @@ def normalize_ocr_text(text: str) -> str:
|
|||||||
return " ".join(text.replace("\n", " ").replace("\r", " ").split())
|
return " ".join(text.replace("\n", " ").replace("\r", " ").split())
|
||||||
|
|
||||||
|
|
||||||
def parse_label_text(text: str, known_colors: list[str], known_models: list[str]) -> ParsedLabel:
|
def parse_label_text(
|
||||||
|
text: str,
|
||||||
|
known_colors: list[str],
|
||||||
|
known_models: list[str],
|
||||||
|
model_min_score: float = DEFAULT_MODEL_MIN_SCORE,
|
||||||
|
color_min_score: float = DEFAULT_COLOR_MIN_SCORE,
|
||||||
|
) -> ParsedLabel:
|
||||||
normalized = normalize_ocr_text(text)
|
normalized = normalize_ocr_text(text)
|
||||||
order_match = ORDER_RE.search(normalized)
|
order_match = ORDER_RE.search(normalized)
|
||||||
|
|
||||||
normalized_upper = normalized.upper()
|
color_match = best_fuzzy_match(normalized, known_colors, color_min_score)
|
||||||
color_code = next(
|
model_match = best_fuzzy_match(normalized, known_models, model_min_score)
|
||||||
(color for color in known_colors if color.upper() in normalized_upper),
|
|
||||||
None,
|
|
||||||
)
|
|
||||||
product_model = next(
|
|
||||||
(model for model in known_models if re.search(rf"\b{re.escape(model)}\b", normalized, re.I)),
|
|
||||||
None,
|
|
||||||
)
|
|
||||||
|
|
||||||
return ParsedLabel(
|
return ParsedLabel(
|
||||||
order_number=order_match.group("order") if order_match else None,
|
order_number=order_match.group("order") if order_match else None,
|
||||||
color_code=color_code,
|
color_code=color_match.value if color_match else None,
|
||||||
product_model=product_model,
|
product_model=model_match.value if model_match else None,
|
||||||
raw_text=normalized,
|
raw_text=normalized,
|
||||||
|
color_score=color_match.score if color_match else None,
|
||||||
|
product_model_score=model_match.score if model_match else None,
|
||||||
)
|
)
|
||||||
|
|||||||
@@ -6,12 +6,13 @@ from typing import Any
|
|||||||
|
|
||||||
import cv2
|
import cv2
|
||||||
import numpy as np
|
import numpy as np
|
||||||
from PySide6.QtCore import Qt, Slot
|
from PySide6.QtCore import Qt, QTimer, Slot
|
||||||
from PySide6.QtGui import QAction, QImage, QPixmap
|
from PySide6.QtGui import QAction, QImage, QPixmap
|
||||||
from PySide6.QtWidgets import (
|
from PySide6.QtWidgets import (
|
||||||
QApplication,
|
QApplication,
|
||||||
QHBoxLayout,
|
QHBoxLayout,
|
||||||
QLabel,
|
QLabel,
|
||||||
|
QFileDialog,
|
||||||
QMainWindow,
|
QMainWindow,
|
||||||
QMessageBox,
|
QMessageBox,
|
||||||
QPushButton,
|
QPushButton,
|
||||||
@@ -42,6 +43,10 @@ class MainWindow(QMainWindow):
|
|||||||
self.fps_frame_count = 0
|
self.fps_frame_count = 0
|
||||||
self.fps_last_time = time.monotonic()
|
self.fps_last_time = time.monotonic()
|
||||||
self.display_fps = 0.0
|
self.display_fps = 0.0
|
||||||
|
self.video_capture: cv2.VideoCapture | None = None
|
||||||
|
self.video_timer = QTimer(self)
|
||||||
|
self.video_timer.timeout.connect(self._read_video_frame)
|
||||||
|
self.video_playing = False
|
||||||
self.media_store = MediaStore(self.config, self.app_config)
|
self.media_store = MediaStore(self.config, self.app_config)
|
||||||
self.video_recorder = VideoRecorder(self.config, self.app_config)
|
self.video_recorder = VideoRecorder(self.config, self.app_config)
|
||||||
|
|
||||||
@@ -119,12 +124,19 @@ class MainWindow(QMainWindow):
|
|||||||
)
|
)
|
||||||
toolbar_layout = QHBoxLayout(self.toolbar)
|
toolbar_layout = QHBoxLayout(self.toolbar)
|
||||||
toolbar_layout.setContentsMargins(8, 6, 8, 6)
|
toolbar_layout.setContentsMargins(8, 6, 8, 6)
|
||||||
|
self.load_video_button = self._tool_button(QStyle.SP_DirOpenIcon, "Wczytaj film")
|
||||||
|
self.video_play_button = self._tool_button(QStyle.SP_MediaPlay, "Play/pauza filmu")
|
||||||
self.photo_button = self._tool_button(QStyle.SP_DialogSaveButton, "Zrob zdjecie")
|
self.photo_button = self._tool_button(QStyle.SP_DialogSaveButton, "Zrob zdjecie")
|
||||||
self.record_button = self._tool_button(QStyle.SP_MediaPlay, "Start/stop nagrywania")
|
self.record_button = self._tool_button(QStyle.SP_MediaPlay, "Start/stop nagrywania")
|
||||||
self.settings_button = self._tool_button(QStyle.SP_FileDialogDetailedView, "Ustawienia obrazu")
|
self.settings_button = self._tool_button(QStyle.SP_FileDialogDetailedView, "Ustawienia obrazu")
|
||||||
|
toolbar_layout.addWidget(self.load_video_button)
|
||||||
|
toolbar_layout.addWidget(self.video_play_button)
|
||||||
toolbar_layout.addWidget(self.photo_button)
|
toolbar_layout.addWidget(self.photo_button)
|
||||||
toolbar_layout.addWidget(self.record_button)
|
toolbar_layout.addWidget(self.record_button)
|
||||||
toolbar_layout.addWidget(self.settings_button)
|
toolbar_layout.addWidget(self.settings_button)
|
||||||
|
self.video_play_button.setEnabled(False)
|
||||||
|
self.load_video_button.clicked.connect(self.load_video)
|
||||||
|
self.video_play_button.clicked.connect(self.toggle_video_playback)
|
||||||
self.photo_button.clicked.connect(self.take_photo)
|
self.photo_button.clicked.connect(self.take_photo)
|
||||||
self.record_button.clicked.connect(self.toggle_recording)
|
self.record_button.clicked.connect(self.toggle_recording)
|
||||||
self.settings_button.clicked.connect(self.open_settings)
|
self.settings_button.clicked.connect(self.open_settings)
|
||||||
@@ -158,9 +170,10 @@ class MainWindow(QMainWindow):
|
|||||||
def closeEvent(self, event: Any) -> None:
|
def closeEvent(self, event: Any) -> None:
|
||||||
if self.video_recorder.is_recording:
|
if self.video_recorder.is_recording:
|
||||||
self.video_recorder.stop(self.current_metadata("video"))
|
self.video_recorder.stop(self.current_metadata("video"))
|
||||||
self.worker.stop()
|
self.video_timer.stop()
|
||||||
|
self._close_video_capture()
|
||||||
|
self._stop_camera_worker()
|
||||||
self.detection_worker.stop()
|
self.detection_worker.stop()
|
||||||
self.worker.wait(2000)
|
|
||||||
self.detection_worker.wait(2000)
|
self.detection_worker.wait(2000)
|
||||||
super().closeEvent(event)
|
super().closeEvent(event)
|
||||||
|
|
||||||
@@ -223,6 +236,41 @@ class MainWindow(QMainWindow):
|
|||||||
self.record_button.setIcon(self.style().standardIcon(QStyle.SP_MediaStop))
|
self.record_button.setIcon(self.style().standardIcon(QStyle.SP_MediaStop))
|
||||||
self.statusBar().showMessage(f"Nagrywanie: {path}", 5000)
|
self.statusBar().showMessage(f"Nagrywanie: {path}", 5000)
|
||||||
|
|
||||||
|
def load_video(self) -> None:
    """Ask the user for a video file and switch the preview to it.

    Cancelling the dialog, or picking a file OpenCV cannot open, leaves the
    current source untouched. Otherwise any active recording is stopped, the
    camera worker and a previously loaded video are shut down, and the first
    frame of the new file is shown with playback paused.
    """
    selected_path, _selected_filter = QFileDialog.getOpenFileName(
        self,
        "Wczytaj film",
        "",
        "Filmy (*.mp4 *.avi *.mov *.mkv *.m4v);;Wszystkie pliki (*)",
    )
    if not selected_path:
        return

    # Open the new file first so a failure does not disturb the current source.
    new_capture = cv2.VideoCapture(selected_path)
    if not new_capture.isOpened():
        QMessageBox.warning(self, "Film", "Nie mozna otworzyc pliku wideo")
        new_capture.release()
        return

    if self.video_recorder.is_recording:
        self.video_recorder.stop(self.current_metadata("video"))
        self.record_button.setIcon(self.style().standardIcon(QStyle.SP_MediaPlay))

    self._stop_camera_worker()
    self._close_video_capture()

    self.video_capture = new_capture
    self.video_play_button.setEnabled(True)
    self._set_video_playing(False)

    # Drop results that belong to the previous source.
    self.overlay_result = None
    self.last_detection = None
    self.result_text.setPlainText(f"Wczytano film: {selected_path}")

    # Show the first frame immediately while staying paused.
    self._read_video_frame()
|
||||||
|
|
||||||
|
def toggle_video_playback(self) -> None:
    """Flip between playing and paused; does nothing when no video is loaded."""
    if self.video_capture is not None:
        self._set_video_playing(not self.video_playing)
|
||||||
|
|
||||||
def open_settings(self) -> None:
|
def open_settings(self) -> None:
|
||||||
dialog = SettingsDialog(self.config, self)
|
dialog = SettingsDialog(self.config, self)
|
||||||
dialog.settings_saved.connect(self.save_camera_settings)
|
dialog.settings_saved.connect(self.save_camera_settings)
|
||||||
@@ -232,7 +280,54 @@ class MainWindow(QMainWindow):
|
|||||||
def save_camera_settings(self, camera_config: dict[str, Any]) -> None:
|
def save_camera_settings(self, camera_config: dict[str, Any]) -> None:
|
||||||
self.config["camera"] = camera_config
|
self.config["camera"] = camera_config
|
||||||
self.app_config.save(self.config)
|
self.app_config.save(self.config)
|
||||||
self.worker.update_camera_config(camera_config)
|
if self.worker is not None:
|
||||||
|
self.worker.update_camera_config(camera_config)
|
||||||
|
|
||||||
|
def _read_video_frame(self) -> None:
    """Read the next frame of the loaded video and pass it to ``on_frame_ready``.

    When no frame can be read, playback is paused, the capture is rewound to
    frame 0 and a status-bar message is shown.
    """
    capture = self.video_capture
    if capture is None:
        return

    ok, frame = capture.read()
    if ok and frame is not None:
        self.on_frame_ready(frame)
        return

    self._set_video_playing(False)
    self.video_capture.set(cv2.CAP_PROP_POS_FRAMES, 0)
    self.statusBar().showMessage("Koniec filmu", 3000)
|
||||||
|
|
||||||
|
def _set_video_playing(self, playing: bool) -> None:
    """Start or pause timer-driven playback of the loaded video.

    Args:
        playing: ``True`` to start the frame timer, ``False`` to stop it.

    Without a loaded video the state is forced to "not playing": the timer is
    stopped and the play button is reset and disabled. When starting, the
    timer interval comes from the file's FPS; if that is not a positive
    number, ``camera.fps`` from the config is used, and if that is unusable
    too, 30 fps.
    """
    play_icon = self.style().standardIcon(QStyle.SP_MediaPlay)

    if self.video_capture is None:
        # Nothing can play, so never leave the flag claiming otherwise.
        self.video_playing = False
        self.video_timer.stop()
        self.video_play_button.setIcon(play_icon)
        self.video_play_button.setEnabled(False)
        return

    self.video_playing = playing
    if not playing:
        self.video_timer.stop()
        self.video_play_button.setIcon(play_icon)
        return

    fps = self.video_capture.get(cv2.CAP_PROP_FPS)
    # ``not fps > 0`` is also true for NaN, which ``fps <= 0`` would let through.
    if not fps > 0:
        fps = float(self.config["camera"].get("fps", 30))
    if not fps > 0:
        # A zero/negative/NaN configured value would break the division below.
        fps = 30.0
    interval_ms = max(1, int(round(1000 / fps)))
    self.video_timer.start(interval_ms)
    self.video_play_button.setIcon(self.style().standardIcon(QStyle.SP_MediaPause))
|
||||||
|
|
||||||
|
def _close_video_capture(self) -> None:
|
||||||
|
self._set_video_playing(False)
|
||||||
|
if self.video_capture is not None:
|
||||||
|
self.video_capture.release()
|
||||||
|
self.video_capture = None
|
||||||
|
self.video_play_button.setEnabled(False)
|
||||||
|
|
||||||
|
def _stop_camera_worker(self) -> None:
|
||||||
|
if self.worker is None:
|
||||||
|
return
|
||||||
|
self.worker.stop()
|
||||||
|
self.worker.wait(2000)
|
||||||
|
self.worker = None
|
||||||
|
|
||||||
def _maybe_request_detection(self, frame: np.ndarray) -> None:
|
def _maybe_request_detection(self, frame: np.ndarray) -> None:
|
||||||
if not self.detecting:
|
if not self.detecting:
|
||||||
@@ -277,10 +372,18 @@ class MainWindow(QMainWindow):
|
|||||||
lines.append(f"Komunikat: {result.error}")
|
lines.append(f"Komunikat: {result.error}")
|
||||||
if result.confidence is not None:
|
if result.confidence is not None:
|
||||||
lines.append(f"YOLO confidence: {result.confidence:.3f}")
|
lines.append(f"YOLO confidence: {result.confidence:.3f}")
|
||||||
|
if result.ocr_engine:
|
||||||
|
lines.append(f"OCR: {result.ocr_engine}")
|
||||||
|
if result.ocr_confidence is not None:
|
||||||
|
lines.append(f"OCR confidence: {result.ocr_confidence:.3f}")
|
||||||
|
if result.ocr_elapsed_ms is not None:
|
||||||
|
lines.append(f"OCR czas: {result.ocr_elapsed_ms:.0f} ms")
|
||||||
if result.parsed:
|
if result.parsed:
|
||||||
lines.append(f"Zamowienie: {result.parsed.order_number or '-'}")
|
lines.append(f"Zamowienie: {result.parsed.order_number or '-'}")
|
||||||
lines.append(f"Kolor: {result.parsed.color_code or '-'}")
|
color_score = _format_score(result.parsed.color_score)
|
||||||
lines.append(f"Model: {result.parsed.product_model or '-'}")
|
model_score = _format_score(result.parsed.product_model_score)
|
||||||
|
lines.append(f"Kolor: {result.parsed.color_code or '-'}{color_score}")
|
||||||
|
lines.append(f"Model: {result.parsed.product_model or '-'}{model_score}")
|
||||||
if result.raw_text:
|
if result.raw_text:
|
||||||
lines.append("")
|
lines.append("")
|
||||||
lines.append(result.raw_text)
|
lines.append(result.raw_text)
|
||||||
@@ -326,7 +429,7 @@ class MainWindow(QMainWindow):
|
|||||||
|
|
||||||
def _draw_fps(self, frame_bgr: np.ndarray) -> None:
|
def _draw_fps(self, frame_bgr: np.ndarray) -> None:
|
||||||
label = f"FPS: {self.display_fps:.1f}"
|
label = f"FPS: {self.display_fps:.1f}"
|
||||||
cv2.rectangle(frame_bgr, (12, 12), (122, 46), (0, 0, 0), -1)
|
cv2.rectangle(frame_bgr, (12, 12), (142, 46), (0, 0, 0), -1)
|
||||||
cv2.putText(
|
cv2.putText(
|
||||||
frame_bgr,
|
frame_bgr,
|
||||||
label,
|
label,
|
||||||
@@ -344,3 +447,9 @@ def run_app(app_config: AppConfig) -> int:
|
|||||||
window = MainWindow(app_config)
|
window = MainWindow(app_config)
|
||||||
window.show()
|
window.show()
|
||||||
return app.exec()
|
return app.exec()
|
||||||
|
|
||||||
|
|
||||||
|
def _format_score(score: float | None) -> str:
|
||||||
|
if score is None:
|
||||||
|
return ""
|
||||||
|
return f" ({score:.2f})"
|
||||||
|
|||||||
4
app/ocr/__init__.py
Normal file
4
app/ocr/__init__.py
Normal file
@@ -0,0 +1,4 @@
|
|||||||
|
from app.ocr.base import OcrEngine, OcrLine, OcrResult
|
||||||
|
from app.ocr.factory import create_ocr_engine
|
||||||
|
|
||||||
|
__all__ = ["OcrEngine", "OcrLine", "OcrResult", "create_ocr_engine"]
|
||||||
54
app/ocr/base.py
Normal file
54
app/ocr/base.py
Normal file
@@ -0,0 +1,54 @@
|
|||||||
|
from __future__ import annotations
|
||||||
|
|
||||||
|
from dataclasses import dataclass, field
|
||||||
|
from typing import Protocol
|
||||||
|
|
||||||
|
import cv2
|
||||||
|
import numpy as np
|
||||||
|
|
||||||
|
|
||||||
|
@dataclass
class OcrLine:
    """One piece of text reported by an OCR engine."""

    # Recognised text.
    text: str
    # Engine confidence for this piece, when it reports one.
    confidence: float | None = None
    # Points describing where the text was found, when available
    # (presumably [x, y] pairs - confirm against the engines).
    bbox: list[list[float]] | None = None
|
||||||
|
|
||||||
|
|
||||||
|
@dataclass
class OcrResult:
    """Result of one OCR attempt; every field has a default."""

    # Recognised text.
    text: str = ""
    # Overall confidence, when the engine provides one.
    confidence: float | None = None
    # Individual recognised lines; each instance gets its own list.
    lines: list[OcrLine] = field(default_factory=list)
    # Error message, or None when nothing went wrong.
    error: str | None = None
    # Time spent on OCR, in milliseconds.
    elapsed_ms: float = 0.0
    # Name of the engine that produced the result.
    engine: str = "none"
|
||||||
|
|
||||||
|
|
||||||
|
class OcrEngine(Protocol):
    """Structural interface expected from every OCR backend."""

    # Engine identifier.
    name: str

    def read_label(self, frame_bgr: np.ndarray, bbox: tuple[int, int, int, int]) -> OcrResult:
        """Run OCR on the *bbox* region of *frame_bgr* and return an ``OcrResult``."""
        ...
|
||||||
|
|
||||||
|
|
||||||
|
def crop_bbox(frame_bgr: np.ndarray, bbox: tuple[int, int, int, int], margin: int = 0) -> np.ndarray | None:
    """Cut the ``(x1, y1, x2, y2)`` region out of *frame_bgr*.

    The box is grown by *margin* pixels on every side and clamped to the
    frame. Returns the slice of the frame, or ``None`` when the clamped box
    has no area.
    """
    left, top, right, bottom = bbox
    frame_height, frame_width = frame_bgr.shape[:2]

    left = max(0, left - margin)
    top = max(0, top - margin)
    right = min(frame_width, right + margin)
    bottom = min(frame_height, bottom + margin)

    if right > left and bottom > top:
        return frame_bgr[top:bottom, left:right]
    return None
|
||||||
|
|
||||||
|
|
||||||
|
def prepare_ocr_image(image_bgr: np.ndarray, config: dict) -> np.ndarray:
    """Optionally upscale and binarise an image before OCR.

    Reads two keys from *config*:

    * ``scale`` (default 1.0) - any other value resizes with cubic
      interpolation,
    * ``threshold`` (default False) - when truthy the image is converted to
      grayscale, blurred with a 3x3 Gaussian and Otsu-thresholded.

    Returns the (possibly resized) input image, or the single-channel
    thresholded image when thresholding is enabled.
    """
    factor = float(config.get("scale", 1.0))
    if factor == 1.0:
        resized = image_bgr
    else:
        resized = cv2.resize(image_bgr, None, fx=factor, fy=factor, interpolation=cv2.INTER_CUBIC)

    if config.get("threshold", False):
        blurred = cv2.GaussianBlur(cv2.cvtColor(resized, cv2.COLOR_BGR2GRAY), (3, 3), 0)
        _, binary = cv2.threshold(blurred, 0, 255, cv2.THRESH_BINARY + cv2.THRESH_OTSU)
        return binary
    return resized
|
||||||
106
app/ocr/cli.py
Normal file
106
app/ocr/cli.py
Normal file
@@ -0,0 +1,106 @@
|
|||||||
|
from __future__ import annotations
|
||||||
|
|
||||||
|
import argparse
|
||||||
|
import json
|
||||||
|
from pathlib import Path
|
||||||
|
from typing import Any
|
||||||
|
|
||||||
|
import cv2
|
||||||
|
|
||||||
|
from app.config import AppConfig
|
||||||
|
from app.label_parser import parse_label_text
|
||||||
|
from app.ocr import create_ocr_engine
|
||||||
|
|
||||||
|
|
||||||
|
def iter_images(path: Path) -> list[Path]:
    """Return the image files denoted by *path*.

    A file is returned as a one-element list whatever its extension. For
    anything else, the directory's direct children are listed and only
    regular files with a known image extension (case-insensitive) are kept,
    in sorted order.
    """
    if path.is_file():
        return [path]

    extensions = {".jpg", ".jpeg", ".png", ".bmp", ".webp", ".tif", ".tiff"}
    images = [
        entry
        for entry in path.iterdir()
        if entry.is_file() and entry.suffix.lower() in extensions
    ]
    images.sort()
    return images
|
||||||
|
|
||||||
|
|
||||||
|
def result_to_dict(path: Path, result: Any, config: dict[str, Any]) -> dict[str, Any]:
    """Build the per-image report dictionary printed by the CLI.

    Args:
        path: Image the OCR result belongs to.
        result: OCR result object; its ``text``, ``engine``, ``elapsed_ms``,
            ``confidence``, ``error`` and ``lines`` attributes are read.
        config: Application config; the ``label_data`` section supplies the
            color/model dictionaries and minimum scores for label parsing.

    Returns:
        A dictionary with the raw OCR fields, one entry per recognised line
        and the parsed label under ``parsed``.
    """
    labels = config.get("label_data", {})
    min_scores = {
        "model_min_score": float(labels.get("model_min_score", 0.72)),
        "color_min_score": float(labels.get("color_min_score", 0.72)),
    }
    parsed = parse_label_text(
        result.text,
        labels.get("colors", []),
        labels.get("models", []),
        **min_scores,
    )

    line_entries = []
    for entry in result.lines:
        line_entries.append(
            {
                "text": entry.text,
                "confidence": entry.confidence,
                "bbox": entry.bbox,
            }
        )

    # Key order is kept stable because it is the order of the printed JSON.
    payload: dict[str, Any] = {
        "file": str(path),
        "engine": result.engine,
        "elapsed_ms": round(result.elapsed_ms, 2),
        "confidence": result.confidence,
        "error": result.error,
        "text": result.text,
    }
    payload["lines"] = line_entries
    payload["parsed"] = parsed.to_dict()
    return payload
|
||||||
|
|
||||||
|
|
||||||
|
def main() -> int:
    """Run the configured OCR engine over crop images and print the results.

    Command-line options can override the ``ocr`` section of the loaded
    application config (engine, threshold, scale). Each image is passed to
    the engine with a bounding box covering the whole picture. Results are
    printed either as one JSON document (``--json``) or as plain text.

    Returns:
        Always ``0``.
    """
    cli = argparse.ArgumentParser(description="Test OCR backend on cropped label images.")
    cli.add_argument("path", help="Image file or directory with crop images")
    cli.add_argument("--config", default="app_config.json", help="Application config JSON path")
    cli.add_argument(
        "--engine",
        choices=["none", "tesseract", "paddle"],
        help="Override ocr.engine from config",
    )
    cli.add_argument("--no-threshold", action="store_true", help="Disable threshold preprocessing")
    cli.add_argument("--scale", type=float, help="Override OCR scale")
    cli.add_argument("--json", action="store_true", help="Print JSON output")
    options = cli.parse_args()

    config = AppConfig(Path(options.config)).data

    # Collect command-line overrides first, then apply them in one step.
    overrides: dict[str, Any] = {}
    if options.engine:
        overrides["engine"] = options.engine
        overrides["enabled"] = options.engine != "none"
    if options.no_threshold:
        overrides["threshold"] = False
    if options.scale is not None:
        overrides["scale"] = options.scale
    if overrides:
        config["ocr"].update(overrides)

    engine = create_ocr_engine(config)
    outputs = []
    for image_path in iter_images(Path(options.path)):
        image = cv2.imread(str(image_path), cv2.IMREAD_COLOR)
        if image is not None:
            height, width = image.shape[:2]
            # The crop is already the label, so the box spans the whole image.
            ocr_result = engine.read_label(image, (0, 0, width, height))
            outputs.append(result_to_dict(image_path, ocr_result, config))
        else:
            outputs.append({"file": str(image_path), "error": "Nie mozna odczytac obrazu"})

    if options.json:
        print(json.dumps(outputs, indent=2, ensure_ascii=False))
    else:
        for record in outputs:
            print(f"file: {record['file']}")
            for field in ("engine", "elapsed_ms", "confidence"):
                print(f"{field}: {record.get(field)}")
            problem = record.get("error")
            if problem:
                print(f"error: {problem}")
            print("text:")
            print(record.get("text") or "")
            print(f"parsed: {record.get('parsed')}")
            print()
    return 0


if __name__ == "__main__":
    raise SystemExit(main())
|
||||||
24
app/ocr/factory.py
Normal file
24
app/ocr/factory.py
Normal file
@@ -0,0 +1,24 @@
|
|||||||
|
from __future__ import annotations
|
||||||
|
|
||||||
|
from typing import Any
|
||||||
|
|
||||||
|
from app.ocr.base import OcrEngine
|
||||||
|
from app.ocr.none import NoOcrEngine
|
||||||
|
from app.ocr.paddle import PaddleOcrEngine
|
||||||
|
from app.ocr.tesseract import TesseractOcrEngine
|
||||||
|
|
||||||
|
|
||||||
|
def create_ocr_engine(config: dict[str, Any]) -> OcrEngine:
    """Instantiate the OCR engine selected by the ``ocr`` section of ``config``.

    OCR counts as switched off when ``ocr.enabled`` is falsy or when
    ``ocr.engine`` (case-insensitive, default ``"tesseract"``) is one of
    ``none``, ``off`` or ``disabled``; a ``NoOcrEngine`` is returned then.

    Raises:
        ValueError: If OCR is enabled and the engine name is not recognised.
    """
    settings = config.get("ocr", {})

    if settings.get("enabled", True):
        choice = str(settings.get("engine", "tesseract")).lower()
        if choice == "paddle":
            return PaddleOcrEngine(settings)
        if choice == "tesseract":
            return TesseractOcrEngine(settings)
        if choice not in {"none", "off", "disabled"}:
            raise ValueError(f"Nieznany silnik OCR: {choice}")

    # Reached when OCR is disabled by flag or by engine name.
    return NoOcrEngine(settings)
|
||||||
15
app/ocr/none.py
Normal file
15
app/ocr/none.py
Normal file
@@ -0,0 +1,15 @@
|
|||||||
|
from __future__ import annotations
|
||||||
|
|
||||||
|
import numpy as np
|
||||||
|
|
||||||
|
from app.ocr.base import OcrResult
|
||||||
|
|
||||||
|
|
||||||
|
class NoOcrEngine:
    """Placeholder engine used when OCR is switched off."""

    name = "none"

    def __init__(self, config: dict) -> None:
        """Keep a reference to the OCR settings; this class does not read them."""
        self.config = config

    def read_label(self, frame_bgr: np.ndarray, bbox: tuple[int, int, int, int]) -> OcrResult:
        """Ignore both arguments and return an ``OcrResult`` tagged with this engine's name."""
        placeholder = OcrResult(engine=self.name)
        return placeholder
|
||||||
153
app/ocr/paddle.py
Normal file
153
app/ocr/paddle.py
Normal file
@@ -0,0 +1,153 @@
|
|||||||
|
from __future__ import annotations
|
||||||
|
|
||||||
|
import time
|
||||||
|
from typing import Any
|
||||||
|
|
||||||
|
import numpy as np
|
||||||
|
|
||||||
|
from app.ocr.base import OcrLine, OcrResult, crop_bbox, prepare_ocr_image
|
||||||
|
|
||||||
|
|
||||||
|
class PaddleOcrEngine:
    """OCR engine backed by the optional ``paddleocr`` package.

    Import and construction failures are stored in ``load_error`` rather than
    raised; ``read_label`` then reports them through ``OcrResult.error``.
    """

    name = "paddle"

    def __init__(self, config: dict) -> None:
        """Store the OCR settings and try to build the PaddleOCR object."""
        self.config = config
        self.load_error: str | None = None
        self.ocr: Any = None
        self._load()

    def _load(self) -> None:
        """Import and construct ``PaddleOCR``, recording any failure in ``load_error``."""
        try:
            from paddleocr import PaddleOCR
        except Exception as exc:
            self.load_error = f"Nie mozna zaimportowac PaddleOCR: {exc}"
            return

        # Work on a copy so setdefault does not modify the caller's config.
        paddle_cfg = dict(self.config.get("paddle", {}))
        paddle_cfg.setdefault("lang", self.config.get("language", "en"))
        try:
            self.ocr = PaddleOCR(**paddle_cfg)
        except Exception as exc:
            self.load_error = f"Nie mozna zaladowac PaddleOCR: {exc}"

    def read_label(self, frame_bgr: np.ndarray, bbox: tuple[int, int, int, int]) -> OcrResult:
        """Crop ``bbox`` from the frame, run PaddleOCR on it and collect the lines.

        Args:
            frame_bgr: Source image.
            bbox: Region to read as ``(x1, y1, x2, y2)``; it is grown by the
                configured ``margin`` before cropping.

        Returns:
            An ``OcrResult``. Its ``error`` is set when the engine is not
            loaded, the crop is empty or the OCR call raises; otherwise it
            carries the joined text, the mean line confidence (``None`` when
            no line has one) and the individual lines.
        """
        started = time.perf_counter()
        if self.ocr is None:
            return OcrResult(
                error=self.load_error or "PaddleOCR nie jest zaladowany",
                elapsed_ms=self._elapsed_ms(started),
                engine=self.name,
            )

        margin = int(self.config.get("margin", 0))
        roi = crop_bbox(frame_bgr, bbox, margin=margin)
        if roi is None:
            return OcrResult(
                error="Nieprawidlowy bbox OCR",
                elapsed_ms=self._elapsed_ms(started),
                engine=self.name,
            )

        # Thresholding for this engine is driven by its own flag
        # ("paddle_threshold"), not by the shared "threshold" setting.
        preprocess_config = {
            **self.config,
            "threshold": bool(self.config.get("paddle_threshold", False)),
        }
        image = prepare_ocr_image(roi, preprocess_config)
        try:
            raw_result = self._run_ocr(image)
        except Exception as exc:
            return OcrResult(
                error=f"Blad PaddleOCR: {exc}",
                elapsed_ms=self._elapsed_ms(started),
                engine=self.name,
            )

        lines = self._parse_lines(raw_result)
        text = "\n".join(line.text for line in lines)
        confidences = [line.confidence for line in lines if line.confidence is not None]
        confidence = sum(confidences) / len(confidences) if confidences else None
        return OcrResult(
            text=text,
            confidence=confidence,
            lines=lines,
            elapsed_ms=self._elapsed_ms(started),
            engine=self.name,
        )

    def _run_ocr(self, image: np.ndarray) -> Any:
        """Call the loaded OCR object using whichever entry point it offers.

        ``predict`` is preferred when present. Otherwise ``ocr`` is called
        with the ``cls`` keyword and, if that raises ``TypeError``, once more
        without it.
        """
        if hasattr(self.ocr, "predict"):
            return self.ocr.predict(image)
        try:
            return self.ocr.ocr(image, cls=bool(self.config.get("use_angle_cls", True)))
        except TypeError:
            return self.ocr.ocr(image)

    def _parse_lines(self, raw_result: Any) -> list[OcrLine]:
        """Convert a raw OCR result into ``OcrLine`` objects, dropping blank text."""
        if raw_result is None:
            return []

        lines: list[OcrLine] = []
        for item in self._iter_result_items(raw_result):
            parsed = self._parse_item(item)
            if parsed is not None and parsed.text.strip():
                lines.append(parsed)
        return lines

    @staticmethod
    def _first_non_empty(mapping: dict, keys: tuple[str, ...]) -> Any:
        """Return the first value under ``keys`` that is not ``None`` and has length > 0.

        Emptiness is tested with ``len()`` instead of truthiness because the
        truth value of a multi-element NumPy array raises ``ValueError``.
        Returns ``None`` when no key qualifies.
        """
        for key in keys:
            value = mapping.get(key)
            if value is None:
                continue
            try:
                if len(value) > 0:
                    return value
            except TypeError:
                # Unsized values cannot be indexed per line; skip them.
                continue
        return None

    def _iter_result_items(self, raw_result: Any) -> list[Any]:
        """Flatten a raw OCR result into a list of per-line items.

        Handled shapes:
        * a dict with parallel text/score/box sequences, turned into
          ``(box, (text, score))`` tuples (missing scores/boxes become
          ``None``);
        * a one-element list wrapping a list, whose inner list is returned;
        * any other list, whose dict/list members are expanded and whose
          remaining members are kept as they are;
        * anything else, returned as a single-item list.
        """
        if isinstance(raw_result, dict):
            texts = self._first_non_empty(raw_result, ("rec_texts", "texts"))
            if texts is None:
                return []
            scores = self._first_non_empty(raw_result, ("rec_scores", "scores"))
            boxes = self._first_non_empty(raw_result, ("rec_polys", "dt_polys", "boxes"))
            score_count = 0 if scores is None else len(scores)
            box_count = 0 if boxes is None else len(boxes)
            return [
                (
                    boxes[index] if index < box_count else None,
                    (text, scores[index] if index < score_count else None),
                )
                for index, text in enumerate(texts)
            ]

        if isinstance(raw_result, list) and len(raw_result) == 1 and isinstance(raw_result[0], list):
            return raw_result[0]
        if isinstance(raw_result, list):
            items = []
            for result in raw_result:
                if isinstance(result, dict):
                    items.extend(self._iter_result_items(result))
                elif isinstance(result, list):
                    items.extend(result)
                else:
                    items.append(result)
            return items
        return [raw_result]

    def _parse_item(self, item: Any) -> OcrLine | None:
        """Build an ``OcrLine`` from one item, or return ``None`` if its shape is unknown.

        Accepted shapes are ``(box, (text, score))`` and ``(text, score)``.
        """
        if not isinstance(item, (list, tuple)):
            return None

        if len(item) >= 2 and isinstance(item[1], (list, tuple)) and item[1]:
            text = str(item[1][0])
            confidence = self._to_float(item[1][1]) if len(item[1]) > 1 else None
            bbox = self._to_bbox(item[0])
            return OcrLine(text=text, confidence=confidence, bbox=bbox)

        if len(item) >= 2 and isinstance(item[0], str):
            return OcrLine(text=str(item[0]), confidence=self._to_float(item[1]))

        return None

    def _to_float(self, value: Any) -> float | None:
        """Convert ``value`` to ``float``, returning ``None`` when that is not possible."""
        try:
            return float(value)
        except (TypeError, ValueError):
            return None

    def _to_bbox(self, value: Any) -> list[list[float]] | None:
        """Convert an iterable of points into ``[[x, y], ...]`` floats, or ``None`` on failure."""
        if value is None:
            return None
        try:
            return [[float(point[0]), float(point[1])] for point in value]
        except (TypeError, ValueError, IndexError):
            return None

    def _elapsed_ms(self, started: float) -> float:
        """Return the milliseconds elapsed since the ``perf_counter`` reading ``started``."""
        return (time.perf_counter() - started) * 1000.0
|
||||||
104
app/ocr/tesseract.py
Normal file
104
app/ocr/tesseract.py
Normal file
@@ -0,0 +1,104 @@
|
|||||||
|
from __future__ import annotations
|
||||||
|
|
||||||
|
import time
|
||||||
|
|
||||||
|
import numpy as np
|
||||||
|
|
||||||
|
from app.ocr.base import OcrLine, OcrResult, crop_bbox, prepare_ocr_image
|
||||||
|
|
||||||
|
|
||||||
|
class TesseractOcrEngine:
    """OCR engine backed by the ``pytesseract`` package.

    A failed import is stored in ``load_error`` rather than raised;
    ``read_label`` then reports it through ``OcrResult.error``.
    """

    name = "tesseract"

    def __init__(self, config: dict) -> None:
        """Store the OCR settings and try to import pytesseract."""
        self.config = config
        self.load_error: str | None = None
        self.pytesseract = None
        self._load()

    def _load(self) -> None:
        """Import pytesseract and apply ``tesseract_cmd`` when one is configured."""
        try:
            import pytesseract

            command = self.config.get("tesseract_cmd")
            if command:
                pytesseract.pytesseract.tesseract_cmd = command
            self.pytesseract = pytesseract
        except Exception as exc:
            self.load_error = f"Nie mozna zaladowac pytesseract: {exc}"

    def _build_tesseract_config(self) -> str:
        """Return the option string passed to Tesseract.

        The string is ``--psm <psm>`` followed by the stripped ``config``
        setting when that is non-empty. A ``config`` value of ``None`` (JSON
        ``null``) counts as empty, so the literal text ``"None"`` never ends
        up on the command line.
        """
        psm = int(self.config.get("psm", 6))
        extra = self.config.get("config")
        extra_config = "" if extra is None else str(extra).strip()
        if extra_config:
            return f"--psm {psm} {extra_config}"
        return f"--psm {psm}"

    def read_label(self, frame_bgr: np.ndarray, bbox: tuple[int, int, int, int]) -> OcrResult:
        """Crop ``bbox`` from the frame, preprocess it and run Tesseract on it.

        Args:
            frame_bgr: Source image.
            bbox: Region to read as ``(x1, y1, x2, y2)``; it is grown by the
                configured ``margin`` before cropping.

        Returns:
            An ``OcrResult``. Its ``error`` is set when pytesseract is not
            loaded, the crop is empty or the OCR call raises; otherwise it
            carries the recognised text, its non-blank lines and the mean
            confidence.
        """
        started = time.perf_counter()
        if self.pytesseract is None:
            return OcrResult(
                error=self.load_error or "OCR Tesseract nie jest zaladowany",
                elapsed_ms=self._elapsed_ms(started),
                engine=self.name,
            )

        margin = int(self.config.get("margin", 0))
        roi = crop_bbox(frame_bgr, bbox, margin=margin)
        if roi is None:
            return OcrResult(
                error="Nieprawidlowy bbox OCR",
                elapsed_ms=self._elapsed_ms(started),
                engine=self.name,
            )

        image = prepare_ocr_image(roi, self.config)
        language = self.config.get("language", "eng")
        tesseract_config = self._build_tesseract_config()

        try:
            text = self.pytesseract.image_to_string(
                image,
                lang=language,
                config=tesseract_config,
            )
        except Exception as exc:
            return OcrResult(
                error=f"Blad OCR Tesseract: {exc}",
                elapsed_ms=self._elapsed_ms(started),
                engine=self.name,
            )

        confidence = self._mean_confidence(image, language, tesseract_config)
        return OcrResult(
            text=text,
            confidence=confidence,
            lines=[OcrLine(text=line) for line in text.splitlines() if line.strip()],
            elapsed_ms=self._elapsed_ms(started),
            engine=self.name,
        )

    def _mean_confidence(self, image: np.ndarray, language: str, tesseract_config: str) -> float | None:
        """Average the confidences from ``image_to_data``, scaled by 1/100.

        This is a separate ``image_to_data`` call on the same image. Values
        that cannot be converted to ``float`` and negative values are
        skipped. Returns ``None`` when pytesseract is not loaded, the call
        raises or no usable value remains.
        """
        if self.pytesseract is None:
            return None
        try:
            data = self.pytesseract.image_to_data(
                image,
                lang=language,
                config=tesseract_config,
                output_type=self.pytesseract.Output.DICT,
            )
        except Exception:
            # Confidence is optional; the recognised text is still returned.
            return None

        values = []
        for raw_conf in data.get("conf", []):
            try:
                confidence = float(raw_conf)
            except (TypeError, ValueError):
                continue
            # NOTE(review): negative values presumably mark rows without
            # recognised text; confirm against the pytesseract output format.
            if confidence >= 0:
                values.append(confidence / 100.0)
        if not values:
            return None
        return sum(values) / len(values)

    def _elapsed_ms(self, started: float) -> float:
        """Return the milliseconds elapsed since the ``perf_counter`` reading ``started``."""
        return (time.perf_counter() - started) * 1000.0
|
||||||
@@ -20,19 +20,32 @@
|
|||||||
}
|
}
|
||||||
},
|
},
|
||||||
"detection": {
|
"detection": {
|
||||||
"model_path": "models/best.pt",
|
"model_path": "models/best_v1.pt",
|
||||||
"confidence_threshold": 0.25,
|
"confidence_threshold": 0.25,
|
||||||
"mode": "best",
|
"mode": "best",
|
||||||
"frame_stride": 5,
|
"frame_stride": 30,
|
||||||
"image_size": 640,
|
"image_size": 640,
|
||||||
"device": "cpu"
|
"device": "cpu"
|
||||||
},
|
},
|
||||||
"ocr": {
|
"ocr": {
|
||||||
"enabled": true,
|
"enabled": true,
|
||||||
|
"engine": "paddle",
|
||||||
"language": "eng",
|
"language": "eng",
|
||||||
"tesseract_cmd": null,
|
"tesseract_cmd": null,
|
||||||
|
"psm": 6,
|
||||||
|
"margin": 0,
|
||||||
"threshold": true,
|
"threshold": true,
|
||||||
"scale": 2.0
|
"paddle_threshold": false,
|
||||||
|
"scale": 2.0,
|
||||||
|
"config": "",
|
||||||
|
"use_angle_cls": true,
|
||||||
|
"paddle": {
|
||||||
|
"enable_mkldnn": false,
|
||||||
|
"lang": "en",
|
||||||
|
"use_doc_orientation_classify": false,
|
||||||
|
"use_doc_unwarping": false,
|
||||||
|
"use_textline_orientation": false
|
||||||
|
}
|
||||||
},
|
},
|
||||||
"capture": {
|
"capture": {
|
||||||
"photos_dir": "captures/photos",
|
"photos_dir": "captures/photos",
|
||||||
@@ -45,13 +58,33 @@
|
|||||||
"show_fps": true
|
"show_fps": true
|
||||||
},
|
},
|
||||||
"label_data": {
|
"label_data": {
|
||||||
|
"model_min_score": 0.72,
|
||||||
|
"color_min_score": 0.72,
|
||||||
"models": [
|
"models": [
|
||||||
"Regius",
|
"Regius 6",
|
||||||
"Duvell"
|
"Regius 7",
|
||||||
|
"Duvell 6",
|
||||||
|
"Duvell 7",
|
||||||
|
"Duvell Elite 6",
|
||||||
|
"Duvell Elite 7"
|
||||||
],
|
],
|
||||||
"colors": [
|
"colors": [
|
||||||
"T-NF-BLK-OUT-BST-G",
|
"T-NF-BLK-OUT-BST-G",
|
||||||
"T-BLK-G"
|
"T-BLK-G",
|
||||||
|
"T-BLK-S",
|
||||||
|
"T-BLK-M",
|
||||||
|
"M-BLK-G",
|
||||||
|
"M-BLK-S",
|
||||||
|
"M-BLK-M",
|
||||||
|
"T-CST-G",
|
||||||
|
"T-CST-S",
|
||||||
|
"T-CST-M",
|
||||||
|
"T-ANTIQUE-G",
|
||||||
|
"T-ANTIQUE-S",
|
||||||
|
"T-ANTIQUE-M",
|
||||||
|
"T-NAT-G",
|
||||||
|
"T-NAT-S",
|
||||||
|
"T-NAT-M"
|
||||||
]
|
]
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
2
requirements-ocr-paddle.txt
Normal file
2
requirements-ocr-paddle.txt
Normal file
@@ -0,0 +1,2 @@
|
|||||||
|
paddlepaddle
|
||||||
|
paddleocr
|
||||||
Reference in New Issue
Block a user