Compare commits
5 Commits
| Author | SHA1 | Date | |
|---|---|---|---|
| 0bcd2219ea | |||
| 0339b471c5 | |||
| e143162438 | |||
| 7a792fd176 | |||
| e0449cb3c7 |
+8
-34
@@ -30,7 +30,7 @@ from lib.data_types import (
|
||||
BenchmarkResult
|
||||
)
|
||||
|
||||
VERSION = "0.2.0"
|
||||
VERSION = "0.2.1"
|
||||
|
||||
MSG_HISTORY_LEN = 100
|
||||
log = logging.getLogger(__file__)
|
||||
@@ -235,14 +235,10 @@ class Backend:
|
||||
log.debug("No healthcheck endpoint defined, skipping healthcheck")
|
||||
return
|
||||
|
||||
first_healthcheck = True
|
||||
while True:
|
||||
await sleep(10)
|
||||
if self.__start_healthcheck is False:
|
||||
continue
|
||||
if first_healthcheck:
|
||||
log.info(f"[healthcheck] First healthcheck starting (model is now loaded)")
|
||||
first_healthcheck = False
|
||||
try:
|
||||
log.debug(f"Performing healthcheck on {health_check_url}")
|
||||
async with self.healthcheck_session.get(health_check_url) as response:
|
||||
@@ -260,22 +256,9 @@ class Backend:
|
||||
self.backend_errored(str(e))
|
||||
|
||||
async def _start_tracking(self) -> None:
|
||||
log.info("Starting tracking tasks (read_logs, send_metrics_loop, healthcheck, send_delete_requests_loop)")
|
||||
task_names = ["read_logs", "send_metrics_loop", "healthcheck", "send_delete_requests_loop"]
|
||||
results = await gather(
|
||||
self.__read_logs(),
|
||||
self.metrics._send_metrics_loop(),
|
||||
self.__healthcheck(),
|
||||
self.metrics._send_delete_requests_loop(),
|
||||
return_exceptions=True
|
||||
await gather(
|
||||
self.__read_logs(), self.metrics._send_metrics_loop(), self.__healthcheck(), self.metrics._send_delete_requests_loop()
|
||||
)
|
||||
# If we get here, one or more tasks exited (they should run forever)
|
||||
log.error(f"CRITICAL: _start_tracking gather returned! This should never happen. Results: {results}")
|
||||
for name, result in zip(task_names, results):
|
||||
if isinstance(result, Exception):
|
||||
log.error(f"Tracking task '{name}' crashed with exception: {result}", exc_info=result)
|
||||
elif result is not None:
|
||||
log.warning(f"Tracking task '{name}' exited unexpectedly with result: {result}")
|
||||
|
||||
def backend_errored(self, msg: str) -> None:
|
||||
self.metrics._model_errored(msg)
|
||||
@@ -416,20 +399,15 @@ class Backend:
|
||||
# await sleep(5)
|
||||
try:
|
||||
max_throughput = await run_benchmark()
|
||||
log.info(f"[benchmark] Benchmark complete, max_throughput={max_throughput}, setting healthcheck=True")
|
||||
self.__start_healthcheck = True
|
||||
self.metrics._model_loaded(
|
||||
max_throughput=max_throughput,
|
||||
)
|
||||
log.info(f"[benchmark] _model_loaded() called, returning from handle_log_line")
|
||||
except ClientConnectorError as e:
|
||||
log.debug(
|
||||
f"failed to connect to model api during benchmark"
|
||||
f"failed to connect to comfyui api during benchmark"
|
||||
)
|
||||
self.backend_errored(str(e))
|
||||
except Exception as e:
|
||||
log.error(f"Unexpected error during benchmark: {e}", exc_info=True)
|
||||
self.backend_errored(f"Benchmark failed: {e}")
|
||||
case LogAction.ModelError if msg in log_line:
|
||||
log.debug(f"Got log line indicating error: {log_line}")
|
||||
self.backend_errored(msg)
|
||||
@@ -441,14 +419,10 @@ class Backend:
|
||||
log.debug(f"tailing file: {self.model_log_file}")
|
||||
async with await open_file(self.model_log_file, encoding='utf-8', errors='ignore') as f:
|
||||
while True:
|
||||
try:
|
||||
line = await f.readline()
|
||||
if line:
|
||||
await handle_log_line(line.rstrip())
|
||||
else:
|
||||
await asyncio.sleep(LOG_POLL_INTERVAL)
|
||||
except Exception as e:
|
||||
log.error(f"Error processing log line: {e}", exc_info=True)
|
||||
line = await f.readline()
|
||||
if line:
|
||||
await handle_log_line(line.rstrip())
|
||||
else:
|
||||
await asyncio.sleep(LOG_POLL_INTERVAL)
|
||||
|
||||
###########
|
||||
|
||||
@@ -1,5 +1,4 @@
|
||||
import os
|
||||
import sys
|
||||
import time
|
||||
import logging
|
||||
import json
|
||||
@@ -18,14 +17,6 @@ DELETE_REQUESTS_INTERVAL = 1
|
||||
log = logging.getLogger(__file__)
|
||||
|
||||
|
||||
def _flush_logs():
|
||||
"""Force flush all log handlers and stdout/stderr."""
|
||||
for handler in logging.root.handlers:
|
||||
handler.flush()
|
||||
sys.stdout.flush()
|
||||
sys.stderr.flush()
|
||||
|
||||
|
||||
@cache
|
||||
def get_url() -> str:
|
||||
use_ssl = os.environ.get("USE_SSL", "false") == "true"
|
||||
@@ -128,41 +119,22 @@ class Metrics:
|
||||
await self.__send_delete_requests_and_reset()
|
||||
|
||||
async def _send_metrics_loop(self) -> Awaitable[NoReturn]:
|
||||
loop_count = 0
|
||||
first_loaded_send_done = False
|
||||
while True:
|
||||
await sleep(METRICS_UPDATE_INTERVAL)
|
||||
loop_count += 1
|
||||
elapsed = time.time() - self.last_metric_update
|
||||
# Log heartbeat every 30 seconds to confirm loop is running
|
||||
if loop_count % 30 == 0:
|
||||
log.debug(f"[heartbeat] metrics loop alive, loop_count={loop_count}, model_loaded={self.system_metrics.model_is_loaded}")
|
||||
_flush_logs()
|
||||
# Extra logging for first few iterations after model loads
|
||||
if self.system_metrics.model_is_loaded and not first_loaded_send_done:
|
||||
log.info(f"[transition] First iteration with model_loaded=True, loop_count={loop_count}, elapsed={elapsed:.1f}")
|
||||
_flush_logs()
|
||||
if self.system_metrics.model_is_loaded is False and elapsed >= 10:
|
||||
log.debug(f"sending loading model metrics after {int(elapsed)}s wait")
|
||||
await self.__send_metrics_and_reset()
|
||||
elif self.update_pending or elapsed > 10:
|
||||
log.debug(f"sending loaded model metrics after {int(elapsed)}s wait")
|
||||
await self.__send_metrics_and_reset()
|
||||
if self.system_metrics.model_is_loaded and not first_loaded_send_done:
|
||||
first_loaded_send_done = True
|
||||
log.info(f"[transition] First loaded metrics send complete, continuing to next iteration...")
|
||||
_flush_logs()
|
||||
|
||||
def _model_loaded(self, max_throughput: float) -> None:
|
||||
log.info(f"MODEL LOADED: Setting model_is_loaded=True, max_throughput={max_throughput}")
|
||||
_flush_logs()
|
||||
self.system_metrics.model_loading_time = (
|
||||
time.time() - self.system_metrics.model_loading_start
|
||||
)
|
||||
self.system_metrics.model_is_loaded = True
|
||||
self.model_metrics.max_throughput = max_throughput
|
||||
log.info(f"MODEL LOADED: model_loading_time={self.system_metrics.model_loading_time}")
|
||||
_flush_logs()
|
||||
|
||||
def _model_errored(self, error_msg: str) -> None:
|
||||
self.model_metrics.set_errored(error_msg)
|
||||
@@ -299,7 +271,6 @@ class Metrics:
|
||||
###########
|
||||
|
||||
self.system_metrics.update_disk_usage()
|
||||
had_loadtime = loadtime_snapshot is not None and loadtime_snapshot > 0
|
||||
|
||||
sent = False
|
||||
for report_addr in self.report_addr:
|
||||
@@ -308,14 +279,8 @@ class Metrics:
|
||||
break
|
||||
|
||||
if sent:
|
||||
if had_loadtime:
|
||||
log.info(f"FIRST LOADTIME METRICS SENT SUCCESSFULLY! loadtime={loadtime_snapshot}")
|
||||
_flush_logs()
|
||||
# clear the one-shot loadtime only if we actually sent *this* value
|
||||
self.system_metrics.reset(expected=loadtime_snapshot)
|
||||
self.update_pending = False
|
||||
self.model_metrics.reset()
|
||||
self.last_metric_update = time.time()
|
||||
if had_loadtime:
|
||||
log.info(f"POST-SEND: reset complete, last_metric_update={self.last_metric_update}, continuing loop...")
|
||||
_flush_logs()
|
||||
|
||||
@@ -1,7 +1,5 @@
|
||||
import os
|
||||
import logging
|
||||
import signal
|
||||
import sys
|
||||
from typing import List
|
||||
import ssl
|
||||
from asyncio import run, gather
|
||||
@@ -14,25 +12,7 @@ from aiohttp import web
|
||||
log = logging.getLogger(__file__)
|
||||
|
||||
|
||||
def _setup_signal_handlers():
|
||||
"""Setup signal handlers to log when process receives termination signals."""
|
||||
def signal_handler(signum, frame):
|
||||
sig_name = signal.Signals(signum).name
|
||||
log.error(f"SIGNAL RECEIVED: {sig_name} ({signum}) - process is being terminated")
|
||||
sys.stdout.flush()
|
||||
sys.stderr.flush()
|
||||
sys.exit(128 + signum)
|
||||
|
||||
# Handle common termination signals
|
||||
for sig in [signal.SIGTERM, signal.SIGINT, signal.SIGHUP]:
|
||||
try:
|
||||
signal.signal(sig, signal_handler)
|
||||
except (OSError, ValueError):
|
||||
pass # Some signals may not be available
|
||||
|
||||
|
||||
def start_server(backend: Backend, routes: List[web.RouteDef], **kwargs):
|
||||
_setup_signal_handlers()
|
||||
try:
|
||||
log.debug("getting certificate...")
|
||||
use_ssl = os.environ.get("USE_SSL", "false") == "true"
|
||||
|
||||
@@ -11,6 +11,7 @@ MODEL_SERVER_START_LOG_MSG = [
|
||||
"llama runner started", # Ollama
|
||||
'"message":"Connected","target":"text_generation_router"', # TGI
|
||||
'"message":"Connected","target":"text_generation_router::server"', # TGI
|
||||
"main: model loaded" # llama.cpp
|
||||
]
|
||||
|
||||
MODEL_SERVER_ERROR_LOG_MSGS = [
|
||||
@@ -34,6 +35,7 @@ backend = Backend(
|
||||
model_server_url=os.environ["MODEL_SERVER_URL"],
|
||||
model_log_file=os.environ["MODEL_LOG"],
|
||||
allow_parallel_requests=True,
|
||||
max_wait_time=600.0,
|
||||
benchmark_handler=CompletionsHandler(benchmark_runs=3, benchmark_words=256),
|
||||
log_actions=[
|
||||
*[(LogAction.ModelLoaded, info_msg) for info_msg in MODEL_SERVER_START_LOG_MSG],
|
||||
|
||||
Reference in New Issue
Block a user