| """ |
| Data Fetcher for the HelpScout processing pipeline. |
| |
| Responsible for: |
| 1. Fetching raw customer threads from Snowflake (reusing fetch_and_export logic) |
| 2. Cleaning HTML and aggregating to conversation level |
| 3. Checking which conversations have already been processed (for deduplication) |
| |
| Reuses fetch_raw(), process_threads(), and aggregate_conversations() from |
| fetch_and_export.py so the cleaning and aggregation logic stays in one place. |
| """ |
|
|
| import logging |
| from pathlib import Path |
| from typing import Set |
|
|
| import pandas as pd |
|
|
| from snowflake_conn import SnowflakeConn |
| from fetch_and_export import fetch_raw, process_threads, aggregate_conversations |
|
|
# Per-module logger: the logger name is this module's ``__name__``.
logger: logging.Logger = logging.getLogger(__name__)
|
|
|
|
def fetch_conversations(conn: SnowflakeConn) -> pd.DataFrame:
    """
    Pull HelpScout customer threads and collapse them into conversations.

    Stages, each delegated to ``fetch_and_export``:
        1. ``fetch_raw``               - query raw thread rows from Snowflake.
        2. ``process_threads``         - clean the thread HTML.
        3. ``aggregate_conversations`` - roll threads up to one row per
                                         conversation_id.

    Key columns of the returned frame:
        - conversation_id
        - combined_text (all customer messages joined with ' | ')
        - customer_email, customer_first, customer_last, customer_hs_id
        - thread_count, first_message_at, last_message_at, duration_hours
        - status, state, source_type, source_via

    Args:
        conn: Snowflake connection wrapper, passed straight to ``fetch_raw``.

    Returns:
        The aggregated conversations, or a bare ``pd.DataFrame()`` (no rows,
        no columns) when either the raw fetch or the cleaning stage comes
        back empty.
    """
    raw_threads = fetch_raw(conn)
    if raw_threads.empty:
        logger.warning("No raw threads returned from Snowflake.")
        return pd.DataFrame()

    cleaned_threads = process_threads(raw_threads)
    if cleaned_threads.empty:
        # Nothing survived cleaning, so there is nothing to aggregate.
        logger.warning("All threads were empty after HTML cleaning.")
        return pd.DataFrame()

    conversations = aggregate_conversations(cleaned_threads)
    logger.info(f"Ready to process: {len(conversations):,} conversations")
    return conversations
|
|
|
|
def fetch_processed_ids(
    conn: SnowflakeConn,
    database: str,
    schema: str,
    table: str,
) -> Set[str]:
    """
    Return the set of conversation_ids already stored in the output table.

    Returns an empty set if the table does not exist yet (first run) or if
    the query fails for any other reason — the pipeline will then process
    all conversations.

    Args:
        conn: Connection wrapper; only its ``run_query`` method is called.
        database: Database that holds the output table.
        schema: Schema that holds the output table.
        table: Name of the output table.

        The three name parts are interpolated directly into the SQL text as
        identifiers (identifiers cannot be bound as query parameters), so
        they must come from trusted configuration, never from user input.

    Returns:
        The non-null conversation ids, each converted to ``str``.
    """
    qualified_table = f"{database}.{schema}.{table}"
    try:
        query = f"SELECT CONVERSATION_ID FROM {qualified_table}"
        df = conn.run_query(query, description="fetch_processed_ids")

        # An existing but empty table is a normal state, not a failure. If
        # run_query hands back a column-less frame for zero rows, indexing the
        # column below would raise KeyError and be misreported as a missing
        # table, so short-circuit here instead.
        if df.empty:
            logger.info(f"Found 0 already-processed conversations in {table}")
            return set()

        # Snowflake upper-cases unquoted identifiers, so the column may arrive
        # as "CONVERSATION_ID" unless run_query normalises names. Match it
        # case-insensitively: a KeyError here would be swallowed by the except
        # below and silently disable deduplication. If no column matches, fall
        # back to the original name so that failure path is unchanged.
        id_column = next(
            (col for col in df.columns if str(col).lower() == "conversation_id"),
            "conversation_id",
        )
        # NOTE(review): if CONVERSATION_ID is numeric and contains NULLs,
        # pandas may load it as float64, and astype(str) would then yield
        # "123.0" rather than "123". Confirm the column type matches the ids
        # the caller compares against.
        ids = set(df[id_column].dropna().astype(str).tolist())
        logger.info(f"Found {len(ids):,} already-processed conversations in {table}")
        return ids
    except Exception as exc:
        # Deliberately best-effort (see docstring). Trade-off: any failure,
        # including a transient one, makes the pipeline treat every
        # conversation as unprocessed.
        logger.warning(
            f"Could not fetch processed IDs from {qualified_table} "
            f"(table may not exist yet): {exc}"
        )
        return set()