Files

156 lines
5.6 KiB
Python

"""
Pipeline runner: disparity map → point cloud.
Reads disparity maps from the project folder structure and saves
PLY point clouds using pointcloud_genration.py functions (.txt with --troubleshooting).
Supported modes:
sgm → 03_sgm_disp_map/disparity.xml → 05_sgm_pcl/
zncc → 04_zncc_disp_map/disparity.npy → 06_zncc_pcl/
both → runs both above (default)
Q matrix is taken from each session's own params_link folder:
<processing_dir>/<project>/<date>/<session>/params_link/lc-rc_Q.cvstore
"""
import sys
import argparse
from pathlib import Path
# Resolve config.py from ~/Speckle-Scanner regardless of CWD
sys.path.insert(0, str(Path.home() / "Speckle-Scanner"))
# Resolve pointcloud_genration.py from the same directory as this script
sys.path.insert(0, str(Path(__file__).parent))
import config
from pointcloud_genration import (
disparity_to_pointcloud,
pointcloud_with_depth_colors,
save_ply,
save_ascii_point_cloud,
)
MODE_SGM = "sgm"
MODE_ZNCC = "zncc"
MODE_BOTH = "both"
SGM_DISP_FOLDER = "03_sgm_disp_map"
ZNCC_DISP_FOLDER = "04_zncc_disp_map"
SGM_PCL_FOLDER = "05_sgm_pcl"
ZNCC_PCL_FOLDER = "06_zncc_pcl"
def run_one(disp_path, q_path, out_dir, label, troubleshooting=False):
"""Convert one disparity file to a point cloud and save PLY (+ TXT if troubleshooting)."""
if not disp_path.exists():
print(f" [{label}] SKIP — disparity not found: {disp_path.name}")
return False
if not q_path.exists():
print(f" [{label}] SKIP — Q matrix not found: {q_path}")
return False
print(f" [{label}] {disp_path.name}{out_dir.name}/")
points_xyz = disparity_to_pointcloud(disp_path, q_path)
point_cloud = pointcloud_with_depth_colors(points_xyz)
out_dir.mkdir(parents=True, exist_ok=True)
save_ply(str(out_dir / "Point_cloud.ply"), point_cloud)
if troubleshooting:
save_ascii_point_cloud(str(out_dir / "Point_cloud.txt"), point_cloud)
print(f" [{label}] {points_xyz.shape[0]} points → {out_dir.name}/Point_cloud.ply")
if troubleshooting:
print(f" [{label}] ASCII copy → {out_dir.name}/Point_cloud.txt")
return True
def run_scan(project, date, session, scan, modes, troubleshooting=False):
base = config.PROCESSING_DIR / project / date / session / scan
q_file = config.get_params_link_dir(project, date, session) / "lc-rc_Q.cvstore"
print(f"\n{'='*60}")
print(f"[SCAN] {session}/{scan}")
print(f"{'='*60}")
results = []
if MODE_SGM in modes:
disp = base / SGM_DISP_FOLDER / "disparity.xml"
out = config.get_processing_step_dir(project, date, session, scan, SGM_PCL_FOLDER)
results.append(run_one(disp, q_file, out, "SGM", troubleshooting))
if MODE_ZNCC in modes:
disp = base / ZNCC_DISP_FOLDER / "disparity.npy"
out = config.get_processing_step_dir(project, date, session, scan, ZNCC_PCL_FOLDER)
results.append(run_one(disp, q_file, out, "ZNCC", troubleshooting))
success = all(results)
print(f"[{'DONE' if success else 'PARTIAL'}] {session}/{scan}")
return success
def run_session(project, date, session, scan_arg, modes, troubleshooting=False):
if scan_arg:
scans = [scan_arg]
else:
scans = config.list_scan_dirs(project, date, session)
if not scans:
print(f"[WARN] No scan folders found in {project}/{date}/{session}")
return [], []
print(f"\n Session {session}: {len(scans)} scan(s) found")
failed = []
for scan in scans:
ok = run_scan(project, date, session, scan, modes, troubleshooting)
if not ok:
failed.append(f"{session}/{scan}")
return scans, failed
def main():
parser = argparse.ArgumentParser(
description="Point cloud pipeline — converts disparity maps to PLY using project folder structure"
)
parser.add_argument("--project", required=True, help="Project name (e.g. Olsen_wings)")
parser.add_argument("--date", required=True, help="Date string (e.g. 2026-05-12)")
parser.add_argument("--session", default=None, help="Session name (e.g. session1); omit to process ALL sessions on that date")
parser.add_argument("--scan", default=None, help="Single scan (e.g. Scan000001); omit to process all scans in the session")
parser.add_argument("--mode", default=MODE_BOTH, choices=[MODE_SGM, MODE_ZNCC, MODE_BOTH],
help="Which disparity source to convert: sgm, zncc, or both (default: both)")
parser.add_argument(
"--troubleshooting",
action="store_true",
help="Also save ASCII Point_cloud.txt (default: PLY only)",
)
args = parser.parse_args()
modes = {MODE_SGM, MODE_ZNCC} if args.mode == MODE_BOTH else {args.mode}
# Determine sessions to process
if args.session:
sessions = [args.session]
else:
sessions = config.list_session_dirs(args.project, args.date)
if not sessions:
print(f"No session folders found under {args.project}/{args.date}")
raise SystemExit(1)
print(f"Found {len(sessions)} session(s): {sessions}")
total_scans = 0
all_failed = []
for session in sessions:
scans, failed = run_session(
args.project, args.date, session, args.scan, modes, args.troubleshooting
)
total_scans += len(scans)
all_failed.extend(failed)
print(f"\n{'='*60}")
print(f"Finished: {total_scans - len(all_failed)}/{total_scans} scans succeeded.")
if all_failed:
print(f"Partial/failed: {all_failed}")
raise SystemExit(1)
if __name__ == "__main__":
main()