""" HF-Master Shared Utilities Helper functions for all projects """ import os import re import json import hashlib from typing import Dict, List, Optional, Any, Union from datetime import datetime from pathlib import Path import sqlite3 def load_env(var_name: str, default: Optional[str] = None) -> Optional[str]: """Load environment variable with optional default""" return os.getenv(var_name, default) def load_api_key(provider: str = "openai") -> Optional[str]: """Load API key for specified provider""" key_map = { "openai": "OPENAI_API_KEY", "anthropic": "ANTHROPIC_API_KEY", "huggingface": "HF_TOKEN", "cohere": "COHERE_API_KEY", "together": "TOGETHER_API_KEY" } env_var = key_map.get(provider.lower()) if env_var: return load_env(env_var) return None def estimate_token_count(text: str, model: str = "gpt-4") -> int: """Estimate token count for text""" tokens_per_word = { "gpt-4": 4, # ~4 chars per token "gpt-3.5": 4, "claude": 4, "llama": 3 # More efficient } chars_per_token = tokens_per_word.get(model, 4) return len(text) // chars_per_token def estimate_tokens(text: str, model: str = "gpt-4") -> int: """Backward-compatible alias used by older apps""" return estimate_token_count(text, model) def calculate_api_cost( model: str, input_tokens: int, output_tokens: int, provider: str = "openai" ) -> float: """Calculate API cost for model usage""" pricing = { "openai": { "gpt-4": {"input": 0.03, "output": 0.06}, "gpt-3.5-turbo": {"input": 0.001, "output": 0.002}, "gpt-4-turbo": {"input": 0.01, "output": 0.03} }, "anthropic": { "claude-3-opus": {"input": 0.015, "output": 0.075}, "claude-3-sonnet": {"input": 0.003, "output": 0.015} } } provider_pricing = pricing.get(provider, {}) model_pricing = provider_pricing.get(model, {"input": 0.01, "output": 0.03}) input_cost = (input_tokens / 1000) * model_pricing["input"] output_cost = (output_tokens / 1000) * model_pricing["output"] return input_cost + output_cost def calculate_cost(tokens: int, model: str = "gpt-4", provider: str = "openai") -> float: """Backward-compatible alias used by older apps""" return calculate_api_cost(model=model, input_tokens=tokens, output_tokens=0, provider=provider) def sanitize_filename(name: str) -> str: """Convert string to safe filename""" name = name.lower().strip() name = re.sub(r'[^\w\s-]', '', name) name = re.sub(r'[\s]+', '-', name) return name def create_hash(text: str, length: int = 8) -> str: """Create short hash from text""" return hashlib.md5(text.encode()).hexdigest()[:length] def format_duration(seconds: float) -> str: """Format duration in human-readable form""" if seconds < 60: return f"{seconds:.1f}s" elif seconds < 3600: return f"{seconds/60:.1f}m" else: return f"{seconds/3600:.1f}h" def format_bytes(bytes: int) -> str: """Format bytes in human-readable form""" for unit in ['B', 'KB', 'MB', 'GB', 'TB']: if bytes < 1024: return f"{bytes:.1f} {unit}" bytes /= 1024 return f"{bytes:.1f} PB" def truncate_text(text: str, max_length: int = 100, suffix: str = "...") -> str: """Truncate text with suffix""" if len(text) <= max_length: return text return text[:max_length - len(suffix)] + suffix def parse_dice_notation(notation: str) -> Dict[str, Any]: """Parse dice notation like 2d6+3""" match = re.match(r'(\d+)d(\d+)(kh\d+)?([+-]\d+)?', notation.upper()) if not match: raise ValueError(f"Invalid dice notation: {notation}") num_dice = int(match.group(1)) die_size = int(match.group(2)) keep_high = match.group(3) modifier = int(match.group(4)) if match.group(4) else 0 return { "num_dice": num_dice, "die_size": die_size, "keep_high": keep_high, "modifier": modifier } def roll_dice(notation: str) -> List[int]: """Roll dice and return individual rolls""" import random parsed = parse_dice_notation(notation) rolls = [random.randint(1, parsed["die_size"]) for _ in range(parsed["num_dice"])] if parsed["keep_high"]: keep = int(parsed["keep_high"][2:]) rolls = sorted(rolls, reverse=True)[:keep] return rolls def calculate_modifier(ability_score: int) -> int: """Calculate D&D ability modifier from score""" return (ability_score - 10) // 2 def validate_ethereum_address(address: str) -> bool: """Validate Ethereum address format""" pattern = r'^0x[a-fA-F0-9]{40}$' return bool(re.match(pattern, address)) def validate_solana_address(address: str) -> bool: """Validate Solana address format""" pattern = r'^[1-9A-HJ-NP-Za-km-z]{32,44}$' return bool(re.match(pattern, address)) def extract_urls(text: str) -> List[str]: """Extract URLs from text""" url_pattern = r'https?://[^\s<>"{}|\\^`\[\]]+' return re.findall(url_pattern, text) def extract_code_blocks(text: str) -> List[str]: """Extract code blocks from markdown text""" pattern = r'```(?:\w+)?\n(.*?)```' return re.findall(pattern, text, re.DOTALL) def parse_math_expression(expr: str) -> float: """Safely evaluate simple math expressions""" allowed_chars = set("0123456789+-*/.() ") if all(c in allowed_chars for c in expr): return eval(expr) raise ValueError(f"Unsafe expression: {expr}") def create_timer(func): """Decorator to time function execution""" import time from functools import wraps @wraps(func) def wrapper(*args, **kwargs): start = time.time() result = func(*args, **kwargs) duration = time.time() - start print(f"{func.__name__} took {format_duration(duration)}") return result return wrapper def retry_on_failure(max_attempts: int = 3, delay: float = 1.0): """Decorator to retry function on failure""" from functools import wraps import time def decorator(func): @wraps(func) def wrapper(*args, **kwargs): for attempt in range(max_attempts): try: return func(*args, **kwargs) except Exception as e: if attempt == max_attempts - 1: raise time.sleep(delay * (attempt + 1)) return wrapper return decorator class SimpleCache: """Simple in-memory cache""" def __init__(self, max_size: int = 100): self.cache: Dict[str, Any] = {} self.max_size = max_size self.access_times: Dict[str, datetime] = {} def get(self, key: str) -> Optional[Any]: """Get value from cache""" if key in self.cache: self.access_times[key] = datetime.now() return self.cache[key] return None def set(self, key: str, value: Any): """Set value in cache""" if len(self.cache) >= self.max_size: oldest = min(self.access_times.items(), key=lambda x: x[1])[0] del self.cache[oldest] del self.access_times[oldest] self.cache[key] = value self.access_times[key] = datetime.now() def clear(self): """Clear cache""" self.cache.clear() self.access_times.clear() class Database: """Simple SQLite wrapper""" def __init__(self, db_path: str = "data.db"): self.db_path = db_path Path(db_path).parent.mkdir(parents=True, exist_ok=True) self.conn = None def connect(self): """Connect to database""" self.conn = sqlite3.connect(self.db_path) self.conn.row_factory = sqlite3.Row def close(self): """Close database connection""" if self.conn: self.conn.close() def execute(self, query: str, params: tuple = ()) -> sqlite3.Cursor: """Execute query""" if not self.conn: self.connect() return self.conn.execute(query, params) def commit(self): """Commit transaction""" if self.conn: self.conn.commit() def fetchall(self, query: str, params: tuple = ()) -> List[Dict]: """Fetch all results""" cursor = self.execute(query, params) return [dict(row) for row in cursor.fetchall()] def fetchone(self, query: str, params: tuple = ()) -> Optional[Dict]: """Fetch one result""" cursor = self.execute(query, params) row = cursor.fetchone() return dict(row) if row else None def create_table(self, name: str, columns: Dict[str, str]): """Create table with columns""" cols = ", ".join([f"{k} {v}" for k, v in columns.items()]) self.execute(f"CREATE TABLE IF NOT EXISTS {name} ({cols})") self.commit() def load_json_file(filepath: str) -> Dict: """Load JSON file""" with open(filepath, 'r') as f: return json.load(f) def save_json_file(data: Dict, filepath: str): """Save JSON file""" Path(filepath).parent.mkdir(parents=True, exist_ok=True) with open(filepath, 'w') as f: json.dump(data, f, indent=2) def merge_dicts(*dicts: Dict) -> Dict: """Merge multiple dictionaries""" result = {} for d in dicts: result.update(d) return result def flatten_list(nested: List[Any]) -> List[Any]: """Flatten nested list""" result = [] for item in nested: if isinstance(item, list): result.extend(flatten_list(item)) else: result.append(item) return result def chunk_text(text: str, chunk_size: int, overlap: int = 0) -> List[str]: """Split text into overlapping chunks""" chunks = [] start = 0 while start < len(text): end = start + chunk_size chunks.append(text[start:end]) start = end - overlap return chunks def get_project_root() -> Path: """Get project root directory""" return Path(__file__).parent.parent def ensure_dir(path: str): """Ensure directory exists""" Path(path).mkdir(parents=True, exist_ok=True)