""" ga/operators.py — Operatori genetici: crossover, mutation, selection. Tutti gli operatori lavorano su copie degli individui senza modificare i genitori (operazioni pure). """ from __future__ import annotations import random from core.models import Individual, PoI # --------------------------------------------------------------------------- # SELECTION # --------------------------------------------------------------------------- def tournament_select( population: list[Individual], k: int = 3, use_pareto: bool = True, ) -> Individual: """ Selezione torneo: estrae k individui casuali e restituisce il migliore. Con use_pareto=True preferisce rango Pareto basso + crowding alto. """ contestants = random.sample(population, k) if use_pareto: return min(contestants, key=lambda x: (x.fitness.rank, -x.fitness.crowd)) return max(contestants, key=lambda x: x.fitness.scalar) # --------------------------------------------------------------------------- # CROSSOVER # --------------------------------------------------------------------------- def order_crossover(parent1: Individual, parent2: Individual) -> tuple[Individual, Individual]: """ Order Crossover (OX) adattato a tour a lunghezza variabile. Preserva l'ordine relativo dei PoI condivisi tra i genitori. """ g1, g2 = parent1.genes, parent2.genes if len(g1) < 2 or len(g2) < 2: return parent1.clone(), parent2.clone() def ox(donor: list[PoI], receiver: list[PoI]) -> list[PoI]: if len(donor) < 2: return list(donor) cut1 = random.randint(0, len(donor) - 1) cut2 = random.randint(cut1, len(donor) - 1) segment = donor[cut1:cut2+1] seg_ids = {p.id for p in segment} # Riempi con i PoI del receiver non già nel segmento filling = [p for p in receiver if p.id not in seg_ids] child = filling[:cut1] + segment + filling[cut1:] return child return ( Individual(genes=ox(g1, g2)), Individual(genes=ox(g2, g1)), ) def poi_aware_crossover( parent1: Individual, parent2: Individual, categories: list[str] | None = None, ) -> tuple[Individual, Individual]: """ PoI-aware Crossover: scambia interi sottoinsiemi per categoria. Utile per preservare "nicchie tematiche" (es. tour musei vs tour gastronomico). """ from core.models import PoICategory if categories is None: categories = [c.value for c in PoICategory] def by_category(genes: list[PoI]) -> dict[str, list[PoI]]: d: dict[str, list[PoI]] = {c: [] for c in categories} for p in genes: d[p.category.value].append(p) return d cat1 = by_category(parent1.genes) cat2 = by_category(parent2.genes) child1_genes, child2_genes = [], [] for cat in categories: # Con probabilità 0.5 scambia le categorie tra i figli if random.random() < 0.5: child1_genes.extend(cat1[cat]) child2_genes.extend(cat2[cat]) else: child1_genes.extend(cat2[cat]) child2_genes.extend(cat1[cat]) # Rimuovi duplicati mantenendo l'ordine def deduplicate(genes: list[PoI]) -> list[PoI]: seen, result = set(), [] for p in genes: if p.id not in seen: seen.add(p.id) result.append(p) return result return ( Individual(genes=deduplicate(child1_genes)), Individual(genes=deduplicate(child2_genes)), ) # --------------------------------------------------------------------------- # MUTATION # --------------------------------------------------------------------------- def mutate( individual: Individual, poi_pool: list[PoI], mut_prob: float = 0.15, ) -> Individual: """ Seleziona casualmente uno degli operatori di mutazione. Applicato con probabilità mut_prob. """ if random.random() > mut_prob or not individual.genes: return individual operator = random.choice([ swap_mutation, insert_mutation, reverse_segment_mutation, lambda ind, pool: add_remove_mutation(ind, pool), ]) result = operator(individual.clone(), poi_pool) result.invalidate_cache() return result def swap_mutation(individual: Individual, _pool: list[PoI]) -> Individual: """Scambia due PoI scelti casualmente nella sequenza.""" g = individual.genes if len(g) < 2: return individual i, j = random.sample(range(len(g)), 2) g[i], g[j] = g[j], g[i] return individual def insert_mutation(individual: Individual, _pool: list[PoI]) -> Individual: """Rimuove un PoI da una posizione e lo inserisce altrove.""" g = individual.genes if len(g) < 2: return individual i = random.randrange(len(g)) poi = g.pop(i) j = random.randint(0, len(g)) g.insert(j, poi) return individual def reverse_segment_mutation(individual: Individual, _pool: list[PoI]) -> Individual: """Inverte un sottosegmento casuale del tour (elimina incroci geografici).""" g = individual.genes if len(g) < 3: return individual i = random.randint(0, len(g) - 2) j = random.randint(i + 1, len(g) - 1) g[i:j+1] = reversed(g[i:j+1]) return individual def add_remove_mutation(individual: Individual, pool: list[PoI]) -> Individual: """ Con prob 0.70: aggiunge un PoI casuale non ancora nel tour (esplora). Con prob 0.30: rimuove il PoI con il peggior score/durata (semplifica). Il bias verso l'aggiunta contrasta la tendenza del GA a produrre tour sempre più corti dopo molte generazioni di mutazione. """ g = individual.genes visited = {p.id for p in g} if random.random() < 0.70: # era 0.50: bias verso aggiunta candidates = [p for p in pool if p.id not in visited] if candidates: new_poi = random.choice(candidates) pos = random.randint(0, len(g)) g.insert(pos, new_poi) else: if g: worst = min(g, key=lambda p: p.score / (p.visit_duration + 1e-9)) g.remove(worst) return individual