569 lines
25 KiB
Markdown
569 lines
25 KiB
Markdown
import glob
|
||
import mimetypes
|
||
import os
|
||
import threading
|
||
import time
|
||
import tkinter as tk
|
||
from tkinter import ttk, filedialog, scrolledtext, messagebox
|
||
|
||
import requests
|
||
from loguru import logger
|
||
from qcloud_cos import CosConfig
|
||
from qcloud_cos import CosS3Client
|
||
|
||
cos_bucket_name = 'sucai-1324682537'
|
||
cos_secret_id = 'AKIDsrihIyjZOBsjimt8TsN8yvv1AMh5dB44'
|
||
cos_secret_key = 'CPZcxdk6W39Jd4cGY95wvupoyMd0YFqW'
|
||
cos_region = 'ap-shanghai'
|
||
|
||
api_key = '21575c22-14aa-40ca-8aa8-f00ca27a3a17'
|
||
|
||
default_prompt = [
|
||
"女人扭动身体向摄像机展示身材,一只手撩了一下头发,镜头从左向右移动并放大画面",
|
||
# "女人扭动身体向摄像机展示身材,一只手撩了一下头发后放在了裤子上,镜头从左向右移动",
|
||
"时尚模特抬头自信的展示身材,扭动身体,一只手放在了头上,镜头逐渐放大聚焦在了下衣上",
|
||
"女人扭动身体向摄像机展示身材,一只手撩了一下头发后放在了裤子上,镜头从左向右移动",
|
||
"自信步伐跟拍模特,模特步伐自信地同时行走,镜头紧紧跟随。抬起手捋一捋头发。传递出自信与时尚的气息。",
|
||
"女生两只手捏着拳头轻盈的左右摇摆跳舞,动作幅度不大,然后把手摊开放在胸口再做出像popping心脏跳动的动作,左右身体都要非常协调",
|
||
"一个年轻女子自信地在相机前展示了她优美的身材,以自然的流体动作自由地摇摆,左手撩了一下头发之后停在了胸前。一个美女自拍跳舞扭动身体的视频,手从下到上最后放在胸前,妩媚的表情",
|
||
"美女向后退了一步站在那里展示服装,双手轻轻提了一下裤子两侧,镜头从上到下逐渐放大",
|
||
"女人低头看向裤子,向镜头展示身材,一只手放在了头上做pose动作",
|
||
"美女向后退了一步站在那里展示服装,低头并用手抚摸裤子,镜头从上到下逐渐放大",
|
||
"美女向后退了一步站在那里展示服装,双手从上到下整理衣服,自然扭动身体,自信的表情"
|
||
]
|
||
|
||
default_prompt_str = '\n'.join(default_prompt)
|
||
|
||
# pyinstaller -F -w --name seed_video jm_video_ui.py
|
||
class VideoUtils:
|
||
|
||
@staticmethod
|
||
def download_video(video_url: str, save_path: str) -> str:
|
||
try:
|
||
file_name = f'{int(time.time() * 1000)}.mp4'
|
||
response = requests.get(video_url, stream=True)
|
||
full_path = os.path.join(save_path, file_name)
|
||
with open(full_path, 'wb') as f:
|
||
for chunk in response.iter_content(chunk_size=1024 * 1024 * 5):
|
||
if chunk:
|
||
f.write(chunk)
|
||
return full_path
|
||
except Exception as e:
|
||
logger.error(e)
|
||
pass
|
||
|
||
@staticmethod
|
||
def upload_file_to_cos(file_path: str, remove_src_file: bool = False):
|
||
resp_data = {'status': True, 'data': '', 'msg': ''}
|
||
mime_type, _ = mimetypes.guess_type(file_path)
|
||
category = mime_type.split('/')[0]
|
||
f_name = os.path.basename(file_path)
|
||
suffix = f_name.split('.')[-1]
|
||
real_name = f'{int(time.time() * 1000)}.{suffix}'
|
||
try:
|
||
object_key = f'tk/{category}/{real_name}'
|
||
config = CosConfig(Region=cos_region, SecretId=cos_secret_id, SecretKey=cos_secret_key)
|
||
client = CosS3Client(config)
|
||
_ = client.upload_file(
|
||
Bucket=cos_bucket_name,
|
||
Key=object_key,
|
||
LocalFilePath=file_path,
|
||
EnableMD5=False,
|
||
progress_callback=None
|
||
)
|
||
url = f'https://{cos_bucket_name}.cos.ap-shanghai.myqcloud.com/{object_key}'
|
||
resp_data['data'] = url
|
||
resp_data['msg'] = '上传成功'
|
||
except Exception as e:
|
||
logger.error(e)
|
||
resp_data['status'] = False
|
||
resp_data['msg'] = str(e)
|
||
finally:
|
||
if remove_src_file:
|
||
os.remove(file_path)
|
||
return resp_data
|
||
|
||
@staticmethod
|
||
def submit_task(prompt: str, img_url: str, duration: str = '5', model_type:str='lite'):
|
||
"""
|
||
:param prompt: 生成视频的提示词
|
||
:param img_url:
|
||
:param duration:
|
||
:return:
|
||
"""
|
||
if model_type == 'lite':
|
||
model = 'doubao-seedance-1-0-lite-i2v-250428'
|
||
resolution = '720p'
|
||
else:
|
||
model = 'doubao-seedance-1-0-pro-250528'
|
||
resolution = '1080p'
|
||
if duration not in ('5', '10'):
|
||
logger.error('Duration must be either 5 or 10')
|
||
try:
|
||
headers = {
|
||
'Content-Type': 'application/json',
|
||
'Authorization': f'Bearer {api_key}',
|
||
}
|
||
|
||
json_data = {
|
||
'model': model,
|
||
'content': [
|
||
{
|
||
'type': 'text',
|
||
'text': f'{prompt} --resolution {resolution} --dur {duration} --camerafixed false',
|
||
},
|
||
{
|
||
'type': 'image_url',
|
||
'image_url': {
|
||
'url': img_url,
|
||
},
|
||
},
|
||
],
|
||
}
|
||
|
||
response = requests.post('https://ark.cn-beijing.volces.com/api/v3/contents/generations/tasks',
|
||
headers=headers,
|
||
json=json_data)
|
||
|
||
# 检查HTTP状态码
|
||
if response.status_code != 200:
|
||
error_msg = f"API请求失败,状态码: {response.status_code}, 响应: {response.text}"
|
||
logger.error(error_msg)
|
||
return {"status": False, 'msg': error_msg}
|
||
|
||
resp_json = response.json()
|
||
|
||
# 检查响应中是否包含id字段
|
||
if 'id' not in resp_json:
|
||
error_msg = f"API响应缺少id字段,响应内容: {resp_json}"
|
||
logger.error(error_msg)
|
||
return {"status": False, 'msg': error_msg}
|
||
|
||
job_id = resp_json['id']
|
||
return {"data": job_id, 'status': True}
|
||
except Exception as e:
|
||
logger.error(e)
|
||
return {"status": False, 'msg': str(e)}
|
||
|
||
@staticmethod
|
||
def query_task_result(job_id, timeout: int = 180, interval: int = 2, progress_callback=None):
|
||
def query_status(t_id: str):
|
||
resp_dict = {'status': False, 'data': None, 'msg': ''}
|
||
try:
|
||
headers = {
|
||
'Content-Type': 'application/json',
|
||
'Authorization': f'Bearer {api_key}',
|
||
}
|
||
|
||
response = requests.get(f'https://ark.cn-beijing.volces.com/api/v3/contents/generations/tasks/{t_id}',
|
||
headers=headers)
|
||
resp_json = response.json()
|
||
resp_dict['status'] = resp_json['status'] == 'succeeded'
|
||
resp_dict['msg'] = resp_json['status']
|
||
resp_dict['data'] = resp_json['content']['video_url'] if 'content' in resp_json else None
|
||
except Exception as e:
|
||
logger.error(e)
|
||
resp_dict['msg'] = str(e)
|
||
finally:
|
||
return resp_dict
|
||
|
||
end = time.time() + timeout
|
||
final_result = {"status": False, "data": None, "msg": ""}
|
||
success = False
|
||
wait_count = 0
|
||
|
||
# 添加开始查询的日志
|
||
if progress_callback:
|
||
progress_callback(f" 开始查询任务状态,任务ID: {job_id}")
|
||
|
||
while time.time() < end:
|
||
tmp_data = query_status(job_id)
|
||
if tmp_data['status']:
|
||
final_result['status'] = True
|
||
final_result['data'] = tmp_data['data']
|
||
final_result['msg'] = 'succeeded'
|
||
success = True
|
||
if progress_callback:
|
||
progress_callback(f" ✓ 视频生成完成!")
|
||
break
|
||
elif tmp_data['msg'] == 'running':
|
||
wait_count += 1
|
||
elapsed = wait_count * interval
|
||
remaining = max(0, timeout - elapsed)
|
||
progress_msg = f" ⏳ 任务运行中,已等待{elapsed}秒,预计剩余{remaining}秒..."
|
||
logger.info(progress_msg)
|
||
if progress_callback:
|
||
progress_callback(progress_msg)
|
||
time.sleep(interval)
|
||
elif tmp_data['msg'] == 'failed':
|
||
final_result['msg'] = '任务执行失败'
|
||
if progress_callback:
|
||
progress_callback(f" ✗ 任务执行失败")
|
||
break
|
||
elif tmp_data['msg'] == 'pending' or tmp_data['msg'] == 'queued':
|
||
wait_count += 1
|
||
elapsed = wait_count * interval
|
||
remaining = max(0, timeout - elapsed)
|
||
progress_msg = f" ⏳ 任务排队中,已等待{elapsed}秒,预计剩余{remaining}秒..."
|
||
logger.info(progress_msg)
|
||
if progress_callback:
|
||
progress_callback(progress_msg)
|
||
time.sleep(interval)
|
||
else:
|
||
# 其他未知状态继续等待
|
||
wait_count += 1
|
||
logger.info(f" 未知状态: {tmp_data['msg']},继续等待...")
|
||
if progress_callback:
|
||
progress_callback(f" ❔ 状态: {tmp_data['msg']},继续等待...")
|
||
time.sleep(interval)
|
||
|
||
if not success and final_result['msg'] == '':
|
||
final_result['msg'] = '任务超时'
|
||
if progress_callback:
|
||
progress_callback(f" ⏰ 任务查询超时({timeout}秒)")
|
||
return final_result
|
||
|
||
@staticmethod
|
||
def local_file_to_video(file_path: str, prompt: str, duration: str = '5', model_type: str = 'lite',
|
||
timeout: int = 180, interval: int = 2, save_path: str = None, progress_callback=None):
|
||
"""
|
||
将本地图片文件转换为视频
|
||
:param file_path: 本地图片文件路径
|
||
:param prompt: 视频生成提示词
|
||
:param duration: 视频时长 ('5' 或 '10')
|
||
:param model_type: 模型类型 ('lite' 或 'pro')
|
||
:param timeout: 任务超时时间(秒)
|
||
:param interval: 查询间隔(秒)
|
||
:param save_path: 视频保存目录
|
||
:return: {'status': bool, 'video_path': str, 'msg': str}
|
||
"""
|
||
result = {'status': False, 'video_path': '', 'msg': ''}
|
||
|
||
try:
|
||
# 检查文件是否存在
|
||
if not os.path.exists(file_path):
|
||
result['msg'] = f'文件不存在: {file_path}'
|
||
logger.error(result['msg'])
|
||
return result
|
||
|
||
# 步骤1: 上传图片到COS
|
||
if progress_callback:
|
||
progress_callback(" 📤 正在上传图片到云存储...")
|
||
logger.info(f"正在上传图片到COS: {file_path}")
|
||
cos_dict = VideoUtils.upload_file_to_cos(file_path)
|
||
if not cos_dict['status']:
|
||
result['msg'] = f"上传图片失败: {cos_dict['msg']}"
|
||
logger.error(result['msg'])
|
||
return result
|
||
|
||
img_url = cos_dict['data']
|
||
if progress_callback:
|
||
progress_callback(" ✓ 图片上传成功")
|
||
logger.info(f"图片上传成功: {img_url}")
|
||
|
||
# 步骤2: 提交视频生成任务
|
||
if progress_callback:
|
||
progress_callback(" 🚀 正在提交视频生成任务...")
|
||
logger.info("正在提交视频生成任务...")
|
||
task_dict = VideoUtils.submit_task(prompt, img_url, duration, model_type)
|
||
if not task_dict['status']:
|
||
result['msg'] = f"提交任务失败: {task_dict['msg']}"
|
||
logger.error(result['msg'])
|
||
return result
|
||
|
||
task_id = task_dict['data']
|
||
if progress_callback:
|
||
progress_callback(f" ✓ 任务提交成功,任务ID: {task_id}")
|
||
logger.info(f"任务提交成功,任务ID: {task_id}")
|
||
|
||
# 步骤3: 查询任务结果
|
||
if progress_callback:
|
||
progress_callback(" ⏳ 正在等待视频生成完成...")
|
||
logger.info("正在等待视频生成完成...")
|
||
status_dict = VideoUtils.query_task_result(task_id, timeout=timeout, interval=interval,
|
||
progress_callback=progress_callback)
|
||
if not status_dict['status']:
|
||
result['msg'] = f"视频生成失败: {status_dict['msg']}"
|
||
logger.error(result['msg'])
|
||
return result
|
||
|
||
video_url = status_dict['data']
|
||
logger.info(f"视频生成成功: {video_url}")
|
||
|
||
# 步骤4: 下载视频到本地
|
||
if save_path:
|
||
if progress_callback:
|
||
progress_callback(" 📥 正在下载视频到本地...")
|
||
logger.info("正在下载视频到本地...")
|
||
video_path = VideoUtils.download_video(video_url, save_path)
|
||
if video_path and os.path.exists(video_path):
|
||
result['status'] = True
|
||
result['video_path'] = video_path
|
||
result['msg'] = '视频生成并下载成功'
|
||
if progress_callback:
|
||
progress_callback(f" ✓ 视频下载成功: {os.path.basename(video_path)}")
|
||
logger.info(f"视频下载成功: {video_path}")
|
||
else:
|
||
result['msg'] = '视频下载失败'
|
||
if progress_callback:
|
||
progress_callback(" ✗ 视频下载失败")
|
||
logger.error(result['msg'])
|
||
else:
|
||
result['status'] = True
|
||
result['video_path'] = video_url
|
||
result['msg'] = '视频生成成功'
|
||
|
||
except Exception as e:
|
||
result['msg'] = f'处理过程中发生异常: {str(e)}'
|
||
logger.error(result['msg'])
|
||
|
||
return result
|
||
|
||
|
||
def run_actual_script_logic(image_folder, prompt, output_video_dir, video_duration, model_type, add_log_entry_func,
|
||
other_params=None):
|
||
add_log_entry_func(f"开始处理任务...")
|
||
add_log_entry_func(f" 图片文件夹: {image_folder}")
|
||
add_log_entry_func(f" 提示词: \n{prompt[:100]}{'...' if len(prompt) > 100 else ''}")
|
||
add_log_entry_func(f" 视频保存目录: {output_video_dir}")
|
||
add_log_entry_func(f" 视频时长: {video_duration}秒")
|
||
add_log_entry_func(f" 模型类型: {model_type}")
|
||
|
||
if other_params:
|
||
for key, value in other_params.items():
|
||
add_log_entry_func(f" 额外参数 {key}: {value}")
|
||
|
||
try:
|
||
# 筛选图片文件
|
||
# img_extensions = ['.jpg', '.jpeg', '.png', '.bmp', '.gif', '.tiff', '.webp']
|
||
img_list = glob.glob(f'{image_folder}/*')
|
||
add_log_entry_func(f'目录下找到 {len(img_list)} 张图片')
|
||
|
||
if len(img_list) == 0:
|
||
add_log_entry_func("警告: 未找到任何图片文件")
|
||
return
|
||
|
||
# 解析提示词列表
|
||
prompt_lines = [line.strip() for line in prompt.split('\n') if line.strip()]
|
||
if not prompt_lines:
|
||
add_log_entry_func("错误: 提示词不能为空")
|
||
return
|
||
|
||
add_log_entry_func(f'解析到 {len(prompt_lines)} 个提示词,将循环使用')
|
||
|
||
# 确保输出目录存在
|
||
if not os.path.exists(output_video_dir):
|
||
os.makedirs(output_video_dir)
|
||
add_log_entry_func(f"创建输出目录: {output_video_dir}")
|
||
|
||
success_count = 0
|
||
failed_count = 0
|
||
|
||
# 处理每个图片文件
|
||
for i, img_path in enumerate(img_list):
|
||
img_name = os.path.basename(img_path)
|
||
# 使用循环提示词
|
||
current_prompt = prompt_lines[i % len(prompt_lines)]
|
||
add_log_entry_func(f"[{i + 1}/{len(img_list)}] 处理图片: {img_name}")
|
||
add_log_entry_func(
|
||
f" 使用提示词[{i % len(prompt_lines) + 1}]: {current_prompt[:50]}{'...' if len(current_prompt) > 50 else ''}")
|
||
|
||
try:
|
||
# 调用视频生成功能
|
||
result = VideoUtils.local_file_to_video(
|
||
file_path=img_path,
|
||
prompt=current_prompt,
|
||
duration=str(video_duration),
|
||
model_type=model_type,
|
||
timeout=300, # 5分钟超时
|
||
interval=3, # 3秒检查一次
|
||
save_path=output_video_dir,
|
||
progress_callback=add_log_entry_func
|
||
)
|
||
|
||
if result and result.get('status'):
|
||
success_count += 1
|
||
video_path = result.get('video_path', '')
|
||
add_log_entry_func(f" ✓ 视频生成成功: {os.path.basename(video_path)}")
|
||
else:
|
||
failed_count += 1
|
||
error_msg = result.get('msg', '未知错误') if result else '处理失败'
|
||
add_log_entry_func(f" ✗ 视频生成失败: {error_msg}")
|
||
|
||
except Exception as e:
|
||
failed_count += 1
|
||
add_log_entry_func(f" ✗ 处理图片时出错: {str(e)}")
|
||
|
||
# 输出最终统计
|
||
add_log_entry_func("=" * 50)
|
||
add_log_entry_func(f"处理完成!成功: {success_count}, 失败: {failed_count}")
|
||
if success_count > 0:
|
||
add_log_entry_func(f"生成的视频已保存到: {output_video_dir}")
|
||
|
||
except Exception as e:
|
||
add_log_entry_func(f"处理过程中发生错误: {str(e)}")
|
||
finally:
|
||
add_log_entry_func("任务处理结束。")
|
||
|
||
|
||
# --- 核心处理函数结束 ---
|
||
|
||
|
||
class App:
|
||
def __init__(self, root):
|
||
self.root = root
|
||
self.root.title("视频生成工具")
|
||
self.root.geometry("700x650")
|
||
self.root.resizable(False, False)
|
||
# --- 变量 ---
|
||
self.image_folder_var = tk.StringVar()
|
||
self.output_video_dir_var = tk.StringVar()
|
||
self.video_duration_var = tk.IntVar(value=5)
|
||
self.model_type_var = tk.StringVar(value='lite')
|
||
# --- 主框架 ---
|
||
main_frame = ttk.Frame(root, padding="10")
|
||
main_frame.grid(row=0, column=0, sticky=(tk.W, tk.E, tk.N, tk.S))
|
||
root.grid_rowconfigure(0, weight=1)
|
||
root.grid_columnconfigure(0, weight=1)
|
||
|
||
# --- 组件 ---
|
||
row_idx = 0
|
||
general_pady = 5
|
||
|
||
# 1. 选择图片文件夹
|
||
ttk.Label(main_frame, text="选择图片文件夹:").grid(row=row_idx, column=0, sticky=tk.W, pady=(general_pady, 2))
|
||
self.image_folder_entry = ttk.Entry(main_frame, textvariable=self.image_folder_var, width=50)
|
||
self.image_folder_entry.grid(row=row_idx, column=1, sticky=(tk.W, tk.E), pady=(general_pady, 2), padx=5)
|
||
ttk.Button(main_frame, text="浏览...", command=self.select_image_folder).grid(row=row_idx, column=2,
|
||
sticky=tk.E,
|
||
pady=(general_pady, 2))
|
||
row_idx += 1
|
||
|
||
# 2. 保存视频存储目录
|
||
ttk.Label(main_frame, text="视频保存目录:").grid(row=row_idx, column=0, sticky=tk.W, pady=general_pady)
|
||
self.output_video_dir_entry = ttk.Entry(main_frame, textvariable=self.output_video_dir_var, width=50)
|
||
self.output_video_dir_entry.grid(row=row_idx, column=1, sticky=(tk.W, tk.E), pady=general_pady, padx=5)
|
||
ttk.Button(main_frame, text="浏览...", command=self.select_output_video_dir).grid(row=row_idx, column=2,
|
||
sticky=tk.E,
|
||
pady=general_pady)
|
||
row_idx += 1
|
||
|
||
# 3. 生图提示词
|
||
ttk.Label(main_frame, text="生成视频提示词:").grid(row=row_idx, column=0, sticky=(tk.W, tk.NW),
|
||
pady=(general_pady, 2))
|
||
self.prompt_text = scrolledtext.ScrolledText(main_frame, wrap=tk.WORD, width=60, height=8)
|
||
self.prompt_text.grid(row=row_idx, column=1, columnspan=2, sticky=(tk.W, tk.E), pady=(general_pady, 2), padx=5)
|
||
# 设置默认提示词
|
||
self.prompt_text.insert("1.0", default_prompt_str)
|
||
row_idx += 1
|
||
|
||
# 4. 视频时长
|
||
duration_frame = ttk.LabelFrame(main_frame, text="视频时长")
|
||
duration_frame.grid(row=row_idx, column=0, columnspan=3, sticky=(tk.W, tk.E), pady=general_pady, padx=5)
|
||
ttk.Radiobutton(duration_frame, text="5秒", variable=self.video_duration_var, value=5).pack(side=tk.LEFT,
|
||
padx=10, pady=5)
|
||
ttk.Radiobutton(duration_frame, text="10秒", variable=self.video_duration_var, value=10).pack(side=tk.LEFT,
|
||
padx=10, pady=5)
|
||
row_idx += 1
|
||
|
||
# 5. 模型类型
|
||
model_frame = ttk.LabelFrame(main_frame, text="模型类型")
|
||
model_frame.grid(row=row_idx, column=0, columnspan=3, sticky=(tk.W, tk.E), pady=general_pady, padx=5)
|
||
ttk.Radiobutton(model_frame, text="lite", variable=self.model_type_var, value='lite').pack(side=tk.LEFT,
|
||
padx=10, pady=5)
|
||
ttk.Radiobutton(model_frame, text="pro", variable=self.model_type_var, value='pro').pack(side=tk.LEFT,
|
||
padx=10, pady=5)
|
||
row_idx += 1
|
||
|
||
# 6. 运行按钮 (新位置)
|
||
self.run_button = ttk.Button(main_frame, text="运行", command=self.start_processing_thread)
|
||
# 增加上下边距,使其与上下组件有明显区隔
|
||
self.run_button.grid(row=row_idx, column=0, columnspan=3, pady=(general_pady + 10, general_pady + 5))
|
||
row_idx += 1
|
||
|
||
# 7. 运行日志 Label
|
||
ttk.Label(main_frame, text="运行日志:").grid(row=row_idx, column=0, sticky=tk.W,
|
||
pady=(general_pady, 0)) # 调整pady使其靠近下方的Text
|
||
row_idx += 1
|
||
|
||
# 8. 运行日志 ScrolledText
|
||
self.log_text = scrolledtext.ScrolledText(main_frame, wrap=tk.WORD, width=70, height=15, state='disabled')
|
||
self.log_text.grid(row=row_idx, column=0, columnspan=3, sticky=(tk.W, tk.E, tk.N, tk.S), pady=(2, general_pady),
|
||
padx=5)
|
||
main_frame.grid_rowconfigure(row_idx, weight=1) # 让日志区域可以扩展
|
||
# row_idx += 1 # 最后一个组件后面不需要再增加row_idx,除非还要添加东西
|
||
|
||
# 配置列的伸缩
|
||
main_frame.grid_columnconfigure(1, weight=1)
|
||
|
||
def select_image_folder(self):
|
||
folder_selected = filedialog.askdirectory()
|
||
if folder_selected:
|
||
self.image_folder_var.set(folder_selected)
|
||
self.add_log_entry(f"选择图片文件夹: {folder_selected}")
|
||
|
||
def select_output_video_dir(self):
|
||
folder_selected = filedialog.askdirectory()
|
||
if folder_selected:
|
||
self.output_video_dir_var.set(folder_selected)
|
||
self.add_log_entry(f"选择视频保存目录: {folder_selected}")
|
||
|
||
def add_log_entry(self, message):
|
||
self.log_text.configure(state='normal')
|
||
self.log_text.insert(tk.END, message + "\n")
|
||
self.log_text.configure(state='disabled')
|
||
self.log_text.see(tk.END)
|
||
|
||
def start_processing_thread(self):
|
||
image_folder = self.image_folder_var.get()
|
||
prompt = self.prompt_text.get("1.0", tk.END).strip()
|
||
output_video_dir = self.output_video_dir_var.get()
|
||
video_duration = self.video_duration_var.get()
|
||
model_type = self.model_type_var.get()
|
||
|
||
other_params = {}
|
||
|
||
if not image_folder:
|
||
messagebox.showerror("错误", "请选择图片文件夹!")
|
||
return
|
||
if not prompt:
|
||
messagebox.showerror("错误", "请输入生图提示词!")
|
||
return
|
||
if not output_video_dir:
|
||
messagebox.showerror("错误", "请选择视频保存目录!")
|
||
return
|
||
|
||
self.add_log_entry("=" * 30)
|
||
self.run_button.config(state="disabled")
|
||
|
||
thread = threading.Thread(target=self.run_script_in_background,
|
||
args=(image_folder, prompt, output_video_dir, video_duration, model_type, other_params))
|
||
thread.daemon = True
|
||
thread.start()
|
||
|
||
def run_script_in_background(self, image_folder, prompt, output_video_dir, video_duration, model_type, other_params):
|
||
try:
|
||
run_actual_script_logic(
|
||
image_folder,
|
||
prompt,
|
||
output_video_dir,
|
||
video_duration,
|
||
model_type,
|
||
self.add_log_entry,
|
||
other_params
|
||
)
|
||
except Exception as e:
|
||
self.add_log_entry(f"线程中发生未捕获的错误: {e}")
|
||
self.root.after(0, lambda: messagebox.showerror("线程错误", f"处理过程中发生错误:\n{e}"))
|
||
finally:
|
||
self.root.after(0, self.enable_run_button)
|
||
|
||
def enable_run_button(self):
|
||
self.run_button.config(state="normal")
|
||
|
||
|
||
if __name__ == "__main__":
|
||
root = tk.Tk()
|
||
app = App(root)
|
||
root.mainloop()
|