Spaces:
Runtime error
Runtime error
| """ | |
| Response optimization utilities for API endpoints. | |
| This module provides response compression, caching headers, and other | |
| performance optimizations for HTTP responses. | |
| """ | |
| import gzip | |
| import json | |
| import logging | |
| from datetime import datetime, timedelta | |
| from typing import Any, Dict, Optional, Union, Tuple | |
| from functools import wraps | |
| from flask import Response, request, current_app, make_response, jsonify | |
| logger = logging.getLogger(__name__) | |
| class ResponseOptimizer: | |
| """Handles response optimization including compression and caching.""" | |
| def __init__(self): | |
| """Initialize response optimizer.""" | |
| self.compression_threshold = 1024 # Compress responses larger than 1KB | |
| self.default_cache_max_age = 300 # 5 minutes default cache | |
| self.compressible_types = { | |
| 'application/json', | |
| 'text/html', | |
| 'text/plain', | |
| 'text/css', | |
| 'text/javascript', | |
| 'application/javascript' | |
| } | |
| def compress_response(self, response: Response) -> Response: | |
| """ | |
| Compress response if appropriate. | |
| Args: | |
| response: Flask response object | |
| Returns: | |
| Potentially compressed response | |
| """ | |
| # Check if compression is supported by client | |
| accept_encoding = request.headers.get('Accept-Encoding', '') | |
| if 'gzip' not in accept_encoding.lower(): | |
| return response | |
| # Skip compression for streaming responses or direct passthrough | |
| if response.direct_passthrough or response.is_streamed: | |
| return response | |
| # Check if response has data and is large enough to compress | |
| try: | |
| if not hasattr(response, 'data') or len(response.data) < self.compression_threshold: | |
| return response | |
| except (RuntimeError, AttributeError): | |
| # Handle cases where data access fails (e.g., direct passthrough mode) | |
| return response | |
| # Check if content type is compressible | |
| content_type = response.headers.get('Content-Type', '').split(';')[0] | |
| if content_type not in self.compressible_types: | |
| return response | |
| # Check if already compressed | |
| if response.headers.get('Content-Encoding'): | |
| return response | |
| try: | |
| # Compress the response data | |
| compressed_data = gzip.compress(response.data) | |
| # Only use compression if it actually reduces size | |
| if len(compressed_data) < len(response.data): | |
| response.data = compressed_data | |
| response.headers['Content-Encoding'] = 'gzip' | |
| response.headers['Content-Length'] = len(compressed_data) | |
| response.headers['Vary'] = 'Accept-Encoding' | |
| logger.debug(f"Compressed response: {len(response.data)} -> {len(compressed_data)} bytes") | |
| except Exception as e: | |
| logger.warning(f"Failed to compress response: {e}") | |
| return response | |
| def add_cache_headers(self, response: Response, max_age: Optional[int] = None, | |
| etag: Optional[str] = None, last_modified: Optional[datetime] = None, | |
| cache_type: str = 'public') -> Response: | |
| """ | |
| Add appropriate cache headers to response. | |
| Args: | |
| response: Flask response object | |
| max_age: Cache max age in seconds | |
| etag: ETag value for conditional requests | |
| last_modified: Last modified timestamp | |
| cache_type: Cache type ('public', 'private', 'no-cache') | |
| Returns: | |
| Response with cache headers | |
| """ | |
| max_age = max_age or self.default_cache_max_age | |
| # Set Cache-Control header | |
| if cache_type == 'no-cache': | |
| response.headers['Cache-Control'] = 'no-cache, no-store, must-revalidate' | |
| response.headers['Pragma'] = 'no-cache' | |
| response.headers['Expires'] = '0' | |
| else: | |
| response.headers['Cache-Control'] = f'{cache_type}, max-age={max_age}' | |
| # Set Expires header | |
| expires = datetime.utcnow() + timedelta(seconds=max_age) | |
| response.headers['Expires'] = expires.strftime('%a, %d %b %Y %H:%M:%S GMT') | |
| # Set ETag if provided | |
| if etag: | |
| response.headers['ETag'] = f'"{etag}"' | |
| # Set Last-Modified if provided | |
| if last_modified: | |
| response.headers['Last-Modified'] = last_modified.strftime('%a, %d %b %Y %H:%M:%S GMT') | |
| return response | |
| def handle_conditional_request(self, etag: Optional[str] = None, | |
| last_modified: Optional[datetime] = None) -> Optional[Response]: | |
| """ | |
| Handle conditional requests (If-None-Match, If-Modified-Since). | |
| Args: | |
| etag: Current ETag value | |
| last_modified: Current last modified timestamp | |
| Returns: | |
| 304 Not Modified response if conditions match, None otherwise | |
| """ | |
| # Handle If-None-Match (ETag) | |
| if etag: | |
| if_none_match = request.headers.get('If-None-Match') | |
| if if_none_match: | |
| # Remove quotes from ETag values | |
| client_etags = [tag.strip('"') for tag in if_none_match.split(',')] | |
| if etag in client_etags or '*' in client_etags: | |
| response = make_response('', 304) | |
| response.headers['ETag'] = f'"{etag}"' | |
| return response | |
| # Handle If-Modified-Since | |
| if last_modified: | |
| if_modified_since = request.headers.get('If-Modified-Since') | |
| if if_modified_since: | |
| try: | |
| client_time = datetime.strptime(if_modified_since, '%a, %d %b %Y %H:%M:%S GMT') | |
| # Remove microseconds for comparison | |
| server_time = last_modified.replace(microsecond=0) | |
| client_time = client_time.replace(microsecond=0) | |
| if server_time <= client_time: | |
| response = make_response('', 304) | |
| response.headers['Last-Modified'] = last_modified.strftime('%a, %d %b %Y %H:%M:%S GMT') | |
| return response | |
| except ValueError: | |
| # Invalid date format, ignore | |
| pass | |
| return None | |
| def generate_etag(self, data: Union[str, bytes, Dict, Any]) -> str: | |
| """ | |
| Generate ETag for response data. | |
| Args: | |
| data: Response data | |
| Returns: | |
| ETag string | |
| """ | |
| import hashlib | |
| if isinstance(data, dict): | |
| data_str = json.dumps(data, sort_keys=True) | |
| elif isinstance(data, bytes): | |
| data_str = data.decode('utf-8', errors='ignore') | |
| else: | |
| data_str = str(data) | |
| return hashlib.md5(data_str.encode('utf-8')).hexdigest() | |
| # Global response optimizer instance | |
| response_optimizer = ResponseOptimizer() | |
| def compress_response(f): | |
| """Decorator to automatically compress responses.""" | |
| def decorated_function(*args, **kwargs): | |
| response = f(*args, **kwargs) | |
| # Convert to Response object if needed | |
| if not isinstance(response, Response): | |
| response = make_response(response) | |
| return response_optimizer.compress_response(response) | |
| return decorated_function | |
| def cache_response(max_age: int = 300, cache_type: str = 'public', | |
| generate_etag: bool = True): | |
| """ | |
| Decorator to add cache headers to responses. | |
| Args: | |
| max_age: Cache max age in seconds | |
| cache_type: Cache type ('public', 'private', 'no-cache') | |
| generate_etag: Whether to generate ETag automatically | |
| """ | |
| def decorator(f): | |
| def decorated_function(*args, **kwargs): | |
| # Check for conditional request first | |
| if generate_etag and request.method == 'GET': | |
| # For simple cases, we can't pre-generate ETag without calling the function | |
| # This is a limitation of this approach - for better ETag support, | |
| # consider implementing at the view level | |
| pass | |
| response = f(*args, **kwargs) | |
| # Convert to Response object if needed | |
| if not isinstance(response, Response): | |
| response = make_response(response) | |
| # Generate ETag if requested | |
| etag = None | |
| if generate_etag and hasattr(response, 'data'): | |
| etag = response_optimizer.generate_etag(response.data) | |
| # Add cache headers | |
| response = response_optimizer.add_cache_headers( | |
| response, max_age=max_age, etag=etag, cache_type=cache_type | |
| ) | |
| return response | |
| return decorated_function | |
| return decorator | |
| def no_cache(f): | |
| """Decorator to prevent caching of responses.""" | |
| def decorated_function(*args, **kwargs): | |
| response = f(*args, **kwargs) | |
| # Convert to Response object if needed | |
| if not isinstance(response, Response): | |
| response = make_response(response) | |
| return response_optimizer.add_cache_headers(response, cache_type='no-cache') | |
| return decorated_function | |
| def conditional_response(etag_func=None, last_modified_func=None): | |
| """ | |
| Decorator for conditional responses with ETag and Last-Modified support. | |
| Args: | |
| etag_func: Function to generate ETag (receives same args as decorated function) | |
| last_modified_func: Function to get last modified time (receives same args as decorated function) | |
| """ | |
| def decorator(f): | |
| def decorated_function(*args, **kwargs): | |
| etag = None | |
| last_modified = None | |
| # Generate ETag if function provided | |
| if etag_func: | |
| try: | |
| etag = etag_func(*args, **kwargs) | |
| except Exception as e: | |
| logger.warning(f"Failed to generate ETag: {e}") | |
| # Get last modified time if function provided | |
| if last_modified_func: | |
| try: | |
| last_modified = last_modified_func(*args, **kwargs) | |
| except Exception as e: | |
| logger.warning(f"Failed to get last modified time: {e}") | |
| # Check conditional request | |
| conditional_response = response_optimizer.handle_conditional_request( | |
| etag=etag, last_modified=last_modified | |
| ) | |
| if conditional_response: | |
| return conditional_response | |
| # Call original function | |
| response = f(*args, **kwargs) | |
| # Convert to Response object if needed | |
| if not isinstance(response, Response): | |
| response = make_response(response) | |
| # Add cache headers with ETag and Last-Modified | |
| response = response_optimizer.add_cache_headers( | |
| response, etag=etag, last_modified=last_modified | |
| ) | |
| return response | |
| return decorated_function | |
| return decorator | |
| def optimize_json_response(data: Dict[str, Any], status_code: int = 200, | |
| max_age: int = 300, compress: bool = True) -> Response: | |
| """ | |
| Create optimized JSON response with compression and caching. | |
| Args: | |
| data: Response data | |
| status_code: HTTP status code | |
| max_age: Cache max age in seconds | |
| compress: Whether to compress response | |
| Returns: | |
| Optimized Flask response | |
| """ | |
| response = jsonify(data) | |
| response.status_code = status_code | |
| # Add cache headers | |
| etag = response_optimizer.generate_etag(data) | |
| response = response_optimizer.add_cache_headers(response, max_age=max_age, etag=etag) | |
| # Compress if requested | |
| if compress: | |
| response = response_optimizer.compress_response(response) | |
| return response | |
| def create_streaming_response(generator, content_type: str = 'text/plain', | |
| compress: bool = False) -> Response: | |
| """ | |
| Create streaming response for real-time data. | |
| Args: | |
| generator: Data generator function | |
| content_type: Response content type | |
| compress: Whether to compress (not recommended for streaming) | |
| Returns: | |
| Streaming Flask response | |
| """ | |
| def generate(): | |
| try: | |
| for chunk in generator: | |
| if isinstance(chunk, dict): | |
| yield json.dumps(chunk) + '\n' | |
| else: | |
| yield str(chunk) | |
| except Exception as e: | |
| logger.error(f"Error in streaming response: {e}") | |
| yield json.dumps({'error': 'Stream interrupted'}) + '\n' | |
| response = Response(generate(), content_type=content_type) | |
| # Disable caching for streaming responses | |
| response = response_optimizer.add_cache_headers(response, cache_type='no-cache') | |
| # Add streaming headers | |
| response.headers['X-Accel-Buffering'] = 'no' # Disable nginx buffering | |
| response.headers['Connection'] = 'keep-alive' | |
| return response | |
| class ResponseMiddleware: | |
| """Middleware for automatic response optimization.""" | |
| def __init__(self, app=None): | |
| """Initialize response middleware.""" | |
| self.app = app | |
| if app is not None: | |
| self.init_app(app) | |
| def init_app(self, app): | |
| """Initialize middleware with Flask app.""" | |
| app.after_request(self.process_response) | |
| def process_response(self, response: Response) -> Response: | |
| """Process response for optimization.""" | |
| # Skip optimization for certain response types | |
| if response.status_code >= 400: | |
| return response | |
| # Skip if already processed | |
| if response.headers.get('X-Optimized'): | |
| return response | |
| # Skip optimization for static files and streaming responses | |
| if response.direct_passthrough or response.is_streamed: | |
| return response | |
| # Apply compression for appropriate responses | |
| if current_app.config.get('ENABLE_COMPRESSION', True): | |
| try: | |
| response = response_optimizer.compress_response(response) | |
| except (RuntimeError, AttributeError) as e: | |
| logger.debug(f"Skipping compression due to response type: {e}") | |
| # Add optimization marker | |
| response.headers['X-Optimized'] = 'true' | |
| return response |