Compare commits

..

12 Commits

Author SHA1 Message Date
Lucas Armand 405a8f1c0d returned to worker-sdk 2025-12-10 16:37:09 -08:00
Lucas Armand 12f4f23d39 remove parse request 2025-12-10 15:16:23 -08:00
Lucas Armand e2a771bb5a update ace and wan workers 2025-12-10 15:09:27 -08:00
Lucas Armand 0cd64adfc4 remove input 2025-12-10 14:47:47 -08:00
Lucas Armand 6f795b8fb8 remove input from workers 2025-12-10 14:46:10 -08:00
Lucas Armand 4bcc508473 reduce vllm benchmark runs to 2 2025-11-25 16:54:17 -08:00
Lucas Armand 74d7330800 add wan and ace workers 2025-11-25 16:08:40 -08:00
Lucas Armand 2ce0450809 Add worker.pys 2025-11-25 16:08:38 -08:00
LucasArmandVast 0339b471c5 Merge pull request #66 from vast-ai/synthesis
PyWorker Error Handling
2025-11-25 16:02:26 -08:00
Lucas Armand e143162438 bumpy pyworker version 2025-11-25 16:01:23 -08:00
LucasArmandVast 7a792fd176 Merge pull request #64 from vast-ai/add-llama-log
add llama log
2025-11-21 10:24:27 -08:00
Lucas Armand e0449cb3c7 add llama log 2025-11-21 10:22:16 -08:00
11 changed files with 730 additions and 92 deletions
+8 -34
View File
@@ -30,7 +30,7 @@ from lib.data_types import (
BenchmarkResult BenchmarkResult
) )
VERSION = "0.2.0" VERSION = "0.2.1"
MSG_HISTORY_LEN = 100 MSG_HISTORY_LEN = 100
log = logging.getLogger(__file__) log = logging.getLogger(__file__)
@@ -235,14 +235,10 @@ class Backend:
log.debug("No healthcheck endpoint defined, skipping healthcheck") log.debug("No healthcheck endpoint defined, skipping healthcheck")
return return
first_healthcheck = True
while True: while True:
await sleep(10) await sleep(10)
if self.__start_healthcheck is False: if self.__start_healthcheck is False:
continue continue
if first_healthcheck:
log.info(f"[healthcheck] First healthcheck starting (model is now loaded)")
first_healthcheck = False
try: try:
log.debug(f"Performing healthcheck on {health_check_url}") log.debug(f"Performing healthcheck on {health_check_url}")
async with self.healthcheck_session.get(health_check_url) as response: async with self.healthcheck_session.get(health_check_url) as response:
@@ -260,22 +256,9 @@ class Backend:
self.backend_errored(str(e)) self.backend_errored(str(e))
async def _start_tracking(self) -> None: async def _start_tracking(self) -> None:
log.info("Starting tracking tasks (read_logs, send_metrics_loop, healthcheck, send_delete_requests_loop)") await gather(
task_names = ["read_logs", "send_metrics_loop", "healthcheck", "send_delete_requests_loop"] self.__read_logs(), self.metrics._send_metrics_loop(), self.__healthcheck(), self.metrics._send_delete_requests_loop()
results = await gather(
self.__read_logs(),
self.metrics._send_metrics_loop(),
self.__healthcheck(),
self.metrics._send_delete_requests_loop(),
return_exceptions=True
) )
# If we get here, one or more tasks exited (they should run forever)
log.error(f"CRITICAL: _start_tracking gather returned! This should never happen. Results: {results}")
for name, result in zip(task_names, results):
if isinstance(result, Exception):
log.error(f"Tracking task '{name}' crashed with exception: {result}", exc_info=result)
elif result is not None:
log.warning(f"Tracking task '{name}' exited unexpectedly with result: {result}")
def backend_errored(self, msg: str) -> None: def backend_errored(self, msg: str) -> None:
self.metrics._model_errored(msg) self.metrics._model_errored(msg)
@@ -416,20 +399,15 @@ class Backend:
# await sleep(5) # await sleep(5)
try: try:
max_throughput = await run_benchmark() max_throughput = await run_benchmark()
log.info(f"[benchmark] Benchmark complete, max_throughput={max_throughput}, setting healthcheck=True")
self.__start_healthcheck = True self.__start_healthcheck = True
self.metrics._model_loaded( self.metrics._model_loaded(
max_throughput=max_throughput, max_throughput=max_throughput,
) )
log.info(f"[benchmark] _model_loaded() called, returning from handle_log_line")
except ClientConnectorError as e: except ClientConnectorError as e:
log.debug( log.debug(
f"failed to connect to model api during benchmark" f"failed to connect to comfyui api during benchmark"
) )
self.backend_errored(str(e)) self.backend_errored(str(e))
except Exception as e:
log.error(f"Unexpected error during benchmark: {e}", exc_info=True)
self.backend_errored(f"Benchmark failed: {e}")
case LogAction.ModelError if msg in log_line: case LogAction.ModelError if msg in log_line:
log.debug(f"Got log line indicating error: {log_line}") log.debug(f"Got log line indicating error: {log_line}")
self.backend_errored(msg) self.backend_errored(msg)
@@ -441,14 +419,10 @@ class Backend:
log.debug(f"tailing file: {self.model_log_file}") log.debug(f"tailing file: {self.model_log_file}")
async with await open_file(self.model_log_file, encoding='utf-8', errors='ignore') as f: async with await open_file(self.model_log_file, encoding='utf-8', errors='ignore') as f:
while True: while True:
try: line = await f.readline()
line = await f.readline() if line:
if line: await handle_log_line(line.rstrip())
await handle_log_line(line.rstrip()) else:
else:
await asyncio.sleep(LOG_POLL_INTERVAL)
except Exception as e:
log.error(f"Error processing log line: {e}", exc_info=True)
await asyncio.sleep(LOG_POLL_INTERVAL) await asyncio.sleep(LOG_POLL_INTERVAL)
########### ###########
-35
View File
@@ -1,5 +1,4 @@
import os import os
import sys
import time import time
import logging import logging
import json import json
@@ -18,14 +17,6 @@ DELETE_REQUESTS_INTERVAL = 1
log = logging.getLogger(__file__) log = logging.getLogger(__file__)
def _flush_logs():
"""Force flush all log handlers and stdout/stderr."""
for handler in logging.root.handlers:
handler.flush()
sys.stdout.flush()
sys.stderr.flush()
@cache @cache
def get_url() -> str: def get_url() -> str:
use_ssl = os.environ.get("USE_SSL", "false") == "true" use_ssl = os.environ.get("USE_SSL", "false") == "true"
@@ -128,41 +119,22 @@ class Metrics:
await self.__send_delete_requests_and_reset() await self.__send_delete_requests_and_reset()
async def _send_metrics_loop(self) -> Awaitable[NoReturn]: async def _send_metrics_loop(self) -> Awaitable[NoReturn]:
loop_count = 0
first_loaded_send_done = False
while True: while True:
await sleep(METRICS_UPDATE_INTERVAL) await sleep(METRICS_UPDATE_INTERVAL)
loop_count += 1
elapsed = time.time() - self.last_metric_update elapsed = time.time() - self.last_metric_update
# Log heartbeat every 30 seconds to confirm loop is running
if loop_count % 30 == 0:
log.debug(f"[heartbeat] metrics loop alive, loop_count={loop_count}, model_loaded={self.system_metrics.model_is_loaded}")
_flush_logs()
# Extra logging for first few iterations after model loads
if self.system_metrics.model_is_loaded and not first_loaded_send_done:
log.info(f"[transition] First iteration with model_loaded=True, loop_count={loop_count}, elapsed={elapsed:.1f}")
_flush_logs()
if self.system_metrics.model_is_loaded is False and elapsed >= 10: if self.system_metrics.model_is_loaded is False and elapsed >= 10:
log.debug(f"sending loading model metrics after {int(elapsed)}s wait") log.debug(f"sending loading model metrics after {int(elapsed)}s wait")
await self.__send_metrics_and_reset() await self.__send_metrics_and_reset()
elif self.update_pending or elapsed > 10: elif self.update_pending or elapsed > 10:
log.debug(f"sending loaded model metrics after {int(elapsed)}s wait") log.debug(f"sending loaded model metrics after {int(elapsed)}s wait")
await self.__send_metrics_and_reset() await self.__send_metrics_and_reset()
if self.system_metrics.model_is_loaded and not first_loaded_send_done:
first_loaded_send_done = True
log.info(f"[transition] First loaded metrics send complete, continuing to next iteration...")
_flush_logs()
def _model_loaded(self, max_throughput: float) -> None: def _model_loaded(self, max_throughput: float) -> None:
log.info(f"MODEL LOADED: Setting model_is_loaded=True, max_throughput={max_throughput}")
_flush_logs()
self.system_metrics.model_loading_time = ( self.system_metrics.model_loading_time = (
time.time() - self.system_metrics.model_loading_start time.time() - self.system_metrics.model_loading_start
) )
self.system_metrics.model_is_loaded = True self.system_metrics.model_is_loaded = True
self.model_metrics.max_throughput = max_throughput self.model_metrics.max_throughput = max_throughput
log.info(f"MODEL LOADED: model_loading_time={self.system_metrics.model_loading_time}")
_flush_logs()
def _model_errored(self, error_msg: str) -> None: def _model_errored(self, error_msg: str) -> None:
self.model_metrics.set_errored(error_msg) self.model_metrics.set_errored(error_msg)
@@ -299,7 +271,6 @@ class Metrics:
########### ###########
self.system_metrics.update_disk_usage() self.system_metrics.update_disk_usage()
had_loadtime = loadtime_snapshot is not None and loadtime_snapshot > 0
sent = False sent = False
for report_addr in self.report_addr: for report_addr in self.report_addr:
@@ -308,14 +279,8 @@ class Metrics:
break break
if sent: if sent:
if had_loadtime:
log.info(f"FIRST LOADTIME METRICS SENT SUCCESSFULLY! loadtime={loadtime_snapshot}")
_flush_logs()
# clear the one-shot loadtime only if we actually sent *this* value # clear the one-shot loadtime only if we actually sent *this* value
self.system_metrics.reset(expected=loadtime_snapshot) self.system_metrics.reset(expected=loadtime_snapshot)
self.update_pending = False self.update_pending = False
self.model_metrics.reset() self.model_metrics.reset()
self.last_metric_update = time.time() self.last_metric_update = time.time()
if had_loadtime:
log.info(f"POST-SEND: reset complete, last_metric_update={self.last_metric_update}, continuing loop...")
_flush_logs()
-20
View File
@@ -1,7 +1,5 @@
import os import os
import logging import logging
import signal
import sys
from typing import List from typing import List
import ssl import ssl
from asyncio import run, gather from asyncio import run, gather
@@ -14,25 +12,7 @@ from aiohttp import web
log = logging.getLogger(__file__) log = logging.getLogger(__file__)
def _setup_signal_handlers():
"""Setup signal handlers to log when process receives termination signals."""
def signal_handler(signum, frame):
sig_name = signal.Signals(signum).name
log.error(f"SIGNAL RECEIVED: {sig_name} ({signum}) - process is being terminated")
sys.stdout.flush()
sys.stderr.flush()
sys.exit(128 + signum)
# Handle common termination signals
for sig in [signal.SIGTERM, signal.SIGINT, signal.SIGHUP]:
try:
signal.signal(sig, signal_handler)
except (OSError, ValueError):
pass # Some signals may not be available
def start_server(backend: Backend, routes: List[web.RouteDef], **kwargs): def start_server(backend: Backend, routes: List[web.RouteDef], **kwargs):
_setup_signal_handlers()
try: try:
log.debug("getting certificate...") log.debug("getting certificate...")
use_ssl = os.environ.get("USE_SSL", "false") == "true" use_ssl = os.environ.get("USE_SSL", "false") == "true"
+1 -1
View File
@@ -8,4 +8,4 @@ Requests~=2.32
transformers~=4.52 transformers~=4.52
utils==1.0.* utils==1.0.*
hf_transfer>=0.1.9 hf_transfer>=0.1.9
vastai-sdk>=0.2.0 git+https://github.com/vast-ai/vast-sdk.git@worker-sdk
+13 -2
View File
@@ -133,8 +133,19 @@ cd "$SERVER_DIR"
echo "launching PyWorker server" echo "launching PyWorker server"
set +e set +e
python3 -m "workers.$BACKEND.server" |& tee -a "$PYWORKER_LOG"
# Try worker entrypoint first
echo "trying workers.${BACKEND}.worker"
python3 -m "workers.${BACKEND}.worker" |& tee -a "$PYWORKER_LOG"
PY_STATUS=${PIPESTATUS[0]} PY_STATUS=${PIPESTATUS[0]}
# If that fails, fall back to server
if [ "${PY_STATUS}" -ne 0 ]; then
echo "workers.${BACKEND}.worker failed with status ${PY_STATUS}, trying workers.${BACKEND}.server"
python3 -m "workers.${BACKEND}.server" |& tee -a "$PYWORKER_LOG"
PY_STATUS=${PIPESTATUS[0]}
fi
set -e set -e
if [ "${PY_STATUS}" -ne 0 ]; then if [ "${PY_STATUS}" -ne 0 ]; then
@@ -171,4 +182,4 @@ JSON
done done
fi fi
echo "launching PyWorker server done" echo "launching PyWorker server done"
+184
View File
@@ -0,0 +1,184 @@
import random
import sys
from vastai import Worker, WorkerConfig, HandlerConfig, LogActionConfig, BenchmarkConfig
# ComyUI model configuration
MODEL_SERVER_URL = 'http://127.0.0.1'
MODEL_SERVER_PORT = 18288
MODEL_LOG_FILE = '/var/log/portal/comfyui.log'
MODEL_HEALTHCHECK_ENDPOINT = "/health"
# ComyUI-specific log messages
MODEL_LOAD_LOG_MSG = [
"To see the GUI go to: "
]
MODEL_ERROR_LOG_MSGS = [
"MetadataIncompleteBuffer",
"Value not in list: ",
"[ERROR] Provisioning Script failed"
]
MODEL_INFO_LOG_MSGS = [
'"message":"Downloading'
]
benchmark_lyrics = [
"[verse]\nGuardian cloaked in twilight hue\nShadows melt where he breaks through\nEchoes swirl in mystic flight\nHooded hero owns the night\n\n[verse]\nThrough the chaos shapes arise\nFeral whispers, glowing eyes\nOrcs and creatures side by side\nMarch within the inky tide\n\n[chorus]\nRise above the fear and gloom\nLet your courage fully bloom\nIn the darkness stand your ground\nHear the night proclaim your sound",
"[verse]\nMorning sun on fields of gold\nGentle stories unfold\nEvery breeze a quiet song\nWhere the peaceful hearts belong\n\n[verse]\nLanterns glow at stable doors\nRustling leaves on orchard floors\nSimple joys in every hand\nLife grows soft in fertile land\n\n[chorus]\nLet the day drift slow and free\nRoot your soul where you can be\nIn this haven warm and bright\nFeel the earth breathe pure delight",
"[verse]\nLittle feet on dusty ground\nChasing dreams without a sound\nSoccer ball in morning light\nHopes take wing in youthful flight\n\n[verse]\nChrome reflections paint the day\nSwagger in the steps that play\nCopper tones in shining air\nChildhood gleaming everywhere\n\n[chorus]\nKick the world with boundless cheer\nHold the magic close and near\nIn each moment bold and true\nLet the sky belong to you",
"[verse]\nSunset bleeds across the street\nGilded calm in summer heat\nLow-rise towers rimmed with fire\nDreams ignite as lights climb higher\n\n[verse]\nFootsteps scatter through the haze\nFutures shimmer in the blaze\nEvery window tells a tale\nFloating through a tangerine veil\n\n[chorus]\nLet the neon softly glow\nLet your restless heartbeat slow\nIn this city forged in light\nCarry hope into the night",
"[verse]\nOcean breathes in rolling arcs\nSprays of diamond, glowing sparks\nWaves unfold a perfect line\nNatures rhythm feels divine\n\n[verse]\nSun above in golden sweep\nPaints the rise of every deep\nShimmer drifting through the blue\nWorld reborn in every view\n\n[chorus]\nLet the tide pull you along\nHear the waters ancient song\nIn the cresting waves youll find\nQuiet peace for heart and mind",
"[verse]\nGlass aglow with swirling light\nFruits and mints in colors bright\nIcy whispers clink and chime\nFlowing forms suspend in time\n\n[verse]\nCreamy spirals drift within\nGentle currents slowly spin\nWarm reflections lingering sweet\nMixing flavors at your feet\n\n[chorus]\nSip the glow and let it rise\nTaste the sunset in disguise\nIn this moment clear and true\nLet the warmth flow into you",
"[verse]\nEngines rumble down the lane\nCopper clouds of steam and rain\nOilpunk dreams in metal shine\nRider drifting down the line\n\n[verse]\nLeather jacket, steady glare\nStories sparking in the air\nMagazine lights frame his face\nKing of roads in timeless grace\n\n[chorus]\nThrottle up beyond the bend\nFeel the force of steel ascend\nRide the night and hold on tight\nClaim the world in streaks of light",
"[verse]\nCut-out shapes in swirling play\nTextures dance in bold array\nCats in denim, grinning wide\nStrut across the patterned tide\n\n[verse]\nPosters hum with neon glow\nSurreal scenes begin to grow\nColors crisp as folded art\nPatchwork beating like a heart\n\n[chorus]\nLet the collage come alive\nWatch the vibrant pieces thrive\nIn this joyful, crafted space\nEvery shape finds its own place",
"[verse]\nTiny world in crystal glass\nAncient tales behind the mass\nVillage lights in winter gleam\nFrozen in a mystic dream\n\n[verse]\nLantern beams in swirling air\nSoft enchantment everywhere\nShadows drift with gentle grace\nMagic sealed within the space\n\n[chorus]\nHold the sphere and you will see\nEchoes of a memory\nIn the glow of fragile light\nLives a realm of pure delight",
"[verse]\nArmor hums with power bright\nChopping sparks in jungle night\nMecha spirits shift and scream\nThrough the ferns like shattered beams\n\n[verse]\nAxes blaze in glowing arcs\nLighting up the shadowed marks\nNature roars in trembling air\nClash of steel and cosmic flare\n\n[chorus]\nRaise the fire, strike the ground\nLet your legend shake the sound\nIn the wild where echoes roam\nForge the fight and carve your home",
"[verse]\nCrowds ignite in vibrant flare\nBeats explode through smoky air\nDJ robes replaced with flame\nPope on decks in holy frame\n\n[verse]\nLeather gleams in blinding light\nTurntables spin with sacred might\nChoirs echo in the bass\nHeaven pulses through the place\n\n[chorus]\nLift the roof and shake the floor\nSacred rhythm evermore\nLet the music take control\nFeel the blessing in your soul",
]
benchmark_dataset = [
{
"input": {
"request_id": "",
"workflow_json": {
"14": {
"inputs": {
"tags": "funk, pop, soul, rock, melodic, guitar, drums, bass, keyboard, percussion, 105 BPM, energetic, upbeat, groovy, vibrant, dynamic",
"lyrics": lyrics,
"lyrics_strength": 0.99,
"clip": ["40", 1]
},
"class_type": "TextEncodeAceStepAudio",
"_meta": {
"title": "TextEncodeAceStepAudio"
}
},
"17": {
"inputs": {
"seconds": 180,
"batch_size": 1
},
"class_type": "EmptyAceStepLatentAudio",
"_meta": {
"title": "EmptyAceStepLatentAudio"
}
},
"18": {
"inputs": {
"samples": ["52", 0],
"vae": ["40", 2]
},
"class_type": "VAEDecodeAudio",
"_meta": {
"title": "VAE Decode Audio"
}
},
"40": {
"inputs": {
"ckpt_name": "ace_step_v1_3.5b.safetensors"
},
"class_type": "CheckpointLoaderSimple",
"_meta": {
"title": "Load Checkpoint"
}
},
"44": {
"inputs": {
"conditioning": ["14", 0]
},
"class_type": "ConditioningZeroOut",
"_meta": {
"title": "ConditioningZeroOut"
}
},
"49": {
"inputs": {
"model": ["51", 0],
"operation": ["50", 0]
},
"class_type": "LatentApplyOperationCFG",
"_meta": {
"title": "LatentApplyOperationCFG"
}
},
"50": {
"inputs": {
"multiplier": 1.15
},
"class_type": "LatentOperationTonemapReinhard",
"_meta": {
"title": "LatentOperationTonemapReinhard"
}
},
"51": {
"inputs": {
"shift": 6,
"model": ["40", 0]
},
"class_type": "ModelSamplingSD3",
"_meta": {
"title": "ModelSamplingSD3"
}
},
"52": {
"inputs": {
"seed": "__RANDOM_INT__",
"steps": 65,
"cfg": 4,
"sampler_name": "er_sde",
"scheduler": "linear_quadratic",
"denoise": 1,
"model": ["49", 0],
"positive": ["14", 0],
"negative": ["44", 0],
"latent_image": ["17", 0]
},
"class_type": "KSampler",
"_meta": {
"title": "KSampler"
}
},
"59": {
"inputs": {
"filename_prefix": "audio/ComfyUI",
"quality": "V0",
"audioUI": "",
"audio": ["18", 0]
},
"class_type": "SaveAudioMP3",
"_meta": {
"title": "Save Audio (MP3)"
}
}
}
}
} for lyrics in benchmark_lyrics
]
worker_config = WorkerConfig(
model_server_url=MODEL_SERVER_URL,
model_server_port=MODEL_SERVER_PORT,
model_log_file=MODEL_LOG_FILE,
model_healthcheck_url=MODEL_HEALTHCHECK_ENDPOINT,
handlers=[
HandlerConfig(
route="/generate/sync",
allow_parallel_requests=False,
max_queue_time=10.0,
benchmark_config=BenchmarkConfig(
dataset=benchmark_dataset,
runs=1
),
workload_calculator= lambda _ : 1000.0
)
],
log_action_config=LogActionConfig(
on_load=MODEL_LOAD_LOG_MSG,
on_error=MODEL_ERROR_LOG_MSGS,
on_info=MODEL_INFO_LOG_MSGS
)
)
Worker(worker_config).run()
+81
View File
@@ -0,0 +1,81 @@
import random
import sys
from vastai import Worker, WorkerConfig, HandlerConfig, LogActionConfig, BenchmarkConfig
# ComyUI model configuration
MODEL_SERVER_URL = 'http://127.0.0.1'
MODEL_SERVER_PORT = 18288
MODEL_LOG_FILE = '/var/log/portal/comfyui.log'
MODEL_HEALTHCHECK_ENDPOINT = "/health"
# ComyUI-specific log messages
MODEL_LOAD_LOG_MSG = [
"To see the GUI go to: "
]
MODEL_ERROR_LOG_MSGS = [
"MetadataIncompleteBuffer",
"Value not in list: ",
"[ERROR] Provisioning Script failed"
]
MODEL_INFO_LOG_MSGS = [
'"message":"Downloading'
]
benchmark_prompts = [
"Cartoon hoodie hero; orc, anime cat, bunny; black goo; buff; vector on white.",
"Cozy farming-game scene with fine details.",
"2D vector child with soccer ball; airbrush chrome; swagger; antique copper.",
"Realistic futuristic downtown of low buildings at sunset.",
"Perfect wave front view; sunny seascape; ultra-detailed water; artful feel.",
"Clear cup with ice, fruit, mint; creamy swirls; fluid-sim CGI; warm glow.",
"Male biker with backpack on motorcycle; oilpunk; award-worthy magazine cover.",
"Collage for textile; surreal cartoon cat in cap/jeans before poster; crisp.",
"Medieval village inside glass sphere; volumetric light; macro focus.",
"Iron Man with glowing axe; mecha sci-fi; jungle scene; dynamic light.",
"Pope Francis DJ in leather jacket, mixing on giant console; dramatic.",
]
benchmark_dataset = [
{
"input": {
"request_id": f"test-{random.randint(1000, 99999)}",
"modifier": "Text2Image",
"modifications": {
"prompt": prompt,
"width": 512,
"height": 512,
"steps": 20,
"seed": random.randint(0, sys.maxsize)
}
}
} for prompt in benchmark_prompts
]
worker_config = WorkerConfig(
model_server_url=MODEL_SERVER_URL,
model_server_port=MODEL_SERVER_PORT,
model_log_file=MODEL_LOG_FILE,
model_healthcheck_url=MODEL_HEALTHCHECK_ENDPOINT,
handlers=[
HandlerConfig(
route="/generate/sync",
allow_parallel_requests=False,
max_queue_time=10.0,
benchmark_config=BenchmarkConfig(
dataset=benchmark_dataset,
)
)
],
log_action_config=LogActionConfig(
on_load=MODEL_LOAD_LOG_MSG,
on_error=MODEL_ERROR_LOG_MSGS,
on_info=MODEL_INFO_LOG_MSGS
)
)
Worker(worker_config).run()
+1
View File
@@ -11,6 +11,7 @@ MODEL_SERVER_START_LOG_MSG = [
"llama runner started", # Ollama "llama runner started", # Ollama
'"message":"Connected","target":"text_generation_router"', # TGI '"message":"Connected","target":"text_generation_router"', # TGI
'"message":"Connected","target":"text_generation_router::server"', # TGI '"message":"Connected","target":"text_generation_router::server"', # TGI
"main: model loaded" # llama.cpp
] ]
MODEL_SERVER_ERROR_LOG_MSGS = [ MODEL_SERVER_ERROR_LOG_MSGS = [
+78
View File
@@ -0,0 +1,78 @@
import nltk
import random
import os
from vastai import Worker, WorkerConfig, HandlerConfig, LogActionConfig, BenchmarkConfig
# vLLM model configuration
MODEL_SERVER_URL = 'http://127.0.0.1'
MODEL_SERVER_PORT = 18000
MODEL_LOG_FILE = '/var/log/portal/vllm.log'
MODEL_HEALTHCHECK_ENDPOINT = "/health"
# vLLM-specific log messages
MODEL_LOAD_LOG_MSG = [
"Application startup complete.",
]
MODEL_ERROR_LOG_MSGS = [
"INFO exited: vllm",
"RuntimeError: Engine",
"Traceback (most recent call last):"
]
MODEL_INFO_LOG_MSGS = [
'"message":"Download'
]
nltk.download("words")
WORD_LIST = nltk.corpus.words.words()
def completions_benchmark_generator() -> dict:
prompt = " ".join(random.choices(WORD_LIST, k=int(250)))
model = os.environ.get("MODEL_NAME")
if not model:
raise ValueError("MODEL_NAME environment variable not set")
benchmark_data = {
"model": model,
"prompt": prompt,
"temperature": 0.7,
"max_tokens": 500,
}
return benchmark_data
worker_config = WorkerConfig(
model_server_url=MODEL_SERVER_URL,
model_server_port=MODEL_SERVER_PORT,
model_log_file=MODEL_LOG_FILE,
model_healthcheck_url=MODEL_HEALTHCHECK_ENDPOINT,
handlers=[
HandlerConfig(
route="/v1/completions",
workload_calculator= lambda data: data.get("max_tokens", 0),
allow_parallel_requests=True,
max_queue_time=60.0,
benchmark_config=BenchmarkConfig(
generator=completions_benchmark_generator,
concurrency=100,
runs=2
)
),
HandlerConfig(
route="/v1/chat/completions",
workload_calculator= lambda data: data.get("max_tokens", 0),
allow_parallel_requests=True,
max_queue_time=60.0,
)
],
log_action_config=LogActionConfig(
on_load=MODEL_LOAD_LOG_MSG,
on_error=MODEL_ERROR_LOG_MSGS,
on_info=MODEL_INFO_LOG_MSGS
)
)
Worker(worker_config).run()
+76
View File
@@ -0,0 +1,76 @@
import nltk
import random
from vastai import Worker, WorkerConfig, HandlerConfig, LogActionConfig, BenchmarkConfig
# TGI model configuration
MODEL_SERVER_URL = 'http://0.0.0.0'
MODEL_SERVER_PORT = 5001
MODEL_LOG_FILE = "/workspace/infer.log"
MODEL_HEALTHCHECK_ENDPOINT = "/health"
# TGI-specific log messages
MODEL_LOAD_LOG_MSG = [
'"message":"Connected","target":"text_generation_router"',
'"message":"Connected","target":"text_generation_router::server"',
]
MODEL_ERROR_LOG_MSGS = [
"Error: WebserverFailed",
"Error: DownloadError",
"Error: ShardCannotStart",
]
MODEL_INFO_LOG_MSGS = [
'"message":"Download'
]
nltk.download("words")
WORD_LIST = nltk.corpus.words.words()
def benchmark_generator() -> dict:
prompt = " ".join(random.choices(WORD_LIST, k=int(250)))
benchmark_data = {
"inputs": prompt,
"parameters": {
"max_new_tokens": 128,
"temperature": 0.7,
"return_full_text": False
}
}
return benchmark_data
worker_config = WorkerConfig(
model_server_url=MODEL_SERVER_URL,
model_server_port=MODEL_SERVER_PORT,
model_log_file=MODEL_LOG_FILE,
model_healthcheck_url=MODEL_HEALTHCHECK_ENDPOINT,
handlers=[
HandlerConfig(
route="/generate",
allow_parallel_requests=True,
max_queue_time=60.0,
benchmark_config=BenchmarkConfig(
generator=benchmark_generator,
concurrency=50
),
workload_calculator= lambda x: x["parameters"]["max_new_tokens"]
),
HandlerConfig(
route="/generate_stream",
allow_parallel_requests=True,
max_queue_time=60.0,
workload_calculator= lambda x: x["parameters"]["max_new_tokens"]
)
],
log_action_config=LogActionConfig(
on_load=MODEL_LOAD_LOG_MSG,
on_error=MODEL_ERROR_LOG_MSGS,
on_info=MODEL_INFO_LOG_MSGS
)
)
Worker(worker_config).run()
+288
View File
@@ -0,0 +1,288 @@
import random
import sys
from vastai import Worker, WorkerConfig, HandlerConfig, LogActionConfig, BenchmarkConfig
# ComyUI model configuration
MODEL_SERVER_URL = 'http://127.0.0.1'
MODEL_SERVER_PORT = 18288
MODEL_LOG_FILE = '/var/log/portal/comfyui.log'
MODEL_HEALTHCHECK_ENDPOINT = "/health"
# ComyUI-specific log messages
MODEL_LOAD_LOG_MSG = [
"To see the GUI go to: "
]
MODEL_ERROR_LOG_MSGS = [
"MetadataIncompleteBuffer",
"Value not in list: ",
"[ERROR] Provisioning Script failed"
]
MODEL_INFO_LOG_MSGS = [
'"message":"Downloading'
]
benchmark_prompts = [
"Cartoon hoodie hero; orc, anime cat, bunny; black goo; buff; vector on white.",
"Cozy farming-game scene with fine details.",
"2D vector child with soccer ball; airbrush chrome; swagger; antique copper.",
"Realistic futuristic downtown of low buildings at sunset.",
"Perfect wave front view; sunny seascape; ultra-detailed water; artful feel.",
"Clear cup with ice, fruit, mint; creamy swirls; fluid-sim CGI; warm glow.",
"Male biker with backpack on motorcycle; oilpunk; award-worthy magazine cover.",
"Collage for textile; surreal cartoon cat in cap/jeans before poster; crisp.",
"Medieval village inside glass sphere; volumetric light; macro focus.",
"Iron Man with glowing axe; mecha sci-fi; jungle scene; dynamic light.",
"Pope Francis DJ in leather jacket, mixing on giant console; dramatic.",
]
benchmark_dataset = [
{
"input": {
"workflow_json": {
"90": {
"inputs": {
"clip_name": "umt5_xxl_fp8_e4m3fn_scaled.safetensors",
"type": "wan",
"device": "default"
},
"class_type": "CLIPLoader",
"_meta": {
"title": "Load CLIP"
}
},
"91": {
"inputs": {
"text": "色调艳丽,过曝,静态,细节模糊不清,字幕,风格,作品,画作,画面,静止,整体发灰,最差质量,低质量,JPEG压缩残留,丑陋的,残缺的,多余的手指,画得不好的手部,画得不好的脸部,畸形的,毁容的,形态畸形的肢体,手指融合,静止不动的画面,杂乱的背景,三条腿,背景人很多,倒着走,裸露,NSFW",
"clip": [
"90",
0
]
},
"class_type": "CLIPTextEncode",
"_meta": {
"title": "CLIP Text Encode (Negative Prompt)"
}
},
"92": {
"inputs": {
"vae_name": "wan_2.1_vae.safetensors"
},
"class_type": "VAELoader",
"_meta": {
"title": "Load VAE"
}
},
"93": {
"inputs": {
"shift": 8.000000000000002,
"model": [
"101",
0
]
},
"class_type": "ModelSamplingSD3",
"_meta": {
"title": "ModelSamplingSD3"
}
},
"94": {
"inputs": {
"shift": 8,
"model": [
"102",
0
]
},
"class_type": "ModelSamplingSD3",
"_meta": {
"title": "ModelSamplingSD3"
}
},
"95": {
"inputs": {
"add_noise": "disable",
"noise_seed": 0,
"steps": 20,
"cfg": 3.5,
"sampler_name": "euler",
"scheduler": "simple",
"start_at_step": 10,
"end_at_step": 10000,
"return_with_leftover_noise": "disable",
"model": [
"94",
0
],
"positive": [
"99",
0
],
"negative": [
"91",
0
],
"latent_image": [
"96",
0
]
},
"class_type": "KSamplerAdvanced",
"_meta": {
"title": "KSampler (Advanced)"
}
},
"96": {
"inputs": {
"add_noise": "enable",
"noise_seed": "__RANDOM_INT__",
"steps": 20,
"cfg": 3.5,
"sampler_name": "euler",
"scheduler": "simple",
"start_at_step": 0,
"end_at_step": 10,
"return_with_leftover_noise": "enable",
"model": [
"93",
0
],
"positive": [
"99",
0
],
"negative": [
"91",
0
],
"latent_image": [
"104",
0
]
},
"class_type": "KSamplerAdvanced",
"_meta": {
"title": "KSampler (Advanced)"
}
},
"97": {
"inputs": {
"samples": [
"95",
0
],
"vae": [
"92",
0
]
},
"class_type": "VAEDecode",
"_meta": {
"title": "VAE Decode"
}
},
"98": {
"inputs": {
"filename_prefix": "video/ComfyUI",
"format": "auto",
"codec": "auto",
"video": [
"100",
0
]
},
"class_type": "SaveVideo",
"_meta": {
"title": "Save Video"
}
},
"99": {
"inputs": {
"text":prompt,
"clip": [
"90",
0
]
},
"class_type": "CLIPTextEncode",
"_meta": {
"title": "CLIP Text Encode (Positive Prompt)"
}
},
"100": {
"inputs": {
"fps": 16,
"images": [
"97",
0
]
},
"class_type": "CreateVideo",
"_meta": {
"title": "Create Video"
}
},
"101": {
"inputs": {
"unet_name": "wan2.2_t2v_high_noise_14B_fp8_scaled.safetensors",
"weight_dtype": "default"
},
"class_type": "UNETLoader",
"_meta": {
"title": "Load Diffusion Model"
}
},
"102": {
"inputs": {
"unet_name": "wan2.2_t2v_low_noise_14B_fp8_scaled.safetensors",
"weight_dtype": "default"
},
"class_type": "UNETLoader",
"_meta": {
"title": "Load Diffusion Model"
}
},
"104": {
"inputs": {
"width": 640,
"height": 640,
"length": 81,
"batch_size": 1
},
"class_type": "EmptyHunyuanLatentVideo",
"_meta": {
"title": "EmptyHunyuanLatentVideo"
}
}
}
}
} for prompt in benchmark_prompts
]
worker_config = WorkerConfig(
model_server_url=MODEL_SERVER_URL,
model_server_port=MODEL_SERVER_PORT,
model_log_file=MODEL_LOG_FILE,
model_healthcheck_url=MODEL_HEALTHCHECK_ENDPOINT,
handlers=[
HandlerConfig(
route="/generate/sync",
allow_parallel_requests=False,
max_queue_time=10.0,
benchmark_config=BenchmarkConfig(
dataset=benchmark_dataset,
runs=1
),
workload_calculator= lambda _ : 10000.0
)
],
log_action_config=LogActionConfig(
on_load=MODEL_LOAD_LOG_MSG,
on_error=MODEL_ERROR_LOG_MSGS,
on_info=MODEL_INFO_LOG_MSGS
)
)
Worker(worker_config).run()