from __future__ import annotations
import json
import shutil
from collections.abc import Iterable
from datetime import UTC, datetime, timedelta
from pathlib import Path
from typing import Any, Protocol
from slop_farmer.config import NewContributorReportOptions, PipelineOptions, resolve_github_token
from slop_farmer.data.dataset_card import build_hf_dataset_card
from slop_farmer.data.github_api import GitHubClient
from slop_farmer.data.links import build_pr_duplicate_candidate_rows, build_text_link_rows
from slop_farmer.data.normalize import (
issue_url_to_number,
normalize_comment,
normalize_issue,
normalize_pr_diff,
normalize_pr_file,
normalize_pull_request,
normalize_review,
normalize_review_comment,
normalize_timeline_event,
)
from slop_farmer.data.parquet_io import (
read_json,
read_parquet_rows,
write_json,
write_parquet,
write_text,
)
from slop_farmer.reports.new_contributor_report import run_new_contributor_report
# Navigation:
# - protocol + small time/log/view helpers
# - checkpoint/state helpers for resumable crawls
# - incremental merge helpers
# - run_pipeline(): fetch -> hydrate -> derive links -> merge/write -> publish
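# Output layout (paths as written by this module under options.output_dir):
#   snapshots/<snapshot_id>/
#     issues.parquet, pull_requests.parquet, comments.parquet,
#     issue_comments.parquet, pr_comments.parquet, reviews.parquet,
#     review_comments.parquet, pr_files.parquet, pr_diffs.parquet,
#     links.parquet, events.parquet, manifest.json, README.md
#     (plus new-contributor report artifacts when that option is enabled)
#   snapshots/latest.json   - pointer to the most recent completed snapshot
#   state/watermark.json    - next_since watermark for incremental runs
#   state/in_progress.json  - crash-recovery checkpoint, deleted on success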
class GitHubClientLike(Protocol):
def iter_repo_issues(
self, owner: str, repo: str, since: str | None, limit: int | None
) -> Iterable[dict[str, Any]]: ...
def iter_issue_comments_for_number(
self, owner: str, repo: str, number: int, since: str | None, limit: int | None = None
) -> Iterable[dict[str, Any]]: ...
def get_pull_request(self, owner: str, repo: str, number: int) -> dict[str, Any]: ...
def iter_pull_reviews(
self, owner: str, repo: str, number: int, limit: int | None = None
) -> Iterable[dict[str, Any]]: ...
def iter_pull_review_comments(
self, owner: str, repo: str, number: int, limit: int | None = None
) -> Iterable[dict[str, Any]]: ...
def iter_pull_files(
self, owner: str, repo: str, number: int, limit: int | None = None
) -> Iterable[dict[str, Any]]: ...
def get_pull_request_diff(self, owner: str, repo: str, number: int) -> str: ...
def iter_issue_timeline(
self, owner: str, repo: str, number: int, limit: int | None = None
) -> Iterable[dict[str, Any]]: ...
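# A minimal sketch of an offline stand-in that satisfies GitHubClientLike
# (hypothetical test helper, not part of this module); any object with these
# methods can be passed as the ``client`` argument of run_pipeline() to avoid
# network calls:
#
#     class EmptyGitHubClient:
#         def iter_repo_issues(self, owner, repo, since, limit):
#             return []
#         def iter_issue_comments_for_number(self, owner, repo, number, since, limit=None):
#             return []
#         def get_pull_request(self, owner, repo, number):
#             return {}
#         def iter_pull_reviews(self, owner, repo, number, limit=None):
#             return []
#         def iter_pull_review_comments(self, owner, repo, number, limit=None):
#             return []
#         def iter_pull_files(self, owner, repo, number, limit=None):
#             return []
#         def get_pull_request_diff(self, owner, repo, number):
#             return ""
#         def iter_issue_timeline(self, owner, repo, number, limit=None):
#             return []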
def _iso_now() -> str:
return datetime.now(tz=UTC).replace(microsecond=0).isoformat().replace("+00:00", "Z")
def _snapshot_id() -> str:
return datetime.now(tz=UTC).strftime("%Y%m%dT%H%M%SZ")
def _log(message: str) -> None:
stamp = datetime.now(tz=UTC).strftime("%H:%M:%SZ")
print(f"[{stamp}] {message}", flush=True)
def _remaining_limit(limit: int | None, used: int) -> int | None:
if limit is None:
return None
return max(limit - used, 0)
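# Example of the fetch-budget helper (values chosen for illustration):
#   _remaining_limit(None, 10) -> None  (no cap configured, fetch everything)
#   _remaining_limit(100, 40)  -> 60
#   _remaining_limit(100, 120) -> 0     (budget exhausted; callers stop paging)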
def _created_after_cutoff(
item: dict[str, Any], max_age_days: int | None, reference_time: datetime
) -> bool:
if max_age_days is None:
return True
created_at = item.get("created_at")
if not created_at:
return False
try:
created_dt = datetime.fromisoformat(str(created_at).replace("Z", "+00:00"))
except ValueError:
return False
return created_dt >= reference_time - timedelta(days=max_age_days)
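# Example of the age-cap filter (hypothetical timestamps):
#   ref = datetime(2024, 6, 1, tzinfo=UTC)
#   _created_after_cutoff({"created_at": "2024-05-20T12:00:00Z"}, 30, ref)   -> True
#   _created_after_cutoff({"created_at": "2024-05-20T12:00:00Z"}, 7, ref)    -> False
#   _created_after_cutoff({"created_at": "2023-01-01T00:00:00Z"}, None, ref) -> True
#   _created_after_cutoff({}, 30, ref)                                       -> False (no created_at)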
def _reference_time_for_age_caps(crawl_started_at: str) -> datetime:
try:
return datetime.fromisoformat(crawl_started_at.replace("Z", "+00:00"))
except ValueError:
return datetime.now(tz=UTC)
def _dataset_card(
repo: str, snapshot_id: str, manifest: dict[str, Any], *, include_new_contributors: bool = False
) -> str:
notes = ["new contributor reviewer artifacts are included"] if include_new_contributors else []
    # The manifest parameter is currently unused when building the card.
    del manifest
return build_hf_dataset_card(
repo,
snapshot_id,
include_new_contributors=include_new_contributors,
notes=notes,
)
def _viewer_comment_rows(
comments: list[dict[str, Any]],
pull_requests: list[dict[str, Any]],
) -> tuple[list[dict[str, Any]], list[dict[str, Any]]]:
pr_numbers = {int(row["number"]) for row in pull_requests if row.get("number") is not None}
issue_comments: list[dict[str, Any]] = []
pr_comments: list[dict[str, Any]] = []
for row in comments:
parent_number = row.get("parent_number")
parent_kind = row.get("parent_kind")
if parent_kind == "pull_request" or parent_number in pr_numbers:
pr_comments.append(row)
else:
issue_comments.append(row)
return issue_comments, pr_comments
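# Example split for the dataset viewer (hypothetical rows):
#   pulls    = [{"number": 7}]
#   comments = [{"github_id": 1, "parent_number": 3, "parent_kind": "issue"},
#               {"github_id": 2, "parent_number": 7, "parent_kind": "issue_or_pr"}]
#   issue_side, pr_side = _viewer_comment_rows(comments, pulls)
#   [row["github_id"] for row in issue_side] -> [1]
#   [row["github_id"] for row in pr_side]    -> [2]
# The second comment lands on the PR side because its parent number matches a
# known pull request, even though parent_kind could not be resolved upstream.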
PRIMARY_KEYS: dict[str, tuple[str, ...]] = {
"issues": ("github_id",),
"pull_requests": ("github_id",),
"comments": ("github_id",),
"reviews": ("github_id",),
"review_comments": ("github_id",),
"pr_files": ("repo", "pull_request_number", "filename"),
"pr_diffs": ("repo", "pull_request_number"),
"links": (
"repo",
"source_type",
"source_number",
"source_github_id",
"target_owner",
"target_repo",
"target_number",
"link_type",
"link_origin",
),
"events": (
"repo",
"parent_kind",
"parent_number",
"event",
"created_at",
"actor_login",
"source_issue_number",
"source_issue_url",
"commit_id",
"label_name",
),
}
CHECKPOINT_VERSION = 1
CHECKPOINT_PR_INTERVAL = 5  # checkpoint after every 5 hydrated pull requests
CHECKPOINT_ISSUE_TIMELINE_INTERVAL = 25  # checkpoint after every 25 issue timelines
CHECKPOINT_TABLE_NAMES = (
"issues",
"comments",
"pull_requests",
"reviews",
"review_comments",
"pr_files",
"pr_diffs",
"events",
)
# Checkpoint/state helpers
def _state_dir(output_dir: Path) -> Path:
return output_dir / "state"
def _in_progress_path(output_dir: Path) -> Path:
return _state_dir(output_dir) / "in_progress.json"
def _watermark_path(output_dir: Path) -> Path:
return _state_dir(output_dir) / "watermark.json"
def _latest_snapshot_pointer_path(output_dir: Path) -> Path:
return output_dir / "snapshots" / "latest.json"
def _checkpoint_dir(snapshot_dir: Path) -> Path:
return snapshot_dir / "_checkpoint"
def _checkpoint_progress_path(snapshot_dir: Path) -> Path:
return _checkpoint_dir(snapshot_dir) / "progress.json"
def _checkpoint_table_path(snapshot_dir: Path, table_name: str) -> Path:
return _checkpoint_dir(snapshot_dir) / f"{table_name}.parquet"
def _checkpoint_options(options: PipelineOptions) -> dict[str, Any]:
return {
"max_issues": options.max_issues,
"max_prs": options.max_prs,
"max_issue_comments": options.max_issue_comments,
"max_reviews_per_pr": options.max_reviews_per_pr,
"max_review_comments_per_pr": options.max_review_comments_per_pr,
"issue_max_age_days": options.issue_max_age_days,
"pr_max_age_days": options.pr_max_age_days,
"fetch_timeline": options.fetch_timeline,
}
def _load_in_progress_checkpoint(options: PipelineOptions, repo_slug: str) -> dict[str, Any] | None:
if not options.resume or options.since:
return None
path = _in_progress_path(options.output_dir)
if not path.exists():
return None
payload = read_json(path)
if not isinstance(payload, dict):
return None
if payload.get("version") != CHECKPOINT_VERSION or payload.get("repo") != repo_slug:
return None
if payload.get("options") != _checkpoint_options(options):
_log(f"Ignoring incompatible in-progress checkpoint: {path}")
return None
snapshot_dir_raw = payload.get("snapshot_dir")
if not isinstance(snapshot_dir_raw, str) or not snapshot_dir_raw:
return None
payload["snapshot_dir"] = Path(snapshot_dir_raw)
previous_snapshot_dir_raw = payload.get("previous_snapshot_dir")
payload["previous_snapshot_dir"] = (
Path(previous_snapshot_dir_raw)
if isinstance(previous_snapshot_dir_raw, str) and previous_snapshot_dir_raw
else None
)
return payload
def _load_checkpoint_rows(snapshot_dir: Path, table_name: str) -> list[dict[str, Any]]:
return read_parquet_rows(_checkpoint_table_path(snapshot_dir, table_name))
def _write_checkpoint(
*,
options: PipelineOptions,
repo_slug: str,
snapshot_id: str,
snapshot_dir: Path,
effective_since: str | None,
crawl_started_at: str,
extracted_at: str,
previous_snapshot_dir: Path | None,
merge_with_previous: bool,
phase: str,
comments_done: bool,
issue_rows: list[dict[str, Any]],
comment_rows: list[dict[str, Any]],
pr_rows: list[dict[str, Any]],
review_rows: list[dict[str, Any]],
review_comment_rows: list[dict[str, Any]],
pr_file_rows: list[dict[str, Any]],
pr_diff_rows: list[dict[str, Any]],
timeline_rows: list[dict[str, Any]],
completed_issue_timeline_numbers: set[int],
) -> None:
checkpoint_tables = {
"issues": issue_rows,
"comments": comment_rows,
"pull_requests": pr_rows,
"reviews": review_rows,
"review_comments": review_comment_rows,
"pr_files": pr_file_rows,
"pr_diffs": pr_diff_rows,
"events": timeline_rows,
}
for table_name, rows in checkpoint_tables.items():
write_parquet(rows, _checkpoint_table_path(snapshot_dir, table_name), table_name)
progress = {
"version": CHECKPOINT_VERSION,
"repo": repo_slug,
"snapshot_id": snapshot_id,
"snapshot_dir": str(snapshot_dir),
"effective_since": effective_since,
"crawl_started_at": crawl_started_at,
"extracted_at": extracted_at,
"phase": phase,
"comments_done": comments_done,
"merge_with_previous": merge_with_previous,
"previous_snapshot_dir": str(previous_snapshot_dir) if previous_snapshot_dir else None,
"completed_pr_numbers": sorted(
int(row["number"]) for row in pr_rows if row.get("number") is not None
),
"completed_issue_timeline_numbers": sorted(completed_issue_timeline_numbers),
"options": _checkpoint_options(options),
"counts": {table_name: len(rows) for table_name, rows in checkpoint_tables.items()},
}
write_json(progress, _checkpoint_progress_path(snapshot_dir))
write_json(progress, _in_progress_path(options.output_dir))
def _clear_checkpoint(output_dir: Path, snapshot_dir: Path) -> None:
checkpoint_state = _in_progress_path(output_dir)
if checkpoint_state.exists():
checkpoint_state.unlink()
checkpoint_dir = _checkpoint_dir(snapshot_dir)
if checkpoint_dir.exists():
shutil.rmtree(checkpoint_dir)
def _load_watermark(output_dir: Path) -> dict[str, Any] | None:
path = _watermark_path(output_dir)
if not path.exists():
return None
return read_json(path)
def _load_latest_snapshot_pointer(output_dir: Path) -> dict[str, Any] | None:
path = _latest_snapshot_pointer_path(output_dir)
if not path.exists():
return None
return read_json(path)
def _resolve_effective_since(
options: PipelineOptions, repo_slug: str
) -> tuple[str | None, dict[str, Any] | None]:
if options.since:
return options.since, None
if not options.resume:
return None, None
watermark = _load_watermark(options.output_dir)
if not watermark or watermark.get("repo") != repo_slug:
return None, watermark
return watermark.get("next_since"), watermark
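# Precedence for the incremental crawl window, as implemented above and in
# _load_in_progress_checkpoint():
#   explicit options.since      -> always wins; checkpoints and watermarks are ignored
#   resume enabled, repo match  -> next_since recorded in state/watermark.json
#   otherwise                   -> None, meaning a full (non-incremental) crawl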
def _previous_snapshot_dir(output_dir: Path, repo_slug: str) -> Path | None:
latest = _load_latest_snapshot_pointer(output_dir)
if not latest:
return None
latest_repo = latest.get("repo")
if latest_repo and latest_repo != repo_slug:
return None
snapshot_dir = latest.get("snapshot_dir")
if not snapshot_dir:
return None
path = Path(snapshot_dir)
return path if path.exists() else None
# Incremental merge helpers
def _row_key(row: dict[str, Any], key_fields: tuple[str, ...]) -> str:
return json.dumps([row.get(field) for field in key_fields], sort_keys=False, default=str)
def _merge_rows(
table_name: str, previous_rows: list[dict[str, Any]], delta_rows: list[dict[str, Any]]
) -> list[dict[str, Any]]:
key_fields = PRIMARY_KEYS[table_name]
    # pr_files rows are keyed by (repo, pull_request_number, filename), so a PR
    # that reappears in the delta first drops all of its previous file rows;
    # otherwise files removed or renamed since the last snapshot would linger.
    if table_name == "pr_files":
refreshed_prs = {
(row.get("repo"), row.get("pull_request_number"))
for row in delta_rows
if row.get("pull_request_number") is not None
}
previous_rows = [
row
for row in previous_rows
if (row.get("repo"), row.get("pull_request_number")) not in refreshed_prs
]
merged: dict[str, dict[str, Any]] = {}
for row in previous_rows:
merged[_row_key(row, key_fields)] = row
for row in delta_rows:
merged[_row_key(row, key_fields)] = row
rows = list(merged.values())
sort_fields = [
field
for field in ("number", "pull_request_number", "parent_number", "github_id", "created_at")
if rows and field in rows[0]
]
if sort_fields:
rows.sort(
key=lambda row: tuple(
"" if row.get(field) is None else str(row.get(field)) for field in sort_fields
)
)
return rows
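# Illustrative merge (hypothetical rows): a re-fetched issue replaces its stale
# copy from the previous snapshot because both share PRIMARY_KEYS["issues"]:
#   previous = [{"github_id": 10, "number": 1, "title": "keep"},
#               {"github_id": 11, "number": 2, "title": "stale"}]
#   delta    = [{"github_id": 11, "number": 2, "title": "edited"}]
#   [row["title"] for row in _merge_rows("issues", previous, delta)]
#   -> ["keep", "edited"]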
def _load_previous_rows(snapshot_dir: Path | None, table_name: str) -> list[dict[str, Any]]:
if snapshot_dir is None:
return []
return read_parquet_rows(snapshot_dir / f"{table_name}.parquet")
# Pipeline orchestration
def run_pipeline(options: PipelineOptions, client: GitHubClientLike | None = None) -> Path:
# Resume or initialize one snapshot run.
repo_slug = options.repo.slug
checkpoint = _load_in_progress_checkpoint(options, repo_slug)
watermark: dict[str, Any] | None = None
if checkpoint:
effective_since = checkpoint.get("effective_since")
previous_snapshot_dir = checkpoint.get("previous_snapshot_dir")
merge_with_previous = bool(checkpoint.get("merge_with_previous"))
crawl_started_at = str(checkpoint.get("crawl_started_at") or _iso_now())
snapshot_id = str(checkpoint.get("snapshot_id") or _snapshot_id())
extracted_at = str(checkpoint.get("extracted_at") or _iso_now())
snapshot_dir = checkpoint["snapshot_dir"]
else:
effective_since, watermark = _resolve_effective_since(options, repo_slug)
previous_snapshot_dir = _previous_snapshot_dir(options.output_dir, repo_slug)
merge_with_previous = previous_snapshot_dir is not None and effective_since is not None
crawl_started_at = _iso_now()
snapshot_id = _snapshot_id()
extracted_at = _iso_now()
snapshot_dir = options.output_dir / "snapshots" / snapshot_id
snapshot_dir.mkdir(parents=True, exist_ok=True)
if client is None:
token = resolve_github_token()
client = GitHubClient(
token=token,
timeout=options.http_timeout,
max_retries=options.http_max_retries,
log=_log,
)
if checkpoint:
_log(f"Resuming snapshot {snapshot_id} for {repo_slug}")
_log(f"Recovered in-progress checkpoint: {_in_progress_path(options.output_dir)}")
else:
_log(f"Starting snapshot {snapshot_id} for {repo_slug}")
_log(f"Output directory: {snapshot_dir}")
if options.since:
_log(f"Using explicit since watermark: {effective_since}")
elif checkpoint:
_log(f"Resuming in-progress crawl window from {effective_since}")
elif effective_since:
source_snapshot = watermark.get("last_successful_snapshot_id") if watermark else None
_log(f"Resuming from local watermark {effective_since} from snapshot {source_snapshot}")
else:
_log("No watermark active; running full snapshot")
if merge_with_previous:
_log(f"Merging delta into previous snapshot: {previous_snapshot_dir}")
# Load any checkpointed tables before resuming remote work.
issue_rows = _load_checkpoint_rows(snapshot_dir, "issues") if checkpoint else []
comment_rows = _load_checkpoint_rows(snapshot_dir, "comments") if checkpoint else []
pr_rows = _load_checkpoint_rows(snapshot_dir, "pull_requests") if checkpoint else []
review_rows = _load_checkpoint_rows(snapshot_dir, "reviews") if checkpoint else []
review_comment_rows = (
_load_checkpoint_rows(snapshot_dir, "review_comments") if checkpoint else []
)
pr_file_rows = _load_checkpoint_rows(snapshot_dir, "pr_files") if checkpoint else []
pr_diff_rows = _load_checkpoint_rows(snapshot_dir, "pr_diffs") if checkpoint else []
timeline_rows = _load_checkpoint_rows(snapshot_dir, "events") if checkpoint else []
completed_issue_timeline_numbers = {
int(number)
for number in (checkpoint or {}).get("completed_issue_timeline_numbers", [])
if number is not None
}
comments_done = bool((checkpoint or {}).get("comments_done"))
if not checkpoint:
_write_checkpoint(
options=options,
repo_slug=repo_slug,
snapshot_id=snapshot_id,
snapshot_dir=snapshot_dir,
effective_since=effective_since,
crawl_started_at=crawl_started_at,
extracted_at=extracted_at,
previous_snapshot_dir=previous_snapshot_dir,
merge_with_previous=merge_with_previous,
phase="starting",
comments_done=False,
issue_rows=issue_rows,
comment_rows=comment_rows,
pr_rows=pr_rows,
review_rows=review_rows,
review_comment_rows=review_comment_rows,
pr_file_rows=pr_file_rows,
pr_diff_rows=pr_diff_rows,
timeline_rows=timeline_rows,
completed_issue_timeline_numbers=completed_issue_timeline_numbers,
)
# Fetch lightweight issue/PR stubs and top-level discussion comments first.
_log("Fetching issue and pull request stubs from GitHub")
issue_stubs = list(
client.iter_repo_issues(
options.repo.owner,
options.repo.name,
since=effective_since,
limit=options.max_issues,
)
)
reference_time = _reference_time_for_age_caps(crawl_started_at)
issues = [
item
for item in issue_stubs
if "pull_request" not in item
and _created_after_cutoff(item, options.issue_max_age_days, reference_time)
]
pr_stubs = [
item
for item in issue_stubs
if "pull_request" in item
and _created_after_cutoff(item, options.pr_max_age_days, reference_time)
]
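    # GitHub's issues listing returns pull requests as well; the presence of a
    # "pull_request" key on a stub is what separates the two kinds above.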
if options.max_prs is not None:
pr_stubs = pr_stubs[: options.max_prs]
_log(
f"Fetched {len(issue_stubs)} stubs total: {len(issues)} issues and {len(pr_stubs)} pull requests selected"
)
if options.issue_max_age_days is not None:
_log(f"Issue import age cap: last {options.issue_max_age_days} days by created_at")
if options.pr_max_age_days is not None:
_log(f"PR import age cap: last {options.pr_max_age_days} days by created_at")
issue_number_to_kind = {
item["number"]: ("pull_request" if "pull_request" in item else "issue")
for item in issue_stubs
}
issue_rows = [normalize_issue(repo_slug, item, snapshot_id, extracted_at) for item in issues]
if comments_done:
_log(f"Reusing {len(comment_rows)} checkpointed discussion comments")
else:
comment_rows = []
comment_threads_seen = 0
for item in issue_stubs:
if not item.get("comments"):
continue
remaining = _remaining_limit(options.max_issue_comments, len(comment_rows))
if remaining == 0:
break
comment_threads_seen += 1
if comment_threads_seen == 1 or comment_threads_seen % 25 == 0:
_log(
f"Collecting discussion comments: {len(comment_rows)} gathered so far across {comment_threads_seen} threads"
)
for comment in client.iter_issue_comments_for_number(
options.repo.owner,
options.repo.name,
int(item["number"]),
since=effective_since,
limit=remaining,
):
parent_number = issue_url_to_number(comment.get("issue_url"))
kind = issue_number_to_kind.get(parent_number, "issue_or_pr")
comment_rows.append(
normalize_comment(
repo_slug, comment, kind, parent_number, snapshot_id, extracted_at
)
)
remaining = _remaining_limit(options.max_issue_comments, len(comment_rows))
if remaining == 0:
break
comments_done = True
_write_checkpoint(
options=options,
repo_slug=repo_slug,
snapshot_id=snapshot_id,
snapshot_dir=snapshot_dir,
effective_since=effective_since,
crawl_started_at=crawl_started_at,
extracted_at=extracted_at,
previous_snapshot_dir=previous_snapshot_dir,
merge_with_previous=merge_with_previous,
phase="comments_complete",
comments_done=comments_done,
issue_rows=issue_rows,
comment_rows=comment_rows,
pr_rows=pr_rows,
review_rows=review_rows,
review_comment_rows=review_comment_rows,
pr_file_rows=pr_file_rows,
pr_diff_rows=pr_diff_rows,
timeline_rows=timeline_rows,
completed_issue_timeline_numbers=completed_issue_timeline_numbers,
)
_log(f"Collected {len(comment_rows)} discussion comments")
# Hydrate PR-owned detail tables: reviews, review comments, files, diffs, timelines.
completed_pr_numbers = {int(row["number"]) for row in pr_rows if row.get("number") is not None}
if completed_pr_numbers:
_log(f"Reusing hydrated data for {len(completed_pr_numbers)} pull requests from checkpoint")
total_prs = len(pr_stubs)
for pr_stub in pr_stubs:
number = int(pr_stub["number"])
if number in completed_pr_numbers:
continue
current_pr_index = len(completed_pr_numbers) + 1
if current_pr_index == 1 or current_pr_index % 10 == 0 or current_pr_index == total_prs:
_log(f"Hydrating pull requests: {current_pr_index}/{total_prs} (current #{number})")
pr_detail = client.get_pull_request(options.repo.owner, options.repo.name, number)
pr_rows.append(
normalize_pull_request(repo_slug, pr_stub, pr_detail, snapshot_id, extracted_at)
)
for review in client.iter_pull_reviews(
options.repo.owner,
options.repo.name,
number,
limit=options.max_reviews_per_pr,
):
review_rows.append(
normalize_review(repo_slug, number, review, snapshot_id, extracted_at)
)
for review_comment in client.iter_pull_review_comments(
options.repo.owner,
options.repo.name,
number,
limit=options.max_review_comments_per_pr,
):
review_comment_rows.append(
normalize_review_comment(
repo_slug, number, review_comment, snapshot_id, extracted_at
)
)
for pr_file in client.iter_pull_files(options.repo.owner, options.repo.name, number):
pr_file_rows.append(
normalize_pr_file(repo_slug, number, pr_file, snapshot_id, extracted_at)
)
pr_diff_rows.append(
normalize_pr_diff(
repo_slug,
number,
pr_stub.get("html_url"),
pr_stub.get("url"),
client.get_pull_request_diff(options.repo.owner, options.repo.name, number),
snapshot_id,
extracted_at,
)
)
if options.fetch_timeline:
for event in client.iter_issue_timeline(options.repo.owner, options.repo.name, number):
timeline_rows.append(
normalize_timeline_event(
repo_slug, number, "pull_request", event, snapshot_id, extracted_at
)
)
completed_pr_numbers.add(number)
if (
len(completed_pr_numbers) % CHECKPOINT_PR_INTERVAL == 0
or len(completed_pr_numbers) == total_prs
):
_log(f"Checkpointing pull request hydration at {len(completed_pr_numbers)}/{total_prs}")
_write_checkpoint(
options=options,
repo_slug=repo_slug,
snapshot_id=snapshot_id,
snapshot_dir=snapshot_dir,
effective_since=effective_since,
crawl_started_at=crawl_started_at,
extracted_at=extracted_at,
previous_snapshot_dir=previous_snapshot_dir,
merge_with_previous=merge_with_previous,
phase="hydrating_pull_requests",
comments_done=comments_done,
issue_rows=issue_rows,
comment_rows=comment_rows,
pr_rows=pr_rows,
review_rows=review_rows,
review_comment_rows=review_comment_rows,
pr_file_rows=pr_file_rows,
pr_diff_rows=pr_diff_rows,
timeline_rows=timeline_rows,
completed_issue_timeline_numbers=completed_issue_timeline_numbers,
)
_log(
f"Hydrated {len(pr_rows)} pull requests, {len(review_rows)} reviews, "
f"{len(review_comment_rows)} review comments, {len(pr_file_rows)} PR files, "
f"and {len(pr_diff_rows)} PR diffs"
)
# Fetch issue timelines after PR hydration so checkpoints can resume either phase.
if options.fetch_timeline:
_log(f"Fetching timeline events for {len(issues)} issues")
if completed_issue_timeline_numbers:
_log(f"Reusing timeline checkpoints for {len(completed_issue_timeline_numbers)} issues")
for issue in issues:
number = int(issue["number"])
if number in completed_issue_timeline_numbers:
continue
for event in client.iter_issue_timeline(options.repo.owner, options.repo.name, number):
timeline_rows.append(
normalize_timeline_event(
repo_slug, number, "issue", event, snapshot_id, extracted_at
)
)
completed_issue_timeline_numbers.add(number)
            if (
                len(completed_issue_timeline_numbers) % CHECKPOINT_ISSUE_TIMELINE_INTERVAL == 0
                or len(completed_issue_timeline_numbers) == len(issues)
            ):
_log(
f"Checkpointing issue timelines at {len(completed_issue_timeline_numbers)}/{len(issues)} issues"
)
_write_checkpoint(
options=options,
repo_slug=repo_slug,
snapshot_id=snapshot_id,
snapshot_dir=snapshot_dir,
effective_since=effective_since,
crawl_started_at=crawl_started_at,
extracted_at=extracted_at,
previous_snapshot_dir=previous_snapshot_dir,
merge_with_previous=merge_with_previous,
phase="fetching_issue_timelines",
comments_done=comments_done,
issue_rows=issue_rows,
comment_rows=comment_rows,
pr_rows=pr_rows,
review_rows=review_rows,
review_comment_rows=review_comment_rows,
pr_file_rows=pr_file_rows,
pr_diff_rows=pr_diff_rows,
timeline_rows=timeline_rows,
completed_issue_timeline_numbers=completed_issue_timeline_numbers,
)
_log(f"Collected {len(timeline_rows)} timeline events")
# Derive link rows, then optionally merge this delta into the previous full snapshot.
_log("Building derived link rows")
link_rows: list[dict[str, Any]] = []
for issue_row in issue_rows:
link_rows.extend(
build_text_link_rows(
repo=repo_slug,
owner=options.repo.owner,
repo_name=options.repo.name,
source_type="issue",
source_number=issue_row["number"],
source_id=issue_row["github_id"],
body=issue_row["body"],
snapshot_id=snapshot_id,
extracted_at=extracted_at,
)
)
for pr_row in pr_rows:
link_rows.extend(
build_text_link_rows(
repo=repo_slug,
owner=options.repo.owner,
repo_name=options.repo.name,
source_type="pull_request",
source_number=pr_row["number"],
source_id=pr_row["github_id"],
body=pr_row["body"],
snapshot_id=snapshot_id,
extracted_at=extracted_at,
)
)
for comment_row in comment_rows:
parent_number = comment_row.get("parent_number")
if parent_number is None:
continue
link_rows.extend(
build_text_link_rows(
repo=repo_slug,
owner=options.repo.owner,
repo_name=options.repo.name,
source_type="comment",
source_number=parent_number,
source_id=comment_row["github_id"],
body=comment_row["body"],
snapshot_id=snapshot_id,
extracted_at=extracted_at,
)
)
for review_row in review_rows:
link_rows.extend(
build_text_link_rows(
repo=repo_slug,
owner=options.repo.owner,
repo_name=options.repo.name,
source_type="review",
source_number=review_row["pull_request_number"],
source_id=review_row["github_id"],
body=review_row["body"],
snapshot_id=snapshot_id,
extracted_at=extracted_at,
)
)
for review_comment_row in review_comment_rows:
link_rows.extend(
build_text_link_rows(
repo=repo_slug,
owner=options.repo.owner,
repo_name=options.repo.name,
source_type="review_comment",
source_number=review_comment_row["pull_request_number"],
source_id=review_comment_row["github_id"],
body=review_comment_row["body"],
snapshot_id=snapshot_id,
extracted_at=extracted_at,
)
)
link_rows.extend(
build_pr_duplicate_candidate_rows(
repo=repo_slug,
pull_requests=pr_rows,
link_rows=link_rows,
snapshot_id=snapshot_id,
extracted_at=extracted_at,
)
)
for event in timeline_rows:
if event.get("source_issue_number"):
link_rows.append(
{
"repo": repo_slug,
"source_type": event["parent_kind"],
"source_number": event["parent_number"],
"source_github_id": None,
"target_owner": options.repo.owner,
"target_repo": options.repo.name,
"target_number": event["source_issue_number"],
"link_type": f"timeline:{event['event']}",
"link_origin": "timeline",
"snapshot_id": snapshot_id,
"extracted_at": extracted_at,
}
)
_log(f"Built {len(link_rows)} link rows")
delta_tables = {
"issues": issue_rows,
"pull_requests": pr_rows,
"comments": comment_rows,
"reviews": review_rows,
"review_comments": review_comment_rows,
"pr_files": pr_file_rows,
"pr_diffs": pr_diff_rows,
"links": link_rows,
"events": timeline_rows,
}
final_tables = dict(delta_tables)
if merge_with_previous:
_log("Loading previous snapshot tables for merge")
for table_name, delta_rows in delta_tables.items():
previous_rows = _load_previous_rows(previous_snapshot_dir, table_name)
final_tables[table_name] = _merge_rows(table_name, previous_rows, delta_rows)
_log("Merged incremental delta into cumulative snapshot")
manifest = {
"repo": repo_slug,
"snapshot_id": snapshot_id,
"crawl_started_at": crawl_started_at,
"extracted_at": extracted_at,
"watermark": {
"effective_since": effective_since,
"next_since": crawl_started_at,
"resume_enabled": options.resume,
"resumed_from_checkpoint": bool(checkpoint),
"merge_with_previous": merge_with_previous,
"previous_snapshot_dir": str(previous_snapshot_dir) if previous_snapshot_dir else None,
},
"options": {
"since": options.since,
"effective_since": effective_since,
"http_timeout": options.http_timeout,
"http_max_retries": options.http_max_retries,
"max_issues": options.max_issues,
"max_prs": options.max_prs,
"max_issue_comments": options.max_issue_comments,
"max_reviews_per_pr": options.max_reviews_per_pr,
"max_review_comments_per_pr": options.max_review_comments_per_pr,
"issue_max_age_days": options.issue_max_age_days,
"pr_max_age_days": options.pr_max_age_days,
"fetch_timeline": options.fetch_timeline,
"new_contributor_report": options.new_contributor_report,
"new_contributor_window_days": options.new_contributor_window_days,
"new_contributor_max_authors": options.new_contributor_max_authors,
},
"delta_counts": {
"issue_stubs": len(issue_stubs),
"issues": len(issue_rows),
"pull_requests": len(pr_rows),
"comments": len(comment_rows),
"reviews": len(review_rows),
"review_comments": len(review_comment_rows),
"pr_files": len(pr_file_rows),
"pr_diffs": len(pr_diff_rows),
"timeline_events": len(timeline_rows),
"links": len(link_rows),
},
"counts": {
"issues": len(final_tables["issues"]),
"pull_requests": len(final_tables["pull_requests"]),
"comments": len(final_tables["comments"]),
"reviews": len(final_tables["reviews"]),
"review_comments": len(final_tables["review_comments"]),
"pr_files": len(final_tables["pr_files"]),
"pr_diffs": len(final_tables["pr_diffs"]),
"timeline_events": len(final_tables["events"]),
"links": len(final_tables["links"]),
},
}
# Write the final snapshot, derived viewer tables, manifest, and optional publish artifacts.
_log("Writing Parquet snapshot files")
write_parquet(final_tables["issues"], snapshot_dir / "issues.parquet", "issues")
write_parquet(
final_tables["pull_requests"], snapshot_dir / "pull_requests.parquet", "pull_requests"
)
write_parquet(final_tables["comments"], snapshot_dir / "comments.parquet", "comments")
issue_comment_rows, pr_comment_rows = _viewer_comment_rows(
final_tables["comments"],
final_tables["pull_requests"],
)
write_parquet(issue_comment_rows, snapshot_dir / "issue_comments.parquet", "comments")
write_parquet(pr_comment_rows, snapshot_dir / "pr_comments.parquet", "comments")
write_parquet(final_tables["reviews"], snapshot_dir / "reviews.parquet", "reviews")
write_parquet(final_tables["pr_files"], snapshot_dir / "pr_files.parquet", "pr_files")
write_parquet(final_tables["pr_diffs"], snapshot_dir / "pr_diffs.parquet", "pr_diffs")
write_parquet(
final_tables["review_comments"], snapshot_dir / "review_comments.parquet", "review_comments"
)
write_parquet(final_tables["links"], snapshot_dir / "links.parquet", "links")
write_parquet(final_tables["events"], snapshot_dir / "events.parquet", "events")
write_json(manifest, snapshot_dir / "manifest.json")
generated_new_contributor_report = False
if options.new_contributor_report:
_log("Generating new contributor dataset/report artifacts")
run_new_contributor_report(
NewContributorReportOptions(
snapshot_dir=snapshot_dir,
output_dir=options.output_dir,
output=None,
json_output=None,
hf_repo_id=None,
hf_revision=None,
hf_materialize_dir=None,
window_days=options.new_contributor_window_days,
max_authors=options.new_contributor_max_authors,
)
)
generated_new_contributor_report = True
new_contributor_rows = read_parquet_rows(snapshot_dir / "new_contributors.parquet")
manifest["counts"]["new_contributors"] = len(new_contributor_rows)
manifest["artifacts"] = {
"new_contributors_parquet": "new_contributors.parquet",
"new_contributors_json": "new-contributors-report.json",
"new_contributors_markdown": "new-contributors-report.md",
}
write_json(manifest, snapshot_dir / "manifest.json")
write_text(
_dataset_card(
repo_slug,
snapshot_id,
manifest,
include_new_contributors=generated_new_contributor_report,
),
snapshot_dir / "README.md",
)
_log("Wrote manifest and dataset card")
latest = _latest_snapshot_pointer_path(options.output_dir)
write_json(
{
"repo": repo_slug,
"latest_snapshot_id": snapshot_id,
"snapshot_dir": str(snapshot_dir),
"manifest_path": str(snapshot_dir / "manifest.json"),
"next_since": crawl_started_at,
},
latest,
)
_log(f"Updated latest snapshot pointer: {latest}")
watermark_payload = {
"repo": repo_slug,
"last_successful_snapshot_id": snapshot_id,
"snapshot_dir": str(snapshot_dir),
"effective_since": effective_since,
"next_since": crawl_started_at,
"updated_at": extracted_at,
}
write_json(watermark_payload, _watermark_path(options.output_dir))
_log(f"Updated watermark state: {_watermark_path(options.output_dir)}")
_clear_checkpoint(options.output_dir, snapshot_dir)
_log(f"Snapshot complete: {snapshot_dir}")
return snapshot_dir