| |
| """ |
harness-state skill script.

Operations: get_state, set_state.

State is stored in harness_state.json inside the nearest .codex-plugin/
directory found by walking up from the current working directory.
If no .codex-plugin/ directory is found, the file is stored directly in the
current working directory.
| """ |
|
|
| import json |
| import os |
| import sys |
| from pathlib import Path |
|
|
|
|
# Name of the JSON file that holds the persisted harness state.
STATE_FILENAME: str = "harness_state.json"

# Baseline state: one entry per recognised key, with the value used when
# nothing has been persisted for that key.
DEFAULT_STATE: dict = {
    "active": False,
    "phase": 0,
    "phase_name": "",
}
|
|
|
|
def find_state_dir() -> Path:
    """Locate the directory that holds the state file.

    Checks the current working directory and then each of its ancestors,
    nearest first, for a ``.codex-plugin`` sub-directory.

    Returns:
        The first ``.codex-plugin`` directory found, or the current working
        directory itself when no ancestor contains one.
    """
    start = Path.cwd()
    # Lazily build the candidate paths so the search stops at the first hit.
    candidates = (
        directory / ".codex-plugin" for directory in (start, *start.parents)
    )
    return next((found for found in candidates if found.is_dir()), start)
|
|
|
|
def state_path() -> Path:
    """Return the path of the state file inside the resolved state directory."""
    directory = find_state_dir()
    return directory.joinpath(STATE_FILENAME)
|
|
|
|
def read_state() -> dict:
    """Load the persisted state, filling in defaults for missing keys.

    Returns:
        A new dict holding every key of ``DEFAULT_STATE`` overlaid with the
        contents of the state file. A fresh copy of ``DEFAULT_STATE`` is
        returned when the file is missing, cannot be read or decoded, is not
        valid JSON, or does not contain a JSON object.
    """
    path = state_path()
    try:
        # Open directly rather than testing exists() first: a missing file
        # raises FileNotFoundError (an OSError), so there is no window between
        # the check and the open.
        with open(path, encoding="utf-8") as f:
            data = json.load(f)
    except (OSError, ValueError):
        # ValueError covers both json.JSONDecodeError and UnicodeDecodeError.
        return dict(DEFAULT_STATE)

    # Valid JSON that is not an object (e.g. [] or "x") cannot be merged with
    # the defaults; treat it like a corrupt file instead of raising TypeError.
    if not isinstance(data, dict):
        return dict(DEFAULT_STATE)

    return {**DEFAULT_STATE, **data}
|
|
|
|
def write_state(active: bool, phase: int, phase_name: str) -> dict:
    """Persist the given state to the state file and return it.

    The JSON is written to a temporary sibling file first and then moved over
    the real state file with ``os.replace``, so a reader never observes a
    truncated or half-written file, and an interrupted write leaves the
    previous state intact.

    Args:
        active: Value stored under the ``"active"`` key.
        phase: Value stored under the ``"phase"`` key.
        phase_name: Value stored under the ``"phase_name"`` key.

    Returns:
        The state dict that was written.

    Raises:
        OSError: If the directory or file cannot be created or written.
    """
    state = {"active": active, "phase": phase, "phase_name": phase_name}
    path = state_path()
    path.parent.mkdir(parents=True, exist_ok=True)

    # Include the pid so concurrent writers do not share a temp file.
    tmp_path = path.with_name(f"{path.name}.{os.getpid()}.tmp")
    try:
        with open(tmp_path, "w", encoding="utf-8") as f:
            json.dump(state, f, indent=2)
        # Same directory as the target, so the rename replaces it in one step.
        os.replace(tmp_path, path)
    except BaseException:
        # Do not leave a stray temp file behind on failure; re-raise as-is.
        try:
            tmp_path.unlink()
        except OSError:
            pass
        raise
    return state
|
|
|
|
def _fail(message: str) -> None:
    """Print *message* as a JSON error object on stdout and exit with status 1.

    Never returns normally: always raises ``SystemExit``.
    """
    print(json.dumps({"error": message}))
    sys.exit(1)


def main():
    """Command-line entry point.

    Expects a single JSON object as the first command-line argument, with an
    ``"operation"`` key of either ``"get_state"`` or ``"set_state"``.

    * ``get_state`` prints the current state as JSON.
    * ``set_state`` requires ``"active"`` (bool) and ``"phase"`` (int), accepts
      an optional ``"phase_name"``, writes the state and prints
      ``{"ok": true, "state": ...}``.

    Every failure is reported as ``{"error": ...}`` on stdout, followed by
    exit status 1.
    """
    if len(sys.argv) < 2:
        _fail("Usage: state.py <json_input>")

    try:
        args = json.loads(sys.argv[1])
    except json.JSONDecodeError as e:
        _fail(f"Invalid JSON input: {e}")

    # Valid JSON that is not an object (e.g. [] or 42) has no .get(); report
    # it as an input error instead of crashing with AttributeError.
    if not isinstance(args, dict):
        _fail("Input must be a JSON object")

    operation = args.get("operation")

    if operation == "get_state":
        print(json.dumps(read_state()))

    elif operation == "set_state":
        active = args.get("active")
        phase = args.get("phase")
        phase_name = args.get("phase_name", "")

        if active is None or phase is None:
            _fail("set_state requires 'active' (bool) and 'phase' (int)")

        if not isinstance(active, bool):
            _fail("'active' must be a boolean")

        # bool is a subclass of int, so reject true/false explicitly.
        if isinstance(phase, bool) or not isinstance(phase, int):
            _fail("'phase' must be an integer")

        # An explicit JSON null would otherwise be stored as the text "None".
        if phase_name is None:
            phase_name = ""

        try:
            result = write_state(
                active=active, phase=phase, phase_name=str(phase_name)
            )
        except OSError as e:
            # Keep the JSON-on-stdout contract even when the write fails.
            _fail(f"Failed to write state: {e}")
        print(json.dumps({"ok": True, "state": result}))

    else:
        _fail(f"Unknown operation '{operation}'. Valid: get_state, set_state")
|
|
|
|
# Run the CLI only when executed as a script, not when imported.
if __name__ == "__main__":
    main()