#!/usr/bin/env python3 """ 测试 get_batch_task_detail API 的简单脚本 """ import sqlite3 import json import uuid from datetime import datetime, timezone def create_test_data(): """创建测试数据""" # 连接到数据库 db_path = r"C:\Users\imeep\AppData\Roaming\mixvideo\mixvideoV2.db" conn = sqlite3.connect(db_path) cursor = conn.cursor() try: # 创建一个测试批量任务 batch_id = f"test_batch_{uuid.uuid4().hex[:8]}" workflow_name = "test_workflow" total_count = 5 print(f"创建测试批量任务: {batch_id}") # 插入批量任务 cursor.execute(""" INSERT INTO uni_comfyui_batch_task (batch_id, workflow_name, total_count, completed_count, failed_count, status, created_at) VALUES (?, ?, ?, ?, ?, ?, ?) """, (batch_id, workflow_name, total_count, 3, 1, 'running', datetime.now(timezone.utc).strftime('%Y-%m-%d %H:%M:%S.%f'))) # 创建子任务 tasks = [] for i in range(total_count): workflow_run_id = f"run_{uuid.uuid4().hex[:8]}" if i < 3: # 前3个任务完成 status = 'completed' started_at = datetime.now(timezone.utc).strftime('%Y-%m-%d %H:%M:%S.%f') completed_at = datetime.now(timezone.utc).strftime('%Y-%m-%d %H:%M:%S.%f') elif i == 3: # 第4个任务失败 status = 'failed' started_at = datetime.now(timezone.utc).strftime('%Y-%m-%d %H:%M:%S.%f') completed_at = datetime.now(timezone.utc).strftime('%Y-%m-%d %H:%M:%S.%f') else: # 第5个任务运行中 status = 'running' started_at = datetime.now(timezone.utc).strftime('%Y-%m-%d %H:%M:%S.%f') completed_at = None # 插入任务 cursor.execute(""" INSERT INTO uni_comfyui_task (workflow_run_id, workflow_name, task_type, status, input_params, result_data, error_message, created_at, started_at, completed_at, server_url) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?) """, ( workflow_run_id, workflow_name, 'batch', status, json.dumps({"test_param": f"value_{i}"}), json.dumps({"output": f"result_{i}"}) if status == 'completed' else None, f"Test error for task {i}" if status == 'failed' else None, datetime.now(timezone.utc).strftime('%Y-%m-%d %H:%M:%S.%f'), started_at, completed_at, "http://192.168.0.148:18000" )) # 获取任务ID task_id = cursor.lastrowid # 插入批量任务子项 cursor.execute(""" INSERT INTO uni_comfyui_batch_item (batch_id, task_id, item_index) VALUES (?, ?, ?) """, (batch_id, task_id, i)) tasks.append({ 'task_id': task_id, 'workflow_run_id': workflow_run_id, 'status': status }) conn.commit() print(f"✅ 测试数据创建成功!") print(f"批量任务ID: {batch_id}") print(f"子任务数量: {len(tasks)}") for task in tasks: print(f" - 任务 {task['task_id']}: {task['status']}") return batch_id except Exception as e: conn.rollback() print(f"❌ 创建测试数据失败: {e}") return None finally: conn.close() def check_test_data(batch_id): """检查测试数据""" db_path = r"C:\Users\imeep\AppData\Roaming\mixvideo\mixvideoV2.db" conn = sqlite3.connect(db_path) cursor = conn.cursor() try: # 查询批量任务 cursor.execute("SELECT * FROM uni_comfyui_batch_task WHERE batch_id = ?", (batch_id,)) batch_task = cursor.fetchone() print(f"\n批量任务信息: {batch_task}") # 查询子任务 cursor.execute(""" SELECT t.*, bi.item_index FROM uni_comfyui_task t JOIN uni_comfyui_batch_item bi ON t.id = bi.task_id WHERE bi.batch_id = ? ORDER BY bi.item_index """, (batch_id,)) tasks = cursor.fetchall() print(f"\n子任务数量: {len(tasks)}") for task in tasks: print(f" - 任务 {task[0]}: {task[4]} (索引: {task[-1]})") # 查询失败任务 cursor.execute(""" SELECT COUNT(*) FROM uni_comfyui_task t JOIN uni_comfyui_batch_item bi ON t.id = bi.task_id WHERE bi.batch_id = ? AND t.status = 'failed' """, (batch_id,)) failed_count = cursor.fetchone()[0] print(f"\n失败任务数量: {failed_count}") # 查询完成任务 cursor.execute(""" SELECT COUNT(*) FROM uni_comfyui_task t JOIN uni_comfyui_batch_item bi ON t.id = bi.task_id WHERE bi.batch_id = ? AND t.status = 'completed' """, (batch_id,)) completed_count = cursor.fetchone()[0] print(f"完成任务数量: {completed_count}") except Exception as e: print(f"❌ 检查测试数据失败: {e}") finally: conn.close() def cleanup_test_data(): """清理测试数据""" db_path = r"C:\Users\imeep\AppData\Roaming\mixvideo\mixvideoV2.db" conn = sqlite3.connect(db_path) cursor = conn.cursor() try: # 删除测试批量任务 cursor.execute("DELETE FROM uni_comfyui_batch_task WHERE batch_id LIKE 'test_batch_%'") batch_deleted = cursor.rowcount # 删除测试任务 cursor.execute("DELETE FROM uni_comfyui_task WHERE workflow_run_id LIKE 'run_%'") task_deleted = cursor.rowcount # 删除测试批量任务子项 cursor.execute("DELETE FROM uni_comfyui_batch_item WHERE batch_id LIKE 'test_batch_%'") item_deleted = cursor.rowcount conn.commit() print(f"✅ 清理完成: 删除了 {batch_deleted} 个批量任务, {task_deleted} 个任务, {item_deleted} 个子项") except Exception as e: conn.rollback() print(f"❌ 清理失败: {e}") finally: conn.close() if __name__ == "__main__": print("=== UniComfyUI 批量任务详情 API 测试 ===") # 清理旧的测试数据 print("\n1. 清理旧的测试数据...") cleanup_test_data() # 创建新的测试数据 print("\n2. 创建测试数据...") batch_id = create_test_data() if batch_id: # 检查测试数据 print("\n3. 检查测试数据...") check_test_data(batch_id) print(f"\n✅ 测试数据准备完成!") print(f"现在可以使用以下批量任务ID测试API: {batch_id}") print(f"前端可以调用: get_batch_task_detail('{batch_id}')") else: print("\n❌ 测试数据创建失败!")