Rewrite null pyworker on the framework session model
Drop the held-/reserve approach in favour of the framework's session
primitive (max_sessions=1 + /session/create). Sessions are excluded from
the autoscaler's queue-wait math and don't suffer the cur_perf=0
degradation that a long-held request did, so this naturally produces the
"one request comes in and you get a worker; release and it scales back
down" model we were hand-rolling.
Server side:
- max_sessions=1; framework auto-registers /session/* routes
- Drop custom /reserve handler, _active_reservation event, max_queue_
time=0.0, MAX_RESERVATION_SECONDS, _perf_heartbeat
- Trivial /ping handler exists only to satisfy the framework's
"at least one handler with BenchmarkConfig" requirement (and to give
clients an extension/keepalive route)
- /release on the internal control port is kept as a convenience for
queue consumers that don't carry session_auth — calls the framework's
__close_session via name-mangling, which bypasses the session_auth
check but is fine for a localhost-only endpoint
- Workload/perf back to 100 (conventional)
Client side:
- Uses endpoint.session(cost, lifetime) instead of POST /reserve
- async with the SDK Session; close on exit posts /session/end with
proper auth → 200 success in metrics
- Demo and single modes both ride the same reserve() helper
Sessions landed in vastai-sdk 0.4.2 (commit ec9ef59, 2026-01-20).
Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
This commit is contained in:
+70
-121
@@ -2,7 +2,6 @@ import asyncio
|
||||
import logging
|
||||
import os
|
||||
from contextlib import asynccontextmanager
|
||||
from typing import Optional
|
||||
from urllib.parse import urlsplit
|
||||
|
||||
from aiohttp import web
|
||||
@@ -17,21 +16,21 @@ from vastai import (
|
||||
|
||||
log = logging.getLogger(__file__)
|
||||
|
||||
# Safety cap: if the user's queue consumer never calls /release, the
|
||||
# reservation is auto-released after this many seconds so a forgotten /release
|
||||
# can't pin a worker indefinitely. Override with MAX_RESERVATION_SECONDS.
|
||||
MAX_RESERVATION_SECONDS = float(os.environ.get("MAX_RESERVATION_SECONDS", 3600))
|
||||
# Performance value pinned in the benchmark cache; sent to autoscaler as
|
||||
# max_perf. Standardized at 100 — the conventional default the rest of the
|
||||
# serverless system expects.
|
||||
TARGET_PERF = 100.0
|
||||
|
||||
# Marker the benchmark path sets so the same remote function can return
|
||||
# immediately during capacity estimation instead of sleeping.
|
||||
# Marker the benchmark path sets so the fallback /ping path returns
|
||||
# immediately during the framework's startup benchmark.
|
||||
BENCHMARK_SENTINEL = "__null_worker_benchmark__"
|
||||
|
||||
# Internal control server. Hosts:
|
||||
# * POST /release — always available, marks the active reservation as
|
||||
# done so the held /reserve returns 200 (success in metrics, not a
|
||||
# cancellation).
|
||||
# * GET /health — only when no external BACKEND_HEALTH_URL is set; the
|
||||
# framework's healthcheck loop polls it so the worker has a live signal.
|
||||
# * POST /release — releases the active reservation by closing the
|
||||
# singleton session on this worker. Called by the user's queue
|
||||
# consumer when its work is done.
|
||||
# * GET /health — only when BACKEND_HEALTH_URL is unset; gives the
|
||||
# framework's healthcheck loop something live to talk to.
|
||||
# Bound to 127.0.0.1 so only processes on the instance can reach it.
|
||||
INTERNAL_HOST = "127.0.0.1"
|
||||
INTERNAL_PORT = int(os.environ.get("NULL_CONTROL_PORT", 18999))
|
||||
@@ -56,62 +55,51 @@ else:
|
||||
USE_STUB_HEALTH = True
|
||||
|
||||
|
||||
# Workload reported per /reserve and target perf for the heartbeat below.
|
||||
TARGET_PERF = 150.0
|
||||
|
||||
# Singleton active reservation. `allow_parallel_requests=False` on the
|
||||
# /reserve handler guarantees the framework only runs one at a time per
|
||||
# worker, so a single slot is enough.
|
||||
_active_reservation: Optional[asyncio.Event] = None
|
||||
|
||||
# Backed in after Worker(...) is constructed so the heartbeat coroutine in
|
||||
# null_lifecycle() can mutate backend.metrics. Stored in a dict so the
|
||||
# lifecycle closure picks up the assignment that happens before .run().
|
||||
# Stashed after Worker(...) is constructed so /release can reach the
|
||||
# framework's session machinery. Dict so the lifecycle closure picks up
|
||||
# the assignment that happens before .run().
|
||||
_backend_ref: dict = {"backend": None}
|
||||
|
||||
|
||||
async def _perf_heartbeat() -> None:
|
||||
"""Keep cur_perf pegged to TARGET_PERF while a reservation is held.
|
||||
|
||||
Without this, workload_served stays at 0 while a /reserve is being held
|
||||
open. The autoscaler observes cur_perf=0 against max_perf=150, decides
|
||||
the worker can't deliver its claimed throughput, and downgrades it —
|
||||
which makes it cautious about scaling up and prone to queueing
|
||||
subsequent requests behind the held one instead of routing elsewhere.
|
||||
|
||||
Every second, if anything is in flight, set workload_served=TARGET_PERF
|
||||
and mark update_pending so the metrics loop sends immediately. The
|
||||
metrics tick resets workload_served back to 0 after sending; we
|
||||
re-pin it next iteration.
|
||||
"""
|
||||
while True:
|
||||
try:
|
||||
await asyncio.sleep(1.0)
|
||||
backend = _backend_ref.get("backend")
|
||||
if backend is None:
|
||||
continue
|
||||
mm = backend.metrics.model_metrics
|
||||
if mm.requests_working:
|
||||
mm.workload_served = TARGET_PERF
|
||||
backend.metrics.update_pending = True
|
||||
except asyncio.CancelledError:
|
||||
raise
|
||||
except Exception as e:
|
||||
log.debug(f"perf heartbeat error: {e}")
|
||||
|
||||
|
||||
def _build_internal_app() -> web.Application:
|
||||
app = web.Application()
|
||||
|
||||
async def release_handler(_request: web.Request) -> web.Response:
|
||||
event = _active_reservation
|
||||
if event is None:
|
||||
"""End the active reservation (the singleton session on this worker).
|
||||
|
||||
max_sessions=1 means at most one session is active here. We call
|
||||
the framework's internal __close_session via name-mangling to
|
||||
bypass the session_auth check that /session/end normally requires.
|
||||
That's intentional: this endpoint is localhost-only so trust is
|
||||
assumed, and the user's consumer can release without having to
|
||||
plumb session_auth through their queue.
|
||||
|
||||
__close_session reports the session metrics as a success, fires
|
||||
on_close_route if configured, and pops the session from the dict.
|
||||
"""
|
||||
backend = _backend_ref.get("backend")
|
||||
if backend is None:
|
||||
return web.json_response(
|
||||
{"released": False, "reason": "no active reservation"},
|
||||
{"released": False, "reason": "backend not ready"},
|
||||
status=503,
|
||||
)
|
||||
sids = list(backend.sessions.keys())
|
||||
if not sids:
|
||||
return web.json_response(
|
||||
{"released": False, "reason": "no active session"},
|
||||
status=200,
|
||||
)
|
||||
event.set()
|
||||
return web.json_response({"released": True}, status=200)
|
||||
closed = []
|
||||
for sid in sids:
|
||||
try:
|
||||
if await backend._Backend__close_session(sid):
|
||||
closed.append(sid)
|
||||
except Exception as e:
|
||||
log.warning(f"Error closing session {sid}: {e}")
|
||||
return web.json_response(
|
||||
{"released": bool(closed), "session_ids": closed},
|
||||
status=200,
|
||||
)
|
||||
|
||||
app.router.add_post("/release", release_handler)
|
||||
|
||||
@@ -126,18 +114,14 @@ def _build_internal_app() -> web.Application:
|
||||
|
||||
@asynccontextmanager
|
||||
async def null_lifecycle():
|
||||
# Pin max_throughput to exactly 100 by pre-populating the framework's
|
||||
# benchmark cache file. The framework's __run_benchmark short-circuits
|
||||
# to `float(file_contents)` when this file exists, bypassing the
|
||||
# time-based calculation that would otherwise drift to ~99.x due to
|
||||
# asyncio scheduling overhead. The filename matches the framework
|
||||
# constant BENCHMARK_INDICATOR_FILE in
|
||||
# vastai.serverless.server.lib.backend.
|
||||
# Pin max_throughput to exactly TARGET_PERF by pre-populating the
|
||||
# framework's benchmark cache file. __run_benchmark short-circuits to
|
||||
# float(file_contents) when this file exists.
|
||||
try:
|
||||
with open(".has_benchmark", "w") as fh:
|
||||
fh.write("150")
|
||||
fh.write(str(int(TARGET_PERF)))
|
||||
except OSError as e:
|
||||
log.warning(f"Could not pin benchmark cache to 150: {e}")
|
||||
log.warning(f"Could not pin benchmark cache: {e}")
|
||||
|
||||
app = _build_internal_app()
|
||||
runner = web.AppRunner(app)
|
||||
@@ -145,8 +129,6 @@ async def null_lifecycle():
|
||||
site = web.TCPSite(runner, INTERNAL_HOST, INTERNAL_PORT)
|
||||
await site.start()
|
||||
|
||||
heartbeat = asyncio.create_task(_perf_heartbeat(), name="null-perf-heartbeat")
|
||||
|
||||
lines = [
|
||||
f"Null pyworker internal control server: http://{INTERNAL_HOST}:{INTERNAL_PORT}",
|
||||
f" POST /release - end the active reservation (call from your queue consumer)",
|
||||
@@ -157,60 +139,32 @@ async def null_lifecycle():
|
||||
)
|
||||
else:
|
||||
lines.append(f"Framework healthcheck pointed at: {BACKEND_HEALTH_URL}")
|
||||
lines.append(
|
||||
"Reservations use the framework session model. Clients POST to "
|
||||
"/session/create via the SDK to acquire a worker; max_sessions=1 "
|
||||
"so each worker holds at most one reservation."
|
||||
)
|
||||
log.info("\n".join(lines))
|
||||
|
||||
try:
|
||||
yield
|
||||
finally:
|
||||
heartbeat.cancel()
|
||||
try:
|
||||
await heartbeat
|
||||
except (asyncio.CancelledError, Exception):
|
||||
pass
|
||||
await runner.cleanup()
|
||||
|
||||
|
||||
async def reserve_worker(**params: object) -> dict:
|
||||
global _active_reservation
|
||||
|
||||
async def ping(**params: object) -> dict:
|
||||
"""Trivial handler. Exists to satisfy the framework's requirement that
|
||||
at least one HandlerConfig has a BenchmarkConfig, and to give clients
|
||||
a route they can hit with session_id to extend their session TTL.
|
||||
"""
|
||||
if params.get(BENCHMARK_SENTINEL):
|
||||
# Fallback path only — the lifecycle pre-populates .has_benchmark
|
||||
# with "150" so __run_benchmark normally short-circuits and never
|
||||
# invokes us. If the cache write failed, sleep ~1s so the
|
||||
# time-based calculation lands near 150 (workload=150 / time~=1s).
|
||||
# Fallback only — the lifecycle pre-pins .has_benchmark so
|
||||
# __run_benchmark normally short-circuits and this never runs. If
|
||||
# the cache write failed, sleep ~1s so the time-based throughput
|
||||
# math lands near TARGET_PERF.
|
||||
await asyncio.sleep(1.0)
|
||||
return {"ok": True, "benchmark": True}
|
||||
|
||||
requested = params.get("duration")
|
||||
if requested is None:
|
||||
duration = MAX_RESERVATION_SECONDS
|
||||
else:
|
||||
try:
|
||||
duration = max(0.0, min(float(requested), MAX_RESERVATION_SECONDS))
|
||||
except (TypeError, ValueError):
|
||||
duration = MAX_RESERVATION_SECONDS
|
||||
|
||||
event = asyncio.Event()
|
||||
_active_reservation = event
|
||||
log.info(
|
||||
f"Reservation acquired; awaiting POST /release on "
|
||||
f"http://{INTERNAL_HOST}:{INTERNAL_PORT}/release "
|
||||
f"(auto-release after {duration:.1f}s)"
|
||||
)
|
||||
try:
|
||||
try:
|
||||
await asyncio.wait_for(event.wait(), timeout=duration)
|
||||
log.info("Reservation released via /release")
|
||||
return {"released": "explicit", "duration_cap": duration}
|
||||
except asyncio.TimeoutError:
|
||||
log.warning(
|
||||
f"Reservation hit duration cap of {duration:.1f}s without "
|
||||
f"explicit /release; releasing automatically"
|
||||
)
|
||||
return {"released": "duration_elapsed", "duration": duration}
|
||||
finally:
|
||||
if _active_reservation is event:
|
||||
_active_reservation = None
|
||||
return {"ok": True}
|
||||
|
||||
|
||||
worker_config = WorkerConfig(
|
||||
@@ -218,17 +172,12 @@ worker_config = WorkerConfig(
|
||||
model_server_port=HEALTH_PORT,
|
||||
model_healthcheck_url=HEALTH_PATH,
|
||||
lifecycle=null_lifecycle(),
|
||||
max_sessions=1,
|
||||
handlers=[
|
||||
HandlerConfig(
|
||||
route="/reserve",
|
||||
allow_parallel_requests=False,
|
||||
# Reject (429) any /reserve that arrives while the worker is
|
||||
# already busy. A held reservation lasts up to MAX_RESERVATION_
|
||||
# SECONDS, so queueing behind it would mean hours of wait —
|
||||
# better to bounce the request immediately so serverless routes
|
||||
# it to a free worker (or spins up a new one).
|
||||
max_queue_time=0.0,
|
||||
remote_function=reserve_worker,
|
||||
route="/ping",
|
||||
allow_parallel_requests=True,
|
||||
remote_function=ping,
|
||||
workload_calculator=lambda _payload: TARGET_PERF,
|
||||
benchmark_config=BenchmarkConfig(
|
||||
generator=lambda: {BENCHMARK_SENTINEL: True},
|
||||
|
||||
Reference in New Issue
Block a user