"""
classify.py — 3-Tier Hybrid Pipeline (V10 — Bug Fixed)
Bug fixes vs V9:
1. BERT latency was reporting cumulative sum of per-log values (= total batch ms),
not actual per-log latency. Now stores real wall-clock batch time separately
and reports true per-log ms.
2. @lru_cache was per-process — with ProcessPoolExecutor every worker had its own
cold cache, so cross-process cache hits were impossible. Replaced with a
multiprocessing.Manager dict shared across all workers.
3. LLM tier label was using a hard '<5ms' threshold to detect cache hits which
was unreliable (cold process startup skews timings). Now uses an explicit
boolean returned alongside the label.
"""
from __future__ import annotations
import os
import time
import statistics
import pandas as pd
import multiprocessing
from concurrent.futures import ThreadPoolExecutor, ProcessPoolExecutor
from processor_regex import classify_with_regex
from processor_bert import classify_batch as bert_batch
from processor_llm import classify_with_llm
# -- Config -------------------------------------------------------------------
# Logs whose source equals this value are sent straight to the LLM tier by
# classify_logs. Override with the LEGACY_SOURCE environment variable.
LEGACY_SOURCE = os.environ.get("LEGACY_SOURCE", "LegacyCRM")
# -- Shared cross-process LLM cache -------------------------------------------
# functools.lru_cache is private to one process, so every ProcessPoolExecutor
# worker would start with its own cold cache. A multiprocessing.Manager dict
# lives in the Manager's server process; every process holding the same dict
# proxy reads and writes the same entries.
# NOTE: these globals are per-process. A process that has not been given an
# existing proxy and calls _get_shared_cache() starts its own Manager, and
# therefore gets its own, separate cache.
_manager = None           # Manager instance; kept referenced (presumably so it isn't GC'd)
_shared_llm_cache = None  # dict proxy from _manager.dict(); None until first use


def _get_shared_cache():
    """
    Return this process's LLM cache dict proxy, creating it on first call.

    The first call starts a multiprocessing.Manager and stores both the
    manager and its dict proxy in module globals. Later calls return the
    stored proxy unchanged.
    """
    global _manager, _shared_llm_cache
    if _shared_llm_cache is not None:
        return _shared_llm_cache
    _manager = multiprocessing.Manager()
    _shared_llm_cache = _manager.dict()
    return _shared_llm_cache
def _cached_llm_call(log_msg: str, cache: dict) -> tuple:
    """
    Classify *log_msg* with the LLM, consulting *cache* first.

    Args:
        log_msg: the raw log message; also used as the cache key.
        cache: mapping of message -> label. In this module it is the dict
            proxy returned by _get_shared_cache(); a plain dict also works.

    Returns:
        (label, cache_hit): cache_hit is True when the label came from
        *cache*, and False when classify_with_llm was called (its result is
        then stored in *cache*).

    The explicit cache_hit flag replaces the old practice of inferring a hit
    from a latency threshold.
    """
    # A single lookup instead of `in` followed by `[]`: against a Manager
    # dict proxy, each of those is a separate round trip to the manager
    # process, so this halves the IPC cost of every cache hit.
    try:
        cached = cache[log_msg]
    except KeyError:
        pass
    else:
        return cached, True
    label = classify_with_llm(log_msg)
    cache[log_msg] = label
    return label, False
# -- Result type --------------------------------------------------------------
def _make_result(label: str, tier: str, confidence, latency_ms: float) -> dict:
    """
    Build the per-log result record shared by every tier.

    *confidence* is passed through unchanged (the LLM tier passes None).
    *latency_ms* is rounded to 4 decimal places.
    """
    return dict(
        label=label,
        tier=tier,
        confidence=confidence,
        latency_ms=round(latency_ms, 4),
    )
# -- Single log (backward-compatible) -----------------------------------------
def classify_log(source: str, log_msg: str) -> dict:
    """Classify one log by running it through classify_logs as a batch of one."""
    return classify_logs([(source, log_msg)])[0]
# -- Batch pipeline (main entry point) ----------------------------------------
def classify_logs(logs: list) -> list:
    """
    Run a batch of (source, log_message) pairs through the 3-tier pipeline.

    Routing:
      * source == LEGACY_SOURCE  -> LLM tier directly.
      * otherwise, regex match   -> "Regex" result with confidence 1.0.
      * otherwise, BERT label    -> "BERT" result, unless BERT returns
                                    "Unclassified", in which case the log
                                    falls back to the LLM tier.
    LLM calls go through the shared cache and run on a thread pool.

    Returns:
        One result dict per input log (built by _make_result), in input order.
    """
    results = [None] * len(logs)
    llm_queue = []
    bert_queue = []

    # Tier 1: route every log; regex hits are resolved on the spot.
    for pos, (source, message) in enumerate(logs):
        if source == LEGACY_SOURCE:
            llm_queue.append(pos)
            continue
        started = time.perf_counter()
        regex_label = classify_with_regex(message)
        if not regex_label:
            bert_queue.append(pos)
            continue
        elapsed_ms = (time.perf_counter() - started) * 1000
        results[pos] = _make_result(regex_label, "Regex", 1.0, elapsed_ms)

    # Tier 2: one BERT batch call for all regex misses (CPU bound).
    if bert_queue:
        bert_messages = [logs[pos][1] for pos in bert_queue]
        started = time.perf_counter()
        bert_output = bert_batch(bert_messages)
        batch_ms = (time.perf_counter() - started) * 1000
        # Every log in the batch is charged the batch wall time divided by
        # the batch size, so summing these per-log values recovers batch_ms.
        share_ms = batch_ms / len(bert_messages)
        for pos, (bert_label, confidence) in zip(bert_queue, bert_output):
            if bert_label == "Unclassified":
                llm_queue.append(pos)
            else:
                results[pos] = _make_result(bert_label, "BERT", confidence, share_ms)

    # Tier 3: LLM for legacy logs and BERT fallbacks (I/O bound -> threads).
    if llm_queue:
        cache = _get_shared_cache()

        def _llm_one(pos):
            source, message = logs[pos]
            started = time.perf_counter()
            llm_label, cache_hit = _cached_llm_call(message, cache)
            elapsed_ms = (time.perf_counter() - started) * 1000
            if source == LEGACY_SOURCE:
                base_tier = "LLM"
            else:
                base_tier = "LLM (fallback)"
            # The tier suffix comes from the explicit cache_hit flag, not timing.
            if cache_hit:
                tier = f"{base_tier} (Cache Hit)"
            else:
                tier = f"{base_tier} (API Call)"
            return pos, _make_result(llm_label, tier, None, elapsed_ms)

        with ThreadPoolExecutor() as pool:
            completed = list(pool.map(_llm_one, llm_queue))
        for pos, outcome in completed:
            results[pos] = outcome

    return results
# -- Pipeline summary ---------------------------------------------------------
def _nearest_rank(ordered: list, pct: int) -> float:
    """
    Return the nearest-rank *pct*-th percentile of a non-empty ascending list.

    The nearest-rank percentile is the element at 1-based rank
    ceil(pct / 100 * n). The ceiling is computed in integer arithmetic, so
    float artefacts (0.07 * 100 == 7.000000000000001) cannot bump the rank.
    """
    rank = -(-pct * len(ordered) // 100)  # ceil(pct * n / 100)
    return ordered[max(rank, 1) - 1]


def pipeline_summary(results: list) -> dict:
    """
    Summarise classify_logs results per tier and per label.

    For each tier, the returned stats are:
      * count, and pct (its share of all results);
      * p50 via statistics.median, which averages the two middle values
        when the count is even;
      * p95/p99 via the nearest-rank method;
      * the mean and the total of the per-log ``latency_ms`` values.

    BERT results carry the batch wall time divided by the batch size. The
    BERT ``total_ms`` for a single classify_logs call therefore
    approximately equals that call's BERT wall time. When results from
    several calls are combined (e.g. CSV chunks processed in parallel),
    ``total_ms`` is the sum of the per-call times, which may overlap in
    wall-clock terms.

    Returns:
        {"total": int, "tier_stats": {tier: {...}}, "label_counts": {label: int}}
    """
    tier_groups = {}
    label_counts = {}
    for r in results:
        tier_groups.setdefault(r["tier"], []).append(r["latency_ms"])
        label_counts[r["label"]] = label_counts.get(r["label"], 0) + 1

    total = len(results)
    tier_stats = {}
    for tier, latencies in tier_groups.items():
        ordered = sorted(latencies)
        n = len(ordered)
        tier_stats[tier] = {
            "count": n,
            "pct": round(n / total * 100, 1),
            "p50_ms": round(statistics.median(ordered), 4),
            # Previously int(n * p) was used as the index, which lands one
            # rank too high whenever n * p is an integer (p95 of 20 values
            # returned the maximum).
            "p95_ms": round(_nearest_rank(ordered, 95), 4),
            "p99_ms": round(_nearest_rank(ordered, 99), 4),
            "mean_ms": round(statistics.mean(ordered), 4),
            "total_ms": round(sum(ordered), 4),
        }

    return {
        "total": total,
        "tier_stats": tier_stats,
        "label_counts": label_counts,
    }
# -- Multiprocessing helper ---------------------------------------------------
def _process_chunk(chunk: list) -> list:
    """
    Classify one chunk of (source, log_message) pairs inside a worker process.

    Defined at module top level so ProcessPoolExecutor can pickle it by
    reference. It simply delegates to classify_logs.
    """
    chunk_results = classify_logs(chunk)
    return chunk_results
# -- CSV batch classify (balanced processing) ---------------------------------
def _init_worker_llm_cache(cache) -> None:
    """
    ProcessPoolExecutor initializer: adopt the parent's shared LLM cache.

    Runs once in every worker process. It stores the parent's dict proxy in
    the module-level ``_shared_llm_cache``, so ``_get_shared_cache()``
    returns that proxy. Otherwise each worker would lazily start its own
    private Manager and end up with its own unshared cache.
    """
    global _shared_llm_cache
    _shared_llm_cache = cache


def classify_csv(input_path: str, output_path: str = "output.csv") -> tuple:
    """
    Classify every row of a CSV file and write the annotated CSV back out.

    Rows are split into fixed-size chunks and classified on a
    ProcessPoolExecutor sized to half the CPU cores. The remaining cores are
    left to the UI process so it is not starved of CPU.

    Args:
        input_path: CSV to read; must contain "source" and "log_message" columns.
        output_path: where the annotated CSV is written.

    Returns:
        (output_path, df), where df is the input frame with added
        "predicted_label", "tier_used", "latency_ms" and "confidence" columns.

    Raises:
        ValueError: if a required column is missing.
    """
    df = pd.read_csv(input_path)
    required = {"source", "log_message"}
    if not required.issubset(df.columns):
        raise ValueError(f"Missing required columns in CSV. Expected: {required}. Found: {set(df.columns)}")

    log_pairs = list(zip(df["source"], df["log_message"]))
    total_logs = len(log_pairs)

    # Use half the cores and leave the rest for the Gradio UI.
    # os.cpu_count() returns None when the count cannot be determined, so
    # fall back to a single worker instead of failing on ``None // 2``.
    safe_cores = max(1, (os.cpu_count() or 1) // 2)

    # Chunks of 10,000 bound the size of each inter-process pickling payload.
    chunk_size = 10000
    chunks = [log_pairs[i:i + chunk_size] for i in range(0, total_logs, chunk_size)]

    # Create the Manager-backed LLM cache here, in the parent, and hand the
    # proxy to every worker through the pool initializer so that all workers
    # really share one cache.
    shared_cache = _get_shared_cache()

    results = []
    print(f"🔥 Firing up {safe_cores} CPU Cores (Leaving remaining for UI)...")
    t_start = time.perf_counter()
    with ProcessPoolExecutor(
        max_workers=safe_cores,
        initializer=_init_worker_llm_cache,
        initargs=(shared_cache,),
    ) as executor:
        # executor.map yields chunk results in submission order, so the
        # flattened results line up with df's rows.
        for chunk_result in executor.map(_process_chunk, chunks):
            results.extend(chunk_result)
    t_end = time.perf_counter()
    print(f"⏱️ True Wall-Clock Processing Time: {(t_end - t_start):.2f} seconds")

    df["predicted_label"] = [r["label"] for r in results]
    df["tier_used"] = [r["tier"] for r in results]
    df["latency_ms"] = [r["latency_ms"] for r in results]
    df["confidence"] = [
        f"{r['confidence']:.1%}" if r["confidence"] is not None else "N/A"
        for r in results
    ]
    df.to_csv(output_path, index=False)
    return output_path, df
# Aliases
# ``classify`` is the same callable as classify_logs, kept for callers that
# import the shorter name. (The stray ``|`` that followed this line was page
# residue and a syntax error.)
classify = classify_logs