# Commit 58db664 by Danialebrat: "Adding HelpScout to UI"
"""
Data Fetcher for the HelpScout processing pipeline.
Responsible for:
1. Fetching raw customer threads from Snowflake (reusing fetch_and_export logic)
2. Cleaning HTML and aggregating to conversation level
3. Checking which conversations have already been processed (for deduplication)
Reuses fetch_raw(), process_threads(), and aggregate_conversations() from
fetch_and_export.py so the cleaning and aggregation logic stays in one place.
"""
import logging
from pathlib import Path
from typing import Set
import pandas as pd
from snowflake_conn import SnowflakeConn
from fetch_and_export import fetch_raw, process_threads, aggregate_conversations
logger = logging.getLogger(__name__)
def fetch_conversations(conn: SnowflakeConn) -> pd.DataFrame:
    """
    Build the conversation-level dataset for the HelpScout pipeline.

    Runs the three stages shared with ``fetch_and_export`` in order:
    ``fetch_raw`` (pull thread rows through *conn*), ``process_threads``
    (HTML cleaning) and ``aggregate_conversations`` (collapse to one row
    per conversation_id).

    Key columns expected in the result (produced by the aggregation step):
      - conversation_id
      - combined_text (all customer messages joined with ' | ')
      - customer_email, customer_first, customer_last, customer_hs_id
      - thread_count, first_message_at, last_message_at, duration_hours
      - status, state, source_type, source_via

    Returns:
        The aggregated DataFrame. If the raw fetch or the cleaning stage
        yields no rows, a warning is logged and an empty DataFrame is
        returned instead.
    """
    raw = fetch_raw(conn)
    if raw.empty:
        logger.warning("No raw threads returned from Snowflake.")
        return pd.DataFrame()

    cleaned = process_threads(raw)
    if cleaned.empty:
        logger.warning("All threads were empty after HTML cleaning.")
        return pd.DataFrame()

    result = aggregate_conversations(cleaned)
    count = len(result)
    logger.info(f"Ready to process: {count:,} conversations")
    return result
def fetch_processed_ids(
    conn: SnowflakeConn,
    database: str,
    schema: str,
    table: str,
) -> Set[str]:
    """
    Return the set of conversation_ids already stored in the output table.

    Used for deduplication: callers can skip any conversation whose id is
    in the returned set.

    The lookup is best-effort. An empty set is returned and a warning is
    logged in these cases, and the pipeline will then process all
    conversations:
      - the query fails (e.g. the table does not exist yet on the first run);
      - the query returns no result object;
      - the result has no CONVERSATION_ID column.

    Args:
        conn: Snowflake connection wrapper; only ``run_query`` is used.
        database, schema, table: Parts of the fully-qualified output table
            name. They are interpolated into the SQL text, because identifiers
            cannot be bound as parameters. They must therefore come from
            trusted configuration, never from user input.

    Returns:
        The distinct non-null conversation ids, converted to ``str``.
    """
    full_name = f"{database}.{schema}.{table}"
    # Only membership matters, so let Snowflake drop duplicate ids.
    query = f"SELECT DISTINCT CONVERSATION_ID FROM {full_name}"

    # Keep the try body to the query itself: failing here (missing table,
    # connectivity) is the expected best-effort case. Data-shape problems
    # are handled explicitly below rather than hidden by a blanket except.
    try:
        df = conn.run_query(query, description="fetch_processed_ids")
    except Exception as exc:
        logger.warning(
            f"Could not fetch processed IDs from {full_name} "
            f"(table may not exist yet): {exc}"
        )
        return set()

    if df is None:
        logger.warning(f"Query for processed IDs in {full_name} returned no result.")
        return set()

    # Snowflake folds unquoted identifiers to upper case, so the column may
    # arrive as CONVERSATION_ID or conversation_id depending on how
    # run_query normalizes names. Resolve it case-insensitively.
    column = next(
        (col for col in df.columns if str(col).lower() == "conversation_id"),
        None,
    )
    if column is None:
        logger.warning(
            f"No CONVERSATION_ID column in result from {full_name}; "
            f"treating all conversations as unprocessed."
        )
        return set()

    # NOTE(review): if the column is numeric and pandas materializes it as
    # float, str() would yield e.g. "123.0". Confirm the stored type matches
    # the conversation_id type produced by fetch_conversations.
    ids = set(df[column].dropna().astype(str))
    logger.info(f"Found {len(ids):,} already-processed conversations in {table}")
    return ids