A newer version of the Streamlit SDK is available: 1.58.0
HelpScout Processing Pipeline
Extracts, cleans, and enriches customer support conversations from HelpScout. The module has two distinct responsibilities:
- Data export (
fetch_and_export.py) β fetches raw threads, cleans HTML, and exports CSVs for the Streamlit dashboard. - AI processing pipeline (
main.py) β fetches the same conversations, runs them through a two-step agentic workflow (sentiment + topic extraction), and writes enriched records to Snowflake.
Folder Structure
process_helpscout/
β
βββ main.py # Pipeline entry point (parallel processing)
βββ data_fetcher.py # Fetches & aggregates conversations; deduplication check
βββ fetch_and_export.py # CSV export script (separate from the pipeline)
βββ html_cleaner.py # HTML β clean plain text (shared by both workflows)
βββ snowflake_conn.py # Snowflake connection wrapper
β
βββ agents/ # LLM-based extraction agents
β βββ README.md # Agent architecture docs (read this to extend)
β βββ base_agent.py # Abstract base class for all agents
β βββ sentiment_analysis_agent.py # Classifies sentiment polarity + emotions
β βββ topic_extraction_agent.py # Assigns topic tags + billing flags
β
βββ workflow/
β βββ conversation_processor.py # LangGraph workflow: sentiment β topics β END
β
βββ config_files/
β βββ processing_config.json # Agent models, batch settings, output table, sentiment categories
β βββ topics.json # HelpScout topic taxonomy (source of truth for topic extraction)
β
βββ queries/
β βββ helpscout_conversations.sql # SQL that fetches customer threads from Snowflake
β
βββ sql/
β βββ create_features_table.sql # DDL β run once before first pipeline execution
β
βββ output/ # Auto-created; holds CSV exports
β βββ helpscout_threads.csv
β βββ helpscout_conversations.csv
β
βββ visualization/ # Streamlit dashboard (reads from CSV exports)
βββ app.py
βββ components/dashboard.py
βββ utils/data_processor.py
Data Flow
CSV Export (Dashboard)
Snowflake (STITCH.HELPSCOUT.CONVERSATION_THREADS)
β queries/helpscout_conversations.sql
βΌ
fetch_and_export.py
β process_threads() β clean HTML, add word_count, date columns
β aggregate_conversations() β one row per conversation_id
βΌ
output/helpscout_threads.csv (one row per message thread)
output/helpscout_conversations.csv (one row per conversation)
β
βΌ
visualization/app.py β Streamlit dashboard
AI Processing Pipeline
Snowflake (STITCH.HELPSCOUT.CONVERSATION_THREADS)
β Same SQL β customer threads only, Feb 17 2026+
βΌ
data_fetcher.fetch_conversations()
β Cleans HTML (html_cleaner.py)
β Aggregates to one row per conversation
β Checks HELPSCOUT_CONVERSATION_FEATURES for already-processed IDs
βΌ
main.py β splits into parallel batches
β
βββ Worker 1: ConversationProcessingWorkflow
β βββ Node 1: SentimentAnalysisAgent β polarity + emotions
β βββ Node 2: TopicExtractionAgent β topics + billing flags
β
βββ Worker 2: ... (same)
βββ Worker N: ... (same)
β
βΌ
SOCIAL_MEDIA_DB.ML_FEATURES.HELPSCOUT_CONVERSATION_FEATURES
Setup
1. Environment variables
All credentials are read from the project root .env file.
| Key | Description |
|---|---|
SNOWFLAKE_USER |
Snowflake username |
SNOWFLAKE_PASSWORD |
Snowflake password |
SNOWFLAKE_ACCOUNT |
Snowflake account identifier |
SNOWFLAKE_ROLE |
Role with access to STITCH, ESTUARY, and SOCIAL_MEDIA_DB |
SNOWFLAKE_WAREHOUSE |
Compute warehouse |
OPENAI_API_KEY |
Required for the AI pipeline only |
2. Dependencies
All dependencies are in the project root requirements.txt:
snowflake-snowpark-pythonbeautifulsoup4pandas,numpylangchain-openai,langgraphpython-dotenvstreamlit,plotly(dashboard only)
3. Create the output table (once)
Before running the pipeline for the first time, execute the DDL in Snowflake:
-- Run this in your Snowflake worksheet or via the Snowflake CLI
-- File: sql/create_features_table.sql
This creates SOCIAL_MEDIA_DB.ML_FEATURES.HELPSCOUT_CONVERSATION_FEATURES with a primary key on CONVERSATION_ID. The pipeline always appends β it never truncates the table.
Usage
Run the AI processing pipeline
cd process_helpscout
# Process all new conversations (parallel, recommended)
python main.py
# Limit to 100 conversations β useful for a first test run
python main.py --limit 100
# Sequential mode β single process, easier to read logs when debugging
python main.py --sequential
# Use a custom config file
python main.py --config /path/to/my_config.json
On every run the pipeline:
- Fetches all conversations (from Feb 17 2026 to today)
- Queries the output table for already-processed
CONVERSATION_IDs - Skips those β only new conversations are sent to the LLM
- Appends results to the Snowflake output table
Run the CSV export (dashboard data)
cd process_helpscout
python fetch_and_export.py
Launch the Streamlit dashboard
cd process_helpscout
streamlit run visualization/app.py
Output Table
Table: SOCIAL_MEDIA_DB.ML_FEATURES.HELPSCOUT_CONVERSATION_FEATURES
| Column | Type | Description |
|---|---|---|
CONVERSATION_ID |
VARCHAR | HelpScout conversation ID (primary key) |
CUSTOMER_EMAIL |
VARCHAR | Customer email address |
CUSTOMER_FIRST |
VARCHAR | Customer first name |
CUSTOMER_LAST |
VARCHAR | Customer last name |
CUSTOMER_HS_ID |
NUMBER | HelpScout internal customer ID |
THREAD_COUNT |
NUMBER | Number of customer message threads |
FIRST_MESSAGE_AT |
TIMESTAMP_TZ | When the first customer message was sent |
LAST_MESSAGE_AT |
TIMESTAMP_TZ | When the last customer message was sent |
DURATION_HOURS |
FLOAT | Hours between first and last message |
STATUS |
VARCHAR | Last known HelpScout status |
STATE |
VARCHAR | Last known HelpScout state |
SOURCE_TYPE |
VARCHAR | e.g. email, chat |
SOURCE_VIA |
VARCHAR | e.g. api, mailbox |
COMBINED_TEXT |
TEXT | Raw aggregated customer messages |
CONVERSATION_TEXT_USED |
TEXT | Formatted + truncated text sent to the LLM |
SENTIMENT_POLARITY |
VARCHAR | very_positive / positive / neutral / negative / very_negative |
EMOTIONS |
VARCHAR | Comma-separated emotion values (NULL if none valid) |
SENTIMENT_CONFIDENCE |
VARCHAR | high / medium / low |
SENTIMENT_NOTES |
TEXT | 1-2 sentence LLM explanation of the sentiment |
TOPICS |
VARCHAR | Comma-separated topic IDs (multi-label) |
IS_REFUND_REQUEST |
BOOLEAN | Customer explicitly asked for a refund |
IS_CANCELLATION |
BOOLEAN | Customer explicitly wants to cancel |
IS_MEMBERSHIP |
BOOLEAN | Customer wants to join/rejoin and purchase membership |
TOPIC_CONFIDENCE |
VARCHAR | high / medium / low |
TOPIC_NOTES |
TEXT | 1-2 sentence LLM explanation of topics |
SUMMARY |
TEXT | 2-3 sentence neutral summary of the conversation |
PROCESSING_ERRORS |
TEXT | Semicolon-separated errors (NULL on full success) |
PROCESSED_AT |
TIMESTAMP_NTZ | When this record was written by the pipeline |
WORKFLOW_VERSION |
VARCHAR | Pipeline version for auditability |
Configuration
All pipeline settings live in config_files/processing_config.json.
Agent models
"agents": {
"sentiment_analysis": {
"model": "gpt-4o-mini",
"temperature": 0.2,
"max_retries": 3
},
"topic_extraction": {
"model": "gpt-4o-mini",
"temperature": 0.2,
"max_retries": 3
}
}
Switch any agent to gpt-4o for higher accuracy (at higher cost) by changing the "model" value.
Conversation length
"processing": {
"max_conversation_chars": 3000,
"min_batch_size": 10,
"max_batch_size": 50
}
max_conversation_chars controls how many characters of conversation text are sent to the LLM. Increasing this improves context for long conversations but raises token costs. The workflow formats messages as [1] msg\n[2] msg⦠and truncates at this limit.
Output destination
"output": {
"database": "SOCIAL_MEDIA_DB",
"schema": "ML_FEATURES",
"table": "HELPSCOUT_CONVERSATION_FEATURES"
}
To write to a different table (e.g. a staging or test table), change these values and re-run the DDL in sql/create_features_table.sql for the new table name.
Sentiment categories
The sentiment_polarity and emotions blocks in processing_config.json define the valid values for classification. Adding, removing, or renaming a category here is automatically reflected in both the LLM prompt and the output validation β no code changes required.
Topic taxonomy
Topic definitions live in config_files/topics.json. This file is the single source of truth: the TopicExtractionAgent builds its system prompt directly from it. To add a new topic:
- Add an entry to the
"topics"array with a uniqueid,label, anddescription. - If the topic has boolean sub-flags (like billing), add a
"flags"key β then updatetopic_extraction_agent.pyto extract those flags. - Re-run the pipeline β the new topic will be available immediately.
SQL Query
File: queries/helpscout_conversations.sql
| Design decision | Detail |
|---|---|
| Date filter | CREATED_AT >= '2026-02-17' to current date |
| Team exclusion | Anti-join with USORA_USERS WHERE access_level = 'team' β only customer messages reach the pipeline |
| Thread types | TYPE IN ('customer', 'message') β excludes notes, forwarded threads, system messages |
| JSON extraction | Snowflake semi-structured syntax: COLUMN:field::VARCHAR |
To change the date range, edit the WHERE ct.CREATED_AT >= '...' line in the SQL file.
HTML Cleaner
html_cleaner.py runs a four-stage pipeline on every message body:
| Stage | What it removes |
|---|---|
_remove_quoted_sections() |
<blockquote> tags and Gmail/Outlook/Yahoo quoted-reply CSS wrappers |
_remove_boilerplate() |
<table>, <img>, <script>, <style> tags and footer/unsubscribe blocks |
_extract_text() |
Extracts plain text while preserving line breaks |
_clean_text() |
Strips invisible Unicode, collapses whitespace, removes > quote lines, cuts off at "On β¦ wrote:" markers |
To add a new boilerplate pattern, append a string to footer_keywords inside _remove_boilerplate(), or add a CSS class fragment to _QUOTED_CLASS_PATTERNS at the top of the file.
Extending the Pipeline
Add a third agentic step
- Create
agents/your_new_agent.pyinheriting fromBaseAgent(seeagents/README.md). - Add a new node method
_your_node()inworkflow/conversation_processor.py. - Add the node and a new edge in
_build_workflow():graph.add_node("your_step", self._your_node) graph.add_edge("topic_extraction", "your_step") graph.add_edge("your_step", END) - Add the corresponding output fields to
ConversationState. - Map new columns in
main.py'scolumn_mapdict and add them to the DDL.
Change the date range
Edit queries/helpscout_conversations.sql:
ct.CREATED_AT >= '2026-02-17 00:00:00' -- β change start date
Include team replies
Remove the anti-join in helpscout_conversations.sql and broaden TYPE to include 'note' and 'message'. Be sure to update the HTML cleaning and aggregation if team messages need different handling.
Process a different HelpScout mailbox
Add a WHERE clause on a mailbox ID column if available, or filter by source_via / status.
Automate daily runs
Schedule main.py with a cron job, Airflow DAG, or any task scheduler. Because the pipeline skips already-processed conversations, re-running it daily processes only new conversations β no manual bookkeeping needed.