diff --git a/workers/comfyui-json/README.md b/workers/comfyui-json/README.md index 7aa1ba3..9517dbb 100644 --- a/workers/comfyui-json/README.md +++ b/workers/comfyui-json/README.md @@ -1,8 +1,16 @@ # ComfyUI PyWorker -This is the base PyWorker for ComfyUI. It provides a unified interface for running any ComfyUI workflow through a proxy-based architecture. +This is the base PyWorker for ComfyUI. It provides a unified interface for running any ComfyUI workflow through a proxy-based architecture. See the [Serverless documentation](https://docs.vast.ai/serverless) for guides and how-to's. -The cost for each request has a static value of `1`. ComfyUI does not handle concurrent workloads and there is no current provision to load multiple instances of ComfyUI per worker node. +The cost for each request has a static value of `1`. ComfyUI does not handle concurrent workloads and there is no current provision to load multiple instances of ComfyUI per worker node. + +## Instance Setup + +1. Pick a template + +- [ComfyUI (Serverless)](https://cloud.vast.ai/?ref_id=62897&creator_id=62897&name=ComfyUI%20(Serverless)) + +2. Follow the [getting started guide](https://docs.vast.ai/documentation/serverless/quickstart) for help with configuring your serverless setup. For testing, we recommend that you use the default options presented by the web interface. ## Requirements @@ -10,6 +18,88 @@ This worker requires both [ComfyUI](https://github.com/comfyanonymous/ComfyUI) a A docker image is provided but you may use any if the above requirements are met. +## Client + +The client demonstrates how to use the Vast Serverless SDK to generate images, save them locally, and optionally upload to S3-compatible storage. + +### Setup + +1. Clone the PyWorker repository to your local machine and install the necessary requirements for running the test client. + +```bash +git clone https://github.com/vast-ai/pyworker +cd pyworker +pip install uv +uv venv -p 3.12 +source .venv/bin/activate +uv pip install -r requirements.txt +``` + +2. Set your API key: + +```bash +export VAST_API_KEY= +``` + +### Usage + +```bash +# Default prompt +python -m workers.comfyui-json.client + +# Custom prompt +python -m workers.comfyui-json.client --prompt "a cat sitting on a rainbow" + +# With options +python -m workers.comfyui-json.client --prompt "sunset" --width 1024 --height 1024 --steps 30 + +# Using a custom workflow file +python -m workers.comfyui-json.client --workflow my_workflow.json + +# With S3 upload +python -m workers.comfyui-json.client --s3 +``` + +### CLI Flags + +| Flag | Default | Description | +|------|---------|-------------| +| `--endpoint` | `my-comfyui-endpoint` | Vast endpoint name | +| `--prompt` | (default) | Text prompt for image generation | +| `--workflow` | (none) | Path to custom workflow JSON file | +| `--width` | 512 | Image width in pixels | +| `--height` | 512 | Image height in pixels | +| `--steps` | 20 | Number of denoising steps | +| `--seed` | (random) | Random seed for reproducibility | +| `--s3` | (disabled) | Upload generated images to S3 | + +### Output + +Images are saved to `./generated_images/comfy_{seed}.png`. + +### S3 Upload (Optional) + +You can optionally upload generated images to an S3-compatible storage service (AWS S3, Cloudflare R2, Backblaze B2, etc.) by using the `--s3` flag. + +**1. Set environment variables:** + +```bash +export S3_ENDPOINT_URL="https://your-account.r2.cloudflarestorage.com" +export S3_BUCKET_NAME="my-bucket" +export S3_ACCESS_KEY_ID="your-access-key-id" +export S3_SECRET_ACCESS_KEY="your-secret-access-key" +``` + +**2. Run with S3 upload enabled:** + +```bash +python -m workers.comfyui-json.client --prompt "a beautiful landscape" --s3 +``` + +Images will be saved locally AND uploaded to `s3://{bucket}/comfyui/{filename}`. + +**Note:** Requires `boto3` (`pip install boto3`). + ## Benchmarking ### Custom Benchmark Workflows @@ -212,11 +302,3 @@ WEBHOOK_TIMEOUT=30 # Webhook timeout in seconds } } ``` - -## Client Libraries - -See the test client examples for implementation details on how to integrate with the ComfyUI worker. - ---- - -See Vast's serverless documentation for more details on how to use ComfyUI with autoscaler. \ No newline at end of file diff --git a/workers/comfyui-json/client.py b/workers/comfyui-json/client.py index 93e184c..d79b30d 100644 --- a/workers/comfyui-json/client.py +++ b/workers/comfyui-json/client.py @@ -1,35 +1,312 @@ -from .data_types import count_workload +import os +import sys +import json import uuid import random import asyncio -import random +import logging +import argparse +import aiohttp from vastai import Serverless -async def main(): - async with Serverless() as client: - endpoint = await client.get_endpoint(name="my-comfy-endpoint") # Change this to your endpoint name +# ---------------------- Config ---------------------- +DEFAULT_PROMPT = "a beautiful sunset over mountains, digital art, highly detailed" +ENDPOINT_NAME = "my-comfyui-endpoint" +DEFAULT_WIDTH = 512 +DEFAULT_HEIGHT = 512 +DEFAULT_STEPS = 20 +COST = 100 # Fixed cost for ComfyUI requests - payload = { - "input": { - "request_id": str(uuid.uuid4()), - "modifier": "Text2Image", - "modifications": { - "prompt": "a beautiful landscape with mountains and lakes", - "width": 1024, - "height": 1024, - "steps": 20, - "seed": random.randint(0, 2**32 - 1) - }, - "workflow_json": {} # Empty since using modifier approach - } +# Optional S3 Configuration (from environment variables) +S3_ENDPOINT_URL = os.getenv("S3_ENDPOINT_URL") +S3_BUCKET_NAME = os.getenv("S3_BUCKET_NAME") +S3_ACCESS_KEY_ID = os.getenv("S3_ACCESS_KEY_ID") +S3_SECRET_ACCESS_KEY = os.getenv("S3_SECRET_ACCESS_KEY") + +logging.basicConfig(level=logging.INFO, format="%(levelname)s - %(message)s") +log = logging.getLogger(__name__) + + +def get_s3_client(): + """Create and return an S3 client configured for the S3-compatible endpoint""" + try: + import boto3 + from botocore.config import Config + except ImportError: + log.error("boto3 is required for S3 uploads. Install with: pip install boto3") + return None + + if not all([S3_ENDPOINT_URL, S3_BUCKET_NAME, S3_ACCESS_KEY_ID, S3_SECRET_ACCESS_KEY]): + log.error("S3 environment variables not fully configured. Required:") + log.error(" S3_ENDPOINT_URL, S3_BUCKET_NAME, S3_ACCESS_KEY_ID, S3_SECRET_ACCESS_KEY") + return None + + return boto3.client( + "s3", + endpoint_url=S3_ENDPOINT_URL, + aws_access_key_id=S3_ACCESS_KEY_ID, + aws_secret_access_key=S3_SECRET_ACCESS_KEY, + config=Config(signature_version="s3v4"), + ) + + +# ---------------------- API Functions ---------------------- +async def call_generate( + client: Serverless, + *, + endpoint_name: str, + prompt: str, + width: int, + height: int, + steps: int, + seed: int, +) -> dict: + """Generate image using Text2Image modifier""" + endpoint = await client.get_endpoint(name=endpoint_name) + payload = { + "input": { + "request_id": str(uuid.uuid4()), + "modifier": "Text2Image", + "modifications": { + "prompt": prompt, + "width": width, + "height": height, + "steps": steps, + "seed": seed, + }, } - - response = await endpoint.request("/generate/sync", payload, cost=count_workload()) + } + return await endpoint.request("/generate/sync", payload, cost=COST) + + +async def call_generate_workflow( + client: Serverless, + *, + endpoint_name: str, + workflow_json: dict, +) -> dict: + """Generate using custom workflow JSON""" + endpoint = await client.get_endpoint(name=endpoint_name) + payload = { + "input": { + "request_id": str(uuid.uuid4()), + "workflow_json": workflow_json, + } + } + return await endpoint.request("/generate/sync", payload, cost=COST) + + +# ---------------------- Demo Class ---------------------- +class APIDemo: + def __init__(self, client: Serverless, endpoint_name: str, upload_s3: bool = False): + self.client = client + self.endpoint_name = endpoint_name + self.upload_s3 = upload_s3 + self.s3_client = get_s3_client() if upload_s3 else None + + if upload_s3 and not self.s3_client: + log.warning("S3 upload requested but client creation failed. Images will only be saved locally.") + + def extract_filename(self, response: dict) -> str | None: + """Extract the generated image filename from ComfyUI response""" + if "comfyui_response" in response: + for data in response["comfyui_response"].values(): + if isinstance(data, dict) and "outputs" in data: + for node_output in data["outputs"].values(): + if "images" in node_output and node_output["images"]: + return node_output["images"][0].get("filename") + return None + + async def save_image(self, worker_url: str, filename: str, local_name: str) -> str | None: + """Fetch and save image locally from the worker, optionally upload to S3""" + os.makedirs("generated_images", exist_ok=True) + return await self._fetch_image(worker_url, filename, local_name) + + def _upload_to_s3(self, local_path: str, s3_key: str) -> str | None: + """Upload a local file to S3 and return the S3 URL""" + if not self.s3_client: + return None + + try: + self.s3_client.upload_file( + local_path, + S3_BUCKET_NAME, + s3_key, + ExtraArgs={"ContentType": "image/png"} + ) + s3_url = f"{S3_ENDPOINT_URL}/{S3_BUCKET_NAME}/{s3_key}" + print(f" ā˜ļø Uploaded to S3: {s3_key}") + return s3_url + except Exception as e: + log.error(f"Failed to upload to S3: {e}") + return None + + async def _fetch_image(self, worker_url: str, filename: str, local_name: str) -> str | None: + """Fetch image from worker's /view endpoint and save locally""" + if not worker_url: + return None + + try: + url = f"{worker_url}/view" + params = {"filename": filename, "type": "output"} + + async with aiohttp.ClientSession() as session: + async with session.get(url, params=params, ssl=False) as resp: + if resp.status == 200: + path = f"generated_images/{local_name}" + image_data = await resp.read() + with open(path, "wb") as f: + f.write(image_data) + print(f" šŸ’¾ Saved: {path}") + + # Upload to S3 if enabled + if self.upload_s3 and self.s3_client: + s3_key = f"comfyui/{local_name}" + self._upload_to_s3(path, s3_key) + + return path + return None + except Exception: + return None + + async def demo_prompt( + self, + prompt: str, + width: int, + height: int, + steps: int, + seed: int | None, + ): + """Demo: Generate image from text prompt""" + print("=" * 60) + print("COMFYUI TEXT-TO-IMAGE DEMO") + print("=" * 60) + + if seed is None: + seed = random.randint(0, 2**32 - 1) + + print(f"Prompt: {prompt[:100]}..." if len(prompt) > 100 else f"Prompt: {prompt}") + print(f"Size: {width}x{height}, Steps: {steps}, Seed: {seed}") + print("\nšŸŽØ Generating image...") + + response = await call_generate( + self.client, + endpoint_name=self.endpoint_name, + prompt=prompt, + width=width, + height=height, + steps=steps, + seed=seed, + ) + + print("\nāœ… Generation complete!") + + # Get worker URL for fetching images + worker_url = response.get("url", "") + print(f"Worker URL: {worker_url}") + + # Fetch and save image + if "response" in response: + filename = self.extract_filename(response["response"]) + if filename: + path = await self.save_image(worker_url, filename, f"comfy_{seed}.png") + if not path: + print(f"āŒ Failed to fetch image") + else: + print("āŒ No image in response") + else: + print("āŒ Unexpected response format") + + async def demo_workflow(self, workflow_file: str): + """Demo: Generate using custom workflow file""" + print("=" * 60) + print("COMFYUI CUSTOM WORKFLOW DEMO") + print("=" * 60) + + if not os.path.exists(workflow_file): + log.error(f"Workflow file not found: {workflow_file}") + return + + with open(workflow_file, "r") as f: + workflow_json = json.load(f) + + print(f"Workflow: {workflow_file}") + print("\nšŸŽØ Generating...") + + response = await call_generate_workflow( + self.client, + endpoint_name=self.endpoint_name, + workflow_json=workflow_json, + ) + + print("\nāœ… Generation complete!") + + worker_url = response.get("url", "") + + if "response" in response: + filename = self.extract_filename(response["response"]) + if filename: + path = await self.save_image(worker_url, filename, "workflow.png") + if not path: + print(f"āŒ Failed to fetch image") + else: + print("āŒ No image in response") + else: + print("āŒ Unexpected response format") + + +# ---------------------- CLI ---------------------- +def build_arg_parser() -> argparse.ArgumentParser: + p = argparse.ArgumentParser(description="Vast ComfyUI-JSON Demo (Serverless SDK)") + p.add_argument("--endpoint", default=ENDPOINT_NAME, help=f"Vast endpoint name (default: {ENDPOINT_NAME})") + p.add_argument("--prompt", type=str, default=DEFAULT_PROMPT, metavar="TEXT", + help=f"Prompt text (default: '{DEFAULT_PROMPT[:30]}...')") + p.add_argument("--workflow", type=str, metavar="FILE", help="Use custom workflow JSON file instead") + p.add_argument("--width", type=int, default=DEFAULT_WIDTH, help=f"Image width (default: {DEFAULT_WIDTH})") + p.add_argument("--height", type=int, default=DEFAULT_HEIGHT, help=f"Image height (default: {DEFAULT_HEIGHT})") + p.add_argument("--steps", type=int, default=DEFAULT_STEPS, help=f"Steps (default: {DEFAULT_STEPS})") + p.add_argument("--seed", type=int, default=None, help="Seed (default: random)") + p.add_argument("--s3", action="store_true", + help="Upload generated images to S3 (requires S3_ENDPOINT_URL, S3_BUCKET_NAME, S3_ACCESS_KEY_ID, S3_SECRET_ACCESS_KEY env vars)") + return p + + +async def main_async(): + args = build_arg_parser().parse_args() + + print("=" * 60) + print(f"Using endpoint: {args.endpoint}") + if args.s3: + print(f"S3 upload: enabled (bucket: {S3_BUCKET_NAME})") + + try: + async with Serverless() as client: + demo = APIDemo(client, args.endpoint, upload_s3=args.s3) + + if args.workflow: + await demo.demo_workflow(workflow_file=args.workflow) + else: + await demo.demo_prompt( + prompt=args.prompt, + width=args.width, + height=args.height, + steps=args.steps, + seed=args.seed, + ) + + except AttributeError as e: + if "API key" in str(e): + log.error("API key missing. Set VAST_API_KEY environment variable.") + else: + log.error(f"Error: {e}") + sys.exit(1) + except Exception as e: + log.error(f"Error: {e}") + import traceback + traceback.print_exc() + sys.exit(1) - # Get the file from the path on the local machine using SCP or SFTP - # or configure S3 to upload to cloud storage. - print(response["response"]["output"][0]["local_path"]) if __name__ == "__main__": - asyncio.run(main()) \ No newline at end of file + asyncio.run(main_async()) diff --git a/workers/comfyui-json/server.py b/workers/comfyui-json/server.py index ed4e578..daf35e5 100644 --- a/workers/comfyui-json/server.py +++ b/workers/comfyui-json/server.py @@ -4,6 +4,7 @@ import dataclasses import base64 from typing import Optional, Union, Type +import aiohttp from aiohttp import web, ClientResponse from lib.backend import Backend, LogAction @@ -13,6 +14,7 @@ from .data_types import ComfyWorkflowData MODEL_SERVER_URL = os.getenv("MODEL_SERVER_URL", "http://127.0.0.1:18288") +COMFYUI_URL = os.getenv("COMFYUI_URL", "http://127.0.0.1:18188") # Raw ComfyUI server # This is the last log line that gets emitted once comfyui+extensions have been fully loaded MODEL_SERVER_START_LOG_MSG = "To see the GUI go to: " @@ -108,8 +110,39 @@ async def handle_ping(_): return web.Response(body="pong") +async def handle_view(request: web.Request) -> web.Response: + """Proxy /view requests to raw ComfyUI server to fetch generated images""" + # Forward query params to raw ComfyUI (not the API wrapper) + query_string = request.query_string + url = f"{COMFYUI_URL}/view?{query_string}" + + log.debug(f"Proxying /view request to: {url}") + + try: + async with aiohttp.ClientSession() as session: + async with session.get(url) as resp: + if resp.status == 200: + content = await resp.read() + return web.Response( + body=content, + status=200, + content_type=resp.content_type or "image/png" + ) + else: + text = await resp.text() + return web.Response( + text=text, + status=resp.status, + content_type="text/plain" + ) + except Exception as e: + log.error(f"Error proxying /view: {e}") + return web.Response(text=str(e), status=500) + + routes = [ web.post("/generate/sync", backend.create_handler(ComfyWorkflowHandler())), + web.get("/view", handle_view), web.get("/ping", handle_ping), ]