NOT-OMEGA commited on
Commit
2d11b15
Β·
verified Β·
1 Parent(s): 0d4acf4

Update classify.py

Browse files
Files changed (1) hide show
  1. classify.py +33 -36
classify.py CHANGED
@@ -1,20 +1,12 @@
1
  """
2
- classify.py β€” 3-Tier Hybrid Pipeline (V10 β€” Thread-Safe & Shared Cache)
3
-
4
- Architecture:
5
- LegacyCRM β†’ LLM directly
6
- Others β†’ Regex β†’ BERT (batch) β†’ LLM fallback
7
-
8
- Changes in V10:
9
- - Removed buggy ProcessPoolExecutor (Fixes fork deadlocks & memory spikes).
10
- - Global ThreadPoolExecutor for LLM (Fixes thread thrashing & context switching).
11
- - LRU Cache is now genuinely shared across the entire run.
12
  """
13
  from __future__ import annotations
14
  import os
15
  import time
16
  import statistics
17
  import pandas as pd
 
18
  from functools import lru_cache
19
  from concurrent.futures import ThreadPoolExecutor
20
  from processor_regex import classify_with_regex
@@ -24,9 +16,6 @@ from processor_llm import classify_with_llm
24
  # ── Config ──────────────────────────────────────────────────────────────────
25
  LEGACY_SOURCE = os.getenv("LEGACY_SOURCE", "LegacyCRM")
26
 
27
- # FIX: One global pool to prevent OS thread thrashing per chunk.
28
- _llm_executor = ThreadPoolExecutor(max_workers=min(32, (os.cpu_count() or 1) * 4))
29
-
30
  # ── Result type ─────────────────────────────────────────────────────────────
31
  def _make_result(label: str, tier: str, confidence, latency_ms: float) -> dict:
32
  return {
@@ -36,25 +25,20 @@ def _make_result(label: str, tier: str, confidence, latency_ms: float) -> dict:
36
  "latency_ms": round(latency_ms, 4),
37
  }
38
 
39
- # ── Caching Layer (Now Global) ──────────────────────────────────────────────
40
- @lru_cache(maxsize=500000)
41
  def cached_llm_call(log_msg: str) -> str:
42
- """Executes the expensive LLM call only if the string misses the cache."""
43
  return classify_with_llm(log_msg)
44
 
45
- # ── Single log (backward-compatible) ────────────────────────────────────────
46
- def classify_log(source: str, log_msg: str) -> dict:
47
- results = classify_logs([(source, log_msg)])
48
- return results[0]
49
-
50
- # ── Batch pipeline (main entry point) ───────────────────────────────────────
51
  def classify_logs(logs: list[tuple[str, str]]) -> list[dict]:
 
52
  n = len(logs)
53
  results = [None] * n
54
 
55
  llm_indices = []
56
  bert_indices = []
57
 
 
58
  for i, (source, log_msg) in enumerate(logs):
59
  if source == LEGACY_SOURCE:
60
  llm_indices.append(i)
@@ -68,10 +52,9 @@ def classify_logs(logs: list[tuple[str, str]]) -> list[dict]:
68
  else:
69
  bert_indices.append(i)
70
 
71
- # ── Step 2: BERT batch (ONNX handles its own multi-threading) ───────────
72
  if bert_indices:
73
  bert_msgs = [logs[i][1] for i in bert_indices]
74
-
75
  t_bert_start = time.perf_counter()
76
  bert_results = bert_batch(bert_msgs)
77
  t_bert_end = time.perf_counter()
@@ -84,11 +67,10 @@ def classify_logs(logs: list[tuple[str, str]]) -> list[dict]:
84
  else:
85
  llm_indices.append(idx)
86
 
87
- # ── Step 3: LLM (I/O Bound - Using Global Thread Pool) ──────────────────
88
  if llm_indices:
89
  def parallel_llm(idx):
90
  src, msg = logs[idx]
91
-
92
  t_llm_0 = time.perf_counter()
93
  label = cached_llm_call(msg)
94
  t_llm_ms = (time.perf_counter() - t_llm_0) * 1000
@@ -98,32 +80,47 @@ def classify_logs(logs: list[tuple[str, str]]) -> list[dict]:
98
 
99
  return idx, _make_result(label, tier, None, t_llm_ms)
100
 
101
- # Delegate entirely to the pre-warmed global thread pool
102
- futures = [_llm_executor.submit(parallel_llm, idx) for idx in llm_indices]
103
- for future in futures:
104
- idx, res = future.result()
105
- results[idx] = res
106
 
107
  return results
108
 
 
 
 
 
 
109
  def classify_csv(input_path: str, output_path: str = "output.csv") -> tuple[str, pd.DataFrame]:
110
- """Single-process batch processing (relying on ONNX C++ threads + Python network threads)"""
111
  df = pd.read_csv(input_path)
112
  required = {"source", "log_message"}
113
  if not required.issubset(df.columns):
114
- raise ValueError(f"Missing required columns in CSV. Expected: {required}. Found: {set(df.columns)}")
115
 
116
  log_pairs = list(zip(df["source"], df["log_message"]))
117
  total_logs = len(log_pairs)
118
 
119
- print(f"πŸ”₯ Processing {total_logs} logs (Thread Pool active for LLMs)...")
 
 
 
 
 
 
 
120
 
121
  t_start = time.perf_counter()
122
 
123
- # Process everything in one go - let classify_logs handle the internal batching
124
- results = classify_logs(log_pairs)
125
 
 
 
 
 
126
  t_end = time.perf_counter()
 
127
  print(f"⏱️ True Wall-Clock Processing Time: {(t_end - t_start):.2f} seconds")
128
 
129
  df["predicted_label"] = [r["label"] for r in results]
 
1
  """
2
+ classify.py β€” 3-Tier Hybrid Pipeline (V11 β€” MAX SPEED + SAFE MULTIPROCESSING)
 
 
 
 
 
 
 
 
 
3
  """
4
  from __future__ import annotations
5
  import os
6
  import time
7
  import statistics
8
  import pandas as pd
9
+ import multiprocessing as mp
10
  from functools import lru_cache
11
  from concurrent.futures import ThreadPoolExecutor
12
  from processor_regex import classify_with_regex
 
16
  # ── Config ──────────────────────────────────────────────────────────────────
17
  LEGACY_SOURCE = os.getenv("LEGACY_SOURCE", "LegacyCRM")
18
 
 
 
 
19
  # ── Result type ─────────────────────────────────────────────────────────────
20
  def _make_result(label: str, tier: str, confidence, latency_ms: float) -> dict:
21
  return {
 
25
  "latency_ms": round(latency_ms, 4),
26
  }
27
 
28
+ # ── Caching Layer ───────────────────────────────────────────────────────────
29
+ @lru_cache(maxsize=10000) # Reduced maxsize per-worker to prevent OOM
30
  def cached_llm_call(log_msg: str) -> str:
 
31
  return classify_with_llm(log_msg)
32
 
 
 
 
 
 
 
33
  def classify_logs(logs: list[tuple[str, str]]) -> list[dict]:
34
+ """Processes a chunk of logs."""
35
  n = len(logs)
36
  results = [None] * n
37
 
38
  llm_indices = []
39
  bert_indices = []
40
 
41
+ # Step 1: Regex (Now running on multiple cores in parallel!)
42
  for i, (source, log_msg) in enumerate(logs):
43
  if source == LEGACY_SOURCE:
44
  llm_indices.append(i)
 
52
  else:
53
  bert_indices.append(i)
54
 
55
+ # Step 2: BERT
56
  if bert_indices:
57
  bert_msgs = [logs[i][1] for i in bert_indices]
 
58
  t_bert_start = time.perf_counter()
59
  bert_results = bert_batch(bert_msgs)
60
  t_bert_end = time.perf_counter()
 
67
  else:
68
  llm_indices.append(idx)
69
 
70
+ # Step 3: LLM (Threaded inside each process)
71
  if llm_indices:
72
  def parallel_llm(idx):
73
  src, msg = logs[idx]
 
74
  t_llm_0 = time.perf_counter()
75
  label = cached_llm_call(msg)
76
  t_llm_ms = (time.perf_counter() - t_llm_0) * 1000
 
80
 
81
  return idx, _make_result(label, tier, None, t_llm_ms)
82
 
83
+ # Inner ThreadPool for API network requests
84
+ with ThreadPoolExecutor(max_workers=10) as executor:
85
+ for idx, res in executor.map(parallel_llm, llm_indices):
86
+ results[idx] = res
 
87
 
88
  return results
89
 
90
+ def _process_chunk(chunk: list[tuple[str, str]]) -> list[dict]:
91
+ """Helper function for mapping."""
92
+ return classify_logs(chunk)
93
+
94
+ # ── CSV batch classify (Safe Spawn Multiprocessing) ─────────────────────────
95
  def classify_csv(input_path: str, output_path: str = "output.csv") -> tuple[str, pd.DataFrame]:
 
96
  df = pd.read_csv(input_path)
97
  required = {"source", "log_message"}
98
  if not required.issubset(df.columns):
99
+ raise ValueError(f"Missing required columns in CSV.")
100
 
101
  log_pairs = list(zip(df["source"], df["log_message"]))
102
  total_logs = len(log_pairs)
103
 
104
+ # Use max cores for speed, but leave 1 for the OS/Gradio UI
105
+ safe_cores = max(1, (os.cpu_count() or 1) - 1)
106
+ chunk_size = 5000 # Slightly smaller chunks so data copies faster between processes
107
+ chunks = [log_pairs[i:i + chunk_size] for i in range(0, total_logs, chunk_size)]
108
+
109
+ results = []
110
+
111
+ print(f"πŸ”₯ Firing up {safe_cores} CPU Cores with SAFE SPAWN context...")
112
 
113
  t_start = time.perf_counter()
114
 
115
+ # FIX: Use 'spawn' context! This is the magic that prevents PyTorch/ONNX Segfaults
116
+ ctx = mp.get_context('spawn')
117
 
118
+ with ctx.ProcessPoolExecutor(max_workers=safe_cores) as executor:
119
+ for chunk_result in executor.map(_process_chunk, chunks):
120
+ results.extend(chunk_result)
121
+
122
  t_end = time.perf_counter()
123
+
124
  print(f"⏱️ True Wall-Clock Processing Time: {(t_end - t_start):.2f} seconds")
125
 
126
  df["predicted_label"] = [r["label"] for r in results]