NOT-OMEGA committed on
Commit
de30f06
·
verified ·
1 Parent(s): c96bf7e

Update classify.py

Browse files
Files changed (1) hide show
  1. classify.py +13 -15
classify.py CHANGED
@@ -8,9 +8,8 @@ Architecture:
8
  Changes in V3:
9
  - Tier-wise latency tracking (regex_ms, bert_ms, llm_ms)
10
  - Pipeline summary with p50/p95 per tier
11
- - Defensive: LLM timeout + retry baked in via processor_llm
12
- - classify_logs returns richer result dict
13
- - 🚀 Added ThreadPoolExecutor for Parallel LLM Processing (Zero Lag)
14
  """
15
  from __future__ import annotations
16
  import time
@@ -36,7 +35,7 @@ def _make_result(label: str, tier: str, confidence, latency_ms: float) -> dict:
36
 
37
  # ── Single log (backward-compatible) ────────────────────────────────────────
38
  def classify_log(source: str, log_msg: str) -> dict:
39
- """Single log classify karo. Returns label, tier, confidence, latency_ms."""
40
  results = classify_logs([(source, log_msg)])
41
  return results[0]
42
 
@@ -60,18 +59,17 @@ def classify_logs(logs: list[tuple[str, str]]) -> list[dict]:
60
  # ── Step 1: Route to groups ─────────────────────────────────────────────
61
  llm_indices = []
62
  bert_indices = []
63
- entry_times = [time.perf_counter()] * n # approximate per-log start
64
 
65
  for i, (source, log_msg) in enumerate(logs):
66
- entry_times[i] = time.perf_counter()
67
  if source == LEGACY_SOURCE:
68
  llm_indices.append(i)
69
  else:
70
- t0 = time.perf_counter()
71
  label = classify_with_regex(log_msg)
72
- t1 = time.perf_counter()
73
  if label:
74
- results[i] = _make_result(label, "Regex", 1.0, (t1 - t0) * 1000)
 
75
  else:
76
  bert_indices.append(i)
77
 
@@ -91,7 +89,7 @@ def classify_logs(logs: list[tuple[str, str]]) -> list[dict]:
91
  else:
92
  llm_indices.append(idx)
93
 
94
- # ── Step 3: LLM (Parallel Concurrency Fix) ──────────────────────────────
95
  if llm_indices:
96
  def parallel_llm(idx):
97
  src, msg = logs[idx]
@@ -101,8 +99,8 @@ def classify_logs(logs: list[tuple[str, str]]) -> list[dict]:
101
  tier = "LLM" if src == LEGACY_SOURCE else "LLM (fallback)"
102
  return idx, _make_result(label, tier, None, t_llm_ms)
103
 
104
- # 🚨 GOOGLE-LEVEL FIX: 20 threads API calls ek saath marenge!
105
- with ThreadPoolExecutor(max_workers=20) as executor:
106
  llm_results = list(executor.map(parallel_llm, llm_indices))
107
 
108
  for idx, res in llm_results:
@@ -149,14 +147,14 @@ def pipeline_summary(results: list[dict]) -> dict:
149
  # ── CSV batch classify ───────────────────────────────────────────────────────
150
  def classify_csv(input_path: str, output_path: str = "output.csv") -> tuple[str, pd.DataFrame]:
151
  """
152
- CSV file classify karo.
153
  Required columns: 'source', 'log_message'
154
- Output: adds 'predicted_label', 'tier_used', 'confidence', 'latency_ms'
155
  """
156
  df = pd.read_csv(input_path)
157
  required = {"source", "log_message"}
158
  if not required.issubset(df.columns):
159
- raise ValueError(f"CSV mein ye columns chahiye: {required}. Mila: {set(df.columns)}")
160
 
161
  log_pairs = list(zip(df["source"], df["log_message"]))
162
  results = classify_logs(log_pairs)
 
8
  Changes in V3:
9
  - Tier-wise latency tracking (regex_ms, bert_ms, llm_ms)
10
  - Pipeline summary with p50/p95 per tier
11
+ - Defensive: LLM timeout + circuit breaker baked in via processor_llm
12
+ - Parallelized LLM Tier using ThreadPoolExecutor for high throughput
 
13
  """
14
  from __future__ import annotations
15
  import time
 
35
 
36
  # ── Single log (backward-compatible) ────────────────────────────────────────
37
  def classify_log(source: str, log_msg: str) -> dict:
38
+ """Classify a single log. Returns label, tier, confidence, and latency_ms."""
39
  results = classify_logs([(source, log_msg)])
40
  return results[0]
41
 
 
59
  # ── Step 1: Route to groups ─────────────────────────────────────────────
60
  llm_indices = []
61
  bert_indices = []
 
62
 
63
  for i, (source, log_msg) in enumerate(logs):
 
64
  if source == LEGACY_SOURCE:
65
  llm_indices.append(i)
66
  else:
67
+ t_start = time.perf_counter()
68
  label = classify_with_regex(log_msg)
69
+
70
  if label:
71
+ latency_ms = (time.perf_counter() - t_start) * 1000
72
+ results[i] = _make_result(label, "Regex", 1.0, latency_ms)
73
  else:
74
  bert_indices.append(i)
75
 
 
89
  else:
90
  llm_indices.append(idx)
91
 
92
+ # ── Step 3: LLM (Parallel Concurrency) ──────────────────────────────────
93
  if llm_indices:
94
  def parallel_llm(idx):
95
  src, msg = logs[idx]
 
99
  tier = "LLM" if src == LEGACY_SOURCE else "LLM (fallback)"
100
  return idx, _make_result(label, tier, None, t_llm_ms)
101
 
102
+ # Parallelize API calls to prevent pipeline stall, restricted to 4 workers to prevent OOM
103
+ with ThreadPoolExecutor(max_workers=4) as executor:
104
  llm_results = list(executor.map(parallel_llm, llm_indices))
105
 
106
  for idx, res in llm_results:
 
147
  # ── CSV batch classify ───────────────────────────────────────────────────────
148
  def classify_csv(input_path: str, output_path: str = "output.csv") -> tuple[str, pd.DataFrame]:
149
  """
150
+ Process a batch of logs from a CSV file.
151
  Required columns: 'source', 'log_message'
152
+ Output: appends 'predicted_label', 'tier_used', 'confidence', 'latency_ms'
153
  """
154
  df = pd.read_csv(input_path)
155
  required = {"source", "log_message"}
156
  if not required.issubset(df.columns):
157
+ raise ValueError(f"Missing required columns in CSV. Expected: {required}. Found: {set(df.columns)}")
158
 
159
  log_pairs = list(zip(df["source"], df["log_message"]))
160
  results = classify_logs(log_pairs)