""" 媒体库管理服务 管理视频素材,包括上传、转场镜头分割、标签管理等 """ import json import uuid import os import hashlib import shutil from pathlib import Path from typing import List, Dict, Optional, Tuple from dataclasses import dataclass, asdict from datetime import datetime from python_core.config import settings from python_core.utils.logger import logger from python_core.utils.jsonrpc import create_response_handler # 视频处理库 try: import cv2 import numpy as np VIDEO_LIBS_AVAILABLE = True except ImportError: logger.warning("Video processing libraries not available. Install opencv-python for full functionality.") VIDEO_LIBS_AVAILABLE = False @dataclass class VideoSegment: """视频片段数据结构""" id: str original_video_id: str segment_index: int filename: str file_path: str md5_hash: str file_size: int duration: float width: int height: int fps: float format: str start_time: float end_time: float tags: List[str] use_count: int thumbnail_path: Optional[str] = None scene_score: Optional[float] = None # 场景变化分数 motion_score: Optional[float] = None # 运动强度分数 brightness: Optional[float] = None contrast: Optional[float] = None created_at: str = "" updated_at: str = "" is_active: bool = True @dataclass class OriginalVideo: """原始视频数据结构""" id: str filename: str file_path: str md5_hash: str file_size: int duration: float width: int height: int fps: float format: str segment_count: int tags: List[str] created_at: str = "" updated_at: str = "" is_active: bool = True class MediaManager: """媒体库管理器""" def __init__(self): self.cache_dir = settings.temp_dir / "cache" self.cache_dir.mkdir(parents=True, exist_ok=True) # 数据文件 self.segments_file = self.cache_dir / "video_segments.json" self.original_videos_file = self.cache_dir / "original_videos.json" # 加载数据 self.video_segments = self._load_video_segments() self.original_videos = self._load_original_videos() # 存储目录 self.video_storage_dir = settings.temp_dir / "video_storage" self.video_storage_dir.mkdir(parents=True, exist_ok=True) self.segments_dir = settings.temp_dir / "video_segments" self.segments_dir.mkdir(parents=True, exist_ok=True) self.thumbnails_dir = settings.temp_dir / "video_thumbnails" self.thumbnails_dir.mkdir(parents=True, exist_ok=True) def _load_video_segments(self) -> List[VideoSegment]: """加载视频片段数据""" if self.segments_file.exists(): try: with open(self.segments_file, 'r', encoding='utf-8') as f: data = json.load(f) return [VideoSegment(**item) for item in data] except Exception as e: logger.error(f"Failed to load video segments: {e}") return [] else: return [] def _load_original_videos(self) -> List[OriginalVideo]: """加载原始视频数据""" if self.original_videos_file.exists(): try: with open(self.original_videos_file, 'r', encoding='utf-8') as f: data = json.load(f) return [OriginalVideo(**item) for item in data] except Exception as e: logger.error(f"Failed to load original videos: {e}") return [] else: return [] def _save_video_segments(self, segments: List[VideoSegment] = None): """保存视频片段数据""" if segments is None: segments = self.video_segments try: data = [asdict(segment) for segment in segments] with open(self.segments_file, 'w', encoding='utf-8') as f: json.dump(data, f, ensure_ascii=False, indent=2) logger.info(f"Video segments saved to {self.segments_file}") except Exception as e: logger.error(f"Failed to save video segments: {e}") raise def _save_original_videos(self, videos: List[OriginalVideo] = None): """保存原始视频数据""" if videos is None: videos = self.original_videos try: data = [asdict(video) for video in videos] with open(self.original_videos_file, 'w', encoding='utf-8') as f: json.dump(data, f, ensure_ascii=False, indent=2) logger.info(f"Original videos saved to {self.original_videos_file}") except Exception as e: logger.error(f"Failed to save original videos: {e}") raise def _calculate_md5(self, file_path: str) -> str: """计算文件MD5哈希值""" hash_md5 = hashlib.md5() with open(file_path, "rb") as f: for chunk in iter(lambda: f.read(4096), b""): hash_md5.update(chunk) return hash_md5.hexdigest() def _get_video_info(self, file_path: str) -> Dict: """获取视频基本信息""" if not VIDEO_LIBS_AVAILABLE: # 如果没有视频处理库,返回基本信息 file_size = os.path.getsize(file_path) return { 'duration': 0.0, 'width': 0, 'height': 0, 'fps': 0.0, 'frame_count': 0, 'file_size': file_size } try: cap = cv2.VideoCapture(file_path) # 获取视频属性 fps = cap.get(cv2.CAP_PROP_FPS) frame_count = int(cap.get(cv2.CAP_PROP_FRAME_COUNT)) width = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH)) height = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT)) duration = frame_count / fps if fps > 0 else 0 cap.release() return { 'duration': duration, 'width': width, 'height': height, 'fps': fps, 'frame_count': frame_count, 'file_size': os.path.getsize(file_path) } except Exception as e: logger.error(f"Failed to get video info: {e}") file_size = os.path.getsize(file_path) return { 'duration': 0.0, 'width': 0, 'height': 0, 'fps': 0.0, 'frame_count': 0, 'file_size': file_size } def _detect_scene_changes(self, file_path: str, threshold: float = 30.0) -> List[float]: """检测场景变化点(转场镜头)""" if not VIDEO_LIBS_AVAILABLE: logger.warning("Video processing not available, returning empty scene changes") return [] try: cap = cv2.VideoCapture(file_path) fps = cap.get(cv2.CAP_PROP_FPS) scene_changes = [0.0] # 开始时间 prev_frame = None frame_count = 0 while True: ret, frame = cap.read() if not ret: break # 转换为灰度图 gray = cv2.cvtColor(frame, cv2.COLOR_BGR2GRAY) if prev_frame is not None: # 计算帧间差异 diff = cv2.absdiff(prev_frame, gray) mean_diff = np.mean(diff) # 如果差异超过阈值,认为是场景变化 if mean_diff > threshold: timestamp = frame_count / fps scene_changes.append(timestamp) logger.debug(f"Scene change detected at {timestamp:.2f}s (diff: {mean_diff:.2f})") prev_frame = gray frame_count += 1 cap.release() # 添加结束时间 duration = frame_count / fps if fps > 0 else 0 if duration > 0: scene_changes.append(duration) logger.info(f"Detected {len(scene_changes)-1} scene changes in video") return scene_changes except Exception as e: logger.error(f"Failed to detect scene changes: {e}") return [0.0] def _generate_thumbnail(self, video_path: str, timestamp: float, output_path: str) -> bool: """生成视频缩略图""" if not VIDEO_LIBS_AVAILABLE: return False def _split_video_by_scenes(self, video_path: str, scene_changes: List[float], original_video_id: str, tags: List[str] = None) -> List[VideoSegment]: """根据场景变化分割视频""" if tags is None: tags = [] if not VIDEO_LIBS_AVAILABLE: logger.warning("Video processing not available, creating single segment") # 创建单个片段 video_info = self._get_video_info(video_path) segment_id = str(uuid.uuid4()) segment_filename = f"{segment_id}.mp4" segment_path = self.segments_dir / segment_filename # 复制整个视频作为单个片段 shutil.copy2(video_path, segment_path) now = datetime.now().isoformat() segment = VideoSegment( id=segment_id, original_video_id=original_video_id, segment_index=0, filename=segment_filename, file_path=str(segment_path), md5_hash=self._calculate_md5(str(segment_path)), file_size=video_info['file_size'], duration=video_info['duration'], width=video_info['width'], height=video_info['height'], fps=video_info['fps'], format='mp4', start_time=0.0, end_time=video_info['duration'], tags=tags.copy(), use_count=0, created_at=now, updated_at=now ) return [segment] segments = [] try: cap = cv2.VideoCapture(video_path) fps = cap.get(cv2.CAP_PROP_FPS) width = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH)) height = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT)) # 设置视频编码器 fourcc = cv2.VideoWriter_fourcc(*'mp4v') for i in range(len(scene_changes) - 1): start_time = scene_changes[i] end_time = scene_changes[i + 1] duration = end_time - start_time # 跳过太短的片段(小于1秒) if duration < 1.0: continue segment_id = str(uuid.uuid4()) segment_filename = f"{segment_id}.mp4" segment_path = self.segments_dir / segment_filename # 创建视频写入器 out = cv2.VideoWriter(str(segment_path), fourcc, fps, (width, height)) # 跳转到开始帧 start_frame = int(start_time * fps) end_frame = int(end_time * fps) cap.set(cv2.CAP_PROP_POS_FRAMES, start_frame) frame_count = 0 while frame_count < (end_frame - start_frame): ret, frame = cap.read() if not ret: break out.write(frame) frame_count += 1 out.release() # 生成缩略图 thumbnail_filename = f"{segment_id}_thumb.jpg" thumbnail_path = self.thumbnails_dir / thumbnail_filename thumbnail_generated = self._generate_thumbnail( str(segment_path), duration / 2, # 中间帧作为缩略图 str(thumbnail_path) ) # 创建片段记录 now = datetime.now().isoformat() segment = VideoSegment( id=segment_id, original_video_id=original_video_id, segment_index=i, filename=segment_filename, file_path=str(segment_path), md5_hash=self._calculate_md5(str(segment_path)), file_size=os.path.getsize(segment_path), duration=duration, width=width, height=height, fps=fps, format='mp4', start_time=start_time, end_time=end_time, tags=tags.copy(), use_count=0, thumbnail_path=str(thumbnail_path) if thumbnail_generated else None, created_at=now, updated_at=now ) segments.append(segment) logger.info(f"Created segment {i}: {start_time:.2f}s - {end_time:.2f}s ({duration:.2f}s)") cap.release() except Exception as e: logger.error(f"Failed to split video: {e}") # 如果分割失败,创建单个片段 return self._split_video_by_scenes(video_path, [0.0, self._get_video_info(video_path)['duration']], original_video_id, tags) return segments def get_video_by_md5(self, md5_hash: str) -> Optional[Dict]: """根据MD5获取原始视频""" for video in self.original_videos: if video.md5_hash == md5_hash and video.is_active: return asdict(video) return None def get_segment_by_md5(self, md5_hash: str) -> Optional[Dict]: """根据MD5获取视频片段""" for segment in self.video_segments: if segment.md5_hash == md5_hash and segment.is_active: return asdict(segment) return None def get_all_segments(self) -> List[Dict]: """获取所有视频片段""" return [asdict(segment) for segment in self.video_segments if segment.is_active] def get_all_original_videos(self) -> List[Dict]: """获取所有原始视频""" return [asdict(video) for video in self.original_videos if video.is_active] def get_segments_by_video_id(self, video_id: str) -> List[Dict]: """获取指定原始视频的所有片段""" segments = [] for segment in self.video_segments: if segment.original_video_id == video_id and segment.is_active: segments.append(asdict(segment)) return sorted(segments, key=lambda x: x['segment_index']) def upload_video_file(self, source_path: str, filename: str = None, tags: List[str] = None) -> Dict: """上传单个视频文件并分割成片段""" if not os.path.exists(source_path): raise FileNotFoundError(f"Source file not found: {source_path}") if tags is None: tags = [] # 计算MD5 md5_hash = self._calculate_md5(source_path) # 检查是否已存在相同MD5的视频 existing = self.get_video_by_md5(md5_hash) if existing: logger.info(f"Video with MD5 {md5_hash} already exists") return { 'original_video': existing, 'segments': self.get_segments_by_video_id(existing['id']), 'is_duplicate': True } # 生成新的视频ID和文件名 video_id = str(uuid.uuid4()) if filename is None: filename = os.path.basename(source_path) # 获取文件扩展名 file_ext = os.path.splitext(filename)[1].lower() stored_filename = f"{video_id}{file_ext}" stored_path = self.video_storage_dir / stored_filename # 复制文件到存储目录 shutil.copy2(source_path, stored_path) # 获取视频基本信息 video_info = self._get_video_info(str(stored_path)) # 检测场景变化 scene_changes = self._detect_scene_changes(str(stored_path)) # 创建原始视频记录 now = datetime.now().isoformat() original_video = OriginalVideo( id=video_id, filename=filename, file_path=str(stored_path), md5_hash=md5_hash, file_size=video_info['file_size'], duration=video_info['duration'], width=video_info['width'], height=video_info['height'], fps=video_info['fps'], format=file_ext[1:] if file_ext else 'unknown', segment_count=len(scene_changes) - 1, tags=tags, created_at=now, updated_at=now ) # 分割视频成片段,传递标签给片段 segments = self._split_video_by_scenes(str(stored_path), scene_changes, video_id, tags) # 保存数据 self.original_videos.append(original_video) self.video_segments.extend(segments) self._save_original_videos() self._save_video_segments() logger.info(f"Uploaded video: {filename} (MD5: {md5_hash}, {len(segments)} segments)") return { 'original_video': asdict(original_video), 'segments': [asdict(segment) for segment in segments], 'is_duplicate': False } def batch_upload_video_files(self, source_directory: str, tags: List[str] = None) -> Dict: """批量上传视频文件""" if not os.path.exists(source_directory): raise FileNotFoundError(f"Source directory not found: {source_directory}") if tags is None: tags = [] # 支持的视频格式 video_extensions = {'.mp4', '.avi', '.mov', '.mkv', '.wmv', '.flv', '.webm', '.m4v'} results = { 'total_files': 0, 'uploaded_files': 0, 'skipped_files': 0, 'failed_files': 0, 'total_segments': 0, 'uploaded_list': [], 'skipped_list': [], 'failed_list': [] } # 遍历目录中的所有文件 for root, dirs, files in os.walk(source_directory): for file in files: file_path = os.path.join(root, file) file_ext = os.path.splitext(file)[1].lower() # 检查是否为视频文件 if file_ext not in video_extensions: continue results['total_files'] += 1 try: # 尝试上传文件 result = self.upload_video_file(file_path, file, tags) if result['is_duplicate']: results['skipped_files'] += 1 results['skipped_list'].append({ 'filename': file, 'reason': 'Already exists (same MD5)' }) else: results['uploaded_files'] += 1 results['total_segments'] += len(result['segments']) results['uploaded_list'].append(result) except Exception as e: results['failed_files'] += 1 results['failed_list'].append({ 'filename': file, 'error': str(e) }) logger.error(f"Failed to upload {file}: {e}") logger.info(f"Batch upload completed: {results['uploaded_files']} uploaded, " f"{results['skipped_files']} skipped, {results['failed_files']} failed, " f"{results['total_segments']} total segments created") return results def add_segment_tags(self, segment_id: str, tags: List[str]) -> bool: """为视频片段添加标签""" for i, segment in enumerate(self.video_segments): if segment.id == segment_id and segment.is_active: # 合并标签,去重 existing_tags = set(segment.tags) new_tags = existing_tags.union(set(tags)) segment.tags = list(new_tags) segment.updated_at = datetime.now().isoformat() self.video_segments[i] = segment self._save_video_segments() logger.info(f"Added tags {tags} to segment {segment_id}") return True return False def remove_segment_tags(self, segment_id: str, tags: List[str]) -> bool: """移除视频片段的标签""" for i, segment in enumerate(self.video_segments): if segment.id == segment_id and segment.is_active: # 移除指定标签 existing_tags = set(segment.tags) remaining_tags = existing_tags - set(tags) segment.tags = list(remaining_tags) segment.updated_at = datetime.now().isoformat() self.video_segments[i] = segment self._save_video_segments() logger.info(f"Removed tags {tags} from segment {segment_id}") return True return False def increment_segment_usage(self, segment_id: str) -> bool: """增加视频片段使用次数""" for i, segment in enumerate(self.video_segments): if segment.id == segment_id and segment.is_active: segment.use_count += 1 segment.updated_at = datetime.now().isoformat() self.video_segments[i] = segment self._save_video_segments() logger.info(f"Incremented usage count for segment {segment_id} to {segment.use_count}") return True return False def get_segments_by_tags(self, tags: List[str], match_all: bool = False) -> List[Dict]: """根据标签搜索视频片段""" results = [] tag_set = set(tags) for segment in self.video_segments: if not segment.is_active: continue segment_tags = set(segment.tags) if match_all: # 必须包含所有标签 if tag_set.issubset(segment_tags): results.append(asdict(segment)) else: # 包含任意一个标签 if tag_set.intersection(segment_tags): results.append(asdict(segment)) return results def get_popular_segments(self, limit: int = 10) -> List[Dict]: """获取最常用的视频片段""" active_segments = [segment for segment in self.video_segments if segment.is_active] sorted_segments = sorted(active_segments, key=lambda x: x.use_count, reverse=True) return [asdict(segment) for segment in sorted_segments[:limit]] def search_segments(self, keyword: str) -> List[Dict]: """搜索视频片段""" keyword = keyword.lower() results = [] for segment in self.video_segments: if not segment.is_active: continue # 搜索文件名和标签 if (keyword in segment.filename.lower() or any(keyword in tag.lower() for tag in segment.tags)): results.append(asdict(segment)) return results def delete_segment(self, segment_id: str) -> bool: """删除视频片段""" for i, segment in enumerate(self.video_segments): if segment.id == segment_id: # 删除物理文件 try: if os.path.exists(segment.file_path): os.remove(segment.file_path) # 删除缩略图 if segment.thumbnail_path and os.path.exists(segment.thumbnail_path): os.remove(segment.thumbnail_path) except Exception as e: logger.error(f"Failed to delete physical files: {e}") # 从列表中移除 deleted_segment = self.video_segments.pop(i) self._save_video_segments() logger.info(f"Deleted segment: {segment_id} - {deleted_segment.filename}") return True return False def delete_original_video(self, video_id: str) -> bool: """删除原始视频及其所有片段""" # 先删除所有相关片段 segments_to_delete = [s for s in self.video_segments if s.original_video_id == video_id] for segment in segments_to_delete: self.delete_segment(segment.id) # 删除原始视频 for i, video in enumerate(self.original_videos): if video.id == video_id: # 删除物理文件 try: if os.path.exists(video.file_path): os.remove(video.file_path) except Exception as e: logger.error(f"Failed to delete video file: {e}") # 从列表中移除 deleted_video = self.original_videos.pop(i) self._save_original_videos() logger.info(f"Deleted original video: {video_id} - {deleted_video.filename}") return True return False # 全局实例 media_manager = MediaManager() def main(): """命令行接口 - 使用JSON-RPC协议""" import sys import json # 创建响应处理器 rpc = create_response_handler() if len(sys.argv) < 2: rpc.error("INVALID_REQUEST", "No command specified") return command = sys.argv[1] try: if command == "get_all_segments": segments = media_manager.get_all_segments() rpc.success(segments) elif command == "get_all_original_videos": videos = media_manager.get_all_original_videos() rpc.success(videos) elif command == "get_segments_by_video_id": if len(sys.argv) < 3: rpc.error("INVALID_REQUEST", "Video ID required") return video_id = sys.argv[2] segments = media_manager.get_segments_by_video_id(video_id) rpc.success(segments) elif command == "upload_video_file": if len(sys.argv) < 3: rpc.error("INVALID_REQUEST", "Source path required") return source_path = sys.argv[2] filename = sys.argv[3] if len(sys.argv) > 3 else None tags = json.loads(sys.argv[4]) if len(sys.argv) > 4 else [] result = media_manager.upload_video_file(source_path, filename, tags) rpc.success(result) elif command == "batch_upload_video_files": if len(sys.argv) < 3: rpc.error("INVALID_REQUEST", "Source directory required") return source_directory = sys.argv[2] tags = json.loads(sys.argv[3]) if len(sys.argv) > 3 else [] result = media_manager.batch_upload_video_files(source_directory, tags) rpc.success(result) elif command == "add_segment_tags": if len(sys.argv) < 4: rpc.error("INVALID_REQUEST", "Segment ID and tags required") return segment_id = sys.argv[2] tags = json.loads(sys.argv[3]) success = media_manager.add_segment_tags(segment_id, tags) rpc.success(success) elif command == "remove_segment_tags": if len(sys.argv) < 4: rpc.error("INVALID_REQUEST", "Segment ID and tags required") return segment_id = sys.argv[2] tags = json.loads(sys.argv[3]) success = media_manager.remove_segment_tags(segment_id, tags) rpc.success(success) elif command == "increment_segment_usage": if len(sys.argv) < 3: rpc.error("INVALID_REQUEST", "Segment ID required") return segment_id = sys.argv[2] success = media_manager.increment_segment_usage(segment_id) rpc.success(success) elif command == "get_segments_by_tags": if len(sys.argv) < 3: rpc.error("INVALID_REQUEST", "Tags required") return tags = json.loads(sys.argv[2]) match_all = json.loads(sys.argv[3]) if len(sys.argv) > 3 else False segments = media_manager.get_segments_by_tags(tags, match_all) rpc.success(segments) elif command == "get_popular_segments": limit = int(sys.argv[2]) if len(sys.argv) > 2 else 10 segments = media_manager.get_popular_segments(limit) rpc.success(segments) elif command == "search_segments": if len(sys.argv) < 3: rpc.error("INVALID_REQUEST", "Search keyword required") return keyword = sys.argv[2] results = media_manager.search_segments(keyword) rpc.success(results) elif command == "delete_segment": if len(sys.argv) < 3: rpc.error("INVALID_REQUEST", "Segment ID required") return segment_id = sys.argv[2] success = media_manager.delete_segment(segment_id) rpc.success(success) elif command == "delete_original_video": if len(sys.argv) < 3: rpc.error("INVALID_REQUEST", "Video ID required") return video_id = sys.argv[2] success = media_manager.delete_original_video(video_id) rpc.success(success) else: rpc.error("INVALID_REQUEST", f"Unknown command: {command}") except Exception as e: logger.error(f"Command execution failed: {e}") rpc.error("INTERNAL_ERROR", str(e)) if __name__ == "__main__": main()