File size: 6,556 Bytes
"""
classifier.py - Production Rule-Based Email Classifier
=======================================================
Shared by SageMaker inference.py and Lambda handler.py.
Zero heavy dependencies - no numpy, no gymnasium.
Key fix vs lambda/classifier.py:
"legal" removed from _LEGAL_SECURITY_KW - it is a deception keyword
in phishing emails (TC-H-09), not a routing signal. Context field
is the authoritative source for legal routing.
"""
# -- Label maps ---------------------------------------------------------------
# Integer action codes -> human-readable labels, one table per output head.
URGENCY_LABELS = {
    0: "General",
    1: "Billing",
    2: "Security Breach",
}
ROUTING_LABELS = {
    0: "AI Auto-Reply",
    1: "Tech Support",
    2: "Legal",
}
RESOLUTION_LABELS = {
    0: "Archive",
    1: "Draft Reply",
    2: "Escalate",
}
# Keywords that send a security-context email to Legal rather than Tech Support
# (ransomware / extortion / IP theft).
# NOTE: "legal" is intentionally absent - it appears in phishing deception text.
_LEGAL_SECURITY_KW = {
    "attorney",
    "extortion",
    "lawsuit",
    "ransomware",
    "sue",
}

# Billing keywords that escalate to Legal. Only "refund" qualifies;
# "overdue" stays routine.
_BILLING_ESCALATE_KW = {
    "refund",
}

# Canonical keyword vocabulary (must match environment.py KEYWORD_VOCAB).
# This is a list, not a set: the order of entries is part of the contract.
KEYWORD_VOCAB = [
    "invoice",
    "payment",
    "overdue",
    "refund",

    "hacked",
    "breach",
    "unauthorized",
    "password",

    "crash",
    "error",
    "bug",
    "slow",

    "lawsuit",
    "legal",
    "attorney",
    "sue",

    "spam",
    "offer",
    "win",
    "free",

    "urgent",
    "critical",
    "angry",
    "threat",
]

# Words counted as negative hits during sentiment scoring.
_NEG_WORDS = {
    "angry", "breach", "bug", "crash", "critical",
    "error", "hacked", "lawsuit", "overdue", "ransomware",
    "refund", "threat", "unauthorized", "urgent",
}

# Words counted as positive hits during sentiment scoring.
_POS_WORDS = {
    "congratulations", "free", "offer", "prize", "win",
}
# -- Feature extraction --------------------------------------------------------
def extract_features(subject: str, body: str) -> dict:
    """
    Parse raw email text -> feature dict {keywords, sentiment, context}.

    Used when the caller does not supply pre-computed features.

    Args:
        subject: Email subject line. None is treated as an empty string.
        body: Email body text. None is treated as an empty string.

    Returns:
        dict with three keys:
            "keywords":  KEYWORD_VOCAB entries found in the text, in
                         KEYWORD_VOCAB order.
            "sentiment": "negative", "positive" or "neutral".
            "context":   "security", "legal", "billing", "tech", "spam"
                         or "general".
    """
    # `or ''` keeps a None subject/body from raising during concatenation.
    text = f"{subject or ''} {body or ''}".lower()

    # Break on every non-letter, not only on whitespace, so that "hacked!",
    # "refund," and "invoice:" still match their vocabulary entries.
    tokens = set("".join(ch if ch.isalpha() else " " for ch in text).split())

    keywords = [kw for kw in KEYWORD_VOCAB if kw in tokens]

    # Sentiment: negative needs strictly more negative than positive hits;
    # a tie with at least one positive hit is reported as positive.
    neg_hits = len(tokens & _NEG_WORDS)
    pos_hits = len(tokens & _POS_WORDS)
    if neg_hits > pos_hits:
        sentiment = "negative"
    elif pos_hits > 0:
        sentiment = "positive"
    else:
        sentiment = "neutral"

    # Context - the first rule with a matching trigger word wins.
    # Triggers are matched against the raw tokens rather than `keywords`:
    # "ransomware" is not in KEYWORD_VOCAB, so a keyword-only check could
    # never see it and a ransomware email would fall through to "general".
    context_rules = (
        ("security", {"hacked", "breach", "unauthorized", "ransomware"}),
        ("legal", {"lawsuit", "attorney", "sue"}),
        ("billing", {"invoice", "payment", "overdue", "refund"}),
        ("tech", {"crash", "error", "bug", "slow", "password"}),
        ("spam", {"spam", "offer", "win", "free"}),
    )
    context = next(
        (name for name, triggers in context_rules if tokens & triggers),
        "general",
    )

    return {"keywords": keywords, "sentiment": sentiment, "context": context}
# -- Classifier ----------------------------------------------------------------
def classify(email: dict) -> tuple[int, int, int]:
    """
    Deterministic rule-based classifier.

    Args:
        email: Feature dict. Reads "keywords" (iterable of keyword strings)
            and "context" (string). Either key may be missing or None, and
            both are matched case-insensitively.

    Returns:
        (urgency, routing, resolution) as plain ints.

    Decision tree - first match wins:
        Rule 1   legal context OR lawsuit/attorney/sue keywords  -> (2, 2, 2)
        Rule 2a  security + ransomware/extortion/hacked+breach   -> (2, 2, 2)
        Rule 2b  security (account-level attack)                 -> (2, 1, 2)
        Rule 3   billing + refund keyword                        -> (1, 2, 2)
        Rule 4   billing routine                                 -> (1, 0, 1)
        Rule 5   tech context or crash/error/bug/slow            -> (0, 1, 1)
        Rule 6   spam / default                                  -> (0, 0, 0)
    """
    # dict.get() only applies its default when the key is missing; the `or`
    # fallbacks also cover a key that is present but set to None.
    # Keywords are lowercased so they are matched the same way as context.
    kw = {str(word).lower() for word in (email.get("keywords") or [])}
    context = str(email.get("context") or "").lower()

    # Rule 1 - Legal
    if context == "legal" or kw & {"lawsuit", "attorney", "sue"}:
        return (2, 2, 2)

    # Rule 2 - Security
    if context == "security":
        if kw & _LEGAL_SECURITY_KW or {"hacked", "breach"} <= kw:
            return (2, 2, 2)  # ransomware / extortion -> Legal
        return (2, 1, 2)      # account-level attack -> Tech Support

    # Rule 3 & 4 - Billing
    if context == "billing":
        return (1, 2, 2) if kw & _BILLING_ESCALATE_KW else (1, 0, 1)

    # Rule 5 - Tech
    if context == "tech" or kw & {"crash", "error", "bug", "slow"}:
        return (0, 1, 1)

    # Rule 6 - Spam / default
    return (0, 0, 0)
# -- Decoder -------------------------------------------------------------------
def decode(urgency: int, routing: int, resolution: int) -> dict:
    """
    Translate integer action codes into their human-readable labels.

    Args:
        urgency: Key into URGENCY_LABELS.
        routing: Key into ROUTING_LABELS.
        resolution: Key into RESOLUTION_LABELS.

    Returns:
        dict with the keys "urgency", "routing" and "resolution".

    Raises:
        KeyError: If a code is not present in its label table.
    """
    labels = {}
    labels["urgency"] = URGENCY_LABELS[urgency]
    labels["routing"] = ROUTING_LABELS[routing]
    labels["resolution"] = RESOLUTION_LABELS[resolution]
    return labels
# -- Batch helper --------------------------------------------------------------
def classify_batch(emails: list[dict]) -> list[dict]:
    """
    Classify every email dict in `emails` with a single call.

    An entry with a truthy "context" value is treated as pre-computed
    features and classified as-is. Any other entry has its features
    extracted from its "subject" and "body" fields (each defaulting to "").

    Returns:
        One dict per input email, in input order: the decode() labels plus
        the raw codes under "urgency_code", "routing_code" and
        "resolution_code".
    """
    labelled = []
    for item in emails:
        source = (
            item
            if item.get("context")
            else extract_features(item.get("subject", ""), item.get("body", ""))
        )
        codes = classify(source)
        entry = decode(*codes)
        # Attach the integer codes next to their labels.
        (
            entry["urgency_code"],
            entry["routing_code"],
            entry["resolution_code"],
        ) = codes
        labelled.append(entry)
    return labelled
|