diff --git a/workers/null/README.md b/workers/null/README.md
index a0b7b21..dff0643 100644
--- a/workers/null/README.md
+++ b/workers/null/README.md
@@ -1,225 +1,87 @@
 # Null PyWorker
 
-A PyWorker that does **nothing** — it does not forward requests to any model
-server. Reservations are modelled as framework **sessions**: a request
-comes in and you get a worker; release and it scales back down.
+Holds Vast Serverless reservations open without forwarding any work to a
+model. Use it when your real workload (a queue consumer in any language)
+runs as a separate process on the instance and you just want to drive
+Vast autoscaling: **one POST reserves a worker, one POST releases it.**
 
-## When to use it
+## Use case
 
-Use this worker when you want to drive Vast Serverless autoscaling but you do
-**not** want inbound requests to reach a model on the instance. Typical setup:
-
-- You already have a job queue on your own infrastructure (Redis, SQS, NATS,
-  etc.).
-- A separate worker process on the Vast instance pulls work from that queue
-  directly. The Vast PyWorker is not involved in the request/response path.
-  Your consumer can be any language — node, golang, python, a binary —
-  this PyWorker is implementation-agnostic.
-- You want one Vast worker per active queue consumer, and you want the
-  Serverless autoscaler to spin instances up and down based on demand on
-  *your* side.
+You have a job queue on your own infrastructure (Redis, SQS, NATS, etc.)
+and a consumer (node, golang, python, a binary — anything) that pulls
+from it. You want one Vast worker per unit of in-flight work, scaling
+elastically from zero. The null PyWorker is the autoscaling driver; your
+consumer does the work.
 
 ## How it works
 
-- Reservations use the framework's **session** model. The SDK exposes
-  `endpoint.session(cost, lifetime)` which POSTs to `/session/create` (a
-  built-in framework route) and returns a `Session` object usable as
-  `async with`. Closing the context (or calling `await session.close()`)
-  POSTs to `/session/end` — counted as a normal success in metrics.
-- `max_sessions=1` on the worker side means a second `/session/create`
-  against an already-occupied worker returns `429`. Serverless routes
-  that request to a free worker or scales a new one up.
-- Sessions are **excluded from queue-wait math** (the framework filters
-  `if not request.is_session`), so an occupied worker doesn't look like
-  it has a request queue piling up. The autoscaler treats a session as
-  occupancy, not as work-in-progress.
-- `lifecycle` is used instead of `model_log_file`, so there is no log to
-  tail and no model server to start. The worker reports itself ready
-  immediately after a trivial benchmark.
+Reservations use the framework's session API. The SDK's
+`endpoint.session(...)` POSTs `/session/create` to reserve a worker;
+`session.close()` POSTs `/session/end` to release it. `max_sessions=1`
+means each worker holds exactly one reservation — the next reservation
+either lands on a free worker or triggers a scale-up.
 
-## Healthchecking
+The PyWorker itself does nothing functional:
 
-The framework periodically GETs a healthcheck URL after startup; if it ever
-fails after the first success, the worker is marked errored and the
-autoscaler can decommission it. Two modes:
+- One trivial `/ping` route to satisfy the framework's benchmark
+  requirement (its `max_perf` is pinned to 100).
+- An internal `/release` endpoint on `127.0.0.1:18999` for the local
+  consumer to end the session without needing `session_auth`.
 
-- **Stub (default)** — the internal control server also answers
-  `GET /health` with `200`. Just enough to satisfy the framework while
-  you wire up real consumers.
-- **Point at your queue consumer (recommended)** — set
-  `BACKEND_HEALTH_URL=http://127.0.0.1:9090/health` (absolute URL) and
-  the pyworker will healthcheck *your* consumer instead. If the consumer
-  process crashes, the autoscaler will see the worker as broken.
+## Endpoint parameters
+
+Tested working configuration:
+
+| Parameter | Value | Why |
+|---|---|---|
+| `target_util` | `1.0` | One session = one worker. Default `0.9` rounds up to an extra worker. |
+| `min_load` | `0` | Scale-to-zero floor. |
+| `max_queue_time` | `1` | Stop routing to an occupied worker after ~1s of implied queue. |
+| `target_queue_time` | `0.5` | Trigger scale-up promptly once anything queues. |
+| `inactivity_timeout` | `10` (seconds) | Permit scale-to-zero after 10s idle. |
 
 ## API
 
-### Reservation: `POST /session/create` (external, signed)
+| Route | Where | Use |
+|---|---|---|
+| `POST /session/create` | endpoint, signed | Reserve a worker (`endpoint.session(...)`) |
+| `POST /session/end` | endpoint, signed | Release (`session.close()`) |
+| `POST /release` | `127.0.0.1:18999`, no auth | Local consumer release, no `session_auth` needed |
 
-Not implemented here — the framework provides this route automatically on
-every PyWorker. Use the SDK:
+## Healthcheck
 
-```python
-from vastai import Serverless
+Default: stub on `127.0.0.1:18999/health` returning `200`. Set
+`BACKEND_HEALTH_URL=http://127.0.0.1:9090/health` (absolute URL) to point
+the framework at your queue consumer's health endpoint instead — if the
+consumer dies, the autoscaler sees the worker as broken.
 
-async with Serverless() as client:
-    endpoint = await client.get_endpoint(name="my-null-endpoint")
-    async with endpoint.session(cost=100, lifetime=600) as s:
-        # Worker is now reserved. Your queue dispatcher does whatever it
-        # needs to do (typically: enqueue a job that mentions s.session_id).
-        ...
-    # `async with` exit posts to /session/end → 200 success in metrics
-```
+## Deploying
 
-Or raw HTTP (the SDK takes care of autoscaler signing for you, but the
-shape of the request is documented for non-Python clients):
-
-```
-POST /session/create
-{
-  "auth_data": { /* signed by autoscaler */ },
-  "payload": {
-    "lifetime": 600,
-    "on_close_route": "https://your.callback/notify",
-    "on_close_payload": {"job_id": "..."}
-  }
-}
-```
-
-### Release from a local consumer: `POST /release` (internal, localhost-only)
-
-Closes the active session, regardless of who created it. No body, no
-auth. Use this when the queue consumer doesn't have (and shouldn't need)
-the session's `session_auth`:
-
-```bash
-curl -X POST http://127.0.0.1:18999/release
-```
-
-Responses:
-
-- `200 {"released": true, "session_ids": ["..."]}` — closed; the held
-  client-side `/session/create` completes and counts as a success.
-- `200 {"released": false, "reason": "no active session"}` — nothing
-  active, no-op.
-
-For setups where the dispatcher can hand the consumer `session_auth`
-(e.g. as part of the queue payload), the consumer can instead POST
-`/session/end` on the framework's HTTP-only port
-(`$WORKER_HTTP_PORT`, default `WORKER_PORT+1`) — the standard, fully
-authenticated release path.
-
-## Environment variables
-
-- `BACKEND_HEALTH_URL` — absolute URL the framework should healthcheck
-  (e.g. `http://127.0.0.1:9090/health`). When set, the stub `/health`
-  route is not registered on the internal server.
-- `NULL_CONTROL_PORT` — port for the internal control server (hosts
-  `/release` and optionally `/health`). Defaults to `18999`.
-
-## Deploying on Vast Serverless
-
-1. Create a Serverless endpoint and point `PYWORKER_REPO` at this
-   repository (or your fork).
-2. Set `BACKEND=null` in the template so `start_server.sh` runs
-   `workers.null.worker`.
-3. There is no model server to configure; you can omit model-related env
-   vars entirely.
-4. Run your own queue-consumer process on the instance alongside the
-   PyWorker. When it finishes its work:
+1. Point `PYWORKER_REPO` at this repo (or your fork).
+2. Set `BACKEND=null` in the template.
+3. Run your queue consumer alongside the PyWorker. When it's done with
+   a unit of work:
    ```bash
    curl -X POST http://127.0.0.1:18999/release
    ```
 
-### Endpoint scaling parameters
-
-The null worker reports `max_perf = 100` and each reservation is a
-session of `cost = 100`. The intended model is **one session = one
-worker**, scaling elastically from zero up to as many concurrent
-sessions as you ask for.
-
-- **`target_util = 1.0`** — required. The default of `0.9` reserves
-  ~11% spare capacity, which for a unit-occupancy worker rounds up to a
-  whole extra worker (e.g. `min_load = 100` becomes `100 / 0.9 = 111.1`
-  → 2 active workers instead of 1). With `target_util = 1.0` the math
-  is clean: `min_load = 100 * N` keeps exactly `N` workers active.
-- **`min_load = 0`** — required for scale-to-zero. With `min_load = 0`
-  and a positive `inactivity_timeout`, the endpoint can scale down to
-  zero active workers when no sessions exist.
-- **`max_workers`** — cap on total reservations the endpoint can ever
-  serve concurrently.
-- **`inactivity_timeout`** — positive value enables scale-to-zero
-  after the configured number of seconds of no active sessions. Use
-  alongside `cold_workers = 0` to also drop the inactive pool.
-- **`max_queue_time = 0`** and **`target_queue_time = 0`** —
-  recommended. The autoscaler computes per-worker queue-time as
-  `cur_load / max_perf` and sessions *are* in `cur_load`. With the
-  defaults (~30s), an occupied null worker (`cur_load = 100`,
-  `max_perf = 100`, implied queue = 1s) looks "available" for routing,
-  so a third reservation gets repeatedly 429'd and never triggers
-  scale-up. Zeroing both knobs tells the autoscaler "don't estimate
-  when this worker will free up; route to a free one or make a new
-  one."
-
-#### Known autoscaler quirk
-
-In current Vast Serverless, scale-up reliably fires for the 1→2
-worker transition (the first 429 from an occupied worker activates a
-cold one), but **the 2→3 transition often fails to fire** — the
-third reservation 429s on both occupied workers and sits in the
-autoscaler's global queue indefinitely instead of activating a third
-cold worker. Scale-to-zero also has known issues.
-
-Fixes are pending on the Vast side. Until they land, a temporary
-workaround is to over-provision by reporting `cost > max_perf` on
-session creation:
+## Client demo
 
 ```bash
-python -m workers.null.client --demo --session-cost 200
+# Single reservation
+python -m workers.null.client --endpoint --instance alpha
+
+# Staggered three-session trapezoid
+python -m workers.null.client --endpoint --instance alpha --demo
 ```
 
-With `cost = 200, max_perf = 100`, each occupied worker reports
-`cur_load / max_perf = 2.0` — clearly over capacity, so the autoscaler
-keeps one extra active worker warm per session. The next
-`/session/create` lands on the warm worker directly with no queue.
-**This is a band-aid, not the design.** The intended steady state
-is `cost = 100` with predictable elastic scale-up.
+Flags: `--duration` (single), `--interval` and `--plateau` (demo
+timing), `--session-cost` (overrides the cost reported at session
+create; default 100 = `max_perf`), `--instance` (`prod` | `alpha` |
+`candidate` | `local`).
 
-## Client example
+## Environment variables
 
-Single reservation (holds for 180s):
-
-```bash
-python -m workers.null.client --endpoint
-```
-
-Staggered demo:
-
-```bash
-python -m workers.null.client --endpoint --demo
-```
-
-Starts three sessions 30s apart (all held concurrently), holds the
-3-worker plateau for 5 minutes so the autoscaler has time to actually
-provision the third worker before any scale-down starts, then closes
-the sessions one at a time, also 30s apart, and exits. Every session
-ends cleanly via the SDK's `session.close()` — `200` successes in
-metrics, no cancellations.
-
-Tune the timing with `--interval` and `--plateau`. To exercise the
-local-release path, shell into a worker and run
-`curl -X POST http://127.0.0.1:18999/release`.
-
-## Notes and caveats
-
-- The reservation's lifetime caps how long the session can live without
-  client activity. Set it comfortably longer than the work you expect to
-  do, or have the client periodically POST `/ping` with `session_id` to
-  extend.
-- The `on_close_route` payload (passed at `/session/create`) is POSTed by
-  the framework when the session ends. Useful for notifying your queue
-  consumer that the reservation is closing.
-- `/release` on the internal port is convenient but bypasses
-  `session_auth`. If you need the standard authenticated release flow,
-  pass `session_auth` to your consumer (e.g. through the queue payload)
-  and have it POST to `/session/end` on the framework's HTTP port
-  instead.
+- `BACKEND_HEALTH_URL` — absolute URL the framework healthchecks. Stub
+  is used when unset.
+- `NULL_CONTROL_PORT` — internal control server port. Defaults to `18999`.
diff --git a/workers/null/client.py b/workers/null/client.py
index 4b255fc..752e13b 100644
--- a/workers/null/client.py
+++ b/workers/null/client.py
@@ -15,12 +15,7 @@ logging.basicConfig(
 log = logging.getLogger(__file__)
 
 ENDPOINT_NAME = "null-prod"
-# Default cost passed to /session/create. 100 matches the worker's
-# max_perf for clean unit-occupancy semantics: one session = one worker.
-# If you hit autoscaler scale-up issues (queueing past the 2nd active
-# worker), --session-cost 200 is a temporary over-provisioning workaround
-# until the known autoscaler fixes land.
-DEFAULT_SESSION_COST = 100
+DEFAULT_SESSION_COST = 100  # matches the worker's max_perf
 
 
 async def reserve(
@@ -31,35 +26,18 @@ async def reserve(
     session_cost: int,
     label: str = "session",
 ) -> None:
-    """Open a session, hold the worker for `hold_for` seconds, close cleanly.
-
-    Uses the framework's session model — each session counts as one worker
-    occupied, but unlike a held HTTP request it isn't poisoning the
-    worker's throughput math. max_sessions=1 on the worker side means a
-    second /session/create against the same worker gets 429, so serverless
-    routes the second reservation to a free worker or scales a new one up.
-    """
     endpoint = await client.get_endpoint(name=endpoint_name)
-    # Session lifetime must outlast the hold. The framework expires sessions
-    # whose `expiration` (set to now + lifetime at creation) has passed; we
-    # don't make any keepalive requests so no extension happens.
-    lifetime = hold_for + 60
+    lifetime = hold_for + 60  # outlast the hold; no keepalives sent
     start = time.monotonic()
-    log.info(
-        "[%s] creating session (cost=%d, lifetime=%.0fs, hold=%.0fs)",
-        label, session_cost, lifetime, hold_for,
-    )
+    log.info("[%s] creating session (cost=%d, hold=%.0fs)", label, session_cost, hold_for)
     async with await endpoint.session(cost=session_cost, lifetime=lifetime) as s:
         log.info("[%s] session %s open", label, s.session_id)
         try:
             await asyncio.sleep(hold_for)
-            log.info("[%s] hold complete, closing session", label)
         except asyncio.CancelledError:
-            elapsed = time.monotonic() - start
-            log.info("[%s] cancelled after %.1fs, closing session", label, elapsed)
+            log.info("[%s] cancelled after %.1fs", label, time.monotonic() - start)
             raise
-    elapsed = time.monotonic() - start
-    log.info("[%s] session closed cleanly after %.1fs", label, elapsed)
+    log.info("[%s] closed cleanly after %.1fs", label, time.monotonic() - start)
 
 
 async def run_demo(
@@ -70,117 +48,52 @@ async def run_demo(
     plateau: float,
     session_cost: int,
 ) -> None:
-    """Trapezoidal load: ramp up three sessions, plateau, then scale down.
-
-    Start three sessions spaced `interval` seconds apart. Each holds for
-    `(n-1)*interval + plateau` seconds, so the first release fires
-    `plateau` seconds after the last session started — giving the
-    autoscaler time to actually have all three workers running before any
-    scale-down begins. Releases then fire `interval` seconds apart,
-    matching the ramp-up.
-
-    Each session ends via the SDK's `session.close()` on `async with` exit,
-    which posts to /session/end with proper auth — counted as a normal
-    success in metrics.
-    """
     n = 3
     hold = (n - 1) * interval + plateau
     tasks: list[asyncio.Task] = []
     for i in range(1, n + 1):
         label = f"res-{i}"
-        log.info("[%s] starting (hold=%.0fs)", label, hold)
-        task = asyncio.create_task(
-            reserve(
-                client,
-                endpoint_name=endpoint_name,
-                hold_for=hold,
-                session_cost=session_cost,
-                label=label,
-            ),
+        tasks.append(asyncio.create_task(
+            reserve(client, endpoint_name=endpoint_name, hold_for=hold,
+                    session_cost=session_cost, label=label),
             name=label,
-        )
-        tasks.append(task)
+        ))
         if i < n:
-            log.info("Waiting %.0fs before next session...", interval)
             await asyncio.sleep(interval)
-
     log.info(
-        "All %d sessions in flight; holding plateau for %.0fs, "
-        "then scaling down %.0fs apart",
-        n,
-        plateau,
-        interval,
+        "All %d sessions in flight; plateau %.0fs, scale-down %.0fs apart",
+        n, plateau, interval,
     )
-    results = await asyncio.gather(*tasks, return_exceptions=True)
-    for task, result in zip(tasks, results):
-        log.info("[%s] final: %r", task.get_name(), result)
+    await asyncio.gather(*tasks, return_exceptions=True)
 
 
 def build_arg_parser() -> argparse.ArgumentParser:
     p = argparse.ArgumentParser(description="Vast Null PyWorker demo client")
-    p.add_argument(
-        "--endpoint",
-        default=os.environ.get("VAST_ENDPOINT", ENDPOINT_NAME),
-        help=f"Vast endpoint name (default: {ENDPOINT_NAME})",
-    )
-    p.add_argument(
-        "--duration",
-        type=float,
-        default=180.0,
-        help="Single-reserve mode: seconds to hold the worker (default: 180)",
-    )
+    p.add_argument("--endpoint", default=os.environ.get("VAST_ENDPOINT", ENDPOINT_NAME),
+                   help=f"endpoint name (default: {ENDPOINT_NAME})")
+    p.add_argument("--instance", choices=("prod", "alpha", "candidate", "local"),
+                   default=os.environ.get("VAST_INSTANCE", "prod"),
+                   help="serverless instance (default: prod)")
+    p.add_argument("--duration", type=float, default=180.0,
+                   help="single-reserve mode: seconds to hold (default: 180)")
 
     modes = p.add_mutually_exclusive_group(required=False)
-    modes.add_argument(
-        "--reserve",
-        action="store_true",
-        help="Make a single session (default if no mode given)",
-    )
-    modes.add_argument(
-        "--demo",
-        action="store_true",
-        help="Run the staggered 3-reservation trapezoid demo",
-    )
+    modes.add_argument("--reserve", action="store_true",
+                       help="single session (default if no mode given)")
+    modes.add_argument("--demo", action="store_true",
+                       help="staggered 3-session trapezoid")
 
-    p.add_argument(
-        "--interval",
-        type=float,
-        default=30.0,
-        help="Demo mode: seconds between reservation steps (default: 30)",
-    )
-    p.add_argument(
-        "--plateau",
-        type=float,
-        default=300.0,
-        help=(
-            "Demo mode: seconds to hold all 3 reservations active before "
-            "scale-down starts. Gives the autoscaler time to fully spin "
-            "up the third worker (default: 300)"
-        ),
-    )
-    p.add_argument(
-        "--session-cost",
-        type=int,
-        default=DEFAULT_SESSION_COST,
-        help=(
-            f"Cost reported to the autoscaler for each /session/create. "
-            f"Setting this above the worker's max_perf (100) over-provisions "
-            f"slightly, keeping an extra active worker warm so the next "
-            f"session lands without queueing (default: {DEFAULT_SESSION_COST})"
-        ),
-    )
-    p.add_argument(
-        "--instance",
-        choices=("prod", "alpha", "candidate", "local"),
-        default=os.environ.get("VAST_INSTANCE", "prod"),
-        help="Vast serverless instance to target (default: prod)",
-    )
+    p.add_argument("--interval", type=float, default=30.0,
+                   help="demo: seconds between sessions (default: 30)")
+    p.add_argument("--plateau", type=float, default=300.0,
+                   help="demo: seconds to hold all 3 active (default: 300)")
+    p.add_argument("--session-cost", type=int, default=DEFAULT_SESSION_COST,
+                   help=f"cost reported at session-create (default: {DEFAULT_SESSION_COST})")
     return p
 
 
 async def main_async():
     args = build_arg_parser().parse_args()
-
     print("=" * 60)
     print(f"Endpoint: {args.endpoint} (instance: {args.instance})")
     print("=" * 60)
@@ -188,23 +101,16 @@ async def main_async():
     try:
         async with Serverless(instance=args.instance) as client:
             if args.demo:
-                await run_demo(
-                    client,
-                    endpoint_name=args.endpoint,
-                    interval=args.interval,
-                    plateau=args.plateau,
-                    session_cost=args.session_cost,
-                )
+                await run_demo(client, endpoint_name=args.endpoint,
+                               interval=args.interval, plateau=args.plateau,
+                               session_cost=args.session_cost)
             else:
-                await reserve(
-                    client,
-                    endpoint_name=args.endpoint,
-                    hold_for=args.duration,
-                    session_cost=args.session_cost,
-                    label="reservation",
-                )
+                await reserve(client, endpoint_name=args.endpoint,
+                              hold_for=args.duration,
+                              session_cost=args.session_cost,
+                              label="reservation")
     except KeyboardInterrupt:
-        log.info("Interrupted; dropping any in-flight sessions")
+        log.info("Interrupted; dropping in-flight sessions")
     except Exception as e:
         log.error("Error: %s", e, exc_info=True)
         sys.exit(1)
diff --git a/workers/null/worker.py b/workers/null/worker.py
index 06494c2..1027bf5 100644
--- a/workers/null/worker.py
+++ b/workers/null/worker.py
@@ -16,37 +16,21 @@ from vastai import (
 
 log = logging.getLogger(__file__)
 
-# Performance value pinned in the benchmark cache; sent to autoscaler as
-# max_perf. Standardized at 100 — the conventional default the rest of the
-# serverless system expects.
 TARGET_PERF = 100.0
-
-# Marker the benchmark path sets so the fallback /ping path returns
-# immediately during the framework's startup benchmark.
 BENCHMARK_SENTINEL = "__null_worker_benchmark__"
 
-# Internal control server. Hosts:
-# * POST /release — releases the active reservation by closing the
-# singleton session on this worker. Called by the user's queue
-# consumer when its work is done.
-# * GET /health — only when BACKEND_HEALTH_URL is unset; gives the
-# framework's healthcheck loop something live to talk to.
-# Bound to 127.0.0.1 so only processes on the instance can reach it.
 INTERNAL_HOST = "127.0.0.1"
 INTERNAL_PORT = int(os.environ.get("NULL_CONTROL_PORT", 18999))
 STUB_HEALTH_PATH = "/health"
 
 BACKEND_HEALTH_URL = os.environ.get("BACKEND_HEALTH_URL", "").strip()
-
 if BACKEND_HEALTH_URL:
-    _parsed = urlsplit(BACKEND_HEALTH_URL)
-    if not _parsed.scheme or not _parsed.hostname:
-        raise ValueError(
-            f"BACKEND_HEALTH_URL must be an absolute URL, got: {BACKEND_HEALTH_URL!r}"
-        )
-    HEALTH_BASE_URL = f"{_parsed.scheme}://{_parsed.hostname}"
-    HEALTH_PORT = _parsed.port or (443 if _parsed.scheme == "https" else 80)
-    HEALTH_PATH = _parsed.path or "/"
+    _p = urlsplit(BACKEND_HEALTH_URL)
+    if not _p.scheme or not _p.hostname:
+        raise ValueError(f"BACKEND_HEALTH_URL must be absolute, got: {BACKEND_HEALTH_URL!r}")
+    HEALTH_BASE_URL = f"{_p.scheme}://{_p.hostname}"
+    HEALTH_PORT = _p.port or (443 if _p.scheme == "https" else 80)
+    HEALTH_PATH = _p.path or "/"
     USE_STUB_HEALTH = False
 else:
     HEALTH_BASE_URL = f"http://{INTERNAL_HOST}"
@@ -55,9 +39,6 @@ else:
     USE_STUB_HEALTH = True
 
 
-# Stashed after Worker(...) is constructed so /release can reach the
-# framework's session machinery. Dict so the lifecycle closure picks up
-# the assignment that happens before .run().
 _backend_ref: dict = {"backend": None}
 
 
@@ -65,30 +46,16 @@ def _build_internal_app() -> web.Application:
     app = web.Application()
 
     async def release_handler(_request: web.Request) -> web.Response:
-        """End the active reservation (the singleton session on this worker).
-
-        max_sessions=1 means at most one session is active here. We call
-        the framework's internal __close_session via name-mangling to
-        bypass the session_auth check that /session/end normally requires.
-        That's intentional: this endpoint is localhost-only so trust is
-        assumed, and the user's consumer can release without having to
-        plumb session_auth through their queue.
-
-        __close_session reports the session metrics as a success, fires
-        on_close_route if configured, and pops the session from the dict.
-        """
+        # Closes the singleton session. Uses name-mangled __close_session
+        # to bypass the session_auth check — safe because this server is
+        # bound to 127.0.0.1, and it spares the consumer from threading
+        # session_auth through its queue.
         backend = _backend_ref.get("backend")
         if backend is None:
-            return web.json_response(
-                {"released": False, "reason": "backend not ready"},
-                status=503,
-            )
+            return web.json_response({"released": False, "reason": "backend not ready"}, status=503)
         sids = list(backend.sessions.keys())
         if not sids:
-            return web.json_response(
-                {"released": False, "reason": "no active session"},
-                status=200,
-            )
+            return web.json_response({"released": False, "reason": "no active session"}, status=200)
         closed = []
         for sid in sids:
             try:
@@ -96,17 +63,13 @@ def _build_internal_app() -> web.Application:
                 closed.append(sid)
             except Exception as e:
                 log.warning(f"Error closing session {sid}: {e}")
-        return web.json_response(
-            {"released": bool(closed), "session_ids": closed},
-            status=200,
-        )
+        return web.json_response({"released": bool(closed), "session_ids": closed}, status=200)
 
     app.router.add_post("/release", release_handler)
 
     if USE_STUB_HEALTH:
         async def stub_health(_request: web.Request) -> web.Response:
             return web.Response(status=200, text="ok")
-
         app.router.add_get(STUB_HEALTH_PATH, stub_health)
 
     return app
@@ -114,37 +77,26 @@ def _build_internal_app() -> web.Application:
 
 @asynccontextmanager
 async def null_lifecycle():
-    # Pin max_throughput to exactly TARGET_PERF by pre-populating the
-    # framework's benchmark cache file. __run_benchmark short-circuits to
-    # float(file_contents) when this file exists.
+    # Pin max_throughput to TARGET_PERF exactly — the framework's
+    # __run_benchmark short-circuits to float(file_contents) if this exists.
     try:
         with open(".has_benchmark", "w") as fh:
             fh.write(str(int(TARGET_PERF)))
     except OSError as e:
         log.warning(f"Could not pin benchmark cache: {e}")
 
-    app = _build_internal_app()
-    runner = web.AppRunner(app)
+    runner = web.AppRunner(_build_internal_app())
     await runner.setup()
-    site = web.TCPSite(runner, INTERNAL_HOST, INTERNAL_PORT)
-    await site.start()
+    await web.TCPSite(runner, INTERNAL_HOST, INTERNAL_PORT).start()
 
-    lines = [
-        f"Null pyworker internal control server: http://{INTERNAL_HOST}:{INTERNAL_PORT}",
-        f" POST /release - end the active reservation (call from your queue consumer)",
-    ]
-    if USE_STUB_HEALTH:
-        lines.append(
-            f" GET {STUB_HEALTH_PATH} - stub healthcheck (override with BACKEND_HEALTH_URL)"
-        )
-    else:
-        lines.append(f"Framework healthcheck pointed at: {BACKEND_HEALTH_URL}")
-    lines.append(
-        "Reservations use the framework session model. Clients POST to "
-        "/session/create via the SDK to acquire a worker; max_sessions=1 "
-        "so each worker holds at most one reservation."
+    log.info(
+        "Null pyworker control server: http://%s:%d (POST /release%s)",
+        INTERNAL_HOST,
+        INTERNAL_PORT,
+        f", GET {STUB_HEALTH_PATH}" if USE_STUB_HEALTH else "",
     )
-    log.info("\n".join(lines))
+    if not USE_STUB_HEALTH:
+        log.info("Framework healthcheck → %s", BACKEND_HEALTH_URL)
 
     try:
         yield
@@ -153,15 +105,11 @@ async def null_lifecycle():
 
 
 async def ping(**params: object) -> dict:
-    """Trivial handler. Exists to satisfy the framework's requirement that
-    at least one HandlerConfig has a BenchmarkConfig, and to give clients
-    a route they can hit with session_id to extend their session TTL.
-    """
+    # Exists only to satisfy the framework's "at least one handler with a
+    # BenchmarkConfig" requirement. Sleep 1s on the benchmark path as a
+    # fallback in case the .has_benchmark cache pin failed; otherwise the
+    # benchmark cache short-circuits and this never runs.
     if params.get(BENCHMARK_SENTINEL):
-        # Fallback only — the lifecycle pre-pins .has_benchmark so
-        # __run_benchmark normally short-circuits and this never runs. If
-        # the cache write failed, sleep ~1s so the time-based throughput
-        # math lands near TARGET_PERF.
         await asyncio.sleep(1.0)
         return {"ok": True, "benchmark": True}
     return {"ok": True}
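
Usage note for the `/release` route documented in the README hunk: the README only shows the `curl` form, so here is a rough Python equivalent for a consumer written in Python. It is a sketch, not part of this change. The helper name `release_reservation`, the timeout, and the choice of `urllib` are assumptions; the URL uses the default `NULL_CONTROL_PORT` of `18999`, and the response shapes are the ones returned by `release_handler` in `worker.py`.

```python
import json
import urllib.error
import urllib.request

# Default control-server address from the README (NULL_CONTROL_PORT unset).
RELEASE_URL = "http://127.0.0.1:18999/release"


def release_reservation(url: str = RELEASE_URL, timeout: float = 5.0) -> dict:
    """POST an empty body to /release and return the decoded JSON reply.

    Hypothetical helper: the name and signature are illustrative only.
    """
    req = urllib.request.Request(url, data=b"", method="POST")
    try:
        with urllib.request.urlopen(req, timeout=timeout) as resp:
            return json.loads(resp.read().decode("utf-8"))
    except urllib.error.HTTPError as err:
        # The 503 "backend not ready" reply surfaces as an HTTPError in
        # urllib; its body is still the JSON document from release_handler.
        return json.loads(err.read().decode("utf-8"))
```

A consumer would call `release_reservation()` once its unit of work is finished and check the `released` key of the result; `False` with reason `no active session` is a no-op rather than a failure.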