""" Nexus-Nano Search Engine Fast alpha-beta with minimal overhead Focus: Speed > Depth Target: Sub-second responses """ import chess import logging from typing import Optional, Tuple, List, Dict from .evaluate import NexusNanoEvaluator from .transposition import TranspositionTable, NodeType from .move_ordering import MoveOrderer from .time_manager import TimeManager from .endgame import EndgameDetector logger = logging.getLogger(__name__) class NexusNanoEngine: """Ultra-fast 2.8M parameter chess engine""" MATE_SCORE = 100000 MAX_PLY = 100 def __init__(self, model_path: str, num_threads: int = 1): """Initialize with single-threaded config""" self.evaluator = NexusNanoEvaluator(model_path, num_threads) self.tt = TranspositionTable(size_mb=64) # 64MB TT self.move_orderer = MoveOrderer() self.time_manager = TimeManager() self.endgame_detector = EndgameDetector() self.nodes_evaluated = 0 self.depth_reached = 0 self.sel_depth = 0 self.principal_variation = [] logger.info("⚡ Nexus-Nano Engine initialized") logger.info(f" Model: {self.evaluator.get_model_size_mb():.2f} MB") logger.info(f" TT: 64 MB") def get_best_move( self, fen: str, depth: int = 4, time_limit: int = 2000 ) -> Dict: """ Fast move search Args: fen: Position depth: Max depth (1-6 recommended) time_limit: Time in ms """ board = chess.Board(fen) # Reset self.nodes_evaluated = 0 self.depth_reached = 0 self.sel_depth = 0 self.principal_variation = [] # Time setup time_limit_sec = time_limit / 1000.0 self.time_manager.start_search(time_limit_sec, time_limit_sec) # Clear old data self.move_orderer.clear() self.tt.increment_age() # Special cases legal_moves = list(board.legal_moves) if len(legal_moves) == 0: return self._no_legal_moves() if len(legal_moves) == 1: return self._single_move(board, legal_moves[0]) # Iterative deepening (fast) best_move = legal_moves[0] best_score = float('-inf') for current_depth in range(1, depth + 1): if self.time_manager.should_stop(current_depth): break score, move, pv = self._search_root( board, current_depth, float('-inf'), float('inf') ) if move: best_move = move best_score = score self.depth_reached = current_depth self.principal_variation = pv return { 'best_move': best_move.uci(), 'evaluation': round(best_score / 100.0, 2), 'depth_searched': self.depth_reached, 'seldepth': self.sel_depth, 'nodes_evaluated': self.nodes_evaluated, 'time_taken': int(self.time_manager.elapsed() * 1000), 'pv': [m.uci() for m in self.principal_variation], 'nps': int(self.nodes_evaluated / max(self.time_manager.elapsed(), 0.001)), 'tt_stats': self.tt.get_stats(), 'move_ordering_stats': self.move_orderer.get_stats() } def _search_root( self, board: chess.Board, depth: int, alpha: float, beta: float ) -> Tuple[float, Optional[chess.Move], List[chess.Move]]: """Root search""" legal_moves = list(board.legal_moves) # TT probe zobrist_key = self.tt.compute_zobrist_key(board) tt_result = self.tt.probe(zobrist_key, depth, alpha, beta) tt_move = tt_result[1] if tt_result else None # Order moves ordered_moves = self.move_orderer.order_moves( board, legal_moves, depth, tt_move ) best_move = ordered_moves[0] best_score = float('-inf') best_pv = [] for move in ordered_moves: board.push(move) score, pv = self._alpha_beta(board, depth - 1, -beta, -alpha) score = -score board.pop() if score > best_score: best_score = score best_move = move best_pv = [move] + pv if score > alpha: alpha = score if self.time_manager.should_stop(depth): break self.tt.store(zobrist_key, depth, best_score, NodeType.EXACT, best_move) return best_score, best_move, best_pv def _alpha_beta( self, board: chess.Board, depth: int, alpha: float, beta: float ) -> Tuple[float, List[chess.Move]]: """Fast alpha-beta search""" self.sel_depth = max(self.sel_depth, self.MAX_PLY - depth) # Draw check if board.is_repetition(2) or board.is_fifty_moves(): return 0, [] # TT probe zobrist_key = self.tt.compute_zobrist_key(board) tt_result = self.tt.probe(zobrist_key, depth, alpha, beta) if tt_result and tt_result[0] is not None: return tt_result[0], [] tt_move = tt_result[1] if tt_result else None # Quiescence if depth <= 0: return self._quiescence(board, alpha, beta, 0), [] # Legal moves legal_moves = list(board.legal_moves) if not legal_moves: if board.is_check(): return -self.MATE_SCORE + (self.MAX_PLY - depth), [] return 0, [] ordered_moves = self.move_orderer.order_moves( board, legal_moves, depth, tt_move ) # Search best_score = float('-inf') best_pv = [] node_type = NodeType.UPPER_BOUND for move in ordered_moves: board.push(move) score, pv = self._alpha_beta(board, depth - 1, -beta, -alpha) score = -score board.pop() if score > best_score: best_score = score best_pv = [move] + pv if score > alpha: alpha = score node_type = NodeType.EXACT if not board.is_capture(move): self.move_orderer.update_killer_move(move, depth) if score >= beta: node_type = NodeType.LOWER_BOUND break self.tt.store(zobrist_key, depth, best_score, node_type, best_pv[0] if best_pv else None) return best_score, best_pv def _quiescence( self, board: chess.Board, alpha: float, beta: float, qs_depth: int ) -> float: """Fast quiescence (captures only)""" self.nodes_evaluated += 1 # Stand-pat stand_pat = self.evaluator.evaluate_hybrid(board) stand_pat = self.endgame_detector.adjust_evaluation(board, stand_pat) if stand_pat >= beta: return beta if alpha < stand_pat: alpha = stand_pat # Depth limit if qs_depth >= 6: return stand_pat # Captures only (no checks for speed) captures = [m for m in board.legal_moves if board.is_capture(m)] if not captures: return stand_pat captures = self.move_orderer.order_moves(board, captures, 0) for move in captures: board.push(move) score = -self._quiescence(board, -beta, -alpha, qs_depth + 1) board.pop() if score >= beta: return beta if score > alpha: alpha = score return alpha def _no_legal_moves(self) -> Dict: return { 'best_move': '0000', 'evaluation': 0.0, 'depth_searched': 0, 'nodes_evaluated': 0, 'time_taken': 0 } def _single_move(self, board: chess.Board, move: chess.Move) -> Dict: eval_score = self.evaluator.evaluate_hybrid(board) return { 'best_move': move.uci(), 'evaluation': round(eval_score / 100.0, 2), 'depth_searched': 0, 'nodes_evaluated': 1, 'time_taken': 0, 'pv': [move.uci()] } def validate_fen(self, fen: str) -> bool: try: chess.Board(fen) return True except: return False def get_model_size(self) -> float: return self.evaluator.get_model_size_mb()