From 6c2f194b28a8681142c8550735cc143bd9117362 Mon Sep 17 00:00:00 2001 From: Rob Ballantyne Date: Tue, 12 May 2026 10:35:18 +0100 Subject: [PATCH] Add perf heartbeat to keep null pyworker reporting peak throughput MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit While a /reserve is held, no requests complete so workload_served stays at 0 each metrics tick. The autoscaler sees cur_perf=0 against max_perf=150, concludes the worker can't deliver claimed throughput, downgrades it, and gets cautious about scaling up — so additional /reserve requests pile up behind the held one instead of triggering a new worker. Add a 1Hz heartbeat coroutine that, while anything is in flight, sets workload_served back to TARGET_PERF (150) and flags update_pending. The metrics tick reads 150 and resets to 0; the heartbeat re-pins it before the next tick. Net effect: the autoscaler sees a saturated worker delivering at peak rate, which is the signal it needs to scale a new worker up rather than queue. The heartbeat needs the backend instance, which is only created inside Worker(...) — stash a reference in a module-level dict between Worker() and .run() so the lifecycle coroutine can reach it. Co-Authored-By: Claude Opus 4.7 (1M context) --- workers/null/worker.py | 51 ++++++++++++++++++++++++++++++++++++++++-- 1 file changed, 49 insertions(+), 2 deletions(-) diff --git a/workers/null/worker.py b/workers/null/worker.py index 1410704..633f1a1 100644 --- a/workers/null/worker.py +++ b/workers/null/worker.py @@ -56,11 +56,49 @@ else: USE_STUB_HEALTH = True +# Workload reported per /reserve and target perf for the heartbeat below. +TARGET_PERF = 150.0 + # Singleton active reservation. `allow_parallel_requests=False` on the # /reserve handler guarantees the framework only runs one at a time per # worker, so a single slot is enough. _active_reservation: Optional[asyncio.Event] = None +# Backed in after Worker(...) is constructed so the heartbeat coroutine in +# null_lifecycle() can mutate backend.metrics. Stored in a dict so the +# lifecycle closure picks up the assignment that happens before .run(). +_backend_ref: dict = {"backend": None} + + +async def _perf_heartbeat() -> None: + """Keep cur_perf pegged to TARGET_PERF while a reservation is held. + + Without this, workload_served stays at 0 while a /reserve is being held + open. The autoscaler observes cur_perf=0 against max_perf=150, decides + the worker can't deliver its claimed throughput, and downgrades it — + which makes it cautious about scaling up and prone to queueing + subsequent requests behind the held one instead of routing elsewhere. + + Every second, if anything is in flight, set workload_served=TARGET_PERF + and mark update_pending so the metrics loop sends immediately. The + metrics tick resets workload_served back to 0 after sending; we + re-pin it next iteration. + """ + while True: + try: + await asyncio.sleep(1.0) + backend = _backend_ref.get("backend") + if backend is None: + continue + mm = backend.metrics.model_metrics + if mm.requests_working: + mm.workload_served = TARGET_PERF + backend.metrics.update_pending = True + except asyncio.CancelledError: + raise + except Exception as e: + log.debug(f"perf heartbeat error: {e}") + def _build_internal_app() -> web.Application: app = web.Application() @@ -107,6 +145,8 @@ async def null_lifecycle(): site = web.TCPSite(runner, INTERNAL_HOST, INTERNAL_PORT) await site.start() + heartbeat = asyncio.create_task(_perf_heartbeat(), name="null-perf-heartbeat") + lines = [ f"Null pyworker internal control server: http://{INTERNAL_HOST}:{INTERNAL_PORT}", f" POST /release - end the active reservation (call from your queue consumer)", @@ -122,6 +162,11 @@ async def null_lifecycle(): try: yield finally: + heartbeat.cancel() + try: + await heartbeat + except (asyncio.CancelledError, Exception): + pass await runner.cleanup() @@ -184,7 +229,7 @@ worker_config = WorkerConfig( # it to a free worker (or spins up a new one). max_queue_time=0.0, remote_function=reserve_worker, - workload_calculator=lambda _payload: 150.0, + workload_calculator=lambda _payload: TARGET_PERF, benchmark_config=BenchmarkConfig( generator=lambda: {BENCHMARK_SENTINEL: True}, runs=1, @@ -196,4 +241,6 @@ worker_config = WorkerConfig( log_action_config=LogActionConfig(), ) -Worker(worker_config).run() +_worker = Worker(worker_config) +_backend_ref["backend"] = _worker.backend +_worker.run()