Compare commits

..

4 Commits

Author SHA1 Message Date
Lucas Armand 4e951f4912 test vastai_sdk test package 2026-04-08 13:38:22 -07:00
Lucas Armand f636012685 add test index 2026-04-08 13:18:46 -07:00
Lucas Armand ddb986d561 use test package 2026-04-08 13:12:27 -07:00
Lucas Armand 99a3319e66 Point to vast-cli 2026-04-08 12:30:20 -07:00
9 changed files with 19 additions and 315 deletions
+1 -1
View File
@@ -9,7 +9,7 @@ This repository contains **example PyWorkers** used by Vast.ais default Serve
- Optionally supports FIFO queueing when the backend cannot process concurrent requests
- Detects readiness/failure from model logs and runs a benchmark to estimate throughput
> Important: The **core PyWorker framework** (Worker, WorkerConfig, HandlerConfig, BenchmarkConfig, LogActionConfig) is provided by the **`vastai` / `vastai-sdk`** Python package (https://github.com/vast-ai/vast-sdk). This repo focuses on *worker implementations and examples*, not the framework internals.
> Important: The **core PyWorker framework** (Worker, WorkerConfig, HandlerConfig, BenchmarkConfig, LogActionConfig) is provided by the **`vastai`** Python package (https://github.com/vast-ai/vast-cli). This repo focuses on *worker implementations and examples*, not the framework internals.
## Repository Purpose
+4 -4
View File
@@ -1,16 +1,16 @@
# Where did the PyWorker code go?
We have moved the PyWorker source code into the `vastai-sdk` Python SDK.
We have moved the PyWorker source code into the `vastai` Python package.
You can install it with
```
pip install vastai-sdk
pip install vastai
```
All of the source code can be found here:
https://github.com/vast-ai/vast-sdk
https://github.com/vast-ai/vast-cli
And can be imported from vastai.serverless.server.lib
Serverless instances automatically run the start_server.sh script, which installs the vastai-sdk.
Serverless instances automatically run the start_server.sh script, which installs the vastai package.
This is how the PyWorker source code makes it onto your serverless instances.
You provide a worker.py file in your PYWORKER_REPO, and the start_server.sh will
create and run a PyWorker according to your configuration defined in the file.
+1 -2
View File
@@ -1,2 +1 @@
vastai-sdk>=0.3.0
nltk==3.9.4
vastai>=0.3.0
+12 -12
View File
@@ -53,7 +53,7 @@ JSON
exit 1
}
function install_vastai_sdk() {
function install_vastai() {
local uv_flags=()
if [ "${USE_SYSTEM_PYTHON:-}" = "true" ]; then
uv_flags+=(--system --break-system-packages)
@@ -77,8 +77,8 @@ function install_vastai_sdk() {
if [ -n "${SDK_VERSION:-}" ]; then
echo "Installing vastai version ${SDK_VERSION}"
if ! uv pip install "${uv_flags[@]}" "vastai==${SDK_VERSION}"; then
report_error_and_exit "Failed to install vastai==${SDK_VERSION}"
if ! uv pip install --index-url https://test.pypi.org/simple/ --extra-index-url https://pypi.org/simple/ "${uv_flags[@]}" "vastai-sdk-vast==${SDK_VERSION}"; then
report_error_and_exit "Failed to install vastai-vast==${SDK_VERSION}"
fi
return 0
fi
@@ -140,7 +140,7 @@ if [ "${USE_SYSTEM_PYTHON:-}" = "true" ]; then
fi
fi
fi
install_vastai_sdk
install_vastai
touch ~/.no_auto_tmux
elif [ ! -d "$ENV_PATH" ]; then
echo "setting up venv"
@@ -197,7 +197,7 @@ elif [ ! -d "$ENV_PATH" ]; then
report_error_and_exit "Failed to install Python requirements"
fi
install_vastai_sdk
install_vastai
if ! touch ~/.no_auto_tmux; then
report_error_and_exit "Failed to create ~/.no_auto_tmux"
@@ -237,7 +237,7 @@ else
fi
fi
install_vastai_sdk
install_vastai
fi
fi
@@ -318,8 +318,8 @@ if [ "$IS_DEPLOYMENT" = "true" ]; then
# The s3_key exists in the DB as soon as the deployment is created, but the
# actual upload may still be in flight from the client side.
# Install SDK (uses the install_vastai_sdk function which supports SDK_BRANCH/SDK_VERSION)
install_vastai_sdk
# Install SDK (uses the install_vastai function which supports SDK_BRANCH/SDK_VERSION)
install_vastai
# Run deployment in serve mode
export VAST_DEPLOYMENT_MODE=serve
echo "Starting deployment: python3 $DEPLOY_DIR/deployment.py"
@@ -339,19 +339,19 @@ set +e
PY_STATUS=1
if [ -f "$SERVER_DIR/worker.py" ]; then
echo "Running worker.py"
echo "trying worker.py"
python3 -m "worker" |& tee -a "$PYWORKER_LOG"
PY_STATUS=${PIPESTATUS[0]}
fi
if [ "${PY_STATUS}" -ne 0 ] && [ -f "$SERVER_DIR/workers/$BACKEND/worker.py" ]; then
echo "Running workers.${BACKEND}.worker"
echo "trying workers.${BACKEND}.worker"
python3 -m "workers.${BACKEND}.worker" |& tee -a "$PYWORKER_LOG"
PY_STATUS=${PIPESTATUS[0]}
fi
if [ "${PY_STATUS}" -ne 0 ] && [ -f "$SERVER_DIR/workers/$BACKEND/server.py" ]; then
echo "Running workers.${BACKEND}.server"
echo "trying workers.${BACKEND}.server"
python3 -m "workers.${BACKEND}.server" |& tee -a "$PYWORKER_LOG"
PY_STATUS=${PIPESTATUS[0]}
fi
@@ -365,4 +365,4 @@ if [ "${PY_STATUS}" -ne 0 ]; then
report_error_and_exit "PyWorker exited with status ${PY_STATUS}"
fi
echo "PyWorker bootstrap complete"
echo "launching PyWorker server done"
-88
View File
@@ -1,88 +0,0 @@
# Null PyWorker
Holds Vast Serverless reservations open without forwarding any work to a
model. Use it when your real workload (a queue consumer in any language)
runs as a separate process on the instance and you just want to drive
Vast autoscaling: **one POST reserves a worker, one POST releases it.**
## Use case
You have a job queue on your own infrastructure (Redis, SQS, NATS, etc.)
and a consumer (node, golang, python, a binary — anything) that pulls
from it. You want one Vast worker per unit of in-flight work, scaling
elastically from zero. The null PyWorker is the autoscaling driver; your
consumer does the work.
## How it works
Reservations use the framework's session API. The SDK's
`endpoint.session(...)` POSTs `/session/create` to reserve a worker;
`session.close()` POSTs `/session/end` to release it. `max_sessions=1`
means each worker holds exactly one reservation — the next reservation
either lands on a free worker or triggers a scale-up.
The PyWorker itself does nothing functional:
- One trivial `/ping` route to satisfy the framework's benchmark
requirement (its `max_perf` is pinned to 100).
- An internal `/release` endpoint on `127.0.0.1:18999` for the local
consumer to end the session without needing `session_auth`.
## Endpoint parameters
Tested working configuration:
| Parameter | Value | Why |
|---|---|---|
| `target_util` | `1.0` | One session = one worker. Default `0.9` rounds up to an extra worker. |
| `min_load` | `0` | Scale-to-zero floor. |
| `max_queue_time` | `1` | Stop routing to an occupied worker after ~1s of implied queue. |
| `target_queue_time` | `0.5` | Trigger scale-up promptly once anything queues. |
| `inactivity_timeout` | `10` (seconds) | Permit scale-to-zero after 10s idle. |
## API
| Route | Where | Use |
|---|---|---|
| `POST /session/create` | endpoint, signed | Reserve a worker (`endpoint.session(...)`) |
| `POST /session/end` | endpoint, signed | Release (`session.close()`) |
| `POST /release` | `127.0.0.1:18999`, no auth | Local consumer release, no `session_auth` needed |
## Healthcheck
Default: stub on `127.0.0.1:18999/health` returning `200`. Set
`BACKEND_HEALTH_URL=http://127.0.0.1:9090/health` (absolute URL) to point
the framework at your queue consumer's health endpoint instead — if the
consumer dies, the autoscaler sees the worker as broken.
## Deploying
1. Point `PYWORKER_REPO` at this repo (or your fork).
2. Set `BACKEND=null` in the template.
3. Run your queue consumer alongside the PyWorker. When it's done with
a unit of work:
```bash
curl -X POST http://127.0.0.1:18999/release
```
## Client demo
```bash
# Single reservation, hold 180s
python -m workers.null.client --endpoint <NAME> --instance alpha
# Three concurrent reservations, started 30s apart, each held 360s
python -m workers.null.client --endpoint <NAME> --instance alpha --count 3 --hold 360
```
Flags: `--count` (number of concurrent sessions, default 1), `--hold`
(seconds each session is held, default 180), `--interval` (seconds
between starts when `--count > 1`, default 30), `--cost` (cost reported
at session-create, default 100 = `max_perf`), `--instance` (`prod` |
`alpha` | `candidate` | `local`).
## Environment variables
- `BACKEND_HEALTH_URL` — absolute URL the framework healthchecks. Stub
is used when unset.
- `NULL_CONTROL_PORT` — internal control server port. Defaults to `18999`.
View File
-64
View File
@@ -1,64 +0,0 @@
import argparse
import asyncio
import logging
import os
import sys
from vastai import Serverless
logging.basicConfig(
level=logging.INFO,
format="%(asctime)s[%(levelname)-5s] %(message)s",
datefmt="%Y-%m-%d %H:%M:%S",
)
log = logging.getLogger(__file__)
async def reserve(client: Serverless, endpoint_name: str, hold: float, cost: int, label: str):
endpoint = await client.get_endpoint(name=endpoint_name)
async with await endpoint.session(cost=cost, lifetime=hold + 60) as s:
sid = s.session_id
log.info("[%s] %s open, holding %.0fs", label, sid, hold)
await asyncio.sleep(hold)
log.info("[%s] %s closed", label, sid)
async def main_async():
p = argparse.ArgumentParser(description="Vast Null PyWorker demo client")
p.add_argument("--endpoint", default=os.environ.get("VAST_ENDPOINT", "null-prod"))
p.add_argument("--instance", choices=("prod", "alpha", "candidate", "local"),
default=os.environ.get("VAST_INSTANCE", "prod"))
p.add_argument("--count", type=int, default=1,
help="concurrent sessions to open (default: 1)")
p.add_argument("--interval", type=float, default=30.0,
help="seconds between session starts when count>1 (default: 30)")
p.add_argument("--hold", type=float, default=180.0,
help="seconds to hold each session (default: 180)")
p.add_argument("--cost", type=int, default=100,
help="cost reported at session-create (default: 100)")
args = p.parse_args()
print(f"endpoint={args.endpoint} instance={args.instance} "
f"count={args.count} hold={args.hold}s cost={args.cost}")
try:
async with Serverless(instance=args.instance) as client:
tasks = []
for i in range(args.count):
label = f"res-{i+1}" if args.count > 1 else "reservation"
tasks.append(asyncio.create_task(
reserve(client, args.endpoint, args.hold, args.cost, label),
name=label,
))
if i + 1 < args.count:
await asyncio.sleep(args.interval)
await asyncio.gather(*tasks, return_exceptions=True)
except KeyboardInterrupt:
log.info("Interrupted")
except Exception as e:
log.error("Error: %s", e, exc_info=True)
sys.exit(1)
if __name__ == "__main__":
asyncio.run(main_async())
-143
View File
@@ -1,143 +0,0 @@
import asyncio
import logging
import os
from contextlib import asynccontextmanager
from urllib.parse import urlsplit
from aiohttp import web
from vastai import (
Worker,
WorkerConfig,
HandlerConfig,
BenchmarkConfig,
LogActionConfig,
)
log = logging.getLogger(__file__)
TARGET_PERF = 100.0
BENCHMARK_SENTINEL = "__null_worker_benchmark__"
INTERNAL_HOST = "127.0.0.1"
INTERNAL_PORT = int(os.environ.get("NULL_CONTROL_PORT", 18999))
STUB_HEALTH_PATH = "/health"
BACKEND_HEALTH_URL = os.environ.get("BACKEND_HEALTH_URL", "").strip()
if BACKEND_HEALTH_URL:
_p = urlsplit(BACKEND_HEALTH_URL)
if not _p.scheme or not _p.hostname:
raise ValueError(f"BACKEND_HEALTH_URL must be absolute, got: {BACKEND_HEALTH_URL!r}")
HEALTH_BASE_URL = f"{_p.scheme}://{_p.hostname}"
HEALTH_PORT = _p.port or (443 if _p.scheme == "https" else 80)
HEALTH_PATH = _p.path or "/"
USE_STUB_HEALTH = False
else:
HEALTH_BASE_URL = f"http://{INTERNAL_HOST}"
HEALTH_PORT = INTERNAL_PORT
HEALTH_PATH = STUB_HEALTH_PATH
USE_STUB_HEALTH = True
_backend_ref: dict = {"backend": None}
def _build_internal_app() -> web.Application:
app = web.Application()
async def release_handler(_request: web.Request) -> web.Response:
# Closes the singleton session. Uses name-mangled __close_session
# to bypass the session_auth check — safe because this server is
# bound to 127.0.0.1, and it spares the consumer from threading
# session_auth through its queue.
backend = _backend_ref.get("backend")
if backend is None:
return web.json_response({"released": False, "reason": "backend not ready"}, status=503)
sids = list(backend.sessions.keys())
if not sids:
return web.json_response({"released": False, "reason": "no active session"}, status=200)
closed = []
for sid in sids:
try:
if await backend._Backend__close_session(sid):
closed.append(sid)
except Exception as e:
log.warning(f"Error closing session {sid}: {e}")
return web.json_response({"released": bool(closed), "session_ids": closed}, status=200)
app.router.add_post("/release", release_handler)
if USE_STUB_HEALTH:
async def stub_health(_request: web.Request) -> web.Response:
return web.Response(status=200, text="ok")
app.router.add_get(STUB_HEALTH_PATH, stub_health)
return app
@asynccontextmanager
async def null_lifecycle():
# Pin max_throughput to TARGET_PERF exactly — the framework's
# __run_benchmark short-circuits to float(file_contents) if this exists.
try:
with open(".has_benchmark", "w") as fh:
fh.write(str(int(TARGET_PERF)))
except OSError as e:
log.warning(f"Could not pin benchmark cache: {e}")
runner = web.AppRunner(_build_internal_app())
await runner.setup()
await web.TCPSite(runner, INTERNAL_HOST, INTERNAL_PORT).start()
log.info(
"Null pyworker control server: http://%s:%d (POST /release%s)",
INTERNAL_HOST,
INTERNAL_PORT,
f", GET {STUB_HEALTH_PATH}" if USE_STUB_HEALTH else "",
)
if not USE_STUB_HEALTH:
log.info("Framework healthcheck → %s", BACKEND_HEALTH_URL)
try:
yield
finally:
await runner.cleanup()
async def ping(**params: object) -> dict:
# Exists only to satisfy the framework's "at least one handler with a
# BenchmarkConfig" requirement. Sleep 1s on the benchmark path as a
# fallback in case the .has_benchmark cache pin failed; otherwise the
# benchmark cache short-circuits and this never runs.
if params.get(BENCHMARK_SENTINEL):
await asyncio.sleep(1.0)
return {"ok": True, "benchmark": True}
return {"ok": True}
worker_config = WorkerConfig(
model_server_url=HEALTH_BASE_URL,
model_server_port=HEALTH_PORT,
model_healthcheck_url=HEALTH_PATH,
lifecycle=null_lifecycle(),
max_sessions=1,
handlers=[
HandlerConfig(
route="/ping",
allow_parallel_requests=True,
remote_function=ping,
workload_calculator=lambda _payload: TARGET_PERF,
benchmark_config=BenchmarkConfig(
generator=lambda: {BENCHMARK_SENTINEL: True},
runs=1,
concurrency=1,
do_warmup=False,
),
),
],
log_action_config=LogActionConfig(),
)
_worker = Worker(worker_config)
_backend_ref["backend"] = _worker.backend
_worker.run()
+1 -1
View File
@@ -35,7 +35,7 @@ def benchmark_generator() -> dict:
benchmark_data = {
"inputs": prompt,
"parameters": {
"max_new_tokens": 500,
"max_new_tokens": 128,
"temperature": 0.7,
"return_full_text": False
}