Compare commits

..

4 Commits

Author SHA1 Message Date
Lucas Armand 4e951f4912 test vastai_sdk test package 2026-04-08 13:38:22 -07:00
Lucas Armand f636012685 add test index 2026-04-08 13:18:46 -07:00
Lucas Armand ddb986d561 use test package 2026-04-08 13:12:27 -07:00
Lucas Armand 99a3319e66 Point to vast-cli 2026-04-08 12:30:20 -07:00
9 changed files with 19 additions and 315 deletions
+1 -1
View File
@@ -9,7 +9,7 @@ This repository contains **example PyWorkers** used by Vast.ais default Serve
- Optionally supports FIFO queueing when the backend cannot process concurrent requests - Optionally supports FIFO queueing when the backend cannot process concurrent requests
- Detects readiness/failure from model logs and runs a benchmark to estimate throughput - Detects readiness/failure from model logs and runs a benchmark to estimate throughput
> Important: The **core PyWorker framework** (Worker, WorkerConfig, HandlerConfig, BenchmarkConfig, LogActionConfig) is provided by the **`vastai` / `vastai-sdk`** Python package (https://github.com/vast-ai/vast-sdk). This repo focuses on *worker implementations and examples*, not the framework internals. > Important: The **core PyWorker framework** (Worker, WorkerConfig, HandlerConfig, BenchmarkConfig, LogActionConfig) is provided by the **`vastai`** Python package (https://github.com/vast-ai/vast-cli). This repo focuses on *worker implementations and examples*, not the framework internals.
## Repository Purpose ## Repository Purpose
+4 -4
View File
@@ -1,16 +1,16 @@
# Where did the PyWorker code go? # Where did the PyWorker code go?
We have moved the PyWorker source code into the `vastai-sdk` Python SDK. We have moved the PyWorker source code into the `vastai` Python package.
You can install it with You can install it with
``` ```
pip install vastai-sdk pip install vastai
``` ```
All of the source code can be found here: All of the source code can be found here:
https://github.com/vast-ai/vast-sdk https://github.com/vast-ai/vast-cli
And can be imported from vastai.serverless.server.lib And can be imported from vastai.serverless.server.lib
Serverless instances automatically run the start_server.sh script, which installs the vastai-sdk. Serverless instances automatically run the start_server.sh script, which installs the vastai package.
This is how the PyWorker source code makes it onto your serverless instances. This is how the PyWorker source code makes it onto your serverless instances.
You provide a worker.py file in your PYWORKER_REPO, and the start_server.sh will You provide a worker.py file in your PYWORKER_REPO, and the start_server.sh will
create and run a PyWorker according to your configuration defined in the file. create and run a PyWorker according to your configuration defined in the file.
+1 -2
View File
@@ -1,2 +1 @@
vastai-sdk>=0.3.0 vastai>=0.3.0
nltk==3.9.4
+12 -12
View File
@@ -53,7 +53,7 @@ JSON
exit 1 exit 1
} }
function install_vastai_sdk() { function install_vastai() {
local uv_flags=() local uv_flags=()
if [ "${USE_SYSTEM_PYTHON:-}" = "true" ]; then if [ "${USE_SYSTEM_PYTHON:-}" = "true" ]; then
uv_flags+=(--system --break-system-packages) uv_flags+=(--system --break-system-packages)
@@ -77,8 +77,8 @@ function install_vastai_sdk() {
if [ -n "${SDK_VERSION:-}" ]; then if [ -n "${SDK_VERSION:-}" ]; then
echo "Installing vastai version ${SDK_VERSION}" echo "Installing vastai version ${SDK_VERSION}"
if ! uv pip install "${uv_flags[@]}" "vastai==${SDK_VERSION}"; then if ! uv pip install --index-url https://test.pypi.org/simple/ --extra-index-url https://pypi.org/simple/ "${uv_flags[@]}" "vastai-sdk-vast==${SDK_VERSION}"; then
report_error_and_exit "Failed to install vastai==${SDK_VERSION}" report_error_and_exit "Failed to install vastai-vast==${SDK_VERSION}"
fi fi
return 0 return 0
fi fi
@@ -140,7 +140,7 @@ if [ "${USE_SYSTEM_PYTHON:-}" = "true" ]; then
fi fi
fi fi
fi fi
install_vastai_sdk install_vastai
touch ~/.no_auto_tmux touch ~/.no_auto_tmux
elif [ ! -d "$ENV_PATH" ]; then elif [ ! -d "$ENV_PATH" ]; then
echo "setting up venv" echo "setting up venv"
@@ -197,7 +197,7 @@ elif [ ! -d "$ENV_PATH" ]; then
report_error_and_exit "Failed to install Python requirements" report_error_and_exit "Failed to install Python requirements"
fi fi
install_vastai_sdk install_vastai
if ! touch ~/.no_auto_tmux; then if ! touch ~/.no_auto_tmux; then
report_error_and_exit "Failed to create ~/.no_auto_tmux" report_error_and_exit "Failed to create ~/.no_auto_tmux"
@@ -237,7 +237,7 @@ else
fi fi
fi fi
install_vastai_sdk install_vastai
fi fi
fi fi
@@ -318,8 +318,8 @@ if [ "$IS_DEPLOYMENT" = "true" ]; then
# The s3_key exists in the DB as soon as the deployment is created, but the # The s3_key exists in the DB as soon as the deployment is created, but the
# actual upload may still be in flight from the client side. # actual upload may still be in flight from the client side.
# Install SDK (uses the install_vastai_sdk function which supports SDK_BRANCH/SDK_VERSION) # Install SDK (uses the install_vastai function which supports SDK_BRANCH/SDK_VERSION)
install_vastai_sdk install_vastai
# Run deployment in serve mode # Run deployment in serve mode
export VAST_DEPLOYMENT_MODE=serve export VAST_DEPLOYMENT_MODE=serve
echo "Starting deployment: python3 $DEPLOY_DIR/deployment.py" echo "Starting deployment: python3 $DEPLOY_DIR/deployment.py"
@@ -339,19 +339,19 @@ set +e
PY_STATUS=1 PY_STATUS=1
if [ -f "$SERVER_DIR/worker.py" ]; then if [ -f "$SERVER_DIR/worker.py" ]; then
echo "Running worker.py" echo "trying worker.py"
python3 -m "worker" |& tee -a "$PYWORKER_LOG" python3 -m "worker" |& tee -a "$PYWORKER_LOG"
PY_STATUS=${PIPESTATUS[0]} PY_STATUS=${PIPESTATUS[0]}
fi fi
if [ "${PY_STATUS}" -ne 0 ] && [ -f "$SERVER_DIR/workers/$BACKEND/worker.py" ]; then if [ "${PY_STATUS}" -ne 0 ] && [ -f "$SERVER_DIR/workers/$BACKEND/worker.py" ]; then
echo "Running workers.${BACKEND}.worker" echo "trying workers.${BACKEND}.worker"
python3 -m "workers.${BACKEND}.worker" |& tee -a "$PYWORKER_LOG" python3 -m "workers.${BACKEND}.worker" |& tee -a "$PYWORKER_LOG"
PY_STATUS=${PIPESTATUS[0]} PY_STATUS=${PIPESTATUS[0]}
fi fi
if [ "${PY_STATUS}" -ne 0 ] && [ -f "$SERVER_DIR/workers/$BACKEND/server.py" ]; then if [ "${PY_STATUS}" -ne 0 ] && [ -f "$SERVER_DIR/workers/$BACKEND/server.py" ]; then
echo "Running workers.${BACKEND}.server" echo "trying workers.${BACKEND}.server"
python3 -m "workers.${BACKEND}.server" |& tee -a "$PYWORKER_LOG" python3 -m "workers.${BACKEND}.server" |& tee -a "$PYWORKER_LOG"
PY_STATUS=${PIPESTATUS[0]} PY_STATUS=${PIPESTATUS[0]}
fi fi
@@ -365,4 +365,4 @@ if [ "${PY_STATUS}" -ne 0 ]; then
report_error_and_exit "PyWorker exited with status ${PY_STATUS}" report_error_and_exit "PyWorker exited with status ${PY_STATUS}"
fi fi
echo "PyWorker bootstrap complete" echo "launching PyWorker server done"
-88
View File
@@ -1,88 +0,0 @@
# Null PyWorker
Holds Vast Serverless reservations open without forwarding any work to a
model. Use it when your real workload (a queue consumer in any language)
runs as a separate process on the instance and you just want to drive
Vast autoscaling: **one POST reserves a worker, one POST releases it.**
## Use case
You have a job queue on your own infrastructure (Redis, SQS, NATS, etc.)
and a consumer (node, golang, python, a binary — anything) that pulls
from it. You want one Vast worker per unit of in-flight work, scaling
elastically from zero. The null PyWorker is the autoscaling driver; your
consumer does the work.
## How it works
Reservations use the framework's session API. The SDK's
`endpoint.session(...)` POSTs `/session/create` to reserve a worker;
`session.close()` POSTs `/session/end` to release it. `max_sessions=1`
means each worker holds exactly one reservation — the next reservation
either lands on a free worker or triggers a scale-up.
The PyWorker itself does nothing functional:
- One trivial `/ping` route to satisfy the framework's benchmark
requirement (its `max_perf` is pinned to 100).
- An internal `/release` endpoint on `127.0.0.1:18999` for the local
consumer to end the session without needing `session_auth`.
## Endpoint parameters
Tested working configuration:
| Parameter | Value | Why |
|---|---|---|
| `target_util` | `1.0` | One session = one worker. Default `0.9` rounds up to an extra worker. |
| `min_load` | `0` | Scale-to-zero floor. |
| `max_queue_time` | `1` | Stop routing to an occupied worker after ~1s of implied queue. |
| `target_queue_time` | `0.5` | Trigger scale-up promptly once anything queues. |
| `inactivity_timeout` | `10` (seconds) | Permit scale-to-zero after 10s idle. |
## API
| Route | Where | Use |
|---|---|---|
| `POST /session/create` | endpoint, signed | Reserve a worker (`endpoint.session(...)`) |
| `POST /session/end` | endpoint, signed | Release (`session.close()`) |
| `POST /release` | `127.0.0.1:18999`, no auth | Local consumer release, no `session_auth` needed |
## Healthcheck
Default: stub on `127.0.0.1:18999/health` returning `200`. Set
`BACKEND_HEALTH_URL=http://127.0.0.1:9090/health` (absolute URL) to point
the framework at your queue consumer's health endpoint instead — if the
consumer dies, the autoscaler sees the worker as broken.
## Deploying
1. Point `PYWORKER_REPO` at this repo (or your fork).
2. Set `BACKEND=null` in the template.
3. Run your queue consumer alongside the PyWorker. When it's done with
a unit of work:
```bash
curl -X POST http://127.0.0.1:18999/release
```
## Client demo
```bash
# Single reservation, hold 180s
python -m workers.null.client --endpoint <NAME> --instance alpha
# Three concurrent reservations, started 30s apart, each held 360s
python -m workers.null.client --endpoint <NAME> --instance alpha --count 3 --hold 360
```
Flags: `--count` (number of concurrent sessions, default 1), `--hold`
(seconds each session is held, default 180), `--interval` (seconds
between starts when `--count > 1`, default 30), `--cost` (cost reported
at session-create, default 100 = `max_perf`), `--instance` (`prod` |
`alpha` | `candidate` | `local`).
## Environment variables
- `BACKEND_HEALTH_URL` — absolute URL the framework healthchecks. Stub
is used when unset.
- `NULL_CONTROL_PORT` — internal control server port. Defaults to `18999`.
View File
-64
View File
@@ -1,64 +0,0 @@
import argparse
import asyncio
import logging
import os
import sys
from vastai import Serverless
logging.basicConfig(
level=logging.INFO,
format="%(asctime)s[%(levelname)-5s] %(message)s",
datefmt="%Y-%m-%d %H:%M:%S",
)
log = logging.getLogger(__file__)
async def reserve(client: Serverless, endpoint_name: str, hold: float, cost: int, label: str):
endpoint = await client.get_endpoint(name=endpoint_name)
async with await endpoint.session(cost=cost, lifetime=hold + 60) as s:
sid = s.session_id
log.info("[%s] %s open, holding %.0fs", label, sid, hold)
await asyncio.sleep(hold)
log.info("[%s] %s closed", label, sid)
async def main_async():
p = argparse.ArgumentParser(description="Vast Null PyWorker demo client")
p.add_argument("--endpoint", default=os.environ.get("VAST_ENDPOINT", "null-prod"))
p.add_argument("--instance", choices=("prod", "alpha", "candidate", "local"),
default=os.environ.get("VAST_INSTANCE", "prod"))
p.add_argument("--count", type=int, default=1,
help="concurrent sessions to open (default: 1)")
p.add_argument("--interval", type=float, default=30.0,
help="seconds between session starts when count>1 (default: 30)")
p.add_argument("--hold", type=float, default=180.0,
help="seconds to hold each session (default: 180)")
p.add_argument("--cost", type=int, default=100,
help="cost reported at session-create (default: 100)")
args = p.parse_args()
print(f"endpoint={args.endpoint} instance={args.instance} "
f"count={args.count} hold={args.hold}s cost={args.cost}")
try:
async with Serverless(instance=args.instance) as client:
tasks = []
for i in range(args.count):
label = f"res-{i+1}" if args.count > 1 else "reservation"
tasks.append(asyncio.create_task(
reserve(client, args.endpoint, args.hold, args.cost, label),
name=label,
))
if i + 1 < args.count:
await asyncio.sleep(args.interval)
await asyncio.gather(*tasks, return_exceptions=True)
except KeyboardInterrupt:
log.info("Interrupted")
except Exception as e:
log.error("Error: %s", e, exc_info=True)
sys.exit(1)
if __name__ == "__main__":
asyncio.run(main_async())
-143
View File
@@ -1,143 +0,0 @@
import asyncio
import logging
import os
from contextlib import asynccontextmanager
from urllib.parse import urlsplit
from aiohttp import web
from vastai import (
Worker,
WorkerConfig,
HandlerConfig,
BenchmarkConfig,
LogActionConfig,
)
log = logging.getLogger(__file__)
TARGET_PERF = 100.0
BENCHMARK_SENTINEL = "__null_worker_benchmark__"
INTERNAL_HOST = "127.0.0.1"
INTERNAL_PORT = int(os.environ.get("NULL_CONTROL_PORT", 18999))
STUB_HEALTH_PATH = "/health"
BACKEND_HEALTH_URL = os.environ.get("BACKEND_HEALTH_URL", "").strip()
if BACKEND_HEALTH_URL:
_p = urlsplit(BACKEND_HEALTH_URL)
if not _p.scheme or not _p.hostname:
raise ValueError(f"BACKEND_HEALTH_URL must be absolute, got: {BACKEND_HEALTH_URL!r}")
HEALTH_BASE_URL = f"{_p.scheme}://{_p.hostname}"
HEALTH_PORT = _p.port or (443 if _p.scheme == "https" else 80)
HEALTH_PATH = _p.path or "/"
USE_STUB_HEALTH = False
else:
HEALTH_BASE_URL = f"http://{INTERNAL_HOST}"
HEALTH_PORT = INTERNAL_PORT
HEALTH_PATH = STUB_HEALTH_PATH
USE_STUB_HEALTH = True
_backend_ref: dict = {"backend": None}
def _build_internal_app() -> web.Application:
app = web.Application()
async def release_handler(_request: web.Request) -> web.Response:
# Closes the singleton session. Uses name-mangled __close_session
# to bypass the session_auth check — safe because this server is
# bound to 127.0.0.1, and it spares the consumer from threading
# session_auth through its queue.
backend = _backend_ref.get("backend")
if backend is None:
return web.json_response({"released": False, "reason": "backend not ready"}, status=503)
sids = list(backend.sessions.keys())
if not sids:
return web.json_response({"released": False, "reason": "no active session"}, status=200)
closed = []
for sid in sids:
try:
if await backend._Backend__close_session(sid):
closed.append(sid)
except Exception as e:
log.warning(f"Error closing session {sid}: {e}")
return web.json_response({"released": bool(closed), "session_ids": closed}, status=200)
app.router.add_post("/release", release_handler)
if USE_STUB_HEALTH:
async def stub_health(_request: web.Request) -> web.Response:
return web.Response(status=200, text="ok")
app.router.add_get(STUB_HEALTH_PATH, stub_health)
return app
@asynccontextmanager
async def null_lifecycle():
# Pin max_throughput to TARGET_PERF exactly — the framework's
# __run_benchmark short-circuits to float(file_contents) if this exists.
try:
with open(".has_benchmark", "w") as fh:
fh.write(str(int(TARGET_PERF)))
except OSError as e:
log.warning(f"Could not pin benchmark cache: {e}")
runner = web.AppRunner(_build_internal_app())
await runner.setup()
await web.TCPSite(runner, INTERNAL_HOST, INTERNAL_PORT).start()
log.info(
"Null pyworker control server: http://%s:%d (POST /release%s)",
INTERNAL_HOST,
INTERNAL_PORT,
f", GET {STUB_HEALTH_PATH}" if USE_STUB_HEALTH else "",
)
if not USE_STUB_HEALTH:
log.info("Framework healthcheck → %s", BACKEND_HEALTH_URL)
try:
yield
finally:
await runner.cleanup()
async def ping(**params: object) -> dict:
# Exists only to satisfy the framework's "at least one handler with a
# BenchmarkConfig" requirement. Sleep 1s on the benchmark path as a
# fallback in case the .has_benchmark cache pin failed; otherwise the
# benchmark cache short-circuits and this never runs.
if params.get(BENCHMARK_SENTINEL):
await asyncio.sleep(1.0)
return {"ok": True, "benchmark": True}
return {"ok": True}
worker_config = WorkerConfig(
model_server_url=HEALTH_BASE_URL,
model_server_port=HEALTH_PORT,
model_healthcheck_url=HEALTH_PATH,
lifecycle=null_lifecycle(),
max_sessions=1,
handlers=[
HandlerConfig(
route="/ping",
allow_parallel_requests=True,
remote_function=ping,
workload_calculator=lambda _payload: TARGET_PERF,
benchmark_config=BenchmarkConfig(
generator=lambda: {BENCHMARK_SENTINEL: True},
runs=1,
concurrency=1,
do_warmup=False,
),
),
],
log_action_config=LogActionConfig(),
)
_worker = Worker(worker_config)
_backend_ref["backend"] = _worker.backend
_worker.run()
+1 -1
View File
@@ -35,7 +35,7 @@ def benchmark_generator() -> dict:
benchmark_data = { benchmark_data = {
"inputs": prompt, "inputs": prompt,
"parameters": { "parameters": {
"max_new_tokens": 500, "max_new_tokens": 128,
"temperature": 0.7, "temperature": 0.7,
"return_full_text": False "return_full_text": False
} }