diff --git a/pyproject_comfyui.toml b/pyproject_comfyui.toml index 5bec768..d3b3c18 100644 --- a/pyproject_comfyui.toml +++ b/pyproject_comfyui.toml @@ -19,10 +19,6 @@ dependencies = [ "av", "imageio", "loguru", - "openai-whisper", - "sentry-sdk", "pydantic", - "pydantic_settings", - "conformer==0.3.2", - "einops>0.6.1" + "pydantic_settings" ] \ No newline at end of file diff --git a/src/BowongModalFunctions/api.py b/src/BowongModalFunctions/api.py index 1159bad..c0d8ebd 100644 --- a/src/BowongModalFunctions/api.py +++ b/src/BowongModalFunctions/api.py @@ -6,7 +6,7 @@ from sentry_sdk.integrations.fastapi import FastApiIntegration from fastapi.middleware.cors import CORSMiddleware from .utils.KVCache import MediaSourceKVCache -from .router import ffmpeg, cache, comfyui, google, task +from .router import ffmpeg, cache, comfyui, google, task, jm_router from .models.settings.cluster import WorkerConfig config = WorkerConfig() @@ -88,3 +88,4 @@ web_app.include_router(cache.router) web_app.include_router(google.router) web_app.include_router(task.router) +web_app.include_router(jm_router.router) diff --git a/src/BowongModalFunctions/router/jm_router.py b/src/BowongModalFunctions/router/jm_router.py new file mode 100644 index 0000000..c1daf6a --- /dev/null +++ b/src/BowongModalFunctions/router/jm_router.py @@ -0,0 +1,325 @@ +# -*- coding:utf-8 -*- +""" +File jm_api_local.py +Author silence +Date 2025/6/27 10:15 +""" +import random +import uvicorn +import json +from fastapi import APIRouter, HTTPException, FastAPI +import requests +import time +from loguru import logger +from typing import List +from pydantic import BaseModel +import asyncio + +router = APIRouter(prefix='/jm_router', tags=['Jm_router']) + +api_key = '21575c22-14aa-40ca-8aa8-f00ca27a3a17' + +jm_default_prompts = [ + '女人扭动身体向摄像机展示身材,一只手撩了一下头发,镜头从左向右移动并放大画面', + '女人扭动身体向摄像机展示身材,一只手撩了一下头发后放在了裤子上,镜头从左向右移动', + '时尚模特抬头自信的展示身材,扭动身体,一只手放在了头上,镜头逐渐放大聚焦在了下衣上', + '自信步伐跟拍模特,模特步伐自信地同时行走,镜头紧紧跟随。抬起手捋一捋头发。传递出自信与时尚的气息。', + '女生两只手捏着拳头轻盈的左右摇摆跳舞,动作幅度不大,然后把手摊开放在胸口再做出像popping心脏跳动的动作,左右身体都要非常协调', + '一个年轻女子自信地在相机前展示了她优美的身材,以自然的流体动作自由地摇摆,左手撩了一下头发之后停在了胸前。一个美女自拍跳舞扭动身体的视频,手从下到上最后放在胸前,妩媚的表情', + '美女向后退了一步站在那里展示服装,双手轻轻提了一下裤子两侧,镜头从上到下逐渐放大', + '女人低头看向裤子,向镜头展示身材,一只手放在了头上做pose动作', + '美女向后退了一步站在那里展示服装,低头并用手抚摸裤子,镜头从上到下逐渐放大', + '美女向后退了一步站在那里展示服装,双手从上到下整理衣服,自然扭动身体,自信的表情' +] + +logger.bind(name='jm_api') + + +async def submit_task(prompt: str, img_url: str, duration: str = '5'): + try: + headers = { + 'Content-Type': 'application/json', + 'Authorization': f'Bearer {api_key}', + } + + json_data = { + 'model': 'doubao-seedance-1-0-lite-i2v-250428', + 'content': [ + { + 'type': 'text', + 'text': f'{prompt} --resolution 720p --dur {duration} --camerafixed false', + }, + { + 'type': 'image_url', + 'image_url': { + 'url': img_url, + }, + }, + ], + } + + response = requests.post('https://ark.cn-beijing.volces.com/api/v3/contents/generations/tasks', headers=headers, + json=json_data) + logger.info(f'submit task: {json.dumps(response.json())}') + resp_json = response.json() + job_id = resp_json['id'] + return {"data": job_id, 'status': True} + except Exception as e: + logger.error(e) + return {"data": None, "status": False} + + +async def sync_query_status(job_id: str): + def query_status(t_id: str): + resp_dict = {'status': False, 'data': None, 'msg': ''} + try: + headers = { + 'Content-Type': 'application/json', + 'Authorization': f'Bearer {api_key}', + } + + response = requests.get(f'https://ark.cn-beijing.volces.com/api/v3/contents/generations/tasks/{t_id}', + headers=headers) + resp_json = response.json() + resp_dict['status'] = resp_json['status'] == 'succeeded' + resp_dict['msg'] = resp_json['status'] + resp_dict['data'] = resp_json['content']['video_url'] if 'content' in resp_json else None + except Exception as e: + raise ValueError(str(e)) + finally: + return resp_dict + + timeout = 180 + interval = 2 + end = time.time() + timeout + final_result = {"status": False, "data": None, "msg": ""} + success = False + while time.time() < end: + tmp_data = query_status(job_id) + if tmp_data['status']: + final_result['status'] = True + final_result['data'] = tmp_data['data'] + success = True + break + elif tmp_data['msg'] == 'running': + time.sleep(interval) + print(f'wait next interval:{interval}') + if not success: + final_result['msg'] = '运行超时' + return final_result + + +async def async_query_status(job_id: str): + resp_dict = {'status': False, 'data': None, 'msg': ''} + try: + headers = { + 'Content-Type': 'application/json', + 'Authorization': f'Bearer {api_key}', + } + + response = requests.get(f'https://ark.cn-beijing.volces.com/api/v3/contents/generations/tasks/{job_id}', + headers=headers) + resp_json = response.json() + print(resp_json) + resp_dict['status'] = resp_json['status'] == 'succeeded' + resp_dict['msg'] = resp_json['status'] + resp_dict['data'] = resp_json['content']['video_url'] if 'content' in resp_json else None + except Exception as e: + logger.error(f'error:{str(e)}') + resp_dict['msg'] = str(e) + finally: + return resp_dict + + +class VideoTaskRequest(BaseModel): + prompt: str + img_url: str + task_id: str + duration: str = '5' + + +class VideoTaskStatus(BaseModel): + job_ids: List[str] + + +class BatchVideoTaskRequest(BaseModel): + tasks: List[VideoTaskRequest] + + +@router.post('/jm/submit/task', summary='异步批量提交任务') +async def submit_video_task( + request: BatchVideoTaskRequest, +): + # 创建异步任务列表 + async def process_single_task(task: VideoTaskRequest): + if not task.prompt: + task.prompt = random.choice(jm_default_prompts) + result = await submit_task(task.prompt, task.img_url, task.duration) + + return { + "status": result.get('status', False), + "job_id": result.get('data', ''), + "img_url": task.img_url, + "task_id": task.task_id + } + + # 并发执行所有任务 + results = await asyncio.gather( + *[process_single_task(task) for task in request.tasks] + ) + + # 分离成功和失败的任务 + successful_tasks = [ + {"task_id": r["task_id"], "img_url": r["img_url"], "job_id": r["job_id"]} + for r in results if r["status"] + ] + failed_tasks = [ + {"img_url": r["img_url"]} + for r in results if not r["status"] + ] + + return { + "status": len(successful_tasks) > 0, # 如果有任何成功任务就返回True + "data": { + "success": successful_tasks, + "failed": failed_tasks + }, + "msg": f"成功提交 {len(successful_tasks)} 个任务,失败 {len(failed_tasks)} 个任务" + } + + +@router.get('/jm/query/status', summary='查询视频生成任务状态') +async def query_video_task( + job_id: str, +): + if not job_id: + raise HTTPException(status_code=400, detail="job_id is required") + + # 查询任务状态 + result = await async_query_status(job_id) + return result + + +@router.post('/jm/batch/query/status', summary='批量查询视频生成任务状态') +async def batch_query_video_status(data: VideoTaskStatus): + """ + 批量查询视频生成任务状态 + + Args: + data: VideoTaskStatus对象,包含job_ids列表 + + Returns: + dict: 按状态分类的任务信息 + { + 'data': { + 'finished': [{'job_id': 'xxx', 'video_url': 'xxx'}], + 'failed': ['失败的job_id'], + 'running': ['处理中的job_id'] + }, + 'status': True, + 'msg': '' + } + """ + if not data.job_ids: + return { + 'data': { + 'finished': [], + 'failed': [], + 'running': [] + }, + 'status': True, + 'msg': '任务列表为空' + } + + logger.info(f"开始批量查询 {len(data.job_ids)} 个视频任务状态") + + # 创建异步任务列表 + async def query_single_task(job_id: str): + try: + result = await async_query_status(job_id) + return { + 'job_id': job_id, + 'status': result.get('status', False), + 'msg': result.get('msg', ''), + 'data': result.get('data', None) + } + except Exception as e: + logger.error(f"查询任务 {job_id} 失败: {str(e)}") + return { + 'job_id': job_id, + 'status': False, + 'msg': f'查询异常: {str(e)}', + 'data': None + } + + try: + # 并发查询所有任务 + results = await asyncio.gather( + *[query_single_task(job_id) for job_id in data.job_ids], + return_exceptions=True + ) + + # 分类处理结果 + finished_tasks = [] + failed_tasks = [] + running_tasks = [] + + for i, result in enumerate(results): + job_id = data.job_ids[i] + + if isinstance(result, Exception): + logger.error(f"任务 {job_id} 查询异常: {str(result)}") + failed_tasks.append(job_id) + continue + + # 根据状态分类 + if result['status'] and result['data']: + # 任务完成且有视频URL + finished_tasks.append({ + 'job_id': job_id, + 'video_url': result['data'] + }) + logger.debug(f"任务 {job_id} 已完成: {result['data']}") + + elif result['msg'] in ['running', 'pending', 'processing']: + # 任务处理中 + running_tasks.append(job_id) + logger.debug(f"任务 {job_id} 处理中: {result['msg']}") + + else: + # 任务失败或其他异常状态 + failed_tasks.append(job_id) + logger.debug(f"任务 {job_id} 失败: {result['msg']}") + + # 构造返回结果 + finished_count = len(finished_tasks) + failed_count = len(failed_tasks) + running_count = len(running_tasks) + + return { + 'data': { + 'finished': finished_tasks, + 'failed': failed_tasks, + 'running': running_tasks + }, + 'status': True, + 'msg': f'批量查询完成 - 已完成:{finished_count}, 失败:{failed_count}, 处理中:{running_count}' + } + + except Exception as e: + logger.error(f"批量查询视频任务状态异常: {str(e)}") + return { + 'data': { + 'finished': [], + 'failed': data.job_ids.copy(), # 异常时将所有任务都标记为失败 + 'running': [] + }, + 'status': False, + 'msg': f'批量查询异常: {str(e)}' + } + + +if __name__ == '__main__': + app = FastAPI() + url = 'http://127.0.0.1:9010/docs' + uvicorn.run(app, host='127.0.0.1', port=9010) diff --git a/src/cluster/app.py b/src/cluster/app.py index ae35187..c2e0118 100644 --- a/src/cluster/app.py +++ b/src/cluster/app.py @@ -3,6 +3,7 @@ from BowongModalFunctions.models.settings.cluster import WorkerConfig from .video import app as media_app from .web import app as web_app from .ffmpeg_app import app as ffmpeg_app +from .comfyui_api import comfyui_app as comfyui_api_app # from .comfyui_v1 import app as comfyui_v1_app # from .comfyui_v2 import app as comfyui_v2_app @@ -15,5 +16,6 @@ app = modal.App(config.modal_app_name, app.include(media_app) app.include(ffmpeg_app) app.include(web_app) +app.include(comfyui_api_app) # app.include(comfyui_v1_app) # app.include(comfyui_v2_app) diff --git a/src/cluster/comfyui_api.py b/src/cluster/comfyui_api.py new file mode 100644 index 0000000..f68035a --- /dev/null +++ b/src/cluster/comfyui_api.py @@ -0,0 +1,33 @@ +import subprocess + +import modal +from cluster.comfyui_v1 import comfyui_image + +image = ( # build up a Modal Image to run ComfyUI, step by step + modal.Image.debian_slim( # start from basic Linux with Python + python_version="3.10" + ) + .apt_install("git", "gcc", "libportaudio2", "ffmpeg") + .pip_install("fastapi[standard]==0.115.4") + .pip_install("comfy_cli==0.0.0",index_url="https://packages-1747622887395:0ee15474ccd7b27b57ca63a9306327678e6c2631@g-ldyi2063-pypi.pkg.coding.net/dev/packages/simple") + .run_commands( + "comfy --skip-prompt install --fast-deps --nvidia --version 0.3.40" + ) + .pip_install_from_pyproject("../pyproject_comfyui.toml") + .run_commands("comfy node install https://e.coding.net/g-ldyi2063/dev/ComfyUI-CustomNode.git") +) +comfyui_app = modal.App(image=image) +custom_secret = modal.Secret.from_name("comfyui-custom-secret", environment_name="dev") + + +@comfyui_app.function( + max_containers=1, # limit interactive session to 1 container + secrets=[custom_secret], + region="ap" +) +@modal.concurrent( + max_inputs=10 +) # required for UI startup process which runs several API calls concurrently +@modal.web_server(8000, startup_timeout=60) +def ui(): + subprocess.Popen("comfy launch -- --cpu --listen 0.0.0.0 --port 8000", shell=True) \ No newline at end of file diff --git a/src/cluster/web.py b/src/cluster/web.py index 2276955..d353013 100644 --- a/src/cluster/web.py +++ b/src/cluster/web.py @@ -28,7 +28,7 @@ with fastapi_image.imports(): config = WorkerConfig() - @app.function(scaledown_window=60, + @app.function(scaledown_window=600, secrets=[ modal.Secret.from_name("cf-kv-secret"), ],