Collapse null pyworker client to a single mode parameterized by --count
Now that the session model means no HTTP connection is held during the reservation, the dichotomy between "single reserve" and "trapezoid demo" collapses — both are "open N sessions, each held for H seconds, started I seconds apart, close." Replace --reserve/--demo/--duration/--plateau with --count/--hold/--interval. --session-cost becomes --cost. Client is now 64 lines (down from 120). Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
This commit is contained in:
@@ -68,17 +68,18 @@ consumer dies, the autoscaler sees the worker as broken.
|
|||||||
## Client demo
|
## Client demo
|
||||||
|
|
||||||
```bash
|
```bash
|
||||||
# Single reservation
|
# Single reservation, hold 180s
|
||||||
python -m workers.null.client --endpoint <NAME> --instance alpha
|
python -m workers.null.client --endpoint <NAME> --instance alpha
|
||||||
|
|
||||||
# Staggered three-session trapezoid
|
# Three concurrent reservations, started 30s apart, each held 360s
|
||||||
python -m workers.null.client --endpoint <NAME> --instance alpha --demo
|
python -m workers.null.client --endpoint <NAME> --instance alpha --count 3 --hold 360
|
||||||
```
|
```
|
||||||
|
|
||||||
Flags: `--duration` (single), `--interval` and `--plateau` (demo
|
Flags: `--count` (number of concurrent sessions, default 1), `--hold`
|
||||||
timing), `--session-cost` (overrides the cost reported at session
|
(seconds each session is held, default 180), `--interval` (seconds
|
||||||
create; default 100 = `max_perf`), `--instance` (`prod` | `alpha` |
|
between starts when `--count > 1`, default 30), `--cost` (cost reported
|
||||||
`candidate` | `local`).
|
at session-create, default 100 = `max_perf`), `--instance` (`prod` |
|
||||||
|
`alpha` | `candidate` | `local`).
|
||||||
|
|
||||||
## Environment variables
|
## Environment variables
|
||||||
|
|
||||||
|
|||||||
+33
-89
@@ -3,7 +3,6 @@ import asyncio
|
|||||||
import logging
|
import logging
|
||||||
import os
|
import os
|
||||||
import sys
|
import sys
|
||||||
import time
|
|
||||||
|
|
||||||
from vastai import Serverless
|
from vastai import Serverless
|
||||||
|
|
||||||
@@ -14,103 +13,48 @@ logging.basicConfig(
|
|||||||
)
|
)
|
||||||
log = logging.getLogger(__file__)
|
log = logging.getLogger(__file__)
|
||||||
|
|
||||||
ENDPOINT_NAME = "null-prod"
|
|
||||||
DEFAULT_SESSION_COST = 100 # matches the worker's max_perf
|
|
||||||
|
|
||||||
|
async def reserve(client: Serverless, endpoint_name: str, hold: float, cost: int, label: str):
|
||||||
async def reserve(
|
|
||||||
client: Serverless,
|
|
||||||
*,
|
|
||||||
endpoint_name: str,
|
|
||||||
hold_for: float,
|
|
||||||
session_cost: int,
|
|
||||||
label: str = "session",
|
|
||||||
) -> None:
|
|
||||||
endpoint = await client.get_endpoint(name=endpoint_name)
|
endpoint = await client.get_endpoint(name=endpoint_name)
|
||||||
lifetime = hold_for + 60 # outlast the hold; no keepalives sent
|
async with await endpoint.session(cost=cost, lifetime=hold + 60) as s:
|
||||||
start = time.monotonic()
|
sid = s.session_id
|
||||||
log.info("[%s] creating session (cost=%d, hold=%.0fs)", label, session_cost, hold_for)
|
log.info("[%s] %s open, holding %.0fs", label, sid, hold)
|
||||||
async with await endpoint.session(cost=session_cost, lifetime=lifetime) as s:
|
await asyncio.sleep(hold)
|
||||||
log.info("[%s] session %s open", label, s.session_id)
|
log.info("[%s] %s closed", label, sid)
|
||||||
try:
|
|
||||||
await asyncio.sleep(hold_for)
|
|
||||||
except asyncio.CancelledError:
|
|
||||||
log.info("[%s] cancelled after %.1fs", label, time.monotonic() - start)
|
|
||||||
raise
|
|
||||||
log.info("[%s] closed cleanly after %.1fs", label, time.monotonic() - start)
|
|
||||||
|
|
||||||
|
|
||||||
async def run_demo(
|
|
||||||
client: Serverless,
|
|
||||||
*,
|
|
||||||
endpoint_name: str,
|
|
||||||
interval: float,
|
|
||||||
plateau: float,
|
|
||||||
session_cost: int,
|
|
||||||
) -> None:
|
|
||||||
n = 3
|
|
||||||
hold = (n - 1) * interval + plateau
|
|
||||||
tasks: list[asyncio.Task] = []
|
|
||||||
for i in range(1, n + 1):
|
|
||||||
label = f"res-{i}"
|
|
||||||
tasks.append(asyncio.create_task(
|
|
||||||
reserve(client, endpoint_name=endpoint_name, hold_for=hold,
|
|
||||||
session_cost=session_cost, label=label),
|
|
||||||
name=label,
|
|
||||||
))
|
|
||||||
if i < n:
|
|
||||||
await asyncio.sleep(interval)
|
|
||||||
log.info(
|
|
||||||
"All %d sessions in flight; plateau %.0fs, scale-down %.0fs apart",
|
|
||||||
n, plateau, interval,
|
|
||||||
)
|
|
||||||
await asyncio.gather(*tasks, return_exceptions=True)
|
|
||||||
|
|
||||||
|
|
||||||
def build_arg_parser() -> argparse.ArgumentParser:
|
|
||||||
p = argparse.ArgumentParser(description="Vast Null PyWorker demo client")
|
|
||||||
p.add_argument("--endpoint", default=os.environ.get("VAST_ENDPOINT", ENDPOINT_NAME),
|
|
||||||
help=f"endpoint name (default: {ENDPOINT_NAME})")
|
|
||||||
p.add_argument("--instance", choices=("prod", "alpha", "candidate", "local"),
|
|
||||||
default=os.environ.get("VAST_INSTANCE", "prod"),
|
|
||||||
help="serverless instance (default: prod)")
|
|
||||||
p.add_argument("--duration", type=float, default=180.0,
|
|
||||||
help="single-reserve mode: seconds to hold (default: 180)")
|
|
||||||
|
|
||||||
modes = p.add_mutually_exclusive_group(required=False)
|
|
||||||
modes.add_argument("--reserve", action="store_true",
|
|
||||||
help="single session (default if no mode given)")
|
|
||||||
modes.add_argument("--demo", action="store_true",
|
|
||||||
help="staggered 3-session trapezoid")
|
|
||||||
|
|
||||||
p.add_argument("--interval", type=float, default=30.0,
|
|
||||||
help="demo: seconds between sessions (default: 30)")
|
|
||||||
p.add_argument("--plateau", type=float, default=300.0,
|
|
||||||
help="demo: seconds to hold all 3 active (default: 300)")
|
|
||||||
p.add_argument("--session-cost", type=int, default=DEFAULT_SESSION_COST,
|
|
||||||
help=f"cost reported at session-create (default: {DEFAULT_SESSION_COST})")
|
|
||||||
return p
|
|
||||||
|
|
||||||
|
|
||||||
async def main_async():
|
async def main_async():
|
||||||
args = build_arg_parser().parse_args()
|
p = argparse.ArgumentParser(description="Vast Null PyWorker demo client")
|
||||||
print("=" * 60)
|
p.add_argument("--endpoint", default=os.environ.get("VAST_ENDPOINT", "null-prod"))
|
||||||
print(f"Endpoint: {args.endpoint} (instance: {args.instance})")
|
p.add_argument("--instance", choices=("prod", "alpha", "candidate", "local"),
|
||||||
print("=" * 60)
|
default=os.environ.get("VAST_INSTANCE", "prod"))
|
||||||
|
p.add_argument("--count", type=int, default=1,
|
||||||
|
help="concurrent sessions to open (default: 1)")
|
||||||
|
p.add_argument("--interval", type=float, default=30.0,
|
||||||
|
help="seconds between session starts when count>1 (default: 30)")
|
||||||
|
p.add_argument("--hold", type=float, default=180.0,
|
||||||
|
help="seconds to hold each session (default: 180)")
|
||||||
|
p.add_argument("--cost", type=int, default=100,
|
||||||
|
help="cost reported at session-create (default: 100)")
|
||||||
|
args = p.parse_args()
|
||||||
|
|
||||||
|
print(f"endpoint={args.endpoint} instance={args.instance} "
|
||||||
|
f"count={args.count} hold={args.hold}s cost={args.cost}")
|
||||||
|
|
||||||
try:
|
try:
|
||||||
async with Serverless(instance=args.instance) as client:
|
async with Serverless(instance=args.instance) as client:
|
||||||
if args.demo:
|
tasks = []
|
||||||
await run_demo(client, endpoint_name=args.endpoint,
|
for i in range(args.count):
|
||||||
interval=args.interval, plateau=args.plateau,
|
label = f"res-{i+1}" if args.count > 1 else "reservation"
|
||||||
session_cost=args.session_cost)
|
tasks.append(asyncio.create_task(
|
||||||
else:
|
reserve(client, args.endpoint, args.hold, args.cost, label),
|
||||||
await reserve(client, endpoint_name=args.endpoint,
|
name=label,
|
||||||
hold_for=args.duration,
|
))
|
||||||
session_cost=args.session_cost,
|
if i + 1 < args.count:
|
||||||
label="reservation")
|
await asyncio.sleep(args.interval)
|
||||||
|
await asyncio.gather(*tasks, return_exceptions=True)
|
||||||
except KeyboardInterrupt:
|
except KeyboardInterrupt:
|
||||||
log.info("Interrupted; dropping in-flight sessions")
|
log.info("Interrupted")
|
||||||
except Exception as e:
|
except Exception as e:
|
||||||
log.error("Error: %s", e, exc_info=True)
|
log.error("Error: %s", e, exc_info=True)
|
||||||
sys.exit(1)
|
sys.exit(1)
|
||||||
|
|||||||
Reference in New Issue
Block a user