""" Response optimization utilities for API endpoints. This module provides response compression, caching headers, and other performance optimizations for HTTP responses. """ import gzip import json import logging from datetime import datetime, timedelta from typing import Any, Dict, Optional, Union, Tuple from functools import wraps from flask import Response, request, current_app, make_response, jsonify logger = logging.getLogger(__name__) class ResponseOptimizer: """Handles response optimization including compression and caching.""" def __init__(self): """Initialize response optimizer.""" self.compression_threshold = 1024 # Compress responses larger than 1KB self.default_cache_max_age = 300 # 5 minutes default cache self.compressible_types = { 'application/json', 'text/html', 'text/plain', 'text/css', 'text/javascript', 'application/javascript' } def compress_response(self, response: Response) -> Response: """ Compress response if appropriate. Args: response: Flask response object Returns: Potentially compressed response """ # Check if compression is supported by client accept_encoding = request.headers.get('Accept-Encoding', '') if 'gzip' not in accept_encoding.lower(): return response # Skip compression for streaming responses or direct passthrough if response.direct_passthrough or response.is_streamed: return response # Check if response has data and is large enough to compress try: if not hasattr(response, 'data') or len(response.data) < self.compression_threshold: return response except (RuntimeError, AttributeError): # Handle cases where data access fails (e.g., direct passthrough mode) return response # Check if content type is compressible content_type = response.headers.get('Content-Type', '').split(';')[0] if content_type not in self.compressible_types: return response # Check if already compressed if response.headers.get('Content-Encoding'): return response try: # Compress the response data compressed_data = gzip.compress(response.data) # Only use compression if it actually reduces size if len(compressed_data) < len(response.data): response.data = compressed_data response.headers['Content-Encoding'] = 'gzip' response.headers['Content-Length'] = len(compressed_data) response.headers['Vary'] = 'Accept-Encoding' logger.debug(f"Compressed response: {len(response.data)} -> {len(compressed_data)} bytes") except Exception as e: logger.warning(f"Failed to compress response: {e}") return response def add_cache_headers(self, response: Response, max_age: Optional[int] = None, etag: Optional[str] = None, last_modified: Optional[datetime] = None, cache_type: str = 'public') -> Response: """ Add appropriate cache headers to response. Args: response: Flask response object max_age: Cache max age in seconds etag: ETag value for conditional requests last_modified: Last modified timestamp cache_type: Cache type ('public', 'private', 'no-cache') Returns: Response with cache headers """ max_age = max_age or self.default_cache_max_age # Set Cache-Control header if cache_type == 'no-cache': response.headers['Cache-Control'] = 'no-cache, no-store, must-revalidate' response.headers['Pragma'] = 'no-cache' response.headers['Expires'] = '0' else: response.headers['Cache-Control'] = f'{cache_type}, max-age={max_age}' # Set Expires header expires = datetime.utcnow() + timedelta(seconds=max_age) response.headers['Expires'] = expires.strftime('%a, %d %b %Y %H:%M:%S GMT') # Set ETag if provided if etag: response.headers['ETag'] = f'"{etag}"' # Set Last-Modified if provided if last_modified: response.headers['Last-Modified'] = last_modified.strftime('%a, %d %b %Y %H:%M:%S GMT') return response def handle_conditional_request(self, etag: Optional[str] = None, last_modified: Optional[datetime] = None) -> Optional[Response]: """ Handle conditional requests (If-None-Match, If-Modified-Since). Args: etag: Current ETag value last_modified: Current last modified timestamp Returns: 304 Not Modified response if conditions match, None otherwise """ # Handle If-None-Match (ETag) if etag: if_none_match = request.headers.get('If-None-Match') if if_none_match: # Remove quotes from ETag values client_etags = [tag.strip('"') for tag in if_none_match.split(',')] if etag in client_etags or '*' in client_etags: response = make_response('', 304) response.headers['ETag'] = f'"{etag}"' return response # Handle If-Modified-Since if last_modified: if_modified_since = request.headers.get('If-Modified-Since') if if_modified_since: try: client_time = datetime.strptime(if_modified_since, '%a, %d %b %Y %H:%M:%S GMT') # Remove microseconds for comparison server_time = last_modified.replace(microsecond=0) client_time = client_time.replace(microsecond=0) if server_time <= client_time: response = make_response('', 304) response.headers['Last-Modified'] = last_modified.strftime('%a, %d %b %Y %H:%M:%S GMT') return response except ValueError: # Invalid date format, ignore pass return None def generate_etag(self, data: Union[str, bytes, Dict, Any]) -> str: """ Generate ETag for response data. Args: data: Response data Returns: ETag string """ import hashlib if isinstance(data, dict): data_str = json.dumps(data, sort_keys=True) elif isinstance(data, bytes): data_str = data.decode('utf-8', errors='ignore') else: data_str = str(data) return hashlib.md5(data_str.encode('utf-8')).hexdigest() # Global response optimizer instance response_optimizer = ResponseOptimizer() def compress_response(f): """Decorator to automatically compress responses.""" @wraps(f) def decorated_function(*args, **kwargs): response = f(*args, **kwargs) # Convert to Response object if needed if not isinstance(response, Response): response = make_response(response) return response_optimizer.compress_response(response) return decorated_function def cache_response(max_age: int = 300, cache_type: str = 'public', generate_etag: bool = True): """ Decorator to add cache headers to responses. Args: max_age: Cache max age in seconds cache_type: Cache type ('public', 'private', 'no-cache') generate_etag: Whether to generate ETag automatically """ def decorator(f): @wraps(f) def decorated_function(*args, **kwargs): # Check for conditional request first if generate_etag and request.method == 'GET': # For simple cases, we can't pre-generate ETag without calling the function # This is a limitation of this approach - for better ETag support, # consider implementing at the view level pass response = f(*args, **kwargs) # Convert to Response object if needed if not isinstance(response, Response): response = make_response(response) # Generate ETag if requested etag = None if generate_etag and hasattr(response, 'data'): etag = response_optimizer.generate_etag(response.data) # Add cache headers response = response_optimizer.add_cache_headers( response, max_age=max_age, etag=etag, cache_type=cache_type ) return response return decorated_function return decorator def no_cache(f): """Decorator to prevent caching of responses.""" @wraps(f) def decorated_function(*args, **kwargs): response = f(*args, **kwargs) # Convert to Response object if needed if not isinstance(response, Response): response = make_response(response) return response_optimizer.add_cache_headers(response, cache_type='no-cache') return decorated_function def conditional_response(etag_func=None, last_modified_func=None): """ Decorator for conditional responses with ETag and Last-Modified support. Args: etag_func: Function to generate ETag (receives same args as decorated function) last_modified_func: Function to get last modified time (receives same args as decorated function) """ def decorator(f): @wraps(f) def decorated_function(*args, **kwargs): etag = None last_modified = None # Generate ETag if function provided if etag_func: try: etag = etag_func(*args, **kwargs) except Exception as e: logger.warning(f"Failed to generate ETag: {e}") # Get last modified time if function provided if last_modified_func: try: last_modified = last_modified_func(*args, **kwargs) except Exception as e: logger.warning(f"Failed to get last modified time: {e}") # Check conditional request conditional_response = response_optimizer.handle_conditional_request( etag=etag, last_modified=last_modified ) if conditional_response: return conditional_response # Call original function response = f(*args, **kwargs) # Convert to Response object if needed if not isinstance(response, Response): response = make_response(response) # Add cache headers with ETag and Last-Modified response = response_optimizer.add_cache_headers( response, etag=etag, last_modified=last_modified ) return response return decorated_function return decorator def optimize_json_response(data: Dict[str, Any], status_code: int = 200, max_age: int = 300, compress: bool = True) -> Response: """ Create optimized JSON response with compression and caching. Args: data: Response data status_code: HTTP status code max_age: Cache max age in seconds compress: Whether to compress response Returns: Optimized Flask response """ response = jsonify(data) response.status_code = status_code # Add cache headers etag = response_optimizer.generate_etag(data) response = response_optimizer.add_cache_headers(response, max_age=max_age, etag=etag) # Compress if requested if compress: response = response_optimizer.compress_response(response) return response def create_streaming_response(generator, content_type: str = 'text/plain', compress: bool = False) -> Response: """ Create streaming response for real-time data. Args: generator: Data generator function content_type: Response content type compress: Whether to compress (not recommended for streaming) Returns: Streaming Flask response """ def generate(): try: for chunk in generator: if isinstance(chunk, dict): yield json.dumps(chunk) + '\n' else: yield str(chunk) except Exception as e: logger.error(f"Error in streaming response: {e}") yield json.dumps({'error': 'Stream interrupted'}) + '\n' response = Response(generate(), content_type=content_type) # Disable caching for streaming responses response = response_optimizer.add_cache_headers(response, cache_type='no-cache') # Add streaming headers response.headers['X-Accel-Buffering'] = 'no' # Disable nginx buffering response.headers['Connection'] = 'keep-alive' return response class ResponseMiddleware: """Middleware for automatic response optimization.""" def __init__(self, app=None): """Initialize response middleware.""" self.app = app if app is not None: self.init_app(app) def init_app(self, app): """Initialize middleware with Flask app.""" app.after_request(self.process_response) def process_response(self, response: Response) -> Response: """Process response for optimization.""" # Skip optimization for certain response types if response.status_code >= 400: return response # Skip if already processed if response.headers.get('X-Optimized'): return response # Skip optimization for static files and streaming responses if response.direct_passthrough or response.is_streamed: return response # Apply compression for appropriate responses if current_app.config.get('ENABLE_COMPRESSION', True): try: response = response_optimizer.compress_response(response) except (RuntimeError, AttributeError) as e: logger.debug(f"Skipping compression due to response type: {e}") # Add optimization marker response.headers['X-Optimized'] = 'true' return response