| import logging
|
| import threading
|
| from typing import List, Dict, Optional, Any
|
|
|
| import numpy as np
|
| from sentence_transformers import SentenceTransformer
|
| from django.conf import settings
|
| from django.utils import timezone
|
| from pgvector.django import CosineDistance
|
|
|
| from .models import GlobalQA, PropertyCustomQA, PropertyQA, AgencyAutoChatSetting, PropertyAutoChatState
|
| from django.db.models.signals import post_save
|
| from django.dispatch import receiver
|
| logger = logging.getLogger(__name__)
|
|
|
|
|
| class EmbeddingService:
|
| """
|
| FIX #7: Thread-safe singleton with per-thread model instances
|
| Prevents memory leak and handles Django's multi-threaded environment
|
| """
|
| _instances: Dict[int, 'EmbeddingService'] = {}
|
| _lock = threading.Lock()
|
| _model_path = None
|
|
|
| def __new__(cls):
|
| """Thread-aware singleton: different instance per thread"""
|
| thread_id = threading.get_ident()
|
|
|
| if thread_id not in cls._instances:
|
| with cls._lock:
|
| if thread_id not in cls._instances:
|
| instance = super().__new__(cls)
|
| instance._loaded = False
|
| cls._instances[thread_id] = instance
|
|
|
| instance = cls._instances[thread_id]
|
| if not instance._loaded:
|
| instance._load_model()
|
| instance._loaded = True
|
|
|
| return instance
|
|
|
| def __init__(self):
|
| """Initialize but don't load model until needed"""
|
| if not hasattr(self, '_loaded'):
|
| self._loaded = False
|
|
|
| def _load_model(self):
|
| """Load the sentence transformer model"""
|
| try:
|
| model_path = getattr(settings, 'AI_MODEL_PATH', 'all-MiniLM-L6-v2')
|
| self._model = SentenceTransformer(model_path)
|
| logger.info(f"✅ AI Model loaded in thread {threading.get_ident()} from {model_path}")
|
| except Exception as e:
|
| logger.error(f"❌ Failed to load model: {e}")
|
| raise
|
|
|
| @classmethod
|
| def cleanup_thread(cls):
|
| """Clean up model for current thread (call when thread exits)"""
|
| thread_id = threading.get_ident()
|
| if thread_id in cls._instances:
|
| del cls._instances[thread_id]
|
| logger.info(f"🧹 Cleaned up AI model for thread {thread_id}")
|
|
|
| def generate_embedding(self, text: str) -> List[float]:
|
| """Generate embedding vector for text"""
|
| if not hasattr(self, '_model') or self._model is None:
|
| self._load_model()
|
|
|
| embedding = self._model.encode(text)
|
| return embedding.tolist()
|
|
|
| def find_best_answer_for_property(self, question: str, property_id: str,
|
| threshold: float = 0.6) -> Dict[str, Any]:
|
| """Find best matching answer for this specific property"""
|
| question_embedding = self.generate_embedding(question)
|
|
|
|
|
| best_global = GlobalQA.objects.filter(is_active=True).annotate(
|
| distance=CosineDistance('question_embedding', question_embedding)
|
| ).order_by('distance').first()
|
|
|
| if best_global:
|
| confidence = 1 - best_global.distance
|
| if confidence > threshold:
|
| return {
|
| 'answer': best_global.answer,
|
| 'confidence': confidence,
|
| 'matched_question': best_global.question,
|
| 'type': 'global'
|
| }
|
|
|
|
|
| if property_id:
|
| best_prop = PropertyQA.objects.filter(
|
| property_id=property_id,
|
| is_active=True
|
| ).annotate(
|
| distance=CosineDistance('question_embedding', question_embedding)
|
| ).select_related('master_question').order_by('distance').first()
|
|
|
| if best_prop:
|
| confidence = 1 - best_prop.distance
|
| if confidence > threshold:
|
| return {
|
| 'answer': best_prop.answer,
|
| 'confidence': confidence,
|
| 'matched_question': best_prop.master_question.question,
|
| 'type': 'property'
|
| }
|
|
|
| return {
|
| 'answer': "Our agent will respond shortly. Thank you for your patience!",
|
| 'confidence': 0.0,
|
| 'matched_question': None,
|
| 'type': 'fallback'
|
| }
|
|
|
|
|
|
|
|
|
| def find_best_answer_global_first(self, question: str, property_id: str = None,
|
| threshold: float = 0.6) -> Dict[str, Any]:
|
| """
|
| Priority: 1. Property Data (dynamic) | 2. Global Greetings | 3. Property Master QA | 4. Property Custom QA | 5. Fallback
|
| """
|
|
|
|
|
| if property_id:
|
| property_data_answer = self.get_property_info_answer(question, property_id)
|
| if property_data_answer:
|
| return property_data_answer
|
|
|
| question_embedding = self.generate_embedding(question)
|
|
|
|
|
| best_global = GlobalQA.objects.filter(is_active=True).annotate(
|
| distance=CosineDistance('question_embedding', question_embedding)
|
| ).order_by('distance').first()
|
|
|
| if best_global:
|
| global_conf = 1 - best_global.distance
|
| if global_conf > 0.92:
|
| return {
|
| 'answer': best_global.answer,
|
| 'confidence': global_conf,
|
| 'matched_question': best_global.question,
|
| 'type': 'global'
|
| }
|
|
|
|
|
| if property_id:
|
| best_prop = PropertyQA.objects.filter(
|
| property_id=property_id,
|
| is_active=True
|
| ).annotate(
|
| distance=CosineDistance('question_embedding', question_embedding)
|
| ).select_related('master_question').order_by('distance').first()
|
|
|
| if best_prop:
|
| prop_conf = 1 - best_prop.distance
|
| if prop_conf > threshold:
|
| return {
|
| 'answer': best_prop.answer,
|
| 'confidence': prop_conf,
|
| 'matched_question': best_prop.master_question.question,
|
| 'type': 'property_master'
|
| }
|
|
|
|
|
| if property_id:
|
| best_custom = PropertyCustomQA.objects.filter(
|
| property_id=property_id,
|
| is_active=True
|
| ).annotate(
|
| distance=CosineDistance('question_embedding', question_embedding)
|
| ).order_by('distance').first()
|
|
|
| if best_custom:
|
| custom_conf = 1 - best_custom.distance
|
| if custom_conf > threshold:
|
| return {
|
| 'answer': best_custom.answer,
|
| 'confidence': custom_conf,
|
| 'matched_question': best_custom.question,
|
| 'type': 'property_custom'
|
| }
|
|
|
|
|
| return {
|
| 'answer': "Our agent will respond shortly. Thank you for your patience!",
|
| 'confidence': 0.0,
|
| 'matched_question': None,
|
| 'type': 'fallback'
|
| }
|
|
|
|
|
| def get_property_info_answer(self, question: str, property_id: str) -> Dict[str, Any]:
|
| """Get answer from actual property data"""
|
| from Property.models import Property
|
|
|
| try:
|
|
|
| property_obj = Property.objects.select_related('city').get(id=property_id)
|
| except Property.DoesNotExist:
|
| return None
|
|
|
| question_lower = question.lower()
|
|
|
|
|
| if any(word in question_lower for word in ['city', 'location', 'which city', 'kahan', 'city name', 'shahar']):
|
| city_name = getattr(property_obj, 'city', None)
|
| if city_name:
|
| if hasattr(city_name, 'city'):
|
| city = city_name.city
|
| else:
|
| city = str(city_name)
|
| else:
|
| city = "Karachi"
|
| return {
|
| 'answer': f"This property is located in {city}, Pakistan.",
|
| 'confidence': 0.98,
|
| 'type': 'property_data'
|
| }
|
|
|
|
|
| elif any(word in question_lower for word in ['area', 'sector', 'phase', 'block', 'location detail', 'area name']):
|
|
|
| area = getattr(property_obj, 'area', None)
|
| if area:
|
|
|
| area_name = str(area)
|
| else:
|
| area_name = "the main area"
|
| return {
|
| 'answer': f"This property is in {area_name} area.",
|
| 'confidence': 0.98,
|
| 'type': 'property_data'
|
| }
|
|
|
|
|
| elif any(word in question_lower for word in ['address', 'exact location', 'full address', 'pata', 'complete address']):
|
| address = getattr(property_obj, 'address', None)
|
| if address:
|
| return {
|
| 'answer': f"Full address: {address}",
|
| 'confidence': 0.98,
|
| 'type': 'property_data'
|
| }
|
|
|
|
|
| elif any(word in question_lower for word in ['price', 'rent', 'cost', 'kitna', 'rate', 'price kya hai', 'rent kya hai']):
|
| price = getattr(property_obj, 'price', None)
|
| if price:
|
| return {
|
| 'answer': f"The price for this property is PKR {price:,}.",
|
| 'confidence': 0.98,
|
| 'type': 'property_data'
|
| }
|
|
|
|
|
| elif any(word in question_lower for word in ['bedroom', 'bed', 'rooms', 'kitne kamray', 'bed count']):
|
| beds = getattr(property_obj, 'beds', None)
|
| if beds:
|
| return {
|
| 'answer': f"This property has {beds} bedroom(s).",
|
| 'confidence': 0.98,
|
| 'type': 'property_data'
|
| }
|
|
|
|
|
| elif any(word in question_lower for word in ['bathroom', 'bath', 'washroom', 'bathrooms', 'bath count']):
|
| baths = getattr(property_obj, 'baths', None)
|
| if baths:
|
| return {
|
| 'answer': f"This property has {baths} bathroom(s).",
|
| 'confidence': 0.98,
|
| 'type': 'property_data'
|
| }
|
|
|
|
|
| elif any(word in question_lower for word in ['size', 'area size', 'total area', 'square feet', 'sqft', 'marle']):
|
| area_size = getattr(property_obj, 'area_unit', None) or getattr(property_obj, 'total_area', None) or getattr(property_obj, 'land_area', None)
|
| if area_size:
|
| return {
|
| 'answer': f"The total area of this property is {area_size} sq ft.",
|
| 'confidence': 0.98,
|
| 'type': 'property_data'
|
| }
|
|
|
|
|
| elif any(word in question_lower for word in ['type', 'property type', 'kind', 'category']):
|
| category = getattr(property_obj, 'category', None)
|
| if category:
|
| if hasattr(category, 'category'):
|
| category_name = category.category
|
| else:
|
| category_name = str(category)
|
| return {
|
| 'answer': f"This is a {category_name} property.",
|
| 'confidence': 0.98,
|
| 'type': 'property_data'
|
| }
|
|
|
| elif any(word in question_lower for word in ['detail', 'property information', 'property detail kiya hai', 'description', 'description kiya hai']):
|
| desc = getattr(property_obj, 'desc', None)
|
| if desc:
|
| if hasattr(desc, 'desc'):
|
| description_name = desc.desc
|
| else:
|
| description_name = str(desc)
|
| return {
|
| 'answer': f"iss property ki detail hai: {description_name}.",
|
| 'confidence': 0.98,
|
| 'type': 'property_data'
|
| }
|
|
|
| return None
|
|
|
| def _cosine_similarity(self, embedding1: List[float], embedding2: List[float]) -> float:
|
| """Calculate cosine similarity between two embeddings"""
|
| vec1 = np.array(embedding1)
|
| vec2 = np.array(embedding2)
|
| dot = np.dot(vec1, vec2)
|
| norm1 = np.linalg.norm(vec1)
|
| norm2 = np.linalg.norm(vec2)
|
| return float(dot / (norm1 * norm2)) if norm1 and norm2 else 0.0
|
|
|
|
|
| class AutoChatService:
|
| """Orchestration service for auto-chat functionality"""
|
|
|
| def __init__(self):
|
| self.embedding_service = EmbeddingService()
|
|
|
| def should_auto_reply(self, chat_id: str, property_id: str,
|
| last_agency_reply_at=None, last_client_message_at=None) -> bool:
|
| """Check if auto-reply should trigger"""
|
| from Chat.models import Chat
|
|
|
| try:
|
|
|
| property_state = PropertyAutoChatState.objects.get(property_id=property_id)
|
|
|
| if not property_state.is_auto_chat_enabled:
|
| return False
|
|
|
|
|
| property_obj = property_state.property
|
| agency_setting = AgencyAutoChatSetting.objects.get(agency=property_obj.user)
|
|
|
| if not agency_setting.is_enabled:
|
| return False
|
|
|
| if not last_client_message_at:
|
| return False
|
|
|
|
|
| if last_agency_reply_at and last_agency_reply_at > last_client_message_at:
|
| return False
|
|
|
|
|
| time_diff = (timezone.now() - last_client_message_at).total_seconds()
|
| return time_diff >= agency_setting.delay_seconds
|
|
|
| except (PropertyAutoChatState.DoesNotExist, AgencyAutoChatSetting.DoesNotExist):
|
| return False
|
| except Exception as e:
|
| logger.error(f"Error checking auto-reply: {e}")
|
| return False
|
|
|
| def generate_auto_reply(self, client_message: str, property_id: str,
|
| chat_id: str) -> Optional[Dict[str, Any]]:
|
| """Generate auto-reply with proper thresholds"""
|
| try:
|
| from Property.models import Property
|
|
|
| property_obj = Property.objects.get(id=property_id)
|
| agency_setting = AgencyAutoChatSetting.objects.get(agency=property_obj.user)
|
|
|
|
|
| result = self.embedding_service.find_best_answer_global_first(
|
| client_message,
|
| property_id,
|
| agency_setting.confidence_threshold
|
| )
|
|
|
| logger.info(f"🤖 Auto-reply | Type: {result['type']} | Confidence: {result['confidence']:.2f}")
|
|
|
| return {
|
| 'answer': result['answer'],
|
| 'confidence': result['confidence'],
|
| 'matched_question': result.get('matched_question'),
|
| 'is_auto_reply': True,
|
| 'reply_type': result['type'],
|
| 'is_fallback': result['type'] == 'fallback'
|
| }
|
|
|
| except Property.DoesNotExist:
|
| logger.error(f"Property {property_id} not found")
|
| return None
|
| except AgencyAutoChatSetting.DoesNotExist:
|
| logger.warning(f"No agency settings for property {property_id}, using defaults")
|
|
|
| result = self.embedding_service.find_best_answer_global_first(
|
| client_message, property_id, 0.6
|
| )
|
| return {
|
| 'answer': result['answer'],
|
| 'confidence': result['confidence'],
|
| 'matched_question': result.get('matched_question'),
|
| 'is_auto_reply': True,
|
| 'reply_type': result['type'],
|
| 'is_fallback': result['type'] == 'fallback'
|
| }
|
| except Exception as e:
|
| logger.error(f"Auto-reply error: {e}", exc_info=True)
|
| return None
|
|
|
| def sync_property_listing_to_ai(self, property_id: str) -> bool:
|
| """
|
| Sync property data to AI Q&A
|
| FIX #5: Don't delete existing, update in place
|
| """
|
| from .models import MasterQuestion, PropertyQA
|
| from Property.models import Property
|
|
|
| try:
|
| property_obj = Property.objects.get(id=property_id)
|
| agency = property_obj.user
|
|
|
|
|
| data_map = {
|
| "What is the price or rent of this property?": f"The demand for this property is PKR {property_obj if hasattr(property_obj, 'price') else 'contact agent'}.",
|
| "What is the location and area of this property?": f"This property is located in {getattr(property_obj, 'area', 'the area')}, {getattr(property_obj, 'city', 'the city')}.",
|
| "How many bedrooms and bathrooms does it have?": f"It is a {getattr(property_obj, 'beds', 'N/A')} bedroom property with {getattr(property_obj, 'baths', 'N/A')} bathrooms.",
|
| "What is the total size or area of the property?": f"The total size is {getattr(property_obj, 'area', 'N/A')} {getattr(property_obj, 'area_unit', 'sq ft')}.",
|
| "What type of property is it?": f"This is a {getattr(property_obj, 'category', 'property')}.",
|
| "How many kitchens and TV lounges does it have?": f"It has {getattr(property_obj, 'kitchen', 'N/A')} kitchens and {getattr(property_obj, 'tv_launch', 'N/A')} TV lounges.",
|
| "What is contact number of the agent?": f"You can contact the agent at {getattr(property_obj, 'agent_two_phone_number', 'the provided number')}.",
|
| "Information About this property": f"{getattr(property_obj, 'desc', 'Contact agent for details')}",
|
|
|
| }
|
|
|
| updated_count = 0
|
| created_count = 0
|
|
|
| for q_text, answer_text in data_map.items():
|
| master_q = MasterQuestion.objects.filter(question=q_text, is_active=True).first()
|
|
|
| if master_q:
|
| embedding = self.embedding_service.generate_embedding(q_text)
|
|
|
|
|
| obj, created = PropertyQA.objects.update_or_create(
|
| property=property_obj,
|
| master_question=master_q,
|
| defaults={
|
| 'agency': agency,
|
| 'answer': answer_text,
|
| 'question_embedding': embedding,
|
| 'is_active': True
|
| }
|
| )
|
|
|
| if created:
|
| created_count += 1
|
| else:
|
| updated_count += 1
|
|
|
| logger.info(f"✅ Sync complete for Property {property_id} | Created: {created_count} | Updated: {updated_count}")
|
| return True
|
|
|
| except Exception as e:
|
| logger.error(f"❌ Sync error for property {property_id}: {e}", exc_info=True)
|
| return False
|
|
|
| @receiver(post_save, sender='Property.Property')
|
| def auto_sync_ai_on_listing_save(sender, instance, created, **kwargs):
|
| """Jab bhi property save ho, AI data sync karo"""
|
| try:
|
| from ai_chatbot.services import AutoChatService
|
| service = AutoChatService()
|
| service.sync_property_listing_to_ai(instance.id)
|
| except Exception as e:
|
| logger.error(f"Signal Error: {e}") |