"""Reward helpers for SQLEnv dense shaping. Design: per-step clipping with delta-based progress (PBRS). Each step's reward is computed independently and clipped to [-0.10, 0.15]. No cumulative tracking. The hard budget (15 steps) and step cost naturally limit total exploration reward to ~0.15, making cumulative caps redundant. Progress uses delta-from-previous (potential-based reward shaping, Ng et al. 1999): the agent is rewarded for improvement and penalized for regression. This preserves the optimal policy and provides recovery signal. See docs/design-docs/reward-shaping-research.md for full rationale. """ from __future__ import annotations import hashlib import math try: from sql_env.models import EpisodeContext except ImportError: # pragma: no cover - Docker fallback import path from models import EpisodeContext # type: ignore[no-redef] _EXEC_OK_REWARD = 0.02 _NEW_INFO_REWARD = 0.01 _REPEAT_PENALTY = 0.03 _STEP_COST = 0.02 _LAYER2_CARDINALITY_WEIGHT = 0.25 _LAYER2_VALUE_OVERLAP_WEIGHT = 0.50 _LAYER2_NUMERIC_RANGE_WEIGHT = 0.25 _LAYER2_IMPROVEMENT_SCALE = 0.15 _PER_STEP_FLOOR = -0.10 _PER_STEP_CAP = 0.15 def compute_step_reward( ctx: EpisodeContext, action_type: str, sql: str, rows: list[tuple] | None, error: str | None, ) -> float: """Compute one dense step reward with per-step clipping. Combines Layer 1 operational shaping with Layer 2 delta-based progress for successful QUERY actions. The result is clipped per step to [-0.10, 0.15]. No cumulative tracking. """ step_reward = _layer1_operational(ctx, action_type, sql, rows, error) if action_type.upper() == "QUERY" and rows is not None and error is None: step_reward += _layer2_progress(ctx, rows) return max(_PER_STEP_FLOOR, min(_PER_STEP_CAP, step_reward)) def _layer1_operational( ctx: EpisodeContext, action_type: str, sql: str, rows: list[tuple] | None, error: str | None, ) -> float: """Compute Layer 1 operational reward signals. - ``+0.02`` for successful execution (``error is None``) - ``+0.01`` for first-seen successful QUERY (unique SQL hash) - ``-0.03`` repeat penalty for duplicate QUERY SQL - ``-0.02`` step cost on every call """ reward = -_STEP_COST is_query = action_type.upper() == "QUERY" query_hash: str | None = None is_repeat = False if is_query and sql: query_hash = hashlib.sha256(sql.encode("utf-8")).hexdigest() is_repeat = query_hash in ctx.query_hashes if is_repeat: reward -= _REPEAT_PENALTY elif error is None: reward += _EXEC_OK_REWARD if ( is_query and error is None and rows is not None and query_hash is not None and not is_repeat ): ctx.query_hashes.add(query_hash) reward += _NEW_INFO_REWARD return reward def _cardinality_score(pred_rows: list[tuple], gold_rows: list[tuple]) -> float: """Compute row-count similarity score in [0.0, 1.0].""" pred_count = len(pred_rows) gold_count = len(gold_rows) denominator = max(pred_count, gold_count, 1) score = 1.0 - (abs(pred_count - gold_count) / denominator) return max(0.0, min(1.0, score)) def _value_overlap_score(pred_rows: list[tuple], gold_rows: list[tuple]) -> float: """Compute Jaccard overlap of flattened cell values as strings.""" pred_values = {str(cell) for row in pred_rows for cell in row} gold_values = {str(cell) for row in gold_rows for cell in row} union = pred_values | gold_values if not union: return 0.0 intersection = pred_values & gold_values return len(intersection) / len(union) def _numeric_range_score(pred_rows: list[tuple], gold_rows: list[tuple]) -> float: """Compute log-distance proximity for numeric cell values.""" def _is_numeric(value: object) -> bool: return isinstance(value, (int, float)) and not isinstance(value, bool) pred_numerics = [ float(cell) for row in pred_rows for cell in row if _is_numeric(cell) ] gold_numerics = [ float(cell) for row in gold_rows for cell in row if _is_numeric(cell) ] if not gold_numerics: return 1.0 if not pred_numerics: return 0.0 total = 0.0 for gold_value in gold_numerics: closest_distance = min( abs(pred_value - gold_value) for pred_value in pred_numerics ) total += 1.0 / (1.0 + math.log1p(closest_distance)) return total / len(gold_numerics) def _bin_progress(raw_score: float) -> float: """Bin raw progress to one of {0.0, 0.25, 0.5, 0.75, 1.0}.""" clamped_score = max(0.0, min(1.0, raw_score)) if clamped_score < 0.125: return 0.0 if clamped_score < 0.375: return 0.25 if clamped_score < 0.625: return 0.5 if clamped_score < 0.875: return 0.75 return 1.0 def _layer2_progress(ctx: EpisodeContext, rows: list[tuple]) -> float: """Compute Layer 2 progress reward using delta-from-previous (PBRS). Rewards improvement and penalizes regression relative to the previous step's progress score. This is potential-based reward shaping with gamma=1 (Ng et al. 1999), which provably preserves the optimal policy. """ if not ctx.gold_rows: return 0.0 cardinality = _cardinality_score(rows, ctx.gold_rows) value_overlap = _value_overlap_score(rows, ctx.gold_rows) numeric_range = _numeric_range_score(rows, ctx.gold_rows) raw_progress = ( _LAYER2_CARDINALITY_WEIGHT * cardinality + _LAYER2_VALUE_OVERLAP_WEIGHT * value_overlap + _LAYER2_NUMERIC_RANGE_WEIGHT * numeric_range ) binned_progress = _bin_progress(raw_progress) delta = binned_progress - ctx.previous_progress ctx.previous_progress = binned_progress return delta * _LAYER2_IMPROVEMENT_SCALE