diff --git a/lib/backend.py b/lib/backend.py index 6a2f3c0..5cbb7ff 100644 --- a/lib/backend.py +++ b/lib/backend.py @@ -30,7 +30,7 @@ from lib.data_types import ( BenchmarkResult ) -VERSION = "0.1.0" +VERSION = "0.2.0" MSG_HISTORY_LEN = 100 log = logging.getLogger(__file__) @@ -69,10 +69,14 @@ class Backend: report_addr: str = dataclasses.field( default_factory=lambda: os.environ.get("REPORT_ADDR", "https://run.vast.ai") ) + mtoken: str = dataclasses.field( + default_factory=lambda: os.environ.get("MASTER_TOKEN", "") + ) def __post_init__(self): self.metrics = Metrics() self.metrics._set_version(self.version) + self.metrics._set_mtoken(self.mtoken) self._total_pubkey_fetch_errors = 0 self._pubkey = self._fetch_pubkey() self.__start_healthcheck: bool = False diff --git a/lib/data_types.py b/lib/data_types.py index 77883c5..d948c60 100644 --- a/lib/data_types.py +++ b/lib/data_types.py @@ -286,6 +286,7 @@ class AutoScalerData: """Data that is reported to autoscaler""" id: int + mtoken: str version: str loadtime: float cur_load: float diff --git a/lib/metrics.py b/lib/metrics.py index 5f15f74..48774fe 100644 --- a/lib/metrics.py +++ b/lib/metrics.py @@ -28,6 +28,7 @@ def get_url() -> str: @dataclass class Metrics: version: str = "0" + mtoken: str = "" last_metric_update: float = 0.0 last_request_served: float = 0.0 update_pending: bool = False @@ -142,12 +143,16 @@ class Metrics: def _set_version(self, version: str) -> None: self.version = version + def _set_mtoken(self, mtoken: str) -> None: + self.mtoken = mtoken + #######################################Private####################################### async def __send_delete_requests_and_reset(self): async def post(report_addr: str, idxs: list[int], success_flag: bool) -> bool: data = { "worker_id": self.id, + "mtoken": self.mtoken, "request_idxs": idxs, "success": success_flag, } @@ -209,6 +214,7 @@ class Metrics: def compute_autoscaler_data() -> AutoScalerData: return AutoScalerData( id=self.id, + mtoken=self.mtoken, version=self.version, loadtime=(loadtime_snapshot or 0.0), new_load=self.model_metrics.workload_processing, @@ -228,17 +234,25 @@ class Metrics: async def send_data(report_addr: str) -> bool: data = compute_autoscaler_data() - full_path = report_addr.rstrip("/") + "/worker_status/" + log_data = asdict(data) + def obfuscate(secret: str) -> str: + if secret is None: + return "" + return secret[:7] + "..." if len(secret) > 7 else ("*" * len(secret)) + + log_data["mtoken"] = obfuscate(log_data.get("mtoken")) log.debug( "\n".join( [ "#" * 60, f"sending data to autoscaler", - f"{json.dumps((asdict(data)), indent=2)}", + f"{json.dumps(log_data, indent=2)}", "#" * 60, ] ) ) + + full_path = report_addr.rstrip("/") + "/worker_status/" for attempt in range(1, 4): try: session = await self.http()