From 4016cf9a53d04ab32f22a06abb705c72ce474169 Mon Sep 17 00:00:00 2001 From: Nader Arbabian Date: Fri, 8 Aug 2025 17:01:21 -0700 Subject: [PATCH] redo metrics tracking for requests, fixes bug wherere some requests were marked as pending, even though they had finished (#24) --- lib/backend.py | 23 +++++++---------------- lib/data_types.py | 11 +++++++---- lib/metrics.py | 22 ++++++++++------------ requirements.txt | 4 ++-- 4 files changed, 26 insertions(+), 34 deletions(-) diff --git a/lib/backend.py b/lib/backend.py index 117ea6d..77147a6 100644 --- a/lib/backend.py +++ b/lib/backend.py @@ -126,7 +126,7 @@ class Backend: async def cancel_api_call_if_disconnected() -> web.Response: await request.wait_for_disconnection() log.debug(f"request with reqnum: {auth_data.reqnum} was canceled") - self.metrics._request_canceled(workload=workload, reqnum=auth_data.reqnum) + self.metrics._request_canceled(workload=workload) return web.Response(status=500) async def make_request() -> Union[web.Response, web.StreamResponse]: @@ -141,7 +141,6 @@ class Backend: else: log.debug(f"Starting request for reqnum:{auth_data.reqnum}") try: - start_time = time.time() response = await self.__call_api(handler=handler, payload=payload) status_code = response.status log.debug( @@ -153,19 +152,17 @@ class Backend: ) ) res = await handler.generate_client_response(request, response) - self.metrics._request_end( - workload=workload, - req_response_time=time.time() - start_time, - reqnum=auth_data.reqnum, - ) + self.metrics._request_success(workload=workload) return res except requests.exceptions.RequestException as e: log.debug(f"[backend] Request error: {e}") - self.metrics._request_errored( - workload=workload, reqnum=auth_data.reqnum - ) + self.metrics._request_errored(workload=workload) return web.Response(status=500) finally: + self.metrics._request_end( + workload=workload, + reqnum=auth_data.reqnum, + ) self.sem.release() ########### @@ -186,12 +183,6 @@ class Backend: except Exception as e: log.debug(f"Exception in main handler loop {e}") return web.Response(status=500) - finally: - if request.task.cancelled(): - log.debug(f"request with reqnum: {auth_data.reqnum} was canceled") - self.metrics._request_canceled( - workload=workload, reqnum=auth_data.reqnum - ) async def __healthcheck(self): health_check_url = self.benchmark_handler.healthcheck_endpoint diff --git a/lib/data_types.py b/lib/data_types.py index 9af8d03..ed8b9f4 100644 --- a/lib/data_types.py +++ b/lib/data_types.py @@ -8,7 +8,6 @@ from aiohttp import web, ClientResponse import inspect import psutil -import requests """ @@ -206,13 +205,13 @@ class ModelMetrics: workload_received: float workload_cancelled: float workload_errored: float - workload_pending: float # these are not - cur_perf: float + workload_pending: float error_msg: Optional[str] max_throughput: float requests_recieved: Set[int] = field(default_factory=set) requests_working: Set[int] = field(default_factory=set) + last_update: float = field(default_factory=time.time) @classmethod def empty(cls): @@ -221,12 +220,15 @@ class ModelMetrics: workload_served=0.0, workload_cancelled=0.0, workload_errored=0.0, - cur_perf=0.0, workload_received=0.0, error_msg=None, max_throughput=0.0, ) + @property + def cur_perf(self) -> float: + return max(self.workload_served / (time.time() - self.last_update), 0.0) + @property def workload_processing(self) -> float: return max(self.workload_received - self.workload_cancelled, 0.0) @@ -240,6 +242,7 @@ class ModelMetrics: self.workload_received = 0 self.workload_cancelled = 0 self.workload_errored = 0 + self.last_update = time.time() @dataclass diff --git a/lib/metrics.py b/lib/metrics.py index ebf3140..4bbdabb 100644 --- a/lib/metrics.py +++ b/lib/metrics.py @@ -46,33 +46,31 @@ class Metrics: self.model_metrics.requests_recieved.add(reqnum) self.model_metrics.requests_working.add(reqnum) - def _request_end( - self, workload: float, req_response_time: float, reqnum: int - ) -> None: + def _request_end(self, workload: float, reqnum: int) -> None: """ - this function is called after a response from model API is received. + this function is called after handling of a request ends, regardless of the outcome """ - self.model_metrics.workload_served += workload self.model_metrics.workload_pending -= workload self.model_metrics.requests_working.discard(reqnum) - self.model_metrics.cur_perf = workload / req_response_time + + def _request_success(self, workload: float) -> None: + """ + this function is called after a response from model API is received and forwarded. + """ + self.model_metrics.workload_served += workload self.update_pending = True - def _request_errored(self, workload: float, reqnum: int) -> None: + def _request_errored(self, workload: float) -> None: """ this function is called if model API returns an error """ - self.model_metrics.workload_pending -= workload self.model_metrics.workload_errored += workload - self.model_metrics.requests_working.discard(reqnum) - def _request_canceled(self, workload: float, reqnum: int) -> None: + def _request_canceled(self, workload: float) -> None: """ this function is called if client drops connection before model API has responded """ - self.model_metrics.workload_pending -= workload self.model_metrics.workload_cancelled += workload - self.model_metrics.requests_working.discard(reqnum) async def _send_metrics_loop(self) -> Awaitable[NoReturn]: while True: diff --git a/requirements.txt b/requirements.txt index 007aebc..1d99304 100644 --- a/requirements.txt +++ b/requirements.txt @@ -1,4 +1,4 @@ -aiohttp==3.10.1 +aiohttp[speedups]==3.10.1 anyio~=4.4 lib~=4.0 nltk~=3.9 @@ -6,5 +6,5 @@ psutil~=6.0 pycryptodome~=3.20 Requests~=2.32 transformers~=4.52 -utils~=1.0 +utils==1.0.* hf_transfer>=0.1.9