"""CrewAI tools that look up IP address intelligence from IPInfo and VirusTotal."""

import os
from typing import List, Type
from urllib.parse import quote

import requests
import streamlit as st
from crewai.tools import BaseTool
from pydantic import BaseModel, Field

# Upper bound (in seconds) for each outbound HTTP call. Without a timeout,
# `requests` waits indefinitely on an unresponsive server.
_REQUEST_TIMEOUT_SECONDS = 10


def _as_dict(value: object) -> dict:
    """Return *value* unchanged when it is a dict, otherwise an empty dict."""
    return value if isinstance(value, dict) else {}


def _quote_path_segment(value: object) -> str:
    """Percent-encode *value* for use as a single URL path segment.

    ':' is kept literal so IPv6 addresses stay readable; '/' is encoded so the
    value cannot introduce extra path segments.
    """
    return quote(str(value).strip(), safe=":")


def _redact(text: str, secret: str) -> str:
    """Return *text* with the raw and percent-encoded forms of *secret* masked."""
    for needle in {secret, quote(secret, safe="")}:
        if needle:
            text = text.replace(needle, "***")
    return text


# --- 1. IPInfo Geo Lookup Tool ---
class IPInfoToolInput(BaseModel):
    """Input schema for IPInfo Geo Lookup Tool."""

    target: List[str] = Field(
        ...,
        description="A list of IP addresses (e.g., ['8.8.8.8', '1.1.1.1']) to query."
    )


class IPInfoGeoLookup(BaseTool):
    """Tool that queries the IPInfo Lite API for each IP address it is given.

    See https://ipinfo.io/developers/lite-api
    """

    name: str = "IPInfo Geo Lookup"
    description: str = "Looks up geolocation, Internet Service Provider (ISP), and network details for an IP address. Useful for determining the geographic location of a digital asset."
    args_schema: Type[BaseModel] = IPInfoToolInput

    def _run(self, target: List[str]) -> str:
        """Look up every address in *target* and return one summary line per address.

        Args:
            target: IP addresses to query.

        Returns:
            Newline-joined lines. Each line is either the IP / country / ASN
            summary or an error message for that address. A failure on one
            address does not stop the remaining lookups.
        """
        # Read the key from the Streamlit session first; `.get` avoids a
        # KeyError when it was never stored. Fall back to the environment
        # variable that the error message below refers to.
        api_key = st.session_state.get("ipinfo_api_key") or os.getenv("IPINFO_APIKEY")
        if not api_key:
            return "Error: La variable de entorno IPINFO_APIKEY no está configurada."

        # Tolerate a bare string so it is not iterated character by character.
        if isinstance(target, str):
            target = [target]

        results = []
        for ip_address in target:
            try:
                url = f"https://api.ipinfo.io/lite/{_quote_path_segment(ip_address)}"
                response = requests.get(
                    url,
                    params={"token": api_key},
                    timeout=_REQUEST_TIMEOUT_SECONDS,
                )
                response.raise_for_status()
                data = _as_dict(response.json())
            except (requests.exceptions.RequestException, ValueError) as e:
                # The exception text can embed the request URL, which carries
                # the token as a query parameter; mask it before it is
                # returned to the agent.
                results.append(
                    f"Error al consultar la IP {ip_address}: {_redact(str(e), api_key)}"
                )
                continue

            # Format the output so it is readable and useful for the agent.
            results.append(
                f"IP: {data.get('ip')}, Country: {data.get('country')}, ASN: {data.get('as_name')}"
            )

        return "\n".join(results)


# --- 2. VirusTotal IP Report ---
class VirusTotalIPReportInput(BaseModel):
    """Input schema for VirusTotal IP Report Tool."""

    ip_address: str = Field(
        ...,
        description="The IP address to scan. Valid resources include IPv4 or IPv6 address."
    )


class VirusTotalIPReport(BaseTool):
    """Tool that fetches the VirusTotal v3 report for a single IP address.

    See https://blog.virustotal.com/2024/08/VT-S1-EffectiveResearch.html
    See https://docs.virustotal.com/reference/ip-info
    """

    name: str = "VirusTotal IP Report"
    description: str = "Analyzes an IP address for malware and other security threats using multiple antivirus engines and reputation services. Provides a detailed security analysis report."
    args_schema: Type[BaseModel] = VirusTotalIPReportInput

    def _run(self, ip_address: str) -> str:
        """Fetch the VirusTotal report for *ip_address* and summarize it.

        Args:
            ip_address: The address to look up.

        Returns:
            A newline-joined summary (id, country, AS owner, reputation, last
            analysis date and detection counts), or an error message when the
            API key is missing or the request fails. Fields missing from the
            response are rendered as ``None``.
        """
        api_key = os.getenv("VT_APIKEY")
        if not api_key:
            return "Tool error: The VT_APIKEY environment variable is not set."

        try:
            url = (
                "https://www.virustotal.com/api/v3/ip_addresses/"
                f"{_quote_path_segment(ip_address)}"
            )
            response = requests.get(
                url,
                headers={"x-apikey": api_key},
                timeout=_REQUEST_TIMEOUT_SECONDS,
            )
            response.raise_for_status()
            payload = _as_dict(response.json())
        except (requests.exceptions.RequestException, ValueError) as e:
            return f"Error al consultar la IP {ip_address}: {e}"

        # Resolve each nesting level once; `_as_dict` also covers fields that
        # are present but null, which a `.get(key, {})` default would not.
        record = _as_dict(payload.get("data"))
        attributes = _as_dict(record.get("attributes"))
        stats = _as_dict(attributes.get("last_analysis_stats"))

        # Format the output so it is readable and useful for the agent.
        report_lines = (
            f"IP: {record.get('id')}",
            f"Country: {attributes.get('country')}",
            f"ASN Owner: {attributes.get('as_owner')}",
            f"Reputation: {attributes.get('reputation')}",
            f"Last Analysis Date: {attributes.get('last_analysis_date')}",
            f"Malicious Detections: {stats.get('malicious')}",
            f"Harmless Detections: {stats.get('harmless')}",
            f"Suspicious Detections: {stats.get('suspicious')}",
        )
        return "\n".join(report_lines)