"""The Sampling class serves as a helper module for retrieving subject model data."""

from abc import ABC, abstractmethod

import gc
import math
import os
import time

import numpy as np
from pynndescent import NNDescent
from scipy.special import gamma
from sklearn.cluster import KMeans
from sklearn.neighbors import NearestNeighbors

"""
DataContainer module

1. calculate the information entropy of a single sample and of a subset
2. sample an informative subset
"""


class SamplingAbstractClass(ABC):

    def __init__(self, data_provider, epoch):
        self.mode = "abstract"
        self.data_provider = data_provider
        self.epoch = epoch

    @abstractmethod
    def info_calculator(self):
        pass


class Sampling(SamplingAbstractClass):

    def __init__(self, data_provider, epoch, device):
        # initialize the fields shared with the abstract base class
        super().__init__(data_provider, epoch)
        self.DEVICE = device

    def probability_density_cal(self, X, dim, k):
        """
        Calculate a probability density estimate for each sample.

        :param X: numpy.ndarray, sample matrix
        :param dim: int, dimension used for the ball-volume term
        :param k: int, number of nearest neighbors
        :return: (probabilities, volumes, variances, r_col)
        """
        print("start calculating the nbrs")

        # construct the 1-skeleton of a Vietoris-Rips complex via an
        # approximate k-nearest-neighbor graph (NNDescent)
        n_trees = min(64, 5 + int(round((X.shape[0]) ** 0.5 / 20.0)))
        n_iters = max(5, int(round(np.log2(X.shape[0]))))
        metric = "euclidean"

        nnd = NNDescent(
            X,
            n_neighbors=k,
            metric=metric,
            n_trees=n_trees,
            n_iters=n_iters,
            max_candidates=60,
            verbose=True,
        )
        # indices has shape (n_samples, k) and each row of distances is sorted
        # ascending, so distances[:, -1] is the k-th nearest-neighbor radius
        indices, distances = nnd.neighbor_graph
        print("finished calculating the nbrs")

        pred = self.data_provider.get_pred(self.epoch, X)
        d = dim
        volumes = []
        variances = []
        r_col = []
        print("start calculating the volumes and variances")
        for i in range(len(X)):
            r = distances[i, -1]
            # volume of the d-dimensional ball of radius r enclosing the k
            # nearest neighbors (the epsilon avoids a zero volume)
            V = (math.pi ** (d / 2) / gamma(d / 2 + 1)) * (r ** d) + 1e-8
            volumes.append(V)
            r_col.append(r)

            # variance of the subject model's predictions over the neighborhood
            neighbor_indices = indices[i]
            neighbor_preds = pred[neighbor_indices]
            flatten_preds = neighbor_preds.flatten()
            variance = np.var(flatten_preds)
            variances.append(variance)
        print("finished calculating the volumes and variances")

        n = len(X)
        probabilities = []
        for i in range(len(X)):
            # k-NN density proxy: large where the neighborhood radius and the
            # local prediction variance are both small (the epsilon avoids a
            # division by zero when all neighbor predictions agree)
            p = k / (n * r_col[i] * (variances[i] + 1e-8))
            probabilities.append(p)

        return probabilities, volumes, variances, r_col
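
    # Worked example of the ball-volume factor above: for d = 2,
    # pi**(d/2) / gamma(d/2 + 1) equals pi, so V is the familiar disc area
    # pi * r**2; for d = 3 the factor is (4/3) * pi, giving (4/3) * pi * r**3.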

    def info_calculator(self):
        data = self.data_provider.train_representation(self.epoch)
        # TODO: compute the per-sample / subset information from `data`

    def clustering(self, data, n_clusters):
        """Run k-means on `data` and return (labels, cluster_centers)."""
        kmeans = KMeans(n_clusters=n_clusters, random_state=0)
        kmeans.fit(data)
        labels = kmeans.labels_
        centers = kmeans.cluster_centers_
        return labels, centers

    def space_split(self, data):
        """Partition `data` into clusters that are homogeneous in space and in prediction."""
        self.pred = self.data_provider.get_pred(self.epoch, data)
        n_clusters = 10
        cluster_idx = n_clusters
        print("clustering....")
        labels, centers = self.clustering(data, n_clusters=n_clusters)
        print("clustering finished")

        for i in range(n_clusters):
            subset_indices = np.where(labels == i)[0]
            subset = data[subset_indices]
            info = self.subset_info_cal(subset, centers[i])
            print("info", info)
            if info > 0:
                # the cluster still carries information: bisect it recursively
                labels, new_cluster_idx = self.split(data, subset_indices, labels, cluster_idx)
                cluster_idx = new_cluster_idx

        return labels
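
    # Net effect of space_split + split: the 10 seed clusters are recursively
    # bisected until every leaf is tight both spatially (all points within m
    # of its center) and in model output (all predictions within n of the
    # center's), so each final label marks a roughly homogeneous region.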

    def split(self, org_data, indices, labels, cluster_idx, m=1.5, n=1.5):
        """Recursively bisect the cluster given by `indices` until each leaf is uninformative."""
        data = org_data[indices]
        print("data.shape", data.shape)

        sub_labels, centers = self.clustering(data, n_clusters=2)

        info = []
        for i in range(2):
            subset = data[sub_labels == i]
            info_i = self.subset_info_cal(subset, centers[i], m, n)
            info.append(info_i)

        for i in range(2):
            subset_indices = indices[sub_labels == i]
            if info[i] > 0:
                # still informative: split again under a fresh cluster id
                labels, cluster_idx = self.split(org_data, subset_indices, labels, cluster_idx + 1, m, n)
            else:
                # homogeneous enough: assign a final cluster id to this leaf
                labels[subset_indices] = cluster_idx
                cluster_idx += 1

        return labels, cluster_idx

    def subset_info_cal(self, data, center_data, m=1.5, n=1.5):
        """
        Use information theory to quantify the information of a subset:
            information = -log(p(d < m)) - log(p(a < n))
        where d is the distance to the cluster center and a is the absolute
        difference between a sample's prediction and the center's prediction.
        """
        dists = np.sqrt(np.sum((data - center_data) ** 2, axis=1))
        preds = self.data_provider.get_pred(self.epoch, data)
        pred_i = self.data_provider.get_pred(self.epoch, np.array([center_data]))
        diffs = np.abs(preds - pred_i[0])

        # the epsilons guard against log(0) when no sample falls in a band
        p_d = np.mean(dists < m) + 1e-8
        p_a = np.mean(diffs < n) + 1e-8
        print("p_d", p_d, "p_a", p_a)

        info = -np.log(p_d) - np.log(p_a)
        return info
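
    # Numeric check of the stopping rule: with the 1e-8 offsets, info only
    # drops below zero when p_d = p_a = 1, i.e. every point lies within m of
    # the center and within n of its prediction (info = -2*log(1 + 1e-8) < 0).
    # If even 10% of points miss either band, e.g. p_d = 0.9, then
    # info >= -log(0.9) ~= 0.105 > 0 and the cluster is split further.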

    def sample_data(self, data, sample_ratio=0.2):
        """Stratified sampling: draw `sample_ratio` of each cluster found by space_split."""
        all_indices = []
        labels = self.space_split(data)
        unique_labels = np.unique(labels)

        for label in unique_labels:
            indices = np.where(labels == label)[0]
            sample_size = int(len(indices) * sample_ratio)
            # keep every sample from clusters too small to subsample
            if sample_size == 0 and len(indices) > 0:
                sample_size = len(indices)
            sampled_indices = np.random.choice(indices, size=sample_size, replace=False)
            all_indices.append(sampled_indices)

        return np.concatenate(all_indices)
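

# A minimal usage sketch (hypothetical, for illustration only). Any object
# exposing `get_pred(epoch, data)` and `train_representation(epoch)` with the
# shapes used above can stand in for the project's real data provider here.
if __name__ == "__main__":
    class MockDataProvider:
        """Hypothetical provider: random representations, sum-of-features preds."""

        def train_representation(self, epoch):
            rng = np.random.default_rng(0)
            return rng.random((1000, 8))

        def get_pred(self, epoch, data):
            # stand-in for subject-model outputs: one score per sample
            return np.sum(data, axis=1, keepdims=True)

    provider = MockDataProvider()
    sampler = Sampling(provider, epoch=1, device="cpu")
    X = provider.train_representation(1)

    # density estimation over the representation space
    probs, volumes, variances, radii = sampler.probability_density_cal(X, dim=8, k=15)
    print("max density:", max(probs))

    # informative stratified subsample (~20% of each homogeneous cluster)
    picked = sampler.sample_data(X, sample_ratio=0.2)
    print("sampled", len(picked), "of", len(X), "points")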