# diffusers-pr-api / src/slop_farmer/reports/user_activity.py
# Deployed by evalstate (HF Staff) via "Deploy Diffusers PR API" (commit dbf7313, verified).
from __future__ import annotations
import json
import urllib.error
import urllib.request
from datetime import UTC, datetime, timedelta
from typing import Any
from slop_farmer.config import resolve_github_token
from slop_farmer.data.http import urlopen_with_retry
GRAPHQL_URL = "https://api.github.com/graphql"
PROFILE_QUERY = """
query UserActivityProfile($login: String!, $from: DateTime!, $to: DateTime!) {
rateLimit { cost remaining resetAt }
viewer {
login
organizations(first: 100) {
totalCount
nodes {
login
name
}
}
}
user(login: $login) {
login
name
createdAt
organizations(first: 100) {
totalCount
nodes {
login
name
}
}
starredRepositories(first: 100, orderBy: {field: STARRED_AT, direction: DESC}) {
totalCount
nodes {
nameWithOwner
stargazerCount
owner {
login
}
}
}
contributionsCollection(from: $from, to: $to) {
contributionCalendar {
totalContributions
weeks {
contributionDays {
date
contributionCount
}
}
}
totalIssueContributions
totalPullRequestContributions
pullRequestContributionsByRepository(maxRepositories: 10) {
repository { nameWithOwner }
contributions { totalCount }
}
issueContributionsByRepository(maxRepositories: 10) {
repository { nameWithOwner }
contributions { totalCount }
}
}
}
}
""".strip()
SEARCH_PRS_QUERY = """
query SearchPullRequests($query: String!, $cursor: String) {
rateLimit { cost remaining resetAt }
search(type: ISSUE, query: $query, first: 100, after: $cursor) {
issueCount
pageInfo { hasNextPage endCursor }
nodes {
... on PullRequest {
number
state
merged
createdAt
updatedAt
repository { nameWithOwner stargazerCount }
}
}
}
}
""".strip()
SEARCH_ISSUES_QUERY = """
query SearchIssues($query: String!, $cursor: String) {
rateLimit { cost remaining resetAt }
search(type: ISSUE, query: $query, first: 100, after: $cursor) {
issueCount
pageInfo { hasNextPage endCursor }
nodes {
... on Issue {
number
state
createdAt
updatedAt
repository { nameWithOwner }
}
}
}
}
""".strip()
def summarize_user(login: str, days: int, repo: str | None) -> dict[str, Any]:
    """Build an activity/enrichment report for one GitHub user.

    Combines the GraphQL profile query, three cursor-paged searches
    (all PRs, open PRs, issues), and REST/GraphQL organization lookups
    into a single JSON-serializable summary covering the last ``days``
    days, optionally scoped to one repository.

    Args:
        login: GitHub login of the user to profile.
        days: Size of the lookback window in days.
        repo: Optional ``owner/name`` repository used to scope the PR and
            issue searches; ``None`` means search across all repositories.

    Returns:
        Nested dict with ``window``, ``account``, ``activity``, ``stars``,
        ``top_repositories``, and ``organization_membership`` sections.

    Raises:
        RuntimeError: If the user does not exist or a GitHub request fails.
    """
    now = datetime.now(tz=UTC)
    start = (now - timedelta(days=days)).replace(microsecond=0)
    # GitHub's DateTime/search inputs use a trailing "Z", not "+00:00".
    from_iso = start.isoformat().replace("+00:00", "Z")
    from_date = start.date().isoformat()
    to_iso = now.replace(microsecond=0).isoformat().replace("+00:00", "Z")
    _user_activity_log(f"user enrichment {login}: fetching profile")
    profile, _errors = _post_graphql(
        PROFILE_QUERY, {"login": login, "from": from_iso, "to": to_iso}
    )
    viewer = profile["viewer"]
    user = profile["user"]
    if user is None:
        raise RuntimeError(f"unknown user {login!r}")
    contributions = user["contributionsCollection"]
    calendar = contributions["contributionCalendar"]
    # Search qualifiers: authored items created within the window,
    # optionally restricted to one repo, newest first.
    repo_term = f" repo:{repo}" if repo else ""
    pr_query = f"author:{login} is:pr created:>={from_date}{repo_term} sort:created-desc"
    open_pr_query = (
        f"author:{login} is:pr is:open created:>={from_date}{repo_term} sort:created-desc"
    )
    issue_query = f"author:{login} is:issue created:>={from_date}{repo_term} sort:created-desc"
    # Each call returns (search total, visible nodes, inaccessible-node count);
    # the total can exceed the visible rows when repos are private/blocked.
    pr_count, prs, inaccessible_pr_nodes = _search_all(pr_query, prs=True, label=f"{login} prs")
    open_pr_count, open_prs, inaccessible_open_pr_nodes = _search_all(
        open_pr_query, prs=True, label=f"{login} open-prs"
    )
    issue_count, issues, inaccessible_issue_nodes = _search_all(
        issue_query, prs=False, label=f"{login} issues"
    )
    # Partition visible PRs by outcome: merged / closed-without-merge / still open.
    merged_prs = [pr for pr in prs if pr.get("merged")]
    closed_unmerged_prs = [pr for pr in prs if pr.get("state") == "CLOSED" and not pr.get("merged")]
    still_open_prs = [pr for pr in prs if pr.get("state") == "OPEN"]
    open_pr_repos = sorted(
        {pr["repository"]["nameWithOwner"] for pr in open_prs if pr.get("repository")}
    )
    pr_repos = sorted({pr["repository"]["nameWithOwner"] for pr in prs if pr.get("repository")})
    issue_repos = sorted(
        {issue["repository"]["nameWithOwner"] for issue in issues if issue.get("repository")}
    )
    starred = user.get("starredRepositories") or {}
    starred_nodes = [row for row in starred.get("nodes") or [] if isinstance(row, dict)]
    # Stars on the user's own repos are excluded (case-insensitive owner match).
    non_self_starred = [
        row
        for row in starred_nodes
        if ((row.get("owner") or {}).get("login") or "").casefold() != login.casefold()
    ]
    # Star counts of the repos the user contributed to, per PR category.
    recent_pr_repo_stars = [
        int((pr.get("repository") or {}).get("stargazerCount") or 0) for pr in prs
    ]
    merged_pr_repo_stars = [
        int((pr.get("repository") or {}).get("stargazerCount") or 0) for pr in merged_prs
    ]
    closed_unmerged_pr_repo_stars = [
        int((pr.get("repository") or {}).get("stargazerCount") or 0) for pr in closed_unmerged_prs
    ]
    open_pr_repo_stars = [
        int((pr.get("repository") or {}).get("stargazerCount") or 0) for pr in open_prs
    ]
    # Distinct owners (other than the user) of repos they touched; these are
    # the candidate organizations for direct membership checks below.
    repo_owners = sorted(
        {
            repo_name.split("/", 1)[0]
            for repo_name in pr_repos + issue_repos + open_pr_repos
            if "/" in repo_name and repo_name.split("/", 1)[0].casefold() != login.casefold()
        }
    )
    public_org_rows = _public_orgs(login)
    target_listed_org_rows = [
        {"login": row["login"], "name": row.get("name")}
        for row in (user.get("organizations") or {}).get("nodes") or []
        if isinstance(row, dict) and row.get("login")
    ]
    viewer_listed_org_rows = [
        {"login": row["login"], "name": row.get("name")}
        for row in (viewer.get("organizations") or {}).get("nodes") or []
        if isinstance(row, dict) and row.get("login")
    ]
    # Private-membership endpoints only apply when the token belongs to the
    # target user, so these extra lookups are gated on that check.
    viewer_is_target = viewer.get("login", "").casefold() == login.casefold()
    auth_membership_rows = _authenticated_memberships() if viewer_is_target else []
    direct_membership_checks = _check_viewer_org_membership(repo_owners) if viewer_is_target else []
    public_org_logins = {row["login"] for row in public_org_rows}
    listed_org_logins = {row["login"] for row in target_listed_org_rows}
    auth_membership_logins = {row["login"] for row in auth_membership_rows}
    directly_confirmed_orgs = sorted(
        row["login"] for row in direct_membership_checks if row.get("viewer_is_a_member")
    )
    # True when a direct check confirmed a membership that none of the
    # listing endpoints reported -- i.e. the listings are incomplete.
    listing_endpoints_partial = any(
        org not in public_org_logins | listed_org_logins | auth_membership_logins
        for org in directly_confirmed_orgs
    )
    created_at = datetime.fromisoformat(user["createdAt"].replace("Z", "+00:00"))
    account_age_days = (now - created_at).days
    return {
        "login": user["login"],
        "name": user.get("name"),
        "repo_scope": repo,
        "window": {"days": days, "from": from_iso, "to": to_iso},
        "account": {"created_at": user["createdAt"], "age_days": account_age_days},
        "activity": {
            "visible_contributions_total": calendar["totalContributions"],
            **_contribution_calendar_summary(calendar["weeks"]),
            "authored_issues": contributions["totalIssueContributions"],
            "authored_pull_requests": contributions["totalPullRequestContributions"],
            # "visible" = rows actually returned; "search" = GitHub's total count.
            "visible_authored_issue_count": len(issues),
            "visible_authored_pr_count": len(prs),
            "visible_open_pr_count": len(open_prs),
            "search_authored_issue_count": issue_count,
            "search_authored_pr_count": pr_count,
            "search_open_pr_count": open_pr_count,
            "inaccessible_issue_nodes": inaccessible_issue_nodes,
            "inaccessible_pr_nodes": inaccessible_pr_nodes,
            "inaccessible_open_pr_nodes": inaccessible_open_pr_nodes,
            "distinct_repos_with_authored_prs": len(pr_repos),
            "distinct_repos_with_authored_issues": len(issue_repos),
            "distinct_repos_with_open_prs": len(open_pr_repos),
            "open_pr_count": len(open_prs),
            "merged_pr_count": len(merged_prs),
            "closed_unmerged_pr_count": len(closed_unmerged_prs),
            "still_open_pr_count": len(still_open_prs),
            # Rates are over the visible PR set and None when no PRs are visible.
            "merged_pr_rate": _rate(len(merged_prs), len(prs)),
            "closed_unmerged_pr_rate": _rate(len(closed_unmerged_prs), len(prs)),
            "still_open_pr_rate": _rate(len(still_open_prs), len(prs)),
        },
        "stars": {
            "starred_repositories_total": starred.get("totalCount"),
            "visible_starred_repositories_returned": len(starred_nodes),
            "visible_non_self_starred_repositories": len(non_self_starred),
            "visible_non_self_starred_examples": [
                row["nameWithOwner"] for row in non_self_starred[:10]
            ],
            "recent_pr_repo_star_distribution": _star_distribution(recent_pr_repo_stars),
            "recent_merged_pr_repo_star_distribution": _star_distribution(merged_pr_repo_stars),
            "recent_closed_unmerged_pr_repo_star_distribution": _star_distribution(
                closed_unmerged_pr_repo_stars
            ),
            "recent_open_pr_repo_star_distribution": _star_distribution(open_pr_repo_stars),
        },
        "top_repositories": {
            "pull_requests": [
                {
                    "repo": row["repository"]["nameWithOwner"],
                    "count": row["contributions"]["totalCount"],
                }
                for row in contributions["pullRequestContributionsByRepository"]
            ],
            "issues": [
                {
                    "repo": row["repository"]["nameWithOwner"],
                    "count": row["contributions"]["totalCount"],
                }
                for row in contributions["issueContributionsByRepository"]
            ],
            "open_pr_repositories": open_pr_repos,
        },
        "organization_membership": {
            "target_is_viewer": viewer_is_target,
            "public_orgs": public_org_rows,
            "graphql_target_listed_orgs": target_listed_org_rows,
            "graphql_viewer_listed_orgs": viewer_listed_org_rows,
            "authenticated_memberships": auth_membership_rows,
            "checked_recent_repo_owner_orgs": direct_membership_checks,
            "visibility": {
                "listing_endpoints_partial": listing_endpoints_partial,
                "public_org_count": len(public_org_rows),
                "graphql_target_listed_org_count": len(target_listed_org_rows),
                "authenticated_membership_count": len(auth_membership_rows),
                "directly_confirmed_membership_count": len(directly_confirmed_orgs),
                "directly_confirmed_memberships": directly_confirmed_orgs,
            },
        },
    }
def _post_graphql(
    query: str,
    variables: dict[str, Any],
    *,
    allow_partial: bool = False,
) -> tuple[dict[str, Any], list[dict[str, Any]]]:
    """Execute a GraphQL query against the GitHub API.

    Args:
        query: GraphQL document to send.
        variables: Variable bindings for the query.
        allow_partial: When True, GraphQL-level errors are returned to the
            caller alongside the (partial) data instead of raising.

    Returns:
        A ``(data, errors)`` tuple; ``errors`` is the response's error list
        (empty on full success).

    Raises:
        RuntimeError: If no token is configured, the HTTP request fails, or
            the response carries errors while ``allow_partial`` is False.
    """
    token = resolve_github_token()
    if not token:
        raise RuntimeError("missing GITHUB_TOKEN/GRAPHQL_TOKEN/GH_TOKEN")
    request_headers = {
        "Authorization": f"bearer {token}",
        "User-Agent": "slop-farmer",
        "Content-Type": "application/json",
        "Accept": "application/json",
    }
    request = urllib.request.Request(
        GRAPHQL_URL,
        data=json.dumps({"query": query, "variables": variables}).encode(),
        headers=request_headers,
        method="POST",
    )
    try:
        with urlopen_with_retry(
            request,
            timeout=120,
            log=_user_activity_log,
            label="GitHub GraphQL user activity",
        ) as response:
            payload = json.load(response)
    except urllib.error.HTTPError as exc:  # pragma: no cover - live network only
        detail = exc.read().decode("utf-8", "replace")
        raise RuntimeError(f"graphql request failed: {exc.code} {detail}") from exc
    error_rows = payload.get("errors") or []
    if error_rows and not allow_partial:
        raise RuntimeError(json.dumps(error_rows))
    return payload["data"], error_rows
def _search_all(
    query: str, *, prs: bool, label: str | None = None
) -> tuple[int, list[dict[str, Any]], int]:
    """Page through a GitHub search, collecting the visible result nodes.

    Args:
        query: GitHub search query string.
        prs: True to use the pull-request search document, False for issues.
        label: Optional tag for progress logging; no logging when None.

    Returns:
        ``(search_count, visible_nodes, inaccessible_node_count)`` where
        ``inaccessible_node_count`` tallies GraphQL errors for nodes the
        token cannot see. Pagination stops after the last page or once
        1000 visible rows have been loaded.
    """
    document = SEARCH_PRS_QUERY if prs else SEARCH_ISSUES_QUERY
    collected: list[dict[str, Any]] = []
    after: str | None = None
    total = 0
    hidden = 0
    page_number = 0
    while True:
        page_number += 1
        # allow_partial: inaccessible nodes surface as errors + null entries.
        data, errors = _post_graphql(
            document, {"query": query, "cursor": after}, allow_partial=True
        )
        search = data["search"]
        total = search["issueCount"]
        hidden += len(errors)
        collected.extend(row for row in search["nodes"] if isinstance(row, dict))
        has_next = search["pageInfo"]["hasNextPage"]
        capped = len(collected) >= 1000
        # Log the first page, every fifth page, and the final page.
        if label and (page_number == 1 or page_number % 5 == 0 or not has_next or capped):
            _user_activity_log(
                f"user enrichment {label}: page {page_number}, loaded {len(collected)} visible rows, "
                f"search_count={total}, inaccessible={hidden}"
            )
        if not has_next or capped:
            break
        after = search["pageInfo"]["endCursor"]
    return total, collected, hidden
def _public_orgs(login: str) -> list[dict[str, Any]]:
    """Return the publicly listed organizations for *login* via the REST API.

    Yields one ``{"login", "name"}`` row per org; an unexpected (non-list)
    response produces an empty list.
    """
    response = _get_json(f"https://api.github.com/users/{login}/orgs")
    if not isinstance(response, list):
        return []
    rows: list[dict[str, Any]] = []
    for entry in response:
        if isinstance(entry, dict) and entry.get("login"):
            rows.append({"login": entry.get("login"), "name": entry.get("name")})
    return rows
def _authenticated_memberships() -> list[dict[str, Any]]:
    """List org memberships visible to the authenticated user (REST).

    Includes membership ``state`` and ``role``; an unexpected (non-list)
    response produces an empty list.
    """
    response = _get_json("https://api.github.com/user/memberships/orgs")
    if not isinstance(response, list):
        return []
    memberships: list[dict[str, Any]] = []
    for entry in response:
        if not isinstance(entry, dict):
            continue
        organization = entry.get("organization") or {}
        # Skip malformed rows that carry no organization login.
        if not organization.get("login"):
            continue
        memberships.append(
            {
                "login": organization.get("login"),
                "name": organization.get("name"),
                "state": entry.get("state"),
                "role": entry.get("role"),
            }
        )
    return memberships
def _check_viewer_org_membership(logins: list[str]) -> list[dict[str, Any]]:
if not logins:
return []
aliases = {f"org{i}": login for i, login in enumerate(logins)}
fields = "\n".join(
f'{alias}: organization(login: "{login}") {{ login name viewerIsAMember viewerCanAdminister }}'
for alias, login in aliases.items()
)
query = f"query ViewerOrgChecks {{ rateLimit {{ cost remaining resetAt }}\n{fields}\n}}"
data, _errors = _post_graphql(query, {})
rows = []
for alias in aliases:
org = data.get(alias)
if isinstance(org, dict) and org.get("login"):
rows.append(
{
"login": org["login"],
"name": org.get("name"),
"viewer_is_a_member": bool(org.get("viewerIsAMember")),
"viewer_can_administer": bool(org.get("viewerCanAdminister")),
}
)
return rows
def _get_json(url: str) -> Any:
    """GET *url* from the GitHub REST API and decode the JSON body.

    Raises:
        RuntimeError: If no token is configured or the request fails.
    """
    token = resolve_github_token()
    if not token:
        raise RuntimeError("missing GITHUB_TOKEN/GRAPHQL_TOKEN/GH_TOKEN")
    request_headers = {
        "Authorization": f"bearer {token}",
        "User-Agent": "slop-farmer",
        "Accept": "application/vnd.github+json",
        "X-GitHub-Api-Version": "2022-11-28",
    }
    request = urllib.request.Request(url, headers=request_headers)
    try:
        with urlopen_with_retry(
            request, timeout=120, log=_user_activity_log, label=url
        ) as response:
            return json.load(response)
    except urllib.error.HTTPError as exc:  # pragma: no cover - live network only
        detail = exc.read().decode("utf-8", "replace")
        raise RuntimeError(f"rest request failed: {exc.code} {url} {detail}") from exc
def _contribution_calendar_summary(weeks: list[dict[str, Any]]) -> dict[str, Any]:
daily = [day for week in weeks for day in week["contributionDays"]]
active_days = sum(1 for day in daily if day["contributionCount"] > 0)
weekly = [sum(day["contributionCount"] for day in week["contributionDays"]) for week in weeks]
return {"active_days": active_days, "weekly_totals": weekly}
def _star_distribution(values: list[int]) -> dict[str, Any]:
if not values:
return {
"count": 0,
"min": None,
"median": None,
"max": None,
"buckets": {"lt_100": 0, "100_to_999": 0, "1k_to_9k": 0, "10k_plus": 0},
}
ordered = sorted(values)
mid = len(ordered) // 2
median = ordered[mid] if len(ordered) % 2 else round((ordered[mid - 1] + ordered[mid]) / 2, 1)
return {
"count": len(values),
"min": min(values),
"median": median,
"max": max(values),
"buckets": {
"lt_100": sum(1 for value in values if value < 100),
"100_to_999": sum(1 for value in values if 100 <= value < 1000),
"1k_to_9k": sum(1 for value in values if 1000 <= value < 10000),
"10k_plus": sum(1 for value in values if value >= 10000),
},
}
def _rate(numerator: int, denominator: int) -> float | None:
if denominator == 0:
return None
return round(numerator / denominator, 4)
def _user_activity_log(message: str) -> None:
stamp = datetime.now(tz=UTC).strftime("%H:%M:%SZ")
print(f"[{stamp}] {message}", flush=True)