# Danialebrat's picture
# Adding HelpScout to UI
# 58db664
"""
Main execution script for the HelpScout conversation processing pipeline.
Steps:
1. Fetch all customer conversations from Snowflake (HTML cleaned + aggregated)
2. Filter out conversations already in the output table
3. Run sentiment analysis + topic extraction in parallel batches
4. Append results to SOCIAL_MEDIA_DB.ML_FEATURES.HELPSCOUT_CONVERSATION_FEATURES
Run:
python main.py # process all new conversations, parallel
python main.py --limit 100 # process at most 100 conversations
python main.py --sequential # single-process mode (useful for debugging)
python main.py --config <path> # use a custom config file
"""
import json
import logging
import os
import sys
import argparse
import traceback
from datetime import datetime
from multiprocessing import Pool, cpu_count
from pathlib import Path
from typing import Any, Dict, List
import pandas as pd
from dotenv import load_dotenv
# ---------------------------------------------------------------------------
# Path setup — allows imports from the process_helpscout package directory
# ---------------------------------------------------------------------------
# SCRIPT_DIR is the directory that contains this file; ROOT_DIR is its parent.
SCRIPT_DIR = Path(__file__).resolve().parent
ROOT_DIR = SCRIPT_DIR.parent
# Load environment variables from ROOT_DIR/.env before anything reads them.
load_dotenv(ROOT_DIR / ".env")
# Put SCRIPT_DIR first on sys.path so the local imports further down resolve
# against this directory. This must run before those imports.
sys.path.insert(0, str(SCRIPT_DIR))
# ---------------------------------------------------------------------------
# Logging — file + console; log directory is created on first run
# ---------------------------------------------------------------------------
# Each run writes to its own timestamped file under SCRIPT_DIR/logs.
# NOTE(review): this executes at import time, so every import of this module
# (presumably including worker processes under the "spawn" start method)
# opens a new timestamped log file — confirm this is intended.
_logs_dir = SCRIPT_DIR / "logs"
_logs_dir.mkdir(exist_ok=True)
logging.basicConfig(
    level=logging.INFO,
    format="%(asctime)s - %(name)s - %(levelname)s - %(message)s",
    handlers=[
        logging.FileHandler(
            _logs_dir / f"helpscout_processing_{datetime.now().strftime('%Y%m%d_%H%M%S')}.log"
        ),
        logging.StreamHandler(),
    ],
)
# Module-level logger used by the orchestration code in this file.
logger = logging.getLogger(__name__)
# ---------------------------------------------------------------------------
# Local imports (after sys.path is set)
# ---------------------------------------------------------------------------
from snowflake_conn import SnowflakeConn
from data_fetcher import fetch_conversations, fetch_processed_ids
from workflow.conversation_processor import ConversationProcessingWorkflow
# ---------------------------------------------------------------------------
# Batch size helper
# ---------------------------------------------------------------------------
def calculate_optimal_batch_size(
    total: int,
    num_workers: int,
    min_batch: int = 10,
    max_batch: int = 50,
) -> int:
    """
    Pick a batch size that spreads ``total`` items across ``num_workers``.

    The per-worker share (``total // num_workers``) is clamped to the
    ``[min_batch, max_batch]`` range. When the whole workload fits inside a
    single minimum-sized batch, the workload size itself is returned.

    Args:
        total: Total number of conversations to process.
        num_workers: Number of parallel worker processes. Values below 1 are
            treated as 1 so the division can never fail.
        min_batch: Minimum conversations per batch.
        max_batch: Maximum conversations per batch.

    Returns:
        The batch size. It is 0 only when ``total`` is not positive; for any
        positive ``total`` it is at least 1, so it is always a valid
        ``range`` step.
    """
    if total <= 0:
        return 0
    if total <= min_batch:
        return total

    # Guard against a zero/negative worker count (would raise ZeroDivisionError).
    workers = max(1, num_workers)
    per_worker = total // workers
    clamped = max(min_batch, min(max_batch, per_worker))

    # A configured min_batch of 0 combined with fewer items than workers would
    # otherwise yield 0, which is not a usable batch size.
    return max(1, clamped)
# ---------------------------------------------------------------------------
# Batch worker — runs in a separate process (must be module-level for pickle)
# ---------------------------------------------------------------------------
# Maps workflow result keys to the Snowflake output column names.
_OUTPUT_COLUMN_MAP = {
    "conversation_id": "CONVERSATION_ID",
    "customer_email": "CUSTOMER_EMAIL",
    "customer_first": "CUSTOMER_FIRST",
    "customer_last": "CUSTOMER_LAST",
    "customer_hs_id": "CUSTOMER_HS_ID",
    "thread_count": "THREAD_COUNT",
    "first_message_at": "FIRST_MESSAGE_AT",
    "last_message_at": "LAST_MESSAGE_AT",
    "duration_hours": "DURATION_HOURS",
    "status": "STATUS",
    "state": "STATE",
    "source_type": "SOURCE_TYPE",
    "source_via": "SOURCE_VIA",
    "combined_text": "COMBINED_TEXT",
    "conversation_text": "CONVERSATION_TEXT_USED",
    "sentiment_polarity": "SENTIMENT_POLARITY",
    "emotions": "EMOTIONS",
    "sentiment_confidence": "SENTIMENT_CONFIDENCE",
    "sentiment_notes": "SENTIMENT_NOTES",
    "topics": "TOPICS",
    "is_refund_request": "IS_REFUND_REQUEST",
    "is_cancellation": "IS_CANCELLATION",
    "is_membership": "IS_MEMBERSHIP",
    "topic_confidence": "TOPIC_CONFIDENCE",
    "topic_notes": "TOPIC_NOTES",
    "summary": "SUMMARY",
    "processing_errors": "PROCESSING_ERRORS",
}


def _flatten_processing_errors(value: Any):
    """Collapse a processing-errors value into one string, or None if falsy."""
    if isinstance(value, list):
        # str() each item so non-string entries cannot break the join.
        return "; ".join(str(item) for item in value)
    return str(value) if value else None


def _build_output_frame(df_ok: pd.DataFrame) -> pd.DataFrame:
    """Project successful results onto the Snowflake column layout."""
    output_df = pd.DataFrame()
    for src_col, tgt_col in _OUTPUT_COLUMN_MAP.items():
        # Columns the workflow did not produce are emitted as all-None.
        output_df[tgt_col] = df_ok[src_col] if src_col in df_ok.columns else None

    output_df["PROCESSING_ERRORS"] = output_df["PROCESSING_ERRORS"].apply(
        _flatten_processing_errors
    )

    # Pipeline metadata
    output_df["PROCESSED_AT"] = datetime.now()
    output_df["WORKFLOW_VERSION"] = "1.0"
    return output_df


def process_batch_worker(batch_data: tuple) -> dict:
    """
    Process one batch of conversations and append the successes to Snowflake.

    Must stay a module-level function so it can be pickled for
    ``multiprocessing``. Each call creates its own ``SnowflakeConn`` and
    ``ConversationProcessingWorkflow`` so nothing is shared across processes.
    The connection is always closed, including when the batch fails.

    Args:
        batch_data: ``(batch_num, conversations, config, api_key)``.

    Returns:
        Statistics dict with keys ``batch_num``, ``success``,
        ``total_processed``, ``total_stored``, ``failed_count`` and ``error``.
        Exceptions are caught and reported through ``success=False`` /
        ``error`` rather than raised.
    """
    batch_num, batch_conversations, config, api_key = batch_data
    worker_logger = logging.getLogger(f"Worker-{batch_num}")
    conn = None

    try:
        worker_logger.info(f"Batch {batch_num}: Processing {len(batch_conversations)} conversations")

        # Read the output target up front so a bad config fails before any
        # conversations are processed.
        out_cfg = config["output"]

        # Worker-local Snowflake connection and workflow
        conn = SnowflakeConn()
        workflow = ConversationProcessingWorkflow(config, api_key)

        # Run the workflow
        results = workflow.process_batch(batch_conversations)
        results_df = pd.DataFrame(results)
        if "success" not in results_df.columns:
            raise ValueError(
                "Workflow results contain no 'success' column "
                f"({len(results_df)} result row(s) returned)"
            )

        # Separate successful results
        initial_count = len(results_df)
        df_ok = results_df[results_df["success"].eq(True)].copy()
        failed_count = initial_count - len(df_ok)
        worker_logger.info(
            f"Batch {batch_num}: {len(df_ok)} successful, {failed_count} failed"
        )

        output_df = _build_output_frame(df_ok)

        # Store to Snowflake
        if not output_df.empty:
            conn.store_df_to_snowflake(
                table_name=out_cfg["table"],
                dataframe=output_df,
                database=out_cfg["database"],
                schema=out_cfg["schema"],
                overwrite=False,  # Always append; deduplication is handled upstream
            )

        return {
            "batch_num": batch_num,
            "success": True,
            "total_processed": initial_count,
            "total_stored": len(output_df),
            "failed_count": failed_count,
            "error": None,
        }
    except Exception as exc:
        error_msg = f"Batch {batch_num} failed: {exc}"
        worker_logger.error(error_msg)
        worker_logger.error(traceback.format_exc())
        return {
            "batch_num": batch_num,
            "success": False,
            "total_processed": len(batch_conversations),
            "total_stored": 0,
            "failed_count": len(batch_conversations),
            "error": str(exc),
        }
    finally:
        # Release the connection on every path. A close() failure is only
        # logged so it cannot turn an already-stored batch into a failure.
        if conn is not None:
            try:
                conn.close()
            except Exception:
                worker_logger.warning(
                    f"Batch {batch_num}: error while closing Snowflake connection",
                    exc_info=True,
                )
# ---------------------------------------------------------------------------
# Main processor class
# ---------------------------------------------------------------------------
class HelpScoutProcessor:
    """
    Orchestrates the end-to-end HelpScout conversation processing pipeline.

    The instance owns one Snowflake connection, which ``run`` closes when it
    finishes, so create a new instance for each run.

    Typical usage:
        processor = HelpScoutProcessor()
        processor.run(limit=500)
    """

    def __init__(self, config_path: str = None):
        """
        Load the configuration, validate the API key and open the connection.

        Args:
            config_path: Path to processing_config.json.
                Defaults to config_files/processing_config.json
                relative to this script.

        Raises:
            ValueError: If OPENAI_API_KEY is not set in the environment.
        """
        if config_path is None:
            config_path = SCRIPT_DIR / "config_files" / "processing_config.json"
        with open(config_path, "r", encoding="utf-8") as f:
            self.config = json.load(f)

        # Validate the key BEFORE opening the Snowflake connection so a
        # missing key cannot leave an open connection behind.
        self.api_key = os.getenv("OPENAI_API_KEY")
        if not self.api_key:
            raise ValueError("OPENAI_API_KEY not found in environment variables")

        self.conn = SnowflakeConn()
        logger.info("HelpScoutProcessor initialized")

    def _calculate_num_workers(self) -> int:
        """CPU count minus 2, capped at 5 — mirrors the processing_comments pattern."""
        num_cpus = cpu_count()
        num_workers = max(1, min(5, num_cpus - 2))
        logger.info(f"Using {num_workers} parallel workers (CPU count: {num_cpus})")
        return num_workers

    def run(self, limit: int = None, sequential: bool = False):
        """
        Execute the full pipeline.

        Fetches conversations, drops those already in the output table,
        applies the optional limit, splits the rest into batches and runs
        them through ``process_batch_worker``. Logs a summary at the end.
        The Snowflake connection is closed on every exit path.

        Args:
            limit: Cap the number of conversations processed in this run.
                Useful for incremental or test runs. ``None`` or ``0`` means
                process all new conversations; a negative value is ignored
                with a warning.
            sequential: If True, bypass multiprocessing (single-process debug mode).

        Raises:
            Exception: Any error raised by the pipeline is logged and re-raised.
        """
        try:
            logger.info("=" * 70)
            logger.info("HelpScout Conversation Processing Pipeline")
            logger.info(f"Mode: {'SEQUENTIAL (debug)' if sequential else 'PARALLEL'}")
            logger.info("=" * 70)

            # ------------------------------------------------------------------
            # Step 1: Fetch + preprocess conversations
            # ------------------------------------------------------------------
            logger.info("Step 1: Fetching conversations from Snowflake...")
            conversations_df = fetch_conversations(self.conn)
            if conversations_df.empty:
                logger.warning("No conversations returned. Exiting.")
                return
            logger.info(f"Fetched {len(conversations_df):,} total conversations")

            # ------------------------------------------------------------------
            # Step 2: Skip already-processed conversations
            # ------------------------------------------------------------------
            out_cfg = self.config["output"]
            processed_ids = fetch_processed_ids(
                self.conn,
                out_cfg["database"],
                out_cfg["schema"],
                out_cfg["table"],
            )
            if processed_ids:
                before = len(conversations_df)
                # IDs are compared as strings — presumably fetch_processed_ids
                # returns string IDs; verify against data_fetcher.
                conversations_df = conversations_df[
                    ~conversations_df["conversation_id"].astype(str).isin(processed_ids)
                ].copy()
                skipped = before - len(conversations_df)
                logger.info(f"Skipped {skipped:,} already-processed conversations")
            if conversations_df.empty:
                logger.info("All conversations are already processed. Nothing to do.")
                return

            # ------------------------------------------------------------------
            # Step 3: Apply optional limit
            # ------------------------------------------------------------------
            if limit is not None and limit < 0:
                # head(-n) would silently drop the LAST n rows instead of capping.
                logger.warning(
                    f"Ignoring negative limit ({limit}); processing all new conversations"
                )
            elif limit:
                conversations_df = conversations_df.head(limit)
                logger.info(f"Limit applied: processing {len(conversations_df):,} conversations")
            total = len(conversations_df)
            logger.info(f"Processing {total:,} new conversations...")

            # ------------------------------------------------------------------
            # Step 4: Split into batches
            # ------------------------------------------------------------------
            num_workers = self._calculate_num_workers()
            proc_cfg = self.config.get("processing", {})
            batch_size = calculate_optimal_batch_size(
                total,
                num_workers,
                min_batch=proc_cfg.get("min_batch_size", 10),
                max_batch=proc_cfg.get("max_batch_size", 50),
            )
            # range() rejects a zero step; never let a degenerate config reach it.
            batch_size = max(1, batch_size)

            conversations = conversations_df.to_dict("records")
            batches = [
                (batch_num, conversations[start : start + batch_size], self.config, self.api_key)
                for batch_num, start in enumerate(range(0, total, batch_size), start=1)
            ]
            logger.info(
                f"Split into {len(batches)} batch(es) "
                f"(batch size: {batch_size}, workers: {num_workers})"
            )

            # ------------------------------------------------------------------
            # Step 5: Run batches
            # ------------------------------------------------------------------
            start_time = datetime.now()
            if sequential:
                results = [process_batch_worker(b) for b in batches]
            else:
                # No point spawning more processes than there are batches.
                pool_size = max(1, min(num_workers, len(batches)))
                with Pool(processes=pool_size) as pool:
                    results = pool.map(process_batch_worker, batches)
            elapsed = (datetime.now() - start_time).total_seconds()

            # ------------------------------------------------------------------
            # Step 6: Summary
            # ------------------------------------------------------------------
            total_processed = sum(r["total_processed"] for r in results)
            total_stored = sum(r["total_stored"] for r in results)
            total_failed = sum(r["failed_count"] for r in results)
            failed_batches = [r for r in results if not r["success"]]

            logger.info("=" * 70)
            logger.info("Pipeline Summary")
            logger.info(f" Output table : {out_cfg['database']}.{out_cfg['schema']}.{out_cfg['table']}")
            logger.info(f" Processed : {total_processed:,}")
            logger.info(f" Stored : {total_stored:,}")
            logger.info(f" Failed : {total_failed:,}")
            if failed_batches:
                logger.error(f" Failed batches ({len(failed_batches)}):")
                for fb in failed_batches:
                    logger.error(f" Batch {fb['batch_num']}: {fb['error']}")
            logger.info(f" Elapsed : {elapsed:.1f}s")
            logger.info(
                f" Avg per conv : {elapsed / max(total_processed, 1):.2f}s"
            )
            logger.info("=" * 70)
        except Exception as exc:
            logger.error(f"Pipeline failed: {exc}", exc_info=True)
            raise
        finally:
            self.conn.close()
            logger.info("Snowflake connection closed")
# ---------------------------------------------------------------------------
# CLI entry point
# ---------------------------------------------------------------------------
def main():
    """Parse the command-line options and run the pipeline once."""
    cli = argparse.ArgumentParser(
        description="Process HelpScout conversations: sentiment analysis + topic extraction"
    )

    # One (flag, keyword-arguments) entry per supported option.
    option_specs = (
        (
            "--limit",
            {
                "type": int,
                "default": None,
                "help": "Maximum number of new conversations to process in this run (default: all)",
            },
        ),
        (
            "--sequential",
            {
                "action": "store_true",
                "default": False,
                "help": "Single-process mode — useful for debugging (default: parallel)",
            },
        ),
        (
            "--config",
            {
                "type": str,
                "default": None,
                "help": "Path to processing_config.json (default: config_files/processing_config.json)",
            },
        ),
    )
    for flag, spec in option_specs:
        cli.add_argument(flag, **spec)

    parsed = cli.parse_args()
    pipeline = HelpScoutProcessor(config_path=parsed.config)
    pipeline.run(limit=parsed.limit, sequential=parsed.sequential)
# Run the CLI only when this file is executed as a script, not when imported.
if __name__ == "__main__":
    main()