4. 技術スタック完全仕様 — ComfyUI API全エンドポイント
4-1. REST API エンドポイント一覧 [1]
| エンドポイント | メソッド | 主要パラメータ | 用途 |
POST /prompt | POST | {"prompt": workflow_json, "client_id": uuid, "front": false} | ワークフロー投入。prompt_id返却 |
GET /queue | GET | — | queue_running / queue_pending 取得 |
POST /queue | POST | {"clear": true} | キュークリア / ポーズ |
GET /history | GET | — | 全履歴取得 |
GET /history/{prompt_id} | GET | prompt_id (path) | 完了後の生成結果取得 |
POST /history | POST | {"clear": true} | 履歴クリア |
POST /interrupt | POST | — | 実行中ワークフローを即時中断 |
POST /free | POST | {"unload_models": true, "free_memory": true} | VRAMクリア・OOM後リカバリー |
GET /system_stats | GET | — | VRAM使用量・Python情報取得 |
GET /object_info | GET | — | 全ノードクラス定義取得 |
GET /object_info/{node_class} | GET | node_class (path) | 特定ノードの入出力定義 |
GET /models/{folder} | GET | folder (path) | checkpoints/loras等のモデル一覧 |
POST /upload/image | POST | multipart/form-data | 入力画像アップロード |
GET /view | GET | filename, subfolder, type | 生成画像のバイナリ取得 |
GET /embeddings | GET | — | 利用可能Embedding一覧 |
GET /extensions | GET | — | 登録拡張一覧 |
GET /features | GET | — | サーバー機能・ケーパビリティ |
4-2. WebSocket /ws イベント一覧 [6]
重要: WebSocketフレームはJSONとバイナリの両方が混在する。バイナリフレームはプレビュー画像。isinstance(msg, bytes) で必ず分岐すること。
| イベント type | data フィールド | 用途・トリガー |
status | status.exec_info.queue_remaining | キュー状態変化時に随時送信 |
execution_start | prompt_id | ワークフロー実行開始 |
executing | node, prompt_id | ノード実行中。node=null かつ prompt_id一致 = 完了 |
progress | value, max, prompt_id, node | KSamplerのステップ進捗 |
executed | node, output, prompt_id | ノード完了・出力あり |
execution_cached | nodes, prompt_id | キャッシュ再利用でノードスキップ |
execution_error | exception_message, exception_type, node_id | ワークフロー実行エラー |
| (binary frame) | PNG/JPEG バイナリ | プレビュー画像。無視してOK |
4-3. /prompt POST 詳細スペック [3]
# 成功レスポンス (HTTP 200)
{
"prompt_id": "a1b2c3d4-e5f6-7890-abcd-ef1234567890",
"number": 3, # キュー内の順番
"node_errors": {} # 空なら検証OK
}
# バリデーションエラー (HTTP 400)
{
"error": {
"type": "prompt_outputs_failed_validation",
"message": "..."
},
"node_errors": {
"4": {
"errors": [{"type": "required_input_missing", "message": "lora_name is required"}],
"class_type": "LoraLoader"
}
}
}
4-4. system_stats レスポンス構造 (VRAM監視)
{
"system": {
"os": "nt",
"python_version": "3.12.x",
"embedded_python": false
},
"devices": [
{
"name": "NVIDIA GeForce RTX 3090 Ti",
"type": "cuda",
"index": 0,
"vram_total": 25769803776, # 24GB = 25.7GB (bytes)
"vram_free": 18253611008, # 空きVRAM (bytes)
"torch_vram_total": 25769803776,
"torch_vram_free": 18253611008
}
]
}
# 利用: vram_free_gb = devices[0]["vram_free"] / (1024**3)
4-5. コード1: ComfyUIBatchClient クラス (完全実装) [2][4]
# comfyui_client.py
# ComfyUI WebSocket API クライアント — 量産自動化用完全実装
import json
import time
import uuid
import requests
import websocket
from pathlib import Path
from typing import List, Dict, Optional
class ComfyUIError(Exception):
"""ComfyUI API 基底例外"""
pass
class OOMError(ComfyUIError):
"""CUDA Out of Memory"""
pass
class ValidationError(ComfyUIError):
"""ワークフロー検証エラー (リトライ不可)"""
pass
class ComfyUIBatchClient:
"""
ComfyUI WebSocket/HTTP API クライアント
量産自動化・OOMリカバリー・出力自動整理対応
"""
def __init__(self, host: str = "127.0.0.1", port: int = 8188, max_retry: int = 3):
self.host = host
self.port = port
self.base_url = f"http://{host}:{port}"
self.ws_url = f"ws://{host}:{port}/ws"
self.client_id = str(uuid.uuid4())
self.max_retry = max_retry
self.session = requests.Session()
self.session.headers.update({"Content-Type": "application/json"})
# ----------------------------------------------------------
# キュー投入
# ----------------------------------------------------------
def queue_prompt(self, workflow: dict) -> str:
"""
ワークフローをキューに投入。
Returns: prompt_id (str)
Raises: ValidationError (node_errorsあり)
"""
payload = {
"prompt": workflow,
"client_id": self.client_id,
"front": False,
}
resp = self.session.post(f"{self.base_url}/prompt", json=payload, timeout=30)
if resp.status_code == 400:
err = resp.json()
raise ValidationError(f"node_errors: {err.get('node_errors', {})}")
resp.raise_for_status()
data = resp.json()
return data["prompt_id"]
# ----------------------------------------------------------
# 完了待機 (WebSocket)
# ----------------------------------------------------------
def wait_completion(self, prompt_id: str, timeout: int = 600) -> dict:
"""
WebSocket で executing node=null を待ち、history を返す。
バイナリフレーム (プレビュー) は無視。
execution_error は OOMError または ComfyUIError を raise。
"""
ws = websocket.WebSocket()
ws.settimeout(timeout)
ws.connect(f"{self.ws_url}?clientId={self.client_id}")
start = time.time()
try:
while True:
if time.time() - start > timeout:
raise TimeoutError(f"Timeout waiting for {prompt_id}")
try:
msg = ws.recv()
except websocket.WebSocketTimeoutException:
raise TimeoutError(f"WebSocket timeout for {prompt_id}")
# バイナリ = プレビュー画像 → スキップ
if isinstance(msg, bytes):
continue
data = json.loads(msg)
msg_type = data.get("type", "")
payload = data.get("data", {})
if msg_type == "executing":
# node=null かつ prompt_id一致 = 完了シグナル
if (payload.get("node") is None and
payload.get("prompt_id") == prompt_id):
break
elif msg_type == "execution_error":
# OOM判定
exc_msg = str(payload.get("exception_message", "")).lower()
if "out of memory" in exc_msg or "cuda" in exc_msg:
raise OOMError(payload.get("exception_message", "OOM"))
raise ComfyUIError(payload.get("exception_message", "Unknown error"))
finally:
try:
ws.close()
except Exception:
pass
return self.get_history(prompt_id)
# ----------------------------------------------------------
# OOMリカバリー
# ----------------------------------------------------------
def recover_oom(self, wait_sec: int = 60):
"""POST /free でモデルアンロード + キャッシュクリア。"""
self.session.post(
f"{self.base_url}/free",
json={"unload_models": True, "free_memory": True},
timeout=30,
)
print(f" [OOM Recovery] /free 完了。{wait_sec}秒待機...")
time.sleep(wait_sec)
# ----------------------------------------------------------
# VRAM監視
# ----------------------------------------------------------
def get_vram_free(self) -> float:
"""空きVRAM (GB) を返す。"""
stats = self.session.get(f"{self.base_url}/system_stats", timeout=10).json()
return stats["devices"][0]["vram_free"] / (1024 ** 3)
# ----------------------------------------------------------
# キュー監視
# ----------------------------------------------------------
def get_queue_remaining(self) -> int:
"""実行中 + 待機中のジョブ数を返す。"""
q = self.session.get(f"{self.base_url}/queue", timeout=10).json()
return len(q.get("queue_running", [])) + len(q.get("queue_pending", []))
def wait_for_queue_slot(self, max_q: int = 4, poll_interval: float = 2.0):
"""キューに空きができるまでブロック。"""
while self.get_queue_remaining() >= max_q:
time.sleep(poll_interval)
# ----------------------------------------------------------
# 中断・履歴
# ----------------------------------------------------------
def interrupt(self):
"""実行中ワークフローを即時中断。"""
self.session.post(f"{self.base_url}/interrupt", json={}, timeout=10)
def get_history(self, prompt_id: str) -> dict:
"""完了後の履歴・出力情報を取得。"""
return self.session.get(
f"{self.base_url}/history/{prompt_id}", timeout=30
).json()
# ----------------------------------------------------------
# 出力ダウンロード
# ----------------------------------------------------------
def download_outputs(self, prompt_id: str, dest_dir: str) -> List[str]:
"""
history からファイル一覧を取り出し、dest_dir に保存。
Returns: 保存したローカルパスのリスト
"""
history = self.get_history(prompt_id)
outputs: List[str] = []
dest = Path(dest_dir)
dest.mkdir(parents=True, exist_ok=True)
prompt_data = history.get(prompt_id, {})
for node_id, node_out in prompt_data.get("outputs", {}).items():
for img in node_out.get("images", []):
fname = img["filename"]
subfolder = img.get("subfolder", "")
params = {"filename": fname, "subfolder": subfolder, "type": "output"}
r = self.session.get(
f"{self.base_url}/view", params=params, stream=True, timeout=60
)
save_path = dest / fname
with open(save_path, "wb") as f:
for chunk in r.iter_content(8192):
f.write(chunk)
outputs.append(str(save_path))
return outputs
4-6. コード2: inject_params — WF JSON動的書き換え [5]
# inject_params.py
# workflow.json のノードにseed/LoRA/プロンプトを外部注入
import copy
from typing import Dict, Any
def inject_params(workflow: Dict[str, Any], params: Dict[str, Any]) -> Dict[str, Any]:
"""
workflow (API形式JSON) にparams辞書の値を注入して返す。
workflow はdeepcopyされるので元データは保持される。
params キー一覧:
seed, steps, cfg, sampler_name, scheduler,
lora_name, lora_strength,
positive_prompt, negative_prompt,
width, height
"""
workflow = copy.deepcopy(workflow)
for node_id, node in workflow.items():
class_type = node.get("class_type", "")
inputs = node.get("inputs", {})
meta_title = node.get("_meta", {}).get("title", "").lower()
# ---- KSampler / KSamplerAdvanced ----
if class_type in ("KSampler", "KSamplerAdvanced"):
if "seed" in params: inputs["seed"] = params["seed"]
if "steps" in params: inputs["steps"] = params["steps"]
if "cfg" in params: inputs["cfg"] = params["cfg"]
if "sampler_name" in params: inputs["sampler_name"] = params["sampler_name"]
if "scheduler" in params: inputs["scheduler"] = params["scheduler"]
if class_type == "KSamplerAdvanced":
if "seed" in params:
inputs["noise_seed"] = params["seed"]
# ---- LoraLoader ----
elif class_type == "LoraLoader":
if "lora_name" in params:
# Windows backslash → forward slash に正規化
inputs["lora_name"] = params["lora_name"].replace("\\", "/")
if "lora_strength" in params:
inputs["strength_model"] = params["lora_strength"]
inputs["strength_clip"] = params["lora_strength"]
# ---- CLIPTextEncode (positive / negative 判定) ----
elif class_type == "CLIPTextEncode":
is_positive = any(k in meta_title for k in ["positive", "pos", "posi", "prompt+", "正"])
is_negative = any(k in meta_title for k in ["negative", "neg", "prompt-", "負"])
# タイトル判定が取れない場合は既存テキストの内容で推定
if not is_positive and not is_negative:
existing = inputs.get("text", "")
# ネガティブは "(worst quality" 等の典型ワードが含まれる
if "worst quality" in existing or "bad anatomy" in existing:
is_negative = True
else:
is_positive = True # デフォルトはポジティブ扱い
if is_positive and "positive_prompt" in params:
inputs["text"] = params["positive_prompt"]
elif is_negative and "negative_prompt" in params:
inputs["text"] = params["negative_prompt"]
# ---- EmptyLatentImage ----
elif class_type == "EmptyLatentImage":
if "width" in params: inputs["width"] = params["width"]
if "height" in params: inputs["height"] = params["height"]
return workflow
4-7. コード3: 100体バッチメインループ [9]
# batch_runner.py
# characters.json + base_workflow.json から100体を自動生成
# MAX_Q=4 キュー制御 / OOMリカバリー / progress.json 途中再開対応
import json, time, argparse
from pathlib import Path
from comfyui_client import ComfyUIBatchClient, OOMError, ValidationError
from inject_params import inject_params
from organize_outputs import organize_outputs
BASE_SEED = 42000
MAX_Q = 4 # RTX 3090 Ti 24GB / SDXL 1枚≈6GB → 同時4枚
MAX_RETRY = 3
OUTPUT_ROOT = Path(r"D:/ComfyUI/output/batch")
# シーンID → ベースプロンプト (SCENE_DISTに合わせて調整)
SCENE_PROMPTS = {
0: "(naked:1.3), standing, smile, looking at viewer",
1: "(from behind:1.2), (doggy style:1.4), moaning",
2: "(missionary:1.4), on bed, spread legs",
3: "(cowgirl:1.4), riding, blissful face", # ahegao禁止→blissful face
4: "(creampie:1.5), internal cumshot",
5: "(aftersex:1.2), lying down, satisfied",
}
# ---------- ユーティリティ ----------
def load_json(path):
with open(path, "r", encoding="utf-8") as f:
return json.load(f)
def save_json(data, path):
with open(path, "w", encoding="utf-8") as f:
json.dump(data, f, indent=2, ensure_ascii=False)
# ---------- メインループ ----------
def run_batch(args):
client = ComfyUIBatchClient()
characters = load_json(args.characters)
workflow = load_json(args.workflow)
progress = load_json(args.progress) if (args.resume and Path(args.progress).exists()) else {}
total_jobs = sum(c["scenes"] for c in characters["characters"])
completed = 0
start_time = time.time()
print(f"[Batch] 開始: {total_jobs}ジョブ / MAX_Q={MAX_Q} / VRAM {client.get_vram_free():.1f}GB 空き")
for char in characters["characters"]:
char_name = char["name"]
for scene_id in range(char["scenes"]):
job_key = f"{char_name}_{scene_id}"
# 途中再開: 完了済みはスキップ
if args.resume and progress.get(job_key) == "done":
completed += 1
continue
seed = BASE_SEED + char.get("seed_offset", 0) + scene_id
params = {
"lora_name": char["lora"],
"lora_strength": char.get("strength", 0.8),
"seed": seed,
"positive_prompt": char.get("base_prompt", "") + ", " + SCENE_PROMPTS.get(scene_id, ""),
"negative_prompt": char.get("neg_prompt",
"(worst quality:1.4)(low quality:1.4)(blurry:1.2)(ahegao:1.5)"
"(mature:1.5)(adult:1.4)(old:1.4)"),
"width": 1024, "height": 1024,
"steps": 30, "cfg": 6.0,
"sampler_name": "dpmpp_2m", "scheduler": "karras",
}
# キュー待機
client.wait_for_queue_slot(max_q=MAX_Q)
# OOMリトライループ
success = False
for attempt in range(MAX_RETRY):
try:
wf = inject_params(workflow, params)
prompt_id = client.queue_prompt(wf)
_results = client.wait_completion(prompt_id, timeout=600)
organize_outputs(client, prompt_id, char_name, scene_id, seed, str(OUTPUT_ROOT))
progress[job_key] = "done"
save_json(progress, args.progress)
success = True
break
except OOMError:
print(f" [OOM] {job_key} attempt {attempt+1}/{MAX_RETRY}")
wait_sec = 60 * (2 ** attempt) # 60 / 120 / 240秒
client.recover_oom(wait_sec=wait_sec)
except ValidationError as e:
print(f" [VALIDATION] {job_key}: {e} → スキップ (リトライ不可)")
progress[job_key] = "error_validation"
save_json(progress, args.progress)
break
except Exception as e:
print(f" [ERROR] {job_key} attempt {attempt+1}: {e}")
time.sleep(10)
if not success and progress.get(job_key) not in ("done", "error_validation"):
progress[job_key] = "error_oom"
save_json(progress, args.progress)
completed += 1
elapsed = time.time() - start_time
rate = completed / elapsed if elapsed > 0 else 0.001
eta_min = (total_jobs - completed) / rate / 60
pct = completed / total_jobs * 100
print(f" [{completed}/{total_jobs} {pct:.0f}%] {job_key} | ETA {eta_min:.0f}min | VRAM {client.get_vram_free():.1f}GB")
# 完了サマリー
done_count = sum(1 for v in progress.values() if v == "done")
error_count = sum(1 for v in progress.values() if v.startswith("error"))
print(f"\n[Batch 完了] done={done_count} / error={error_count} / total={total_jobs}")
print(f" 総時間: {(time.time()-start_time)/60:.1f}分")
if __name__ == "__main__":
parser = argparse.ArgumentParser(description="ComfyUI 100体バッチ生成")
parser.add_argument("--characters", default="characters.json")
parser.add_argument("--workflow", default="base_workflow.json")
parser.add_argument("--progress", default="progress.json")
parser.add_argument("--resume", action="store_true", help="途中再開")
args = parser.parse_args()
run_batch(args)
4-8. コード4: 出力ファイル自動整理 [3]
# organize_outputs.py
# キャラ名別フォルダ / 命名規則 / 重複検出(MD5) / サムネ生成
import hashlib, json
from pathlib import Path
from PIL import Image as PILImage
# プロセス内重複排除セット (実行中のみ有効)
_existing_hashes: set = set()
def organize_outputs(
client,
prompt_id: str,
char_name: str,
scene_id: int,
seed: int,
output_root: str,
) -> list:
"""
生成画像を output_root/{char_name}/ へ整理保存。
重複 (MD5一致) はスキップ。256x256 サムネも生成。
Returns: 保存したファイルパスのリスト
"""
output_root = Path(output_root)
char_dir = output_root / char_name
thumb_dir = char_dir / "thumbs"
char_dir.mkdir(parents=True, exist_ok=True)
thumb_dir.mkdir(exist_ok=True)
saved = []
raw_paths = client.download_outputs(prompt_id, str(char_dir / "_raw"))
for raw_path in raw_paths:
raw_path = Path(raw_path)
with open(raw_path, "rb") as fh:
data = fh.read()
# MD5重複チェック
h = hashlib.md5(data).hexdigest()
if h in _existing_hashes:
raw_path.unlink(missing_ok=True)
continue
_existing_hashes.add(h)
# リネーム: {char_name}_{scene_id:02d}_{seed:010d}.png
dest_name = f"{char_name}_{scene_id:02d}_{seed:010d}.png"
dest_path = char_dir / dest_name
raw_path.rename(dest_path)
# サムネイル生成
try:
img = PILImage.open(dest_path).convert("RGB")
img.thumbnail((256, 256), PILImage.LANCZOS)
img.save(thumb_dir / dest_name)
except Exception as e:
print(f" [Thumb] {dest_name} 生成失敗: {e}")
saved.append(str(dest_path))
# index.json 更新
index_path = char_dir / "index.json"
index = json.loads(index_path.read_text(encoding="utf-8")) if index_path.exists() else {"char": char_name, "files": []}
for sp in saved:
index["files"].append({
"file": Path(sp).name,
"scene_id": scene_id,
"seed": seed,
})
index_path.write_text(json.dumps(index, indent=2, ensure_ascii=False), encoding="utf-8")
return saved
4-9. コード5: CC3共存・隙間実行 (GpuArbiter) [13]
# cc3_coexist.py
# R18量産(PRIORITY_R18=0) と CC3漫画生成(PRIORITY_MANGA=10) の GPU調停
import threading, queue, time
from comfyui_client import ComfyUIBatchClient
PRIORITY_R18 = 0 # 最高優先度
PRIORITY_MANGA = 10 # 低優先度 (隙間実行)
VRAM_MIN_GB = 8.0 # 漫画ジョブ投入に必要な空きVRAM
class GpuArbiter:
"""
PriorityQueue ベースの GPU調停クラス。
R18量産ジョブが来たとき CC3ジョブを /interrupt で割り込み中断。
"""
def __init__(self, client: ComfyUIBatchClient):
self.client = client
self.job_queue = queue.PriorityQueue()
self._thread = None
self.running = False
self._current_priority = None
def start(self):
self.running = True
self._thread = threading.Thread(target=self._worker, daemon=True)
self._thread.start()
print("[GpuArbiter] 開始")
def stop(self):
self.running = False
def _worker(self):
while self.running:
try:
prio, workflow, params, callback = self.job_queue.get(timeout=1.0)
except queue.Empty:
continue
# R18ジョブが来て CC3が実行中なら割り込み中断
if prio == PRIORITY_R18 and self._current_priority == PRIORITY_MANGA:
print("[GpuArbiter] R18優先: CC3ジョブを中断")
self.client.interrupt()
time.sleep(3)
# VRAM不足なら /free → 再キュー
vram = self.client.get_vram_free()
if vram < 6.0:
print(f"[GpuArbiter] VRAM不足 ({vram:.1f}GB) → /free 後再キュー")
self.client.recover_oom(wait_sec=30)
self.job_queue.put((prio, workflow, params, callback))
continue
self._current_priority = prio
try:
from inject_params import inject_params
wf = inject_params(workflow, params)
prompt_id = self.client.queue_prompt(wf)
results = self.client.wait_completion(prompt_id)
if callback:
callback(results)
except Exception as e:
print(f"[GpuArbiter] ジョブエラー: {e}")
finally:
self._current_priority = None
self.job_queue.task_done()
def submit_r18(self, workflow, params, callback=None):
"""R18量産ジョブを投入 (最高優先度)。"""
self.job_queue.put((PRIORITY_R18, workflow, params, callback))
def submit_manga(self, workflow, params, callback=None):
"""漫画ジョブを投入 (低優先度・隙間実行)。"""
# キューが空 かつ VRAM十分の場合のみ受け付け
if self.job_queue.empty() and self.client.get_vram_free() > VRAM_MIN_GB:
self.job_queue.put((PRIORITY_MANGA, workflow, params, callback))
print(f"[GpuArbiter] 漫画ジョブ投入 (VRAM {self.client.get_vram_free():.1f}GB)")
else:
print("[GpuArbiter] 漫画ジョブ: R18量産中または VRAM不足 → スキップ")
4-10. コード6: カスタムノード BatchProgressReporter (V1スキーマ) [7]
# custom_nodes/batch_progress_reporter/nodes.py
# 生成進捗を progress.json に書き出すユーティリティノード
import json, time
from pathlib import Path
class BatchProgressReporter:
"""
画像をスルーしながら進捗を progress.json に記録。
CATEGORY: fanza3_mass/utils
"""
@classmethod
def INPUT_TYPES(cls):
return {
"required": {
"images": ("IMAGE",),
"char_name": ("STRING", {"default": "unknown"}),
"scene_id": ("INT", {"default": 0, "min": 0, "max": 999}),
"total_jobs": ("INT", {"default": 100, "min": 1, "max": 10000}),
"status_file": ("STRING", {"default": "D:/progress.json"}),
}
}
RETURN_TYPES = ("IMAGE",)
RETURN_NAMES = ("images",)
FUNCTION = "report"
CATEGORY = "fanza3_mass/utils"
OUTPUT_NODE = False
def report(self, images, char_name, scene_id, total_jobs, status_file):
status_path = Path(status_file)
if status_path.exists():
status = json.loads(status_path.read_text(encoding="utf-8"))
else:
status = {}
# キャラ別進捗を更新
if char_name not in status:
status[char_name] = {}
status[char_name][str(scene_id)] = {
"completed": True,
"timestamp": time.time(),
}
# 全体進捗計算
completed = sum(len(v) for v in status.values())
pct = completed / total_jobs * 100
print(f"[BatchProgress] {char_name} scene {scene_id}: {pct:.1f}% ({completed}/{total_jobs})")
status_path.write_text(
json.dumps(status, ensure_ascii=False, indent=2), encoding="utf-8"
)
return (images,)
NODE_CLASS_MAPPINGS = {
"BatchProgressReporter": BatchProgressReporter,
}
NODE_DISPLAY_NAME_MAPPINGS = {
"BatchProgressReporter": "Batch Progress Reporter [fanza3_mass]",
}
4-11. コード7: カスタムノード AutoFolderSaver (V3スキーマ) [8]
# custom_nodes/auto_folder_saver/nodes.py
# V3スキーマ (comfy_api.latest) — キャラ名別フォルダ自動保存
# ComfyUI 0.3.63+ 必須
from pathlib import Path
import numpy as np
try:
from comfy_api.latest import io
V3_AVAILABLE = True
except ImportError:
V3_AVAILABLE = False
if V3_AVAILABLE:
class AutoFolderSaver(io.ComfyNode):
"""
キャラ名別フォルダに画像を自動保存するV3ノード。
CATEGORY: fanza3_mass/utils
"""
@classmethod
def define_schema(cls):
return io.Schema(
node_id="AutoFolderSaver",
display_name="Auto Folder Saver [fanza3_mass]",
inputs=[
io.Image.Input("images"),
io.String.Input("output_root", default="D:/ComfyUI/output/batch"),
io.String.Input("char_name", default="unknown"),
io.Int.Input("scene_id", default=0, min=0, max=999),
io.Int.Input("seed", default=0, min=0, max=2**32-1),
],
outputs=[
io.String.Output("saved_path"),
],
category="fanza3_mass/utils",
)
@classmethod
def execute(cls, images, output_root, char_name, scene_id, seed):
char_dir = Path(output_root) / char_name
char_dir.mkdir(parents=True, exist_ok=True)
filename = f"{char_name}_{scene_id:02d}_{seed:010d}.png"
save_path = char_dir / filename
# images shape: [B, H, W, C] float32 0-1
img_np = (images[0].cpu().numpy() * 255).astype(np.uint8)
from PIL import Image as PILImage
PILImage.fromarray(img_np).save(save_path)
print(f"[AutoFolderSaver] 保存: {save_path}")
return io.NodeOutput(saved_path=str(save_path))
async def comfy_entrypoint():
from comfy_api.latest import ComfyExtension
ext = ComfyExtension(name="auto_folder_saver")
ext.register_node(AutoFolderSaver)
return ext
else:
# V3非対応環境向けV1フォールバック
class AutoFolderSaver:
@classmethod
def INPUT_TYPES(cls):
return {"required": {
"images": ("IMAGE",),
"output_root": ("STRING", {"default": "D:/ComfyUI/output/batch"}),
"char_name": ("STRING", {"default": "unknown"}),
"scene_id": ("INT", {"default": 0}),
"seed": ("INT", {"default": 0}),
}}
RETURN_TYPES = ("STRING",)
FUNCTION = "save"
CATEGORY = "fanza3_mass/utils"
def save(self, images, output_root, char_name, scene_id, seed):
char_dir = Path(output_root) / char_name
char_dir.mkdir(parents=True, exist_ok=True)
filename = f"{char_name}_{scene_id:02d}_{seed:010d}.png"
save_path = char_dir / filename
img_np = (images[0].cpu().numpy() * 255).astype("uint8")
from PIL import Image as PILImage
PILImage.fromarray(img_np).save(save_path)
return (str(save_path),)
NODE_CLASS_MAPPINGS = {"AutoFolderSaver": AutoFolderSaver}
NODE_DISPLAY_NAME_MAPPINGS = {"AutoFolderSaver": "Auto Folder Saver [fanza3_mass]"}
4-12. characters.json スキーマ定義
{
"characters": [
{
"name": "miyabi",
"lora": "miyabi_v2.safetensors",
"strength": 0.85,
"scenes": 6,
"seed_offset": 1000,
"base_prompt": "(miyabi:1.1), black hair, long hair, purple eyes, (fair skin:1.3), school uniform",
"neg_prompt": "(worst quality:1.4)(low quality:1.4)(blurry:1.2)(ahegao:1.5)(mature:1.5)(adult:1.4)(old:1.4)(dark skin:1.4)(tanned:1.4)(blonde hair:1.3)(multicolored hair:1.3)"
},
{
"name": "yui",
"lora": "yui_v1.safetensors",
"strength": 0.80,
"scenes": 6,
"seed_offset": 2000,
"base_prompt": "(yui:1.1), blonde hair, short hair, blue eyes, (fair skin:1.3), maid outfit",
"neg_prompt": "(worst quality:1.4)(low quality:1.4)(blurry:1.2)(ahegao:1.5)(mature:1.5)(black hair:1.4)(red hair:1.3)(multicolored hair:1.3)"
}
]
}
4-13. status.json スキーマと進捗ダッシュボード連携 API
# status_server.py
# aiohttp で /api/batch_status を提供 → dashboard.html からポーリング
import json, asyncio
from pathlib import Path
from aiohttp import web
STATUS_FILE = Path(r"D:/projects/fanza3_mass/progress.json")
PORT = 8765
async def handle_status(request):
if STATUS_FILE.exists():
data = json.loads(STATUS_FILE.read_text(encoding="utf-8"))
done = sum(1 for v in data.values() if isinstance(v, dict) and
all(entry.get("completed") for entry in v.values())
or v == "done")
total = len(data)
payload = {
"progress": data,
"summary": {"done": done, "total": total, "pct": done/max(total,1)*100},
}
else:
payload = {"progress": {}, "summary": {"done": 0, "total": 0, "pct": 0}}
return web.json_response(payload, headers={"Access-Control-Allow-Origin": "*"})
async def main():
app = web.Application()
app.router.add_get("/api/batch_status", handle_status)
runner = web.AppRunner(app)
await runner.setup()
site = web.TCPSite(runner, "127.0.0.1", PORT)
await site.start()
print(f"[StatusServer] http://127.0.0.1:{PORT}/api/batch_status")
await asyncio.Event().wait()
if __name__ == "__main__":
asyncio.run(main())