This commit is contained in:
root 2025-07-12 22:19:39 +08:00
parent 2a9199ca16
commit 3f5af3404b
5 changed files with 13 additions and 456 deletions

View File

@ -1,226 +0,0 @@
#!/usr/bin/env python3
"""
Test Batch Scene Detection and Splitting Workflow
测试批量场景检测和切分工作流
"""
import sys
import json
import tempfile
from pathlib import Path
# 添加项目根目录到Python路径
project_root = Path(__file__).parent.parent
sys.path.insert(0, str(project_root))
from python_core.scene_detection import SceneDetector, DetectorType, OutputFormat
def test_batch_workflow():
"""测试批量工作流"""
print("🚀 测试批量场景检测和切分工作流")
print("=" * 60)
try:
# 创建检测器
detector = SceneDetector()
# 准备测试视频
test_videos = [
Path("assets/1/1752032011698.mp4")
]
# 检查测试视频是否存在
existing_videos = [v for v in test_videos if v.exists()]
if not existing_videos:
print("❌ 没有找到测试视频文件")
print("💡 请确保 assets/1/1752032011698.mp4 文件存在")
return False
print(f"📹 找到 {len(existing_videos)} 个测试视频:")
for video in existing_videos:
print(f"{video}")
# 创建临时输出目录
with tempfile.TemporaryDirectory() as temp_dir:
output_dir = Path(temp_dir) / "batch_output"
print(f"\n📂 输出目录: {output_dir}")
# 执行批量处理
print("\n🔄 开始批量处理...")
result = detector.batch_detect_and_split(
video_paths=existing_videos,
output_base_dir=output_dir,
detector_type=DetectorType.CONTENT,
threshold=30.0,
min_scene_length=1.0,
output_format=OutputFormat.JSON,
enable_ai_analysis=False,
enable_video_splitting=True,
max_concurrent=1, # 使用单线程避免资源竞争
continue_on_error=True
)
# 检查结果
if result.get("workflow_state") == "completed":
print("✅ 批量处理完成!")
# 显示摘要
batch_results = result.get("batch_results", {})
print(f"\n📊 处理摘要:")
print(f" 总视频数: {batch_results.get('total_videos', 0)}")
print(f" 成功处理: {batch_results.get('completed_videos', 0)}")
print(f" 处理失败: {batch_results.get('failed_videos', 0)}")
print(f" 成功率: {batch_results.get('success_rate', 0):.1f}%")
# 显示每个任务的详细信息
tasks = batch_results.get('tasks', [])
print(f"\n📋 任务详情:")
for i, task in enumerate(tasks, 1):
video_name = Path(task['video_path']).name
status = task['status']
scenes = task.get('total_scenes', 0)
splits = task.get('split_count', 0)
proc_time = task.get('processing_time', 0)
print(f" 任务 {i}: {video_name}")
print(f" 状态: {status}")
print(f" 场景数: {scenes}")
print(f" 切分数: {splits}")
print(f" 处理时间: {proc_time:.2f}s")
if task.get('error'):
print(f" 错误: {task['error']}")
# 检查输出文件
if task.get('output_dir'):
output_path = Path(task['output_dir'])
if output_path.exists():
print(f" 输出目录: {output_path}")
# 检查场景检测结果文件
scenes_file = output_path / "scenes.json"
if scenes_file.exists():
print(f" ✅ 场景检测结果: {scenes_file}")
# 检查切分目录
scenes_dir = output_path / "scenes"
if scenes_dir.exists():
split_files = list(scenes_dir.glob("*.mp4"))
print(f" ✅ 切分文件: {len(split_files)}")
for split_file in split_files[:3]: # 只显示前3个
size_mb = split_file.stat().st_size / (1024 * 1024)
print(f"{split_file.name} ({size_mb:.1f}MB)")
if len(split_files) > 3:
print(f" ... 还有 {len(split_files) - 3} 个文件")
# 检查切分摘要
summary_file = output_path / "split_summary.json"
if summary_file.exists():
print(f" ✅ 切分摘要: {summary_file}")
try:
with open(summary_file, 'r', encoding='utf-8') as f:
summary_data = json.load(f)
print(f" 成功切分: {summary_data.get('successful_splits', 0)}")
print(f" 失败切分: {summary_data.get('failed_splits', 0)}")
print(f" 总输出大小: {summary_data.get('total_output_size', 0):,} bytes")
except Exception as e:
print(f" ⚠️ 读取摘要失败: {e}")
return True
else:
print("❌ 批量处理失败")
errors = result.get("errors", [])
if errors:
print("错误信息:")
for error in errors:
print(f"{error}")
return False
except Exception as e:
print(f"❌ 测试异常: {e}")
import traceback
print(f"详细错误: {traceback.format_exc()}")
return False
def test_cli_batch_command():
"""测试CLI批量命令"""
print("\n🧪 测试CLI批量命令")
print("=" * 60)
try:
# 检查CLI帮助
import subprocess
result = subprocess.run(
['python3', '-m', 'python_core.cli', 'scene', 'batch', '--help'],
capture_output=True,
text=True,
cwd=project_root
)
if result.returncode == 0:
print("✅ CLI批量命令帮助正常")
print("📋 命令帮助预览:")
help_lines = result.stdout.split('\n')[:10] # 只显示前10行
for line in help_lines:
if line.strip():
print(f" {line}")
if len(result.stdout.split('\n')) > 10:
print(" ...")
return True
else:
print("❌ CLI批量命令帮助失败")
print(f"错误: {result.stderr}")
return False
except Exception as e:
print(f"❌ CLI测试异常: {e}")
return False
def main():
"""主函数"""
print("🚀 开始测试批量场景检测和切分工作流")
print("=" * 80)
tests = [
("批量工作流功能", test_batch_workflow),
("CLI批量命令", test_cli_batch_command),
]
passed = 0
total = len(tests)
for test_name, test_func in tests:
try:
if test_func():
passed += 1
print(f"{test_name} - 通过")
else:
print(f"{test_name} - 失败")
except Exception as e:
print(f"{test_name} - 异常: {e}")
print("\n" + "=" * 80)
print(f"🎉 测试完成: {passed}/{total} 通过")
if passed == total:
print("🎊 所有测试都通过了!批量工作流开发成功!")
print("\n💡 使用方法:")
print(" # 批量处理目录中的所有视频")
print(" python3 -m python_core.cli scene batch input_dir output_dir")
print(" ")
print(" # 自定义参数")
print(" python3 -m python_core.cli scene batch input_dir output_dir \\")
print(" --threshold 15.0 --concurrent 4 --no-ai --split")
else:
print("⚠️ 部分测试失败,需要进一步调试")
return passed == total
if __name__ == "__main__":
success = main()
sys.exit(0 if success else 1)

View File

@ -1,154 +0,0 @@
#!/usr/bin/env python3
"""
Video Validation Tool
视频验证工具
验证切分后的视频文件是否有效
"""
import sys
import subprocess
from pathlib import Path
def validate_video_file(video_path: Path) -> dict:
"""验证单个视频文件"""
try:
# 使用ffprobe获取视频信息
cmd = [
'ffprobe',
'-v', 'quiet',
'-print_format', 'json',
'-show_format',
'-show_streams',
str(video_path)
]
result = subprocess.run(
cmd,
capture_output=True,
text=True,
timeout=30
)
if result.returncode == 0:
import json
data = json.loads(result.stdout)
# 提取基本信息
format_info = data.get('format', {})
streams = data.get('streams', [])
video_stream = None
audio_stream = None
for stream in streams:
if stream.get('codec_type') == 'video':
video_stream = stream
elif stream.get('codec_type') == 'audio':
audio_stream = stream
return {
'valid': True,
'file_size': int(format_info.get('size', 0)),
'duration': float(format_info.get('duration', 0)),
'bitrate': int(format_info.get('bit_rate', 0)),
'video_codec': video_stream.get('codec_name') if video_stream else None,
'video_resolution': f"{video_stream.get('width')}x{video_stream.get('height')}" if video_stream else None,
'video_fps': eval(video_stream.get('r_frame_rate', '0/1')) if video_stream else 0,
'audio_codec': audio_stream.get('codec_name') if audio_stream else None,
'audio_sample_rate': int(audio_stream.get('sample_rate', 0)) if audio_stream else 0,
'error': None
}
else:
return {
'valid': False,
'error': f"ffprobe failed: {result.stderr}"
}
except Exception as e:
return {
'valid': False,
'error': str(e)
}
def validate_split_videos(scenes_dir: Path):
"""验证切分目录中的所有视频"""
print(f"🔍 验证目录: {scenes_dir}")
if not scenes_dir.exists():
print(f"❌ 目录不存在: {scenes_dir}")
return False
# 查找所有mp4文件
video_files = list(scenes_dir.glob("*.mp4"))
if not video_files:
print(f"❌ 目录中没有找到mp4文件")
return False
print(f"📹 找到 {len(video_files)} 个视频文件")
valid_count = 0
total_size = 0
total_duration = 0
for video_file in sorted(video_files):
print(f"\n🎬 验证: {video_file.name}")
validation = validate_video_file(video_file)
if validation['valid']:
valid_count += 1
file_size = validation['file_size']
duration = validation['duration']
total_size += file_size
total_duration += duration
print(f" ✅ 有效")
print(f" 📏 大小: {file_size:,} bytes ({file_size/1024/1024:.1f} MB)")
print(f" ⏱️ 时长: {duration:.2f}")
print(f" 🎥 视频: {validation['video_codec']} {validation['video_resolution']} {validation['video_fps']:.1f}fps")
if validation['audio_codec']:
print(f" 🔊 音频: {validation['audio_codec']} {validation['audio_sample_rate']}Hz")
print(f" 📊 码率: {validation['bitrate']:,} bps")
else:
print(f" ❌ 无效: {validation['error']}")
print(f"\n📊 验证摘要:")
print(f" 有效文件: {valid_count}/{len(video_files)}")
print(f" 成功率: {valid_count/len(video_files)*100:.1f}%")
print(f" 总大小: {total_size:,} bytes ({total_size/1024/1024:.1f} MB)")
print(f" 总时长: {total_duration:.2f}")
return valid_count == len(video_files)
def main():
"""主函数"""
print("🚀 视频验证工具")
print("=" * 50)
# 验证最新的切分结果
test_scenes_dir = Path("/tmp/test_batch_output/1752032011698/scenes")
if test_scenes_dir.exists():
print("🎯 验证最新的批量切分结果")
success = validate_split_videos(test_scenes_dir)
if success:
print("\n🎉 所有视频文件都有效!切分成功!")
return 0
else:
print("\n⚠️ 部分视频文件无效")
return 1
else:
print("❌ 没有找到测试切分结果")
print("💡 请先运行批量切分命令:")
print(" python3 -m python_core.cli scene batch /tmp/test_batch_input /tmp/test_batch_output --verbose --no-ai")
return 1
if __name__ == "__main__":
sys.exit(main())

View File

@ -1,69 +0,0 @@
#!/usr/bin/env python3
"""
UserTablePostgres 使用示例
"""
import sys
import os
sys.path.append(os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
from python_core.database.user_postgres import UserTablePostgres
def main():
"""主函数"""
print("=== UserTablePostgres 使用示例 ===\n")
# 创建用户表实例
user_table = UserTablePostgres()
print("✅ 用户表初始化成功")
# 创建用户
print("\n1. 创建用户...")
user_id = user_table.create_user(
username="john_doe",
email="john@example.com",
password="secure_password_123",
display_name="John Doe"
)
print(f"用户创建成功ID: {user_id}")
# 获取用户信息
print("\n2. 获取用户信息...")
user = user_table.get_user_by_username("john_doe")
print(f"用户信息: {user['display_name']} ({user['email']})")
# 用户认证
print("\n3. 用户认证...")
auth_result = user_table.authenticate_user("john_doe", "secure_password_123")
if auth_result:
print(f"认证成功: {auth_result['username']}")
else:
print("认证失败")
# 更新用户信息
print("\n4. 更新用户信息...")
user_table.update_user(user_id, {
"display_name": "John Smith",
"avatar_url": "https://example.com/avatar.jpg"
})
print("用户信息更新成功")
# 获取用户统计
print("\n5. 用户统计...")
user_count = user_table.get_user_count()
print(f"总用户数: {user_count}")
# 搜索用户
print("\n6. 搜索用户...")
search_results = user_table.search_users("john")
print(f"搜索到 {len(search_results)} 个用户")
# 清理:删除测试用户
print("\n7. 清理测试数据...")
user_table.delete_user(user_id)
print("测试用户已删除")
print("\n✅ 示例完成")
if __name__ == "__main__":
main()

View File

@ -43,6 +43,7 @@ pub fn run() {
commands::get_templates,
commands::get_template,
commands::get_template_detail,
commands::get_template_detail_cli,
commands::delete_template,
commands::get_all_resource_categories,
commands::get_resource_category_by_id,

View File

@ -229,19 +229,24 @@ export class TemplateServiceV2 {
/**
* ()
* 使get_template_detail命令CLI版本还未实现详细信息功能
*/
static async getTemplateDetail(templateId: string): Promise<TemplateDetail | null> {
static async getTemplateDetail(templateId: string, userId?: string): Promise<TemplateDetail | null> {
try {
// 暂时使用原版本的命令直到CLI版本实现详细信息功能
const response = await invoke<string>('get_template_detail', { template_id: templateId })
const detail = JSON.parse(response)
const request: TemplateGetRequest = {
template_id: templateId,
user_id: userId || this.getCurrentUserId(),
verbose: false,
json_output: true
}
if (!detail) {
const response = await invoke<TemplateResponse>('get_template_detail_cli', { request })
if (!response.success) {
console.error('Get template detail failed:', response.error || response.message)
return null
}
return detail as TemplateDetail
return response.data as TemplateDetail
} catch (error) {
console.error('Get template detail failed:', error)
return null