from __future__ import annotations

import argparse
import json
import os
import shutil
import tempfile
import time
from collections import defaultdict
from datetime import UTC, datetime
from pathlib import Path
from typing import Any

from huggingface_hub import HfApi

from slop_farmer.app_config import command_defaults, extract_cli_config_path
from slop_farmer.config import (
    DatasetRefreshOptions,
    NewContributorReportOptions,
    PrScopeOptions,
    RepoRef,
    resolve_github_token,
)
from slop_farmer.data.dataset_card import build_hf_dataset_card
from slop_farmer.data.github_api import GitHubClient
from slop_farmer.data.hf_dataset_repo import (
    list_remote_paths,
    load_remote_file,
    load_remote_json_file,
    stable_snapshot_candidates,
)
from slop_farmer.data.links import build_pr_duplicate_candidate_rows, build_text_link_rows
from slop_farmer.data.normalize import (
    issue_url_to_number,
    normalize_comment,
    normalize_issue,
    normalize_pr_diff,
    normalize_pr_file,
    normalize_pull_request,
    normalize_review,
    normalize_review_comment,
    normalize_timeline_event,
)
from slop_farmer.data.parquet_io import (
    SCHEMAS,
    read_parquet_rows,
    write_json,
    write_parquet,
    write_text,
)
from slop_farmer.reports.new_contributor_report import run_new_contributor_report
from slop_farmer.reports.pr_scope import run_pr_scope_report

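# Per-table primary key columns. merge_rows() uses these to de-duplicate rows when a
# freshly crawled delta is overlaid on top of the previously published snapshot.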
PRIMARY_KEYS: dict[str, tuple[str, ...]] = {
    "issues": ("github_id",),
    "pull_requests": ("github_id",),
    "comments": ("github_id",),
    "reviews": ("github_id",),
    "review_comments": ("github_id",),
    "pr_files": ("repo", "pull_request_number", "filename"),
    "pr_diffs": ("repo", "pull_request_number"),
    "links": (
        "repo",
        "source_type",
        "source_number",
        "source_github_id",
        "target_owner",
        "target_repo",
        "target_number",
        "link_type",
        "link_origin",
    ),
    "events": (
        "repo",
        "parent_kind",
        "parent_number",
        "event",
        "created_at",
        "actor_login",
        "source_issue_number",
        "source_issue_url",
        "commit_id",
        "label_name",
    ),
}

CHECKPOINT_PREFIXES = ("_checkpoints", "checkpoints")


def log(message: str) -> None:
    stamp = datetime.now(tz=UTC).strftime("%H:%M:%SZ")
    print(f"[{stamp}] {message}", flush=True)


def iso_now() -> str:
    return datetime.now(tz=UTC).replace(microsecond=0).isoformat().replace("+00:00", "Z")


def snapshot_id() -> str:
    return datetime.now(tz=UTC).strftime("%Y%m%dT%H%M%SZ")


def row_key(row: dict[str, Any], fields: tuple[str, ...]) -> str:
    return json.dumps([row.get(field) for field in fields], default=str)


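# Overlay delta rows on top of previously published rows, keyed by the table's primary
# key. For pr_files, all previously known files of a refreshed PR are dropped first so
# files removed from the PR do not linger in the merged table.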
def merge_rows(
    table_name: str,
    previous_rows: list[dict[str, Any]],
    delta_rows: list[dict[str, Any]],
) -> list[dict[str, Any]]:
    if table_name == "pr_files":
        refreshed_prs = {
            (row.get("repo"), row.get("pull_request_number"))
            for row in delta_rows
            if row.get("pull_request_number") is not None
        }
        previous_rows = [
            row
            for row in previous_rows
            if (row.get("repo"), row.get("pull_request_number")) not in refreshed_prs
        ]
    merged: dict[str, dict[str, Any]] = {}
    for row in previous_rows:
        merged[row_key(row, PRIMARY_KEYS[table_name])] = row
    for row in delta_rows:
        merged[row_key(row, PRIMARY_KEYS[table_name])] = row
    return list(merged.values())


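# Map each checkpoint snapshot id to its folder, preferring the "_checkpoints/" prefix
# over "checkpoints/" when the same snapshot id exists under both.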
def checkpoint_dirs(remote_paths: set[str]) -> list[tuple[str, str]]:
    by_snapshot_id: dict[str, str] = {}
    for path in remote_paths:
        parts = path.split("/")
        if len(parts) < 3 or parts[0] not in CHECKPOINT_PREFIXES:
            continue
        snapshot_key = parts[1]
        prefix = parts[0]
        current = by_snapshot_id.get(snapshot_key)
        if current is None or current.startswith("checkpoints/"):
            by_snapshot_id[snapshot_key] = f"{prefix}/{snapshot_key}"
    return [(sid, by_snapshot_id[sid]) for sid in sorted(by_snapshot_id)]


def copy_remote_file_from_candidates(
    api: HfApi,
    repo_id: str,
    local_dir: Path,
    destination: Path,
    candidate_paths: list[str],
) -> bool:
    for candidate in candidate_paths:
        downloaded = load_remote_file(api, repo_id, candidate, local_dir)
        if downloaded is None:
            continue
        destination.parent.mkdir(parents=True, exist_ok=True)
        shutil.copy2(downloaded, destination)
        return True
    return False


def materialize_previous_snapshot_dir(
    *,
    api: Any,
    repo_id: str,
    previous_root: Path,
    stable_snapshot_id: str | None,
    latest_pointer: dict[str, Any] | None,
    previous_tables: dict[str, list[dict[str, Any]]],
) -> Path | None:
    if not stable_snapshot_id:
        return None
    snapshot_dir = (previous_root / "materialized-snapshots" / stable_snapshot_id).resolve()
    snapshot_dir.mkdir(parents=True, exist_ok=True)
    for table_name, rows in previous_tables.items():
        write_parquet(rows, snapshot_dir / f"{table_name}.parquet", table_name)
    for artifact_name in (
        "manifest.json",
        "new_contributors.parquet",
        "new-contributors-report.json",
        "new-contributors-report.md",
    ):
        copy_remote_file_from_candidates(
            api,
            repo_id,
            previous_root,
            snapshot_dir / artifact_name,
            stable_snapshot_candidates(latest_pointer, artifact_name),
        )
    return snapshot_dir


def load_remote_table_from_candidates(
    api: HfApi,
    repo_id: str,
    table_name: str,
    local_dir: Path,
    candidate_paths: list[str],
) -> list[dict[str, Any]]:
    for candidate in candidate_paths:
        downloaded = load_remote_file(api, repo_id, candidate, local_dir)
        if downloaded is not None:
            return read_parquet_rows(downloaded)
    return []


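# Split the unified comments table into issue-comment and PR-comment subsets, written
# out as separate convenience parquet files alongside the full comments table.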
def viewer_comment_rows(
    comments: list[dict[str, Any]],
    pull_requests: list[dict[str, Any]],
) -> tuple[list[dict[str, Any]], list[dict[str, Any]]]:
    pr_numbers = {int(row["number"]) for row in pull_requests if row.get("number") is not None}
    issue_comments: list[dict[str, Any]] = []
    pr_comments: list[dict[str, Any]] = []
    for row in comments:
        parent_number = row.get("parent_number")
        parent_kind = row.get("parent_kind")
        if parent_kind == "pull_request" or parent_number in pr_numbers:
            pr_comments.append(row)
        else:
            issue_comments.append(row)
    return issue_comments, pr_comments


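# Upload the partial delta tables plus a progress marker under _checkpoints/<snapshot_id>
# so an interrupted run can resume without re-fetching everything already collected.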
def upload_delta_checkpoint(
    *,
    api: HfApi,
    repo_id: str,
    work_dir: Path,
    repo_slug: str,
    sid: str,
    stage: str,
    delta_tables: dict[str, list[dict[str, Any]]],
    progress: dict[str, Any],
) -> None:
    checkpoint_root = work_dir / "checkpoint_upload"
    if checkpoint_root.exists():
        shutil.rmtree(checkpoint_root)
    checkpoint_root.mkdir(parents=True, exist_ok=True)
    for table_name, rows in delta_tables.items():
        write_parquet(rows, checkpoint_root / f"{table_name}.parquet", table_name)
    write_json(
        {"repo": repo_slug, "snapshot_id": sid, **progress}, checkpoint_root / "progress.json"
    )
    write_json(
        {"repo": repo_slug, "snapshot_id": sid, **progress},
        checkpoint_root / "state" / "in_progress.json",
    )
    api.upload_folder(
        folder_path=str(checkpoint_root),
        path_in_repo=f"_checkpoints/{sid}",
        repo_id=repo_id,
        repo_type="dataset",
        commit_message=f"Checkpoint {sid} ({stage})",
    )


def remaining_limit(limit: int | None, used: int) -> int | None:
    if limit is None:
        return None
    return max(limit - used, 0)


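# CLI wiring. Per-flag defaults are resolved from the optional config file via
# command_defaults("refresh-dataset", ...), so flags only need to be passed to override it.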
def _build_argument_parser(*, config_path: Path | None = None) -> argparse.ArgumentParser:
    defaults = command_defaults("refresh-dataset", config_path=config_path)
    parser = argparse.ArgumentParser()
    parser.add_argument("--config", type=Path, help="Optional repo config file.")
    parser.add_argument("--repo", default=defaults.get("repo", "huggingface/transformers"))
    parser.add_argument("--hf-repo-id", default=defaults.get("hf-repo-id"))
    parser.add_argument("--max-issues", type=int, default=defaults.get("max-issues"))
    parser.add_argument("--max-prs", type=int, default=defaults.get("max-prs"))
    parser.add_argument(
        "--max-issue-comments",
        type=int,
        default=defaults.get("max-issue-comments"),
    )
    parser.add_argument(
        "--max-reviews-per-pr",
        type=int,
        default=defaults.get("max-reviews-per-pr"),
    )
    parser.add_argument(
        "--max-review-comments-per-pr",
        type=int,
        default=defaults.get("max-review-comments-per-pr"),
    )
    parser.add_argument(
        "--fetch-timeline",
        action="store_true",
        default=bool(defaults.get("fetch-timeline", False)),
    )
    parser.add_argument(
        "--new-contributor-report",
        dest="new_contributor_report",
        action="store_true",
        default=bool(defaults.get("new-contributor-report", True)),
    )
    parser.add_argument(
        "--no-new-contributor-report",
        dest="new_contributor_report",
        action="store_false",
    )
    parser.add_argument(
        "--new-contributor-window-days",
        type=int,
        default=int(defaults.get("new-contributor-window-days", 42)),
    )
    parser.add_argument(
        "--new-contributor-max-authors",
        type=int,
        default=int(defaults.get("new-contributor-max-authors", 25)),
    )
    parser.add_argument("--http-timeout", type=int, default=300)
    parser.add_argument("--http-max-retries", type=int, default=8)
    parser.add_argument("--checkpoint-every-comments", type=int, default=1000)
    parser.add_argument("--checkpoint-every-prs", type=int, default=25)
    parser.add_argument(
        "--private-hf-repo",
        dest="private_hf_repo",
        action="store_true",
        default=bool(defaults.get("private-hf-repo", False)),
    )
    parser.add_argument("--private", dest="private_hf_repo", action="store_true")
    parser.set_defaults(
        cluster_suppression_rules=tuple(defaults.get("cluster-suppression-rules", ()))
    )
    return parser


def parse_args(argv: list[str] | None = None) -> argparse.Namespace:
    config_path = extract_cli_config_path(argv)
    parser = _build_argument_parser(config_path=config_path)
    args = parser.parse_args(argv)
    if not args.hf_repo_id:
        parser.error("--hf-repo-id is required (or set dataset_id in --config)")
    return args


def run_dataset_refresh(options: DatasetRefreshOptions) -> dict[str, Any]:
    hf_token = os.getenv("HF_TOKEN")
    github_token = resolve_github_token()
    if not github_token:
        raise RuntimeError("GITHUB_TOKEN must be set or resolvable via gh auth/.env")
    repo_slug = options.repo.slug
    owner, repo_name = options.repo.owner, options.repo.name
    sid = snapshot_id()
    crawl_started_at = iso_now()
    extracted_at = iso_now()
    api = HfApi(token=hf_token)
    api.create_repo(
        repo_id=options.hf_repo_id,
        repo_type="dataset",
        private=options.private_hf_repo,
        exist_ok=True,
    )
    with tempfile.TemporaryDirectory(prefix="slop-farmer-job-") as tmp:
        root = Path(tmp)
        previous_root = root / "previous"
        output_root = root / "output"
        previous_root.mkdir(parents=True, exist_ok=True)
        output_root.mkdir(parents=True, exist_ok=True)
        remote_paths = list_remote_paths(api, options.hf_repo_id)
        previous_watermark = load_remote_json_file(
            api, options.hf_repo_id, "state/watermark.json", previous_root
        )
        remote_manifest = load_remote_json_file(
            api, options.hf_repo_id, "manifest.json", previous_root
        )
        latest_pointer = (
            load_remote_json_file(api, options.hf_repo_id, "snapshots/latest.json", previous_root)
            if "snapshots/latest.json" in remote_paths
            else None
        )
        stable_snapshot_id = None
        if previous_watermark:
            stable_snapshot_id = previous_watermark.get("last_successful_snapshot_id")
        elif latest_pointer:
            stable_snapshot_id = latest_pointer.get("latest_snapshot_id")
        elif remote_manifest:
            stable_snapshot_id = remote_manifest.get("snapshot_id")
        log(f"Starting dataset refresh for {repo_slug}")
        log(f"Target dataset repo: {options.hf_repo_id}")
        previous_tables = {
            table_name: [] for table_name in SCHEMAS if table_name != "new_contributors"
        }
        for table_name in previous_tables:
            previous_tables[table_name] = load_remote_table_from_candidates(
                api,
                options.hf_repo_id,
                table_name,
                previous_root,
                stable_snapshot_candidates(latest_pointer, f"{table_name}.parquet"),
            )
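        # Replay delta checkpoints newer than the last stable snapshot so rows already
        # fetched by an interrupted run are folded back in rather than re-crawled.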
        checkpoint_progress: dict[str, Any] | None = None
        best_comment_checkpoint_progress: dict[str, Any] | None = None
        for checkpoint_sid, checkpoint_dir in checkpoint_dirs(remote_paths):
            if stable_snapshot_id is not None and checkpoint_sid <= str(stable_snapshot_id):
                continue
            progress_payload = load_remote_json_file(
                api, options.hf_repo_id, f"{checkpoint_dir}/progress.json", previous_root
            ) or load_remote_json_file(
                api,
                options.hf_repo_id,
                f"{checkpoint_dir}/state/in_progress.json",
                previous_root,
            )
            if progress_payload is not None:
                checkpoint_progress = progress_payload
                if (
                    progress_payload.get("effective_since") is None
                    and (progress_payload.get("counts") or {}).get("comments", 0) > 0
                    and (
                        best_comment_checkpoint_progress is None
                        or (progress_payload.get("counts") or {}).get("comments", 0)
                        > (best_comment_checkpoint_progress.get("counts") or {}).get("comments", 0)
                    )
                ):
                    best_comment_checkpoint_progress = progress_payload
            for table_name in previous_tables:
                checkpoint_rows = load_remote_table_from_candidates(
                    api,
                    options.hf_repo_id,
                    table_name,
                    previous_root,
                    [f"{checkpoint_dir}/{table_name}.parquet"],
                )
                if checkpoint_rows:
                    previous_tables[table_name] = merge_rows(
                        table_name,
                        previous_tables[table_name],
                        checkpoint_rows,
                    )
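        # Choose the incremental "since" window: an in-flight checkpoint window wins, then
        # the published watermark, then the root manifest; otherwise run a full snapshot.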
        effective_since = None
        if checkpoint_progress and checkpoint_progress.get("effective_since") is not None:
            effective_since = checkpoint_progress.get("effective_since")
            log(f"Resuming from incomplete checkpoint window starting at {effective_since}")
        elif previous_watermark and previous_watermark.get("next_since") is not None:
            effective_since = previous_watermark.get("next_since")
            log(f"Resuming from remote watermark {effective_since}")
        elif (
            remote_manifest
            and isinstance(remote_manifest.get("watermark"), dict)
            and remote_manifest["watermark"].get("next_since") is not None
        ):
            effective_since = remote_manifest["watermark"].get("next_since")
            log(f"Bootstrapping remote watermark from root manifest {effective_since}")
        else:
            log("No successful watermark found; running full snapshot")
        client = GitHubClient(
            token=github_token,
            timeout=options.http_timeout,
            max_retries=options.http_max_retries,
            log=log,
        )
        previous_snapshot_dir = materialize_previous_snapshot_dir(
            api=api,
            repo_id=options.hf_repo_id,
            previous_root=previous_root,
            stable_snapshot_id=str(stable_snapshot_id) if stable_snapshot_id is not None else None,
            latest_pointer=latest_pointer,
            previous_tables=previous_tables,
        )
        rate_limit = client.get_json("/rate_limit")
        core = (rate_limit.get("resources") or {}).get("core") or {}
        limit = core.get("limit")
        remaining = core.get("remaining")
        reset_at = core.get("reset")
        log(f"GitHub core rate limit: limit={limit} remaining={remaining} reset={reset_at}")
        if limit is not None and int(limit) <= 60:
            raise RuntimeError("GITHUB_TOKEN appears to be missing, invalid, or not being applied")
        if remaining == 0 and reset_at:
            sleep_for = max(int(reset_at) - int(time.time()), 1)
            log(f"GitHub token exhausted before bootstrap; sleeping {sleep_for}s until reset")
            time.sleep(sleep_for)
        log("Fetching changed issue and pull request stubs from GitHub")
        issue_stubs = list(
            client.iter_repo_issues(owner, repo_name, effective_since, options.max_issues)
        )
        issues = [item for item in issue_stubs if "pull_request" not in item]
        pr_stubs = [item for item in issue_stubs if "pull_request" in item]
        if options.max_prs is not None:
            pr_stubs = pr_stubs[: options.max_prs]
        log(f"Fetched {len(issue_stubs)} changed stubs")
        issue_number_to_kind = {
            item["number"]: ("pull_request" if "pull_request" in item else "issue")
            for item in issue_stubs
        }
        issue_rows = [normalize_issue(repo_slug, item, sid, extracted_at) for item in issues]
        comment_rows: list[dict[str, Any]] = []
        next_comment_checkpoint = options.checkpoint_every_comments
        reuse_checkpoint_comments = (
            stable_snapshot_id is None
            and effective_since is None
            and best_comment_checkpoint_progress is not None
            and bool(previous_tables["comments"])
        )
        if reuse_checkpoint_comments:
            log(
                f"Reusing {len(previous_tables['comments'])} checkpoint comments from prior partial runs"
            )
        else:
            for index, item in enumerate(issue_stubs, start=1):
                if not item.get("comments"):
                    continue
                remaining_comments = remaining_limit(options.max_issue_comments, len(comment_rows))
                if remaining_comments == 0:
                    break
                if index == 1 or index % 25 == 0:
                    log(f"Collecting discussion comments; {len(comment_rows)} collected so far")
                for comment in client.iter_issue_comments_for_number(
                    owner,
                    repo_name,
                    int(item["number"]),
                    effective_since,
                    remaining_comments,
                ):
                    parent_number = issue_url_to_number(comment.get("issue_url"))
                    parent_kind = issue_number_to_kind.get(parent_number, "issue_or_pr")
                    comment_rows.append(
                        normalize_comment(
                            repo_slug,
                            comment,
                            parent_kind,
                            parent_number,
                            sid,
                            extracted_at,
                        )
                    )
                    remaining_comments = remaining_limit(
                        options.max_issue_comments,
                        len(comment_rows),
                    )
                    if (
                        options.checkpoint_every_comments
                        and len(comment_rows) >= next_comment_checkpoint
                    ):
                        log(f"Pushing comment checkpoint to Hub at {len(comment_rows)} comments")
                        upload_delta_checkpoint(
                            api=api,
                            repo_id=options.hf_repo_id,
                            work_dir=root,
                            repo_slug=repo_slug,
                            sid=sid,
                            stage="comments",
                            delta_tables={
                                "issues": issue_rows,
                                "pull_requests": [],
                                "comments": comment_rows,
                                "reviews": [],
                                "review_comments": [],
                                "pr_files": [],
                                "pr_diffs": [],
                                "links": [],
                                "events": [],
                            },
                            progress={
                                "stage": "comments",
                                "effective_since": effective_since,
                                "counts": {
                                    "issues": len(issue_rows),
                                    "comments": len(comment_rows),
                                    "pull_requests": 0,
                                    "reviews": 0,
                                    "review_comments": 0,
                                    "pr_files": 0,
                                    "pr_diffs": 0,
                                    "links": 0,
                                    "events": 0,
                                },
                            },
                        )
                        next_comment_checkpoint += options.checkpoint_every_comments
                    if remaining_comments == 0:
                        break
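        # Index the previously published PR-related tables by PR number so pull requests
        # whose updated_at is unchanged can be reused wholesale instead of re-hydrated.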
        pr_rows: list[dict[str, Any]] = []
        review_rows: list[dict[str, Any]] = []
        review_comment_rows: list[dict[str, Any]] = []
        pr_file_rows: list[dict[str, Any]] = []
        pr_diff_rows: list[dict[str, Any]] = []
        event_rows: list[dict[str, Any]] = []
        next_pr_checkpoint = options.checkpoint_every_prs
        previous_pr_rows_by_number = {
            int(row["number"]): row
            for row in previous_tables["pull_requests"]
            if row.get("number") is not None
        }
        previous_review_rows_by_number: defaultdict[int, list[dict[str, Any]]] = defaultdict(list)
        for row in previous_tables["reviews"]:
            if row.get("pull_request_number") is not None:
                previous_review_rows_by_number[int(row["pull_request_number"])].append(row)
        previous_review_comment_rows_by_number: defaultdict[int, list[dict[str, Any]]] = (
            defaultdict(list)
        )
        for row in previous_tables["review_comments"]:
            if row.get("pull_request_number") is not None:
                previous_review_comment_rows_by_number[int(row["pull_request_number"])].append(row)
        previous_pr_file_rows_by_number: defaultdict[int, list[dict[str, Any]]] = defaultdict(list)
        for row in previous_tables["pr_files"]:
            if row.get("pull_request_number") is not None:
                previous_pr_file_rows_by_number[int(row["pull_request_number"])].append(row)
        previous_pr_diff_rows_by_number = {
            int(row["pull_request_number"]): row
            for row in previous_tables["pr_diffs"]
            if row.get("pull_request_number") is not None
        }
        previous_pr_event_rows_by_number: defaultdict[int, list[dict[str, Any]]] = defaultdict(list)
        for row in previous_tables["events"]:
            if row.get("parent_kind") == "pull_request" and row.get("parent_number") is not None:
                previous_pr_event_rows_by_number[int(row["parent_number"])].append(row)
        hydration_pr_stubs: list[dict[str, Any]] = []
        for pr_stub in pr_stubs:
            number = int(pr_stub["number"])
            previous_pr_row = previous_pr_rows_by_number.get(number)
            if previous_pr_row and previous_pr_row.get("updated_at") == pr_stub.get("updated_at"):
                pr_rows.append(previous_pr_row)
                review_rows.extend(previous_review_rows_by_number[number])
                review_comment_rows.extend(previous_review_comment_rows_by_number[number])
                pr_file_rows.extend(previous_pr_file_rows_by_number[number])
                if number in previous_pr_diff_rows_by_number:
                    pr_diff_rows.append(previous_pr_diff_rows_by_number[number])
                event_rows.extend(previous_pr_event_rows_by_number[number])
                continue
            hydration_pr_stubs.append(pr_stub)
        reused_pr_count = len(pr_rows)
        if reused_pr_count:
            log(f"Reusing hydrated data for {reused_pr_count} pull requests from prior checkpoints")
        if options.checkpoint_every_prs:
            while reused_pr_count >= next_pr_checkpoint:
                next_pr_checkpoint += options.checkpoint_every_prs
        total_prs = len(pr_stubs)
        remaining_prs = len(hydration_pr_stubs)
        for index, pr_stub in enumerate(hydration_pr_stubs, start=1):
            number = int(pr_stub["number"])
            hydrated_count = reused_pr_count + index
            if index == 1 or hydrated_count % 10 == 0 or index == remaining_prs:
                log(f"Hydrating pull requests: {hydrated_count}/{total_prs}")
            detail = client.get_pull_request(owner, repo_name, number)
            pr_rows.append(normalize_pull_request(repo_slug, pr_stub, detail, sid, extracted_at))
            for review in client.iter_pull_reviews(
                owner, repo_name, number, options.max_reviews_per_pr
            ):
                review_rows.append(normalize_review(repo_slug, number, review, sid, extracted_at))
            for comment in client.iter_pull_review_comments(
                owner,
                repo_name,
                number,
                options.max_review_comments_per_pr,
            ):
                review_comment_rows.append(
                    normalize_review_comment(repo_slug, number, comment, sid, extracted_at)
                )
            for pr_file in client.iter_pull_files(owner, repo_name, number):
                pr_file_rows.append(
                    normalize_pr_file(repo_slug, number, pr_file, sid, extracted_at)
                )
            pr_diff_rows.append(
                normalize_pr_diff(
                    repo_slug,
                    number,
                    pr_stub.get("html_url"),
                    pr_stub.get("url"),
                    client.get_pull_request_diff(owner, repo_name, number),
                    sid,
                    extracted_at,
                )
            )
            if options.fetch_timeline:
                for event in client.iter_issue_timeline(owner, repo_name, number):
                    event_rows.append(
                        normalize_timeline_event(
                            repo_slug,
                            number,
                            "pull_request",
                            event,
                            sid,
                            extracted_at,
                        )
                    )
            if options.checkpoint_every_prs and len(pr_rows) >= next_pr_checkpoint:
                log(f"Pushing PR checkpoint to Hub at {len(pr_rows)} hydrated PRs")
                upload_delta_checkpoint(
                    api=api,
                    repo_id=options.hf_repo_id,
                    work_dir=root,
                    repo_slug=repo_slug,
                    sid=sid,
                    stage="pull_requests",
                    delta_tables={
                        "issues": issue_rows,
                        "pull_requests": pr_rows,
                        "comments": comment_rows,
                        "reviews": review_rows,
                        "review_comments": review_comment_rows,
                        "pr_files": pr_file_rows,
                        "pr_diffs": pr_diff_rows,
                        "links": [],
                        "events": event_rows,
                    },
                    progress={
                        "stage": "pull_requests",
                        "effective_since": effective_since,
                        "counts": {
                            "issues": len(issue_rows),
                            "comments": len(comment_rows),
                            "pull_requests": len(pr_rows),
                            "reviews": len(review_rows),
                            "review_comments": len(review_comment_rows),
                            "pr_files": len(pr_file_rows),
                            "pr_diffs": len(pr_diff_rows),
                            "links": 0,
                            "events": len(event_rows),
                        },
                    },
                )
                next_pr_checkpoint += options.checkpoint_every_prs
        if options.fetch_timeline:
            log(f"Fetching issue timelines for {len(issues)} changed issues")
            for issue in issues:
                for event in client.iter_issue_timeline(owner, repo_name, int(issue["number"])):
                    event_rows.append(
                        normalize_timeline_event(
                            repo_slug,
                            int(issue["number"]),
                            "issue",
                            event,
                            sid,
                            extracted_at,
                        )
                    )
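        # Build cross-reference link rows from the bodies of issues, pull requests,
        # comments, reviews, and review comments, then add PR duplicate candidates and
        # timeline-derived cross references.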
        link_rows: list[dict[str, Any]] = []
        for row in issue_rows:
            link_rows.extend(
                build_text_link_rows(
                    repo=repo_slug,
                    owner=owner,
                    repo_name=repo_name,
                    source_type="issue",
                    source_number=row["number"],
                    source_id=row["github_id"],
                    body=row["body"],
                    snapshot_id=sid,
                    extracted_at=extracted_at,
                )
            )
        for row in pr_rows:
            link_rows.extend(
                build_text_link_rows(
                    repo=repo_slug,
                    owner=owner,
                    repo_name=repo_name,
                    source_type="pull_request",
                    source_number=row["number"],
                    source_id=row["github_id"],
                    body=row["body"],
                    snapshot_id=sid,
                    extracted_at=extracted_at,
                )
            )
        for row in comment_rows or previous_tables["comments"]:
            if row["parent_number"] is None:
                continue
            link_rows.extend(
                build_text_link_rows(
                    repo=repo_slug,
                    owner=owner,
                    repo_name=repo_name,
                    source_type="comment",
                    source_number=row["parent_number"],
                    source_id=row["github_id"],
                    body=row["body"],
                    snapshot_id=sid,
                    extracted_at=extracted_at,
                )
            )
        for row in review_rows:
            link_rows.extend(
                build_text_link_rows(
                    repo=repo_slug,
                    owner=owner,
                    repo_name=repo_name,
                    source_type="review",
                    source_number=row["pull_request_number"],
                    source_id=row["github_id"],
                    body=row["body"],
                    snapshot_id=sid,
                    extracted_at=extracted_at,
                )
            )
        for row in review_comment_rows:
            link_rows.extend(
                build_text_link_rows(
                    repo=repo_slug,
                    owner=owner,
                    repo_name=repo_name,
                    source_type="review_comment",
                    source_number=row["pull_request_number"],
                    source_id=row["github_id"],
                    body=row["body"],
                    snapshot_id=sid,
                    extracted_at=extracted_at,
                )
            )
        link_rows.extend(
            build_pr_duplicate_candidate_rows(
                repo=repo_slug,
                pull_requests=pr_rows,
                link_rows=link_rows,
                snapshot_id=sid,
                extracted_at=extracted_at,
            )
        )
        for event in event_rows:
            if event.get("source_issue_number"):
                link_rows.append(
                    {
                        "repo": repo_slug,
                        "source_type": event["parent_kind"],
                        "source_number": event["parent_number"],
                        "source_github_id": None,
                        "target_owner": owner,
                        "target_repo": repo_name,
                        "target_number": event["source_issue_number"],
                        "link_type": f"timeline:{event['event']}",
                        "link_origin": "timeline",
                        "snapshot_id": sid,
                        "extracted_at": extracted_at,
                    }
                )
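        # Push one final delta checkpoint, then merge the delta onto the previously
        # published tables to produce the new full snapshot.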
        delta_tables = {
            "issues": issue_rows,
            "pull_requests": pr_rows,
            "comments": comment_rows,
            "reviews": review_rows,
            "review_comments": review_comment_rows,
            "pr_files": pr_file_rows,
            "pr_diffs": pr_diff_rows,
            "links": link_rows,
            "events": event_rows,
        }
        if any(delta_tables.values()):
            log("Pushing final delta checkpoint to Hub before merge upload")
            upload_delta_checkpoint(
                api=api,
                repo_id=options.hf_repo_id,
                work_dir=root,
                repo_slug=repo_slug,
                sid=sid,
                stage="final-delta",
                delta_tables=delta_tables,
                progress={
                    "stage": "final-delta",
                    "effective_since": effective_since,
                    "counts": {name: len(rows) for name, rows in delta_tables.items()},
                },
            )
        final_tables = {
            table_name: merge_rows(table_name, previous_tables[table_name], delta_rows)
            for table_name, delta_rows in delta_tables.items()
        }
        manifest: dict[str, Any] = {
            "repo": repo_slug,
            "snapshot_id": sid,
            "crawl_started_at": crawl_started_at,
            "extracted_at": extracted_at,
            "watermark": {
                "effective_since": effective_since,
                "next_since": crawl_started_at,
                "previous_snapshot_dir": (
                    str(previous_snapshot_dir) if previous_snapshot_dir is not None else None
                ),
            },
            "delta_counts": {
                "issue_stubs": len(issue_stubs),
                "issues": len(issue_rows),
                "pull_requests": len(pr_rows),
                "comments": len(comment_rows),
                "reviews": len(review_rows),
                "review_comments": len(review_comment_rows),
                "pr_files": len(pr_file_rows),
                "pr_diffs": len(pr_diff_rows),
                "timeline_events": len(event_rows),
                "links": len(link_rows),
            },
            "counts": {
                "issues": len(final_tables["issues"]),
                "pull_requests": len(final_tables["pull_requests"]),
                "comments": len(final_tables["comments"]),
                "reviews": len(final_tables["reviews"]),
                "review_comments": len(final_tables["review_comments"]),
                "pr_files": len(final_tables["pr_files"]),
                "pr_diffs": len(final_tables["pr_diffs"]),
                "timeline_events": len(final_tables["events"]),
                "links": len(final_tables["links"]),
            },
        }
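        # Write the merged tables, the viewer comment splits, reports, manifest, watermark,
        # and latest-snapshot pointer, then upload everything to the Hub in one commit.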
        log("Writing updated dataset files")
        for table_name, rows in final_tables.items():
            write_parquet(rows, output_root / f"{table_name}.parquet", table_name)
        issue_comment_rows, pr_comment_rows = viewer_comment_rows(
            final_tables["comments"],
            final_tables["pull_requests"],
        )
        write_parquet(issue_comment_rows, output_root / "issue_comments.parquet", "comments")
        write_parquet(pr_comment_rows, output_root / "pr_comments.parquet", "comments")
        archived_snapshot_dir = output_root / "snapshots" / sid
        archived_snapshot_dir.mkdir(parents=True, exist_ok=True)
        write_json(manifest, output_root / "manifest.json")
        log("Generating PR scope clusters")
        pr_scope_path = run_pr_scope_report(
            PrScopeOptions(
                snapshot_dir=output_root,
                output_dir=output_root,
                output=output_root / "pr-scope-clusters.json",
                hf_repo_id=None,
                hf_revision=None,
                hf_materialize_dir=None,
                cluster_suppression_rules=options.cluster_suppression_rules,
            )
        )
        shutil.copy2(pr_scope_path, archived_snapshot_dir / pr_scope_path.name)
        artifacts: dict[str, str] = {
            "pr_scope_clusters_json": pr_scope_path.name,
            "archived_pr_scope_clusters_json": f"snapshots/{sid}/{pr_scope_path.name}",
        }
        if options.new_contributor_report:
            log("Generating new contributor dataset/report artifacts")
            run_new_contributor_report(
                NewContributorReportOptions(
                    snapshot_dir=output_root,
                    output_dir=output_root,
                    output=None,
                    json_output=None,
                    hf_repo_id=None,
                    hf_revision=None,
                    hf_materialize_dir=None,
                    window_days=options.new_contributor_window_days,
                    max_authors=options.new_contributor_max_authors,
                )
            )
            manifest["counts"]["new_contributors"] = len(
                read_parquet_rows(output_root / "new_contributors.parquet")
            )
            artifacts.update(
                {
                    "new_contributors_parquet": "new_contributors.parquet",
                    "new_contributors_json": "new-contributors-report.json",
                    "new_contributors_markdown": "new-contributors-report.md",
                }
            )
        manifest["artifacts"] = artifacts
        manifest["watermark"].pop("previous_snapshot_dir", None)
        write_json(manifest, output_root / "manifest.json")
        write_text(
            build_hf_dataset_card(
                repo_slug,
                sid,
                include_new_contributors=options.new_contributor_report,
            ),
            output_root / "README.md",
        )
        write_json(
            {
                "repo": repo_slug,
                "last_successful_snapshot_id": sid,
                "effective_since": effective_since,
                "next_since": crawl_started_at,
                "updated_at": extracted_at,
            },
            output_root / "state" / "watermark.json",
        )
        write_json(manifest, archived_snapshot_dir / "manifest.json")
        write_json(
            {
                "repo": repo_slug,
                "latest_snapshot_id": sid,
                "snapshot_dir": f"snapshots/{sid}",
                "manifest_path": "manifest.json",
                "archived_manifest_path": f"snapshots/{sid}/manifest.json",
                "next_since": crawl_started_at,
            },
            output_root / "snapshots" / "latest.json",
        )
        log("Uploading updated dataset to the Hub")
        api.upload_folder(
            folder_path=str(output_root),
            repo_id=options.hf_repo_id,
            repo_type="dataset",
            commit_message=f"Refresh {repo_name} dataset snapshot {sid}",
        )
        log(f"Dataset refresh complete for {options.hf_repo_id}")
        return {
            "repo": repo_slug,
            "dataset_id": options.hf_repo_id,
            "snapshot_id": sid,
            "effective_since": effective_since,
            "counts": manifest["counts"],
        }


def main(argv: list[str] | None = None) -> None:
    args = parse_args(argv)
    result = run_dataset_refresh(
        DatasetRefreshOptions(
            repo=RepoRef.parse(args.repo),
            hf_repo_id=args.hf_repo_id,
            private_hf_repo=args.private_hf_repo,
            max_issues=args.max_issues,
            max_prs=args.max_prs,
            max_issue_comments=args.max_issue_comments,
            max_reviews_per_pr=args.max_reviews_per_pr,
            max_review_comments_per_pr=args.max_review_comments_per_pr,
            fetch_timeline=args.fetch_timeline,
            new_contributor_report=args.new_contributor_report,
            new_contributor_window_days=args.new_contributor_window_days,
            new_contributor_max_authors=args.new_contributor_max_authors,
            http_timeout=args.http_timeout,
            http_max_retries=args.http_max_retries,
            checkpoint_every_comments=args.checkpoint_every_comments,
            checkpoint_every_prs=args.checkpoint_every_prs,
            cluster_suppression_rules=tuple(args.cluster_suppression_rules),
        )
    )
    print(json.dumps(result, indent=2))


if __name__ == "__main__":
    main()