#!/usr/bin/env python3
"""
File Manager Service

Handles file operations, media import, and asset management.
"""

import hashlib
import mimetypes
import os
import shutil
import sys
import tempfile
from pathlib import Path
from typing import Dict, Any, List, Optional, Union

from ..config import settings
from ..utils import (
    setup_logger,
    validate_video_file,
    validate_audio_file,
    validate_image_file,
    get_file_info,
)

logger = setup_logger(__name__)

# Read size used when hashing files. Larger reads mean fewer system calls on
# big media files; the resulting digest does not depend on the chunk size.
_HASH_CHUNK_SIZE = 1024 * 1024


class FileManager:
    """File management service.

    Every public method except ``create_temp_file`` reports its outcome as a
    dictionary with a ``"status"`` key (``"success"`` or ``"error"``) instead
    of raising; failures are logged and described under the ``"error"`` key.
    """

    def __init__(self):
        # Working directories are taken from the application settings.
        self.temp_dir = settings.temp_dir
        self.cache_dir = settings.cache_dir
        self.projects_dir = settings.projects_dir

    @staticmethod
    def _unique_path(candidate: Path) -> Path:
        """
        Return a path that does not exist yet, derived from ``candidate``.

        If ``candidate`` is free it is returned unchanged. Otherwise a numeric
        suffix is appended to the *original* stem (``name_1.ext``,
        ``name_2.ext``, ...) until an unused name is found.

        Args:
            candidate: Desired destination path

        Returns:
            ``candidate`` itself or a numbered sibling that does not exist
        """
        if not candidate.exists():
            return candidate

        # Capture stem/suffix once so the counter replaces, rather than
        # stacks onto, earlier suffixes (name_2, not name_1_2).
        parent = candidate.parent
        stem = candidate.stem
        suffix = candidate.suffix

        counter = 1
        while True:
            alternative = parent / f"{stem}_{counter}{suffix}"
            if not alternative.exists():
                return alternative
            counter += 1

    def import_file(self, source_path: str, project_path: str, copy_file: bool = True) -> Dict[str, Any]:
        """
        Import a media file into a project.

        Args:
            source_path: Source file path
            project_path: Project directory path
            copy_file: Whether to copy file to project assets. When False the
                file is left in place and its original path is returned.

        Returns:
            Dictionary with import result. On success it contains
            ``file_path``, ``file_type``, ``file_info``, ``file_hash`` and
            ``imported_at``; on failure ``status`` is ``"error"`` and
            ``error`` holds the message.
        """
        try:
            source = Path(source_path)
            if not source.exists():
                raise FileNotFoundError(f"Source file not found: {source_path}")

            # Validate file type
            file_type = self._get_file_type(source_path)
            if file_type == "unknown":
                raise ValueError(f"Unsupported file type: {source.suffix}")

            # Get file info; the helper signals failure through an "error" key
            file_info = get_file_info(source_path)
            if "error" in file_info:
                raise ValueError(f"Failed to get file info: {file_info['error']}")

            # The "assets" folder is created on demand. The project directory
            # itself must already exist (no parents=True), so a mistyped
            # project path fails instead of creating a stray directory tree.
            assets_dir = Path(project_path) / "assets"
            assets_dir.mkdir(exist_ok=True)

            if copy_file:
                # Copy file to project assets, avoiding name conflicts
                dest_path = self._unique_path(assets_dir / source.name)
                shutil.copy2(source, dest_path)
                final_path = str(dest_path)
                logger.info(f"Copied file to: {final_path}")
            else:
                # Use original path
                final_path = source_path
                logger.info(f"Linked file: {final_path}")

            # Generate file hash for deduplication
            file_hash = self._calculate_file_hash(final_path)

            return {
                "status": "success",
                "file_path": final_path,
                "file_type": file_type,
                "file_info": file_info,
                "file_hash": file_hash,
                # NOTE(review): this mirrors the file's "modified" value, not
                # the time of the import -- confirm this is what callers expect.
                "imported_at": file_info.get("modified", 0),
            }

        except Exception as e:
            logger.error(f"Failed to import file: {str(e)}")
            return {
                "status": "error",
                "error": str(e),
            }

    def export_file(self, source_path: str, dest_path: str, move_file: bool = False) -> Dict[str, Any]:
        """
        Export a file from project to external location.

        Args:
            source_path: Source file path
            dest_path: Destination file path
            move_file: Whether to move instead of copy

        Returns:
            Dictionary with export result. On success ``dest_path`` is the
            path actually written (it may carry a numeric suffix if the
            requested name was taken) and ``operation`` is ``"move"`` or
            ``"copy"``.
        """
        try:
            source = Path(source_path)
            if not source.exists():
                raise FileNotFoundError(f"Source file not found: {source_path}")

            # Ensure destination directory exists
            dest = Path(dest_path)
            dest.parent.mkdir(parents=True, exist_ok=True)

            # Handle name conflicts with the same numbering as import_file
            dest = self._unique_path(dest)

            if move_file:
                # Plain strings keep shutil.move working on older interpreters
                # that do not accept Path objects everywhere.
                shutil.move(str(source), str(dest))
                logger.info(f"Moved file to: {dest}")
            else:
                shutil.copy2(source, dest)
                logger.info(f"Copied file to: {dest}")

            return {
                "status": "success",
                "dest_path": str(dest),
                "operation": "move" if move_file else "copy",
            }

        except Exception as e:
            logger.error(f"Failed to export file: {str(e)}")
            return {
                "status": "error",
                "error": str(e),
            }

    def list_project_assets(self, project_path: str) -> Dict[str, Any]:
        """
        List all assets in a project.

        Walks ``<project_path>/assets`` recursively and reports every file
        whose type is recognised as video, audio or image.

        Args:
            project_path: Project directory path

        Returns:
            Dictionary with asset list (sorted case-insensitively by file
            name) and its ``count``. A missing assets directory yields an
            empty, successful result.
        """
        try:
            project_dir = Path(project_path)
            assets_dir = project_dir / "assets"

            if not assets_dir.exists():
                return {
                    "status": "success",
                    "assets": [],
                    "count": 0,
                }

            assets = []
            for file_path in assets_dir.rglob("*"):
                if not file_path.is_file():
                    continue

                path_str = str(file_path)
                file_type = self._get_file_type(path_str)
                if file_type == "unknown":
                    continue

                file_info = get_file_info(path_str)
                assets.append({
                    "name": file_path.name,
                    "path": path_str,
                    "relative_path": str(file_path.relative_to(project_dir)),
                    "type": file_type,
                    "size": file_info.get("size", 0),
                    "size_mb": file_info.get("size_mb", 0),
                    "modified": file_info.get("modified", 0),
                    "duration": file_info.get("duration"),
                    "width": file_info.get("width"),
                    "height": file_info.get("height"),
                    # The hash is what find_duplicates groups on.
                    "file_hash": self._calculate_file_hash(path_str),
                })

            # Sort by name
            assets.sort(key=lambda x: x["name"].lower())

            return {
                "status": "success",
                "assets": assets,
                "count": len(assets),
            }

        except Exception as e:
            logger.error(f"Failed to list project assets: {str(e)}")
            return {
                "status": "error",
                "error": str(e),
            }

    def create_temp_file(self, suffix: str = "", prefix: str = "mixvideo_") -> str:
        """
        Create a temporary file.

        The file is created empty inside ``self.temp_dir`` and is not removed
        automatically.

        Args:
            suffix: File suffix/extension
            prefix: File prefix

        Returns:
            Path to temporary file
        """
        fd, temp_path = tempfile.mkstemp(suffix=suffix, prefix=prefix, dir=self.temp_dir)
        os.close(fd)  # Only the path is handed out, so release the descriptor

        logger.debug(f"Created temp file: {temp_path}")
        return temp_path

    def cleanup_temp_files(self, max_age_hours: int = 24) -> Dict[str, Any]:
        """
        Clean up old temporary files.

        Delegates to ``clean_temp_files`` for both the temp and the cache
        directory.

        Args:
            max_age_hours: Maximum age of files to keep

        Returns:
            Dictionary with cleanup result
        """
        try:
            # Prefer the package-relative helper module, matching the other
            # imports of this file; the absolute form is kept as a fallback
            # for setups where ``utils`` is importable as a top-level module.
            try:
                from ..utils import clean_temp_files
            except ImportError:
                from utils import clean_temp_files

            clean_temp_files(self.temp_dir, max_age_hours)
            clean_temp_files(self.cache_dir, max_age_hours)

            return {
                "status": "success",
                "message": f"Cleaned temp files older than {max_age_hours} hours",
            }

        except Exception as e:
            logger.error(f"Failed to cleanup temp files: {str(e)}")
            return {
                "status": "error",
                "error": str(e),
            }

    def get_disk_usage(self, path: str) -> Dict[str, Any]:
        """
        Get disk usage information for a path.

        Args:
            path: Directory path (a single file is accepted as well)

        Returns:
            Dictionary with disk usage info: ``total_space``, ``used_space``
            and ``free_space`` as reported by ``shutil.disk_usage``,
            ``directory_size`` as the summed size of all files below ``path``
            (or the file's own size), and ``usage_percent``.
        """
        try:
            path_obj = Path(path)
            if not path_obj.exists():
                raise FileNotFoundError(f"Path not found: {path}")

            # Get disk usage
            usage = shutil.disk_usage(path)

            # Calculate directory size if it's a directory
            if path_obj.is_dir():
                dir_size = sum(f.stat().st_size for f in path_obj.rglob("*") if f.is_file())
            else:
                dir_size = path_obj.stat().st_size

            # Guard against a reported total of zero instead of failing the
            # whole call with ZeroDivisionError.
            usage_percent = (usage.used / usage.total) * 100 if usage.total else 0.0

            return {
                "status": "success",
                "path": path,
                "total_space": usage.total,
                "used_space": usage.used,
                "free_space": usage.free,
                "directory_size": dir_size,
                "usage_percent": usage_percent,
            }

        except Exception as e:
            logger.error(f"Failed to get disk usage: {str(e)}")
            return {
                "status": "error",
                "error": str(e),
            }

    def _get_file_type(self, file_path: str) -> str:
        """
        Determine file type using the video/audio/image validators.

        The validators are tried in that order and the first match wins.

        Args:
            file_path: Path to file

        Returns:
            File type: 'video', 'audio', 'image', or 'unknown'
        """
        if validate_video_file(file_path):
            return "video"
        if validate_audio_file(file_path):
            return "audio"
        if validate_image_file(file_path):
            return "image"
        return "unknown"

    def _calculate_file_hash(self, file_path: str) -> str:
        """
        Calculate MD5 hash of a file.

        The digest is used as a content fingerprint for deduplication. This
        is best-effort: any failure is logged and reported as an empty string
        so that callers can carry on without a hash.

        Args:
            file_path: Path to file

        Returns:
            MD5 hash string, or "" if the file could not be hashed
        """
        try:
            hash_md5 = hashlib.md5()
            with open(file_path, "rb") as f:
                # Stream the file so large media never has to fit in memory.
                for chunk in iter(lambda: f.read(_HASH_CHUNK_SIZE), b""):
                    hash_md5.update(chunk)
            return hash_md5.hexdigest()
        except Exception as e:
            logger.warning(f"Failed to hash file {file_path}: {e}")
            return ""

    def find_duplicates(self, project_path: str) -> Dict[str, Any]:
        """
        Find duplicate files in a project based on hash.

        Args:
            project_path: Project directory path

        Returns:
            Dictionary with duplicate files: ``duplicates`` maps each hash
            shared by more than one asset to those assets,
            ``duplicate_count`` is the number of redundant copies and
            ``total_duplicate_size`` their combined size. If listing the
            assets fails, that error result is returned unchanged.
        """
        try:
            assets_result = self.list_project_assets(project_path)
            if assets_result["status"] != "success":
                return assets_result

            # Group files by hash; assets without a hash (hashing failed)
            # cannot be compared and are left out.
            hash_groups = {}
            for asset in assets_result["assets"]:
                file_hash = asset["file_hash"]
                if file_hash:
                    hash_groups.setdefault(file_hash, []).append(asset)

            # Find duplicates (groups with more than one file)
            duplicates = {
                hash_val: files
                for hash_val, files in hash_groups.items()
                if len(files) > 1
            }

            # The first file of each group counts as the one to keep, so only
            # the remaining files contribute to the totals.
            return {
                "status": "success",
                "duplicates": duplicates,
                "duplicate_count": sum(len(files) - 1 for files in duplicates.values()),
                "total_duplicate_size": sum(
                    sum(file["size"] for file in files[1:])
                    for files in duplicates.values()
                ),
            }

        except Exception as e:
            logger.error(f"Failed to find duplicates: {str(e)}")
            return {
                "status": "error",
                "error": str(e),
            }