# Source: Visual-Reasoning/maze/maze_processor.py
# Uploaded by Jayce-Ping: "Add files using upload-large-folder tool"
# (commit 2e9398f, verified)
"""
MazeProcessor - Maze generation, solving, rendering, and video frame generation.
Mirrors the SudokuProcessor pattern: a single class encapsulating all maze logic
including DFS generation, BFS solving, image/video rendering, path verification,
and text serialization.
"""
import random
from collections import deque
from typing import List, Tuple, Optional, Dict
import numpy as np
from PIL import Image, ImageDraw
# Bit assigned to each wall in the compact text encoding (matches text file
# format); a cell's value combines the bits of the walls that are present.
WALL_MASKS = dict(N=1, S=2, W=4, E=8)

# Side of the neighbouring cell that faces back across a shared wall.
OPPOSITE = dict(zip("NSEW", "SNWE"))

# (row delta, col delta) to the neighbouring cell on each side.
NEIGHBORS = dict(N=(-1, 0), S=(1, 0), E=(0, 1), W=(0, -1))

# UDRL action letter -> (row delta, col delta, wall crossed by the move).
MOVES = {
    action: (*NEIGHBORS[side], side)
    for action, side in zip("UDLR", "NSWE")
}

# ======================== Grid Type ========================
# grid[r][c] = {"N": bool, "S": bool, "W": bool, "E": bool}
# True => wall present, False => passage open
Grid = List[List[Dict[str, bool]]]
class MazeProcessor:
    """Handles maze generation, solving, image rendering, and video frame creation."""

    def __init__(self, img_size: int = 512):
        """Remember the output image size and set up the rendering palette.

        Args:
            img_size: Side length, in pixels, of the square images that are
                rendered.
        """
        self.img_size = img_size
        # Rendering colours: colour names or RGB tuples.
        self.bg_color, self.cell_color, self.wall_color = "black", "white", "black"
        self.grid_color = (224, 224, 224)
        self.start_color, self.end_color = "yellow", "blue"
        self.path_color = "red"
# ==================== Generation (DFS) ====================
@staticmethod
def _empty_grid(n: int) -> Grid:
    """Return an n×n grid in which every cell has all four walls present.

    Every cell is its own dict, so opening a wall in one cell never affects
    another.
    """
    grid: Grid = []
    for _ in range(n):
        grid.append([dict(N=True, E=True, S=True, W=True) for _ in range(n)])
    return grid
@staticmethod
def _remove_wall(grid: Grid, r: int, c: int, direction: str) -> None:
    """Open the wall on the *direction* side of cell (r, c), in place.

    A wall between two cells is stored on both of them, so the matching
    (OPPOSITE) side of the neighbouring cell is opened as well.  When
    *direction* points off the grid there is no neighbour to update and only
    the cell's own wall is opened.

    Args:
        grid: Maze grid to modify.
        r, c: Row and column of the cell.
        direction: One of the NEIGHBORS keys ("N", "S", "E", "W").
    """
    dr, dc = NEIGHBORS[direction]
    grid[r][c][direction] = False
    nr, nc = r + dr, c + dc
    # Explicit bounds check: indexing with -1 would not fail but wrap around
    # and silently open a wall on the opposite edge of the maze.
    if 0 <= nr < len(grid) and 0 <= nc < len(grid[nr]):
        grid[nr][nc][OPPOSITE[direction]] = False
def generate(
    self, size: int, min_path_len: int = 1, max_attempts: int = 200
) -> Tuple[Grid, Tuple[int, int], Tuple[int, int], np.ndarray]:
    """
    Generate a perfect maze and a start/end pair whose shortest path
    length (counted in cells, endpoints included) >= *min_path_len*.

    A fresh maze and a fresh random start/end pair are drawn on every
    attempt until the length requirement is met.

    Args:
        size: Number of rows/columns of the square maze; must be >= 2 so
            that two distinct cells exist.
        min_path_len: Minimum number of cells on the start→end path.
        max_attempts: Maximum number of mazes to try.

    Returns:
        (grid, start, end, path) where path is an (L, 2) int array.

    Raises:
        ValueError: If *size* is smaller than 2.
        RuntimeError: If no attempt satisfied *min_path_len*.
    """
    if size < 2:
        raise ValueError(
            f"size must be at least 2 to pick distinct start/end cells, got {size}"
        )
    # The candidate cells do not depend on the attempt; build them once.
    nodes = [(r, c) for r in range(size) for c in range(size)]
    # A path visits each cell at most once, so a longer request can never be
    # met: skip the attempts and fail straight away.
    attempts = max_attempts if min_path_len <= len(nodes) else 0
    for _ in range(attempts):
        grid = self._gen_dfs(size)
        start, end = random.sample(nodes, 2)
        path = self.solve_bfs(grid, start, end)
        if path is not None and len(path) >= min_path_len:
            return grid, start, end, path
    raise RuntimeError(
        f"Failed to generate maze (size={size}, min_path_len={min_path_len}) "
        f"after {max_attempts} attempts."
    )
def _gen_dfs(self, n: int) -> Grid:
    """Carve an n×n perfect maze with an iterative randomised depth-first search.

    Starting from a random cell, the search keeps moving to a random
    unvisited neighbour (opening the wall in between) and backtracks along
    its trail whenever the current cell has none left.
    """
    grid = self._empty_grid(n)
    seen = [[False] * n for _ in range(n)]
    first_row, first_col = random.randrange(n), random.randrange(n)
    seen[first_row][first_col] = True
    trail = [(first_row, first_col)]
    while trail:
        row, col = trail[-1]
        candidates = [
            (side, row + d_row, col + d_col)
            for side, (d_row, d_col) in NEIGHBORS.items()
            if 0 <= row + d_row < n
            and 0 <= col + d_col < n
            and not seen[row + d_row][col + d_col]
        ]
        if not candidates:
            # Dead end: step back to the previous cell on the trail.
            trail.pop()
            continue
        side, next_row, next_col = random.choice(candidates)
        self._remove_wall(grid, row, col, side)
        seen[next_row][next_col] = True
        trail.append((next_row, next_col))
    return grid
# ==================== Solving (BFS) ====================
def solve_bfs(
    self, grid: Grid, start: Tuple[int, int], end: Tuple[int, int]
) -> Optional[np.ndarray]:
    """Find a shortest wall-respecting path from *start* to *end* with BFS.

    Args:
        grid: Maze grid; treated as square (len(grid) bounds rows and columns).
        start, end: (row, col) pairs.  Any two-element sequence is accepted
            (tuple, list, array row).

    Returns:
        (L, 2) int ndarray of the cells from *start* to *end* inclusive, or
        None when *end* cannot be reached.
    """
    n = len(grid)
    # Normalise to plain int tuples so the cells are hashable and compare
    # equal to the (row, col) tuples built during the search.
    start_r, start_c = start
    end_r, end_c = end
    origin = (int(start_r), int(start_c))
    goal = (int(end_r), int(end_c))

    # parent[cell] is the cell it was first reached from; it doubles as the
    # visited set and avoids copying a whole path for every queued cell.
    parent: Dict[Tuple[int, int], Optional[Tuple[int, int]]] = {origin: None}
    queue: deque = deque([origin])
    while queue:
        current = queue.popleft()
        if current == goal:
            cells = []
            node: Optional[Tuple[int, int]] = current
            while node is not None:
                cells.append(node)
                node = parent[node]
            cells.reverse()
            return np.array(cells, dtype=int)
        r, c = current
        cell = grid[r][c]
        for d, (dr, dc) in NEIGHBORS.items():
            nxt = (r + dr, c + dc)
            if (
                0 <= nxt[0] < n
                and 0 <= nxt[1] < n
                and not cell[d]
                and nxt not in parent
            ):
                parent[nxt] = current
                queue.append(nxt)
    return None
# ==================== Path ↔ UDRL ====================
@staticmethod
def path_to_udrl(path) -> str:
    """Translate a sequence of (row, col) cells into a UDRL action string.

    One letter is produced per consecutive pair of cells.  The row change is
    examined first ("U" when the row decreases, "D" when it increases);
    otherwise the letter is "L" when the column decreases and "R" in every
    remaining case.
    """
    def letter(src, dst) -> str:
        (r1, c1), (r2, c2) = src, dst
        if r2 != r1:
            return "U" if r2 < r1 else "D"
        return "L" if c2 < c1 else "R"

    letters = []
    previous = None
    for current in path:
        if previous is not None:
            letters.append(letter(previous, current))
        previous = current
    return "".join(letters)
# ==================== Verification ====================
def verify_path(self, grid: Grid, start: Tuple, end: Tuple, udrl: str, strict: bool = True) -> bool:
    """Verify that *udrl* is a wall-respecting walk from *start* to *end*.

    Characters that are not one of the MOVES letters (separators, spaces,
    anything else) are ignored.

    Args:
        grid: Maze grid; treated as square (len(grid) bounds rows and columns).
        start, end: (row, col) pairs; any two-element sequence is accepted.
        udrl: Action string to replay.
        strict: When True the walk must finish on *end* after its last move.
            When False the walk is accepted as soon as it reaches *end* and
            the remaining moves are not examined.

    Returns:
        True if the walk is valid, False if it crosses a wall, leaves the
        grid, starts outside the grid, or does not reach *end*.
    """
    n = len(grid)
    r, c = start
    # Compare against a tuple so list/array endpoints match (r, c) correctly.
    goal = tuple(end)
    # An off-grid start would either raise or, for negative values, wrap
    # around to the opposite edge through negative indexing.
    if not (0 <= r < n and 0 <= c < n):
        return False
    for ch in udrl:
        move = MOVES.get(ch)
        if move is None:
            continue
        dr, dc, wall = move
        if grid[r][c][wall]:
            return False
        r, c = r + dr, c + dc
        if not (0 <= r < n and 0 <= c < n):
            return False
        if not strict and (r, c) == goal:
            return True
    return (r, c) == goal
# ==================== Text Encoding ====================
def encode_grid(self, grid: Grid) -> str:
    """Serialise *grid* as text: one bitmask integer per cell.

    Cells of a row are separated by single spaces and rows by newlines; each
    integer combines the WALL_MASKS bits of the walls present on that cell.
    """
    def mask_of(cell) -> int:
        # The masks are distinct bits, so adding them equals OR-ing them.
        return sum(bit for side, bit in WALL_MASKS.items() if cell[side])

    return "\n".join(
        " ".join(str(mask_of(cell)) for cell in row) for row in grid
    )
def decode_grid(self, text_lines: List[str]) -> Grid:
    """Rebuild the wall dictionaries from lines of bitmask integers.

    Each line is one row of whitespace-separated integers; a wall is marked
    present when its WALL_MASKS bit is set in the cell's integer.
    """
    def walls(token: str):
        value = int(token)
        return {side: (value & bit) != 0 for side, bit in WALL_MASKS.items()}

    return [[walls(token) for token in line.split()] for line in text_lines]
def save_text(self, filepath, grid: Grid, start: Tuple, end: Tuple) -> None:
    """Write the maze to *filepath* in the compact text format.

    The file holds the grid size, the start cell and the end cell on the
    first three lines, followed by the bitmask-encoded grid rows.
    """
    with open(filepath, "w") as out:
        header = [f"{len(grid)}", f"{start[0]} {start[1]}", f"{end[0]} {end[1]}"]
        out.write("\n".join(header) + "\n")
        print(self.encode_grid(grid), file=out)
def load_text(self, filepath) -> Optional[Dict]:
    """
    Load maze from text file.

    Blank lines are ignored.  The first three non-blank lines hold the size
    n, the start cell and the end cell; the next n lines hold the grid rows
    as bitmask integers.

    Returns dict with keys: size, start, end, grid (dict-based),
    grid_raw (list[list[int]] bitmask). None on failure (file cannot be
    read, content is malformed, or the grid is not n×n).
    """
    try:
        with open(filepath) as f:
            lines = [line.strip() for line in f if line.strip()]
        n = int(lines[0])
        sr, sc = map(int, lines[1].split())
        er, ec = map(int, lines[2].split())
        rows = lines[3 : 3 + n]
        grid_raw: List[List[int]] = [
            [int(token) for token in row.split()] for row in rows
        ]
        # Reject truncated or ragged grids up front; the rest of the class
        # indexes the grid as an n×n square.
        if len(grid_raw) != n or any(len(row) != n for row in grid_raw):
            raise ValueError(f"expected a {n}x{n} grid")
        grid = self.decode_grid(rows)
    except (OSError, ValueError, IndexError, TypeError):
        # Deliberate best-effort: callers receive None for any unreadable
        # or malformed file.
        return None
    return {
        "size": n,
        "start": (sr, sc),
        "end": (er, ec),
        "grid": grid,
        "grid_raw": grid_raw,
    }
def fingerprint(self, grid: Grid, start: Tuple, end: Tuple) -> str:
    """Build a content fingerprint string for deduplication.

    The result joins, with "|", a header of size and start/end coordinates
    followed by the bitmask value of every cell in row-major order.
    """
    header = f"{len(grid)},{start[0]},{start[1]},{end[0]},{end[1]}"
    cell_values = [
        str(sum(bit for side, bit in WALL_MASKS.items() if cell[side]))
        for row in grid
        for cell in row
    ]
    return "|".join([header, *cell_values])
# ==================== Image Rendering ====================
def _layout(self, n: int):
    """Return the rendering measurements for an n×n maze.

    Returns:
        (cell size, wall thickness, half wall thickness, grid-line width).
        The first three are floats in pixels derived from img_size; the
        grid-line width is an int of at least 1.
    """
    cell = float(self.img_size) / n
    wall = cell / 4.0
    return cell, wall, wall / 2.0, max(1, int(cell / 16.0))
def render(
    self,
    grid: Grid,
    start: Tuple[int, int],
    end: Tuple[int, int],
    path: Optional[np.ndarray] = None,
    path_steps: Optional[int] = None,
) -> Image.Image:
    """
    Draw the maze, its start/end markers and optionally a solution path.

    Args:
        grid: The maze grid.
        start, end: Coordinates of start/end cells.
        path: Full solution path (L, 2).
        path_steps: Draw only the first *path_steps* segments (for video).

    Returns:
        PIL.Image (RGB, img_size × img_size).
    """
    n = len(grid)
    cell_f, wall_f, half_f, grid_w = self._layout(n)
    canvas = Image.new("RGB", (self.img_size, self.img_size), self.bg_color)
    pen = ImageDraw.Draw(canvas)
    last = n - 1

    def near(i):
        # Inner edge of cell i on its low-coordinate side.
        return i * cell_f + half_f

    def far(i):
        # Inner edge of cell i on its high-coordinate side.
        return (i + 1) * cell_f - half_f

    def centre(i):
        return i * cell_f + cell_f / 2

    # Pass 1: paint each cell interior, then bridge the gap towards the
    # south / east neighbour wherever that wall is open.
    for r in range(n):
        top, bottom = near(r), far(r)
        for c in range(n):
            left, right = near(c), far(c)
            pen.rectangle([(left, top), (right, bottom)], fill=self.cell_color)
            walls = grid[r][c]
            if not walls["S"] and r < last:
                pen.rectangle(
                    [(left, bottom), (right, bottom + wall_f)],
                    fill=self.cell_color,
                )
            if not walls["E"] and c < last:
                pen.rectangle(
                    [(right, top), (right + wall_f, bottom)],
                    fill=self.cell_color,
                )

    # Pass 2: faint separator lines across open passages, drawn after all
    # fills so that no later rectangle paints over them.
    for r in range(n):
        for c in range(n):
            if r < last and not grid[r][c]["S"]:
                seam_y = (r + 1) * cell_f
                pen.line(
                    [(near(c), seam_y), (far(c), seam_y)],
                    fill=self.grid_color, width=grid_w,
                )
            if c < last and not grid[r][c]["E"]:
                seam_x = (c + 1) * cell_f
                pen.line(
                    [(seam_x, near(r)), (seam_x, far(r))],
                    fill=self.grid_color, width=grid_w,
                )

    # Start / end markers.
    dot_radius = max(2, int((cell_f - wall_f) * 0.25))
    for (dot_r, dot_c), colour in ((start, self.start_color), (end, self.end_color)):
        cx, cy = centre(dot_c), centre(dot_r)
        pen.ellipse(
            [cx - dot_radius, cy - dot_radius, cx + dot_radius, cy + dot_radius],
            fill=colour,
        )

    # Solution path, optionally truncated to its first path_steps segments.
    if path is not None and len(path) >= 2:
        if path_steps is None:
            shown = len(path)
        else:
            shown = min(path_steps + 1, len(path))
        if shown >= 2:
            points = [(centre(c), centre(r)) for r, c in path[:shown]]
            pen.line(
                points, fill=self.path_color,
                width=max(1, int(wall_f)), joint="curve",
            )
    return canvas
# ==================== Video Frame Generation ====================
def generate_video_frames(
    self,
    grid: Grid,
    start: Tuple[int, int],
    end: Tuple[int, int],
    path: np.ndarray,
    n_start: int = 5,
    m_end: int = 5,
    frames: Optional[int] = None,
) -> List[Image.Image]:
    """
    Generate progressive video frames showing the red line growing.

    *frames* controls the number of **content frames** between holds:
    - None → 1 per path step
    - frames > steps → slow-motion (steps are repeated)
    - frames < steps → fast-forward (steps are skipped)
    Values below 1 are treated as 1.

    Total length = n_start + content_frames + m_end.  When the path has no
    step at all (fewer than two cells) the result is n_start + m_end + 1
    frames of the unsolved maze.

    Every returned frame is an independent image object, so editing one
    frame never changes another.

    Args:
        grid: The maze grid.
        start, end: Coordinates of start/end cells.
        path: Solution path as a sequence of (row, col) cells.
        n_start: Number of opening frames showing the unsolved maze.
        m_end: Number of closing frames showing the full path.
        frames: Requested number of content frames, or None.

    Returns:
        List of rendered frames in playback order.
    """
    n_steps = len(path) - 1
    blank = self.render(grid, start, end)
    if n_steps <= 0:
        # Copies rather than [blank] * k, which would alias a single image.
        return [blank.copy() for _ in range(n_start + m_end + 1)]

    content_frames = max(1, frames if frames is not None else n_steps)

    # Path step displayed by each content frame, in order.
    if content_frames >= n_steps:
        # One frame per step, or slow-motion: step s fills the frame slots
        # [(s-1)*C//S, s*C//S), which is exactly one slot when C == S.
        schedule: List[int] = []
        for step in range(1, n_steps + 1):
            first = (step - 1) * content_frames // n_steps
            stop = step * content_frames // n_steps
            schedule.extend([step] * (stop - first))
    else:
        # Fast-forward: sample the steps evenly, always ending on the last.
        schedule = [
            (f + 1) * n_steps // content_frames for f in range(content_frames)
        ]

    # Opening hold
    result: List[Image.Image] = [blank.copy() for _ in range(n_start)]

    # Content frames: render each distinct step once, copy it for repeats.
    shown_step = None
    rendered = None
    for step in schedule:
        if step != shown_step:
            rendered = self.render(grid, start, end, path=path, path_steps=step)
            shown_step = step
            result.append(rendered)
        else:
            result.append(rendered.copy())

    # Closing hold
    final = self.render(grid, start, end, path=path)
    result.extend(final.copy() for _ in range(m_end))
    return result
# ==================== Red-Path Extraction ====================
def extract_path_from_pixels(
    self,
    pixels: np.ndarray,
    grid_raw: List[List[int]],
    size: int,
    start: Tuple[int, int],
    pixel_threshold: float = 0.01,
) -> str:
    """
    Detect red path in an RGB pixel array and return UDRL.

    Uses **floating-point** cell boundaries matching the renderer to avoid
    misalignment on sizes that don't evenly divide the image (e.g. 24, 48).

    A cell counts as part of the path when the fraction of red pixels in
    its inner region exceeds *pixel_threshold*.  The walk then starts at
    *start* and repeatedly steps to the first unvisited path cell (tried in
    the order R, D, L, U) that is not blocked by a wall, stopping when no
    such neighbour exists.

    Args:
        pixels: (H, W, 3) uint8 RGB array.
        grid_raw: Bitmask grid as list[list[int]].
        size: Maze dimension n.
        start: Start coordinate (r, c); any two-element sequence.
        pixel_threshold: Min red-pixel fraction to mark a cell.

    Returns:
        UDRL action string.
    """
    # Work on the array directly; no image object is needed to read its shape.
    px = np.asarray(pixels, dtype=float)
    h, w = px.shape[:2]
    r_ch, g_ch, b_ch = px[:, :, 0], px[:, :, 1], px[:, :, 2]
    red_mask = (r_ch > 100) & (r_ch > g_ch * 1.2) & (r_ch > b_ch * 1.2)

    # Use FLOAT cell size to match render() coordinate system exactly.
    # Integer division (h // size) drifts by up to (size-1) * fractional
    # pixels, causing the last cells to be completely misaligned.
    cell_h_f = h / size
    cell_w_f = w / size
    y_edges = [int(round(i * cell_h_f)) for i in range(size + 1)]
    x_edges = [int(round(i * cell_w_f)) for i in range(size + 1)]

    path_grid = np.zeros((size, size), dtype=bool)
    for r in range(size):
        y0, y1 = y_edges[r], y_edges[r + 1]
        # Small inward margin to avoid wall / neighbour bleed-over
        margin_y = max(1, int((y1 - y0) * 0.15))
        top, bottom = y0 + margin_y, y1 - margin_y
        for c in range(size):
            x0, x1 = x_edges[c], x_edges[c + 1]
            margin_x = max(1, int((x1 - x0) * 0.15))
            left, right = x0 + margin_x, x1 - margin_x
            # Cell too small to sample.  Skipping also keeps a negative
            # bound from wrapping around as a slice index.
            if bottom <= top or right <= left:
                continue
            sub = red_mask[top:bottom, left:right]
            if sub.size > 0 and np.mean(sub) > pixel_threshold:
                path_grid[r, c] = True

    # Greedy walk from start, respecting maze walls
    start_r, start_c = start
    cr, cc = int(start_r), int(start_c)
    visited = {(cr, cc)}
    actions: List[str] = []
    for _ in range(size * size * 2):
        wval = grid_raw[cr][cc]
        for act in "RDLU":
            dr, dc, wall_ch = MOVES[act]
            nr, nc = cr + dr, cc + dc
            if (
                0 <= nr < size
                and 0 <= nc < size
                and (wval & WALL_MASKS[wall_ch]) == 0
                and path_grid[nr, nc]
                and (nr, nc) not in visited
            ):
                visited.add((nr, nc))
                actions.append(act)
                cr, cc = nr, nc
                break
        else:
            # No direction led to an unvisited path cell: the walk is over.
            break
    return "".join(actions)
def extract_path_from_image(
    self, img_path: str, grid_raw: List[List[int]], size: int, start: Tuple
) -> str:
    """Extract UDRL from an image file (convenience wrapper).

    Args:
        img_path: Path of the image to read.
        grid_raw: Bitmask grid as list[list[int]].
        size: Maze dimension n.
        start: Start coordinate (r, c).

    Returns:
        UDRL action string, or "" when the image cannot be read or the
        extraction fails.
    """
    try:
        # The context manager releases the file even if conversion fails.
        with Image.open(img_path) as img:
            pixels = np.array(img.convert("RGB"))
        return self.extract_path_from_pixels(pixels, grid_raw, size, start)
    except Exception:
        # Deliberate best-effort: any failure yields an empty action string.
        return ""
if __name__ == "__main__":
    # Quick smoke test: generate one maze, verify its solution, write two
    # PNGs to the working directory and check the three video frame modes.
    proc = MazeProcessor(img_size=512)
    grid, start, end, path = proc.generate(size=8, min_path_len=10)
    n_steps = len(path) - 1
    udrl = proc.path_to_udrl(path)
    print(f"Maze 8×8 | path length {len(path)} | steps {n_steps}")
    print(f"UDRL: {udrl}")
    print(f"Verify: {proc.verify_path(grid, start, end, udrl)}")
    proc.render(grid, start, end).save("test_maze.png")
    proc.render(grid, start, end, path=path).save("test_maze_solution.png")

    # Test video frame modes
    f1 = proc.generate_video_frames(grid, start, end, path, n_start=3, m_end=3)
    assert len(f1) == 3 + n_steps + 3
    f2 = proc.generate_video_frames(
        grid, start, end, path, n_start=3, m_end=3, frames=n_steps * 3
    )
    assert len(f2) == 3 + n_steps * 3 + 3
    half = max(1, n_steps // 2)
    f3 = proc.generate_video_frames(
        grid, start, end, path, n_start=3, m_end=3, frames=half
    )
    assert len(f3) == 3 + half + 3

    # All three summary lines share the "frames=<value> → <total>" layout.
    print(f"frames=None → {len(f1)} total ({n_steps} content)")
    print(f"frames={n_steps*3:<4d} → {len(f2)} total (slow-mo)")
    print(f"frames={half:<4d} → {len(f3)} total (fast-fwd)")
    print("All assertions passed ✓")