1572 lines
64 KiB
Python
1572 lines
64 KiB
Python
from typing import List, Optional, Tuple
|
|
import re
|
|
import cv2
|
|
import numpy as np
|
|
import os
|
|
import shutil
|
|
from tqdm import tqdm
|
|
from calibrationclasses.preprocessing import Preprocessing
|
|
from pathlib import Path
|
|
|
|
|
|
class StereoCalibration:
|
|
"""Stereo calibration class.
|
|
|
|
Parameters
|
|
----------
|
|
input_path : str
|
|
path to input data
|
|
chessboard_size : Tuple
|
|
default inner corner grid (width, height) for both cameras
|
|
square_size : float
|
|
default chessboard square size in meters for both cameras
|
|
chessboard_size_left, chessboard_size_right : Tuple, optional
|
|
per-camera inner corner grids (default: chessboard_size)
|
|
square_size_left, square_size_right : float, optional
|
|
per-camera square sizes in meters (default: square_size)
|
|
preprocessing: str
|
|
default = none, G = Gray, C = Clahe, T = Threshold
|
|
left_camera: str
|
|
Which folder to use as the left view: lc or lc-ir (folder may be lc-ir or lc_ir on disk).
|
|
right_camera: str
|
|
Which folder to use as the stereo 'right' view: rc, rg/rgb, or ir (IR folder may be named IR).
|
|
troubleshooting: bool
|
|
If True, save debug folders (corners, local_coords, images_ncb, rectified).
|
|
"""
|
|
|
|
def __init__(
|
|
self,
|
|
input_path: str,
|
|
chessboard_size: Tuple,
|
|
square_size: float,
|
|
preprocessing: str,
|
|
calibration_mode: str = "checkerboard",
|
|
chessboard_size_left: Optional[Tuple] = None,
|
|
chessboard_size_right: Optional[Tuple] = None,
|
|
square_size_left: Optional[float] = None,
|
|
square_size_right: Optional[float] = None,
|
|
left_camera: str = "lc",
|
|
right_camera: str = "rc",
|
|
troubleshooting: bool = False,
|
|
) -> None:
|
|
self.input_path = input_path
|
|
requested_left_camera = left_camera.lower().replace("_", "-")
|
|
if requested_left_camera not in ("lc", "lc-ir"):
|
|
raise ValueError(
|
|
f"left_camera must be one of lc, lc-ir; got {left_camera!r}"
|
|
)
|
|
self.left_camera = requested_left_camera
|
|
self._left_folder_aliases = {
|
|
"lc": ["lc"],
|
|
"lc-ir": ["lc-ir", "lc_ir", "LC-IR"],
|
|
}[self.left_camera]
|
|
|
|
images_dir = os.path.join(input_path, "images")
|
|
nested_left = any(
|
|
os.path.isdir(os.path.join(images_dir, folder))
|
|
for folder in self._left_folder_aliases
|
|
)
|
|
flat_left = any(
|
|
os.path.isdir(os.path.join(input_path, folder))
|
|
for folder in self._left_folder_aliases
|
|
)
|
|
if nested_left:
|
|
self.image_path = images_dir
|
|
elif flat_left:
|
|
self.image_path = input_path
|
|
else:
|
|
raise FileNotFoundError(
|
|
f"Could not find left camera folder for {self.left_camera!r}. "
|
|
f"Tried under {images_dir} (nested) and {input_path} (flat): "
|
|
f"{self._left_folder_aliases}"
|
|
)
|
|
self.chessboard_size = chessboard_size
|
|
self.square_size = square_size
|
|
self.chessboard_size_left = chessboard_size_left or chessboard_size
|
|
self.chessboard_size_right = chessboard_size_right or chessboard_size
|
|
self.square_size_left = (
|
|
square_size if square_size_left is None else square_size_left
|
|
)
|
|
self.square_size_right = (
|
|
square_size if square_size_right is None else square_size_right
|
|
)
|
|
self.preprocessing = preprocessing
|
|
self.calibration_mode = calibration_mode
|
|
requested_right_camera = right_camera.lower()
|
|
if requested_right_camera not in ("rc", "rgb", "rg", "ir"):
|
|
raise ValueError(
|
|
f"right_camera must be one of rc, rgb, rg, ir; got {right_camera!r}"
|
|
)
|
|
# Normalize rgb/rg to a single partner label for params naming.
|
|
if requested_right_camera in ("rgb", "rg"):
|
|
self.right_camera = "rg"
|
|
else:
|
|
self.right_camera = requested_right_camera
|
|
self._right_subdir = self.right_camera
|
|
self.troubleshooting = troubleshooting
|
|
self.corners_path = (
|
|
os.path.join(input_path, "corners") if self.troubleshooting else None
|
|
)
|
|
self.local_coords_path = (
|
|
os.path.join(input_path, "local_coords") if self.troubleshooting else None
|
|
)
|
|
self.ncb_path = (
|
|
os.path.join(input_path, "images_ncb") if self.troubleshooting else None
|
|
)
|
|
self._preprocessor = Preprocessing()
|
|
self._initialize()
|
|
self._init_paths()
|
|
|
|
def _preprocessing_enabled(self) -> bool:
|
|
spec = (self.preprocessing or "").strip().lower()
|
|
return bool(spec) and spec not in ("none", "off", "false", "0")
|
|
|
|
def _preprocess_for_detection(self, image: np.ndarray) -> np.ndarray:
|
|
"""
|
|
Apply optional preprocessing before chessboard corner detection.
|
|
|
|
Chain letters in order (case-insensitive): G=grayscale, C=CLAHE, T=Otsu threshold
|
|
(T uses CLAHE internally per Preprocessing.threshold). Examples: C, GC, GCT.
|
|
"""
|
|
if image is None or not self._preprocessing_enabled():
|
|
return image
|
|
spec = (
|
|
(self.preprocessing or "")
|
|
.strip()
|
|
.lower()
|
|
.replace("none", "")
|
|
.replace(",", "")
|
|
.replace(" ", "")
|
|
)
|
|
if not spec:
|
|
return image
|
|
out = image
|
|
pp = self._preprocessor
|
|
for ch in spec:
|
|
if ch == "g":
|
|
g = pp.gray(out)
|
|
out = cv2.cvtColor(g, cv2.COLOR_GRAY2BGR)
|
|
elif ch == "c":
|
|
c = pp.clahe(out)
|
|
out = cv2.cvtColor(c, cv2.COLOR_GRAY2BGR)
|
|
elif ch == "t":
|
|
t = pp.threshold(out)
|
|
out = cv2.cvtColor(t, cv2.COLOR_GRAY2BGR)
|
|
return out
|
|
|
|
@staticmethod
|
|
def _to_gray_for_subpix(image: np.ndarray) -> np.ndarray:
|
|
if image is None:
|
|
return None
|
|
if len(image.shape) == 2:
|
|
return image
|
|
if image.shape[2] == 1:
|
|
return np.squeeze(image, axis=2)
|
|
return cv2.cvtColor(image, cv2.COLOR_BGR2GRAY)
|
|
|
|
@staticmethod
|
|
def _refresh_detection_progress_bar(
|
|
pbar, idx: int, total: int, count: int, label: str = "detected"
|
|
) -> None:
|
|
"""Live status on the tqdm bar (avoid print() inside loops)."""
|
|
fmt = getattr(pbar, "format_dict", None) or {}
|
|
rate = fmt.get("rate")
|
|
if rate is None or not isinstance(rate, (int, float)) or not np.isfinite(rate) or rate <= 0:
|
|
rate_str = "…"
|
|
else:
|
|
rate_str = f"{rate:.1f}"
|
|
pbar.set_description(
|
|
f"Processing image {idx}/{total} | {label}: {count} | {rate_str} it/s",
|
|
refresh=True,
|
|
)
|
|
|
|
def _left_filename_prefixes(self) -> Tuple[str, ...]:
|
|
if self.left_camera == "lc-ir":
|
|
return ("lc-ir", "lcir", "lc_ir", "lc")
|
|
return ("lc",)
|
|
|
|
def _strip_left_filename_prefix(self, filename: str) -> str:
|
|
name = filename.lower()
|
|
for prefix in sorted(self._left_filename_prefixes(), key=len, reverse=True):
|
|
if name.startswith(prefix):
|
|
return name[len(prefix) :].lstrip("_-.")
|
|
return name
|
|
|
|
@staticmethod
|
|
def _pair_key_after_prefix(name: str, prefix: str) -> str:
|
|
"""Strip camera prefix and normalize so lc_123 matches ir_123 / rc_123."""
|
|
return Path(name[len(prefix) :].lstrip("_-.")).stem
|
|
|
|
def extract_key_lc(self, path: str):
|
|
"""Filename key to pair the left camera with rc/rgb/ir."""
|
|
name = Path(path).name.lower()
|
|
for prefix in sorted(self._left_filename_prefixes(), key=len, reverse=True):
|
|
if name.startswith(prefix):
|
|
return self._pair_key_after_prefix(name, prefix)
|
|
m = re.search(r"(scan\d{6})", name)
|
|
if m:
|
|
return m.group(1)
|
|
return None
|
|
|
|
def extract_key_other(self, path: str):
|
|
name = Path(path).name.lower()
|
|
# IR_scan_000001.png → same scan id as scan000001_* in LC folder
|
|
m = re.match(r"^ir_scan_(\d+)", name)
|
|
if m:
|
|
return f"scan{int(m.group(1)):06d}"
|
|
for prefix in ("rc", "rgb", "rg", "ir"):
|
|
if name.startswith(prefix):
|
|
return self._pair_key_after_prefix(name, prefix)
|
|
return None
|
|
|
|
def get_camera_type(path):
|
|
name = os.path.basename(path).lower()
|
|
if name.startswith("rc"):
|
|
return "rc"
|
|
elif name.startswith("rg"):
|
|
return "rg"
|
|
elif name.startswith("rgb"):
|
|
return "rgb"
|
|
elif name.startswith("ir"):
|
|
return "ir"
|
|
return "unknown"
|
|
|
|
def _init_paths(self) -> None:
|
|
"""Initialize paths for the selected left and right camera folders."""
|
|
|
|
left_dir = None
|
|
for folder in self._left_folder_aliases:
|
|
path = os.path.join(self.image_path, folder)
|
|
if os.path.isdir(path):
|
|
left_dir = path
|
|
break
|
|
if left_dir is None:
|
|
raise FileNotFoundError(
|
|
f"Left camera folder for {self.left_camera!r} not found under "
|
|
f"{self.image_path}. Tried: {self._left_folder_aliases}"
|
|
)
|
|
self._left_image_path = left_dir
|
|
self._left_subdir = os.path.basename(left_dir)
|
|
|
|
self._right_image_paths_all = []
|
|
|
|
right_folder_aliases = {
|
|
"rc": ["rc"],
|
|
"rg": ["rg", "rgb"],
|
|
"ir": ["ir", "IR"],
|
|
}
|
|
aliases = right_folder_aliases[self.right_camera]
|
|
right_dir = None
|
|
for folder in aliases:
|
|
path = os.path.join(self.image_path, folder)
|
|
if os.path.isdir(path):
|
|
right_dir = path
|
|
break
|
|
if right_dir is None:
|
|
raise FileNotFoundError(
|
|
f"Right camera folder for {self.right_camera!r} not found under "
|
|
f"{self.image_path}. Tried: {aliases}"
|
|
)
|
|
# Use discovered folder name for writing corners/local_coords/rectified outputs.
|
|
self._right_subdir = os.path.basename(right_dir)
|
|
|
|
names = [
|
|
name
|
|
for name in os.listdir(right_dir)
|
|
if name.lower().endswith((".bmp", ".png", ".jpg"))
|
|
]
|
|
names.sort()
|
|
self._right_image_paths_all = [os.path.join(right_dir, name) for name in names]
|
|
|
|
# --- Load left camera images ---
|
|
self._left_image_names = [
|
|
name for name in os.listdir(self._left_image_path)
|
|
if name.lower().endswith((".bmp", ".png", ".jpg"))
|
|
]
|
|
self._left_image_names.sort()
|
|
|
|
self._left_image_paths = [
|
|
os.path.join(self._left_image_path, name)
|
|
for name in self._left_image_names
|
|
]
|
|
|
|
# --- Initialize containers ---
|
|
self._valid_left_image_paths = []
|
|
self._valid_right_image_paths = []
|
|
|
|
self._left_image_paths_pairs = []
|
|
self._right_image_paths_all_pairs = []
|
|
|
|
def _initialize(self) -> None:
|
|
self._chessboard_3d_left_points: List[np.ndarray] = []
|
|
self._chessboard_2d_left_points: List[np.ndarray] = []
|
|
self._chessboard_3d_right_points: List[np.ndarray] = []
|
|
self._chessboard_2d_right_points: List[np.ndarray] = []
|
|
|
|
self._chessboard_3d_left_points_pairs: List[np.ndarray] = []
|
|
self._chessboard_2d_left_points_pairs: List[np.ndarray] = []
|
|
self._chessboard_3d_right_points_pairs: List[np.ndarray] = []
|
|
self._chessboard_2d_right_points_pairs: List[np.ndarray] = []
|
|
|
|
self._stereo_map_left: Tuple[np.ndarray, np.ndarray] = []
|
|
self._stereo_map_right: Tuple[np.ndarray, np.ndarray] = []
|
|
|
|
self._left_image_size: Tuple[int, int] = None
|
|
self._right_image_size: Tuple[int, int] = None
|
|
self._Parameters: dict = {}
|
|
|
|
def create_3d_chessboard_points(
|
|
self,
|
|
chessboard_size: Optional[Tuple] = None,
|
|
square_size: Optional[float] = None,
|
|
) -> np.ndarray:
|
|
"""Create 3D chessboard points for a given chessboard size and square size.
|
|
|
|
Returns
|
|
-------
|
|
np.ndarray
|
|
Nx3 array of 3D points: N = number of squares in chessboard
|
|
"""
|
|
board = chessboard_size if chessboard_size is not None else self.chessboard_size_left
|
|
square = square_size if square_size is not None else self.square_size_left
|
|
|
|
# create Nx3 array of 3D points: N = number of squares in chessboard
|
|
chessboard_points = np.zeros((np.prod(board), 3), np.float32)
|
|
# fill x, y coordinates with indices of chessboard points
|
|
chessboard_points[:, :2] = np.indices(board).T.reshape(-1, 2)
|
|
# scale coordinates by square size
|
|
chessboard_points *= square
|
|
return chessboard_points
|
|
|
|
def create_2d_chessboard_points(
|
|
self, image: np.ndarray, chessboard_size: Optional[Tuple] = None
|
|
) -> np.ndarray:
|
|
"""Create 2D chessboard points for a given image and chessboard size.
|
|
|
|
Parameters
|
|
----------
|
|
image : np.ndarray
|
|
image of chessboard
|
|
|
|
Returns
|
|
-------
|
|
np.ndarray
|
|
Nx1x2 array of 2D points: N = number of corners found in the image
|
|
"""
|
|
board = (
|
|
chessboard_size
|
|
if chessboard_size is not None
|
|
else self.chessboard_size_left
|
|
)
|
|
# termination criteria
|
|
criteria = (cv2.TERM_CRITERIA_EPS + cv2.TERM_CRITERIA_MAX_ITER, 30, 0.001)
|
|
# find chessboard corners
|
|
found, corners = cv2.findChessboardCorners(image, board, None)
|
|
if not found:
|
|
return None
|
|
# refine corner positions
|
|
corners = cv2.cornerSubPix(
|
|
self._to_gray_for_subpix(image),
|
|
corners,
|
|
(11, 11),
|
|
(-1, -1),
|
|
criteria,
|
|
)
|
|
return corners
|
|
|
|
def plot_chessboard(
|
|
self,
|
|
image: np.ndarray,
|
|
corners: np.ndarray,
|
|
chessboard_size: Optional[Tuple] = None,
|
|
) -> np.ndarray:
|
|
"""Plot chessboard corners on the image.
|
|
|
|
Parameters
|
|
----------
|
|
image : np.ndarray
|
|
image of chessboard
|
|
corners : np.ndarray
|
|
Nx1x2 array of 2D points: N = number of corners found in the image
|
|
"""
|
|
board = (
|
|
chessboard_size
|
|
if chessboard_size is not None
|
|
else self.chessboard_size_left
|
|
)
|
|
image = cv2.drawChessboardCorners(image, board, corners, True)
|
|
return image
|
|
|
|
def plot_local_coordinates(
|
|
self,
|
|
image: np.ndarray,
|
|
corners: np.ndarray,
|
|
chessboard_size: Optional[Tuple] = None,
|
|
) -> np.ndarray:
|
|
"""Plot local coordinates on the image.
|
|
|
|
Parameters
|
|
----------
|
|
image : np.ndarray
|
|
image of chessboard
|
|
corners : np.ndarray
|
|
Nx1x2 array of 2D points: N = number of corners found in the image
|
|
"""
|
|
board = (
|
|
chessboard_size
|
|
if chessboard_size is not None
|
|
else self.chessboard_size_left
|
|
)
|
|
corners = np.int32(corners)
|
|
cv2.arrowedLine(image, tuple(corners[0,0]), tuple(corners[4,0]), (255,0,0), 3, tipLength=0.05)
|
|
cv2.arrowedLine(image, tuple(corners[0,0]), tuple(corners[board[0]*2,0]), (255,0,0), 3, tipLength=0.05)
|
|
cv2.circle(image, tuple(corners[0,0]), 8, (0,255,0), 3)
|
|
cv2.putText(image, '0,0', (corners[0,0,0]-35, corners[0,0,1]-15), cv2.FONT_HERSHEY_SIMPLEX, 0.6, (255,255,255), 2, cv2.LINE_AA)
|
|
cv2.putText(image, 'X', (corners[4,0,0]-25, corners[4,0,1]-15), cv2.FONT_HERSHEY_SIMPLEX, 0.6, (255,255,255), 2, cv2.LINE_AA)
|
|
cv2.putText(image, 'Y', (corners[board[0]*2,0,0]-25, corners[board[0]*2,0,1]-15), cv2.FONT_HERSHEY_SIMPLEX, 0.6, (255,255,255), 2, cv2.LINE_AA)
|
|
return image
|
|
|
|
def create_chessboard_points_left(self) -> None:
|
|
"""Create 3D and 2D chessboard points for left images."""
|
|
|
|
last_loaded_shape = None
|
|
paths = self._left_image_paths
|
|
n_total = len(paths)
|
|
detected = 0
|
|
with tqdm(paths, unit="img", dynamic_ncols=True, smoothing=0.05) as pbar:
|
|
for idx, left_image_path in enumerate(pbar, start=1):
|
|
left_image = cv2.imread(left_image_path)
|
|
|
|
left_name = left_image_path.split("/")[-1]
|
|
|
|
if left_image is None:
|
|
tqdm.write(f"Warning: Failed to load image {left_name}. Skipping.")
|
|
self._refresh_detection_progress_bar(pbar, idx, n_total, detected)
|
|
continue
|
|
last_loaded_shape = left_image.shape
|
|
|
|
proc_image = self._preprocess_for_detection(left_image)
|
|
left_corners = self.create_2d_chessboard_points(
|
|
proc_image, self.chessboard_size_left
|
|
)
|
|
|
|
if left_corners is not None:
|
|
detected += 1
|
|
self._chessboard_3d_left_points.append(
|
|
self.create_3d_chessboard_points(
|
|
self.chessboard_size_left, self.square_size_left
|
|
)
|
|
)
|
|
self._chessboard_2d_left_points.append(left_corners)
|
|
self._valid_left_image_paths.append(left_image_path)
|
|
if self.corners_path is not None:
|
|
left_image = self.plot_chessboard(
|
|
left_image,
|
|
left_corners,
|
|
self.chessboard_size_left,
|
|
)
|
|
left_image_name = os.path.basename(left_image_path)
|
|
left_image_path = os.path.join(
|
|
self.corners_path, self._left_subdir, left_image_name
|
|
)
|
|
os.makedirs(os.path.dirname(left_image_path), exist_ok=True)
|
|
cv2.imwrite(left_image_path, left_image)
|
|
if self.local_coords_path is not None:
|
|
left_image = self.plot_local_coordinates(
|
|
left_image, left_corners, self.chessboard_size_left
|
|
)
|
|
left_image_name = os.path.basename(left_image_path)
|
|
left_image_path = os.path.join(
|
|
self.local_coords_path, self._left_subdir, left_image_name
|
|
)
|
|
os.makedirs(os.path.dirname(left_image_path), exist_ok=True)
|
|
cv2.imwrite(left_image_path, left_image)
|
|
|
|
self._refresh_detection_progress_bar(pbar, idx, n_total, detected)
|
|
|
|
if last_loaded_shape is not None:
|
|
self._left_image_size = (last_loaded_shape[1], last_loaded_shape[0])
|
|
elif self._left_image_paths:
|
|
probe = cv2.imread(self._left_image_paths[0])
|
|
self._left_image_size = (
|
|
(probe.shape[1], probe.shape[0]) if probe is not None else (0, 0)
|
|
)
|
|
else:
|
|
self._left_image_size = (0, 0)
|
|
print(
|
|
f"Number of images with detected chessboards for left ({self.left_camera}): ",
|
|
len(self._chessboard_2d_left_points),
|
|
)
|
|
|
|
def create_chessboard_points_right(self) -> None:
|
|
"""Create 3D and 2D chessboard points for right images."""
|
|
|
|
paths = self._right_image_paths_all
|
|
n_total = len(paths)
|
|
detected = 0
|
|
with tqdm(paths, unit="img", dynamic_ncols=True, smoothing=0.05) as pbar:
|
|
for idx, right_image_path in enumerate(pbar, start=1):
|
|
right_image = cv2.imread(right_image_path)
|
|
|
|
right_name = right_image_path.split("/")[-1]
|
|
if right_image is None:
|
|
tqdm.write(f"Warning: Failed to load image {right_name}. Skipping.")
|
|
self._refresh_detection_progress_bar(pbar, idx, n_total, detected)
|
|
continue
|
|
|
|
proc_image = self._preprocess_for_detection(right_image)
|
|
right_corners = self.create_2d_chessboard_points(
|
|
proc_image, self.chessboard_size_right
|
|
)
|
|
|
|
if right_corners is not None:
|
|
detected += 1
|
|
self._chessboard_3d_right_points.append(
|
|
self.create_3d_chessboard_points(
|
|
self.chessboard_size_right, self.square_size_right
|
|
)
|
|
)
|
|
self._chessboard_2d_right_points.append(right_corners)
|
|
self._valid_right_image_paths.append(right_image_path)
|
|
if self.corners_path is not None:
|
|
right_image = self.plot_chessboard(
|
|
right_image,
|
|
right_corners,
|
|
self.chessboard_size_right,
|
|
)
|
|
right_image_name = os.path.basename(right_image_path)
|
|
right_image_path = os.path.join(
|
|
self.corners_path, self._right_subdir, right_image_name
|
|
)
|
|
os.makedirs(os.path.dirname(right_image_path), exist_ok=True)
|
|
cv2.imwrite(right_image_path, right_image)
|
|
if self.local_coords_path is not None:
|
|
right_image = self.plot_local_coordinates(
|
|
right_image,
|
|
right_corners,
|
|
self.chessboard_size_right,
|
|
)
|
|
right_image_name = os.path.basename(right_image_path)
|
|
right_image_path = os.path.join(
|
|
self.local_coords_path, self._right_subdir, right_image_name
|
|
)
|
|
os.makedirs(os.path.dirname(right_image_path), exist_ok=True)
|
|
cv2.imwrite(right_image_path, right_image)
|
|
|
|
self._refresh_detection_progress_bar(pbar, idx, n_total, detected)
|
|
|
|
if self._valid_right_image_paths:
|
|
probe = cv2.imread(self._valid_right_image_paths[-1])
|
|
self._right_image_size = (
|
|
(probe.shape[1], probe.shape[0]) if probe is not None else (0, 0)
|
|
)
|
|
elif self._right_image_paths_all:
|
|
probe = cv2.imread(self._right_image_paths_all[0])
|
|
self._right_image_size = (
|
|
(probe.shape[1], probe.shape[0]) if probe is not None else (0, 0)
|
|
)
|
|
else:
|
|
self._right_image_size = (0, 0)
|
|
|
|
print(
|
|
f"Number of images with detected chessboards for right ({self.right_camera}): ",
|
|
len(self._chessboard_2d_right_points),
|
|
)
|
|
|
|
def detect_ellipse_center(self, image: np.ndarray) -> Tuple[float, float]:
|
|
"""Detect ellipse center in infrared image.
|
|
|
|
Parameters
|
|
----------
|
|
image : np.ndarray
|
|
infrared image
|
|
|
|
Returns
|
|
-------
|
|
Tuple[float, float]
|
|
(cx, cy) center coordinates of detected ellipse, or None if not found
|
|
"""
|
|
gray = cv2.cvtColor(image, cv2.COLOR_BGR2GRAY)
|
|
|
|
# --- Step 1: CLAHE for contrast enhancement ---
|
|
clahe = cv2.createCLAHE(clipLimit=2.0, tileGridSize=(8, 8))
|
|
enhanced = clahe.apply(gray)
|
|
|
|
# --- Step 2: Smooth to remove small noise ---
|
|
blurred = cv2.GaussianBlur(enhanced, (5, 5), 0)
|
|
|
|
# --- Step 3: Binarization (Adaptive or Otsu) ---
|
|
_, binary = cv2.threshold(blurred, 0, 255, cv2.THRESH_BINARY + cv2.THRESH_OTSU)
|
|
# If the circle is dark instead of bright, invert
|
|
white_ratio = np.sum(binary == 255) / binary.size
|
|
if white_ratio > 0.5:
|
|
binary = cv2.bitwise_not(binary)
|
|
|
|
# --- Step 4: Morphological close to fill small holes ---
|
|
kernel = cv2.getStructuringElement(cv2.MORPH_ELLIPSE, (9, 9))
|
|
closed = cv2.morphologyEx(binary, cv2.MORPH_CLOSE, kernel)
|
|
|
|
# --- Step 5: Contour detection ---
|
|
contours, _ = cv2.findContours(closed, cv2.RETR_EXTERNAL, cv2.CHAIN_APPROX_SIMPLE)
|
|
if not contours:
|
|
return None
|
|
|
|
# --- Step 6: Filter for circle-like contours ---
|
|
valid_contours = []
|
|
for cnt in contours:
|
|
area = cv2.contourArea(cnt)
|
|
if area < 100: # Skip tiny noise
|
|
continue
|
|
x, y, w, h = cv2.boundingRect(cnt)
|
|
aspect_ratio = w / h
|
|
if 0.75 < aspect_ratio < 1.25: # roughly circular
|
|
valid_contours.append(cnt)
|
|
|
|
if not valid_contours:
|
|
return None
|
|
|
|
# --- Step 7: Choose largest circular contour ---
|
|
best_contour = max(valid_contours, key=cv2.contourArea)
|
|
|
|
# --- Step 8: Fit ellipse ---
|
|
if len(best_contour) < 5:
|
|
return None
|
|
|
|
ellipse = cv2.fitEllipse(best_contour)
|
|
(cx, cy), (major, minor), angle = ellipse
|
|
|
|
return (cx, cy), ellipse
|
|
|
|
def plot_ellipse(self, image: np.ndarray, ellipse: Tuple) -> np.ndarray:
|
|
"""Plot ellipse on the image.
|
|
|
|
Parameters
|
|
----------
|
|
image : np.ndarray
|
|
image
|
|
ellipse : Tuple
|
|
ellipse parameters from cv2.fitEllipse
|
|
"""
|
|
(cx, cy), (major, minor), angle = ellipse
|
|
result = image.copy()
|
|
cv2.ellipse(result, ellipse, (0, 255, 0), 2)
|
|
cv2.circle(result, (int(cx), int(cy)), 5, (0, 0, 255), -1)
|
|
cv2.putText(result, f"({int(cx)}, {int(cy)})", (int(cx)+10, int(cy)-10),
|
|
cv2.FONT_HERSHEY_SIMPLEX, 0.6, (255, 255, 255), 2)
|
|
return result
|
|
|
|
def create_ellipse_points_left(self) -> None:
|
|
"""Create calibration points for left infrared images using ellipse detection."""
|
|
|
|
last_loaded_shape = None
|
|
paths = self._left_image_paths
|
|
n_total = len(paths)
|
|
detected = 0
|
|
with tqdm(paths, unit="img", dynamic_ncols=True, smoothing=0.05) as pbar:
|
|
for idx, left_image_path in enumerate(pbar, start=1):
|
|
left_image = cv2.imread(left_image_path)
|
|
|
|
left_name = left_image_path.split("/")[-1]
|
|
|
|
if left_image is None:
|
|
tqdm.write(f"Warning: Failed to load image {left_name}. Skipping.")
|
|
self._refresh_detection_progress_bar(pbar, idx, n_total, detected)
|
|
continue
|
|
last_loaded_shape = left_image.shape
|
|
|
|
ellipse_result = self.detect_ellipse_center(left_image)
|
|
|
|
if ellipse_result is not None:
|
|
detected += 1
|
|
(cx, cy), ellipse = ellipse_result
|
|
ellipse_center = np.array([[[cx, cy]]], dtype=np.float32)
|
|
|
|
chessboard_3d = np.array([[0.0, 0.0, 0.0]], dtype=np.float32)
|
|
|
|
self._chessboard_3d_left_points.append(chessboard_3d)
|
|
self._chessboard_2d_left_points.append(ellipse_center)
|
|
self._valid_left_image_paths.append(left_image_path)
|
|
|
|
if self.corners_path is not None:
|
|
left_image = self.plot_ellipse(left_image, ellipse)
|
|
left_image_name = os.path.basename(left_image_path)
|
|
left_image_path_out = os.path.join(
|
|
self.corners_path, self._left_subdir, left_image_name
|
|
)
|
|
os.makedirs(os.path.dirname(left_image_path_out), exist_ok=True)
|
|
cv2.imwrite(left_image_path_out, left_image)
|
|
|
|
self._refresh_detection_progress_bar(pbar, idx, n_total, detected)
|
|
|
|
if last_loaded_shape is not None:
|
|
self._left_image_size = (last_loaded_shape[1], last_loaded_shape[0])
|
|
elif self._left_image_paths:
|
|
probe = cv2.imread(self._left_image_paths[0])
|
|
self._left_image_size = (
|
|
(probe.shape[1], probe.shape[0]) if probe is not None else (0, 0)
|
|
)
|
|
else:
|
|
self._left_image_size = (0, 0)
|
|
print("Number of images with detected ellipses for LC: ", len(self._chessboard_2d_left_points))
|
|
|
|
def create_ellipse_points_right(self) -> None:
|
|
"""Create calibration points for right infrared images using ellipse detection."""
|
|
|
|
last_loaded_shape = None
|
|
paths = self._right_image_paths_all
|
|
n_total = len(paths)
|
|
detected = 0
|
|
with tqdm(paths, unit="img", dynamic_ncols=True, smoothing=0.05) as pbar:
|
|
for idx, right_image_path in enumerate(pbar, start=1):
|
|
right_image = cv2.imread(right_image_path)
|
|
|
|
right_name = right_image_path.split("/")[-1]
|
|
if right_image is None:
|
|
tqdm.write(f"Warning: Failed to load image {right_name}. Skipping.")
|
|
self._refresh_detection_progress_bar(pbar, idx, n_total, detected)
|
|
continue
|
|
last_loaded_shape = right_image.shape
|
|
|
|
ellipse_result = self.detect_ellipse_center(right_image)
|
|
|
|
if ellipse_result is not None:
|
|
detected += 1
|
|
(cx, cy), ellipse = ellipse_result
|
|
ellipse_center = np.array([[[cx, cy]]], dtype=np.float32)
|
|
|
|
chessboard_3d = np.array([[0.0, 0.0, 0.0]], dtype=np.float32)
|
|
|
|
self._chessboard_3d_right_points.append(chessboard_3d)
|
|
self._chessboard_2d_right_points.append(ellipse_center)
|
|
self._valid_right_image_paths.append(right_image_path)
|
|
|
|
if self.corners_path is not None:
|
|
right_image = self.plot_ellipse(right_image, ellipse)
|
|
right_image_name = os.path.basename(right_image_path)
|
|
right_image_path_out = os.path.join(
|
|
self.corners_path, self._right_subdir, right_image_name
|
|
)
|
|
os.makedirs(os.path.dirname(right_image_path_out), exist_ok=True)
|
|
cv2.imwrite(right_image_path_out, right_image)
|
|
|
|
self._refresh_detection_progress_bar(pbar, idx, n_total, detected)
|
|
|
|
if last_loaded_shape is not None:
|
|
self._right_image_size = (last_loaded_shape[1], last_loaded_shape[0])
|
|
elif self._right_image_paths_all:
|
|
probe = cv2.imread(self._right_image_paths_all[0])
|
|
self._right_image_size = (
|
|
(probe.shape[1], probe.shape[0]) if probe is not None else (0, 0)
|
|
)
|
|
else:
|
|
self._right_image_size = (0, 0)
|
|
|
|
print(
|
|
f"Number of images with detected ellipses for right ({self.right_camera}): ",
|
|
len(self._chessboard_2d_right_points),
|
|
)
|
|
|
|
def _dedupe_left_one_per_scan_for_ir(self) -> None:
|
|
"""With IR_scan_NNN (one IR per scan), keep one LC frame per scan ID."""
|
|
if self.right_camera != "ir":
|
|
return
|
|
packed = list(
|
|
zip(
|
|
self._valid_left_image_paths,
|
|
self._chessboard_2d_left_points,
|
|
self._chessboard_3d_left_points,
|
|
)
|
|
)
|
|
packed.sort(key=lambda x: x[0])
|
|
seen = set()
|
|
kept = []
|
|
for path, p2d, p3d in packed:
|
|
key = self.extract_key_lc(path)
|
|
if key is None:
|
|
continue
|
|
if key in seen:
|
|
continue
|
|
seen.add(key)
|
|
kept.append((path, p2d, p3d))
|
|
if not kept:
|
|
return
|
|
self._valid_left_image_paths = [x[0] for x in kept]
|
|
self._chessboard_2d_left_points = [x[1] for x in kept]
|
|
self._chessboard_3d_left_points = [x[2] for x in kept]
|
|
|
|
def build_pairs_cal(self) -> None:
|
|
"""
|
|
Build pairs of LC ↔ (RC | RGB | IR) images based on filenames,
|
|
and align their corresponding chessboard points.
|
|
"""
|
|
self._dedupe_left_one_per_scan_for_ir()
|
|
warned_stereo_board_model = False
|
|
# --- Build lookup for RIGHT images ---
|
|
right_lookup = {}
|
|
for j, path in enumerate(self._valid_right_image_paths):
|
|
key = self.extract_key_other(path) # use @staticmethod via self
|
|
if key is not None:
|
|
right_lookup[key] = (path, j)
|
|
|
|
# --- Match LC against RIGHT ---
|
|
for i, left_path in enumerate(self._valid_left_image_paths):
|
|
key = self.extract_key_lc(left_path) # use @staticmethod via self
|
|
if key is None:
|
|
continue
|
|
|
|
if key in right_lookup:
|
|
right_path, j = right_lookup[key]
|
|
|
|
left_corners = self._chessboard_2d_left_points[i]
|
|
right_corners = self._chessboard_2d_right_points[j]
|
|
if left_corners.shape != right_corners.shape:
|
|
tqdm.write(
|
|
f"Skipping pair {key}: corner count mismatch "
|
|
f"({self.left_camera} {left_corners.shape[0]} vs "
|
|
f"{self.right_camera} {right_corners.shape[0]}). "
|
|
"Use the same board geometry for stereo pairs."
|
|
)
|
|
continue
|
|
if not warned_stereo_board_model and (
|
|
self.chessboard_size_left != self.chessboard_size_right
|
|
or self.square_size_left != self.square_size_right
|
|
):
|
|
tqdm.write(
|
|
"Per-camera board sizes differ; stereo object points use left: "
|
|
f"{self.chessboard_size_left} @ {self.square_size_left}m."
|
|
)
|
|
warned_stereo_board_model = True
|
|
|
|
# --- Append image paths ---
|
|
self._left_image_paths_pairs.append(left_path)
|
|
self._right_image_paths_all_pairs.append(right_path)
|
|
|
|
# --- Append chessboard 2D points ---
|
|
self._chessboard_2d_left_points_pairs.append(left_corners)
|
|
self._chessboard_2d_right_points_pairs.append(right_corners)
|
|
|
|
# --- Append chessboard 3D points (stereo requires one object model) ---
|
|
pts3d = self.create_3d_chessboard_points(
|
|
self.chessboard_size_left, self.square_size_left
|
|
)
|
|
self._chessboard_3d_left_points_pairs.append(pts3d)
|
|
self._chessboard_3d_right_points_pairs.append(pts3d)
|
|
|
|
print(f"Found {len(self._left_image_paths_pairs)} common image pairs with valid chessboard corners.")
|
|
|
|
|
|
|
|
|
|
def build_pairs_rec(self) -> None:
|
|
def strip_prefix(filename):
|
|
# Assuming the prefix is always the first two characters (like "lc" or "rc")
|
|
return filename[2:] # Remove the first two characters
|
|
|
|
|
|
valid_left_images = set([strip_prefix(path.split('/')[-1]) for path in self._left_image_paths])
|
|
valid_right_images = set([strip_prefix(path.split('/')[-1]) for path in self._right_image_paths_all])
|
|
common_images = valid_left_images.intersection(valid_right_images)
|
|
# Filter for common image pairs and corresponding points
|
|
for i, left_path in enumerate(self._left_image_paths):
|
|
left_name = strip_prefix(left_path.split('/')[-1])
|
|
if left_name in common_images:
|
|
# Find the corresponding right image and its index
|
|
for j, right_path in enumerate(self._right_image_paths_all):
|
|
right_name = strip_prefix(right_path.split('/')[-1])
|
|
if left_name == right_name:
|
|
# Append the common left and right paths and points
|
|
self._left_image_paths_pairs.append(left_path)
|
|
self._right_image_paths_all_pairs.append(right_path)
|
|
break
|
|
|
|
# Print or return the number of common image pairs and points
|
|
print(f"Found {len(self._left_image_paths_pairs)} common image pairs.")
|
|
|
|
left_image_paths = set([path for path in self._left_image_paths])
|
|
right_image_paths = set(self._valid_right_image_paths)
|
|
left_image_paths_pairs = set([path for path in self._left_image_paths_pairs])
|
|
right_image_paths_pairs = set([path for path in self._right_image_paths_all_pairs])
|
|
unmatched_left_paths = left_image_paths - left_image_paths_pairs
|
|
unmatched_right_paths = right_image_paths - right_image_paths_pairs
|
|
for path in unmatched_left_paths:
|
|
if self.ncb_path is not None:
|
|
left_image_path = os.path.join(
|
|
self.ncb_path, self._left_subdir, os.path.basename(path)
|
|
)
|
|
os.makedirs(os.path.dirname(left_image_path), exist_ok=True)
|
|
shutil.copy(path, left_image_path)
|
|
|
|
|
|
|
|
for path in unmatched_right_paths:
|
|
if self.ncb_path is not None:
|
|
right_image_path = os.path.join(
|
|
self.ncb_path, self._right_subdir, os.path.basename(path)
|
|
)
|
|
os.makedirs(os.path.dirname(right_image_path), exist_ok=True)
|
|
shutil.copy(path, right_image_path)
|
|
|
|
def create_chessboard_points_pairs(self) -> Tuple[np.ndarray, np.ndarray]:
|
|
"""Create 3D and 2D chessboard points for left and right images."""
|
|
|
|
# Match left/right images by filename after stripping camera prefixes
|
|
left_dict = {
|
|
self._strip_left_filename_prefix(os.path.basename(p)): p
|
|
for p in self._left_image_paths
|
|
}
|
|
right_dict = {os.path.basename(p).replace("rc", ""): p for p in self._right_image_paths_all}
|
|
# Find common keys by matching filenames without prefixes
|
|
common_keys = set(left_dict.keys()) & set(right_dict.keys())
|
|
keys_sorted = sorted(common_keys, key=str)
|
|
n_total = len(keys_sorted)
|
|
detected = 0
|
|
last_left_shape = None
|
|
last_right_shape = None
|
|
|
|
with tqdm(keys_sorted, unit="pair", dynamic_ncols=True, smoothing=0.05) as pbar:
|
|
for idx, key in enumerate(pbar, start=1):
|
|
left_name = left_dict[key]
|
|
right_name = right_dict[key]
|
|
|
|
left_image = cv2.imread(left_name)
|
|
right_image = cv2.imread(right_name)
|
|
|
|
if left_image is None or right_image is None:
|
|
tqdm.write(
|
|
f"Warning: Failed to load images {left_name} or {right_name}. Skipping this pair."
|
|
)
|
|
self._refresh_detection_progress_bar(pbar, idx, n_total, detected)
|
|
continue
|
|
|
|
last_left_shape = left_image.shape
|
|
last_right_shape = right_image.shape
|
|
|
|
left_proc = self._preprocess_for_detection(left_image)
|
|
right_proc = self._preprocess_for_detection(right_image)
|
|
left_corners = self.create_2d_chessboard_points(
|
|
left_proc, self.chessboard_size_left
|
|
)
|
|
right_corners = self.create_2d_chessboard_points(
|
|
right_proc, self.chessboard_size_right
|
|
)
|
|
|
|
if left_corners is not None and right_corners is not None:
|
|
if left_corners.shape != right_corners.shape:
|
|
tqdm.write(
|
|
f"Skipping pair {key}: corner count mismatch "
|
|
f"({self.left_camera} {left_corners.shape[0]} vs "
|
|
f"{self.right_camera} {right_corners.shape[0]})."
|
|
)
|
|
self._refresh_detection_progress_bar(
|
|
pbar, idx, n_total, detected
|
|
)
|
|
continue
|
|
detected += 1
|
|
second_elements_array1 = left_corners[:, :, 1]
|
|
second_elements_array2 = right_corners[:, :, 1]
|
|
|
|
difference = second_elements_array1 - second_elements_array2
|
|
|
|
updated_second_elements_array2 = second_elements_array2 + difference
|
|
|
|
right_corners2 = np.copy(right_corners)
|
|
|
|
right_corners2[:, :, 1] = updated_second_elements_array2
|
|
|
|
pts3d = self.create_3d_chessboard_points(
|
|
self.chessboard_size_left, self.square_size_left
|
|
)
|
|
self._chessboard_3d_left_points_pairs.append(pts3d)
|
|
self._chessboard_2d_left_points_pairs.append(left_corners)
|
|
self._chessboard_3d_right_points_pairs.append(pts3d)
|
|
self._chessboard_2d_right_points_pairs.append(right_corners)
|
|
if self.corners_path is not None:
|
|
left_image = self.plot_chessboard(
|
|
left_image, left_corners, self.chessboard_size_left
|
|
)
|
|
|
|
right_image = self.plot_chessboard(
|
|
right_image, right_corners, self.chessboard_size_right
|
|
)
|
|
right_image = self.plot_chessboard(
|
|
right_image, right_corners2, self.chessboard_size_right
|
|
)
|
|
|
|
right_image_name = os.path.basename(right_name)
|
|
right_image_path = os.path.join(
|
|
self.corners_path, self._right_subdir, right_image_name
|
|
)
|
|
os.makedirs(os.path.dirname(right_image_path), exist_ok=True)
|
|
cv2.imwrite(right_image_path, right_image)
|
|
|
|
left_image_name = os.path.basename(left_name)
|
|
left_image_path = os.path.join(
|
|
self.corners_path, self._left_subdir, left_image_name
|
|
)
|
|
os.makedirs(os.path.dirname(left_image_path), exist_ok=True)
|
|
cv2.imwrite(left_image_path, left_image)
|
|
else:
|
|
if self.ncb_path is not None:
|
|
left_image_path = os.path.join(
|
|
self.ncb_path, self._left_subdir, os.path.basename(left_name)
|
|
)
|
|
right_image_path = os.path.join(
|
|
self.ncb_path, self._right_subdir, os.path.basename(right_name)
|
|
)
|
|
os.makedirs(os.path.dirname(left_image_path), exist_ok=True)
|
|
os.makedirs(os.path.dirname(right_image_path), exist_ok=True)
|
|
tqdm.write(
|
|
f"Chessboard not detected in both {left_name} and {right_name}. "
|
|
f"Moving to {os.path.dirname(left_image_path)} and {os.path.dirname(right_image_path)}."
|
|
)
|
|
shutil.copy(left_name, left_image_path)
|
|
shutil.copy(right_name, right_image_path)
|
|
|
|
self._refresh_detection_progress_bar(pbar, idx, n_total, detected)
|
|
|
|
if last_left_shape is not None and last_right_shape is not None:
|
|
self._left_image_size = (last_left_shape[1], last_left_shape[0])
|
|
self._right_image_size = (last_right_shape[1], last_right_shape[0])
|
|
else:
|
|
self._left_image_size = (0, 0)
|
|
self._right_image_size = (0, 0)
|
|
unmatched_left = set(left_dict.keys()) - common_keys
|
|
unmatched_right = set(right_dict.keys()) - common_keys
|
|
|
|
for key in unmatched_left:
|
|
if self.ncb_path is not None:
|
|
left_image_path = os.path.join(
|
|
self.ncb_path, self._left_subdir, os.path.basename(left_dict[key])
|
|
)
|
|
os.makedirs(os.path.dirname(left_image_path), exist_ok=True)
|
|
shutil.copy(left_dict[key], left_image_path)
|
|
|
|
|
|
|
|
for key in unmatched_right:
|
|
if self.ncb_path is not None:
|
|
right_image_path = os.path.join(
|
|
self.ncb_path, self._right_subdir, os.path.basename(right_dict[key])
|
|
)
|
|
os.makedirs(os.path.dirname(right_image_path), exist_ok=True)
|
|
shutil.copy(right_dict[key], right_image_path)
|
|
|
|
self._left_image_names = os.listdir(self._left_image_path)
|
|
self._right_image_names = os.listdir(self._right_image_path)
|
|
self._left_image_names = [
|
|
name for name in self._left_image_names if name.endswith((".bmp", '.png', '.jpg'))
|
|
]
|
|
self._right_image_names = [
|
|
name for name in self._right_image_names if name.endswith((".bmp", '.png', '.jpg'))
|
|
]
|
|
self._left_image_names.sort()
|
|
self._right_image_names.sort()
|
|
self._left_image_paths_pairs = [
|
|
os.path.join(self._left_image_path, name) for name in self._left_image_names
|
|
]
|
|
self._right_image_paths_all_pairs = [
|
|
os.path.join(self._right_image_path, name)
|
|
for name in self._right_image_names
|
|
]
|
|
|
|
print("Number of image pairs: ", len(self._chessboard_2d_left_points_pairs))
|
|
return self._chessboard_2d_left_points_pairs, self._chessboard_2d_right_points_pairs
|
|
|
|
def calibrate(self) -> dict:
|
|
"""Calibrate stereo camera.
|
|
|
|
The calibration process consists of the following steps:
|
|
1. Calibrate left camera
|
|
2. Calibrate right camera
|
|
3. Stereo calibration
|
|
|
|
|
|
Returns
|
|
-------
|
|
dict_keys(['Transformation', 'Essential', 'Fundamental', 'MeanError', 'SquareSize', 'BoardSize', 'Objpoints', 'L_Intrinsic', 'L_Distortion', 'L_DistortionROI', 'L_DistortionIntrinsic', 'L_RotVektor', 'L_RotMatrix', 'L_Extrinsics', 'L_TransVektor', 'L_Errors', 'L_MeanError', 'R_Intrinsic', 'R_Distortion', 'R_DistortionROI', 'R_DistortionIntrinsic', 'R_RotVektor', 'R_RotMatrix', 'R_Extrinsics', 'R_TransVektor', 'R_Errors', 'R_MeanError', 'L_Imgpoints', 'R_Imgpoints'])
|
|
"""
|
|
if len(self._left_image_paths_pairs) == 0:
|
|
raise RuntimeError(
|
|
"No stereo pairs with detected chessboard corners. "
|
|
"Check --chessboard_size (or per-camera sizes), preprocessing, and that the board is visible in both cameras. "
|
|
f"For {self.left_camera}+IR, left files like scan000001_* pair with "
|
|
"IR_scan_000001.png by scan id; only one left frame per scan is used "
|
|
"when multiple sequences exist."
|
|
)
|
|
def CalibrateCamera(image_size, chessboard_2d_points, chessboard_3d_points):
|
|
CameraParams = {}
|
|
flags = 0
|
|
(ret, mtx, dist, rvecs, tvecs) = cv2.calibrateCamera(chessboard_3d_points, chessboard_2d_points, image_size, None, None, flags=flags)
|
|
|
|
Rmtx = []; Tmtx = []; k = 0
|
|
for r in rvecs:
|
|
Rmtx.append(cv2.Rodrigues(r)[0])
|
|
Tmtx.append(np.vstack((np.hstack((Rmtx[k],tvecs[k])),np.array([0,0,0,1]))))
|
|
k += 1
|
|
|
|
newmtx, roi = cv2.getOptimalNewCameraMatrix(mtx,dist,image_size,1,image_size)
|
|
|
|
if np.sum(roi) == 0:
|
|
roi = (0,0,image_size[0]-1,image_size[1]-1)
|
|
|
|
CameraParams['Intrinsic'] = mtx
|
|
CameraParams['Distortion'] = dist
|
|
CameraParams['DistortionROI'] = roi
|
|
CameraParams['DistortionIntrinsic'] = newmtx
|
|
CameraParams['RotVektor'] = rvecs
|
|
CameraParams['RotMatrix'] = Rmtx
|
|
CameraParams['Extrinsics'] = Tmtx
|
|
CameraParams['TransVektor'] = tvecs
|
|
|
|
return CameraParams
|
|
|
|
def CalculateErrors(
|
|
params, chessboard_2d_points, chessboard_size, square_size
|
|
):
|
|
imgp = np.array(chessboard_2d_points)
|
|
imgp = imgp.reshape((imgp.shape[0], imgp.shape[1], imgp.shape[3]))
|
|
objp = np.array(
|
|
self.create_3d_chessboard_points(chessboard_size, square_size)
|
|
)
|
|
K = np.array(params['Intrinsic'])
|
|
D = np.array(params['Distortion'])
|
|
R = np.array(params['RotVektor'])
|
|
T = np.array(params['TransVektor'])
|
|
N = imgp.shape[0]
|
|
|
|
imgpNew = []
|
|
for i in range(N):
|
|
temp, _ = cv2.projectPoints(objp, R[i], T[i], K, D)
|
|
imgpNew.append(temp.reshape((temp.shape[0], temp.shape[2])))
|
|
imgpNew = np.array(imgpNew)
|
|
|
|
err = []
|
|
for i in range(N):
|
|
err.append(imgp[i] - imgpNew[i])
|
|
err = np.array(err)
|
|
|
|
def RMSE(err):
|
|
return np.sqrt(np.mean(np.sum(err**2, axis=1)))
|
|
|
|
errall = np.copy(err[0])
|
|
rmsePerView = [RMSE(err[0])]
|
|
for i in range(1,N):
|
|
errall = np.vstack((errall, err[i]))
|
|
rmsePerView.append(RMSE(err[i]))
|
|
|
|
rmseAll = RMSE(errall)
|
|
return rmsePerView, rmseAll
|
|
|
|
# left camera calibration
|
|
print("Calibrating left camera...")
|
|
if self._chessboard_2d_left_points == [] :
|
|
Left_Params = CalibrateCamera(self._left_image_size, self._chessboard_2d_left_points_pairs, self._chessboard_3d_left_points_pairs)
|
|
Left_Errors, Left_MeanError = CalculateErrors(
|
|
Left_Params,
|
|
self._chessboard_2d_left_points_pairs,
|
|
self.chessboard_size_left,
|
|
self.square_size_left,
|
|
)
|
|
Left_Params['Imgpoints'] = self._chessboard_2d_left_points_pairs
|
|
else :
|
|
Left_Params = CalibrateCamera(self._left_image_size, self._chessboard_2d_left_points, self._chessboard_3d_left_points)
|
|
Left_Errors, Left_MeanError = CalculateErrors(
|
|
Left_Params,
|
|
self._chessboard_2d_left_points,
|
|
self.chessboard_size_left,
|
|
self.square_size_left,
|
|
)
|
|
Left_Params['Imgpoints'] = self._chessboard_2d_left_points
|
|
np.set_printoptions(suppress=True, precision=5)
|
|
print('Intrinsic Matrix:')
|
|
print(Left_Params['Intrinsic'])
|
|
print('\nDistortion Parameters:')
|
|
print(Left_Params['Distortion'])
|
|
print('\nExtrinsic Matrix from 1.Image:')
|
|
print(Left_Params['Extrinsics'][0])
|
|
print('Reprojection Error Left: {:.4f}'.format(Left_MeanError))
|
|
|
|
|
|
# right camera calibration
|
|
print("Calibrating right camera...")
|
|
if self._chessboard_2d_right_points == [] :
|
|
Right_Params = CalibrateCamera(self._right_image_size, self._chessboard_2d_right_points_pairs, self._chessboard_3d_right_points_pairs)
|
|
Right_Errors, Right_MeanError = CalculateErrors(
|
|
Right_Params,
|
|
self._chessboard_2d_right_points_pairs,
|
|
self.chessboard_size_right,
|
|
self.square_size_right,
|
|
)
|
|
Right_Params['Imgpoints'] = self._chessboard_2d_right_points_pairs
|
|
else :
|
|
Right_Params = CalibrateCamera(self._right_image_size, self._chessboard_2d_right_points, self._chessboard_3d_right_points)
|
|
Right_Errors, Right_MeanError = CalculateErrors(
|
|
Right_Params,
|
|
self._chessboard_2d_right_points,
|
|
self.chessboard_size_right,
|
|
self.square_size_right,
|
|
)
|
|
Right_Params['Imgpoints'] = self._chessboard_2d_right_points
|
|
np.set_printoptions(suppress=True, precision=5)
|
|
print('Intrinsic Matrix:')
|
|
print(Right_Params['Intrinsic'])
|
|
print('\nDistortion Parameters:')
|
|
print(Right_Params['Distortion'])
|
|
print('\nExtrinsic Matrix from 1.Image:')
|
|
print(Right_Params['Extrinsics'][0])
|
|
print('Reprojection Error Right: {:.4f}'.format(Right_MeanError))
|
|
|
|
|
|
Left_Params['Errors'] = Left_Errors
|
|
Left_Params['MeanError'] = Left_MeanError
|
|
|
|
Right_Params['Errors'] = Right_Errors
|
|
Right_Params['MeanError'] = Right_MeanError
|
|
|
|
|
|
# --- stereo calibration ---
|
|
print("Calibrating stereo camera...")
|
|
|
|
flags = cv2.CALIB_FIX_INTRINSIC
|
|
criteria = (cv2.TERM_CRITERIA_MAX_ITER + cv2.TERM_CRITERIA_EPS, 30, 0.001)
|
|
|
|
(
|
|
ret_stereo,
|
|
newcameramtx_left,
|
|
dist_left,
|
|
newcameramtx_right,
|
|
dist_right,
|
|
rot,
|
|
trans,
|
|
essential,
|
|
fundamental,
|
|
) = cv2.stereoCalibrate(
|
|
self._chessboard_3d_left_points_pairs,
|
|
self._chessboard_2d_left_points_pairs,
|
|
self._chessboard_2d_right_points_pairs,
|
|
Left_Params['Intrinsic'],
|
|
Left_Params['Distortion'],
|
|
Right_Params['Intrinsic'],
|
|
Right_Params['Distortion'],
|
|
self._left_image_size,
|
|
criteria=criteria,
|
|
flags=flags,
|
|
)
|
|
|
|
T = np.vstack((np.hstack((rot, trans)), np.array([0, 0, 0, 1])))
|
|
|
|
print("\n--- Stereo Calibration Results ---")
|
|
print("Translation vector:\n", trans)
|
|
print("Rotation matrix:\n", rot)
|
|
print("Mean reprojection error:", ret_stereo)
|
|
|
|
# --- Compute Q ---
|
|
print("\nComputing stereo rectification + Q matrix...")
|
|
|
|
R1, R2, P1, P2, Q, roi1, roi2 = cv2.stereoRectify(
|
|
Left_Params['Intrinsic'],
|
|
Left_Params['Distortion'],
|
|
Right_Params['Intrinsic'],
|
|
Right_Params['Distortion'],
|
|
self._left_image_size,
|
|
rot,
|
|
trans,
|
|
flags=0,
|
|
alpha=1
|
|
)
|
|
|
|
# ensure clean numpy array
|
|
self._Q = np.array(Q, dtype=np.float64)
|
|
|
|
print("\n--- Q MATRIX ---")
|
|
print(self._Q)
|
|
print("Q shape:", self._Q.shape)
|
|
print("Q dtype:", self._Q.dtype)
|
|
|
|
# --- sanity checks ---
|
|
print("\n--- Q sanity check ---")
|
|
print("Q[2,3] (focal length term):", self._Q[2, 3])
|
|
print("Q[3,2] (baseline term):", self._Q[3, 2])
|
|
|
|
if self._Q.shape != (4, 4):
|
|
raise RuntimeError("[ERROR] Q is not 4x4!")
|
|
|
|
if self._Q[3, 2] == 0:
|
|
print("[WARNING] Baseline is ZERO → depth will fail!")
|
|
|
|
# --- store stereo params ---
|
|
Parameters = {
|
|
'Translation': trans,
|
|
'Rotation': rot,
|
|
'Transformation': T,
|
|
'Essential': essential,
|
|
'Fundamental': fundamental,
|
|
'MeanError': ret_stereo,
|
|
'SquareSize': self.square_size_left,
|
|
'BoardSize': self.chessboard_size_left,
|
|
'L_SquareSize': self.square_size_left,
|
|
'R_SquareSize': self.square_size_right,
|
|
'L_BoardSize': self.chessboard_size_left,
|
|
'R_BoardSize': self.chessboard_size_right,
|
|
'Objpoints': self.create_3d_chessboard_points(
|
|
self.chessboard_size_left, self.square_size_left
|
|
),
|
|
'Q': self._Q # ✅ store Q in parameters (for NPZ)
|
|
}
|
|
|
|
# add left params
|
|
for key, value in Left_Params.items():
|
|
Parameters[f"L_{key}"] = value
|
|
|
|
# add right params
|
|
for key, value in Right_Params.items():
|
|
Parameters[f"R_{key}"] = value
|
|
|
|
self._Parameters = Parameters
|
|
|
|
return Parameters
|
|
|
|
|
|
def save_stereo_calibration(self) -> None:
|
|
print("[DEBUG] save_stereo_calibration() CALLED")
|
|
"""Save stereo calibration to pair-specific NPZ/YAML/Q (including Q)."""
|
|
print("\nSaving stereo calibration...")
|
|
if not hasattr(self, "_Q") or self._Q is None:
|
|
raise RuntimeError("[ERROR] Q missing! Run calibration first.")
|
|
# ensure Q is clean
|
|
Q_clean = np.array(self._Q, dtype=np.float64)
|
|
self._Parameters['Q'] = Q_clean
|
|
|
|
pair_tag = f"{self.left_camera}-{self.right_camera}"
|
|
|
|
# --- Save pair-specific NPZ ---
|
|
npz_path = os.path.join(self.input_path, "params", f"{pair_tag}_parameters.npz")
|
|
os.makedirs(os.path.dirname(npz_path), exist_ok=True)
|
|
np.savez(npz_path, **self._Parameters)
|
|
print(f"[INFO] Saved NPZ → {npz_path}")
|
|
|
|
# --- Save pair-specific YAML ---
|
|
yaml_path = os.path.join(self.input_path, "params", f"{pair_tag}_stereo_cam_model.yaml")
|
|
fs = cv2.FileStorage(yaml_path, cv2.FILE_STORAGE_WRITE)
|
|
fs.write("L_DistortionIntrinsic", self._Parameters['L_DistortionIntrinsic'])
|
|
fs.write("L_Intrinsic", self._Parameters['L_Intrinsic'])
|
|
fs.write("L_Distortion", self._Parameters['L_Distortion'])
|
|
fs.write("R_DistortionIntrinsic", self._Parameters['R_DistortionIntrinsic'])
|
|
fs.write("R_Intrinsic", self._Parameters['R_Intrinsic'])
|
|
fs.write("R_Distortion", self._Parameters['R_Distortion'])
|
|
fs.write("Rotation", self._Parameters['Transformation'][:3, :3])
|
|
fs.write("Translation", self._Parameters['Transformation'][:3, 3:])
|
|
# --- Save Q (FIXED) ---
|
|
fs.write("Q", Q_clean)
|
|
print("[INFO] Q written to YAML")
|
|
fs.release()
|
|
print(f"[INFO] Saved YAML → {yaml_path}")
|
|
|
|
# --- Save pair-specific Q-matrix for use in disparity ---
|
|
cvstore_path = os.path.join(os.path.dirname(npz_path), f"{pair_tag}_Q.cvstore")
|
|
fs2 = cv2.FileStorage(cvstore_path, cv2.FILE_STORAGE_WRITE)
|
|
fs2.write("Q", Q_clean)
|
|
fs2.release()
|
|
print(f"[INFO] Saved Q matrix separately → {cvstore_path}")
|
|
|
|
|
|
def read_stereo_calibration_yaml(self, yaml_path: str) -> None:
|
|
"""Read stereo calibration from YAML file safely."""
|
|
|
|
print("\nReading stereo calibration YAML...")
|
|
|
|
fs = cv2.FileStorage(yaml_path, cv2.FILE_STORAGE_READ)
|
|
|
|
self._Parameters = {}
|
|
|
|
for key in fs.root().keys():
|
|
node = fs.getNode(key)
|
|
|
|
if node.empty():
|
|
print(f"[WARNING] Empty node: {key}")
|
|
continue
|
|
|
|
mat = node.mat()
|
|
|
|
if mat is None:
|
|
print(f"[WARNING] Could not read: {key}")
|
|
continue
|
|
|
|
self._Parameters[key] = mat
|
|
print(f"[INFO] Loaded: {key}, shape={mat.shape}")
|
|
|
|
fs.release()
|
|
|
|
# --- Check Q ---
|
|
if "Q" in self._Parameters:
|
|
Q = self._Parameters["Q"]
|
|
|
|
print("\n--- Loaded Q MATRIX ---")
|
|
print(Q)
|
|
print("Shape:", Q.shape)
|
|
print("dtype:", Q.dtype)
|
|
|
|
print("\n--- Q sanity check ---")
|
|
print("Q[2,3]:", Q[2, 3])
|
|
print("Q[3,2]:", Q[3, 2])
|
|
|
|
if Q.shape != (4, 4):
|
|
print("[ERROR] Q is not 4x4!")
|
|
|
|
if Q[3, 2] == 0:
|
|
print("[ERROR] Baseline is zero → INVALID Q")
|
|
|
|
else:
|
|
print("[ERROR] Q NOT FOUND in YAML!")
|
|
|
|
def read_stereo_calibration_npz(self, npz_path: str) -> None:
|
|
"""Read stereo calibration from npz file.
|
|
|
|
Parameters
|
|
----------
|
|
npz_path : str
|
|
path to npz file
|
|
|
|
"""
|
|
|
|
print("Reading stereo calibration...")
|
|
self._Parameters = dict(np.load(npz_path))
|
|
print(self._Parameters.keys())
|
|
|
|
def rectify_images(
|
|
self, left_image: np.ndarray, right_image: np.ndarray, left_image_size, right_image_size
|
|
) -> Tuple[np.ndarray, np.ndarray]:
|
|
"""Rectify left and right images using stereo mapping.
|
|
|
|
Parameters
|
|
----------
|
|
left_image : np.ndarray
|
|
left image
|
|
right_image : np.ndarray
|
|
right image
|
|
|
|
Returns
|
|
-------
|
|
Tuple[np.ndarray, np.ndarray]
|
|
left_rectified, right_rectified
|
|
"""
|
|
|
|
# stereo rectification
|
|
print("Rectifying stereo camera...")
|
|
rect_left, rect_right, proj_left, proj_right, Q, roi_left, roi_right = (
|
|
cv2.stereoRectify(
|
|
self._Parameters['L_Intrinsic'],
|
|
self._Parameters['L_Distortion'],
|
|
self._Parameters['R_Intrinsic'],
|
|
self._Parameters['R_Distortion'],
|
|
left_image_size,
|
|
self._Parameters['Rotation'],
|
|
self._Parameters['Translation'],
|
|
flags=0,
|
|
alpha=1
|
|
)
|
|
)
|
|
self._Q = Q
|
|
print ("roi_left: ", roi_left)
|
|
print ("roi_right: ", roi_right)
|
|
|
|
# stereo mapping
|
|
print("Mapping left and right cameras...")
|
|
stereo_map_left = cv2.initUndistortRectifyMap(
|
|
self._Parameters['L_Intrinsic'],
|
|
self._Parameters['L_Distortion'],
|
|
rect_left,
|
|
proj_left,
|
|
left_image_size,
|
|
cv2.CV_32FC1,
|
|
)
|
|
stereo_map_right = cv2.initUndistortRectifyMap(
|
|
self._Parameters['R_Intrinsic'],
|
|
self._Parameters['R_Distortion'],
|
|
rect_right,
|
|
proj_right,
|
|
right_image_size,
|
|
cv2.CV_32FC1,
|
|
)
|
|
|
|
|
|
print("Calibration complete!")
|
|
self._stereo_map_left = stereo_map_left
|
|
self._stereo_map_right = stereo_map_right
|
|
self._roi_left = roi_left
|
|
self._roi_right = roi_right
|
|
|
|
# rectify images
|
|
left_rectified = cv2.remap(
|
|
left_image,
|
|
self._stereo_map_left[0],
|
|
self._stereo_map_left[1],
|
|
cv2.INTER_LINEAR
|
|
)
|
|
right_rectified = cv2.remap(
|
|
right_image,
|
|
self._stereo_map_right[0],
|
|
self._stereo_map_right[1],
|
|
cv2.INTER_LINEAR
|
|
)
|
|
|
|
return left_rectified, right_rectified
|
|
|
|
def rectify_calibration_images(self) -> None:
|
|
"""Rectify calibration images and save them to disk.
|
|
|
|
The rectified images are stored in separate directories as follows:
|
|
- input_path
|
|
- rectified
|
|
- lc
|
|
- lc_0000.bmp
|
|
- lc_0001.bmp
|
|
- ...
|
|
- rc
|
|
- rc_0000.bmp
|
|
- rc_0001.bmp
|
|
- ...
|
|
"""
|
|
if not self.troubleshooting:
|
|
print("Troubleshooting is disabled; skipping rectified debug image export.")
|
|
return
|
|
print("Rectifying images...")
|
|
pairs = list(
|
|
zip(self._left_image_paths_pairs, self._right_image_paths_all_pairs)
|
|
)
|
|
n_total = len(pairs)
|
|
saved = 0
|
|
with tqdm(pairs, unit="pair", dynamic_ncols=True, smoothing=0.05) as pbar:
|
|
for idx, (left_image_path, right_image_path) in enumerate(pbar, start=1):
|
|
left_image = cv2.imread(left_image_path)
|
|
right_image = cv2.imread(right_image_path)
|
|
if left_image is None or right_image is None:
|
|
tqdm.write(
|
|
f"Warning: Failed to load rectification pair "
|
|
f"{os.path.basename(left_image_path)} / {os.path.basename(right_image_path)}."
|
|
)
|
|
self._refresh_detection_progress_bar(
|
|
pbar, idx, n_total, saved, label="saved"
|
|
)
|
|
continue
|
|
left_image_size = (left_image.shape[1], left_image.shape[0])
|
|
right_image_size = (right_image.shape[1], right_image.shape[0])
|
|
left_rectified, right_rectified = self.rectify_images(
|
|
left_image, right_image, left_image_size, right_image_size
|
|
)
|
|
left_image_name = os.path.basename(left_image_path)
|
|
right_image_name = os.path.basename(right_image_path)
|
|
left_image_path = os.path.join(
|
|
self.input_path, "rectified", self._left_subdir, left_image_name
|
|
)
|
|
right_image_path = os.path.join(
|
|
self.input_path, "rectified", self._right_subdir, right_image_name
|
|
)
|
|
os.makedirs(os.path.dirname(left_image_path), exist_ok=True)
|
|
os.makedirs(os.path.dirname(right_image_path), exist_ok=True)
|
|
cv2.imwrite(left_image_path, left_rectified)
|
|
cv2.imwrite(right_image_path, right_rectified)
|
|
saved += 1
|
|
self._refresh_detection_progress_bar(
|
|
pbar, idx, n_total, saved, label="saved"
|
|
) |