diff --git a/workers/null/worker.py b/workers/null/worker.py
index 1410704..633f1a1 100644
--- a/workers/null/worker.py
+++ b/workers/null/worker.py
@@ -56,11 +56,49 @@ else:
     USE_STUB_HEALTH = True
 
 
+# Workload reported per /reserve and target perf for the heartbeat below.
+TARGET_PERF = 150.0
+
 # Singleton active reservation. `allow_parallel_requests=False` on the
 # /reserve handler guarantees the framework only runs one at a time per
 # worker, so a single slot is enough.
 _active_reservation: Optional[asyncio.Event] = None
 
+# Backed in after Worker(...) is constructed so the heartbeat coroutine in
+# null_lifecycle() can mutate backend.metrics. Stored in a dict so the
+# lifecycle closure picks up the assignment that happens before .run().
+_backend_ref: dict = {"backend": None}
+
+
+async def _perf_heartbeat() -> None:
+    """Keep cur_perf pegged to TARGET_PERF while a reservation is held.
+
+    Without this, workload_served stays at 0 while a /reserve is being held
+    open. The autoscaler observes cur_perf=0 against max_perf=150, decides
+    the worker can't deliver its claimed throughput, and downgrades it —
+    which makes it cautious about scaling up and prone to queueing
+    subsequent requests behind the held one instead of routing elsewhere.
+
+    Every second, if anything is in flight, set workload_served=TARGET_PERF
+    and mark update_pending so the metrics loop sends immediately. The
+    metrics tick resets workload_served back to 0 after sending; we
+    re-pin it next iteration.
+    """
+    while True:
+        try:
+            await asyncio.sleep(1.0)
+            backend = _backend_ref.get("backend")
+            if backend is None:
+                continue
+            mm = backend.metrics.model_metrics
+            if mm.requests_working:
+                mm.workload_served = TARGET_PERF
+                backend.metrics.update_pending = True
+        except asyncio.CancelledError:
+            raise
+        except Exception as e:
+            log.debug(f"perf heartbeat error: {e}")
+
 
 def _build_internal_app() -> web.Application:
     app = web.Application()
@@ -107,6 +145,8 @@ async def null_lifecycle():
     site = web.TCPSite(runner, INTERNAL_HOST, INTERNAL_PORT)
     await site.start()
 
+    heartbeat = asyncio.create_task(_perf_heartbeat(), name="null-perf-heartbeat")
+
     lines = [
         f"Null pyworker internal control server: http://{INTERNAL_HOST}:{INTERNAL_PORT}",
         f"  POST /release  - end the active reservation (call from your queue consumer)",
@@ -122,6 +162,11 @@ async def null_lifecycle():
     try:
         yield
     finally:
+        heartbeat.cancel()
+        try:
+            await heartbeat
+        except (asyncio.CancelledError, Exception):
+            pass
         await runner.cleanup()
 
 
@@ -184,7 +229,7 @@ worker_config = WorkerConfig(
     # it to a free worker (or spins up a new one).
     max_queue_time=0.0,
     remote_function=reserve_worker,
-    workload_calculator=lambda _payload: 150.0,
+    workload_calculator=lambda _payload: TARGET_PERF,
     benchmark_config=BenchmarkConfig(
         generator=lambda: {BENCHMARK_SENTINEL: True},
         runs=1,
@@ -196,4 +241,6 @@ worker_config = WorkerConfig(
     log_action_config=LogActionConfig(),
 )
 
-Worker(worker_config).run()
+_worker = Worker(worker_config)
+_backend_ref["backend"] = _worker.backend
+_worker.run()