# HelpScout Processing Pipeline Extracts, cleans, and enriches customer support conversations from HelpScout. The module has two distinct responsibilities: 1. **Data export** (`fetch_and_export.py`) — fetches raw threads, cleans HTML, and exports CSVs for the Streamlit dashboard. 2. **AI processing pipeline** (`main.py`) — fetches the same conversations, runs them through a two-step agentic workflow (sentiment + topic extraction), and writes enriched records to Snowflake. --- ## Folder Structure ``` process_helpscout/ │ ├── main.py # Pipeline entry point (parallel processing) ├── data_fetcher.py # Fetches & aggregates conversations; deduplication check ├── fetch_and_export.py # CSV export script (separate from the pipeline) ├── html_cleaner.py # HTML → clean plain text (shared by both workflows) ├── snowflake_conn.py # Snowflake connection wrapper │ ├── agents/ # LLM-based extraction agents │ ├── README.md # Agent architecture docs (read this to extend) │ ├── base_agent.py # Abstract base class for all agents │ ├── sentiment_analysis_agent.py # Classifies sentiment polarity + emotions │ └── topic_extraction_agent.py # Assigns topic tags + billing flags │ ├── workflow/ │ └── conversation_processor.py # LangGraph workflow: sentiment → topics → END │ ├── config_files/ │ ├── processing_config.json # Agent models, batch settings, output table, sentiment categories │ └── topics.json # HelpScout topic taxonomy (source of truth for topic extraction) │ ├── queries/ │ └── helpscout_conversations.sql # SQL that fetches customer threads from Snowflake │ ├── sql/ │ └── create_features_table.sql # DDL — run once before first pipeline execution │ ├── output/ # Auto-created; holds CSV exports │ ├── helpscout_threads.csv │ └── helpscout_conversations.csv │ └── visualization/ # Streamlit dashboard (reads from CSV exports) ├── app.py ├── components/dashboard.py └── utils/data_processor.py ``` --- ## Data Flow ### CSV Export (Dashboard) ``` Snowflake (STITCH.HELPSCOUT.CONVERSATION_THREADS) │ queries/helpscout_conversations.sql ▼ fetch_and_export.py │ process_threads() — clean HTML, add word_count, date columns │ aggregate_conversations() — one row per conversation_id ▼ output/helpscout_threads.csv (one row per message thread) output/helpscout_conversations.csv (one row per conversation) │ ▼ visualization/app.py → Streamlit dashboard ``` ### AI Processing Pipeline ``` Snowflake (STITCH.HELPSCOUT.CONVERSATION_THREADS) │ Same SQL — customer threads only, Feb 17 2026+ ▼ data_fetcher.fetch_conversations() │ Cleans HTML (html_cleaner.py) │ Aggregates to one row per conversation │ Checks HELPSCOUT_CONVERSATION_FEATURES for already-processed IDs ▼ main.py — splits into parallel batches │ ├── Worker 1: ConversationProcessingWorkflow │ ├── Node 1: SentimentAnalysisAgent → polarity + emotions │ └── Node 2: TopicExtractionAgent → topics + billing flags │ ├── Worker 2: ... (same) └── Worker N: ... (same) │ ▼ SOCIAL_MEDIA_DB.ML_FEATURES.HELPSCOUT_CONVERSATION_FEATURES ``` --- ## Setup ### 1. Environment variables All credentials are read from the project root `.env` file. | Key | Description | |-----|-------------| | `SNOWFLAKE_USER` | Snowflake username | | `SNOWFLAKE_PASSWORD` | Snowflake password | | `SNOWFLAKE_ACCOUNT` | Snowflake account identifier | | `SNOWFLAKE_ROLE` | Role with access to `STITCH`, `ESTUARY`, and `SOCIAL_MEDIA_DB` | | `SNOWFLAKE_WAREHOUSE` | Compute warehouse | | `OPENAI_API_KEY` | Required for the AI pipeline only | ### 2. Dependencies All dependencies are in the project root `requirements.txt`: - `snowflake-snowpark-python` - `beautifulsoup4` - `pandas`, `numpy` - `langchain-openai`, `langgraph` - `python-dotenv` - `streamlit`, `plotly` (dashboard only) ### 3. Create the output table (once) Before running the pipeline for the first time, execute the DDL in Snowflake: ```sql -- Run this in your Snowflake worksheet or via the Snowflake CLI -- File: sql/create_features_table.sql ``` This creates `SOCIAL_MEDIA_DB.ML_FEATURES.HELPSCOUT_CONVERSATION_FEATURES` with a primary key on `CONVERSATION_ID`. The pipeline always appends — it never truncates the table. --- ## Usage ### Run the AI processing pipeline ```bash cd process_helpscout # Process all new conversations (parallel, recommended) python main.py # Limit to 100 conversations — useful for a first test run python main.py --limit 100 # Sequential mode — single process, easier to read logs when debugging python main.py --sequential # Use a custom config file python main.py --config /path/to/my_config.json ``` On every run the pipeline: 1. Fetches all conversations (from Feb 17 2026 to today) 2. Queries the output table for already-processed `CONVERSATION_ID`s 3. Skips those — only new conversations are sent to the LLM 4. Appends results to the Snowflake output table ### Run the CSV export (dashboard data) ```bash cd process_helpscout python fetch_and_export.py ``` ### Launch the Streamlit dashboard ```bash cd process_helpscout streamlit run visualization/app.py ``` --- ## Output Table **Table:** `SOCIAL_MEDIA_DB.ML_FEATURES.HELPSCOUT_CONVERSATION_FEATURES` | Column | Type | Description | |--------|------|-------------| | `CONVERSATION_ID` | VARCHAR | HelpScout conversation ID (primary key) | | `CUSTOMER_EMAIL` | VARCHAR | Customer email address | | `CUSTOMER_FIRST` | VARCHAR | Customer first name | | `CUSTOMER_LAST` | VARCHAR | Customer last name | | `CUSTOMER_HS_ID` | NUMBER | HelpScout internal customer ID | | `THREAD_COUNT` | NUMBER | Number of customer message threads | | `FIRST_MESSAGE_AT` | TIMESTAMP_TZ | When the first customer message was sent | | `LAST_MESSAGE_AT` | TIMESTAMP_TZ | When the last customer message was sent | | `DURATION_HOURS` | FLOAT | Hours between first and last message | | `STATUS` | VARCHAR | Last known HelpScout status | | `STATE` | VARCHAR | Last known HelpScout state | | `SOURCE_TYPE` | VARCHAR | e.g. `email`, `chat` | | `SOURCE_VIA` | VARCHAR | e.g. `api`, `mailbox` | | `COMBINED_TEXT` | TEXT | Raw aggregated customer messages | | `CONVERSATION_TEXT_USED` | TEXT | Formatted + truncated text sent to the LLM | | `SENTIMENT_POLARITY` | VARCHAR | `very_positive` / `positive` / `neutral` / `negative` / `very_negative` | | `EMOTIONS` | VARCHAR | Comma-separated emotion values (NULL if none valid) | | `SENTIMENT_CONFIDENCE` | VARCHAR | `high` / `medium` / `low` | | `SENTIMENT_NOTES` | TEXT | 1-2 sentence LLM explanation of the sentiment | | `TOPICS` | VARCHAR | Comma-separated topic IDs (multi-label) | | `IS_REFUND_REQUEST` | BOOLEAN | Customer explicitly asked for a refund | | `IS_CANCELLATION` | BOOLEAN | Customer explicitly wants to cancel | | `IS_MEMBERSHIP` | BOOLEAN | Customer wants to join/rejoin and purchase membership | | `TOPIC_CONFIDENCE` | VARCHAR | `high` / `medium` / `low` | | `TOPIC_NOTES` | TEXT | 1-2 sentence LLM explanation of topics | | `SUMMARY` | TEXT | 2-3 sentence neutral summary of the conversation | | `PROCESSING_ERRORS` | TEXT | Semicolon-separated errors (NULL on full success) | | `PROCESSED_AT` | TIMESTAMP_NTZ | When this record was written by the pipeline | | `WORKFLOW_VERSION` | VARCHAR | Pipeline version for auditability | --- ## Configuration All pipeline settings live in `config_files/processing_config.json`. ### Agent models ```json "agents": { "sentiment_analysis": { "model": "gpt-4o-mini", "temperature": 0.2, "max_retries": 3 }, "topic_extraction": { "model": "gpt-4o-mini", "temperature": 0.2, "max_retries": 3 } } ``` Switch any agent to `gpt-4o` for higher accuracy (at higher cost) by changing the `"model"` value. ### Conversation length ```json "processing": { "max_conversation_chars": 3000, "min_batch_size": 10, "max_batch_size": 50 } ``` `max_conversation_chars` controls how many characters of conversation text are sent to the LLM. Increasing this improves context for long conversations but raises token costs. The workflow formats messages as `[1] msg\n[2] msg…` and truncates at this limit. ### Output destination ```json "output": { "database": "SOCIAL_MEDIA_DB", "schema": "ML_FEATURES", "table": "HELPSCOUT_CONVERSATION_FEATURES" } ``` To write to a different table (e.g. a staging or test table), change these values and re-run the DDL in `sql/create_features_table.sql` for the new table name. ### Sentiment categories The `sentiment_polarity` and `emotions` blocks in `processing_config.json` define the valid values for classification. Adding, removing, or renaming a category here is automatically reflected in both the LLM prompt and the output validation — no code changes required. ### Topic taxonomy Topic definitions live in `config_files/topics.json`. This file is the single source of truth: the `TopicExtractionAgent` builds its system prompt directly from it. To add a new topic: 1. Add an entry to the `"topics"` array with a unique `id`, `label`, and `description`. 2. If the topic has boolean sub-flags (like billing), add a `"flags"` key — then update `topic_extraction_agent.py` to extract those flags. 3. Re-run the pipeline — the new topic will be available immediately. --- ## SQL Query **File:** `queries/helpscout_conversations.sql` | Design decision | Detail | |-----------------|--------| | Date filter | `CREATED_AT >= '2026-02-17'` to current date | | Team exclusion | Anti-join with `USORA_USERS WHERE access_level = 'team'` — only customer messages reach the pipeline | | Thread types | `TYPE IN ('customer', 'message')` — excludes notes, forwarded threads, system messages | | JSON extraction | Snowflake semi-structured syntax: `COLUMN:field::VARCHAR` | To change the date range, edit the `WHERE ct.CREATED_AT >= '...'` line in the SQL file. --- ## HTML Cleaner `html_cleaner.py` runs a four-stage pipeline on every message body: | Stage | What it removes | |-------|----------------| | `_remove_quoted_sections()` | `
` tags and Gmail/Outlook/Yahoo quoted-reply CSS wrappers | | `_remove_boilerplate()` | ``, ``, `