552 lines
20 KiB
Python
552 lines
20 KiB
Python
"""
|
||
Template Management Service (Cloud Version)
|
||
基于TemplateTable实现的云端模板管理服务
|
||
"""
|
||
|
||
import os
|
||
import json
|
||
import shutil
|
||
import uuid
|
||
from pathlib import Path
|
||
from typing import Dict, List, Any, Optional
|
||
from datetime import datetime
|
||
|
||
from ..utils.logger import setup_logger
|
||
from ..config import settings
|
||
from python_core.database.types import TemplateInfo, MaterialInfo
|
||
from python_core.database.template_postgres import template_table
|
||
|
||
logger = setup_logger(__name__)
|
||
|
||
class TemplateManagerCloud:
|
||
"""Cloud-based template management service using TemplateTable"""
|
||
|
||
def __init__(self, user_id: str = "default"):
|
||
self.user_id = user_id
|
||
self.template_table = template_table
|
||
self.templates_dir = settings.temp_dir / "templates"
|
||
self.templates_dir.mkdir(parents=True, exist_ok=True)
|
||
|
||
logger.info(f"TemplateManagerCloud initialized for user: {user_id}")
|
||
|
||
def _determine_material_type(self, file_path: str) -> str:
|
||
"""根据文件扩展名确定素材类型"""
|
||
file_extension = Path(file_path).suffix.lower().lstrip('.')
|
||
|
||
# 视频格式
|
||
video_extensions = {
|
||
'mp4', 'avi', 'mov', 'mkv', 'wmv', 'flv', 'webm', 'm4v',
|
||
'3gp', 'ogv', 'ts', 'mts', 'm2ts', 'vob', 'asf', 'rm', 'rmvb'
|
||
}
|
||
|
||
# 音频格式
|
||
audio_extensions = {
|
||
'mp3', 'wav', 'aac', 'flac', 'ogg', 'wma', 'm4a', 'opus',
|
||
'aiff', 'au', 'ra', 'ac3', 'dts', 'ape'
|
||
}
|
||
|
||
# 图片格式
|
||
image_extensions = {
|
||
'jpg', 'jpeg', 'png', 'gif', 'bmp', 'tiff', 'tga', 'webp',
|
||
'svg', 'ico', 'psd', 'ai', 'eps', 'raw', 'cr2', 'nef'
|
||
}
|
||
|
||
# 文本格式
|
||
text_extensions = {
|
||
'txt', 'srt', 'ass', 'ssa', 'vtt', 'sub', 'idx', 'sup'
|
||
}
|
||
|
||
# 特效格式
|
||
effect_extensions = {
|
||
'fx', 'effect', 'json', 'xml', 'preset'
|
||
}
|
||
|
||
if file_extension in video_extensions:
|
||
return 'video'
|
||
elif file_extension in audio_extensions:
|
||
return 'audio'
|
||
elif file_extension in image_extensions:
|
||
return 'image'
|
||
elif file_extension in text_extensions:
|
||
return 'text'
|
||
elif file_extension in effect_extensions:
|
||
return 'effect'
|
||
else:
|
||
return 'video' # 默认为视频类型
|
||
|
||
def batch_import_templates(self, source_folder: str, progress_callback=None) -> Dict[str, Any]:
|
||
"""
|
||
批量导入模板到云端存储
|
||
|
||
Args:
|
||
source_folder: 包含模板子文件夹的源文件夹
|
||
progress_callback: 进度回调函数
|
||
|
||
Returns:
|
||
导入结果,包含成功/失败计数和详情
|
||
"""
|
||
result = {
|
||
'status': True,
|
||
'msg': '',
|
||
'imported_count': 0,
|
||
'failed_count': 0,
|
||
'imported_templates': [],
|
||
'failed_templates': []
|
||
}
|
||
|
||
try:
|
||
source_path = Path(source_folder)
|
||
if not source_path.exists():
|
||
result['status'] = False
|
||
result['msg'] = f"Source folder does not exist: {source_folder}"
|
||
return result
|
||
|
||
# 获取所有子文件夹
|
||
template_folders = [d for d in source_path.iterdir() if d.is_dir()]
|
||
total_templates = len(template_folders)
|
||
|
||
if total_templates == 0:
|
||
result['msg'] = "No template folders found"
|
||
return result
|
||
|
||
logger.info(f"Found {total_templates} template folders to import")
|
||
|
||
for i, template_path in enumerate(template_folders):
|
||
try:
|
||
if progress_callback:
|
||
progress_callback(i, total_templates, f"Processing {template_path.name}")
|
||
|
||
# 处理单个模板
|
||
template_info = self._process_single_template(template_path)
|
||
if template_info:
|
||
# 保存到云端存储
|
||
template_id = self.template_table.create_template(template_info)
|
||
|
||
result['imported_count'] += 1
|
||
result['imported_templates'].append({
|
||
'id': template_id,
|
||
'name': template_info.name,
|
||
'path': str(template_path)
|
||
})
|
||
|
||
logger.info(f"Successfully imported template: {template_info.name}")
|
||
|
||
except Exception as e:
|
||
logger.error(f"Failed to import template {template_path.name}: {e}")
|
||
result['failed_count'] += 1
|
||
result['failed_templates'].append({
|
||
'name': template_path.name,
|
||
'path': str(template_path),
|
||
'error': str(e)
|
||
})
|
||
|
||
if progress_callback:
|
||
progress_callback(total_templates, total_templates, "Import completed")
|
||
|
||
result['msg'] = f"Import completed: {result['imported_count']} success, {result['failed_count']} failed"
|
||
logger.info(result['msg'])
|
||
|
||
except Exception as e:
|
||
logger.error(f"Batch import failed: {e}")
|
||
result['status'] = False
|
||
result['msg'] = f"Batch import failed: {str(e)}"
|
||
|
||
return result
|
||
|
||
def _process_single_template(self, template_path: Path) -> Optional[TemplateInfo]:
|
||
"""
|
||
处理单个模板文件夹
|
||
|
||
Args:
|
||
template_path: 模板文件夹路径
|
||
|
||
Returns:
|
||
TemplateInfo对象,如果处理失败返回None
|
||
"""
|
||
try:
|
||
template_name = template_path.name
|
||
|
||
# 查找draft_content.json文件
|
||
draft_file = template_path / "draft_content.json"
|
||
if not draft_file.exists():
|
||
logger.warning(f"No draft_content.json found in {template_path}")
|
||
return None
|
||
|
||
# 读取draft content
|
||
with open(draft_file, 'r', encoding='utf-8') as f:
|
||
draft_content = json.load(f)
|
||
|
||
# 生成唯一ID
|
||
template_id = str(uuid.uuid4())
|
||
|
||
# 创建模板目录
|
||
template_dir = self.templates_dir / template_id
|
||
template_dir.mkdir(parents=True, exist_ok=True)
|
||
|
||
# 创建资源目录
|
||
resources_dir = template_dir / "resources"
|
||
resources_dir.mkdir(exist_ok=True)
|
||
|
||
# 处理素材并复制资源
|
||
materials = self._process_materials(draft_content, template_path, resources_dir)
|
||
|
||
# 更新draft content中的路径
|
||
updated_draft = self._update_draft_paths(draft_content, materials)
|
||
|
||
# 保存更新后的draft content
|
||
draft_target = template_dir / "draft_content.json"
|
||
with open(draft_target, 'w', encoding='utf-8') as f:
|
||
json.dump(updated_draft, f, ensure_ascii=False, indent=2)
|
||
|
||
# 创建模板信息
|
||
template_info = TemplateInfo(
|
||
id=template_id,
|
||
name=template_name,
|
||
description=f"Imported template: {template_name}",
|
||
thumbnail_path="", # TODO: 生成缩略图
|
||
draft_content_path=str(draft_target),
|
||
resources_path=str(resources_dir),
|
||
created_at=datetime.now().isoformat(),
|
||
updated_at=datetime.now().isoformat(),
|
||
canvas_config=draft_content.get('canvas_config', {}),
|
||
duration=draft_content.get('duration', 0),
|
||
material_count=len(materials),
|
||
track_count=len(draft_content.get('tracks', [])),
|
||
tags=[],
|
||
is_cloud=False, # 用户导入的模板默认为本地
|
||
user_id=self.user_id
|
||
)
|
||
|
||
logger.info(f"Successfully processed template: {template_name} -> {template_id}")
|
||
return template_info
|
||
|
||
except Exception as e:
|
||
logger.error(f"Failed to process template {template_path}: {e}")
|
||
return None
|
||
|
||
def _process_materials(self, draft_content: Dict, template_path: Path, resources_dir: Path) -> List[MaterialInfo]:
|
||
"""处理模板中的素材文件"""
|
||
materials = []
|
||
|
||
try:
|
||
# 从draft content中提取素材信息
|
||
tracks = draft_content.get('tracks', [])
|
||
|
||
for track in tracks:
|
||
segments = track.get('segments', [])
|
||
for segment in segments:
|
||
material_path = segment.get('material_path', '')
|
||
if material_path:
|
||
# 构建完整路径
|
||
source_file = template_path / material_path
|
||
if source_file.exists():
|
||
# 生成新的文件名
|
||
material_id = str(uuid.uuid4())
|
||
file_extension = source_file.suffix
|
||
new_filename = f"{material_id}{file_extension}"
|
||
target_file = resources_dir / new_filename
|
||
|
||
# 复制文件
|
||
shutil.copy2(source_file, target_file)
|
||
|
||
# 创建素材信息
|
||
material_info = MaterialInfo(
|
||
id=material_id,
|
||
material_id=material_id,
|
||
name=source_file.name,
|
||
type=self._determine_material_type(str(source_file)),
|
||
original_path=str(source_file),
|
||
relative_path=new_filename,
|
||
duration=segment.get('duration'),
|
||
size=source_file.stat().st_size if source_file.exists() else None,
|
||
is_cloud=False,
|
||
user_id=self.user_id
|
||
)
|
||
|
||
materials.append(material_info)
|
||
|
||
# 更新segment中的路径
|
||
segment['material_path'] = new_filename
|
||
segment['material_id'] = material_id
|
||
|
||
except Exception as e:
|
||
logger.error(f"Failed to process materials: {e}")
|
||
|
||
return materials
|
||
|
||
def _update_draft_paths(self, draft_content: Dict, materials: List[MaterialInfo]) -> Dict:
|
||
"""更新draft content中的路径为相对路径"""
|
||
# 创建路径映射
|
||
path_mapping = {}
|
||
for material in materials:
|
||
path_mapping[material.original_path] = material.relative_path
|
||
|
||
# 这里可以根据需要更新draft_content中的路径
|
||
# 当前实现已在_process_materials中更新了路径
|
||
|
||
return draft_content
|
||
|
||
def get_templates(self, include_cloud: bool = True) -> List[TemplateInfo]:
|
||
"""
|
||
获取模板列表
|
||
|
||
Args:
|
||
include_cloud: 是否包含云端公共模板
|
||
|
||
Returns:
|
||
模板列表
|
||
"""
|
||
try:
|
||
templates = self.template_table.get_templates_by_user(
|
||
self.user_id,
|
||
include_cloud=include_cloud
|
||
)
|
||
logger.info(f"Retrieved {len(templates)} templates for user {self.user_id}")
|
||
return templates
|
||
|
||
except Exception as e:
|
||
logger.error(f"Failed to get templates: {e}")
|
||
return []
|
||
|
||
def get_template(self, template_id: str) -> Optional[TemplateInfo]:
|
||
"""
|
||
获取特定模板
|
||
|
||
Args:
|
||
template_id: 模板ID
|
||
|
||
Returns:
|
||
模板信息,如果不存在返回None
|
||
"""
|
||
try:
|
||
template = self.template_table.get_template_by_id(template_id)
|
||
if template:
|
||
logger.info(f"Retrieved template: {template.name}")
|
||
else:
|
||
logger.warning(f"Template not found: {template_id}")
|
||
return template
|
||
|
||
except Exception as e:
|
||
logger.error(f"Failed to get template {template_id}: {e}")
|
||
return None
|
||
|
||
def delete_template(self, template_id: str) -> bool:
|
||
"""
|
||
删除模板
|
||
|
||
Args:
|
||
template_id: 模板ID
|
||
|
||
Returns:
|
||
删除成功返回True
|
||
"""
|
||
try:
|
||
# 获取模板信息以进行权限检查
|
||
template = self.template_table.get_template_by_id(template_id)
|
||
if not template:
|
||
logger.warning(f"Template not found for deletion: {template_id}")
|
||
return False
|
||
|
||
# 检查权限(只能删除自己的模板)
|
||
if template.user_id != self.user_id:
|
||
logger.warning(f"Permission denied: user {self.user_id} cannot delete template {template_id}")
|
||
return False
|
||
|
||
# 删除本地文件
|
||
template_dir = self.templates_dir / template_id
|
||
if template_dir.exists():
|
||
shutil.rmtree(template_dir)
|
||
logger.info(f"Deleted local template directory: {template_dir}")
|
||
|
||
# 从云端存储删除
|
||
result = self.template_table.delete_template(template_id)
|
||
if result:
|
||
logger.info(f"Successfully deleted template: {template_id}")
|
||
|
||
return result
|
||
|
||
except Exception as e:
|
||
logger.error(f"Failed to delete template {template_id}: {e}")
|
||
return False
|
||
|
||
def search_templates(self, query: str, filters: Optional[Dict[str, Any]] = None) -> List[TemplateInfo]:
|
||
"""
|
||
搜索模板
|
||
|
||
Args:
|
||
query: 搜索关键词
|
||
filters: 搜索过滤器(暂未实现)
|
||
|
||
Returns:
|
||
匹配的模板列表
|
||
"""
|
||
try:
|
||
templates = self.template_table.search_templates(
|
||
query,
|
||
user_id=self.user_id,
|
||
include_cloud=True
|
||
)
|
||
logger.info(f"Search '{query}' found {len(templates)} templates")
|
||
return templates
|
||
|
||
except Exception as e:
|
||
logger.error(f"Failed to search templates with query '{query}': {e}")
|
||
return []
|
||
|
||
def get_templates_by_tag(self, tag: str) -> List[TemplateInfo]:
|
||
"""
|
||
根据标签获取模板
|
||
|
||
Args:
|
||
tag: 标签名称
|
||
|
||
Returns:
|
||
匹配的模板列表
|
||
"""
|
||
try:
|
||
templates = self.template_table.get_templates_by_tag(
|
||
tag,
|
||
user_id=self.user_id,
|
||
include_cloud=True
|
||
)
|
||
logger.info(f"Tag '{tag}' found {len(templates)} templates")
|
||
return templates
|
||
|
||
except Exception as e:
|
||
logger.error(f"Failed to get templates by tag '{tag}': {e}")
|
||
return []
|
||
|
||
def update_template(self, template_id: str, updates: Dict[str, Any]) -> bool:
|
||
"""
|
||
更新模板信息
|
||
|
||
Args:
|
||
template_id: 模板ID
|
||
updates: 要更新的字段
|
||
|
||
Returns:
|
||
更新成功返回True
|
||
"""
|
||
try:
|
||
# 检查权限
|
||
template = self.template_table.get_template_by_id(template_id)
|
||
if not template or template.user_id != self.user_id:
|
||
logger.warning(f"Permission denied or template not found: {template_id}")
|
||
return False
|
||
|
||
result = self.template_table.update_template(template_id, updates)
|
||
if result:
|
||
logger.info(f"Successfully updated template: {template_id}")
|
||
|
||
return result
|
||
|
||
except Exception as e:
|
||
logger.error(f"Failed to update template {template_id}: {e}")
|
||
return False
|
||
|
||
def get_template_count(self, include_cloud: bool = True) -> int:
|
||
"""
|
||
获取模板数量
|
||
|
||
Args:
|
||
include_cloud: 是否包含云端公共模板
|
||
|
||
Returns:
|
||
模板数量
|
||
"""
|
||
try:
|
||
count = self.template_table.get_template_count(
|
||
user_id=self.user_id,
|
||
include_cloud=include_cloud
|
||
)
|
||
logger.info(f"Template count for user {self.user_id}: {count}")
|
||
return count
|
||
|
||
except Exception as e:
|
||
logger.error(f"Failed to get template count: {e}")
|
||
return 0
|
||
|
||
def get_popular_tags(self, limit: int = 20) -> List[Dict[str, Any]]:
|
||
"""
|
||
获取热门标签
|
||
|
||
Args:
|
||
limit: 最大返回数量
|
||
|
||
Returns:
|
||
标签列表,包含标签名称和使用次数
|
||
"""
|
||
try:
|
||
tags = self.template_table.get_popular_tags(
|
||
user_id=self.user_id,
|
||
limit=limit
|
||
)
|
||
logger.info(f"Retrieved {len(tags)} popular tags")
|
||
return tags
|
||
|
||
except Exception as e:
|
||
logger.error(f"Failed to get popular tags: {e}")
|
||
return []
|
||
|
||
def create_template_from_draft(self, name: str, description: str, draft_content: Dict[str, Any],
|
||
tags: List[str] = None, is_cloud: bool = False) -> Optional[str]:
|
||
"""
|
||
从draft内容创建模板
|
||
|
||
Args:
|
||
name: 模板名称
|
||
description: 模板描述
|
||
draft_content: draft内容
|
||
tags: 标签列表
|
||
is_cloud: 是否为云端公共模板
|
||
|
||
Returns:
|
||
模板ID,如果创建失败返回None
|
||
"""
|
||
try:
|
||
template_id = str(uuid.uuid4())
|
||
|
||
# 创建模板目录
|
||
template_dir = self.templates_dir / template_id
|
||
template_dir.mkdir(parents=True, exist_ok=True)
|
||
|
||
# 保存draft content
|
||
draft_target = template_dir / "draft_content.json"
|
||
with open(draft_target, 'w', encoding='utf-8') as f:
|
||
json.dump(draft_content, f, ensure_ascii=False, indent=2)
|
||
|
||
# 创建模板信息
|
||
template_info = TemplateInfo(
|
||
id=template_id,
|
||
name=name,
|
||
description=description,
|
||
thumbnail_path="",
|
||
draft_content_path=str(draft_target),
|
||
resources_path=str(template_dir / "resources"),
|
||
created_at=datetime.now().isoformat(),
|
||
updated_at=datetime.now().isoformat(),
|
||
canvas_config=draft_content.get('canvas_config', {}),
|
||
duration=draft_content.get('duration', 0),
|
||
material_count=len(draft_content.get('materials', [])),
|
||
track_count=len(draft_content.get('tracks', [])),
|
||
tags=tags or [],
|
||
is_cloud=is_cloud,
|
||
user_id=self.user_id
|
||
)
|
||
|
||
# 保存到云端存储
|
||
created_id = self.template_table.create_template(template_info)
|
||
logger.info(f"Successfully created template from draft: {name} -> {created_id}")
|
||
|
||
return created_id
|
||
|
||
except Exception as e:
|
||
logger.error(f"Failed to create template from draft: {e}")
|
||
return None
|
||
|
||
|
||
# 创建全局实例工厂函数
|
||
def create_template_manager_cloud(user_id: str = "default") -> TemplateManagerCloud:
|
||
"""创建TemplateManagerCloud实例"""
|
||
return TemplateManagerCloud(user_id=user_id)
|