# Source: engine/search.py (commit 33b3ec2)
"""
Nexus-Nano Search Engine
Fast alpha-beta with minimal overhead
Focus: Speed > Depth
Target: Sub-second responses
"""
import chess
import logging
from typing import Optional, Tuple, List, Dict
from .evaluate import NexusNanoEvaluator
from .transposition import TranspositionTable, NodeType
from .move_ordering import MoveOrderer
from .time_manager import TimeManager
from .endgame import EndgameDetector
logger = logging.getLogger(__name__)
class NexusNanoEngine:
    """Ultra-fast 2.8M parameter chess engine.

    Negamax alpha-beta search with a transposition table, iterative
    deepening, capture-only quiescence, and heuristic move ordering.
    Tuned for speed over depth (sub-second responses).
    """

    MATE_SCORE = 100000  # magnitude (centipawns) used for checkmate scores
    MAX_PLY = 100        # hard ceiling on search depth

    def __init__(self, model_path: str, num_threads: int = 1):
        """Initialize with single-threaded config.

        Args:
            model_path: Path to the evaluation model.
            num_threads: Evaluator thread count (default 1).
        """
        self.evaluator = NexusNanoEvaluator(model_path, num_threads)
        self.tt = TranspositionTable(size_mb=64)  # 64MB TT
        self.move_orderer = MoveOrderer()
        self.time_manager = TimeManager()
        self.endgame_detector = EndgameDetector()
        # Per-search statistics (reset in get_best_move)
        self.nodes_evaluated = 0
        self.depth_reached = 0
        self.sel_depth = 0
        self.principal_variation: List[chess.Move] = []
        logger.info("⚡ Nexus-Nano Engine initialized")
        # Lazy %-formatting: args are only rendered if the level is enabled
        logger.info(" Model: %.2f MB", self.evaluator.get_model_size_mb())
        logger.info(" TT: 64 MB")

    def get_best_move(
        self,
        fen: str,
        depth: int = 4,
        time_limit: int = 2000
    ) -> Dict:
        """Search a position and return the best move plus statistics.

        Args:
            fen: Position in FEN notation.
            depth: Max iterative-deepening depth (1-6 recommended).
            time_limit: Soft time budget in milliseconds.

        Returns:
            Dict with 'best_move' (UCI), 'evaluation' (pawns), depth/node/
            time statistics, the principal variation, and TT/ordering stats.
        """
        board = chess.Board(fen)
        # Reset per-search statistics
        self.nodes_evaluated = 0
        self.depth_reached = 0
        self.sel_depth = 0
        self.principal_variation = []
        # Time setup (ms -> s)
        time_limit_sec = time_limit / 1000.0
        self.time_manager.start_search(time_limit_sec, time_limit_sec)
        # Clear stale ordering heuristics; age the TT so old entries can
        # be preferentially replaced rather than wiped
        self.move_orderer.clear()
        self.tt.increment_age()
        # Special cases: no moves (mate/stalemate) or a forced move
        legal_moves = list(board.legal_moves)
        if not legal_moves:
            return self._no_legal_moves()
        if len(legal_moves) == 1:
            return self._single_move(board, legal_moves[0])
        # Iterative deepening: each completed depth refines the answer and
        # seeds move ordering (via the TT) for the next, deeper pass
        best_move = legal_moves[0]
        best_score = float('-inf')
        for current_depth in range(1, depth + 1):
            if self.time_manager.should_stop(current_depth):
                break
            score, move, pv = self._search_root(
                board, current_depth, float('-inf'), float('inf')
            )
            if move:
                best_move = move
                best_score = score
                self.depth_reached = current_depth
                self.principal_variation = pv
        # Single elapsed() sample so 'time_taken' and 'nps' agree
        # (the old code sampled twice and could report inconsistent values)
        elapsed = self.time_manager.elapsed()
        return {
            'best_move': best_move.uci(),
            'evaluation': round(best_score / 100.0, 2),
            'depth_searched': self.depth_reached,
            'seldepth': self.sel_depth,
            'nodes_evaluated': self.nodes_evaluated,
            'time_taken': int(elapsed * 1000),
            'pv': [m.uci() for m in self.principal_variation],
            'nps': int(self.nodes_evaluated / max(elapsed, 0.001)),
            'tt_stats': self.tt.get_stats(),
            'move_ordering_stats': self.move_orderer.get_stats()
        }

    def _search_root(
        self,
        board: chess.Board,
        depth: int,
        alpha: float,
        beta: float
    ) -> Tuple[float, Optional[chess.Move], List[chess.Move]]:
        """Search all root moves at the given depth.

        Returns:
            (best score in centipawns, best move, principal variation).
        """
        legal_moves = list(board.legal_moves)
        # TT probe: at the root only the stored move is used (for ordering);
        # a score cutoff here would hide the PV
        zobrist_key = self.tt.compute_zobrist_key(board)
        tt_result = self.tt.probe(zobrist_key, depth, alpha, beta)
        tt_move = tt_result[1] if tt_result else None
        ordered_moves = self.move_orderer.order_moves(
            board, legal_moves, depth, tt_move
        )
        best_move = ordered_moves[0]
        best_score = float('-inf')
        best_pv: List[chess.Move] = []
        for move in ordered_moves:
            board.push(move)
            # Negamax: the child's score is from the opponent's perspective
            score, pv = self._alpha_beta(board, depth - 1, -beta, -alpha, 1)
            score = -score
            board.pop()
            if score > best_score:
                best_score = score
                best_move = move
                best_pv = [move] + pv
            if score > alpha:
                alpha = score
            # Time check after each root move; the best-so-far is kept
            if self.time_manager.should_stop(depth):
                break
        self.tt.store(zobrist_key, depth, best_score, NodeType.EXACT, best_move)
        return best_score, best_move, best_pv

    def _alpha_beta(
        self,
        board: chess.Board,
        depth: int,
        alpha: float,
        beta: float,
        ply: int = 1
    ) -> Tuple[float, List[chess.Move]]:
        """Negamax alpha-beta search.

        Args:
            depth: Remaining search depth; <= 0 drops into quiescence.
            ply: Distance from the root. BUG FIX: the old seldepth formula
                (MAX_PLY - depth) reported ~MAX_PLY regardless of how deep
                the search actually went; ply measures real distance.

        Returns:
            (score in centipawns from the side to move, principal variation).
        """
        self.sel_depth = max(self.sel_depth, ply)
        # Draw by repetition or fifty-move rule
        if board.is_repetition(2) or board.is_fifty_moves():
            return 0, []
        # TT probe: a sufficient-depth hit returns immediately
        zobrist_key = self.tt.compute_zobrist_key(board)
        tt_result = self.tt.probe(zobrist_key, depth, alpha, beta)
        if tt_result and tt_result[0] is not None:
            return tt_result[0], []
        tt_move = tt_result[1] if tt_result else None
        # Horizon: resolve hanging captures before trusting the static eval
        if depth <= 0:
            return self._quiescence(board, alpha, beta, 0, ply), []
        legal_moves = list(board.legal_moves)
        if not legal_moves:
            if board.is_check():
                # Checkmate. Penalize by ply so shorter mates score higher
                # and the score is stable across iterative-deepening passes
                # (the old depth-based penalty shifted between iterations)
                return -self.MATE_SCORE + ply, []
            return 0, []  # Stalemate
        ordered_moves = self.move_orderer.order_moves(
            board, legal_moves, depth, tt_move
        )
        best_score = float('-inf')
        best_pv: List[chess.Move] = []
        node_type = NodeType.UPPER_BOUND  # fail-low until alpha is raised
        for move in ordered_moves:
            board.push(move)
            score, pv = self._alpha_beta(board, depth - 1, -beta, -alpha, ply + 1)
            score = -score
            board.pop()
            if score > best_score:
                best_score = score
                best_pv = [move] + pv
            if score > alpha:
                alpha = score
                node_type = NodeType.EXACT
            if score >= beta:
                # Fail-high. Record quiet refutations as killers here —
                # killers are defined by causing a cutoff, not merely by
                # raising alpha (moved from the alpha-raise branch)
                if not board.is_capture(move):
                    self.move_orderer.update_killer_move(move, depth)
                node_type = NodeType.LOWER_BOUND
                break
        self.tt.store(zobrist_key, depth, best_score, node_type,
                      best_pv[0] if best_pv else None)
        return best_score, best_pv

    def _quiescence(
        self,
        board: chess.Board,
        alpha: float,
        beta: float,
        qs_depth: int,
        ply: int = 0
    ) -> float:
        """Fail-hard quiescence search over captures only.

        Args:
            qs_depth: Recursion depth within quiescence (capped at 6).
            ply: Distance from the root at quiescence entry, so seldepth
                reflects the true deepest line reached.

        Returns:
            Score in centipawns from the side to move.
        """
        self.nodes_evaluated += 1
        self.sel_depth = max(self.sel_depth, ply + qs_depth)
        # Stand-pat: the side to move can usually do at least as well as
        # the static evaluation by declining all captures
        stand_pat = self.evaluator.evaluate_hybrid(board)
        stand_pat = self.endgame_detector.adjust_evaluation(board, stand_pat)
        if stand_pat >= beta:
            return beta
        if alpha < stand_pat:
            alpha = stand_pat
        # Hard cap to contain capture-chain explosion
        if qs_depth >= 6:
            return stand_pat
        # Captures only (checks skipped for speed)
        captures = [m for m in board.legal_moves if board.is_capture(m)]
        if not captures:
            return stand_pat
        captures = self.move_orderer.order_moves(board, captures, 0)
        for move in captures:
            board.push(move)
            score = -self._quiescence(board, -beta, -alpha, qs_depth + 1, ply + 1)
            board.pop()
            if score >= beta:
                return beta
            if score > alpha:
                alpha = score
        return alpha

    def _no_legal_moves(self) -> Dict:
        """Result dict for a position that is already mate/stalemate.

        Includes the same keys as the normal search result so callers
        can index it uniformly (the old dict omitted pv/seldepth/nps).
        """
        return {
            'best_move': '0000',  # UCI null move
            'evaluation': 0.0,
            'depth_searched': 0,
            'seldepth': 0,
            'nodes_evaluated': 0,
            'time_taken': 0,
            'pv': [],
            'nps': 0
        }

    def _single_move(self, board: chess.Board, move: chess.Move) -> Dict:
        """Shortcut for forced moves: evaluate once, skip the search."""
        eval_score = self.evaluator.evaluate_hybrid(board)
        return {
            'best_move': move.uci(),
            'evaluation': round(eval_score / 100.0, 2),
            'depth_searched': 0,
            'seldepth': 0,
            'nodes_evaluated': 1,
            'time_taken': 0,
            'pv': [move.uci()],
            'nps': 0
        }

    def validate_fen(self, fen: str) -> bool:
        """Return True if *fen* parses as a valid chess position.

        BUG FIX: narrowed the bare ``except`` (which also swallowed
        KeyboardInterrupt/SystemExit) to ValueError, which is what
        chess.Board raises on a malformed FEN.
        """
        try:
            chess.Board(fen)
            return True
        except ValueError:
            return False

    def get_model_size(self) -> float:
        """Evaluation model size in megabytes."""
        return self.evaluator.get_model_size_mb()