|
import os |
|
import io |
|
import abc |
|
import six |
|
import numpy as np |
|
import librosa |
|
import soundfile as sf |
|
import tensorflow as tf |
|
|
|
from util.utils import log10 |
|
from .gammatone import fft_weights |
|
|
|
|
|
def read_raw_audio(audio, sample_rate=16000):
    """Decode audio into a 1-D waveform array.

    Args:
        audio: a filesystem path (str), encoded audio (bytes), or an
            already-decoded np.ndarray (returned unchanged).
        sample_rate: target sample rate for path/bytes inputs.

    Returns:
        np.ndarray waveform.

    Raises:
        ValueError: if `audio` is none of the accepted types.
    """
    if isinstance(audio, np.ndarray):
        # already decoded — pass through untouched (no resampling applied)
        return audio
    if isinstance(audio, str):
        wave, _ = librosa.load(os.path.expanduser(audio), sr=sample_rate)
        return wave
    if isinstance(audio, bytes):
        wave, sr = sf.read(io.BytesIO(audio))
        wave = np.asfortranarray(wave)
        if sr != sample_rate:
            wave = librosa.resample(wave, sr, sample_rate)
        return wave
    raise ValueError("input audio must be either a path or bytes")
|
|
|
|
|
def slice_signal(signal, window_size, stride=0.5) -> np.ndarray:
    """Cut a 1-D signal into overlapping windows.

    Windows advance by `stride` fractions of `window_size`; a trailing
    partial window is zero-padded up to `window_size`.

    Args:
        signal: 1-D array of samples.
        window_size: length of each window in samples.
        stride: hop as a fraction of the window (0.5 -> 50% overlap).

    Returns:
        np.ndarray of shape [num_windows, window_size], dtype float32.
    """
    assert signal.ndim == 1, signal.ndim
    total = signal.shape[0]
    hop = int(window_size * stride)
    windows = []
    starts = range(0, total, hop)
    ends = range(window_size, total + hop, hop)
    for start, end in zip(starts, ends):
        chunk = signal[start:end]
        deficit = window_size - chunk.shape[0]
        if deficit > 0:
            # zero-pad the final short window up to full length
            chunk = np.pad(chunk, (0, deficit), 'constant', constant_values=0.0)
        if chunk.shape[0] == window_size:
            windows.append(chunk)
    return np.array(windows, dtype=np.float32)
|
|
|
|
|
def tf_merge_slices(slices: tf.Tensor) -> tf.Tensor:
    """Flatten windowed slices [B, W] back into one 1-D tensor [B * W]."""
    # tf.reshape(-1) is exactly what keras.backend.flatten does
    return tf.reshape(slices, [-1])
|
|
|
|
|
def merge_slices(slices: np.ndarray) -> np.ndarray:
    """Flatten windowed slices [B, W] back into one 1-D array [B * W]."""
    return np.asarray(slices).reshape(-1)
|
|
|
|
|
def normalize_audio_feature(audio_feature: np.ndarray, per_feature=False):
    """Zero-mean, unit-variance normalization of a [T, F] feature matrix.

    Args:
        audio_feature: feature array.
        per_feature: if True, normalize each feature column independently
            (statistics over axis 0); otherwise use global statistics.

    Returns:
        normalized array, same shape as input.
    """
    reduce_axis = 0 if per_feature else None
    centered = audio_feature - np.mean(audio_feature, axis=reduce_axis)
    # epsilon guards against division by zero on constant inputs
    scale = np.std(audio_feature, axis=reduce_axis) + 1e-9
    return centered / scale
|
|
|
|
|
def tf_normalize_audio_features(audio_feature: tf.Tensor, per_feature=False):
    """
    TF zero-mean, unit-variance feature normalization
    Args:
        audio_feature: tf.Tensor with shape [T, F]
        per_feature: if True, normalize each feature column independently

    Returns:
        normalized audio features with shape [T, F]
    """
    reduce_axis = 0 if per_feature else None
    centered = audio_feature - tf.reduce_mean(audio_feature, axis=reduce_axis)
    # epsilon avoids division by zero on constant inputs
    scale = tf.math.reduce_std(audio_feature, axis=reduce_axis) + 1e-9
    return centered / scale
|
|
|
|
|
def normalize_signal(signal: np.ndarray):
    """Scale a waveform so its peak magnitude is (almost exactly) 1."""
    peak = np.max(np.abs(signal))
    gain = 1.0 / (peak + 1e-9)  # epsilon protects against an all-zero signal
    return signal * gain
|
|
|
|
|
def tf_normalize_signal(signal: tf.Tensor):
    """
    TF peak normalization of a waveform to the [-1, 1] range
    Args:
        signal: tf.Tensor with shape [None]

    Returns:
        normalized signal with shape [None]
    """
    peak = tf.reduce_max(tf.abs(signal), axis=-1)
    gain = 1.0 / (peak + 1e-9)  # epsilon protects against an all-zero signal
    return signal * gain
|
|
|
|
|
def preemphasis(signal: np.ndarray, coeff=0.97):
    """Apply first-order pre-emphasis: y[n] = s[n] - coeff * s[n-1].

    The first sample is kept as-is. A falsy or non-positive `coeff`
    disables the filter and returns the signal unchanged.
    """
    if coeff is None or coeff <= 0.0:
        return signal
    emphasized = signal[1:] - coeff * signal[:-1]
    return np.append(signal[0], emphasized)
|
|
|
|
|
def tf_preemphasis(signal: tf.Tensor, coeff=0.97):
    """
    TF first-order pre-emphasis: y[n] = s[n] - coeff * s[n-1]
    Args:
        signal: tf.Tensor with shape [None]
        coeff: Float that indicates the preemphasis coefficient;
            None or <= 0 disables the filter

    Returns:
        pre-emphasized signal with shape [None]
    """
    if coeff is None or coeff <= 0.0:
        return signal
    # first sample passes through unchanged
    head = tf.expand_dims(signal[0], axis=-1)
    tail = signal[1:] - coeff * signal[:-1]
    return tf.concat([head, tail], axis=-1)
|
|
|
|
|
def depreemphasis(signal: np.ndarray, coeff=0.97):
    """Invert pre-emphasis via the recurrence x[n] = coeff * x[n-1] + s[n].

    A falsy or non-positive `coeff` returns the signal unchanged;
    otherwise the result is a float32 array of the same length.
    """
    if coeff is None or coeff <= 0.0:
        return signal
    restored = np.zeros(signal.shape[0], dtype=np.float32)
    restored[0] = signal[0]
    # sequential IIR recurrence — each sample depends on the previous output
    for n in range(1, restored.shape[0]):
        restored[n] = coeff * restored[n - 1] + signal[n]
    return restored
|
|
|
|
|
def tf_depreemphasis(signal: tf.Tensor, coeff=0.97):
    """
    TF Depreemphasis: inverts preemphasis per batch row via
    x[n] = coeff * x[n-1] + signal[n]
    Args:
        signal: tf.Tensor with shape [B, None]
        coeff: Float that indicates the preemphasis coefficient;
            None or <= 0 returns the signal unchanged

    Returns:
        depre-emphasized signal with shape [B, None]
    """
    if not coeff or coeff <= 0.0: return signal

    def map_fn(elem):
        # Sequential recurrence over one row; starts from the first sample.
        x = tf.expand_dims(elem[0], axis=-1)
        # NOTE(review): iterating range(1, elem.shape[0]) requires a static
        # (known-at-trace-time) length; this will fail for dynamic-length
        # signals in graph mode — confirm callers always pass static shapes.
        # Also O(T^2) due to repeated tf.concat growth.
        for n in range(1, elem.shape[0], 1):
            current = coeff * x[n - 1] + elem[n]
            x = tf.concat([x, [current]], axis=0)
        return x

    # apply the recurrence independently to each batch row
    return tf.map_fn(map_fn, signal)
|
|
|
|
|
class SpeechFeaturizer(metaclass=abc.ABCMeta):
    """Abstract base for speech feature extractors.

    We should use TFSpeechFeaturizer for training to avoid differences
    between tf and librosa when converting to tflite in post-training stage.
    """

    def __init__(self, speech_config: dict):
        """
        speech_config = {
            "sample_rate": int,
            "frame_ms": int,
            "stride_ms": int,
            "num_feature_bins": int,
            "feature_type": str,
            "delta": bool,
            "delta_delta": bool,
            "pitch": bool,
            "normalize_signal": bool,
            "normalize_feature": bool,
            "normalize_per_feature": bool
        }
        """
        self.sample_rate = speech_config.get("sample_rate", 16000)
        # frame/stride are configured in milliseconds; convert to samples
        self.frame_length = int(self.sample_rate * (speech_config.get("frame_ms", 25) / 1000))
        self.frame_step = int(self.sample_rate * (speech_config.get("stride_ms", 10) / 1000))

        self.num_feature_bins = speech_config.get("num_feature_bins", 80)
        self.feature_type = speech_config.get("feature_type", "log_mel_spectrogram")
        # preemphasis coefficient; None/<=0 disables the filter downstream
        self.preemphasis = speech_config.get("preemphasis", None)

        self.normalize_signal = speech_config.get("normalize_signal", True)
        self.normalize_feature = speech_config.get("normalize_feature", True)
        self.normalize_per_feature = speech_config.get("normalize_per_feature", False)

        # optional externally-supplied mel filterbank; None means "compute one"
        self.mel_filter = None

    @property
    def nfft(self) -> int:
        """ Number of FFT: the smallest power of two >= frame_length """
        return 2 ** (self.frame_length - 1).bit_length()

    @property
    def shape(self) -> list:
        """ The shape of extracted features """
        raise NotImplementedError()

    # NOTE: these were declared with the deprecated ``abc.abstractclassmethod``;
    # subclasses override them as plain instance methods, so ``abc.abstractmethod``
    # is the correct declaration (same enforcement, no spurious classmethod wrap).
    @abc.abstractmethod
    def stft(self, signal):
        """ Compute the power spectrogram of a signal """
        raise NotImplementedError()

    @abc.abstractmethod
    def power_to_db(self, S, ref=1.0, amin=1e-10, top_db=80.0):
        """ Convert a power spectrogram to decibel units """
        raise NotImplementedError()

    @abc.abstractmethod
    def extract(self, signal):
        """ Function to perform feature extraction """
        raise NotImplementedError()
|
|
|
|
|
class NumpySpeechFeaturizer(SpeechFeaturizer):
    """Feature extractor backed by librosa/numpy (offline/reference path).

    Mirrors TFSpeechFeaturizer; per the base-class note, prefer the TF
    version for training to avoid librosa/TF numeric differences.
    """

    def __init__(self, speech_config: dict):
        super(NumpySpeechFeaturizer, self).__init__(speech_config)
        # Extra channels advertised by `shape`.
        # NOTE(review): `extract` returns only the base [T, F] feature here;
        # presumably delta/delta_delta/pitch channels are stacked by a caller
        # — confirm against the rest of the project.
        self.delta = speech_config.get("delta", False)
        self.delta_delta = speech_config.get("delta_delta", False)
        self.pitch = speech_config.get("pitch", False)

    @property
    def shape(self) -> list:
        """ [time (unknown), num_feature_bins, channels] """
        # base feature is one channel; each enabled extra adds one more
        channel_dim = 1

        if self.delta:
            channel_dim += 1

        if self.delta_delta:
            channel_dim += 1

        if self.pitch:
            channel_dim += 1

        return [None, self.num_feature_bins, channel_dim]

    def stft(self, signal):
        """Power spectrogram |STFT|^2, librosa layout [freq, time]."""
        return np.square(
            np.abs(librosa.core.stft(signal, n_fft=self.nfft, hop_length=self.frame_step,
                                     win_length=self.frame_length, center=True, window="hann")))

    def power_to_db(self, S, ref=1.0, amin=1e-10, top_db=80.0):
        """Convert a power spectrogram to decibel units via librosa."""
        return librosa.power_to_db(S, ref=ref, amin=amin, top_db=top_db)

    def extract(self, signal: np.ndarray) -> np.ndarray:
        """Full pipeline: (optional) signal norm -> preemphasis -> feature
        -> (optional) feature norm. Returns a [T, num_feature_bins] array.
        """
        signal = np.asfortranarray(signal)
        if self.normalize_signal:
            signal = normalize_signal(signal)
        signal = preemphasis(signal, self.preemphasis)

        if self.feature_type == "mfcc":
            features = self.compute_mfcc(signal)
        elif self.feature_type == "log_mel_spectrogram":
            features = self.compute_log_mel_spectrogram(signal)
        elif self.feature_type == "spectrogram":
            features = self.compute_spectrogram(signal)
        elif self.feature_type == "log_gammatone_spectrogram":
            features = self.compute_log_gammatone_spectrogram(signal)
        else:
            raise ValueError("feature_type must be either 'mfcc', "
                             "'log_mel_spectrogram', 'log_gammatone_spectrogram' "
                             "or 'spectrogram'")

        if self.normalize_feature:
            features = normalize_audio_feature(features, per_feature=self.normalize_per_feature)

        return features

    def compute_pitch(self, signal: np.ndarray) -> np.ndarray:
        """Per-frame pitch estimates, [T, num_feature_bins]."""
        pitches, _ = librosa.core.piptrack(
            y=signal, sr=self.sample_rate,
            n_fft=self.nfft, hop_length=self.frame_step,
            fmin=0.0, fmax=int(self.sample_rate / 2), win_length=self.frame_length, center=True
        )

        # librosa returns [freq, time]; transpose to time-major
        pitches = pitches.T

        assert self.num_feature_bins <= self.frame_length // 2 + 1, \
            "num_features for spectrogram should \
            be <= (sample_rate * window_size // 2 + 1)"

        return pitches[:, :self.num_feature_bins]

    def compute_spectrogram(self, signal: np.ndarray) -> np.ndarray:
        """Log power spectrogram truncated to num_feature_bins, [T, F]."""
        powspec = self.stft(signal)
        features = self.power_to_db(powspec.T)

        assert self.num_feature_bins <= self.frame_length // 2 + 1, \
            "num_features for spectrogram should \
            be <= (sample_rate * window_size // 2 + 1)"

        # keep only the lowest num_feature_bins frequency bins
        features = features[:, :self.num_feature_bins]

        return features

    def compute_mfcc(self, signal: np.ndarray) -> np.ndarray:
        """MFCCs from a log-mel spectrogram, [T, num_feature_bins]."""
        S = self.stft(signal)

        mel = librosa.filters.mel(self.sample_rate, self.nfft,
                                  n_mels=self.num_feature_bins,
                                  fmin=0.0, fmax=int(self.sample_rate / 2))

        # S is [freq, time]; project frames onto the mel basis -> [T, mels]
        mel_spectrogram = np.dot(S.T, mel.T)

        mfcc = librosa.feature.mfcc(sr=self.sample_rate,
                                    S=self.power_to_db(mel_spectrogram).T,
                                    n_mfcc=self.num_feature_bins)

        return mfcc.T

    def compute_log_mel_spectrogram(self, signal: np.ndarray) -> np.ndarray:
        """Log-mel spectrogram, [T, num_feature_bins]."""
        S = self.stft(signal)

        mel = librosa.filters.mel(self.sample_rate, self.nfft,
                                  n_mels=self.num_feature_bins,
                                  fmin=0.0, fmax=int(self.sample_rate / 2))

        # S is [freq, time]; project frames onto the mel basis -> [T, mels]
        mel_spectrogram = np.dot(S.T, mel.T)

        return self.power_to_db(mel_spectrogram)

    def compute_log_gammatone_spectrogram(self, signal: np.ndarray) -> np.ndarray:
        """Log gammatone-filterbank spectrogram, [T, num_feature_bins]."""
        S = self.stft(signal)

        gammatone = fft_weights(self.nfft, self.sample_rate,
                                self.num_feature_bins, width=1.0,
                                fmin=0, fmax=int(self.sample_rate / 2),
                                maxlen=(self.nfft / 2 + 1))

        # fft_weights returns a TF tensor; convert for the numpy dot product
        gammatone = gammatone.numpy().astype(np.float32)

        gammatone_spectrogram = np.dot(S.T, gammatone)

        return self.power_to_db(gammatone_spectrogram)
|
|
|
|
|
class TFSpeechFeaturizer(SpeechFeaturizer):
    """Feature extractor implemented with pure TensorFlow ops (tflite-safe)."""

    @property
    def shape(self) -> list:
        """ [time (unknown), num_feature_bins, 1] """
        return [None, self.num_feature_bins, 1]

    def stft(self, signal):
        """Power spectrogram |STFT|^2, shape [T, nfft // 2 + 1].

        Reflect-padding and the centered, zero-padded Hann window mirror
        librosa's ``center=True`` STFT framing.
        """
        signal = tf.pad(signal, [[self.nfft // 2, self.nfft // 2]], mode="REFLECT")
        window = tf.signal.hann_window(self.frame_length, periodic=True)
        # center the frame_length window inside an nfft-long frame
        left_pad = (self.nfft - self.frame_length) // 2
        right_pad = self.nfft - self.frame_length - left_pad
        window = tf.pad(window, [[left_pad, right_pad]])
        framed_signals = tf.signal.frame(signal, frame_length=self.nfft, frame_step=self.frame_step)
        framed_signals *= window
        return tf.square(tf.abs(tf.signal.rfft(framed_signals, [self.nfft])))

    def power_to_db(self, S, ref=1.0, amin=1e-10, top_db=80.0):
        """TF port of ``librosa.power_to_db``.

        Args:
            S: power spectrogram tensor.
            ref: reference power, or a callable mapping S to one.
            amin: floor applied before taking the log (must be > 0).
            top_db: if not None, clip output to [max - top_db, max].

        Raises:
            ValueError: if amin <= 0 or top_db < 0.
        """
        if amin <= 0:
            raise ValueError('amin must be strictly positive')

        magnitude = S

        # builtin callable() replaces six.callable (identical on Python 3)
        if callable(ref):
            ref_value = ref(magnitude)
        else:
            ref_value = np.abs(ref)

        log_spec = 10.0 * log10(tf.maximum(amin, magnitude))
        log_spec -= 10.0 * log10(tf.maximum(amin, ref_value))

        if top_db is not None:
            if top_db < 0:
                raise ValueError('top_db must be non-negative')
            # clip everything more than top_db below the peak
            log_spec = tf.maximum(log_spec, tf.reduce_max(log_spec) - top_db)

        return log_spec

    def extract(self, signal: np.ndarray) -> np.ndarray:
        """Eager convenience wrapper around tf_extract; returns numpy."""
        signal = np.asfortranarray(signal)
        features = self.tf_extract(tf.convert_to_tensor(signal, dtype=tf.float32))
        return features.numpy()

    def tf_extract(self, signal: tf.Tensor) -> tf.Tensor:
        """
        Extract speech features from signals (for using in tflite)
        Args:
            signal: tf.Tensor with shape [None]

        Returns:
            features: tf.Tensor with shape [T, F]

        Raises:
            ValueError: if self.feature_type is not a supported type.
        """
        if self.normalize_signal:
            signal = tf_normalize_signal(signal)
        signal = tf_preemphasis(signal, self.preemphasis)

        if self.feature_type == "spectrogram":
            features = self.compute_spectrogram(signal)
        elif self.feature_type == "log_mel_spectrogram":
            features = self.compute_log_mel_spectrogram(signal)
        elif self.feature_type == "mfcc":
            features = self.compute_mfcc(signal)
        elif self.feature_type == "log_gammatone_spectrogram":
            features = self.compute_log_gammatone_spectrogram(signal)
        else:
            # message now lists every supported type (it previously omitted
            # 'log_gammatone_spectrogram'); matches NumpySpeechFeaturizer
            raise ValueError("feature_type must be either 'mfcc', "
                             "'log_mel_spectrogram', 'log_gammatone_spectrogram' "
                             "or 'spectrogram'")

        if self.normalize_feature:
            features = tf_normalize_audio_features(
                features, per_feature=self.normalize_per_feature)

        return features

    def compute_log_mel_spectrogram(self, signal):
        """Log-mel spectrogram [T, num_feature_bins]; uses self.mel_filter
        when one has been supplied via set_mel_filter, else a TF filterbank."""
        spectrogram = self.stft(signal)
        if self.mel_filter is None:
            linear_to_weight_matrix = tf.signal.linear_to_mel_weight_matrix(
                num_mel_bins=self.num_feature_bins,
                num_spectrogram_bins=spectrogram.shape[-1],
                sample_rate=self.sample_rate,
                lower_edge_hertz=0.0, upper_edge_hertz=(self.sample_rate / 2)
            )
        else:
            linear_to_weight_matrix = self.mel_filter

        mel_spectrogram = tf.tensordot(spectrogram, linear_to_weight_matrix, 1)
        return self.power_to_db(mel_spectrogram)

    def compute_spectrogram(self, signal):
        """Log power spectrogram truncated to num_feature_bins, [T, F]."""
        S = self.stft(signal)
        spectrogram = self.power_to_db(S)
        return spectrogram[:, :self.num_feature_bins]

    def compute_mfcc(self, signal):
        """MFCCs from the log-mel spectrogram, [T, num_feature_bins]."""
        log_mel_spectrogram = self.compute_log_mel_spectrogram(signal)
        return tf.signal.mfccs_from_log_mel_spectrograms(log_mel_spectrogram)

    def compute_log_gammatone_spectrogram(self, signal: tf.Tensor) -> tf.Tensor:
        """Log gammatone-filterbank spectrogram, [T, num_feature_bins].

        (Annotations corrected: this method takes and returns tf.Tensor,
        not np.ndarray.)
        """
        S = self.stft(signal)

        gammatone = fft_weights(self.nfft, self.sample_rate,
                                self.num_feature_bins, width=1.0,
                                fmin=0, fmax=int(self.sample_rate / 2),
                                maxlen=(self.nfft / 2 + 1))

        gammatone_spectrogram = tf.tensordot(S, gammatone, 1)

        return self.power_to_db(gammatone_spectrogram)

    def set_mel_filter(self, librosa_mel_filter):
        """
        Set librosa mel filter (used by compute_log_mel_spectrogram in place
        of the TF-computed filterbank).
        """
        self.mel_filter = librosa_mel_filter
|
|