25 KiB
import glob import mimetypes import os import threading import time import tkinter as tk from tkinter import ttk, filedialog, scrolledtext, messagebox
import requests from loguru import logger from qcloud_cos import CosConfig from qcloud_cos import CosS3Client
cos_bucket_name = 'sucai-1324682537' cos_secret_id = 'AKIDsrihIyjZOBsjimt8TsN8yvv1AMh5dB44' cos_secret_key = 'CPZcxdk6W39Jd4cGY95wvupoyMd0YFqW' cos_region = 'ap-shanghai'
api_key = '21575c22-14aa-40ca-8aa8-f00ca27a3a17'
default_prompt = [ "女人扭动身体向摄像机展示身材,一只手撩了一下头发,镜头从左向右移动并放大画面", # "女人扭动身体向摄像机展示身材,一只手撩了一下头发后放在了裤子上,镜头从左向右移动", "时尚模特抬头自信的展示身材,扭动身体,一只手放在了头上,镜头逐渐放大聚焦在了下衣上", "女人扭动身体向摄像机展示身材,一只手撩了一下头发后放在了裤子上,镜头从左向右移动", "自信步伐跟拍模特,模特步伐自信地同时行走,镜头紧紧跟随。抬起手捋一捋头发。传递出自信与时尚的气息。", "女生两只手捏着拳头轻盈的左右摇摆跳舞,动作幅度不大,然后把手摊开放在胸口再做出像popping心脏跳动的动作,左右身体都要非常协调", "一个年轻女子自信地在相机前展示了她优美的身材,以自然的流体动作自由地摇摆,左手撩了一下头发之后停在了胸前。一个美女自拍跳舞扭动身体的视频,手从下到上最后放在胸前,妩媚的表情", "美女向后退了一步站在那里展示服装,双手轻轻提了一下裤子两侧,镜头从上到下逐渐放大", "女人低头看向裤子,向镜头展示身材,一只手放在了头上做pose动作", "美女向后退了一步站在那里展示服装,低头并用手抚摸裤子,镜头从上到下逐渐放大", "美女向后退了一步站在那里展示服装,双手从上到下整理衣服,自然扭动身体,自信的表情" ]
default_prompt_str = '\n'.join(default_prompt)
pyinstaller -F -w --name seed_video jm_video_ui.py
class VideoUtils:
@staticmethod
def download_video(video_url: str, save_path: str) -> str:
try:
file_name = f'{int(time.time() * 1000)}.mp4'
response = requests.get(video_url, stream=True)
full_path = os.path.join(save_path, file_name)
with open(full_path, 'wb') as f:
for chunk in response.iter_content(chunk_size=1024 * 1024 * 5):
if chunk:
f.write(chunk)
return full_path
except Exception as e:
logger.error(e)
pass
@staticmethod
def upload_file_to_cos(file_path: str, remove_src_file: bool = False):
resp_data = {'status': True, 'data': '', 'msg': ''}
mime_type, _ = mimetypes.guess_type(file_path)
category = mime_type.split('/')[0]
f_name = os.path.basename(file_path)
suffix = f_name.split('.')[-1]
real_name = f'{int(time.time() * 1000)}.{suffix}'
try:
object_key = f'tk/{category}/{real_name}'
config = CosConfig(Region=cos_region, SecretId=cos_secret_id, SecretKey=cos_secret_key)
client = CosS3Client(config)
_ = client.upload_file(
Bucket=cos_bucket_name,
Key=object_key,
LocalFilePath=file_path,
EnableMD5=False,
progress_callback=None
)
url = f'https://{cos_bucket_name}.cos.ap-shanghai.myqcloud.com/{object_key}'
resp_data['data'] = url
resp_data['msg'] = '上传成功'
except Exception as e:
logger.error(e)
resp_data['status'] = False
resp_data['msg'] = str(e)
finally:
if remove_src_file:
os.remove(file_path)
return resp_data
@staticmethod
def submit_task(prompt: str, img_url: str, duration: str = '5', model_type:str='lite'):
"""
:param prompt: 生成视频的提示词
:param img_url:
:param duration:
:return:
"""
if model_type == 'lite':
model = 'doubao-seedance-1-0-lite-i2v-250428'
resolution = '720p'
else:
model = 'doubao-seedance-1-0-pro-250528'
resolution = '1080p'
if duration not in ('5', '10'):
logger.error('Duration must be either 5 or 10')
try:
headers = {
'Content-Type': 'application/json',
'Authorization': f'Bearer {api_key}',
}
json_data = {
'model': model,
'content': [
{
'type': 'text',
'text': f'{prompt} --resolution {resolution} --dur {duration} --camerafixed false',
},
{
'type': 'image_url',
'image_url': {
'url': img_url,
},
},
],
}
response = requests.post('https://ark.cn-beijing.volces.com/api/v3/contents/generations/tasks',
headers=headers,
json=json_data)
# 检查HTTP状态码
if response.status_code != 200:
error_msg = f"API请求失败,状态码: {response.status_code}, 响应: {response.text}"
logger.error(error_msg)
return {"status": False, 'msg': error_msg}
resp_json = response.json()
# 检查响应中是否包含id字段
if 'id' not in resp_json:
error_msg = f"API响应缺少id字段,响应内容: {resp_json}"
logger.error(error_msg)
return {"status": False, 'msg': error_msg}
job_id = resp_json['id']
return {"data": job_id, 'status': True}
except Exception as e:
logger.error(e)
return {"status": False, 'msg': str(e)}
@staticmethod
def query_task_result(job_id, timeout: int = 180, interval: int = 2, progress_callback=None):
def query_status(t_id: str):
resp_dict = {'status': False, 'data': None, 'msg': ''}
try:
headers = {
'Content-Type': 'application/json',
'Authorization': f'Bearer {api_key}',
}
response = requests.get(f'https://ark.cn-beijing.volces.com/api/v3/contents/generations/tasks/{t_id}',
headers=headers)
resp_json = response.json()
resp_dict['status'] = resp_json['status'] == 'succeeded'
resp_dict['msg'] = resp_json['status']
resp_dict['data'] = resp_json['content']['video_url'] if 'content' in resp_json else None
except Exception as e:
logger.error(e)
resp_dict['msg'] = str(e)
finally:
return resp_dict
end = time.time() + timeout
final_result = {"status": False, "data": None, "msg": ""}
success = False
wait_count = 0
# 添加开始查询的日志
if progress_callback:
progress_callback(f" 开始查询任务状态,任务ID: {job_id}")
while time.time() < end:
tmp_data = query_status(job_id)
if tmp_data['status']:
final_result['status'] = True
final_result['data'] = tmp_data['data']
final_result['msg'] = 'succeeded'
success = True
if progress_callback:
progress_callback(f" ✓ 视频生成完成!")
break
elif tmp_data['msg'] == 'running':
wait_count += 1
elapsed = wait_count * interval
remaining = max(0, timeout - elapsed)
progress_msg = f" ⏳ 任务运行中,已等待{elapsed}秒,预计剩余{remaining}秒..."
logger.info(progress_msg)
if progress_callback:
progress_callback(progress_msg)
time.sleep(interval)
elif tmp_data['msg'] == 'failed':
final_result['msg'] = '任务执行失败'
if progress_callback:
progress_callback(f" ✗ 任务执行失败")
break
elif tmp_data['msg'] == 'pending' or tmp_data['msg'] == 'queued':
wait_count += 1
elapsed = wait_count * interval
remaining = max(0, timeout - elapsed)
progress_msg = f" ⏳ 任务排队中,已等待{elapsed}秒,预计剩余{remaining}秒..."
logger.info(progress_msg)
if progress_callback:
progress_callback(progress_msg)
time.sleep(interval)
else:
# 其他未知状态继续等待
wait_count += 1
logger.info(f" 未知状态: {tmp_data['msg']},继续等待...")
if progress_callback:
progress_callback(f" ❔ 状态: {tmp_data['msg']},继续等待...")
time.sleep(interval)
if not success and final_result['msg'] == '':
final_result['msg'] = '任务超时'
if progress_callback:
progress_callback(f" ⏰ 任务查询超时({timeout}秒)")
return final_result
@staticmethod
def local_file_to_video(file_path: str, prompt: str, duration: str = '5', model_type: str = 'lite',
timeout: int = 180, interval: int = 2, save_path: str = None, progress_callback=None):
"""
将本地图片文件转换为视频
:param file_path: 本地图片文件路径
:param prompt: 视频生成提示词
:param duration: 视频时长 ('5' 或 '10')
:param model_type: 模型类型 ('lite' 或 'pro')
:param timeout: 任务超时时间(秒)
:param interval: 查询间隔(秒)
:param save_path: 视频保存目录
:return: {'status': bool, 'video_path': str, 'msg': str}
"""
result = {'status': False, 'video_path': '', 'msg': ''}
try:
# 检查文件是否存在
if not os.path.exists(file_path):
result['msg'] = f'文件不存在: {file_path}'
logger.error(result['msg'])
return result
# 步骤1: 上传图片到COS
if progress_callback:
progress_callback(" 📤 正在上传图片到云存储...")
logger.info(f"正在上传图片到COS: {file_path}")
cos_dict = VideoUtils.upload_file_to_cos(file_path)
if not cos_dict['status']:
result['msg'] = f"上传图片失败: {cos_dict['msg']}"
logger.error(result['msg'])
return result
img_url = cos_dict['data']
if progress_callback:
progress_callback(" ✓ 图片上传成功")
logger.info(f"图片上传成功: {img_url}")
# 步骤2: 提交视频生成任务
if progress_callback:
progress_callback(" 🚀 正在提交视频生成任务...")
logger.info("正在提交视频生成任务...")
task_dict = VideoUtils.submit_task(prompt, img_url, duration, model_type)
if not task_dict['status']:
result['msg'] = f"提交任务失败: {task_dict['msg']}"
logger.error(result['msg'])
return result
task_id = task_dict['data']
if progress_callback:
progress_callback(f" ✓ 任务提交成功,任务ID: {task_id}")
logger.info(f"任务提交成功,任务ID: {task_id}")
# 步骤3: 查询任务结果
if progress_callback:
progress_callback(" ⏳ 正在等待视频生成完成...")
logger.info("正在等待视频生成完成...")
status_dict = VideoUtils.query_task_result(task_id, timeout=timeout, interval=interval,
progress_callback=progress_callback)
if not status_dict['status']:
result['msg'] = f"视频生成失败: {status_dict['msg']}"
logger.error(result['msg'])
return result
video_url = status_dict['data']
logger.info(f"视频生成成功: {video_url}")
# 步骤4: 下载视频到本地
if save_path:
if progress_callback:
progress_callback(" 📥 正在下载视频到本地...")
logger.info("正在下载视频到本地...")
video_path = VideoUtils.download_video(video_url, save_path)
if video_path and os.path.exists(video_path):
result['status'] = True
result['video_path'] = video_path
result['msg'] = '视频生成并下载成功'
if progress_callback:
progress_callback(f" ✓ 视频下载成功: {os.path.basename(video_path)}")
logger.info(f"视频下载成功: {video_path}")
else:
result['msg'] = '视频下载失败'
if progress_callback:
progress_callback(" ✗ 视频下载失败")
logger.error(result['msg'])
else:
result['status'] = True
result['video_path'] = video_url
result['msg'] = '视频生成成功'
except Exception as e:
result['msg'] = f'处理过程中发生异常: {str(e)}'
logger.error(result['msg'])
return result
def run_actual_script_logic(image_folder, prompt, output_video_dir, video_duration, model_type, add_log_entry_func, other_params=None): add_log_entry_func(f"开始处理任务...") add_log_entry_func(f" 图片文件夹: {image_folder}") add_log_entry_func(f" 提示词: \n{prompt[:100]}{'...' if len(prompt) > 100 else ''}") add_log_entry_func(f" 视频保存目录: {output_video_dir}") add_log_entry_func(f" 视频时长: {video_duration}秒") add_log_entry_func(f" 模型类型: {model_type}")
if other_params:
for key, value in other_params.items():
add_log_entry_func(f" 额外参数 {key}: {value}")
try:
# 筛选图片文件
# img_extensions = ['.jpg', '.jpeg', '.png', '.bmp', '.gif', '.tiff', '.webp']
img_list = glob.glob(f'{image_folder}/*')
add_log_entry_func(f'目录下找到 {len(img_list)} 张图片')
if len(img_list) == 0:
add_log_entry_func("警告: 未找到任何图片文件")
return
# 解析提示词列表
prompt_lines = [line.strip() for line in prompt.split('\n') if line.strip()]
if not prompt_lines:
add_log_entry_func("错误: 提示词不能为空")
return
add_log_entry_func(f'解析到 {len(prompt_lines)} 个提示词,将循环使用')
# 确保输出目录存在
if not os.path.exists(output_video_dir):
os.makedirs(output_video_dir)
add_log_entry_func(f"创建输出目录: {output_video_dir}")
success_count = 0
failed_count = 0
# 处理每个图片文件
for i, img_path in enumerate(img_list):
img_name = os.path.basename(img_path)
# 使用循环提示词
current_prompt = prompt_lines[i % len(prompt_lines)]
add_log_entry_func(f"[{i + 1}/{len(img_list)}] 处理图片: {img_name}")
add_log_entry_func(
f" 使用提示词[{i % len(prompt_lines) + 1}]: {current_prompt[:50]}{'...' if len(current_prompt) > 50 else ''}")
try:
# 调用视频生成功能
result = VideoUtils.local_file_to_video(
file_path=img_path,
prompt=current_prompt,
duration=str(video_duration),
model_type=model_type,
timeout=300, # 5分钟超时
interval=3, # 3秒检查一次
save_path=output_video_dir,
progress_callback=add_log_entry_func
)
if result and result.get('status'):
success_count += 1
video_path = result.get('video_path', '')
add_log_entry_func(f" ✓ 视频生成成功: {os.path.basename(video_path)}")
else:
failed_count += 1
error_msg = result.get('msg', '未知错误') if result else '处理失败'
add_log_entry_func(f" ✗ 视频生成失败: {error_msg}")
except Exception as e:
failed_count += 1
add_log_entry_func(f" ✗ 处理图片时出错: {str(e)}")
# 输出最终统计
add_log_entry_func("=" * 50)
add_log_entry_func(f"处理完成!成功: {success_count}, 失败: {failed_count}")
if success_count > 0:
add_log_entry_func(f"生成的视频已保存到: {output_video_dir}")
except Exception as e:
add_log_entry_func(f"处理过程中发生错误: {str(e)}")
finally:
add_log_entry_func("任务处理结束。")
--- 核心处理函数结束 ---
class App: def init(self, root): self.root = root self.root.title("视频生成工具") self.root.geometry("700x650") self.root.resizable(False, False) # --- 变量 --- self.image_folder_var = tk.StringVar() self.output_video_dir_var = tk.StringVar() self.video_duration_var = tk.IntVar(value=5) self.model_type_var = tk.StringVar(value='lite') # --- 主框架 --- main_frame = ttk.Frame(root, padding="10") main_frame.grid(row=0, column=0, sticky=(tk.W, tk.E, tk.N, tk.S)) root.grid_rowconfigure(0, weight=1) root.grid_columnconfigure(0, weight=1)
# --- 组件 ---
row_idx = 0
general_pady = 5
# 1. 选择图片文件夹
ttk.Label(main_frame, text="选择图片文件夹:").grid(row=row_idx, column=0, sticky=tk.W, pady=(general_pady, 2))
self.image_folder_entry = ttk.Entry(main_frame, textvariable=self.image_folder_var, width=50)
self.image_folder_entry.grid(row=row_idx, column=1, sticky=(tk.W, tk.E), pady=(general_pady, 2), padx=5)
ttk.Button(main_frame, text="浏览...", command=self.select_image_folder).grid(row=row_idx, column=2,
sticky=tk.E,
pady=(general_pady, 2))
row_idx += 1
# 2. 保存视频存储目录
ttk.Label(main_frame, text="视频保存目录:").grid(row=row_idx, column=0, sticky=tk.W, pady=general_pady)
self.output_video_dir_entry = ttk.Entry(main_frame, textvariable=self.output_video_dir_var, width=50)
self.output_video_dir_entry.grid(row=row_idx, column=1, sticky=(tk.W, tk.E), pady=general_pady, padx=5)
ttk.Button(main_frame, text="浏览...", command=self.select_output_video_dir).grid(row=row_idx, column=2,
sticky=tk.E,
pady=general_pady)
row_idx += 1
# 3. 生图提示词
ttk.Label(main_frame, text="生成视频提示词:").grid(row=row_idx, column=0, sticky=(tk.W, tk.NW),
pady=(general_pady, 2))
self.prompt_text = scrolledtext.ScrolledText(main_frame, wrap=tk.WORD, width=60, height=8)
self.prompt_text.grid(row=row_idx, column=1, columnspan=2, sticky=(tk.W, tk.E), pady=(general_pady, 2), padx=5)
# 设置默认提示词
self.prompt_text.insert("1.0", default_prompt_str)
row_idx += 1
# 4. 视频时长
duration_frame = ttk.LabelFrame(main_frame, text="视频时长")
duration_frame.grid(row=row_idx, column=0, columnspan=3, sticky=(tk.W, tk.E), pady=general_pady, padx=5)
ttk.Radiobutton(duration_frame, text="5秒", variable=self.video_duration_var, value=5).pack(side=tk.LEFT,
padx=10, pady=5)
ttk.Radiobutton(duration_frame, text="10秒", variable=self.video_duration_var, value=10).pack(side=tk.LEFT,
padx=10, pady=5)
row_idx += 1
# 5. 模型类型
model_frame = ttk.LabelFrame(main_frame, text="模型类型")
model_frame.grid(row=row_idx, column=0, columnspan=3, sticky=(tk.W, tk.E), pady=general_pady, padx=5)
ttk.Radiobutton(model_frame, text="lite", variable=self.model_type_var, value='lite').pack(side=tk.LEFT,
padx=10, pady=5)
ttk.Radiobutton(model_frame, text="pro", variable=self.model_type_var, value='pro').pack(side=tk.LEFT,
padx=10, pady=5)
row_idx += 1
# 6. 运行按钮 (新位置)
self.run_button = ttk.Button(main_frame, text="运行", command=self.start_processing_thread)
# 增加上下边距,使其与上下组件有明显区隔
self.run_button.grid(row=row_idx, column=0, columnspan=3, pady=(general_pady + 10, general_pady + 5))
row_idx += 1
# 7. 运行日志 Label
ttk.Label(main_frame, text="运行日志:").grid(row=row_idx, column=0, sticky=tk.W,
pady=(general_pady, 0)) # 调整pady使其靠近下方的Text
row_idx += 1
# 8. 运行日志 ScrolledText
self.log_text = scrolledtext.ScrolledText(main_frame, wrap=tk.WORD, width=70, height=15, state='disabled')
self.log_text.grid(row=row_idx, column=0, columnspan=3, sticky=(tk.W, tk.E, tk.N, tk.S), pady=(2, general_pady),
padx=5)
main_frame.grid_rowconfigure(row_idx, weight=1) # 让日志区域可以扩展
# row_idx += 1 # 最后一个组件后面不需要再增加row_idx,除非还要添加东西
# 配置列的伸缩
main_frame.grid_columnconfigure(1, weight=1)
def select_image_folder(self):
folder_selected = filedialog.askdirectory()
if folder_selected:
self.image_folder_var.set(folder_selected)
self.add_log_entry(f"选择图片文件夹: {folder_selected}")
def select_output_video_dir(self):
folder_selected = filedialog.askdirectory()
if folder_selected:
self.output_video_dir_var.set(folder_selected)
self.add_log_entry(f"选择视频保存目录: {folder_selected}")
def add_log_entry(self, message):
self.log_text.configure(state='normal')
self.log_text.insert(tk.END, message + "\n")
self.log_text.configure(state='disabled')
self.log_text.see(tk.END)
def start_processing_thread(self):
image_folder = self.image_folder_var.get()
prompt = self.prompt_text.get("1.0", tk.END).strip()
output_video_dir = self.output_video_dir_var.get()
video_duration = self.video_duration_var.get()
model_type = self.model_type_var.get()
other_params = {}
if not image_folder:
messagebox.showerror("错误", "请选择图片文件夹!")
return
if not prompt:
messagebox.showerror("错误", "请输入生图提示词!")
return
if not output_video_dir:
messagebox.showerror("错误", "请选择视频保存目录!")
return
self.add_log_entry("=" * 30)
self.run_button.config(state="disabled")
thread = threading.Thread(target=self.run_script_in_background,
args=(image_folder, prompt, output_video_dir, video_duration, model_type, other_params))
thread.daemon = True
thread.start()
def run_script_in_background(self, image_folder, prompt, output_video_dir, video_duration, model_type, other_params):
try:
run_actual_script_logic(
image_folder,
prompt,
output_video_dir,
video_duration,
model_type,
self.add_log_entry,
other_params
)
except Exception as e:
self.add_log_entry(f"线程中发生未捕获的错误: {e}")
self.root.after(0, lambda: messagebox.showerror("线程错误", f"处理过程中发生错误:\n{e}"))
finally:
self.root.after(0, self.enable_run_button)
def enable_run_button(self):
self.run_button.config(state="normal")
if name == "main": root = tk.Tk() app = App(root) root.mainloop()