import json from sqlalchemy import create_engine from sqlalchemy.orm import sessionmaker import server from .table import Task class LogToDB: @classmethod def INPUT_TYPES(s): return { "required": { "job_id": ("STRING", {"forceInput": True}), "log": ("STRING", {"forceInput": True}), "status": ("INT", {"default": 1, "max": 1}), }, "hidden": { "unique_id": "UNIQUE_ID", } } RETURN_TYPES = ("STRING",) FUNCTION = "log2db" OUTPUT_NODE = True OUTPUT_IS_LIST = (True,) # OUTPUT_NODE = False CATEGORY = "不忘科技-自定义节点🚩" def log2db(self, log, status, unique_id): # 获取comfy服务器队列信息 (_, prompt_id, prompt, extra_data, outputs_to_execute) = next( iter(server.PromptServer.instance.prompt_queue.currently_running.values())) job_id = extra_data["client_id"] engine = create_engine( "mysql+pymysql://root:*k3&5xxG6oqHJM@sh-cdb-1xspb808.sql.tencentcdb.com:28795/comfy", echo=True ) # Base.metadata.create_all(engine) session = sessionmaker(bind=engine)() # 查询 tasks = session.query(Task).filter(Task.prompt_id == prompt_id).all() print(prompt) result = { "curr_node_id": str(unique_id), "last_node_id": list(prompt.keys())[-1], "node_output": str(log) } if len(tasks) == 0: # 不存在插入 task = Task(prompt_id=prompt_id, job_id=job_id, result=json.dumps(result), status=status) session.add(task) elif len(tasks) == 1: # 存在更新 session.query(Task).filter(Task.prompt_id == prompt_id).update({"result": json.dumps(result), "status": status}) else: # 异常报错 raise RuntimeError("状态数据库prompt_id不唯一, 无法记录状态!") session.commit() return {"ui": {"text": json.dumps(result)}, "result": (json.dumps(result),)}