| from pathlib import Path |
| import argparse |
| import json |
| import os |
| import pickle |
| import shutil |
| import signal |
| import subprocess |
| import sys |
| import time |
| from typing import Dict, List, Optional, Sequence, Tuple |
|
|
| import numpy as np |
| import pandas as pd |
|
|
|
|
# Repository root: the parent of the directory containing this script. It is
# prepended to sys.path (once) so that the ``rr_label_study`` package can be
# imported from the repository checkout.
PROJECT_ROOT = Path(__file__).resolve().parents[1]
if not any(entry == str(PROJECT_ROOT) for entry in sys.path):
    sys.path.insert(0, str(PROJECT_ROOT))
|
|
| from rr_label_study.oven_study import ( |
| MotionTemplates, |
| _aggregate_summary, |
| _annotate_phase_columns, |
| _episode_metrics_from_frames, |
| _keyframe_subset, |
| _keypoint_discovery, |
| _load_demo, |
| _load_descriptions, |
| ) |
|
|
|
|
def _launch_xvfb(display_num: int, log_path: Path) -> subprocess.Popen:
    """Start a headless Xvfb X server serving display ``:display_num``.

    The server gets one 1280x1024x24 screen with the GLX and RENDER
    extensions enabled, and ``-noreset`` so it keeps running when its last
    client disconnects.

    Args:
        display_num: X display number to serve (``:N``).
        log_path: File receiving the server's combined stdout/stderr
            (truncated if it already exists).

    Returns:
        The ``Popen`` handle. Because ``start_new_session=True``, the server
        runs in its own session/process group and can be stopped with
        ``os.killpg``.
    """
    # The child gets its own duplicate of the log descriptor, so the parent's
    # copy can be closed as soon as Popen returns. It is also closed if Popen
    # itself fails, e.g. when Xvfb is not installed.
    with log_path.open("w", encoding="utf-8") as log_handle:
        return subprocess.Popen(
            [
                "Xvfb",
                f":{display_num}",
                "-screen",
                "0",
                "1280x1024x24",
                "+extension",
                "GLX",
                "+render",
                "-noreset",
            ],
            stdout=log_handle,
            stderr=subprocess.STDOUT,
            start_new_session=True,
        )
|
|
|
|
| def _wait_for_display(display_num: int, timeout_s: float = 10.0) -> None: |
| deadline = time.time() + timeout_s |
| while time.time() < deadline: |
| result = subprocess.run( |
| ["xdpyinfo", "-display", f":{display_num}"], |
| stdout=subprocess.DEVNULL, |
| stderr=subprocess.DEVNULL, |
| check=False, |
| ) |
| if result.returncode == 0: |
| return |
| time.sleep(0.25) |
| raise RuntimeError(f"display :{display_num} did not become ready") |
|
|
|
|
| def _stop_process(process: Optional[subprocess.Popen]) -> None: |
| if process is None or process.poll() is not None: |
| return |
| try: |
| os.killpg(process.pid, signal.SIGTERM) |
| except ProcessLookupError: |
| return |
| try: |
| process.wait(timeout=10) |
| except subprocess.TimeoutExpired: |
| try: |
| os.killpg(process.pid, signal.SIGKILL) |
| except ProcessLookupError: |
| pass |
|
|
|
|
def _spawn_pregrasp_batch_job(
    display_num: int,
    episode_dir: Path,
    templates_pkl: Path,
    frame_indices: Sequence[int],
    checkpoint_stride: int,
    output_dir: Path,
    log_path: Path,
    coppeliasim_root: str = "/workspace/coppelia_sim",
) -> subprocess.Popen:
    """Launch ``scripts/run_oven_pregrasp_batch.py`` for a chunk of frames.

    The job runs with the current interpreter from ``PROJECT_ROOT`` in its own
    session, renders on X display ``:display_num``, and is limited to one
    BLAS/OpenMP thread so several jobs can share the machine.

    Args:
        display_num: X display the job renders on. Also names its private
            ``XDG_RUNTIME_DIR`` under ``/tmp``.
        episode_dir: Episode directory passed as ``--episode-dir``.
        templates_pkl: Pickled templates passed as ``--templates-pkl``.
        frame_indices: Frames to process, passed as ``--frame-indices``.
        checkpoint_stride: Passed through as ``--checkpoint-stride``.
        output_dir: Directory passed as ``--output-dir``.
        log_path: File receiving the job's combined stdout/stderr (truncated).
        coppeliasim_root: CoppeliaSim installation used for
            ``COPPELIASIM_ROOT``, the library path and the Qt plugin path.

    Returns:
        The ``Popen`` handle of the job.
    """
    runtime_dir = Path(f"/tmp/rr_label_study_pregrasp_display_{display_num}")
    # Qt expects XDG_RUNTIME_DIR to be private to the user (mode 0700).
    runtime_dir.mkdir(mode=0o700, parents=True, exist_ok=True)

    env = os.environ.copy()
    env["DISPLAY"] = f":{display_num}"
    env["COPPELIASIM_ROOT"] = coppeliasim_root
    existing_ld_path = env.get("LD_LIBRARY_PATH", "")
    # An empty LD_LIBRARY_PATH entry (e.g. from a trailing ':') makes the
    # loader search the current directory, so only append a non-empty value.
    env["LD_LIBRARY_PATH"] = (
        f"{coppeliasim_root}:{existing_ld_path}" if existing_ld_path else coppeliasim_root
    )
    env["QT_QPA_PLATFORM_PLUGIN_PATH"] = coppeliasim_root
    env["XDG_RUNTIME_DIR"] = str(runtime_dir)
    env["PYTHONUNBUFFERED"] = "1"
    # One thread per numeric library so parallel jobs don't oversubscribe CPUs.
    env["OMP_NUM_THREADS"] = "1"
    env["OPENBLAS_NUM_THREADS"] = "1"
    env["MKL_NUM_THREADS"] = "1"
    env["NUMEXPR_NUM_THREADS"] = "1"

    command = [
        sys.executable,
        str(PROJECT_ROOT.joinpath("scripts", "run_oven_pregrasp_batch.py")),
        "--episode-dir",
        str(episode_dir),
        "--templates-pkl",
        str(templates_pkl),
        "--frame-indices",
        *[str(frame_index) for frame_index in frame_indices],
        "--checkpoint-stride",
        str(checkpoint_stride),
        "--output-dir",
        str(output_dir),
    ]
    # The child inherits its own copy of the log descriptor; close ours.
    with log_path.open("w", encoding="utf-8") as log_handle:
        return subprocess.Popen(
            command,
            stdout=log_handle,
            stderr=subprocess.STDOUT,
            cwd=str(PROJECT_ROOT),
            env=env,
            start_new_session=True,
        )
|
|
|
|
| def _chunk_frame_indices(frame_indices: Sequence[int], num_workers: int) -> List[List[int]]: |
| if not frame_indices: |
| return [] |
| worker_count = min(max(1, num_workers), len(frame_indices)) |
| return [ |
| [int(index) for index in chunk.tolist()] |
| for chunk in np.array_split(np.asarray(frame_indices, dtype=int), worker_count) |
| if len(chunk) |
| ] |
|
|
|
|
| def _load_interventions(metrics_path: Path) -> Dict[str, float]: |
| payload = json.loads(metrics_path.read_text()) |
| return { |
| key: float(value) |
| for key, value in payload.items() |
| if key.startswith("pre_ready_") or key.startswith("post_ready_") |
| } |
|
|
|
|
def main() -> int:
    """Recompute the pre-grasp columns of one episode with parallel headless workers.

    Steps:
      1. Load the dense CSV, the demo, its descriptions, and the
         ``pre_ready_*``/``post_ready_*`` entries of the input metrics JSON.
      2. Copy the templates JSON into ``--output-dir`` and pickle the parsed
         ``MotionTemplates`` there for the workers.
      3. For every frame that has no ``pregrasp_rows/frame_NNNN.json`` yet,
         split the work over up to ``--num-workers`` jobs. Each job gets its
         own Xvfb display, numbered upward from ``--base-display``. Wait for
         all jobs to finish.
      4. Overwrite the dense rows with the per-frame JSON values, re-derive
         phase columns, keyframes and episode metrics, and write the dense CSV,
         keyframe CSV, metrics JSON and summary JSON.

    Frames whose row file already exists are not recomputed. A run interrupted
    before step 4 can therefore be resumed with the same ``--output-dir``.

    Returns:
        0 on success (used as the process exit status).

    Raises:
        RuntimeError: if a display does not come up, an Xvfb server exits,
            a worker fails or leaves frames missing, or a row file is missing
            when merging.
    """
    parser = argparse.ArgumentParser()
    parser.add_argument("--episode-dir", required=True)
    parser.add_argument("--input-dense-csv", required=True)
    parser.add_argument("--input-metrics-json", required=True)
    parser.add_argument("--templates-json", required=True)
    parser.add_argument("--output-dir", required=True)
    parser.add_argument("--checkpoint-stride", type=int, default=16)
    parser.add_argument("--num-workers", type=int, default=8)
    parser.add_argument("--base-display", type=int, default=500)
    parser.add_argument("--stagger-seconds", type=float, default=0.1)
    parser.add_argument("--keep-frame-json", action="store_true")
    args = parser.parse_args()

    episode_dir = Path(args.episode_dir)
    input_dense_csv = Path(args.input_dense_csv)
    input_metrics_json = Path(args.input_metrics_json)
    templates_json = Path(args.templates_json)
    output_dir = Path(args.output_dir)
    output_dir.mkdir(parents=True, exist_ok=True)

    # Only frames present in both the demo and the CSV are processed.
    base_df = pd.read_csv(input_dense_csv)
    demo = _load_demo(episode_dir)
    descriptions = _load_descriptions(episode_dir)
    num_frames = min(len(demo), len(base_df))
    frame_indices = list(range(num_frames))
    interventions = _load_interventions(input_metrics_json)

    # Keep a copy of the templates next to the outputs. Workers receive the
    # parsed object via --templates-pkl.
    template_payload = json.loads(templates_json.read_text(encoding="utf-8"))
    templates = MotionTemplates.from_json(template_payload["templates"])
    with output_dir.joinpath("templates.json").open("w", encoding="utf-8") as handle:
        json.dump(template_payload, handle, indent=2)
    templates_pkl = output_dir.joinpath("templates.pkl")
    with templates_pkl.open("wb") as handle:
        pickle.dump(templates, handle)

    frame_json_dir = output_dir.joinpath("pregrasp_rows")
    frame_json_dir.mkdir(parents=True, exist_ok=True)

    def frame_row_path(frame_index: int) -> Path:
        """Path of the per-frame JSON row expected from a worker."""
        return frame_json_dir.joinpath(f"frame_{frame_index:04d}.json")

    # Frames whose row file already exists are skipped (resume support).
    pending_frame_indices = [
        frame_index for frame_index in frame_indices if not frame_row_path(frame_index).exists()
    ]
    frame_chunks = _chunk_frame_indices(pending_frame_indices, args.num_workers)
    displays = [args.base_display + index for index in range(len(frame_chunks))]
    xvfb_procs: Dict[int, subprocess.Popen] = {}
    active: Dict[int, Tuple[List[int], subprocess.Popen]] = {}

    def ensure_xvfb_alive(display_num: int) -> None:
        """Raise if the Xvfb server we launched for ``display_num`` has exited."""
        xvfb = xvfb_procs[display_num]
        if xvfb.poll() is not None:
            raise RuntimeError(
                f"Xvfb for display :{display_num} exited with code {xvfb.returncode} "
                f"log={output_dir.joinpath(f'xvfb_{display_num}.log')}"
            )

    try:
        for display_num in displays:
            xvfb_procs[display_num] = _launch_xvfb(
                display_num, output_dir.joinpath(f"xvfb_{display_num}.log")
            )
        for display_num in displays:
            _wait_for_display(display_num)
            # If the display number was already taken, our Xvfb exits while
            # the probe may have succeeded against the other X server.
            ensure_xvfb_alive(display_num)

        for display_num, frame_chunk in zip(displays, frame_chunks):
            process = _spawn_pregrasp_batch_job(
                display_num=display_num,
                episode_dir=episode_dir,
                templates_pkl=templates_pkl,
                frame_indices=frame_chunk,
                checkpoint_stride=args.checkpoint_stride,
                output_dir=frame_json_dir,
                log_path=output_dir.joinpath(f"worker_{display_num}.log"),
            )
            active[display_num] = (frame_chunk, process)
            if args.stagger_seconds > 0:
                time.sleep(args.stagger_seconds)

        # Poll once per second. Any failed worker aborts the run, and the
        # finally-block tears everything down.
        while active:
            time.sleep(1.0)
            finished: List[int] = []
            for display_num, (frame_chunk, process) in active.items():
                return_code = process.poll()
                if return_code is None:
                    # A worker whose X server died cannot make progress.
                    ensure_xvfb_alive(display_num)
                    continue
                missing = [
                    frame_index
                    for frame_index in frame_chunk
                    if not frame_row_path(frame_index).exists()
                ]
                if return_code != 0 or missing:
                    raise RuntimeError(
                        "display "
                        f":{display_num} failed for frames {frame_chunk[:5]} "
                        f"missing={missing[:8]} log={output_dir.joinpath(f'worker_{display_num}.log')}"
                    )
                finished.append(display_num)
            for display_num in finished:
                active.pop(display_num)
    finally:
        # Stop still-running workers before tearing down their displays.
        for _frame_chunk, process in active.values():
            _stop_process(process)
        for xvfb in xvfb_procs.values():
            _stop_process(xvfb)

    corrected_df = base_df.iloc[:num_frames].copy()
    for frame_index in frame_indices:
        row_path = frame_row_path(frame_index)
        if not row_path.exists():
            raise RuntimeError(f"missing pregrasp row: {row_path}")
        row = json.loads(row_path.read_text(encoding="utf-8"))
        # .at is label-based; read_csv yields a default RangeIndex, so the
        # label equals the frame position here.
        for key, value in row.items():
            corrected_df.at[frame_index, key] = value

    corrected_df = _annotate_phase_columns(corrected_df)
    keyframes = [index for index in _keypoint_discovery(demo) if index < len(corrected_df)]
    key_df = _keyframe_subset(corrected_df, keyframes)
    metrics = _episode_metrics_from_frames(
        frame_df=corrected_df,
        key_df=key_df,
        episode_name=episode_dir.name,
        description=descriptions[0],
        interventions=interventions,
    )

    corrected_df.to_csv(output_dir.joinpath(f"{episode_dir.name}.dense.csv"), index=False)
    key_df.to_csv(output_dir.joinpath(f"{episode_dir.name}.keyframes.csv"), index=False)
    with output_dir.joinpath(f"{episode_dir.name}.metrics.json").open("w", encoding="utf-8") as handle:
        json.dump(metrics, handle, indent=2)
    summary = _aggregate_summary([metrics])
    with output_dir.joinpath("summary.json").open("w", encoding="utf-8") as handle:
        json.dump(summary, handle, indent=2)

    if not args.keep_frame_json:
        shutil.rmtree(frame_json_dir, ignore_errors=True)

    print(json.dumps(summary, indent=2))
    return 0
|
|
|
|
# Script entry point: main()'s return value becomes the process exit status.
if __name__ == "__main__":
    sys.exit(main())
|
|