import asyncio
import asyncio.exceptions
import json
import os
import random
import subprocess
import time
import traceback
from dataclasses import dataclass
from datetime import datetime
from typing import Dict, List, Optional

import aiofiles
import aiohttp
from loguru import logger
from models import TaskStatus  # TaskStatus is defined in models.py
from moviepy.editor import VideoFileClip
from pydub import AudioSegment

# Configure logging output
# Make sure the logs directory exists
os.makedirs("logs", exist_ok=True)

# Add an error log sink (ERROR level only)
logger.add(
    "logs/error.log",
    rotation="500 MB",
    retention="10 days",
    level="ERROR",
    format="{time:YYYY-MM-DD HH:mm:ss} | {level} | {message}",
)


@dataclass
class Machine:
    name: str
    host: str
    port: int
    max_concurrent: int
    enabled: bool
    current_tasks: int = 0
    lock: Optional[asyncio.Lock] = None


class ProcessManager:
    def __init__(self, config_path: str, log_dir: str = "logs", task_manager=None):
        self.machines: Dict[str, Machine] = {}
        self.config_path = config_path
        self.log_dir = log_dir
        self.request_log_file = None
        self.task_manager = task_manager  # reference to the task manager
        self._load_config()
        # Per-file and per-output-directory locks
        self.file_locks: Dict[str, asyncio.Lock] = {}
        self.output_dir_locks: Dict[str, asyncio.Lock] = {}
        # Task queue
        self.task_queue: asyncio.Queue = asyncio.Queue()
        # Periodic status-report task
        self.status_task: Optional[asyncio.Task] = None
        # Running flag
        self.is_running = False
        # Worker tasks that consume the queue
        self.process_tasks: List[asyncio.Task] = []
        # Initialize the request log file
        self._init_log_file()
        # Per-machine usage statistics
        self.machine_stats: Dict[str, Dict] = {}
        for machine_name in self.machines:
            self.machine_stats[machine_name] = {
                "total_tasks": 0,
                "current_tasks": 0,
                "last_used": None,
                "average_processing_time": 0,
                "success_rate": 1.0,
                "total_processing_time": 0,
            }

    def _init_log_file(self):
        """Initialize the request log file."""
        # Make sure the log directory exists
        os.makedirs(self.log_dir, exist_ok=True)

        # Create a new log file
        timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
        self.request_log_file = os.path.join(self.log_dir, f"video_generation_requests_{timestamp}.md")

        # Write the table header
        with open(self.request_log_file, "w", encoding="utf-8") as f:
            f.write(
                f"""# Video generation request log - {timestamp}

| Start time | End time | Duration (s) | Machine | Video size (MB) | Audio size (MB) | Video duration (s) | Audio duration (s) | Video name | Audio name | Output path | Status | Error |
|----------|----------|----------|------|-------------|-------------|-------------|-------------|----------|----------|----------|------|----------|
"""
            )

    def write_request_log(
        self,
        start_time: float,
        end_time: float,
        machine_name: str,
        video_size: float,
        audio_size: float,
        video_duration: float,
        audio_duration: float,
        video_name: str,
        audio_name: str,
        output_path: str,
        status: bool,
        error_msg: str = "",
    ):
        """Append one row describing a video generation request."""
        if not self.request_log_file:
            logger.error("Request log file is not initialized")
            return

        duration = end_time - start_time

        # Append the new record
        with open(self.request_log_file, "a", encoding="utf-8") as f:
            f.write(
                f"| {datetime.fromtimestamp(start_time).strftime('%Y-%m-%d %H:%M:%S')} | "
                f"{datetime.fromtimestamp(end_time).strftime('%Y-%m-%d %H:%M:%S')} | "
                f"{duration:.2f} | {machine_name} | {video_size:.2f} | {audio_size:.2f} | "
                f"{video_duration:.2f} | {audio_duration:.2f} | {video_name} | {audio_name} | "
                f"{output_path} | {'Success' if status else 'Failed'} | {error_msg} |\n"
            )

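    # Example of the expected machines config file: a sketch inferred from the
    # keys read in _load_config below. The names, hosts and ports are
    # placeholders, and the real config/machines.json may contain extra fields.
    #
    # {
    #     "machines": [
    #         {
    #             "name": "gpu-01",
    #             "host": "192.168.1.10",
    #             "port": 8000,
    #             "max_concurrent": 2,
    #             "enabled": true
    #         }
    #     ]
    # }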
    def _load_config(self):
        """Load the machine configuration."""
        config_path = os.path.abspath(self.config_path)
        with open(config_path, "r", encoding="utf-8") as f:
            config = json.load(f)
            for machine_config in config["machines"]:
                if machine_config["enabled"]:
                    machine = Machine(
                        name=machine_config["name"],
                        host=machine_config["host"],
                        port=machine_config["port"],
                        max_concurrent=machine_config["max_concurrent"],
                        enabled=machine_config["enabled"],
                        lock=asyncio.Lock(),
                    )
                    self.machines[machine.name] = machine

    async def get_available_machine(self) -> Optional[Machine]:
        """Return a machine with spare capacity, or None if all are busy."""
        for machine in self.machines.values():
            async with machine.lock:  # type: ignore
                if machine.current_tasks < machine.max_concurrent:
                    machine.current_tasks += 1
                    return machine
        return None

    async def release_machine(self, machine: Machine):
        """Release one slot on the machine."""
        async with machine.lock:  # type: ignore
            machine.current_tasks -= 1

    def _get_file_lock(self, file_path: str) -> asyncio.Lock:
        """Get (or lazily create) the lock for a file path."""
        if file_path not in self.file_locks:
            self.file_locks[file_path] = asyncio.Lock()
        return self.file_locks[file_path]

    def _get_output_dir_lock(self, output_dir: str) -> asyncio.Lock:
        """Get (or lazily create) the lock for an output directory."""
        if output_dir not in self.output_dir_locks:
            self.output_dir_locks[output_dir] = asyncio.Lock()
        return self.output_dir_locks[output_dir]

    async def _safe_copy_file(self, src: str, dst: str, timeout: float = 300) -> bool:
        """Copy a file safely, guarded by a per-file lock and a timeout."""
        try:
            # Get the lock for the source file
            file_lock = self._get_file_lock(src)

            # Try to acquire the lock with a timeout
            try:
                async with asyncio.timeout(timeout):
                    await file_lock.acquire()
            except asyncio.exceptions.TimeoutError:
                logger.warning(f"Timed out waiting for file lock: {src}")
                return False
            except Exception as e:
                logger.error(f"Failed to acquire file lock: {src}, error: {str(e)}")
                return False

            try:
                # Make sure the source file exists
                if not os.path.exists(src):
                    logger.error(f"Source file does not exist: {src}")
                    return False

                # Make sure the destination directory exists
                os.makedirs(os.path.dirname(dst), exist_ok=True)

                # Copy the file
                async with aiofiles.open(src, "rb") as fsrc:
                    content = await fsrc.read()
                async with aiofiles.open(dst, "wb") as fdst:
                    await fdst.write(content)
                return True
            finally:
                file_lock.release()
        except Exception as e:
            logger.error(f"Failed to copy file {src} -> {dst}: {str(e)}")
            return False

    async def start(self):
        """Start the process manager."""
        self.is_running = True
        # Start the status-report task
        self.status_task = asyncio.create_task(self._status_report())
        # Start the task-processing workers: one worker per available slot,
        # i.e. the sum of max_concurrent across all machines
        total_max_concurrent = sum(machine.max_concurrent for machine in self.machines.values())
        self.process_tasks = [asyncio.create_task(self._process_tasks()) for _ in range(total_max_concurrent)]

    async def stop(self):
        """Stop the process manager."""
        self.is_running = False
        if self.status_task:
            self.status_task.cancel()
            try:
                await self.status_task
            except asyncio.CancelledError:
                pass

        # Cancel all worker tasks
        for task in self.process_tasks:
            task.cancel()
            try:
                await task
            except asyncio.CancelledError:
                pass

    async def _status_report(self):
        """Periodically log a status report."""
        while self.is_running:
            report: List[str] = []

            # Report the number of tasks waiting in the queue
            report.append(f"\nStatus: tasks waiting in queue: {self.task_queue.qsize()}")
            # Update the per-machine statistics
            for machine_name, machine in self.machines.items():
                stats = self.machine_stats[machine_name]
                stats["current_tasks"] = machine.current_tasks

                # Compute the utilization
                usage_rate = (machine.current_tasks / machine.max_concurrent) * 100

                report.append(f" - {machine_name}: current load: {machine.current_tasks}/{machine.max_concurrent} ({usage_rate:.1f}%)")

            logger.info("\n".join(report))
            await asyncio.sleep(5)  # print the status every 5 seconds

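    # Each queue item is a tuple
    #     (task_id, audio_file_path, video_file_path, output_file_path)
    # as enqueued by process_video() below and consumed by the worker loop
    # in _process_tasks().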
    async def _process_tasks(self):
        """Worker loop: process tasks from the queue."""
        while self.is_running:
            task_id = None  # so the exception handler below can reference it safely
            try:
                # Get a task from the queue
                task = await self.task_queue.get()
                logger.info(f"Got task: {task}")
                task_id, audio_file_path, video_file_path, output_file_path = task

                # Wait for an available machine
                machine = await self._wait_for_available_machine()
                if not machine:
                    logger.error("No machine available, re-queuing the task")
                    # Put the task back into the queue
                    await self.task_queue.put(task)
                    # Mark the task as failed
                    if self.task_manager and task_id:
                        await self.task_manager.update_task_status(task_id, TaskStatus.FAILED, error_message="No machine available")
                    self.task_queue.task_done()
                    await asyncio.sleep(5)  # wait 5 seconds before retrying
                    continue

                # Process the task
                try:
                    success = await self._process_single_task(task_id, machine, audio_file_path, video_file_path, output_file_path)

                    if not success:
                        logger.error(f"Task failed: \naudio file: {audio_file_path}\nvideo file: {video_file_path}\noutput file: {output_file_path}")
                        # Mark the task as failed
                        if self.task_manager and task_id:
                            await self.task_manager.update_task_status(task_id, TaskStatus.FAILED, error_message="Task processing failed")
                finally:
                    self.task_queue.task_done()

            except asyncio.CancelledError:
                break
            except Exception as e:
                logger.error(f"Error while processing task: {str(e)} {traceback.format_exc()}")
                # Mark the task as failed
                if self.task_manager and task_id:
                    await self.task_manager.update_task_status(task_id, TaskStatus.FAILED, error_message=str(e))
                self.task_queue.task_done()

    async def _wait_for_available_machine(self, timeout: float = 3600) -> Optional[Machine]:
        """Wait for an available machine, considering only current load."""
        start_time = asyncio.get_event_loop().time()
        while asyncio.get_event_loop().time() - start_time < timeout:
            # Collect all machines that are not yet at full capacity
            available_machines = []
            for machine in self.machines.values():
                async with machine.lock:  # type: ignore
                    if machine.current_tasks < machine.max_concurrent:
                        available_machines.append(machine)

            if available_machines:
                # Pick the machine with the lowest relative load
                selected_machine = min(available_machines, key=lambda m: m.current_tasks / m.max_concurrent)

                # Re-check availability under the machine's lock before claiming a slot
                async with selected_machine.lock:  # type: ignore
                    if selected_machine.current_tasks < selected_machine.max_concurrent:
                        selected_machine.current_tasks += 1
                        logger.info(
                            f"Selected machine {selected_machine.name} for the task, current load: {selected_machine.current_tasks}/{selected_machine.max_concurrent}"
                        )
                        return selected_machine

            # All machines are full; wait and retry
            logger.warning("All machines are at full capacity, waiting for resources to be released")
            await asyncio.sleep(5)

        # Timed out without finding a free machine
        return None

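    # Contract with the remote processing service, as used by
    # _process_single_task() below: the audio and video are POSTed as
    # multipart form fields named "audio" and "video" to
    # http://<host>:<port>/process. On HTTP 200 the service either returns
    # the generated video directly (Content-Type: video/mp4) or a JSON body
    # such as {"status": false, "msg": "..."} describing a failure.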
    async def _process_single_task(self, task_id: str, machine: Machine, audio_file_path: str, video_file_path: str, output_file_path: str) -> bool:
        """Process a single task."""
        start_time = time.time()
        success = False

        try:
            logger.info(f"Start processing video {output_file_path}")
            logger.info(f"Task ID: {task_id}")
            logger.info(f"Audio file path: {audio_file_path} exists: {os.path.exists(audio_file_path)}")
            logger.info(f"Video file path: {video_file_path} exists: {os.path.exists(video_file_path)}")
            logger.info(f"Using machine: {machine.name}")

            url = f"http://{machine.host}:{machine.port}/process"
            async with aiohttp.ClientSession() as session:
                # Determine the audio and video durations
                try:
                    # Audio duration
                    audio = AudioSegment.from_mp3(audio_file_path)
                    audio_duration = len(audio) / 1000.0  # convert to seconds

                    # Video duration
                    video = VideoFileClip(video_file_path)
                    video_duration = video.duration
                    video.close()

                    logger.info(f"Audio duration: {audio_duration:.2f}s")
                    logger.info(f"Video duration: {video_duration:.2f}s")
                except Exception as e:
                    # Without the durations the length adjustment below cannot run,
                    # so fail here instead of hitting a NameError later.
                    logger.warning(f"Failed to get audio or video duration: {str(e)}")
                    return False

                # The video sent to the server is always adjusted to audio duration + 1s
                target_duration = audio_duration + 1

                # If the video is shorter than the audio + 1s, loop it with ffmpeg
                # until it reaches the target duration
                if video_duration < target_duration:
                    logger.info("Video is shorter than audio + 1s, looping it up to audio duration + 1s")
                    # Number of times the source video has to play in total
                    loop_count = int(target_duration / video_duration) + 1

                    # Extend the video by looping it with ffmpeg
                    command = [
                        "ffmpeg",
                        "-stream_loop",
                        str(loop_count - 1),  # additional loops; the original playback counts as one
                        "-i",
                        video_file_path,
                        "-c",
                        "copy",
                        "-t",
                        str(target_duration),  # cut at the target duration
                        "-f",
                        "mp4",  # set the output format explicitly
                        "-y",
                        video_file_path + ".tmp",  # write to a temp file to avoid overwriting the input
                    ]

                    try:
                        # Use subprocess.run instead of asyncio.create_subprocess_exec
                        logger.info(f"Running FFmpeg command: {' '.join(command)}")
                        result = subprocess.run(command, capture_output=True, text=True, check=False)

                        if result.returncode != 0:
                            error_msg = result.stderr if result.stderr else "unknown error"
                            logger.error(f"Failed to extend video, FFmpeg return code: {result.returncode}, error: {error_msg}")
                            return False

                        logger.info(f"Extended video to {target_duration:.2f}s")
                        # Replace the original file
                        os.replace(video_file_path + ".tmp", video_file_path)
                    except Exception as e:
                        error_trace = traceback.format_exc()
                        logger.error(f"Error while extending video:\ncommand: {' '.join(command)}\nerror: {str(e)}\nfull traceback:\n{error_trace}")
                        return False

                # If the video is longer than the audio + 1s, trim a random clip
                # of the target duration out of it
                elif video_duration > target_duration:
                    logger.info("Video is longer than audio + 1s, trimming a random clip of audio duration + 1s")
                    # Pick a random start offset that leaves enough remaining video
                    max_start_time = video_duration - target_duration
                    clip_start = random.uniform(0, max_start_time)

                    # Trim the video with ffmpeg
                    command = [
                        "ffmpeg",
                        "-ss",
                        str(clip_start),
                        "-i",
                        video_file_path,
                        "-t",
                        str(target_duration),
                        "-c",
                        "copy",
                        "-f",
                        "mp4",  # set the output format explicitly
                        "-y",
                        video_file_path + ".tmp",
                    ]

                    logger.info(f"Requested audio/video duration: {target_duration:.2f}s")

                    try:
                        # Use subprocess.run instead of asyncio.create_subprocess_exec
                        logger.info(f"Running FFmpeg command: {' '.join(command)}")
                        result = subprocess.run(command, capture_output=True, text=True, check=False)
                        if result.returncode != 0:
                            error_msg = result.stderr if result.stderr else "unknown error"
                            logger.error(f"Failed to trim video, FFmpeg return code: {result.returncode}, error: {error_msg}")
                            return False

                        logger.info(f"Trimmed video to {target_duration:.2f}s")
                        # Replace the original file
                        os.replace(video_file_path + ".tmp", video_file_path)
                        logger.info("Replaced the original file")
                    except Exception as e:
                        error_trace = traceback.format_exc()
                        logger.error(f"Error while trimming video:\ncommand: {' '.join(command)}\nerror: {str(e)}\nfull traceback:\n{error_trace}")
                        return False

                # Build the multipart form data after the length adjustment,
                # so the adjusted video file is what gets uploaded
                data = aiohttp.FormData()

                async with aiofiles.open(audio_file_path, "rb") as f:
                    audio_content = await f.read()
                data.add_field("audio", audio_content, filename=os.path.basename(audio_file_path), content_type="audio/mpeg")

                async with aiofiles.open(video_file_path, "rb") as f:
                    video_content = await f.read()
                data.add_field("video", video_content, filename=os.path.basename(video_file_path), content_type="video/mp4")

                # Log the file sizes
                audio_size = len(audio_content) / (1024 * 1024)  # convert to MB
                video_size = len(video_content) / (1024 * 1024)  # convert to MB
                logger.info(f"Audio file size: {audio_size:.2f}MB")
                logger.info(f"Video file size: {video_size:.2f}MB")

                # Send the request
                logger.info(f"Starting video generation, video name: {os.path.basename(video_file_path)}, audio name: {os.path.basename(audio_file_path)}")
                start_time = time.time()

                # 30-minute timeout
                timeout = aiohttp.ClientTimeout(total=1800)  # 30 minutes = 1800 seconds
                async with session.post(url, data=data, timeout=timeout) as response:
                    if response.status == 200:
                        # Check the Content-Type of the response
                        content_type = response.headers.get("Content-Type", "").lower()

                        # JSON response
                        if "application/json" in content_type:
                            try:
                                response_json = await response.json()
                                if not response_json.get("status", True):  # the server explicitly reported status == false
                                    error_msg = response_json.get("msg", "unknown error")
                                    logger.error(f"Server-side processing failed: {error_msg}")
                                    self.write_request_log(
                                        start_time,
                                        time.time(),
                                        machine.name,
                                        video_size,
                                        audio_size,
                                        target_duration,
                                        audio_duration,
                                        os.path.basename(video_file_path),
                                        os.path.basename(audio_file_path),
                                        output_file_path,
                                        False,
                                        error_msg,
                                    )
                                    return False
                            except json.JSONDecodeError:
                                logger.error("Failed to parse JSON response")
                                self.write_request_log(
                                    start_time,
                                    time.time(),
                                    machine.name,
                                    video_size,
                                    audio_size,
                                    target_duration,
                                    audio_duration,
                                    os.path.basename(video_file_path),
                                    os.path.basename(audio_file_path),
                                    output_file_path,
                                    False,
                                    "Failed to parse JSON response",
                                )
                                return False
                        # Video file response
                        elif "video/mp4" in content_type:
                            # Make sure the output directory exists
                            output_dir = os.path.dirname(output_file_path)
                            output_lock = self._get_output_dir_lock(output_dir)

                            # Try to acquire the output-directory lock with a timeout
                            try:
                                async with asyncio.timeout(300):  # 5-minute timeout
                                    await output_lock.acquire()
                            except asyncio.exceptions.TimeoutError:
                                logger.error(f"Timed out waiting for output directory lock: {output_dir}")
                                self.write_request_log(
                                    start_time,
                                    time.time(),
                                    machine.name,
                                    video_size,
                                    audio_size,
                                    target_duration,
                                    audio_duration,
                                    os.path.basename(video_file_path),
                                    os.path.basename(audio_file_path),
                                    output_file_path,
                                    False,
                                    f"Timed out waiting for output directory lock: {output_dir}",
                                )
                                return False

                            try:
                                os.makedirs(output_dir, exist_ok=True)
                                # Save the response content, with retries
                                max_retries = 3
                                retry_delay = 5  # seconds
                                last_error = None

                                for attempt in range(max_retries):
                                    try:
                                        # Read the response body
                                        content = await response.read()
                                        if not content:
                                            raise ValueError("Response body is empty")

                                        # Save the file
                                        async with aiofiles.open(output_file_path, "wb") as f:
                                            await f.write(content)

                                        # Verify that the file was written successfully
                                        if not os.path.exists(output_file_path) or os.path.getsize(output_file_path) == 0:
                                            raise ValueError("Failed to save file or file size is 0")

                                        logger.info(f"Saved video file: {output_file_path}")

                                        # Mark the task as completed
                                        if self.task_manager and task_id:
                                            await self.task_manager.update_task_status(task_id, TaskStatus.COMPLETED)

                                        self.write_request_log(
                                            start_time,
                                            time.time(),
                                            machine.name,
                                            video_size,
                                            audio_size,
                                            target_duration,
                                            audio_duration,
                                            os.path.basename(video_file_path),
                                            os.path.basename(audio_file_path),
                                            output_file_path,
                                            True,
                                        )
                                        return True
                                    except Exception as e:
                                        last_error = e
                                        if attempt < max_retries - 1:
                                            logger.warning(f"Failed to save video file, retrying in {retry_delay}s ({attempt + 1}/{max_retries}): {str(e)}")
                                            await asyncio.sleep(retry_delay)
                                        else:
                                            logger.error(f"Failed to save video file, maximum retries reached: {str(last_error)}")
                                            # Mark the task as failed
                                            if self.task_manager and task_id:
                                                await self.task_manager.update_task_status(task_id, TaskStatus.FAILED, error_message=str(last_error))
                                            self.write_request_log(
                                                start_time,
                                                time.time(),
                                                machine.name,
                                                video_size,
                                                audio_size,
                                                target_duration,
                                                audio_duration,
                                                os.path.basename(video_file_path),
                                                os.path.basename(audio_file_path),
                                                output_file_path,
                                                False,
                                                f"Failed to save video file: {str(last_error)}",
                                            )
                                            return False
                            finally:
                                output_lock.release()
                        else:
                            logger.error(f"Unexpected response Content-Type: {content_type}")
                            self.write_request_log(
                                start_time,
                                time.time(),
                                machine.name,
                                video_size,
                                audio_size,
                                target_duration,
                                audio_duration,
                                os.path.basename(video_file_path),
                                os.path.basename(audio_file_path),
                                output_file_path,
                                False,
                                f"Unexpected response Content-Type: {content_type}",
                            )
                            return False
                    else:
                        error_text = await response.text()
                        try:
                            error_json = json.loads(error_text)
                            error_msg = error_json.get("msg", error_text)
                            logger.error(f"Video processing failed: HTTP {response.status} - {error_msg}")
                        except json.JSONDecodeError:
                            logger.error(f"Video processing failed: HTTP {response.status} - {error_text}")
                        self.write_request_log(
                            start_time,
                            time.time(),
                            machine.name,
                            video_size,
                            audio_size,
                            target_duration,
                            audio_duration,
                            os.path.basename(video_file_path),
                            os.path.basename(audio_file_path),
                            output_file_path,
                            False,
                            f"HTTP {response.status} - {error_text}",
                        )
                        return False

            return success

        except Exception as e:
            logger.error(f"Error while processing video: {str(e)}")
            return False
        finally:
            await self.release_machine(machine)

    async def process_video(self, task_id: str, audio_file_path: str, video_file_path: str, out_file_path: str) -> Optional[str]:
        """Add a video processing task to the queue."""
        try:
            # Enqueue the task
            await self.task_queue.put((task_id, audio_file_path, video_file_path, out_file_path))
            return out_file_path
        except Exception as e:
            logger.error(f"Failed to add task to queue: {str(e)}")
            return None

    def get_machine_status(self) -> List[Dict]:
        """Return the status of all machines."""
        return [
            {
                "name": machine.name,
                "host": machine.host,
                "current_tasks": machine.current_tasks,
                "max_concurrent": machine.max_concurrent,
                "enabled": machine.enabled,
            }
            for machine in self.machines.values()
        ]


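# A minimal usage sketch (hypothetical caller code, not part of this module):
# the manager is started once, tasks are enqueued via process_video(), and the
# worker coroutines created in start() pick them up. The task id, task manager
# and file paths below are placeholders.
#
#     manager = ProcessManager(config_path="config/machines.json", task_manager=my_task_manager)
#     await manager.start()
#     await manager.process_video(
#         task_id="task-123",
#         audio_file_path="input/voice.mp3",
#         video_file_path="input/clip.mp4",
#         out_file_path="output/result.mp4",
#     )
#     ...
#     await manager.stop()
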
async def main():
    """
    Command-line entry point.
    """
    import argparse

    parser = argparse.ArgumentParser(description="Video processing manager")
    args = parser.parse_args()

    logger.info("Starting the video processing manager...")

    # Initialize the process manager
    process_manager = ProcessManager(config_path="config/machines.json")

    await process_manager.start()

    try:
        # Keep the program running
        while True:
            await asyncio.sleep(1)
    except KeyboardInterrupt:
        logger.info("Received stop signal, shutting down...")
    finally:
        await process_manager.stop()


if __name__ == "__main__":
    exit(asyncio.run(main()))