"""
MazeProcessor - Maze generation, solving, rendering, and video frame generation.

Mirrors the SudokuProcessor pattern: a single class encapsulating all maze logic
including DFS generation, BFS solving, image/video rendering, path verification,
and text serialization.
"""

import random
from collections import deque
from typing import List, Tuple, Optional, Dict

import numpy as np
from PIL import Image, ImageDraw

# Wall bitmask encoding (matches text file format)
WALL_MASKS = {"N": 1, "S": 2, "W": 4, "E": 8}
OPPOSITE = {"N": "S", "S": "N", "E": "W", "W": "E"}
# UDRL action -> (row delta, col delta, wall of the current cell that blocks it)
MOVES = {
    "U": (-1, 0, "N"),
    "D": (1, 0, "S"),
    "L": (0, -1, "W"),
    "R": (0, 1, "E"),
}
# Compass direction -> (row delta, col delta)
NEIGHBORS = {
    "N": (-1, 0),
    "S": (1, 0),
    "E": (0, 1),
    "W": (0, -1),
}

# ======================== Grid Type ========================
# grid[r][c] = {"N": bool, "S": bool, "W": bool, "E": bool}
#   True  => wall present,  False => passage open
Grid = List[List[Dict[str, bool]]]


class MazeProcessor:
    """Handles maze generation, solving, image rendering, and video frame creation."""

    def __init__(self, img_size: int = 512):
        """
        Args:
            img_size: Side length in pixels of the (square) rendered image.
        """
        self.img_size = img_size
        # Rendering colours (PIL colour names or RGB tuples)
        self.bg_color = "black"
        self.cell_color = "white"
        # NOTE: wall_color is not referenced by render() in this file; walls
        # appear as the background colour showing through between cells.
        self.wall_color = "black"
        self.grid_color = (224, 224, 224)
        self.start_color = "yellow"
        self.end_color = "blue"
        self.path_color = "red"

    # ==================== Generation (DFS) ====================

    @staticmethod
    def _empty_grid(n: int) -> Grid:
        """Create an n×n grid with all walls present."""
        return [
            [{"N": True, "E": True, "S": True, "W": True} for _ in range(n)]
            for _ in range(n)
        ]

    @staticmethod
    def _remove_wall(grid: Grid, r: int, c: int, direction: str) -> None:
        """Remove wall between (r,c) and its neighbour in *direction*.

        Both sides are updated so the two cells always agree about the
        shared wall. The neighbour must be inside the grid.
        """
        dr, dc = NEIGHBORS[direction]
        grid[r][c][direction] = False
        grid[r + dr][c + dc][OPPOSITE[direction]] = False

    @staticmethod
    def _cell_mask(cell: Dict[str, bool]) -> int:
        """Return the WALL_MASKS bitmask for a single cell dict."""
        value = 0
        for direction, mask in WALL_MASKS.items():
            if cell[direction]:
                value |= mask
        return value

    def generate(
        self, size: int, min_path_len: int = 1, max_attempts: int = 200
    ) -> Tuple[Grid, Tuple[int, int], Tuple[int, int], np.ndarray]:
        """
        Generate a perfect maze and a start/end pair whose shortest path
        length >= *min_path_len*.

        The path length is measured in cells (``len(path)``), i.e. number of
        moves + 1. A fresh maze is carved on every attempt.

        Args:
            size: Maze dimension n (the maze is n×n). Must be at least 2 so
                that two distinct cells can be picked.
            min_path_len: Minimum number of cells on the solution path.
            max_attempts: How many maze/endpoint draws to try.

        Returns:
            (grid, start, end, path) where path is an (L, 2) int array.

        Raises:
            ValueError: If *size* < 2.
            RuntimeError: If no suitable maze was found in *max_attempts* tries.
        """
        if size < 2:
            # Previously surfaced as an opaque ValueError from the random
            # module; same exception type, clearer message.
            raise ValueError(
                f"size must be >= 2 to pick distinct start/end cells, got {size}"
            )

        nodes = [(r, c) for r in range(size) for c in range(size)]
        for _ in range(max_attempts):
            grid = self._gen_dfs(size)
            start, end = random.sample(nodes, 2)
            path = self.solve_bfs(grid, start, end)
            if path is not None and len(path) >= min_path_len:
                return grid, tuple(start), tuple(end), path
        raise RuntimeError(
            f"Failed to generate maze (size={size}, min_path_len={min_path_len}) "
            f"after {max_attempts} attempts."
        )

    def _gen_dfs(self, n: int) -> Grid:
        """Randomised DFS (iterative) to carve a perfect maze.

        Starts from a random cell and repeatedly knocks down the wall to a
        random unvisited neighbour, backtracking when none is left.
        """
        grid = self._empty_grid(n)
        visited = [[False] * n for _ in range(n)]
        sr, sc = random.randrange(n), random.randrange(n)
        visited[sr][sc] = True
        stack = [(sr, sc)]
        while stack:
            r, c = stack[-1]
            nbrs = []
            for d, (dr, dc) in NEIGHBORS.items():
                nr, nc = r + dr, c + dc
                if 0 <= nr < n and 0 <= nc < n and not visited[nr][nc]:
                    nbrs.append((d, nr, nc))
            if nbrs:
                d, nr, nc = random.choice(nbrs)
                self._remove_wall(grid, r, c, d)
                visited[nr][nc] = True
                stack.append((nr, nc))
            else:
                stack.pop()
        return grid

    # ==================== Solving (BFS) ====================

    def solve_bfs(
        self, grid: Grid, start: Tuple[int, int], end: Tuple[int, int]
    ) -> Optional[np.ndarray]:
        """BFS shortest path. Returns (L,2) int ndarray or None.

        Args:
            grid: The maze grid (assumed square: len(grid) bounds both axes).
            start: (row, col) tuple to start from.
            end: (row, col) tuple to reach.

        Returns:
            The cells from *start* to *end* inclusive, or None when *end* is
            unreachable.
        """
        n = len(grid)
        # Predecessor map doubles as the visited set; storing one parent per
        # cell avoids copying a growing path list into every queue entry.
        parent: Dict[Tuple[int, int], Optional[Tuple[int, int]]] = {start: None}
        queue: deque = deque([start])
        while queue:
            r, c = queue.popleft()
            if (r, c) == end:
                cells = []
                node: Optional[Tuple[int, int]] = (r, c)
                while node is not None:
                    cells.append(node)
                    node = parent[node]
                cells.reverse()
                return np.array(cells, dtype=int)
            cell = grid[r][c]
            for d, (dr, dc) in NEIGHBORS.items():
                nr, nc = r + dr, c + dc
                if (
                    0 <= nr < n
                    and 0 <= nc < n
                    and not cell[d]
                    and (nr, nc) not in parent
                ):
                    parent[(nr, nc)] = (r, c)
                    queue.append((nr, nc))
        return None

    # ==================== Path ↔ UDRL ====================

    @staticmethod
    def path_to_udrl(path) -> str:
        """Convert coordinate path to UDRL string.

        Each consecutive pair of (row, col) cells yields one letter: a row
        change maps to U/D, otherwise a column decrease maps to L and
        anything else to R.
        """
        moves = []
        for (r1, c1), (r2, c2) in zip(path, path[1:]):
            if r2 < r1:
                moves.append("U")
            elif r2 > r1:
                moves.append("D")
            elif c2 < c1:
                moves.append("L")
            else:
                moves.append("R")
        return "".join(moves)

    # ==================== Verification ====================

    def verify_path(
        self, grid: Grid, start: Tuple, end: Tuple, udrl: str, strict: bool = True
    ) -> bool:
        """Verify that *udrl* is a wall-respecting walk from *start* to *end*.

        Args:
            grid: The maze grid.
            start: (row, col) the walk begins at.
            end: (row, col) the walk must finish on.
            udrl: Action string. Characters other than U/D/L/R (commas,
                spaces, anything else) are ignored.
            strict: When True the walk must finish exactly on *end* after
                the last action. When False the walk stops as soon as *end*
                is reached and any remaining actions are ignored.

        Returns:
            True if no move crosses a wall or leaves the grid and the walk
            ends on *end*.
        """
        n = len(grid)
        r, c = start
        for ch in udrl:
            if ch not in MOVES:
                # Separators and stray characters are tolerated.
                continue
            dr, dc, wall = MOVES[ch]
            if grid[r][c][wall]:
                return False
            nr, nc = r + dr, c + dc
            if not (0 <= nr < n and 0 <= nc < n):
                return False
            r, c = nr, nc
            if not strict and (r, c) == end:
                break
        return (r, c) == end

    # ==================== Text Encoding ====================

    def encode_grid(self, grid: Grid) -> str:
        """Encode grid to compact bitmask string (one int per cell, row-major).

        Cells in a row are space-separated; rows are newline-separated.
        """
        return "\n".join(
            " ".join(str(self._cell_mask(cell)) for cell in row) for row in grid
        )

    def decode_grid(self, text_lines: List[str]) -> Grid:
        """Decode bitmask text lines back to grid dicts.

        Raises:
            ValueError: If a token is not an integer.
        """
        return [
            [
                {d: bool(int(token) & mask) for d, mask in WALL_MASKS.items()}
                for token in line.split()
            ]
            for line in text_lines
        ]

    def save_text(self, filepath, grid: Grid, start: Tuple, end: Tuple) -> None:
        """Save maze to compact text file.

        Layout: size, "start_r start_c", "end_r end_c", then the rows
        produced by encode_grid().
        """
        n = len(grid)
        with open(filepath, "w", encoding="utf-8") as f:
            f.write(f"{n}\n{start[0]} {start[1]}\n{end[0]} {end[1]}\n")
            f.write(self.encode_grid(grid) + "\n")

    def load_text(self, filepath) -> Optional[Dict]:
        """
        Load maze from text file.

        Blank lines are ignored. Row widths are not validated.

        Returns dict with keys:
            size, start, end, grid (dict-based), grid_raw (list[list[int]] bitmask).
            None on failure.
        """
        # Deliberately best-effort: any I/O or parse problem maps to None,
        # which is the documented contract callers rely on.
        try:
            with open(filepath, encoding="utf-8") as f:
                lines = [ln.strip() for ln in f if ln.strip()]
            n = int(lines[0])
            sr, sc = map(int, lines[1].split())
            er, ec = map(int, lines[2].split())
            row_lines = lines[3 : 3 + n]
            if len(row_lines) < n:
                # Truncated file: fewer grid rows than the declared size.
                return None
            grid = self.decode_grid(row_lines)
            grid_raw: List[List[int]] = [
                list(map(int, ln.split())) for ln in row_lines
            ]
            return {
                "size": n,
                "start": (sr, sc),
                "end": (er, ec),
                "grid": grid,
                "grid_raw": grid_raw,
            }
        except Exception:
            return None

    def fingerprint(self, grid: Grid, start: Tuple, end: Tuple) -> str:
        """Content fingerprint for deduplication.

        A "|"-joined string: a header "n,start_r,start_c,end_r,end_c"
        followed by every cell's wall bitmask in row-major order.
        """
        n = len(grid)
        parts = [f"{n},{start[0]},{start[1]},{end[0]},{end[1]}"]
        parts.extend(str(self._cell_mask(cell)) for row in grid for cell in row)
        return "|".join(parts)

    # ==================== Image Rendering ====================

    def _layout(self, n: int):
        """Compute rendering layout parameters.

        Returns:
            (cell_f, wall_f, half_f, grid_w): float cell pitch in pixels,
            wall thickness (a quarter of the pitch), half that thickness,
            and the integer width of the faint grid lines (at least 1).
        """
        cell_f = float(self.img_size) / n
        wall_f = cell_f / 4.0
        half_f = wall_f / 2.0
        grid_w = max(1, int(cell_f / 16.0))
        return cell_f, wall_f, half_f, grid_w

    def render(
        self,
        grid: Grid,
        start: Tuple[int, int],
        end: Tuple[int, int],
        path: Optional[np.ndarray] = None,
        path_steps: Optional[int] = None,
    ) -> Image.Image:
        """
        Render maze as a PIL image.

        Args:
            grid: The maze grid.
            start, end: Coordinates of start/end cells.
            path: Full solution path (L, 2).
            path_steps: Draw only the first *path_steps* segments (for video).

        Returns:
            PIL.Image (RGB, img_size × img_size).
        """
        n = len(grid)
        cell_f, wall_f, half_f, grid_w = self._layout(n)
        img = Image.new("RGB", (self.img_size, self.img_size), self.bg_color)
        draw = ImageDraw.Draw(img)

        # --- fill cells & open passages ---
        # Each cell is inset by half a wall thickness; an open S/E passage
        # is painted over the gap to the neighbouring cell.
        for r in range(n):
            for c in range(n):
                x1 = c * cell_f + half_f
                y1 = r * cell_f + half_f
                x2 = (c + 1) * cell_f - half_f
                y2 = (r + 1) * cell_f - half_f
                draw.rectangle([(x1, y1), (x2, y2)], fill=self.cell_color)
                cell = grid[r][c]
                if not cell["S"] and r < n - 1:
                    draw.rectangle(
                        [(x1, y2), (x2, y2 + wall_f)], fill=self.cell_color
                    )
                if not cell["E"] and c < n - 1:
                    draw.rectangle(
                        [(x2, y1), (x2 + wall_f, y2)], fill=self.cell_color
                    )

        # --- subtle grid lines on open passages ---
        # Drawn in a second pass so later cell fills cannot paint over them.
        for r in range(n):
            for c in range(n):
                if r < n - 1 and not grid[r][c]["S"]:
                    y = (r + 1) * cell_f
                    draw.line(
                        [(c * cell_f + half_f, y), ((c + 1) * cell_f - half_f, y)],
                        fill=self.grid_color,
                        width=grid_w,
                    )
                if c < n - 1 and not grid[r][c]["E"]:
                    x = (c + 1) * cell_f
                    draw.line(
                        [(x, r * cell_f + half_f), (x, (r + 1) * cell_f - half_f)],
                        fill=self.grid_color,
                        width=grid_w,
                    )

        # --- start / end dots ---
        def _dot(rc, color):
            """Draw a filled circle centred on cell *rc*."""
            rr, cc = rc
            cx = cc * cell_f + cell_f / 2
            cy = rr * cell_f + cell_f / 2
            rad = max(2, int((cell_f - wall_f) * 0.25))
            draw.ellipse([cx - rad, cy - rad, cx + rad, cy + rad], fill=color)

        _dot(start, self.start_color)
        _dot(end, self.end_color)

        # --- solution path (optionally partial) ---
        if path is not None and len(path) >= 2:
            # k segments need k + 1 points.
            end_idx = (
                len(path) if path_steps is None else min(path_steps + 1, len(path))
            )
            if end_idx >= 2:
                pts = [
                    (c * cell_f + cell_f / 2, r * cell_f + cell_f / 2)
                    for r, c in path[:end_idx]
                ]
                draw.line(
                    pts,
                    fill=self.path_color,
                    width=max(1, int(wall_f)),
                    joint="curve",
                )

        return img

    # ==================== Video Frame Generation ====================

    def generate_video_frames(
        self,
        grid: Grid,
        start: Tuple[int, int],
        end: Tuple[int, int],
        path: np.ndarray,
        n_start: int = 5,
        m_end: int = 5,
        frames: Optional[int] = None,
    ) -> List[Image.Image]:
        """
        Generate progressive video frames showing the red line growing.

        *frames* controls the number of **content frames** between holds:
          - None           → 1 per path step
          - frames > steps → slow-motion
          - frames < steps → fast-forward

        Total length = n_start + content_frames + m_end.

        A path with no steps (fewer than two cells) has nothing to animate:
        every frame is the unsolved maze, with one content frame when
        *frames* is None. All returned images are independent objects.

        Args:
            grid: The maze grid.
            start, end: Coordinates of start/end cells.
            path: Solution path, (L, 2).
            n_start: Number of opening hold frames (maze without path).
            m_end: Number of closing hold frames (maze with full path).
            frames: Requested content-frame count; values below 1 are
                treated as 1.

        Returns:
            List of PIL images in playback order.
        """
        n_steps = len(path) - 1
        blank = self.render(grid, start, end)

        if n_steps <= 0:
            held = 1 if frames is None else max(1, frames)
            # Copies rather than one shared object, so callers may mutate
            # individual frames safely.
            return [blank.copy() for _ in range(n_start + held + m_end)]

        content_frames = max(1, frames if frames is not None else n_steps)

        # Opening hold
        result: List[Image.Image] = [blank.copy() for _ in range(n_start)]

        # Content frames
        if content_frames > n_steps:
            # Slow-motion: step s owns frames [f_lo, f_hi); render it once
            # and duplicate it for the rest of its share.
            for step in range(1, n_steps + 1):
                f_lo = (step - 1) * content_frames // n_steps
                f_hi = step * content_frames // n_steps
                frame_img = self.render(
                    grid, start, end, path=path, path_steps=step
                )
                result.append(frame_img)
                result.extend(frame_img.copy() for _ in range(f_hi - f_lo - 1))
        else:
            # Real time or fast-forward: frame f shows the path up to an
            # evenly spaced step; the final frame always shows all steps.
            for f in range(content_frames):
                step = (f + 1) * n_steps // content_frames
                result.append(
                    self.render(grid, start, end, path=path, path_steps=step)
                )

        # Closing hold
        final = self.render(grid, start, end, path=path)
        result.extend(final.copy() for _ in range(m_end))
        return result

    # ==================== Red-Path Extraction ====================

    def extract_path_from_pixels(
        self,
        pixels: np.ndarray,
        grid_raw: List[List[int]],
        size: int,
        start: Tuple[int, int],
        pixel_threshold: float = 0.01,
    ) -> str:
        """
        Detect red path in an RGB pixel array and return UDRL.

        Uses **floating-point** cell boundaries matching the renderer to
        avoid misalignment on sizes that don't evenly divide the image
        (e.g. 24, 48).

        The walk is greedy: from *start* it repeatedly steps to the first
        unvisited, red-marked neighbour (tried in R, D, L, U order) that is
        not blocked by a wall, and stops when none exists.

        Args:
            pixels: (H, W, 3) uint8 RGB array.
            grid_raw: Bitmask grid as list[list[int]].
            size: Maze dimension n.
            start: Start coordinate (r, c).
            pixel_threshold: Min red-pixel fraction to mark a cell.

        Returns:
            UDRL action string.
        """
        px = np.asarray(pixels, dtype=float)
        h, w = px.shape[0], px.shape[1]
        r_ch, g_ch, b_ch = px[:, :, 0], px[:, :, 1], px[:, :, 2]
        # "Red" = reasonably bright and clearly dominant over green and blue.
        red_mask = (r_ch > 100) & (r_ch > g_ch * 1.2) & (r_ch > b_ch * 1.2)

        # Use FLOAT cell size to match render() coordinate system exactly.
        # Integer division (h // size) drifts by up to (size-1) * fractional
        # pixels, causing the last cells to be completely misaligned.
        cell_h_f = h / size
        cell_w_f = w / size

        path_grid = np.zeros((size, size), dtype=bool)
        for r in range(size):
            y0 = int(round(r * cell_h_f))
            y1 = int(round((r + 1) * cell_h_f))
            for c in range(size):
                x0 = int(round(c * cell_w_f))
                x1 = int(round((c + 1) * cell_w_f))
                # Small inward margin to avoid wall / neighbour bleed-over
                margin_y = max(1, int((y1 - y0) * 0.15))
                margin_x = max(1, int((x1 - x0) * 0.15))
                sub = red_mask[
                    y0 + margin_y : y1 - margin_y, x0 + margin_x : x1 - margin_x
                ]
                if sub.size > 0 and np.mean(sub) > pixel_threshold:
                    path_grid[r, c] = True

        # Greedy walk from start, respecting maze walls
        directions = [
            ("R", MOVES["R"]),
            ("D", MOVES["D"]),
            ("L", MOVES["L"]),
            ("U", MOVES["U"]),
        ]
        visited = {start}
        cr, cc = start
        actions: List[str] = []
        # Upper bound on iterations; the visited set already guarantees
        # termination, this is a belt-and-braces cap.
        for _ in range(size * size * 2):
            found = False
            wval = grid_raw[cr][cc]
            for act, (dr, dc, wall_ch) in directions:
                nr, nc = cr + dr, cc + dc
                if 0 <= nr < size and 0 <= nc < size:
                    if (wval & WALL_MASKS[wall_ch]) != 0:
                        continue
                    if path_grid[nr, nc] and (nr, nc) not in visited:
                        visited.add((nr, nc))
                        actions.append(act)
                        cr, cc = nr, nc
                        found = True
                        break
            if not found:
                break
        return "".join(actions)

    def extract_path_from_image(
        self, img_path: str, grid_raw: List[List[int]], size: int, start: Tuple
    ) -> str:
        """Extract UDRL from an image file (convenience wrapper).

        Best-effort: returns an empty string if the image cannot be read or
        the extraction fails for any reason.
        """
        try:
            # Context manager releases the file handle promptly.
            with Image.open(img_path) as im:
                pixels = np.array(im.convert("RGB"))
            return self.extract_path_from_pixels(pixels, grid_raw, size, start)
        except Exception:
            return ""


def _smoke_test() -> None:
    """Quick end-to-end check: generate, verify, render, and build frames.

    Writes test_maze.png and test_maze_solution.png to the working directory.
    """
    proc = MazeProcessor(img_size=512)

    # Quick smoke test
    grid, start, end, path = proc.generate(size=8, min_path_len=10)
    n_steps = len(path) - 1
    print(f"Maze 8×8 | path length {len(path)} | steps {n_steps}")
    print(f"UDRL: {proc.path_to_udrl(path)}")
    print(f"Verify: {proc.verify_path(grid, start, end, proc.path_to_udrl(path))}")

    proc.render(grid, start, end).save("test_maze.png")
    proc.render(grid, start, end, path=path).save("test_maze_solution.png")

    # Test video frame modes
    f1 = proc.generate_video_frames(grid, start, end, path, n_start=3, m_end=3)
    assert len(f1) == 3 + n_steps + 3

    f2 = proc.generate_video_frames(
        grid, start, end, path, n_start=3, m_end=3, frames=n_steps * 3
    )
    assert len(f2) == 3 + n_steps * 3 + 3

    half = max(1, n_steps // 2)
    f3 = proc.generate_video_frames(
        grid, start, end, path, n_start=3, m_end=3, frames=half
    )
    assert len(f3) == 3 + half + 3

    print(f"frames=None → {len(f1)} total ({n_steps} content)")
    print(f"frames={n_steps*3:<4d} → {len(f2)} total (slow-mo)")
    print(f"frames={half:<4d} → {len(f3)} total (fast-fwd)")
    print("All assertions passed ✓")


if __name__ == "__main__":
    _smoke_test()