File size: 6,556 Bytes
6ba100e
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
"""
classifier.py — Production Rule-Based Email Classifier
=======================================================
Shared by SageMaker inference.py and Lambda handler.py.
Zero heavy dependencies — no numpy, no gymnasium.

Key fix vs lambda/classifier.py:
  "legal" removed from _LEGAL_SECURITY_KW — it is a deception keyword
  in phishing emails (TC-H-09), not a routing signal. Context field
  is the authoritative source for legal routing.
"""

# ── Label maps ────────────────────────────────────────────────────────────────
URGENCY_LABELS = {
    0: "General",
    1: "Billing",
    2: "Security Breach",
}
ROUTING_LABELS = {
    0: "AI Auto-Reply",
    1: "Tech Support",
    2: "Legal",
}
RESOLUTION_LABELS = {
    0: "Archive",
    1: "Draft Reply",
    2: "Escalate",
}

# Keywords that send a security email to Legal (ransomware / extortion /
# IP theft) instead of Tech Support.
# NOTE: "legal" is intentionally absent — it shows up in phishing deception text.
_LEGAL_SECURITY_KW = {
    "lawsuit",
    "attorney",
    "sue",
    "ransomware",
    "extortion",
}

# Billing keywords that escalate to Legal. Only "refund" does; "overdue"
# stays routine.
_BILLING_ESCALATE_KW = {"refund"}

# Canonical keyword vocabulary (must match environment.py KEYWORD_VOCAB).
# Order is part of the contract, so keep it unchanged.
KEYWORD_VOCAB = [
    "invoice", "payment", "overdue", "refund",
    "hacked", "breach", "unauthorized", "password",
    "crash", "error", "bug", "slow",
    "lawsuit", "legal", "attorney", "sue",
    "spam", "offer", "win", "free",
    "urgent", "critical", "angry", "threat",
]

# Word lists used for sentiment scoring.
_NEG_WORDS = {
    "angry",
    "threat",
    "hacked",
    "breach",
    "lawsuit",
    "overdue",
    "unauthorized",
    "ransomware",
    "critical",
    "urgent",
    "error",
    "crash",
    "bug",
    "refund",
}
_POS_WORDS = {
    "win",
    "free",
    "offer",
    "congratulations",
    "prize",
}


# ── Feature extraction ────────────────────────────────────────────────────────

def extract_features(subject: str, body: str) -> dict:
    """
    Parse raw email text → feature dict {keywords, sentiment, context}.
    Used when the caller does not supply pre-computed features.

    Args:
        subject: Email subject line; ``None`` is treated as empty.
        body:    Email body text; ``None`` is treated as empty.

    Returns:
        dict with three keys:
          "keywords"  — KEYWORD_VOCAB entries found in the text, in
                        vocabulary order.
          "sentiment" — "negative", "positive" or "neutral".
          "context"   — "security", "legal", "billing", "tech", "spam"
                        or "general".
    """
    text = f"{subject or ''} {body or ''}".lower()

    # Break on every non-alphanumeric character, not only whitespace, so that
    # punctuation glued to a word ("hacked!", "refund.", "invoice,") does not
    # hide the keyword.
    tokens = set("".join(ch if ch.isalnum() else " " for ch in text).split())

    keywords = [kw for kw in KEYWORD_VOCAB if kw in tokens]

    # Sentiment: negative only when negative hits strictly outnumber positive
    # ones; otherwise positive if any positive word is present.
    neg_hits = len(tokens & _NEG_WORDS)
    pos_hits = len(tokens & _POS_WORDS)
    if neg_hits > pos_hits:
        sentiment = "negative"
    elif pos_hits > 0:
        sentiment = "positive"
    else:
        sentiment = "neutral"

    # Context — priority order matches the classifier decision tree.
    # Matched against the raw tokens rather than the vocabulary-filtered
    # keyword list: "ransomware" is not in KEYWORD_VOCAB, so a keyword-only
    # check could never see it. The returned keywords stay limited to
    # KEYWORD_VOCAB.
    context_rules = (
        ("security", {"hacked", "breach", "unauthorized", "ransomware"}),
        ("legal", {"lawsuit", "attorney", "sue"}),
        ("billing", {"invoice", "payment", "overdue", "refund"}),
        ("tech", {"crash", "error", "bug", "slow", "password"}),
        ("spam", {"spam", "offer", "win", "free"}),
    )
    context = next(
        (name for name, words in context_rules if tokens & words),
        "general",
    )

    return {"keywords": keywords, "sentiment": sentiment, "context": context}


# ── Classifier ────────────────────────────────────────────────────────────────

def classify(email: dict) -> tuple[int, int, int]:
    """
    Deterministic rule-based classifier.
    Returns (urgency, routing, resolution) as plain ints.

    Args:
        email: Feature dict. Reads "keywords" (iterable of strings) and
               "context" (string). Both are optional; a missing or None
               value is treated as empty. Matching ignores case and
               surrounding whitespace.

    Decision tree — first match wins:
      Rule 1  legal context OR lawsuit/attorney/sue keywords → (2, 2, 2)
      Rule 2a security + ransomware/extortion/hacked+breach  → (2, 2, 2)
      Rule 2b security (account-level attack)                → (2, 1, 2)
      Rule 3  billing + refund keyword                       → (1, 2, 2)
      Rule 4  billing routine                                → (1, 0, 1)
      Rule 5  tech context or crash/error/bug/slow           → (0, 1, 1)
      Rule 6  spam / default                                 → (0, 0, 0)
    """
    # `or` fallbacks (rather than .get defaults) so an explicit None value,
    # e.g. a JSON null, behaves like a missing key instead of raising.
    raw_keywords = email.get("keywords") or ()
    kw = {k.strip().lower() for k in raw_keywords if isinstance(k, str)}
    context = (email.get("context") or "").strip().lower()

    # Rule 1 — Legal
    if context == "legal" or kw & {"lawsuit", "attorney", "sue"}:
        return (2, 2, 2)

    # Rule 2 — Security
    if context == "security":
        if kw & _LEGAL_SECURITY_KW or ("hacked" in kw and "breach" in kw):
            return (2, 2, 2)   # ransomware / extortion → Legal
        return (2, 1, 2)       # account-level attack   → Tech Support

    # Rule 3 & 4 — Billing
    if context == "billing":
        return (1, 2, 2) if kw & _BILLING_ESCALATE_KW else (1, 0, 1)

    # Rule 5 — Tech
    if context == "tech" or kw & {"crash", "error", "bug", "slow"}:
        return (0, 1, 1)

    # Rule 6 — Spam / default
    return (0, 0, 0)


# ── Decoder ───────────────────────────────────────────────────────────────────

def decode(urgency: int, routing: int, resolution: int) -> dict:
    """
    Translate the three integer action codes into their label strings.

    Returns a dict with keys "urgency", "routing" and "resolution". A code
    missing from the corresponding label map raises KeyError.
    """
    urgency_label = URGENCY_LABELS[urgency]
    routing_label = ROUTING_LABELS[routing]
    resolution_label = RESOLUTION_LABELS[resolution]
    return dict(
        urgency=urgency_label,
        routing=routing_label,
        resolution=resolution_label,
    )


# ── Batch helper ─────────────────────────────────────────────────────────────

def classify_batch(emails: list[dict]) -> list[dict]:
    """
    Classify a list of email dicts in one call.
    Each dict may contain pre-computed features OR raw subject+body.

    An email with a non-empty "context" is classified as-is. Otherwise its
    features are derived from "subject" and "body", and any "keywords" it
    carried are not used.

    Returns:
        One dict per input email, in input order: the decode() labels
        ("urgency", "routing", "resolution") plus the integer codes under
        "urgency_code", "routing_code" and "resolution_code".
    """
    results = []
    for email in emails:
        if email.get("context"):
            features = email
        else:
            # `or ""` instead of a .get default: a key that is present but
            # null (None) must also fall back to the empty string, otherwise
            # feature extraction fails on None.
            features = extract_features(
                email.get("subject") or "",
                email.get("body") or "",
            )

        u, r, res = classify(features)
        results.append({
            **decode(u, r, res),
            "urgency_code": u,
            "routing_code": r,
            "resolution_code": res,
        })
    return results