# Listing metadata: 506 lines, 18 KiB, Python
"""
Audio library management service.

Manages audio files: upload, batch processing, MD5-based unique keys,
tempo/beat extraction and frequency chart rendering.
"""
|
||
|
||
import json
|
||
import uuid
|
||
import os
|
||
import hashlib
|
||
import shutil
|
||
from pathlib import Path
|
||
from typing import List, Dict, Optional, Tuple
|
||
from dataclasses import dataclass, asdict
|
||
from datetime import datetime
|
||
import base64
|
||
|
||
from python_core.config import settings
|
||
from python_core.utils.logger import logger
|
||
from python_core.utils.jsonrpc import create_response_handler
|
||
|
||
# Optional audio-processing stack. AUDIO_LIBS_AVAILABLE tells the rest of the
# module whether librosa, numpy and matplotlib could all be imported.
try:
    import numpy as np
    import matplotlib
    # Select the non-interactive backend BEFORE pyplot is imported; some older
    # matplotlib releases ignore use() once pyplot has been loaded.
    matplotlib.use('Agg')
    import matplotlib.pyplot as plt
    import librosa
    # specshow lives in the display submodule, which a bare ``import librosa``
    # does not load on older librosa releases.
    import librosa.display
    AUDIO_LIBS_AVAILABLE = True
except ImportError:
    logger.warning("Audio processing libraries not available. Install librosa, numpy, matplotlib for full functionality.")
    AUDIO_LIBS_AVAILABLE = False
|
||
|
||
|
||
@dataclass
class AudioFile:
    """Record describing one stored audio file and its extracted features."""

    # --- identity and storage -------------------------------------------
    id: str
    filename: str
    file_path: str
    md5_hash: str
    file_size: int

    # --- basic audio properties -----------------------------------------
    duration: float
    sample_rate: int
    channels: int
    format: str

    # --- optional analysis results (None when not extracted) -------------
    tempo: Optional[float] = None
    beat_times: Optional[List[float]] = None
    spectral_centroid: Optional[float] = None
    spectral_rolloff: Optional[float] = None
    zero_crossing_rate: Optional[float] = None
    mfcc_features: Optional[List[float]] = None
    frequency_chart_path: Optional[str] = None

    # --- bookkeeping ------------------------------------------------------
    created_at: str = ""
    updated_at: str = ""
    is_active: bool = True
|
||
|
||
# NOTE: for video analysis, prefer audio_processing.core.AudioProcessor.
|
||
class AudioManager:
    """Manage a library of audio files stored under ``settings.temp_dir``.

    Uploaded files are copied into ``audio_storage``, described by an
    ``AudioFile`` record and persisted to ``cache/audio_files.json``. The MD5
    of the file contents is the uniqueness key: uploading the same bytes
    again returns the existing record. When ``AUDIO_LIBS_AVAILABLE`` is true,
    tempo/spectral features are extracted and a three-panel chart is rendered
    into ``audio_charts`` for every upload.
    """

    # Feature set reported when analysis is unavailable or fails.
    _EMPTY_FEATURES = {
        'tempo': None,
        'beat_times': None,
        'spectral_centroid': None,
        'spectral_rolloff': None,
        'zero_crossing_rate': None,
        'mfcc_features': None,
    }

    def __init__(self):
        """Create the working directories and load the persisted index."""
        self.cache_dir = settings.temp_dir / "cache"
        self.cache_dir.mkdir(parents=True, exist_ok=True)

        # JSON index: a list of serialized AudioFile records.
        self.audio_files_file = self.cache_dir / "audio_files.json"
        self.audio_files = self._load_audio_files()

        # Copies of the uploaded audio files.
        self.audio_storage_dir = settings.temp_dir / "audio_storage"
        self.audio_storage_dir.mkdir(parents=True, exist_ok=True)

        # Rendered frequency charts.
        self.charts_dir = settings.temp_dir / "audio_charts"
        self.charts_dir.mkdir(parents=True, exist_ok=True)

    @staticmethod
    def _remove_if_exists(path) -> None:
        """Delete ``path`` when it is set and exists; log instead of raising."""
        if not path:
            return
        try:
            if os.path.exists(path):
                os.remove(path)
        except Exception as e:
            logger.error(f"Failed to delete physical files: {e}")

    def _load_audio_files(self) -> List["AudioFile"]:
        """Load the index file.

        Returns:
            The stored records, or an empty list when the index file is
            missing or cannot be read/parsed (the error is logged).
        """
        if not self.audio_files_file.exists():
            return []

        try:
            with open(self.audio_files_file, 'r', encoding='utf-8') as f:
                data = json.load(f)
            return [AudioFile(**item) for item in data]
        except Exception as e:
            logger.error(f"Failed to load audio files: {e}")
            return []

    def _save_audio_files(self, audio_files: Optional[List["AudioFile"]] = None):
        """Persist records to the index file.

        The data is written to a sibling ``.tmp`` file and then moved over the
        index with ``os.replace`` so an interrupted write cannot leave a
        truncated index behind.

        Args:
            audio_files: Records to write; defaults to ``self.audio_files``.

        Raises:
            Exception: Whatever serialization or file I/O raised, after it
                has been logged.
        """
        if audio_files is None:
            audio_files = self.audio_files

        target_path = str(self.audio_files_file)
        tmp_path = f"{target_path}.tmp"
        try:
            data = [asdict(audio_file) for audio_file in audio_files]
            with open(tmp_path, 'w', encoding='utf-8') as f:
                json.dump(data, f, ensure_ascii=False, indent=2)
            os.replace(tmp_path, target_path)
            logger.info(f"Audio files saved to {self.audio_files_file}")
        except Exception as e:
            logger.error(f"Failed to save audio files: {e}")
            self._remove_if_exists(tmp_path)
            raise

    def _calculate_md5(self, file_path: str) -> str:
        """Return the hexadecimal MD5 digest of the file's contents."""
        hash_md5 = hashlib.md5()
        with open(file_path, "rb") as f:
            # Read in chunks so large files are never held in memory at once.
            for chunk in iter(lambda: f.read(65536), b""):
                hash_md5.update(chunk)
        return hash_md5.hexdigest()

    def _get_audio_info(self, file_path: str) -> Dict:
        """Return duration, sample rate, channel count and size of a file.

        Returns:
            Dict with keys ``duration``, ``sample_rate``, ``channels`` and
            ``file_size``. The first three are zero when the audio libraries
            are unavailable or decoding fails.
        """
        file_size = os.path.getsize(file_path)
        fallback = {
            'duration': 0.0,
            'sample_rate': 0,
            'channels': 0,
            'file_size': file_size,
        }

        if not AUDIO_LIBS_AVAILABLE:
            return fallback

        try:
            # mono=False keeps the channel axis; with the default down-mix the
            # channel count below would always be 1.
            y, sr = librosa.load(file_path, sr=None, mono=False)
            duration = librosa.get_duration(y=y, sr=sr)
            return {
                'duration': float(duration),
                'sample_rate': int(sr),
                'channels': 1 if y.ndim == 1 else int(y.shape[0]),
                'file_size': file_size,
            }
        except Exception as e:
            logger.error(f"Failed to get audio info: {e}")
            return fallback

    def _extract_audio_features(self, file_path: str) -> Dict:
        """Extract tempo, beat times, spectral statistics and mean MFCCs.

        Returns:
            Dict with keys ``tempo``, ``beat_times``, ``spectral_centroid``,
            ``spectral_rolloff``, ``zero_crossing_rate`` and
            ``mfcc_features``; every value is ``None`` when the audio
            libraries are unavailable or extraction fails.
        """
        if not AUDIO_LIBS_AVAILABLE:
            return dict(self._EMPTY_FEATURES)

        try:
            y, sr = librosa.load(file_path, sr=None)

            # Tempo and beat positions.
            tempo, beat_frames = librosa.beat.beat_track(y=y, sr=sr)
            beat_times = librosa.frames_to_time(beat_frames, sr=sr).tolist()

            # Frame-wise spectral descriptors (first row of each result).
            spectral_centroids = librosa.feature.spectral_centroid(y=y, sr=sr)[0]
            spectral_rolloff = librosa.feature.spectral_rolloff(y=y, sr=sr)[0]
            zero_crossing_rate = librosa.feature.zero_crossing_rate(y)[0]

            # 13 MFCCs averaged over time.
            mfccs = librosa.feature.mfcc(y=y, sr=sr, n_mfcc=13)
            mfcc_mean = np.mean(mfccs, axis=1).tolist()

            return {
                # beat_track may return the tempo as a scalar or as a
                # one-element array depending on the librosa release.
                'tempo': float(np.atleast_1d(tempo)[0]),
                'beat_times': beat_times,
                'spectral_centroid': float(np.mean(spectral_centroids)),
                'spectral_rolloff': float(np.mean(spectral_rolloff)),
                'zero_crossing_rate': float(np.mean(zero_crossing_rate)),
                'mfcc_features': mfcc_mean,
            }
        except Exception as e:
            logger.error(f"Failed to extract audio features: {e}")
            return dict(self._EMPTY_FEATURES)

    def _generate_frequency_chart(self, file_path: str, audio_id: str) -> Optional[str]:
        """Render waveform, spectrogram and mel spectrogram into one PNG.

        Returns:
            Path of the written chart as a string, or ``None`` when the audio
            libraries are unavailable or rendering fails.
        """
        if not AUDIO_LIBS_AVAILABLE:
            return None

        figure = None
        try:
            # Import the submodule explicitly: a bare ``import librosa`` does
            # not load ``librosa.display`` on older librosa releases.
            from librosa import display as librosa_display

            y, sr = librosa.load(file_path, sr=None)

            figure = plt.figure(figsize=(12, 8))

            # Panel 1: waveform.
            plt.subplot(3, 1, 1)
            plt.plot(np.linspace(0, len(y) / sr, len(y)), y)
            plt.title('Waveform')
            plt.xlabel('Time (s)')
            plt.ylabel('Amplitude')

            # Panel 2: linear-frequency spectrogram in dB.
            plt.subplot(3, 1, 2)
            D = librosa.amplitude_to_db(np.abs(librosa.stft(y)), ref=np.max)
            librosa_display.specshow(D, sr=sr, x_axis='time', y_axis='hz')
            plt.colorbar(format='%+2.0f dB')
            plt.title('Spectrogram')

            # Panel 3: mel spectrogram in dB.
            plt.subplot(3, 1, 3)
            S = librosa.feature.melspectrogram(y=y, sr=sr)
            S_dB = librosa.power_to_db(S, ref=np.max)
            librosa_display.specshow(S_dB, sr=sr, x_axis='time', y_axis='mel')
            plt.colorbar(format='%+2.0f dB')
            plt.title('Mel-frequency spectrogram')

            plt.tight_layout()

            chart_filename = f"{audio_id}_frequency_chart.png"
            chart_path = self.charts_dir / chart_filename
            plt.savefig(chart_path, dpi=150, bbox_inches='tight')

            return str(chart_path)
        except Exception as e:
            logger.error(f"Failed to generate frequency chart: {e}")
            return None
        finally:
            # Always release the figure, including when plotting raised.
            if figure is not None:
                plt.close(figure)

    def get_audio_by_md5(self, md5_hash: str) -> Optional[Dict]:
        """Return the active record with this MD5 as a dict, or ``None``."""
        for audio_file in self.audio_files:
            if audio_file.md5_hash == md5_hash and audio_file.is_active:
                return asdict(audio_file)
        return None

    def get_all_audio_files(self) -> List[Dict]:
        """Return every active record as a dict."""
        return [asdict(audio_file) for audio_file in self.audio_files if audio_file.is_active]

    def get_audio_by_id(self, audio_id: str) -> Optional[Dict]:
        """Return the active record with this id as a dict, or ``None``."""
        for audio_file in self.audio_files:
            if audio_file.id == audio_id and audio_file.is_active:
                return asdict(audio_file)
        return None

    def upload_audio_file(self, source_path: str, filename: Optional[str] = None) -> Dict:
        """Copy one audio file into the library and index it.

        Args:
            source_path: Path of the file to import.
            filename: Display name to record; defaults to the base name of
                ``source_path``. Its extension determines the stored
                extension and the ``format`` field.

        Returns:
            The new record as a dict, or the existing record when an active
            entry with the same MD5 is already present.

        Raises:
            FileNotFoundError: If ``source_path`` does not exist.
            Exception: Any error from copying or saving; the copied file, the
                chart and the in-memory record are rolled back first.
        """
        if not os.path.exists(source_path):
            raise FileNotFoundError(f"Source file not found: {source_path}")

        md5_hash = self._calculate_md5(source_path)

        # Identical content is never stored twice.
        existing = self.get_audio_by_md5(md5_hash)
        if existing:
            logger.info(f"Audio file with MD5 {md5_hash} already exists")
            return existing

        audio_id = str(uuid.uuid4())
        if filename is None:
            filename = os.path.basename(source_path)

        file_ext = os.path.splitext(filename)[1].lower()
        stored_filename = f"{audio_id}{file_ext}"
        stored_path = self.audio_storage_dir / stored_filename

        chart_path = None
        audio_file = None
        try:
            shutil.copy2(source_path, stored_path)

            audio_info = self._get_audio_info(str(stored_path))
            features = self._extract_audio_features(str(stored_path))
            chart_path = self._generate_frequency_chart(str(stored_path), audio_id)

            now = datetime.now().isoformat()
            audio_file = AudioFile(
                id=audio_id,
                filename=filename,
                file_path=str(stored_path),
                md5_hash=md5_hash,
                file_size=audio_info['file_size'],
                duration=audio_info['duration'],
                sample_rate=audio_info['sample_rate'],
                channels=audio_info['channels'],
                format=file_ext[1:] if file_ext else 'unknown',
                tempo=features['tempo'],
                beat_times=features['beat_times'],
                spectral_centroid=features['spectral_centroid'],
                spectral_rolloff=features['spectral_rolloff'],
                zero_crossing_rate=features['zero_crossing_rate'],
                mfcc_features=features['mfcc_features'],
                frequency_chart_path=chart_path,
                created_at=now,
                updated_at=now,
            )

            self.audio_files.append(audio_file)
            self._save_audio_files()
        except Exception:
            # Roll back so a failed upload leaves neither an orphaned copy on
            # disk nor an unsaved record in memory.
            if audio_file is not None:
                self.audio_files = [
                    item for item in self.audio_files if item is not audio_file
                ]
            self._remove_if_exists(str(stored_path))
            self._remove_if_exists(chart_path)
            raise

        logger.info(f"Uploaded audio file: {filename} (MD5: {md5_hash})")
        return asdict(audio_file)

    def batch_upload_audio_files(self, source_directory: str) -> Dict:
        """Recursively upload every supported audio file in a directory.

        A file counts as *skipped* when its content already has a record,
        whether that record predates this call or was created earlier in the
        same batch.

        Args:
            source_directory: Directory to walk.

        Returns:
            Dict with counters ``total_files``, ``uploaded_files``,
            ``skipped_files``, ``failed_files`` and the matching
            ``uploaded_list``, ``skipped_list`` and ``failed_list``.

        Raises:
            FileNotFoundError: If ``source_directory`` does not exist.
        """
        if not os.path.exists(source_directory):
            raise FileNotFoundError(f"Source directory not found: {source_directory}")

        audio_extensions = {'.mp3', '.wav', '.flac', '.aac', '.ogg', '.m4a', '.wma'}

        results = {
            'total_files': 0,
            'uploaded_files': 0,
            'skipped_files': 0,
            'failed_files': 0,
            'uploaded_list': [],
            'skipped_list': [],
            'failed_list': [],
        }

        # Ids known so far. upload_audio_file returns the existing record for
        # duplicate content, so a returned id that is already in this set
        # means nothing new was stored.
        known_ids = {audio_file.id for audio_file in self.audio_files}

        for root, _dirs, files in os.walk(source_directory):
            for name in files:
                if os.path.splitext(name)[1].lower() not in audio_extensions:
                    continue

                results['total_files'] += 1
                file_path = os.path.join(root, name)

                try:
                    result = self.upload_audio_file(file_path, name)
                except Exception as e:
                    results['failed_files'] += 1
                    results['failed_list'].append({
                        'filename': name,
                        'error': str(e)
                    })
                    logger.error(f"Failed to upload {name}: {e}")
                    continue

                if result['id'] in known_ids:
                    results['skipped_files'] += 1
                    results['skipped_list'].append({
                        'filename': name,
                        'reason': 'Already exists (same MD5)'
                    })
                else:
                    known_ids.add(result['id'])
                    results['uploaded_files'] += 1
                    results['uploaded_list'].append(result)

        logger.info(f"Batch upload completed: {results['uploaded_files']} uploaded, "
                    f"{results['skipped_files']} skipped, {results['failed_files']} failed")

        return results

    def delete_audio_file(self, audio_id: str) -> bool:
        """Remove a record together with its stored audio file and chart.

        Physical deletion is best effort: failures are logged and the record
        is still removed from the index.

        Returns:
            ``True`` when a record with ``audio_id`` was found and removed,
            ``False`` otherwise.
        """
        for index, audio_file in enumerate(self.audio_files):
            if audio_file.id != audio_id:
                continue

            # Each file is removed independently so a failure on the audio
            # file does not prevent the chart from being cleaned up.
            self._remove_if_exists(audio_file.file_path)
            self._remove_if_exists(audio_file.frequency_chart_path)

            deleted_audio = self.audio_files.pop(index)
            self._save_audio_files()

            logger.info(f"Deleted audio file: {audio_id} - {deleted_audio.filename}")
            return True
        return False

    def search_audio_files(self, keyword: str) -> List[Dict]:
        """Return active records whose filename contains ``keyword`` (case-insensitive)."""
        keyword = keyword.lower()
        return [
            asdict(audio_file)
            for audio_file in self.audio_files
            if audio_file.is_active and keyword in audio_file.filename.lower()
        ]
|
||
|
||
|
||
# Module-level shared instance. Constructing it runs AudioManager.__init__,
# which creates the cache/storage/chart directories under settings.temp_dir
# and loads the persisted index, so importing this module has those effects.
audio_manager = AudioManager()
|
||
|
||
|
||
def main():
    """Command-line interface that reports results through the JSON-RPC handler.

    Usage: ``<script> <command> [args...]``. The command is ``sys.argv[1]``;
    its arguments follow. Results are emitted with ``rpc.success`` and
    problems with ``rpc.error`` using the codes ``INVALID_REQUEST``,
    ``NOT_FOUND`` and ``INTERNAL_ERROR``.
    """
    import sys

    rpc = create_response_handler()

    if len(sys.argv) < 2:
        rpc.error("INVALID_REQUEST", "No command specified")
        return

    command = sys.argv[1]

    # command -> (handler over the arguments after the command,
    #             message when the first argument is missing, or None when
    #             the command takes no argument,
    #             True when a falsy result must be reported as NOT_FOUND)
    commands = {
        "get_all_audio_files": (
            lambda args: audio_manager.get_all_audio_files(),
            None,
            False,
        ),
        "get_audio_by_id": (
            lambda args: audio_manager.get_audio_by_id(args[0]),
            "Audio ID required",
            True,
        ),
        "get_audio_by_md5": (
            lambda args: audio_manager.get_audio_by_md5(args[0]),
            "MD5 hash required",
            True,
        ),
        "upload_audio_file": (
            # The optional second argument is the display filename.
            lambda args: audio_manager.upload_audio_file(
                args[0], args[1] if len(args) > 1 else None
            ),
            "Source path required",
            False,
        ),
        "batch_upload_audio_files": (
            lambda args: audio_manager.batch_upload_audio_files(args[0]),
            "Source directory required",
            False,
        ),
        "delete_audio_file": (
            lambda args: audio_manager.delete_audio_file(args[0]),
            "Audio ID required",
            False,
        ),
        "search_audio_files": (
            lambda args: audio_manager.search_audio_files(args[0]),
            "Search keyword required",
            False,
        ),
    }

    try:
        entry = commands.get(command)
        if entry is None:
            rpc.error("INVALID_REQUEST", f"Unknown command: {command}")
            return

        handler, missing_argument_message, falsy_is_not_found = entry
        arguments = sys.argv[2:]

        if missing_argument_message is not None and not arguments:
            rpc.error("INVALID_REQUEST", missing_argument_message)
            return

        result = handler(arguments)

        if falsy_is_not_found and not result:
            rpc.error("NOT_FOUND", "Audio file not found")
        else:
            rpc.success(result)

    except Exception as e:
        # Top-level boundary: report any failure to the caller via the handler.
        logger.error(f"Command execution failed: {e}")
        rpc.error("INTERNAL_ERROR", str(e))
|
||
|
||
|
||
# Run the command-line interface only when this file is executed as a script.
if __name__ == "__main__":
    main()
|