#!/usr/bin/env python3 # -*- encoding: utf-8 -*- # Copyright FunASR (https://github.com/alibaba-damo-academy/FunASR). All Rights Reserved. # MIT License (https://opensource.org/licenses/MIT) # Modified from 3D-Speaker (https://github.com/alibaba-damo-academy/3D-Speaker) import scipy import torch import sklearn import hdbscan import numpy as np from sklearn.cluster._kmeans import k_means class SpectralCluster: r"""A spectral clustering mehtod using unnormalized Laplacian of affinity matrix. This implementation is adapted from https://github.com/speechbrain/speechbrain. """ def __init__(self, min_num_spks=1, max_num_spks=15, pval=0.022): self.min_num_spks = min_num_spks self.max_num_spks = max_num_spks self.pval = pval def __call__(self, X, oracle_num=None): # Similarity matrix computation sim_mat = self.get_sim_mat(X) # Refining similarity matrix with pval prunned_sim_mat = self.p_pruning(sim_mat) # Symmetrization sym_prund_sim_mat = 0.5 * (prunned_sim_mat + prunned_sim_mat.T) # Laplacian calculation laplacian = self.get_laplacian(sym_prund_sim_mat) # Get Spectral Embeddings emb, num_of_spk = self.get_spec_embs(laplacian, oracle_num) # Perform clustering labels = self.cluster_embs(emb, num_of_spk) return labels def get_sim_mat(self, X): # Cosine similarities M = sklearn.metrics.pairwise.cosine_similarity(X, X) return M def p_pruning(self, A): if A.shape[0] * self.pval < 6: pval = 6.0 / A.shape[0] else: pval = self.pval n_elems = int((1 - pval) * A.shape[0]) # For each row in a affinity matrix for i in range(A.shape[0]): low_indexes = np.argsort(A[i, :]) low_indexes = low_indexes[0:n_elems] # Replace smaller similarity values by 0s A[i, low_indexes] = 0 return A def get_laplacian(self, M): M[np.diag_indices(M.shape[0])] = 0 D = np.sum(np.abs(M), axis=1) D = np.diag(D) L = D - M return L def get_spec_embs(self, L, k_oracle=None): lambdas, eig_vecs = scipy.linalg.eigh(L) if k_oracle is not None: num_of_spk = k_oracle else: lambda_gap_list = self.getEigenGaps( lambdas[self.min_num_spks - 1 : self.max_num_spks + 1] ) num_of_spk = np.argmax(lambda_gap_list) + self.min_num_spks emb = eig_vecs[:, :num_of_spk] return emb, num_of_spk def cluster_embs(self, emb, k): _, labels, _ = k_means(emb, k) return labels def getEigenGaps(self, eig_vals): eig_vals_gap_list = [] for i in range(len(eig_vals) - 1): gap = float(eig_vals[i + 1]) - float(eig_vals[i]) eig_vals_gap_list.append(gap) return eig_vals_gap_list class UmapHdbscan: r""" Reference: - Siqi Zheng, Hongbin Suo. Reformulating Speaker Diarization as Community Detection With Emphasis On Topological Structure. ICASSP2022 """ def __init__( self, n_neighbors=20, n_components=60, min_samples=10, min_cluster_size=10, metric="cosine", ): self.n_neighbors = n_neighbors self.n_components = n_components self.min_samples = min_samples self.min_cluster_size = min_cluster_size self.metric = metric def __call__(self, X): import umap.umap_ as umap umap_X = umap.UMAP( n_neighbors=self.n_neighbors, min_dist=0.0, n_components=min(self.n_components, X.shape[0] - 2), metric=self.metric, ).fit_transform(X) labels = hdbscan.HDBSCAN( min_samples=self.min_samples, min_cluster_size=self.min_cluster_size, allow_single_cluster=True, ).fit_predict(umap_X) return labels class ClusterBackend(torch.nn.Module): r"""Perfom clustering for input embeddings and output the labels. Args: model_dir: A model dir. model_config: The model config. """ def __init__(self): super().__init__() self.model_config = {"merge_thr": 0.78} # self.other_config = kwargs self.spectral_cluster = SpectralCluster() self.umap_hdbscan_cluster = UmapHdbscan() def forward(self, X, **params): # clustering and return the labels k = params["oracle_num"] if "oracle_num" in params else None assert ( len(X.shape) == 2 ), "modelscope error: the shape of input should be [N, C]" if X.shape[0] < 20: return np.zeros(X.shape[0], dtype="int") if X.shape[0] < 2048 or k is not None: # unexpected corner case labels = self.spectral_cluster(X, k) else: labels = self.umap_hdbscan_cluster(X) if k is None and "merge_thr" in self.model_config: labels = self.merge_by_cos(labels, X, self.model_config["merge_thr"]) return labels def merge_by_cos(self, labels, embs, cos_thr): # merge the similar speakers by cosine similarity assert cos_thr > 0 and cos_thr <= 1 while True: spk_num = labels.max() + 1 if spk_num == 1: break spk_center = [] for i in range(spk_num): spk_emb = embs[labels == i].mean(0) spk_center.append(spk_emb) assert len(spk_center) > 0 spk_center = np.stack(spk_center, axis=0) norm_spk_center = spk_center / np.linalg.norm( spk_center, axis=1, keepdims=True ) affinity = np.matmul(norm_spk_center, norm_spk_center.T) affinity = np.triu(affinity, 1) spks = np.unravel_index(np.argmax(affinity), affinity.shape) if affinity[spks] < cos_thr: break for i in range(len(labels)): if labels[i] == spks[1]: labels[i] = spks[0] elif labels[i] > spks[1]: labels[i] -= 1 return labels