Compare commits

..

4 Commits

Author SHA1 Message Date
Lucas Armand 63550d5af3 Actual fix -- MOVED TO TEMPLATE 2025-11-17 10:57:55 -08:00
Lucas Armand 7ec0e11938 add await 2025-11-17 10:46:09 -08:00
Lucas Armand 191fbbfe18 try seek 2025-11-17 10:44:13 -08:00
Lucas Armand 9a4a39c71b Read model log 2025-11-17 10:32:38 -08:00
3 changed files with 29 additions and 95 deletions
+2 -2
View File
@@ -30,7 +30,7 @@ from lib.data_types import (
BenchmarkResult BenchmarkResult
) )
VERSION = "0.2.1" VERSION = "0.2.0"
MSG_HISTORY_LEN = 100 MSG_HISTORY_LEN = 100
log = logging.getLogger(__file__) log = logging.getLogger(__file__)
@@ -431,4 +431,4 @@ class Backend:
if os.path.isfile(self.model_log_file) is True: if os.path.isfile(self.model_log_file) is True:
return await tail_log() return await tail_log()
else: else:
await sleep(1) await asyncio.sleep(1)
+25 -45
View File
@@ -3,58 +3,38 @@ import logging
from typing import List from typing import List
import ssl import ssl
from asyncio import run, gather from asyncio import run, gather
import asyncio
from lib.backend import Backend from lib.backend import Backend
from lib.metrics import Metrics
from aiohttp import web from aiohttp import web
log = logging.getLogger(__file__) log = logging.getLogger(__file__)
def start_server(backend: Backend, routes: List[web.RouteDef], **kwargs): def start_server(backend: Backend, routes: List[web.RouteDef], **kwargs):
try: log.debug("getting certificate...")
log.debug("getting certificate...") use_ssl = os.environ.get("USE_SSL", "false") == "true"
use_ssl = os.environ.get("USE_SSL", "false") == "true" if use_ssl is True:
if use_ssl is True: ssl_context = ssl.create_default_context(ssl.Purpose.CLIENT_AUTH)
ssl_context = ssl.create_default_context(ssl.Purpose.CLIENT_AUTH) ssl_context.load_cert_chain(
ssl_context.load_cert_chain( certfile="/etc/instance.crt",
certfile="/etc/instance.crt", keyfile="/etc/instance.key",
keyfile="/etc/instance.key", )
) else:
else: ssl_context = None
ssl_context = None
async def main(): async def main():
log.debug("starting server...") log.debug("starting server...")
app = web.Application() app = web.Application()
app.add_routes(routes) app.add_routes(routes)
runner = web.AppRunner(app) runner = web.AppRunner(app)
await runner.setup() await runner.setup()
site = web.TCPSite( site = web.TCPSite(
runner, runner,
ssl_context=ssl_context, ssl_context=ssl_context,
port=int(os.environ["WORKER_PORT"]), port=int(os.environ["WORKER_PORT"]),
**kwargs **kwargs
) )
await gather(site.start(), backend._start_tracking()) await gather(site.start(), backend._start_tracking())
run(main()) run(main())
except Exception as e:
err_msg = f"PyWorker failed to launch: {e}"
log.error(err_msg)
async def beacon():
metrics = Metrics()
metrics._set_version(getattr(backend, "version", "0"))
metrics._set_mtoken(getattr(backend, "mtoken", ""))
try:
while True:
metrics._model_errored(err_msg)
await metrics._Metrics__send_metrics_and_reset()
await asyncio.sleep(10)
finally:
await metrics.aclose()
run(beacon())
+1 -47
View File
@@ -41,14 +41,6 @@ echo_var DEBUG_LOG
echo_var PYWORKER_LOG echo_var PYWORKER_LOG
echo_var MODEL_LOG echo_var MODEL_LOG
# if instance is rebooted, we want to clear out the log file so pyworker doesn't read lines
# from the run prior to reboot. past logs are saved in $MODEL_LOG.old for debugging only
if [ -e "$MODEL_LOG" ]; then
echo "Rotating model log at $MODEL_LOG to $MODEL_LOG.old"
cat "$MODEL_LOG" >> "$MODEL_LOG.old"
: > "$MODEL_LOG"
fi
# Populate /etc/environment with quoted values # Populate /etc/environment with quoted values
if ! grep -q "VAST" /etc/environment; then if ! grep -q "VAST" /etc/environment; then
env -0 | grep -zEv "^(HOME=|SHLVL=)|CONDA" | while IFS= read -r -d '' line; do env -0 | grep -zEv "^(HOME=|SHLVL=)|CONDA" | while IFS= read -r -d '' line; do
@@ -132,43 +124,5 @@ cd "$SERVER_DIR"
echo "launching PyWorker server" echo "launching PyWorker server"
set +e (python3 -m "workers.$BACKEND.server" |& tee -a "$PYWORKER_LOG") &
python3 -m "workers.$BACKEND.server" |& tee -a "$PYWORKER_LOG"
PY_STATUS=${PIPESTATUS[0]}
set -e
if [ "${PY_STATUS}" -ne 0 ]; then
echo "PyWorker exited with status ${PY_STATUS}; notifying autoscaler..."
ERROR_MSG="PyWorker exited: code ${PY_STATUS}"
MTOKEN="${MASTER_TOKEN:-}"
VERSION="${PYWORKER_VERSION:-0}"
IFS=',' read -r -a REPORT_ADDRS <<< "${REPORT_ADDR}"
for addr in "${REPORT_ADDRS[@]}"; do
curl -sS -X POST -H 'Content-Type: application/json' \
-d "$(cat <<JSON
{
"id": ${CONTAINER_ID:-0},
"mtoken": "${MTOKEN}",
"version": "${VERSION}",
"loadtime": 0,
"new_load": 0,
"cur_load": 0,
"rej_load": 0,
"max_perf": 0,
"cur_perf": 0,
"error_msg": "${ERROR_MSG}",
"num_requests_working": 0,
"num_requests_recieved": 0,
"additional_disk_usage": 0,
"working_request_idxs": [],
"cur_capacity": 0,
"max_capacity": 0,
"url": "${URL}"
}
JSON
)" "${addr%/}/worker_status/" || true
done
fi
echo "launching PyWorker server done" echo "launching PyWorker server done"