scratch_chat / chat_agent /utils /response_optimization.py
WebashalarForML's picture
Upload 178 files
330b6e4 verified
"""
Response optimization utilities for API endpoints.
This module provides response compression, caching headers, and other
performance optimizations for HTTP responses.
"""
import gzip
import json
import logging
from datetime import datetime, timedelta
from typing import Any, Dict, Optional, Union, Tuple
from functools import wraps
from flask import Response, request, current_app, make_response, jsonify
logger = logging.getLogger(__name__)
class ResponseOptimizer:
    """Handles response optimization including compression and caching.

    Instance attributes (all may be tuned after construction):

    * ``compression_threshold`` -- minimum body size in bytes before gzip
      compression is attempted.
    * ``default_cache_max_age`` -- ``max-age`` in seconds used when a caller
      does not supply one.
    * ``compressible_types`` -- media types eligible for gzip compression.
    """

    # strftime/strptime pattern used for every HTTP date header in this class.
    _HTTP_DATE_FORMAT = '%a, %d %b %Y %H:%M:%S GMT'

    def __init__(self):
        """Initialize response optimizer."""
        self.compression_threshold = 1024  # Compress responses of at least 1KB
        self.default_cache_max_age = 300  # 5 minutes default cache
        self.compressible_types = {
            'application/json',
            'text/html',
            'text/plain',
            'text/css',
            'text/javascript',
            'application/javascript',
        }

    @staticmethod
    def _to_naive_utc(moment: datetime) -> datetime:
        """Return *moment* as a naive UTC datetime truncated to whole seconds.

        Naive inputs are taken to already be in UTC (the convention the rest
        of this class uses); aware inputs are shifted by their UTC offset.
        """
        offset = moment.utcoffset() if moment.tzinfo is not None else None
        if offset is not None:
            moment = moment - offset
        return moment.replace(tzinfo=None, microsecond=0)

    @classmethod
    def _format_http_date(cls, moment: datetime) -> str:
        """Format *moment* as an HTTP date string expressed in GMT."""
        return cls._to_naive_utc(moment).strftime(cls._HTTP_DATE_FORMAT)

    @staticmethod
    def _quote_etag(etag: str) -> str:
        """Return *etag* as a quoted entity-tag, leaving quoted values alone."""
        etag = str(etag)
        if etag.startswith('"') or etag.startswith('W/"'):
            return etag
        return f'"{etag}"'

    @staticmethod
    def _bare_etag(tag: str) -> str:
        """Strip whitespace, a weak ``W/`` prefix and quotes from *tag*."""
        tag = str(tag).strip()
        if tag.startswith('W/'):
            tag = tag[2:]
        return tag.strip('"')

    @staticmethod
    def _add_vary_accept_encoding(response: "Response") -> None:
        """Add ``Accept-Encoding`` to ``Vary`` without dropping existing values."""
        existing = response.headers.get('Vary', '')
        tokens = {part.strip().lower() for part in existing.split(',') if part.strip()}
        if not tokens:
            response.headers['Vary'] = 'Accept-Encoding'
        elif 'accept-encoding' not in tokens and '*' not in tokens:
            response.headers['Vary'] = f'{existing}, Accept-Encoding'

    def compress_response(self, response: "Response") -> "Response":
        """
        Compress response if appropriate.

        The body is gzip-compressed only when the client advertises gzip, the
        response is fully buffered, the body is at least
        ``compression_threshold`` bytes, the media type is listed in
        ``compressible_types``, no ``Content-Encoding`` is set yet, and the
        compressed form is actually smaller.

        Args:
            response: Flask response object

        Returns:
            Potentially compressed response (the same object, mutated in place)
        """
        # Check if compression is supported by client
        accept_encoding = request.headers.get('Accept-Encoding', '')
        if 'gzip' not in accept_encoding.lower():
            return response

        # Skip compression for streaming responses or direct passthrough
        if response.direct_passthrough or response.is_streamed:
            return response

        # Read the body once; access can fail (e.g. direct passthrough mode)
        try:
            body = response.data
        except (RuntimeError, AttributeError):
            return response
        if len(body) < self.compression_threshold:
            return response

        # Check if content type is compressible (parameters such as charset ignored)
        content_type = response.headers.get('Content-Type', '').split(';')[0].strip().lower()
        if content_type not in self.compressible_types:
            return response

        # Check if already compressed
        if response.headers.get('Content-Encoding'):
            return response

        try:
            compressed = gzip.compress(body)
            # Only use compression if it actually reduces size
            if len(compressed) < len(body):
                response.data = compressed
                response.headers['Content-Encoding'] = 'gzip'
                response.headers['Content-Length'] = str(len(compressed))
                self._add_vary_accept_encoding(response)
                # Sizes come from the saved originals: response.data has
                # already been replaced at this point.
                logger.debug("Compressed response: %d -> %d bytes",
                             len(body), len(compressed))
        except Exception as e:
            # Best effort: an uncompressed response is still a valid response.
            logger.warning("Failed to compress response: %s", e)
        return response

    def add_cache_headers(self, response: "Response", max_age: Optional[int] = None,
                          etag: Optional[str] = None,
                          last_modified: Optional[datetime] = None,
                          cache_type: str = 'public') -> "Response":
        """
        Add appropriate cache headers to response.

        Args:
            response: Flask response object
            max_age: Cache max age in seconds; ``None`` selects
                ``default_cache_max_age``, while ``0`` is honoured as given
            etag: ETag value for conditional requests (quoted if not already)
            last_modified: Last modified timestamp (naive values are treated
                as UTC, aware values are converted to UTC)
            cache_type: Cache type ('public', 'private', 'no-cache')

        Returns:
            Response with cache headers
        """
        # Local import: the file-level import only brings in datetime/timedelta.
        from datetime import timezone

        # "is None" rather than "or", so an explicit max_age=0 is not replaced.
        if max_age is None:
            max_age = self.default_cache_max_age

        # Set Cache-Control header
        if cache_type == 'no-cache':
            response.headers['Cache-Control'] = 'no-cache, no-store, must-revalidate'
            response.headers['Pragma'] = 'no-cache'
            response.headers['Expires'] = '0'
        else:
            response.headers['Cache-Control'] = f'{cache_type}, max-age={max_age}'
            # Set Expires header
            expires = datetime.now(timezone.utc) + timedelta(seconds=max_age)
            response.headers['Expires'] = self._format_http_date(expires)

        # Set ETag if provided
        if etag:
            response.headers['ETag'] = self._quote_etag(etag)

        # Set Last-Modified if provided
        if last_modified:
            response.headers['Last-Modified'] = self._format_http_date(last_modified)
        return response

    def handle_conditional_request(self, etag: Optional[str] = None,
                                   last_modified: Optional[datetime] = None
                                   ) -> "Optional[Response]":
        """
        Handle conditional requests (If-None-Match, If-Modified-Since).

        When the request carries ``If-None-Match`` and an ``etag`` is
        available, the outcome is decided by the entity-tag comparison alone;
        ``If-Modified-Since`` is then ignored, as HTTP requires. Tags are
        compared weakly (``W/`` prefix and quotes are disregarded).

        Args:
            etag: Current ETag value
            last_modified: Current last modified timestamp

        Returns:
            304 Not Modified response if conditions match, None otherwise
        """
        # Handle If-None-Match (ETag)
        if etag:
            if_none_match = request.headers.get('If-None-Match')
            if if_none_match:
                current = self._bare_etag(etag)
                client_etags = [self._bare_etag(tag) for tag in if_none_match.split(',')]
                if '*' in client_etags or current in client_etags:
                    response = make_response('', 304)
                    response.headers['ETag'] = self._quote_etag(etag)
                    return response
                # Tag mismatch: the representation changed, so a date check
                # must not turn this into a 304.
                return None

        # Handle If-Modified-Since
        if last_modified:
            if_modified_since = request.headers.get('If-Modified-Since')
            if if_modified_since:
                try:
                    client_time = datetime.strptime(if_modified_since, self._HTTP_DATE_FORMAT)
                except ValueError:
                    # Invalid date format, ignore
                    return None
                # Compare at second resolution, both sides as naive UTC
                server_time = self._to_naive_utc(last_modified)
                if server_time <= client_time.replace(microsecond=0):
                    response = make_response('', 304)
                    response.headers['Last-Modified'] = self._format_http_date(last_modified)
                    return response
        return None

    def generate_etag(self, data: Union[str, bytes, Dict, Any]) -> str:
        """
        Generate ETag for response data.

        ``bytes`` are hashed as-is, so binary bodies that are not valid UTF-8
        no longer collapse to the same tag. Dicts are serialized as JSON with
        sorted keys; if that fails (values that are not JSON-serializable)
        their ``str()`` form is used instead. Anything else is hashed via
        ``str()``.

        Args:
            data: Response data

        Returns:
            ETag string (hex MD5 digest, unquoted)
        """
        import hashlib

        if isinstance(data, bytes):
            payload = data
        elif isinstance(data, dict):
            try:
                payload = json.dumps(data, sort_keys=True).encode('utf-8')
            except (TypeError, ValueError):
                payload = str(data).encode('utf-8')
        else:
            payload = str(data).encode('utf-8')
        # MD5 is used only as a content fingerprint here, not for security.
        return hashlib.md5(payload).hexdigest()
# Global response optimizer instance, shared by the decorators, helper
# functions and middleware defined later in this module.
response_optimizer = ResponseOptimizer()
def compress_response(f):
    """Decorator to automatically compress responses.

    The wrapped view's return value is coerced to a Flask ``Response`` with
    ``make_response`` when it is not one already, then handed to the
    module-level ``response_optimizer.compress_response``.
    """
    @wraps(f)
    def wrapper(*args, **kwargs):
        result = f(*args, **kwargs)
        # Views may return strings, dicts or tuples; normalise them first.
        resp = result if isinstance(result, Response) else make_response(result)
        return response_optimizer.compress_response(resp)

    return wrapper
def cache_response(max_age: int = 300, cache_type: str = 'public',
                   generate_etag: bool = True):
    """
    Decorator to add cache headers to responses.

    When ``generate_etag`` is true, an ETag is derived from the buffered
    response body. For GET/HEAD requests that produced a 200 response, a
    matching ``If-None-Match`` request header is answered with an empty
    304 Not Modified response instead of re-sending the body. The view is
    still executed in that case, because the ETag is derived from its output.

    Streamed and direct-passthrough responses get cache headers but no ETag:
    reading their body would either fail or force the whole stream into
    memory.

    Args:
        max_age: Cache max age in seconds
        cache_type: Cache type ('public', 'private', 'no-cache')
        generate_etag: Whether to generate ETag automatically
    """
    def decorator(f):
        @wraps(f)
        def decorated_function(*args, **kwargs):
            response = f(*args, **kwargs)

            # Convert to Response object if needed
            if not isinstance(response, Response):
                response = make_response(response)

            # Generate ETag if requested and the body is safely readable
            etag = None
            body_is_buffered = not (response.direct_passthrough or response.is_streamed)
            if generate_etag and body_is_buffered:
                etag = response_optimizer.generate_etag(response.data)

            # Answer a matching If-None-Match with 304 for safe, successful requests
            if etag and response.status_code == 200 and request.method in ('GET', 'HEAD'):
                not_modified = response_optimizer.handle_conditional_request(etag=etag)
                if not_modified is not None:
                    # Keep Vary so caches key the 304 like the full response.
                    vary = response.headers.get('Vary')
                    if vary:
                        not_modified.headers['Vary'] = vary
                    response = not_modified

            # Add cache headers
            return response_optimizer.add_cache_headers(
                response, max_age=max_age, etag=etag, cache_type=cache_type
            )

        return decorated_function

    return decorator
def no_cache(f):
    """Decorator to prevent caching of responses.

    The wrapped view's return value is coerced to a Flask ``Response`` when
    needed and then given the 'no-cache' header set by
    ``response_optimizer.add_cache_headers``.
    """
    @wraps(f)
    def wrapper(*args, **kwargs):
        result = f(*args, **kwargs)
        # Normalise plain return values into a Response before adding headers.
        resp = result if isinstance(result, Response) else make_response(result)
        return response_optimizer.add_cache_headers(resp, cache_type='no-cache')

    return wrapper
def conditional_response(etag_func=None, last_modified_func=None):
    """
    Decorator for conditional responses with ETag and Last-Modified support.

    The validators are computed before the view runs, so a request whose
    ``If-None-Match`` / ``If-Modified-Since`` headers are satisfied gets a
    304 without the view being called. A validator function that raises is
    logged and treated as "no validator".

    Args:
        etag_func: Function to generate ETag (receives same args as decorated function)
        last_modified_func: Function to get last modified time (receives same args as decorated function)
    """
    def decorator(f):
        @wraps(f)
        def wrapper(*args, **kwargs):
            etag = last_modified = None

            # Compute the ETag validator, if a producer was supplied
            if etag_func:
                try:
                    etag = etag_func(*args, **kwargs)
                except Exception as exc:
                    logger.warning(f"Failed to generate ETag: {exc}")

            # Compute the Last-Modified validator, if a producer was supplied
            if last_modified_func:
                try:
                    last_modified = last_modified_func(*args, **kwargs)
                except Exception as exc:
                    logger.warning(f"Failed to get last modified time: {exc}")

            # Short-circuit with the 304 when the client's copy is still valid
            not_modified = response_optimizer.handle_conditional_request(
                etag=etag, last_modified=last_modified
            )
            if not_modified:
                return not_modified

            # Otherwise run the view and normalise its result
            result = f(*args, **kwargs)
            resp = result if isinstance(result, Response) else make_response(result)

            # Attach cache headers carrying the same validators
            return response_optimizer.add_cache_headers(
                resp, etag=etag, last_modified=last_modified
            )

        return wrapper

    return decorator
def optimize_json_response(data: Dict[str, Any], status_code: int = 200,
                           max_age: int = 300, compress: bool = True) -> Response:
    """
    Create optimized JSON response with compression and caching.

    The payload is serialized with ``jsonify``, tagged with an ETag derived
    from ``data``, given cache headers, and finally passed through the
    shared optimizer's compression step when ``compress`` is true.

    Args:
        data: Response data
        status_code: HTTP status code
        max_age: Cache max age in seconds
        compress: Whether to compress response

    Returns:
        Optimized Flask response
    """
    json_response = jsonify(data)
    json_response.status_code = status_code

    # Cache headers, with an ETag computed from the raw payload
    json_response = response_optimizer.add_cache_headers(
        json_response,
        max_age=max_age,
        etag=response_optimizer.generate_etag(data),
    )

    # Compression is optional and applied last
    if not compress:
        return json_response
    return response_optimizer.compress_response(json_response)
def create_streaming_response(generator, content_type: str = 'text/plain',
                              compress: bool = False) -> Response:
    """
    Create streaming response for real-time data.

    Each chunk is emitted as follows: dicts are serialized as one JSON
    document followed by a newline, ``bytes`` are passed through unchanged,
    and everything else is converted with ``str()``. If producing chunks
    raises, the error is logged and a final JSON error line is emitted.

    Args:
        generator: Iterable of chunks, or a zero-argument callable returning
            one (e.g. a generator function)
        content_type: Response content type
        compress: Whether to compress (not recommended for streaming).
            Currently unused: this function never compresses the stream.

    Returns:
        Streaming Flask response
    """
    def generate():
        try:
            # Accept both a ready iterable and a generator function.
            chunks = generator() if callable(generator) else generator
            for chunk in chunks:
                if isinstance(chunk, dict):
                    yield json.dumps(chunk) + '\n'
                elif isinstance(chunk, bytes):
                    # str(b'..') would emit the literal text "b'..'".
                    yield chunk
                else:
                    yield str(chunk)
        except Exception as e:
            # logger.exception keeps the traceback alongside the message.
            logger.exception(f"Error in streaming response: {e}")
            yield json.dumps({'error': 'Stream interrupted'}) + '\n'

    response = Response(generate(), content_type=content_type)

    # Disable caching for streaming responses
    response = response_optimizer.add_cache_headers(response, cache_type='no-cache')

    # Add streaming headers
    response.headers['X-Accel-Buffering'] = 'no'  # Disable nginx buffering
    # NOTE(review): Connection is a hop-by-hop header, which PEP 3333 says
    # WSGI applications should not set; kept for backward compatibility --
    # confirm the deployed server tolerates it.
    response.headers['Connection'] = 'keep-alive'
    return response
class ResponseMiddleware:
    """Middleware for automatic response optimization.

    Registers an ``after_request`` hook on a Flask app that gzip-compresses
    eligible responses through the shared ``response_optimizer`` and marks
    them with an ``X-Optimized`` header.
    """

    def __init__(self, app=None):
        """Store ``app`` and, when one is given, hook into it immediately."""
        self.app = app
        if app is not None:
            self.init_app(app)

    def init_app(self, app):
        """Register ``process_response`` as an after-request hook on ``app``."""
        app.after_request(self.process_response)

    def process_response(self, response: "Response") -> "Response":
        """Optimize ``response`` unless it falls into a skipped category.

        Skipped, in this order: error responses (status >= 400), responses
        already carrying ``X-Optimized``, and direct-passthrough or streamed
        responses. Everything else is compressed when the app config key
        ``ENABLE_COMPRESSION`` is truthy (default ``True``) and then marked.
        """
        # Guard clauses; short-circuit order matches the skip order above.
        if (
            response.status_code >= 400
            or response.headers.get('X-Optimized')
            or response.direct_passthrough
            or response.is_streamed
        ):
            return response

        compression_enabled = current_app.config.get('ENABLE_COMPRESSION', True)
        if compression_enabled:
            try:
                response = response_optimizer.compress_response(response)
            except (RuntimeError, AttributeError) as exc:
                logger.debug(f"Skipping compression due to response type: {exc}")

        # Marker that lets a later pass recognise an already-processed response
        response.headers['X-Optimized'] = 'true'
        return response