349 lines
13 KiB
Python
349 lines
13 KiB
Python
#!/usr/bin/env python3
|
|
"""
|
|
媒体管理器主类
|
|
"""
|
|
|
|
import os
|
|
import uuid
|
|
import shutil
|
|
from typing import List, Dict, Optional
|
|
from datetime import datetime
|
|
from dataclasses import asdict
|
|
|
|
from .types import VideoSegment, OriginalVideo, UploadResult, BatchUploadResult
|
|
from .video_info import create_video_info_extractor
|
|
from .scene_detector import create_scene_detector
|
|
from .video_processor import create_video_processor
|
|
from .storage import MediaStorage
|
|
from python_core.utils.logger import logger
|
|
|
|
class MediaManager:
|
|
"""媒体库管理器 - 简化版本"""
|
|
|
|
def __init__(self):
|
|
# 初始化组件
|
|
self.storage = MediaStorage()
|
|
self.video_info_extractor = create_video_info_extractor()
|
|
self.scene_detector = create_scene_detector()
|
|
self.video_processor = create_video_processor()
|
|
|
|
# 加载数据
|
|
self.video_segments = self.storage.load_video_segments()
|
|
self.original_videos = self.storage.load_original_videos()
|
|
|
|
def upload_video_file(self, source_path: str, filename: str = None, tags: List[str] = None,
|
|
progress_callback=None) -> UploadResult:
|
|
"""上传单个视频文件并分割成片段"""
|
|
if not os.path.exists(source_path):
|
|
raise FileNotFoundError(f"Source file not found: {source_path}")
|
|
|
|
if tags is None:
|
|
tags = []
|
|
|
|
# 进度回调
|
|
def report_progress(message: str):
|
|
if progress_callback:
|
|
progress_callback(message)
|
|
|
|
report_progress("计算文件哈希...")
|
|
# 计算MD5
|
|
md5_hash = self.video_processor.calculate_hash(source_path)
|
|
|
|
report_progress("检查重复文件...")
|
|
# 检查是否已存在相同MD5的视频
|
|
existing = self.storage.get_video_by_md5(self.original_videos, md5_hash)
|
|
if existing:
|
|
logger.info(f"Video with MD5 {md5_hash} already exists")
|
|
existing_segments = self.storage.get_segments_by_video_id(self.video_segments, existing.id)
|
|
|
|
# 如果已存在的视频没有分镜头,重新生成分镜头
|
|
if not existing_segments:
|
|
segments = self._regenerate_segments(existing, tags)
|
|
return UploadResult(
|
|
original_video=asdict(existing),
|
|
segments=[asdict(segment) for segment in segments],
|
|
is_duplicate=True,
|
|
segments_regenerated=True
|
|
)
|
|
else:
|
|
return UploadResult(
|
|
original_video=asdict(existing),
|
|
segments=[asdict(segment) for segment in existing_segments],
|
|
is_duplicate=True,
|
|
segments_regenerated=False
|
|
)
|
|
|
|
# 创建新视频
|
|
return self._create_new_video(source_path, filename, tags, md5_hash, report_progress)
|
|
|
|
def _regenerate_segments(self, video: OriginalVideo, tags: List[str]) -> List[VideoSegment]:
|
|
"""重新生成视频片段"""
|
|
logger.info(f"Regenerating segments for video {video.id}")
|
|
|
|
# 检测场景变化
|
|
scene_changes = self.scene_detector.detect_scenes(video.file_path)
|
|
|
|
# 为分镜片段处理标签
|
|
segment_tags = [tag for tag in tags if tag != "原始"] if tags else []
|
|
if "分镜" not in segment_tags:
|
|
segment_tags.append("分镜")
|
|
|
|
# 分割视频成片段
|
|
segments = self.video_processor.split_video(video.file_path, scene_changes, video.id, segment_tags)
|
|
|
|
# 保存新生成的片段
|
|
self.video_segments.extend(segments)
|
|
self.storage.save_video_segments(self.video_segments)
|
|
|
|
# 更新原始视频的segment_count
|
|
for i, v in enumerate(self.original_videos):
|
|
if v.id == video.id:
|
|
v.segment_count = len(segments)
|
|
v.updated_at = datetime.now().isoformat()
|
|
self.original_videos[i] = v
|
|
break
|
|
|
|
self.storage.save_original_videos(self.original_videos)
|
|
logger.info(f"Regenerated {len(segments)} segments for video {video.id}")
|
|
|
|
return segments
|
|
|
|
def _create_new_video(self, source_path: str, filename: str, tags: List[str], md5_hash: str,
|
|
progress_callback=None) -> UploadResult:
|
|
"""创建新视频"""
|
|
def report_progress(message: str):
|
|
if progress_callback:
|
|
progress_callback(message)
|
|
|
|
# 生成新的视频ID和文件名
|
|
video_id = str(uuid.uuid4())
|
|
if filename is None:
|
|
filename = os.path.basename(source_path)
|
|
|
|
report_progress("复制文件到存储目录...")
|
|
# 获取文件扩展名
|
|
file_ext = os.path.splitext(filename)[1].lower()
|
|
stored_filename = f"{video_id}{file_ext}"
|
|
stored_path = self.storage.video_storage_dir / stored_filename
|
|
|
|
# 复制文件到存储目录
|
|
shutil.copy2(source_path, stored_path)
|
|
|
|
report_progress("提取视频信息...")
|
|
# 获取视频基本信息
|
|
video_info = self.video_info_extractor.extract_video_info(str(stored_path))
|
|
|
|
report_progress("检测场景变化...")
|
|
# 检测场景变化
|
|
scene_changes = self.scene_detector.detect_scenes(str(stored_path))
|
|
|
|
# 创建原始视频记录
|
|
now = datetime.now().isoformat()
|
|
original_video = OriginalVideo(
|
|
id=video_id,
|
|
filename=filename,
|
|
file_path=str(stored_path),
|
|
md5_hash=md5_hash,
|
|
file_size=video_info.file_size,
|
|
duration=video_info.duration,
|
|
width=video_info.width,
|
|
height=video_info.height,
|
|
fps=video_info.fps,
|
|
format=file_ext[1:] if file_ext else 'unknown',
|
|
segment_count=len(scene_changes) - 1,
|
|
tags=tags,
|
|
created_at=now,
|
|
updated_at=now
|
|
)
|
|
|
|
report_progress("分割视频成片段...")
|
|
# 分割视频成片段
|
|
segment_tags = [tag for tag in tags if tag != "原始"]
|
|
if "分镜" not in segment_tags:
|
|
segment_tags.append("分镜")
|
|
segments = self.video_processor.split_video(str(stored_path), scene_changes, video_id, segment_tags)
|
|
|
|
report_progress("保存数据...")
|
|
# 保存数据
|
|
self.original_videos.append(original_video)
|
|
self.video_segments.extend(segments)
|
|
|
|
self.storage.save_original_videos(self.original_videos)
|
|
self.storage.save_video_segments(self.video_segments)
|
|
|
|
logger.info(f"Uploaded video: {filename} (MD5: {md5_hash}, {len(segments)} segments)")
|
|
|
|
return UploadResult(
|
|
original_video=asdict(original_video),
|
|
segments=[asdict(segment) for segment in segments],
|
|
is_duplicate=False
|
|
)
|
|
|
|
def batch_upload_video_files(self, source_directory: str, tags: List[str] = None) -> BatchUploadResult:
|
|
"""批量上传视频文件"""
|
|
if not os.path.exists(source_directory):
|
|
raise FileNotFoundError(f"Source directory not found: {source_directory}")
|
|
|
|
if tags is None:
|
|
tags = []
|
|
|
|
# 支持的视频格式
|
|
video_extensions = {'.mp4', '.avi', '.mov', '.mkv', '.wmv', '.flv', '.webm', '.m4v'}
|
|
|
|
result = BatchUploadResult(
|
|
total_files=0,
|
|
uploaded_files=0,
|
|
skipped_files=0,
|
|
failed_files=0,
|
|
total_segments=0,
|
|
uploaded_list=[],
|
|
skipped_list=[],
|
|
failed_list=[]
|
|
)
|
|
|
|
# 遍历目录中的所有文件
|
|
for root, _, files in os.walk(source_directory):
|
|
for file in files:
|
|
file_path = os.path.join(root, file)
|
|
file_ext = os.path.splitext(file)[1].lower()
|
|
|
|
# 检查是否为视频文件
|
|
if file_ext not in video_extensions:
|
|
continue
|
|
|
|
result.total_files += 1
|
|
|
|
try:
|
|
# 尝试上传文件
|
|
upload_result = self.upload_video_file(file_path, file, tags)
|
|
|
|
if upload_result.is_duplicate:
|
|
result.skipped_files += 1
|
|
result.skipped_list.append({
|
|
'filename': file,
|
|
'reason': 'Already exists (same MD5)'
|
|
})
|
|
else:
|
|
result.uploaded_files += 1
|
|
result.total_segments += len(upload_result.segments)
|
|
result.uploaded_list.append(asdict(upload_result))
|
|
|
|
except Exception as e:
|
|
result.failed_files += 1
|
|
result.failed_list.append({
|
|
'filename': file,
|
|
'error': str(e)
|
|
})
|
|
logger.error(f"Failed to upload {file}: {e}")
|
|
|
|
logger.info(f"Batch upload completed: {result.uploaded_files} uploaded, "
|
|
f"{result.skipped_files} skipped, {result.failed_files} failed")
|
|
|
|
return result
|
|
|
|
# 查询方法
|
|
def get_all_segments(self) -> List[Dict]:
|
|
"""获取所有视频片段"""
|
|
return [asdict(segment) for segment in self.video_segments if segment.is_active]
|
|
|
|
def get_all_original_videos(self) -> List[Dict]:
|
|
"""获取所有原始视频"""
|
|
return [asdict(video) for video in self.original_videos if video.is_active]
|
|
|
|
def get_segments_by_video_id(self, video_id: str) -> List[Dict]:
|
|
"""获取指定原始视频的所有片段"""
|
|
segments = self.storage.get_segments_by_video_id(self.video_segments, video_id)
|
|
return [asdict(segment) for segment in segments]
|
|
|
|
def get_segments_by_tags(self, tags: List[str], match_all: bool = False) -> List[Dict]:
|
|
"""根据标签搜索视频片段"""
|
|
segments = self.storage.get_segments_by_tags(self.video_segments, tags, match_all)
|
|
return [asdict(segment) for segment in segments]
|
|
|
|
def search_segments(self, keyword: str) -> List[Dict]:
|
|
"""搜索视频片段"""
|
|
segments = self.storage.search_segments(self.video_segments, keyword)
|
|
return [asdict(segment) for segment in segments]
|
|
|
|
def get_popular_segments(self, limit: int = 10) -> List[Dict]:
|
|
"""获取最常用的视频片段"""
|
|
segments = self.storage.get_popular_segments(self.video_segments, limit)
|
|
return [asdict(segment) for segment in segments]
|
|
|
|
# 管理方法
|
|
def add_segment_tags(self, segment_id: str, tags: List[str]) -> bool:
|
|
"""为视频片段添加标签"""
|
|
for i, segment in enumerate(self.video_segments):
|
|
if segment.id == segment_id and segment.is_active:
|
|
existing_tags = set(segment.tags)
|
|
new_tags = existing_tags.union(set(tags))
|
|
segment.tags = list(new_tags)
|
|
segment.updated_at = datetime.now().isoformat()
|
|
|
|
self.video_segments[i] = segment
|
|
self.storage.save_video_segments(self.video_segments)
|
|
|
|
logger.info(f"Added tags {tags} to segment {segment_id}")
|
|
return True
|
|
return False
|
|
|
|
def increment_segment_usage(self, segment_id: str) -> bool:
|
|
"""增加视频片段使用次数"""
|
|
for i, segment in enumerate(self.video_segments):
|
|
if segment.id == segment_id and segment.is_active:
|
|
segment.use_count += 1
|
|
segment.updated_at = datetime.now().isoformat()
|
|
|
|
self.video_segments[i] = segment
|
|
self.storage.save_video_segments(self.video_segments)
|
|
|
|
logger.info(f"Incremented usage count for segment {segment_id}")
|
|
return True
|
|
return False
|
|
|
|
def delete_segment(self, segment_id: str) -> bool:
|
|
"""删除视频片段"""
|
|
for i, segment in enumerate(self.video_segments):
|
|
if segment.id == segment_id:
|
|
# 删除物理文件
|
|
self.storage.delete_segment_files(segment)
|
|
|
|
# 从列表中移除
|
|
self.video_segments.pop(i)
|
|
self.storage.save_video_segments(self.video_segments)
|
|
|
|
logger.info(f"Deleted segment: {segment_id}")
|
|
return True
|
|
return False
|
|
|
|
def delete_original_video(self, video_id: str) -> bool:
|
|
"""删除原始视频及其所有片段"""
|
|
# 先删除所有相关片段
|
|
segments_to_delete = [s for s in self.video_segments if s.original_video_id == video_id]
|
|
for segment in segments_to_delete:
|
|
self.delete_segment(segment.id)
|
|
|
|
# 删除原始视频
|
|
for i, video in enumerate(self.original_videos):
|
|
if video.id == video_id:
|
|
# 删除物理文件
|
|
self.storage.delete_video_files(video)
|
|
|
|
# 从列表中移除
|
|
self.original_videos.pop(i)
|
|
self.storage.save_original_videos(self.original_videos)
|
|
|
|
logger.info(f"Deleted original video: {video_id}")
|
|
return True
|
|
return False
|
|
|
|
# 全局实例
|
|
_global_media_manager = None
|
|
|
|
def get_media_manager() -> MediaManager:
|
|
"""获取全局MediaManager实例"""
|
|
global _global_media_manager
|
|
if _global_media_manager is None:
|
|
_global_media_manager = MediaManager()
|
|
return _global_media_manager
|