diff --git a/workers/null/README.md b/workers/null/README.md new file mode 100644 index 0000000..63920df --- /dev/null +++ b/workers/null/README.md @@ -0,0 +1,101 @@ +# Null PyWorker + +A PyWorker that does **nothing** — it does not forward requests to any model +server. Each HTTP POST to `/reserve` simply marks the worker as busy and holds +the request open until the caller disconnects (or a configured timeout +elapses). + +## When to use it + +Use this worker when you want to drive Vast Serverless autoscaling but you do +**not** want inbound requests to reach a model on the instance. Typical setup: + +- You already have a job queue on your own infrastructure (Redis, SQS, NATS, + etc.). +- A separate worker process on the Vast instance pulls work from that queue + directly. The Vast PyWorker is not involved in the request/response path. +- You want one Vast worker per active queue consumer, and you want the + Serverless autoscaler to spin instances up and down based on demand on + *your* side. + +For each job your side wants to run on a Vast instance, you POST once to +`/reserve`. The autoscaler will provision a worker if none is free; the +request stays open, keeping that worker counted as busy, until you close the +connection. When you close, the worker goes idle and the autoscaler is free +to scale it down. + +## How it works + +- `allow_parallel_requests=False`, so one in-flight `/reserve` fully occupies + the worker. Any second request that lands on the same worker queues (or is + rejected with `429` after `max_queue_time`), pushing the autoscaler to + provision more workers. +- `lifecycle` is used instead of `model_log_file`, so there is no log to tail + and no model server to start. The worker reports itself ready immediately + after the (trivial) benchmark. +- The handler is a `remote_function` rather than an HTTP proxy, so the + framework never tries to forward the request anywhere. 
+ +## API + +### `POST /reserve` + +Holds the worker busy for the lifetime of the request. + +Request body (all fields optional): + +```json +{ "duration": 60 } +``` + +- `duration` (seconds, optional): how long to hold the reservation if the + client does not disconnect first. Capped by `MAX_RESERVATION_SECONDS` (env + var, default 3600). If omitted, defaults to the cap. + +Behavior: + +- Returns `200` with `{"released": "duration_elapsed", "duration": <seconds>}` when + the duration elapses normally. +- Returns `499` when the client disconnects (the reservation is released + immediately). +- Returns `429` if the worker is already busy and queue wait would exceed + `max_queue_time` (30s by default). + +## Environment variables + +- `MAX_RESERVATION_SECONDS` — upper bound on how long a single `/reserve` + call can hold a worker. Defaults to `3600`. Set lower if you want a tighter + safety cap against stuck clients. + +## Deploying on Vast Serverless + +1. Create a Serverless endpoint and point `PYWORKER_REPO` at this repository + (or your fork). +2. Set `BACKEND=null` in the template so `start_server.sh` runs + `workers.null.worker`. +3. There is no model server to configure; you can omit model-related env vars + entirely. +4. Run your own queue-consumer process on the instance alongside the + PyWorker (e.g. as a separate supervisor service started by the template). + +## Client example + +```bash +python -m workers.null.client --endpoint <endpoint-name> --duration 300 +``` + +This will POST once to `/reserve`, which causes exactly one worker to be +provisioned (if none is free) and held busy for up to 300 seconds. Killing +the client process (Ctrl-C) drops the connection and releases the worker +early. + +## Notes and caveats + +- The HTTP connection must stay open for the full reservation. Make sure + your client and any intermediate proxies allow long-lived requests + (disable idle timeouts, retries, and connection reuse if necessary). 
+- If your client retries on timeout, you may end up provisioning duplicate + workers. Use idempotent semantics in *your* queue, or set `duration` to a + finite value and accept release-on-elapse as the normal exit. +- There is no streaming / heartbeat in the response; the request returns + exactly once, when the reservation ends. diff --git a/workers/null/__init__.py b/workers/null/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/workers/null/client.py b/workers/null/client.py new file mode 100644 index 0000000..23cd34d --- /dev/null +++ b/workers/null/client.py @@ -0,0 +1,70 @@ +import argparse +import asyncio +import logging +import os +import sys + +from vastai import Serverless + +logging.basicConfig( + level=logging.DEBUG, + format="%(asctime)s[%(levelname)-5s] %(message)s", + datefmt="%Y-%m-%d %H:%M:%S", +) +log = logging.getLogger(__file__) + +ENDPOINT_NAME = "null-prod" + + +async def reserve(client: Serverless, *, endpoint_name: str, duration: float) -> dict: + """Hold a Vast worker open for `duration` seconds (or until we disconnect). + + The worker counts itself busy for the lifetime of this call, so the + autoscaler will keep it provisioned. Returning here means the reservation + has ended — either the worker hit its duration cap or the request errored. 
+ """ + endpoint = await client.get_endpoint(name=endpoint_name) + payload = {"duration": duration} + log.info("POST /reserve duration=%ss", duration) + resp = await endpoint.request("/reserve", payload, cost=100) + return resp["response"] + + +def build_arg_parser() -> argparse.ArgumentParser: + p = argparse.ArgumentParser(description="Vast Null PyWorker demo client") + p.add_argument( + "--endpoint", + default=os.environ.get("VAST_ENDPOINT", ENDPOINT_NAME), + help=f"Vast endpoint name (default: {ENDPOINT_NAME})", + ) + p.add_argument( + "--duration", + type=float, + default=60.0, + help="Seconds to hold the worker busy (default: 60)", + ) + return p + + +async def main_async(): + args = build_arg_parser().parse_args() + + print("=" * 60) + print(f"Reserving 1 worker on endpoint '{args.endpoint}' for {args.duration}s") + print("=" * 60) + + try: + async with Serverless() as client: + response = await reserve( + client=client, + endpoint_name=args.endpoint, + duration=args.duration, + ) + print(f"Reservation result: {response}") + except Exception as e: + log.error("Error during reservation: %s", e, exc_info=True) + sys.exit(1) + + +if __name__ == "__main__": + asyncio.run(main_async()) diff --git a/workers/null/worker.py b/workers/null/worker.py new file mode 100644 index 0000000..99453f8 --- /dev/null +++ b/workers/null/worker.py @@ -0,0 +1,83 @@ +import asyncio +import logging +import os +from contextlib import asynccontextmanager + +from vastai import ( + Worker, + WorkerConfig, + HandlerConfig, + BenchmarkConfig, + LogActionConfig, +) + +log = logging.getLogger(__file__) + +# Safety cap: if a client never disconnects and never sets `duration`, the +# reservation is auto-released after this many seconds so a stuck client +# can't pin a worker indefinitely. Override with MAX_RESERVATION_SECONDS. 
MAX_RESERVATION_SECONDS = float(os.environ.get("MAX_RESERVATION_SECONDS", 3600))

# Marker the benchmark path sets so the same remote function can return
# immediately during capacity estimation instead of sleeping.
BENCHMARK_SENTINEL = "__null_worker_benchmark__"


@asynccontextmanager
async def null_lifecycle():
    """Log worker start-up and shutdown; there is no model server to manage."""
    log.info("Null pyworker active (no model server)")
    try:
        yield
    finally:
        log.info("Null pyworker shutting down")


def _resolve_duration(requested: object) -> float:
    """Return the hold time in seconds for a requested `duration` value.

    A missing, unparseable, overflowing, or NaN value falls back to
    `MAX_RESERVATION_SECONDS`; any other value is clamped into
    `[0, MAX_RESERVATION_SECONDS]`.
    """
    # Imported locally so this helper does not rely on the file's import header.
    import math

    if requested is None:
        return MAX_RESERVATION_SECONDS
    try:
        value = float(requested)
    except (TypeError, ValueError, OverflowError):
        # OverflowError: an integer too large for a float (e.g. 10**400) is
        # treated like any other invalid value instead of failing the request.
        return MAX_RESERVATION_SECONDS
    if math.isnan(value):
        # NaN compares false against everything, so the clamp below would
        # silently turn it into 0.0 and release the worker immediately.
        return MAX_RESERVATION_SECONDS
    return max(0.0, min(value, MAX_RESERVATION_SECONDS))


async def reserve_worker(**params: object) -> dict:
    """Hold the worker busy until the reservation elapses or is cancelled.

    Args:
        **params: Request payload fields. `duration` is the requested hold
            time in seconds; a truthy `BENCHMARK_SENTINEL` entry marks a
            benchmark call.

    Returns:
        `{"ok": True, "benchmark": True}` immediately for benchmark calls,
        otherwise `{"released": "duration_elapsed", "duration": <seconds>}`
        once the sleep completes.

    Raises:
        asyncio.CancelledError: Re-raised when the sleeping task is cancelled
            (logged here as a client disconnect).
    """
    if params.get(BENCHMARK_SENTINEL):
        return {"ok": True, "benchmark": True}

    duration = _resolve_duration(params.get("duration"))
    log.info(
        "Reservation acquired; holding worker busy for up to %.1fs "
        "(release early by disconnecting the HTTP request)",
        duration,
    )
    try:
        await asyncio.sleep(duration)
    except asyncio.CancelledError:
        log.info("Reservation released by client disconnect")
        raise
    log.info("Reservation duration elapsed; releasing worker")
    return {"released": "duration_elapsed", "duration": duration}


# NOTE(review): model_server_url/model_server_port look like placeholders,
# since this worker never forwards to a model server — confirm that
# WorkerConfig requires them.
worker_config = WorkerConfig(
    model_server_url="http://127.0.0.1",
    model_server_port=1,
    lifecycle=null_lifecycle(),
    handlers=[
        HandlerConfig(
            route="/reserve",
            allow_parallel_requests=False,
            max_queue_time=30.0,
            remote_function=reserve_worker,
            # Constant workload per request; the demo client sends cost=100.
            workload_calculator=lambda _payload: 100.0,
            benchmark_config=BenchmarkConfig(
                # The sentinel makes reserve_worker return without sleeping.
                generator=lambda: {BENCHMARK_SENTINEL: True},
                runs=1,
                concurrency=1,
                do_warmup=False,
            ),
        ),
    ],
    log_action_config=LogActionConfig(),
)

Worker(worker_config).run()