"""End-to-end orchestration for merging a cluster of duplicate PRs.

Loads a duplicate-PR report bundle, selects one mergeable cluster, runs Codex in
an isolated git worktree to synthesize a single canonical commit, validates the
result against a file policy, then forks/pushes/opens a pull request via the
GitHub CLI. Requires the `git`, `gh`, and `codex` commands on PATH.
"""

from __future__ import annotations

import json
import shutil
import subprocess
import tempfile
from datetime import UTC, datetime
from pathlib import Path, PurePosixPath
from typing import Any

from slop_farmer.config import RepoRef
from slop_farmer.data.parquet_io import read_json, write_json, write_text
from slop_farmer.reports.canonical_duplicate_pr import prepare_publish_artifacts, stage_run_bundle
from slop_farmer.reports.duplicate_prs import (
    DEFAULT_DUPLICATE_PR_MODEL,
    load_duplicate_pr_bundle,
    select_mergeable_duplicate_pr_cluster,
)

# Navigation:
# - run_duplicate_pr_merge(): end-to-end orchestration entrypoint
# - validate_codex_result(): structured result checks
# - git/gh/codex helpers
# - file-policy helpers

DEFAULT_RUNS_DIR = Path("runs/duplicate_prs")
DEFAULT_FILE_POLICY = "pure-loc"
FILE_POLICY_CHOICES = ("pure-loc", "allow-docs", "allow-any")

# Suffixes treated as implementation code by the file-policy checks.
CODE_FILE_SUFFIXES = {
    ".c",
    ".cc",
    ".cpp",
    ".go",
    ".h",
    ".hpp",
    ".java",
    ".js",
    ".jsx",
    ".kt",
    ".m",
    ".mm",
    ".php",
    ".py",
    ".rb",
    ".rs",
    ".scala",
    ".sh",
    ".swift",
    ".ts",
    ".tsx",
}
DOC_FILE_SUFFIXES = {".md", ".mdx", ".rst", ".txt"}
DOC_DIRECTORY_NAMES = {"doc", "docs"}
DOC_FILE_PREFIXES = ("changelog", "readme", "news")
TEST_DIRECTORY_NAMES = {"test", "tests"}


# Merge orchestration
def run_duplicate_pr_merge(
    *,
    report_path: Path | None,
    snapshot_dir: Path | None,
    repo_dir: Path,
    cluster_id: str | None,
    fork_owner: str | None,
    fork_repo: str | None = None,
    upstream_repo: str | None = None,
    upstream_remote: str = "origin",
    fork_remote: str = "fork",
    file_policy: str = DEFAULT_FILE_POLICY,
    model: str = DEFAULT_DUPLICATE_PR_MODEL,
    runs_dir: Path = DEFAULT_RUNS_DIR,
) -> dict[str, Any]:
    """Run the full duplicate-PR merge pipeline and return a result summary.

    Steps: validate inputs and required CLI tools, stage a run bundle, create a
    fresh worktree branch from the upstream default branch, run Codex to
    synthesize one canonical commit, validate the commit against ``file_policy``,
    ensure a fork exists, push the branch, and open the pull request.

    Raises:
        ValueError: if ``file_policy`` is not one of ``FILE_POLICY_CHOICES`` or
            the Codex result fails structural validation.
        RuntimeError: if a required command, auth, repo checkout, or git/gh
            invariant fails along the way.
    """
    if file_policy not in FILE_POLICY_CHOICES:
        raise ValueError(
            f"Unsupported file policy {file_policy!r}. "
            f"Expected one of: {', '.join(FILE_POLICY_CHOICES)}."
        )
    bundle = load_duplicate_pr_bundle(
        report_path=report_path,
        snapshot_dir=snapshot_dir,
        model=model,
    )
    effective_upstream_repo = _normalize_repo_slug(upstream_repo or bundle.repo)
    selected_cluster = select_mergeable_duplicate_pr_cluster(
        bundle,
        cluster_id=cluster_id,
        model=model,
    )
    # Fail fast before doing any filesystem or network work.
    _require_command("git")
    _require_command("gh")
    _require_command("codex")
    authenticated_user = _resolve_authenticated_github_user()
    fork_target = _resolve_fork_target(
        upstream_repo=effective_upstream_repo,
        fork_repo=fork_repo,
        fork_owner=fork_owner,
        authenticated_user=authenticated_user,
    )
    effective_fork_owner = fork_target.owner
    effective_fork_repo = fork_target.slug
    resolved_repo_dir = repo_dir.resolve()
    _validate_repo_checkout(
        resolved_repo_dir,
        expected_repo=effective_upstream_repo,
        remote_name=upstream_remote,
    )
    default_branch = _resolve_default_branch(effective_upstream_repo)
    run_dir = _create_run_dir(runs_dir)
    manifest = stage_run_bundle(
        bundle.report_path,
        run_dir,
        selected_cluster=selected_cluster,
        max_clusters=1,
        prompt_repo=effective_upstream_repo,
        prompt_default_branch=default_branch,
        prompt_file_policy_instruction=_file_policy_instruction(file_policy),
    )
    manifest_path = run_dir / "run-manifest.json"
    run_stamp = _utc_stamp()
    branch_name = f"codex/{selected_cluster['cluster_id']}-{run_stamp}"
    worktree_dir = run_dir / "worktree"
    _create_worktree(
        repo_dir=resolved_repo_dir,
        worktree_dir=worktree_dir,
        branch_name=branch_name,
        default_branch=default_branch,
        upstream_remote=upstream_remote,
    )
    _update_manifest(
        manifest_path,
        {
            "upstream_repo": effective_upstream_repo,
            "upstream_remote": upstream_remote,
            "default_branch": default_branch,
            "branch_name": branch_name,
            "worktree_dir": str(worktree_dir.resolve()),
            "fork_owner": effective_fork_owner,
            "fork_repo": effective_fork_repo,
            "fork_remote": fork_remote,
            "file_policy": file_policy,
        },
    )
    artifacts = manifest["artifacts"]
    result_path = Path(artifacts["result_path"])
    _run_codex_exec(
        worktree_dir=worktree_dir,
        run_dir=run_dir,
        prompt_path=Path(artifacts["prompt_path"]),
        schema_path=Path(artifacts["schema_path"]),
        result_path=result_path,
    )
    result = validate_codex_result(manifest_path, result_path)
    changed_paths = _validate_synthesized_branch(
        worktree_dir=worktree_dir,
        upstream_remote=upstream_remote,
        default_branch=default_branch,
        commit_message=result["commit_message"],
        file_policy=file_policy,
    )
    publish_metadata = prepare_publish_artifacts(manifest_path, result_path)
    # NOTE: rebinds the `fork_repo` parameter to the ensured fork slug; all uses
    # below intentionally refer to the ensured value.
    fork_repo = _ensure_fork_repo(
        upstream_repo=effective_upstream_repo,
        fork_repo=effective_fork_repo,
        authenticated_user=authenticated_user,
    )
    _ensure_fork_remote(worktree_dir=worktree_dir, fork_repo=fork_repo, remote_name=fork_remote)
    _push_branch(worktree_dir=worktree_dir, branch_name=branch_name, remote_name=fork_remote)
    pr_url = _create_pull_request(
        upstream_repo=effective_upstream_repo,
        default_branch=default_branch,
        fork_owner=effective_fork_owner,
        branch_name=branch_name,
        title=publish_metadata["pr_title"],
        body_path=Path(publish_metadata["pr_body_path"]),
    )
    pr_url_path = Path(artifacts["pr_url_path"])
    write_text(pr_url.rstrip() + "\n", pr_url_path)
    _update_manifest(
        manifest_path,
        {
            "changed_paths": changed_paths,
            "pr_url": pr_url,
        },
    )
    publish_metadata_path = Path(artifacts["publish_metadata_path"])
    publish_metadata["pr_url"] = pr_url
    publish_metadata["changed_paths"] = changed_paths
    write_json(publish_metadata, publish_metadata_path)
    return {
        "cluster_id": selected_cluster["cluster_id"],
        "repo": effective_upstream_repo,
        "report_path": str(bundle.report_path),
        "run_dir": str(run_dir.resolve()),
        "worktree_dir": str(worktree_dir.resolve()),
        "branch_name": branch_name,
        "fork_repo": fork_repo,
        "fork_remote": fork_remote,
        "upstream_remote": upstream_remote,
        "file_policy": file_policy,
        "pr_url": pr_url,
        "changed_paths": changed_paths,
    }


def validate_codex_result(manifest_path: Path, result_path: Path) -> dict[str, Any]:
    """Validate the structured Codex result against the staged run manifest.

    Checks status, cluster identity, source-PR membership, executed tests, and
    required text fields. Returns a normalized copy of the result with cleaned
    ``source_pr_numbers`` and ``tests_run``.

    Raises:
        ValueError: when any structural check fails.
    """
    manifest = read_json(manifest_path.resolve())
    result = json.loads(result_path.resolve().read_text(encoding="utf-8"))
    selected_cluster = manifest["selected_cluster"]
    if result.get("status") != "success":
        summary = str(result.get("summary") or "").strip()
        raise ValueError(
            "Codex did not synthesize a valid canonical PR." + (f" {summary}" if summary else "")
        )
    if result.get("cluster_id") != selected_cluster["cluster_id"]:
        raise ValueError("Codex result cluster_id does not match the selected cluster.")
    expected_source_pr_numbers = _ordered_ints(selected_cluster["source_pr_numbers"])
    actual_source_pr_numbers = _normalize_result_source_pr_numbers(
        expected_source_pr_numbers=expected_source_pr_numbers,
        raw_source_pr_numbers=result.get("source_pr_numbers"),
    )
    tests_run = [
        str(value).strip() for value in result.get("tests_run") or [] if str(value).strip()
    ]
    if not tests_run:
        raise ValueError("Codex result did not include any executed validation commands.")
    for field in ("commit_message", "pr_title", "summary"):
        if not str(result.get(field) or "").strip():
            raise ValueError(f"Codex result did not provide a {field.replace('_', ' ')}.")
    normalized = dict(result)
    normalized["source_pr_numbers"] = actual_source_pr_numbers
    normalized["tests_run"] = tests_run
    return normalized


# GitHub / git / Codex helpers
def _require_command(command_name: str) -> None:
    """Raise RuntimeError unless ``command_name`` is resolvable on PATH."""
    if shutil.which(command_name):
        return
    raise RuntimeError(f"Missing required command: {command_name}")


def _resolve_authenticated_github_user() -> str:
    """Return the login of the `gh`-authenticated user, verifying auth first."""
    try:
        _run_checked(["gh", "auth", "status"])
    except RuntimeError as exc:
        raise RuntimeError(
            "GitHub CLI authentication is invalid. Run `gh auth login` and retry."
        ) from exc
    login = _run_stdout(["gh", "api", "user", "--jq", ".login"]).strip()
    if not login:
        raise RuntimeError("Could not resolve the authenticated GitHub user from `gh api user`.")
    return login


def _normalize_repo_slug(raw: str) -> str:
    """Normalize any accepted repo reference into an ``owner/name`` slug."""
    return RepoRef.parse(raw).slug


def _resolve_fork_target(
    *,
    upstream_repo: str,
    fork_repo: str | None,
    fork_owner: str | None,
    authenticated_user: str,
) -> RepoRef:
    """Pick the fork to publish to: explicit slug, or owner + upstream name."""
    if fork_repo is not None:
        return RepoRef.parse(fork_repo.strip())
    owner = (fork_owner or authenticated_user).strip()
    if not owner:
        raise RuntimeError("Could not resolve the GitHub fork owner.")
    upstream = RepoRef.parse(upstream_repo)
    return RepoRef(owner=owner, name=upstream.name)


def _validate_repo_checkout(repo_dir: Path, *, expected_repo: str, remote_name: str) -> None:
    """Ensure ``repo_dir`` exists and its ``remote_name`` points at the upstream repo."""
    if not repo_dir.exists():
        raise RuntimeError(f"Missing repo checkout: {repo_dir}")
    remote_url = _run_stdout(["git", "-C", str(repo_dir), "remote", "get-url", remote_name]).strip()
    actual_repo = _repo_slug_from_remote_url(remote_url)
    if actual_repo != expected_repo:
        raise RuntimeError(
            f"`--repo-dir` remote {remote_name!r} must point at {expected_repo}, "
            f"but resolves to {actual_repo or remote_url!r}."
        )


def _resolve_default_branch(repo: str) -> str:
    """Return the default branch name of ``repo`` via the GitHub API."""
    default_branch = _run_stdout(
        [
            "gh",
            "repo",
            "view",
            repo,
            "--json",
            "defaultBranchRef",
            "--jq",
            ".defaultBranchRef.name",
        ]
    ).strip()
    if not default_branch:
        raise RuntimeError(f"Could not resolve the default branch for {repo}.")
    return default_branch


def _create_run_dir(runs_dir: Path) -> Path:
    """Create and return a unique timestamp-prefixed run directory under ``runs_dir``."""
    base_dir = runs_dir.resolve()
    base_dir.mkdir(parents=True, exist_ok=True)
    # mkdtemp guarantees uniqueness even for concurrent runs in the same second.
    return Path(tempfile.mkdtemp(prefix=f"{_utc_stamp()}.", dir=base_dir))


def _create_worktree(
    *,
    repo_dir: Path,
    worktree_dir: Path,
    branch_name: str,
    default_branch: str,
    upstream_remote: str,
) -> None:
    """Fetch the upstream default branch and add a worktree on a fresh branch."""
    _run_checked(["git", "-C", str(repo_dir), "fetch", upstream_remote, default_branch])
    _run_checked(
        [
            "git",
            "-C",
            str(repo_dir),
            "worktree",
            "add",
            "-B",
            branch_name,
            str(worktree_dir),
            f"{upstream_remote}/{default_branch}",
        ]
    )


def _run_codex_exec(
    *,
    worktree_dir: Path,
    run_dir: Path,
    prompt_path: Path,
    schema_path: Path,
    result_path: Path,
) -> None:
    """Run `codex exec` in the worktree, feeding the prompt on stdin.

    The structured result must be written to ``result_path`` (enforced after the
    run).
    """
    prompt_text = prompt_path.read_text(encoding="utf-8")
    _run_checked(
        [
            "codex",
            "exec",
            "-C",
            str(worktree_dir),
            "--add-dir",
            str(run_dir),
            "--full-auto",
            "--output-schema",
            str(schema_path),
            "-o",
            str(result_path),
            "-",
        ],
        input_text=prompt_text,
    )
    if not result_path.exists():
        raise RuntimeError("Codex did not write a structured result.")


def _validate_synthesized_branch(
    *,
    worktree_dir: Path,
    upstream_remote: str,
    default_branch: str,
    commit_message: str,
    file_policy: str,
) -> list[str]:
    """Validate the synthesized branch and return its changed paths.

    Enforces: exactly one commit ahead of the upstream default branch, HEAD
    subject matching ``commit_message``, a clean worktree, a non-empty diff, and
    the path restrictions implied by ``file_policy``.
    """
    ahead_count = int(
        _run_stdout(
            [
                "git",
                "-C",
                str(worktree_dir),
                "rev-list",
                "--count",
                f"{upstream_remote}/{default_branch}..HEAD",
            ]
        )
    )
    if ahead_count != 1:
        raise RuntimeError(
            f"Synthesized branch must contain exactly one commit on top of "
            f"{upstream_remote}/{default_branch}; found {ahead_count}."
        )
    head_subject = _run_stdout(["git", "-C", str(worktree_dir), "log", "-1", "--pretty=%s"]).strip()
    if head_subject != commit_message:
        raise RuntimeError(
            f"Codex commit message {commit_message!r} does not match HEAD subject {head_subject!r}."
        )
    status_output = _run_stdout(["git", "-C", str(worktree_dir), "status", "--porcelain"])
    if status_output.strip():
        raise RuntimeError("Codex left uncommitted changes in the synthesis worktree.")
    changed_paths = [
        line.strip()
        for line in _run_stdout(
            [
                "git",
                "-C",
                str(worktree_dir),
                "diff",
                "--name-only",
                f"{upstream_remote}/{default_branch}..HEAD",
            ]
        ).splitlines()
        if line.strip()
    ]
    if not changed_paths:
        raise RuntimeError("The synthesized branch does not modify any files.")
    if file_policy == "pure-loc":
        disallowed_paths = [path for path in changed_paths if _is_doc_path(path)]
        if disallowed_paths:
            raise RuntimeError(
                "The synthesized branch touched non-LOC documentation paths: "
                + ", ".join(disallowed_paths)
            )
        unsupported_paths = [
            path for path in changed_paths if not _is_allowed_path(path, allow_docs=False)
        ]
        if unsupported_paths:
            raise RuntimeError(
                "The synthesized branch touched files outside implementation/test code paths: "
                + ", ".join(unsupported_paths)
            )
    elif file_policy == "allow-docs":
        unsupported_paths = [
            path for path in changed_paths if not _is_allowed_path(path, allow_docs=True)
        ]
        if unsupported_paths:
            raise RuntimeError(
                "The synthesized branch touched files outside implementation/test/documentation "
                "paths: " + ", ".join(unsupported_paths)
            )
    elif file_policy != "allow-any":
        raise RuntimeError(f"Unsupported file policy: {file_policy}")
    return changed_paths


def _ensure_fork_repo(
    *,
    upstream_repo: str,
    fork_repo: str,
    authenticated_user: str,
) -> str:
    """Create the fork via `gh repo fork` if it does not exist; return its slug."""
    fork_target = RepoRef.parse(fork_repo)
    try:
        _run_checked(["gh", "repo", "view", fork_repo, "--json", "nameWithOwner"])
    except RuntimeError:
        fork_command = [
            "gh",
            "repo",
            "fork",
            upstream_repo,
            "--clone=false",
            "--remote=false",
            "--fork-name",
            fork_target.name,
        ]
        # Forking into an org requires --org; personal forks default to the user.
        if fork_target.owner != authenticated_user:
            fork_command.extend(["--org", fork_target.owner])
        _run_checked(fork_command)
    return fork_target.slug


def _ensure_fork_remote(*, worktree_dir: Path, fork_repo: str, remote_name: str) -> None:
    """Add (or verify) a git remote named ``remote_name`` pointing at the fork."""
    fork_url = f"https://github.com/{fork_repo}.git"
    try:
        existing_url = _run_stdout(
            ["git", "-C", str(worktree_dir), "remote", "get-url", remote_name]
        ).strip()
    except RuntimeError:
        # Remote does not exist yet; create it.
        _run_checked(
            [
                "git",
                "-C",
                str(worktree_dir),
                "remote",
                "add",
                remote_name,
                fork_url,
            ]
        )
        return
    if existing_url != fork_url:
        raise RuntimeError(
            f"Existing `{remote_name}` remote points to {existing_url}, expected {fork_url}."
        )


def _push_branch(*, worktree_dir: Path, branch_name: str, remote_name: str) -> None:
    """Push ``branch_name`` to the fork remote, setting the upstream."""
    _run_checked(["git", "-C", str(worktree_dir), "push", "-u", remote_name, branch_name])


def _create_pull_request(
    *,
    upstream_repo: str,
    default_branch: str,
    fork_owner: str,
    branch_name: str,
    title: str,
    body_path: Path,
) -> str:
    """Open the PR against the upstream default branch and return its URL."""
    return _run_stdout(
        [
            "gh",
            "pr",
            "create",
            "--repo",
            upstream_repo,
            "--base",
            default_branch,
            "--head",
            f"{fork_owner}:{branch_name}",
            "--title",
            title,
            "--body-file",
            str(body_path),
        ]
    ).strip()


def _repo_slug_from_remote_url(url: str) -> str:
    """Extract an ``owner/name`` slug from common GitHub remote URL forms.

    Returns "" when the URL is empty; unrecognized hosts fall through mostly
    unchanged (minus trailing "/" and ".git").
    """
    normalized = url.strip()
    if not normalized:
        return ""
    for prefix in (
        "https://github.com/",
        "http://github.com/",
        "ssh://git@github.com/",
        "git://github.com/",
    ):
        if normalized.startswith(prefix):
            normalized = normalized[len(prefix) :]
            break
    if normalized.startswith("git@github.com:"):
        normalized = normalized.split(":", 1)[1]
    normalized = normalized.rstrip("/")
    if normalized.endswith(".git"):
        normalized = normalized[:-4]
    return normalized


# File-policy helpers
def _is_doc_path(path: str) -> bool:
    """Return True when ``path`` looks like documentation (suffix, dir, or prefix)."""
    pure_path = PurePosixPath(path)
    lowered_parts = [part.lower() for part in pure_path.parts]
    lowered_name = pure_path.name.lower()
    if pure_path.suffix.lower() in DOC_FILE_SUFFIXES:
        return True
    if any(part in DOC_DIRECTORY_NAMES for part in lowered_parts):
        return True
    return lowered_name.startswith(DOC_FILE_PREFIXES)


def _is_allowed_path(path: str, *, allow_docs: bool) -> bool:
    """Return True when ``path`` is test code, code, or (optionally) docs."""
    pure_path = PurePosixPath(path)
    if _is_test_path(pure_path):
        return True
    if allow_docs and _is_doc_path(path):
        return True
    return pure_path.suffix.lower() in CODE_FILE_SUFFIXES


def _file_policy_instruction(file_policy: str) -> str:
    """Return the prompt instruction text corresponding to ``file_policy``."""
    if file_policy == "pure-loc":
        return (
            "Do not touch README files, changelogs, markdown docs, prose-only files, "
            "or commentary artifacts. Fail instead of submitting a noisy branch."
        )
    if file_policy == "allow-docs":
        return (
            "Documentation and markdown changes are allowed only when they are necessary "
            "for the same fix. Keep them minimal and subordinate to the code patch."
        )
    if file_policy == "allow-any":
        return (
            "Non-code file changes are allowed when they are required for the same fix, "
            "but keep the patch as small and focused as possible."
        )
    raise ValueError(f"Unsupported file policy: {file_policy}")


def _is_test_path(path: PurePosixPath) -> bool:
    """Return True when ``path`` lives in a test directory or is named like a test."""
    lowered_parts = [part.lower() for part in path.parts]
    lowered_name = path.name.lower()
    lowered_stem = path.stem.lower()
    if any(part in TEST_DIRECTORY_NAMES for part in lowered_parts):
        return True
    return lowered_name.startswith("test_") or lowered_stem.endswith("_test")


def _run_checked(
    args: list[str],
    *,
    input_text: str | None = None,
) -> subprocess.CompletedProcess[str]:
    """Run a subprocess, raising RuntimeError (with stderr/stdout detail) on failure."""
    try:
        return subprocess.run(
            args,
            input=input_text,
            text=True,
            capture_output=True,
            check=True,
        )
    except subprocess.CalledProcessError as exc:
        detail = (exc.stderr or exc.stdout or "").strip()
        message = f"Command failed: {' '.join(args)}"
        if detail:
            message = f"{message}: {detail}"
        raise RuntimeError(message) from exc


def _run_stdout(args: list[str]) -> str:
    """Run a subprocess and return its captured stdout."""
    return _run_checked(args).stdout


def _update_manifest(manifest_path: Path, updates: dict[str, Any]) -> None:
    """Merge ``updates`` into the JSON manifest at ``manifest_path`` and rewrite it."""
    manifest = read_json(manifest_path)
    manifest.update(updates)
    write_json(manifest, manifest_path)


def _utc_stamp() -> str:
    """Return a compact UTC timestamp suitable for branch/directory names."""
    return datetime.now(UTC).strftime("%Y%m%dT%H%M%SZ")


def _ordered_ints(values: Any) -> list[int]:
    """Coerce ``values`` to ints, dropping non-ints and duplicates, keeping order."""
    ordered: list[int] = []
    seen: set[int] = set()
    for value in values or []:
        number = _coerce_int(value)
        if number is None or number in seen:
            continue
        ordered.append(number)
        seen.add(number)
    return ordered


def _normalize_result_source_pr_numbers(
    *,
    expected_source_pr_numbers: list[int],
    raw_source_pr_numbers: Any,
) -> list[int]:
    """Validate Codex's source PR numbers against the selected cluster.

    Requires at least two referenced PRs, all drawn from the expected set, and
    returns them in the expected (cluster) ordering.
    """
    actual_source_pr_numbers = _ordered_ints(raw_source_pr_numbers)
    if len(actual_source_pr_numbers) < 2:
        raise ValueError(
            "Codex result must reference at least two open source PRs from the selected cluster."
        )
    expected_source_pr_set = set(expected_source_pr_numbers)
    unknown_source_pr_numbers = [
        number for number in actual_source_pr_numbers if number not in expected_source_pr_set
    ]
    if unknown_source_pr_numbers:
        raise ValueError(
            "Codex result source_pr_numbers included PRs outside the selected open PR set: "
            + ", ".join(str(number) for number in unknown_source_pr_numbers)
        )
    actual_source_pr_set = set(actual_source_pr_numbers)
    return [number for number in expected_source_pr_numbers if number in actual_source_pr_set]


def _coerce_int(value: Any) -> int | None:
    """Return ``value`` as an int, or None when it cannot be coerced."""
    if value is None:
        return None
    try:
        return int(value)
    except (TypeError, ValueError):
        return None