File size: 11,129 Bytes
a394be7
 
 
 
 
 
 
 
bcdc55d
 
a394be7
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
bcdc55d
a394be7
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
bcdc55d
a394be7
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
bcdc55d
a394be7
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
b70637a
 
 
a394be7
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
import os
import random
from concurrent.futures import ThreadPoolExecutor, as_completed
from datetime import datetime, timezone
from colorama import Fore, Style  # type: ignore[import]
from langchain.agents import create_agent
from langchain_core.tools import tool
from pydantic import BaseModel, Field

from agent.api.api import get_llm
from tavily import TavilyClient  # type: ignore[import]


# Palette of terminal colors used to visually distinguish the log output of
# concurrently running subagents; each subagent picks one at random.
SUBAGENT_COLORS = [
    Fore.MAGENTA,
    Fore.CYAN,
    Fore.GREEN,
    Fore.YELLOW,
    Fore.BLUE,
    Fore.WHITE,
    Fore.LIGHTRED_EX,
    Fore.LIGHTGREEN_EX,
    Fore.LIGHTYELLOW_EX,
    Fore.LIGHTBLUE_EX,
    Fore.LIGHTMAGENTA_EX,
    Fore.LIGHTCYAN_EX,
]

# Hard cap (in characters) on any prompt handed to an LLM agent, to avoid
# blowing past the model's context window on very large extracted pages.
MAX_CHARS = 900000


class ExpandedQueries(BaseModel):
    """Structured-output schema for query expansion.

    Returned by the expansion agent via `response_format`; holds the list of
    search queries derived from the user's original question.
    """

    queries: list[str] = Field(
        description="A list of expanded search queries to cover different angles of the original question."
    )


# ---------------------------------------------------------------------------
# Step 1: Query Expansion (structured output)
# ---------------------------------------------------------------------------


def expand_queries(origin_question: str, query: str) -> list[str]:
    """Expand a single search query into several diverse queries.

    Uses an LLM agent with structured output (`ExpandedQueries`) to produce
    queries covering different angles of the original question.

    Args:
        origin_question: The user's original question, for context.
        query: The specific query to expand.

    Returns:
        The list of expanded search query strings.
    """
    print(f"{Fore.CYAN}[QueryExpander] Expanding: {query}{Style.RESET_ALL}")

    system_prompt = (
        f"Current time is: {datetime.now(timezone.utc).isoformat()}. "
        "You are a search query expansion expert. "
        "Given a user question, generate 3 diverse and specific search queries "
        "that cover different angles of the question to maximize search coverage. "
        "Each query should be concise and optimized for web search engines."
    )
    expander = create_agent(
        model=get_llm(),
        response_format=ExpandedQueries,
        system_prompt=system_prompt,
    )

    user_message = {
        "role": "user",
        "content": (
            f"Original question: {origin_question}\n"
            f"Query to expand: {query}"
        ),
    }
    response = expander.invoke({"messages": [user_message]})
    structured: ExpandedQueries = response["structured_response"]

    for idx, expanded_query in enumerate(structured.queries, start=1):
        print(f"{Fore.CYAN}  [{idx}] {expanded_query}{Style.RESET_ALL}")

    return structured.queries


# ---------------------------------------------------------------------------
# Step 2: Parallel Tavily Search & Extract
# ---------------------------------------------------------------------------


def _search_single_query(query: str) -> list[dict]:
    """Run one Tavily search and attach full extracted page content.

    Args:
        query: The search query string.

    Returns:
        A list of dicts with `url`, `title`, `snippet`, and `full_content`
        keys; empty if the search returned no results.
    """
    tavily = TavilyClient(api_key=os.environ["TAVILY_API_KEY"])

    response = tavily.search(query=query, search_depth="advanced", max_results=3)
    hits = response.get("results", [])
    if not hits:
        return []

    # Bulk-extract full page content for every hit; on failure, fall back to
    # snippets only rather than aborting the whole query.
    try:
        extraction = tavily.extract(
            urls=[hit["url"] for hit in hits],
            extract_depth="advanced",
            format="markdown",
        )
        content_by_url = {
            entry["url"]: entry["raw_content"]
            for entry in extraction.get("results", [])
        }
    except Exception as e:
        print(f"{Fore.RED}  Extraction failed: {e}{Style.RESET_ALL}")
        content_by_url = {}

    pages: list[dict] = []
    for hit in hits:
        pages.append(
            {
                "url": hit["url"],
                "title": hit["title"],
                "snippet": hit["content"],
                "full_content": content_by_url.get(hit["url"], "Extraction failed."),
            }
        )
    return pages


def search_and_extract_parallel(queries: list[str]) -> list[dict]:
    """Search all expanded queries in parallel threads and deduplicate by URL.

    Args:
        queries: Expanded search queries; each is searched independently.

    Returns:
        A list of page dicts (url/title/snippet/full_content), deduplicated
        by URL across all queries; empty if `queries` is empty.
    """
    # Guard: ThreadPoolExecutor raises ValueError for max_workers=0, so bail
    # out early if query expansion produced nothing.
    if not queries:
        print(
            f"{Fore.GREEN}[SearchAgents] Collected 0 unique pages.{Style.RESET_ALL}"
        )
        return []

    seen_urls: set[str] = set()
    all_results: list[dict] = []

    # Cap the pool size so a large expansion cannot spawn unbounded threads.
    with ThreadPoolExecutor(max_workers=min(len(queries), 8)) as pool:
        futures = {pool.submit(_search_single_query, q): q for q in queries}
        for future in as_completed(futures):
            q = futures[future]
            print(f"{Fore.GREEN}[Search & Extract] Done: {q}{Style.RESET_ALL}")
            try:
                # First occurrence of a URL wins; later duplicates are dropped.
                for item in future.result():
                    if item["url"] not in seen_urls:
                        seen_urls.add(item["url"])
                        all_results.append(item)
            except Exception as e:
                print(f"{Fore.RED}[Search & Extract] Error: {e}{Style.RESET_ALL}")

    print(
        f"{Fore.GREEN}[SearchAgents] Collected {len(all_results)} unique pages.{Style.RESET_ALL}"
    )
    return all_results


# ---------------------------------------------------------------------------
# Step 3: SubAgent — investigate a single page
# ---------------------------------------------------------------------------


def subagent(origin_question: str, query: str) -> str:
    """Investigate one web page's content against the original question.

    Args:
        origin_question: The user's original question.
        query: The URL plus full page content to analyze.

    Returns:
        The agent's extracted findings as a plain string (prompt is truncated
        to MAX_CHARS before being sent).
    """
    combined = f"Original question: {origin_question}\n\nWeb page content:\n{query}"
    prompt = combined[:MAX_CHARS]

    color = random.choice(SUBAGENT_COLORS)
    print(f"{color}[SubAgent] Investigating ({len(prompt)} chars)...{Style.RESET_ALL}")

    system_prompt = (
        f"Current time is: {datetime.now(timezone.utc).isoformat()}. "
        "You are a research analyst. You are given a web page's full content "
        "and an original question. Extract ALL relevant clues, facts, data, "
        "and details from the page that help answer the original question. "
        "Be thorough and precise. Include specific numbers, names, and dates."
    )
    analyst = create_agent(model=get_llm(), system_prompt=system_prompt)

    response = analyst.invoke({"messages": [{"role": "user", "content": prompt}]})
    reply = response["messages"][-1].content
    # Some providers return content as a list of parts; keep only the text.
    if isinstance(reply, list):
        reply = reply[0].get("text", "")
    return str(reply)


# ---------------------------------------------------------------------------
# Step 4: Combine all subagent findings
# ---------------------------------------------------------------------------


def combine_result_agent(origin_question: str, query: str) -> str:
    """Synthesize multiple subagent findings into one comprehensive answer.

    Args:
        origin_question: The user's original question.
        query: All subagent outputs joined together.

    Returns:
        The synthesized answer as a plain string (prompt is truncated to
        MAX_CHARS before being sent).
    """
    combined = (
        f"Original question: {origin_question}\n\n"
        f"Research findings from multiple sources:\n{query}"
    )
    prompt = combined[:MAX_CHARS]

    print(
        f"{Fore.BLUE}[CombineAgent] Synthesizing ({len(prompt)} chars)...{Style.RESET_ALL}"
    )

    system_prompt = (
        f"Current time is: {datetime.now(timezone.utc).isoformat()}. "
        "You are a research synthesizer. You receive findings from multiple "
        "web sources investigating a question. Combine them into a single, "
        "comprehensive, well-structured answer. Cite the source URL for each "
        "key fact. Resolve any contradictions between sources."
    )
    synthesizer = create_agent(model=get_llm(), system_prompt=system_prompt)

    response = synthesizer.invoke({"messages": [{"role": "user", "content": prompt}]})
    reply = response["messages"][-1].content
    # Some providers return content as a list of parts; keep only the text.
    if isinstance(reply, list):
        reply = reply[0].get("text", "")
    return str(reply)


# ---------------------------------------------------------------------------
# Step 5: Main orchestrator tool
# ---------------------------------------------------------------------------


@tool
def web_search_agents(origin_question: str, query: str) -> str:
    """
    A multi-agent web search tool that expands the query, searches in parallel,
    investigates each page with subagents, and synthesizes a final answer.

    Pros:
    - Dispatches multiple subagents for deep, parallel investigation across many sources.
    - Can achieve both broad and deep research when queries are well-crafted.
    Cons:
    - Requires more detailed and transparent query descriptions for good control.
    - Each subagent has no long-term memory (context is kept short to avoid token limit failures).

    Use this tool for complex questions that require deep web research from multiple sources.
    For simple factual lookups, prefer websearch_agent instead.

    Args:
        origin_question: The original user question for context. Must be detailed and clear.
        query: The specific search query to research. Be as specific and transparent as possible.
    """
    print(f"\n{Fore.YELLOW}{'=' * 60}")
    print("[WebSearchAgents] Starting research")
    print(f"  Origin : {origin_question}")
    print(f"  Query  : {query}")
    print(f"{'=' * 60}{Style.RESET_ALL}\n")

    # 1. Expand the query into multiple diverse search queries.
    expanded = expand_queries(origin_question, query)

    # 2. Search & extract all expanded queries in parallel.
    pages = search_and_extract_parallel(expanded)
    if not pages:
        return "No search results found."

    # 3. Dispatch one subagent per page, in parallel.
    print(
        f"\n{Fore.MAGENTA}[SubAgents] Dispatching {len(pages)} subagents...{Style.RESET_ALL}"
    )
    subagent_results: list[str] = []

    def _run_subagent(page: dict) -> str:
        # Package the page's URL, title, and extracted content for one subagent.
        page_input = (
            f"URL: {page['url']}\nTitle: {page['title']}\n\n{page['full_content']}"
        )
        finding = subagent(origin_question, page_input)
        return f"### Source: {page['url']}\n{finding}"

    with ThreadPoolExecutor(max_workers=min(len(pages), 5)) as pool:
        futures = {pool.submit(_run_subagent, p): p for p in pages}
        for future in as_completed(futures):
            page = futures[future]
            try:
                result = future.result()
                subagent_results.append(result)
                color = random.choice(SUBAGENT_COLORS)
                print(f"{color}[SubAgent] Done: {page['url']}{Style.RESET_ALL}")
            except Exception as e:
                print(
                    f"{Fore.RED}[SubAgent] Error on {page['url']}: {e}{Style.RESET_ALL}"
                )

    # Guard: if every subagent raised, there is nothing to synthesize —
    # previously the synthesizer was invoked on an empty string.
    if not subagent_results:
        return "No search results found."

    # 4. Combine all findings into a single answer.
    combined_input = "\n\n---\n\n".join(subagent_results)
    result = combine_result_agent(origin_question, combined_input)

    print(f"[WebSearchAgents -> SupervisorAgent] {result}")
    print(f"{'=' * 60}{Style.RESET_ALL}\n")

    return result


# ---------------------------------------------------------------------------
# Test
# ---------------------------------------------------------------------------

# Manual smoke test: run the full multi-agent pipeline on a sample question.
if __name__ == "__main__":
    from dotenv import load_dotenv

    load_dotenv()

    question = "What is LangGraph?"
    final_answer = web_search_agents.invoke(
        {"origin_question": question, "query": question}
    )

    banner = "=" * 60
    print(f"\n{Fore.YELLOW}{banner}")
    print("FINAL ANSWER")
    print(f"{banner}{Style.RESET_ALL}")
    print(final_answer)