AML_env / tools /haystack.py
DataBoySu
second
4204e21
"""
Financial Knowledge Graph - Haystack Generator
================================================
Generates entities.json, accounts.json, and transactions.json
for an AI Reinforcement Learning environment.
Usage:
python generate_haystack.py # Generate haystack only
python generate_haystack.py --inject manual_tasks.json # Inject manual fraud tasks
"""
import json
import random
import argparse
from datetime import datetime, timedelta, timezone
from pathlib import Path
from faker import Faker
# ── Reproducibility ──────────────────────────────────────────────────────────
SEED = 42
random.seed(SEED)
fake = Faker()
Faker.seed(SEED)
# ── Volume constants ──────────────────────────────────────────────────────────
NUM_ENTITIES = 300
NUM_ACCOUNTS = 400
NUM_TRANSACTIONS = 5_000
PCT_INDIVIDUAL = 0.80 # 80 % Individual, 20 % Corporate
PCT_ACTIVE = 0.95 # 95 % Active accounts
# ── Time window ───────────────────────────────────────────────────────────────
NOW = datetime.now(timezone.utc)
SIX_MONTHS_AGO = NOW - timedelta(days=182)
# ═══════════════════════════════════════════════════════════════════════════════
# STEP 1 – PROCEDURAL GENERATION
# ═══════════════════════════════════════════════════════════════════════════════
def make_entity_id(n: int) -> str:
return f"ENT-{n:04d}"
def make_account_id(n: int) -> str:
return f"ACC-{n:04d}"
def make_txn_id(n: int) -> str:
return f"TXN-{n:06d}"
def generate_entities(count: int = NUM_ENTITIES) -> list[dict]:
"""Generate Individual and Corporate entities."""
entities: list[dict] = []
n_individual = round(count * PCT_INDIVIDUAL)
# ── Individuals first (needed as directors for Corporates) ───────────────
for i in range(1, n_individual + 1):
entities.append({
"entity_id": make_entity_id(i),
"name": fake.name(),
"type": "Individual",
"registration_address": fake.address().replace("\n", ", "),
"directors": [],
})
individual_ids = [e["entity_id"] for e in entities]
# ── Corporates ────────────────────────────────────────────────────────────
for i in range(n_individual + 1, count + 1):
num_directors = random.randint(1, 3)
directors = random.sample(individual_ids, k=num_directors)
entities.append({
"entity_id": make_entity_id(i),
"name": fake.company(),
"type": "Corporate",
"registration_address": fake.address().replace("\n", ", "),
"directors": directors,
})
return entities
def generate_accounts(entities: list[dict], count: int = NUM_ACCOUNTS) -> list[dict]:
"""Assign accounts randomly to entities."""
entity_ids = [e["entity_id"] for e in entities]
accounts: list[dict] = []
for i in range(1, count + 1):
accounts.append({
"account_id": make_account_id(i),
"owner_entity_id": random.choice(entity_ids),
"status": "Active" if random.random() < PCT_ACTIVE else "Closed",
})
return accounts
# ── Memo / amount helpers ─────────────────────────────────────────────────────
def _corp_to_individual_tx() -> tuple[str, float]:
memo = random.choice([
"Payroll", "Salary Q3", "Salary Q4", "Expense Reimbursement",
"Bonus Payment", "Contractor Fee", "Freelance Invoice",
])
amount = round(random.uniform(2_000, 10_000), 2)
return memo, amount
def _corp_to_corp_tx() -> tuple[str, float]:
memo = random.choice([
"Server Hosting", "Consulting Retainer", "Office Supplies",
f"Invoice #{random.randint(1000, 9999)}", "Software License",
"Marketing Services", "Logistics Fee", "Partnership Distribution",
])
amount = round(random.uniform(500, 50_000), 2)
return memo, amount
def _individual_to_corp_tx() -> tuple[str, float]:
memo = random.choice([
"Utility Bill", "Coffee", "Gym Membership", "Online Shopping",
"Streaming Subscription", "Insurance Premium", "Rent Payment",
])
amount = round(random.uniform(5, 200), 2)
return memo, amount
def _individual_to_individual_tx() -> tuple[str, float]:
memo = random.choice([
"Dinner split", "Birthday gift", "Loan repayment", "Shared expenses",
"Concert tickets", "Rent share", "",
])
amount = round(random.uniform(10, 500), 2)
return memo, amount
def _random_timestamp() -> str:
delta_seconds = (NOW - SIX_MONTHS_AGO).total_seconds()
rand_seconds = random.uniform(0, delta_seconds)
ts = SIX_MONTHS_AGO + timedelta(seconds=rand_seconds)
return ts.isoformat()
def generate_transactions(
accounts: list[dict],
entities: list[dict],
count: int = NUM_TRANSACTIONS,
id_offset: int = 0,
) -> list[dict]:
"""Generate semantically-typed transactions between accounts."""
# Build a lookup: account_id β†’ entity type
entity_type: dict[str, str] = {e["entity_id"]: e["type"] for e in entities}
acct_to_entity: dict[str, str] = {
a["account_id"]: a["owner_entity_id"] for a in accounts
}
active_accounts = [a["account_id"] for a in accounts if a["status"] == "Active"]
if len(active_accounts) < 2:
raise ValueError("Not enough active accounts to generate transactions.")
transactions: list[dict] = []
for i in range(1, count + 1):
sender_acct = random.choice(active_accounts)
receiver_acct = random.choice(active_accounts)
while receiver_acct == sender_acct:
receiver_acct = random.choice(active_accounts)
sender_type = entity_type.get(acct_to_entity.get(sender_acct, ""), "Individual")
receiver_type = entity_type.get(acct_to_entity.get(receiver_acct, ""), "Individual")
if sender_type == "Corporate" and receiver_type == "Individual":
memo, amount = _corp_to_individual_tx()
elif sender_type == "Corporate" and receiver_type == "Corporate":
memo, amount = _corp_to_corp_tx()
elif sender_type == "Individual" and receiver_type == "Corporate":
memo, amount = _individual_to_corp_tx()
else:
memo, amount = _individual_to_individual_tx()
transactions.append({
"txn_id": make_txn_id(i + id_offset),
"sender_account": sender_acct,
"receiver_account": receiver_acct,
"amount": amount,
"timestamp": _random_timestamp(),
"memo_text": memo,
})
return transactions
# ═══════════════════════════════════════════════════════════════════════════════
# STEP 3 – INTEGRATION HOOKS
# ═══════════════════════════════════════════════════════════════════════════════
def _camouflage_transactions(
manual_account_ids: list[str],
haystack_account_ids: list[str],
entities: list[dict],
accounts: list[dict],
txn_id_start: int,
) -> list[dict]:
"""
Generate 5-10 'normal' transactions that bridge each manual account to
random haystack accounts, so manual accounts don't appear as isolated islands.
"""
entity_type: dict[str, str] = {e["entity_id"]: e["type"] for e in entities}
acct_to_entity: dict[str, str] = {
a["account_id"]: a["owner_entity_id"] for a in accounts
}
camouflage: list[dict] = []
counter = txn_id_start
for manual_acct in manual_account_ids:
n_bridge_txns = random.randint(5, 10)
for _ in range(n_bridge_txns):
haystack_acct = random.choice(haystack_account_ids)
# Randomly decide direction (manual sends or receives)
if random.random() < 0.5:
sender_acct, receiver_acct = manual_acct, haystack_acct
else:
sender_acct, receiver_acct = haystack_acct, manual_acct
sender_type = entity_type.get(acct_to_entity.get(sender_acct, ""), "Individual")
receiver_type = entity_type.get(acct_to_entity.get(receiver_acct, ""), "Individual")
if sender_type == "Corporate" and receiver_type == "Individual":
memo, amount = _corp_to_individual_tx()
elif sender_type == "Corporate" and receiver_type == "Corporate":
memo, amount = _corp_to_corp_tx()
elif sender_type == "Individual" and receiver_type == "Corporate":
memo, amount = _individual_to_corp_tx()
else:
memo, amount = _individual_to_individual_tx()
camouflage.append({
"txn_id": make_txn_id(counter),
"sender_account": sender_acct,
"receiver_account": receiver_acct,
"amount": amount,
"timestamp": _random_timestamp(),
"memo_text": memo,
"_camouflage": True, # debug tag – remove if unwanted
})
counter += 1
return camouflage
def inject_manual_tasks(
haystack_data: dict,
manual_json_path: str | Path,
) -> dict:
"""
Load hand-written manual_tasks.json and merge it into the haystack.
Expected manual_tasks.json schema:
{
"entities": [ { ...entity fields... }, ... ],
"accounts": [ { ...account fields... }, ... ],
"transactions": [ { ...transaction fields... }, ... ]
}
Returns the merged dataset dict.
"""
manual_path = Path(manual_json_path)
if not manual_path.exists():
raise FileNotFoundError(f"Manual tasks file not found: {manual_path}")
with manual_path.open() as fh:
manual: dict = json.load(fh)
# ── Validate top-level keys ───────────────────────────────────────────────
for key in ("entities", "accounts", "transactions"):
if key not in manual:
raise ValueError(
f"manual_tasks.json is missing the '{key}' key. "
"Please check the expected schema in the docstring."
)
print(f" Injecting {len(manual['entities'])} manual entities …")
print(f" Injecting {len(manual['accounts'])} manual accounts …")
print(f" Injecting {len(manual['transactions'])} manual transactions …")
# ── Collect IDs already in the haystack to detect collisions ─────────────
existing_entity_ids = {e["entity_id"] for e in haystack_data["entities"]}
existing_acct_ids = {a["account_id"] for a in haystack_data["accounts"]}
existing_txn_ids = {t["txn_id"] for t in haystack_data["transactions"]}
for e in manual["entities"]:
if e["entity_id"] in existing_entity_ids:
raise ValueError(
f"Collision: entity_id '{e['entity_id']}' already exists in the haystack. "
"Use IDs outside the ENT-0001 … ENT-0300 range (e.g. ENT-9001)."
)
for a in manual["accounts"]:
if a["account_id"] in existing_acct_ids:
raise ValueError(
f"Collision: account_id '{a['account_id']}' already exists in the haystack. "
"Use IDs outside the ACC-0001 … ACC-0400 range (e.g. ACC-9001)."
)
for t in manual["transactions"]:
if t["txn_id"] in existing_txn_ids:
raise ValueError(
f"Collision: txn_id '{t['txn_id']}' already exists in the haystack. "
"Use IDs outside the TXN-000001 … TXN-005000 range (e.g. TXN-900001)."
)
# ── Append manual data ────────────────────────────────────────────────────
# Build a combined entity + account list so camouflage txns can look up types
combined_entities = haystack_data["entities"] + manual["entities"]
combined_accounts = haystack_data["accounts"] + manual["accounts"]
haystack_data["entities"] += manual["entities"]
haystack_data["accounts"] += manual["accounts"]
haystack_data["transactions"] += manual["transactions"]
# ── Camouflage: bridge manual accounts to haystack accounts ──────────────
manual_acct_ids = [a["account_id"] for a in manual["accounts"]]
haystack_acct_ids = [
a["account_id"]
for a in haystack_data["accounts"]
if a["account_id"] not in set(manual_acct_ids) and a["status"] == "Active"
]
txn_id_start = (
max(
int(t["txn_id"].split("-")[1])
for t in haystack_data["transactions"]
) + 1
)
camouflage = _camouflage_transactions(
manual_account_ids = manual_acct_ids,
haystack_account_ids = haystack_acct_ids,
entities = combined_entities,
accounts = combined_accounts,
txn_id_start = txn_id_start,
)
print(f" Generated {len(camouflage)} camouflage transactions …")
haystack_data["transactions"] += camouflage
return haystack_data
# ═══════════════════════════════════════════════════════════════════════════════
# I/O HELPERS
# ═══════════════════════════════════════════════════════════════════════════════
def _write_json(obj: list | dict, path: Path) -> None:
path.parent.mkdir(parents=True, exist_ok=True)
with path.open("w", encoding="utf-8") as fh:
json.dump(obj, fh, indent=2, ensure_ascii=False)
print(f" βœ“ Wrote {len(obj):,} records β†’ {path}")
def save_dataset(data: dict, output_dir: Path = Path(".")) -> None:
"""Write the three JSON files to output_dir."""
_write_json(data["entities"], output_dir / "entities.json")
_write_json(data["accounts"], output_dir / "accounts.json")
_write_json(data["transactions"], output_dir / "transactions.json")
# ═══════════════════════════════════════════════════════════════════════════════
# MAIN
# ═══════════════════════════════════════════════════════════════════════════════
def main() -> None:
parser = argparse.ArgumentParser(
description="Generate a financial Knowledge Graph haystack."
)
parser.add_argument(
"--inject",
metavar="MANUAL_JSON",
help="Path to hand-written manual_tasks.json to inject into the haystack.",
)
parser.add_argument(
"--output-dir",
default=".",
metavar="DIR",
help="Directory where the three JSON files will be written (default: cwd).",
)
args = parser.parse_args()
output_dir = Path(args.output_dir)
# ── 1. Build the haystack ─────────────────────────────────────────────────
print("\n── Step 1: Generating entities …")
entities = generate_entities()
print(f" βœ“ {len(entities)} entities "
f"({sum(1 for e in entities if e['type']=='Individual')} individuals, "
f"{sum(1 for e in entities if e['type']=='Corporate')} corporates)")
print("── Step 2: Generating accounts …")
accounts = generate_accounts(entities)
print(f" βœ“ {len(accounts)} accounts "
f"({sum(1 for a in accounts if a['status']=='Active')} active)")
print("── Step 3: Generating transactions …")
transactions = generate_transactions(accounts, entities)
print(f" βœ“ {len(transactions):,} transactions")
dataset: dict = {
"entities": entities,
"accounts": accounts,
"transactions": transactions,
}
# ── 2. Optionally inject manual fraud tasks ───────────────────────────────
if args.inject:
print(f"\n── Injecting manual tasks from: {args.inject}")
dataset = inject_manual_tasks(dataset, args.inject)
# ── 3. Write outputs ──────────────────────────────────────────────────────
print(f"\n── Writing JSON files to: {output_dir.resolve()}")
save_dataset(dataset, output_dir)
print("\nβœ… Done.\n")
print(" Dataset summary:")
print(f" Entities: {len(dataset['entities']):>6,}")
print(f" Accounts: {len(dataset['accounts']):>6,}")
print(f" Transactions: {len(dataset['transactions']):>6,}")
if __name__ == "__main__":
main()