asyncio in metrics

This commit is contained in:
Lucas Armand
2025-10-23 10:18:31 -07:00
parent 0f13506938
commit 37ad3f8d46
2 changed files with 37 additions and 21 deletions
+1 -1
View File
@@ -171,7 +171,7 @@ class Backend:
if self.metrics.model_metrics.wait_time > self.max_wait_time:
self.metrics._request_reject(request_metrics)
return web.Response(status=500)
return web.Response(status=429)
acquired = False
try:
+34 -18
View File
@@ -6,7 +6,7 @@ from asyncio import sleep
from dataclasses import dataclass, asdict, field
from functools import cache
import asyncio
import requests
from aiohttp import ClientSession, ClientTimeout, TCPConnector, ClientResponseError
from lib.data_types import AutoScalerData, SystemMetrics, ModelMetrics, RequestMetrics
from typing import Awaitable, NoReturn, List
@@ -38,6 +38,20 @@ class Metrics:
url: str = field(default_factory=get_url)
system_metrics: SystemMetrics = field(default_factory=SystemMetrics.empty)
model_metrics: ModelMetrics = field(default_factory=ModelMetrics.empty)
_session: ClientSession | None = field(default=None, init=False, repr=False)
async def http(self) -> ClientSession:
    """Return the shared aiohttp session, creating it when needed.

    The session is built lazily and cached on ``self._session``. It uses a
    10 second total timeout and a TCP connector capped at 8 connections
    overall and 4 per host.

    A cached session that has already been closed is replaced with a new
    one, so a stale closed session is never handed back to callers.

    Returns:
        An open ``ClientSession`` that callers share; they must not close it
        themselves (use ``aclose`` for that).
    """
    session = self._session
    # `closed` covers the case where the session was shut down without
    # `_session` being reset to None; reusing it would fail every request.
    if session is None or session.closed:
        session = ClientSession(
            timeout=ClientTimeout(total=10),
            connector=TCPConnector(
                limit=8,
                limit_per_host=4,
                force_close=True,
                enable_cleanup_closed=True,
            ),
        )
        self._session = session
    return session
async def aclose(self) -> None:
    """Close the cached HTTP session, if one exists, and forget it.

    Does nothing when no session has been created. After a successful
    close, ``self._session`` is reset to ``None``.
    """
    session = self._session
    if session is None:
        return
    await session.close()
    self._session = None
def _request_start(self, request: RequestMetrics) -> None:
"""
@@ -101,7 +115,7 @@ class Metrics:
while True:
await sleep(DELETE_REQUESTS_INTERVAL)
if len(self.model_metrics.requests_deleting) > 0:
self.__send_delete_requests_and_reset()
await self.__send_delete_requests_and_reset()
async def _send_metrics_loop(self) -> Awaitable[NoReturn]:
while True:
@@ -109,10 +123,10 @@ class Metrics:
elapsed = time.time() - self.last_metric_update
if self.system_metrics.model_is_loaded is False and elapsed >= 10:
log.debug(f"sending loading model metrics after {int(elapsed)}s wait")
self.__send_metrics_and_reset()
await self.__send_metrics_and_reset()
elif self.update_pending or elapsed > 10:
log.debug(f"sending loaded model metrics after {int(elapsed)}s wait")
self.__send_metrics_and_reset()
await self.__send_metrics_and_reset()
def _model_loaded(self, max_throughput: float) -> None:
self.system_metrics.model_loading_time = (
@@ -130,9 +144,9 @@ class Metrics:
#######################################Private#######################################
def __send_delete_requests_and_reset(self):
async def __send_delete_requests_and_reset(self):
def send_data(report_addr: str, success: bool) -> bool:
async def send_data(report_addr: str, success: bool) -> bool:
data = {
"worker_id": self.id,
"request_idxs": [r.request_idx for r in self.model_metrics.requests_deleting if r.success == success],
@@ -141,24 +155,25 @@ class Metrics:
full_path = report_addr.rstrip("/") + "/delete_requests/"
for attempt in range(1, 4):
try:
res = requests.post(full_path, json=data, timeout=1)
session = await self.http()
async with session.post(full_path, json=data) as res:
res.raise_for_status()
return True
except requests.Timeout:
except asyncio.TimeoutError:
log.debug(f"delete_requests timed out")
except Exception as e:
except (ClientResponseError, Exception) as e:
log.debug(f"delete_requests failed with error: {e}")
asyncio.sleep(2)
await asyncio.sleep(2)
log.debug(f"retrying delete_request, attempt: {attempt}")
for report_addr in self.report_addr:
success = send_data(report_addr, success=True) and send_data(report_addr, success=False)
success = await send_data(report_addr, success=True) and await send_data(report_addr, success=False)
if success is True:
self.model_metrics.requests_deleting.clear()
break
def __send_metrics_and_reset(self):
async def __send_metrics_and_reset(self):
def compute_autoscaler_data() -> AutoScalerData:
return AutoScalerData(
@@ -180,7 +195,7 @@ class Metrics:
url=self.url,
)
def send_data(report_addr: str) -> bool:
async def send_data(report_addr: str) -> bool:
data = compute_autoscaler_data()
full_path = report_addr.rstrip("/") + "/worker_status/"
log.debug(
@@ -195,14 +210,15 @@ class Metrics:
)
for attempt in range(1, 4):
try:
res = requests.post(full_path, json=asdict(data), timeout=1)
session = await self.http()
async with session.post(full_path, json=asdict(data)) as res:
res.raise_for_status()
return True
except requests.Timeout:
except asyncio.TimeoutError:
log.debug(f"autoscaler status update timed out")
except Exception as e:
except (ClientResponseError, Exception) as e:
log.debug(f"autoscaler status update failed with error: {e}")
asyncio.sleep(2)
await asyncio.sleep(2)
log.debug(f"retrying autoscaler status update, attempt: {attempt}")
log.debug(f"failed to send update through {report_addr}")
return False
@@ -212,7 +228,7 @@ class Metrics:
self.system_metrics.update_disk_usage()
for report_addr in self.report_addr:
success = send_data(report_addr)
success = await send_data(report_addr)
if success is True:
break
self.update_pending = False