| import os |
| import random |
| from concurrent.futures import ThreadPoolExecutor, as_completed |
| from datetime import datetime, timezone |
| from colorama import Fore, Style |
| from langchain.agents import create_agent |
| from langchain_core.tools import tool |
| from pydantic import BaseModel, Field |
|
|
| from agent.api.api import get_llm |
| from tavily import TavilyClient |
|
|
|
|
# Pool of colorama foreground colors used to visually distinguish log lines
# from concurrently running subagents; each subagent picks one at random.
SUBAGENT_COLORS = [
    Fore.MAGENTA,
    Fore.CYAN,
    Fore.GREEN,
    Fore.YELLOW,
    Fore.BLUE,
    Fore.WHITE,
    Fore.LIGHTRED_EX,
    Fore.LIGHTGREEN_EX,
    Fore.LIGHTYELLOW_EX,
    Fore.LIGHTBLUE_EX,
    Fore.LIGHTMAGENTA_EX,
    Fore.LIGHTCYAN_EX,
]


# Hard cap (in characters) on any single prompt sent to an agent; prompts are
# truncated to this length to avoid model token-limit failures.
MAX_CHARS = 900000
|
|
|
|
class ExpandedQueries(BaseModel):
    """A list of expanded search queries derived from the original query.

    Used as the structured-output schema (``response_format``) for the
    query-expansion agent in ``expand_queries``; the expansion prompt asks
    the model for 3 queries.
    """

    # Filled in by the LLM via structured output.
    queries: list[str] = Field(
        description="A list of expanded search queries to cover different angles of the original question."
    )
|
|
|
|
| |
| |
| |
|
|
|
|
def expand_queries(origin_question: str, query: str) -> list[str]:
    """Expand one search query into several diverse queries via structured output.

    Args:
        origin_question: The user's original question, supplied for context.
        query: The specific query to expand.

    Returns:
        The list of expanded search queries produced by the model.
    """
    print(f"{Fore.CYAN}[QueryExpander] Expanding: {query}{Style.RESET_ALL}")

    expander_instructions = (
        f"Current time is: {datetime.now(timezone.utc).isoformat()}. "
        "You are a search query expansion expert. "
        "Given a user question, generate 3 diverse and specific search queries "
        "that cover different angles of the question to maximize search coverage. "
        "Each query should be concise and optimized for web search engines."
    )
    expander = create_agent(
        model=get_llm(),
        response_format=ExpandedQueries,
        system_prompt=expander_instructions,
    )

    user_message = {
        "role": "user",
        "content": f"Original question: {origin_question}\nQuery to expand: {query}",
    }
    response = expander.invoke({"messages": [user_message]})
    structured: ExpandedQueries = response["structured_response"]

    for idx, expanded_query in enumerate(structured.queries, 1):
        print(f"{Fore.CYAN} [{idx}] {expanded_query}{Style.RESET_ALL}")

    return structured.queries
|
|
|
|
| |
| |
| |
|
|
|
|
def _search_single_query(query: str) -> list[dict]:
    """Search a single query via Tavily and return results with full content.

    Args:
        query: The search query string.

    Returns:
        A list of dicts with ``url``/``title``/``snippet``/``full_content``
        keys; empty when the search yields no hits.
    """
    tavily = TavilyClient(api_key=os.environ["TAVILY_API_KEY"])

    response = tavily.search(query=query, search_depth="advanced", max_results=3)
    hits = response.get("results", [])
    if not hits:
        return []

    # Best-effort full-page extraction; fall back to a placeholder string
    # for any URL whose content could not be extracted.
    try:
        extraction = tavily.extract(
            urls=[hit["url"] for hit in hits],
            extract_depth="advanced",
            format="markdown",
        )
        url_to_content = {
            entry["url"]: entry["raw_content"]
            for entry in extraction.get("results", [])
        }
    except Exception as e:
        print(f"{Fore.RED} Extraction failed: {e}{Style.RESET_ALL}")
        url_to_content = {}

    enriched = []
    for hit in hits:
        enriched.append(
            {
                "url": hit["url"],
                "title": hit["title"],
                "snippet": hit["content"],
                "full_content": url_to_content.get(hit["url"], "Extraction failed."),
            }
        )
    return enriched
|
|
|
|
def search_and_extract_parallel(queries: list[str]) -> list[dict]:
    """Search all expanded queries in parallel threads and deduplicate by URL.

    Args:
        queries: Search queries to run concurrently (one worker thread each).

    Returns:
        De-duplicated result dicts from ``_search_single_query``, appended in
        completion order. Returns an empty list when ``queries`` is empty.
    """
    # Guard: ThreadPoolExecutor raises ValueError for max_workers == 0,
    # so an empty query list must short-circuit before the pool is created.
    if not queries:
        return []

    seen_urls: set[str] = set()
    all_results: list[dict] = []

    with ThreadPoolExecutor(max_workers=len(queries)) as pool:
        futures = {pool.submit(_search_single_query, q): q for q in queries}
        for future in as_completed(futures):
            q = futures[future]
            print(f"{Fore.GREEN}[Search & Extract] Done: {q}{Style.RESET_ALL}")
            try:
                # First occurrence of a URL wins; later duplicates are dropped.
                for item in future.result():
                    if item["url"] not in seen_urls:
                        seen_urls.add(item["url"])
                        all_results.append(item)
            except Exception as e:
                # A failed query is logged and skipped; other queries still count.
                print(f"{Fore.RED}[Search & Extract] Error: {e}{Style.RESET_ALL}")

    print(
        f"{Fore.GREEN}[SearchAgents] Collected {len(all_results)} unique pages.{Style.RESET_ALL}"
    )
    return all_results
|
|
|
|
| |
| |
| |
|
|
|
|
def subagent(origin_question: str, query: str) -> str:
    """
    Investigate a single URL's full content against the original question.

    Args:
        origin_question: The user's original question.
        query: The URL plus full page content; the combined prompt is
            truncated to MAX_CHARS.

    Returns:
        The agent's findings as a string.
    """
    combined = f"Original question: {origin_question}\n\nWeb page content:\n{query}"
    prompt = combined[:MAX_CHARS]

    color = random.choice(SUBAGENT_COLORS)
    print(f"{color}[SubAgent] Investigating ({len(prompt)} chars)...{Style.RESET_ALL}")

    analyst_instructions = (
        f"Current time is: {datetime.now(timezone.utc).isoformat()}. "
        "You are a research analyst. You are given a web page's full content "
        "and an original question. Extract ALL relevant clues, facts, data, "
        "and details from the page that help answer the original question. "
        "Be thorough and precise. Include specific numbers, names, and dates."
    )
    analyst = create_agent(model=get_llm(), system_prompt=analyst_instructions)

    reply = analyst.invoke({"messages": [{"role": "user", "content": prompt}]})
    answer = reply["messages"][-1].content
    # Message content may arrive as a list of parts; use the first part's text.
    if isinstance(answer, list):
        answer = answer[0].get("text", "")
    return str(answer)
|
|
|
|
| |
| |
| |
|
|
|
|
def combine_result_agent(origin_question: str, query: str) -> str:
    """
    Combine multiple subagent findings into a single comprehensive answer.

    Args:
        origin_question: The user's original question.
        query: All subagent outputs joined together; the combined prompt is
            truncated to MAX_CHARS.

    Returns:
        The synthesized answer as a string.
    """
    merged_prompt = (
        f"Original question: {origin_question}\n\n"
        f"Research findings from multiple sources:\n{query}"
    )[:MAX_CHARS]

    print(
        f"{Fore.BLUE}[CombineAgent] Synthesizing ({len(merged_prompt)} chars)...{Style.RESET_ALL}"
    )

    synthesizer_instructions = (
        f"Current time is: {datetime.now(timezone.utc).isoformat()}. "
        "You are a research synthesizer. You receive findings from multiple "
        "web sources investigating a question. Combine them into a single, "
        "comprehensive, well-structured answer. Cite the source URL for each "
        "key fact. Resolve any contradictions between sources."
    )
    synthesizer = create_agent(model=get_llm(), system_prompt=synthesizer_instructions)

    reply = synthesizer.invoke(
        {"messages": [{"role": "user", "content": merged_prompt}]}
    )
    final_answer = reply["messages"][-1].content
    # Message content may arrive as a list of parts; use the first part's text.
    if isinstance(final_answer, list):
        final_answer = final_answer[0].get("text", "")
    return str(final_answer)
|
|
|
|
| |
| |
| |
|
|
|
|
@tool
def web_search_agents(origin_question: str, query: str) -> str:
    """
    A multi-agent web search tool that expands the query, searches in parallel,
    investigates each page with subagents, and synthesizes a final answer.

    Pros:
    - Dispatches multiple subagents for deep, parallel investigation across many sources.
    - Can achieve both broad and deep research when queries are well-crafted.
    Cons:
    - Requires more detailed and transparent query descriptions for good control.
    - Each subagent has no long-term memory (context is kept short to avoid token limit failures).

    Use this tool for complex questions that require deep web research from multiple sources.
    For simple factual lookups, prefer websearch_agent instead.

    Args:
        origin_question: The original user question for context. Must be detailed and clear.
        query: The specific search query to research. Be as specific and transparent as possible.
    """
    print(f"\n{Fore.YELLOW}{'=' * 60}")
    print("[WebSearchAgents] Starting research")
    print(f" Origin : {origin_question}")
    print(f" Query : {query}")
    print(f"{'=' * 60}{Style.RESET_ALL}\n")

    # Stage 1: expand the query into multiple diverse search queries.
    expanded = expand_queries(origin_question, query)

    # Stage 2: search + extract full page content for all queries in parallel.
    pages = search_and_extract_parallel(expanded)
    if not pages:
        return "No search results found."

    # Stage 3: one subagent per page investigates its content independently.
    print(
        f"\n{Fore.MAGENTA}[SubAgents] Dispatching {len(pages)} subagents...{Style.RESET_ALL}"
    )
    subagent_results: list[str] = []

    def _run_subagent(page: dict) -> str:
        # Prefix each finding with its source URL so the combiner can cite it.
        page_input = (
            f"URL: {page['url']}\nTitle: {page['title']}\n\n{page['full_content']}"
        )
        finding = subagent(origin_question, page_input)
        return f"### Source: {page['url']}\n{finding}"

    # Cap at 5 concurrent subagents to limit parallel LLM calls.
    with ThreadPoolExecutor(max_workers=min(len(pages), 5)) as pool:
        futures = {pool.submit(_run_subagent, p): p for p in pages}
        for future in as_completed(futures):
            page = futures[future]
            try:
                result = future.result()
                subagent_results.append(result)
                color = random.choice(SUBAGENT_COLORS)
                print(f"{color}[SubAgent] Done: {page['url']}{Style.RESET_ALL}")
            except Exception as e:
                # A failed subagent is logged and skipped; others still contribute.
                print(
                    f"{Fore.RED}[SubAgent] Error on {page['url']}: {e}{Style.RESET_ALL}"
                )

    # Guard: if every subagent failed there are no findings to synthesize,
    # so do not invoke the combiner on an empty findings block.
    if not subagent_results:
        return "All subagent investigations failed; no findings were collected."

    # Stage 4: synthesize all findings into one cited answer.
    combined_input = "\n\n---\n\n".join(subagent_results)
    result = combine_result_agent(origin_question, combined_input)

    print(f"[WebSearchAgents -> SupervisorAgent] {result}")
    print(f"{'=' * 60}{Style.RESET_ALL}\n")

    return result
|
|
|
|
| |
| |
| |
|
|
if __name__ == "__main__":
    # Smoke test: load API keys (e.g. TAVILY_API_KEY) from a .env file and run
    # the full multi-agent pipeline on a single sample question.
    from dotenv import load_dotenv

    load_dotenv()

    test_query = "What is LangGraph?"
    # Invoke through the LangChain tool interface: arguments passed as a dict.
    answer = web_search_agents.invoke(
        {"origin_question": test_query, "query": test_query}
    )
    print(f"\n{Fore.YELLOW}{'=' * 60}")
    print("FINAL ANSWER")
    print(f"{'=' * 60}{Style.RESET_ALL}")
    print(answer)
|
|