""" Data Fetcher for the HelpScout processing pipeline. Responsible for: 1. Fetching raw customer threads from Snowflake (reusing fetch_and_export logic) 2. Cleaning HTML and aggregating to conversation level 3. Checking which conversations have already been processed (for deduplication) Reuses fetch_raw(), process_threads(), and aggregate_conversations() from fetch_and_export.py so the cleaning and aggregation logic stays in one place. """ import logging from pathlib import Path from typing import Set import pandas as pd from snowflake_conn import SnowflakeConn from fetch_and_export import fetch_raw, process_threads, aggregate_conversations logger = logging.getLogger(__name__) def fetch_conversations(conn: SnowflakeConn) -> pd.DataFrame: """ Fetch, clean, and aggregate all customer conversations from HelpScout. Returns one row per conversation_id with the following key columns: - conversation_id - combined_text (all customer messages joined with ' | ') - customer_email, customer_first, customer_last, customer_hs_id - thread_count, first_message_at, last_message_at, duration_hours - status, state, source_type, source_via Returns an empty DataFrame if no data is available. """ raw_df = fetch_raw(conn) if raw_df.empty: logger.warning("No raw threads returned from Snowflake.") return pd.DataFrame() threads_df = process_threads(raw_df) if threads_df.empty: logger.warning("All threads were empty after HTML cleaning.") return pd.DataFrame() conversations_df = aggregate_conversations(threads_df) logger.info(f"Ready to process: {len(conversations_df):,} conversations") return conversations_df def fetch_processed_ids( conn: SnowflakeConn, database: str, schema: str, table: str, ) -> Set[str]: """ Return the set of conversation_ids already stored in the output table. Returns an empty set if the table does not exist yet (first run) or if the query fails for any other reason — the pipeline will then process all conversations. """ try: query = f"SELECT CONVERSATION_ID FROM {database}.{schema}.{table}" df = conn.run_query(query, description="fetch_processed_ids") ids = set(df["conversation_id"].dropna().astype(str).tolist()) logger.info(f"Found {len(ids):,} already-processed conversations in {table}") return ids except Exception as exc: logger.warning( f"Could not fetch processed IDs from {database}.{schema}.{table} " f"(table may not exist yet): {exc}" ) return set()