Spaces:
Runtime error
Runtime error
import os | |
from pathlib import Path | |
from typing import Union | |
import kaldi_native_fbank as knf | |
import numpy as np | |
import onnxruntime | |
from paraformer.runtime.python.utils.audioHelper import AudioReader | |
from paraformer.runtime.python.utils.singleton import singleton | |
class Campplus: | |
def __init__(self, onnx_path=None, threshold=0.5): | |
""" | |
:param onnx_path: onnx model file path | |
:param threshold: threshold of speaker embedding similarity | |
""" | |
self.onnx = onnx_path or os.path.join( | |
os.path.dirname( | |
os.path.dirname( | |
os.path.dirname(os.path.dirname(os.path.dirname(__file__))) | |
) | |
), | |
"onnx/sv/campplus.onnx", | |
) | |
cpu_ep = "CPUExecutionProvider" | |
cpu_provider_options = { | |
"arena_extend_strategy": "kSameAsRequested", | |
} | |
self.sess = onnxruntime.InferenceSession( | |
self.onnx, | |
providers=[ | |
(cpu_ep, cpu_provider_options), | |
], | |
) | |
self.output_name = [nd.name for nd in self.sess.get_outputs()] | |
self.threshhold = threshold | |
self.memory: np.ndarray = None | |
def compute_cos_similarity(self, emb): | |
assert len(emb.shape) == 2, "emb must be length * 80" | |
cos_sim = emb.dot(self.memory.T) / ( | |
np.linalg.norm(emb) * np.linalg.norm(self.memory, axis=1) | |
) | |
cos_sim[np.isneginf(cos_sim)] = 0 | |
return 0.5 + 0.5 * cos_sim | |
def register_speaker(self, emb: np.ndarray): | |
""" | |
register speaker with embedding and name | |
:param emb: | |
:param name: speaker name | |
:return: | |
""" | |
assert len(emb.shape) == 2, "emb must be length * 80" | |
self.memory = np.concatenate( | |
( | |
self.memory, | |
emb, | |
) | |
) | |
def extract_feature(self, audio: Union[str, Path, bytes], sample_rate=16000): | |
if isinstance(audio, str) or isinstance(audio, Path): | |
waveform, sample_rate = AudioReader.read_wav_file(audio) | |
elif isinstance(audio, np.ndarray): | |
waveform = audio | |
opts = knf.FbankOptions() | |
opts.frame_opts.samp_freq = float(sample_rate) | |
opts.frame_opts.dither = 0.0 | |
opts.energy_floor = 1.0 | |
opts.mel_opts.num_bins = 80 | |
fbank_fn = knf.OnlineFbank(opts) | |
fbank_fn.accept_waveform(sample_rate, waveform.tolist()) | |
frames = fbank_fn.num_frames_ready | |
mat = np.empty([frames, opts.mel_opts.num_bins]) | |
for i in range(frames): | |
mat[i, :] = fbank_fn.get_frame(i) | |
feature = mat.astype(np.float32) | |
feature = feature - feature.mean() | |
feature = feature[None, ...] | |
return feature | |
def embedding(self, feature: np.ndarray): | |
feed_dict = {"fbank": feature} | |
output = self.sess.run(self.output_name, input_feed=feed_dict) | |
return output | |
def recognize(self, waveform: Union[str, Path, bytes], threshold=0.65): | |
""" | |
auto register speaker with input waveform。 | |
input waveform, output speaker id , id in range 0,1,2,....,n | |
:param waveform: | |
:return index: if max similarity less than threshold, it will add current emb into memory | |
""" | |
feature = self.extract_feature(waveform) | |
emb = self.embedding(feature)[0] | |
if self.memory is None: | |
self.memory = emb / np.linalg.norm(emb) | |
return 0 | |
sim = self.compute_cos_similarity(emb)[0] | |
max_sim_index = np.argmax(sim) | |
if sim[max_sim_index] <= threshold: | |
self.register_speaker(emb) | |
return max_sim_index | |