File size: 5,911 Bytes
9e64e71 5dd1bb4 9e64e71 5dd1bb4 9e64e71 5dd1bb4 9e64e71 5dd1bb4 9e64e71 5dd1bb4 9e64e71 5dd1bb4 9e64e71 5dd1bb4 9e64e71 5dd1bb4 9e64e71 5dd1bb4 9e64e71 5dd1bb4 9e64e71 5dd1bb4 9e64e71 | 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 | """Reward helpers for SQLEnv dense shaping.
Design: per-step clipping with delta-based progress (PBRS).
Each step's reward is computed independently and clipped to [-0.10, 0.15].
No cumulative tracking. The hard budget (15 steps) and step cost naturally
limit total exploration reward to ~0.15, making cumulative caps redundant.
Progress uses delta-from-previous (potential-based reward shaping, Ng et al.
1999): the agent is rewarded for improvement and penalized for regression.
This preserves the optimal policy and provides recovery signal.
See docs/design-docs/reward-shaping-research.md for full rationale.
"""
from __future__ import annotations
import hashlib
import math
try:
from sql_env.models import EpisodeContext
except ImportError: # pragma: no cover - Docker fallback import path
from models import EpisodeContext # type: ignore[no-redef]
_EXEC_OK_REWARD = 0.02
_NEW_INFO_REWARD = 0.01
_REPEAT_PENALTY = 0.03
_STEP_COST = 0.02
_LAYER2_CARDINALITY_WEIGHT = 0.25
_LAYER2_VALUE_OVERLAP_WEIGHT = 0.50
_LAYER2_NUMERIC_RANGE_WEIGHT = 0.25
_LAYER2_IMPROVEMENT_SCALE = 0.15
_PER_STEP_FLOOR = -0.10
_PER_STEP_CAP = 0.15
def compute_step_reward(
ctx: EpisodeContext,
action_type: str,
sql: str,
rows: list[tuple] | None,
error: str | None,
) -> float:
"""Compute one dense step reward with per-step clipping.
Combines Layer 1 operational shaping with Layer 2 delta-based progress
for successful QUERY actions. The result is clipped per step to
[-0.10, 0.15]. No cumulative tracking.
"""
step_reward = _layer1_operational(ctx, action_type, sql, rows, error)
if action_type.upper() == "QUERY" and rows is not None and error is None:
step_reward += _layer2_progress(ctx, rows)
return max(_PER_STEP_FLOOR, min(_PER_STEP_CAP, step_reward))
def _layer1_operational(
ctx: EpisodeContext,
action_type: str,
sql: str,
rows: list[tuple] | None,
error: str | None,
) -> float:
"""Compute Layer 1 operational reward signals.
- ``+0.02`` for successful execution (``error is None``)
- ``+0.01`` for first-seen successful QUERY (unique SQL hash)
- ``-0.03`` repeat penalty for duplicate QUERY SQL
- ``-0.02`` step cost on every call
"""
reward = -_STEP_COST
is_query = action_type.upper() == "QUERY"
query_hash: str | None = None
is_repeat = False
if is_query and sql:
query_hash = hashlib.sha256(sql.encode("utf-8")).hexdigest()
is_repeat = query_hash in ctx.query_hashes
if is_repeat:
reward -= _REPEAT_PENALTY
elif error is None:
reward += _EXEC_OK_REWARD
if (
is_query
and error is None
and rows is not None
and query_hash is not None
and not is_repeat
):
ctx.query_hashes.add(query_hash)
reward += _NEW_INFO_REWARD
return reward
def _cardinality_score(pred_rows: list[tuple], gold_rows: list[tuple]) -> float:
"""Compute row-count similarity score in [0.0, 1.0]."""
pred_count = len(pred_rows)
gold_count = len(gold_rows)
denominator = max(pred_count, gold_count, 1)
score = 1.0 - (abs(pred_count - gold_count) / denominator)
return max(0.0, min(1.0, score))
def _value_overlap_score(pred_rows: list[tuple], gold_rows: list[tuple]) -> float:
"""Compute Jaccard overlap of flattened cell values as strings."""
pred_values = {str(cell) for row in pred_rows for cell in row}
gold_values = {str(cell) for row in gold_rows for cell in row}
union = pred_values | gold_values
if not union:
return 0.0
intersection = pred_values & gold_values
return len(intersection) / len(union)
def _numeric_range_score(pred_rows: list[tuple], gold_rows: list[tuple]) -> float:
"""Compute log-distance proximity for numeric cell values."""
def _is_numeric(value: object) -> bool:
return isinstance(value, (int, float)) and not isinstance(value, bool)
pred_numerics = [
float(cell) for row in pred_rows for cell in row if _is_numeric(cell)
]
gold_numerics = [
float(cell) for row in gold_rows for cell in row if _is_numeric(cell)
]
if not gold_numerics:
return 1.0
if not pred_numerics:
return 0.0
total = 0.0
for gold_value in gold_numerics:
closest_distance = min(
abs(pred_value - gold_value) for pred_value in pred_numerics
)
total += 1.0 / (1.0 + math.log1p(closest_distance))
return total / len(gold_numerics)
def _bin_progress(raw_score: float) -> float:
"""Bin raw progress to one of {0.0, 0.25, 0.5, 0.75, 1.0}."""
clamped_score = max(0.0, min(1.0, raw_score))
if clamped_score < 0.125:
return 0.0
if clamped_score < 0.375:
return 0.25
if clamped_score < 0.625:
return 0.5
if clamped_score < 0.875:
return 0.75
return 1.0
def _layer2_progress(ctx: EpisodeContext, rows: list[tuple]) -> float:
"""Compute Layer 2 progress reward using delta-from-previous (PBRS).
Rewards improvement and penalizes regression relative to the previous
step's progress score. This is potential-based reward shaping with
gamma=1 (Ng et al. 1999), which provably preserves the optimal policy.
"""
if not ctx.gold_rows:
return 0.0
cardinality = _cardinality_score(rows, ctx.gold_rows)
value_overlap = _value_overlap_score(rows, ctx.gold_rows)
numeric_range = _numeric_range_score(rows, ctx.gold_rows)
raw_progress = (
_LAYER2_CARDINALITY_WEIGHT * cardinality
+ _LAYER2_VALUE_OVERLAP_WEIGHT * value_overlap
+ _LAYER2_NUMERIC_RANGE_WEIGHT * numeric_range
)
binned_progress = _bin_progress(raw_progress)
delta = binned_progress - ctx.previous_progress
ctx.previous_progress = binned_progress
return delta * _LAYER2_IMPROVEMENT_SCALE
|