#!/usr/bin/env python3 """ 直接测试数据库查询功能,验证我们的Repository方法是否正确 """ import sqlite3 import json from datetime import datetime def test_repository_queries(): """测试Repository查询方法""" # 连接到数据库 db_path = r"C:\Users\imeep\AppData\Roaming\mixvideo\mixvideoV2.db" conn = sqlite3.connect(db_path) cursor = conn.cursor() try: batch_id = "test_batch_1e75f2f3" print("=== 测试Repository查询方法 ===") # 1. 测试获取批量任务基本信息 print("\n1. 测试获取批量任务基本信息...") cursor.execute(""" SELECT id, batch_id, workflow_name, total_count, completed_count, failed_count, status, created_at, completed_at FROM uni_comfyui_batch_task WHERE batch_id = ? """, (batch_id,)) batch_task = cursor.fetchone() if batch_task: print(f"✅ 找到批量任务: {batch_task[1]}") print(f" 工作流: {batch_task[2]}") print(f" 总数: {batch_task[3]}, 完成: {batch_task[4]}, 失败: {batch_task[5]}") print(f" 状态: {batch_task[6]}") else: print("❌ 未找到批量任务") return # 2. 测试获取失败任务 print("\n2. 测试获取失败任务...") cursor.execute(""" SELECT t.id, t.workflow_run_id, t.workflow_name, t.task_type, t.status, t.input_params, t.result_data, t.error_message, t.created_at, t.started_at, t.completed_at, t.server_url FROM uni_comfyui_task t JOIN uni_comfyui_batch_item bi ON t.id = bi.task_id WHERE bi.batch_id = ? AND t.status = 'failed' ORDER BY t.created_at DESC """, (batch_id,)) failed_tasks = cursor.fetchall() print(f"✅ 找到 {len(failed_tasks)} 个失败任务") for task in failed_tasks: print(f" 任务ID: {task[0]}, 错误: {task[7]}") # 3. 测试获取完成任务样本 print("\n3. 测试获取完成任务样本...") cursor.execute(""" SELECT t.id, t.workflow_run_id, t.workflow_name, t.task_type, t.status, t.input_params, t.result_data, t.error_message, t.created_at, t.started_at, t.completed_at, t.server_url FROM uni_comfyui_task t JOIN uni_comfyui_batch_item bi ON t.id = bi.task_id WHERE bi.batch_id = ? AND t.status = 'completed' ORDER BY t.completed_at DESC LIMIT 10 """, (batch_id,)) completed_tasks = cursor.fetchall() print(f"✅ 找到 {len(completed_tasks)} 个完成任务样本") for task in completed_tasks: print(f" 任务ID: {task[0]}, 状态: {task[4]}") # 4. 测试获取所有任务 print("\n4. 测试获取所有任务...") cursor.execute(""" SELECT t.id, t.workflow_run_id, t.workflow_name, t.task_type, t.status, t.input_params, t.result_data, t.error_message, t.created_at, t.started_at, t.completed_at, t.server_url FROM uni_comfyui_task t JOIN uni_comfyui_batch_item bi ON t.id = bi.task_id WHERE bi.batch_id = ? ORDER BY bi.item_index ASC """, (batch_id,)) all_tasks = cursor.fetchall() print(f"✅ 找到 {len(all_tasks)} 个任务") # 5. 计算执行时间统计 print("\n5. 计算执行时间统计...") completed_with_times = [] for task in all_tasks: if task[4] == 'completed' and task[9] and task[10]: # status, started_at, completed_at try: started = datetime.fromisoformat(task[9].replace('Z', '+00:00')) completed = datetime.fromisoformat(task[10].replace('Z', '+00:00')) execution_time = (completed - started).total_seconds() completed_with_times.append(execution_time) print(f" 任务 {task[0]}: {execution_time:.2f}s") except Exception as e: print(f" 任务 {task[0]}: 时间解析错误 - {e}") if completed_with_times: total_time = sum(completed_with_times) avg_time = total_time / len(completed_with_times) print(f"✅ 执行时间统计:") print(f" 总执行时间: {total_time:.2f}s") print(f" 平均执行时间: {avg_time:.2f}s") print(f" 完成任务数: {len(completed_with_times)}") else: print("⚠️ 没有找到有效的执行时间数据") # 6. 验证数据完整性 print("\n6. 验证数据完整性...") status_counts = {} for task in all_tasks: status = task[4] status_counts[status] = status_counts.get(status, 0) + 1 print(f"✅ 任务状态统计:") for status, count in status_counts.items(): print(f" {status}: {count}") # 验证与批量任务记录的一致性 expected_completed = batch_task[4] expected_failed = batch_task[5] actual_completed = status_counts.get('completed', 0) actual_failed = status_counts.get('failed', 0) if actual_completed == expected_completed and actual_failed == expected_failed: print("✅ 数据一致性检查通过") else: print(f"❌ 数据不一致: 期望完成{expected_completed}个,实际{actual_completed}个;期望失败{expected_failed}个,实际{actual_failed}个") print("\n🎉 所有查询测试完成!") except Exception as e: print(f"❌ 测试失败: {e}") import traceback traceback.print_exc() finally: conn.close() def test_batch_task_detail_structure(): """测试批量任务详情的数据结构""" print("\n=== 测试批量任务详情数据结构 ===") # 模拟我们API返回的数据结构 batch_detail = { "batch_id": "test_batch_ad27bcac", "workflow_name": "test_workflow", "total_count": 5, "completed_count": 3, "failed_count": 1, "running_count": 1, "pending_count": 0, "status": "running", "progress_percentage": 80.0, "created_at": "2025-08-20T18:25:37.179221Z", "input_params_summary": "批量处理 5 个任务", "failed_tasks": [ { "id": 9, "workflow_run_id": "run_12345", "workflow_name": "test_workflow", "task_type": "batch", "status": "failed", "error_message": "Test error for task 3", "created_at": "2025-08-20T18:25:37.179221Z" } ], "completed_tasks_sample": [ { "id": 6, "workflow_run_id": "run_67890", "workflow_name": "test_workflow", "task_type": "batch", "status": "completed", "created_at": "2025-08-20T18:25:37.179221Z" } ], "total_execution_time_seconds": 0.0, "average_task_time_seconds": 0.0 } print("✅ 批量任务详情数据结构:") print(json.dumps(batch_detail, indent=2, ensure_ascii=False)) # 验证必要字段 required_fields = [ "batch_id", "workflow_name", "total_count", "completed_count", "failed_count", "running_count", "pending_count", "status", "progress_percentage", "created_at", "failed_tasks", "completed_tasks_sample", "total_execution_time_seconds", "average_task_time_seconds" ] missing_fields = [field for field in required_fields if field not in batch_detail] if missing_fields: print(f"❌ 缺少必要字段: {missing_fields}") else: print("✅ 所有必要字段都存在") if __name__ == "__main__": test_repository_queries() test_batch_task_detail_structure()