Final_Assignment_Template / agent /agents /websearchagents.py
AlexTrinityBlock's picture
refactor(api): add LLM abstraction layer to replace hardcoded model strings
bcdc55d
import os
import random
from concurrent.futures import ThreadPoolExecutor, as_completed
from datetime import datetime, timezone
from colorama import Fore, Style # type: ignore[import]
from langchain.agents import create_agent
from langchain_core.tools import tool
from pydantic import BaseModel, Field
from agent.api.api import get_llm
from tavily import TavilyClient # type: ignore[import]
# Palette of colorama foreground colors. The subagent log lines pick one entry
# at random (via ``random.choice``), presumably so that interleaved output from
# parallel subagents is easier to tell apart.
SUBAGENT_COLORS = [
    Fore.MAGENTA,
    Fore.CYAN,
    Fore.GREEN,
    Fore.YELLOW,
    Fore.BLUE,
    Fore.WHITE,
    Fore.LIGHTRED_EX,
    Fore.LIGHTGREEN_EX,
    Fore.LIGHTYELLOW_EX,
    Fore.LIGHTBLUE_EX,
    Fore.LIGHTMAGENTA_EX,
    Fore.LIGHTCYAN_EX,
]
# Maximum number of characters kept in a prompt sent to the subagent and
# combine agents; anything beyond this length is cut off before the LLM call.
MAX_CHARS = 900000
class ExpandedQueries(BaseModel):
    """A list of expanded search queries derived from the original query."""

    # Structured-output schema handed to the query-expansion agent as its
    # ``response_format`` (see ``expand_queries``).
    # NOTE(review): pydantic normally publishes the class docstring and the
    # Field description in the generated schema, so both are likely visible to
    # the model — confirm before rewording either of them.
    queries: list[str] = Field(
        description="A list of expanded search queries to cover different angles of the original question."
    )
# ---------------------------------------------------------------------------
# Step 1: Query Expansion (structured output)
# ---------------------------------------------------------------------------
def expand_queries(origin_question: str, query: str) -> list[str]:
    """Expand a single query into several web-search queries via an LLM.

    An agent is created with ``ExpandedQueries`` as its structured response
    format and asked to produce diverse search queries for ``query`` in the
    context of ``origin_question``.

    Args:
        origin_question: The user's original question, passed to the model as
            context.
        query: The search query to expand.

    Returns:
        The expanded queries with surrounding whitespace stripped, blank
        entries dropped and duplicates removed (first occurrence wins). When
        the model yields no structured response or no usable query, the list
        ``[query]`` is returned, so the result is never empty.
    """
    print(f"{Fore.CYAN}[QueryExpander] Expanding: {query}{Style.RESET_ALL}")
    agent = create_agent(
        model=get_llm(),
        response_format=ExpandedQueries,
        system_prompt=(
            f"Current time is: {datetime.now(timezone.utc).isoformat()}. "
            "You are a search query expansion expert. "
            "Given a user question, generate 3 diverse and specific search queries "
            "that cover different angles of the question to maximize search coverage. "
            "Each query should be concise and optimized for web search engines."
        ),
    )
    result = agent.invoke(
        {
            "messages": [
                {
                    "role": "user",
                    "content": (
                        f"Original question: {origin_question}\n"
                        f"Query to expand: {query}"
                    ),
                }
            ]
        }
    )

    # The structured response may be absent (or None) if the model failed to
    # produce output matching the schema; treat that as "no queries" instead
    # of raising KeyError/AttributeError.
    expanded = result.get("structured_response")
    raw_queries = getattr(expanded, "queries", None) or []

    # dict.fromkeys de-duplicates while keeping the model's original order.
    queries = list(
        dict.fromkeys(q.strip() for q in raw_queries if q and q.strip())
    )

    if not queries:
        # An empty list would leave the search stage with nothing to do (and a
        # zero-sized thread pool), so fall back to the caller's own query.
        print(
            f"{Fore.RED}[QueryExpander] No expanded queries returned; "
            f"using the original query.{Style.RESET_ALL}"
        )
        queries = [query]

    for i, q in enumerate(queries, 1):
        print(f"{Fore.CYAN} [{i}] {q}{Style.RESET_ALL}")
    return queries
# ---------------------------------------------------------------------------
# Step 2: Parallel Tavily Search & Extract
# ---------------------------------------------------------------------------
def _search_single_query(query: str) -> list[dict]:
    """Search one query via Tavily and attach extracted page content.

    Runs an "advanced" Tavily search limited to three results, then asks
    Tavily to extract the hit URLs as markdown.

    Args:
        query: The search query string.

    Returns:
        One dict per search hit that has a URL, with the keys ``url``,
        ``title``, ``snippet`` and ``full_content``. ``full_content`` is the
        extracted page text when available, otherwise the search snippet, and
        only as a last resort the placeholder ``"Extraction failed."``.
        An empty list is returned when the search produces no usable hits.

    Raises:
        KeyError: If ``TAVILY_API_KEY`` is not set in the environment.
        Exception: Whatever the Tavily client raises for a failed search.
            Extraction failures are logged and tolerated instead.
    """
    client = TavilyClient(api_key=os.environ["TAVILY_API_KEY"])
    search_response = client.search(query=query, search_depth="advanced", max_results=3)

    # Skip malformed hits without a URL rather than failing the whole query.
    results = [r for r in search_response.get("results", []) if r.get("url")]
    if not results:
        return []

    try:
        extraction = client.extract(
            urls=[r["url"] for r in results],
            extract_depth="advanced",
            format="markdown",
        )
        extracted_map = {
            item["url"]: item.get("raw_content")
            for item in extraction.get("results", [])
            if item.get("url")
        }
    except Exception as e:
        # Best effort: the search snippets are still usable without extraction.
        print(f"{Fore.RED} Extraction failed: {e}{Style.RESET_ALL}")
        extracted_map = {}

    pages = []
    for r in results:
        snippet = r.get("content") or ""
        pages.append(
            {
                "url": r["url"],
                "title": r.get("title") or "",
                "snippet": snippet,
                # Prefer the extracted page; if it is missing or empty, fall
                # back to the snippet so downstream analysis still has real
                # text instead of just the failure placeholder.
                "full_content": (
                    extracted_map.get(r["url"]) or snippet or "Extraction failed."
                ),
            }
        )
    return pages
def search_and_extract_parallel(
    queries: list[str], max_workers: int = 8
) -> list[dict]:
    """Search all queries in parallel threads and deduplicate pages by URL.

    Args:
        queries: Search queries to run. Blank entries and repeated queries are
            ignored so that no request is sent twice.
        max_workers: Upper bound on the number of worker threads. The pool is
            sized to the number of distinct queries, capped by this value.

    Returns:
        The page dicts produced by ``_search_single_query`` for every query
        that succeeded, keeping only the first page seen for each URL. Returns
        an empty list when there is nothing to search. Queries that raise are
        logged and skipped.
    """
    # dict.fromkeys removes duplicates while preserving the caller's order.
    unique_queries = list(
        dict.fromkeys(q.strip() for q in queries if q and q.strip())
    )
    if not unique_queries:
        # ThreadPoolExecutor rejects max_workers=0, so bail out early.
        return []

    seen_urls: set[str] = set()
    all_results: list[dict] = []
    worker_count = max(1, min(len(unique_queries), max_workers))

    with ThreadPoolExecutor(max_workers=worker_count) as pool:
        futures = {pool.submit(_search_single_query, q): q for q in unique_queries}
        for future in as_completed(futures):
            q = futures[future]
            try:
                items = future.result()
            except Exception as e:
                # One failing query must not discard the other queries' pages.
                print(f"{Fore.RED}[Search & Extract] Error: {e}{Style.RESET_ALL}")
                continue
            # Only report completion for queries that actually succeeded.
            print(f"{Fore.GREEN}[Search & Extract] Done: {q}{Style.RESET_ALL}")
            for item in items:
                if item["url"] not in seen_urls:
                    seen_urls.add(item["url"])
                    all_results.append(item)

    print(
        f"{Fore.GREEN}[SearchAgents] Collected {len(all_results)} unique pages.{Style.RESET_ALL}"
    )
    return all_results
# ---------------------------------------------------------------------------
# Step 3: SubAgent — investigate a single page
# ---------------------------------------------------------------------------
def subagent(origin_question: str, query: str) -> str:
    """Investigate one page's content against the original question.

    Args:
        origin_question: The user's original question.
        query: The page material to analyse. In this file the caller passes
            the URL, the title and the full page content as one string.

    Returns:
        The text of the agent's final message. When the message content is a
        list of blocks, the text of every block is concatenated.

    Note:
        The combined prompt (question plus page material) is truncated to
        ``MAX_CHARS`` characters before it is sent to the model.
    """
    prompt = (f"Original question: {origin_question}\n\nWeb page content:\n{query}")[
        :MAX_CHARS
    ]
    color = random.choice(SUBAGENT_COLORS)
    print(f"{color}[SubAgent] Investigating ({len(prompt)} chars)...{Style.RESET_ALL}")
    agent = create_agent(
        model=get_llm(),
        system_prompt=(
            f"Current time is: {datetime.now(timezone.utc).isoformat()}. "
            "You are a research analyst. You are given a web page's full content "
            "and an original question. Extract ALL relevant clues, facts, data, "
            "and details from the page that help answer the original question. "
            "Be thorough and precise. Include specific numbers, names, and dates."
        ),
    )
    result = agent.invoke({"messages": [{"role": "user", "content": prompt}]})
    content = result["messages"][-1].content
    if isinstance(content, list):
        # Message content may be a list mixing plain strings and block dicts.
        # Join the text of all of them: taking only the first block would drop
        # later text, fail on an empty list, and break on string entries.
        content = "".join(
            block if isinstance(block, str) else (block.get("text") or "")
            for block in content
            if isinstance(block, (str, dict))
        )
    return str(content)
# ---------------------------------------------------------------------------
# Step 4: Combine all subagent findings
# ---------------------------------------------------------------------------
def combine_result_agent(origin_question: str, query: str) -> str:
    """Synthesize several research findings into one comprehensive answer.

    Args:
        origin_question: The user's original question.
        query: The findings to combine, already joined into a single string
            by the caller.

    Returns:
        The text of the agent's final message. When the message content is a
        list of blocks, the text of every block is concatenated.

    Note:
        The combined prompt (question plus findings) is truncated to
        ``MAX_CHARS`` characters before it is sent to the model.
    """
    prompt = (
        f"Original question: {origin_question}\n\n"
        f"Research findings from multiple sources:\n{query}"
    )[:MAX_CHARS]
    print(
        f"{Fore.BLUE}[CombineAgent] Synthesizing ({len(prompt)} chars)...{Style.RESET_ALL}"
    )
    agent = create_agent(
        model=get_llm(),
        system_prompt=(
            f"Current time is: {datetime.now(timezone.utc).isoformat()}. "
            "You are a research synthesizer. You receive findings from multiple "
            "web sources investigating a question. Combine them into a single, "
            "comprehensive, well-structured answer. Cite the source URL for each "
            "key fact. Resolve any contradictions between sources."
        ),
    )
    result = agent.invoke({"messages": [{"role": "user", "content": prompt}]})
    content = result["messages"][-1].content
    if isinstance(content, list):
        # Message content may be a list mixing plain strings and block dicts.
        # Join the text of all of them: taking only the first block would drop
        # later text, fail on an empty list, and break on string entries.
        content = "".join(
            block if isinstance(block, str) else (block.get("text") or "")
            for block in content
            if isinstance(block, (str, dict))
        )
    return str(content)
# ---------------------------------------------------------------------------
# Step 5: Main orchestrator tool
# ---------------------------------------------------------------------------
@tool
def web_search_agents(origin_question: str, query: str) -> str:
    """
    A multi-agent web search tool that expands the query, searches in parallel,
    investigates each page with subagents, and synthesizes a final answer.
    Pros:
    - Dispatches multiple subagents for deep, parallel investigation across many sources.
    - Can achieve both broad and deep research when queries are well-crafted.
    Cons:
    - Requires more detailed and transparent query descriptions for good control.
    - Each subagent has no long-term memory (context is kept short to avoid token limit failures).
    Use this tool for complex questions that require deep web research from multiple sources.
    For simple factual lookups, prefer websearch_agent instead.
    Args:
    origin_question: The original user question for context. Must be detailed and clear.
    query: The specific search query to research. Be as specific and transparent as possible.
    """
    # NOTE(review): `@tool` presumably exposes the docstring above to the
    # calling model as the tool description, so its wording is left unchanged.
    #
    # Returns the synthesized answer, or a short explanatory message when no
    # pages were found or every page investigation failed.
    print(f"\n{Fore.YELLOW}{'=' * 60}")
    print("[WebSearchAgents] Starting research")
    print(f" Origin : {origin_question}")
    print(f" Query : {query}")
    print(f"{'=' * 60}{Style.RESET_ALL}\n")

    # 1. Expand queries. A failed or empty expansion should not abort the
    #    research: searching the caller's query alone is still useful.
    try:
        expanded = expand_queries(origin_question, query)
    except Exception as e:
        print(
            f"{Fore.RED}[WebSearchAgents] Query expansion failed: {e}{Style.RESET_ALL}"
        )
        expanded = []
    if not expanded:
        expanded = [query]

    # 2. Parallel Tavily search & extract
    pages = search_and_extract_parallel(expanded)
    if not pages:
        return "No search results found."

    # 3. Parallel subagent investigation
    print(
        f"\n{Fore.MAGENTA}[SubAgents] Dispatching {len(pages)} subagents...{Style.RESET_ALL}"
    )

    def _run_subagent(page: dict) -> str:
        # Give the subagent the URL and title alongside the page body, and tag
        # the finding with its source so the combiner can cite it.
        page_input = (
            f"URL: {page['url']}\nTitle: {page['title']}\n\n{page['full_content']}"
        )
        finding = subagent(origin_question, page_input)
        return f"### Source: {page['url']}\n{finding}"

    # Findings are keyed by page index so they can be joined in page order,
    # independent of which thread happens to finish first.
    findings: dict[int, str] = {}
    with ThreadPoolExecutor(max_workers=min(len(pages), 5)) as pool:
        futures = {pool.submit(_run_subagent, p): i for i, p in enumerate(pages)}
        for future in as_completed(futures):
            index = futures[future]
            page = pages[index]
            try:
                findings[index] = future.result()
            except Exception as e:
                print(
                    f"{Fore.RED}[SubAgent] Error on {page['url']}: {e}{Style.RESET_ALL}"
                )
                continue
            color = random.choice(SUBAGENT_COLORS)
            print(f"{color}[SubAgent] Done: {page['url']}{Style.RESET_ALL}")

    if not findings:
        # Asking the combiner to synthesize from nothing invites a made-up
        # answer; report the failure instead.
        return "All page investigations failed; no findings to synthesize."

    # 4. Combine results
    subagent_results = [findings[i] for i in sorted(findings)]
    combined_input = "\n\n---\n\n".join(subagent_results)
    result = combine_result_agent(origin_question, combined_input)
    print(f"[WebSearchAgents -> SupervisorAgent] {result}")
    print(f"{'=' * 60}{Style.RESET_ALL}\n")
    return result
# ---------------------------------------------------------------------------
# Test
# ---------------------------------------------------------------------------
if __name__ == "__main__":
    # Manual smoke test: run the whole pipeline once for a fixed question and
    # print the synthesized answer.
    from dotenv import load_dotenv

    # Presumably loads the API keys (e.g. TAVILY_API_KEY) from a local .env file.
    load_dotenv()

    question = "What is LangGraph?"
    tool_input = {"origin_question": question, "query": question}
    final_answer = web_search_agents.invoke(tool_input)

    rule = "=" * 60
    print(f"\n{Fore.YELLOW}{rule}")
    print("FINAL ANSWER")
    print(f"{rule}{Style.RESET_ALL}")
    print(final_answer)