Spaces:
Running
Running
File size: 6,047 Bytes
29fbb51 4d6298c 29fbb51 4d6298c 29fbb51 4d6298c 29fbb51 4d6298c 29fbb51 4d6298c 29fbb51 4d6298c 29fbb51 | 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 | import asyncio
import logging
from typing import Dict, Any
from fastapi import HTTPException, UploadFile, status, Depends
from fastapi.security import HTTPBearer, HTTPAuthorizationCredentials
from config import Config
from .rag_pipeline import route_and_process_query, add_document_to_rag, check_system_health
from .document_handler import extract_text_from_file
# Logging: basicConfig configures the root logger at INFO (a no-op if the
# root logger already has handlers); this module logs through its own logger.
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

# Bearer-token scheme; verify_token receives its parsed credentials via Depends.
security = HTTPBearer()

# Upload/query limits, read once from Config when this module is imported.
SUPPORTED_CONTENT_TYPES = Config.RAG_SUPPORTED_CONTENT_TYPES  # allowed upload MIME types
MAX_FILE_SIZE = Config.RAG_MAX_FILE_SIZE  # upload size limit in bytes (compared against len(bytes))
MAX_QUERY_LENGTH = Config.RAG_MAX_QUERY_LENGTH  # query length limit in characters
async def verify_token(credentials: HTTPAuthorizationCredentials = Depends(security)):
    """Validate the Bearer token supplied in the ``Authorization`` header.

    Meant to be used as a FastAPI dependency; ``credentials`` is produced by the
    module-level ``HTTPBearer`` scheme.

    Args:
        credentials: Parsed bearer credentials; ``credentials.credentials`` is
            the raw token string presented by the client.

    Returns:
        The presented token when it matches ``Config.SECRET_TOKEN``.

    Raises:
        HTTPException: 500 if no secret token is configured on the server;
            403 if the presented token does not match.
    """
    import hmac  # constant-time comparison helper

    token = credentials.credentials
    expected_token = Config.SECRET_TOKEN

    if not expected_token:
        logger.error("MY_SECRET_TOKEN not configured")
        raise HTTPException(
            status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
            detail="Server configuration error"
        )

    # Constant-time comparison: a plain != can return sooner for an early
    # mismatch, letting response timing leak how much of a guess was right.
    # Compare UTF-8 bytes because compare_digest raises TypeError on
    # non-ASCII str arguments, which would turn a bad token into a 500.
    presented = token.encode("utf-8")
    expected = str(expected_token).encode("utf-8")
    if not hmac.compare_digest(presented, expected):
        # Do not log any part of the token: a prefix of a near-miss (or of
        # the real secret) would end up in log storage.
        logger.warning("Invalid token attempt")
        raise HTTPException(
            status_code=status.HTTP_403_FORBIDDEN,
            detail="Invalid or expired token"
        )
    return token
async def handle_rag_query(query: str) -> Dict[str, Any]:
    """Validate a user query and answer it through the RAG router.

    The blocking ``route_and_process_query`` call runs in a worker thread via
    ``asyncio.to_thread`` so the event loop is not blocked.

    Args:
        query: Raw query text from the client.

    Returns:
        The mapping returned by ``route_and_process_query``; its ``"route"``
        entry (if present) is logged.

    Raises:
        HTTPException: 400 if the query is empty/whitespace-only or longer than
            ``MAX_QUERY_LENGTH`` characters. An HTTPException raised by the
            pipeline propagates unchanged; any other failure becomes a 500.
    """
    # Input validation
    if not query or not query.strip():
        raise HTTPException(
            status_code=status.HTTP_400_BAD_REQUEST,
            detail="Query cannot be empty"
        )
    if len(query) > MAX_QUERY_LENGTH:
        raise HTTPException(
            status_code=status.HTTP_400_BAD_REQUEST,
            detail=f"Query too long. Please limit to {MAX_QUERY_LENGTH} characters."
        )
    try:
        logger.info(f"Processing query: {query[:50]}...")
        # Process query in thread pool
        response = await asyncio.to_thread(route_and_process_query, query)
        logger.info(f"Query processed successfully. Route: {response.get('route', 'Unknown')}")
        return response
    except HTTPException:
        # Already carries a deliberate status/detail; don't mask it as a 500
        # (same policy as handle_document_upload).
        raise
    except Exception as e:
        # logger.exception keeps the traceback that logger.error discarded.
        logger.exception(f"Error processing query: {e}")
        raise HTTPException(
            status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
            detail="Error processing your query. Please try again."
        ) from e
async def handle_document_upload(file: UploadFile) -> Dict[str, Any]:
    """Validate an uploaded document and add its text to the RAG vector store.

    Steps:
        1. Check the filename and the upload's declared content type.
        2. Enforce ``MAX_FILE_SIZE`` bytes.
        3. Extract text with ``extract_text_from_file``.
        4. Reject empty or near-empty text.
        5. Pass the text plus basic metadata to ``add_document_to_rag`` in a
           worker thread.

    Args:
        file: The uploaded file. ``file.content_type`` is checked as-is
            against ``SUPPORTED_CONTENT_TYPES``; the bytes are not inspected
            here.

    Returns:
        A summary dict with ``message`` (str), ``filename`` (str),
        ``text_length`` (int) and ``content_type``.

    Raises:
        HTTPException:
            - 400 for a missing filename or empty/too-short extracted text.
            - 415 for an unsupported content type.
            - 413 if the file exceeds ``MAX_FILE_SIZE`` bytes.
            - 500 if ingestion reports failure or any unexpected error occurs.
    """
    # File validation
    if not file.filename:
        raise HTTPException(
            status_code=status.HTTP_400_BAD_REQUEST,
            detail="No file provided"
        )
    if file.content_type not in SUPPORTED_CONTENT_TYPES:
        raise HTTPException(
            status_code=status.HTTP_415_UNSUPPORTED_MEDIA_TYPE,
            detail=f"Unsupported file type: {file.content_type}. "
                   f"Supported types: {', '.join(SUPPORTED_CONTENT_TYPES)}"
        )

    # Read at most one byte past the limit. That is enough to detect an
    # oversized upload without loading an arbitrarily large file into memory.
    # When the file fits, this read returns all of it, so len() is its size.
    max_size = int(MAX_FILE_SIZE)
    contents = await file.read(max_size + 1)
    file_size = len(contents)
    del contents  # only the size is needed below
    if file_size > max_size:
        raise HTTPException(
            status_code=status.HTTP_413_REQUEST_ENTITY_TOO_LARGE,
            detail=f"File too large. Maximum size: {MAX_FILE_SIZE / (1024*1024):.1f}MB"
        )
    # Rewind: the size probe above advanced the read position.
    await file.seek(0)

    try:
        logger.info(f"Processing file upload: {file.filename}")
        text = await extract_text_from_file(file)

        # Judge emptiness and length on stripped text so whitespace padding
        # alone cannot make a document pass as meaningful.
        stripped = text.strip() if text else ""
        if not stripped:
            raise HTTPException(
                status_code=status.HTTP_400_BAD_REQUEST,
                detail="The file appears to be empty or could not be read."
            )
        if len(stripped) < 50:  # Too short to be meaningful
            raise HTTPException(
                status_code=status.HTTP_400_BAD_REQUEST,
                detail="The extracted text is too short to be meaningful."
            )

        # Ingestion is blocking, so run it off the event loop.
        success = await asyncio.to_thread(
            add_document_to_rag,
            text,
            {
                "source": file.filename,
                "content_type": file.content_type,
                "size": file_size,
            },
        )
        if not success:
            raise HTTPException(
                status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
                detail="Failed to add document to the knowledge base"
            )

        logger.info(f"Successfully processed file: {file.filename}")
        return {
            "message": f"Successfully uploaded and processed '{file.filename}'. "
                       f"It is now available for querying.",
            "filename": file.filename,
            "text_length": len(text),
            "content_type": file.content_type,
        }
    except HTTPException:
        # Deliberate client/server errors raised above keep their status.
        raise
    except Exception as e:
        # logger.exception keeps the traceback that logger.error discarded.
        logger.exception(f"Error processing file {file.filename}: {e}")
        raise HTTPException(
            status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
            detail="Error processing the file. Please try again."
        ) from e
async def handle_health_check() -> Dict[str, Any]:
    """Run the system health probe and translate failures into HTTP 503.

    ``check_system_health`` is executed in a worker thread via
    ``asyncio.to_thread``.

    Returns:
        The mapping from ``check_system_health`` whenever its ``"status"``
        entry is not ``"unhealthy"``.

    Raises:
        HTTPException:
            - 503 with "Service is currently unhealthy" if the probe reports
              ``"unhealthy"``.
            - 503 with "Health check failed" if the probe (or reading its
              ``"status"`` key) raises.
            - An HTTPException raised by the probe itself propagates
              unchanged.
    """
    try:
        report = await asyncio.to_thread(check_system_health)
        unhealthy = report["status"] == "unhealthy"
    except HTTPException:
        raise
    except Exception as exc:
        logger.error(f"Health check failed: {exc}")
        raise HTTPException(
            status_code=status.HTTP_503_SERVICE_UNAVAILABLE,
            detail="Health check failed"
        )

    if unhealthy:
        raise HTTPException(
            status_code=status.HTTP_503_SERVICE_UNAVAILABLE,
            detail="Service is currently unhealthy"
        )
    return report