File size: 9,831 Bytes
2222383
668419c
 
 
 
 
 
 
 
 
 
 
 
2222383
 
3f1a1d6
2222383
 
 
668419c
7d3f899
2222383
 
 
 
b90444b
 
2222383
668419c
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1a9b340
2222383
 
 
 
 
 
668419c
2222383
 
1a9b340
 
8ca0e43
 
 
 
1a9b340
 
668419c
2222383
 
 
1a9b340
668419c
 
2222383
 
 
 
 
de30f06
2222383
668419c
2222383
de30f06
 
2222383
 
 
1a9b340
2222383
 
1a9b340
2222383
 
668419c
2222383
668419c
 
 
 
 
2222383
 
 
668419c
2222383
 
 
668419c
217890c
668419c
 
217890c
 
668419c
217890c
668419c
 
217890c
668419c
f2c6de4
668419c
 
 
217890c
 
1a9b340
 
 
 
 
2222383
 
 
1a9b340
 
668419c
 
 
 
 
 
 
 
1a9b340
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
668419c
1a9b340
 
 
 
 
 
 
 
 
 
668419c
 
2d11b15
 
1a9b340
 
668419c
1a9b340
 
 
2222383
 
 
1a9b340
2222383
668419c
3f1a1d6
668419c
 
1a9b340
668419c
 
 
2d11b15
668419c
2d11b15
668419c
1a9b340
668419c
ca8312a
1a9b340
2d11b15
 
b90444b
668419c
b90444b
2222383
668419c
 
 
f2c6de4
2222383
 
 
 
 
 
 
1a9b340
 
668419c
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
"""
classify.py — 3-Tier Hybrid Pipeline (V10 — Bug Fixed)

Bug fixes vs V9:
  1. BERT latency was reporting cumulative sum of per-log values (= total batch ms),
     not actual per-log latency. Now stores real wall-clock batch time separately
     and reports true per-log ms.
  2. @lru_cache was per-process — with ProcessPoolExecutor every worker had its own
     cold cache, so cross-process cache hits were impossible. Replaced with a
     multiprocessing.Manager dict shared across all workers.
  3. LLM tier label was using a hard '<5ms' threshold to detect cache hits which
     was unreliable (cold process startup skews timings). Now uses an explicit
     boolean returned alongside the label.
"""
from __future__ import annotations
import os
import time
import statistics
import pandas as pd
import multiprocessing
from concurrent.futures import ThreadPoolExecutor, ProcessPoolExecutor
from processor_regex import classify_with_regex
from processor_bert  import classify_batch as bert_batch
from processor_llm   import classify_with_llm

# ── Config ──────────────────────────────────────────────────────────────────
LEGACY_SOURCE = os.getenv("LEGACY_SOURCE", "LegacyCRM")

# ── Shared cross-process LLM cache ──────────────────────────────────────────
# BUG FIX #2: @lru_cache is per-process. With ProcessPoolExecutor, every worker
# has its own private cache that never warms across processes.
# Using multiprocessing.Manager().dict() gives a single shared cache for all workers.
_manager = None
_shared_llm_cache = None

def _get_shared_cache():
    """
    Return the Manager-backed dict used as the LLM cache.

    The first call in a process (while ``_shared_llm_cache`` is still None)
    starts a ``multiprocessing.Manager`` and stores both the manager and its
    dict proxy in module globals; later calls return the stored proxy.
    """
    global _manager, _shared_llm_cache

    # Fast path: the proxy already exists in this process.
    if _shared_llm_cache is not None:
        return _shared_llm_cache

    # Keep a reference to the manager itself so it stays alive alongside
    # the proxy it produced.
    _manager = multiprocessing.Manager()
    _shared_llm_cache = _manager.dict()
    return _shared_llm_cache


def _cached_llm_call(log_msg: str, cache: dict) -> tuple:
    """
    Classify ``log_msg`` with the LLM, memoising the answer in ``cache``.

    Args:
        log_msg: Raw log message; used verbatim as the cache key.
        cache: Mutable mapping of message -> label. Within this module it is
            the proxy returned by ``_get_shared_cache()``.

    Returns:
        ``(label, cache_hit)``. ``cache_hit`` is True when the label was read
        from ``cache`` and False when ``classify_with_llm`` had to be called.
        The flag is explicit so callers do not have to infer hits from latency.
    """
    # One lookup (EAFP) instead of ``in`` followed by ``[]``: a hit costs a
    # single mapping access rather than two, and there is no gap between the
    # membership test and the read.
    try:
        return cache[log_msg], True
    except KeyError:
        pass

    # Miss: ask the LLM outside the ``except`` block so that an LLM failure is
    # not reported as "raised while handling KeyError".
    label = classify_with_llm(log_msg)
    cache[log_msg] = label
    return label, False


# ── Result type ─────────────────────────────────────────────────────────────
def _make_result(label: str, tier: str, confidence, latency_ms: float) -> dict:
    return {
        "label":      label,
        "tier":       tier,
        "confidence": confidence,
        "latency_ms": round(latency_ms, 4),
    }


# ── Single log (backward-compatible) ────────────────────────────────────────
def classify_log(source: str, log_msg: str) -> dict:
    results = classify_logs([(source, log_msg)])
    return results[0]


# ── Batch pipeline (main entry point) ───────────────────────────────────────
def classify_logs(logs: list) -> list:
    n       = len(logs)
    results = [None] * n

    # ── Step 1: Route to groups ─────────────────────────────────────────────
    llm_indices  = []
    bert_indices = []

    for i, (source, log_msg) in enumerate(logs):
        if source == LEGACY_SOURCE:
            llm_indices.append(i)
        else:
            t_start = time.perf_counter()
            label = classify_with_regex(log_msg)

            if label:
                latency_ms = (time.perf_counter() - t_start) * 1000
                results[i] = _make_result(label, "Regex", 1.0, latency_ms)
            else:
                bert_indices.append(i)

    # ── Step 2: BERT batch (CPU Bound) ──────────────────────────────────────
    if bert_indices:
        bert_msgs = [logs[i][1] for i in bert_indices]

        t_bert_start = time.perf_counter()
        bert_results = bert_batch(bert_msgs)
        t_bert_wall_ms = (time.perf_counter() - t_bert_start) * 1000

        # BUG FIX #1: store TRUE per-log wall-clock ms.
        # Old code did: bert_ms_per_log = total_ms / n, then assigned that same
        # value to every log. pipeline_summary() then summed all n copies back up
        # to total_ms β€” making BERT look like it took 2,962,635 ms on 2M logs.
        bert_per_log_ms = t_bert_wall_ms / len(bert_msgs)

        for idx, (label, conf) in zip(bert_indices, bert_results):
            if label != "Unclassified":
                results[idx] = _make_result(label, "BERT", conf, bert_per_log_ms)
            else:
                llm_indices.append(idx)

    # ── Step 3: LLM (I/O Bound β€” Threading Applied Here) ────────────────────
    if llm_indices:
        cache = _get_shared_cache()

        def parallel_llm(idx):
            src, msg = logs[idx]

            t_llm_0 = time.perf_counter()
            # BUG FIX #2 + #3: shared cache + explicit cache_hit flag
            label, cache_hit = _cached_llm_call(msg, cache)
            t_llm_ms = (time.perf_counter() - t_llm_0) * 1000

            base_tier = "LLM" if src == LEGACY_SOURCE else "LLM (fallback)"
            # BUG FIX #3: explicit boolean, not fragile latency threshold
            tier = f"{base_tier} (Cache Hit)" if cache_hit else f"{base_tier} (API Call)"

            return idx, _make_result(label, tier, None, t_llm_ms)

        with ThreadPoolExecutor() as executor:
            llm_results = list(executor.map(parallel_llm, llm_indices))

        for idx, res in llm_results:
            results[idx] = res

    return results


# ── Pipeline summary ─────────────────────────────────────────────────────────
def pipeline_summary(results: list) -> dict:
    """
    BUG FIX #1: With corrected per-log latency values (true wall-clock / n),
    total_ms now reflects real batch wall time instead of the old tautology of
    (total_ms/n) * n = total_ms that showed as 2,962,635 ms for BERT.
    """
    tier_groups = {}
    label_counts = {}

    for r in results:
        tier = r["tier"]
        tier_groups.setdefault(tier, []).append(r["latency_ms"])
        label_counts[r["label"]] = label_counts.get(r["label"], 0) + 1

    total = len(results)
    tier_stats = {}
    for tier, latencies in tier_groups.items():
        latencies_sorted = sorted(latencies)
        n = len(latencies_sorted)
        tier_stats[tier] = {
            "count":    n,
            "pct":      round(n / total * 100, 1),
            "p50_ms":   round(statistics.median(latencies_sorted), 4),
            "p95_ms":   round(latencies_sorted[min(int(n * 0.95), n - 1)], 4),
            "p99_ms":   round(latencies_sorted[min(int(n * 0.99), n - 1)], 4),
            "mean_ms":  round(statistics.mean(latencies_sorted), 4),
            "total_ms": round(sum(latencies_sorted), 4),
        }

    return {
        "total":        total,
        "tier_stats":   tier_stats,
        "label_counts": label_counts,
    }


# ── Multiprocessing Helper ───────────────────────────────────────────────────
def _process_chunk(chunk: list) -> list:
    """Top-level helper required for ProcessPoolExecutor mapping."""
    return classify_logs(chunk)


# ── CSV batch classify (Balanced Processing) ─────────────────────────────────
def classify_csv(input_path: str, output_path: str = "output.csv") -> tuple:
    """
    Balanced Batch Processing to prevent CPU Starvation UI crashes.
    """
    df = pd.read_csv(input_path)
    required = {"source", "log_message"}
    if not required.issubset(df.columns):
        raise ValueError(f"Missing required columns in CSV. Expected: {required}. Found: {set(df.columns)}")

    log_pairs  = list(zip(df["source"], df["log_message"]))
    total_logs = len(log_pairs)

    # Use exactly half the available CPU cores β€” leaves the other half for Gradio.
    safe_cores = max(1, os.cpu_count() // 2)

    # Chunk size of 10,000 prevents CPU lockups during inter-process pickling.
    chunk_size = 10000
    chunks = [log_pairs[i:i + chunk_size] for i in range(0, total_logs, chunk_size)]

    results = []

    print(f"πŸ”₯ Firing up {safe_cores} CPU Cores (Leaving remaining for UI)...")

    t_start = time.perf_counter()
    with ProcessPoolExecutor(max_workers=safe_cores) as executor:
        for chunk_result in executor.map(_process_chunk, chunks):
            results.extend(chunk_result)
    t_end = time.perf_counter()

    print(f"⏱️ True Wall-Clock Processing Time: {(t_end - t_start):.2f} seconds")

    df["predicted_label"] = [r["label"]      for r in results]
    df["tier_used"]       = [r["tier"]       for r in results]
    df["latency_ms"]      = [r["latency_ms"] for r in results]
    df["confidence"]      = [
        f"{r['confidence']:.1%}" if r["confidence"] is not None else "N/A"
        for r in results
    ]

    df.to_csv(output_path, index=False)
    return output_path, df


# Aliases
# `classify` is a second public name bound to the same function object as
# classify_logs, so both accept a list of (source, log_message) pairs.
classify = classify_logs