#!/usr/bin/env python3
"""
Audio Processing Core Module

This module provides audio processing functionality using Librosa, Pydub,
and other audio libraries.
"""

import argparse
import json
import sys
from pathlib import Path
from typing import Dict, Any, List, Tuple

import numpy as np
from pydub import AudioSegment
import librosa
import librosa.display
import matplotlib.pyplot as plt

from ..config import settings
from ..utils import setup_logger, validate_audio_file

logger = setup_logger(__name__)


class AudioProcessor:
    """Main audio processing class."""

    def __init__(self):
        self.temp_dir = settings.temp_dir
        self.cache_dir = settings.cache_dir
        self.sample_rate = settings.default_sample_rate

    def analyze_audio(self, file_path: str, analysis_type: str) -> Dict[str, Any]:
        """
        Analyze audio file with specified analysis type.

        Args:
            file_path: Path to audio file
            analysis_type: Type of analysis to perform

        Returns:
            Dictionary with analysis results
        """
        try:
            if not validate_audio_file(file_path):
                raise ValueError(f"Invalid audio file: {file_path}")

            logger.info(f"Analyzing audio: {analysis_type} on {file_path}")

            # Load audio
            y, sr = librosa.load(file_path, sr=self.sample_rate)

            if analysis_type == "rhythm":
                result = self._analyze_rhythm(y, sr)
            elif analysis_type == "spectral":
                result = self._analyze_spectral(y, sr)
            elif analysis_type == "tempo":
                result = self._analyze_tempo(y, sr)
            elif analysis_type == "pitch":
                result = self._analyze_pitch(y, sr)
            elif analysis_type == "energy":
                result = self._analyze_energy(y, sr)
            elif analysis_type == "mfcc":
                result = self._analyze_mfcc(y, sr)
            else:
                raise ValueError(f"Unknown analysis type: {analysis_type}")

            return {
                "status": "success",
                "file_path": file_path,
                "analysis_type": analysis_type,
                "sample_rate": sr,
                "duration": len(y) / sr,
                "result": result
            }

        except Exception as e:
            logger.error(f"Audio analysis failed: {str(e)}")
            return {
                "status": "error",
                "error": str(e)
            }

    def _analyze_rhythm(self, y: np.ndarray, sr: int) -> Dict[str, Any]:
        """Analyze rhythm and beat tracking."""
        # Beat tracking
        tempo, beats = librosa.beat.beat_track(y=y, sr=sr)
        beat_times = librosa.frames_to_time(beats, sr=sr)

        # Onset detection
        onset_frames = librosa.onset.onset_detect(y=y, sr=sr)
        onset_times = librosa.frames_to_time(onset_frames, sr=sr)

        # Rhythm patterns; guard against fewer than two detected beats
        beat_intervals = np.diff(beat_times)
        if len(beat_intervals) > 0 and np.mean(beat_intervals) > 0:
            rhythm_stability = 1.0 - np.std(beat_intervals) / np.mean(beat_intervals)
            average_beat_interval = float(np.mean(beat_intervals))
        else:
            rhythm_stability = 0.0
            average_beat_interval = 0.0

        return {
            "tempo": float(tempo),
            "beat_count": len(beats),
            "beat_times": beat_times.tolist(),
            "onset_count": len(onset_frames),
            "onset_times": onset_times.tolist(),
            "rhythm_stability": float(rhythm_stability),
            "average_beat_interval": average_beat_interval
        }

    def _analyze_spectral(self, y: np.ndarray, sr: int) -> Dict[str, Any]:
        """Analyze spectral features."""
        # Spectral centroid
        spectral_centroids = librosa.feature.spectral_centroid(y=y, sr=sr)[0]

        # Spectral rolloff
        spectral_rolloff = librosa.feature.spectral_rolloff(y=y, sr=sr)[0]

        # Spectral bandwidth
        spectral_bandwidth = librosa.feature.spectral_bandwidth(y=y, sr=sr)[0]

        # Zero crossing rate
        zcr = librosa.feature.zero_crossing_rate(y)[0]

        return {
            "spectral_centroid_mean": float(np.mean(spectral_centroids)),
            "spectral_centroid_std": float(np.std(spectral_centroids)),
            "spectral_rolloff_mean": float(np.mean(spectral_rolloff)),
            "spectral_rolloff_std": float(np.std(spectral_rolloff)),
            "spectral_bandwidth_mean": float(np.mean(spectral_bandwidth)),
            "spectral_bandwidth_std": float(np.std(spectral_bandwidth)),
            "zero_crossing_rate_mean": float(np.mean(zcr)),
            "zero_crossing_rate_std": float(np.std(zcr))
        }
    def _analyze_tempo(self, y: np.ndarray, sr: int) -> Dict[str, Any]:
        """Analyze tempo and timing."""
        # Tempo estimation
        tempo, beats = librosa.beat.beat_track(y=y, sr=sr)

        # Dynamic tempo analysis
        hop_length = 512
        tempo_dynamic = librosa.beat.tempo(
            onset_envelope=librosa.onset.onset_strength(y=y, sr=sr),
            sr=sr,
            hop_length=hop_length
        )
        mean_dynamic_tempo = np.mean(tempo_dynamic)

        return {
            "tempo": float(tempo),
            "tempo_confidence": float(np.std(tempo_dynamic)),
            "tempo_stability": float(1.0 - np.std(tempo_dynamic) / mean_dynamic_tempo) if mean_dynamic_tempo > 0 else 0.0,
            "beat_count": len(beats)
        }

    def _analyze_pitch(self, y: np.ndarray, sr: int) -> Dict[str, Any]:
        """Analyze pitch and harmonic content."""
        # Pitch tracking using piptrack
        pitches, magnitudes = librosa.piptrack(y=y, sr=sr)

        # Extract fundamental frequency
        pitch_values = []
        for t in range(pitches.shape[1]):
            index = magnitudes[:, t].argmax()
            pitch = pitches[index, t]
            if pitch > 0:
                pitch_values.append(pitch)

        if pitch_values:
            pitch_mean = np.mean(pitch_values)
            pitch_std = np.std(pitch_values)
            pitch_range = np.max(pitch_values) - np.min(pitch_values)
        else:
            pitch_mean = pitch_std = pitch_range = 0.0

        # Harmonic-percussive separation
        y_harmonic, y_percussive = librosa.effects.hpss(y)
        harmonic_ratio = np.mean(np.abs(y_harmonic)) / (np.mean(np.abs(y)) + 1e-8)

        return {
            "pitch_mean": float(pitch_mean),
            "pitch_std": float(pitch_std),
            "pitch_range": float(pitch_range),
            "pitch_count": len(pitch_values),
            "harmonic_ratio": float(harmonic_ratio)
        }

    def _analyze_energy(self, y: np.ndarray, sr: int) -> Dict[str, Any]:
        """Analyze energy and dynamics."""
        # RMS energy
        rms = librosa.feature.rms(y=y)[0]

        # Short-time energy
        frame_length = 2048
        hop_length = 512
        energy = []
        for i in range(0, len(y) - frame_length, hop_length):
            frame = y[i:i + frame_length]
            energy.append(np.sum(frame ** 2))
        energy = np.array(energy)

        # Dynamic range
        dynamic_range = np.max(rms) - np.min(rms)

        return {
            "rms_mean": float(np.mean(rms)),
            "rms_std": float(np.std(rms)),
            "rms_max": float(np.max(rms)),
            "rms_min": float(np.min(rms)),
            "energy_mean": float(np.mean(energy)),
            "energy_std": float(np.std(energy)),
            "dynamic_range": float(dynamic_range)
        }

    def _analyze_mfcc(self, y: np.ndarray, sr: int) -> Dict[str, Any]:
        """Analyze MFCC (Mel-frequency cepstral coefficients)."""
        # Extract MFCC features
        mfccs = librosa.feature.mfcc(y=y, sr=sr, n_mfcc=13)

        # Statistical features for each MFCC coefficient
        mfcc_features = {}
        for i in range(mfccs.shape[0]):
            mfcc_features[f"mfcc_{i+1}_mean"] = float(np.mean(mfccs[i]))
            mfcc_features[f"mfcc_{i+1}_std"] = float(np.std(mfccs[i]))

        return mfcc_features

    def process_audio(self, input_path: str, output_path: str,
                      operation: str, parameters: Dict[str, Any]) -> Dict[str, Any]:
        """
        Process audio with specified operation and parameters.

        Args:
            input_path: Path to input audio file
            output_path: Path to output audio file
            operation: Type of operation to perform
            parameters: Operation-specific parameters

        Returns:
            Dictionary with processing results
        """
        try:
            if not validate_audio_file(input_path):
                raise ValueError(f"Invalid audio file: {input_path}")

            logger.info(f"Processing audio: {operation} on {input_path}")

            # Load audio with pydub for processing
            audio = AudioSegment.from_file(input_path)

            if operation == "trim":
                result_audio = self._trim_audio(audio, parameters)
            elif operation == "volume":
                result_audio = self._adjust_volume(audio, parameters)
            elif operation == "fade":
                result_audio = self._apply_fade(audio, parameters)
            elif operation == "normalize":
                result_audio = self._normalize_audio(audio, parameters)
            elif operation == "merge":
                result_audio = self._merge_audio(parameters)
            else:
                raise ValueError(f"Unknown operation: {operation}")

            # Export result
            result_audio.export(output_path, format="wav")

            return {
                "status": "success",
                "output_path": output_path,
                "duration": len(result_audio) / 1000.0,
                "channels": result_audio.channels,
                "sample_rate": result_audio.frame_rate
            }

        except Exception as e:
            logger.error(f"Audio processing failed: {str(e)}")
            return {
                "status": "error",
                "error": str(e)
            }

    def _trim_audio(self, audio: AudioSegment, params: Dict[str, Any]) -> AudioSegment:
        """Trim audio to specified start and end times."""
        start_ms = int(params.get("start_time", 0) * 1000)
        end_ms = int(params.get("end_time", len(audio) / 1000) * 1000)
        return audio[start_ms:end_ms]

    def _adjust_volume(self, audio: AudioSegment, params: Dict[str, Any]) -> AudioSegment:
        """Adjust audio volume."""
        volume_change = params.get("volume_db", 0)
        return audio + volume_change

    def _apply_fade(self, audio: AudioSegment, params: Dict[str, Any]) -> AudioSegment:
        """Apply fade in/out effects."""
        fade_in_ms = int(params.get("fade_in", 0) * 1000)
        fade_out_ms = int(params.get("fade_out", 0) * 1000)

        result = audio
        if fade_in_ms > 0:
            result = result.fade_in(fade_in_ms)
        if fade_out_ms > 0:
            result = result.fade_out(fade_out_ms)
        return result

    def _normalize_audio(self, audio: AudioSegment, params: Dict[str, Any]) -> AudioSegment:
        """Normalize audio to the specified target level (dBFS)."""
        target_dBFS = params.get("target_db", -20.0)
        # Apply the gain needed to move the clip's average level to the target
        return audio.apply_gain(target_dBFS - audio.dBFS)

    def _merge_audio(self, params: Dict[str, Any]) -> AudioSegment:
        """Merge multiple audio files."""
        audio_paths = params.get("audio_paths", [])
        if len(audio_paths) < 2:
            raise ValueError("At least 2 audio files required for merge operation")

        result = AudioSegment.from_file(audio_paths[0])
        for path in audio_paths[1:]:
            audio = AudioSegment.from_file(path)
            result = result + audio

        return result


def main():
    """Command line interface for audio processing."""
    parser = argparse.ArgumentParser(description="Audio Processing Core")
    parser.add_argument("--file", required=True, help="Audio file path")
    parser.add_argument("--analysis", required=True, help="Analysis type to perform")

    args = parser.parse_args()

    try:
        processor = AudioProcessor()
        result = processor.analyze_audio(args.file, args.analysis)
        print(json.dumps(result))
    except Exception as e:
        error_result = {
            "status": "error",
            "error": str(e)
        }
        print(json.dumps(error_result))
        sys.exit(1)


if __name__ == "__main__":
    main()
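
# Example usage (a minimal sketch): "input.wav" and the module path in the CLI
# line are placeholders, and the enclosing package must provide the `settings`,
# logging, and validation helpers imported at the top of this file.
#
#     processor = AudioProcessor()
#     report = processor.analyze_audio("input.wav", "tempo")
#     if report["status"] == "success":
#         print(report["result"]["tempo"])
#
# The same analysis is available from the command line through main():
#
#     python -m your_package.audio_core --file input.wav --analysis spectral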