GeneticWFM / app.py
GaetanoParente's picture
fix plot
c8c03a8
import streamlit as st
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
import datetime
import traceback
import io
from src.config import cfg
from src.utils.helpers import load_employees_from_json, check_hours_balance, minutes_to_time
from src.utils.visualization import get_final_coverage_matrix
from src.utils.health import calculate_ibe, interpret_ibe, analyze_convergence_quality
from src.utils.demand_processing import sanitize_weekly_demand
from src.utils.generator import generate_scenario_files
from src.utils.hf_storage import load_json, save_json, list_activities
from src.problems.my_problem import process_demand
from src.engine.evolution import run_genetic_algorithm
# --- SETUP INTERFACCIA ---
st.set_page_config(page_title="AI Workforce Scheduler", layout="wide", page_icon="🧬")
# ==============================================================================
# 1. ROUTING E GESTIONE WORKSPACE (SIDEBAR)
# ==============================================================================
st.sidebar.title("🏢 Seleziona Attività")
# Recupero dinamicamente la lista dei workspace dal repository remoto (HF Datasets)
available_activities = list_activities()
if not available_activities:
st.warning("⚠️ Nessuna attività trovata nel Dataset remoto.")
st.sidebar.info("Utilizza il Generatore per istanziare un nuovo workspace.")
st.stop()
selected_activity = st.sidebar.selectbox("Attività da gestire:", available_activities)
st.title(f"Gestione Turni: {selected_activity}")
st.markdown("Pianificazione intelligente dei turni di lavoro tramite intelligenza artificiale evolutiva.")
# Gestione dello stato di sessione: se l'utente cambia attività, forzo il ricaricamento
# del Singleton di configurazione e pulisco la cache dei risultati precedenti.
if 'current_activity' not in st.session_state or st.session_state['current_activity'] != selected_activity:
try:
cfg.load_configurations(selected_activity)
st.session_state['current_activity'] = selected_activity
if 'ga_results' in st.session_state:
del st.session_state['ga_results']
except Exception as e:
st.error(f"Errore caricamento config per {selected_activity}: {e}")
if st.sidebar.button("♻️ Ricarica App"):
st.cache_data.clear()
st.rerun()
# ==============================================================================
# 2. VIEWPORT & TAB ROUTING
# ==============================================================================
tab1, tab2, tab3, tab4, tab5 = st.tabs(["⚙️ Configurazione Attività", "👥 Dipendenti", "📈 Domanda (Fabbisogno)", "🚀 Esecuzione & Risultati", "⚡ Generatore"])
# ------------------------------------------------------------------------------
# TAB 1: PARAMETRI DI SISTEMA E BUSINESS (CONFIG L2)
# ------------------------------------------------------------------------------
with tab1:
st.header("⚙️ Parametri Generali")
config_filename = "activity_config.json"
config_data = load_json(selected_activity, config_filename)
if config_data:
client_settings = config_data.get('client_settings', {})
curr_slot = client_settings.get('planning_slot_minutes', 30)
# --- 1. SETUP GRIGLIA TEMPORALE ---
st.subheader("🗓️ Calendario Operativo")
st.caption("Definisci la granularità della pianificazione e gli orari di apertura del servizio.")
new_slot = st.selectbox(
"Granularità Pianificazione (minuti)",
options=[15, 30, 60],
index=[15, 30, 60].index(curr_slot) if curr_slot in [15, 30, 60] else 1,
key=f"{selected_activity}_slot_select"
)
with st.expander("Modifica Orari di Apertura/Chiusura Settimanali"):
st.info("I turni generati dall'algoritmo non supereranno mai i limiti impostati qui. Seleziona 'Chiuso' per impedire la pianificazione in un giorno specifico.")
# Costruisco i form per le regole orarie. Uso un dict temporaneo per raccogliere gli input.
day_names = ["Lunedì", "Martedì", "Mercoledì", "Giovedì", "Venerdì", "Sabato", "Domenica"]
user_schedule = {}
current_hours = config_data.get('operating_hours', {})
default_rule = current_hours.get('default', "09:00-18:00")
exceptions = current_hours.get('exceptions', {})
for i, day_name in enumerate(day_names):
day_idx_str = str(i)
rule = exceptions.get(day_idx_str, default_rule)
is_closed_init = (rule == "CLOSED")
start_init, end_init = datetime.time(8, 0), datetime.time(20, 0)
if not is_closed_init:
try:
s_str, e_str = rule.split('-')
sh, sm = map(int, s_str.split(':'))
eh, em = map(int, e_str.split(':'))
start_init, end_init = datetime.time(sh, sm), datetime.time(eh, em)
except: pass
c1, c2, c3, c4 = st.columns([1, 1, 1, 1])
c1.markdown(f"**{day_name}**")
unique_key = f"{selected_activity}_day_{i}"
is_closed = c2.checkbox("Chiuso", value=is_closed_init, key=f"{unique_key}_closed")
start_t = c3.time_input("Apertura", value=start_init, key=f"{unique_key}_start", disabled=is_closed, step=900)
end_t = c4.time_input("Chiusura", value=end_init, key=f"{unique_key}_end", disabled=is_closed, step=900)
user_schedule[i] = {"closed": is_closed, "start": start_t, "end": end_t}
st.divider()
# --- 2. PESI DELLA LOSS FUNCTION ---
st.subheader("⚖️ Obiettivi di Business (Pesi)")
st.caption("Istruisci l'algoritmo su cosa è più importante. Valori più alti indicano una priorità maggiore.")
weights = config_data.get('weights', {})
wk = f"{selected_activity}_weights"
# Espongo i pesi per permettere il fine-tuning degli obiettivi di business direttamente da UI
col_w1, col_w2 = st.columns(2)
with col_w1:
new_under = st.number_input("Peso Understaffing (Evita Buchi)", value=weights.get('understaffing', 1000.0), step=100.0, key=f"{wk}_under")
new_over = st.number_input("Peso Overstaffing (Evita Eccessi)", value=weights.get('overstaffing', 10.0), step=5.0, key=f"{wk}_over")
with col_w2:
new_homo = st.number_input("Peso Equità (Bilancia i Carichi)", value=weights.get('homogeneity', 20.0), step=5.0, key=f"{wk}_homo")
new_soft = st.number_input("Peso Preferenze (Accontenta gli Operatori)", value=weights.get('soft_preference', 50.0), step=5.0, key=f"{wk}_soft")
# --- 3. HYPER-PARAMETRI DEL MOTORE GENETICO ---
st.subheader("🧬 Motore Genetico")
st.caption("Configura le prestazioni dell'intelligenza artificiale.")
gen_params = config_data.get('genetic_params', {})
c_gen1, c_gen2 = st.columns(2)
new_pop = c_gen1.number_input("Popolazione (Soluzioni per generazione)", value=gen_params.get('population_size', 500), step=100)
new_gen = c_gen2.number_input("Generazioni (Cicli di apprendimento)", value=gen_params.get('generations', 200), step=50)
# Nascondo i parametri avanzati dell'engine JIT in un expander per mantenere la UI pulita
with st.expander("🔧 Parametri Avanzati Algoritmo (Fine Tuning)"):
st.warning("⚠️ Modifica questi valori solo se sai cosa stai facendo. I default sono ottimizzati per scenari BPO standard. Valori errati possono bloccare l'algoritmo.")
ac1, ac2, ac3 = st.columns(3)
p_mut = ac1.slider("Mutation Rate", 0.0, 1.0, gen_params.get('mutation_rate', 0.4))
p_cross = ac2.slider("Crossover Rate", 0.0, 1.0, gen_params.get('crossover_rate', 0.85))
p_elite = ac3.number_input("Elitism Rate (Protezione Migliori)", 0.0, 0.5, gen_params.get('elitism_rate', 0.02), step=0.01, format="%.2f")
st.markdown("---")
bc1, bc2, bc3 = st.columns(3)
p_tourn = bc1.number_input("Tournament Size", 2, 20, gen_params.get('tournament_size', 5))
p_heur = bc2.slider("Heuristic Init Rate (Partenza Intelligente)", 0.0, 1.0, gen_params.get('heuristic_rate', 0.8))
p_noise = bc3.number_input("Heuristic Noise (Rumore iniziale)", 0.0, 1.0, gen_params.get('heuristic_noise', 0.2), step=0.1)
st.markdown("---")
p_split = st.slider("Guided Mutation Split (Swap Giorno vs Cambio Orario)", 0.0, 1.0, gen_params.get('guided_mutation_split', 0.4))
st.markdown("---")
st.subheader("🧪 Analisi Pressione Evolutiva (IBE)")
# Calcolo live dell'IBE (il mio indicatore custom) per dare un feedback
# immediato sulla bontà dei parametri genetici scelti dall'utente.
curr_ibe = calculate_ibe(
pop_size=int(new_pop), generations=int(new_gen),
p_cross=p_cross, p_mut=p_mut, p_heur=p_heur, p_elite=p_elite
)
status_msg, delta_color = interpret_ibe(curr_ibe)
hc1, hc2, hc3 = st.columns([1, 1.5, 1])
hc1.metric(label="IBE Score", value=f"{curr_ibe:,.0f}".replace(",", "."), delta="Target: 1k-3k", delta_color=delta_color)
if "OTTIMALE" in status_msg: hc2.success(f"**Stato:** {status_msg}")
elif "STALLO" in status_msg: hc2.warning(f"**Stato:** {status_msg}")
else: hc2.error(f"**Stato:** {status_msg}")
with hc3.expander("Cos'è l'IBE?"):
st.caption("""
**Indice di Bilanciamento Evolutivo (IBE)**
È un indicatore di salute delle impostazioni che hai inserito.
Misura l'equilibrio tra la capacità dell'AI di esplorare nuove soluzioni (Mutazioni)
e la tendenza a sfruttare quelle già trovate (Euristiche/Elitismo).
- **Troppo basso:** L'AI si "accontenta" subito della prima soluzione mediocre trovata.
- **Troppo alto:** L'AI continua a cercare a caso senza mai focalizzarsi su un piano stabile.
""")
# --- 4. PERSISTENZA I/O ---
st.divider()
if st.button("💾 Calcola e Salva Configurazione", type="primary"):
# Costruisco la mappa delle regole
daily_rules_map = {}
min_h, max_h = 24, 0
for i in range(7):
d = user_schedule[i]
if d['closed']: daily_rules_map[str(i)] = "CLOSED"
else:
s_str, e_str = d['start'].strftime("%H:%M"), d['end'].strftime("%H:%M")
daily_rules_map[str(i)] = f"{s_str}-{e_str}"
if d['start'].hour < min_h: min_h = d['start'].hour
if d['end'].hour > max_h: max_h = d['end'].hour + (1 if d['end'].minute > 0 else 0)
# Trick per ottimizzare il payload JSON: calcolo l'orario più frequente
# e lo imposto come 'default', salvando gli altri giorni come 'exceptions'.
from collections import Counter
vals = list(daily_rules_map.values())
most_common = Counter(vals).most_common(1)[0][0]
final_hours = {"default": most_common, "exceptions": {}}
for k, v in daily_rules_map.items():
if v != most_common: final_hours["exceptions"][k] = v
# Aggiornamento dell'oggetto configurazione
if 'client_settings' not in config_data: config_data['client_settings'] = {}
config_data['client_settings']['planning_slot_minutes'] = new_slot
config_data['client_settings']['day_start_hour'] = int(min_h) if min_h < 24 else 8
config_data['client_settings']['day_end_hour'] = int(max_h) if max_h > 0 else 20
config_data['operating_hours'] = final_hours
config_data['weights'] = {"understaffing": new_under, "overstaffing": new_over, "homogeneity": new_homo, "soft_preference": new_soft}
config_data['genetic_params'] = {
"population_size": int(new_pop), "generations": int(new_gen),
"mutation_rate": p_mut, "crossover_rate": p_cross, "elitism_rate": p_elite,
"tournament_size": int(p_tourn), "heuristic_rate": p_heur,
"heuristic_noise": p_noise, "guided_mutation_split": p_split
}
# Push sul cloud
save_json(selected_activity, config_filename, config_data)
cfg.load_configurations(selected_activity)
st.success("✅ Configurazione salvata e sincronizzata con successo.")
else:
st.error("Payload di configurazione mancante dal Dataset.")
# ------------------------------------------------------------------------------
# TAB 2: ANAGRAFICA E VINCOLI (HR MASTER DATA)
# ------------------------------------------------------------------------------
with tab2:
emp_filename = "employees.json"
emp_data = load_json(selected_activity, emp_filename)
col_header, col_pie = st.columns([3, 1])
with col_header:
st.header("👥 Gestione Personale")
st.caption("Gestisci i profili contrattuali, le regole di fairness settimanale e inserisci ferie, permessi o vincoli di orario.")
if emp_data is not None:
with col_pie:
# Rendering del mix contrattuale. Uso matplotlib con patch alpha=0.0
# per avere uno sfondo trasparente che si adatti al tema di Streamlit.
df_stats = pd.DataFrame(emp_data)
if not df_stats.empty and 'contract' in df_stats.columns:
counts = df_stats['contract'].value_counts()
fig_pie, ax_pie = plt.subplots(figsize=(1.5, 1.5))
colors = ['#3498db', '#e74c3c', '#f1c40f', '#9b59b6']
wedges, texts, autotexts = ax_pie.pie(
counts, autopct='%1.0f%%', startangle=90, colors=colors[:len(counts)],
textprops={'fontsize': 5, 'weight': 'bold', 'color': 'white'}, pctdistance=0.7
)
ax_pie.axis('equal')
fig_pie.patch.set_alpha(0.0)
leg = ax_pie.legend(
wedges, counts.index, title="Contratti", loc="center left",
bbox_to_anchor=(1, 0, 0.5, 1), fontsize=5, title_fontsize=6, frameon=False, labelcolor='white'
)
leg.get_title().set_color("white")
leg.get_title().set_fontweight("bold")
buf_pie = io.BytesIO()
fig_pie.savefig(buf_pie, format="png", bbox_inches="tight", transparent=True, dpi=150)
buf_pie.seek(0)
st.image(buf_pie)
st.divider()
col_list, col_editor = st.columns([1, 2])
with col_list:
st.subheader("Anagrafica Risorse")
emp_ids = [e['id'] for e in emp_data]
selected_id = st.selectbox("Seleziona il Dipendente da ispezionare:", emp_ids, index=0 if emp_ids else None)
st.divider()
if st.button("💾 Salva Modifiche Anagrafica", type="primary"):
save_json(selected_activity, emp_filename, emp_data)
st.success("Anagrafica aggiornata in Cloud.")
# Costruisco la preview tabellare
summary = []
for e in emp_data:
mix = e.get('shift_mix', {"WORK": 5, "OFF": 2})
summary.append({"ID": e['id'], "Contratto": e['contract'], "Mix": f"{mix.get('WORK',5)}W/{mix.get('OFF',2)}O"})
st.dataframe(summary, hide_index=True, width='stretch')
with col_editor:
# Editor del singolo dipendente: permette di iniettare override
# hard/soft/absence direttamente sull'oggetto prima del salvataggio.
if selected_id:
emp_record = next((e for e in emp_data if e['id'] == selected_id), None)
if emp_record:
st.subheader(f"✏️ Proprietà: {emp_record['id']}")
c1, c2 = st.columns(2)
emp_record['id'] = c1.text_input("ID Dipendente", value=emp_record['id'])
ct = emp_record.get('contract', 'FT40')
ct_idx = ["FT40", "PT30", "PT20"].index(ct) if ct in ["FT40", "PT30", "PT20"] else 0
emp_record['contract'] = c2.selectbox("Tipologia Contratto", ["FT40", "PT30", "PT20"], index=ct_idx)
cc1, cc2 = st.columns(2)
emp_record['work_hours'] = cc1.number_input("Ore Lavorative Giornaliere", value=float(emp_record.get('work_hours', 8.0)))
emp_record['break_duration'] = cc2.number_input("Minuti di Pausa/Pranzo", value=int(emp_record.get('break_duration', 0)))
st.markdown("---")
st.subheader("📅 Regole di Fairness Settimanale")
st.caption("Imposta quanti giorni questa risorsa deve lavorare rispetto a quanti giorni deve riposare nella settimana.")
curr_mix = emp_record.get('shift_mix', {"WORK": 5, "OFF": 2})
cm1, cm2 = st.columns(2)
w_days = cm1.number_input("Target Giorni Lavorativi", min_value=1, max_value=7, value=int(curr_mix.get("WORK", 5)))
o_days = cm2.number_input("Target Giorni di Riposo", min_value=0, max_value=6, value=int(curr_mix.get("OFF", 2)))
emp_record['shift_mix'] = {"WORK": w_days, "OFF": o_days}
st.info(f"L'algoritmo cercherà in tutti i modi di programmare esattamente {w_days} giorni di lavoro e {o_days} di riposo. Le violazioni verranno penalizzate nel calcolo finale.")
st.markdown("---")
st.subheader("🔒 Vincoli Operativi (Assenze e Permessi)")
st.caption("Aggiungi ferie, malattie o turni fissi inamovibili.")
constraints = emp_record.get('constraints', {})
day_map = {0: "Lunedì", 1: "Martedì", 2: "Mercoledì", 3: "Giovedì", 4: "Venerdì", 5: "Sabato", 6: "Domenica"}
if constraints:
cons_view = []
for d, r in constraints.items():
cons_view.append({"Giorno": day_map.get(int(d), d), "Tipo": r['type'].upper(), "Valore/Motivo": r.get('start_time', r.get('reason',''))})
st.table(pd.DataFrame(cons_view))
to_del = st.selectbox("Seleziona Giorno da sbloccare", options=list(constraints.keys()), format_func=lambda x: day_map.get(int(x), x))
if st.button("🗑️ Rimuovi Vincolo"):
del emp_record['constraints'][to_del]
st.rerun()
with st.expander("➕ Aggiungi una nuova regola per questo dipendente"):
ac1, ac2 = st.columns(2)
add_d = ac1.selectbox("Giorno della settimana", range(7), format_func=lambda x: day_map[x])
add_t = ac2.selectbox("Tipologia di regola", ["absence", "hard", "soft"], format_func=lambda x: "Assenza" if x=="absence" else ("Turno Obbligato (Hard)" if x=="hard" else "Preferenza Oraria (Soft)"))
new_rule = {"type": add_t}
if add_t == "absence":
new_rule["reason"] = st.selectbox("Motivo Assenza", ["FERIE", "MALATTIA", "PERMESSO"])
else:
t_val = st.time_input("Orario di inzio desiderato").strftime("%H:%M")
new_rule["start_time"] = t_val
if st.button("Conferma Inserimento"):
if 'constraints' not in emp_record: emp_record['constraints'] = {}
emp_record['constraints'][str(add_d)] = new_rule
st.rerun()
else:
st.error("Payload anagrafico mancante o corrotto.")
# ------------------------------------------------------------------------------
# TAB 3: DEMAND TIME-SERIES (FABBISOGNO)
# ------------------------------------------------------------------------------
with tab3:
st.header("📈 Time-Series Fabbisogno Operativo")
st.caption("Visualizza e modifica la curva di traffico o il numero di operatori richiesti per ogni frazione oraria.")
demand_filename = "demand.json"
conf = load_json(selected_activity, "activity_config.json")
raw_demand = load_json(selected_activity, demand_filename)
if conf:
sett = conf.get('client_settings', {})
start_h = sett.get('day_start_hour', 8)
end_h = sett.get('day_end_hour', 20)
slot_min = sett.get('planning_slot_minutes', 30)
current_conf_for_alignment = {'client_settings': {'day_start_hour': start_h, 'day_end_hour': end_h, 'planning_slot_minutes': slot_min}}
total_min = (end_h - start_h) * 60
num_slots = int(total_min / slot_min)
# Allineamento dinamico della time-series: gestisco i cambi di granularità oraria
# troncando o paddando la matrice tramite l'helper apposito.
if raw_demand:
sanitized_list = sanitize_weekly_demand(raw_demand, current_conf_for_alignment)
target_demand = np.array(sanitized_list)
else:
target_demand = np.ones((7, num_slots), dtype=int) * 5
days = ["Lun", "Mar", "Mer", "Gio", "Ven", "Sab", "Dom"]
time_labels = []
curr_m = start_h * 60
end_m = end_h * 60
while curr_m < end_m:
h = int(curr_m // 60)
m = int(curr_m % 60)
time_labels.append(f"{h:02d}:{m:02d}")
curr_m += slot_min
x = np.arange(len(time_labels))
day_idx = st.selectbox("Ispeziona Giorno:", range(7), format_func=lambda x: days[x])
# Rendering vettoriale del profilo di carico
fig, ax = plt.subplots(figsize=(10, 3))
daily_curve = target_demand[day_idx]
ax.plot(x, daily_curve, color='#e74c3c', linestyle='--', marker='o', markersize=3, label="Target Staff (Richiesto)")
ax.fill_between(x, 0, daily_curve, color='#e74c3c', alpha=0.1)
step_x = max(1, len(x) // 15)
ax.set_xticks(x[::step_x])
ax.set_xticklabels(time_labels[::step_x], rotation=45, fontsize=8)
ax.set_title(f"Profilo di Carico: {days[day_idx]}")
ax.grid(True, linestyle='--', alpha=0.3)
ax.legend()
buf_demand = io.BytesIO()
fig.savefig(buf_demand, format="png", bbox_inches="tight", transparent=False, dpi=150)
buf_demand.seek(0)
st.image(buf_demand)
plt.close(fig)
st.divider()
# Uso il data_editor nativo di Streamlit per permettere l'override manuale
# della demand curva direttamente in UI, molto comodo per i planner.
st.subheader("✏️ Override Manuale (Modifica Volumi)")
st.caption("Fai doppio clic su una cella della tabella per alterare manualmente il numero di operatori richiesti.")
df_demand = pd.DataFrame(target_demand, index=days, columns=time_labels)
edited_df = st.data_editor(df_demand, width='stretch', height=300)
if st.button("💾 Salva Modifiche Curva"):
final_json_structure = []
for i, row in enumerate(edited_df.values):
row_list = [f"Giorno_{i}"] + row.tolist()
final_json_structure.append(row_list)
save_json(selected_activity, "demand.json", final_json_structure)
st.success("✅ Fabbisogno aggiornato e sincronizzato.")
st.rerun()
else:
st.warning("Impossibile effettuare il render: payload mancante.")
# ------------------------------------------------------------------------------
# TAB 4: MOTORE DI OTTIMIZZAZIONE
# ------------------------------------------------------------------------------
with tab4:
st.header("🚀 Motore di Ottimizzazione")
st.caption("Avvia l'AI per calcolare l'incastro dei turni migliore in base ai parametri che hai inserito.")
col_run, col_stat = st.columns([1, 2])
with col_run:
run_btn = st.button("✨ AVVIA IL CALCOLO DEI TURNI", type="primary")
if run_btn:
prog_bar = st.progress(0)
status_text = st.empty()
# Callback passata all'engine genetico per aggiornare l'interfaccia
# asincronamente durante i pesanti cicli for loop su Numba.
def ui_callback(gen, tot, score, div):
prog_bar.progress(gen / tot)
status_text.markdown(f"🧬 Elaborazione in corso... Generazione: **{gen}/{tot}** | Punteggio Penalità: **{score:.0f}** | Esplorazione: **{div:.2f}%**" )
with st.spinner("Compilazione codice macchina (JIT) e calcolo in corso. Potrebbe volerci qualche minuto..."):
try:
current_act = st.session_state.get('current_activity')
if not current_act: raise ValueError("Nessun contesto operativo attivo.")
employees = load_employees_from_json(current_act)
raw_d = load_json(selected_activity, "demand.json")
target = process_demand(raw_d)
# Applichiamo le regole di business (le chiusure impostate in L2 config)
# azzerando forzatamente la domanda oraria per non far schedulare turni.
for d in range(7):
closing_slot = cfg.get_closing_slot(d)
if cfg.is_day_closed(d) or closing_slot == 0:
target[d, :] = 0
else:
if closing_slot < target.shape[1]:
target[d, closing_slot:] = 0
hours_diff = check_hours_balance(employees, target)
if hours_diff >= 0:
st.success(f"✅ Controllo Preliminare Superato: Lo staff disponibile copre matematicamente le ore richieste (Surplus: {hours_diff:.1f}h).")
else:
st.error(f"⚠️ Attenzione - Sotto-dimensionamento Strutturale: Hai chiesto più ore di quelle contrattualizzate. Verranno generati dei buchi inevitabili (Deficit: {abs(hours_diff):.1f}h).")
# Esecuzione del kernel genetico core
top_solutions, div_history = run_genetic_algorithm(employees, target, progress_callback=ui_callback)
final_pop_sample = np.array([sol['schedule'] for sol in top_solutions])
final_diversity = div_history[-1] if div_history else 0.0
# Caching dell'output generato nell'oggetto di sessione.
# Evita di perdere i risultati (o triggerare ricalcoli) se cambio tab.
st.session_state['ga_results'] = {
'top_solutions': top_solutions,
'diversity_score': final_diversity,
'diversity_history': div_history,
'selected_idx': 0,
'employees': employees,
'target': target
}
best_s = top_solutions[0]['total_score']
status_text.success(f"Ottimizzazione conclusa con successo! Miglior punteggio di penalità raggiunto: {best_s:.0f}")
except Exception as e:
st.error(f"Errore di sistema durante il calcolo: {e}")
traceback.print_exc()
# Blocco di visualizzazione post-run
if 'ga_results' in st.session_state:
res = st.session_state['ga_results']
solutions = res['top_solutions']
div_hist = res.get('diversity_history', [res.get('diversity_score', 0.0)])
final_div = div_hist[-1]
# Diagnostica algoritmica automatizzata (Controlla il drop-rate della diversità)
msg, color_code = analyze_convergence_quality(div_hist)
st.markdown("### 🩺 Diagnostica e Validazione Scientifica")
kpi1, kpi2 = st.columns([1, 3])
kpi1.metric("Diversità Genetica Finale", f"{final_div:.2f}%")
if color_code == "success": kpi2.success(msg)
elif color_code == "normal": kpi2.info(msg)
else: kpi2.error(msg)
with st.expander("Cos'è la Diversità e come interpretarla?"):
st.caption("""
La **Diversità** indica quante soluzioni "diverse" l'algoritmo stava ancora testando alla fine del processo.
- **Se è >40%:** L'AI non è riuscita a trovare un pattern vincente e ha continuato a sparare a caso (aumenta le Generazioni o diminuisci la Mutazione).
- **Se crolla subito a <5% (Convergenza Prematura):** L'AI si è "incastrata" su una soluzione mediocre e ha smesso di cercare (aumenta la Mutazione).
- **Se scende gradualmente (Matura):** È lo stato ideale. L'AI ha esplorato bene e poi ha "stretto" verso la soluzione perfetta.
""")
if div_hist:
st.subheader("📉 Profilo Dinamico dell'Apprendimento")
fig_div, ax_div = plt.subplots(figsize=(10, 3))
x_axis = [i * 5 for i in range(len(div_hist))]
ax_div.plot(x_axis, div_hist, color='#2980b9', linewidth=2, label='Varianza di Popolazione (%)')
ax_div.axhspan(0, 5, color='#e74c3c', alpha=0.1, label='Rischio Collasso (<5%)')
ax_div.axhspan(40, 100, color='#e67e22', alpha=0.1, label='Rischio Divergenza (>40%)')
ax_div.axhspan(5, 40, color='#2ecc71', alpha=0.1, label='Fascia Ottimale')
ax_div.set_ylabel("Hamming Dist (%)")
ax_div.set_xlabel("Epoche di Addestramento")
ax_div.set_ylim(0, max(50, max(div_hist) + 5))
ax_div.grid(True, linestyle='--', alpha=0.5)
ax_div.legend(loc='upper right', fontsize='small')
buf_div = io.BytesIO()
fig_div.savefig(buf_div, format="png", bbox_inches="tight", transparent=False, dpi=150)
buf_div.seek(0)
st.image(buf_div)
plt.close(fig_div)
with st.expander("Come leggere questo grafico?"):
st.caption("""
Questo grafico racconta visivamente il lavoro dell'algoritmo:
1. **Fase Iniziale (Esplorazione):** Il grafico deve partire alto (fuori dal rosso basso). L'AI sta provando incastri creativi.
2. **Discesa (Sfruttamento):** La curva deve scendere dolcemente verso il basso.
3. **Atterraggio:** La curva dovrebbe stabilizzarsi nella **Fascia Verde Ottimale**. Se vedi crolli verticali improvvisi all'inizio, c'è un problema di configurazione nei parametri genetici.
""")
st.divider()
st.subheader("🏆 Esplorazione delle Migliori Soluzioni Trovate")
st.caption("L'algoritmo ti propone le varianti più performanti. Lo SCORE TOTALE è la somma delle penalità (più è basso, meglio è).")
comp_data = []
for i, s in enumerate(solutions):
comp_data.append({
"Candidato": f"Soluzione #{i+1}",
"SCORE TOTALE (Penalità)": int(s['total_score']),
"❌ Understaffing (Buchi)": int(s['understaffing']),
"⚖️ Inequità Weekend": int(s['equity']),
"⚡ Overstaffing (Eccessi)": int(s['overstaffing']),
"🎨 Pref. Ignorate": int(s['soft_preferences']),
"📝 Mix Contratti Violato": int(s['contract'])
})
df_comp = pd.DataFrame(comp_data)
st.dataframe(
df_comp.style.background_gradient(cmap="RdYlGn_r", subset=["SCORE TOTALE (Penalità)", "❌ Understaffing (Buchi)"]),
width='stretch',
hide_index=True
)
sel_opt = st.radio("Seleziona quale piano turni visualizzare in dettaglio:",
options=range(len(solutions)),
format_func=lambda x: f"Apri Dettaglio Soluzione #{x+1}",
horizontal=True,
index=st.session_state.get('selected_idx', 0))
st.session_state['ga_results']['selected_idx'] = sel_opt
chosen_sol = solutions[sel_opt]
best_sched = chosen_sol['schedule']
emps = res['employees']
tgt = res['target']
st.subheader(f"Dashboard Copertura: Soluzione #{sel_opt+1}")
st.caption("Confronto visivo tra le persone richieste (linea rossa tratteggiata) e le persone messe a turno (area blu).")
d_tabs = st.tabs(["Lunedì", "Martedì", "Mercoledì", "Giovedì", "Venerdì", "Sabato", "Domenica"])
# Proietto il genoma elaborato (best_sched) sulla matrice di copertura per le charts
cov_mat = get_final_coverage_matrix(best_sched, emps)
for i, t in enumerate(d_tabs):
with t:
fig, ax = plt.subplots(figsize=(8, 2))
x = range(cfg.daily_slots)
ax.fill_between(x, cov_mat[i], alpha=0.3)
ax.plot(x, cov_mat[i], label="Staff Schedulato")
ax.plot(x, tgt[i], 'r--', label="Staff Richiesto (Target)")
tick_step = 4
lbls = [minutes_to_time(k * cfg.system_slot_minutes) for k in x[::tick_step]]
ax.set_xticks(list(x)[::tick_step])
ax.set_xticklabels(lbls, rotation=0, fontsize=4)
ax.legend()
buf_day = io.BytesIO()
fig.savefig(buf_day, format="png", bbox_inches="tight", transparent=False, dpi=150)
buf_day.seek(0)
st.image(buf_day)
plt.close(fig)
st.subheader("Tabellone Turni (Export)")
data_rows = []
days = ["Lun", "Mar", "Mer", "Gio", "Ven", "Sab", "Dom"]
for idx, e in enumerate(emps):
row = {"Dipendente": e['id']}
for d in range(7):
s = best_sched[idx, d]
if s == -1: txt = "OFF"
elif s == -2: txt = "ABS"
else:
start = s * cfg.system_slot_minutes
end = start + (e['shift_len'] * cfg.system_slot_minutes)
txt = f"{minutes_to_time(start)}-{minutes_to_time(end)}"
row[days[d]] = txt
data_rows.append(row)
st.dataframe(pd.DataFrame(data_rows), width='stretch')
st.markdown("---")
st.subheader("🔬 Ispezione Micro-Turno (Maschere VDT e Pause)")
st.caption("Verifica la corretta allocazione delle pause VDT all'interno dello spezzato del singolo operatore.")
# Micro-rendering della maschera binaria per l'ispezione visiva dei sub-slot
c1, c2 = st.columns(2)
sel_emp = c1.selectbox("Seleziona Operatore", [e['id'] for e in emps])
sel_day = c2.selectbox("Seleziona Giorno", range(7), format_func=lambda x: days[x])
e_idx = next(i for i,e in enumerate(emps) if e['id'] == sel_emp)
s_start = best_sched[e_idx, sel_day]
if s_start >= 0:
mask = emps[e_idx]['mask']
html = ""
for k, bit in enumerate(mask):
t_str = minutes_to_time((s_start + k) * cfg.system_slot_minutes)
col = "#4CAF50" if bit else "#FF5252"
html += f"<div style='display:inline-block;width:35px;background:{col};color:white;font-size:10px;text-align:center;margin:1px;'>{t_str}</div>"
st.markdown(html, unsafe_allow_html=True)
st.caption("**Legenda:** [Verde] = Operatività a Terminale | [Rosso] = Pausa/Pranzo")
else:
st.info("Status per il giorno selezionato: RIPOSO o ASSENTE.")
# ------------------------------------------------------------------------------
# TAB 5: BOOTSTRAPPING DI MOCK SCENARIOS (GENERATORE)
# ------------------------------------------------------------------------------
with tab5:
st.header("⚡ Generatore Ambienti di Test (Mock Scenarios)")
st.markdown("""
Crea rapidamente nuovi scenari completi per testare come il motore AI reagisce a diverse
composizioni della forza lavoro (es. alta rigidità vs alta flessibilità).
""")
col_gen_L, col_gen_R = st.columns([1, 2])
with col_gen_L:
st.subheader("1. Setup Spazio Dati")
new_scenario_name = st.text_input("Nome del nuovo Scenario", value="Nuovo_Test_BPO")
new_emp_count = st.number_input("Numero Dipendenti Fittizi", min_value=1, max_value=2000, value=300, step=10)
st.markdown("---")
st.subheader("📊 Modello Matematico del Fabbisogno")
st.caption("Scegli una distribuzione che simuli fedelmente il traffico del servizio.")
curve_options = {
"Doppia Campana (Tipico BPO Voice)": "double_bell",
"Campana Centrale (Es. Delivery/Pausa Pranzo)": "single_bell_center",
"Picco Mattutino (Es. Helpdesk IT)": "morning_peak",
"Piatto Costante (Es. Backoffice/Data Entry)": "steady_high"
}
selected_curve_label = st.selectbox("Seleziona Modello di Carico:", options=list(curve_options.keys()), index=0)
curve_key = curve_options[selected_curve_label]
st.caption("Anteprima Forma:")
# Anteprima visiva matematica della distribuzione scelta
preview_x = np.linspace(8, 22, 50)
if curve_key == "double_bell":
preview_y = np.exp(-((preview_x - 11)**2)/4) + np.exp(-((preview_x - 16)**2)/4)
elif curve_key == "single_bell_center":
preview_y = np.exp(-((preview_x - 13)**2)/9)
elif curve_key == "morning_peak":
preview_y = np.exp(-((preview_x - 9.5)**2)/5)
else:
preview_y = np.ones_like(preview_x) * 0.8
fig_curve_prev, ax_cp = plt.subplots(figsize=(4, 1.5))
ax_cp.plot(preview_x, preview_y, color='#2ecc71', lw=2)
ax_cp.fill_between(preview_x, preview_y, color='#2ecc71', alpha=0.2)
ax_cp.set_yticks([])
ax_cp.set_xticks([8, 12, 16, 20])
ax_cp.set_xlim(8, 22)
fig_curve_prev.patch.set_alpha(0.0)
ax_cp.patch.set_alpha(0.0)
buf_curve = io.BytesIO()
fig_curve_prev.savefig(buf_curve, format="png", bbox_inches="tight", transparent=False, dpi=150)
buf_curve.seek(0)
st.image(buf_curve)
plt.close(fig_curve_prev)
with col_gen_R:
st.subheader("2. Strategia HR (Mix Contrattuale)")
st.caption("Simula il livello di flessibilità del personale.")
pct_ft40 = st.slider("🔵 Full Time (8h - Alta rigidità)", 0, 100, 60)
pct_pt30 = st.slider("🟡 Part Time (6h - Media flessibilità)", 0, 100, 20)
remaining = max(0, 100 - (pct_ft40 + pct_pt30))
pct_pt20 = st.slider("🔴 Part Time (4h - Alta flessibilità)", 0, 100, remaining)
total_mix = pct_ft40 + pct_pt30 + pct_pt20
if total_mix != 100:
st.warning(f"⚠️ La somma deve essere 100%. Attuale: {total_mix}%.")
else:
st.success("✅ Composizione valida.")
fig_preview, ax_prev = plt.subplots(figsize=(3, 1.5))
data_prev, labels_prev, colors_prev = [pct_ft40, pct_pt30, pct_pt20], ['FT 8h', 'PT 6h', 'PT 4h'], ['#3498db', '#f1c40f', '#e74c3c']
d_clean, l_clean, c_clean = zip(*[(d, l, c) for d, l, c in zip(data_prev, labels_prev, colors_prev) if d > 0])
wedges, texts, autotexts = ax_prev.pie(d_clean, labels=None, colors=c_clean, autopct='%1.0f%%', textprops={'color':"white", 'fontsize': 8, 'weight': 'bold'}, pctdistance=0.5)
ax_prev.axis('equal')
fig_preview.patch.set_alpha(0.0)
leg = ax_prev.legend(wedges, l_clean, loc="center left", bbox_to_anchor=(1, 0, 0.5, 1), frameon=False, labelcolor='white', fontsize=7)
buf = io.BytesIO()
fig_preview.savefig(buf, format="png", bbox_inches="tight", transparent=True, dpi=150)
buf.seek(0)
st.image(buf)
st.divider()
btn_col, _ = st.columns([1, 3])
if btn_col.button("🚀 Inizializza Scenario su HF", type="primary", disabled=(total_mix != 100)):
if not new_scenario_name.strip():
st.error("Nome scenario non valido.")
else:
with st.spinner("Creazione dati fittizi e upload in corso..."):
mix_dict = {'FT40': pct_ft40, 'PT30': pct_pt30, 'PT20': pct_pt20}
success, msg = generate_scenario_files(new_scenario_name, new_emp_count, mix_dict, curve_key)
if success:
st.success(f"{msg}")
st.info("🔄 Clicca su 'Ricarica App' nella barra laterale sinistra per gestire questo nuovo scenario.")
else:
st.error(f"Errore di sistema: {msg}")