2025-09-03 03:34:30 -07:00

387 lines
12 KiB
Python

from __future__ import annotations
import os
import sys
import signal
from collections import deque
import json
from typing import Optional, Deque
from docopt import docopt
from .core import BerghainClient
from .policy import load_policy, PolicyLoadError, default_policy_path
from .config import VENUE_CAP, REJECT_CAP
from .state import GameStatus
from .ui import ProgressiveRenderer
USAGE = """berghain
Usage:
berghain set-player <player>
berghain new [--scenario=<n>]
berghain play [--interactive] [--verbose]
berghain step [--verbose]
berghain list
berghain status [<idprefix>]
berghain switch <idprefix>
berghain set-policy (default | <file>)
berghain check-policy <file>
berghain (-h | --help)
berghain --version
Options:
-h --help Show this screen.
--version Show version.
--scenario=<n> Scenario number (1, 2, or 3).
--interactive Pause for Enter between steps.
--verbose Send raw transcript lines to stderr (policy debug prints too).
"""
TEMPLATE_POLICY = """# policy.py (user template)
# Write your own decision logic. Must define:
# decide(attributes, tallies, mins, admitted_count, venue_cap, **kwargs) -> bool
#
# Hints:
# - 'attributes' are booleans for the current person.
# - 'tallies' counts already-admitted attributes.
# - 'mins' are required minimum counts per attribute.
# - You may optionally accept: p (relative frequencies), corr (correlation matrix).
def decide(attributes: dict[str, bool],
tallies: dict[str, int],
mins: dict[str, int],
admitted_count: int,
venue_cap: int,
**kwargs) -> bool:
# Example starter: admit only if they help an unmet attribute.
need = {a: max(0, mins[a] - tallies.get(a, 0)) for a in mins}
if sum(need.values()) == 0:
return True
return any(attributes.get(a, False) and need[a] > 0 for a in mins)
"""
# -------- transcript helpers --------
def _raw_line(entry: dict, mins: dict[str, int]) -> str:
idx = entry.get("personIndex")
accept = entry.get("accept")
attrs = entry.get("attributes", {})
need_before = entry.get("remaining_need_before", {})
xs = sorted([k for k, v in attrs.items() if v])
helped = sorted([a for a in xs if need_before.get(a, 0) > 0])
return f"#{idx:>5} {'ACCEPT' if accept else 'reject':<6} attrs: {', '.join(xs) if xs else '':<24} helped: {', '.join(helped) if helped else 'none'}"
def _stderr(s: str) -> None:
print(s, file=sys.stderr, flush=True)
# -------- signal handling (finish current person, then exit) --------
class _SoftSigint:
"""Install a SIGINT handler that sets a flag and does not raise KeyboardInterrupt.
Use .armed and .disarm to toggle around blocking input.
"""
def __init__(self) -> None:
self._old = None
self.triggered = False
def _handler(self, signum, frame):
self.triggered = True
def arm(self) -> None:
self.triggered = False
self._old = signal.getsignal(signal.SIGINT)
signal.signal(signal.SIGINT, self._handler)
# Restart interrupted system calls rather than raising:
try:
signal.siginterrupt(signal.SIGINT, False)
except AttributeError:
pass # not available on some platforms
def disarm(self) -> None:
if self._old is not None:
signal.signal(signal.SIGINT, self._old)
self._old = None
# -------- commands --------
def _print_game_summary(rec) -> None:
mins = rec.constraints.mins
print(f"Game: {rec.game_id}")
print(f" Scenario: {rec.scenario}")
print(f" Status: {rec.status}")
print(f" Admitted: {rec.admitted_count}")
print(f" Rejected: {rec.rejected_count}")
print(f" Policy: {rec.policy_path or '(none)'}")
if mins:
print(" Constraints:")
for a, m in mins.items():
have = rec.tallies.get(a, 0)
print(f" - {a}: {have}/{m}")
print()
def _ensure_template(path: str) -> None:
dirpath = os.path.dirname(os.path.abspath(path)) or "."
os.makedirs(dirpath, exist_ok=True)
with open(path, "w", encoding="utf-8") as f:
f.write(TEMPLATE_POLICY)
def cmd_set_player(player: str) -> int:
bc = BerghainClient()
bc.set_player(player)
print(f"Player set: {player}")
return 0
def cmd_new_game(scenario: int) -> int:
bc = BerghainClient()
if scenario not in (-1, 1, 2, 3):
print("--scenario must be 1, 2, or 3.", file=sys.stderr)
return 2
rec = bc.new_game(scenario=scenario)
_print_game_summary(rec)
print("Tip: attach a policy with `berghain set-policy ./my_policy.py` (required before `play`).")
return 0
def cmd_status(idprefix: str | None) -> int:
bc = BerghainClient()
rec = None
if idprefix:
rec = bc.gs.get_game(idprefix)
if not rec:
print("No unique match for that id prefix.", file=sys.stderr)
return 1
else:
rec = bc.gs.current_game()
if not rec:
print("No current game. Start one with `berghain new`.", file=sys.stderr)
return 1
_print_game_summary(rec)
print("Initial conditions:")
print(json.dumps(bc.gs.current_game().initial_raw, indent=2))
return 0
def cmd_set_policy(path: str) -> int:
bc = BerghainClient()
rec = bc.gs.current_game()
if not rec:
print("No current game. Start one with `berghain new`.", file=sys.stderr)
return 1
if path.strip().lower() == "default":
try:
load_policy(default_policy_path())
except PolicyLoadError as e:
print(f"Default policy error: {e}", file=sys.stderr)
return 2
rec = bc.set_policy_for_current_game(default_policy_path())
print(f"Policy set for game {rec.game_id}: {rec.policy_path}")
return 0
if not os.path.exists(path):
print(f"Policy file not found, creating template: {path}")
try:
_ensure_template(path)
except OSError as e:
print(f"Could not create template policy: {e}", file=sys.stderr)
return 2
try:
load_policy(path)
except PolicyLoadError as e:
print(f"Policy error: {e}", file=sys.stderr)
return 2
rec = bc.set_policy_for_current_game(path)
print(f"Policy set for game {rec.game_id}: {rec.policy_path}")
return 0
def cmd_check_policy(path: str) -> int:
if path.strip().lower() == "default":
try:
load_policy(default_policy_path())
except PolicyLoadError as e:
print(f"Invalid default policy: {e}", file=sys.stderr)
return 2
print("Default policy looks good.")
return 0
elif not os.path.exists(path):
print(f"Policy file not found, creating template: {path}")
try:
_ensure_template(path)
except OSError as e:
print(f"Could not create template policy: {e}", file=sys.stderr)
return 2
try:
load_policy(path)
except PolicyLoadError as e:
print(f"Invalid policy: {e}", file=sys.stderr)
return 2
print("Policy looks good.")
return 0
def cmd_step(verbose: bool) -> int:
bc = BerghainClient()
rec = bc.gs.current_game()
if not rec:
print("No current game. Start one with `berghain new`.", file=sys.stderr)
return 1
if not rec.policy_path:
print("This game has no policy set. Use `berghain set-policy ./policy.py` first.", file=sys.stderr)
return 2
last5: Deque[str] = deque(maxlen=5)
renderer = ProgressiveRenderer(rec.constraints.mins, VENUE_CAP, REJECT_CAP)
renderer.begin() # reserve and print the block FIRST
# Soft SIGINT so the current network call finishes
guard = _SoftSigint()
guard.arm()
try:
entry = bc.step(rec)
except PolicyLoadError as e:
print(f"Policy error: {e}", file=sys.stderr)
guard.disarm()
return 2
finally:
guard.disarm()
# Update UI
line = _raw_line(entry, rec.constraints.mins)
if verbose:
_stderr(line)
last5.append(line)
renderer.render(last5, rec.tallies,
entry.get("admitted_count", rec.admitted_count),
entry.get("rejected_count", rec.rejected_count))
# If SIGINT occurred during the step, exit now (current person finished)
if guard.triggered:
_stderr("Interrupted. Finished current person; exiting step…")
_print_game_summary(rec)
return 130
# If game ended, print summary
if entry.get("status") != GameStatus.RUNNING:
_print_game_summary(rec)
return 0
def cmd_play(interactive: bool, verbose: bool) -> int:
bc = BerghainClient()
rec = bc.gs.current_game()
if not rec:
print("No current game. Start one with `berghain new`.", file=sys.stderr)
return 1
if not rec.policy_path:
print("This game has no policy set. Use `berghain set-policy ./policy.py` first.", file=sys.stderr)
return 2
last5: Deque[str] = deque(maxlen=5)
renderer = ProgressiveRenderer(rec.constraints.mins, VENUE_CAP, REJECT_CAP)
renderer.begin() # reserve the block before any stdout prints
_stderr("Autoplaying… (Ctrl-C to stop after finishing the current person)")
guard = _SoftSigint()
guard.arm()
try:
while rec.status == GameStatus.RUNNING and rec.admitted_count < VENUE_CAP:
if rec.rejected_count >= 20000:
break
try:
entry = bc.step(rec) # soft SIGINT -> flag set, call completes
except PolicyLoadError as e:
print(f"Policy error: {e}", file=sys.stderr)
return 2
line = _raw_line(entry, rec.constraints.mins)
if verbose:
_stderr(line)
last5.append(line)
renderer.render(last5, rec.tallies,
entry.get("admitted_count", rec.admitted_count),
entry.get("rejected_count", rec.rejected_count))
if guard.triggered:
_stderr("Interrupted. Finished current person; exiting autoplay…")
break
if rec.status != GameStatus.RUNNING:
break
if interactive:
# Allow Ctrl-C to abort prompt immediately (no in-flight request)
guard.disarm()
try:
input("Press Enter for next…")
except KeyboardInterrupt:
_stderr("Interrupted at prompt. Exiting after finishing current person…")
break
finally:
guard.arm()
finally:
guard.disarm()
bc.gs.save()
_print_game_summary(rec)
return 0
def cmd_list() -> int:
bc = BerghainClient()
games = bc.gs.list_games()
if not games:
print("No games found.")
return 0
for gid, (status, scenario, admitted, rejected) in games.items():
star = "*" if bc.gs.current_game_id == gid else " "
print(f"{star} {gid} scen={scenario} status={status} A={admitted} R={rejected}")
return 0
def cmd_switch(idprefix: str) -> int:
bc = BerghainClient()
ok = bc.gs.set_current_game(idprefix)
if not ok:
print("No unique match for that id prefix.", file=sys.stderr)
return 1
bc.gs.save()
print(f"Switched current game to: {bc.gs.current_game_id}")
return 0
def main(argv: Optional[list[str]] = None) -> int:
args = docopt(USAGE, argv=argv, version="berghain 0.0.1")
if args["set-player"]:
return cmd_set_player(args["<player>"])
elif args["new"]:
scen = int(args["--scenario"] or -1)
if scen not in (-1, 1, 2, 3):
print("--scenario must be 1, 2, or 3.", file=sys.stderr)
return 2
return cmd_new_game(scen)
elif args["play"]:
return cmd_play(bool(args["--interactive"]), bool(args["--verbose"]))
elif args["step"]:
return cmd_step(bool(args["--verbose"]))
elif args["list"]:
return cmd_list()
elif args["status"]:
return cmd_status(args["<idprefix>"])
elif args["switch"]:
return cmd_switch(args["<idprefix>"])
elif args["set-policy"]:
if args["default"]:
return cmd_set_policy("default")
else:
return cmd_set_policy(args["<file>"])
elif args["check-policy"]:
return cmd_check_policy(args["<file>"])
else:
print("Unknown command", file=sys.stderr)
return 1