import numpy as np import random from src.utils.hf_storage import list_activities, upload_new_scenario def generate_demand_curve(slots_per_day, planning_slot, peak_staff, shape_type): """ Generatore di workload sintetico basato su distribuzioni Gaussiane. Modella profili di carico tipici dei settori BPO e Operations su base slot. """ daily_req = [] for s in range(slots_per_day): hour = 8 + (s * planning_slot / 60) # Offset dalle 08:00 if shape_type == "double_bell": # Distribuzione Bimodale (M-Shape): Tipica dell'Inbound Voice BPO (Picchi 11:00 e 16:00) val = np.exp(-((hour - 11)**2) / 4) + np.exp(-((hour - 16)**2) / 4) val = val * 0.8 # Normalizzazione euristica elif shape_type == "single_bell_center": # Unimodale centrata: Tipica del settore Delivery/Food o Customer Care pausa pranzo val = np.exp(-((hour - 13)**2) / 9) elif shape_type == "morning_peak": # Skewed left: Supporto Tecnico B2B o Helpdesk IT (Picco decrescente dalle 09:30) val = np.exp(-((hour - 9.5)**2) / 5) elif shape_type == "steady_high": # Workload Flat: Backoffice, Data Entry o Processi Asincroni # Iniezione di white noise per evitare un rettangolo artificiale noise = np.random.normal(0, 0.05) val = 0.8 + noise else: # Fallback val = 0.5 # Scaling del volume basato sulla capacity massima staff_needed = int(val * peak_staff) # Lower-bound di sicurezza: previene divisioni per zero o matrici vuote nei layer a valle daily_req.append(max(5, staff_needed)) return daily_req def generate_scenario_files(scenario_name, num_employees, mix_ratios, curve_shape="double_bell"): """ Orchestratore per il bootstrap di scenari di test. Istanzia anagrafiche, time-series della demand e hyper-parametri standard, pushandoli direttamente sul Data Lake (HF Dataset). """ # 1. State check sul repository remoto existing_activities = list_activities() if scenario_name in existing_activities: return False, f"Esiste già uno scenario con nome '{scenario_name}'." PLANNING_SLOT = 30 # 2. Iniezione Configurazione Base (Default Engine Params) activity_conf = { "client_settings": { "planning_slot_minutes": PLANNING_SLOT, "day_start_hour": 8, "day_end_hour": 22 }, "operating_hours": { "default": "08:00-22:00", "exceptions": {} }, "weights": { "understaffing": 1000.0, "overstaffing": 10.0, "homogeneity": 400.0, "soft_preference": 50.0 }, "genetic_params": { "population_size": 1000, "generations": 350, "mutation_rate": 0.45, "crossover_rate": 0.85, "elitism_rate": 0.02, "tournament_size": 2, "heuristic_rate": 0.4, "heuristic_noise": 0.5 } } # 3. Campionamento Anagrafica (Contract Mix) employees = [] contracts_def = { "FT40": {"wh": 8, "bd": 30}, "PT30": {"wh": 6, "bd": 0}, "PT20": {"wh": 4, "bd": 0} } contract_pool = [] for c_type, pct in mix_ratios.items(): count = int(num_employees * (pct / 100.0)) contract_pool.extend([c_type] * count) # Padding contrattuale per gestire eventuali sfridi degli arrotondamenti percentuali while len(contract_pool) < num_employees: contract_pool.append("FT40") random.shuffle(contract_pool) for i, c_type in enumerate(contract_pool): specs = contracts_def[c_type] # Assegnazione probabilistica dei pattern di flessibilità settimanale (Work vs Off) if c_type == "FT40": mix = {"WORK": 5, "OFF": 2} else: mix = {"WORK": 6, "OFF": 1} if random.random() < 0.3 else {"WORK": 5, "OFF": 2} emp = { "id": f"User_{i:03d}_{c_type}", "contract": c_type, "work_hours": float(specs["wh"]), "break_duration": specs["bd"], "shift_mix": mix, "constraints": {} } # Iniezione randomica di soft-constraints (es. preferenza oraria) if random.random() < 0.2: emp["constraints"]["0"] = {"type": "soft", "start_time": "09:00"} employees.append(emp) # 4. Generazione Time-Series della Demand slots_per_day = int((22 - 8) * 60 / PLANNING_SLOT) weekly_demand = [] # Baseline calcolata sul 70% della forza lavoro (forza un understaffing strutturale per sfidare il motore) peak_staff = int(num_employees * 0.7) base_daily_curve = generate_demand_curve(slots_per_day, PLANNING_SLOT, peak_staff, curve_shape) for day in range(7): # Data Augmentation: applicazione di rumore (+/- 10%) per sfasare i pattern giornalieri daily_req_noisy = [] for val in base_daily_curve: noise_factor = random.uniform(0.9, 1.1) daily_req_noisy.append(int(val * noise_factor)) weekly_demand.append([f"Giorno_{day}"] + daily_req_noisy) # 5. Pipeline I/O verso HF Hub try: upload_new_scenario(scenario_name, activity_conf, employees, weekly_demand) return True, f"Scenario '{scenario_name}' inizializzato con successo (Shape: {curve_shape})." except Exception as e: return False, str(e)