| | import datetime |
| | from typing import List, Dict, Any, Optional, Tuple |
| | import numpy as np |
| | from models.LexRank import degree_centrality_scores |
| | import logging |
| | from datetime import datetime as dt |
| |
|
| | logger = logging.getLogger(__name__) |
| |
|
class QueryProcessor:
    """Orchestrates semantic search and extractive/abstractive summarization.

    Wires together an embedding model (sentence vectors), a summarization
    model, an NLP model (entity extraction + sentence tokenization) and an
    async DB service that performs the vector search.
    """

    def __init__(self, embedding_model, summarization_model, nlp_model, db_service):
        self.embedding_model = embedding_model
        self.summarization_model = summarization_model
        self.nlp_model = nlp_model
        self.db_service = db_service
        logger.info("QueryProcessor initialized")

    async def process(
        self,
        query: str,
        topic: Optional[str] = None,
        start_date: Optional[str] = None,
        end_date: Optional[str] = None
    ) -> Dict[str, Any]:
        """Run the full pipeline: parse dates, embed query, search, summarize.

        Args:
            query: Free-text user query.
            topic: Optional topic filter forwarded to the search backend.
            start_date: Optional lower bound, "YYYY-MM-DD".
            end_date: Optional upper bound, "YYYY-MM-DD".

        Returns:
            On success a dict with "summary", "key_sentences", "articles" and
            "entities"; {"message": ..., "articles": []} when nothing matched;
            {"error": ...} on failure. This is a top-level boundary, so all
            exceptions are logged and converted to an error payload rather
            than propagated.
        """
        try:
            start_dt = self._parse_date(start_date) if start_date else None
            end_dt = self._parse_date(end_date) if end_date else None

            query_embedding = self.embedding_model.encode(query).tolist()
            # Use the robust wrapper so a bad query degrades to "no entity
            # filter" ([]) instead of failing the whole request.
            entities = self._extract_entities_safely(query)
            logger.debug("Extracted entities: %s", entities)

            articles = await self._execute_semantic_search(
                query_embedding,
                start_dt,
                end_dt,
                topic,
                entities
            )

            if not articles:
                return {"message": "No articles found", "articles": []}

            logger.debug("Starting summary generation")
            summary_data = self._generate_summary(articles)
            return {
                "summary": summary_data["summary"],
                "key_sentences": summary_data["key_sentences"],
                "articles": articles,
                "entities": entities
            }

        except Exception as e:
            logger.error(f"Processing failed: {str(e)}", exc_info=True)
            return {"error": str(e)}

    def _parse_date(self, date_str: str) -> dt:
        """Parse a "YYYY-MM-DD" string into a naive datetime.

        Raises:
            ValueError: if the string does not match the expected format
                (chained to the original strptime error for debuggability).
        """
        try:
            return dt.strptime(date_str, "%Y-%m-%d")
        except ValueError as e:
            logger.error(f"Invalid date format: {date_str}")
            raise ValueError(f"Invalid date format. Expected YYYY-MM-DD, got {date_str}") from e

    def _extract_entities_safely(self, text: str) -> List[Tuple[str, str]]:
        """Extract entities, tolerating list input and extractor failures.

        Returns [] on any extraction error so callers can treat "no entities"
        and "extraction failed" the same way (best-effort filter).
        """
        try:
            if isinstance(text, list):
                logger.warning("Received list input for entity extraction, joining to string")
                text = " ".join(text)
            return self.nlp_model.extract_entities(text)
        except Exception as e:
            logger.error(f"Entity extraction failed: {str(e)}")
            return []

    async def _execute_semantic_search(
        self,
        query_embedding: List[float],
        start_date: Optional[dt],
        end_date: Optional[dt],
        topic: Optional[str],
        entities: List[Tuple[str, str]]
    ) -> List[Dict[str, Any]]:
        """Delegate to the DB service; log and re-raise on failure."""
        try:
            return await self.db_service.semantic_search(
                query_embedding=query_embedding,
                start_date=start_date,
                end_date=end_date,
                topic=topic,
                entities=entities
            )
        except Exception as e:
            logger.error(f"Semantic search failed: {str(e)}")
            raise

    def _generate_summary(self, articles: List[Dict[str, Any]], top_k: int = 10) -> Dict[str, Any]:
        """LexRank-style summary: rank sentences by centrality, then abstract.

        Args:
            articles: Search results; each dict must have a "content" key.
            top_k: Number of most-central sentences to keep (default 10,
                matching the previous hard-coded behavior).

        Returns:
            {"summary": str, "key_sentences": List[str]}. Never raises —
            empty content and internal failures both produce a fallback
            payload with an explanatory summary string.
        """
        try:
            sentences: List[str] = []
            for article in articles:
                content = article["content"]
                if content:
                    sentences.extend(self.nlp_model.tokenize_sentences(content))

            if not sentences:
                logger.warning("No sentences available for summarization")
                return {
                    "summary": "No content available for summarization",
                    "key_sentences": []
                }

            embeddings = self.embedding_model.encode(sentences)
            # Cosine similarity: pairwise dot products scaled by the outer
            # product of the row norms (norms is (n, 1), so norms * norms.T
            # is the (n, n) normalizer).
            norms = np.linalg.norm(embeddings, axis=1, keepdims=True)
            similarity_matrix = np.dot(embeddings, embeddings.T) / (norms * norms.T)
            centrality_scores = degree_centrality_scores(similarity_matrix, threshold=None)

            # Negate so argsort yields indices in descending centrality order.
            top_indices = np.argsort(-centrality_scores)[:top_k]
            key_sentences = [sentences[idx].strip() for idx in top_indices]
            combined_text = ' '.join(key_sentences)
            logger.debug("Selected %d key sentences for summarization", len(key_sentences))

            return {
                "summary": self.summarization_model.summarize(combined_text),
                "key_sentences": key_sentences
            }

        except Exception as e:
            logger.error(f"Summary generation failed: {str(e)}")
            return {
                "summary": "Summary generation failed",
                "key_sentences": []
            }