+327
@@ -0,0 +1,327 @@
|
||||
import os
|
||||
import json
|
||||
import time
|
||||
import base64
|
||||
import subprocess
|
||||
import dataclasses
|
||||
import logging
|
||||
from asyncio import wait, sleep, gather, Semaphore, FIRST_COMPLETED, create_task
|
||||
from typing import Tuple, Awaitable, NoReturn, List, Union, Callable
|
||||
from functools import cached_property
|
||||
|
||||
from anyio import open_file
|
||||
from aiohttp import web, ClientResponse, ClientSession, ClientConnectorError
|
||||
|
||||
import requests
|
||||
from Crypto.Signature import pkcs1_15
|
||||
from Crypto.Hash import SHA256
|
||||
from Crypto.PublicKey import RSA
|
||||
|
||||
from lib.metrics import Metrics
|
||||
from lib.data_types import (
|
||||
AuthData,
|
||||
EndpointHandler,
|
||||
LogAction,
|
||||
ApiPayload_T,
|
||||
JsonDataException,
|
||||
)
|
||||
|
||||
MSG_HISTORY_LEN = 100
|
||||
log = logging.getLogger(__file__)
|
||||
|
||||
# defines the minimum wait time between sending updates to autoscaler
|
||||
LOG_POLL_INTERVAL = 0.1
|
||||
BENCHMARK_INDICATOR_FILE = ".has_benchmark"
|
||||
|
||||
|
||||
@dataclasses.dataclass
|
||||
class Backend:
|
||||
"""
|
||||
This class is responsible for:
|
||||
1. Tailing logs and updating load time metrics
|
||||
2. Taking an EndpointHandler alongside incoming payload, preparing a json to be sent to the model, and
|
||||
sending the request. It also updates metrics as it makes those requests.
|
||||
3. Running a benchmark from an EndpointHandler
|
||||
"""
|
||||
|
||||
model_server_url: str
|
||||
model_log_file: str
|
||||
allow_parallel_requests: bool
|
||||
benchmark_handler: (
|
||||
EndpointHandler # this endpoint handler will be used for benchmarking
|
||||
)
|
||||
log_actions: List[Tuple[LogAction, str]]
|
||||
reqnum = -1
|
||||
msg_history = []
|
||||
sem: Semaphore = dataclasses.field(default_factory=Semaphore)
|
||||
|
||||
def __post_init__(self):
|
||||
|
||||
def fetch_public_key():
|
||||
command = ["curl", "-X", "GET", "https://run.vast.ai/pubkey/"]
|
||||
result = subprocess.check_output(command, universal_newlines=True)
|
||||
log.debug("public key:")
|
||||
log.debug(result)
|
||||
key = None
|
||||
for _ in range(5):
|
||||
try:
|
||||
key = RSA.import_key(result)
|
||||
break
|
||||
except ValueError as e:
|
||||
log.debug(f"Error downloading key: {e}")
|
||||
time.sleep(15)
|
||||
return key
|
||||
|
||||
###########
|
||||
|
||||
self.PUBLIC_KEY = fetch_public_key()
|
||||
self.metrics = Metrics()
|
||||
|
||||
@cached_property
|
||||
def session(self):
|
||||
log.debug(f"starting session with {self.model_server_url}")
|
||||
return ClientSession(self.model_server_url)
|
||||
|
||||
def create_handler(
|
||||
self,
|
||||
handler: EndpointHandler[ApiPayload_T],
|
||||
) -> Callable[[web.Request], Awaitable[Union[web.Response, web.StreamResponse]]]:
|
||||
async def handler_fn(
|
||||
request: web.Request,
|
||||
) -> Union[web.Response, web.StreamResponse]:
|
||||
return await self.__handle_request(handler=handler, request=request)
|
||||
|
||||
return handler_fn
|
||||
|
||||
#######################################Private#######################################
|
||||
async def __handle_request(
|
||||
self,
|
||||
handler: EndpointHandler[ApiPayload_T],
|
||||
request: web.Request,
|
||||
) -> Union[web.Response, web.StreamResponse]:
|
||||
"""use this function to forward requests to the model endpoint"""
|
||||
try:
|
||||
data = await request.json()
|
||||
auth_data, payload = handler.get_data_from_request(data)
|
||||
except JsonDataException as e:
|
||||
return web.json_response(data=e.message, status=422)
|
||||
except json.JSONDecodeError:
|
||||
return web.json_response(dict(error="invalid JSON"), status=422)
|
||||
workload = payload.count_workload()
|
||||
|
||||
async def cancel_api_call_if_disconnected() -> web.Response:
|
||||
await request.wait_for_disconnection()
|
||||
log.debug(f"request with reqnum: {auth_data.reqnum} was canceled")
|
||||
self.metrics._request_canceled(workload=workload, reqnum=auth_data.reqnum)
|
||||
return web.Response(status=500)
|
||||
|
||||
async def make_request() -> Union[web.Response, web.StreamResponse]:
|
||||
log.debug(f"got request, {auth_data.reqnum}")
|
||||
self.metrics._request_start(workload=workload, reqnum=auth_data.reqnum)
|
||||
if self.allow_parallel_requests is False:
|
||||
log.debug(f"Waiting to aquire Sem for reqnum:{auth_data.reqnum}")
|
||||
await self.sem.acquire()
|
||||
log.debug(
|
||||
f"Sem acquired for reqnum:{auth_data.reqnum}, starting request..."
|
||||
)
|
||||
else:
|
||||
log.debug(f"Starting request for reqnum:{auth_data.reqnum}")
|
||||
try:
|
||||
start_time = time.time()
|
||||
response = await self.__call_api(handler=handler, payload=payload)
|
||||
status_code = response.status
|
||||
log.debug(
|
||||
" ".join(
|
||||
[
|
||||
f"request with reqnum:{auth_data.reqnum}",
|
||||
f"returned status code: {status_code},",
|
||||
]
|
||||
)
|
||||
)
|
||||
res = await handler.generate_client_response(request, response)
|
||||
self.metrics._request_end(
|
||||
workload=workload,
|
||||
req_response_time=time.time() - start_time,
|
||||
reqnum=auth_data.reqnum,
|
||||
)
|
||||
return res
|
||||
except requests.exceptions.RequestException as e:
|
||||
log.debug(f"[backend] Request error: {e}")
|
||||
self.metrics._request_errored(
|
||||
workload=workload, reqnum=auth_data.reqnum
|
||||
)
|
||||
return web.Response(status=500)
|
||||
finally:
|
||||
self.sem.release()
|
||||
|
||||
###########
|
||||
|
||||
if self.__check_signature(auth_data) is False:
|
||||
return web.Response(status=401)
|
||||
|
||||
try:
|
||||
done, pending = await wait(
|
||||
[
|
||||
create_task(make_request()),
|
||||
create_task(cancel_api_call_if_disconnected()),
|
||||
],
|
||||
return_when=FIRST_COMPLETED,
|
||||
)
|
||||
[task.cancel() for task in pending]
|
||||
return done.pop().result()
|
||||
except Exception as e:
|
||||
log.debug(f"Exception in main handler loop {e}")
|
||||
return web.Response(status=500)
|
||||
|
||||
async def _start_tracking(self) -> None:
|
||||
await gather(self.__read_logs(), self.metrics._send_metrics_loop())
|
||||
|
||||
def backend_errored(self, msg: str) -> None:
|
||||
self.metrics._model_errored(msg)
|
||||
|
||||
async def __call_api(
|
||||
self, handler: EndpointHandler[ApiPayload_T], payload: ApiPayload_T
|
||||
) -> ClientResponse:
|
||||
api_payload = payload.generate_payload_json()
|
||||
log.debug(f"posting to endpoint: '{handler.endpoint}', payload: {api_payload}")
|
||||
return await self.session.post(url=handler.endpoint, json=api_payload)
|
||||
|
||||
def __check_signature(self, auth_data: AuthData) -> bool:
|
||||
def verify_signature(message, signature):
|
||||
if self.PUBLIC_KEY is None:
|
||||
log.debug(f"No Public Key!")
|
||||
return False
|
||||
|
||||
h = SHA256.new(message.encode())
|
||||
try:
|
||||
pkcs1_15.new(self.PUBLIC_KEY).verify(h, base64.b64decode(signature))
|
||||
return True
|
||||
except (ValueError, TypeError):
|
||||
return False
|
||||
|
||||
message = {
|
||||
key: value
|
||||
for (key, value) in (dataclasses.asdict(auth_data).items())
|
||||
if key != "signature"
|
||||
}
|
||||
if auth_data.reqnum < (self.reqnum - MSG_HISTORY_LEN):
|
||||
log.debug(
|
||||
f"reqnum failure, got {auth_data.reqnum}, current_reqnum: {self.reqnum}"
|
||||
)
|
||||
return False
|
||||
elif message in self.msg_history:
|
||||
log.debug(f"message: {message} already in message history")
|
||||
return False
|
||||
elif verify_signature(json.dumps(message, indent=4), auth_data.signature):
|
||||
self.reqnum = max(auth_data.reqnum, self.reqnum)
|
||||
self.msg_history.append(message)
|
||||
self.msg_history = self.msg_history[-MSG_HISTORY_LEN:]
|
||||
return True
|
||||
else:
|
||||
log.debug(
|
||||
f"signature verification failed, sig:{auth_data.signature}, message: {message}"
|
||||
)
|
||||
return False
|
||||
|
||||
async def __read_logs(self) -> Awaitable[NoReturn]:
|
||||
|
||||
async def run_benchmark() -> float:
|
||||
log.debug("starting benchmark")
|
||||
try:
|
||||
with open(BENCHMARK_INDICATOR_FILE, "r") as f:
|
||||
log.debug("already ran benchmark")
|
||||
# trigger model load
|
||||
payload = self.benchmark_handler.make_benchmark_payload()
|
||||
_ = await self.__call_api(
|
||||
handler=self.benchmark_handler, payload=payload
|
||||
)
|
||||
return float(f.readline())
|
||||
except FileNotFoundError:
|
||||
pass
|
||||
max_throughput = 0
|
||||
last_throughput = 0
|
||||
sum_throughput = 0
|
||||
for run in range(self.benchmark_handler.benchmark_runs + 1):
|
||||
start = time.time()
|
||||
payload = self.benchmark_handler.make_benchmark_payload()
|
||||
res = await self.__call_api(
|
||||
handler=self.benchmark_handler, payload=payload
|
||||
)
|
||||
data = await res.json()
|
||||
time_elapsed = time.time() - start
|
||||
# first run triggers one-time loading of the model which is very slow, so we skip counting it
|
||||
if run == 0:
|
||||
continue
|
||||
else:
|
||||
workload = payload.count_workload()
|
||||
last_throughput = workload / time_elapsed
|
||||
sum_throughput += last_throughput
|
||||
max_throughput = max(max_throughput, last_throughput)
|
||||
log.debug(
|
||||
"\n".join(
|
||||
[
|
||||
"#" * 60,
|
||||
f"Run: {run}, workload: {workload} time_elapsed: {time_elapsed}, throughput: {last_throughput}",
|
||||
"",
|
||||
f"response: {data}",
|
||||
"#" * 60,
|
||||
]
|
||||
)
|
||||
)
|
||||
average_throughput = sum_throughput / self.benchmark_handler.benchmark_runs
|
||||
log.debug(
|
||||
f"benchmark result: avg {average_throughput} workload per second, max {max_throughput}"
|
||||
)
|
||||
# save max_throughput so we don't have to run benchmark again on restart of cold instances
|
||||
with open(BENCHMARK_INDICATOR_FILE, "w") as f:
|
||||
f.write(str(max_throughput))
|
||||
return max_throughput
|
||||
|
||||
async def handle_log_line(log_line: str) -> None:
|
||||
"""
|
||||
Implement this function to handle each log line for your model.
|
||||
This function should mutate self.system_metrics and self.model_metrics
|
||||
"""
|
||||
for action, msg in self.log_actions:
|
||||
match action:
|
||||
case LogAction.ModelLoaded if msg in log_line:
|
||||
log.debug(
|
||||
f"Got log line indicating model is loaded: {log_line}"
|
||||
)
|
||||
# some backends need a few seconds after logging successful startup before
|
||||
# they can begin accepting requests
|
||||
await sleep(5)
|
||||
try:
|
||||
max_throughput = await run_benchmark()
|
||||
self.metrics._model_loaded(
|
||||
max_throughput=max_throughput,
|
||||
)
|
||||
except ClientConnectorError as e:
|
||||
log.debug(
|
||||
f"failed to connect to comfyui api during benchmark"
|
||||
)
|
||||
self.backend_errored(str(e))
|
||||
case LogAction.ModelError if msg in log_line:
|
||||
log.debug(f"Got log line indicating error: {log_line}")
|
||||
self.backend_errored(msg)
|
||||
break
|
||||
case LogAction.Info if msg in log_line:
|
||||
log.debug(f"Info from model logs: {log_line}")
|
||||
|
||||
async def tail_log():
|
||||
log.debug(f"tailing file: {self.model_log_file}")
|
||||
async with await open_file(self.model_log_file) as f:
|
||||
while True:
|
||||
line = await f.readline()
|
||||
if line:
|
||||
await handle_log_line(line.rstrip())
|
||||
else:
|
||||
time.sleep(LOG_POLL_INTERVAL)
|
||||
|
||||
###########
|
||||
|
||||
while True:
|
||||
if os.path.isfile(self.model_log_file) is True:
|
||||
return await tail_log()
|
||||
else:
|
||||
await sleep(1)
|
||||
@@ -0,0 +1,269 @@
|
||||
import time
|
||||
import logging
|
||||
from dataclasses import dataclass, field
|
||||
from enum import Enum
|
||||
from abc import ABC, abstractmethod
|
||||
from typing import Dict, Any, Union, Tuple, Optional, Set, TypeVar, Generic, Type
|
||||
from aiohttp import web, ClientResponse
|
||||
import inspect
|
||||
|
||||
import psutil
|
||||
|
||||
|
||||
"""
|
||||
type variable representing an incoming payload to pyworker that will used to calculate load and will then
|
||||
be forwarded to the model
|
||||
"""
|
||||
|
||||
log = logging.getLogger(__file__)
|
||||
|
||||
|
||||
class JsonDataException(Exception):
|
||||
def __init__(self, json_msg: Dict[str, Any]):
|
||||
self.message = json_msg
|
||||
|
||||
|
||||
@dataclass
|
||||
class ApiPayload(ABC):
|
||||
|
||||
@classmethod
|
||||
@abstractmethod
|
||||
def for_test(cls) -> "ApiPayload":
|
||||
"""defines how create a payload for load testing"""
|
||||
pass
|
||||
|
||||
@abstractmethod
|
||||
def generate_payload_json(self) -> Dict[str, Any]:
|
||||
"""defines how to convert an ApiPayload to JSON that will be sent to model API"""
|
||||
pass
|
||||
|
||||
@abstractmethod
|
||||
def count_workload(self) -> float:
|
||||
"""defines how to calculate workload for a payload"""
|
||||
pass
|
||||
|
||||
@classmethod
|
||||
@abstractmethod
|
||||
def from_json_msg(cls, json_msg: Dict[str, Any]) -> "ApiPayload":
|
||||
"""
|
||||
defines how to create an API payload from a JSON message,
|
||||
it should throw an JsonDataException if there are issues with some fields
|
||||
or they are missing in the format of
|
||||
{
|
||||
"field": "error msg"
|
||||
}
|
||||
"""
|
||||
pass
|
||||
|
||||
|
||||
@dataclass
|
||||
class AuthData:
|
||||
"""data used to authenticate requester"""
|
||||
|
||||
signature: str
|
||||
cost: str
|
||||
endpoint: str
|
||||
reqnum: int
|
||||
url: str
|
||||
|
||||
@classmethod
|
||||
def from_json_msg(cls, json_msg: Dict[str, Any]):
|
||||
errors = {}
|
||||
for param in inspect.signature(cls).parameters:
|
||||
if param not in json_msg:
|
||||
errors[param] = "missing parameter"
|
||||
if errors:
|
||||
raise JsonDataException(errors)
|
||||
return cls(
|
||||
**{
|
||||
k: v
|
||||
for k, v in json_msg.items()
|
||||
if k in inspect.signature(cls).parameters
|
||||
}
|
||||
)
|
||||
|
||||
|
||||
ApiPayload_T = TypeVar("ApiPayload_T", bound=ApiPayload)
|
||||
|
||||
|
||||
@dataclass
|
||||
class EndpointHandler(ABC, Generic[ApiPayload_T]):
|
||||
"""
|
||||
Each model endpoint will have a handler responsible for counting workload from the incoming ApiPayload
|
||||
and converting it to json to be forwarded to model API
|
||||
"""
|
||||
|
||||
benchmark_runs: int = 8
|
||||
benchmark_words: int = 100
|
||||
|
||||
@property
|
||||
@abstractmethod
|
||||
def endpoint(self) -> str:
|
||||
"""the endpoint on the model API"""
|
||||
pass
|
||||
|
||||
@classmethod
|
||||
@abstractmethod
|
||||
def payload_cls(cls) -> Type[ApiPayload_T]:
|
||||
"""ApiPayload class"""
|
||||
pass
|
||||
|
||||
@abstractmethod
|
||||
def make_benchmark_payload(self) -> ApiPayload_T:
|
||||
"""defines how to create an ApiPayload for benchmarking."""
|
||||
pass
|
||||
|
||||
@abstractmethod
|
||||
async def generate_client_response(
|
||||
self, client_request: web.Request, model_response: ClientResponse
|
||||
) -> Union[web.Response, web.StreamResponse]:
|
||||
"""
|
||||
defines how to convert a model API response to a response to PyWorker client
|
||||
"""
|
||||
pass
|
||||
|
||||
@classmethod
|
||||
def get_data_from_request(
|
||||
cls, req_data: Dict[str, Any]
|
||||
) -> Tuple[AuthData, ApiPayload_T]:
|
||||
errors = {}
|
||||
auth_data = payload = None
|
||||
try:
|
||||
if "auth_data" in req_data:
|
||||
auth_data = AuthData.from_json_msg(req_data["auth_data"])
|
||||
else:
|
||||
errors["auth_data"] = "field missing"
|
||||
except JsonDataException as e:
|
||||
errors["auth_data"] = e.message
|
||||
try:
|
||||
if "payload" in req_data:
|
||||
payload = cls.payload_cls().from_json_msg(req_data["payload"])
|
||||
else:
|
||||
errors["payload"] = "field missing"
|
||||
except JsonDataException as e:
|
||||
errors["payload"] = e.message
|
||||
if errors:
|
||||
raise JsonDataException(errors)
|
||||
if auth_data and payload:
|
||||
return (auth_data, payload)
|
||||
else:
|
||||
raise Exception("error deserializing request data")
|
||||
|
||||
|
||||
@dataclass
|
||||
class SystemMetrics:
|
||||
"""General system metrics"""
|
||||
|
||||
model_loading_start: float
|
||||
model_loading_time: Union[float, None]
|
||||
last_disk_usage: float
|
||||
additional_disk_usage: float
|
||||
model_is_loaded: bool
|
||||
|
||||
@staticmethod
|
||||
def get_disk_usage_GB():
|
||||
return psutil.disk_usage("/").used / (2**30) # want units of GB
|
||||
|
||||
@classmethod
|
||||
def empty(cls):
|
||||
return cls(
|
||||
model_loading_start=time.time(),
|
||||
model_loading_time=None,
|
||||
last_disk_usage=SystemMetrics.get_disk_usage_GB(),
|
||||
additional_disk_usage=0.0,
|
||||
model_is_loaded=False,
|
||||
)
|
||||
|
||||
def update_disk_usage(self):
|
||||
disk_usage = SystemMetrics.get_disk_usage_GB()
|
||||
self.additional_disk_usage = disk_usage - self.last_disk_usage
|
||||
self.last_disk_usage = disk_usage
|
||||
|
||||
def reset(self):
|
||||
# autoscaler excepts model_loading_time to be populated only once, when the instance has
|
||||
# finished benchmarking and is ready to receive requests. This applies to restarted instances
|
||||
# as well: they should send model_loading_time once when they are done loading
|
||||
self.model_loading_time = None
|
||||
|
||||
|
||||
@dataclass
|
||||
class ModelMetrics:
|
||||
"""Model specific metrics"""
|
||||
|
||||
# these are reset after being sent to autoscaler
|
||||
workload_served: float
|
||||
workload_received: float
|
||||
workload_cancelled: float
|
||||
workload_errored: float
|
||||
workload_pending: float
|
||||
# these are not
|
||||
cur_perf: float
|
||||
error_msg: Optional[str]
|
||||
max_throughput: float
|
||||
requests_recieved: Set[int] = field(default_factory=set)
|
||||
requests_working: Set[int] = field(default_factory=set)
|
||||
|
||||
@classmethod
|
||||
def empty(cls):
|
||||
return cls(
|
||||
workload_pending=0.0,
|
||||
workload_served=0.0,
|
||||
workload_cancelled=0.0,
|
||||
workload_errored=0.0,
|
||||
cur_perf=0.0,
|
||||
workload_received=0.0,
|
||||
error_msg=None,
|
||||
max_throughput=0.0,
|
||||
)
|
||||
|
||||
@property
|
||||
def workload_processing(self) -> float:
|
||||
return max(self.workload_received - self.workload_cancelled, 0.0)
|
||||
|
||||
def set_errored(self, error_msg):
|
||||
self.reset()
|
||||
self.error_msg = error_msg
|
||||
|
||||
def reset(self):
|
||||
self.workload_served = 0
|
||||
self.workload_received = 0
|
||||
self.workload_cancelled = 0
|
||||
self.workload_errored = 0
|
||||
|
||||
|
||||
@dataclass
|
||||
class AutoScalaerData:
|
||||
"""Data that is reported to autoscaler"""
|
||||
|
||||
id: int
|
||||
loadtime: float
|
||||
cur_load: float
|
||||
error_msg: str
|
||||
max_perf: float
|
||||
cur_perf: float
|
||||
cur_capacity: float
|
||||
max_capacity: float
|
||||
num_requests_working: int
|
||||
num_requests_recieved: int
|
||||
additional_disk_usage: float
|
||||
url: str
|
||||
|
||||
|
||||
class LogAction(Enum):
|
||||
"""
|
||||
These actions tell the backend what a log value means, for example:
|
||||
actions [
|
||||
# this marks the model server as loaded
|
||||
(LogAction.ModelLoaded, "Starting server"),
|
||||
# these mark the model server as errored
|
||||
(LogAction.ModelError, "Exception loading model"),
|
||||
(LogAction.ModelError, "Server failed to bind to port"),
|
||||
# this tells the backend to print any logs containing the string into its own logs
|
||||
# which are visible in the vast console instance logs
|
||||
(LogAction.Info, "Starting model download"),
|
||||
]
|
||||
"""
|
||||
|
||||
ModelLoaded = 1
|
||||
ModelError = 2
|
||||
Info = 3
|
||||
+153
@@ -0,0 +1,153 @@
|
||||
import os
|
||||
import time
|
||||
import logging
|
||||
import json
|
||||
from asyncio import sleep
|
||||
from dataclasses import dataclass, asdict, field
|
||||
from functools import cache
|
||||
from urllib.parse import urljoin
|
||||
|
||||
import requests
|
||||
|
||||
from lib.data_types import AutoScalaerData, SystemMetrics, ModelMetrics
|
||||
from typing import Awaitable, NoReturn, List
|
||||
|
||||
METRICS_UPDATE_INTERVAL = 1
|
||||
|
||||
log = logging.getLogger(__file__)
|
||||
|
||||
|
||||
@cache
|
||||
def get_url() -> str:
|
||||
use_ssl = os.environ.get("USE_SSL", "false") == "true"
|
||||
worker_port = os.environ[f"VAST_TCP_PORT_{os.environ['WORKER_PORT']}"]
|
||||
public_ip = os.environ["PUBLIC_IPADDR"]
|
||||
return f"http{'s' if use_ssl else ''}://{public_ip}:{worker_port}"
|
||||
|
||||
|
||||
@dataclass
|
||||
class Metrics:
|
||||
last_metric_update: float = 0.0
|
||||
update_pending: bool = False
|
||||
id: int = field(default_factory=lambda: int(os.environ["CONTAINER_ID"]))
|
||||
report_addr: List[str] = field(
|
||||
default_factory=lambda: os.environ["REPORT_ADDR"].split(",")
|
||||
)
|
||||
url: str = field(default_factory=get_url)
|
||||
system_metrics: SystemMetrics = field(default_factory=SystemMetrics.empty)
|
||||
model_metrics: ModelMetrics = field(default_factory=ModelMetrics.empty)
|
||||
|
||||
def _request_start(self, workload: float, reqnum: int) -> None:
|
||||
"""
|
||||
this function is called prior to forwarding a request to a model API.
|
||||
"""
|
||||
log.debug("request start")
|
||||
self.model_metrics.workload_pending += workload
|
||||
self.model_metrics.workload_received += workload
|
||||
self.model_metrics.requests_recieved.add(reqnum)
|
||||
self.model_metrics.requests_working.add(reqnum)
|
||||
|
||||
def _request_end(
|
||||
self, workload: float, req_response_time: float, reqnum: int
|
||||
) -> None:
|
||||
"""
|
||||
this function is called after a response from model API is received.
|
||||
"""
|
||||
self.model_metrics.workload_served += workload
|
||||
self.model_metrics.workload_pending -= workload
|
||||
self.model_metrics.requests_working.discard(reqnum)
|
||||
self.model_metrics.cur_perf = workload / req_response_time
|
||||
self.update_pending = True
|
||||
|
||||
def _request_errored(self, workload: float, reqnum: int) -> None:
|
||||
"""
|
||||
this function is called if model API returns an error
|
||||
"""
|
||||
self.model_metrics.workload_pending -= workload
|
||||
self.model_metrics.workload_errored += workload
|
||||
self.model_metrics.requests_working.discard(reqnum)
|
||||
|
||||
def _request_canceled(self, workload: float, reqnum: int) -> None:
|
||||
"""
|
||||
this function is called if client drops connection before model API has responded
|
||||
"""
|
||||
self.model_metrics.workload_pending -= workload
|
||||
self.model_metrics.workload_cancelled += workload
|
||||
self.model_metrics.requests_working.discard(reqnum)
|
||||
|
||||
async def _send_metrics_loop(self) -> Awaitable[NoReturn]:
|
||||
while True:
|
||||
await sleep(METRICS_UPDATE_INTERVAL)
|
||||
elapsed = time.time() - self.last_metric_update
|
||||
if self.system_metrics.model_is_loaded is False and elapsed >= 10:
|
||||
log.debug(f"sending loading model metrics after {int(elapsed)}s wait")
|
||||
self.__send_metrics_and_reset(elapsed)
|
||||
elif self.update_pending or elapsed > 10:
|
||||
log.debug(f"sending loaded model metrics after {int(elapsed)}s wait")
|
||||
self.__send_metrics_and_reset(elapsed)
|
||||
|
||||
def _model_loaded(self, max_throughput: float) -> None:
|
||||
self.system_metrics.model_loading_time = (
|
||||
time.time() - self.system_metrics.model_loading_start
|
||||
)
|
||||
self.system_metrics.model_is_loaded = True
|
||||
self.model_metrics.max_throughput = max_throughput
|
||||
|
||||
def _model_errored(self, error_msg: str) -> None:
|
||||
self.model_metrics.set_errored(error_msg)
|
||||
self.system_metrics.model_is_loaded = True
|
||||
|
||||
#######################################Private#######################################
|
||||
|
||||
def __send_metrics_and_reset(self, elapsed):
|
||||
|
||||
def compute_autoscaler_data() -> AutoScalaerData:
|
||||
return AutoScalaerData(
|
||||
id=self.id,
|
||||
loadtime=(self.system_metrics.model_loading_time or 0.0),
|
||||
cur_load=(self.model_metrics.workload_processing / elapsed),
|
||||
max_perf=self.model_metrics.max_throughput,
|
||||
cur_perf=self.model_metrics.cur_perf,
|
||||
error_msg=self.model_metrics.error_msg or "",
|
||||
num_requests_working=len(self.model_metrics.requests_working),
|
||||
num_requests_recieved=len(self.model_metrics.requests_recieved),
|
||||
additional_disk_usage=self.system_metrics.additional_disk_usage,
|
||||
cur_capacity=0,
|
||||
max_capacity=0,
|
||||
url=self.url,
|
||||
)
|
||||
|
||||
def send_data(report_addr: str) -> None:
|
||||
data = compute_autoscaler_data()
|
||||
full_path = urljoin(report_addr, "/worker_status/")
|
||||
log.debug(
|
||||
"\n".join(
|
||||
[
|
||||
"#" * 60,
|
||||
f"sending data to autoscaler",
|
||||
f"{json.dumps((asdict(data)), indent=2)}",
|
||||
"#" * 60,
|
||||
]
|
||||
)
|
||||
)
|
||||
for attempt in range(1, 4):
|
||||
try:
|
||||
requests.post(full_path, json=asdict(data), timeout=1)
|
||||
break
|
||||
except requests.Timeout:
|
||||
log.debug(f"autoscaler status update timed out")
|
||||
except Exception as e:
|
||||
log.debug(f"autoscaler status update failed with error: {e}")
|
||||
time.sleep(2)
|
||||
log.debug(f"retrying autoscaler status update, attempt: {attempt}")
|
||||
|
||||
###########
|
||||
|
||||
self.system_metrics.update_disk_usage()
|
||||
|
||||
for report_addr in self.report_addr:
|
||||
send_data(report_addr)
|
||||
self.update_pending = False
|
||||
self.model_metrics.reset()
|
||||
self.system_metrics.reset()
|
||||
self.last_metric_update = time.time()
|
||||
@@ -0,0 +1,40 @@
|
||||
import os
|
||||
import logging
|
||||
from typing import List
|
||||
import ssl
|
||||
from asyncio import run, gather
|
||||
|
||||
|
||||
from lib.backend import Backend
|
||||
from aiohttp import web
|
||||
|
||||
log = logging.getLogger(__file__)
|
||||
|
||||
|
||||
def start_server(backend: Backend, routes: List[web.RouteDef], **kwargs):
|
||||
log.debug("getting certificate...")
|
||||
use_ssl = os.environ.get("USE_SSL", "false") == "true"
|
||||
if use_ssl is True:
|
||||
ssl_context = ssl.create_default_context(ssl.Purpose.CLIENT_AUTH)
|
||||
ssl_context.load_cert_chain(
|
||||
certfile="/etc/instance.crt",
|
||||
keyfile="/etc/instance.key",
|
||||
)
|
||||
else:
|
||||
ssl_context = None
|
||||
|
||||
async def main():
|
||||
log.debug("starting server...")
|
||||
app = web.Application()
|
||||
app.add_routes(routes)
|
||||
runner = web.AppRunner(app)
|
||||
await runner.setup()
|
||||
site = web.TCPSite(
|
||||
runner,
|
||||
ssl_context=ssl_context,
|
||||
port=int(os.environ["WORKER_PORT"]),
|
||||
**kwargs
|
||||
)
|
||||
await gather(site.start(), backend._start_tracking())
|
||||
|
||||
run(main())
|
||||
@@ -0,0 +1,267 @@
|
||||
import os
|
||||
import time
|
||||
import argparse
|
||||
from typing import Callable, List, Dict, Tuple, Dict, Any, Type
|
||||
from time import sleep
|
||||
import threading
|
||||
from enum import Enum
|
||||
from collections import Counter
|
||||
from dataclasses import dataclass, field, asdict
|
||||
from urllib.parse import urljoin
|
||||
|
||||
import requests
|
||||
|
||||
from lib.data_types import AuthData, ApiPayload
|
||||
|
||||
|
||||
class ClientStatus(Enum):
|
||||
FetchEndpoint = 1
|
||||
Generating = 2
|
||||
Done = 3
|
||||
Error = 4
|
||||
|
||||
|
||||
total_success = 0
|
||||
last_res = []
|
||||
stop_event = threading.Event()
|
||||
|
||||
start_time = time.time()
|
||||
test_args = argparse.ArgumentParser(description="Test inference endpoint")
|
||||
test_args.add_argument(
|
||||
"-k", dest="api_key", type=str, required=True, help="Your vast account API key"
|
||||
)
|
||||
test_args.add_argument(
|
||||
"-e",
|
||||
dest="endpoint_group_name",
|
||||
type=str,
|
||||
required=True,
|
||||
help="Endpoint group name",
|
||||
)
|
||||
test_args.add_argument(
|
||||
"-l",
|
||||
dest="server_url",
|
||||
action="store_const",
|
||||
const="http://localhost:8081",
|
||||
default="https://run.vast.ai",
|
||||
help="Call local autoscaler instead of prod, for dev use only",
|
||||
)
|
||||
|
||||
GetPayloadAndWorkload = Callable[[], Tuple[Dict[str, Any], float]]
|
||||
|
||||
|
||||
def print_truncate_res(res: str):
|
||||
if len(res) > 150:
|
||||
print(f"{res[:50]}....{res[-100:]}")
|
||||
else:
|
||||
print(res)
|
||||
|
||||
|
||||
@dataclass
|
||||
class ClientState:
|
||||
endpoint_group_name: str
|
||||
api_key: str
|
||||
server_url: str
|
||||
worker_endpoint: str
|
||||
payload: ApiPayload
|
||||
url: str = ""
|
||||
status: ClientStatus = ClientStatus.FetchEndpoint
|
||||
as_error: List[str] = field(default_factory=list)
|
||||
infer_error: List[str] = field(default_factory=list)
|
||||
conn_errors: Counter = field(default_factory=Counter)
|
||||
|
||||
def make_call(self):
|
||||
self.status = ClientStatus.FetchEndpoint
|
||||
route_payload = {
|
||||
"endpoint": self.endpoint_group_name,
|
||||
"api_key": self.api_key,
|
||||
"cost": self.payload.count_workload(),
|
||||
}
|
||||
response = requests.post(
|
||||
urljoin(self.server_url, "/route/"),
|
||||
json=route_payload,
|
||||
timeout=4,
|
||||
)
|
||||
if response.status_code != 200:
|
||||
self.as_error.append(
|
||||
f"code: {response.status_code}, body: {response.text}",
|
||||
)
|
||||
self.status = ClientStatus.Error
|
||||
return
|
||||
message = response.json()
|
||||
worker_address = message["url"]
|
||||
req_data = dict(
|
||||
payload=asdict(self.payload),
|
||||
auth_data=asdict(AuthData.from_json_msg(message)),
|
||||
)
|
||||
self.url = worker_address
|
||||
url = urljoin(worker_address, self.worker_endpoint)
|
||||
self.status = ClientStatus.Generating
|
||||
response = requests.post(
|
||||
url,
|
||||
json=req_data,
|
||||
)
|
||||
if response.status_code != 200:
|
||||
self.infer_error.append(
|
||||
f"code: {response.status_code}, body: {response.text}, url: {url}",
|
||||
)
|
||||
self.status = ClientStatus.Error
|
||||
return
|
||||
res = str(response.json())
|
||||
global total_success
|
||||
global last_res
|
||||
total_success += 1
|
||||
last_res.append(res)
|
||||
self.status = ClientStatus.Done
|
||||
|
||||
def simulate_user(self) -> None:
|
||||
try:
|
||||
self.make_call()
|
||||
except Exception as e:
|
||||
self.status = ClientStatus.Error
|
||||
_ = e
|
||||
self.conn_errors[self.url] += 1
|
||||
|
||||
|
||||
def print_state(clients: List[ClientState], num_clients: int) -> None:
|
||||
print("starting up...")
|
||||
sleep(2)
|
||||
center_size = 14
|
||||
global start_time
|
||||
while len(clients) < num_clients or (
|
||||
any(
|
||||
map(
|
||||
lambda client: client.status
|
||||
in [ClientStatus.FetchEndpoint, ClientStatus.Generating],
|
||||
clients,
|
||||
)
|
||||
)
|
||||
):
|
||||
sleep(0.5)
|
||||
os.system("clear")
|
||||
print(
|
||||
" | ".join(
|
||||
[member.name.center(center_size) for member in ClientStatus]
|
||||
+ [
|
||||
item.center(center_size)
|
||||
for item in [
|
||||
"urls",
|
||||
"as_error",
|
||||
"infer_error",
|
||||
"conn_error",
|
||||
"total_success",
|
||||
]
|
||||
]
|
||||
)
|
||||
)
|
||||
unique_urls = len(set([c.url for c in clients if c.url != ""]))
|
||||
as_errors = sum(
|
||||
map(
|
||||
lambda client: len(client.as_error),
|
||||
[client for client in clients],
|
||||
)
|
||||
)
|
||||
infer_errors = sum(
|
||||
map(
|
||||
lambda client: len(client.infer_error),
|
||||
[client for client in clients],
|
||||
)
|
||||
)
|
||||
conn_errors = sum([client.conn_errors for client in clients], start=Counter())
|
||||
conn_errors_str = ",".join(map(str, conn_errors.values())) or "0"
|
||||
elapsed = time.time() - start_time
|
||||
print(
|
||||
" | ".join(
|
||||
map(
|
||||
lambda item: str(item).center(center_size),
|
||||
[
|
||||
len(list(filter(lambda x: x.status == member, clients)))
|
||||
for member in ClientStatus
|
||||
]
|
||||
+ [
|
||||
unique_urls,
|
||||
as_errors,
|
||||
infer_errors,
|
||||
conn_errors_str,
|
||||
f"{total_success}({((total_success/elapsed) * 60):.2f}/minute)",
|
||||
],
|
||||
)
|
||||
)
|
||||
)
|
||||
if conn_errors:
|
||||
print("conn_errors:")
|
||||
for url, count in conn_errors.items():
|
||||
print(url.ljust(28), ": ", str(count))
|
||||
elapsed = time.time() - start_time
|
||||
print(f"\n elapsed: {int(elapsed // 60)}:{int(elapsed % 60)}")
|
||||
if last_res:
|
||||
for i, res in enumerate(last_res[-10:]):
|
||||
print_truncate_res(f"res #{1+i+max(len(last_res )-10,0)}: {res}")
|
||||
if stop_event.is_set():
|
||||
print("\n### waiting for existing connections to close ###")
|
||||
|
||||
|
||||
def run_test(
|
||||
num_requests: int,
|
||||
requests_per_second: int,
|
||||
endpoint_group_name: str,
|
||||
api_key: str,
|
||||
server_url: str,
|
||||
worker_endpoint: str,
|
||||
payload_cls: Type[ApiPayload],
|
||||
):
|
||||
threads = []
|
||||
|
||||
clients = []
|
||||
print_thread = threading.Thread(target=print_state, args=(clients, num_requests))
|
||||
print_thread.daemon = True # makes threads get killed on program exit
|
||||
print_thread.start()
|
||||
try:
|
||||
for _ in range(num_requests):
|
||||
client = ClientState(
|
||||
endpoint_group_name=endpoint_group_name,
|
||||
api_key=api_key,
|
||||
server_url=server_url,
|
||||
worker_endpoint=worker_endpoint,
|
||||
payload=payload_cls.for_test(),
|
||||
)
|
||||
clients.append(client)
|
||||
thread = threading.Thread(target=client.simulate_user, args=())
|
||||
threads.append(thread)
|
||||
thread.start()
|
||||
sleep(1 / requests_per_second)
|
||||
for thread in threads:
|
||||
thread.join()
|
||||
print("done spawning workers")
|
||||
except KeyboardInterrupt:
|
||||
stop_event.set()
|
||||
|
||||
|
||||
def test_load_cmd(
|
||||
payload_cls: Type[ApiPayload], endpoint: str, arg_parser: argparse.ArgumentParser
|
||||
):
|
||||
arg_parser.add_argument(
|
||||
"-n",
|
||||
dest="num_requests",
|
||||
type=int,
|
||||
required=True,
|
||||
help="total number of requests",
|
||||
)
|
||||
arg_parser.add_argument(
|
||||
"-rps",
|
||||
dest="requests_per_second",
|
||||
type=float,
|
||||
required=True,
|
||||
help="requests per second",
|
||||
)
|
||||
args = arg_parser.parse_args()
|
||||
if hasattr(args, "comfy_model"):
|
||||
os.environ["COMFY_MODEL"] = args.comfy_model
|
||||
run_test(
|
||||
num_requests=args.num_requests,
|
||||
requests_per_second=args.requests_per_second,
|
||||
api_key=args.api_key,
|
||||
server_url=args.server_url,
|
||||
endpoint_group_name=args.endpoint_group_name,
|
||||
worker_endpoint=endpoint,
|
||||
payload_cls=payload_cls,
|
||||
)
|
||||
Reference in New Issue
Block a user