"""
Semantic Layer using SBERT for Column Understanding and Agent Routing
Provides semantic understanding of dataset columns and agent intent matching
using sentence-transformers embeddings.
"""
import numpy as np
from typing import Dict, Any, List, Optional, Tuple
import polars as pl
from pathlib import Path
import json
# SBERT for semantic embeddings.
# Optional dependency: record availability instead of failing at import time
# so the rest of the module can degrade to keyword-based fallbacks.
try:
    from sentence_transformers import SentenceTransformer
    import torch
    SBERT_AVAILABLE = True
except ImportError:
    SBERT_AVAILABLE = False
    print("⚠️ sentence-transformers not available. Install with: pip install sentence-transformers")
# Sklearn for similarity (cosine_similarity); also optional.
try:
    from sklearn.metrics.pairwise import cosine_similarity
    SKLEARN_AVAILABLE = True
except ImportError:
    SKLEARN_AVAILABLE = False
class SemanticLayer:
    """
    Semantic understanding layer using SBERT embeddings.

    Features:
    - Column semantic embedding (name + sample values + dtype)
    - Semantic column matching (find similar columns)
    - Agent intent routing (semantic task -> agent mapping)
    - Target column inference (semantic similarity to the task description)

    If the optional dependencies (sentence-transformers, sklearn) are missing
    or the model fails to load, the layer remains constructible but
    ``self.enabled`` is False and every method degrades to a cheap fallback.
    """

    # Embedding width of the default all-MiniLM-L6-v2 model; used only for
    # the zero-vector fallback returned when the layer is disabled.
    _FALLBACK_DIM = 384

    def __init__(self, model_name: str = "all-MiniLM-L6-v2"):
        """
        Initialize semantic layer with SBERT model.

        Args:
            model_name: Sentence-transformer model name
                - all-MiniLM-L6-v2: Fast, 384 dims (recommended)
                - all-mpnet-base-v2: Better quality, 768 dims, slower
                - paraphrase-MiniLM-L6-v2: Good for short texts
        """
        self.model_name = model_name
        self.model = None
        # Semantic features require both sentence-transformers and sklearn.
        self.enabled = SBERT_AVAILABLE and SKLEARN_AVAILABLE
        if self.enabled:
            try:
                print(f"🧠 Loading SBERT model: {model_name}...")
                # trust_remote_code improves compatibility with some hub models
                self.model = SentenceTransformer(model_name, trust_remote_code=True)
                # Use GPU if available
                if torch.cuda.is_available():
                    self.model = self.model.to('cuda')
                    print("✅ SBERT loaded on GPU")
                else:
                    print("✅ SBERT loaded on CPU")
            except Exception as e:
                # Model download/load can fail (offline, bad cache, etc.);
                # keep the object usable with semantic features off.
                print(f"⚠️ Failed to load SBERT model: {e}")
                print("   Falling back to keyword-based routing (semantic features disabled)")
                self.enabled = False
        else:
            print("⚠️ SBERT semantic layer disabled (missing dependencies)")

    def encode_column(self, column_name: str, dtype: str,
                      sample_values: Optional[List[Any]] = None,
                      stats: Optional[Dict[str, Any]] = None) -> np.ndarray:
        """
        Create semantic embedding for a column.

        Combines column name, data type, sample values, and stats into a text
        description that captures the column's semantic meaning, then encodes it.

        Args:
            column_name: Name of the column
            dtype: Data type (Int64, Float64, Utf8, etc.)
            sample_values: Sample values from the column
            stats: Optional statistics; recognized keys are 'mean',
                'unique_count', and 'null_percentage'

        Returns:
            Embedding vector (numpy array); a zero vector when disabled or on error.

        Example:
            >>> encode_column("annual_salary", "Float64", [50000, 75000], {"mean": 65000})
            >>> # Embedding for "Column name: annual_salary. Type: ... Mean: 65000.00"
        """
        if not self.enabled:
            return np.zeros(self._FALLBACK_DIM)  # Dummy embedding
        # Build semantic description
        description_parts = [f"Column name: {column_name}"]
        description_parts.append(f"Type: {self._interpret_dtype(dtype)}")
        # Add a few sample values (truncated so huge text cells stay short)
        if sample_values:
            samples_str = ", ".join(str(v)[:50] for v in sample_values[:5] if v is not None)
            description_parts.append(f"Example values: {samples_str}")
        # Add statistics when present and non-null
        if stats:
            if stats.get('mean') is not None:
                description_parts.append(f"Mean: {stats['mean']:.2f}")
            if stats.get('unique_count') is not None:
                description_parts.append(f"Unique values: {stats['unique_count']}")
            if stats.get('null_percentage') is not None:
                description_parts.append(f"Missing: {stats['null_percentage']:.1f}%")
        text = ". ".join(description_parts)
        try:
            return self.model.encode(text, convert_to_numpy=True, show_progress_bar=False)
        except Exception as e:
            print(f"⚠️ Error encoding column {column_name}: {e}")
            # Zero vector of the real model dimension keeps downstream math valid
            return np.zeros(self.model.get_sentence_embedding_dimension())

    def _interpret_dtype(self, dtype: str) -> str:
        """Convert a polars dtype name to a human-readable description."""
        dtype_lower = str(dtype).lower()
        if 'int' in dtype_lower or 'float' in dtype_lower:
            return "numeric continuous or count data"
        elif 'bool' in dtype_lower:
            return "boolean flag"
        elif 'utf8' in dtype_lower or 'str' in dtype_lower:
            return "text or categorical label"
        elif 'date' in dtype_lower or 'time' in dtype_lower:
            return "temporal timestamp"
        else:
            return "data values"

    def find_similar_columns(self, query_column: str, column_embeddings: Dict[str, np.ndarray],
                             top_k: int = 3, threshold: float = 0.6) -> List[Tuple[str, float]]:
        """
        Find columns semantically similar to the query column.

        Use case: detect duplicates or related columns, e.g.
        "Salary" -> ["Annual_Income", "Compensation", "Pay"].

        Args:
            query_column: Column name to search for
            column_embeddings: Dict mapping column names to their embeddings
            top_k: Number of similar columns to return
            threshold: Minimum similarity score (0-1)

        Returns:
            List of (column_name, similarity_score) tuples, best first.
        """
        if not self.enabled or query_column not in column_embeddings:
            return []
        query_emb = column_embeddings[query_column].reshape(1, -1)
        similarities = []
        for col_name, col_emb in column_embeddings.items():
            if col_name == query_column:
                continue
            sim = cosine_similarity(query_emb, col_emb.reshape(1, -1))[0][0]
            if sim >= threshold:
                similarities.append((col_name, float(sim)))
        # Sort by similarity descending
        similarities.sort(key=lambda x: x[1], reverse=True)
        return similarities[:top_k]

    def infer_target_column(self, column_embeddings: Dict[str, np.ndarray],
                            task_description: str,
                            threshold: float = 0.4) -> Optional[Tuple[str, float]]:
        """
        Infer which column is likely the target/label for prediction.

        Uses semantic similarity between column embeddings and the task description.

        Args:
            column_embeddings: Dict mapping column names to embeddings
            task_description: User's task description
            threshold: Minimum confidence required to report a target
                (default 0.4, same as the previous hard-coded value)

        Returns:
            (column_name, confidence_score) or None

        Example:
            >>> infer_target_column(embeddings, "predict house prices")
            >>> ("Price", 0.85)  # High confidence "Price" is target
        """
        if not self.enabled:
            return None
        # Encode task description
        task_emb = self.model.encode(task_description, convert_to_numpy=True, show_progress_bar=False)
        task_emb = task_emb.reshape(1, -1)
        # Find column with highest similarity to the task
        best_col = None
        best_score = 0.0
        for col_name, col_emb in column_embeddings.items():
            sim = cosine_similarity(task_emb, col_emb.reshape(1, -1))[0][0]
            if sim > best_score:
                best_score = sim
                best_col = col_name
        # Only return if confidence is reasonable
        if best_score >= threshold:
            return (best_col, float(best_score))
        return None

    def route_to_agent(self, task_description: str,
                       agent_descriptions: Dict[str, str]) -> Tuple[str, float]:
        """
        Route a task to the appropriate specialist agent using semantic similarity.

        Replaces keyword-based routing with semantic understanding.

        Args:
            task_description: User's task description
            agent_descriptions: Dict mapping agent_key -> agent description
                (must be non-empty)

        Returns:
            (agent_key, confidence_score)

        Raises:
            ValueError: If agent_descriptions is empty.

        Example:
            >>> route_to_agent("build a predictive model", {
            ...     "modeling_agent": "Expert in ML training and models",
            ...     "viz_agent": "Expert in visualizations"
            ... })
            >>> ("modeling_agent", 0.92)
        """
        if not agent_descriptions:
            # Previously this crashed with IndexError (disabled) or returned
            # (None, 0.0) (enabled); fail loudly instead.
            raise ValueError("agent_descriptions must not be empty")
        if not self.enabled:
            # Fallback: first registered agent with neutral confidence
            return next(iter(agent_descriptions)), 0.5
        task_emb = self.model.encode(task_description, convert_to_numpy=True,
                                     show_progress_bar=False).reshape(1, -1)
        # Batch-encode all agent descriptions in a single model call
        agent_keys = list(agent_descriptions.keys())
        agent_embs = self.model.encode(list(agent_descriptions.values()),
                                       convert_to_numpy=True, show_progress_bar=False)
        sims = cosine_similarity(task_emb, agent_embs)[0]
        best_idx = int(np.argmax(sims))  # first key wins on ties, as before
        return agent_keys[best_idx], float(sims[best_idx])

    def semantic_column_match(self, target_name: str, available_columns: List[str],
                              threshold: float = 0.6) -> Optional[Tuple[str, float]]:
        """
        Find the best matching column for a target name via semantic matching.

        Better than string fuzzy matching because it understands synonyms:
        - "salary" matches "annual_income", "compensation", "pay"
        - "target" matches "label", "class", "outcome"

        Args:
            target_name: Column name to find (might not exist exactly)
            available_columns: List of actual column names in the dataset
            threshold: Minimum similarity to consider a match

        Returns:
            (matched_column, confidence) or None

        Example:
            >>> semantic_column_match("salary", ["Annual_Income", "Name", "Age"])
            >>> ("Annual_Income", 0.78)
        """
        if not self.enabled:
            # Fallback to exact match
            if target_name in available_columns:
                return (target_name, 1.0)
            return None
        if not available_columns:
            return None
        target_emb = self.model.encode(target_name, convert_to_numpy=True,
                                       show_progress_bar=False).reshape(1, -1)
        # Batch-encode candidate names in one model call instead of per-column
        col_embs = self.model.encode(list(available_columns),
                                     convert_to_numpy=True, show_progress_bar=False)
        sims = cosine_similarity(target_emb, col_embs)[0]
        best_idx = int(np.argmax(sims))
        if sims[best_idx] >= threshold:
            return (available_columns[best_idx], float(sims[best_idx]))
        return None

    def enrich_dataset_info(self, dataset_info: Dict[str, Any],
                            file_path: str, sample_size: int = 100) -> Dict[str, Any]:
        """
        Enrich dataset_info with semantic column embeddings.

        Adds 'column_embeddings' and 'semantic_insights' to dataset_info
        (mutates and returns the same dict). Best-effort: any failure leaves
        dataset_info unchanged and logs a warning.

        Args:
            dataset_info: Dataset info from schema_extraction
            file_path: Path to CSV file
            sample_size: Number of rows to sample for encoding

        Returns:
            Enhanced dataset_info with semantic layer
        """
        if not self.enabled:
            return dataset_info
        try:
            # Load a small sample of the dataset
            df = pl.read_csv(file_path, n_rows=sample_size)
            column_embeddings = {}
            for col_name, col_info in dataset_info['columns'].items():
                sample_values = df[col_name].head(5).to_list()
                embedding = self.encode_column(
                    column_name=col_name,
                    dtype=col_info['dtype'],
                    sample_values=sample_values,
                    stats={
                        'unique_count': col_info.get('unique_count'),
                        # BUG FIX: schema_extraction stores the ratio as
                        # 'missing_pct' but encode_column reads 'null_percentage';
                        # map it so missingness actually reaches the embedding.
                        'null_percentage': col_info.get('missing_pct'),
                        'mean': col_info.get('mean')
                    }
                )
                column_embeddings[col_name] = embedding
            dataset_info['column_embeddings'] = column_embeddings
            # Detect similar columns (potential duplicates), each pair once
            similar_pairs = []
            reported = set()
            for col1 in column_embeddings:
                similar = self.find_similar_columns(col1, column_embeddings,
                                                    top_k=1, threshold=0.75)
                if similar:
                    pair_key = frozenset((col1, similar[0][0]))
                    if pair_key not in reported:  # skip the (B, A) mirror of (A, B)
                        reported.add(pair_key)
                        similar_pairs.append((col1, similar[0][0], similar[0][1]))
            dataset_info['semantic_insights'] = {
                'similar_columns': similar_pairs,
                'total_columns_embedded': len(column_embeddings)
            }
            print(f"🧠 Semantic layer: Embedded {len(column_embeddings)} columns")
            if similar_pairs:
                print(f"   Found {len(similar_pairs)} similar column pairs (potential duplicates)")
        except Exception as e:
            # Best-effort enrichment: never let semantic extras break the pipeline
            print(f"⚠️ Error enriching dataset with semantic layer: {e}")
        return dataset_info
# Global semantic layer instance (lazy loaded); access via get_semantic_layer()
# so the SBERT model is only loaded on first use.
_semantic_layer: Optional["SemanticLayer"] = None
def get_semantic_layer() -> SemanticLayer:
    """Return the process-wide SemanticLayer, constructing it on first use."""
    global _semantic_layer
    if _semantic_layer is not None:
        return _semantic_layer
    _semantic_layer = SemanticLayer()
    return _semantic_layer