| # HelpScout Processing Pipeline |
|
|
| Extracts, cleans, and enriches customer support conversations from HelpScout. |
| The module has two distinct responsibilities: |
|
|
| 1. **Data export** (`fetch_and_export.py`) — fetches raw threads, cleans HTML, and exports CSVs for the Streamlit dashboard. |
| 2. **AI processing pipeline** (`main.py`) — fetches the same conversations, runs them through a two-step agentic workflow (sentiment + topic extraction), and writes enriched records to Snowflake. |
|
|
| --- |
|
|
| ## Folder Structure |
|
|
| ``` |
| process_helpscout/ |
| │ |
| ├── main.py # Pipeline entry point (parallel processing) |
| ├── data_fetcher.py # Fetches & aggregates conversations; deduplication check |
| ├── fetch_and_export.py # CSV export script (separate from the pipeline) |
| ├── html_cleaner.py # HTML → clean plain text (shared by both workflows) |
| ├── snowflake_conn.py # Snowflake connection wrapper |
| │ |
| ├── agents/ # LLM-based extraction agents |
| │ ├── README.md # Agent architecture docs (read this to extend) |
| │ ├── base_agent.py # Abstract base class for all agents |
| │ ├── sentiment_analysis_agent.py # Classifies sentiment polarity + emotions |
| │ └── topic_extraction_agent.py # Assigns topic tags + billing flags |
| │ |
| ├── workflow/ |
| │ └── conversation_processor.py # LangGraph workflow: sentiment → topics → END |
| │ |
| ├── config_files/ |
| │ ├── processing_config.json # Agent models, batch settings, output table, sentiment categories |
| │ └── topics.json # HelpScout topic taxonomy (source of truth for topic extraction) |
| │ |
| ├── queries/ |
| │ └── helpscout_conversations.sql # SQL that fetches customer threads from Snowflake |
| │ |
| ├── sql/ |
| │ └── create_features_table.sql # DDL — run once before first pipeline execution |
| │ |
| ├── output/ # Auto-created; holds CSV exports |
| │ ├── helpscout_threads.csv |
| │ └── helpscout_conversations.csv |
| │ |
| └── visualization/ # Streamlit dashboard (reads from CSV exports) |
| ├── app.py |
| ├── components/dashboard.py |
| └── utils/data_processor.py |
| ``` |
|
|
| --- |
|
|
| ## Data Flow |
|
|
| ### CSV Export (Dashboard) |
|
|
| ``` |
| Snowflake (STITCH.HELPSCOUT.CONVERSATION_THREADS) |
| │ queries/helpscout_conversations.sql |
| ▼ |
| fetch_and_export.py |
| │ process_threads() — clean HTML, add word_count, date columns |
| │ aggregate_conversations() — one row per conversation_id |
| ▼ |
| output/helpscout_threads.csv (one row per message thread) |
| output/helpscout_conversations.csv (one row per conversation) |
| │ |
| ▼ |
| visualization/app.py → Streamlit dashboard |
| ``` |
|
|
| ### AI Processing Pipeline |
|
|
| ``` |
| Snowflake (STITCH.HELPSCOUT.CONVERSATION_THREADS) |
| │ Same SQL — customer threads only, Feb 17 2026+ |
| ▼ |
| data_fetcher.fetch_conversations() |
| │ Cleans HTML (html_cleaner.py) |
| │ Aggregates to one row per conversation |
| │ Checks HELPSCOUT_CONVERSATION_FEATURES for already-processed IDs |
| ▼ |
| main.py — splits into parallel batches |
| │ |
| ├── Worker 1: ConversationProcessingWorkflow |
| │ ├── Node 1: SentimentAnalysisAgent → polarity + emotions |
| │ └── Node 2: TopicExtractionAgent → topics + billing flags |
| │ |
| ├── Worker 2: ... (same) |
| └── Worker N: ... (same) |
| │ |
| ▼ |
| SOCIAL_MEDIA_DB.ML_FEATURES.HELPSCOUT_CONVERSATION_FEATURES |
| ``` |
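The batch split and fan-out done by `main.py` can be pictured with the minimal sketch below. It is not the actual `main.py`. The names `make_batches`, `run_batch`, and `process_in_parallel` are hypothetical, and `run_batch` stands in for running `ConversationProcessingWorkflow` on each conversation. How `min_batch_size` and `max_batch_size` (from `processing_config.json`) clamp the batch size is an assumption. A thread pool is used here because LLM calls are I/O-bound; the real pipeline may use processes instead.

```python
import math
from concurrent.futures import ThreadPoolExecutor


def make_batches(items, n_workers, min_batch_size=10, max_batch_size=50):
    """Split items into roughly one batch per worker.

    Assumption: min/max_batch_size clamp the per-batch size (hypothetical semantics).
    """
    if not items:
        return []
    target = math.ceil(len(items) / n_workers)
    size = max(min_batch_size, min(max_batch_size, target))
    return [items[i:i + size] for i in range(0, len(items), size)]


def run_batch(batch):
    """Hypothetical stand-in for running the two-node workflow on each conversation."""
    return [{"conversation_id": conv["conversation_id"], "status": "processed"} for conv in batch]


def process_in_parallel(conversations, n_workers=4, min_batch_size=10, max_batch_size=50):
    batches = make_batches(conversations, n_workers, min_batch_size, max_batch_size)
    results = []
    with ThreadPoolExecutor(max_workers=n_workers) as pool:
        # Executor.map yields each batch's result in submission order
        for batch_result in pool.map(run_batch, batches):
            results.extend(batch_result)
    return results
```

With 120 conversations and 4 workers, this yields four batches of 30. With only 7 conversations, the minimum batch size keeps them in a single batch.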
|
|
| --- |
|
|
| ## Setup |
|
|
| ### 1. Environment variables |
|
|
| All credentials are read from the project root `.env` file. |
|
|
| | Key | Description | |
| |-----|-------------| |
| | `SNOWFLAKE_USER` | Snowflake username | |
| | `SNOWFLAKE_PASSWORD` | Snowflake password | |
| | `SNOWFLAKE_ACCOUNT` | Snowflake account identifier | |
| | `SNOWFLAKE_ROLE` | Role with access to `STITCH`, `ESTUARY`, and `SOCIAL_MEDIA_DB` | |
| | `SNOWFLAKE_WAREHOUSE` | Compute warehouse | |
| | `OPENAI_API_KEY` | Required for the AI pipeline only | |
|
|
| ### 2. Dependencies |
|
|
| All dependencies are in the project root `requirements.txt`: |
| - `snowflake-snowpark-python` |
| - `beautifulsoup4` |
| - `pandas`, `numpy` |
| - `langchain-openai`, `langgraph` |
| - `python-dotenv` |
| - `streamlit`, `plotly` (dashboard only) |
|
|
| ### 3. Create the output table (once) |
|
|
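| Before running the pipeline for the first time, run the DDL in `sql/create_features_table.sql` once. You can paste it into a Snowflake worksheet or run it with the Snowflake CLI: |
|
|
| ```bash |
| # Run from process_helpscout/; assumes the Snowflake CLI (`snow`) is installed with a default connection configured |
| snow sql -f sql/create_features_table.sql |
| ``` |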
|
|
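| This creates `SOCIAL_MEDIA_DB.ML_FEATURES.HELPSCOUT_CONVERSATION_FEATURES` with a primary key on `CONVERSATION_ID`. Snowflake does not enforce primary keys on standard tables. Duplicates are therefore prevented by the pipeline's own check for already-processed IDs, not by the table. The pipeline always appends; it never truncates the table. |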
|
|
| --- |
|
|
| ## Usage |
|
|
| ### Run the AI processing pipeline |
|
|
| ```bash |
| cd process_helpscout |
| |
| # Process all new conversations (parallel, recommended) |
| python main.py |
| |
| # Limit to 100 conversations — useful for a first test run |
| python main.py --limit 100 |
| |
| # Sequential mode — single process, easier to read logs when debugging |
| python main.py --sequential |
| |
| # Use a custom config file |
| python main.py --config /path/to/my_config.json |
| ``` |
|
|
| On every run the pipeline: |
| 1. Fetches all conversations (from Feb 17 2026 to today) |
| 2. Queries the output table for already-processed `CONVERSATION_ID`s |
| 3. Skips those — only new conversations are sent to the LLM |
| 4. Appends results to the Snowflake output table |
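Steps 2 and 3 amount to a set-membership filter. The sketch below is a minimal illustration, not the real `data_fetcher.py` logic. It assumes the fetched conversations are dicts with a `conversation_id` key and that the processed IDs come from a query such as `SELECT CONVERSATION_ID FROM SOCIAL_MEDIA_DB.ML_FEATURES.HELPSCOUT_CONVERSATION_FEATURES`. The function name `filter_new_conversations` is hypothetical.

```python
def filter_new_conversations(conversations, processed_ids):
    """Keep only conversations whose ID is not already in the output table.

    conversations: list of dicts with a "conversation_id" key (hypothetical shape)
    processed_ids: iterable of CONVERSATION_ID values already stored
    """
    # CONVERSATION_ID is VARCHAR in the output table, so compare everything as strings
    seen = {str(cid) for cid in processed_ids}
    return [c for c in conversations if str(c["conversation_id"]) not in seen]
```

Building a set first makes each lookup constant-time, so the filter stays linear in the number of fetched conversations even as the output table grows.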
|
|
| ### Run the CSV export (dashboard data) |
|
|
| ```bash |
| cd process_helpscout |
| python fetch_and_export.py |
| ``` |
|
|
| ### Launch the Streamlit dashboard |
|
|
| ```bash |
| cd process_helpscout |
| streamlit run visualization/app.py |
| ``` |
|
|
| --- |
|
|
| ## Output Table |
|
|
| **Table:** `SOCIAL_MEDIA_DB.ML_FEATURES.HELPSCOUT_CONVERSATION_FEATURES` |
|
|
| | Column | Type | Description | |
| |--------|------|-------------| |
| | `CONVERSATION_ID` | VARCHAR | HelpScout conversation ID (primary key) | |
| | `CUSTOMER_EMAIL` | VARCHAR | Customer email address | |
| | `CUSTOMER_FIRST` | VARCHAR | Customer first name | |
| | `CUSTOMER_LAST` | VARCHAR | Customer last name | |
| | `CUSTOMER_HS_ID` | NUMBER | HelpScout internal customer ID | |
| | `THREAD_COUNT` | NUMBER | Number of customer message threads | |
| | `FIRST_MESSAGE_AT` | TIMESTAMP_TZ | When the first customer message was sent | |
| | `LAST_MESSAGE_AT` | TIMESTAMP_TZ | When the last customer message was sent | |
| | `DURATION_HOURS` | FLOAT | Hours between first and last message | |
| | `STATUS` | VARCHAR | Last known HelpScout status | |
| | `STATE` | VARCHAR | Last known HelpScout state | |
| | `SOURCE_TYPE` | VARCHAR | e.g. `email`, `chat` | |
| | `SOURCE_VIA` | VARCHAR | e.g. `api`, `mailbox` | |
| | `COMBINED_TEXT` | TEXT | Raw aggregated customer messages | |
| | `CONVERSATION_TEXT_USED` | TEXT | Formatted + truncated text sent to the LLM | |
| | `SENTIMENT_POLARITY` | VARCHAR | `very_positive` / `positive` / `neutral` / `negative` / `very_negative` | |
| | `EMOTIONS` | VARCHAR | Comma-separated emotion values (NULL if none valid) | |
| | `SENTIMENT_CONFIDENCE` | VARCHAR | `high` / `medium` / `low` | |
| | `SENTIMENT_NOTES` | TEXT | 1-2 sentence LLM explanation of the sentiment | |
| | `TOPICS` | VARCHAR | Comma-separated topic IDs (multi-label) | |
| | `IS_REFUND_REQUEST` | BOOLEAN | Customer explicitly asked for a refund | |
| | `IS_CANCELLATION` | BOOLEAN | Customer explicitly wants to cancel | |
| | `IS_MEMBERSHIP` | BOOLEAN | Customer wants to join/rejoin and purchase membership | |
| | `TOPIC_CONFIDENCE` | VARCHAR | `high` / `medium` / `low` | |
| | `TOPIC_NOTES` | TEXT | 1-2 sentence LLM explanation of topics | |
| | `SUMMARY` | TEXT | 2-3 sentence neutral summary of the conversation | |
| | `PROCESSING_ERRORS` | TEXT | Semicolon-separated errors (NULL on full success) | |
| | `PROCESSED_AT` | TIMESTAMP_NTZ | When this record was written by the pipeline | |
| | `WORKFLOW_VERSION` | VARCHAR | Pipeline version for auditability | |
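Several columns follow two conventions: multi-value fields are comma-separated, and `EMOTIONS` and `PROCESSING_ERRORS` are NULL when there is nothing to store. The sketch below shows one way a finished workflow state could be mapped to a subset of these columns. Several details are assumptions:

- the state keys
- the `build_output_row` and `join_or_none` helpers
- the exact separators (no space after the comma)
- the `WORKFLOW_VERSION` value
- storing `PROCESSED_AT` as naive UTC

The real mapping is `main.py`'s `column_map`.

```python
from datetime import datetime, timezone

WORKFLOW_VERSION = "1.0.0"  # hypothetical value; the real version string is set by the pipeline


def join_or_none(values, sep):
    """Join non-empty values with sep; return None (SQL NULL) when nothing is left."""
    cleaned = [str(v).strip() for v in values if v is not None and str(v).strip()]
    return sep.join(cleaned) if cleaned else None


def build_output_row(state):
    """Map a finished workflow state (hypothetical keys) to some output-table columns."""
    return {
        "CONVERSATION_ID": str(state["conversation_id"]),
        "SENTIMENT_POLARITY": state.get("sentiment_polarity"),
        "EMOTIONS": join_or_none(state.get("emotions", []), ","),
        "TOPICS": join_or_none(state.get("topics", []), ","),
        "IS_REFUND_REQUEST": bool(state.get("is_refund_request", False)),
        "IS_CANCELLATION": bool(state.get("is_cancellation", False)),
        "IS_MEMBERSHIP": bool(state.get("is_membership", False)),
        "PROCESSING_ERRORS": join_or_none(state.get("errors", []), ";"),
        # Assumption: TIMESTAMP_NTZ carries no zone, so store naive UTC
        "PROCESSED_AT": datetime.now(timezone.utc).replace(tzinfo=None),
        "WORKFLOW_VERSION": WORKFLOW_VERSION,
    }
```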
|
|
| --- |
|
|
| ## Configuration |
|
|
| All pipeline settings live in `config_files/processing_config.json`. |
|
|
| ### Agent models |
|
|
| ```json |
| "agents": { |
| "sentiment_analysis": { |
| "model": "gpt-4o-mini", |
| "temperature": 0.2, |
| "max_retries": 3 |
| }, |
| "topic_extraction": { |
| "model": "gpt-4o-mini", |
| "temperature": 0.2, |
| "max_retries": 3 |
| } |
| } |
| ``` |
|
|
| Switch any agent to `gpt-4o` for higher accuracy (at higher cost) by changing the `"model"` value. |
|
|
| ### Conversation length |
|
|
| ```json |
| "processing": { |
| "max_conversation_chars": 3000, |
| "min_batch_size": 10, |
| "max_batch_size": 50 |
| } |
| ``` |
|
|
| `max_conversation_chars` controls how many characters of conversation text are sent to the LLM. Increasing this improves context for long conversations but raises token costs. The workflow formats messages as `[1] msg\n[2] msg…` and truncates at this limit. |
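The sketch below shows the formatting and truncation described above. It assumes a plain hard cut at `max_conversation_chars`; the workflow may cut differently, for example on a message boundary. `format_conversation` is a hypothetical name.

```python
def format_conversation(messages, max_chars=3000):
    """Number messages as "[1] msg", one per line, and cut the result at max_chars."""
    numbered = "\n".join(f"[{i}] {msg.strip()}" for i, msg in enumerate(messages, start=1))
    if len(numbered) <= max_chars:
        return numbered
    # Hard cut at the character limit (assumption; the real workflow may differ)
    return numbered[:max_chars]
```

For cost planning, OpenAI's rule of thumb is roughly four characters per token for English text. The default 3000-character cap is therefore on the order of 750 input tokens of conversation text per agent call, before the system prompt.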
|
|
| ### Output destination |
|
|
| ```json |
| "output": { |
| "database": "SOCIAL_MEDIA_DB", |
| "schema": "ML_FEATURES", |
| "table": "HELPSCOUT_CONVERSATION_FEATURES" |
| } |
| ``` |
|
|
| To write to a different table (e.g. a staging or test table), change these values. Then create the new table by running `sql/create_features_table.sql` with its table name edited to match. |
|
|
| ### Sentiment categories |
|
|
| The `sentiment_polarity` and `emotions` blocks in `processing_config.json` define the valid values for classification. Adding, removing, or renaming a category here is automatically reflected in both the LLM prompt and the output validation — no code changes required. |
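Config-driven categories work because the same list of allowed values feeds both the prompt and the check on the LLM's answer. The sketch below is a minimal illustration of that pattern. It assumes each config block can be reduced to a list of value strings and that the LLM returns a dict with hypothetical `polarity` and `emotions` keys. The function names are illustrative, not the agent's actual code. Only the polarity values are taken from this document (the output table).

```python
# Values listed for SENTIMENT_POLARITY in the output table
POLARITY_VALUES = ["very_positive", "positive", "neutral", "negative", "very_negative"]


def categories_prompt(allowed_polarity, allowed_emotions):
    """Render the allowed values into a prompt fragment, so config edits reach the LLM."""
    lines = [
        "Polarity must be one of: " + ", ".join(allowed_polarity),
        "Emotions must be chosen from: " + ", ".join(allowed_emotions),
    ]
    return "\n".join(lines)


def validate_sentiment(raw, allowed_polarity, allowed_emotions):
    """Check an LLM answer against the configured categories.

    Returns (polarity or None, list of valid emotions, list of error strings).
    """
    errors = []
    polarity = str(raw.get("polarity", "")).strip().lower()
    if polarity not in allowed_polarity:
        errors.append(f"invalid polarity: {polarity!r}")
        polarity = None
    emotions = []
    for emotion in raw.get("emotions", []):
        value = str(emotion).strip().lower()
        # Drop unknown values and duplicates, keeping the LLM's order
        if value in allowed_emotions and value not in emotions:
            emotions.append(value)
    return polarity, emotions, errors
```

An answer whose emotions all fall outside the configured list leaves an empty list. That corresponds to the NULL `EMOTIONS` value described in the output table.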
|
|
| ### Topic taxonomy |
|
|
| Topic definitions live in `config_files/topics.json`. This file is the single source of truth: the `TopicExtractionAgent` builds its system prompt directly from it. To add a new topic: |
|
|
| 1. Add an entry to the `"topics"` array with a unique `id`, `label`, and `description`. |
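| 2. If the topic has boolean sub-flags (like billing), add a `"flags"` key. Then update `topic_extraction_agent.py` to extract those flags, and add a matching column to the DDL and to `main.py`'s `column_map`. |
| 3. Re-run the pipeline. The new topic applies to every conversation processed from then on. Conversations already in the output table are skipped, so they keep their existing tags. |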
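A new entry takes effect without code changes because the prompt is rendered from the file. The sketch below shows how such a prompt can be built from `topics.json` and how unknown IDs in the LLM's answer can be dropped. It assumes the top-level `{"topics": [...]}` layout with `id`, `label`, and `description` described above. The function names and the prompt wording are hypothetical, not the `TopicExtractionAgent` implementation.

```python
import json


def load_topics(path="config_files/topics.json"):
    """Read the taxonomy; assumes a top-level "topics" array."""
    with open(path, encoding="utf-8") as f:
        return json.load(f)["topics"]


def build_topic_prompt(topics):
    """Render one line per topic so the LLM sees every id, label, and description."""
    lines = ["Assign one or more of these topic IDs:"]
    for topic in topics:
        lines.append(f"- {topic['id']}: {topic['label']}. {topic['description']}")
    return "\n".join(lines)


def keep_known_topics(predicted_ids, topics):
    """Drop IDs that are not in the taxonomy and remove duplicates, preserving order."""
    known = {topic["id"] for topic in topics}
    result = []
    for tid in predicted_ids:
        if tid in known and tid not in result:
            result.append(tid)
    return result
```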
|
|
| --- |
|
|
| ## SQL Query |
|
|
| **File:** `queries/helpscout_conversations.sql` |
|
|
| | Design decision | Detail | |
| |-----------------|--------| |
| | Date filter | `CREATED_AT >= '2026-02-17'` to current date | |
| | Team exclusion | Anti-join with `USORA_USERS WHERE access_level = 'team'` — only customer messages reach the pipeline | |
| | Thread types | `TYPE IN ('customer', 'message')` — excludes notes, forwarded threads, system messages | |
| | JSON extraction | Snowflake semi-structured syntax: `COLUMN:field::VARCHAR` | |
|
|
| To change the date range, edit the `WHERE ct.CREATED_AT >= '...'` line in the SQL file. |
|
|
| --- |
|
|
| ## HTML Cleaner |
|
|
| `html_cleaner.py` runs a four-stage pipeline on every message body: |
|
|
| | Stage | What it removes | |
| |-------|----------------| |
| | `_remove_quoted_sections()` | `<blockquote>` tags and Gmail/Outlook/Yahoo quoted-reply CSS wrappers | |
| | `_remove_boilerplate()` | `<table>`, `<img>`, `<script>`, `<style>` tags and footer/unsubscribe blocks | |
| | `_extract_text()` | Extracts plain text while preserving line breaks | |
| | `_clean_text()` | Strips invisible Unicode, collapses whitespace, removes `>` quote lines, cuts off at "On … wrote:" markers | |
|
|
| To add a new boilerplate pattern, append a string to `footer_keywords` inside `_remove_boilerplate()`, or add a CSS class fragment to `_QUOTED_CLASS_PATTERNS` at the top of the file. |
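The `_clean_text()` stage is plain string processing. The sketch below approximates its four operations with the standard library's `re` module. The character set, the patterns, and the order of operations are assumptions, not the ones in `html_cleaner.py`, and only the single-line form of the "On … wrote:" header is handled. `clean_text` is a hypothetical name.

```python
import re

# Zero-width characters, BOM, and soft hyphen that often survive HTML extraction
_INVISIBLE = re.compile(r"[\u200b\u200c\u200d\u2060\ufeff\u00ad]")
# Single-line "On <date>, <name> wrote:" reply header; everything after it is quoted history
_ON_WROTE = re.compile(r"^[ \t]*On\b.*\bwrote:[ \t]*$", re.MULTILINE)


def clean_text(text):
    text = _INVISIBLE.sub("", text).replace("\u00a0", " ")
    match = _ON_WROTE.search(text)
    if match:
        text = text[:match.start()]
    # Drop ">"-prefixed quote lines and collapse runs of spaces/tabs within each line
    lines = [
        re.sub(r"[ \t]+", " ", line).strip()
        for line in text.splitlines()
        if not line.lstrip().startswith(">")
    ]
    text = "\n".join(lines)
    # Collapse two or more consecutive blank lines into a single blank line
    return re.sub(r"\n{3,}", "\n\n", text).strip()
```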
|
|
| --- |
|
|
| ## Extending the Pipeline |
|
|
| ### Add a third agentic step |
|
|
| 1. Create `agents/your_new_agent.py` inheriting from `BaseAgent` (see `agents/README.md`). |
| 2. Add a new node method `_your_node()` in `workflow/conversation_processor.py`. |
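| 3. Add the node in `_build_workflow()` and replace the existing edge from `topic_extraction` to `END`, so that `END` is reached only through the new step: |
| ```python |
| graph.add_node("your_step", self._your_node) |
| # Delete the old graph.add_edge("topic_extraction", END) line, then chain the new step: |
| graph.add_edge("topic_extraction", "your_step") |
| graph.add_edge("your_step", END) |
| ``` |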
| 4. Add the corresponding output fields to `ConversationState`. |
| 5. Map new columns in `main.py`'s `column_map` dict and add them to the DDL. |
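Steps 2 and 4 fit together as follows. A LangGraph node receives the current state and returns a dict containing only the keys it updates, and LangGraph merges that dict into the state. The sketch below is framework-free so it runs on its own. `urgency_node`, `classify_urgency`, and the `urgency` field are hypothetical placeholders for your new agent. Appending failures to an `errors` list that would feed `PROCESSING_ERRORS` is an assumption about how the existing nodes report errors.

```python
from typing import List, Optional, TypedDict


class ConversationState(TypedDict, total=False):
    """Hypothetical subset of the workflow state; the real class lives in conversation_processor.py."""
    conversation_text: str
    topics: List[str]
    urgency: Optional[str]  # hypothetical new output field (step 4)
    errors: List[str]


def classify_urgency(text: str) -> str:
    """Placeholder for the new agent's LLM call (hypothetical)."""
    return "high" if "urgent" in text.lower() else "normal"


def urgency_node(state: ConversationState) -> dict:
    """LangGraph-style node: read the state, return only the keys it updates."""
    try:
        return {"urgency": classify_urgency(state.get("conversation_text", ""))}
    except Exception as exc:
        # Record the failure instead of raising, so the row can still be written
        return {"urgency": None, "errors": state.get("errors", []) + [f"urgency: {exc}"]}
```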
|
|
| ### Change the date range |
|
|
| Edit `queries/helpscout_conversations.sql`: |
| ```sql |
| ct.CREATED_AT >= '2026-02-17 00:00:00' -- ← change start date |
| ``` |
|
|
| ### Include team replies |
|
|
| Remove the anti-join in `helpscout_conversations.sql`. `'message'` is already part of the `TYPE` filter, so the only type that may need adding is `'note'`, and only if internal notes should be included as well. Update the HTML cleaning and aggregation too if team messages need different handling. |
|
|
| ### Process a different HelpScout mailbox |
|
|
| Add a `WHERE` clause on a mailbox ID column if available, or filter by `source_via` / `status`. |
|
|
| ### Automate daily runs |
|
|
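| Schedule `main.py` with a cron job, Airflow DAG, or any task scheduler. Because the pipeline skips already-processed conversations, re-running it daily processes only new conversations, with no manual bookkeeping needed. Avoid overlapping runs, though. The check for processed IDs happens once at the start of a run, so two concurrent runs would both pick up the same new conversations and append them twice. |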