# Source file: mxivideo/python_core/services/template_manager.py
# Original listing: 519 lines, 21 KiB, Python

"""
Template Management Service
Handles template import, processing, and management
"""
import os
import json
import shutil
import uuid
from pathlib import Path
from typing import Dict, List, Any, Optional
from dataclasses import dataclass, asdict
from datetime import datetime
from ..utils.logger import setup_logger
from ..config import settings
from python_core.database.types import TemplateInfo, MaterialInfo
logger = setup_logger(__name__)
class TemplateManager:
    """Template management service.

    Imports template folders (each containing a ``draft_content.json``) into
    ``<settings.temp_dir>/templates/<template id>/``, copies the material files
    they reference into a ``resources`` sub-folder, and keeps an index of all
    imported templates in ``templates.json``.
    """

    # Sections of ``draft_content['materials']`` whose files are copied on import.
    _MATERIAL_SECTIONS = ('videos', 'audios', 'images', 'stickers', 'texts')

    # Segment type used when nothing more specific can be determined.
    _DEFAULT_SEGMENT_TYPE = 'video'

    # (segment type, lower-case file extensions) pairs, checked in this order.
    # Built once at class creation instead of on every detection call.
    _SEGMENT_TYPE_EXTENSIONS = (
        ('video', frozenset({
            'mp4', 'avi', 'mov', 'mkv', 'wmv', 'flv', 'webm', 'm4v', '3gp',
            'ts', 'mts', 'mpg', 'mpeg', 'rm', 'rmvb', 'asf', 'divx',
        })),
        ('audio', frozenset({
            'mp3', 'wav', 'aac', 'flac', 'ogg', 'wma', 'm4a', 'opus',
            'ac3', 'dts', 'ape', 'aiff', 'au', 'ra',
        })),
        ('image', frozenset({
            'jpg', 'jpeg', 'png', 'gif', 'bmp', 'webp', 'svg', 'tiff',
            'tga', 'ico', 'psd', 'raw', 'cr2', 'nef',
        })),
        # Text / subtitle formats.
        ('text', frozenset({
            'txt', 'srt', 'ass', 'vtt', 'sub', 'ssa', 'lrc', 'sbv',
        })),
        ('effect', frozenset({
            'fx', 'effect', 'json', 'xml', 'preset',
        })),
    )

    def __init__(self):
        """Create the templates directory if needed and load the metadata index."""
        # ``settings.temp_dir`` is used with ``/`` and ``.mkdir`` below, so it is
        # expected to be a ``pathlib.Path``.
        self.templates_dir = settings.temp_dir / "templates"
        self.templates_dir.mkdir(parents=True, exist_ok=True)
        # Template metadata file: maps template id -> serialized TemplateInfo.
        self.metadata_file = self.templates_dir / "templates.json"
        self.templates_metadata = self._load_metadata()

    # ------------------------------------------------------------------
    # Segment type detection
    # ------------------------------------------------------------------
    @staticmethod
    def _lookup_segment_type(file_path: str) -> Optional[str]:
        """Return the segment type for ``file_path``'s extension, or None if unknown."""
        if not file_path:
            return None
        # Everything after the last dot, compared case-insensitively.
        _, dot, extension = file_path.lower().rpartition('.')
        if not dot:
            return None
        for segment_type, extensions in TemplateManager._SEGMENT_TYPE_EXTENSIONS:
            if extension in extensions:
                return segment_type
        return None

    @staticmethod
    def detect_segment_type_from_path(file_path: str) -> str:
        """Detect the segment type from the extension of a file path.

        Args:
            file_path: Path or file name of the resource.

        Returns:
            One of 'video', 'audio', 'image', 'text' or 'effect'. An empty
            path, a path without an extension, or an unrecognized extension
            yields the default type 'video'.
        """
        detected = TemplateManager._lookup_segment_type(file_path)
        return detected or TemplateManager._DEFAULT_SEGMENT_TYPE

    # ------------------------------------------------------------------
    # Metadata persistence
    # ------------------------------------------------------------------
    def _load_metadata(self) -> Dict[str, Dict]:
        """Load templates metadata from ``self.metadata_file``.

        Returns:
            The stored mapping, or an empty dict when the file is missing,
            unreadable, not valid JSON, or does not contain a JSON object.
        """
        if not self.metadata_file.exists():
            return {}
        try:
            with open(self.metadata_file, 'r', encoding='utf-8') as f:
                metadata = json.load(f)
        except (OSError, ValueError) as e:
            # ValueError covers both JSON decoding and text decoding errors.
            logger.error(f"Failed to load templates metadata: {e}")
            return {}
        if not isinstance(metadata, dict):
            # The rest of the class calls dict methods on the metadata, so any
            # other JSON value is treated as a corrupt index.
            logger.error(
                "Failed to load templates metadata: "
                f"expected a JSON object, got {type(metadata).__name__}"
            )
            return {}
        return metadata

    def _save_metadata(self):
        """Save templates metadata to ``self.metadata_file``.

        The data is written to a sibling temporary file which then replaces the
        real file, so a failed write cannot leave a truncated index behind.
        Errors are logged, not raised.
        """
        metadata_file = Path(self.metadata_file)
        tmp_file = metadata_file.with_name(metadata_file.name + ".tmp")
        try:
            with open(tmp_file, 'w', encoding='utf-8') as f:
                json.dump(self.templates_metadata, f, ensure_ascii=False, indent=2)
            os.replace(tmp_file, metadata_file)
        except Exception as e:
            logger.error(f"Failed to save templates metadata: {e}")
            # Best-effort cleanup of the partial temporary file.
            try:
                tmp_file.unlink()
            except OSError:
                pass

    # ------------------------------------------------------------------
    # Import
    # ------------------------------------------------------------------
    def batch_import_templates(self, source_folder: str, progress_callback=None) -> Dict[str, Any]:
        """
        Batch import templates from a folder

        Args:
            source_folder: Source folder containing template subfolders
            progress_callback: Optional callable receiving one progress
                message string per step

        Returns:
            Dict with keys 'status', 'msg', 'imported_count', 'failed_count',
            'imported_templates' (TemplateInfo objects) and 'failed_templates'
            (dicts with 'name' and 'error', where 'error' states the actual
            reason the template was not imported).
        """
        result = {
            'status': True,
            'msg': '',
            'imported_count': 0,
            'failed_count': 0,
            'imported_templates': [],
            'failed_templates': []
        }

        def report(message):
            if progress_callback:
                progress_callback(message)

        try:
            if not os.path.exists(source_folder):
                result['status'] = False
                result['msg'] = f"Source folder does not exist: {source_folder}"
                return result

            # Every sub-directory is a potential template; sorted so the
            # processing order does not depend on the file system.
            subdirs = sorted(
                d for d in os.listdir(source_folder)
                if os.path.isdir(os.path.join(source_folder, d))
            )
            if not subdirs:
                result['status'] = False
                result['msg'] = "No subdirectories found in source folder"
                return result

            total_templates = len(subdirs)
            logger.info(f"Found {total_templates} potential templates in {source_folder}")
            report(f"Found {total_templates} potential templates")

            try:
                for i, template_dir in enumerate(subdirs, start=1):
                    template_path = os.path.join(source_folder, template_dir)
                    report(f"Processing template {i}/{total_templates}: {template_dir}")
                    try:
                        template_info, reason = self._import_template_with_reason(
                            template_path, template_dir
                        )
                        if template_info is not None:
                            result['imported_count'] += 1
                            result['imported_templates'].append(template_info)
                            logger.info(f"Successfully imported template: {template_dir}")
                            report(f"✅ Successfully imported: {template_dir}")
                        else:
                            result['failed_count'] += 1
                            result['failed_templates'].append({
                                'name': template_dir,
                                'error': reason
                            })
                            logger.warning(f"Failed to import template: {template_dir}")
                            report(f"❌ Failed to import: {template_dir} ({reason})")
                    except Exception as e:
                        result['failed_count'] += 1
                        result['failed_templates'].append({
                            'name': template_dir,
                            'error': str(e)
                        })
                        logger.error(f"Error importing template {template_dir}: {e}")
                        report(f"❌ Error importing: {template_dir} ({str(e)})")
            finally:
                # Persist whatever was imported even if the loop was aborted,
                # otherwise copied templates would be missing from the index.
                self._save_metadata()

            result['msg'] = f"Import completed. Success: {result['imported_count']}, Failed: {result['failed_count']}"
            logger.info(result['msg'])
            report(f"🎉 Import completed! Success: {result['imported_count']}, Failed: {result['failed_count']}")
        except Exception as e:
            result['status'] = False
            result['msg'] = f"Batch import failed: {str(e)}"
            logger.error(result['msg'])
        return result

    @staticmethod
    def _is_safe_template_id(template_id) -> bool:
        """Return True if ``template_id`` is usable as a single directory name.

        The id comes from an imported JSON file and is joined onto
        ``templates_dir``; separators, drive prefixes, '.' and '..' would let
        it point outside that directory.
        """
        if not isinstance(template_id, str) or not template_id:
            return False
        if template_id in ('.', '..'):
            return False
        if any(ch in template_id for ch in ('/', '\\', '\0')):
            return False
        # Also rejects platform-specific forms such as a Windows drive prefix.
        return os.path.basename(template_id) == template_id

    def _import_single_template(self, template_path: str, template_name: str) -> Optional[TemplateInfo]:
        """
        Import a single template

        Args:
            template_path: Path to template folder
            template_name: Template name

        Returns:
            TemplateInfo if successful, None otherwise
        """
        template_info, _reason = self._import_template_with_reason(template_path, template_name)
        return template_info

    def _import_template_with_reason(self, template_path: str, template_name: str) -> tuple:
        """Import one template and report why it failed, if it did.

        Args:
            template_path: Path to template folder
            template_name: Template name

        Returns:
            ``(template_info, reason)``: on success the TemplateInfo and an
            empty string; on failure ``None`` and a human-readable reason.
            Never raises for import problems; they are logged instead.
        """
        template_dir = None
        created_dir = False
        try:
            # Check for draft_content.json
            draft_file = os.path.join(template_path, "draft_content.json")
            if not os.path.exists(draft_file):
                logger.warning(f"No draft_content.json found in {template_path}")
                return None, "No draft_content.json found"

            # Load and parse draft content
            with open(draft_file, 'r', encoding='utf-8') as f:
                draft_content = json.load(f)
            if not isinstance(draft_content, dict):
                logger.warning(f"draft_content.json is not a JSON object in {template_path}")
                return None, "draft_content.json is not a JSON object"

            # Use the original template ID from draft_content.json
            template_id = draft_content.get('id')
            if not template_id:
                logger.warning(f"No 'id' field found in draft_content.json for {template_path}")
                return None, "No 'id' field found in draft_content.json"
            if not self._is_safe_template_id(template_id):
                logger.warning(f"Unsafe template id {template_id!r} in draft_content.json for {template_path}")
                return None, f"Invalid template id: {template_id!r}"

            # Check if template already exists
            if template_id in self.templates_metadata:
                logger.info(f"Template {template_id} already exists, skipping import")
                return None, f"Template {template_id} already exists"

            # Create template directory; remember whether this call created it
            # so a failed import only removes what it made itself.
            template_dir = self.templates_dir / template_id
            created_dir = not template_dir.exists()
            template_dir.mkdir(parents=True, exist_ok=True)

            # Create resources directory
            resources_dir = template_dir / "resources"
            resources_dir.mkdir(exist_ok=True)

            # Process materials and copy resources
            materials = self._process_materials(draft_content, template_path, resources_dir)

            # Update draft content with relative paths (mutates draft_content)
            updated_draft = self._update_draft_paths(draft_content, materials)

            # Save updated draft content
            draft_target = template_dir / "draft_content.json"
            with open(draft_target, 'w', encoding='utf-8') as f:
                json.dump(updated_draft, f, ensure_ascii=False, indent=2)

            # One timestamp so created_at and updated_at are identical.
            now = datetime.now().isoformat()
            template_info = TemplateInfo(
                id=template_id,
                name=template_name,
                description=f"Imported template: {template_name}",
                thumbnail_path="",  # TODO: Generate thumbnail
                draft_content_path=str(draft_target),
                resources_path=str(resources_dir),
                created_at=now,
                updated_at=now,
                canvas_config=draft_content.get('canvas_config', {}),
                duration=draft_content.get('duration', 0),
                material_count=len(materials),
                track_count=len(draft_content.get('tracks') or []),
                tags=[],
                draft_content=draft_content  # keep the (path-updated) draft on the info object
            )

            # Save to metadata
            self.templates_metadata[template_id] = asdict(template_info)
            logger.info(f"Successfully processed template: {template_name} -> {template_id}")
            return template_info, ''
        except Exception as e:
            logger.error(f"Failed to import template {template_name}: {e}")
            if created_dir and template_dir is not None:
                # Do not leave a half-populated template directory behind.
                shutil.rmtree(template_dir, ignore_errors=True)
            return None, str(e)

    def _process_materials(self, draft_content: Dict, source_path: str, resources_dir: Path) -> List[MaterialInfo]:
        """
        Process and copy materials from draft content

        Args:
            draft_content: Draft content JSON
            source_path: Source template path
            resources_dir: Target resources directory

        Returns:
            List of processed materials (materials that could not be
            processed are skipped)
        """
        materials_section = draft_content.get('materials') or {}
        if not isinstance(materials_section, dict):
            return []

        materials = []
        for material_type in self._MATERIAL_SECTIONS:
            # A missing or null section is treated as empty.
            for material in materials_section.get(material_type) or []:
                material_info = self._process_single_material(
                    material, material_type, source_path, resources_dir
                )
                if material_info:
                    materials.append(material_info)
        return materials

    @staticmethod
    def _resolve_material_path(original_path: str, source_path: str) -> Optional[str]:
        """Locate a material file on disk.

        Tries the path recorded in the draft first. If that file does not
        exist, looks inside the template's source folder: the recorded path
        relative to it, then just the file name.

        Returns:
            The first candidate that is an existing file, or None.
        """
        if not original_path or not isinstance(original_path, str):
            return None
        candidates = [original_path]
        if source_path:
            if not os.path.isabs(original_path):
                candidates.append(os.path.join(source_path, original_path))
            # Normalize backslashes so a Windows-style path still yields its
            # file name on other platforms.
            file_name = os.path.basename(original_path.replace('\\', '/'))
            if file_name:
                candidates.append(os.path.join(source_path, file_name))
        for candidate in candidates:
            if os.path.isfile(candidate):
                return candidate
        return None

    @staticmethod
    def _is_same_copy(source_file: str, target_file: Path) -> bool:
        """Return True if ``target_file`` looks like a copy of ``source_file``.

        Compares size and whole-second modification time, which
        ``shutil.copy2`` attempts to preserve when copying.
        """
        try:
            source_stat = os.stat(source_file)
            target_stat = os.stat(target_file)
        except OSError:
            return False
        return (source_stat.st_size == target_stat.st_size
                and int(source_stat.st_mtime) == int(target_stat.st_mtime))

    def _process_single_material(self, material: Dict, material_type: str,
                                 source_path: str, resources_dir: Path) -> Optional[MaterialInfo]:
        """Copy one material file into ``resources_dir`` and describe it.

        Args:
            material: Material entry from the draft's ``materials`` section
            material_type: Name of the section the entry came from
            source_path: Source template path, used as a fallback location
                when the recorded path does not exist
            resources_dir: Target resources directory

        Returns:
            MaterialInfo for the copied file, or None when the entry has no
            id, its file cannot be found, or copying fails (failures are
            logged).
        """
        try:
            material_id = material.get('id')
            if not material_id:
                return None

            # Get original path and find the actual file
            original_path = material.get('path', '')
            resolved_path = self._resolve_material_path(original_path, source_path)
            if resolved_path is None:
                logger.warning(f"Material path not found: {original_path}")
                return None

            filename = os.path.basename(resolved_path)
            target_path = resources_dir / filename
            if target_path.exists() and not self._is_same_copy(resolved_path, target_path):
                # A different file with the same name was already copied;
                # pick a unique name instead of overwriting it.
                stem, suffix = os.path.splitext(filename)
                filename = f"{stem}_{uuid.uuid4().hex[:8]}{suffix}"
                target_path = resources_dir / filename
            relative_path = f"resources/{filename}"

            # Copy file
            shutil.copy2(resolved_path, target_path)

            # Get file info
            file_size = os.path.getsize(target_path)
            material_info = MaterialInfo(
                id=str(uuid.uuid4()),
                material_id=material_id,
                name=material.get('name', filename),
                type=material_type,
                original_path=original_path,
                relative_path=relative_path,
                size=file_size
            )
            logger.debug(f"Processed material: {filename} -> {relative_path}")
            return material_info
        except Exception as e:
            logger.error(f"Failed to process material: {e}")
            return None

    def _update_draft_paths(self, draft_content: Dict, materials: List[MaterialInfo]) -> Dict:
        """Update draft content with relative paths.

        Rewrites ``path`` of every material entry whose ``id`` matches a
        processed material. ``draft_content`` is modified in place and also
        returned.

        Args:
            draft_content: Draft content JSON
            materials: Processed materials (``material_id`` / ``relative_path``
                are read)

        Returns:
            The same ``draft_content`` object
        """
        # Create material ID to relative path mapping
        path_mapping = {mat.material_id: mat.relative_path for mat in materials}

        materials_section = draft_content.get('materials') or {}
        if not isinstance(materials_section, dict):
            return draft_content

        for entries in materials_section.values():
            if not isinstance(entries, list):
                continue
            for material in entries:
                if not isinstance(material, dict):
                    continue
                material_id = material.get('id')
                if material_id in path_mapping:
                    material['path'] = path_mapping[material_id]
        return draft_content

    # ------------------------------------------------------------------
    # Queries
    # ------------------------------------------------------------------
    def get_templates(self) -> List[TemplateInfo]:
        """Get all templates as TemplateInfo objects built from the metadata."""
        return [
            TemplateInfo(**template_data)
            for template_data in self.templates_metadata.values()
        ]

    def get_template(self, template_id: str) -> Optional[TemplateInfo]:
        """Get a specific template, or None if the id is not in the metadata."""
        if template_id not in self.templates_metadata:
            return None
        return TemplateInfo(**self.templates_metadata[template_id])

    def delete_template(self, template_id: str) -> bool:
        """Delete a template's directory and metadata entry.

        Returns:
            True if the template existed and was removed; False if it is
            unknown or removal failed (failures are logged).
        """
        try:
            if template_id not in self.templates_metadata:
                return False

            if self._is_safe_template_id(template_id):
                # Remove template directory
                template_dir = self.templates_dir / template_id
                if template_dir.exists():
                    shutil.rmtree(template_dir)
            else:
                # Never rmtree a path that could escape templates_dir.
                logger.warning(f"Refusing to remove directory for unsafe template id: {template_id!r}")

            # Remove from metadata
            del self.templates_metadata[template_id]
            self._save_metadata()
            logger.info(f"Deleted template: {template_id}")
            return True
        except Exception as e:
            logger.error(f"Failed to delete template {template_id}: {e}")
        return False

    @staticmethod
    def _build_materials_lookup(draft_content: Dict) -> Dict[str, Dict]:
        """Index the draft's video and audio materials by their id.

        Returns:
            Mapping of material id to a dict with 'type', 'path', 'name' and
            'duration'. Entries without an id are skipped.
        """
        materials_section = draft_content.get('materials') or {}
        lookup = {}
        # (section, segment type, key holding the display name). Videos read
        # their name from 'material_name', audios from 'name'.
        sections = (
            ('videos', 'video', 'material_name'),
            ('audios', 'audio', 'name'),
        )
        for section, material_type, name_key in sections:
            for entry in materials_section.get(section) or []:
                entry_id = entry.get('id', '')
                if entry_id:
                    lookup[entry_id] = {
                        'type': material_type,
                        'path': entry.get('path', ''),
                        'name': entry.get(name_key, ''),
                        'duration': entry.get('duration', 0)
                    }
        return lookup

    @staticmethod
    def _build_segment(segment_data: Dict, materials_lookup: Dict[str, Dict]) -> Dict[str, Any]:
        """Convert one raw segment of the draft into the detail representation."""
        # target_timerange values are in microseconds; convert to seconds.
        target_timerange = segment_data.get('target_timerange') or {}
        start_time = (target_timerange.get('start') or 0) / 1000000.0
        duration = (target_timerange.get('duration') or 0) / 1000000.0
        end_time = start_time + duration

        # Resolve the referenced material to get the resource path and name.
        material_id = segment_data.get('material_id', '')
        resource_path = ''
        material_name = ''
        explicit_type = segment_data.get('type')
        segment_type = explicit_type or TemplateManager._DEFAULT_SEGMENT_TYPE
        if material_id and material_id in materials_lookup:
            material_info = materials_lookup[material_id]
            resource_path = material_info['path']
            material_name = material_info['name']
            if not explicit_type:
                # Prefer the type implied by the file extension; when the
                # extension is unknown keep the material's own type rather
                # than falling back to the generic default.
                segment_type = (
                    TemplateManager._lookup_segment_type(resource_path)
                    or material_info['type']
                )

        return {
            'id': segment_data.get('id', ''),
            'type': segment_type,
            'name': segment_data.get('name', '随机'),
            'start_time': start_time,
            'end_time': end_time,
            'duration': duration,
            'resource_path': resource_path,
            'material_name': material_name,
            'properties': segment_data.get('properties', {}),
            'effects': segment_data.get('effects', [])
        }

    def get_template_detail(self, template_id: str) -> Optional[Dict[str, Any]]:
        """
        Get detailed template information including tracks and segments

        Args:
            template_id: Template ID

        Returns:
            Dict with 'id', 'name', 'description', 'canvas_config', 'tracks',
            'duration', 'fps' and 'sample_rate'; each track carries its
            segments with times in seconds. None if the template is unknown,
            its draft file is missing, or reading it fails (logged).
        """
        try:
            # Get basic template info
            template = self.get_template(template_id)
            if not template:
                return None

            # Load draft content to get tracks and segments
            draft_content_path = Path(template.draft_content_path)
            if not draft_content_path.exists():
                logger.error(f"Draft content file not found: {draft_content_path}")
                return None
            with open(draft_content_path, 'r', encoding='utf-8') as f:
                draft_content = json.load(f)

            materials_lookup = self._build_materials_lookup(draft_content)

            # Extract tracks and segments information; 'index' is 1-based.
            tracks = []
            for index, track_data in enumerate(draft_content.get('tracks') or [], start=1):
                tracks.append({
                    'id': track_data.get('id', ''),
                    'name': track_data.get('name', ''),
                    'type': track_data.get('type', ''),
                    'index': index,
                    'segments': [
                        self._build_segment(segment_data, materials_lookup)
                        for segment_data in track_data.get('segments') or []
                    ],
                    'properties': track_data.get('properties', {})
                })

            # Build detailed template information
            return {
                'id': template.id,
                'name': template.name,
                'description': template.description,
                'canvas_config': template.canvas_config,
                'tracks': tracks,
                'duration': template.duration,
                'fps': draft_content.get('fps', 30),
                'sample_rate': draft_content.get('sample_rate')
            }
        except Exception as e:
            logger.error(f"Failed to get template detail for {template_id}: {e}")
            return None