# GeneticWFM / src/utils/hf_storage.py
# Author: GaetanoParente — first commit (9e62f55)
import os
import json
from huggingface_hub import HfApi, hf_hub_download
from huggingface_hub.utils import EntryNotFoundError
# HF Dataset repo acting as the persistent storage backend for activity data.
DATASET_REPO_ID = "NextGenTech/geneticWFM-activities"
# --- ENVIRONMENT DETECTION ---
# Dynamic Dev/Prod switch: the HF Spaces container injects SPACE_ID, so its
# absence means we are running locally.
IS_LOCAL = os.environ.get("SPACE_ID") is None
# Resolve the project root dynamically (three levels above this file) so that
# local-fallback I/O works regardless of the current working directory.
PROJECT_ROOT = os.path.dirname(os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
LOCAL_BASE_PATH = os.path.join(PROJECT_ROOT, "data", "activities")
def get_hf_api():
    """Build an HfApi client authenticated via the HF_TOKEN environment secret."""
    token = os.environ.get("HF_TOKEN")
    return HfApi(token=token)
def list_activities():
    """
    Discover the available activity workspaces.

    Routes between Dev (local filesystem) and Prod (HF Datasets API) and
    returns a sorted list of activity names; on any prod-side failure an
    empty list is returned after logging the error.
    """
    if IS_LOCAL:
        if not os.path.exists(LOCAL_BASE_PATH):
            return []
        candidates = os.listdir(LOCAL_BASE_PATH)
        return sorted(
            name for name in candidates
            if os.path.isdir(os.path.join(LOCAL_BASE_PATH, name))
        )
    # --- PROD LOGIC ---
    api = get_hf_api()
    try:
        found = set()
        for repo_file in api.list_repo_files(repo_id=DATASET_REPO_ID, repo_type="dataset"):
            segments = repo_file.split("/")
            # Only paths shaped like activities/<name>/<file> count as workspaces.
            if len(segments) > 2 and segments[0] == "activities":
                found.add(segments[1])
        return sorted(found)
    except Exception as e:
        print(f"[ERROR] Discovery attività fallita sul layer HF: {e}")
        return []
def load_json(activity_name, filename):
    """
    I/O Read: deserialize a JSON payload for an activity.

    Args:
        activity_name: Name of the activity workspace.
        filename: JSON file name inside the activity directory.

    Returns:
        The deserialized object, or None when the file does not exist
        (locally or in the repo) or the download fails.
    """
    if IS_LOCAL:
        path = os.path.join(LOCAL_BASE_PATH, activity_name, filename)
        if os.path.exists(path):
            with open(path, 'r') as f:
                return json.load(f)
        return None
    # --- PROD LOGIC ---
    # BUG FIX: the repo path previously embedded a literal placeholder instead
    # of the requested filename, so every prod read targeted a nonexistent file.
    repo_path = f"activities/{activity_name}/{filename}"
    try:
        downloaded_path = hf_hub_download(
            repo_id=DATASET_REPO_ID,
            repo_type="dataset",
            filename=repo_path,
            token=os.environ.get("HF_TOKEN")
        )
        with open(downloaded_path, 'r') as f:
            return json.load(f)
    except EntryNotFoundError:
        # The file simply doesn't exist yet; silence this expected case.
        return None
    except Exception as e:
        print(f"[ERROR] API Download fallito per {repo_path}: {e}")
        return None
def save_json(activity_name, filename, data):
    """
    I/O Write: serialize to disk, then replicate to object storage in prod.

    Args:
        activity_name: Name of the activity workspace.
        filename: JSON file name inside the activity directory.
        data: JSON-serializable payload to persist.
    """
    if IS_LOCAL:
        path = os.path.join(LOCAL_BASE_PATH, activity_name, filename)
        os.makedirs(os.path.dirname(path), exist_ok=True)
        with open(path, 'w') as f:
            json.dump(data, f, indent=2)
        return
    # --- PROD LOGIC ---
    # BUG FIX: both the repo path and the temp path previously embedded a
    # literal placeholder instead of the filename, making all prod writes
    # collide on a single bogus entry.
    repo_path = f"activities/{activity_name}/{filename}"
    local_tmp_path = f"/tmp/{activity_name}_{filename}"
    # Stage on the container's ephemeral layer (/tmp) before upload.
    os.makedirs(os.path.dirname(local_tmp_path), exist_ok=True)
    with open(local_tmp_path, 'w') as f:
        json.dump(data, f, indent=2)
    api = get_hf_api()
    api.upload_file(
        path_or_fileobj=local_tmp_path,
        path_in_repo=repo_path,
        repo_id=DATASET_REPO_ID,
        repo_type="dataset",
        commit_message=f"Sync {filename} -> {activity_name}"
    )
def upload_new_scenario(activity_name, activity_conf, employees, weekly_demand):
    """
    Atomic push of a freshly generated scenario.

    Groups config, roster and demand files into a single commit so the
    workspace never ends up in a partially-written state.
    """
    # (filename, payload) pairs that make up one scenario.
    payloads = (
        ("activity_config.json", activity_conf),
        ("employees.json", employees),
        ("demand.json", weekly_demand),
    )
    if IS_LOCAL:
        base_path = os.path.join(LOCAL_BASE_PATH, activity_name)
        os.makedirs(base_path, exist_ok=True)
        for fname, payload in payloads:
            with open(os.path.join(base_path, fname), 'w') as f:
                json.dump(payload, f, indent=2)
        return
    # --- PROD LOGIC ---
    # Stage everything on the container's ephemeral layer first.
    staging_dir = f"/tmp/activities/{activity_name}"
    os.makedirs(staging_dir, exist_ok=True)
    for fname, payload in payloads:
        with open(os.path.join(staging_dir, fname), 'w') as f:
            json.dump(payload, f, indent=2)
    # Bulk push of the staged folder as one commit.
    api = get_hf_api()
    api.upload_folder(
        folder_path=staging_dir,
        path_in_repo=f"activities/{activity_name}",
        repo_id=DATASET_REPO_ID,
        repo_type="dataset",
        commit_message=f"Init workspace: {activity_name}"
    )