| """ |
| simulation_engine.py |
| Core simulation logic: synthetic declarations, DATE scoring, gATE exploration, |
| hybrid channel assignment, offence-database feedback loop. |
| All formulas follow Kim et al. (2022) IEEE TKDE. |
| """ |
|
|
| import numpy as np |
| import pandas as pd |
| from sklearn.preprocessing import MinMaxScaler |
| import random |
| import hashlib |
|
|
| |
# Rule catalogue per customs risk area.
# Each area config carries:
#   color / icon        - presentation hints for the UI layer
#   base_illicit_rate   - prior probability that a synthetic declaration in
#                         this area is illicit (drives generate_declarations)
#   rules               - profiling rules; "weight" values within an area sum
#                         to ~1.0 and seed the DATE risk-score weights
#   sub_areas (optional)- finer categories referenced by a rule's "sub" key
RISK_AREAS = {
    "Drugs & Narcotics": {
        "color": "#C8102E",
        "icon": "💊",
        "base_illicit_rate": 0.12,
        "rules": [
            {"id": "DN-01", "name": "High-Risk Origin Country", "weight": 0.25},
            {"id": "DN-02", "name": "Suspicious HS Code (29xx/30xx)", "weight": 0.22},
            {"id": "DN-03", "name": "Known Narco Importer Profile", "weight": 0.28},
            {"id": "DN-04", "name": "Anomalous Gross Weight/Value Ratio","weight": 0.15},
            {"id": "DN-05", "name": "Transit via Risk Corridor", "weight": 0.10},
        ],
    },
    "Environmental/Plastic Waste": {
        "color": "#00843D",
        "icon": "♻️",
        "base_illicit_rate": 0.07,
        "rules": [
            {"id": "EP-01", "name": "Prohibited Plastic Waste HS Code", "weight": 0.30},
            {"id": "EP-02", "name": "Non-OECD Destination Mismatch", "weight": 0.20},
            {"id": "EP-03", "name": "Missing Basel Convention Permit", "weight": 0.28},
            {"id": "EP-04", "name": "Underweight vs Declared Volume", "weight": 0.12},
            {"id": "EP-05", "name": "Repeat Environmental Violator", "weight": 0.10},
        ],
    },
    "Revenue Leakage": {
        "color": "#F5A800",
        "icon": "💰",
        "sub_areas": ["Misclassification", "Undervaluation"],
        "base_illicit_rate": 0.15,
        "rules": [
            {"id": "RL-01", "name": "HS Misclassification (Low-Duty Code)","weight": 0.22, "sub":"Misclassification"},
            {"id": "RL-02", "name": "CIF/FOB Ratio Anomaly", "weight": 0.18, "sub":"Undervaluation"},
            {"id": "RL-03", "name": "Unit Value Below World Price", "weight": 0.20, "sub":"Undervaluation"},
            {"id": "RL-04", "name": "Related Party Transaction", "weight": 0.15, "sub":"Misclassification"},
            {"id": "RL-05", "name": "High Tax-Gap Importer History", "weight": 0.15, "sub":"Misclassification"},
            {"id": "RL-06", "name": "Invoice Currency Anomaly", "weight": 0.10, "sub":"Undervaluation"},
        ],
    },
    "IPR Enforcement": {
        "color": "#9B59B6",
        "icon": "©️",
        "base_illicit_rate": 0.08,
        "rules": [
            {"id": "IP-01", "name": "Known Counterfeiting Source Country", "weight": 0.28},
            {"id": "IP-02", "name": "Brand HS Code + Low Unit Value", "weight": 0.24},
            {"id": "IP-03", "name": "Suspected Parallel Importer", "weight": 0.20},
            {"id": "IP-04", "name": "Suspicious Packaging Descriptor", "weight": 0.15},
            {"id": "IP-05", "name": "Unlicensed Declarant Agent", "weight": 0.13},
        ],
    },
    "Wildlife Smuggling": {
        "color": "#E67E22",
        "icon": "🦁",
        "base_illicit_rate": 0.06,
        "rules": [
            {"id": "WL-01", "name": "CITES Appendix I/II HS Code", "weight": 0.30},
            {"id": "WL-02", "name": "High-Risk Biodiversity Origin", "weight": 0.25},
            {"id": "WL-03", "name": "Missing CITES Export Permit", "weight": 0.28},
            {"id": "WL-04", "name": "Underdeclared Weight (Live Animals)", "weight": 0.10},
            {"id": "WL-05", "name": "Known Wildlife Trafficker Profile", "weight": 0.07},
        ],
    },
}
|
|
# ISO 3166 alpha-2 country codes used to bias the synthetic origin-country
# draw: illicit declarations sample from the risk list, licit ones from the
# low-risk list plus a slice of the risk list (see generate_declarations).
COUNTRIES_RISK = ["CN","PK","NG","CO","MM","BD","VN","TH","AF","IR","TR","MX"]
COUNTRIES_LOW  = ["DE","FR","JP","US","GB","AU","CA","NL","SG","CH"]
# 4-digit HS heading prefixes used the same way for the hs_code field.
HS_HIGH_RISK = ["2933","2934","3003","3004","3920","3923","6309","6310","9101","9102","0106","0307"]
HS_LOW_RISK  = ["8471","8517","6203","6204","9403","8703","0901","1006","1701","7208"]
|
|
|
|
def generate_declarations(n: int = 1000, seed: int = 42) -> pd.DataFrame:
    """Create a synthetic customs-declaration dataset.

    Bills are split evenly across the risk areas (the last area absorbs any
    remainder so the total is exactly ``n``).  Each bill is flagged illicit
    with the area's ``base_illicit_rate`` prior; illicit bills draw origin
    countries and HS codes from the high-risk pools.  Rule-hit indicator
    columns (``hit_<RULE-ID>``) are appended via ``_compute_rule_hits``.

    Parameters
    ----------
    n : total number of declarations to generate.
    seed : seed for the numpy Generator (fully deterministic output).
    """
    rng = np.random.default_rng(seed)
    area_names = list(RISK_AREAS.keys())
    per_area = n // len(area_names)

    rows = []
    serial = 1
    for pos, area_name in enumerate(area_names):
        cfg = RISK_AREAS[area_name]
        illicit_prior = cfg["base_illicit_rate"]
        # Last area takes whatever is left so the totals add up to n exactly.
        count = per_area if pos < len(area_names) - 1 else n - len(rows)

        for _ in range(count):
            fraudulent = rng.random() < illicit_prior
            origin = rng.choice(COUNTRIES_RISK if fraudulent else COUNTRIES_LOW + COUNTRIES_RISK[:4])
            heading = rng.choice(HS_HIGH_RISK if fraudulent else HS_LOW_RISK)
            fob_val = float(rng.lognormal(mean=8, sigma=1.5))
            cif_val = fob_val * rng.uniform(1.01, 1.15)
            quantity = int(rng.integers(1, 500))
            gross_wt = float(rng.lognormal(5, 1.2))
            tax_amt = fob_val * rng.uniform(0.05, 0.25)
            # Recoverable revenue exists only for illicit bills.
            hidden_rev = float(rng.lognormal(5, 1.0)) if fraudulent else 0.0

            flags = _compute_rule_hits(area_name, origin, heading, fob_val, cif_val,
                                       quantity, gross_wt, tax_amt, fraudulent, rng)

            record = {
                "bill_id": f"SGD{serial:05d}",
                "risk_area": area_name,
                "country": origin,
                "hs_code": heading,
                "fob_value": round(fob_val, 2),
                "cif_value": round(cif_val, 2),
                "quantity": quantity,
                "gross_weight": round(gross_wt, 2),
                "total_taxes": round(tax_amt, 2),
                "is_illicit": int(fraudulent),
                "true_revenue": round(hidden_rev, 2),
            }
            record.update(flags)
            rows.append(record)
            serial += 1

    return pd.DataFrame(rows)
|
|
|
|
def _compute_rule_hits(area, country, hs, fob, cif, qty, weight, taxes, illicit, rng):
    """Draw one binary hit flag per rule configured for *area*.

    The hit probability depends only on the illicit flag (0.70 vs 0.12);
    the transaction attributes are accepted for interface stability but do
    not influence the draw in this synthetic model.  One ``rng.random()``
    is consumed per rule, keeping the RNG stream deterministic.
    """
    threshold = 0.70 if illicit else 0.12
    return {
        f"hit_{rule['id']}": int(rng.random() < threshold)
        for rule in RISK_AREAS[area]["rules"]
    }
|
|
|
|
def compute_risk_scores(df: pd.DataFrame, weights: dict) -> pd.DataFrame:
    """
    DATE-inspired exploitation: risk score = weighted sum of rule hits.
    Uncertainty score follows Kim et al. (2022): unc_i = -1.8 * |ŷ - 0.5| + 1

    Parameters
    ----------
    df : declarations frame from generate_declarations (needs hit_* columns).
    weights : rule-id -> weight overrides; missing ids fall back to the
        rule's base weight from RISK_AREAS.

    Returns a copy of df with fraud_score, pred_revenue, uncertainty_score,
    scale_factor, risk_score_raw, area_weight and composite_score added.
    """
    df = df.copy()
    scores = np.zeros(len(df))

    # Weighted sum of rule hits, computed per risk area.
    for area_name, area_cfg in RISK_AREAS.items():
        mask = df["risk_area"] == area_name
        area_score = np.zeros(mask.sum())
        for rule in area_cfg["rules"]:
            rid = rule["id"]
            col = f"hit_{rid}"
            if col in df.columns:
                w = weights.get(rid, rule["weight"])
                area_score += df.loc[mask, col].values * w
        scores[mask] = area_score

    # Min-max normalise to [0, 1] in pure numpy (replaces sklearn's
    # MinMaxScaler — same result, no third-party dependency). A constant or
    # empty score vector maps to all-zeros instead of dividing by zero.
    if scores.size:
        lo = scores.min()
        span = scores.max() - lo
        df["fraud_score"] = (scores - lo) / span if span > 0 else np.zeros_like(scores)
    else:
        df["fraud_score"] = scores

    # Small jitter with a fixed seed: breaks ties reproducibly.
    noise = np.random.default_rng(99).uniform(-0.05, 0.05, len(df))
    df["fraud_score"] = np.clip(df["fraud_score"] + noise, 0, 1)

    # Synthetic revenue prediction proportional to declared value x risk.
    df["pred_revenue"] = df["fob_value"] * df["fraud_score"] * np.random.default_rng(77).uniform(0.5, 1.5, len(df))

    # gATE uncertainty: peaks at fraud_score = 0.5, floored at 0.1.
    df["uncertainty_score"] = -1.8 * np.abs(df["fraud_score"] - 0.5) + 1
    df["uncertainty_score"] = df["uncertainty_score"].clip(0.1, 1.0)

    # Exploration priority: uncertainty weighted by log expected revenue;
    # epsilon keeps log() finite when pred_revenue is 0.
    epsilon = 1e-6
    df["scale_factor"] = df["uncertainty_score"] * np.log(df["pred_revenue"] + epsilon)

    # Fixed policy weights per risk area for the composite ranking.
    area_weights = {"Drugs & Narcotics": 0.30, "Environmental/Plastic Waste": 0.15,
                    "Revenue Leakage": 0.25, "IPR Enforcement": 0.15, "Wildlife Smuggling": 0.15}
    df["risk_score_raw"] = scores
    df["area_weight"] = df["risk_area"].map(area_weights)
    df["composite_score"] = df["fraud_score"] * df["area_weight"]

    return df
|
|
|
|
def assign_channels(df: pd.DataFrame, bandwidth: float = 0.10,
                    exploration_ratio: float = 0.10) -> pd.DataFrame:
    """
    Hybrid selection strategy (Algorithm 4, Kim et al. 2022):
      - (1 - exploration_ratio) × bandwidth → DATE exploitation → RED/YELLOW
      - exploration_ratio × bandwidth      → gATE exploration  → YELLOW (uncertain)
      - remainder                          → GREEN
    """
    out = df.copy().reset_index(drop=True)
    budget = int(len(out) * bandwidth)
    n_exploit = int(budget * (1 - exploration_ratio))
    n_explore = budget - n_exploit

    # Exploitation slots: the highest DATE fraud scores.
    exploit_ids = list(out["fraud_score"].nlargest(n_exploit).index)

    # Exploration slots: stride-sample from a 3x pool of the most uncertain
    # (highest scale_factor) bills outside the exploitation set.
    pool = out.loc[out.index.difference(exploit_ids), "scale_factor"].nlargest(n_explore * 3)
    explore_ids = list(pool.index)
    if len(explore_ids) > n_explore:
        step = max(1, len(explore_ids) // n_explore)
        limit = min(len(explore_ids), n_explore * step)
        explore_ids = explore_ids[0:limit:step][:n_explore]

    out["channel"] = "GREEN"
    # Exploitation set splits at its median score: top half RED, rest YELLOW.
    red_cut = out.loc[exploit_ids, "fraud_score"].quantile(0.5) if exploit_ids else 0.5
    for i in exploit_ids:
        out.at[i, "channel"] = "RED" if out.at[i, "fraud_score"] >= red_cut else "YELLOW"
    for i in explore_ids:
        out.at[i, "channel"] = "YELLOW"

    out["is_exploration"] = 0
    out.loc[explore_ids, "is_exploration"] = 1

    return out
|
|
|
|
| def simulate_inspection_outcomes(df: pd.DataFrame, seed: int = 42) -> pd.DataFrame: |
| """ |
| Simulate officer inspection outcomes for RED and YELLOW channels. |
| RED channel has higher detection probability. |
| """ |
| rng = np.random.default_rng(seed) |
| df = df.copy() |
| df["inspection_outcome"] = "NOT_INSPECTED" |
| df["detected_revenue"] = 0.0 |
| df["added_to_offence_db"] = 0 |
|
|
| for idx, row in df.iterrows(): |
| if row["channel"] == "RED": |
| detect_prob = 0.45 + 0.35 * row["fraud_score"] |
| if row["is_illicit"] == 1 or rng.random() < detect_prob: |
| if rng.random() < (0.70 if row["is_illicit"] else 0.15): |
| df.at[idx, "inspection_outcome"] = "FRAUD_DETECTED" |
| df.at[idx, "detected_revenue"] = row["true_revenue"] * rng.uniform(0.8, 1.0) |
| df.at[idx, "added_to_offence_db"] = 1 |
| else: |
| df.at[idx, "inspection_outcome"] = "CLEAN" |
| else: |
| df.at[idx, "inspection_outcome"] = "CLEAN" |
|
|
| elif row["channel"] == "YELLOW": |
| detect_prob = 0.25 + 0.20 * row["fraud_score"] |
| if row["is_illicit"] == 1 or rng.random() < detect_prob: |
| if rng.random() < (0.40 if row["is_illicit"] else 0.08): |
| df.at[idx, "inspection_outcome"] = "FRAUD_DETECTED" |
| df.at[idx, "detected_revenue"] = row["true_revenue"] * rng.uniform(0.5, 0.85) |
| df.at[idx, "added_to_offence_db"] = 1 |
| else: |
| df.at[idx, "inspection_outcome"] = "DOC_QUERY" |
| else: |
| df.at[idx, "inspection_outcome"] = "CLEAN" |
|
|
| return df |
|
|
|
|
def compute_updated_weights(df: pd.DataFrame, base_weights: dict) -> dict:
    """
    After simulation, update rule weights based on detection efficiency.
    Rules that hit on detected frauds get weight boosted.

    Efficiency per rule = (hits on detected-fraud bills) / (all hits in the
    area); the boost is 0.05 x efficiency, and weights are capped at 0.60.
    """
    new_weights = dict(base_weights)
    frauds = df[df["inspection_outcome"] == "FRAUD_DETECTED"]

    for area_name, cfg in RISK_AREAS.items():
        area_mask = df["risk_area"] == area_name
        if not area_mask.any():
            continue  # no bills in this area — nothing to learn from
        area_frauds = frauds[frauds["risk_area"] == area_name]
        for rule in cfg["rules"]:
            rid = rule["id"]
            col = f"hit_{rid}"
            if col not in df.columns:
                continue
            denom = df.loc[area_mask, col].sum()
            if denom <= 0:
                continue  # rule never fired here — leave its weight alone
            numer = area_frauds[col].sum() if col in area_frauds.columns else 0
            prev = new_weights.get(rid, rule["weight"])
            # Additive boost proportional to the rule's detection efficiency.
            new_weights[rid] = round(min(prev + 0.05 * (numer / denom), 0.60), 4)

    return new_weights
|
|
|
|
def build_rule_hit_table(df: pd.DataFrame) -> pd.DataFrame:
    """Table 1: How many bills were hit by each rule (and combinations)."""
    illicit_df = df[df["is_illicit"] == 1]
    records = []
    for area_name, cfg in RISK_AREAS.items():
        area_subset = df[df["risk_area"] == area_name]
        for rule in cfg["rules"]:
            col = f"hit_{rule['id']}"
            if col not in df.columns:
                continue
            total_hits = int(df[col].sum())
            fraud_hits = int(illicit_df[col].sum())
            records.append({
                "Risk Area": area_name,
                "Rule ID": rule["id"],
                "Rule Name": rule["name"],
                "Base Weight": rule["weight"],
                "Total Bills Hit": total_hits,
                "Area Bills Hit": int(area_subset[col].sum()),
                "Fraud Bills Hit": fraud_hits,
                # Share of this rule's hits that landed on truly illicit bills.
                "Precision (%)": round(100 * fraud_hits / total_hits, 1) if total_hits else 0.0,
            })
    return pd.DataFrame(records)
|
|
|
|
def build_channel_table(df: pd.DataFrame) -> pd.DataFrame:
    """Table 2: Bills per channel, exploit vs explore breakdown.

    Fixed vs original: an empty input frame no longer raises
    ZeroDivisionError on the percentage column, and a channel with zero
    bills reports an Avg Risk Score of 0.0 instead of NaN.
    """
    n_total = len(df)
    rows = []
    for ch in ["RED", "YELLOW", "GREEN"]:
        ch_df = df[df["channel"] == ch]
        n_ch = len(ch_df)
        n_explore = int((ch_df["is_exploration"] == 1).sum())
        # mean() of an empty slice is NaN — fall back to 0.0 for display.
        avg_score = float(ch_df["fraud_score"].mean()) if n_ch else 0.0
        rows.append({
            "Channel": ch,
            "Total Bills": n_ch,
            "% of Total": round(100 * n_ch / n_total, 1) if n_total else 0.0,
            "Exploitation Bills": n_ch - n_explore,
            "Exploration Bills": n_explore,
            "Avg Risk Score": round(avg_score, 3),
            "Illicit Count": int(ch_df["is_illicit"].sum()),
            "Detected Fraud": int((ch_df["inspection_outcome"] == "FRAUD_DETECTED").sum()),
            "Revenue Collected ($)": round(float(ch_df["detected_revenue"].sum()), 2),
        })
    return pd.DataFrame(rows)
|
|
|
|
def build_exploration_discovery_table(df: pd.DataFrame) -> pd.DataFrame:
    """Table 3: How exploration added new bills to risky areas."""
    probes = df[df["is_exploration"] == 1]
    out = []
    for area in RISK_AREAS:
        area_probes = probes[probes["risk_area"] == area]
        discoveries = area_probes[area_probes["added_to_offence_db"] == 1]
        n_probes = len(area_probes)
        out.append({
            "Risk Area": area,
            "Exploration Bills": n_probes,
            "New Frauds Unearthed": len(discoveries),
            "Discovery Rate (%)": round(100 * len(discoveries) / n_probes, 1) if n_probes else 0,
            "Avg Uncertainty Score": round(area_probes["uncertainty_score"].mean(), 3) if n_probes else 0,
            "New Revenue Recovered ($)": round(discoveries["detected_revenue"].sum(), 2),
        })
    return pd.DataFrame(out)
|
|
|
|
def build_offence_db_table(df: pd.DataFrame) -> pd.DataFrame:
    """Table 4: Offence database built from Red+Yellow feedback."""
    offences = df[df["added_to_offence_db"] == 1]
    table = []
    for area in RISK_AREAS:
        area_off = offences[offences["risk_area"] == area]
        n_exploit = int((area_off["is_exploration"] == 0).sum())
        n_explore = int((area_off["is_exploration"] == 1).sum())
        table.append({
            "Risk Area": area,
            "Total Added": len(area_off),
            "From Exploitation": n_exploit,
            "From Exploration": n_explore,
            "Unique Countries": area_off["country"].nunique(),
            "Unique HS Codes": area_off["hs_code"].nunique(),
            "Total Revenue ($)": round(area_off["detected_revenue"].sum(), 2),
        })
    return pd.DataFrame(table)
|
|
|
|
def build_risk_score_table(df: pd.DataFrame, updated_weights: dict) -> pd.DataFrame:
    """Table 5: Risk scoring per transaction with weight evolution.

    Samples up to 200 bills (fixed random_state for reproducibility) and
    compares the sum of base rule weights vs updated weights over the rules
    each bill hit.

    Fixed vs original: a rule id absent from ``updated_weights`` now falls
    back to its base weight (the original fell back to 0, producing a
    spurious negative "Weight Δ"); the base-weight lookup table is built
    once instead of rescanning every rule per sampled row.
    """
    sample = df.sample(min(200, len(df)), random_state=42).copy()
    # rid -> base weight, in catalogue order (dict preserves insertion order).
    base_weights = {r["id"]: r["weight"] for a in RISK_AREAS.values() for r in a["rules"]}

    rows = []
    for _, row in sample.iterrows():
        hit_rules = [rid for rid in base_weights if row.get(f"hit_{rid}", 0) == 1]
        orig_w = sum(base_weights[rid] for rid in hit_rules)
        # Missing ids keep their base weight so Δ reflects real learning only.
        new_w = sum(updated_weights.get(rid, base_weights[rid]) for rid in hit_rules)
        rows.append({
            "Bill ID": row["bill_id"],
            "Risk Area": row["risk_area"],
            "Channel": row["channel"],
            "Risk Score": round(row["fraud_score"], 3),
            "Rules Hit": len(hit_rules),
            "Original Weight": round(orig_w, 3),
            "Updated Weight": round(new_w, 3),
            "Weight Δ": round(new_w - orig_w, 4),
            "Outcome": row["inspection_outcome"],
        })
    return pd.DataFrame(rows)
|
|
|
|
def compute_efficiency_metrics(df: pd.DataFrame) -> dict:
    """WCO Efficiency Index = Detection Rate / Selection Rate."""
    total = len(df)
    n_selected = int(df["channel"].isin(["RED", "YELLOW"]).sum())
    n_detected = int((df["inspection_outcome"] == "FRAUD_DETECTED").sum())
    n_fraud = int((df["is_illicit"] == 1).sum())

    selection_rate = n_selected / total
    detection_rate = n_detected / n_fraud if n_fraud > 0 else 0
    precision = n_detected / n_selected if n_selected > 0 else 0
    efficiency_idx = detection_rate / selection_rate if selection_rate > 0 else 0
    revenue_total = df["detected_revenue"].sum()

    # Fixed reference figures for a rules-only baseline at the same bandwidth.
    baseline = {
        "selection_rate": selection_rate,
        "detection_rate": 0.041,
        "precision": 0.041 / selection_rate if selection_rate > 0 else 0.04,
        "efficiency_index": 0.41,
        "revenue": revenue_total * 0.40,
    }
    hybrid = {
        "selection_rate": selection_rate,
        "detection_rate": round(detection_rate, 4),
        "precision": round(precision, 4),
        "efficiency_index": round(efficiency_idx, 3),
        "revenue": round(revenue_total, 2),
    }
    return {
        "baseline": baseline,
        "hybrid": hybrid,
        "improvement_pct": round(100 * (efficiency_idx - 0.41) / 0.41, 1),
    }
|
|
|
|
def get_default_weights() -> dict:
    """Return the catalogue's base weight for every rule id, across all areas."""
    defaults = {}
    for cfg in RISK_AREAS.values():
        for rule in cfg["rules"]:
            defaults[rule["id"]] = rule["weight"]
    return defaults
|
|