NOT-OMEGA commited on
Commit
217890c
Β·
verified Β·
1 Parent(s): 7762518

Update classify.py

Browse files
Files changed (1) hide show
  1. classify.py +20 -11
classify.py CHANGED
@@ -1,5 +1,5 @@
1
  """
2
- classify.py β€” 3-Tier Hybrid Pipeline (V3 β€” Latency-Tracked)
3
 
4
  Architecture:
5
  LegacyCRM β†’ LLM directly
@@ -10,11 +10,13 @@ Changes in V3:
10
  - Pipeline summary with p50/p95 per tier
11
  - Defensive: LLM timeout + retry baked in via processor_llm
12
  - classify_logs returns richer result dict
 
13
  """
14
  from __future__ import annotations
15
  import time
16
  import statistics
17
  import pandas as pd
 
18
  from processor_regex import classify_with_regex
19
  from processor_bert import classify_batch as bert_batch
20
  from processor_llm import classify_with_llm
@@ -60,7 +62,6 @@ def classify_logs(logs: list[tuple[str, str]]) -> list[dict]:
60
  bert_indices = []
61
  entry_times = [time.perf_counter()] * n # approximate per-log start
62
 
63
- t_route_start = time.perf_counter()
64
  for i, (source, log_msg) in enumerate(logs):
65
  entry_times[i] = time.perf_counter()
66
  if source == LEGACY_SOURCE:
@@ -90,14 +91,22 @@ def classify_logs(logs: list[tuple[str, str]]) -> list[dict]:
90
  else:
91
  llm_indices.append(idx)
92
 
93
- # ── Step 3: LLM (LegacyCRM + BERT fallback) ────────────────────────────
94
- for i in llm_indices:
95
- _, log_msg = logs[i]
96
- t0 = time.perf_counter()
97
- label = classify_with_llm(log_msg)
98
- t1 = time.perf_counter()
99
- tier = "LLM" if logs[i][0] == LEGACY_SOURCE else "LLM (fallback)"
100
- results[i] = _make_result(label, tier, None, (t1 - t0) * 1000)
 
 
 
 
 
 
 
 
101
 
102
  return results
103
 
@@ -195,4 +204,4 @@ if __name__ == "__main__":
195
 
196
  print("\n🏷️ Label distribution:")
197
  for label, count in sorted(summary["label_counts"].items(), key=lambda x: -x[1]):
198
- print(f" β€’ {label}: {count}")
 
1
  """
2
+ classify.py β€” 3-Tier Hybrid Pipeline (V3 β€” Latency-Tracked & Parallelized)
3
 
4
  Architecture:
5
  LegacyCRM β†’ LLM directly
 
10
  - Pipeline summary with p50/p95 per tier
11
  - Defensive: LLM timeout + retry baked in via processor_llm
12
  - classify_logs returns richer result dict
13
+ - πŸš€ Added ThreadPoolExecutor for Parallel LLM Processing (Zero Lag)
14
  """
15
  from __future__ import annotations
16
  import time
17
  import statistics
18
  import pandas as pd
19
+ from concurrent.futures import ThreadPoolExecutor
20
  from processor_regex import classify_with_regex
21
  from processor_bert import classify_batch as bert_batch
22
  from processor_llm import classify_with_llm
 
62
  bert_indices = []
63
  entry_times = [time.perf_counter()] * n # approximate per-log start
64
 
 
65
  for i, (source, log_msg) in enumerate(logs):
66
  entry_times[i] = time.perf_counter()
67
  if source == LEGACY_SOURCE:
 
91
  else:
92
  llm_indices.append(idx)
93
 
94
+ # ── Step 3: LLM (Parallel Concurrency Fix) ──────────────────────────────
95
+ if llm_indices:
96
+ def parallel_llm(idx):
97
+ src, msg = logs[idx]
98
+ t_llm_0 = time.perf_counter()
99
+ label = classify_with_llm(msg)
100
+ t_llm_ms = (time.perf_counter() - t_llm_0) * 1000
101
+ tier = "LLM" if src == LEGACY_SOURCE else "LLM (fallback)"
102
+ return idx, _make_result(label, tier, None, t_llm_ms)
103
+
104
+ # 🚨 GOOGLE-LEVEL FIX: 20 threads API calls ek saath marenge!
105
+ with ThreadPoolExecutor(max_workers=20) as executor:
106
+ llm_results = list(executor.map(parallel_llm, llm_indices))
107
+
108
+ for idx, res in llm_results:
109
+ results[idx] = res
110
 
111
  return results
112
 
 
204
 
205
  print("\n🏷️ Label distribution:")
206
  for label, count in sorted(summary["label_counts"].items(), key=lambda x: -x[1]):
207
+ print(f" β€’ {label}: {count}")