Files
pyworker/lib/metrics.py
T

269 lines
11 KiB
Python
Raw Normal View History

2024-09-04 11:19:30 -07:00
import os
import time
import logging
import json
from asyncio import sleep
from dataclasses import dataclass, asdict, field
from functools import cache
2025-10-21 18:52:13 -07:00
import asyncio
2025-10-23 10:18:31 -07:00
from aiohttp import ClientSession, ClientTimeout, TCPConnector, ClientResponseError
2024-09-04 11:19:30 -07:00
2025-10-08 16:54:18 -07:00
from lib.data_types import AutoScalerData, SystemMetrics, ModelMetrics, RequestMetrics
2024-09-04 11:53:45 -07:00
from typing import Awaitable, NoReturn, List
2024-09-04 11:19:30 -07:00
# Seconds the metrics loop sleeps between checks for whether a status update should be sent.
METRICS_UPDATE_INTERVAL = 1
2025-10-08 16:54:18 -07:00
# Seconds the delete-requests loop sleeps between checks of the pending-deletion queue.
DELETE_REQUESTS_INTERVAL = 1
2024-09-04 11:19:30 -07:00
# Module logger; note it is named after this file's path (__file__), not the module name.
log = logging.getLogger(__file__)
@cache
def get_url() -> str:
    """
    Build this worker's base URL from environment variables.

    The result is memoized by ``functools.cache``, so the environment is only
    read on the first call.

    Environment variables read:
        USE_SSL: optional; the scheme is ``https`` only when the value is
            exactly ``"true"``, otherwise ``http``.
        WORKER_PORT: required; selects which ``VAST_TCP_PORT_<n>`` variable to read.
        VAST_TCP_PORT_<WORKER_PORT>: required; used as the port in the URL.
        PUBLIC_IPADDR: required; used as the host in the URL.

    Raises:
        KeyError: if any of the required variables is missing.
    """
    env = os.environ
    scheme = "https" if env.get("USE_SSL", "false") == "true" else "http"
    # Lookup order is kept (port first, then address) so the same KeyError
    # surfaces first when several variables are missing.
    port = env[f"VAST_TCP_PORT_{env['WORKER_PORT']}"]
    host = env["PUBLIC_IPADDR"]
    return f"{scheme}://{host}:{port}"
@dataclass
class Metrics:
    """
    Per-worker metrics state plus the loops that report it over HTTP.

    Request lifecycle hooks (``_request_start`` / ``_request_end`` / ...) update
    the counters held in ``model_metrics``. Two long-running coroutines then
    push that state to the address(es) listed in ``report_addr``:

    * ``_send_metrics_loop`` POSTs a status snapshot to ``<addr>/worker_status/``.
    * ``_send_delete_requests_loop`` POSTs finished request indexes to
      ``<addr>/delete_requests/``.

    Call ``aclose()`` to release the shared HTTP session.
    """

    # Retry policy shared by both senders. Deliberately left un-annotated so that
    # @dataclass does not turn these into instance fields.
    _SEND_ATTEMPTS = 3
    _RETRY_DELAY_SECONDS = 2

    # Version string included in every status update; changed via _set_version().
    version: str = "0"
    # time.time() of the last status-update attempt (0.0 until the first one).
    last_metric_update: float = 0.0
    # time.time() of the most recent _request_end() call.
    last_request_served: float = 0.0
    # Set by the request hooks when counters change; cleared after a status-update attempt.
    update_pending: bool = False
    # Worker id, read from the CONTAINER_ID environment variable.
    id: int = field(default_factory=lambda: int(os.environ["CONTAINER_ID"]))
    # Report addresses, read from the comma-separated REPORT_ADDR environment variable.
    report_addr: List[str] = field(
        default_factory=lambda: os.environ["REPORT_ADDR"].split(",")
    )
    # This worker's base URL, as built by get_url().
    url: str = field(default_factory=get_url)
    system_metrics: SystemMetrics = field(default_factory=SystemMetrics.empty)
    model_metrics: ModelMetrics = field(default_factory=ModelMetrics.empty)
    # Lazily created aiohttp session shared by all outgoing reports; see http().
    _session: ClientSession | None = field(default=None, init=False, repr=False)

    async def http(self) -> ClientSession:
        """
        Return the shared HTTP session, creating it on first use.

        A new session is also created when the previous one has been closed, so
        callers never receive an unusable session.
        """
        if self._session is None or self._session.closed:
            self._session = ClientSession(
                timeout=ClientTimeout(total=10),
                connector=TCPConnector(
                    limit=8,
                    limit_per_host=4,
                    force_close=True,
                    enable_cleanup_closed=True,
                ),
            )
        return self._session

    async def aclose(self) -> None:
        """Close the shared HTTP session, if one exists, and forget it."""
        if self._session is not None:
            await self._session.close()
            self._session = None

    def _request_start(self, request: RequestMetrics) -> None:
        """
        this function is called prior to forwarding a request to a model API.

        Marks the request as started, adds its workload to the pending and
        received totals, and registers it as currently being worked on.
        """
        log.debug("request start")
        request.status = "Started"
        self.model_metrics.workload_pending += request.workload
        self.model_metrics.workload_received += request.workload
        self.model_metrics.requests_recieved.add(request.reqnum)
        self.model_metrics.requests_working[request.reqnum] = request
        self.update_pending = True

    def _request_end(self, request: RequestMetrics) -> None:
        """
        this function is called after handling of a request ends, regardless of the outcome

        Releases the request's pending workload, removes it from the working
        set and queues it for the next delete_requests report.
        """
        self.model_metrics.workload_pending -= request.workload
        self.model_metrics.requests_working.pop(request.reqnum, None)
        self.model_metrics.requests_deleting.append(request)
        self.last_request_served = time.time()

    def _request_success(self, request: RequestMetrics) -> None:
        """
        this function is called after a response from model API is received and forwarded.
        """
        self.model_metrics.workload_served += request.workload
        request.status = "Success"
        request.success = True
        self.update_pending = True

    def _request_errored(self, request: RequestMetrics) -> None:
        """
        this function is called if model API returns an error
        """
        self.model_metrics.workload_errored += request.workload
        request.status = "Error"
        request.success = False
        self.update_pending = True

    def _request_canceled(self, request: RequestMetrics) -> None:
        """
        this function is called if client drops connection before model API has responded

        A cancelled request is recorded with ``success = True``.
        """
        # NOTE(review): unlike the other outcome hooks this one does not set
        # update_pending -- confirm whether that is intentional.
        self.model_metrics.workload_cancelled += request.workload
        request.success = True
        request.status = "Cancelled"

    def _request_reject(self, request: RequestMetrics) -> None:
        """
        this function is called if the current wait time for the model is above max_wait_time

        The request is counted as received and queued for deletion right away;
        it never enters the working set.
        """
        self.model_metrics.requests_recieved.add(request.reqnum)
        self.model_metrics.requests_deleting.append(request)
        self.model_metrics.workload_rejected += request.workload
        request.success = False
        request.status = "Rejected"
        self.update_pending = True

    async def _send_delete_requests_loop(self) -> NoReturn:
        """Run forever: every DELETE_REQUESTS_INTERVAL seconds, flush the deletion queue if non-empty."""
        while True:
            await sleep(DELETE_REQUESTS_INTERVAL)
            if self.model_metrics.requests_deleting:
                await self.__send_delete_requests_and_reset()

    async def _send_metrics_loop(self) -> NoReturn:
        """
        Run forever: every METRICS_UPDATE_INTERVAL seconds, decide whether to send a status update.

        While the model is not loaded an update goes out once at least 10s have
        passed since the last one. Otherwise an update goes out as soon as one
        is pending, or after more than 10s without one.
        """
        while True:
            await sleep(METRICS_UPDATE_INTERVAL)
            elapsed = time.time() - self.last_metric_update
            if self.system_metrics.model_is_loaded is False and elapsed >= 10:
                log.debug(f"sending loading model metrics after {int(elapsed)}s wait")
                await self.__send_metrics_and_reset()
            elif self.update_pending or elapsed > 10:
                log.debug(f"sending loaded model metrics after {int(elapsed)}s wait")
                await self.__send_metrics_and_reset()

    def _model_loaded(self, max_throughput: float) -> None:
        """Record how long loading took, mark the model as loaded and store its max throughput."""
        self.system_metrics.model_loading_time = (
            time.time() - self.system_metrics.model_loading_start
        )
        self.system_metrics.model_is_loaded = True
        self.model_metrics.max_throughput = max_throughput

    def _model_errored(self, error_msg: str) -> None:
        """Record a model error message on the model metrics."""
        self.model_metrics.set_errored(error_msg)
        # NOTE(review): the flag is set to True even though the model errored;
        # presumably so the metrics loop leaves its "loading" branch -- confirm.
        self.system_metrics.model_is_loaded = True

    def _set_version(self, version: str) -> None:
        """Set the version string reported in status updates."""
        self.version = version

    #######################################Private#######################################

    async def __send_delete_requests_and_reset(self):
        """
        Report finished requests to ``<addr>/delete_requests/`` and drop them from the queue.

        Queued requests are split by their ``success`` flag and sent as up to
        two POSTs per report address. Addresses are tried in order; the first
        one that accepts every batch wins and the reported entries are removed
        from ``requests_deleting``. If no address accepts them, the queue is
        left untouched so the next tick retries.
        """

        async def post(report_addr: str, idxs: list[int], success_flag: bool) -> bool:
            """POST one batch of request indexes; return True once the server answers without an HTTP error."""
            data = {
                "worker_id": self.id,
                "request_idxs": idxs,
                "success": success_flag,
            }
            log.debug(
                f"Deleting requests that {'succeeded' if success_flag else 'failed'}: {data['request_idxs']}"
            )
            full_path = report_addr.rstrip("/") + "/delete_requests/"
            for attempt in range(1, self._SEND_ATTEMPTS + 1):
                try:
                    session = await self.http()
                    async with session.post(full_path, json=data) as res:
                        log.debug(f"delete_requests response: {res.status}")
                        res.raise_for_status()
                        return True
                except asyncio.TimeoutError:
                    log.debug("delete_requests timed out")
                except Exception as e:
                    # Best effort: any failure (HTTP error status, connection
                    # problem, ...) is logged and retried, never raised.
                    log.debug(f"delete_requests failed with error: {e}")
                # Only pause and announce a retry when another attempt follows.
                if attempt < self._SEND_ATTEMPTS:
                    await asyncio.sleep(self._RETRY_DELAY_SECONDS)
                    log.debug(f"retrying delete_request, attempt: {attempt}")
            return False

        # Take a snapshot of what we plan to send this tick.
        # New arrivals after this snapshot will remain in the queue for the next tick.
        snapshot = list(self.model_metrics.requests_deleting)
        # NOTE(review): entries whose `success` is neither True nor False are
        # never reported and therefore never leave the queue -- confirm that
        # RequestMetrics.success is always a bool by the time it is queued.
        success_idxs = [r.request_idx for r in snapshot if r.success is True]
        failed_idxs = [r.request_idx for r in snapshot if r.success is False]

        if not success_idxs and not failed_idxs:
            return  # nothing to do

        for report_addr in self.report_addr:
            sent_success = True
            sent_failed = True
            if success_idxs:
                sent_success = await post(report_addr, success_idxs, True)
            if failed_idxs:
                sent_failed = await post(report_addr, failed_idxs, False)

            if sent_success and sent_failed:
                # Remove only the items we actually sent from the live queue.
                sent_set = set(success_idxs) | set(failed_idxs)
                self.model_metrics.requests_deleting[:] = [
                    r
                    for r in self.model_metrics.requests_deleting
                    if r.request_idx not in sent_set
                ]
                break

    async def __send_metrics_and_reset(self):
        """
        POST a status snapshot to ``<addr>/worker_status/`` and reset the per-interval counters.

        Report addresses are tried in order until one accepts the update. The
        model loading time is only cleared when the update carrying it was
        actually delivered.
        """
        # Remember which loading time this update carries, so the reset below
        # can be tied to exactly this value.
        loadtime_snapshot = self.system_metrics.model_loading_time

        def compute_autoscaler_data() -> AutoScalerData:
            """Build the status payload from the current metrics."""
            return AutoScalerData(
                id=self.id,
                version=self.version,
                loadtime=(loadtime_snapshot or 0.0),
                new_load=self.model_metrics.workload_processing,
                cur_load=self.model_metrics.cur_load,
                rej_load=self.model_metrics.workload_rejected,
                max_perf=self.model_metrics.max_throughput,
                cur_perf=self.model_metrics.workload_served,
                error_msg=self.model_metrics.error_msg or "",
                num_requests_working=len(self.model_metrics.requests_working),
                num_requests_recieved=len(self.model_metrics.requests_recieved),
                additional_disk_usage=self.system_metrics.additional_disk_usage,
                working_request_idxs=self.model_metrics.working_request_idxs,
                cur_capacity=0,
                max_capacity=0,
                url=self.url,
            )

        async def send_data(report_addr: str) -> bool:
            """POST the current status to one report address; return True on success."""
            # Serialize once and reuse for both the debug dump and every attempt.
            payload = asdict(compute_autoscaler_data())
            full_path = report_addr.rstrip("/") + "/worker_status/"
            # Skip the JSON pretty-print entirely unless debug logging is on.
            if log.isEnabledFor(logging.DEBUG):
                log.debug(
                    "\n".join(
                        [
                            "#" * 60,
                            "sending data to autoscaler",
                            json.dumps(payload, indent=2),
                            "#" * 60,
                        ]
                    )
                )
            for attempt in range(1, self._SEND_ATTEMPTS + 1):
                try:
                    session = await self.http()
                    async with session.post(full_path, json=payload) as res:
                        res.raise_for_status()
                        return True
                except asyncio.TimeoutError:
                    log.debug("autoscaler status update timed out")
                except Exception as e:
                    # Best effort: log and retry rather than raise.
                    log.debug(f"autoscaler status update failed with error: {e}")
                # Only pause and announce a retry when another attempt follows.
                if attempt < self._SEND_ATTEMPTS:
                    await asyncio.sleep(self._RETRY_DELAY_SECONDS)
                    log.debug(f"retrying autoscaler status update, attempt: {attempt}")
            log.debug(f"failed to send update through {report_addr}")
            return False

        ###########

        self.system_metrics.update_disk_usage()

        sent = False
        for report_addr in self.report_addr:
            if await send_data(report_addr):
                sent = True
                break

        if sent:
            # clear the one-shot loadtime only if we actually sent *this* value
            self.system_metrics.reset(expected=loadtime_snapshot)

        # NOTE(review): the statements below run whether or not the update was
        # delivered (structure reconstructed from a flattened listing) -- confirm.
        self.update_pending = False
        self.model_metrics.reset()
        self.last_metric_update = time.time()