# Data-Science-Agent / src/utils/schema_extraction.py
"""
Local Schema Extraction (No LLM)
Fast, cheap extraction of dataset metadata without sending to LLM.
"""
import polars as pl
from pathlib import Path
from typing import Dict, Any, Optional


def extract_schema_local(file_path: str, sample_rows: int = 5) -> Dict[str, Any]:
    """
    Extract dataset schema and basic stats locally without an LLM.

    Returns a dict containing:
        - column names and types
        - row/column counts
        - missing value counts
        - a small row sample for reference
        - file size on disk (MB)
    """
    try:
        # Read with Polars (faster than pandas)
        if file_path.endswith('.csv'):
            # 🔥 FIX: Use infer_schema_length and ignore_errors to handle mixed-type columns.
            # This prevents failures like: could not parse `835.159865` as dtype `i64`
            try:
                df = pl.read_csv(file_path, infer_schema_length=10000, ignore_errors=True)
            except Exception:
                # Final fallback: parse with pandas (more tolerant of mixed types),
                # then convert to Polars
                try:
                    import pandas as pd
                    pdf = pd.read_csv(file_path, low_memory=False)
                    df = pl.from_pandas(pdf)
                except Exception as e2:
                    return {
                        'error': f"Failed to read CSV: {str(e2)}",
                        'file_path': file_path
                    }
        elif file_path.endswith('.parquet'):
            df = pl.read_parquet(file_path)
        else:
            # Unknown extension: fall back to pandas CSV parsing
            import pandas as pd
            pdf = pd.read_csv(file_path, low_memory=False)
            df = pl.from_pandas(pdf)

        # Basic metadata
        schema_info = {
            'file_path': file_path,
            'file_size_mb': round(Path(file_path).stat().st_size / (1024 * 1024), 2),
            'num_rows': df.shape[0],
            'num_columns': df.shape[1],
            'columns': {}
        }

        # Per-column metadata
        for col in df.columns:
            col_series = df[col]
            dtype_str = str(col_series.dtype)
            n_rows = len(col_series)
            col_info = {
                'dtype': dtype_str,
                'missing_count': col_series.null_count(),
                # Guard against division by zero on empty files
                'missing_pct': round(col_series.null_count() / n_rows * 100, 2) if n_rows else 0.0,
                'unique_count': col_series.n_unique() if n_rows < 100000 else None  # Skip for huge datasets
            }

            # Type-specific stats (lightweight)
            if dtype_str in ['Int64', 'Float64', 'Int32', 'Float32']:
                try:
                    col_info['min'] = float(col_series.min())
                    col_info['max'] = float(col_series.max())
                    col_info['mean'] = float(col_series.mean())
                except Exception:
                    pass

            schema_info['columns'][col] = col_info

        # Small sample for LLM context (only first few rows)
        sample_data = df.head(sample_rows).to_dicts()
        schema_info['sample_rows'] = sample_data

        # Categorize columns
        schema_info['numeric_columns'] = [
            col for col, info in schema_info['columns'].items()
            if 'Int' in info['dtype'] or 'Float' in info['dtype']
        ]
        schema_info['categorical_columns'] = [
            col for col, info in schema_info['columns'].items()
            if info['dtype'] in ['Utf8', 'String'] or (
                info.get('unique_count') is not None and
                info.get('unique_count') < 50 and
                col not in schema_info['numeric_columns']
            )
        ]
        schema_info['datetime_columns'] = [
            col for col, info in schema_info['columns'].items()
            if 'Date' in info['dtype'] or 'Time' in info['dtype']
        ]

        return schema_info

    except Exception as e:
        return {
            'error': f"Failed to extract schema: {str(e)}",
            'file_path': file_path
        }
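

# For reference, a successful call to extract_schema_local() returns a dict with
# this shape (illustrative values only; the per-column keys mirror what the loop
# above builds, and min/max/mean appear only for numeric columns):
#
# {
#     'file_path': 'data/train.csv',
#     'file_size_mb': 1.42,
#     'num_rows': 10000,
#     'num_columns': 12,
#     'columns': {
#         'price': {'dtype': 'Float64', 'missing_count': 3, 'missing_pct': 0.03,
#                   'unique_count': 8421, 'min': 9.5, 'max': 1250.0, 'mean': 310.7},
#         ...
#     },
#     'sample_rows': [...],
#     'numeric_columns': [...],
#     'categorical_columns': [...],
#     'datetime_columns': [...],
# }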


def infer_task_type(target_column: str, schema_info: Dict[str, Any]) -> Optional[str]:
    """
    Infer the ML task type from the target column without calling an LLM.
    """
    if not target_column or target_column not in schema_info.get('columns', {}):
        return None

    target_info = schema_info['columns'][target_column]
    unique_count = target_info.get('unique_count')

    # Numeric with many unique values → regression
    if target_info['dtype'] in ['Int64', 'Float64', 'Int32', 'Float32']:
        if unique_count and unique_count > 20:
            return 'regression'
        elif unique_count and unique_count <= 10:
            return 'classification'

    # Categorical or low cardinality → classification
    # (unique_count may be None for very large datasets, so treat it as 0 here)
    if target_info['dtype'] in ['Utf8', 'String'] or (unique_count or 0) <= 20:
        return 'classification'

    return None
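

if __name__ == "__main__":
    # Minimal usage sketch. The CSV path and the 'target' column name below are
    # hypothetical placeholders; substitute your own dataset and target column.
    schema = extract_schema_local("data/train.csv")
    if 'error' in schema:
        print(f"Schema extraction failed: {schema['error']}")
    else:
        print(f"{schema['num_rows']} rows x {schema['num_columns']} columns")
        print("Numeric columns:", schema['numeric_columns'])
        print("Inferred task:", infer_task_type("target", schema))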