Source code for fmus_vox.stream.microphone

"""
fmus_vox.stream.microphone - Enhanced microphone audio streaming implementation.

This module provides comprehensive functionality for capturing audio from microphone devices,
with support for device selection, audio visualization, and real-time processing.
"""

import queue
import threading
import time
import collections
from typing import Optional, Union, Callable, Dict, Any, List, Tuple, Deque

import numpy as np

from fmus_vox.core.audio import Audio
from fmus_vox.core.errors import DeviceError, DependencyError

try:
    import pyaudio
except ImportError:
    pyaudio = None


class AudioFilter:
    """
    Common base for filters applied to audio chunks in real time.

    A concrete filter overrides :meth:`process`; the base implementation is a
    pass-through. Every filter carries a ``name`` used for identification and
    an ``enabled`` flag that can be toggled with :meth:`enable` and
    :meth:`disable`.
    """

    def __init__(self, name: str = "AudioFilter"):
        """
        Create a filter that starts out enabled.

        Args:
            name: Label used to identify the filter
        """
        self.enabled = True
        self.name = name

    def process(self, data: np.ndarray, sample_rate: int) -> np.ndarray:
        """
        Transform one chunk of audio.

        Args:
            data: Audio samples as a numpy array
            sample_rate: Sample rate of ``data``

        Returns:
            The processed samples; the base class hands back ``data`` itself
        """
        # Pass-through: subclasses replace this with real processing.
        return data

    def enable(self):
        """Switch the filter on."""
        self.enabled = True

    def disable(self):
        """Switch the filter off."""
        self.enabled = False
class NoiseReduction(AudioFilter):
    """
    Noise reduction filter.

    Reduces background noise in audio recordings using the optional
    ``noisereduce`` package. The filter needs a noise profile: either supply
    one with :meth:`calibrate`, or let :meth:`process` take it from the start
    of the first chunk it sees.
    """

    def __init__(self, strength: float = 0.5):
        """
        Initialize noise reduction filter.

        Args:
            strength: Noise reduction strength; values outside 0.0 to 1.0
                are clamped into that range
        """
        super().__init__("NoiseReduction")
        self.strength = max(0.0, min(1.0, strength))
        self._noise_profile = None
        self._initialized = False

    def calibrate(self, noise_sample: np.ndarray):
        """
        Calibrate the noise profile from a sample of background noise.

        Args:
            noise_sample: Audio sample containing only background noise

        Raises:
            DependencyError: If the ``noisereduce`` package is not installed.
            ValueError: If ``noise_sample`` is empty.
        """
        try:
            # Imported only to fail early when the dependency is missing.
            import noisereduce  # noqa: F401
        except ImportError as exc:
            raise DependencyError(
                "noisereduce package is required for noise reduction. "
                "Install with: pip install noisereduce"
            ) from exc

        # An empty profile would only fail later, inside process().
        if len(noise_sample) == 0:
            raise ValueError("noise_sample must not be empty")

        self._noise_profile = noise_sample
        self._initialized = True

    def process(self, data: np.ndarray, sample_rate: int) -> np.ndarray:
        """
        Apply noise reduction to the audio data.

        Args:
            data: Audio data as numpy array
            sample_rate: Sample rate of the audio

        Returns:
            The noise-reduced audio, or ``data`` unchanged when the filter is
            disabled, the chunk is empty, ``noisereduce`` is not installed,
            or no noise profile can be derived yet.
        """
        if not self.enabled or len(data) == 0:
            return data

        try:
            import noisereduce as nr
        except ImportError:
            # Best effort: pass audio through when noisereduce is unavailable.
            return data

        if not self._initialized:
            # Without calibration, use the leading 10% of this chunk as the
            # noise profile, capped at a quarter second of samples.
            noise_len = min(int(len(data) * 0.1), sample_rate // 4)
            if noise_len == 0:
                # Chunk too short to yield a profile; retry on the next one
                # instead of latching an empty profile.
                return data
            self._noise_profile = data[:noise_len]
            self._initialized = True

        return nr.reduce_noise(
            y=data,
            sr=sample_rate,
            y_noise=self._noise_profile,
            prop_decrease=self.strength,
        )
class Normalization(AudioFilter):
    """
    Peak normalization filter.

    Scales each chunk so that its peak absolute sample sits at a target
    decibel level.
    """

    def __init__(self, target_db: float = -3.0):
        """
        Create the filter.

        Args:
            target_db: Desired peak level in dB
        """
        super().__init__("Normalization")
        self.target_db = target_db

    def process(self, data: np.ndarray, sample_rate: int) -> np.ndarray:
        """
        Scale ``data`` so its peak matches ``target_db``.

        Args:
            data: Audio samples as a numpy array
            sample_rate: Sample rate of the audio (not used by this filter)

        Returns:
            The scaled samples, or ``data`` itself when the filter is
            disabled or the chunk is empty
        """
        if not self.enabled:
            return data
        if len(data) == 0:
            return data

        # A tiny floor keeps log10 away from zero on silent chunks.
        floor = np.finfo(float).eps
        peak = np.max(np.abs(data))
        level_db = 20 * np.log10(peak + floor)

        # Linear gain that moves the measured peak onto the target level.
        gain = 10 ** ((self.target_db - level_db) / 20)
        return data * gain
class AudioLevelMeter:
    """
    Audio level meter for real-time visualization.

    Provides RMS and peak level measurements for each chunk, plus running
    averages over the most recent ``window_size`` chunks.
    """

    def __init__(self, window_size: int = 10):
        """
        Initialize audio level meter.

        Args:
            window_size: Size of the averaging window in frames (chunks)

        Raises:
            ValueError: If ``window_size`` is smaller than 1.
        """
        # A zero-length window would make the running averages divide by
        # zero, so reject it here rather than on the first process() call.
        if window_size < 1:
            raise ValueError("window_size must be at least 1")

        self.window_size = window_size
        self.rms_values: Deque[float] = collections.deque(maxlen=window_size)
        self.peak_values: Deque[float] = collections.deque(maxlen=window_size)

    def process(self, data: np.ndarray) -> Dict[str, float]:
        """
        Process audio data and calculate levels.

        Args:
            data: Audio data as numpy array

        Returns:
            Dictionary with ``rms`` and ``peak`` for this chunk and
            ``avg_rms`` / ``avg_peak`` over the window, all as Python floats.
            An empty chunk yields all zeros and does not enter the history.
        """
        # Work in float64 so integer PCM input cannot overflow when squared.
        samples = np.asarray(data, dtype=np.float64)
        if samples.size == 0:
            return {"rms": 0.0, "peak": 0.0, "avg_rms": 0.0, "avg_peak": 0.0}

        rms = float(np.sqrt(np.mean(np.square(samples))))
        peak = float(np.max(np.abs(samples)))

        # The deques drop their oldest entry once the window is full.
        self.rms_values.append(rms)
        self.peak_values.append(peak)

        return {
            "rms": rms,
            "peak": peak,
            "avg_rms": sum(self.rms_values) / len(self.rms_values),
            "avg_peak": sum(self.peak_values) / len(self.peak_values),
        }
class Microphone:
    """
    Enhanced class for recording audio from a microphone device.

    This class provides both blocking and streaming interfaces for capturing
    audio from microphone input devices, with support for device selection,
    audio visualization, and real-time processing.

    Audio arrives through a PyAudio stream callback, which queues chunks of
    raw bytes while recording is active; the recording methods consume that
    queue.
    """

    # Map of common formats to PyAudio format constants. The values stay
    # ``None`` until the first instance is created, because PyAudio is an
    # optional import.
    FORMAT_MAP = {
        "float32": None,
        "int32": None,
        "int24": None,
        "int16": None,
        "int8": None,
        "uint8": None,
    }

    # Width in bytes of one sample for every supported format.
    _BYTES_PER_SAMPLE = {
        "float32": 4,
        "int32": 4,
        "int24": 3,
        "int16": 2,
        "int8": 1,
        "uint8": 1,
    }

    # Integer PCM layouts: name -> (numpy dtype, full-scale value, zero offset).
    # int24 has no numpy dtype and is packed/unpacked by dedicated helpers.
    _PCM_LAYOUT = {
        "int32": (np.int32, 2147483648.0, 0.0),
        "int16": (np.int16, 32768.0, 0.0),
        "int8": (np.int8, 128.0, 0.0),
        "uint8": (np.uint8, 128.0, 128.0),
    }

    def __init__(
        self,
        device_index: Optional[int] = None,
        sample_rate: int = 16000,
        channels: int = 1,
        format: str = "float32",
        chunk_size: int = 1024,
        **kwargs
    ):
        """
        Initialize a microphone input stream.

        Args:
            device_index: Index of the input device to use. None for default.
            sample_rate: Sample rate to record at.
            channels: Number of audio channels to record.
            format: Audio format ('float32', 'int16', etc.)
            chunk_size: Size of audio chunks to process at once.
            **kwargs: Additional parameters for PyAudio.

        Raises:
            DependencyError: If PyAudio is not installed.
        """
        if pyaudio is None:
            raise DependencyError(
                "PyAudio is not installed. Install with: pip install pyaudio"
            )

        # Fill in the real PyAudio constants the first time they are needed.
        if Microphone.FORMAT_MAP["float32"] is None:
            Microphone.FORMAT_MAP = {
                "float32": pyaudio.paFloat32,
                "int32": pyaudio.paInt32,
                "int24": pyaudio.paInt24,
                "int16": pyaudio.paInt16,
                "int8": pyaudio.paInt8,
                "uint8": pyaudio.paUInt8,
            }

        self.device_index = device_index
        self.sample_rate = sample_rate
        self.channels = channels
        self.format = format
        self.chunk_size = chunk_size
        self.kwargs = kwargs

        self._pyaudio_instance = None
        self._stream = None
        self._audio_buffer = queue.Queue()
        self._is_recording = False
        self._stop_event = threading.Event()

        # Audio processing
        self._filters: List["AudioFilter"] = []
        self._level_meter = AudioLevelMeter()
        self._visualization_callback = None

    def __enter__(self):
        """Start the microphone stream when used as a context manager."""
        self.open()
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        """Close the microphone stream when exiting context manager."""
        self.close()

    def open(self):
        """
        Open the microphone stream.

        Calling this while a stream is already open is a no-op, so a second
        call cannot leak the first stream.

        Returns:
            Self for method chaining

        Raises:
            DeviceError: If the specified device cannot be opened or the
                configured format is not supported.
        """
        if self._stream is not None:
            return self

        try:
            self._pyaudio_instance = pyaudio.PyAudio()

            # Get pyaudio format from string format
            pa_format = Microphone.FORMAT_MAP.get(self._format_key())
            if pa_format is None:
                raise ValueError(f"Unsupported audio format: {self.format}")

            self._stream = self._pyaudio_instance.open(
                input=True,
                output=False,
                start=True,
                frames_per_buffer=self.chunk_size,
                rate=self.sample_rate,
                channels=self.channels,
                format=pa_format,
                input_device_index=self.device_index,
                stream_callback=self._audio_callback,
                **self.kwargs
            )
        except Exception as e:
            if self._pyaudio_instance:
                self._pyaudio_instance.terminate()
                self._pyaudio_instance = None
            raise DeviceError(f"Failed to open microphone: {e}") from e

        return self

    def close(self):
        """Close the microphone stream and release resources."""
        # Stop queueing first so the callback adds nothing during teardown.
        self._is_recording = False
        self._stop_event.set()

        stream, self._stream = self._stream, None
        pa_instance, self._pyaudio_instance = self._pyaudio_instance, None

        try:
            if stream is not None:
                try:
                    stream.stop_stream()
                finally:
                    stream.close()
        finally:
            # Release PyAudio even if stopping/closing the stream failed.
            if pa_instance is not None:
                pa_instance.terminate()
            self._drain_buffer()

    def _format_key(self) -> str:
        """Return the configured format name normalized to lower case."""
        return str(self.format).lower()

    def _bytes_per_frame(self) -> int:
        """
        Return the size in bytes of one frame (one sample per channel).

        Raises:
            ValueError: If the configured format is not supported.
        """
        try:
            sample_width = self._BYTES_PER_SAMPLE[self._format_key()]
        except KeyError:
            raise ValueError(f"Unsupported format: {self.format}") from None
        return sample_width * self.channels

    def _drain_buffer(self) -> List[bytes]:
        """Remove and return every chunk currently queued, without blocking."""
        drained = []
        while True:
            try:
                drained.append(self._audio_buffer.get_nowait())
            except queue.Empty:
                return drained

    @staticmethod
    def _unpack_int24(audio_bytes: bytes) -> np.ndarray:
        """Decode packed 24-bit signed PCM into float32 samples in [-1, 1)."""
        import sys

        raw = np.frombuffer(audio_bytes, dtype=np.uint8)
        if raw.size % 3:
            raise ValueError("buffer size must be a multiple of 3 for int24")

        triples = raw.reshape(-1, 3).astype(np.int32)
        # NOTE(review): presumably PortAudio delivers packed int24 in host
        # byte order; confirm on big-endian targets.
        if sys.byteorder == "little":
            low, mid, high = triples[:, 0], triples[:, 1], triples[:, 2]
        else:
            high, mid, low = triples[:, 0], triples[:, 1], triples[:, 2]

        values = low | (mid << 8) | (high << 16)
        # Sign-extend from 24 to 32 bits.
        values = (values ^ 0x800000) - 0x800000
        return values.astype(np.float32) / 8388608.0

    @staticmethod
    def _pack_int24(samples: np.ndarray) -> bytes:
        """Encode float samples as packed 24-bit signed PCM, clipping overflow."""
        import sys

        scaled = np.clip(samples.ravel() * 8388608.0, -8388608, 8388607)
        values = scaled.astype(np.int32)

        packed = np.empty((values.size, 3), dtype=np.uint8)
        packed[:, 0] = (values & 0xFF).astype(np.uint8)
        packed[:, 1] = ((values >> 8) & 0xFF).astype(np.uint8)
        packed[:, 2] = ((values >> 16) & 0xFF).astype(np.uint8)
        if sys.byteorder == "big":
            packed = packed[:, ::-1]
        return packed.tobytes()

    def _bytes_to_numpy(self, audio_bytes: bytes) -> np.ndarray:
        """
        Convert raw audio bytes to numpy array.

        Integer formats are scaled to float32 in the range [-1.0, 1.0).

        Args:
            audio_bytes: Raw audio data as bytes

        Returns:
            Audio data as numpy array

        Raises:
            ValueError: If the configured format is not supported.
        """
        fmt = self._format_key()
        if fmt == "float32":
            return np.frombuffer(audio_bytes, dtype=np.float32)
        if fmt == "int24":
            return self._unpack_int24(audio_bytes)

        layout = self._PCM_LAYOUT.get(fmt)
        if layout is None:
            raise ValueError(f"Unsupported format: {self.format}")

        dtype, scale, offset = layout
        samples = np.frombuffer(audio_bytes, dtype=dtype).astype(np.float32)
        return (samples - offset) / scale

    def _numpy_to_bytes(self, audio_array: np.ndarray) -> bytes:
        """
        Convert numpy array to raw audio bytes.

        For integer formats the scaled samples are clipped to the target
        range, so a full-scale value saturates instead of wrapping around.

        Args:
            audio_array: Audio data as numpy array

        Returns:
            Raw audio data as bytes

        Raises:
            ValueError: If the configured format is not supported.
        """
        fmt = self._format_key()
        if fmt == "float32":
            return np.asarray(audio_array).astype(np.float32).tobytes()

        # float64 represents every int32 limit exactly, unlike float32.
        samples = np.asarray(audio_array, dtype=np.float64)
        if fmt == "int24":
            return self._pack_int24(samples)

        layout = self._PCM_LAYOUT.get(fmt)
        if layout is None:
            raise ValueError(f"Unsupported format: {self.format}")

        dtype, scale, offset = layout
        limits = np.iinfo(dtype)
        scaled = samples * scale + offset
        return np.clip(scaled, limits.min, limits.max).astype(dtype).tobytes()

    def _audio_callback(self, in_data, frame_count, time_info, status):
        """
        Callback for PyAudio to handle incoming audio data.

        While recording, each chunk is run through the enabled filters (if
        any), queued as bytes, and reported to the visualization callback.
        Chunks that arrive while not recording are dropped.
        """
        if self._is_recording:
            # Snapshot so concurrent add/remove cannot disturb this pass.
            active_filters = [f for f in self._filters if f.enabled]
            on_levels = self._visualization_callback
            samples = None

            if active_filters:
                samples = self._bytes_to_numpy(in_data)
                for audio_filter in active_filters:
                    samples = audio_filter.process(samples, self.sample_rate)
                self._audio_buffer.put(self._numpy_to_bytes(samples))
            else:
                # Nothing to apply: queue the raw bytes untouched.
                self._audio_buffer.put(in_data)

            if on_levels:
                if samples is None:
                    samples = self._bytes_to_numpy(in_data)
                on_levels(self._level_meter.process(samples))

        return (None, pyaudio.paContinue)

    def add_filter(self, filter: "AudioFilter") -> "Microphone":
        """
        Add an audio processing filter.

        Args:
            filter: The audio filter to add

        Returns:
            Self for method chaining
        """
        self._filters.append(filter)
        return self

    def remove_filter(self, filter_name: str) -> bool:
        """
        Remove an audio processing filter by name.

        Only the first filter with a matching name is removed.

        Args:
            filter_name: Name of the filter to remove

        Returns:
            True if filter was removed, False if not found
        """
        for position, candidate in enumerate(self._filters):
            if candidate.name == filter_name:
                del self._filters[position]
                return True
        return False

    def set_visualization_callback(
        self, callback: Callable[[Dict[str, float]], None]
    ) -> "Microphone":
        """
        Set a callback for audio level visualization.

        The callback will be called with a dictionary containing:
        - rms: Root mean square level
        - peak: Peak level
        - avg_rms: Average RMS over window
        - avg_peak: Average peak over window

        It is invoked from the PyAudio stream callback for every chunk
        captured while recording.

        Args:
            callback: Function to call with audio level data

        Returns:
            Self for method chaining
        """
        self._visualization_callback = callback
        return self

    def read(self, num_frames: Optional[int] = None) -> bytes:
        """
        Read audio data from the microphone.

        Opens the stream and starts capturing if necessary. Waits up to one
        second per chunk and returns whatever was collected if a wait times
        out, so the result may be shorter than requested.

        Args:
            num_frames: Number of frames to read. If None, reads one chunk.

        Returns:
            Raw audio data as bytes.
        """
        if self._stream is None:
            self.open()
        # The stream callback only queues audio while recording is active.
        if not self._is_recording:
            self.start_recording()

        frames_wanted = self.chunk_size if num_frames is None else num_frames
        # Round up to whole chunks.
        num_chunks = (frames_wanted + self.chunk_size - 1) // self.chunk_size

        chunks = []
        for _ in range(num_chunks):
            try:
                chunks.append(self._audio_buffer.get(timeout=1.0))
            except queue.Empty:
                break

        data = b"".join(chunks)

        # Truncate to match num_frames exactly when a count was requested.
        if num_frames is not None:
            data = data[: num_frames * self._bytes_per_frame()]

        return data

    def start_recording(self):
        """Start recording audio to internal buffer."""
        if self._stream is None:
            self.open()

        self._is_recording = True
        self._stop_event.clear()

    def stop_recording(self) -> "Audio":
        """
        Stop recording and return the recorded audio.

        Returns:
            Audio object containing the recorded audio
        """
        self._is_recording = False

        # Wait for any remaining data to be processed
        time.sleep(0.1)

        data = b"".join(self._drain_buffer())
        return Audio(self._bytes_to_numpy(data), self.sample_rate)

    def record_until_silence(
        self,
        silence_threshold: float = 0.01,
        silence_duration: float = 1.0,
        max_seconds: Optional[float] = None,
        pre_buffer_seconds: float = 0.5
    ) -> "Audio":
        """
        Record until silence is detected.

        Listens until a chunk's peak exceeds ``silence_threshold``, then
        records until ``silence_duration`` of consecutive quiet chunks has
        passed. Stops early when the stop event is set (for example by
        :meth:`close`).

        Args:
            silence_threshold: Threshold for silence detection (0.0 to 1.0)
            silence_duration: Duration of silence to stop recording (seconds)
            max_seconds: Maximum recording duration (seconds). Also limits
                how long to wait for speech to start.
            pre_buffer_seconds: Seconds of audio to include before speech starts

        Returns:
            Audio object containing the recorded audio; empty if no speech
            was detected
        """
        if self._stream is None:
            self.open()

        self._drain_buffer()

        self._is_recording = True
        self._stop_event.clear()

        # All durations are converted to a number of chunks.
        pre_buffer_size = int(pre_buffer_seconds * self.sample_rate / self.chunk_size)
        silence_frames = int(silence_duration * self.sample_rate / self.chunk_size)
        max_count = (
            None if max_seconds is None
            else int(max_seconds * self.sample_rate / self.chunk_size)
        )

        # Bounded deque keeps only the most recent quiet chunks.
        pre_buffer = collections.deque(maxlen=max(pre_buffer_size, 0))
        chunks = []
        chunks_seen = 0
        silence_count = 0
        speech_detected = False

        try:
            # Checked at the top so a stop request is honoured even when no
            # audio is arriving.
            while not self._stop_event.is_set():
                if max_count is not None:
                    # Before speech, bound the wait; afterwards, bound the
                    # amount of recorded audio.
                    progress = len(chunks) if speech_detected else chunks_seen
                    if progress >= max_count:
                        break

                try:
                    chunk = self._audio_buffer.get(timeout=0.1)
                except queue.Empty:
                    continue
                chunks_seen += 1

                chunk_data = self._bytes_to_numpy(chunk)
                level = np.max(np.abs(chunk_data)) if chunk_data.size else 0.0
                is_speech = level > silence_threshold

                if not speech_detected:
                    if is_speech:
                        # Speech has started: keep the lead-in as well.
                        speech_detected = True
                        chunks.extend(pre_buffer)
                        chunks.append(chunk)
                        silence_count = 0
                    else:
                        pre_buffer.append(chunk)
                    continue

                chunks.append(chunk)
                if is_speech:
                    silence_count = 0
                    continue

                silence_count += 1
                if silence_count >= silence_frames:
                    break
        finally:
            self._is_recording = False

        # If we never detected speech, return empty audio
        if not speech_detected:
            return Audio(np.array([], dtype=np.float32), self.sample_rate)

        data = b"".join(chunks)
        return Audio(self._bytes_to_numpy(data), self.sample_rate)

    def record(self, seconds: float, visualization_callback: Optional[Callable] = None) -> "Audio":
        """
        Record audio for a specified duration.

        Stops early when the stop event is set; the result is then shorter
        than requested.

        Args:
            seconds: Duration to record in seconds
            visualization_callback: Optional callback for visualization during
                recording; the previous callback is restored afterwards

        Returns:
            Audio object containing the recorded audio
        """
        if self._stream is None:
            self.open()

        # Size of the recording in bytes for the configured format.
        total_bytes = int(seconds * self.sample_rate) * self._bytes_per_frame()

        old_callback = self._visualization_callback
        if visualization_callback:
            self._visualization_callback = visualization_callback

        self._drain_buffer()

        self._is_recording = True
        self._stop_event.clear()

        chunks = []
        bytes_recorded = 0

        try:
            while bytes_recorded < total_bytes:
                if self._stop_event.is_set():
                    break

                try:
                    chunk = self._audio_buffer.get(timeout=0.1)
                except queue.Empty:
                    continue
                chunks.append(chunk)
                bytes_recorded += len(chunk)

            # Wait a little bit to make sure we get all data
            time.sleep(0.1)

            # Only relevant after an early stop: collect what is still queued.
            while bytes_recorded < total_bytes:
                try:
                    chunk = self._audio_buffer.get_nowait()
                except queue.Empty:
                    break
                chunks.append(chunk)
                bytes_recorded += len(chunk)
        finally:
            self._is_recording = False
            self._visualization_callback = old_callback

        # Trim to exact length
        data = b"".join(chunks)[:total_bytes]
        return Audio(self._bytes_to_numpy(data), self.sample_rate)

    @staticmethod
    def list_devices() -> List[Dict[str, Any]]:
        """
        List available audio input devices.

        Returns:
            List of dictionaries with the keys ``index``, ``name``,
            ``channels``, ``sample_rates`` and ``default``

        Raises:
            DependencyError: If PyAudio is not installed.
        """
        if pyaudio is None:
            raise DependencyError(
                "PyAudio is not installed. Install with: pip install pyaudio"
            )

        p = pyaudio.PyAudio()
        devices = []

        try:
            # Ask PyAudio which device is the default input; it raises when
            # there is none.
            try:
                default_index = p.get_default_input_device_info().get('index')
            except OSError:
                default_index = None

            for i in range(p.get_device_count()):
                device_info = p.get_device_info_by_index(i)

                # Only include input devices
                if device_info.get('maxInputChannels', 0) <= 0:
                    continue

                index = device_info.get('index', i)

                # Prefer an explicit rate list; otherwise fall back to the
                # device's default rate, then to common rates.
                rates = device_info.get('supportedSampleRates')
                if not rates:
                    default_rate = device_info.get('defaultSampleRate')
                    rates = [default_rate] if default_rate else [44100, 48000]

                devices.append({
                    'index': index,
                    'name': device_info.get('name', f"Device {i}"),
                    'channels': device_info.get('maxInputChannels', 0),
                    'sample_rates': [int(r) for r in rates],
                    'default': bool(
                        device_info.get('isDefaultInputDevice', False)
                        or (default_index is not None and index == default_index)
                    ),
                })
        finally:
            p.terminate()

        return devices

    @staticmethod
    def get_default_device() -> Optional[Dict[str, Any]]:
        """
        Get the default audio input device.

        Returns:
            The device flagged as default, otherwise the first input device,
            or None if there are no input devices
        """
        devices = Microphone.list_devices()

        for device in devices:
            if device.get('default', False):
                return device

        # If no default device is marked, return the first one
        return devices[0] if devices else None

    def calibrate_noise_profile(self, seconds: float = 2.0) -> None:
        """
        Calibrate noise reduction filter with ambient noise.

        Records ambient noise with every filter temporarily disabled and
        feeds it to the first NoiseReduction filter, adding one if none is
        installed yet.

        Args:
            seconds: Duration to record ambient noise in seconds
        """
        noise_filter = next(
            (f for f in self._filters if isinstance(f, NoiseReduction)), None
        )

        # If no noise reduction filter exists, create one
        if noise_filter is None:
            noise_filter = NoiseReduction()
            self.add_filter(noise_filter)

        print("Recording ambient noise for calibration...")

        # Temporarily disable all filters so the raw signal is captured.
        enabled_filters = [f for f in self._filters if f.enabled]
        for audio_filter in enabled_filters:
            audio_filter.enabled = False

        try:
            noise_audio = self.record(seconds)
        finally:
            # Restore the filters even if recording failed.
            for audio_filter in enabled_filters:
                audio_filter.enabled = True

        noise_filter.calibrate(noise_audio.data)

        print("Noise profile calibrated.")

    def create_vad_detector(self, threshold: float = 0.02, window_size: int = 10) -> "AudioFilter":
        """
        Create a Voice Activity Detector filter.

        The detector is added to this microphone's filter chain. It leaves
        the audio unchanged and reports speech start/stop transitions to a
        callback registered with its ``set_callback`` method.

        Args:
            threshold: Threshold for voice detection, compared against the
                mean squared sample value averaged over the window
            window_size: Window size for smoothing in frames

        Returns:
            The VAD filter that was created and added
        """
        class VAD(AudioFilter):
            def __init__(self, threshold: float, window_size: int):
                super().__init__("VAD")
                self.threshold = threshold
                self.window_size = window_size
                self.energy_history = collections.deque(maxlen=window_size)
                self.speech_detected = False
                self.callback = None

            def set_callback(self, callback: Callable[[bool], None]):
                self.callback = callback

            def process(self, data: np.ndarray, sample_rate: int) -> np.ndarray:
                # Smooth the per-chunk energy over the recent history.
                energy = np.mean(np.square(data))
                self.energy_history.append(energy)
                avg_energy = sum(self.energy_history) / len(self.energy_history)

                is_speech = avg_energy > self.threshold

                # Notify only on transitions between speech and silence.
                if is_speech != self.speech_detected and self.callback:
                    self.callback(is_speech)

                self.speech_detected = is_speech

                # Return data unchanged
                return data

        vad = VAD(threshold, window_size)
        self.add_filter(vad)
        return vad
# Convenient alias: ``Mic`` is a shorter name for the ``Microphone`` class.
Mic = Microphone