From 254ccdf181941e0ce18660a7d488d6adb3a02f7f Mon Sep 17 00:00:00 2001 From: Rob Ballantyne Date: Mon, 11 May 2026 16:59:46 +0100 Subject: [PATCH] Add /release control endpoint to null pyworker The held /reserve now waits on an asyncio.Event and resolves when the local queue consumer POSTs /release on the internal control port (127.0.0.1:18999 by default). This produces a 200 success in metrics instead of the 499 cancellation you got from disconnecting the client. The duration cap stays as a safety net for stuck consumers. The internal aiohttp server is now unconditional and hosts /release always; the stub /health route is added only when BACKEND_HEALTH_URL is unset. NULL_STUB_HEALTH_PORT is renamed to NULL_CONTROL_PORT to reflect the broader role. Co-Authored-By: Claude Opus 4.7 (1M context) --- workers/null/README.md | 117 ++++++++++++++++++++++++-------------- workers/null/worker.py | 126 +++++++++++++++++++++++++++-------------- 2 files changed, 159 insertions(+), 84 deletions(-) diff --git a/workers/null/README.md b/workers/null/README.md index 1ee4035..c13b847 100644 --- a/workers/null/README.md +++ b/workers/null/README.md @@ -2,8 +2,9 @@ A PyWorker that does **nothing** — it does not forward requests to any model server. Each HTTP POST to `/reserve` simply marks the worker as busy and holds -the request open until the caller disconnects (or a configured timeout -elapses). +the request open until the user's queue consumer (running locally on the +instance) calls `/release` on the internal control port — or a safety +timeout elapses. ## When to use it @@ -18,11 +19,12 @@ Use this worker when you want to drive Vast Serverless autoscaling but you do Serverless autoscaler to spin instances up and down based on demand on *your* side. -For each job your side wants to run on a Vast instance, you POST once to -`/reserve`. The autoscaler will provision a worker if none is free; the -request stays open, keeping that worker counted as busy, until you close the -connection. When you close, the worker goes idle and the autoscaler is free -to scale it down. +For each batch of work your side wants on a Vast instance, you POST once to +`/reserve`. The autoscaler provisions a worker if none is free; the request +stays open, keeping that worker counted as busy. When your queue consumer +finishes its work it POSTs `/release` on `127.0.0.1:18999` and the held +`/reserve` returns `200`, so the request is recorded as a normal success in +Vast metrics (not a cancellation). ## How it works @@ -33,19 +35,22 @@ to scale it down. - `lifecycle` is used instead of `model_log_file`, so there is no log to tail and no model server to start. The worker reports itself ready immediately after the (trivial) benchmark. -- The handler is a `remote_function` rather than an HTTP proxy, so the - framework never tries to forward the request anywhere. +- The `/reserve` handler is a `remote_function` rather than an HTTP proxy, so + the framework never tries to forward the request anywhere — it just awaits + an internal `asyncio.Event`. +- An internal aiohttp control server, bound to `127.0.0.1`, hosts + `/release` (and, when no external healthcheck URL is provided, a stub + `/health`). ## Healthchecking The framework periodically GETs a healthcheck URL after startup; if it ever fails after the first success, the worker is marked errored and the -autoscaler can decommission it. The null worker exposes two modes: +autoscaler can decommission it. Two modes: -- **Stub (default)** — a tiny HTTP server runs on - `http://127.0.0.1:18999/health` (override the port with - `NULL_STUB_HEALTH_PORT`) and always returns `200`. This is just enough to - satisfy the framework while you wire up real consumers. +- **Stub (default)** — the internal control server also answers + `GET /health` with `200`. This is just enough to satisfy the framework + while you wire up real consumers. - **Point at your queue consumer (recommended)** — set `BACKEND_HEALTH_URL=http://127.0.0.1:9090/health` (absolute URL) and the pyworker will healthcheck *your* consumer instead. If your consumer @@ -57,39 +62,60 @@ your template. ## API -### `POST /reserve` +### `POST /reserve` (external port, signed by the autoscaler) -Holds the worker busy for the lifetime of the request. +Holds the worker busy until the reservation ends. Request body (all fields optional): ```json -{ "duration": 60 } +{ "duration": 600 } ``` -- `duration` (seconds, optional): how long to hold the reservation if the - client does not disconnect first. Capped by `MAX_RESERVATION_SECONDS` (env - var, default 3600). If omitted, defaults to the cap. +- `duration` (seconds, optional): safety cap on how long to hold the + reservation if no `/release` arrives. Capped by `MAX_RESERVATION_SECONDS` + (env var, default 3600). If omitted, defaults to that cap. Behavior: -- Returns `200` with `{"released": "duration_elapsed", "duration": }` when - the duration elapses normally. -- Returns `499` when the client disconnects (the reservation is released - immediately). +- Returns `200` with `{"released": "explicit", ...}` when the local consumer + POSTs `/release` on the internal port. **This is the intended happy path + — the request is counted as a success in metrics.** +- Returns `200` with `{"released": "duration_elapsed", "duration": }` if + the duration cap fires (safety net for a stuck consumer). +- Returns `499` if the external client disconnects (counted as cancelled in + metrics — avoid this; use `/release` instead). - Returns `429` if the worker is already busy and queue wait would exceed `max_queue_time` (30s by default). +### `POST /release` (internal port, localhost-only) + +Marks the active reservation as done. No body required. Idempotent: + +```bash +curl -X POST http://127.0.0.1:18999/release +``` + +Responses: + +- `200 {"released": true}` — active reservation was released; the held + `/reserve` will return `{"released": "explicit"}`. +- `200 {"released": false, "reason": "no active reservation"}` — nothing was + in flight, no-op. + +Only processes on the Vast instance can reach this port. There is no +authentication on it. + ## Environment variables - `MAX_RESERVATION_SECONDS` — upper bound on how long a single `/reserve` - call can hold a worker. Defaults to `3600`. Set lower if you want a tighter - safety cap against stuck clients. + call can hold a worker if `/release` is never called. Defaults to `3600`. - `BACKEND_HEALTH_URL` — absolute URL the framework should healthcheck - (e.g. `http://127.0.0.1:9090/health`). When set, the stub server does not - run. When unset, the built-in stub is used. -- `NULL_STUB_HEALTH_PORT` — port for the built-in stub healthcheck server. - Defaults to `18999`. Only used when `BACKEND_HEALTH_URL` is unset. + (e.g. `http://127.0.0.1:9090/health`). When set, the stub `/health` route + is not registered on the internal server. When unset, the built-in stub + is used. +- `NULL_CONTROL_PORT` — port for the internal control server (hosts + `/release` and optionally `/health`). Defaults to `18999`. ## Deploying on Vast Serverless @@ -100,26 +126,35 @@ Behavior: 3. There is no model server to configure; you can omit model-related env vars entirely. 4. Run your own queue-consumer process on the instance alongside the - PyWorker (e.g. as a separate supervisor service started by the template). + PyWorker. When the consumer finishes its work it should: + ```bash + curl -X POST http://127.0.0.1:18999/release + ``` + so the held `/reserve` returns success and the autoscaler can scale the + worker down cleanly. ## Client example ```bash -python -m workers.null.client --endpoint --duration 300 +python -m workers.null.client --endpoint --duration 600 ``` -This will POST once to `/reserve`, which causes exactly one worker to be -provisioned (if none is free) and held busy for up to 300 seconds. Killing -the client process (Ctrl-C) drops the connection and releases the worker -early. +This POSTs once to `/reserve`, which causes exactly one worker to be +provisioned (if none is free) and held busy. To exercise the full flow, +shell into the worker and run `curl -X POST http://127.0.0.1:18999/release` +— the client will return with `{"released": "explicit", ...}`. ## Notes and caveats -- The HTTP connection must stay open for the full reservation. Make sure - your client and any intermediate proxies allow long-lived requests - (disable idle timeouts, retries, and connection reuse if necessary). +- The HTTP connection from the external caller must stay open for the full + reservation. Make sure your client and any intermediate proxies allow + long-lived requests (disable idle timeouts, retries, and connection + reuse if necessary). - If your client retries on timeout, you may end up provisioning duplicate - workers. Use idempotent semantics in *your* queue, or set `duration` to a - finite value and accept release-on-elapse as the normal exit. + workers. Configure `duration` generously and rely on `/release` from the + consumer to end reservations promptly. +- Avoid disconnecting the external `/reserve` request as a way to release — + that produces a `499` and is counted as a cancellation in Vast metrics. + Always release via `POST /release` on the internal port. - There is no streaming / heartbeat in the response; the request returns exactly once, when the reservation ends. diff --git a/workers/null/worker.py b/workers/null/worker.py index 59b0dad..bd2f505 100644 --- a/workers/null/worker.py +++ b/workers/null/worker.py @@ -2,6 +2,7 @@ import asyncio import logging import os from contextlib import asynccontextmanager +from typing import Optional from urllib.parse import urlsplit from aiohttp import web @@ -16,8 +17,8 @@ from vastai import ( log = logging.getLogger(__file__) -# Safety cap: if a client never disconnects and never sets `duration`, the -# reservation is auto-released after this many seconds so a stuck client +# Safety cap: if the user's queue consumer never calls /release, the +# reservation is auto-released after this many seconds so a forgotten /release # can't pin a worker indefinitely. Override with MAX_RESERVATION_SECONDS. MAX_RESERVATION_SECONDS = float(os.environ.get("MAX_RESERVATION_SECONDS", 3600)) @@ -25,20 +26,19 @@ MAX_RESERVATION_SECONDS = float(os.environ.get("MAX_RESERVATION_SECONDS", 3600)) # immediately during capacity estimation instead of sleeping. BENCHMARK_SENTINEL = "__null_worker_benchmark__" -# Healthcheck wiring. The framework periodically GETs -# `:` and marks the -# worker errored if that ever fails after the first success. For the null -# worker we either: -# * point at a URL the user supplies via BACKEND_HEALTH_URL — typically -# their own queue-consumer's health endpoint, so the autoscaler sees the -# worker as broken if the consumer dies, or -# * run a tiny built-in stub that always returns 200, so the framework has -# something live to talk to until the user wires up a real consumer. -BACKEND_HEALTH_URL = os.environ.get("BACKEND_HEALTH_URL", "").strip() -STUB_HEALTH_HOST = "127.0.0.1" -STUB_HEALTH_PORT = int(os.environ.get("NULL_STUB_HEALTH_PORT", 18999)) +# Internal control server. Hosts: +# * POST /release — always available, marks the active reservation as +# done so the held /reserve returns 200 (success in metrics, not a +# cancellation). +# * GET /health — only when no external BACKEND_HEALTH_URL is set; the +# framework's healthcheck loop polls it so the worker has a live signal. +# Bound to 127.0.0.1 so only processes on the instance can reach it. +INTERNAL_HOST = "127.0.0.1" +INTERNAL_PORT = int(os.environ.get("NULL_CONTROL_PORT", 18999)) STUB_HEALTH_PATH = "/health" +BACKEND_HEALTH_URL = os.environ.get("BACKEND_HEALTH_URL", "").strip() + if BACKEND_HEALTH_URL: _parsed = urlsplit(BACKEND_HEALTH_URL) if not _parsed.scheme or not _parsed.hostname: @@ -48,43 +48,73 @@ if BACKEND_HEALTH_URL: HEALTH_BASE_URL = f"{_parsed.scheme}://{_parsed.hostname}" HEALTH_PORT = _parsed.port or (443 if _parsed.scheme == "https" else 80) HEALTH_PATH = _parsed.path or "/" - USE_STUB = False + USE_STUB_HEALTH = False else: - HEALTH_BASE_URL = f"http://{STUB_HEALTH_HOST}" - HEALTH_PORT = STUB_HEALTH_PORT + HEALTH_BASE_URL = f"http://{INTERNAL_HOST}" + HEALTH_PORT = INTERNAL_PORT HEALTH_PATH = STUB_HEALTH_PATH - USE_STUB = True + USE_STUB_HEALTH = True + + +# Singleton active reservation. `allow_parallel_requests=False` on the +# /reserve handler guarantees the framework only runs one at a time per +# worker, so a single slot is enough. +_active_reservation: Optional[asyncio.Event] = None + + +def _build_internal_app() -> web.Application: + app = web.Application() + + async def release_handler(_request: web.Request) -> web.Response: + event = _active_reservation + if event is None: + return web.json_response( + {"released": False, "reason": "no active reservation"}, + status=200, + ) + event.set() + return web.json_response({"released": True}, status=200) + + app.router.add_post("/release", release_handler) + + if USE_STUB_HEALTH: + async def stub_health(_request: web.Request) -> web.Response: + return web.Response(status=200, text="ok") + + app.router.add_get(STUB_HEALTH_PATH, stub_health) + + return app @asynccontextmanager async def null_lifecycle(): - runner = None - if USE_STUB: - async def stub_health(_request: web.Request) -> web.Response: - return web.Response(status=200, text="ok") + app = _build_internal_app() + runner = web.AppRunner(app) + await runner.setup() + site = web.TCPSite(runner, INTERNAL_HOST, INTERNAL_PORT) + await site.start() - app = web.Application() - app.router.add_get(STUB_HEALTH_PATH, stub_health) - runner = web.AppRunner(app) - await runner.setup() - site = web.TCPSite(runner, STUB_HEALTH_HOST, STUB_HEALTH_PORT) - await site.start() - log.info( - f"Null pyworker stub healthcheck listening on " - f"http://{STUB_HEALTH_HOST}:{STUB_HEALTH_PORT}{STUB_HEALTH_PATH} " - f"(override by setting BACKEND_HEALTH_URL)" + lines = [ + f"Null pyworker internal control server: http://{INTERNAL_HOST}:{INTERNAL_PORT}", + f" POST /release - end the active reservation (call from your queue consumer)", + ] + if USE_STUB_HEALTH: + lines.append( + f" GET {STUB_HEALTH_PATH} - stub healthcheck (override with BACKEND_HEALTH_URL)" ) else: - log.info(f"Null pyworker healthcheck pointing at {BACKEND_HEALTH_URL}") + lines.append(f"Framework healthcheck pointed at: {BACKEND_HEALTH_URL}") + log.info("\n".join(lines)) try: yield finally: - if runner is not None: - await runner.cleanup() + await runner.cleanup() async def reserve_worker(**params: object) -> dict: + global _active_reservation + if params.get(BENCHMARK_SENTINEL): return {"ok": True, "benchmark": True} @@ -97,17 +127,27 @@ async def reserve_worker(**params: object) -> dict: except (TypeError, ValueError): duration = MAX_RESERVATION_SECONDS + event = asyncio.Event() + _active_reservation = event log.info( - f"Reservation acquired; holding worker busy for up to {duration:.1f}s " - f"(release early by disconnecting the HTTP request)" + f"Reservation acquired; awaiting POST /release on " + f"http://{INTERNAL_HOST}:{INTERNAL_PORT}/release " + f"(auto-release after {duration:.1f}s)" ) try: - await asyncio.sleep(duration) - log.info("Reservation duration elapsed; releasing worker") - return {"released": "duration_elapsed", "duration": duration} - except asyncio.CancelledError: - log.info("Reservation released by client disconnect") - raise + try: + await asyncio.wait_for(event.wait(), timeout=duration) + log.info("Reservation released via /release") + return {"released": "explicit", "duration_cap": duration} + except asyncio.TimeoutError: + log.warning( + f"Reservation hit duration cap of {duration:.1f}s without " + f"explicit /release; releasing automatically" + ) + return {"released": "duration_elapsed", "duration": duration} + finally: + if _active_reservation is event: + _active_reservation = None worker_config = WorkerConfig(