Compare commits

..

1 Commits

Author SHA1 Message Date
Colter Downing d6eb498ee4 catch the case where all benchmarks fail (sets error) 2025-10-27 12:01:55 -07:00
4 changed files with 68 additions and 53 deletions
+11 -26
View File
@@ -73,9 +73,6 @@ class Backend:
self._total_pubkey_fetch_errors = 0 self._total_pubkey_fetch_errors = 0
self._pubkey = self._fetch_pubkey() self._pubkey = self._fetch_pubkey()
self.__start_healthcheck: bool = False self.__start_healthcheck: bool = False
self._model_tail_start_time = None
self._model_loaded_time = None
self._first_healthcheck_ok = False
@property @property
def pubkey(self) -> Optional[RSA.RsaKey]: def pubkey(self) -> Optional[RSA.RsaKey]:
@@ -107,7 +104,6 @@ class Backend:
#######################################Private####################################### #######################################Private#######################################
def _fetch_pubkey(self): def _fetch_pubkey(self):
t0 = time.time()
command = ["curl", "-X", "GET", "https://run.vast.ai/pubkey/"] command = ["curl", "-X", "GET", "https://run.vast.ai/pubkey/"]
result = subprocess.check_output(command, universal_newlines=True) result = subprocess.check_output(command, universal_newlines=True)
log.debug("public key:") log.debug("public key:")
@@ -124,8 +120,6 @@ class Backend:
self._total_pubkey_fetch_errors += 1 self._total_pubkey_fetch_errors += 1
if self._total_pubkey_fetch_errors >= MAX_PUBKEY_FETCH_ATTEMPTS: if self._total_pubkey_fetch_errors >= MAX_PUBKEY_FETCH_ATTEMPTS:
self.backend_errored("Failed to get autoscaler pubkey") self.backend_errored("Failed to get autoscaler pubkey")
else:
log.debug(f"pubkey fetch+parse took {time.time()-t0:.2f}s")
return key return key
async def __handle_request( async def __handle_request(
@@ -246,10 +240,6 @@ class Backend:
log.debug(f"Performing healthcheck on {health_check_url}") log.debug(f"Performing healthcheck on {health_check_url}")
async with self.healthcheck_session.get(health_check_url) as response: async with self.healthcheck_session.get(health_check_url) as response:
if response.status == 200: if response.status == 200:
if not self._first_healthcheck_ok:
if self._model_loaded_time:
log.debug(f"first healthcheck OK after {time.time()-self._model_loaded_time:.2f}s since ModelLoaded")
self._first_healthcheck_ok = True
log.debug("Healthcheck successful") log.debug("Healthcheck successful")
elif response.status == 503: elif response.status == 503:
log.debug(f"Healthcheck failed with status: {response.status}") log.debug(f"Healthcheck failed with status: {response.status}")
@@ -325,20 +315,17 @@ class Backend:
with open(BENCHMARK_INDICATOR_FILE, "r") as f: with open(BENCHMARK_INDICATOR_FILE, "r") as f:
log.debug("already ran benchmark") log.debug("already ran benchmark")
# trigger model load # trigger model load
# payload = self.benchmark_handler.make_benchmark_payload() payload = self.benchmark_handler.make_benchmark_payload()
# _ = await self.__call_api( _ = await self.__call_api(
# handler=self.benchmark_handler, payload=payload handler=self.benchmark_handler, payload=payload
# ) )
return float(f.readline()) return float(f.readline())
except FileNotFoundError: except FileNotFoundError:
pass pass
log.debug("Initial run to trigger model loading...") log.debug("Initial run to trigger model loading...")
t_bench0 = time.time()
payload = self.benchmark_handler.make_benchmark_payload() payload = self.benchmark_handler.make_benchmark_payload()
await self.__call_api(handler=self.benchmark_handler, payload=payload) await self.__call_api(handler=self.benchmark_handler, payload=payload)
log.debug(f"warmup request took {time.time()-t_bench0:.2f}s")
t_benchmark_loop0 = time.time()
max_throughput = 0 max_throughput = 0
sum_throughput = 0 sum_throughput = 0
@@ -363,6 +350,9 @@ class Backend:
total_workload = sum(br.workload for br in benchmark_requests if br.is_successful) total_workload = sum(br.workload for br in benchmark_requests if br.is_successful)
time_elapsed = time.time() - start time_elapsed = time.time() - start
successful_responses = sum([1 for br in benchmark_requests if br.is_successful]) successful_responses = sum([1 for br in benchmark_requests if br.is_successful])
if successful_responses == 0:
self.backend_errored("No successful responses from benchmark")
log.debug(f"benchmark failed: {successful_responses}/{concurrent_requests} successful responses")
throughput = total_workload / time_elapsed throughput = total_workload / time_elapsed
sum_throughput += throughput sum_throughput += throughput
@@ -386,7 +376,6 @@ class Backend:
log.debug( log.debug(
f"benchmark result: avg {average_throughput} workload per second, max {max_throughput}" f"benchmark result: avg {average_throughput} workload per second, max {max_throughput}"
) )
log.debug(f"benchmark loop took {time.time()-t_benchmark_loop0:.2f}s")
with open(BENCHMARK_INDICATOR_FILE, "w") as f: with open(BENCHMARK_INDICATOR_FILE, "w") as f:
f.write(str(max_throughput)) f.write(str(max_throughput))
return max_throughput return max_throughput
@@ -399,17 +388,14 @@ class Backend:
for action, msg in self.log_actions: for action, msg in self.log_actions:
match action: match action:
case LogAction.ModelLoaded if msg in log_line: case LogAction.ModelLoaded if msg in log_line:
now = time.time() log.debug(
elapsed = now - self._model_tail_start_time f"Got log line indicating model is loaded: {log_line}"
log.debug(f"ModelLoaded observed after {elapsed:.2f}s: {log_line}") )
# some backends need a few seconds after logging successful startup before # some backends need a few seconds after logging successful startup before
# they can begin accepting requests # they can begin accepting requests
# await sleep(5) await sleep(5)
try: try:
t_bench0 = time.time()
max_throughput = await run_benchmark() max_throughput = await run_benchmark()
self._model_loaded_time = time.time()
log.debug(f"benchmark total took {self._model_loaded_time - t_bench0:.2f}s")
self.__start_healthcheck = True self.__start_healthcheck = True
self.metrics._model_loaded( self.metrics._model_loaded(
max_throughput=max_throughput, max_throughput=max_throughput,
@@ -428,7 +414,6 @@ class Backend:
async def tail_log(): async def tail_log():
log.debug(f"tailing file: {self.model_log_file}") log.debug(f"tailing file: {self.model_log_file}")
self._model_tail_start_time = time.time()
async with await open_file(self.model_log_file) as f: async with await open_file(self.model_log_file) as f:
while True: while True:
line = await f.readline() line = await f.readline()
+1 -1
View File
@@ -257,7 +257,7 @@ class ModelMetrics:
def wait_time(self) -> float: def wait_time(self) -> float:
if (len(self.requests_working) == 0): if (len(self.requests_working) == 0):
return 0.0 return 0.0
return sum([request.workload for request in self.requests_working.values()]) / self.max_throughput return sum([request.workload for request in self.requests_working.values()]) / max(self.max_throughput, 0.00001)
@property @property
def cur_load(self) -> float: def cur_load(self) -> float:
+55 -22
View File
@@ -2,9 +2,6 @@
set -e -o pipefail set -e -o pipefail
log() { echo "$(date +'%Y-%m-%d %H:%M:%S') $*"; }
step(){ _t0=$(date +%s); eval "$1"; _dt=$(($(date +%s)-_t0)); log "$2 took ${_dt}s"; }
WORKSPACE_DIR="${WORKSPACE_DIR:-/workspace}" WORKSPACE_DIR="${WORKSPACE_DIR:-/workspace}"
SERVER_DIR="$WORKSPACE_DIR/vast-pyworker" SERVER_DIR="$WORKSPACE_DIR/vast-pyworker"
@@ -44,28 +41,33 @@ echo_var DEBUG_LOG
echo_var PYWORKER_LOG echo_var PYWORKER_LOG
echo_var MODEL_LOG echo_var MODEL_LOG
# # Populate /etc/environment with quoted values # Populate /etc/environment with quoted values
# if ! grep -q "VAST" /etc/environment; then if ! grep -q "VAST" /etc/environment; then
# env -0 | grep -zEv "^(HOME=|SHLVL=)|CONDA" | while IFS= read -r -d '' line; do env -0 | grep -zEv "^(HOME=|SHLVL=)|CONDA" | while IFS= read -r -d '' line; do
# name=${line%%=*} name=${line%%=*}
# value=${line#*=} value=${line#*=}
# printf '%s="%s"\n' "$name" "$value" printf '%s="%s"\n' "$name" "$value"
# done > /etc/environment done > /etc/environment
# fi fi
if [ ! -d "$ENV_PATH" ] if [ ! -d "$ENV_PATH" ]
then then
echo "setting up venv" echo "setting up venv"
step 'if ! which uv; then curl -LsSf https://astral.sh/uv/install.sh | sh; source ~/.local/bin/env; fi' "uv install" if ! which uv; then
curl -LsSf https://astral.sh/uv/install.sh | sh
source ~/.local/bin/env
fi
# Fork testing # Fork testing
step '[[ ! -d $SERVER_DIR ]] && git clone "${PYWORKER_REPO:-https://github.com/vast-ai/pyworker}" "$SERVER_DIR"' "git clone" [[ ! -d $SERVER_DIR ]] && git clone "${PYWORKER_REPO:-https://github.com/vast-ai/pyworker}" "$SERVER_DIR"
step 'if [[ -n ${PYWORKER_REF:-} ]]; then (cd "$SERVER_DIR" && git checkout "$PYWORKER_REF"); fi' "git checkout" if [[ -n ${PYWORKER_REF:-} ]]; then
(cd "$SERVER_DIR" && git checkout "$PYWORKER_REF")
fi
uv venv --python-preference only-managed "$ENV_PATH" -p 3.10
source "$ENV_PATH/bin/activate"
step 'uv venv --python-preference only-managed "$ENV_PATH" -p 3.10' "venv create" uv pip install -r "${SERVER_DIR}/requirements.txt"
step 'source "$ENV_PATH/bin/activate"' "venv activate"
step 'uv pip install -r "${SERVER_DIR}/requirements.txt"' "pip install requirements"
touch ~/.no_auto_tmux touch ~/.no_auto_tmux
else else
@@ -78,8 +80,39 @@ fi
[ ! -d "$SERVER_DIR/workers/$BACKEND" ] && echo "$BACKEND not supported!" && exit 1 [ ! -d "$SERVER_DIR/workers/$BACKEND" ] && echo "$BACKEND not supported!" && exit 1
if [ "$USE_SSL" = true ]; then if [ "$USE_SSL" = true ]; then
step 'openssl req -newkey rsa:2048 -subj "/C=US/ST=CA/CN=pyworker.vast.ai/" -nodes -sha256 -keyout /etc/instance.key -out /etc/instance.csr -config /etc/openssl-san.cnf' "openssl csr"
step 'curl --header "Content-Type: application/octet-stream" --data-binary @//etc/instance.csr -X POST "https://console.vast.ai/api/v0/sign_cert/?instance_id=$CONTAINER_ID" > /etc/instance.crt' "sign cert" cat << EOF > /etc/openssl-san.cnf
[req]
default_bits = 2048
distinguished_name = req_distinguished_name
req_extensions = v3_req
[req_distinguished_name]
countryName = US
stateOrProvinceName = CA
organizationName = Vast.ai Inc.
commonName = vast.ai
[v3_req]
basicConstraints = CA:FALSE
keyUsage = nonRepudiation, digitalSignature, keyEncipherment
subjectAltName = @alt_names
[alt_names]
IP.1 = 0.0.0.0
EOF
openssl req -newkey rsa:2048 -subj "/C=US/ST=CA/CN=pyworker.vast.ai/" \
-nodes \
-sha256 \
-keyout /etc/instance.key \
-out /etc/instance.csr \
-config /etc/openssl-san.cnf
curl --header 'Content-Type: application/octet-stream' \
--data-binary @//etc/instance.csr \
-X \
POST "https://console.vast.ai/api/v0/sign_cert/?instance_id=$CONTAINER_ID" > /etc/instance.crt;
fi fi
@@ -89,11 +122,11 @@ export REPORT_ADDR WORKER_PORT USE_SSL UNSECURED
cd "$SERVER_DIR" cd "$SERVER_DIR"
log "launching PyWorker server" echo "launching PyWorker server"
# if instance is rebooted, we want to clear out the log file so pyworker doesn't read lines # if instance is rebooted, we want to clear out the log file so pyworker doesn't read lines
# from the run prior to reboot. past logs are saved in $MODEL_LOG.old for debugging only # from the run prior to reboot. past logs are saved in $MODEL_LOG.old for debugging only
[ -e "$MODEL_LOG" ] && cat "$MODEL_LOG" >> "$MODEL_LOG.old" && : > "$MODEL_LOG" [ -e "$MODEL_LOG" ] && cat "$MODEL_LOG" >> "$MODEL_LOG.old" && : > "$MODEL_LOG"
_t0=$(date +%s); (python3 -m "workers.$BACKEND.server" |& tee -a "$PYWORKER_LOG") & _dt=$(($(date +%s)-_t0)); log "PyWorker spawn took ${_dt}s" (python3 -m "workers.$BACKEND.server" |& tee -a "$PYWORKER_LOG") &
log "launching PyWorker server done" echo "launching PyWorker server done"
+1 -4
View File
@@ -6,13 +6,10 @@ from typing import Union, Type, Dict, Any, Optional
from aiohttp import web, ClientResponse from aiohttp import web, ClientResponse
import nltk import nltk
import logging import logging
import time
log = logging.getLogger(__name__)
t0 = time.time()
nltk.download("words") nltk.download("words")
WORD_LIST = nltk.corpus.words.words() WORD_LIST = nltk.corpus.words.words()
print(f"{time.strftime('%Y-%m-%d %H:%M:%S')} NLTK words download+load took {time.time()-t0:.2f}s") log = logging.getLogger(__name__)
""" """
Generic dataclass accepts any dictionary in input. Generic dataclass accepts any dictionary in input.