281 lines
10 KiB
Python
281 lines
10 KiB
Python
"""Step 1: detect chessboard corners / ellipse centers and write per-image JSON."""
|
|
|
|
from __future__ import annotations
|
|
|
|
from dataclasses import dataclass
|
|
from pathlib import Path
|
|
from typing import Optional, Tuple
|
|
|
|
import cv2
|
|
import numpy as np
|
|
from tqdm import tqdm
|
|
|
|
from calibrationclasses.feature_json import FeatureRecord, save_feature_json
|
|
from calibrationclasses.preprocessing import Preprocessing
|
|
from calibrationclasses.session import (
|
|
CameraFolder,
|
|
json_path_for_image,
|
|
list_cameras_present,
|
|
list_image_paths,
|
|
resolve_session_root,
|
|
)
|
|
from calibrationclasses.timestamp import parse_pair_key, parse_timestamp_sec
|
|
|
|
|
|
@dataclass
class DetectionConfig:
    """Settings read by the feature-detection step.

    The fields are plain values; nothing is validated here.
    """

    # Default pattern size handed to chessboard detection when a camera has
    # no per-camera override.
    chessboard_size: Tuple[int, int] = (8, 7)

    # Default square edge length copied into chessboard feature records.
    # NOTE(review): unit is not stated in this file (0.045 looks like metres) - confirm.
    square_size: float = 0.045

    # Preprocessing spec string. Empty, "none", "off", "false" and "0"
    # (case-insensitive) disable preprocessing; otherwise the letters
    # g / c / t select the gray / clahe / threshold steps, applied in order.
    preprocessing: str = "None"

    # Feature choice for the camera whose logical name is "ir":
    # auto | chessboard | ellipse
    ir_mode: str = "auto"

    # When True: progress bar, failure messages and corner-overlay images.
    troubleshooting: bool = False
|
|
|
|
|
|
class FeatureDetector:
    """Detect calibration features in images and write one JSON record per image.

    Chessboard corners are searched with OpenCV; for the camera whose logical
    name is ``"ir"`` an ellipse centre can be detected instead of, or as a
    fallback to, the chessboard (see ``DetectionConfig.ir_mode``).
    """

    # Preprocessing specs (lower-cased, stripped) that mean "do nothing".
    _DISABLED_SPECS = ("none", "off", "false", "0")

    def __init__(self, config: DetectionConfig, corners_root: Optional[Path] = None):
        """Store the configuration and create the preprocessing helper.

        Args:
            config: Detection settings.
            corners_root: Directory under which troubleshooting overlays are
                written (one sub-folder per camera folder). ``None`` disables
                overlay output.
        """
        self.config = config
        self._preprocessor = Preprocessing()
        self.corners_root = corners_root

    def _preprocessing_enabled(self) -> bool:
        """Return True when the configured preprocessing spec requests any step."""
        spec = (self.config.preprocessing or "").strip().lower()
        return bool(spec) and spec not in self._DISABLED_SPECS

    def _preprocess(self, image: np.ndarray) -> np.ndarray:
        """Apply the configured preprocessing steps to ``image``.

        The spec is read one character at a time: ``g`` -> gray, ``c`` -> clahe,
        ``t`` -> threshold (methods of the ``Preprocessing`` helper). Any other
        character is ignored. After every step the result is converted back
        with ``COLOR_GRAY2BGR`` so the next step again receives a 3-channel image.

        Returns ``image`` unchanged when it is ``None`` or preprocessing is disabled.
        """
        if image is None or not self._preprocessing_enabled():
            return image

        spec = (
            (self.config.preprocessing or "")
            .strip()
            .lower()
            .replace("none", "")
            .replace(",", "")
            .replace(" ", "")
        )
        pp = self._preprocessor
        out = image
        for ch in spec:
            if ch == "g":
                step_result = pp.gray(out)
            elif ch == "c":
                step_result = pp.clahe(out)
            elif ch == "t":
                step_result = pp.threshold(out)
            else:
                continue
            out = cv2.cvtColor(step_result, cv2.COLOR_GRAY2BGR)
        return out

    @staticmethod
    def _to_gray(image: np.ndarray) -> np.ndarray:
        """Return ``image`` itself if it is already 2-D, else its BGR->gray conversion."""
        if image.ndim == 2:
            return image
        return cv2.cvtColor(image, cv2.COLOR_BGR2GRAY)

    def detect_chessboard(
        self, image: np.ndarray, board_size: Tuple[int, int]
    ) -> Optional[np.ndarray]:
        """Find chessboard corners and refine them to sub-pixel accuracy.

        Args:
            image: Image to search.
            board_size: Pattern size passed to ``cv2.findChessboardCorners``.

        Returns:
            The refined corner array, or ``None`` when no board was found.
        """
        found, corners = cv2.findChessboardCorners(image, board_size, None)
        if not found:
            return None

        # Stop refinement after 30 iterations or when the update drops below 0.001.
        criteria = (cv2.TERM_CRITERIA_EPS + cv2.TERM_CRITERIA_MAX_ITER, 30, 0.001)
        return cv2.cornerSubPix(
            self._to_gray(image), corners, (11, 11), (-1, -1), criteria
        )

    def detect_ellipse(
        self, image: np.ndarray
    ) -> Optional[Tuple[Tuple[float, float], dict]]:
        """Locate the largest roughly-round blob and fit an ellipse to it.

        Pipeline: gray -> CLAHE -> Gaussian blur -> Otsu threshold (inverted
        when more than half of the pixels are white) -> morphological close ->
        external contours. Contours with area < 100 or a bounding-box aspect
        ratio outside (0.75, 1.25) are discarded; the largest survivor is fitted.

        Returns:
            ``((cx, cy), info)`` where ``info`` has the keys ``"center"``,
            ``"axes"`` and ``"angle"``, or ``None`` when nothing usable is found.
        """
        # Use the shared helper so a single-channel image does not make
        # cvtColor(BGR2GRAY) raise.
        gray = self._to_gray(image)
        enhanced = cv2.createCLAHE(clipLimit=2.0, tileGridSize=(8, 8)).apply(gray)
        blurred = cv2.GaussianBlur(enhanced, (5, 5), 0)
        _, binary = cv2.threshold(blurred, 0, 255, cv2.THRESH_BINARY + cv2.THRESH_OTSU)

        # Keep the foreground as the minority class.
        if np.count_nonzero(binary == 255) / binary.size > 0.5:
            binary = cv2.bitwise_not(binary)

        kernel = cv2.getStructuringElement(cv2.MORPH_ELLIPSE, (9, 9))
        closed = cv2.morphologyEx(binary, cv2.MORPH_CLOSE, kernel)
        contours, _ = cv2.findContours(closed, cv2.RETR_EXTERNAL, cv2.CHAIN_APPROX_SIMPLE)

        candidates = []
        for cnt in contours:
            if cv2.contourArea(cnt) < 100:
                continue
            _, _, w, h = cv2.boundingRect(cnt)
            if 0.75 < (w / h) < 1.25:
                candidates.append(cnt)
        if not candidates:
            return None

        best = max(candidates, key=cv2.contourArea)
        if len(best) < 5:
            # cv2.fitEllipse needs at least five points.
            return None

        # Axis lengths are stored in the order cv2.fitEllipse returns them.
        (cx, cy), (axis_a, axis_b), angle = cv2.fitEllipse(best)
        info = {
            "center": [float(cx), float(cy)],
            "axes": [float(axis_a), float(axis_b)],
            "angle": float(angle),
        }
        return (cx, cy), info

    def _save_corner_overlay(
        self,
        image: np.ndarray,
        record: FeatureRecord,
        board_size: Tuple[int, int],
    ) -> None:
        """Write a copy of ``image`` with the detected feature drawn on it.

        The file goes to ``corners_root / record.camera_folder`` under the
        image's own file name. Does nothing when ``corners_root`` is ``None``.
        """
        if self.corners_root is None:
            return

        out_dir = self.corners_root / record.camera_folder
        out_dir.mkdir(parents=True, exist_ok=True)

        vis = image.copy()
        if record.feature_type == "chessboard" and record.corners is not None:
            vis = cv2.drawChessboardCorners(vis, board_size, record.corners, True)
        elif record.feature_type == "ellipse" and record.center is not None:
            cx, cy = record.center
            cv2.circle(vis, (int(cx), int(cy)), 12, (0, 255, 0), 2)

        out_path = out_dir / record.image_path.name
        # imwrite reports failure through its return value; surface it
        # instead of losing the overlay silently.
        if not cv2.imwrite(str(out_path), vis):
            tqdm.write(f"[detect] WARN could not write overlay {out_path}")

    def _store_success(
        self,
        image: np.ndarray,
        record: FeatureRecord,
        board_size: Tuple[int, int],
    ) -> FeatureRecord:
        """Persist a successful record and, in troubleshooting mode, its overlay."""
        save_feature_json(record)
        if self.config.troubleshooting:
            self._save_corner_overlay(image, record, board_size)
        return record

    def process_image(
        self,
        image_path: Path,
        camera: CameraFolder,
        board_size: Optional[Tuple[int, int]] = None,
        square_size: Optional[float] = None,
    ) -> FeatureRecord:
        """Detect features in one image and save the resulting JSON record.

        Args:
            image_path: Image file to process.
            camera: Camera folder the image belongs to.
            board_size: Pattern size override; falls back to the config value.
            square_size: Square size override; falls back to the config value.

        Returns:
            The saved ``FeatureRecord``. ``success`` is False (with ``error``
            set) when the image cannot be read or no feature is found.
        """
        if board_size:
            # Overrides may arrive as lists (e.g. from parsed config files);
            # hand OpenCV a tuple either way.
            board_size = tuple(board_size)
        else:
            board_size = self.config.chessboard_size
        if square_size is None:
            square_size = self.config.square_size

        # Keyword arguments shared by every record built for this image.
        shared = dict(
            image_path=image_path,
            json_path=json_path_for_image(image_path),
            camera_folder=camera.folder_name,
            preprocessing=self.config.preprocessing,
        )
        base = FeatureRecord(
            feature_type="unknown",
            success=False,
            timestamp_sec=parse_timestamp_sec(image_path.name),
            pair_key=parse_pair_key(image_path.name),
            **shared,
        )

        image = cv2.imread(str(image_path))
        if image is None:
            base.error = "failed to load image"
            save_feature_json(base)
            return base

        shared.update(timestamp_sec=base.timestamp_sec, pair_key=base.pair_key)

        ir_mode = self.config.ir_mode
        use_ellipse = camera.logical_name == "ir" and ir_mode in ("ellipse", "auto")
        # In "auto" mode the chessboard is tried first, the ellipse is the fallback.
        try_chessboard = not use_ellipse or ir_mode == "auto"

        if try_chessboard:
            corners = self.detect_chessboard(self._preprocess(image), board_size)
            if corners is not None:
                record = FeatureRecord(
                    feature_type="chessboard",
                    success=True,
                    board_size=board_size,
                    square_size=square_size,
                    corners=corners,
                    **shared,
                )
                return self._store_success(image, record, board_size)

        if use_ellipse:
            # Ellipse detection runs on the raw image; it applies its own CLAHE.
            result = self.detect_ellipse(image)
            if result is not None:
                (cx, cy), ellipse = result
                record = FeatureRecord(
                    feature_type="ellipse",
                    success=True,
                    center=(cx, cy),
                    ellipse=ellipse,
                    **shared,
                )
                return self._store_success(image, record, board_size)

        base.feature_type = "ellipse" if use_ellipse else "chessboard"
        base.error = "no features detected"
        save_feature_json(base)
        if self.config.troubleshooting:
            # tqdm.write keeps an active progress bar intact; without a bar it
            # simply prints the line.
            tqdm.write(f"[detect] FAIL {image_path.name}: {base.error}")
        return base

    def process_camera(
        self,
        camera: CameraFolder,
        board_size: Optional[Tuple[int, int]] = None,
        square_size: Optional[float] = None,
    ) -> Tuple[int, int]:
        """Run ``process_image`` on every image of ``camera``.

        Returns:
            ``(detected, total)``: number of successful records and number of
            images processed. ``(0, 0)`` when the folder has no images.
        """
        images = list_image_paths(camera.path)
        if not images:
            print(f"[WARN] No images in {camera.path}")
            return 0, 0

        total = len(images)
        # The progress bar is only shown in troubleshooting mode.
        progress = (
            tqdm(images, unit="img", dynamic_ncols=True)
            if self.config.troubleshooting
            else None
        )

        detected = 0
        for image_path in (progress if progress is not None else images):
            record = self.process_image(
                image_path, camera, board_size=board_size, square_size=square_size
            )
            if record.success:
                detected += 1
            if progress is not None:
                progress.set_description(
                    f"{camera.logical_name} | detected {detected}/{total}"
                )

        print(f"[{camera.logical_name}] {detected}/{total} features detected")
        return detected, total
|
|
|
|
|
|
def run_detection(
    input_path: str | Path,
    config: DetectionConfig,
    cameras: Optional[list[str]] = None,
    per_camera_board: Optional[dict] = None,
) -> None:
    """Run feature detection for every selected camera of a session.

    Args:
        input_path: Path from which the session root is resolved.
        config: Detection settings.
        cameras: Optional logical camera names; when given, only cameras whose
            ``logical_name`` is listed are processed.
        per_camera_board: Optional mapping ``logical_name -> {"board_size": ...,
            "square_size": ...}``. Missing keys or entries fall back to the
            values in ``config``.

    Raises:
        FileNotFoundError: If the session has no camera folders, or none of
            them matches ``cameras``.
    """
    session_root = resolve_session_root(input_path)
    present = list_cameras_present(session_root)
    if not present:
        raise FileNotFoundError(f"No camera folders found under {session_root}")

    if cameras:
        wanted = set(cameras)
        selected = [c for c in present if c.logical_name in wanted]
        if not selected:
            # Folders exist but the filter excluded all of them: say so, while
            # keeping the original message prefix for callers that match on it.
            available = [c.logical_name for c in present]
            raise FileNotFoundError(
                f"No camera folders found under {session_root} "
                f"matching {list(cameras)} (available: {available})"
            )
    else:
        selected = present

    corners_root = None
    if config.troubleshooting:
        # NOTE(review): overlays go under input_path, not session_root; these
        # may differ depending on what resolve_session_root does - confirm.
        corners_root = Path(input_path) / "corners"
        print(f"[detect] troubleshooting: corner overlays → {corners_root}")

    detector = FeatureDetector(config, corners_root=corners_root)
    overrides = per_camera_board or {}

    for camera in selected:
        # One lookup per camera; an explicit None entry is treated as "no override".
        entry = overrides.get(camera.logical_name) or {}
        detector.process_camera(
            camera,
            board_size=entry.get("board_size"),
            square_size=entry.get("square_size"),
        )
|