import glob import mimetypes import os import threading import time import tkinter as tk from tkinter import ttk, filedialog, scrolledtext, messagebox import requests from loguru import logger from qcloud_cos import CosConfig from qcloud_cos import CosS3Client cos_bucket_name = 'sucai-1324682537' cos_secret_id = 'AKIDsrihIyjZOBsjimt8TsN8yvv1AMh5dB44' cos_secret_key = 'CPZcxdk6W39Jd4cGY95wvupoyMd0YFqW' cos_region = 'ap-shanghai' api_key = '21575c22-14aa-40ca-8aa8-f00ca27a3a17' default_prompt = [ "女人扭动身体向摄像机展示身材,一只手撩了一下头发,镜头从左向右移动并放大画面", # "女人扭动身体向摄像机展示身材,一只手撩了一下头发后放在了裤子上,镜头从左向右移动", "时尚模特抬头自信的展示身材,扭动身体,一只手放在了头上,镜头逐渐放大聚焦在了下衣上", "女人扭动身体向摄像机展示身材,一只手撩了一下头发后放在了裤子上,镜头从左向右移动", "自信步伐跟拍模特,模特步伐自信地同时行走,镜头紧紧跟随。抬起手捋一捋头发。传递出自信与时尚的气息。", "女生两只手捏着拳头轻盈的左右摇摆跳舞,动作幅度不大,然后把手摊开放在胸口再做出像popping心脏跳动的动作,左右身体都要非常协调", "一个年轻女子自信地在相机前展示了她优美的身材,以自然的流体动作自由地摇摆,左手撩了一下头发之后停在了胸前。一个美女自拍跳舞扭动身体的视频,手从下到上最后放在胸前,妩媚的表情", "美女向后退了一步站在那里展示服装,双手轻轻提了一下裤子两侧,镜头从上到下逐渐放大", "女人低头看向裤子,向镜头展示身材,一只手放在了头上做pose动作", "美女向后退了一步站在那里展示服装,低头并用手抚摸裤子,镜头从上到下逐渐放大", "美女向后退了一步站在那里展示服装,双手从上到下整理衣服,自然扭动身体,自信的表情" ] default_prompt_str = '\n'.join(default_prompt) # pyinstaller -F -w --name seed_video jm_video_ui.py class VideoUtils: @staticmethod def download_video(video_url: str, save_path: str) -> str: try: file_name = f'{int(time.time() * 1000)}.mp4' response = requests.get(video_url, stream=True) full_path = os.path.join(save_path, file_name) with open(full_path, 'wb') as f: for chunk in response.iter_content(chunk_size=1024 * 1024 * 5): if chunk: f.write(chunk) return full_path except Exception as e: logger.error(e) pass @staticmethod def upload_file_to_cos(file_path: str, remove_src_file: bool = False): resp_data = {'status': True, 'data': '', 'msg': ''} mime_type, _ = mimetypes.guess_type(file_path) category = mime_type.split('/')[0] f_name = os.path.basename(file_path) suffix = f_name.split('.')[-1] real_name = f'{int(time.time() * 1000)}.{suffix}' try: object_key = f'tk/{category}/{real_name}' config = CosConfig(Region=cos_region, SecretId=cos_secret_id, SecretKey=cos_secret_key) client = CosS3Client(config) _ = client.upload_file( Bucket=cos_bucket_name, Key=object_key, LocalFilePath=file_path, EnableMD5=False, progress_callback=None ) url = f'https://{cos_bucket_name}.cos.ap-shanghai.myqcloud.com/{object_key}' resp_data['data'] = url resp_data['msg'] = '上传成功' except Exception as e: logger.error(e) resp_data['status'] = False resp_data['msg'] = str(e) finally: if remove_src_file: os.remove(file_path) return resp_data @staticmethod def submit_task(prompt: str, img_url: str, duration: str = '5', model_type:str='lite'): """ :param prompt: 生成视频的提示词 :param img_url: :param duration: :return: """ if model_type == 'lite': model = 'doubao-seedance-1-0-lite-i2v-250428' resolution = '720p' else: model = 'doubao-seedance-1-0-pro-250528' resolution = '1080p' if duration not in ('5', '10'): logger.error('Duration must be either 5 or 10') try: headers = { 'Content-Type': 'application/json', 'Authorization': f'Bearer {api_key}', } json_data = { 'model': model, 'content': [ { 'type': 'text', 'text': f'{prompt} --resolution {resolution} --dur {duration} --camerafixed false', }, { 'type': 'image_url', 'image_url': { 'url': img_url, }, }, ], } response = requests.post('https://ark.cn-beijing.volces.com/api/v3/contents/generations/tasks', headers=headers, json=json_data) # 检查HTTP状态码 if response.status_code != 200: error_msg = f"API请求失败,状态码: {response.status_code}, 响应: {response.text}" logger.error(error_msg) return {"status": False, 'msg': error_msg} resp_json = response.json() # 检查响应中是否包含id字段 if 'id' not in resp_json: error_msg = f"API响应缺少id字段,响应内容: {resp_json}" logger.error(error_msg) return {"status": False, 'msg': error_msg} job_id = resp_json['id'] return {"data": job_id, 'status': True} except Exception as e: logger.error(e) return {"status": False, 'msg': str(e)} @staticmethod def query_task_result(job_id, timeout: int = 180, interval: int = 2, progress_callback=None): def query_status(t_id: str): resp_dict = {'status': False, 'data': None, 'msg': ''} try: headers = { 'Content-Type': 'application/json', 'Authorization': f'Bearer {api_key}', } response = requests.get(f'https://ark.cn-beijing.volces.com/api/v3/contents/generations/tasks/{t_id}', headers=headers) resp_json = response.json() resp_dict['status'] = resp_json['status'] == 'succeeded' resp_dict['msg'] = resp_json['status'] resp_dict['data'] = resp_json['content']['video_url'] if 'content' in resp_json else None except Exception as e: logger.error(e) resp_dict['msg'] = str(e) finally: return resp_dict end = time.time() + timeout final_result = {"status": False, "data": None, "msg": ""} success = False wait_count = 0 # 添加开始查询的日志 if progress_callback: progress_callback(f" 开始查询任务状态,任务ID: {job_id}") while time.time() < end: tmp_data = query_status(job_id) if tmp_data['status']: final_result['status'] = True final_result['data'] = tmp_data['data'] final_result['msg'] = 'succeeded' success = True if progress_callback: progress_callback(f" ✓ 视频生成完成!") break elif tmp_data['msg'] == 'running': wait_count += 1 elapsed = wait_count * interval remaining = max(0, timeout - elapsed) progress_msg = f" ⏳ 任务运行中,已等待{elapsed}秒,预计剩余{remaining}秒..." logger.info(progress_msg) if progress_callback: progress_callback(progress_msg) time.sleep(interval) elif tmp_data['msg'] == 'failed': final_result['msg'] = '任务执行失败' if progress_callback: progress_callback(f" ✗ 任务执行失败") break elif tmp_data['msg'] == 'pending' or tmp_data['msg'] == 'queued': wait_count += 1 elapsed = wait_count * interval remaining = max(0, timeout - elapsed) progress_msg = f" ⏳ 任务排队中,已等待{elapsed}秒,预计剩余{remaining}秒..." logger.info(progress_msg) if progress_callback: progress_callback(progress_msg) time.sleep(interval) else: # 其他未知状态继续等待 wait_count += 1 logger.info(f" 未知状态: {tmp_data['msg']},继续等待...") if progress_callback: progress_callback(f" ❔ 状态: {tmp_data['msg']},继续等待...") time.sleep(interval) if not success and final_result['msg'] == '': final_result['msg'] = '任务超时' if progress_callback: progress_callback(f" ⏰ 任务查询超时({timeout}秒)") return final_result @staticmethod def local_file_to_video(file_path: str, prompt: str, duration: str = '5', model_type: str = 'lite', timeout: int = 180, interval: int = 2, save_path: str = None, progress_callback=None): """ 将本地图片文件转换为视频 :param file_path: 本地图片文件路径 :param prompt: 视频生成提示词 :param duration: 视频时长 ('5' 或 '10') :param model_type: 模型类型 ('lite' 或 'pro') :param timeout: 任务超时时间(秒) :param interval: 查询间隔(秒) :param save_path: 视频保存目录 :return: {'status': bool, 'video_path': str, 'msg': str} """ result = {'status': False, 'video_path': '', 'msg': ''} try: # 检查文件是否存在 if not os.path.exists(file_path): result['msg'] = f'文件不存在: {file_path}' logger.error(result['msg']) return result # 步骤1: 上传图片到COS if progress_callback: progress_callback(" 📤 正在上传图片到云存储...") logger.info(f"正在上传图片到COS: {file_path}") cos_dict = VideoUtils.upload_file_to_cos(file_path) if not cos_dict['status']: result['msg'] = f"上传图片失败: {cos_dict['msg']}" logger.error(result['msg']) return result img_url = cos_dict['data'] if progress_callback: progress_callback(" ✓ 图片上传成功") logger.info(f"图片上传成功: {img_url}") # 步骤2: 提交视频生成任务 if progress_callback: progress_callback(" 🚀 正在提交视频生成任务...") logger.info("正在提交视频生成任务...") task_dict = VideoUtils.submit_task(prompt, img_url, duration, model_type) if not task_dict['status']: result['msg'] = f"提交任务失败: {task_dict['msg']}" logger.error(result['msg']) return result task_id = task_dict['data'] if progress_callback: progress_callback(f" ✓ 任务提交成功,任务ID: {task_id}") logger.info(f"任务提交成功,任务ID: {task_id}") # 步骤3: 查询任务结果 if progress_callback: progress_callback(" ⏳ 正在等待视频生成完成...") logger.info("正在等待视频生成完成...") status_dict = VideoUtils.query_task_result(task_id, timeout=timeout, interval=interval, progress_callback=progress_callback) if not status_dict['status']: result['msg'] = f"视频生成失败: {status_dict['msg']}" logger.error(result['msg']) return result video_url = status_dict['data'] logger.info(f"视频生成成功: {video_url}") # 步骤4: 下载视频到本地 if save_path: if progress_callback: progress_callback(" 📥 正在下载视频到本地...") logger.info("正在下载视频到本地...") video_path = VideoUtils.download_video(video_url, save_path) if video_path and os.path.exists(video_path): result['status'] = True result['video_path'] = video_path result['msg'] = '视频生成并下载成功' if progress_callback: progress_callback(f" ✓ 视频下载成功: {os.path.basename(video_path)}") logger.info(f"视频下载成功: {video_path}") else: result['msg'] = '视频下载失败' if progress_callback: progress_callback(" ✗ 视频下载失败") logger.error(result['msg']) else: result['status'] = True result['video_path'] = video_url result['msg'] = '视频生成成功' except Exception as e: result['msg'] = f'处理过程中发生异常: {str(e)}' logger.error(result['msg']) return result def run_actual_script_logic(image_folder, prompt, output_video_dir, video_duration, model_type, add_log_entry_func, other_params=None): add_log_entry_func(f"开始处理任务...") add_log_entry_func(f" 图片文件夹: {image_folder}") add_log_entry_func(f" 提示词: \n{prompt[:100]}{'...' if len(prompt) > 100 else ''}") add_log_entry_func(f" 视频保存目录: {output_video_dir}") add_log_entry_func(f" 视频时长: {video_duration}秒") add_log_entry_func(f" 模型类型: {model_type}") if other_params: for key, value in other_params.items(): add_log_entry_func(f" 额外参数 {key}: {value}") try: # 筛选图片文件 # img_extensions = ['.jpg', '.jpeg', '.png', '.bmp', '.gif', '.tiff', '.webp'] img_list = glob.glob(f'{image_folder}/*') add_log_entry_func(f'目录下找到 {len(img_list)} 张图片') if len(img_list) == 0: add_log_entry_func("警告: 未找到任何图片文件") return # 解析提示词列表 prompt_lines = [line.strip() for line in prompt.split('\n') if line.strip()] if not prompt_lines: add_log_entry_func("错误: 提示词不能为空") return add_log_entry_func(f'解析到 {len(prompt_lines)} 个提示词,将循环使用') # 确保输出目录存在 if not os.path.exists(output_video_dir): os.makedirs(output_video_dir) add_log_entry_func(f"创建输出目录: {output_video_dir}") success_count = 0 failed_count = 0 # 处理每个图片文件 for i, img_path in enumerate(img_list): img_name = os.path.basename(img_path) # 使用循环提示词 current_prompt = prompt_lines[i % len(prompt_lines)] add_log_entry_func(f"[{i + 1}/{len(img_list)}] 处理图片: {img_name}") add_log_entry_func( f" 使用提示词[{i % len(prompt_lines) + 1}]: {current_prompt[:50]}{'...' if len(current_prompt) > 50 else ''}") try: # 调用视频生成功能 result = VideoUtils.local_file_to_video( file_path=img_path, prompt=current_prompt, duration=str(video_duration), model_type=model_type, timeout=300, # 5分钟超时 interval=3, # 3秒检查一次 save_path=output_video_dir, progress_callback=add_log_entry_func ) if result and result.get('status'): success_count += 1 video_path = result.get('video_path', '') add_log_entry_func(f" ✓ 视频生成成功: {os.path.basename(video_path)}") else: failed_count += 1 error_msg = result.get('msg', '未知错误') if result else '处理失败' add_log_entry_func(f" ✗ 视频生成失败: {error_msg}") except Exception as e: failed_count += 1 add_log_entry_func(f" ✗ 处理图片时出错: {str(e)}") # 输出最终统计 add_log_entry_func("=" * 50) add_log_entry_func(f"处理完成!成功: {success_count}, 失败: {failed_count}") if success_count > 0: add_log_entry_func(f"生成的视频已保存到: {output_video_dir}") except Exception as e: add_log_entry_func(f"处理过程中发生错误: {str(e)}") finally: add_log_entry_func("任务处理结束。") # --- 核心处理函数结束 --- class App: def __init__(self, root): self.root = root self.root.title("视频生成工具") self.root.geometry("700x650") self.root.resizable(False, False) # --- 变量 --- self.image_folder_var = tk.StringVar() self.output_video_dir_var = tk.StringVar() self.video_duration_var = tk.IntVar(value=5) self.model_type_var = tk.StringVar(value='lite') # --- 主框架 --- main_frame = ttk.Frame(root, padding="10") main_frame.grid(row=0, column=0, sticky=(tk.W, tk.E, tk.N, tk.S)) root.grid_rowconfigure(0, weight=1) root.grid_columnconfigure(0, weight=1) # --- 组件 --- row_idx = 0 general_pady = 5 # 1. 选择图片文件夹 ttk.Label(main_frame, text="选择图片文件夹:").grid(row=row_idx, column=0, sticky=tk.W, pady=(general_pady, 2)) self.image_folder_entry = ttk.Entry(main_frame, textvariable=self.image_folder_var, width=50) self.image_folder_entry.grid(row=row_idx, column=1, sticky=(tk.W, tk.E), pady=(general_pady, 2), padx=5) ttk.Button(main_frame, text="浏览...", command=self.select_image_folder).grid(row=row_idx, column=2, sticky=tk.E, pady=(general_pady, 2)) row_idx += 1 # 2. 保存视频存储目录 ttk.Label(main_frame, text="视频保存目录:").grid(row=row_idx, column=0, sticky=tk.W, pady=general_pady) self.output_video_dir_entry = ttk.Entry(main_frame, textvariable=self.output_video_dir_var, width=50) self.output_video_dir_entry.grid(row=row_idx, column=1, sticky=(tk.W, tk.E), pady=general_pady, padx=5) ttk.Button(main_frame, text="浏览...", command=self.select_output_video_dir).grid(row=row_idx, column=2, sticky=tk.E, pady=general_pady) row_idx += 1 # 3. 生图提示词 ttk.Label(main_frame, text="生成视频提示词:").grid(row=row_idx, column=0, sticky=(tk.W, tk.NW), pady=(general_pady, 2)) self.prompt_text = scrolledtext.ScrolledText(main_frame, wrap=tk.WORD, width=60, height=8) self.prompt_text.grid(row=row_idx, column=1, columnspan=2, sticky=(tk.W, tk.E), pady=(general_pady, 2), padx=5) # 设置默认提示词 self.prompt_text.insert("1.0", default_prompt_str) row_idx += 1 # 4. 视频时长 duration_frame = ttk.LabelFrame(main_frame, text="视频时长") duration_frame.grid(row=row_idx, column=0, columnspan=3, sticky=(tk.W, tk.E), pady=general_pady, padx=5) ttk.Radiobutton(duration_frame, text="5秒", variable=self.video_duration_var, value=5).pack(side=tk.LEFT, padx=10, pady=5) ttk.Radiobutton(duration_frame, text="10秒", variable=self.video_duration_var, value=10).pack(side=tk.LEFT, padx=10, pady=5) row_idx += 1 # 5. 模型类型 model_frame = ttk.LabelFrame(main_frame, text="模型类型") model_frame.grid(row=row_idx, column=0, columnspan=3, sticky=(tk.W, tk.E), pady=general_pady, padx=5) ttk.Radiobutton(model_frame, text="lite", variable=self.model_type_var, value='lite').pack(side=tk.LEFT, padx=10, pady=5) ttk.Radiobutton(model_frame, text="pro", variable=self.model_type_var, value='pro').pack(side=tk.LEFT, padx=10, pady=5) row_idx += 1 # 6. 运行按钮 (新位置) self.run_button = ttk.Button(main_frame, text="运行", command=self.start_processing_thread) # 增加上下边距,使其与上下组件有明显区隔 self.run_button.grid(row=row_idx, column=0, columnspan=3, pady=(general_pady + 10, general_pady + 5)) row_idx += 1 # 7. 运行日志 Label ttk.Label(main_frame, text="运行日志:").grid(row=row_idx, column=0, sticky=tk.W, pady=(general_pady, 0)) # 调整pady使其靠近下方的Text row_idx += 1 # 8. 运行日志 ScrolledText self.log_text = scrolledtext.ScrolledText(main_frame, wrap=tk.WORD, width=70, height=15, state='disabled') self.log_text.grid(row=row_idx, column=0, columnspan=3, sticky=(tk.W, tk.E, tk.N, tk.S), pady=(2, general_pady), padx=5) main_frame.grid_rowconfigure(row_idx, weight=1) # 让日志区域可以扩展 # row_idx += 1 # 最后一个组件后面不需要再增加row_idx,除非还要添加东西 # 配置列的伸缩 main_frame.grid_columnconfigure(1, weight=1) def select_image_folder(self): folder_selected = filedialog.askdirectory() if folder_selected: self.image_folder_var.set(folder_selected) self.add_log_entry(f"选择图片文件夹: {folder_selected}") def select_output_video_dir(self): folder_selected = filedialog.askdirectory() if folder_selected: self.output_video_dir_var.set(folder_selected) self.add_log_entry(f"选择视频保存目录: {folder_selected}") def add_log_entry(self, message): self.log_text.configure(state='normal') self.log_text.insert(tk.END, message + "\n") self.log_text.configure(state='disabled') self.log_text.see(tk.END) def start_processing_thread(self): image_folder = self.image_folder_var.get() prompt = self.prompt_text.get("1.0", tk.END).strip() output_video_dir = self.output_video_dir_var.get() video_duration = self.video_duration_var.get() model_type = self.model_type_var.get() other_params = {} if not image_folder: messagebox.showerror("错误", "请选择图片文件夹!") return if not prompt: messagebox.showerror("错误", "请输入生图提示词!") return if not output_video_dir: messagebox.showerror("错误", "请选择视频保存目录!") return self.add_log_entry("=" * 30) self.run_button.config(state="disabled") thread = threading.Thread(target=self.run_script_in_background, args=(image_folder, prompt, output_video_dir, video_duration, model_type, other_params)) thread.daemon = True thread.start() def run_script_in_background(self, image_folder, prompt, output_video_dir, video_duration, model_type, other_params): try: run_actual_script_logic( image_folder, prompt, output_video_dir, video_duration, model_type, self.add_log_entry, other_params ) except Exception as e: self.add_log_entry(f"线程中发生未捕获的错误: {e}") self.root.after(0, lambda: messagebox.showerror("线程错误", f"处理过程中发生错误:\n{e}")) finally: self.root.after(0, self.enable_run_button) def enable_run_button(self): self.run_button.config(state="normal") if __name__ == "__main__": root = tk.Tk() app = App(root) root.mainloop()