diff --git a/lib/backend.py b/lib/backend.py index 3d4ef92..dc1f52c 100644 --- a/lib/backend.py +++ b/lib/backend.py @@ -171,7 +171,7 @@ class Backend: if self.metrics.model_metrics.wait_time > self.max_wait_time: self.metrics._request_reject(request_metrics) - return web.Response(status=500) + return web.Response(status=429) acquired = False try: diff --git a/lib/metrics.py b/lib/metrics.py index 40dcb9a..45f44f4 100644 --- a/lib/metrics.py +++ b/lib/metrics.py @@ -6,7 +6,7 @@ from asyncio import sleep from dataclasses import dataclass, asdict, field from functools import cache import asyncio -import requests +from aiohttp import ClientSession, ClientTimeout, TCPConnector, ClientResponseError from lib.data_types import AutoScalerData, SystemMetrics, ModelMetrics, RequestMetrics from typing import Awaitable, NoReturn, List @@ -38,6 +38,20 @@ class Metrics: url: str = field(default_factory=get_url) system_metrics: SystemMetrics = field(default_factory=SystemMetrics.empty) model_metrics: ModelMetrics = field(default_factory=ModelMetrics.empty) + _session: ClientSession | None = field(default=None, init=False, repr=False) + + async def http(self) -> ClientSession: + if self._session is None: + self._session = ClientSession( + timeout=ClientTimeout(total=10), + connector=TCPConnector(limit=8, limit_per_host=4, force_close=True, enable_cleanup_closed=True) + ) + return self._session + + async def aclose(self) -> None: + if self._session is not None: + await self._session.close() + self._session = None def _request_start(self, request: RequestMetrics) -> None: """ @@ -101,7 +115,7 @@ class Metrics: while True: await sleep(DELETE_REQUESTS_INTERVAL) if len(self.model_metrics.requests_deleting) > 0: - self.__send_delete_requests_and_reset() + await self.__send_delete_requests_and_reset() async def _send_metrics_loop(self) -> Awaitable[NoReturn]: while True: @@ -109,10 +123,10 @@ class Metrics: elapsed = time.time() - self.last_metric_update if self.system_metrics.model_is_loaded is False and elapsed >= 10: log.debug(f"sending loading model metrics after {int(elapsed)}s wait") - self.__send_metrics_and_reset() + await self.__send_metrics_and_reset() elif self.update_pending or elapsed > 10: log.debug(f"sending loaded model metrics after {int(elapsed)}s wait") - self.__send_metrics_and_reset() + await self.__send_metrics_and_reset() def _model_loaded(self, max_throughput: float) -> None: self.system_metrics.model_loading_time = ( @@ -130,9 +144,9 @@ class Metrics: #######################################Private####################################### - def __send_delete_requests_and_reset(self): + async def __send_delete_requests_and_reset(self): - def send_data(report_addr: str, success: bool) -> bool: + async def send_data(report_addr: str, success: bool) -> bool: data = { "worker_id": self.id, "request_idxs": [r.request_idx for r in self.model_metrics.requests_deleting if r.success == success], @@ -141,24 +155,25 @@ class Metrics: full_path = report_addr.rstrip("/") + "/delete_requests/" for attempt in range(1, 4): try: - res = requests.post(full_path, json=data, timeout=1) - res.raise_for_status() + session = await self.http() + async with session.post(full_path, json=data) as res: + res.raise_for_status() return True - except requests.Timeout: + except asyncio.TimeoutError: log.debug(f"delete_requests timed out") - except Exception as e: + except (ClientResponseError, Exception) as e: log.debug(f"delete_requests failed with error: {e}") - asyncio.sleep(2) + await asyncio.sleep(2) log.debug(f"retrying delete_request, attempt: {attempt}") for report_addr in self.report_addr: - success = send_data(report_addr, success=True) and send_data(report_addr, success=False) + success = await send_data(report_addr, success=True) and await send_data(report_addr, success=False) if success is True: self.model_metrics.requests_deleting.clear() break - def __send_metrics_and_reset(self): + async def __send_metrics_and_reset(self): def compute_autoscaler_data() -> AutoScalerData: return AutoScalerData( @@ -180,7 +195,7 @@ class Metrics: url=self.url, ) - def send_data(report_addr: str) -> bool: + async def send_data(report_addr: str) -> bool: data = compute_autoscaler_data() full_path = report_addr.rstrip("/") + "/worker_status/" log.debug( @@ -195,14 +210,15 @@ class Metrics: ) for attempt in range(1, 4): try: - res = requests.post(full_path, json=asdict(data), timeout=1) - res.raise_for_status() + session = await self.http() + async with session.post(full_path, json=asdict(data)) as res: + res.raise_for_status() return True - except requests.Timeout: + except asyncio.TimeoutError: log.debug(f"autoscaler status update timed out") - except Exception as e: + except (ClientResponseError, Exception) as e: log.debug(f"autoscaler status update failed with error: {e}") - asyncio.sleep(2) + await asyncio.sleep(2) log.debug(f"retrying autoscaler status update, attempt: {attempt}") log.debug(f"failed to send update through {report_addr}") return False @@ -212,7 +228,7 @@ class Metrics: self.system_metrics.update_disk_usage() for report_addr in self.report_addr: - success = send_data(report_addr) + success = await send_data(report_addr) if success is True: break self.update_pending = False