590 lines
23 KiB
Python
590 lines
23 KiB
Python
"""
|
|
Template Management Service
|
|
Handles template import, processing, and management
|
|
"""
|
|
|
|
import os
|
|
import json
|
|
import shutil
|
|
import uuid
|
|
from pathlib import Path
|
|
from typing import Dict, List, Any, Optional
|
|
from dataclasses import dataclass, asdict
|
|
from datetime import datetime
|
|
|
|
from ..utils.logger import setup_logger
|
|
from ..utils.jsonrpc import create_response_handler, create_progress_reporter, JSONRPCError
|
|
from ..config import settings
|
|
|
|
logger = setup_logger(__name__)
|
|
|
|
|
|
@dataclass
|
|
class TemplateInfo:
|
|
"""Template information structure"""
|
|
id: str
|
|
name: str
|
|
description: str
|
|
thumbnail_path: str
|
|
draft_content_path: str
|
|
resources_path: str
|
|
created_at: str
|
|
updated_at: str
|
|
canvas_config: Dict[str, Any]
|
|
duration: int
|
|
material_count: int
|
|
track_count: int
|
|
tags: List[str]
|
|
|
|
|
|
@dataclass
|
|
class MaterialInfo:
|
|
"""Material information structure"""
|
|
id: str
|
|
material_id: str
|
|
name: str
|
|
type: str # video, audio, image, text, sticker, etc.
|
|
original_path: str
|
|
relative_path: str
|
|
duration: Optional[int] = None
|
|
size: Optional[int] = None
|
|
|
|
|
|
class TemplateManager:
|
|
"""Template management service"""
|
|
|
|
def __init__(self):
|
|
self.templates_dir = settings.temp_dir / "templates"
|
|
self.templates_dir.mkdir(parents=True, exist_ok=True)
|
|
|
|
# Template metadata file
|
|
self.metadata_file = self.templates_dir / "templates.json"
|
|
self.templates_metadata = self._load_metadata()
|
|
|
|
def _load_metadata(self) -> Dict[str, Dict]:
|
|
"""Load templates metadata"""
|
|
if self.metadata_file.exists():
|
|
try:
|
|
with open(self.metadata_file, 'r', encoding='utf-8') as f:
|
|
return json.load(f)
|
|
except Exception as e:
|
|
logger.error(f"Failed to load templates metadata: {e}")
|
|
return {}
|
|
|
|
def _save_metadata(self):
|
|
"""Save templates metadata"""
|
|
try:
|
|
with open(self.metadata_file, 'w', encoding='utf-8') as f:
|
|
json.dump(self.templates_metadata, f, ensure_ascii=False, indent=2)
|
|
except Exception as e:
|
|
logger.error(f"Failed to save templates metadata: {e}")
|
|
|
|
def batch_import_templates(self, source_folder: str, progress_callback=None) -> Dict[str, Any]:
|
|
"""
|
|
Batch import templates from a folder
|
|
|
|
Args:
|
|
source_folder: Source folder containing template subfolders
|
|
progress_callback: Progress callback function
|
|
|
|
Returns:
|
|
Import result with success/failure counts and details
|
|
"""
|
|
result = {
|
|
'status': True,
|
|
'msg': '',
|
|
'imported_count': 0,
|
|
'failed_count': 0,
|
|
'imported_templates': [],
|
|
'failed_templates': []
|
|
}
|
|
|
|
try:
|
|
if not os.path.exists(source_folder):
|
|
result['status'] = False
|
|
result['msg'] = f"Source folder does not exist: {source_folder}"
|
|
return result
|
|
|
|
# Find all subdirectories
|
|
subdirs = [d for d in os.listdir(source_folder)
|
|
if os.path.isdir(os.path.join(source_folder, d))]
|
|
|
|
if not subdirs:
|
|
result['status'] = False
|
|
result['msg'] = "No subdirectories found in source folder"
|
|
return result
|
|
|
|
total_templates = len(subdirs)
|
|
logger.info(f"Found {total_templates} potential templates in {source_folder}")
|
|
|
|
if progress_callback:
|
|
progress_callback(f"Found {total_templates} potential templates")
|
|
|
|
for i, template_dir in enumerate(subdirs):
|
|
template_path = os.path.join(source_folder, template_dir)
|
|
|
|
if progress_callback:
|
|
progress_callback(f"Processing template {i+1}/{total_templates}: {template_dir}")
|
|
|
|
try:
|
|
template_info = self._import_single_template(template_path, template_dir)
|
|
if template_info:
|
|
result['imported_count'] += 1
|
|
result['imported_templates'].append(template_info)
|
|
logger.info(f"Successfully imported template: {template_dir}")
|
|
if progress_callback:
|
|
progress_callback(f"✅ Successfully imported: {template_dir}")
|
|
else:
|
|
result['failed_count'] += 1
|
|
result['failed_templates'].append({
|
|
'name': template_dir,
|
|
'error': 'No draft_content.json found or invalid template'
|
|
})
|
|
logger.warning(f"Failed to import template: {template_dir}")
|
|
if progress_callback:
|
|
progress_callback(f"❌ Failed to import: {template_dir} (No draft_content.json found)")
|
|
|
|
except Exception as e:
|
|
result['failed_count'] += 1
|
|
result['failed_templates'].append({
|
|
'name': template_dir,
|
|
'error': str(e)
|
|
})
|
|
logger.error(f"Error importing template {template_dir}: {e}")
|
|
if progress_callback:
|
|
progress_callback(f"❌ Error importing: {template_dir} ({str(e)})")
|
|
|
|
# Save metadata
|
|
self._save_metadata()
|
|
|
|
result['msg'] = f"Import completed. Success: {result['imported_count']}, Failed: {result['failed_count']}"
|
|
logger.info(result['msg'])
|
|
|
|
if progress_callback:
|
|
progress_callback(f"🎉 Import completed! Success: {result['imported_count']}, Failed: {result['failed_count']}")
|
|
|
|
except Exception as e:
|
|
result['status'] = False
|
|
result['msg'] = f"Batch import failed: {str(e)}"
|
|
logger.error(result['msg'])
|
|
|
|
return result
|
|
|
|
def _import_single_template(self, template_path: str, template_name: str) -> Optional[TemplateInfo]:
|
|
"""
|
|
Import a single template
|
|
|
|
Args:
|
|
template_path: Path to template folder
|
|
template_name: Template name
|
|
|
|
Returns:
|
|
TemplateInfo if successful, None otherwise
|
|
"""
|
|
try:
|
|
# Check for draft_content.json
|
|
draft_file = os.path.join(template_path, "draft_content.json")
|
|
if not os.path.exists(draft_file):
|
|
logger.warning(f"No draft_content.json found in {template_path}")
|
|
return None
|
|
|
|
# Load and parse draft content
|
|
with open(draft_file, 'r', encoding='utf-8') as f:
|
|
draft_content = json.load(f)
|
|
|
|
# Use the original template ID from draft_content.json
|
|
template_id = draft_content.get('id')
|
|
if not template_id:
|
|
logger.warning(f"No 'id' field found in draft_content.json for {template_path}")
|
|
return None
|
|
|
|
# Check if template already exists
|
|
if template_id in self.templates_metadata:
|
|
logger.info(f"Template {template_id} already exists, skipping import")
|
|
return None
|
|
|
|
# Create template directory
|
|
template_dir = self.templates_dir / template_id
|
|
template_dir.mkdir(parents=True, exist_ok=True)
|
|
|
|
# Create resources directory
|
|
resources_dir = template_dir / "resources"
|
|
resources_dir.mkdir(exist_ok=True)
|
|
|
|
# Process materials and copy resources
|
|
materials = self._process_materials(draft_content, template_path, resources_dir)
|
|
|
|
# Update draft content with relative paths
|
|
updated_draft = self._update_draft_paths(draft_content, materials)
|
|
|
|
# Save updated draft content
|
|
draft_target = template_dir / "draft_content.json"
|
|
with open(draft_target, 'w', encoding='utf-8') as f:
|
|
json.dump(updated_draft, f, ensure_ascii=False, indent=2)
|
|
|
|
# Create template info
|
|
template_info = TemplateInfo(
|
|
id=template_id,
|
|
name=template_name,
|
|
description=f"Imported template: {template_name}",
|
|
thumbnail_path="", # TODO: Generate thumbnail
|
|
draft_content_path=str(draft_target),
|
|
resources_path=str(resources_dir),
|
|
created_at=datetime.now().isoformat(),
|
|
updated_at=datetime.now().isoformat(),
|
|
canvas_config=draft_content.get('canvas_config', {}),
|
|
duration=draft_content.get('duration', 0),
|
|
material_count=len(materials),
|
|
track_count=len(draft_content.get('tracks', [])),
|
|
tags=[]
|
|
)
|
|
|
|
# Save to metadata
|
|
self.templates_metadata[template_id] = asdict(template_info)
|
|
|
|
logger.info(f"Successfully processed template: {template_name} -> {template_id}")
|
|
return template_info
|
|
|
|
except Exception as e:
|
|
logger.error(f"Failed to import template {template_name}: {e}")
|
|
return None
|
|
|
|
def _process_materials(self, draft_content: Dict, source_path: str, resources_dir: Path) -> List[MaterialInfo]:
|
|
"""
|
|
Process and copy materials from draft content
|
|
|
|
Args:
|
|
draft_content: Draft content JSON
|
|
source_path: Source template path
|
|
resources_dir: Target resources directory
|
|
|
|
Returns:
|
|
List of processed materials
|
|
"""
|
|
materials = []
|
|
materials_section = draft_content.get('materials', {})
|
|
|
|
# Process different material types
|
|
material_types = ['videos', 'audios', 'images', 'stickers', 'texts']
|
|
|
|
for material_type in material_types:
|
|
if material_type in materials_section:
|
|
for material in materials_section[material_type]:
|
|
material_info = self._process_single_material(
|
|
material, material_type, source_path, resources_dir
|
|
)
|
|
if material_info:
|
|
materials.append(material_info)
|
|
|
|
return materials
|
|
|
|
def _process_single_material(self, material: Dict, material_type: str,
|
|
source_path: str, resources_dir: Path) -> Optional[MaterialInfo]:
|
|
"""Process a single material"""
|
|
try:
|
|
material_id = material.get('id')
|
|
if not material_id:
|
|
return None
|
|
|
|
# Get original path
|
|
original_path = material.get('path', '')
|
|
if not original_path or not os.path.exists(original_path):
|
|
logger.warning(f"Material path not found: {original_path}")
|
|
return None
|
|
|
|
# Generate relative path
|
|
filename = os.path.basename(original_path)
|
|
relative_path = f"resources/{filename}"
|
|
target_path = resources_dir / filename
|
|
|
|
# Copy file
|
|
shutil.copy2(original_path, target_path)
|
|
|
|
# Get file info
|
|
file_size = os.path.getsize(target_path)
|
|
|
|
material_info = MaterialInfo(
|
|
id=str(uuid.uuid4()),
|
|
material_id=material_id,
|
|
name=material.get('name', filename),
|
|
type=material_type,
|
|
original_path=original_path,
|
|
relative_path=relative_path,
|
|
size=file_size
|
|
)
|
|
|
|
logger.debug(f"Processed material: {filename} -> {relative_path}")
|
|
return material_info
|
|
|
|
except Exception as e:
|
|
logger.error(f"Failed to process material: {e}")
|
|
return None
|
|
|
|
def _update_draft_paths(self, draft_content: Dict, materials: List[MaterialInfo]) -> Dict:
|
|
"""Update draft content with relative paths"""
|
|
# Create material ID to relative path mapping
|
|
path_mapping = {mat.material_id: mat.relative_path for mat in materials}
|
|
|
|
# Update materials section
|
|
materials_section = draft_content.get('materials', {})
|
|
for material_type in materials_section:
|
|
if isinstance(materials_section[material_type], list):
|
|
for material in materials_section[material_type]:
|
|
material_id = material.get('id')
|
|
if material_id in path_mapping:
|
|
material['path'] = path_mapping[material_id]
|
|
|
|
return draft_content
|
|
|
|
def get_templates(self) -> List[TemplateInfo]:
|
|
"""Get all templates"""
|
|
templates = []
|
|
for template_data in self.templates_metadata.values():
|
|
templates.append(TemplateInfo(**template_data))
|
|
return templates
|
|
|
|
def get_template(self, template_id: str) -> Optional[TemplateInfo]:
|
|
"""Get a specific template"""
|
|
if template_id in self.templates_metadata:
|
|
return TemplateInfo(**self.templates_metadata[template_id])
|
|
return None
|
|
|
|
def delete_template(self, template_id: str) -> bool:
|
|
"""Delete a template"""
|
|
try:
|
|
if template_id in self.templates_metadata:
|
|
# Remove template directory
|
|
template_dir = self.templates_dir / template_id
|
|
if template_dir.exists():
|
|
shutil.rmtree(template_dir)
|
|
|
|
# Remove from metadata
|
|
del self.templates_metadata[template_id]
|
|
self._save_metadata()
|
|
|
|
logger.info(f"Deleted template: {template_id}")
|
|
return True
|
|
except Exception as e:
|
|
logger.error(f"Failed to delete template {template_id}: {e}")
|
|
return False
|
|
|
|
def get_template_detail(self, template_id: str) -> Optional[Dict[str, Any]]:
|
|
"""
|
|
Get detailed template information including tracks and segments
|
|
|
|
Args:
|
|
template_id: Template ID
|
|
|
|
Returns:
|
|
Dict containing detailed template information with tracks and segments
|
|
"""
|
|
try:
|
|
# Get basic template info
|
|
template = self.get_template(template_id)
|
|
if not template:
|
|
return None
|
|
|
|
# Load draft content to get tracks and segments
|
|
draft_content_path = Path(template.draft_content_path)
|
|
if not draft_content_path.exists():
|
|
logger.error(f"Draft content file not found: {draft_content_path}")
|
|
return None
|
|
|
|
with open(draft_content_path, 'r', encoding='utf-8') as f:
|
|
draft_content = json.load(f)
|
|
|
|
# Extract tracks and segments information
|
|
tracks = []
|
|
if 'tracks' in draft_content:
|
|
for track_data in draft_content['tracks']:
|
|
track = {
|
|
'id': track_data.get('id', ''),
|
|
'name': track_data.get('name', f"Track {track_data.get('index', 0) + 1}"),
|
|
'type': track_data.get('type', 'video'),
|
|
'index': track_data.get('index', 0),
|
|
'segments': [],
|
|
'properties': track_data.get('properties', {})
|
|
}
|
|
|
|
# Extract segments
|
|
if 'segments' in track_data:
|
|
for segment_data in track_data['segments']:
|
|
# Extract time information from target_timerange
|
|
target_timerange = segment_data.get('target_timerange', {})
|
|
start_time_us = target_timerange.get('start', 0) # 微秒
|
|
duration_us = target_timerange.get('duration', 0) # 微秒
|
|
|
|
# Convert microseconds to seconds
|
|
start_time = start_time_us / 1000000.0
|
|
duration = duration_us / 1000000.0
|
|
end_time = start_time + duration
|
|
|
|
# Get material reference for resource path
|
|
material_id = segment_data.get('material_id', '')
|
|
resource_path = ''
|
|
if material_id:
|
|
# TODO: Look up material path from materials list
|
|
resource_path = f"material_{material_id}"
|
|
|
|
segment = {
|
|
'id': segment_data.get('id', ''),
|
|
'type': segment_data.get('type', 'video'),
|
|
'name': segment_data.get('name', f'Segment {len(track["segments"]) + 1}'),
|
|
'start_time': start_time,
|
|
'end_time': end_time,
|
|
'duration': duration,
|
|
'resource_path': resource_path,
|
|
'properties': segment_data.get('properties', {}),
|
|
'effects': segment_data.get('effects', [])
|
|
}
|
|
track['segments'].append(segment)
|
|
|
|
tracks.append(track)
|
|
|
|
# Build detailed template information
|
|
detail = {
|
|
'id': template.id,
|
|
'name': template.name,
|
|
'description': template.description,
|
|
'canvas_config': template.canvas_config,
|
|
'tracks': tracks,
|
|
'duration': template.duration,
|
|
'fps': draft_content.get('fps', 30),
|
|
'sample_rate': draft_content.get('sample_rate')
|
|
}
|
|
|
|
return detail
|
|
|
|
except Exception as e:
|
|
logger.error(f"Failed to get template detail for {template_id}: {e}")
|
|
return None
|
|
|
|
|
|
def main():
|
|
"""Command line interface for template management."""
|
|
import argparse
|
|
import json
|
|
import sys
|
|
|
|
parser = argparse.ArgumentParser(description="Template Manager")
|
|
parser.add_argument("--action", required=True, help="Action to perform")
|
|
parser.add_argument("--source_folder", help="Source folder for batch import")
|
|
parser.add_argument("--template_id", help="Template ID for operations")
|
|
|
|
args = parser.parse_args()
|
|
|
|
# Create JSON-RPC response handler
|
|
rpc = create_response_handler("template_manager")
|
|
progress = create_progress_reporter()
|
|
|
|
try:
|
|
manager = TemplateManager()
|
|
|
|
if args.action == "batch_import":
|
|
if not args.source_folder:
|
|
rpc.error(JSONRPCError.INVALID_PARAMS, "Source folder is required for batch import")
|
|
return
|
|
|
|
def progress_callback(message):
|
|
# Parse progress information from message
|
|
if "Processing template" in message:
|
|
# Extract template info from message like "Processing template 1/3: Template_Name"
|
|
parts = message.split(":")
|
|
if len(parts) >= 2:
|
|
template_name = parts[1].strip()
|
|
# Extract progress numbers
|
|
if "/" in parts[0]:
|
|
progress_part = parts[0].split("Processing template")[1].strip()
|
|
if "/" in progress_part:
|
|
current, total = progress_part.split("/")
|
|
try:
|
|
current_num = int(current.strip())
|
|
total_num = int(total.strip())
|
|
progress_percent = (current_num / total_num) * 100
|
|
|
|
progress.report(
|
|
step="import",
|
|
progress=progress_percent,
|
|
message=message,
|
|
details={
|
|
"current_template": template_name,
|
|
"total_templates": total_num,
|
|
"processed_templates": current_num
|
|
}
|
|
)
|
|
return
|
|
except ValueError:
|
|
pass
|
|
|
|
# Default progress step
|
|
progress.step("import", message)
|
|
|
|
result = manager.batch_import_templates(args.source_folder, progress_callback)
|
|
# Convert TemplateInfo objects to dictionaries for JSON serialization
|
|
if 'imported_templates' in result:
|
|
result['imported_templates'] = [asdict(template) for template in result['imported_templates']]
|
|
|
|
# Send successful result via JSON-RPC
|
|
rpc.success(result)
|
|
|
|
elif args.action == "get_templates":
|
|
templates = manager.get_templates()
|
|
result = {
|
|
"status": True,
|
|
"templates": [asdict(template) for template in templates]
|
|
}
|
|
rpc.success(result)
|
|
|
|
elif args.action == "get_template":
|
|
if not args.template_id:
|
|
rpc.error(JSONRPCError.INVALID_PARAMS, "Template ID is required")
|
|
return
|
|
|
|
template = manager.get_template(args.template_id)
|
|
if template:
|
|
result = {
|
|
"status": True,
|
|
"template": asdict(template)
|
|
}
|
|
rpc.success(result)
|
|
else:
|
|
rpc.error(JSONRPCError.TEMPLATE_NOT_FOUND, f"Template not found: {args.template_id}")
|
|
|
|
elif args.action == "delete_template":
|
|
if not args.template_id:
|
|
rpc.error(JSONRPCError.INVALID_PARAMS, "Template ID is required")
|
|
return
|
|
|
|
success = manager.delete_template(args.template_id)
|
|
if success:
|
|
result = {
|
|
"status": True,
|
|
"msg": "Template deleted successfully"
|
|
}
|
|
rpc.success(result)
|
|
else:
|
|
rpc.error(JSONRPCError.TEMPLATE_NOT_FOUND, "Failed to delete template")
|
|
|
|
elif args.action == "get_template_detail":
|
|
if not args.template_id:
|
|
rpc.error(JSONRPCError.INVALID_PARAMS, "Template ID is required")
|
|
return
|
|
|
|
detail = manager.get_template_detail(args.template_id)
|
|
if detail:
|
|
rpc.success(detail)
|
|
else:
|
|
rpc.error(JSONRPCError.TEMPLATE_NOT_FOUND, f"Template detail not found: {args.template_id}")
|
|
|
|
else:
|
|
rpc.error(JSONRPCError.METHOD_NOT_FOUND, f"Unknown action: {args.action}")
|
|
|
|
except Exception as e:
|
|
# Send error via JSON-RPC
|
|
rpc.error(JSONRPCError.INTERNAL_ERROR, f"Template manager error: {str(e)}", str(e))
|
|
sys.exit(1)
|
|
|
|
|
|
if __name__ == "__main__":
|
|
main()
|