Add /release control endpoint to null pyworker
The held /reserve now waits on an asyncio.Event and resolves when the local queue consumer POSTs /release on the internal control port (127.0.0.1:18999 by default). This produces a 200 success in metrics instead of the 499 cancellation previously produced by disconnecting the client. The duration cap stays as a safety net for stuck consumers. The internal aiohttp server is now unconditional and always hosts /release; the stub /health route is added only when BACKEND_HEALTH_URL is unset. NULL_STUB_HEALTH_PORT is renamed to NULL_CONTROL_PORT to reflect the broader role. Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
This commit is contained in:
+83
-43
@@ -2,6 +2,7 @@ import asyncio
|
||||
import logging
|
||||
import os
|
||||
from contextlib import asynccontextmanager
|
||||
from typing import Optional
|
||||
from urllib.parse import urlsplit
|
||||
|
||||
from aiohttp import web
|
||||
@@ -16,8 +17,8 @@ from vastai import (
|
||||
|
||||
log = logging.getLogger(__file__)
|
||||
|
||||
# Safety cap: if a client never disconnects and never sets `duration`, the
|
||||
# reservation is auto-released after this many seconds so a stuck client
|
||||
# Safety cap: if the user's queue consumer never calls /release, the
|
||||
# reservation is auto-released after this many seconds so a forgotten /release
|
||||
# can't pin a worker indefinitely. Override with MAX_RESERVATION_SECONDS.
|
||||
MAX_RESERVATION_SECONDS = float(os.environ.get("MAX_RESERVATION_SECONDS", 3600))
|
||||
|
||||
@@ -25,20 +26,19 @@ MAX_RESERVATION_SECONDS = float(os.environ.get("MAX_RESERVATION_SECONDS", 3600))
|
||||
# immediately during capacity estimation instead of sleeping.
|
||||
BENCHMARK_SENTINEL = "__null_worker_benchmark__"
|
||||
|
||||
# Healthcheck wiring. The framework periodically GETs
|
||||
# `<model_server_url>:<model_server_port><model_healthcheck_url>` and marks the
|
||||
# worker errored if that ever fails after the first success. For the null
|
||||
# worker we either:
|
||||
# * point at a URL the user supplies via BACKEND_HEALTH_URL — typically
|
||||
# their own queue-consumer's health endpoint, so the autoscaler sees the
|
||||
# worker as broken if the consumer dies, or
|
||||
# * run a tiny built-in stub that always returns 200, so the framework has
|
||||
# something live to talk to until the user wires up a real consumer.
|
||||
BACKEND_HEALTH_URL = os.environ.get("BACKEND_HEALTH_URL", "").strip()
|
||||
STUB_HEALTH_HOST = "127.0.0.1"
|
||||
STUB_HEALTH_PORT = int(os.environ.get("NULL_STUB_HEALTH_PORT", 18999))
|
||||
# Internal control server. Hosts:
|
||||
# * POST /release — always available, marks the active reservation as
|
||||
# done so the held /reserve returns 200 (success in metrics, not a
|
||||
# cancellation).
|
||||
# * GET /health — only when no external BACKEND_HEALTH_URL is set; the
|
||||
# framework's healthcheck loop polls it so the worker has a live signal.
|
||||
# Bound to 127.0.0.1 so only processes on the instance can reach it.
|
||||
INTERNAL_HOST = "127.0.0.1"
|
||||
INTERNAL_PORT = int(os.environ.get("NULL_CONTROL_PORT", 18999))
|
||||
STUB_HEALTH_PATH = "/health"
|
||||
|
||||
BACKEND_HEALTH_URL = os.environ.get("BACKEND_HEALTH_URL", "").strip()
|
||||
|
||||
if BACKEND_HEALTH_URL:
|
||||
_parsed = urlsplit(BACKEND_HEALTH_URL)
|
||||
if not _parsed.scheme or not _parsed.hostname:
|
||||
@@ -48,43 +48,73 @@ if BACKEND_HEALTH_URL:
|
||||
HEALTH_BASE_URL = f"{_parsed.scheme}://{_parsed.hostname}"
|
||||
HEALTH_PORT = _parsed.port or (443 if _parsed.scheme == "https" else 80)
|
||||
HEALTH_PATH = _parsed.path or "/"
|
||||
USE_STUB = False
|
||||
USE_STUB_HEALTH = False
|
||||
else:
|
||||
HEALTH_BASE_URL = f"http://{STUB_HEALTH_HOST}"
|
||||
HEALTH_PORT = STUB_HEALTH_PORT
|
||||
HEALTH_BASE_URL = f"http://{INTERNAL_HOST}"
|
||||
HEALTH_PORT = INTERNAL_PORT
|
||||
HEALTH_PATH = STUB_HEALTH_PATH
|
||||
USE_STUB = True
|
||||
USE_STUB_HEALTH = True
|
||||
|
||||
|
||||
# Singleton active reservation. `allow_parallel_requests=False` on the
|
||||
# /reserve handler guarantees the framework only runs one at a time per
|
||||
# worker, so a single slot is enough.
|
||||
_active_reservation: Optional[asyncio.Event] = None
|
||||
|
||||
|
||||
def _build_internal_app() -> web.Application:
    """Assemble the aiohttp application served on the internal control port.

    Routes:
      * POST /release is always registered. It sets the event stored in the
        module-level ``_active_reservation`` slot, if there is one.
      * GET ``STUB_HEALTH_PATH`` is registered only when ``USE_STUB_HEALTH``
        is true, and always answers 200 "ok".
    """

    async def release_handler(_request: web.Request) -> web.Response:
        # Read the module-level slot once so the check and the set() act on
        # the same object.
        reservation = _active_reservation
        if reservation is not None:
            reservation.set()
            return web.json_response({"released": True}, status=200)
        # Nothing is being held: still answer 200, but report that no
        # reservation was released.
        return web.json_response(
            {"released": False, "reason": "no active reservation"},
            status=200,
        )

    async def stub_health(_request: web.Request) -> web.Response:
        return web.Response(status=200, text="ok")

    internal_app = web.Application()
    internal_app.router.add_post("/release", release_handler)
    if USE_STUB_HEALTH:
        internal_app.router.add_get(STUB_HEALTH_PATH, stub_health)
    return internal_app
|
||||
|
||||
|
||||
@asynccontextmanager
async def null_lifecycle():
    """Run the internal control server around the body of the ``async with``.

    Builds the app with ``_build_internal_app()``, serves it on
    ``INTERNAL_HOST:INTERNAL_PORT``, logs which routes are live, yields, and
    cleans the runner up on exit.

    The site is started inside the ``try`` so that a failure to bind (for
    example, the control port already being in use) still runs
    ``runner.cleanup()`` before the exception propagates.
    """
    runner = web.AppRunner(_build_internal_app())
    await runner.setup()
    try:
        site = web.TCPSite(runner, INTERNAL_HOST, INTERNAL_PORT)
        await site.start()

        lines = [
            f"Null pyworker internal control server: http://{INTERNAL_HOST}:{INTERNAL_PORT}",
            " POST /release - end the active reservation (call from your queue consumer)",
        ]
        if USE_STUB_HEALTH:
            lines.append(
                f" GET {STUB_HEALTH_PATH} - stub healthcheck (override with BACKEND_HEALTH_URL)"
            )
        else:
            lines.append(f"Framework healthcheck pointed at: {BACKEND_HEALTH_URL}")
        log.info("\n".join(lines))

        yield
    finally:
        # Runs on normal exit, on an exception raised by the caller's body,
        # and on a startup failure after setup().
        await runner.cleanup()
|
||||
|
||||
|
||||
async def reserve_worker(**params: object) -> dict:
|
||||
global _active_reservation
|
||||
|
||||
if params.get(BENCHMARK_SENTINEL):
|
||||
return {"ok": True, "benchmark": True}
|
||||
|
||||
@@ -97,17 +127,27 @@ async def reserve_worker(**params: object) -> dict:
|
||||
except (TypeError, ValueError):
|
||||
duration = MAX_RESERVATION_SECONDS
|
||||
|
||||
event = asyncio.Event()
|
||||
_active_reservation = event
|
||||
log.info(
|
||||
f"Reservation acquired; holding worker busy for up to {duration:.1f}s "
|
||||
f"(release early by disconnecting the HTTP request)"
|
||||
f"Reservation acquired; awaiting POST /release on "
|
||||
f"http://{INTERNAL_HOST}:{INTERNAL_PORT}/release "
|
||||
f"(auto-release after {duration:.1f}s)"
|
||||
)
|
||||
try:
|
||||
await asyncio.sleep(duration)
|
||||
log.info("Reservation duration elapsed; releasing worker")
|
||||
return {"released": "duration_elapsed", "duration": duration}
|
||||
except asyncio.CancelledError:
|
||||
log.info("Reservation released by client disconnect")
|
||||
raise
|
||||
try:
|
||||
await asyncio.wait_for(event.wait(), timeout=duration)
|
||||
log.info("Reservation released via /release")
|
||||
return {"released": "explicit", "duration_cap": duration}
|
||||
except asyncio.TimeoutError:
|
||||
log.warning(
|
||||
f"Reservation hit duration cap of {duration:.1f}s without "
|
||||
f"explicit /release; releasing automatically"
|
||||
)
|
||||
return {"released": "duration_elapsed", "duration": duration}
|
||||
finally:
|
||||
if _active_reservation is event:
|
||||
_active_reservation = None
|
||||
|
||||
|
||||
worker_config = WorkerConfig(
|
||||
|
||||
Reference in New Issue
Block a user