Spaces:
Sleeping
Sleeping
| import os | |
| import json | |
| from huggingface_hub import HfApi, hf_hub_download | |
| from huggingface_hub.utils import EntryNotFoundError | |
| DATASET_REPO_ID = "NextGenTech/geneticWFM-activities" | |
| # --- ENVIRONMENT DETECTION --- | |
| # Switch dinamico Dev/Prod basato sull'injection di SPACE_ID da parte del container HF | |
| IS_LOCAL = os.environ.get("SPACE_ID") is None | |
| # Risoluzione dinamica della project root per gestire l'I/O in local fallback | |
| PROJECT_ROOT = os.path.dirname(os.path.dirname(os.path.dirname(os.path.abspath(__file__)))) | |
| LOCAL_BASE_PATH = os.path.join(PROJECT_ROOT, "data", "activities") | |
| def get_hf_api(): | |
| """Istanzia il client HF sfruttando i secrets di environment.""" | |
| return HfApi(token=os.environ.get("HF_TOKEN")) | |
| def list_activities(): | |
| """ | |
| Discovery dinamica dei workspace operativi. | |
| Implementa routing Dev (OS Locale) / Prod (HF Datasets API). | |
| """ | |
| if IS_LOCAL: | |
| if not os.path.exists(LOCAL_BASE_PATH): | |
| return [] | |
| return sorted([d for d in os.listdir(LOCAL_BASE_PATH) if os.path.isdir(os.path.join(LOCAL_BASE_PATH, d))]) | |
| # --- PROD LOGIC --- | |
| api = get_hf_api() | |
| try: | |
| files = api.list_repo_files(repo_id=DATASET_REPO_ID, repo_type="dataset") | |
| activities = set() | |
| for f in files: | |
| parts = f.split("/") | |
| if len(parts) > 2 and parts[0] == "activities": | |
| activities.add(parts[1]) | |
| return sorted(list(activities)) | |
| except Exception as e: | |
| print(f"[ERROR] Discovery attività fallita sul layer HF: {e}") | |
| return [] | |
| def load_json(activity_name, filename): | |
| """ | |
| I/O Read: Deserializzazione del payload JSON. | |
| """ | |
| if IS_LOCAL: | |
| path = os.path.join(LOCAL_BASE_PATH, activity_name, filename) | |
| if os.path.exists(path): | |
| with open(path, 'r') as f: | |
| return json.load(f) | |
| return None | |
| # --- PROD LOGIC --- | |
| repo_path = f"activities/{activity_name}/{filename}" | |
| try: | |
| downloaded_path = hf_hub_download( | |
| repo_id=DATASET_REPO_ID, | |
| repo_type="dataset", | |
| filename=repo_path, | |
| token=os.environ.get("HF_TOKEN") | |
| ) | |
| with open(downloaded_path, 'r') as f: | |
| return json.load(f) | |
| except EntryNotFoundError: | |
| # Silenziamo l'errore se il file semplicemente non esiste ancora | |
| return None | |
| except Exception as e: | |
| print(f"[ERROR] API Download fallito per {repo_path}: {e}") | |
| return None | |
| def save_json(activity_name, filename, data): | |
| """ | |
| I/O Write: Serializzazione su disco e successiva replica su object storage. | |
| """ | |
| if IS_LOCAL: | |
| path = os.path.join(LOCAL_BASE_PATH, activity_name, filename) | |
| os.makedirs(os.path.dirname(path), exist_ok=True) | |
| with open(path, 'w') as f: | |
| json.dump(data, f, indent=2) | |
| return | |
| # --- PROD LOGIC --- | |
| repo_path = f"activities/{activity_name}/{filename}" | |
| local_tmp_path = f"/tmp/{activity_name}_{filename}" | |
| # Scrittura su layer effimero del container (/tmp) | |
| os.makedirs(os.path.dirname(local_tmp_path), exist_ok=True) | |
| with open(local_tmp_path, 'w') as f: | |
| json.dump(data, f, indent=2) | |
| api = get_hf_api() | |
| api.upload_file( | |
| path_or_fileobj=local_tmp_path, | |
| path_in_repo=repo_path, | |
| repo_id=DATASET_REPO_ID, | |
| repo_type="dataset", | |
| commit_message=f"Sync {filename} -> {activity_name}" | |
| ) | |
| def upload_new_scenario(activity_name, activity_conf, employees, weekly_demand): | |
| """ | |
| Commit massivo (atomic push) di un nuovo scenario generato. | |
| Evita commit parziali raggruppando Config, Anagrafica e Demand. | |
| """ | |
| if IS_LOCAL: | |
| base_path = os.path.join(LOCAL_BASE_PATH, activity_name) | |
| os.makedirs(base_path, exist_ok=True) | |
| with open(os.path.join(base_path, "activity_config.json"), 'w') as f: | |
| json.dump(activity_conf, f, indent=2) | |
| with open(os.path.join(base_path, "employees.json"), 'w') as f: | |
| json.dump(employees, f, indent=2) | |
| with open(os.path.join(base_path, "demand.json"), 'w') as f: | |
| json.dump(weekly_demand, f, indent=2) | |
| return | |
| # --- PROD LOGIC --- | |
| api = get_hf_api() | |
| local_dir = f"/tmp/activities/{activity_name}" | |
| os.makedirs(local_dir, exist_ok=True) | |
| # Scrittura layer effimero | |
| with open(os.path.join(local_dir, "activity_config.json"), 'w') as f: | |
| json.dump(activity_conf, f, indent=2) | |
| with open(os.path.join(local_dir, "employees.json"), 'w') as f: | |
| json.dump(employees, f, indent=2) | |
| with open(os.path.join(local_dir, "demand.json"), 'w') as f: | |
| json.dump(weekly_demand, f, indent=2) | |
| # Push massivo della cartella temporanea | |
| api.upload_folder( | |
| folder_path=local_dir, | |
| path_in_repo=f"activities/{activity_name}", | |
| repo_id=DATASET_REPO_ID, | |
| repo_type="dataset", | |
| commit_message=f"Init workspace: {activity_name}" | |
| ) |