| | |
| | import os |
| | import asyncio |
| | import tempfile |
| | import hashlib |
| | import json |
| | import time |
| | from pathlib import Path |
| | import pdfplumber |
| | import numpy as np |
| | from uuid import uuid4 |
| | import openai |
| | import shutil |
| | from typing import List, Dict, Any, Optional |
| |
|
| | |
| | |
| | |
# Fail fast at import time: the rest of this module is useless without an
# OpenAI credential, so surface the misconfiguration immediately.
OPENAI_KEY = os.environ.get("OPENAI_API_KEY")
if OPENAI_KEY is None:
    raise RuntimeError("Set OPENAI_API_KEY environment variable before running.")

# Configure the global openai client with the key (module-level side effect).
openai.api_key = OPENAI_KEY
| |
|
| |
|
def uuid4_hex():
    """Return a random UUID4 as a 32-character lowercase hex string.

    Fix: drop the redundant function-scope ``from uuid import uuid4`` —
    the name is already imported at module level.
    """
    return uuid4().hex
| |
|
| | |
| | |
| | |
async def call_openai_chat(model: str, messages: list, temperature=0.2, max_tokens=800):
    """Run a blocking OpenAI (>=1.0.0) chat-completion request off the event loop.

    The synchronous SDK call is executed in a worker thread via
    ``asyncio.to_thread`` so the event loop stays responsive; the stripped
    text of the first choice is returned.
    """
    def _sync_request():
        response = openai.chat.completions.create(
            model=model,
            messages=messages,
            temperature=temperature,
            max_tokens=max_tokens,
        )
        return response.choices[0].message.content.strip()

    return await asyncio.to_thread(_sync_request)
| |
|
| | |
| | |
| | |
def load_pdf_text(path: str) -> str:
    """Extract text, tables, image info and page metadata from a PDF.

    Each page contributes a block of labelled sections (TEXT / TABLES /
    IMAGES / METADATA); page blocks are joined with blank lines.
    """
    pages_out = []
    with pdfplumber.open(path) as pdf:
        for idx, pg in enumerate(pdf.pages, 1):
            parts = []

            # Plain text layer of the page, if any.
            txt = pg.extract_text()
            if txt:
                parts.append(f"=== PAGE {idx} TEXT ===")
                parts.append(txt)

            # Tables rendered as pipe-separated rows.
            tbls = pg.extract_tables()
            if tbls:
                parts.append(f"\n=== PAGE {idx} TABLES ===")
                for t_idx, tbl in enumerate(tbls, 1):
                    parts.append(f"\n--- TABLE {t_idx} ---")
                    for row in tbl:
                        if row:
                            parts.append(" | ".join(cell.strip() if cell else "" for cell in row))

            # Images are described by their dimensions only (no pixel data).
            imgs = pg.images
            if imgs:
                parts.append(f"\n=== PAGE {idx} IMAGES ===")
                for i_idx, im in enumerate(imgs, 1):
                    parts.append(f"Image {i_idx}: {im.get('width', 'unknown')}x{im.get('height', 'unknown')} pixels")

            # Always emit basic page geometry.
            parts.append(f"\n=== PAGE {idx} METADATA ===")
            parts.append(f"Page size: {pg.width}x{pg.height}")
            parts.append(f"Rotation: {pg.rotation}")

            if parts:
                pages_out.append("\n".join(parts))

    return "\n\n".join(pages_out)
| |
|
def save_text_as_file(text: str, suffix=".txt") -> str:
    """Write *text* (UTF-8) to a uniquely-named temp file; return its path."""
    target = Path(tempfile.gettempdir(), f"analysis_{uuid4().hex}{suffix}")
    target.write_text(text, encoding="utf-8")
    return str(target)
| |
|
def save_uploaded_file(uploaded) -> str:
    """Copy an uploaded file-like object into a unique temp ``.pdf`` file.

    Returns the destination path as a string.
    """
    destination = Path(tempfile.gettempdir()) / f"upload_{uuid4().hex}.pdf"
    with destination.open("wb") as sink:
        shutil.copyfileobj(uploaded, sink)
    return str(destination)
| |
|
| | |
| | |
| | |
def chunk_text(text: str, chunk_size: int = 15000, overlap: int = 1000) -> List[str]:
    """Split text into overlapping chunks for processing large documents.

    Chunks are at most ``chunk_size`` characters; each next chunk restarts
    ``overlap`` characters before the previous end so context carries over.
    When possible, a chunk ends at a sentence boundary ('.') found within
    the last 200 characters of the window.

    Fix: the original advanced with ``start = end - overlap``, which does
    not move forward when ``overlap >= chunk_size`` (or when the sentence
    adjustment shrinks ``end`` to within ``overlap`` of ``start``),
    causing an infinite loop. We clamp the overlap and guarantee at least
    one character of progress per iteration.
    """
    if len(text) <= chunk_size:
        return [text]

    # Guarantee forward progress even with pathological parameters.
    overlap = min(overlap, chunk_size - 1)

    chunks = []
    start = 0

    while start < len(text):
        end = start + chunk_size

        # Prefer ending on a sentence boundary near the window's edge.
        if end < len(text):
            search_start = max(start, end - 200)
            sentence_end = text.rfind('.', search_start, end)
            if sentence_end > search_start:
                end = sentence_end + 1

        chunk = text[start:end].strip()
        if chunk:
            chunks.append(chunk)

        # Step forward, always by at least one character.
        start = max(end - overlap, start + 1)
        if start >= len(text):
            break

    return chunks
| |
|
| |
|
def get_file_hash(file_path: str) -> str:
    """Return the MD5 hex digest of the file's contents (cache key)."""
    digest = hashlib.md5()
    with open(file_path, 'rb') as fh:
        digest.update(fh.read())
    return digest.hexdigest()
| |
|
| | |
| | |
| | |
def estimate_tokens(text: str) -> int:
    """Crude token estimate: roughly one token per four English characters."""
    approx_tokens, _ = divmod(len(text), 4)
    return approx_tokens
| |
|
def is_within_token_limit(text: str, max_tokens: int = 6000) -> bool:
    """Return True when *text*'s estimated token count fits in *max_tokens*."""
    used = estimate_tokens(text)
    return used <= max_tokens
| |
|
def truncate_to_token_limit(text: str, max_tokens: int = 6000) -> str:
    """Clip *text* to the token budget, appending a truncation notice.

    Text already within the limit is returned unchanged.
    """
    if not is_within_token_limit(text, max_tokens):
        # Mirror the 4-chars-per-token estimate used by estimate_tokens().
        cutoff = max_tokens * 4
        return text[:cutoff] + "\n\n[Content truncated due to length...]"
    return text
| |
|
| | |
| | |
| | |
async def create_hierarchical_summary(chunk_results: List[str], prompt: str, model: str, max_tokens: int = 6000) -> str:
    """Create a summary using hierarchical approach to avoid token limits.

    Stage 1 condenses the per-chunk analyses in groups of ``group_size``;
    stage 2 fuses those intermediate summaries into one final answer.
    API failures are reported inline as "Error: ..." text rather than
    raised, so one bad call does not lose the other summaries.
    """

    # --- Stage 1: group-level intermediate summaries ---------------------
    intermediate_summaries = []
    group_size = 3  # chunk analyses folded into each intermediate summary

    for i in range(0, len(chunk_results), group_size):
        group = chunk_results[i:i + group_size]
        group_text = "\n\n".join(group)

        # Defensive truncation: the joined group may still exceed the budget.
        if not is_within_token_limit(group_text, max_tokens):
            group_text = truncate_to_token_limit(group_text, max_tokens)

        group_prompt = f"Summarize the following chunk analyses, focusing on key insights and findings:\n\n{group_text}"

        try:
            summary = await call_openai_chat(
                model=model,
                messages=[
                    {"role": "system", "content": "You are an expert analyst creating sophisticated summaries. Focus on:\n- Identifying strategic opportunities and competitive advantages\n- Extracting specific, actionable insights with real-world applications\n- Highlighting unique value propositions and market implications\n- Connecting insights to broader business themes and opportunities\n- Providing concrete examples and implementation considerations"},
                    {"role": "user", "content": group_prompt}
                ],
                temperature=0.2,
                max_tokens=800
            )
            intermediate_summaries.append(f"Group {i//group_size + 1} Summary:\n{summary}")
        except Exception as e:
            # Record the failure in place of the summary; processing continues.
            intermediate_summaries.append(f"Group {i//group_size + 1} Summary:\nError: {str(e)}")

    # A single group needs no second pass.
    if len(intermediate_summaries) == 1:
        return intermediate_summaries[0]

    final_text = "\n\n".join(intermediate_summaries)

    # --- Stage 2: fuse intermediate summaries into the final answer ------
    if not is_within_token_limit(final_text, max_tokens):
        final_text = truncate_to_token_limit(final_text, max_tokens)

    final_prompt = f"Create a comprehensive final summary based on the following intermediate summaries. Original prompt: {prompt}\n\n{final_text}"

    try:
        final_summary = await call_openai_chat(
            model=model,
            messages=[
                {"role": "system", "content": "You are a strategic business analyst creating comprehensive, actionable insights. Your final summary should:\n- Synthesize insights into a coherent strategic narrative\n- Prioritize opportunities by potential impact and feasibility\n- Provide specific, actionable recommendations with clear next steps\n- Include quantifiable insights where possible (market size, ROI, timelines)\n- Address implementation challenges and mitigation strategies\n- Connect all insights to create a unified strategic vision\n- Focus on what matters most for business success"},
                {"role": "user", "content": final_prompt}
            ],
            temperature=0.2,
            max_tokens=1000
        )
        return final_summary
    except Exception as e:
        # Fall back to the raw intermediate summaries so the caller gets something.
        return f"Error creating final summary: {str(e)}\n\nIntermediate summaries:\n{final_text}"
| |
|
| | |
| | |
| | |
# On-disk cache directory under the system temp dir, shared by all of the
# cache helper functions below. Created eagerly at import time.
CACHE_DIR = Path(tempfile.gettempdir()) / "pdf_analysis_cache"
CACHE_DIR.mkdir(exist_ok=True)
| |
|
def get_cached_analysis(file_path: str, prompt: str) -> Optional[Dict[str, Any]]:
    """Return the cached analysis for this exact (file, prompt) pair, or None.

    Entries expire after 24 hours; unreadable or malformed cache files are
    silently treated as misses.
    """
    file_hash = get_file_hash(file_path)
    prompt_hash = hashlib.md5(prompt.encode()).hexdigest()
    cache_path = CACHE_DIR / f"{file_hash}_{prompt_hash}.json"

    if cache_path.exists():
        try:
            with open(cache_path, 'r', encoding='utf-8') as fh:
                data = json.load(fh)
            still_fresh = time.time() - data.get('cached_at', 0) < 86400
            if (still_fresh
                    and data.get('file_hash') == file_hash
                    and data.get('prompt_hash') == prompt_hash):
                return data.get('analysis')
        except Exception:
            # Best-effort cache: any read/parse problem is just a miss.
            pass
    return None
| |
|
def get_cached_document_content(file_path: str) -> Optional[str]:
    """Return prompt-independent cached document content for a file, or None.

    Matches on the file hash only; entries expire after 24 hours. Any
    read/parse failure is treated as a cache miss.
    """
    file_hash = get_file_hash(file_path)
    cache_path = CACHE_DIR / f"{file_hash}_content.json"

    if cache_path.exists():
        try:
            with open(cache_path, 'r', encoding='utf-8') as fh:
                data = json.load(fh)
            still_fresh = time.time() - data.get('cached_at', 0) < 86400
            if still_fresh and data.get('file_hash') == file_hash:
                return data.get('content')
        except Exception:
            # Best-effort cache: fall through to a miss.
            pass
    return None
| |
|
def cache_analysis(file_path: str, prompt: str, analysis: Dict[str, Any]) -> None:
    """Persist an analysis result keyed by (file hash, prompt hash).

    Best-effort: write failures are swallowed so caching never breaks
    the analysis pipeline.
    """
    file_hash = get_file_hash(file_path)
    prompt_hash = hashlib.md5(prompt.encode()).hexdigest()
    cache_path = CACHE_DIR / f"{file_hash}_{prompt_hash}.json"

    payload = {
        'file_hash': file_hash,
        'prompt_hash': prompt_hash,
        'analysis': analysis,
        'cached_at': time.time()
    }
    try:
        with open(cache_path, 'w', encoding='utf-8') as fh:
            json.dump(payload, fh, ensure_ascii=False)
    except Exception:
        # Caching is optional; ignore disk/serialization errors.
        pass
| |
|
def cache_document_content(file_path: str, content: str) -> None:
    """Persist prompt-independent document content keyed by the file hash.

    Best-effort: failures to write are silently ignored.
    """
    file_hash = get_file_hash(file_path)
    cache_path = CACHE_DIR / f"{file_hash}_content.json"

    payload = {
        'file_hash': file_hash,
        'content': content,
        'cached_at': time.time()
    }
    try:
        with open(cache_path, 'w', encoding='utf-8') as fh:
            json.dump(payload, fh, ensure_ascii=False)
    except Exception:
        # Caching is optional; ignore disk/serialization errors.
        pass
| |
|
def get_cached_text(file_path: str) -> Optional[str]:
    """Return previously cached extracted PDF text for this file, or None.

    NOTE(review): unlike the analysis/content caches, this one performs no
    24-hour expiry check — the entry is keyed by the file-content hash, so
    the stored 'cached_at' field is written but never consulted here.
    """
    file_hash = get_file_hash(file_path)
    cache_path = CACHE_DIR / f"{file_hash}_text.json"

    if cache_path.exists():
        try:
            with open(cache_path, 'r', encoding='utf-8') as fh:
                data = json.load(fh)
            if data.get('file_hash') == file_hash:
                return data.get('text')
        except Exception:
            # Unreadable cache file counts as a miss.
            pass
    return None
| |
|
def cache_text(file_path: str, text: str) -> None:
    """Persist extracted PDF text keyed by the file-content hash.

    Best-effort: write failures are silently ignored.
    """
    file_hash = get_file_hash(file_path)
    cache_path = CACHE_DIR / f"{file_hash}_text.json"

    payload = {
        'file_hash': file_hash,
        'text': text,
        'cached_at': time.time()
    }
    try:
        with open(cache_path, 'w', encoding='utf-8') as fh:
            json.dump(payload, fh, ensure_ascii=False)
    except Exception:
        # Caching is optional; ignore disk/serialization errors.
        pass
| |
|
def load_pdf_text_cached(path: str) -> str:
    """Load PDF text, serving from the on-disk text cache when possible.

    Fix: the cache hit is now tested with ``is not None`` instead of
    truthiness — an empty string is a valid cached extraction result for a
    text-free PDF, and the old check forced a full re-extraction on every
    call in that case.
    """
    cached_text = get_cached_text(path)
    if cached_text is not None:
        return cached_text

    # Cache miss: extract now and persist for subsequent calls.
    text = load_pdf_text(path)
    cache_text(path, text)

    return text
| |
|
| | |
| | |
| | |
def load_pdf_text_chunked(path: str, chunk_size: int = 15000) -> List[str]:
    """Extract (cached) PDF text and split it into overlapping chunks."""
    full_text = load_pdf_text_cached(path)
    return chunk_text(full_text, chunk_size)
| |
|
def get_document_metadata(path: str) -> Dict[str, Any]:
    """Return page count, file size (bytes) and extraction timestamp for a PDF.

    Falls back to zeroed values when the file cannot be opened or parsed.
    """
    try:
        with pdfplumber.open(path) as pdf:
            page_total = len(pdf.pages)
        return {
            'page_count': page_total,
            'file_size': Path(path).stat().st_size,
            'extracted_at': time.time()
        }
    except Exception:
        # Unreadable/missing file: report empty metadata rather than raising.
        return {'page_count': 0, 'file_size': 0, 'extracted_at': time.time()}
| |
|