# HelpScout Processing Pipeline
Extracts, cleans, and enriches customer support conversations from HelpScout.
The module has two distinct responsibilities:
1. **Data export** (`fetch_and_export.py`) — fetches raw threads, cleans HTML, and exports CSVs for the Streamlit dashboard.
2. **AI processing pipeline** (`main.py`) — fetches the same conversations, runs them through a two-step agentic workflow (sentiment + topic extraction), and writes enriched records to Snowflake.
---
## Folder Structure
```
process_helpscout/
├── main.py # Pipeline entry point (parallel processing)
├── data_fetcher.py # Fetches & aggregates conversations; deduplication check
├── fetch_and_export.py # CSV export script (separate from the pipeline)
├── html_cleaner.py # HTML → clean plain text (shared by both workflows)
├── snowflake_conn.py # Snowflake connection wrapper
├── agents/ # LLM-based extraction agents
│ ├── README.md # Agent architecture docs (read this to extend)
│ ├── base_agent.py # Abstract base class for all agents
│ ├── sentiment_analysis_agent.py # Classifies sentiment polarity + emotions
│ └── topic_extraction_agent.py # Assigns topic tags + billing flags
├── workflow/
│ └── conversation_processor.py # LangGraph workflow: sentiment → topics → END
├── config_files/
│ ├── processing_config.json # Agent models, batch settings, output table, sentiment categories
│ └── topics.json # HelpScout topic taxonomy (source of truth for topic extraction)
├── queries/
│ └── helpscout_conversations.sql # SQL that fetches customer threads from Snowflake
├── sql/
│ └── create_features_table.sql # DDL — run once before first pipeline execution
├── output/ # Auto-created; holds CSV exports
│ ├── helpscout_threads.csv
│ └── helpscout_conversations.csv
└── visualization/ # Streamlit dashboard (reads from CSV exports)
    ├── app.py
    ├── components/dashboard.py
    └── utils/data_processor.py
```
---
## Data Flow
### CSV Export (Dashboard)
```
Snowflake (STITCH.HELPSCOUT.CONVERSATION_THREADS)
    │  queries/helpscout_conversations.sql
    ▼
fetch_and_export.py
    │  process_threads(): clean HTML, add word_count, date columns
    │  aggregate_conversations(): one row per conversation_id
    ▼
output/helpscout_threads.csv        (one row per message thread)
output/helpscout_conversations.csv  (one row per conversation)
    │
    ▼
visualization/app.py → Streamlit dashboard
```
### AI Processing Pipeline
```
Snowflake (STITCH.HELPSCOUT.CONVERSATION_THREADS)
    │  Same SQL: customer threads only, Feb 17 2026+
    ▼
data_fetcher.fetch_conversations()
    │  Cleans HTML (html_cleaner.py)
    │  Aggregates to one row per conversation
    │  Checks HELPSCOUT_CONVERSATION_FEATURES for already-processed IDs
    ▼
main.py: splits into parallel batches
    ├── Worker 1: ConversationProcessingWorkflow
    │   ├── Node 1: SentimentAnalysisAgent → polarity + emotions
    │   └── Node 2: TopicExtractionAgent → topics + billing flags
    ├── Worker 2: ... (same)
    └── Worker N: ... (same)
    │
    ▼
SOCIAL_MEDIA_DB.ML_FEATURES.HELPSCOUT_CONVERSATION_FEATURES
```
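The exact rule `main.py` uses to size its parallel batches is not documented here. As a rough sketch of one plausible approach, the snippet below divides the work evenly across workers and clamps the batch size to the `min_batch_size` / `max_batch_size` values from the config. The function name `split_into_batches` and the `workers` parameter are assumptions made for illustration, not names taken from the codebase.

```python
import math


def split_into_batches(items, workers, min_batch_size=10, max_batch_size=50):
    """Chunk `items` into batches sized for `workers` parallel workers.

    Hypothetical helper: aims for an even split across workers, then clamps
    the batch size to the [min_batch_size, max_batch_size] range.
    """
    if not items:
        return []
    target = math.ceil(len(items) / max(workers, 1))
    size = max(min_batch_size, min(max_batch_size, target))
    return [items[i:i + size] for i in range(0, len(items), size)]


conversation_ids = list(range(120))  # placeholder IDs for the example
batches = split_into_batches(conversation_ids, workers=4)
print(len(batches), [len(b) for b in batches])
```

Clamping keeps very small runs from being split into near-empty batches and keeps large runs from producing batches that are slow to retry.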
---
## Setup
### 1. Environment variables
All credentials are read from the project root `.env` file.
| Key | Description |
|-----|-------------|
| `SNOWFLAKE_USER` | Snowflake username |
| `SNOWFLAKE_PASSWORD` | Snowflake password |
| `SNOWFLAKE_ACCOUNT` | Snowflake account identifier |
| `SNOWFLAKE_ROLE` | Role with access to `STITCH`, `ESTUARY`, and `SOCIAL_MEDIA_DB` |
| `SNOWFLAKE_WAREHOUSE` | Compute warehouse |
| `OPENAI_API_KEY` | Required for the AI pipeline only |
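
A quick pre-flight check can catch a missing credential before any Snowflake or OpenAI call is made. The sketch below is not part of the module; `missing_env_keys` is a hypothetical helper, and it assumes the `.env` file has already been loaded into the process environment (for example with `python-dotenv`).

```python
import os

# Keys taken from the table above.
SNOWFLAKE_KEYS = [
    "SNOWFLAKE_USER",
    "SNOWFLAKE_PASSWORD",
    "SNOWFLAKE_ACCOUNT",
    "SNOWFLAKE_ROLE",
    "SNOWFLAKE_WAREHOUSE",
]


def missing_env_keys(env, need_openai=True):
    """Return the required keys that are absent or empty in `env`.

    Hypothetical helper: pass need_openai=False for the CSV export,
    which does not call the LLM.
    """
    required = SNOWFLAKE_KEYS + (["OPENAI_API_KEY"] if need_openai else [])
    return [key for key in required if not env.get(key)]


missing = missing_env_keys(os.environ)
print("Missing keys:", ", ".join(missing) if missing else "none")
```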
### 2. Dependencies
All dependencies are in the project root `requirements.txt`:
- `snowflake-snowpark-python`
- `beautifulsoup4`
- `pandas`, `numpy`
- `langchain-openai`, `langgraph`
- `python-dotenv`
- `streamlit`, `plotly` (dashboard only)
### 3. Create the output table (once)
Before running the pipeline for the first time, execute the DDL in Snowflake:
```sql
-- Run this in your Snowflake worksheet or via the Snowflake CLI
-- File: sql/create_features_table.sql
```
This creates `SOCIAL_MEDIA_DB.ML_FEATURES.HELPSCOUT_CONVERSATION_FEATURES` with a primary key on `CONVERSATION_ID`. The pipeline always appends — it never truncates the table.
---
## Usage
### Run the AI processing pipeline
```bash
cd process_helpscout
# Process all new conversations (parallel, recommended)
python main.py
# Limit to 100 conversations — useful for a first test run
python main.py --limit 100
# Sequential mode — single process, easier to read logs when debugging
python main.py --sequential
# Use a custom config file
python main.py --config /path/to/my_config.json
```
On every run the pipeline:
1. Fetches all conversations (from Feb 17 2026 to today)
2. Queries the output table for already-processed `CONVERSATION_ID`s
3. Skips those — only new conversations are sent to the LLM
4. Appends results to the Snowflake output table
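
The skip logic in steps 2 and 3 amounts to a set-membership filter. The snippet below is a minimal sketch using plain dictionaries; the real pipeline works on data fetched from Snowflake, and the function name `select_unprocessed` and the `conversation_id` key are assumptions made for illustration.

```python
def select_unprocessed(conversations, processed_ids):
    """Keep only conversations whose ID is not already in the output table."""
    # Compare as strings so numeric and string IDs match each other.
    seen = {str(cid) for cid in processed_ids}
    return [c for c in conversations if str(c["conversation_id"]) not in seen]


fetched = [
    {"conversation_id": "101", "combined_text": "Where is my refund?"},
    {"conversation_id": "102", "combined_text": "Thanks, that fixed it."},
    {"conversation_id": "103", "combined_text": "How do I cancel?"},
]
already_processed = ["101", "103"]  # IDs read back from the output table

new_rows = select_unprocessed(fetched, already_processed)
print([c["conversation_id"] for c in new_rows])  # ['102']
```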
### Run the CSV export (dashboard data)
```bash
cd process_helpscout
python fetch_and_export.py
```
### Launch the Streamlit dashboard
```bash
cd process_helpscout
streamlit run visualization/app.py
```
---
## Output Table
**Table:** `SOCIAL_MEDIA_DB.ML_FEATURES.HELPSCOUT_CONVERSATION_FEATURES`
| Column | Type | Description |
|--------|------|-------------|
| `CONVERSATION_ID` | VARCHAR | HelpScout conversation ID (primary key) |
| `CUSTOMER_EMAIL` | VARCHAR | Customer email address |
| `CUSTOMER_FIRST` | VARCHAR | Customer first name |
| `CUSTOMER_LAST` | VARCHAR | Customer last name |
| `CUSTOMER_HS_ID` | NUMBER | HelpScout internal customer ID |
| `THREAD_COUNT` | NUMBER | Number of customer message threads |
| `FIRST_MESSAGE_AT` | TIMESTAMP_TZ | When the first customer message was sent |
| `LAST_MESSAGE_AT` | TIMESTAMP_TZ | When the last customer message was sent |
| `DURATION_HOURS` | FLOAT | Hours between first and last message |
| `STATUS` | VARCHAR | Last known HelpScout status |
| `STATE` | VARCHAR | Last known HelpScout state |
| `SOURCE_TYPE` | VARCHAR | e.g. `email`, `chat` |
| `SOURCE_VIA` | VARCHAR | e.g. `api`, `mailbox` |
| `COMBINED_TEXT` | TEXT | Raw aggregated customer messages |
| `CONVERSATION_TEXT_USED` | TEXT | Formatted + truncated text sent to the LLM |
| `SENTIMENT_POLARITY` | VARCHAR | `very_positive` / `positive` / `neutral` / `negative` / `very_negative` |
| `EMOTIONS` | VARCHAR | Comma-separated emotion values (NULL if none valid) |
| `SENTIMENT_CONFIDENCE` | VARCHAR | `high` / `medium` / `low` |
| `SENTIMENT_NOTES` | TEXT | 1-2 sentence LLM explanation of the sentiment |
| `TOPICS` | VARCHAR | Comma-separated topic IDs (multi-label) |
| `IS_REFUND_REQUEST` | BOOLEAN | Customer explicitly asked for a refund |
| `IS_CANCELLATION` | BOOLEAN | Customer explicitly wants to cancel |
| `IS_MEMBERSHIP` | BOOLEAN | Customer wants to join/rejoin and purchase membership |
| `TOPIC_CONFIDENCE` | VARCHAR | `high` / `medium` / `low` |
| `TOPIC_NOTES` | TEXT | 1-2 sentence LLM explanation of topics |
| `SUMMARY` | TEXT | 2-3 sentence neutral summary of the conversation |
| `PROCESSING_ERRORS` | TEXT | Semicolon-separated errors (NULL on full success) |
| `PROCESSED_AT` | TIMESTAMP_NTZ | When this record was written by the pipeline |
| `WORKFLOW_VERSION` | VARCHAR | Pipeline version for auditability |
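
Because `TOPICS` and `EMOTIONS` are stored as comma-separated strings, downstream code needs to split them before counting or filtering. The following is a small sketch of that step on rows already loaded into Python; the topic IDs shown are placeholders, not values from `topics.json`, and `split_labels` is a hypothetical helper.

```python
from collections import Counter


def split_labels(value):
    """Split a comma-separated column value into a list; NULL or empty gives []."""
    if not value:
        return []
    return [part.strip() for part in value.split(",") if part.strip()]


# Placeholder rows shaped like the output table (topic IDs are illustrative).
rows = [
    {"CONVERSATION_ID": "101", "TOPICS": "billing,login", "EMOTIONS": None},
    {"CONVERSATION_ID": "102", "TOPICS": "billing", "EMOTIONS": "frustration"},
]

topic_counts = Counter(t for row in rows for t in split_labels(row["TOPICS"]))
print(topic_counts.most_common())
```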
---
## Configuration
All pipeline settings live in `config_files/processing_config.json`.
### Agent models
```json
"agents": {
  "sentiment_analysis": {
    "model": "gpt-4o-mini",
    "temperature": 0.2,
    "max_retries": 3
  },
  "topic_extraction": {
    "model": "gpt-4o-mini",
    "temperature": 0.2,
    "max_retries": 3
  }
}
```
Switch any agent to `gpt-4o` for higher accuracy (at higher cost) by changing the `"model"` value.
### Conversation length
```json
"processing": {
  "max_conversation_chars": 3000,
  "min_batch_size": 10,
  "max_batch_size": 50
}
```
`max_conversation_chars` controls how many characters of conversation text are sent to the LLM. Increasing this improves context for long conversations but raises token costs. The workflow formats messages as `[1] msg\n[2] msg…` and truncates at this limit.
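
The numbering and truncation described above can be sketched in a few lines. This is an illustration of the idea rather than the workflow's actual code; `format_conversation` is a hypothetical name, and a plain character slice is assumed for the truncation.

```python
def format_conversation(messages, max_chars=3000):
    """Number each message as "[n] text", join with newlines, and truncate."""
    numbered = "\n".join(f"[{i}] {msg}" for i, msg in enumerate(messages, start=1))
    return numbered[:max_chars]


text = format_conversation(["I was charged twice.", "Any update on my refund?"])
print(text)
```

A character slice can cut the last message mid-sentence, which is usually acceptable for classification but worth knowing when reading `CONVERSATION_TEXT_USED`.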
### Output destination
```json
"output": {
  "database": "SOCIAL_MEDIA_DB",
  "schema": "ML_FEATURES",
  "table": "HELPSCOUT_CONVERSATION_FEATURES"
}
```
To write to a different table (e.g. a staging or test table), change these values and re-run the DDL in `sql/create_features_table.sql` for the new table name.
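
As a rough illustration of how these three values combine, the sketch below builds the fully qualified table name from a parsed config. It assumes `"output"` is a top-level key of `processing_config.json`; `qualified_table_name` is a hypothetical helper, not a function from the module.

```python
import json

# Inline stand-in for config_files/processing_config.json (output block only).
CONFIG_TEXT = """
{
  "output": {
    "database": "SOCIAL_MEDIA_DB",
    "schema": "ML_FEATURES",
    "table": "HELPSCOUT_CONVERSATION_FEATURES"
  }
}
"""


def qualified_table_name(config):
    """Join database, schema, and table into DATABASE.SCHEMA.TABLE."""
    out = config["output"]
    return ".".join([out["database"], out["schema"], out["table"]])


config = json.loads(CONFIG_TEXT)
print(qualified_table_name(config))
```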
### Sentiment categories
The `sentiment_polarity` and `emotions` blocks in `processing_config.json` define the valid values for classification. Adding, removing, or renaming a category here is automatically reflected in both the LLM prompt and the output validation — no code changes required.
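
To make the config-driven validation concrete, here is a minimal sketch of how an LLM response might be checked against the allowed values. The shape of the config blocks is not shown in this README, so the snippet assumes each block reduces to a flat list of allowed values; the emotion names and the `validate_sentiment` function are illustrative assumptions.

```python
# Polarity values match the output table; emotion values are placeholders.
ALLOWED_POLARITIES = ["very_positive", "positive", "neutral", "negative", "very_negative"]
ALLOWED_EMOTIONS = ["frustration", "gratitude", "confusion"]


def validate_sentiment(raw, allowed_polarities, allowed_emotions):
    """Drop any value the config does not list; empty emotions become None."""
    polarity = raw.get("sentiment_polarity")
    if polarity not in allowed_polarities:
        polarity = None
    emotions = [e for e in raw.get("emotions", []) if e in allowed_emotions]
    return {
        "sentiment_polarity": polarity,
        "emotions": ",".join(emotions) if emotions else None,
    }


llm_output = {"sentiment_polarity": "negative", "emotions": ["frustration", "rage"]}
print(validate_sentiment(llm_output, ALLOWED_POLARITIES, ALLOWED_EMOTIONS))
```

Because the allowed lists come from the config, adding a category there changes what passes validation without touching this function.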
### Topic taxonomy
Topic definitions live in `config_files/topics.json`. This file is the single source of truth: the `TopicExtractionAgent` builds its system prompt directly from it. To add a new topic:
1. Add an entry to the `"topics"` array with a unique `id`, `label`, and `description`.
2. If the topic has boolean sub-flags (like billing), add a `"flags"` key — then update `topic_extraction_agent.py` to extract those flags.
3. Re-run the pipeline — the new topic will be available immediately.
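
The sketch below shows one way a system prompt could be assembled from the `"topics"` array. It is an illustration under assumptions, not the agent's actual prompt: the two sample topics are placeholders, and `build_topic_prompt` is a hypothetical function that uses only the `id`, `label`, and `description` fields named above.

```python
import json

# Inline stand-in for config_files/topics.json (entries are placeholders).
TOPICS_JSON = """
{
  "topics": [
    {"id": "billing", "label": "Billing", "description": "Charges, invoices, payment problems"},
    {"id": "login", "label": "Login", "description": "Password resets and account access"}
  ]
}
"""


def build_topic_prompt(taxonomy):
    """Render one bullet per topic so the LLM sees every valid topic ID."""
    lines = ["Assign every topic that applies. Valid topic IDs:"]
    for topic in taxonomy["topics"]:
        lines.append(f"- {topic['id']} ({topic['label']}): {topic['description']}")
    return "\n".join(lines)


prompt = build_topic_prompt(json.loads(TOPICS_JSON))
print(prompt)
```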
---
## SQL Query
**File:** `queries/helpscout_conversations.sql`
| Design decision | Detail |
|-----------------|--------|
| Date filter | `CREATED_AT >= '2026-02-17'` to current date |
| Team exclusion | Anti-join with `USORA_USERS WHERE access_level = 'team'` — only customer messages reach the pipeline |
| Thread types | `TYPE IN ('customer', 'message')` — excludes notes, forwarded threads, system messages |
| JSON extraction | Snowflake semi-structured syntax: `COLUMN:field::VARCHAR` |
To change the date range, edit the `WHERE ct.CREATED_AT >= '...'` line in the SQL file.
---
## HTML Cleaner
`html_cleaner.py` runs a four-stage pipeline on every message body:
| Stage | What it does |
|-------|--------------|
| `_remove_quoted_sections()` | Removes `<blockquote>` tags and Gmail/Outlook/Yahoo quoted-reply CSS wrappers |
| `_remove_boilerplate()` | Removes `<table>`, `<img>`, `<script>`, `<style>` tags and footer/unsubscribe blocks |
| `_extract_text()` | Extracts plain text while preserving line breaks |
| `_clean_text()` | Strips invisible Unicode, collapses whitespace, removes `>` quote lines, cuts off at "On … wrote:" markers |
To add a new boilerplate pattern, append a string to `footer_keywords` inside `_remove_boilerplate()`, or add a CSS class fragment to `_QUOTED_CLASS_PATTERNS` at the top of the file.
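
For readers who want a feel for the final text-cleaning stage, the following is a standard-library sketch of the same four operations. It is not the code in `html_cleaner.py`; the function name `clean_text`, the list of invisible characters, and the reply-marker regex are assumptions chosen for illustration.

```python
import re

# Zero-width and BOM characters that often survive HTML-to-text conversion.
_INVISIBLE = re.compile("[\u200b\u200c\u200d\u2060\ufeff]")
# A line such as "On Mon, Mar 2, 2026, Support wrote:" starts a quoted reply.
_REPLY_MARKER = re.compile(r"^On\b.*\bwrote:\s*$", re.MULTILINE)


def clean_text(text):
    """Strip invisible characters, quoted lines, and trailing reply history."""
    text = _INVISIBLE.sub("", text).replace("\u00a0", " ")
    marker = _REPLY_MARKER.search(text)
    if marker:
        text = text[: marker.start()]  # cut everything from the marker onward
    lines = [ln for ln in text.splitlines() if not ln.lstrip().startswith(">")]
    lines = [re.sub(r"[ \t]+", " ", ln).strip() for ln in lines]
    text = re.sub(r"\n{3,}", "\n\n", "\n".join(lines))  # collapse blank runs
    return text.strip()


sample = (
    "Hi\u200b team,\n\nMy   login fails.\n> old quoted line\n\n"
    "On Mon, Mar 2, 2026, Support wrote:\n> earlier reply"
)
print(clean_text(sample))
```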
---
## Extending the Pipeline
### Add a third agentic step
1. Create `agents/your_new_agent.py` inheriting from `BaseAgent` (see `agents/README.md`).
2. Add a new node method `_your_node()` in `workflow/conversation_processor.py`.
3. Add the node and a new edge in `_build_workflow()`:
```python
graph.add_node("your_step", self._your_node)
graph.add_edge("topic_extraction", "your_step")
graph.add_edge("your_step", END)
```
4. Add the corresponding output fields to `ConversationState`.
5. Map new columns in `main.py`'s `column_map` dict and add them to the DDL.
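
As a rough sketch of steps 1, 2, and 4, the snippet below shows the general shape of a node: it reads from the state, calls an agent, and returns only the fields it updates. Everything here is hypothetical, including the `urgency` field, the `UrgencyAgent` class, its `process()` method, and the state keys; the real `BaseAgent` interface is described in `agents/README.md`, and a real agent would call an LLM rather than match a keyword.

```python
from typing import List, Optional, TypedDict


class ConversationState(TypedDict, total=False):
    # Illustrative subset; the real state has more fields.
    conversation_text: str
    urgency: Optional[str]  # hypothetical new output field
    processing_errors: List[str]


class UrgencyAgent:
    """Hypothetical stand-in for an agent that would inherit from BaseAgent."""

    def process(self, text):
        return {"urgency": "high" if "urgent" in text.lower() else "normal"}


def your_node(state, agent):
    """Run the agent and return a partial state update, recording any failure."""
    errors = list(state.get("processing_errors", []))
    try:
        urgency = agent.process(state["conversation_text"])["urgency"]
    except Exception as exc:  # keep the workflow going; surface the error later
        urgency = None
        errors.append(f"your_step: {exc}")
    return {"urgency": urgency, "processing_errors": errors}


update = your_node({"conversation_text": "URGENT: charged twice"}, UrgencyAgent())
print(update)
```

Catching the exception inside the node lets a failed step be reported in `PROCESSING_ERRORS` while the fields produced by earlier steps are still written.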
### Change the date range
Edit `queries/helpscout_conversations.sql`:
```sql
ct.CREATED_AT >= '2026-02-17 00:00:00' -- ← change start date
```
### Include team replies
Remove the anti-join in `helpscout_conversations.sql`. The `TYPE` filter already includes `'message'`, so add `'note'` only if internal notes should be processed as well. Update the HTML cleaning and aggregation steps if team messages need different handling.
### Process a different HelpScout mailbox
Add a `WHERE` clause on a mailbox ID column if available, or filter by `source_via` / `status`.
### Automate daily runs
Schedule `main.py` with a cron job, Airflow DAG, or any task scheduler. Because the pipeline skips already-processed conversations, re-running it daily processes only new conversations — no manual bookkeeping needed.