Spaces:
Sleeping
Sleeping
File size: 5,690 Bytes
9e62f55 | 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 | import numpy as np
import random
from src.utils.hf_storage import list_activities, upload_new_scenario
def generate_demand_curve(slots_per_day, planning_slot, peak_staff, shape_type):
"""
Generatore di workload sintetico basato su distribuzioni Gaussiane.
Modella profili di carico tipici dei settori BPO e Operations su base slot.
"""
daily_req = []
for s in range(slots_per_day):
hour = 8 + (s * planning_slot / 60) # Offset dalle 08:00
if shape_type == "double_bell":
# Distribuzione Bimodale (M-Shape): Tipica dell'Inbound Voice BPO (Picchi 11:00 e 16:00)
val = np.exp(-((hour - 11)**2) / 4) + np.exp(-((hour - 16)**2) / 4)
val = val * 0.8 # Normalizzazione euristica
elif shape_type == "single_bell_center":
# Unimodale centrata: Tipica del settore Delivery/Food o Customer Care pausa pranzo
val = np.exp(-((hour - 13)**2) / 9)
elif shape_type == "morning_peak":
# Skewed left: Supporto Tecnico B2B o Helpdesk IT (Picco decrescente dalle 09:30)
val = np.exp(-((hour - 9.5)**2) / 5)
elif shape_type == "steady_high":
# Workload Flat: Backoffice, Data Entry o Processi Asincroni
# Iniezione di white noise per evitare un rettangolo artificiale
noise = np.random.normal(0, 0.05)
val = 0.8 + noise
else: # Fallback
val = 0.5
# Scaling del volume basato sulla capacity massima
staff_needed = int(val * peak_staff)
# Lower-bound di sicurezza: previene divisioni per zero o matrici vuote nei layer a valle
daily_req.append(max(5, staff_needed))
return daily_req
def generate_scenario_files(scenario_name, num_employees, mix_ratios, curve_shape="double_bell"):
"""
Orchestratore per il bootstrap di scenari di test.
Istanzia anagrafiche, time-series della demand e hyper-parametri standard,
pushandoli direttamente sul Data Lake (HF Dataset).
"""
# 1. State check sul repository remoto
existing_activities = list_activities()
if scenario_name in existing_activities:
return False, f"Esiste già uno scenario con nome '{scenario_name}'."
PLANNING_SLOT = 30
# 2. Iniezione Configurazione Base (Default Engine Params)
activity_conf = {
"client_settings": {
"planning_slot_minutes": PLANNING_SLOT,
"day_start_hour": 8,
"day_end_hour": 22
},
"operating_hours": {
"default": "08:00-22:00",
"exceptions": {}
},
"weights": {
"understaffing": 1000.0,
"overstaffing": 10.0,
"homogeneity": 400.0,
"soft_preference": 50.0
},
"genetic_params": {
"population_size": 1000,
"generations": 350,
"mutation_rate": 0.45,
"crossover_rate": 0.85,
"elitism_rate": 0.02,
"tournament_size": 2,
"heuristic_rate": 0.4,
"heuristic_noise": 0.5
}
}
# 3. Campionamento Anagrafica (Contract Mix)
employees = []
contracts_def = {
"FT40": {"wh": 8, "bd": 30},
"PT30": {"wh": 6, "bd": 0},
"PT20": {"wh": 4, "bd": 0}
}
contract_pool = []
for c_type, pct in mix_ratios.items():
count = int(num_employees * (pct / 100.0))
contract_pool.extend([c_type] * count)
# Padding contrattuale per gestire eventuali sfridi degli arrotondamenti percentuali
while len(contract_pool) < num_employees:
contract_pool.append("FT40")
random.shuffle(contract_pool)
for i, c_type in enumerate(contract_pool):
specs = contracts_def[c_type]
# Assegnazione probabilistica dei pattern di flessibilità settimanale (Work vs Off)
if c_type == "FT40":
mix = {"WORK": 5, "OFF": 2}
else:
mix = {"WORK": 6, "OFF": 1} if random.random() < 0.3 else {"WORK": 5, "OFF": 2}
emp = {
"id": f"User_{i:03d}_{c_type}",
"contract": c_type,
"work_hours": float(specs["wh"]),
"break_duration": specs["bd"],
"shift_mix": mix,
"constraints": {}
}
# Iniezione randomica di soft-constraints (es. preferenza oraria)
if random.random() < 0.2:
emp["constraints"]["0"] = {"type": "soft", "start_time": "09:00"}
employees.append(emp)
# 4. Generazione Time-Series della Demand
slots_per_day = int((22 - 8) * 60 / PLANNING_SLOT)
weekly_demand = []
# Baseline calcolata sul 70% della forza lavoro (forza un understaffing strutturale per sfidare il motore)
peak_staff = int(num_employees * 0.7)
base_daily_curve = generate_demand_curve(slots_per_day, PLANNING_SLOT, peak_staff, curve_shape)
for day in range(7):
# Data Augmentation: applicazione di rumore (+/- 10%) per sfasare i pattern giornalieri
daily_req_noisy = []
for val in base_daily_curve:
noise_factor = random.uniform(0.9, 1.1)
daily_req_noisy.append(int(val * noise_factor))
weekly_demand.append([f"Giorno_{day}"] + daily_req_noisy)
# 5. Pipeline I/O verso HF Hub
try:
upload_new_scenario(scenario_name, activity_conf, employees, weekly_demand)
return True, f"Scenario '{scenario_name}' inizializzato con successo (Shape: {curve_shape})."
except Exception as e:
return False, str(e) |