Spaces:
Running
Running
| """ | |
| usage_logging.py | |
| ---------------- | |
| Purpose: | |
| This module implements privacy-preserving telemetry for the | |
| AI Recruiting Agent Hugging Face Space. | |
| Its sole purpose is to measure anonymous usage and adoption | |
| metrics in order to: | |
| - Understand how the tool is being used | |
| - Improve reliability and performance | |
- Gauge a sense of real-world adoption
| - Support research and evaluation of responsible AI practices | |
| Privacy Principles: | |
| This module is explicitly designed to minimize data collection | |
| and avoid storing any personally identifiable information (PII). | |
| It DOES NOT collect or store: | |
| - Raw IP addresses | |
| - User names or Hugging Face account IDs | |
| - Resume contents or job descriptions | |
| - Emails, phone numbers, or file names | |
| - Full user-agent strings or device fingerprints | |
| - Any demographic attributes about users | |
| It ONLY records: | |
| - Approximate country and city (derived from IP, not stored) | |
| - UTC timestamp of the event | |
| - Space URL | |
| - High-level event type (e.g., "app_open") | |
| - Non-identifying, aggregate metadata (e.g., counts, booleans, latencies) | |
| All usage logs are: | |
| - Anonymized | |
| - Append-only | |
| - Persisted in a public Hugging Face Dataset repository (https://huggingface.co/datasets/19arjun89/ai_recruiting_agent_usage) | |
| - Versioned via immutable commit history for auditability | |
| Ethical Safeguards: | |
| - Logging failures never break application functionality | |
| - No raw identifiers are persisted at any time | |
| - All telemetry is optional and best-effort | |
| - The system is intended for transparency and improvement, | |
| not for surveillance or profiling | |
| Transparency: | |
| A public-facing usage reporting Space will be provided to allow | |
| independent verification of aggregate adoption metrics. | |
| Author: | |
| Arjun Singh | |
| Last Updated: | |
| 2026-01-27 | |
| """ | |
# Standard library
import ipaddress
import json
import os
import time
import uuid
from datetime import datetime, timezone
from io import BytesIO

# Third-party
import gradio as gr
import pycountry
import requests
from huggingface_hub import HfApi, hf_hub_download, list_repo_files
# Public URL of the Space whose usage is being measured.
SPACE_URL = "https://huggingface.co/spaces/19arjun89/AI_Recruiting_Agent"
# Public dataset repo that receives the anonymized telemetry.
USAGE_DATASET_REPO = "19arjun89/ai_recruiting_agent_usage"
# Directory (inside the dataset repo) holding one immutable JSON file per event.
USAGE_EVENTS_DIR = "usage/events"
# Older single-file log. NOTE(review): not referenced in this module — presumably
# kept for consumers of the dataset; confirm before removing.
LEGACY_JSONL_PATH = "usage/visits_legacy.jsonl"
# JSONL rollup rebuilt from the per-event files.
ROLLUP_PATH = "usage/visits.jsonl"
def _hf_api():
    """Return an authenticated HfApi client, or None when HF_TOKEN is unset."""
    token = os.environ.get("HF_TOKEN")
    return HfApi(token=token) if token else None
def _is_public_ip(ip: str) -> bool:
    """Return True iff *ip* parses as a globally routable (public) address."""
    try:
        addr = ipaddress.ip_address(ip)
    except Exception:
        # Unparseable input (hostname, garbage, None) is never "public".
        return False
    non_public = any(
        (
            addr.is_private,
            addr.is_loopback,
            addr.is_reserved,
            addr.is_multicast,
            addr.is_link_local,
        )
    )
    return not non_public
def _get_client_ip(request: gr.Request) -> str:
    """Best-effort public client IP for *request*; "" when none can be found.

    The IP is used only transiently for geo lookup and is never persisted.
    """
    if not request:
        return ""
    # Prefer the X-Forwarded-For chain: take the first public hop in it.
    forwarded = request.headers.get("x-forwarded-for")
    if forwarded:
        for hop in forwarded.split(","):
            candidate = hop.strip()
            if _is_public_ip(candidate):
                return candidate
    # Fall back to the direct peer address, if it is public.
    client = request.client
    if client:
        return client.host if _is_public_ip(client.host) else ""
    return ""
def _country_lookup(ip: str) -> tuple[str, str]:
    """Resolve *ip* to an ``(alpha-2 code, country name)`` pair via ipinfo.io.

    Returns ``("", "")`` when IPINFO_TOKEN is unset, the request fails, or the
    response carries no usable country. The raw IP is sent to ipinfo.io for
    the lookup but is never stored by this module.
    """
    token = os.environ.get("IPINFO_TOKEN")
    if not token:
        return ("", "")
    try:
        # Pass the token through `params` so it is URL-encoded by requests and
        # is not spliced into a hand-built URL string (which tends to end up
        # verbatim in logs and tracebacks).
        r = requests.get(
            f"https://ipinfo.io/{ip}/json",
            params={"token": token},
            timeout=4,
        )
        if r.status_code != 200:
            return ("", "")
        data = r.json()
        # Plan differences observed in ipinfo responses:
        #   some plans: country="US"
        #   some plans: country_code="US" and country="United States"
        cc = (data.get("country_code") or data.get("country") or "").strip().upper()
        name = (data.get("country") or "").strip()
        # "country" held a bare 2-letter code: expand it to a full name.
        if len(name) == 2 and name.upper() == cc:
            name = _expand_country_code(cc)
        # Name missing but a code exists: expand the code.
        if not name and cc:
            name = _expand_country_code(cc)
        return (cc, name)
    except Exception:
        # Telemetry is best-effort: any network/parse error degrades to unknown.
        return ("", "")
def append_visit_to_dataset(
    country: str,
    city: str,
    session_id: str = "",
    event_type: str = "session_start",
    country_source: str = "unknown",
    country_code: str = "",
    **extra_fields,
):
    """Upload one anonymized telemetry event to the public usage dataset.

    Each event becomes its own immutable JSON file under USAGE_EVENTS_DIR so
    concurrent writers never collide. Failures are printed and swallowed:
    telemetry must never break the application.

    Args:
        country: Human-readable country name ("Unknown" when blank).
        city: Approximate city (may be empty; no precise location is stored).
        session_id: Opaque per-session identifier (not tied to any user).
        event_type: High-level event label, e.g. "session_start".
        country_source: How the country was determined (header / ipinfo / ...).
        country_code: ISO-3166 alpha-2 code; normalized to upper case.
        **extra_fields: Extra non-identifying metadata; None values are
            dropped so no JSON nulls are written.
    """
    api = _hf_api()
    if not api:
        # No HF_TOKEN configured: logging is optional, silently do nothing.
        return
    # datetime.utcnow() is deprecated since Python 3.12. Take an aware UTC
    # time and strip tzinfo so the serialized "...Z" format stays identical.
    # A single `now` is used for both the payload and the file name so they
    # always agree.
    now = datetime.now(timezone.utc).replace(tzinfo=None)
    event = {
        "ts_utc": now.isoformat() + "Z",
        "space_url": SPACE_URL,
        "session_id": session_id,
        "event": event_type,
        "country": country or "Unknown",
        "country_code": (country_code or "").strip().upper(),
        "country_source": country_source or "unknown",
        "city": city or "",
    }
    if extra_fields:
        # Prevent JSON nulls
        event.update({k: v for k, v in extra_fields.items() if v is not None})
    # Unique file path per event (prevents collisions)
    ts = now.strftime("%Y%m%dT%H%M%S%f")
    uid = uuid.uuid4().hex[:8]
    path_in_repo = f"{USAGE_EVENTS_DIR}/{ts}_{uid}.json"
    try:
        api.upload_file(
            repo_id=USAGE_DATASET_REPO,
            repo_type="dataset",
            path_in_repo=path_in_repo,
            path_or_fileobj=BytesIO(json.dumps(event).encode("utf-8")),
            commit_message=f"log {event_type}",
        )
    except Exception as e:
        # Best-effort: a failed upload must never raise into the app.
        print("telemetry upload failed:", repr(e))
def record_visit(request: gr.Request | None, session_id: str = "", event_type: str = "session_start"):
    """Log one anonymized visit event, resolving the country as best we can.

    Resolution order: CDN/proxy geo headers, then an ipinfo.io lookup on the
    client IP, then "Unknown". The request object itself is never persisted.
    """

    def _log(country: str, source: str, code: str) -> None:
        # All branches below differ only in these three fields.
        append_visit_to_dataset(
            country=country,
            city="",
            session_id=session_id,
            event_type=event_type,
            country_source=source,
            country_code=code,
        )

    # No request context (e.g. a synthetic click event): log a placeholder.
    if request is None:
        _log("NA", "click_event", "")
        return

    # 1) Geo hint injected by a CDN/proxy header.
    hint = _country_from_headers(request)
    if _is_valid_country_code(hint):
        _log(_expand_country_code(hint), "header", hint.strip().upper())
        return

    # 2) IP-based lookup (IP is used transiently, never stored).
    ip = _get_client_ip(request)
    if ip:
        cc, name = _country_lookup(ip)
        if _is_valid_country_code(cc):
            _log(name or _expand_country_code(cc), "ipinfo", cc)
        else:
            _log("Unknown", "ipinfo_unknown", "")
        return

    # 3) Nothing usable.
    _log("Unknown", "none", "")
def _country_from_headers(request: gr.Request) -> str:
    """Return an upper-cased country hint from common CDN/proxy geo headers.

    Checks cf-ipcountry, x-country, then x-geo-country; "" when none is set.
    """
    if not request:
        return ""
    headers = request.headers
    for header_name in ("cf-ipcountry", "x-country", "x-geo-country"):
        value = headers.get(header_name)
        if value:
            # First non-empty header wins, mirroring an `or` chain.
            return value.strip().upper()
    return ""
def _is_valid_country_code(code: str) -> bool:
    """Return True iff *code* looks like a usable ISO-3166 alpha-2 code."""
    if not code:
        return False
    normalized = code.strip().upper()
    # Placeholder values some CDNs / proxies emit instead of a real code.
    placeholders = {"XX", "ZZ", "UNKNOWN", "NA", "N/A", "NONE", "-"}
    if normalized in placeholders:
        return False
    # A real alpha-2 code is exactly two letters.
    return len(normalized) == 2 and normalized.isalpha()
def _expand_country_code(code: str) -> str:
    """Expand an ISO-3166 alpha-2 code to its country name, or "Unknown"."""
    # Guard: only two-character inputs can be alpha-2 codes.
    if not code or len(code) != 2:
        return "Unknown"
    try:
        match = pycountry.countries.get(alpha_2=code.upper())
        return match.name if match else "Unknown"
    except Exception:
        # Lookup failure degrades to "Unknown" — never raises into telemetry.
        return "Unknown"
def _is_meaningful_country(val: str) -> bool:
    """Return True iff *val* names an actual country rather than a placeholder."""
    normalized = (val or "").strip().lower()
    placeholders = {"unknown", "na", "n/a", "none", "null", "undefined"}
    return bool(normalized) and normalized not in placeholders
def rebuild_visits_rollup_from_event_files() -> str:
    """
    Rebuilds usage/visits.jsonl from immutable per-event JSON files in usage/events/.
    ALSO writes an enriched rollup usage/visits_enriched.jsonl where:
      - legacy rows (no session_id) keep their original country as final_country
      - new click rows (with session_id) get final_country from the session's
        session_start row when available

    Returns:
        A human-readable status string (success summary or error description);
        this function reports errors as strings rather than raising.
    """
    api = _hf_api()
    if not api:
        return "HF_TOKEN not available. Rollup requires write access."
    ENRICHED_ROLLUP_PATH = "usage/visits_enriched.jsonl"
    # 1) List every file currently in the dataset repo.
    try:
        files = list_repo_files(repo_id=USAGE_DATASET_REPO, repo_type="dataset")
    except Exception as e:
        return f"Could not list repo files: {repr(e)}"
    event_files = [
        f for f in files
        if f.startswith(f"{USAGE_EVENTS_DIR}/") and f.endswith(".json")
    ]
    if not event_files:
        return f"No event files found under {USAGE_EVENTS_DIR}/"
    events = []
    bad = 0  # number of files that failed to download or parse
    # 2) Download & parse each event. Unparseable files are counted and
    #    skipped rather than aborting the whole rebuild (best-effort).
    for path in event_files:
        try:
            local_path = hf_hub_download(
                repo_id=USAGE_DATASET_REPO,
                repo_type="dataset",
                filename=path,
            )
            with open(local_path, "r", encoding="utf-8") as f:
                events.append(json.load(f))
        except Exception:
            bad += 1
    if not events:
        return f"Found {len(event_files)} event files, but 0 were parseable (bad={bad})."
    # 3) Sort by ts_utc. Lexicographic order works because ts_utc is a
    #    fixed-format ISO-8601 UTC string; missing timestamps sort first.
    events.sort(key=lambda e: (e.get("ts_utc") or ""))
    # 4) Build session → geo map from session_start events (new schema).
    #    First session_start per session wins; later duplicates are ignored.
    session_geo = {}
    for e in events:
        if e.get("event") == "session_start":
            sid = (e.get("session_id") or "").strip()
            if not sid:
                continue  # legacy rows won't have session_id; ignore for mapping
            if sid not in session_geo:
                session_geo[sid] = {
                    "country_session": e.get("country") or "Unknown",
                    "country_code_session": (e.get("country_code") or "").strip().upper(),
                    "country_source_session": e.get("country_source") or "unknown",
                }
    # 5) Write RAW JSONL (events exactly as recorded, one JSON object per line)
    buf_raw = BytesIO()
    for evt in events:
        buf_raw.write((json.dumps(evt, ensure_ascii=False) + "\n").encode("utf-8"))
    buf_raw.seek(0)
    # 6) Write ENRICHED JSONL
    # Rules:
    #   - Legacy rows (no session_id): final_country = evt.country (already correct)
    #   - New rows with session_id:
    #       - if evt.country is Unknown/blank, use country from the session's
    #         session_start row
    #       - keep original evt.country as well (don't overwrite)
    buf_enriched = BytesIO()
    for evt in events:
        enriched = dict(evt)
        sid = (evt.get("session_id") or "").strip()
        geo = session_geo.get(sid, {}) if sid else {}
        # Keep a copy of session geo (useful for debugging/audit)
        if geo:
            enriched.update(geo)
        evt_country = (evt.get("country") or "").strip()
        evt_cc = (evt.get("country_code") or "").strip().upper()
        # Determine final_country / final_country_code
        if sid:
            if _is_meaningful_country(evt_country):
                # The row already carries a real country: trust it.
                enriched["final_country"] = evt_country
                enriched["final_country_code"] = evt_cc
                enriched["final_country_source"] = evt.get("country_source") or "unknown"
            else:
                # Fall back to the geo captured at session start.
                enriched["final_country"] = geo.get("country_session", "Unknown")
                enriched["final_country_code"] = geo.get("country_code_session", "")
                enriched["final_country_source"] = geo.get("country_source_session", "unknown")
        else:
            # Legacy row: preserve the original country fields
            enriched["final_country"] = evt_country or "Unknown"
            enriched["final_country_code"] = evt_cc
            enriched["final_country_source"] = evt.get("country_source") or "unknown"
        buf_enriched.write((json.dumps(enriched, ensure_ascii=False) + "\n").encode("utf-8"))
    buf_enriched.seek(0)
    # 7) Upload both rollups (each upload is one commit in the dataset repo).
    try:
        api.upload_file(
            repo_id=USAGE_DATASET_REPO,
            repo_type="dataset",
            path_in_repo=ROLLUP_PATH,  # the existing usage/visits.jsonl
            path_or_fileobj=buf_raw,
            commit_message=f"rebuild {ROLLUP_PATH} from {USAGE_EVENTS_DIR}",
        )
        api.upload_file(
            repo_id=USAGE_DATASET_REPO,
            repo_type="dataset",
            path_in_repo=ENRICHED_ROLLUP_PATH,
            path_or_fileobj=buf_enriched,
            commit_message=f"rebuild {ENRICHED_ROLLUP_PATH} from {USAGE_EVENTS_DIR}",
        )
    except Exception as e:
        return f"Rollup upload failed: {repr(e)}"
    return (
        f"Rollups rebuilt:\n"
        f"- RAW: {ROLLUP_PATH} rows={len(events)} (bad_files={bad})\n"
        f"- ENRICHED: {ENRICHED_ROLLUP_PATH} rows={len(events)} (bad_files={bad})"
    )