| import requests |
| import pandas as pd |
| from PIL import Image |
| import os |
| import subprocess |
| from bs4 import BeautifulSoup |
| import urllib.parse |
|
|
| def web_search(query: str, api_key: str = None) -> str: |
| """ |
| Perform web search using SerpAPI if available, otherwise fallback to DuckDuckGo scraping. |
| """ |
| if api_key and api_key != "your-serpapi-key-here": |
| return _serpapi_search(query, api_key) |
| else: |
| return _duckduckgo_search(query) |
|
|
| def _serpapi_search(query: str, api_key: str) -> str: |
| """Search using SerpAPI.""" |
| try: |
| url = f"https://serpapi.com/search" |
| params = { |
| "q": query, |
| "api_key": api_key, |
| "engine": "google" |
| } |
| response = requests.get(url, params=params, timeout=10) |
| response.raise_for_status() |
| |
| results = response.json() |
| organic_results = results.get("organic_results", []) |
| |
| if organic_results: |
| |
| search_summary = [] |
| for i, result in enumerate(organic_results[:3]): |
| title = result.get("title", "") |
| snippet = result.get("snippet", "") |
| if title and snippet: |
| search_summary.append(f"{i+1}. {title}: {snippet}") |
| |
| return "\n".join(search_summary) if search_summary else "No useful results found" |
| else: |
| return "No search results found" |
| |
| except requests.RequestException as e: |
| print(f"SerpAPI search error: {e}") |
| return "Search failed" |
|
|
| def _duckduckgo_search(query: str) -> str: |
| """Fallback web search using DuckDuckGo scraping.""" |
| try: |
| |
| url = "https://api.duckduckgo.com/" |
| params = { |
| "q": query, |
| "format": "json", |
| "no_html": "1", |
| "skip_disambig": "1" |
| } |
| |
| response = requests.get(url, params=params, timeout=10) |
| response.raise_for_status() |
| |
| data = response.json() |
| |
| |
| abstract = data.get("Abstract", "") |
| if abstract: |
| return f"Summary: {abstract}" |
| |
| |
| related_topics = data.get("RelatedTopics", []) |
| if related_topics: |
| summaries = [] |
| for topic in related_topics[:3]: |
| if isinstance(topic, dict) and "Text" in topic: |
| summaries.append(topic["Text"]) |
| if summaries: |
| return "Related information:\n" + "\n".join(summaries) |
| |
| |
| return _simple_web_scrape(query) |
| |
| except Exception as e: |
| print(f"DuckDuckGo search error: {e}") |
| return "Search failed" |
|
|
| def _simple_web_scrape(query: str) -> str: |
| """Simple web scraping fallback.""" |
| try: |
| |
| search_url = f"https://html.duckduckgo.com/html/?q={urllib.parse.quote(query)}" |
| headers = { |
| "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36" |
| } |
| |
| response = requests.get(search_url, headers=headers, timeout=10) |
| if response.status_code == 200: |
| soup = BeautifulSoup(response.content, 'html.parser') |
| |
| results = soup.find_all('a', class_='result__snippet')[:3] |
| if results: |
| snippets = [r.get_text().strip() for r in results if r.get_text().strip()] |
| return "\n".join(snippets[:3]) if snippets else "Limited search results available" |
| |
| return "Basic web search completed - limited results" |
| |
| except Exception as e: |
| print(f"Web scraping error: {e}") |
| return "Web search unavailable" |
|
|
| def read_file(file_name: str) -> str: |
| """ |
| Read and process different file types (text, CSV, images). |
| """ |
| if not file_name or not os.path.exists(file_name): |
| return "File not found" |
| |
| try: |
| file_extension = os.path.splitext(file_name)[1].lower() |
| |
| if file_extension == ".csv": |
| return _read_csv_file(file_name) |
| elif file_extension in [".png", ".jpg", ".jpeg", ".gif", ".bmp"]: |
| return _read_image_file(file_name) |
| elif file_extension in [".txt", ".md", ".py", ".js", ".html", ".json"]: |
| return _read_text_file(file_name) |
| else: |
| |
| return _read_text_file(file_name) |
| |
| except Exception as e: |
| return f"Error reading file: {str(e)}" |
|
|
| def _read_text_file(file_name: str) -> str: |
| """Read a text file.""" |
| try: |
| with open(file_name, "r", encoding="utf-8") as f: |
| content = f.read() |
| return content[:5000] |
| except UnicodeDecodeError: |
| |
| try: |
| with open(file_name, "r", encoding="latin-1") as f: |
| content = f.read() |
| return content[:5000] |
| except Exception as e: |
| return f"Text file reading error: {str(e)}" |
|
|
| def _read_csv_file(file_name: str) -> str: |
| """Read and summarize a CSV file.""" |
| try: |
| df = pd.read_csv(file_name) |
| |
| |
| summary = [] |
| summary.append(f"CSV file shape: {df.shape[0]} rows, {df.shape[1]} columns") |
| summary.append(f"Columns: {', '.join(df.columns.tolist())}") |
| |
| |
| summary.append("\nFirst 5 rows:") |
| summary.append(df.head().to_string()) |
| |
| |
| numeric_columns = df.select_dtypes(include=['number']).columns |
| if len(numeric_columns) > 0: |
| summary.append(f"\nNumeric column statistics:") |
| summary.append(df[numeric_columns].describe().to_string()) |
| |
| return "\n".join(summary) |
| |
| except Exception as e: |
| return f"CSV reading error: {str(e)}" |
|
|
| def _read_image_file(file_name: str) -> str: |
| """Read and analyze an image file.""" |
| try: |
| |
| try: |
| import pytesseract |
| img = Image.open(file_name) |
| |
| |
| info = f"Image: {img.size[0]}x{img.size[1]} pixels, mode: {img.mode}" |
| |
| |
| text = pytesseract.image_to_string(img).strip() |
| if text: |
| return f"{info}\n\nExtracted text:\n{text}" |
| else: |
| return f"{info}\n\nNo text detected in image." |
| |
| except ImportError: |
| |
| img = Image.open(file_name) |
| return f"Image: {img.size[0]}x{img.size[1]} pixels, mode: {img.mode}\n(OCR not available - install pytesseract for text extraction)" |
| |
| except Exception as e: |
| return f"Image reading error: {str(e)}" |
|
|
| def execute_code(code: str, timeout: int = 10) -> str: |
| """ |
| Execute Python code safely with timeout. |
| """ |
| try: |
| |
| dangerous_keywords = ["import os", "import subprocess", "__import__", "exec", "eval", "open("] |
| if any(keyword in code.lower() for keyword in dangerous_keywords): |
| return "Code execution blocked: potentially unsafe operations detected" |
| |
| result = subprocess.run( |
| ["python3", "-c", code], |
| capture_output=True, |
| text=True, |
| timeout=timeout, |
| cwd="/tmp" |
| ) |
| |
| if result.returncode == 0: |
| return result.stdout.strip() if result.stdout else "Code executed successfully (no output)" |
| else: |
| return f"Code execution error: {result.stderr.strip()}" |
| |
| except subprocess.TimeoutExpired: |
| return "Code execution timeout" |
| except Exception as e: |
| return f"Code execution error: {str(e)}" |
|
|
| def calculate_simple_math(expression: str) -> str: |
| """ |
| Safely evaluate simple mathematical expressions. |
| """ |
| try: |
| |
| allowed_chars = set("0123456789+-*/.() ") |
| if not all(c in allowed_chars for c in expression): |
| return "Invalid mathematical expression" |
| |
| |
| result = eval(expression) |
| return str(result) |
| |
| except Exception as e: |
| return f"Math calculation error: {str(e)}" |