import os
import random
from concurrent.futures import ThreadPoolExecutor, as_completed
from datetime import datetime, timezone
from colorama import Fore, Style # type: ignore[import]
from langchain.agents import create_agent
from langchain_core.tools import tool
from pydantic import BaseModel, Field
from agent.api.api import get_llm
from tavily import TavilyClient # type: ignore[import]
# Color palette that subagents pick from at random so concurrently-running
# subagent log lines are visually distinguishable in the terminal.
SUBAGENT_COLORS = [
    Fore.MAGENTA,
    Fore.CYAN,
    Fore.GREEN,
    Fore.YELLOW,
    Fore.BLUE,
    Fore.WHITE,
    Fore.LIGHTRED_EX,
    Fore.LIGHTGREEN_EX,
    Fore.LIGHTYELLOW_EX,
    Fore.LIGHTBLUE_EX,
    Fore.LIGHTMAGENTA_EX,
    Fore.LIGHTCYAN_EX,
]
# Hard cap (in characters, not tokens) on any prompt sent to an agent,
# used to avoid exceeding the model's context window.
MAX_CHARS = 900000
class ExpandedQueries(BaseModel):
    """A list of expanded search queries derived from the original query."""
    # NOTE(review): the docstring and Field description presumably feed into
    # the JSON schema the LLM sees for structured output — keep them
    # descriptive; confirm against create_agent's response_format handling.
    queries: list[str] = Field(
        description="A list of expanded search queries to cover different angles of the original question."
    )
# ---------------------------------------------------------------------------
# Step 1: Query Expansion (structured output)
# ---------------------------------------------------------------------------
def expand_queries(origin_question: str, query: str) -> list[str]:
    """Expand a single query into multiple diverse search queries.

    Runs an LLM agent with structured output (``ExpandedQueries``) to
    produce search-engine-optimized variants of *query*.

    Args:
        origin_question: The user's original question, kept for context.
        query: The specific query to expand.

    Returns:
        The model's expanded queries. Never empty: falls back to
        ``[query]`` if the model returns no queries, since downstream
        ``search_and_extract_parallel`` needs at least one query.
    """
    print(f"{Fore.CYAN}[QueryExpander] Expanding: {query}{Style.RESET_ALL}")
    agent = create_agent(
        model=get_llm(),
        response_format=ExpandedQueries,
        system_prompt=(
            f"Current time is: {datetime.now(timezone.utc).isoformat()}. "
            "You are a search query expansion expert. "
            "Given a user question, generate 3 diverse and specific search queries "
            "that cover different angles of the question to maximize search coverage. "
            "Each query should be concise and optimized for web search engines."
        ),
    )
    result = agent.invoke(
        {
            "messages": [
                {
                    "role": "user",
                    "content": (
                        f"Original question: {origin_question}\n"
                        f"Query to expand: {query}"
                    ),
                }
            ]
        }
    )
    expanded: ExpandedQueries = result["structured_response"]
    for i, q in enumerate(expanded.queries, 1):
        print(f"{Fore.CYAN} [{i}] {q}{Style.RESET_ALL}")
    # Robustness: guarantee a non-empty result so the caller's thread pool
    # (max_workers=len(queries)) cannot be created with zero workers.
    return expanded.queries or [query]
# ---------------------------------------------------------------------------
# Step 2: Parallel Tavily Search & Extract
# ---------------------------------------------------------------------------
def _search_single_query(query: str) -> list[dict]:
    """Search a single query via Tavily and return results with full content.

    Performs an advanced-depth search (top 3 hits), then bulk-extracts the
    full markdown content of every hit URL in one API call.

    Args:
        query: The search query string.

    Returns:
        A list of dicts with ``url``, ``title``, ``snippet`` and
        ``full_content`` keys; an empty list when the search has no hits.
    """
    client = TavilyClient(api_key=os.environ["TAVILY_API_KEY"])
    search_response = client.search(query=query, search_depth="advanced", max_results=3)
    results = search_response.get("results", [])
    if not results:
        return []
    urls = [r["url"] for r in results]
    try:
        extraction = client.extract(
            urls=urls, extract_depth="advanced", format="markdown"
        )
        # Normalize missing/None raw_content to "" so the fallback string
        # below is applied uniformly instead of raising KeyError.
        extracted_map = {
            item["url"]: item.get("raw_content") or ""
            for item in extraction.get("results", [])
        }
    except Exception as e:
        # Best-effort: an extraction failure degrades to snippet-only results.
        print(f"{Fore.RED} Extraction failed: {e}{Style.RESET_ALL}")
        extracted_map = {}
    return [
        {
            "url": r["url"],
            # Search hits are external data; tolerate absent fields.
            "title": r.get("title", ""),
            "snippet": r.get("content", ""),
            "full_content": extracted_map.get(r["url"]) or "Extraction failed.",
        }
        for r in results
    ]
def search_and_extract_parallel(queries: list[str]) -> list[dict]:
    """Search all expanded queries in parallel threads and deduplicate by URL.

    Args:
        queries: Search queries to run concurrently (one worker per query,
            capped at 8).

    Returns:
        De-duplicated page dicts collected from every query, in the order
        the searches complete. Empty list when ``queries`` is empty.
    """
    # Guard: ThreadPoolExecutor raises ValueError for max_workers <= 0.
    if not queries:
        return []
    seen_urls: set[str] = set()
    all_results: list[dict] = []
    # Cap the worker count so a large expansion cannot spawn unbounded threads.
    with ThreadPoolExecutor(max_workers=min(len(queries), 8)) as pool:
        futures = {pool.submit(_search_single_query, q): q for q in queries}
        for future in as_completed(futures):
            q = futures[future]
            print(f"{Fore.GREEN}[Search & Extract] Done: {q}{Style.RESET_ALL}")
            try:
                for item in future.result():
                    # First occurrence of a URL wins; later duplicates dropped.
                    if item["url"] not in seen_urls:
                        seen_urls.add(item["url"])
                        all_results.append(item)
            except Exception as e:
                # One failed query must not abort the others.
                print(f"{Fore.RED}[Search & Extract] Error: {e}{Style.RESET_ALL}")
    print(
        f"{Fore.GREEN}[SearchAgents] Collected {len(all_results)} unique pages.{Style.RESET_ALL}"
    )
    return all_results
# ---------------------------------------------------------------------------
# Step 3: SubAgent — investigate a single page
# ---------------------------------------------------------------------------
def subagent(origin_question: str, query: str) -> str:
    """Investigate one page's full content against the original question.

    Args:
        origin_question: The user's original question.
        query: The URL plus full page content to analyze (the combined
            prompt is truncated to MAX_CHARS).

    Returns:
        The agent's extracted findings as a plain string.
    """
    prompt = f"Original question: {origin_question}\n\nWeb page content:\n{query}"
    prompt = prompt[:MAX_CHARS]
    # Random color per subagent makes interleaved parallel logs readable.
    tint = random.choice(SUBAGENT_COLORS)
    print(f"{tint}[SubAgent] Investigating ({len(prompt)} chars)...{Style.RESET_ALL}")
    agent = create_agent(
        model=get_llm(),
        system_prompt=(
            f"Current time is: {datetime.now(timezone.utc).isoformat()}. "
            "You are a research analyst. You are given a web page's full content "
            "and an original question. Extract ALL relevant clues, facts, data, "
            "and details from the page that help answer the original question. "
            "Be thorough and precise. Include specific numbers, names, and dates."
        ),
    )
    reply = agent.invoke({"messages": [{"role": "user", "content": prompt}]})
    raw = reply["messages"][-1].content
    # Some providers return a list of content parts; take the first text part.
    if isinstance(raw, list):
        raw = raw[0].get("text", "")
    return str(raw)
# ---------------------------------------------------------------------------
# Step 4: Combine all subagent findings
# ---------------------------------------------------------------------------
def combine_result_agent(origin_question: str, query: str) -> str:
    """Merge multiple subagent findings into one comprehensive answer.

    Args:
        origin_question: The user's original question.
        query: All subagent outputs joined together (the combined prompt
            is truncated to MAX_CHARS).

    Returns:
        The synthesized answer as a plain string.
    """
    prompt = (
        f"Original question: {origin_question}\n\n"
        f"Research findings from multiple sources:\n{query}"
    )
    prompt = prompt[:MAX_CHARS]
    print(
        f"{Fore.BLUE}[CombineAgent] Synthesizing ({len(prompt)} chars)...{Style.RESET_ALL}"
    )
    synthesizer = create_agent(
        model=get_llm(),
        system_prompt=(
            f"Current time is: {datetime.now(timezone.utc).isoformat()}. "
            "You are a research synthesizer. You receive findings from multiple "
            "web sources investigating a question. Combine them into a single, "
            "comprehensive, well-structured answer. Cite the source URL for each "
            "key fact. Resolve any contradictions between sources."
        ),
    )
    reply = synthesizer.invoke({"messages": [{"role": "user", "content": prompt}]})
    raw = reply["messages"][-1].content
    # Some providers return a list of content parts; take the first text part.
    if isinstance(raw, list):
        raw = raw[0].get("text", "")
    return str(raw)
# ---------------------------------------------------------------------------
# Step 5: Main orchestrator tool
# ---------------------------------------------------------------------------
@tool
def web_search_agents(origin_question: str, query: str) -> str:
    """
    A multi-agent web search tool that expands the query, searches in parallel,
    investigates each page with subagents, and synthesizes a final answer.
    Pros:
    - Dispatches multiple subagents for deep, parallel investigation across many sources.
    - Can achieve both broad and deep research when queries are well-crafted.
    Cons:
    - Requires more detailed and transparent query descriptions for good control.
    - Each subagent has no long-term memory (context is kept short to avoid token limit failures).
    Use this tool for complex questions that require deep web research from multiple sources.
    For simple factual lookups, prefer websearch_agent instead.
    Args:
        origin_question: The original user question for context. Must be detailed and clear.
        query: The specific search query to research. Be as specific and transparent as possible.
    """
    print(f"\n{Fore.YELLOW}{'=' * 60}")
    print("[WebSearchAgents] Starting research")
    print(f"  Origin : {origin_question}")
    print(f"  Query  : {query}")
    print(f"{'=' * 60}{Style.RESET_ALL}\n")
    # 1. Expand queries
    expanded = expand_queries(origin_question, query)
    # 2. Parallel Tavily search & extract
    pages = search_and_extract_parallel(expanded)
    if not pages:
        return "No search results found."
    # 3. Parallel subagent investigation
    print(
        f"\n{Fore.MAGENTA}[SubAgents] Dispatching {len(pages)} subagents...{Style.RESET_ALL}"
    )
    subagent_results: list[str] = []

    def _run_subagent(page: dict) -> str:
        # Give the subagent the URL and title so its findings stay attributable.
        page_input = (
            f"URL: {page['url']}\nTitle: {page['title']}\n\n{page['full_content']}"
        )
        finding = subagent(origin_question, page_input)
        return f"### Source: {page['url']}\n{finding}"

    with ThreadPoolExecutor(max_workers=min(len(pages), 5)) as pool:
        futures = {pool.submit(_run_subagent, p): p for p in pages}
        for future in as_completed(futures):
            page = futures[future]
            try:
                result = future.result()
                subagent_results.append(result)
                color = random.choice(SUBAGENT_COLORS)
                print(f"{color}[SubAgent] Done: {page['url']}{Style.RESET_ALL}")
            except Exception as e:
                # One failed page must not abort the whole investigation.
                print(
                    f"{Fore.RED}[SubAgent] Error on {page['url']}: {e}{Style.RESET_ALL}"
                )
    # Robustness: if every subagent failed there is nothing to synthesize.
    if not subagent_results:
        return "No search results found."
    # 4. Combine results
    combined_input = "\n\n---\n\n".join(subagent_results)
    result = combine_result_agent(origin_question, combined_input)
    print(f"[WebSearchAgents -> SupervisorAgent] {result}")
    print(f"{'=' * 60}{Style.RESET_ALL}\n")
    return result
# ---------------------------------------------------------------------------
# Test
# ---------------------------------------------------------------------------
if __name__ == "__main__":
    # Smoke test: run the full multi-agent pipeline on a simple question.
    from dotenv import load_dotenv

    load_dotenv()
    question = "What is LangGraph?"
    answer = web_search_agents.invoke(
        {"origin_question": question, "query": question}
    )
    print(f"\n{Fore.YELLOW}{'=' * 60}")
    print("FINAL ANSWER")
    print(f"{'=' * 60}{Style.RESET_ALL}")
    print(answer)