Compare commits

...

4 Commits

10 changed files with 104 additions and 65 deletions
+38 -36
View File
@@ -126,7 +126,7 @@ class Backend:
async def cancel_api_call_if_disconnected() -> web.Response: async def cancel_api_call_if_disconnected() -> web.Response:
await request.wait_for_disconnection() await request.wait_for_disconnection()
log.debug(f"request with reqnum: {auth_data.reqnum} was canceled") log.debug(f"request with reqnum: {auth_data.reqnum} was canceled")
self.metrics._request_canceled(workload=workload, reqnum=auth_data.reqnum) self.metrics._request_canceled(workload=workload)
return web.Response(status=500) return web.Response(status=500)
async def make_request() -> Union[web.Response, web.StreamResponse]: async def make_request() -> Union[web.Response, web.StreamResponse]:
@@ -141,7 +141,6 @@ class Backend:
else: else:
log.debug(f"Starting request for reqnum:{auth_data.reqnum}") log.debug(f"Starting request for reqnum:{auth_data.reqnum}")
try: try:
start_time = time.time()
response = await self.__call_api(handler=handler, payload=payload) response = await self.__call_api(handler=handler, payload=payload)
status_code = response.status status_code = response.status
log.debug( log.debug(
@@ -153,19 +152,17 @@ class Backend:
) )
) )
res = await handler.generate_client_response(request, response) res = await handler.generate_client_response(request, response)
self.metrics._request_end( self.metrics._request_success(workload=workload)
workload=workload,
req_response_time=time.time() - start_time,
reqnum=auth_data.reqnum,
)
return res return res
except requests.exceptions.RequestException as e: except requests.exceptions.RequestException as e:
log.debug(f"[backend] Request error: {e}") log.debug(f"[backend] Request error: {e}")
self.metrics._request_errored( self.metrics._request_errored(workload=workload)
workload=workload, reqnum=auth_data.reqnum
)
return web.Response(status=500) return web.Response(status=500)
finally: finally:
self.metrics._request_end(
workload=workload,
reqnum=auth_data.reqnum,
)
self.sem.release() self.sem.release()
########### ###########
@@ -186,12 +183,6 @@ class Backend:
except Exception as e: except Exception as e:
log.debug(f"Exception in main handler loop {e}") log.debug(f"Exception in main handler loop {e}")
return web.Response(status=500) return web.Response(status=500)
finally:
if request.task.cancelled():
log.debug(f"request with reqnum: {auth_data.reqnum} was canceled")
self.metrics._request_canceled(
workload=workload, reqnum=auth_data.reqnum
)
async def __healthcheck(self): async def __healthcheck(self):
health_check_url = self.benchmark_handler.healthcheck_endpoint health_check_url = self.benchmark_handler.healthcheck_endpoint
@@ -289,41 +280,52 @@ class Backend:
return float(f.readline()) return float(f.readline())
except FileNotFoundError: except FileNotFoundError:
pass pass
max_throughput = 0
last_throughput = 0 log.debug("Initial run to trigger model loading...")
sum_throughput = 0
for run in range(self.benchmark_handler.benchmark_runs + 1):
start = time.time()
payload = self.benchmark_handler.make_benchmark_payload() payload = self.benchmark_handler.make_benchmark_payload()
res = await self.__call_api( await self.__call_api(handler=self.benchmark_handler, payload=payload)
handler=self.benchmark_handler, payload=payload
max_throughput = 0
sum_throughput = 0
concurrent_requests = 10 if self.allow_parallel_requests else 1
for run in range(1, self.benchmark_handler.benchmark_runs + 1):
start = time.time()
tasks = []
total_workload = 0
for _ in range(concurrent_requests):
payload = self.benchmark_handler.make_benchmark_payload()
total_workload += payload.count_workload()
tasks.append(
self.__call_api(handler=self.benchmark_handler, payload=payload)
) )
data = await res.json()
responses = await gather(*tasks)
time_elapsed = time.time() - start time_elapsed = time.time() - start
# first run triggers one-time loading of the model which is very slow, so we skip counting it
if run == 0: throughput = total_workload / time_elapsed
continue sum_throughput += throughput
else: max_throughput = max(max_throughput, throughput)
workload = payload.count_workload()
last_throughput = workload / time_elapsed # Log results for debugging
sum_throughput += last_throughput
max_throughput = max(max_throughput, last_throughput)
log.debug( log.debug(
"\n".join( "\n".join(
[ [
"#" * 60, "#" * 60,
f"Run: {run}, workload: {workload} time_elapsed: {time_elapsed}, throughput: {last_throughput}", f"Run: {run}, concurrent_requests: {concurrent_requests}",
"", f"Total workload: {total_workload}, time_elapsed: {time_elapsed}s",
f"response: {data}", f"Throughput: {throughput} workload/s",
f"Successful responses: {len([r for r in responses if r.status == 200])}",
"#" * 60, "#" * 60,
] ]
) )
) )
average_throughput = sum_throughput / self.benchmark_handler.benchmark_runs average_throughput = sum_throughput / self.benchmark_handler.benchmark_runs
log.debug( log.debug(
f"benchmark result: avg {average_throughput} workload per second, max {max_throughput}" f"benchmark result: avg {average_throughput} workload per second, max {max_throughput}"
) )
# save max_throughput so we don't have to run benchmark again on restart of cold instances
with open(BENCHMARK_INDICATOR_FILE, "w") as f: with open(BENCHMARK_INDICATOR_FILE, "w") as f:
f.write(str(max_throughput)) f.write(str(max_throughput))
return max_throughput return max_throughput
+7 -4
View File
@@ -8,7 +8,6 @@ from aiohttp import web, ClientResponse
import inspect import inspect
import psutil import psutil
import requests
""" """
@@ -206,13 +205,13 @@ class ModelMetrics:
workload_received: float workload_received: float
workload_cancelled: float workload_cancelled: float
workload_errored: float workload_errored: float
workload_pending: float
# these are not # these are not
cur_perf: float workload_pending: float
error_msg: Optional[str] error_msg: Optional[str]
max_throughput: float max_throughput: float
requests_recieved: Set[int] = field(default_factory=set) requests_recieved: Set[int] = field(default_factory=set)
requests_working: Set[int] = field(default_factory=set) requests_working: Set[int] = field(default_factory=set)
last_update: float = field(default_factory=time.time)
@classmethod @classmethod
def empty(cls): def empty(cls):
@@ -221,12 +220,15 @@ class ModelMetrics:
workload_served=0.0, workload_served=0.0,
workload_cancelled=0.0, workload_cancelled=0.0,
workload_errored=0.0, workload_errored=0.0,
cur_perf=0.0,
workload_received=0.0, workload_received=0.0,
error_msg=None, error_msg=None,
max_throughput=0.0, max_throughput=0.0,
) )
@property
def cur_perf(self) -> float:
return max(self.workload_served / (time.time() - self.last_update), 0.0)
@property @property
def workload_processing(self) -> float: def workload_processing(self) -> float:
return max(self.workload_received - self.workload_cancelled, 0.0) return max(self.workload_received - self.workload_cancelled, 0.0)
@@ -240,6 +242,7 @@ class ModelMetrics:
self.workload_received = 0 self.workload_received = 0
self.workload_cancelled = 0 self.workload_cancelled = 0
self.workload_errored = 0 self.workload_errored = 0
self.last_update = time.time()
@dataclass @dataclass
+10 -12
View File
@@ -46,33 +46,31 @@ class Metrics:
self.model_metrics.requests_recieved.add(reqnum) self.model_metrics.requests_recieved.add(reqnum)
self.model_metrics.requests_working.add(reqnum) self.model_metrics.requests_working.add(reqnum)
def _request_end( def _request_end(self, workload: float, reqnum: int) -> None:
self, workload: float, req_response_time: float, reqnum: int
) -> None:
""" """
this function is called after a response from model API is received. this function is called after handling of a request ends, regardless of the outcome
""" """
self.model_metrics.workload_served += workload
self.model_metrics.workload_pending -= workload self.model_metrics.workload_pending -= workload
self.model_metrics.requests_working.discard(reqnum) self.model_metrics.requests_working.discard(reqnum)
self.model_metrics.cur_perf = workload / req_response_time
def _request_success(self, workload: float) -> None:
"""
this function is called after a response from model API is received and forwarded.
"""
self.model_metrics.workload_served += workload
self.update_pending = True self.update_pending = True
def _request_errored(self, workload: float, reqnum: int) -> None: def _request_errored(self, workload: float) -> None:
""" """
this function is called if model API returns an error this function is called if model API returns an error
""" """
self.model_metrics.workload_pending -= workload
self.model_metrics.workload_errored += workload self.model_metrics.workload_errored += workload
self.model_metrics.requests_working.discard(reqnum)
def _request_canceled(self, workload: float, reqnum: int) -> None: def _request_canceled(self, workload: float) -> None:
""" """
this function is called if client drops connection before model API has responded this function is called if client drops connection before model API has responded
""" """
self.model_metrics.workload_pending -= workload
self.model_metrics.workload_cancelled += workload self.model_metrics.workload_cancelled += workload
self.model_metrics.requests_working.discard(reqnum)
async def _send_metrics_loop(self) -> Awaitable[NoReturn]: async def _send_metrics_loop(self) -> Awaitable[NoReturn]:
while True: while True:
+3
View File
@@ -10,6 +10,7 @@ from collections import Counter
from dataclasses import dataclass, field, asdict from dataclasses import dataclass, field, asdict
from urllib.parse import urljoin from urllib.parse import urljoin
from utils.endpoint_util import Endpoint from utils.endpoint_util import Endpoint
from utils.ssl import get_cert_file_path
import requests import requests
from lib.data_types import AuthData, ApiPayload from lib.data_types import AuthData, ApiPayload
@@ -120,9 +121,11 @@ class ClientState:
self.url = worker_address self.url = worker_address
url = urljoin(worker_address, self.worker_endpoint) url = urljoin(worker_address, self.worker_endpoint)
self.status = ClientStatus.Generating self.status = ClientStatus.Generating
response = requests.post( response = requests.post(
url, url,
json=req_data, json=req_data,
verify=get_cert_file_path(),
) )
if response.status_code != 200: if response.status_code != 200:
self.infer_error.append( self.infer_error.append(
+2 -2
View File
@@ -1,4 +1,4 @@
aiohttp==3.10.1 aiohttp[speedups]==3.10.1
anyio~=4.4 anyio~=4.4
lib~=4.0 lib~=4.0
nltk~=3.9 nltk~=3.9
@@ -6,5 +6,5 @@ psutil~=6.0
pycryptodome~=3.20 pycryptodome~=3.20
Requests~=2.32 Requests~=2.32
transformers~=4.52 transformers~=4.52
utils~=1.0 utils==1.0.*
hf_transfer>=0.1.9 hf_transfer>=0.1.9
+6 -1
View File
@@ -30,7 +30,12 @@ class Endpoint:
Returns: Returns:
Endpoint API key if successful, None otherwise Endpoint API key if successful, None otherwise
""" """
vast_console_url = "https://console.vast.ai/api/v0/endptjobs/" endpoints = {
"alpha": "alpha",
"candidate": "candidate",
"prod": "console",
}
vast_console_url = f"https://{endpoints[instance]}.vast.ai/api/v0/endptjobs/"
headers = {"Authorization": f"Bearer {account_api_key}"} headers = {"Authorization": f"Bearer {account_api_key}"}
try: try:
+15
View File
@@ -0,0 +1,15 @@
import tempfile
from functools import cache
import requests
@cache
def get_cert_file_path():
cert_url = "https://console.vast.ai/static/jvastai_root.cer"
response = requests.get(cert_url)
response.raise_for_status()
# Use a temporary file that is not deleted on close
with tempfile.NamedTemporaryFile(delete=False, suffix=".cer", mode="wb") as f:
f.write(response.content)
return f.name
+3
View File
@@ -5,6 +5,7 @@ import requests
from lib.test_utils import print_truncate_res from lib.test_utils import print_truncate_res
from utils.endpoint_util import Endpoint from utils.endpoint_util import Endpoint
from utils.ssl import get_cert_file_path
""" """
NOTE: this client example uses a custom comfy workflow compatible with SD3 only NOTE: this client example uses a custom comfy workflow compatible with SD3 only
@@ -51,6 +52,7 @@ def call_default_workflow(
response = requests.post( response = requests.post(
url, url,
json=req_data, json=req_data,
verify=get_cert_file_path(),
) )
response.raise_for_status() response.raise_for_status()
print_truncate_res(str(response.json())) print_truncate_res(str(response.json()))
@@ -141,6 +143,7 @@ def call_custom_workflow_for_sd3(
response = requests.post( response = requests.post(
url, url,
json=req_data, json=req_data,
verify=get_cert_file_path(),
) )
response.raise_for_status() response.raise_for_status()
print_truncate_res(str(response.json())) print_truncate_res(str(response.json()))
+7 -2
View File
@@ -6,6 +6,7 @@ from urllib.parse import urljoin
from typing import Dict, Any, Optional, Iterator, Union, List from typing import Dict, Any, Optional, Iterator, Union, List
import requests import requests
from utils.endpoint_util import Endpoint from utils.endpoint_util import Endpoint
from utils.ssl import get_cert_file_path
from .data_types.client import CompletionConfig, ChatCompletionConfig from .data_types.client import CompletionConfig, ChatCompletionConfig
logging.basicConfig( logging.basicConfig(
@@ -90,9 +91,13 @@ class APIClient:
# Make the request using the specified method # Make the request using the specified method
if method.upper() == "POST": if method.upper() == "POST":
response = requests.post(url, json=req_data, stream=stream) response = requests.post(
url, json=req_data, stream=stream, verify=get_cert_file_path()
)
elif method.upper() == "GET": elif method.upper() == "GET":
response = requests.get(url, params=req_data, stream=stream) response = requests.get(
url, params=req_data, stream=stream, verify=get_cert_file_path()
)
else: else:
raise ValueError(f"Unsupported HTTP method: {method}") raise ValueError(f"Unsupported HTTP method: {method}")
+6 -1
View File
@@ -4,6 +4,7 @@ import json
from urllib.parse import urljoin from urllib.parse import urljoin
import requests import requests
from utils.endpoint_util import Endpoint from utils.endpoint_util import Endpoint
from utils.ssl import get_cert_file_path
logging.basicConfig( logging.basicConfig(
level=logging.DEBUG, level=logging.DEBUG,
@@ -42,7 +43,11 @@ def call_generate(endpoint_group_name: str, api_key: str, server_url: str) -> No
req_data = dict(payload=payload, auth_data=auth_data) req_data = dict(payload=payload, auth_data=auth_data)
url = urljoin(url, WORKER_ENDPOINT) url = urljoin(url, WORKER_ENDPOINT)
print(f"url: {url}") print(f"url: {url}")
response = requests.post(url, json=req_data) response = requests.post(
url,
json=req_data,
verify=get_cert_file_path(),
)
response.raise_for_status() response.raise_for_status()
res = response.json() res = response.json()
print(res) print(res)