| from langchain_core.tools import tool
|
| from typing import Dict, Any, List
|
| from datetime import datetime
|
| import re
|
| import json
|
| from .base_tool import Tool
|
|
|
| class TimelineBuilderTool(Tool):
|
| """Build focused timelines around suspicious events by parsing raw logs directly"""
|
|
|
| def name(self) -> str:
|
| return "timeline_builder"
|
|
|
| def run(self, input_data: Dict[str, Any]) -> Dict[str, Any]:
|
| try:
|
| pivot_entity = input_data.get("pivot_entity", "")
|
| pivot_type = input_data.get("pivot_type", "")
|
| time_window_minutes = int(input_data.get("time_window_minutes", 5))
|
| raw_logs = input_data.get("raw_logs", "")
|
|
|
| if not pivot_entity or not pivot_type:
|
| return {"error": "Both pivot_entity and pivot_type are required"}
|
|
|
| if not raw_logs:
|
| return {"error": "No raw log data provided"}
|
|
|
|
|
| events = self._parse_raw_logs(raw_logs)
|
|
|
| if not events:
|
| return {
|
| "tool": "timeline_builder",
|
| "pivot_entity": pivot_entity,
|
| "pivot_type": pivot_type,
|
| "result": {
|
| "found": False,
|
| "message": "No events could be parsed from logs",
|
| "timeline": []
|
| }
|
| }
|
|
|
|
|
| pivot_events = self._find_pivot_events(events, pivot_entity, pivot_type)
|
|
|
| if not pivot_events:
|
| return {
|
| "tool": "timeline_builder",
|
| "pivot_entity": pivot_entity,
|
| "pivot_type": pivot_type,
|
| "result": {
|
| "found": False,
|
| "message": f"No events found for {pivot_type}: {pivot_entity}",
|
| "total_events_parsed": len(events),
|
| "timeline": []
|
| }
|
| }
|
|
|
|
|
| pivot_event = pivot_events[0]
|
| timeline = self._build_timeline_around_event(
|
| events, pivot_event, time_window_minutes
|
| )
|
|
|
|
|
| summary = self._generate_timeline_summary(
|
| pivot_event, timeline, len(pivot_events)
|
| )
|
|
|
| return {
|
| "tool": "timeline_builder",
|
| "pivot_entity": pivot_entity,
|
| "pivot_type": pivot_type,
|
| "time_window_minutes": time_window_minutes,
|
| "result": {
|
| "found": True,
|
| "total_pivot_events": len(pivot_events),
|
| "showing_timeline_for": "first pivot event",
|
| "pivot_event_summary": self._summarize_event(pivot_event),
|
| "timeline": timeline,
|
| "summary": summary
|
| }
|
| }
|
|
|
| except Exception as e:
|
| return {"error": f"{type(e).__name__}: {str(e)}"}
|
|
|
| def _parse_raw_logs(self, raw_logs: str) -> List[Dict]:
|
| """Parse events from raw log string"""
|
| events = []
|
|
|
|
|
| for line in raw_logs.split('\n'):
|
| line = line.strip()
|
| if not line:
|
| continue
|
|
|
| try:
|
|
|
| event = json.loads(line)
|
| parsed_event = self._extract_event_data(event)
|
| if parsed_event:
|
| events.append(parsed_event)
|
| except json.JSONDecodeError:
|
|
|
| parsed_event = self._parse_text_log_line(line)
|
| if parsed_event:
|
| events.append(parsed_event)
|
|
|
| return events
|
|
|
| def _extract_event_data(self, event: Dict) -> Dict:
|
| """Extract relevant fields from a JSON event"""
|
| extracted = {
|
| "raw": event,
|
| "timestamp": None,
|
| "event_id": None,
|
| "user": None,
|
| "computer": None,
|
| "process_name": None,
|
| "command_line": None,
|
| "src_ip": None,
|
| "dst_ip": None,
|
| "file_path": None,
|
| "registry_path": None
|
| }
|
|
|
|
|
| for ts_field in ["@timestamp", "timestamp", "TimeCreated", "EventTime"]:
|
| if ts_field in event:
|
| extracted["timestamp"] = self._parse_timestamp(event[ts_field])
|
| break
|
|
|
|
|
| for id_field in ["EventID", "event_id", "event.code", "winlog.event_id"]:
|
| if id_field in event:
|
| extracted["event_id"] = str(event[id_field])
|
| break
|
|
|
|
|
| for user_field in ["User", "user", "user.name", "winlog.user.name", "SubjectUserName"]:
|
| if user_field in event:
|
| extracted["user"] = str(event[user_field])
|
| break
|
|
|
|
|
| for comp_field in ["Computer", "computer", "host.name", "winlog.computer_name"]:
|
| if comp_field in event:
|
| extracted["computer"] = str(event[comp_field])
|
| break
|
|
|
|
|
| for proc_field in ["Image", "process.name", "NewProcessName", "ProcessName"]:
|
| if proc_field in event:
|
| extracted["process_name"] = str(event[proc_field])
|
| break
|
|
|
| for cmd_field in ["CommandLine", "command_line", "process.command_line"]:
|
| if cmd_field in event:
|
| extracted["command_line"] = str(event[cmd_field])
|
| break
|
|
|
|
|
| for src_field in ["SourceIp", "src_ip", "source.ip"]:
|
| if src_field in event:
|
| extracted["src_ip"] = str(event[src_field])
|
| break
|
|
|
| for dst_field in ["DestinationIp", "dst_ip", "destination.ip"]:
|
| if dst_field in event:
|
| extracted["dst_ip"] = str(event[dst_field])
|
| break
|
|
|
|
|
| for file_field in ["TargetFilename", "file.path", "ObjectName"]:
|
| if file_field in event:
|
| extracted["file_path"] = str(event[file_field])
|
| break
|
|
|
|
|
| for reg_field in ["TargetObject", "registry.path"]:
|
| if reg_field in event:
|
| extracted["registry_path"] = str(event[reg_field])
|
| break
|
|
|
| return extracted
|
|
|
| def _parse_text_log_line(self, line: str) -> Dict:
|
| """Parse a plain text log line"""
|
| extracted = {
|
| "raw": {"text": line},
|
| "timestamp": None,
|
| "event_id": None,
|
| "user": None,
|
| "computer": None,
|
| "process_name": None,
|
| "command_line": None,
|
| "src_ip": None,
|
| "dst_ip": None,
|
| "file_path": None,
|
| "registry_path": None
|
| }
|
|
|
|
|
| ts_match = re.search(r'\d{4}-\d{2}-\d{2}[T\s]\d{2}:\d{2}:\d{2}', line)
|
| if ts_match:
|
| extracted["timestamp"] = self._parse_timestamp(ts_match.group(0))
|
|
|
|
|
| event_id_match = re.search(r'EventID[:\s=]+(\d+)', line, re.IGNORECASE)
|
| if event_id_match:
|
| extracted["event_id"] = event_id_match.group(1)
|
|
|
|
|
| ip_matches = re.findall(r'\b(?:\d{1,3}\.){3}\d{1,3}\b', line)
|
| if ip_matches:
|
| extracted["src_ip"] = ip_matches[0]
|
| if len(ip_matches) > 1:
|
| extracted["dst_ip"] = ip_matches[1]
|
|
|
| return extracted
|
|
|
| def _parse_timestamp(self, ts_value) -> datetime:
|
| """Parse timestamp from various formats"""
|
| if isinstance(ts_value, datetime):
|
| return ts_value
|
|
|
| if isinstance(ts_value, (int, float)):
|
| return datetime.fromtimestamp(ts_value)
|
|
|
| ts_str = str(ts_value)
|
|
|
|
|
| formats = [
|
| "%Y-%m-%dT%H:%M:%S.%fZ",
|
| "%Y-%m-%dT%H:%M:%SZ",
|
| "%Y-%m-%d %H:%M:%S.%f",
|
| "%Y-%m-%d %H:%M:%S",
|
| "%Y-%m-%dT%H:%M:%S",
|
| ]
|
|
|
| for fmt in formats:
|
| try:
|
| return datetime.strptime(ts_str, fmt)
|
| except ValueError:
|
| continue
|
|
|
| return None
|
|
|
| def _find_pivot_events(self, events: List[Dict], pivot_entity: str, pivot_type: str) -> List[Dict]:
|
| """Find events that match the pivot criteria"""
|
| pivot_events = []
|
| pivot_entity_lower = pivot_entity.lower()
|
|
|
| for event in events:
|
| if self._event_matches_pivot(event, pivot_entity_lower, pivot_type.lower()):
|
| pivot_events.append(event)
|
|
|
| return pivot_events
|
|
|
| def _event_matches_pivot(self, event: Dict, pivot_entity: str, pivot_type: str) -> bool:
|
| """Check if an event matches the pivot criteria"""
|
| if pivot_type == "user":
|
| user = (event.get('user') or '').lower()
|
| return pivot_entity in user
|
|
|
| elif pivot_type == "process":
|
| process_name = (event.get('process_name') or '').lower()
|
| command_line = (event.get('command_line') or '').lower()
|
| return pivot_entity in process_name or pivot_entity in command_line
|
|
|
| elif pivot_type == "ip":
|
| src_ip = (event.get('src_ip') or '').lower()
|
| dst_ip = (event.get('dst_ip') or '').lower()
|
| return pivot_entity in src_ip or pivot_entity in dst_ip
|
|
|
| elif pivot_type == "file":
|
| file_path = (event.get('file_path') or '').lower()
|
| return pivot_entity in file_path
|
|
|
| elif pivot_type == "computer":
|
| computer = (event.get('computer') or '').lower()
|
| return pivot_entity in computer
|
|
|
| elif pivot_type == "event_id":
|
| event_id = str(event.get('event_id') or '').lower()
|
| return pivot_entity in event_id
|
|
|
| elif pivot_type == "registry":
|
| registry_path = (event.get('registry_path') or '').lower()
|
| return pivot_entity in registry_path
|
|
|
| return False
|
|
|
| def _build_timeline_around_event(self, events: List[Dict], pivot_event: Dict, time_window_minutes: int) -> List[Dict]:
|
| """Build a timeline of events around a pivot event"""
|
| pivot_timestamp = pivot_event.get('timestamp')
|
|
|
|
|
| if not pivot_timestamp:
|
| pivot_idx = events.index(pivot_event)
|
| start_idx = max(0, pivot_idx - 10)
|
| end_idx = min(len(events), pivot_idx + 11)
|
|
|
| timeline = []
|
| for i, event in enumerate(events[start_idx:end_idx]):
|
| timeline.append({
|
| "sequence_position": i,
|
| "is_pivot": event == pivot_event,
|
| "event_summary": self._summarize_event(event)
|
| })
|
| return timeline
|
|
|
|
|
| from datetime import timedelta
|
| start_time = pivot_timestamp - timedelta(minutes=time_window_minutes)
|
| end_time = pivot_timestamp + timedelta(minutes=time_window_minutes)
|
|
|
| timeline = []
|
| for event in events:
|
| event_timestamp = event.get('timestamp')
|
| if event_timestamp and start_time <= event_timestamp <= end_time:
|
| offset_seconds = (event_timestamp - pivot_timestamp).total_seconds()
|
| timeline.append({
|
| "timestamp": event_timestamp.isoformat(),
|
| "offset_seconds": offset_seconds,
|
| "offset_human": self._format_time_offset(offset_seconds),
|
| "is_pivot": event == pivot_event,
|
| "event_summary": self._summarize_event(event)
|
| })
|
|
|
|
|
| timeline.sort(key=lambda x: x.get('timestamp', ''))
|
|
|
| return timeline
|
|
|
| def _format_time_offset(self, offset_seconds: float) -> str:
|
| """Format time offset in human readable form"""
|
| if offset_seconds == 0:
|
| return "PIVOT EVENT"
|
| elif offset_seconds < 0:
|
| return f"{abs(offset_seconds):.1f}s before"
|
| else:
|
| return f"{offset_seconds:.1f}s after"
|
|
|
| def _summarize_event(self, event: Dict) -> str:
|
| """Create a human-readable summary of an event"""
|
| parts = []
|
|
|
| if event.get('event_id'):
|
| parts.append(f"EventID {event['event_id']}")
|
|
|
| if event.get('user'):
|
| parts.append(f"User: {event['user']}")
|
|
|
| if event.get('process_name'):
|
| parts.append(f"Process: {event['process_name']}")
|
|
|
| if event.get('command_line'):
|
| cmd = event['command_line']
|
| if len(cmd) > 60:
|
| cmd = cmd[:57] + "..."
|
| parts.append(f"CMD: {cmd}")
|
|
|
| if event.get('src_ip') or event.get('dst_ip'):
|
| if event.get('src_ip') and event.get('dst_ip'):
|
| parts.append(f"Network: {event['src_ip']} → {event['dst_ip']}")
|
| elif event.get('src_ip'):
|
| parts.append(f"SrcIP: {event['src_ip']}")
|
| elif event.get('dst_ip'):
|
| parts.append(f"DstIP: {event['dst_ip']}")
|
|
|
| if event.get('file_path'):
|
| parts.append(f"File: {event['file_path']}")
|
|
|
| if event.get('registry_path'):
|
| parts.append(f"Registry: {event['registry_path']}")
|
|
|
| return " | ".join(parts) if parts else "No details available"
|
|
|
| def _generate_timeline_summary(self, pivot_event: Dict, timeline: List[Dict], total_pivot_count: int) -> str:
|
| """Generate a human-readable summary of the timeline"""
|
| summary_parts = []
|
|
|
| summary_parts.append(f"Found {total_pivot_count} matching event(s).")
|
| summary_parts.append(f"Timeline shows {len(timeline)} events around the first match.")
|
|
|
|
|
| before = sum(1 for e in timeline if e.get('offset_seconds', 1) < 0)
|
| after = sum(1 for e in timeline if e.get('offset_seconds', 1) > 0)
|
|
|
| if before or after:
|
| summary_parts.append(f"{before} events before, {after} events after pivot.")
|
|
|
|
|
| event_ids = [e['event_summary'] for e in timeline if 'EventID' in e.get('event_summary', '')]
|
| if len(set(event_ids)) < len(event_ids):
|
| summary_parts.append("Multiple similar events detected in sequence.")
|
|
|
| return " ".join(summary_parts)
|
|
|
|
|
|
|
| _timeline_builder_tool = TimelineBuilderTool()
|
|
|
| @tool
|
| def timeline_builder(pivot_entity: str, pivot_type: str, time_window_minutes: int = 5, raw_logs: str = None) -> dict:
|
| """Build a focused timeline around suspicious events to understand attack sequences.
|
|
|
| Use this when you suspect coordinated activity or want to understand what happened
|
| before and after a suspicious event. Analyzes the sequence of events to identify patterns.
|
|
|
| Args:
|
| pivot_entity: The entity to build timeline around (e.g., "powershell.exe", "admin", "192.168.1.100")
|
| pivot_type: Type of entity - "user", "process", "ip", "file", "computer", "event_id", or "registry"
|
| time_window_minutes: Minutes before and after pivot events to include (default: 5)
|
| raw_logs: The raw log data (automatically provided by agent)
|
|
|
| Returns:
|
| Timeline analysis showing events before and after the pivot, helping identify attack sequences.
|
| """
|
| return _timeline_builder_tool.run({
|
| "pivot_entity": pivot_entity,
|
| "pivot_type": pivot_type,
|
| "time_window_minutes": time_window_minutes,
|
| "raw_logs": raw_logs
|
| }) |