File size: 3,865 Bytes
03a7eb9
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
"""
CodeArena Agent Memory
Self-improving memory across episodes.
Stores best solutions per task + retrieves them to seed future fixes.
"""

import json
import os
import csv
import time
from typing import Optional

# Both data files live one directory above the one containing this module.
_MODULE_DIR = os.path.dirname(__file__)

# JSON file holding the stored per-task memory.
MEMORY_FILE = os.path.join(_MODULE_DIR, "..", "agent_memory.json")
# CSV file that receives one complexity/reward row per logged step.
CSV_FILE = os.path.join(_MODULE_DIR, "..", "complexity_rewards.csv")

# ── Memory Store ──────────────────────────────────────────────────────────────

def load_memory() -> dict:
    """
    Load agent memory from disk.

    Returns:
        The JSON object stored in ``MEMORY_FILE`` as a dict. An empty dict is
        returned when the file does not exist, cannot be read or parsed, or
        does not contain a JSON object at its top level.
    """
    try:
        # Encoding is pinned so the result does not depend on the platform
        # locale; opening directly (instead of checking existence first)
        # avoids a race with the file being removed in between.
        with open(MEMORY_FILE, "r", encoding="utf-8") as f:
            data = json.load(f)
    except FileNotFoundError:
        # Nothing has been saved yet; this is the normal first-run case.
        return {}
    except Exception as e:
        # Deliberately best-effort: a corrupt or unreadable memory file is
        # reported and treated as empty rather than raised to the caller.
        print(f"[Memory] Load error: {e}")
        return {}

    if not isinstance(data, dict):
        # Valid JSON of another shape (list, string, number, ...) would break
        # every caller that uses ``.get`` on the result.
        print(
            "[Memory] Load error: expected a JSON object, "
            f"got {type(data).__name__}"
        )
        return {}
    return data


def save_memory(memory: dict) -> None:
    """
    Persist agent memory to disk.

    The data is serialized before any file is touched and then written to a
    temporary sibling file that replaces ``MEMORY_FILE`` in one step, so a
    failed save leaves the previously saved memory intact.

    Args:
        memory: JSON-serializable mapping to store.

    Errors are printed rather than raised.
    """
    # The process id keeps concurrent writers from sharing one temp file.
    tmp_path = f"{MEMORY_FILE}.{os.getpid()}.tmp"
    try:
        # Serialize first: if this raises (e.g. a non-serializable value),
        # the existing file has not been truncated yet.
        payload = json.dumps(memory, indent=2)
        with open(tmp_path, "w", encoding="utf-8") as f:
            f.write(payload)
        os.replace(tmp_path, MEMORY_FILE)
    except Exception as e:
        # Deliberately best-effort: saving must not crash the caller.
        print(f"[Memory] Save error: {e}")
        try:
            os.remove(tmp_path)
        except OSError:
            # Temp file was never created or is already gone.
            pass


def store_success(task_id: str, code: str, reward: float) -> None:
    """
    Record ``code`` as the best solution for ``task_id`` when it beats the
    stored one.

    A single entry is kept per task. The entry is replaced only when no
    entry exists yet or ``reward`` is strictly greater than the stored
    reward (a stored entry without a reward counts as 0). Otherwise nothing
    is written.
    """
    memory = load_memory()
    previous = memory.get(task_id)

    # Guard clause: keep the current entry unless the new reward beats it.
    if previous is not None and not (reward > previous.get("reward", 0)):
        return

    record = {
        "best_code": code,
        "reward": round(reward, 4),
        "timestamp": time.strftime("%Y-%m-%dT%H:%M:%SZ", time.gmtime()),
    }
    memory[task_id] = record
    save_memory(memory)
    print(f"[Memory] Stored new best for '{task_id}' with reward={reward:.3f}")


def retrieve_memory(task_id: str) -> Optional[dict]:
    """
    Look up the stored entry for ``task_id``.

    Memory is re-read through ``load_memory()`` on every call. Returns the
    value stored under ``task_id`` (entries written by ``store_success``
    carry 'best_code', 'reward' and 'timestamp'), or None when the task has
    no entry.
    """
    stored = load_memory()
    entry = stored.get(task_id)
    return entry


def get_all_memories() -> dict:
    """Load and return every stored task memory (for dashboard display)."""
    everything = load_memory()
    return everything


# ── Complexity vs Reward CSV Logger ──────────────────────────────────────────

def log_complexity_reward(
    task_id: str,
    reward: float,
    complexity: str,
    step: int,
    method: str = "ollama",
) -> None:
    """
    Append one row to complexity_rewards.csv.

    Columns, in order: timestamp (UTC, taken at call time), task_id, reward
    (rounded to 4 decimals), complexity, step, method. A header row is
    written first when the file is new or empty. I/O errors are printed,
    not raised.

    Used to track: better algorithms -> better rewards.
    """
    stamp = time.strftime("%Y-%m-%dT%H:%M:%SZ", time.gmtime())
    record = dict(
        timestamp=stamp,
        task_id=task_id,
        reward=round(reward, 4),
        complexity=complexity,
        step=step,
        method=method,
    )
    try:
        existed_before = os.path.exists(CSV_FILE)
        with open(CSV_FILE, "a", newline="") as handle:
            out = csv.DictWriter(handle, fieldnames=list(record))
            # A pre-existing but empty file still needs its header row.
            needs_header = (not existed_before) or handle.tell() == 0
            if needs_header:
                out.writeheader()
            out.writerow(record)
    except Exception as err:
        print(f"[Memory] CSV log error: {err}")


def get_complexity_reward_stats() -> dict:
    """
    Read the CSV log and compute the average reward per complexity class.

    Rows whose reward cannot be parsed as a float (blank, non-numeric, or a
    truncated row) are skipped instead of discarding the whole result. Rows
    with a blank or missing complexity are grouped under "unknown".

    Returns:
        Dict like {"O(n)": 0.88, "O(n^2)": 0.55, "O(n^3)": 0.12}, with
        averages rounded to 3 decimals. Empty when the file does not exist
        or cannot be read.
    """
    stats: dict[str, list] = {}
    try:
        # newline="" is what the csv module expects for files it parses.
        with open(CSV_FILE, "r", newline="") as f:
            for row in csv.DictReader(f):
                try:
                    r = float(row.get("reward", 0))
                except (TypeError, ValueError):
                    # One bad row must not wipe out every valid row.
                    continue
                # DictReader yields "" for blank cells and None for cells
                # missing from short rows, so the .get default alone never
                # produces "unknown" when the column exists.
                c = row.get("complexity", "unknown") or "unknown"
                stats.setdefault(c, []).append(r)
    except FileNotFoundError:
        # Nothing has been logged yet.
        return {}
    except Exception as e:
        # Deliberately best-effort: report and return no stats.
        print(f"[Memory] Stats error: {e}")
        return {}
    return {k: round(sum(v) / len(v), 3) for k, v in stats.items()}