"""
Template Management Service
Handles template import, processing, and management
"""

import os
import re
import json
import shutil
import uuid
import filecmp
from pathlib import Path
from typing import Dict, List, Any, Optional, Tuple
from dataclasses import dataclass, asdict, fields
from datetime import datetime

from ..utils.logger import setup_logger
from ..utils.jsonrpc import create_response_handler, create_progress_reporter, JSONRPCError
from ..config import settings

logger = setup_logger(__name__)

# Matches the progress messages emitted by batch_import_templates, e.g.
# "Processing template 1/3: Template_Name". Group 3 captures everything after
# the first colon so template names that contain ':' are not truncated.
_PROGRESS_MESSAGE_RE = re.compile(
    r"Processing template\s*(\d+)\s*/\s*(\d+)\s*:(.*)", re.DOTALL
)


class _TemplateImportError(Exception):
    """Raised internally when a template folder is rejected during import."""


@dataclass
class TemplateInfo:
    """Template information structure"""
    id: str
    name: str
    description: str
    thumbnail_path: str
    draft_content_path: str
    resources_path: str
    created_at: str
    updated_at: str
    canvas_config: Dict[str, Any]
    duration: int
    material_count: int
    track_count: int
    tags: List[str]


@dataclass
class MaterialInfo:
    """Material information structure"""
    id: str
    material_id: str
    name: str
    type: str  # video, audio, image, text, sticker, etc.
    original_path: str
    relative_path: str
    duration: Optional[int] = None
    size: Optional[int] = None


class TemplateManager:
    """Template management service.

    Templates are stored under ``settings.temp_dir / "templates"``, one
    sub-directory per template id, and indexed by a ``templates.json``
    metadata file in that same directory.
    """

    # Ordered (segment type, extensions) pairs consulted by
    # detect_segment_type_from_path. The sets do not overlap.
    _SEGMENT_TYPE_EXTENSIONS = (
        ('video', frozenset({
            'mp4', 'avi', 'mov', 'mkv', 'wmv', 'flv', 'webm', 'm4v', '3gp',
            'ts', 'mts', 'mpg', 'mpeg', 'rm', 'rmvb', 'asf', 'divx',
        })),
        ('audio', frozenset({
            'mp3', 'wav', 'aac', 'flac', 'ogg', 'wma', 'm4a', 'opus', 'ac3',
            'dts', 'ape', 'aiff', 'au', 'ra',
        })),
        ('image', frozenset({
            'jpg', 'jpeg', 'png', 'gif', 'bmp', 'webp', 'svg', 'tiff', 'tga',
            'ico', 'psd', 'raw', 'cr2', 'nef',
        })),
        # Text / subtitle formats
        ('text', frozenset({
            'txt', 'srt', 'ass', 'vtt', 'sub', 'ssa', 'lrc', 'sbv',
        })),
        ('effect', frozenset({
            'fx', 'effect', 'json', 'xml', 'preset',
        })),
    )

    def __init__(self):
        self.templates_dir = settings.temp_dir / "templates"
        self.templates_dir.mkdir(parents=True, exist_ok=True)

        # Template metadata file
        self.metadata_file = self.templates_dir / "templates.json"
        self.templates_metadata = self._load_metadata()

    @staticmethod
    def detect_segment_type_from_path(file_path: str) -> str:
        """Detect the segment type from the extension of a file path.

        Args:
            file_path: Path (or file name) of the resource.

        Returns:
            One of 'video', 'audio', 'image', 'text' or 'effect'. An empty
            path, a path without a dot, or an unknown extension yields the
            default 'video'.
        """
        if not file_path:
            return 'video'  # default type

        # Everything after the last dot, lower-cased; '' when there is no dot.
        _, dot, suffix = file_path.lower().rpartition('.')
        file_extension = suffix if dot else ''

        for segment_type, extensions in TemplateManager._SEGMENT_TYPE_EXTENSIONS:
            if file_extension in extensions:
                return segment_type
        return 'video'  # default to video

    @staticmethod
    def _is_safe_template_id(template_id: Any) -> bool:
        """Return True if template_id can safely be used as a directory name."""
        if not isinstance(template_id, str) or template_id in ('', '.', '..'):
            return False
        # Reject anything that could escape the templates directory.
        return not any(ch in template_id for ch in ('/', '\\', '\0'))

    @staticmethod
    def _to_template_info(template_id: str, template_data: Any) -> Optional[TemplateInfo]:
        """Build a TemplateInfo from a metadata entry, or None if it is malformed.

        Keys that are not TemplateInfo fields are ignored so that metadata
        carrying extra keys still loads.
        """
        if not isinstance(template_data, dict):
            logger.warning(f"Skipping malformed metadata for template {template_id}")
            return None
        known_fields = {f.name for f in fields(TemplateInfo)}
        try:
            return TemplateInfo(
                **{key: value for key, value in template_data.items() if key in known_fields}
            )
        except TypeError as e:
            # Raised when a required field is missing from the entry.
            logger.warning(f"Skipping malformed metadata for template {template_id}: {e}")
            return None

    def _load_metadata(self) -> Dict[str, Dict]:
        """Load templates metadata.

        Returns:
            The parsed metadata mapping, or an empty dict when the file is
            missing, unreadable, not valid JSON, or not a JSON object.
        """
        if self.metadata_file.exists():
            try:
                with open(self.metadata_file, 'r', encoding='utf-8') as f:
                    data = json.load(f)
            except (OSError, ValueError) as e:
                # ValueError covers JSONDecodeError and UnicodeDecodeError.
                logger.error(f"Failed to load templates metadata: {e}")
            else:
                if isinstance(data, dict):
                    return data
                logger.error("Failed to load templates metadata: root is not a JSON object")
        return {}

    def _save_metadata(self):
        """Save templates metadata.

        The data is written to a temporary file next to the metadata file and
        then moved into place with os.replace, so an interrupted write cannot
        leave a truncated templates.json behind. Failures are logged, not
        raised.
        """
        tmp_file = self.metadata_file.with_name(self.metadata_file.name + ".tmp")
        try:
            with open(tmp_file, 'w', encoding='utf-8') as f:
                json.dump(self.templates_metadata, f, ensure_ascii=False, indent=2)
            os.replace(tmp_file, self.metadata_file)
        except Exception as e:
            logger.error(f"Failed to save templates metadata: {e}")
            try:
                if tmp_file.exists():
                    tmp_file.unlink()
            except OSError:
                pass  # best-effort cleanup of the temporary file

    def batch_import_templates(self, source_folder: str, progress_callback=None) -> Dict[str, Any]:
        """
        Batch import templates from a folder

        Args:
            source_folder: Source folder containing template subfolders
            progress_callback: Optional callable receiving one progress
                message string per step

        Returns:
            Import result with success/failure counts and details. The
            'imported_templates' list holds TemplateInfo objects; each entry
            of 'failed_templates' is a dict with 'name' and 'error'.
        """
        result = {
            'status': True,
            'msg': '',
            'imported_count': 0,
            'failed_count': 0,
            'imported_templates': [],
            'failed_templates': []
        }

        def notify(message: str) -> None:
            if progress_callback:
                progress_callback(message)

        try:
            if not os.path.exists(source_folder):
                result['status'] = False
                result['msg'] = f"Source folder does not exist: {source_folder}"
                return result

            # Find all subdirectories (sorted so the import order is deterministic)
            subdirs = sorted(
                d for d in os.listdir(source_folder)
                if os.path.isdir(os.path.join(source_folder, d))
            )

            if not subdirs:
                result['status'] = False
                result['msg'] = "No subdirectories found in source folder"
                return result

            total_templates = len(subdirs)
            logger.info(f"Found {total_templates} potential templates in {source_folder}")
            notify(f"Found {total_templates} potential templates")

            for i, template_dir in enumerate(subdirs):
                template_path = os.path.join(source_folder, template_dir)
                notify(f"Processing template {i+1}/{total_templates}: {template_dir}")

                try:
                    template_info = self._import_template_or_raise(template_path, template_dir)
                    result['imported_count'] += 1
                    result['imported_templates'].append(template_info)
                    logger.info(f"Successfully imported template: {template_dir}")
                    notify(f"✅ Successfully imported: {template_dir}")
                except _TemplateImportError as e:
                    # The folder was rejected for a known reason; report that
                    # reason instead of a generic "no draft_content.json".
                    result['failed_count'] += 1
                    result['failed_templates'].append({
                        'name': template_dir,
                        'error': str(e)
                    })
                    logger.warning(f"Failed to import template: {template_dir}")
                    notify(f"❌ Failed to import: {template_dir} ({e})")
                except Exception as e:
                    result['failed_count'] += 1
                    result['failed_templates'].append({
                        'name': template_dir,
                        'error': str(e)
                    })
                    logger.error(f"Error importing template {template_dir}: {e}")
                    notify(f"❌ Error importing: {template_dir} ({str(e)})")

            # Save metadata
            self._save_metadata()

            result['msg'] = f"Import completed. Success: {result['imported_count']}, Failed: {result['failed_count']}"
            logger.info(result['msg'])
            notify(f"🎉 Import completed! Success: {result['imported_count']}, Failed: {result['failed_count']}")

        except Exception as e:
            result['status'] = False
            result['msg'] = f"Batch import failed: {str(e)}"
            logger.error(result['msg'])

        return result

    def _import_single_template(self, template_path: str, template_name: str) -> Optional[TemplateInfo]:
        """
        Import a single template

        Args:
            template_path: Path to template folder
            template_name: Template name

        Returns:
            TemplateInfo if successful, None otherwise
        """
        try:
            return self._import_template_or_raise(template_path, template_name)
        except _TemplateImportError:
            # Reason already logged by the helper.
            return None
        except Exception as e:
            logger.error(f"Failed to import template {template_name}: {e}")
            return None

    def _import_template_or_raise(self, template_path: str, template_name: str) -> TemplateInfo:
        """Import one template folder, raising on any failure.

        Args:
            template_path: Path to template folder
            template_name: Template name

        Returns:
            The TemplateInfo that was registered in the in-memory metadata.

        Raises:
            _TemplateImportError: the folder has no draft_content.json, the
                draft has no usable 'id', or the template already exists.
            Exception: any I/O or JSON error raised while reading or copying.
        """
        # Check for draft_content.json
        draft_file = os.path.join(template_path, "draft_content.json")
        if not os.path.exists(draft_file):
            logger.warning(f"No draft_content.json found in {template_path}")
            raise _TemplateImportError("No draft_content.json found")

        # Load and parse draft content
        with open(draft_file, 'r', encoding='utf-8') as f:
            draft_content = json.load(f)
        if not isinstance(draft_content, dict):
            raise _TemplateImportError("draft_content.json is not a JSON object")

        # Use the original template ID from draft_content.json
        template_id = draft_content.get('id')
        if not template_id:
            logger.warning(f"No 'id' field found in draft_content.json for {template_path}")
            raise _TemplateImportError("No 'id' field found in draft_content.json")

        # The id comes from an imported file and becomes a directory name, so
        # it must not be able to point outside the templates directory.
        if not self._is_safe_template_id(template_id):
            logger.warning(f"Invalid template id {template_id!r} in {template_path}")
            raise _TemplateImportError(f"Invalid template id: {template_id!r}")

        # Check if template already exists
        if template_id in self.templates_metadata:
            logger.info(f"Template {template_id} already exists, skipping import")
            raise _TemplateImportError(f"Template {template_id} already exists")

        template_dir = self.templates_dir / template_id
        created_dir = not template_dir.exists()
        try:
            # Create template and resources directories
            template_dir.mkdir(parents=True, exist_ok=True)
            resources_dir = template_dir / "resources"
            resources_dir.mkdir(exist_ok=True)

            # Process materials and copy resources
            materials = self._process_materials(draft_content, template_path, resources_dir)

            # Update draft content with relative paths
            updated_draft = self._update_draft_paths(draft_content, materials)

            # Save updated draft content
            draft_target = template_dir / "draft_content.json"
            with open(draft_target, 'w', encoding='utf-8') as f:
                json.dump(updated_draft, f, ensure_ascii=False, indent=2)
        except Exception:
            # Do not leave a half-written template behind; only remove the
            # directory if this import created it.
            if created_dir:
                shutil.rmtree(template_dir, ignore_errors=True)
            raise

        # Single timestamp so created_at and updated_at match on import.
        timestamp = datetime.now().isoformat()
        template_info = TemplateInfo(
            id=template_id,
            name=template_name,
            description=f"Imported template: {template_name}",
            thumbnail_path="",  # TODO: Generate thumbnail
            draft_content_path=str(draft_target),
            resources_path=str(resources_dir),
            created_at=timestamp,
            updated_at=timestamp,
            canvas_config=draft_content.get('canvas_config', {}),
            duration=draft_content.get('duration', 0),
            material_count=len(materials),
            track_count=len(draft_content.get('tracks') or []),
            tags=[]
        )

        # Save to metadata
        self.templates_metadata[template_id] = asdict(template_info)

        logger.info(f"Successfully processed template: {template_name} -> {template_id}")
        return template_info

    def _process_materials(self, draft_content: Dict, source_path: str, resources_dir: Path) -> List[MaterialInfo]:
        """
        Process and copy materials from draft content

        Args:
            draft_content: Draft content JSON
            source_path: Source template path
            resources_dir: Target resources directory

        Returns:
            List of processed materials (materials that could not be
            processed are omitted)
        """
        materials = []
        materials_section = draft_content.get('materials') or {}
        if not isinstance(materials_section, dict):
            return materials

        # Process different material types
        material_types = ['videos', 'audios', 'images', 'stickers', 'texts']

        for material_type in material_types:
            entries = materials_section.get(material_type)
            if not isinstance(entries, list):
                continue
            for material in entries:
                material_info = self._process_single_material(
                    material, material_type, source_path, resources_dir
                )
                if material_info:
                    materials.append(material_info)

        return materials

    @staticmethod
    def _resolve_material_path(original_path: Any, source_path: str) -> Optional[str]:
        """Return an existing file path for a material, or None.

        The path is tried as given first; a relative path is then also tried
        relative to the template's source folder.
        """
        if not original_path or not isinstance(original_path, str):
            return None
        if os.path.isfile(original_path):
            return original_path
        if not os.path.isabs(original_path):
            candidate = os.path.join(source_path, original_path)
            if os.path.isfile(candidate):
                return candidate
        return None

    @staticmethod
    def _choose_target_path(resources_dir: Path, filename: str, source_file: str) -> Tuple[Path, bool]:
        """Pick where to store source_file inside resources_dir.

        Returns:
            (target path, needs_copy). When a file with identical content is
            already present it is reused (needs_copy is False); when a
            different file occupies the name, a numbered variant is chosen so
            nothing is overwritten.
        """
        stem, suffix = os.path.splitext(filename)
        candidate = resources_dir / filename
        counter = 0
        while candidate.exists():
            if filecmp.cmp(source_file, str(candidate), shallow=True):
                return candidate, False
            counter += 1
            candidate = resources_dir / f"{stem}_{counter}{suffix}"
        return candidate, True

    def _process_single_material(self, material: Dict, material_type: str,
                                 source_path: str, resources_dir: Path) -> Optional[MaterialInfo]:
        """Process a single material.

        Copies the material's file into resources_dir and describes it.

        Args:
            material: One entry of a materials list in the draft content
            material_type: Name of the materials list the entry came from
            source_path: Source template path, used to resolve relative paths
            resources_dir: Target resources directory

        Returns:
            MaterialInfo on success; None when the entry has no id, its file
            cannot be found, or copying fails (the failure is logged).
        """
        try:
            material_id = material.get('id')
            if not material_id:
                return None

            # Get original path
            original_path = material.get('path', '')
            source_file = self._resolve_material_path(original_path, source_path)
            if source_file is None:
                logger.warning(f"Material path not found: {original_path}")
                return None

            # Choose a target that never clobbers a different file that
            # happens to share the same base name.
            filename = os.path.basename(source_file)
            target_path, needs_copy = self._choose_target_path(resources_dir, filename, source_file)
            relative_path = f"resources/{target_path.name}"

            # Copy file
            if needs_copy:
                shutil.copy2(source_file, target_path)

            # Get file info
            file_size = os.path.getsize(target_path)

            material_info = MaterialInfo(
                id=str(uuid.uuid4()),
                material_id=material_id,
                name=material.get('name', filename),
                type=material_type,
                original_path=original_path,
                relative_path=relative_path,
                size=file_size
            )

            logger.debug(f"Processed material: {filename} -> {relative_path}")
            return material_info

        except Exception as e:
            logger.error(f"Failed to process material: {e}")
            return None

    def _update_draft_paths(self, draft_content: Dict, materials: List[MaterialInfo]) -> Dict:
        """Update draft content with relative paths.

        Rewrites the 'path' of every material whose id was processed.

        Args:
            draft_content: Draft content JSON; modified in place
            materials: Materials returned by _process_materials

        Returns:
            The same draft_content object that was passed in.
        """
        # Create material ID to relative path mapping
        path_mapping = {mat.material_id: mat.relative_path for mat in materials}

        # Update materials section
        materials_section = draft_content.get('materials') or {}
        if not isinstance(materials_section, dict):
            return draft_content

        for entries in materials_section.values():
            if not isinstance(entries, list):
                continue
            for material in entries:
                if not isinstance(material, dict):
                    continue
                material_id = material.get('id')
                if material_id in path_mapping:
                    material['path'] = path_mapping[material_id]

        return draft_content

    def get_templates(self) -> List[TemplateInfo]:
        """Get all templates.

        Returns:
            One TemplateInfo per metadata entry; malformed entries are
            skipped with a warning.
        """
        templates = []
        for template_id, template_data in self.templates_metadata.items():
            template = self._to_template_info(template_id, template_data)
            if template is not None:
                templates.append(template)
        return templates

    def get_template(self, template_id: str) -> Optional[TemplateInfo]:
        """Get a specific template.

        Returns:
            The TemplateInfo for template_id, or None when the id is unknown
            or its metadata entry is malformed.
        """
        if template_id in self.templates_metadata:
            return self._to_template_info(template_id, self.templates_metadata[template_id])
        return None

    def delete_template(self, template_id: str) -> bool:
        """Delete a template.

        Removes the template directory (if present) and its metadata entry,
        then saves the metadata.

        Returns:
            True when the template was deleted; False when the id is unknown
            or deletion raised (the error is logged).
        """
        try:
            if template_id in self.templates_metadata:
                # Remove template directory
                template_dir = self.templates_dir / template_id
                if template_dir.exists():
                    shutil.rmtree(template_dir)

                # Remove from metadata
                del self.templates_metadata[template_id]
                self._save_metadata()

                logger.info(f"Deleted template: {template_id}")
                return True
        except Exception as e:
            logger.error(f"Failed to delete template {template_id}: {e}")

        return False

    @staticmethod
    def _build_materials_lookup(draft_content: Dict) -> Dict[str, Dict[str, Any]]:
        """Index the draft's video and audio materials by material id."""
        materials_lookup = {}
        materials_section = draft_content.get('materials') or {}

        # (materials list, type label, key holding the display name)
        sections = (
            ('videos', 'video', 'material_name'),
            ('audios', 'audio', 'name'),
        )
        for section, material_type, name_key in sections:
            for item in materials_section.get(section) or []:
                item_id = item.get('id', '')
                if item_id:
                    materials_lookup[item_id] = {
                        'type': material_type,
                        'path': item.get('path', ''),
                        'name': item.get(name_key, ''),
                        'duration': item.get('duration', 0)
                    }
        return materials_lookup

    @staticmethod
    def _build_segment(segment_data: Dict, materials_lookup: Dict[str, Dict[str, Any]]) -> Dict[str, Any]:
        """Convert one raw segment of a track into the detail representation."""
        # Extract time information from target_timerange; the values are
        # divided by 1,000,000 to convert microseconds to seconds.
        target_timerange = segment_data.get('target_timerange') or {}
        start_time = (target_timerange.get('start', 0) or 0) / 1000000.0
        duration = (target_timerange.get('duration', 0) or 0) / 1000000.0
        end_time = start_time + duration

        # Match material_id against the indexed materials to find the resource.
        material_id = segment_data.get('material_id', '')
        resource_path = ''
        material_name = ''

        # An explicit, non-empty segment type always wins.
        explicit_type = segment_data.get('type')
        segment_type = explicit_type or 'video'

        if material_id and material_id in materials_lookup:
            material_info = materials_lookup[material_id]
            resource_path = material_info['path']
            material_name = material_info['name']
            if not explicit_type:
                segment_type = material_info['type']

        # Without an explicit type, the resource's file extension decides.
        if resource_path and not explicit_type:
            segment_type = TemplateManager.detect_segment_type_from_path(resource_path)

        return {
            'id': segment_data.get('id', ''),
            'type': segment_type,
            'name': segment_data.get('name', '随机'),
            'start_time': start_time,
            'end_time': end_time,
            'duration': duration,
            'resource_path': resource_path,
            'material_name': material_name,
            'properties': segment_data.get('properties', {}),
            'effects': segment_data.get('effects', [])
        }

    def get_template_detail(self, template_id: str) -> Optional[Dict[str, Any]]:
        """
        Get detailed template information including tracks and segments

        Args:
            template_id: Template ID

        Returns:
            Dict containing detailed template information with tracks and
            segments, or None when the template is unknown, its draft file is
            missing, or reading it fails (the error is logged)
        """
        try:
            # Get basic template info
            template = self.get_template(template_id)
            if not template:
                return None

            # Load draft content to get tracks and segments
            draft_content_path = Path(template.draft_content_path)
            if not draft_content_path.exists():
                logger.error(f"Draft content file not found: {draft_content_path}")
                return None

            with open(draft_content_path, 'r', encoding='utf-8') as f:
                draft_content = json.load(f)

            # Build material lookup table
            materials_lookup = self._build_materials_lookup(draft_content)

            # Extract tracks and segments information
            tracks = []
            for idx, track_data in enumerate(draft_content.get('tracks') or []):
                track = {
                    'id': track_data.get('id', ''),
                    'name': track_data.get('name', ''),
                    'type': track_data.get('type', ''),
                    'index': idx + 1,  # 1-based position of the track
                    'segments': [
                        self._build_segment(segment_data, materials_lookup)
                        for segment_data in track_data.get('segments') or []
                    ],
                    'properties': track_data.get('properties', {})
                }
                tracks.append(track)

            # Build detailed template information
            detail = {
                'id': template.id,
                'name': template.name,
                'description': template.description,
                'canvas_config': template.canvas_config,
                'tracks': tracks,
                'duration': template.duration,
                'fps': draft_content.get('fps', 30),
                'sample_rate': draft_content.get('sample_rate')
            }

            return detail

        except Exception as e:
            logger.error(f"Failed to get template detail for {template_id}: {e}")
            return None


def main():
    """Command line interface for template management.

    Dispatches on --action (batch_import, get_templates, get_template,
    delete_template, get_template_detail) and reports the outcome through the
    JSON-RPC response handler. Exits with status 1 on an unexpected error.
    """
    import argparse
    import sys

    parser = argparse.ArgumentParser(description="Template Manager")
    parser.add_argument("--action", required=True, help="Action to perform")
    parser.add_argument("--source_folder", help="Source folder for batch import")
    parser.add_argument("--template_id", help="Template ID for operations")

    args = parser.parse_args()

    # Create JSON-RPC response handler
    rpc = create_response_handler("template_manager")
    progress = create_progress_reporter()

    try:
        manager = TemplateManager()

        if args.action == "batch_import":
            if not args.source_folder:
                rpc.error(JSONRPCError.INVALID_PARAMS, "Source folder is required for batch import")
                return

            def progress_callback(message):
                # Messages like "Processing template 1/3: Template_Name" carry
                # a position that is turned into a percentage.
                match = _PROGRESS_MESSAGE_RE.search(message)
                if match:
                    current_num = int(match.group(1))
                    total_num = int(match.group(2))
                    if total_num > 0:
                        progress.report(
                            step="import",
                            progress=(current_num / total_num) * 100,
                            message=message,
                            details={
                                "current_template": match.group(3).strip(),
                                "total_templates": total_num,
                                "processed_templates": current_num
                            }
                        )
                        return

                # Default progress step
                progress.step("import", message)

            result = manager.batch_import_templates(args.source_folder, progress_callback)

            # Convert TemplateInfo objects to dictionaries for JSON serialization
            if 'imported_templates' in result:
                result['imported_templates'] = [asdict(template) for template in result['imported_templates']]

            # Send successful result via JSON-RPC
            rpc.success(result)

        elif args.action == "get_templates":
            templates = manager.get_templates()
            result = {
                "status": True,
                "templates": [asdict(template) for template in templates]
            }
            rpc.success(result)

        elif args.action == "get_template":
            if not args.template_id:
                rpc.error(JSONRPCError.INVALID_PARAMS, "Template ID is required")
                return

            template = manager.get_template(args.template_id)
            if template:
                result = {
                    "status": True,
                    "template": asdict(template)
                }
                rpc.success(result)
            else:
                rpc.error(JSONRPCError.TEMPLATE_NOT_FOUND, f"Template not found: {args.template_id}")

        elif args.action == "delete_template":
            if not args.template_id:
                rpc.error(JSONRPCError.INVALID_PARAMS, "Template ID is required")
                return

            success = manager.delete_template(args.template_id)
            if success:
                result = {
                    "status": True,
                    "msg": "Template deleted successfully"
                }
                rpc.success(result)
            else:
                rpc.error(JSONRPCError.TEMPLATE_NOT_FOUND, "Failed to delete template")

        elif args.action == "get_template_detail":
            if not args.template_id:
                rpc.error(JSONRPCError.INVALID_PARAMS, "Template ID is required")
                return

            detail = manager.get_template_detail(args.template_id)
            if detail:
                rpc.success(detail)
            else:
                rpc.error(JSONRPCError.TEMPLATE_NOT_FOUND, f"Template detail not found: {args.template_id}")

        else:
            rpc.error(JSONRPCError.METHOD_NOT_FOUND, f"Unknown action: {args.action}")

    except Exception as e:
        # Send error via JSON-RPC
        rpc.error(JSONRPCError.INTERNAL_ERROR, f"Template manager error: {str(e)}", str(e))
        sys.exit(1)


if __name__ == "__main__":
    main()