AlexTrinityBlock committed on
Commit
a394be7
·
1 Parent(s): 0019780

refactor(agent): split websearch agent into separate modules

Browse files

Split the monolithic websearch.py into websearchagent.py and websearchagents.py
to improve code organization and maintainability. The supervisor agent now
has access to both the individual websearch_agent tool and the aggregated web_search_agents tool.

agent/agent.py CHANGED
@@ -4,7 +4,9 @@ from colorama import Fore, Style # type: ignore[import]
4
  from langchain.agents import create_agent
5
  from langchain_core.messages import HumanMessage
6
  from agent.tools.math_solver import math_solver
7
- from agent.agents.websearch import websearch_agent
 
 
8
 
9
  load_dotenv()
10
 
@@ -13,7 +15,7 @@ def supervisor_agent():
13
  """Return a supervisor agent instance with math_solver and websearch_agent."""
14
  return create_agent(
15
  model="google_genai:gemini-3-flash-preview",
16
- tools=[math_solver, websearch_agent],
17
  system_prompt=(
18
  f"You are a supervisor agent. "
19
  f"Current time is: {datetime.now(timezone.utc).isoformat()}. "
 
4
  from langchain.agents import create_agent
5
  from langchain_core.messages import HumanMessage
6
  from agent.tools.math_solver import math_solver
7
+
8
+ from agent.agents.websearchagents import web_search_agents
9
+ from agent.agents.websearchagent import websearch_agent
10
 
11
  load_dotenv()
12
 
 
15
  """Return a supervisor agent instance with math_solver and websearch_agent."""
16
  return create_agent(
17
  model="google_genai:gemini-3-flash-preview",
18
+ tools=[math_solver, websearch_agent, web_search_agents],
19
  system_prompt=(
20
  f"You are a supervisor agent. "
21
  f"Current time is: {datetime.now(timezone.utc).isoformat()}. "
agent/agents/__init__.py CHANGED
@@ -1,3 +1,4 @@
1
- from agent.agents.websearch import websearch_agent
 
2
 
3
- __all__ = ["websearch_agent"]
 
1
+ from agent.agents.websearchagent import websearch_agent
2
+ from agent.agents.websearchagents import web_search_agents
3
 
4
+ __all__ = ["websearch_agent", "web_search_agents"]
agent/agents/{websearch.py → websearchagent.py} RENAMED
@@ -9,10 +9,17 @@ from agent.tools.search import web_search
9
  @tool
10
  def websearch_agent(query: str) -> str:
11
  """
12
- A web search agent that searches the internet and returns an answer.
13
- Use this tool when you need to find real-time or factual information
14
- from the web, such as current events, specific facts, or any
15
- knowledge that may require up-to-date sources.
 
 
 
 
 
 
 
16
 
17
  Args:
18
  query: The question or search query to look up on the web.
@@ -32,7 +39,7 @@ def websearch_agent(query: str) -> str:
32
  try:
33
  result = base_agent.invoke(
34
  {"messages": [{"role": "user", "content": query}]},
35
- config={"recursion_limit": 6},
36
  )
37
  content = result["messages"][-1].content
38
  if isinstance(content, list):
@@ -44,6 +51,13 @@ def websearch_agent(query: str) -> str:
44
  f"{Fore.RED}[WebSearchAgent] Recursion limit reached, returning partial results.{Style.RESET_ALL}"
45
  )
46
  content = "Search completed but no definitive answer was found within the allowed steps."
 
 
 
 
 
 
 
47
  print(
48
  f"{Fore.YELLOW}[WebSearchAgent -> SupervisorAgent] {content}{Style.RESET_ALL}"
49
  )
 
9
  @tool
10
  def websearch_agent(query: str) -> str:
11
  """
12
+ A single web search agent that searches the internet and returns an answer.
13
+ Use this tool when you need to find real-time or factual information from the web.
14
+
15
+ Pros:
16
+ - Has continuous memory across search steps, allowing deep investigation on a single topic.
17
+ Cons:
18
+ - Narrow field of view, can only follow one search thread at a time.
19
+ - May fail after too many steps due to token limit overflow.
20
+
21
+ Prefer web_search_agents for complex questions requiring broad, multi-source research.
22
+ Use this tool for simple, direct factual lookups.
23
 
24
  Args:
25
  query: The question or search query to look up on the web.
 
39
  try:
40
  result = base_agent.invoke(
41
  {"messages": [{"role": "user", "content": query}]},
42
+ # config={"recursion_limit": 10},
43
  )
44
  content = result["messages"][-1].content
45
  if isinstance(content, list):
 
51
  f"{Fore.RED}[WebSearchAgent] Recursion limit reached, returning partial results.{Style.RESET_ALL}"
52
  )
53
  content = "Search completed but no definitive answer was found within the allowed steps."
54
+ except Exception as e:
55
+ error_msg = str(e)
56
+ print(f"{Fore.RED}[WebSearchAgent] Error: {error_msg}{Style.RESET_ALL}")
57
+ content = (
58
+ f"Search agent failed with error: {error_msg}. "
59
+ f"Recommend retrying with the web_search_agents tool to avoid context length overflow."
60
+ )
61
  print(
62
  f"{Fore.YELLOW}[WebSearchAgent -> SupervisorAgent] {content}{Style.RESET_ALL}"
63
  )
agent/agents/websearchagents.py ADDED
@@ -0,0 +1,309 @@
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
+ import os
2
+ import random
3
+ from concurrent.futures import ThreadPoolExecutor, as_completed
4
+ from datetime import datetime, timezone
5
+ from colorama import Fore, Style # type: ignore[import]
6
+ from langchain.agents import create_agent
7
+ from langchain_core.tools import tool
8
+ from pydantic import BaseModel, Field
9
+ from tavily import TavilyClient # type: ignore[import]
10
+
11
+
12
# Colour palette for sub-agent log lines; a random entry is picked per message
# (presumably so output from parallel workers is easier to tell apart).
SUBAGENT_COLORS = [
    Fore.MAGENTA, Fore.CYAN, Fore.GREEN, Fore.YELLOW, Fore.BLUE, Fore.WHITE,
    Fore.LIGHTRED_EX, Fore.LIGHTGREEN_EX, Fore.LIGHTYELLOW_EX,
    Fore.LIGHTBLUE_EX, Fore.LIGHTMAGENTA_EX, Fore.LIGHTCYAN_EX,
]

# Character cap used when slicing the prompts sent to the sub-agent and
# combiner models.
MAX_CHARS = 900_000
28
+
29
+
30
class ExpandedQueries(BaseModel):
    """A list of expanded search queries derived from the original query."""

    # Required field (no default): the search strings produced by the
    # query-expansion step.
    queries: list[str] = Field(
        ...,
        description="A list of expanded search queries to cover different angles of the original question.",
    )
36
+
37
+
38
+ # ---------------------------------------------------------------------------
39
+ # Step 1: Query Expansion (structured output)
40
+ # ---------------------------------------------------------------------------
41
+
42
+
43
def expand_queries(origin_question: str, query: str) -> list[str]:
    """Expand one search query into several via a structured-output agent.

    Args:
        origin_question: The user's original question, sent to the model as
            context alongside the query.
        query: The query to expand.

    Returns:
        The expanded queries with surrounding whitespace stripped and blanks
        and duplicates removed (first-seen order kept). Falls back to
        ``[query]`` when the model yields no usable query, so callers never
        receive an empty list.
    """
    print(f"{Fore.CYAN}[QueryExpander] Expanding: {query}{Style.RESET_ALL}")

    agent = create_agent(
        model="google_genai:gemini-3-flash-preview",
        response_format=ExpandedQueries,
        system_prompt=(
            f"Current time is: {datetime.now(timezone.utc).isoformat()}. "
            "You are a search query expansion expert. "
            "Given a user question, generate 3 diverse and specific search queries "
            "that cover different angles of the question to maximize search coverage. "
            "Each query should be concise and optimized for web search engines."
        ),
    )

    result = agent.invoke(
        {
            "messages": [
                {
                    "role": "user",
                    "content": (
                        f"Original question: {origin_question}\n"
                        f"Query to expand: {query}"
                    ),
                }
            ]
        }
    )

    # NOTE(review): the structured response may be missing if the model did not
    # produce valid structured output - treat that as "no expansions".
    expanded = result.get("structured_response")
    raw_queries = expanded.queries if expanded is not None else []

    # dict.fromkeys de-duplicates while preserving first-seen order.
    queries = list(
        dict.fromkeys(q.strip() for q in raw_queries if q and q.strip())
    )
    if not queries:
        # An empty list would make the downstream thread pool fail
        # (max_workers=0), so search with the caller's query instead.
        print(
            f"{Fore.RED}[QueryExpander] No expanded queries returned; "
            f"using the original query.{Style.RESET_ALL}"
        )
        queries = [query]

    for i, q in enumerate(queries, 1):
        print(f"{Fore.CYAN} [{i}] {q}{Style.RESET_ALL}")

    return queries
78
+
79
+
80
+ # ---------------------------------------------------------------------------
81
+ # Step 2: Parallel Tavily Search & Extract
82
+ # ---------------------------------------------------------------------------
83
+
84
+
85
def _search_single_query(query: str) -> list[dict]:
    """Run one Tavily search and attach the extracted page text to each hit.

    Args:
        query: The search string sent to Tavily.

    Returns:
        One dict per search hit with the keys ``url``, ``title``, ``snippet``
        and ``full_content``. ``full_content`` is the placeholder
        ``"Extraction failed."`` for any URL the extraction step did not
        return. An empty list is returned when the search has no results.
    """
    tavily = TavilyClient(api_key=os.environ["TAVILY_API_KEY"])

    response = tavily.search(query=query, search_depth="advanced", max_results=3)
    hits = response.get("results", [])
    if not hits:
        return []

    hit_urls = [hit["url"] for hit in hits]

    # Extraction is best-effort: on any failure the hits are still returned,
    # just without full page content.
    content_by_url = {}
    try:
        extraction = tavily.extract(
            urls=hit_urls, extract_depth="advanced", format="markdown"
        )
        content_by_url = {
            item["url"]: item["raw_content"] for item in extraction.get("results", [])
        }
    except Exception as e:
        print(f"{Fore.RED} Extraction failed: {e}{Style.RESET_ALL}")
        content_by_url = {}

    pages = []
    for hit in hits:
        pages.append(
            {
                "url": hit["url"],
                "title": hit["title"],
                "snippet": hit["content"],
                "full_content": content_by_url.get(hit["url"], "Extraction failed."),
            }
        )
    return pages
115
+
116
+
117
def search_and_extract_parallel(queries: list[str]) -> list[dict]:
    """Search every query in its own thread and merge the hits, deduplicated by URL.

    Args:
        queries: Search strings to run; one worker thread is used per query.

    Returns:
        The page dicts produced by ``_search_single_query`` for all queries,
        keeping only the first occurrence of each URL. Queries whose search
        raises are logged and skipped. Returns an empty list when ``queries``
        is empty.
    """
    if not queries:
        # ThreadPoolExecutor raises ValueError for max_workers=0, so an empty
        # query list must short-circuit before the pool is created.
        return []

    seen_urls: set[str] = set()
    all_results: list[dict] = []

    with ThreadPoolExecutor(max_workers=len(queries)) as pool:
        futures = {pool.submit(_search_single_query, q): q for q in queries}
        for future in as_completed(futures):
            q = futures[future]
            try:
                items = future.result()
            except Exception as e:
                # Best-effort: one failed query must not discard the others.
                print(f"{Fore.RED}[Search & Extract] Error: {e}{Style.RESET_ALL}")
                continue

            # Report success only once the result is actually available.
            print(f"{Fore.GREEN}[Search & Extract] Done: {q}{Style.RESET_ALL}")
            for item in items:
                if item["url"] not in seen_urls:
                    seen_urls.add(item["url"])
                    all_results.append(item)

    print(
        f"{Fore.GREEN}[SearchAgents] Collected {len(all_results)} unique pages.{Style.RESET_ALL}"
    )
    return all_results
139
+
140
+
141
+ # ---------------------------------------------------------------------------
142
+ # Step 3: SubAgent — investigate a single page
143
+ # ---------------------------------------------------------------------------
144
+
145
+
146
def subagent(origin_question: str, query: str) -> str:
    """Ask a model to extract everything on one web page relevant to the question.

    Args:
        origin_question: The user's original question.
        query: The page to investigate, as text (the caller passes URL, title
            and full page content).

    Returns:
        The model's findings as a string. The combined prompt (question plus
        page text) is truncated to ``MAX_CHARS`` characters before it is sent.
    """
    prompt = (f"Original question: {origin_question}\n\nWeb page content:\n{query}")[
        :MAX_CHARS
    ]

    color = random.choice(SUBAGENT_COLORS)
    print(f"{color}[SubAgent] Investigating ({len(prompt)} chars)...{Style.RESET_ALL}")

    agent = create_agent(
        model="google_genai:gemini-3-flash-preview",
        system_prompt=(
            f"Current time is: {datetime.now(timezone.utc).isoformat()}. "
            "You are a research analyst. You are given a web page's full content "
            "and an original question. Extract ALL relevant clues, facts, data, "
            "and details from the page that help answer the original question. "
            "Be thorough and precise. Include specific numbers, names, and dates."
        ),
    )

    result = agent.invoke({"messages": [{"role": "user", "content": prompt}]})
    content = result["messages"][-1].content
    if isinstance(content, list):
        # Message content may be a list of blocks (plain strings or dicts).
        # Collect the text of every block instead of assuming a non-empty list
        # whose first element is a dict: that assumption raised on [] / str
        # blocks and dropped any text after the first block.
        content = "".join(
            block if isinstance(block, str) else str(block.get("text") or "")
            for block in content
            if isinstance(block, (str, dict))
        )
    return str(content)
175
+
176
+
177
+ # ---------------------------------------------------------------------------
178
+ # Step 4: Combine all subagent findings
179
+ # ---------------------------------------------------------------------------
180
+
181
+
182
def combine_result_agent(origin_question: str, query: str) -> str:
    """Synthesize the per-source findings into one answer to the question.

    Args:
        origin_question: The user's original question.
        query: All sub-agent findings joined into a single string.

    Returns:
        The model's synthesized answer as a string. The combined prompt
        (question plus findings) is truncated to ``MAX_CHARS`` characters
        before it is sent.
    """
    prompt = (
        f"Original question: {origin_question}\n\n"
        f"Research findings from multiple sources:\n{query}"
    )[:MAX_CHARS]

    print(
        f"{Fore.BLUE}[CombineAgent] Synthesizing ({len(prompt)} chars)...{Style.RESET_ALL}"
    )

    agent = create_agent(
        model="google_genai:gemini-3-flash-preview",
        system_prompt=(
            f"Current time is: {datetime.now(timezone.utc).isoformat()}. "
            "You are a research synthesizer. You receive findings from multiple "
            "web sources investigating a question. Combine them into a single, "
            "comprehensive, well-structured answer. Cite the source URL for each "
            "key fact. Resolve any contradictions between sources."
        ),
    )

    result = agent.invoke({"messages": [{"role": "user", "content": prompt}]})
    content = result["messages"][-1].content
    if isinstance(content, list):
        # Message content may be a list of blocks (plain strings or dicts).
        # Collect the text of every block instead of assuming a non-empty list
        # whose first element is a dict: that assumption raised on [] / str
        # blocks and dropped any text after the first block.
        content = "".join(
            block if isinstance(block, str) else str(block.get("text") or "")
            for block in content
            if isinstance(block, (str, dict))
        )
    return str(content)
212
+
213
+
214
+ # ---------------------------------------------------------------------------
215
+ # Step 5: Main orchestrator tool
216
+ # ---------------------------------------------------------------------------
217
+
218
+
219
@tool
def web_search_agents(origin_question: str, query: str) -> str:
    """
    A multi-agent web search tool that expands the query, searches in parallel,
    investigates each page with subagents, and synthesizes a final answer.

    Pros:
    - Dispatches multiple subagents for deep, parallel investigation across many sources.
    - Can achieve both broad and deep research when queries are well-crafted.
    Cons:
    - Requires more detailed and transparent query descriptions for good control.
    - Each subagent has no long-term memory (context is kept short to avoid token limit failures).

    Use this tool for complex questions that require deep web research from multiple sources.
    For simple factual lookups, prefer websearch_agent instead.

    Args:
        origin_question: The original user question for context. Must be detailed and clear.
        query: The specific search query to research. Be as specific and transparent as possible.
    """
    # NOTE(review): the docstring above is deliberately left unchanged -
    # presumably @tool exposes it to the calling model as the tool description;
    # confirm before rewording it.
    #
    # Returns the synthesized answer, or a short plain-text notice when the
    # search yields no pages or when no page could be investigated.
    print(f"\n{Fore.YELLOW}{'=' * 60}")
    print("[WebSearchAgents] Starting research")
    print(f" Origin : {origin_question}")
    print(f" Query : {query}")
    print(f"{'=' * 60}{Style.RESET_ALL}\n")

    # 1. Expand queries. An empty expansion would make the search thread pool
    # fail (max_workers=0), so fall back to the caller's own query.
    expanded = expand_queries(origin_question, query) or [query]

    # 2. Parallel Tavily search & extract
    pages = search_and_extract_parallel(expanded)
    if not pages:
        return "No search results found."

    # 3. Parallel subagent investigation
    print(
        f"\n{Fore.MAGENTA}[SubAgents] Dispatching {len(pages)} subagents...{Style.RESET_ALL}"
    )
    subagent_results: list[str] = []

    def _run_subagent(page: dict) -> str:
        """Investigate one page and label the finding with its source URL."""
        page_input = (
            f"URL: {page['url']}\nTitle: {page['title']}\n\n{page['full_content']}"
        )
        finding = subagent(origin_question, page_input)
        return f"### Source: {page['url']}\n{finding}"

    # At most 5 investigations run concurrently.
    with ThreadPoolExecutor(max_workers=min(len(pages), 5)) as pool:
        futures = {pool.submit(_run_subagent, p): p for p in pages}
        for future in as_completed(futures):
            page = futures[future]
            try:
                finding = future.result()
                subagent_results.append(finding)
                color = random.choice(SUBAGENT_COLORS)
                print(f"{color}[SubAgent] Done: {page['url']}{Style.RESET_ALL}")
            except Exception as e:
                # Best-effort: a failed page is logged and left out of the synthesis.
                print(
                    f"{Fore.RED}[SubAgent] Error on {page['url']}: {e}{Style.RESET_ALL}"
                )

    if not subagent_results:
        # Every investigation failed. Asking the synthesizer to combine empty
        # findings would invite an answer with no sources behind it.
        return (
            "Search results were found, but every page investigation failed, "
            "so no answer could be synthesized."
        )

    # 4. Combine results
    combined_input = "\n\n---\n\n".join(subagent_results)
    result = combine_result_agent(origin_question, combined_input)

    print(f"\n{Fore.YELLOW}{'=' * 60}")
    print("[WebSearchAgents] Research complete")
    print(f"{'=' * 60}")
    print(f"[WebSearchAgents -> SupervisorAgent] {result}")
    print(f"{'=' * 60}{Style.RESET_ALL}\n")

    return result
291
+
292
+
293
+ # ---------------------------------------------------------------------------
294
+ # Test
295
+ # ---------------------------------------------------------------------------
296
+
297
if __name__ == "__main__":
    # Manual smoke test: load environment variables from .env, run the tool
    # once with the same text as both the original question and the query,
    # and print the answer under a banner.
    from dotenv import load_dotenv

    load_dotenv()

    test_query = "What is LangGraph?"
    payload = {"origin_question": test_query, "query": test_query}
    answer = web_search_agents.invoke(payload)

    banner = "=" * 60
    print(f"\n{Fore.YELLOW}{banner}")
    print("FINAL ANSWER")
    print(f"{banner}{Style.RESET_ALL}")
    print(answer)
agent/tools/search.py CHANGED
@@ -1,5 +1,5 @@
1
  import os
2
- from colorama import Fore, Style
3
  from langchain_core.tools import tool
4
  from tavily import TavilyClient # type: ignore[import]
5
 
 
1
  import os
2
+ from colorama import Fore, Style # type: ignore[import]
3
  from langchain_core.tools import tool
4
  from tavily import TavilyClient # type: ignore[import]
5