File size: 5,911 Bytes
9e64e71
 
 
 
 
 
 
 
 
 
 
 
 
 
5dd1bb4
 
 
 
 
 
 
 
 
 
 
 
 
 
9e64e71
 
5dd1bb4
 
 
 
9e64e71
 
5dd1bb4
 
 
 
 
 
 
 
 
9e64e71
5dd1bb4
9e64e71
 
 
5dd1bb4
 
 
 
 
 
 
9e64e71
5dd1bb4
 
 
 
 
 
 
 
 
 
 
9e64e71
 
 
 
5dd1bb4
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
9e64e71
5dd1bb4
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
9e64e71
 
 
 
 
 
5dd1bb4
 
 
 
 
 
 
 
9e64e71
 
 
5dd1bb4
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
9e64e71
 
 
 
 
 
5dd1bb4
 
 
 
 
 
 
 
 
 
 
 
 
 
 
9e64e71
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
"""Reward helpers for SQLEnv dense shaping.

Design: per-step clipping with delta-based progress (PBRS).

Each step's reward is computed independently and clipped to [-0.10, 0.15].
No cumulative tracking. The hard budget (15 steps) and step cost naturally
limit total exploration reward to ~0.15, making cumulative caps redundant.

Progress uses delta-from-previous (potential-based reward shaping, Ng et al.
1999): the agent is rewarded for improvement and penalized for regression.
This preserves the optimal policy and provides recovery signal.

See docs/design-docs/reward-shaping-research.md for full rationale.
"""

from __future__ import annotations

import hashlib
import math

try:
    from sql_env.models import EpisodeContext
except ImportError:  # pragma: no cover - Docker fallback import path
    from models import EpisodeContext  # type: ignore[no-redef]


_EXEC_OK_REWARD = 0.02
_NEW_INFO_REWARD = 0.01
_REPEAT_PENALTY = 0.03
_STEP_COST = 0.02
_LAYER2_CARDINALITY_WEIGHT = 0.25
_LAYER2_VALUE_OVERLAP_WEIGHT = 0.50
_LAYER2_NUMERIC_RANGE_WEIGHT = 0.25
_LAYER2_IMPROVEMENT_SCALE = 0.15
_PER_STEP_FLOOR = -0.10
_PER_STEP_CAP = 0.15


def compute_step_reward(
    ctx: EpisodeContext,
    action_type: str,
    sql: str,
    rows: list[tuple] | None,
    error: str | None,
) -> float:
    """Compute one dense step reward with per-step clipping.

    Combines Layer 1 operational shaping with Layer 2 delta-based progress
    for successful QUERY actions. The result is clipped per step to
    [-0.10, 0.15]. No cumulative tracking.
    """

    step_reward = _layer1_operational(ctx, action_type, sql, rows, error)

    if action_type.upper() == "QUERY" and rows is not None and error is None:
        step_reward += _layer2_progress(ctx, rows)

    return max(_PER_STEP_FLOOR, min(_PER_STEP_CAP, step_reward))


def _layer1_operational(
    ctx: EpisodeContext,
    action_type: str,
    sql: str,
    rows: list[tuple] | None,
    error: str | None,
) -> float:
    """Compute Layer 1 operational reward signals.

    - ``+0.02`` for successful execution (``error is None``)
    - ``+0.01`` for first-seen successful QUERY (unique SQL hash)
    - ``-0.03`` repeat penalty for duplicate QUERY SQL
    - ``-0.02`` step cost on every call
    """

    reward = -_STEP_COST

    is_query = action_type.upper() == "QUERY"
    query_hash: str | None = None
    is_repeat = False

    if is_query and sql:
        query_hash = hashlib.sha256(sql.encode("utf-8")).hexdigest()
        is_repeat = query_hash in ctx.query_hashes

    if is_repeat:
        reward -= _REPEAT_PENALTY
    elif error is None:
        reward += _EXEC_OK_REWARD

    if (
        is_query
        and error is None
        and rows is not None
        and query_hash is not None
        and not is_repeat
    ):
        ctx.query_hashes.add(query_hash)
        reward += _NEW_INFO_REWARD

    return reward


def _cardinality_score(pred_rows: list[tuple], gold_rows: list[tuple]) -> float:
    """Compute row-count similarity score in [0.0, 1.0]."""

    pred_count = len(pred_rows)
    gold_count = len(gold_rows)
    denominator = max(pred_count, gold_count, 1)
    score = 1.0 - (abs(pred_count - gold_count) / denominator)
    return max(0.0, min(1.0, score))


def _value_overlap_score(pred_rows: list[tuple], gold_rows: list[tuple]) -> float:
    """Compute Jaccard overlap of flattened cell values as strings."""

    pred_values = {str(cell) for row in pred_rows for cell in row}
    gold_values = {str(cell) for row in gold_rows for cell in row}

    union = pred_values | gold_values
    if not union:
        return 0.0

    intersection = pred_values & gold_values
    return len(intersection) / len(union)


def _numeric_range_score(pred_rows: list[tuple], gold_rows: list[tuple]) -> float:
    """Compute log-distance proximity for numeric cell values."""

    def _is_numeric(value: object) -> bool:
        return isinstance(value, (int, float)) and not isinstance(value, bool)

    pred_numerics = [
        float(cell) for row in pred_rows for cell in row if _is_numeric(cell)
    ]
    gold_numerics = [
        float(cell) for row in gold_rows for cell in row if _is_numeric(cell)
    ]

    if not gold_numerics:
        return 1.0
    if not pred_numerics:
        return 0.0

    total = 0.0
    for gold_value in gold_numerics:
        closest_distance = min(
            abs(pred_value - gold_value) for pred_value in pred_numerics
        )
        total += 1.0 / (1.0 + math.log1p(closest_distance))

    return total / len(gold_numerics)


def _bin_progress(raw_score: float) -> float:
    """Bin raw progress to one of {0.0, 0.25, 0.5, 0.75, 1.0}."""

    clamped_score = max(0.0, min(1.0, raw_score))
    if clamped_score < 0.125:
        return 0.0
    if clamped_score < 0.375:
        return 0.25
    if clamped_score < 0.625:
        return 0.5
    if clamped_score < 0.875:
        return 0.75
    return 1.0


def _layer2_progress(ctx: EpisodeContext, rows: list[tuple]) -> float:
    """Compute Layer 2 progress reward using delta-from-previous (PBRS).

    Rewards improvement and penalizes regression relative to the previous
    step's progress score. This is potential-based reward shaping with
    gamma=1 (Ng et al. 1999), which provably preserves the optimal policy.
    """

    if not ctx.gold_rows:
        return 0.0

    cardinality = _cardinality_score(rows, ctx.gold_rows)
    value_overlap = _value_overlap_score(rows, ctx.gold_rows)
    numeric_range = _numeric_range_score(rows, ctx.gold_rows)

    raw_progress = (
        _LAYER2_CARDINALITY_WEIGHT * cardinality
        + _LAYER2_VALUE_OVERLAP_WEIGHT * value_overlap
        + _LAYER2_NUMERIC_RANGE_WEIGHT * numeric_range
    )
    binned_progress = _bin_progress(raw_progress)

    delta = binned_progress - ctx.previous_progress
    ctx.previous_progress = binned_progress
    return delta * _LAYER2_IMPROVEMENT_SCALE