Source code for fmus_vox.core.audio

"""
Core audio processing functionality.

This module provides the Audio class which is the main entry point for all audio operations
in the fmus-vox library.
"""

from typing import Optional, Union, List, Tuple, BinaryIO, Any
import numpy as np
import os
import tempfile
from pathlib import Path
import soundfile as sf
import librosa
import pyrubberband as pyrb

from fmus_vox.core.errors import AudioError

[docs] class Audio: """ Main class for audio operations in fmus-vox. The Audio class provides an intuitive interface for loading, processing, and manipulating audio data. It supports method chaining for clean, readable code. Examples: >>> # Load and process audio >>> audio = Audio.load("recording.wav") >>> processed = audio.normalize().denoise().resample(target_sr=16000) >>> processed.save("processed.wav") >>> >>> # Record and save audio >>> audio = Audio.record(seconds=5) >>> audio.save("recording.wav") """
[docs] def __init__(self, data: np.ndarray, sample_rate: int): """ Initialize an Audio object. Args: data: Audio data as a numpy array sample_rate: Sample rate of the audio in Hz """ self._data = data self._sample_rate = sample_rate
[docs] @classmethod def load(cls, source: Union[str, Path, BinaryIO, np.ndarray], sample_rate: Optional[int] = None) -> "Audio": """ Load audio from file, bytes, or numpy array. Args: source: Audio source (file path, file-like object, or numpy array) sample_rate: Target sample rate for loading. If None, use the source's rate. If source is a numpy array, this must be provided. Returns: Audio object Raises: AudioError: If the audio cannot be loaded """ try: if isinstance(source, np.ndarray): if sample_rate is None: raise AudioError("Sample rate must be provided when loading from numpy array") return cls(source, sample_rate) if isinstance(source, (str, Path)): data, sr = librosa.load(source, sr=sample_rate, mono=True) return cls(data, sr) # Handle file-like objects with tempfile.NamedTemporaryFile(suffix=".wav", delete=False) as tmp: tmp.write(source.read()) tmp_name = tmp.name try: data, sr = librosa.load(tmp_name, sr=sample_rate, mono=True) return cls(data, sr) finally: os.unlink(tmp_name) except Exception as e: raise AudioError(f"Failed to load audio: {str(e)}")
[docs] @classmethod def record(cls, seconds: Optional[float] = None, sample_rate: int = 44100, **kwargs) -> "Audio": """ Record audio from microphone. Args: seconds: Duration in seconds to record. If None, records until stopped. sample_rate: Sample rate to record at **kwargs: Additional arguments for recording Returns: Audio object containing the recorded audio Raises: AudioError: If recording fails """ try: import sounddevice as sd if seconds is None: # Interactive recording print("Recording... Press Ctrl+C to stop.") try: data = sd.rec(int(sample_rate * 3600), samplerate=sample_rate, channels=1, blocking=True) except KeyboardInterrupt: sd.stop() else: # Fixed duration recording data = sd.rec(int(sample_rate * seconds), samplerate=sample_rate, channels=1, blocking=True) return cls(data.flatten(), sample_rate) except Exception as e: raise AudioError(f"Failed to record audio: {str(e)}")
[docs] def save(self, path: Union[str, Path], format: Optional[str] = None, **kwargs) -> str: """ Save audio to file. Args: path: Path to save the audio file format: Audio format (inferred from path if None) **kwargs: Additional arguments for saving Returns: Path to the saved file Raises: AudioError: If saving fails """ try: sf.write(path, self._data, self._sample_rate, format=format, **kwargs) return str(path) except Exception as e: raise AudioError(f"Failed to save audio: {str(e)}")
[docs] def play(self) -> None: """ Play audio through speakers. Raises: AudioError: If playback fails """ try: import sounddevice as sd sd.play(self._data, self._sample_rate) sd.wait() except Exception as e: raise AudioError(f"Failed to play audio: {str(e)}")
[docs] def trim(self, start: float = 0, end: Optional[float] = None) -> "Audio": """ Trim audio to specified time range. Args: start: Start time in seconds end: End time in seconds. If None, trim to the end of the audio. Returns: New Audio object with trimmed audio """ start_sample = int(start * self._sample_rate) if end is None: end_sample = len(self._data) else: end_sample = min(int(end * self._sample_rate), len(self._data)) return Audio(self._data[start_sample:end_sample], self._sample_rate)
[docs] def denoise(self, strength: float = 0.5) -> "Audio": """ Remove noise from audio. Args: strength: Denoising strength (0.0 to 1.0) Returns: New Audio object with denoised audio """ try: import noisereduce as nr denoised = nr.reduce_noise( y=self._data, sr=self._sample_rate, prop_decrease=strength ) return Audio(denoised, self._sample_rate) except ImportError: # Fall back to simple high-pass filter if noisereduce not available from scipy import signal b, a = signal.butter(5, 100 / (self._sample_rate / 2), 'highpass') denoised = signal.filtfilt(b, a, self._data) return Audio(denoised, self._sample_rate)
[docs] def normalize(self, target_db: float = -3) -> "Audio": """ Normalize audio volume. Args: target_db: Target peak dB level Returns: New Audio object with normalized audio """ peak = np.max(np.abs(self._data)) if peak > 0: target_peak = 10 ** (target_db / 20) normalized = self._data * (target_peak / peak) return Audio(normalized, self._sample_rate) return self
[docs] def resample(self, target_sr: int = 16000) -> "Audio": """ Resample audio to target sample rate. Args: target_sr: Target sample rate in Hz Returns: New Audio object with resampled audio """ if self._sample_rate == target_sr: return self resampled = librosa.resample( y=self._data, orig_sr=self._sample_rate, target_sr=target_sr ) return Audio(resampled, target_sr)
[docs] def detect_vad(self, threshold: float = 0.5) -> List[Tuple[float, float]]: """ Detect voice activity segments. Args: threshold: Energy threshold for voice detection (0.0 to 1.0) Returns: List of (start_time, end_time) tuples in seconds """ # Simple energy-based VAD energy = librosa.feature.rms(y=self._data)[0] energy_norm = energy / np.max(energy) if np.max(energy) > 0 else energy # Find segments above threshold is_speech = energy_norm > threshold # Convert to time ranges frame_length = 2048 hop_length = 512 segments = [] in_segment = False start_frame = 0 for i, speech in enumerate(is_speech): if speech and not in_segment: in_segment = True start_frame = i elif not speech and in_segment: in_segment = False # Convert frames to time start_time = librosa.frames_to_time(start_frame, sr=self._sample_rate, hop_length=hop_length) end_time = librosa.frames_to_time(i, sr=self._sample_rate, hop_length=hop_length) segments.append((start_time, end_time)) # Handle case where audio ends during speech if in_segment: start_time = librosa.frames_to_time(start_frame, sr=self._sample_rate, hop_length=hop_length) end_time = librosa.frames_to_time(len(is_speech), sr=self._sample_rate, hop_length=hop_length) segments.append((start_time, end_time)) return segments
[docs] def split_on_silence(self, min_silence_len: int = 500, silence_thresh: float = -40) -> List["Audio"]: """ Split audio on silence into segments. Args: min_silence_len: Minimum silence length in milliseconds silence_thresh: Silence threshold in dB Returns: List of Audio objects, one for each non-silent segment """ # Convert min_silence_len from ms to samples min_silence_samples = int(min_silence_len * self._sample_rate / 1000) # Convert silence_thresh from dB to amplitude ratio silence_thresh_amp = 10 ** (silence_thresh / 20) # Find silent points is_silent = np.abs(self._data) < silence_thresh_amp # Group silent points into ranges silent_ranges = [] start = None for i, silent in enumerate(is_silent): if silent and start is None: start = i elif not silent and start is not None: if i - start >= min_silence_samples: silent_ranges.append((start, i)) start = None # Handle case where audio ends in silence if start is not None and len(is_silent) - start >= min_silence_samples: silent_ranges.append((start, len(is_silent))) # Create segments based on silent ranges segments = [] if not silent_ranges: segments.append(self) return segments # Add segment from start to first silence if silent_ranges[0][0] > 0: segments.append( Audio(self._data[:silent_ranges[0][0]], self._sample_rate) ) # Add segments between silences for i in range(len(silent_ranges) - 1): start = silent_ranges[i][1] end = silent_ranges[i + 1][0] if end > start: segments.append( Audio(self._data[start:end], self._sample_rate) ) # Add segment from last silence to end if silent_ranges[-1][1] < len(self._data): segments.append( Audio(self._data[silent_ranges[-1][1]:], self._sample_rate) ) return segments
[docs] def change_speed(self, speed_factor: float = 1.0) -> "Audio": """ Change the playback speed of the audio. Args: speed_factor: Speed factor (1.0 = original speed) Returns: New Audio object with changed speed """ if speed_factor == 1.0: return self y_stretched = pyrb.time_stretch(self._data, self._sample_rate, speed_factor) return Audio(y_stretched, self._sample_rate)
[docs] def change_pitch(self, semitones: float = 0.0) -> "Audio": """ Change the pitch of the audio. Args: semitones: Number of semitones to shift (-12 to +12) Returns: New Audio object with changed pitch """ if semitones == 0: return self y_shifted = pyrb.pitch_shift(self._data, self._sample_rate, semitones) return Audio(y_shifted, self._sample_rate)
@property def duration(self) -> float: """Get audio duration in seconds.""" return len(self._data) / self._sample_rate @property def sample_rate(self) -> int: """Get audio sample rate.""" return self._sample_rate @property def data(self) -> np.ndarray: """Get audio data as numpy array.""" return self._data
[docs] def __len__(self) -> int: """Get length of audio in samples.""" return len(self._data)