mxivideo/python_core/services/media_manager.py

834 lines
30 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

"""
媒体库管理服务
管理视频素材,包括上传、转场镜头分割、标签管理等
"""
import json
import uuid
import os
import hashlib
import shutil
from pathlib import Path
from typing import List, Dict, Optional, Tuple
from dataclasses import dataclass, asdict
from datetime import datetime
from python_core.config import settings
from python_core.utils.logger import logger
from python_core.utils.jsonrpc import create_response_handler
# Video processing libraries (optional dependency).
try:
    import cv2
    import numpy as np
    VIDEO_LIBS_AVAILABLE = True
except ImportError:
    # The module still imports without OpenCV/NumPy; code below checks
    # VIDEO_LIBS_AVAILABLE before touching cv2 or np.
    logger.warning("Video processing libraries not available. Install opencv-python for full functionality.")
    VIDEO_LIBS_AVAILABLE = False
@dataclass
class VideoSegment:
    """Data structure describing one video segment.

    Records are serialized with ``asdict`` and rebuilt with
    ``VideoSegment(**item)``, so the field names are also the keys of the
    JSON segment store.
    """
    id: str  # MediaManager assigns a uuid4 string
    original_video_id: str  # id of the original video this segment belongs to
    segment_index: int  # index of the scene interval within the original video
    filename: str
    file_path: str
    md5_hash: str  # MD5 hex digest of the segment file
    file_size: int  # size in bytes
    duration: float  # seconds
    width: int
    height: int
    fps: float
    format: str
    start_time: float  # seconds from the start of the original video
    end_time: float  # seconds from the start of the original video
    tags: List[str]
    use_count: int  # incremented by MediaManager.increment_segment_usage
    thumbnail_path: Optional[str] = None
    scene_score: Optional[float] = None  # scene-change score
    motion_score: Optional[float] = None  # motion-intensity score
    brightness: Optional[float] = None
    contrast: Optional[float] = None
    created_at: str = ""  # MediaManager stores datetime.now().isoformat()
    updated_at: str = ""  # MediaManager stores datetime.now().isoformat()
    is_active: bool = True  # inactive records are skipped by the query methods
@dataclass
class OriginalVideo:
    """Data structure describing one uploaded original video.

    Records are serialized with ``asdict`` and rebuilt with
    ``OriginalVideo(**item)``, so the field names are also the keys of the
    JSON store.
    """
    id: str  # MediaManager assigns a uuid4 string
    filename: str  # name supplied at upload time (not the stored file name)
    file_path: str  # location of the stored copy
    md5_hash: str  # MD5 hex digest, used to detect duplicate uploads
    file_size: int  # size in bytes
    duration: float  # seconds
    width: int
    height: int
    fps: float
    format: str  # file extension without the dot, or 'unknown'
    segment_count: int
    tags: List[str]
    created_at: str = ""  # MediaManager stores datetime.now().isoformat()
    updated_at: str = ""  # MediaManager stores datetime.now().isoformat()
    is_active: bool = True  # inactive records are skipped by the query methods
class MediaManager:
"""媒体库管理器"""
def __init__(self):
    """Create the working directories and load the persisted records."""
    base_dir = settings.temp_dir

    # Metadata store: two JSON files under <temp_dir>/cache.
    self.cache_dir = base_dir / "cache"
    self.cache_dir.mkdir(parents=True, exist_ok=True)
    self.segments_file = self.cache_dir / "video_segments.json"
    self.original_videos_file = self.cache_dir / "original_videos.json"

    # In-memory copies of the stored records.
    self.video_segments = self._load_video_segments()
    self.original_videos = self._load_original_videos()

    # Media directories: stored originals, cut segments, thumbnails.
    self.video_storage_dir = base_dir / "video_storage"
    self.video_storage_dir.mkdir(parents=True, exist_ok=True)
    self.segments_dir = base_dir / "video_segments"
    self.segments_dir.mkdir(parents=True, exist_ok=True)
    self.thumbnails_dir = base_dir / "video_thumbnails"
    self.thumbnails_dir.mkdir(parents=True, exist_ok=True)
def _load_video_segments(self) -> List[VideoSegment]:
    """Load segment records from the JSON store.

    Returns:
        The stored records, or an empty list when the store does not exist
        or cannot be read/parsed (the error is logged).
    """
    if not self.segments_file.exists():
        return []
    try:
        with open(self.segments_file, 'r', encoding='utf-8') as store:
            raw_items = json.load(store)
        return [VideoSegment(**entry) for entry in raw_items]
    except Exception as e:
        logger.error(f"Failed to load video segments: {e}")
        return []
def _load_original_videos(self) -> List[OriginalVideo]:
    """Load original-video records from the JSON store.

    Returns:
        The stored records, or an empty list when the store does not exist
        or cannot be read/parsed (the error is logged).
    """
    if not self.original_videos_file.exists():
        return []
    try:
        with open(self.original_videos_file, 'r', encoding='utf-8') as store:
            raw_items = json.load(store)
        return [OriginalVideo(**entry) for entry in raw_items]
    except Exception as e:
        logger.error(f"Failed to load original videos: {e}")
        return []
def _save_video_segments(self, segments: Optional[List[VideoSegment]] = None):
    """Persist segment records to the JSON store.

    The data is written to a sibling ``.tmp`` file and then moved over the
    real store, so a failure part-way through cannot leave a truncated file
    (which the loader would treat as an empty library).

    Args:
        segments: Records to write; defaults to ``self.video_segments``.

    Raises:
        Exception: Any serialization or I/O error, after it has been logged.
    """
    if segments is None:
        segments = self.video_segments
    tmp_path = f"{self.segments_file}.tmp"
    try:
        data = [asdict(segment) for segment in segments]
        with open(tmp_path, 'w', encoding='utf-8') as f:
            json.dump(data, f, ensure_ascii=False, indent=2)
        # Replace the store only once the new content is completely written.
        os.replace(tmp_path, self.segments_file)
        logger.info(f"Video segments saved to {self.segments_file}")
    except Exception as e:
        logger.error(f"Failed to save video segments: {e}")
        # Best-effort removal of the partial temp file; never mask the
        # original error.
        try:
            if os.path.exists(tmp_path):
                os.remove(tmp_path)
        except OSError:
            pass
        raise
def _save_original_videos(self, videos: Optional[List[OriginalVideo]] = None):
    """Persist original-video records to the JSON store.

    The data is written to a sibling ``.tmp`` file and then moved over the
    real store, so a failure part-way through cannot leave a truncated file
    (which the loader would treat as an empty library).

    Args:
        videos: Records to write; defaults to ``self.original_videos``.

    Raises:
        Exception: Any serialization or I/O error, after it has been logged.
    """
    if videos is None:
        videos = self.original_videos
    tmp_path = f"{self.original_videos_file}.tmp"
    try:
        data = [asdict(video) for video in videos]
        with open(tmp_path, 'w', encoding='utf-8') as f:
            json.dump(data, f, ensure_ascii=False, indent=2)
        # Replace the store only once the new content is completely written.
        os.replace(tmp_path, self.original_videos_file)
        logger.info(f"Original videos saved to {self.original_videos_file}")
    except Exception as e:
        logger.error(f"Failed to save original videos: {e}")
        # Best-effort removal of the partial temp file; never mask the
        # original error.
        try:
            if os.path.exists(tmp_path):
                os.remove(tmp_path)
        except OSError:
            pass
        raise
def _calculate_md5(self, file_path: str) -> str:
    """Return the MD5 hex digest of the file at *file_path*.

    The file is read in 4096-byte chunks so large videos are never held in
    memory in full.
    """
    digest = hashlib.md5()
    with open(file_path, "rb") as stream:
        while True:
            piece = stream.read(4096)
            if not piece:
                break
            digest.update(piece)
    return digest.hexdigest()
def _get_video_info(self, file_path: str) -> Dict:
    """Return basic properties of a video file.

    Args:
        file_path: Path of the video to inspect.

    Returns:
        A dict with keys ``duration`` (seconds), ``width``, ``height``,
        ``fps``, ``frame_count`` and ``file_size`` (bytes). When OpenCV is
        unavailable or probing fails, every value except ``file_size`` is
        zero.

    Raises:
        OSError: If the size of *file_path* cannot be read.
    """
    # Zeroed defaults shared by the "no OpenCV" and "probe failed" paths.
    info = {
        'duration': 0.0,
        'width': 0,
        'height': 0,
        'fps': 0.0,
        'frame_count': 0,
        'file_size': os.path.getsize(file_path)
    }
    if not VIDEO_LIBS_AVAILABLE:
        return info

    cap = None
    try:
        cap = cv2.VideoCapture(file_path)
        fps = cap.get(cv2.CAP_PROP_FPS)
        frame_count = int(cap.get(cv2.CAP_PROP_FRAME_COUNT))
        width = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH))
        height = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))
        # Guard against a zero frame rate reported for unreadable files.
        duration = frame_count / fps if fps > 0 else 0.0
        info.update({
            'duration': duration,
            'width': width,
            'height': height,
            'fps': fps,
            'frame_count': frame_count,
        })
    except Exception as e:
        logger.error(f"Failed to get video info: {e}")
    finally:
        # Release the capture even when probing raised.
        if cap is not None:
            cap.release()
    return info
def _detect_scene_changes(self, file_path: str, threshold: float = 30.0) -> List[float]:
    """Detect scene-change points (cuts) in a video.

    Consecutive frames are converted to grayscale and compared; a frame
    whose mean absolute difference from the previous frame exceeds
    *threshold* is recorded as a cut.

    Args:
        file_path: Path of the video to analyse.
        threshold: Mean gray-level difference above which a cut is reported.

    Returns:
        Timestamps in seconds: ``0.0``, each detected cut, and finally the
        video duration when it is positive. ``[]`` when OpenCV is
        unavailable; ``[0.0]`` when the frame rate is unusable or detection
        fails.
    """
    if not VIDEO_LIBS_AVAILABLE:
        logger.warning("Video processing not available, returning empty scene changes")
        return []

    cap = None
    try:
        cap = cv2.VideoCapture(file_path)
        fps = cap.get(cv2.CAP_PROP_FPS)
        if not fps > 0:
            # Timestamps are frame_count / fps, so nothing meaningful can
            # be computed; skip decoding the whole file.
            logger.warning(f"Cannot detect scene changes without a valid frame rate: {file_path}")
            return [0.0]

        scene_changes = [0.0]  # start of the video
        prev_frame = None
        frame_count = 0
        while True:
            ret, frame = cap.read()
            if not ret:
                break
            gray = cv2.cvtColor(frame, cv2.COLOR_BGR2GRAY)
            if prev_frame is not None:
                # Mean absolute difference between consecutive frames.
                mean_diff = np.mean(cv2.absdiff(prev_frame, gray))
                if mean_diff > threshold:
                    timestamp = frame_count / fps
                    scene_changes.append(timestamp)
                    logger.debug(f"Scene change detected at {timestamp:.2f}s (diff: {mean_diff:.2f})")
            prev_frame = gray
            frame_count += 1

        # Close the last interval with the end of the video.
        duration = frame_count / fps
        if duration > 0:
            scene_changes.append(duration)
        logger.info(f"Detected {len(scene_changes)-1} scene changes in video")
        return scene_changes
    except Exception as e:
        logger.error(f"Failed to detect scene changes: {e}")
        return [0.0]
    finally:
        # Release the capture on every exit path, including errors.
        if cap is not None:
            cap.release()
def _generate_thumbnail(self, video_path: str, timestamp: float, output_path: str) -> bool:
    """Save one frame of a video as a thumbnail image.

    Args:
        video_path: Video to grab the frame from.
        timestamp: Position of the wanted frame, in seconds.
        output_path: Image file to write; the format follows its extension.

    Returns:
        True if a frame was decoded and written, False otherwise (including
        when OpenCV is unavailable). Errors are logged, never raised.
    """
    if not VIDEO_LIBS_AVAILABLE:
        return False

    cap = None
    try:
        cap = cv2.VideoCapture(video_path)
        if not cap.isOpened():
            logger.warning(f"Cannot open video for thumbnail: {video_path}")
            return False

        cap.set(cv2.CAP_PROP_POS_MSEC, max(timestamp, 0.0) * 1000.0)
        ret, frame = cap.read()
        if not ret:
            # Seeking can overshoot very short clips; use the first frame.
            cap.set(cv2.CAP_PROP_POS_FRAMES, 0)
            ret, frame = cap.read()
        if not ret:
            logger.warning(f"No frame available for thumbnail: {video_path}")
            return False

        return bool(cv2.imwrite(output_path, frame))
    except Exception as e:
        logger.error(f"Failed to generate thumbnail: {e}")
        return False
    finally:
        if cap is not None:
            cap.release()
def _split_video_by_scenes(self, video_path: str, scene_changes: List[float],
                           original_video_id: str) -> List[VideoSegment]:
    """Cut a video into segment files at the given scene-change timestamps.

    Each pair of consecutive timestamps defines one segment; intervals
    shorter than one second are skipped. If OpenCV is unavailable, or
    splitting fails, the whole video is stored as a single segment instead.

    Args:
        video_path: Video to split.
        scene_changes: Ascending timestamps in seconds, including start and
            end of the video.
        original_video_id: Id recorded on every created segment.

    Returns:
        The created segment records (possibly empty when every interval is
        shorter than one second).
    """
    if not VIDEO_LIBS_AVAILABLE:
        logger.warning("Video processing not available, creating single segment")
        return [self._create_single_segment(video_path, original_video_id)]

    segments = []
    cap = None
    out = None
    failed = False
    try:
        cap = cv2.VideoCapture(video_path)
        fps = cap.get(cv2.CAP_PROP_FPS)
        width = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH))
        height = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))
        fourcc = cv2.VideoWriter_fourcc(*'mp4v')

        for i, (start_time, end_time) in enumerate(zip(scene_changes, scene_changes[1:])):
            duration = end_time - start_time
            # Skip segments shorter than one second.
            if duration < 1.0:
                continue

            segment_id = str(uuid.uuid4())
            segment_filename = f"{segment_id}.mp4"
            segment_path = self.segments_dir / segment_filename

            out = cv2.VideoWriter(str(segment_path), fourcc, fps, (width, height))
            start_frame = int(start_time * fps)
            end_frame = int(end_time * fps)
            cap.set(cv2.CAP_PROP_POS_FRAMES, start_frame)
            for _ in range(end_frame - start_frame):
                ret, frame = cap.read()
                if not ret:
                    break
                out.write(frame)
            out.release()
            out = None

            # Use the middle frame of the segment as its thumbnail.
            thumbnail_filename = f"{segment_id}_thumb.jpg"
            thumbnail_path = self.thumbnails_dir / thumbnail_filename
            thumbnail_generated = self._generate_thumbnail(
                str(segment_path),
                duration / 2,
                str(thumbnail_path)
            )

            now = datetime.now().isoformat()
            segments.append(VideoSegment(
                id=segment_id,
                original_video_id=original_video_id,
                segment_index=i,
                filename=segment_filename,
                file_path=str(segment_path),
                md5_hash=self._calculate_md5(str(segment_path)),
                file_size=os.path.getsize(segment_path),
                duration=duration,
                width=width,
                height=height,
                fps=fps,
                format='mp4',
                start_time=start_time,
                end_time=end_time,
                tags=[],
                use_count=0,
                thumbnail_path=str(thumbnail_path) if thumbnail_generated else None,
                created_at=now,
                updated_at=now
            ))
            logger.info(f"Created segment {i}: {start_time:.2f}s - {end_time:.2f}s ({duration:.2f}s)")
    except Exception as e:
        logger.error(f"Failed to split video: {e}")
        failed = True
    finally:
        # Release writer and capture on every exit path.
        if out is not None:
            out.release()
        if cap is not None:
            cap.release()

    if failed:
        # Fall back to one whole-file segment. Re-running the split here
        # would hit the same error again and recurse without bound.
        return [self._create_single_segment(video_path, original_video_id)]
    return segments

def _create_single_segment(self, video_path: str, original_video_id: str) -> VideoSegment:
    """Copy the whole video into the segments directory as segment 0."""
    video_info = self._get_video_info(video_path)
    segment_id = str(uuid.uuid4())
    segment_filename = f"{segment_id}.mp4"
    segment_path = self.segments_dir / segment_filename
    # NOTE(review): the copy always gets an .mp4 name and format='mp4',
    # whatever the source container is; confirm this is intended.
    shutil.copy2(video_path, segment_path)
    now = datetime.now().isoformat()
    return VideoSegment(
        id=segment_id,
        original_video_id=original_video_id,
        segment_index=0,
        filename=segment_filename,
        file_path=str(segment_path),
        md5_hash=self._calculate_md5(str(segment_path)),
        file_size=video_info['file_size'],
        duration=video_info['duration'],
        width=video_info['width'],
        height=video_info['height'],
        fps=video_info['fps'],
        format='mp4',
        start_time=0.0,
        end_time=video_info['duration'],
        tags=[],
        use_count=0,
        created_at=now,
        updated_at=now
    )
def get_video_by_md5(self, md5_hash: str) -> Optional[Dict]:
    """Return the first active original video with this MD5 as a dict, or None."""
    found = next(
        (candidate for candidate in self.original_videos
         if candidate.md5_hash == md5_hash and candidate.is_active),
        None,
    )
    if found is None:
        return None
    return asdict(found)
def get_segment_by_md5(self, md5_hash: str) -> Optional[Dict]:
    """Return the first active segment with this MD5 as a dict, or None."""
    found = next(
        (candidate for candidate in self.video_segments
         if candidate.md5_hash == md5_hash and candidate.is_active),
        None,
    )
    if found is None:
        return None
    return asdict(found)
def get_all_segments(self) -> List[Dict]:
    """Return every active segment as a dict, in stored order."""
    active = []
    for record in self.video_segments:
        if record.is_active:
            active.append(asdict(record))
    return active
def get_all_original_videos(self) -> List[Dict]:
    """Return every active original video as a dict, in stored order."""
    active = []
    for record in self.original_videos:
        if record.is_active:
            active.append(asdict(record))
    return active
def get_segments_by_video_id(self, video_id: str) -> List[Dict]:
    """Return the active segments of one original video, ordered by segment_index."""
    owned = [
        asdict(record)
        for record in self.video_segments
        if record.original_video_id == video_id and record.is_active
    ]
    owned.sort(key=lambda item: item['segment_index'])
    return owned
def upload_video_file(self, source_path: str, filename: Optional[str] = None,
                      tags: Optional[List[str]] = None) -> Dict:
    """Import one video file and split it into segments.

    The file is copied into the storage directory under a new uuid-based
    name, probed, split at detected scene changes, and recorded together
    with its segments in the JSON stores.

    Args:
        source_path: Path of the video to import.
        filename: Display name to record; defaults to the basename of
            *source_path*. Its extension determines the stored extension.
        tags: Tags recorded on the original video; defaults to none.

    Returns:
        A dict with ``original_video`` (record dict), ``segments`` (list of
        record dicts) and ``is_duplicate``. When an active video with the
        same MD5 already exists nothing is copied and its records are
        returned with ``is_duplicate`` set to True.

    Raises:
        FileNotFoundError: If *source_path* does not exist.
    """
    if not os.path.exists(source_path):
        raise FileNotFoundError(f"Source file not found: {source_path}")
    if tags is None:
        tags = []

    # Identical content (same MD5) is never stored twice.
    md5_hash = self._calculate_md5(source_path)
    existing = self.get_video_by_md5(md5_hash)
    if existing:
        logger.info(f"Video with MD5 {md5_hash} already exists")
        return {
            'original_video': existing,
            'segments': self.get_segments_by_video_id(existing['id']),
            'is_duplicate': True
        }

    video_id = str(uuid.uuid4())
    if filename is None:
        filename = os.path.basename(source_path)
    file_ext = os.path.splitext(filename)[1].lower()
    stored_filename = f"{video_id}{file_ext}"
    stored_path = self.video_storage_dir / stored_filename

    shutil.copy2(source_path, stored_path)
    try:
        video_info = self._get_video_info(str(stored_path))
        scene_changes = self._detect_scene_changes(str(stored_path))
        segments = self._split_video_by_scenes(str(stored_path), scene_changes, video_id)
    except Exception:
        # No record will reference the stored copy, so do not leave it
        # behind; cleanup is best-effort and must not mask the error.
        try:
            if os.path.exists(stored_path):
                os.remove(stored_path)
        except OSError:
            pass
        raise

    now = datetime.now().isoformat()
    original_video = OriginalVideo(
        id=video_id,
        filename=filename,
        file_path=str(stored_path),
        md5_hash=md5_hash,
        file_size=video_info['file_size'],
        duration=video_info['duration'],
        width=video_info['width'],
        height=video_info['height'],
        fps=video_info['fps'],
        format=file_ext[1:] if file_ext else 'unknown',
        # Count the segments actually created: the scene-change list may be
        # empty, and intervals can be skipped while splitting.
        segment_count=len(segments),
        tags=tags,
        created_at=now,
        updated_at=now
    )

    self.original_videos.append(original_video)
    self.video_segments.extend(segments)
    self._save_original_videos()
    self._save_video_segments()
    logger.info(f"Uploaded video: {filename} (MD5: {md5_hash}, {len(segments)} segments)")
    return {
        'original_video': asdict(original_video),
        'segments': [asdict(segment) for segment in segments],
        'is_duplicate': False
    }
def batch_upload_video_files(self, source_directory: str, tags: List[str] = None) -> Dict:
    """Import every video file found under a directory tree.

    Files are selected by extension and passed to ``upload_video_file``
    one by one; a failure on one file is recorded and does not stop the
    batch.

    Args:
        source_directory: Root directory to walk recursively.
        tags: Tags passed to each upload; defaults to none.

    Returns:
        A summary dict with counters (``total_files``, ``uploaded_files``,
        ``skipped_files``, ``failed_files``, ``total_segments``) and the
        lists ``uploaded_list``, ``skipped_list`` and ``failed_list``.

    Raises:
        FileNotFoundError: If *source_directory* does not exist.
    """
    if not os.path.exists(source_directory):
        raise FileNotFoundError(f"Source directory not found: {source_directory}")
    tags = [] if tags is None else tags

    # Extensions treated as video files.
    video_extensions = {'.mp4', '.avi', '.mov', '.mkv', '.wmv', '.flv', '.webm', '.m4v'}
    results = {
        'total_files': 0,
        'uploaded_files': 0,
        'skipped_files': 0,
        'failed_files': 0,
        'total_segments': 0,
        'uploaded_list': [],
        'skipped_list': [],
        'failed_list': []
    }

    for root, _subdirs, names in os.walk(source_directory):
        for file in names:
            if os.path.splitext(file)[1].lower() not in video_extensions:
                continue
            results['total_files'] += 1
            try:
                result = self.upload_video_file(os.path.join(root, file), file, tags)
                if not result['is_duplicate']:
                    results['uploaded_files'] += 1
                    results['total_segments'] += len(result['segments'])
                    results['uploaded_list'].append(result)
                    continue
                results['skipped_files'] += 1
                results['skipped_list'].append({
                    'filename': file,
                    'reason': 'Already exists (same MD5)'
                })
            except Exception as e:
                results['failed_files'] += 1
                results['failed_list'].append({
                    'filename': file,
                    'error': str(e)
                })
                logger.error(f"Failed to upload {file}: {e}")

    summary = (f"Batch upload completed: {results['uploaded_files']} uploaded, "
               f"{results['skipped_files']} skipped, {results['failed_files']} failed, "
               f"{results['total_segments']} total segments created")
    logger.info(summary)
    return results
def add_segment_tags(self, segment_id: str, tags: List[str]) -> bool:
    """Add tags to an active segment and persist the change.

    Existing tags keep their order; new tags are appended in the order
    given and duplicates are dropped. (Merging through a ``set`` would
    reshuffle the stored tag list on every call.)

    Args:
        segment_id: Id of the segment to update.
        tags: Tags to add.

    Returns:
        True if an active segment with that id was updated, False otherwise.
    """
    for segment in self.video_segments:
        if segment.id == segment_id and segment.is_active:
            # dict.fromkeys de-duplicates while preserving first-seen order.
            segment.tags = list(dict.fromkeys([*segment.tags, *tags]))
            segment.updated_at = datetime.now().isoformat()
            self._save_video_segments()
            logger.info(f"Added tags {tags} to segment {segment_id}")
            return True
    return False
def remove_segment_tags(self, segment_id: str, tags: List[str]) -> bool:
    """Remove tags from an active segment and persist the change.

    The remaining tags keep their original order and are de-duplicated.
    (Filtering through a ``set`` would reshuffle the stored tag list on
    every call.)

    Args:
        segment_id: Id of the segment to update.
        tags: Tags to remove; tags the segment does not have are ignored.

    Returns:
        True if an active segment with that id was updated, False otherwise.
    """
    unwanted = set(tags)
    for segment in self.video_segments:
        if segment.id == segment_id and segment.is_active:
            segment.tags = list(dict.fromkeys(
                tag for tag in segment.tags if tag not in unwanted
            ))
            segment.updated_at = datetime.now().isoformat()
            self._save_video_segments()
            logger.info(f"Removed tags {tags} from segment {segment_id}")
            return True
    return False
def increment_segment_usage(self, segment_id: str) -> bool:
    """Increase the use count of an active segment by one and persist it.

    Returns:
        True if an active segment with that id was updated, False otherwise.
    """
    position = next(
        (idx for idx, candidate in enumerate(self.video_segments)
         if candidate.id == segment_id and candidate.is_active),
        None,
    )
    if position is None:
        return False

    segment = self.video_segments[position]
    segment.use_count += 1
    segment.updated_at = datetime.now().isoformat()
    self.video_segments[position] = segment
    self._save_video_segments()
    logger.info(f"Incremented usage count for segment {segment_id} to {segment.use_count}")
    return True
def get_segments_by_tags(self, tags: List[str], match_all: bool = False) -> List[Dict]:
    """Find active segments by tag.

    Args:
        tags: Tags to look for.
        match_all: If True a segment must carry every tag; otherwise one
            shared tag is enough.

    Returns:
        Matching segments as dicts, in stored order.
    """
    wanted = set(tags)

    def is_match(record) -> bool:
        owned = set(record.tags)
        if match_all:
            return wanted.issubset(owned)
        return bool(wanted.intersection(owned))

    return [
        asdict(record)
        for record in self.video_segments
        if record.is_active and is_match(record)
    ]
def get_popular_segments(self, limit: int = 10) -> List[Dict]:
    """Return up to *limit* active segments, highest use count first.

    Segments with equal use counts keep their stored order.
    """
    ranked = []
    for record in self.video_segments:
        if record.is_active:
            ranked.append(record)
    ranked.sort(key=lambda record: record.use_count, reverse=True)
    return [asdict(record) for record in ranked[:limit]]
def search_segments(self, keyword: str) -> List[Dict]:
    """Return active segments whose filename or any tag contains *keyword*.

    Matching is a case-insensitive substring test.
    """
    needle = keyword.lower()

    def mentions(record) -> bool:
        if needle in record.filename.lower():
            return True
        return any(needle in tag.lower() for tag in record.tags)

    return [
        asdict(record)
        for record in self.video_segments
        if record.is_active and mentions(record)
    ]
def delete_segment(self, segment_id: str) -> bool:
    """Delete a segment record together with its video and thumbnail files.

    Failure to remove the files is logged and does not prevent the record
    from being removed.

    Returns:
        True if a segment with that id existed, False otherwise.
    """
    position = next(
        (idx for idx, candidate in enumerate(self.video_segments)
         if candidate.id == segment_id),
        None,
    )
    if position is None:
        return False

    segment = self.video_segments[position]
    # Remove the files on disk first (best effort).
    try:
        if os.path.exists(segment.file_path):
            os.remove(segment.file_path)
        if segment.thumbnail_path and os.path.exists(segment.thumbnail_path):
            os.remove(segment.thumbnail_path)
    except Exception as e:
        logger.error(f"Failed to delete physical files: {e}")

    # Then drop the record and persist the store.
    deleted_segment = self.video_segments.pop(position)
    self._save_video_segments()
    logger.info(f"Deleted segment: {segment_id} - {deleted_segment.filename}")
    return True
def delete_original_video(self, video_id: str) -> bool:
    """Delete an original video and all of its segments.

    Segments referencing *video_id* are deleted first (even if the video
    record itself is not found); then the stored file and the video record
    are removed. Failure to remove the stored file is logged and does not
    prevent the record from being removed.

    Returns:
        True if an original video with that id existed, False otherwise.
    """
    # Snapshot the ids first: delete_segment mutates self.video_segments.
    doomed_ids = [
        record.id for record in self.video_segments
        if record.original_video_id == video_id
    ]
    for doomed_id in doomed_ids:
        self.delete_segment(doomed_id)

    position = next(
        (idx for idx, candidate in enumerate(self.original_videos)
         if candidate.id == video_id),
        None,
    )
    if position is None:
        return False

    video = self.original_videos[position]
    try:
        if os.path.exists(video.file_path):
            os.remove(video.file_path)
    except Exception as e:
        logger.error(f"Failed to delete video file: {e}")

    deleted_video = self.original_videos.pop(position)
    self._save_original_videos()
    logger.info(f"Deleted original video: {video_id} - {deleted_video.filename}")
    return True
# Global instance. Constructing it at import time runs MediaManager.__init__,
# which creates the working directories and loads the JSON stores.
media_manager = MediaManager()
def main():
    """Command-line interface that reports results through the JSON-RPC handler.

    ``sys.argv[1]`` selects the command and the following arguments are its
    parameters; list and boolean parameters are passed as JSON text. A
    missing command, a missing required parameter or an unknown command is
    reported as ``INVALID_REQUEST``; any exception raised while running a
    command is logged and reported as ``INTERNAL_ERROR``.
    """
    import sys
    import json

    # Response handler used for every reply.
    rpc = create_response_handler()
    argv = sys.argv
    if len(argv) < 2:
        rpc.error("INVALID_REQUEST", "No command specified")
        return
    command = argv[1]

    def json_arg(position, fallback):
        # Optional JSON-encoded argument; *fallback* when it is absent.
        return json.loads(argv[position]) if len(argv) > position else fallback

    # command -> (minimum len(argv), message when too short, handler).
    # Handlers are lambdas so arguments are parsed only for the chosen
    # command, inside the try block below.
    commands = {
        "get_all_segments": (
            2, None,
            lambda: media_manager.get_all_segments()),
        "get_all_original_videos": (
            2, None,
            lambda: media_manager.get_all_original_videos()),
        "get_segments_by_video_id": (
            3, "Video ID required",
            lambda: media_manager.get_segments_by_video_id(argv[2])),
        "upload_video_file": (
            3, "Source path required",
            lambda: media_manager.upload_video_file(
                argv[2],
                argv[3] if len(argv) > 3 else None,
                json_arg(4, []))),
        "batch_upload_video_files": (
            3, "Source directory required",
            lambda: media_manager.batch_upload_video_files(argv[2], json_arg(3, []))),
        "add_segment_tags": (
            4, "Segment ID and tags required",
            lambda: media_manager.add_segment_tags(argv[2], json.loads(argv[3]))),
        "remove_segment_tags": (
            4, "Segment ID and tags required",
            lambda: media_manager.remove_segment_tags(argv[2], json.loads(argv[3]))),
        "increment_segment_usage": (
            3, "Segment ID required",
            lambda: media_manager.increment_segment_usage(argv[2])),
        "get_segments_by_tags": (
            3, "Tags required",
            lambda: media_manager.get_segments_by_tags(
                json.loads(argv[2]), json_arg(3, False))),
        "get_popular_segments": (
            2, None,
            lambda: media_manager.get_popular_segments(
                int(argv[2]) if len(argv) > 2 else 10)),
        "search_segments": (
            3, "Search keyword required",
            lambda: media_manager.search_segments(argv[2])),
        "delete_segment": (
            3, "Segment ID required",
            lambda: media_manager.delete_segment(argv[2])),
        "delete_original_video": (
            3, "Video ID required",
            lambda: media_manager.delete_original_video(argv[2])),
    }

    try:
        entry = commands.get(command)
        if entry is None:
            rpc.error("INVALID_REQUEST", f"Unknown command: {command}")
            return
        min_argc, usage_error, run = entry
        if len(argv) < min_argc:
            rpc.error("INVALID_REQUEST", usage_error)
            return
        rpc.success(run())
    except Exception as e:
        logger.error(f"Command execution failed: {e}")
        rpc.error("INTERNAL_ERROR", str(e))
# Run the command-line interface only when executed as a script.
if __name__ == "__main__":
    main()