Spaces:
Running
Running
| import requests | |
| import os | |
| from crewai.tools import BaseTool | |
| from typing import Type, List | |
| from pydantic import BaseModel, Field | |
| import streamlit as st | |
| # --- 1. IPInfo Geo Lookup Tool --- | |
| class IPInfoToolInput(BaseModel): | |
| """Input schema for IPInfo Geo Lookup Tool.""" | |
| target: List[str] = Field( | |
| ..., | |
| description="A list of IP addresses (e.g., ['8.8.8.8', '1.1.1.1']) to query." | |
| ) | |
| class IPInfoGeoLookup(BaseTool): | |
| # See https://ipinfo.io/developers/lite-api | |
| name: str = "IPInfo Geo Lookup" | |
| description: str = "Looks up geolocation, Internet Service Provider (ISP), and network details for an IP address. Useful for determining the geographic location of a digital asset." | |
| args_schema: Type[BaseModel] = IPInfoToolInput | |
| def _run(self, target: List[str]) -> str: | |
| api_key = st.session_state['ipinfo_api_key'] | |
| if not api_key: | |
| return "Error: La variable de entorno IPINFO_APIKEY no está configurada." | |
| results = [] | |
| for ip_address in target: | |
| try: | |
| url = f"https://api.ipinfo.io/lite/{ip_address}?token={api_key}" | |
| response = requests.get(url) | |
| response.raise_for_status() | |
| data = response.json() | |
| # Formateamos la salida para que sea legible y útil para el agente | |
| result_str = ( | |
| f"IP: {data.get('ip')}, Country: {data.get('country')}, ASN: {data.get('as_name')}" | |
| ) | |
| results.append(result_str) | |
| except requests.exceptions.RequestException as e: | |
| results.append(f"Error al consultar la IP {ip_address}: {e}") | |
| return "\n".join(results) | |
| # --- 2. VirusTotal IP Report --- | |
| class VirusTotalIPReportInput(BaseModel): | |
| """Input schema for VirusTotal IP Report Tool.""" | |
| ip_address: str = Field( | |
| ..., | |
| description="The IP address to scan. Valid resources include IPv4 or IPv6 address." | |
| ) | |
| class VirusTotalIPReport(BaseTool): | |
| # See https://blog.virustotal.com/2024/08/VT-S1-EffectiveResearch.html | |
| # See https://docs.virustotal.com/reference/ip-info | |
| name: str = "VirusTotal IP Report" | |
| description: str = "Analyzes an IP address for malware and other security threats using multiple antivirus engines and reputation services. Provides a detailed security analysis report." | |
| args_schema: Type[BaseModel] = VirusTotalIPReportInput | |
| def _run(self, ip_address: str) -> str: | |
| api_key = os.getenv("VT_APIKEY") | |
| report = [] | |
| if not api_key: | |
| return "Tool error: The VT_APIKEY environment variable is not set." | |
| try: | |
| url = f"https://www.virustotal.com/api/v3/ip_addresses/{ip_address}" | |
| headers = {"x-apikey": api_key} | |
| response = requests.get(url, headers=headers) | |
| response.raise_for_status() | |
| data = response.json() | |
| # Formateamos la salida para que sea legible y útil para el agente | |
| result_str = ( | |
| f"IP: {data.get('data', {}).get('id')}", | |
| f"Country: {data.get('data', {}).get('attributes', {}).get('country')}", | |
| f"ASN Owner: {data.get('data', {}).get('attributes', {}).get('as_owner')}", | |
| f"Reputation: {data.get('data', {}).get('attributes', {}).get('reputation')}", | |
| f"Last Analysis Date: {data.get('data', {}).get('attributes', {}).get('last_analysis_date')}", | |
| f"Malicious Detections: {data.get('data', {}).get('attributes', {}).get('last_analysis_stats', {}).get('malicious')}", | |
| f"Harmless Detections: {data.get('data', {}).get('attributes', {}).get('last_analysis_stats', {}).get('harmless')}", | |
| f"Suspicious Detections: {data.get('data', {}).get('attributes', {}).get('last_analysis_stats', {}).get('suspicious')}" | |
| ) | |
| return "\n".join(result_str) | |
| except requests.exceptions.RequestException as e: | |
| return f"Error al consultar la IP {ip_address}: {e}" | |