196 lines
5.4 KiB
Python
196 lines
5.4 KiB
Python
"""
|
|
Helper utilities for MixVideo V2
|
|
"""
|
|
|
|
import os
|
|
from pathlib import Path
|
|
from typing import Dict, Any, Union
|
|
import ffmpeg
|
|
|
|
|
|
def format_duration(seconds: float) -> str:
|
|
"""
|
|
Format duration in seconds to human-readable string.
|
|
|
|
Args:
|
|
seconds: Duration in seconds
|
|
|
|
Returns:
|
|
Formatted duration string (e.g., "1:23:45")
|
|
"""
|
|
hours = int(seconds // 3600)
|
|
minutes = int((seconds % 3600) // 60)
|
|
secs = int(seconds % 60)
|
|
|
|
if hours > 0:
|
|
return f"{hours}:{minutes:02d}:{secs:02d}"
|
|
else:
|
|
return f"{minutes}:{secs:02d}"
|
|
|
|
|
|
def get_file_info(file_path: Union[str, Path]) -> Dict[str, Any]:
|
|
"""
|
|
Get basic file information.
|
|
|
|
Args:
|
|
file_path: Path to file
|
|
|
|
Returns:
|
|
Dictionary with file information
|
|
"""
|
|
try:
|
|
path = Path(file_path)
|
|
|
|
if not path.exists():
|
|
return {"error": "File not found"}
|
|
|
|
stat = path.stat()
|
|
|
|
info = {
|
|
"name": path.name,
|
|
"size": stat.st_size,
|
|
"size_mb": round(stat.st_size / (1024 * 1024), 2),
|
|
"modified": stat.st_mtime,
|
|
"extension": path.suffix.lower(),
|
|
"is_file": path.is_file(),
|
|
"is_directory": path.is_dir()
|
|
}
|
|
|
|
# Try to get media info if it's a media file
|
|
if path.suffix.lower() in ['.mp4', '.avi', '.mov', '.mkv', '.wmv', '.flv', '.mp3', '.wav', '.aac', '.flac', '.ogg']:
|
|
try:
|
|
probe = ffmpeg.probe(str(path))
|
|
format_info = probe.get('format', {})
|
|
|
|
info.update({
|
|
"duration": float(format_info.get('duration', 0)),
|
|
"duration_formatted": format_duration(float(format_info.get('duration', 0))),
|
|
"bitrate": int(format_info.get('bit_rate', 0)),
|
|
"format_name": format_info.get('format_name', ''),
|
|
})
|
|
|
|
# Video specific info
|
|
video_streams = [s for s in probe.get('streams', []) if s.get('codec_type') == 'video']
|
|
if video_streams:
|
|
video_stream = video_streams[0]
|
|
info.update({
|
|
"width": int(video_stream.get('width', 0)),
|
|
"height": int(video_stream.get('height', 0)),
|
|
"fps": eval(video_stream.get('r_frame_rate', '0/1')),
|
|
"video_codec": video_stream.get('codec_name', ''),
|
|
})
|
|
|
|
# Audio specific info
|
|
audio_streams = [s for s in probe.get('streams', []) if s.get('codec_type') == 'audio']
|
|
if audio_streams:
|
|
audio_stream = audio_streams[0]
|
|
info.update({
|
|
"sample_rate": int(audio_stream.get('sample_rate', 0)),
|
|
"channels": int(audio_stream.get('channels', 0)),
|
|
"audio_codec": audio_stream.get('codec_name', ''),
|
|
})
|
|
|
|
except Exception as e:
|
|
info["media_info_error"] = str(e)
|
|
|
|
return info
|
|
|
|
except Exception as e:
|
|
return {"error": str(e)}
|
|
|
|
|
|
def ensure_directory(directory: Union[str, Path]) -> Path:
|
|
"""
|
|
Ensure directory exists, create if it doesn't.
|
|
|
|
Args:
|
|
directory: Directory path
|
|
|
|
Returns:
|
|
Path object of the directory
|
|
"""
|
|
path = Path(directory)
|
|
path.mkdir(parents=True, exist_ok=True)
|
|
return path
|
|
|
|
|
|
def get_unique_filename(file_path: Union[str, Path]) -> Path:
|
|
"""
|
|
Get a unique filename by appending a number if file already exists.
|
|
|
|
Args:
|
|
file_path: Original file path
|
|
|
|
Returns:
|
|
Unique file path
|
|
"""
|
|
path = Path(file_path)
|
|
|
|
if not path.exists():
|
|
return path
|
|
|
|
counter = 1
|
|
while True:
|
|
stem = path.stem
|
|
suffix = path.suffix
|
|
parent = path.parent
|
|
|
|
new_name = f"{stem}_{counter}{suffix}"
|
|
new_path = parent / new_name
|
|
|
|
if not new_path.exists():
|
|
return new_path
|
|
|
|
counter += 1
|
|
|
|
|
|
def format_file_size(size_bytes: int) -> str:
|
|
"""
|
|
Format file size in bytes to human-readable string.
|
|
|
|
Args:
|
|
size_bytes: Size in bytes
|
|
|
|
Returns:
|
|
Formatted size string (e.g., "1.5 MB")
|
|
"""
|
|
if size_bytes == 0:
|
|
return "0 B"
|
|
|
|
size_names = ["B", "KB", "MB", "GB", "TB"]
|
|
i = 0
|
|
size = float(size_bytes)
|
|
|
|
while size >= 1024.0 and i < len(size_names) - 1:
|
|
size /= 1024.0
|
|
i += 1
|
|
|
|
return f"{size:.1f} {size_names[i]}"
|
|
|
|
|
|
def clean_temp_files(temp_dir: Union[str, Path], max_age_hours: int = 24):
|
|
"""
|
|
Clean old temporary files.
|
|
|
|
Args:
|
|
temp_dir: Temporary directory path
|
|
max_age_hours: Maximum age of files to keep in hours
|
|
"""
|
|
import time
|
|
|
|
temp_path = Path(temp_dir)
|
|
if not temp_path.exists():
|
|
return
|
|
|
|
current_time = time.time()
|
|
max_age_seconds = max_age_hours * 3600
|
|
|
|
for file_path in temp_path.rglob('*'):
|
|
if file_path.is_file():
|
|
file_age = current_time - file_path.stat().st_mtime
|
|
if file_age > max_age_seconds:
|
|
try:
|
|
file_path.unlink()
|
|
except Exception:
|
|
pass # Ignore errors when deleting files
|