From 6a562a1376a9c9ec5fdb94c4bcbc20a6ab356e80 Mon Sep 17 00:00:00 2001 From: Rob Ballantyne Date: Tue, 12 May 2026 10:51:24 +0100 Subject: [PATCH] Rewrite null pyworker on the framework session model MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Drop the held-/reserve approach in favour of the framework's session primitive (max_sessions=1 + /session/create). Sessions are excluded from the autoscaler's queue-wait math and don't suffer the cur_perf=0 degradation that a long-held request did, so this naturally produces the "one request comes in and you get a worker; release and it scales back down" model we were hand-rolling. Server side: - max_sessions=1; framework auto-registers /session/* routes - Drop custom /reserve handler, _active_reservation event, max_queue_ time=0.0, MAX_RESERVATION_SECONDS, _perf_heartbeat - Trivial /ping handler exists only to satisfy the framework's "at least one handler with BenchmarkConfig" requirement (and to give clients an extension/keepalive route) - /release on the internal control port is kept as a convenience for queue consumers that don't carry session_auth — calls the framework's __close_session via name-mangling, which bypasses the session_auth check but is fine for a localhost-only endpoint - Workload/perf back to 100 (conventional) Client side: - Uses endpoint.session(cost, lifetime) instead of POST /reserve - async with the SDK Session; close on exit posts /session/end with proper auth → 200 success in metrics - Demo and single modes both ride the same reserve() helper Sessions landed in vastai-sdk 0.4.2 (commit ec9ef59, 2026-01-20). Co-Authored-By: Claude Opus 4.7 (1M context) --- workers/null/README.md | 184 +++++++++++++++++++-------------------- workers/null/client.py | 83 ++++++++++-------- workers/null/worker.py | 191 +++++++++++++++-------------------------- 3 files changed, 206 insertions(+), 252 deletions(-) diff --git a/workers/null/README.md b/workers/null/README.md index 86fdba2..2195fe3 100644 --- a/workers/null/README.md +++ b/workers/null/README.md @@ -1,10 +1,8 @@ # Null PyWorker A PyWorker that does **nothing** — it does not forward requests to any model -server. Each HTTP POST to `/reserve` simply marks the worker as busy and holds -the request open until the user's queue consumer (running locally on the -instance) calls `/release` on the internal control port — or a safety -timeout elapses. +server. Reservations are modelled as framework **sessions**: a request +comes in and you get a worker; release and it scales back down. ## When to use it @@ -15,32 +13,29 @@ Use this worker when you want to drive Vast Serverless autoscaling but you do etc.). - A separate worker process on the Vast instance pulls work from that queue directly. The Vast PyWorker is not involved in the request/response path. + Your consumer can be any language — node, golang, python, a binary — + this PyWorker is implementation-agnostic. - You want one Vast worker per active queue consumer, and you want the Serverless autoscaler to spin instances up and down based on demand on *your* side. -A request comes in and you get a worker. Release and it scales back down. - -POST to `/reserve` and serverless gives you a worker, held busy for the -lifetime of the request. When your queue consumer is done, POST to -`/release` on the internal port (`127.0.0.1:18999` by default) and the -held `/reserve` returns `200`. - ## How it works -- `allow_parallel_requests=False` and `max_queue_time=0.0`, so one in-flight - `/reserve` fully occupies the worker and any further request that lands - on it is rejected with `429` immediately — serverless will route to a - free worker or scale a new one up. -- `lifecycle` is used instead of `model_log_file`, so there is no log to tail - and no model server to start. The worker reports itself ready immediately - after the (trivial) benchmark. -- The `/reserve` handler is a `remote_function` rather than an HTTP proxy, so - the framework never tries to forward the request anywhere — it just awaits - an internal `asyncio.Event`. -- An internal aiohttp control server, bound to `127.0.0.1`, hosts - `/release` (and, when no external healthcheck URL is provided, a stub - `/health`). +- Reservations use the framework's **session** model. The SDK exposes + `endpoint.session(cost, lifetime)` which POSTs to `/session/create` (a + built-in framework route) and returns a `Session` object usable as + `async with`. Closing the context (or calling `await session.close()`) + POSTs to `/session/end` — counted as a normal success in metrics. +- `max_sessions=1` on the worker side means a second `/session/create` + against an already-occupied worker returns `429`. Serverless routes + that request to a free worker or scales a new one up. +- Sessions are **excluded from queue-wait math** (the framework filters + `if not request.is_session`), so an occupied worker doesn't look like + it has a request queue piling up. The autoscaler treats a session as + occupancy, not as work-in-progress. +- `lifecycle` is used instead of `model_log_file`, so there is no log to + tail and no model server to start. The worker reports itself ready + immediately after a trivial benchmark. ## Healthchecking @@ -49,48 +44,52 @@ fails after the first success, the worker is marked errored and the autoscaler can decommission it. Two modes: - **Stub (default)** — the internal control server also answers - `GET /health` with `200`. This is just enough to satisfy the framework - while you wire up real consumers. + `GET /health` with `200`. Just enough to satisfy the framework while + you wire up real consumers. - **Point at your queue consumer (recommended)** — set - `BACKEND_HEALTH_URL=http://127.0.0.1:9090/health` (absolute URL) and the - pyworker will healthcheck *your* consumer instead. If your consumer + `BACKEND_HEALTH_URL=http://127.0.0.1:9090/health` (absolute URL) and + the pyworker will healthcheck *your* consumer instead. If the consumer process crashes, the autoscaler will see the worker as broken. -Run your queue consumer on the instance alongside the PyWorker, expose a -plain `/health` endpoint on it, then set `BACKEND_HEALTH_URL` accordingly in -your template. - ## API -### `POST /reserve` (external port, signed by the autoscaler) +### Reservation: `POST /session/create` (external, signed) -Holds the worker busy until the reservation ends. +Not implemented here — the framework provides this route automatically on +every PyWorker. Use the SDK: -Request body (all fields optional): +```python +from vastai import Serverless -```json -{ "duration": 600 } +async with Serverless() as client: + endpoint = await client.get_endpoint(name="my-null-endpoint") + async with endpoint.session(cost=100, lifetime=600) as s: + # Worker is now reserved. Your queue dispatcher does whatever it + # needs to do (typically: enqueue a job that mentions s.session_id). + ... + # `async with` exit posts to /session/end → 200 success in metrics ``` -- `duration` (seconds, optional): safety cap on how long to hold the - reservation if no `/release` arrives. Capped by `MAX_RESERVATION_SECONDS` - (env var, default 3600). If omitted, defaults to that cap. +Or raw HTTP (the SDK takes care of autoscaler signing for you, but the +shape of the request is documented for non-Python clients): -Behavior: +``` +POST /session/create +{ + "auth_data": { /* signed by autoscaler */ }, + "payload": { + "lifetime": 600, + "on_close_route": "https://your.callback/notify", + "on_close_payload": {"job_id": "..."} + } +} +``` -- Returns `200` with `{"released": "explicit", ...}` when the local consumer - POSTs `/release` on the internal port. **This is the intended happy path - — the request is counted as a success in metrics.** -- Returns `200` with `{"released": "duration_elapsed", "duration": }` if - the duration cap fires (safety net for a stuck consumer). -- Returns `499` if the external client disconnects (counted as cancelled in - metrics — avoid this; use `/release` instead). -- Returns `429` immediately if the worker is already holding a reservation - (so serverless routes the request to a free worker instead of queueing). +### Release from a local consumer: `POST /release` (internal, localhost-only) -### `POST /release` (internal port, localhost-only) - -Marks the active reservation as done. No body required. Idempotent: +Closes the active session, regardless of who created it. No body, no +auth. Use this when the queue consumer doesn't have (and shouldn't need) +the session's `session_auth`: ```bash curl -X POST http://127.0.0.1:18999/release @@ -98,78 +97,75 @@ curl -X POST http://127.0.0.1:18999/release Responses: -- `200 {"released": true}` — active reservation was released; the held - `/reserve` will return `{"released": "explicit"}`. -- `200 {"released": false, "reason": "no active reservation"}` — nothing was - in flight, no-op. +- `200 {"released": true, "session_ids": ["..."]}` — closed; the held + client-side `/session/create` completes and counts as a success. +- `200 {"released": false, "reason": "no active session"}` — nothing + active, no-op. -Only processes on the Vast instance can reach this port. There is no -authentication on it. +For setups where the dispatcher can hand the consumer `session_auth` +(e.g. as part of the queue payload), the consumer can instead POST +`/session/end` on the framework's HTTP-only port +(`$WORKER_HTTP_PORT`, default `WORKER_PORT+1`) — the standard, fully +authenticated release path. ## Environment variables -- `MAX_RESERVATION_SECONDS` — upper bound on how long a single `/reserve` - call can hold a worker if `/release` is never called. Defaults to `3600`. - `BACKEND_HEALTH_URL` — absolute URL the framework should healthcheck - (e.g. `http://127.0.0.1:9090/health`). When set, the stub `/health` route - is not registered on the internal server. When unset, the built-in stub - is used. + (e.g. `http://127.0.0.1:9090/health`). When set, the stub `/health` + route is not registered on the internal server. - `NULL_CONTROL_PORT` — port for the internal control server (hosts `/release` and optionally `/health`). Defaults to `18999`. ## Deploying on Vast Serverless -1. Create a Serverless endpoint and point `PYWORKER_REPO` at this repository - (or your fork). +1. Create a Serverless endpoint and point `PYWORKER_REPO` at this + repository (or your fork). 2. Set `BACKEND=null` in the template so `start_server.sh` runs `workers.null.worker`. -3. There is no model server to configure; you can omit model-related env vars - entirely. +3. There is no model server to configure; you can omit model-related env + vars entirely. 4. Run your own queue-consumer process on the instance alongside the - PyWorker. When the consumer finishes its work it should: + PyWorker. When it finishes its work: ```bash curl -X POST http://127.0.0.1:18999/release ``` - so the held `/reserve` returns success and the autoscaler can scale the - worker down cleanly. ## Client example -Single reservation: +Single reservation (holds for 180s): ```bash -python -m workers.null.client --endpoint --duration 600 +python -m workers.null.client --endpoint ``` -To exercise the full flow, shell into the worker and run -`curl -X POST http://127.0.0.1:18999/release` — the client returns with -`{"released": "explicit", ...}`. - Staggered demo: ```bash python -m workers.null.client --endpoint --demo ``` -Starts three reservations 30s apart (all held concurrently), holds the +Starts three sessions 30s apart (all held concurrently), holds the 3-worker plateau for 5 minutes so the autoscaler has time to actually -provision the third worker before any scale-down starts, then scales -down one worker at a time, also 30s apart, and exits. +provision the third worker before any scale-down starts, then closes +the sessions one at a time, also 30s apart, and exits. Every session +ends cleanly via the SDK's `session.close()` — `200` successes in +metrics, no cancellations. -Each reservation ends via its duration cap (a 200 success in metrics). -Tune the timing with `--interval` and `--plateau`. +Tune the timing with `--interval` and `--plateau`. To exercise the +local-release path, shell into a worker and run +`curl -X POST http://127.0.0.1:18999/release`. ## Notes and caveats -- The HTTP connection from the external caller must stay open for the full - reservation. Make sure your client and any intermediate proxies allow - long-lived requests (disable idle timeouts, retries, and connection - reuse if necessary). -- If your client retries on timeout, you may end up provisioning duplicate - workers. Configure `duration` generously and rely on `/release` from the - consumer to end reservations promptly. -- Avoid disconnecting the external `/reserve` request as a way to release — - that produces a `499` and is counted as a cancellation in Vast metrics. - Always release via `POST /release` on the internal port. -- There is no streaming / heartbeat in the response; the request returns - exactly once, when the reservation ends. +- The reservation's lifetime caps how long the session can live without + client activity. Set it comfortably longer than the work you expect to + do, or have the client periodically POST `/ping` with `session_id` to + extend. +- The `on_close_route` payload (passed at `/session/create`) is POSTed by + the framework when the session ends. Useful for notifying your queue + consumer that the reservation is closing. +- `/release` on the internal port is convenient but bypasses + `session_auth`. If you need the standard authenticated release flow, + pass `session_auth` to your consumer (e.g. through the queue payload) + and have it POST to `/session/end` on the framework's HTTP port + instead. diff --git a/workers/null/client.py b/workers/null/client.py index 86bd61f..a39de0e 100644 --- a/workers/null/client.py +++ b/workers/null/client.py @@ -15,35 +15,42 @@ logging.basicConfig( log = logging.getLogger(__file__) ENDPOINT_NAME = "null-prod" +SESSION_COST = 100 async def reserve( client: Serverless, *, endpoint_name: str, - duration: float, - label: str = "reservation", -) -> dict: - """Hold a Vast worker open for `duration` seconds (or until we disconnect). + hold_for: float, + label: str = "session", +) -> None: + """Open a session, hold the worker for `hold_for` seconds, close cleanly. - The worker counts itself busy for the lifetime of this call. Returning - here means the reservation has ended — either /release was called on - the worker's internal control port, or the duration cap fired, or the - HTTP request was cancelled. + Uses the framework's session model — each session counts as one worker + occupied, but unlike a held HTTP request it isn't poisoning the + worker's throughput math. max_sessions=1 on the worker side means a + second /session/create against the same worker gets 429, so serverless + routes the second reservation to a free worker or scales a new one up. """ endpoint = await client.get_endpoint(name=endpoint_name) - payload = {"duration": duration} + # Session lifetime must outlast the hold. The framework expires sessions + # whose `expiration` (set to now + lifetime at creation) has passed; we + # don't make any keepalive requests so no extension happens. + lifetime = hold_for + 60 start = time.monotonic() - log.info("[%s] POST /reserve duration=%ss", label, duration) - try: - resp = await endpoint.request("/reserve", payload, cost=150) - elapsed = time.monotonic() - start - log.info("[%s] returned after %.1fs: %s", label, elapsed, resp.get("response")) - return resp["response"] - except asyncio.CancelledError: - elapsed = time.monotonic() - start - log.info("[%s] cancelled after %.1fs (HTTP connection dropped)", label, elapsed) - raise + log.info("[%s] creating session (lifetime=%.0fs, hold=%.0fs)", label, lifetime, hold_for) + async with endpoint.session(cost=SESSION_COST, lifetime=lifetime) as s: + log.info("[%s] session %s open", label, s.session_id) + try: + await asyncio.sleep(hold_for) + log.info("[%s] hold complete, closing session", label) + except asyncio.CancelledError: + elapsed = time.monotonic() - start + log.info("[%s] cancelled after %.1fs, closing session", label, elapsed) + raise + elapsed = time.monotonic() - start + log.info("[%s] session closed cleanly after %.1fs", label, elapsed) async def run_demo( @@ -53,38 +60,41 @@ async def run_demo( interval: float, plateau: float, ) -> None: - """Trapezoidal load: ramp up three reservations, plateau, then scale down. + """Trapezoidal load: ramp up three sessions, plateau, then scale down. - Start three reservations spaced `interval` seconds apart. Pick the - duration so that the first release fires `plateau` seconds *after the - last reservation started*, giving the autoscaler time to actually have - all three workers running before any of them begin to scale down. - Releases then fire `interval` seconds apart, matching the ramp-up. + Start three sessions spaced `interval` seconds apart. Each holds for + `(n-1)*interval + plateau` seconds, so the first release fires + `plateau` seconds after the last session started — giving the + autoscaler time to actually have all three workers running before any + scale-down begins. Releases then fire `interval` seconds apart, + matching the ramp-up. - Each reservation ends via its duration cap (a 200 success). + Each session ends via the SDK's `session.close()` on `async with` exit, + which posts to /session/end with proper auth — counted as a normal + success in metrics. """ n = 3 hold = (n - 1) * interval + plateau tasks: list[asyncio.Task] = [] for i in range(1, n + 1): label = f"res-{i}" - log.info("[%s] starting (auto-release after %.0fs)", label, hold) + log.info("[%s] starting (hold=%.0fs)", label, hold) task = asyncio.create_task( reserve( client, endpoint_name=endpoint_name, - duration=hold, + hold_for=hold, label=label, ), name=label, ) tasks.append(task) if i < n: - log.info("Waiting %.0fs before next reservation...", interval) + log.info("Waiting %.0fs before next session...", interval) await asyncio.sleep(interval) log.info( - "All %d reservations in flight; holding plateau for %.0fs, " + "All %d sessions in flight; holding plateau for %.0fs, " "then scaling down %.0fs apart", n, plateau, @@ -106,19 +116,19 @@ def build_arg_parser() -> argparse.ArgumentParser: "--duration", type=float, default=180.0, - help="Seconds to hold each worker busy (default: 180)", + help="Single-reserve mode: seconds to hold the worker (default: 180)", ) modes = p.add_mutually_exclusive_group(required=False) modes.add_argument( "--reserve", action="store_true", - help="Make a single /reserve call (default if no mode given)", + help="Make a single session (default if no mode given)", ) modes.add_argument( "--demo", action="store_true", - help="Run the staggered 3-reservation demo, cancelling one mid-way", + help="Run the staggered 3-reservation trapezoid demo", ) p.add_argument( @@ -157,15 +167,14 @@ async def main_async(): plateau=args.plateau, ) else: - response = await reserve( + await reserve( client, endpoint_name=args.endpoint, - duration=args.duration, + hold_for=args.duration, label="reservation", ) - print(f"Reservation result: {response}") except KeyboardInterrupt: - log.info("Interrupted; dropping any in-flight reservations") + log.info("Interrupted; dropping any in-flight sessions") except Exception as e: log.error("Error: %s", e, exc_info=True) sys.exit(1) diff --git a/workers/null/worker.py b/workers/null/worker.py index 633f1a1..06494c2 100644 --- a/workers/null/worker.py +++ b/workers/null/worker.py @@ -2,7 +2,6 @@ import asyncio import logging import os from contextlib import asynccontextmanager -from typing import Optional from urllib.parse import urlsplit from aiohttp import web @@ -17,21 +16,21 @@ from vastai import ( log = logging.getLogger(__file__) -# Safety cap: if the user's queue consumer never calls /release, the -# reservation is auto-released after this many seconds so a forgotten /release -# can't pin a worker indefinitely. Override with MAX_RESERVATION_SECONDS. -MAX_RESERVATION_SECONDS = float(os.environ.get("MAX_RESERVATION_SECONDS", 3600)) +# Performance value pinned in the benchmark cache; sent to autoscaler as +# max_perf. Standardized at 100 — the conventional default the rest of the +# serverless system expects. +TARGET_PERF = 100.0 -# Marker the benchmark path sets so the same remote function can return -# immediately during capacity estimation instead of sleeping. +# Marker the benchmark path sets so the fallback /ping path returns +# immediately during the framework's startup benchmark. BENCHMARK_SENTINEL = "__null_worker_benchmark__" # Internal control server. Hosts: -# * POST /release — always available, marks the active reservation as -# done so the held /reserve returns 200 (success in metrics, not a -# cancellation). -# * GET /health — only when no external BACKEND_HEALTH_URL is set; the -# framework's healthcheck loop polls it so the worker has a live signal. +# * POST /release — releases the active reservation by closing the +# singleton session on this worker. Called by the user's queue +# consumer when its work is done. +# * GET /health — only when BACKEND_HEALTH_URL is unset; gives the +# framework's healthcheck loop something live to talk to. # Bound to 127.0.0.1 so only processes on the instance can reach it. INTERNAL_HOST = "127.0.0.1" INTERNAL_PORT = int(os.environ.get("NULL_CONTROL_PORT", 18999)) @@ -56,62 +55,51 @@ else: USE_STUB_HEALTH = True -# Workload reported per /reserve and target perf for the heartbeat below. -TARGET_PERF = 150.0 - -# Singleton active reservation. `allow_parallel_requests=False` on the -# /reserve handler guarantees the framework only runs one at a time per -# worker, so a single slot is enough. -_active_reservation: Optional[asyncio.Event] = None - -# Backed in after Worker(...) is constructed so the heartbeat coroutine in -# null_lifecycle() can mutate backend.metrics. Stored in a dict so the -# lifecycle closure picks up the assignment that happens before .run(). +# Stashed after Worker(...) is constructed so /release can reach the +# framework's session machinery. Dict so the lifecycle closure picks up +# the assignment that happens before .run(). _backend_ref: dict = {"backend": None} -async def _perf_heartbeat() -> None: - """Keep cur_perf pegged to TARGET_PERF while a reservation is held. - - Without this, workload_served stays at 0 while a /reserve is being held - open. The autoscaler observes cur_perf=0 against max_perf=150, decides - the worker can't deliver its claimed throughput, and downgrades it — - which makes it cautious about scaling up and prone to queueing - subsequent requests behind the held one instead of routing elsewhere. - - Every second, if anything is in flight, set workload_served=TARGET_PERF - and mark update_pending so the metrics loop sends immediately. The - metrics tick resets workload_served back to 0 after sending; we - re-pin it next iteration. - """ - while True: - try: - await asyncio.sleep(1.0) - backend = _backend_ref.get("backend") - if backend is None: - continue - mm = backend.metrics.model_metrics - if mm.requests_working: - mm.workload_served = TARGET_PERF - backend.metrics.update_pending = True - except asyncio.CancelledError: - raise - except Exception as e: - log.debug(f"perf heartbeat error: {e}") - - def _build_internal_app() -> web.Application: app = web.Application() async def release_handler(_request: web.Request) -> web.Response: - event = _active_reservation - if event is None: + """End the active reservation (the singleton session on this worker). + + max_sessions=1 means at most one session is active here. We call + the framework's internal __close_session via name-mangling to + bypass the session_auth check that /session/end normally requires. + That's intentional: this endpoint is localhost-only so trust is + assumed, and the user's consumer can release without having to + plumb session_auth through their queue. + + __close_session reports the session metrics as a success, fires + on_close_route if configured, and pops the session from the dict. + """ + backend = _backend_ref.get("backend") + if backend is None: return web.json_response( - {"released": False, "reason": "no active reservation"}, + {"released": False, "reason": "backend not ready"}, + status=503, + ) + sids = list(backend.sessions.keys()) + if not sids: + return web.json_response( + {"released": False, "reason": "no active session"}, status=200, ) - event.set() - return web.json_response({"released": True}, status=200) + closed = [] + for sid in sids: + try: + if await backend._Backend__close_session(sid): + closed.append(sid) + except Exception as e: + log.warning(f"Error closing session {sid}: {e}") + return web.json_response( + {"released": bool(closed), "session_ids": closed}, + status=200, + ) app.router.add_post("/release", release_handler) @@ -126,18 +114,14 @@ def _build_internal_app() -> web.Application: @asynccontextmanager async def null_lifecycle(): - # Pin max_throughput to exactly 100 by pre-populating the framework's - # benchmark cache file. The framework's __run_benchmark short-circuits - # to `float(file_contents)` when this file exists, bypassing the - # time-based calculation that would otherwise drift to ~99.x due to - # asyncio scheduling overhead. The filename matches the framework - # constant BENCHMARK_INDICATOR_FILE in - # vastai.serverless.server.lib.backend. + # Pin max_throughput to exactly TARGET_PERF by pre-populating the + # framework's benchmark cache file. __run_benchmark short-circuits to + # float(file_contents) when this file exists. try: with open(".has_benchmark", "w") as fh: - fh.write("150") + fh.write(str(int(TARGET_PERF))) except OSError as e: - log.warning(f"Could not pin benchmark cache to 150: {e}") + log.warning(f"Could not pin benchmark cache: {e}") app = _build_internal_app() runner = web.AppRunner(app) @@ -145,8 +129,6 @@ async def null_lifecycle(): site = web.TCPSite(runner, INTERNAL_HOST, INTERNAL_PORT) await site.start() - heartbeat = asyncio.create_task(_perf_heartbeat(), name="null-perf-heartbeat") - lines = [ f"Null pyworker internal control server: http://{INTERNAL_HOST}:{INTERNAL_PORT}", f" POST /release - end the active reservation (call from your queue consumer)", @@ -157,60 +139,32 @@ async def null_lifecycle(): ) else: lines.append(f"Framework healthcheck pointed at: {BACKEND_HEALTH_URL}") + lines.append( + "Reservations use the framework session model. Clients POST to " + "/session/create via the SDK to acquire a worker; max_sessions=1 " + "so each worker holds at most one reservation." + ) log.info("\n".join(lines)) try: yield finally: - heartbeat.cancel() - try: - await heartbeat - except (asyncio.CancelledError, Exception): - pass await runner.cleanup() -async def reserve_worker(**params: object) -> dict: - global _active_reservation - +async def ping(**params: object) -> dict: + """Trivial handler. Exists to satisfy the framework's requirement that + at least one HandlerConfig has a BenchmarkConfig, and to give clients + a route they can hit with session_id to extend their session TTL. + """ if params.get(BENCHMARK_SENTINEL): - # Fallback path only — the lifecycle pre-populates .has_benchmark - # with "150" so __run_benchmark normally short-circuits and never - # invokes us. If the cache write failed, sleep ~1s so the - # time-based calculation lands near 150 (workload=150 / time~=1s). + # Fallback only — the lifecycle pre-pins .has_benchmark so + # __run_benchmark normally short-circuits and this never runs. If + # the cache write failed, sleep ~1s so the time-based throughput + # math lands near TARGET_PERF. await asyncio.sleep(1.0) return {"ok": True, "benchmark": True} - - requested = params.get("duration") - if requested is None: - duration = MAX_RESERVATION_SECONDS - else: - try: - duration = max(0.0, min(float(requested), MAX_RESERVATION_SECONDS)) - except (TypeError, ValueError): - duration = MAX_RESERVATION_SECONDS - - event = asyncio.Event() - _active_reservation = event - log.info( - f"Reservation acquired; awaiting POST /release on " - f"http://{INTERNAL_HOST}:{INTERNAL_PORT}/release " - f"(auto-release after {duration:.1f}s)" - ) - try: - try: - await asyncio.wait_for(event.wait(), timeout=duration) - log.info("Reservation released via /release") - return {"released": "explicit", "duration_cap": duration} - except asyncio.TimeoutError: - log.warning( - f"Reservation hit duration cap of {duration:.1f}s without " - f"explicit /release; releasing automatically" - ) - return {"released": "duration_elapsed", "duration": duration} - finally: - if _active_reservation is event: - _active_reservation = None + return {"ok": True} worker_config = WorkerConfig( @@ -218,17 +172,12 @@ worker_config = WorkerConfig( model_server_port=HEALTH_PORT, model_healthcheck_url=HEALTH_PATH, lifecycle=null_lifecycle(), + max_sessions=1, handlers=[ HandlerConfig( - route="/reserve", - allow_parallel_requests=False, - # Reject (429) any /reserve that arrives while the worker is - # already busy. A held reservation lasts up to MAX_RESERVATION_ - # SECONDS, so queueing behind it would mean hours of wait — - # better to bounce the request immediately so serverless routes - # it to a free worker (or spins up a new one). - max_queue_time=0.0, - remote_function=reserve_worker, + route="/ping", + allow_parallel_requests=True, + remote_function=ping, workload_calculator=lambda _payload: TARGET_PERF, benchmark_config=BenchmarkConfig( generator=lambda: {BENCHMARK_SENTINEL: True},