Compare commits
14 Commits
| Author | SHA1 | Date | |
|---|---|---|---|
| 7d3be849d9 | |||
| 0397af719d | |||
| 4fdc314fd9 | |||
| 639d82f5b4 | |||
| 25db78e39d | |||
| 4e2f2311d0 | |||
| 38782d89bc | |||
| 0185216ccb | |||
| b20d9e714c | |||
| b1eb65d75d | |||
| 1d09d7fe96 | |||
| 1b37054dec | |||
| 1a1e4174b8 | |||
| b8377c4081 |
+13
-2
@@ -190,18 +190,30 @@ class Backend:
|
|||||||
log.debug(f"Exception in main handler loop {e}")
|
log.debug(f"Exception in main handler loop {e}")
|
||||||
return web.Response(status=500)
|
return web.Response(status=500)
|
||||||
|
|
||||||
|
@cached_property
|
||||||
|
def healthcheck_session(self):
|
||||||
|
"""Dedicated session for healthchecks to avoid conflicts with API session"""
|
||||||
|
log.debug("creating dedicated healthcheck session")
|
||||||
|
connector = TCPConnector(
|
||||||
|
force_close=True, # Keep this for isolation
|
||||||
|
enable_cleanup_closed=True,
|
||||||
|
)
|
||||||
|
timeout = ClientTimeout(total=10) # Reasonable timeout for healthchecks
|
||||||
|
return ClientSession(timeout=timeout, connector=connector)
|
||||||
|
|
||||||
async def __healthcheck(self):
|
async def __healthcheck(self):
|
||||||
health_check_url = self.benchmark_handler.healthcheck_endpoint
|
health_check_url = self.benchmark_handler.healthcheck_endpoint
|
||||||
if health_check_url is None:
|
if health_check_url is None:
|
||||||
log.debug("No healthcheck endpoint defined, skipping healthcheck")
|
log.debug("No healthcheck endpoint defined, skipping healthcheck")
|
||||||
return
|
return
|
||||||
|
|
||||||
while True:
|
while True:
|
||||||
await sleep(10)
|
await sleep(10)
|
||||||
if self.__start_healthcheck is False:
|
if self.__start_healthcheck is False:
|
||||||
continue
|
continue
|
||||||
try:
|
try:
|
||||||
log.debug(f"Performing healthcheck on {health_check_url}")
|
log.debug(f"Performing healthcheck on {health_check_url}")
|
||||||
async with self.session.get(health_check_url) as response:
|
async with self.healthcheck_session.get(health_check_url) as response:
|
||||||
if response.status == 200:
|
if response.status == 200:
|
||||||
log.debug("Healthcheck successful")
|
log.debug("Healthcheck successful")
|
||||||
elif response.status == 503:
|
elif response.status == 503:
|
||||||
@@ -210,7 +222,6 @@ class Backend:
|
|||||||
f"Healthcheck failed with status: {response.status}"
|
f"Healthcheck failed with status: {response.status}"
|
||||||
)
|
)
|
||||||
else:
|
else:
|
||||||
# endpoint not ready yet so bail
|
|
||||||
log.debug(f"Healthcheck Endpoint not ready: {response.status}")
|
log.debug(f"Healthcheck Endpoint not ready: {response.status}")
|
||||||
except Exception as e:
|
except Exception as e:
|
||||||
log.debug(f"Healthcheck failed with exception: {e}")
|
log.debug(f"Healthcheck failed with exception: {e}")
|
||||||
|
|||||||
+2
-2
@@ -59,12 +59,12 @@ then
|
|||||||
fi
|
fi
|
||||||
|
|
||||||
# Fork testing
|
# Fork testing
|
||||||
git clone "${PYWORKER_REPO:-https://github.com/vast-ai/pyworker}" "$SERVER_DIR"
|
[[ ! -d $SERVER_DIR ]] && git clone "${PYWORKER_REPO:-https://github.com/vast-ai/pyworker}" "$SERVER_DIR"
|
||||||
if [[ -n ${PYWORKER_REF:-} ]]; then
|
if [[ -n ${PYWORKER_REF:-} ]]; then
|
||||||
(cd "$SERVER_DIR" && git checkout "$PYWORKER_REF")
|
(cd "$SERVER_DIR" && git checkout "$PYWORKER_REF")
|
||||||
fi
|
fi
|
||||||
|
|
||||||
uv venv --managed-python "$ENV_PATH" -p 3.10
|
uv venv --python-preference only-managed "$ENV_PATH" -p 3.10
|
||||||
source "$ENV_PATH/bin/activate"
|
source "$ENV_PATH/bin/activate"
|
||||||
|
|
||||||
uv pip install -r "${SERVER_DIR}/requirements.txt"
|
uv pip install -r "${SERVER_DIR}/requirements.txt"
|
||||||
|
|||||||
@@ -13,8 +13,9 @@ with open("workers/comfyui/misc/test_prompts.txt", "r") as f:
|
|||||||
test_prompts = f.readlines()
|
test_prompts = f.readlines()
|
||||||
|
|
||||||
def count_workload() -> float:
|
def count_workload() -> float:
|
||||||
# Always 1.0 where there is a single instance of ComfyUI handling requests
|
# Always 100.0 where there is a single instance of ComfyUI handling requests
|
||||||
return 1.0
|
# Results will indicate % or a job completed per second. Avoids sub 0.1 sec performance indication
|
||||||
|
return 100.0
|
||||||
|
|
||||||
@dataclasses.dataclass
|
@dataclasses.dataclass
|
||||||
class ComfyWorkflowData(ApiPayload):
|
class ComfyWorkflowData(ApiPayload):
|
||||||
|
|||||||
@@ -33,32 +33,38 @@ log = logging.getLogger(__file__)
|
|||||||
async def generate_client_response(
|
async def generate_client_response(
|
||||||
client_request: web.Request, model_response: ClientResponse
|
client_request: web.Request, model_response: ClientResponse
|
||||||
) -> Union[web.Response, web.StreamResponse]:
|
) -> Union[web.Response, web.StreamResponse]:
|
||||||
# Check if the response is actually streaming based on response headers/content-type
|
match model_response.status:
|
||||||
is_streaming_response = (
|
case 200:
|
||||||
model_response.content_type == "text/event-stream"
|
log.debug("SUCCESS")
|
||||||
or model_response.content_type == "application/x-ndjson"
|
# Check if the response is actually streaming based on response headers/content-type
|
||||||
or model_response.headers.get("Transfer-Encoding") == "chunked"
|
is_streaming_response = (
|
||||||
or "stream" in model_response.content_type.lower()
|
model_response.content_type == "text/event-stream"
|
||||||
)
|
or model_response.content_type == "application/x-ndjson"
|
||||||
|
or model_response.headers.get("Transfer-Encoding") == "chunked"
|
||||||
|
or "stream" in model_response.content_type.lower()
|
||||||
|
)
|
||||||
|
|
||||||
if is_streaming_response:
|
if is_streaming_response:
|
||||||
log.debug("Detected streaming response...")
|
log.debug("Detected streaming response...")
|
||||||
res = web.StreamResponse()
|
res = web.StreamResponse()
|
||||||
res.content_type = model_response.content_type
|
res.content_type = model_response.content_type
|
||||||
await res.prepare(client_request)
|
await res.prepare(client_request)
|
||||||
async for chunk in model_response.content:
|
async for chunk in model_response.content:
|
||||||
await res.write(chunk)
|
await res.write(chunk)
|
||||||
await res.write_eof()
|
await res.write_eof()
|
||||||
log.debug("Done streaming response")
|
log.debug("Done streaming response")
|
||||||
return res
|
return res
|
||||||
else:
|
else:
|
||||||
log.debug("Detected non-streaming response...")
|
log.debug("Detected non-streaming response...")
|
||||||
content = await model_response.read()
|
content = await model_response.read()
|
||||||
return web.Response(
|
return web.Response(
|
||||||
body=content,
|
body=content,
|
||||||
status=model_response.status,
|
status=model_response.status,
|
||||||
content_type=model_response.content_type
|
content_type=model_response.content_type
|
||||||
)
|
)
|
||||||
|
case code:
|
||||||
|
log.debug(f"Model responded with error {code}")
|
||||||
|
return web.Response(status=code)
|
||||||
|
|
||||||
|
|
||||||
@dataclasses.dataclass
|
@dataclasses.dataclass
|
||||||
@@ -70,7 +76,7 @@ class ComfyWorkflowHandler(EndpointHandler[ComfyWorkflowData]):
|
|||||||
|
|
||||||
@property
|
@property
|
||||||
def healthcheck_endpoint(self) -> Optional[str]:
|
def healthcheck_endpoint(self) -> Optional[str]:
|
||||||
return None
|
return f"{MODEL_SERVER_URL}/health"
|
||||||
|
|
||||||
@classmethod
|
@classmethod
|
||||||
def payload_cls(cls) -> Type[ComfyWorkflowData]:
|
def payload_cls(cls) -> Type[ComfyWorkflowData]:
|
||||||
|
|||||||
Reference in New Issue
Block a user