Compare commits
7 Commits
| Author | SHA1 | Date | |
|---|---|---|---|
| d63a060202 | |||
| c6521cb6d4 | |||
| b7fe4ebb91 | |||
| 8ae7b74605 | |||
| 106067d716 | |||
| f5134d4bf5 | |||
| 47e5460532 |
+5
-1
@@ -30,7 +30,7 @@ from lib.data_types import (
|
|||||||
BenchmarkResult
|
BenchmarkResult
|
||||||
)
|
)
|
||||||
|
|
||||||
VERSION = "0.1.0"
|
VERSION = "0.2.0"
|
||||||
|
|
||||||
MSG_HISTORY_LEN = 100
|
MSG_HISTORY_LEN = 100
|
||||||
log = logging.getLogger(__file__)
|
log = logging.getLogger(__file__)
|
||||||
@@ -69,10 +69,14 @@ class Backend:
|
|||||||
report_addr: str = dataclasses.field(
|
report_addr: str = dataclasses.field(
|
||||||
default_factory=lambda: os.environ.get("REPORT_ADDR", "https://run.vast.ai")
|
default_factory=lambda: os.environ.get("REPORT_ADDR", "https://run.vast.ai")
|
||||||
)
|
)
|
||||||
|
mtoken: str = dataclasses.field(
|
||||||
|
default_factory=lambda: os.environ.get("MASTER_TOKEN", "")
|
||||||
|
)
|
||||||
|
|
||||||
def __post_init__(self):
|
def __post_init__(self):
|
||||||
self.metrics = Metrics()
|
self.metrics = Metrics()
|
||||||
self.metrics._set_version(self.version)
|
self.metrics._set_version(self.version)
|
||||||
|
self.metrics._set_mtoken(self.mtoken)
|
||||||
self._total_pubkey_fetch_errors = 0
|
self._total_pubkey_fetch_errors = 0
|
||||||
self._pubkey = self._fetch_pubkey()
|
self._pubkey = self._fetch_pubkey()
|
||||||
self.__start_healthcheck: bool = False
|
self.__start_healthcheck: bool = False
|
||||||
|
|||||||
@@ -286,6 +286,7 @@ class AutoScalerData:
|
|||||||
"""Data that is reported to autoscaler"""
|
"""Data that is reported to autoscaler"""
|
||||||
|
|
||||||
id: int
|
id: int
|
||||||
|
mtoken: str
|
||||||
version: str
|
version: str
|
||||||
loadtime: float
|
loadtime: float
|
||||||
cur_load: float
|
cur_load: float
|
||||||
|
|||||||
+16
-2
@@ -28,6 +28,7 @@ def get_url() -> str:
|
|||||||
@dataclass
|
@dataclass
|
||||||
class Metrics:
|
class Metrics:
|
||||||
version: str = "0"
|
version: str = "0"
|
||||||
|
mtoken: str = ""
|
||||||
last_metric_update: float = 0.0
|
last_metric_update: float = 0.0
|
||||||
last_request_served: float = 0.0
|
last_request_served: float = 0.0
|
||||||
update_pending: bool = False
|
update_pending: bool = False
|
||||||
@@ -142,12 +143,16 @@ class Metrics:
|
|||||||
def _set_version(self, version: str) -> None:
|
def _set_version(self, version: str) -> None:
|
||||||
self.version = version
|
self.version = version
|
||||||
|
|
||||||
|
def _set_mtoken(self, mtoken: str) -> None:
|
||||||
|
self.mtoken = mtoken
|
||||||
|
|
||||||
#######################################Private#######################################
|
#######################################Private#######################################
|
||||||
|
|
||||||
async def __send_delete_requests_and_reset(self):
|
async def __send_delete_requests_and_reset(self):
|
||||||
async def post(report_addr: str, idxs: list[int], success_flag: bool) -> bool:
|
async def post(report_addr: str, idxs: list[int], success_flag: bool) -> bool:
|
||||||
data = {
|
data = {
|
||||||
"worker_id": self.id,
|
"worker_id": self.id,
|
||||||
|
"mtoken": self.mtoken,
|
||||||
"request_idxs": idxs,
|
"request_idxs": idxs,
|
||||||
"success": success_flag,
|
"success": success_flag,
|
||||||
}
|
}
|
||||||
@@ -209,6 +214,7 @@ class Metrics:
|
|||||||
def compute_autoscaler_data() -> AutoScalerData:
|
def compute_autoscaler_data() -> AutoScalerData:
|
||||||
return AutoScalerData(
|
return AutoScalerData(
|
||||||
id=self.id,
|
id=self.id,
|
||||||
|
mtoken=self.mtoken,
|
||||||
version=self.version,
|
version=self.version,
|
||||||
loadtime=(loadtime_snapshot or 0.0),
|
loadtime=(loadtime_snapshot or 0.0),
|
||||||
new_load=self.model_metrics.workload_processing,
|
new_load=self.model_metrics.workload_processing,
|
||||||
@@ -228,17 +234,25 @@ class Metrics:
|
|||||||
|
|
||||||
async def send_data(report_addr: str) -> bool:
|
async def send_data(report_addr: str) -> bool:
|
||||||
data = compute_autoscaler_data()
|
data = compute_autoscaler_data()
|
||||||
full_path = report_addr.rstrip("/") + "/worker_status/"
|
log_data = asdict(data)
|
||||||
|
def obfuscate(secret: str) -> str:
|
||||||
|
if secret is None:
|
||||||
|
return ""
|
||||||
|
return secret[:7] + "..." if len(secret) > 7 else ("*" * len(secret))
|
||||||
|
|
||||||
|
log_data["mtoken"] = obfuscate(log_data.get("mtoken"))
|
||||||
log.debug(
|
log.debug(
|
||||||
"\n".join(
|
"\n".join(
|
||||||
[
|
[
|
||||||
"#" * 60,
|
"#" * 60,
|
||||||
f"sending data to autoscaler",
|
f"sending data to autoscaler",
|
||||||
f"{json.dumps((asdict(data)), indent=2)}",
|
f"{json.dumps(log_data, indent=2)}",
|
||||||
"#" * 60,
|
"#" * 60,
|
||||||
]
|
]
|
||||||
)
|
)
|
||||||
)
|
)
|
||||||
|
|
||||||
|
full_path = report_addr.rstrip("/") + "/worker_status/"
|
||||||
for attempt in range(1, 4):
|
for attempt in range(1, 4):
|
||||||
try:
|
try:
|
||||||
session = await self.http()
|
session = await self.http()
|
||||||
|
|||||||
Reference in New Issue
Block a user