Compare commits
2 Commits
| Author | SHA1 | Date | |
|---|---|---|---|
| c05131cd14 | |||
| ebaf3b6d3a |
+12
-1
@@ -1,2 +1,13 @@
|
|||||||
|
aiohttp==3.10.1
|
||||||
|
aiodns~=3.6.0
|
||||||
|
pycares~=4.11.0
|
||||||
|
anyio~=4.4
|
||||||
|
lib~=4.0
|
||||||
|
nltk~=3.9
|
||||||
|
psutil~=6.0
|
||||||
|
pycryptodome~=3.20
|
||||||
|
Requests~=2.32
|
||||||
|
transformers~=4.52
|
||||||
|
utils==1.0.*
|
||||||
|
hf_transfer>=0.1.9
|
||||||
vastai-sdk>=0.3.0
|
vastai-sdk>=0.3.0
|
||||||
nltk==3.9.4
|
|
||||||
+17
-161
@@ -2,17 +2,10 @@
|
|||||||
|
|
||||||
set -e -o pipefail
|
set -e -o pipefail
|
||||||
|
|
||||||
# Check for force update flag
|
|
||||||
FORCE_UPDATE=false
|
|
||||||
if [ -f "/.force_update" ]; then
|
|
||||||
echo "Force update flag detected at /.force_update"
|
|
||||||
FORCE_UPDATE=true
|
|
||||||
fi
|
|
||||||
|
|
||||||
WORKSPACE_DIR="${WORKSPACE_DIR:-/workspace}"
|
WORKSPACE_DIR="${WORKSPACE_DIR:-/workspace}"
|
||||||
|
|
||||||
SERVER_DIR="$WORKSPACE_DIR/vast-pyworker"
|
SERVER_DIR="$WORKSPACE_DIR/vast-pyworker"
|
||||||
ENV_PATH="${ENV_PATH:-$WORKSPACE_DIR/worker-env}"
|
ENV_PATH="$WORKSPACE_DIR/worker-env"
|
||||||
DEBUG_LOG="$WORKSPACE_DIR/debug.log"
|
DEBUG_LOG="$WORKSPACE_DIR/debug.log"
|
||||||
PYWORKER_LOG="$WORKSPACE_DIR/pyworker.log"
|
PYWORKER_LOG="$WORKSPACE_DIR/pyworker.log"
|
||||||
|
|
||||||
@@ -53,42 +46,6 @@ JSON
|
|||||||
exit 1
|
exit 1
|
||||||
}
|
}
|
||||||
|
|
||||||
function install_vastai_sdk() {
|
|
||||||
local uv_flags=()
|
|
||||||
if [ "${USE_SYSTEM_PYTHON:-}" = "true" ]; then
|
|
||||||
uv_flags+=(--system --break-system-packages)
|
|
||||||
fi
|
|
||||||
if [ "$FORCE_UPDATE" = true ]; then
|
|
||||||
uv_flags+=(--force-reinstall)
|
|
||||||
echo "Force reinstalling vastai"
|
|
||||||
fi
|
|
||||||
|
|
||||||
# If SDK_BRANCH is set, install vastai from the vast-cli repo at that branch/tag/commit.
|
|
||||||
if [ -n "${SDK_BRANCH:-}" ]; then
|
|
||||||
if [ -n "${SDK_VERSION:-}" ]; then
|
|
||||||
echo "WARNING: Both SDK_BRANCH and SDK_VERSION are set; using SDK_BRANCH=${SDK_BRANCH}"
|
|
||||||
fi
|
|
||||||
echo "Installing vastai from https://github.com/vast-ai/vast-cli/ @ ${SDK_BRANCH}"
|
|
||||||
if ! uv pip install "${uv_flags[@]}" "vastai @ git+https://github.com/vast-ai/vast-cli.git@${SDK_BRANCH}"; then
|
|
||||||
report_error_and_exit "Failed to install vastai from vast-ai/vast-cli@${SDK_BRANCH}"
|
|
||||||
fi
|
|
||||||
return 0
|
|
||||||
fi
|
|
||||||
|
|
||||||
if [ -n "${SDK_VERSION:-}" ]; then
|
|
||||||
echo "Installing vastai version ${SDK_VERSION}"
|
|
||||||
if ! uv pip install "${uv_flags[@]}" "vastai==${SDK_VERSION}"; then
|
|
||||||
report_error_and_exit "Failed to install vastai==${SDK_VERSION}"
|
|
||||||
fi
|
|
||||||
return 0
|
|
||||||
fi
|
|
||||||
|
|
||||||
echo "Installing default vastai"
|
|
||||||
if ! uv pip install "${uv_flags[@]}" vastai; then
|
|
||||||
report_error_and_exit "Failed to install vastai"
|
|
||||||
fi
|
|
||||||
}
|
|
||||||
|
|
||||||
[ -n "$BACKEND" ] && [ -z "$HF_TOKEN" ] && report_error_and_exit "HF_TOKEN must be set when BACKEND is set!"
|
[ -n "$BACKEND" ] && [ -z "$HF_TOKEN" ] && report_error_and_exit "HF_TOKEN must be set when BACKEND is set!"
|
||||||
[ -z "$CONTAINER_ID" ] && report_error_and_exit "CONTAINER_ID must be set!"
|
[ -z "$CONTAINER_ID" ] && report_error_and_exit "CONTAINER_ID must be set!"
|
||||||
[ "$BACKEND" = "comfyui" ] && [ -z "$COMFY_MODEL" ] && report_error_and_exit "For comfyui backends, COMFY_MODEL must be set!"
|
[ "$BACKEND" = "comfyui" ] && [ -z "$COMFY_MODEL" ] && report_error_and_exit "For comfyui backends, COMFY_MODEL must be set!"
|
||||||
@@ -106,8 +63,7 @@ echo_var DEBUG_LOG
|
|||||||
echo_var PYWORKER_LOG
|
echo_var PYWORKER_LOG
|
||||||
echo_var MODEL_LOG
|
echo_var MODEL_LOG
|
||||||
|
|
||||||
ROTATE_MODEL_LOG="${ROTATE_MODEL_LOG:-false}"
|
if [ -e "$MODEL_LOG" ]; then
|
||||||
if [ "$ROTATE_MODEL_LOG" = "true" ] && [ -e "$MODEL_LOG" ]; then
|
|
||||||
echo "Rotating model log at $MODEL_LOG to $MODEL_LOG.old"
|
echo "Rotating model log at $MODEL_LOG to $MODEL_LOG.old"
|
||||||
if ! cat "$MODEL_LOG" >> "$MODEL_LOG.old"; then
|
if ! cat "$MODEL_LOG" >> "$MODEL_LOG.old"; then
|
||||||
report_error_and_exit "Failed to rotate model log"
|
report_error_and_exit "Failed to rotate model log"
|
||||||
@@ -128,21 +84,8 @@ if ! grep -q "VAST" /etc/environment; then
|
|||||||
fi
|
fi
|
||||||
fi
|
fi
|
||||||
|
|
||||||
if [ "${USE_SYSTEM_PYTHON:-}" = "true" ]; then
|
if [ ! -d "$ENV_PATH" ]
|
||||||
echo "Using system Python: $(which python3)"
|
then
|
||||||
if ! which uv > /dev/null 2>&1; then
|
|
||||||
if ! curl -LsSf https://astral.sh/uv/install.sh | sh; then
|
|
||||||
report_error_and_exit "Failed to install uv package manager"
|
|
||||||
fi
|
|
||||||
if [[ -f ~/.local/bin/env ]]; then
|
|
||||||
if ! source ~/.local/bin/env; then
|
|
||||||
report_error_and_exit "Failed to source uv environment"
|
|
||||||
fi
|
|
||||||
fi
|
|
||||||
fi
|
|
||||||
install_vastai_sdk
|
|
||||||
touch ~/.no_auto_tmux
|
|
||||||
elif [ ! -d "$ENV_PATH" ]; then
|
|
||||||
echo "setting up venv"
|
echo "setting up venv"
|
||||||
if ! which uv; then
|
if ! which uv; then
|
||||||
if ! curl -LsSf https://astral.sh/uv/install.sh | sh; then
|
if ! curl -LsSf https://astral.sh/uv/install.sh | sh; then
|
||||||
@@ -161,27 +104,10 @@ elif [ ! -d "$ENV_PATH" ]; then
|
|||||||
if ! git clone "${PYWORKER_REPO:-https://github.com/vast-ai/pyworker}" "$SERVER_DIR"; then
|
if ! git clone "${PYWORKER_REPO:-https://github.com/vast-ai/pyworker}" "$SERVER_DIR"; then
|
||||||
report_error_and_exit "Failed to clone pyworker repository"
|
report_error_and_exit "Failed to clone pyworker repository"
|
||||||
fi
|
fi
|
||||||
elif [ "$FORCE_UPDATE" = true ]; then
|
|
||||||
echo "Force updating pyworker repository"
|
|
||||||
if ! (cd "$SERVER_DIR" && git fetch --all); then
|
|
||||||
report_error_and_exit "Failed to fetch pyworker repository updates"
|
|
||||||
fi
|
|
||||||
fi
|
fi
|
||||||
if [[ -n ${PYWORKER_REF:-} ]]; then
|
if [[ -n ${PYWORKER_REF:-} ]]; then
|
||||||
if [ "$FORCE_UPDATE" = true ]; then
|
if ! (cd "$SERVER_DIR" && git checkout "$PYWORKER_REF"); then
|
||||||
echo "Force updating to pyworker reference: $PYWORKER_REF"
|
report_error_and_exit "Failed to checkout pyworker reference: $PYWORKER_REF"
|
||||||
if ! (cd "$SERVER_DIR" && git checkout "$PYWORKER_REF" && git pull); then
|
|
||||||
report_error_and_exit "Failed to force update pyworker reference: $PYWORKER_REF"
|
|
||||||
fi
|
|
||||||
else
|
|
||||||
if ! (cd "$SERVER_DIR" && git checkout "$PYWORKER_REF"); then
|
|
||||||
report_error_and_exit "Failed to checkout pyworker reference: $PYWORKER_REF"
|
|
||||||
fi
|
|
||||||
fi
|
|
||||||
elif [ "$FORCE_UPDATE" = true ]; then
|
|
||||||
echo "Force updating pyworker to latest"
|
|
||||||
if ! (cd "$SERVER_DIR" && git pull); then
|
|
||||||
report_error_and_exit "Failed to pull latest pyworker changes"
|
|
||||||
fi
|
fi
|
||||||
fi
|
fi
|
||||||
|
|
||||||
@@ -197,8 +123,6 @@ elif [ ! -d "$ENV_PATH" ]; then
|
|||||||
report_error_and_exit "Failed to install Python requirements"
|
report_error_and_exit "Failed to install Python requirements"
|
||||||
fi
|
fi
|
||||||
|
|
||||||
install_vastai_sdk
|
|
||||||
|
|
||||||
if ! touch ~/.no_auto_tmux; then
|
if ! touch ~/.no_auto_tmux; then
|
||||||
report_error_and_exit "Failed to create ~/.no_auto_tmux"
|
report_error_and_exit "Failed to create ~/.no_auto_tmux"
|
||||||
fi
|
fi
|
||||||
@@ -208,44 +132,11 @@ else
|
|||||||
report_error_and_exit "Failed to source uv environment"
|
report_error_and_exit "Failed to source uv environment"
|
||||||
fi
|
fi
|
||||||
fi
|
fi
|
||||||
if ! source "$ENV_PATH/bin/activate"; then
|
if ! source "$WORKSPACE_DIR/worker-env/bin/activate"; then
|
||||||
report_error_and_exit "Failed to activate existing virtual environment"
|
report_error_and_exit "Failed to activate existing virtual environment"
|
||||||
fi
|
fi
|
||||||
echo "environment activated"
|
echo "environment activated"
|
||||||
echo "venv: $VIRTUAL_ENV"
|
echo "venv: $VIRTUAL_ENV"
|
||||||
|
|
||||||
# Handle force update for existing environment
|
|
||||||
if [ "$FORCE_UPDATE" = true ]; then
|
|
||||||
echo "Performing force update on existing environment"
|
|
||||||
|
|
||||||
if [[ -d $SERVER_DIR ]]; then
|
|
||||||
echo "Force updating pyworker repository"
|
|
||||||
if ! (cd "$SERVER_DIR" && git fetch --all); then
|
|
||||||
report_error_and_exit "Failed to fetch pyworker repository updates"
|
|
||||||
fi
|
|
||||||
|
|
||||||
if [[ -n ${PYWORKER_REF:-} ]]; then
|
|
||||||
echo "Force updating to pyworker reference: $PYWORKER_REF"
|
|
||||||
if ! (cd "$SERVER_DIR" && git checkout "$PYWORKER_REF" && git pull); then
|
|
||||||
report_error_and_exit "Failed to force update pyworker reference: $PYWORKER_REF"
|
|
||||||
fi
|
|
||||||
else
|
|
||||||
echo "Force updating pyworker to latest"
|
|
||||||
if ! (cd "$SERVER_DIR" && git pull); then
|
|
||||||
report_error_and_exit "Failed to pull latest pyworker changes"
|
|
||||||
fi
|
|
||||||
fi
|
|
||||||
fi
|
|
||||||
|
|
||||||
install_vastai_sdk
|
|
||||||
fi
|
|
||||||
fi
|
|
||||||
|
|
||||||
# Remove force update flag after successful update
|
|
||||||
if [ "$FORCE_UPDATE" = true ]; then
|
|
||||||
echo "Removing force update flag"
|
|
||||||
rm -f "/.force_update"
|
|
||||||
echo "Force update completed successfully"
|
|
||||||
fi
|
fi
|
||||||
|
|
||||||
if [ "$USE_SSL" = true ]; then
|
if [ "$USE_SSL" = true ]; then
|
||||||
@@ -283,51 +174,16 @@ EOF
|
|||||||
report_error_and_exit "Failed to generate SSL certificate request"
|
report_error_and_exit "Failed to generate SSL certificate request"
|
||||||
fi
|
fi
|
||||||
|
|
||||||
max_retries=5
|
if ! curl --header 'Content-Type: application/octet-stream' \
|
||||||
retry_delay=2
|
--data-binary @/etc/instance.csr \
|
||||||
for attempt in $(seq 1 "$max_retries"); do
|
-X \
|
||||||
http_code=$(curl -sS -o /etc/instance.crt -w '%{http_code}' \
|
POST "https://console.vast.ai/api/v0/sign_cert/?instance_id=$CONTAINER_ID" > /etc/instance.crt; then
|
||||||
--header 'Content-Type: application/octet-stream' \
|
report_error_and_exit "Failed to sign SSL certificate"
|
||||||
--data-binary @/etc/instance.csr \
|
fi
|
||||||
-X POST "https://console.vast.ai/api/v0/sign_cert/?instance_id=$CONTAINER_ID")
|
|
||||||
if [ "$http_code" -ge 200 ] && [ "$http_code" -lt 300 ]; then
|
|
||||||
break
|
|
||||||
fi
|
|
||||||
echo "SSL cert signing attempt $attempt/$max_retries failed (HTTP $http_code)"
|
|
||||||
if [ "$attempt" -eq "$max_retries" ]; then
|
|
||||||
report_error_and_exit "Failed to sign SSL certificate after $max_retries attempts (HTTP $http_code)"
|
|
||||||
fi
|
|
||||||
sleep "$retry_delay"
|
|
||||||
retry_delay=$((retry_delay * 2))
|
|
||||||
done
|
|
||||||
fi
|
fi
|
||||||
|
|
||||||
export REPORT_ADDR WORKER_PORT USE_SSL UNSECURED
|
export REPORT_ADDR WORKER_PORT USE_SSL UNSECURED
|
||||||
|
|
||||||
# ─── SDK Deployment Mode ───────────────────────────────────────────────
|
|
||||||
if [ "$IS_DEPLOYMENT" = "true" ]; then
|
|
||||||
echo "=== SDK Deployment Mode ==="
|
|
||||||
echo "DEPLOYMENT_ID: $DEPLOYMENT_ID"
|
|
||||||
|
|
||||||
DEPLOY_DIR="/workspace/deployment"
|
|
||||||
mkdir -p "$DEPLOY_DIR"
|
|
||||||
|
|
||||||
VAST_API_BASE="${VAST_API_BASE:-https://console.vast.ai}"
|
|
||||||
|
|
||||||
# Download deployment code, retrying until the blob is available on S3.
|
|
||||||
# The s3_key exists in the DB as soon as the deployment is created, but the
|
|
||||||
# actual upload may still be in flight from the client side.
|
|
||||||
|
|
||||||
# Install SDK (uses the install_vastai_sdk function which supports SDK_BRANCH/SDK_VERSION)
|
|
||||||
install_vastai_sdk
|
|
||||||
# Run deployment in serve mode
|
|
||||||
export VAST_DEPLOYMENT_MODE=serve
|
|
||||||
echo "Starting deployment: python3 $DEPLOY_DIR/deployment.py"
|
|
||||||
serve-vast-deployment
|
|
||||||
exit $?
|
|
||||||
fi
|
|
||||||
# ─── End SDK Deployment Mode ───────────────────────────────────────────
|
|
||||||
|
|
||||||
if ! cd "$SERVER_DIR"; then
|
if ! cd "$SERVER_DIR"; then
|
||||||
report_error_and_exit "Failed to cd into SERVER_DIR: $SERVER_DIR"
|
report_error_and_exit "Failed to cd into SERVER_DIR: $SERVER_DIR"
|
||||||
fi
|
fi
|
||||||
@@ -339,19 +195,19 @@ set +e
|
|||||||
PY_STATUS=1
|
PY_STATUS=1
|
||||||
|
|
||||||
if [ -f "$SERVER_DIR/worker.py" ]; then
|
if [ -f "$SERVER_DIR/worker.py" ]; then
|
||||||
echo "Running worker.py"
|
echo "trying worker.py"
|
||||||
python3 -m "worker" |& tee -a "$PYWORKER_LOG"
|
python3 -m "worker" |& tee -a "$PYWORKER_LOG"
|
||||||
PY_STATUS=${PIPESTATUS[0]}
|
PY_STATUS=${PIPESTATUS[0]}
|
||||||
fi
|
fi
|
||||||
|
|
||||||
if [ "${PY_STATUS}" -ne 0 ] && [ -f "$SERVER_DIR/workers/$BACKEND/worker.py" ]; then
|
if [ "${PY_STATUS}" -ne 0 ] && [ -f "$SERVER_DIR/workers/$BACKEND/worker.py" ]; then
|
||||||
echo "Running workers.${BACKEND}.worker"
|
echo "trying workers.${BACKEND}.worker"
|
||||||
python3 -m "workers.${BACKEND}.worker" |& tee -a "$PYWORKER_LOG"
|
python3 -m "workers.${BACKEND}.worker" |& tee -a "$PYWORKER_LOG"
|
||||||
PY_STATUS=${PIPESTATUS[0]}
|
PY_STATUS=${PIPESTATUS[0]}
|
||||||
fi
|
fi
|
||||||
|
|
||||||
if [ "${PY_STATUS}" -ne 0 ] && [ -f "$SERVER_DIR/workers/$BACKEND/server.py" ]; then
|
if [ "${PY_STATUS}" -ne 0 ] && [ -f "$SERVER_DIR/workers/$BACKEND/server.py" ]; then
|
||||||
echo "Running workers.${BACKEND}.server"
|
echo "trying workers.${BACKEND}.server"
|
||||||
python3 -m "workers.${BACKEND}.server" |& tee -a "$PYWORKER_LOG"
|
python3 -m "workers.${BACKEND}.server" |& tee -a "$PYWORKER_LOG"
|
||||||
PY_STATUS=${PIPESTATUS[0]}
|
PY_STATUS=${PIPESTATUS[0]}
|
||||||
fi
|
fi
|
||||||
@@ -365,4 +221,4 @@ if [ "${PY_STATUS}" -ne 0 ]; then
|
|||||||
report_error_and_exit "PyWorker exited with status ${PY_STATUS}"
|
report_error_and_exit "PyWorker exited with status ${PY_STATUS}"
|
||||||
fi
|
fi
|
||||||
|
|
||||||
echo "PyWorker bootstrap complete"
|
echo "launching PyWorker server done"
|
||||||
|
|||||||
@@ -1,88 +0,0 @@
|
|||||||
# Null PyWorker
|
|
||||||
|
|
||||||
Holds Vast Serverless reservations open without forwarding any work to a
|
|
||||||
model. Use it when your real workload (a queue consumer in any language)
|
|
||||||
runs as a separate process on the instance and you just want to drive
|
|
||||||
Vast autoscaling: **one POST reserves a worker, one POST releases it.**
|
|
||||||
|
|
||||||
## Use case
|
|
||||||
|
|
||||||
You have a job queue on your own infrastructure (Redis, SQS, NATS, etc.)
|
|
||||||
and a consumer (node, golang, python, a binary — anything) that pulls
|
|
||||||
from it. You want one Vast worker per unit of in-flight work, scaling
|
|
||||||
elastically from zero. The null PyWorker is the autoscaling driver; your
|
|
||||||
consumer does the work.
|
|
||||||
|
|
||||||
## How it works
|
|
||||||
|
|
||||||
Reservations use the framework's session API. The SDK's
|
|
||||||
`endpoint.session(...)` POSTs `/session/create` to reserve a worker;
|
|
||||||
`session.close()` POSTs `/session/end` to release it. `max_sessions=1`
|
|
||||||
means each worker holds exactly one reservation — the next reservation
|
|
||||||
either lands on a free worker or triggers a scale-up.
|
|
||||||
|
|
||||||
The PyWorker itself does nothing functional:
|
|
||||||
|
|
||||||
- One trivial `/ping` route to satisfy the framework's benchmark
|
|
||||||
requirement (its `max_perf` is pinned to 100).
|
|
||||||
- An internal `/release` endpoint on `127.0.0.1:18999` for the local
|
|
||||||
consumer to end the session without needing `session_auth`.
|
|
||||||
|
|
||||||
## Endpoint parameters
|
|
||||||
|
|
||||||
Tested working configuration:
|
|
||||||
|
|
||||||
| Parameter | Value | Why |
|
|
||||||
|---|---|---|
|
|
||||||
| `target_util` | `1.0` | One session = one worker. Default `0.9` rounds up to an extra worker. |
|
|
||||||
| `min_load` | `0` | Scale-to-zero floor. |
|
|
||||||
| `max_queue_time` | `1` | Stop routing to an occupied worker after ~1s of implied queue. |
|
|
||||||
| `target_queue_time` | `0.5` | Trigger scale-up promptly once anything queues. |
|
|
||||||
| `inactivity_timeout` | `10` (seconds) | Permit scale-to-zero after 10s idle. |
|
|
||||||
|
|
||||||
## API
|
|
||||||
|
|
||||||
| Route | Where | Use |
|
|
||||||
|---|---|---|
|
|
||||||
| `POST /session/create` | endpoint, signed | Reserve a worker (`endpoint.session(...)`) |
|
|
||||||
| `POST /session/end` | endpoint, signed | Release (`session.close()`) |
|
|
||||||
| `POST /release` | `127.0.0.1:18999`, no auth | Local consumer release, no `session_auth` needed |
|
|
||||||
|
|
||||||
## Healthcheck
|
|
||||||
|
|
||||||
Default: stub on `127.0.0.1:18999/health` returning `200`. Set
|
|
||||||
`BACKEND_HEALTH_URL=http://127.0.0.1:9090/health` (absolute URL) to point
|
|
||||||
the framework at your queue consumer's health endpoint instead — if the
|
|
||||||
consumer dies, the autoscaler sees the worker as broken.
|
|
||||||
|
|
||||||
## Deploying
|
|
||||||
|
|
||||||
1. Point `PYWORKER_REPO` at this repo (or your fork).
|
|
||||||
2. Set `BACKEND=null` in the template.
|
|
||||||
3. Run your queue consumer alongside the PyWorker. When it's done with
|
|
||||||
a unit of work:
|
|
||||||
```bash
|
|
||||||
curl -X POST http://127.0.0.1:18999/release
|
|
||||||
```
|
|
||||||
|
|
||||||
## Client demo
|
|
||||||
|
|
||||||
```bash
|
|
||||||
# Single reservation, hold 180s
|
|
||||||
python -m workers.null.client --endpoint <NAME> --instance alpha
|
|
||||||
|
|
||||||
# Three concurrent reservations, started 30s apart, each held 360s
|
|
||||||
python -m workers.null.client --endpoint <NAME> --instance alpha --count 3 --hold 360
|
|
||||||
```
|
|
||||||
|
|
||||||
Flags: `--count` (number of concurrent sessions, default 1), `--hold`
|
|
||||||
(seconds each session is held, default 180), `--interval` (seconds
|
|
||||||
between starts when `--count > 1`, default 30), `--cost` (cost reported
|
|
||||||
at session-create, default 100 = `max_perf`), `--instance` (`prod` |
|
|
||||||
`alpha` | `candidate` | `local`).
|
|
||||||
|
|
||||||
## Environment variables
|
|
||||||
|
|
||||||
- `BACKEND_HEALTH_URL` — absolute URL the framework healthchecks. Stub
|
|
||||||
is used when unset.
|
|
||||||
- `NULL_CONTROL_PORT` — internal control server port. Defaults to `18999`.
|
|
||||||
@@ -1,64 +0,0 @@
|
|||||||
import argparse
|
|
||||||
import asyncio
|
|
||||||
import logging
|
|
||||||
import os
|
|
||||||
import sys
|
|
||||||
|
|
||||||
from vastai import Serverless
|
|
||||||
|
|
||||||
logging.basicConfig(
|
|
||||||
level=logging.INFO,
|
|
||||||
format="%(asctime)s[%(levelname)-5s] %(message)s",
|
|
||||||
datefmt="%Y-%m-%d %H:%M:%S",
|
|
||||||
)
|
|
||||||
log = logging.getLogger(__file__)
|
|
||||||
|
|
||||||
|
|
||||||
async def reserve(client: Serverless, endpoint_name: str, hold: float, cost: int, label: str):
|
|
||||||
endpoint = await client.get_endpoint(name=endpoint_name)
|
|
||||||
async with await endpoint.session(cost=cost, lifetime=hold + 60) as s:
|
|
||||||
sid = s.session_id
|
|
||||||
log.info("[%s] %s open, holding %.0fs", label, sid, hold)
|
|
||||||
await asyncio.sleep(hold)
|
|
||||||
log.info("[%s] %s closed", label, sid)
|
|
||||||
|
|
||||||
|
|
||||||
async def main_async():
|
|
||||||
p = argparse.ArgumentParser(description="Vast Null PyWorker demo client")
|
|
||||||
p.add_argument("--endpoint", default=os.environ.get("VAST_ENDPOINT", "null-prod"))
|
|
||||||
p.add_argument("--instance", choices=("prod", "alpha", "candidate", "local"),
|
|
||||||
default=os.environ.get("VAST_INSTANCE", "prod"))
|
|
||||||
p.add_argument("--count", type=int, default=1,
|
|
||||||
help="concurrent sessions to open (default: 1)")
|
|
||||||
p.add_argument("--interval", type=float, default=30.0,
|
|
||||||
help="seconds between session starts when count>1 (default: 30)")
|
|
||||||
p.add_argument("--hold", type=float, default=180.0,
|
|
||||||
help="seconds to hold each session (default: 180)")
|
|
||||||
p.add_argument("--cost", type=int, default=100,
|
|
||||||
help="cost reported at session-create (default: 100)")
|
|
||||||
args = p.parse_args()
|
|
||||||
|
|
||||||
print(f"endpoint={args.endpoint} instance={args.instance} "
|
|
||||||
f"count={args.count} hold={args.hold}s cost={args.cost}")
|
|
||||||
|
|
||||||
try:
|
|
||||||
async with Serverless(instance=args.instance) as client:
|
|
||||||
tasks = []
|
|
||||||
for i in range(args.count):
|
|
||||||
label = f"res-{i+1}" if args.count > 1 else "reservation"
|
|
||||||
tasks.append(asyncio.create_task(
|
|
||||||
reserve(client, args.endpoint, args.hold, args.cost, label),
|
|
||||||
name=label,
|
|
||||||
))
|
|
||||||
if i + 1 < args.count:
|
|
||||||
await asyncio.sleep(args.interval)
|
|
||||||
await asyncio.gather(*tasks, return_exceptions=True)
|
|
||||||
except KeyboardInterrupt:
|
|
||||||
log.info("Interrupted")
|
|
||||||
except Exception as e:
|
|
||||||
log.error("Error: %s", e, exc_info=True)
|
|
||||||
sys.exit(1)
|
|
||||||
|
|
||||||
|
|
||||||
if __name__ == "__main__":
|
|
||||||
asyncio.run(main_async())
|
|
||||||
@@ -1,143 +0,0 @@
|
|||||||
import asyncio
|
|
||||||
import logging
|
|
||||||
import os
|
|
||||||
from contextlib import asynccontextmanager
|
|
||||||
from urllib.parse import urlsplit
|
|
||||||
|
|
||||||
from aiohttp import web
|
|
||||||
|
|
||||||
from vastai import (
|
|
||||||
Worker,
|
|
||||||
WorkerConfig,
|
|
||||||
HandlerConfig,
|
|
||||||
BenchmarkConfig,
|
|
||||||
LogActionConfig,
|
|
||||||
)
|
|
||||||
|
|
||||||
log = logging.getLogger(__file__)
|
|
||||||
|
|
||||||
TARGET_PERF = 100.0
|
|
||||||
BENCHMARK_SENTINEL = "__null_worker_benchmark__"
|
|
||||||
|
|
||||||
INTERNAL_HOST = "127.0.0.1"
|
|
||||||
INTERNAL_PORT = int(os.environ.get("NULL_CONTROL_PORT", 18999))
|
|
||||||
STUB_HEALTH_PATH = "/health"
|
|
||||||
|
|
||||||
BACKEND_HEALTH_URL = os.environ.get("BACKEND_HEALTH_URL", "").strip()
|
|
||||||
if BACKEND_HEALTH_URL:
|
|
||||||
_p = urlsplit(BACKEND_HEALTH_URL)
|
|
||||||
if not _p.scheme or not _p.hostname:
|
|
||||||
raise ValueError(f"BACKEND_HEALTH_URL must be absolute, got: {BACKEND_HEALTH_URL!r}")
|
|
||||||
HEALTH_BASE_URL = f"{_p.scheme}://{_p.hostname}"
|
|
||||||
HEALTH_PORT = _p.port or (443 if _p.scheme == "https" else 80)
|
|
||||||
HEALTH_PATH = _p.path or "/"
|
|
||||||
USE_STUB_HEALTH = False
|
|
||||||
else:
|
|
||||||
HEALTH_BASE_URL = f"http://{INTERNAL_HOST}"
|
|
||||||
HEALTH_PORT = INTERNAL_PORT
|
|
||||||
HEALTH_PATH = STUB_HEALTH_PATH
|
|
||||||
USE_STUB_HEALTH = True
|
|
||||||
|
|
||||||
|
|
||||||
_backend_ref: dict = {"backend": None}
|
|
||||||
|
|
||||||
|
|
||||||
def _build_internal_app() -> web.Application:
|
|
||||||
app = web.Application()
|
|
||||||
|
|
||||||
async def release_handler(_request: web.Request) -> web.Response:
|
|
||||||
# Closes the singleton session. Uses name-mangled __close_session
|
|
||||||
# to bypass the session_auth check — safe because this server is
|
|
||||||
# bound to 127.0.0.1, and it spares the consumer from threading
|
|
||||||
# session_auth through its queue.
|
|
||||||
backend = _backend_ref.get("backend")
|
|
||||||
if backend is None:
|
|
||||||
return web.json_response({"released": False, "reason": "backend not ready"}, status=503)
|
|
||||||
sids = list(backend.sessions.keys())
|
|
||||||
if not sids:
|
|
||||||
return web.json_response({"released": False, "reason": "no active session"}, status=200)
|
|
||||||
closed = []
|
|
||||||
for sid in sids:
|
|
||||||
try:
|
|
||||||
if await backend._Backend__close_session(sid):
|
|
||||||
closed.append(sid)
|
|
||||||
except Exception as e:
|
|
||||||
log.warning(f"Error closing session {sid}: {e}")
|
|
||||||
return web.json_response({"released": bool(closed), "session_ids": closed}, status=200)
|
|
||||||
|
|
||||||
app.router.add_post("/release", release_handler)
|
|
||||||
|
|
||||||
if USE_STUB_HEALTH:
|
|
||||||
async def stub_health(_request: web.Request) -> web.Response:
|
|
||||||
return web.Response(status=200, text="ok")
|
|
||||||
app.router.add_get(STUB_HEALTH_PATH, stub_health)
|
|
||||||
|
|
||||||
return app
|
|
||||||
|
|
||||||
|
|
||||||
@asynccontextmanager
|
|
||||||
async def null_lifecycle():
|
|
||||||
# Pin max_throughput to TARGET_PERF exactly — the framework's
|
|
||||||
# __run_benchmark short-circuits to float(file_contents) if this exists.
|
|
||||||
try:
|
|
||||||
with open(".has_benchmark", "w") as fh:
|
|
||||||
fh.write(str(int(TARGET_PERF)))
|
|
||||||
except OSError as e:
|
|
||||||
log.warning(f"Could not pin benchmark cache: {e}")
|
|
||||||
|
|
||||||
runner = web.AppRunner(_build_internal_app())
|
|
||||||
await runner.setup()
|
|
||||||
await web.TCPSite(runner, INTERNAL_HOST, INTERNAL_PORT).start()
|
|
||||||
|
|
||||||
log.info(
|
|
||||||
"Null pyworker control server: http://%s:%d (POST /release%s)",
|
|
||||||
INTERNAL_HOST,
|
|
||||||
INTERNAL_PORT,
|
|
||||||
f", GET {STUB_HEALTH_PATH}" if USE_STUB_HEALTH else "",
|
|
||||||
)
|
|
||||||
if not USE_STUB_HEALTH:
|
|
||||||
log.info("Framework healthcheck → %s", BACKEND_HEALTH_URL)
|
|
||||||
|
|
||||||
try:
|
|
||||||
yield
|
|
||||||
finally:
|
|
||||||
await runner.cleanup()
|
|
||||||
|
|
||||||
|
|
||||||
async def ping(**params: object) -> dict:
|
|
||||||
# Exists only to satisfy the framework's "at least one handler with a
|
|
||||||
# BenchmarkConfig" requirement. Sleep 1s on the benchmark path as a
|
|
||||||
# fallback in case the .has_benchmark cache pin failed; otherwise the
|
|
||||||
# benchmark cache short-circuits and this never runs.
|
|
||||||
if params.get(BENCHMARK_SENTINEL):
|
|
||||||
await asyncio.sleep(1.0)
|
|
||||||
return {"ok": True, "benchmark": True}
|
|
||||||
return {"ok": True}
|
|
||||||
|
|
||||||
|
|
||||||
worker_config = WorkerConfig(
|
|
||||||
model_server_url=HEALTH_BASE_URL,
|
|
||||||
model_server_port=HEALTH_PORT,
|
|
||||||
model_healthcheck_url=HEALTH_PATH,
|
|
||||||
lifecycle=null_lifecycle(),
|
|
||||||
max_sessions=1,
|
|
||||||
handlers=[
|
|
||||||
HandlerConfig(
|
|
||||||
route="/ping",
|
|
||||||
allow_parallel_requests=True,
|
|
||||||
remote_function=ping,
|
|
||||||
workload_calculator=lambda _payload: TARGET_PERF,
|
|
||||||
benchmark_config=BenchmarkConfig(
|
|
||||||
generator=lambda: {BENCHMARK_SENTINEL: True},
|
|
||||||
runs=1,
|
|
||||||
concurrency=1,
|
|
||||||
do_warmup=False,
|
|
||||||
),
|
|
||||||
),
|
|
||||||
],
|
|
||||||
log_action_config=LogActionConfig(),
|
|
||||||
)
|
|
||||||
|
|
||||||
_worker = Worker(worker_config)
|
|
||||||
_backend_ref["backend"] = _worker.backend
|
|
||||||
_worker.run()
|
|
||||||
@@ -60,20 +60,20 @@ worker_config = WorkerConfig(
|
|||||||
route="/v1/completions",
|
route="/v1/completions",
|
||||||
workload_calculator= lambda data: data.get("max_tokens", 0),
|
workload_calculator= lambda data: data.get("max_tokens", 0),
|
||||||
allow_parallel_requests=True,
|
allow_parallel_requests=True,
|
||||||
|
max_queue_time=60.0,
|
||||||
request_parser=request_parser,
|
request_parser=request_parser,
|
||||||
max_queue_time=600.0,
|
|
||||||
benchmark_config=BenchmarkConfig(
|
benchmark_config=BenchmarkConfig(
|
||||||
generator=completions_benchmark_generator,
|
generator=completions_benchmark_generator,
|
||||||
concurrency=10,
|
concurrency=100,
|
||||||
runs=3
|
runs=2
|
||||||
)
|
)
|
||||||
),
|
),
|
||||||
HandlerConfig(
|
HandlerConfig(
|
||||||
route="/v1/chat/completions",
|
route="/v1/chat/completions",
|
||||||
workload_calculator= lambda data: data.get("max_tokens", 0),
|
workload_calculator= lambda data: data.get("max_tokens", 0),
|
||||||
allow_parallel_requests=True,
|
allow_parallel_requests=True,
|
||||||
request_parser=request_parser,
|
max_queue_time=60.0,
|
||||||
max_queue_time=600.0,
|
request_parser=request_parser
|
||||||
)
|
)
|
||||||
],
|
],
|
||||||
log_action_config=LogActionConfig(
|
log_action_config=LogActionConfig(
|
||||||
|
|||||||
@@ -35,7 +35,7 @@ def benchmark_generator() -> dict:
|
|||||||
benchmark_data = {
|
benchmark_data = {
|
||||||
"inputs": prompt,
|
"inputs": prompt,
|
||||||
"parameters": {
|
"parameters": {
|
||||||
"max_new_tokens": 500,
|
"max_new_tokens": 128,
|
||||||
"temperature": 0.7,
|
"temperature": 0.7,
|
||||||
"return_full_text": False
|
"return_full_text": False
|
||||||
}
|
}
|
||||||
@@ -52,18 +52,17 @@ worker_config = WorkerConfig(
|
|||||||
HandlerConfig(
|
HandlerConfig(
|
||||||
route="/generate",
|
route="/generate",
|
||||||
allow_parallel_requests=True,
|
allow_parallel_requests=True,
|
||||||
max_queue_time=600.0,
|
max_queue_time=60.0,
|
||||||
benchmark_config=BenchmarkConfig(
|
benchmark_config=BenchmarkConfig(
|
||||||
generator=benchmark_generator,
|
generator=benchmark_generator,
|
||||||
concurrency=10,
|
concurrency=50
|
||||||
runs=3
|
|
||||||
),
|
),
|
||||||
workload_calculator= lambda x: x["parameters"]["max_new_tokens"]
|
workload_calculator= lambda x: x["parameters"]["max_new_tokens"]
|
||||||
),
|
),
|
||||||
HandlerConfig(
|
HandlerConfig(
|
||||||
route="/generate_stream",
|
route="/generate_stream",
|
||||||
allow_parallel_requests=True,
|
allow_parallel_requests=True,
|
||||||
max_queue_time=600.0,
|
max_queue_time=60.0,
|
||||||
workload_calculator= lambda x: x["parameters"]["max_new_tokens"]
|
workload_calculator= lambda x: x["parameters"]["max_new_tokens"]
|
||||||
)
|
)
|
||||||
],
|
],
|
||||||
|
|||||||
Reference in New Issue
Block a user