generador-post-mortem / src /custom_tools.py
vvillarreal-cfee's picture
feat(agent): Agregamos soporte para VirusTotal IP report.
1383685 verified
import requests
import os
from crewai.tools import BaseTool
from typing import Type, List
from pydantic import BaseModel, Field
import streamlit as st
# --- 1. IPInfo Geo Lookup Tool ---
class IPInfoToolInput(BaseModel):
"""Input schema for IPInfo Geo Lookup Tool."""
target: List[str] = Field(
...,
description="A list of IP addresses (e.g., ['8.8.8.8', '1.1.1.1']) to query."
)
class IPInfoGeoLookup(BaseTool):
# See https://ipinfo.io/developers/lite-api
name: str = "IPInfo Geo Lookup"
description: str = "Looks up geolocation, Internet Service Provider (ISP), and network details for an IP address. Useful for determining the geographic location of a digital asset."
args_schema: Type[BaseModel] = IPInfoToolInput
def _run(self, target: List[str]) -> str:
api_key = st.session_state['ipinfo_api_key']
if not api_key:
return "Error: La variable de entorno IPINFO_APIKEY no está configurada."
results = []
for ip_address in target:
try:
url = f"https://api.ipinfo.io/lite/{ip_address}?token={api_key}"
response = requests.get(url)
response.raise_for_status()
data = response.json()
# Formateamos la salida para que sea legible y útil para el agente
result_str = (
f"IP: {data.get('ip')}, Country: {data.get('country')}, ASN: {data.get('as_name')}"
)
results.append(result_str)
except requests.exceptions.RequestException as e:
results.append(f"Error al consultar la IP {ip_address}: {e}")
return "\n".join(results)
# --- 2. VirusTotal IP Report ---
class VirusTotalIPReportInput(BaseModel):
"""Input schema for VirusTotal IP Report Tool."""
ip_address: str = Field(
...,
description="The IP address to scan. Valid resources include IPv4 or IPv6 address."
)
class VirusTotalIPReport(BaseTool):
# See https://blog.virustotal.com/2024/08/VT-S1-EffectiveResearch.html
# See https://docs.virustotal.com/reference/ip-info
name: str = "VirusTotal IP Report"
description: str = "Analyzes an IP address for malware and other security threats using multiple antivirus engines and reputation services. Provides a detailed security analysis report."
args_schema: Type[BaseModel] = VirusTotalIPReportInput
def _run(self, ip_address: str) -> str:
api_key = os.getenv("VT_APIKEY")
report = []
if not api_key:
return "Tool error: The VT_APIKEY environment variable is not set."
try:
url = f"https://www.virustotal.com/api/v3/ip_addresses/{ip_address}"
headers = {"x-apikey": api_key}
response = requests.get(url, headers=headers)
response.raise_for_status()
data = response.json()
# Formateamos la salida para que sea legible y útil para el agente
result_str = (
f"IP: {data.get('data', {}).get('id')}",
f"Country: {data.get('data', {}).get('attributes', {}).get('country')}",
f"ASN Owner: {data.get('data', {}).get('attributes', {}).get('as_owner')}",
f"Reputation: {data.get('data', {}).get('attributes', {}).get('reputation')}",
f"Last Analysis Date: {data.get('data', {}).get('attributes', {}).get('last_analysis_date')}",
f"Malicious Detections: {data.get('data', {}).get('attributes', {}).get('last_analysis_stats', {}).get('malicious')}",
f"Harmless Detections: {data.get('data', {}).get('attributes', {}).get('last_analysis_stats', {}).get('harmless')}",
f"Suspicious Detections: {data.get('data', {}).get('attributes', {}).get('last_analysis_stats', {}).get('suspicious')}"
)
return "\n".join(result_str)
except requests.exceptions.RequestException as e:
return f"Error al consultar la IP {ip_address}: {e}"