| """ |
| HF-Master Shared Utilities |
| Helper functions for all projects |
| """ |
|
|
| import os |
| import re |
| import json |
| import hashlib |
| from typing import Dict, List, Optional, Any, Union |
| from datetime import datetime |
| from pathlib import Path |
| import sqlite3 |
|
|
|
|
def load_env(var_name: str, default: Optional[str] = None) -> Optional[str]:
    """Read an environment variable, falling back to ``default`` when unset.

    Args:
        var_name: Name of the environment variable to look up.
        default: Value returned when the variable is not set.

    Returns:
        The variable's value, or ``default`` if it is not present.
    """
    value = os.environ.get(var_name, default)
    return value
|
|
|
|
def load_api_key(provider: str = "openai") -> Optional[str]:
    """Return the API key configured in the environment for ``provider``.

    The provider name is matched case-insensitively against a fixed table of
    known providers.

    Args:
        provider: Provider name, e.g. ``"openai"`` or ``"anthropic"``.

    Returns:
        The value of the provider's environment variable, or ``None`` when
        the provider is unknown or the variable is not set.
    """
    env_var_by_provider = {
        "openai": "OPENAI_API_KEY",
        "anthropic": "ANTHROPIC_API_KEY",
        "huggingface": "HF_TOKEN",
        "cohere": "COHERE_API_KEY",
        "together": "TOGETHER_API_KEY",
    }

    env_var = env_var_by_provider.get(provider.lower())
    if not env_var:
        # Unknown provider: there is no variable to read.
        return None
    return load_env(env_var)
|
|
|
|
def estimate_token_count(text: str, model: str = "gpt-4") -> int:
    """Roughly estimate how many tokens ``text`` would occupy.

    This is a character-count heuristic, not a real tokenizer: the length of
    the text is integer-divided by a per-model characters-per-token ratio.

    Args:
        text: Text to measure.
        model: Model name; must match a table key exactly, otherwise a ratio
            of 4 characters per token is used.

    Returns:
        The estimated token count (``0`` for an empty string).
    """
    # Values are characters per token, keyed by model name.
    chars_per_token_by_model = {
        "gpt-4": 4,
        "gpt-3.5": 4,
        "claude": 4,
        "llama": 3,
    }

    divisor = chars_per_token_by_model.get(model, 4)
    return len(text) // divisor
|
|
|
|
def estimate_tokens(text: str, model: str = "gpt-4") -> int:
    """Legacy name for :func:`estimate_token_count`, kept for older apps.

    Args:
        text: Text to measure.
        model: Model name forwarded unchanged.

    Returns:
        Whatever ``estimate_token_count`` returns for the same arguments.
    """
    token_estimate = estimate_token_count(text, model)
    return token_estimate
|
|
|
|
def calculate_api_cost(
    model: str,
    input_tokens: int,
    output_tokens: int,
    provider: str = "openai"
) -> float:
    """Compute the cost of a request from its token counts.

    Rates in the built-in table are applied per 1,000 tokens, separately for
    input and output tokens.

    Args:
        model: Model name, looked up exactly within the provider's table.
        input_tokens: Number of prompt tokens.
        output_tokens: Number of completion tokens.
        provider: Provider whose rate table is consulted.

    Returns:
        Input cost plus output cost. An unknown provider or model falls back
        to rates of 0.01 (input) and 0.03 (output).
    """
    fallback_rates = {"input": 0.01, "output": 0.03}
    rate_table = {
        "openai": {
            "gpt-4": {"input": 0.03, "output": 0.06},
            "gpt-3.5-turbo": {"input": 0.001, "output": 0.002},
            "gpt-4-turbo": {"input": 0.01, "output": 0.03},
        },
        "anthropic": {
            "claude-3-opus": {"input": 0.015, "output": 0.075},
            "claude-3-sonnet": {"input": 0.003, "output": 0.015},
        },
    }

    rates = rate_table.get(provider, {}).get(model, fallback_rates)

    # Token counts are scaled to thousands before applying the rate.
    return (
        (input_tokens / 1000) * rates["input"]
        + (output_tokens / 1000) * rates["output"]
    )
|
|
|
|
def calculate_cost(tokens: int, model: str = "gpt-4", provider: str = "openai") -> float:
    """Legacy wrapper around :func:`calculate_api_cost`, kept for older apps.

    All ``tokens`` are billed as input tokens; the output token count is
    passed as zero.

    Args:
        tokens: Token count, treated entirely as input tokens.
        model: Model name forwarded unchanged.
        provider: Provider name forwarded unchanged.

    Returns:
        The value returned by ``calculate_api_cost``.
    """
    return calculate_api_cost(
        model=model,
        input_tokens=tokens,
        output_tokens=0,
        provider=provider,
    )
|
|
|
|
def sanitize_filename(name: str) -> str:
    """Turn an arbitrary string into a lowercase, hyphen-separated slug.

    The input is lowercased and stripped, every character that is not a word
    character, whitespace or hyphen is removed, and each run of whitespace is
    collapsed into a single hyphen.

    Args:
        name: Text to convert.

    Returns:
        The sanitized string (may be empty if nothing survives).
    """
    normalized = name.lower().strip()
    without_punctuation = re.sub(r'[^\w\s-]', '', normalized)
    return re.sub(r'[\s]+', '-', without_punctuation)
|
|
|
|
def create_hash(text: str, length: int = 8) -> str:
    """Return the first ``length`` hex characters of the MD5 digest of ``text``.

    Args:
        text: Text to hash; it is encoded with the default ``str.encode``
            codec (UTF-8) before hashing.
        length: Number of leading hex characters to keep.

    Returns:
        A lowercase hexadecimal prefix of the digest.
    """
    digest = hashlib.md5(text.encode("utf-8"))
    hex_digest = digest.hexdigest()
    return hex_digest[:length]
|
|
|
|
def format_duration(seconds: float) -> str:
    """Render a duration in seconds as a short string with one decimal place.

    Args:
        seconds: Duration in seconds.

    Returns:
        ``"<n>s"`` below one minute, ``"<n>m"`` below one hour, otherwise
        ``"<n>h"``.
    """
    if seconds >= 3600:
        return f"{seconds/3600:.1f}h"
    if seconds >= 60:
        return f"{seconds/60:.1f}m"
    return f"{seconds:.1f}s"
|
|
|
|
def format_bytes(bytes: int) -> str:
    """Format a byte count using binary (1024-based) units.

    Negative values (e.g. size deltas) are scaled by magnitude, so ``-2048``
    is rendered as ``"-2.0 KB"`` rather than ``"-2048.0 B"``.

    Args:
        bytes: Number of bytes. The parameter name shadows the builtin and is
            kept only so existing keyword callers continue to work.

    Returns:
        The value with one decimal place and a unit from B up to PB. Values
        beyond the petabyte range are still expressed in PB.
    """
    size = bytes  # work on a local so the argument itself is never rebound
    for unit in ('B', 'KB', 'MB', 'GB', 'TB'):
        # Compare on magnitude so negative values pick the right unit too.
        if abs(size) < 1024:
            return f"{size:.1f} {unit}"
        size /= 1024
    return f"{size:.1f} PB"
|
|
|
|
def truncate_text(text: str, max_length: int = 100, suffix: str = "...") -> str:
    """Shorten ``text`` to at most ``max_length`` characters.

    When truncation is needed, ``suffix`` is appended and counts towards the
    limit. If the limit is too small to hold the suffix at all, the text is
    hard-cut to ``max_length`` characters without a suffix.

    Args:
        text: Text to shorten.
        max_length: Maximum length of the returned string; values below zero
            are treated as zero.
        suffix: Marker appended to truncated text.

    Returns:
        ``text`` unchanged if it already fits, otherwise a string no longer
        than ``max_length`` characters.
    """
    if len(text) <= max_length:
        return text

    keep = max_length - len(suffix)
    if keep < 0:
        # A negative slice bound would count from the end of the string and
        # could return something longer than the input; hard-cut instead.
        return text[:max(max_length, 0)]
    return text[:keep] + suffix
|
|
|
|
def parse_dice_notation(notation: str) -> Dict[str, Any]:
    """Parse dice notation such as ``2d6+3`` or ``4d6kh3``.

    Matching is case-insensitive and ignores surrounding whitespace. The
    whole string must be valid notation; trailing garbage is rejected.

    Args:
        notation: Dice expression of the form
            ``<count>d<sides>[kh<keep>][+/-<modifier>]``.

    Returns:
        A dict with keys ``"num_dice"`` (int), ``"die_size"`` (int),
        ``"keep_high"`` (the lowercase ``"kh<n>"`` fragment, or ``None``) and
        ``"modifier"`` (int, ``0`` when absent).

    Raises:
        ValueError: If the notation is malformed or the die has no sides.
    """
    # Normalise to lowercase: the pattern uses lowercase ``d``/``kh``, so
    # upper-casing the input (as before) made every notation fail to match.
    normalized = notation.strip().lower()
    match = re.fullmatch(r'(\d+)d(\d+)(kh\d+)?([+-]\d+)?', normalized)
    if not match:
        raise ValueError(f"Invalid dice notation: {notation}")

    count_text, sides_text, keep_high, modifier_text = match.groups()
    die_size = int(sides_text)
    if die_size < 1:
        # A zero-sided die cannot be rolled.
        raise ValueError(f"Invalid dice notation: {notation}")

    return {
        "num_dice": int(count_text),
        "die_size": die_size,
        "keep_high": keep_high,
        "modifier": int(modifier_text) if modifier_text else 0,
    }
|
|
|
|
def roll_dice(notation: str) -> List[int]:
    """Roll the dice described by ``notation`` and return each die's result.

    The notation is parsed with ``parse_dice_notation``. When a keep-highest
    clause is present, only that many of the highest rolls are returned, in
    descending order. The parsed modifier is not applied here.

    Args:
        notation: Dice expression understood by ``parse_dice_notation``.

    Returns:
        The individual roll values.
    """
    import random

    spec = parse_dice_notation(notation)
    results = [random.randint(1, spec["die_size"]) for _ in range(spec["num_dice"])]

    keep_clause = spec["keep_high"]
    if keep_clause:
        # Skip the two-character prefix to get the number of dice to keep.
        keep_count = int(keep_clause[2:])
        results.sort(reverse=True)
        results = results[:keep_count]

    return results
|
|
|
|
def calculate_modifier(ability_score: int) -> int:
    """Convert a D&D ability score into its modifier.

    Args:
        ability_score: The raw ability score.

    Returns:
        Half of the score's distance from 10, rounded down (so 9 gives -1).
    """
    distance_from_ten = ability_score - 10
    return distance_from_ten // 2
|
|
|
|
def validate_ethereum_address(address: str) -> bool:
    """Check that ``address`` looks like an Ethereum address.

    Only the textual format is checked: ``0x`` followed by exactly 40
    hexadecimal characters. No checksum verification is performed.

    Args:
        address: Candidate address.

    Returns:
        ``True`` if the whole string matches the format, ``False`` otherwise
        (including for non-string input).
    """
    if not isinstance(address, str):
        return False
    # fullmatch instead of match + "$": "$" also matches just before a
    # trailing newline, which let "0x...\n" pass validation.
    return re.fullmatch(r'0x[a-fA-F0-9]{40}', address) is not None
|
|
|
|
def validate_solana_address(address: str) -> bool:
    """Check that ``address`` looks like a Solana address.

    Only the textual format is checked: 32 to 44 characters drawn from the
    character class used below (digits 1-9 and letters excluding ``I``,
    ``O`` and ``l``). The string is not decoded.

    Args:
        address: Candidate address.

    Returns:
        ``True`` if the whole string matches the format, ``False`` otherwise
        (including for non-string input).
    """
    if not isinstance(address, str):
        return False
    # fullmatch instead of match + "$": "$" also matches just before a
    # trailing newline, which let an address followed by "\n" pass.
    return re.fullmatch(r'[1-9A-HJ-NP-Za-km-z]{32,44}', address) is not None
|
|
|
|
def extract_urls(text: str) -> List[str]:
    """Find every ``http://`` or ``https://`` URL in ``text``.

    A URL runs until the first whitespace character or one of the delimiter
    characters excluded by the pattern (angle brackets, quotes, braces, etc.).

    Args:
        text: Text to scan.

    Returns:
        The matched URLs in order of appearance.
    """
    url_regex = re.compile(r'https?://[^\s<>"{}|\\^`\[\]]+')
    return [found.group(0) for found in url_regex.finditer(text)]
|
|
|
|
def extract_code_blocks(text: str) -> List[str]:
    """Pull the bodies of fenced code blocks out of markdown ``text``.

    A block starts with three backticks, an optional language tag made of
    word characters and a newline, and ends at the next three backticks.

    Args:
        text: Markdown source.

    Returns:
        The content between the fences, in order, without the fence lines or
        the language tag.
    """
    fence_regex = re.compile(r'```(?:\w+)?\n(.*?)```', re.DOTALL)
    return [block.group(1) for block in fence_regex.finditer(text)]
|
|
|
|
def parse_math_expression(expr: str) -> float:
    """Evaluate a simple arithmetic expression.

    Only digits, ``+ - * / . ( )`` and spaces are accepted; anything else is
    rejected before evaluation.

    Args:
        expr: Arithmetic expression, e.g. ``"(1 + 2) * 3"``.

    Returns:
        The value of the expression (an ``int`` when the arithmetic yields
        one, despite the annotation).

    Raises:
        ValueError: If ``expr`` contains a character outside the whitelist.
        SyntaxError, ZeroDivisionError, TypeError: Propagated from evaluation
            of malformed or invalid expressions.
    """
    allowed_chars = set("0123456789+-*/.() ")
    if not all(ch in allowed_chars for ch in expr):
        raise ValueError(f"Unsafe expression: {expr}")

    # NOTE(security): the whitelist blocks names and attribute access, but it
    # still admits "**", so input such as "9**9**9**9" can exhaust CPU and
    # memory. Do not feed untrusted input to this function without a length
    # or exponent limit; an ast-based evaluator is the robust replacement.
    # The empty namespaces are defence in depth: nothing from this module or
    # the builtins is reachable from the evaluated expression.
    return eval(expr, {"__builtins__": {}}, {})
|
|
|
|
def create_timer(func):
    """Decorator that prints how long each call to ``func`` took.

    The elapsed time is measured with ``time.perf_counter`` so that system
    clock adjustments cannot distort the result, and is rendered with
    ``format_duration``. Nothing is printed if ``func`` raises.

    Args:
        func: The callable to wrap.

    Returns:
        A wrapper with the same signature that returns ``func``'s result.
    """
    import time
    from functools import wraps

    @wraps(func)
    def wrapper(*args, **kwargs):
        started = time.perf_counter()
        result = func(*args, **kwargs)
        elapsed = time.perf_counter() - started
        print(f"{func.__name__} took {format_duration(elapsed)}")
        return result

    return wrapper
|
|
|
|
def retry_on_failure(max_attempts: int = 3, delay: float = 1.0):
    """Decorator factory that retries a function when it raises.

    The wrapped function is called up to ``max_attempts`` times. After each
    failed attempt except the last, the wrapper sleeps for ``delay``
    multiplied by the attempt number (``delay``, ``2 * delay``, ...). The
    exception from the final attempt is re-raised unchanged.

    Args:
        max_attempts: Total number of attempts. Values below 1 are treated
            as 1 so the function always runs at least once.
        delay: Base sleep time in seconds between attempts.

    Returns:
        A decorator that wraps a callable with the retry behaviour.
    """
    from functools import wraps
    import time

    # Previously a non-positive max_attempts skipped the loop entirely: the
    # function was never called and the wrapper silently returned None.
    attempts = max(1, max_attempts)

    def decorator(func):
        @wraps(func)
        def wrapper(*args, **kwargs):
            for attempt in range(1, attempts + 1):
                try:
                    return func(*args, **kwargs)
                except Exception:
                    if attempt == attempts:
                        # Out of attempts: surface the original exception.
                        raise
                    time.sleep(delay * attempt)

        return wrapper

    return decorator
|
|
|
|
class SimpleCache:
    """Small in-memory cache that evicts the least recently accessed entry.

    Values live in ``cache``; ``access_times`` records when each key was last
    read or written and decides which entry is evicted once ``max_size``
    entries are stored.
    """

    def __init__(self, max_size: int = 100):
        """Create an empty cache.

        Args:
            max_size: Maximum number of entries. A value of zero or less
                disables storage entirely.
        """
        self.cache: Dict[str, Any] = {}
        self.max_size = max_size
        self.access_times: Dict[str, datetime] = {}

    def get(self, key: str) -> Optional[Any]:
        """Return the cached value for ``key`` and mark it as just accessed.

        Returns:
            The stored value, or ``None`` if the key is absent. A stored
            ``None`` cannot be told apart from a miss.
        """
        if key not in self.cache:
            return None
        self.access_times[key] = datetime.now()
        return self.cache[key]

    def set(self, key: str, value: Any):
        """Store ``value`` under ``key``, evicting old entries if needed.

        Overwriting an existing key never evicts anything, because the number
        of entries does not grow.
        """
        if self.max_size <= 0:
            # No capacity: previously this crashed on min() of an empty dict.
            return

        if key not in self.cache:
            # Only a genuinely new key can push the cache over its limit.
            while len(self.cache) >= self.max_size:
                oldest = min(self.access_times, key=self.access_times.get)
                del self.cache[oldest]
                del self.access_times[oldest]

        self.cache[key] = value
        self.access_times[key] = datetime.now()

    def clear(self):
        """Remove every entry and its access timestamp."""
        self.cache.clear()
        self.access_times.clear()
|
|
|
|
class Database:
    """Thin convenience wrapper around a single SQLite connection.

    The connection is opened lazily on first use and rows are returned as
    plain dictionaries. The object can also be used as a context manager,
    which commits on success, rolls back on error, and always closes.
    """

    def __init__(self, db_path: str = "data.db"):
        """Remember the database path and create its parent directory.

        Args:
            db_path: Path of the SQLite database file.
        """
        self.db_path = db_path
        Path(db_path).parent.mkdir(parents=True, exist_ok=True)
        self.conn: Optional[sqlite3.Connection] = None

    def __enter__(self) -> "Database":
        """Open the connection if necessary and return this wrapper."""
        if self.conn is None:
            self.connect()
        return self

    def __exit__(self, exc_type, exc_value, traceback) -> None:
        """Commit or roll back depending on the outcome, then close."""
        try:
            if self.conn is not None:
                if exc_type is None:
                    self.conn.commit()
                else:
                    self.conn.rollback()
        finally:
            self.close()

    def connect(self):
        """Open a connection, closing any connection that is already open."""
        if self.conn is not None:
            # Avoid leaking the previous handle when reconnecting.
            self.conn.close()
        self.conn = sqlite3.connect(self.db_path)
        # sqlite3.Row lets rows be converted to dicts keyed by column name.
        self.conn.row_factory = sqlite3.Row

    def close(self):
        """Close the connection, if one is open."""
        if self.conn is not None:
            self.conn.close()
            # Forget the closed handle so the next execute() reconnects
            # instead of failing on a closed connection.
            self.conn = None

    def execute(self, query: str, params: tuple = ()) -> sqlite3.Cursor:
        """Run ``query`` with bound ``params``, connecting first if needed.

        Returns:
            The cursor produced by the connection.
        """
        if self.conn is None:
            self.connect()
        return self.conn.execute(query, params)

    def commit(self):
        """Commit the current transaction, if a connection is open."""
        if self.conn is not None:
            self.conn.commit()

    def fetchall(self, query: str, params: tuple = ()) -> List[Dict]:
        """Run ``query`` and return every row as a dict."""
        cursor = self.execute(query, params)
        return [dict(row) for row in cursor.fetchall()]

    def fetchone(self, query: str, params: tuple = ()) -> Optional[Dict]:
        """Run ``query`` and return the first row as a dict, or ``None``."""
        row = self.execute(query, params).fetchone()
        return dict(row) if row is not None else None

    def create_table(self, name: str, columns: Dict[str, str]):
        """Create table ``name`` if it does not exist, then commit.

        Args:
            name: Table name.
            columns: Mapping of column name to its SQL type/constraints.
        """
        # NOTE(security): identifiers cannot be bound as SQL parameters, so
        # ``name`` and ``columns`` are interpolated verbatim. Only pass
        # trusted, developer-controlled values here.
        cols = ", ".join(f"{col} {definition}" for col, definition in columns.items())
        self.execute(f"CREATE TABLE IF NOT EXISTS {name} ({cols})")
        self.commit()
|
|
|
|
def load_json_file(filepath: str) -> Dict:
    """Read and parse a JSON file.

    The file is decoded as UTF-8 explicitly, so non-ASCII content is read the
    same way regardless of the platform's default encoding.

    Args:
        filepath: Path of the JSON file.

    Returns:
        The parsed JSON document.

    Raises:
        OSError: If the file cannot be opened.
        json.JSONDecodeError: If the content is not valid JSON.
    """
    with open(filepath, 'r', encoding='utf-8') as f:
        return json.load(f)
|
|
|
|
def save_json_file(data: Dict, filepath: str):
    """Write ``data`` to ``filepath`` as indented JSON.

    Missing parent directories are created first. The file is written as
    UTF-8 explicitly so the result does not depend on the platform's default
    encoding.

    Args:
        data: JSON-serialisable object to store.
        filepath: Destination path; an existing file is overwritten.
    """
    target = Path(filepath)
    target.parent.mkdir(parents=True, exist_ok=True)
    with open(target, 'w', encoding='utf-8') as f:
        json.dump(data, f, indent=2)
|
|
|
|
def merge_dicts(*dicts: Dict) -> Dict:
    """Combine several dictionaries into a new one.

    Dictionaries are applied left to right, so for a key present in more than
    one input the value from the last one wins. The inputs are not modified
    and the merge is shallow.

    Args:
        *dicts: Dictionaries to merge.

    Returns:
        A new dictionary holding the combined entries.
    """
    merged: Dict = {}
    for mapping in dicts:
        merged.update(mapping)
    return merged
|
|
|
|
def flatten_list(nested: List[Any]) -> List[Any]:
    """Flatten arbitrarily nested lists into a single flat list.

    Only ``list`` instances are expanded; tuples and other containers are
    kept as single items. Element order is preserved.

    Args:
        nested: A list that may contain further lists.

    Returns:
        A new list with every non-list item in depth-first order.
    """
    flat: List[Any] = []
    # Stack of iterators: the top one is the list currently being walked.
    pending = [iter(nested)]
    while pending:
        for element in pending[-1]:
            if isinstance(element, list):
                # Descend into the sub-list; resume the parent afterwards.
                pending.append(iter(element))
                break
            flat.append(element)
        else:
            # Current list exhausted: go back to its parent.
            pending.pop()
    return flat
|
|
|
|
def chunk_text(text: str, chunk_size: int, overlap: int = 0) -> List[str]:
    """Split ``text`` into chunks of ``chunk_size`` characters.

    Consecutive chunks share ``overlap`` characters. Chunking stops as soon
    as a chunk reaches the end of the text, so no trailing chunk consisting
    only of already-covered overlap is produced.

    Args:
        text: Text to split.
        chunk_size: Maximum length of each chunk; must be positive.
        overlap: Number of characters repeated between neighbouring chunks;
            must satisfy ``0 <= overlap < chunk_size``.

    Returns:
        The chunks in order (empty list for empty text).

    Raises:
        ValueError: If ``chunk_size`` or ``overlap`` is out of range. Without
            this check the start index would never advance and the loop
            would run forever.
    """
    if chunk_size <= 0:
        raise ValueError(f"chunk_size must be positive, got {chunk_size}")
    if not 0 <= overlap < chunk_size:
        raise ValueError(
            f"overlap must be in [0, chunk_size), got {overlap} for chunk_size {chunk_size}"
        )

    step = chunk_size - overlap
    chunks = []
    for start in range(0, len(text), step):
        end = start + chunk_size
        chunks.append(text[start:end])
        if end >= len(text):
            # This chunk already covers the tail of the text.
            break
    return chunks
|
|
|
|
def get_project_root() -> Path:
    """Return the directory two levels above this file.

    Returns:
        The parent of the directory that contains this module.
    """
    module_dir = Path(__file__).parent
    return module_dir.parent
|
|
|
|
def ensure_dir(path: str):
    """Create directory ``path`` together with any missing parents.

    Does nothing if the directory already exists.

    Args:
        path: Directory to create.
    """
    directory = Path(path)
    directory.mkdir(parents=True, exist_ok=True)
|
|