| """ |
| FastAPI application for DocGenie document generation. |
| |
| FULLY INTEGRATED PIPELINE (All 19 Stages): |
| |
| ✅ Stage 1-2: Core Pipeline (Stages 01-06) |
| 1. Seed Selection: Download and encode seed images |
| 2. LLM Prompting: Call Claude API (batched client support) |
| 3. Response Processing: Extract and validate HTML/GT |
| 4. PDF Rendering: Generate PDFs with geometry extraction |
| 5. BBox Extraction: Extract bounding boxes from PDFs |
| 6. Validation: Verify geometries and bboxes |
| |
| ✅ Stage 3: Feature Synthesis (Stages 07-13) |
| 7. Extract handwriting definitions from HTML |
| 8. Extract visual element definitions from HTML |
| 9. Generate handwriting images (WordStylist diffusion model) |
| 10. Create visual elements (stamps, barcodes, logos) |
| 11. Render second-pass PDF with features |
| 12. Insert handwriting images into PDF |
| 13. Insert visual elements into PDF |
| |
| ✅ Stage 4: Image Finalization & OCR (Stages 14-15) |
| 14. Render final PDF to high-quality image (pdf2image) |
| 15. Perform OCR on final image (Microsoft Document Intelligence) |
| |
| ✅ Stage 5: Dataset Packaging (Stages 16-19) |
| 16. Normalize bounding boxes to [0,1] scale |
| 17. Verify and prepare ground truth annotations |
| 18. Generate document analysis and statistics |
| 19. Create debug visualization overlays |
| |
| See API_PIPELINE_STATUS.md for detailed integration status. |
| """ |
| import os |
| import sys |
| import pathlib |
| import tempfile |
| import uuid |
| import json |
| import base64 |
| import zipfile |
| import asyncio |
| import shutil |
| import warnings |
| from typing import List, Optional |
| from contextlib import asynccontextmanager |
|
|
| |
| |
| |
| warnings.filterwarnings("ignore", category=UserWarning, module="resource_tracker") |
|
|
| |
| from dotenv import load_dotenv |
| load_dotenv() |
|
|
| |
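| # Make the repository root importable so the `docgenie` package resolves. |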
| sys.path.insert(0, str(pathlib.Path(__file__).parent.parent)) |
|
|
| from fastapi import FastAPI, HTTPException, status, BackgroundTasks |
| from fastapi.middleware.cors import CORSMiddleware |
| from fastapi.responses import FileResponse, StreamingResponse |
| import uvicorn |
| import io |
|
|
| from docgenie import ENV |
|
|
| from .schemas import ( |
| GenerateDocumentRequest, |
| GenerateDocumentResponse, |
| DocumentResult, |
| BoundingBox, |
| HealthResponse, |
| DatasetExportInfo |
| ) |
| from .utils import ( |
| download_image_to_base64, |
| build_prompt, |
| call_claude_api_direct, |
| extract_html_documents_from_response, |
| extract_ground_truth, |
| extract_css_from_html, |
| render_html_to_pdf, |
| extract_bboxes_from_rendered_pdf, |
| pdf_to_base64, |
| validate_html_structure, |
| validate_pdf, |
| validate_bboxes, |
| process_stage3_complete, |
| process_stage4_ocr, |
| process_stage5_complete |
| ) |
| from .config import settings |
|
|
|
|
| |
| @asynccontextmanager |
| async def lifespan(app: FastAPI): |
| """Handle application lifecycle - startup and shutdown.""" |
| |
| print("🚀 DocGenie API starting up...") |
| yield |
| |
| print("🛑 DocGenie API shutting down gracefully...") |
| await asyncio.sleep(0.5) |
| print("✓ Shutdown complete") |
|
|
|
|
| |
| app = FastAPI( |
| title="DocGenie API", |
| description="API for generating synthetic documents using LLMs", |
| version="1.0.0", |
| docs_url="/docs", |
| lifespan=lifespan |
| ) |
|
|
| |
| app.add_middleware( |
| CORSMiddleware, |
| allow_origins=settings.get_cors_origins(), |
| allow_credentials=True, |
| allow_methods=["*"], |
| allow_headers=["*"], |
| ) |
|
|
|
|
| @app.get("/", response_model=HealthResponse) |
| async def root(): |
| """Root endpoint - health check.""" |
| return HealthResponse(status="healthy", version="1.0.0") |
|
|
|
|
| @app.get("/health", response_model=HealthResponse) |
| async def health_check(): |
| """Health check endpoint.""" |
| return HealthResponse(status="healthy", version="1.0.0") |
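| # Quick check (illustrative, assuming the default host/port): |
| #   curl http://localhost:8000/health  ->  {"status": "healthy", "version": "1.0.0"} |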
|
|
|
|
| @app.post("/generate", response_model=GenerateDocumentResponse) |
| async def generate_documents(request: GenerateDocumentRequest): |
| """ |
| Generate synthetic documents from seed images. |
| |
| Pipeline: |
| 1. Download seed images from URLs |
| 2. Convert images to base64 |
| 3. Build prompt with user parameters |
| 4. Call Claude API |
| 5. Extract HTML documents from response |
| 6. Extract ground truth and CSS |
| 7. Render HTML to PDF |
| 8. Extract bounding boxes |
| 9. Return results |
| """ |
| try: |
| |
| print(f"Downloading {len(request.seed_images)} seed images...") |
| seed_images_base64 = [] |
| |
| |
| user_id_from_input, request_id = parse_request_id(request.request_id) |
| user_id = user_id_from_input |
| assets_temp_dir = None |
| |
| |
| try: |
| from .supabase_client import supabase_client |
| |
| effective_user_id = user_id |
| if not effective_user_id: |
| effective_user_id = supabase_client.get_user_id_from_request(request_id) |
| |
| if effective_user_id and request_id: |
| assets_path = f"{effective_user_id}/{request_id}/assets" |
| files = supabase_client.list_files("doc_storage", assets_path) |
| asset_files = [f for f in files if f.get('id') is not None] |
| |
| if asset_files: |
| assets_temp_dir = pathlib.Path(tempfile.mkdtemp()) |
| print(f"Found {len(asset_files)} assets in storage, downloading...") |
| for file_info in asset_files: |
| file_name = file_info['name'] |
| try: |
| file_content = supabase_client.download_file("doc_storage", f"{assets_path}/{file_name}") |
| with open(assets_temp_dir / file_name, 'wb') as f: |
| f.write(file_content) |
| except Exception as e: |
| print(f" ⚠ Failed to download asset {file_name}: {e}") |
| except Exception as e: |
| print(f" ⚠ Asset check failed: {e}") |
| |
| for url in request.seed_images: |
| try: |
| img_b64 = await download_image_to_base64(str(url)) |
| seed_images_base64.append(img_b64) |
| except Exception as e: |
| raise HTTPException( |
| status_code=status.HTTP_400_BAD_REQUEST, |
| detail=f"Failed to download image from {url}: {str(e)}" |
| ) |
| |
| print(f"Successfully downloaded {len(seed_images_base64)} images") |
| |
| |
| prompt_template_path = ENV.PROMPT_TEMPLATES_DIR / "ClaudeRefined12" / "seed-based-json.txt" |
| |
| if not prompt_template_path.exists(): |
| raise HTTPException( |
| status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, |
| detail=f"Prompt template not found at {prompt_template_path}" |
| ) |
| |
| prompt = build_prompt( |
| language=request.prompt_params.language, |
| doc_type=request.prompt_params.doc_type, |
| gt_type=request.prompt_params.gt_type, |
| gt_format=request.prompt_params.gt_format, |
| num_solutions=request.prompt_params.num_solutions, |
| num_seed_images=len(seed_images_base64), |
| prompt_template_path=prompt_template_path, |
| enable_visual_elements=request.prompt_params.enable_visual_elements, |
| visual_element_types=request.prompt_params.visual_element_types |
| ) |
| |
| print("Prompt built successfully") |
| |
| |
| if not settings.ANTHROPIC_API_KEY: |
| raise HTTPException( |
| status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, |
| detail="ANTHROPIC_API_KEY environment variable not set" |
| ) |
| |
| print(f"Calling Claude API with model {settings.CLAUDE_MODEL}...") |
| llm_response = await call_claude_api_direct( |
| prompt=prompt, |
| seed_images_base64=seed_images_base64, |
| api_key=settings.ANTHROPIC_API_KEY, |
| model=settings.CLAUDE_MODEL |
| ) |
| |
| print(f"Received LLM response ({len(llm_response)} chars)") |
| |
| |
| html_documents = extract_html_documents_from_response(llm_response) |
| |
| if not html_documents: |
| raise HTTPException( |
| status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, |
| detail="No valid HTML documents found in LLM response" |
| ) |
| |
| print(f"Extracted {len(html_documents)} HTML documents") |
| |
| |
| results = [] |
| |
| |
| with tempfile.TemporaryDirectory() as tmp_dir: |
| tmp_path = pathlib.Path(tmp_dir) |
| |
| for idx, html in enumerate(html_documents): |
| try: |
| doc_id = f"{uuid.uuid4()}_{idx}" |
| print(f"Processing document {idx + 1}/{len(html_documents)} (ID: {doc_id})") |
| |
| |
| original_pdf_path = None |
| |
| |
| is_valid, error_msg = validate_html_structure(html) |
| if not is_valid: |
| print(f" ⚠ HTML validation failed: {error_msg}") |
| continue |
| |
| |
| gt, html_clean = extract_ground_truth(html) |
| css, _ = extract_css_from_html(html_clean) |
| |
| |
| print(f"\n 🔍 DEBUG - Handwriting Detection:") |
| print(f" - Contains 'handwritten' class: {'handwritten' in html_clean}") |
| |
| |
| import re |
| author_pattern = re.compile(r'\bauthor\d+\b') |
| author_matches = author_pattern.findall(html_clean) |
| |
| if 'handwritten' in html_clean: |
| |
| hw_count = html_clean.count('handwritten') |
| print(f" - 'handwritten' occurrences: {hw_count}") |
| print(f" - Author classes found: {len(author_matches)}") |
| if author_matches: |
| unique_authors = set(author_matches) |
| print(f" - Unique author IDs: {sorted(unique_authors)}") |
| else: |
| print(f" - ⚠️ NO author classes found (expected format: author1, author2, etc.)") |
| |
| |
| # Use a separate name so the enumerate() document index is not clobbered. |
| hw_idx = html_clean.find('handwritten') |
| context_start = max(0, hw_idx - 50) |
| context_end = min(len(html_clean), hw_idx + 150) |
| print(f" - First match context: ...{html_clean[context_start:context_end]}...") |
| else: |
| print(f" - ⚠️ NO handwriting classes found in LLM output!") |
| |
| print(f" - HTML sample (first 500 chars): {html_clean[:500]}") |
| |
| print(f" 🔍 DEBUG - Visual Elements Detection:") |
| print(f" - Contains 'data-placeholder': {'data-placeholder' in html_clean}") |
| if 'data-placeholder' in html_clean: |
| ve_count = html_clean.count('data-placeholder') |
| print(f" - 'data-placeholder' occurrences: {ve_count}") |
| print() |
| |
| |
| pdf_path = tmp_path / f"{doc_id}.pdf" |
| pdf_path, width_mm, height_mm, geometries = await render_html_to_pdf( |
| html=html_clean, |
| output_pdf_path=pdf_path |
| ) |
| |
| print(f" ✓ Rendered PDF: {width_mm:.1f}mm x {height_mm:.1f}mm") |
| |
| |
| is_valid, error_msg = validate_pdf(pdf_path) |
| if not is_valid: |
| print(f" ⚠ PDF validation failed: {error_msg}") |
| continue |
| |
| |
| bboxes_raw = extract_bboxes_from_rendered_pdf(pdf_path) |
| |
| |
| is_valid, error_msg = validate_bboxes(bboxes_raw, min_bbox_count=1) |
| if not is_valid: |
| print(f" ⚠ BBox validation failed: {error_msg}") |
| |
| |
| bboxes = [BoundingBox(**bbox) for bbox in bboxes_raw] |
| |
| print(f" ✓ Extracted {len(bboxes)} bounding boxes") |
| |
| |
| pdf_b64 = pdf_to_base64(pdf_path) |
| |
| |
| final_image_b64 = None |
| handwriting_regions = [] |
| visual_elements = [] |
| handwriting_images = {} |
| visual_element_images = {} |
| ocr_results = None |
| modified_pdf_path = None |
| |
| |
| original_pdf_path = pdf_path |
| |
| if request.prompt_params.enable_handwriting or request.prompt_params.enable_visual_elements: |
| print(f" 🎨 Processing Stages 07-13 (Handwriting & Visual Elements)...") |
| |
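| # process_stage3_complete returns, in order: the final page image (base64), handwriting |
| # regions, visual elements, per-token handwriting images, per-token visual element images, |
| # and the intermediate/final PDF paths produced by the feature passes. |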
| try: |
| final_image_b64, handwriting_regions, visual_elements, handwriting_images, visual_element_images, pdf_with_handwriting_path, pdf_final_path = await process_stage3_complete( |
| pdf_path=pdf_path, |
| geometries=geometries, |
| ground_truth=gt, |
| bboxes_raw=bboxes_raw, |
| page_width_mm=width_mm, |
| page_height_mm=height_mm, |
| enable_handwriting=request.prompt_params.enable_handwriting, |
| handwriting_ratio=request.prompt_params.handwriting_ratio, |
| enable_visual_elements=request.prompt_params.enable_visual_elements, |
| visual_element_types=request.prompt_params.visual_element_types, |
| seed=request.prompt_params.seed, |
| assets_dir=assets_temp_dir, |
| barcode_number=request.prompt_params.barcode_number |
| ) |
| |
| |
| if pdf_final_path and pdf_final_path.exists(): |
| pdf_path = pdf_final_path |
| pdf_b64 = pdf_to_base64(pdf_path) |
| elif pdf_with_handwriting_path and pdf_with_handwriting_path.exists(): |
| pdf_path = pdf_with_handwriting_path |
| pdf_b64 = pdf_to_base64(pdf_path) |
| |
| print(f" ✓ Stages 07-13 complete: {len(handwriting_regions)} handwriting regions, {len(visual_elements)} visual elements") |
| print(f" - Individual tokens: {len(handwriting_images)} handwriting, {len(visual_element_images)} visual elements") |
| |
| except Exception as e: |
| print(f" ⚠ Stages 07-13 processing failed: {str(e)}") |
| |
| |
| |
| if request.prompt_params.enable_ocr or (final_image_b64 is None and (request.prompt_params.enable_handwriting or request.prompt_params.enable_visual_elements)): |
| print(f" 📄 Processing Stages 14-15 (Image Finalization & OCR)...") |
| |
| try: |
| stage4_image, ocr_results = await process_stage4_ocr( |
| pdf_path=pdf_path, |
| enable_ocr=request.prompt_params.enable_ocr, |
| dpi=settings.OCR_DPI |
| ) |
| |
| |
| if final_image_b64 is None and stage4_image: |
| final_image_b64 = stage4_image |
| |
| if ocr_results: |
| print(f" ✓ Stages 14-15 complete: Image rendered, OCR: {len(ocr_results.get('words', []))} words") |
| else: |
| print(f" ✓ Stage 14 complete: Image rendered") |
| |
| except Exception as e: |
| print(f" ⚠ Stages 14-15 processing failed: {str(e)}") |
| |
| |
| |
| stage5_results = {} |
| if any([ |
| request.prompt_params.enable_bbox_normalization, |
| request.prompt_params.enable_gt_verification, |
| request.prompt_params.enable_analysis, |
| request.prompt_params.enable_debug_visualization |
| ]): |
| print(f" 📦 Processing Stages 16-18 (Dataset Packaging)...") |
| |
| try: |
| stage5_results = await process_stage5_complete( |
| document_id=doc_id, |
| pdf_path=pdf_path, |
| image_base64=final_image_b64, |
| ocr_results=ocr_results, |
| ground_truth=gt, |
| has_handwriting=request.prompt_params.enable_handwriting, |
| has_visual_elements=request.prompt_params.enable_visual_elements, |
| layout_elements=visual_elements, |
| enable_bbox_normalization=request.prompt_params.enable_bbox_normalization, |
| enable_gt_verification=request.prompt_params.enable_gt_verification, |
| enable_analysis=request.prompt_params.enable_analysis, |
| enable_debug_visualization=request.prompt_params.enable_debug_visualization |
| ) |
| print(f" ✓ Stages 16-18 complete") |
| except Exception as e: |
| print(f" ⚠ Stages 16-18 processing failed: {str(e)}") |
| |
| |
| |
| dataset_export_info = None |
| if request.prompt_params.enable_dataset_export: |
| print(f" 📦 Exporting dataset format ({request.prompt_params.dataset_export_format})...") |
| |
| try: |
| from .utils import export_to_msgpack |
| |
| |
| if request.prompt_params.dataset_export_format.lower() == "msgpack": |
| |
| export_words = [] |
| export_word_bboxes = [] |
| export_segment_bboxes = [] |
| |
| |
| if stage5_results.get('normalized_bboxes_word'): |
| |
| for bbox_entry in stage5_results['normalized_bboxes_word']: |
| export_words.append(bbox_entry.get('text', '')) |
| bbox = bbox_entry.get('bbox', [0, 0, 1, 1]) |
| export_word_bboxes.append(bbox) |
| |
| if stage5_results.get('normalized_bboxes_segment'): |
| for bbox_entry in stage5_results['normalized_bboxes_segment']: |
| bbox = bbox_entry.get('bbox', [0, 0, 1, 1]) |
| export_segment_bboxes.append(bbox) |
| elif ocr_results: |
| |
| from pdf2image import convert_from_path |
| images = convert_from_path(pdf_path, dpi=settings.OCR_DPI) |
| img_width, img_height = images[0].size if images else (1000, 1000) |
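| # Normalize pixel coordinates to the [0, 1] range, e.g. x0=250 on a 1000px-wide page -> 0.25. |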
| |
| for word in ocr_results.get('words', []): |
| export_words.append(word.get('text', '')) |
| bbox = word.get('bbox', {'x0': 0, 'y0': 0, 'x1': 1, 'y1': 1}) |
| |
| norm_bbox = [ |
| bbox['x0'] / img_width, |
| bbox['y0'] / img_height, |
| bbox['x1'] / img_width, |
| bbox['y1'] / img_height |
| ] |
| export_word_bboxes.append(norm_bbox) |
| export_segment_bboxes.append(norm_bbox) |
| else: |
| print(f" ⚠ No OCR data available for msgpack export") |
| |
| if export_words and export_word_bboxes: |
| |
| msgpack_path = pathlib.Path(tempfile.gettempdir()) / f"{doc_id}_dataset.msgpack" |
| |
| await export_to_msgpack( |
| document_id=doc_id, |
| image_path=None, |
| image_base64=final_image_b64, |
| words=export_words, |
| word_bboxes=export_word_bboxes, |
| segment_bboxes=export_segment_bboxes if export_segment_bboxes else export_word_bboxes, |
| ground_truth=gt, |
| output_path=msgpack_path, |
| image_width=None, |
| image_height=None |
| ) |
| |
| |
| if msgpack_path.exists(): |
| with open(msgpack_path, 'rb') as f: |
| msgpack_bytes = f.read() |
| msgpack_b64 = base64.b64encode(msgpack_bytes).decode('utf-8') |
| |
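| # Inline the msgpack payload in the JSON response only while it stays under ~10 MB; |
| # larger exports are referenced via output_path instead. |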
| dataset_export_info = DatasetExportInfo( |
| format="msgpack", |
| num_samples=1, |
| output_path=str(msgpack_path), |
| msgpack_base64=msgpack_b64 if len(msgpack_bytes) < 10_000_000 else None, |
| metadata={ |
| "document_id": doc_id, |
| "num_words": len(export_words), |
| "has_ground_truth": gt is not None, |
| "has_ocr": ocr_results is not None |
| } |
| ) |
| print(f" ✓ Dataset exported to msgpack: {msgpack_path}") |
| else: |
| print(f" ⚠ Export format '{request.prompt_params.dataset_export_format}' not supported. Only 'msgpack' is available.") |
| |
| except Exception as e: |
| print(f" ⚠ Dataset export failed: {str(e)}") |
| import traceback |
| traceback.print_exc() |
| |
| |
| handwriting_token_images_response = None |
| visual_element_images_response = None |
| token_mapping_response = None |
| |
| output_detail = request.prompt_params.output_detail |
| |
| if output_detail in ["dataset", "complete"]: |
| |
| from .utils import create_token_mapping_json |
| |
| if handwriting_images or visual_element_images: |
| handwriting_token_images_response = handwriting_images |
| visual_element_images_response = visual_element_images |
| token_mapping_response = create_token_mapping_json( |
| handwriting_regions, |
| handwriting_images, |
| visual_elements, |
| visual_element_images |
| ) |
| print(f" 📦 Output detail '{output_detail}': Including {len(handwriting_images)} handwriting tokens, {len(visual_element_images)} visual elements") |
| |
| |
| result = DocumentResult( |
| document_id=doc_id, |
| html=html_clean, |
| css=css, |
| ground_truth=gt, |
| pdf_base64=pdf_b64, |
| bboxes=bboxes, |
| page_width_mm=width_mm, |
| page_height_mm=height_mm, |
| image_base64=final_image_b64, |
| handwriting_regions=handwriting_regions, |
| visual_elements=visual_elements, |
| handwriting_token_images=handwriting_token_images_response, |
| visual_element_images=visual_element_images_response, |
| token_mapping=token_mapping_response, |
| ocr_results=ocr_results, |
| |
| normalized_bboxes_word=stage5_results.get('normalized_bboxes_word'), |
| normalized_bboxes_segment=stage5_results.get('normalized_bboxes_segment'), |
| gt_verification=stage5_results.get('gt_verification'), |
| analysis_stats=stage5_results.get('analysis_stats'), |
| debug_visualization=stage5_results.get('debug_visualization'), |
| dataset_export=dataset_export_info |
| ) |
| |
| results.append(result) |
| |
| except Exception as e: |
| print(f"Error processing document {idx}: {str(e)}") |
| |
| continue |
| |
| if not results: |
| raise HTTPException( |
| status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, |
| detail="Failed to process any documents" |
| ) |
| |
| print(f"Successfully generated {len(results)} documents") |
| |
| |
| output_detail = request.prompt_params.output_detail |
| message = f"Successfully generated {len(results)} documents" |
| |
| if output_detail == "complete": |
| message += " ⚠️ WARNING: 'complete' output detail level may result in 50+ MB response" |
| elif output_detail == "dataset": |
| message += " (dataset mode: includes individual tokens)" |
| |
| return GenerateDocumentResponse( |
| success=True, |
| message=message, |
| documents=results, |
| total_documents=len(results) |
| ) |
| |
| except HTTPException: |
| raise |
| except Exception as e: |
| raise HTTPException( |
| status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, |
| detail=f"Internal server error: {str(e)}" |
| ) |
| finally: |
| |
| if 'assets_temp_dir' in locals() and assets_temp_dir and assets_temp_dir.exists(): |
| try: |
| shutil.rmtree(assets_temp_dir, ignore_errors=True) |
| print(f"✓ Cleaned up assets directory {assets_temp_dir}") |
| except Exception: |
| pass |
|
|
|
|
| def parse_request_id(input_str: str) -> "tuple[Optional[str], str]": |
| """Split "user_id/request_id" (or a bare "request_id") into (user_id, request_id); user_id is None when no slash is present.""" |
| if "/" in input_str: |
| user_id, request_id = input_str.split("/", 1) |
| return user_id, request_id |
| return None, input_str |
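| # Usage (illustrative): |
| #   parse_request_id("user-uuid/request-uuid") -> ("user-uuid", "request-uuid") |
| #   parse_request_id("request-uuid") -> (None, "request-uuid") |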
|
|
|
|
| @app.post("/generate/pdf") |
| async def generate_document_pdf( |
| request: GenerateDocumentRequest, |
| background_tasks: BackgroundTasks |
| ): |
| """ |
| Generate documents and return them as downloadable PDF files (FAST DEMO ENDPOINT). |
| |
| This endpoint generates documents and returns a ZIP file immediately (20-60 seconds). |
| |
| **Workflow:** |
| 1. Frontend creates document_requests entry in Supabase with status="pending" |
| 2. Frontend sends request_id to this endpoint along with tokens and seed images |
| 3. API fetches existing request, validates, and starts generation |
| 4. API updates status through: processing → generating → completed/failed |
| 5. ZIP file is returned immediately |
| 6. If google_drive_token provided: ZIP is uploaded to GDrive in background |
| |
| **Request Parameters:** |
| - request_id: UUID of existing document_requests entry (required) |
| - seed_images: List of image URLs to use as document backgrounds (required) |
| - google_drive_token: OAuth token for GDrive upload (optional, enables backup) |
| - google_drive_refresh_token: Refresh token for GDrive (optional) |
| - prompt_params: Document generation parameters |
| |
| **Use Cases:** |
| - Quick demos and testing (with direct Claude API) |
| - Production with progress tracking and GDrive backup |
| |
| **For batch processing:** Use `/generate/async` (50% cheaper, 5-30 minutes) |
| """ |
| |
| user_id_from_input, request_id = parse_request_id(request.request_id) |
| user_id = user_id_from_input |
| supabase_enabled = False |
| gdrive_enabled = False |
| |
| try: |
| |
| from .supabase_client import supabase_client |
| |
| |
| existing_request = supabase_client.get_request(request_id) |
| if not existing_request: |
| raise HTTPException( |
| status_code=status.HTTP_404_NOT_FOUND, |
| detail=f"Request {request_id} not found in database" |
| ) |
| |
| |
| if not user_id: |
| user_id = existing_request["user_id"] |
| |
| supabase_enabled = True |
| |
| print(f"[Request {request_id}] Processing request for user {user_id}") |
| print(f"[Request {request_id}] Current status: {existing_request['status']}") |
| |
| |
| if request.google_drive_token: |
| gdrive_enabled = True |
| |
| |
| assets_temp_dir = None |
| if supabase_enabled: |
| try: |
| assets_path = f"{user_id}/{request_id}/assets" |
| files = supabase_client.list_files("doc_storage", assets_path) |
| |
| |
| asset_files = [f for f in files if f.get('id') is not None] |
| |
| if asset_files: |
| assets_temp_dir = pathlib.Path(tempfile.mkdtemp()) |
| print(f"[Request {request_id}] Found {len(asset_files)} assets in storage, downloading...") |
| |
| for file_info in asset_files: |
| file_name = file_info['name'] |
| try: |
| file_content = supabase_client.download_file("doc_storage", f"{assets_path}/{file_name}") |
| with open(assets_temp_dir / file_name, 'wb') as f: |
| f.write(file_content) |
| print(f" ✓ Downloaded {file_name}") |
| except Exception as download_err: |
| print(f" ⚠ Failed to download {file_name}: {download_err}") |
| else: |
| print(f"[Request {request_id}] No assets found in {assets_path}") |
| except Exception as e: |
| print(f"[Request {request_id}] ⚠ Asset check/download failed: {e}") |
| print(f"[Request {request_id}] GDrive integration enabled") |
| |
| |
| try: |
| supabase_client.log_analytics_event( |
| user_id=user_id, |
| event_type="document_generation_started_sync", |
| entity_id=request_id |
| ) |
| except Exception as e: |
| print(f"[Request {request_id}] Warning: Analytics logging failed: {e}") |
| |
| except HTTPException: |
| raise |
| except Exception as e: |
| print(f"Error: Failed to fetch request from database: {e}") |
| raise HTTPException( |
| status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, |
| detail=f"Failed to fetch request: {str(e)}" |
| ) |
| |
| |
| if supabase_enabled: |
| try: |
| supabase_client.update_request_status(request_id, "downloading") |
| print(f"[Request {request_id}] Status: downloading (fetching seed images)") |
| except Exception as e: |
| print(f"Warning: Status update failed: {e}") |
| |
| try: |
| |
| print(f"Downloading {len(request.seed_images)} seed images...") |
| seed_images_base64 = [] |
| for url in request.seed_images: |
| try: |
| img_b64 = await download_image_to_base64(str(url)) |
| seed_images_base64.append(img_b64) |
| except Exception as e: |
| raise HTTPException( |
| status_code=status.HTTP_400_BAD_REQUEST, |
| detail=f"Failed to download image from {url}: {str(e)}" |
| ) |
| |
| print(f"Successfully downloaded {len(seed_images_base64)} images") |
| |
| |
| prompt_template_path = ENV.PROMPT_TEMPLATES_DIR / "ClaudeRefined12" / "seed-based-json.txt" |
| |
| if not prompt_template_path.exists(): |
| raise HTTPException( |
| status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, |
| detail=f"Prompt template not found at {prompt_template_path}" |
| ) |
| |
| prompt = build_prompt( |
| language=request.prompt_params.language, |
| doc_type=request.prompt_params.doc_type, |
| gt_type=request.prompt_params.gt_type, |
| gt_format=request.prompt_params.gt_format, |
| num_solutions=request.prompt_params.num_solutions, |
| num_seed_images=len(seed_images_base64), |
| prompt_template_path=prompt_template_path, |
| enable_visual_elements=request.prompt_params.enable_visual_elements, |
| visual_element_types=request.prompt_params.visual_element_types |
| ) |
| |
| print("Prompt built successfully") |
| |
| |
| if supabase_enabled: |
| try: |
| supabase_client.update_request_status(request_id, "generating") |
| print(f"[Request {request_id}] Status: generating (calling LLM)") |
| except Exception as e: |
| print(f"Warning: Status update failed: {e}") |
| |
| |
| print(f"Calling Claude API with model {settings.CLAUDE_MODEL}...") |
| llm_response = await call_claude_api_direct( |
| prompt=prompt, |
| seed_images_base64=seed_images_base64, |
| api_key=settings.ANTHROPIC_API_KEY, |
| model=settings.CLAUDE_MODEL |
| ) |
| |
| print(f"Received LLM response ({len(llm_response)} chars)") |
| |
| |
| html_documents = extract_html_documents_from_response(llm_response) |
| |
| if not html_documents: |
| raise HTTPException( |
| status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, |
| detail="No valid HTML documents found in LLM response" |
| ) |
| |
| print(f"Extracted {len(html_documents)} HTML documents") |
| |
| |
| output_detail = request.prompt_params.output_detail |
| |
| |
| with tempfile.TemporaryDirectory() as tmp_dir: |
| tmp_path = pathlib.Path(tmp_dir) |
| |
| |
| from .dataset_exporter import DatasetExporter |
| exporter = DatasetExporter(tmp_path, dataset_name="docgenie_documents") |
| |
| pdf_files = [] |
| metadata = [] |
| |
| for idx, html in enumerate(html_documents): |
| try: |
| doc_id = f"document_{idx + 1}" |
| print(f"Processing document {idx + 1}/{len(html_documents)} (ID: {doc_id})") |
| |
| |
| original_pdf_path = None |
| |
| |
| gt, html_clean = extract_ground_truth(html) |
| |
| |
| print(f"\n 🔍 DEBUG - Handwriting Detection:") |
| print(f" - Contains 'handwritten' class: {'handwritten' in html_clean}") |
| |
| |
| import re |
| author_pattern = re.compile(r'\bauthor\d+\b') |
| author_matches = author_pattern.findall(html_clean) |
| |
| if 'handwritten' in html_clean: |
| |
| hw_count = html_clean.count('handwritten') |
| print(f" - 'handwritten' occurrences: {hw_count}") |
| print(f" - Author classes found: {len(author_matches)}") |
| if author_matches: |
| unique_authors = set(author_matches) |
| print(f" - Unique author IDs: {sorted(unique_authors)}") |
| else: |
| print(f" - ⚠️ NO author classes found (expected format: author1, author2, etc.)") |
| |
| |
| # Use a separate name so the enumerate() document index is not clobbered. |
| hw_idx = html_clean.find('handwritten') |
| context_start = max(0, hw_idx - 50) |
| context_end = min(len(html_clean), hw_idx + 150) |
| print(f" - First match context: ...{html_clean[context_start:context_end]}...") |
| else: |
| print(f" - ⚠️ NO handwriting classes found in LLM output!") |
| |
| print(f" - HTML sample (first 500 chars): {html_clean[:500]}") |
| |
| print(f" 🔍 DEBUG - Visual Elements Detection:") |
| print(f" - Contains 'data-placeholder': {'data-placeholder' in html_clean}") |
| if 'data-placeholder' in html_clean: |
| ve_count = html_clean.count('data-placeholder') |
| print(f" - 'data-placeholder' occurrences: {ve_count}") |
| print() |
| |
| |
| pdf_path = tmp_path / f"{doc_id}.pdf" |
| pdf_path, width_mm, height_mm, geometries = await render_html_to_pdf( |
| html=html_clean, |
| output_pdf_path=pdf_path |
| ) |
| |
| print(f" - Rendered PDF: {width_mm:.1f}mm x {height_mm:.1f}mm") |
| |
| |
| bboxes_raw = extract_bboxes_from_rendered_pdf(pdf_path) |
| |
| print(f" - Extracted {len(bboxes_raw)} bounding boxes") |
| |
| |
| css, _ = extract_css_from_html(html_clean) |
| |
| |
| final_image_b64 = None |
| handwriting_regions = [] |
| visual_elements = [] |
| handwriting_images = {} |
| visual_element_images = {} |
| ocr_results = None |
| pdf_with_handwriting_path = None |
| pdf_final_path = None |
| |
| |
| original_pdf_path = pdf_path |
| |
| if request.prompt_params.enable_handwriting or request.prompt_params.enable_visual_elements: |
| print(f" 🎨 Processing Stages 07-13 (Handwriting & Visual Elements)...") |
| |
| try: |
| final_image_b64, handwriting_regions, visual_elements, handwriting_images, visual_element_images, pdf_with_handwriting_path, pdf_final_path = await process_stage3_complete( |
| pdf_path=pdf_path, |
| geometries=geometries, |
| ground_truth=gt, |
| bboxes_raw=bboxes_raw, |
| page_width_mm=width_mm, |
| page_height_mm=height_mm, |
| enable_handwriting=request.prompt_params.enable_handwriting, |
| handwriting_ratio=request.prompt_params.handwriting_ratio, |
| enable_visual_elements=request.prompt_params.enable_visual_elements, |
| visual_element_types=request.prompt_params.visual_element_types, |
| seed=request.prompt_params.seed, |
| assets_dir=assets_temp_dir, |
| barcode_number=request.prompt_params.barcode_number |
| ) |
| |
| |
| if pdf_final_path and pdf_final_path.exists(): |
| pdf_path = pdf_final_path |
| elif pdf_with_handwriting_path and pdf_with_handwriting_path.exists(): |
| pdf_path = pdf_with_handwriting_path |
| |
| print(f" ✓ Stages 07-13 complete: {len(handwriting_regions)} handwriting regions, {len(visual_elements)} visual elements") |
| print(f" - Individual tokens: {len(handwriting_images)} handwriting, {len(visual_element_images)} visual elements") |
| |
| except Exception as e: |
| print(f" ⚠ Stages 07-13 processing failed: {str(e)}") |
| |
| |
| |
| if request.prompt_params.enable_ocr: |
| print(f" 📄 Processing Stages 14-15 (OCR)...") |
| |
| try: |
| stage4_image, ocr_results = await process_stage4_ocr( |
| pdf_path=pdf_path, |
| enable_ocr=True, |
| dpi=settings.OCR_DPI |
| ) |
| |
| if ocr_results: |
| print(f" ✓ Stages 14-15 complete: OCR: {len(ocr_results.get('words', []))} words") |
| |
| except Exception as e: |
| print(f" ⚠ Stages 14-15 processing failed: {str(e)}") |
| |
| |
| |
| stage5_results = {} |
| if any([ |
| request.prompt_params.enable_bbox_normalization, |
| request.prompt_params.enable_gt_verification, |
| request.prompt_params.enable_analysis, |
| request.prompt_params.enable_debug_visualization |
| ]): |
| print(f" 📦 Processing Stages 16-18 (Dataset Packaging)...") |
| |
| try: |
| stage5_results = await process_stage5_complete( |
| document_id=doc_id, |
| pdf_path=pdf_path, |
| image_base64=final_image_b64, |
| ocr_results=ocr_results, |
| ground_truth=gt, |
| has_handwriting=request.prompt_params.enable_handwriting, |
| has_visual_elements=request.prompt_params.enable_visual_elements, |
| layout_elements=visual_elements, |
| enable_bbox_normalization=request.prompt_params.enable_bbox_normalization, |
| enable_gt_verification=request.prompt_params.enable_gt_verification, |
| enable_analysis=request.prompt_params.enable_analysis, |
| enable_debug_visualization=request.prompt_params.enable_debug_visualization |
| ) |
| print(f" ✓ Stages 16-18 complete") |
| except Exception as e: |
| print(f" ⚠ Stages 16-18 processing failed: {str(e)}") |
| |
| |
| |
| # Keep the pre-feature PDF as well when Stage 3 produced a modified one. |
| if original_pdf_path and pdf_path != original_pdf_path: |
| pdf_files.append(original_pdf_path) |
| pdf_files.append(pdf_path) |
| |
| |
| from .utils import extract_all_bboxes_from_pdf, extract_raw_annotations_from_geometries |
| print(f" 📦 Extracting bbox_pdf (word + char level) from original PDF...") |
| |
| try: |
| bboxes_pdf = extract_all_bboxes_from_pdf(original_pdf_path if original_pdf_path else pdf_path) |
| bbox_pdf_word = bboxes_pdf.get('word', []) |
| bbox_pdf_char = bboxes_pdf.get('char', []) |
| print(f" ✓ Extracted {len(bbox_pdf_word)} word bboxes, {len(bbox_pdf_char)} char bboxes from PDF") |
| except Exception as e: |
| print(f" ⚠ bbox_pdf extraction failed: {e}") |
| bbox_pdf_word = bboxes_raw |
| bbox_pdf_char = [] |
| |
| |
| raw_annotations = None |
| if geometries: |
| print(f" 📦 Extracting raw_annotations from geometries...") |
| try: |
| raw_annotations = extract_raw_annotations_from_geometries(geometries) |
| print(f" ✓ Extracted {len(raw_annotations)} layout annotations") |
| except Exception as e: |
| print(f" ⚠ raw_annotations extraction failed: {e}") |
| |
| |
| # base64 is imported at module level; a conditional local import here would make the |
| # name function-local and break the debug-visualization decode below whenever this |
| # branch is skipped. |
| final_image_bytes = None |
| if final_image_b64: |
| final_image_bytes = base64.b64decode(final_image_b64) |
| |
| |
| debug_viz_bytes = None |
| if stage5_results.get('debug_visualization'): |
| debug_viz_dict = stage5_results['debug_visualization'] |
| if debug_viz_dict and 'bbox_overlay_base64' in debug_viz_dict: |
| debug_viz_b64 = debug_viz_dict['bbox_overlay_base64'] |
| debug_viz_bytes = base64.b64decode(debug_viz_b64) |
| |
| |
| token_mapping_data = None |
| if output_detail in ["dataset", "complete"]: |
| if handwriting_images or visual_element_images: |
| from .utils import create_token_mapping_json |
| token_mapping_data = create_token_mapping_json( |
| handwriting_regions, |
| handwriting_images, |
| visual_elements, |
| visual_element_images |
| ) |
| print(f" 📦 Output detail '{output_detail}': Prepared {len(handwriting_images)} handwriting tokens, {len(visual_element_images)} visual elements") |
| |
| |
| bbox_final_word = None |
| bbox_final_segment = None |
| if ocr_results and ocr_results.get('words'): |
| |
| bbox_final_word = ocr_results.get('words', []) |
| bbox_final_segment = ocr_results.get('lines', []) |
| else: |
| |
| bbox_final_word = bbox_pdf_word |
| bbox_final_segment = [] |
| |
| |
| pdf_initial_bytes = original_pdf_path.read_bytes() |
| pdf_with_handwriting_bytes = pdf_with_handwriting_path.read_bytes() if pdf_with_handwriting_path and pdf_with_handwriting_path.exists() else None |
| pdf_final_bytes = pdf_final_path.read_bytes() if pdf_final_path and pdf_final_path.exists() else None |
| |
| |
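| # Stage 3 reuses the pdf_final slot even when only visual elements ran; with no |
| # handwriting pass, relabel those bytes as the visual-elements variant for the exporter. |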
| pdf_with_visual_elements_bytes = None |
| if pdf_final_bytes and not pdf_with_handwriting_bytes: |
| |
| pdf_with_visual_elements_bytes = pdf_final_bytes |
| pdf_final_bytes = None |
| |
| |
| print(f" 📦 Adding document to dataset exporter...") |
| exporter.add_document( |
| document_id=doc_id, |
| html=html_clean, |
| css=css, |
| pdf_initial=pdf_initial_bytes, |
| pdf_with_handwriting=pdf_with_handwriting_bytes, |
| pdf_with_visual_elements=pdf_with_visual_elements_bytes, |
| pdf_final=pdf_final_bytes, |
| final_image=final_image_bytes, |
| ground_truth=gt, |
| raw_annotations=raw_annotations, |
| bboxes_pdf_word=bbox_pdf_word, |
| bboxes_pdf_char=bbox_pdf_char, |
| bboxes_final_word=bbox_final_word, |
| bboxes_final_segment=bbox_final_segment, |
| bboxes_normalized_word=stage5_results.get('normalized_bboxes_word'), |
| bboxes_normalized_segment=stage5_results.get('normalized_bboxes_segment'), |
| gt_verification=stage5_results.get('gt_verification'), |
| token_mapping=token_mapping_data, |
| handwriting_regions=handwriting_regions, |
| handwriting_images=handwriting_images, |
| visual_elements=visual_elements, |
| visual_element_images=visual_element_images, |
| layout_elements=visual_elements, |
| geometries=geometries, |
| ocr_results=ocr_results, |
| analysis_stats=stage5_results.get('analysis_stats'), |
| debug_visualization=debug_viz_bytes |
| ) |
| print(f" ✓ Document {doc_id} added to dataset") |
| |
| |
| metadata.append({ |
| "document_id": doc_id, |
| "filename": f"{doc_id}.pdf", |
| "bboxes": bboxes_raw, |
| "ground_truth": gt, |
| "geometries": geometries, |
| "page_width_mm": width_mm, |
| "page_height_mm": height_mm, |
| "handwriting_regions": handwriting_regions, |
| "visual_elements": visual_elements, |
| "has_stage3_image": final_image_b64 is not None, |
| "ocr_results": ocr_results, |
| |
| "normalized_bboxes_word": stage5_results.get('normalized_bboxes_word'), |
| "normalized_bboxes_segment": stage5_results.get('normalized_bboxes_segment'), |
| "gt_verification": stage5_results.get('gt_verification'), |
| "analysis_stats": stage5_results.get('analysis_stats'), |
| "debug_visualization_available": stage5_results.get('debug_visualization') is not None |
| }) |
| |
| except Exception as e: |
| print(f"Error processing document {idx}: {str(e)}") |
| |
| continue |
| |
| if not pdf_files: |
| raise HTTPException( |
| status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, |
| detail="Failed to process any documents" |
| ) |
| |
| print(f"Successfully generated {len(pdf_files)} documents") |
| |
| |
| print(f"📦 Finalizing dataset export...") |
| exporter.finalize( |
| request_id=request_id if request_id else "unnamed", |
| user_id=user_id, |
| prompt_params=request.prompt_params.dict(), |
| api_mode="sync" |
| ) |
| print(f"✓ Dataset structure finalized at {exporter.base_path}") |
| |
| |
| if supabase_enabled: |
| try: |
| supabase_client.update_request_status(request_id, "zipping") |
| print(f"[Request {request_id}] Status: zipping (creating ZIP archive)") |
| except Exception as e: |
| print(f"Warning: Status update failed: {e}") |
| |
| |
| print(f"📦 Creating ZIP archive from dataset...") |
| zip_buffer = io.BytesIO() |
| with zipfile.ZipFile(zip_buffer, 'w', zipfile.ZIP_DEFLATED) as zip_file: |
| |
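| # Archive paths relative to the exporter's parent so the dataset folder name is kept inside the ZIP. |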
| for file_path in exporter.base_path.rglob('*'): |
| if file_path.is_file(): |
| arcname = file_path.relative_to(exporter.base_path.parent) |
| zip_file.write(file_path, arcname) |
| |
| zip_buffer.seek(0) |
| zip_size_mb = len(zip_buffer.getvalue()) / (1024 * 1024) |
| print(f"✓ ZIP created: {zip_size_mb:.2f} MB") |
| |
| |
| if supabase_enabled and request_id: |
| try: |
| from .supabase_client import supabase_client |
| supabase_client.update_request_status(request_id, "completed") |
| print(f"[Request {request_id}] Status: completed") |
| except Exception as e: |
| print(f"[Request {request_id}] ⚠ Supabase update failed: {e}") |
| |
| |
| temp_zip_path = pathlib.Path(tempfile.gettempdir()) / f"docgenie_{request_id}.zip" |
| temp_zip_path.write_bytes(zip_buffer.getvalue()) |
|
|
| |
| if gdrive_enabled and request_id and request.google_drive_token: |
| |
| try: |
| supabase_client.update_request_status(request_id, "uploading") |
| print(f"[Request {request_id}] Status: uploading (uploading to Google Drive)") |
| except Exception as e: |
| print(f"Warning: Status update failed: {e}") |
| |
| print(f"[Request {request_id}] Scheduling GDrive upload in background...") |
| |
| background_tasks.add_task( |
| upload_zip_to_gdrive_background, |
| request_id=request_id, |
| zip_path=temp_zip_path, |
| access_token=request.google_drive_token, |
| refresh_token=request.google_drive_refresh_token, |
| num_documents=len(pdf_files) |
| ) |
| |
| |
| if supabase_enabled: |
| supabase_temp_dir = pathlib.Path(tempfile.gettempdir()) / f"docgenie_supabase_{request_id}" |
| if supabase_temp_dir.exists(): |
| shutil.rmtree(supabase_temp_dir, ignore_errors=True) |
| |
| |
| shutil.copytree(exporter.base_path, supabase_temp_dir) |
| |
| print(f"[Request {request_id}] Scheduling Supabase document upload in background...") |
| background_tasks.add_task( |
| upload_documents_to_supabase_background, |
| request_id=request_id, |
| user_id=str(user_id), |
| temp_dir=str(supabase_temp_dir), |
| num_documents=len(exporter.documents), |
| model_version=settings.LLM_MODEL, |
| zip_path=str(temp_zip_path) |
| ) |
| |
| |
| headers = { |
| "Content-Disposition": f"attachment; filename=docgenie_documents_{uuid.uuid4().hex[:8]}.zip" |
| } |
| |
| |
| if supabase_enabled and request_id: |
| headers["X-Request-ID"] = request_id |
| headers["X-Status-URL"] = f"/jobs/{request_id}/status" |
| print(f"[Request {request_id}] Returning ZIP with tracking headers") |
| |
| return StreamingResponse( |
| zip_buffer, |
| media_type="application/zip", |
| headers=headers |
| ) |
| |
| except HTTPException as e: |
| |
| if supabase_enabled and request_id: |
| try: |
| from .supabase_client import supabase_client |
| supabase_client.update_request_status(request_id, "failed", error_message=str(e.detail)) |
| print(f"[Request {request_id}] Status: failed - {e.detail}") |
| except Exception as update_error: |
| print(f"Warning: Status update failed: {update_error}") |
| raise |
| except Exception as e: |
| |
| if supabase_enabled and request_id: |
| try: |
| from .supabase_client import supabase_client |
| supabase_client.update_request_status(request_id, "failed", error_message=str(e)) |
| print(f"[Request {request_id}] Status: failed - {str(e)}") |
| except Exception as sup_err: |
| print(f"[Request {request_id}] ⚠ Supabase update failed: {sup_err}") |
| print(f"Unexpected error: {str(e)}") |
| raise HTTPException( |
| status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, |
| detail=f"Internal server error: {str(e)}" |
| ) |
|
|
|
|
| |
|
|
| def upload_documents_to_supabase_background( |
| request_id: str, |
| user_id: str, |
| temp_dir: str, |
| num_documents: int, |
| model_version: str, |
| zip_path: Optional[str] = None |
| ): |
| """ |
| Background task to upload individual documents to Supabase Storage. |
| """ |
| import shutil |
| import pathlib |
| import traceback |
| |
| try: |
| print(f"[Background Task {request_id}] Starting Supabase individual document upload...") |
| from .supabase_client import supabase_client |
| |
| base_path = pathlib.Path(temp_dir) |
| |
| |
| zip_url = None |
| if zip_path and pathlib.Path(zip_path).exists(): |
| zip_file = pathlib.Path(zip_path) |
| zip_storage_path = f"{user_id}/{request_id}/generated/docgenie_{request_id}.zip" |
| supabase_client.upload_to_storage("doc_storage", zip_storage_path, zip_file.read_bytes(), "application/zip") |
| zip_url = supabase_client.get_public_url("doc_storage", zip_storage_path) |
| print(f"[Background Task {request_id}] ✓ Uploaded ZIP to Supabase: {zip_url}") |
| |
| for idx in range(num_documents): |
| doc_id = f"document_{idx + 1}" |
| |
| |
| doc_storage_path = f"{user_id}/{request_id}/generated/{idx}_doc.pdf" |
| gt_storage_path = f"{user_id}/{request_id}/generated/{idx}_gt.json" |
| html_storage_path = f"{user_id}/{request_id}/generated/{idx}_src.html" |
| bbox_storage_path = f"{user_id}/{request_id}/generated/{idx}_bbox.json" |
| |
| |
| local_pdf = base_path / "pdf" / "pdf_final" / f"{doc_id}.pdf" |
| if not local_pdf.exists(): |
| local_pdf = base_path / "pdf" / "pdf_initial" / f"{doc_id}.pdf" |
| |
| local_gt = base_path / "annotations" / "gt" / f"{doc_id}.json" |
| local_html = base_path / "html" / f"{doc_id}.html" |
| local_bbox = base_path / "bbox" / "bbox_final" / "word" / f"{doc_id}.json" |
| |
| |
| pdf_url = None |
| if local_pdf.exists(): |
| supabase_client.upload_to_storage("doc_storage", doc_storage_path, local_pdf.read_bytes(), "application/pdf") |
| pdf_url = supabase_client.get_public_url("doc_storage", doc_storage_path) |
| |
| if local_gt.exists(): |
| supabase_client.upload_to_storage("doc_storage", gt_storage_path, local_gt.read_bytes(), "application/json") |
| |
| if local_html.exists(): |
| supabase_client.upload_to_storage("doc_storage", html_storage_path, local_html.read_bytes(), "text/html") |
| |
| if local_bbox.exists(): |
| supabase_client.upload_to_storage("doc_storage", bbox_storage_path, local_bbox.read_bytes(), "application/json") |
| |
| supabase_client.create_generated_document( |
| request_id=request_id, |
| file_url=pdf_url, |
| file_type="application/pdf" if pdf_url else None, |
| model_version=model_version, |
| doc_index=idx, |
| doc_storage_path=doc_storage_path if local_pdf.exists() else None, |
| gt_storage_path=gt_storage_path if local_gt.exists() else None, |
| html_storage_path=html_storage_path if local_html.exists() else None, |
| bbox_storage_path=bbox_storage_path if local_bbox.exists() else None, |
| zip_url=zip_url |
| ) |
| print(f"[Background Task {request_id}] ✓ Uploaded and tracked document {idx}") |
| |
| except Exception as e: |
| print(f"[Background Task {request_id}] ⚠ Supabase upload failed: {str(e)}") |
| traceback.print_exc() |
| finally: |
| try: |
| |
| shutil.rmtree(temp_dir, ignore_errors=True) |
| print(f"[Background Task {request_id}] ✓ Cleaned up temporary directory {temp_dir}") |
| except Exception as e: |
| print(f"[Background Task {request_id}] ⚠ Failed to clean up temp dir: {e}") |
|
|
| def upload_zip_to_gdrive_background( |
| request_id: str, |
| zip_path: pathlib.Path, |
| access_token: str, |
| refresh_token: Optional[str], |
| num_documents: int |
| ): |
| """ |
| Background task to upload ZIP file to Google Drive. |
| |
| Args: |
| request_id: Supabase request ID |
| zip_path: Path to temporary ZIP file |
| access_token: Google Drive OAuth access token |
| refresh_token: Google Drive refresh token (optional) |
| num_documents: Number of documents in ZIP |
| """ |
| try: |
| print(f"[Background Task {request_id}] Starting GDrive upload...") |
| |
| from .google_drive import GoogleDriveClient |
| from .supabase_client import supabase_client |
| |
| |
| client = GoogleDriveClient( |
| access_token=access_token, |
| refresh_token=refresh_token |
| ) |
| |
| filename = f"docgenie_{request_id}.zip" |
| gdrive_url = client.upload_file( |
| file_path=zip_path, |
| filename=filename, |
| folder_name=settings.GOOGLE_DRIVE_FOLDER_NAME, |
| mime_type="application/zip" |
| ) |
| |
| print(f"[Background Task {request_id}] ✓ Uploaded to GDrive: {gdrive_url}") |
| |
| supabase_client.create_generated_document( |
| request_id=request_id, |
| file_url=gdrive_url, |
| file_type="application/zip", |
| model_version=settings.LLM_MODEL |
| ) |
| |
| print(f"[Background Task {request_id}] ✓ Updated Supabase with GDrive URL") |
| |
| |
| supabase_client.update_request_status(request_id, "completed") |
| print(f"[Background Task {request_id}] ✓ Status updated to completed") |
| |
| |
| zip_path.unlink(missing_ok=True) |
| print(f"[Background Task {request_id}] ✓ Cleaned up temp file") |
| |
| except Exception as e: |
| print(f"[Background Task {request_id}] ✗ GDrive upload failed: {str(e)}") |
| import traceback |
| traceback.print_exc() |
| |
| |
| try: |
| from .supabase_client import supabase_client |
| supabase_client.update_request_status(request_id, "completed_gdrive_failed") |
| print(f"[Background Task {request_id}] Status updated to completed_gdrive_failed") |
| except Exception as status_err: |
| print(f"[Background Task {request_id}] Failed to update status: {status_err}") |
| |
| |
| try: |
| zip_path.unlink(missing_ok=True) |
| except Exception: |
| pass |
|
|
|
|
| |
|
|
| from redis import Redis |
| from rq import Queue |
| from rq.job import Job |
| from .supabase_client import supabase_client |
| from .worker import process_document_generation_job |
|
|
|
|
| |
| try: |
| redis_conn = Redis.from_url(settings.REDIS_URL) |
| redis_conn.ping()  # from_url is lazy; ping() forces an actual connection check |
| job_queue = Queue(settings.RQ_QUEUE_NAME, connection=redis_conn) |
| print(f"✓ Connected to Redis: {settings.REDIS_URL}") |
| print(f"✓ RQ Queue: {settings.RQ_QUEUE_NAME}") |
| except Exception as e: |
| print(f"⚠ Warning: Redis connection failed: {e}") |
| print(" Async endpoints will not work without Redis") |
| redis_conn = None |
| job_queue = None |
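| # A worker must be running to drain this queue, e.g. via RQ's standard CLI: |
| #   rq worker <RQ_QUEUE_NAME> --url <REDIS_URL> |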
|
|
|
|
| @app.post("/generate/async") |
| async def generate_documents_async(request: GenerateDocumentRequest): |
| """ |
| Generate synthetic documents asynchronously using batched Claude API. |
| |
| **Workflow:** |
| 1. Frontend creates document_requests entry in Supabase with status="pending" |
| 2. Frontend sends request_id to this endpoint along with tokens and seed images |
| 3. API fetches existing request, validates, and enqueues background job |
| 4. API returns immediately with job info |
| 5. Background worker processes job and updates status: processing → generating → completed/failed |
| 6. User polls /jobs/{request_id}/status for progress |
| 7. Upon completion, ZIP is automatically uploaded to Google Drive |
| |
| Uses batched Claude API for 50% cost savings (but takes 5-30 minutes). |
| |
| Request body: |
| - request_id: UUID of existing document_requests entry (required) |
| - seed_images: List[str] (Supabase storage URLs) (required) |
| - google_drive_token: OAuth token for GDrive upload (optional) |
| - google_drive_refresh_token: Refresh token for GDrive (optional) |
| - prompt_params: dict (language, doc_type, num_solutions, etc.) |
| |
| Returns: |
| - request_id: UUID to track job |
| - status: "pending" |
| - estimated_time_minutes: int |
| - poll_url: URL to check status |
| """ |
| if not job_queue: |
| raise HTTPException( |
| status_code=status.HTTP_503_SERVICE_UNAVAILABLE, |
| detail="Background job queue not available. Redis connection required." |
| ) |
| |
| |
| user_id_from_input, request_id = parse_request_id(request.request_id) |
| user_id = user_id_from_input |
| |
| try: |
| |
| existing_request = supabase_client.get_request(request_id) |
| if not existing_request: |
| raise HTTPException( |
| status_code=status.HTTP_404_NOT_FOUND, |
| detail=f"Request {request_id} not found in database" |
| ) |
| |
| |
| if not user_id: |
| user_id = existing_request["user_id"] |
| |
| print(f"[Request {request_id}] Processing async request for user {user_id}") |
| print(f"[Request {request_id}] Current status: {existing_request['status']}") |
| |
|
|
| |
| |
| if not request.seed_images: |
| raise HTTPException( |
| status_code=status.HTTP_400_BAD_REQUEST, |
| detail="At least one seed image is required" |
| ) |
| |
| |
| supabase_client.update_request_status(request_id, "processing") |
| print(f"[Request {request_id}] Status: processing (queuing job)") |
| |
| |
| job_data = { |
| "user_id": user_id, |
| "google_drive_token": request.google_drive_token, |
| "google_drive_refresh_token": request.google_drive_refresh_token, |
| "seed_images": [str(url) for url in request.seed_images], |
| "prompt_params": request.prompt_params.dict() |
| } |
| |
| |
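| # job_timeout caps a single run at 2 hours; result_ttl/failure_ttl keep the outcome |
| # available for 86400 s (24 h) of polling before RQ discards it. |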
| job = job_queue.enqueue( |
| process_document_generation_job, |
| request_id=request_id, |
| request_data=job_data, |
| job_timeout='2h', |
| result_ttl=86400, |
| failure_ttl=86400 |
| ) |
| |
| print(f"Enqueued job {job.id} for request {request_id}") |
| |
| |
| num_solutions = request.prompt_params.num_solutions |
| if num_solutions <= 3: |
| estimated_time = 10 |
| elif num_solutions <= 10: |
| estimated_time = 20 |
| else: |
| estimated_time = 30 + (num_solutions - 10) * 2 |
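| # Example: num_solutions=15 -> 30 + (15 - 10) * 2 = 40 minutes (heuristic only). |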
| |
| |
| supabase_client.log_analytics_event( |
| user_id=user_id, |
| event_type="document_generation_requested", |
| entity_id=request_id |
| ) |
| |
| return { |
| "request_id": request_id, |
| "status": "pending", |
| "estimated_time_minutes": estimated_time, |
| "num_documents": num_solutions, |
| "poll_url": f"/jobs/{request_id}/status", |
| "message": f"Job queued successfully. Check status at /jobs/{request_id}/status" |
| } |
| |
| except HTTPException as http_exc: |
| |
| try: |
| supabase_client.update_request_status(request_id, "failed", error_message=str(http_exc.detail)) |
| print(f"[Request {request_id}] Status: failed - {http_exc.detail}") |
| except Exception as update_error: |
| print(f"Warning: Status update failed: {update_error}") |
| raise |
| except Exception as e: |
| print(f"Error creating async job: {str(e)}") |
| import traceback |
| traceback.print_exc() |
| |
| |
| try: |
| supabase_client.update_request_status(request_id, "failed", error_message=str(e)) |
| print(f"[Request {request_id}] Status: failed - {str(e)}") |
| except Exception as update_error: |
| print(f"Warning: Status update failed: {update_error}") |
| |
| raise HTTPException( |
| status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, |
| detail=f"Failed to create job: {str(e)}" |
| ) |
|
|
|
|
| @app.get("/jobs/{request_id}/status") |
| async def get_job_status(request_id: str): |
| """ |
| Get status of a document generation job. |
| |
| Returns: |
| - request_id: UUID |
| - status: pending | processing | generating | completed | failed |
| - created_at: ISO timestamp |
| - updated_at: ISO timestamp |
| - error_message: str (if failed) |
| - results: dict with download_url (if completed) |
| """ |
| try: |
| |
| request_data = supabase_client.get_request(request_id) |
| |
| if not request_data: |
| raise HTTPException( |
| status_code=status.HTTP_404_NOT_FOUND, |
| detail=f"Request {request_id} not found" |
| ) |
| |
| response = { |
| "request_id": request_id, |
| "status": request_data["status"], |
| "created_at": request_data["created_at"], |
| "updated_at": request_data["updated_at"], |
| "num_documents": request_data["metadata"]["prompt_params"]["num_solutions"] |
| } |
| |
| |
| if request_data["status"] == "failed": |
| response["error_message"] = request_data.get("error_message") |
| |
| |
| if request_data["status"] == "completed": |
| |
| generated_docs = supabase_client.get_generated_documents(request_id) |
| |
| if generated_docs: |
| response["results"] = { |
| "documents": [ |
| { |
| "id": doc.get("id"), |
| "doc_index": doc.get("doc_index"), |
| "pdf_url": doc.get("file_url"), |
| "doc_storage_path": doc.get("doc_storage_path"), |
| "gt_storage_path": doc.get("gt_storage_path"), |
| "html_storage_path": doc.get("html_storage_path"), |
| "bbox_storage_path": doc.get("bbox_storage_path") |
| } for doc in generated_docs if doc.get("doc_index") is not None |
| ], |
| "zip_filename": f"docgenie_{request_id}.zip" |
| } |
| |
| |
| zip_docs = [doc for doc in generated_docs if doc.get("file_type") == "application/zip"] |
| if zip_docs: |
| response["results"]["download_url"] = zip_docs[0].get("file_url") |
| |
| return response |
| |
| except HTTPException: |
| raise |
| except Exception as e: |
| print(f"Error fetching job status: {str(e)}") |
| raise HTTPException( |
| status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, |
| detail=f"Failed to fetch job status: {str(e)}" |
| ) |
|
|
|
|
| @app.get("/jobs/user/{user_id}") |
| async def get_user_jobs(user_id: str, limit: int = 50, offset: int = 0): |
| """ |
| Get all jobs for a user. |
| |
| Query params: |
| - limit: int (default: 50, max: 100) |
| - offset: int (default: 0) |
| |
| Returns: |
| List of job status objects |
| """ |
| try: |
| |
| if limit > 100: |
| limit = 100 |
| |
| |
| requests = supabase_client.get_user_requests(user_id, limit, offset) |
| |
| results = [] |
| for request_data in requests: |
| result = { |
| "request_id": request_data["id"], |
| "status": request_data["status"], |
| "created_at": request_data["created_at"], |
| "updated_at": request_data["updated_at"], |
| "num_documents": request_data["metadata"]["prompt_params"]["num_solutions"] |
| } |
| |
| if request_data["status"] == "failed": |
| result["error_message"] = request_data.get("error_message") |
| |
| if request_data["status"] == "completed": |
| |
| generated_docs = supabase_client.get_generated_documents(request_data["id"]) |
| if generated_docs: |
| result["download_url"] = generated_docs[0]["file_url"] |
| |
| results.append(result) |
| |
| return { |
| "user_id": user_id, |
| "jobs": results, |
| "count": len(results), |
| "limit": limit, |
| "offset": offset |
| } |
| |
| except Exception as e: |
| print(f"Error fetching user jobs: {str(e)}") |
| raise HTTPException( |
| status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, |
| detail=f"Failed to fetch user jobs: {str(e)}" |
| ) |
|
|
|
|
| if __name__ == "__main__": |
| uvicorn.run( |
| "main:app", |
| host=settings.API_HOST, |
| port=settings.API_PORT, |
| reload=settings.DEBUG_MODE |
| ) |
|
|