FIX 增加jm_router

This commit is contained in:
kyj@bowong.ai 2025-06-27 19:30:33 +08:00
parent ef687a0487
commit 6740c4a03f
6 changed files with 364 additions and 7 deletions

View File

@ -19,10 +19,6 @@ dependencies = [
"av",
"imageio",
"loguru",
"openai-whisper",
"sentry-sdk",
"pydantic",
"pydantic_settings",
"conformer==0.3.2",
"einops>0.6.1"
"pydantic_settings"
]

View File

@ -6,7 +6,7 @@ from sentry_sdk.integrations.fastapi import FastApiIntegration
from fastapi.middleware.cors import CORSMiddleware
from .utils.KVCache import MediaSourceKVCache
from .router import ffmpeg, cache, comfyui, google, task
from .router import ffmpeg, cache, comfyui, google, task, jm_router
from .models.settings.cluster import WorkerConfig
config = WorkerConfig()
@ -88,3 +88,4 @@ web_app.include_router(cache.router)
web_app.include_router(google.router)
web_app.include_router(task.router)
web_app.include_router(jm_router.router)

View File

@ -0,0 +1,325 @@
# -*- coding:utf-8 -*-
"""
File jm_api_local.py
Author silence
Date 2025/6/27 10:15
"""
import random
import uvicorn
import json
from fastapi import APIRouter, HTTPException, FastAPI
import requests
import time
from loguru import logger
from typing import List
from pydantic import BaseModel
import asyncio
router = APIRouter(prefix='/jm_router', tags=['Jm_router'])
api_key = '21575c22-14aa-40ca-8aa8-f00ca27a3a17'
jm_default_prompts = [
'女人扭动身体向摄像机展示身材,一只手撩了一下头发,镜头从左向右移动并放大画面',
'女人扭动身体向摄像机展示身材,一只手撩了一下头发后放在了裤子上,镜头从左向右移动',
'时尚模特抬头自信的展示身材,扭动身体,一只手放在了头上,镜头逐渐放大聚焦在了下衣上',
'自信步伐跟拍模特,模特步伐自信地同时行走,镜头紧紧跟随。抬起手捋一捋头发。传递出自信与时尚的气息。',
'女生两只手捏着拳头轻盈的左右摇摆跳舞动作幅度不大然后把手摊开放在胸口再做出像popping心脏跳动的动作左右身体都要非常协调',
'一个年轻女子自信地在相机前展示了她优美的身材,以自然的流体动作自由地摇摆,左手撩了一下头发之后停在了胸前。一个美女自拍跳舞扭动身体的视频,手从下到上最后放在胸前,妩媚的表情',
'美女向后退了一步站在那里展示服装,双手轻轻提了一下裤子两侧,镜头从上到下逐渐放大',
'女人低头看向裤子向镜头展示身材一只手放在了头上做pose动作',
'美女向后退了一步站在那里展示服装,低头并用手抚摸裤子,镜头从上到下逐渐放大',
'美女向后退了一步站在那里展示服装,双手从上到下整理衣服,自然扭动身体,自信的表情'
]
logger.bind(name='jm_api')
async def submit_task(prompt: str, img_url: str, duration: str = '5'):
try:
headers = {
'Content-Type': 'application/json',
'Authorization': f'Bearer {api_key}',
}
json_data = {
'model': 'doubao-seedance-1-0-lite-i2v-250428',
'content': [
{
'type': 'text',
'text': f'{prompt} --resolution 720p --dur {duration} --camerafixed false',
},
{
'type': 'image_url',
'image_url': {
'url': img_url,
},
},
],
}
response = requests.post('https://ark.cn-beijing.volces.com/api/v3/contents/generations/tasks', headers=headers,
json=json_data)
logger.info(f'submit task: {json.dumps(response.json())}')
resp_json = response.json()
job_id = resp_json['id']
return {"data": job_id, 'status': True}
except Exception as e:
logger.error(e)
return {"data": None, "status": False}
async def sync_query_status(job_id: str):
def query_status(t_id: str):
resp_dict = {'status': False, 'data': None, 'msg': ''}
try:
headers = {
'Content-Type': 'application/json',
'Authorization': f'Bearer {api_key}',
}
response = requests.get(f'https://ark.cn-beijing.volces.com/api/v3/contents/generations/tasks/{t_id}',
headers=headers)
resp_json = response.json()
resp_dict['status'] = resp_json['status'] == 'succeeded'
resp_dict['msg'] = resp_json['status']
resp_dict['data'] = resp_json['content']['video_url'] if 'content' in resp_json else None
except Exception as e:
raise ValueError(str(e))
finally:
return resp_dict
timeout = 180
interval = 2
end = time.time() + timeout
final_result = {"status": False, "data": None, "msg": ""}
success = False
while time.time() < end:
tmp_data = query_status(job_id)
if tmp_data['status']:
final_result['status'] = True
final_result['data'] = tmp_data['data']
success = True
break
elif tmp_data['msg'] == 'running':
time.sleep(interval)
print(f'wait next interval:{interval}')
if not success:
final_result['msg'] = '运行超时'
return final_result
async def async_query_status(job_id: str):
resp_dict = {'status': False, 'data': None, 'msg': ''}
try:
headers = {
'Content-Type': 'application/json',
'Authorization': f'Bearer {api_key}',
}
response = requests.get(f'https://ark.cn-beijing.volces.com/api/v3/contents/generations/tasks/{job_id}',
headers=headers)
resp_json = response.json()
print(resp_json)
resp_dict['status'] = resp_json['status'] == 'succeeded'
resp_dict['msg'] = resp_json['status']
resp_dict['data'] = resp_json['content']['video_url'] if 'content' in resp_json else None
except Exception as e:
logger.error(f'error:{str(e)}')
resp_dict['msg'] = str(e)
finally:
return resp_dict
class VideoTaskRequest(BaseModel):
prompt: str
img_url: str
task_id: str
duration: str = '5'
class VideoTaskStatus(BaseModel):
job_ids: List[str]
class BatchVideoTaskRequest(BaseModel):
tasks: List[VideoTaskRequest]
@router.post('/jm/submit/task', summary='异步批量提交任务')
async def submit_video_task(
request: BatchVideoTaskRequest,
):
# 创建异步任务列表
async def process_single_task(task: VideoTaskRequest):
if not task.prompt:
task.prompt = random.choice(jm_default_prompts)
result = await submit_task(task.prompt, task.img_url, task.duration)
return {
"status": result.get('status', False),
"job_id": result.get('data', ''),
"img_url": task.img_url,
"task_id": task.task_id
}
# 并发执行所有任务
results = await asyncio.gather(
*[process_single_task(task) for task in request.tasks]
)
# 分离成功和失败的任务
successful_tasks = [
{"task_id": r["task_id"], "img_url": r["img_url"], "job_id": r["job_id"]}
for r in results if r["status"]
]
failed_tasks = [
{"img_url": r["img_url"]}
for r in results if not r["status"]
]
return {
"status": len(successful_tasks) > 0, # 如果有任何成功任务就返回True
"data": {
"success": successful_tasks,
"failed": failed_tasks
},
"msg": f"成功提交 {len(successful_tasks)} 个任务,失败 {len(failed_tasks)} 个任务"
}
@router.get('/jm/query/status', summary='查询视频生成任务状态')
async def query_video_task(
job_id: str,
):
if not job_id:
raise HTTPException(status_code=400, detail="job_id is required")
# 查询任务状态
result = await async_query_status(job_id)
return result
@router.post('/jm/batch/query/status', summary='批量查询视频生成任务状态')
async def batch_query_video_status(data: VideoTaskStatus):
"""
批量查询视频生成任务状态
Args:
data: VideoTaskStatus对象包含job_ids列表
Returns:
dict: 按状态分类的任务信息
{
'data': {
'finished': [{'job_id': 'xxx', 'video_url': 'xxx'}],
'failed': ['失败的job_id'],
'running': ['处理中的job_id']
},
'status': True,
'msg': ''
}
"""
if not data.job_ids:
return {
'data': {
'finished': [],
'failed': [],
'running': []
},
'status': True,
'msg': '任务列表为空'
}
logger.info(f"开始批量查询 {len(data.job_ids)} 个视频任务状态")
# 创建异步任务列表
async def query_single_task(job_id: str):
try:
result = await async_query_status(job_id)
return {
'job_id': job_id,
'status': result.get('status', False),
'msg': result.get('msg', ''),
'data': result.get('data', None)
}
except Exception as e:
logger.error(f"查询任务 {job_id} 失败: {str(e)}")
return {
'job_id': job_id,
'status': False,
'msg': f'查询异常: {str(e)}',
'data': None
}
try:
# 并发查询所有任务
results = await asyncio.gather(
*[query_single_task(job_id) for job_id in data.job_ids],
return_exceptions=True
)
# 分类处理结果
finished_tasks = []
failed_tasks = []
running_tasks = []
for i, result in enumerate(results):
job_id = data.job_ids[i]
if isinstance(result, Exception):
logger.error(f"任务 {job_id} 查询异常: {str(result)}")
failed_tasks.append(job_id)
continue
# 根据状态分类
if result['status'] and result['data']:
# 任务完成且有视频URL
finished_tasks.append({
'job_id': job_id,
'video_url': result['data']
})
logger.debug(f"任务 {job_id} 已完成: {result['data']}")
elif result['msg'] in ['running', 'pending', 'processing']:
# 任务处理中
running_tasks.append(job_id)
logger.debug(f"任务 {job_id} 处理中: {result['msg']}")
else:
# 任务失败或其他异常状态
failed_tasks.append(job_id)
logger.debug(f"任务 {job_id} 失败: {result['msg']}")
# 构造返回结果
finished_count = len(finished_tasks)
failed_count = len(failed_tasks)
running_count = len(running_tasks)
return {
'data': {
'finished': finished_tasks,
'failed': failed_tasks,
'running': running_tasks
},
'status': True,
'msg': f'批量查询完成 - 已完成:{finished_count}, 失败:{failed_count}, 处理中:{running_count}'
}
except Exception as e:
logger.error(f"批量查询视频任务状态异常: {str(e)}")
return {
'data': {
'finished': [],
'failed': data.job_ids.copy(), # 异常时将所有任务都标记为失败
'running': []
},
'status': False,
'msg': f'批量查询异常: {str(e)}'
}
if __name__ == '__main__':
app = FastAPI()
url = 'http://127.0.0.1:9010/docs'
uvicorn.run(app, host='127.0.0.1', port=9010)

View File

@ -3,6 +3,7 @@ from BowongModalFunctions.models.settings.cluster import WorkerConfig
from .video import app as media_app
from .web import app as web_app
from .ffmpeg_app import app as ffmpeg_app
from .comfyui_api import comfyui_app as comfyui_api_app
# from .comfyui_v1 import app as comfyui_v1_app
# from .comfyui_v2 import app as comfyui_v2_app
@ -15,5 +16,6 @@ app = modal.App(config.modal_app_name,
app.include(media_app)
app.include(ffmpeg_app)
app.include(web_app)
app.include(comfyui_api_app)
# app.include(comfyui_v1_app)
# app.include(comfyui_v2_app)

View File

@ -0,0 +1,33 @@
import subprocess
import modal
from cluster.comfyui_v1 import comfyui_image
image = ( # build up a Modal Image to run ComfyUI, step by step
modal.Image.debian_slim( # start from basic Linux with Python
python_version="3.10"
)
.apt_install("git", "gcc", "libportaudio2", "ffmpeg")
.pip_install("fastapi[standard]==0.115.4")
.pip_install("comfy_cli==0.0.0",index_url="https://packages-1747622887395:0ee15474ccd7b27b57ca63a9306327678e6c2631@g-ldyi2063-pypi.pkg.coding.net/dev/packages/simple")
.run_commands(
"comfy --skip-prompt install --fast-deps --nvidia --version 0.3.40"
)
.pip_install_from_pyproject("../pyproject_comfyui.toml")
.run_commands("comfy node install https://e.coding.net/g-ldyi2063/dev/ComfyUI-CustomNode.git")
)
comfyui_app = modal.App(image=image)
custom_secret = modal.Secret.from_name("comfyui-custom-secret", environment_name="dev")
@comfyui_app.function(
max_containers=1, # limit interactive session to 1 container
secrets=[custom_secret],
region="ap"
)
@modal.concurrent(
max_inputs=10
) # required for UI startup process which runs several API calls concurrently
@modal.web_server(8000, startup_timeout=60)
def ui():
subprocess.Popen("comfy launch -- --cpu --listen 0.0.0.0 --port 8000", shell=True)

View File

@ -28,7 +28,7 @@ with fastapi_image.imports():
config = WorkerConfig()
@app.function(scaledown_window=60,
@app.function(scaledown_window=600,
secrets=[
modal.Secret.from_name("cf-kv-secret"),
],