Spaces:
Runtime error
Runtime error
| """ | |
| simulation_engine.py | |
| Core simulation logic: synthetic declarations, DATE scoring, gATE exploration, | |
| hybrid channel assignment, offence-database feedback loop. | |
| All formulas follow Kim et al. (2022) IEEE TKDE. | |
| """ | |
| import numpy as np | |
| import pandas as pd | |
| from sklearn.preprocessing import MinMaxScaler | |
| import random | |
| import hashlib | |
# ── Risk area definitions ────────────────────────────────────────────────────
# Each risk area maps to a config dict with:
#   color / icon          – UI presentation only.
#   base_illicit_rate     – prior P(illicit) used by generate_declarations.
#   rules                 – weighted selectivity rules; weights within each
#                           area sum to 1.00. Rule ids become hit_<ID> columns.
#   sub_areas / "sub"     – optional sub-categorisation (Revenue Leakage only).
RISK_AREAS = {
    "Drugs & Narcotics": {
        "color": "#C8102E",
        "icon": "💊",
        "base_illicit_rate": 0.12,
        "rules": [
            {"id": "DN-01", "name": "High-Risk Origin Country", "weight": 0.25},
            {"id": "DN-02", "name": "Suspicious HS Code (29xx/30xx)", "weight": 0.22},
            {"id": "DN-03", "name": "Known Narco Importer Profile", "weight": 0.28},
            {"id": "DN-04", "name": "Anomalous Gross Weight/Value Ratio", "weight": 0.15},
            {"id": "DN-05", "name": "Transit via Risk Corridor", "weight": 0.10},
        ],
    },
    "Environmental/Plastic Waste": {
        "color": "#00843D",
        "icon": "♻️",
        "base_illicit_rate": 0.07,
        "rules": [
            {"id": "EP-01", "name": "Prohibited Plastic Waste HS Code", "weight": 0.30},
            {"id": "EP-02", "name": "Non-OECD Destination Mismatch", "weight": 0.20},
            {"id": "EP-03", "name": "Missing Basel Convention Permit", "weight": 0.28},
            {"id": "EP-04", "name": "Underweight vs Declared Volume", "weight": 0.12},
            {"id": "EP-05", "name": "Repeat Environmental Violator", "weight": 0.10},
        ],
    },
    "Revenue Leakage": {
        "color": "#F5A800",
        "icon": "💰",
        # Only this area is sub-categorised; each rule tags its sub-area.
        "sub_areas": ["Misclassification", "Undervaluation"],
        "base_illicit_rate": 0.15,
        "rules": [
            {"id": "RL-01", "name": "HS Misclassification (Low-Duty Code)", "weight": 0.22, "sub": "Misclassification"},
            {"id": "RL-02", "name": "CIF/FOB Ratio Anomaly", "weight": 0.18, "sub": "Undervaluation"},
            {"id": "RL-03", "name": "Unit Value Below World Price", "weight": 0.20, "sub": "Undervaluation"},
            {"id": "RL-04", "name": "Related Party Transaction", "weight": 0.15, "sub": "Misclassification"},
            {"id": "RL-05", "name": "High Tax-Gap Importer History", "weight": 0.15, "sub": "Misclassification"},
            {"id": "RL-06", "name": "Invoice Currency Anomaly", "weight": 0.10, "sub": "Undervaluation"},
        ],
    },
    "IPR Enforcement": {
        "color": "#9B59B6",
        "icon": "©️",
        "base_illicit_rate": 0.08,
        "rules": [
            {"id": "IP-01", "name": "Known Counterfeiting Source Country", "weight": 0.28},
            {"id": "IP-02", "name": "Brand HS Code + Low Unit Value", "weight": 0.24},
            {"id": "IP-03", "name": "Suspected Parallel Importer", "weight": 0.20},
            {"id": "IP-04", "name": "Suspicious Packaging Descriptor", "weight": 0.15},
            {"id": "IP-05", "name": "Unlicensed Declarant Agent", "weight": 0.13},
        ],
    },
    "Wildlife Smuggling": {
        "color": "#E67E22",
        "icon": "🦁",
        "base_illicit_rate": 0.06,
        "rules": [
            {"id": "WL-01", "name": "CITES Appendix I/II HS Code", "weight": 0.30},
            {"id": "WL-02", "name": "High-Risk Biodiversity Origin", "weight": 0.25},
            {"id": "WL-03", "name": "Missing CITES Export Permit", "weight": 0.28},
            {"id": "WL-04", "name": "Underdeclared Weight (Live Animals)", "weight": 0.10},
            {"id": "WL-05", "name": "Known Wildlife Trafficker Profile", "weight": 0.07},
        ],
    },
}
# ISO-2 country codes the generator uses as origins: risky vs low-risk pools.
COUNTRIES_RISK = ["CN","PK","NG","CO","MM","BD","VN","TH","AF","IR","TR","MX"]
COUNTRIES_LOW = ["DE","FR","JP","US","GB","AU","CA","NL","SG","CH"]
# 4-digit HS heading prefixes drawn for illicit vs licit declarations.
HS_HIGH_RISK = ["2933","2934","3003","3004","3920","3923","6309","6310","9101","9102","0106","0307"]
HS_LOW_RISK = ["8471","8517","6203","6204","9403","8703","0901","1006","1701","7208"]
def generate_declarations(n: int = 1000, seed: int = 42) -> pd.DataFrame:
    """
    Generate ``n`` synthetic customs declarations split across the risk areas.

    Each bill draws an illicit flag from its area's ``base_illicit_rate``;
    illicit bills are biased toward high-risk origin countries and HS codes
    and carry a nonzero recoverable ``true_revenue``. Per-rule binary flags
    are appended as ``hit_<RULE-ID>`` columns — only the bill's own area
    rules are present on each record, so after ``pd.DataFrame(records)``
    other areas' hit columns are NaN on that row.

    Parameters
    ----------
    n : total number of declarations to generate.
    seed : seed for ``np.random.default_rng`` — output is fully reproducible;
        the per-bill draw order below is part of that contract.

    Returns
    -------
    pd.DataFrame with one row per declaration (``bill_id`` = SGD00001…).
    """
    rng = np.random.default_rng(seed)
    risk_areas = list(RISK_AREAS.keys())
    n_per_area = n // len(risk_areas)
    records = []
    bill_id = 1
    for area_idx, area_name in enumerate(risk_areas):
        area_cfg = RISK_AREAS[area_name]
        illicit_rate = area_cfg["base_illicit_rate"]
        # The last area absorbs the integer-division remainder so the
        # total row count is exactly n.
        n_area = n_per_area if area_idx < len(risk_areas) - 1 else n - len(records)
        for _ in range(n_area):
            is_illicit = rng.random() < illicit_rate
            # Licit bills still overlap the first 4 risky countries so the
            # country signal alone is not a perfect fraud predictor.
            country = rng.choice(COUNTRIES_RISK if is_illicit else COUNTRIES_LOW + COUNTRIES_RISK[:4])
            hs = rng.choice(HS_HIGH_RISK if is_illicit else HS_LOW_RISK)
            fob = float(rng.lognormal(mean=8, sigma=1.5))
            cif = fob * rng.uniform(1.01, 1.15)  # CIF = FOB plus freight/insurance markup
            qty = int(rng.integers(1, 500))
            weight = float(rng.lognormal(5, 1.2))
            taxes = fob * rng.uniform(0.05, 0.25)
            # Recoverable revenue exists only on truly illicit bills.
            revenue = float(rng.lognormal(5, 1.0)) if is_illicit else 0.0
            # Compute rule hits (consumes one rng draw per rule of this area).
            rule_hits = _compute_rule_hits(area_name, country, hs, fob, cif, qty, weight, taxes, is_illicit, rng)
            records.append({
                "bill_id": f"SGD{bill_id:05d}",
                "risk_area": area_name,
                "country": country,
                "hs_code": hs,
                "fob_value": round(fob, 2),
                "cif_value": round(cif, 2),
                "quantity": qty,
                "gross_weight": round(weight, 2),
                "total_taxes": round(taxes, 2),
                "is_illicit": int(is_illicit),
                "true_revenue": round(revenue, 2),
                **rule_hits,
            })
            bill_id += 1
    df = pd.DataFrame(records)
    return df
def _compute_rule_hits(area, country, hs, fob, cif, qty, weight, taxes, illicit, rng):
    """Draw one binary hit flag per rule of ``area``.

    Illicit bills trigger each rule with p=0.70, licit ones with p=0.12.
    Exactly one uniform draw is consumed per rule, in rule-definition
    order, so the caller's RNG stream stays reproducible. The declaration
    attributes are accepted for interface symmetry but do not influence
    the draw. Returns a ``{"hit_<RULE-ID>": 0|1}`` dict.
    """
    trigger_prob = 0.70 if illicit else 0.12
    return {
        f"hit_{rule['id']}": int(rng.random() < trigger_prob)
        for rule in RISK_AREAS[area]["rules"]
    }
def compute_risk_scores(df: pd.DataFrame, weights: dict) -> pd.DataFrame:
    """
    DATE-inspired exploitation scoring.

    Builds, per declaration:
      - ``fraud_score``       – min-max-normalised weighted rule-hit sum + jitter,
      - ``pred_revenue``      – simplified DATE revenue head,
      - ``uncertainty_score`` – Kim et al. (2022): unc = -1.8·|ŷ − 0.5| + 1,
      - ``scale_factor``      – unc × log(pred_revenue + ε), used by gATE,
      - ``composite_score``   – fraud_score × per-area policy weight.

    Parameters
    ----------
    df : declarations with ``hit_<RULE-ID>`` columns (see generate_declarations).
    weights : rule-id → weight overrides; falls back to each rule's base weight.

    Returns
    -------
    A copy of ``df`` with the scoring columns added.
    """
    df = df.copy()
    # Weighted rule-hit sum, computed per risk area: each area owns its own
    # hit_* columns (other areas' rows are NaN there), so we mask per area.
    scores = np.zeros(len(df))
    for area_name, area_cfg in RISK_AREAS.items():
        mask = (df["risk_area"] == area_name).to_numpy()
        area_score = np.zeros(int(mask.sum()))
        for rule in area_cfg["rules"]:
            col = f"hit_{rule['id']}"
            if col in df.columns:
                w = weights.get(rule["id"], rule["weight"])
                area_score += df.loc[mask, col].to_numpy() * w
        scores[mask] = area_score
    # Normalise to [0, 1]. Plain numpy replaces the former sklearn
    # MinMaxScaler with identical output: a zero-range (all-equal) column is
    # mapped to 0, matching MinMaxScaler's zero-range handling. Unlike the
    # scaler, this also tolerates an empty DataFrame instead of raising.
    if scores.size:
        lo, hi = scores.min(), scores.max()
        df["fraud_score"] = (scores - lo) / (hi - lo) if hi > lo else np.zeros_like(scores)
    else:
        df["fraud_score"] = scores
    # Deterministic jitter for realism (fixed seed keeps runs reproducible).
    noise = np.random.default_rng(99).uniform(-0.05, 0.05, len(df))
    df["fraud_score"] = np.clip(df["fraud_score"] + noise, 0, 1)
    # Revenue prediction (simplified DATE revenue head), deterministic via seed 77.
    df["pred_revenue"] = df["fob_value"] * df["fraud_score"] * np.random.default_rng(77).uniform(0.5, 1.5, len(df))
    # Uncertainty score: maximised when fraud_score ≈ 0.5, floored at 0.1.
    df["uncertainty_score"] = (-1.8 * np.abs(df["fraud_score"] - 0.5) + 1).clip(0.1, 1.0)
    # Scale factor S_i = unc_i × log(pred_revenue + ε) drives gATE exploration.
    epsilon = 1e-6
    df["scale_factor"] = df["uncertainty_score"] * np.log(df["pred_revenue"] + epsilon)
    # Cross-area prioritisation: fixed policy weight per risk area.
    area_weights = {"Drugs & Narcotics": 0.30, "Environmental/Plastic Waste": 0.15,
                    "Revenue Leakage": 0.25, "IPR Enforcement": 0.15, "Wildlife Smuggling": 0.15}
    df["risk_score_raw"] = scores
    df["area_weight"] = df["risk_area"].map(area_weights)
    df["composite_score"] = df["fraud_score"] * df["area_weight"]
    return df
def assign_channels(df: pd.DataFrame, bandwidth: float = 0.10,
                    exploration_ratio: float = 0.10) -> pd.DataFrame:
    """
    Hybrid selection strategy (Algorithm 4, Kim et al. 2022).

    The inspection budget (``bandwidth`` × number of bills) is split:
    ``1 - exploration_ratio`` of it goes to DATE exploitation (the highest
    ``fraud_score`` bills, RED or YELLOW), the rest to gATE exploration
    (high ``scale_factor`` bills, always YELLOW); everything else stays
    GREEN. Adds ``channel`` and ``is_exploration`` columns and returns a
    copy with a fresh 0..n-1 index.
    """
    df = df.copy().reset_index(drop=True)
    budget = int(len(df) * bandwidth)
    n_exploit = int(budget * (1 - exploration_ratio))
    n_explore = budget - n_exploit

    # Exploitation: the n_exploit highest fraud scores.
    exploit_idx = list(df["fraud_score"].nlargest(n_exploit).index)

    # Exploration (gATE / bATE): shortlist 3× the exploration budget by
    # scale factor among bills not already taken, then thin by equal
    # spacing — a cheap stand-in for k-means++ diversity picking.
    leftover = df.index.difference(exploit_idx)
    shortlist = list(df.loc[leftover, "scale_factor"].nlargest(n_explore * 3).index)
    explore_idx = shortlist
    if len(shortlist) > n_explore:
        stride = max(1, len(shortlist) // n_explore)
        explore_idx = shortlist[::stride][:n_explore]

    # Channel assignment: exploitation picks split at their own median score.
    df["channel"] = "GREEN"
    median_score = df.loc[exploit_idx, "fraud_score"].quantile(0.5) if exploit_idx else 0.5
    df.loc[exploit_idx, "channel"] = [
        "RED" if score >= median_score else "YELLOW"
        for score in df.loc[exploit_idx, "fraud_score"]
    ]
    df.loc[explore_idx, "channel"] = "YELLOW"  # exploration always doc-check first

    df["is_exploration"] = 0
    df.loc[explore_idx, "is_exploration"] = 1
    return df
def simulate_inspection_outcomes(df: pd.DataFrame, seed: int = 42) -> pd.DataFrame:
    """
    Simulate officer inspection outcomes for RED and YELLOW channels.

    RED channel has a higher detection probability and recovers a larger
    share of ``true_revenue`` than YELLOW; GREEN bills stay NOT_INSPECTED.
    Detected bills are flagged ``added_to_offence_db`` for the feedback
    loop. Each inspected row consumes one or two rng draws in fixed order,
    so results are reproducible for a given ``seed``.

    NOTE(review): a licit bill that passes the detect_prob draw can still
    be marked FRAUD_DETECTED with small probability (0.15 RED / 0.08
    YELLOW) — its detected_revenue is then 0 because true_revenue is 0.
    Presumably this models officer false positives; confirm intended.
    """
    rng = np.random.default_rng(seed)
    df = df.copy()
    df["inspection_outcome"] = "NOT_INSPECTED"
    df["detected_revenue"] = 0.0
    df["added_to_offence_db"] = 0
    for idx, row in df.iterrows():
        if row["channel"] == "RED":
            # Physical exam: base 45% chance of a close look, rising with score.
            detect_prob = 0.45 + 0.35 * row["fraud_score"]
            if row["is_illicit"] == 1 or rng.random() < detect_prob:
                # Second draw decides whether fraud is actually confirmed.
                if rng.random() < (0.70 if row["is_illicit"] else 0.15):
                    df.at[idx, "inspection_outcome"] = "FRAUD_DETECTED"
                    # RED recovers 80–100% of the true revenue.
                    df.at[idx, "detected_revenue"] = row["true_revenue"] * rng.uniform(0.8, 1.0)
                    df.at[idx, "added_to_offence_db"] = 1
                else:
                    df.at[idx, "inspection_outcome"] = "CLEAN"
            else:
                df.at[idx, "inspection_outcome"] = "CLEAN"
        elif row["channel"] == "YELLOW":
            # Document check: lower base odds than RED.
            detect_prob = 0.25 + 0.20 * row["fraud_score"]
            if row["is_illicit"] == 1 or rng.random() < detect_prob:
                if rng.random() < (0.40 if row["is_illicit"] else 0.08):
                    df.at[idx, "inspection_outcome"] = "FRAUD_DETECTED"
                    # YELLOW recovers only 50–85% — doc checks miss part of it.
                    df.at[idx, "detected_revenue"] = row["true_revenue"] * rng.uniform(0.5, 0.85)
                    df.at[idx, "added_to_offence_db"] = 1
                else:
                    df.at[idx, "inspection_outcome"] = "DOC_QUERY"
            else:
                df.at[idx, "inspection_outcome"] = "CLEAN"
    return df
def compute_updated_weights(df: pd.DataFrame, base_weights: dict) -> dict:
    """
    Feedback step: boost rule weights by their detection efficiency.

    For every rule, efficiency = (hits on FRAUD_DETECTED bills) / (hits on
    all bills of the area). The rule's weight grows by 0.05 × efficiency,
    capped at 0.60 and rounded to 4 decimals. Rules with no hits, and areas
    with no bills, are left untouched. Returns a new dict; ``base_weights``
    is not mutated.
    """
    new_weights = dict(base_weights)
    frauds = df[df["inspection_outcome"] == "FRAUD_DETECTED"]
    for area_name, area_cfg in RISK_AREAS.items():
        area_mask = df["risk_area"] == area_name
        if not int(area_mask.sum()):
            continue  # area absent from this batch
        area_frauds = frauds[frauds["risk_area"] == area_name]
        for rule in area_cfg["rules"]:
            rid = rule["id"]
            col = f"hit_{rid}"
            if col not in df.columns:
                continue
            total_hits = df.loc[area_mask, col].sum()
            if not total_hits > 0:
                continue
            fraud_hits = area_frauds[col].sum() if col in area_frauds.columns else 0
            efficiency = fraud_hits / total_hits
            current = new_weights.get(rid, rule["weight"])
            # Boost by detection efficiency, capped so no rule dominates.
            new_weights[rid] = round(min(current + 0.05 * efficiency, 0.60), 4)
    return new_weights
def build_rule_hit_table(df: pd.DataFrame) -> pd.DataFrame:
    """Table 1: per-rule hit counts and precision against true fraud.

    Because hit_* columns are area-specific, the frame-wide and area-level
    counts coincide; both are reported for table symmetry. Precision is
    the share of hit bills that are truly illicit.
    """
    fraud_df = df[df["is_illicit"] == 1]
    records = []
    for area_name, area_cfg in RISK_AREAS.items():
        area_df = df[df["risk_area"] == area_name]
        for rule in area_cfg["rules"]:
            col = f"hit_{rule['id']}"
            if col not in df.columns:
                continue
            total_hits = int(df[col].sum())
            fraud_hits = int(fraud_df[col].sum())
            records.append({
                "Risk Area": area_name,
                "Rule ID": rule["id"],
                "Rule Name": rule["name"],
                "Base Weight": rule["weight"],
                "Total Bills Hit": total_hits,
                "Area Bills Hit": int(area_df[col].sum()),
                "Fraud Bills Hit": fraud_hits,
                "Precision (%)": round(100 * fraud_hits / total_hits, 1) if total_hits else 0.0,
            })
    return pd.DataFrame(records)
def build_channel_table(df: pd.DataFrame) -> pd.DataFrame:
    """Table 2: per-channel volumes, exploit/explore split, and outcomes.

    Always emits one row per channel in RED/YELLOW/GREEN order, even if a
    channel is empty (its Avg Risk Score is then NaN).
    """
    n_total = len(df)
    records = []
    for channel in ("RED", "YELLOW", "GREEN"):
        subset = df[df["channel"] == channel]
        n_explore = len(subset[subset["is_exploration"] == 1])
        records.append({
            "Channel": channel,
            "Total Bills": len(subset),
            "% of Total": round(100 * len(subset) / n_total, 1),
            "Exploitation Bills": len(subset) - n_explore,
            "Exploration Bills": n_explore,
            "Avg Risk Score": round(subset["fraud_score"].mean(), 3),
            "Illicit Count": int(subset["is_illicit"].sum()),
            "Detected Fraud": int((subset["inspection_outcome"] == "FRAUD_DETECTED").sum()),
            "Revenue Collected ($)": round(subset["detected_revenue"].sum(), 2),
        })
    return pd.DataFrame(records)
def build_exploration_discovery_table(df: pd.DataFrame) -> pd.DataFrame:
    """Table 3: what the exploration slice unearthed, per risk area.

    Considers only bills with ``is_exploration == 1``; "new frauds" are
    those that ended up in the offence database. Areas with no exploration
    bills report zero rates rather than NaN.
    """
    probes = df[df["is_exploration"] == 1]
    records = []
    for area in RISK_AREAS:
        area_probes = probes[probes["risk_area"] == area]
        discoveries = area_probes[area_probes["added_to_offence_db"] == 1]
        n_probes = len(area_probes)
        records.append({
            "Risk Area": area,
            "Exploration Bills": n_probes,
            "New Frauds Unearthed": len(discoveries),
            "Discovery Rate (%)": round(100 * len(discoveries) / n_probes, 1) if n_probes else 0,
            "Avg Uncertainty Score": round(area_probes["uncertainty_score"].mean(), 3) if n_probes else 0,
            "New Revenue Recovered ($)": round(discoveries["detected_revenue"].sum(), 2),
        })
    return pd.DataFrame(records)
def build_offence_db_table(df: pd.DataFrame) -> pd.DataFrame:
    """Table 4: offence-database composition per risk area.

    Summarises bills flagged ``added_to_offence_db`` from the RED/YELLOW
    feedback loop, split by whether exploitation or exploration found them.
    """
    offences = df[df["added_to_offence_db"] == 1]
    records = []
    for area in RISK_AREAS:
        area_off = offences[offences["risk_area"] == area]
        from_exploit = len(area_off[area_off["is_exploration"] == 0])
        from_explore = len(area_off[area_off["is_exploration"] == 1])
        records.append({
            "Risk Area": area,
            "Total Added": len(area_off),
            "From Exploitation": from_exploit,
            "From Exploration": from_explore,
            "Unique Countries": area_off["country"].nunique(),
            "Unique HS Codes": area_off["hs_code"].nunique(),
            "Total Revenue ($)": round(area_off["detected_revenue"].sum(), 2),
        })
    return pd.DataFrame(records)
def build_risk_score_table(df: pd.DataFrame, updated_weights: dict) -> pd.DataFrame:
    """Table 5: per-bill risk scoring with before/after weight totals.

    Works on a fixed 200-bill sample (random_state=42, or the whole frame
    if smaller). For every sampled bill, sums the base weights and the
    updated weights of the rules it hit, plus the delta between them.
    """
    sampled = df.sample(min(200, len(df)), random_state=42).copy()
    all_rules = [rule for cfg in RISK_AREAS.values() for rule in cfg["rules"]]
    base_weight_by_id = {rule["id"]: rule["weight"] for rule in all_rules}
    records = []
    for _, bill in sampled.iterrows():
        # NaN hit columns (other areas' rules) fail the == 1 test and are skipped.
        hit_ids = [rule["id"] for rule in all_rules if bill.get(f"hit_{rule['id']}", 0) == 1]
        weight_before = sum(base_weight_by_id[rid] for rid in hit_ids)
        weight_after = sum(updated_weights.get(rid, 0) for rid in hit_ids)
        records.append({
            "Bill ID": bill["bill_id"],
            "Risk Area": bill["risk_area"],
            "Channel": bill["channel"],
            "Risk Score": round(bill["fraud_score"], 3),
            "Rules Hit": len(hit_ids),
            "Original Weight": round(weight_before, 3),
            "Updated Weight": round(weight_after, 3),
            "Weight Δ": round(weight_after - weight_before, 4),
            "Outcome": bill["inspection_outcome"],
        })
    return pd.DataFrame(records)
def compute_efficiency_metrics(df: pd.DataFrame) -> dict:
    """WCO Efficiency Index = Detection Rate / Selection Rate.

    Returns {"baseline": …, "hybrid": …, "improvement_pct": …} where
    ``baseline`` carries static-model reference figures (0.041 detection,
    0.41 efficiency index, 40% of recovered revenue) and ``hybrid`` the
    metrics of this simulation run.
    """
    n_bills = len(df)
    n_selected = int(df["channel"].isin(["RED", "YELLOW"]).sum())
    n_detected = int((df["inspection_outcome"] == "FRAUD_DETECTED").sum())
    n_fraud = int((df["is_illicit"] == 1).sum())

    selection_rate = n_selected / n_bills
    detection_rate = n_detected / n_fraud if n_fraud > 0 else 0
    precision = n_detected / n_selected if n_selected > 0 else 0
    efficiency_idx = detection_rate / selection_rate if selection_rate > 0 else 0
    revenue_total = df["detected_revenue"].sum()

    # Baseline static model metrics (from paper Table in reference).
    baseline = {
        "selection_rate": selection_rate,
        "detection_rate": 0.041,
        "precision": 0.041 / selection_rate if selection_rate > 0 else 0.04,
        "efficiency_index": 0.41,
        "revenue": revenue_total * 0.40,
    }
    hybrid = {
        "selection_rate": selection_rate,
        "detection_rate": round(detection_rate, 4),
        "precision": round(precision, 4),
        "efficiency_index": round(efficiency_idx, 3),
        "revenue": round(revenue_total, 2),
    }
    return {"baseline": baseline, "hybrid": hybrid,
            "improvement_pct": round(100 * (efficiency_idx - 0.41) / 0.41, 1)}
def get_default_weights() -> dict:
    """Return the baseline weight of every rule, keyed by rule ID."""
    defaults = {}
    for area_cfg in RISK_AREAS.values():
        for rule in area_cfg["rules"]:
            defaults[rule["id"]] = rule["weight"]
    return defaults