mxivideo/python_core/services/template_manager_cloud.py

554 lines
20 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

"""
Template Management Service (Cloud Version)
基于TemplateTable实现的云端模板管理服务
"""
import os
import json
import shutil
import uuid
from pathlib import Path
from typing import Dict, List, Any, Optional
from datetime import datetime
from ..utils.logger import setup_logger
from ..config import settings
from python_core.database.types import TemplateInfo, MaterialInfo
from python_core.database.template_postgres import template_table
logger = setup_logger(__name__)
class TemplateManagerCloud:
"""Cloud-based template management service using TemplateTable"""
def __init__(self, user_id: str = "default"):
self.user_id = user_id
self.template_table = template_table
self.templates_dir = settings.temp_dir / "templates"
self.templates_dir.mkdir(parents=True, exist_ok=True)
logger.info(f"TemplateManagerCloud initialized for user: {user_id}")
def _determine_material_type(self, file_path: str) -> str:
"""根据文件扩展名确定素材类型"""
file_extension = Path(file_path).suffix.lower().lstrip('.')
# 视频格式
video_extensions = {
'mp4', 'avi', 'mov', 'mkv', 'wmv', 'flv', 'webm', 'm4v',
'3gp', 'ogv', 'ts', 'mts', 'm2ts', 'vob', 'asf', 'rm', 'rmvb'
}
# 音频格式
audio_extensions = {
'mp3', 'wav', 'aac', 'flac', 'ogg', 'wma', 'm4a', 'opus',
'aiff', 'au', 'ra', 'ac3', 'dts', 'ape'
}
# 图片格式
image_extensions = {
'jpg', 'jpeg', 'png', 'gif', 'bmp', 'tiff', 'tga', 'webp',
'svg', 'ico', 'psd', 'ai', 'eps', 'raw', 'cr2', 'nef'
}
# 文本格式
text_extensions = {
'txt', 'srt', 'ass', 'ssa', 'vtt', 'sub', 'idx', 'sup'
}
# 特效格式
effect_extensions = {
'fx', 'effect', 'json', 'xml', 'preset'
}
if file_extension in video_extensions:
return 'video'
elif file_extension in audio_extensions:
return 'audio'
elif file_extension in image_extensions:
return 'image'
elif file_extension in text_extensions:
return 'text'
elif file_extension in effect_extensions:
return 'effect'
else:
return 'video' # 默认为视频类型
def batch_import_templates(self, source_folder: str, progress_callback=None) -> Dict[str, Any]:
"""
批量导入模板到云端存储
Args:
source_folder: 包含模板子文件夹的源文件夹
progress_callback: 进度回调函数
Returns:
导入结果,包含成功/失败计数和详情
"""
result = {
'status': True,
'msg': '',
'imported_count': 0,
'failed_count': 0,
'imported_templates': [],
'failed_templates': []
}
try:
source_path = Path(source_folder)
if not source_path.exists():
result['status'] = False
result['msg'] = f"Source folder does not exist: {source_folder}"
return result
# 获取所有子文件夹
template_folders = [d for d in source_path.iterdir() if d.is_dir()]
total_templates = len(template_folders)
if total_templates == 0:
result['msg'] = "No template folders found"
return result
logger.info(f"Found {total_templates} template folders to import")
for i, template_path in enumerate(template_folders):
try:
if progress_callback:
progress_callback(i, total_templates, f"Processing {template_path.name}")
# 处理单个模板
template_info = self._process_single_template(template_path)
if template_info:
# 保存到云端存储
template_id = self.template_table.create_template(template_info)
result['imported_count'] += 1
result['imported_templates'].append({
'id': template_id,
'name': template_info.name,
'path': str(template_path)
})
logger.info(f"Successfully imported template: {template_info.name}")
except Exception as e:
logger.error(f"Failed to import template {template_path.name}: {e}")
result['failed_count'] += 1
result['failed_templates'].append({
'name': template_path.name,
'path': str(template_path),
'error': str(e)
})
if progress_callback:
progress_callback(total_templates, total_templates, "Import completed")
result['msg'] = f"Import completed: {result['imported_count']} success, {result['failed_count']} failed"
logger.info(result['msg'])
except Exception as e:
logger.error(f"Batch import failed: {e}")
result['status'] = False
result['msg'] = f"Batch import failed: {str(e)}"
return result
def _process_single_template(self, template_path: Path) -> Optional[TemplateInfo]:
"""
处理单个模板文件夹
Args:
template_path: 模板文件夹路径
Returns:
TemplateInfo对象如果处理失败返回None
"""
try:
template_name = template_path.name
# 查找draft_content.json文件
draft_file = template_path / "draft_content.json"
if not draft_file.exists():
logger.warning(f"No draft_content.json found in {template_path}")
return None
# 读取draft content
with open(draft_file, 'r', encoding='utf-8') as f:
draft_content = json.load(f)
# 生成唯一ID
template_id = str(uuid.uuid4())
# 创建模板目录
template_dir = self.templates_dir / template_id
template_dir.mkdir(parents=True, exist_ok=True)
# 创建资源目录
resources_dir = template_dir / "resources"
resources_dir.mkdir(exist_ok=True)
# 处理素材并复制资源
materials = self._process_materials(draft_content, template_path, resources_dir)
# 更新draft content中的路径
updated_draft = self._update_draft_paths(draft_content, materials)
# 保存更新后的draft content
draft_target = template_dir / "draft_content.json"
with open(draft_target, 'w', encoding='utf-8') as f:
json.dump(updated_draft, f, ensure_ascii=False, indent=2)
# 创建模板信息
template_info = TemplateInfo(
id=template_id,
name=template_name,
description=f"Imported template: {template_name}",
thumbnail_path="", # TODO: 生成缩略图
draft_content_path=str(draft_target),
resources_path=str(resources_dir),
created_at=datetime.now().isoformat(),
updated_at=datetime.now().isoformat(),
canvas_config=draft_content.get('canvas_config', {}),
duration=draft_content.get('duration', 0),
material_count=len(materials),
track_count=len(draft_content.get('tracks', [])),
tags=[],
is_cloud=False, # 用户导入的模板默认为本地
user_id=self.user_id,
draft_content=updated_draft # 设置 draft_content
)
logger.info(f"Successfully processed template: {template_name} -> {template_id}")
return template_info
except Exception as e:
logger.error(f"Failed to process template {template_path}: {e}")
return None
def _process_materials(self, draft_content: Dict, template_path: Path, resources_dir: Path) -> List[MaterialInfo]:
"""处理模板中的素材文件"""
materials = []
try:
# 从draft content中提取素材信息
tracks = draft_content.get('tracks', [])
for track in tracks:
segments = track.get('segments', [])
for segment in segments:
material_path = segment.get('material_path', '')
if material_path:
# 构建完整路径
source_file = template_path / material_path
if source_file.exists():
# 生成新的文件名
material_id = str(uuid.uuid4())
file_extension = source_file.suffix
new_filename = f"{material_id}{file_extension}"
target_file = resources_dir / new_filename
# 复制文件
shutil.copy2(source_file, target_file)
# 创建素材信息
material_info = MaterialInfo(
id=material_id,
material_id=material_id,
name=source_file.name,
type=self._determine_material_type(str(source_file)),
original_path=str(source_file),
relative_path=new_filename,
duration=segment.get('duration'),
size=source_file.stat().st_size if source_file.exists() else None,
is_cloud=False,
user_id=self.user_id
)
materials.append(material_info)
# 更新segment中的路径
segment['material_path'] = new_filename
segment['material_id'] = material_id
except Exception as e:
logger.error(f"Failed to process materials: {e}")
return materials
def _update_draft_paths(self, draft_content: Dict, materials: List[MaterialInfo]) -> Dict:
"""更新draft content中的路径为相对路径"""
# 创建路径映射
path_mapping = {}
for material in materials:
path_mapping[material.original_path] = material.relative_path
# 这里可以根据需要更新draft_content中的路径
# 当前实现已在_process_materials中更新了路径
return draft_content
def get_templates(self, include_cloud: bool = True) -> List[TemplateInfo]:
"""
获取模板列表
Args:
include_cloud: 是否包含云端公共模板
Returns:
模板列表
"""
try:
templates = self.template_table.get_templates_by_user(
self.user_id,
include_cloud=include_cloud
)
logger.info(f"Retrieved {len(templates)} templates for user {self.user_id}")
return templates
except Exception as e:
logger.error(f"Failed to get templates: {e}")
return []
def get_template(self, template_id: str) -> Optional[TemplateInfo]:
"""
获取特定模板
Args:
template_id: 模板ID
Returns:
模板信息如果不存在返回None
"""
try:
template = self.template_table.get_template_by_id(template_id)
if template:
logger.info(f"Retrieved template: {template.name}")
else:
logger.warning(f"Template not found: {template_id}")
return template
except Exception as e:
logger.error(f"Failed to get template {template_id}: {e}")
return None
def delete_template(self, template_id: str) -> bool:
"""
删除模板
Args:
template_id: 模板ID
Returns:
删除成功返回True
"""
try:
# 获取模板信息以进行权限检查
template = self.template_table.get_template_by_id(template_id)
if not template:
logger.warning(f"Template not found for deletion: {template_id}")
return False
# 检查权限(只能删除自己的模板)
if template.user_id != self.user_id:
logger.warning(f"Permission denied: user {self.user_id} cannot delete template {template_id}")
return False
# 删除本地文件
template_dir = self.templates_dir / template_id
if template_dir.exists():
shutil.rmtree(template_dir)
logger.info(f"Deleted local template directory: {template_dir}")
# 从云端存储删除
result = self.template_table.delete_template(template_id)
if result:
logger.info(f"Successfully deleted template: {template_id}")
return result
except Exception as e:
logger.error(f"Failed to delete template {template_id}: {e}")
return False
def search_templates(self, query: str, filters: Optional[Dict[str, Any]] = None) -> List[TemplateInfo]:
"""
搜索模板
Args:
query: 搜索关键词
filters: 搜索过滤器(暂未实现)
Returns:
匹配的模板列表
"""
try:
templates = self.template_table.search_templates(
query,
user_id=self.user_id,
include_cloud=True
)
logger.info(f"Search '{query}' found {len(templates)} templates")
return templates
except Exception as e:
logger.error(f"Failed to search templates with query '{query}': {e}")
return []
def get_templates_by_tag(self, tag: str) -> List[TemplateInfo]:
"""
根据标签获取模板
Args:
tag: 标签名称
Returns:
匹配的模板列表
"""
try:
templates = self.template_table.get_templates_by_tag(
tag,
user_id=self.user_id,
include_cloud=True
)
logger.info(f"Tag '{tag}' found {len(templates)} templates")
return templates
except Exception as e:
logger.error(f"Failed to get templates by tag '{tag}': {e}")
return []
def update_template(self, template_id: str, updates: Dict[str, Any]) -> bool:
"""
更新模板信息
Args:
template_id: 模板ID
updates: 要更新的字段
Returns:
更新成功返回True
"""
try:
# 检查权限
template = self.template_table.get_template_by_id(template_id)
if not template or template.user_id != self.user_id:
logger.warning(f"Permission denied or template not found: {template_id}")
return False
result = self.template_table.update_template(template_id, updates)
if result:
logger.info(f"Successfully updated template: {template_id}")
return result
except Exception as e:
logger.error(f"Failed to update template {template_id}: {e}")
return False
def get_template_count(self, include_cloud: bool = True) -> int:
"""
获取模板数量
Args:
include_cloud: 是否包含云端公共模板
Returns:
模板数量
"""
try:
count = self.template_table.get_template_count(
user_id=self.user_id,
include_cloud=include_cloud
)
logger.info(f"Template count for user {self.user_id}: {count}")
return count
except Exception as e:
logger.error(f"Failed to get template count: {e}")
return 0
def get_popular_tags(self, limit: int = 20) -> List[Dict[str, Any]]:
"""
获取热门标签
Args:
limit: 最大返回数量
Returns:
标签列表,包含标签名称和使用次数
"""
try:
tags = self.template_table.get_popular_tags(
user_id=self.user_id,
limit=limit
)
logger.info(f"Retrieved {len(tags)} popular tags")
return tags
except Exception as e:
logger.error(f"Failed to get popular tags: {e}")
return []
def create_template_from_draft(self, name: str, description: str, draft_content: Dict[str, Any],
tags: List[str] = None, is_cloud: bool = False) -> Optional[str]:
"""
从draft内容创建模板
Args:
name: 模板名称
description: 模板描述
draft_content: draft内容
tags: 标签列表
is_cloud: 是否为云端公共模板
Returns:
模板ID如果创建失败返回None
"""
try:
template_id = str(uuid.uuid4())
# 创建模板目录
template_dir = self.templates_dir / template_id
template_dir.mkdir(parents=True, exist_ok=True)
# 保存draft content
draft_target = template_dir / "draft_content.json"
with open(draft_target, 'w', encoding='utf-8') as f:
json.dump(draft_content, f, ensure_ascii=False, indent=2)
# 创建模板信息
template_info = TemplateInfo(
id=template_id,
name=name,
description=description,
thumbnail_path="",
draft_content_path=str(draft_target),
resources_path=str(template_dir / "resources"),
created_at=datetime.now().isoformat(),
updated_at=datetime.now().isoformat(),
canvas_config=draft_content.get('canvas_config', {}),
duration=draft_content.get('duration', 0),
material_count=len(draft_content.get('materials', [])),
track_count=len(draft_content.get('tracks', [])),
tags=tags or [],
is_cloud=is_cloud,
user_id=self.user_id,
draft_content=draft_content # 设置 draft_content
)
# 保存到云端存储
created_id = self.template_table.create_template(template_info)
logger.info(f"Successfully created template from draft: {name} -> {created_id}")
return created_id
except Exception as e:
logger.error(f"Failed to create template from draft: {e}")
return None
# 创建全局实例工厂函数
def create_template_manager_cloud(user_id: str = "default") -> TemplateManagerCloud:
"""创建TemplateManagerCloud实例"""
return TemplateManagerCloud(user_id=user_id)