File size: 3,755 Bytes
2222383
 
 
 
 
 
 
20781e1
2222383
 
 
 
 
 
 
 
 
 
 
5ef292a
 
2222383
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
5ef292a
2222383
 
 
 
 
 
 
 
 
 
 
 
 
 
8e569c7
 
 
 
 
175e60b
2222383
8e569c7
2222383
5ef292a
2222383
175e60b
2222383
 
 
 
34a0633
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
"""
processor_llm.py — Tier 3: LLM-based Classifier
"""
from __future__ import annotations
import os
import time
import logging

from typing import Optional

logger = logging.getLogger(__name__)

# ── Config ─────────────────────────────────────────────────────────────────
# Hugging Face API token read from the environment; None when the variable is unset.
HF_TOKEN: Optional[str] = os.environ.get("HF_TOKEN")
# Model identifier sent with every chat-completion request.
LLM_MODEL: str = "mistralai/Mistral-7B-Instruct-v0.3"

# Labels this tier may assign; any reply matching none of them becomes "Unclassified".
VALID_CATEGORIES: list[str] = ["Workflow Error", "Deprecation Warning"]

# Number of retries allowed after the first attempt.
MAX_RETRIES: int = 2
# Pause before the first retry, in seconds (passed to time.sleep); doubled after each failure.
RETRY_DELAY_SEC: float = 1.0
# Timeout handed to the inference client (presumably seconds — confirm against the client docs).
REQUEST_TIMEOUT: int = 5

# System message sent with every request. It pins the model to emitting a bare
# category name so the reply can be matched against the known categories.
# The dash below is a real em dash (U+2014); the previous text carried the
# mis-decoded byte sequence instead, which was sent to the model verbatim.
SYSTEM_PROMPT = (
    "You are an enterprise log classifier. "
    "Classify log messages into exactly one category. "
    "Return ONLY the category name — no explanation, no punctuation."
)

# Labelled sample logs that are embedded in the prompt ahead of the log to
# classify. Each entry is a dict with a "log" text and its expected "label".
FEW_SHOT_EXAMPLES = [
    {"log": sample_text, "label": sample_label}
    for sample_text, sample_label in (
        (
            "Case escalation for ticket ID 7324 failed because the assigned support agent is no longer active.",
            "Workflow Error",
        ),
        (
            "The 'BulkEmailSender' feature is no longer supported. Use 'EmailCampaignManager' instead.",
            "Deprecation Warning",
        ),
        (
            "Invoice generation aborted for order ID 8910 due to invalid tax calculation module.",
            "Workflow Error",
        ),
    )
]

def _build_messages(log_msg: str) -> list[dict]:
    """Build the chat messages for classifying one log line.

    The user message lists the quoted ``VALID_CATEGORIES``, tells the model to
    answer "Unclassified" when none fits, replays every entry of
    ``FEW_SHOT_EXAMPLES`` as a ``Log:`` / ``Category:`` pair, and ends with
    the target log followed by an open ``Category:`` for the model to fill.

    Args:
        log_msg: The log line to classify.

    Returns:
        A two-element list: the system message carrying ``SYSTEM_PROMPT``,
        then the user message described above.
    """
    quoted_labels = ", ".join(f'"{name}"' for name in VALID_CATEGORIES)

    pieces = [
        f'Classify the following log into one of these categories: {quoted_labels}.\n',
        'If none fits, return "Unclassified".\n\n',
    ]
    pieces.extend(
        f'Log: {shot["log"]}\nCategory: {shot["label"]}\n\n'
        for shot in FEW_SHOT_EXAMPLES
    )
    # Leave the final category blank: the model's reply completes it.
    pieces.append(f"Log: {log_msg}\nCategory:")
    prompt = "".join(pieces)

    system_message = {"role": "system", "content": SYSTEM_PROMPT}
    user_message = {"role": "user", "content": prompt}
    return [system_message, user_message]

def _normalize(raw: str) -> str:
    raw = raw.strip().strip('"').strip("'")
    for cat in VALID_CATEGORIES:
        if cat.lower() in raw.lower():
            return cat
    return "Unclassified"

def classify_with_llm(log_msg: str) -> str:
    """Classify a single log message with the hosted LLM.

    Sends the prompt built by ``_build_messages`` to ``LLM_MODEL`` through
    ``huggingface_hub.InferenceClient`` and maps the reply onto a category
    with ``_normalize``. The request is attempted up to ``MAX_RETRIES + 1``
    times; between failed attempts the function sleeps, starting at
    ``RETRY_DELAY_SEC`` and doubling the delay each time.

    Args:
        log_msg: The log line to classify.

    Returns:
        An entry of ``VALID_CATEGORIES``, or ``"Unclassified"`` when
        ``HF_TOKEN`` is not set, the API responds with HTTP 402, the reply
        is empty or names no known category, or every attempt fails.
        Exceptions raised by the request itself are logged, not propagated.
    """
    if not HF_TOKEN:
        logger.warning("[LLM] HF_TOKEN not set — returning Unclassified")
        return "Unclassified"

    # Deferred import: huggingface_hub is only loaded once a token is configured.
    from huggingface_hub import InferenceClient

    client = InferenceClient(token=HF_TOKEN, timeout=REQUEST_TIMEOUT)
    # The prompt is the same for every attempt, so build it once.
    messages = _build_messages(log_msg)
    delay = RETRY_DELAY_SEC
    total_attempts = MAX_RETRIES + 1

    for attempt in range(1, total_attempts + 1):
        try:
            response = client.chat.completions.create(
                model=LLM_MODEL,
                messages=messages,
                max_tokens=15,
                temperature=0.1,
            )
            # The message content may be None; treat that as an empty reply
            # rather than letting it raise and burn the remaining retries.
            raw = response.choices[0].message.content or ""
            label = _normalize(raw)

            logger.debug("[LLM] Attempt %d: '%s' → '%s'", attempt, raw.strip(), label)
            return label

        except Exception as e:
            # A 402 is reported as exhausted credits; retrying cannot help,
            # so give up immediately. The status is read from the attached
            # response object (if any) rather than parsed from the message.
            status = getattr(getattr(e, "response", None), "status_code", None)
            if status == 402:
                logger.error("[LLM] Credits Finished (402). Returning Unclassified.")
                return "Unclassified"

            if attempt < total_attempts:
                logger.warning(
                    "[LLM] Attempt %d failed (%s), retrying in %.1fs…",
                    attempt,
                    type(e).__name__,
                    delay,
                )
                time.sleep(delay)
                delay *= 2  # exponential back-off
            else:
                logger.error("[LLM] All attempts failed. Last error: %s", e)

    return "Unclassified"

def classify_batch_llm(log_msgs: list[str]) -> list[str]:
    """Classify several log messages, one ``classify_with_llm`` call each.

    Messages are processed sequentially in the order given.

    Args:
        log_msgs: Log lines to classify.

    Returns:
        One label per input message, in the same order.
    """
    labels = [classify_with_llm(entry) for entry in log_msgs]
    return labels