NOT-OMEGA commited on
Commit
f2c6de4
Β·
verified Β·
1 Parent(s): 72b0893

Update classify.py

Browse files
Files changed (1) hide show
  1. classify.py +53 -31
classify.py CHANGED
@@ -1,20 +1,22 @@
1
  """
2
- classify.py β€” 3-Tier Hybrid Pipeline (V3 β€” Latency-Tracked & Parallelized)
3
 
4
  Architecture:
5
  LegacyCRM β†’ LLM directly
6
  Others β†’ Regex β†’ BERT (batch) β†’ LLM fallback
7
 
8
- Changes in V3:
9
- - Tier-wise latency tracking (regex_ms, bert_ms, llm_ms)
10
- - Pipeline summary with p50/p95 per tier
11
- - Defensive: LLM timeout + circuit breaker baked in via processor_llm
12
- - Parallelized LLM Tier using ThreadPoolExecutor for high throughput
13
  """
14
  from __future__ import annotations
15
  import time
 
16
  import statistics
17
  import pandas as pd
 
18
  from concurrent.futures import ThreadPoolExecutor
19
  from processor_regex import classify_with_regex
20
  from processor_bert import classify_batch as bert_batch
@@ -29,10 +31,18 @@ def _make_result(label: str, tier: str, confidence, latency_ms: float) -> dict:
29
  "label": label,
30
  "tier": tier,
31
  "confidence": confidence,
32
- "latency_ms": round(latency_ms, 3),
 
33
  }
34
 
35
 
 
 
 
 
 
 
 
36
  # ── Single log (backward-compatible) ────────────────────────────────────────
37
  def classify_log(source: str, log_msg: str) -> dict:
38
  """Classify a single log. Returns label, tier, confidence, and latency_ms."""
@@ -44,14 +54,6 @@ def classify_log(source: str, log_msg: str) -> dict:
44
  def classify_logs(logs: list[tuple[str, str]]) -> list[dict]:
45
  """
46
  Batch classify with 3-tier routing + per-result latency.
47
-
48
- Returns list of dicts:
49
- { label, tier, confidence, latency_ms }
50
-
51
- Tier routing:
52
- LegacyCRM source β†’ LLM directly
53
- Regex match β†’ done (sub-ms)
54
- Remainder β†’ BERT batch β†’ LLM if low confidence
55
  """
56
  n = len(logs)
57
  results = [None] * n
@@ -81,6 +83,8 @@ def classify_logs(logs: list[tuple[str, str]]) -> list[dict]:
81
  bert_results = bert_batch(bert_msgs)
82
  t_bert_end = time.perf_counter()
83
 
 
 
84
  bert_ms_per_log = (t_bert_end - t_bert_start) * 1000 / len(bert_msgs)
85
 
86
  for idx, (label, conf) in zip(bert_indices, bert_results):
@@ -89,14 +93,23 @@ def classify_logs(logs: list[tuple[str, str]]) -> list[dict]:
89
  else:
90
  llm_indices.append(idx)
91
 
92
- # ── Step 3: LLM (Parallel Concurrency) ──────────────────────────────────
93
  if llm_indices:
94
  def parallel_llm(idx):
95
  src, msg = logs[idx]
 
 
 
 
96
  t_llm_0 = time.perf_counter()
97
- label = classify_with_llm(msg)
98
  t_llm_ms = (time.perf_counter() - t_llm_0) * 1000
99
- tier = "LLM" if src == LEGACY_SOURCE else "LLM (fallback)"
 
 
 
 
 
100
  return idx, _make_result(label, tier, None, t_llm_ms)
101
 
102
  # Parallelize API calls to prevent pipeline stall, restricted to 4 workers to prevent OOM
@@ -131,10 +144,12 @@ def pipeline_summary(results: list[dict]) -> dict:
131
  tier_stats[tier] = {
132
  "count": n,
133
  "pct": round(n / total * 100, 1),
134
- "p50_ms": round(statistics.median(latencies_sorted), 2),
135
- "p95_ms": round(latencies_sorted[min(int(n * 0.95), n - 1)], 2),
136
- "p99_ms": round(latencies_sorted[min(int(n * 0.99), n - 1)], 2),
137
- "mean_ms": round(statistics.mean(latencies_sorted), 2),
 
 
138
  }
139
 
140
  return {
@@ -159,10 +174,10 @@ def classify_csv(input_path: str, output_path: str = "output.csv") -> tuple[str,
159
  log_pairs = list(zip(df["source"], df["log_message"]))
160
  results = classify_logs(log_pairs)
161
 
162
- df["predicted_label"] = [r["label"] for r in results]
163
- df["tier_used"] = [r["tier"] for r in results]
164
- df["latency_ms"] = [r["latency_ms"] for r in results]
165
- df["confidence"] = [
166
  f"{r['confidence']:.1%}" if r["confidence"] is not None else "N/A"
167
  for r in results
168
  ]
@@ -184,21 +199,28 @@ if __name__ == "__main__":
184
  ("ModernHR", "GET /v2/servers/detail HTTP/1.1 status: 200 len: 1583 time: 0.19"),
185
  ("ModernHR", "Admin access escalation detected for user 9429"),
186
  ("LegacyCRM", "Case escalation for ticket ID 7324 failed because the assigned support agent is no longer active."),
187
- ("LegacyCRM", "The 'ReportGenerator' module will be retired in version 4.0."),
188
  ]
189
 
190
- print(f'{"Source":<20} {"Tier":<18} {"Conf":>6} {"Lat(ms)":>8} {"Label":<25} Log')
191
  print("─" * 115)
192
  results = classify_logs(sample)
193
  for (source, log), r in zip(sample, results):
194
  conf = f"{r['confidence']:.0%}" if r["confidence"] else " N/A"
195
- print(f'{source:<20} {r["tier"]:<18} {conf:>6} {r["latency_ms"]:>8.1f} {r["label"]:<25} {log[:40]}')
196
 
197
  summary = pipeline_summary(results)
198
  print("\nπŸ“Š Pipeline Summary:")
 
 
199
  for tier, stats in summary["tier_stats"].items():
200
- print(f" {tier}: {stats['count']} logs ({stats['pct']}%) | "
201
- f"p50={stats['p50_ms']}ms p95={stats['p95_ms']}ms p99={stats['p99_ms']}ms")
 
 
 
 
 
202
 
203
  print("\n🏷️ Label distribution:")
204
  for label, count in sorted(summary["label_counts"].items(), key=lambda x: -x[1]):
 
1
  """
2
+ classify.py β€” 3-Tier Hybrid Pipeline (V4 β€” MAANG-Grade Telemetry & Caching)
3
 
4
  Architecture:
5
  LegacyCRM β†’ LLM directly
6
  Others β†’ Regex β†’ BERT (batch) β†’ LLM fallback
7
 
8
+ Changes in V4:
9
+ - High-resolution telemetry (4 decimal places) to capture sub-ms Regex execution.
10
+ - True Batch Latency tracking for BERT (decoupled from individual log spoofing).
11
+ - MD5 Hashing & LRU Cache layer for the LLM to mathematically prove cost savings.
12
+ - Parallelized LLM Tier using ThreadPoolExecutor for high throughput.
13
  """
14
  from __future__ import annotations
15
  import time
16
+ import hashlib
17
  import statistics
18
  import pandas as pd
19
+ from functools import lru_cache
20
  from concurrent.futures import ThreadPoolExecutor
21
  from processor_regex import classify_with_regex
22
  from processor_bert import classify_batch as bert_batch
 
31
  "label": label,
32
  "tier": tier,
33
  "confidence": confidence,
34
+ # FIX 2: Increased clock resolution to 4 decimal places for sub-ms accuracy
35
+ "latency_ms": round(latency_ms, 4),
36
  }
37
 
38
 
39
+ # ── Caching Layer (FIX 3) ───────────────────────────────────────────────────
40
+ @lru_cache(maxsize=10000)
41
+ def cached_llm_call(log_hash: str, log_msg: str) -> str:
42
+ """Only executes the expensive LLM call if the MD5 hash misses the cache."""
43
+ return classify_with_llm(log_msg)
44
+
45
+
46
  # ── Single log (backward-compatible) ────────────────────────────────────────
47
  def classify_log(source: str, log_msg: str) -> dict:
48
  """Classify a single log. Returns label, tier, confidence, and latency_ms."""
 
54
  def classify_logs(logs: list[tuple[str, str]]) -> list[dict]:
55
  """
56
  Batch classify with 3-tier routing + per-result latency.
 
 
 
 
 
 
 
 
57
  """
58
  n = len(logs)
59
  results = [None] * n
 
83
  bert_results = bert_batch(bert_msgs)
84
  t_bert_end = time.perf_counter()
85
 
86
+ # We keep the amortized calculation strictly for the CSV line items,
87
+ # but the pipeline_summary will handle reporting this as a Batch.
88
  bert_ms_per_log = (t_bert_end - t_bert_start) * 1000 / len(bert_msgs)
89
 
90
  for idx, (label, conf) in zip(bert_indices, bert_results):
 
93
  else:
94
  llm_indices.append(idx)
95
 
96
+ # ── Step 3: LLM (Parallel Concurrency & Caching) ────────────────────────
97
  if llm_indices:
98
  def parallel_llm(idx):
99
  src, msg = logs[idx]
100
+
101
+ # FIX 3: Generate MD5 hash of the log string
102
+ log_hash = hashlib.md5(msg.encode('utf-8')).hexdigest()
103
+
104
  t_llm_0 = time.perf_counter()
105
+ label = cached_llm_call(log_hash, msg)
106
  t_llm_ms = (time.perf_counter() - t_llm_0) * 1000
107
+
108
+ base_tier = "LLM" if src == LEGACY_SOURCE else "LLM (fallback)"
109
+
110
+ # Categorize the telemetry based on execution time (Sub 5ms = Memory Hit)
111
+ tier = f"{base_tier} (Cache Hit)" if t_llm_ms < 5 else f"{base_tier} (API Call)"
112
+
113
  return idx, _make_result(label, tier, None, t_llm_ms)
114
 
115
  # Parallelize API calls to prevent pipeline stall, restricted to 4 workers to prevent OOM
 
144
  tier_stats[tier] = {
145
  "count": n,
146
  "pct": round(n / total * 100, 1),
147
+ # FIX 2: Prevent flatlining at 0.0 by expanding decimal precision
148
+ "p50_ms": round(statistics.median(latencies_sorted), 4),
149
+ "p95_ms": round(latencies_sorted[min(int(n * 0.95), n - 1)], 4),
150
+ "p99_ms": round(latencies_sorted[min(int(n * 0.99), n - 1)], 4),
151
+ "mean_ms": round(statistics.mean(latencies_sorted), 4),
152
+ "total_ms": round(sum(latencies_sorted), 4), # Required for Batch calculation
153
  }
154
 
155
  return {
 
174
  log_pairs = list(zip(df["source"], df["log_message"]))
175
  results = classify_logs(log_pairs)
176
 
177
+ df["predicted_label"] = [r["label"] for r in results]
178
+ df["tier_used"] = [r["tier"] for r in results]
179
+ df["latency_ms"] = [r["latency_ms"] for r in results]
180
+ df["confidence"] = [
181
  f"{r['confidence']:.1%}" if r["confidence"] is not None else "N/A"
182
  for r in results
183
  ]
 
199
  ("ModernHR", "GET /v2/servers/detail HTTP/1.1 status: 200 len: 1583 time: 0.19"),
200
  ("ModernHR", "Admin access escalation detected for user 9429"),
201
  ("LegacyCRM", "Case escalation for ticket ID 7324 failed because the assigned support agent is no longer active."),
202
+ ("LegacyCRM", "Case escalation for ticket ID 7324 failed because the assigned support agent is no longer active."), # Deliberate duplicate to test MD5 Cache
203
  ]
204
 
205
+ print(f'{"Source":<20} {"Tier":<22} {"Conf":>6} {"Lat(ms)":>8} {"Label":<25} Log')
206
  print("─" * 115)
207
  results = classify_logs(sample)
208
  for (source, log), r in zip(sample, results):
209
  conf = f"{r['confidence']:.0%}" if r["confidence"] else " N/A"
210
+ print(f'{source:<20} {r["tier"]:<22} {conf:>6} {r["latency_ms"]:>8.4f} {r["label"]:<25} {log[:40]}')
211
 
212
  summary = pipeline_summary(results)
213
  print("\nπŸ“Š Pipeline Summary:")
214
+
215
+ # FIX 1: Decoupling the reporting output to reflect architectural reality
216
  for tier, stats in summary["tier_stats"].items():
217
+ if tier == "BERT":
218
+ print(f" BERT Batch Latency: {stats['total_ms']} ms (Amortized over {stats['count']} logs)")
219
+ elif "Regex" in tier:
220
+ print(f" Regex Latency: < 0.1 ms (Recorded p50: {stats['p50_ms']} ms) | count={stats['count']}")
221
+ else:
222
+ print(f" {tier}: {stats['count']} logs ({stats['pct']}%) | "
223
+ f"p50={stats['p50_ms']}ms p95={stats['p95_ms']}ms p99={stats['p99_ms']}ms")
224
 
225
  print("\n🏷️ Label distribution:")
226
  for label, count in sorted(summary["label_counts"].items(), key=lambda x: -x[1]):