Source code for gomill.gtp_games

"""Run a game between two GTP engines."""

from gomill import __version__
from gomill.utils import *
from gomill.common import *
from gomill import gtp_controller
from gomill import handicap_layout
from gomill import boards
from gomill import sgf
from gomill.gtp_controller import BadGtpResponse, GtpChannelError

[docs]class Game_result(object): """Description of a game result. Public attributes: players -- map colour -> player code player_b -- player code player_w -- player code winning_player -- player code or None losing_player -- player code or None winning_colour -- 'b', 'w', or None losing_colour -- 'b', 'w', or None is_jigo -- bool is_forfeit -- bool sgf_result -- string describing the game's result (for sgf RE) detail -- additional information (string or None) game_id -- string or None cpu_times -- map player code -> float or None or '?'. Winning/losing colour and player are None for a jigo, unknown result, or void game. cpu_times are user time + system time. '?' means that gomill-cpu_time gave an error. Game_results are suitable for pickling. """ def __init__(self, players, winning_colour): self.players = players.copy() self.player_b = players['b'] self.player_w = players['w'] self.winning_colour = winning_colour self.winning_player = players.get(winning_colour) self.is_jigo = False self.is_forfeit = False self.game_id = None if winning_colour is None: self.sgf_result = "?" else: self.sgf_result = "%s+" % winning_colour.upper() self.detail = None self.cpu_times = {self.player_b : None, self.player_w : None} def __getstate__(self): return ( self.player_b, self.player_w, self.winning_colour, self.sgf_result, self.detail, self.is_forfeit, self.game_id, self.cpu_times, ) def __setstate__(self, state): (self.player_b, self.player_w, self.winning_colour, self.sgf_result, self.detail, self.is_forfeit, self.game_id, self.cpu_times, ) = state self.players = {'b' : self.player_b, 'w' : self.player_w} self.winning_player = self.players.get(self.winning_colour) self.is_jigo = (self.sgf_result == "0") def set_jigo(self): self.sgf_result = "0" self.is_jigo = True @property
[docs] def losing_colour(self): if self.winning_colour is None: return None return opponent_of(self.winning_colour)
@property
[docs] def losing_player(self): if self.winning_colour is None: return None return self.players.get(opponent_of(self.winning_colour))
[docs] def describe(self): """Return a short human-readable description of the result.""" if self.winning_colour is not None: s = "%s beat %s " % (self.winning_player, self.losing_player) else: s = "%s vs %s " % (self.players['b'], self.players['w']) if self.is_jigo: s += "jigo" else: s += self.sgf_result if self.detail is not None: s += " (%s)" % self.detail return s
def __repr__(self): return "<Game_result: %s>" % self.describe()
class Game(object): """A single game between two GTP engines. Instantiate with: board_size -- int komi -- float (default 0.0) move_limit -- int (default 1000) The 'commands' values are lists of strings, as for subprocess.Popen. Normal use: game = Game(...) game.set_player_code('b', ...) game.set_player_code('w', ...) game.use_internal_scorer() or game.allow_scorer(...) [optional] game.set_move_callback...() [optional] game.set_player_subprocess('b', ...) or set_player_controller('b', ...) game.set_player_subprocess('w', ...) or set_player_controller('w', ...) game.request_engine_descriptions() [optional] game.ready() game.set_handicap(...) [optional] game.run() game.close_players() game.make_sgf() or game.write_sgf(...) [optional] then retrieve the Game_result and moves. If neither use_internal_scorer() nor allow_scorer() is called, the game won't be scored. Public attributes for reading: players -- map colour -> player code game_id -- string or None result -- Game_result (None before the game is complete) moves -- list of tuples (colour, move, comment) move is a pair (row, col), or None for a pass player_scores -- map player code -> string or None engine_names -- map player code -> string engine_descriptions -- map player code -> string player_scores values are the response to the final_score GTP command (if the player was asked). Methods which communicate with engines may raise BadGtpResponse if the engine returns a failure response. Methods which communicate with engines will normally raise GtpChannelError if there is trouble communicating with the engine. But after the game result has been decided, they will set these errors aside; retrieve them with describe_late_errors(). This enforces a simple ko rule, but no superko rule. It accepts self-capture moves. """ def __init__(self, board_size, komi=0.0, move_limit=1000): self.players = {'b' : 'b', 'w' : 'w'} self.game_id = None self.controllers = {} self.claim_allowed = {'b' : False, 'w' : False} self.after_move_callback = None self.board_size = board_size self.komi = komi self.move_limit = move_limit self.allowed_scorers = [] self.internal_scorer = False self.handicap_compensation = "no" self.handicap = 0 self.first_player = "b" self.engine_names = {} self.engine_descriptions = {} self.moves = [] self.player_scores = {'b' : None, 'w' : None} self.additional_sgf_props = [] self.late_errors = [] self.handicap_stones = None self.result = None self.board = boards.Board(board_size) self.simple_ko_point = None ## Configuration methods (callable before set_player_...) def set_player_code(self, colour, player_code): """Specify a player code. player_code -- short ascii string The player codes are used to identify the players in game results, sgf files, and the error messages. Setting these is optional but strongly encouraged. If not explicitly set, they will just be 'b' and 'w'. Raises ValueError if both players are given the same code. """ s = str(player_code) if self.players[opponent_of(colour)] == s: raise ValueError("player codes must be distinct") self.players[colour] = s def set_game_id(self, game_id): """Specify a game id. game_id -- string The game id is reported in the game result, and used as a default game name in the SGF file. If you don't set it, it will have value None. """ self.game_id = str(game_id) def use_internal_scorer(self, handicap_compensation='no'): """Set the scoring method to internal. The internal scorer uses area score, assuming all stones alive. handicap_compensation -- 'no' (default), 'short', or 'full'. If handicap_compensation is 'full', one point is deducted from Black's score for each handicap stone; if handicap_compensation is 'short', one point is deducted from Black's score for each handicap stone except the first. (The number of handicap stones is taken from the parameter to set_handicap().) """ self.internal_scorer = True if handicap_compensation not in ('no', 'short', 'full'): raise ValueError("bad handicap_compensation value: %s" % handicap_compensation) self.handicap_compensation = handicap_compensation def allow_scorer(self, colour): """Allow the specified player to score the game. If this is called for both colours, both are asked to score. """ self.allowed_scorers.append(colour) def set_claim_allowed(self, colour, b=True): """Allow the specified player to claim a win. This will have no effect if the engine doesn't implement gomill-genmove_ex. """ self.claim_allowed[colour] = bool(b) def set_move_callback(self, fn): """Specify a callback function to be called after every move. This function is called after each move is played, including passes but not resignations, and not moves which triggered a forfeit. It is passed three parameters: colour, move, board move is a pair (row, col), or None for a pass Treat the board parameter as read-only. Exceptions raised from the callback will be propagated unchanged out of run(). """ self.after_move_callback = fn ## Channel methods def set_player_controller(self, colour, controller, check_protocol_version=True): """Specify a player using a Gtp_controller. controller -- Gtp_controller check_protocol_version -- bool (default True) By convention, the channel name should be 'player <player code>'. If check_protocol_version is true, rejects an engine that declares a GTP protocol version <> 2. Propagates GtpChannelError if there's an error checking the protocol version. """ self.controllers[colour] = controller if check_protocol_version: controller.check_protocol_version() def set_player_subprocess(self, colour, command, check_protocol_version=True, **kwargs): """Specify the a player as a subprocess. command -- list of strings (as for subprocess.Popen) check_protocol_version -- bool (default True) Additional keyword arguments are passed to the Subprocess_gtp_channel constructor. If check_protocol_version is true, rejects an engine that declares a GTP protocol version <> 2. Propagates GtpChannelError if there's an error creating the subprocess or checking the protocol version. """ try: channel = gtp_controller.Subprocess_gtp_channel(command, **kwargs) except GtpChannelError, e: raise GtpChannelError( "error starting subprocess for player %s:\n%s" % (self.players[colour], e)) controller = gtp_controller.Gtp_controller( channel, "player %s" % self.players[colour]) self.set_player_controller(colour, controller, check_protocol_version) def get_controller(self, colour): """Return the underlying Gtp_controller for the specified engine.""" return self.controllers[colour] def send_command(self, colour, command, *arguments): """Send the specified GTP command to one of the players. colour -- player to talk to ('b' or 'w') command -- gtp command name (string) arguments -- gtp arguments (strings) Returns the response as a string. Raises BadGtpResponse if the engine returns a failure response. You can use this at any time between set_player_...() and close_players(). """ return self.controllers[colour].do_command(command, *arguments) def maybe_send_command(self, colour, command, *arguments): """Send the specified GTP command, if supported. Variant of send_command(): if the command isn't supported by the engine, or gives a failure response, returns None. """ controller = self.controllers[colour] if controller.known_command(command): try: result = controller.do_command(command, *arguments) except BadGtpResponse: result = None else: result = None return result def known_command(self, colour, command): """Check whether the specified GTP command is supported.""" return self.controllers[colour].known_command(command) def close_players(self): """Close both controllers (if they're open). Retrieves the late errors for describe_late_errors(). If cpu times are not already set in the game result, sets them from the CPU usage of the engine subprocesses. """ for colour in ("b", "w"): controller = self.controllers.get(colour) if controller is None: continue controller.safe_close() self.late_errors += controller.retrieve_error_messages() self.update_cpu_times_from_channels() ## High-level methods def request_engine_descriptions(self): """Obtain the engines' name, version, and description by GTP. After you have called this, you can retrieve the results from the engine_names and engine_descriptions attributes. If this has been called, other methods will use the engine name and/or description when appropriate (ie, call this if you want proper engine names to appear in the SGF file). """ for colour in "b", "w": controller = self.controllers[colour] player = self.players[colour] short_s, long_s = gtp_controller.describe_engine(controller, player) self.engine_names[player] = short_s self.engine_descriptions[player] = long_s def ready(self): """Reset the engines' GTP game state (board size, contents, komi).""" for colour in "b", "w": controller = self.controllers[colour] controller.do_command("boardsize", str(self.board_size)) controller.do_command("clear_board") controller.do_command("komi", str(self.komi)) def set_handicap(self, handicap, is_free): """Initialise the board position for a handicap. Raises ValueError if the number of stones isn't valid (see GTP spec). Raises BadGtpResponse if there's an invalid respone to place_free_handicap or fixed_handicap. """ if is_free: max_points = handicap_layout.max_free_handicap_for_board_size( self.board_size) if not 2 <= handicap < max_points: raise ValueError vertices = self.send_command( "b", "place_free_handicap", str(handicap)) try: points = [move_from_vertex(vt, self.board_size) for vt in vertices.split(" ")] if None in points: raise ValueError("response included 'pass'") if len(set(points)) < len(points): raise ValueError("duplicate point") except ValueError, e: raise BadGtpResponse( "invalid response from place_free_handicap command " "to %s: %s" % (self.players["b"], e)) vertices = [format_vertex(point) for point in points] self.send_command("w", "set_free_handicap", *vertices) else: # May propagate ValueError points = handicap_layout.handicap_points(handicap, self.board_size) for colour in "b", "w": vertices = self.send_command( colour, "fixed_handicap", str(handicap)) try: seen_points = [move_from_vertex(vt, self.board_size) for vt in vertices.split(" ")] if set(seen_points) != set(points): raise ValueError except ValueError: raise BadGtpResponse( "bad response from fixed_handicap command " "to %s: %s" % (self.players[colour], vertices)) self.board.apply_setup(points, [], []) self.handicap = handicap self.additional_sgf_props.append(('HA', handicap)) self.handicap_stones = points self.first_player = "w" def _forfeit_to(self, winner, msg): self.winner = winner self.forfeited = True self.forfeit_reason = msg def _play_move(self, colour): opponent = opponent_of(colour) if (self.claim_allowed[colour] and self.known_command(colour, "gomill-genmove_ex")): genmove_command = ["gomill-genmove_ex", colour, "claim"] may_claim = True else: genmove_command = ["genmove", colour] may_claim = False try: move_s = self.send_command(colour, *genmove_command).lower() except BadGtpResponse, e: self._forfeit_to(opponent, str(e)) return if move_s == "resign": self.winner = opponent self.seen_resignation = True return if may_claim and move_s == "claim": self.winner = colour self.seen_claim = True return try: move = move_from_vertex(move_s, self.board_size) except ValueError: self._forfeit_to(opponent, "%s attempted ill-formed move %s" % ( self.players[colour], move_s)) return comment = self.maybe_send_command(colour, "gomill-explain_last_move") comment = sanitise_utf8(comment) if comment == "": comment = None if move is not None: self.pass_count = 0 if move == self.simple_ko_point: self._forfeit_to( opponent, "%s attempted move to ko-forbidden point %s" % ( self.players[colour], move_s)) return row, col = move try: self.simple_ko_point = self.board.play(row, col, colour) except ValueError: self._forfeit_to( opponent, "%s attempted move to occupied point %s" % ( self.players[colour], move_s)) return else: self.pass_count += 1 self.simple_ko_point = None try: self.send_command(opponent, "play", colour, move_s) except BadGtpResponse, e: if e.gtp_error_message == "illegal move": # we assume the move really was illegal, so 'colour' should lose self._forfeit_to(opponent, "%s claims move %s is illegal" % ( self.players[opponent], move_s)) else: self._forfeit_to(colour, str(e)) return self.moves.append((colour, move, comment)) if self.after_move_callback: self.after_move_callback(colour, move, self.board) def run(self): """Run a complete game between the two players. Sets self.moves and self.result. Sets CPU times in the game result if available via GTP. """ self.pass_count = 0 self.winner = None self.margin = None self.scorers_disagreed = False self.seen_resignation = False self.seen_claim = False self.forfeited = False self.hit_move_limit = False self.forfeit_reason = None self.passed_out = False player = self.first_player move_count = 0 while move_count < self.move_limit: self._play_move(player) if self.pass_count == 2: self.passed_out = True self.winner, self.margin, self.scorers_disagreed = \ self._score_game() break if self.winner is not None: break player = opponent_of(player) move_count += 1 else: self.hit_move_limit = True self.calculate_result() self.calculate_cpu_times() def fake_run(self, winner): """Set state variables as if the game had been run (for testing). You don't need to use set_player_{subprocess,controller} to call this. winner -- 'b' or 'w' """ self.winner = winner self.seen_resignation = False self.seen_claim = False self.forfeited = False self.hit_move_limit = False self.forfeit_reason = None self.passed_out = True self.margin = True self.scorers_disagreed = False self.calculate_result() def _score_game(self): is_disagreement = False if self.internal_scorer: score = self.board.area_score() - self.komi if self.handicap: if self.handicap_compensation == "full": score -= self.handicap elif self.handicap_compensation == "short": score -= (self.handicap - 1) if score > 0: winner = "b" margin = score elif score < 0: winner = "w" margin = -score else: winner = None margin = 0 else: winners = [] margins = [] for colour in self.allowed_scorers: final_score = self.maybe_send_command(colour, "final_score") if final_score is None: continue self.player_scores[colour] = final_score final_score = final_score.upper() if final_score == "0": winners.append(None) margins.append(0) continue if final_score.startswith("B+"): winners.append("b") elif final_score.startswith("W+"): winners.append("w") else: continue try: margin = float(final_score[2:]) if margin <= 0: margin = None except ValueError: margin = None margins.append(margin) if len(set(winners)) == 1: winner = winners[0] if len(set(margins)) == 1: margin = margins[0] else: margin = None else: if len(set(winners)) > 1: is_disagreement = True winner = None margin = None return winner, margin, is_disagreement def calculate_result(self): """Set self.result. You shouldn't normally call this directly. """ result = Game_result(self.players, self.winner) result.game_id = self.game_id if self.hit_move_limit: result.sgf_result = "Void" result.detail = "hit move limit" elif self.seen_resignation: result.sgf_result += "R" elif self.seen_claim: # Leave SGF result in form 'B+' result.detail = "claim" elif self.forfeited: result.sgf_result += "F" result.is_forfeit = True result.detail = "forfeit: %s" % self.forfeit_reason else: assert self.passed_out if self.winner is None: if self.margin == 0: result.set_jigo() elif self.scorers_disagreed: result.detail = "players disagreed" else: result.detail = "no score reported" elif self.margin is not None: result.sgf_result += format_float(self.margin) else: # Players returned something like 'B+?', # or disagreed about the margin # Leave SGF result in form 'B+' result.detail = "unknown margin" self.result = result def calculate_cpu_times(self): """Set CPU times in self.result. You shouldn't normally call this directly. """ # The ugliness with cpu_time '?' is to avoid using the cpu time reported # by channel close() for engines which claim to support gomill-cpu_time # but give an error. for colour in ('b', 'w'): cpu_time = None controller = self.controllers[colour] if controller.safe_known_command('gomill-cpu_time'): try: s = controller.safe_do_command('gomill-cpu_time') cpu_time = float(s) except (BadGtpResponse, ValueError, TypeError): cpu_time = "?" self.result.cpu_times[self.players[colour]] = cpu_time def update_cpu_times_from_channels(self): """Set CPU times in self.result from the channel resource usage. There's normally no need to call this directly: close_players() will do it. Has no effect if CPU times have already been set. """ for colour in ('b', 'w'): controller = self.controllers.get(colour) if controller is None: continue ru = controller.channel.resource_usage if (ru is not None and self.result is not None and self.result.cpu_times[self.players[colour]] is None): self.result.cpu_times[self.players[colour]] = \ ru.ru_utime + ru.ru_stime def describe_late_errors(self): """Retrieve the late error messages. Returns a string, or None if there were no late errors. This is only available after close_players() has been called. The late errors are low-level errors which occurred after the game result was decided and so were set asied. In particular, they include any errors from closing (including failure responses from the final 'quit' command) """ if not self.late_errors: return None return "\n".join(self.late_errors) def describe_scoring(self): """Return a multiline string describing the game's scoring.""" if self.result is None: return "" def normalise_score(s): s = s.upper() if s.endswith(".0"): s = s[:-2] return s l = [self.result.describe()] sgf_result = self.result.sgf_result score_b = self.player_scores['b'] score_w = self.player_scores['w'] if ((score_b is not None and normalise_score(score_b) != sgf_result) or (score_w is not None and normalise_score(score_w) != sgf_result)): for colour, score in (('b', score_b), ('w', score_w)): if score is not None: l.append("%s final_score: %s" % (self.players[colour], score)) return "\n".join(l) def make_sgf(self, game_end_message=None): """Return an SGF description of the game. Returns an Sgf_game object. game_end_message -- optional string to put in the final comment. If game_end_message is specified, it appears before the text describing 'late errors'. """ sgf_game = sgf.Sgf_game(self.board_size) root = sgf_game.get_root() root.set('KM', self.komi) root.set('AP', ("gomill", __version__)) for prop, value in self.additional_sgf_props: root.set(prop, value) sgf_game.set_date() if self.engine_names: root.set('PB', self.engine_names[self.players['b']]) root.set('PW', self.engine_names[self.players['w']]) if self.game_id: root.set('GN', self.game_id) if self.handicap_stones: root.set_setup_stones(black=self.handicap_stones, white=[]) for colour, move, comment in self.moves: node = sgf_game.extend_main_sequence() node.set_move(colour, move) if comment is not None: node.set("C", comment) last_node = sgf_game.get_last_node() if self.result is not None: root.set('RE', self.result.sgf_result) last_node.add_comment_text(self.describe_scoring()) if game_end_message is not None: last_node.add_comment_text(game_end_message) late_error_messages = self.describe_late_errors() if late_error_messages is not None: last_node.add_comment_text(late_error_messages) return sgf_game def write_sgf(self, pathname, game_end_message=None): """Write an SGF description of the game to the specified pathname.""" sgf_game = self.make_sgf(game_end_message) f = open(pathname, "w") f.write(sgf_game.serialise()) f.close()