diff --git a/berghain/__init__.py b/berghain/__init__.py new file mode 100644 index 0000000..95492ec --- /dev/null +++ b/berghain/__init__.py @@ -0,0 +1,2 @@ +__app_name__ = "berghain" +__version__ = "0.0.1" diff --git a/berghain/__main__.py b/berghain/__main__.py new file mode 100644 index 0000000..5fe53a5 --- /dev/null +++ b/berghain/__main__.py @@ -0,0 +1,3 @@ +from .cli import main +if __name__ == "__main__": + raise SystemExit(main()) diff --git a/berghain/api.py b/berghain/api.py new file mode 100644 index 0000000..e5fb357 --- /dev/null +++ b/berghain/api.py @@ -0,0 +1,50 @@ +from typing import Any +import requests +from . import __version__ +from .config import BASE_URL + +class APIRequestError(Exception): + """Raised when the API request is malformed""" + +class APIResponseError(Exception): + """Raised when the API returns an error""" + +class APIClient: + def __init__(self) -> None: + self.base_url = BASE_URL.rstrip("/") + self.session = requests.Session() + self.session.headers.update({"User-Agent": f"kaylee's berghain-cli/{__version__}"}) + + def new_game(self, player_id: str | None, scenario: int) -> dict[str, Any]: + """ + Start a new game for this player + """ + if player_id is None: + raise APIRequestError("A player ID is required") + return self.__do_request("new-game",{"scenario": scenario, "playerId": player_id}, "gameId") + + def decide_and_next(self, game_id: str, person_index: int, accept: bool | None = None) -> dict[str, Any]: + """Submit a decision for the current person and get the next one""" + params: dict[str, Any] = {"gameId": game_id, "personIndex": person_index} + + if person_index > 0 and accept is None: + raise APIRequestError("You must specify whether to accept or reject this person") + + if accept is not None: + params["accept"] = str(accept).lower() + + return self.__do_request("decide-and-next", params, "status") + + + def __do_request(self, endpoint: str, params: dict[str, Any], check_for: str | None = None) -> dict[str, Any]: + resp = self.session.get(f"{self.base_url}/{endpoint}", params=params, timeout=10) + if not resp.ok: + raise APIResponseError(f"HTTP {resp.status_code}: {resp.text}") + try: + data = resp.json() + except ValueError as e: + raise APIResponseError(f"Invalid JSON in response: {resp.text}") + if check_for is not None and check_for not in data: + raise APIResponseError(f"Malformed response (key \"{check_for}\" not found): {data}") + + return data diff --git a/berghain/cli.py b/berghain/cli.py new file mode 100644 index 0000000..d4bac7e --- /dev/null +++ b/berghain/cli.py @@ -0,0 +1,386 @@ +from __future__ import annotations + +import os +import sys +import signal +from collections import deque +import json +from typing import Optional, Deque + +from docopt import docopt + +from .core import BerghainClient +from .policy import load_policy, PolicyLoadError, default_policy_path +from .config import VENUE_CAP, REJECT_CAP +from .state import GameStatus +from .ui import ProgressiveRenderer + +USAGE = """berghain + +Usage: + berghain set-player + berghain new [--scenario=] + berghain play [--interactive] [--verbose] + berghain step [--verbose] + berghain list + berghain status [] + berghain switch + berghain set-policy (default | ) + berghain check-policy + berghain (-h | --help) + berghain --version + +Options: + -h --help Show this screen. + --version Show version. + --scenario= Scenario number (1, 2, or 3). [default: 1] + --interactive Pause for Enter between steps. + --verbose Send raw transcript lines to stderr (policy debug prints too). +""" + +TEMPLATE_POLICY = """# policy.py (user template) +# Write your own decision logic. Must define: +# decide(attributes, tallies, mins, admitted_count, venue_cap, **kwargs) -> bool +# +# Hints: +# - 'attributes' are booleans for the current person. +# - 'tallies' counts already-admitted attributes. +# - 'mins' are required minimum counts per attribute. +# - You may optionally accept: p (relative frequencies), corr (correlation matrix). + +def decide(attributes: dict[str, bool], + tallies: dict[str, int], + mins: dict[str, int], + admitted_count: int, + venue_cap: int, + **kwargs) -> bool: + # Example starter: admit only if they help an unmet attribute. + need = {a: max(0, mins[a] - tallies.get(a, 0)) for a in mins} + if sum(need.values()) == 0: + return True + return any(attributes.get(a, False) and need[a] > 0 for a in mins) +""" + + +# -------- transcript helpers -------- + +def _raw_line(entry: dict, mins: dict[str, int]) -> str: + idx = entry.get("personIndex") + accept = entry.get("accept") + attrs = entry.get("attributes", {}) + need_before = entry.get("remaining_need_before", {}) + xs = sorted([k for k, v in attrs.items() if v]) + helped = sorted([a for a in xs if need_before.get(a, 0) > 0]) + return f"#{idx:>5} {'ACCEPT' if accept else 'reject':<6} attrs: {', '.join(xs) if xs else '∅':<24} helped: {', '.join(helped) if helped else 'none'}" + +def _stderr(s: str) -> None: + print(s, file=sys.stderr, flush=True) + +# -------- signal handling (finish current person, then exit) -------- + +class _SoftSigint: + """Install a SIGINT handler that sets a flag and does not raise KeyboardInterrupt. + Use .armed and .disarm to toggle around blocking input. + """ + def __init__(self) -> None: + self._old = None + self.triggered = False + + def _handler(self, signum, frame): + self.triggered = True + + def arm(self) -> None: + self.triggered = False + self._old = signal.getsignal(signal.SIGINT) + signal.signal(signal.SIGINT, self._handler) + # Restart interrupted system calls rather than raising: + try: + signal.siginterrupt(signal.SIGINT, False) + except AttributeError: + pass # not available on some platforms + + def disarm(self) -> None: + if self._old is not None: + signal.signal(signal.SIGINT, self._old) + self._old = None + +# -------- commands -------- + +def _print_game_summary(rec) -> None: + mins = rec.constraints.mins + print(f"Game: {rec.game_id}") + print(f" Scenario: {rec.scenario}") + print(f" Status: {rec.status}") + print(f" Admitted: {rec.admitted_count}") + print(f" Rejected: {rec.rejected_count}") + print(f" Policy: {rec.policy_path or '(none)'}") + if mins: + print(" Constraints:") + for a, m in mins.items(): + have = rec.tallies.get(a, 0) + print(f" - {a}: {have}/{m}") + print() + +def _ensure_template(path: str) -> None: + dirpath = os.path.dirname(os.path.abspath(path)) or "." + os.makedirs(dirpath, exist_ok=True) + with open(path, "w", encoding="utf-8") as f: + f.write(TEMPLATE_POLICY) + +def cmd_set_player(player: str) -> int: + bc = BerghainClient() + bc.set_player(player) + print(f"Player set: {player}") + return 0 + +def cmd_new_game(scenario: int) -> int: + bc = BerghainClient() + if scenario not in (1, 2, 3): + print("--scenario must be 1, 2, or 3.", file=sys.stderr) + return 2 + rec = bc.new_game(scenario=scenario) + _print_game_summary(rec) + print("Tip: attach a policy with `berghain set-policy ./my_policy.py` (required before `play`).") + return 0 + +def cmd_status(idprefix: str | None) -> int: + bc = BerghainClient() + rec = None + if idprefix: + rec = bc.gs.get_game(idprefix) + if not rec: + print("No unique match for that id prefix.", file=sys.stderr) + return 1 + else: + rec = bc.gs.current_game() + if not rec: + print("No current game. Start one with `berghain new`.", file=sys.stderr) + return 1 + + _print_game_summary(rec) + print("Initial conditions:") + print(json.dumps(bc.gs.current_game().initial_raw, indent=2)) + return 0 + + +def cmd_set_policy(path: str) -> int: + bc = BerghainClient() + rec = bc.gs.current_game() + if not rec: + print("No current game. Start one with `berghain new`.", file=sys.stderr) + return 1 + + if path.strip().lower() == "default": + try: + load_policy(default_policy_path()) + except PolicyLoadError as e: + print(f"Default policy error: {e}", file=sys.stderr) + return 2 + rec = bc.set_policy_for_current_game(default_policy_path()) + print(f"Policy set for game {rec.game_id}: {rec.policy_path}") + return 0 + + if not os.path.exists(path): + print(f"Policy file not found, creating template: {path}") + try: + _ensure_template(path) + except OSError as e: + print(f"Could not create template policy: {e}", file=sys.stderr) + return 2 + try: + load_policy(path) + except PolicyLoadError as e: + print(f"Policy error: {e}", file=sys.stderr) + return 2 + rec = bc.set_policy_for_current_game(path) + print(f"Policy set for game {rec.game_id}: {rec.policy_path}") + return 0 + +def cmd_check_policy(path: str) -> int: + if path.strip().lower() == "default": + try: + load_policy(default_policy_path()) + except PolicyLoadError as e: + print(f"Invalid default policy: {e}", file=sys.stderr) + return 2 + print("Default policy looks good.") + return 0 + elif not os.path.exists(path): + print(f"Policy file not found, creating template: {path}") + try: + _ensure_template(path) + except OSError as e: + print(f"Could not create template policy: {e}", file=sys.stderr) + return 2 + try: + load_policy(path) + except PolicyLoadError as e: + print(f"Invalid policy: {e}", file=sys.stderr) + return 2 + print("Policy looks good.") + return 0 + +def cmd_step(verbose: bool) -> int: + bc = BerghainClient() + rec = bc.gs.current_game() + if not rec: + print("No current game. Start one with `berghain new`.", file=sys.stderr) + return 1 + if not rec.policy_path: + print("This game has no policy set. Use `berghain set-policy ./policy.py` first.", file=sys.stderr) + return 2 + + last5: Deque[str] = deque(maxlen=5) + renderer = ProgressiveRenderer(rec.constraints.mins, VENUE_CAP, REJECT_CAP) + renderer.begin() # reserve and print the block FIRST + + # Soft SIGINT so the current network call finishes + guard = _SoftSigint() + guard.arm() + try: + entry = bc.step(rec) + except PolicyLoadError as e: + print(f"Policy error: {e}", file=sys.stderr) + guard.disarm() + return 2 + finally: + guard.disarm() + + # Update UI + line = _raw_line(entry, rec.constraints.mins) + if verbose: + _stderr(line) + last5.append(line) + renderer.render(last5, rec.tallies, + entry.get("admitted_count", rec.admitted_count), + entry.get("rejected_count", rec.rejected_count)) + + # If SIGINT occurred during the step, exit now (current person finished) + if guard.triggered: + _stderr("Interrupted. Finished current person; exiting step…") + _print_game_summary(rec) + return 130 + + # If game ended, print summary + if entry.get("status") != GameStatus.RUNNING: + _print_game_summary(rec) + + return 0 + + +def cmd_play(interactive: bool, verbose: bool) -> int: + bc = BerghainClient() + rec = bc.gs.current_game() + if not rec: + print("No current game. Start one with `berghain new`.", file=sys.stderr) + return 1 + if not rec.policy_path: + print("This game has no policy set. Use `berghain set-policy ./policy.py` first.", file=sys.stderr) + return 2 + + last5: Deque[str] = deque(maxlen=5) + renderer = ProgressiveRenderer(rec.constraints.mins, VENUE_CAP, REJECT_CAP) + renderer.begin() # reserve the block before any stdout prints + + _stderr("Autoplaying… (Ctrl-C to stop after finishing the current person)") + + guard = _SoftSigint() + guard.arm() + + try: + while rec.status == GameStatus.RUNNING and rec.admitted_count < VENUE_CAP: + if rec.rejected_count >= 20000: + break + + try: + entry = bc.step(rec) # soft SIGINT -> flag set, call completes + except PolicyLoadError as e: + print(f"Policy error: {e}", file=sys.stderr) + return 2 + + line = _raw_line(entry, rec.constraints.mins) + if verbose: + _stderr(line) + last5.append(line) + renderer.render(last5, rec.tallies, + entry.get("admitted_count", rec.admitted_count), + entry.get("rejected_count", rec.rejected_count)) + + if guard.triggered: + _stderr("Interrupted. Finished current person; exiting autoplay…") + break + + if rec.status != GameStatus.RUNNING: + break + + if interactive: + # Allow Ctrl-C to abort prompt immediately (no in-flight request) + guard.disarm() + try: + input("Press Enter for next…") + except KeyboardInterrupt: + _stderr("Interrupted at prompt. Exiting after finishing current person…") + break + finally: + guard.arm() + + finally: + guard.disarm() + bc.gs.save() + + _print_game_summary(rec) + return 0 + + +def cmd_list() -> int: + bc = BerghainClient() + games = bc.gs.list_games() + if not games: + print("No games found.") + return 0 + for gid, (status, scenario, admitted, rejected) in games.items(): + star = "*" if bc.gs.current_game_id == gid else " " + print(f"{star} {gid} scen={scenario} status={status} A={admitted} R={rejected}") + return 0 + +def cmd_switch(idprefix: str) -> int: + bc = BerghainClient() + ok = bc.gs.set_current_game(idprefix) + if not ok: + print("No unique match for that id prefix.", file=sys.stderr) + return 1 + bc.gs.save() + print(f"Switched current game to: {bc.gs.current_game_id}") + return 0 + +def main(argv: Optional[list[str]] = None) -> int: + args = docopt(USAGE, argv=argv, version="berghain 0.0.1") + if args["set-player"]: + return cmd_set_player(args[""]) + elif args["new"]: + scen = int(args["--scenario"] or 1) + if scen not in (1, 2, 3): + print("--scenario must be 1, 2, or 3.", file=sys.stderr) + return 2 + return cmd_new_game(scen) + elif args["play"]: + return cmd_play(bool(args["--interactive"]), bool(args["--verbose"])) + elif args["step"]: + return cmd_step(bool(args["--verbose"])) + elif args["list"]: + return cmd_list() + elif args["status"]: + return cmd_status(args[""]) + elif args["switch"]: + return cmd_switch(args[""]) + elif args["set-policy"]: + if args["default"]: + return cmd_set_policy("default") + else: + return cmd_set_policy(args[""]) + elif args["check-policy"]: + return cmd_check_policy(args[""]) + else: + print("Unknown command", file=sys.stderr) + return 1 + diff --git a/berghain/config.py b/berghain/config.py new file mode 100644 index 0000000..53a5a34 --- /dev/null +++ b/berghain/config.py @@ -0,0 +1,5 @@ +import os +BASE_URL="https://berghain.challenges.listenlabs.ai" +STATE_PATH = os.path.expanduser("~/.berghain_state.json") +VENUE_CAP = 1000 +REJECT_CAP = 20000 diff --git a/berghain/core.py b/berghain/core.py new file mode 100644 index 0000000..f15a48a --- /dev/null +++ b/berghain/core.py @@ -0,0 +1,246 @@ +from __future__ import annotations + +from typing import Any, Dict, Iterator, Optional + +from .state import GameState, GameRecord, Constraints, AttributeStats, GameStatus +from .api import APIClient +from .policy import load_policy, PolicyLoadError, call_policy, default_policy_path +from .config import VENUE_CAP, REJECT_CAP + + +class BerghainClient: + def __init__(self) -> None: + self.gs = GameState.load() + self.api = APIClient() + + # ---------------------------- + # Player / game lifecycle + # ---------------------------- + + def set_player(self, uuid: str) -> None: + """Sets the player ID and persists state.""" + if not uuid: + raise ValueError("Player ID cannot be empty") + self.gs.player_id = uuid + self.gs.save() + + def new_game(self, scenario: int) -> GameRecord: + """ + Starts a new game (policy is optional and may be attached later). + Returns the created GameRecord. + """ + resp = self.api.new_game(self.gs.player_id, scenario) + + prev = self.gs.current_game() + inherit_path = getattr(prev, "policy_path", None) if prev else None + policy_path = inherit_path or default_policy_path() + + rec = GameRecord( + game_id=resp["gameId"], + initial_raw=resp, + scenario=scenario, + status=GameStatus.RUNNING, + constraints=Constraints( + mins={c["attribute"]: int(c["minCount"]) for c in resp.get("constraints", [])} + ), + stats=AttributeStats( + relative_frequencies=resp.get("attributeStatistics", {}).get("relativeFrequencies", {}) or {}, + correlations=resp.get("attributeStatistics", {}).get("correlations", {}) or {}, + ), + admitted_count=0, + rejected_count=0, + next_person=None, + tallies={}, + policy_path=policy_path, + ) + self.gs.add_game(rec) + self.gs.save() + return rec + + def set_policy_for_current_game(self, policy_path: str) -> GameRecord: + """ + Attach/replace a policy for the current game (validates it first). + """ + rec = self.gs.current_game() + if not rec: + raise ValueError("No current game to set a policy for.") + load_policy(policy_path) # validate before saving + rec.policy_path = policy_path + self.gs.save() + return rec + + # ---------------------------- + # Internals + # ---------------------------- + + def _get_decide_callable(self, rec: GameRecord): + """ + Load and return the user's decide() callable for the given game record. + Raises PolicyLoadError if missing/invalid. + """ + if not rec.policy_path: + raise PolicyLoadError("No policy set for this game. Use `berghain set-policy `.") + return load_policy(rec.policy_path) + + def _update_from_decide_response(self, rec: GameRecord, data: Dict[str, Any]) -> None: + """ + Update local record from /decide-and-next API response. + """ + status = (data.get("status") or "").lower() + if status in {GameStatus.RUNNING, GameStatus.COMPLETED, GameStatus.FAILED}: + rec.status = status + rec.admitted_count = int(data.get("admittedCount", rec.admitted_count)) + rec.rejected_count = int(data.get("rejectedCount", rec.rejected_count)) + rec.next_person = data.get("nextPerson") + self.gs.save() + + def fetch_first_person(self, rec: GameRecord) -> None: + """ + Fetch the first person (personIndex=0). The API allows 'accept' to be omitted for index 0. + """ + data = self.api.decide_and_next(rec.game_id, person_index=0, accept=None) + self._update_from_decide_response(rec, data) + + def _remaining_need(self, rec: GameRecord) -> Dict[str, int]: + """ + Compute remaining shortfall per constrained attribute based on local tallies. + """ + return { + a: max(0, rec.constraints.mins.get(a, 0) - rec.tallies.get(a, 0)) + for a in rec.constraints.mins + } + + # ---------------------------- + # Step / autoplay + # ---------------------------- + + def step(self, rec: GameRecord) -> Dict[str, Any]: + """ + Execute ONE decision step using the current policy and return a transcript entry: + + { + "personIndex": int | None, + "accept": bool, + "attributes": {attr: bool}, + "remaining_need_before": {attr: int}, + "admitted_count": int, # after API call + "rejected_count": int, # after API call + "status": "running"|"completed"|"failed", + "reason": str # human-readable reason (from policy) + } + + If no move is possible (e.g., game not running), returns a no-op entry with current counters. + Raises PolicyLoadError if no policy is set/invalid. + """ + # If the game isn't running, return a no-op entry. + if rec.status != GameStatus.RUNNING: + return { + "personIndex": None, + "accept": False, + "attributes": {}, + "remaining_need_before": self._remaining_need(rec), + "admitted_count": rec.admitted_count, + "rejected_count": rec.rejected_count, + "status": rec.status, + "reason": "Game not running", + } + + # Ensure we have a person ready. + if not rec.next_person: + self.fetch_first_person(rec) + if not rec.next_person: + return { + "personIndex": None, + "accept": False, + "attributes": {}, + "remaining_need_before": self._remaining_need(rec), + "admitted_count": rec.admitted_count, + "rejected_count": rec.rejected_count, + "status": rec.status, + "reason": "No next person", + } + + # Load user policy decide() and compute decision. + decide = self._get_decide_callable(rec) + + idx = int(rec.next_person.get("personIndex", 0)) + attrs: Dict[str, bool] = rec.next_person.get("attributes", {}) or {} + remaining_need_before = self._remaining_need(rec) + + accept = call_policy( + decide, + attributes=attrs, + tallies=rec.tallies, + mins=rec.constraints.mins, + admitted_count=rec.admitted_count, + venue_cap=VENUE_CAP, + # New optional context for policies that opt in: + p=rec.stats.relative_frequencies, + corr=rec.stats.correlations, + stats=rec.stats, # optional: full object if the policy wants it + ) + + # Ask the policy module for its last explanation + reason = None + if hasattr(decide, "__globals__"): + pol = decide.__globals__ + if "policy_reason" in pol and callable(pol["policy_reason"]): + reason = pol["policy_reason"]() + + # Update local tallies pre-API so they reflect our chosen accept. + if accept: + rec.bump_tallies(attrs) + + # Send decision, update state. + data = self.api.decide_and_next(rec.game_id, person_index=idx, accept=bool(accept)) + self._update_from_decide_response(rec, data) + + # Build the entry dict (with reason included). + entry = { + "personIndex": idx, + "accept": bool(accept), + "attributes": attrs, + "remaining_need_before": remaining_need_before, + "admitted_count": rec.admitted_count, + "rejected_count": rec.rejected_count, + "status": rec.status, + "reason": reason or ("Admit" if accept else "Reject"), + } + + return entry + + + def autoplay_stream( + self, + rec: GameRecord, + max_steps: Optional[int] = None, + ) -> Iterator[Dict[str, Any]]: + """ + Yield transcript entries (same shape as step()) until completion/failure. + Stops early if max_steps is provided. + """ + # Ensure policy exists before starting (raises PolicyLoadError if missing/invalid). + self._get_decide_callable(rec) + + # Ensure we have the first person available to step on. + if rec.next_person is None and rec.status == GameStatus.RUNNING: + self.fetch_first_person(rec) + + steps = 0 + while rec.status == GameStatus.RUNNING: + if rec.admitted_count >= VENUE_CAP or rec.rejected_count >= REJECT_CAP: + break + entry = self.step(rec) + yield entry + steps += 1 + if max_steps is not None and steps >= max_steps: + break + + def autoplay(self, rec: GameRecord) -> GameRecord: + """ + Run until completion/failure using the current policy (drains the stream). + """ + for _ in self.autoplay_stream(rec): + pass + return rec + diff --git a/berghain/default_policy.py b/berghain/default_policy.py new file mode 100644 index 0000000..e64da2a --- /dev/null +++ b/berghain/default_policy.py @@ -0,0 +1,78 @@ +# default_policy.py +# Built-in default policy for Berghain Challenge. +# +# Strategy: +# - If all minimums are satisfied, admit freely. +# - Admit creatives until quota met (they are rare). +# - Admit Berlin locals until quota met (they are required in bulk). +# - Otherwise admit if candidate helps at least one unmet need, +# provided there is still enough capacity to meet all others. +# - Reject if candidate does not help with unmet needs +# or would make quotas infeasible. +# +# The policy records a human-readable reason string for each decision, +# retrievable via policy_reason(). Your UI can display this directly. + +import sys + +DEBUG = False +_last_reason: str | None = None + + +def _set_reason(msg: str) -> None: + """Set the last decision reason (and echo to stderr if DEBUG).""" + global _last_reason + _last_reason = msg + if DEBUG: + print(msg, file=sys.stderr, flush=True) + + +def policy_reason() -> str | None: + """Return the most recent decision reason string.""" + return _last_reason + + +def decide(attributes: dict[str, bool], + tallies: dict[str, int], + mins: dict[str, int], + admitted_count: int, + venue_cap: int, + **_) -> bool: + """Return True to admit, False to reject.""" + S = venue_cap - admitted_count + need = {a: max(0, mins[a] - tallies.get(a, 0)) for a in mins} + + # 1. All quotas already met + if sum(need.values()) == 0: + _set_reason("Admit (all minimums satisfied)") + return True + + # 2. Admit creatives until quota met + if need.get("creative", 0) > 0 and attributes.get("creative", False): + _set_reason("Admit creative (rare + still needed)") + return True + + # 3. Admit locals until quota met + if need.get("berlin_local", 0) > 0 and attributes.get("berlin_local", False): + _set_reason("Admit berlin_local (large quota still unmet)") + return True + + # 4. Admit if candidate helps an unmet need and feasibility is safe + helps = [a for a in need if need[a] > 0 and attributes.get(a, False)] + if helps: + S_after = S - 1 + remaining_after = sum( + max(0, need[a] - (1 if attributes.get(a, False) else 0)) + for a in need + ) + if S_after >= remaining_after: + _set_reason(f"Admit (helps {', '.join(helps)}, feasibility ok)") + return True + else: + _set_reason("Reject (would break feasibility)") + return False + + # 5. Reject if they don’t help + _set_reason("Reject (no help to unmet needs)") + return False + diff --git a/berghain/policy.py b/berghain/policy.py new file mode 100644 index 0000000..86eabd5 --- /dev/null +++ b/berghain/policy.py @@ -0,0 +1,50 @@ +# policy.py (loader/utilities for user-supplied policy scripts) +from __future__ import annotations + +import importlib.util +import inspect +import os +from types import ModuleType +from typing import Callable, Any + + +class PolicyLoadError(Exception): + pass + + +def _import_module_from_path(path: str) -> ModuleType: + if not os.path.exists(path): + raise PolicyLoadError(f"Policy file not found: {path}") + spec = importlib.util.spec_from_file_location("user_policy", path) + if spec is None or spec.loader is None: + raise PolicyLoadError("Could not load policy module spec.") + mod = importlib.util.module_from_spec(spec) + try: + spec.loader.exec_module(mod) # type: ignore[reportAttributeAccessIssue] + except Exception as e: + raise PolicyLoadError(f"Error executing policy module: {e}") from e + return mod + +def default_policy_path() -> str: + return os.path.join(os.path.dirname(__file__), "default_policy.py") + +def load_policy(path: str) -> Callable[..., bool]: + """ + Load a user policy file and return the `decide` callable. + """ + mod = _import_module_from_path(path) + decide = getattr(mod, "decide", None) + if not callable(decide): + raise PolicyLoadError("Policy script must define a callable `decide(...)`.") + return decide + + +def call_policy(decide: Callable[..., Any], **kwargs: Any) -> Any: + """ + Call `decide` with only the parameters it accepts. + This lets us pass new optional context (like p, corr) without breaking old policies. + """ + sig = inspect.signature(decide) + accepted = {k: v for k, v in kwargs.items() if k in sig.parameters} + return decide(**accepted) + diff --git a/berghain/state.py b/berghain/state.py new file mode 100644 index 0000000..4d5f583 --- /dev/null +++ b/berghain/state.py @@ -0,0 +1,124 @@ +from __future__ import annotations + +from dataclasses import dataclass, field, asdict +from typing import Any, Dict, Tuple +import json +import os + +from .config import STATE_PATH + + +class GameStatus(str): + RUNNING = "running" + COMPLETED = "completed" + FAILED = "failed" + + +@dataclass +class Constraints: + mins: Dict[str, int] = field(default_factory=dict) + + +@dataclass +class AttributeStats: + relative_frequencies: Dict[str, float] = field(default_factory=dict) + correlations: Dict[str, Dict[str, float]] = field(default_factory=dict) + + +@dataclass +class GameRecord: + game_id: str + initial_raw: Dict[str, Any] | None = None + scenario: int | None = None + status: str = GameStatus.RUNNING + constraints: Constraints = field(default_factory=Constraints) + stats: AttributeStats = field(default_factory=AttributeStats) + admitted_count: int = 0 + rejected_count: int = 0 + next_person: Dict[str, Any] | None = None + tallies: Dict[str, int] = field(default_factory=dict) + policy_path: str | None = None # <— path to the user-provided policy script + + def bump_tallies(self, attributes: Dict[str, bool]) -> None: + for k, v in attributes.items(): + if v: + self.tallies[k] = self.tallies.get(k, 0) + 1 + + def raw(self) -> Dict[str, Any]: + return asdict(self) + + @classmethod + def from_dict(cls, d: Dict[str, Any]) -> "GameRecord": + constraints = Constraints(**(d.get("constraints") or {})) + stats = AttributeStats(**(d.get("stats") or {})) + return cls( + game_id=d["game_id"], + initial_raw=d.get("initial_raw"), + scenario=d.get("scenario"), + status=d.get("status", GameStatus.RUNNING), + constraints=constraints, + stats=stats, + admitted_count=d.get("admitted_count", 0), + rejected_count=d.get("rejected_count", 0), + next_person=d.get("next_person"), + tallies=d.get("tallies", {}) or {}, + policy_path=d.get("policy_path"), + ) + + +@dataclass +class GameState: + player_id: str | None = None + current_game_id: str | None = None + games: Dict[str, GameRecord] = field(default_factory=dict) + + def save(self) -> None: + payload = { + "player_id": self.player_id, + "current_game_id": self.current_game_id, + "games": {gid: g.raw() for gid, g in self.games.items()}, + } + os.makedirs(os.path.dirname(STATE_PATH), exist_ok=True) + with open(STATE_PATH, "w", encoding="utf-8") as f: + json.dump(payload, f, indent=2) + + @classmethod + def load(cls) -> "GameState": + try: + with open(STATE_PATH, "r", encoding="utf-8") as f: + raw = json.load(f) + except FileNotFoundError: + return cls() + gs = cls() + gs.player_id = raw.get("player_id") + gs.current_game_id = raw.get("current_game_id") + for gid, graw in (raw.get("games") or {}).items(): + gs.games[gid] = GameRecord.from_dict(graw) + return gs + + def add_game(self, rec: GameRecord) -> None: + self.games[rec.game_id] = rec + self.current_game_id = rec.game_id + + def current_game(self) -> GameRecord | None: + return self.games.get(self.current_game_id) if self.current_game_id else None + + def set_current_game(self, game_id_prefix: str) -> bool: + g = self.get_game(game_id_prefix) + if g: + self.current_game_id = g.game_id + return True + return False + + def get_game(self, game_id_prefix: str) -> GameRecord | None: + matches = [gid for gid in self.games if gid.startswith(game_id_prefix)] + if len(matches) == 1: + return self.games[matches[0]] + return None + + def list_games(self) -> Dict[str, Tuple[str, int | None, int, int]]: + return { + gid: (g.status, g.scenario, g.admitted_count, g.rejected_count) + for gid, g in self.games.items() + } + diff --git a/berghain/ui.py b/berghain/ui.py new file mode 100644 index 0000000..8d102aa --- /dev/null +++ b/berghain/ui.py @@ -0,0 +1,181 @@ +from __future__ import annotations + +import shutil +from typing import Deque, Dict, List + +# ANSI +RESET = "\x1b[0m" +BOLD = "\x1b[1m" +DIM = "\x1b[2m" +FG_RED = "\x1b[31m" +FG_GREEN = "\x1b[32m" +FG_CYAN = "\x1b[36m" +FG_YELLOW = "\x1b[33m" +ERASE_EOL = "\x1b[K" + +def _term_width() -> int: + return shutil.get_terminal_size(fallback=(80, 20)).columns + +def _bar(current: int, target: int, width: int, color: str = "", label: str = "", pad_to: int | None = None) -> str: + """ + Draw a bar and show " current/target" with current optionally right-aligned to pad_to digits. + """ + target = max(1, target) + ratio = min(1.0, max(0.0, current / target)) + filled = int(ratio * width) + bar = "[" + "#" * filled + "-" * (width - filled) + "]" + label_part = f" {label}" if label else "" + # right-align current to pad_to (defaults to len(str(target))) + w = pad_to if pad_to is not None else len(str(target)) + cur_s = str(current).rjust(w) + val = f"{cur_s}/{target}" + return f"{color}{bar}{RESET}{label_part} {DIM}{val}{RESET}" + +class ProgressiveRenderer: + """ + Flicker-free renderer that: + - Prints the whole block ONCE (with real newlines) and leaves the cursor + on the LAST line of the block (no extra newline). + - Updates by moving the cursor up to the top of the block, overwriting + ONLY changed lines, then returning to the bottom line (never beyond). + - The initial scaffold reserves blank lines for the constraints to avoid + label vs. label+bar duplication on some terminals. + """ + + def __init__(self, mins: Dict[str, int], venue_cap: int, reject_cap: int) -> None: + self.mins = dict(mins) + self.labels = sorted(self.mins.keys()) + self.venue_cap = venue_cap + self.reject_cap = reject_cap + + # Layout indices within the reserved block + self.recent_count = 5 + self.recent_title = 0 + self.recent_lines_start = 1 # 5 lines: [1..5] + self.recent_lines_end = self.recent_lines_start + self.recent_count - 1 # 5 + + self.constraints_header = self.recent_lines_end + 1 + 1 # +1 spacer + self.constraints_start = self.constraints_header + 1 + self.constraints_lines = len(self.labels) + self.constraints_end = self.constraints_start + self.constraints_lines - 1 + + self.totals_header = self.constraints_end + 1 + 1 + self.total_admit_line = self.totals_header + 1 + self.total_reject_line = self.totals_header + 2 + + self.total_lines = self.total_reject_line + 1 # number of lines in block + + self._cache: Dict[int, str] = {} # last rendered content per line + + term = _term_width() + self.bar_inner_width = max(10, min(50, term - 30)) + + # Keep the cursor "inside" the block; we park it on the last line. + self._cur_line = self.total_lines - 1 + self._begun = False + + # ---- cursor movement helpers (relative to *current* position) ---- + + def _move_to_top(self) -> None: + if self._cur_line > 0: + print(f"\x1b[{self._cur_line}F", end="") # up to BOL of top line + self._cur_line = 0 + else: + print("\r", end="") + + def _move_to_line(self, target: int) -> None: + delta = target - self._cur_line + if delta > 0: + print(f"\x1b[{delta}E", end="") # down to BOL of target + elif delta < 0: + print(f"\x1b[{-delta}F", end="") # up to BOL of target + else: + print("\r", end="") + self._cur_line = target + + def _write_line(self, text: str) -> None: + print("\r" + text + ERASE_EOL, end="") + + def _return_to_bottom(self) -> None: + if self._cur_line < self.total_lines - 1: + print(f"\x1b[{(self.total_lines - 1) - self._cur_line}E", end="") + self._cur_line = self.total_lines - 1 + print("\r", end="", flush=True) + + # ---- public API ---- + + def begin(self) -> None: + if self._begun: + return + # Print the static block once. IMPORTANT: the constraint lines are BLANK here. + lines: List[str] = [] + # Recent + lines.append(f"{BOLD}Recent decisions{RESET}") + for _ in range(self.recent_count): + lines.append("") + # spacer + lines.append("") + # Constraints header + blank lines (reserved) + lines.append(f"{BOLD}Constraints{RESET}") + for _ in self.labels: + lines.append("") # reserve a blank line for each constraint + # spacer + lines.append("") + # Totals + lines.append(f"{BOLD}Totals{RESET}") + lines.append("") # admitted bar + lines.append("") # rejected count + + for i, txt in enumerate(lines): + end = "" if i == len(lines) - 1 else "\n" + print(txt, end=end) + self._cache[i] = txt + + self._cur_line = self.total_lines - 1 + print("\r", end="", flush=True) + self._begun = True + + def render(self, recent: Deque[str], tallies: Dict[str, int], + admitted: int, rejected: int) -> None: + """ + Update the block with current data, overwriting only changed lines. + Leaves the cursor on the last line of the block (BOL). + """ + desired: Dict[int, str] = {} + + # Recent (bottom-aligned) — we now expect `recent` to contain + # human-readable policy reasons, not raw attribute dumps. + pad = max(0, self.recent_count - len(recent)) + recent_padded = [""] * pad + list(recent) + desired[self.recent_title] = f"{BOLD}Recent decisions{RESET}" + for i in range(self.recent_count): + desired[self.recent_lines_start + i] = recent_padded[i] + + # Constraints (full line: label + bar) + desired[self.constraints_header] = f"{BOLD}Constraints{RESET}" + for i, a in enumerate(self.labels): + have = tallies.get(a, 0) + need = self.mins[a] + color = FG_CYAN if have < need else FG_YELLOW + bar = _bar(have, need, self.bar_inner_width, color, pad_to=len(str(need))) + desired[self.constraints_start + i] = f"{a:>16}: {bar}" + + # Totals (right-aligned numbers) + desired[self.totals_header] = f"{BOLD}Totals{RESET}" + desired[self.total_admit_line] = _bar( + admitted, self.venue_cap, self.bar_inner_width, + FG_GREEN, "admitted", pad_to=len(str(self.venue_cap)) + ) + rej_w = len(str(self.reject_cap)) + desired[self.total_reject_line] = f"{FG_RED}rejected{RESET} {DIM}{str(rejected).rjust(rej_w)}{RESET}" + + # Minimal redraw + self._move_to_top() + for idx in range(self.total_lines): + new_text = desired.get(idx, "") + if self._cache.get(idx) != new_text: + self._move_to_line(idx) + self._write_line(new_text) + self._cache[idx] = new_text + self._return_to_bottom() + diff --git a/pyproject.toml b/pyproject.toml new file mode 100644 index 0000000..9f844f3 --- /dev/null +++ b/pyproject.toml @@ -0,0 +1,11 @@ +[project] +name = "berghain" +version = "0.1.1" +description = "berghain challenge client" +dependencies = [ + "docopt>=0.6", + "requests>=2.31", +] + +[project.scripts] +berghain = "berghain.cli:main"