import openai
import numpy as np
import re
from typing import List, Tuple
from config import EMBED_MODEL
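
# Shared helpers for the assistant: OpenAI embedding generation, cosine-similarity
# matching over embedded entries, LLM-based intent and user-type classification,
# and a maintenance routine that backfills missing embeddings in the database.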

def get_embedding(text: str) -> List[float]:
    """Generate embedding for a given text."""
    text_strip = text.replace("\n", " ").strip()
    response = openai.embeddings.create(input=[text_strip], model=EMBED_MODEL)
    return response.data[0].embedding

def cosine_similarity(a: List[float], b: List[float]) -> float:
    """Calculate cosine similarity between two vectors."""
    a = np.array(a)
    b = np.array(b)
    if np.linalg.norm(a) == 0 or np.linalg.norm(b) == 0:
        return 0.0
    return np.dot(a, b) / (np.linalg.norm(a) * np.linalg.norm(b))

def clean_time(time_str: str) -> str:
    """Clean up time string."""
    if not time_str:
        return ""
    
    time_match = re.search(r'(\d{1,2}):?(\d{0,2})\s*(AM|PM)', time_str, re.IGNORECASE)
    if time_match:
        hour = time_match.group(1)
        minute = (time_match.group(2) or "00").zfill(2)  # pad single-digit minutes to two digits
        ampm = time_match.group(3).upper()
        return f"{hour}:{minute} {ampm}"
    
    return time_str.strip()

def find_top_k_matches(user_embedding: List[float], dataset: List[Tuple], k: int = 3) -> List[Tuple]:
    """Find top k matching entries from a dataset."""
    scored = []
    for entry_id, text, emb in dataset:
        score = cosine_similarity(user_embedding, emb)
        scored.append((score, entry_id, text))
    scored.sort(key=lambda item: item[0], reverse=True)  # rank by similarity score, highest first
    return scored[:k]

def classify_intent(question: str) -> str:
    """
    Classify the user's intent into:
    Mode A: Recommendation Mode (Workshops, Dates, Availability, Recommendations)
    Mode B: Front Desk Mode (Default - Everything else)
    """
    prompt = f"""Classify the following user question into one of two modes:
1. "Mode A - Recommendation Mode": Use this if the user is asking about workshops, specific dates, what's available this month, asking for recommendations, or career goals (like getting an agent).
2. "Mode B - Front Desk Mode": Use this for broad introductory questions, kids classes, signing up, summit, instructor roles, auditing, online vs in-studio, general policies, or specific questions about existing classes.

User Question: "{question}"

Response must be exactly "Mode A" or "Mode B"."""

    try:
        response = openai.chat.completions.create(
            model="gpt-4o-mini",
            messages=[{"role": "user", "content": prompt}],
            temperature=0,
            max_tokens=5
        )
        prediction = response.choices[0].message.content.strip()
        if "Mode A" in prediction:
            return "Mode A"
        return "Mode B"
    except Exception as e:
        print(f"Error in intent classification: {e}")
        return "Mode B"  # Default to Front Desk Mode

def should_include_email(question: str) -> bool:
    """
    Determine if the contact email should be shown based on user intent.
    Allowed for: Payments, Refunds, Attendance issues, Account problems.
    """
    from config import EMAIL_ONLY_KEYWORDS
    
    question_lower = question.lower()
    for word in EMAIL_ONLY_KEYWORDS:
        pattern = rf'\b{re.escape(word)}\b'
        if re.search(pattern, question_lower):
            return True
            
    return False

def classify_user_type(question: str, history: List[dict] = None) -> str:
    """
    Classify the user type into:
    - new_actor
    - experienced_actor
    - parent
    - current_student
    - unknown
    """
    history_str = ""
    if history:
        history_str = "\nConversation context:\n" + "\n".join([f"{m['role']}: {m['content'][:100]}..." for m in history[-3:]])

    prompt = f"""Classify the user into exactly one of these categories based on their question and context:
1. "new_actor": Just starting out, has no experience, or is asking how to begin.
2. "experienced_actor": Already has credits, mentions agents, looking for advanced workshops, or refers to their career progress.
3. "parent": Asking on behalf of their child, mentions "my kid", "my son", "my daughter", "teens".
4. "current_student": Refers to past/current classes at Get Scene, mentions a specific GSP membership, or asks about recurring student workshops.
5. "unknown": Not enough information yet.

User Question: "{question}"{history_str}

Response must be exactly one of: new_actor, experienced_actor, parent, current_student, unknown."""

    try:
        response = openai.chat.completions.create(
            model="gpt-4o-mini",
            messages=[{"role": "user", "content": prompt}],
            temperature=0,
            max_tokens=10
        )
        prediction = response.choices[0].message.content.strip().lower()
        valid_types = ["new_actor", "experienced_actor", "parent", "current_student", "unknown"]
        for t in valid_types:
            if t in prediction:
                return t
        return "unknown"
    except Exception as e:
        print(f"Error in user type classification: {e}")
        return "unknown"

def recalculate_all_embeddings():
    """Recalculate embeddings for all entries in faq_entries and podcast_episodes that are missing embeddings."""
    from database import get_db_connection
    import json
    
    with get_db_connection() as conn:
        cur = conn.cursor()
        
        # 1. Update FAQs
        print("Starting FAQ embedding recalculation...")
        cur.execute("SELECT id, question FROM faq_entries WHERE embedding IS NULL")
        faqs = cur.fetchall()
        for faq_id, question in faqs:
            try:
                emb = get_embedding(question)
                cur.execute("UPDATE faq_entries SET embedding = ? WHERE id = ?", (json.dumps(emb), faq_id))
                print(f"  ✓ Updated FAQ ID {faq_id}")
            except Exception as e:
                print(f"  ✗ Error updating FAQ ID {faq_id}: {e}")
        
        # 2. Update Podcasts
        print("Starting Podcast embedding recalculation...")
        cur.execute("SELECT id, full_text FROM podcast_episodes WHERE embedding IS NULL")
        podcasts = cur.fetchall()
        for pod_id, full_text in podcasts:
            try:
                emb = get_embedding(full_text)
                cur.execute("UPDATE podcast_episodes SET embedding = ? WHERE id = ?", (json.dumps(emb), pod_id))
                print(f"  ✓ Updated Podcast ID {pod_id}")
            except Exception as e:
                print(f"  ✗ Error updating Podcast ID {pod_id}: {e}")
        
        conn.commit()
    print("Embedding recalculation complete.")