"""
fmus_vox.stream.audioplayer - Audio playback functionality.
This module provides classes for audio playback with support for file playback,
streaming playback, and real-time audio output processing.
"""
import threading
import time
import queue
from typing import Optional, Union, Callable, Dict, Any, List, Tuple
import numpy as np
from fmus_vox.core.audio import Audio
from fmus_vox.core.errors import DeviceError, DependencyError
# PyAudio is an optional dependency: keep the module importable without it and
# leave ``pyaudio`` as None so AudioPlayer can raise DependencyError on use.
try:
    import pyaudio
except ImportError:
    pyaudio = None
class AudioEffect:
    """
    Base class for real-time audio output effects.

    Subclasses should override :meth:`process` to transform outgoing audio
    data. An effect carries a ``name`` (used to look it up for removal) and an
    ``enabled`` flag that callers check before invoking :meth:`process`.
    """

    def __init__(self, name: str = "AudioEffect"):
        """
        Initialize an audio effect.

        Args:
            name: Name of the effect for identification
        """
        self.name = name
        # Effects start enabled; toggle with enable()/disable().
        self.enabled = True

    def process(self, data: np.ndarray, sample_rate: int) -> np.ndarray:
        """
        Process audio data.

        Args:
            data: Audio data as numpy array
            sample_rate: Sample rate of the audio

        Returns:
            Processed audio data. The base implementation returns ``data``
            itself, unchanged.
        """
        return data

    def enable(self) -> None:
        """Enable the effect."""
        self.enabled = True

    def disable(self) -> None:
        """Disable the effect."""
        self.enabled = False
class Equalizer(AudioEffect):
    """
    Simple equalizer effect for audio playback.

    The signal is split into frequency bands with 2nd-order Butterworth
    filters, each band is scaled by its gain, and the bands are summed.

    Band names "low", "mid" and "high" have built-in frequency ranges. Any
    other band must be named "<low_hz>-<high_hz>" (for example "300-3000") to
    get a filter; bands whose name cannot be parsed that way are ignored.

    NOTE(review): filtering is stateless per call, so chunk boundaries are not
    continuous, and the summed bands are not an exact reconstruction of the
    input even at 0 dB.
    """

    # Frequency ranges (Hz) of the built-in bands.
    _DEFAULT_RANGES = {
        "low": (20, 250),
        "mid": (250, 4000),
        "high": (4000, 20000),
    }

    def __init__(self, bands: Optional[Dict[str, float]] = None):
        """
        Initialize equalizer with frequency band gains.

        Args:
            bands: Dictionary of frequency bands and their gains (in dB).
                Default bands: "low", "mid", "high", all at 0 dB.
        """
        super().__init__("Equalizer")
        if bands is None:
            self.bands = {
                "low": 0.0,   # 20-250 Hz
                "mid": 0.0,   # 250-4000 Hz
                "high": 0.0,  # 4000-20000 Hz
            }
        else:
            # Copy so set_gain() never mutates the caller's dictionary.
            self.bands = dict(bands)

        # Filter coefficients are built lazily on the first process() call and
        # rebuilt whenever the sample rate changes.
        self._initialized = False
        self._coeffs = {}
        self._filter_rate = None

    def set_gain(self, band: str, gain_db: float) -> None:
        """
        Set gain for a specific frequency band.

        Args:
            band: Band name ("low", "mid", "high", or custom band)
            gain_db: Gain in decibels (-12 to +12 recommended)

        Raises:
            ValueError: If ``band`` is not one of the configured bands.
        """
        if band not in self.bands:
            raise ValueError(f"Unknown band: {band}. Available bands: {list(self.bands.keys())}")
        # Gains are read at process() time; the filter coefficients depend only
        # on band edges and sample rate, so nothing needs to be rebuilt here.
        self.bands[band] = gain_db

    def _initialize_filters(self, sample_rate: int) -> None:
        """
        Build Butterworth filter coefficients for every configured band.

        Leaves ``_initialized`` False when scipy is unavailable or the sample
        rate is not positive, so that process() passes audio through untouched.

        Args:
            sample_rate: Audio sample rate
        """
        self._initialized = False
        try:
            from scipy import signal
        except ImportError:
            return
        if sample_rate <= 0:
            return

        # Start from the built-in ranges and add "<low>-<high>" custom bands.
        freq_ranges = dict(self._DEFAULT_RANGES)
        for band in self.bands:
            if band in freq_ranges or "-" not in band:
                continue
            try:
                low, high = band.split("-")
                freq_ranges[band] = (float(low), float(high))
            except (ValueError, TypeError):
                # Not a parsable range: this band gets no filter.
                continue

        nyquist = sample_rate / 2
        coeffs = {}
        for band, (low_freq, high_freq) in freq_ranges.items():
            if band not in self.bands:
                continue

            # Normalize to Nyquist; butter() requires 0 < Wn < 1.
            low_norm = low_freq / nyquist
            high_norm = min(high_freq / nyquist, 0.99)

            # Skip bands that lie entirely outside the representable range.
            if low_norm >= 1.0 or high_norm <= 0:
                continue

            if band == "low" or low_norm <= 0:
                # Lowpass for the low band (and for ranges starting at 0 Hz).
                b, a = signal.butter(2, high_norm, btype='lowpass')
            elif band == "high":
                b, a = signal.butter(2, low_norm, btype='highpass')
            elif low_norm < high_norm:
                b, a = signal.butter(2, [low_norm, high_norm], btype='bandpass')
            else:
                # Empty or inverted range after clamping; butter() would raise.
                continue
            coeffs[band] = (b, a)

        self._coeffs = coeffs
        self._filter_rate = sample_rate
        self._initialized = True

    def process(self, data: np.ndarray, sample_rate: int) -> np.ndarray:
        """
        Apply equalization to the audio data.

        Args:
            data: Audio samples. Filtering runs along axis 0, so 2-D input is
                treated as (frames, channels).
            sample_rate: Sample rate of the audio

        Returns:
            The equalized samples. ``data`` is returned unchanged when the
            effect is disabled, the input is empty, scipy is missing, or no
            band produced a filter. Floating-point input keeps its dtype;
            other dtypes are returned as float64.
        """
        if not self.enabled or len(data) == 0:
            return data
        try:
            from scipy import signal
        except ImportError:
            return data

        # (Re)build the filters on first use or when the sample rate changed.
        if not self._initialized or self._filter_rate != sample_rate:
            self._initialize_filters(sample_rate)
        # With no usable filter the band sum would be all zeros, i.e. silence,
        # so pass the audio through instead.
        if not self._initialized or not self._coeffs:
            return data

        samples = np.asarray(data)
        # Accumulate in float64 so integer input does not break the in-place add.
        mixed = np.zeros(samples.shape, dtype=np.float64)
        for band, (b, a) in self._coeffs.items():
            gain_linear = 10 ** (self.bands.get(band, 0.0) / 20.0)
            mixed += signal.lfilter(b, a, samples, axis=0) * gain_linear

        if np.issubdtype(samples.dtype, np.floating):
            return mixed.astype(samples.dtype, copy=False)
        return mixed
class AudioPlayer:
    """
    Class for playing audio from files or streams.

    This class provides functionality for audio playback with support for
    real-time effects processing and audio format conversion.

    Loaded audio is kept in memory as a flat, interleaved sample array
    (frame 0 channel 0, frame 0 channel 1, ...) and handed to PyAudio from a
    stream callback, one buffer at a time.
    """

    # Map of common formats to PyAudio format constants. The values are filled
    # in by __init__, because pyaudio is an optional import.
    FORMAT_MAP = {
        "float32": None,
        "int32": None,
        "int24": None,
        "int16": None,
        "int8": None,
        "uint8": None,
    }

    # Bytes per sample for each supported output format.
    _SAMPLE_WIDTH = {
        "float32": 4,
        "int32": 4,
        "int24": 3,
        "int16": 2,
        "int8": 1,
        "uint8": 1,
    }

    def __init__(
        self,
        device_index: Optional[int] = None,
        sample_rate: int = 44100,
        channels: int = 2,
        format: str = "float32",
        buffer_size: int = 1024,
        **kwargs
    ):
        """
        Initialize an audio player.

        Args:
            device_index: Index of the output device to use. None for default.
            sample_rate: Sample rate for playback.
            channels: Number of audio channels for playback.
            format: Audio format ('float32', 'int16', etc.)
            buffer_size: Size of audio buffer chunks for playback.
            **kwargs: Additional parameters for PyAudio.

        Raises:
            DependencyError: If PyAudio is not installed.
        """
        if pyaudio is None:
            raise DependencyError(
                "PyAudio is not installed. Install with: pip install pyaudio"
            )

        # Fill the class-level format map once, now that pyaudio is known to exist.
        if AudioPlayer.FORMAT_MAP["float32"] is None:
            AudioPlayer.FORMAT_MAP = {
                "float32": pyaudio.paFloat32,
                "int32": pyaudio.paInt32,
                "int24": pyaudio.paInt24,
                "int16": pyaudio.paInt16,
                "int8": pyaudio.paInt8,
                "uint8": pyaudio.paUInt8,
            }

        self.device_index = device_index
        self.sample_rate = sample_rate
        self.channels = channels
        self.format = format
        self.buffer_size = buffer_size
        self.kwargs = kwargs

        self._pyaudio_instance = None
        self._stream = None
        self._audio_buffer = queue.Queue()
        self._is_playing = False
        self._stop_event = threading.Event()
        self._play_thread = None
        self._audio_position = 0   # playback position, in frames
        self._audio_data = None    # flat interleaved samples
        self._total_frames = 0

        # Audio effects, applied in insertion order
        self._effects: List["AudioEffect"] = []

        # Callbacks
        self._on_complete = None
        self._on_position_change = None

    def __enter__(self):
        """Open the audio stream when used as a context manager."""
        self.open()
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        """Close the audio stream when exiting context manager."""
        self.close()

    def open(self):
        """
        Open the audio playback stream.

        Calling this while a stream is already open is a no-op, so a second
        call cannot leak the first stream.

        Returns:
            Self for method chaining

        Raises:
            DeviceError: If the specified device cannot be opened or the
                configured format is not supported.
        """
        if self._stream is not None:
            return self

        try:
            self._pyaudio_instance = pyaudio.PyAudio()

            pa_format = AudioPlayer.FORMAT_MAP.get(self.format.lower())
            if pa_format is None:
                raise ValueError(f"Unsupported audio format: {self.format}")

            # start=False: the stream only runs once _start_playback() starts it.
            self._stream = self._pyaudio_instance.open(
                output=True,
                input=False,
                start=False,
                format=pa_format,
                channels=self.channels,
                rate=self.sample_rate,
                frames_per_buffer=self.buffer_size,
                output_device_index=self.device_index,
                stream_callback=self._audio_callback,
                **self.kwargs
            )
        except Exception as e:
            # Release the PortAudio handle so a failed open leaks nothing.
            if self._pyaudio_instance:
                self._pyaudio_instance.terminate()
                self._pyaudio_instance = None
            raise DeviceError(f"Failed to open audio output device: {str(e)}") from e
        return self

    def close(self):
        """Close the audio playback stream and release resources."""
        try:
            self.stop()
            if self._stream:
                self._stream.close()
        finally:
            # Always drop the references and terminate PyAudio, even when
            # stopping or closing the stream raised.
            self._stream = None
            if self._pyaudio_instance:
                self._pyaudio_instance.terminate()
                self._pyaudio_instance = None

    def _halt_stream(self) -> None:
        """Stop the stream unless it is already stopped.

        A stream whose callback returned paComplete is inactive but not yet
        stopped; it must be stopped explicitly before it can be started again,
        so this checks is_stopped() rather than is_active().
        """
        if self._stream and not self._stream.is_stopped():
            self._stream.stop_stream()

    def _silence(self, frame_count: int, fmt: str) -> bytes:
        """Return ``frame_count`` frames of silence encoded for format ``fmt``."""
        size = frame_count * self.channels * self._SAMPLE_WIDTH.get(fmt, 4)
        if fmt == "uint8":
            # Unsigned 8-bit audio is centred on 128, not 0.
            return b"\x80" * size
        return bytes(size)

    @staticmethod
    def _encode_samples(samples: np.ndarray, fmt: str) -> bytes:
        """Convert float samples (nominally -1.0 to 1.0) to raw bytes for ``fmt``.

        Integer formats are clipped to their valid range before the cast so
        that full-scale or slightly hot samples saturate instead of wrapping.
        """
        flat = np.asarray(samples).reshape(-1)
        if fmt == "float32":
            return flat.astype(np.float32).tobytes()

        values = flat.astype(np.float64)
        if fmt == "int32":
            scaled = np.clip(values * 2147483648.0, -2147483648.0, 2147483647.0)
            return scaled.astype(np.int32).tobytes()
        if fmt == "int24":
            import sys

            # Scale to 24 bits in an int32, then keep the three low-order
            # bytes of every sample (packed 24-bit, native byte order).
            scaled = np.clip(values * 8388608.0, -8388608.0, 8388607.0)
            quads = np.ascontiguousarray(scaled.astype(np.int32)).view(np.uint8).reshape(-1, 4)
            packed = quads[:, :3] if sys.byteorder == "little" else quads[:, 1:]
            return packed.tobytes()
        if fmt == "int16":
            scaled = np.clip(values * 32768.0, -32768.0, 32767.0)
            return scaled.astype(np.int16).tobytes()
        if fmt == "int8":
            scaled = np.clip(values * 128.0, -128.0, 127.0)
            return scaled.astype(np.int8).tobytes()
        if fmt == "uint8":
            scaled = np.clip(values * 128.0 + 128.0, 0.0, 255.0)
            return scaled.astype(np.uint8).tobytes()
        # Unknown format: emit zero bytes (open() rejects these formats anyway).
        return bytes(flat.size * 4)

    def _audio_callback(self, in_data, frame_count, time_info, status):
        """Callback for PyAudio to handle outgoing audio data.

        Returns a ``(bytes, flag)`` tuple holding exactly ``frame_count``
        frames. User callbacks are started on separate threads so they cannot
        block this callback.
        """
        fmt = self.format.lower()
        audio_data = self._audio_data
        position = self._audio_position
        total_frames = self._total_frames

        if not self._is_playing or audio_data is None:
            # Return silence if not playing
            return (self._silence(frame_count, fmt), pyaudio.paContinue)

        if position >= total_frames:
            # End of playback
            self._is_playing = False
            self._stop_event.set()
            if self._on_complete:
                threading.Thread(target=self._on_complete).start()
            return (self._silence(frame_count, fmt), pyaudio.paComplete)

        frames_to_read = min(frame_count, total_frames - position)
        expected_samples = frames_to_read * self.channels
        start_idx = position * self.channels
        audio_chunk = audio_data[start_idx:start_idx + expected_samples]

        # Iterate over a snapshot so add_effect/remove_effect from another
        # thread cannot disturb the loop.
        for effect in tuple(self._effects):
            if effect.enabled:
                audio_chunk = effect.process(audio_chunk, self.sample_rate)

        # The output must contain exactly the requested number of samples, so
        # trim or zero-pad whatever the effects (or short data) produced.
        audio_chunk = np.asarray(audio_chunk).reshape(-1)
        if audio_chunk.size > expected_samples:
            audio_chunk = audio_chunk[:expected_samples]
        elif audio_chunk.size < expected_samples:
            padding = np.zeros(expected_samples - audio_chunk.size, dtype=audio_chunk.dtype)
            audio_chunk = np.concatenate([audio_chunk, padding])

        output_data = self._encode_samples(audio_chunk, fmt)

        # Last buffer of the track: fill the remainder with silence.
        if frames_to_read < frame_count:
            output_data += self._silence(frame_count - frames_to_read, fmt)

        self._audio_position += frames_to_read

        if self._on_position_change:
            position_sec = self._audio_position / self.sample_rate
            duration_sec = total_frames / self.sample_rate
            threading.Thread(
                target=self._on_position_change,
                args=(position_sec, duration_sec)
            ).start()

        return (output_data, pyaudio.paContinue)

    def add_effect(self, effect: "AudioEffect") -> "AudioPlayer":
        """
        Add an audio processing effect.

        Args:
            effect: The audio effect to add

        Returns:
            Self for method chaining
        """
        self._effects.append(effect)
        return self

    def remove_effect(self, effect_name: str) -> bool:
        """
        Remove an audio processing effect by name.

        Only the first effect with a matching name is removed.

        Args:
            effect_name: Name of the effect to remove

        Returns:
            True if effect was removed, False if not found
        """
        for i, effect in enumerate(self._effects):
            if effect.name == effect_name:
                self._effects.pop(i)
                return True
        return False

    def on_playback_complete(self, callback: Callable[[], None]) -> "AudioPlayer":
        """
        Set callback for when playback completes.

        Args:
            callback: Function to call when playback finishes

        Returns:
            Self for method chaining
        """
        self._on_complete = callback
        return self

    def on_position_change(self, callback: Callable[[float, float], None]) -> "AudioPlayer":
        """
        Set callback for playback position updates.

        The callback will be called with current position (seconds) and
        total duration (seconds) as arguments.

        Args:
            callback: Function to call with position updates

        Returns:
            Self for method chaining
        """
        self._on_position_change = callback
        return self

    def play(self, audio: Union["Audio", np.ndarray, str]) -> None:
        """
        Play audio from an Audio object, numpy array, or file.

        Args:
            audio: Audio data to play. Can be:
                - Audio object
                - Numpy array (float32, -1.0 to 1.0 range). A 1-D array is
                  used as-is and treated as interleaved samples; a 2-D array
                  is assumed to be (frames, channels).
                - String path to audio file

        Raises:
            ValueError: If ``audio`` is of an unsupported type or a file
                cannot be loaded.
        """
        self.stop()

        if isinstance(audio, str):
            self._load_audio_file(audio)
        elif isinstance(audio, Audio):
            self._load_audio_object(audio)
        elif isinstance(audio, np.ndarray):
            samples = audio
            if samples.ndim == 2:
                samples = self._match_channels(samples, self.channels)
            self._audio_data = self._interleave(samples)
            self._total_frames = len(self._audio_data) // max(1, self.channels)
        else:
            raise ValueError("Audio must be an Audio object, numpy array, or file path")

        # Reset playback position
        self._audio_position = 0

        self._start_playback()

    @staticmethod
    def _match_channels(data: np.ndarray, channels: int) -> np.ndarray:
        """Convert ``data`` to the requested channel count.

        ``data`` is (frames,) for mono or (frames, n) for n channels. Mono is
        duplicated onto every output channel and multi-channel audio is
        averaged down to mono; any other mismatch is downmixed to mono first
        and then duplicated. Returns (frames,) for one channel, otherwise
        (frames, channels).
        """
        channels = max(1, channels)
        source_channels = 1 if data.ndim == 1 else data.shape[1]
        if source_channels == channels:
            if data.ndim == 2 and channels == 1:
                return data[:, 0]
            return data

        mono = data if data.ndim == 1 else np.mean(data, axis=1)
        if channels == 1:
            return mono
        return np.column_stack([mono] * channels)

    @staticmethod
    def _resample(data: np.ndarray, source_rate: float, target_rate: float) -> np.ndarray:
        """Resample ``data`` along axis 0 from ``source_rate`` to ``target_rate``.

        Uses scipy.signal.resample when scipy is available and linear
        interpolation otherwise. Each column of 2-D data is resampled
        independently. The result is float32.
        """
        if source_rate == target_rate or len(data) == 0:
            return data

        output_len = int(len(data) * (target_rate / source_rate))
        if output_len <= 0:
            return data[:0]

        try:
            from scipy import signal
        except ImportError:
            signal = None

        if signal is not None:
            return signal.resample(data, output_len, axis=0).astype(np.float32)

        # Fallback: sample the source at fractional positions. This handles
        # non-integer ratios (e.g. 44100 -> 48000) in both directions.
        source_index = np.arange(len(data))
        positions = np.arange(output_len) * (source_rate / target_rate)
        if data.ndim == 1:
            return np.interp(positions, source_index, data).astype(np.float32)
        resampled = np.empty((output_len, data.shape[1]), dtype=np.float32)
        for channel in range(data.shape[1]):
            resampled[:, channel] = np.interp(positions, source_index, data[:, channel])
        return resampled

    @staticmethod
    def _interleave(data: np.ndarray) -> np.ndarray:
        """Flatten (frames, channels) data into one interleaved 1-D array.

        Row-major (C) order keeps the samples of one frame adjacent, which is
        the layout the stream callback slices from.
        """
        data = np.asarray(data)
        return data.reshape(-1) if data.ndim > 1 else data

    def _load_audio_file(self, file_path: str) -> None:
        """
        Load audio data from a file.

        The file is read as float32, converted to the player's channel count,
        resampled to the player's sample rate and stored interleaved.

        Args:
            file_path: Path to the audio file

        Raises:
            DependencyError: If the soundfile package is not installed.
            ValueError: If the file cannot be read or converted.
        """
        try:
            import soundfile as sf
        except ImportError as err:
            raise DependencyError(
                "soundfile is required for file playback. "
                "Install with: pip install soundfile"
            ) from err

        try:
            data, file_sample_rate = sf.read(file_path, dtype='float32')
            data = self._match_channels(data, self.channels)
            data = self._resample(data, file_sample_rate, self.sample_rate)
            self._audio_data = self._interleave(data)
            self._total_frames = len(self._audio_data) // max(1, self.channels)
        except Exception as e:
            raise ValueError(f"Failed to load audio file: {str(e)}") from e

    def _load_audio_object(self, audio: "Audio") -> None:
        """
        Load audio data from an Audio object.

        Reads ``audio.data``, ``audio.sample_rate`` and ``audio.channels``.

        Args:
            audio: Audio object to load
        """
        data = np.asarray(audio.data)
        source_channels = max(1, int(audio.channels))

        # Bring the samples into (frames, channels) form so resampling and
        # channel conversion work per channel.
        # NOTE(review): assumes 1-D multi-channel data is interleaved and that
        # 2-D data is (frames, channels) or (channels, frames) - confirm
        # against the Audio class.
        if data.ndim == 1 and source_channels > 1:
            usable = len(data) - len(data) % source_channels
            data = data[:usable].reshape(-1, source_channels)
        elif data.ndim == 2 and data.shape[1] != source_channels and data.shape[0] == source_channels:
            data = data.T

        data = self._resample(data, audio.sample_rate, self.sample_rate)
        data = self._match_channels(data, self.channels)

        self._audio_data = self._interleave(data)
        self._total_frames = len(self._audio_data) // max(1, self.channels)

    def _start_playback(self) -> None:
        """Start audio playback, opening the stream first if necessary."""
        if not self._stream:
            self.open()

        self._is_playing = True
        self._stop_event.clear()

        self._stream.start_stream()

    def stop(self) -> None:
        """Stop audio playback."""
        # Clear the flag first so a callback that is still running emits silence.
        self._is_playing = False
        self._halt_stream()
        self._stop_event.set()

    def pause(self) -> None:
        """Pause audio playback, keeping the current position."""
        self._is_playing = False
        self._halt_stream()

    def resume(self) -> None:
        """Resume audio playback from the current position."""
        if self._stream is None or self._audio_data is None:
            return
        if self._stream.is_active():
            return
        # A stream that ran to completion must be stopped before restarting.
        self._halt_stream()
        self._is_playing = True
        self._stream.start_stream()

    def seek(self, position_seconds: float) -> None:
        """
        Seek to a specific position in the audio.

        The position is clamped to the range of the loaded audio. Does
        nothing when no audio is loaded.

        Args:
            position_seconds: Position in seconds to seek to
        """
        if self._audio_data is None:
            return

        position_frames = int(position_seconds * self.sample_rate)
        self._audio_position = max(0, min(position_frames, self._total_frames))

    def get_position(self) -> float:
        """
        Get current playback position in seconds.

        Returns:
            Current position in seconds (0.0 when no audio is loaded)
        """
        if self._audio_data is None:
            return 0.0
        return self._audio_position / self.sample_rate

    def get_duration(self) -> float:
        """
        Get total duration of the loaded audio in seconds.

        Returns:
            Total duration in seconds (0.0 when no audio is loaded)
        """
        if self._audio_data is None:
            return 0.0
        return self._total_frames / self.sample_rate

    def is_playing(self) -> bool:
        """
        Check if audio is currently playing.

        Returns:
            True if audio is playing, False otherwise
        """
        # bool() so callers always get a real boolean rather than None or the
        # stream object from the short-circuit expression.
        return bool(self._is_playing and self._stream and self._stream.is_active())

    @staticmethod
    def list_devices() -> List[Dict[str, Any]]:
        """
        List available audio output devices.

        Returns:
            List of dictionaries containing device information, with keys
            'index', 'name', 'channels', 'sample_rates',
            'default_sample_rate' and 'default'.

        Raises:
            DependencyError: If PyAudio is not installed.
        """
        if pyaudio is None:
            raise DependencyError(
                "PyAudio is not installed. Install with: pip install pyaudio"
            )

        p = pyaudio.PyAudio()
        devices = []

        try:
            # Ask PortAudio which output device is the default; the per-device
            # info is not relied on to carry that flag.
            try:
                default_index = p.get_default_output_device_info().get('index')
            except OSError:
                # No default output device available.
                default_index = None

            for i in range(p.get_device_count()):
                device_info = p.get_device_info_by_index(i)

                # Only include output devices
                if device_info.get('maxOutputChannels', 0) <= 0:
                    continue

                index = device_info.get('index', i)
                devices.append({
                    'index': index,
                    'name': device_info.get('name', f"Device {i}"),
                    'channels': device_info.get('maxOutputChannels', 0),
                    'sample_rates': [
                        int(r) for r in device_info.get('supportedSampleRates', [44100, 48000])
                    ],
                    'default_sample_rate': device_info.get('defaultSampleRate'),
                    'default': bool(device_info.get('isDefaultOutputDevice', False))
                    or index == default_index,
                })
        finally:
            p.terminate()

        return devices

    @staticmethod
    def get_default_device() -> Optional[Dict[str, Any]]:
        """
        Get the default audio output device.

        Returns:
            Default device information, the first output device when none is
            marked as default, or None if there are no output devices
        """
        devices = AudioPlayer.list_devices()

        for device in devices:
            if device.get('default', False):
                return device

        # If no default device is marked, fall back to the first one.
        if devices:
            return devices[0]

        return None
# Convenient alias: `Player` refers to the very same class object as
# `AudioPlayer`, so both names can be used interchangeably.
Player = AudioPlayer