This commit is contained in:
root 2025-07-11 21:55:34 +08:00
parent 0b7e3fb07a
commit d0881dc16e
3 changed files with 449 additions and 10 deletions

View File

@ -0,0 +1,5 @@
#!/usr/bin/env python3
from .cli import main
if __name__ == "__main__":
main()

View File

@ -0,0 +1,434 @@
#!/usr/bin/env python3
"""
场景检测服务
"""
import os
import time
import json
import csv
from pathlib import Path
from typing import List, Optional, Callable
from .types import (
DetectorType, SceneInfo, VideoSceneResult,
BatchDetectionConfig, BatchDetectionResult, DetectionStats
)
from python_core.utils.logger import logger
class SceneDetectionService:
"""场景检测服务"""
def __init__(self):
self.supported_formats = {'.mp4', '.avi', '.mov', '.mkv', '.wmv', '.flv', '.webm', '.m4v'}
def detect_single_video(self, video_path: str, config: BatchDetectionConfig) -> VideoSceneResult:
"""检测单个视频的场景"""
start_time = time.time()
filename = os.path.basename(video_path)
try:
# 获取场景变化点
scene_changes = self._detect_scene_changes(video_path, config)
# 获取视频总时长
total_duration = self._get_video_duration(video_path)
# 构建场景信息
scenes = []
for i in range(len(scene_changes) - 1):
start_time_scene = scene_changes[i]
end_time_scene = scene_changes[i + 1]
duration = end_time_scene - start_time_scene
# 跳过太短的场景
if duration < config.min_scene_length:
continue
scene = SceneInfo(
index=len(scenes),
start_time=start_time_scene,
end_time=end_time_scene,
duration=duration,
confidence=1.0, # 简化版本,固定置信度
frame_count=int(duration * 25) # 假设25fps
)
scenes.append(scene)
detection_time = time.time() - start_time
return VideoSceneResult(
video_path=video_path,
filename=filename,
success=True,
total_scenes=len(scenes),
total_duration=total_duration,
scenes=scenes,
detection_time=detection_time,
detector_type=config.detector_type.value,
threshold=config.threshold
)
except Exception as e:
detection_time = time.time() - start_time
logger.error(f"Failed to detect scenes in {filename}: {e}")
return VideoSceneResult(
video_path=video_path,
filename=filename,
success=False,
total_scenes=0,
total_duration=0.0,
scenes=[],
detection_time=detection_time,
detector_type=config.detector_type.value,
threshold=config.threshold,
error=str(e)
)
def batch_detect_scenes(self, input_directory: str, config: BatchDetectionConfig,
progress_callback: Optional[Callable] = None) -> BatchDetectionResult:
"""批量检测场景"""
start_time = time.time()
# 扫描视频文件
video_files = self._scan_video_files(input_directory)
if not video_files:
return BatchDetectionResult(
total_files=0,
processed_files=0,
failed_files=0,
total_scenes=0,
total_duration=0.0,
average_scenes_per_video=0.0,
detection_time=0.0,
results=[],
failed_list=[],
config=config
)
results = []
failed_list = []
total_scenes = 0
total_duration = 0.0
for i, video_path in enumerate(video_files):
filename = os.path.basename(video_path)
# 报告进度
if progress_callback:
progress_callback(f"检测场景: {filename} ({i+1}/{len(video_files)})")
# 检测单个视频
result = self.detect_single_video(video_path, config)
if result.success:
results.append(result)
total_scenes += result.total_scenes
total_duration += result.total_duration
else:
failed_list.append({
'filename': filename,
'path': video_path,
'error': result.error
})
detection_time = time.time() - start_time
processed_files = len(results)
failed_files = len(failed_list)
average_scenes = total_scenes / processed_files if processed_files > 0 else 0.0
return BatchDetectionResult(
total_files=len(video_files),
processed_files=processed_files,
failed_files=failed_files,
total_scenes=total_scenes,
total_duration=total_duration,
average_scenes_per_video=average_scenes,
detection_time=detection_time,
results=results,
failed_list=failed_list,
config=config
)
def _detect_scene_changes(self, video_path: str, config: BatchDetectionConfig) -> List[float]:
"""检测场景变化点"""
try:
# 优先使用PySceneDetect
return self._detect_with_pyscenedetect(video_path, config)
except Exception:
try:
# 回退到OpenCV
return self._detect_with_opencv(video_path, config)
except Exception:
# 最后回退:返回整个视频作为一个场景
duration = self._get_video_duration(video_path)
return [0.0, duration]
def _detect_with_pyscenedetect(self, video_path: str, config: BatchDetectionConfig) -> List[float]:
"""使用PySceneDetect检测场景"""
try:
from scenedetect import VideoManager, SceneManager
from scenedetect.detectors import ContentDetector, ThresholdDetector
except ImportError:
raise Exception("PySceneDetect not available")
video_manager = VideoManager([video_path])
scene_manager = SceneManager()
# 根据配置选择检测器
if config.detector_type == DetectorType.CONTENT:
scene_manager.add_detector(ContentDetector(threshold=config.threshold))
elif config.detector_type == DetectorType.THRESHOLD:
scene_manager.add_detector(ThresholdDetector(threshold=config.threshold))
else: # ADAPTIVE
# 自适应:同时使用两种检测器
scene_manager.add_detector(ContentDetector(threshold=config.threshold))
scene_manager.add_detector(ThresholdDetector(threshold=config.threshold * 0.8))
video_manager.start()
scene_manager.detect_scenes(frame_source=video_manager)
scene_list = scene_manager.get_scene_list()
# 提取场景时间点
scene_changes = [0.0]
for scene in scene_list:
start_time = scene[0].get_seconds()
end_time = scene[1].get_seconds()
if start_time > 0 and start_time not in scene_changes:
scene_changes.append(start_time)
if end_time not in scene_changes:
scene_changes.append(end_time)
video_manager.release()
return sorted(scene_changes)
def _detect_with_opencv(self, video_path: str, config: BatchDetectionConfig) -> List[float]:
"""使用OpenCV检测场景"""
try:
import cv2
import numpy as np
except ImportError:
raise Exception("OpenCV not available")
cap = cv2.VideoCapture(video_path)
fps = cap.get(cv2.CAP_PROP_FPS)
if fps <= 0:
cap.release()
raise Exception(f"Invalid fps ({fps}) for video {video_path}")
scene_changes = [0.0]
prev_frame = None
frame_count = 0
frame_skip = max(1, int(fps / 2)) # 每秒检测2次
while True:
ret, frame = cap.read()
if not ret:
break
if frame_count % frame_skip == 0:
gray = cv2.cvtColor(frame, cv2.COLOR_BGR2GRAY)
gray = cv2.resize(gray, (320, 240))
if prev_frame is not None:
diff = cv2.absdiff(prev_frame, gray)
mean_diff = np.mean(diff)
if mean_diff > config.threshold:
timestamp = frame_count / fps
if not scene_changes or timestamp - scene_changes[-1] > config.min_scene_length:
scene_changes.append(timestamp)
prev_frame = gray
frame_count += 1
# 添加视频结束时间
duration = frame_count / fps if fps > 0 else 0
if duration > 0 and (not scene_changes or duration - scene_changes[-1] > 0.5):
scene_changes.append(duration)
cap.release()
return scene_changes
def _get_video_duration(self, video_path: str) -> float:
"""获取视频时长"""
try:
import cv2
cap = cv2.VideoCapture(video_path)
fps = cap.get(cv2.CAP_PROP_FPS)
frame_count = int(cap.get(cv2.CAP_PROP_FRAME_COUNT))
cap.release()
if fps > 0:
return frame_count / fps
return 0.0
except Exception:
return 0.0
def _scan_video_files(self, directory: str) -> List[str]:
"""扫描目录中的视频文件"""
video_files = []
for root, _, files in os.walk(directory):
for file in files:
file_ext = os.path.splitext(file)[1].lower()
if file_ext in self.supported_formats:
video_files.append(os.path.join(root, file))
return sorted(video_files)
def save_results(self, result: BatchDetectionResult, output_path: str) -> bool:
"""保存检测结果"""
try:
output_path = Path(output_path)
output_path.parent.mkdir(parents=True, exist_ok=True)
if result.config.output_format == "json":
self._save_json_results(result, output_path)
elif result.config.output_format == "csv":
self._save_csv_results(result, output_path)
elif result.config.output_format == "txt":
self._save_txt_results(result, output_path)
else:
raise ValueError(f"Unsupported output format: {result.config.output_format}")
logger.info(f"Results saved to {output_path}")
return True
except Exception as e:
logger.error(f"Failed to save results: {e}")
return False
def _save_json_results(self, result: BatchDetectionResult, output_path: Path):
"""保存JSON格式结果"""
# 转换为可序列化的字典
data = {
"summary": {
"total_files": result.total_files,
"processed_files": result.processed_files,
"failed_files": result.failed_files,
"total_scenes": result.total_scenes,
"total_duration": result.total_duration,
"average_scenes_per_video": result.average_scenes_per_video,
"detection_time": result.detection_time
},
"config": {
"detector_type": result.config.detector_type.value,
"threshold": result.config.threshold,
"min_scene_length": result.config.min_scene_length
},
"results": [],
"failed_files": result.failed_list
}
for video_result in result.results:
video_data = {
"filename": video_result.filename,
"video_path": video_result.video_path,
"total_scenes": video_result.total_scenes,
"total_duration": video_result.total_duration,
"detection_time": video_result.detection_time,
"scenes": [
{
"index": scene.index,
"start_time": scene.start_time,
"end_time": scene.end_time,
"duration": scene.duration,
"confidence": scene.confidence
}
for scene in video_result.scenes
]
}
data["results"].append(video_data)
with open(output_path, 'w', encoding='utf-8') as f:
json.dump(data, f, indent=2, ensure_ascii=False)
def _save_csv_results(self, result: BatchDetectionResult, output_path: Path):
"""保存CSV格式结果"""
with open(output_path, 'w', newline='', encoding='utf-8') as f:
writer = csv.writer(f)
# 写入表头
writer.writerow([
'filename', 'video_path', 'scene_index', 'start_time',
'end_time', 'duration', 'confidence'
])
# 写入数据
for video_result in result.results:
for scene in video_result.scenes:
writer.writerow([
video_result.filename,
video_result.video_path,
scene.index,
scene.start_time,
scene.end_time,
scene.duration,
scene.confidence
])
def _save_txt_results(self, result: BatchDetectionResult, output_path: Path):
"""保存文本格式结果"""
with open(output_path, 'w', encoding='utf-8') as f:
f.write("批量场景检测结果\n")
f.write("=" * 50 + "\n\n")
f.write(f"总文件数: {result.total_files}\n")
f.write(f"处理成功: {result.processed_files}\n")
f.write(f"处理失败: {result.failed_files}\n")
f.write(f"总场景数: {result.total_scenes}\n")
f.write(f"总时长: {result.total_duration:.2f}\n")
f.write(f"平均场景数: {result.average_scenes_per_video:.1f}\n")
f.write(f"检测耗时: {result.detection_time:.2f}\n\n")
for video_result in result.results:
f.write(f"文件: {video_result.filename}\n")
f.write(f" 场景数: {video_result.total_scenes}\n")
f.write(f" 时长: {video_result.total_duration:.2f}\n")
f.write(f" 检测时间: {video_result.detection_time:.2f}\n")
for scene in video_result.scenes:
f.write(f" 场景 {scene.index}: {scene.start_time:.2f}s - {scene.end_time:.2f}s ({scene.duration:.2f}s)\n")
f.write("\n")
def calculate_stats(self, result: BatchDetectionResult) -> DetectionStats:
"""计算检测统计信息"""
if not result.results:
return DetectionStats(
total_videos=0,
total_scenes=0,
total_duration=0.0,
average_duration_per_scene=0.0,
shortest_scene=0.0,
longest_scene=0.0,
most_scenes_video="",
least_scenes_video=""
)
all_scenes = []
for video_result in result.results:
all_scenes.extend(video_result.scenes)
scene_durations = [scene.duration for scene in all_scenes]
# 找出场景最多和最少的视频
most_scenes_video = max(result.results, key=lambda x: x.total_scenes)
least_scenes_video = min(result.results, key=lambda x: x.total_scenes)
return DetectionStats(
total_videos=len(result.results),
total_scenes=len(all_scenes),
total_duration=result.total_duration,
average_duration_per_scene=sum(scene_durations) / len(scene_durations) if scene_durations else 0.0,
shortest_scene=min(scene_durations) if scene_durations else 0.0,
longest_scene=max(scene_durations) if scene_durations else 0.0,
most_scenes_video=most_scenes_video.filename,
least_scenes_video=least_scenes_video.filename
)

View File

@ -69,7 +69,7 @@ pub async fn get_segments_by_video_id(app: AppHandle, video_id: String) -> Resul
pub async fn upload_video_file(app: AppHandle, request: UploadVideoRequest) -> Result<String, String> {
let mut args = vec![
"-m".to_string(),
"python_core.services.media_manager.cli".to_string(),
"python_core.services.media_manager".to_string(),
"upload_video_file".to_string(),
request.source_path,
];
@ -96,7 +96,7 @@ pub async fn upload_video_file(app: AppHandle, request: UploadVideoRequest) -> R
pub async fn batch_upload_video_files(app: AppHandle, request: BatchUploadVideoRequest) -> Result<String, String> {
let mut args = vec![
"-m".to_string(),
"python_core.services.media_manager.cli".to_string(),
"python_core.services.media_manager".to_string(),
"batch_upload_video_files".to_string(),
request.source_directory,
];
@ -120,7 +120,7 @@ pub async fn add_segment_tags(app: AppHandle, request: TagsRequest) -> Result<St
let args = vec![
"-m".to_string(),
"python_core.services.media_manager.cli".to_string(),
"python_core.services.media_manager".to_string(),
"add_segment_tags".to_string(),
request.segment_id,
tags_json,
@ -137,7 +137,7 @@ pub async fn remove_segment_tags(app: AppHandle, request: TagsRequest) -> Result
let args = vec![
"-m".to_string(),
"python_core.services.media_manager.cli".to_string(),
"python_core.services.media_manager".to_string(),
"remove_segment_tags".to_string(),
request.segment_id,
tags_json,
@ -151,7 +151,7 @@ pub async fn remove_segment_tags(app: AppHandle, request: TagsRequest) -> Result
pub async fn increment_segment_usage(app: AppHandle, segment_id: String) -> Result<String, String> {
let args = vec![
"-m".to_string(),
"python_core.services.media_manager.cli".to_string(),
"python_core.services.media_manager".to_string(),
"increment_segment_usage".to_string(),
segment_id,
];
@ -170,7 +170,7 @@ pub async fn get_segments_by_tags(app: AppHandle, request: SearchTagsRequest) ->
let args = vec![
"-m".to_string(),
"python_core.services.media_manager.cli".to_string(),
"python_core.services.media_manager".to_string(),
"get_segments_by_tags".to_string(),
tags_json,
match_all_json,
@ -184,7 +184,7 @@ pub async fn get_segments_by_tags(app: AppHandle, request: SearchTagsRequest) ->
pub async fn get_popular_segments(app: AppHandle, limit: Option<i32>) -> Result<String, String> {
let mut args = vec![
"-m".to_string(),
"python_core.services.media_manager.cli".to_string(),
"python_core.services.media_manager".to_string(),
"get_popular_segments".to_string(),
];
@ -200,7 +200,7 @@ pub async fn get_popular_segments(app: AppHandle, limit: Option<i32>) -> Result<
pub async fn search_segments(app: AppHandle, keyword: String) -> Result<String, String> {
let args = vec![
"-m".to_string(),
"pypython_core.services.media_manager.cli".to_string(),
"python_core.services.media_manager".to_string(),
"search_segments".to_string(),
keyword,
];
@ -213,7 +213,7 @@ pub async fn search_segments(app: AppHandle, keyword: String) -> Result<String,
pub async fn delete_segment(app: AppHandle, segment_id: String) -> Result<String, String> {
let args = vec![
"-m".to_string(),
"python_core.services.media_manager.cli".to_string(),
"python_core.services.media_manager".to_string(),
"delete_segment".to_string(),
segment_id,
];
@ -226,7 +226,7 @@ pub async fn delete_segment(app: AppHandle, segment_id: String) -> Result<String
pub async fn delete_original_video(app: AppHandle, video_id: String) -> Result<String, String> {
let args = vec![
"-m".to_string(),
"python_core.services.media_manager.cli".to_string(),
"python_core.services.media_manager".to_string(),
"delete_original_video".to_string(),
video_id,
];