"""
Abstract database connector interface and SchemaMapper.

Every database backend (SQLite, PostgreSQL, CSV) implements DatabaseConnector.
SchemaMapper translates arbitrary column names to the mRNASequence model fields.
"""
from __future__ import annotations

import logging
from abc import ABC, abstractmethod
from dataclasses import dataclass, field
from typing import Any, Dict, List, Optional

import pandas as pd

from core.models.sequence import mRNASequence
# Fields in mRNASequence that can be mapped from a database.
# FieldMapping validates its ``target_field`` against this set.
SEQUENCE_FIELDS = {
    "name",
    "five_prime_utr",
    "kozak",
    "cds",
    "three_prime_utr",
    "poly_a",
    "full_mrna",
}
@dataclass
class ConnectionConfig:
    """Generic connection configuration (fields vary by backend).

    Attributes
    ----------
    backend : str
        Backend identifier: "sqlite", "postgres", "csv", "excel".
    display_name : str
        User-facing label for the connection.
    params : dict
        Backend-specific settings. Defaults to a fresh empty dict per
        instance (``default_factory``), so instances never share state.
    """

    backend: str  # "sqlite", "postgres", "csv", "excel"
    display_name: str  # User-facing label for the connection
    params: Dict[str, Any] = field(default_factory=dict)
    # e.g. sqlite:   {"path": "/data/seqs.db"}
    # e.g. postgres: {"host": "...", "port": 5432, "dbname": "...", "user": "...", "password": "..."}
    # e.g. csv:      {"path": "/data/seqs.csv"}
class DatabaseConnector(ABC):
    """Abstract database connector. One instance per active connection.

    Subclasses must implement ``connect``, ``disconnect``, ``list_tables``,
    ``get_records`` and ``get_columns``. The base class only stores the
    configuration and the ``_connected`` flag; nothing here sets that flag
    to True, so implementations are expected to maintain it themselves.
    """

    def __init__(self, config: ConnectionConfig) -> None:
        """Store *config* and start in the disconnected state."""
        self.config = config
        self._connected = False

    @abstractmethod
    def connect(self) -> None:
        """Open the connection. Raises ConnectionError on failure."""
        ...

    @abstractmethod
    def disconnect(self) -> None:
        """Close the connection."""
        ...

    @abstractmethod
    def list_tables(self) -> List[str]:
        """Return available table / sheet names."""
        ...

    @abstractmethod
    def get_records(
        self,
        table: str,
        query: Optional[str] = None,
        limit: Optional[int] = None,
    ) -> pd.DataFrame:
        """
        Fetch records from a table.

        Parameters
        ----------
        table : str
            Table name (from list_tables).
        query : str, optional
            Backend-specific filter string (SQL WHERE clause for SQL backends,
            pandas query string for file backends).
        limit : int, optional
            Max rows to return.
        """
        ...

    @abstractmethod
    def get_columns(self, table: str) -> List[str]:
        """Return column names for a table."""
        ...

    # NOTE(review): kept as a plain method because nothing in this file reads
    # it as an attribute; confirm against callers whether it was meant to be
    # a property like ``name``.
    def is_connected(self) -> bool:
        """Return the current value of the internal ``_connected`` flag."""
        return self._connected

    @property
    def name(self) -> str:
        """User-facing label, taken from ``config.display_name``."""
        return self.config.display_name

    def __repr__(self) -> str:
        """Return ``ClassName('<display name>', connected|disconnected)``."""
        status = "connected" if self._connected else "disconnected"
        return f"{self.__class__.__name__}({self.name!r}, {status})"
# ── Schema Mapper ────────────────────────────────────────────────────────────
@dataclass
class FieldMapping:
    """
    Describes how one database column maps to a mRNASequence field.

    source_column : str
        Column name in the database.
    target_field : str
        Field name in mRNASequence. Must be in SEQUENCE_FIELDS.
    transform : callable, optional
        Optional transform applied to the raw value before assignment.
        E.g. str.upper, lambda x: x.replace(" ", "")

    Raises
    ------
    ValueError
        On construction, if ``target_field`` is not in SEQUENCE_FIELDS.
    """

    source_column: str
    target_field: str
    transform: Optional[Any] = None  # callable or None

    def __post_init__(self) -> None:
        """Reject mappings whose target is not a known mRNASequence field."""
        if self.target_field not in SEQUENCE_FIELDS:
            raise ValueError(
                f"'{self.target_field}' is not a valid mRNASequence field. "
                f"Valid fields: {sorted(SEQUENCE_FIELDS)}"
            )
class SchemaMapper:
    """
    Maps a DataFrame (from any DatabaseConnector) to a list of mRNASequence
    objects using a user-configured field mapping.

    Example
    -------
    mapper = SchemaMapper(
        [
            FieldMapping("mrna_sequence", "full_mrna"),
            FieldMapping("gene_name", "name"),
            FieldMapping("utr5_sequence", "five_prime_utr", transform=str.upper),
        ],
        db_source="my_lims",
    )
    sequences = mapper.map_dataframe(df)
    """

    def __init__(self, mappings: List[FieldMapping], db_source: str = "") -> None:
        """
        Parameters
        ----------
        mappings : list of FieldMapping
            Column-to-field mappings, applied in list order by ``map_row``.
        db_source : str, optional
            Label passed to every mRNASequence as ``db_source``.

        Raises
        ------
        ValueError
            If no mapping targets the 'name' field.
        """
        self.mappings = mappings
        self.db_source = db_source
        # Validate: at least one mapping must target 'name'.
        if not any(m.target_field == "name" for m in mappings):
            raise ValueError(
                "SchemaMapper requires at least one FieldMapping targeting 'name'."
            )

    @staticmethod
    def _is_missing(value: Any) -> bool:
        """Return True for None and scalar missing markers (NaN, pd.NA, pd.NaT)."""
        if value is None:
            return True
        # pd.isna returns an array for list-likes, so only ask it about scalars.
        return bool(pd.api.types.is_scalar(value) and pd.isna(value))

    def map_row(self, row: Dict[str, Any]) -> mRNASequence:
        """
        Map a single row dict to an mRNASequence.

        Mappings are applied in order; when several mappings target the same
        field, the last one with a non-missing value wins. Missing values
        (None / NaN / NA) are skipped so that the field is simply not passed
        to mRNASequence. A copy of the whole row is passed along as
        ``raw_metadata``.
        """
        kwargs: Dict[str, Any] = {
            "source": "database",
            "db_source": self.db_source,
            "raw_metadata": dict(row),
        }
        for mapping in self.mappings:
            value = row.get(mapping.source_column)
            # Skip missing values (pandas often returns NaN for SQL NULL).
            if self._is_missing(value):
                continue
            if mapping.transform is not None:
                try:
                    value = mapping.transform(value)
                except Exception:
                    # Best-effort: a failing user transform must not abort the
                    # import, so keep the raw value, but leave a trace rather
                    # than swallowing the error silently.
                    logging.getLogger(__name__).warning(
                        "Transform for column %r -> field %r failed; "
                        "keeping raw value.",
                        mapping.source_column,
                        mapping.target_field,
                        exc_info=True,
                    )
            kwargs[mapping.target_field] = value

        # name is required: fall back to the first non-empty string value in
        # the row (trimmed, capped at 80 characters), else "unnamed".
        if not kwargs.get("name"):
            kwargs["name"] = next(
                (
                    v.strip()[:80]
                    for v in row.values()
                    if isinstance(v, str) and v.strip()
                ),
                "unnamed",
            )
        return mRNASequence(**kwargs)  # type: ignore[arg-type]

    def map_dataframe(self, df: pd.DataFrame) -> List[mRNASequence]:
        """Map every row in df to an mRNASequence (one object per row, in order)."""
        # to_dict(orient="records") yields one {column: value} dict per row
        # without building an intermediate Series for each row as iterrows does.
        return [self.map_row(record) for record in df.to_dict(orient="records")]

    @classmethod
    def from_dict(cls, mapping_dict: Dict[str, str], db_source: str = "") -> "SchemaMapper":
        """
        Convenience constructor from a plain {db_column: sequence_field} dict.

        No transforms are attached to the generated mappings.

        Example
        -------
        mapper = SchemaMapper.from_dict({
            "gene_name": "name",
            "mrna_seq": "full_mrna",
            "utr": "five_prime_utr",
        })
        """
        mappings = [
            FieldMapping(source_column=col, target_field=field_)
            for col, field_ in mapping_dict.items()
        ]
        return cls(mappings, db_source=db_source)