Compare commits
32 Commits
| Author | SHA1 | Date | |
|---|---|---|---|
| a81d3febe7 | |||
| 913e3a8782 | |||
| 47ad0ebe0a | |||
| 34fd21e76a | |||
| 1d2caaf554 | |||
| 01eff874d8 | |||
| d51f04a176 | |||
| ef248ef695 | |||
| 6a562a1376 | |||
| 6c2f194b28 | |||
| 2aada7b210 | |||
| 8df562e243 | |||
| 4eef5e22af | |||
| 9d969e376e | |||
| ef3f34a515 | |||
| 147bf2597a | |||
| dc423e2999 | |||
| 463f3de8ea | |||
| ed0db198c3 | |||
| 3668d948be | |||
| 254ccdf181 | |||
| 89761b378a | |||
| 18974873e5 | |||
| 9bc9ba11c5 | |||
| 48fdc65e3d | |||
| 2cd97315cd | |||
| 83c31e25a9 | |||
| fbe1dca6fa | |||
| 4c3120dbc5 | |||
| d7d9b915f6 | |||
| 4660b337fb | |||
| 7506ecb6b5 |
@@ -1 +1,2 @@
|
|||||||
vastai-sdk>=0.3.0
|
vastai-sdk>=0.3.0
|
||||||
|
nltk==3.9.4
|
||||||
+117
-77
@@ -2,10 +2,17 @@
|
|||||||
|
|
||||||
set -e -o pipefail
|
set -e -o pipefail
|
||||||
|
|
||||||
|
# Check for force update flag
|
||||||
|
FORCE_UPDATE=false
|
||||||
|
if [ -f "/.force_update" ]; then
|
||||||
|
echo "Force update flag detected at /.force_update"
|
||||||
|
FORCE_UPDATE=true
|
||||||
|
fi
|
||||||
|
|
||||||
WORKSPACE_DIR="${WORKSPACE_DIR:-/workspace}"
|
WORKSPACE_DIR="${WORKSPACE_DIR:-/workspace}"
|
||||||
|
|
||||||
SERVER_DIR="$WORKSPACE_DIR/vast-pyworker"
|
SERVER_DIR="$WORKSPACE_DIR/vast-pyworker"
|
||||||
ENV_PATH="$WORKSPACE_DIR/worker-env"
|
ENV_PATH="${ENV_PATH:-$WORKSPACE_DIR/worker-env}"
|
||||||
DEBUG_LOG="$WORKSPACE_DIR/debug.log"
|
DEBUG_LOG="$WORKSPACE_DIR/debug.log"
|
||||||
PYWORKER_LOG="$WORKSPACE_DIR/pyworker.log"
|
PYWORKER_LOG="$WORKSPACE_DIR/pyworker.log"
|
||||||
|
|
||||||
@@ -47,29 +54,38 @@ JSON
|
|||||||
}
|
}
|
||||||
|
|
||||||
function install_vastai_sdk() {
|
function install_vastai_sdk() {
|
||||||
# If SDK_BRANCH is set, install vastai-sdk from the vast-sdk repo at that branch/tag/commit.
|
local uv_flags=()
|
||||||
|
if [ "${USE_SYSTEM_PYTHON:-}" = "true" ]; then
|
||||||
|
uv_flags+=(--system --break-system-packages)
|
||||||
|
fi
|
||||||
|
if [ "$FORCE_UPDATE" = true ]; then
|
||||||
|
uv_flags+=(--force-reinstall)
|
||||||
|
echo "Force reinstalling vastai"
|
||||||
|
fi
|
||||||
|
|
||||||
|
# If SDK_BRANCH is set, install vastai from the vast-cli repo at that branch/tag/commit.
|
||||||
if [ -n "${SDK_BRANCH:-}" ]; then
|
if [ -n "${SDK_BRANCH:-}" ]; then
|
||||||
if [ -n "${SDK_VERSION:-}" ]; then
|
if [ -n "${SDK_VERSION:-}" ]; then
|
||||||
echo "WARNING: Both SDK_BRANCH and SDK_VERSION are set; using SDK_BRANCH=${SDK_BRANCH}"
|
echo "WARNING: Both SDK_BRANCH and SDK_VERSION are set; using SDK_BRANCH=${SDK_BRANCH}"
|
||||||
fi
|
fi
|
||||||
echo "Installing vastai-sdk from https://github.com/vast-ai/vast-sdk/ @ ${SDK_BRANCH}"
|
echo "Installing vastai from https://github.com/vast-ai/vast-cli/ @ ${SDK_BRANCH}"
|
||||||
if ! uv pip install "vastai-sdk @ git+https://github.com/vast-ai/vast-sdk.git@${SDK_BRANCH}"; then
|
if ! uv pip install "${uv_flags[@]}" "vastai @ git+https://github.com/vast-ai/vast-cli.git@${SDK_BRANCH}"; then
|
||||||
report_error_and_exit "Failed to install vastai-sdk from vast-ai/vast-sdk@${SDK_BRANCH}"
|
report_error_and_exit "Failed to install vastai from vast-ai/vast-cli@${SDK_BRANCH}"
|
||||||
fi
|
fi
|
||||||
return 0
|
return 0
|
||||||
fi
|
fi
|
||||||
|
|
||||||
if [ -n "${SDK_VERSION:-}" ]; then
|
if [ -n "${SDK_VERSION:-}" ]; then
|
||||||
echo "Installing vastai-sdk version ${SDK_VERSION}"
|
echo "Installing vastai version ${SDK_VERSION}"
|
||||||
if ! uv pip install "vastai-sdk==${SDK_VERSION}"; then
|
if ! uv pip install "${uv_flags[@]}" "vastai==${SDK_VERSION}"; then
|
||||||
report_error_and_exit "Failed to install vastai-sdk==${SDK_VERSION}"
|
report_error_and_exit "Failed to install vastai==${SDK_VERSION}"
|
||||||
fi
|
fi
|
||||||
return 0
|
return 0
|
||||||
fi
|
fi
|
||||||
|
|
||||||
echo "Installing default vastai-sdk"
|
echo "Installing default vastai"
|
||||||
if ! uv pip install vastai-sdk; then
|
if ! uv pip install "${uv_flags[@]}" vastai; then
|
||||||
report_error_and_exit "Failed to install vastai-sdk"
|
report_error_and_exit "Failed to install vastai"
|
||||||
fi
|
fi
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -112,8 +128,21 @@ if ! grep -q "VAST" /etc/environment; then
|
|||||||
fi
|
fi
|
||||||
fi
|
fi
|
||||||
|
|
||||||
if [ ! -d "$ENV_PATH" ]
|
if [ "${USE_SYSTEM_PYTHON:-}" = "true" ]; then
|
||||||
then
|
echo "Using system Python: $(which python3)"
|
||||||
|
if ! which uv > /dev/null 2>&1; then
|
||||||
|
if ! curl -LsSf https://astral.sh/uv/install.sh | sh; then
|
||||||
|
report_error_and_exit "Failed to install uv package manager"
|
||||||
|
fi
|
||||||
|
if [[ -f ~/.local/bin/env ]]; then
|
||||||
|
if ! source ~/.local/bin/env; then
|
||||||
|
report_error_and_exit "Failed to source uv environment"
|
||||||
|
fi
|
||||||
|
fi
|
||||||
|
fi
|
||||||
|
install_vastai_sdk
|
||||||
|
touch ~/.no_auto_tmux
|
||||||
|
elif [ ! -d "$ENV_PATH" ]; then
|
||||||
echo "setting up venv"
|
echo "setting up venv"
|
||||||
if ! which uv; then
|
if ! which uv; then
|
||||||
if ! curl -LsSf https://astral.sh/uv/install.sh | sh; then
|
if ! curl -LsSf https://astral.sh/uv/install.sh | sh; then
|
||||||
@@ -132,10 +161,27 @@ then
|
|||||||
if ! git clone "${PYWORKER_REPO:-https://github.com/vast-ai/pyworker}" "$SERVER_DIR"; then
|
if ! git clone "${PYWORKER_REPO:-https://github.com/vast-ai/pyworker}" "$SERVER_DIR"; then
|
||||||
report_error_and_exit "Failed to clone pyworker repository"
|
report_error_and_exit "Failed to clone pyworker repository"
|
||||||
fi
|
fi
|
||||||
|
elif [ "$FORCE_UPDATE" = true ]; then
|
||||||
|
echo "Force updating pyworker repository"
|
||||||
|
if ! (cd "$SERVER_DIR" && git fetch --all); then
|
||||||
|
report_error_and_exit "Failed to fetch pyworker repository updates"
|
||||||
|
fi
|
||||||
fi
|
fi
|
||||||
if [[ -n ${PYWORKER_REF:-} ]]; then
|
if [[ -n ${PYWORKER_REF:-} ]]; then
|
||||||
if ! (cd "$SERVER_DIR" && git checkout "$PYWORKER_REF"); then
|
if [ "$FORCE_UPDATE" = true ]; then
|
||||||
report_error_and_exit "Failed to checkout pyworker reference: $PYWORKER_REF"
|
echo "Force updating to pyworker reference: $PYWORKER_REF"
|
||||||
|
if ! (cd "$SERVER_DIR" && git checkout "$PYWORKER_REF" && git pull); then
|
||||||
|
report_error_and_exit "Failed to force update pyworker reference: $PYWORKER_REF"
|
||||||
|
fi
|
||||||
|
else
|
||||||
|
if ! (cd "$SERVER_DIR" && git checkout "$PYWORKER_REF"); then
|
||||||
|
report_error_and_exit "Failed to checkout pyworker reference: $PYWORKER_REF"
|
||||||
|
fi
|
||||||
|
fi
|
||||||
|
elif [ "$FORCE_UPDATE" = true ]; then
|
||||||
|
echo "Force updating pyworker to latest"
|
||||||
|
if ! (cd "$SERVER_DIR" && git pull); then
|
||||||
|
report_error_and_exit "Failed to pull latest pyworker changes"
|
||||||
fi
|
fi
|
||||||
fi
|
fi
|
||||||
|
|
||||||
@@ -162,11 +208,44 @@ else
|
|||||||
report_error_and_exit "Failed to source uv environment"
|
report_error_and_exit "Failed to source uv environment"
|
||||||
fi
|
fi
|
||||||
fi
|
fi
|
||||||
if ! source "$WORKSPACE_DIR/worker-env/bin/activate"; then
|
if ! source "$ENV_PATH/bin/activate"; then
|
||||||
report_error_and_exit "Failed to activate existing virtual environment"
|
report_error_and_exit "Failed to activate existing virtual environment"
|
||||||
fi
|
fi
|
||||||
echo "environment activated"
|
echo "environment activated"
|
||||||
echo "venv: $VIRTUAL_ENV"
|
echo "venv: $VIRTUAL_ENV"
|
||||||
|
|
||||||
|
# Handle force update for existing environment
|
||||||
|
if [ "$FORCE_UPDATE" = true ]; then
|
||||||
|
echo "Performing force update on existing environment"
|
||||||
|
|
||||||
|
if [[ -d $SERVER_DIR ]]; then
|
||||||
|
echo "Force updating pyworker repository"
|
||||||
|
if ! (cd "$SERVER_DIR" && git fetch --all); then
|
||||||
|
report_error_and_exit "Failed to fetch pyworker repository updates"
|
||||||
|
fi
|
||||||
|
|
||||||
|
if [[ -n ${PYWORKER_REF:-} ]]; then
|
||||||
|
echo "Force updating to pyworker reference: $PYWORKER_REF"
|
||||||
|
if ! (cd "$SERVER_DIR" && git checkout "$PYWORKER_REF" && git pull); then
|
||||||
|
report_error_and_exit "Failed to force update pyworker reference: $PYWORKER_REF"
|
||||||
|
fi
|
||||||
|
else
|
||||||
|
echo "Force updating pyworker to latest"
|
||||||
|
if ! (cd "$SERVER_DIR" && git pull); then
|
||||||
|
report_error_and_exit "Failed to pull latest pyworker changes"
|
||||||
|
fi
|
||||||
|
fi
|
||||||
|
fi
|
||||||
|
|
||||||
|
install_vastai_sdk
|
||||||
|
fi
|
||||||
|
fi
|
||||||
|
|
||||||
|
# Remove force update flag after successful update
|
||||||
|
if [ "$FORCE_UPDATE" = true ]; then
|
||||||
|
echo "Removing force update flag"
|
||||||
|
rm -f "/.force_update"
|
||||||
|
echo "Force update completed successfully"
|
||||||
fi
|
fi
|
||||||
|
|
||||||
if [ "$USE_SSL" = true ]; then
|
if [ "$USE_SSL" = true ]; then
|
||||||
@@ -204,12 +283,23 @@ EOF
|
|||||||
report_error_and_exit "Failed to generate SSL certificate request"
|
report_error_and_exit "Failed to generate SSL certificate request"
|
||||||
fi
|
fi
|
||||||
|
|
||||||
if ! curl --header 'Content-Type: application/octet-stream' \
|
max_retries=5
|
||||||
--data-binary @/etc/instance.csr \
|
retry_delay=2
|
||||||
-X \
|
for attempt in $(seq 1 "$max_retries"); do
|
||||||
POST "https://console.vast.ai/api/v0/sign_cert/?instance_id=$CONTAINER_ID" > /etc/instance.crt; then
|
http_code=$(curl -sS -o /etc/instance.crt -w '%{http_code}' \
|
||||||
report_error_and_exit "Failed to sign SSL certificate"
|
--header 'Content-Type: application/octet-stream' \
|
||||||
fi
|
--data-binary @/etc/instance.csr \
|
||||||
|
-X POST "https://console.vast.ai/api/v0/sign_cert/?instance_id=$CONTAINER_ID")
|
||||||
|
if [ "$http_code" -ge 200 ] && [ "$http_code" -lt 300 ]; then
|
||||||
|
break
|
||||||
|
fi
|
||||||
|
echo "SSL cert signing attempt $attempt/$max_retries failed (HTTP $http_code)"
|
||||||
|
if [ "$attempt" -eq "$max_retries" ]; then
|
||||||
|
report_error_and_exit "Failed to sign SSL certificate after $max_retries attempts (HTTP $http_code)"
|
||||||
|
fi
|
||||||
|
sleep "$retry_delay"
|
||||||
|
retry_delay=$((retry_delay * 2))
|
||||||
|
done
|
||||||
fi
|
fi
|
||||||
|
|
||||||
export REPORT_ADDR WORKER_PORT USE_SSL UNSECURED
|
export REPORT_ADDR WORKER_PORT USE_SSL UNSECURED
|
||||||
@@ -227,63 +317,13 @@ if [ "$IS_DEPLOYMENT" = "true" ]; then
|
|||||||
# Download deployment code, retrying until the blob is available on S3.
|
# Download deployment code, retrying until the blob is available on S3.
|
||||||
# The s3_key exists in the DB as soon as the deployment is created, but the
|
# The s3_key exists in the DB as soon as the deployment is created, but the
|
||||||
# actual upload may still be in flight from the client side.
|
# actual upload may still be in flight from the client side.
|
||||||
echo "Downloading deployment code..."
|
|
||||||
RETRY=0
|
|
||||||
while true; do
|
|
||||||
DOWNLOAD_RESPONSE=$(curl -sS \
|
|
||||||
-H "Authorization: Bearer $CONTAINER_API_KEY" \
|
|
||||||
"${VAST_API_BASE}/api/v0/deployment/${DEPLOYMENT_ID}/download_url/")
|
|
||||||
DOWNLOAD_URL=$(python3 -c "
|
|
||||||
import sys, json
|
|
||||||
try:
|
|
||||||
d = json.load(sys.stdin)
|
|
||||||
print(d.get('download_url') or '')
|
|
||||||
except: print('')
|
|
||||||
" <<< "$DOWNLOAD_RESPONSE")
|
|
||||||
|
|
||||||
if [ -z "$DOWNLOAD_URL" ] || [ "$DOWNLOAD_URL" = "None" ]; then
|
|
||||||
RETRY=$((RETRY + 1))
|
|
||||||
echo "No download URL yet (attempt $RETRY), retrying in 10s... response: $DOWNLOAD_RESPONSE"
|
|
||||||
sleep 10
|
|
||||||
continue
|
|
||||||
fi
|
|
||||||
|
|
||||||
# Got a URL — try the actual S3 download
|
|
||||||
HTTP_CODE=$(curl -sS -L -o "$DEPLOY_DIR/deployment.tar.gz" -w "%{http_code}" "$DOWNLOAD_URL")
|
|
||||||
if [ "$HTTP_CODE" = "200" ]; then
|
|
||||||
break
|
|
||||||
fi
|
|
||||||
|
|
||||||
RETRY=$((RETRY + 1))
|
|
||||||
echo "S3 download returned HTTP $HTTP_CODE (attempt $RETRY), blob not yet uploaded. Retrying in 10s..."
|
|
||||||
rm -f "$DEPLOY_DIR/deployment.tar.gz"
|
|
||||||
sleep 10
|
|
||||||
done
|
|
||||||
|
|
||||||
cd "$DEPLOY_DIR" && tar xzf deployment.tar.gz
|
|
||||||
echo "Deployment code extracted."
|
|
||||||
|
|
||||||
# Source secrets if present
|
|
||||||
if [ -f "$DEPLOY_DIR/.secrets" ]; then
|
|
||||||
echo "Sourcing secrets..."
|
|
||||||
source "$DEPLOY_DIR/.secrets"
|
|
||||||
fi
|
|
||||||
|
|
||||||
# Run on_start.sh to completion if present
|
|
||||||
if [ -f "$DEPLOY_DIR/on_start.sh" ]; then
|
|
||||||
echo "Running on_start.sh..."
|
|
||||||
chmod +x "$DEPLOY_DIR/on_start.sh"
|
|
||||||
bash "$DEPLOY_DIR/on_start.sh"
|
|
||||||
echo "on_start.sh completed."
|
|
||||||
fi
|
|
||||||
|
|
||||||
# Install SDK (uses the install_vastai_sdk function which supports SDK_BRANCH/SDK_VERSION)
|
# Install SDK (uses the install_vastai_sdk function which supports SDK_BRANCH/SDK_VERSION)
|
||||||
install_vastai_sdk
|
install_vastai_sdk
|
||||||
|
|
||||||
# Run deployment in serve mode
|
# Run deployment in serve mode
|
||||||
export VAST_DEPLOYMENT_MODE=serve
|
export VAST_DEPLOYMENT_MODE=serve
|
||||||
echo "Starting deployment: python3 $DEPLOY_DIR/deployment.py"
|
echo "Starting deployment: python3 $DEPLOY_DIR/deployment.py"
|
||||||
python3 "$DEPLOY_DIR/deployment.py"
|
serve-vast-deployment
|
||||||
exit $?
|
exit $?
|
||||||
fi
|
fi
|
||||||
# ─── End SDK Deployment Mode ───────────────────────────────────────────
|
# ─── End SDK Deployment Mode ───────────────────────────────────────────
|
||||||
@@ -299,19 +339,19 @@ set +e
|
|||||||
PY_STATUS=1
|
PY_STATUS=1
|
||||||
|
|
||||||
if [ -f "$SERVER_DIR/worker.py" ]; then
|
if [ -f "$SERVER_DIR/worker.py" ]; then
|
||||||
echo "trying worker.py"
|
echo "Running worker.py"
|
||||||
python3 -m "worker" |& tee -a "$PYWORKER_LOG"
|
python3 -m "worker" |& tee -a "$PYWORKER_LOG"
|
||||||
PY_STATUS=${PIPESTATUS[0]}
|
PY_STATUS=${PIPESTATUS[0]}
|
||||||
fi
|
fi
|
||||||
|
|
||||||
if [ "${PY_STATUS}" -ne 0 ] && [ -f "$SERVER_DIR/workers/$BACKEND/worker.py" ]; then
|
if [ "${PY_STATUS}" -ne 0 ] && [ -f "$SERVER_DIR/workers/$BACKEND/worker.py" ]; then
|
||||||
echo "trying workers.${BACKEND}.worker"
|
echo "Running workers.${BACKEND}.worker"
|
||||||
python3 -m "workers.${BACKEND}.worker" |& tee -a "$PYWORKER_LOG"
|
python3 -m "workers.${BACKEND}.worker" |& tee -a "$PYWORKER_LOG"
|
||||||
PY_STATUS=${PIPESTATUS[0]}
|
PY_STATUS=${PIPESTATUS[0]}
|
||||||
fi
|
fi
|
||||||
|
|
||||||
if [ "${PY_STATUS}" -ne 0 ] && [ -f "$SERVER_DIR/workers/$BACKEND/server.py" ]; then
|
if [ "${PY_STATUS}" -ne 0 ] && [ -f "$SERVER_DIR/workers/$BACKEND/server.py" ]; then
|
||||||
echo "trying workers.${BACKEND}.server"
|
echo "Running workers.${BACKEND}.server"
|
||||||
python3 -m "workers.${BACKEND}.server" |& tee -a "$PYWORKER_LOG"
|
python3 -m "workers.${BACKEND}.server" |& tee -a "$PYWORKER_LOG"
|
||||||
PY_STATUS=${PIPESTATUS[0]}
|
PY_STATUS=${PIPESTATUS[0]}
|
||||||
fi
|
fi
|
||||||
@@ -325,4 +365,4 @@ if [ "${PY_STATUS}" -ne 0 ]; then
|
|||||||
report_error_and_exit "PyWorker exited with status ${PY_STATUS}"
|
report_error_and_exit "PyWorker exited with status ${PY_STATUS}"
|
||||||
fi
|
fi
|
||||||
|
|
||||||
echo "launching PyWorker server done"
|
echo "PyWorker bootstrap complete"
|
||||||
|
|||||||
@@ -0,0 +1,88 @@
|
|||||||
|
# Null PyWorker
|
||||||
|
|
||||||
|
Holds Vast Serverless reservations open without forwarding any work to a
|
||||||
|
model. Use it when your real workload (a queue consumer in any language)
|
||||||
|
runs as a separate process on the instance and you just want to drive
|
||||||
|
Vast autoscaling: **one POST reserves a worker, one POST releases it.**
|
||||||
|
|
||||||
|
## Use case
|
||||||
|
|
||||||
|
You have a job queue on your own infrastructure (Redis, SQS, NATS, etc.)
|
||||||
|
and a consumer (node, golang, python, a binary — anything) that pulls
|
||||||
|
from it. You want one Vast worker per unit of in-flight work, scaling
|
||||||
|
elastically from zero. The null PyWorker is the autoscaling driver; your
|
||||||
|
consumer does the work.
|
||||||
|
|
||||||
|
## How it works
|
||||||
|
|
||||||
|
Reservations use the framework's session API. The SDK's
|
||||||
|
`endpoint.session(...)` POSTs `/session/create` to reserve a worker;
|
||||||
|
`session.close()` POSTs `/session/end` to release it. `max_sessions=1`
|
||||||
|
means each worker holds exactly one reservation — the next reservation
|
||||||
|
either lands on a free worker or triggers a scale-up.
|
||||||
|
|
||||||
|
The PyWorker itself does nothing functional:
|
||||||
|
|
||||||
|
- One trivial `/ping` route to satisfy the framework's benchmark
|
||||||
|
requirement (its `max_perf` is pinned to 100).
|
||||||
|
- An internal `/release` endpoint on `127.0.0.1:18999` for the local
|
||||||
|
consumer to end the session without needing `session_auth`.
|
||||||
|
|
||||||
|
## Endpoint parameters
|
||||||
|
|
||||||
|
Tested working configuration:
|
||||||
|
|
||||||
|
| Parameter | Value | Why |
|
||||||
|
|---|---|---|
|
||||||
|
| `target_util` | `1.0` | One session = one worker. Default `0.9` rounds up to an extra worker. |
|
||||||
|
| `min_load` | `0` | Scale-to-zero floor. |
|
||||||
|
| `max_queue_time` | `1` | Stop routing to an occupied worker after ~1s of implied queue. |
|
||||||
|
| `target_queue_time` | `0.5` | Trigger scale-up promptly once anything queues. |
|
||||||
|
| `inactivity_timeout` | `10` (seconds) | Permit scale-to-zero after 10s idle. |
|
||||||
|
|
||||||
|
## API
|
||||||
|
|
||||||
|
| Route | Where | Use |
|
||||||
|
|---|---|---|
|
||||||
|
| `POST /session/create` | endpoint, signed | Reserve a worker (`endpoint.session(...)`) |
|
||||||
|
| `POST /session/end` | endpoint, signed | Release (`session.close()`) |
|
||||||
|
| `POST /release` | `127.0.0.1:18999`, no auth | Local consumer release, no `session_auth` needed |
|
||||||
|
|
||||||
|
## Healthcheck
|
||||||
|
|
||||||
|
Default: stub on `127.0.0.1:18999/health` returning `200`. Set
|
||||||
|
`BACKEND_HEALTH_URL=http://127.0.0.1:9090/health` (absolute URL) to point
|
||||||
|
the framework at your queue consumer's health endpoint instead — if the
|
||||||
|
consumer dies, the autoscaler sees the worker as broken.
|
||||||
|
|
||||||
|
## Deploying
|
||||||
|
|
||||||
|
1. Point `PYWORKER_REPO` at this repo (or your fork).
|
||||||
|
2. Set `BACKEND=null` in the template.
|
||||||
|
3. Run your queue consumer alongside the PyWorker. When it's done with
|
||||||
|
a unit of work:
|
||||||
|
```bash
|
||||||
|
curl -X POST http://127.0.0.1:18999/release
|
||||||
|
```
|
||||||
|
|
||||||
|
## Client demo
|
||||||
|
|
||||||
|
```bash
|
||||||
|
# Single reservation, hold 180s
|
||||||
|
python -m workers.null.client --endpoint <NAME> --instance alpha
|
||||||
|
|
||||||
|
# Three concurrent reservations, started 30s apart, each held 360s
|
||||||
|
python -m workers.null.client --endpoint <NAME> --instance alpha --count 3 --hold 360
|
||||||
|
```
|
||||||
|
|
||||||
|
Flags: `--count` (number of concurrent sessions, default 1), `--hold`
|
||||||
|
(seconds each session is held, default 180), `--interval` (seconds
|
||||||
|
between starts when `--count > 1`, default 30), `--cost` (cost reported
|
||||||
|
at session-create, default 100 = `max_perf`), `--instance` (`prod` |
|
||||||
|
`alpha` | `candidate` | `local`).
|
||||||
|
|
||||||
|
## Environment variables
|
||||||
|
|
||||||
|
- `BACKEND_HEALTH_URL` — absolute URL the framework healthchecks. Stub
|
||||||
|
is used when unset.
|
||||||
|
- `NULL_CONTROL_PORT` — internal control server port. Defaults to `18999`.
|
||||||
@@ -0,0 +1,64 @@
|
|||||||
|
import argparse
|
||||||
|
import asyncio
|
||||||
|
import logging
|
||||||
|
import os
|
||||||
|
import sys
|
||||||
|
|
||||||
|
from vastai import Serverless
|
||||||
|
|
||||||
|
# Root logging for the demo client: INFO and above, each record prefixed
# with a second-resolution timestamp and a padded level name.
logging.basicConfig(
    datefmt="%Y-%m-%d %H:%M:%S",
    format="%(asctime)s[%(levelname)-5s] %(message)s",
    level=logging.INFO,
)

# Module logger; it is named after this file's path rather than the module name.
log = logging.getLogger(__file__)
|
||||||
|
|
||||||
|
|
||||||
|
async def reserve(client: Serverless, endpoint_name: str, hold: float, cost: int, label: str):
    """Open one session on an endpoint, hold it for ``hold`` seconds, then release it.

    Args:
        client: Connected ``Serverless`` client used to look up the endpoint.
        endpoint_name: Name of the endpoint to reserve a worker on.
        hold: Seconds to keep the session open before leaving the context.
        cost: Cost value passed through to session creation.
        label: Tag prepended to this reservation's log lines.
    """
    target = await client.get_endpoint(name=endpoint_name)

    # The requested lifetime is the hold time plus 60 seconds — presumably
    # headroom so the session does not expire before we release it ourselves.
    reservation = await target.session(cost=cost, lifetime=hold + 60)

    async with reservation as active:
        session_id = active.session_id
        log.info("[%s] %s open, holding %.0fs", label, session_id, hold)
        await asyncio.sleep(hold)

    # Leaving the ``async with`` block is what ends the session.
    log.info("[%s] %s closed", label, session_id)
|
||||||
|
|
||||||
|
|
||||||
|
def _parse_args(argv=None):
    """Parse the demo client's command-line options.

    Args:
        argv: Argument list to parse; ``None`` means ``sys.argv[1:]``.

    Returns:
        The populated ``argparse.Namespace``. ``--endpoint`` and
        ``--instance`` fall back to the ``VAST_ENDPOINT`` / ``VAST_INSTANCE``
        environment variables when not given on the command line.
    """
    p = argparse.ArgumentParser(description="Vast Null PyWorker demo client")
    p.add_argument("--endpoint", default=os.environ.get("VAST_ENDPOINT", "null-prod"))
    p.add_argument("--instance", choices=("prod", "alpha", "candidate", "local"),
                   default=os.environ.get("VAST_INSTANCE", "prod"))
    p.add_argument("--count", type=int, default=1,
                   help="concurrent sessions to open (default: 1)")
    p.add_argument("--interval", type=float, default=30.0,
                   help="seconds between session starts when count>1 (default: 30)")
    p.add_argument("--hold", type=float, default=180.0,
                   help="seconds to hold each session (default: 180)")
    p.add_argument("--cost", type=int, default=100,
                   help="cost reported at session-create (default: 100)")
    return p.parse_args(argv)


def _failures(labels, results):
    """Return ``(label, exception)`` pairs for every result that is an exception.

    ``asyncio.gather(..., return_exceptions=True)`` hands exceptions back as
    ordinary results, so they have to be picked out explicitly. Cancellation
    is a ``BaseException``, hence the broad ``isinstance`` check.
    """
    return [
        (label, outcome)
        for label, outcome in zip(labels, results)
        if isinstance(outcome, BaseException)
    ]


async def main_async():
    """Open ``--count`` reservations, staggered by ``--interval``, and wait for all.

    Exits the process with status 1 if the client itself fails or if any
    individual reservation raised; previously per-reservation failures were
    collected by ``gather`` and silently dropped.
    """
    args = _parse_args()

    print(f"endpoint={args.endpoint} instance={args.instance} "
          f"count={args.count} hold={args.hold}s cost={args.cost}")

    labels = []
    tasks = []
    try:
        async with Serverless(instance=args.instance) as client:
            for i in range(args.count):
                label = f"res-{i+1}" if args.count > 1 else "reservation"
                labels.append(label)
                tasks.append(asyncio.create_task(
                    reserve(client, args.endpoint, args.hold, args.cost, label),
                    name=label,
                ))
                # Stagger the starts, but do not wait after the last one.
                if i + 1 < args.count:
                    await asyncio.sleep(args.interval)
            # return_exceptions=True keeps one failed reservation from
            # tearing down the others; failures are reported below.
            results = await asyncio.gather(*tasks, return_exceptions=True)
    except KeyboardInterrupt:
        log.info("Interrupted")
        return
    except Exception as e:
        log.error("Error: %s", e, exc_info=True)
        sys.exit(1)

    failed = _failures(labels, results)
    for label, exc in failed:
        log.error("[%s] failed: %s", label, exc, exc_info=exc)
    if failed:
        sys.exit(1)


if __name__ == "__main__":
    try:
        asyncio.run(main_async())
    except KeyboardInterrupt:
        # Ctrl-C normally surfaces here, out of asyncio.run(), rather than
        # inside the coroutine; exit quietly with the conventional SIGINT status.
        log.info("Interrupted")
        sys.exit(130)
|
||||||
@@ -0,0 +1,143 @@
|
|||||||
|
import asyncio
|
||||||
|
import logging
|
||||||
|
import os
|
||||||
|
from contextlib import asynccontextmanager
|
||||||
|
from urllib.parse import urlsplit
|
||||||
|
|
||||||
|
from aiohttp import web
|
||||||
|
|
||||||
|
from vastai import (
|
||||||
|
Worker,
|
||||||
|
WorkerConfig,
|
||||||
|
HandlerConfig,
|
||||||
|
BenchmarkConfig,
|
||||||
|
LogActionConfig,
|
||||||
|
)
|
||||||
|
|
||||||
|
log = logging.getLogger(__file__)

# Value written to the benchmark cache file and returned by the /ping
# workload calculator.
TARGET_PERF = 100.0
# Payload key that marks a /ping call as coming from the benchmark generator.
BENCHMARK_SENTINEL = "__null_worker_benchmark__"

# Loopback-only control server (POST /release, optional stub health route).
INTERNAL_HOST = "127.0.0.1"
INTERNAL_PORT = int(os.environ.get("NULL_CONTROL_PORT", 18999))
STUB_HEALTH_PATH = "/health"


def _split_health_url(url):
    """Split an absolute health URL into ``(base_url, port, path)``.

    Args:
        url: Absolute URL such as ``http://127.0.0.1:9090/health``.

    Returns:
        ``(scheme://host, port, path)``. The port defaults to 443 for
        ``https`` and 80 otherwise; the path defaults to ``/`` and keeps any
        query string; IPv6 literal hosts are re-bracketed so the base URL
        stays valid.

    Raises:
        ValueError: If the URL has no scheme or host, or its port is malformed.
    """
    parts = urlsplit(url)
    if not parts.scheme or not parts.hostname:
        raise ValueError(f"BACKEND_HEALTH_URL must be absolute, got: {url!r}")

    try:
        # .port raises ValueError for a non-numeric or out-of-range port.
        port = parts.port or (443 if parts.scheme == "https" else 80)
    except ValueError as err:
        raise ValueError(f"BACKEND_HEALTH_URL has an invalid port: {url!r}") from err

    # .hostname strips the brackets from IPv6 literals; put them back.
    host = parts.hostname
    if ":" in host:
        host = f"[{host}]"

    # Keep the query string so the probe hits the URL exactly as configured.
    # NOTE(review): assumes the framework appends this path verbatim — confirm.
    path = parts.path or "/"
    if parts.query:
        path = f"{path}?{parts.query}"

    return f"{parts.scheme}://{host}", port, path


# When BACKEND_HEALTH_URL is set the healthcheck settings point at it;
# otherwise they point at the stub route on the internal control server.
BACKEND_HEALTH_URL = os.environ.get("BACKEND_HEALTH_URL", "").strip()
if BACKEND_HEALTH_URL:
    HEALTH_BASE_URL, HEALTH_PORT, HEALTH_PATH = _split_health_url(BACKEND_HEALTH_URL)
    USE_STUB_HEALTH = False
else:
    HEALTH_BASE_URL = f"http://{INTERNAL_HOST}"
    HEALTH_PORT = INTERNAL_PORT
    HEALTH_PATH = STUB_HEALTH_PATH
    USE_STUB_HEALTH = True

# Mutable holder so the /release handler can reach the backend object that
# is only assigned after the Worker has been constructed.
_backend_ref: dict = {"backend": None}
|
||||||
|
|
||||||
|
|
||||||
|
def _build_internal_app() -> web.Application:
    """Build the internal control application.

    Registers ``POST /release`` and, when the stub healthcheck is in use,
    ``GET`` on ``STUB_HEALTH_PATH``.
    """

    async def release_handler(_request: web.Request) -> web.Response:
        # Ends every session the backend currently tracks. The name-mangled
        # __close_session is called directly so no session_auth is required;
        # the original author considers this safe because the server is bound
        # to 127.0.0.1, and it saves the consumer from carrying session_auth
        # through its queue.
        backend = _backend_ref.get("backend")
        if backend is None:
            return web.json_response({"released": False, "reason": "backend not ready"}, status=503)

        # Snapshot the ids so closing sessions cannot disturb the iteration.
        pending = list(backend.sessions.keys())
        if not pending:
            return web.json_response({"released": False, "reason": "no active session"}, status=200)

        closed = []
        for sid in pending:
            try:
                was_closed = await backend._Backend__close_session(sid)
            except Exception as e:
                # Best effort: report the failure and move on to the next id.
                log.warning(f"Error closing session {sid}: {e}")
                continue
            if was_closed:
                closed.append(sid)

        return web.json_response({"released": bool(closed), "session_ids": closed}, status=200)

    async def stub_health(_request: web.Request) -> web.Response:
        # Always healthy; only registered when no external health URL is set.
        return web.Response(status=200, text="ok")

    app = web.Application()
    app.router.add_post("/release", release_handler)
    if USE_STUB_HEALTH:
        app.router.add_get(STUB_HEALTH_PATH, stub_health)
    return app
|
||||||
|
|
||||||
|
|
||||||
|
@asynccontextmanager
async def null_lifecycle():
    """Run the internal control server for the duration of the context.

    On entry: writes ``TARGET_PERF`` (as an integer string) to
    ``.has_benchmark`` in the current directory, then starts the control app
    on ``INTERNAL_HOST:INTERNAL_PORT``. On exit — including when startup
    itself fails after the runner was set up — the runner is cleaned up.
    """
    # Pin the benchmark result to TARGET_PERF. Per the original author's
    # note, the framework's benchmark step reads this file and uses its
    # contents instead of measuring. Failure here is tolerated.
    try:
        with open(".has_benchmark", "w") as fh:
            fh.write(str(int(TARGET_PERF)))
    except OSError as e:
        log.warning("Could not pin benchmark cache: %s", e)

    runner = web.AppRunner(_build_internal_app())
    await runner.setup()
    try:
        # Starting the site sits inside the try so that a bind failure
        # (e.g. port already in use) still releases the runner.
        await web.TCPSite(runner, INTERNAL_HOST, INTERNAL_PORT).start()

        log.info(
            "Null pyworker control server: http://%s:%d (POST /release%s)",
            INTERNAL_HOST,
            INTERNAL_PORT,
            f", GET {STUB_HEALTH_PATH}" if USE_STUB_HEALTH else "",
        )
        if not USE_STUB_HEALTH:
            log.info("Framework healthcheck → %s", BACKEND_HEALTH_URL)

        yield
    finally:
        await runner.cleanup()
|
||||||
|
|
||||||
|
|
||||||
|
async def ping(**params: object) -> dict:
    """Trivial handler behind the ``/ping`` route.

    Returns ``{"ok": True}``. When the payload carries a truthy
    ``BENCHMARK_SENTINEL`` key it first sleeps one second and adds
    ``"benchmark": True`` to the reply.
    """
    # The handler exists so the worker has a route with a BenchmarkConfig.
    # The one-second sleep is a fallback for when pinning the .has_benchmark
    # cache failed; per the original note, the cached value otherwise means
    # the benchmark path is never exercised.
    reply = {"ok": True}
    if params.get(BENCHMARK_SENTINEL):
        await asyncio.sleep(1.0)
        reply["benchmark"] = True
    return reply
|
||||||
|
|
||||||
|
|
||||||
|
# Benchmark for /ping: one run, no warm-up, with a payload that carries
# only the sentinel key.
_ping_benchmark = BenchmarkConfig(
    generator=lambda: {BENCHMARK_SENTINEL: True},
    runs=1,
    concurrency=1,
    do_warmup=False,
)

# The single route this worker exposes; every request is assigned a
# workload of TARGET_PERF regardless of payload.
_ping_handler = HandlerConfig(
    route="/ping",
    allow_parallel_requests=True,
    remote_function=ping,
    workload_calculator=lambda _payload: TARGET_PERF,
    benchmark_config=_ping_benchmark,
)

# The "model server" settings are just the healthcheck target; at most one
# session is allowed per worker.
worker_config = WorkerConfig(
    model_server_url=HEALTH_BASE_URL,
    model_server_port=HEALTH_PORT,
    model_healthcheck_url=HEALTH_PATH,
    lifecycle=null_lifecycle(),
    max_sessions=1,
    handlers=[_ping_handler],
    log_action_config=LogActionConfig(),
)

# Publish the backend to the /release handler before the worker starts running.
_worker = Worker(worker_config)
_backend_ref["backend"] = _worker.backend
_worker.run()
|
||||||
@@ -35,7 +35,7 @@ def benchmark_generator() -> dict:
|
|||||||
benchmark_data = {
|
benchmark_data = {
|
||||||
"inputs": prompt,
|
"inputs": prompt,
|
||||||
"parameters": {
|
"parameters": {
|
||||||
"max_new_tokens": 128,
|
"max_new_tokens": 500,
|
||||||
"temperature": 0.7,
|
"temperature": 0.7,
|
||||||
"return_full_text": False
|
"return_full_text": False
|
||||||
}
|
}
|
||||||
|
|||||||
Reference in New Issue
Block a user