From 18974873e5df32167e0b9a1af17e8bd30be5b68a Mon Sep 17 00:00:00 2001 From: Rob Ballantyne Date: Mon, 11 May 2026 16:48:52 +0100 Subject: [PATCH] Add null pyworker for queue-driven autoscaling A PyWorker that does not forward to any model server. POST /reserve holds the worker busy until the client disconnects (or the duration cap elapses), so users with their own job queue can drive Vast autoscaling without exposing inbound model traffic on the instance. Co-Authored-By: Claude Opus 4.7 (1M context) --- workers/null/README.md | 101 +++++++++++++++++++++++++++++++++++++++ workers/null/__init__.py | 0 workers/null/client.py | 70 +++++++++++++++++++++++++++ workers/null/worker.py | 83 ++++++++++++++++++++++++++++++++ 4 files changed, 254 insertions(+) create mode 100644 workers/null/README.md create mode 100644 workers/null/__init__.py create mode 100644 workers/null/client.py create mode 100644 workers/null/worker.py diff --git a/workers/null/README.md b/workers/null/README.md new file mode 100644 index 0000000..63920df --- /dev/null +++ b/workers/null/README.md @@ -0,0 +1,101 @@ +# Null PyWorker + +A PyWorker that does **nothing** — it does not forward requests to any model +server. Each HTTP POST to `/reserve` simply marks the worker as busy and holds +the request open until the caller disconnects (or a configured timeout +elapses). + +## When to use it + +Use this worker when you want to drive Vast Serverless autoscaling but you do +**not** want inbound requests to reach a model on the instance. Typical setup: + +- You already have a job queue on your own infrastructure (Redis, SQS, NATS, + etc.). +- A separate worker process on the Vast instance pulls work from that queue + directly. The Vast PyWorker is not involved in the request/response path. +- You want one Vast worker per active queue consumer, and you want the + Serverless autoscaler to spin instances up and down based on demand on + *your* side. + +For each job your side wants to run on a Vast instance, you POST once to +`/reserve`. The autoscaler will provision a worker if none is free; the +request stays open, keeping that worker counted as busy, until you close the +connection. When you close, the worker goes idle and the autoscaler is free +to scale it down. + +## How it works + +- `allow_parallel_requests=False`, so one in-flight `/reserve` fully occupies + the worker. Any second request that lands on the same worker queues (or is + rejected with `429` after `max_queue_time`), pushing the autoscaler to + provision more workers. +- `lifecycle` is used instead of `model_log_file`, so there is no log to tail + and no model server to start. The worker reports itself ready immediately + after the (trivial) benchmark. +- The handler is a `remote_function` rather than an HTTP proxy, so the + framework never tries to forward the request anywhere. + +## API + +### `POST /reserve` + +Holds the worker busy for the lifetime of the request. + +Request body (all fields optional): + +```json +{ "duration": 60 } +``` + +- `duration` (seconds, optional): how long to hold the reservation if the + client does not disconnect first. Capped by `MAX_RESERVATION_SECONDS` (env + var, default 3600). If omitted, defaults to the cap. + +Behavior: + +- Returns `200` with `{"released": "duration_elapsed", "duration": }` when + the duration elapses normally. +- Returns `499` when the client disconnects (the reservation is released + immediately). +- Returns `429` if the worker is already busy and queue wait would exceed + `max_queue_time` (30s by default). + +## Environment variables + +- `MAX_RESERVATION_SECONDS` — upper bound on how long a single `/reserve` + call can hold a worker. Defaults to `3600`. Set lower if you want a tighter + safety cap against stuck clients. + +## Deploying on Vast Serverless + +1. Create a Serverless endpoint and point `PYWORKER_REPO` at this repository + (or your fork). +2. Set `BACKEND=null` in the template so `start_server.sh` runs + `workers.null.worker`. +3. There is no model server to configure; you can omit model-related env vars + entirely. +4. Run your own queue-consumer process on the instance alongside the + PyWorker (e.g. as a separate supervisor service started by the template). + +## Client example + +```bash +python -m workers.null.client --endpoint --duration 300 +``` + +This will POST once to `/reserve`, which causes exactly one worker to be +provisioned (if none is free) and held busy for up to 300 seconds. Killing +the client process (Ctrl-C) drops the connection and releases the worker +early. + +## Notes and caveats + +- The HTTP connection must stay open for the full reservation. Make sure + your client and any intermediate proxies allow long-lived requests + (disable idle timeouts, retries, and connection reuse if necessary). +- If your client retries on timeout, you may end up provisioning duplicate + workers. Use idempotent semantics in *your* queue, or set `duration` to a + finite value and accept release-on-elapse as the normal exit. +- There is no streaming / heartbeat in the response; the request returns + exactly once, when the reservation ends. diff --git a/workers/null/__init__.py b/workers/null/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/workers/null/client.py b/workers/null/client.py new file mode 100644 index 0000000..23cd34d --- /dev/null +++ b/workers/null/client.py @@ -0,0 +1,70 @@ +import argparse +import asyncio +import logging +import os +import sys + +from vastai import Serverless + +logging.basicConfig( + level=logging.DEBUG, + format="%(asctime)s[%(levelname)-5s] %(message)s", + datefmt="%Y-%m-%d %H:%M:%S", +) +log = logging.getLogger(__file__) + +ENDPOINT_NAME = "null-prod" + + +async def reserve(client: Serverless, *, endpoint_name: str, duration: float) -> dict: + """Hold a Vast worker open for `duration` seconds (or until we disconnect). + + The worker counts itself busy for the lifetime of this call, so the + autoscaler will keep it provisioned. Returning here means the reservation + has ended — either the worker hit its duration cap or the request errored. + """ + endpoint = await client.get_endpoint(name=endpoint_name) + payload = {"duration": duration} + log.info("POST /reserve duration=%ss", duration) + resp = await endpoint.request("/reserve", payload, cost=100) + return resp["response"] + + +def build_arg_parser() -> argparse.ArgumentParser: + p = argparse.ArgumentParser(description="Vast Null PyWorker demo client") + p.add_argument( + "--endpoint", + default=os.environ.get("VAST_ENDPOINT", ENDPOINT_NAME), + help=f"Vast endpoint name (default: {ENDPOINT_NAME})", + ) + p.add_argument( + "--duration", + type=float, + default=60.0, + help="Seconds to hold the worker busy (default: 60)", + ) + return p + + +async def main_async(): + args = build_arg_parser().parse_args() + + print("=" * 60) + print(f"Reserving 1 worker on endpoint '{args.endpoint}' for {args.duration}s") + print("=" * 60) + + try: + async with Serverless() as client: + response = await reserve( + client=client, + endpoint_name=args.endpoint, + duration=args.duration, + ) + print(f"Reservation result: {response}") + except Exception as e: + log.error("Error during reservation: %s", e, exc_info=True) + sys.exit(1) + + +if __name__ == "__main__": + asyncio.run(main_async()) diff --git a/workers/null/worker.py b/workers/null/worker.py new file mode 100644 index 0000000..99453f8 --- /dev/null +++ b/workers/null/worker.py @@ -0,0 +1,83 @@ +import asyncio +import logging +import os +from contextlib import asynccontextmanager + +from vastai import ( + Worker, + WorkerConfig, + HandlerConfig, + BenchmarkConfig, + LogActionConfig, +) + +log = logging.getLogger(__file__) + +# Safety cap: if a client never disconnects and never sets `duration`, the +# reservation is auto-released after this many seconds so a stuck client +# can't pin a worker indefinitely. Override with MAX_RESERVATION_SECONDS. +MAX_RESERVATION_SECONDS = float(os.environ.get("MAX_RESERVATION_SECONDS", 3600)) + +# Marker the benchmark path sets so the same remote function can return +# immediately during capacity estimation instead of sleeping. +BENCHMARK_SENTINEL = "__null_worker_benchmark__" + + +@asynccontextmanager +async def null_lifecycle(): + log.info("Null pyworker active (no model server)") + try: + yield + finally: + log.info("Null pyworker shutting down") + + +async def reserve_worker(**params: object) -> dict: + if params.get(BENCHMARK_SENTINEL): + return {"ok": True, "benchmark": True} + + requested = params.get("duration") + if requested is None: + duration = MAX_RESERVATION_SECONDS + else: + try: + duration = max(0.0, min(float(requested), MAX_RESERVATION_SECONDS)) + except (TypeError, ValueError): + duration = MAX_RESERVATION_SECONDS + + log.info( + f"Reservation acquired; holding worker busy for up to {duration:.1f}s " + f"(release early by disconnecting the HTTP request)" + ) + try: + await asyncio.sleep(duration) + log.info("Reservation duration elapsed; releasing worker") + return {"released": "duration_elapsed", "duration": duration} + except asyncio.CancelledError: + log.info("Reservation released by client disconnect") + raise + + +worker_config = WorkerConfig( + model_server_url="http://127.0.0.1", + model_server_port=1, + lifecycle=null_lifecycle(), + handlers=[ + HandlerConfig( + route="/reserve", + allow_parallel_requests=False, + max_queue_time=30.0, + remote_function=reserve_worker, + workload_calculator=lambda _payload: 100.0, + benchmark_config=BenchmarkConfig( + generator=lambda: {BENCHMARK_SENTINEL: True}, + runs=1, + concurrency=1, + do_warmup=False, + ), + ), + ], + log_action_config=LogActionConfig(), +) + +Worker(worker_config).run()