| """Shared helpers used by multiple nodes. |
| |
| Public surface (imported by `generate_sql`, `repair_once`, `plan_query`, |
| `eval.runner`, `tests.test_agent_support`, `scripts.wider_sc_poc`, |
| `tests.agent.nodes.test_schema_link_hints`): |
| |
| - `parse_generate_sql_output` — robust JSON-parsing of LLM output |
| - `render_m_schema` — XiYan-style compact schema rendering |
| - `render_schema_block` — full schema-card block with hint appendices |
| - `render_fewshot_block` — few-shot example rendering |
| |
| Internal helpers are split into two sibling modules (Kimi audit P1.4): |
| |
| - `_text_utils` — JSON-fence stripping, safe-loads, NaN-safe float coerce, |
| best-effort SELECT extraction. Used only by `parse_generate_sql_output`. |
| - `_hints` — M-Schema regexes (`_M_COL_RE`, `_M_FK_RE`), join-hints |
| appendix, schema-link hints (one if-block per landed P3.F rescue), |
| extended-samples appendix. Used by `render_m_schema` and |
| `render_schema_block`. |
| |
| Both sibling modules import nothing from this file — no circular paths. |
| """ |
|
|
| from __future__ import annotations |
|
|
| import re |
|
|
| from nl_sql.agent.nodes._hints import ( |
| _M_COL_RE, |
| _M_FK_RE, |
| _render_extended_samples_appendix, |
| _render_join_hints_appendix, |
| _render_schema_link_hints_appendix, |
| ) |
| from nl_sql.agent.nodes._text_utils import ( |
| _coerce_float, |
| _safe_loads, |
| _strip_code_fence, |
| _strip_to_sql, |
| ) |
| from nl_sql.agent.state import GenerateSQLOutput |
| from nl_sql.schema_index.retriever import ContextBundle |
|
|
|
|
def parse_generate_sql_output(text: str) -> GenerateSQLOutput:
    """Parse the LLM's JSON response into a GenerateSQLOutput.

    Parsing is attempted in two passes: first on the fence-stripped text,
    then — if that yields ``None`` — on the widest ``{...}`` span found in
    the raw text (covers leading/trailing prose around the JSON object).

    If no dict can be recovered, the SQL is taken from a best-effort
    extraction over the raw text (`_strip_to_sql`) and confidence is 0.0.

    Args:
        text: Raw model output. ``None``/empty is treated as ``""``.

    Returns:
        A `GenerateSQLOutput` whose ``raw_text`` is always the stripped
        input. On the JSON path:

        - ``sql`` has surrounding whitespace and every trailing statement
          terminator removed (``"SELECT 1 ;"`` -> ``"SELECT 1"``).
        - ``tables_used`` is a tuple of strings when the payload carries a
          list/tuple, otherwise an empty tuple.
        - ``confidence`` is coerced via `_coerce_float` with default 0.0.
    """
    raw = (text or "").strip()
    parsed = _safe_loads(_strip_code_fence(raw))
    if parsed is None:
        # Greedy on purpose: spans from the first "{" to the last "}" so a
        # JSON object embedded in surrounding prose is retried as a whole.
        match = re.search(r"\{[\s\S]*\}", raw)
        if match:
            parsed = _safe_loads(match.group(0))

    if not isinstance(parsed, dict):
        # JSON unrecoverable (or not an object): salvage whatever SQL the
        # raw text contains and signal zero confidence.
        return GenerateSQLOutput(
            sql=_strip_to_sql(raw),
            rationale="",
            tables_used=(),
            confidence=0.0,
            raw_text=raw,
        )

    # Drop any trailing mix of whitespace and ";" — a plain
    # `.strip().rstrip(";")` leaves "SELECT 1 " for the input "SELECT 1 ;"
    # and "SELECT 1; " for "SELECT 1; ;".
    sql = re.sub(r"[\s;]+\Z", "", str(parsed.get("sql") or "").strip())
    rationale = str(parsed.get("rationale") or "")

    # Accept any list/tuple of table names; every other shape (string,
    # dict, number, missing) is ignored rather than guessed at.
    tables = parsed.get("tables_used") or ()
    tables_used = (
        tuple(str(t) for t in tables) if isinstance(tables, (list, tuple)) else ()
    )

    confidence = _coerce_float(parsed.get("confidence"), default=0.0)
    return GenerateSQLOutput(
        sql=sql,
        rationale=rationale,
        tables_used=tables_used,
        confidence=confidence,
        raw_text=raw,
    )
|
|
|
|
def render_m_schema(context: ContextBundle | None) -> str:
    """Render the bundle as a compact M-Schema block (XiYan-SQL style).

    Each hit's chunk text is scanned line by line. Lines matching
    `_M_COL_RE` become ``table.column (type) [PK] [samples]`` entries
    (the ``[PK]`` and ``[samples]`` parts only when present); lines
    matching `_M_FK_RE` become ``table.(cols) -> ref_table.(ref_cols)``
    entries collected in a trailing foreign-key section. Lines matching
    neither pattern are dropped.

    Dense hits and FK neighbours are merged and ordered by lower-cased
    table name. The extended-samples appendix, when non-empty, is added
    last.

    Returns a placeholder string when `context` is ``None`` or when it
    carries no hits at all.
    """
    if context is None:
        return "(no schema context)"

    hits = sorted(
        [*context.schema_hits, *context.fk_neighbours],
        key=lambda hit: hit.table_name.lower(),
    )
    if not hits:
        return "(no tables matched)"

    columns: list[str] = []
    foreign_keys: list[str] = []
    for hit in hits:
        table_name = hit.table_name
        for line in hit.text.splitlines():
            col_match = _M_COL_RE.match(line)
            if col_match is not None:
                entry = (
                    f"{table_name}.{col_match.group('col').strip()}"
                    f" ({col_match.group('type')})"
                )
                # Flags are whitespace-separated tokens; only PK is surfaced.
                if "PK" in (col_match.group("flags") or "").split():
                    entry += " [PK]"
                sample_text = (col_match.group("samples") or "").strip()
                if sample_text:
                    entry += f" [{sample_text}]"
                columns.append(entry)
                continue

            fk_match = _M_FK_RE.match(line)
            if fk_match is None:
                continue
            local_cols, ref_table, ref_cols = fk_match.groups()
            foreign_keys.append(
                f"{table_name}.({local_cols}) -> {ref_table}.({ref_cols})"
            )

    output: list[str] = (
        ["# Columns", *columns] if columns else ["(no columns parsed)"]
    )
    if foreign_keys:
        # The leading newline yields a blank separator line after joining.
        output += ["\n# Foreign keys", *foreign_keys]

    samples_appendix = _render_extended_samples_appendix(context.extended_samples)
    if samples_appendix:
        output.append(samples_appendix)
    return "\n".join(output)
|
|
|
|
def render_schema_block(
    context: ContextBundle | None,
    *,
    sort_alphabetically: bool = False,
) -> str:
    """Assemble the full schema-card text block for a prompt.

    Default layout: the text of every dense hit in retrieval order, then —
    if there are FK neighbours — a ``# FK-related tables`` header followed
    by the neighbours' text.

    With ``sort_alphabetically=True`` dense hits and FK neighbours are
    merged and emitted ordered by lower-cased table name; the FK header is
    left out because the two groups are no longer separate. The original
    author notes codestral was empirically more accurate with this
    ordering (see docs/SESSION_HANDOFF.md, column-ordering experiment).

    After the table text, up to three appendices are added, each only when
    non-empty: join hints, schema-link hints, extended samples. Sections
    are separated by a blank line.

    Returns a placeholder string when `context` is ``None`` or when there
    is nothing to render.
    """
    if context is None:
        return "(no schema context)"

    combined = [*context.schema_hits, *context.fk_neighbours]
    if sort_alphabetically:
        combined.sort(key=lambda hit: hit.table_name.lower())
        sections: list[str] = [hit.text for hit in combined]
    else:
        sections = [hit.text for hit in context.schema_hits]
        if context.fk_neighbours:
            sections.append("# FK-related tables")
            sections.extend(hit.text for hit in context.fk_neighbours)

    if not sections:
        return "(no tables matched)"

    # Appendix order is fixed: join hints, schema-link hints, samples.
    appendices = (
        _render_join_hints_appendix(combined),
        _render_schema_link_hints_appendix(context, combined),
        _render_extended_samples_appendix(context.extended_samples),
    )
    sections.extend(appendix for appendix in appendices if appendix)
    return "\n\n".join(sections)
|
|
|
|
def render_fewshot_block(context: ContextBundle | None) -> str:
    """Render the bundle's few-shot examples as ``Q:`` / ``SQL:`` pairs.

    Each example contributes a ``Q: <question>`` line and a
    ``SQL: <sql>`` line; consecutive examples are separated by one blank
    line and trailing whitespace is trimmed from the result.

    Returns ``"(none)"`` when `context` is ``None`` or has no few-shots.
    """
    if context is None or not context.fewshots:
        return "(none)"
    entries = [f"Q: {ex.question}\nSQL: {ex.sql}\n" for ex in context.fewshots]
    return "\n".join(entries).rstrip()
|
|