# sammoftah's picture
# Deploy Code Search Engine
# a6356f4 verified
"""
HF-Master Shared Utilities
Helper functions for all projects
"""
import os
import re
import json
import hashlib
from typing import Dict, List, Optional, Any, Union
from datetime import datetime
from pathlib import Path
import sqlite3
def load_env(var_name: str, default: Optional[str] = None) -> Optional[str]:
    """Read an environment variable, falling back to a default.

    Args:
        var_name: Name of the environment variable to look up.
        default: Value returned when the variable is not set.

    Returns:
        The variable's value, or ``default`` when it is absent.
    """
    environment = os.environ
    return environment.get(var_name, default)
def load_api_key(provider: str = "openai") -> Optional[str]:
    """Look up the API key for a provider from the environment.

    The provider name is matched case-insensitively against a fixed table
    of known providers.

    Args:
        provider: Provider name, e.g. ``"openai"`` or ``"huggingface"``.

    Returns:
        The value of the provider's environment variable, or ``None`` when
        the provider is unknown or the variable is not set.
    """
    env_var_by_provider = {
        "openai": "OPENAI_API_KEY",
        "anthropic": "ANTHROPIC_API_KEY",
        "huggingface": "HF_TOKEN",
        "cohere": "COHERE_API_KEY",
        "together": "TOGETHER_API_KEY",
    }
    env_name = env_var_by_provider.get(provider.lower())
    return load_env(env_name) if env_name else None
def estimate_token_count(text: str, model: str = "gpt-4") -> int:
    """Roughly estimate how many tokens ``text`` occupies.

    The estimate is the character count floor-divided by a per-model
    characters-per-token figure; models not in the table use 4.

    Args:
        text: Text to measure.
        model: Model key used to pick the characters-per-token figure.

    Returns:
        The estimated token count.
    """
    chars_per_token_by_model = {
        "gpt-4": 4,  # ~4 chars per token
        "gpt-3.5": 4,
        "claude": 4,
        "llama": 3,  # More efficient
    }
    try:
        divisor = chars_per_token_by_model[model]
    except KeyError:
        divisor = 4
    return len(text) // divisor
def estimate_tokens(text: str, model: str = "gpt-4") -> int:
    """Legacy name for ``estimate_token_count``, kept for older apps.

    Args:
        text: Text to measure.
        model: Model key forwarded to ``estimate_token_count``.

    Returns:
        Whatever ``estimate_token_count`` returns for the same arguments.
    """
    return estimate_token_count(text=text, model=model)
def calculate_api_cost(
    model: str,
    input_tokens: int,
    output_tokens: int,
    provider: str = "openai"
) -> float:
    """Compute the cost of a model call from its token counts.

    Rates in the built-in table are applied per 1000 tokens. An unknown
    provider or model falls back to rates of 0.01 (input) and 0.03 (output).

    Args:
        model: Model name looked up within the provider's price table.
        input_tokens: Number of prompt tokens.
        output_tokens: Number of completion tokens.
        provider: Provider whose price table is consulted.

    Returns:
        Input cost plus output cost.
    """
    price_table = {
        "openai": {
            "gpt-4": {"input": 0.03, "output": 0.06},
            "gpt-3.5-turbo": {"input": 0.001, "output": 0.002},
            "gpt-4-turbo": {"input": 0.01, "output": 0.03},
        },
        "anthropic": {
            "claude-3-opus": {"input": 0.015, "output": 0.075},
            "claude-3-sonnet": {"input": 0.003, "output": 0.015},
        },
    }
    fallback_rates = {"input": 0.01, "output": 0.03}
    rates = price_table.get(provider, {}).get(model, fallback_rates)

    cost_in = (input_tokens / 1000) * rates["input"]
    cost_out = (output_tokens / 1000) * rates["output"]
    return cost_in + cost_out
def calculate_cost(tokens: int, model: str = "gpt-4", provider: str = "openai") -> float:
    """Legacy single-count cost helper, kept for older apps.

    All ``tokens`` are billed as input tokens; the output token count
    passed to ``calculate_api_cost`` is zero.

    Args:
        tokens: Token count, treated as input tokens.
        model: Model name forwarded to ``calculate_api_cost``.
        provider: Provider name forwarded to ``calculate_api_cost``.

    Returns:
        The cost computed by ``calculate_api_cost``.
    """
    return calculate_api_cost(model, tokens, 0, provider)
def sanitize_filename(name: str) -> str:
    """Turn an arbitrary string into a filename-friendly slug.

    The input is lower-cased and stripped, every character that is not a
    word character, whitespace or hyphen is removed, and each run of
    whitespace becomes a single hyphen.

    Args:
        name: Raw text to convert.

    Returns:
        The slugified string (may be empty).
    """
    normalized = name.lower().strip()
    without_punctuation = re.sub(r'[^\w\s-]', '', normalized)
    return re.sub(r'[\s]+', '-', without_punctuation)
def create_hash(text: str, length: int = 8) -> str:
    """Derive a short hexadecimal identifier from ``text``.

    Args:
        text: Input string; it is encoded with ``str.encode()`` before hashing.
        length: Number of leading hex characters of the MD5 digest to keep.

    Returns:
        The first ``length`` characters of the MD5 hex digest.
    """
    hasher = hashlib.md5()
    hasher.update(text.encode())
    hex_digest = hasher.hexdigest()
    return hex_digest[:length]
def format_duration(seconds: float) -> str:
    """Render a duration with one decimal and a unit suffix.

    Values under 60 are shown in seconds (``s``), values under 3600 in
    minutes (``m``), and anything else in hours (``h``).

    Args:
        seconds: Duration in seconds.

    Returns:
        A string such as ``"12.3s"``, ``"1.5m"`` or ``"2.0h"``.
    """
    # Guard clauses, smallest unit first.
    if seconds < 60:
        return f"{seconds:.1f}s"
    if seconds < 3600:
        return f"{seconds/60:.1f}m"
    return f"{seconds/3600:.1f}h"
def format_bytes(bytes: int) -> str:
    """Render a byte count with one decimal and a binary-scaled unit.

    The value is divided by 1024 until it drops below 1024 or the units
    B, KB, MB, GB, TB are exhausted; anything larger is reported in PB.

    Args:
        bytes: Size in bytes.

    Returns:
        A string such as ``"512.0 B"`` or ``"1.5 KB"``.
    """
    units = ('B', 'KB', 'MB', 'GB', 'TB')
    index = 0
    while index < len(units):
        unit = units[index]
        if bytes < 1024:
            return f"{bytes:.1f} {unit}"
        bytes = bytes / 1024
        index += 1
    # Ran past TB: the remaining value is already scaled to petabytes.
    return f"{bytes:.1f} PB"
def truncate_text(text: str, max_length: int = 100, suffix: str = "...") -> str:
    """Shorten ``text`` to at most ``max_length`` characters.

    Text that already fits is returned unchanged. Otherwise the text is cut
    so that the cut text plus ``suffix`` is exactly ``max_length`` long.
    When ``max_length`` is too small to hold the suffix, the text is cut
    hard at ``max_length`` without a suffix, so the result never exceeds
    the requested length.

    Args:
        text: Text to shorten.
        max_length: Maximum length of the returned string; negative values
            are treated as 0.
        suffix: Marker appended to truncated text.

    Returns:
        The original or truncated text, never longer than ``max_length``.
    """
    if len(text) <= max_length:
        return text
    keep = max_length - len(suffix)
    if keep < 0:
        # A negative slice end would count from the right and produce a
        # result longer than max_length; fall back to a plain hard cut.
        return text[:max(max_length, 0)]
    return text[:keep] + suffix
def parse_dice_notation(notation: str) -> Dict[str, Any]:
    """Parse dice notation such as ``2d6+3`` or ``4d6kh3``.

    Matching is case-insensitive and ignores surrounding whitespace. The
    whole string must be valid notation; trailing garbage is rejected.

    Args:
        notation: Dice expression of the form
            ``<count>d<sides>[kh<keep>][+|-<modifier>]``.

    Returns:
        A dict with keys ``num_dice`` (int), ``die_size`` (int),
        ``keep_high`` (the lower-cased ``kh<N>`` fragment or ``None``) and
        ``modifier`` (int, 0 when absent).

    Raises:
        ValueError: If the notation is malformed or the die has no sides.
    """
    # Normalize to lower case so the input agrees with the lowercase 'd'
    # and 'kh' in the pattern (upper-casing it made every input fail).
    match = re.fullmatch(r'(\d+)d(\d+)(kh\d+)?([+-]\d+)?', notation.strip().lower())
    if not match:
        raise ValueError(f"Invalid dice notation: {notation}")

    num_dice = int(match.group(1))
    die_size = int(match.group(2))
    if die_size < 1:
        # A zero-sided die cannot be rolled.
        raise ValueError(f"Invalid dice notation: {notation}")
    keep_high = match.group(3)
    modifier = int(match.group(4)) if match.group(4) else 0

    return {
        "num_dice": num_dice,
        "die_size": die_size,
        "keep_high": keep_high,
        "modifier": modifier,
    }
def roll_dice(notation: str) -> List[int]:
    """Roll the dice described by ``notation`` and return each result.

    The notation is parsed with ``parse_dice_notation``. When a keep-high
    clause is present, the rolls are sorted in descending order and only
    the requested number of highest rolls is returned. The modifier is not
    applied to the returned values.

    Args:
        notation: Dice expression, e.g. ``2d6+3``.

    Returns:
        The individual die results.
    """
    import random

    spec = parse_dice_notation(notation)
    sides = spec["die_size"]

    results = []
    for _ in range(spec["num_dice"]):
        results.append(random.randint(1, sides))

    keep_spec = spec["keep_high"]
    if not keep_spec:
        return results

    # keep_spec is "kh<N>"; drop the two-letter prefix to get N.
    keep_count = int(keep_spec[2:])
    results.sort(reverse=True)
    return results[:keep_count]
def calculate_modifier(ability_score: int) -> int:
    """Return the D&D ability modifier for an ability score.

    The modifier is ``(score - 10)`` floor-divided by 2, so odd scores
    below 10 round down (e.g. 9 gives -1).

    Args:
        ability_score: The ability score.

    Returns:
        The corresponding modifier.
    """
    offset_from_average = ability_score - 10
    modifier, _remainder = divmod(offset_from_average, 2)
    return modifier
def validate_ethereum_address(address: str) -> bool:
    """Check that ``address`` has the shape of an Ethereum address.

    Only the format is checked: ``0x`` followed by exactly 40 hexadecimal
    characters. No checksum validation is performed.

    Args:
        address: Candidate address.

    Returns:
        ``True`` when the whole string matches the format, else ``False``
        (including for non-string input).
    """
    if not isinstance(address, str):
        return False
    # fullmatch, unlike match() with '$', rejects a trailing newline.
    return re.fullmatch(r'0x[a-fA-F0-9]{40}', address) is not None
def validate_solana_address(address: str) -> bool:
    """Check that ``address`` has the shape of a Solana address.

    Only the format is checked: 32 to 44 characters drawn from the
    alphabet in the pattern (digits and letters excluding ``0``, ``O``,
    ``I`` and ``l``). The string is not decoded or otherwise verified.

    Args:
        address: Candidate address.

    Returns:
        ``True`` when the whole string matches the format, else ``False``
        (including for non-string input).
    """
    if not isinstance(address, str):
        return False
    # fullmatch, unlike match() with '$', rejects a trailing newline.
    return re.fullmatch(r'[1-9A-HJ-NP-Za-km-z]{32,44}', address) is not None
def extract_urls(text: str) -> List[str]:
    """Collect every http/https URL that appears in ``text``.

    A URL starts at ``http://`` or ``https://`` and runs until whitespace
    or one of the characters excluded by the pattern.

    Args:
        text: Text to scan.

    Returns:
        The matched URLs in order of appearance.
    """
    url_regex = re.compile(r'https?://[^\s<>"{}|\\^`\[\]]+')
    return [found.group(0) for found in url_regex.finditer(text)]
def extract_code_blocks(text: str) -> List[str]:
    """Pull the contents of fenced code blocks out of markdown text.

    A block opens with three backticks, an optional language tag and a
    newline, and closes at the next three backticks.

    Args:
        text: Markdown source.

    Returns:
        The body of each fenced block, without the fences or language tag.
    """
    fence_regex = re.compile(r'```(?:\w+)?\n(.*?)```', re.DOTALL)
    return [fenced.group(1) for fenced in fence_regex.finditer(text)]
def parse_math_expression(expr: str) -> float:
    """Evaluate a simple arithmetic expression.

    Only digits, ``+ - * / . ( )`` and spaces are accepted; anything else
    is rejected before evaluation.

    Args:
        expr: Arithmetic expression, e.g. ``"(1 + 2) * 3"``.

    Returns:
        The numeric result of the expression.

    Raises:
        ValueError: If ``expr`` contains a character outside the whitelist.
        SyntaxError, ZeroDivisionError: Propagated from evaluation of a
            malformed expression or a division by zero.
    """
    allowed_chars = set("0123456789+-*/.() ")
    if not all(c in allowed_chars for c in expr):
        raise ValueError(f"Unsafe expression: {expr}")
    # NOTE(review): this still relies on eval(). The character whitelist
    # keeps names and attribute access out, but "**" passes it, so an input
    # like "9**9**9**9" can consume unbounded CPU and memory. Do not feed
    # this untrusted input without an additional length/complexity limit.
    # Evaluating in empty namespaces is defense in depth: the expression
    # cannot see this module's globals or any builtins.
    return eval(expr, {"__builtins__": {}}, {})
def create_timer(func):
    """Decorator that prints how long each call to ``func`` took.

    After a call returns, a line of the form ``"<name> took <duration>"``
    is printed, with the duration rendered by ``format_duration``. Nothing
    is printed when ``func`` raises.

    Args:
        func: Callable to wrap.

    Returns:
        A wrapper with the same signature that returns ``func``'s result.
    """
    import time
    from functools import wraps

    @wraps(func)
    def wrapper(*args, **kwargs):
        # perf_counter is monotonic and high-resolution; time.time() is
        # wall-clock time and can jump when the system clock is adjusted.
        start = time.perf_counter()
        result = func(*args, **kwargs)
        duration = time.perf_counter() - start
        print(f"{func.__name__} took {format_duration(duration)}")
        return result

    return wrapper
def retry_on_failure(max_attempts: int = 3, delay: float = 1.0):
    """Decorator factory that retries a function when it raises.

    The wrapped function is called up to ``max_attempts`` times. After the
    N-th failed attempt the wrapper sleeps ``delay * N`` seconds before
    trying again; the exception from the final attempt is re-raised.

    Args:
        max_attempts: Total number of calls allowed; must be at least 1.
        delay: Base sleep in seconds, multiplied by the attempt number.

    Returns:
        A decorator that adds the retry behavior to a function.

    Raises:
        ValueError: If ``max_attempts`` is less than 1. With such a value
            the wrapped function would never be called and the wrapper
            would silently return ``None``.
    """
    from functools import wraps
    import time

    if max_attempts < 1:
        raise ValueError("max_attempts must be at least 1")

    def decorator(func):
        @wraps(func)
        def wrapper(*args, **kwargs):
            for attempt in range(1, max_attempts + 1):
                try:
                    return func(*args, **kwargs)
                except Exception:
                    if attempt == max_attempts:
                        # Out of attempts: surface the last error unchanged.
                        raise
                    time.sleep(delay * attempt)

        return wrapper

    return decorator
class SimpleCache:
    """Simple in-memory cache with least-recently-used eviction.

    Each entry records the time it was last read or written. When a new
    key is added to a full cache, the entry with the oldest access time is
    dropped to make room.
    """

    def __init__(self, max_size: int = 100):
        """Create an empty cache.

        Args:
            max_size: Maximum number of entries held at once. A value of
                0 or less disables storage entirely.
        """
        # key -> cached value
        self.cache: Dict[str, Any] = {}
        self.max_size = max_size
        # key -> time of the most recent get/set for that key
        self.access_times: Dict[str, datetime] = {}

    def get(self, key: str) -> Optional[Any]:
        """Return the cached value for ``key`` and mark it as just used.

        Args:
            key: Cache key.

        Returns:
            The stored value, or ``None`` when the key is absent. A stored
            ``None`` is indistinguishable from a miss.
        """
        if key not in self.cache:
            return None
        self.access_times[key] = datetime.now()
        return self.cache[key]

    def set(self, key: str, value: Any):
        """Store ``value`` under ``key``, evicting the oldest entry if full.

        Overwriting an existing key never evicts another entry, because the
        number of entries does not grow.

        Args:
            key: Cache key.
            value: Value to store.
        """
        if self.max_size <= 0:
            # Nothing can be stored; also avoids min() on an empty mapping.
            return
        if key not in self.cache and len(self.cache) >= self.max_size:
            oldest = min(self.access_times, key=self.access_times.get)
            del self.cache[oldest]
            del self.access_times[oldest]
        self.cache[key] = value
        self.access_times[key] = datetime.now()

    def clear(self):
        """Remove every entry from the cache."""
        self.cache.clear()
        self.access_times.clear()
class Database:
    """Simple SQLite wrapper.

    The connection is opened lazily on first use and rows are returned as
    plain dicts. Instances can be used as context managers::

        with Database("app.db") as db:
            db.execute("INSERT INTO t (name) VALUES (?)", ("a",))
    """

    def __init__(self, db_path: str = "data.db"):
        """Remember the database path and make sure its directory exists.

        Args:
            db_path: Path of the SQLite database file.
        """
        self.db_path = db_path
        Path(db_path).parent.mkdir(parents=True, exist_ok=True)
        # Open connection, or None while disconnected.
        self.conn: Optional[sqlite3.Connection] = None

    def __enter__(self) -> "Database":
        """Open the connection and return this wrapper."""
        self.connect()
        return self

    def __exit__(self, exc_type, exc_value, traceback) -> bool:
        """Commit on success, roll back on error, then close.

        Returns:
            ``False``, so an exception raised inside the ``with`` block
            propagates.
        """
        if self.conn is not None:
            if exc_type is None:
                self.conn.commit()
            else:
                self.conn.rollback()
        self.close()
        return False

    def connect(self):
        """Connect to the database if not already connected."""
        # Re-connecting over a live handle would leak the old connection.
        if self.conn is None:
            self.conn = sqlite3.connect(self.db_path)
            self.conn.row_factory = sqlite3.Row

    def close(self):
        """Close the connection, if any, and forget it.

        Resetting ``conn`` to ``None`` lets a later ``execute`` reconnect
        instead of failing on a closed handle.
        """
        if self.conn is not None:
            self.conn.close()
            self.conn = None

    def execute(self, query: str, params: tuple = ()) -> sqlite3.Cursor:
        """Run a query, connecting first if necessary.

        Args:
            query: SQL text; use ``?`` placeholders for values.
            params: Values bound to the placeholders.

        Returns:
            The cursor produced by the connection.
        """
        if self.conn is None:
            self.connect()
        return self.conn.execute(query, params)

    def commit(self):
        """Commit the current transaction, if connected."""
        if self.conn is not None:
            self.conn.commit()

    def fetchall(self, query: str, params: tuple = ()) -> List[Dict]:
        """Run a query and return every row as a dict.

        Args:
            query: SQL text.
            params: Values bound to the placeholders.

        Returns:
            One dict per row, keyed by column name.
        """
        cursor = self.execute(query, params)
        return [dict(row) for row in cursor.fetchall()]

    def fetchone(self, query: str, params: tuple = ()) -> Optional[Dict]:
        """Run a query and return the first row as a dict.

        Args:
            query: SQL text.
            params: Values bound to the placeholders.

        Returns:
            The first row keyed by column name, or ``None`` when the query
            produced no rows.
        """
        cursor = self.execute(query, params)
        row = cursor.fetchone()
        return dict(row) if row else None

    def create_table(self, name: str, columns: Dict[str, str]):
        """Create a table if it does not exist, then commit.

        Args:
            name: Table name.
            columns: Mapping of column name to its SQL type/constraints,
                e.g. ``{"id": "INTEGER PRIMARY KEY"}``.
        """
        # NOTE(review): identifiers cannot be bound as SQL parameters, so
        # `name` and `columns` are interpolated into the statement as-is.
        # Only pass trusted, developer-supplied values here.
        cols = ", ".join([f"{k} {v}" for k, v in columns.items()])
        self.execute(f"CREATE TABLE IF NOT EXISTS {name} ({cols})")
        self.commit()
def load_json_file(filepath: str) -> Dict:
    """Load and parse a JSON file.

    The file is read as UTF-8 rather than the platform's default encoding,
    so non-ASCII content decodes the same way on every system.

    Args:
        filepath: Path of the JSON file.

    Returns:
        The parsed JSON content.

    Raises:
        OSError: If the file cannot be opened.
        json.JSONDecodeError: If the content is not valid JSON.
    """
    with open(filepath, 'r', encoding='utf-8') as f:
        return json.load(f)
def save_json_file(data: Dict, filepath: str):
    """Write ``data`` to ``filepath`` as JSON indented by two spaces.

    Missing parent directories are created first.

    Args:
        data: JSON-serializable content.
        filepath: Destination path; an existing file is overwritten.
    """
    target = Path(filepath)
    target.parent.mkdir(parents=True, exist_ok=True)
    with target.open('w') as handle:
        json.dump(data, handle, indent=2)
def merge_dicts(*dicts: Dict) -> Dict:
    """Combine several dictionaries into a new one.

    Dictionaries are applied left to right, so a later value replaces an
    earlier one for the same key. The inputs are not modified.

    Args:
        *dicts: Dictionaries to merge.

    Returns:
        A new dictionary holding the merged entries.
    """
    return {
        key: value
        for mapping in dicts
        for key, value in mapping.items()
    }
def flatten_list(nested: List[Any]) -> List[Any]:
    """Flatten arbitrarily nested lists into a single list.

    Only ``list`` instances are expanded; other containers such as tuples
    are kept as single elements. Element order is preserved.

    Args:
        nested: List that may contain further lists at any depth.

    Returns:
        A new flat list of the non-list elements.
    """
    flat = []
    # Stack of iterators: the top one is the list currently being walked.
    pending = [iter(nested)]
    while pending:
        for element in pending[-1]:
            if isinstance(element, list):
                # Descend; the parent iterator resumes once this one is done.
                pending.append(iter(element))
                break
            flat.append(element)
        else:
            pending.pop()
    return flat
def chunk_text(text: str, chunk_size: int, overlap: int = 0) -> List[str]:
    """Split text into chunks that overlap by a fixed number of characters.

    Each chunk starts ``chunk_size - overlap`` characters after the
    previous one and is at most ``chunk_size`` characters long.

    Args:
        text: Text to split.
        chunk_size: Maximum length of each chunk; must be positive.
        overlap: Number of characters shared between consecutive chunks;
            must be smaller than ``chunk_size``.

    Returns:
        The chunks in order; an empty list for empty text.

    Raises:
        ValueError: If ``chunk_size`` is not positive or ``overlap`` is not
            smaller than ``chunk_size``. With such values the start
            position would never advance.
    """
    if chunk_size <= 0:
        raise ValueError("chunk_size must be positive")
    step = chunk_size - overlap
    if step <= 0:
        raise ValueError("overlap must be smaller than chunk_size")
    return [text[start:start + chunk_size] for start in range(0, len(text), step)]
def get_project_root() -> Path:
    """Return the directory two levels above this module's file.

    Returns:
        The parent of the directory that contains this file.
    """
    module_dir = Path(__file__).parent
    return module_dir.parent
def ensure_dir(path: str):
    """Create ``path`` as a directory, including missing parents.

    Does nothing when the directory already exists.

    Args:
        path: Directory path to create.
    """
    target = Path(path)
    target.mkdir(parents=True, exist_ok=True)