Compare commits

...

1 Commits

4 changed files with 26 additions and 34 deletions
+7 -16
View File
@@ -126,7 +126,7 @@ class Backend:
async def cancel_api_call_if_disconnected() -> web.Response:
await request.wait_for_disconnection()
log.debug(f"request with reqnum: {auth_data.reqnum} was canceled")
self.metrics._request_canceled(workload=workload, reqnum=auth_data.reqnum)
self.metrics._request_canceled(workload=workload)
return web.Response(status=500)
async def make_request() -> Union[web.Response, web.StreamResponse]:
@@ -141,7 +141,6 @@ class Backend:
else:
log.debug(f"Starting request for reqnum:{auth_data.reqnum}")
try:
start_time = time.time()
response = await self.__call_api(handler=handler, payload=payload)
status_code = response.status
log.debug(
@@ -153,19 +152,17 @@ class Backend:
)
)
res = await handler.generate_client_response(request, response)
self.metrics._request_end(
workload=workload,
req_response_time=time.time() - start_time,
reqnum=auth_data.reqnum,
)
self.metrics._request_success(workload=workload)
return res
except requests.exceptions.RequestException as e:
log.debug(f"[backend] Request error: {e}")
self.metrics._request_errored(
workload=workload, reqnum=auth_data.reqnum
)
self.metrics._request_errored(workload=workload)
return web.Response(status=500)
finally:
self.metrics._request_end(
workload=workload,
reqnum=auth_data.reqnum,
)
self.sem.release()
###########
@@ -186,12 +183,6 @@ class Backend:
except Exception as e:
log.debug(f"Exception in main handler loop {e}")
return web.Response(status=500)
finally:
if request.task.cancelled():
log.debug(f"request with reqnum: {auth_data.reqnum} was canceled")
self.metrics._request_canceled(
workload=workload, reqnum=auth_data.reqnum
)
async def __healthcheck(self):
health_check_url = self.benchmark_handler.healthcheck_endpoint
+7 -4
View File
@@ -8,7 +8,6 @@ from aiohttp import web, ClientResponse
import inspect
import psutil
import requests
"""
@@ -206,13 +205,13 @@ class ModelMetrics:
workload_received: float
workload_cancelled: float
workload_errored: float
workload_pending: float
# these are not
cur_perf: float
workload_pending: float
error_msg: Optional[str]
max_throughput: float
requests_recieved: Set[int] = field(default_factory=set)
requests_working: Set[int] = field(default_factory=set)
last_update: float = field(default_factory=time.time)
@classmethod
def empty(cls):
@@ -221,12 +220,15 @@ class ModelMetrics:
workload_served=0.0,
workload_cancelled=0.0,
workload_errored=0.0,
cur_perf=0.0,
workload_received=0.0,
error_msg=None,
max_throughput=0.0,
)
@property
def cur_perf(self) -> float:
return max(self.workload_served / (time.time() - self.last_update), 0.0)
@property
def workload_processing(self) -> float:
return max(self.workload_received - self.workload_cancelled, 0.0)
@@ -240,6 +242,7 @@ class ModelMetrics:
self.workload_received = 0
self.workload_cancelled = 0
self.workload_errored = 0
self.last_update = time.time()
@dataclass
+10 -12
View File
@@ -46,33 +46,31 @@ class Metrics:
self.model_metrics.requests_recieved.add(reqnum)
self.model_metrics.requests_working.add(reqnum)
def _request_end(
self, workload: float, req_response_time: float, reqnum: int
) -> None:
def _request_end(self, workload: float, reqnum: int) -> None:
"""
this function is called after a response from model API is received.
this function is called after handling of a request ends, regardless of the outcome
"""
self.model_metrics.workload_served += workload
self.model_metrics.workload_pending -= workload
self.model_metrics.requests_working.discard(reqnum)
self.model_metrics.cur_perf = workload / req_response_time
def _request_success(self, workload: float) -> None:
"""
this function is called after a response from model API is received and forwarded.
"""
self.model_metrics.workload_served += workload
self.update_pending = True
def _request_errored(self, workload: float, reqnum: int) -> None:
def _request_errored(self, workload: float) -> None:
"""
this function is called if model API returns an error
"""
self.model_metrics.workload_pending -= workload
self.model_metrics.workload_errored += workload
self.model_metrics.requests_working.discard(reqnum)
def _request_canceled(self, workload: float, reqnum: int) -> None:
def _request_canceled(self, workload: float) -> None:
"""
this function is called if client drops connection before model API has responded
"""
self.model_metrics.workload_pending -= workload
self.model_metrics.workload_cancelled += workload
self.model_metrics.requests_working.discard(reqnum)
async def _send_metrics_loop(self) -> Awaitable[NoReturn]:
while True:
+2 -2
View File
@@ -1,4 +1,4 @@
aiohttp==3.10.1
aiohttp[speedups]==3.10.1
anyio~=4.4
lib~=4.0
nltk~=3.9
@@ -6,5 +6,5 @@ psutil~=6.0
pycryptodome~=3.20
Requests~=2.32
transformers~=4.52
utils~=1.0
utils==1.0.*
hf_transfer>=0.1.9