# Customs_SelfLearning_RMS_1 / simulation_engine.py
# (uploaded by rameshmoorthy — "Upload 12 files", commit 6a22ae7, verified)
"""
simulation_engine.py
Core simulation logic: synthetic declarations, DATE scoring, gATE exploration,
hybrid channel assignment, offence-database feedback loop.
All formulas follow Kim et al. (2022) IEEE TKDE.
"""
import numpy as np
import pandas as pd
from sklearn.preprocessing import MinMaxScaler
import random
import hashlib
# ── Risk area definitions ────────────────────────────────────────────────────
# Each risk area entry carries:
#   color             - hex colour used by the presentation layer
#   icon              - emoji shown next to the area name
#   base_illicit_rate - prior probability that a declaration in this area is illicit
#   rules             - red-flag rules; "weight" is the rule's base contribution to
#                       the area's risk score (weights within an area sum to ~1.0)
#   sub_areas         - optional finer split (only Revenue Leakage defines it; its
#                       rules then tag which sub-area they belong to via "sub")
RISK_AREAS = {
"Drugs & Narcotics": {
"color": "#C8102E",
"icon": "💊",
"base_illicit_rate": 0.12,
"rules": [
{"id": "DN-01", "name": "High-Risk Origin Country", "weight": 0.25},
{"id": "DN-02", "name": "Suspicious HS Code (29xx/30xx)", "weight": 0.22},
{"id": "DN-03", "name": "Known Narco Importer Profile", "weight": 0.28},
{"id": "DN-04", "name": "Anomalous Gross Weight/Value Ratio","weight": 0.15},
{"id": "DN-05", "name": "Transit via Risk Corridor", "weight": 0.10},
],
},
"Environmental/Plastic Waste": {
"color": "#00843D",
"icon": "♻️",
"base_illicit_rate": 0.07,
"rules": [
{"id": "EP-01", "name": "Prohibited Plastic Waste HS Code", "weight": 0.30},
{"id": "EP-02", "name": "Non-OECD Destination Mismatch", "weight": 0.20},
{"id": "EP-03", "name": "Missing Basel Convention Permit", "weight": 0.28},
{"id": "EP-04", "name": "Underweight vs Declared Volume", "weight": 0.12},
{"id": "EP-05", "name": "Repeat Environmental Violator", "weight": 0.10},
],
},
"Revenue Leakage": {
"color": "#F5A800",
"icon": "💰",
"sub_areas": ["Misclassification", "Undervaluation"],
"base_illicit_rate": 0.15,
"rules": [
{"id": "RL-01", "name": "HS Misclassification (Low-Duty Code)","weight": 0.22, "sub":"Misclassification"},
{"id": "RL-02", "name": "CIF/FOB Ratio Anomaly", "weight": 0.18, "sub":"Undervaluation"},
{"id": "RL-03", "name": "Unit Value Below World Price", "weight": 0.20, "sub":"Undervaluation"},
{"id": "RL-04", "name": "Related Party Transaction", "weight": 0.15, "sub":"Misclassification"},
{"id": "RL-05", "name": "High Tax-Gap Importer History", "weight": 0.15, "sub":"Misclassification"},
{"id": "RL-06", "name": "Invoice Currency Anomaly", "weight": 0.10, "sub":"Undervaluation"},
],
},
"IPR Enforcement": {
"color": "#9B59B6",
"icon": "©️",
"base_illicit_rate": 0.08,
"rules": [
{"id": "IP-01", "name": "Known Counterfeiting Source Country", "weight": 0.28},
{"id": "IP-02", "name": "Brand HS Code + Low Unit Value", "weight": 0.24},
{"id": "IP-03", "name": "Suspected Parallel Importer", "weight": 0.20},
{"id": "IP-04", "name": "Suspicious Packaging Descriptor", "weight": 0.15},
{"id": "IP-05", "name": "Unlicensed Declarant Agent", "weight": 0.13},
],
},
"Wildlife Smuggling": {
"color": "#E67E22",
"icon": "🦁",
"base_illicit_rate": 0.06,
"rules": [
{"id": "WL-01", "name": "CITES Appendix I/II HS Code", "weight": 0.30},
{"id": "WL-02", "name": "High-Risk Biodiversity Origin", "weight": 0.25},
{"id": "WL-03", "name": "Missing CITES Export Permit", "weight": 0.28},
{"id": "WL-04", "name": "Underdeclared Weight (Live Animals)", "weight": 0.10},
{"id": "WL-05", "name": "Known Wildlife Trafficker Profile", "weight": 0.07},
],
},
}
# ISO-2 country codes treated as high smuggling risk vs. low risk in the generator.
COUNTRIES_RISK = ["CN","PK","NG","CO","MM","BD","VN","TH","AF","IR","TR","MX"]
COUNTRIES_LOW = ["DE","FR","JP","US","GB","AU","CA","NL","SG","CH"]
# 4-digit HS-code prefixes used as high-risk vs. low-risk commodity pools.
HS_HIGH_RISK = ["2933","2934","3003","3004","3920","3923","6309","6310","9101","9102","0106","0307"]
HS_LOW_RISK = ["8471","8517","6203","6204","9403","8703","0901","1006","1701","7208"]
def generate_declarations(n: int = 1000, seed: int = 42) -> pd.DataFrame:
    """
    Generate ``n`` synthetic customs declarations, split evenly across the
    risk areas defined in ``RISK_AREAS`` (the last area absorbs rounding
    remainders so exactly ``n`` rows are produced).

    Each row carries trade attributes (origin country, HS code, FOB/CIF
    values, quantity, gross weight, taxes), a ground-truth ``is_illicit``
    flag drawn from the area's ``base_illicit_rate``, the recoverable
    ``true_revenue`` (non-zero only for illicit bills), and one binary
    ``hit_<RULE-ID>`` column per rule of the bill's risk area.

    Parameters
    ----------
    n : int
        Total number of declarations to generate.
    seed : int
        Seed for the NumPy generator; fixes the output for reproducibility.

    Returns
    -------
    pd.DataFrame
        One row per declaration.
    """
    rng = np.random.default_rng(seed)
    risk_areas = list(RISK_AREAS.keys())
    n_per_area = n // len(risk_areas)
    # Hoisted loop invariant: licit bills may still originate from a handful
    # of risky countries, so the licit pool mixes both lists. (Previously this
    # concatenation was rebuilt once per generated record.)
    licit_countries = COUNTRIES_LOW + COUNTRIES_RISK[:4]
    records = []
    bill_id = 1
    for area_idx, area_name in enumerate(risk_areas):
        area_cfg = RISK_AREAS[area_name]
        illicit_rate = area_cfg["base_illicit_rate"]
        # Last area takes whatever remains so len(records) ends at exactly n.
        n_area = n_per_area if area_idx < len(risk_areas) - 1 else n - len(records)
        for _ in range(n_area):
            is_illicit = rng.random() < illicit_rate
            country = rng.choice(COUNTRIES_RISK if is_illicit else licit_countries)
            hs = rng.choice(HS_HIGH_RISK if is_illicit else HS_LOW_RISK)
            fob = float(rng.lognormal(mean=8, sigma=1.5))
            cif = fob * rng.uniform(1.01, 1.15)  # CIF adds 1–15% freight/insurance
            qty = int(rng.integers(1, 500))
            weight = float(rng.lognormal(5, 1.2))
            taxes = fob * rng.uniform(0.05, 0.25)
            # Recoverable revenue exists only when the bill is truly illicit.
            revenue = float(rng.lognormal(5, 1.0)) if is_illicit else 0.0
            # Binary per-rule flags for this bill's risk area.
            rule_hits = _compute_rule_hits(area_name, country, hs, fob, cif,
                                           qty, weight, taxes, is_illicit, rng)
            records.append({
                "bill_id": f"SGD{bill_id:05d}",
                "risk_area": area_name,
                "country": country,
                "hs_code": hs,
                "fob_value": round(fob, 2),
                "cif_value": round(cif, 2),
                "quantity": qty,
                "gross_weight": round(weight, 2),
                "total_taxes": round(taxes, 2),
                "is_illicit": int(is_illicit),
                "true_revenue": round(revenue, 2),
                **rule_hits,
            })
            bill_id += 1
    return pd.DataFrame(records)
def _compute_rule_hits(area, country, hs, fob, cif, qty, weight, taxes, illicit, rng):
    """Return ``{"hit_<rule-id>": 0 or 1}`` flags for every rule of *area*.

    Illicit bills trip each rule with probability 0.70, licit ones 0.12.
    The remaining arguments are accepted for interface stability but are not
    consulted by this simplified generator; one ``rng.random()`` is drawn per
    rule, in rule-definition order.
    """
    trip_prob = 0.70 if illicit else 0.12
    return {
        f"hit_{rule['id']}": int(rng.random() < trip_prob)
        for rule in RISK_AREAS[area]["rules"]
    }
def compute_risk_scores(df: pd.DataFrame, weights: dict) -> pd.DataFrame:
    """
    DATE-inspired exploitation scoring (Kim et al. 2022).

    Builds a weighted rule-hit sum per risk area, min-max normalises it into
    ``fraud_score`` ∈ [0, 1] (plus a small fixed-seed jitter), then derives
    the quantities used by channel selection: a simplified revenue prediction,
    the uncertainty score unc_i = -1.8 * |ŷ - 0.5| + 1 (maximal at ŷ ≈ 0.5),
    the exploration scale factor S_i = unc_i × log(pred_revenue + ε), and an
    area-weighted composite score.
    """
    out = df.copy()
    raw_scores = np.zeros(len(out))
    for area_name, area_cfg in RISK_AREAS.items():
        in_area = out["risk_area"] == area_name
        weighted_sum = np.zeros(in_area.sum())
        for rule in area_cfg["rules"]:
            hit_col = f"hit_{rule['id']}"
            if hit_col in out.columns:
                # Caller-supplied weight wins; fall back to the rule default.
                rule_w = weights.get(rule["id"], rule["weight"])
                weighted_sum += out.loc[in_area, hit_col].values * rule_w
        raw_scores[in_area] = weighted_sum
    # Normalise to [0, 1], then jitter slightly (fixed seed) for realism.
    out["fraud_score"] = MinMaxScaler().fit_transform(raw_scores.reshape(-1, 1)).flatten()
    jitter = np.random.default_rng(99).uniform(-0.05, 0.05, len(out))
    out["fraud_score"] = np.clip(out["fraud_score"] + jitter, 0, 1)
    # Simplified DATE revenue head: FOB × score × random factor (fixed seed).
    out["pred_revenue"] = out["fob_value"] * out["fraud_score"] * np.random.default_rng(77).uniform(0.5, 1.5, len(out))
    # Uncertainty peaks where the classifier is least sure (score ≈ 0.5).
    out["uncertainty_score"] = (-1.8 * np.abs(out["fraud_score"] - 0.5) + 1).clip(0.1, 1.0)
    # Scale factor S_i = unc_i × log(pred_revenue + ε).
    out["scale_factor"] = out["uncertainty_score"] * np.log(out["pred_revenue"] + 1e-6)
    # Composite: fraud score weighted by the area's strategic priority.
    area_weights = {"Drugs & Narcotics": 0.30, "Environmental/Plastic Waste": 0.15,
                    "Revenue Leakage": 0.25, "IPR Enforcement": 0.15, "Wildlife Smuggling": 0.15}
    out["risk_score_raw"] = raw_scores
    out["area_weight"] = out["risk_area"].map(area_weights)
    out["composite_score"] = out["fraud_score"] * out["area_weight"]
    return out
def assign_channels(df: pd.DataFrame, bandwidth: float = 0.10,
                    exploration_ratio: float = 0.10) -> pd.DataFrame:
    """
    Hybrid selection strategy (Algorithm 4, Kim et al. 2022):
      - (1 - exploration_ratio) × bandwidth → DATE exploitation → RED/YELLOW
      - exploration_ratio × bandwidth      → gATE exploration  → YELLOW
      - everything else                    → GREEN
    """
    out = df.copy().reset_index(drop=True)
    budget = int(len(out) * bandwidth)
    n_exploit = int(budget * (1 - exploration_ratio))
    n_explore = budget - n_exploit
    # Exploitation slice: the highest predicted fraud scores.
    exploit_idx = out["fraud_score"].nlargest(n_exploit).index.tolist()
    # Exploration slice: highest scale factor among unselected bills,
    # oversampled 3× and then thinned by equal spacing as a cheap stand-in
    # for k-means++ diversity sampling.
    pool = out.index.difference(exploit_idx)
    candidates = out.loc[pool, "scale_factor"].nlargest(n_explore * 3)
    explore_idx = candidates.index.tolist()
    if len(explore_idx) > n_explore:
        stride = max(1, len(explore_idx) // n_explore)
        spaced = range(0, min(len(explore_idx), n_explore * stride), stride)
        explore_idx = [explore_idx[i] for i in spaced][:n_explore]
    out["channel"] = "GREEN"
    # Within the exploitation slice, scores at or above the median go RED,
    # the rest YELLOW.
    red_cut = out.loc[exploit_idx, "fraud_score"].quantile(0.5) if exploit_idx else 0.5
    for i in exploit_idx:
        out.at[i, "channel"] = "RED" if out.at[i, "fraud_score"] >= red_cut else "YELLOW"
    for i in explore_idx:
        out.at[i, "channel"] = "YELLOW"  # exploration always doc-check first
    out["is_exploration"] = 0
    out.loc[explore_idx, "is_exploration"] = 1
    return out
def simulate_inspection_outcomes(df: pd.DataFrame, seed: int = 42) -> pd.DataFrame:
"""
Simulate officer inspection outcomes for RED and YELLOW channels.
RED channel has higher detection probability.
"""
rng = np.random.default_rng(seed)
df = df.copy()
df["inspection_outcome"] = "NOT_INSPECTED"
df["detected_revenue"] = 0.0
df["added_to_offence_db"] = 0
for idx, row in df.iterrows():
if row["channel"] == "RED":
detect_prob = 0.45 + 0.35 * row["fraud_score"]
if row["is_illicit"] == 1 or rng.random() < detect_prob:
if rng.random() < (0.70 if row["is_illicit"] else 0.15):
df.at[idx, "inspection_outcome"] = "FRAUD_DETECTED"
df.at[idx, "detected_revenue"] = row["true_revenue"] * rng.uniform(0.8, 1.0)
df.at[idx, "added_to_offence_db"] = 1
else:
df.at[idx, "inspection_outcome"] = "CLEAN"
else:
df.at[idx, "inspection_outcome"] = "CLEAN"
elif row["channel"] == "YELLOW":
detect_prob = 0.25 + 0.20 * row["fraud_score"]
if row["is_illicit"] == 1 or rng.random() < detect_prob:
if rng.random() < (0.40 if row["is_illicit"] else 0.08):
df.at[idx, "inspection_outcome"] = "FRAUD_DETECTED"
df.at[idx, "detected_revenue"] = row["true_revenue"] * rng.uniform(0.5, 0.85)
df.at[idx, "added_to_offence_db"] = 1
else:
df.at[idx, "inspection_outcome"] = "DOC_QUERY"
else:
df.at[idx, "inspection_outcome"] = "CLEAN"
return df
def compute_updated_weights(df: pd.DataFrame, base_weights: dict) -> dict:
    """
    After simulation, update rule weights based on detection efficiency.

    For every rule, efficiency = (hits on detected frauds in its area) /
    (total hits in its area); the rule weight grows by 0.05 × efficiency,
    capped at 0.60 so no single rule can dominate the score.
    """
    new_weights = dict(base_weights)
    fraud_rows = df[df["inspection_outcome"] == "FRAUD_DETECTED"]
    for area, cfg in RISK_AREAS.items():
        in_area = df["risk_area"] == area
        if not in_area.any():
            continue
        area_frauds = fraud_rows[fraud_rows["risk_area"] == area]
        for rule in cfg["rules"]:
            rid = rule["id"]
            col = f"hit_{rid}"
            if col not in df.columns:
                continue
            hits_total = df.loc[in_area, col].sum()
            if hits_total > 0:
                hits_detected = area_frauds[col].sum() if col in area_frauds.columns else 0
                efficiency = hits_detected / hits_total
                current = new_weights.get(rid, rule["weight"])
                # Boost proportional to how well the rule found real fraud.
                new_weights[rid] = round(min(current + 0.05 * efficiency, 0.60), 4)
    return new_weights
def build_rule_hit_table(df: pd.DataFrame) -> pd.DataFrame:
    """Table 1: How many bills were hit by each rule (and combinations)."""
    fraud_df = df[df["is_illicit"] == 1]
    records = []
    for area, cfg in RISK_AREAS.items():
        area_slice = df[df["risk_area"] == area]
        for rule in cfg["rules"]:
            col = f"hit_{rule['id']}"
            if col not in df.columns:
                continue
            total_hits = int(df[col].sum())
            fraud_hits = int(fraud_df[col].sum())
            records.append({
                "Risk Area": area,
                "Rule ID": rule["id"],
                "Rule Name": rule["name"],
                "Base Weight": rule["weight"],
                "Total Bills Hit": total_hits,
                "Area Bills Hit": int(area_slice[col].sum()),
                "Fraud Bills Hit": fraud_hits,
                # Precision: share of this rule's hits that were truly illicit.
                "Precision (%)": round(100 * fraud_hits / total_hits, 1) if total_hits > 0 else 0.0,
            })
    return pd.DataFrame(records)
def build_channel_table(df: pd.DataFrame) -> pd.DataFrame:
    """Table 2: Bills per channel, exploit vs explore breakdown."""
    n_total = len(df)
    records = []
    for channel in ("RED", "YELLOW", "GREEN"):
        bills = df[df["channel"] == channel]
        explorers = bills[bills["is_exploration"] == 1]
        records.append({
            "Channel": channel,
            "Total Bills": len(bills),
            "% of Total": round(100 * len(bills) / n_total, 1),
            "Exploitation Bills": len(bills) - len(explorers),
            "Exploration Bills": len(explorers),
            "Avg Risk Score": round(bills["fraud_score"].mean(), 3),
            "Illicit Count": int(bills["is_illicit"].sum()),
            "Detected Fraud": int((bills["inspection_outcome"] == "FRAUD_DETECTED").sum()),
            "Revenue Collected ($)": round(bills["detected_revenue"].sum(), 2),
        })
    return pd.DataFrame(records)
def build_exploration_discovery_table(df: pd.DataFrame) -> pd.DataFrame:
    """Table 3: How exploration added new bills to risky areas."""
    explored = df[df["is_exploration"] == 1]
    records = []
    for area in RISK_AREAS.keys():
        bills = explored[explored["risk_area"] == area]
        frauds = bills[bills["added_to_offence_db"] == 1]
        n_bills = len(bills)
        n_frauds = len(frauds)
        records.append({
            "Risk Area": area,
            "Exploration Bills": n_bills,
            "New Frauds Unearthed": n_frauds,
            "Discovery Rate (%)": round(100 * n_frauds / n_bills, 1) if n_bills else 0,
            "Avg Uncertainty Score": round(bills["uncertainty_score"].mean(), 3) if n_bills else 0,
            "New Revenue Recovered ($)": round(frauds["detected_revenue"].sum(), 2),
        })
    return pd.DataFrame(records)
def build_offence_db_table(df: pd.DataFrame) -> pd.DataFrame:
    """Table 4: Offence database built from Red+Yellow feedback."""
    offences = df[df["added_to_offence_db"] == 1]
    records = []
    for area in RISK_AREAS.keys():
        entries = offences[offences["risk_area"] == area]
        records.append({
            "Risk Area": area,
            "Total Added": len(entries),
            "From Exploitation": len(entries[entries["is_exploration"] == 0]),
            "From Exploration": len(entries[entries["is_exploration"] == 1]),
            "Unique Countries": entries["country"].nunique(),
            "Unique HS Codes": entries["hs_code"].nunique(),
            "Total Revenue ($)": round(entries["detected_revenue"].sum(), 2),
        })
    return pd.DataFrame(records)
def build_risk_score_table(df: pd.DataFrame, updated_weights: dict) -> pd.DataFrame:
    """Table 5: Risk scoring per transaction with weight evolution."""
    # Cap the display at 200 bills; fixed random_state keeps the sample stable.
    sample = df.sample(min(200, len(df)), random_state=42).copy()
    all_rules = [r for a in RISK_AREAS.values() for r in a["rules"]]
    rule_ids = [r["id"] for r in all_rules]
    records = []
    for _, bill in sample.iterrows():
        # Rules of other areas show up as NaN columns, which never equal 1.
        hit_ids = [rid for rid in rule_ids if bill.get(f"hit_{rid}", 0) == 1]
        weight_before = sum(r["weight"] for r in all_rules if r["id"] in hit_ids)
        weight_after = sum(updated_weights.get(rid, 0) for rid in hit_ids)
        records.append({
            "Bill ID": bill["bill_id"],
            "Risk Area": bill["risk_area"],
            "Channel": bill["channel"],
            "Risk Score": round(bill["fraud_score"], 3),
            "Rules Hit": len(hit_ids),
            "Original Weight": round(weight_before, 3),
            "Updated Weight": round(weight_after, 3),
            "Weight Δ": round(weight_after - weight_before, 4),
            "Outcome": bill["inspection_outcome"],
        })
    return pd.DataFrame(records)
def compute_efficiency_metrics(df: pd.DataFrame) -> dict:
    """WCO Efficiency Index = Detection Rate / Selection Rate.

    Returns the hybrid model's metrics alongside a hard-coded static-model
    baseline (figures from the reference paper's comparison table) and the
    percentage improvement of the efficiency index over that baseline.
    """
    n_bills = len(df)
    inspected = df[df["channel"].isin(["RED", "YELLOW"])]
    frauds_found = df[df["inspection_outcome"] == "FRAUD_DETECTED"]
    frauds_true = df[df["is_illicit"] == 1]
    sel_rate = len(inspected) / n_bills
    det_rate = len(frauds_found) / len(frauds_true) if len(frauds_true) > 0 else 0
    prec = len(frauds_found) / len(inspected) if len(inspected) > 0 else 0
    eff_index = det_rate / sel_rate if sel_rate > 0 else 0
    revenue_total = df["detected_revenue"].sum()
    # Baseline static model metrics (from paper Table in reference).
    baseline = {
        "selection_rate": sel_rate,
        "detection_rate": 0.041,
        "precision": 0.041 / sel_rate if sel_rate > 0 else 0.04,
        "efficiency_index": 0.41,
        "revenue": revenue_total * 0.40,
    }
    hybrid = {
        "selection_rate": sel_rate,
        "detection_rate": round(det_rate, 4),
        "precision": round(prec, 4),
        "efficiency_index": round(eff_index, 3),
        "revenue": round(revenue_total, 2),
    }
    return {"baseline": baseline, "hybrid": hybrid,
            "improvement_pct": round(100 * (eff_index - 0.41) / 0.41, 1)}
def get_default_weights() -> dict:
    """Return the baseline ``{rule-id: weight}`` mapping from ``RISK_AREAS``."""
    defaults = {}
    for area_cfg in RISK_AREAS.values():
        for rule in area_cfg["rules"]:
            defaults[rule["id"]] = rule["weight"]
    return defaults