Compare commits
3 Commits
| Author | SHA1 | Date | |
|---|---|---|---|
| fd9d56e576 | |||
| 8d9ffb3a6c | |||
| 5d5bc197d7 |
+26
-8
@@ -73,6 +73,9 @@ class Backend:
|
|||||||
self._total_pubkey_fetch_errors = 0
|
self._total_pubkey_fetch_errors = 0
|
||||||
self._pubkey = self._fetch_pubkey()
|
self._pubkey = self._fetch_pubkey()
|
||||||
self.__start_healthcheck: bool = False
|
self.__start_healthcheck: bool = False
|
||||||
|
self._model_tail_start_time = None
|
||||||
|
self._model_loaded_time = None
|
||||||
|
self._first_healthcheck_ok = False
|
||||||
|
|
||||||
@property
|
@property
|
||||||
def pubkey(self) -> Optional[RSA.RsaKey]:
|
def pubkey(self) -> Optional[RSA.RsaKey]:
|
||||||
@@ -104,6 +107,7 @@ class Backend:
|
|||||||
|
|
||||||
#######################################Private#######################################
|
#######################################Private#######################################
|
||||||
def _fetch_pubkey(self):
|
def _fetch_pubkey(self):
|
||||||
|
t0 = time.time()
|
||||||
command = ["curl", "-X", "GET", "https://run.vast.ai/pubkey/"]
|
command = ["curl", "-X", "GET", "https://run.vast.ai/pubkey/"]
|
||||||
result = subprocess.check_output(command, universal_newlines=True)
|
result = subprocess.check_output(command, universal_newlines=True)
|
||||||
log.debug("public key:")
|
log.debug("public key:")
|
||||||
@@ -120,6 +124,8 @@ class Backend:
|
|||||||
self._total_pubkey_fetch_errors += 1
|
self._total_pubkey_fetch_errors += 1
|
||||||
if self._total_pubkey_fetch_errors >= MAX_PUBKEY_FETCH_ATTEMPTS:
|
if self._total_pubkey_fetch_errors >= MAX_PUBKEY_FETCH_ATTEMPTS:
|
||||||
self.backend_errored("Failed to get autoscaler pubkey")
|
self.backend_errored("Failed to get autoscaler pubkey")
|
||||||
|
else:
|
||||||
|
log.debug(f"pubkey fetch+parse took {time.time()-t0:.2f}s")
|
||||||
return key
|
return key
|
||||||
|
|
||||||
async def __handle_request(
|
async def __handle_request(
|
||||||
@@ -240,6 +246,10 @@ class Backend:
|
|||||||
log.debug(f"Performing healthcheck on {health_check_url}")
|
log.debug(f"Performing healthcheck on {health_check_url}")
|
||||||
async with self.healthcheck_session.get(health_check_url) as response:
|
async with self.healthcheck_session.get(health_check_url) as response:
|
||||||
if response.status == 200:
|
if response.status == 200:
|
||||||
|
if not self._first_healthcheck_ok:
|
||||||
|
if self._model_loaded_time:
|
||||||
|
log.debug(f"first healthcheck OK after {time.time()-self._model_loaded_time:.2f}s since ModelLoaded")
|
||||||
|
self._first_healthcheck_ok = True
|
||||||
log.debug("Healthcheck successful")
|
log.debug("Healthcheck successful")
|
||||||
elif response.status == 503:
|
elif response.status == 503:
|
||||||
log.debug(f"Healthcheck failed with status: {response.status}")
|
log.debug(f"Healthcheck failed with status: {response.status}")
|
||||||
@@ -315,17 +325,20 @@ class Backend:
|
|||||||
with open(BENCHMARK_INDICATOR_FILE, "r") as f:
|
with open(BENCHMARK_INDICATOR_FILE, "r") as f:
|
||||||
log.debug("already ran benchmark")
|
log.debug("already ran benchmark")
|
||||||
# trigger model load
|
# trigger model load
|
||||||
payload = self.benchmark_handler.make_benchmark_payload()
|
# payload = self.benchmark_handler.make_benchmark_payload()
|
||||||
_ = await self.__call_api(
|
# _ = await self.__call_api(
|
||||||
handler=self.benchmark_handler, payload=payload
|
# handler=self.benchmark_handler, payload=payload
|
||||||
)
|
# )
|
||||||
return float(f.readline())
|
return float(f.readline())
|
||||||
except FileNotFoundError:
|
except FileNotFoundError:
|
||||||
pass
|
pass
|
||||||
|
|
||||||
log.debug("Initial run to trigger model loading...")
|
log.debug("Initial run to trigger model loading...")
|
||||||
|
t_bench0 = time.time()
|
||||||
payload = self.benchmark_handler.make_benchmark_payload()
|
payload = self.benchmark_handler.make_benchmark_payload()
|
||||||
await self.__call_api(handler=self.benchmark_handler, payload=payload)
|
await self.__call_api(handler=self.benchmark_handler, payload=payload)
|
||||||
|
log.debug(f"warmup request took {time.time()-t_bench0:.2f}s")
|
||||||
|
t_benchmark_loop0 = time.time()
|
||||||
|
|
||||||
max_throughput = 0
|
max_throughput = 0
|
||||||
sum_throughput = 0
|
sum_throughput = 0
|
||||||
@@ -373,6 +386,7 @@ class Backend:
|
|||||||
log.debug(
|
log.debug(
|
||||||
f"benchmark result: avg {average_throughput} workload per second, max {max_throughput}"
|
f"benchmark result: avg {average_throughput} workload per second, max {max_throughput}"
|
||||||
)
|
)
|
||||||
|
log.debug(f"benchmark loop took {time.time()-t_benchmark_loop0:.2f}s")
|
||||||
with open(BENCHMARK_INDICATOR_FILE, "w") as f:
|
with open(BENCHMARK_INDICATOR_FILE, "w") as f:
|
||||||
f.write(str(max_throughput))
|
f.write(str(max_throughput))
|
||||||
return max_throughput
|
return max_throughput
|
||||||
@@ -385,14 +399,17 @@ class Backend:
|
|||||||
for action, msg in self.log_actions:
|
for action, msg in self.log_actions:
|
||||||
match action:
|
match action:
|
||||||
case LogAction.ModelLoaded if msg in log_line:
|
case LogAction.ModelLoaded if msg in log_line:
|
||||||
log.debug(
|
now = time.time()
|
||||||
f"Got log line indicating model is loaded: {log_line}"
|
elapsed = now - self._model_tail_start_time
|
||||||
)
|
log.debug(f"ModelLoaded observed after {elapsed:.2f}s: {log_line}")
|
||||||
# some backends need a few seconds after logging successful startup before
|
# some backends need a few seconds after logging successful startup before
|
||||||
# they can begin accepting requests
|
# they can begin accepting requests
|
||||||
await sleep(5)
|
# await sleep(5)
|
||||||
try:
|
try:
|
||||||
|
t_bench0 = time.time()
|
||||||
max_throughput = await run_benchmark()
|
max_throughput = await run_benchmark()
|
||||||
|
self._model_loaded_time = time.time()
|
||||||
|
log.debug(f"benchmark total took {self._model_loaded_time - t_bench0:.2f}s")
|
||||||
self.__start_healthcheck = True
|
self.__start_healthcheck = True
|
||||||
self.metrics._model_loaded(
|
self.metrics._model_loaded(
|
||||||
max_throughput=max_throughput,
|
max_throughput=max_throughput,
|
||||||
@@ -411,6 +428,7 @@ class Backend:
|
|||||||
|
|
||||||
async def tail_log():
|
async def tail_log():
|
||||||
log.debug(f"tailing file: {self.model_log_file}")
|
log.debug(f"tailing file: {self.model_log_file}")
|
||||||
|
self._model_tail_start_time = time.time()
|
||||||
async with await open_file(self.model_log_file) as f:
|
async with await open_file(self.model_log_file) as f:
|
||||||
while True:
|
while True:
|
||||||
line = await f.readline()
|
line = await f.readline()
|
||||||
|
|||||||
+22
-55
@@ -2,6 +2,9 @@
|
|||||||
|
|
||||||
set -e -o pipefail
|
set -e -o pipefail
|
||||||
|
|
||||||
|
log() { echo "$(date +'%Y-%m-%d %H:%M:%S') $*"; }
|
||||||
|
step(){ _t0=$(date +%s); eval "$1"; _dt=$(($(date +%s)-_t0)); log "$2 took ${_dt}s"; }
|
||||||
|
|
||||||
WORKSPACE_DIR="${WORKSPACE_DIR:-/workspace}"
|
WORKSPACE_DIR="${WORKSPACE_DIR:-/workspace}"
|
||||||
|
|
||||||
SERVER_DIR="$WORKSPACE_DIR/vast-pyworker"
|
SERVER_DIR="$WORKSPACE_DIR/vast-pyworker"
|
||||||
@@ -41,33 +44,28 @@ echo_var DEBUG_LOG
|
|||||||
echo_var PYWORKER_LOG
|
echo_var PYWORKER_LOG
|
||||||
echo_var MODEL_LOG
|
echo_var MODEL_LOG
|
||||||
|
|
||||||
# Populate /etc/environment with quoted values
|
# # Populate /etc/environment with quoted values
|
||||||
if ! grep -q "VAST" /etc/environment; then
|
# if ! grep -q "VAST" /etc/environment; then
|
||||||
env -0 | grep -zEv "^(HOME=|SHLVL=)|CONDA" | while IFS= read -r -d '' line; do
|
# env -0 | grep -zEv "^(HOME=|SHLVL=)|CONDA" | while IFS= read -r -d '' line; do
|
||||||
name=${line%%=*}
|
# name=${line%%=*}
|
||||||
value=${line#*=}
|
# value=${line#*=}
|
||||||
printf '%s="%s"\n' "$name" "$value"
|
# printf '%s="%s"\n' "$name" "$value"
|
||||||
done > /etc/environment
|
# done > /etc/environment
|
||||||
fi
|
# fi
|
||||||
|
|
||||||
if [ ! -d "$ENV_PATH" ]
|
if [ ! -d "$ENV_PATH" ]
|
||||||
then
|
then
|
||||||
echo "setting up venv"
|
echo "setting up venv"
|
||||||
if ! which uv; then
|
step 'if ! which uv; then curl -LsSf https://astral.sh/uv/install.sh | sh; source ~/.local/bin/env; fi' "uv install"
|
||||||
curl -LsSf https://astral.sh/uv/install.sh | sh
|
|
||||||
source ~/.local/bin/env
|
|
||||||
fi
|
|
||||||
|
|
||||||
# Fork testing
|
# Fork testing
|
||||||
[[ ! -d $SERVER_DIR ]] && git clone "${PYWORKER_REPO:-https://github.com/vast-ai/pyworker}" "$SERVER_DIR"
|
step '[[ ! -d $SERVER_DIR ]] && git clone "${PYWORKER_REPO:-https://github.com/vast-ai/pyworker}" "$SERVER_DIR"' "git clone"
|
||||||
if [[ -n ${PYWORKER_REF:-} ]]; then
|
step 'if [[ -n ${PYWORKER_REF:-} ]]; then (cd "$SERVER_DIR" && git checkout "$PYWORKER_REF"); fi' "git checkout"
|
||||||
(cd "$SERVER_DIR" && git checkout "$PYWORKER_REF")
|
|
||||||
fi
|
|
||||||
|
|
||||||
uv venv --python-preference only-managed "$ENV_PATH" -p 3.10
|
|
||||||
source "$ENV_PATH/bin/activate"
|
|
||||||
|
|
||||||
uv pip install -r "${SERVER_DIR}/requirements.txt"
|
step 'uv venv --python-preference only-managed "$ENV_PATH" -p 3.10' "venv create"
|
||||||
|
step 'source "$ENV_PATH/bin/activate"' "venv activate"
|
||||||
|
step 'uv pip install -r "${SERVER_DIR}/requirements.txt"' "pip install requirements"
|
||||||
|
|
||||||
touch ~/.no_auto_tmux
|
touch ~/.no_auto_tmux
|
||||||
else
|
else
|
||||||
@@ -80,39 +78,8 @@ fi
|
|||||||
[ ! -d "$SERVER_DIR/workers/$BACKEND" ] && echo "$BACKEND not supported!" && exit 1
|
[ ! -d "$SERVER_DIR/workers/$BACKEND" ] && echo "$BACKEND not supported!" && exit 1
|
||||||
|
|
||||||
if [ "$USE_SSL" = true ]; then
|
if [ "$USE_SSL" = true ]; then
|
||||||
|
step 'openssl req -newkey rsa:2048 -subj "/C=US/ST=CA/CN=pyworker.vast.ai/" -nodes -sha256 -keyout /etc/instance.key -out /etc/instance.csr -config /etc/openssl-san.cnf' "openssl csr"
|
||||||
cat << EOF > /etc/openssl-san.cnf
|
step 'curl --header "Content-Type: application/octet-stream" --data-binary @//etc/instance.csr -X POST "https://console.vast.ai/api/v0/sign_cert/?instance_id=$CONTAINER_ID" > /etc/instance.crt' "sign cert"
|
||||||
[req]
|
|
||||||
default_bits = 2048
|
|
||||||
distinguished_name = req_distinguished_name
|
|
||||||
req_extensions = v3_req
|
|
||||||
|
|
||||||
[req_distinguished_name]
|
|
||||||
countryName = US
|
|
||||||
stateOrProvinceName = CA
|
|
||||||
organizationName = Vast.ai Inc.
|
|
||||||
commonName = vast.ai
|
|
||||||
|
|
||||||
[v3_req]
|
|
||||||
basicConstraints = CA:FALSE
|
|
||||||
keyUsage = nonRepudiation, digitalSignature, keyEncipherment
|
|
||||||
subjectAltName = @alt_names
|
|
||||||
|
|
||||||
[alt_names]
|
|
||||||
IP.1 = 0.0.0.0
|
|
||||||
EOF
|
|
||||||
|
|
||||||
openssl req -newkey rsa:2048 -subj "/C=US/ST=CA/CN=pyworker.vast.ai/" \
|
|
||||||
-nodes \
|
|
||||||
-sha256 \
|
|
||||||
-keyout /etc/instance.key \
|
|
||||||
-out /etc/instance.csr \
|
|
||||||
-config /etc/openssl-san.cnf
|
|
||||||
|
|
||||||
curl --header 'Content-Type: application/octet-stream' \
|
|
||||||
--data-binary @//etc/instance.csr \
|
|
||||||
-X \
|
|
||||||
POST "https://console.vast.ai/api/v0/sign_cert/?instance_id=$CONTAINER_ID" > /etc/instance.crt;
|
|
||||||
fi
|
fi
|
||||||
|
|
||||||
|
|
||||||
@@ -122,11 +89,11 @@ export REPORT_ADDR WORKER_PORT USE_SSL UNSECURED
|
|||||||
|
|
||||||
cd "$SERVER_DIR"
|
cd "$SERVER_DIR"
|
||||||
|
|
||||||
echo "launching PyWorker server"
|
log "launching PyWorker server"
|
||||||
|
|
||||||
# if instance is rebooted, we want to clear out the log file so pyworker doesn't read lines
|
# if instance is rebooted, we want to clear out the log file so pyworker doesn't read lines
|
||||||
# from the run prior to reboot. past logs are saved in $MODEL_LOG.old for debugging only
|
# from the run prior to reboot. past logs are saved in $MODEL_LOG.old for debugging only
|
||||||
[ -e "$MODEL_LOG" ] && cat "$MODEL_LOG" >> "$MODEL_LOG.old" && : > "$MODEL_LOG"
|
[ -e "$MODEL_LOG" ] && cat "$MODEL_LOG" >> "$MODEL_LOG.old" && : > "$MODEL_LOG"
|
||||||
|
|
||||||
(python3 -m "workers.$BACKEND.server" |& tee -a "$PYWORKER_LOG") &
|
_t0=$(date +%s); (python3 -m "workers.$BACKEND.server" |& tee -a "$PYWORKER_LOG") & _dt=$(($(date +%s)-_t0)); log "PyWorker spawn took ${_dt}s"
|
||||||
echo "launching PyWorker server done"
|
log "launching PyWorker server done"
|
||||||
|
|||||||
@@ -6,10 +6,13 @@ from typing import Union, Type, Dict, Any, Optional
|
|||||||
from aiohttp import web, ClientResponse
|
from aiohttp import web, ClientResponse
|
||||||
import nltk
|
import nltk
|
||||||
import logging
|
import logging
|
||||||
|
import time
|
||||||
|
|
||||||
|
log = logging.getLogger(__name__)
|
||||||
|
t0 = time.time()
|
||||||
nltk.download("words")
|
nltk.download("words")
|
||||||
WORD_LIST = nltk.corpus.words.words()
|
WORD_LIST = nltk.corpus.words.words()
|
||||||
log = logging.getLogger(__name__)
|
print(f"{time.strftime('%Y-%m-%d %H:%M:%S')} NLTK words download+load took {time.time()-t0:.2f}s")
|
||||||
|
|
||||||
"""
|
"""
|
||||||
Generic dataclass accepts any dictionary in input.
|
Generic dataclass accepts any dictionary in input.
|
||||||
|
|||||||
Reference in New Issue
Block a user