""" Pipeline runner for libSGM stereo disparity. Resolves all paths from the project folder structure and drives the stereosgm_new binary for each scan in a session (or all sessions on a date). For each scan it takes the LAST matched lc_/rc_ image pair from 02_rect_images/ (images sorted by timestamp — highest timestamp = last acquired image). Output layout per scan: ///// 02_rect_images/ <- input (lc_ts.png + rc_ts.png) 03_sgm_disp_map/ <- disparity.xml + disparity_color.png (created here) 05_sgm_pcl/ <- untouched Binary: ~/Speckle-Scanner/05_disparity/libsgm/build/sample/stereosgm_new """ import sys import re import argparse import subprocess from pathlib import Path # Resolve config.py from ~/Speckle-Scanner regardless of CWD sys.path.insert(0, str(Path.home() / "Speckle-Scanner")) import config # noqa: E402 BINARY = Path(__file__).parent / "build" / "sample" / "stereosgm_new" def extract_ts_token(filename, prefix="lc_"): """Extract ts token from lc_ts1634840093.png or lc_ts1634840093_ck....png.""" m = re.search(rf"^{re.escape(prefix)}(ts\d+)", filename, re.IGNORECASE) if not m: return None, None ts_token = m.group(1).lower() ts_int = int(re.search(r"\d+", ts_token).group()) return ts_token, ts_int def find_rc_for_ts(rect_dir, ts_token): """Match rc image by shared ts token (ck suffix optional).""" rc_matches = sorted(rect_dir.glob(f"rc_{ts_token}_*.png")) if not rc_matches: rc_matches = sorted(rect_dir.glob(f"rc_{ts_token}*.png")) return rc_matches[0] if rc_matches else None def find_last_lc_rc_pair(rect_dir): """Return (lc_path, rc_path) for the highest-timestamp matched pair in rect_dir.""" rect_dir = Path(rect_dir) pairs = [] for lc in rect_dir.glob("lc_ts*.png"): ts_token, ts_int = extract_ts_token(lc.name, "lc_") if ts_token is None: continue rc = find_rc_for_ts(rect_dir, ts_token) if rc is None: continue pairs.append((ts_int, lc, rc)) if not pairs: return None, None pairs.sort(key=lambda item: item[0]) _, lc, rc = pairs[-1] return lc, rc def build_cmd(lc, rc, output_dir, sgm_args): cmd = [ str(BINARY), str(lc), str(rc), f"--output_dir={output_dir}", "--no_display=1", ] for key, val in sgm_args.items(): if val is not None: cmd.append(f"--{key}={val}") return cmd def run_scan(project, date, session, scan, sgm_args): rect_dir = config.PROCESSING_DIR / project / date / session / scan / "02_rect_images" if not rect_dir.exists(): print(f"[SKIP] {session}/{scan}: 02_rect_images not found at {rect_dir}") return False lc, rc = find_last_lc_rc_pair(rect_dir) if lc is None: print(f"[SKIP] {session}/{scan}: no lc_ts*.png images found in {rect_dir}") return False if rc is None: print(f"[SKIP] {session}/{scan}: no matching rc image for {lc.name}") return False output_dir = config.get_processing_step_dir(project, date, session, scan, "03_sgm_disp_map") print(f"\n{'='*60}") print(f"[SCAN] {session}/{scan}") print(f" lc : {lc.name}") print(f" rc : {rc.name}") print(f" output : {output_dir}") print(f"{'='*60}") cmd = build_cmd(lc, rc, output_dir, sgm_args) result = subprocess.run(cmd) if result.returncode != 0: print(f"[FAIL] {session}/{scan} exited with code {result.returncode}") return False print(f"[DONE] {session}/{scan}") return True def run_session(project, date, session, scan_arg, sgm_args): if scan_arg: scans = [scan_arg] else: scans = config.list_scan_dirs(project, date, session) if not scans: print(f"[WARN] No scan folders found in {project}/{date}/{session}") return [], [] print(f"\n Session {session}: {len(scans)} scan(s) found") failed = [] for scan in scans: ok = run_scan(project, date, session, scan, sgm_args) if not ok: failed.append(f"{session}/{scan}") return scans, failed def main(): parser = argparse.ArgumentParser( description="libSGM disparity pipeline runner — resolves paths from project structure" ) # Project location parser.add_argument("--project", required=True, help="Project name (e.g. Olsen_wings)") parser.add_argument("--date", required=True, help="Date string (e.g. 2026-05-12)") parser.add_argument("--session", default=None, help="Session name (e.g. session1); omit to process ALL sessions on that date") parser.add_argument("--scan", default=None, help="Single scan (e.g. Scan000001); omit to process all scans in the session") # SGM parameters — all optional, forwarded to stereosgm_new parser.add_argument("--disp_size", type=int, default=None, help="Maximum disparity value (64, 128, or 256; default 256)") parser.add_argument("--P1", type=int, default=None, help="SGM penalty for disparity change of ±1 (default 10)") parser.add_argument("--P2", type=int, default=None, help="SGM penalty for disparity change >1 (default 120)") parser.add_argument("--uniqueness", type=float, default=None, help="Uniqueness ratio threshold (default 0.80)") parser.add_argument("--num_paths", type=int, default=None, choices=[4, 8], help="Scanlines for cost aggregation: 4 or 8 (default 8)") parser.add_argument("--min_disp", type=int, default=None, help="Minimum disparity value (default -160)") parser.add_argument("--LR_max_diff", type=int, default=None, help="Max left-right disparity difference (default 1)") parser.add_argument("--census_type", type=int, default=None, choices=[0, 1], help="Census transform type: 0=CENSUS_9x7, 1=SYMMETRIC_CENSUS_9x7 (default 1)") args = parser.parse_args() if not BINARY.exists(): print(f"ERROR: stereosgm_new binary not found at {BINARY}") print("Build it first: cd ~/Speckle-Scanner/05_disparity/libsgm/build && make stereosgm_new") sys.exit(1) sgm_args = { "disp_size": args.disp_size, "P1": args.P1, "P2": args.P2, "uniqueness": args.uniqueness, "num_paths": args.num_paths, "min_disp": args.min_disp, "LR_max_diff": args.LR_max_diff, "census_type": args.census_type, } # Determine sessions to process if args.session: sessions = [args.session] else: sessions = config.list_session_dirs(args.project, args.date) if not sessions: print(f"No session folders found under {args.project}/{args.date}") sys.exit(1) print(f"Found {len(sessions)} session(s): {sessions}") total_scans = 0 all_failed = [] for session in sessions: scans, failed = run_session( args.project, args.date, session, args.scan, sgm_args ) total_scans += len(scans) all_failed.extend(failed) print(f"\n{'='*60}") print(f"Finished: {total_scans - len(all_failed)}/{total_scans} scans succeeded.") if all_failed: print(f"Failed: {all_failed}") sys.exit(1) if __name__ == "__main__": main()