mixvideo-v2/apps/desktop/src-tauri/test_batch_detail_api.py

196 lines
7.0 KiB
Python

#!/usr/bin/env python3
"""
测试 get_batch_task_detail API 的简单脚本
"""
import sqlite3
import json
import uuid
from datetime import datetime, timezone
def create_test_data():
"""创建测试数据"""
# 连接到数据库
db_path = r"C:\Users\imeep\AppData\Roaming\mixvideo\mixvideoV2.db"
conn = sqlite3.connect(db_path)
cursor = conn.cursor()
try:
# 创建一个测试批量任务
batch_id = f"test_batch_{uuid.uuid4().hex[:8]}"
workflow_name = "test_workflow"
total_count = 5
print(f"创建测试批量任务: {batch_id}")
# 插入批量任务
cursor.execute("""
INSERT INTO uni_comfyui_batch_task
(batch_id, workflow_name, total_count, completed_count, failed_count, status, created_at)
VALUES (?, ?, ?, ?, ?, ?, ?)
""", (batch_id, workflow_name, total_count, 3, 1, 'running', datetime.now(timezone.utc).strftime('%Y-%m-%d %H:%M:%S.%f')))
# 创建子任务
tasks = []
for i in range(total_count):
workflow_run_id = f"run_{uuid.uuid4().hex[:8]}"
if i < 3: # 前3个任务完成
status = 'completed'
started_at = datetime.now(timezone.utc).strftime('%Y-%m-%d %H:%M:%S.%f')
completed_at = datetime.now(timezone.utc).strftime('%Y-%m-%d %H:%M:%S.%f')
elif i == 3: # 第4个任务失败
status = 'failed'
started_at = datetime.now(timezone.utc).strftime('%Y-%m-%d %H:%M:%S.%f')
completed_at = datetime.now(timezone.utc).strftime('%Y-%m-%d %H:%M:%S.%f')
else: # 第5个任务运行中
status = 'running'
started_at = datetime.now(timezone.utc).strftime('%Y-%m-%d %H:%M:%S.%f')
completed_at = None
# 插入任务
cursor.execute("""
INSERT INTO uni_comfyui_task
(workflow_run_id, workflow_name, task_type, status, input_params,
result_data, error_message, created_at, started_at, completed_at, server_url)
VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
""", (
workflow_run_id, workflow_name, 'batch', status,
json.dumps({"test_param": f"value_{i}"}),
json.dumps({"output": f"result_{i}"}) if status == 'completed' else None,
f"Test error for task {i}" if status == 'failed' else None,
datetime.now(timezone.utc).strftime('%Y-%m-%d %H:%M:%S.%f'),
started_at,
completed_at,
"http://192.168.0.148:18000"
))
# 获取任务ID
task_id = cursor.lastrowid
# 插入批量任务子项
cursor.execute("""
INSERT INTO uni_comfyui_batch_item (batch_id, task_id, item_index)
VALUES (?, ?, ?)
""", (batch_id, task_id, i))
tasks.append({
'task_id': task_id,
'workflow_run_id': workflow_run_id,
'status': status
})
conn.commit()
print(f"✅ 测试数据创建成功!")
print(f"批量任务ID: {batch_id}")
print(f"子任务数量: {len(tasks)}")
for task in tasks:
print(f" - 任务 {task['task_id']}: {task['status']}")
return batch_id
except Exception as e:
conn.rollback()
print(f"❌ 创建测试数据失败: {e}")
return None
finally:
conn.close()
def check_test_data(batch_id):
"""检查测试数据"""
db_path = r"C:\Users\imeep\AppData\Roaming\mixvideo\mixvideoV2.db"
conn = sqlite3.connect(db_path)
cursor = conn.cursor()
try:
# 查询批量任务
cursor.execute("SELECT * FROM uni_comfyui_batch_task WHERE batch_id = ?", (batch_id,))
batch_task = cursor.fetchone()
print(f"\n批量任务信息: {batch_task}")
# 查询子任务
cursor.execute("""
SELECT t.*, bi.item_index
FROM uni_comfyui_task t
JOIN uni_comfyui_batch_item bi ON t.id = bi.task_id
WHERE bi.batch_id = ?
ORDER BY bi.item_index
""", (batch_id,))
tasks = cursor.fetchall()
print(f"\n子任务数量: {len(tasks)}")
for task in tasks:
print(f" - 任务 {task[0]}: {task[4]} (索引: {task[-1]})")
# 查询失败任务
cursor.execute("""
SELECT COUNT(*) FROM uni_comfyui_task t
JOIN uni_comfyui_batch_item bi ON t.id = bi.task_id
WHERE bi.batch_id = ? AND t.status = 'failed'
""", (batch_id,))
failed_count = cursor.fetchone()[0]
print(f"\n失败任务数量: {failed_count}")
# 查询完成任务
cursor.execute("""
SELECT COUNT(*) FROM uni_comfyui_task t
JOIN uni_comfyui_batch_item bi ON t.id = bi.task_id
WHERE bi.batch_id = ? AND t.status = 'completed'
""", (batch_id,))
completed_count = cursor.fetchone()[0]
print(f"完成任务数量: {completed_count}")
except Exception as e:
print(f"❌ 检查测试数据失败: {e}")
finally:
conn.close()
def cleanup_test_data():
"""清理测试数据"""
db_path = r"C:\Users\imeep\AppData\Roaming\mixvideo\mixvideoV2.db"
conn = sqlite3.connect(db_path)
cursor = conn.cursor()
try:
# 删除测试批量任务
cursor.execute("DELETE FROM uni_comfyui_batch_task WHERE batch_id LIKE 'test_batch_%'")
batch_deleted = cursor.rowcount
# 删除测试任务
cursor.execute("DELETE FROM uni_comfyui_task WHERE workflow_run_id LIKE 'run_%'")
task_deleted = cursor.rowcount
# 删除测试批量任务子项
cursor.execute("DELETE FROM uni_comfyui_batch_item WHERE batch_id LIKE 'test_batch_%'")
item_deleted = cursor.rowcount
conn.commit()
print(f"✅ 清理完成: 删除了 {batch_deleted} 个批量任务, {task_deleted} 个任务, {item_deleted} 个子项")
except Exception as e:
conn.rollback()
print(f"❌ 清理失败: {e}")
finally:
conn.close()
if __name__ == "__main__":
print("=== UniComfyUI 批量任务详情 API 测试 ===")
# 清理旧的测试数据
print("\n1. 清理旧的测试数据...")
cleanup_test_data()
# 创建新的测试数据
print("\n2. 创建测试数据...")
batch_id = create_test_data()
if batch_id:
# 检查测试数据
print("\n3. 检查测试数据...")
check_test_data(batch_id)
print(f"\n✅ 测试数据准备完成!")
print(f"现在可以使用以下批量任务ID测试API: {batch_id}")
print(f"前端可以调用: get_batch_task_detail('{batch_id}')")
else:
print("\n❌ 测试数据创建失败!")