387 lines
12 KiB
Python
387 lines
12 KiB
Python
from __future__ import annotations
|
|
|
|
import os
|
|
import sys
|
|
import signal
|
|
from collections import deque
|
|
import json
|
|
from typing import Optional, Deque
|
|
|
|
from docopt import docopt
|
|
|
|
from .core import BerghainClient
|
|
from .policy import load_policy, PolicyLoadError, default_policy_path
|
|
from .config import VENUE_CAP, REJECT_CAP
|
|
from .state import GameStatus
|
|
from .ui import ProgressiveRenderer
|
|
|
|
USAGE = """berghain
|
|
|
|
Usage:
|
|
berghain set-player <player>
|
|
berghain new [--scenario=<n>]
|
|
berghain play [--interactive] [--verbose]
|
|
berghain step [--verbose]
|
|
berghain list
|
|
berghain status [<idprefix>]
|
|
berghain switch <idprefix>
|
|
berghain set-policy (default | <file>)
|
|
berghain check-policy <file>
|
|
berghain (-h | --help)
|
|
berghain --version
|
|
|
|
Options:
|
|
-h --help Show this screen.
|
|
--version Show version.
|
|
--scenario=<n> Scenario number (1, 2, or 3).
|
|
--interactive Pause for Enter between steps.
|
|
--verbose Send raw transcript lines to stderr (policy debug prints too).
|
|
"""
|
|
|
|
TEMPLATE_POLICY = """# policy.py (user template)
|
|
# Write your own decision logic. Must define:
|
|
# decide(attributes, tallies, mins, admitted_count, venue_cap, **kwargs) -> bool
|
|
#
|
|
# Hints:
|
|
# - 'attributes' are booleans for the current person.
|
|
# - 'tallies' counts already-admitted attributes.
|
|
# - 'mins' are required minimum counts per attribute.
|
|
# - You may optionally accept: p (relative frequencies), corr (correlation matrix).
|
|
|
|
def decide(attributes: dict[str, bool],
|
|
tallies: dict[str, int],
|
|
mins: dict[str, int],
|
|
admitted_count: int,
|
|
venue_cap: int,
|
|
**kwargs) -> bool:
|
|
# Example starter: admit only if they help an unmet attribute.
|
|
need = {a: max(0, mins[a] - tallies.get(a, 0)) for a in mins}
|
|
if sum(need.values()) == 0:
|
|
return True
|
|
return any(attributes.get(a, False) and need[a] > 0 for a in mins)
|
|
"""
|
|
|
|
|
|
# -------- transcript helpers --------
|
|
|
|
def _raw_line(entry: dict, mins: dict[str, int]) -> str:
|
|
idx = entry.get("personIndex")
|
|
accept = entry.get("accept")
|
|
attrs = entry.get("attributes", {})
|
|
need_before = entry.get("remaining_need_before", {})
|
|
xs = sorted([k for k, v in attrs.items() if v])
|
|
helped = sorted([a for a in xs if need_before.get(a, 0) > 0])
|
|
return f"#{idx:>5} {'ACCEPT' if accept else 'reject':<6} attrs: {', '.join(xs) if xs else '∅':<24} helped: {', '.join(helped) if helped else 'none'}"
|
|
|
|
def _stderr(s: str) -> None:
|
|
print(s, file=sys.stderr, flush=True)
|
|
|
|
# -------- signal handling (finish current person, then exit) --------
|
|
|
|
class _SoftSigint:
|
|
"""Install a SIGINT handler that sets a flag and does not raise KeyboardInterrupt.
|
|
Use .armed and .disarm to toggle around blocking input.
|
|
"""
|
|
def __init__(self) -> None:
|
|
self._old = None
|
|
self.triggered = False
|
|
|
|
def _handler(self, signum, frame):
|
|
self.triggered = True
|
|
|
|
def arm(self) -> None:
|
|
self.triggered = False
|
|
self._old = signal.getsignal(signal.SIGINT)
|
|
signal.signal(signal.SIGINT, self._handler)
|
|
# Restart interrupted system calls rather than raising:
|
|
try:
|
|
signal.siginterrupt(signal.SIGINT, False)
|
|
except AttributeError:
|
|
pass # not available on some platforms
|
|
|
|
def disarm(self) -> None:
|
|
if self._old is not None:
|
|
signal.signal(signal.SIGINT, self._old)
|
|
self._old = None
|
|
|
|
# -------- commands --------
|
|
|
|
def _print_game_summary(rec) -> None:
|
|
mins = rec.constraints.mins
|
|
print(f"Game: {rec.game_id}")
|
|
print(f" Scenario: {rec.scenario}")
|
|
print(f" Status: {rec.status}")
|
|
print(f" Admitted: {rec.admitted_count}")
|
|
print(f" Rejected: {rec.rejected_count}")
|
|
print(f" Policy: {rec.policy_path or '(none)'}")
|
|
if mins:
|
|
print(" Constraints:")
|
|
for a, m in mins.items():
|
|
have = rec.tallies.get(a, 0)
|
|
print(f" - {a}: {have}/{m}")
|
|
print()
|
|
|
|
def _ensure_template(path: str) -> None:
|
|
dirpath = os.path.dirname(os.path.abspath(path)) or "."
|
|
os.makedirs(dirpath, exist_ok=True)
|
|
with open(path, "w", encoding="utf-8") as f:
|
|
f.write(TEMPLATE_POLICY)
|
|
|
|
def cmd_set_player(player: str) -> int:
|
|
bc = BerghainClient()
|
|
bc.set_player(player)
|
|
print(f"Player set: {player}")
|
|
return 0
|
|
|
|
def cmd_new_game(scenario: int) -> int:
|
|
bc = BerghainClient()
|
|
if scenario not in (-1, 1, 2, 3):
|
|
print("--scenario must be 1, 2, or 3.", file=sys.stderr)
|
|
return 2
|
|
rec = bc.new_game(scenario=scenario)
|
|
_print_game_summary(rec)
|
|
print("Tip: attach a policy with `berghain set-policy ./my_policy.py` (required before `play`).")
|
|
return 0
|
|
|
|
def cmd_status(idprefix: str | None) -> int:
|
|
bc = BerghainClient()
|
|
rec = None
|
|
if idprefix:
|
|
rec = bc.gs.get_game(idprefix)
|
|
if not rec:
|
|
print("No unique match for that id prefix.", file=sys.stderr)
|
|
return 1
|
|
else:
|
|
rec = bc.gs.current_game()
|
|
if not rec:
|
|
print("No current game. Start one with `berghain new`.", file=sys.stderr)
|
|
return 1
|
|
|
|
_print_game_summary(rec)
|
|
print("Initial conditions:")
|
|
print(json.dumps(bc.gs.current_game().initial_raw, indent=2))
|
|
return 0
|
|
|
|
|
|
def cmd_set_policy(path: str) -> int:
|
|
bc = BerghainClient()
|
|
rec = bc.gs.current_game()
|
|
if not rec:
|
|
print("No current game. Start one with `berghain new`.", file=sys.stderr)
|
|
return 1
|
|
|
|
if path.strip().lower() == "default":
|
|
try:
|
|
load_policy(default_policy_path())
|
|
except PolicyLoadError as e:
|
|
print(f"Default policy error: {e}", file=sys.stderr)
|
|
return 2
|
|
rec = bc.set_policy_for_current_game(default_policy_path())
|
|
print(f"Policy set for game {rec.game_id}: {rec.policy_path}")
|
|
return 0
|
|
|
|
if not os.path.exists(path):
|
|
print(f"Policy file not found, creating template: {path}")
|
|
try:
|
|
_ensure_template(path)
|
|
except OSError as e:
|
|
print(f"Could not create template policy: {e}", file=sys.stderr)
|
|
return 2
|
|
try:
|
|
load_policy(path)
|
|
except PolicyLoadError as e:
|
|
print(f"Policy error: {e}", file=sys.stderr)
|
|
return 2
|
|
rec = bc.set_policy_for_current_game(path)
|
|
print(f"Policy set for game {rec.game_id}: {rec.policy_path}")
|
|
return 0
|
|
|
|
def cmd_check_policy(path: str) -> int:
|
|
if path.strip().lower() == "default":
|
|
try:
|
|
load_policy(default_policy_path())
|
|
except PolicyLoadError as e:
|
|
print(f"Invalid default policy: {e}", file=sys.stderr)
|
|
return 2
|
|
print("Default policy looks good.")
|
|
return 0
|
|
elif not os.path.exists(path):
|
|
print(f"Policy file not found, creating template: {path}")
|
|
try:
|
|
_ensure_template(path)
|
|
except OSError as e:
|
|
print(f"Could not create template policy: {e}", file=sys.stderr)
|
|
return 2
|
|
try:
|
|
load_policy(path)
|
|
except PolicyLoadError as e:
|
|
print(f"Invalid policy: {e}", file=sys.stderr)
|
|
return 2
|
|
print("Policy looks good.")
|
|
return 0
|
|
|
|
def cmd_step(verbose: bool) -> int:
|
|
bc = BerghainClient()
|
|
rec = bc.gs.current_game()
|
|
if not rec:
|
|
print("No current game. Start one with `berghain new`.", file=sys.stderr)
|
|
return 1
|
|
if not rec.policy_path:
|
|
print("This game has no policy set. Use `berghain set-policy ./policy.py` first.", file=sys.stderr)
|
|
return 2
|
|
|
|
last5: Deque[str] = deque(maxlen=5)
|
|
renderer = ProgressiveRenderer(rec.constraints.mins, VENUE_CAP, REJECT_CAP)
|
|
renderer.begin() # reserve and print the block FIRST
|
|
|
|
# Soft SIGINT so the current network call finishes
|
|
guard = _SoftSigint()
|
|
guard.arm()
|
|
try:
|
|
entry = bc.step(rec)
|
|
except PolicyLoadError as e:
|
|
print(f"Policy error: {e}", file=sys.stderr)
|
|
guard.disarm()
|
|
return 2
|
|
finally:
|
|
guard.disarm()
|
|
|
|
# Update UI
|
|
line = _raw_line(entry, rec.constraints.mins)
|
|
if verbose:
|
|
_stderr(line)
|
|
last5.append(line)
|
|
renderer.render(last5, rec.tallies,
|
|
entry.get("admitted_count", rec.admitted_count),
|
|
entry.get("rejected_count", rec.rejected_count))
|
|
|
|
# If SIGINT occurred during the step, exit now (current person finished)
|
|
if guard.triggered:
|
|
_stderr("Interrupted. Finished current person; exiting step…")
|
|
_print_game_summary(rec)
|
|
return 130
|
|
|
|
# If game ended, print summary
|
|
if entry.get("status") != GameStatus.RUNNING:
|
|
_print_game_summary(rec)
|
|
|
|
return 0
|
|
|
|
|
|
def cmd_play(interactive: bool, verbose: bool) -> int:
|
|
bc = BerghainClient()
|
|
rec = bc.gs.current_game()
|
|
if not rec:
|
|
print("No current game. Start one with `berghain new`.", file=sys.stderr)
|
|
return 1
|
|
if not rec.policy_path:
|
|
print("This game has no policy set. Use `berghain set-policy ./policy.py` first.", file=sys.stderr)
|
|
return 2
|
|
|
|
last5: Deque[str] = deque(maxlen=5)
|
|
renderer = ProgressiveRenderer(rec.constraints.mins, VENUE_CAP, REJECT_CAP)
|
|
renderer.begin() # reserve the block before any stdout prints
|
|
|
|
_stderr("Autoplaying… (Ctrl-C to stop after finishing the current person)")
|
|
|
|
guard = _SoftSigint()
|
|
guard.arm()
|
|
|
|
try:
|
|
while rec.status == GameStatus.RUNNING and rec.admitted_count < VENUE_CAP:
|
|
if rec.rejected_count >= 20000:
|
|
break
|
|
|
|
try:
|
|
entry = bc.step(rec) # soft SIGINT -> flag set, call completes
|
|
except PolicyLoadError as e:
|
|
print(f"Policy error: {e}", file=sys.stderr)
|
|
return 2
|
|
|
|
line = _raw_line(entry, rec.constraints.mins)
|
|
if verbose:
|
|
_stderr(line)
|
|
last5.append(line)
|
|
renderer.render(last5, rec.tallies,
|
|
entry.get("admitted_count", rec.admitted_count),
|
|
entry.get("rejected_count", rec.rejected_count))
|
|
|
|
if guard.triggered:
|
|
_stderr("Interrupted. Finished current person; exiting autoplay…")
|
|
break
|
|
|
|
if rec.status != GameStatus.RUNNING:
|
|
break
|
|
|
|
if interactive:
|
|
# Allow Ctrl-C to abort prompt immediately (no in-flight request)
|
|
guard.disarm()
|
|
try:
|
|
input("Press Enter for next…")
|
|
except KeyboardInterrupt:
|
|
_stderr("Interrupted at prompt. Exiting after finishing current person…")
|
|
break
|
|
finally:
|
|
guard.arm()
|
|
|
|
finally:
|
|
guard.disarm()
|
|
bc.gs.save()
|
|
|
|
_print_game_summary(rec)
|
|
return 0
|
|
|
|
|
|
def cmd_list() -> int:
|
|
bc = BerghainClient()
|
|
games = bc.gs.list_games()
|
|
if not games:
|
|
print("No games found.")
|
|
return 0
|
|
for gid, (status, scenario, admitted, rejected) in games.items():
|
|
star = "*" if bc.gs.current_game_id == gid else " "
|
|
print(f"{star} {gid} scen={scenario} status={status} A={admitted} R={rejected}")
|
|
return 0
|
|
|
|
def cmd_switch(idprefix: str) -> int:
|
|
bc = BerghainClient()
|
|
ok = bc.gs.set_current_game(idprefix)
|
|
if not ok:
|
|
print("No unique match for that id prefix.", file=sys.stderr)
|
|
return 1
|
|
bc.gs.save()
|
|
print(f"Switched current game to: {bc.gs.current_game_id}")
|
|
return 0
|
|
|
|
def main(argv: Optional[list[str]] = None) -> int:
|
|
args = docopt(USAGE, argv=argv, version="berghain 0.0.1")
|
|
if args["set-player"]:
|
|
return cmd_set_player(args["<player>"])
|
|
elif args["new"]:
|
|
scen = int(args["--scenario"] or -1)
|
|
if scen not in (-1, 1, 2, 3):
|
|
print("--scenario must be 1, 2, or 3.", file=sys.stderr)
|
|
return 2
|
|
return cmd_new_game(scen)
|
|
elif args["play"]:
|
|
return cmd_play(bool(args["--interactive"]), bool(args["--verbose"]))
|
|
elif args["step"]:
|
|
return cmd_step(bool(args["--verbose"]))
|
|
elif args["list"]:
|
|
return cmd_list()
|
|
elif args["status"]:
|
|
return cmd_status(args["<idprefix>"])
|
|
elif args["switch"]:
|
|
return cmd_switch(args["<idprefix>"])
|
|
elif args["set-policy"]:
|
|
if args["default"]:
|
|
return cmd_set_policy("default")
|
|
else:
|
|
return cmd_set_policy(args["<file>"])
|
|
elif args["check-policy"]:
|
|
return cmd_check_policy(args["<file>"])
|
|
else:
|
|
print("Unknown command", file=sys.stderr)
|
|
return 1
|
|
|