| """ |
| Main execution script for the HelpScout conversation processing pipeline. |
| |
| Steps: |
| 1. Fetch all customer conversations from Snowflake (HTML cleaned + aggregated) |
| 2. Filter out conversations already in the output table |
| 3. Run sentiment analysis + topic extraction in parallel batches |
| 4. Append results to SOCIAL_MEDIA_DB.ML_FEATURES.HELPSCOUT_CONVERSATION_FEATURES |
| |
| Run: |
| python main.py # process all new conversations, parallel |
| python main.py --limit 100 # process at most 100 conversations |
| python main.py --sequential # single-process mode (useful for debugging) |
| python main.py --config <path> # use a custom config file |
| """ |
|
|
| import json |
| import logging |
| import os |
| import sys |
| import argparse |
| import traceback |
| from datetime import datetime |
| from multiprocessing import Pool, cpu_count |
| from pathlib import Path |
| from typing import Any, Dict, List |
|
|
| import pandas as pd |
| from dotenv import load_dotenv |
|
|
| |
| |
| |
# Paths are resolved from this file's location so the script behaves the same
# regardless of the current working directory.
SCRIPT_DIR = Path(__file__).resolve().parent
ROOT_DIR = SCRIPT_DIR.parent

# Populate os.environ from <parent dir>/.env before anything reads it
# (HelpScoutProcessor reads OPENAI_API_KEY via os.getenv).
load_dotenv(dotenv_path=ROOT_DIR / ".env")

# Put this script's directory first on the import path so the local modules
# imported below (snowflake_conn, data_fetcher, workflow) resolve.
sys.path.insert(0, str(SCRIPT_DIR))

# Logs go to <script dir>/logs/, one timestamped file per process start,
# and are mirrored to the console via a StreamHandler (stderr by default).
_logs_dir = SCRIPT_DIR / "logs"
_logs_dir.mkdir(exist_ok=True)

logging.basicConfig(
    handlers=[
        logging.FileHandler(
            _logs_dir / f"helpscout_processing_{datetime.now().strftime('%Y%m%d_%H%M%S')}.log"
        ),
        logging.StreamHandler(),
    ],
    format="%(asctime)s - %(name)s - %(levelname)s - %(message)s",
    level=logging.INFO,
)
logger = logging.getLogger(__name__)
|
|
| |
| |
| |
| from snowflake_conn import SnowflakeConn |
| from data_fetcher import fetch_conversations, fetch_processed_ids |
| from workflow.conversation_processor import ConversationProcessingWorkflow |
|
|
|
|
| |
| |
| |
|
|
def calculate_optimal_batch_size(
    total: int,
    num_workers: int,
    min_batch: int = 10,
    max_batch: int = 50,
) -> int:
    """
    Distribute work evenly across workers within the configured min/max bounds.

    The per-worker share is rounded *up*, so ``total`` items split into at most
    ``num_workers`` batches instead of leaving a tiny leftover batch (e.g. 23
    items on 2 workers -> batches of 12 and 11, not 11, 11 and 1).

    Args:
        total: Total number of conversations to process
        num_workers: Number of parallel worker processes (values < 1 are treated as 1)
        min_batch: Minimum conversations per batch
        max_batch: Maximum conversations per batch

    Returns:
        Optimal batch size. Always >= 1, so it is safe to use as a ``range`` step.
    """
    if total <= min_batch:
        # Small workloads go into a single batch; never return 0, because the
        # caller uses the result as a range() step, which must be non-zero.
        return max(total, 1)

    workers = max(num_workers, 1)
    # Ceiling division: the smallest batch size that covers `total` in `workers` batches.
    per_worker = (total + workers - 1) // workers
    return max(min_batch, min(max_batch, per_worker))
|
|
|
|
| |
| |
| |
|
|
# Workflow result keys -> output-table column names. Keys missing from the
# workflow results are written as None.
_OUTPUT_COLUMN_MAP = {
    "conversation_id": "CONVERSATION_ID",
    "customer_email": "CUSTOMER_EMAIL",
    "customer_first": "CUSTOMER_FIRST",
    "customer_last": "CUSTOMER_LAST",
    "customer_hs_id": "CUSTOMER_HS_ID",
    "thread_count": "THREAD_COUNT",
    "first_message_at": "FIRST_MESSAGE_AT",
    "last_message_at": "LAST_MESSAGE_AT",
    "duration_hours": "DURATION_HOURS",
    "status": "STATUS",
    "state": "STATE",
    "source_type": "SOURCE_TYPE",
    "source_via": "SOURCE_VIA",
    "combined_text": "COMBINED_TEXT",
    "conversation_text": "CONVERSATION_TEXT_USED",
    "sentiment_polarity": "SENTIMENT_POLARITY",
    "emotions": "EMOTIONS",
    "sentiment_confidence": "SENTIMENT_CONFIDENCE",
    "sentiment_notes": "SENTIMENT_NOTES",
    "topics": "TOPICS",
    "is_refund_request": "IS_REFUND_REQUEST",
    "is_cancellation": "IS_CANCELLATION",
    "is_membership": "IS_MEMBERSHIP",
    "topic_confidence": "TOPIC_CONFIDENCE",
    "topic_notes": "TOPIC_NOTES",
    "summary": "SUMMARY",
    "processing_errors": "PROCESSING_ERRORS",
}


def _format_processing_errors(value):
    """Collapse one PROCESSING_ERRORS cell into a string, or None when empty/missing."""
    if isinstance(value, list):
        return "; ".join(str(item) for item in value)
    # pd.DataFrame(list_of_dicts) fills keys absent from some rows with NaN.
    # NaN is truthy, so without this check it would be stored as the string "nan".
    if value is None or (pd.api.types.is_scalar(value) and pd.isna(value)):
        return None
    return str(value) if value else None


def _build_output_frame(df_ok: pd.DataFrame) -> pd.DataFrame:
    """Project successful workflow results onto the output-table schema."""
    # Anchor the frame to df_ok's index so every column aligns row-for-row,
    # including the columns that are filled with None.
    output_df = pd.DataFrame(index=df_ok.index)
    for src_col, tgt_col in _OUTPUT_COLUMN_MAP.items():
        output_df[tgt_col] = df_ok[src_col] if src_col in df_ok.columns else None

    output_df["PROCESSING_ERRORS"] = output_df["PROCESSING_ERRORS"].apply(
        _format_processing_errors
    )

    # Run metadata stamped on every stored row.
    output_df["PROCESSED_AT"] = datetime.now()
    output_df["WORKFLOW_VERSION"] = "1.0"
    return output_df


def process_batch_worker(batch_data: tuple) -> dict:
    """
    Worker function executed in a separate process for one batch of conversations.

    Each worker creates its own Snowflake connection and workflow instance so
    resources are not shared across processes. The connection is always closed,
    even when processing or storing fails.

    Args:
        batch_data: (batch_num, conversations, config, api_key)

    Returns:
        Statistics dict for this batch with keys batch_num, success,
        total_processed, total_stored, failed_count and error. Exceptions are
        caught and reported through this dict (success=False) rather than raised.
    """
    batch_num, batch_conversations, config, api_key = batch_data
    worker_logger = logging.getLogger(f"Worker-{batch_num}")
    conn = None

    try:
        worker_logger.info(f"Batch {batch_num}: Processing {len(batch_conversations)} conversations")

        conn = SnowflakeConn()
        workflow = ConversationProcessingWorkflow(config, api_key)

        # NOTE(review): assumes process_batch returns a list of dicts carrying a
        # boolean "success" key plus the keys in _OUTPUT_COLUMN_MAP — confirm.
        results_df = pd.DataFrame(workflow.process_batch(batch_conversations))
        initial_count = len(results_df)

        # Only rows explicitly marked success=True are stored. A missing
        # "success" column (e.g. no results at all) means nothing succeeded.
        if "success" in results_df.columns:
            df_ok = results_df[results_df["success"].eq(True)].copy()
        else:
            df_ok = results_df.iloc[0:0].copy()
        failed_count = initial_count - len(df_ok)

        worker_logger.info(
            f"Batch {batch_num}: {len(df_ok)} successful, {failed_count} failed"
        )

        output_df = _build_output_frame(df_ok)

        out_cfg = config["output"]
        if not output_df.empty:
            conn.store_df_to_snowflake(
                table_name=out_cfg["table"],
                dataframe=output_df,
                database=out_cfg["database"],
                schema=out_cfg["schema"],
                overwrite=False,
            )

        return {
            "batch_num": batch_num,
            "success": True,
            "total_processed": initial_count,
            "total_stored": len(output_df),
            "failed_count": failed_count,
            "error": None,
        }

    except Exception as exc:
        error_msg = f"Batch {batch_num} failed: {exc}"
        worker_logger.error(error_msg)
        worker_logger.error(traceback.format_exc())
        return {
            "batch_num": batch_num,
            "success": False,
            "total_processed": len(batch_conversations),
            "total_stored": 0,
            "failed_count": len(batch_conversations),
            "error": str(exc),
        }

    finally:
        # Best-effort cleanup. A failing close() must not turn an already-stored
        # batch into a reported failure, nor mask the original exception.
        if conn is not None:
            try:
                conn.close()
            except Exception:
                worker_logger.warning(
                    f"Batch {batch_num}: failed to close Snowflake connection",
                    exc_info=True,
                )
|
|
|
|
| |
| |
| |
|
|
class HelpScoutProcessor:
    """
    Orchestrates the end-to-end HelpScout conversation processing pipeline.

    Typical usage:
        processor = HelpScoutProcessor()
        processor.run(limit=500)

    Note: run() closes self.conn in its ``finally`` clause. Calling run() a
    second time on the same instance would therefore reuse a closed connection.
    """

    def __init__(self, config_path: str = None):
        """
        Load the config, validate credentials and open a Snowflake connection.

        Args:
            config_path: Path to processing_config.json.
                         Defaults to config_files/processing_config.json
                         relative to this script.

        Raises:
            ValueError: If OPENAI_API_KEY is not set in the environment.
        """
        if config_path is None:
            config_path = SCRIPT_DIR / "config_files" / "processing_config.json"

        with open(config_path, "r", encoding="utf-8") as f:
            self.config = json.load(f)

        # Validate the API key *before* opening a Snowflake session, so a
        # missing key does not leave an orphaned connection behind.
        self.api_key = os.getenv("OPENAI_API_KEY")
        if not self.api_key:
            raise ValueError("OPENAI_API_KEY not found in environment variables")

        self.conn = SnowflakeConn()

        logger.info("HelpScoutProcessor initialized")

    def _calculate_num_workers(self) -> int:
        """CPU count minus 2, capped at 5 — mirrors the processing_comments pattern."""
        num_cpus = cpu_count()
        num_workers = max(1, min(5, num_cpus - 2))
        logger.info(f"Using {num_workers} parallel workers (CPU count: {num_cpus})")
        return num_workers

    def run(self, limit: int = None, sequential: bool = False):
        """
        Execute the full pipeline.

        Args:
            limit: Cap the number of conversations processed in this run.
                   Useful for incremental or test runs. None (default) processes
                   all new conversations; 0 processes none.
            sequential: If True, bypass multiprocessing (single-process debug mode).

        Raises:
            ValueError: If ``limit`` is negative.
            Any exception raised by the pipeline is logged and re-raised. The
            Snowflake connection is closed in every case.
        """
        try:
            if limit is not None and limit < 0:
                raise ValueError(f"limit must be non-negative, got {limit}")

            logger.info("=" * 70)
            logger.info("HelpScout Conversation Processing Pipeline")
            logger.info(f"Mode: {'SEQUENTIAL (debug)' if sequential else 'PARALLEL'}")
            logger.info("=" * 70)

            # Step 1: fetch every conversation.
            logger.info("Step 1: Fetching conversations from Snowflake...")
            conversations_df = fetch_conversations(self.conn)

            if conversations_df.empty:
                logger.warning("No conversations returned. Exiting.")
                return

            logger.info(f"Fetched {len(conversations_df):,} total conversations")

            # Step 2: drop conversations already present in the output table.
            out_cfg = self.config["output"]
            processed_ids = fetch_processed_ids(
                self.conn,
                out_cfg["database"],
                out_cfg["schema"],
                out_cfg["table"],
            )

            if processed_ids:
                # conversation_id is compared as str, so normalise the stored
                # IDs to str too in case the fetcher returns numeric values.
                processed_ids = {str(pid) for pid in processed_ids}
                before = len(conversations_df)
                conversations_df = conversations_df[
                    ~conversations_df["conversation_id"].astype(str).isin(processed_ids)
                ].copy()
                skipped = before - len(conversations_df)
                logger.info(f"Skipped {skipped:,} already-processed conversations")

            if conversations_df.empty:
                logger.info("All conversations are already processed. Nothing to do.")
                return

            # Optional cap. An explicit `is not None` check is used so that
            # limit=0 means "none" rather than "all".
            if limit is not None:
                conversations_df = conversations_df.head(limit)
                logger.info(f"Limit applied: processing {len(conversations_df):,} conversations")
                if conversations_df.empty:
                    logger.info("Limit leaves no conversations to process. Exiting.")
                    return

            total = len(conversations_df)
            logger.info(f"Processing {total:,} new conversations...")

            # Step 3: split into batches.
            num_workers = self._calculate_num_workers()
            proc_cfg = self.config.get("processing", {})
            batch_size = calculate_optimal_batch_size(
                total,
                num_workers,
                min_batch=proc_cfg.get("min_batch_size", 10),
                max_batch=proc_cfg.get("max_batch_size", 50),
            )

            conversations = conversations_df.to_dict("records")
            batches = []
            for i in range(0, total, batch_size):
                batch = conversations[i : i + batch_size]
                batch_num = (i // batch_size) + 1
                batches.append((batch_num, batch, self.config, self.api_key))

            logger.info(
                f"Split into {len(batches)} batch(es) "
                f"(batch size: {batch_size}, workers: {num_workers})"
            )

            # Step 4: process (each worker stores its own results).
            start_time = datetime.now()

            if sequential:
                results = [process_batch_worker(b) for b in batches]
            else:
                with Pool(processes=num_workers) as pool:
                    results = pool.map(process_batch_worker, batches)

            elapsed = (datetime.now() - start_time).total_seconds()

            # Summary.
            total_processed = sum(r["total_processed"] for r in results)
            total_stored = sum(r["total_stored"] for r in results)
            total_failed = sum(r["failed_count"] for r in results)
            failed_batches = [r for r in results if not r["success"]]

            logger.info("=" * 70)
            logger.info("Pipeline Summary")
            logger.info(f"  Output table : {out_cfg['database']}.{out_cfg['schema']}.{out_cfg['table']}")
            logger.info(f"  Processed    : {total_processed:,}")
            logger.info(f"  Stored       : {total_stored:,}")
            logger.info(f"  Failed       : {total_failed:,}")
            if failed_batches:
                logger.error(f"  Failed batches ({len(failed_batches)}):")
                for fb in failed_batches:
                    logger.error(f"    Batch {fb['batch_num']}: {fb['error']}")
            logger.info(f"  Elapsed      : {elapsed:.1f}s")
            logger.info(
                f"  Avg per conv : {elapsed / max(total_processed, 1):.2f}s"
            )
            logger.info("=" * 70)

        except Exception as exc:
            logger.error(f"Pipeline failed: {exc}", exc_info=True)
            raise

        finally:
            self.conn.close()
            logger.info("Snowflake connection closed")
|
|
|
|
| |
| |
| |
|
|
def main(argv=None):
    """
    Parse command-line arguments and run the pipeline.

    Args:
        argv: Argument list to parse, excluding the program name. When None
              (the default), argparse falls back to sys.argv[1:].
    """
    parser = argparse.ArgumentParser(
        description="Process HelpScout conversations: sentiment analysis + topic extraction"
    )
    parser.add_argument(
        "--limit",
        type=int,
        default=None,
        help="Maximum number of new conversations to process in this run (default: all)",
    )
    parser.add_argument(
        "--sequential",
        action="store_true",
        default=False,
        help="Single-process mode — useful for debugging (default: parallel)",
    )
    parser.add_argument(
        "--config",
        type=str,
        default=None,
        help="Path to processing_config.json (default: config_files/processing_config.json)",
    )
    args = parser.parse_args(argv)

    # run() passes the limit to DataFrame.head(), where a negative n means
    # "all but the last n rows" rather than a cap. A limit of 0 would be a
    # no-op run, so only positive values are accepted.
    if args.limit is not None and args.limit < 1:
        parser.error("--limit must be a positive integer")

    processor = HelpScoutProcessor(config_path=args.config)
    processor.run(limit=args.limit, sequential=args.sequential)


# The __main__ guard is required with multiprocessing: under the "spawn" start
# method, worker processes re-import this module and must not re-run main().
if __name__ == "__main__":
    main()