| """ |
| Background worker for processing document generation jobs using batched Claude API. |
| Runs as RQ worker process. |
| """ |
|
|
| import asyncio |
| import io |
| import json |
| import os |
| import pathlib |
| import tempfile |
| import time |
| import traceback |
| import zipfile |
| import shutil |
| from typing import Dict, Any, List, Callable |
| from datetime import datetime |
|
|
| |
| print("============================================================") |
| print("π§ Worker Configuration Check") |
| print("============================================================") |
|
|
| from .config import settings |
|
|
| |
| print(f"β ANTHROPIC_API_KEY: {'Set' if settings.ANTHROPIC_API_KEY else 'β MISSING'}") |
| print(f"β SUPABASE: {settings.SUPABASE_URL[:40]}..." if settings.SUPABASE_URL else "β MISSING") |
| print(f"β GOOGLE_CLIENT_ID: {settings.GOOGLE_CLIENT_ID[:30]}..." if settings.GOOGLE_CLIENT_ID else "β MISSING") |
| print(f"β GOOGLE_CLIENT_SECRET: {'Set' if settings.GOOGLE_CLIENT_SECRET else 'β MISSING'}") |
| if settings.GOOGLE_CLIENT_ID and settings.GOOGLE_CLIENT_SECRET: |
| print(f" β Token auto-refresh: ENABLED") |
| print("============================================================") |
|
|
| from .supabase_client import supabase_client |
| from .google_drive import GoogleDriveClient |
| from .utils import ( |
| download_seed_images, |
| build_prompt, |
| extract_html_documents_from_response, |
| extract_ground_truth, |
| extract_css_from_html, |
| render_html_to_pdf, |
| extract_bboxes_from_rendered_pdf, |
| pdf_to_base64, |
| process_stage3_complete, |
| process_stage4_ocr, |
| process_stage5_complete, |
| validate_html_structure, |
| validate_pdf, |
| validate_bboxes |
| ) |
| from docgenie.generation.pipeline_01.claude_batching import ClaudeBatchedClient |
| from docgenie import ENV |
|
|
|
|
| |
| |
# Opt-in verbose logging, toggled via the WORKER_VERBOSE_LOGGING env var.
# Any of "true"/"1"/"yes" (case-insensitive) enables it.
VERBOSE_LOGGING = os.getenv('WORKER_VERBOSE_LOGGING', 'false').lower() in {'true', '1', 'yes'}


def log_verbose(message: str):
    """Print *message* to stdout only when verbose worker logging is enabled."""
    if not VERBOSE_LOGGING:
        return
    print(message)
|
|
|
|
| |
def validate_worker_config():
    """Print a startup summary of required and optional worker settings.

    Purely informational: missing required settings are reported but do not
    raise — the worker is allowed to start and fail later.
    """
    separator = "=" * 60
    report = [separator, "π§ Worker Configuration Check", separator]

    # Required: Claude API key.
    if settings.ANTHROPIC_API_KEY:
        report.append("β ANTHROPIC_API_KEY: Set")
    else:
        report.append("β ANTHROPIC_API_KEY: NOT SET (REQUIRED)")

    # Required: Supabase URL + key pair.
    if settings.SUPABASE_URL and settings.SUPABASE_KEY:
        report.append(f"β SUPABASE: {settings.SUPABASE_URL[:30]}...")
    else:
        report.append("β SUPABASE: NOT SET (REQUIRED)")

    # Optional: Google OAuth client credentials enable token auto-refresh.
    if settings.GOOGLE_CLIENT_ID and settings.GOOGLE_CLIENT_SECRET:
        report.append(f"β GOOGLE_CLIENT_ID: {settings.GOOGLE_CLIENT_ID[:20]}...")
        report.append("β GOOGLE_CLIENT_SECRET: Set")
        report.append(" β Token auto-refresh: ENABLED")
    else:
        report.append("β GOOGLE_CLIENT_ID/SECRET: Not set")
        report.append(" β Token auto-refresh: DISABLED")
        report.append(" β Users must provide fresh access tokens that don't expire during processing")

    report.append(separator)
    for line in report:
        print(line)
|
|
| |
| validate_worker_config() |
|
|
|
|
def retry_on_network_error(func: Callable, max_retries: int = 3, delay: float = 2.0) -> Any:
    """
    Retry a function on network errors with exponential backoff.

    An error is considered network-related when its message contains one of
    a few heuristic markers (DNS, connection, timeout, network). Non-network
    errors are re-raised immediately; network errors are retried up to
    ``max_retries`` total attempts, doubling the wait each time.

    Args:
        func: Function to execute (must be callable with no args)
        max_retries: Maximum number of attempts (must be >= 1)
        delay: Initial delay in seconds (doubles each retry)

    Returns:
        Result of the function call

    Raises:
        ValueError: If ``max_retries`` is less than 1.
        Exception: The first non-network exception, or the last network
            exception once all attempts are exhausted.
    """
    # The original fell through to `raise last_exception` (i.e. `raise None`)
    # for max_retries < 1; fail loudly with a clear error instead.
    if max_retries < 1:
        raise ValueError("max_retries must be >= 1")

    for attempt in range(max_retries):
        try:
            return func()
        except Exception as e:
            error_str = str(e).lower()
            is_network_error = any(
                marker in error_str
                for marker in ('name resolution', 'connection', 'timeout', 'network')
            )
            # Give up immediately on non-network failures, and on network
            # failures once this was the final attempt.
            if not is_network_error or attempt == max_retries - 1:
                raise

            wait_time = delay * (2 ** attempt)
            print(f"[Retry {attempt + 1}/{max_retries}] Network error, retrying in {wait_time}s: {e}")
            time.sleep(wait_time)
|
|
|
|
async def process_document_generation_job_async(request_id: str, request_data: Dict[str, Any]):
    """
    Async background job function - processes document generation using batched Claude API.

    This function:
    1. Creates Claude batch with single message (generates N documents)
    2. Polls batch until completion
    3. Processes all documents (PDFs, handwriting, etc.)
    4. Uploads ZIP to user's Google Drive
    5. Updates Supabase with results

    Args:
        request_id: Document request UUID from Supabase
        request_data: Request parameters dict containing:
            - user_id: int
            - seed_images: List[str] (URLs)
            - prompt_params: Dict (language, doc_type, num_solutions, etc.)

    Raises:
        Exception: Any error during processing (logged to Supabase)
    """
    # Required keys ('user_id', 'seed_images', 'prompt_params') raise KeyError
    # if missing; the Google Drive tokens are optional and default to None.
    user_id = request_data['user_id']
    google_drive_token = request_data.get('google_drive_token')
    google_drive_refresh_token = request_data.get('google_drive_refresh_token')
    seed_image_urls = request_data['seed_images']
    prompt_params = request_data['prompt_params']

    # A refresh token is unusable without the OAuth client credentials: warn
    # up front instead of failing mid-upload.
    if google_drive_refresh_token:
        if not settings.GOOGLE_CLIENT_ID or not settings.GOOGLE_CLIENT_SECRET:
            print(f"[Job {request_id}] β οΈ WARNING: refresh_token provided but GOOGLE_CLIENT_ID/SECRET not configured")
            print(f"[Job {request_id}] Token auto-refresh will fail. Ensure access token remains valid.")

    # All intermediate artifacts (batch requests, Claude responses, rendered
    # PDFs, the final ZIP) live in one temp dir removed when the job ends.
    with tempfile.TemporaryDirectory() as tmp_dir:
        tmp_path = pathlib.Path(tmp_dir)
        batch_dir = tmp_path / "batches"
        message_dir = tmp_path / "messages"
        batch_dir.mkdir(exist_ok=True)
        message_dir.mkdir(exist_ok=True)

        # Local import — presumably to keep module import light or avoid an
        # import cycle; confirm before hoisting to the top of the file.
        from .dataset_exporter import DatasetExporter
        exporter = DatasetExporter(tmp_path, dataset_name="docgenie_documents")

        try:
            # --- Fetch seed images ----------------------------------------
            retry_on_network_error(lambda: supabase_client.update_request_status(request_id, "downloading"))
            print(f"[Job {request_id}] Status: downloading (fetching seed images)")

            log_verbose(f"[Job {request_id}] Downloading {len(seed_image_urls)} seed images...")
            seed_images_base64 = download_seed_images(seed_image_urls)
            log_verbose(f"[Job {request_id}] Downloaded {len(seed_images_base64)} images")

            # --- Build prompt and run the batched Claude call -------------
            prompt_template_path = ENV.PROMPT_TEMPLATES_DIR / "ClaudeRefined12" / "seed-based-json.txt"

            if not prompt_template_path.exists():
                raise FileNotFoundError(f"Prompt template not found: {prompt_template_path}")

            prompt = build_prompt(
                language=prompt_params.get('language', 'English'),
                doc_type=prompt_params.get('doc_type', 'business and administrative'),
                gt_type=prompt_params.get('gt_type', 'Questions and answers'),
                gt_format=prompt_params.get('gt_format', '{"question": "answer"}'),
                num_solutions=prompt_params.get('num_solutions', 1),
                num_seed_images=len(seed_images_base64),
                prompt_template_path=prompt_template_path,
                enable_visual_elements=prompt_params.get('enable_visual_elements', False),
                visual_element_types=prompt_params.get('visual_element_types', [])
            )
            log_verbose(f"[Job {request_id}] Prompt built")

            log_verbose(f"[Job {request_id}] Creating Claude batch (batched API)...")

            client = ClaudeBatchedClient(api_key=settings.ANTHROPIC_API_KEY)

            # Single batch entry: one prompt, with every seed image attached.
            client.send_batch(
                model=settings.CLAUDE_MODEL,
                prompts=[prompt],
                images_base64=[seed_images_base64],
                image_docids=[["seed"] * len(seed_images_base64)],
                batch_data_directory=batch_dir,
                max_tokens=16384
            )

            print(f"[Job {request_id}] β³ Batch created, processing for Claude to process...")

            # Blocks until the batch completes, polling at BATCH_POLL_INTERVAL.
            client.await_batches(
                batch_data_directory=batch_dir,
                message_data_directory=message_dir,
                sleep_seconds_between_batch=2,
                sleep_seconds_iteration=settings.BATCH_POLL_INTERVAL
            )

            print(f"[Job {request_id}] β Batch complete")

            # Only one prompt was submitted, so only the first result file
            # is read; any additional files would be silently ignored.
            message_files = list(message_dir.glob("*.json"))

            if not message_files:
                raise RuntimeError("No message results found after batch completion")

            message_data = json.loads(message_files[0].read_text())

            if message_data.get('result_type') != 'succeeded':
                error_msg = message_data.get('error', 'Unknown error from Claude API')
                raise RuntimeError(f"Claude API error: {error_msg}")

            llm_response = message_data['response']
            log_verbose(f"[Job {request_id}] Received LLM response ({len(llm_response)} chars)")

            # --- Parse HTML documents out of the LLM response -------------
            html_documents = extract_html_documents_from_response(llm_response)

            if not html_documents:
                raise RuntimeError("No valid HTML documents found in LLM response")

            print(f"[Job {request_id}] β Extracted {len(html_documents)} documents")

            retry_on_network_error(lambda: supabase_client.update_request_status(request_id, "generating"))
            print(f"[Job {request_id}] Status: generating (processing documents)")

            # --- Optional user-supplied assets ----------------------------
            # Downloaded into a separate mkdtemp directory (outside tmp_dir),
            # cleaned up in the `finally` block below. Any failure here is
            # best-effort and never fails the job.
            assets_temp_dir = None
            try:
                assets_path = f"{user_id}/{request_id}/assets"
                files = supabase_client.list_files("doc_storage", assets_path)

                # Entries without an 'id' are treated as folder placeholders.
                asset_files = [f for f in files if f.get('id') is not None]

                if asset_files:
                    assets_temp_dir = pathlib.Path(tempfile.mkdtemp())
                    print(f"[Job {request_id}] Found {len(asset_files)} assets in storage, downloading...")

                    for file_info in asset_files:
                        file_name = file_info['name']
                        try:
                            file_content = supabase_client.download_file("doc_storage", f"{assets_path}/{file_name}")
                            with open(assets_temp_dir / file_name, 'wb') as f:
                                f.write(file_content)
                            log_verbose(f" β Downloaded {file_name}")
                        except Exception as download_err:
                            # A single missing asset should not abort the job.
                            print(f" β Failed to download {file_name}: {download_err}")
                else:
                    log_verbose(f"[Job {request_id}] No assets found in {assets_path}")
            except Exception as e:
                print(f"[Job {request_id}] β Asset check/download failed: {e}")

            pdf_files: List[pathlib.Path] = []
            metadata: List[Dict[str, Any]] = []

            # --- Per-document pipeline (stages 3-5) -----------------------
            # Each document is processed independently: any failure inside
            # the loop skips that document and continues with the next.
            for idx, html in enumerate(html_documents):
                try:
                    doc_id = f"document_{idx + 1}"
                    log_verbose(f"[Job {request_id}] Processing document {idx + 1}/{len(html_documents)}")

                    # Path of the clean render, before handwriting / visual
                    # elements are stamped on top.
                    original_pdf_path = None

                    is_valid, error_msg = validate_html_structure(html)
                    if not is_valid:
                        print(f"[Job {request_id}] Document {idx + 1} HTML validation failed: {error_msg}")
                        continue

                    # Split the ground truth out of the HTML, then the CSS.
                    gt, html_clean = extract_ground_truth(html)
                    css, _ = extract_css_from_html(html_clean)

                    pdf_path = tmp_path / f"{doc_id}.pdf"
                    pdf_path, width_mm, height_mm, geometries = await render_html_to_pdf(
                        html=html_clean,
                        output_pdf_path=pdf_path
                    )

                    original_pdf_path = pdf_path

                    is_valid, error_msg = validate_pdf(pdf_path)
                    if not is_valid:
                        print(f"[Job {request_id}] Document {idx + 1} PDF validation failed: {error_msg}")
                        continue

                    bboxes_raw = extract_bboxes_from_rendered_pdf(pdf_path)

                    # BBox validation is advisory only: a warning is printed
                    # but the document is still processed.
                    is_valid, error_msg = validate_bboxes(bboxes_raw, min_bbox_count=1)
                    if not is_valid:
                        print(f"[Job {request_id}] Document {idx + 1} BBox validation warning: {error_msg}")

                    log_verbose(f"[Job {request_id}] Document {idx + 1}: Extracted {len(bboxes_raw)} bboxes")

                    # Stage 3/4/5 outputs, defaulted so later code can rely
                    # on them even when a stage is disabled or fails.
                    final_image_b64 = None
                    handwriting_regions = []
                    visual_elements = []
                    handwriting_images = {}
                    visual_element_images = {}
                    ocr_results = None
                    pdf_with_handwriting_path = None
                    pdf_final_path = None

                    # Stage 3: handwriting and/or visual element overlay.
                    if prompt_params.get('enable_handwriting') or prompt_params.get('enable_visual_elements'):

                        # NOTE(review): this status update is re-sent on every
                        # loop iteration, not once per job — confirm intended.
                        if prompt_params.get('enable_handwriting'):
                            retry_on_network_error(lambda: supabase_client.update_request_status(request_id, "handwriting"))
                            log_verbose(f"[Job {request_id}] Status: handwriting (generating handwritten text)")

                        log_verbose(f"[Job {request_id}] Document {idx + 1}: Processing handwriting/visual elements...")

                        try:
                            final_image_b64, handwriting_regions, visual_elements, handwriting_images, visual_element_images, pdf_with_handwriting_path, pdf_final_path = await process_stage3_complete(
                                pdf_path=pdf_path,
                                geometries=geometries,
                                ground_truth=gt,
                                bboxes_raw=bboxes_raw,
                                page_width_mm=width_mm,
                                page_height_mm=height_mm,
                                enable_handwriting=prompt_params.get('enable_handwriting', False),
                                handwriting_ratio=prompt_params.get('handwriting_ratio', 0.3),
                                enable_visual_elements=prompt_params.get('enable_visual_elements', False),
                                visual_element_types=prompt_params.get('visual_element_types', []),
                                seed=prompt_params.get('seed'),
                                assets_dir=assets_temp_dir
                            )

                            # Prefer the fully-augmented PDF, then the
                            # handwriting-only PDF, else keep the original.
                            if pdf_final_path and pdf_final_path.exists():
                                pdf_path = pdf_final_path
                            elif pdf_with_handwriting_path and pdf_with_handwriting_path.exists():
                                pdf_path = pdf_with_handwriting_path

                            log_verbose(f"[Job {request_id}] Document {idx + 1}: {len(handwriting_regions)} handwriting, {len(visual_elements)} visual elements")

                        except Exception as e:
                            # Stage 3 failure degrades gracefully: continue
                            # with the plain rendered PDF.
                            print(f"[Job {request_id}] Document {idx + 1}: Stage 3 failed: {str(e)}")

                    # Stage 4: OCR over whichever pdf_path survived stage 3.
                    if prompt_params.get('enable_ocr'):

                        retry_on_network_error(lambda: supabase_client.update_request_status(request_id, "ocr"))
                        log_verbose(f"[Job {request_id}] Status: ocr (running OCR on documents)")

                        log_verbose(f"[Job {request_id}] Document {idx + 1}: Processing OCR...")

                        try:
                            stage4_image, ocr_results = await process_stage4_ocr(
                                pdf_path=pdf_path,
                                enable_ocr=True,
                                dpi=settings.OCR_DPI
                            )

                            if ocr_results:
                                log_verbose(f"[Job {request_id}] Document {idx + 1}: OCR complete - {len(ocr_results.get('words', []))} words")

                        except Exception as e:
                            print(f"[Job {request_id}] Document {idx + 1}: OCR failed: {str(e)}")

                    # Stage 5: optional bbox normalization / GT verification /
                    # analysis / debug visualization.
                    stage5_results = {}
                    if any([
                        prompt_params.get('enable_bbox_normalization'),
                        prompt_params.get('enable_gt_verification'),
                        prompt_params.get('enable_analysis'),
                        prompt_params.get('enable_debug_visualization')
                    ]):

                        if prompt_params.get('enable_gt_verification'):
                            retry_on_network_error(lambda: supabase_client.update_request_status(request_id, "validation"))
                            log_verbose(f"[Job {request_id}] Status: validation (validating ground truth)")

                        log_verbose(f"[Job {request_id}] Document {idx + 1}: Processing dataset packaging...")

                        try:
                            stage5_results = await process_stage5_complete(
                                document_id=doc_id,
                                pdf_path=pdf_path,
                                image_base64=final_image_b64,
                                ocr_results=ocr_results,
                                ground_truth=gt,
                                has_handwriting=prompt_params.get('enable_handwriting', False),
                                has_visual_elements=prompt_params.get('enable_visual_elements', False),
                                layout_elements=visual_elements,
                                enable_bbox_normalization=prompt_params.get('enable_bbox_normalization', False),
                                enable_gt_verification=prompt_params.get('enable_gt_verification', False),
                                enable_analysis=prompt_params.get('enable_analysis', False),
                                enable_debug_visualization=prompt_params.get('enable_debug_visualization', False)
                            )

                        except Exception as e:
                            print(f"[Job {request_id}] Document {idx + 1}: Stage 5 failed: {str(e)}")

                    # Keep both the untouched render and the augmented PDF
                    # when stage 3 replaced pdf_path; otherwise just one.
                    if original_pdf_path and pdf_path != original_pdf_path:
                        pdf_files.append(original_pdf_path)
                        pdf_files.append(pdf_path)
                    else:
                        pdf_files.append(pdf_path)

                    from .utils import extract_all_bboxes_from_pdf, extract_raw_annotations_from_geometries
                    log_verbose(f"[Job {request_id}] Document {idx + 1}: π¦ Extracting bbox_pdf (word + char level) from original PDF...")

                    try:
                        # Word/char bboxes are always taken from the clean
                        # pre-augmentation PDF when one is available.
                        bboxes_pdf = extract_all_bboxes_from_pdf(original_pdf_path if original_pdf_path else pdf_path)
                        bbox_pdf_word = bboxes_pdf.get('word', [])
                        bbox_pdf_char = bboxes_pdf.get('char', [])
                        log_verbose(f"[Job {request_id}] Document {idx + 1}: β Extracted {len(bbox_pdf_word)} word bboxes, {len(bbox_pdf_char)} char bboxes from PDF")
                    except Exception as e:
                        print(f"[Job {request_id}] Document {idx + 1}: β bbox_pdf extraction failed: {e}")
                        # Fall back to the render-time bboxes; no char level.
                        bbox_pdf_word = bboxes_raw
                        bbox_pdf_char = []

                    raw_annotations = None
                    if geometries:
                        log_verbose(f"[Job {request_id}] Document {idx + 1}: π¦ Extracting raw_annotations from geometries...")
                        try:
                            raw_annotations = extract_raw_annotations_from_geometries(geometries)
                            log_verbose(f"[Job {request_id}] Document {idx + 1}: β Extracted {len(raw_annotations)} layout annotations")
                        except Exception as e:
                            print(f"[Job {request_id}] Document {idx + 1}: β raw_annotations extraction failed: {e}")

                    # Decode stage-3 image from base64 for the exporter.
                    final_image_bytes = None
                    if final_image_b64:
                        import base64
                        final_image_bytes = base64.b64decode(final_image_b64)

                    # Decode the optional debug overlay image, if stage 5
                    # produced one.
                    debug_viz_bytes = None
                    if stage5_results.get('debug_visualization'):
                        import base64
                        debug_viz_dict = stage5_results['debug_visualization']
                        if debug_viz_dict and 'bbox_overlay_base64' in debug_viz_dict:
                            debug_viz_b64 = debug_viz_dict['bbox_overlay_base64']
                            debug_viz_bytes = base64.b64decode(debug_viz_b64)

                    # Token mapping is only materialized for the richer
                    # output modes.
                    output_detail = prompt_params.get('output_detail', 'minimal')
                    token_mapping_data = None
                    if output_detail in ["dataset", "complete"]:
                        if handwriting_images or visual_element_images:
                            from .utils import create_token_mapping_json
                            token_mapping_data = create_token_mapping_json(
                                handwriting_regions,
                                handwriting_images,
                                visual_elements,
                                visual_element_images
                            )
                            log_verbose(f"[Job {request_id}] Document {idx + 1}: π¦ Output detail '{output_detail}': Prepared {len(handwriting_images)} handwriting tokens, {len(visual_element_images)} visual elements")

                    # Final bboxes: prefer OCR words/lines, fall back to the
                    # PDF-level word boxes (no segment level in that case).
                    bbox_final_word = None
                    bbox_final_segment = None
                    if ocr_results and ocr_results.get('words'):

                        bbox_final_word = ocr_results.get('words', [])
                        bbox_final_segment = ocr_results.get('lines', [])
                    else:

                        bbox_final_word = bbox_pdf_word
                        bbox_final_segment = []

                    pdf_initial_bytes = original_pdf_path.read_bytes()

                    pdf_with_handwriting_bytes = None
                    pdf_final_bytes = None
                    pdf_with_visual_elements_bytes = None

                    if pdf_with_handwriting_path and pdf_with_handwriting_path.exists():
                        pdf_with_handwriting_bytes = pdf_with_handwriting_path.read_bytes()

                    if pdf_final_path and pdf_final_path.exists():
                        pdf_final_bytes = pdf_final_path.read_bytes()

                    # A final PDF without a handwriting PDF means stage 3
                    # only added visual elements; reclassify it accordingly.
                    if pdf_final_bytes and not pdf_with_handwriting_bytes:
                        pdf_with_visual_elements_bytes = pdf_final_bytes
                        pdf_final_bytes = None

                    log_verbose(f"[Job {request_id}] Document {idx + 1}: π¦ Adding document to dataset exporter...")
                    exporter.add_document(
                        document_id=doc_id,
                        html=html_clean,
                        css=css,
                        pdf_initial=pdf_initial_bytes,
                        pdf_with_handwriting=pdf_with_handwriting_bytes,
                        pdf_with_visual_elements=pdf_with_visual_elements_bytes,
                        pdf_final=pdf_final_bytes,
                        final_image=final_image_bytes,
                        ground_truth=gt,
                        raw_annotations=raw_annotations,
                        bboxes_pdf_word=bbox_pdf_word,
                        bboxes_pdf_char=bbox_pdf_char,
                        bboxes_final_word=bbox_final_word,
                        bboxes_final_segment=bbox_final_segment,
                        bboxes_normalized_word=stage5_results.get('normalized_bboxes_word'),
                        bboxes_normalized_segment=stage5_results.get('normalized_bboxes_segment'),
                        gt_verification=stage5_results.get('gt_verification'),
                        token_mapping=token_mapping_data,
                        handwriting_regions=handwriting_regions,
                        handwriting_images=handwriting_images,
                        visual_elements=visual_elements,
                        visual_element_images=visual_element_images,
                        layout_elements=visual_elements,
                        geometries=geometries,
                        ocr_results=ocr_results,
                        analysis_stats=stage5_results.get('analysis_stats'),
                        debug_visualization=debug_viz_bytes
                    )
                    log_verbose(f"[Job {request_id}] Document {idx + 1}: β Document {doc_id} added to dataset")

                    # Per-document metadata recorded alongside the dataset.
                    metadata.append({
                        "document_id": doc_id,
                        "filename": f"{doc_id}.pdf",
                        "bboxes": bboxes_raw,
                        "ground_truth": gt,
                        "geometries": geometries,
                        "page_width_mm": width_mm,
                        "page_height_mm": height_mm,
                        "handwriting_regions": handwriting_regions,
                        "visual_elements": visual_elements,
                        "has_stage3_image": final_image_b64 is not None,
                        "ocr_results": ocr_results,

                        # Stage-5 outputs (None when stage 5 was skipped).
                        "normalized_bboxes_word": stage5_results.get('normalized_bboxes_word'),
                        "normalized_bboxes_segment": stage5_results.get('normalized_bboxes_segment'),
                        "gt_verification": stage5_results.get('gt_verification'),
                        "analysis_stats": stage5_results.get('analysis_stats'),
                        "debug_visualization_available": stage5_results.get('debug_visualization') is not None
                    })

                except Exception as e:
                    # Any per-document failure skips that document only.
                    print(f"[Job {request_id}] Error processing document {idx + 1}: {str(e)}")
                    traceback.print_exc()
                    continue

            if not pdf_files:
                raise RuntimeError("Failed to process any documents")

            log_verbose(f"[Job {request_id}] Processed {len(pdf_files)} PDF files")

            # --- Finalize dataset, zip, and upload ------------------------
            log_verbose(f"[Job {request_id}] π¦ Finalizing dataset export...")
            exporter.finalize(
                request_id=request_id,
                user_id=user_id,
                prompt_params=prompt_params,
                api_mode="async"
            )
            log_verbose(f"[Job {request_id}] β Dataset structure finalized at {exporter.base_path}")

            retry_on_network_error(lambda: supabase_client.update_request_status(request_id, "zipping"))
            print(f"[Job {request_id}] Status: zipping (creating ZIP archive)")

            log_verbose(f"[Job {request_id}] π¦ Creating ZIP archive from dataset...")
            zip_path = tmp_path / f"docgenie_{request_id}.zip"

            with zipfile.ZipFile(zip_path, 'w', zipfile.ZIP_DEFLATED) as zip_file:

                # Archive names are relative to the dataset's parent dir so
                # the dataset folder name itself is preserved in the ZIP.
                for file_path in exporter.base_path.rglob('*'):
                    if file_path.is_file():
                        arcname = file_path.relative_to(exporter.base_path.parent)
                        zip_file.write(file_path, arcname)

            zip_size_mb = zip_path.stat().st_size / (1024 * 1024)
            log_verbose(f"[Job {request_id}] β ZIP created: {zip_size_mb:.2f} MB")

            retry_on_network_error(lambda: supabase_client.update_request_status(request_id, "uploading"))
            print(f"[Job {request_id}] Status: uploading (uploading to Google Drive)")

            print(f"[Job {request_id}] β¬οΈ Uploading to Google Drive...")

            # Google Drive upload is optional: a missing token skips it, and
            # a failure downgrades the final status instead of failing the job.
            google_drive_url = None
            gdrive_failed = False

            if not google_drive_token:
                print(f"[Job {request_id}] No Google Drive token provided. Skipping Google Drive upload.")
            else:
                try:
                    drive_client = GoogleDriveClient(
                        access_token=google_drive_token,
                        refresh_token=google_drive_refresh_token
                    )
                    google_drive_url = drive_client.upload_file(
                        file_path=zip_path,
                        filename=f"docgenie_{request_id}.zip",
                        folder_name=settings.GOOGLE_DRIVE_FOLDER_NAME
                    )

                    print(f"[Job {request_id}] β Uploaded to Google Drive: {google_drive_url}")

                except Exception as e:
                    print(f"[Job {request_id}] Google Drive upload failed: {str(e)}")
                    gdrive_failed = True

            log_verbose(f"[Job {request_id}] Saving results to Supabase...")
            log_verbose(f"[Job {request_id}] URL: {google_drive_url}")

            # Secondary copy of the ZIP in Supabase storage (best-effort).
            zip_url = None
            try:
                zip_storage_path = f"{user_id}/{request_id}/generated/docgenie_{request_id}.zip"
                supabase_client.upload_to_storage("doc_storage", zip_storage_path, zip_path.read_bytes(), "application/zip")
                zip_url = supabase_client.get_public_url("doc_storage", zip_storage_path)
                print(f"[Job {request_id}] β Uploaded ZIP to Supabase: {zip_url}")
            except Exception as e:
                print(f"[Job {request_id}] β Supabase ZIP upload failed: {e}")

            # NOTE(review): model_version records settings.LLM_MODEL while
            # the batch request above used settings.CLAUDE_MODEL — confirm
            # which one should be recorded here.
            retry_on_network_error(lambda: supabase_client.create_generated_document(
                request_id=request_id,
                file_url=google_drive_url,
                file_type="application/zip",
                page_count=len(metadata),
                model_version=settings.LLM_MODEL,
                zip_url=zip_url
            ))

            status = "completed_gdrive_failed" if gdrive_failed else "completed"
            retry_on_network_error(lambda: supabase_client.update_request_status(request_id, status))

            retry_on_network_error(lambda: supabase_client.log_analytics_event(
                user_id=user_id,
                event_type="document_generation_completed",
                entity_id=request_id
            ))

            print(f"[Job {request_id}] β Job completed successfully!")

        except Exception as e:
            # Record the failure in Supabase, then re-raise so RQ marks the
            # job as failed.
            error_message = f"{type(e).__name__}: {str(e)}"
            print(f"[Job {request_id}] β Job failed: {error_message}")
            traceback.print_exc()

            retry_on_network_error(lambda: supabase_client.update_request_status(
                request_id=request_id,
                status="failed",
                error_message=error_message
            ))

            retry_on_network_error(lambda: supabase_client.log_analytics_event(
                user_id=user_id,
                event_type="document_generation_failed",
                entity_id=request_id
            ))

            raise
        finally:
            # The assets dir was created with mkdtemp (outside tmp_dir), so
            # it must be removed explicitly. The locals() check guards
            # against failures before assets_temp_dir is ever assigned.
            if 'assets_temp_dir' in locals() and assets_temp_dir and assets_temp_dir.exists():
                try:
                    shutil.rmtree(assets_temp_dir, ignore_errors=True)
                    print(f"[Job {request_id}] β Cleaned up assets directory {assets_temp_dir}")
                except:
                    # Best-effort cleanup; errors are deliberately ignored.
                    # NOTE(review): bare except is broader than needed —
                    # rmtree(ignore_errors=True) should already never raise.
                    pass
|
|
|
|
def process_document_generation_job(request_id: str, request_data: Dict[str, Any]):
    """
    Synchronous wrapper for RQ - calls the async function with asyncio.run().

    This is the function that RQ worker calls. It runs the async version using asyncio.
    """
    rule = '=' * 60
    prompt_params = request_data.get('prompt_params', {})

    print(rule)
    print(f"π― Worker picked up job: {request_id}")
    print(f" User ID: {request_data.get('user_id', 'N/A')}")
    print(f" Num documents: {prompt_params.get('num_solutions', 'N/A')}")
    print(rule)

    return asyncio.run(process_document_generation_job_async(request_id, request_data))
|
|