"""
Main execution script for the HelpScout conversation processing pipeline.

Steps:
    1. Fetch all customer conversations from Snowflake (HTML cleaned + aggregated)
    2. Filter out conversations already in the output table
    3. Run sentiment analysis + topic extraction in parallel batches
    4. Append results to the output table configured under "output" in the
       processing config (e.g. SOCIAL_MEDIA_DB.ML_FEATURES.HELPSCOUT_CONVERSATION_FEATURES)

Run:
    python main.py                            # process all new conversations, parallel
    python main.py --limit 100                # process at most 100 conversations
    python main.py --sequential               # single-process mode (useful for debugging)
    python main.py --config path/to/cfg.json  # use a custom config file
"""

import argparse
import json
import logging
import os
import sys
import traceback
from datetime import datetime
from multiprocessing import Pool, cpu_count
from pathlib import Path
from typing import Any, Dict, List, Optional, Tuple, Union

import pandas as pd
from dotenv import load_dotenv

# ---------------------------------------------------------------------------
# Path setup — load ../.env and make modules next to this script importable
# ---------------------------------------------------------------------------
SCRIPT_DIR = Path(__file__).resolve().parent
ROOT_DIR = SCRIPT_DIR.parent
load_dotenv(ROOT_DIR / ".env")
sys.path.insert(0, str(SCRIPT_DIR))

# ---------------------------------------------------------------------------
# Logging — file + console; log directory is created on first run.
# NOTE(review): under the "spawn" multiprocessing start method this module is
# re-imported in every worker process, so this setup runs again there and may
# open an additional timestamped log file per worker.
# ---------------------------------------------------------------------------
_logs_dir = SCRIPT_DIR / "logs"
_logs_dir.mkdir(exist_ok=True)

logging.basicConfig(
    level=logging.INFO,
    format="%(asctime)s - %(name)s - %(levelname)s - %(message)s",
    handlers=[
        logging.FileHandler(
            _logs_dir / f"helpscout_processing_{datetime.now().strftime('%Y%m%d_%H%M%S')}.log"
        ),
        logging.StreamHandler(),
    ],
)
logger = logging.getLogger(__name__)

# ---------------------------------------------------------------------------
# Local imports (after sys.path is set)
# ---------------------------------------------------------------------------
from snowflake_conn import SnowflakeConn  # noqa: E402
from data_fetcher import fetch_conversations, fetch_processed_ids  # noqa: E402
from workflow.conversation_processor import ConversationProcessingWorkflow  # noqa: E402

# ---------------------------------------------------------------------------
# Output schema
# ---------------------------------------------------------------------------
# Version tag written to every stored row.
WORKFLOW_VERSION = "1.0"

# Workflow result key -> Snowflake output column. Keys missing from the
# workflow results are written as NULL (None).
OUTPUT_COLUMN_MAP: Dict[str, str] = {
    "conversation_id": "CONVERSATION_ID",
    "customer_email": "CUSTOMER_EMAIL",
    "customer_first": "CUSTOMER_FIRST",
    "customer_last": "CUSTOMER_LAST",
    "customer_hs_id": "CUSTOMER_HS_ID",
    "thread_count": "THREAD_COUNT",
    "first_message_at": "FIRST_MESSAGE_AT",
    "last_message_at": "LAST_MESSAGE_AT",
    "duration_hours": "DURATION_HOURS",
    "status": "STATUS",
    "state": "STATE",
    "source_type": "SOURCE_TYPE",
    "source_via": "SOURCE_VIA",
    "combined_text": "COMBINED_TEXT",
    "conversation_text": "CONVERSATION_TEXT_USED",
    "sentiment_polarity": "SENTIMENT_POLARITY",
    "emotions": "EMOTIONS",
    "sentiment_confidence": "SENTIMENT_CONFIDENCE",
    "sentiment_notes": "SENTIMENT_NOTES",
    "topics": "TOPICS",
    "is_refund_request": "IS_REFUND_REQUEST",
    "is_cancellation": "IS_CANCELLATION",
    "is_membership": "IS_MEMBERSHIP",
    "topic_confidence": "TOPIC_CONFIDENCE",
    "topic_notes": "TOPIC_NOTES",
    "summary": "SUMMARY",
    "processing_errors": "PROCESSING_ERRORS",
}


# ---------------------------------------------------------------------------
# Batch size helper
# ---------------------------------------------------------------------------
def calculate_optimal_batch_size(
    total: int,
    num_workers: int,
    min_batch: int = 10,
    max_batch: int = 50,
) -> int:
    """
    Distribute work evenly across workers within the configured min/max bounds.

    Args:
        total: Total number of conversations to process
        num_workers: Number of parallel worker processes (values < 1 are treated as 1)
        min_batch: Minimum conversations per batch
        max_batch: Maximum conversations per batch

    Returns:
        Batch size. Always >= 1, so it is safe to use as a ``range`` step even
        when ``total`` is 0 or the configured bounds are degenerate.
    """
    if total <= min_batch:
        # Small workloads go out as a single batch.
        return max(1, total)

    per_worker = total // max(1, num_workers)
    return max(1, min_batch, min(max_batch, per_worker))


# ---------------------------------------------------------------------------
# Output helpers (module-level so worker processes can use them)
# ---------------------------------------------------------------------------
def _flatten_processing_errors(value: Any) -> Optional[str]:
    """
    Collapse a per-conversation error collection into one storable string.

    Lists/tuples are joined with "; " (items coerced to str; an empty list
    becomes None). Missing values (None, NaN — which pandas inserts when a
    result dict lacks the key) become None. Any other truthy value is
    stringified; falsy values become None.
    """
    if isinstance(value, (list, tuple)):
        return "; ".join(str(item) for item in value) or None
    # NaN is truthy, so it must be filtered before the truthiness check below,
    # otherwise the literal string "nan" would be stored.
    if pd.api.types.is_scalar(value) and pd.isna(value):
        return None
    return str(value) if value else None


def _build_output_dataframe(df_ok: pd.DataFrame) -> pd.DataFrame:
    """
    Map successful workflow results onto the Snowflake output schema.

    Args:
        df_ok: Successful workflow results, one row per conversation.

    Returns:
        DataFrame with the OUTPUT_COLUMN_MAP target columns (None where the
        source key is absent), plus PROCESSED_AT and WORKFLOW_VERSION.
    """
    # Anchor on df_ok's index so a missing first column can't leave the frame
    # with an empty index (which would make later Series assignments drop data).
    output_df = pd.DataFrame(index=df_ok.index)
    for src_col, tgt_col in OUTPUT_COLUMN_MAP.items():
        output_df[tgt_col] = df_ok[src_col] if src_col in df_ok.columns else None

    output_df["PROCESSING_ERRORS"] = output_df["PROCESSING_ERRORS"].apply(
        _flatten_processing_errors
    )

    # Pipeline metadata
    output_df["PROCESSED_AT"] = datetime.now()
    output_df["WORKFLOW_VERSION"] = WORKFLOW_VERSION
    return output_df


# ---------------------------------------------------------------------------
# Batch worker — runs in a separate process (must be module-level for pickle)
# ---------------------------------------------------------------------------
def process_batch_worker(batch_data: tuple) -> Dict[str, Any]:
    """
    Worker function executed in a separate process for one batch of conversations.

    Each worker creates its own Snowflake connection and workflow instance so
    resources are not shared across processes. The connection is always closed,
    whether the batch succeeds or fails.

    Args:
        batch_data: (batch_num, conversations, config, api_key)

    Returns:
        Statistics dict for this batch with keys batch_num, success,
        total_processed, total_stored, failed_count and error. Exceptions are
        caught and reported through this dict rather than raised.
    """
    batch_num, batch_conversations, config, api_key = batch_data
    worker_logger = logging.getLogger(f"Worker-{batch_num}")
    conn = None

    try:
        worker_logger.info(f"Batch {batch_num}: Processing {len(batch_conversations)} conversations")

        # Worker-local Snowflake connection and workflow
        conn = SnowflakeConn()
        workflow = ConversationProcessingWorkflow(config, api_key)

        results_df = pd.DataFrame(workflow.process_batch(batch_conversations))

        # Keep only rows explicitly flagged successful; a results frame with no
        # "success" column (e.g. no results at all) counts everything as failed.
        initial_count = len(results_df)
        if "success" in results_df.columns:
            success_mask = results_df["success"].eq(True)
        else:
            success_mask = pd.Series(False, index=results_df.index)
        df_ok = results_df[success_mask].copy()
        failed_count = initial_count - len(df_ok)

        worker_logger.info(
            f"Batch {batch_num}: {len(df_ok)} successful, {failed_count} failed"
        )

        output_df = _build_output_dataframe(df_ok)

        out_cfg = config["output"]
        if not output_df.empty:
            conn.store_df_to_snowflake(
                table_name=out_cfg["table"],
                dataframe=output_df,
                database=out_cfg["database"],
                schema=out_cfg["schema"],
                overwrite=False,  # Always append; deduplication is handled upstream
            )

        return {
            "batch_num": batch_num,
            "success": True,
            "total_processed": initial_count,
            "total_stored": len(output_df),
            "failed_count": failed_count,
            "error": None,
        }

    except Exception as exc:
        error_msg = f"Batch {batch_num} failed: {exc}"
        worker_logger.error(error_msg)
        worker_logger.error(traceback.format_exc())
        return {
            "batch_num": batch_num,
            "success": False,
            "total_processed": len(batch_conversations),
            "total_stored": 0,
            "failed_count": len(batch_conversations),
            "error": str(exc),
        }

    finally:
        # Previously the connection leaked whenever any step above raised.
        if conn is not None:
            try:
                conn.close()
            except Exception as close_exc:
                worker_logger.warning(
                    f"Batch {batch_num}: failed to close Snowflake connection: {close_exc}"
                )


# ---------------------------------------------------------------------------
# Main processor class
# ---------------------------------------------------------------------------
class HelpScoutProcessor:
    """
    Orchestrates the end-to-end HelpScout conversation processing pipeline.

    Typical usage:
        processor = HelpScoutProcessor()
        processor.run(limit=500)

    The Snowflake connection opened in __init__ is closed when run() finishes,
    so each instance is meant for a single run() call.
    """

    def __init__(self, config_path: Optional[Union[str, Path]] = None):
        """
        Args:
            config_path: Path to processing_config.json. Defaults to
                config_files/processing_config.json relative to this script.

        Raises:
            ValueError: If OPENAI_API_KEY is not set in the environment.
        """
        if config_path is None:
            config_path = SCRIPT_DIR / "config_files" / "processing_config.json"

        with open(config_path, "r", encoding="utf-8") as f:
            self.config = json.load(f)

        # Validate the API key before opening Snowflake so a missing key does
        # not leave an unclosed connection behind.
        self.api_key = os.getenv("OPENAI_API_KEY")
        if not self.api_key:
            raise ValueError("OPENAI_API_KEY not found in environment variables")

        self.conn = SnowflakeConn()
        logger.info("HelpScoutProcessor initialized")

    def _calculate_num_workers(self) -> int:
        """CPU count minus 2, capped at 5 and never below 1 — mirrors the processing_comments pattern."""
        num_cpus = cpu_count()
        num_workers = max(1, min(5, num_cpus - 2))
        logger.info(f"Using {num_workers} parallel workers (CPU count: {num_cpus})")
        return num_workers

    def _load_pending_conversations(self, limit: Optional[int]) -> pd.DataFrame:
        """
        Fetch conversations, drop already-processed ones and apply ``limit``.

        Returns an empty DataFrame (after logging why) when there is nothing to do.
        """
        # Step 1: Fetch + preprocess conversations
        logger.info("Step 1: Fetching conversations from Snowflake...")
        conversations_df = fetch_conversations(self.conn)
        if conversations_df.empty:
            logger.warning("No conversations returned. Exiting.")
            return conversations_df
        logger.info(f"Fetched {len(conversations_df):,} total conversations")

        # Step 2: Skip already-processed conversations
        out_cfg = self.config["output"]
        processed_ids = fetch_processed_ids(
            self.conn,
            out_cfg["database"],
            out_cfg["schema"],
            out_cfg["table"],
        )
        if processed_ids:
            before = len(conversations_df)
            conversations_df = conversations_df[
                ~conversations_df["conversation_id"].astype(str).isin(processed_ids)
            ].copy()
            skipped = before - len(conversations_df)
            logger.info(f"Skipped {skipped:,} already-processed conversations")

        if conversations_df.empty:
            logger.info("All conversations are already processed. Nothing to do.")
            return conversations_df

        # Step 3: Apply optional limit (`is not None` so that 0 means "none",
        # not "all" as a plain truthiness check would)
        if limit is not None:
            conversations_df = conversations_df.head(limit)
            logger.info(f"Limit applied: processing {len(conversations_df):,} conversations")
            if conversations_df.empty:
                logger.info("Limit is 0. Nothing to do.")

        return conversations_df

    def _build_batches(
        self, conversations_df: pd.DataFrame, num_workers: int
    ) -> Tuple[List[tuple], int]:
        """Split conversations into worker payloads; returns (batches, batch_size)."""
        proc_cfg = self.config.get("processing", {})
        batch_size = calculate_optimal_batch_size(
            len(conversations_df),
            num_workers,
            min_batch=proc_cfg.get("min_batch_size", 10),
            max_batch=proc_cfg.get("max_batch_size", 50),
        )

        conversations = conversations_df.to_dict("records")
        # Batch numbers are 1-based; each payload carries everything a worker
        # process needs, since nothing is shared across processes.
        batches = [
            (start // batch_size + 1, conversations[start : start + batch_size], self.config, self.api_key)
            for start in range(0, len(conversations), batch_size)
        ]
        return batches, batch_size

    @staticmethod
    def _run_batches(
        batches: List[tuple], num_workers: int, sequential: bool
    ) -> List[Dict[str, Any]]:
        """Run every batch in-process (sequential) or in a process pool; returns per-batch stats."""
        if sequential:
            return [process_batch_worker(b) for b in batches]
        # No point starting more processes than there are batches.
        with Pool(processes=max(1, min(num_workers, len(batches)))) as pool:
            return pool.map(process_batch_worker, batches)

    def _log_summary(self, results: List[Dict[str, Any]], elapsed: float) -> None:
        """Log aggregate counts, failed batches and timing for the run."""
        out_cfg = self.config["output"]
        total_processed = sum(r["total_processed"] for r in results)
        total_stored = sum(r["total_stored"] for r in results)
        total_failed = sum(r["failed_count"] for r in results)
        failed_batches = [r for r in results if not r["success"]]

        logger.info("=" * 70)
        logger.info("Pipeline Summary")
        logger.info(f"  Output table : {out_cfg['database']}.{out_cfg['schema']}.{out_cfg['table']}")
        logger.info(f"  Processed    : {total_processed:,}")
        logger.info(f"  Stored       : {total_stored:,}")
        logger.info(f"  Failed       : {total_failed:,}")
        if failed_batches:
            logger.error(f"  Failed batches ({len(failed_batches)}):")
            for fb in failed_batches:
                logger.error(f"    Batch {fb['batch_num']}: {fb['error']}")
        logger.info(f"  Elapsed      : {elapsed:.1f}s")
        logger.info(
            f"  Avg per conv : {elapsed / max(total_processed, 1):.2f}s"
        )
        logger.info("=" * 70)

    def run(self, limit: Optional[int] = None, sequential: bool = False) -> None:
        """
        Execute the full pipeline.

        Args:
            limit: Cap the number of conversations processed in this run.
                   Useful for incremental or test runs. None (default)
                   processes all new conversations; 0 processes none.
            sequential: If True, bypass multiprocessing (single-process debug mode).

        Raises:
            ValueError: If ``limit`` is negative.
            Exception: Any pipeline error is logged and re-raised. The Snowflake
                connection is closed in every case.
        """
        try:
            if limit is not None and limit < 0:
                raise ValueError(f"limit must be >= 0, got {limit}")

            logger.info("=" * 70)
            logger.info("HelpScout Conversation Processing Pipeline")
            logger.info(f"Mode: {'SEQUENTIAL (debug)' if sequential else 'PARALLEL'}")
            logger.info("=" * 70)

            # Steps 1-3: fetch, de-duplicate, limit
            conversations_df = self._load_pending_conversations(limit)
            if conversations_df.empty:
                return

            total = len(conversations_df)
            logger.info(f"Processing {total:,} new conversations...")

            # Step 4: Split into batches
            num_workers = self._calculate_num_workers()
            batches, batch_size = self._build_batches(conversations_df, num_workers)
            logger.info(
                f"Split into {len(batches)} batch(es) "
                f"(batch size: {batch_size}, workers: {num_workers})"
            )

            # Step 5: Run batches
            start_time = datetime.now()
            results = self._run_batches(batches, num_workers, sequential)
            elapsed = (datetime.now() - start_time).total_seconds()

            # Step 6: Summary
            self._log_summary(results, elapsed)

        except Exception as exc:
            logger.error(f"Pipeline failed: {exc}", exc_info=True)
            raise
        finally:
            self.conn.close()
            logger.info("Snowflake connection closed")


# ---------------------------------------------------------------------------
# CLI entry point
# ---------------------------------------------------------------------------
def main() -> None:
    """Parse CLI arguments and run the pipeline once."""
    parser = argparse.ArgumentParser(
        description="Process HelpScout conversations: sentiment analysis + topic extraction"
    )
    parser.add_argument(
        "--limit",
        type=int,
        default=None,
        help="Maximum number of new conversations to process in this run (default: all)",
    )
    parser.add_argument(
        "--sequential",
        action="store_true",
        default=False,
        help="Single-process mode — useful for debugging (default: parallel)",
    )
    parser.add_argument(
        "--config",
        type=str,
        default=None,
        help="Path to processing_config.json (default: config_files/processing_config.json)",
    )
    args = parser.parse_args()

    # Reject negative limits up front: DataFrame.head(-n) would silently
    # process "all but the last n" conversations instead.
    if args.limit is not None and args.limit < 0:
        parser.error("--limit must be a non-negative integer")

    processor = HelpScoutProcessor(config_path=args.config)
    processor.run(limit=args.limit, sequential=args.sequential)


if __name__ == "__main__":
    main()