| """ |
| Supabase client for database operations. |
| Handles document requests, generated documents, and user integrations. |
| """ |
|
|
| from typing import Optional, Dict, Any, List |
| import os |
| from datetime import datetime |
| from supabase import create_client, Client |
| from .config import settings |
|
|
|
|
class SupabaseClient:
    """Wrapper for Supabase operations related to document generation.

    Groups the table and storage calls used by the document pipeline:
    ``document_requests``, ``generated_documents``, ``user_integrations``,
    ``analytics_events`` and the storage buckets. All calls go through the
    single ``self.client`` created in ``__init__``.
    """

    def __init__(self):
        """Create the underlying Supabase client from ``settings``.

        Raises:
            ValueError: If ``settings.SUPABASE_URL`` or
                ``settings.SUPABASE_KEY`` is missing or empty.
        """
        if not settings.SUPABASE_URL or not settings.SUPABASE_KEY:
            raise ValueError("SUPABASE_URL and SUPABASE_KEY must be set in environment")

        self.client: Client = create_client(
            settings.SUPABASE_URL,
            settings.SUPABASE_KEY,
        )

    @staticmethod
    def _now_iso() -> str:
        """Return the current time as an ISO-8601 string with a UTC offset.

        A naive ``datetime.now().isoformat()`` carries no offset, so the
        database has to guess the zone. ``astimezone()`` attaches the host's
        local offset, which makes the instant unambiguous.
        """
        return datetime.now().astimezone().isoformat()

    # ------------------------------------------------------------------
    # document_requests
    # ------------------------------------------------------------------

    def create_document_request(
        self,
        user_id: int,
        metadata: Dict[str, Any],
        status: str = "pending"
    ) -> str:
        """
        Create a new document generation request.

        Args:
            user_id: User ID from users table
            metadata: Request parameters (seed_images, prompt_params, etc.)
            status: Initial status (default: 'pending')

        Returns:
            request_id (UUID) taken from the ``id`` field of the first
            returned row.

        Raises:
            IndexError: If the insert response contains no rows.
        """
        # One timestamp for both columns so a fresh row has
        # created_at == updated_at.
        now = self._now_iso()
        result = self.client.table("document_requests").insert({
            "user_id": user_id,
            "metadata": metadata,
            "status": status,
            "created_at": now,
            "updated_at": now,
        }).execute()

        return result.data[0]["id"]

    def update_request_status(
        self,
        request_id: str,
        status: str,
        error_message: Optional[str] = None
    ):
        """
        Update document request status.

        ``updated_at`` is always refreshed. ``error_message`` is only written
        when a non-empty value is passed; otherwise the stored value is left
        untouched.

        Args:
            request_id: UUID of the request
            status: New status (pending, processing, generating, completed, failed, etc.)
            error_message: Error message if status is 'failed'
        """
        update_data: Dict[str, Any] = {
            "status": status,
            "updated_at": self._now_iso(),
        }

        if error_message:
            update_data["error_message"] = error_message

        self.client.table("document_requests").update(update_data).eq(
            "id", request_id
        ).execute()

    def get_request(self, request_id: str) -> Optional[Dict[str, Any]]:
        """
        Get document request by ID.

        Returns:
            The first matching row as a dict (all columns are selected),
            or None if no row matches.
        """
        result = self.client.table("document_requests").select("*").eq(
            "id", request_id
        ).execute()

        return result.data[0] if result.data else None

    def get_user_id_from_request(self, request_id: str) -> Optional[int]:
        """
        Get user_id from a document request.

        Args:
            request_id: Document request UUID

        Returns:
            user_id or None if request not found
        """
        result = self.client.table("document_requests").select("user_id").eq(
            "id", request_id
        ).execute()

        return result.data[0]["user_id"] if result.data else None

    def get_user_requests(
        self,
        user_id: int,
        limit: int = 50,
        offset: int = 0
    ) -> List[Dict[str, Any]]:
        """Get a page of requests for a user, ordered by created_at DESC.

        Args:
            user_id: User whose requests are listed
            limit: Maximum number of rows to return
            offset: Number of rows to skip

        Returns:
            List of request rows; empty when ``limit`` is not positive.
        """
        # A non-positive limit would produce an inverted range
        # (offset .. offset - 1); an empty page is the only sensible answer.
        if limit <= 0:
            return []

        result = self.client.table("document_requests").select(
            "*"
        ).eq("user_id", user_id).order(
            "created_at", desc=True
        ).range(offset, offset + limit - 1).execute()

        return result.data or []

    # ------------------------------------------------------------------
    # generated_documents
    # ------------------------------------------------------------------

    def create_generated_document(
        self,
        request_id: str,
        file_url: Optional[str] = None,
        file_type: Optional[str] = None,
        model_version: Optional[str] = None,
        doc_index: Optional[int] = None,
        doc_storage_path: Optional[str] = None,
        gt_storage_path: Optional[str] = None,
        html_storage_path: Optional[str] = None,
        bbox_storage_path: Optional[str] = None,
        zip_url: Optional[str] = None,
        flagged: bool = False,
        flag_reason: Optional[str] = None
    ) -> str:
        """
        Create record for a generated document.

        Optional arguments left as None are omitted from the insert, so the
        database default applies to those columns. ``flagged`` is always
        written.

        Args:
            request_id: Parent request UUID (FK to document_requests)
            file_url: Google Drive URL or other storage URL
            file_type: MIME type (e.g., 'application/zip', 'application/pdf')
            model_version: Model version used for generation (optional)
            doc_index: Index of the document within the request (optional)
            doc_storage_path: Path to the generated PDF in Supabase storage (optional)
            gt_storage_path: Path to the ground truth JSON in Supabase storage (optional)
            html_storage_path: Path to the HTML source in Supabase storage (optional)
            bbox_storage_path: Path to the bbox JSON in Supabase storage (optional)
            zip_url: Value stored in the ``zip_url`` column (optional)
            flagged: Whether the document is flagged for review
            flag_reason: Reason for flagging

        Returns:
            id (UUID) - Database record ID

        Raises:
            IndexError: If the insert response contains no rows.
        """
        now = self._now_iso()
        insert_data: Dict[str, Any] = {
            "request_id": request_id,
            "created_at": now,
            "updated_at": now,
            "flagged": flagged,
        }

        optional_columns = {
            "file_url": file_url,
            "file_type": file_type,
            "model_version": model_version,
            "doc_index": doc_index,
            "doc_storage_path": doc_storage_path,
            "gt_storage_path": gt_storage_path,
            "html_storage_path": html_storage_path,
            "bbox_storage_path": bbox_storage_path,
            "zip_url": zip_url,
            "flag_reason": flag_reason,
        }
        # "is not None" (not truthiness) so that doc_index=0 is still stored.
        insert_data.update({
            column: value
            for column, value in optional_columns.items()
            if value is not None
        })

        result = self.client.table("generated_documents").insert(insert_data).execute()

        return result.data[0]["id"]

    def upload_to_storage(
        self,
        bucket_name: str,
        path: str,
        file_bytes: bytes,
        content_type: str
    ) -> Dict[str, Any]:
        """
        Upload a file to Supabase storage.

        The upload is sent with the ``upsert`` file option set to "true".

        Args:
            bucket_name: The name of the Supabase storage bucket
            path: The path/filename to store the file as
            file_bytes: The raw bytes of the file
            content_type: MIME type of the file

        Returns:
            Whatever the storage client's ``upload`` call returns.
            NOTE(review): annotated as a dict here; the concrete type depends
            on the installed storage client version - confirm.
        """
        bucket = self.client.storage.from_(bucket_name)
        return bucket.upload(
            file=file_bytes,
            path=path,
            file_options={"content-type": content_type, "upsert": "true"}
        )

    def list_files(self, bucket_name: str, path: str) -> List[Dict[str, Any]]:
        """List files in a Supabase storage bucket at a given path."""
        return self.client.storage.from_(bucket_name).list(path)

    def download_file(self, bucket_name: str, path: str) -> bytes:
        """Download a file from Supabase storage."""
        return self.client.storage.from_(bucket_name).download(path)

    def get_public_url(self, bucket_name: str, path: str) -> str:
        """Get the public URL for a file in Supabase storage."""
        return self.client.storage.from_(bucket_name).get_public_url(path)

    def get_generated_documents(
        self,
        request_id: str
    ) -> List[Dict[str, Any]]:
        """Get all generated documents for a request (empty list if none)."""
        result = self.client.table("generated_documents").select("*").eq(
            "request_id", request_id
        ).execute()

        return result.data or []

    # ------------------------------------------------------------------
    # user_integrations
    # ------------------------------------------------------------------

    def get_user_google_drive_integration(
        self,
        user_id: int
    ) -> Optional[Dict[str, Any]]:
        """Get user's Google Drive integration credentials.

        Returns:
            The first ``user_integrations`` row whose provider is
            'google_drive' for this user, or None if there is none.
        """
        result = self.client.table("user_integrations").select("*").eq(
            "user_id", user_id
        ).eq("provider", "google_drive").execute()

        return result.data[0] if result.data else None

    def update_google_drive_tokens(
        self,
        user_id: int,
        access_token: str,
        refresh_token: Optional[str] = None,
        expires_at: Optional[datetime] = None
    ):
        """[DEPRECATED] Update Google Drive OAuth tokens.

        This method is a no-op kept for backward compatibility: nothing is
        written to the database. A DeprecationWarning is emitted so callers
        notice that the tokens they pass are not persisted.
        """
        import warnings

        warnings.warn(
            "SupabaseClient.update_google_drive_tokens is deprecated and "
            "does not persist any tokens",
            DeprecationWarning,
            stacklevel=2,
        )

    # ------------------------------------------------------------------
    # analytics_events
    # ------------------------------------------------------------------

    def log_analytics_event(
        self,
        user_id: int,
        event_type: str,
        entity_id: Optional[str] = None
    ):
        """Log analytics event.

        Inserts one row into ``analytics_events``; ``entity_id`` is written
        even when it is None.
        """
        self.client.table("analytics_events").insert({
            "user_id": user_id,
            "event_type": event_type,
            "entity_id": entity_id,
            "created_at": self._now_iso(),
        }).execute()
|
|
|
|
| |
# Module-level shared instance, constructed when this module is imported.
# SupabaseClient() raises ValueError at this point if SUPABASE_URL or
# SUPABASE_KEY is not set in the settings.
supabase_client = SupabaseClient()
|
|