| """ |
| HelpScout Summary Agent |
| Generates a page-level summary report from filtered HelpScout conversations. |
| Analyses the already-extracted SUMMARY fields to surface patterns and insights |
| beyond the pre-tagged topics / sentiments. |
| """ |
| import json |
| import sys |
| from pathlib import Path |
| from typing import Any, Dict |
|
|
| import pandas as pd |
|
|
| |
# Put the directory two levels above this file at the front of the import
# search path, unless it is already listed there.
# NOTE(review): presumably this is what lets the `agents` / `utils` imports
# that follow resolve when the module is run directly — confirm.
_parent = Path(__file__).resolve().parent.parent
if sys.path.count(str(_parent)) == 0:
    sys.path.insert(0, str(_parent))
|
|
| from agents.base_agent import BaseVisualizationAgent |
| from utils.llm_helper import LLMHelper |
| from utils.helpscout_utils import topic_label, load_topic_taxonomy |
|
|
|
|
class HelpScoutSummaryAgent(BaseVisualizationAgent):
    """
    Produce an executive summary report from a filtered set of HelpScout
    conversations by sending their SUMMARY fields to an LLM.

    The agent samples at most ``max_conversations`` rows (stratified by
    sentiment when possible), builds a text context containing aggregate
    statistics plus the truncated per-conversation summaries, and asks the
    LLM for a structured JSON report.
    """

    # Maximum number of characters of each conversation summary copied into
    # the prompt; longer summaries are cut and suffixed with an ellipsis.
    MAX_SUMMARY_CHARS = 250

    # Column used both to stratify the sample and to label each summary line.
    _SENTIMENT_COL = "sentiment_polarity"

    def __init__(self, model: str = "gpt-5-nano", temperature: float = 1,
                 max_conversations: int = 300):
        """
        Args:
            model: Model name forwarded to the base agent and to ``LLMHelper``.
            temperature: Sampling temperature forwarded to both as well.
            max_conversations: Default cap on how many conversations are
                included in a single prompt (can be overridden per call).
        """
        super().__init__(name="HelpScoutSummaryAgent", model=model, temperature=temperature)
        self.llm_helper = LLMHelper(model=model, temperature=temperature)
        self.max_conversations = max_conversations
        # Passed to ``topic_label`` when rendering topic names in the stats block.
        self.taxonomy = load_topic_taxonomy()

    def validate_input(self, input_data: Dict[str, Any]) -> bool:
        """
        Check that ``input_data`` is a dict whose ``'conversations'`` entry is
        a DataFrame with a ``'summary'`` column.

        Each failed check is logged at error level.

        Returns:
            True when the input is usable by ``process``, otherwise False.
        """
        # Guard first so that the membership tests below cannot raise on
        # None / non-mapping input.
        if not isinstance(input_data, dict):
            self.log_processing("Input must be a dict", level="error")
            return False
        if "conversations" not in input_data:
            self.log_processing("Missing 'conversations' key", level="error")
            return False
        if not isinstance(input_data["conversations"], pd.DataFrame):
            self.log_processing("'conversations' must be a DataFrame", level="error")
            return False
        if "summary" not in input_data["conversations"].columns:
            self.log_processing("DataFrame must contain a 'summary' column", level="error")
            return False
        return True

    def process(self, input_data: Dict[str, Any]) -> Dict[str, Any]:
        """
        Generate an aggregate summary report from filtered HelpScout conversations.

        Args:
            input_data: {
                'conversations': pd.DataFrame (must have 'summary' column),
                'filter_description': str (human-readable applied filters),
                'max_conversations': int (optional; overrides instance default),
            }

        Returns:
            On success:
            {
                'success': True,
                'summary': {
                    'executive_summary': str,
                    'top_themes': [{'theme': str, 'description': str, 'prevalence': str}],
                    'top_complaints': [str],
                    'unexpected_insights': [str],
                    'notable_quotes': [str],
                },
                'metadata': {
                    'total_conversations_analyzed': int,
                    'total_available': int,
                    'model_used': str,
                    'tokens_used': int,
                    'filter_applied': str,
                },
                'error': None,
            }
            On invalid input: {'success': False, 'error': str, 'summary': None}.
            On any other failure: whatever ``self.handle_error`` returns
            (defined on the base class).
        """
        # Kept outside the try so the except handler never has to touch
        # ``input_data`` again (it may not even be a dict).
        error_context = ""
        try:
            if not self.validate_input(input_data):
                return {"success": False, "error": "Invalid input data", "summary": None}

            error_context = input_data.get("filter_description", "")
            df = input_data["conversations"]

            # An explicit None is treated the same as an absent key.
            filter_desc = input_data.get("filter_description")
            filter_desc = "No filters applied" if filter_desc is None else str(filter_desc)

            max_convs = input_data.get("max_conversations")
            if max_convs is None:
                max_convs = self.max_conversations
            max_convs = int(max_convs)

            total_available = len(df)
            if total_available == 0:
                return self._empty_result(filter_desc)

            df_sample = self._stratified_sample(df, max_convs)
            n_analyzed = len(df_sample)

            self.log_processing(
                f"Analysing {n_analyzed} of {total_available} conversations"
                f" (filter: {filter_desc[:60]})"
            )

            # Statistics are computed over the full frame; only the sampled
            # rows contribute per-conversation summary lines.
            agg_context = self._build_aggregate_context(df_sample, df)
            prompt = self._build_prompt(agg_context, filter_desc, n_analyzed)

            system_msg = (
                "You are an expert customer support analyst for Musora, "
                "a music education platform (Drumeo, Pianote, Guitareo, Singeo, PlayBass). "
                "Your role is to synthesize customer support conversation summaries "
                "and surface actionable insights that go beyond simple tagging."
            )

            response = self.llm_helper.get_structured_completion(
                prompt=prompt,
                system_message=system_msg,
                max_retries=3,
            )

            if not response["success"]:
                return self.handle_error(
                    Exception(response.get("error", "LLM call failed")),
                    context=f"filter={filter_desc[:60]}"
                )

            content = response.get("content")
            if not isinstance(content, dict):
                # A non-object payload cannot be turned into a report; surface
                # it as an error instead of failing later with an obscure one.
                return self.handle_error(
                    ValueError("LLM response content is not a JSON object"),
                    context=f"filter={filter_desc[:60]}"
                )
            summary = self._ensure_defaults(content)

            # Missing bookkeeping fields should not discard a usable summary.
            usage = response.get("usage") or {}

            return {
                "success": True,
                "summary": summary,
                "metadata": {
                    "total_conversations_analyzed": n_analyzed,
                    "total_available": total_available,
                    "model_used": response.get("model", self.model),
                    "tokens_used": usage.get("total_tokens", 0),
                    "filter_applied": filter_desc,
                },
                "error": None,
            }

        except Exception as e:
            return self.handle_error(e, context=error_context)

    @staticmethod
    def _is_missing(value: Any) -> bool:
        """Return True for None and for scalar NA values (NaN, pd.NA, NaT)."""
        if value is None:
            return True
        return bool(pd.api.types.is_scalar(value) and pd.isna(value))

    @staticmethod
    def _split_strata(df: pd.DataFrame, column: str) -> list:
        """
        Split ``df`` into one sub-frame per distinct value of ``column``.

        Rows whose value is missing form one extra stratum at the end so they
        stay eligible for sampling.
        """
        missing = df[column].isna()
        strata = [
            group
            for _, group in df[~missing].groupby(column, sort=True, observed=True)
            if len(group) > 0
        ]
        if missing.any():
            strata.append(df[missing])
        return strata

    @staticmethod
    def _allocate_quotas(sizes: list, cap: int) -> list:
        """
        Split ``cap`` sample slots across strata proportionally to ``sizes``.

        Expects ``0 < cap < sum(sizes)`` and every size >= 1. The returned
        quotas sum to exactly ``cap``. Every stratum gets at least one slot
        whenever ``cap`` is at least the number of strata.
        """
        total = sum(sizes)
        exact = [cap * size / total for size in sizes]
        quotas = [max(1, int(share)) for share in exact]

        # Rounding tiny strata up to 1 can overshoot: trim the largest quotas.
        for _ in range(max(0, sum(quotas) - cap)):
            largest = max(range(len(quotas)), key=quotas.__getitem__)
            quotas[largest] -= 1

        # Flooring can undershoot: hand the spare slots to the strata that
        # lost the most to rounding (largest remainder first).
        shortfall = cap - sum(quotas)
        if shortfall > 0:
            by_remainder = sorted(
                range(len(sizes)),
                key=lambda i: exact[i] - quotas[i],
                reverse=True,
            )
            for i in by_remainder[:shortfall]:
                quotas[i] += 1
        return quotas

    def _stratified_sample(self, df: pd.DataFrame, cap: int) -> pd.DataFrame:
        """
        Return at most ``cap`` rows of ``df``, stratified by sentiment when possible.

        If ``df`` already fits within ``cap`` it is returned unchanged. When
        the sentiment column is present with more than one stratum, each
        stratum is sampled proportionally (fixed seed). Otherwise, or if
        stratification fails, a plain seeded random sample is returned.

        Raises:
            ValueError: if ``cap`` is negative.
        """
        if cap < 0:
            raise ValueError(f"cap must be non-negative, got {cap}")
        if len(df) <= cap:
            return df
        if cap == 0:
            return df.iloc[0:0]

        strat_col = self._SENTIMENT_COL
        if strat_col in df.columns:
            try:
                strata = self._split_strata(df, strat_col)
                if len(strata) > 1:
                    quotas = self._allocate_quotas([len(s) for s in strata], cap)
                    # Sampling each sub-frame directly (rather than through
                    # groupby.apply) keeps the sentiment column in the result.
                    picks = [
                        stratum.sample(n=min(quota, len(stratum)), random_state=42)
                        for stratum, quota in zip(strata, quotas)
                        if quota > 0
                    ]
                    return pd.concat(picks)
            except Exception as exc:
                # Best-effort: stratification is an optimisation, so fall back
                # to a plain random sample, but leave a trace of why.
                self.log_processing(
                    f"Stratified sampling failed ({exc}); using plain random sample"
                )
        return df.sample(n=cap, random_state=42)

    def _build_aggregate_context(self, df_sample: pd.DataFrame,
                                 df_full: pd.DataFrame) -> str:
        """
        Build the text block given to the LLM: aggregate stats + summaries.

        Args:
            df_sample: Rows whose summaries are listed individually.
            df_full: All matched rows; used for the aggregate statistics.

        Returns:
            A two-section string ("AGGREGATE STATISTICS" and
            "CONVERSATION SUMMARIES").
        """
        total = len(df_full)
        n_sample = len(df_sample)

        # --- aggregate statistics over the full frame ---
        stats = []
        if "sentiment_polarity" in df_full.columns:
            sent_counts = df_full["sentiment_polarity"].value_counts()
            sent_pct = (sent_counts / total * 100).round(1)
            stats.append("Sentiment breakdown: " +
                         ", ".join(f"{s} {pct}%" for s, pct in sent_pct.items()))

        if "topics" in df_full.columns:
            from utils.helpscout_utils import explode_topics
            exploded = explode_topics(df_full)
            if not exploded.empty:
                top_topics = exploded["topic_id"].value_counts().head(8)
                topic_strs = [f"{topic_label(t, self.taxonomy)} ({c})" for t, c in top_topics.items()]
                stats.append("Top topics: " + ", ".join(topic_strs))

        from utils.helpscout_utils import boolean_flag_counts
        flags = boolean_flag_counts(df_full)
        flag_labels = (
            ("is_refund_request", "Refund requests"),
            ("is_cancellation", "Cancellations"),
            ("is_membership", "Membership joins"),
        )
        # Only flags with a non-zero count are mentioned.
        flag_parts = [f"{label}: {flags[key]}" for key, label in flag_labels if flags[key]]
        if flag_parts:
            stats.append(", ".join(flag_parts))

        if "duration_hours" in df_full.columns:
            avg_dur = df_full["duration_hours"].mean()
            # mean() is NaN when no row has a duration; skip the line then.
            if pd.notna(avg_dur):
                stats.append(f"Average conversation duration: {avg_dur:.1f} hours")

        stats_block = "\n".join(stats)

        # --- per-conversation summary lines from the sample ---
        summaries = []
        for i, row in enumerate(df_sample.itertuples(), 1):
            raw = getattr(row, "summary", None)
            # NaN is truthy, so it must be filtered explicitly or it would be
            # rendered as the literal text "nan".
            s = "" if self._is_missing(raw) or not raw else str(raw).strip()
            if s:
                s = s[:self.MAX_SUMMARY_CHARS] + ("…" if len(s) > self.MAX_SUMMARY_CHARS else "")
                sent = getattr(row, "sentiment_polarity", "")
                if self._is_missing(sent):
                    sent = ""
                summaries.append(f"[{i}] ({sent}) {s}")

        summaries_block = "\n".join(summaries) if summaries else "No summaries available."

        note = (f"Note: Showing {n_sample} of {total} matched conversations."
                if n_sample < total else f"Showing all {total} matched conversations.")

        return f"""=== AGGREGATE STATISTICS ===
{stats_block}
{note}

=== CONVERSATION SUMMARIES ===
{summaries_block}"""

    def _build_prompt(self, context: str, filter_desc: str,
                      n_analyzed: int) -> str:
        """
        Assemble the user prompt: task framing, applied filters, the
        aggregate context, and the JSON schema the model must answer with.
        """
        return f"""Analyze the following {n_analyzed} HelpScout customer support conversation summaries for Musora.

Applied filters: {filter_desc}

{context}

Your task: Synthesize these conversations and produce insights that go BEYOND the pre-extracted tags.
Look for underlying patterns, recurring pain points, emotional signals, product gaps, and operational issues
that would not be obvious from simple topic counts alone.

Respond in JSON with this exact structure:
{{
  "executive_summary": "3-5 sentence high-level synthesis of what customers are experiencing",
  "top_themes": [
    {{
      "theme": "Short theme name (not a topic tag)",
      "description": "What customers are actually saying and feeling about this",
      "prevalence": "Rough estimate: e.g. 'Appears in ~30% of conversations'"
    }}
  ],
  "top_complaints": [
    "Specific actionable complaint statement (not generic)"
  ],
  "unexpected_insights": [
    "A pattern, contradiction, or insight that would surprise a product manager"
  ],
  "notable_quotes": [
    "Paraphrased quote or representative statement from conversations (not verbatim)"
  ]
}}

Guidelines:
- Top themes: 5-8 items, each distinct from pre-extracted topics
- Top complaints: 5-8 bullet points, specific and actionable
- Unexpected insights: 3-5 items, must genuinely go beyond the tag taxonomy
- Notable quotes: 3-5 representative paraphrases
- If a section has fewer relevant items, use fewer — quality over quantity
"""

    @staticmethod
    def _ensure_defaults(summary: dict) -> dict:
        """
        Fill in every expected report key that is absent or null.

        The dict is updated in place and also returned.
        """
        defaults = {
            "executive_summary": "",
            "top_themes": [],
            "top_complaints": [],
            "unexpected_insights": [],
            "notable_quotes": [],
        }
        for key, fallback in defaults.items():
            # A JSON null is as unusable downstream as a missing key.
            if summary.get(key) is None:
                summary[key] = fallback
        return summary

    def _empty_result(self, filter_desc: str) -> dict:
        """
        Build the success payload returned when no conversation matched the
        filters; no LLM call is made, so token usage is reported as 0.
        """
        return {
            "success": True,
            # Reuse the shared defaults so this shape cannot drift from the
            # one produced for real reports.
            "summary": self._ensure_defaults(
                {"executive_summary": "No conversations matched the selected filters."}
            ),
            "metadata": {
                "total_conversations_analyzed": 0,
                "total_available": 0,
                "model_used": self.model,
                "tokens_used": 0,
                "filter_applied": filter_desc,
            },
            "error": None,
        }