""" Pipeline runner for ZNCC stereo disparity. Resolves all paths from the project folder structure and drives stereo_disparity_main.py for each scan in a session (or all sessions in a date). Output layout per scan: ///// 02_rect_images/ <- input (must exist) 04_zncc_disp_map/ <- disparity.npy + Disparity_map_colorbar.png (always) + vertical shift & correlation maps (troubleshooting only) 06_zncc_pcl/ <- Point_cloud.ply + .txt (troubleshooting only) Q matrix is read from: ////params_link/lc-rc_Q.cvstore """ import sys import argparse import subprocess from pathlib import Path # Resolve config.py relative to ~/Speckle-Scanner regardless of CWD sys.path.insert(0, str(Path.home() / "Speckle-Scanner")) import config # noqa: E402 SCRIPT = Path(__file__).parent / "stereo_disparity_main.py" def build_cmd(rect_dir, q_file, disp_out, pcl_out, zncc_args, troubleshooting): cmd = [ sys.executable, str(SCRIPT), "--left_dir", str(rect_dir), "--right_dir", str(rect_dir), "--left_prefix", "lc_", "--right_prefix", "rc_", "--q_file", str(q_file), "--disp_output_dir", str(disp_out), "--pcl_output_dir", str(pcl_out), ] if troubleshooting: cmd.append("--troubleshooting") # Forward ZNCC tuning params for key, val in zncc_args.items(): if val is not None: cmd += [f"--{key}", str(val)] return cmd def run_scan(project, date, session, scan, zncc_args, troubleshooting): rect_dir = config.PROCESSING_DIR / project / date / session / scan / "02_rect_images" q_file = config.get_params_link_dir(project, date, session) / "lc-rc_Q.cvstore" disp_out = config.get_processing_step_dir(project, date, session, scan, "04_zncc_disp_map") pcl_out = config.get_processing_step_dir(project, date, session, scan, "06_zncc_pcl") if not rect_dir.exists(): print(f"[SKIP] {session}/{scan}: 02_rect_images not found at {rect_dir}") return False if not q_file.exists(): print(f"[SKIP] {session}/{scan}: Q matrix not found at {q_file}") return False mode = "troubleshooting" if troubleshooting else "disparity-only" print(f"\n{'='*60}") print(f"[SCAN] {session}/{scan} [{mode}]") print(f" rect : {rect_dir}") print(f" Q : {q_file}") print(f" disp : {disp_out}") if troubleshooting: print(f" pcl : {pcl_out}") print(f"{'='*60}") cmd = build_cmd(rect_dir, q_file, disp_out, pcl_out, zncc_args, troubleshooting) result = subprocess.run(cmd) if result.returncode != 0: print(f"[FAIL] {session}/{scan} exited with code {result.returncode}") return False print(f"[DONE] {session}/{scan}") return True def run_session(project, date, session, scan_arg, zncc_args, troubleshooting): if scan_arg: scans = [scan_arg] else: scans = config.list_scan_dirs(project, date, session) if not scans: print(f"[WARN] No scan folders found in {project}/{date}/{session}") return [], [] print(f"\n Session {session}: {len(scans)} scan(s) found") failed = [] for scan in scans: ok = run_scan(project, date, session, scan, zncc_args, troubleshooting) if not ok: failed.append(f"{session}/{scan}") return scans, failed def main(): parser = argparse.ArgumentParser( description="ZNCC disparity pipeline runner — resolves paths from project structure" ) # Project location parser.add_argument("--project", required=True, help="Project name (e.g. Olsen_wings)") parser.add_argument("--date", required=True, help="Date string (e.g. 2026-05-12)") parser.add_argument("--session", default=None, help="Session name (e.g. session1); omit to process ALL sessions on that date") parser.add_argument("--scan", default=None, help="Single scan to process (e.g. Scan000001); omit to process all scans in the session") # Output mode parser.add_argument("--troubleshooting", action="store_true", help="Save all outputs (vertical shift map, correlation map, point cloud PLY/TXT); " "by default only disparity map (npy + png) is saved") # ZNCC tuning — all optional, forwarded to stereo_disparity_main.py parser.add_argument( "--num_images", type=int, default=None, help="Stereo pairs per scan. Default: all matched pairs. " "If fewer than available, uses the last N (highest timestamps).", ) parser.add_argument("--window_size", type=int, default=None) parser.add_argument("--H_neg_range", type=int, default=None) parser.add_argument("--H_pos_range", type=int, default=None) parser.add_argument("--v_neg_range", type=int, default=None) parser.add_argument("--v_pos_range", type=int, default=None) parser.add_argument("--zncc_threshold", type=float, default=None) parser.add_argument("--noise_filter", type=str, default=None, choices=["median", "open3d"]) parser.add_argument("--interpolation", type=str, default=None) parser.add_argument("--noise_remove", type=str, default=None) parser.add_argument("--method", type=str, default=None, choices=["parabolic", "gaussian", "equiangular"]) args = parser.parse_args() zncc_args = { "num_images": args.num_images, "window_size": args.window_size, "H_neg_range": args.H_neg_range, "H_pos_range": args.H_pos_range, "v_neg_range": args.v_neg_range, "v_pos_range": args.v_pos_range, "zncc_threshold": args.zncc_threshold, "noise_filter": args.noise_filter, "interpolation": args.interpolation, "noise_remove": args.noise_remove, "method": args.method, } # Determine sessions to process if args.session: sessions = [args.session] else: sessions = config.list_session_dirs(args.project, args.date) if not sessions: print(f"No session folders found under {args.project}/{args.date}") sys.exit(1) print(f"Found {len(sessions)} session(s): {sessions}") total_scans = 0 all_failed = [] for session in sessions: scans, failed = run_session( args.project, args.date, session, args.scan, zncc_args, args.troubleshooting ) total_scans += len(scans) all_failed.extend(failed) print(f"\n{'='*60}") print(f"Finished: {total_scans - len(all_failed)}/{total_scans} scans succeeded.") if all_failed: print(f"Failed: {all_failed}") sys.exit(1) if __name__ == "__main__": main()