from typing import List, Optional, Tuple import re import cv2 import numpy as np import os import shutil from tqdm import tqdm from calibrationclasses.preprocessing import Preprocessing from pathlib import Path class StereoCalibration: """Stereo calibration class. Parameters ---------- input_path : str path to input data chessboard_size : Tuple default inner corner grid (width, height) for both cameras square_size : float default chessboard square size in meters for both cameras chessboard_size_left, chessboard_size_right : Tuple, optional per-camera inner corner grids (default: chessboard_size) square_size_left, square_size_right : float, optional per-camera square sizes in meters (default: square_size) preprocessing: str default = none, G = Gray, C = Clahe, T = Threshold left_camera: str Which folder to use as the left view: lc or lc-ir (folder may be lc-ir or lc_ir on disk). right_camera: str Which folder to use as the stereo 'right' view: rc, rg/rgb, or ir (IR folder may be named IR). troubleshooting: bool If True, save debug folders (corners, local_coords, images_ncb, rectified). """ def __init__( self, input_path: str, chessboard_size: Tuple, square_size: float, preprocessing: str, calibration_mode: str = "checkerboard", chessboard_size_left: Optional[Tuple] = None, chessboard_size_right: Optional[Tuple] = None, square_size_left: Optional[float] = None, square_size_right: Optional[float] = None, left_camera: str = "lc", right_camera: str = "rc", troubleshooting: bool = False, ) -> None: self.input_path = input_path requested_left_camera = left_camera.lower().replace("_", "-") if requested_left_camera not in ("lc", "lc-ir"): raise ValueError( f"left_camera must be one of lc, lc-ir; got {left_camera!r}" ) self.left_camera = requested_left_camera self._left_folder_aliases = { "lc": ["lc"], "lc-ir": ["lc-ir", "lc_ir", "LC-IR"], }[self.left_camera] images_dir = os.path.join(input_path, "images") nested_left = any( os.path.isdir(os.path.join(images_dir, folder)) for folder in self._left_folder_aliases ) flat_left = any( os.path.isdir(os.path.join(input_path, folder)) for folder in self._left_folder_aliases ) if nested_left: self.image_path = images_dir elif flat_left: self.image_path = input_path else: raise FileNotFoundError( f"Could not find left camera folder for {self.left_camera!r}. " f"Tried under {images_dir} (nested) and {input_path} (flat): " f"{self._left_folder_aliases}" ) self.chessboard_size = chessboard_size self.square_size = square_size self.chessboard_size_left = chessboard_size_left or chessboard_size self.chessboard_size_right = chessboard_size_right or chessboard_size self.square_size_left = ( square_size if square_size_left is None else square_size_left ) self.square_size_right = ( square_size if square_size_right is None else square_size_right ) self.preprocessing = preprocessing self.calibration_mode = calibration_mode requested_right_camera = right_camera.lower() if requested_right_camera not in ("rc", "rgb", "rg", "ir"): raise ValueError( f"right_camera must be one of rc, rgb, rg, ir; got {right_camera!r}" ) # Normalize rgb/rg to a single partner label for params naming. if requested_right_camera in ("rgb", "rg"): self.right_camera = "rg" else: self.right_camera = requested_right_camera self._right_subdir = self.right_camera self.troubleshooting = troubleshooting self.corners_path = ( os.path.join(input_path, "corners") if self.troubleshooting else None ) self.local_coords_path = ( os.path.join(input_path, "local_coords") if self.troubleshooting else None ) self.ncb_path = ( os.path.join(input_path, "images_ncb") if self.troubleshooting else None ) self._preprocessor = Preprocessing() self._initialize() self._init_paths() def _preprocessing_enabled(self) -> bool: spec = (self.preprocessing or "").strip().lower() return bool(spec) and spec not in ("none", "off", "false", "0") def _preprocess_for_detection(self, image: np.ndarray) -> np.ndarray: """ Apply optional preprocessing before chessboard corner detection. Chain letters in order (case-insensitive): G=grayscale, C=CLAHE, T=Otsu threshold (T uses CLAHE internally per Preprocessing.threshold). Examples: C, GC, GCT. """ if image is None or not self._preprocessing_enabled(): return image spec = ( (self.preprocessing or "") .strip() .lower() .replace("none", "") .replace(",", "") .replace(" ", "") ) if not spec: return image out = image pp = self._preprocessor for ch in spec: if ch == "g": g = pp.gray(out) out = cv2.cvtColor(g, cv2.COLOR_GRAY2BGR) elif ch == "c": c = pp.clahe(out) out = cv2.cvtColor(c, cv2.COLOR_GRAY2BGR) elif ch == "t": t = pp.threshold(out) out = cv2.cvtColor(t, cv2.COLOR_GRAY2BGR) return out @staticmethod def _to_gray_for_subpix(image: np.ndarray) -> np.ndarray: if image is None: return None if len(image.shape) == 2: return image if image.shape[2] == 1: return np.squeeze(image, axis=2) return cv2.cvtColor(image, cv2.COLOR_BGR2GRAY) @staticmethod def _refresh_detection_progress_bar( pbar, idx: int, total: int, count: int, label: str = "detected" ) -> None: """Live status on the tqdm bar (avoid print() inside loops).""" fmt = getattr(pbar, "format_dict", None) or {} rate = fmt.get("rate") if rate is None or not isinstance(rate, (int, float)) or not np.isfinite(rate) or rate <= 0: rate_str = "…" else: rate_str = f"{rate:.1f}" pbar.set_description( f"Processing image {idx}/{total} | {label}: {count} | {rate_str} it/s", refresh=True, ) def _left_filename_prefixes(self) -> Tuple[str, ...]: if self.left_camera == "lc-ir": return ("lc-ir", "lcir", "lc_ir", "lc") return ("lc",) def _strip_left_filename_prefix(self, filename: str) -> str: name = filename.lower() for prefix in sorted(self._left_filename_prefixes(), key=len, reverse=True): if name.startswith(prefix): return name[len(prefix) :].lstrip("_-.") return name @staticmethod def _pair_key_after_prefix(name: str, prefix: str) -> str: """Strip camera prefix and normalize so lc_123 matches ir_123 / rc_123.""" return Path(name[len(prefix) :].lstrip("_-.")).stem def extract_key_lc(self, path: str): """Filename key to pair the left camera with rc/rgb/ir.""" name = Path(path).name.lower() for prefix in sorted(self._left_filename_prefixes(), key=len, reverse=True): if name.startswith(prefix): return self._pair_key_after_prefix(name, prefix) m = re.search(r"(scan\d{6})", name) if m: return m.group(1) return None def extract_key_other(self, path: str): name = Path(path).name.lower() # IR_scan_000001.png → same scan id as scan000001_* in LC folder m = re.match(r"^ir_scan_(\d+)", name) if m: return f"scan{int(m.group(1)):06d}" for prefix in ("rc", "rgb", "rg", "ir"): if name.startswith(prefix): return self._pair_key_after_prefix(name, prefix) return None def get_camera_type(path): name = os.path.basename(path).lower() if name.startswith("rc"): return "rc" elif name.startswith("rg"): return "rg" elif name.startswith("rgb"): return "rgb" elif name.startswith("ir"): return "ir" return "unknown" def _init_paths(self) -> None: """Initialize paths for the selected left and right camera folders.""" left_dir = None for folder in self._left_folder_aliases: path = os.path.join(self.image_path, folder) if os.path.isdir(path): left_dir = path break if left_dir is None: raise FileNotFoundError( f"Left camera folder for {self.left_camera!r} not found under " f"{self.image_path}. Tried: {self._left_folder_aliases}" ) self._left_image_path = left_dir self._left_subdir = os.path.basename(left_dir) self._right_image_paths_all = [] right_folder_aliases = { "rc": ["rc"], "rg": ["rg", "rgb"], "ir": ["ir", "IR"], } aliases = right_folder_aliases[self.right_camera] right_dir = None for folder in aliases: path = os.path.join(self.image_path, folder) if os.path.isdir(path): right_dir = path break if right_dir is None: raise FileNotFoundError( f"Right camera folder for {self.right_camera!r} not found under " f"{self.image_path}. Tried: {aliases}" ) # Use discovered folder name for writing corners/local_coords/rectified outputs. self._right_subdir = os.path.basename(right_dir) names = [ name for name in os.listdir(right_dir) if name.lower().endswith((".bmp", ".png", ".jpg")) ] names.sort() self._right_image_paths_all = [os.path.join(right_dir, name) for name in names] # --- Load left camera images --- self._left_image_names = [ name for name in os.listdir(self._left_image_path) if name.lower().endswith((".bmp", ".png", ".jpg")) ] self._left_image_names.sort() self._left_image_paths = [ os.path.join(self._left_image_path, name) for name in self._left_image_names ] # --- Initialize containers --- self._valid_left_image_paths = [] self._valid_right_image_paths = [] self._left_image_paths_pairs = [] self._right_image_paths_all_pairs = [] def _initialize(self) -> None: self._chessboard_3d_left_points: List[np.ndarray] = [] self._chessboard_2d_left_points: List[np.ndarray] = [] self._chessboard_3d_right_points: List[np.ndarray] = [] self._chessboard_2d_right_points: List[np.ndarray] = [] self._chessboard_3d_left_points_pairs: List[np.ndarray] = [] self._chessboard_2d_left_points_pairs: List[np.ndarray] = [] self._chessboard_3d_right_points_pairs: List[np.ndarray] = [] self._chessboard_2d_right_points_pairs: List[np.ndarray] = [] self._stereo_map_left: Tuple[np.ndarray, np.ndarray] = [] self._stereo_map_right: Tuple[np.ndarray, np.ndarray] = [] self._left_image_size: Tuple[int, int] = None self._right_image_size: Tuple[int, int] = None self._Parameters: dict = {} def create_3d_chessboard_points( self, chessboard_size: Optional[Tuple] = None, square_size: Optional[float] = None, ) -> np.ndarray: """Create 3D chessboard points for a given chessboard size and square size. Returns ------- np.ndarray Nx3 array of 3D points: N = number of squares in chessboard """ board = chessboard_size if chessboard_size is not None else self.chessboard_size_left square = square_size if square_size is not None else self.square_size_left # create Nx3 array of 3D points: N = number of squares in chessboard chessboard_points = np.zeros((np.prod(board), 3), np.float32) # fill x, y coordinates with indices of chessboard points chessboard_points[:, :2] = np.indices(board).T.reshape(-1, 2) # scale coordinates by square size chessboard_points *= square return chessboard_points def create_2d_chessboard_points( self, image: np.ndarray, chessboard_size: Optional[Tuple] = None ) -> np.ndarray: """Create 2D chessboard points for a given image and chessboard size. Parameters ---------- image : np.ndarray image of chessboard Returns ------- np.ndarray Nx1x2 array of 2D points: N = number of corners found in the image """ board = ( chessboard_size if chessboard_size is not None else self.chessboard_size_left ) # termination criteria criteria = (cv2.TERM_CRITERIA_EPS + cv2.TERM_CRITERIA_MAX_ITER, 30, 0.001) # find chessboard corners found, corners = cv2.findChessboardCorners(image, board, None) if not found: return None # refine corner positions corners = cv2.cornerSubPix( self._to_gray_for_subpix(image), corners, (11, 11), (-1, -1), criteria, ) return corners def plot_chessboard( self, image: np.ndarray, corners: np.ndarray, chessboard_size: Optional[Tuple] = None, ) -> np.ndarray: """Plot chessboard corners on the image. Parameters ---------- image : np.ndarray image of chessboard corners : np.ndarray Nx1x2 array of 2D points: N = number of corners found in the image """ board = ( chessboard_size if chessboard_size is not None else self.chessboard_size_left ) image = cv2.drawChessboardCorners(image, board, corners, True) return image def plot_local_coordinates( self, image: np.ndarray, corners: np.ndarray, chessboard_size: Optional[Tuple] = None, ) -> np.ndarray: """Plot local coordinates on the image. Parameters ---------- image : np.ndarray image of chessboard corners : np.ndarray Nx1x2 array of 2D points: N = number of corners found in the image """ board = ( chessboard_size if chessboard_size is not None else self.chessboard_size_left ) corners = np.int32(corners) cv2.arrowedLine(image, tuple(corners[0,0]), tuple(corners[4,0]), (255,0,0), 3, tipLength=0.05) cv2.arrowedLine(image, tuple(corners[0,0]), tuple(corners[board[0]*2,0]), (255,0,0), 3, tipLength=0.05) cv2.circle(image, tuple(corners[0,0]), 8, (0,255,0), 3) cv2.putText(image, '0,0', (corners[0,0,0]-35, corners[0,0,1]-15), cv2.FONT_HERSHEY_SIMPLEX, 0.6, (255,255,255), 2, cv2.LINE_AA) cv2.putText(image, 'X', (corners[4,0,0]-25, corners[4,0,1]-15), cv2.FONT_HERSHEY_SIMPLEX, 0.6, (255,255,255), 2, cv2.LINE_AA) cv2.putText(image, 'Y', (corners[board[0]*2,0,0]-25, corners[board[0]*2,0,1]-15), cv2.FONT_HERSHEY_SIMPLEX, 0.6, (255,255,255), 2, cv2.LINE_AA) return image def create_chessboard_points_left(self) -> None: """Create 3D and 2D chessboard points for left images.""" last_loaded_shape = None paths = self._left_image_paths n_total = len(paths) detected = 0 with tqdm(paths, unit="img", dynamic_ncols=True, smoothing=0.05) as pbar: for idx, left_image_path in enumerate(pbar, start=1): left_image = cv2.imread(left_image_path) left_name = left_image_path.split("/")[-1] if left_image is None: tqdm.write(f"Warning: Failed to load image {left_name}. Skipping.") self._refresh_detection_progress_bar(pbar, idx, n_total, detected) continue last_loaded_shape = left_image.shape proc_image = self._preprocess_for_detection(left_image) left_corners = self.create_2d_chessboard_points( proc_image, self.chessboard_size_left ) if left_corners is not None: detected += 1 self._chessboard_3d_left_points.append( self.create_3d_chessboard_points( self.chessboard_size_left, self.square_size_left ) ) self._chessboard_2d_left_points.append(left_corners) self._valid_left_image_paths.append(left_image_path) if self.corners_path is not None: left_image = self.plot_chessboard( left_image, left_corners, self.chessboard_size_left, ) left_image_name = os.path.basename(left_image_path) left_image_path = os.path.join( self.corners_path, self._left_subdir, left_image_name ) os.makedirs(os.path.dirname(left_image_path), exist_ok=True) cv2.imwrite(left_image_path, left_image) if self.local_coords_path is not None: left_image = self.plot_local_coordinates( left_image, left_corners, self.chessboard_size_left ) left_image_name = os.path.basename(left_image_path) left_image_path = os.path.join( self.local_coords_path, self._left_subdir, left_image_name ) os.makedirs(os.path.dirname(left_image_path), exist_ok=True) cv2.imwrite(left_image_path, left_image) self._refresh_detection_progress_bar(pbar, idx, n_total, detected) if last_loaded_shape is not None: self._left_image_size = (last_loaded_shape[1], last_loaded_shape[0]) elif self._left_image_paths: probe = cv2.imread(self._left_image_paths[0]) self._left_image_size = ( (probe.shape[1], probe.shape[0]) if probe is not None else (0, 0) ) else: self._left_image_size = (0, 0) print( f"Number of images with detected chessboards for left ({self.left_camera}): ", len(self._chessboard_2d_left_points), ) def create_chessboard_points_right(self) -> None: """Create 3D and 2D chessboard points for right images.""" paths = self._right_image_paths_all n_total = len(paths) detected = 0 with tqdm(paths, unit="img", dynamic_ncols=True, smoothing=0.05) as pbar: for idx, right_image_path in enumerate(pbar, start=1): right_image = cv2.imread(right_image_path) right_name = right_image_path.split("/")[-1] if right_image is None: tqdm.write(f"Warning: Failed to load image {right_name}. Skipping.") self._refresh_detection_progress_bar(pbar, idx, n_total, detected) continue proc_image = self._preprocess_for_detection(right_image) right_corners = self.create_2d_chessboard_points( proc_image, self.chessboard_size_right ) if right_corners is not None: detected += 1 self._chessboard_3d_right_points.append( self.create_3d_chessboard_points( self.chessboard_size_right, self.square_size_right ) ) self._chessboard_2d_right_points.append(right_corners) self._valid_right_image_paths.append(right_image_path) if self.corners_path is not None: right_image = self.plot_chessboard( right_image, right_corners, self.chessboard_size_right, ) right_image_name = os.path.basename(right_image_path) right_image_path = os.path.join( self.corners_path, self._right_subdir, right_image_name ) os.makedirs(os.path.dirname(right_image_path), exist_ok=True) cv2.imwrite(right_image_path, right_image) if self.local_coords_path is not None: right_image = self.plot_local_coordinates( right_image, right_corners, self.chessboard_size_right, ) right_image_name = os.path.basename(right_image_path) right_image_path = os.path.join( self.local_coords_path, self._right_subdir, right_image_name ) os.makedirs(os.path.dirname(right_image_path), exist_ok=True) cv2.imwrite(right_image_path, right_image) self._refresh_detection_progress_bar(pbar, idx, n_total, detected) if self._valid_right_image_paths: probe = cv2.imread(self._valid_right_image_paths[-1]) self._right_image_size = ( (probe.shape[1], probe.shape[0]) if probe is not None else (0, 0) ) elif self._right_image_paths_all: probe = cv2.imread(self._right_image_paths_all[0]) self._right_image_size = ( (probe.shape[1], probe.shape[0]) if probe is not None else (0, 0) ) else: self._right_image_size = (0, 0) print( f"Number of images with detected chessboards for right ({self.right_camera}): ", len(self._chessboard_2d_right_points), ) def detect_ellipse_center(self, image: np.ndarray) -> Tuple[float, float]: """Detect ellipse center in infrared image. Parameters ---------- image : np.ndarray infrared image Returns ------- Tuple[float, float] (cx, cy) center coordinates of detected ellipse, or None if not found """ gray = cv2.cvtColor(image, cv2.COLOR_BGR2GRAY) # --- Step 1: CLAHE for contrast enhancement --- clahe = cv2.createCLAHE(clipLimit=2.0, tileGridSize=(8, 8)) enhanced = clahe.apply(gray) # --- Step 2: Smooth to remove small noise --- blurred = cv2.GaussianBlur(enhanced, (5, 5), 0) # --- Step 3: Binarization (Adaptive or Otsu) --- _, binary = cv2.threshold(blurred, 0, 255, cv2.THRESH_BINARY + cv2.THRESH_OTSU) # If the circle is dark instead of bright, invert white_ratio = np.sum(binary == 255) / binary.size if white_ratio > 0.5: binary = cv2.bitwise_not(binary) # --- Step 4: Morphological close to fill small holes --- kernel = cv2.getStructuringElement(cv2.MORPH_ELLIPSE, (9, 9)) closed = cv2.morphologyEx(binary, cv2.MORPH_CLOSE, kernel) # --- Step 5: Contour detection --- contours, _ = cv2.findContours(closed, cv2.RETR_EXTERNAL, cv2.CHAIN_APPROX_SIMPLE) if not contours: return None # --- Step 6: Filter for circle-like contours --- valid_contours = [] for cnt in contours: area = cv2.contourArea(cnt) if area < 100: # Skip tiny noise continue x, y, w, h = cv2.boundingRect(cnt) aspect_ratio = w / h if 0.75 < aspect_ratio < 1.25: # roughly circular valid_contours.append(cnt) if not valid_contours: return None # --- Step 7: Choose largest circular contour --- best_contour = max(valid_contours, key=cv2.contourArea) # --- Step 8: Fit ellipse --- if len(best_contour) < 5: return None ellipse = cv2.fitEllipse(best_contour) (cx, cy), (major, minor), angle = ellipse return (cx, cy), ellipse def plot_ellipse(self, image: np.ndarray, ellipse: Tuple) -> np.ndarray: """Plot ellipse on the image. Parameters ---------- image : np.ndarray image ellipse : Tuple ellipse parameters from cv2.fitEllipse """ (cx, cy), (major, minor), angle = ellipse result = image.copy() cv2.ellipse(result, ellipse, (0, 255, 0), 2) cv2.circle(result, (int(cx), int(cy)), 5, (0, 0, 255), -1) cv2.putText(result, f"({int(cx)}, {int(cy)})", (int(cx)+10, int(cy)-10), cv2.FONT_HERSHEY_SIMPLEX, 0.6, (255, 255, 255), 2) return result def create_ellipse_points_left(self) -> None: """Create calibration points for left infrared images using ellipse detection.""" last_loaded_shape = None paths = self._left_image_paths n_total = len(paths) detected = 0 with tqdm(paths, unit="img", dynamic_ncols=True, smoothing=0.05) as pbar: for idx, left_image_path in enumerate(pbar, start=1): left_image = cv2.imread(left_image_path) left_name = left_image_path.split("/")[-1] if left_image is None: tqdm.write(f"Warning: Failed to load image {left_name}. Skipping.") self._refresh_detection_progress_bar(pbar, idx, n_total, detected) continue last_loaded_shape = left_image.shape ellipse_result = self.detect_ellipse_center(left_image) if ellipse_result is not None: detected += 1 (cx, cy), ellipse = ellipse_result ellipse_center = np.array([[[cx, cy]]], dtype=np.float32) chessboard_3d = np.array([[0.0, 0.0, 0.0]], dtype=np.float32) self._chessboard_3d_left_points.append(chessboard_3d) self._chessboard_2d_left_points.append(ellipse_center) self._valid_left_image_paths.append(left_image_path) if self.corners_path is not None: left_image = self.plot_ellipse(left_image, ellipse) left_image_name = os.path.basename(left_image_path) left_image_path_out = os.path.join( self.corners_path, self._left_subdir, left_image_name ) os.makedirs(os.path.dirname(left_image_path_out), exist_ok=True) cv2.imwrite(left_image_path_out, left_image) self._refresh_detection_progress_bar(pbar, idx, n_total, detected) if last_loaded_shape is not None: self._left_image_size = (last_loaded_shape[1], last_loaded_shape[0]) elif self._left_image_paths: probe = cv2.imread(self._left_image_paths[0]) self._left_image_size = ( (probe.shape[1], probe.shape[0]) if probe is not None else (0, 0) ) else: self._left_image_size = (0, 0) print("Number of images with detected ellipses for LC: ", len(self._chessboard_2d_left_points)) def create_ellipse_points_right(self) -> None: """Create calibration points for right infrared images using ellipse detection.""" last_loaded_shape = None paths = self._right_image_paths_all n_total = len(paths) detected = 0 with tqdm(paths, unit="img", dynamic_ncols=True, smoothing=0.05) as pbar: for idx, right_image_path in enumerate(pbar, start=1): right_image = cv2.imread(right_image_path) right_name = right_image_path.split("/")[-1] if right_image is None: tqdm.write(f"Warning: Failed to load image {right_name}. Skipping.") self._refresh_detection_progress_bar(pbar, idx, n_total, detected) continue last_loaded_shape = right_image.shape ellipse_result = self.detect_ellipse_center(right_image) if ellipse_result is not None: detected += 1 (cx, cy), ellipse = ellipse_result ellipse_center = np.array([[[cx, cy]]], dtype=np.float32) chessboard_3d = np.array([[0.0, 0.0, 0.0]], dtype=np.float32) self._chessboard_3d_right_points.append(chessboard_3d) self._chessboard_2d_right_points.append(ellipse_center) self._valid_right_image_paths.append(right_image_path) if self.corners_path is not None: right_image = self.plot_ellipse(right_image, ellipse) right_image_name = os.path.basename(right_image_path) right_image_path_out = os.path.join( self.corners_path, self._right_subdir, right_image_name ) os.makedirs(os.path.dirname(right_image_path_out), exist_ok=True) cv2.imwrite(right_image_path_out, right_image) self._refresh_detection_progress_bar(pbar, idx, n_total, detected) if last_loaded_shape is not None: self._right_image_size = (last_loaded_shape[1], last_loaded_shape[0]) elif self._right_image_paths_all: probe = cv2.imread(self._right_image_paths_all[0]) self._right_image_size = ( (probe.shape[1], probe.shape[0]) if probe is not None else (0, 0) ) else: self._right_image_size = (0, 0) print( f"Number of images with detected ellipses for right ({self.right_camera}): ", len(self._chessboard_2d_right_points), ) def _dedupe_left_one_per_scan_for_ir(self) -> None: """With IR_scan_NNN (one IR per scan), keep one LC frame per scan ID.""" if self.right_camera != "ir": return packed = list( zip( self._valid_left_image_paths, self._chessboard_2d_left_points, self._chessboard_3d_left_points, ) ) packed.sort(key=lambda x: x[0]) seen = set() kept = [] for path, p2d, p3d in packed: key = self.extract_key_lc(path) if key is None: continue if key in seen: continue seen.add(key) kept.append((path, p2d, p3d)) if not kept: return self._valid_left_image_paths = [x[0] for x in kept] self._chessboard_2d_left_points = [x[1] for x in kept] self._chessboard_3d_left_points = [x[2] for x in kept] def build_pairs_cal(self) -> None: """ Build pairs of LC ↔ (RC | RGB | IR) images based on filenames, and align their corresponding chessboard points. """ self._dedupe_left_one_per_scan_for_ir() warned_stereo_board_model = False # --- Build lookup for RIGHT images --- right_lookup = {} for j, path in enumerate(self._valid_right_image_paths): key = self.extract_key_other(path) # use @staticmethod via self if key is not None: right_lookup[key] = (path, j) # --- Match LC against RIGHT --- for i, left_path in enumerate(self._valid_left_image_paths): key = self.extract_key_lc(left_path) # use @staticmethod via self if key is None: continue if key in right_lookup: right_path, j = right_lookup[key] left_corners = self._chessboard_2d_left_points[i] right_corners = self._chessboard_2d_right_points[j] if left_corners.shape != right_corners.shape: tqdm.write( f"Skipping pair {key}: corner count mismatch " f"({self.left_camera} {left_corners.shape[0]} vs " f"{self.right_camera} {right_corners.shape[0]}). " "Use the same board geometry for stereo pairs." ) continue if not warned_stereo_board_model and ( self.chessboard_size_left != self.chessboard_size_right or self.square_size_left != self.square_size_right ): tqdm.write( "Per-camera board sizes differ; stereo object points use left: " f"{self.chessboard_size_left} @ {self.square_size_left}m." ) warned_stereo_board_model = True # --- Append image paths --- self._left_image_paths_pairs.append(left_path) self._right_image_paths_all_pairs.append(right_path) # --- Append chessboard 2D points --- self._chessboard_2d_left_points_pairs.append(left_corners) self._chessboard_2d_right_points_pairs.append(right_corners) # --- Append chessboard 3D points (stereo requires one object model) --- pts3d = self.create_3d_chessboard_points( self.chessboard_size_left, self.square_size_left ) self._chessboard_3d_left_points_pairs.append(pts3d) self._chessboard_3d_right_points_pairs.append(pts3d) print(f"Found {len(self._left_image_paths_pairs)} common image pairs with valid chessboard corners.") def build_pairs_rec(self) -> None: def strip_prefix(filename): # Assuming the prefix is always the first two characters (like "lc" or "rc") return filename[2:] # Remove the first two characters valid_left_images = set([strip_prefix(path.split('/')[-1]) for path in self._left_image_paths]) valid_right_images = set([strip_prefix(path.split('/')[-1]) for path in self._right_image_paths_all]) common_images = valid_left_images.intersection(valid_right_images) # Filter for common image pairs and corresponding points for i, left_path in enumerate(self._left_image_paths): left_name = strip_prefix(left_path.split('/')[-1]) if left_name in common_images: # Find the corresponding right image and its index for j, right_path in enumerate(self._right_image_paths_all): right_name = strip_prefix(right_path.split('/')[-1]) if left_name == right_name: # Append the common left and right paths and points self._left_image_paths_pairs.append(left_path) self._right_image_paths_all_pairs.append(right_path) break # Print or return the number of common image pairs and points print(f"Found {len(self._left_image_paths_pairs)} common image pairs.") left_image_paths = set([path for path in self._left_image_paths]) right_image_paths = set(self._valid_right_image_paths) left_image_paths_pairs = set([path for path in self._left_image_paths_pairs]) right_image_paths_pairs = set([path for path in self._right_image_paths_all_pairs]) unmatched_left_paths = left_image_paths - left_image_paths_pairs unmatched_right_paths = right_image_paths - right_image_paths_pairs for path in unmatched_left_paths: if self.ncb_path is not None: left_image_path = os.path.join( self.ncb_path, self._left_subdir, os.path.basename(path) ) os.makedirs(os.path.dirname(left_image_path), exist_ok=True) shutil.copy(path, left_image_path) for path in unmatched_right_paths: if self.ncb_path is not None: right_image_path = os.path.join( self.ncb_path, self._right_subdir, os.path.basename(path) ) os.makedirs(os.path.dirname(right_image_path), exist_ok=True) shutil.copy(path, right_image_path) def create_chessboard_points_pairs(self) -> Tuple[np.ndarray, np.ndarray]: """Create 3D and 2D chessboard points for left and right images.""" # Match left/right images by filename after stripping camera prefixes left_dict = { self._strip_left_filename_prefix(os.path.basename(p)): p for p in self._left_image_paths } right_dict = {os.path.basename(p).replace("rc", ""): p for p in self._right_image_paths_all} # Find common keys by matching filenames without prefixes common_keys = set(left_dict.keys()) & set(right_dict.keys()) keys_sorted = sorted(common_keys, key=str) n_total = len(keys_sorted) detected = 0 last_left_shape = None last_right_shape = None with tqdm(keys_sorted, unit="pair", dynamic_ncols=True, smoothing=0.05) as pbar: for idx, key in enumerate(pbar, start=1): left_name = left_dict[key] right_name = right_dict[key] left_image = cv2.imread(left_name) right_image = cv2.imread(right_name) if left_image is None or right_image is None: tqdm.write( f"Warning: Failed to load images {left_name} or {right_name}. Skipping this pair." ) self._refresh_detection_progress_bar(pbar, idx, n_total, detected) continue last_left_shape = left_image.shape last_right_shape = right_image.shape left_proc = self._preprocess_for_detection(left_image) right_proc = self._preprocess_for_detection(right_image) left_corners = self.create_2d_chessboard_points( left_proc, self.chessboard_size_left ) right_corners = self.create_2d_chessboard_points( right_proc, self.chessboard_size_right ) if left_corners is not None and right_corners is not None: if left_corners.shape != right_corners.shape: tqdm.write( f"Skipping pair {key}: corner count mismatch " f"({self.left_camera} {left_corners.shape[0]} vs " f"{self.right_camera} {right_corners.shape[0]})." ) self._refresh_detection_progress_bar( pbar, idx, n_total, detected ) continue detected += 1 second_elements_array1 = left_corners[:, :, 1] second_elements_array2 = right_corners[:, :, 1] difference = second_elements_array1 - second_elements_array2 updated_second_elements_array2 = second_elements_array2 + difference right_corners2 = np.copy(right_corners) right_corners2[:, :, 1] = updated_second_elements_array2 pts3d = self.create_3d_chessboard_points( self.chessboard_size_left, self.square_size_left ) self._chessboard_3d_left_points_pairs.append(pts3d) self._chessboard_2d_left_points_pairs.append(left_corners) self._chessboard_3d_right_points_pairs.append(pts3d) self._chessboard_2d_right_points_pairs.append(right_corners) if self.corners_path is not None: left_image = self.plot_chessboard( left_image, left_corners, self.chessboard_size_left ) right_image = self.plot_chessboard( right_image, right_corners, self.chessboard_size_right ) right_image = self.plot_chessboard( right_image, right_corners2, self.chessboard_size_right ) right_image_name = os.path.basename(right_name) right_image_path = os.path.join( self.corners_path, self._right_subdir, right_image_name ) os.makedirs(os.path.dirname(right_image_path), exist_ok=True) cv2.imwrite(right_image_path, right_image) left_image_name = os.path.basename(left_name) left_image_path = os.path.join( self.corners_path, self._left_subdir, left_image_name ) os.makedirs(os.path.dirname(left_image_path), exist_ok=True) cv2.imwrite(left_image_path, left_image) else: if self.ncb_path is not None: left_image_path = os.path.join( self.ncb_path, self._left_subdir, os.path.basename(left_name) ) right_image_path = os.path.join( self.ncb_path, self._right_subdir, os.path.basename(right_name) ) os.makedirs(os.path.dirname(left_image_path), exist_ok=True) os.makedirs(os.path.dirname(right_image_path), exist_ok=True) tqdm.write( f"Chessboard not detected in both {left_name} and {right_name}. " f"Moving to {os.path.dirname(left_image_path)} and {os.path.dirname(right_image_path)}." ) shutil.copy(left_name, left_image_path) shutil.copy(right_name, right_image_path) self._refresh_detection_progress_bar(pbar, idx, n_total, detected) if last_left_shape is not None and last_right_shape is not None: self._left_image_size = (last_left_shape[1], last_left_shape[0]) self._right_image_size = (last_right_shape[1], last_right_shape[0]) else: self._left_image_size = (0, 0) self._right_image_size = (0, 0) unmatched_left = set(left_dict.keys()) - common_keys unmatched_right = set(right_dict.keys()) - common_keys for key in unmatched_left: if self.ncb_path is not None: left_image_path = os.path.join( self.ncb_path, self._left_subdir, os.path.basename(left_dict[key]) ) os.makedirs(os.path.dirname(left_image_path), exist_ok=True) shutil.copy(left_dict[key], left_image_path) for key in unmatched_right: if self.ncb_path is not None: right_image_path = os.path.join( self.ncb_path, self._right_subdir, os.path.basename(right_dict[key]) ) os.makedirs(os.path.dirname(right_image_path), exist_ok=True) shutil.copy(right_dict[key], right_image_path) self._left_image_names = os.listdir(self._left_image_path) self._right_image_names = os.listdir(self._right_image_path) self._left_image_names = [ name for name in self._left_image_names if name.endswith((".bmp", '.png', '.jpg')) ] self._right_image_names = [ name for name in self._right_image_names if name.endswith((".bmp", '.png', '.jpg')) ] self._left_image_names.sort() self._right_image_names.sort() self._left_image_paths_pairs = [ os.path.join(self._left_image_path, name) for name in self._left_image_names ] self._right_image_paths_all_pairs = [ os.path.join(self._right_image_path, name) for name in self._right_image_names ] print("Number of image pairs: ", len(self._chessboard_2d_left_points_pairs)) return self._chessboard_2d_left_points_pairs, self._chessboard_2d_right_points_pairs def calibrate(self) -> dict: """Calibrate stereo camera. The calibration process consists of the following steps: 1. Calibrate left camera 2. Calibrate right camera 3. Stereo calibration Returns ------- dict_keys(['Transformation', 'Essential', 'Fundamental', 'MeanError', 'SquareSize', 'BoardSize', 'Objpoints', 'L_Intrinsic', 'L_Distortion', 'L_DistortionROI', 'L_DistortionIntrinsic', 'L_RotVektor', 'L_RotMatrix', 'L_Extrinsics', 'L_TransVektor', 'L_Errors', 'L_MeanError', 'R_Intrinsic', 'R_Distortion', 'R_DistortionROI', 'R_DistortionIntrinsic', 'R_RotVektor', 'R_RotMatrix', 'R_Extrinsics', 'R_TransVektor', 'R_Errors', 'R_MeanError', 'L_Imgpoints', 'R_Imgpoints']) """ if len(self._left_image_paths_pairs) == 0: raise RuntimeError( "No stereo pairs with detected chessboard corners. " "Check --chessboard_size (or per-camera sizes), preprocessing, and that the board is visible in both cameras. " f"For {self.left_camera}+IR, left files like scan000001_* pair with " "IR_scan_000001.png by scan id; only one left frame per scan is used " "when multiple sequences exist." ) def CalibrateCamera(image_size, chessboard_2d_points, chessboard_3d_points): CameraParams = {} flags = 0 (ret, mtx, dist, rvecs, tvecs) = cv2.calibrateCamera(chessboard_3d_points, chessboard_2d_points, image_size, None, None, flags=flags) Rmtx = []; Tmtx = []; k = 0 for r in rvecs: Rmtx.append(cv2.Rodrigues(r)[0]) Tmtx.append(np.vstack((np.hstack((Rmtx[k],tvecs[k])),np.array([0,0,0,1])))) k += 1 newmtx, roi = cv2.getOptimalNewCameraMatrix(mtx,dist,image_size,1,image_size) if np.sum(roi) == 0: roi = (0,0,image_size[0]-1,image_size[1]-1) CameraParams['Intrinsic'] = mtx CameraParams['Distortion'] = dist CameraParams['DistortionROI'] = roi CameraParams['DistortionIntrinsic'] = newmtx CameraParams['RotVektor'] = rvecs CameraParams['RotMatrix'] = Rmtx CameraParams['Extrinsics'] = Tmtx CameraParams['TransVektor'] = tvecs return CameraParams def CalculateErrors( params, chessboard_2d_points, chessboard_size, square_size ): imgp = np.array(chessboard_2d_points) imgp = imgp.reshape((imgp.shape[0], imgp.shape[1], imgp.shape[3])) objp = np.array( self.create_3d_chessboard_points(chessboard_size, square_size) ) K = np.array(params['Intrinsic']) D = np.array(params['Distortion']) R = np.array(params['RotVektor']) T = np.array(params['TransVektor']) N = imgp.shape[0] imgpNew = [] for i in range(N): temp, _ = cv2.projectPoints(objp, R[i], T[i], K, D) imgpNew.append(temp.reshape((temp.shape[0], temp.shape[2]))) imgpNew = np.array(imgpNew) err = [] for i in range(N): err.append(imgp[i] - imgpNew[i]) err = np.array(err) def RMSE(err): return np.sqrt(np.mean(np.sum(err**2, axis=1))) errall = np.copy(err[0]) rmsePerView = [RMSE(err[0])] for i in range(1,N): errall = np.vstack((errall, err[i])) rmsePerView.append(RMSE(err[i])) rmseAll = RMSE(errall) return rmsePerView, rmseAll # left camera calibration print("Calibrating left camera...") if self._chessboard_2d_left_points == [] : Left_Params = CalibrateCamera(self._left_image_size, self._chessboard_2d_left_points_pairs, self._chessboard_3d_left_points_pairs) Left_Errors, Left_MeanError = CalculateErrors( Left_Params, self._chessboard_2d_left_points_pairs, self.chessboard_size_left, self.square_size_left, ) Left_Params['Imgpoints'] = self._chessboard_2d_left_points_pairs else : Left_Params = CalibrateCamera(self._left_image_size, self._chessboard_2d_left_points, self._chessboard_3d_left_points) Left_Errors, Left_MeanError = CalculateErrors( Left_Params, self._chessboard_2d_left_points, self.chessboard_size_left, self.square_size_left, ) Left_Params['Imgpoints'] = self._chessboard_2d_left_points np.set_printoptions(suppress=True, precision=5) print('Intrinsic Matrix:') print(Left_Params['Intrinsic']) print('\nDistortion Parameters:') print(Left_Params['Distortion']) print('\nExtrinsic Matrix from 1.Image:') print(Left_Params['Extrinsics'][0]) print('Reprojection Error Left: {:.4f}'.format(Left_MeanError)) # right camera calibration print("Calibrating right camera...") if self._chessboard_2d_right_points == [] : Right_Params = CalibrateCamera(self._right_image_size, self._chessboard_2d_right_points_pairs, self._chessboard_3d_right_points_pairs) Right_Errors, Right_MeanError = CalculateErrors( Right_Params, self._chessboard_2d_right_points_pairs, self.chessboard_size_right, self.square_size_right, ) Right_Params['Imgpoints'] = self._chessboard_2d_right_points_pairs else : Right_Params = CalibrateCamera(self._right_image_size, self._chessboard_2d_right_points, self._chessboard_3d_right_points) Right_Errors, Right_MeanError = CalculateErrors( Right_Params, self._chessboard_2d_right_points, self.chessboard_size_right, self.square_size_right, ) Right_Params['Imgpoints'] = self._chessboard_2d_right_points np.set_printoptions(suppress=True, precision=5) print('Intrinsic Matrix:') print(Right_Params['Intrinsic']) print('\nDistortion Parameters:') print(Right_Params['Distortion']) print('\nExtrinsic Matrix from 1.Image:') print(Right_Params['Extrinsics'][0]) print('Reprojection Error Right: {:.4f}'.format(Right_MeanError)) Left_Params['Errors'] = Left_Errors Left_Params['MeanError'] = Left_MeanError Right_Params['Errors'] = Right_Errors Right_Params['MeanError'] = Right_MeanError # --- stereo calibration --- print("Calibrating stereo camera...") flags = cv2.CALIB_FIX_INTRINSIC criteria = (cv2.TERM_CRITERIA_MAX_ITER + cv2.TERM_CRITERIA_EPS, 30, 0.001) ( ret_stereo, newcameramtx_left, dist_left, newcameramtx_right, dist_right, rot, trans, essential, fundamental, ) = cv2.stereoCalibrate( self._chessboard_3d_left_points_pairs, self._chessboard_2d_left_points_pairs, self._chessboard_2d_right_points_pairs, Left_Params['Intrinsic'], Left_Params['Distortion'], Right_Params['Intrinsic'], Right_Params['Distortion'], self._left_image_size, criteria=criteria, flags=flags, ) T = np.vstack((np.hstack((rot, trans)), np.array([0, 0, 0, 1]))) print("\n--- Stereo Calibration Results ---") print("Translation vector:\n", trans) print("Rotation matrix:\n", rot) print("Mean reprojection error:", ret_stereo) # --- Compute Q --- print("\nComputing stereo rectification + Q matrix...") R1, R2, P1, P2, Q, roi1, roi2 = cv2.stereoRectify( Left_Params['Intrinsic'], Left_Params['Distortion'], Right_Params['Intrinsic'], Right_Params['Distortion'], self._left_image_size, rot, trans, flags=0, alpha=1 ) # ensure clean numpy array self._Q = np.array(Q, dtype=np.float64) print("\n--- Q MATRIX ---") print(self._Q) print("Q shape:", self._Q.shape) print("Q dtype:", self._Q.dtype) # --- sanity checks --- print("\n--- Q sanity check ---") print("Q[2,3] (focal length term):", self._Q[2, 3]) print("Q[3,2] (baseline term):", self._Q[3, 2]) if self._Q.shape != (4, 4): raise RuntimeError("[ERROR] Q is not 4x4!") if self._Q[3, 2] == 0: print("[WARNING] Baseline is ZERO → depth will fail!") # --- store stereo params --- Parameters = { 'Translation': trans, 'Rotation': rot, 'Transformation': T, 'Essential': essential, 'Fundamental': fundamental, 'MeanError': ret_stereo, 'SquareSize': self.square_size_left, 'BoardSize': self.chessboard_size_left, 'L_SquareSize': self.square_size_left, 'R_SquareSize': self.square_size_right, 'L_BoardSize': self.chessboard_size_left, 'R_BoardSize': self.chessboard_size_right, 'Objpoints': self.create_3d_chessboard_points( self.chessboard_size_left, self.square_size_left ), 'Q': self._Q # ✅ store Q in parameters (for NPZ) } # add left params for key, value in Left_Params.items(): Parameters[f"L_{key}"] = value # add right params for key, value in Right_Params.items(): Parameters[f"R_{key}"] = value self._Parameters = Parameters return Parameters def save_stereo_calibration(self) -> None: print("[DEBUG] save_stereo_calibration() CALLED") """Save stereo calibration to pair-specific NPZ/YAML/Q (including Q).""" print("\nSaving stereo calibration...") if not hasattr(self, "_Q") or self._Q is None: raise RuntimeError("[ERROR] Q missing! Run calibration first.") # ensure Q is clean Q_clean = np.array(self._Q, dtype=np.float64) self._Parameters['Q'] = Q_clean pair_tag = f"{self.left_camera}-{self.right_camera}" # --- Save pair-specific NPZ --- npz_path = os.path.join(self.input_path, "params", f"{pair_tag}_parameters.npz") os.makedirs(os.path.dirname(npz_path), exist_ok=True) np.savez(npz_path, **self._Parameters) print(f"[INFO] Saved NPZ → {npz_path}") # --- Save pair-specific YAML --- yaml_path = os.path.join(self.input_path, "params", f"{pair_tag}_stereo_cam_model.yaml") fs = cv2.FileStorage(yaml_path, cv2.FILE_STORAGE_WRITE) fs.write("L_DistortionIntrinsic", self._Parameters['L_DistortionIntrinsic']) fs.write("L_Intrinsic", self._Parameters['L_Intrinsic']) fs.write("L_Distortion", self._Parameters['L_Distortion']) fs.write("R_DistortionIntrinsic", self._Parameters['R_DistortionIntrinsic']) fs.write("R_Intrinsic", self._Parameters['R_Intrinsic']) fs.write("R_Distortion", self._Parameters['R_Distortion']) fs.write("Rotation", self._Parameters['Transformation'][:3, :3]) fs.write("Translation", self._Parameters['Transformation'][:3, 3:]) # --- Save Q (FIXED) --- fs.write("Q", Q_clean) print("[INFO] Q written to YAML") fs.release() print(f"[INFO] Saved YAML → {yaml_path}") # --- Save pair-specific Q-matrix for use in disparity --- cvstore_path = os.path.join(os.path.dirname(npz_path), f"{pair_tag}_Q.cvstore") fs2 = cv2.FileStorage(cvstore_path, cv2.FILE_STORAGE_WRITE) fs2.write("Q", Q_clean) fs2.release() print(f"[INFO] Saved Q matrix separately → {cvstore_path}") def read_stereo_calibration_yaml(self, yaml_path: str) -> None: """Read stereo calibration from YAML file safely.""" print("\nReading stereo calibration YAML...") fs = cv2.FileStorage(yaml_path, cv2.FILE_STORAGE_READ) self._Parameters = {} for key in fs.root().keys(): node = fs.getNode(key) if node.empty(): print(f"[WARNING] Empty node: {key}") continue mat = node.mat() if mat is None: print(f"[WARNING] Could not read: {key}") continue self._Parameters[key] = mat print(f"[INFO] Loaded: {key}, shape={mat.shape}") fs.release() # --- Check Q --- if "Q" in self._Parameters: Q = self._Parameters["Q"] print("\n--- Loaded Q MATRIX ---") print(Q) print("Shape:", Q.shape) print("dtype:", Q.dtype) print("\n--- Q sanity check ---") print("Q[2,3]:", Q[2, 3]) print("Q[3,2]:", Q[3, 2]) if Q.shape != (4, 4): print("[ERROR] Q is not 4x4!") if Q[3, 2] == 0: print("[ERROR] Baseline is zero → INVALID Q") else: print("[ERROR] Q NOT FOUND in YAML!") def read_stereo_calibration_npz(self, npz_path: str) -> None: """Read stereo calibration from npz file. Parameters ---------- npz_path : str path to npz file """ print("Reading stereo calibration...") self._Parameters = dict(np.load(npz_path)) print(self._Parameters.keys()) def rectify_images( self, left_image: np.ndarray, right_image: np.ndarray, left_image_size, right_image_size ) -> Tuple[np.ndarray, np.ndarray]: """Rectify left and right images using stereo mapping. Parameters ---------- left_image : np.ndarray left image right_image : np.ndarray right image Returns ------- Tuple[np.ndarray, np.ndarray] left_rectified, right_rectified """ # stereo rectification print("Rectifying stereo camera...") rect_left, rect_right, proj_left, proj_right, Q, roi_left, roi_right = ( cv2.stereoRectify( self._Parameters['L_Intrinsic'], self._Parameters['L_Distortion'], self._Parameters['R_Intrinsic'], self._Parameters['R_Distortion'], left_image_size, self._Parameters['Rotation'], self._Parameters['Translation'], flags=0, alpha=1 ) ) self._Q = Q print ("roi_left: ", roi_left) print ("roi_right: ", roi_right) # stereo mapping print("Mapping left and right cameras...") stereo_map_left = cv2.initUndistortRectifyMap( self._Parameters['L_Intrinsic'], self._Parameters['L_Distortion'], rect_left, proj_left, left_image_size, cv2.CV_32FC1, ) stereo_map_right = cv2.initUndistortRectifyMap( self._Parameters['R_Intrinsic'], self._Parameters['R_Distortion'], rect_right, proj_right, right_image_size, cv2.CV_32FC1, ) print("Calibration complete!") self._stereo_map_left = stereo_map_left self._stereo_map_right = stereo_map_right self._roi_left = roi_left self._roi_right = roi_right # rectify images left_rectified = cv2.remap( left_image, self._stereo_map_left[0], self._stereo_map_left[1], cv2.INTER_LINEAR ) right_rectified = cv2.remap( right_image, self._stereo_map_right[0], self._stereo_map_right[1], cv2.INTER_LINEAR ) return left_rectified, right_rectified def rectify_calibration_images(self) -> None: """Rectify calibration images and save them to disk. The rectified images are stored in separate directories as follows: - input_path - rectified - lc - lc_0000.bmp - lc_0001.bmp - ... - rc - rc_0000.bmp - rc_0001.bmp - ... """ if not self.troubleshooting: print("Troubleshooting is disabled; skipping rectified debug image export.") return print("Rectifying images...") pairs = list( zip(self._left_image_paths_pairs, self._right_image_paths_all_pairs) ) n_total = len(pairs) saved = 0 with tqdm(pairs, unit="pair", dynamic_ncols=True, smoothing=0.05) as pbar: for idx, (left_image_path, right_image_path) in enumerate(pbar, start=1): left_image = cv2.imread(left_image_path) right_image = cv2.imread(right_image_path) if left_image is None or right_image is None: tqdm.write( f"Warning: Failed to load rectification pair " f"{os.path.basename(left_image_path)} / {os.path.basename(right_image_path)}." ) self._refresh_detection_progress_bar( pbar, idx, n_total, saved, label="saved" ) continue left_image_size = (left_image.shape[1], left_image.shape[0]) right_image_size = (right_image.shape[1], right_image.shape[0]) left_rectified, right_rectified = self.rectify_images( left_image, right_image, left_image_size, right_image_size ) left_image_name = os.path.basename(left_image_path) right_image_name = os.path.basename(right_image_path) left_image_path = os.path.join( self.input_path, "rectified", self._left_subdir, left_image_name ) right_image_path = os.path.join( self.input_path, "rectified", self._right_subdir, right_image_name ) os.makedirs(os.path.dirname(left_image_path), exist_ok=True) os.makedirs(os.path.dirname(right_image_path), exist_ok=True) cv2.imwrite(left_image_path, left_rectified) cv2.imwrite(right_image_path, right_rectified) saved += 1 self._refresh_detection_progress_bar( pbar, idx, n_total, saved, label="saved" )