Files
Speckle-Scanner/02_Calibration/calibrationclasses/calibration_engine.py
T

425 lines
14 KiB
Python

"""Step 2: mono and stereo calibration from per-image JSON feature files."""
from __future__ import annotations
import os
from pathlib import Path
from typing import Dict, List, Optional, Tuple
import cv2
import numpy as np
from calibrationclasses.feature_json import FeatureRecord, load_folder_features
from calibrationclasses.pairing import StereoPair, build_stereo_pairs
from calibrationclasses.session import (
CameraFolder,
discover_camera_folder,
resolve_session_root,
STEREO_PARTNERS,
)
def create_3d_board_points(board_size: Tuple[int, int], square_size: float) -> np.ndarray:
    """Build the planar 3D reference points of the calibration board.

    The result has one row per grid position. The first two columns hold the
    grid indices (the first ``board_size`` axis varies fastest) multiplied by
    ``square_size``; the third column stays zero.

    Args:
        board_size: Number of grid points along each of the two board axes.
        square_size: Scale factor applied to the grid indices.

    Returns:
        A ``float32`` array of shape ``(prod(board_size), 3)``.
    """
    grid_xy = np.indices(board_size).T.reshape(-1, 2)
    object_points = np.zeros((np.prod(board_size), 3), dtype=np.float32)
    object_points[:, :2] = grid_xy
    # Scale in place so the array keeps its float32 dtype.
    object_points *= square_size
    return object_points
def _image_size_from_records(records: List[FeatureRecord]) -> Tuple[int, int]:
    """Return ``(width, height)`` of the first record image that can be read.

    Images are loaded lazily, in order, and loading stops at the first one
    ``cv2.imread`` manages to decode.

    Raises:
        RuntimeError: If no record image could be read.
    """
    decoded = (cv2.imread(str(rec.image_path)) for rec in records)
    first_image = next((image for image in decoded if image is not None), None)
    if first_image is None:
        raise RuntimeError("Could not determine image size from feature JSONs")
    height, width = first_image.shape[:2]
    return width, height
def calibrate_camera_intrinsics(
    records: List[FeatureRecord],
    board_size: Tuple[int, int],
    square_size: float,
) -> Dict:
    """Run single-camera calibration on the chessboard detections in *records*.

    Only records flagged ``is_chessboard`` are used; each one contributes its
    ``corners`` as image points against the same planar board model.

    Args:
        records: Feature records of one camera.
        board_size: Grid dimensions passed to ``create_3d_board_points``.
        square_size: Board square size passed to ``create_3d_board_points``.

    Returns:
        A dict with the camera matrix (``Intrinsic``), distortion coefficients
        (``Distortion``), the optimal new camera matrix and its ROI
        (``DistortionIntrinsic`` / ``DistortionROI``), per-view rotation
        vectors, rotation matrices, translation vectors and 4x4 extrinsic
        matrices, the value returned by ``cv2.calibrateCamera``
        (``MeanError``), the ``image_size`` and the number of views used.

    Raises:
        RuntimeError: If fewer than three chessboard records are available, or
            if no record image can be read to determine the image size.
    """
    detections = [rec for rec in records if rec.is_chessboard]
    if len(detections) < 3:
        raise RuntimeError(
            f"Need at least 3 chessboard detections for mono calibration, got {len(detections)}"
        )

    image_size = _image_size_from_records(detections)
    board_points = create_3d_board_points(board_size, square_size)
    # Every view observes the same board, so the model points are shared.
    object_points = [board_points] * len(detections)
    image_points = [rec.corners for rec in detections]

    rms, camera_matrix, dist_coeffs, rvecs, tvecs = cv2.calibrateCamera(
        object_points, image_points, image_size, None, None, flags=0
    )

    # Expand each rotation vector to a 3x3 matrix and assemble the 4x4 pose.
    rotation_matrices = [cv2.Rodrigues(rvec)[0] for rvec in rvecs]
    homogeneous_row = np.array([0, 0, 0, 1])
    extrinsics = [
        np.vstack((np.hstack((rotation, tvec)), homogeneous_row))
        for rotation, tvec in zip(rotation_matrices, tvecs)
    ]

    new_camera_matrix, roi = cv2.getOptimalNewCameraMatrix(
        camera_matrix, dist_coeffs, image_size, 1, image_size
    )
    if np.sum(roi) == 0:
        # An all-zero ROI is replaced by a near-full-frame one.
        # NOTE(review): the fallback uses width-1 / height-1 -- confirm whether
        # the full width / height was intended.
        width, height = image_size
        roi = (0, 0, width - 1, height - 1)

    return {
        "Intrinsic": camera_matrix,
        "Distortion": dist_coeffs,
        "DistortionROI": roi,
        "DistortionIntrinsic": new_camera_matrix,
        "RotVektor": rvecs,
        "RotMatrix": rotation_matrices,
        "Extrinsics": extrinsics,
        "TransVektor": tvecs,
        "MeanError": float(rms),
        "image_size": image_size,
        "num_views": len(detections),
    }
def save_mono_intrinsics(
    params_dir: Path,
    camera_name: str,
    intrinsics: Dict,
    *,
    troubleshooting: bool = False,
) -> None:
    """Write one camera's intrinsics to ``params_dir`` as NPZ and YAML.

    The file stem is ``camera_name`` with every ``/`` replaced by ``-``.
    The NPZ holds the matrices plus ``DistortionROI``, ``MeanError``,
    ``image_size`` and ``num_views``; the YAML (written through
    ``cv2.FileStorage``) holds only the three matrices.

    Args:
        params_dir: Output directory; created if missing.
        camera_name: Logical camera name used to build the file names.
        intrinsics: Result dict as returned by ``calibrate_camera_intrinsics``.
        troubleshooting: When true, print the paths that were written.
    """
    params_dir.mkdir(parents=True, exist_ok=True)
    tag = camera_name.replace("/", "-")

    npz_path = params_dir / f"{tag}_intrinsics.npz"
    np.savez(
        npz_path,
        Intrinsic=intrinsics["Intrinsic"],
        Distortion=intrinsics["Distortion"],
        DistortionIntrinsic=intrinsics["DistortionIntrinsic"],
        DistortionROI=intrinsics["DistortionROI"],
        MeanError=intrinsics["MeanError"],
        image_size=np.array(intrinsics["image_size"]),
        num_views=intrinsics["num_views"],
    )

    yaml_path = params_dir / f"{tag}_intrinsics.yaml"
    fs = cv2.FileStorage(str(yaml_path), cv2.FILE_STORAGE_WRITE)
    try:
        for key in ("Intrinsic", "Distortion", "DistortionIntrinsic"):
            fs.write(key, intrinsics[key])
    finally:
        # Release the handle even if a write fails so the file is not left open.
        fs.release()

    if troubleshooting:
        print(f"[INFO] Saved mono intrinsics → {npz_path} and {yaml_path}")
def run_mono_calibration(
    input_path: str | Path,
    board_sizes: Dict[str, Tuple[int, int]],
    square_sizes: Dict[str, float],
    cameras: Optional[List[str]] = None,
    troubleshooting: bool = False,
) -> Dict[str, Dict]:
    """Calibrate every configured camera and save its intrinsics.

    Iterates over ``board_sizes``; for each logical camera name the camera
    folder is looked up under the session root, its feature records are loaded,
    and the intrinsics are computed and written to ``<input_path>/params``.
    A camera whose calibration fails is reported and skipped so the remaining
    cameras are still processed.

    Args:
        input_path: Session path; also the parent of the ``params`` directory.
        board_sizes: Board grid size per logical camera name.
        square_sizes: Board square size per logical camera name.
        cameras: Optional allow-list; when non-empty, other cameras are skipped.
        troubleshooting: When true, print the reason for each skip.

    Returns:
        Mapping of logical camera name to its intrinsics dict, containing only
        the cameras that calibrated successfully.

    Raises:
        KeyError: If ``square_sizes`` has no entry for a camera being processed.
    """
    session_root = resolve_session_root(input_path)
    params_dir = Path(input_path) / "params"
    results: Dict[str, Dict] = {}

    for logical_name, board_size in board_sizes.items():
        if cameras and logical_name not in cameras:
            continue
        cam = discover_camera_folder(session_root, logical_name)
        if cam is None:
            # Mirror the stereo runner: explain the skip when troubleshooting.
            if troubleshooting:
                print(f"[SKIP mono:{logical_name}] folder not found")
            continue
        records = load_folder_features(cam.path)
        square_size = square_sizes[logical_name]
        try:
            intrinsics = calibrate_camera_intrinsics(records, board_size, square_size)
            save_mono_intrinsics(
                params_dir, logical_name, intrinsics, troubleshooting=troubleshooting
            )
            results[logical_name] = intrinsics
            print(
                f"[mono:{logical_name}] views={intrinsics['num_views']} "
                f"reproj_err={intrinsics['MeanError']:.4f}"
            )
        except (RuntimeError, cv2.error) as exc:
            # OpenCV failures raise cv2.error, which is not a RuntimeError;
            # catch both so one bad camera does not abort the other cameras.
            if troubleshooting:
                print(f"[SKIP mono:{logical_name}] {exc}")
            else:
                print(f"[mono:{logical_name}] skipped")
    return results
def calibrate_stereo_pair(
    pairs: List[StereoPair],
    left_intrinsics: Dict,
    right_intrinsics: Dict,
    board_size: Tuple[int, int],
    square_size: float,
    image_size: Tuple[int, int],
) -> Dict:
    """Estimate the relative pose of two cameras from paired detections.

    The per-camera intrinsics are held fixed (``cv2.CALIB_FIX_INTRINSIC``);
    ``cv2.stereoCalibrate`` solves for rotation, translation, essential and
    fundamental matrices, and ``cv2.stereoRectify`` (``flags=0``, ``alpha=1``)
    then provides the rectification transforms and the ``Q`` matrix.

    Args:
        pairs: Matched detections; ``left.corners`` / ``right.corners`` of each
            pair are used as image points.
        left_intrinsics: Mono result dict of the left camera.
        right_intrinsics: Mono result dict of the right camera.
        board_size: Grid dimensions of the calibration board.
        square_size: Board square size.
        image_size: Image size handed to OpenCV.

    Returns:
        A dict with the stereo extrinsics, the copied mono intrinsics of both
        cameras, the image/object points used, the rectification matrices
        ``R1``/``R2``/``P1``/``P2``, ``Q``, and bookkeeping values.

    Raises:
        RuntimeError: If ``pairs`` is empty.
    """
    if not pairs:
        raise RuntimeError("No stereo pairs available")

    board_points = create_3d_board_points(board_size, square_size)
    object_points = [board_points] * len(pairs)
    left_points = [pair.left.corners for pair in pairs]
    right_points = [pair.right.corners for pair in pairs]

    left_k, left_d = left_intrinsics["Intrinsic"], left_intrinsics["Distortion"]
    right_k, right_d = right_intrinsics["Intrinsic"], right_intrinsics["Distortion"]

    termination = (cv2.TERM_CRITERIA_MAX_ITER + cv2.TERM_CRITERIA_EPS, 30, 0.001)
    stereo_result = cv2.stereoCalibrate(
        object_points,
        left_points,
        right_points,
        left_k,
        left_d,
        right_k,
        right_d,
        image_size,
        criteria=termination,
        flags=cv2.CALIB_FIX_INTRINSIC,
    )
    # The returned camera matrices / distortions (positions 1-4) are not used.
    rms = stereo_result[0]
    rotation, translation, essential, fundamental = stereo_result[5:9]

    R1, R2, P1, P2, Q, _, _ = cv2.stereoRectify(
        left_k,
        left_d,
        right_k,
        right_d,
        image_size,
        rotation,
        translation,
        flags=0,
        alpha=1,
    )

    # 4x4 homogeneous transform assembled from the stereo rotation/translation.
    transform = np.vstack(
        (np.hstack((rotation, translation)), np.array([0, 0, 0, 1]))
    )

    return {
        "Translation": translation,
        "Rotation": rotation,
        "Transformation": transform,
        "Essential": essential,
        "Fundamental": fundamental,
        "MeanError": float(rms),
        "SquareSize": square_size,
        "BoardSize": board_size,
        "Objpoints": board_points,
        "Q": np.array(Q, dtype=np.float64),
        "num_pairs": len(pairs),
        "L_Intrinsic": left_k,
        "L_Distortion": left_d,
        "L_DistortionIntrinsic": left_intrinsics["DistortionIntrinsic"],
        "R_Intrinsic": right_k,
        "R_Distortion": right_d,
        "R_DistortionIntrinsic": right_intrinsics["DistortionIntrinsic"],
        "L_Imgpoints": left_points,
        "R_Imgpoints": right_points,
        "R1": R1,
        "R2": R2,
        "P1": P1,
        "P2": P2,
        "image_size": image_size,
    }
def save_stereo_calibration(
    input_path: str | Path,
    pair_tag: str,
    parameters: Dict,
    *,
    troubleshooting: bool = False,
) -> None:
    """Persist a stereo calibration under ``<input_path>/params``.

    Three files are written, all prefixed with ``pair_tag``:

    * ``_parameters.npz``: every entry of ``parameters`` except the
      rectification matrices ``R1``, ``R2``, ``P1`` and ``P2``.
    * ``_stereo_cam_model.yaml``: both cameras' intrinsics, the rotation and
      translation taken from ``parameters["Transformation"]``, and ``Q``.
    * ``_Q.cvstore``: ``Q`` alone.

    Args:
        input_path: Session path; parent of the ``params`` directory.
        pair_tag: File-name prefix identifying the camera pair.
        parameters: Result dict as returned by ``calibrate_stereo_pair``.
        troubleshooting: When true, print each path after it is written.
    """
    params_dir = Path(input_path) / "params"
    params_dir.mkdir(parents=True, exist_ok=True)
    Q_clean = np.array(parameters["Q"], dtype=np.float64)

    npz_path = params_dir / f"{pair_tag}_parameters.npz"
    excluded = ("R1", "R2", "P1", "P2")
    save_kwargs = {k: v for k, v in parameters.items() if k not in excluded}
    np.savez(npz_path, **save_kwargs)
    if troubleshooting:
        print(f"[INFO] Saved NPZ → {npz_path}")

    yaml_path = params_dir / f"{pair_tag}_stereo_cam_model.yaml"
    fs = cv2.FileStorage(str(yaml_path), cv2.FILE_STORAGE_WRITE)
    try:
        for key in (
            "L_DistortionIntrinsic",
            "L_Intrinsic",
            "L_Distortion",
            "R_DistortionIntrinsic",
            "R_Intrinsic",
            "R_Distortion",
        ):
            fs.write(key, parameters[key])
        fs.write("Rotation", parameters["Transformation"][:3, :3])
        fs.write("Translation", parameters["Transformation"][:3, 3:])
        fs.write("Q", Q_clean)
    finally:
        # Release the handle even if a write fails so the file is not left open.
        fs.release()
    if troubleshooting:
        print(f"[INFO] Saved YAML → {yaml_path}")

    cvstore_path = params_dir / f"{pair_tag}_Q.cvstore"
    fs2 = cv2.FileStorage(str(cvstore_path), cv2.FILE_STORAGE_WRITE)
    try:
        fs2.write("Q", Q_clean)
    finally:
        fs2.release()
    if troubleshooting:
        print(f"[INFO] Saved Q → {cvstore_path}")
def save_pairing_report(
    input_path: str | Path,
    pair_tag: str,
    pairs: List[StereoPair],
) -> Path:
    """Write a tab-separated listing of the stereo pairs and return its path.

    The report goes to ``<input_path>/pairing_reports/<pair_tag>.txt`` and
    consists of two ``#`` comment lines, a column header, and one row per pair
    with the left/right image file names, the time delta (six decimals) and
    the pairing method.
    """
    report_dir = Path(input_path) / "pairing_reports"
    report_dir.mkdir(parents=True, exist_ok=True)
    report_path = report_dir / f"{pair_tag}.txt"

    header = (
        f"# stereo pairs for {pair_tag}",
        f"# total={len(pairs)}",
        "left_image\tright_image\tdelta_sec\tmethod",
    )
    rows = (
        f"{pair.left.image_path.name}\t{pair.right.image_path.name}\t"
        f"{pair.delta_sec:.6f}\t{pair.method}"
        for pair in pairs
    )
    report_text = "\n".join([*header, *rows]) + "\n"

    report_path.write_text(report_text, encoding="utf-8")
    print(f"[INFO] Pairing report → {report_path}")
    return report_path
def save_rectified_pairs(
    input_path: str | Path,
    pair_tag: str,
    pairs: List[StereoPair],
    parameters: Dict,
    left_folder: str,
    right_folder: str,
) -> None:
    """Rectify every stereo pair and write the images below ``rectified/``.

    Undistort/rectify maps are built once per camera from ``parameters``
    (intrinsics, distortion, ``R1``/``P1`` and ``R2``/``P2``). Each pair whose
    two source images can be read is remapped and written to
    ``<input_path>/rectified/<pair_tag>/<left_folder|right_folder>/`` under
    the original file names. A summary line with the number of pairs actually
    written is printed at the end.

    Args:
        input_path: Session path; parent of the ``rectified`` directory.
        pair_tag: Sub-directory name identifying the camera pair.
        pairs: Stereo pairs whose images are rectified.
        parameters: Result dict as returned by ``calibrate_stereo_pair``.
        left_folder: Output sub-folder name for the left images.
        right_folder: Output sub-folder name for the right images.
    """
    image_size = parameters["image_size"]
    R1, R2, P1, P2 = parameters["R1"], parameters["R2"], parameters["P1"], parameters["P2"]
    map_left = cv2.initUndistortRectifyMap(
        parameters["L_Intrinsic"],
        parameters["L_Distortion"],
        R1,
        P1,
        image_size,
        cv2.CV_32FC1,
    )
    map_right = cv2.initUndistortRectifyMap(
        parameters["R_Intrinsic"],
        parameters["R_Distortion"],
        R2,
        P2,
        image_size,
        cv2.CV_32FC1,
    )

    out_left = Path(input_path) / "rectified" / pair_tag / left_folder
    out_right = Path(input_path) / "rectified" / pair_tag / right_folder
    out_left.mkdir(parents=True, exist_ok=True)
    out_right.mkdir(parents=True, exist_ok=True)

    saved = 0
    for pair in pairs:
        left_img = cv2.imread(str(pair.left.image_path))
        right_img = cv2.imread(str(pair.right.image_path))
        if left_img is None or right_img is None:
            # Unreadable source image: skip the pair; it shows up in the count.
            continue
        left_rect = cv2.remap(left_img, map_left[0], map_left[1], cv2.INTER_LINEAR)
        right_rect = cv2.remap(right_img, map_right[0], map_right[1], cv2.INTER_LINEAR)
        # Attempt both writes, then count the pair only if both succeeded, so
        # the summary does not report files that were never written.
        left_ok = cv2.imwrite(str(out_left / pair.left.image_path.name), left_rect)
        right_ok = cv2.imwrite(str(out_right / pair.right.image_path.name), right_rect)
        if left_ok and right_ok:
            saved += 1
    print(f"[INFO] Rectified {saved}/{len(pairs)} pairs → {out_left.parent}")
def run_stereo_calibration(
    input_path: str | Path,
    left_camera: str,
    mono_results: Dict[str, Dict],
    board_sizes: Dict[str, Tuple[int, int]],
    square_sizes: Dict[str, float],
    time_window_sec: float = 0.1,
    partners: Tuple[str, ...] = STEREO_PARTNERS,
    troubleshooting: bool = False,
) -> None:
    """Stereo-calibrate ``left_camera`` against each partner camera.

    For every partner that has a camera folder and mono intrinsics, stereo
    pairs are built from the two cameras' feature records, a pairing report is
    written, and the pair is calibrated and saved. The left camera's board
    size, square size and image size are used for every pair. A partner whose
    calibration fails is reported and skipped so the remaining partners are
    still processed. Rectified images are written only in troubleshooting mode.

    Args:
        input_path: Session path; outputs are written below it.
        left_camera: Logical name of the reference (left) camera.
        mono_results: Mono intrinsics per logical camera name.
        board_sizes: Board grid size per logical camera name.
        square_sizes: Board square size per logical camera name.
        time_window_sec: Pairing time window passed to ``build_stereo_pairs``.
        partners: Logical names of the candidate right cameras.
        troubleshooting: When true, print skip/failure details and save
            rectified images.

    Raises:
        FileNotFoundError: If the left camera folder cannot be found.
        RuntimeError: If ``mono_results`` has no entry for the left camera.
    """
    session_root = resolve_session_root(input_path)
    left_cam = discover_camera_folder(session_root, left_camera)
    if left_cam is None:
        raise FileNotFoundError(f"Left camera folder {left_camera!r} not found")
    if left_camera not in mono_results:
        raise RuntimeError(
            f"No mono intrinsics for {left_camera}. Run mono calibration first."
        )

    left_records = load_folder_features(left_cam.path)
    left_board = board_sizes[left_camera]
    left_square = square_sizes[left_camera]
    image_size = mono_results[left_camera]["image_size"]

    for partner in partners:
        right_cam = discover_camera_folder(session_root, partner)
        if right_cam is None:
            if troubleshooting:
                print(f"[SKIP stereo:{left_camera}-{partner}] folder not found")
            continue
        if partner not in mono_results:
            if troubleshooting:
                print(
                    f"[SKIP stereo:{left_camera}-{partner}] "
                    f"no mono intrinsics for {partner}"
                )
            continue

        right_records = load_folder_features(right_cam.path)
        pairs = build_stereo_pairs(left_records, right_records, time_window_sec)
        # The tag becomes part of output file names; replace "/" the same way
        # the mono intrinsics files do, so it cannot point into a missing
        # sub-directory.
        pair_tag = f"{left_camera}-{partner}".replace("/", "-")
        if not pairs:
            if troubleshooting:
                print(
                    f"[SKIP stereo:{pair_tag}] no valid pairs "
                    f"(time_window={time_window_sec}s)"
                )
            continue

        if troubleshooting:
            time_n = sum(1 for p in pairs if p.method == "time_window")
            key_n = sum(1 for p in pairs if p.method == "pair_key")
            print(
                f"[stereo:{pair_tag}] {len(pairs)} pairs "
                f"(time_window={time_n}, pair_key={key_n})"
            )
        save_pairing_report(input_path, pair_tag, pairs)

        try:
            params = calibrate_stereo_pair(
                pairs,
                mono_results[left_camera],
                mono_results[partner],
                left_board,
                left_square,
                image_size,
            )
            save_stereo_calibration(
                input_path, pair_tag, params, troubleshooting=troubleshooting
            )
            print(
                f"[stereo:{pair_tag}] pairs={params['num_pairs']} "
                f"reproj_err={params['MeanError']:.4f}"
            )
            if troubleshooting:
                save_rectified_pairs(
                    input_path,
                    pair_tag,
                    pairs,
                    params,
                    left_cam.folder_name,
                    right_cam.folder_name,
                )
        except (RuntimeError, cv2.error) as exc:
            # OpenCV failures raise cv2.error, which is not a RuntimeError;
            # catch both so one bad partner does not abort the remaining ones.
            if troubleshooting:
                print(f"[FAIL stereo:{pair_tag}] {exc}")
            else:
                print(f"[stereo:{pair_tag}] failed")