diff --git a/lib/backend.py b/lib/backend.py index e55ce59..d2ac11c 100644 --- a/lib/backend.py +++ b/lib/backend.py @@ -59,6 +59,8 @@ class Backend: ) log_actions: List[Tuple[LogAction, str]] max_wait_time: float = 10.0 + request_queue = asyncio.Queue() + worker_task = asyncio.create_task(_worker()) reqnum = -1 version = VERSION msg_history = [] @@ -91,6 +93,17 @@ class Backend: timeout = ClientTimeout(total=None) return ClientSession(self.model_server_url, timeout=timeout, connector=connector) + async def _worker(self): + while True: + handler, request, fut = await self.request_queue.get() + try: + res = await self.__process_request(handler, request) + fut.set_result(res) + except Exception as e: + fut.set_exception(e) + finally: + self.request_queue.task_done() + def create_handler( self, handler: EndpointHandler[ApiPayload_T], diff --git a/lib/metrics.py b/lib/metrics.py index 3022a8e..de5490b 100644 --- a/lib/metrics.py +++ b/lib/metrics.py @@ -145,14 +145,15 @@ class Metrics: #######################################Private####################################### async def __send_delete_requests_and_reset(self): - - async def send_data(report_addr: str, success: bool) -> bool: + async def post(report_addr: str, idxs: list[int], success_flag: bool) -> bool: data = { "worker_id": self.id, - "request_idxs": [r.request_idx for r in self.model_metrics.requests_deleting if r.success == success], - "success": success + "request_idxs": idxs, + "success": success_flag, } - log.debug(f"Deleting requests that {'succeeded' if success else 'failed'}: {data['request_idxs']}") + log.debug( + f"Deleting requests that {'succeeded' if success_flag else 'failed'}: {data['request_idxs']}" + ) full_path = report_addr.rstrip("/") + "/delete_requests/" for attempt in range(1, 4): try: @@ -162,16 +163,38 @@ class Metrics: res.raise_for_status() return True except asyncio.TimeoutError: - log.debug(f"delete_requests timed out") + log.debug("delete_requests timed out") except (ClientResponseError, Exception) as e: log.debug(f"delete_requests failed with error: {e}") await asyncio.sleep(2) log.debug(f"retrying delete_request, attempt: {attempt}") + return False + + # Take a snapshot of what we plan to send this tick. + # New arrivals after this snapshot will remain in the queue for the next tick. + snapshot = list(self.model_metrics.requests_deleting) + success_idxs = [r.request_idx for r in snapshot if r.success is True] + failed_idxs = [r.request_idx for r in snapshot if r.success is False] + + if not success_idxs and not failed_idxs: + return # nothing to do for report_addr in self.report_addr: - success = await send_data(report_addr, success=True) and await send_data(report_addr, success=False) - if success is True: - self.model_metrics.requests_deleting.clear() + sent_success = True + sent_failed = True + + if success_idxs: + sent_success = await post(report_addr, success_idxs, True) + if failed_idxs: + sent_failed = await post(report_addr, failed_idxs, False) + + if sent_success and sent_failed: + # Remove only the items we actually sent from the live queue. + sent_set = set(success_idxs) | set(failed_idxs) + self.model_metrics.requests_deleting[:] = [ + r for r in self.model_metrics.requests_deleting + if r.request_idx not in sent_set + ] break