"""Step 2: mono and stereo calibration from per-image JSON feature files."""

from __future__ import annotations

import os
from pathlib import Path
from typing import Dict, List, Optional, Tuple

import cv2
import numpy as np

from calibrationclasses.feature_json import FeatureRecord, load_folder_features
from calibrationclasses.pairing import StereoPair, build_stereo_pairs
from calibrationclasses.session import (
    CameraFolder,
    discover_camera_folder,
    resolve_session_root,
    STEREO_PARTNERS,
)


def _homogeneous_transform(rotation: np.ndarray, translation: np.ndarray) -> np.ndarray:
    """Stack ``[rotation | translation]`` on top of a ``[0, 0, 0, 1]`` row.

    Callers pass the rotation matrix and translation column produced by
    OpenCV, which yields a 4x4 homogeneous transform.
    """
    upper = np.hstack((rotation, translation))
    return np.vstack((upper, np.array([0, 0, 0, 1])))


def _write_file_storage(path: Path, entries: Dict[str, np.ndarray]) -> None:
    """Write ``entries`` (in dict order) to an OpenCV FileStorage at ``path``.

    The storage handle is released even when a write raises, so a failed
    save does not leave the file handle open.
    """
    storage = cv2.FileStorage(str(path), cv2.FILE_STORAGE_WRITE)
    try:
        for key, value in entries.items():
            storage.write(key, value)
    finally:
        storage.release()


def create_3d_board_points(board_size: Tuple[int, int], square_size: float) -> np.ndarray:
    """Return the planar board corner coordinates scaled by ``square_size``.

    The result is a float32 array with one ``(x, y, 0)`` row per grid
    position of ``board_size``.
    """
    pts = np.zeros((np.prod(board_size), 3), np.float32)
    pts[:, :2] = np.indices(board_size).T.reshape(-1, 2)
    pts *= square_size
    return pts


def _image_size_from_records(records: List[FeatureRecord]) -> Tuple[int, int]:
    """Return ``(width, height)`` of the first record image that can be read.

    Raises:
        RuntimeError: if none of the record images can be loaded.
    """
    for record in records:
        img = cv2.imread(str(record.image_path))
        if img is not None:
            return img.shape[1], img.shape[0]
    raise RuntimeError("Could not determine image size from feature JSONs")


def calibrate_camera_intrinsics(
    records: List[FeatureRecord],
    board_size: Tuple[int, int],
    square_size: float,
) -> Dict:
    """Calibrate a single camera from its chessboard detections.

    Only records with ``is_chessboard`` set are used.

    Returns:
        A dict with the camera matrix, distortion coefficients, the
        alpha=1 "optimal" camera matrix and its ROI, per-view rotations
        and translations (vector, matrix and 4x4 form), the value returned
        by ``cv2.calibrateCamera`` as ``MeanError``, the image size and the
        number of views used.

    Raises:
        RuntimeError: if fewer than 3 chessboard detections are available
            or no image could be read to determine the image size.
    """
    chess_records = [r for r in records if r.is_chessboard]
    if len(chess_records) < 3:
        raise RuntimeError(
            f"Need at least 3 chessboard detections for mono calibration, got {len(chess_records)}"
        )

    image_size = _image_size_from_records(chess_records)
    objp = create_3d_board_points(board_size, square_size)
    # Every view observes the same board, so the object points are shared.
    obj_points = [objp for _ in chess_records]
    img_points = [r.corners for r in chess_records]

    ret, mtx, dist, rvecs, tvecs = cv2.calibrateCamera(
        obj_points, img_points, image_size, None, None, flags=0
    )

    rmtx = [cv2.Rodrigues(rvec)[0] for rvec in rvecs]
    tmtx = [_homogeneous_transform(rot, tvec) for rot, tvec in zip(rmtx, tvecs)]

    newmtx, roi = cv2.getOptimalNewCameraMatrix(mtx, dist, image_size, 1, image_size)
    if np.sum(roi) == 0:
        # An all-zero ROI is replaced by the full image rectangle.
        roi = (0, 0, image_size[0] - 1, image_size[1] - 1)

    return {
        "Intrinsic": mtx,
        "Distortion": dist,
        "DistortionROI": roi,
        "DistortionIntrinsic": newmtx,
        "RotVektor": rvecs,
        "RotMatrix": rmtx,
        "Extrinsics": tmtx,
        "TransVektor": tvecs,
        "MeanError": float(ret),
        "image_size": image_size,
        "num_views": len(chess_records),
    }


def save_mono_intrinsics(
    params_dir: Path,
    camera_name: str,
    intrinsics: Dict,
    *,
    troubleshooting: bool = False,
) -> None:
    """Persist mono intrinsics as ``<tag>_intrinsics.npz`` and ``.yaml``.

    ``tag`` is ``camera_name`` with ``/`` replaced by ``-`` so it is usable
    as a file-name stem. ``params_dir`` is created if missing.
    """
    params_dir.mkdir(parents=True, exist_ok=True)
    tag = camera_name.replace("/", "-")

    npz_path = params_dir / f"{tag}_intrinsics.npz"
    np.savez(
        npz_path,
        Intrinsic=intrinsics["Intrinsic"],
        Distortion=intrinsics["Distortion"],
        DistortionIntrinsic=intrinsics["DistortionIntrinsic"],
        DistortionROI=intrinsics["DistortionROI"],
        MeanError=intrinsics["MeanError"],
        image_size=np.array(intrinsics["image_size"]),
        num_views=intrinsics["num_views"],
    )

    yaml_path = params_dir / f"{tag}_intrinsics.yaml"
    _write_file_storage(
        yaml_path,
        {
            "Intrinsic": intrinsics["Intrinsic"],
            "Distortion": intrinsics["Distortion"],
            "DistortionIntrinsic": intrinsics["DistortionIntrinsic"],
        },
    )

    if troubleshooting:
        print(f"[INFO] Saved mono intrinsics → {npz_path} and {yaml_path}")


def run_mono_calibration(
    input_path: str | Path,
    board_sizes: Dict[str, Tuple[int, int]],
    square_sizes: Dict[str, float],
    cameras: Optional[List[str]] = None,
    troubleshooting: bool = False,
) -> Dict[str, Dict]:
    """Calibrate every configured camera that has a folder in the session.

    Cameras are taken from the keys of ``board_sizes``; when ``cameras`` is
    given (and non-empty) only those names are processed. Cameras whose
    folder is not found are silently skipped. A failure in one camera is
    reported and does not stop the remaining cameras.

    Returns:
        Mapping of camera name to its intrinsics dict, for the cameras
        that calibrated successfully.
    """
    session_root = resolve_session_root(input_path)
    params_dir = Path(input_path) / "params"
    results: Dict[str, Dict] = {}

    for logical_name, board_size in board_sizes.items():
        if cameras and logical_name not in cameras:
            continue
        cam = discover_camera_folder(session_root, logical_name)
        if cam is None:
            continue

        records = load_folder_features(cam.path)
        square_size = square_sizes[logical_name]
        try:
            intrinsics = calibrate_camera_intrinsics(records, board_size, square_size)
            save_mono_intrinsics(
                params_dir, logical_name, intrinsics, troubleshooting=troubleshooting
            )
            results[logical_name] = intrinsics
            print(
                f"[mono:{logical_name}] views={intrinsics['num_views']} "
                f"reproj_err={intrinsics['MeanError']:.4f}"
            )
        except (RuntimeError, cv2.error) as exc:
            # cv2.error is not a RuntimeError subclass; without catching it
            # an OpenCV failure for one camera would abort the whole loop.
            if troubleshooting:
                print(f"[SKIP mono:{logical_name}] {exc}")
            else:
                print(f"[mono:{logical_name}] skipped")

    return results


def calibrate_stereo_pair(
    pairs: List[StereoPair],
    left_intrinsics: Dict,
    right_intrinsics: Dict,
    board_size: Tuple[int, int],
    square_size: float,
    image_size: Tuple[int, int],
) -> Dict:
    """Estimate the stereo extrinsics for one camera pair.

    The per-camera intrinsics are held fixed (``cv2.CALIB_FIX_INTRINSIC``);
    rectification is computed with ``flags=0`` and ``alpha=1``.

    Returns:
        A dict with rotation/translation (also as a 4x4 ``Transformation``),
        essential and fundamental matrices, the value returned by
        ``cv2.stereoCalibrate`` as ``MeanError``, board description, ``Q``,
        the intrinsics of both cameras, the image points used, the
        rectification outputs ``R1``/``R2``/``P1``/``P2`` and the image size.

    Raises:
        RuntimeError: if ``pairs`` is empty.
    """
    if not pairs:
        raise RuntimeError("No stereo pairs available")

    objp = create_3d_board_points(board_size, square_size)
    obj_points = [objp for _ in pairs]
    left_img_points = [p.left.corners for p in pairs]
    right_img_points = [p.right.corners for p in pairs]

    flags = cv2.CALIB_FIX_INTRINSIC
    criteria = (cv2.TERM_CRITERIA_MAX_ITER + cv2.TERM_CRITERIA_EPS, 30, 0.001)

    ret_stereo, _, _, _, _, rot, trans, essential, fundamental = cv2.stereoCalibrate(
        obj_points,
        left_img_points,
        right_img_points,
        left_intrinsics["Intrinsic"],
        left_intrinsics["Distortion"],
        right_intrinsics["Intrinsic"],
        right_intrinsics["Distortion"],
        image_size,
        criteria=criteria,
        flags=flags,
    )

    R1, R2, P1, P2, Q, roi1, roi2 = cv2.stereoRectify(
        left_intrinsics["Intrinsic"],
        left_intrinsics["Distortion"],
        right_intrinsics["Intrinsic"],
        right_intrinsics["Distortion"],
        image_size,
        rot,
        trans,
        flags=0,
        alpha=1,
    )

    T = _homogeneous_transform(rot, trans)
    Q_clean = np.array(Q, dtype=np.float64)

    parameters = {
        "Translation": trans,
        "Rotation": rot,
        "Transformation": T,
        "Essential": essential,
        "Fundamental": fundamental,
        "MeanError": float(ret_stereo),
        "SquareSize": square_size,
        "BoardSize": board_size,
        "Objpoints": objp,
        "Q": Q_clean,
        "num_pairs": len(pairs),
        "L_Intrinsic": left_intrinsics["Intrinsic"],
        "L_Distortion": left_intrinsics["Distortion"],
        "L_DistortionIntrinsic": left_intrinsics["DistortionIntrinsic"],
        "R_Intrinsic": right_intrinsics["Intrinsic"],
        "R_Distortion": right_intrinsics["Distortion"],
        "R_DistortionIntrinsic": right_intrinsics["DistortionIntrinsic"],
        "L_Imgpoints": left_img_points,
        "R_Imgpoints": right_img_points,
        "R1": R1,
        "R2": R2,
        "P1": P1,
        "P2": P2,
        "image_size": image_size,
    }
    return parameters


def save_stereo_calibration(
    input_path: str | Path,
    pair_tag: str,
    parameters: Dict,
    *,
    troubleshooting: bool = False,
) -> None:
    """Write the stereo parameters under ``<input_path>/params``.

    Three files are produced, all prefixed with ``pair_tag``:
    ``_parameters.npz`` (everything except ``R1``/``R2``/``P1``/``P2``),
    ``_stereo_cam_model.yaml`` (intrinsics, rotation, translation, ``Q``)
    and ``_Q.cvstore`` (``Q`` only).
    """
    params_dir = Path(input_path) / "params"
    params_dir.mkdir(parents=True, exist_ok=True)

    Q_clean = np.array(parameters["Q"], dtype=np.float64)

    npz_path = params_dir / f"{pair_tag}_parameters.npz"
    save_kwargs = {k: v for k, v in parameters.items() if k not in ("R1", "R2", "P1", "P2")}
    np.savez(npz_path, **save_kwargs)
    if troubleshooting:
        print(f"[INFO] Saved NPZ → {npz_path}")

    yaml_path = params_dir / f"{pair_tag}_stereo_cam_model.yaml"
    _write_file_storage(
        yaml_path,
        {
            "L_DistortionIntrinsic": parameters["L_DistortionIntrinsic"],
            "L_Intrinsic": parameters["L_Intrinsic"],
            "L_Distortion": parameters["L_Distortion"],
            "R_DistortionIntrinsic": parameters["R_DistortionIntrinsic"],
            "R_Intrinsic": parameters["R_Intrinsic"],
            "R_Distortion": parameters["R_Distortion"],
            # Rotation/translation are sliced out of the 4x4 transform.
            "Rotation": parameters["Transformation"][:3, :3],
            "Translation": parameters["Transformation"][:3, 3:],
            "Q": Q_clean,
        },
    )
    if troubleshooting:
        print(f"[INFO] Saved YAML → {yaml_path}")

    cvstore_path = params_dir / f"{pair_tag}_Q.cvstore"
    _write_file_storage(cvstore_path, {"Q": Q_clean})
    if troubleshooting:
        print(f"[INFO] Saved Q → {cvstore_path}")


def save_pairing_report(
    input_path: str | Path,
    pair_tag: str,
    pairs: List[StereoPair],
) -> Path:
    """Write a tab-separated list of the matched image pairs.

    The report goes to ``<input_path>/pairing_reports/<pair_tag>.txt`` and
    holds two ``#`` header lines, a column header and one row per pair.

    Returns:
        The path of the written report.
    """
    report_dir = Path(input_path) / "pairing_reports"
    report_dir.mkdir(parents=True, exist_ok=True)
    report_path = report_dir / f"{pair_tag}.txt"

    lines = [
        f"# stereo pairs for {pair_tag}",
        f"# total={len(pairs)}",
        "left_image\tright_image\tdelta_sec\tmethod",
    ]
    lines.extend(
        f"{pair.left.image_path.name}\t{pair.right.image_path.name}\t"
        f"{pair.delta_sec:.6f}\t{pair.method}"
        for pair in pairs
    )

    report_path.write_text("\n".join(lines) + "\n", encoding="utf-8")
    print(f"[INFO] Pairing report → {report_path}")
    return report_path


def save_rectified_pairs(
    input_path: str | Path,
    pair_tag: str,
    pairs: List[StereoPair],
    parameters: Dict,
    left_folder: str,
    right_folder: str,
) -> None:
    """Rectify every pair and save the images for visual inspection.

    Output goes to ``<input_path>/rectified/<pair_tag>/<left_folder>`` and
    ``.../<right_folder>``, keeping the original file names. Pairs whose
    images cannot be read are skipped. A pair is counted as saved only
    when both rectified images were written successfully.
    """
    image_size = parameters["image_size"]
    R1, R2, P1, P2 = parameters["R1"], parameters["R2"], parameters["P1"], parameters["P2"]

    map_left = cv2.initUndistortRectifyMap(
        parameters["L_Intrinsic"],
        parameters["L_Distortion"],
        R1,
        P1,
        image_size,
        cv2.CV_32FC1,
    )
    map_right = cv2.initUndistortRectifyMap(
        parameters["R_Intrinsic"],
        parameters["R_Distortion"],
        R2,
        P2,
        image_size,
        cv2.CV_32FC1,
    )

    out_left = Path(input_path) / "rectified" / pair_tag / left_folder
    out_right = Path(input_path) / "rectified" / pair_tag / right_folder
    out_left.mkdir(parents=True, exist_ok=True)
    out_right.mkdir(parents=True, exist_ok=True)

    saved = 0
    for pair in pairs:
        left_img = cv2.imread(str(pair.left.image_path))
        right_img = cv2.imread(str(pair.right.image_path))
        if left_img is None or right_img is None:
            continue

        left_rect = cv2.remap(left_img, map_left[0], map_left[1], cv2.INTER_LINEAR)
        right_rect = cv2.remap(right_img, map_right[0], map_right[1], cv2.INTER_LINEAR)

        # imwrite reports failure through its return value; both writes are
        # always attempted, but only fully written pairs are counted.
        wrote_left = cv2.imwrite(str(out_left / pair.left.image_path.name), left_rect)
        wrote_right = cv2.imwrite(str(out_right / pair.right.image_path.name), right_rect)
        if wrote_left and wrote_right:
            saved += 1

    print(f"[INFO] Rectified {saved}/{len(pairs)} pairs → {out_left.parent}")


def run_stereo_calibration(
    input_path: str | Path,
    left_camera: str,
    mono_results: Dict[str, Dict],
    board_sizes: Dict[str, Tuple[int, int]],
    square_sizes: Dict[str, float],
    time_window_sec: float = 0.1,
    partners: Tuple[str, ...] = STEREO_PARTNERS,
    troubleshooting: bool = False,
) -> None:
    """Calibrate ``left_camera`` against each camera in ``partners``.

    For every partner with a folder, mono intrinsics and at least one
    matched pair, this writes a pairing report, runs the stereo
    calibration and saves the result. With ``troubleshooting`` enabled it
    also prints skip reasons and saves rectified images. The board and
    image size of the left camera are used for every pair. A failure for
    one partner is reported and does not stop the remaining partners.

    Raises:
        FileNotFoundError: if the left camera folder is not found.
        RuntimeError: if ``mono_results`` has no entry for ``left_camera``.
    """
    session_root = resolve_session_root(input_path)
    left_cam = discover_camera_folder(session_root, left_camera)
    if left_cam is None:
        raise FileNotFoundError(f"Left camera folder {left_camera!r} not found")
    if left_camera not in mono_results:
        raise RuntimeError(
            f"No mono intrinsics for {left_camera}. Run mono calibration first."
        )

    left_records = load_folder_features(left_cam.path)
    left_board = board_sizes[left_camera]
    left_square = square_sizes[left_camera]
    image_size = mono_results[left_camera]["image_size"]

    for partner in partners:
        right_cam = discover_camera_folder(session_root, partner)
        if right_cam is None:
            if troubleshooting:
                print(f"[SKIP stereo:{left_camera}-{partner}] folder not found")
            continue
        if partner not in mono_results:
            if troubleshooting:
                print(
                    f"[SKIP stereo:{left_camera}-{partner}] "
                    f"no mono intrinsics for {partner}"
                )
            continue

        right_records = load_folder_features(right_cam.path)
        pairs = build_stereo_pairs(left_records, right_records, time_window_sec)
        # The tag becomes a file-name stem, so "/" is replaced the same way
        # save_mono_intrinsics does for camera names.
        pair_tag = f"{left_camera}-{partner}".replace("/", "-")

        if not pairs:
            if troubleshooting:
                print(
                    f"[SKIP stereo:{pair_tag}] no valid pairs "
                    f"(time_window={time_window_sec}s)"
                )
            continue

        if troubleshooting:
            time_n = sum(1 for p in pairs if p.method == "time_window")
            key_n = sum(1 for p in pairs if p.method == "pair_key")
            print(
                f"[stereo:{pair_tag}] {len(pairs)} pairs "
                f"(time_window={time_n}, pair_key={key_n})"
            )

        save_pairing_report(input_path, pair_tag, pairs)

        try:
            params = calibrate_stereo_pair(
                pairs,
                mono_results[left_camera],
                mono_results[partner],
                left_board,
                left_square,
                image_size,
            )
            save_stereo_calibration(
                input_path, pair_tag, params, troubleshooting=troubleshooting
            )
            print(
                f"[stereo:{pair_tag}] pairs={params['num_pairs']} "
                f"reproj_err={params['MeanError']:.4f}"
            )
            if troubleshooting:
                save_rectified_pairs(
                    input_path,
                    pair_tag,
                    pairs,
                    params,
                    left_cam.folder_name,
                    right_cam.folder_name,
                )
        except (RuntimeError, cv2.error) as exc:
            # cv2.error is not a RuntimeError subclass; without catching it
            # an OpenCV failure for one partner would abort the whole loop.
            if troubleshooting:
                print(f"[FAIL stereo:{pair_tag}] {exc}")
            else:
                print(f"[stereo:{pair_tag}] failed")