Spaces:
Runtime error
Runtime error
#!/usr/bin/env python3
"""
Test script for the /generate/pdf endpoint (sync API with tracking & GDrive upload).

Tests the complete flow with all features enabled:
- Handwriting insertion
- Visual elements (stamps, logos, figures, barcodes, photos)
- OCR processing
- Ground truth verification
- Analysis and debug visualization
- Dataset export
- Google Drive upload

Flow:
1. GET {BASE_URL}/health; the script exits with status 1 if this fails.
2. POST PAYLOAD to {BASE_URL}/generate/pdf and inspect the returned ZIP.
3. If the response carries an X-Request-ID header, poll
   {BASE_URL}/jobs/<request_id>/status every POLL_INTERVAL seconds until the
   job reports "completed" or "failed" (Ctrl-C stops the polling).

Usage:
    python test_sync_pdf_api.py

The Google Drive tokens sent to the API are part of the PAYLOAD configuration
defined below. The process exits with status 0 on success and 1 on failure.
"""
import io
import os
import sys
import time
import zipfile

import requests
# Configuration
BASE_URL = "http://localhost:8000"
POLL_INTERVAL = 10  # seconds between status checks

# Test payload with all features enabled.
#
# SECURITY: the Google Drive OAuth tokens are credentials and must not be
# committed to the source tree. They are read from the environment instead:
#
#   export GOOGLE_DRIVE_TOKEN=<access token>
#   export GOOGLE_DRIVE_REFRESH_TOKEN=<refresh token>
#
# When a variable is unset an empty string is sent.
# NOTE(review): how the server treats empty tokens is not visible from this
# script -- confirm before relying on the Google Drive upload step.
PAYLOAD = {
    "user_id": 123,
    "google_drive_token": os.environ.get("GOOGLE_DRIVE_TOKEN", ""),
    "google_drive_refresh_token": os.environ.get("GOOGLE_DRIVE_REFRESH_TOKEN", ""),
    "seed_images": [
        "https://ocr.space/Content/Images/receipt-ocr-original.webp",
    ],
    "prompt_params": {
        "language": "English",
        "doc_type": "business and administrative",
        "gt_type": "Multiple questions about each document, with their answers taken **verbatim** from the document.",
        "gt_format": "{\"<Text of question 1>\": \"<Answer to question 1>\", \"<Text of question 2>\": \"<Answer to question 2>\", ...}",
        "num_solutions": 1,
        "enable_handwriting": True,
        "handwriting_ratio": 0.3,
        "enable_visual_elements": True,
        "visual_element_types": [
            "stamp",
            "logo",
            "figure",
            "barcode",
            "photo",
        ],
        "seed": None,  # Use None for random behavior, or set to integer for reproducibility
        "enable_ocr": True,
        "ocr_language": "en",
        "enable_bbox_normalization": True,
        "enable_gt_verification": True,
        "enable_analysis": True,
        "enable_debug_visualization": True,
        "enable_dataset_export": True,
        "dataset_export_format": "msgpack",
        "output_detail": "dataset",
    },
}
def test_health():
    """Probe the API's ``/health`` route and report the outcome on stdout.

    Returns:
        bool: True when the GET succeeds with a non-error status and the body
        decodes as JSON; False when anything in that sequence raises (the
        exception text is printed).
    """
    rule = "=" * 80
    for heading in (rule, "TESTING API HEALTH", rule):
        print(heading)

    health_url = f"{BASE_URL}/health"
    try:
        reply = requests.get(health_url, timeout=5)
        reply.raise_for_status()
        body = reply.json()
        print(f"β API is healthy: {body}\n")
        return True
    except Exception as exc:
        # Any failure (connection, HTTP status, JSON decoding) counts as unhealthy.
        print(f"β Health check failed: {exc}\n")
        return False
def _print_zip_summary(zip_data):
    """Print the file count and directory layout of an in-memory ZIP archive.

    Args:
        zip_data: Raw bytes of the archive returned by ``/generate/pdf``.

    Raises:
        zipfile.BadZipFile: If ``zip_data`` is not a readable ZIP archive.
    """
    with zipfile.ZipFile(io.BytesIO(zip_data), 'r') as zip_file:
        file_list = zip_file.namelist()
    print(f"β ZIP contains {len(file_list)} files")

    # Show directory structure: the first two path components for entries
    # nested at least two levels deep, otherwise just the top-level directory.
    print("\nDataset Structure:")
    dirs = set()
    for filepath in file_list:
        parts = filepath.split('/')
        if len(parts) > 2:
            dirs.add(parts[0] + '/' + parts[1])
        elif len(parts) == 2:
            dirs.add(parts[0])
    for dir_name in sorted(dirs):
        prefix = dir_name + '/'
        # The directory's own entry ("name/") is not counted as a file.
        file_count = sum(1 for f in file_list if f.startswith(prefix) and f != prefix)
        if file_count > 0:
            print(f" π {dir_name}/ ({file_count} files)")

    # Check for essential files
    if 'docgenie_documents/metadata.json' in file_list:
        print("\n β metadata.json present")
    if 'docgenie_documents/README.md' in file_list:
        print(" β README.md present")


def _poll_job_status(request_id, max_consecutive_errors=5):
    """Poll ``/jobs/<request_id>/status`` until the job reaches a terminal state.

    A line is printed whenever the reported status or progress changes.

    Args:
        request_id: Job identifier taken from the ``X-Request-ID`` header.
        max_consecutive_errors: Number of back-to-back failed polls after which
            polling is abandoned instead of retrying forever.

    Returns:
        str: ``"completed"`` or ``"failed"`` as reported by the server,
        ``"interrupted"`` when the user pressed Ctrl-C, or ``"error"`` when
        ``max_consecutive_errors`` polls failed in a row.
    """
    poll_count = 0
    consecutive_errors = 0
    last_status = None
    last_progress = None
    # The KeyboardInterrupt handler wraps the whole loop so that Ctrl-C is also
    # handled while sleeping after a failed poll.
    try:
        while True:
            poll_count += 1
            timestamp = time.strftime("%H:%M:%S")
            try:
                status_response = requests.get(
                    f"{BASE_URL}/jobs/{request_id}/status",
                    timeout=10,
                )
                status_response.raise_for_status()
                status_data = status_response.json()
                current_status = status_data.get('status')
                current_progress = status_data.get('progress')

                # Only print if status or progress changed
                if current_status != last_status or current_progress != last_progress:
                    # A response without a status must not crash the poll.
                    if current_status is None:
                        status_label = "UNKNOWN"
                    else:
                        status_label = str(current_status).upper()
                    print(f"[{timestamp}] Poll #{poll_count}: {status_label}", end="")
                    if current_progress:
                        print(f" - {current_progress}", end="")
                    print()
                    last_status = current_status
                    last_progress = current_progress

                # Check for terminal states
                if current_status == "completed":
                    print("\n" + "=" * 80)
                    print("β JOB COMPLETED!")
                    print("=" * 80)
                    # "results" may be present but null; treat that as empty.
                    results = status_data.get('results') or {}
                    download_url = results.get('download_url')
                    if download_url:
                        print(f" β Google Drive URL: {download_url}")
                    else:
                        print(" β³ Google Drive upload may still be in progress")
                    if results.get('file_size_mb'):
                        print(f" File Size: {results['file_size_mb']:.2f} MB")
                    print(f" Document Count: {results.get('document_count', 'N/A')}")
                    print(f" Created: {status_data.get('created_at')}")
                    print(f" Completed: {status_data.get('updated_at')}")
                    return "completed"
                if current_status == "failed":
                    print("\n" + "=" * 80)
                    print("β JOB FAILED!")
                    print("=" * 80)
                    print(f" Error: {status_data.get('error_message', 'Unknown error')}")
                    return "failed"

                # A fully processed poll resets the failure streak.
                consecutive_errors = 0
            except Exception as e:
                consecutive_errors += 1
                print(f"\nβ Error polling status: {e}")
                if consecutive_errors >= max_consecutive_errors:
                    print(f"Giving up after {consecutive_errors} consecutive polling errors")
                    return "error"

            # Wait before next poll
            time.sleep(POLL_INTERVAL)
    except KeyboardInterrupt:
        print("\n\nβ Polling interrupted by user")
        print("You can continue polling manually:")
        print(f" GET {BASE_URL}/jobs/{request_id}/status")
        return "interrupted"


def test_sync_endpoint():
    """Test sync /generate/pdf endpoint with continuous polling.

    Posts ``PAYLOAD`` to ``/generate/pdf``, reports the tracking headers,
    checks that the response body is a readable ZIP archive, and, when an
    ``X-Request-ID`` header is present, polls the job status until it
    finishes.

    Returns:
        bool: False when the request times out or raises, the body is not a
        valid ZIP, the job reports ``failed``, or status polling keeps
        failing; True otherwise (including when polling is skipped because no
        request id was returned or is interrupted with Ctrl-C).
    """
    params = PAYLOAD['prompt_params']
    print("=" * 80)
    print("TESTING SYNC /generate/pdf ENDPOINT")
    print("=" * 80)
    print("\nConfiguration:")
    print(f" User ID: {PAYLOAD['user_id']}")
    print(f" Seed Images: {len(PAYLOAD['seed_images'])}")
    print(f" Num Solutions: {params['num_solutions']}")
    print(f" Handwriting: {params['enable_handwriting']} (ratio: {params['handwriting_ratio']})")
    print(f" Visual Elements: {params['enable_visual_elements']} (types: {len(params['visual_element_types'])})")
    print(f" OCR: {params['enable_ocr']}")
    print(f" GT Verification: {params['enable_gt_verification']}")
    print(f" Analysis: {params['enable_analysis']}")
    print(f" Debug Viz: {params['enable_debug_visualization']}")
    print(f" Dataset Export: {params['enable_dataset_export']}")
    print(" Google Drive Upload: Yes")
    print()
    try:
        print("β³ Calling /generate/pdf...")
        print(" (This will return immediately, then we'll poll for status)\n")
        start_time = time.time()
        # The context manager releases the streamed connection even when
        # raise_for_status() fails before the body has been read.
        with requests.post(
            f"{BASE_URL}/generate/pdf",
            json=PAYLOAD,
            timeout=180,  # 3 minutes max for initial response
            stream=True,
        ) as response:
            response.raise_for_status()
            # With stream=True this measures time to headers, not to full body.
            elapsed_time = time.time() - start_time

            # Check response headers
            print(f"β Response received in {elapsed_time:.1f} seconds")
            print("\nResponse Headers:")
            request_id = response.headers.get('X-Request-ID')
            status_url = response.headers.get('X-Status-URL')
            if request_id:
                print(f" β X-Request-ID: {request_id}")
            else:
                print(" β X-Request-ID: NOT SET")
            if status_url:
                print(f" β X-Status-URL: {status_url}")
            else:
                print(" β X-Status-URL: NOT SET")

            # Verify ZIP file (this downloads the whole body)
            zip_data = response.content

        zip_size_mb = len(zip_data) / (1024 * 1024)
        print(f"\nβ ZIP file size: {zip_size_mb:.2f} MB")

        # Validate ZIP structure
        try:
            _print_zip_summary(zip_data)
        except zipfile.BadZipFile as e:
            print(f"β Invalid ZIP file: {e}")
            return False

        # Continuous polling if we have request_id
        outcome = None
        if request_id:
            print("\n" + "=" * 80)
            print("CONTINUOUS STATUS POLLING")
            print("=" * 80)
            print(f"Request ID: {request_id}")
            print(f"Polling every {POLL_INTERVAL} seconds...\n")
            outcome = _poll_job_status(request_id)
            if outcome in ("failed", "error"):
                return False

        print("\n" + "=" * 80)
        print("β TEST COMPLETED SUCCESSFULLY")
        print("=" * 80)
        print(f"β ZIP received in {elapsed_time:.1f} seconds")
        print(f"β ZIP size: {zip_size_mb:.2f} MB")
        print("β Dataset structure validated")
        # Only claim the upload was tracked when the job was seen to complete.
        if outcome == "completed":
            print("β Google Drive upload tracked")
        return True
    except requests.exceptions.Timeout:
        print("β Request timed out")
        return False
    except Exception as e:
        print(f"β Test failed: {e}")
        import traceback
        traceback.print_exc()
        return False
def main():
    """Run the health check and the sync-endpoint test, then exit.

    The process exits with status 1 when the API health check fails or the
    endpoint test returns a falsy result, and with status 0 otherwise.
    """
    rule = "=" * 80

    print("\n" + rule)
    print(" " * 15 + "SYNC PDF API TEST - FULL FEATURE SET")
    print(rule)
    print(f"Base URL: {BASE_URL}")
    print(rule)
    print()

    # Step 1: Health check -- nothing else can run without a reachable API.
    api_reachable = test_health()
    if not api_reachable:
        print("\nβ API is not accessible. Make sure the server is running.")
        print(f" Expected URL: {BASE_URL}")
        sys.exit(1)

    # Step 2: Test sync endpoint
    passed = test_sync_endpoint()

    # Summary
    print("\n" + rule)
    print(" " * 30 + "SUMMARY")
    print(rule)
    if not passed:
        print("β TEST FAILED")
    else:
        report_lines = (
            "β ALL TESTS PASSED!",
            "\nFeatures tested:",
            " β Handwriting insertion",
            " β Visual elements (5 types)",
            " β OCR processing",
            " β Ground truth verification",
            " β Analysis & debug visualization",
            " β Dataset export",
            " β Google Drive upload",
            " β Continuous status polling",
        )
        for report_line in report_lines:
            print(report_line)
    print(rule)

    exit_code = 0 if passed else 1
    sys.exit(exit_code)
| if __name__ == "__main__": | |
| main() | |