Spaces:
Running
Running
| """ | |
| HF-Master Shared Utilities | |
| Helper functions for all projects | |
| """ | |
| import os | |
| import re | |
| import json | |
| import hashlib | |
| from typing import Dict, List, Optional, Any, Union | |
| from datetime import datetime | |
| from pathlib import Path | |
| import sqlite3 | |
def load_env(var_name: str, default: Optional[str] = None) -> Optional[str]:
    """Look up an environment variable.

    Args:
        var_name: Name of the environment variable to read.
        default: Value returned when the variable is not set.

    Returns:
        The variable's value, or ``default`` if it is not set.
    """
    return os.environ.get(var_name, default)
def load_api_key(provider: str = "openai") -> Optional[str]:
    """Return the API key configured for ``provider``.

    The provider name is matched case-insensitively against a fixed table of
    environment-variable names.

    Args:
        provider: One of "openai", "anthropic", "huggingface", "cohere" or
            "together".

    Returns:
        The value of the provider's environment variable, or ``None`` when the
        provider is unknown or the variable is not set.
    """
    env_var_by_provider = {
        "openai": "OPENAI_API_KEY",
        "anthropic": "ANTHROPIC_API_KEY",
        "huggingface": "HF_TOKEN",
        "cohere": "COHERE_API_KEY",
        "together": "TOGETHER_API_KEY",
    }
    variable_name = env_var_by_provider.get(provider.lower())
    if not variable_name:
        return None
    return load_env(variable_name)
def estimate_token_count(text: str, model: str = "gpt-4") -> int:
    """Roughly estimate how many tokens ``text`` occupies.

    The estimate is the character count divided (rounding down) by an
    approximate characters-per-token figure for the model.

    Args:
        text: Text to measure.
        model: Exact model key; unknown models use 4 characters per token.

    Returns:
        The estimated number of tokens.
    """
    # Approximate number of characters that make up one token.
    chars_per_token_by_model = {
        "gpt-4": 4,
        "gpt-3.5": 4,
        "claude": 4,
        "llama": 3,
    }
    divisor = chars_per_token_by_model.get(model, 4)
    return len(text) // divisor
def estimate_tokens(text: str, model: str = "gpt-4") -> int:
    """Legacy name for ``estimate_token_count``, kept for older apps.

    Args:
        text: Text to measure.
        model: Model key forwarded unchanged.

    Returns:
        Whatever ``estimate_token_count`` returns for the same arguments.
    """
    return estimate_token_count(text=text, model=model)
def calculate_api_cost(
    model: str,
    input_tokens: int,
    output_tokens: int,
    provider: str = "openai"
) -> float:
    """Compute the cost of one API call from its token usage.

    Rates are expressed per 1000 tokens and looked up by provider and model.
    Any provider/model combination missing from the table falls back to
    0.01 (input) and 0.03 (output).

    Args:
        model: Model name as it appears in the rate table.
        input_tokens: Number of prompt tokens.
        output_tokens: Number of completion tokens.
        provider: Provider whose rate table should be used.

    Returns:
        The summed input and output cost.
    """
    # (provider, model) -> (input rate, output rate) per 1000 tokens.
    rate_table = {
        ("openai", "gpt-4"): (0.03, 0.06),
        ("openai", "gpt-3.5-turbo"): (0.001, 0.002),
        ("openai", "gpt-4-turbo"): (0.01, 0.03),
        ("anthropic", "claude-3-opus"): (0.015, 0.075),
        ("anthropic", "claude-3-sonnet"): (0.003, 0.015),
    }
    input_rate, output_rate = rate_table.get((provider, model), (0.01, 0.03))
    prompt_cost = (input_tokens / 1000) * input_rate
    completion_cost = (output_tokens / 1000) * output_rate
    return prompt_cost + completion_cost
def calculate_cost(tokens: int, model: str = "gpt-4", provider: str = "openai") -> float:
    """Legacy wrapper around ``calculate_api_cost``, kept for older apps.

    All of ``tokens`` are billed as input tokens; the output token count is
    passed as zero.

    Args:
        tokens: Token count to bill as input.
        model: Model name forwarded unchanged.
        provider: Provider name forwarded unchanged.

    Returns:
        The cost reported by ``calculate_api_cost``.
    """
    return calculate_api_cost(model, tokens, 0, provider)
def sanitize_filename(name: str) -> str:
    """Turn an arbitrary string into a filename-friendly slug.

    The input is lower-cased and stripped, every character that is not a word
    character, whitespace or hyphen is removed, and each run of whitespace
    becomes a single hyphen.

    Args:
        name: Text to convert.

    Returns:
        The slug.
    """
    normalized = name.lower().strip()
    without_punctuation = re.sub(r'[^\w\s-]', '', normalized)
    return re.sub(r'[\s]+', '-', without_punctuation)
def create_hash(text: str, length: int = 8) -> str:
    """Derive a short hexadecimal identifier from ``text``.

    Args:
        text: Text to hash; it is encoded with the default ``str.encode``.
        length: Number of leading hex characters of the MD5 digest to keep.

    Returns:
        The first ``length`` characters of the MD5 hex digest.
    """
    hex_digest = hashlib.md5(text.encode()).hexdigest()
    return hex_digest[:length]
def format_duration(seconds: float) -> str:
    """Render a duration with one decimal place and a unit suffix.

    Args:
        seconds: Duration in seconds.

    Returns:
        Seconds ("s") below one minute, minutes ("m") below one hour,
        otherwise hours ("h").
    """
    if seconds >= 3600:
        return f"{seconds/3600:.1f}h"
    if seconds >= 60:
        return f"{seconds/60:.1f}m"
    return f"{seconds:.1f}s"
def format_bytes(bytes: int) -> str:
    """Render a byte count with one decimal place and a binary unit.

    The value is divided by 1024 until it drops below 1024 or the units
    B, KB, MB, GB, TB are exhausted; anything larger is reported in PB.

    Args:
        bytes: Size in bytes.

    Returns:
        The formatted size, e.g. ``"1.5 KB"``.
    """
    size = bytes
    for unit in ('B', 'KB', 'MB', 'GB', 'TB'):
        if size < 1024:
            return f"{size:.1f} {unit}"
        size = size / 1024
    return f"{size:.1f} PB"
def truncate_text(text: str, max_length: int = 100, suffix: str = "...") -> str:
    """Shorten ``text`` to at most ``max_length`` characters.

    Text that already fits is returned unchanged. Otherwise the text is cut
    and ``suffix`` appended so that the result is exactly ``max_length``
    characters long.

    Args:
        text: Text to shorten.
        max_length: Maximum length of the returned string.
        suffix: Marker appended to truncated text.

    Returns:
        The original or truncated text; never longer than ``max_length``
        (an empty string when ``max_length`` is zero or negative).
    """
    if len(text) <= max_length:
        return text
    if max_length <= 0:
        return ""
    keep = max_length - len(suffix)
    if keep <= 0:
        # No room for any of the text: a negative slice index would otherwise
        # return a string longer than max_length, so emit what fits of the
        # suffix instead.
        return suffix[:max_length]
    return text[:keep] + suffix
def parse_dice_notation(notation: str) -> Dict[str, Any]:
    """Parse dice notation such as ``2d6+3`` or ``4d6kh3``.

    Matching is case-insensitive and ignores surrounding whitespace. The whole
    string must be valid notation; trailing characters are rejected.

    Args:
        notation: ``<count>d<sides>`` optionally followed by ``kh<n>``
            (keep the highest n dice) and a signed integer modifier.

    Returns:
        A dict with ``num_dice`` (int), ``die_size`` (int), ``keep_high``
        (the lower-cased ``"kh<n>"`` fragment, or ``None``) and ``modifier``
        (int, 0 when absent).

    Raises:
        ValueError: If ``notation`` is not valid dice notation.
    """
    # The pattern is written in lower case, so normalise the input the same
    # way; upper-casing it would make the literal "d" and "kh" never match.
    match = re.fullmatch(
        r'(\d+)d(\d+)(kh\d+)?([+-]\d+)?',
        notation.strip().lower(),
    )
    if not match:
        raise ValueError(f"Invalid dice notation: {notation}")
    count, sides, keep_high, modifier = match.groups()
    return {
        "num_dice": int(count),
        "die_size": int(sides),
        "keep_high": keep_high,
        "modifier": int(modifier) if modifier else 0,
    }
def roll_dice(notation: str) -> List[int]:
    """Roll the dice described by ``notation`` and return each die's result.

    The notation is parsed with ``parse_dice_notation``. When a keep-highest
    clause is present, only that many of the highest rolls are returned, in
    descending order. The modifier is not applied to the returned rolls.

    Args:
        notation: Dice notation string.

    Returns:
        The individual roll values.
    """
    import random

    spec = parse_dice_notation(notation)
    sides = spec["die_size"]
    results = []
    for _ in range(spec["num_dice"]):
        results.append(random.randint(1, sides))

    keep_clause = spec["keep_high"]
    if not keep_clause:
        return results
    # The clause has the form "kh<n>"; drop the two-letter prefix.
    keep_count = int(keep_clause[2:])
    return sorted(results, reverse=True)[:keep_count]
def calculate_modifier(ability_score: int) -> int:
    """Convert a D&D ability score into its modifier.

    Args:
        ability_score: The raw ability score.

    Returns:
        ``(ability_score - 10)`` floor-divided by 2.
    """
    distance_from_average = ability_score - 10
    return distance_from_average // 2
def validate_ethereum_address(address: str) -> bool:
    """Check that ``address`` has the textual shape of an Ethereum address.

    Only the format is checked: the literal prefix ``0x`` followed by exactly
    40 hexadecimal characters. No checksum validation is performed.

    Args:
        address: Candidate address string.

    Returns:
        ``True`` if the entire string matches the format, else ``False``.
    """
    # fullmatch instead of a "$" anchor: "$" also matches just before a
    # trailing newline, which would let "0x...\n" pass validation.
    return re.fullmatch(r'0x[a-fA-F0-9]{40}', address) is not None
def validate_solana_address(address: str) -> bool:
    """Check that ``address`` has the textual shape of a Solana address.

    Only the format is checked: 32 to 44 characters drawn from the character
    class ``1-9A-HJ-NP-Za-km-z`` (which omits ``0``, ``O``, ``I`` and ``l``).
    The string is not decoded.

    Args:
        address: Candidate address string.

    Returns:
        ``True`` if the entire string matches the format, else ``False``.
    """
    # fullmatch instead of a "$" anchor: "$" also matches just before a
    # trailing newline, which would let "<address>\n" pass validation.
    return re.fullmatch(r'[1-9A-HJ-NP-Za-km-z]{32,44}', address) is not None
def extract_urls(text: str) -> List[str]:
    """Collect every http/https URL that appears in ``text``.

    A URL starts at ``http://`` or ``https://`` and runs until whitespace or
    one of the characters excluded by the pattern below.

    Args:
        text: Text to scan.

    Returns:
        The matched URLs in order of appearance.
    """
    url_regex = re.compile(r'https?://[^\s<>"{}|\\^`\[\]]+')
    return [found.group(0) for found in url_regex.finditer(text)]
def extract_code_blocks(text: str) -> List[str]:
    """Pull the contents of fenced code blocks out of markdown text.

    A block starts with three backticks, an optional language word and a
    newline, and ends at the next three backticks.

    Args:
        text: Markdown source.

    Returns:
        The text between the fences of each block, in order of appearance.
    """
    fence_regex = re.compile(r'```(?:\w+)?\n(.*?)```', re.DOTALL)
    return [block.group(1) for block in fence_regex.finditer(text)]
def parse_math_expression(expr: str) -> float:
    """Evaluate a simple arithmetic expression.

    Only digits, ``+ - * / .``, parentheses and spaces are accepted; any other
    character causes the expression to be rejected before evaluation.

    Args:
        expr: Arithmetic expression, e.g. ``"2*(3+4)"``.

    Returns:
        The evaluated result (an ``int`` when the expression yields one).

    Raises:
        ValueError: If ``expr`` contains a character outside the allowed set.
        SyntaxError, ZeroDivisionError, TypeError: Propagated from evaluation
            of a malformed or invalid expression.
    """
    allowed_chars = set("0123456789+-*/.() ")
    if not all(c in allowed_chars for c in expr):
        raise ValueError(f"Unsafe expression: {expr}")
    # SECURITY NOTE(review): this still relies on eval(). The whitelist keeps
    # names and attribute access out, and evaluation runs with empty globals
    # and builtins as defence in depth, but "**" can be spelled with the
    # allowed characters, so input such as "9**9**9**9" can exhaust CPU and
    # memory. Do not expose this to untrusted callers without bounding the
    # input, or replace it with an ast-based evaluator.
    return eval(expr, {"__builtins__": {}}, {})
def create_timer(func):
    """Decorator that prints how long each call to ``func`` takes.

    After every successful call a line of the form
    ``"<name> took <duration>"`` is printed, with the duration rendered by
    ``format_duration``. Nothing is printed if ``func`` raises.

    Args:
        func: The callable to time.

    Returns:
        A wrapper with the same signature and return value as ``func`` that
        also carries its name and docstring.
    """
    import time
    from functools import wraps

    @wraps(func)  # keep __name__/__doc__ so the wrapped function stays identifiable
    def wrapper(*args, **kwargs):
        # perf_counter is monotonic; wall-clock time can jump while measuring.
        start = time.perf_counter()
        result = func(*args, **kwargs)
        duration = time.perf_counter() - start
        print(f"{func.__name__} took {format_duration(duration)}")
        return result
    return wrapper
def retry_on_failure(max_attempts: int = 3, delay: float = 1.0):
    """Build a decorator that retries a function when it raises.

    The wrapped function is called up to ``max_attempts`` times. After the
    n-th failed attempt the wrapper sleeps ``delay * n`` seconds before trying
    again; the exception from the final attempt is re-raised unchanged.

    Args:
        max_attempts: Total number of attempts, including the first call.
        delay: Base delay in seconds, multiplied by the attempt number.

    Returns:
        A decorator that applies the retry behaviour.

    Raises:
        ValueError: If ``max_attempts`` is less than 1, because the wrapped
            function would otherwise never be called and silently yield
            ``None``.
    """
    from functools import wraps
    import time

    if max_attempts < 1:
        raise ValueError("max_attempts must be at least 1")

    def decorator(func):
        @wraps(func)  # keep __name__/__doc__ of the decorated function
        def wrapper(*args, **kwargs):
            for attempt in range(1, max_attempts + 1):
                try:
                    return func(*args, **kwargs)
                except Exception:
                    if attempt == max_attempts:
                        raise
                    time.sleep(delay * attempt)
        return wrapper
    return decorator
class SimpleCache:
    """Small in-memory cache that evicts the least recently used entry.

    ``cache`` maps keys to values and ``access_times`` maps each key to the
    ``datetime`` at which it was last read or written. Both are plain
    dictionaries and always hold the same set of keys.
    """

    def __init__(self, max_size: int = 100):
        """Create an empty cache holding at most ``max_size`` entries."""
        self.cache: Dict[str, Any] = {}
        self.max_size = max_size
        self.access_times: Dict[str, datetime] = {}

    def get(self, key: str) -> Optional[Any]:
        """Return the value stored under ``key`` and mark it as just used.

        Returns ``None`` when the key is absent, so a stored ``None`` cannot
        be told apart from a miss.
        """
        if key in self.cache:
            self.access_times[key] = datetime.now()
            return self.cache[key]
        return None

    def set(self, key: str, value: Any):
        """Store ``value`` under ``key``, evicting the oldest entry if full.

        Overwriting an existing key never evicts anything. A cache whose
        ``max_size`` is zero or negative stores nothing.
        """
        if self.max_size <= 0:
            # Nothing can be kept; avoids calling min() on an empty mapping.
            return
        # Only a brand-new key grows the cache, so only then make room.
        if key not in self.cache and len(self.cache) >= self.max_size:
            oldest = min(self.access_times, key=self.access_times.get)
            del self.cache[oldest]
            del self.access_times[oldest]
        self.cache[key] = value
        self.access_times[key] = datetime.now()

    def clear(self):
        """Remove every entry from the cache."""
        self.cache.clear()
        self.access_times.clear()
class Database:
    """Thin convenience wrapper around a single SQLite connection.

    The connection is opened lazily on first use and rows are returned as
    plain dictionaries. The object can also be used as a context manager,
    which opens the connection on entry and closes it on exit.
    """

    def __init__(self, db_path: str = "data.db"):
        """Remember ``db_path`` and create its parent directory if needed."""
        self.db_path = db_path
        Path(db_path).parent.mkdir(parents=True, exist_ok=True)
        self.conn = None

    def __enter__(self):
        """Open the connection if necessary and return this wrapper."""
        if not self.conn:
            self.connect()
        return self

    def __exit__(self, exc_type, exc_value, traceback):
        """Close the connection; uncommitted changes are not committed."""
        self.close()
        return False

    def connect(self):
        """Open the connection and make rows addressable by column name."""
        self.conn = sqlite3.connect(self.db_path)
        self.conn.row_factory = sqlite3.Row

    def close(self):
        """Close the connection, if one is open."""
        if self.conn:
            self.conn.close()
            # Forget the closed handle so the next execute() reconnects
            # instead of failing on a closed connection.
            self.conn = None

    def execute(self, query: str, params: tuple = ()) -> sqlite3.Cursor:
        """Run ``query`` with bound ``params``, connecting first if needed.

        Returns the resulting cursor. Changes are not committed automatically.
        """
        if not self.conn:
            self.connect()
        return self.conn.execute(query, params)

    def commit(self):
        """Commit the current transaction, if a connection is open."""
        if self.conn:
            self.conn.commit()

    def fetchall(self, query: str, params: tuple = ()) -> List[Dict]:
        """Run ``query`` and return every row as a dictionary."""
        cursor = self.execute(query, params)
        return [dict(row) for row in cursor.fetchall()]

    def fetchone(self, query: str, params: tuple = ()) -> Optional[Dict]:
        """Run ``query`` and return the first row as a dict, or ``None``."""
        row = self.execute(query, params).fetchone()
        return dict(row) if row else None

    def create_table(self, name: str, columns: Dict[str, str]):
        """Create table ``name`` if it does not exist, then commit.

        ``columns`` maps column names to their SQL type/constraint text.
        """
        # NOTE(review): identifiers cannot be bound as SQL parameters, so the
        # table and column text is interpolated verbatim. Only pass trusted
        # values here.
        cols = ", ".join(f"{col} {decl}" for col, decl in columns.items())
        self.execute(f"CREATE TABLE IF NOT EXISTS {name} ({cols})")
        self.commit()
def load_json_file(filepath: str) -> Dict:
    """Read and parse a JSON file.

    Args:
        filepath: Path of the file to read.

    Returns:
        The parsed JSON content.

    Raises:
        OSError: If the file cannot be opened.
        json.JSONDecodeError: If the content is not valid JSON.
    """
    # Read as UTF-8 explicitly; the platform default encoding would otherwise
    # decide how non-ASCII content is decoded.
    with open(filepath, 'r', encoding='utf-8') as f:
        return json.load(f)
def save_json_file(data: Dict, filepath: str):
    """Write ``data`` to ``filepath`` as JSON indented by two spaces.

    Missing parent directories are created first. An existing file is
    overwritten.

    Args:
        data: JSON-serialisable content.
        filepath: Destination path.

    Raises:
        OSError: If the directory or file cannot be written.
        TypeError: If ``data`` contains values ``json`` cannot serialise.
    """
    Path(filepath).parent.mkdir(parents=True, exist_ok=True)
    # Write as UTF-8 explicitly so the file does not depend on the platform
    # default encoding.
    with open(filepath, 'w', encoding='utf-8') as f:
        json.dump(data, f, indent=2)
def merge_dicts(*dicts: Dict) -> Dict:
    """Combine several dictionaries into a new one.

    Dictionaries are applied left to right, so for a key present in more than
    one input the right-most value wins. The inputs are not modified.

    Args:
        *dicts: Dictionaries to combine.

    Returns:
        A new dictionary containing every key from the inputs.
    """
    return {
        key: value
        for source in dicts
        for key, value in dict(source).items()
    }
def flatten_list(nested: List[Any]) -> List[Any]:
    """Flatten arbitrarily nested lists into a single flat list.

    Only ``list`` instances are expanded; other containers such as tuples are
    kept as single items.

    Args:
        nested: List whose elements may themselves be lists.

    Returns:
        A new list with all non-list elements in their original order.
    """
    flat: List[Any] = []
    for element in nested:
        flat += flatten_list(element) if isinstance(element, list) else [element]
    return flat
def chunk_text(text: str, chunk_size: int, overlap: int = 0) -> List[str]:
    """Split ``text`` into chunks of at most ``chunk_size`` characters.

    Each chunk starts ``chunk_size - overlap`` characters after the previous
    one, so neighbouring chunks share ``overlap`` characters. Chunks are
    produced for every start position inside the text, which means the final
    chunk may be shorter than ``chunk_size``.

    Args:
        text: Text to split.
        chunk_size: Maximum length of each chunk; must be positive.
        overlap: Characters shared between consecutive chunks; must satisfy
            ``0 <= overlap < chunk_size``.

    Returns:
        The chunks in order; an empty list for empty text.

    Raises:
        ValueError: If ``chunk_size`` or ``overlap`` is out of range. Such
            values would otherwise never advance through the text.
    """
    if chunk_size <= 0:
        raise ValueError("chunk_size must be a positive integer")
    if overlap < 0 or overlap >= chunk_size:
        raise ValueError("overlap must satisfy 0 <= overlap < chunk_size")
    step = chunk_size - overlap
    return [text[start:start + chunk_size] for start in range(0, len(text), step)]
def get_project_root() -> Path:
    """Return the directory two levels above this module's file.

    Returns:
        The parent of the directory that contains this file.
    """
    module_dir = Path(__file__).parent
    return module_dir.parent
def ensure_dir(path: str):
    """Create the directory ``path`` if it is not already there.

    Missing parent directories are created as well, and an existing directory
    is left untouched.

    Args:
        path: Directory path to create.
    """
    directory = Path(path)
    directory.mkdir(parents=True, exist_ok=True)