import base64
import binascii
import re
from typing import Any, Dict, List, Optional, Tuple

from langchain_core.tools import tool

from .base_tool import Tool
|
|
|
class DecoderTool(Tool):
    """Decode Base64 and Hex encoded strings commonly used to hide malicious commands.

    ``run`` takes a dict with an ``encoded_string`` and an optional
    ``encoding_type`` (``"auto"``, ``"base64"`` or ``"hex"``), decodes the
    payload to text and scans that text for keywords grouped by attack
    technique. All results are returned as plain dicts.
    """

    # Maximum number of characters echoed back in input/command previews.
    _PREVIEW_LIMIT = 100

    # An all-hex string must be longer than this to be auto-detected as hex;
    # shorter strings are too ambiguous and are treated as Base64.
    _MIN_HEX_DIGITS = 10

    # Decoration that commonly surrounds hex dumps: "0x"/"\x" prefixes and
    # whitespace, comma or colon separators. None of these can occur inside a
    # run of pure hex digits, so stripping them never alters valid input.
    _HEX_NOISE = re.compile(r"0[xX]|\\[xX]|[\s,:]")
    _HEX_DIGITS = re.compile(r"[0-9A-Fa-f]+")

    # Characters outside the standard Base64 alphabet. ``base64.b64decode``
    # skips these by default; they are removed up front so that the padding
    # can be computed from the real data characters.
    _BASE64_NOISE = re.compile(r"[^A-Za-z0-9+/=]")

    # Fallback order for turning decoded bytes into text. latin-1 accepts any
    # byte sequence, so it must stay last.
    _TEXT_ENCODINGS = ("utf-8", "utf-16le", "utf-16be", "latin-1")

    # Short keywords that are only meaningful as standalone tokens. Matching
    # them as plain substrings flags ordinary words such as "iexplore",
    # "same" or "postgres".
    _WHOLE_WORD_PATTERNS = frozenset({"iex", "sam", "post"})

    # Techniques whose presence alone raises the threat level to CRITICAL.
    _HIGH_RISK_TECHNIQUES = frozenset(
        {"credential_access", "command_and_control", "exfiltration"}
    )

    # (technique, {lower-case keyword: indicator description}) in report order.
    _THREAT_PATTERNS = (
        ("execution", {
            "iex": "Invoke-Expression - executes arbitrary code",
            "invoke-expression": "Executes arbitrary PowerShell code",
            "invoke-command": "Remote command execution",
            "invoke-webrequest": "Downloads content from internet",
            "downloadstring": "Downloads and executes remote code",
            "downloadfile": "Downloads file from internet",
            "webclient": "Network client for downloading content",
            "net.webclient": "Network client object",
            "bitstransfer": "Background file transfer (potential data exfiltration)",
            "start-bitstransfer": "BITS transfer for file download",
        }),
        ("defense_evasion", {
            "-nop": "NoProfile flag - avoids loading profile scripts",
            "-noprofile": "Skips PowerShell profile loading",
            "-w hidden": "Hidden window - runs invisibly",
            "-windowstyle hidden": "Hides PowerShell window",
            "-ep bypass": "Execution policy bypass",
            "-executionpolicy bypass": "Disables script execution restrictions",
            "-enc": "Encoded command (nested encoding)",
            "-encodedcommand": "Base64 encoded command",
            "frombase64string": "Additional decoding layer",
        }),
        ("credential_access", {
            "mimikatz": "Credential dumping tool",
            "invoke-mimikatz": "PowerShell wrapper for Mimikatz",
            "get-credential": "Prompts for credentials",
            "convertto-securestring": "Password manipulation",
            "sekurlsa": "Mimikatz module for credential extraction",
            "lsadump": "LSA secrets dumping",
            "password": "Potential credential theft",
            "sam": "Security Account Manager access",
        }),
        ("persistence", {
            "schtasks": "Scheduled task creation",
            "new-scheduledtask": "Creates scheduled task for persistence",
            "register-scheduledtask": "Registers scheduled task",
            "startup": "Startup folder modification",
            "registry": "Registry modification",
            "wmi": "WMI-based persistence",
            "new-service": "Service creation",
        }),
        ("lateral_movement", {
            "psexec": "Remote execution tool",
            "winrm": "Windows Remote Management",
            "invoke-command -computername": "Remote command execution",
            "enter-pssession": "Interactive remote session",
            "wmic": "WMI command-line tool",
        }),
        ("command_and_control", {
            "http://": "HTTP connection (potential C2)",
            "https://": "HTTPS connection (potential C2)",
            "://": "URL connection",
            "tcp": "TCP network connection",
            "socket": "Network socket creation",
            "getstream": "Network stream (potential C2 channel)",
        }),
        ("exfiltration", {
            "compress-archive": "File compression before exfiltration",
            "out-file": "Writing to file (staging for exfiltration)",
            "set-content": "File creation/modification",
            "send-mailmessage": "Email-based exfiltration",
            "ftp": "FTP transfer",
            "post": "HTTP POST (potential data upload)",
        }),
    )

    # Human-readable names used in the threat summary sentence.
    _TECHNIQUE_LABELS = {
        "execution": "arbitrary code execution",
        "defense_evasion": "defense evasion",
        "credential_access": "credential theft",
        "persistence": "persistence mechanisms",
        "lateral_movement": "lateral movement",
        "command_and_control": "C2 communication",
        "exfiltration": "data exfiltration",
    }

    def name(self) -> str:
        """Return the identifier of this tool (``"decoder"``)."""
        return "decoder"

    def run(self, input_data: Dict[str, Any]) -> Dict[str, Any]:
        """Decode ``input_data["encoded_string"]`` and analyze the result.

        Args:
            input_data: Mapping with ``encoded_string`` (required) and
                ``encoding_type`` (optional; ``"auto"``, ``"base64"`` or
                ``"hex"``, case-insensitive, default ``"auto"``).

        Returns:
            On success: ``tool``, ``encoded_string`` (truncated preview),
            ``encoding_detected``, ``decoded_text``, ``success=True`` and
            ``threat_analysis``. On failure: ``success=False`` and an
            ``error`` message. This method never raises; unexpected
            exceptions are reported through ``error``.
        """
        try:
            encoded_string = input_data.get("encoded_string", "")
            requested_type = input_data.get("encoding_type", "auto")

            if isinstance(encoded_string, (bytes, bytearray)):
                # latin-1 maps every byte to one character, so this cannot fail.
                encoded_string = bytes(encoded_string).decode("latin-1")

            if not encoded_string:
                return self._error_result("No encoded string provided")
            if not isinstance(encoded_string, str):
                return self._error_result(
                    "encoded_string must be a string, got "
                    f"{type(encoded_string).__name__}"
                )
            if not encoded_string.strip():
                # Whitespace-only input carries nothing to decode.
                return self._error_result("No encoded string provided")

            # Accept "Base64", " HEX " etc.; None or blank means auto-detect.
            encoding_type = str(requested_type or "").strip().lower() or "auto"
            if encoding_type == "auto":
                encoding_type = self._detect_encoding(encoded_string)

            decoded_text, success = self._decode_string(encoded_string, encoding_type)

            result: Dict[str, Any] = {
                "tool": "decoder",
                "encoded_string": self._preview(encoded_string),
                "encoding_detected": encoding_type,
                "decoded_text": decoded_text if success else None,
                "success": success,
            }
            if success:
                result["threat_analysis"] = self._analyze_decoded_content(decoded_text)
            else:
                result["error"] = "Failed to decode - invalid encoding or corrupted data"
            return result

        except Exception as exc:
            # Tool boundary: callers expect a result dict, never an exception.
            return self._error_result(f"{type(exc).__name__}: {str(exc)}")

    def _error_result(self, message: str) -> Dict[str, Any]:
        """Build the payload returned when no decoding attempt could be made."""
        return {"tool": "decoder", "success": False, "error": message}

    def _preview(self, text: str) -> str:
        """Return ``text`` cut to ``_PREVIEW_LIMIT`` characters plus ``"..."``."""
        if len(text) > self._PREVIEW_LIMIT:
            return text[: self._PREVIEW_LIMIT] + "..."
        return text

    def _detect_encoding(self, string: str) -> str:
        """Auto-detect if string is base64 or hex.

        The input is reported as ``"hex"`` when, after removing ``0x``/``\\x``
        prefixes and whitespace/comma/colon separators, it consists solely of
        an even number of hex digits and has more than ``_MIN_HEX_DIGITS`` of
        them. Everything else is reported as ``"base64"``.
        """
        digits = self._HEX_NOISE.sub("", string)
        looks_like_hex = (
            len(digits) > self._MIN_HEX_DIGITS
            and len(digits) % 2 == 0
            and self._HEX_DIGITS.fullmatch(digits) is not None
        )
        return "hex" if looks_like_hex else "base64"

    def _decode_string(self, encoded_string: str, encoding_type: str) -> Tuple[Optional[str], bool]:
        """Decode string and return (decoded_text, success).

        Unsupported ``encoding_type`` values yield ``(None, False)``.
        """
        handlers = {"base64": self._decode_base64, "hex": self._decode_hex}
        try:
            handler = handlers.get(encoding_type)
            if handler is None:
                return None, False
            return handler(encoded_string)
        except (TypeError, AttributeError, ValueError):
            # Non-string arguments: report a failed decode instead of raising.
            return None, False

    def _decode_base64(self, encoded_string: str) -> Tuple[Optional[str], bool]:
        """Decode a Base64 string to text.

        Characters outside the Base64 alphabet are ignored and missing
        trailing padding is restored before decoding. If the decoded bytes
        contain no printable text, their hex representation is returned
        instead.

        Returns:
            ``(text, True)`` on success, ``(None, False)`` when the input is
            not decodable or decodes to zero bytes.
        """
        try:
            payload = self._BASE64_NOISE.sub("", encoded_string).rstrip("=")
            # Pad according to the number of real data characters so payloads
            # whose "=" padding was stripped still decode.
            data_chars = len(payload) - payload.count("=")
            payload += "=" * (-data_chars % 4)
            raw = base64.b64decode(payload)
        except (binascii.Error, ValueError, TypeError):
            return None, False

        if not raw:
            # Nothing was decoded; reporting success with "" would be misleading.
            return None, False

        text = self._bytes_to_text(raw)
        if text is None:
            return raw.hex(), True
        return text, True

    def _decode_hex(self, encoded_string: str) -> Tuple[Optional[str], bool]:
        """Decode a hex string to text.

        ``0x``/``\\x`` prefixes and whitespace, comma or colon separators are
        removed before decoding.

        Returns:
            ``(text, True)`` on success, ``(None, False)`` when the input is
            not valid hex or decodes to no printable text.
        """
        try:
            raw = bytes.fromhex(self._HEX_NOISE.sub("", encoded_string))
        except (ValueError, TypeError):
            return None, False

        text = self._bytes_to_text(raw)
        if text is None:
            return None, False
        return text, True

    def _encoding_candidates(self, raw: bytes) -> List[str]:
        """Return text encodings to try for ``raw``, most plausible first.

        UTF-16 is only tried first when at least half of the 16-bit units
        have a zero high byte, which is what ASCII-range text looks like in
        UTF-16 (PowerShell ``-EncodedCommand`` payloads are Base64 of
        UTF-16LE text). Otherwise UTF-8 goes first: decoding plain
        ASCII/UTF-8 bytes as UTF-16 almost always "succeeds" but yields
        unrelated CJK characters.

        NOTE: UTF-16 text without any zero bytes (e.g. purely CJK) whose
        bytes also form valid UTF-8 will be read as UTF-8.
        """
        order = list(self._TEXT_ENCODINGS)
        units = len(raw) // 2
        if not units:
            return order

        window = raw[: units * 2]
        zero_high_le = window[1::2].count(0)
        zero_high_be = window[0::2].count(0)

        preferred = None
        if zero_high_le > zero_high_be and zero_high_le * 2 >= units:
            preferred = "utf-16le"
        elif zero_high_be > zero_high_le and zero_high_be * 2 >= units:
            preferred = "utf-16be"

        if preferred is not None:
            order.remove(preferred)
            order.insert(0, preferred)
        return order

    def _bytes_to_text(self, raw: bytes) -> Optional[str]:
        """Decode ``raw`` to text, or return ``None`` if nothing printable results.

        NUL characters are removed from the decoded text. The first encoding
        that yields non-blank text wins.
        """
        for encoding in self._encoding_candidates(raw):
            try:
                text = raw.decode(encoding)
            except UnicodeDecodeError:
                continue
            text = text.replace("\x00", "")
            if text.strip():
                return text
        return None

    def _match_patterns(self, text: str, patterns: Dict[str, str]) -> List[str]:
        """Return descriptions of the ``patterns`` found in lower-cased ``text``.

        Longer keywords are matched first and masked out, so a shorter keyword
        contained in them (``"://"`` inside ``"https://"``, ``"-enc"`` inside
        ``"-encodedcommand"``) is only reported when it also occurs elsewhere.
        Descriptions are returned in the declaration order of ``patterns``.
        """
        remaining = text
        matched = set()
        for pattern in sorted(patterns, key=len, reverse=True):
            if pattern in self._WHOLE_WORD_PATTERNS:
                remaining, hits = re.subn(
                    r"\b" + re.escape(pattern) + r"\b", "\x00", remaining
                )
            else:
                hits = remaining.count(pattern)
                if hits:
                    # NUL is not part of any keyword, so masking cannot
                    # create new matches.
                    remaining = remaining.replace(pattern, "\x00")
            if hits:
                matched.add(pattern)
        return [
            description
            for pattern, description in patterns.items()
            if pattern in matched
        ]

    def _analyze_decoded_content(self, decoded_text: str) -> Dict[str, Any]:
        """Analyze decoded content for malicious patterns.

        Returns:
            Dict with ``is_suspicious``, ``threat_level``, ``indicators``
            (at most 10), ``indicator_count`` (total), ``attack_techniques``
            and ``threat_summary``. Empty input yields threat level
            ``"UNKNOWN"`` with empty lists.
        """
        if not decoded_text:
            return {
                "is_suspicious": False,
                "threat_level": "UNKNOWN",
                "indicators": [],
                "indicator_count": 0,
                "attack_techniques": [],
                "threat_summary": "No decoded content to analyze",
            }

        decoded_lower = decoded_text.lower()
        indicators: List[str] = []
        attack_techniques: List[str] = []

        for technique, patterns in self._THREAT_PATTERNS:
            found = self._match_patterns(decoded_lower, patterns)
            if found:
                indicators.extend(found)
                attack_techniques.append(technique)

        threat_level = self._calculate_threat_level(len(indicators), attack_techniques)
        threat_summary = self._generate_threat_summary(
            decoded_text, indicators, attack_techniques
        )

        return {
            "is_suspicious": len(indicators) > 0,
            "threat_level": threat_level,
            "indicators": indicators[:10],
            "indicator_count": len(indicators),
            "attack_techniques": attack_techniques,
            "threat_summary": threat_summary,
        }

    def _calculate_threat_level(self, indicator_count: int, attack_techniques: list) -> str:
        """Calculate threat level based on indicators and techniques.

        Returns ``"LOW"`` for no indicators, ``"CRITICAL"`` for five or more
        indicators or any high-risk technique, ``"HIGH"`` for three or more
        indicators and ``"MEDIUM"`` otherwise.
        """
        if indicator_count == 0:
            return "LOW"

        has_high_risk = any(
            technique in self._HIGH_RISK_TECHNIQUES for technique in attack_techniques
        )
        if has_high_risk or indicator_count >= 5:
            return "CRITICAL"
        if indicator_count >= 3:
            return "HIGH"
        if indicator_count >= 1:
            return "MEDIUM"
        return "LOW"

    def _generate_threat_summary(self, decoded_text: str, indicators: list, attack_techniques: list) -> str:
        """Generate human-readable threat summary.

        The summary names the first indicator, up to three techniques and a
        preview of at most ``_PREVIEW_LIMIT`` characters of the decoded text.
        """
        if not indicators:
            return "No suspicious patterns detected in decoded content"

        summary_parts = []

        if len(indicators) == 1:
            summary_parts.append(f"Found 1 suspicious indicator: {indicators[0]}")
        else:
            summary_parts.append(
                f"Found {len(indicators)} suspicious indicators including: {indicators[0]}"
            )

        if attack_techniques:
            # Only the first three techniques are spelled out.
            labels = [
                self._TECHNIQUE_LABELS.get(technique, technique)
                for technique in attack_techniques[:3]
            ]
            if len(labels) == 1:
                summary_parts.append(f"Indicates {labels[0]}.")
            else:
                summary_parts.append(
                    f"Indicates {', '.join(labels[:-1])} and {labels[-1]}."
                )

        preview = decoded_text[: self._PREVIEW_LIMIT].strip()
        if len(decoded_text) > self._PREVIEW_LIMIT:
            preview += "..."
        summary_parts.append(f"Command preview: {preview}")

        return " ".join(summary_parts)
|
|
|
|
|
|
|
# Single shared DecoderTool instance backing the ``decoder`` tool function.
_decoder_tool = DecoderTool()


# NOTE(review): the ``@tool`` decorator presumably exposes this docstring as the
# tool description shown to the model, so its wording is kept verbatim.
@tool
def decoder(encoded_string: str, encoding_type: str = "auto") -> dict:
    """Decodes Base64 or hex-encoded strings commonly used to hide malicious commands.

    Use this tool when you see:
    - PowerShell with -enc, -e, or -encodedcommand flags
    - Long strings of random-looking characters (A-Z, a-z, 0-9, +, /, =)
    - Commands that look obfuscated or unreadable
    - Hex strings (0-9, A-F only) in unusual contexts

    The tool automatically detects encoding type, decodes the string, and analyzes it for
    malicious patterns including code execution, credential theft, C2 communication, and more.

    Args:
        encoded_string: The encoded string to decode (can be base64 or hex)
        encoding_type: Type of encoding - "auto", "base64", or "hex" (default: "auto")

    Returns:
        Decoded content with detailed threat analysis including indicators, attack techniques,
        and threat level assessment.

    Examples:
        - decoder("cG93ZXJzaGVsbC5leGU=") → decodes PowerShell commands
        - decoder("496e766f6b652d576562526571756573742068747470733a2f2f6576696c2e636f6d", "hex")
    """
    # Forward both arguments unchanged in the dict shape DecoderTool.run expects.
    request = {
        "encoded_string": encoded_string,
        "encoding_type": encoding_type,
    }
    return _decoder_tool.run(request)