Compare commits
3 Commits
| Author | SHA1 | Date | |
|---|---|---|---|
| d3be9fe7db | |||
| e0be45f39a | |||
| be2aafdb1f |
+7
-16
@@ -126,7 +126,7 @@ class Backend:
|
|||||||
async def cancel_api_call_if_disconnected() -> web.Response:
|
async def cancel_api_call_if_disconnected() -> web.Response:
|
||||||
await request.wait_for_disconnection()
|
await request.wait_for_disconnection()
|
||||||
log.debug(f"request with reqnum: {auth_data.reqnum} was canceled")
|
log.debug(f"request with reqnum: {auth_data.reqnum} was canceled")
|
||||||
self.metrics._request_canceled(workload=workload, reqnum=auth_data.reqnum)
|
self.metrics._request_canceled(workload=workload)
|
||||||
return web.Response(status=500)
|
return web.Response(status=500)
|
||||||
|
|
||||||
async def make_request() -> Union[web.Response, web.StreamResponse]:
|
async def make_request() -> Union[web.Response, web.StreamResponse]:
|
||||||
@@ -141,7 +141,6 @@ class Backend:
|
|||||||
else:
|
else:
|
||||||
log.debug(f"Starting request for reqnum:{auth_data.reqnum}")
|
log.debug(f"Starting request for reqnum:{auth_data.reqnum}")
|
||||||
try:
|
try:
|
||||||
start_time = time.time()
|
|
||||||
response = await self.__call_api(handler=handler, payload=payload)
|
response = await self.__call_api(handler=handler, payload=payload)
|
||||||
status_code = response.status
|
status_code = response.status
|
||||||
log.debug(
|
log.debug(
|
||||||
@@ -153,19 +152,17 @@ class Backend:
|
|||||||
)
|
)
|
||||||
)
|
)
|
||||||
res = await handler.generate_client_response(request, response)
|
res = await handler.generate_client_response(request, response)
|
||||||
self.metrics._request_end(
|
self.metrics._request_success(workload=workload)
|
||||||
workload=workload,
|
|
||||||
req_response_time=time.time() - start_time,
|
|
||||||
reqnum=auth_data.reqnum,
|
|
||||||
)
|
|
||||||
return res
|
return res
|
||||||
except requests.exceptions.RequestException as e:
|
except requests.exceptions.RequestException as e:
|
||||||
log.debug(f"[backend] Request error: {e}")
|
log.debug(f"[backend] Request error: {e}")
|
||||||
self.metrics._request_errored(
|
self.metrics._request_errored(workload=workload)
|
||||||
workload=workload, reqnum=auth_data.reqnum
|
|
||||||
)
|
|
||||||
return web.Response(status=500)
|
return web.Response(status=500)
|
||||||
finally:
|
finally:
|
||||||
|
self.metrics._request_end(
|
||||||
|
workload=workload,
|
||||||
|
reqnum=auth_data.reqnum,
|
||||||
|
)
|
||||||
self.sem.release()
|
self.sem.release()
|
||||||
|
|
||||||
###########
|
###########
|
||||||
@@ -186,12 +183,6 @@ class Backend:
|
|||||||
except Exception as e:
|
except Exception as e:
|
||||||
log.debug(f"Exception in main handler loop {e}")
|
log.debug(f"Exception in main handler loop {e}")
|
||||||
return web.Response(status=500)
|
return web.Response(status=500)
|
||||||
finally:
|
|
||||||
if request.task.cancelled():
|
|
||||||
log.debug(f"request with reqnum: {auth_data.reqnum} was canceled")
|
|
||||||
self.metrics._request_canceled(
|
|
||||||
workload=workload, reqnum=auth_data.reqnum
|
|
||||||
)
|
|
||||||
|
|
||||||
async def __healthcheck(self):
|
async def __healthcheck(self):
|
||||||
health_check_url = self.benchmark_handler.healthcheck_endpoint
|
health_check_url = self.benchmark_handler.healthcheck_endpoint
|
||||||
|
|||||||
+7
-4
@@ -8,7 +8,6 @@ from aiohttp import web, ClientResponse
|
|||||||
import inspect
|
import inspect
|
||||||
|
|
||||||
import psutil
|
import psutil
|
||||||
import requests
|
|
||||||
|
|
||||||
|
|
||||||
"""
|
"""
|
||||||
@@ -206,13 +205,13 @@ class ModelMetrics:
|
|||||||
workload_received: float
|
workload_received: float
|
||||||
workload_cancelled: float
|
workload_cancelled: float
|
||||||
workload_errored: float
|
workload_errored: float
|
||||||
workload_pending: float
|
|
||||||
# these are not
|
# these are not
|
||||||
cur_perf: float
|
workload_pending: float
|
||||||
error_msg: Optional[str]
|
error_msg: Optional[str]
|
||||||
max_throughput: float
|
max_throughput: float
|
||||||
requests_recieved: Set[int] = field(default_factory=set)
|
requests_recieved: Set[int] = field(default_factory=set)
|
||||||
requests_working: Set[int] = field(default_factory=set)
|
requests_working: Set[int] = field(default_factory=set)
|
||||||
|
last_update: float = field(default_factory=time.time)
|
||||||
|
|
||||||
@classmethod
|
@classmethod
|
||||||
def empty(cls):
|
def empty(cls):
|
||||||
@@ -221,12 +220,15 @@ class ModelMetrics:
|
|||||||
workload_served=0.0,
|
workload_served=0.0,
|
||||||
workload_cancelled=0.0,
|
workload_cancelled=0.0,
|
||||||
workload_errored=0.0,
|
workload_errored=0.0,
|
||||||
cur_perf=0.0,
|
|
||||||
workload_received=0.0,
|
workload_received=0.0,
|
||||||
error_msg=None,
|
error_msg=None,
|
||||||
max_throughput=0.0,
|
max_throughput=0.0,
|
||||||
)
|
)
|
||||||
|
|
||||||
|
@property
|
||||||
|
def cur_perf(self) -> float:
|
||||||
|
return max(self.workload_served / (time.time() - self.last_update), 0.0)
|
||||||
|
|
||||||
@property
|
@property
|
||||||
def workload_processing(self) -> float:
|
def workload_processing(self) -> float:
|
||||||
return max(self.workload_received - self.workload_cancelled, 0.0)
|
return max(self.workload_received - self.workload_cancelled, 0.0)
|
||||||
@@ -240,6 +242,7 @@ class ModelMetrics:
|
|||||||
self.workload_received = 0
|
self.workload_received = 0
|
||||||
self.workload_cancelled = 0
|
self.workload_cancelled = 0
|
||||||
self.workload_errored = 0
|
self.workload_errored = 0
|
||||||
|
self.last_update = time.time()
|
||||||
|
|
||||||
|
|
||||||
@dataclass
|
@dataclass
|
||||||
|
|||||||
+10
-12
@@ -46,33 +46,31 @@ class Metrics:
|
|||||||
self.model_metrics.requests_recieved.add(reqnum)
|
self.model_metrics.requests_recieved.add(reqnum)
|
||||||
self.model_metrics.requests_working.add(reqnum)
|
self.model_metrics.requests_working.add(reqnum)
|
||||||
|
|
||||||
def _request_end(
|
def _request_end(self, workload: float, reqnum: int) -> None:
|
||||||
self, workload: float, req_response_time: float, reqnum: int
|
|
||||||
) -> None:
|
|
||||||
"""
|
"""
|
||||||
this function is called after a response from model API is received.
|
this function is called after handling of a request ends, regardless of the outcome
|
||||||
"""
|
"""
|
||||||
self.model_metrics.workload_served += workload
|
|
||||||
self.model_metrics.workload_pending -= workload
|
self.model_metrics.workload_pending -= workload
|
||||||
self.model_metrics.requests_working.discard(reqnum)
|
self.model_metrics.requests_working.discard(reqnum)
|
||||||
self.model_metrics.cur_perf = workload / req_response_time
|
|
||||||
|
def _request_success(self, workload: float) -> None:
|
||||||
|
"""
|
||||||
|
this function is called after a response from model API is received and forwarded.
|
||||||
|
"""
|
||||||
|
self.model_metrics.workload_served += workload
|
||||||
self.update_pending = True
|
self.update_pending = True
|
||||||
|
|
||||||
def _request_errored(self, workload: float, reqnum: int) -> None:
|
def _request_errored(self, workload: float) -> None:
|
||||||
"""
|
"""
|
||||||
this function is called if model API returns an error
|
this function is called if model API returns an error
|
||||||
"""
|
"""
|
||||||
self.model_metrics.workload_pending -= workload
|
|
||||||
self.model_metrics.workload_errored += workload
|
self.model_metrics.workload_errored += workload
|
||||||
self.model_metrics.requests_working.discard(reqnum)
|
|
||||||
|
|
||||||
def _request_canceled(self, workload: float, reqnum: int) -> None:
|
def _request_canceled(self, workload: float) -> None:
|
||||||
"""
|
"""
|
||||||
this function is called if client drops connection before model API has responded
|
this function is called if client drops connection before model API has responded
|
||||||
"""
|
"""
|
||||||
self.model_metrics.workload_pending -= workload
|
|
||||||
self.model_metrics.workload_cancelled += workload
|
self.model_metrics.workload_cancelled += workload
|
||||||
self.model_metrics.requests_working.discard(reqnum)
|
|
||||||
|
|
||||||
async def _send_metrics_loop(self) -> Awaitable[NoReturn]:
|
async def _send_metrics_loop(self) -> Awaitable[NoReturn]:
|
||||||
while True:
|
while True:
|
||||||
|
|||||||
+1
-1
@@ -27,7 +27,7 @@ def start_server(backend: Backend, routes: List[web.RouteDef], **kwargs):
|
|||||||
log.debug("starting server...")
|
log.debug("starting server...")
|
||||||
app = web.Application()
|
app = web.Application()
|
||||||
app.add_routes(routes)
|
app.add_routes(routes)
|
||||||
runner = web.AppRunner(app, handler_cancellation=True)
|
runner = web.AppRunner(app)
|
||||||
await runner.setup()
|
await runner.setup()
|
||||||
site = web.TCPSite(
|
site = web.TCPSite(
|
||||||
runner,
|
runner,
|
||||||
|
|||||||
+2
-2
@@ -1,4 +1,4 @@
|
|||||||
aiohttp==3.10.1
|
aiohttp[speedups]==3.10.1
|
||||||
anyio~=4.4
|
anyio~=4.4
|
||||||
lib~=4.0
|
lib~=4.0
|
||||||
nltk~=3.9
|
nltk~=3.9
|
||||||
@@ -6,5 +6,5 @@ psutil~=6.0
|
|||||||
pycryptodome~=3.20
|
pycryptodome~=3.20
|
||||||
Requests~=2.32
|
Requests~=2.32
|
||||||
transformers~=4.52
|
transformers~=4.52
|
||||||
utils~=1.0
|
utils==1.0.*
|
||||||
hf_transfer>=0.1.9
|
hf_transfer>=0.1.9
|
||||||
|
|||||||
+19
-15
@@ -28,24 +28,16 @@ class APIClient:
|
|||||||
DEFAULT_TIMEOUT = 4
|
DEFAULT_TIMEOUT = 4
|
||||||
|
|
||||||
def __init__(
|
def __init__(
|
||||||
self, endpoint_group_name: str, api_key: str, server_url: str, instance: str
|
self,
|
||||||
|
endpoint_group_name: str,
|
||||||
|
api_key: str,
|
||||||
|
server_url: str,
|
||||||
|
endpoint_api_key: str,
|
||||||
):
|
):
|
||||||
self.endpoint_group_name = endpoint_group_name
|
self.endpoint_group_name = endpoint_group_name
|
||||||
self.api_key = api_key
|
self.api_key = api_key
|
||||||
self.server_url = server_url
|
self.server_url = server_url
|
||||||
self.instance = instance
|
self.endpoint_api_key = endpoint_api_key
|
||||||
self.endpoint_api_key = self._get_endpoint_api_key()
|
|
||||||
|
|
||||||
def _get_endpoint_api_key(self) -> Optional[str]:
|
|
||||||
"""Get the endpoint API key"""
|
|
||||||
endpoint_api_key = Endpoint.get_endpoint_api_key(
|
|
||||||
endpoint_name=self.endpoint_group_name,
|
|
||||||
account_api_key=self.api_key,
|
|
||||||
instance=self.instance,
|
|
||||||
)
|
|
||||||
if not endpoint_api_key:
|
|
||||||
log.error(f"Failed to get API key for endpoint {self.endpoint_group_name}")
|
|
||||||
return endpoint_api_key
|
|
||||||
|
|
||||||
def _get_worker_url(self, cost: int = DEFAULT_COST) -> Dict[str, Any]:
|
def _get_worker_url(self, cost: int = DEFAULT_COST) -> Dict[str, Any]:
|
||||||
"""Get worker URL and auth data from routing service"""
|
"""Get worker URL and auth data from routing service"""
|
||||||
@@ -554,12 +546,24 @@ def main():
|
|||||||
sys.exit(1)
|
sys.exit(1)
|
||||||
|
|
||||||
try:
|
try:
|
||||||
|
endpoint_api_key = Endpoint.get_endpoint_api_key(
|
||||||
|
endpoint_name=args.endpoint_group_name,
|
||||||
|
account_api_key=args.api_key,
|
||||||
|
instance=args.instance,
|
||||||
|
)
|
||||||
|
|
||||||
|
if not endpoint_api_key:
|
||||||
|
log.error(
|
||||||
|
f"Could not retrieve API key for endpoint '{args.endpoint_group_name}'. Exiting."
|
||||||
|
)
|
||||||
|
sys.exit(1)
|
||||||
|
|
||||||
# Create the core API client
|
# Create the core API client
|
||||||
client = APIClient(
|
client = APIClient(
|
||||||
endpoint_group_name=args.endpoint_group_name,
|
endpoint_group_name=args.endpoint_group_name,
|
||||||
api_key=args.api_key,
|
api_key=args.api_key,
|
||||||
server_url=args.server_url,
|
server_url=args.server_url,
|
||||||
instance=args.instance,
|
endpoint_api_key=endpoint_api_key,
|
||||||
)
|
)
|
||||||
|
|
||||||
# Create tool manager and demo (passing the model parameter)
|
# Create tool manager and demo (passing the model parameter)
|
||||||
|
|||||||
@@ -124,7 +124,12 @@ class CompletionsData(GenericData):
|
|||||||
if not model:
|
if not model:
|
||||||
raise ValueError("MODEL_NAME environment variable not set")
|
raise ValueError("MODEL_NAME environment variable not set")
|
||||||
|
|
||||||
test_input = {"model": model, "prompt": prompt, "temperature": 0.7}
|
test_input = {
|
||||||
|
"model": model,
|
||||||
|
"prompt": prompt,
|
||||||
|
"temperature": 0.7,
|
||||||
|
"max_tokens": 500,
|
||||||
|
}
|
||||||
return cls(input=test_input)
|
return cls(input=test_input)
|
||||||
|
|
||||||
|
|
||||||
@@ -158,6 +163,7 @@ class ChatCompletionsData(GenericData):
|
|||||||
"model": model,
|
"model": model,
|
||||||
"messages": [{"role": "user", "content": prompt}],
|
"messages": [{"role": "user", "content": prompt}],
|
||||||
"temperature": 0.7,
|
"temperature": 0.7,
|
||||||
|
"max_tokens": 500,
|
||||||
}
|
}
|
||||||
return cls(input=test_input)
|
return cls(input=test_input)
|
||||||
|
|
||||||
|
|||||||
Reference in New Issue
Block a user