"""
Dataset Quality Monitor - tracks dataset quality metrics across versions,
compares quality between iterations, and flags regressions.
"""

from __future__ import annotations

import json
import os
import sys
import threading
from datetime import datetime, timezone
from pathlib import Path
from typing import Any, Dict, List, Optional

# Directory that holds this module, and the directory one level above it.
_THIS_DIR = Path(__file__).resolve().parent
_PROJECT_ROOT = _THIS_DIR.parent

# Put the parent directory at the front of sys.path exactly once.
# NOTE(review): presumably so sibling packages under the project root can be
# imported when this file is run directly -- confirm against callers.
if not any(entry == str(_PROJECT_ROOT) for entry in sys.path):
    sys.path.insert(0, str(_PROJECT_ROOT))


# Default location of the JSON quality log: directly under the project root.
_DEFAULT_QUALITY_FILE = _PROJECT_ROOT.joinpath("dataset_quality_log.json")


class DatasetQualityMonitor:
    """Monitor dataset quality metrics across versions.

    Records are kept as a JSON list of dicts in ``quality_file``; every call
    to :meth:`record_quality` appends one entry. Reads and the
    read-modify-write cycle are serialized by a per-instance lock.
    """

    # Per-metric regression thresholds.
    #   * Negative values: largest tolerated *relative* drop, as a fraction of
    #     the old value (-0.10 == "more than a 10% drop is a regression").
    #   * "duplicate_rate": largest tolerated *absolute* increase of the rate.
    REGRESSION_THRESHOLDS = {
        "total_examples": -0.10,
        "avg_response_length": -0.15,
        "duplicate_rate": 0.05,
        "topic_diversity": -0.10,
    }

    # Metrics whose threshold is an absolute increase rather than a relative
    # drop; detection and severity both use the raw delta for these.
    _ABSOLUTE_INCREASE_METRICS = frozenset({"duplicate_rate"})

    # Metrics reported by compare_versions(), in output order.
    _COMPARE_KEYS = (
        "total_examples", "valid_examples", "validity_rate",
        "avg_response_length", "duplicate_rate", "near_duplicate_rate",
        "topic_diversity", "topic_concentration", "too_short", "too_long",
    )

    def __init__(self, quality_file: Optional[str] = None):
        """Open (creating if needed) the quality log.

        Args:
            quality_file: Path of the JSON log. Falls back to the module-level
                default location when omitted or empty.
        """
        self.quality_file = Path(quality_file) if quality_file else _DEFAULT_QUALITY_FILE
        self._lock = threading.Lock()
        self._ensure_file()

    # ------------------------------------------------------------------
    # Storage helpers
    # ------------------------------------------------------------------

    def _ensure_file(self) -> None:
        """Create the log file (and its parent directories) holding ``[]``."""
        if not self.quality_file.exists():
            os.makedirs(self.quality_file.parent, exist_ok=True)
            with open(self.quality_file, "w", encoding="utf-8") as f:
                json.dump([], f)

    def _read_all(self) -> List[Dict[str, Any]]:
        """Load every record from disk.

        Returns an empty list when the file is missing, is not valid JSON, or
        does not contain a JSON list.
        """
        try:
            with open(self.quality_file, "r", encoding="utf-8") as f:
                data = json.load(f)
        except (FileNotFoundError, json.JSONDecodeError):
            # A log removed after construction is treated like an empty one.
            return []
        return data if isinstance(data, list) else []

    def _write_all(self, entries: List[Dict[str, Any]]) -> None:
        """Replace the log contents with ``entries``.

        The data is serialized to a sibling temp file first and then swapped
        in with ``os.replace``. A failure while serializing therefore leaves
        the existing log untouched instead of truncated -- which matters
        because an unparsable log is read back as "no records".
        """
        os.makedirs(self.quality_file.parent, exist_ok=True)
        # pid + thread id keep concurrent writers from sharing one temp file.
        tmp_path = self.quality_file.with_name(
            f"{self.quality_file.name}.{os.getpid()}.{threading.get_ident()}.tmp"
        )
        try:
            with open(tmp_path, "w", encoding="utf-8") as f:
                json.dump(entries, f, indent=2, default=str)
            os.replace(tmp_path, self.quality_file)
        finally:
            # Only still present when the dump or the replace failed.
            if tmp_path.exists():
                tmp_path.unlink()

    @staticmethod
    def _timestamp_key(entry: Dict[str, Any]) -> str:
        """Sort key for a record: its timestamp as a string ('' if absent)."""
        return str(entry.get("timestamp") or "")

    @staticmethod
    def _numeric(entry: Dict[str, Any], key: str) -> Any:
        """Return ``entry[key]`` if it is a number, otherwise 0."""
        value = entry.get(key, 0)
        return value if isinstance(value, (int, float)) else 0

    # ------------------------------------------------------------------
    # Recording
    # ------------------------------------------------------------------

    def record_quality(
        self,
        dataset_version: str,
        total_examples: int,
        valid_examples: int,
        avg_response_length: float,
        duplicate_rate: float,
        near_duplicate_rate: float,
        topic_diversity: float,
        topic_concentration: float,
        min_length: int = 0,
        max_length: int = 0,
        too_short: int = 0,
        too_long: int = 0,
        extra: Optional[Dict[str, Any]] = None,
    ) -> Dict[str, Any]:
        """Record quality metrics for a dataset version.

        Derived fields ``invalid_examples`` and ``validity_rate`` are computed
        here; rates are rounded to 4 decimals and the average length to 1.
        ``extra`` is stored under the ``"extra"`` key only when non-empty.

        Returns the recorded entry.
        """
        entry: Dict[str, Any] = {
            # Fixed-width UTC timestamp (microseconds always present) so that
            # plain string comparison orders records chronologically.
            "timestamp": datetime.now(timezone.utc).strftime("%Y-%m-%dT%H:%M:%S.%fZ"),
            "dataset_version": dataset_version,
            "total_examples": total_examples,
            "valid_examples": valid_examples,
            "invalid_examples": total_examples - valid_examples,
            # max(..., 1) avoids dividing by zero for an empty dataset.
            "validity_rate": round(valid_examples / max(total_examples, 1), 4),
            "avg_response_length": round(avg_response_length, 1),
            "duplicate_rate": round(duplicate_rate, 4),
            "near_duplicate_rate": round(near_duplicate_rate, 4),
            "topic_diversity": round(topic_diversity, 4),
            "topic_concentration": round(topic_concentration, 4),
            "min_length": min_length,
            "max_length": max_length,
            "too_short": too_short,
            "too_long": too_long,
        }
        if extra:
            entry["extra"] = extra

        # Hold the lock across the whole read-modify-write cycle.
        with self._lock:
            entries = self._read_all()
            entries.append(entry)
            self._write_all(entries)

        return entry

    def record_from_validation_report(
        self,
        dataset_version: str,
        report: Dict[str, Any],
    ) -> Dict[str, Any]:
        """Record quality from a DatasetValidator report dict.

        Reads ``total_lines``, ``valid``, ``exact_duplicates``,
        ``near_duplicates``, ``unique_topics``, ``topic_concentration``,
        ``too_short``, ``too_long`` and ``response_length_stats``
        (``mean``/``min``/``max``). Missing or ``None`` values count as 0.

        Returns the recorded entry.
        """
        # "or" rather than a .get() default: a key present with value None
        # must also fall back to the neutral value.
        ls = report.get("response_length_stats") or {}
        total = report.get("total_lines") or 0
        valid = report.get("valid") or 0
        exact_dup = report.get("exact_duplicates") or 0
        near_dup = report.get("near_duplicates") or 0
        denom = max(total, 1)  # guard against an empty report

        return self.record_quality(
            dataset_version=dataset_version,
            total_examples=total,
            valid_examples=valid,
            avg_response_length=ls.get("mean") or 0,
            duplicate_rate=exact_dup / denom,
            near_duplicate_rate=near_dup / denom,
            topic_diversity=(report.get("unique_topics") or 0) / denom,
            topic_concentration=report.get("topic_concentration") or 0,
            min_length=ls.get("min") or 0,
            max_length=ls.get("max") or 0,
            too_short=report.get("too_short") or 0,
            too_long=report.get("too_long") or 0,
        )

    # ------------------------------------------------------------------
    # Queries
    # ------------------------------------------------------------------

    def get_all(self) -> List[Dict[str, Any]]:
        """Get all quality records."""
        with self._lock:
            return self._read_all()

    def get_by_version(self, version: str) -> Optional[Dict[str, Any]]:
        """Get the latest quality record for a specific version.

        Returns ``None`` when the version has no records. If several records
        share the newest timestamp, the most recently appended one wins.
        """
        matches = [e for e in self.get_all() if e.get("dataset_version") == version]
        if not matches:
            return None
        # max() keeps the first maximal item; scanning in reverse makes that
        # the last-appended record on a timestamp tie.
        return max(reversed(matches), key=self._timestamp_key)

    def get_latest(self) -> Optional[Dict[str, Any]]:
        """Get the most recent quality record, or ``None`` if there is none."""
        entries = self.get_all()
        if not entries:
            return None
        return max(reversed(entries), key=self._timestamp_key)

    def get_versions(self) -> List[str]:
        """Get all unique dataset versions, in chronological order.

        A version's position is decided by its earliest record.
        """
        seen = set()
        versions = []
        for e in sorted(self.get_all(), key=self._timestamp_key):
            v = e.get("dataset_version", "unknown")
            if v not in seen:
                seen.add(v)
                versions.append(v)
        return versions

    # ------------------------------------------------------------------
    # Comparison
    # ------------------------------------------------------------------

    def compare_versions(
        self,
        version_a: str,
        version_b: str,
    ) -> Dict[str, Any]:
        """Compare quality metrics between two dataset versions.

        Returns a dict with ``metrics_a``, ``metrics_b``, ``delta`` (b - a)
        and ``percent_change`` (relative to a, in percent; 0.0 when a's value
        is 0). If either version has no record, returns a dict containing an
        ``"error"`` message instead.
        """
        a = self.get_by_version(version_a)
        b = self.get_by_version(version_b)

        if not a or not b:
            return {
                "error": f"Missing version data: "
                         f"{'version_a' if not a else 'version_b'} not found",
                "version_a": version_a,
                "version_b": version_b,
            }

        delta = {}
        pct_change = {}
        for k in self._COMPARE_KEYS:
            va = a.get(k, 0)
            vb = b.get(k, 0)
            # Non-numeric values (e.g. null in the log) are left out of the
            # delta / percent tables.
            if isinstance(va, (int, float)) and isinstance(vb, (int, float)):
                delta[k] = round(vb - va, 4)
                pct_change[k] = round((vb - va) / abs(va) * 100, 2) if va != 0 else 0.0

        return {
            "version_a": version_a,
            "version_b": version_b,
            "metrics_a": {k: a.get(k) for k in self._COMPARE_KEYS},
            "metrics_b": {k: b.get(k) for k in self._COMPARE_KEYS},
            "delta": delta,
            "percent_change": pct_change,
        }

    # ------------------------------------------------------------------
    # Regression detection
    # ------------------------------------------------------------------

    def detect_regressions(
        self,
        version_a: str,
        version_b: str,
    ) -> List[Dict[str, Any]]:
        """Detect quality regressions between version_a and version_b.

        A metric regresses when it crosses its ``REGRESSION_THRESHOLDS``
        entry; it is ``"critical"`` when it crosses twice the threshold and
        ``"warning"`` otherwise. Relative-drop metrics are judged on percent
        change; absolute-increase metrics (``duplicate_rate``) are judged on
        the raw delta for both detection and severity.

        Returns list of regression dicts, each with:
            - metric, old_value, new_value, change, percent_change,
              threshold, severity
        Returns an empty list if either version has no record.
        """
        comparison = self.compare_versions(version_a, version_b)
        if "error" in comparison:
            return []

        regressions: List[Dict[str, Any]] = []

        for metric, threshold in self.REGRESSION_THRESHOLDS.items():
            pct = comparison.get("percent_change", {}).get(metric, 0)
            delta = comparison.get("delta", {}).get(metric, 0)
            old_val = comparison.get("metrics_a", {}).get(metric, 0)
            new_val = comparison.get("metrics_b", {}).get(metric, 0)

            if metric in self._ABSOLUTE_INCREASE_METRICS:
                # Percent change is meaningless here (it is 0.0 whenever the
                # old rate is 0), so severity uses the same absolute delta
                # that detection uses.
                is_regression = delta > threshold
                is_critical = delta > threshold * 2
            else:
                # A zero baseline has no meaningful relative drop.
                is_regression = old_val != 0 and (pct / 100) < threshold
                is_critical = abs(pct) > abs(threshold * 100 * 2)

            if not is_regression:
                continue

            regressions.append({
                "metric": metric,
                "old_value": old_val,
                "new_value": new_val,
                "change": delta,
                "percent_change": pct,
                "threshold": threshold,
                "severity": "critical" if is_critical else "warning",
            })

        return regressions

    def check_latest_regressions(self) -> List[Dict[str, Any]]:
        """Compare the two most recent versions and check for regressions.

        Returns an empty list when fewer than two versions are recorded.
        """
        versions = self.get_versions()
        if len(versions) < 2:
            return []
        return self.detect_regressions(versions[-2], versions[-1])

    # ------------------------------------------------------------------
    # Reporting
    # ------------------------------------------------------------------

    def format_quality_summary(self) -> str:
        """Format a summary of all dataset quality records.

        Produces a header, one table row per record in timestamp order, and a
        regressions section when the two most recent versions regress.
        Missing or non-numeric fields are shown as 0 instead of raising.
        """
        entries = sorted(self.get_all(), key=self._timestamp_key)
        if not entries:
            return "No dataset quality records found."

        lines: List[str] = []
        lines.append("=" * 74)
        lines.append(" DATASET QUALITY MONITOR")
        lines.append("=" * 74)
        lines.append(f" Total records: {len(entries)}")
        lines.append(f" Versions tracked: {len(self.get_versions())}")
        lines.append("")

        lines.append("-" * 74)
        lines.append(
            f" {'Version':<16} {'Total':>6} {'Valid':>6} {'AvgLen':>7} "
            f"{'Dup%':>6} {'Divers':>7} {'Conc%':>6}"
        )
        lines.append(
            f" {'-------':<16} {'-----':>6} {'-----':>6} {'------':>7} "
            f"{'----':>6} {'------':>7} {'-----':>6}"
        )

        for e in entries:
            # str() so a non-string version in the log cannot break slicing.
            ver = str(e.get("dataset_version", "?"))[:15]
            total = self._numeric(e, "total_examples")
            valid = self._numeric(e, "valid_examples")
            avg_len = self._numeric(e, "avg_response_length")
            dup = self._numeric(e, "duplicate_rate") * 100
            div = self._numeric(e, "topic_diversity")
            conc = self._numeric(e, "topic_concentration") * 100
            lines.append(
                f" {ver:<16} {total:>6} {valid:>6} {avg_len:>7.1f} "
                f"{dup:>5.1f}% {div:>7.4f} {conc:>5.1f}%"
            )

        regressions = self.check_latest_regressions()
        if regressions:
            lines.append("")
            lines.append("-" * 74)
            lines.append(" QUALITY REGRESSIONS DETECTED")
            lines.append("-" * 74)
            for r in regressions:
                sev = r["severity"].upper()
                lines.append(
                    f" [{sev}] {r['metric']}: "
                    f"{r['old_value']} -> {r['new_value']} "
                    f"({r['percent_change']:+.1f}%)"
                )

        lines.append("")
        lines.append("=" * 74)
        return "\n".join(lines)