LogAI-Engine / classify.py
NOT-OMEGA's picture
Update classify.py
668419c verified
"""
classify.py — 3-Tier Hybrid Pipeline (V10 — Bug Fixed)
Bug fixes vs V9:
1. BERT latency was reporting cumulative sum of per-log values (= total batch ms),
not actual per-log latency. Now stores real wall-clock batch time separately
and reports true per-log ms.
2. @lru_cache was per-process — with ProcessPoolExecutor every worker had its own
cold cache, so cross-process cache hits were impossible. Replaced with a
multiprocessing.Manager dict shared across all workers.
3. LLM tier label was using a hard '<5ms' threshold to detect cache hits which
was unreliable (cold process startup skews timings). Now uses an explicit
boolean returned alongside the label.
"""
from __future__ import annotations
import os
import time
import statistics
import pandas as pd
import multiprocessing
from concurrent.futures import ThreadPoolExecutor, ProcessPoolExecutor
from processor_regex import classify_with_regex
from processor_bert import classify_batch as bert_batch
from processor_llm import classify_with_llm
# ── Config ──────────────────────────────────────────────────────────────────
# Name of the log source that classify_logs() routes straight to the LLM tier,
# skipping the regex and BERT tiers. Overridable via the LEGACY_SOURCE env var.
LEGACY_SOURCE = os.environ.get("LEGACY_SOURCE", "LegacyCRM")
# ── Shared cross-process LLM cache ──────────────────────────────────────────
# BUG FIX #2: @lru_cache is per-process. With ProcessPoolExecutor, every worker
# has its own private cache that never warms across processes.
# Using multiprocessing.Manager().dict() gives a single shared cache for all workers.
# Module-level handles, populated on first use by _get_shared_cache().
# _manager is kept referenced so the manager that backs the dict stays alive.
_manager = None
_shared_llm_cache = None


def _get_shared_cache():
    """
    Return the Manager-backed LLM cache, creating it on the first call.

    The manager and its dict are created at most once per process and are
    memoised in the module globals ``_manager`` and ``_shared_llm_cache``.

    NOTE(review): creation happens in whichever process calls this first; a
    process that does not already hold ``_shared_llm_cache`` will start its
    own manager -- confirm callers hand the proxy to workers if cross-process
    sharing is required.

    Returns:
        The dict proxy produced by ``multiprocessing.Manager().dict()``.
    """
    global _manager, _shared_llm_cache
    if _shared_llm_cache is not None:
        # Already initialised in this process: reuse the existing proxy.
        return _shared_llm_cache
    _manager = multiprocessing.Manager()
    _shared_llm_cache = _manager.dict()
    return _shared_llm_cache
def _cached_llm_call(log_msg: str, cache: dict) -> tuple:
    """
    Classify ``log_msg`` with the LLM, consulting ``cache`` first.

    Args:
        log_msg: Log message to classify; it is also the cache key.
        cache: Mutable mapping of message -> label. Only ``cache[key]`` and
            ``cache[key] = value`` are used, so a plain dict or a Manager
            dict proxy both work.

    Returns:
        ``(label, cache_hit)`` where ``cache_hit`` is True when the label came
        from ``cache`` and False when ``classify_with_llm`` had to be called.
        The flag is explicit so callers do not have to infer hits from latency.
    """
    # EAFP: one lookup instead of `in` followed by `[]`. On a Manager proxy
    # each of those is a separate IPC round trip, and a single read also
    # removes the check-then-read gap between the two operations.
    try:
        return cache[log_msg], True
    except KeyError:
        pass

    # Miss: ask the LLM and remember the answer for later callers.
    label = classify_with_llm(log_msg)
    cache[log_msg] = label
    return label, False
# ── Result type ─────────────────────────────────────────────────────────────
def _make_result(label: str, tier: str, confidence, latency_ms: float) -> dict:
    """
    Build the per-log result record used throughout the pipeline.

    Args:
        label: Predicted label for the log.
        tier: Name of the tier that produced the label.
        confidence: Confidence value, stored as given (callers in this file
            pass a float or None).
        latency_ms: Latency in milliseconds; stored rounded to 4 decimals.

    Returns:
        Dict with keys ``label``, ``tier``, ``confidence`` and ``latency_ms``.
    """
    rounded_ms = round(latency_ms, 4)
    record = {
        "label": label,
        "tier": tier,
        "confidence": confidence,
        "latency_ms": rounded_ms,
    }
    return record
# ── Single log (backward-compatible) ────────────────────────────────────────
def classify_log(source: str, log_msg: str) -> dict:
    """
    Classify a single log by running it through the batch pipeline.

    Args:
        source: Name of the system that emitted the log.
        log_msg: The log message text.

    Returns:
        The result dict that ``classify_logs`` produced for this one log.
    """
    single_batch = [(source, log_msg)]
    return classify_logs(single_batch)[0]
# ── Batch pipeline (main entry point) ───────────────────────────────────────
def classify_logs(logs: list) -> list:
    """
    Classify a batch of logs with the regex -> BERT -> LLM tiered pipeline.

    Routing:
      * Logs whose source equals ``LEGACY_SOURCE`` go straight to the LLM tier.
      * Every other log is tried against the regex classifier first; logs the
        regex cannot label are sent to the BERT batch classifier.
      * Logs that BERT labels "Unclassified" fall back to the LLM tier.

    Args:
        logs: List of ``(source, log_message)`` pairs.

    Returns:
        List of result dicts (see ``_make_result``), one per input log, in the
        same order as ``logs``.
    """
    classified = [None] * len(logs)
    pending_llm = []
    pending_bert = []

    # -- Tier 1: regex, with legacy-source logs diverted to the LLM queue ----
    for pos, (origin, message) in enumerate(logs):
        if origin == LEGACY_SOURCE:
            pending_llm.append(pos)
            continue
        regex_t0 = time.perf_counter()
        regex_label = classify_with_regex(message)
        if not regex_label:
            # Regex had no answer: queue the log for the BERT batch.
            pending_bert.append(pos)
            continue
        elapsed_ms = (time.perf_counter() - regex_t0) * 1000
        classified[pos] = _make_result(regex_label, "Regex", 1.0, elapsed_ms)

    # -- Tier 2: one BERT call for everything the regex could not label ------
    if pending_bert:
        messages = [logs[pos][1] for pos in pending_bert]
        batch_t0 = time.perf_counter()
        predictions = bert_batch(messages)
        batch_wall_ms = (time.perf_counter() - batch_t0) * 1000
        # Each BERT-labelled log is charged an equal share of the batch's
        # wall-clock time, so summing the per-log values never exceeds the
        # real duration of the batch call.
        share_ms = batch_wall_ms / len(messages)
        for pos, (bert_label, score) in zip(pending_bert, predictions):
            if bert_label == "Unclassified":
                pending_llm.append(pos)
            else:
                classified[pos] = _make_result(bert_label, "BERT", score, share_ms)

    # -- Tier 3: LLM, fanned out over a thread pool --------------------------
    if pending_llm:
        llm_cache = _get_shared_cache()

        def _resolve_with_llm(pos):
            """Classify one queued log via the cached LLM call; return (pos, result)."""
            origin, message = logs[pos]
            call_t0 = time.perf_counter()
            llm_label, was_cached = _cached_llm_call(message, llm_cache)
            call_ms = (time.perf_counter() - call_t0) * 1000
            # Direct LLM routing and BERT fallbacks get distinct tier names.
            if origin == LEGACY_SOURCE:
                base_tier = "LLM"
            else:
                base_tier = "LLM (fallback)"
            # The hit/miss suffix comes from the explicit flag, not from timing.
            if was_cached:
                tier_name = f"{base_tier} (Cache Hit)"
            else:
                tier_name = f"{base_tier} (API Call)"
            return pos, _make_result(llm_label, tier_name, None, call_ms)

        with ThreadPoolExecutor() as pool:
            llm_outcomes = list(pool.map(_resolve_with_llm, pending_llm))
        for pos, outcome in llm_outcomes:
            classified[pos] = outcome

    return classified
# ── Pipeline summary ─────────────────────────────────────────────────────────
def pipeline_summary(results: list) -> dict:
    """
    Aggregate per-log results into per-tier latency statistics and label counts.

    Args:
        results: List of result dicts, each with at least the keys ``tier``,
            ``latency_ms`` and ``label``.

    Returns:
        Dict with:
          * ``total``: number of results.
          * ``tier_stats``: per tier, ``count``, ``pct`` (share of all results,
            one decimal), ``p50_ms`` (median), ``p95_ms`` / ``p99_ms`` (value at
            index ``int(count * q)`` of the sorted latencies, clamped to the
            last element), ``mean_ms`` and ``total_ms`` (sum of the per-log
            latencies). Millisecond values are rounded to 4 decimals.
          * ``label_counts``: number of results per label.
    """
    latencies_by_tier = {}
    label_counts = {}
    for entry in results:
        tier_name = entry["tier"]
        if tier_name not in latencies_by_tier:
            latencies_by_tier[tier_name] = []
        latencies_by_tier[tier_name].append(entry["latency_ms"])

        found_label = entry["label"]
        if found_label in label_counts:
            label_counts[found_label] += 1
        else:
            label_counts[found_label] = 1

    total = len(results)

    def _value_at(ordered, fraction):
        """Return the element at index int(len * fraction), clamped to the end."""
        size = len(ordered)
        return ordered[min(int(size * fraction), size - 1)]

    tier_stats = {}
    for tier_name, samples in latencies_by_tier.items():
        ordered = sorted(samples)
        count = len(ordered)
        tier_stats[tier_name] = {
            "count": count,
            "pct": round(count / total * 100, 1),
            "p50_ms": round(statistics.median(ordered), 4),
            "p95_ms": round(_value_at(ordered, 0.95), 4),
            "p99_ms": round(_value_at(ordered, 0.99), 4),
            "mean_ms": round(statistics.mean(ordered), 4),
            "total_ms": round(sum(ordered), 4),
        }

    summary = {
        "total": total,
        "tier_stats": tier_stats,
        "label_counts": label_counts,
    }
    return summary
# ── Multiprocessing Helper ───────────────────────────────────────────────────
def _process_chunk(chunk: list) -> list:
    """
    Classify one chunk of ``(source, log_message)`` pairs.

    Defined at module level so it can be handed to ``ProcessPoolExecutor.map``.

    Args:
        chunk: List of ``(source, log_message)`` pairs.

    Returns:
        The list of result dicts returned by ``classify_logs`` for the chunk.
    """
    chunk_results = classify_logs(chunk)
    return chunk_results
# ── CSV batch classify (Balanced Processing) ─────────────────────────────────
def _init_worker_cache(shared_cache) -> None:
    """Install the parent's cache proxy as this worker process's LLM cache."""
    global _shared_llm_cache
    _shared_llm_cache = shared_cache


def classify_csv(input_path: str, output_path: str = "output.csv") -> tuple:
    """
    Classify every row of a CSV in parallel and write the annotated result.

    The rows are split into fixed-size chunks which are classified by a
    process pool limited to half of the available CPU cores, so the machine
    stays responsive for the UI while a large file is processed.

    Args:
        input_path: Path of the input CSV; it must contain the columns
            ``source`` and ``log_message``.
        output_path: Where the annotated CSV is written.

    Returns:
        ``(output_path, df)`` where ``df`` is the input frame with the added
        columns ``predicted_label``, ``tier_used``, ``latency_ms`` and
        ``confidence`` (formatted as a percentage, or "N/A" when absent).

    Raises:
        ValueError: If a required column is missing from the CSV.
    """
    df = pd.read_csv(input_path)
    required = {"source", "log_message"}
    if not required.issubset(df.columns):
        raise ValueError(f"Missing required columns in CSV. Expected: {required}. Found: {set(df.columns)}")

    log_pairs = list(zip(df["source"], df["log_message"]))
    total_logs = len(log_pairs)

    # Use half the available CPU cores — leaves the rest for the UI.
    # os.cpu_count() may return None when the count cannot be determined,
    # so fall back to 1 instead of failing on `None // 2`.
    safe_cores = max(1, (os.cpu_count() or 1) // 2)

    # Chunks of 10,000 rows keep each inter-process pickling step bounded.
    chunk_size = 10000
    chunks = [log_pairs[i:i + chunk_size] for i in range(0, total_logs, chunk_size)]

    # Create the Manager-backed cache once, here in the parent, and hand the
    # proxy to every worker. Without this each worker process would lazily
    # start its own manager and the "shared" cache would not be shared at all.
    shared_cache = _get_shared_cache()

    results = []
    print(f"🔥 Firing up {safe_cores} CPU Cores (Leaving remaining for UI)...")
    t_start = time.perf_counter()
    with ProcessPoolExecutor(
        max_workers=safe_cores,
        initializer=_init_worker_cache,
        initargs=(shared_cache,),
    ) as executor:
        # executor.map yields chunk results in submission order, so `results`
        # stays aligned with the rows of `df`.
        for chunk_result in executor.map(_process_chunk, chunks):
            results.extend(chunk_result)
    t_end = time.perf_counter()
    print(f"⏱️ True Wall-Clock Processing Time: {(t_end - t_start):.2f} seconds")

    df["predicted_label"] = [r["label"] for r in results]
    df["tier_used"] = [r["tier"] for r in results]
    df["latency_ms"] = [r["latency_ms"] for r in results]
    df["confidence"] = [
        f"{r['confidence']:.1%}" if r["confidence"] is not None else "N/A"
        for r in results
    ]
    df.to_csv(output_path, index=False)
    return output_path, df
# Aliases
# `classify` is a second name bound to the same function object as
# `classify_logs`, so callers may use either name for the batch entry point.
classify = classify_logs