425 lines
14 KiB
Python
425 lines
14 KiB
Python
"""Step 2: mono and stereo calibration from per-image JSON feature files."""
|
|
|
|
from __future__ import annotations
|
|
|
|
import os
|
|
from pathlib import Path
|
|
from typing import Dict, List, Optional, Tuple
|
|
|
|
import cv2
|
|
import numpy as np
|
|
|
|
from calibrationclasses.feature_json import FeatureRecord, load_folder_features
|
|
from calibrationclasses.pairing import StereoPair, build_stereo_pairs
|
|
from calibrationclasses.session import (
|
|
CameraFolder,
|
|
discover_camera_folder,
|
|
resolve_session_root,
|
|
STEREO_PARTNERS,
|
|
)
|
|
|
|
|
|
def create_3d_board_points(board_size: Tuple[int, int], square_size: float) -> np.ndarray:
|
|
pts = np.zeros((np.prod(board_size), 3), np.float32)
|
|
pts[:, :2] = np.indices(board_size).T.reshape(-1, 2)
|
|
pts *= square_size
|
|
return pts
|
|
|
|
|
|
def _image_size_from_records(records: List[FeatureRecord]) -> Tuple[int, int]:
|
|
for record in records:
|
|
img = cv2.imread(str(record.image_path))
|
|
if img is not None:
|
|
return img.shape[1], img.shape[0]
|
|
raise RuntimeError("Could not determine image size from feature JSONs")
|
|
|
|
|
|
def calibrate_camera_intrinsics(
|
|
records: List[FeatureRecord],
|
|
board_size: Tuple[int, int],
|
|
square_size: float,
|
|
) -> Dict:
|
|
chess_records = [r for r in records if r.is_chessboard]
|
|
if len(chess_records) < 3:
|
|
raise RuntimeError(
|
|
f"Need at least 3 chessboard detections for mono calibration, got {len(chess_records)}"
|
|
)
|
|
|
|
image_size = _image_size_from_records(chess_records)
|
|
objp = create_3d_board_points(board_size, square_size)
|
|
obj_points = [objp for _ in chess_records]
|
|
img_points = [r.corners for r in chess_records]
|
|
|
|
ret, mtx, dist, rvecs, tvecs = cv2.calibrateCamera(
|
|
obj_points, img_points, image_size, None, None, flags=0
|
|
)
|
|
|
|
rmtx = []
|
|
tmtx = []
|
|
for k, r in enumerate(rvecs):
|
|
rmtx.append(cv2.Rodrigues(r)[0])
|
|
tmtx.append(np.vstack((np.hstack((rmtx[k], tvecs[k])), np.array([0, 0, 0, 1]))))
|
|
|
|
newmtx, roi = cv2.getOptimalNewCameraMatrix(mtx, dist, image_size, 1, image_size)
|
|
if np.sum(roi) == 0:
|
|
roi = (0, 0, image_size[0] - 1, image_size[1] - 1)
|
|
|
|
return {
|
|
"Intrinsic": mtx,
|
|
"Distortion": dist,
|
|
"DistortionROI": roi,
|
|
"DistortionIntrinsic": newmtx,
|
|
"RotVektor": rvecs,
|
|
"RotMatrix": rmtx,
|
|
"Extrinsics": tmtx,
|
|
"TransVektor": tvecs,
|
|
"MeanError": float(ret),
|
|
"image_size": image_size,
|
|
"num_views": len(chess_records),
|
|
}
|
|
|
|
|
|
def save_mono_intrinsics(
|
|
params_dir: Path,
|
|
camera_name: str,
|
|
intrinsics: Dict,
|
|
*,
|
|
troubleshooting: bool = False,
|
|
) -> None:
|
|
params_dir.mkdir(parents=True, exist_ok=True)
|
|
tag = camera_name.replace("/", "-")
|
|
npz_path = params_dir / f"{tag}_intrinsics.npz"
|
|
np.savez(
|
|
npz_path,
|
|
Intrinsic=intrinsics["Intrinsic"],
|
|
Distortion=intrinsics["Distortion"],
|
|
DistortionIntrinsic=intrinsics["DistortionIntrinsic"],
|
|
DistortionROI=intrinsics["DistortionROI"],
|
|
MeanError=intrinsics["MeanError"],
|
|
image_size=np.array(intrinsics["image_size"]),
|
|
num_views=intrinsics["num_views"],
|
|
)
|
|
yaml_path = params_dir / f"{tag}_intrinsics.yaml"
|
|
fs = cv2.FileStorage(str(yaml_path), cv2.FILE_STORAGE_WRITE)
|
|
fs.write("Intrinsic", intrinsics["Intrinsic"])
|
|
fs.write("Distortion", intrinsics["Distortion"])
|
|
fs.write("DistortionIntrinsic", intrinsics["DistortionIntrinsic"])
|
|
fs.release()
|
|
if troubleshooting:
|
|
print(f"[INFO] Saved mono intrinsics → {npz_path} and {yaml_path}")
|
|
|
|
|
|
def run_mono_calibration(
|
|
input_path: str | Path,
|
|
board_sizes: Dict[str, Tuple[int, int]],
|
|
square_sizes: Dict[str, float],
|
|
cameras: Optional[List[str]] = None,
|
|
troubleshooting: bool = False,
|
|
) -> Dict[str, Dict]:
|
|
session_root = resolve_session_root(input_path)
|
|
params_dir = Path(input_path) / "params"
|
|
results = {}
|
|
|
|
for logical_name, board_size in board_sizes.items():
|
|
if cameras and logical_name not in cameras:
|
|
continue
|
|
cam = discover_camera_folder(session_root, logical_name)
|
|
if cam is None:
|
|
continue
|
|
records = load_folder_features(cam.path)
|
|
square_size = square_sizes[logical_name]
|
|
try:
|
|
intrinsics = calibrate_camera_intrinsics(records, board_size, square_size)
|
|
save_mono_intrinsics(
|
|
params_dir, logical_name, intrinsics, troubleshooting=troubleshooting
|
|
)
|
|
results[logical_name] = intrinsics
|
|
print(
|
|
f"[mono:{logical_name}] views={intrinsics['num_views']} "
|
|
f"reproj_err={intrinsics['MeanError']:.4f}"
|
|
)
|
|
except RuntimeError as exc:
|
|
if troubleshooting:
|
|
print(f"[SKIP mono:{logical_name}] {exc}")
|
|
else:
|
|
print(f"[mono:{logical_name}] skipped")
|
|
|
|
return results
|
|
|
|
|
|
def calibrate_stereo_pair(
|
|
pairs: List[StereoPair],
|
|
left_intrinsics: Dict,
|
|
right_intrinsics: Dict,
|
|
board_size: Tuple[int, int],
|
|
square_size: float,
|
|
image_size: Tuple[int, int],
|
|
) -> Dict:
|
|
if not pairs:
|
|
raise RuntimeError("No stereo pairs available")
|
|
|
|
objp = create_3d_board_points(board_size, square_size)
|
|
obj_points = [objp for _ in pairs]
|
|
left_img_points = [p.left.corners for p in pairs]
|
|
right_img_points = [p.right.corners for p in pairs]
|
|
|
|
flags = cv2.CALIB_FIX_INTRINSIC
|
|
criteria = (cv2.TERM_CRITERIA_MAX_ITER + cv2.TERM_CRITERIA_EPS, 30, 0.001)
|
|
|
|
ret_stereo, _, _, _, _, rot, trans, essential, fundamental = cv2.stereoCalibrate(
|
|
obj_points,
|
|
left_img_points,
|
|
right_img_points,
|
|
left_intrinsics["Intrinsic"],
|
|
left_intrinsics["Distortion"],
|
|
right_intrinsics["Intrinsic"],
|
|
right_intrinsics["Distortion"],
|
|
image_size,
|
|
criteria=criteria,
|
|
flags=flags,
|
|
)
|
|
|
|
R1, R2, P1, P2, Q, roi1, roi2 = cv2.stereoRectify(
|
|
left_intrinsics["Intrinsic"],
|
|
left_intrinsics["Distortion"],
|
|
right_intrinsics["Intrinsic"],
|
|
right_intrinsics["Distortion"],
|
|
image_size,
|
|
rot,
|
|
trans,
|
|
flags=0,
|
|
alpha=1,
|
|
)
|
|
|
|
T = np.vstack((np.hstack((rot, trans)), np.array([0, 0, 0, 1])))
|
|
Q_clean = np.array(Q, dtype=np.float64)
|
|
|
|
parameters = {
|
|
"Translation": trans,
|
|
"Rotation": rot,
|
|
"Transformation": T,
|
|
"Essential": essential,
|
|
"Fundamental": fundamental,
|
|
"MeanError": float(ret_stereo),
|
|
"SquareSize": square_size,
|
|
"BoardSize": board_size,
|
|
"Objpoints": objp,
|
|
"Q": Q_clean,
|
|
"num_pairs": len(pairs),
|
|
"L_Intrinsic": left_intrinsics["Intrinsic"],
|
|
"L_Distortion": left_intrinsics["Distortion"],
|
|
"L_DistortionIntrinsic": left_intrinsics["DistortionIntrinsic"],
|
|
"R_Intrinsic": right_intrinsics["Intrinsic"],
|
|
"R_Distortion": right_intrinsics["Distortion"],
|
|
"R_DistortionIntrinsic": right_intrinsics["DistortionIntrinsic"],
|
|
"L_Imgpoints": left_img_points,
|
|
"R_Imgpoints": right_img_points,
|
|
"R1": R1,
|
|
"R2": R2,
|
|
"P1": P1,
|
|
"P2": P2,
|
|
"image_size": image_size,
|
|
}
|
|
return parameters
|
|
|
|
|
|
def save_stereo_calibration(
|
|
input_path: str | Path,
|
|
pair_tag: str,
|
|
parameters: Dict,
|
|
*,
|
|
troubleshooting: bool = False,
|
|
) -> None:
|
|
params_dir = Path(input_path) / "params"
|
|
params_dir.mkdir(parents=True, exist_ok=True)
|
|
Q_clean = np.array(parameters["Q"], dtype=np.float64)
|
|
|
|
npz_path = params_dir / f"{pair_tag}_parameters.npz"
|
|
save_kwargs = {k: v for k, v in parameters.items() if k not in ("R1", "R2", "P1", "P2")}
|
|
np.savez(npz_path, **save_kwargs)
|
|
if troubleshooting:
|
|
print(f"[INFO] Saved NPZ → {npz_path}")
|
|
|
|
yaml_path = params_dir / f"{pair_tag}_stereo_cam_model.yaml"
|
|
fs = cv2.FileStorage(str(yaml_path), cv2.FILE_STORAGE_WRITE)
|
|
fs.write("L_DistortionIntrinsic", parameters["L_DistortionIntrinsic"])
|
|
fs.write("L_Intrinsic", parameters["L_Intrinsic"])
|
|
fs.write("L_Distortion", parameters["L_Distortion"])
|
|
fs.write("R_DistortionIntrinsic", parameters["R_DistortionIntrinsic"])
|
|
fs.write("R_Intrinsic", parameters["R_Intrinsic"])
|
|
fs.write("R_Distortion", parameters["R_Distortion"])
|
|
fs.write("Rotation", parameters["Transformation"][:3, :3])
|
|
fs.write("Translation", parameters["Transformation"][:3, 3:])
|
|
fs.write("Q", Q_clean)
|
|
fs.release()
|
|
if troubleshooting:
|
|
print(f"[INFO] Saved YAML → {yaml_path}")
|
|
|
|
cvstore_path = params_dir / f"{pair_tag}_Q.cvstore"
|
|
fs2 = cv2.FileStorage(str(cvstore_path), cv2.FILE_STORAGE_WRITE)
|
|
fs2.write("Q", Q_clean)
|
|
fs2.release()
|
|
if troubleshooting:
|
|
print(f"[INFO] Saved Q → {cvstore_path}")
|
|
|
|
|
|
def save_pairing_report(
|
|
input_path: str | Path,
|
|
pair_tag: str,
|
|
pairs: List[StereoPair],
|
|
) -> Path:
|
|
report_dir = Path(input_path) / "pairing_reports"
|
|
report_dir.mkdir(parents=True, exist_ok=True)
|
|
report_path = report_dir / f"{pair_tag}.txt"
|
|
lines = [
|
|
f"# stereo pairs for {pair_tag}",
|
|
f"# total={len(pairs)}",
|
|
"left_image\tright_image\tdelta_sec\tmethod",
|
|
]
|
|
for pair in pairs:
|
|
lines.append(
|
|
f"{pair.left.image_path.name}\t{pair.right.image_path.name}\t"
|
|
f"{pair.delta_sec:.6f}\t{pair.method}"
|
|
)
|
|
report_path.write_text("\n".join(lines) + "\n", encoding="utf-8")
|
|
print(f"[INFO] Pairing report → {report_path}")
|
|
return report_path
|
|
|
|
|
|
def save_rectified_pairs(
|
|
input_path: str | Path,
|
|
pair_tag: str,
|
|
pairs: List[StereoPair],
|
|
parameters: Dict,
|
|
left_folder: str,
|
|
right_folder: str,
|
|
) -> None:
|
|
image_size = parameters["image_size"]
|
|
R1, R2, P1, P2 = parameters["R1"], parameters["R2"], parameters["P1"], parameters["P2"]
|
|
map_left = cv2.initUndistortRectifyMap(
|
|
parameters["L_Intrinsic"],
|
|
parameters["L_Distortion"],
|
|
R1,
|
|
P1,
|
|
image_size,
|
|
cv2.CV_32FC1,
|
|
)
|
|
map_right = cv2.initUndistortRectifyMap(
|
|
parameters["R_Intrinsic"],
|
|
parameters["R_Distortion"],
|
|
R2,
|
|
P2,
|
|
image_size,
|
|
cv2.CV_32FC1,
|
|
)
|
|
|
|
out_left = Path(input_path) / "rectified" / pair_tag / left_folder
|
|
out_right = Path(input_path) / "rectified" / pair_tag / right_folder
|
|
out_left.mkdir(parents=True, exist_ok=True)
|
|
out_right.mkdir(parents=True, exist_ok=True)
|
|
|
|
saved = 0
|
|
for pair in pairs:
|
|
left_img = cv2.imread(str(pair.left.image_path))
|
|
right_img = cv2.imread(str(pair.right.image_path))
|
|
if left_img is None or right_img is None:
|
|
continue
|
|
left_rect = cv2.remap(left_img, map_left[0], map_left[1], cv2.INTER_LINEAR)
|
|
right_rect = cv2.remap(right_img, map_right[0], map_right[1], cv2.INTER_LINEAR)
|
|
cv2.imwrite(str(out_left / pair.left.image_path.name), left_rect)
|
|
cv2.imwrite(str(out_right / pair.right.image_path.name), right_rect)
|
|
saved += 1
|
|
print(f"[INFO] Rectified {saved}/{len(pairs)} pairs → {out_left.parent}")
|
|
|
|
|
|
def run_stereo_calibration(
|
|
input_path: str | Path,
|
|
left_camera: str,
|
|
mono_results: Dict[str, Dict],
|
|
board_sizes: Dict[str, Tuple[int, int]],
|
|
square_sizes: Dict[str, float],
|
|
time_window_sec: float = 0.1,
|
|
partners: Tuple[str, ...] = STEREO_PARTNERS,
|
|
troubleshooting: bool = False,
|
|
) -> None:
|
|
session_root = resolve_session_root(input_path)
|
|
left_cam = discover_camera_folder(session_root, left_camera)
|
|
if left_cam is None:
|
|
raise FileNotFoundError(f"Left camera folder {left_camera!r} not found")
|
|
|
|
if left_camera not in mono_results:
|
|
raise RuntimeError(
|
|
f"No mono intrinsics for {left_camera}. Run mono calibration first."
|
|
)
|
|
|
|
left_records = load_folder_features(left_cam.path)
|
|
left_board = board_sizes[left_camera]
|
|
left_square = square_sizes[left_camera]
|
|
image_size = mono_results[left_camera]["image_size"]
|
|
|
|
for partner in partners:
|
|
right_cam = discover_camera_folder(session_root, partner)
|
|
if right_cam is None:
|
|
if troubleshooting:
|
|
print(f"[SKIP stereo:{left_camera}-{partner}] folder not found")
|
|
continue
|
|
if partner not in mono_results:
|
|
if troubleshooting:
|
|
print(
|
|
f"[SKIP stereo:{left_camera}-{partner}] "
|
|
f"no mono intrinsics for {partner}"
|
|
)
|
|
continue
|
|
|
|
right_records = load_folder_features(right_cam.path)
|
|
pairs = build_stereo_pairs(left_records, right_records, time_window_sec)
|
|
pair_tag = f"{left_camera}-{partner}"
|
|
|
|
if not pairs:
|
|
if troubleshooting:
|
|
print(
|
|
f"[SKIP stereo:{pair_tag}] no valid pairs "
|
|
f"(time_window={time_window_sec}s)"
|
|
)
|
|
continue
|
|
|
|
time_n = sum(1 for p in pairs if p.method == "time_window")
|
|
key_n = sum(1 for p in pairs if p.method == "pair_key")
|
|
if troubleshooting:
|
|
print(
|
|
f"[stereo:{pair_tag}] {len(pairs)} pairs "
|
|
f"(time_window={time_n}, pair_key={key_n})"
|
|
)
|
|
save_pairing_report(input_path, pair_tag, pairs)
|
|
|
|
try:
|
|
params = calibrate_stereo_pair(
|
|
pairs,
|
|
mono_results[left_camera],
|
|
mono_results[partner],
|
|
left_board,
|
|
left_square,
|
|
image_size,
|
|
)
|
|
save_stereo_calibration(
|
|
input_path, pair_tag, params, troubleshooting=troubleshooting
|
|
)
|
|
print(
|
|
f"[stereo:{pair_tag}] pairs={params['num_pairs']} "
|
|
f"reproj_err={params['MeanError']:.4f}"
|
|
)
|
|
if troubleshooting:
|
|
save_rectified_pairs(
|
|
input_path,
|
|
pair_tag,
|
|
pairs,
|
|
params,
|
|
left_cam.folder_name,
|
|
right_cam.folder_name,
|
|
)
|
|
except RuntimeError as exc:
|
|
if troubleshooting:
|
|
print(f"[FAIL stereo:{pair_tag}] {exc}")
|
|
else:
|
|
print(f"[stereo:{pair_tag}] failed")
|