# NOTE(review): the three lines here ("Spaces: / Running / Running") were
# Hugging Face Spaces page chrome captured during export — not part of the module.
| """ | |
| Semantic Layer using SBERT for Column Understanding and Agent Routing | |
| Provides semantic understanding of dataset columns and agent intent matching | |
| using sentence-transformers embeddings. | |
| """ | |
| import numpy as np | |
| from typing import Dict, Any, List, Optional, Tuple | |
| import polars as pl | |
| from pathlib import Path | |
| import json | |
# SBERT for semantic embeddings (optional dependency — layer degrades gracefully without it)
try:
    from sentence_transformers import SentenceTransformer
    import torch
    SBERT_AVAILABLE = True
except ImportError:
    SBERT_AVAILABLE = False
    print("⚠️ sentence-transformers not available. Install with: pip install sentence-transformers")

# Sklearn for similarity (optional dependency)
try:
    from sklearn.metrics.pairwise import cosine_similarity
    SKLEARN_AVAILABLE = True
except ImportError:
    SKLEARN_AVAILABLE = False


class SemanticLayer:
    """
    Semantic understanding layer using SBERT embeddings.

    Features:
    - Column semantic embedding (name + sample values + dtype)
    - Semantic column matching (find similar columns)
    - Agent intent routing (semantic task → agent mapping)
    - Target column inference (semantic similarity to "target")

    If sentence-transformers / scikit-learn is missing, or the model fails
    to load, ``self.enabled`` stays False and every public method returns a
    cheap deterministic fallback instead of raising.
    """

    # Dimension of the dummy embedding returned while disabled.
    # Matches all-MiniLM-L6-v2, the default model.
    _FALLBACK_DIM = 384

    def __init__(self, model_name: str = "all-MiniLM-L6-v2"):
        """
        Initialize semantic layer with SBERT model.

        Args:
            model_name: Sentence-transformer model name
                - all-MiniLM-L6-v2: Fast, 384 dims (recommended)
                - all-mpnet-base-v2: Better quality, 768 dims, slower
                - paraphrase-MiniLM-L6-v2: Good for short texts
        """
        self.model_name = model_name
        self.model = None
        self.enabled = SBERT_AVAILABLE and SKLEARN_AVAILABLE
        if self.enabled:
            try:
                print(f"🧠 Loading SBERT model: {model_name}...")
                # trust_remote_code improves compatibility with custom model repos
                self.model = SentenceTransformer(model_name, trust_remote_code=True)
                # Use GPU if available
                if torch.cuda.is_available():
                    self.model = self.model.to('cuda')
                    print("✅ SBERT loaded on GPU")
                else:
                    print("✅ SBERT loaded on CPU")
            except Exception as e:
                # Any load failure (download, weights, device) downgrades to keyword mode.
                print(f"⚠️ Failed to load SBERT model: {e}")
                print(f"   Falling back to keyword-based routing (semantic features disabled)")
                self.enabled = False
        else:
            print("⚠️ SBERT semantic layer disabled (missing dependencies)")

    def encode_column(self, column_name: str, dtype: str,
                      sample_values: Optional[List[Any]] = None,
                      stats: Optional[Dict[str, Any]] = None) -> np.ndarray:
        """
        Create semantic embedding for a column.

        Combines column name, data type, sample values, and stats into
        a text description that captures the column's semantic meaning.

        Args:
            column_name: Name of the column
            dtype: Data type (Int64, Float64, Utf8, etc.)
            sample_values: Sample values from the column
            stats: Optional statistics; recognized keys are 'mean',
                'unique_count', and 'null_percentage'

        Returns:
            Embedding vector (numpy array); a zero vector when disabled
            or when encoding fails.

        Example:
            >>> encode_column("annual_salary", "Float64", [50000, 75000], {"mean": 65000})
            >>> # Returns embedding for "annual_salary (Float64 numeric): values like 50000, 75000, mean 65000"
        """
        if not self.enabled:
            return np.zeros(self._FALLBACK_DIM)  # Dummy embedding

        # Build semantic description
        description_parts = [f"Column name: {column_name}"]
        description_parts.append(f"Type: {self._interpret_dtype(dtype)}")

        # Add sample values (truncate each to 50 chars, at most 5 samples)
        if sample_values:
            samples_str = ", ".join(str(v)[:50] for v in sample_values[:5] if v is not None)
            description_parts.append(f"Example values: {samples_str}")

        # Add statistics — numeric guards prevent a crash on non-numeric
        # values sneaking into 'mean' / 'null_percentage' (format specs
        # like :.2f only accept numbers).
        if stats:
            mean = stats.get('mean')
            if isinstance(mean, (int, float)):
                description_parts.append(f"Mean: {mean:.2f}")
            if stats.get('unique_count') is not None:
                description_parts.append(f"Unique values: {stats['unique_count']}")
            null_pct = stats.get('null_percentage')
            if isinstance(null_pct, (int, float)):
                description_parts.append(f"Missing: {null_pct:.1f}%")

        text = ". ".join(description_parts)

        # Generate embedding; on failure return a correctly sized zero vector.
        try:
            return self.model.encode(text, convert_to_numpy=True, show_progress_bar=False)
        except Exception as e:
            print(f"⚠️ Error encoding column {column_name}: {e}")
            return np.zeros(self.model.get_sentence_embedding_dimension())

    def _interpret_dtype(self, dtype: str) -> str:
        """Convert polars dtype to human-readable description."""
        dtype_lower = str(dtype).lower()
        if 'int' in dtype_lower or 'float' in dtype_lower:
            return "numeric continuous or count data"
        elif 'bool' in dtype_lower:
            return "boolean flag"
        elif 'utf8' in dtype_lower or 'str' in dtype_lower:
            return "text or categorical label"
        elif 'date' in dtype_lower or 'time' in dtype_lower:
            return "temporal timestamp"
        else:
            return "data values"

    def find_similar_columns(self, query_column: str, column_embeddings: Dict[str, np.ndarray],
                             top_k: int = 3, threshold: float = 0.6) -> List[Tuple[str, float]]:
        """
        Find columns semantically similar to query column.

        Use case: Detect duplicates or related columns
        Example: "Salary" → finds ["Annual_Income", "Compensation", "Pay"]

        Args:
            query_column: Column name to search for
            column_embeddings: Dict mapping column names to their embeddings
            top_k: Number of similar columns to return
            threshold: Minimum similarity score (0-1)

        Returns:
            List of (column_name, similarity_score) tuples, best first;
            empty when disabled or the query column has no embedding.
        """
        if not self.enabled or query_column not in column_embeddings:
            return []

        query_emb = column_embeddings[query_column].reshape(1, -1)
        similarities = []
        for col_name, col_emb in column_embeddings.items():
            if col_name == query_column:
                continue  # never report the column as similar to itself
            sim = cosine_similarity(query_emb, col_emb.reshape(1, -1))[0][0]
            if sim >= threshold:
                similarities.append((col_name, float(sim)))

        # Sort by similarity descending
        similarities.sort(key=lambda x: x[1], reverse=True)
        return similarities[:top_k]

    def infer_target_column(self, column_embeddings: Dict[str, np.ndarray],
                            task_description: str,
                            threshold: float = 0.4) -> Optional[Tuple[str, float]]:
        """
        Infer which column is likely the target/label for prediction.

        Uses semantic similarity between column descriptions and task description.

        Args:
            column_embeddings: Dict mapping column names to embeddings
            task_description: User's task description
            threshold: Minimum confidence to report a match (default 0.4)

        Returns:
            (column_name, confidence_score) or None

        Example:
            >>> infer_target_column(embeddings, "predict house prices")
            >>> ("Price", 0.85)  # High confidence "Price" is target
        """
        if not self.enabled:
            return None

        # Encode task description once, then compare against each column.
        task_emb = self.model.encode(task_description, convert_to_numpy=True, show_progress_bar=False)
        task_emb = task_emb.reshape(1, -1)

        best_col = None
        best_score = 0.0
        for col_name, col_emb in column_embeddings.items():
            sim = cosine_similarity(task_emb, col_emb.reshape(1, -1))[0][0]
            if sim > best_score:
                best_score = sim
                best_col = col_name

        # Only return if confidence is reasonable
        if best_score >= threshold:
            return (best_col, float(best_score))
        return None

    def route_to_agent(self, task_description: str,
                       agent_descriptions: Dict[str, str]) -> Tuple[str, float]:
        """
        Route task to appropriate specialist agent using semantic similarity.

        Replaces keyword-based routing with semantic understanding.

        Args:
            task_description: User's task description
            agent_descriptions: Dict mapping agent_key → agent description

        Returns:
            (agent_key, confidence_score); ("", 0.0) when no agents are registered.

        Example:
            >>> route_to_agent("build a predictive model", {
            ...     "modeling_agent": "Expert in ML training and models",
            ...     "viz_agent": "Expert in visualizations"
            ... })
            >>> ("modeling_agent", 0.92)
        """
        if not agent_descriptions:
            # No agents registered: nothing to route to (previously IndexError).
            return ("", 0.0)
        if not self.enabled:
            # Fallback to first agent
            return list(agent_descriptions.keys())[0], 0.5

        # Encode task once, and all agent descriptions in a single batch.
        task_emb = self.model.encode(task_description, convert_to_numpy=True, show_progress_bar=False)
        task_emb = task_emb.reshape(1, -1)
        agent_keys = list(agent_descriptions.keys())
        agent_embs = self.model.encode(
            [agent_descriptions[k] for k in agent_keys],
            convert_to_numpy=True, show_progress_bar=False
        )

        # argmax guarantees a concrete agent even if all similarities are <= 0
        # (the old running-max could return None in that case).
        sims = cosine_similarity(task_emb, agent_embs)[0]
        best_idx = int(np.argmax(sims))
        return agent_keys[best_idx], float(sims[best_idx])

    def semantic_column_match(self, target_name: str, available_columns: List[str],
                              threshold: float = 0.6) -> Optional[Tuple[str, float]]:
        """
        Find best matching column for a target name using fuzzy semantic matching.

        Better than string fuzzy matching because it understands synonyms:
        - "salary" matches "annual_income", "compensation", "pay"
        - "target" matches "label", "class", "outcome"

        Args:
            target_name: Column name to find (might not exist exactly)
            available_columns: List of actual column names in dataset
            threshold: Minimum similarity to consider a match

        Returns:
            (matched_column, confidence) or None

        Example:
            >>> semantic_column_match("salary", ["Annual_Income", "Name", "Age"])
            >>> ("Annual_Income", 0.78)
        """
        if not self.enabled:
            # Fallback to exact match
            if target_name in available_columns:
                return (target_name, 1.0)
            return None
        if not available_columns:
            return None

        # Encode target once and all candidates in a single batch.
        target_emb = self.model.encode(target_name, convert_to_numpy=True, show_progress_bar=False)
        target_emb = target_emb.reshape(1, -1)
        col_embs = self.model.encode(list(available_columns),
                                     convert_to_numpy=True, show_progress_bar=False)

        sims = cosine_similarity(target_emb, col_embs)[0]
        best_idx = int(np.argmax(sims))
        best_score = float(sims[best_idx])
        if best_score >= threshold:
            return (available_columns[best_idx], best_score)
        return None

    def enrich_dataset_info(self, dataset_info: Dict[str, Any],
                            file_path: str, sample_size: int = 100) -> Dict[str, Any]:
        """
        Enrich dataset_info with semantic column embeddings.

        Adds 'column_embeddings' and 'semantic_insights' to dataset_info.
        Best-effort: any failure is logged and the original dict is returned.

        Args:
            dataset_info: Dataset info from schema_extraction
            file_path: Path to CSV file
            sample_size: Number of rows to sample for encoding

        Returns:
            Enhanced dataset_info with semantic layer
        """
        if not self.enabled:
            return dataset_info

        try:
            # Load a sample of the dataset
            df = pl.read_csv(file_path, n_rows=sample_size)

            column_embeddings = {}
            for col_name, col_info in dataset_info['columns'].items():
                sample_values = df[col_name].head(5).to_list()
                embedding = self.encode_column(
                    column_name=col_name,
                    dtype=col_info['dtype'],
                    sample_values=sample_values,
                    stats={
                        'unique_count': col_info.get('unique_count'),
                        # Must be keyed 'null_percentage': that's the key
                        # encode_column reads (was 'missing_pct', a silent no-op).
                        'null_percentage': col_info.get('missing_pct'),
                        'mean': col_info.get('mean'),
                    },
                )
                column_embeddings[col_name] = embedding

            dataset_info['column_embeddings'] = column_embeddings

            # Detect similar columns (potential duplicates); report each
            # unordered pair once instead of both (A, B) and (B, A).
            similar_pairs = []
            seen_pairs = set()
            for col1 in column_embeddings:
                similar = self.find_similar_columns(col1, column_embeddings, top_k=1, threshold=0.75)
                if similar:
                    pair_key = frozenset((col1, similar[0][0]))
                    if pair_key not in seen_pairs:
                        seen_pairs.add(pair_key)
                        similar_pairs.append((col1, similar[0][0], similar[0][1]))

            dataset_info['semantic_insights'] = {
                'similar_columns': similar_pairs,
                'total_columns_embedded': len(column_embeddings)
            }
            print(f"🧠 Semantic layer: Embedded {len(column_embeddings)} columns")
            if similar_pairs:
                print(f"   Found {len(similar_pairs)} similar column pairs (potential duplicates)")
        except Exception as e:
            # Deliberate best-effort: enrichment must never break the pipeline.
            print(f"⚠️ Error enriching dataset with semantic layer: {e}")

        return dataset_info
# Module-level holder for the lazily constructed singleton.
_semantic_layer = None


def get_semantic_layer() -> SemanticLayer:
    """Return the process-wide SemanticLayer, building it on first use."""
    global _semantic_layer
    layer = _semantic_layer
    if layer is None:
        layer = SemanticLayer()
        _semantic_layer = layer
    return layer