# diffusers-pr-api/src/slop_farmer/app/dataset_refresh.py
from __future__ import annotations
import argparse
import json
import os
import shutil
import tempfile
import time
from collections import defaultdict
from datetime import UTC, datetime
from pathlib import Path
from typing import Any
from huggingface_hub import HfApi
from slop_farmer.app_config import command_defaults, extract_cli_config_path
from slop_farmer.config import (
DatasetRefreshOptions,
NewContributorReportOptions,
PrScopeOptions,
RepoRef,
resolve_github_token,
)
from slop_farmer.data.dataset_card import build_hf_dataset_card
from slop_farmer.data.github_api import GitHubClient
from slop_farmer.data.hf_dataset_repo import (
list_remote_paths,
load_remote_file,
load_remote_json_file,
stable_snapshot_candidates,
)
from slop_farmer.data.links import build_pr_duplicate_candidate_rows, build_text_link_rows
from slop_farmer.data.normalize import (
issue_url_to_number,
normalize_comment,
normalize_issue,
normalize_pr_diff,
normalize_pr_file,
normalize_pull_request,
normalize_review,
normalize_review_comment,
normalize_timeline_event,
)
from slop_farmer.data.parquet_io import (
SCHEMAS,
read_parquet_rows,
write_json,
write_parquet,
write_text,
)
from slop_farmer.reports.new_contributor_report import run_new_contributor_report
from slop_farmer.reports.pr_scope import run_pr_scope_report
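# Per-table primary keys used to deduplicate rows when merging a delta crawl
# into the previously published tables (last write wins; see merge_rows below).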
PRIMARY_KEYS: dict[str, tuple[str, ...]] = {
"issues": ("github_id",),
"pull_requests": ("github_id",),
"comments": ("github_id",),
"reviews": ("github_id",),
"review_comments": ("github_id",),
"pr_files": ("repo", "pull_request_number", "filename"),
"pr_diffs": ("repo", "pull_request_number"),
"links": (
"repo",
"source_type",
"source_number",
"source_github_id",
"target_owner",
"target_repo",
"target_number",
"link_type",
"link_origin",
),
"events": (
"repo",
"parent_kind",
"parent_number",
"event",
"created_at",
"actor_login",
"source_issue_number",
"source_issue_url",
"commit_id",
"label_name",
),
}
CHECKPOINT_PREFIXES = ("_checkpoints", "checkpoints")
def log(message: str) -> None:
stamp = datetime.now(tz=UTC).strftime("%H:%M:%SZ")
print(f"[{stamp}] {message}", flush=True)
def iso_now() -> str:
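    """Current UTC time as a second-resolution ISO-8601 string, e.g. "2024-05-01T12:00:00Z"."""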
return datetime.now(tz=UTC).replace(microsecond=0).isoformat().replace("+00:00", "Z")
def snapshot_id() -> str:
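    """Compact UTC snapshot identifier, e.g. "20240501T120000Z"."""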
return datetime.now(tz=UTC).strftime("%Y%m%dT%H%M%SZ")
def row_key(row: dict[str, Any], fields: tuple[str, ...]) -> str:
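    """Serialize a row's primary-key fields into a stable string key.

    A doctest-style sketch with a made-up row:

    >>> row_key({"repo": "octo/demo", "pull_request_number": 7}, ("repo", "pull_request_number"))
    '["octo/demo", 7]'
    """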
return json.dumps([row.get(field) for field in fields], default=str)
def merge_rows(
table_name: str,
previous_rows: list[dict[str, Any]],
delta_rows: list[dict[str, Any]],
) -> list[dict[str, Any]]:
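    """Merge delta rows into previous rows, last write wins, keyed on PRIMARY_KEYS.

    pr_files is special-cased: any PR present in the delta first has all of its
    previous file rows dropped, so files removed from a refreshed PR do not linger.
    """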
if table_name == "pr_files":
refreshed_prs = {
(row.get("repo"), row.get("pull_request_number"))
for row in delta_rows
if row.get("pull_request_number") is not None
}
previous_rows = [
row
for row in previous_rows
if (row.get("repo"), row.get("pull_request_number")) not in refreshed_prs
]
merged: dict[str, dict[str, Any]] = {}
for row in previous_rows:
merged[row_key(row, PRIMARY_KEYS[table_name])] = row
for row in delta_rows:
merged[row_key(row, PRIMARY_KEYS[table_name])] = row
return list(merged.values())
def checkpoint_dirs(remote_paths: set[str]) -> list[tuple[str, str]]:
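    """Map remote checkpoint paths to one directory per snapshot id, sorted by id.

    When both prefixes hold the same snapshot id, the current "_checkpoints/"
    layout wins over the legacy "checkpoints/" one. A sketch with a made-up path:

    >>> checkpoint_dirs({"_checkpoints/20240101T000000Z/progress.json"})
    [('20240101T000000Z', '_checkpoints/20240101T000000Z')]
    """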
by_snapshot_id: dict[str, str] = {}
for path in remote_paths:
parts = path.split("/")
if len(parts) < 3 or parts[0] not in CHECKPOINT_PREFIXES:
continue
snapshot_key = parts[1]
prefix = parts[0]
current = by_snapshot_id.get(snapshot_key)
if current is None or current.startswith("checkpoints/"):
by_snapshot_id[snapshot_key] = f"{prefix}/{snapshot_key}"
return [(sid, by_snapshot_id[sid]) for sid in sorted(by_snapshot_id)]
def copy_remote_file_from_candidates(
api: HfApi,
repo_id: str,
local_dir: Path,
destination: Path,
candidate_paths: list[str],
) -> bool:
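    """Copy the first candidate that downloads into destination; True on success."""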
for candidate in candidate_paths:
downloaded = load_remote_file(api, repo_id, candidate, local_dir)
if downloaded is None:
continue
destination.parent.mkdir(parents=True, exist_ok=True)
shutil.copy2(downloaded, destination)
return True
return False
def materialize_previous_snapshot_dir(
*,
    api: HfApi,
repo_id: str,
previous_root: Path,
stable_snapshot_id: str | None,
latest_pointer: dict[str, Any] | None,
previous_tables: dict[str, list[dict[str, Any]]],
) -> Path | None:
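    """Materialize the last stable snapshot (tables plus report artifacts) into a
    local directory for downstream use; returns None when no stable id is known.
    """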
if not stable_snapshot_id:
return None
snapshot_dir = (previous_root / "materialized-snapshots" / stable_snapshot_id).resolve()
snapshot_dir.mkdir(parents=True, exist_ok=True)
for table_name, rows in previous_tables.items():
write_parquet(rows, snapshot_dir / f"{table_name}.parquet", table_name)
for artifact_name in (
"manifest.json",
"new_contributors.parquet",
"new-contributors-report.json",
"new-contributors-report.md",
):
copy_remote_file_from_candidates(
api,
repo_id,
previous_root,
snapshot_dir / artifact_name,
stable_snapshot_candidates(latest_pointer, artifact_name),
)
return snapshot_dir
def load_remote_table_from_candidates(
api: HfApi,
repo_id: str,
table_name: str,
local_dir: Path,
candidate_paths: list[str],
) -> list[dict[str, Any]]:
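    """Return rows from the first candidate parquet path that downloads, else []."""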
for candidate in candidate_paths:
downloaded = load_remote_file(api, repo_id, candidate, local_dir)
if downloaded is not None:
return read_parquet_rows(downloaded)
return []
def viewer_comment_rows(
comments: list[dict[str, Any]],
pull_requests: list[dict[str, Any]],
) -> tuple[list[dict[str, Any]], list[dict[str, Any]]]:
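    """Split comment rows into (issue_comments, pr_comments) for the dataset viewer,
    using parent_kind when set and falling back to known pull request numbers.
    """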
pr_numbers = {int(row["number"]) for row in pull_requests if row.get("number") is not None}
issue_comments: list[dict[str, Any]] = []
pr_comments: list[dict[str, Any]] = []
for row in comments:
parent_number = row.get("parent_number")
parent_kind = row.get("parent_kind")
if parent_kind == "pull_request" or parent_number in pr_numbers:
pr_comments.append(row)
else:
issue_comments.append(row)
return issue_comments, pr_comments
def upload_delta_checkpoint(
*,
api: HfApi,
repo_id: str,
work_dir: Path,
repo_slug: str,
sid: str,
stage: str,
delta_tables: dict[str, list[dict[str, Any]]],
progress: dict[str, Any],
) -> None:
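    """Upload the in-flight delta tables plus progress markers under _checkpoints/{sid},
    so an interrupted run can resume from the Hub.
    """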
checkpoint_root = work_dir / "checkpoint_upload"
if checkpoint_root.exists():
shutil.rmtree(checkpoint_root)
checkpoint_root.mkdir(parents=True, exist_ok=True)
for table_name, rows in delta_tables.items():
write_parquet(rows, checkpoint_root / f"{table_name}.parquet", table_name)
write_json(
{"repo": repo_slug, "snapshot_id": sid, **progress}, checkpoint_root / "progress.json"
)
write_json(
{"repo": repo_slug, "snapshot_id": sid, **progress},
checkpoint_root / "state" / "in_progress.json",
)
api.upload_folder(
folder_path=str(checkpoint_root),
path_in_repo=f"_checkpoints/{sid}",
repo_id=repo_id,
repo_type="dataset",
commit_message=f"Checkpoint {sid} ({stage})",
)
def remaining_limit(limit: int | None, used: int) -> int | None:
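    """Remaining budget under an optional cap; None means unlimited.

    >>> remaining_limit(None, 10) is None
    True
    >>> remaining_limit(100, 40)
    60
    >>> remaining_limit(100, 150)
    0
    """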
if limit is None:
return None
return max(limit - used, 0)
def _build_argument_parser(*, config_path: Path | None = None) -> argparse.ArgumentParser:
defaults = command_defaults("refresh-dataset", config_path=config_path)
parser = argparse.ArgumentParser()
parser.add_argument("--config", type=Path, help="Optional repo config file.")
parser.add_argument("--repo", default=defaults.get("repo", "huggingface/transformers"))
parser.add_argument("--hf-repo-id", default=defaults.get("hf-repo-id"))
parser.add_argument("--max-issues", type=int, default=defaults.get("max-issues"))
parser.add_argument("--max-prs", type=int, default=defaults.get("max-prs"))
parser.add_argument(
"--max-issue-comments",
type=int,
default=defaults.get("max-issue-comments"),
)
parser.add_argument(
"--max-reviews-per-pr",
type=int,
default=defaults.get("max-reviews-per-pr"),
)
parser.add_argument(
"--max-review-comments-per-pr",
type=int,
default=defaults.get("max-review-comments-per-pr"),
)
parser.add_argument(
"--fetch-timeline",
action="store_true",
default=bool(defaults.get("fetch-timeline", False)),
)
parser.add_argument(
"--new-contributor-report",
dest="new_contributor_report",
action="store_true",
default=bool(defaults.get("new-contributor-report", True)),
)
parser.add_argument(
"--no-new-contributor-report",
dest="new_contributor_report",
action="store_false",
)
parser.add_argument(
"--new-contributor-window-days",
type=int,
default=int(defaults.get("new-contributor-window-days", 42)),
)
parser.add_argument(
"--new-contributor-max-authors",
type=int,
default=int(defaults.get("new-contributor-max-authors", 25)),
)
parser.add_argument("--http-timeout", type=int, default=300)
parser.add_argument("--http-max-retries", type=int, default=8)
parser.add_argument("--checkpoint-every-comments", type=int, default=1000)
parser.add_argument("--checkpoint-every-prs", type=int, default=25)
parser.add_argument(
"--private-hf-repo",
dest="private_hf_repo",
action="store_true",
default=bool(defaults.get("private-hf-repo", False)),
)
parser.add_argument("--private", dest="private_hf_repo", action="store_true")
parser.set_defaults(
cluster_suppression_rules=tuple(defaults.get("cluster-suppression-rules", ()))
)
return parser
def parse_args(argv: list[str] | None = None) -> argparse.Namespace:
config_path = extract_cli_config_path(argv)
parser = _build_argument_parser(config_path=config_path)
args = parser.parse_args(argv)
if not args.hf_repo_id:
parser.error("--hf-repo-id is required (or set dataset_id in --config)")
return args
def run_dataset_refresh(options: DatasetRefreshOptions) -> dict[str, Any]:
hf_token = os.getenv("HF_TOKEN")
github_token = resolve_github_token()
if not github_token:
raise RuntimeError("GITHUB_TOKEN must be set or resolvable via gh auth/.env")
repo_slug = options.repo.slug
owner, repo_name = options.repo.owner, options.repo.name
sid = snapshot_id()
crawl_started_at = iso_now()
extracted_at = iso_now()
api = HfApi(token=hf_token)
api.create_repo(
repo_id=options.hf_repo_id,
repo_type="dataset",
private=options.private_hf_repo,
exist_ok=True,
)
with tempfile.TemporaryDirectory(prefix="slop-farmer-job-") as tmp:
root = Path(tmp)
previous_root = root / "previous"
output_root = root / "output"
previous_root.mkdir(parents=True, exist_ok=True)
output_root.mkdir(parents=True, exist_ok=True)
remote_paths = list_remote_paths(api, options.hf_repo_id)
previous_watermark = load_remote_json_file(
api, options.hf_repo_id, "state/watermark.json", previous_root
)
remote_manifest = load_remote_json_file(
api, options.hf_repo_id, "manifest.json", previous_root
)
latest_pointer = (
load_remote_json_file(api, options.hf_repo_id, "snapshots/latest.json", previous_root)
if "snapshots/latest.json" in remote_paths
else None
)
stable_snapshot_id = None
if previous_watermark:
stable_snapshot_id = previous_watermark.get("last_successful_snapshot_id")
elif latest_pointer:
stable_snapshot_id = latest_pointer.get("latest_snapshot_id")
elif remote_manifest:
stable_snapshot_id = remote_manifest.get("snapshot_id")
log(f"Starting dataset refresh for {repo_slug}")
log(f"Target dataset repo: {options.hf_repo_id}")
previous_tables = {
table_name: [] for table_name in SCHEMAS if table_name != "new_contributors"
}
for table_name in previous_tables:
previous_tables[table_name] = load_remote_table_from_candidates(
api,
options.hf_repo_id,
table_name,
previous_root,
stable_snapshot_candidates(latest_pointer, f"{table_name}.parquet"),
)
checkpoint_progress: dict[str, Any] | None = None
best_comment_checkpoint_progress: dict[str, Any] | None = None
for checkpoint_sid, checkpoint_dir in checkpoint_dirs(remote_paths):
if stable_snapshot_id is not None and checkpoint_sid <= str(stable_snapshot_id):
continue
progress_payload = load_remote_json_file(
api, options.hf_repo_id, f"{checkpoint_dir}/progress.json", previous_root
) or load_remote_json_file(
api,
options.hf_repo_id,
f"{checkpoint_dir}/state/in_progress.json",
previous_root,
)
if progress_payload is not None:
checkpoint_progress = progress_payload
if (
progress_payload.get("effective_since") is None
and (progress_payload.get("counts") or {}).get("comments", 0) > 0
and (
best_comment_checkpoint_progress is None
or (progress_payload.get("counts") or {}).get("comments", 0)
> (best_comment_checkpoint_progress.get("counts") or {}).get("comments", 0)
)
):
best_comment_checkpoint_progress = progress_payload
for table_name in previous_tables:
checkpoint_rows = load_remote_table_from_candidates(
api,
options.hf_repo_id,
table_name,
previous_root,
[f"{checkpoint_dir}/{table_name}.parquet"],
)
if checkpoint_rows:
previous_tables[table_name] = merge_rows(
table_name,
previous_tables[table_name],
checkpoint_rows,
)
effective_since = None
if checkpoint_progress and checkpoint_progress.get("effective_since") is not None:
effective_since = checkpoint_progress.get("effective_since")
log(f"Resuming from incomplete checkpoint window starting at {effective_since}")
elif previous_watermark and previous_watermark.get("next_since") is not None:
effective_since = previous_watermark.get("next_since")
log(f"Resuming from remote watermark {effective_since}")
elif (
remote_manifest
and isinstance(remote_manifest.get("watermark"), dict)
and remote_manifest["watermark"].get("next_since") is not None
):
effective_since = remote_manifest["watermark"].get("next_since")
log(f"Bootstrapping remote watermark from root manifest {effective_since}")
else:
log("No successful watermark found; running full snapshot")
client = GitHubClient(
token=github_token,
timeout=options.http_timeout,
max_retries=options.http_max_retries,
log=log,
)
previous_snapshot_dir = materialize_previous_snapshot_dir(
api=api,
repo_id=options.hf_repo_id,
previous_root=previous_root,
stable_snapshot_id=str(stable_snapshot_id) if stable_snapshot_id is not None else None,
latest_pointer=latest_pointer,
previous_tables=previous_tables,
)
rate_limit = client.get_json("/rate_limit")
core = (rate_limit.get("resources") or {}).get("core") or {}
limit = core.get("limit")
remaining = core.get("remaining")
reset_at = core.get("reset")
log(f"GitHub core rate limit: limit={limit} remaining={remaining} reset={reset_at}")
if limit is not None and int(limit) <= 60:
raise RuntimeError("GITHUB_TOKEN appears to be missing, invalid, or not being applied")
if remaining == 0 and reset_at:
sleep_for = max(int(reset_at) - int(time.time()), 1)
log(f"GitHub token exhausted before bootstrap; sleeping {sleep_for}s until reset")
time.sleep(sleep_for)
log("Fetching changed issue and pull request stubs from GitHub")
issue_stubs = list(
client.iter_repo_issues(owner, repo_name, effective_since, options.max_issues)
)
issues = [item for item in issue_stubs if "pull_request" not in item]
pr_stubs = [item for item in issue_stubs if "pull_request" in item]
if options.max_prs is not None:
pr_stubs = pr_stubs[: options.max_prs]
log(f"Fetched {len(issue_stubs)} changed stubs")
issue_number_to_kind = {
item["number"]: ("pull_request" if "pull_request" in item else "issue")
for item in issue_stubs
}
issue_rows = [normalize_issue(repo_slug, item, sid, extracted_at) for item in issues]
comment_rows: list[dict[str, Any]] = []
next_comment_checkpoint = options.checkpoint_every_comments
reuse_checkpoint_comments = (
stable_snapshot_id is None
and effective_since is None
and best_comment_checkpoint_progress is not None
and bool(previous_tables["comments"])
)
if reuse_checkpoint_comments:
log(
f"Reusing {len(previous_tables['comments'])} checkpoint comments from prior partial runs"
)
else:
for index, item in enumerate(issue_stubs, start=1):
if not item.get("comments"):
continue
remaining_comments = remaining_limit(options.max_issue_comments, len(comment_rows))
if remaining_comments == 0:
break
if index == 1 or index % 25 == 0:
log(f"Collecting discussion comments; {len(comment_rows)} collected so far")
for comment in client.iter_issue_comments_for_number(
owner,
repo_name,
int(item["number"]),
effective_since,
remaining_comments,
):
parent_number = issue_url_to_number(comment.get("issue_url"))
parent_kind = issue_number_to_kind.get(parent_number, "issue_or_pr")
comment_rows.append(
normalize_comment(
repo_slug,
comment,
parent_kind,
parent_number,
sid,
extracted_at,
)
)
remaining_comments = remaining_limit(
options.max_issue_comments,
len(comment_rows),
)
if (
options.checkpoint_every_comments
and len(comment_rows) >= next_comment_checkpoint
):
log(f"Pushing comment checkpoint to Hub at {len(comment_rows)} comments")
upload_delta_checkpoint(
api=api,
repo_id=options.hf_repo_id,
work_dir=root,
repo_slug=repo_slug,
sid=sid,
stage="comments",
delta_tables={
"issues": issue_rows,
"pull_requests": [],
"comments": comment_rows,
"reviews": [],
"review_comments": [],
"pr_files": [],
"pr_diffs": [],
"links": [],
"events": [],
},
progress={
"stage": "comments",
"effective_since": effective_since,
"counts": {
"issues": len(issue_rows),
"comments": len(comment_rows),
"pull_requests": 0,
"reviews": 0,
"review_comments": 0,
"pr_files": 0,
"pr_diffs": 0,
"links": 0,
"events": 0,
},
},
)
next_comment_checkpoint += options.checkpoint_every_comments
if remaining_comments == 0:
break
pr_rows: list[dict[str, Any]] = []
review_rows: list[dict[str, Any]] = []
review_comment_rows: list[dict[str, Any]] = []
pr_file_rows: list[dict[str, Any]] = []
pr_diff_rows: list[dict[str, Any]] = []
event_rows: list[dict[str, Any]] = []
next_pr_checkpoint = options.checkpoint_every_prs
previous_pr_rows_by_number = {
int(row["number"]): row
for row in previous_tables["pull_requests"]
if row.get("number") is not None
}
previous_review_rows_by_number: defaultdict[int, list[dict[str, Any]]] = defaultdict(list)
for row in previous_tables["reviews"]:
if row.get("pull_request_number") is not None:
previous_review_rows_by_number[int(row["pull_request_number"])].append(row)
previous_review_comment_rows_by_number: defaultdict[int, list[dict[str, Any]]] = (
defaultdict(list)
)
for row in previous_tables["review_comments"]:
if row.get("pull_request_number") is not None:
previous_review_comment_rows_by_number[int(row["pull_request_number"])].append(row)
previous_pr_file_rows_by_number: defaultdict[int, list[dict[str, Any]]] = defaultdict(list)
for row in previous_tables["pr_files"]:
if row.get("pull_request_number") is not None:
previous_pr_file_rows_by_number[int(row["pull_request_number"])].append(row)
previous_pr_diff_rows_by_number = {
int(row["pull_request_number"]): row
for row in previous_tables["pr_diffs"]
if row.get("pull_request_number") is not None
}
previous_pr_event_rows_by_number: defaultdict[int, list[dict[str, Any]]] = defaultdict(list)
for row in previous_tables["events"]:
if row.get("parent_kind") == "pull_request" and row.get("parent_number") is not None:
previous_pr_event_rows_by_number[int(row["parent_number"])].append(row)
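        # Skip re-hydration for PRs whose updated_at matches the stable snapshot row;
        # their reviews, review comments, files, diffs, and events carry over unchanged.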
hydration_pr_stubs: list[dict[str, Any]] = []
for pr_stub in pr_stubs:
number = int(pr_stub["number"])
previous_pr_row = previous_pr_rows_by_number.get(number)
if previous_pr_row and previous_pr_row.get("updated_at") == pr_stub.get("updated_at"):
pr_rows.append(previous_pr_row)
review_rows.extend(previous_review_rows_by_number[number])
review_comment_rows.extend(previous_review_comment_rows_by_number[number])
pr_file_rows.extend(previous_pr_file_rows_by_number[number])
if number in previous_pr_diff_rows_by_number:
pr_diff_rows.append(previous_pr_diff_rows_by_number[number])
event_rows.extend(previous_pr_event_rows_by_number[number])
continue
hydration_pr_stubs.append(pr_stub)
reused_pr_count = len(pr_rows)
if reused_pr_count:
log(f"Reusing hydrated data for {reused_pr_count} pull requests from prior checkpoints")
if options.checkpoint_every_prs:
while reused_pr_count >= next_pr_checkpoint:
next_pr_checkpoint += options.checkpoint_every_prs
total_prs = len(pr_stubs)
remaining_prs = len(hydration_pr_stubs)
for index, pr_stub in enumerate(hydration_pr_stubs, start=1):
number = int(pr_stub["number"])
hydrated_count = reused_pr_count + index
if index == 1 or hydrated_count % 10 == 0 or index == remaining_prs:
log(f"Hydrating pull requests: {hydrated_count}/{total_prs}")
detail = client.get_pull_request(owner, repo_name, number)
pr_rows.append(normalize_pull_request(repo_slug, pr_stub, detail, sid, extracted_at))
for review in client.iter_pull_reviews(
owner, repo_name, number, options.max_reviews_per_pr
):
review_rows.append(normalize_review(repo_slug, number, review, sid, extracted_at))
for comment in client.iter_pull_review_comments(
owner,
repo_name,
number,
options.max_review_comments_per_pr,
):
review_comment_rows.append(
normalize_review_comment(repo_slug, number, comment, sid, extracted_at)
)
for pr_file in client.iter_pull_files(owner, repo_name, number):
pr_file_rows.append(
normalize_pr_file(repo_slug, number, pr_file, sid, extracted_at)
)
pr_diff_rows.append(
normalize_pr_diff(
repo_slug,
number,
pr_stub.get("html_url"),
pr_stub.get("url"),
client.get_pull_request_diff(owner, repo_name, number),
sid,
extracted_at,
)
)
if options.fetch_timeline:
for event in client.iter_issue_timeline(owner, repo_name, number):
event_rows.append(
normalize_timeline_event(
repo_slug,
number,
"pull_request",
event,
sid,
extracted_at,
)
)
if options.checkpoint_every_prs and len(pr_rows) >= next_pr_checkpoint:
log(f"Pushing PR checkpoint to Hub at {len(pr_rows)} hydrated PRs")
upload_delta_checkpoint(
api=api,
repo_id=options.hf_repo_id,
work_dir=root,
repo_slug=repo_slug,
sid=sid,
stage="pull_requests",
delta_tables={
"issues": issue_rows,
"pull_requests": pr_rows,
"comments": comment_rows,
"reviews": review_rows,
"review_comments": review_comment_rows,
"pr_files": pr_file_rows,
"pr_diffs": pr_diff_rows,
"links": [],
"events": event_rows,
},
progress={
"stage": "pull_requests",
"effective_since": effective_since,
"counts": {
"issues": len(issue_rows),
"comments": len(comment_rows),
"pull_requests": len(pr_rows),
"reviews": len(review_rows),
"review_comments": len(review_comment_rows),
"pr_files": len(pr_file_rows),
"pr_diffs": len(pr_diff_rows),
"links": 0,
"events": len(event_rows),
},
},
)
next_pr_checkpoint += options.checkpoint_every_prs
if options.fetch_timeline:
log(f"Fetching issue timelines for {len(issues)} changed issues")
for issue in issues:
for event in client.iter_issue_timeline(owner, repo_name, int(issue["number"])):
event_rows.append(
normalize_timeline_event(
repo_slug,
int(issue["number"]),
"issue",
event,
sid,
extracted_at,
)
)
link_rows: list[dict[str, Any]] = []
for row in issue_rows:
link_rows.extend(
build_text_link_rows(
repo=repo_slug,
owner=owner,
repo_name=repo_name,
source_type="issue",
source_number=row["number"],
source_id=row["github_id"],
body=row["body"],
snapshot_id=sid,
extracted_at=extracted_at,
)
)
for row in pr_rows:
link_rows.extend(
build_text_link_rows(
repo=repo_slug,
owner=owner,
repo_name=repo_name,
source_type="pull_request",
source_number=row["number"],
source_id=row["github_id"],
body=row["body"],
snapshot_id=sid,
extracted_at=extracted_at,
)
)
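        # When no comments were fetched this run, rebuild comment links from the
        # previously merged comment table instead of emitting none at all.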
for row in comment_rows or previous_tables["comments"]:
if row["parent_number"] is None:
continue
link_rows.extend(
build_text_link_rows(
repo=repo_slug,
owner=owner,
repo_name=repo_name,
source_type="comment",
source_number=row["parent_number"],
source_id=row["github_id"],
body=row["body"],
snapshot_id=sid,
extracted_at=extracted_at,
)
)
for row in review_rows:
link_rows.extend(
build_text_link_rows(
repo=repo_slug,
owner=owner,
repo_name=repo_name,
source_type="review",
source_number=row["pull_request_number"],
source_id=row["github_id"],
body=row["body"],
snapshot_id=sid,
extracted_at=extracted_at,
)
)
for row in review_comment_rows:
link_rows.extend(
build_text_link_rows(
repo=repo_slug,
owner=owner,
repo_name=repo_name,
source_type="review_comment",
source_number=row["pull_request_number"],
source_id=row["github_id"],
body=row["body"],
snapshot_id=sid,
extracted_at=extracted_at,
)
)
link_rows.extend(
build_pr_duplicate_candidate_rows(
repo=repo_slug,
pull_requests=pr_rows,
link_rows=link_rows,
snapshot_id=sid,
extracted_at=extracted_at,
)
)
for event in event_rows:
if event.get("source_issue_number"):
link_rows.append(
{
"repo": repo_slug,
"source_type": event["parent_kind"],
"source_number": event["parent_number"],
"source_github_id": None,
"target_owner": owner,
"target_repo": repo_name,
"target_number": event["source_issue_number"],
"link_type": f"timeline:{event['event']}",
"link_origin": "timeline",
"snapshot_id": sid,
"extracted_at": extracted_at,
}
)
delta_tables = {
"issues": issue_rows,
"pull_requests": pr_rows,
"comments": comment_rows,
"reviews": review_rows,
"review_comments": review_comment_rows,
"pr_files": pr_file_rows,
"pr_diffs": pr_diff_rows,
"links": link_rows,
"events": event_rows,
}
if any(delta_tables.values()):
log("Pushing final delta checkpoint to Hub before merge upload")
upload_delta_checkpoint(
api=api,
repo_id=options.hf_repo_id,
work_dir=root,
repo_slug=repo_slug,
sid=sid,
stage="final-delta",
delta_tables=delta_tables,
progress={
"stage": "final-delta",
"effective_since": effective_since,
"counts": {name: len(rows) for name, rows in delta_tables.items()},
},
)
final_tables = {
table_name: merge_rows(table_name, previous_tables[table_name], delta_rows)
for table_name, delta_rows in delta_tables.items()
}
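        # next_since is the crawl start (not its end), so the next incremental run
        # re-fetches anything updated while this crawl was in flight.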
manifest: dict[str, Any] = {
"repo": repo_slug,
"snapshot_id": sid,
"crawl_started_at": crawl_started_at,
"extracted_at": extracted_at,
"watermark": {
"effective_since": effective_since,
"next_since": crawl_started_at,
"previous_snapshot_dir": (
str(previous_snapshot_dir) if previous_snapshot_dir is not None else None
),
},
"delta_counts": {
"issue_stubs": len(issue_stubs),
"issues": len(issue_rows),
"pull_requests": len(pr_rows),
"comments": len(comment_rows),
"reviews": len(review_rows),
"review_comments": len(review_comment_rows),
"pr_files": len(pr_file_rows),
"pr_diffs": len(pr_diff_rows),
"timeline_events": len(event_rows),
"links": len(link_rows),
},
"counts": {
"issues": len(final_tables["issues"]),
"pull_requests": len(final_tables["pull_requests"]),
"comments": len(final_tables["comments"]),
"reviews": len(final_tables["reviews"]),
"review_comments": len(final_tables["review_comments"]),
"pr_files": len(final_tables["pr_files"]),
"pr_diffs": len(final_tables["pr_diffs"]),
"timeline_events": len(final_tables["events"]),
"links": len(final_tables["links"]),
},
}
log("Writing updated dataset files")
for table_name, rows in final_tables.items():
write_parquet(rows, output_root / f"{table_name}.parquet", table_name)
issue_comment_rows, pr_comment_rows = viewer_comment_rows(
final_tables["comments"],
final_tables["pull_requests"],
)
write_parquet(issue_comment_rows, output_root / "issue_comments.parquet", "comments")
write_parquet(pr_comment_rows, output_root / "pr_comments.parquet", "comments")
archived_snapshot_dir = output_root / "snapshots" / sid
archived_snapshot_dir.mkdir(parents=True, exist_ok=True)
write_json(manifest, output_root / "manifest.json")
log("Generating PR scope clusters")
pr_scope_path = run_pr_scope_report(
PrScopeOptions(
snapshot_dir=output_root,
output_dir=output_root,
output=output_root / "pr-scope-clusters.json",
hf_repo_id=None,
hf_revision=None,
hf_materialize_dir=None,
cluster_suppression_rules=options.cluster_suppression_rules,
)
)
shutil.copy2(pr_scope_path, archived_snapshot_dir / pr_scope_path.name)
artifacts: dict[str, str] = {
"pr_scope_clusters_json": pr_scope_path.name,
"archived_pr_scope_clusters_json": f"snapshots/{sid}/{pr_scope_path.name}",
}
if options.new_contributor_report:
log("Generating new contributor dataset/report artifacts")
run_new_contributor_report(
NewContributorReportOptions(
snapshot_dir=output_root,
output_dir=output_root,
output=None,
json_output=None,
hf_repo_id=None,
hf_revision=None,
hf_materialize_dir=None,
window_days=options.new_contributor_window_days,
max_authors=options.new_contributor_max_authors,
)
)
manifest["counts"]["new_contributors"] = len(
read_parquet_rows(output_root / "new_contributors.parquet")
)
artifacts.update(
{
"new_contributors_parquet": "new_contributors.parquet",
"new_contributors_json": "new-contributors-report.json",
"new_contributors_markdown": "new-contributors-report.md",
}
)
manifest["artifacts"] = artifacts
manifest["watermark"].pop("previous_snapshot_dir", None)
write_json(manifest, output_root / "manifest.json")
write_text(
build_hf_dataset_card(
repo_slug,
sid,
include_new_contributors=options.new_contributor_report,
),
output_root / "README.md",
)
write_json(
{
"repo": repo_slug,
"last_successful_snapshot_id": sid,
"effective_since": effective_since,
"next_since": crawl_started_at,
"updated_at": extracted_at,
},
output_root / "state" / "watermark.json",
)
write_json(manifest, archived_snapshot_dir / "manifest.json")
write_json(
{
"repo": repo_slug,
"latest_snapshot_id": sid,
"snapshot_dir": f"snapshots/{sid}",
"manifest_path": "manifest.json",
"archived_manifest_path": f"snapshots/{sid}/manifest.json",
"next_since": crawl_started_at,
},
output_root / "snapshots" / "latest.json",
)
log("Uploading updated dataset to the Hub")
api.upload_folder(
folder_path=str(output_root),
repo_id=options.hf_repo_id,
repo_type="dataset",
commit_message=f"Refresh {repo_name} dataset snapshot {sid}",
)
log(f"Dataset refresh complete for {options.hf_repo_id}")
return {
"repo": repo_slug,
"dataset_id": options.hf_repo_id,
"snapshot_id": sid,
"effective_since": effective_since,
"counts": manifest["counts"],
}
def main(argv: list[str] | None = None) -> None:
args = parse_args(argv)
result = run_dataset_refresh(
DatasetRefreshOptions(
repo=RepoRef.parse(args.repo),
hf_repo_id=args.hf_repo_id,
private_hf_repo=args.private_hf_repo,
max_issues=args.max_issues,
max_prs=args.max_prs,
max_issue_comments=args.max_issue_comments,
max_reviews_per_pr=args.max_reviews_per_pr,
max_review_comments_per_pr=args.max_review_comments_per_pr,
fetch_timeline=args.fetch_timeline,
new_contributor_report=args.new_contributor_report,
new_contributor_window_days=args.new_contributor_window_days,
new_contributor_max_authors=args.new_contributor_max_authors,
http_timeout=args.http_timeout,
http_max_retries=args.http_max_retries,
checkpoint_every_comments=args.checkpoint_every_comments,
checkpoint_every_prs=args.checkpoint_every_prs,
cluster_suppression_rules=tuple(args.cluster_suppression_rules),
)
)
print(json.dumps(result, indent=2))
if __name__ == "__main__":
main()