4.2 KiB
4.2 KiB
Multi-HeyGem 视频处理服务
Multi-HeyGem 是一个分布式视频处理服务,支持多机器并行处理视频和音频合成任务。该服务提供了 RESTful API 接口,可以方便地集成到其他系统中。
功能特点
- 支持多机器分布式处理
- 异步任务处理
- RESTful API 接口
- 任务状态追踪
- 支持回调通知
- 详细的日志记录
- 自动负载均衡
系统要求
配置机器信息:
编辑 config/machines.json 文件,添加您的处理机器信息:
{
"machines": [
{
"name": "机器名称",
"host": "机器IP地址",
"port": 8000,
"max_concurrent": 1,
"enabled": true
}
]
}
启动服务
python api.py
服务将在 http://0.0.0.0:8101 启动。
API 接口
通过 http://localhost:8101/docs 可查看 api 接口
创建任务
创建任务接口需要上传文件,以下是使用示例:
同步生成示例(直接返回结果)
import requests
import json
# API 服务地址
API_URL = "http://localhost:8101"
# 准备文件
files = {
'audio': ('audio.mp3', open('./samples/input/audio.mp3', 'rb')),
'video': ('video.mp4', open('./samples/input/video.mp4', 'rb'))
}
# 准备请求数据
data = {
'description': '测试任务'
}
# 发送同步生成请求
response = requests.post(
f"{API_URL}/tasks/sync",
files=files,
data=data
)
# 处理响应
if response.status_code == 200:
# 直接保存返回的视频流
output_path = './output/result.mp4'
with open(output_path, 'wb') as f:
f.write(response.content)
print(f"视频生成成功!已保存到: {output_path}")
else:
print(f"视频生成失败: {response.text}")
异步调用示例
import aiohttp
import aiofiles
import asyncio
async def create_task():
API_URL = "http://localhost:8101"
# 准备文件
audio_file_path = './samples/input/audio.mp3'
video_file_path = './samples/input/video.mp4'
# 创建 FormData
data = aiohttp.FormData()
# 添加音频文件
async with aiofiles.open(audio_file_path, 'rb') as f:
audio_content = await f.read()
data.add_field(
'audio_file',
audio_content,
filename='audio.mp3',
content_type='audio/mpeg'
)
# 添加视频文件
async with aiofiles.open(video_file_path, 'rb') as f:
video_content = await f.read()
data.add_field(
'video_file',
video_content,
filename='video.mp4',
content_type='video/mp4'
)
# 添加其他参数
data.add_field('description', '测试任务')
data.add_field('callback_url', 'http://your-callback-url.com/webhook') # 可选的回调地址
# 发送请求
async with aiohttp.ClientSession() as session:
async with session.post(f"{API_URL}/tasks", data=data) as response:
if response.status == 200:
task = await response.json()
print(f"任务创建成功!任务ID: {task['task_id']}")
print(f"任务状态: {task['status']}")
# 查询任务状态
task_id = task['task_id']
async with session.get(f"{API_URL}/tasks/{task_id}") as status_response:
task_status = await status_response.json()
print(f"当前任务状态: {task_status['status']}")
else:
error_text = await response.text()
print(f"任务创建失败: {error_text}")
# 运行异步函数
asyncio.run(create_task())
任务状态说明
pending: 任务等待处理processing: 任务正在处理中completed: 任务处理完成failed: 任务处理失败
回调通知
当任务状态发生变化时,如果设置了 callback_url,系统会向该地址发送 POST 请求,请求体包含:
{
"task_id": "任务ID",
"status": "任务状态",
"completed_at": "完成时间",
"error_message": "错误信息(如果有)"
}