| from __future__ import annotations |
|
|
| from typing import Annotated, List |
| from datetime import datetime |
|
|
| import gradio as gr |
| from ddgs import DDGS |
|
|
| from app import _log_call_end, _log_call_start, _search_rate_limiter, _truncate_for_log |
| from ._docstrings import autodoc |
|
|
|
|
| |
| TOOL_SUMMARY = ( |
| "Run a DuckDuckGo-backed search across text, news, images, videos, or books. " |
| "Readable results include pagination hints and next_offset when more results are available; " |
| "Use in combination with `Web_Fetch` to navigate the web." |
| ) |
|
|
|
|
| _SAFESEARCH_LEVEL = "off" |
|
|
| |
| BACKEND_CHOICES = [ |
| "auto", |
| "duckduckgo", |
| "bing", |
| "brave", |
| "yahoo", |
| "wikipedia", |
| ] |
|
|
| |
| _ALLOWED_BACKENDS = { |
| "text": ["duckduckgo", "bing", "brave", "yahoo", "wikipedia"], |
| "news": ["duckduckgo", "bing", "yahoo"], |
| "images": ["duckduckgo"], |
| "videos": ["duckduckgo"], |
| "books": ["annasarchive"], |
| } |
|
|
| |
| _AUTO_ORDER = { |
| "text": ["duckduckgo", "bing", "brave", "yahoo"], |
| "news": ["duckduckgo", "bing", "yahoo"], |
| "images": ["duckduckgo"], |
| "videos": ["duckduckgo"], |
| "books": ["annasarchive"], |
| } |
|
|
| |
| DATE_FILTER_CHOICES = ["any", "day", "week", "month", "year"] |
|
|
|
|
| def _resolve_backend(search_type: str, backend_choice: str) -> str: |
| """Resolve backend string for DDGS based on search type and user choice. |
| |
| - If backend_choice is "auto", return a comma-separated fallback order for that type. |
| - If backend_choice is not supported by the type, fall back to the first allowed backend. |
| - Books endpoint uses only 'annasarchive'. |
| """ |
| stype = search_type if search_type in _ALLOWED_BACKENDS else "text" |
| allowed = _ALLOWED_BACKENDS[stype] |
| if backend_choice == "auto": |
| return ", ".join(_AUTO_ORDER[stype]) |
| if stype == "books": |
| return "annasarchive" |
| |
| if backend_choice in allowed: |
| return backend_choice |
| |
| return allowed[0] |
|
|
|
|
| def _resolve_timelimit(date_filter: str, search_type: str) -> str | None: |
| """Map UI date filter to DDGS timelimit code per endpoint. |
| |
| Returns one of: None, 'd', 'w', 'm', 'y'. For news/videos (which support d/w/m), |
| selecting 'year' will coerce to 'm' to stay within supported range. |
| """ |
| normalized = (date_filter or "any").strip().lower() |
| if normalized in ("any", "none", ""): |
| return None |
| mapping = { |
| "day": "d", |
| "week": "w", |
| "month": "m", |
| "year": "y", |
| } |
| code = mapping.get(normalized) |
| if not code: |
| return None |
| if search_type in ("news", "videos") and code == "y": |
| return "m" |
| return code |
|
|
|
|
| def _extract_date_from_snippet(snippet: str) -> str: |
| if not snippet: |
| return "" |
| import re |
|
|
| date_patterns = [ |
| r"\b(\d{4}[-/]\d{1,2}[-/]\d{1,2})\b", |
| r"\b([A-Za-z]{3,9}\s+\d{1,2},?\s+\d{4})\b", |
| r"\b(\d{1,2}\s+[A-Za-z]{3,9}\s+\d{4})\b", |
| r"\b(\d+\s+(?:day|week|month|year)s?\s+ago)\b", |
| r"(?:Published|Updated|Posted):\s*([^,\n]+?)(?:[,\n]|$)", |
| ] |
| for pattern in date_patterns: |
| matches = re.findall(pattern, snippet, re.IGNORECASE) |
| if matches: |
| return matches[0].strip() |
| return "" |
|
|
|
|
| def _format_search_result(result: dict, search_type: str, index: int) -> List[str]: |
| lines: List[str] = [] |
| if search_type == "text": |
| title = result.get("title", "").strip() |
| url = result.get("href", "").strip() |
| snippet = result.get("body", "").strip() |
| date = _extract_date_from_snippet(snippet) |
| lines.append(f"{index}. {title}") |
| lines.append(f" URL: {url}") |
| if snippet: |
| lines.append(f" Summary: {snippet}") |
| if date: |
| lines.append(f" Date: {date}") |
| elif search_type == "news": |
| title = result.get("title", "").strip() |
| url = result.get("url", "").strip() |
| body = result.get("body", "").strip() |
| date = result.get("date", "").strip() |
| source = result.get("source", "").strip() |
| lines.append(f"{index}. {title}") |
| lines.append(f" URL: {url}") |
| if source: |
| lines.append(f" Source: {source}") |
| if date: |
| lines.append(f" Date: {date}") |
| if body: |
| lines.append(f" Summary: {body}") |
| elif search_type == "images": |
| title = result.get("title", "").strip() |
| image_url = result.get("image", "").strip() |
| source_url = result.get("url", "").strip() |
| source = result.get("source", "").strip() |
| width = result.get("width", "") |
| height = result.get("height", "") |
| lines.append(f"{index}. {title}") |
| lines.append(f" Image: {image_url}") |
| lines.append(f" Source: {source_url}") |
| if source: |
| lines.append(f" Publisher: {source}") |
| if width and height: |
| lines.append(f" Dimensions: {width}x{height}") |
| elif search_type == "videos": |
| title = result.get("title", "").strip() |
| description = result.get("description", "").strip() |
| duration = result.get("duration", "").strip() |
| published = result.get("published", "").strip() |
| uploader = result.get("uploader", "").strip() |
| embed_url = result.get("embed_url", "").strip() |
| lines.append(f"{index}. {title}") |
| if embed_url: |
| lines.append(f" Video: {embed_url}") |
| if uploader: |
| lines.append(f" Uploader: {uploader}") |
| if duration: |
| lines.append(f" Duration: {duration}") |
| if published: |
| lines.append(f" Published: {published}") |
| if description: |
| lines.append(f" Description: {description}") |
| elif search_type == "books": |
| title = result.get("title", "").strip() |
| url = result.get("url", "").strip() |
| body = result.get("body", "").strip() |
| lines.append(f"{index}. {title}") |
| lines.append(f" URL: {url}") |
| if body: |
| lines.append(f" Description: {body}") |
| return lines |
|
|
|
|
def _merge_engine_results(search_fn, query: str, engines: List[str], key_field: str, **search_kwargs) -> list[dict]:
    """Query ``engines`` one at a time and merge their results without duplicates.

    ``search_fn`` is called as ``search_fn(query, backend=engine, **search_kwargs)``.
    Items are de-duplicated on the stripped value of ``key_field``; items whose
    key is missing or empty are dropped. An engine that raises is skipped so
    the remaining engines can still contribute (deliberate best effort).
    """
    merged: list[dict] = []
    seen: set[str] = set()
    for engine in engines:
        try:
            items = list(search_fn(query, backend=engine, **search_kwargs))
        except Exception:
            continue
        for item in items:
            key = (item.get(key_field, "") or "").strip()
            if key and key not in seen:
                seen.add(key)
                merged.append(item)
    return merged


def _describe_search_error(exc: Exception) -> str | None:
    """Map a backend exception to a user-facing message.

    Returns ``None`` when the exception text only indicates that nothing was
    found, so the caller can treat it as an empty result set.
    """
    lowered = str(exc).lower()
    if "blocked" in lowered or "rate" in lowered:
        return "Search temporarily blocked due to rate limiting. Please try again in a few minutes."
    if "timeout" in lowered:
        return "Search timed out. Please try again with a simpler query."
    if "network" in lowered or "connection" in lowered:
        return "Network connection error. Please check your internet connection and try again."
    if "no results" in lowered or "not found" in lowered:
        return None
    return f"Search failed: {str(exc)[:200]}"


# Web_Search: run a search of the requested type and return a readable,
# paginated text report (or a plain-text error / "no results" message).
#
# - ``offset`` > 0 takes precedence over ``page``.
# - With backend "auto", text and news searches query each engine in turn and
#   merge the unique results; other types use the resolved backend directly.
# - A news search that finds nothing is retried once as a text search.
#
# NOTE(review): no docstring is defined here on purpose; presumably ``autodoc``
# builds it from ``summary`` and the ``Annotated`` parameters. Confirm before
# adding one.
@autodoc(
    summary=TOOL_SUMMARY,
)
def Web_Search(
    query: Annotated[str, "The search query (supports operators like site:, quotes, OR)."],
    max_results: Annotated[int, "Number of results to return (1–20)."] = 5,
    page: Annotated[int, "Page number for pagination (1-based, each page contains max_results items)."] = 1,
    offset: Annotated[int, "Result offset to start from (overrides page if > 0, for precise continuation)."] = 0,
    search_type: Annotated[str, "Type of search: 'text' (web pages), 'news', 'images', 'videos', or 'books'."] = "text",
    backend: Annotated[str, "Search backend or ordered fallbacks. Use 'auto' for recommended order."] = "auto",
    date_filter: Annotated[str, "Time filter: any, day, week, month, year."] = "any",
) -> str:
    _log_call_start(
        "Web_Search",
        query=query,
        max_results=max_results,
        page=page,
        search_type=search_type,
        offset=offset,
        backend=backend,
        date_filter=date_filter,
    )

    def _finish(message: str) -> str:
        # Every early exit logs the (truncated) message it returns.
        _log_call_end("Web_Search", _truncate_for_log(message))
        return message

    if not query or not query.strip():
        return _finish("No search query provided. Please enter a search term.")

    # Clamp the numeric inputs and normalise the search type.
    max_results = max(1, min(20, max_results))
    page = max(1, page)
    offset = max(0, offset)
    if isinstance(search_type, str):
        search_type = search_type.strip().lower()
    if search_type not in ("text", "news", "images", "videos", "books"):
        search_type = "text"

    if offset > 0:
        actual_offset = offset
        calculated_page = (offset // max_results) + 1
    else:
        actual_offset = (page - 1) * max_results
        calculated_page = page
    total_needed = actual_offset + max_results
    original_search_type = search_type
    backend_choice = (backend or "auto").strip().lower()

    def _perform_search(stype: str) -> list[dict]:
        # Resolve per search type so the news -> text fallback gets the
        # backend and time limit that apply to text, not those of news.
        stype_backend = _resolve_backend(stype, backend_choice)
        stype_timelimit = _resolve_timelimit(date_filter, stype)
        # Over-fetch slightly so the "more results available" hint can be shown.
        fetch_count = total_needed + 10
        try:
            _search_rate_limiter.acquire()
            with DDGS() as ddgs:
                if stype in ("text", "news") and backend_choice == "auto":
                    engines = ["duckduckgo"] + [b for b in _AUTO_ORDER[stype] if b != "duckduckgo"]
                    return _merge_engine_results(
                        ddgs.text if stype == "text" else ddgs.news,
                        query,
                        engines,
                        "href" if stype == "text" else "url",
                        max_results=fetch_count,
                        safesearch=_SAFESEARCH_LEVEL,
                        timelimit=stype_timelimit,
                    )
                if stype == "books":
                    # The books endpoint is called without safesearch/timelimit.
                    found = ddgs.books(query, max_results=fetch_count, backend=stype_backend)
                else:
                    # stype is one of text/news/images/videos here, each of
                    # which is a DDGS method taking the same keyword arguments.
                    found = getattr(ddgs, stype)(
                        query,
                        max_results=fetch_count,
                        safesearch=_SAFESEARCH_LEVEL,
                        timelimit=stype_timelimit,
                        backend=stype_backend,
                    )
                try:
                    return list(found)
                except Exception as inner_exc:
                    lowered_inner = str(inner_exc).lower()
                    if "no results" in lowered_inner or "not found" in lowered_inner:
                        return []
                    raise
        except Exception as exc:
            message = _describe_search_error(exc)
            if message is None:
                return []
            raise Exception(message) from exc

    try:
        raw = _perform_search(search_type)
    except Exception as exc:
        return _finish(f"Error: {exc}")

    used_fallback = False
    tried_fallback = False
    if not raw and search_type == "news":
        tried_fallback = True
        try:
            fallback_raw = _perform_search("text")
        except Exception:
            # Best effort: a failing fallback is reported as "no results".
            fallback_raw = []
        if fallback_raw:
            raw = fallback_raw
            used_fallback = True
            search_type = "text"

    if not raw:
        fallback_note = " (also tried 'text' search as fallback)" if tried_fallback else ""
        return _finish(f"No {original_search_type} results found for query: {query}{fallback_note}")

    paginated_results = raw[actual_offset : actual_offset + max_results]
    if not paginated_results:
        # Word the hint after the pagination style the caller actually used.
        if offset > 0:
            message = (
                f"Offset {actual_offset} exceeds available results ({len(raw)} total). "
                "Try offset=0 to start from beginning."
            )
        else:
            message = (
                f"No {original_search_type} results found on page {calculated_page} for query: {query}. "
                "Try page 1 or reduce page number."
            )
        return _finish(message)

    total_available = len(raw)
    start_num = actual_offset + 1
    end_num = actual_offset + len(paginated_results)
    next_offset = end_num
    search_label = original_search_type.title()
    if used_fallback:
        search_label += " → Text (Smart Fallback)"

    now_dt = datetime.now().astimezone()
    date_str = now_dt.strftime("%A, %B %d, %Y %I:%M %p %Z").strip()
    if not date_str:
        date_str = now_dt.isoformat()

    pagination_info = f"Page {calculated_page}"
    if offset > 0:
        pagination_info = f"Offset {actual_offset} (≈ {pagination_info})"

    lines = [f"Current Date: {date_str}", f"{search_label} search results for: {query}"]
    if used_fallback:
        lines.append("📍 Note: News search returned no results, automatically searched general web content instead")
    lines.append(f"{pagination_info} (results {start_num}-{end_num} of ~{total_available}+ available)\n")
    for number, item in enumerate(paginated_results, start_num):
        # search_type is "text" after a fallback, so items use the text layout.
        lines.extend(_format_search_result(item, search_type, number))
        lines.append("")
    if total_available > end_num:
        lines.append("💡 More results available:")
        lines.append(f" • Next page: page={calculated_page + 1}")
        lines.append(f" • Next offset: offset={next_offset}")
        lines.append(f" • Use offset={next_offset} to continue exactly from result {next_offset + 1}")

    result = "\n".join(lines)
    search_info = f"type={original_search_type}"
    if used_fallback:
        search_info += "→text"
    _log_call_end(
        "Web_Search",
        f"{search_info} page={calculated_page} offset={actual_offset} results={len(paginated_results)} chars={len(result)}",
    )
    return result
|
|
|
|
def build_interface() -> gr.Interface:
    """Assemble the Gradio interface that wraps ``Web_Search``.

    The input components are listed in the same order as the parameters of
    ``Web_Search`` (query, max results, page, offset, search type, backend,
    date filter); the output is a read-only text box.
    """
    query_input = gr.Textbox(
        label="Query",
        placeholder="topic OR site:example.com",
        max_lines=1,
        info="The search query",
    )
    max_results_input = gr.Slider(
        minimum=1,
        maximum=20,
        value=5,
        step=1,
        label="Max results",
        info="Number of results to return (1–20)",
    )
    page_input = gr.Slider(
        minimum=1,
        maximum=10,
        value=1,
        step=1,
        label="Page",
        info="Page number for pagination (ignored if offset > 0)",
    )
    offset_input = gr.Slider(
        minimum=0,
        maximum=1000,
        value=0,
        step=1,
        label="Offset",
        info="Result offset to start from (overrides page if > 0, use next_offset from previous search)",
    )
    search_type_input = gr.Radio(
        label="Search Type",
        choices=["text", "news", "images", "videos", "books"],
        value="text",
        info="Type of content to search for",
    )
    backend_input = gr.Radio(
        label="Backend",
        choices=BACKEND_CHOICES,
        value="auto",
        info="Search engine backend or fallback order (auto applies recommended order)",
    )
    date_filter_input = gr.Radio(
        label="Date filter",
        choices=DATE_FILTER_CHOICES,
        value="any",
        info="Limit results to: day, week, month, or year (varies by type)",
    )
    results_output = gr.Textbox(label="Search Results", interactive=False, lines=20, max_lines=20)

    page_description = (
        '<div style="text-align:center">Multi-type web search with readable output format, date detection, and flexible pagination. '
        "Supports text, news, images, videos, and books. Features smart fallback for news searches and precise offset control.</div>"
    )

    return gr.Interface(
        fn=Web_Search,
        inputs=[
            query_input,
            max_results_input,
            page_input,
            offset_input,
            search_type_input,
            backend_input,
            date_filter_input,
        ],
        outputs=results_output,
        title="Web Search",
        description=page_description,
        api_description=TOOL_SUMMARY,
        flagging_mode="never",
        submit_btn="Search",
    )


# Public names exported by this module.
__all__ = ["Web_Search", "build_interface"]
|
|