added all files

This commit is contained in:
kay 2025-09-03 01:51:13 -07:00
parent 4cc4e341e3
commit bad007e799
11 changed files with 1136 additions and 0 deletions

2
berghain/__init__.py Normal file
View File

@ -0,0 +1,2 @@
__app_name__ = "berghain"
__version__ = "0.0.1"

3
berghain/__main__.py Normal file
View File

@ -0,0 +1,3 @@
from .cli import main
if __name__ == "__main__":
raise SystemExit(main())

50
berghain/api.py Normal file
View File

@ -0,0 +1,50 @@
from typing import Any
import requests
from . import __version__
from .config import BASE_URL
class APIRequestError(Exception):
"""Raised when the API request is malformed"""
class APIResponseError(Exception):
"""Raised when the API returns an error"""
class APIClient:
def __init__(self) -> None:
self.base_url = BASE_URL.rstrip("/")
self.session = requests.Session()
self.session.headers.update({"User-Agent": f"kaylee's berghain-cli/{__version__}"})
def new_game(self, player_id: str | None, scenario: int) -> dict[str, Any]:
"""
Start a new game for this player
"""
if player_id is None:
raise APIRequestError("A player ID is required")
return self.__do_request("new-game",{"scenario": scenario, "playerId": player_id}, "gameId")
def decide_and_next(self, game_id: str, person_index: int, accept: bool | None = None) -> dict[str, Any]:
"""Submit a decision for the current person and get the next one"""
params: dict[str, Any] = {"gameId": game_id, "personIndex": person_index}
if person_index > 0 and accept is None:
raise APIRequestError("You must specify whether to accept or reject this person")
if accept is not None:
params["accept"] = str(accept).lower()
return self.__do_request("decide-and-next", params, "status")
def __do_request(self, endpoint: str, params: dict[str, Any], check_for: str | None = None) -> dict[str, Any]:
resp = self.session.get(f"{self.base_url}/{endpoint}", params=params, timeout=10)
if not resp.ok:
raise APIResponseError(f"HTTP {resp.status_code}: {resp.text}")
try:
data = resp.json()
except ValueError as e:
raise APIResponseError(f"Invalid JSON in response: {resp.text}")
if check_for is not None and check_for not in data:
raise APIResponseError(f"Malformed response (key \"{check_for}\" not found): {data}")
return data

386
berghain/cli.py Normal file
View File

@ -0,0 +1,386 @@
from __future__ import annotations
import os
import sys
import signal
from collections import deque
import json
from typing import Optional, Deque
from docopt import docopt
from .core import BerghainClient
from .policy import load_policy, PolicyLoadError, default_policy_path
from .config import VENUE_CAP, REJECT_CAP
from .state import GameStatus
from .ui import ProgressiveRenderer
USAGE = """berghain
Usage:
berghain set-player <player>
berghain new [--scenario=<n>]
berghain play [--interactive] [--verbose]
berghain step [--verbose]
berghain list
berghain status [<idprefix>]
berghain switch <idprefix>
berghain set-policy (default | <file>)
berghain check-policy <file>
berghain (-h | --help)
berghain --version
Options:
-h --help Show this screen.
--version Show version.
--scenario=<n> Scenario number (1, 2, or 3). [default: 1]
--interactive Pause for Enter between steps.
--verbose Send raw transcript lines to stderr (policy debug prints too).
"""
TEMPLATE_POLICY = """# policy.py (user template)
# Write your own decision logic. Must define:
# decide(attributes, tallies, mins, admitted_count, venue_cap, **kwargs) -> bool
#
# Hints:
# - 'attributes' are booleans for the current person.
# - 'tallies' counts already-admitted attributes.
# - 'mins' are required minimum counts per attribute.
# - You may optionally accept: p (relative frequencies), corr (correlation matrix).
def decide(attributes: dict[str, bool],
tallies: dict[str, int],
mins: dict[str, int],
admitted_count: int,
venue_cap: int,
**kwargs) -> bool:
# Example starter: admit only if they help an unmet attribute.
need = {a: max(0, mins[a] - tallies.get(a, 0)) for a in mins}
if sum(need.values()) == 0:
return True
return any(attributes.get(a, False) and need[a] > 0 for a in mins)
"""
# -------- transcript helpers --------
def _raw_line(entry: dict, mins: dict[str, int]) -> str:
idx = entry.get("personIndex")
accept = entry.get("accept")
attrs = entry.get("attributes", {})
need_before = entry.get("remaining_need_before", {})
xs = sorted([k for k, v in attrs.items() if v])
helped = sorted([a for a in xs if need_before.get(a, 0) > 0])
return f"#{idx:>5} {'ACCEPT' if accept else 'reject':<6} attrs: {', '.join(xs) if xs else '':<24} helped: {', '.join(helped) if helped else 'none'}"
def _stderr(s: str) -> None:
print(s, file=sys.stderr, flush=True)
# -------- signal handling (finish current person, then exit) --------
class _SoftSigint:
"""Install a SIGINT handler that sets a flag and does not raise KeyboardInterrupt.
Use .armed and .disarm to toggle around blocking input.
"""
def __init__(self) -> None:
self._old = None
self.triggered = False
def _handler(self, signum, frame):
self.triggered = True
def arm(self) -> None:
self.triggered = False
self._old = signal.getsignal(signal.SIGINT)
signal.signal(signal.SIGINT, self._handler)
# Restart interrupted system calls rather than raising:
try:
signal.siginterrupt(signal.SIGINT, False)
except AttributeError:
pass # not available on some platforms
def disarm(self) -> None:
if self._old is not None:
signal.signal(signal.SIGINT, self._old)
self._old = None
# -------- commands --------
def _print_game_summary(rec) -> None:
mins = rec.constraints.mins
print(f"Game: {rec.game_id}")
print(f" Scenario: {rec.scenario}")
print(f" Status: {rec.status}")
print(f" Admitted: {rec.admitted_count}")
print(f" Rejected: {rec.rejected_count}")
print(f" Policy: {rec.policy_path or '(none)'}")
if mins:
print(" Constraints:")
for a, m in mins.items():
have = rec.tallies.get(a, 0)
print(f" - {a}: {have}/{m}")
print()
def _ensure_template(path: str) -> None:
dirpath = os.path.dirname(os.path.abspath(path)) or "."
os.makedirs(dirpath, exist_ok=True)
with open(path, "w", encoding="utf-8") as f:
f.write(TEMPLATE_POLICY)
def cmd_set_player(player: str) -> int:
bc = BerghainClient()
bc.set_player(player)
print(f"Player set: {player}")
return 0
def cmd_new_game(scenario: int) -> int:
bc = BerghainClient()
if scenario not in (1, 2, 3):
print("--scenario must be 1, 2, or 3.", file=sys.stderr)
return 2
rec = bc.new_game(scenario=scenario)
_print_game_summary(rec)
print("Tip: attach a policy with `berghain set-policy ./my_policy.py` (required before `play`).")
return 0
def cmd_status(idprefix: str | None) -> int:
bc = BerghainClient()
rec = None
if idprefix:
rec = bc.gs.get_game(idprefix)
if not rec:
print("No unique match for that id prefix.", file=sys.stderr)
return 1
else:
rec = bc.gs.current_game()
if not rec:
print("No current game. Start one with `berghain new`.", file=sys.stderr)
return 1
_print_game_summary(rec)
print("Initial conditions:")
print(json.dumps(bc.gs.current_game().initial_raw, indent=2))
return 0
def cmd_set_policy(path: str) -> int:
bc = BerghainClient()
rec = bc.gs.current_game()
if not rec:
print("No current game. Start one with `berghain new`.", file=sys.stderr)
return 1
if path.strip().lower() == "default":
try:
load_policy(default_policy_path())
except PolicyLoadError as e:
print(f"Default policy error: {e}", file=sys.stderr)
return 2
rec = bc.set_policy_for_current_game(default_policy_path())
print(f"Policy set for game {rec.game_id}: {rec.policy_path}")
return 0
if not os.path.exists(path):
print(f"Policy file not found, creating template: {path}")
try:
_ensure_template(path)
except OSError as e:
print(f"Could not create template policy: {e}", file=sys.stderr)
return 2
try:
load_policy(path)
except PolicyLoadError as e:
print(f"Policy error: {e}", file=sys.stderr)
return 2
rec = bc.set_policy_for_current_game(path)
print(f"Policy set for game {rec.game_id}: {rec.policy_path}")
return 0
def cmd_check_policy(path: str) -> int:
if path.strip().lower() == "default":
try:
load_policy(default_policy_path())
except PolicyLoadError as e:
print(f"Invalid default policy: {e}", file=sys.stderr)
return 2
print("Default policy looks good.")
return 0
elif not os.path.exists(path):
print(f"Policy file not found, creating template: {path}")
try:
_ensure_template(path)
except OSError as e:
print(f"Could not create template policy: {e}", file=sys.stderr)
return 2
try:
load_policy(path)
except PolicyLoadError as e:
print(f"Invalid policy: {e}", file=sys.stderr)
return 2
print("Policy looks good.")
return 0
def cmd_step(verbose: bool) -> int:
bc = BerghainClient()
rec = bc.gs.current_game()
if not rec:
print("No current game. Start one with `berghain new`.", file=sys.stderr)
return 1
if not rec.policy_path:
print("This game has no policy set. Use `berghain set-policy ./policy.py` first.", file=sys.stderr)
return 2
last5: Deque[str] = deque(maxlen=5)
renderer = ProgressiveRenderer(rec.constraints.mins, VENUE_CAP, REJECT_CAP)
renderer.begin() # reserve and print the block FIRST
# Soft SIGINT so the current network call finishes
guard = _SoftSigint()
guard.arm()
try:
entry = bc.step(rec)
except PolicyLoadError as e:
print(f"Policy error: {e}", file=sys.stderr)
guard.disarm()
return 2
finally:
guard.disarm()
# Update UI
line = _raw_line(entry, rec.constraints.mins)
if verbose:
_stderr(line)
last5.append(line)
renderer.render(last5, rec.tallies,
entry.get("admitted_count", rec.admitted_count),
entry.get("rejected_count", rec.rejected_count))
# If SIGINT occurred during the step, exit now (current person finished)
if guard.triggered:
_stderr("Interrupted. Finished current person; exiting step…")
_print_game_summary(rec)
return 130
# If game ended, print summary
if entry.get("status") != GameStatus.RUNNING:
_print_game_summary(rec)
return 0
def cmd_play(interactive: bool, verbose: bool) -> int:
bc = BerghainClient()
rec = bc.gs.current_game()
if not rec:
print("No current game. Start one with `berghain new`.", file=sys.stderr)
return 1
if not rec.policy_path:
print("This game has no policy set. Use `berghain set-policy ./policy.py` first.", file=sys.stderr)
return 2
last5: Deque[str] = deque(maxlen=5)
renderer = ProgressiveRenderer(rec.constraints.mins, VENUE_CAP, REJECT_CAP)
renderer.begin() # reserve the block before any stdout prints
_stderr("Autoplaying… (Ctrl-C to stop after finishing the current person)")
guard = _SoftSigint()
guard.arm()
try:
while rec.status == GameStatus.RUNNING and rec.admitted_count < VENUE_CAP:
if rec.rejected_count >= 20000:
break
try:
entry = bc.step(rec) # soft SIGINT -> flag set, call completes
except PolicyLoadError as e:
print(f"Policy error: {e}", file=sys.stderr)
return 2
line = _raw_line(entry, rec.constraints.mins)
if verbose:
_stderr(line)
last5.append(line)
renderer.render(last5, rec.tallies,
entry.get("admitted_count", rec.admitted_count),
entry.get("rejected_count", rec.rejected_count))
if guard.triggered:
_stderr("Interrupted. Finished current person; exiting autoplay…")
break
if rec.status != GameStatus.RUNNING:
break
if interactive:
# Allow Ctrl-C to abort prompt immediately (no in-flight request)
guard.disarm()
try:
input("Press Enter for next…")
except KeyboardInterrupt:
_stderr("Interrupted at prompt. Exiting after finishing current person…")
break
finally:
guard.arm()
finally:
guard.disarm()
bc.gs.save()
_print_game_summary(rec)
return 0
def cmd_list() -> int:
bc = BerghainClient()
games = bc.gs.list_games()
if not games:
print("No games found.")
return 0
for gid, (status, scenario, admitted, rejected) in games.items():
star = "*" if bc.gs.current_game_id == gid else " "
print(f"{star} {gid} scen={scenario} status={status} A={admitted} R={rejected}")
return 0
def cmd_switch(idprefix: str) -> int:
bc = BerghainClient()
ok = bc.gs.set_current_game(idprefix)
if not ok:
print("No unique match for that id prefix.", file=sys.stderr)
return 1
bc.gs.save()
print(f"Switched current game to: {bc.gs.current_game_id}")
return 0
def main(argv: Optional[list[str]] = None) -> int:
args = docopt(USAGE, argv=argv, version="berghain 0.0.1")
if args["set-player"]:
return cmd_set_player(args["<player>"])
elif args["new"]:
scen = int(args["--scenario"] or 1)
if scen not in (1, 2, 3):
print("--scenario must be 1, 2, or 3.", file=sys.stderr)
return 2
return cmd_new_game(scen)
elif args["play"]:
return cmd_play(bool(args["--interactive"]), bool(args["--verbose"]))
elif args["step"]:
return cmd_step(bool(args["--verbose"]))
elif args["list"]:
return cmd_list()
elif args["status"]:
return cmd_status(args["<idprefix>"])
elif args["switch"]:
return cmd_switch(args["<idprefix>"])
elif args["set-policy"]:
if args["default"]:
return cmd_set_policy("default")
else:
return cmd_set_policy(args["<file>"])
elif args["check-policy"]:
return cmd_check_policy(args["<file>"])
else:
print("Unknown command", file=sys.stderr)
return 1

5
berghain/config.py Normal file
View File

@ -0,0 +1,5 @@
import os
BASE_URL="https://berghain.challenges.listenlabs.ai"
STATE_PATH = os.path.expanduser("~/.berghain_state.json")
VENUE_CAP = 1000
REJECT_CAP = 20000

246
berghain/core.py Normal file
View File

@ -0,0 +1,246 @@
from __future__ import annotations
from typing import Any, Dict, Iterator, Optional
from .state import GameState, GameRecord, Constraints, AttributeStats, GameStatus
from .api import APIClient
from .policy import load_policy, PolicyLoadError, call_policy, default_policy_path
from .config import VENUE_CAP, REJECT_CAP
class BerghainClient:
def __init__(self) -> None:
self.gs = GameState.load()
self.api = APIClient()
# ----------------------------
# Player / game lifecycle
# ----------------------------
def set_player(self, uuid: str) -> None:
"""Sets the player ID and persists state."""
if not uuid:
raise ValueError("Player ID cannot be empty")
self.gs.player_id = uuid
self.gs.save()
def new_game(self, scenario: int) -> GameRecord:
"""
Starts a new game (policy is optional and may be attached later).
Returns the created GameRecord.
"""
resp = self.api.new_game(self.gs.player_id, scenario)
prev = self.gs.current_game()
inherit_path = getattr(prev, "policy_path", None) if prev else None
policy_path = inherit_path or default_policy_path()
rec = GameRecord(
game_id=resp["gameId"],
initial_raw=resp,
scenario=scenario,
status=GameStatus.RUNNING,
constraints=Constraints(
mins={c["attribute"]: int(c["minCount"]) for c in resp.get("constraints", [])}
),
stats=AttributeStats(
relative_frequencies=resp.get("attributeStatistics", {}).get("relativeFrequencies", {}) or {},
correlations=resp.get("attributeStatistics", {}).get("correlations", {}) or {},
),
admitted_count=0,
rejected_count=0,
next_person=None,
tallies={},
policy_path=policy_path,
)
self.gs.add_game(rec)
self.gs.save()
return rec
def set_policy_for_current_game(self, policy_path: str) -> GameRecord:
"""
Attach/replace a policy for the current game (validates it first).
"""
rec = self.gs.current_game()
if not rec:
raise ValueError("No current game to set a policy for.")
load_policy(policy_path) # validate before saving
rec.policy_path = policy_path
self.gs.save()
return rec
# ----------------------------
# Internals
# ----------------------------
def _get_decide_callable(self, rec: GameRecord):
"""
Load and return the user's decide() callable for the given game record.
Raises PolicyLoadError if missing/invalid.
"""
if not rec.policy_path:
raise PolicyLoadError("No policy set for this game. Use `berghain set-policy <file>`.")
return load_policy(rec.policy_path)
def _update_from_decide_response(self, rec: GameRecord, data: Dict[str, Any]) -> None:
"""
Update local record from /decide-and-next API response.
"""
status = (data.get("status") or "").lower()
if status in {GameStatus.RUNNING, GameStatus.COMPLETED, GameStatus.FAILED}:
rec.status = status
rec.admitted_count = int(data.get("admittedCount", rec.admitted_count))
rec.rejected_count = int(data.get("rejectedCount", rec.rejected_count))
rec.next_person = data.get("nextPerson")
self.gs.save()
def fetch_first_person(self, rec: GameRecord) -> None:
"""
Fetch the first person (personIndex=0). The API allows 'accept' to be omitted for index 0.
"""
data = self.api.decide_and_next(rec.game_id, person_index=0, accept=None)
self._update_from_decide_response(rec, data)
def _remaining_need(self, rec: GameRecord) -> Dict[str, int]:
"""
Compute remaining shortfall per constrained attribute based on local tallies.
"""
return {
a: max(0, rec.constraints.mins.get(a, 0) - rec.tallies.get(a, 0))
for a in rec.constraints.mins
}
# ----------------------------
# Step / autoplay
# ----------------------------
def step(self, rec: GameRecord) -> Dict[str, Any]:
"""
Execute ONE decision step using the current policy and return a transcript entry:
{
"personIndex": int | None,
"accept": bool,
"attributes": {attr: bool},
"remaining_need_before": {attr: int},
"admitted_count": int, # after API call
"rejected_count": int, # after API call
"status": "running"|"completed"|"failed",
"reason": str # human-readable reason (from policy)
}
If no move is possible (e.g., game not running), returns a no-op entry with current counters.
Raises PolicyLoadError if no policy is set/invalid.
"""
# If the game isn't running, return a no-op entry.
if rec.status != GameStatus.RUNNING:
return {
"personIndex": None,
"accept": False,
"attributes": {},
"remaining_need_before": self._remaining_need(rec),
"admitted_count": rec.admitted_count,
"rejected_count": rec.rejected_count,
"status": rec.status,
"reason": "Game not running",
}
# Ensure we have a person ready.
if not rec.next_person:
self.fetch_first_person(rec)
if not rec.next_person:
return {
"personIndex": None,
"accept": False,
"attributes": {},
"remaining_need_before": self._remaining_need(rec),
"admitted_count": rec.admitted_count,
"rejected_count": rec.rejected_count,
"status": rec.status,
"reason": "No next person",
}
# Load user policy decide() and compute decision.
decide = self._get_decide_callable(rec)
idx = int(rec.next_person.get("personIndex", 0))
attrs: Dict[str, bool] = rec.next_person.get("attributes", {}) or {}
remaining_need_before = self._remaining_need(rec)
accept = call_policy(
decide,
attributes=attrs,
tallies=rec.tallies,
mins=rec.constraints.mins,
admitted_count=rec.admitted_count,
venue_cap=VENUE_CAP,
# New optional context for policies that opt in:
p=rec.stats.relative_frequencies,
corr=rec.stats.correlations,
stats=rec.stats, # optional: full object if the policy wants it
)
# Ask the policy module for its last explanation
reason = None
if hasattr(decide, "__globals__"):
pol = decide.__globals__
if "policy_reason" in pol and callable(pol["policy_reason"]):
reason = pol["policy_reason"]()
# Update local tallies pre-API so they reflect our chosen accept.
if accept:
rec.bump_tallies(attrs)
# Send decision, update state.
data = self.api.decide_and_next(rec.game_id, person_index=idx, accept=bool(accept))
self._update_from_decide_response(rec, data)
# Build the entry dict (with reason included).
entry = {
"personIndex": idx,
"accept": bool(accept),
"attributes": attrs,
"remaining_need_before": remaining_need_before,
"admitted_count": rec.admitted_count,
"rejected_count": rec.rejected_count,
"status": rec.status,
"reason": reason or ("Admit" if accept else "Reject"),
}
return entry
def autoplay_stream(
self,
rec: GameRecord,
max_steps: Optional[int] = None,
) -> Iterator[Dict[str, Any]]:
"""
Yield transcript entries (same shape as step()) until completion/failure.
Stops early if max_steps is provided.
"""
# Ensure policy exists before starting (raises PolicyLoadError if missing/invalid).
self._get_decide_callable(rec)
# Ensure we have the first person available to step on.
if rec.next_person is None and rec.status == GameStatus.RUNNING:
self.fetch_first_person(rec)
steps = 0
while rec.status == GameStatus.RUNNING:
if rec.admitted_count >= VENUE_CAP or rec.rejected_count >= REJECT_CAP:
break
entry = self.step(rec)
yield entry
steps += 1
if max_steps is not None and steps >= max_steps:
break
def autoplay(self, rec: GameRecord) -> GameRecord:
"""
Run until completion/failure using the current policy (drains the stream).
"""
for _ in self.autoplay_stream(rec):
pass
return rec

View File

@ -0,0 +1,78 @@
# default_policy.py
# Built-in default policy for Berghain Challenge.
#
# Strategy:
# - If all minimums are satisfied, admit freely.
# - Admit creatives until quota met (they are rare).
# - Admit Berlin locals until quota met (they are required in bulk).
# - Otherwise admit if candidate helps at least one unmet need,
# provided there is still enough capacity to meet all others.
# - Reject if candidate does not help with unmet needs
# or would make quotas infeasible.
#
# The policy records a human-readable reason string for each decision,
# retrievable via policy_reason(). Your UI can display this directly.
import sys
DEBUG = False
_last_reason: str | None = None
def _set_reason(msg: str) -> None:
"""Set the last decision reason (and echo to stderr if DEBUG)."""
global _last_reason
_last_reason = msg
if DEBUG:
print(msg, file=sys.stderr, flush=True)
def policy_reason() -> str | None:
"""Return the most recent decision reason string."""
return _last_reason
def decide(attributes: dict[str, bool],
tallies: dict[str, int],
mins: dict[str, int],
admitted_count: int,
venue_cap: int,
**_) -> bool:
"""Return True to admit, False to reject."""
S = venue_cap - admitted_count
need = {a: max(0, mins[a] - tallies.get(a, 0)) for a in mins}
# 1. All quotas already met
if sum(need.values()) == 0:
_set_reason("Admit (all minimums satisfied)")
return True
# 2. Admit creatives until quota met
if need.get("creative", 0) > 0 and attributes.get("creative", False):
_set_reason("Admit creative (rare + still needed)")
return True
# 3. Admit locals until quota met
if need.get("berlin_local", 0) > 0 and attributes.get("berlin_local", False):
_set_reason("Admit berlin_local (large quota still unmet)")
return True
# 4. Admit if candidate helps an unmet need and feasibility is safe
helps = [a for a in need if need[a] > 0 and attributes.get(a, False)]
if helps:
S_after = S - 1
remaining_after = sum(
max(0, need[a] - (1 if attributes.get(a, False) else 0))
for a in need
)
if S_after >= remaining_after:
_set_reason(f"Admit (helps {', '.join(helps)}, feasibility ok)")
return True
else:
_set_reason("Reject (would break feasibility)")
return False
# 5. Reject if they dont help
_set_reason("Reject (no help to unmet needs)")
return False

50
berghain/policy.py Normal file
View File

@ -0,0 +1,50 @@
# policy.py (loader/utilities for user-supplied policy scripts)
from __future__ import annotations
import importlib.util
import inspect
import os
from types import ModuleType
from typing import Callable, Any
class PolicyLoadError(Exception):
pass
def _import_module_from_path(path: str) -> ModuleType:
if not os.path.exists(path):
raise PolicyLoadError(f"Policy file not found: {path}")
spec = importlib.util.spec_from_file_location("user_policy", path)
if spec is None or spec.loader is None:
raise PolicyLoadError("Could not load policy module spec.")
mod = importlib.util.module_from_spec(spec)
try:
spec.loader.exec_module(mod) # type: ignore[reportAttributeAccessIssue]
except Exception as e:
raise PolicyLoadError(f"Error executing policy module: {e}") from e
return mod
def default_policy_path() -> str:
return os.path.join(os.path.dirname(__file__), "default_policy.py")
def load_policy(path: str) -> Callable[..., bool]:
"""
Load a user policy file and return the `decide` callable.
"""
mod = _import_module_from_path(path)
decide = getattr(mod, "decide", None)
if not callable(decide):
raise PolicyLoadError("Policy script must define a callable `decide(...)`.")
return decide
def call_policy(decide: Callable[..., Any], **kwargs: Any) -> Any:
"""
Call `decide` with only the parameters it accepts.
This lets us pass new optional context (like p, corr) without breaking old policies.
"""
sig = inspect.signature(decide)
accepted = {k: v for k, v in kwargs.items() if k in sig.parameters}
return decide(**accepted)

124
berghain/state.py Normal file
View File

@ -0,0 +1,124 @@
from __future__ import annotations
from dataclasses import dataclass, field, asdict
from typing import Any, Dict, Tuple
import json
import os
from .config import STATE_PATH
class GameStatus(str):
RUNNING = "running"
COMPLETED = "completed"
FAILED = "failed"
@dataclass
class Constraints:
mins: Dict[str, int] = field(default_factory=dict)
@dataclass
class AttributeStats:
relative_frequencies: Dict[str, float] = field(default_factory=dict)
correlations: Dict[str, Dict[str, float]] = field(default_factory=dict)
@dataclass
class GameRecord:
game_id: str
initial_raw: Dict[str, Any] | None = None
scenario: int | None = None
status: str = GameStatus.RUNNING
constraints: Constraints = field(default_factory=Constraints)
stats: AttributeStats = field(default_factory=AttributeStats)
admitted_count: int = 0
rejected_count: int = 0
next_person: Dict[str, Any] | None = None
tallies: Dict[str, int] = field(default_factory=dict)
policy_path: str | None = None # <— path to the user-provided policy script
def bump_tallies(self, attributes: Dict[str, bool]) -> None:
for k, v in attributes.items():
if v:
self.tallies[k] = self.tallies.get(k, 0) + 1
def raw(self) -> Dict[str, Any]:
return asdict(self)
@classmethod
def from_dict(cls, d: Dict[str, Any]) -> "GameRecord":
constraints = Constraints(**(d.get("constraints") or {}))
stats = AttributeStats(**(d.get("stats") or {}))
return cls(
game_id=d["game_id"],
initial_raw=d.get("initial_raw"),
scenario=d.get("scenario"),
status=d.get("status", GameStatus.RUNNING),
constraints=constraints,
stats=stats,
admitted_count=d.get("admitted_count", 0),
rejected_count=d.get("rejected_count", 0),
next_person=d.get("next_person"),
tallies=d.get("tallies", {}) or {},
policy_path=d.get("policy_path"),
)
@dataclass
class GameState:
player_id: str | None = None
current_game_id: str | None = None
games: Dict[str, GameRecord] = field(default_factory=dict)
def save(self) -> None:
payload = {
"player_id": self.player_id,
"current_game_id": self.current_game_id,
"games": {gid: g.raw() for gid, g in self.games.items()},
}
os.makedirs(os.path.dirname(STATE_PATH), exist_ok=True)
with open(STATE_PATH, "w", encoding="utf-8") as f:
json.dump(payload, f, indent=2)
@classmethod
def load(cls) -> "GameState":
try:
with open(STATE_PATH, "r", encoding="utf-8") as f:
raw = json.load(f)
except FileNotFoundError:
return cls()
gs = cls()
gs.player_id = raw.get("player_id")
gs.current_game_id = raw.get("current_game_id")
for gid, graw in (raw.get("games") or {}).items():
gs.games[gid] = GameRecord.from_dict(graw)
return gs
def add_game(self, rec: GameRecord) -> None:
self.games[rec.game_id] = rec
self.current_game_id = rec.game_id
def current_game(self) -> GameRecord | None:
return self.games.get(self.current_game_id) if self.current_game_id else None
def set_current_game(self, game_id_prefix: str) -> bool:
g = self.get_game(game_id_prefix)
if g:
self.current_game_id = g.game_id
return True
return False
def get_game(self, game_id_prefix: str) -> GameRecord | None:
matches = [gid for gid in self.games if gid.startswith(game_id_prefix)]
if len(matches) == 1:
return self.games[matches[0]]
return None
def list_games(self) -> Dict[str, Tuple[str, int | None, int, int]]:
return {
gid: (g.status, g.scenario, g.admitted_count, g.rejected_count)
for gid, g in self.games.items()
}

181
berghain/ui.py Normal file
View File

@ -0,0 +1,181 @@
from __future__ import annotations
import shutil
from typing import Deque, Dict, List
# ANSI
RESET = "\x1b[0m"
BOLD = "\x1b[1m"
DIM = "\x1b[2m"
FG_RED = "\x1b[31m"
FG_GREEN = "\x1b[32m"
FG_CYAN = "\x1b[36m"
FG_YELLOW = "\x1b[33m"
ERASE_EOL = "\x1b[K"
def _term_width() -> int:
return shutil.get_terminal_size(fallback=(80, 20)).columns
def _bar(current: int, target: int, width: int, color: str = "", label: str = "", pad_to: int | None = None) -> str:
"""
Draw a bar and show " current/target" with current optionally right-aligned to pad_to digits.
"""
target = max(1, target)
ratio = min(1.0, max(0.0, current / target))
filled = int(ratio * width)
bar = "[" + "#" * filled + "-" * (width - filled) + "]"
label_part = f" {label}" if label else ""
# right-align current to pad_to (defaults to len(str(target)))
w = pad_to if pad_to is not None else len(str(target))
cur_s = str(current).rjust(w)
val = f"{cur_s}/{target}"
return f"{color}{bar}{RESET}{label_part} {DIM}{val}{RESET}"
class ProgressiveRenderer:
"""
Flicker-free renderer that:
- Prints the whole block ONCE (with real newlines) and leaves the cursor
on the LAST line of the block (no extra newline).
- Updates by moving the cursor up to the top of the block, overwriting
ONLY changed lines, then returning to the bottom line (never beyond).
- The initial scaffold reserves blank lines for the constraints to avoid
label vs. label+bar duplication on some terminals.
"""
def __init__(self, mins: Dict[str, int], venue_cap: int, reject_cap: int) -> None:
self.mins = dict(mins)
self.labels = sorted(self.mins.keys())
self.venue_cap = venue_cap
self.reject_cap = reject_cap
# Layout indices within the reserved block
self.recent_count = 5
self.recent_title = 0
self.recent_lines_start = 1 # 5 lines: [1..5]
self.recent_lines_end = self.recent_lines_start + self.recent_count - 1 # 5
self.constraints_header = self.recent_lines_end + 1 + 1 # +1 spacer
self.constraints_start = self.constraints_header + 1
self.constraints_lines = len(self.labels)
self.constraints_end = self.constraints_start + self.constraints_lines - 1
self.totals_header = self.constraints_end + 1 + 1
self.total_admit_line = self.totals_header + 1
self.total_reject_line = self.totals_header + 2
self.total_lines = self.total_reject_line + 1 # number of lines in block
self._cache: Dict[int, str] = {} # last rendered content per line
term = _term_width()
self.bar_inner_width = max(10, min(50, term - 30))
# Keep the cursor "inside" the block; we park it on the last line.
self._cur_line = self.total_lines - 1
self._begun = False
# ---- cursor movement helpers (relative to *current* position) ----
def _move_to_top(self) -> None:
if self._cur_line > 0:
print(f"\x1b[{self._cur_line}F", end="") # up to BOL of top line
self._cur_line = 0
else:
print("\r", end="")
def _move_to_line(self, target: int) -> None:
delta = target - self._cur_line
if delta > 0:
print(f"\x1b[{delta}E", end="") # down to BOL of target
elif delta < 0:
print(f"\x1b[{-delta}F", end="") # up to BOL of target
else:
print("\r", end="")
self._cur_line = target
def _write_line(self, text: str) -> None:
print("\r" + text + ERASE_EOL, end="")
def _return_to_bottom(self) -> None:
if self._cur_line < self.total_lines - 1:
print(f"\x1b[{(self.total_lines - 1) - self._cur_line}E", end="")
self._cur_line = self.total_lines - 1
print("\r", end="", flush=True)
# ---- public API ----
def begin(self) -> None:
if self._begun:
return
# Print the static block once. IMPORTANT: the constraint lines are BLANK here.
lines: List[str] = []
# Recent
lines.append(f"{BOLD}Recent decisions{RESET}")
for _ in range(self.recent_count):
lines.append("")
# spacer
lines.append("")
# Constraints header + blank lines (reserved)
lines.append(f"{BOLD}Constraints{RESET}")
for _ in self.labels:
lines.append("") # reserve a blank line for each constraint
# spacer
lines.append("")
# Totals
lines.append(f"{BOLD}Totals{RESET}")
lines.append("") # admitted bar
lines.append("") # rejected count
for i, txt in enumerate(lines):
end = "" if i == len(lines) - 1 else "\n"
print(txt, end=end)
self._cache[i] = txt
self._cur_line = self.total_lines - 1
print("\r", end="", flush=True)
self._begun = True
def render(self, recent: Deque[str], tallies: Dict[str, int],
admitted: int, rejected: int) -> None:
"""
Update the block with current data, overwriting only changed lines.
Leaves the cursor on the last line of the block (BOL).
"""
desired: Dict[int, str] = {}
# Recent (bottom-aligned) — we now expect `recent` to contain
# human-readable policy reasons, not raw attribute dumps.
pad = max(0, self.recent_count - len(recent))
recent_padded = [""] * pad + list(recent)
desired[self.recent_title] = f"{BOLD}Recent decisions{RESET}"
for i in range(self.recent_count):
desired[self.recent_lines_start + i] = recent_padded[i]
# Constraints (full line: label + bar)
desired[self.constraints_header] = f"{BOLD}Constraints{RESET}"
for i, a in enumerate(self.labels):
have = tallies.get(a, 0)
need = self.mins[a]
color = FG_CYAN if have < need else FG_YELLOW
bar = _bar(have, need, self.bar_inner_width, color, pad_to=len(str(need)))
desired[self.constraints_start + i] = f"{a:>16}: {bar}"
# Totals (right-aligned numbers)
desired[self.totals_header] = f"{BOLD}Totals{RESET}"
desired[self.total_admit_line] = _bar(
admitted, self.venue_cap, self.bar_inner_width,
FG_GREEN, "admitted", pad_to=len(str(self.venue_cap))
)
rej_w = len(str(self.reject_cap))
desired[self.total_reject_line] = f"{FG_RED}rejected{RESET} {DIM}{str(rejected).rjust(rej_w)}{RESET}"
# Minimal redraw
self._move_to_top()
for idx in range(self.total_lines):
new_text = desired.get(idx, "")
if self._cache.get(idx) != new_text:
self._move_to_line(idx)
self._write_line(new_text)
self._cache[idx] = new_text
self._return_to_bottom()

11
pyproject.toml Normal file
View File

@ -0,0 +1,11 @@
[project]
name = "berghain"
version = "0.1.1"
description = "berghain challenge client"
dependencies = [
"docopt>=0.6",
"requests>=2.31",
]
[project.scripts]
berghain = "berghain.cli:main"