from __future__ import annotations import os import sys import signal from collections import deque import json from typing import Optional, Deque from docopt import docopt from .core import BerghainClient from .policy import load_policy, PolicyLoadError, default_policy_path from .config import VENUE_CAP, REJECT_CAP from .state import GameStatus from .ui import ProgressiveRenderer USAGE = """berghain Usage: berghain set-player berghain new [--scenario=] berghain play [--interactive] [--verbose] berghain step [--verbose] berghain list berghain status [] berghain switch berghain set-policy (default | ) berghain check-policy berghain (-h | --help) berghain --version Options: -h --help Show this screen. --version Show version. --scenario= Scenario number (1, 2, or 3). --interactive Pause for Enter between steps. --verbose Send raw transcript lines to stderr (policy debug prints too). """ TEMPLATE_POLICY = """# policy.py (user template) # Write your own decision logic. Must define: # decide(attributes, tallies, mins, admitted_count, venue_cap, **kwargs) -> bool # # Hints: # - 'attributes' are booleans for the current person. # - 'tallies' counts already-admitted attributes. # - 'mins' are required minimum counts per attribute. # - You may optionally accept: p (relative frequencies), corr (correlation matrix). def decide(attributes: dict[str, bool], tallies: dict[str, int], mins: dict[str, int], admitted_count: int, venue_cap: int, **kwargs) -> bool: # Example starter: admit only if they help an unmet attribute. need = {a: max(0, mins[a] - tallies.get(a, 0)) for a in mins} if sum(need.values()) == 0: return True return any(attributes.get(a, False) and need[a] > 0 for a in mins) """ # -------- transcript helpers -------- def _raw_line(entry: dict, mins: dict[str, int]) -> str: idx = entry.get("personIndex") accept = entry.get("accept") attrs = entry.get("attributes", {}) need_before = entry.get("remaining_need_before", {}) xs = sorted([k for k, v in attrs.items() if v]) helped = sorted([a for a in xs if need_before.get(a, 0) > 0]) return f"#{idx:>5} {'ACCEPT' if accept else 'reject':<6} attrs: {', '.join(xs) if xs else '∅':<24} helped: {', '.join(helped) if helped else 'none'}" def _stderr(s: str) -> None: print(s, file=sys.stderr, flush=True) # -------- signal handling (finish current person, then exit) -------- class _SoftSigint: """Install a SIGINT handler that sets a flag and does not raise KeyboardInterrupt. Use .armed and .disarm to toggle around blocking input. """ def __init__(self) -> None: self._old = None self.triggered = False def _handler(self, signum, frame): self.triggered = True def arm(self) -> None: self.triggered = False self._old = signal.getsignal(signal.SIGINT) signal.signal(signal.SIGINT, self._handler) # Restart interrupted system calls rather than raising: try: signal.siginterrupt(signal.SIGINT, False) except AttributeError: pass # not available on some platforms def disarm(self) -> None: if self._old is not None: signal.signal(signal.SIGINT, self._old) self._old = None # -------- commands -------- def _print_game_summary(rec) -> None: mins = rec.constraints.mins print(f"Game: {rec.game_id}") print(f" Scenario: {rec.scenario}") print(f" Status: {rec.status}") print(f" Admitted: {rec.admitted_count}") print(f" Rejected: {rec.rejected_count}") print(f" Policy: {rec.policy_path or '(none)'}") if mins: print(" Constraints:") for a, m in mins.items(): have = rec.tallies.get(a, 0) print(f" - {a}: {have}/{m}") print() def _ensure_template(path: str) -> None: dirpath = os.path.dirname(os.path.abspath(path)) or "." os.makedirs(dirpath, exist_ok=True) with open(path, "w", encoding="utf-8") as f: f.write(TEMPLATE_POLICY) def cmd_set_player(player: str) -> int: bc = BerghainClient() bc.set_player(player) print(f"Player set: {player}") return 0 def cmd_new_game(scenario: int) -> int: bc = BerghainClient() if scenario not in (-1, 1, 2, 3): print("--scenario must be 1, 2, or 3.", file=sys.stderr) return 2 rec = bc.new_game(scenario=scenario) _print_game_summary(rec) print("Tip: attach a policy with `berghain set-policy ./my_policy.py` (required before `play`).") return 0 def cmd_status(idprefix: str | None) -> int: bc = BerghainClient() rec = None if idprefix: rec = bc.gs.get_game(idprefix) if not rec: print("No unique match for that id prefix.", file=sys.stderr) return 1 else: rec = bc.gs.current_game() if not rec: print("No current game. Start one with `berghain new`.", file=sys.stderr) return 1 _print_game_summary(rec) print("Initial conditions:") print(json.dumps(bc.gs.current_game().initial_raw, indent=2)) return 0 def cmd_set_policy(path: str) -> int: bc = BerghainClient() rec = bc.gs.current_game() if not rec: print("No current game. Start one with `berghain new`.", file=sys.stderr) return 1 if path.strip().lower() == "default": try: load_policy(default_policy_path()) except PolicyLoadError as e: print(f"Default policy error: {e}", file=sys.stderr) return 2 rec = bc.set_policy_for_current_game(default_policy_path()) print(f"Policy set for game {rec.game_id}: {rec.policy_path}") return 0 if not os.path.exists(path): print(f"Policy file not found, creating template: {path}") try: _ensure_template(path) except OSError as e: print(f"Could not create template policy: {e}", file=sys.stderr) return 2 try: load_policy(path) except PolicyLoadError as e: print(f"Policy error: {e}", file=sys.stderr) return 2 rec = bc.set_policy_for_current_game(path) print(f"Policy set for game {rec.game_id}: {rec.policy_path}") return 0 def cmd_check_policy(path: str) -> int: if path.strip().lower() == "default": try: load_policy(default_policy_path()) except PolicyLoadError as e: print(f"Invalid default policy: {e}", file=sys.stderr) return 2 print("Default policy looks good.") return 0 elif not os.path.exists(path): print(f"Policy file not found, creating template: {path}") try: _ensure_template(path) except OSError as e: print(f"Could not create template policy: {e}", file=sys.stderr) return 2 try: load_policy(path) except PolicyLoadError as e: print(f"Invalid policy: {e}", file=sys.stderr) return 2 print("Policy looks good.") return 0 def cmd_step(verbose: bool) -> int: bc = BerghainClient() rec = bc.gs.current_game() if not rec: print("No current game. Start one with `berghain new`.", file=sys.stderr) return 1 if not rec.policy_path: print("This game has no policy set. Use `berghain set-policy ./policy.py` first.", file=sys.stderr) return 2 last5: Deque[str] = deque(maxlen=5) renderer = ProgressiveRenderer(rec.constraints.mins, VENUE_CAP, REJECT_CAP) renderer.begin() # reserve and print the block FIRST # Soft SIGINT so the current network call finishes guard = _SoftSigint() guard.arm() try: entry = bc.step(rec) except PolicyLoadError as e: print(f"Policy error: {e}", file=sys.stderr) guard.disarm() return 2 finally: guard.disarm() # Update UI line = _raw_line(entry, rec.constraints.mins) if verbose: _stderr(line) last5.append(line) renderer.render(last5, rec.tallies, entry.get("admitted_count", rec.admitted_count), entry.get("rejected_count", rec.rejected_count)) # If SIGINT occurred during the step, exit now (current person finished) if guard.triggered: _stderr("Interrupted. Finished current person; exiting step…") _print_game_summary(rec) return 130 # If game ended, print summary if entry.get("status") != GameStatus.RUNNING: _print_game_summary(rec) return 0 def cmd_play(interactive: bool, verbose: bool) -> int: bc = BerghainClient() rec = bc.gs.current_game() if not rec: print("No current game. Start one with `berghain new`.", file=sys.stderr) return 1 if not rec.policy_path: print("This game has no policy set. Use `berghain set-policy ./policy.py` first.", file=sys.stderr) return 2 last5: Deque[str] = deque(maxlen=5) renderer = ProgressiveRenderer(rec.constraints.mins, VENUE_CAP, REJECT_CAP) renderer.begin() # reserve the block before any stdout prints _stderr("Autoplaying… (Ctrl-C to stop after finishing the current person)") guard = _SoftSigint() guard.arm() try: while rec.status == GameStatus.RUNNING and rec.admitted_count < VENUE_CAP: if rec.rejected_count >= 20000: break try: entry = bc.step(rec) # soft SIGINT -> flag set, call completes except PolicyLoadError as e: print(f"Policy error: {e}", file=sys.stderr) return 2 line = _raw_line(entry, rec.constraints.mins) if verbose: _stderr(line) last5.append(line) renderer.render(last5, rec.tallies, entry.get("admitted_count", rec.admitted_count), entry.get("rejected_count", rec.rejected_count)) if guard.triggered: _stderr("Interrupted. Finished current person; exiting autoplay…") break if rec.status != GameStatus.RUNNING: break if interactive: # Allow Ctrl-C to abort prompt immediately (no in-flight request) guard.disarm() try: input("Press Enter for next…") except KeyboardInterrupt: _stderr("Interrupted at prompt. Exiting after finishing current person…") break finally: guard.arm() finally: guard.disarm() bc.gs.save() _print_game_summary(rec) return 0 def cmd_list() -> int: bc = BerghainClient() games = bc.gs.list_games() if not games: print("No games found.") return 0 for gid, (status, scenario, admitted, rejected) in games.items(): star = "*" if bc.gs.current_game_id == gid else " " print(f"{star} {gid} scen={scenario} status={status} A={admitted} R={rejected}") return 0 def cmd_switch(idprefix: str) -> int: bc = BerghainClient() ok = bc.gs.set_current_game(idprefix) if not ok: print("No unique match for that id prefix.", file=sys.stderr) return 1 bc.gs.save() print(f"Switched current game to: {bc.gs.current_game_id}") return 0 def main(argv: Optional[list[str]] = None) -> int: args = docopt(USAGE, argv=argv, version="berghain 0.0.1") if args["set-player"]: return cmd_set_player(args[""]) elif args["new"]: scen = int(args["--scenario"] or -1) if scen not in (-1, 1, 2, 3): print("--scenario must be 1, 2, or 3.", file=sys.stderr) return 2 return cmd_new_game(scen) elif args["play"]: return cmd_play(bool(args["--interactive"]), bool(args["--verbose"])) elif args["step"]: return cmd_step(bool(args["--verbose"])) elif args["list"]: return cmd_list() elif args["status"]: return cmd_status(args[""]) elif args["switch"]: return cmd_switch(args[""]) elif args["set-policy"]: if args["default"]: return cmd_set_policy("default") else: return cmd_set_policy(args[""]) elif args["check-policy"]: return cmd_check_policy(args[""]) else: print("Unknown command", file=sys.stderr) return 1