| | import json |
| | import os |
| | import re |
| | import time |
| |
|
| | import yaml |
| | from duckduckgo_search.exceptions import DuckDuckGoSearchException |
| | from smolagents import FinalAnswerTool, Tool, OpenAIServerModel, CodeAgent |
| |
|
| |
|
# Character budget shared by the tools and the agent: it is the limit handed to
# `truncate_content` in CustomVisitWebpageTool.forward and the
# `max_print_outputs_length` given to the CodeAgent in SmolAgent.__init__.
python_interpreter_max_print_outputs_length = 10**6
| |
|
| |
|
class CustomDuckDuckGoSearchTool(Tool):
    """Web-search tool backed by DuckDuckGo with a small retry loop.

    The search call is retried when DuckDuckGo raises
    ``DuckDuckGoSearchException``; the hits are rendered as a markdown list
    of ``[title](href)`` links, each followed by its body text.
    """

    name = "web_search"
    description = """Performs a duckduckgo web search based on your query (think a Google search) then returns the top search results."""
    inputs = {
        "query": {
            "type": "string",
            "description": "The search query to perform.",
        }
    }
    output_type = "string"

    def __init__(self, max_results=10, **kwargs):
        """Create the tool.

        Args:
            max_results: Maximum number of hits requested per search.
            **kwargs: Forwarded unchanged to the ``DDGS`` constructor.

        Raises:
            ImportError: If the ``duckduckgo_search`` package is missing.
        """
        super().__init__()
        self.max_results = max_results
        try:
            from duckduckgo_search import DDGS
        except ImportError as e:
            raise ImportError(
                "You must install package `duckduckgo_search` to run this tool: for instance run `pip install duckduckgo-search`."
            ) from e
        self.ddgs = DDGS(**kwargs)

    def forward(self, query: str) -> str:
        """Run the search and return the formatted results.

        Args:
            query: The search query to perform.

        Returns:
            A markdown string starting with ``## Search Results``.

        Raises:
            DuckDuckGoSearchException: If all five attempts fail.
            Exception: If the search succeeds but yields no results.
        """
        max_attempts = 5
        attempt = 0
        while True:
            attempt += 1
            try:
                hits = self.ddgs.text(query, max_results=self.max_results)
            except DuckDuckGoSearchException as err:
                print(err)
                # Give up only after the final attempt; otherwise pause
                # briefly before asking again.
                if attempt >= max_attempts:
                    raise
                time.sleep(1.5)
            else:
                break

        if len(hits) == 0:
            raise Exception("No results found! Try a less restrictive/shorter query.")

        entries = []
        for hit in hits:
            entries.append(f"[{hit['title']}]({hit['href']})\n{hit['body']}")
        return "## Search Results\n\n" + "\n\n".join(entries)
| |
|
| |
|
class CustomVisitWebpageTool(Tool):
    """Tool that downloads a web page and returns it as markdown text.

    Network and conversion failures are reported as plain strings instead of
    being raised, so the caller always receives a string result.
    """

    name = "visit_webpage"
    description = (
        "Visits a webpage at the given url and reads its content as a markdown string. Use this to browse webpages."
    )
    inputs = {
        "url": {
            "type": "string",
            "description": "The url of the webpage to visit.",
        }
    }
    output_type = "string"

    def forward(self, url: str) -> str:
        """Fetch ``url`` and return its content converted to markdown.

        Args:
            url: The url of the webpage to visit.

        Returns:
            The page as markdown, truncated to
            ``python_interpreter_max_print_outputs_length``, or an error
            message string if the fetch or conversion failed.

        Raises:
            ImportError: If ``requests``, ``markdownify`` or ``smolagents``
                cannot be imported.
        """
        try:
            import requests
            from markdownify import markdownify
            from requests.exceptions import RequestException

            from smolagents.utils import truncate_content
        except ImportError as e:
            raise ImportError(
                "You must install packages `markdownify` and `requests` to run this tool: for instance run `pip install markdownify requests`."
            ) from e

        try:
            # Fetch the page; non-2xx statuses are turned into exceptions.
            reply = requests.get(url, timeout=20)
            reply.raise_for_status()

            # Convert the HTML body to markdown.
            page_markdown = markdownify(reply.text).strip()

            # Collapse runs of three or more newlines into one blank line.
            page_markdown = re.sub(r"\n{3,}", "\n\n", page_markdown)

            return truncate_content(page_markdown, python_interpreter_max_print_outputs_length)
        except requests.exceptions.Timeout:
            return "The request timed out. Please try again later or check the URL."
        except RequestException as err:
            return f"Error fetching the webpage: {err!s}"
        except Exception as err:
            return f"An unexpected error occurred: {err!s}"
| |
|
| |
|
class SmolAgent:
    """Wrapper around a smolagents ``CodeAgent`` with web search and browsing tools."""

    def __init__(self, openai_api_key=None):
        """Build the model, the tools and the underlying ``CodeAgent``.

        Prompt templates are read from ``prompt_templates.yaml`` and the
        system prompt from ``system_prompt.txt``; both paths are relative to
        the current working directory.

        Args:
            openai_api_key: API key passed to ``OpenAIServerModel``.
        """
        final_answer = FinalAnswerTool()
        search_tool = CustomDuckDuckGoSearchTool(max_results=3)
        visit_webpage_tool = CustomVisitWebpageTool()
        model = OpenAIServerModel(
            model_id="gpt-4.1-2025-04-14",
            max_completion_tokens=1024,
            temperature=0.01,
            api_key=openai_api_key,
        )
        # Read the prompt files as UTF-8 explicitly so the result does not
        # depend on the platform's locale encoding.
        with open('prompt_templates.yaml', 'r', encoding='utf-8') as f:
            prompt_templates = yaml.safe_load(f)
        with open('system_prompt.txt', 'r', encoding='utf-8') as f:
            prompt_templates['system_prompt'] = f.read()
        self.agent = CodeAgent(
            model=model,
            prompt_templates=prompt_templates,
            tools=[search_tool, visit_webpage_tool, final_answer],
            max_steps=10,
            verbosity_level=100,
            grammar=None,
            planning_interval=None,
            name='Advanced GAIA Agent',
            description=None,
            max_print_outputs_length=python_interpreter_max_print_outputs_length,
        )
        self.agent.visualize()

    def run(self, task: dict[str, str]) -> str:
        """Answer a single task, or return ``''`` when the task is skipped.

        A task is skipped when it has a non-empty ``file_name``, when it has
        no ``question``, or when the question mentions ``www.youtube.com``.
        NOTE(review): attachments and YouTube links are presumably skipped
        because none of the configured tools can process them -- confirm.

        Args:
            task: Mapping with a ``question`` entry and an optional
                ``file_name`` entry.

        Returns:
            The result of ``self.agent.run(question)``, or ``''`` if skipped.
        """
        # Truthiness covers a missing key, None and '' alike, where
        # len(task.get(...)) would raise TypeError on a missing or None value.
        if task.get('file_name'):
            return ''

        question = task.get('question')
        # A missing question cannot be answered; skip it instead of failing
        # with AttributeError on None.
        if question is None or 'www.youtube.com' in question:
            return ''

        return self.agent.run(question)
| |
|
| |
|
if __name__ == '__main__':
    # Prefer the key from the environment; fall back to a local key file.
    openai_key = os.getenv('OPENAI_API_KEY')
    if not openai_key:
        with open("data/openai.key", "r", encoding="utf-8") as f:
            openai_key = f.read().strip()

    agent = SmolAgent(openai_api_key=openai_key)

    # Decode the questions as UTF-8 explicitly rather than with the
    # platform's locale encoding.
    with open('data/questions.json', 'r', encoding='utf-8') as f:
        questions = json.load(f)

    # Print each task followed by the agent's answer, separated by rulers.
    for q in questions:
        print('\n===')
        print(q)
        print('\n---')
        a = agent.run(q)
        print('\n---')
        print(a)
| |
|
| |
|
| |
|
| |
|
| |
|
| |
|
| |
|