# Data-Science-Agent / src/utils/semantic_layer.py
# Author: Pulastya B — commit 4a3a3e8
# Fix: Better SBERT error handling + suppress invalid hand-off warnings
"""
Semantic Layer using SBERT for Column Understanding and Agent Routing
Provides semantic understanding of dataset columns and agent intent matching
using sentence-transformers embeddings.
"""
import numpy as np
from typing import Dict, Any, List, Optional, Tuple
import polars as pl
from pathlib import Path
import json
# SBERT for semantic embeddings
try:
from sentence_transformers import SentenceTransformer
import torch
SBERT_AVAILABLE = True
except ImportError:
SBERT_AVAILABLE = False
print("⚠️ sentence-transformers not available. Install with: pip install sentence-transformers")
# Sklearn for similarity
try:
from sklearn.metrics.pairwise import cosine_similarity
SKLEARN_AVAILABLE = True
except ImportError:
SKLEARN_AVAILABLE = False
class SemanticLayer:
    """
    Semantic understanding layer using SBERT embeddings.

    Features:
    - Column semantic embedding (name + sample values + dtype)
    - Semantic column matching (find similar columns)
    - Agent intent routing (semantic task → agent mapping)
    - Target column inference (semantic similarity to "target")

    If sentence-transformers / scikit-learn are missing, or the model fails
    to load, the layer disables itself (`self.enabled = False`) and every
    method degrades to a cheap fallback instead of raising.
    """

    def __init__(self, model_name: str = "all-MiniLM-L6-v2"):
        """
        Initialize semantic layer with SBERT model.

        Args:
            model_name: Sentence-transformer model name
                - all-MiniLM-L6-v2: Fast, 384 dims (recommended)
                - all-mpnet-base-v2: Better quality, 768 dims, slower
                - paraphrase-MiniLM-L6-v2: Good for short texts
        """
        self.model_name = model_name
        self.model = None
        # Both SBERT (embeddings) and sklearn (cosine similarity) are required.
        self.enabled = SBERT_AVAILABLE and SKLEARN_AVAILABLE
        if self.enabled:
            try:
                print(f"🧠 Loading SBERT model: {model_name}...")
                # Try loading with trust_remote_code for better compatibility
                self.model = SentenceTransformer(model_name, trust_remote_code=True)
                # Use GPU if available
                if torch.cuda.is_available():
                    self.model = self.model.to('cuda')
                    print("✅ SBERT loaded on GPU")
                else:
                    print("✅ SBERT loaded on CPU")
            except Exception as e:
                # Model download/load can fail (offline, bad cache, etc.):
                # degrade gracefully rather than break the whole pipeline.
                print(f"⚠️ Failed to load SBERT model: {e}")
                print(f" Falling back to keyword-based routing (semantic features disabled)")
                self.enabled = False
        else:
            print("⚠️ SBERT semantic layer disabled (missing dependencies)")

    def encode_column(self, column_name: str, dtype: str,
                      sample_values: Optional[List[Any]] = None,
                      stats: Optional[Dict[str, Any]] = None) -> np.ndarray:
        """
        Create semantic embedding for a column.

        Combines column name, data type, sample values, and stats into
        a text description that captures the column's semantic meaning.

        Args:
            column_name: Name of the column
            dtype: Data type (Int64, Float64, Utf8, etc.)
            sample_values: Sample values from the column
            stats: Optional statistics. Recognized keys: 'mean',
                'unique_count', and 'null_percentage' (alias: 'missing_pct',
                as produced by schema extraction).

        Returns:
            Embedding vector (numpy array). When the layer is disabled a
            384-dim zero vector is returned (dim of the default model).

        Example:
            >>> encode_column("annual_salary", "Float64", [50000, 75000], {"mean": 65000})
            >>> # Returns embedding for "annual_salary (Float64 numeric): values like 50000, 75000, mean 65000"
        """
        if not self.enabled:
            return np.zeros(384)  # Dummy embedding
        # Build semantic description
        description_parts = [f"Column name: {column_name}"]
        # Add type information
        type_desc = self._interpret_dtype(dtype)
        description_parts.append(f"Type: {type_desc}")
        # Add sample values
        if sample_values:
            # Format samples nicely (cap length, drop nulls, at most 5)
            samples_str = ", ".join([str(v)[:50] for v in sample_values[:5] if v is not None])
            description_parts.append(f"Example values: {samples_str}")
        # Add statistics
        if stats:
            if stats.get('mean') is not None:
                description_parts.append(f"Mean: {stats['mean']:.2f}")
            if stats.get('unique_count') is not None:
                description_parts.append(f"Unique values: {stats['unique_count']}")
            # BUGFIX: schema extraction supplies 'missing_pct' while older
            # callers used 'null_percentage' — accept either spelling so the
            # missing-value signal is not silently dropped.
            null_pct = stats.get('null_percentage')
            if null_pct is None:
                null_pct = stats.get('missing_pct')
            if null_pct is not None:
                description_parts.append(f"Missing: {null_pct:.1f}%")
        # Combine into single text
        text = ". ".join(description_parts)
        # Generate embedding
        try:
            embedding = self.model.encode(text, convert_to_numpy=True, show_progress_bar=False)
            return embedding
        except Exception as e:
            # Keep the pipeline alive: a zero vector simply won't match anything.
            print(f"⚠️ Error encoding column {column_name}: {e}")
            return np.zeros(self.model.get_sentence_embedding_dimension())

    def _interpret_dtype(self, dtype: str) -> str:
        """Convert polars dtype to human-readable description."""
        dtype_lower = str(dtype).lower()
        # Order matters: check numeric before the substring-based text checks.
        if 'int' in dtype_lower or 'float' in dtype_lower:
            return "numeric continuous or count data"
        elif 'bool' in dtype_lower:
            return "boolean flag"
        elif 'utf8' in dtype_lower or 'str' in dtype_lower:
            return "text or categorical label"
        elif 'date' in dtype_lower or 'time' in dtype_lower:
            return "temporal timestamp"
        else:
            return "data values"

    def find_similar_columns(self, query_column: str, column_embeddings: Dict[str, np.ndarray],
                             top_k: int = 3, threshold: float = 0.6) -> List[Tuple[str, float]]:
        """
        Find columns semantically similar to query column.

        Use case: Detect duplicates or related columns
        Example: "Salary" → finds ["Annual_Income", "Compensation", "Pay"]

        Args:
            query_column: Column name to search for
            column_embeddings: Dict mapping column names to their embeddings
            top_k: Number of similar columns to return
            threshold: Minimum similarity score (0-1)

        Returns:
            List of (column_name, similarity_score) tuples, best first.
            Empty when disabled or the query column has no embedding.
        """
        if not self.enabled or query_column not in column_embeddings:
            return []
        query_emb = column_embeddings[query_column].reshape(1, -1)
        similarities = []
        for col_name, col_emb in column_embeddings.items():
            if col_name == query_column:
                continue
            sim = cosine_similarity(query_emb, col_emb.reshape(1, -1))[0][0]
            if sim >= threshold:
                similarities.append((col_name, float(sim)))
        # Sort by similarity descending
        similarities.sort(key=lambda x: x[1], reverse=True)
        return similarities[:top_k]

    def infer_target_column(self, column_embeddings: Dict[str, np.ndarray],
                            task_description: str) -> Optional[Tuple[str, float]]:
        """
        Infer which column is likely the target/label for prediction.

        Uses semantic similarity between column descriptions and task description.

        Args:
            column_embeddings: Dict mapping column names to embeddings
            task_description: User's task description

        Returns:
            (column_name, confidence_score) or None when disabled or no
            column clears the confidence threshold.

        Example:
            >>> infer_target_column(embeddings, "predict house prices")
            >>> ("Price", 0.85)  # High confidence "Price" is target
        """
        if not self.enabled:
            return None
        # Encode task description
        task_emb = self.model.encode(task_description, convert_to_numpy=True, show_progress_bar=False)
        task_emb = task_emb.reshape(1, -1)
        # Find column with highest similarity to task
        best_col = None
        best_score = 0.0
        for col_name, col_emb in column_embeddings.items():
            sim = cosine_similarity(task_emb, col_emb.reshape(1, -1))[0][0]
            if sim > best_score:
                best_score = sim
                best_col = col_name
        # Only return if confidence is reasonable
        if best_score >= 0.4:  # Threshold for target inference
            return (best_col, float(best_score))
        return None

    def route_to_agent(self, task_description: str,
                       agent_descriptions: Dict[str, str]) -> Tuple[str, float]:
        """
        Route task to appropriate specialist agent using semantic similarity.

        Replaces keyword-based routing with semantic understanding.

        Args:
            task_description: User's task description
            agent_descriptions: Dict mapping agent_key → agent description.
                Must be non-empty.

        Returns:
            (agent_key, confidence_score)

        Raises:
            ValueError: If agent_descriptions is empty (previously this
                surfaced as a confusing IndexError in the disabled path).

        Example:
            >>> route_to_agent("build a predictive model", {
            ...     "modeling_agent": "Expert in ML training and models",
            ...     "viz_agent": "Expert in visualizations"
            ... })
            >>> ("modeling_agent", 0.92)
        """
        if not agent_descriptions:
            raise ValueError("agent_descriptions must not be empty")
        if not self.enabled:
            # Fallback to first agent
            return list(agent_descriptions.keys())[0], 0.5
        # Encode task
        task_emb = self.model.encode(task_description, convert_to_numpy=True, show_progress_bar=False)
        task_emb = task_emb.reshape(1, -1)
        # Encode agent descriptions
        best_agent = None
        best_score = 0.0
        for agent_key, agent_desc in agent_descriptions.items():
            agent_emb = self.model.encode(agent_desc, convert_to_numpy=True, show_progress_bar=False)
            agent_emb = agent_emb.reshape(1, -1)
            sim = cosine_similarity(task_emb, agent_emb)[0][0]
            if sim > best_score:
                best_score = sim
                best_agent = agent_key
        return best_agent, float(best_score)

    def semantic_column_match(self, target_name: str, available_columns: List[str],
                              threshold: float = 0.6) -> Optional[Tuple[str, float]]:
        """
        Find best matching column for a target name using fuzzy semantic matching.

        Better than string fuzzy matching because it understands synonyms:
        - "salary" matches "annual_income", "compensation", "pay"
        - "target" matches "label", "class", "outcome"

        Args:
            target_name: Column name to find (might not exist exactly)
            available_columns: List of actual column names in dataset
            threshold: Minimum similarity to consider a match

        Returns:
            (matched_column, confidence) or None

        Example:
            >>> semantic_column_match("salary", ["Annual_Income", "Name", "Age"])
            >>> ("Annual_Income", 0.78)
        """
        if not self.enabled:
            # Fallback to exact match
            if target_name in available_columns:
                return (target_name, 1.0)
            return None
        if not available_columns:
            return None
        # Encode target and all candidates in one batched call (one model
        # invocation instead of len(available_columns) separate ones).
        target_emb = self.model.encode(target_name, convert_to_numpy=True, show_progress_bar=False)
        target_emb = target_emb.reshape(1, -1)
        col_embs = self.model.encode(available_columns, convert_to_numpy=True, show_progress_bar=False)
        sims = cosine_similarity(target_emb, col_embs)[0]
        # argmax keeps the first of tied candidates, matching the old loop.
        best_idx = int(np.argmax(sims))
        best_score = float(sims[best_idx])
        if best_score >= threshold:
            return (available_columns[best_idx], best_score)
        return None

    def enrich_dataset_info(self, dataset_info: Dict[str, Any],
                            file_path: str, sample_size: int = 100) -> Dict[str, Any]:
        """
        Enrich dataset_info with semantic column embeddings.

        Adds 'column_embeddings' and 'semantic_insights' to dataset_info.

        Args:
            dataset_info: Dataset info from schema_extraction
            file_path: Path to CSV file
            sample_size: Number of rows to sample for encoding

        Returns:
            Enhanced dataset_info with semantic layer (returned unchanged
            when disabled or on any error — best-effort enrichment).
        """
        if not self.enabled:
            return dataset_info
        try:
            # Load a small sample; embeddings only need representative values.
            df = pl.read_csv(file_path, n_rows=sample_size)
            column_embeddings = {}
            for col_name, col_info in dataset_info['columns'].items():
                # Get sample values
                sample_values = df[col_name].head(5).to_list()
                # Create embedding.
                # BUGFIX: pass the missing percentage under the key
                # encode_column actually reads ('null_percentage'); the old
                # 'missing_pct' key was silently ignored.
                embedding = self.encode_column(
                    column_name=col_name,
                    dtype=col_info['dtype'],
                    sample_values=sample_values,
                    stats={
                        'unique_count': col_info.get('unique_count'),
                        'null_percentage': col_info.get('missing_pct'),
                        'mean': col_info.get('mean')
                    }
                )
                column_embeddings[col_name] = embedding
            # Add to dataset_info
            dataset_info['column_embeddings'] = column_embeddings
            # Detect similar columns (potential duplicates).
            # BUGFIX: similarity is symmetric, so without dedup every pair
            # was reported twice — once as (A, B) and once as (B, A).
            similar_pairs = []
            reported = set()
            for col1 in column_embeddings:
                similar = self.find_similar_columns(col1, column_embeddings, top_k=1, threshold=0.75)
                if similar:
                    other, score = similar[0]
                    pair_key = frozenset((col1, other))
                    if pair_key not in reported:
                        reported.add(pair_key)
                        similar_pairs.append((col1, other, score))
            dataset_info['semantic_insights'] = {
                'similar_columns': similar_pairs,
                'total_columns_embedded': len(column_embeddings)
            }
            print(f"🧠 Semantic layer: Embedded {len(column_embeddings)} columns")
            if similar_pairs:
                print(f" Found {len(similar_pairs)} similar column pairs (potential duplicates)")
        except Exception as e:
            # Best-effort: enrichment failure must not break the caller.
            print(f"⚠️ Error enriching dataset with semantic layer: {e}")
        return dataset_info
# Process-wide singleton, created lazily so importing this module stays cheap
# (no SBERT model load at import time).
_semantic_layer = None


def get_semantic_layer() -> SemanticLayer:
    """Return the shared SemanticLayer, constructing it on first call."""
    global _semantic_layer
    if _semantic_layer is not None:
        return _semantic_layer
    _semantic_layer = SemanticLayer()
    return _semantic_layer