sap-predictive-integrity-using-RPT-1 / utils /failure_data_generator.py
amitgpt's picture
Upload 7 files
d3b385b verified
import pandas as pd
import numpy as np
from typing import Dict, List, Tuple, Optional
def generate_job_failure_data(n_samples: int = 1000, seed: int = 42) -> pd.DataFrame:
"""
Generates synthetic SAP Job failure data (TBTCO/TBTCP style).
"""
np.random.seed(seed)
records = []
job_classes = ['A', 'B', 'C']
job_names = ['Z_FIN_POST', 'Z_SALES_EXTRACT', 'Z_INV_RECON', 'Z_HR_SYNC', 'Z_MRP_RUN']
for i in range(n_samples):
job_name = np.random.choice(job_names)
job_class = np.random.choice(job_classes, p=[0.1, 0.3, 0.6])
# Features
duration_sec = np.random.gamma(shape=2, scale=300) # Avg 600s
delay_sec = np.random.exponential(scale=100)
step_count = np.random.randint(1, 15)
concurrent_jobs = np.random.randint(0, 50)
mem_usage_pct = np.random.uniform(10, 95)
cpu_load_pct = np.random.uniform(5, 90)
has_variant = np.random.choice([0, 1], p=[0.2, 0.8])
hist_fail_rate = np.random.uniform(0, 0.15)
# Non-linear risk formula
# Risk increases with high concurrency, high memory, and high delay
risk_score = (
(concurrent_jobs / 50) * 1.5 +
(mem_usage_pct / 100) * 2.0 +
(delay_sec / 500) * 1.2 +
(1 if job_class == 'A' else 0) * 0.5 +
hist_fail_rate * 5.0
)
risk_score += np.random.normal(0, 0.2)
# Determine class
if risk_score > 3.5:
status = 'Cancelled'
risk_label = 'HIGH'
elif risk_score > 2.2:
status = 'Finished' # But risky
risk_label = 'MEDIUM'
else:
status = 'Finished'
risk_label = 'LOW'
records.append({
'JOBNAME': job_name,
'JOBCOUNT': f'{i:08d}',
'JOBCLASS': job_class,
'DURATION_SEC': round(duration_sec, 1),
'DELAY_SEC': round(delay_sec, 1),
'STEP_COUNT': step_count,
'CONCURRENT_JOBS': concurrent_jobs,
'MEM_USAGE_PCT': round(mem_usage_pct, 1),
'CPU_LOAD_PCT': round(cpu_load_pct, 1),
'HAS_VARIANT': has_variant,
'HIST_FAIL_RATE': round(hist_fail_rate, 3),
'STATUS': status,
'RISK_SCORE': round(risk_score, 2),
'RISK_LABEL': risk_label
})
return pd.DataFrame(records)
def generate_transport_failure_data(n_samples: int = 1000, seed: int = 42) -> pd.DataFrame:
"""
Generates synthetic SAP Transport failure data (E070/E071 style).
"""
np.random.seed(seed)
records = []
users = ['DEV_ALAL', 'DEV_JDOE', 'DEV_BSMITH', 'DEV_KLEE']
systems = ['DEV', 'QAS', 'PRD']
for i in range(n_samples):
user = np.random.choice(users)
obj_count = np.random.randint(1, 500)
table_obj_pct = np.random.uniform(0, 0.8)
prog_obj_pct = 1.0 - table_obj_pct
cross_sys_dep = np.random.randint(0, 10)
author_success_rate = np.random.uniform(0.7, 0.99)
target_sys_load = np.random.uniform(10, 90)
network_latency = np.random.uniform(5, 200)
# Risk formula
risk_score = (
(obj_count / 500) * 2.0 +
table_obj_pct * 1.5 +
cross_sys_dep * 0.5 +
(1 - author_success_rate) * 4.0 +
(target_sys_load / 100) * 1.0 +
(network_latency / 200) * 0.8
)
risk_score += np.random.normal(0, 0.3)
if risk_score > 4.0:
risk_label = 'HIGH'
result = 'Error'
elif risk_score > 2.5:
risk_label = 'MEDIUM'
result = 'Warning'
else:
risk_label = 'LOW'
result = 'Success'
records.append({
'TRKORR': f'SIDK9{i:05d}',
'AS4USER': user,
'OBJ_COUNT': obj_count,
'TABLE_OBJ_PCT': round(table_obj_pct, 3),
'PROG_OBJ_PCT': round(prog_obj_pct, 3),
'CROSS_SYS_DEP': cross_sys_dep,
'AUTHOR_SUCCESS_RATE': round(author_success_rate, 3),
'TARGET_SYS_LOAD': round(target_sys_load, 1),
'NETWORK_LATENCY': round(network_latency, 1),
'RESULT': result,
'RISK_SCORE': round(risk_score, 2),
'RISK_LABEL': risk_label
})
return pd.DataFrame(records)
def generate_interface_failure_data(n_samples: int = 1000, seed: int = 42) -> pd.DataFrame:
"""
Generates synthetic SAP Interface failure data (IDoc/RFC style).
"""
np.random.seed(seed)
records = []
msg_types = ['ORDERS', 'INVOIC', 'MATMAS', 'DEBMAS']
partners = ['CUST_A', 'VEND_B', 'SYS_X', 'EXT_Y']
for i in range(n_samples):
msg_type = np.random.choice(msg_types)
partner = np.random.choice(partners)
payload_size_kb = np.random.lognormal(mean=4, sigma=1)
queue_depth = np.random.randint(0, 1000)
partner_reliability = np.random.uniform(0.6, 0.99)
retry_count = np.random.randint(0, 5)
sys_load_idx = np.random.uniform(0.1, 0.9)
dest_availability = np.random.uniform(0.5, 1.0)
# Risk formula
risk_score = (
(payload_size_kb / 500) * 1.0 +
(queue_depth / 1000) * 2.0 +
(1 - partner_reliability) * 3.0 +
retry_count * 0.8 +
sys_load_idx * 1.5 +
(1 - dest_availability) * 2.5
)
risk_score += np.random.normal(0, 0.25)
if risk_score > 4.5:
risk_label = 'HIGH'
status = 'Error'
elif risk_score > 2.8:
risk_label = 'MEDIUM'
status = 'Warning'
else:
risk_label = 'LOW'
status = 'Success'
records.append({
'MESTYP': msg_type,
'PARTNER': partner,
'PAYLOAD_SIZE_KB': round(payload_size_kb, 1),
'QUEUE_DEPTH': queue_depth,
'PARTNER_RELIABILITY': round(partner_reliability, 3),
'RETRY_COUNT': retry_count,
'SYS_LOAD_IDX': round(sys_load_idx, 2),
'DEST_AVAILABILITY': round(dest_availability, 3),
'STATUS': status,
'RISK_SCORE': round(risk_score, 2),
'RISK_LABEL': risk_label
})
return pd.DataFrame(records)
def detect_drift(df1: pd.DataFrame, df2: pd.DataFrame, column: str) -> float:
"""
Simple drift detection using mean difference percentage.
"""
if column not in df1.columns or column not in df2.columns:
return 0.0
m1 = df1[column].mean()
m2 = df2[column].mean()
if m1 == 0: return 0.0
return abs(m1 - m2) / m1