|
|
""" |
|
|
MazeProcessor - Maze generation, solving, rendering, and video frame generation. |
|
|
|
|
|
Mirrors the SudokuProcessor pattern: a single class encapsulating all maze logic |
|
|
including DFS generation, BFS solving, image/video rendering, path verification, |
|
|
and text serialization. |
|
|
""" |
|
|
import random |
|
|
from collections import deque |
|
|
from typing import List, Tuple, Optional, Dict |
|
|
|
|
|
import numpy as np |
|
|
from PIL import Image, ImageDraw |
|
|
|
|
|
|
|
|
# Coordinates throughout this module are (row, col); row indices grow downwards,
# so "N" is row - 1 and "S" is row + 1.

# Bit assigned to each wall in the compact per-cell integer encoding.
WALL_MASKS = {"N": 1, "S": 2, "W": 4, "E": 8}

# The wall of the adjacent cell that coincides with a given wall of this cell.
OPPOSITE = {"N": "S", "S": "N", "E": "W", "W": "E"}

# UDRL action letter -> (row delta, col delta, wall that must be open to move).
MOVES = {
    "U": (-1, 0, "N"),
    "D": (1, 0, "S"),
    "L": (0, -1, "W"),
    "R": (0, 1, "E"),
}

# Wall name -> (row delta, col delta) of the neighbour behind that wall.
# The insertion order (N, S, E, W) is the order in which the DFS generator and
# the BFS solver examine neighbours, so it must stay stable for reproducibility.
NEIGHBORS = {
    "N": (-1, 0),
    "S": (1, 0),
    "E": (0, 1),
    "W": (0, -1),
}

# A maze: grid[row][col] maps each wall name to True (wall present) or
# False (passage open).
Grid = List[List[Dict[str, bool]]]
|
|
|
|
|
|
|
|
class MazeProcessor:
    """Generate, solve, serialise and render square grid mazes.

    A maze is an ``n x n`` list of rows; every cell is a dict mapping the wall
    names ``"N"``, ``"E"``, ``"S"`` and ``"W"`` to ``True`` (wall present) or
    ``False`` (passage open).  Coordinates are ``(row, col)`` pairs and a
    solution path is an ``(L, 2)`` integer array of such pairs.
    """

    def __init__(self, img_size: int = 512):
        """Store the square output image size (pixels) and the colour palette."""
        self.img_size = img_size

        # Cells are painted as inset rectangles on top of the background, so
        # the background colour is what shows through as walls.
        self.bg_color = "black"
        self.cell_color = "white"
        self.wall_color = "black"  # not read by any method of this class
        self.grid_color = (224, 224, 224)
        self.start_color = "yellow"
        self.end_color = "blue"
        self.path_color = "red"

    # ------------------------------------------------------------------
    # Grid helpers
    # ------------------------------------------------------------------

    @staticmethod
    def _empty_grid(n: int) -> Grid:
        """Return an ``n x n`` grid in which every cell has all four walls."""
        return [
            [{"N": True, "E": True, "S": True, "W": True} for _ in range(n)]
            for _ in range(n)
        ]

    @staticmethod
    def _remove_wall(grid: Grid, r: int, c: int, direction: str) -> None:
        """Open the passage between ``(r, c)`` and its neighbour in *direction*.

        Both sides of the shared wall are cleared so the grid stays consistent.

        Raises:
            IndexError: if ``(r, c)`` has no neighbour in that direction.
        """
        dr, dc = NEIGHBORS[direction]
        nr, nc = r + dr, c + dc
        # Reject off-grid neighbours explicitly: a negative index would
        # otherwise wrap around and silently open a wall on the far side.
        if not (0 <= nr < len(grid) and 0 <= nc < len(grid[nr])):
            raise IndexError(f"cell ({r}, {c}) has no neighbour to the {direction}")
        grid[r][c][direction] = False
        grid[nr][nc][OPPOSITE[direction]] = False

    @staticmethod
    def _cell_mask(cell: Dict[str, bool]) -> int:
        """Return the ``WALL_MASKS`` bitmask of the walls present in *cell*."""
        return sum(mask for name, mask in WALL_MASKS.items() if cell[name])

    # ------------------------------------------------------------------
    # Generation
    # ------------------------------------------------------------------

    def generate(
        self, size: int, min_path_len: int = 1, max_attempts: int = 200
    ) -> Tuple[Grid, Tuple[int, int], Tuple[int, int], np.ndarray]:
        """Generate a maze plus a random start/end pair that is far enough apart.

        Each attempt carves a fresh maze and samples two distinct cells; the
        attempt is accepted when the BFS path between them has at least
        *min_path_len* entries.

        Args:
            size: Side length ``n`` of the maze; must be at least 2.
            min_path_len: Minimum ``len(path)``, i.e. counted in cells
                (number of moves + 1), not in moves.
            max_attempts: Number of maze/endpoint draws before giving up.

        Returns:
            ``(grid, start, end, path)`` where *path* is an ``(L, 2)`` int array.

        Raises:
            ValueError: if *size* is smaller than 2.
            RuntimeError: if no attempt satisfied *min_path_len*.
        """
        if size < 2:
            raise ValueError(
                f"size must be at least 2 to pick distinct start/end cells, got {size}"
            )

        # The candidate cells do not depend on the attempt, so build them once.
        nodes = [(r, c) for r in range(size) for c in range(size)]
        for _ in range(max_attempts):
            grid = self._gen_dfs(size)
            start, end = random.sample(nodes, 2)
            path = self.solve_bfs(grid, start, end)
            if path is not None and len(path) >= min_path_len:
                return grid, start, end, path
        raise RuntimeError(
            f"Failed to generate maze (size={size}, min_path_len={min_path_len}) "
            f"after {max_attempts} attempts."
        )

    def _gen_dfs(self, n: int) -> Grid:
        """Carve a maze with an iterative randomised depth-first search.

        Starting from a random cell, the walk repeatedly opens a wall into a
        random unvisited neighbour and backtracks when none is left.  Every
        cell is entered exactly once, so the passages form a spanning tree.
        """
        grid = self._empty_grid(n)
        visited = [[False] * n for _ in range(n)]
        sr, sc = random.randrange(n), random.randrange(n)
        visited[sr][sc] = True
        stack = [(sr, sc)]

        while stack:
            r, c = stack[-1]
            candidates = []
            for direction, (dr, dc) in NEIGHBORS.items():
                nr, nc = r + dr, c + dc
                if 0 <= nr < n and 0 <= nc < n and not visited[nr][nc]:
                    candidates.append((direction, nr, nc))
            if not candidates:
                stack.pop()  # dead end: backtrack
                continue
            direction, nr, nc = random.choice(candidates)
            self._remove_wall(grid, r, c, direction)
            visited[nr][nc] = True
            stack.append((nr, nc))
        return grid

    # ------------------------------------------------------------------
    # Solving
    # ------------------------------------------------------------------

    def solve_bfs(
        self, grid: Grid, start: Tuple[int, int], end: Tuple[int, int]
    ) -> Optional[np.ndarray]:
        """Return the shortest wall-respecting path from *start* to *end*.

        Returns:
            An ``(L, 2)`` int array of ``(row, col)`` cells including both
            endpoints, or ``None`` when *end* cannot be reached.
        """
        n = len(grid)
        # Normalise to tuples so lists or array rows hash and compare correctly.
        start, end = tuple(start), tuple(end)

        # One predecessor link per discovered cell; this doubles as the
        # visited set and avoids copying the whole path on every enqueue.
        parents: Dict[Tuple[int, int], Optional[Tuple[int, int]]] = {start: None}
        queue = deque([start])

        while queue:
            current = queue.popleft()
            if current == end:
                path = []
                node: Optional[Tuple[int, int]] = current
                while node is not None:
                    path.append(node)
                    node = parents[node]
                path.reverse()
                return np.array(path, dtype=int)

            r, c = current
            walls = grid[r][c]
            for direction, (dr, dc) in NEIGHBORS.items():
                nxt = (r + dr, c + dc)
                if (
                    0 <= nxt[0] < n
                    and 0 <= nxt[1] < n
                    and not walls[direction]
                    and nxt not in parents
                ):
                    parents[nxt] = current
                    queue.append(nxt)
        return None

    # ------------------------------------------------------------------
    # Path <-> UDRL
    # ------------------------------------------------------------------

    @staticmethod
    def path_to_udrl(path) -> str:
        """Convert a coordinate path into a string of U/D/L/R moves.

        Each consecutive pair of points is classified by the sign of its row
        change first and of its column change second, so the input is expected
        to consist of single orthogonal steps.  A repeated point is not a move
        and contributes nothing to the result.
        """
        moves = []
        for (r1, c1), (r2, c2) in zip(path, path[1:]):
            if r2 < r1:
                moves.append("U")
            elif r2 > r1:
                moves.append("D")
            elif c2 < c1:
                moves.append("L")
            elif c2 > c1:
                moves.append("R")
        return "".join(moves)

    def verify_path(
        self, grid: Grid, start: Tuple, end: Tuple, udrl: str, strict: bool = True
    ) -> bool:
        """Check that *udrl* is a wall-respecting walk from *start* to *end*.

        Characters that are not one of ``U``, ``D``, ``L``, ``R`` (commas,
        spaces, newlines, ...) are skipped.

        Args:
            grid: The maze grid.
            start, end: ``(row, col)`` of the first and the target cell.
            udrl: Move string to replay.
            strict: When true, the walk must finish exactly on *end* after the
                last move.  When false, replay stops as soon as *end* is
                reached and any remaining moves are ignored.

        Returns:
            ``True`` if no move crosses a wall or leaves the grid and the walk
            ends on *end*; ``False`` otherwise (including an off-grid start).
        """
        n = len(grid)
        r, c = start
        # A tuple compares by value even when *end* is a list or an array row.
        goal = tuple(end)
        if not (0 <= r < n and 0 <= c < n):
            return False

        for ch in udrl:
            move = MOVES.get(ch)
            if move is None:
                continue
            dr, dc, wall = move
            if grid[r][c][wall]:
                return False
            nr, nc = r + dr, c + dc
            if not (0 <= nr < n and 0 <= nc < n):
                return False
            r, c = nr, nc
            if not strict and (r, c) == goal:
                break

        return (r, c) == goal

    # ------------------------------------------------------------------
    # Text serialisation
    # ------------------------------------------------------------------

    def encode_grid(self, grid: Grid) -> str:
        """Encode *grid* as text: one bitmask integer per cell, one row per line."""
        return "\n".join(
            " ".join(str(self._cell_mask(cell)) for cell in row) for row in grid
        )

    def decode_grid(self, text_lines: List[str]) -> Grid:
        """Decode lines of whitespace-separated bitmask integers into a grid."""
        return [
            [
                {name: bool(int(token) & mask) for name, mask in WALL_MASKS.items()}
                for token in line.split()
            ]
            for line in text_lines
        ]

    def save_text(self, filepath, grid: Grid, start: Tuple, end: Tuple) -> None:
        """Write the maze to *filepath* in the compact text format.

        Layout: the size ``n``, the start ``row col``, the end ``row col``,
        then ``n`` lines of per-cell bitmasks as produced by ``encode_grid``.
        """
        n = len(grid)
        with open(filepath, "w", encoding="utf-8") as f:
            f.write(f"{n}\n{start[0]} {start[1]}\n{end[0]} {end[1]}\n")
            f.write(self.encode_grid(grid) + "\n")

    def load_text(self, filepath) -> Optional[Dict]:
        """Load a maze written by ``save_text``.

        Blank lines are ignored.

        Returns:
            A dict with keys ``size``, ``start``, ``end``, ``grid`` (dict-based
            cells) and ``grid_raw`` (``list[list[int]]`` bitmasks), or ``None``
            when the file cannot be read, a number cannot be parsed, or the
            grid is not ``size`` rows of ``size`` values.
        """
        try:
            with open(filepath, encoding="utf-8") as f:
                lines = [line.strip() for line in f if line.strip()]
            n = int(lines[0])
            sr, sc = map(int, lines[1].split())
            er, ec = map(int, lines[2].split())
            rows = lines[3 : 3 + n]
            grid_raw = [[int(token) for token in row.split()] for row in rows]
        except (OSError, ValueError, IndexError):
            return None

        # A truncated or ragged grid would only fail later, far from the cause.
        if len(grid_raw) != n or any(len(row) != n for row in grid_raw):
            return None

        return {
            "size": n,
            "start": (sr, sc),
            "end": (er, ec),
            "grid": self.decode_grid(rows),
            "grid_raw": grid_raw,
        }

    def fingerprint(self, grid: Grid, start: Tuple, end: Tuple) -> str:
        """Return a string identifying the maze content, for deduplication.

        The string joins ``"n,start_r,start_c,end_r,end_c"`` and every cell's
        bitmask (row-major) with ``|``.
        """
        n = len(grid)
        parts = [f"{n},{start[0]},{start[1]},{end[0]},{end[1]}"]
        parts.extend(str(self._cell_mask(cell)) for row in grid for cell in row)
        return "|".join(parts)

    # ------------------------------------------------------------------
    # Rendering
    # ------------------------------------------------------------------

    def _layout(self, n: int) -> Tuple[float, float, float, int]:
        """Return ``(cell size, wall thickness, half wall, grid line width)``.

        All values derive from ``img_size / n``; the first three are floats so
        that cell boundaries stay exact when *n* does not divide the image.
        """
        cell_f = float(self.img_size) / n
        wall_f = cell_f / 4.0
        half_f = wall_f / 2.0
        grid_w = max(1, int(cell_f / 16.0))
        return cell_f, wall_f, half_f, grid_w

    def _render_base(
        self, grid: Grid, start: Tuple[int, int], end: Tuple[int, int]
    ) -> Image.Image:
        """Draw cells, open passages, grid lines and the start/end dots."""
        n = len(grid)
        cell_f, wall_f, half_f, grid_w = self._layout(n)

        img = Image.new("RGB", (self.img_size, self.img_size), self.bg_color)
        draw = ImageDraw.Draw(img)

        # Pass 1: cell interiors, extended across every open south/east wall.
        for r in range(n):
            for c in range(n):
                x1 = c * cell_f + half_f
                y1 = r * cell_f + half_f
                x2 = (c + 1) * cell_f - half_f
                y2 = (r + 1) * cell_f - half_f
                draw.rectangle([(x1, y1), (x2, y2)], fill=self.cell_color)
                cell = grid[r][c]
                if not cell["S"] and r < n - 1:
                    draw.rectangle(
                        [(x1, y2), (x2, y2 + wall_f)], fill=self.cell_color
                    )
                if not cell["E"] and c < n - 1:
                    draw.rectangle(
                        [(x2, y1), (x2 + wall_f, y2)], fill=self.cell_color
                    )

        # Pass 2: a thin grid line on each open boundary.  Done after pass 1
        # so that no later cell fill can paint over a line.
        for r in range(n):
            for c in range(n):
                if r < n - 1 and not grid[r][c]["S"]:
                    y = (r + 1) * cell_f
                    draw.line(
                        [(c * cell_f + half_f, y), ((c + 1) * cell_f - half_f, y)],
                        fill=self.grid_color,
                        width=grid_w,
                    )
                if c < n - 1 and not grid[r][c]["E"]:
                    x = (c + 1) * cell_f
                    draw.line(
                        [(x, r * cell_f + half_f), (x, (r + 1) * cell_f - half_f)],
                        fill=self.grid_color,
                        width=grid_w,
                    )

        # Start and end markers: filled circles centred in their cells.
        rad = max(2, int((cell_f - wall_f) * 0.25))
        for (rr, cc), color in ((start, self.start_color), (end, self.end_color)):
            cx = cc * cell_f + cell_f / 2
            cy = rr * cell_f + cell_f / 2
            draw.ellipse([cx - rad, cy - rad, cx + rad, cy + rad], fill=color)

        return img

    def _draw_path(
        self,
        img: Image.Image,
        n: int,
        path: Optional[np.ndarray],
        path_steps: Optional[int] = None,
    ) -> None:
        """Draw the first *path_steps* segments of *path* onto *img* in place.

        ``path_steps=None`` draws the whole path; nothing is drawn when fewer
        than two points are selected.
        """
        if path is None or len(path) < 2:
            return
        end_idx = len(path) if path_steps is None else min(path_steps + 1, len(path))
        if end_idx < 2:
            return

        cell_f, wall_f, _, _ = self._layout(n)
        pts = [
            (c * cell_f + cell_f / 2, r * cell_f + cell_f / 2)
            for r, c in path[:end_idx]
        ]
        ImageDraw.Draw(img).line(
            pts,
            fill=self.path_color,
            width=max(1, int(wall_f)),
            joint="curve",
        )

    def render(
        self,
        grid: Grid,
        start: Tuple[int, int],
        end: Tuple[int, int],
        path: Optional[np.ndarray] = None,
        path_steps: Optional[int] = None,
    ) -> Image.Image:
        """Render the maze as a new PIL image.

        Args:
            grid: The maze grid.
            start, end: Coordinates of the start/end cells.
            path: Full solution path ``(L, 2)``; omitted to draw the maze only.
            path_steps: Draw only the first *path_steps* segments (for video).

        Returns:
            PIL.Image (RGB, ``img_size`` x ``img_size``).
        """
        img = self._render_base(grid, start, end)
        self._draw_path(img, len(grid), path, path_steps)
        return img

    # ------------------------------------------------------------------
    # Video frames
    # ------------------------------------------------------------------

    def generate_video_frames(
        self,
        grid: Grid,
        start: Tuple[int, int],
        end: Tuple[int, int],
        path: np.ndarray,
        n_start: int = 5,
        m_end: int = 5,
        frames: Optional[int] = None,
    ) -> List[Image.Image]:
        """Generate progressive video frames showing the red line growing.

        *frames* controls the number of **content frames** between holds:
            - None            -> 1 per path step
            - frames > steps  -> slow-motion (steps are held for several frames)
            - frames < steps  -> fast-forward (steps are skipped)

        Total length = n_start + content_frames + m_end.  When the path has no
        steps, ``n_start + m_end + 1`` copies of the bare maze are returned.
        Every returned frame is an independent image.
        """
        n_steps = len(path) - 1 if path is not None else 0

        # The maze itself is identical in every frame: render it once and
        # only draw the growing path on copies.
        base = self._render_base(grid, start, end)
        if n_steps <= 0:
            return [base.copy() for _ in range(n_start + m_end + 1)]

        n = len(grid)
        content_frames = n_steps if frames is None else max(1, frames)

        def frame_at(step: int) -> Image.Image:
            img = base.copy()
            self._draw_path(img, n, path, step)
            return img

        # Opening hold on the unsolved maze.
        result: List[Image.Image] = [base.copy() for _ in range(n_start)]

        if content_frames > n_steps:
            # Slow motion: spread content_frames over the steps as evenly as
            # integer division allows, holding each step for its share.
            for step in range(1, n_steps + 1):
                f_lo = (step - 1) * content_frames // n_steps
                f_hi = step * content_frames // n_steps
                frame = frame_at(step)
                result.append(frame)
                result.extend(frame.copy() for _ in range(f_hi - f_lo - 1))
        else:
            # One frame per step, or fast-forward by skipping steps; the last
            # content frame always shows the complete path.
            for f in range(content_frames):
                result.append(frame_at((f + 1) * n_steps // content_frames))

        # Closing hold on the fully drawn path.
        final = frame_at(n_steps)
        result.extend(final.copy() for _ in range(m_end))

        return result

    # ------------------------------------------------------------------
    # Path extraction from images
    # ------------------------------------------------------------------

    def extract_path_from_pixels(
        self,
        pixels: np.ndarray,
        grid_raw: List[List[int]],
        size: int,
        start: Tuple[int, int],
        pixel_threshold: float = 0.01,
    ) -> str:
        """Detect the red path in an RGB pixel array and return it as UDRL.

        Uses **floating-point** cell boundaries matching the renderer to avoid
        misalignment on sizes that don't evenly divide the image (e.g. 24, 48).

        A cell counts as "on the path" when the fraction of red pixels in its
        inner window (15 % margin on each side) exceeds *pixel_threshold*.
        The walk then starts at *start* and greedily follows unvisited path
        cells through open walls, trying R, D, L, U in that order, until no
        such neighbour is left.

        Args:
            pixels: (H, W, 3) RGB array (extra channels are ignored).
            grid_raw: Bitmask grid as list[list[int]].
            size: Maze dimension n.
            start: Start coordinate (r, c).
            pixel_threshold: Min red-pixel fraction to mark a cell.

        Returns:
            UDRL action string (empty when no neighbour of *start* is marked).

        Raises:
            ValueError: if *pixels* is not an (H, W, >=3) array.
        """
        px = np.asarray(pixels, dtype=float)
        if px.ndim != 3 or px.shape[2] < 3:
            raise ValueError(
                f"pixels must have shape (H, W, 3), got {px.shape}"
            )
        h, w = px.shape[:2]

        r_ch, g_ch, b_ch = px[:, :, 0], px[:, :, 1], px[:, :, 2]
        # "Red": bright enough and clearly dominant over green and blue.
        red_mask = (r_ch > 100) & (r_ch > g_ch * 1.2) & (r_ch > b_ch * 1.2)

        def inner_span(index: int, cell_size: float) -> Tuple[int, int]:
            """Pixel range of cell *index* along one axis, minus its margin."""
            lo = int(round(index * cell_size))
            hi = int(round((index + 1) * cell_size))
            margin = max(1, int((hi - lo) * 0.15))
            return lo + margin, hi - margin

        row_spans = [inner_span(r, h / size) for r in range(size)]
        col_spans = [inner_span(c, w / size) for c in range(size)]

        path_grid = np.zeros((size, size), dtype=bool)
        for r, (y_lo, y_hi) in enumerate(row_spans):
            for c, (x_lo, x_hi) in enumerate(col_spans):
                window = red_mask[y_lo:y_hi, x_lo:x_hi]
                if window.size > 0 and np.mean(window) > pixel_threshold:
                    path_grid[r, c] = True

        cr, cc = int(start[0]), int(start[1])
        visited = {(cr, cc)}
        actions: List[str] = []
        # Every accepted move visits a new cell, so this loop is finite.
        while True:
            walls = grid_raw[cr][cc]
            for act in "RDLU":
                dr, dc, wall = MOVES[act]
                nr, nc = cr + dr, cc + dc
                if not (0 <= nr < size and 0 <= nc < size):
                    continue
                if walls & WALL_MASKS[wall]:
                    continue
                if path_grid[nr, nc] and (nr, nc) not in visited:
                    visited.add((nr, nc))
                    actions.append(act)
                    cr, cc = nr, nc
                    break
            else:
                break  # no admissible neighbour: the trace ends here
        return "".join(actions)

    def extract_path_from_image(
        self, img_path: str, grid_raw: List[List[int]], size: int, start: Tuple
    ) -> str:
        """Extract UDRL from an image file (convenience wrapper).

        Returns an empty string when the image cannot be opened or processed.
        """
        try:
            # The context manager releases the file handle once the pixel
            # data has been copied into the array.
            with Image.open(img_path) as img:
                pixels = np.array(img.convert("RGB"))
            return self.extract_path_from_pixels(pixels, grid_raw, size, start)
        except Exception:
            # Deliberately best-effort: any decoding or processing failure
            # is reported as "no path found".
            return ""
|
|
|
|
|
|
|
|
def _self_test() -> None:
    """Smoke-test generation, verification, rendering and video frame counts.

    Writes ``test_maze.png`` and ``test_maze_solution.png`` to the current
    directory.  Frame counts are checked with explicit raises rather than
    ``assert`` so the checks still run under ``python -O``.
    """
    proc = MazeProcessor(img_size=512)

    grid, start, end, path = proc.generate(size=8, min_path_len=10)
    n_steps = len(path) - 1
    udrl = proc.path_to_udrl(path)
    print(f"Maze 8×8 | path length {len(path)} | steps {n_steps}")
    print(f"UDRL: {udrl}")
    print(f"Verify: {proc.verify_path(grid, start, end, udrl)}")

    proc.render(grid, start, end).save("test_maze.png")
    proc.render(grid, start, end, path=path).save("test_maze_solution.png")

    def checked_frames(expected_content, **kwargs):
        """Build frames with 3-frame holds and verify the total length."""
        frames = proc.generate_video_frames(
            grid, start, end, path, n_start=3, m_end=3, **kwargs
        )
        expected = 3 + expected_content + 3
        if len(frames) != expected:
            raise AssertionError(
                f"expected {expected} frames for {kwargs or 'defaults'}, "
                f"got {len(frames)}"
            )
        return frames

    # One frame per step, slow motion (3x), and fast-forward (half the steps).
    f1 = checked_frames(n_steps)
    f2 = checked_frames(n_steps * 3, frames=n_steps * 3)
    half = max(1, n_steps // 2)
    f3 = checked_frames(half, frames=half)

    print(f"frames=None → {len(f1)} total ({n_steps} content)")
    print(f"frames={n_steps*3:<4d} → {len(f2)} total (slow-mo)")
    print(f"frames={half:<4d} → {len(f3)} total (fast-fwd)")
    print("All assertions passed ✓")


if __name__ == "__main__":
    _self_test()