# Source path: mxivideo/python_core/services/audio_manager.py
#
# NOTE: the repository viewer flagged this file as containing ambiguous Unicode
# characters (characters that might be confused with other characters). Review
# any non-ASCII text if that is not intentional.

"""
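Audio library management service.

Manages audio files: upload, batch processing, MD5-based unique keys,
rhythm (tempo/beat) extraction and frequency chart plotting.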
"""
import json
import uuid
import os
import hashlib
import shutil
from pathlib import Path
from typing import List, Dict, Optional, Tuple
from dataclasses import dataclass, asdict
from datetime import datetime
import base64
from python_core.config import settings
from python_core.utils.logger import logger
from python_core.utils.jsonrpc import create_response_handler
# Optional audio-processing stack. When any of these packages is missing the
# module still imports; AUDIO_LIBS_AVAILABLE is set to False and the analysis
# helpers fall back to placeholder values.
try:
    import matplotlib
    # Select the non-interactive backend BEFORE pyplot is imported, so pyplot
    # never tries to initialise a GUI toolkit on a headless machine.
    matplotlib.use('Agg')
    import matplotlib.pyplot as plt
    import numpy as np
    import librosa
    # specshow lives in this submodule; import it explicitly instead of
    # relying on `import librosa` to expose it.
    import librosa.display
    AUDIO_LIBS_AVAILABLE = True
except ImportError:
    logger.warning("Audio processing libraries not available. Install librosa, numpy, matplotlib for full functionality.")
    AUDIO_LIBS_AVAILABLE = False
@dataclass
class AudioFile:
    """Metadata record for one audio file in the library.

    Records are rebuilt from the JSON registry with ``AudioFile(**item)`` and
    written back with ``dataclasses.asdict``, so the field names double as the
    JSON keys and the field order/defaults must stay stable.
    """

    id: str                # record identifier (a UUID4 string for uploaded files)
    filename: str          # display name supplied at upload time
    file_path: str         # path of the copy kept in the storage directory
    md5_hash: str          # hex MD5 digest of the file content; used for de-duplication
    file_size: int         # size as reported by os.path.getsize
    duration: float        # 0.0 when the audio could not be analysed
    sample_rate: int       # 0 when the audio could not be analysed
    channels: int          # 0 when the audio could not be analysed
    format: str            # lower-cased extension without the dot, or 'unknown'

    # Analysis results; each stays None when the audio libraries are missing
    # or feature extraction failed.
    tempo: Optional[float] = None
    beat_times: Optional[List[float]] = None
    spectral_centroid: Optional[float] = None    # mean over all frames
    spectral_rolloff: Optional[float] = None     # mean over all frames
    zero_crossing_rate: Optional[float] = None   # mean over all frames
    mfcc_features: Optional[List[float]] = None  # per-coefficient mean (13 values)
    frequency_chart_path: Optional[str] = None   # PNG written by the chart generator

    created_at: str = ""   # ISO-8601 timestamp string
    updated_at: str = ""   # ISO-8601 timestamp string
    is_active: bool = True  # lookups and searches ignore records where this is False
# For video analysis, audio_processing.core.AudioProcessor is the recommended implementation.
class AudioManager:
"""音频管理器"""
def __init__(self):
self.cache_dir = settings.temp_dir / "cache"
self.cache_dir.mkdir(parents=True, exist_ok=True)
# 音频数据文件
self.audio_files_file = self.cache_dir / "audio_files.json"
self.audio_files = self._load_audio_files()
# 音频存储目录
self.audio_storage_dir = settings.temp_dir / "audio_storage"
self.audio_storage_dir.mkdir(parents=True, exist_ok=True)
# 频率图存储目录
self.charts_dir = settings.temp_dir / "audio_charts"
self.charts_dir.mkdir(parents=True, exist_ok=True)
def _load_audio_files(self) -> List[AudioFile]:
"""加载音频文件数据"""
if self.audio_files_file.exists():
try:
with open(self.audio_files_file, 'r', encoding='utf-8') as f:
data = json.load(f)
return [AudioFile(**item) for item in data]
except Exception as e:
logger.error(f"Failed to load audio files: {e}")
return []
else:
return []
def _save_audio_files(self, audio_files: List[AudioFile] = None):
"""保存音频文件数据"""
if audio_files is None:
audio_files = self.audio_files
try:
data = [asdict(audio_file) for audio_file in audio_files]
with open(self.audio_files_file, 'w', encoding='utf-8') as f:
json.dump(data, f, ensure_ascii=False, indent=2)
logger.info(f"Audio files saved to {self.audio_files_file}")
except Exception as e:
logger.error(f"Failed to save audio files: {e}")
raise
def _calculate_md5(self, file_path: str) -> str:
"""计算文件MD5哈希值"""
hash_md5 = hashlib.md5()
with open(file_path, "rb") as f:
for chunk in iter(lambda: f.read(4096), b""):
hash_md5.update(chunk)
return hash_md5.hexdigest()
def _get_audio_info(self, file_path: str) -> Dict:
"""获取音频基本信息"""
if not AUDIO_LIBS_AVAILABLE:
# 如果没有音频处理库,返回基本信息
file_size = os.path.getsize(file_path)
return {
'duration': 0.0,
'sample_rate': 0,
'channels': 0,
'file_size': file_size
}
try:
# 使用librosa获取音频信息
y, sr = librosa.load(file_path, sr=None)
duration = librosa.get_duration(y=y, sr=sr)
return {
'duration': duration,
'sample_rate': sr,
'channels': 1 if len(y.shape) == 1 else y.shape[0],
'file_size': os.path.getsize(file_path)
}
except Exception as e:
logger.error(f"Failed to get audio info: {e}")
file_size = os.path.getsize(file_path)
return {
'duration': 0.0,
'sample_rate': 0,
'channels': 0,
'file_size': file_size
}
def _extract_audio_features(self, file_path: str) -> Dict:
"""提取音频特征(节奏、频谱等)"""
if not AUDIO_LIBS_AVAILABLE:
return {
'tempo': None,
'beat_times': None,
'spectral_centroid': None,
'spectral_rolloff': None,
'zero_crossing_rate': None,
'mfcc_features': None
}
try:
# 加载音频
y, sr = librosa.load(file_path, sr=None)
# 提取节拍和节奏
tempo, beat_frames = librosa.beat.beat_track(y=y, sr=sr)
beat_times = librosa.frames_to_time(beat_frames, sr=sr).tolist()
# 提取频谱特征
spectral_centroids = librosa.feature.spectral_centroid(y=y, sr=sr)[0]
spectral_rolloff = librosa.feature.spectral_rolloff(y=y, sr=sr)[0]
zero_crossing_rate = librosa.feature.zero_crossing_rate(y)[0]
# 提取MFCC特征
mfccs = librosa.feature.mfcc(y=y, sr=sr, n_mfcc=13)
mfcc_mean = np.mean(mfccs, axis=1).tolist()
return {
'tempo': float(tempo),
'beat_times': beat_times,
'spectral_centroid': float(np.mean(spectral_centroids)),
'spectral_rolloff': float(np.mean(spectral_rolloff)),
'zero_crossing_rate': float(np.mean(zero_crossing_rate)),
'mfcc_features': mfcc_mean
}
except Exception as e:
logger.error(f"Failed to extract audio features: {e}")
return {
'tempo': None,
'beat_times': None,
'spectral_centroid': None,
'spectral_rolloff': None,
'zero_crossing_rate': None,
'mfcc_features': None
}
def _generate_frequency_chart(self, file_path: str, audio_id: str) -> Optional[str]:
"""生成音频频率图"""
if not AUDIO_LIBS_AVAILABLE:
return None
try:
# 加载音频
y, sr = librosa.load(file_path, sr=None)
# 生成频谱图
plt.figure(figsize=(12, 8))
# 子图1: 波形图
plt.subplot(3, 1, 1)
plt.plot(np.linspace(0, len(y)/sr, len(y)), y)
plt.title('Waveform')
plt.xlabel('Time (s)')
plt.ylabel('Amplitude')
# 子图2: 频谱图
plt.subplot(3, 1, 2)
D = librosa.amplitude_to_db(np.abs(librosa.stft(y)), ref=np.max)
librosa.display.specshow(D, sr=sr, x_axis='time', y_axis='hz')
plt.colorbar(format='%+2.0f dB')
plt.title('Spectrogram')
# 子图3: 梅尔频谱图
plt.subplot(3, 1, 3)
S = librosa.feature.melspectrogram(y=y, sr=sr)
S_dB = librosa.power_to_db(S, ref=np.max)
librosa.display.specshow(S_dB, sr=sr, x_axis='time', y_axis='mel')
plt.colorbar(format='%+2.0f dB')
plt.title('Mel-frequency spectrogram')
plt.tight_layout()
# 保存图表
chart_filename = f"{audio_id}_frequency_chart.png"
chart_path = self.charts_dir / chart_filename
plt.savefig(chart_path, dpi=150, bbox_inches='tight')
plt.close()
return str(chart_path)
except Exception as e:
logger.error(f"Failed to generate frequency chart: {e}")
return None
def get_audio_by_md5(self, md5_hash: str) -> Optional[Dict]:
"""根据MD5获取音频文件"""
for audio_file in self.audio_files:
if audio_file.md5_hash == md5_hash and audio_file.is_active:
return asdict(audio_file)
return None
def get_all_audio_files(self) -> List[Dict]:
"""获取所有音频文件"""
return [asdict(audio_file) for audio_file in self.audio_files if audio_file.is_active]
def get_audio_by_id(self, audio_id: str) -> Optional[Dict]:
"""根据ID获取音频文件"""
for audio_file in self.audio_files:
if audio_file.id == audio_id and audio_file.is_active:
return asdict(audio_file)
return None
def upload_audio_file(self, source_path: str, filename: str = None) -> Dict:
"""上传单个音频文件"""
if not os.path.exists(source_path):
raise FileNotFoundError(f"Source file not found: {source_path}")
# 计算MD5
md5_hash = self._calculate_md5(source_path)
# 检查是否已存在相同MD5的文件
existing = self.get_audio_by_md5(md5_hash)
if existing:
logger.info(f"Audio file with MD5 {md5_hash} already exists")
return existing
# 生成新的音频ID和文件名
audio_id = str(uuid.uuid4())
if filename is None:
filename = os.path.basename(source_path)
# 获取文件扩展名
file_ext = os.path.splitext(filename)[1].lower()
stored_filename = f"{audio_id}{file_ext}"
stored_path = self.audio_storage_dir / stored_filename
# 复制文件到存储目录
shutil.copy2(source_path, stored_path)
# 获取音频基本信息
audio_info = self._get_audio_info(str(stored_path))
# 提取音频特征
features = self._extract_audio_features(str(stored_path))
# 生成频率图
chart_path = self._generate_frequency_chart(str(stored_path), audio_id)
# 创建音频文件记录
now = datetime.now().isoformat()
audio_file = AudioFile(
id=audio_id,
filename=filename,
file_path=str(stored_path),
md5_hash=md5_hash,
file_size=audio_info['file_size'],
duration=audio_info['duration'],
sample_rate=audio_info['sample_rate'],
channels=audio_info['channels'],
format=file_ext[1:] if file_ext else 'unknown',
tempo=features['tempo'],
beat_times=features['beat_times'],
spectral_centroid=features['spectral_centroid'],
spectral_rolloff=features['spectral_rolloff'],
zero_crossing_rate=features['zero_crossing_rate'],
mfcc_features=features['mfcc_features'],
frequency_chart_path=chart_path,
created_at=now,
updated_at=now
)
self.audio_files.append(audio_file)
self._save_audio_files()
logger.info(f"Uploaded audio file: {filename} (MD5: {md5_hash})")
return asdict(audio_file)
def batch_upload_audio_files(self, source_directory: str) -> Dict:
"""批量上传音频文件"""
if not os.path.exists(source_directory):
raise FileNotFoundError(f"Source directory not found: {source_directory}")
# 支持的音频格式
audio_extensions = {'.mp3', '.wav', '.flac', '.aac', '.ogg', '.m4a', '.wma'}
results = {
'total_files': 0,
'uploaded_files': 0,
'skipped_files': 0,
'failed_files': 0,
'uploaded_list': [],
'skipped_list': [],
'failed_list': []
}
# 遍历目录中的所有文件
for root, dirs, files in os.walk(source_directory):
for file in files:
file_path = os.path.join(root, file)
file_ext = os.path.splitext(file)[1].lower()
# 检查是否为音频文件
if file_ext not in audio_extensions:
continue
results['total_files'] += 1
try:
# 尝试上传文件
result = self.upload_audio_file(file_path, file)
# 检查是否为新上传的文件
if any(existing['md5_hash'] == result['md5_hash']
for existing in results['uploaded_list']):
results['skipped_files'] += 1
results['skipped_list'].append({
'filename': file,
'reason': 'Already exists (same MD5)'
})
else:
results['uploaded_files'] += 1
results['uploaded_list'].append(result)
except Exception as e:
results['failed_files'] += 1
results['failed_list'].append({
'filename': file,
'error': str(e)
})
logger.error(f"Failed to upload {file}: {e}")
logger.info(f"Batch upload completed: {results['uploaded_files']} uploaded, "
f"{results['skipped_files']} skipped, {results['failed_files']} failed")
return results
def delete_audio_file(self, audio_id: str) -> bool:
"""删除音频文件"""
for i, audio_file in enumerate(self.audio_files):
if audio_file.id == audio_id:
# 删除物理文件
try:
if os.path.exists(audio_file.file_path):
os.remove(audio_file.file_path)
# 删除频率图
if audio_file.frequency_chart_path and os.path.exists(audio_file.frequency_chart_path):
os.remove(audio_file.frequency_chart_path)
except Exception as e:
logger.error(f"Failed to delete physical files: {e}")
# 从列表中移除
deleted_audio = self.audio_files.pop(i)
self._save_audio_files()
logger.info(f"Deleted audio file: {audio_id} - {deleted_audio.filename}")
return True
return False
def search_audio_files(self, keyword: str) -> List[Dict]:
"""搜索音频文件"""
keyword = keyword.lower()
results = []
for audio_file in self.audio_files:
if (audio_file.is_active and
keyword in audio_file.filename.lower()):
results.append(asdict(audio_file))
return results
# Module-level shared instance. It is constructed at import time, so importing
# this module runs AudioManager.__init__ (creates the cache, storage and chart
# directories and loads the saved registry).
audio_manager = AudioManager()
def main():
    """Command-line entry point; replies through the JSON-RPC response handler.

    Usage: ``<script> <command> [argument] [filename]``

    Commands and their argument:
        get_all_audio_files        (none)
        get_audio_by_id            audio id
        get_audio_by_md5           MD5 hash
        upload_audio_file          source path, optional display filename
        batch_upload_audio_files   source directory
        delete_audio_file          audio id
        search_audio_files         keyword

    Errors are reported via ``rpc.error`` with the codes ``INVALID_REQUEST``
    (missing/unknown command or argument), ``NOT_FOUND`` (lookup returned
    nothing) and ``INTERNAL_ERROR`` (any exception raised while running the
    command).
    """
    import sys

    rpc = create_response_handler()

    if len(sys.argv) < 2:
        rpc.error("INVALID_REQUEST", "No command specified")
        return

    command = sys.argv[1]
    args = sys.argv[2:]

    def upload(source_path):
        # The display filename is an optional second argument.
        filename = args[1] if len(args) > 1 else None
        return audio_manager.upload_audio_file(source_path, filename)

    try:
        # command -> (handler,
        #             message when the required argument is missing, or None
        #             if the command takes no argument,
        #             True if a falsy result must be reported as NOT_FOUND)
        commands = {
            "get_all_audio_files": (audio_manager.get_all_audio_files, None, False),
            "get_audio_by_id": (audio_manager.get_audio_by_id, "Audio ID required", True),
            "get_audio_by_md5": (audio_manager.get_audio_by_md5, "MD5 hash required", True),
            "upload_audio_file": (upload, "Source path required", False),
            "batch_upload_audio_files": (
                audio_manager.batch_upload_audio_files, "Source directory required", False),
            "delete_audio_file": (audio_manager.delete_audio_file, "Audio ID required", False),
            "search_audio_files": (
                audio_manager.search_audio_files, "Search keyword required", False),
        }

        if command not in commands:
            rpc.error("INVALID_REQUEST", f"Unknown command: {command}")
            return

        handler, missing_message, is_lookup = commands[command]

        if missing_message is None:
            result = handler()
        elif not args:
            rpc.error("INVALID_REQUEST", missing_message)
            return
        else:
            result = handler(args[0])

        if is_lookup and not result:
            rpc.error("NOT_FOUND", "Audio file not found")
        else:
            rpc.success(result)

    except Exception as e:
        logger.error(f"Command execution failed: {e}")
        rpc.error("INTERNAL_ERROR", str(e))


if __name__ == "__main__":
    main()