From c6f059046490241d27eab13433cb86a705a0cf09 Mon Sep 17 00:00:00 2001 From: kay Date: Wed, 3 Sep 2025 03:34:30 -0700 Subject: [PATCH] added policies --- berghain/cli.py | 8 +- berghain/core.py | 4 +- policies/newpolicy.py | 134 +++++++++++++++++++++++++ policies/policy-new.py | 122 +++++++++++++++++++++++ policies/policy.py | 28 ++++++ policies/policy2.py | 80 +++++++++++++++ policies/policy3.py | 92 ++++++++++++++++++ policies/policy4.py | 154 +++++++++++++++++++++++++++++ policies/policy5.py | 215 +++++++++++++++++++++++++++++++++++++++++ policies/policy6.py | 184 +++++++++++++++++++++++++++++++++++ 10 files changed, 1016 insertions(+), 5 deletions(-) create mode 100644 policies/newpolicy.py create mode 100644 policies/policy-new.py create mode 100644 policies/policy.py create mode 100644 policies/policy2.py create mode 100644 policies/policy3.py create mode 100644 policies/policy4.py create mode 100644 policies/policy5.py create mode 100644 policies/policy6.py diff --git a/berghain/cli.py b/berghain/cli.py index d4bac7e..ee3e809 100644 --- a/berghain/cli.py +++ b/berghain/cli.py @@ -33,7 +33,7 @@ Usage: Options: -h --help Show this screen. --version Show version. - --scenario= Scenario number (1, 2, or 3). [default: 1] + --scenario= Scenario number (1, 2, or 3). --interactive Pause for Enter between steps. --verbose Send raw transcript lines to stderr (policy debug prints too). """ @@ -135,7 +135,7 @@ def cmd_set_player(player: str) -> int: def cmd_new_game(scenario: int) -> int: bc = BerghainClient() - if scenario not in (1, 2, 3): + if scenario not in (-1, 1, 2, 3): print("--scenario must be 1, 2, or 3.", file=sys.stderr) return 2 rec = bc.new_game(scenario=scenario) @@ -358,8 +358,8 @@ def main(argv: Optional[list[str]] = None) -> int: if args["set-player"]: return cmd_set_player(args[""]) elif args["new"]: - scen = int(args["--scenario"] or 1) - if scen not in (1, 2, 3): + scen = int(args["--scenario"] or -1) + if scen not in (-1, 1, 2, 3): print("--scenario must be 1, 2, or 3.", file=sys.stderr) return 2 return cmd_new_game(scen) diff --git a/berghain/core.py b/berghain/core.py index f15a48a..7e39745 100644 --- a/berghain/core.py +++ b/berghain/core.py @@ -29,9 +29,11 @@ class BerghainClient: Starts a new game (policy is optional and may be attached later). Returns the created GameRecord. """ + prev = self.gs.current_game() + scenario = prev.scenario if scenario == -1 else scenario + resp = self.api.new_game(self.gs.player_id, scenario) - prev = self.gs.current_game() inherit_path = getattr(prev, "policy_path", None) if prev else None policy_path = inherit_path or default_policy_path() diff --git a/policies/newpolicy.py b/policies/newpolicy.py new file mode 100644 index 0000000..adfac9c --- /dev/null +++ b/policies/newpolicy.py @@ -0,0 +1,134 @@ +# policy.py — resume-safe, no module state, overlap-aware +# +# Key fixes for your stuck run: +# • Uses a *derived lower bound* for the required (german_speaker ∩ international) +# overlap: GI_MIN = max(0, minG + minI - CAP); current GI >= max(0, G + I - admitted). +# This avoids freezing when swapping policies mid-game. +# • Hard gates: +# (1) helper-only, (2) per-attribute feasibility, (3) GI-overlap feasibility +# using the lower-bound (safe, never overestimates what's required). +# • Pacing guard: while any attribute is behind its on-track pace, only admit +# candidates that help at least one behind attribute. +# • Must-take guard: if S <= need[a] for any attribute a, accept anyone who has a. +# +# Drop-in compatible with the Berghain runner. Flip DEBUG=True for reasoning logs. + +import sys +from math import ceil +from typing import Dict, List + +DEBUG = False + +# Attribute names for Scenario 3 +U = "underground_veteran" +I = "international" +F = "fashion_forward" +Q = "queer_friendly" +V = "vinyl_collector" +G = "german_speaker" + +# Small slack so we don't thrash when exactly on pace +PACE_SLACK = 0 + +def _log(msg: str) -> None: + if DEBUG: + print(msg, file=sys.stderr, flush=True) + +def _need(mins: Dict[str, int], tallies: Dict[str, int]) -> Dict[str, int]: + return {a: max(0, mins[a] - tallies.get(a, 0)) for a in mins} + +def _helpers(attrs: Dict[str, bool], need: Dict[str, int]) -> List[str]: + return [a for a, n in need.items() if n > 0 and attrs.get(a, False)] + +def _feasible_per_attr(attrs: Dict[str, bool], need: Dict[str, int], S: int) -> bool: + # After admitting, S_after seats must cover all remaining shortfalls individually. + S_after = S - 1 + for a, n in need.items(): + rem = n - (1 if attrs.get(a, False) else 0) + if rem < 0: + rem = 0 + if S_after < rem: + return False + return True + +def _gi_both_lb(tallies: Dict[str, int], admitted_count: int) -> int: + # Lower bound on how many admitted are both German and International + g = tallies.get(G, 0) + i = tallies.get(I, 0) + return max(0, g + i - admitted_count) + +def _feasible_gi_overlap(attrs: Dict[str, bool], + mins: Dict[str, int], + tallies: Dict[str, int], + admitted_count: int, + venue_cap: int, + S: int) -> bool: + # Enforce feasibility for the *required* G∩I overlap using the *lower bound* so we never over-block. + gi_min = max(0, mins.get(G, 0) + mins.get(I, 0) - venue_cap) + if gi_min <= 0: + return True + lb_now = _gi_both_lb(tallies, admitted_count) + lb_after = lb_now + (1 if (attrs.get(G, False) and attrs.get(I, False)) else 0) + need_after = max(0, gi_min - lb_after) + S_after = S - 1 + return S_after >= need_after + +def _pace_behind(mins: Dict[str, int], tallies: Dict[str, int], admitted_count: int, cap: int) -> List[str]: + # Attributes behind linear pace: ceil(min * admitted / cap) + behind = [] + for a, m in mins.items(): + want = ceil(m * admitted_count / max(1, cap)) + have = tallies.get(a, 0) + if have + PACE_SLACK < want: + behind.append(a) + return behind + +def decide(attributes: Dict[str, bool], + tallies: Dict[str, int], + mins: Dict[str, int], + admitted_count: int, + venue_cap: int) -> bool: + + S = venue_cap - admitted_count # remaining seats + need = _need(mins, tallies) + total_need = sum(need.values()) + + # If all minima are satisfied, admit everyone. + if total_need == 0: + _log("All minima met -> admit") + return True + + # Must help at least one unmet attribute. + helps = _helpers(attributes, need) + if not helps: + _log("Reject: non-helper") + return False + + # Hard feasibility gates + if not _feasible_per_attr(attributes, need, S): + _log("Reject: per-attribute feasibility would break") + return False + if not _feasible_gi_overlap(attributes, mins, tallies, admitted_count, venue_cap, S): + _log("Reject: GI-overlap feasibility (lower-bound) would break") + return False + + # Must-take guard: if S <= need[a] for any attribute a, we must accept anyone who has a. + must_take = [a for a, n in need.items() if n > 0 and S <= n] + if must_take and any(attributes.get(a, False) for a in must_take): + _log(f"Accept: must-take for {must_take}") + return True + + # Pacing guard: while any attribute is behind its on-track count, only take helpers for those behind. + behind = _pace_behind(mins, tallies, admitted_count, venue_cap) + if behind: + if any(a in behind for a in helps): + _log(f"Accept: pacing helper for behind={behind}") + return True + else: + _log(f"Reject: pacing guard (behind={behind}); candidate helps {helps} none behind") + return False + + # On pace: admit any feasible helper (GI overlap protected by the hard gate). + _log(f"Accept: on-pace helper {helps}") + return True + diff --git a/policies/policy-new.py b/policies/policy-new.py new file mode 100644 index 0000000..69be43e --- /dev/null +++ b/policies/policy-new.py @@ -0,0 +1,122 @@ +# default_policy.py +# Pressure-gated policy: every admit must carry at least one statistically-scarce attribute +# until quotas are back on track. Designed to prevent over-admitting "easy" attributes while +# 'creative' and 'berlin_local' lag. +# +# Use policy_reason() in your UI to display decision explanations. + +from __future__ import annotations +import sys +from typing import Dict + +DEBUG = False +_last_reason: str | None = None + +def _set_reason(msg: str) -> None: + global _last_reason + _last_reason = msg + if DEBUG: + print(msg, file=sys.stderr, flush=True) + +def policy_reason() -> str | None: + return _last_reason + +# --- Tunables --- +PRESSURE_SLACK = 0.0 # >0 softens the must-have gate slightly (e.g., 0.5) +PACING_START = 20 # ignore pacing/pressure noise for the first N admits +FEASIBILITY_SLACK = 0 # extra slack for the feasibility guard + +# If runner doesn't pass base frequencies, use conservative fallbacks +DEFAULT_P = { + "creative": 0.06, + "berlin_local": 0.40, + "techno_lover": 0.60, + "well_connected": 0.45, +} + +ORDER = ("creative", "berlin_local", "techno_lover", "well_connected") + +def _need(mins: Dict[str,int], tallies: Dict[str,int]) -> Dict[str,int]: + return {a: max(0, int(mins.get(a, 0)) - int(tallies.get(a, 0))) for a in mins} + +def _p(p: Dict[str,float] | None, a: str) -> float: + v = (p or {}).get(a, DEFAULT_P.get(a, 0.0)) + try: + return max(0.0, min(1.0, float(v))) + except Exception: + return DEFAULT_P.get(a, 0.0) + +def decide(attributes: Dict[str, bool], + tallies: Dict[str, int], + mins: Dict[str, int], + admitted_count: int, + venue_cap: int, + p: Dict[str, float] | None = None, + **_) -> bool: + + has = lambda a: bool(attributes.get(a, False)) + + S = venue_cap - admitted_count + need = _need(mins, tallies) + total_need = sum(need.values()) + + # 0) All mins satisfied -> admit freely + if total_need == 0: + _set_reason("Admit (all minimums satisfied)") + return True + + # Ensure candidate helps at least one unmet attribute + helps = [a for a in ORDER if need.get(a, 0) > 0 and has(a)] + if not helps: + _set_reason("Reject (no contribution to unmet attributes)") + return False + + # Early game: admit any helper while counts are tiny to avoid noise + if admitted_count < PACING_START: + _set_reason(f"Admit (early game; helps {', '.join(helps)})") + return True + + # 1) Compute base probs and PRESSURE = shortfall vs expected future supply + P = {a: _p(p, a) for a in ORDER} + S_after = max(0, S - 1) + + pressure: Dict[str, float] = {} + for a in ORDER: + # remaining need after counting this candidate (if they have 'a') + rem_after = max(0, need.get(a, 0) - (1 if has(a) else 0)) + # expected future supply after this admit + exp_after = S_after * P[a] + pressure[a] = rem_after - exp_after # >0 means we're short in expectation + + # 2) Build MUST set: attributes whose pressure exceeds slack (i.e., statistically scarce) + must = [a for a in ORDER if pressure[a] > PRESSURE_SLACK] + + if must: + # Require the candidate to carry at least one MUST attribute + if not any(has(a) for a in must): + _set_reason("Reject (must carry a scarce attribute: " + ", ".join(must) + ")") + return False + + # 3) Feasibility: after admitting, do we still have enough slots to finish? + total_need_after = sum( + max(0, need.get(a, 0) - (1 if has(a) else 0)) for a in ORDER + ) + if S_after + FEASIBILITY_SLACK < total_need_after: + _set_reason("Reject (feasibility would break after admitting)") + return False + + # 4) Passed gates -> admit, with crisp reason + if must: + # Preferential message by priority + for a in ("creative", "berlin_local"): + if a in must and has(a): + _set_reason(f"Admit (scarce: {a}; pressure {pressure[a]:.1f})") + return True + # else any must-have present + found = [a for a in must if has(a)] + _set_reason("Admit (meets scarce requirement: " + ", ".join(found) + ")") + return True + else: + _set_reason(f"Admit (on pace; helps {', '.join(helps)})") + return True + diff --git a/policies/policy.py b/policies/policy.py new file mode 100644 index 0000000..1fe07f1 --- /dev/null +++ b/policies/policy.py @@ -0,0 +1,28 @@ +# Generated by berghain CLI: policy template +# You can customize this however you like. The only requirement is that +# a function named `decide` is defined and returns a bool. + +def decide(attributes: dict[str, bool], + tallies: dict[str, int], + mins: dict[str, int], + admitted_count: int, + venue_cap: int) -> bool: + """A safe default 'pacing' policy. + + Logic: + 1) Admit if the person has ANY still-needed attribute. + 2) Else, admit only if admitting them will still leave enough slots to + finish remaining minimums: (remaining_slots - 1) >= total_remaining_shortfall. + 3) If all mins are satisfied, admit everyone. + """ + remaining_slots = venue_cap - admitted_count + remaining_need = {a: max(0, mins.get(a, 0) - tallies.get(a, 0)) for a in mins} + total_need = sum(remaining_need.values()) + + if total_need == 0: + return True + + if any(attributes.get(a, False) and remaining_need[a] > 0 for a in mins): + return True + + return (remaining_slots - 1) >= total_need diff --git a/policies/policy2.py b/policies/policy2.py new file mode 100644 index 0000000..f48dc7f --- /dev/null +++ b/policies/policy2.py @@ -0,0 +1,80 @@ +# policy.py +# Scenario policy for: +# techno_lover ≥ 650 +# well_connected ≥ 450 +# creative ≥ 300 +# berlin_local ≥ 750 +# +# Strategy: +# 1) If all mins are satisfied -> admit everyone. +# 2) Otherwise, admit ONLY candidates who contribute to at least one unmet attribute. +# 3) Before admitting, run a per-attribute feasibility check: +# After this admission, for every attribute 'a', the remaining slots (S-1) +# must be ≥ the remaining shortfall for 'a' AFTER counting this candidate. +# This reserves enough capacity to finish all quotas, avoiding the earlier failure +# where locals/creatives were starved out. +# This is stricter than a sum-of-needs check and blocks “useless” admits early. +# +# Logging hook: +# Set DEBUG=True to emit per-decision reasoning to stderr. +# Works well with: `berghain play --verbose` or `berghain step --verbose`. + +import sys + +DEBUG = False # flip to True for reasoning prints + + +def _log(msg: str) -> None: + if DEBUG: + print(msg, file=sys.stderr, flush=True) + + +def decide(attributes: dict[str, bool], + tallies: dict[str, int], + mins: dict[str, int], + admitted_count: int, + venue_cap: int) -> bool: + S = venue_cap - admitted_count + need = {a: max(0, mins[a] - tallies.get(a, 0)) for a in mins} + total_need = sum(need.values()) + + # 1) If all constraints are satisfied, admit freely. + if total_need == 0: + _log("All mins met -> admit") + return True + + # Which unmet attributes would this candidate help? + helps = [a for a in need if need[a] > 0 and attributes.get(a, False)] + + # 2) If they don't help any unmet attribute, reject (avoid wasting scarce slots). + if not helps: + _log("Helps none of the unmet attributes -> reject") + return False + + # 3) Feasibility check per attribute (after admitting this candidate). + # For every attribute 'a', we must have (S-1) >= max(0, need[a] - (1 if candidate has 'a' else 0)) + # This guarantees we never consume a slot that makes any target impossible. + S_after = S - 1 + for a in need: + remaining = need[a] + if attributes.get(a, False): + remaining = max(0, remaining - 1) + if S_after < remaining: + _log(f"Admitting would break feasibility for '{a}': S-1={S_after} < need'={remaining} -> reject") + return False + + # Optional bias: since 'creative' is very rare, note when we’re behind and admit them eagerly. + if "creative" in need and need["creative"] > 0 and attributes.get("creative", False): + _log(f"Accept (creative still needed: {need['creative']})") + return True + + # Optional bias: locals are heavily required; prefer local if still needed. + if "berlin_local" in need and need["berlin_local"] > 0 and attributes.get("berlin_local", False): + _log(f"Accept (berlin_local still needed: {need['berlin_local']})") + return True + + # If we reached here, the candidate helps (at least one unmet attribute) + # and passes feasibility for ALL attributes — safe to admit. + _log(f"Accept (helps {helps}; all feasibility checks passed)") + return True + diff --git a/policies/policy3.py b/policies/policy3.py new file mode 100644 index 0000000..b94f813 --- /dev/null +++ b/policies/policy3.py @@ -0,0 +1,92 @@ +# policy.py +# Scenario-agnostic admission policy for Berghain bouncer game. +# +# Works for any set of min-count constraints and a 1000-cap venue. +# +# Core idea (optimality & guarantee): +# • Define current "need[a]" for each attribute a (how many more we must admit). +# • Let S be the remaining slots (capacity - admitted_so_far). +# • A candidate is a "helper" if they have at least one attribute with need[a] > 0. +# • We accept a candidate iff: +# (i) they help (avoid wasting scarce slots), and +# (ii) after admitting them, for every attribute 'a', +# the remaining slots (S-1) are still >= the remaining shortfall for 'a'. +# +# This per-attribute feasibility check is stricter than a total-need check and +# prevents dead-ends (e.g., starving a rare-but-required attribute). +# +# Why this minimizes rejections: +# • Among all policies that never violate feasibility, accepting every feasible +# helper immediately maximizes the admission rate. Rejecting any such candidate +# only delays filling the club and can never reduce the number of rejections +# needed to meet the same constraints (you'll still need to admit at least the +# same multisets of helpers to hit all minima). So this greedy rule attains the +# minimum possible rejections subject to feasibility. +# +# Practical notes: +# • Once all mins are satisfied, we admit everyone (no reason to reject). +# • If multiple attributes remain unmet, a multi-attribute helper is naturally +# accepted (it passes feasibility more easily and reduces multiple needs). +# • Extremely rare attributes are handled automatically because we refuse to +# spend slots on non-helpers while those needs exist. +# +# Logging: +# • Set DEBUG=True to print reasoning to stderr (useful with --verbose runs). + +import sys +from typing import Dict + +DEBUG = False # flip to True for detailed decision logging + + +def _log(msg: str) -> None: + if DEBUG: + print(msg, file=sys.stderr, flush=True) + + +def decide(attributes: Dict[str, bool], + tallies: Dict[str, int], + mins: Dict[str, int], + admitted_count: int, + venue_cap: int) -> bool: + """ + Return True to admit, False to reject. + """ + # Remaining slots + S = venue_cap - admitted_count + + # Current unmet need per attribute + need = {a: max(0, mins[a] - tallies.get(a, 0)) for a in mins} + total_need = sum(need.values()) + + # If all minima are already satisfied, admit freely. + if total_need == 0: + _log("All minima satisfied -> admit") + return True + + # Determine which unmet attributes this candidate helps. + helps = [a for a, n in need.items() if n > 0 and attributes.get(a, False)] + + # If they don't help any unmet attribute, reject to avoid wasting a scarce slot. + if not helps: + _log("Helps none of the unmet attributes -> reject") + return False + + # Feasibility check (per attribute) AFTER admitting this candidate. + # For each attribute 'a', remaining shortfall cannot exceed remaining slots (S-1). + S_after = S - 1 + for a, n in need.items(): + rem = n - (1 if attributes.get(a, False) else 0) + if rem < 0: + rem = 0 + if S_after < rem: + _log(f"Reject: admitting would break feasibility for '{a}' " + f"(S_after={S_after} < rem_need={rem})") + return False + + # Optional priority logging (no behavioral change needed for optimality): + # Track how many unmet attributes the candidate covers. + k = sum(1 for a in helps) + _log(f"Accept: helps {helps} (k={k}); all feasibility checks passed") + return True + diff --git a/policies/policy4.py b/policies/policy4.py new file mode 100644 index 0000000..b8c1bc6 --- /dev/null +++ b/policies/policy4.py @@ -0,0 +1,154 @@ +# policy.py — “rare-first, German-safe” strategy for Scenario 3 +# +# Goal: +# Hit all minima as early as possible by prioritizing the bottlenecks +# (queer_friendly, vinyl_collector) and protecting the big German quota, +# while never painting ourselves into a corner. +# +# Hard guarantees: +# • Per-attribute feasibility check on every decision (no dead-ends). +# • Reject anyone who doesn’t help an unmet attribute (“non-helper”). +# • Once all minima are satisfied, admit everyone. +# +# Bias (what “does this”): +# • While any of {queer_friendly, vinyl_collector, german_speaker} are unmet: +# 1) Admit if candidate has queer_friendly or vinyl_collector (rare). +# 2) Else admit if candidate has german_speaker (protect 800 target), +# BUT stay feasibility-safe for other mins (esp. international). +# 3) Otherwise reject even if they help a non-priority min, unless that +# non-priority attribute has become *critical* by scarcity. +# • Exploit overlaps: queer∩vinyl, queer/vinyl with German are admitted eagerly. +# • When rare quotas are safe and German scarcity ≤ 1, admit other helpers. +# +# Notes: +# • This policy may reject some feasible low-priority helpers early to keep +# headroom aligned with the binding quotas, as requested. +# • DEBUG=True prints reasoning to stderr. + +import sys +from typing import Dict + +DEBUG = False # set True for verbose reasoning + +# Scenario 3 stats (iid arrivals) +P = { + "underground_veteran": 0.6795, + "international": 0.5735, + "fashion_forward": 0.6910, + "queer_friendly": 0.04614, + "vinyl_collector": 0.04454, + "german_speaker": 0.4565, +} + +# Priority groups +RARE = ("queer_friendly", "vinyl_collector") +GUARD = "german_speaker" # strongly constrained and conflicts with "international" + +# Scarcity thresholds +CRITICAL_SCARCITY = 1.0 # need exceeds expected supply in remaining slots +ALMOST_SAFE_SCARCITY = 0.9 # used to check if rare quotas are basically safe + + +def _log(msg: str) -> None: + if DEBUG: + print(msg, file=sys.stderr, flush=True) + + +def _need(mins: Dict[str, int], tallies: Dict[str, int]) -> Dict[str, int]: + return {a: max(0, mins[a] - tallies.get(a, 0)) for a in mins} + + +def _scarcity(need: Dict[str, int], S: int) -> Dict[str, float]: + # Scarcity = remaining shortfall divided by expected remaining supply. + # (>1 means we're behind expectation; <1 means we are ahead/on track.) + s = {} + for a, n in need.items(): + exp = max(1e-9, P.get(a, 0.0) * S) + s[a] = (n / exp) if n > 0 else 0.0 + return s + + +def _feasible_after_admit(attributes: Dict[str, bool], + need: Dict[str, int], + S: int) -> bool: + # After admitting this candidate we have S_after slots; for every attribute a, + # the remaining shortfall must fit in those slots. + S_after = S - 1 + for a, n in need.items(): + rem = n - (1 if attributes.get(a, False) else 0) + if rem < 0: + rem = 0 + if S_after < rem: + return False + return True + + +def decide(attributes: Dict[str, bool], + tallies: Dict[str, int], + mins: Dict[str, int], + admitted_count: int, + venue_cap: int) -> bool: + # Remaining slots + S = venue_cap - admitted_count + + # Unmet needs & scarcity + need = _need(mins, tallies) + total_need = sum(need.values()) + + # If all minima satisfied, admit everyone. + if total_need == 0: + _log("All minima met -> admit") + return True + + # Helper set = unmet attributes the candidate has + helps = [a for a, n in need.items() if n > 0 and attributes.get(a, False)] + if not helps: + _log("Non-helper -> reject") + return False + + # Feasibility is a hard gate + if not _feasible_after_admit(attributes, need, S): + _log("Reject: would break per-attribute feasibility") + return False + + # Scarcity picture + sc = _scarcity(need, S) + + # Are any of the "rare or guard" quotas still unmet? + rare_unmet = any(need[a] > 0 for a in RARE) + guard_unmet = need.get(GUARD, 0) > 0 + in_priority_phase = rare_unmet or guard_unmet + + # If we're in the priority phase, apply the requested bias: + if in_priority_phase: + # 1) If candidate helps a rare attribute, admit. + if any(attributes.get(a, False) and need.get(a, 0) > 0 for a in RARE): + _log(f"Accept: rare helper { [a for a in RARE if attributes.get(a, False)] }") + return True + + # 2) Else if candidate helps German (guard), admit (feasibility already checked). + if attributes.get(GUARD, False) and guard_unmet: + _log("Accept: german_speaker to protect 800 target") + return True + + # 3) Else candidate helps only non-priority mins. Normally reject here to + # keep headroom for rare/German — UNLESS that non-priority attribute + # itself is now critical by scarcity. + # (This preserves viability for e.g. international/fashion when genuinely at risk.) + critical_nonprio = [ + a for a in helps + if (a not in RARE and a != GUARD and sc.get(a, 0.0) >= CRITICAL_SCARCITY) + ] + if critical_nonprio: + _log(f"Accept: non-priority helper {critical_nonprio} is critical by scarcity") + return True + + # Otherwise reject to conserve headroom for rare/German during priority phase. + _log(f"Reject: helper only for non-priority {helps} while rare/guard unmet") + return False + + # If not in priority phase (rare met and German safe), admit any helper that keeps feasibility. + # "German safe" defined implicitly by leaving priority phase; still preserve feasibility. + _log(f"Accept: post-priority helper {helps}") + return True + diff --git a/policies/policy5.py b/policies/policy5.py new file mode 100644 index 0000000..6e49ff3 --- /dev/null +++ b/policies/policy5.py @@ -0,0 +1,215 @@ +# policy.py — ultra-selective front-loading with a hard “GI-both” (German∩International) reserve +# +# Problem you hit: +# Early admits were too loose, starving the truly binding quotas later +# (queer_friendly, vinyl_collector, and the necessary overlap of +# german_speaker ∩ international). +# +# What this does: +# 1) Helper-only + strict feasibility (can’t paint into a corner). +# 2) A **virtual required-overlap** constraint for (german_speaker, international): +# GI_BOTH_MIN = max(0, minG + minI − CAP) = 450 in Scenario 3. +# We maintain a live counter of how many “GI-both” admits we’ve made and +# run a feasibility check against the remaining seats (hard gate). +# 3) **Front-load gating**: +# • While GI-both is behind its *own* pace, admit ONLY: +# – GI-both candidates, OR +# – Rare helpers: queer_friendly or vinyl_collector. +# (No german-only, no international-only, no “easy” helpers.) +# • If GI-both is on pace but (queer_friendly or vinyl_collector) is behind, +# admit ONLY rare helpers (or GI-both). +# 4) After the front-load phase (GI-both pace OK and rare OK), relax to helper-only +# (still blocking anything that would break either base feasibility or GI-both reserve). +# +# Result: +# Early acceptance rate is intentionally low (lots of rejections) until the +# binding quotas are on track; this keeps headroom for scarce attributes and +# the GI overlap you *must* accumulate to hit both 800 Germans and 650 +# Internationals in only 1000 seats. +# +# Toggle DEBUG=True for per-decision logs. + +import sys +from math import ceil +from typing import Dict, List + +DEBUG = False # set True for verbose reasoning + +# ---- Scenario 3 attribute names ---- +U = "underground_veteran" +I = "international" +F = "fashion_forward" +Q = "queer_friendly" +V = "vinyl_collector" +G = "german_speaker" + +PRIORITY_RARE = (Q, V) # truly scarce attributes to front-load +PRIORITY_GI_PAIR = (G, I) # the negative-correlated pair requiring a big overlap + +# Small pacing slack so we don't thrash on single-count fluctuations. +PACE_SLACK = 0 + +# Module-level state to count admitted intersections we care about. +_GI_BOTH_ADMITTED = 0 # count of admits who were BOTH german_speaker AND international + + +def _log(msg: str) -> None: + if DEBUG: + print(msg, file=sys.stderr, flush=True) + + +def _need(mins: Dict[str, int], tallies: Dict[str, int]) -> Dict[str, int]: + return {a: max(0, mins[a] - tallies.get(a, 0)) for a in mins} + + +def _helpers(attributes: Dict[str, bool], need: Dict[str, int]) -> List[str]: + # Attributes this candidate has that are still unmet + return [a for a, n in need.items() if n > 0 and attributes.get(a, False)] + + +def _pace_required(total_target: int, admitted_count: int, cap: int) -> int: + # On-track count by now for a quota of 'total_target' + return ceil(total_target * admitted_count / max(1, cap)) + + +def _feasible_base(attributes: Dict[str, bool], need: Dict[str, int], remaining_slots: int) -> bool: + # Classic per-attribute feasibility: after admitting, each remaining shortfall fits in S_after. + S_after = remaining_slots - 1 + for a, n in need.items(): + rem = n - (1 if attributes.get(a, False) else 0) + if rem < 0: + rem = 0 + if S_after < rem: + return False + return True + + +def _feasible_gi_both(is_gi_both: bool, gi_both_min: int, remaining_slots: int) -> bool: + # Virtual feasibility for required overlap: after admitting, there must be + # enough seats to still acquire the remaining GI-both admits. + global _GI_BOTH_ADMITTED + have_after = _GI_BOTH_ADMITTED + (1 if is_gi_both else 0) + need_after = max(0, gi_both_min - have_after) + S_after = remaining_slots - 1 + return S_after >= need_after + + +def decide(attributes: Dict[str, bool], + tallies: Dict[str, int], + mins: Dict[str, int], + admitted_count: int, + venue_cap: int) -> bool: + global _GI_BOTH_ADMITTED + + S = venue_cap - admitted_count # remaining seats + need = _need(mins, tallies) + total_need = sum(need.values()) + + # Admit freely once all minima are met. + if total_need == 0: + _log("All minima satisfied -> admit") + return True + + # Candidate properties + hasG = bool(attributes.get(G, False)) + hasI = bool(attributes.get(I, False)) + hasQ = bool(attributes.get(Q, False)) + hasV = bool(attributes.get(V, False)) + is_GI_both = hasG and hasI + + # Must help at least one STILL-UNMET base attribute. + helps = _helpers(attributes, need) + if not helps: + _log("Reject: non-helper") + return False + + # --- Hard feasibility gates --- + # 1) Base (per-attribute) feasibility + if not _feasible_base(attributes, need, S): + _log("Reject: would break per-attribute feasibility") + return False + + # 2) Virtual GI-both feasibility + gi_both_min_total = max(0, mins.get(G, 0) + mins.get(I, 0) - venue_cap) # = 450 here + if gi_both_min_total > 0 and not _feasible_gi_both(is_GI_both, gi_both_min_total, S): + _log("Reject: would break GI-both overlap feasibility") + return False + + # --- Pacing (front-load) guards --- + # On-track counts by now for the truly binding quotas and the GI-both overlap. + q_pace = _pace_required(mins.get(Q, 0), admitted_count, venue_cap) + v_pace = _pace_required(mins.get(V, 0), admitted_count, venue_cap) + g_pace = _pace_required(mins.get(G, 0), admitted_count, venue_cap) + gi_both_pace = _pace_required(gi_both_min_total, admitted_count, venue_cap) + + q_have = tallies.get(Q, 0) + v_have = tallies.get(V, 0) + g_have = tallies.get(G, 0) + gi_both_have = _GI_BOTH_ADMITTED # exact, tracked by us + + behind_Q = (q_have + PACE_SLACK) < q_pace + behind_V = (v_have + PACE_SLACK) < v_pace + behind_GI_both = (gi_both_have + PACE_SLACK) < gi_both_pace + behind_G = (g_have + PACE_SLACK) < g_pace + + # Priority front-load order: + # 1) GI-both pace + # 2) Rare (Q, V) pace + # 3) German pace (ONLY when GI-both is on pace) + if behind_GI_both: + # While GI-both is behind, ONLY admit GI-both or rare helpers. + if is_GI_both: + _log("Accept: GI-both behind -> take GI-both") + _GI_BOTH_ADMITTED += 1 + return True + if (hasQ and need.get(Q, 0) > 0) or (hasV and need.get(V, 0) > 0): + _log("Accept: GI-both behind -> take rare (Q/V)") + return True + _log("Reject: GI-both behind -> conserve seat (no GI-both, no rare)") + return False + + # GI-both is on pace now. + # If rare quotas are behind, only admit rare (or GI-both which is always welcome). + if behind_Q or behind_V: + if is_GI_both: + _log("Accept: rare behind, GI-both bonus") + _GI_BOTH_ADMITTED += 1 + return True + if (behind_Q and hasQ and need.get(Q, 0) > 0) or (behind_V and hasV and need.get(V, 0) > 0): + _log("Accept: rare behind -> take needed rare") + return True + _log("Reject: rare behind -> conserve seat (candidate not needed rare)") + return False + + # If German is behind (but GI-both is fine), allow German-only as a third-tier priority. + if behind_G: + if is_GI_both: + _log("Accept: German behind, GI-both present") + _GI_BOTH_ADMITTED += 1 + return True + if hasG and need.get(G, 0) > 0 and not hasI: + _log("Accept: German behind -> take German-only") + return True + # Don’t pull in International-only here; keep GI balance healthy. + if hasI and not hasG: + _log("Reject: German behind -> skip International-only") + return False + + # --- Post front-load: Helper-only, with GI-both feasibility guard already enforced --- + # Still avoid pure International-only unless we either (a) need International + # and (b) GI-both target is already fully satisfied. + gi_both_remaining = max(0, gi_both_min_total - _GI_BOTH_ADMITTED) + if hasI and not hasG and gi_both_remaining > 0: + # We still owe GI-both; don't consume seats with International-only yet. + _log("Reject: GI-both remaining -> defer International-only") + return False + + # Safe to admit any feasible helper now. + if is_GI_both: + _GI_BOTH_ADMITTED += 1 + _log("Accept: on-track helper (GI-both)") + return True + + _log(f"Accept: on-track helper {helps}") + return True + diff --git a/policies/policy6.py b/policies/policy6.py new file mode 100644 index 0000000..d3c9e70 --- /dev/null +++ b/policies/policy6.py @@ -0,0 +1,184 @@ +# policy.py — anti-stall, rare+overlap–reserved, feasibility-first +# +# Fix for your “stuck at ~550 admits” case: +# • Keep hard feasibility (can’t paint into a corner). +# • Explicitly reserve seats for the three true bottlenecks: +# – queer_friendly (Q), vinyl_collector (V), and the GI overlap +# (german_speaker ∩ international) which must be ≥ 450. +# • Use *pacing* only as a soft gate; never let it deadlock: +# – While Q/V/GI-overlap are behind pace, prefer them. +# – Allow other helpers ONLY if, after admitting, there is comfortable +# cushion beyond the reserved seats for Q/V/GI-overlap. +# – If we still see a long rejection streak, auto-relax the pacing gate +# (base + reserves feasibility still enforced) to break droughts. +# +# Guarantees: +# • Reject non-helpers. +# • Per-attribute feasibility after each admit. +# • GI-overlap feasibility (and aggregated rare+overlap reserve) after each admit. +# • Anti-stall: pacing can’t lock the policy into rejecting forever. +# +# Toggle DEBUG=True for verbose reasoning. + +import sys +from math import ceil +from typing import Dict, List + +DEBUG = False # set True for step-by-step logs + +# Attribute names for Scenario 3 +U = "underground_veteran" +I = "international" +F = "fashion_forward" +Q = "queer_friendly" +V = "vinyl_collector" +G = "german_speaker" + +# --- Module state (persists across calls) --- +_STEP = 0 +_REJECT_STREAK = 0 +_GI_BOTH_ADMITTED = 0 # admits with BOTH german_speaker AND international + +# --- Tunables --- +PACE_SLACK = 0 # how many below on-track we tolerate +RELAX_STREAK = 400 # after this many consecutive rejects, relax pacing gate +CUSHION_FRACTION = 0.05 # 5% of remaining seats as cushion when behind pace +MIN_CUSHION = 2 # at least this cushion when behind pace + + +def _log(msg: str) -> None: + if DEBUG: + print(msg, file=sys.stderr, flush=True) + + +def _need(mins: Dict[str, int], tallies: Dict[str, int]) -> Dict[str, int]: + return {a: max(0, mins[a] - tallies.get(a, 0)) for a in mins} + + +def _helpers(attrs: Dict[str, bool], need: Dict[str, int]) -> List[str]: + return [a for a, n in need.items() if n > 0 and attrs.get(a, False)] + + +def _pace_required(target: int, admitted: int, cap: int) -> int: + return ceil(target * admitted / max(1, cap)) + + +def _feasible_per_attribute(attrs: Dict[str, bool], need: Dict[str, int], S: int) -> bool: + S_after = S - 1 + for a, n in need.items(): + rem = n - (1 if attrs.get(a, False) else 0) + if rem < 0: + rem = 0 + if S_after < rem: + return False + return True + + +def _gi_overlap_total_min(mins: Dict[str, int], cap: int) -> int: + # Minimum number of admits that must be BOTH german & international to satisfy both minima in cap seats + return max(0, mins.get(G, 0) + mins.get(I, 0) - cap) + + +def _reserve_after_admit(attrs: Dict[str, bool], + need_Q: int, need_V: int, need_GI: int) -> int: + # Remaining reserved seats needed for Q, V, and GI-overlap AFTER this admission + q_after = max(0, need_Q - (1 if attrs.get(Q, False) else 0)) + v_after = max(0, need_V - (1 if attrs.get(V, False) else 0)) + gi_after = max(0, need_GI - (1 if (attrs.get(G, False) and attrs.get(I, False)) else 0)) + return q_after + v_after + gi_after + + +def decide(attributes: Dict[str, bool], + tallies: Dict[str, int], + mins: Dict[str, int], + admitted_count: int, + venue_cap: int) -> bool: + global _STEP, _REJECT_STREAK, _GI_BOTH_ADMITTED + _STEP += 1 + + S = venue_cap - admitted_count # remaining seats + need = _need(mins, tallies) + total_need = sum(need.values()) + + # Admit freely when all minima are satisfied. + if total_need == 0: + _REJECT_STREAK = 0 + _log("All minima satisfied -> admit") + return True + + # Must help at least one unmet attribute. + helps = _helpers(attributes, need) + if not helps: + _REJECT_STREAK += 1 + _log("Reject: non-helper") + return False + + # Hard per-attribute feasibility. + if not _feasible_per_attribute(attributes, need, S): + _REJECT_STREAK += 1 + _log("Reject: per-attribute feasibility would break") + return False + + # --- GI-overlap reserve & feasibility --- + gi_min_total = _gi_overlap_total_min(mins, venue_cap) # = 450 in this scenario + gi_need_now = max(0, gi_min_total - _GI_BOTH_ADMITTED) + + # Reserved seats for Q, V, and GI-overlap after admitting this person + reserved_after = _reserve_after_admit(attributes, + need.get(Q, 0), need.get(V, 0), gi_need_now) + S_after = S - 1 + + # Hard reserve gate: we must always be able to fit remaining Q, V, GI-overlap. + if S_after < reserved_after: + _REJECT_STREAK += 1 + _log(f"Reject: violates rare+overlap reserve (S_after={S_after} < reserved={reserved_after})") + return False + + # --- Pacing (soft gate with anti-stall) --- + # On-track targets by now: + q_pace = _pace_required(mins.get(Q, 0), admitted_count, venue_cap) + v_pace = _pace_required(mins.get(V, 0), admitted_count, venue_cap) + gi_pace = _pace_required(gi_min_total, admitted_count, venue_cap) + + behind_Q = tallies.get(Q, 0) + PACE_SLACK < q_pace + behind_V = tallies.get(V, 0) + PACE_SLACK < v_pace + behind_GI = _GI_BOTH_ADMITTED + PACE_SLACK < gi_pace + behind_any = behind_Q or behind_V or behind_GI + + is_GI_both = bool(attributes.get(G, False) and attributes.get(I, False)) + is_Q = bool(attributes.get(Q, False)) + is_V = bool(attributes.get(V, False)) + + if behind_any: + # Prefer admitting GI-both or rare (Q/V) when behind pace. + if (behind_GI and is_GI_both) or (behind_Q and is_Q) or (behind_V and is_V): + if is_GI_both: + _GI_BOTH_ADMITTED += 1 + _REJECT_STREAK = 0 + _log("Accept: behind pace -> prioritized helper (GI-both or Q/V)") + return True + + # Otherwise, allow admitting other helpers only if there's *comfortable* cushion beyond reserves. + # Cushion scaled to remaining seats to avoid early over-admitting. + cushion_needed = max(MIN_CUSHION, int(S_after * CUSHION_FRACTION)) + cushion_have = S_after - reserved_after + if cushion_have >= cushion_needed or _REJECT_STREAK >= RELAX_STREAK: + if is_GI_both: + _GI_BOTH_ADMITTED += 1 + _REJECT_STREAK = 0 + _log(f"Accept: behind pace but cushion OK (have={cushion_have}≥{cushion_needed}) " + f"or relax streak hit ({_REJECT_STREAK>=RELAX_STREAK})") + return True + + # Not prioritized and cushion too small -> reject to conserve headroom. + _REJECT_STREAK += 1 + _log(f"Reject: behind pace, cushion too small (have={cushion_have}<{cushion_needed})") + return False + + # On pace: admit any helper (reserves already enforced). + if is_GI_both: + _GI_BOTH_ADMITTED += 1 + _REJECT_STREAK = 0 + _log(f"Accept: on pace helper {helps}") + return True +