import streamlit as st import pandas as pd import numpy as np import matplotlib.pyplot as plt import datetime import traceback import io from src.config import cfg from src.utils.helpers import load_employees_from_json, check_hours_balance, minutes_to_time from src.utils.visualization import get_final_coverage_matrix from src.utils.health import calculate_ibe, interpret_ibe, analyze_convergence_quality from src.utils.demand_processing import sanitize_weekly_demand from src.utils.generator import generate_scenario_files from src.utils.hf_storage import load_json, save_json, list_activities from src.problems.my_problem import process_demand from src.engine.evolution import run_genetic_algorithm # --- SETUP INTERFACCIA --- st.set_page_config(page_title="AI Workforce Scheduler", layout="wide", page_icon="🧬") # ============================================================================== # 1. ROUTING E GESTIONE WORKSPACE (SIDEBAR) # ============================================================================== st.sidebar.title("🏢 Seleziona Attività") # Recupero dinamicamente la lista dei workspace dal repository remoto (HF Datasets) available_activities = list_activities() if not available_activities: st.warning("⚠️ Nessuna attività trovata nel Dataset remoto.") st.sidebar.info("Utilizza il Generatore per istanziare un nuovo workspace.") st.stop() selected_activity = st.sidebar.selectbox("Attività da gestire:", available_activities) st.title(f"Gestione Turni: {selected_activity}") st.markdown("Pianificazione intelligente dei turni di lavoro tramite intelligenza artificiale evolutiva.") # Gestione dello stato di sessione: se l'utente cambia attività, forzo il ricaricamento # del Singleton di configurazione e pulisco la cache dei risultati precedenti. if 'current_activity' not in st.session_state or st.session_state['current_activity'] != selected_activity: try: cfg.load_configurations(selected_activity) st.session_state['current_activity'] = selected_activity if 'ga_results' in st.session_state: del st.session_state['ga_results'] except Exception as e: st.error(f"Errore caricamento config per {selected_activity}: {e}") if st.sidebar.button("♻️ Ricarica App"): st.cache_data.clear() st.rerun() # ============================================================================== # 2. VIEWPORT & TAB ROUTING # ============================================================================== tab1, tab2, tab3, tab4, tab5 = st.tabs(["⚙️ Configurazione Attività", "👥 Dipendenti", "📈 Domanda (Fabbisogno)", "🚀 Esecuzione & Risultati", "⚡ Generatore"]) # ------------------------------------------------------------------------------ # TAB 1: PARAMETRI DI SISTEMA E BUSINESS (CONFIG L2) # ------------------------------------------------------------------------------ with tab1: st.header("⚙️ Parametri Generali") config_filename = "activity_config.json" config_data = load_json(selected_activity, config_filename) if config_data: client_settings = config_data.get('client_settings', {}) curr_slot = client_settings.get('planning_slot_minutes', 30) # --- 1. SETUP GRIGLIA TEMPORALE --- st.subheader("🗓️ Calendario Operativo") st.caption("Definisci la granularità della pianificazione e gli orari di apertura del servizio.") new_slot = st.selectbox( "Granularità Pianificazione (minuti)", options=[15, 30, 60], index=[15, 30, 60].index(curr_slot) if curr_slot in [15, 30, 60] else 1, key=f"{selected_activity}_slot_select" ) with st.expander("Modifica Orari di Apertura/Chiusura Settimanali"): st.info("I turni generati dall'algoritmo non supereranno mai i limiti impostati qui. Seleziona 'Chiuso' per impedire la pianificazione in un giorno specifico.") # Costruisco i form per le regole orarie. Uso un dict temporaneo per raccogliere gli input. day_names = ["Lunedì", "Martedì", "Mercoledì", "Giovedì", "Venerdì", "Sabato", "Domenica"] user_schedule = {} current_hours = config_data.get('operating_hours', {}) default_rule = current_hours.get('default', "09:00-18:00") exceptions = current_hours.get('exceptions', {}) for i, day_name in enumerate(day_names): day_idx_str = str(i) rule = exceptions.get(day_idx_str, default_rule) is_closed_init = (rule == "CLOSED") start_init, end_init = datetime.time(8, 0), datetime.time(20, 0) if not is_closed_init: try: s_str, e_str = rule.split('-') sh, sm = map(int, s_str.split(':')) eh, em = map(int, e_str.split(':')) start_init, end_init = datetime.time(sh, sm), datetime.time(eh, em) except: pass c1, c2, c3, c4 = st.columns([1, 1, 1, 1]) c1.markdown(f"**{day_name}**") unique_key = f"{selected_activity}_day_{i}" is_closed = c2.checkbox("Chiuso", value=is_closed_init, key=f"{unique_key}_closed") start_t = c3.time_input("Apertura", value=start_init, key=f"{unique_key}_start", disabled=is_closed, step=900) end_t = c4.time_input("Chiusura", value=end_init, key=f"{unique_key}_end", disabled=is_closed, step=900) user_schedule[i] = {"closed": is_closed, "start": start_t, "end": end_t} st.divider() # --- 2. PESI DELLA LOSS FUNCTION --- st.subheader("⚖️ Obiettivi di Business (Pesi)") st.caption("Istruisci l'algoritmo su cosa è più importante. Valori più alti indicano una priorità maggiore.") weights = config_data.get('weights', {}) wk = f"{selected_activity}_weights" # Espongo i pesi per permettere il fine-tuning degli obiettivi di business direttamente da UI col_w1, col_w2 = st.columns(2) with col_w1: new_under = st.number_input("Peso Understaffing (Evita Buchi)", value=weights.get('understaffing', 1000.0), step=100.0, key=f"{wk}_under") new_over = st.number_input("Peso Overstaffing (Evita Eccessi)", value=weights.get('overstaffing', 10.0), step=5.0, key=f"{wk}_over") with col_w2: new_homo = st.number_input("Peso Equità (Bilancia i Carichi)", value=weights.get('homogeneity', 20.0), step=5.0, key=f"{wk}_homo") new_soft = st.number_input("Peso Preferenze (Accontenta gli Operatori)", value=weights.get('soft_preference', 50.0), step=5.0, key=f"{wk}_soft") # --- 3. HYPER-PARAMETRI DEL MOTORE GENETICO --- st.subheader("🧬 Motore Genetico") st.caption("Configura le prestazioni dell'intelligenza artificiale.") gen_params = config_data.get('genetic_params', {}) c_gen1, c_gen2 = st.columns(2) new_pop = c_gen1.number_input("Popolazione (Soluzioni per generazione)", value=gen_params.get('population_size', 500), step=100) new_gen = c_gen2.number_input("Generazioni (Cicli di apprendimento)", value=gen_params.get('generations', 200), step=50) # Nascondo i parametri avanzati dell'engine JIT in un expander per mantenere la UI pulita with st.expander("🔧 Parametri Avanzati Algoritmo (Fine Tuning)"): st.warning("⚠️ Modifica questi valori solo se sai cosa stai facendo. I default sono ottimizzati per scenari BPO standard. Valori errati possono bloccare l'algoritmo.") ac1, ac2, ac3 = st.columns(3) p_mut = ac1.slider("Mutation Rate", 0.0, 1.0, gen_params.get('mutation_rate', 0.4)) p_cross = ac2.slider("Crossover Rate", 0.0, 1.0, gen_params.get('crossover_rate', 0.85)) p_elite = ac3.number_input("Elitism Rate (Protezione Migliori)", 0.0, 0.5, gen_params.get('elitism_rate', 0.02), step=0.01, format="%.2f") st.markdown("---") bc1, bc2, bc3 = st.columns(3) p_tourn = bc1.number_input("Tournament Size", 2, 20, gen_params.get('tournament_size', 5)) p_heur = bc2.slider("Heuristic Init Rate (Partenza Intelligente)", 0.0, 1.0, gen_params.get('heuristic_rate', 0.8)) p_noise = bc3.number_input("Heuristic Noise (Rumore iniziale)", 0.0, 1.0, gen_params.get('heuristic_noise', 0.2), step=0.1) st.markdown("---") p_split = st.slider("Guided Mutation Split (Swap Giorno vs Cambio Orario)", 0.0, 1.0, gen_params.get('guided_mutation_split', 0.4)) st.markdown("---") st.subheader("🧪 Analisi Pressione Evolutiva (IBE)") # Calcolo live dell'IBE (il mio indicatore custom) per dare un feedback # immediato sulla bontà dei parametri genetici scelti dall'utente. curr_ibe = calculate_ibe( pop_size=int(new_pop), generations=int(new_gen), p_cross=p_cross, p_mut=p_mut, p_heur=p_heur, p_elite=p_elite ) status_msg, delta_color = interpret_ibe(curr_ibe) hc1, hc2, hc3 = st.columns([1, 1.5, 1]) hc1.metric(label="IBE Score", value=f"{curr_ibe:,.0f}".replace(",", "."), delta="Target: 1k-3k", delta_color=delta_color) if "OTTIMALE" in status_msg: hc2.success(f"**Stato:** {status_msg}") elif "STALLO" in status_msg: hc2.warning(f"**Stato:** {status_msg}") else: hc2.error(f"**Stato:** {status_msg}") with hc3.expander("Cos'è l'IBE?"): st.caption(""" **Indice di Bilanciamento Evolutivo (IBE)** È un indicatore di salute delle impostazioni che hai inserito. Misura l'equilibrio tra la capacità dell'AI di esplorare nuove soluzioni (Mutazioni) e la tendenza a sfruttare quelle già trovate (Euristiche/Elitismo). - **Troppo basso:** L'AI si "accontenta" subito della prima soluzione mediocre trovata. - **Troppo alto:** L'AI continua a cercare a caso senza mai focalizzarsi su un piano stabile. """) # --- 4. PERSISTENZA I/O --- st.divider() if st.button("💾 Calcola e Salva Configurazione", type="primary"): # Costruisco la mappa delle regole daily_rules_map = {} min_h, max_h = 24, 0 for i in range(7): d = user_schedule[i] if d['closed']: daily_rules_map[str(i)] = "CLOSED" else: s_str, e_str = d['start'].strftime("%H:%M"), d['end'].strftime("%H:%M") daily_rules_map[str(i)] = f"{s_str}-{e_str}" if d['start'].hour < min_h: min_h = d['start'].hour if d['end'].hour > max_h: max_h = d['end'].hour + (1 if d['end'].minute > 0 else 0) # Trick per ottimizzare il payload JSON: calcolo l'orario più frequente # e lo imposto come 'default', salvando gli altri giorni come 'exceptions'. from collections import Counter vals = list(daily_rules_map.values()) most_common = Counter(vals).most_common(1)[0][0] final_hours = {"default": most_common, "exceptions": {}} for k, v in daily_rules_map.items(): if v != most_common: final_hours["exceptions"][k] = v # Aggiornamento dell'oggetto configurazione if 'client_settings' not in config_data: config_data['client_settings'] = {} config_data['client_settings']['planning_slot_minutes'] = new_slot config_data['client_settings']['day_start_hour'] = int(min_h) if min_h < 24 else 8 config_data['client_settings']['day_end_hour'] = int(max_h) if max_h > 0 else 20 config_data['operating_hours'] = final_hours config_data['weights'] = {"understaffing": new_under, "overstaffing": new_over, "homogeneity": new_homo, "soft_preference": new_soft} config_data['genetic_params'] = { "population_size": int(new_pop), "generations": int(new_gen), "mutation_rate": p_mut, "crossover_rate": p_cross, "elitism_rate": p_elite, "tournament_size": int(p_tourn), "heuristic_rate": p_heur, "heuristic_noise": p_noise, "guided_mutation_split": p_split } # Push sul cloud save_json(selected_activity, config_filename, config_data) cfg.load_configurations(selected_activity) st.success("✅ Configurazione salvata e sincronizzata con successo.") else: st.error("Payload di configurazione mancante dal Dataset.") # ------------------------------------------------------------------------------ # TAB 2: ANAGRAFICA E VINCOLI (HR MASTER DATA) # ------------------------------------------------------------------------------ with tab2: emp_filename = "employees.json" emp_data = load_json(selected_activity, emp_filename) col_header, col_pie = st.columns([3, 1]) with col_header: st.header("👥 Gestione Personale") st.caption("Gestisci i profili contrattuali, le regole di fairness settimanale e inserisci ferie, permessi o vincoli di orario.") if emp_data is not None: with col_pie: # Rendering del mix contrattuale. Uso matplotlib con patch alpha=0.0 # per avere uno sfondo trasparente che si adatti al tema di Streamlit. df_stats = pd.DataFrame(emp_data) if not df_stats.empty and 'contract' in df_stats.columns: counts = df_stats['contract'].value_counts() fig_pie, ax_pie = plt.subplots(figsize=(1.5, 1.5)) colors = ['#3498db', '#e74c3c', '#f1c40f', '#9b59b6'] wedges, texts, autotexts = ax_pie.pie( counts, autopct='%1.0f%%', startangle=90, colors=colors[:len(counts)], textprops={'fontsize': 5, 'weight': 'bold', 'color': 'white'}, pctdistance=0.7 ) ax_pie.axis('equal') fig_pie.patch.set_alpha(0.0) leg = ax_pie.legend( wedges, counts.index, title="Contratti", loc="center left", bbox_to_anchor=(1, 0, 0.5, 1), fontsize=5, title_fontsize=6, frameon=False, labelcolor='white' ) leg.get_title().set_color("white") leg.get_title().set_fontweight("bold") buf_pie = io.BytesIO() fig_pie.savefig(buf_pie, format="png", bbox_inches="tight", transparent=True, dpi=150) buf_pie.seek(0) st.image(buf_pie) st.divider() col_list, col_editor = st.columns([1, 2]) with col_list: st.subheader("Anagrafica Risorse") emp_ids = [e['id'] for e in emp_data] selected_id = st.selectbox("Seleziona il Dipendente da ispezionare:", emp_ids, index=0 if emp_ids else None) st.divider() if st.button("💾 Salva Modifiche Anagrafica", type="primary"): save_json(selected_activity, emp_filename, emp_data) st.success("Anagrafica aggiornata in Cloud.") # Costruisco la preview tabellare summary = [] for e in emp_data: mix = e.get('shift_mix', {"WORK": 5, "OFF": 2}) summary.append({"ID": e['id'], "Contratto": e['contract'], "Mix": f"{mix.get('WORK',5)}W/{mix.get('OFF',2)}O"}) st.dataframe(summary, hide_index=True, width='stretch') with col_editor: # Editor del singolo dipendente: permette di iniettare override # hard/soft/absence direttamente sull'oggetto prima del salvataggio. if selected_id: emp_record = next((e for e in emp_data if e['id'] == selected_id), None) if emp_record: st.subheader(f"✏️ Proprietà: {emp_record['id']}") c1, c2 = st.columns(2) emp_record['id'] = c1.text_input("ID Dipendente", value=emp_record['id']) ct = emp_record.get('contract', 'FT40') ct_idx = ["FT40", "PT30", "PT20"].index(ct) if ct in ["FT40", "PT30", "PT20"] else 0 emp_record['contract'] = c2.selectbox("Tipologia Contratto", ["FT40", "PT30", "PT20"], index=ct_idx) cc1, cc2 = st.columns(2) emp_record['work_hours'] = cc1.number_input("Ore Lavorative Giornaliere", value=float(emp_record.get('work_hours', 8.0))) emp_record['break_duration'] = cc2.number_input("Minuti di Pausa/Pranzo", value=int(emp_record.get('break_duration', 0))) st.markdown("---") st.subheader("📅 Regole di Fairness Settimanale") st.caption("Imposta quanti giorni questa risorsa deve lavorare rispetto a quanti giorni deve riposare nella settimana.") curr_mix = emp_record.get('shift_mix', {"WORK": 5, "OFF": 2}) cm1, cm2 = st.columns(2) w_days = cm1.number_input("Target Giorni Lavorativi", min_value=1, max_value=7, value=int(curr_mix.get("WORK", 5))) o_days = cm2.number_input("Target Giorni di Riposo", min_value=0, max_value=6, value=int(curr_mix.get("OFF", 2))) emp_record['shift_mix'] = {"WORK": w_days, "OFF": o_days} st.info(f"L'algoritmo cercherà in tutti i modi di programmare esattamente {w_days} giorni di lavoro e {o_days} di riposo. Le violazioni verranno penalizzate nel calcolo finale.") st.markdown("---") st.subheader("🔒 Vincoli Operativi (Assenze e Permessi)") st.caption("Aggiungi ferie, malattie o turni fissi inamovibili.") constraints = emp_record.get('constraints', {}) day_map = {0: "Lunedì", 1: "Martedì", 2: "Mercoledì", 3: "Giovedì", 4: "Venerdì", 5: "Sabato", 6: "Domenica"} if constraints: cons_view = [] for d, r in constraints.items(): cons_view.append({"Giorno": day_map.get(int(d), d), "Tipo": r['type'].upper(), "Valore/Motivo": r.get('start_time', r.get('reason',''))}) st.table(pd.DataFrame(cons_view)) to_del = st.selectbox("Seleziona Giorno da sbloccare", options=list(constraints.keys()), format_func=lambda x: day_map.get(int(x), x)) if st.button("🗑️ Rimuovi Vincolo"): del emp_record['constraints'][to_del] st.rerun() with st.expander("➕ Aggiungi una nuova regola per questo dipendente"): ac1, ac2 = st.columns(2) add_d = ac1.selectbox("Giorno della settimana", range(7), format_func=lambda x: day_map[x]) add_t = ac2.selectbox("Tipologia di regola", ["absence", "hard", "soft"], format_func=lambda x: "Assenza" if x=="absence" else ("Turno Obbligato (Hard)" if x=="hard" else "Preferenza Oraria (Soft)")) new_rule = {"type": add_t} if add_t == "absence": new_rule["reason"] = st.selectbox("Motivo Assenza", ["FERIE", "MALATTIA", "PERMESSO"]) else: t_val = st.time_input("Orario di inzio desiderato").strftime("%H:%M") new_rule["start_time"] = t_val if st.button("Conferma Inserimento"): if 'constraints' not in emp_record: emp_record['constraints'] = {} emp_record['constraints'][str(add_d)] = new_rule st.rerun() else: st.error("Payload anagrafico mancante o corrotto.") # ------------------------------------------------------------------------------ # TAB 3: DEMAND TIME-SERIES (FABBISOGNO) # ------------------------------------------------------------------------------ with tab3: st.header("📈 Time-Series Fabbisogno Operativo") st.caption("Visualizza e modifica la curva di traffico o il numero di operatori richiesti per ogni frazione oraria.") demand_filename = "demand.json" conf = load_json(selected_activity, "activity_config.json") raw_demand = load_json(selected_activity, demand_filename) if conf: sett = conf.get('client_settings', {}) start_h = sett.get('day_start_hour', 8) end_h = sett.get('day_end_hour', 20) slot_min = sett.get('planning_slot_minutes', 30) current_conf_for_alignment = {'client_settings': {'day_start_hour': start_h, 'day_end_hour': end_h, 'planning_slot_minutes': slot_min}} total_min = (end_h - start_h) * 60 num_slots = int(total_min / slot_min) # Allineamento dinamico della time-series: gestisco i cambi di granularità oraria # troncando o paddando la matrice tramite l'helper apposito. if raw_demand: sanitized_list = sanitize_weekly_demand(raw_demand, current_conf_for_alignment) target_demand = np.array(sanitized_list) else: target_demand = np.ones((7, num_slots), dtype=int) * 5 days = ["Lun", "Mar", "Mer", "Gio", "Ven", "Sab", "Dom"] time_labels = [] curr_m = start_h * 60 end_m = end_h * 60 while curr_m < end_m: h = int(curr_m // 60) m = int(curr_m % 60) time_labels.append(f"{h:02d}:{m:02d}") curr_m += slot_min x = np.arange(len(time_labels)) day_idx = st.selectbox("Ispeziona Giorno:", range(7), format_func=lambda x: days[x]) # Rendering vettoriale del profilo di carico fig, ax = plt.subplots(figsize=(10, 3)) daily_curve = target_demand[day_idx] ax.plot(x, daily_curve, color='#e74c3c', linestyle='--', marker='o', markersize=3, label="Target Staff (Richiesto)") ax.fill_between(x, 0, daily_curve, color='#e74c3c', alpha=0.1) step_x = max(1, len(x) // 15) ax.set_xticks(x[::step_x]) ax.set_xticklabels(time_labels[::step_x], rotation=45, fontsize=8) ax.set_title(f"Profilo di Carico: {days[day_idx]}") ax.grid(True, linestyle='--', alpha=0.3) ax.legend() buf_demand = io.BytesIO() fig.savefig(buf_demand, format="png", bbox_inches="tight", transparent=False, dpi=150) buf_demand.seek(0) st.image(buf_demand) plt.close(fig) st.divider() # Uso il data_editor nativo di Streamlit per permettere l'override manuale # della demand curva direttamente in UI, molto comodo per i planner. st.subheader("✏️ Override Manuale (Modifica Volumi)") st.caption("Fai doppio clic su una cella della tabella per alterare manualmente il numero di operatori richiesti.") df_demand = pd.DataFrame(target_demand, index=days, columns=time_labels) edited_df = st.data_editor(df_demand, width='stretch', height=300) if st.button("💾 Salva Modifiche Curva"): final_json_structure = [] for i, row in enumerate(edited_df.values): row_list = [f"Giorno_{i}"] + row.tolist() final_json_structure.append(row_list) save_json(selected_activity, "demand.json", final_json_structure) st.success("✅ Fabbisogno aggiornato e sincronizzato.") st.rerun() else: st.warning("Impossibile effettuare il render: payload mancante.") # ------------------------------------------------------------------------------ # TAB 4: MOTORE DI OTTIMIZZAZIONE # ------------------------------------------------------------------------------ with tab4: st.header("🚀 Motore di Ottimizzazione") st.caption("Avvia l'AI per calcolare l'incastro dei turni migliore in base ai parametri che hai inserito.") col_run, col_stat = st.columns([1, 2]) with col_run: run_btn = st.button("✨ AVVIA IL CALCOLO DEI TURNI", type="primary") if run_btn: prog_bar = st.progress(0) status_text = st.empty() # Callback passata all'engine genetico per aggiornare l'interfaccia # asincronamente durante i pesanti cicli for loop su Numba. def ui_callback(gen, tot, score, div): prog_bar.progress(gen / tot) status_text.markdown(f"🧬 Elaborazione in corso... Generazione: **{gen}/{tot}** | Punteggio Penalità: **{score:.0f}** | Esplorazione: **{div:.2f}%**" ) with st.spinner("Compilazione codice macchina (JIT) e calcolo in corso. Potrebbe volerci qualche minuto..."): try: current_act = st.session_state.get('current_activity') if not current_act: raise ValueError("Nessun contesto operativo attivo.") employees = load_employees_from_json(current_act) raw_d = load_json(selected_activity, "demand.json") target = process_demand(raw_d) # Applichiamo le regole di business (le chiusure impostate in L2 config) # azzerando forzatamente la domanda oraria per non far schedulare turni. for d in range(7): closing_slot = cfg.get_closing_slot(d) if cfg.is_day_closed(d) or closing_slot == 0: target[d, :] = 0 else: if closing_slot < target.shape[1]: target[d, closing_slot:] = 0 hours_diff = check_hours_balance(employees, target) if hours_diff >= 0: st.success(f"✅ Controllo Preliminare Superato: Lo staff disponibile copre matematicamente le ore richieste (Surplus: {hours_diff:.1f}h).") else: st.error(f"⚠️ Attenzione - Sotto-dimensionamento Strutturale: Hai chiesto più ore di quelle contrattualizzate. Verranno generati dei buchi inevitabili (Deficit: {abs(hours_diff):.1f}h).") # Esecuzione del kernel genetico core top_solutions, div_history = run_genetic_algorithm(employees, target, progress_callback=ui_callback) final_pop_sample = np.array([sol['schedule'] for sol in top_solutions]) final_diversity = div_history[-1] if div_history else 0.0 # Caching dell'output generato nell'oggetto di sessione. # Evita di perdere i risultati (o triggerare ricalcoli) se cambio tab. st.session_state['ga_results'] = { 'top_solutions': top_solutions, 'diversity_score': final_diversity, 'diversity_history': div_history, 'selected_idx': 0, 'employees': employees, 'target': target } best_s = top_solutions[0]['total_score'] status_text.success(f"Ottimizzazione conclusa con successo! Miglior punteggio di penalità raggiunto: {best_s:.0f}") except Exception as e: st.error(f"Errore di sistema durante il calcolo: {e}") traceback.print_exc() # Blocco di visualizzazione post-run if 'ga_results' in st.session_state: res = st.session_state['ga_results'] solutions = res['top_solutions'] div_hist = res.get('diversity_history', [res.get('diversity_score', 0.0)]) final_div = div_hist[-1] # Diagnostica algoritmica automatizzata (Controlla il drop-rate della diversità) msg, color_code = analyze_convergence_quality(div_hist) st.markdown("### 🩺 Diagnostica e Validazione Scientifica") kpi1, kpi2 = st.columns([1, 3]) kpi1.metric("Diversità Genetica Finale", f"{final_div:.2f}%") if color_code == "success": kpi2.success(msg) elif color_code == "normal": kpi2.info(msg) else: kpi2.error(msg) with st.expander("Cos'è la Diversità e come interpretarla?"): st.caption(""" La **Diversità** indica quante soluzioni "diverse" l'algoritmo stava ancora testando alla fine del processo. - **Se è >40%:** L'AI non è riuscita a trovare un pattern vincente e ha continuato a sparare a caso (aumenta le Generazioni o diminuisci la Mutazione). - **Se crolla subito a <5% (Convergenza Prematura):** L'AI si è "incastrata" su una soluzione mediocre e ha smesso di cercare (aumenta la Mutazione). - **Se scende gradualmente (Matura):** È lo stato ideale. L'AI ha esplorato bene e poi ha "stretto" verso la soluzione perfetta. """) if div_hist: st.subheader("📉 Profilo Dinamico dell'Apprendimento") fig_div, ax_div = plt.subplots(figsize=(10, 3)) x_axis = [i * 5 for i in range(len(div_hist))] ax_div.plot(x_axis, div_hist, color='#2980b9', linewidth=2, label='Varianza di Popolazione (%)') ax_div.axhspan(0, 5, color='#e74c3c', alpha=0.1, label='Rischio Collasso (<5%)') ax_div.axhspan(40, 100, color='#e67e22', alpha=0.1, label='Rischio Divergenza (>40%)') ax_div.axhspan(5, 40, color='#2ecc71', alpha=0.1, label='Fascia Ottimale') ax_div.set_ylabel("Hamming Dist (%)") ax_div.set_xlabel("Epoche di Addestramento") ax_div.set_ylim(0, max(50, max(div_hist) + 5)) ax_div.grid(True, linestyle='--', alpha=0.5) ax_div.legend(loc='upper right', fontsize='small') buf_div = io.BytesIO() fig_div.savefig(buf_div, format="png", bbox_inches="tight", transparent=False, dpi=150) buf_div.seek(0) st.image(buf_div) plt.close(fig_div) with st.expander("Come leggere questo grafico?"): st.caption(""" Questo grafico racconta visivamente il lavoro dell'algoritmo: 1. **Fase Iniziale (Esplorazione):** Il grafico deve partire alto (fuori dal rosso basso). L'AI sta provando incastri creativi. 2. **Discesa (Sfruttamento):** La curva deve scendere dolcemente verso il basso. 3. **Atterraggio:** La curva dovrebbe stabilizzarsi nella **Fascia Verde Ottimale**. Se vedi crolli verticali improvvisi all'inizio, c'è un problema di configurazione nei parametri genetici. """) st.divider() st.subheader("🏆 Esplorazione delle Migliori Soluzioni Trovate") st.caption("L'algoritmo ti propone le varianti più performanti. Lo SCORE TOTALE è la somma delle penalità (più è basso, meglio è).") comp_data = [] for i, s in enumerate(solutions): comp_data.append({ "Candidato": f"Soluzione #{i+1}", "SCORE TOTALE (Penalità)": int(s['total_score']), "❌ Understaffing (Buchi)": int(s['understaffing']), "⚖️ Inequità Weekend": int(s['equity']), "⚡ Overstaffing (Eccessi)": int(s['overstaffing']), "🎨 Pref. Ignorate": int(s['soft_preferences']), "📝 Mix Contratti Violato": int(s['contract']) }) df_comp = pd.DataFrame(comp_data) st.dataframe( df_comp.style.background_gradient(cmap="RdYlGn_r", subset=["SCORE TOTALE (Penalità)", "❌ Understaffing (Buchi)"]), width='stretch', hide_index=True ) sel_opt = st.radio("Seleziona quale piano turni visualizzare in dettaglio:", options=range(len(solutions)), format_func=lambda x: f"Apri Dettaglio Soluzione #{x+1}", horizontal=True, index=st.session_state.get('selected_idx', 0)) st.session_state['ga_results']['selected_idx'] = sel_opt chosen_sol = solutions[sel_opt] best_sched = chosen_sol['schedule'] emps = res['employees'] tgt = res['target'] st.subheader(f"Dashboard Copertura: Soluzione #{sel_opt+1}") st.caption("Confronto visivo tra le persone richieste (linea rossa tratteggiata) e le persone messe a turno (area blu).") d_tabs = st.tabs(["Lunedì", "Martedì", "Mercoledì", "Giovedì", "Venerdì", "Sabato", "Domenica"]) # Proietto il genoma elaborato (best_sched) sulla matrice di copertura per le charts cov_mat = get_final_coverage_matrix(best_sched, emps) for i, t in enumerate(d_tabs): with t: fig, ax = plt.subplots(figsize=(8, 2)) x = range(cfg.daily_slots) ax.fill_between(x, cov_mat[i], alpha=0.3) ax.plot(x, cov_mat[i], label="Staff Schedulato") ax.plot(x, tgt[i], 'r--', label="Staff Richiesto (Target)") tick_step = 4 lbls = [minutes_to_time(k * cfg.system_slot_minutes) for k in x[::tick_step]] ax.set_xticks(list(x)[::tick_step]) ax.set_xticklabels(lbls, rotation=0, fontsize=4) ax.legend() buf_day = io.BytesIO() fig.savefig(buf_day, format="png", bbox_inches="tight", transparent=False, dpi=150) buf_day.seek(0) st.image(buf_day) plt.close(fig) st.subheader("Tabellone Turni (Export)") data_rows = [] days = ["Lun", "Mar", "Mer", "Gio", "Ven", "Sab", "Dom"] for idx, e in enumerate(emps): row = {"Dipendente": e['id']} for d in range(7): s = best_sched[idx, d] if s == -1: txt = "OFF" elif s == -2: txt = "ABS" else: start = s * cfg.system_slot_minutes end = start + (e['shift_len'] * cfg.system_slot_minutes) txt = f"{minutes_to_time(start)}-{minutes_to_time(end)}" row[days[d]] = txt data_rows.append(row) st.dataframe(pd.DataFrame(data_rows), width='stretch') st.markdown("---") st.subheader("🔬 Ispezione Micro-Turno (Maschere VDT e Pause)") st.caption("Verifica la corretta allocazione delle pause VDT all'interno dello spezzato del singolo operatore.") # Micro-rendering della maschera binaria per l'ispezione visiva dei sub-slot c1, c2 = st.columns(2) sel_emp = c1.selectbox("Seleziona Operatore", [e['id'] for e in emps]) sel_day = c2.selectbox("Seleziona Giorno", range(7), format_func=lambda x: days[x]) e_idx = next(i for i,e in enumerate(emps) if e['id'] == sel_emp) s_start = best_sched[e_idx, sel_day] if s_start >= 0: mask = emps[e_idx]['mask'] html = "" for k, bit in enumerate(mask): t_str = minutes_to_time((s_start + k) * cfg.system_slot_minutes) col = "#4CAF50" if bit else "#FF5252" html += f"
{t_str}
" st.markdown(html, unsafe_allow_html=True) st.caption("**Legenda:** [Verde] = Operatività a Terminale | [Rosso] = Pausa/Pranzo") else: st.info("Status per il giorno selezionato: RIPOSO o ASSENTE.") # ------------------------------------------------------------------------------ # TAB 5: BOOTSTRAPPING DI MOCK SCENARIOS (GENERATORE) # ------------------------------------------------------------------------------ with tab5: st.header("⚡ Generatore Ambienti di Test (Mock Scenarios)") st.markdown(""" Crea rapidamente nuovi scenari completi per testare come il motore AI reagisce a diverse composizioni della forza lavoro (es. alta rigidità vs alta flessibilità). """) col_gen_L, col_gen_R = st.columns([1, 2]) with col_gen_L: st.subheader("1. Setup Spazio Dati") new_scenario_name = st.text_input("Nome del nuovo Scenario", value="Nuovo_Test_BPO") new_emp_count = st.number_input("Numero Dipendenti Fittizi", min_value=1, max_value=2000, value=300, step=10) st.markdown("---") st.subheader("📊 Modello Matematico del Fabbisogno") st.caption("Scegli una distribuzione che simuli fedelmente il traffico del servizio.") curve_options = { "Doppia Campana (Tipico BPO Voice)": "double_bell", "Campana Centrale (Es. Delivery/Pausa Pranzo)": "single_bell_center", "Picco Mattutino (Es. Helpdesk IT)": "morning_peak", "Piatto Costante (Es. Backoffice/Data Entry)": "steady_high" } selected_curve_label = st.selectbox("Seleziona Modello di Carico:", options=list(curve_options.keys()), index=0) curve_key = curve_options[selected_curve_label] st.caption("Anteprima Forma:") # Anteprima visiva matematica della distribuzione scelta preview_x = np.linspace(8, 22, 50) if curve_key == "double_bell": preview_y = np.exp(-((preview_x - 11)**2)/4) + np.exp(-((preview_x - 16)**2)/4) elif curve_key == "single_bell_center": preview_y = np.exp(-((preview_x - 13)**2)/9) elif curve_key == "morning_peak": preview_y = np.exp(-((preview_x - 9.5)**2)/5) else: preview_y = np.ones_like(preview_x) * 0.8 fig_curve_prev, ax_cp = plt.subplots(figsize=(4, 1.5)) ax_cp.plot(preview_x, preview_y, color='#2ecc71', lw=2) ax_cp.fill_between(preview_x, preview_y, color='#2ecc71', alpha=0.2) ax_cp.set_yticks([]) ax_cp.set_xticks([8, 12, 16, 20]) ax_cp.set_xlim(8, 22) fig_curve_prev.patch.set_alpha(0.0) ax_cp.patch.set_alpha(0.0) buf_curve = io.BytesIO() fig_curve_prev.savefig(buf_curve, format="png", bbox_inches="tight", transparent=False, dpi=150) buf_curve.seek(0) st.image(buf_curve) plt.close(fig_curve_prev) with col_gen_R: st.subheader("2. Strategia HR (Mix Contrattuale)") st.caption("Simula il livello di flessibilità del personale.") pct_ft40 = st.slider("🔵 Full Time (8h - Alta rigidità)", 0, 100, 60) pct_pt30 = st.slider("🟡 Part Time (6h - Media flessibilità)", 0, 100, 20) remaining = max(0, 100 - (pct_ft40 + pct_pt30)) pct_pt20 = st.slider("🔴 Part Time (4h - Alta flessibilità)", 0, 100, remaining) total_mix = pct_ft40 + pct_pt30 + pct_pt20 if total_mix != 100: st.warning(f"⚠️ La somma deve essere 100%. Attuale: {total_mix}%.") else: st.success("✅ Composizione valida.") fig_preview, ax_prev = plt.subplots(figsize=(3, 1.5)) data_prev, labels_prev, colors_prev = [pct_ft40, pct_pt30, pct_pt20], ['FT 8h', 'PT 6h', 'PT 4h'], ['#3498db', '#f1c40f', '#e74c3c'] d_clean, l_clean, c_clean = zip(*[(d, l, c) for d, l, c in zip(data_prev, labels_prev, colors_prev) if d > 0]) wedges, texts, autotexts = ax_prev.pie(d_clean, labels=None, colors=c_clean, autopct='%1.0f%%', textprops={'color':"white", 'fontsize': 8, 'weight': 'bold'}, pctdistance=0.5) ax_prev.axis('equal') fig_preview.patch.set_alpha(0.0) leg = ax_prev.legend(wedges, l_clean, loc="center left", bbox_to_anchor=(1, 0, 0.5, 1), frameon=False, labelcolor='white', fontsize=7) buf = io.BytesIO() fig_preview.savefig(buf, format="png", bbox_inches="tight", transparent=True, dpi=150) buf.seek(0) st.image(buf) st.divider() btn_col, _ = st.columns([1, 3]) if btn_col.button("🚀 Inizializza Scenario su HF", type="primary", disabled=(total_mix != 100)): if not new_scenario_name.strip(): st.error("Nome scenario non valido.") else: with st.spinner("Creazione dati fittizi e upload in corso..."): mix_dict = {'FT40': pct_ft40, 'PT30': pct_pt30, 'PT20': pct_pt20} success, msg = generate_scenario_files(new_scenario_name, new_emp_count, mix_dict, curve_key) if success: st.success(f"{msg}") st.info("🔄 Clicca su 'Ricarica App' nella barra laterale sinistra per gestire questo nuovo scenario.") else: st.error(f"Errore di sistema: {msg}")