| import requests |
| import json |
| import os |
| from typing import Dict, Any, Optional, Union |
| from urllib.parse import urljoin |
| from bs4 import BeautifulSoup |
| import html2text |
| import time |
|
|
| from ..core.module import BaseModule |
|
|
|
|
class RequestBase(BaseModule):
    """
    Base class for handling HTTP requests, parsing content, and saving data.

    Provides a shared ``requests.Session`` with retry/backoff logic, a
    politeness throttle between successive requests, HTML/JSON parsing
    helpers, link/text extraction, and file-saving utilities for web
    scraping and general HTTP operations.
    """

    def __init__(self, timeout: int = 30, max_retries: int = 3, delay_between_requests: float = 1.0):
        """
        Initialize the RequestBase with configuration options.

        Args:
            timeout: Request timeout in seconds.
            max_retries: Maximum number of attempts per request (values < 1
                are treated as 1 so a request is always attempted).
            delay_between_requests: Minimum spacing between successive
                requests, and the base unit for retry backoff, in seconds.
        """
        super().__init__()
        self.timeout = timeout
        self.max_retries = max_retries
        self.delay_between_requests = delay_between_requests
        self.session = requests.Session()
        # Monotonic timestamp of the most recent request; used to throttle
        # successive calls without delaying the caller after a success.
        self._last_request_time: Optional[float] = None

        # Configure the HTML -> text converter.
        self.html_converter = html2text.HTML2Text()
        self.html_converter.ignore_links = False
        self.html_converter.ignore_images = False
        self.html_converter.body_width = 0  # disable line wrapping

        # Browser-like User-Agent so sites that block the default
        # python-requests agent still respond.
        self.session.headers.update({
            'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.124 Safari/537.36'
        })

    def _throttle(self) -> None:
        """Sleep just long enough to honor ``delay_between_requests``.

        Replaces the old behavior of sleeping *after* a successful response
        (which delayed the caller and skipped the delay when success came on
        the final attempt): the wait now happens before the next request,
        and only for the remaining portion of the interval.
        """
        if self._last_request_time is not None:
            remaining = self.delay_between_requests - (time.monotonic() - self._last_request_time)
            if remaining > 0:
                time.sleep(remaining)

    def request(self, url: str, method: str = 'GET', headers: Optional[Dict[str, str]] = None,
                params: Optional[Dict[str, Any]] = None, data: Optional[Dict[str, Any]] = None,
                json_data: Optional[Dict[str, Any]] = None) -> requests.Response:
        """
        Make an HTTP request with retry logic and error handling.

        Args:
            url: The URL to request
            method: HTTP method (GET, POST, PUT, DELETE, etc.)
            headers: Additional headers to include
            params: URL parameters
            data: Form data to send
            json_data: JSON data to send

        Returns:
            requests.Response object

        Raises:
            requests.RequestException: If request fails after all retries
        """
        if headers:
            request_headers = {**self.session.headers, **headers}
        else:
            request_headers = dict(self.session.headers)

        # Always attempt at least once; the original silently returned None
        # when max_retries <= 0, violating the declared return type.
        attempts = max(1, self.max_retries)
        last_error: Optional[requests.RequestException] = None

        for attempt in range(attempts):
            self._throttle()
            try:
                response = self.session.request(
                    method=method.upper(),
                    url=url,
                    headers=request_headers,
                    params=params,
                    data=data,
                    json=json_data,
                    timeout=self.timeout
                )
                self._last_request_time = time.monotonic()
                # Treat 4xx/5xx as failures so they participate in retries.
                response.raise_for_status()
                return response
            except requests.RequestException as e:
                self._last_request_time = time.monotonic()
                last_error = e
                if attempt == attempts - 1:
                    raise
                # Linear backoff: wait longer after each failed attempt.
                time.sleep(self.delay_between_requests * (attempt + 1))

        # Unreachable (the final failed attempt re-raises above), but keeps
        # the method's contract explicit for static analysis.
        raise last_error  # pragma: no cover

    def parse_html(self, html_content: str) -> BeautifulSoup:
        """
        Parse HTML content using BeautifulSoup.

        Args:
            html_content: Raw HTML content

        Returns:
            BeautifulSoup object for parsing
        """
        return BeautifulSoup(html_content, 'html.parser')

    def parse_json(self, json_content: str) -> Dict[str, Any]:
        """
        Parse JSON content.

        Args:
            json_content: Raw JSON content

        Returns:
            Parsed JSON as dictionary

        Raises:
            json.JSONDecodeError: If the content is not valid JSON.
        """
        return json.loads(json_content)

    def extract_text(self, html_content: str, selector: Optional[str] = None) -> str:
        """
        Extract text content from HTML using html2text.

        Args:
            html_content: Raw HTML content
            selector: CSS selector to extract specific elements (optional)

        Returns:
            Extracted text content
        """
        if selector:
            soup = self.parse_html(html_content)
            elements = soup.select(selector)
            # Convert only the selected elements, joined as one document.
            combined_html = '\n'.join(str(elem) for elem in elements)
            return self.html_converter.handle(combined_html)
        return self.html_converter.handle(html_content)

    def extract_links(self, html_content: str, base_url: Optional[str] = None) -> list:
        """
        Extract all links from HTML content.

        Args:
            html_content: Raw HTML content
            base_url: Base URL to resolve relative links

        Returns:
            List of extracted URLs
        """
        soup = self.parse_html(html_content)
        links = []

        for link in soup.find_all('a', href=True):
            href = link['href']
            # Resolve relative links against base_url; leave absolute and
            # non-HTTP schemes (mailto:, tel:) untouched.
            if base_url and not href.startswith(('http://', 'https://', 'mailto:', 'tel:')):
                href = urljoin(base_url, href)
            links.append(href)

        return links

    def save_content(self, content: Union[str, Dict[str, Any], bytes], file_path: str,
                     content_type: str = 'text') -> bool:
        """
        Save content to a file.

        Args:
            content: Content to save (string, dictionary, or bytes)
            file_path: Path where to save the file
            content_type: Type of content ('text', 'json', 'html', 'pdf', 'binary')

        Returns:
            True if successful, False otherwise
        """
        try:
            # os.makedirs('') raises FileNotFoundError, so only create the
            # directory when the path actually has a directory component.
            directory = os.path.dirname(file_path)
            if directory:
                os.makedirs(directory, exist_ok=True)

            if content_type.lower() == 'json':
                with open(file_path, 'w', encoding='utf-8') as f:
                    json.dump(content, f, indent=2, ensure_ascii=False)
            elif content_type.lower() in ['pdf', 'binary'] or isinstance(content, bytes):
                with open(file_path, 'wb') as f:
                    if isinstance(content, bytes):
                        f.write(content)
                    else:
                        f.write(str(content).encode('utf-8'))
            else:
                with open(file_path, 'w', encoding='utf-8') as f:
                    f.write(str(content))

            return True

        except Exception as e:
            # Best-effort persistence: report and signal failure rather than
            # propagating I/O errors to scraping callers.
            print(f"Error saving content to {file_path}: {e}")
            return False

    def get_page_info(self, url: str) -> Dict[str, Any]:
        """
        Get basic information about a webpage.

        Args:
            url: URL to analyze

        Returns:
            Dictionary containing page information, or a dict with an
            'error' key when the request or parsing fails.
        """
        try:
            response = self.request(url)
            soup = self.parse_html(response.text)

            info = {
                'url': url,
                'status_code': response.status_code,
                # .string is None for an empty <title>; normalize to ''.
                'title': (soup.title.string or '') if soup.title else '',
                'content_type': response.headers.get('content-type', ''),
                'content_length': len(response.text),
                'links_count': len(soup.find_all('a', href=True)),
                'images_count': len(soup.find_all('img')),
            }

            meta_desc = soup.find('meta', attrs={'name': 'description'})
            if meta_desc:
                info['description'] = meta_desc.get('content', '')

            return info

        except Exception as e:
            return {'error': str(e), 'url': url}

    def request_and_process(self, url: str, method: str = 'GET', headers: Optional[Dict[str, str]] = None,
                            params: Optional[Dict[str, Any]] = None, data: Optional[Dict[str, Any]] = None,
                            json_data: Optional[Dict[str, Any]] = None, return_raw: bool = False,
                            save_file_path: Optional[str] = None) -> Dict[str, Any]:
        """
        Make a request and process the response with comprehensive error handling.

        Args:
            url: The URL to request
            method: HTTP method (GET, POST, PUT, DELETE, etc.)
            headers: Additional headers to include
            params: URL parameters
            data: Form data to send
            json_data: JSON data to send
            return_raw: If True, return raw HTML content, otherwise processed text
            save_file_path: Optional path to save the content

        Returns:
            Dictionary containing processed response data; on failure the
            dict has 'success': False and an 'error' message.
        """
        try:
            response = self.request(
                url=url,
                method=method,
                headers=headers,
                params=params,
                data=data,
                json_data=json_data
            )

            content_type = response.headers.get('content-type', '').lower()

            result = {
                'url': url,
                'method': method.upper(),
                'status_code': response.status_code,
                'success': True,
                'content_type': content_type,
                'content_length': len(response.text),
                'headers': dict(response.headers)
            }

            if return_raw:
                result['content'] = response.text
            else:
                if 'json' in content_type:
                    try:
                        result['content'] = response.json()
                    # ValueError covers json.JSONDecodeError and the
                    # simplejson-based errors older requests versions raise.
                    except ValueError:
                        result['content'] = response.text
                        result['warning'] = 'Content-Type indicates JSON but parsing failed'
                else:
                    result['content'] = self.extract_text(response.text)

            if save_file_path:
                save_success = self._save_response_content(response, save_file_path, content_type)
                result['saved_to_file'] = save_file_path if save_success else None
                if not save_success:
                    result['save_warning'] = f'Failed to save content to {save_file_path}'

            return result

        except Exception as e:
            return {
                'url': url,
                'method': method.upper(),
                'error': str(e),
                'success': False
            }

    def _save_response_content(self, response: requests.Response, file_path: str, content_type: str) -> bool:
        """
        Save response content to file with appropriate format.

        Args:
            response: The response object
            file_path: Path to save the file
            content_type: Content type of the response

        Returns:
            True if successful, False otherwise
        """
        try:
            # Directory creation is handled inside save_content.
            if 'json' in content_type:
                try:
                    json_content = response.json()
                    return self.save_content(json_content, file_path, 'json')
                except ValueError:
                    # Declared JSON but unparseable; fall back to raw text.
                    return self.save_content(response.text, file_path, 'text')
            elif 'html' in content_type:
                return self.save_content(response.text, file_path, 'html')
            else:
                return self.save_content(response.text, file_path, 'text')

        except Exception as e:
            print(f"Error saving response content: {e}")
            return False

    def close(self):
        """Close the underlying HTTP session."""
        self.session.close()

    def __enter__(self):
        """Allow usage as a context manager: ``with RequestBase() as rb:``."""
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        """Close the session on context exit; never suppress exceptions."""
        self.close()
        return False