mxivideo/python_core/utils/helpers.py

196 lines
5.4 KiB
Python

"""
Helper utilities for MixVideo V2
"""
import os
from pathlib import Path
from typing import Dict, Any, Union
import ffmpeg
def format_duration(seconds: float) -> str:
    """
    Format a duration in seconds as a human-readable clock string.

    Fractional seconds are truncated, not rounded. A negative duration is
    rendered as its magnitude with a leading "-" (e.g., -5 -> "-0:05");
    applying floor division and modulo directly to a negative number would
    instead wrap around and produce a misleading value such as "59:55".

    Args:
        seconds: Duration in seconds

    Returns:
        Formatted duration string: "H:MM:SS" when the duration is at least
        one hour (e.g., "1:23:45"), otherwise "M:SS" (e.g., "1:05")

    Raises:
        ValueError: If seconds is NaN
        OverflowError: If seconds is infinite
    """
    whole_seconds = int(abs(seconds))
    # Only show a sign when something non-zero remains after truncation,
    # so that -0.4 is rendered as "0:00" rather than "-0:00".
    sign = "-" if seconds < 0 and whole_seconds > 0 else ""

    total_minutes, secs = divmod(whole_seconds, 60)
    hours, minutes = divmod(total_minutes, 60)

    if hours > 0:
        return f"{sign}{hours}:{minutes:02d}:{secs:02d}"
    return f"{sign}{minutes}:{secs:02d}"
# Lower-case extensions (with leading dot) for which a media probe is attempted.
_MEDIA_EXTENSIONS = frozenset({
    '.mp4', '.avi', '.mov', '.mkv', '.wmv', '.flv',
    '.mp3', '.wav', '.aac', '.flac', '.ogg',
})


def _parse_frame_rate(rate: Any) -> float:
    """
    Convert a rational frame-rate string such as "30000/1001" to a float.

    This replaces eval(): the value comes from probing an arbitrary file and
    must never be executed as Python code. A plain number ("25") is accepted
    as well. Malformed values and zero denominators (e.g. "0/0") yield 0.0
    instead of raising, so the remaining media fields are still collected.

    Args:
        rate: Frame rate as reported by the probe, normally "num/den"

    Returns:
        Frames per second, or 0.0 when the value cannot be interpreted
    """
    numerator, _, denominator = str(rate).partition('/')
    try:
        num = float(numerator)
        den = float(denominator) if denominator else 1.0
    except ValueError:
        return 0.0
    if den == 0:
        return 0.0
    return num / den


def get_file_info(file_path: Union[str, Path]) -> Dict[str, Any]:
    """
    Get basic file information.

    For paths whose extension is a known audio/video type, the file is also
    probed with ffmpeg and the container, first video stream and first audio
    stream details are added. Probing is best-effort: if it fails, the basic
    information (plus any media fields gathered before the failure) is still
    returned together with a "media_info_error" message.

    Args:
        file_path: Path to file

    Returns:
        Dictionary with file information. On failure the dictionary contains
        a single "error" key ("File not found" when the path does not exist,
        otherwise the exception text); this function does not raise.
    """
    try:
        path = Path(file_path)
        if not path.exists():
            return {"error": "File not found"}

        stat = path.stat()
        extension = path.suffix.lower()
        info = {
            "name": path.name,
            "size": stat.st_size,
            "size_mb": round(stat.st_size / (1024 * 1024), 2),
            "modified": stat.st_mtime,
            "extension": extension,
            "is_file": path.is_file(),
            "is_directory": path.is_dir()
        }

        # Try to get media info if it's a media file
        if extension in _MEDIA_EXTENSIONS:
            try:
                probe = ffmpeg.probe(str(path))
                format_info = probe.get('format', {})
                streams = probe.get('streams', [])

                duration = float(format_info.get('duration', 0))
                info.update({
                    "duration": duration,
                    "duration_formatted": format_duration(duration),
                    "bitrate": int(format_info.get('bit_rate', 0)),
                    "format_name": format_info.get('format_name', ''),
                })

                # Video specific info (first video stream only)
                video_stream = next(
                    (s for s in streams if s.get('codec_type') == 'video'),
                    None,
                )
                if video_stream:
                    info.update({
                        "width": int(video_stream.get('width', 0)),
                        "height": int(video_stream.get('height', 0)),
                        "fps": _parse_frame_rate(video_stream.get('r_frame_rate', '0/1')),
                        "video_codec": video_stream.get('codec_name', ''),
                    })

                # Audio specific info (first audio stream only)
                audio_stream = next(
                    (s for s in streams if s.get('codec_type') == 'audio'),
                    None,
                )
                if audio_stream:
                    info.update({
                        "sample_rate": int(audio_stream.get('sample_rate', 0)),
                        "channels": int(audio_stream.get('channels', 0)),
                        "audio_codec": audio_stream.get('codec_name', ''),
                    })
            except Exception as e:
                # Deliberately broad: a probe failure of any kind must not
                # hide the basic file information gathered above.
                info["media_info_error"] = str(e)

        return info
    except Exception as e:
        # Callers rely on an {"error": ...} dictionary instead of an exception.
        return {"error": str(e)}
def ensure_directory(directory: Union[str, Path]) -> Path:
    """
    Make sure a directory is present on disk, creating it when missing.

    Missing parent directories are created as well, and an already existing
    directory is left untouched.

    Args:
        directory: Directory path

    Returns:
        Path object of the directory
    """
    target = Path(directory)
    target.mkdir(exist_ok=True, parents=True)
    return target
def get_unique_filename(file_path: Union[str, Path]) -> Path:
    """
    Return a path that does not exist yet, derived from the given path.

    When the given path is free it is returned as-is. Otherwise "_1", "_2",
    ... is inserted between the stem and the final suffix until an unused
    name in the same directory is found.

    Args:
        file_path: Original file path

    Returns:
        Unique file path
    """
    original = Path(file_path)
    folder, base, ext = original.parent, original.stem, original.suffix

    candidate = original
    attempt = 0
    while candidate.exists():
        attempt += 1
        candidate = folder / f"{base}_{attempt}{ext}"
    return candidate
def format_file_size(size_bytes: int) -> str:
    """
    Format file size in bytes to human-readable string.

    Units are binary multiples (1 KB = 1024 B) and the value is shown with
    one decimal place. Sizes beyond the largest unit stay in TB.

    Args:
        size_bytes: Size in bytes. Negative values (e.g. a size difference)
            are formatted by magnitude with a leading "-".

    Returns:
        Formatted size string (e.g., "1.5 MB")
    """
    if size_bytes == 0:
        return "0 B"

    size_names = ("B", "KB", "MB", "GB", "TB")
    last_index = len(size_names) - 1

    sign = "-" if size_bytes < 0 else ""
    size = float(abs(size_bytes))

    index = 0
    # Compare the value as it will be displayed: 1048575 bytes is 1023.999 KB,
    # which would print as "1024.0 KB" unless it is promoted to "1.0 MB".
    while index < last_index and round(size, 1) >= 1024.0:
        size /= 1024.0
        index += 1

    return f"{sign}{size:.1f} {size_names[index]}"
def clean_temp_files(temp_dir: Union[str, Path], max_age_hours: int = 24) -> None:
    """
    Clean old temporary files.

    Walks the directory recursively and deletes every regular file whose
    modification time is older than the allowed age. Directories themselves
    are never removed. Cleanup is best-effort: a file that cannot be
    inspected or deleted (already gone, in use, permission denied) is
    skipped and the scan continues with the next one.

    Args:
        temp_dir: Temporary directory path; nothing happens if it is missing
        max_age_hours: Maximum age of files to keep in hours
    """
    import time

    temp_path = Path(temp_dir)
    if not temp_path.exists():
        return

    # Files last modified before this timestamp are considered stale.
    cutoff = time.time() - max_age_hours * 3600

    for file_path in temp_path.rglob('*'):
        try:
            # stat() is inside the try as well: another process may remove
            # the file between the directory scan and this call.
            if file_path.is_file() and file_path.stat().st_mtime < cutoff:
                file_path.unlink()
        except OSError:
            # Best-effort cleanup; leave the file for a later run.
            continue