import os
import json

from huggingface_hub import HfApi, hf_hub_download
from huggingface_hub.utils import EntryNotFoundError

DATASET_REPO_ID = "NextGenTech/geneticWFM-activities"

# --- ENVIRONMENT DETECTION ---
# Dev/Prod switch: Hugging Face Space containers inject SPACE_ID into the
# environment, so its absence means we are running locally.
IS_LOCAL = os.environ.get("SPACE_ID") is None

# Resolve the project root dynamically so the local fallback can do its I/O
# under <project>/data/activities regardless of the working directory.
PROJECT_ROOT = os.path.dirname(
    os.path.dirname(os.path.dirname(os.path.abspath(__file__)))
)
LOCAL_BASE_PATH = os.path.join(PROJECT_ROOT, "data", "activities")


def get_hf_api():
    """Instantiate the HF client using the HF_TOKEN environment secret."""
    return HfApi(token=os.environ.get("HF_TOKEN"))


def list_activities():
    """
    Dynamic discovery of the operational workspaces.

    Routes between Dev (local filesystem) and Prod (HF Datasets API).

    Returns:
        Sorted list of activity names; empty list when nothing exists
        or the remote listing fails.
    """
    if IS_LOCAL:
        if not os.path.exists(LOCAL_BASE_PATH):
            return []
        return sorted(
            d
            for d in os.listdir(LOCAL_BASE_PATH)
            if os.path.isdir(os.path.join(LOCAL_BASE_PATH, d))
        )

    # --- PROD LOGIC ---
    api = get_hf_api()
    try:
        files = api.list_repo_files(repo_id=DATASET_REPO_ID, repo_type="dataset")
        activities = set()
        for f in files:
            parts = f.split("/")
            # Only paths shaped like activities/<name>/<file> count: this
            # skips bare folder markers and top-level repo files.
            if len(parts) > 2 and parts[0] == "activities":
                activities.add(parts[1])
        return sorted(activities)
    except Exception as e:
        print(f"[ERROR] Discovery attività fallita sul layer HF: {e}")
        return []


def load_json(activity_name, filename):
    """
    Read I/O: deserialize a JSON payload.

    Args:
        activity_name: Workspace (activity) directory name.
        filename: JSON file name inside the activity folder.

    Returns:
        The parsed object, or None when the file does not exist
        (locally or remotely) or the download fails.
    """
    if IS_LOCAL:
        path = os.path.join(LOCAL_BASE_PATH, activity_name, filename)
        if os.path.exists(path):
            with open(path, "r", encoding="utf-8") as f:
                return json.load(f)
        return None

    # --- PROD LOGIC ---
    # FIX: the repo path must target the requested file; it previously
    # contained a corrupted "(unknown)" placeholder instead of {filename}.
    repo_path = f"activities/{activity_name}/{filename}"
    try:
        downloaded_path = hf_hub_download(
            repo_id=DATASET_REPO_ID,
            repo_type="dataset",
            filename=repo_path,
            token=os.environ.get("HF_TOKEN"),
        )
        with open(downloaded_path, "r", encoding="utf-8") as f:
            return json.load(f)
    except EntryNotFoundError:
        # The file simply does not exist yet: silence the error.
        return None
    except Exception as e:
        print(f"[ERROR] API Download fallito per {repo_path}: {e}")
        return None


def save_json(activity_name, filename, data):
    """
    Write I/O: serialize to disk, then replicate to the object storage.

    Args:
        activity_name: Workspace (activity) directory name.
        filename: JSON file name inside the activity folder.
        data: JSON-serializable payload.
    """
    if IS_LOCAL:
        path = os.path.join(LOCAL_BASE_PATH, activity_name, filename)
        os.makedirs(os.path.dirname(path), exist_ok=True)
        with open(path, "w", encoding="utf-8") as f:
            json.dump(data, f, indent=2)
        return

    # --- PROD LOGIC ---
    # FIX: repo path, tmp path and commit message previously embedded a
    # corrupted "(unknown)" placeholder instead of {filename}, so every
    # remote write targeted the same bogus file.
    repo_path = f"activities/{activity_name}/{filename}"
    local_tmp_path = f"/tmp/{activity_name}_{filename}"

    # Write to the container's ephemeral layer (/tmp) first.
    os.makedirs(os.path.dirname(local_tmp_path), exist_ok=True)
    with open(local_tmp_path, "w", encoding="utf-8") as f:
        json.dump(data, f, indent=2)

    api = get_hf_api()
    api.upload_file(
        path_or_fileobj=local_tmp_path,
        path_in_repo=repo_path,
        repo_id=DATASET_REPO_ID,
        repo_type="dataset",
        commit_message=f"Sync {filename} -> {activity_name}",
    )


def upload_new_scenario(activity_name, activity_conf, employees, weekly_demand):
    """
    Massive (atomic) push of a newly generated scenario.

    Groups Config, Employees and Demand into a single upload to avoid
    partial commits.

    Args:
        activity_name: Workspace (activity) directory name.
        activity_conf: JSON-serializable activity configuration.
        employees: JSON-serializable employee registry.
        weekly_demand: JSON-serializable weekly demand data.
    """
    if IS_LOCAL:
        base_path = os.path.join(LOCAL_BASE_PATH, activity_name)
        os.makedirs(base_path, exist_ok=True)
        with open(os.path.join(base_path, "activity_config.json"), "w", encoding="utf-8") as f:
            json.dump(activity_conf, f, indent=2)
        with open(os.path.join(base_path, "employees.json"), "w", encoding="utf-8") as f:
            json.dump(employees, f, indent=2)
        with open(os.path.join(base_path, "demand.json"), "w", encoding="utf-8") as f:
            json.dump(weekly_demand, f, indent=2)
        return

    # --- PROD LOGIC ---
    api = get_hf_api()
    local_dir = f"/tmp/activities/{activity_name}"
    os.makedirs(local_dir, exist_ok=True)

    # Write to the ephemeral layer first.
    with open(os.path.join(local_dir, "activity_config.json"), "w", encoding="utf-8") as f:
        json.dump(activity_conf, f, indent=2)
    with open(os.path.join(local_dir, "employees.json"), "w", encoding="utf-8") as f:
        json.dump(employees, f, indent=2)
    with open(os.path.join(local_dir, "demand.json"), "w", encoding="utf-8") as f:
        json.dump(weekly_demand, f, indent=2)

    # Bulk push of the temporary folder in a single commit.
    api.upload_folder(
        folder_path=local_dir,
        path_in_repo=f"activities/{activity_name}",
        repo_id=DATASET_REPO_ID,
        repo_type="dataset",
        commit_message=f"Init workspace: {activity_name}",
    )