AI_Chatbot / services.py
embedingHF's picture
Upload folder using huggingface_hub
ae677bb verified
Raw
History Blame Contribute Delete
21.3 kB
import logging
import threading
from typing import List, Dict, Optional, Any
import numpy as np
from sentence_transformers import SentenceTransformer
from django.conf import settings
from django.utils import timezone
from pgvector.django import CosineDistance
from .models import GlobalQA, PropertyCustomQA, PropertyQA, AgencyAutoChatSetting, PropertyAutoChatState
from django.db.models.signals import post_save
from django.dispatch import receiver
logger = logging.getLogger(__name__)
class EmbeddingService:
"""
FIX #7: Thread-safe singleton with per-thread model instances
Prevents memory leak and handles Django's multi-threaded environment
"""
_instances: Dict[int, 'EmbeddingService'] = {}
_lock = threading.Lock()
_model_path = None
def __new__(cls):
"""Thread-aware singleton: different instance per thread"""
thread_id = threading.get_ident()
if thread_id not in cls._instances:
with cls._lock:
if thread_id not in cls._instances:
instance = super().__new__(cls)
instance._loaded = False
cls._instances[thread_id] = instance
instance = cls._instances[thread_id]
if not instance._loaded:
instance._load_model()
instance._loaded = True
return instance
def __init__(self):
"""Initialize but don't load model until needed"""
if not hasattr(self, '_loaded'):
self._loaded = False
def _load_model(self):
"""Load the sentence transformer model"""
try:
model_path = getattr(settings, 'AI_MODEL_PATH', 'all-MiniLM-L6-v2')
self._model = SentenceTransformer(model_path)
logger.info(f"✅ AI Model loaded in thread {threading.get_ident()} from {model_path}")
except Exception as e:
logger.error(f"❌ Failed to load model: {e}")
raise
@classmethod
def cleanup_thread(cls):
"""Clean up model for current thread (call when thread exits)"""
thread_id = threading.get_ident()
if thread_id in cls._instances:
del cls._instances[thread_id]
logger.info(f"🧹 Cleaned up AI model for thread {thread_id}")
def generate_embedding(self, text: str) -> List[float]:
"""Generate embedding vector for text"""
if not hasattr(self, '_model') or self._model is None:
self._load_model()
embedding = self._model.encode(text)
return embedding.tolist()
def find_best_answer_for_property(self, question: str, property_id: str,
threshold: float = 0.6) -> Dict[str, Any]:
"""Find best matching answer for this specific property"""
question_embedding = self.generate_embedding(question)
# Check Global Q&A first (greetings)
best_global = GlobalQA.objects.filter(is_active=True).annotate(
distance=CosineDistance('question_embedding', question_embedding)
).order_by('distance').first()
if best_global:
confidence = 1 - best_global.distance
if confidence > threshold:
return {
'answer': best_global.answer,
'confidence': confidence,
'matched_question': best_global.question,
'type': 'global'
}
# Check Property Q&A
if property_id:
best_prop = PropertyQA.objects.filter(
property_id=property_id,
is_active=True
).annotate(
distance=CosineDistance('question_embedding', question_embedding)
).select_related('master_question').order_by('distance').first()
if best_prop:
confidence = 1 - best_prop.distance
if confidence > threshold:
return {
'answer': best_prop.answer,
'confidence': confidence,
'matched_question': best_prop.master_question.question,
'type': 'property'
}
return {
'answer': "Our agent will respond shortly. Thank you for your patience!",
'confidence': 0.0,
'matched_question': None,
'type': 'fallback'
}
# ai_chatbot/services.py - Update find_best_answer_global_first
def find_best_answer_global_first(self, question: str, property_id: str = None,
threshold: float = 0.6) -> Dict[str, Any]:
"""
Priority: 1. Property Data (dynamic) | 2. Global Greetings | 3. Property Master QA | 4. Property Custom QA | 5. Fallback
"""
# ✅ NEW: Check property data first (highest priority)
if property_id:
property_data_answer = self.get_property_info_answer(question, property_id)
if property_data_answer:
return property_data_answer
question_embedding = self.generate_embedding(question)
# 1. Global Greetings
best_global = GlobalQA.objects.filter(is_active=True).annotate(
distance=CosineDistance('question_embedding', question_embedding)
).order_by('distance').first()
if best_global:
global_conf = 1 - best_global.distance
if global_conf > 0.92:
return {
'answer': best_global.answer,
'confidence': global_conf,
'matched_question': best_global.question,
'type': 'global'
}
# 2. Property Master Q&A
if property_id:
best_prop = PropertyQA.objects.filter(
property_id=property_id,
is_active=True
).annotate(
distance=CosineDistance('question_embedding', question_embedding)
).select_related('master_question').order_by('distance').first()
if best_prop:
prop_conf = 1 - best_prop.distance
if prop_conf > threshold:
return {
'answer': best_prop.answer,
'confidence': prop_conf,
'matched_question': best_prop.master_question.question,
'type': 'property_master'
}
# 3. Property Custom Q&A
if property_id:
best_custom = PropertyCustomQA.objects.filter(
property_id=property_id,
is_active=True
).annotate(
distance=CosineDistance('question_embedding', question_embedding)
).order_by('distance').first()
if best_custom:
custom_conf = 1 - best_custom.distance
if custom_conf > threshold:
return {
'answer': best_custom.answer,
'confidence': custom_conf,
'matched_question': best_custom.question,
'type': 'property_custom'
}
# 4. Fallback
return {
'answer': "Our agent will respond shortly. Thank you for your patience!",
'confidence': 0.0,
'matched_question': None,
'type': 'fallback'
}
def get_property_info_answer(self, question: str, property_id: str) -> Dict[str, Any]:
"""Get answer from actual property data"""
from Property.models import Property
try:
# ✅ Remove 'area' from select_related - it's not a foreign key
property_obj = Property.objects.select_related('city').get(id=property_id)
except Property.DoesNotExist:
return None
question_lower = question.lower()
# City/Location questions
if any(word in question_lower for word in ['city', 'location', 'which city', 'kahan', 'city name', 'shahar']):
city_name = getattr(property_obj, 'city', None)
if city_name:
if hasattr(city_name, 'city'):
city = city_name.city
else:
city = str(city_name)
else:
city = "Karachi"
return {
'answer': f"This property is located in {city}, Pakistan.",
'confidence': 0.98,
'type': 'property_data'
}
# Area/Sector questions - handle area as a property field (not foreign key)
elif any(word in question_lower for word in ['area', 'sector', 'phase', 'block', 'location detail', 'area name']):
# ✅ 'area' is likely a direct field, not a foreign key
area = getattr(property_obj, 'area', None)
if area:
# If area is a string or has __str__ method
area_name = str(area)
else:
area_name = "the main area"
return {
'answer': f"This property is in {area_name} area.",
'confidence': 0.98,
'type': 'property_data'
}
# Address questions
elif any(word in question_lower for word in ['address', 'exact location', 'full address', 'pata', 'complete address']):
address = getattr(property_obj, 'address', None)
if address:
return {
'answer': f"Full address: {address}",
'confidence': 0.98,
'type': 'property_data'
}
# Price/Rent questions
elif any(word in question_lower for word in ['price', 'rent', 'cost', 'kitna', 'rate', 'price kya hai', 'rent kya hai']):
price = getattr(property_obj, 'price', None)
if price:
return {
'answer': f"The price for this property is PKR {price:,}.",
'confidence': 0.98,
'type': 'property_data'
}
# Bedrooms
elif any(word in question_lower for word in ['bedroom', 'bed', 'rooms', 'kitne kamray', 'bed count']):
beds = getattr(property_obj, 'beds', None)
if beds:
return {
'answer': f"This property has {beds} bedroom(s).",
'confidence': 0.98,
'type': 'property_data'
}
# Bathrooms
elif any(word in question_lower for word in ['bathroom', 'bath', 'washroom', 'bathrooms', 'bath count']):
baths = getattr(property_obj, 'baths', None)
if baths:
return {
'answer': f"This property has {baths} bathroom(s).",
'confidence': 0.98,
'type': 'property_data'
}
# Property size
elif any(word in question_lower for word in ['size', 'area size', 'total area', 'square feet', 'sqft', 'marle']):
area_size = getattr(property_obj, 'area_unit', None) or getattr(property_obj, 'total_area', None) or getattr(property_obj, 'land_area', None)
if area_size:
return {
'answer': f"The total area of this property is {area_size} sq ft.",
'confidence': 0.98,
'type': 'property_data'
}
# Property type / Category
elif any(word in question_lower for word in ['type', 'property type', 'kind', 'category']):
category = getattr(property_obj, 'category', None)
if category:
if hasattr(category, 'category'):
category_name = category.category
else:
category_name = str(category)
return {
'answer': f"This is a {category_name} property.",
'confidence': 0.98,
'type': 'property_data'
}
elif any(word in question_lower for word in ['detail', 'property information', 'property detail kiya hai', 'description', 'description kiya hai']):
desc = getattr(property_obj, 'desc', None)
if desc:
if hasattr(desc, 'desc'):
description_name = desc.desc
else:
description_name = str(desc)
return {
'answer': f"iss property ki detail hai: {description_name}.",
'confidence': 0.98,
'type': 'property_data'
}
return None
def _cosine_similarity(self, embedding1: List[float], embedding2: List[float]) -> float:
"""Calculate cosine similarity between two embeddings"""
vec1 = np.array(embedding1)
vec2 = np.array(embedding2)
dot = np.dot(vec1, vec2)
norm1 = np.linalg.norm(vec1)
norm2 = np.linalg.norm(vec2)
return float(dot / (norm1 * norm2)) if norm1 and norm2 else 0.0
class AutoChatService:
"""Orchestration service for auto-chat functionality"""
def __init__(self):
self.embedding_service = EmbeddingService()
def should_auto_reply(self, chat_id: str, property_id: str,
last_agency_reply_at=None, last_client_message_at=None) -> bool:
"""Check if auto-reply should trigger"""
from Chat.models import Chat
try:
# Get property auto-chat state
property_state = PropertyAutoChatState.objects.get(property_id=property_id)
if not property_state.is_auto_chat_enabled:
return False
# Get agency settings
property_obj = property_state.property
agency_setting = AgencyAutoChatSetting.objects.get(agency=property_obj.user)
if not agency_setting.is_enabled:
return False
if not last_client_message_at:
return False
# If agency replied after last client message, don't auto-reply
if last_agency_reply_at and last_agency_reply_at > last_client_message_at:
return False
# Check if configured delay passed
time_diff = (timezone.now() - last_client_message_at).total_seconds()
return time_diff >= agency_setting.delay_seconds
except (PropertyAutoChatState.DoesNotExist, AgencyAutoChatSetting.DoesNotExist):
return False
except Exception as e:
logger.error(f"Error checking auto-reply: {e}")
return False
def generate_auto_reply(self, client_message: str, property_id: str,
chat_id: str) -> Optional[Dict[str, Any]]:
"""Generate auto-reply with proper thresholds"""
try:
from Property.models import Property
property_obj = Property.objects.get(id=property_id)
agency_setting = AgencyAutoChatSetting.objects.get(agency=property_obj.user)
# Use agency's configured threshold
result = self.embedding_service.find_best_answer_global_first(
client_message,
property_id,
agency_setting.confidence_threshold
)
logger.info(f"🤖 Auto-reply | Type: {result['type']} | Confidence: {result['confidence']:.2f}")
return {
'answer': result['answer'],
'confidence': result['confidence'],
'matched_question': result.get('matched_question'),
'is_auto_reply': True,
'reply_type': result['type'],
'is_fallback': result['type'] == 'fallback'
}
except Property.DoesNotExist:
logger.error(f"Property {property_id} not found")
return None
except AgencyAutoChatSetting.DoesNotExist:
logger.warning(f"No agency settings for property {property_id}, using defaults")
# Fallback to defaults
result = self.embedding_service.find_best_answer_global_first(
client_message, property_id, 0.6
)
return {
'answer': result['answer'],
'confidence': result['confidence'],
'matched_question': result.get('matched_question'),
'is_auto_reply': True,
'reply_type': result['type'],
'is_fallback': result['type'] == 'fallback'
}
except Exception as e:
logger.error(f"Auto-reply error: {e}", exc_info=True)
return None
def sync_property_listing_to_ai(self, property_id: str) -> bool:
"""
Sync property data to AI Q&A
FIX #5: Don't delete existing, update in place
"""
from .models import MasterQuestion, PropertyQA
from Property.models import Property
try:
property_obj = Property.objects.get(id=property_id)
agency = property_obj.user
# Mapping of listing fields to Master Questions
data_map = {
"What is the price or rent of this property?": f"The demand for this property is PKR {property_obj if hasattr(property_obj, 'price') else 'contact agent'}.",
"What is the location and area of this property?": f"This property is located in {getattr(property_obj, 'area', 'the area')}, {getattr(property_obj, 'city', 'the city')}.",
"How many bedrooms and bathrooms does it have?": f"It is a {getattr(property_obj, 'beds', 'N/A')} bedroom property with {getattr(property_obj, 'baths', 'N/A')} bathrooms.",
"What is the total size or area of the property?": f"The total size is {getattr(property_obj, 'area', 'N/A')} {getattr(property_obj, 'area_unit', 'sq ft')}.",
"What type of property is it?": f"This is a {getattr(property_obj, 'category', 'property')}.",
"How many kitchens and TV lounges does it have?": f"It has {getattr(property_obj, 'kitchen', 'N/A')} kitchens and {getattr(property_obj, 'tv_launch', 'N/A')} TV lounges.",
"What is contact number of the agent?": f"You can contact the agent at {getattr(property_obj, 'agent_two_phone_number', 'the provided number')}.",
"Information About this property": f"{getattr(property_obj, 'desc', 'Contact agent for details')}",
}
updated_count = 0
created_count = 0
for q_text, answer_text in data_map.items():
master_q = MasterQuestion.objects.filter(question=q_text, is_active=True).first()
if master_q:
embedding = self.embedding_service.generate_embedding(q_text)
# FIX #5: Update or create without deleting
obj, created = PropertyQA.objects.update_or_create(
property=property_obj,
master_question=master_q,
defaults={
'agency': agency,
'answer': answer_text,
'question_embedding': embedding,
'is_active': True
}
)
if created:
created_count += 1
else:
updated_count += 1
logger.info(f"✅ Sync complete for Property {property_id} | Created: {created_count} | Updated: {updated_count}")
return True
except Exception as e:
logger.error(f"❌ Sync error for property {property_id}: {e}", exc_info=True)
return False
@receiver(post_save, sender='Property.Property')
def auto_sync_ai_on_listing_save(sender, instance, created, **kwargs):
"""Jab bhi property save ho, AI data sync karo"""
try:
from ai_chatbot.services import AutoChatService
service = AutoChatService()
service.sync_property_listing_to_ai(instance.id)
except Exception as e:
logger.error(f"Signal Error: {e}")