"""
LangChain-Compatible Tools for SPARKNET

All tools follow LangChain's tool interface for seamless integration
with LangGraph agents and workflows.
"""

import json
from typing import Optional, List, Dict, Any
from xml.sax.saxutils import escape as _xml_escape

from pydantic import BaseModel, Field
from langchain_core.tools import StructuredTool, tool
from loguru import logger

# PDF processing
try:
    import PyPDF2
    import fitz  # pymupdf
    PDF_AVAILABLE = True
except ImportError:
    PDF_AVAILABLE = False
    logger.warning("PDF libraries not available. Install PyPDF2 and pymupdf.")

# Document generation
try:
    from reportlab.lib.pagesizes import letter
    from reportlab.platypus import SimpleDocTemplate, Paragraph, Spacer
    from reportlab.lib.styles import getSampleStyleSheet
    REPORTLAB_AVAILABLE = True
except ImportError:
    REPORTLAB_AVAILABLE = False
    logger.warning("ReportLab not available. Install reportlab for PDF generation.")

# Web search and research
try:
    from duckduckgo_search import DDGS
    DDGS_AVAILABLE = True
except ImportError:
    DDGS_AVAILABLE = False
    logger.warning("DuckDuckGo search not available.")

try:
    import wikipedia
    WIKIPEDIA_AVAILABLE = True
except ImportError:
    WIKIPEDIA_AVAILABLE = False
    logger.warning("Wikipedia not available.")

try:
    import arxiv
    ARXIV_AVAILABLE = True
except ImportError:
    ARXIV_AVAILABLE = False
    logger.warning("Arxiv not available.")

# GPU monitoring
from ..utils.gpu_manager import get_gpu_manager


# ============================================================================
# Pydantic Input Schemas
# ============================================================================

class PDFExtractorInput(BaseModel):
    """Input schema for PDF extraction."""
    file_path: str = Field(..., description="Path to the PDF file")
    page_range: Optional[str] = Field(None, description="Page range (e.g., '1-5', 'all')")
    extract_metadata: bool = Field(True, description="Extract PDF metadata")


class PatentParserInput(BaseModel):
    """Input schema for patent parsing."""
    text: str = Field(..., description="Patent text to parse")
    extract_claims: bool = Field(True, description="Extract patent claims")
    extract_abstract: bool = Field(True, description="Extract abstract")
    extract_description: bool = Field(True, description="Extract description")


class WebSearchInput(BaseModel):
    """Input schema for web search."""
    query: str = Field(..., description="Search query")
    max_results: int = Field(5, description="Maximum number of results")
    region: str = Field("wt-wt", description="Search region (e.g., 'us-en', 'wt-wt')")


class WikipediaInput(BaseModel):
    """Input schema for Wikipedia lookup."""
    query: str = Field(..., description="Wikipedia search query")
    sentences: int = Field(3, description="Number of sentences to return")


class ArxivInput(BaseModel):
    """Input schema for Arxiv search."""
    query: str = Field(..., description="Search query")
    max_results: int = Field(5, description="Maximum number of results")
    sort_by: str = Field("relevance", description="Sort by: relevance, lastUpdatedDate, submittedDate")


class DocumentGeneratorInput(BaseModel):
    """Input schema for document generation."""
    output_path: str = Field(..., description="Output PDF file path")
    title: str = Field(..., description="Document title")
    content: str = Field(..., description="Document content (markdown or plain text)")
    author: Optional[str] = Field(None, description="Document author")


class GPUMonitorInput(BaseModel):
    """Input schema for GPU monitoring."""
    gpu_id: Optional[int] = Field(None, description="Specific GPU ID or None for all GPUs")


# ============================================================================
# PDF Tools
# ============================================================================

def _parse_page_range(page_range: Optional[str], total_pages: int) -> range:
    """Translate a 1-based page spec into a 0-based ``range`` of page indices.

    Args:
        page_range: ``None``, ``'all'``, a single page (``'3'``) or an
            inclusive span (``'1-5'``).
        total_pages: Number of pages in the document; the end is clamped to it.

    Returns:
        Range of 0-based page indices (possibly empty).

    Raises:
        ValueError: If the spec is not made of integers.
    """
    if not page_range or page_range.strip().lower() == 'all':
        return range(total_pages)

    spec = page_range.strip()
    if '-' in spec:
        start_text, end_text = spec.split('-', 1)
    else:
        # A bare page number selects just that page.
        start_text = end_text = spec

    first_page = max(int(start_text), 1)  # never produce negative indices
    last_page = min(int(end_text), total_pages)
    return range(first_page - 1, last_page)


def pdf_extractor_func(file_path: str, page_range: Optional[str] = None,
                       extract_metadata: bool = True) -> str:
    """
    Extract text and metadata from PDF files.

    The document is opened with PyMuPDF (fitz) and is always closed again,
    including when extraction fails part-way.

    Args:
        file_path: Path to PDF file
        page_range: Page range like '1-5', a single page like '3',
            or 'all' (default: all)
        extract_metadata: Whether to extract metadata

    Returns:
        Extracted text and metadata as formatted string, or an
        "Error ..." string if the libraries are missing or extraction fails.
    """
    if not PDF_AVAILABLE:
        return "Error: PDF libraries not installed. Run: pip install PyPDF2 pymupdf"

    try:
        # Open PDF with PyMuPDF (better text extraction)
        doc = fitz.open(file_path)
        try:
            total_pages = len(doc)
            pages = _parse_page_range(page_range, total_pages)

            # Extract text, one labelled chunk per page
            text_parts = [
                f"--- Page {page_num + 1} ---\n{doc[page_num].get_text()}"
                for page_num in pages
            ]
            extracted_text = "\n\n".join(text_parts)

            report = [
                f"PDF: {file_path}\n",
                f"Total Pages: {total_pages}\n",
                f"Extracted Pages: {len(pages)}\n\n",
            ]

            if extract_metadata:
                report.append("Metadata:\n")
                # Only non-empty metadata entries are listed.
                for key, value in (doc.metadata or {}).items():
                    if value:
                        report.append(f" {key}: {value}\n")
                report.append("\n")

            report.append("=" * 80 + "\n")
            report.append("EXTRACTED TEXT:\n")
            report.append("=" * 80 + "\n")
            report.append(extracted_text)
        finally:
            # Release the file handle even if page access raised.
            doc.close()

        logger.info(f"Extracted {len(pages)} pages from {file_path}")
        return "".join(report)

    except Exception as e:
        # Tool boundary: report the failure as text instead of raising.
        logger.error(f"PDF extraction failed: {e}")
        return f"Error extracting PDF: {str(e)}"


def patent_parser_func(text: str, extract_claims: bool = True,
                       extract_abstract: bool = True,
                       extract_description: bool = True) -> str:
    """
    Parse patent document structure and extract key sections.

    Uses line-based heuristics: a short line mentioning 'abstract', 'claim',
    'description' or 'drawing'/'figure' is treated as a section header and
    switches the current section. Within the claims section only lines that
    start with a digit or contain 'wherein' are kept as claims.

    Args:
        text: Patent text (from PDF or plain text)
        extract_claims: Extract patent claims
        extract_abstract: Extract abstract
        extract_description: Extract detailed description

    Returns:
        Human-readable summary followed by the structured result as JSON,
        or an "Error ..." string if parsing fails.
    """
    try:
        result = {
            "abstract": "",
            "claims": [],
            "description": "",
            "metadata": {}
        }

        lines = text.split('\n')
        current_section = None

        # Simple heuristic-based parser
        for line in lines:
            stripped = line.strip()
            line_lower = stripped.lower()

            # A numbered line inside the claims section is claim text (e.g.
            # "2. The device of claim 1, wherein ..."), not a header, even
            # though it mentions "claim" or "figure".
            is_numbered_claim = (
                current_section == 'claims'
                and bool(stripped)
                and stripped[0].isdigit()
            )

            # Detect sections
            if not is_numbered_claim:
                if 'abstract' in line_lower and len(line_lower) < 50:
                    current_section = 'abstract'
                    continue
                if 'claim' in line_lower and len(line_lower) < 50:
                    current_section = 'claims'
                    continue
                if 'description' in line_lower and len(line_lower) < 100:
                    current_section = 'description'
                    continue
                if ('drawing' in line_lower or 'figure' in line_lower) and len(line_lower) < 50:
                    current_section = 'drawings'
                    continue

            if not stripped:
                continue

            # Extract content based on section
            if current_section == 'abstract' and extract_abstract:
                result['abstract'] += line + "\n"
            elif current_section == 'claims' and extract_claims:
                if stripped[0].isdigit() or 'wherein' in line_lower:
                    result['claims'].append(stripped)
            elif current_section == 'description' and extract_description:
                result['description'] += line + "\n"

        # Extract patent number if present
        for line in lines[:20]:  # Check first 20 lines
            if 'patent' in line.lower() and any(char.isdigit() for char in line):
                result['metadata']['patent_number'] = line.strip()
                break

        # Format output
        output = "PATENT ANALYSIS\n"
        output += "=" * 80 + "\n\n"

        if result['abstract']:
            output += "ABSTRACT:\n"
            output += result['abstract'].strip()[:500]  # Limit length
            output += "\n\n"

        if result['claims']:
            output += f"CLAIMS ({len(result['claims'])} found):\n"
            for i, claim in enumerate(result['claims'][:10], 1):  # First 10 claims
                output += f"\n{i}. {claim}\n"
            output += "\n"

        if result['description']:
            output += "DESCRIPTION (excerpt):\n"
            output += result['description'].strip()[:1000]  # First 1000 chars
            output += "\n\n"

        output += "=" * 80 + "\n"
        output += f"JSON OUTPUT:\n{json.dumps(result, indent=2)}"

        logger.info(f"Parsed patent: {len(result['claims'])} claims extracted")
        return output

    except Exception as e:
        logger.error(f"Patent parsing failed: {e}")
        return f"Error parsing patent: {str(e)}"


# ============================================================================
# Web Search & Research Tools
# ============================================================================

def web_search_func(query: str, max_results: int = 5, region: str = "wt-wt") -> str:
    """
    Search the web using DuckDuckGo.

    Returns top results with title, snippet, and URL.

    Args:
        query: Search query
        max_results: Maximum number of results
        region: Search region code

    Returns:
        Formatted search results, a "No results" message, or an
        "Error ..." string if the library is missing or the search fails.
    """
    if not DDGS_AVAILABLE:
        return "Error: DuckDuckGo search not installed. Run: pip install duckduckgo-search"

    try:
        ddgs = DDGS()
        results = list(ddgs.text(query, region=region, max_results=max_results))

        if not results:
            return f"No results found for: {query}"

        output = f"WEB SEARCH RESULTS: {query}\n"
        output += "=" * 80 + "\n\n"

        for i, result in enumerate(results, 1):
            output += f"{i}. {result.get('title', 'No title')}\n"
            output += f" {result.get('body', 'No description')}\n"
            output += f" URL: {result.get('href', 'No URL')}\n\n"

        logger.info(f"Web search completed: {len(results)} results for '{query}'")
        return output

    except Exception as e:
        logger.error(f"Web search failed: {e}")
        return f"Error performing web search: {str(e)}"


def wikipedia_func(query: str, sentences: int = 3) -> str:
    """
    Search Wikipedia and return summary.

    The first search hit is used; disambiguation pages and missing pages
    are reported as plain messages rather than raised.

    Args:
        query: Wikipedia search query
        sentences: Number of sentences to return

    Returns:
        Wikipedia summary with URL and up to five categories, or a
        message/"Error ..." string when the lookup cannot be completed.
    """
    if not WIKIPEDIA_AVAILABLE:
        return "Error: Wikipedia not installed. Run: pip install wikipedia"

    try:
        # Search for page
        search_results = wikipedia.search(query)

        if not search_results:
            return f"No Wikipedia page found for: {query}"

        # Get first result
        page = wikipedia.page(search_results[0], auto_suggest=False)

        # Get summary
        summary = wikipedia.summary(search_results[0], sentences=sentences, auto_suggest=False)

        output = f"WIKIPEDIA: {page.title}\n"
        output += "=" * 80 + "\n\n"
        output += summary + "\n\n"
        output += f"URL: {page.url}\n"
        output += f"Categories: {', '.join(page.categories[:5])}\n"

        logger.info(f"Wikipedia lookup completed: {page.title}")
        return output

    except wikipedia.exceptions.DisambiguationError as e:
        options = ', '.join(e.options[:5])
        return f"Disambiguation needed for '{query}'. Options: {options}"
    except wikipedia.exceptions.PageError:
        return f"No Wikipedia page found for: {query}"
    except Exception as e:
        logger.error(f"Wikipedia lookup failed: {e}")
        return f"Error: {str(e)}"


def arxiv_func(query: str, max_results: int = 5, sort_by: str = "relevance") -> str:
    """
    Search Arxiv for academic papers.

    Args:
        query: Search query
        max_results: Maximum number of results
        sort_by: Sort by relevance, lastUpdatedDate, or submittedDate;
            unknown values fall back to relevance

    Returns:
        Formatted Arxiv results, a "No Arxiv papers" message, or an
        "Error ..." string if the library is missing or the search fails.
    """
    if not ARXIV_AVAILABLE:
        return "Error: Arxiv not installed. Run: pip install arxiv"

    try:
        # Map sort_by to arxiv.SortCriterion
        sort_map = {
            "relevance": arxiv.SortCriterion.Relevance,
            "lastUpdatedDate": arxiv.SortCriterion.LastUpdatedDate,
            "submittedDate": arxiv.SortCriterion.SubmittedDate,
        }

        sort_criterion = sort_map.get(sort_by, arxiv.SortCriterion.Relevance)

        # Search Arxiv
        search = arxiv.Search(
            query=query,
            max_results=max_results,
            sort_by=sort_criterion
        )

        # Search.results() is deprecated in current arxiv releases in favour
        # of Client.results(search); keep the old call only as a fallback.
        if hasattr(arxiv, "Client"):
            results = list(arxiv.Client().results(search))
        else:
            results = list(search.results())

        if not results:
            return f"No Arxiv papers found for: {query}"

        output = f"ARXIV SEARCH: {query}\n"
        output += "=" * 80 + "\n\n"

        for i, paper in enumerate(results, 1):
            output += f"{i}. {paper.title}\n"
            output += f" Authors: {', '.join(str(author) for author in paper.authors[:3])}\n"
            output += f" Published: {paper.published.strftime('%Y-%m-%d')}\n"
            output += f" Summary: {paper.summary[:200]}...\n"
            output += f" PDF: {paper.pdf_url}\n"
            output += f" Categories: {', '.join(paper.categories)}\n\n"

        logger.info(f"Arxiv search completed: {len(results)} papers for '{query}'")
        return output

    except Exception as e:
        logger.error(f"Arxiv search failed: {e}")
        return f"Error searching Arxiv: {str(e)}"


# ============================================================================
# Document Generation
# ============================================================================

def document_generator_func(output_path: str, title: str, content: str,
                            author: Optional[str] = None) -> str:
    """
    Generate PDF document from text content.

    Content is split on blank lines; a block starting with '#' becomes a
    heading, everything else a normal paragraph. All text is XML-escaped
    before being handed to ReportLab, because ``Paragraph`` interprets
    '<', '>' and '&' as markup and plain text containing them would
    otherwise break or distort the document.

    Args:
        output_path: Output PDF file path
        title: Document title
        content: Document content (plain text or simple markdown)
        author: Optional author name

    Returns:
        Success message with file path, title and number of rendered
        paragraphs, or an "Error ..." string if generation fails.
    """
    if not REPORTLAB_AVAILABLE:
        return "Error: ReportLab not installed. Run: pip install reportlab"

    try:
        # Create PDF
        doc = SimpleDocTemplate(output_path, pagesize=letter)
        styles = getSampleStyleSheet()
        story = []

        # Title
        story.append(Paragraph(_xml_escape(title), styles['Title']))
        story.append(Spacer(1, 12))

        # Author
        if author:
            story.append(Paragraph(f"By: {_xml_escape(author)}", styles['Normal']))
            story.append(Spacer(1, 12))

        # Content (split into paragraphs), skipping blank blocks
        blocks = [block.strip() for block in content.split('\n\n')]
        blocks = [block for block in blocks if block]

        for block in blocks:
            # Simple markdown-like processing
            if block.startswith('#'):
                # Heading
                heading_text = block.lstrip('#').strip()
                story.append(Paragraph(_xml_escape(heading_text), styles['Heading2']))
            else:
                # Regular paragraph
                story.append(Paragraph(_xml_escape(block), styles['Normal']))
            story.append(Spacer(1, 6))

        # Build PDF
        doc.build(story)

        logger.info(f"Generated PDF: {output_path}")
        # The count is of rendered text blocks, not PDF pages.
        return f"Successfully generated PDF: {output_path}\nTitle: {title}\nParagraphs: {len(blocks)}"

    except Exception as e:
        logger.error(f"PDF generation failed: {e}")
        return f"Error generating PDF: {str(e)}"


# ============================================================================
# GPU Monitoring (converted from existing tool)
# ============================================================================

def gpu_monitor_func(gpu_id: Optional[int] = None) -> str:
    """
    Monitor GPU status, memory usage, and utilization.

    Args:
        gpu_id: Specific GPU ID or None for all GPUs

    Returns:
        Formatted GPU status information. For a single GPU the memory
        figures from the GPU manager are divided by 1024**3 and shown as GB;
        for all GPUs the manager's own ``monitor()`` output is returned.
        Failures are reported as an "Error ..." string.
    """
    try:
        gpu_manager = get_gpu_manager()

        if gpu_id is None:
            # Monitor all GPUs
            return gpu_manager.monitor()

        # Monitor specific GPU
        info = gpu_manager.get_gpu_info(gpu_id)

        if "error" in info:
            return f"Error: {info['error']}"

        output = f"GPU {info['gpu_id']}: {info['name']}\n"
        output += f"Memory: {info['memory_used'] / 1024**3:.2f} GB / {info['memory_total'] / 1024**3:.2f} GB "
        output += f"({info['memory_percent']:.1f}% used)\n"
        output += f"Free Memory: {info['memory_free'] / 1024**3:.2f} GB\n"
        output += f"GPU Utilization: {info['gpu_utilization']}%\n"
        output += f"Temperature: {info['temperature']}°C\n"
        return output

    except Exception as e:
        logger.error(f"GPU monitoring error: {e}")
        return f"Error monitoring GPU: {str(e)}"


# ============================================================================
# Create LangChain Tools
# ============================================================================

# Use StructuredTool for tools with Pydantic input schemas

pdf_extractor_tool = StructuredTool.from_function(
    func=pdf_extractor_func,
    name="pdf_extractor",
    description=(
        "Extract text and metadata from PDF files. "
        "Useful for analyzing patent documents, research papers, and legal documents. "
        "Supports page range selection and metadata extraction."
    ),
    args_schema=PDFExtractorInput,
    return_direct=False,
)

patent_parser_tool = StructuredTool.from_function(
    func=patent_parser_func,
    name="patent_parser",
    description=(
        "Parse patent document structure and extract key sections: abstract, claims, description. "
        "Useful for analyzing patent documents and identifying key innovations."
    ),
    args_schema=PatentParserInput,
    return_direct=False,
)

web_search_tool = StructuredTool.from_function(
    func=web_search_func,
    name="web_search",
    description=(
        "Search the web using DuckDuckGo. Returns top results with titles, snippets, and URLs. "
        "Useful for market research, competitor analysis, and finding relevant information."
    ),
    args_schema=WebSearchInput,
    return_direct=False,
)

wikipedia_tool = StructuredTool.from_function(
    func=wikipedia_func,
    name="wikipedia",
    description=(
        "Search Wikipedia and get article summaries. "
        "Useful for background information on technologies, companies, and concepts."
    ),
    args_schema=WikipediaInput,
    return_direct=False,
)

arxiv_tool = StructuredTool.from_function(
    func=arxiv_func,
    name="arxiv_search",
    description=(
        "Search Arxiv for academic papers and preprints. "
        "Useful for finding relevant research, state-of-the-art methods, and technical background."
    ),
    args_schema=ArxivInput,
    return_direct=False,
)

document_generator_tool = StructuredTool.from_function(
    func=document_generator_func,
    name="document_generator",
    description=(
        "Generate PDF documents from text content. "
        "Useful for creating reports, briefs, and documentation."
    ),
    args_schema=DocumentGeneratorInput,
    return_direct=False,
)

gpu_monitor_tool = StructuredTool.from_function(
    func=gpu_monitor_func,
    name="gpu_monitor",
    description=(
        "Monitor GPU status including memory usage, utilization, and temperature. "
        "Useful for checking GPU availability before running models."
    ),
    args_schema=GPUMonitorInput,
    return_direct=False,
)


# ============================================================================
# Tool Registry for VISTA Scenarios
# ============================================================================

class VISTAToolRegistry:
    """
    Registry of tools organized by VISTA scenario.

    Enables scenario-specific tool selection. Accessors hand out copies of
    the registered lists so callers can extend or filter their tool list
    without altering the registry itself.
    """

    # Scenario name -> tools offered to agents running that scenario.
    SCENARIO_TOOLS = {
        "patent_wakeup": [
            pdf_extractor_tool,
            patent_parser_tool,
            web_search_tool,
            wikipedia_tool,
            arxiv_tool,
            document_generator_tool,
        ],
        "agreement_safety": [
            pdf_extractor_tool,
            web_search_tool,
            document_generator_tool,
        ],
        "partner_matching": [
            web_search_tool,
            wikipedia_tool,
            arxiv_tool,
        ],
        "general": [
            pdf_extractor_tool,
            patent_parser_tool,
            web_search_tool,
            wikipedia_tool,
            arxiv_tool,
            document_generator_tool,
            gpu_monitor_tool,
        ],
    }

    @classmethod
    def get_tools(cls, scenario: str = "general") -> List[StructuredTool]:
        """
        Get tools for a specific VISTA scenario.

        Args:
            scenario: VISTA scenario type; unknown names fall back to
                the "general" tool set (a warning is logged)

        Returns:
            New list of LangChain tools for the scenario
        """
        if scenario not in cls.SCENARIO_TOOLS:
            logger.warning(f"Unknown scenario '{scenario}', falling back to 'general'")
        tools = cls.SCENARIO_TOOLS.get(scenario, cls.SCENARIO_TOOLS["general"])
        logger.info(f"Retrieved {len(tools)} tools for scenario: {scenario}")
        # Copy so that caller-side mutation cannot corrupt the shared registry.
        return list(tools)

    @classmethod
    def get_all_tools(cls) -> List[StructuredTool]:
        """Get all available tools (a new list holding the "general" set)."""
        return list(cls.SCENARIO_TOOLS["general"])

    @classmethod
    def list_scenarios(cls) -> List[str]:
        """List available scenarios."""
        return list(cls.SCENARIO_TOOLS.keys())


# ============================================================================
# Convenience Functions
# ============================================================================

def get_vista_tools(scenario: str = "general") -> List[StructuredTool]:
    """
    Get LangChain tools for a VISTA scenario.

    Args:
        scenario: Scenario name (patent_wakeup, agreement_safety,
            partner_matching, general)

    Returns:
        List of LangChain StructuredTool instances
    """
    return VISTAToolRegistry.get_tools(scenario)


def get_all_tools() -> List[StructuredTool]:
    """Get all available LangChain tools."""
    return VISTAToolRegistry.get_all_tools()


# Export all tools
__all__ = [
    "pdf_extractor_tool",
    "patent_parser_tool",
    "web_search_tool",
    "wikipedia_tool",
    "arxiv_tool",
    "document_generator_tool",
    "gpu_monitor_tool",
    "VISTAToolRegistry",
    "get_vista_tools",
    "get_all_tools",
]