"""
Parquet module.

TODO: handle migrations
TODO: make it work with chunked exports.
TODO: make it work with chunked imports.

Mostly auto-generated by Cursor + GPT-5.
"""
| |
|
import os

import pandas as pd
from sqlalchemy import inspect, text
from sqlalchemy.engine import Engine
from sqlmodel import Session
| |
|
| |
|
def export_to_parquet(engine: Engine, backup_dir: str) -> None:
    """
    Export each table in the database to a separate Parquet file.

    Loads each table fully into memory and sorts rows by all columns so
    repeated exports of the same data produce identical files.

    Args:
        engine: SQLAlchemy engine connected to the source database.
        backup_dir: Directory receiving one ``<table>.parquet`` file per
            table; created if it does not exist.

    TODO: make it work with chunked exports.
    TODO: handle migrations
    """
    os.makedirs(backup_dir, exist_ok=True)
    inspector = inspect(engine)
    table_names = inspector.get_table_names()

    # One connection for the whole export instead of reconnecting per table.
    with engine.connect() as conn:
        for table_name in table_names:
            file_path = os.path.join(backup_dir, f"{table_name}.parquet")

            # Table names come from the inspector (not user input), so the
            # f-string interpolation here is not an injection vector.
            df = pd.read_sql_query(text(f"SELECT * FROM {table_name}"), conn)

            # Sort by every column for a deterministic row order; skip the
            # sort for a table with no columns (sort_values would fail).
            sort_cols = list(df.columns)
            if sort_cols:
                df = df.sort_values(by=sort_cols).reset_index(drop=True)

            df.to_parquet(file_path, index=False)
            print(f"Exported {table_name} to {file_path}")
| |
|
| |
|
def import_from_parquet(engine: Engine, backup_dir: str) -> None:
    """
    Import each Parquet file in ``backup_dir`` into the database.

    For every table reported by the inspector, looks for
    ``<table>.parquet``; missing files are skipped with a message. Each
    imported table is wiped (``DELETE``) and re-filled inside a single
    transaction, inserting rows in chunks of 10,000.

    Loads entire files into memory.

    Raises:
        ValueError: if a backup file's columns do not match the table's
            columns (strict name check; column *types* are not verified).

    TODO: make it work with chunked imports.
    TODO: handle migrations
    """
    inspector = inspect(engine)
    table_names = inspector.get_table_names()

    for table_name in table_names:
        file_path = os.path.join(backup_dir, f"{table_name}.parquet")
        if not os.path.exists(file_path):
            print(f"No backup found for table {table_name}, skipping.")
            continue

        df = pd.read_parquet(file_path)

        # Strict column-name check so we fail loudly instead of issuing a
        # broken INSERT when the backup and the live schema have drifted.
        table_cols = {c["name"] for c in inspector.get_columns(table_name)}
        if set(df.columns) != table_cols:
            raise ValueError(
                f"Schema mismatch for table {table_name}: "
                f"parquet columns {sorted(df.columns)} != "
                f"table columns {sorted(table_cols)}"
            )

        # Single transaction: old rows are deleted and new ones inserted
        # atomically. (A previous version also ran an extra DELETE in a
        # throwaway Session that was never committed — redundant; removed.)
        with engine.begin() as conn:
            conn.execute(text(f"DELETE FROM {table_name}"))
            if not df.empty:
                columns = df.columns.tolist()
                chunk_size = 10000
                for start in range(0, len(df), chunk_size):
                    chunk = df.iloc[start : start + chunk_size]
                    rows = chunk.to_dict(orient="records")
                    # Multi-row VALUES with per-row numbered bind params,
                    # e.g. (:col_0, ...), (:col_1, ...).
                    placeholders = ", ".join(
                        "(" + ", ".join(f":{col}_{i}" for col in columns) + ")"
                        for i in range(len(rows))
                    )
                    insert_stmt = text(
                        f"INSERT INTO {table_name} "
                        f"({', '.join(columns)}) VALUES " + placeholders
                    )
                    # NOTE(review): NaN values are bound as-is; some DB
                    # drivers may reject them — confirm NULL handling if
                    # nullable float columns are expected.
                    params = {
                        f"{col}_{i}": row[col]
                        for i, row in enumerate(rows)
                        for col in columns
                    }
                    conn.execute(insert_stmt, params)

        print(f"Imported {table_name} from {file_path}")
| |
|