diff --git a/workers/null/README.md b/workers/null/README.md index 63920df..1ee4035 100644 --- a/workers/null/README.md +++ b/workers/null/README.md @@ -36,6 +36,25 @@ to scale it down. - The handler is a `remote_function` rather than an HTTP proxy, so the framework never tries to forward the request anywhere. +## Healthchecking + +The framework periodically GETs a healthcheck URL after startup; if it ever +fails after the first success, the worker is marked errored and the +autoscaler can decommission it. The null worker exposes two modes: + +- **Stub (default)** — a tiny HTTP server runs on + `http://127.0.0.1:18999/health` (override the port with + `NULL_STUB_HEALTH_PORT`) and always returns `200`. This is just enough to + satisfy the framework while you wire up real consumers. +- **Point at your queue consumer (recommended)** — set + `BACKEND_HEALTH_URL=http://127.0.0.1:9090/health` (absolute URL) and the + pyworker will healthcheck *your* consumer instead. If your consumer + process crashes, the autoscaler will see the worker as broken. + +Run your queue consumer on the instance alongside the PyWorker, expose a +plain `/health` endpoint on it, then set `BACKEND_HEALTH_URL` accordingly in +your template. + ## API ### `POST /reserve` @@ -66,6 +85,11 @@ Behavior: - `MAX_RESERVATION_SECONDS` — upper bound on how long a single `/reserve` call can hold a worker. Defaults to `3600`. Set lower if you want a tighter safety cap against stuck clients. +- `BACKEND_HEALTH_URL` — absolute URL the framework should healthcheck + (e.g. `http://127.0.0.1:9090/health`). When set, the stub server does not + run. When unset, the built-in stub is used. +- `NULL_STUB_HEALTH_PORT` — port for the built-in stub healthcheck server. + Defaults to `18999`. Only used when `BACKEND_HEALTH_URL` is unset. ## Deploying on Vast Serverless diff --git a/workers/null/worker.py b/workers/null/worker.py index 99453f8..59b0dad 100644 --- a/workers/null/worker.py +++ b/workers/null/worker.py @@ -2,6 +2,9 @@ import asyncio import logging import os from contextlib import asynccontextmanager +from urllib.parse import urlsplit + +from aiohttp import web from vastai import ( Worker, @@ -22,14 +25,63 @@ MAX_RESERVATION_SECONDS = float(os.environ.get("MAX_RESERVATION_SECONDS", 3600)) # immediately during capacity estimation instead of sleeping. BENCHMARK_SENTINEL = "__null_worker_benchmark__" +# Healthcheck wiring. The framework periodically GETs +# `:` and marks the +# worker errored if that ever fails after the first success. For the null +# worker we either: +# * point at a URL the user supplies via BACKEND_HEALTH_URL — typically +# their own queue-consumer's health endpoint, so the autoscaler sees the +# worker as broken if the consumer dies, or +# * run a tiny built-in stub that always returns 200, so the framework has +# something live to talk to until the user wires up a real consumer. +BACKEND_HEALTH_URL = os.environ.get("BACKEND_HEALTH_URL", "").strip() +STUB_HEALTH_HOST = "127.0.0.1" +STUB_HEALTH_PORT = int(os.environ.get("NULL_STUB_HEALTH_PORT", 18999)) +STUB_HEALTH_PATH = "/health" + +if BACKEND_HEALTH_URL: + _parsed = urlsplit(BACKEND_HEALTH_URL) + if not _parsed.scheme or not _parsed.hostname: + raise ValueError( + f"BACKEND_HEALTH_URL must be an absolute URL, got: {BACKEND_HEALTH_URL!r}" + ) + HEALTH_BASE_URL = f"{_parsed.scheme}://{_parsed.hostname}" + HEALTH_PORT = _parsed.port or (443 if _parsed.scheme == "https" else 80) + HEALTH_PATH = _parsed.path or "/" + USE_STUB = False +else: + HEALTH_BASE_URL = f"http://{STUB_HEALTH_HOST}" + HEALTH_PORT = STUB_HEALTH_PORT + HEALTH_PATH = STUB_HEALTH_PATH + USE_STUB = True + @asynccontextmanager async def null_lifecycle(): - log.info("Null pyworker active (no model server)") + runner = None + if USE_STUB: + async def stub_health(_request: web.Request) -> web.Response: + return web.Response(status=200, text="ok") + + app = web.Application() + app.router.add_get(STUB_HEALTH_PATH, stub_health) + runner = web.AppRunner(app) + await runner.setup() + site = web.TCPSite(runner, STUB_HEALTH_HOST, STUB_HEALTH_PORT) + await site.start() + log.info( + f"Null pyworker stub healthcheck listening on " + f"http://{STUB_HEALTH_HOST}:{STUB_HEALTH_PORT}{STUB_HEALTH_PATH} " + f"(override by setting BACKEND_HEALTH_URL)" + ) + else: + log.info(f"Null pyworker healthcheck pointing at {BACKEND_HEALTH_URL}") + try: yield finally: - log.info("Null pyworker shutting down") + if runner is not None: + await runner.cleanup() async def reserve_worker(**params: object) -> dict: @@ -59,8 +111,9 @@ async def reserve_worker(**params: object) -> dict: worker_config = WorkerConfig( - model_server_url="http://127.0.0.1", - model_server_port=1, + model_server_url=HEALTH_BASE_URL, + model_server_port=HEALTH_PORT, + model_healthcheck_url=HEALTH_PATH, lifecycle=null_lifecycle(), handlers=[ HandlerConfig(