| """ |
| Performance analytics for agent execution and trajectory analysis. |
| |
| Provides comprehensive metrics, statistics, and visualizations for observability data. |
| """ |
| import logging |
| from typing import List, Dict, Any, Optional |
| from datetime import datetime, timedelta |
| from collections import defaultdict |
| import statistics |
|
|
| from pydantic import BaseModel, Field |
| from observability.trace_reader import TraceReader, TraceInfo, SpanInfo, GenerationInfo |
|
|
# Module-level logger shared by every analyzer class in this module.
logger = logging.getLogger(__name__)
|
|
|
|
class AgentStats(BaseModel):
    """
    Latency, reliability, and usage statistics for a single agent.

    Instances are built by AgentPerformanceAnalyzer.agent_latency_stats.
    """
    agent_name: str
    # Number of spans analyzed, including spans that had no recorded duration.
    execution_count: int
    # Latency figures are in milliseconds, computed over spans with a duration.
    avg_latency_ms: float
    p50_latency_ms: float
    p95_latency_ms: float
    p99_latency_ms: float
    min_latency_ms: float
    max_latency_ms: float
    # Percentage in the range 0-100 (not a 0-1 fraction).
    success_rate: float
    # agent_latency_stats currently always sets the cost and token fields to 0.0.
    total_cost: float
    avg_input_tokens: float
    avg_output_tokens: float
|
|
|
|
class WorkflowStats(BaseModel):
    """
    Aggregate statistics over a set of workflow (trace-level) runs.

    Instances are built by AgentPerformanceAnalyzer.workflow_performance_summary.
    """
    # Number of traces fetched, including traces without a recorded duration.
    total_runs: int
    # Duration figures are in milliseconds, computed over traces with a duration.
    avg_duration_ms: float
    p50_duration_ms: float
    p95_duration_ms: float
    p99_duration_ms: float
    # Percentage in the range 0-100 (not a 0-1 fraction).
    success_rate: float
    total_cost: float
    # Averaged only over traces that reported a cost.
    avg_cost_per_run: float
    total_tokens: int
    # total_tokens divided by total_runs.
    avg_tokens_per_run: float
|
|
|
|
class AgentTrajectory(BaseModel):
    """
    Execution trajectory of a single workflow run (one trace).

    Instances are built by AgentTrajectoryAnalyzer._build_trajectory.
    """
    trace_id: str
    session_id: Optional[str]
    start_time: datetime
    # Whole-workflow duration in milliseconds (0.0 when the trace has none).
    total_duration_ms: float
    # Ordered agent names; joined with " → " to form an execution path.
    agent_sequence: List[str] = Field(default_factory=list)
    # Presumably keyed by agent name (ms / cost) - TODO confirm once populated;
    # AgentTrajectoryAnalyzer._build_trajectory currently leaves both empty.
    agent_timings: Dict[str, float] = Field(default_factory=dict)
    agent_costs: Dict[str, float] = Field(default_factory=dict)
    errors: List[str] = Field(default_factory=list)
    # Overall outcome of the workflow run.
    success: bool = True
|
|
|
|
class AgentPerformanceAnalyzer:
    """
    Analyze agent performance metrics from LangFuse traces.

    All data is fetched through a ``TraceReader``; this class only aggregates it.
    Success and error rates are returned as percentages in the range 0-100.

    Usage:
        analyzer = AgentPerformanceAnalyzer()
        stats = analyzer.agent_latency_stats("retriever_agent", days=7)
        cost_breakdown = analyzer.cost_per_agent(session_id="session-123")
        error_rates = analyzer.error_rates(days=30)
    """

    # Agents inspected by ``error_rates`` when the caller passes no explicit list.
    DEFAULT_AGENT_NAMES = (
        "retriever_agent",
        "analyzer_agent",
        "synthesis_agent",
        "citation_agent",
    )

    def __init__(self, trace_reader: Optional[TraceReader] = None):
        """
        Initialize performance analyzer.

        Args:
            trace_reader: Optional TraceReader instance (creates new if None)
        """
        self.trace_reader = trace_reader if trace_reader is not None else TraceReader()
        logger.info("AgentPerformanceAnalyzer initialized")

    def agent_latency_stats(
        self,
        agent_name: str,
        days: int = 7,
        limit: int = 1000,
    ) -> Optional[AgentStats]:
        """
        Calculate latency statistics for a specific agent.

        Args:
            agent_name: Name of the agent
            days: Number of days to analyze
            limit: Maximum number of spans to analyze

        Returns:
            AgentStats object, or None if no spans (or no span with a duration)
            were found. Cost and token fields are always 0.0 here: they are not
            derived from span data.
        """
        spans = self.trace_reader.filter_by_agent(
            agent_name=agent_name,
            limit=limit,
            from_timestamp=self._window_start(days),
        )

        if not spans:
            logger.warning("No data found for agent '%s'", agent_name)
            return None

        # Spans without a duration are excluded from latency figures but still
        # count towards execution_count and success_rate.
        latencies = sorted(s.duration_ms for s in spans if s.duration_ms is not None)

        if not latencies:
            logger.warning("No latency data for agent '%s'", agent_name)
            return None

        stats = AgentStats(
            agent_name=agent_name,
            execution_count=len(spans),
            avg_latency_ms=statistics.mean(latencies),
            p50_latency_ms=self._percentile(latencies, 0.50),
            p95_latency_ms=self._percentile(latencies, 0.95),
            p99_latency_ms=self._percentile(latencies, 0.99),
            min_latency_ms=latencies[0],
            max_latency_ms=latencies[-1],
            success_rate=self._calculate_success_rate(spans),
            # TODO: cost/token aggregation is not implemented for span data.
            total_cost=0.0,
            avg_input_tokens=0.0,
            avg_output_tokens=0.0,
        )

        logger.info(
            "Calculated stats for '%s': avg=%.2fms, p95=%.2fms",
            agent_name,
            stats.avg_latency_ms,
            stats.p95_latency_ms,
        )
        return stats

    def token_usage_breakdown(
        self,
        session_id: Optional[str] = None,
        days: int = 7,
        limit: int = 100,
    ) -> Dict[str, Dict[str, int]]:
        """
        Get token usage breakdown by agent.

        Generation names are treated as agent names.

        Args:
            session_id: Optional session ID filter
            days: Number of days to analyze
            limit: Maximum number of traces

        Returns:
            Dictionary mapping agent names to {"input", "output", "total"} token sums
        """
        traces = self.trace_reader.get_traces(
            limit=limit,
            session_id=session_id,
            from_timestamp=self._window_start(days),
        )

        if not traces:
            logger.warning("No traces found for token usage analysis")
            return {}

        usage_by_agent: Dict[str, Dict[str, int]] = defaultdict(
            lambda: {"input": 0, "output": 0, "total": 0}
        )

        for trace in traces:
            # One reader call per trace to fetch its generations.
            for gen in self.trace_reader.get_generations(trace_id=trace.id):
                # Tolerate a missing usage dict and None-valued counters.
                usage = gen.usage or {}
                bucket = usage_by_agent[gen.name]
                for key in ("input", "output", "total"):
                    bucket[key] += usage.get(key) or 0

        logger.info("Token usage breakdown calculated for %d agents", len(usage_by_agent))
        return dict(usage_by_agent)

    def cost_per_agent(
        self,
        session_id: Optional[str] = None,
        days: int = 7,
        limit: int = 100,
    ) -> Dict[str, float]:
        """
        Calculate cost breakdown per agent.

        Generation names are treated as agent names.

        Args:
            session_id: Optional session ID filter
            days: Number of days to analyze
            limit: Maximum number of traces

        Returns:
            Dictionary mapping agent names to total cost
        """
        traces = self.trace_reader.get_traces(
            limit=limit,
            session_id=session_id,
            from_timestamp=self._window_start(days),
        )

        if not traces:
            logger.warning("No traces found for cost analysis")
            return {}

        cost_by_agent: Dict[str, float] = defaultdict(float)

        for trace in traces:
            for gen in self.trace_reader.get_generations(trace_id=trace.id):
                # Generations without a recorded cost contribute nothing.
                cost_by_agent[gen.name] += gen.cost or 0.0

        logger.info("Cost breakdown calculated for %d agents", len(cost_by_agent))
        return dict(cost_by_agent)

    def error_rates(
        self,
        days: int = 7,
        limit: int = 200,
        agent_names: Optional[List[str]] = None,
    ) -> Dict[str, Dict[str, Any]]:
        """
        Calculate error rates per agent.

        Args:
            days: Number of days to analyze
            limit: Maximum number of spans per agent
            agent_names: Agents to inspect; defaults to DEFAULT_AGENT_NAMES

        Returns:
            Dictionary keyed by agent name with execution/error counts and
            error/success rates as percentages. Agents with no spans are omitted.
        """
        from_date = self._window_start(days)
        names = self.DEFAULT_AGENT_NAMES if agent_names is None else agent_names

        error_stats: Dict[str, Dict[str, Any]] = {}

        for agent_name in names:
            spans = self.trace_reader.filter_by_agent(
                agent_name=agent_name,
                limit=limit,
                from_timestamp=from_date,
            )

            if not spans:
                continue

            total = len(spans)
            errors = sum(1 for s in spans if self._is_error_span(s))
            error_rate = (errors / total) * 100

            error_stats[agent_name] = {
                "total_executions": total,
                "errors": errors,
                "error_rate_percent": error_rate,
                "success_rate_percent": 100 - error_rate,
            }

        logger.info("Error rates calculated for %d agents", len(error_stats))
        return error_stats

    def workflow_performance_summary(
        self,
        days: int = 7,
        limit: int = 100,
    ) -> Optional[WorkflowStats]:
        """
        Generate workflow-level performance summary.

        Args:
            days: Number of days to analyze
            limit: Maximum number of workflow runs

        Returns:
            WorkflowStats object, or None if no traces (or no trace with a
            duration) were found
        """
        traces = self.trace_reader.get_traces(
            limit=limit,
            from_timestamp=self._window_start(days),
        )

        if not traces:
            logger.warning("No workflow traces found")
            return None

        durations = sorted(t.duration_ms for t in traces if t.duration_ms is not None)
        costs = [t.total_cost for t in traces if t.total_cost is not None]
        # Tolerate traces with no token_usage dict or a None total.
        total_tokens = sum((t.token_usage or {}).get("total") or 0 for t in traces)

        if not durations:
            logger.warning("No duration data for workflows")
            return None

        stats = WorkflowStats(
            total_runs=len(traces),
            avg_duration_ms=statistics.mean(durations),
            p50_duration_ms=self._percentile(durations, 0.50),
            p95_duration_ms=self._percentile(durations, 0.95),
            p99_duration_ms=self._percentile(durations, 0.99),
            success_rate=self._calculate_trace_success_rate(traces),
            total_cost=sum(costs) if costs else 0.0,
            avg_cost_per_run=statistics.mean(costs) if costs else 0.0,
            total_tokens=total_tokens,
            avg_tokens_per_run=total_tokens / len(traces),
        )

        logger.info(
            "Workflow summary: %d runs, avg=%.2fms, cost=$%.4f",
            stats.total_runs,
            stats.avg_duration_ms,
            stats.total_cost,
        )
        return stats

    @staticmethod
    def _window_start(days: int) -> datetime:
        """Return the start of a lookback window of ``days`` days ending now."""
        # NOTE(review): this is a naive local-time datetime; confirm TraceReader
        # interprets from_timestamp as local time rather than UTC.
        return datetime.now() - timedelta(days=days)

    @staticmethod
    def _percentile(sorted_values: List[float], fraction: float) -> float:
        """
        Return the element at ``fraction`` (0-1) of an ascending list.

        Uses index ``int(n * fraction)`` clamped to the last element, so a
        single sample is its own p95/p99. Returns 0.0 for an empty list.
        """
        if not sorted_values:
            return 0.0
        index = min(int(len(sorted_values) * fraction), len(sorted_values) - 1)
        return sorted_values[index]

    @staticmethod
    def _is_error_span(span: SpanInfo) -> bool:
        """Return True if the span is logged at ERROR level or has an ``error`` metadata key."""
        return span.level == "ERROR" or "error" in (span.metadata or {})

    def _calculate_success_rate(self, spans: List[SpanInfo]) -> float:
        """Return the percentage (0-100) of spans not flagged as errors; 0.0 for no spans."""
        if not spans:
            return 0.0

        successes = sum(1 for s in spans if not self._is_error_span(s))
        return (successes / len(spans)) * 100

    def _calculate_trace_success_rate(self, traces: List[TraceInfo]) -> float:
        """Return the percentage (0-100) of traces without a truthy ``error`` in metadata."""
        if not traces:
            return 0.0

        successes = sum(1 for t in traces if not (t.metadata or {}).get("error"))
        return (successes / len(traces)) * 100
|
|
|
|
class AgentTrajectoryAnalyzer:
    """
    Analyze agent execution trajectories and workflow paths.

    Usage:
        analyzer = AgentTrajectoryAnalyzer()
        trajectories = analyzer.get_trajectories(session_id="session-123")
        path_analysis = analyzer.analyze_execution_paths(days=7)
    """

    def __init__(self, trace_reader: Optional[TraceReader] = None):
        """
        Initialize trajectory analyzer.

        Args:
            trace_reader: Optional TraceReader instance (creates new if None)
        """
        self.trace_reader = trace_reader if trace_reader is not None else TraceReader()
        logger.info("AgentTrajectoryAnalyzer initialized")

    def get_trajectories(
        self,
        session_id: Optional[str] = None,
        days: int = 7,
        limit: int = 50,
    ) -> List[AgentTrajectory]:
        """
        Get agent execution trajectories for workflows.

        Args:
            session_id: Optional session ID filter
            days: Number of days to analyze
            limit: Maximum number of workflows

        Returns:
            List of AgentTrajectory objects, one per trace
        """
        # NOTE(review): naive local time; confirm TraceReader expects that rather than UTC.
        from_date = datetime.now() - timedelta(days=days)

        traces = self.trace_reader.get_traces(
            limit=limit,
            session_id=session_id,
            from_timestamp=from_date,
        )

        trajectories = [self._build_trajectory(trace) for trace in traces or []]

        logger.info("Retrieved %d agent trajectories", len(trajectories))
        return trajectories

    def analyze_execution_paths(
        self,
        days: int = 7,
        limit: int = 100,
    ) -> Dict[str, Any]:
        """
        Analyze common execution paths and patterns.

        Args:
            days: Number of days to analyze
            limit: Maximum number of workflows

        Returns:
            Dictionary with total/unique path counts, the most common
            (path, count) pair, the top-10 path distribution and the average
            number of agents per workflow; empty if no trajectories were found.
        """
        trajectories = self.get_trajectories(days=days, limit=limit)

        if not trajectories:
            logger.warning("No trajectories found for path analysis")
            return {}

        # A path is the agent sequence joined with arrows. _build_trajectory
        # leaves agent_sequence empty, so every path is currently "".
        path_counts: Dict[str, int] = defaultdict(int)
        for trajectory in trajectories:
            path_counts[" → ".join(trajectory.agent_sequence)] += 1

        # sorted() is stable, so equally frequent paths keep first-seen order.
        sorted_paths = sorted(path_counts.items(), key=lambda item: item[1], reverse=True)

        analysis = {
            "total_workflows": len(trajectories),
            "unique_paths": len(path_counts),
            "most_common_path": sorted_paths[0] if sorted_paths else None,
            "path_distribution": dict(sorted_paths[:10]),
            "avg_agents_per_workflow": statistics.mean(
                [len(t.agent_sequence) for t in trajectories]
            ),
        }

        logger.info(
            "Path analysis: %s unique paths from %s workflows",
            analysis["unique_paths"],
            analysis["total_workflows"],
        )
        return analysis

    def compare_trajectories(
        self,
        trace_id_1: str,
        trace_id_2: str,
    ) -> Dict[str, Any]:
        """
        Compare two workflow trajectories.

        Args:
            trace_id_1: First (baseline) trace ID
            trace_id_2: Second trace ID

        Returns:
            Comparison dictionary with per-trace summaries, the duration
            difference (second minus first) in ms and as a percentage of the
            first, and whether both followed the same agent path. Empty if
            either trace is not found.
        """
        trace1 = self.trace_reader.get_trace_by_id(trace_id_1)
        trace2 = self.trace_reader.get_trace_by_id(trace_id_2)

        if not trace1 or not trace2:
            logger.error("One or both traces not found")
            return {}

        traj1 = self._build_trajectory(trace1)
        traj2 = self._build_trajectory(trace2)

        diff_ms = traj2.total_duration_ms - traj1.total_duration_ms
        # Percentage is undefined for a zero baseline duration; report 0 then.
        diff_percent = (
            (diff_ms / traj1.total_duration_ms) * 100
            if traj1.total_duration_ms > 0
            else 0
        )

        comparison = {
            "trace_1": self._summarize(trace_id_1, traj1),
            "trace_2": self._summarize(trace_id_2, traj2),
            "duration_diff_ms": diff_ms,
            "duration_diff_percent": diff_percent,
            "same_path": traj1.agent_sequence == traj2.agent_sequence,
        }

        logger.info("Compared trajectories: %s vs %s", trace_id_1, trace_id_2)
        return comparison

    @staticmethod
    def _summarize(trace_id: str, trajectory: AgentTrajectory) -> Dict[str, Any]:
        """Return the per-trace summary dict used by compare_trajectories."""
        return {
            "id": trace_id,
            "duration_ms": trajectory.total_duration_ms,
            "agents": trajectory.agent_sequence,
            "success": trajectory.success,
        }

    def _build_trajectory(self, trace: TraceInfo) -> AgentTrajectory:
        """
        Build an AgentTrajectory from trace-level data only.

        No spans or generations are fetched, so agent_sequence, agent_timings
        and agent_costs are left empty.

        A run counts as successful unless its metadata carries a truthy
        ``error`` value (the same rule as
        AgentPerformanceAnalyzer._calculate_trace_success_rate). Having
        output does not override a recorded error. The error value, if any,
        is stored in ``errors``.
        """
        metadata = trace.metadata or {}
        error = metadata.get("error")

        return AgentTrajectory(
            trace_id=trace.id,
            session_id=trace.session_id,
            start_time=trace.timestamp,
            total_duration_ms=trace.duration_ms or 0.0,
            agent_sequence=[],
            agent_timings={},
            agent_costs={},
            errors=[str(error)] if error else [],
            success=not error,
        )
|
|