| """Simple synchronous tools for LlamaIndex ReActAgent.""" |
|
|
import copy
import hashlib
import json
import os
import re
import time
import traceback
from datetime import datetime
from typing import Optional

from llama_index.core.tools import FunctionTool
from tavily import TavilyClient
|
|
| |
# Module-level state shared by the tools in this file. It starts out with
# placeholder values that the tools overwrite as the workflow progresses.
_workflow_state: dict = {
    # Note title -> note text, filled in by record_notes().
    "research_notes": {},
    # Raw report text most recently saved by write_report().
    "report_content": "Not written yet.",
    # Reviewer feedback most recently saved by review_report().
    "review": "Review required.",
    # Report metadata dict built by write_report(); None until then.
    "structured_report": None,
}
|
|
| |
# Deduplication bookkeeping for tool calls: call hash -> timestamp at which
# the call was accepted by _should_execute_call().
_tool_call_cache: dict = {}

# Age in seconds after which a cached call is purged and may run again.
_cache_timeout: int = 30
|
|
def _generate_call_hash(tool_name: str, **kwargs) -> str:
    """Return a fingerprint that identifies a tool call for deduplication.

    The tool name and its keyword arguments are serialized to JSON with
    sorted keys, so the order in which arguments are passed does not affect
    the result. The MD5 hex digest of that JSON text is returned.
    """
    payload = json.dumps({"tool": tool_name, "args": kwargs}, sort_keys=True)
    digest = hashlib.md5(payload.encode())
    return digest.hexdigest()
|
|
def _should_execute_call(tool_name: str, **kwargs) -> bool:
    """Decide whether a tool call should run or be skipped as a duplicate.

    Entries older than ``_cache_timeout`` seconds are purged from
    ``_tool_call_cache`` first. If the hash of this call is still present
    afterwards, the call is a duplicate; otherwise it is recorded.

    Args:
        tool_name: Name of the tool being called.
        **kwargs: The arguments of the call; they are part of its identity.

    Returns:
        True if the call was not seen within the timeout window (it is now
        recorded), False if it is a duplicate.
    """
    # A monotonic clock is used for the age arithmetic: wall-clock time can
    # be adjusted while the process runs, which would expire entries early
    # or keep them alive too long.
    now = time.monotonic()
    call_hash = _generate_call_hash(tool_name, **kwargs)

    # Collect the stale keys before deleting; a dict must not change size
    # while it is being iterated.
    stale_keys = [
        key
        for key, seen_at in _tool_call_cache.items()
        if now - seen_at > _cache_timeout
    ]
    for key in stale_keys:
        del _tool_call_cache[key]

    if call_hash in _tool_call_cache:
        return False

    _tool_call_cache[call_hash] = now
    return True
|
|
def search_web(query: str) -> str:
    """Search the web for information on a given query.

    Args:
        query: The search query passed to the Tavily client. It must be a
            non-blank string.

    Returns:
        The stringified search result on success, a "Duplicate search call
        detected" notice when the same query was already accepted within
        the deduplication window, or a message starting with
        "Search failed:" when the query is blank or the search raises.
    """
    try:
        print(f"DEBUG: search_web called with query: '{query}'")

        # Reject blank input up front instead of spending an API call on it.
        if not isinstance(query, str) or not query.strip():
            raise ValueError("query must be a non-empty string")

        if not _should_execute_call("search_web", query=query):
            return f"Duplicate search call detected for query: '{query}'. Skipping to avoid redundant API calls."

        try:
            client = TavilyClient(api_key=os.getenv("TAVILY_API_KEY"))
            result = client.search(query)
        except Exception:
            # The call was registered as executed before it actually ran.
            # Forget it again so that retrying after a failure is not
            # rejected as a duplicate.
            _tool_call_cache.pop(_generate_call_hash("search_web", query=query), None)
            raise

        print(f"DEBUG: search_web executed successfully for query: '{query}'")
        return str(result)
    except Exception as e:
        error_msg = f"Search failed: {str(e)}"
        print(f"ERROR: search_web failed: {e}")
        return error_msg
|
|
def record_notes(notes: str, notes_title: str) -> str:
    """Store research notes under a title in the shared workflow state.

    Args:
        notes: The text to save.
        notes_title: Key under which the notes are stored; an existing
            entry with the same title is replaced.

    Returns:
        A confirmation including the total number of stored notes, a
        duplicate notice when the identical call is still in the
        deduplication cache, or a "Failed to record notes: ..." message if
        anything raises.
    """
    try:
        print(f"DEBUG: record_notes called with title: '{notes_title}', notes length: {len(notes)}")

        is_new_call = _should_execute_call("record_notes", notes=notes, notes_title=notes_title)
        if is_new_call:
            saved_notes = _workflow_state["research_notes"]
            saved_notes[notes_title] = notes
            total = len(saved_notes)

            print(f"DEBUG: Notes stored. Total research notes: {total}")
            return f"Notes recorded successfully with title: '{notes_title}'. Total notes: {total}"

        return f"Duplicate notes recording detected for title: '{notes_title}'. Skipping to avoid redundant recording."
    except Exception as failure:
        failure_msg = f"Failed to record notes: {str(failure)}"
        print(f"ERROR: record_notes failed: {failure}")
        return failure_msg
|
|
# Matches level 1-3 markdown headings. Only spaces/tabs may separate the "#"
# marks from the title, so an empty heading cannot swallow the following
# line, and trailing blanks or the "\r" of CRLF input stay out of the title.
_SECTION_HEADING_RE = re.compile(r"^#{1,3}[ \t]+(\S.*?)[ \t\r]*$", re.MULTILINE)


def _extract_sections(report_content: str) -> list:
    """Return the titles of all level 1-3 markdown headings, in order."""
    return _SECTION_HEADING_RE.findall(report_content)


def _extract_abstract(report_content: str) -> str:
    """Return the first non-blank line not starting with '#', stripped ("" if none)."""
    for line in report_content.split('\n'):
        if line.strip() and not line.startswith('#'):
            return line.strip()
    return ""


def write_report(report_content: str, title: str = "Research Report") -> str:
    """Write a structured report with the given content and title.

    The raw content and a metadata dict (title, abstract, content, section
    titles, word count, generation timestamp and the titles of the recorded
    research notes) are stored in the shared workflow state.

    Args:
        report_content: Full markdown text of the report.
        title: Title stored alongside the report.

    Returns:
        A confirmation with the title, word count and section count, a
        duplicate notice when the identical call is still in the
        deduplication cache, or a "Failed to write report: ..." message if
        anything raises.
    """
    try:
        print(f"DEBUG: write_report FUNCTION ENTERED with title: '{title}', content length: {len(report_content)}")
        print(f"DEBUG: Function arguments - report_content type: {type(report_content)}, title type: {type(title)}")

        if not _should_execute_call("write_report", report_content=report_content, title=title):
            print("DEBUG: Duplicate call detected, returning early")
            return "Duplicate report writing detected. Skipping to avoid redundant report generation."

        print("DEBUG: Processing report content...")

        sections = _extract_sections(report_content)
        print(f"DEBUG: Found {len(sections)} sections: {sections}")

        word_count = len(report_content.split())
        print(f"DEBUG: Word count: {word_count}")

        abstract = _extract_abstract(report_content)
        print(f"DEBUG: Abstract: {abstract[:100]}...")

        structured_report = {
            "title": title,
            # Abstracts longer than 200 characters are cut and marked with "...".
            "abstract": abstract[:200] + "..." if len(abstract) > 200 else abstract,
            "content": report_content,
            "sections": sections,
            "word_count": word_count,
            "generated_at": datetime.now().isoformat(),
            "sources_used": list(_workflow_state["research_notes"].keys()),
        }
        print("DEBUG: Structured report created")

        print("DEBUG: Storing in global state...")
        _workflow_state["report_content"] = report_content
        _workflow_state["structured_report"] = structured_report

        print(f"DEBUG: Report stored successfully. Word count: {word_count}, Sections: {len(sections)}")
        print(f"DEBUG: State keys now: {list(_workflow_state.keys())}")
        print(f"DEBUG: State report_content length: {len(_workflow_state['report_content'])}")

        result = f"Report written successfully! Title: '{title}', Word count: {word_count}, Sections: {len(sections)}"
        print(f"DEBUG: Returning result: {result}")
        return result
    except Exception as e:
        error_msg = f"Failed to write report: {str(e)}"
        print(f"ERROR: write_report failed: {e}")
        traceback.print_exc()
        return error_msg
|
|
def review_report(review: str) -> str:
    """Store reviewer feedback for the report in the shared workflow state.

    Args:
        review: The review text to save.

    Returns:
        A confirmation echoing at most the first 100 characters of the
        review (followed by "..." when it is longer), a duplicate notice
        when the identical call is still in the deduplication cache, or a
        "Failed to review report: ..." message if anything raises.
    """
    try:
        print(f"DEBUG: review_report called with review: '{review[:100]}...'")

        if _should_execute_call("review_report", review=review):
            _workflow_state["review"] = review
            print("DEBUG: Review stored successfully")

            preview = review[:100]
            suffix = "..." if len(review) > 100 else ""
            return f"Report reviewed successfully. Review: {preview}{suffix}"

        return "Duplicate review detected. Skipping to avoid redundant review submission."
    except Exception as failure:
        failure_msg = f"Failed to review report: {str(failure)}"
        print(f"ERROR: review_report failed: {failure}")
        return failure_msg
|
|
def get_workflow_state() -> dict:
    """Return an independent snapshot of the current workflow state.

    A deep copy is returned: the state holds nested containers (the
    ``research_notes`` dict and the ``structured_report`` dict), and a
    shallow copy would let callers modify the live state through them.
    """
    return copy.deepcopy(_workflow_state)
|
|
def reset_workflow_state() -> None:
    """Reset the workflow state and the tool-call deduplication cache.

    The state is rebound to a fresh dict holding the initial placeholder
    values. The deduplication cache is emptied as well; otherwise repeating
    the same tool calls shortly after a reset would be skipped as
    duplicates while the freshly reset state stays empty.
    """
    global _workflow_state
    _workflow_state = {
        "research_notes": {},
        "report_content": "Not written yet.",
        "review": "Review required.",
        "structured_report": None,
    }
    # Cleared in place, so no rebinding (and no `global`) is needed for it.
    _tool_call_cache.clear()
|
|
| |
# FunctionTool wrapper around search_web(); the description is the
# sentences below joined by single spaces.
search_web_tool = FunctionTool.from_defaults(
    fn=search_web,
    name="search_web",
    description=" ".join(
        (
            "Search the web for information on any topic.",
            "Input: A search query string.",
            "Output: Search results containing relevant information.",
            "Use this to gather facts and information about your research topic.",
        )
    ),
)
|
|
# FunctionTool wrapper around record_notes(); the description is the
# sentences below joined by single spaces.
record_notes_tool = FunctionTool.from_defaults(
    fn=record_notes,
    name="record_notes",
    description=" ".join(
        (
            "Record research notes with a descriptive title.",
            "Input: notes (string) - the content to save, notes_title (string) - a title for the notes.",
            "Output: Confirmation that notes were saved.",
            "Use this after searching to save important information you found.",
        )
    ),
)
|
|
# FunctionTool wrapper around write_report(); the description is the
# sentences below joined by single spaces.
write_report_tool = FunctionTool.from_defaults(
    fn=write_report,
    name="write_report",
    description=" ".join(
        (
            "Write a comprehensive markdown report.",
            "Input: report_content (string) - full markdown report content, title (string, optional) - report title.",
            "Output: Confirmation that report was written.",
            "The report_content should be well-structured markdown with headers, sections, and detailed content.",
        )
    ),
)
|
|
# FunctionTool wrapper around review_report(); the description is the
# sentences below joined by single spaces.
review_report_tool = FunctionTool.from_defaults(
    fn=review_report,
    name="review_report",
    description=" ".join(
        (
            "Review a written report and provide feedback.",
            "Input: review (string) - your review and feedback on the report.",
            "Output: Confirmation that review was recorded.",
            "Start with 'APPROVED:' if the report is satisfactory, otherwise provide specific improvement suggestions.",
        )
    ),
)