"""
fmus_vox.stream.voice_stream - Real-time voice streaming functionality.
This module provides classes for continuous audio streaming and processing,
with support for speech detection, VAD, and real-time transcription.
"""
import threading
import time
import queue
from typing import Callable, Dict, Any, List, Optional, Union, Tuple
import numpy as np
from fmus_vox.core.audio import Audio
from fmus_vox.core.errors import StreamError
from .microphone import Microphone
[docs]
class StreamBuffer:
"""
Audio buffer for streaming applications.
This class manages a ring buffer for audio data, providing methods
to add, retrieve, and manipulate audio frames for streaming processing.
"""
[docs]
def __init__(
self,
max_duration: float = 10.0,
sample_rate: int = 16000,
channels: int = 1,
dtype: np.dtype = np.float32
):
"""
Initialize an audio buffer for streaming.
Args:
max_duration: Maximum buffer duration in seconds.
sample_rate: Sample rate of the audio.
channels: Number of audio channels.
dtype: Data type for the buffer.
"""
self.sample_rate = sample_rate
self.channels = channels
self.max_duration = max_duration
self.dtype = dtype
# Calculate buffer size in samples
self.max_samples = int(max_duration * sample_rate * channels)
self.buffer = np.zeros(self.max_samples, dtype=dtype)
self.write_pos = 0
self.total_samples = 0
self.lock = threading.RLock()
[docs]
def write(self, data: Union[np.ndarray, bytes]) -> int:
"""
Write audio data to the buffer.
Args:
data: Audio data to write, as numpy array or bytes.
Returns:
Number of samples written.
"""
# Convert bytes to numpy array if needed
if isinstance(data, bytes):
if self.dtype == np.float32:
data = np.frombuffer(data, dtype=np.float32)
elif self.dtype == np.int16:
data = np.frombuffer(data, dtype=np.int16)
else:
raise ValueError(f"Unsupported data type for bytes conversion: {self.dtype}")
# Ensure data is the right dtype
if data.dtype != self.dtype:
data = data.astype(self.dtype)
with self.lock:
# Determine how many samples to write
n_samples = min(len(data), self.max_samples)
# Handle wrap-around if needed
first_chunk = min(n_samples, self.max_samples - self.write_pos)
self.buffer[self.write_pos:self.write_pos + first_chunk] = data[:first_chunk]
# If we need to wrap around
if first_chunk < n_samples:
second_chunk = n_samples - first_chunk
self.buffer[:second_chunk] = data[first_chunk:n_samples]
self.write_pos = second_chunk
else:
self.write_pos = (self.write_pos + first_chunk) % self.max_samples
self.total_samples += n_samples
return n_samples
[docs]
def read(self, duration: float = None, n_samples: int = None) -> np.ndarray:
"""
Read audio data from the buffer.
Args:
duration: Duration to read in seconds.
n_samples: Number of samples to read (overrides duration if provided).
Returns:
Numpy array containing the requested audio data.
"""
with self.lock:
# Calculate how many samples to read
if n_samples is None:
if duration is None:
# Default to entire buffer
n_samples = min(self.total_samples, self.max_samples)
else:
n_samples = int(duration * self.sample_rate * self.channels)
# Limit to available samples
n_samples = min(n_samples, min(self.total_samples, self.max_samples))
if n_samples == 0:
return np.array([], dtype=self.dtype)
# Calculate read position
read_pos = (self.write_pos - n_samples) % self.max_samples
# Handle wrap-around if needed
if read_pos < self.write_pos:
# No wrap-around needed
return self.buffer[read_pos:self.write_pos].copy()
else:
# Need to wrap around
first_part = self.buffer[read_pos:].copy()
second_part = self.buffer[:self.write_pos].copy()
return np.concatenate([first_part, second_part])
[docs]
def read_latest(self, duration: float) -> np.ndarray:
"""
Read the most recent audio data from the buffer.
Args:
duration: Duration to read in seconds.
Returns:
Numpy array containing the most recent audio data.
"""
n_samples = int(duration * self.sample_rate * self.channels)
return self.read(n_samples=n_samples)
[docs]
def clear(self) -> None:
"""Clear the buffer."""
with self.lock:
self.buffer.fill(0)
self.write_pos = 0
self.total_samples = 0
[docs]
def to_audio(self, duration: float = None) -> Audio:
"""
Convert buffer contents to an Audio object.
Args:
duration: Duration to convert in seconds. If None, uses all available data.
Returns:
Audio object containing the buffer data.
"""
data = self.read(duration=duration)
return Audio(
data,
sample_rate=self.sample_rate,
channels=self.channels
)
[docs]
def __len__(self) -> int:
"""Return the number of samples currently in the buffer."""
return min(self.total_samples, self.max_samples)
[docs]
class VoiceStream:
"""
Real-time voice processing stream.
This class provides functionality for continuous voice processing,
including voice activity detection, speech segmentation, and
real-time transcription.
"""
[docs]
def __init__(
self,
input_device: Optional[Union[int, Microphone]] = None,
sample_rate: int = 16000,
channels: int = 1,
buffer_duration: float = 30.0,
vad_mode: str = "normal",
min_silence_duration: float = 0.5,
min_speech_duration: float = 0.3,
**kwargs
):
"""
Initialize a voice stream for continuous processing.
Args:
input_device: Microphone device index or Microphone instance.
If None, the default input device is used.
sample_rate: Sample rate for audio processing.
channels: Number of audio channels.
buffer_duration: Maximum duration of audio buffer in seconds.
vad_mode: Voice activity detection sensitivity ('aggressive',
'normal', or 'relaxed').
min_silence_duration: Minimum silence duration to consider a
speech segment complete.
min_speech_duration: Minimum speech duration to consider a
speech segment valid.
**kwargs: Additional parameters for the microphone.
"""
# Set up microphone input
if isinstance(input_device, Microphone):
self.microphone = input_device
else:
self.microphone = Microphone(
device_index=input_device,
sample_rate=sample_rate,
channels=channels,
format="float32",
**kwargs
)
self.sample_rate = sample_rate
self.channels = channels
self.buffer_duration = buffer_duration
# Set up buffers
self.audio_buffer = StreamBuffer(
max_duration=buffer_duration,
sample_rate=sample_rate,
channels=channels
)
# VAD parameters
self.vad_mode = vad_mode
self.min_silence_duration = min_silence_duration
self.min_speech_duration = min_speech_duration
# Speech segmentation state
self.is_speech_active = False
self.speech_start_time = 0
self.last_speech_end_time = 0
self.silence_start_time = 0
# State variables
self._is_running = False
self._stop_event = threading.Event()
self._processing_thread = None
# Callbacks
self.callbacks = {
"on_audio": [],
"on_speech_start": [],
"on_speech_end": [],
"on_speech": [],
"on_vad": [],
}
# Try to import VAD
try:
import webrtcvad
self.vad = webrtcvad.Vad()
self._set_vad_aggressiveness()
except ImportError:
self.vad = None
def _set_vad_aggressiveness(self) -> None:
"""Set the VAD aggressiveness based on the mode setting."""
if self.vad is None:
return
# Convert string mode to numeric level
if self.vad_mode == "aggressive":
level = 3
elif self.vad_mode == "normal":
level = 2
elif self.vad_mode == "relaxed":
level = 1
else:
level = 2 # Default to normal
self.vad.set_mode(level)
[docs]
def on_audio(self, callback: Callable[[np.ndarray, Dict[str, Any]], None]) -> None:
"""
Register a callback for raw audio data.
Args:
callback: Function that takes (audio_data, metadata) parameters.
"""
self.callbacks["on_audio"].append(callback)
[docs]
def on_speech_start(self, callback: Callable[[Dict[str, Any]], None]) -> None:
"""
Register a callback for when speech begins.
Args:
callback: Function that takes a metadata dictionary.
"""
self.callbacks["on_speech_start"].append(callback)
[docs]
def on_speech_end(self, callback: Callable[[Audio, Dict[str, Any]], None]) -> None:
"""
Register a callback for when speech ends.
Args:
callback: Function that takes (audio, metadata) parameters.
"""
self.callbacks["on_speech_end"].append(callback)
[docs]
def on_speech(self, callback: Callable[[Audio, Dict[str, Any]], None]) -> None:
"""
Register a callback for complete speech segments.
Equivalent to on_speech_end but with a more intuitive name.
Args:
callback: Function that takes (audio, metadata) parameters.
"""
self.callbacks["on_speech_end"].append(callback)
[docs]
def on_vad(self, callback: Callable[[bool, Dict[str, Any]], None]) -> None:
"""
Register a callback for voice activity detection events.
Args:
callback: Function that takes (is_speech, metadata) parameters.
"""
self.callbacks["on_vad"].append(callback)
def _run_callbacks(self, event_type: str, *args) -> None:
"""
Run all registered callbacks for an event type.
Args:
event_type: The type of event ('on_audio', 'on_speech_start', etc.)
*args: Arguments to pass to the callbacks.
"""
if event_type in self.callbacks:
for callback in self.callbacks[event_type]:
try:
callback(*args)
except Exception as e:
# Log but don't crash
print(f"Error in {event_type} callback: {str(e)}")
[docs]
def start(self) -> None:
"""Start the voice stream processing."""
if self._is_running:
return
# Clear state
self._stop_event.clear()
self._is_running = True
self.audio_buffer.clear()
# Reset speech detection state
self.is_speech_active = False
self.speech_start_time = 0
self.last_speech_end_time = 0
self.silence_start_time = 0
# Start the microphone
self.microphone.open()
self.microphone.start_recording()
# Start processing thread
self._processing_thread = threading.Thread(target=self._process_stream)
self._processing_thread.daemon = True
self._processing_thread.start()
[docs]
def stop(self) -> None:
"""Stop the voice stream processing."""
if not self._is_running:
return
self._is_running = False
self._stop_event.set()
# Stop the microphone
try:
self.microphone.stop_recording()
self.microphone.close()
except:
pass
# Wait for processing thread to end
if self._processing_thread and self._processing_thread.is_alive():
self._processing_thread.join(timeout=1.0)
self._processing_thread = None
def _process_stream(self) -> None:
"""Main processing loop for the voice stream."""
chunk_duration = 0.03 # 30ms chunks for VAD
chunk_samples = int(chunk_duration * self.sample_rate)
processing_interval = 0.01 # 10ms processing interval
# Check if we have VAD
has_vad = self.vad is not None
# Processing loop
while self._is_running and not self._stop_event.is_set():
start_time = time.time()
# Read a chunk of audio
if self.channels == 1:
audio_chunk = self.microphone.read(chunk_samples)
else:
# If stereo, we need to convert to mono for VAD
stereo_chunk = self.microphone.read(chunk_samples)
mono_samples = np.frombuffer(stereo_chunk, dtype=np.float32)
mono_samples = mono_samples.reshape(-1, self.channels).mean(axis=1)
audio_chunk = mono_samples.tobytes()
# Add to buffer
self.audio_buffer.write(audio_chunk)
# Run audio callbacks
audio_data = np.frombuffer(audio_chunk, dtype=np.float32)
metadata = {
"timestamp": time.time(),
"sample_rate": self.sample_rate,
"channels": self.channels,
}
self._run_callbacks("on_audio", audio_data, metadata)
# Voice activity detection
is_speech = False
if has_vad:
try:
# WebRTC VAD requires 16-bit PCM
pcm_data = np.frombuffer(audio_chunk, dtype=np.float32)
pcm_data = (pcm_data * 32767).astype(np.int16).tobytes()
is_speech = self.vad.is_speech(pcm_data, self.sample_rate)
except Exception as e:
# VAD can fail if the chunk is not exactly the right size
print(f"VAD error: {str(e)}")
# Run VAD callbacks
vad_metadata = {
"timestamp": time.time(),
"sample_rate": self.sample_rate,
}
self._run_callbacks("on_vad", is_speech, vad_metadata)
# Speech segmentation logic
now = time.time()
if is_speech:
if not self.is_speech_active:
# Start of speech
self.is_speech_active = True
self.speech_start_time = now
# Run speech start callbacks
speech_start_metadata = {
"timestamp": now,
}
self._run_callbacks("on_speech_start", speech_start_metadata)
# Reset silence counter
self.silence_start_time = 0
else:
# Not speech
if self.is_speech_active:
# In speech mode but got silence
if self.silence_start_time == 0:
# Start of silence
self.silence_start_time = now
elif now - self.silence_start_time >= self.min_silence_duration:
# Silence duration exceeded threshold, end speech segment
speech_duration = now - self.speech_start_time - (now - self.silence_start_time)
if speech_duration >= self.min_speech_duration:
# Valid speech segment
# Get the speech audio from buffer
speech_audio = self.audio_buffer.to_audio(
duration=speech_duration + self.min_silence_duration
)
# Run speech end callbacks
speech_end_metadata = {
"start_time": self.speech_start_time,
"end_time": now,
"duration": speech_duration,
}
self._run_callbacks("on_speech_end", speech_audio, speech_end_metadata)
# Reset speech detection state
self.is_speech_active = False
self.speech_start_time = 0
self.last_speech_end_time = now
self.silence_start_time = 0
# Sleep to maintain processing interval
elapsed = time.time() - start_time
if elapsed < processing_interval:
time.sleep(processing_interval - elapsed)
[docs]
def __enter__(self):
"""Start the stream when used as a context manager."""
self.start()
return self
[docs]
def __exit__(self, exc_type, exc_val, exc_tb):
"""Stop the stream when exiting context manager."""
self.stop()
[docs]
def __del__(self):
"""Clean up resources when garbage collected."""
self.stop()