mxivideo/python_core/services/file_manager.py

#!/usr/bin/env python3
"""
File Manager Service

Handles file operations, media import, and asset management.
"""
import hashlib
import mimetypes
import os
import shutil
import sys
import tempfile
from pathlib import Path
from typing import Any, Dict, List, Optional, Union

from ..config import settings
from ..utils import (
    get_file_info,
    setup_logger,
    validate_audio_file,
    validate_image_file,
    validate_video_file,
)

logger = setup_logger(__name__)


class FileManager:
    """File management service."""

    def __init__(self):
        self.temp_dir = settings.temp_dir
        self.cache_dir = settings.cache_dir
        self.projects_dir = settings.projects_dir

    def import_file(self, source_path: str, project_path: str,
                    copy_file: bool = True) -> Dict[str, Any]:
        """
        Import a media file into a project.

        Args:
            source_path: Source file path
            project_path: Project directory path
            copy_file: Whether to copy the file into the project assets

        Returns:
            Dictionary with import result
        """
        try:
            source = Path(source_path)
            if not source.exists():
                raise FileNotFoundError(f"Source file not found: {source_path}")

            # Validate file type
            file_type = self._get_file_type(source_path)
            if file_type == "unknown":
                raise ValueError(f"Unsupported file type: {source.suffix}")

            # Get file info
            file_info = get_file_info(source_path)
            if "error" in file_info:
                raise ValueError(f"Failed to get file info: {file_info['error']}")

            # Determine destination
            project_dir = Path(project_path)
            assets_dir = project_dir / "assets"
            assets_dir.mkdir(exist_ok=True)

            if copy_file:
                # Copy file to project assets, resolving name conflicts with a counter
                dest_path = assets_dir / source.name
                counter = 1
                while dest_path.exists():
                    dest_path = assets_dir / f"{source.stem}_{counter}{source.suffix}"
                    counter += 1
                shutil.copy2(source, dest_path)
                final_path = str(dest_path)
                logger.info(f"Copied file to: {final_path}")
            else:
                # Reference the original path without copying
                final_path = source_path
                logger.info(f"Linked file: {final_path}")

            # Generate file hash for deduplication
            file_hash = self._calculate_file_hash(final_path)

            return {
                "status": "success",
                "file_path": final_path,
                "file_type": file_type,
                "file_info": file_info,
                "file_hash": file_hash,
                "imported_at": file_info.get("modified", 0),
            }
        except Exception as e:
            logger.error(f"Failed to import file: {str(e)}")
            return {
                "status": "error",
                "error": str(e),
            }

    def export_file(self, source_path: str, dest_path: str,
                    move_file: bool = False) -> Dict[str, Any]:
        """
        Export a file from a project to an external location.

        Args:
            source_path: Source file path
            dest_path: Destination file path
            move_file: Whether to move instead of copy

        Returns:
            Dictionary with export result
        """
        try:
            source = Path(source_path)
            dest = Path(dest_path)
            if not source.exists():
                raise FileNotFoundError(f"Source file not found: {source_path}")

            # Ensure destination directory exists
            dest.parent.mkdir(parents=True, exist_ok=True)

            # Handle name conflicts by appending a counter to the original name
            stem, suffix = dest.stem, dest.suffix
            counter = 1
            while dest.exists():
                dest = dest.parent / f"{stem}_{counter}{suffix}"
                counter += 1

            if move_file:
                shutil.move(source, dest)
                logger.info(f"Moved file to: {dest}")
            else:
                shutil.copy2(source, dest)
                logger.info(f"Copied file to: {dest}")

            return {
                "status": "success",
                "dest_path": str(dest),
                "operation": "move" if move_file else "copy",
            }
        except Exception as e:
            logger.error(f"Failed to export file: {str(e)}")
            return {
                "status": "error",
                "error": str(e),
            }

    def list_project_assets(self, project_path: str) -> Dict[str, Any]:
        """
        List all assets in a project.

        Args:
            project_path: Project directory path

        Returns:
            Dictionary with asset list
        """
        try:
            project_dir = Path(project_path)
            assets_dir = project_dir / "assets"
            if not assets_dir.exists():
                return {
                    "status": "success",
                    "assets": [],
                    "count": 0,
                }

            assets = []
            for file_path in assets_dir.rglob("*"):
                if not file_path.is_file():
                    continue
                file_type = self._get_file_type(str(file_path))
                if file_type == "unknown":
                    continue
                file_info = get_file_info(str(file_path))
                assets.append({
                    "name": file_path.name,
                    "path": str(file_path),
                    "relative_path": str(file_path.relative_to(project_dir)),
                    "type": file_type,
                    "size": file_info.get("size", 0),
                    "size_mb": file_info.get("size_mb", 0),
                    "modified": file_info.get("modified", 0),
                    "duration": file_info.get("duration"),
                    "width": file_info.get("width"),
                    "height": file_info.get("height"),
                    "file_hash": self._calculate_file_hash(str(file_path)),
                })

            # Sort by name
            assets.sort(key=lambda x: x["name"].lower())

            return {
                "status": "success",
                "assets": assets,
                "count": len(assets),
            }
        except Exception as e:
            logger.error(f"Failed to list project assets: {str(e)}")
            return {
                "status": "error",
                "error": str(e),
            }

    def create_temp_file(self, suffix: str = "", prefix: str = "mixvideo_") -> str:
        """
        Create a temporary file.

        Args:
            suffix: File suffix/extension
            prefix: File prefix

        Returns:
            Path to the temporary file
        """
        fd, temp_path = tempfile.mkstemp(suffix=suffix, prefix=prefix, dir=self.temp_dir)
        os.close(fd)  # Close the file descriptor; the caller only needs the path
        logger.debug(f"Created temp file: {temp_path}")
        return temp_path

    def cleanup_temp_files(self, max_age_hours: int = 24) -> Dict[str, Any]:
        """
        Clean up old temporary files.

        Args:
            max_age_hours: Maximum age, in hours, of files to keep

        Returns:
            Dictionary with cleanup result
        """
        try:
            from ..utils import clean_temp_files

            clean_temp_files(self.temp_dir, max_age_hours)
            clean_temp_files(self.cache_dir, max_age_hours)

            return {
                "status": "success",
                "message": f"Cleaned temp files older than {max_age_hours} hours",
            }
        except Exception as e:
            logger.error(f"Failed to cleanup temp files: {str(e)}")
            return {
                "status": "error",
                "error": str(e),
            }

    def get_disk_usage(self, path: str) -> Dict[str, Any]:
        """
        Get disk usage information for a path.

        Args:
            path: Directory or file path

        Returns:
            Dictionary with disk usage info
        """
        try:
            path_obj = Path(path)
            if not path_obj.exists():
                raise FileNotFoundError(f"Path not found: {path}")

            # Disk usage of the filesystem containing the path
            usage = shutil.disk_usage(path)

            # Size of the path itself (recursive for directories)
            if path_obj.is_dir():
                dir_size = sum(f.stat().st_size for f in path_obj.rglob("*") if f.is_file())
            else:
                dir_size = path_obj.stat().st_size

            return {
                "status": "success",
                "path": path,
                "total_space": usage.total,
                "used_space": usage.used,
                "free_space": usage.free,
                "directory_size": dir_size,
                "usage_percent": (usage.used / usage.total) * 100,
            }
        except Exception as e:
            logger.error(f"Failed to get disk usage: {str(e)}")
            return {
                "status": "error",
                "error": str(e),
            }

    def _get_file_type(self, file_path: str) -> str:
        """
        Determine the file type based on extension and validation.

        Args:
            file_path: Path to the file

        Returns:
            File type: 'video', 'audio', 'image', or 'unknown'
        """
        if validate_video_file(file_path):
            return "video"
        elif validate_audio_file(file_path):
            return "audio"
        elif validate_image_file(file_path):
            return "image"
        return "unknown"

    def _calculate_file_hash(self, file_path: str) -> str:
        """
        Calculate the MD5 hash of a file (used for deduplication).

        Args:
            file_path: Path to the file

        Returns:
            MD5 hash string, or an empty string if the file cannot be read
        """
        try:
            hash_md5 = hashlib.md5()
            with open(file_path, "rb") as f:
                # Hash in small chunks so large media files don't load into memory
                for chunk in iter(lambda: f.read(4096), b""):
                    hash_md5.update(chunk)
            return hash_md5.hexdigest()
        except Exception:
            return ""

    def find_duplicates(self, project_path: str) -> Dict[str, Any]:
        """
        Find duplicate files in a project based on their content hash.

        Args:
            project_path: Project directory path

        Returns:
            Dictionary with duplicate files
        """
        try:
            assets_result = self.list_project_assets(project_path)
            if assets_result["status"] != "success":
                return assets_result

            # Group files by hash
            hash_groups: Dict[str, List[Dict[str, Any]]] = {}
            for asset in assets_result["assets"]:
                file_hash = asset["file_hash"]
                if file_hash:
                    hash_groups.setdefault(file_hash, []).append(asset)

            # Duplicates are groups containing more than one file
            duplicates = {hash_val: files for hash_val, files in hash_groups.items() if len(files) > 1}

            return {
                "status": "success",
                "duplicates": duplicates,
                "duplicate_count": sum(len(files) - 1 for files in duplicates.values()),
                "total_duplicate_size": sum(
                    sum(file["size"] for file in files[1:])  # Skip the first file in each group
                    for files in duplicates.values()
                ),
            }
        except Exception as e:
            logger.error(f"Failed to find duplicates: {str(e)}")
            return {
                "status": "error",
                "error": str(e),
            }
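

# ---------------------------------------------------------------------------
# Minimal usage sketch (illustrative only, not part of the service API).
# It assumes ``settings`` points at writable temp/cache/project directories,
# that the hypothetical paths below exist on disk, and that the module is run
# with ``python -m mxivideo.python_core.services.file_manager`` so the
# package-relative imports resolve.
# ---------------------------------------------------------------------------
if __name__ == "__main__":
    manager = FileManager()

    # Hypothetical paths used purely for illustration
    project = "/path/to/project"
    clip = "/path/to/clip.mp4"

    # Import a clip into the project's assets directory
    result = manager.import_file(clip, project, copy_file=True)
    print(result.get("status"), result.get("file_path"))

    # List everything the project currently contains
    assets = manager.list_project_assets(project)
    print(f"{assets.get('count', 0)} assets found")

    # Report hash-based duplicates and clean up stale temp files
    dupes = manager.find_duplicates(project)
    print(f"{dupes.get('duplicate_count', 0)} duplicate files")
    manager.cleanup_temp_files(max_age_hours=24)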