Compare commits

..

3 Commits

Author SHA1 Message Date
Colter Downing 62fbfb061d more logs 2025-11-24 18:40:45 -08:00
Colter Downing c772e1651b debug logs 2025-11-24 18:21:35 -08:00
Colter Downing ecc6a3ce0d catch all exceptions, add logs 2025-11-24 18:06:17 -08:00
6 changed files with 124 additions and 68 deletions
+34 -8
View File
@@ -30,7 +30,7 @@ from lib.data_types import (
BenchmarkResult
)
VERSION = "0.2.1"
VERSION = "0.2.0"
MSG_HISTORY_LEN = 100
log = logging.getLogger(__file__)
@@ -235,10 +235,14 @@ class Backend:
log.debug("No healthcheck endpoint defined, skipping healthcheck")
return
first_healthcheck = True
while True:
await sleep(10)
if self.__start_healthcheck is False:
continue
if first_healthcheck:
log.info(f"[healthcheck] First healthcheck starting (model is now loaded)")
first_healthcheck = False
try:
log.debug(f"Performing healthcheck on {health_check_url}")
async with self.healthcheck_session.get(health_check_url) as response:
@@ -256,9 +260,22 @@ class Backend:
self.backend_errored(str(e))
async def _start_tracking(self) -> None:
await gather(
self.__read_logs(), self.metrics._send_metrics_loop(), self.__healthcheck(), self.metrics._send_delete_requests_loop()
log.info("Starting tracking tasks (read_logs, send_metrics_loop, healthcheck, send_delete_requests_loop)")
task_names = ["read_logs", "send_metrics_loop", "healthcheck", "send_delete_requests_loop"]
results = await gather(
self.__read_logs(),
self.metrics._send_metrics_loop(),
self.__healthcheck(),
self.metrics._send_delete_requests_loop(),
return_exceptions=True
)
# If we get here, one or more tasks exited (they should run forever)
log.error(f"CRITICAL: _start_tracking gather returned! This should never happen. Results: {results}")
for name, result in zip(task_names, results):
if isinstance(result, Exception):
log.error(f"Tracking task '{name}' crashed with exception: {result}", exc_info=result)
elif result is not None:
log.warning(f"Tracking task '{name}' exited unexpectedly with result: {result}")
def backend_errored(self, msg: str) -> None:
self.metrics._model_errored(msg)
@@ -399,15 +416,20 @@ class Backend:
# await sleep(5)
try:
max_throughput = await run_benchmark()
log.info(f"[benchmark] Benchmark complete, max_throughput={max_throughput}, setting healthcheck=True")
self.__start_healthcheck = True
self.metrics._model_loaded(
max_throughput=max_throughput,
)
log.info(f"[benchmark] _model_loaded() called, returning from handle_log_line")
except ClientConnectorError as e:
log.debug(
f"failed to connect to comfyui api during benchmark"
f"failed to connect to model api during benchmark"
)
self.backend_errored(str(e))
except Exception as e:
log.error(f"Unexpected error during benchmark: {e}", exc_info=True)
self.backend_errored(f"Benchmark failed: {e}")
case LogAction.ModelError if msg in log_line:
log.debug(f"Got log line indicating error: {log_line}")
self.backend_errored(msg)
@@ -419,10 +441,14 @@ class Backend:
log.debug(f"tailing file: {self.model_log_file}")
async with await open_file(self.model_log_file, encoding='utf-8', errors='ignore') as f:
while True:
line = await f.readline()
if line:
await handle_log_line(line.rstrip())
else:
try:
line = await f.readline()
if line:
await handle_log_line(line.rstrip())
else:
await asyncio.sleep(LOG_POLL_INTERVAL)
except Exception as e:
log.error(f"Error processing log line: {e}", exc_info=True)
await asyncio.sleep(LOG_POLL_INTERVAL)
###########
+35
View File
@@ -1,4 +1,5 @@
import os
import sys
import time
import logging
import json
@@ -17,6 +18,14 @@ DELETE_REQUESTS_INTERVAL = 1
log = logging.getLogger(__file__)
def _flush_logs():
"""Force flush all log handlers and stdout/stderr."""
for handler in logging.root.handlers:
handler.flush()
sys.stdout.flush()
sys.stderr.flush()
@cache
def get_url() -> str:
use_ssl = os.environ.get("USE_SSL", "false") == "true"
@@ -119,22 +128,41 @@ class Metrics:
await self.__send_delete_requests_and_reset()
async def _send_metrics_loop(self) -> Awaitable[NoReturn]:
loop_count = 0
first_loaded_send_done = False
while True:
await sleep(METRICS_UPDATE_INTERVAL)
loop_count += 1
elapsed = time.time() - self.last_metric_update
# Log heartbeat every 30 seconds to confirm loop is running
if loop_count % 30 == 0:
log.debug(f"[heartbeat] metrics loop alive, loop_count={loop_count}, model_loaded={self.system_metrics.model_is_loaded}")
_flush_logs()
# Extra logging for first few iterations after model loads
if self.system_metrics.model_is_loaded and not first_loaded_send_done:
log.info(f"[transition] First iteration with model_loaded=True, loop_count={loop_count}, elapsed={elapsed:.1f}")
_flush_logs()
if self.system_metrics.model_is_loaded is False and elapsed >= 10:
log.debug(f"sending loading model metrics after {int(elapsed)}s wait")
await self.__send_metrics_and_reset()
elif self.update_pending or elapsed > 10:
log.debug(f"sending loaded model metrics after {int(elapsed)}s wait")
await self.__send_metrics_and_reset()
if self.system_metrics.model_is_loaded and not first_loaded_send_done:
first_loaded_send_done = True
log.info(f"[transition] First loaded metrics send complete, continuing to next iteration...")
_flush_logs()
def _model_loaded(self, max_throughput: float) -> None:
log.info(f"MODEL LOADED: Setting model_is_loaded=True, max_throughput={max_throughput}")
_flush_logs()
self.system_metrics.model_loading_time = (
time.time() - self.system_metrics.model_loading_start
)
self.system_metrics.model_is_loaded = True
self.model_metrics.max_throughput = max_throughput
log.info(f"MODEL LOADED: model_loading_time={self.system_metrics.model_loading_time}")
_flush_logs()
def _model_errored(self, error_msg: str) -> None:
self.model_metrics.set_errored(error_msg)
@@ -271,6 +299,7 @@ class Metrics:
###########
self.system_metrics.update_disk_usage()
had_loadtime = loadtime_snapshot is not None and loadtime_snapshot > 0
sent = False
for report_addr in self.report_addr:
@@ -279,8 +308,14 @@ class Metrics:
break
if sent:
if had_loadtime:
log.info(f"FIRST LOADTIME METRICS SENT SUCCESSFULLY! loadtime={loadtime_snapshot}")
_flush_logs()
# clear the one-shot loadtime only if we actually sent *this* value
self.system_metrics.reset(expected=loadtime_snapshot)
self.update_pending = False
self.model_metrics.reset()
self.last_metric_update = time.time()
if had_loadtime:
log.info(f"POST-SEND: reset complete, last_metric_update={self.last_metric_update}, continuing loop...")
_flush_logs()
+20
View File
@@ -1,5 +1,7 @@
import os
import logging
import signal
import sys
from typing import List
import ssl
from asyncio import run, gather
@@ -12,7 +14,25 @@ from aiohttp import web
log = logging.getLogger(__file__)
def _setup_signal_handlers():
"""Setup signal handlers to log when process receives termination signals."""
def signal_handler(signum, frame):
sig_name = signal.Signals(signum).name
log.error(f"SIGNAL RECEIVED: {sig_name} ({signum}) - process is being terminated")
sys.stdout.flush()
sys.stderr.flush()
sys.exit(128 + signum)
# Handle common termination signals
for sig in [signal.SIGTERM, signal.SIGINT, signal.SIGHUP]:
try:
signal.signal(sig, signal_handler)
except (OSError, ValueError):
pass # Some signals may not be available
def start_server(backend: Backend, routes: List[web.RouteDef], **kwargs):
_setup_signal_handlers()
try:
log.debug("getting certificate...")
use_ssl = os.environ.get("USE_SSL", "false") == "true"
+19 -27
View File
@@ -34,30 +34,12 @@ uv pip install -r requirements.txt
Several examples have been provided in the client to help you get started with your own implementation.
First, set your API key as an environment variable:
### Completions
Call to `/v1/completions` with json response
```bash
export VAST_API_KEY=<your_api_key>
```
The `--model` and `--endpoint` flags are optional. If not provided, they default to `Qwen/Qwen3-8B` and `my-vllm-endpoint` respectively.
### Chat Completion (streaming)
Call to `/v1/chat/completions` with streaming response
```bash
python -m workers.openai.client --chat-stream --endpoint <ENDPOINT_NAME> --model <MODEL_NAME>
```
### Interactive Chat (streaming)
Interactive session with calls to `/v1/chat/completions`.
Type `clear` to clear the chat history or `quit` to exit.
```bash
python -m workers.openai.client --interactive --endpoint <ENDPOINT_NAME> --model <MODEL_NAME>
python -m workers.openai.client -k <API_KEY> -e <ENDPOINT_NAME> --completion --model <MODEL_NAME>
```
### Chat Completion (json)
@@ -65,7 +47,15 @@ python -m workers.openai.client --interactive --endpoint <ENDPOINT_NAME> --model
Call to `/v1/chat/completions` with json response
```bash
python -m workers.openai.client --chat --endpoint <ENDPOINT_NAME> --model <MODEL_NAME>
python -m workers.openai.client -k <API_KEY> -e <ENDPOINT_NAME> --chat --model <MODEL_NAME>
```
### Chat Completion (streaming)
Call to `/v1/chat/completions` with streaming response
```bash
python -m workers.openai.client -k <API_KEY> -e <ENDPOINT_NAME> --chat-stream --model <MODEL_NAME>
```
### Tool Use (json)
@@ -75,14 +65,16 @@ Call to `/v1/chat/completions` with tool and json response.
This test defines a simple tool which will list the contents of the local pyworker directory. The output is then analysed by the model.
```bash
python -m workers.openai.client --tools --endpoint <ENDPOINT_NAME> --model <MODEL_NAME>
python -m workers.openai.client -k <API_KEY> -e <ENDPOINT_NAME> --tools --model <MODEL_NAME>
```
### Completions
### Interactive Chat (streaming)
Call to `/v1/completions` with json response
Interactive session with calls to `/v1/chat/completions`.
Type `clear` to clear the chat history or `quit` to exit.
```bash
python -m workers.openai.client --completion --endpoint <ENDPOINT_NAME> --model <MODEL_NAME>
python -m workers.openai.client -k <API_KEY> -e <ENDPOINT_NAME> --interactive --model <MODEL_NAME>
```
+16 -32
View File
@@ -18,7 +18,7 @@ logging.basicConfig(
log = logging.getLogger(__file__)
# ---------------------- Prompts ----------------------
COMPLETIONS_PROMPT = "Zebras are primarily grazers and can subsist on lower-quality vegetation. They are preyed on mainly by"
COMPLETIONS_PROMPT = "the capital of USA is"
CHAT_PROMPT = "Think step by step: Tell me about the Python programming language."
TOOLS_PROMPT = (
"Can you list the files in the current working directory and tell me what you see? "
@@ -97,9 +97,9 @@ def _tool_state_to_message_tool_calls(state: Dict[int, Dict[str, Any]]) -> List[
# ---- OpenAI-compatible calls (non-streaming) ----
async def call_completions(client: Serverless, *, model: str, prompt: str, endpoint_name: str, **kwargs) -> Dict[str, Any]:
async def call_completions(client: Serverless, *, model: str, prompt: str, **kwargs) -> Dict[str, Any]:
endpoint = await client.get_endpoint(name=endpoint_name)
endpoint = await client.get_endpoint(name=ENDPOINT_NAME)
payload = {
"input": {
@@ -113,9 +113,9 @@ async def call_completions(client: Serverless, *, model: str, prompt: str, endpo
resp = await endpoint.request("/v1/completions", payload, cost=payload["input"]["max_tokens"])
return resp["response"]
async def call_chat_completions(client: Serverless, *, model: str, messages: List[Dict[str, Any]], endpoint_name: str, **kwargs) -> Dict[str, Any]:
async def call_chat_completions(client: Serverless, *, model: str, messages: List[Dict[str, Any]], **kwargs) -> Dict[str, Any]:
endpoint = await client.get_endpoint(name=endpoint_name)
endpoint = await client.get_endpoint(name=ENDPOINT_NAME)
payload = {
"input": {
@@ -132,9 +132,9 @@ async def call_chat_completions(client: Serverless, *, model: str, messages: Lis
return resp["response"]
# ---- Streaming variants ----
async def stream_completions(client: Serverless, *, model: str, prompt: str, endpoint_name: str, **kwargs):
async def stream_completions(client: Serverless, *, model: str, prompt: str, **kwargs):
endpoint = await client.get_endpoint(name=endpoint_name)
endpoint = await client.get_endpoint(name=ENDPOINT_NAME)
payload = {
"input": {
@@ -150,9 +150,9 @@ async def stream_completions(client: Serverless, *, model: str, prompt: str, end
resp = await endpoint.request("/v1/completions", payload, cost=payload["input"]["max_tokens"], stream=True)
return resp["response"] # async generator
async def stream_chat_completions(client: Serverless, *, model: str, messages: List[Dict[str, Any]], endpoint_name: str, **kwargs):
async def stream_chat_completions(client: Serverless, *, model: str, messages: List[Dict[str, Any]], **kwargs):
endpoint = await client.get_endpoint(name=endpoint_name)
endpoint = await client.get_endpoint(name=ENDPOINT_NAME)
payload = {
"input": {
@@ -174,10 +174,9 @@ async def stream_chat_completions(client: Serverless, *, model: str, messages: L
class APIDemo:
"""Demo and testing functionality for the API client"""
def __init__(self, client: Serverless, model: str, endpoint_name: str, tool_manager: Optional[ToolManager] = None):
def __init__(self, client: Serverless, model: str, tool_manager: Optional[ToolManager] = None):
self.client = client
self.model = model
self.endpoint_name = endpoint_name
self.tool_manager = tool_manager or ToolManager()
# ----- Streaming handler -----
@@ -186,15 +185,10 @@ class APIDemo:
reasoning_content = ""
printed_reasoning = False
printed_answer = False
finish_reason = None
async for chunk in stream:
choice = (chunk.get("choices") or [{}])[0]
delta = choice.get("delta", {})
# Track finish reason
if choice.get("finish_reason"):
finish_reason = choice.get("finish_reason")
# reasoning tokens
rc = delta.get("reasoning_content")
@@ -225,8 +219,6 @@ class APIDemo:
print(f"Reasoning tokens: {len(reasoning_content.split())}")
if printed_answer:
print(f"Response tokens: {len(full_response.split())}")
if finish_reason:
print(f"Finish reason: {finish_reason}")
return full_response
@@ -239,7 +231,6 @@ class APIDemo:
client=self.client,
model=self.model,
prompt=COMPLETIONS_PROMPT,
endpoint_name=self.endpoint_name,
max_tokens=MAX_TOKENS,
temperature=DEFAULT_TEMPERATURE,
)
@@ -258,7 +249,6 @@ class APIDemo:
client=self.client,
model=self.model,
messages=messages,
endpoint_name=self.endpoint_name,
max_tokens=MAX_TOKENS,
temperature=DEFAULT_TEMPERATURE
)
@@ -271,7 +261,6 @@ class APIDemo:
client=self.client,
model=self.model,
messages=messages,
endpoint_name=self.endpoint_name,
max_tokens=MAX_TOKENS,
temperature=DEFAULT_TEMPERATURE
)
@@ -298,7 +287,6 @@ class APIDemo:
client=self.client,
model=self.model,
messages=messages,
endpoint_name=self.endpoint_name,
tools=minimal_tool,
tool_choice="none",
max_tokens=10
@@ -324,7 +312,6 @@ class APIDemo:
client=self.client,
model=self.model,
messages=messages,
endpoint_name=self.endpoint_name,
tools=self.tool_manager.get_ls_tool_definition(),
tool_choice="auto",
max_tokens=MAX_TOKENS,
@@ -402,7 +389,6 @@ class APIDemo:
client=self.client,
model=self.model,
messages=messages,
endpoint_name=self.endpoint_name,
max_tokens=MAX_TOKENS,
temperature=DEFAULT_TEMPERATURE,
)
@@ -441,6 +427,7 @@ class APIDemo:
print("=" * 60)
print("INTERACTIVE STREAMING CHAT")
print("=" * 60)
print(f"Using model: {self.model}")
print("Type 'quit' to exit, 'clear' to clear history")
print()
@@ -466,8 +453,7 @@ class APIDemo:
stream = await stream_chat_completions(
client=self.client,
model=self.model,
messages=messages,
endpoint_name=self.endpoint_name,
messages=messages,
max_tokens=MAX_TOKENS,
temperature=0.7
)
@@ -487,8 +473,8 @@ class APIDemo:
# ---------------------- CLI ----------------------
def build_arg_parser() -> argparse.ArgumentParser:
p = argparse.ArgumentParser(description="Vast vLLM Demo (Serverless SDK)")
p.add_argument("--model", default=DEFAULT_MODEL, help=f"Model to use for requests (default: {DEFAULT_MODEL})")
p.add_argument("--endpoint", default=ENDPOINT_NAME, help=f"Vast endpoint name (default: {ENDPOINT_NAME})")
p.add_argument("--model", required=True, help="Model to use for requests (required)")
p.add_argument("--endpoint", default="my-vllm-endpoint", help="Vast endpoint name (default: my-vllm-endpoint)")
modes = p.add_mutually_exclusive_group(required=False)
modes.add_argument("--completion", action="store_true", help="Test completions endpoint")
@@ -516,14 +502,12 @@ async def main_async():
print("Please specify exactly one test mode")
sys.exit(1)
print("=" * 60)
print(f"Using model: {args.model}")
print(f"Using endpoint: {args.endpoint}")
print("=" * 60)
try:
async with Serverless() as client:
demo = APIDemo(client, args.model, args.endpoint, ToolManager())
demo = APIDemo(client, args.model, ToolManager())
if args.completion:
await demo.demo_completions()
-1
View File
@@ -11,7 +11,6 @@ MODEL_SERVER_START_LOG_MSG = [
"llama runner started", # Ollama
'"message":"Connected","target":"text_generation_router"', # TGI
'"message":"Connected","target":"text_generation_router::server"', # TGI
"main: model loaded" # llama.cpp
]
MODEL_SERVER_ERROR_LOG_MSGS = [