Spaces:
Sleeping
Sleeping
| """PostgreSQL database connector (via SQLAlchemy + psycopg2).""" | |
| from __future__ import annotations | |
| from typing import List, Optional | |
| import pandas as pd | |
| from sqlalchemy import create_engine, inspect, text | |
| from sqlalchemy.engine import Engine | |
| from core.database.base import ConnectionConfig, DatabaseConnector | |
class PostgreSQLConnector(DatabaseConnector):
    """Connector for a PostgreSQL database, built on SQLAlchemy + psycopg2.

    Connection settings are read from ``config.params`` using the keys
    ``user``, ``password``, ``host``, ``port`` and ``dbname``. The SQLAlchemy
    engine is created by :meth:`connect` and released by :meth:`disconnect`;
    every query method requires a prior successful :meth:`connect`.
    """

    def __init__(self, config: ConnectionConfig) -> None:
        """Store *config* via the base class; no connection is opened yet."""
        super().__init__(config)
        # Created by connect(), reset to None by disconnect().
        self._engine: Optional[Engine] = None

    def _build_url(self) -> str:
        """Return the ``postgresql+psycopg2://`` URL for the configured params.

        Missing keys fall back to: empty user, no password, host
        ``localhost``, port ``5432``, empty database name. The ``:password``
        part is omitted when the password is empty/falsy.

        The user name and password are percent-encoded, because SQLAlchemy
        URL-decodes those two components when it parses the URL; unescaped
        characters such as ``@``, ``:`` or ``/`` would otherwise corrupt it.
        Callers must therefore supply the raw (not pre-encoded) credentials.
        """
        # Local import: keeps this block self-contained without touching the
        # module's import section.
        from urllib.parse import quote

        params = self.config.params
        user = quote(str(params.get("user", "")), safe="")
        password = params.get("password", "")
        host = params.get("host", "localhost")
        port = params.get("port", 5432)
        dbname = params.get("dbname", "")

        credentials = user
        if password:
            credentials = f"{user}:{quote(str(password), safe='')}"
        return f"postgresql+psycopg2://{credentials}@{host}:{port}/{dbname}"

    def connect(self) -> None:
        """Create the engine and verify it with a ``SELECT 1`` round trip.

        Any engine left over from an earlier call is disposed first. The new
        engine is stored on the instance only after the probe query succeeds,
        so a failed attempt never leaves a half-initialised connector behind.

        Raises:
            ConnectionError: If the engine cannot be created or the probe
                query fails. The original exception is chained as the cause.
        """
        # Release a previous engine so its connection pool is not leaked.
        self.disconnect()

        engine: Optional[Engine] = None
        try:
            engine = create_engine(self._build_url(), pool_pre_ping=True)
            # Test the connection
            with engine.connect() as conn:
                conn.execute(text("SELECT 1"))
        except Exception as e:
            if engine is not None:
                engine.dispose()
            raise ConnectionError(f"PostgreSQL connection failed: {e}") from e

        self._engine = engine
        self._connected = True

    def disconnect(self) -> None:
        """Dispose the engine, if any, and mark the connector disconnected.

        Safe to call when no connection was ever opened.
        """
        if self._engine:
            self._engine.dispose()
            self._engine = None
        self._connected = False

    def list_tables(self) -> List[str]:
        """Return the table names reported by SQLAlchemy's inspector.

        Raises:
            RuntimeError: If :meth:`connect` has not succeeded.
        """
        self._require_connected()
        inspector = inspect(self._engine)
        return inspector.get_table_names()

    def get_columns(self, table: str) -> List[str]:
        """Return the column names of *table* as reported by the inspector.

        Raises:
            RuntimeError: If :meth:`connect` has not succeeded.
        """
        self._require_connected()
        inspector = inspect(self._engine)
        return [col["name"] for col in inspector.get_columns(table)]

    def get_records(
        self,
        table: str,
        query: Optional[str] = None,
        limit: Optional[int] = None,
    ) -> pd.DataFrame:
        """Fetch rows from *table* into a DataFrame.

        Args:
            table: Table name. It is emitted as one double-quoted identifier
                (embedded double quotes are doubled), so it is matched
                case-sensitively and a dotted name is NOT split into
                schema and table.
            query: Optional raw SQL condition appended after ``WHERE``.
            limit: Optional maximum row count. ``None`` and ``0`` both mean
                "no limit".

        Returns:
            The result set as a ``pandas.DataFrame``.

        Raises:
            RuntimeError: If :meth:`connect` has not succeeded.
            ValueError: If *limit* is negative or not convertible to ``int``.
        """
        self._require_connected()

        # Double any embedded quote so the name cannot terminate the quoted
        # identifier and inject SQL.
        quoted_table = '"' + table.replace('"', '""') + '"'
        sql = f"SELECT * FROM {quoted_table}"

        if query:
            # SECURITY: `query` is spliced in verbatim as a SQL fragment and
            # cannot be parameterised here; callers must never pass untrusted
            # input through it.
            sql += f" WHERE {query}"

        if limit:
            # Coerce to int so only a plain number can reach the SQL text.
            row_cap = int(limit)
            if row_cap < 0:
                raise ValueError(f"limit must be non-negative, got {limit!r}")
            sql += f" LIMIT {row_cap}"

        with self._engine.connect() as conn:  # type: ignore[union-attr]
            return pd.read_sql_query(text(sql), conn)

    def execute_raw(self, sql: str) -> pd.DataFrame:
        """Run arbitrary SQL and return the result set as a DataFrame.

        Intended for read-only statements. NOTE: nothing in this method
        enforces that; the statement is sent to the server exactly as given,
        so it must come from a trusted source.

        Raises:
            RuntimeError: If :meth:`connect` has not succeeded.
        """
        self._require_connected()
        with self._engine.connect() as conn:  # type: ignore[union-attr]
            return pd.read_sql_query(text(sql), conn)

    def _require_connected(self) -> None:
        """Raise ``RuntimeError`` unless a live engine is available."""
        if not self._connected or self._engine is None:
            raise RuntimeError("Not connected. Call connect() first.")