| | """ |
| | Help functions to evaluate our visualization system |
| | """ |
| |
|
| | import numpy as np |
| | from pynndescent import NNDescent |
| | from sklearn.neighbors import NearestNeighbors |
| | from sklearn.manifold import trustworthiness |
| | from scipy.stats import kendalltau, spearmanr, pearsonr, rankdata |
| |
|
| |
|
| | def evaluate_isAlign(embeddingLeft, embeddingRight, align_metric=1): |
| | lens = len(embeddingLeft) |
| | align_indices = [] |
| | for i in range(lens): |
| | dist = np.linalg.norm( embeddingLeft[i]-embeddingRight[i]) |
| | if dist < align_metric: |
| | align_indices.append(i) |
| | return align_indices |
| |
|
| |
|
| | def evaluate_isAlign_single(embeddingLeft, embeddingRight, selected_left, selected_right,align_metric=1): |
| | lens = len(embeddingLeft) |
| | align_indices_left = [] |
| | align_indices_right = [] |
| |
|
| | if selected_left != -1: |
| | for i in range(lens): |
| | dist = np.linalg.norm( embeddingLeft[selected_left]-embeddingRight[i]) |
| | if dist < align_metric: |
| | align_indices_right.append(i) |
| | if selected_right != -1: |
| | for i in range(lens): |
| | dist = np.linalg.norm( embeddingLeft[i]-embeddingRight[selected_right]) |
| | if dist < align_metric: |
| | align_indices_left.append(i) |
| |
|
| | return align_indices_left, align_indices_right |
| | |
| | def evaluate_isNearestNeighbour(embeddingLeft, embeddingRight, n_neighbors=15, metric="euclidean"): |
| | """ |
| | Find indices where none of the nearest neighbors in embeddingLeft are preserved in embeddingRight. |
| | :param embeddingLeft: ndarray, first set of low dimensional representations |
| | :param embeddingRight: ndarray, second set of low dimensional representations |
| | :param n_neighbors: int, number of nearest neighbors to consider |
| | :param metric: str, metric for nearest neighbor calculation, default "euclidean" |
| | :return: list of indices where none of the neighbors are preserved |
| | """ |
| | n_trees = 5 + int(round((embeddingLeft.shape[0]) ** 0.5 / 20.0)) |
| | n_iters = max(5, int(round(np.log2(embeddingLeft.shape[0])))) |
| |
|
| | nnd_left = NNDescent( |
| | embeddingLeft, |
| | n_neighbors=n_neighbors, |
| | metric=metric, |
| | n_trees=n_trees, |
| | n_iters=n_iters, |
| | max_candidates=60, |
| | verbose=True |
| | ) |
| | left_neighbors, _ = nnd_left.neighbor_graph |
| |
|
| | nnd_right = NNDescent( |
| | embeddingRight, |
| | n_neighbors=n_neighbors, |
| | metric=metric, |
| | n_trees=n_trees, |
| | n_iters=n_iters, |
| | max_candidates=60, |
| | verbose=True |
| | ) |
| | right_neighbors, _ = nnd_right.neighbor_graph |
| |
|
| | non_preserved_indices = [] |
| | for i in range(len(embeddingLeft)): |
| | if len(np.intersect1d(left_neighbors[i], right_neighbors[i])) == 0: |
| | non_preserved_indices.append(i) |
| |
|
| | return non_preserved_indices |
| |
|
| | def evaluate_isNearestNeighbour_single(embeddingLeft, embeddingRight, selected_left, selected_right, n_neighbors=15, metric="euclidean"): |
| |
|
| | n_trees = 5 + int(round((embeddingLeft.shape[0]) ** 0.5 / 20.0)) |
| | n_iters = max(5, int(round(np.log2(embeddingLeft.shape[0])))) |
| |
|
| | nnd_left = NNDescent( |
| | embeddingLeft, |
| | n_neighbors=n_neighbors, |
| | metric=metric, |
| | n_trees=n_trees, |
| | n_iters=n_iters, |
| | max_candidates=60, |
| | verbose=True |
| | ) |
| | left_neighbors, _ = nnd_left.neighbor_graph |
| |
|
| | nnd_right = NNDescent( |
| | embeddingRight, |
| | n_neighbors=n_neighbors, |
| | metric=metric, |
| | n_trees=n_trees, |
| | n_iters=n_iters, |
| | max_candidates=60, |
| | verbose=True |
| | ) |
| | right_neighbors, _ = nnd_right.neighbor_graph |
| |
|
| | nearest_neighbors_leftE_rightS = [] |
| | nearest_neighbors_rightE_rightS = [] |
| | nearest_neighbors_leftE_leftS = [] |
| | nearest_neighbors_rightE_leftS = [] |
| | if selected_left != -1: |
| | nearest_neighbors_leftE_leftS = left_neighbors[selected_left].tolist() |
| | nearest_neighbors_rightE_leftS = right_neighbors[selected_left].tolist() |
| |
|
| | if selected_right != -1: |
| |
|
| | nearest_neighbors_leftE_rightS = left_neighbors[selected_right].tolist() |
| | nearest_neighbors_rightE_rightS = right_neighbors[selected_right].tolist() |
| |
|
| | return nearest_neighbors_leftE_leftS, nearest_neighbors_leftE_rightS, nearest_neighbors_rightE_leftS, nearest_neighbors_rightE_rightS |
| |
|
| | def evaluate_proj_nn_perseverance_knn(data, embedding, n_neighbors, metric="euclidean", selected_indices=None): |
| | """ |
| | evaluate projection function, nn preserving property using knn algorithm |
| | :param data: ndarray, high dimensional representations |
| | :param embedding: ndarray, low dimensional representations |
| | :param n_neighbors: int, the number of neighbors |
| | :param metric: str, by default "euclidean" |
| | :return nn property: float, nn preserving property |
| | """ |
| | n_trees = 5 + int(round((data.shape[0]) ** 0.5 / 20.0)) |
| | n_iters = max(5, int(round(np.log2(data.shape[0])))) |
| | |
| | nnd = NNDescent( |
| | data, |
| | n_neighbors=n_neighbors, |
| | metric=metric, |
| | n_trees=n_trees, |
| | n_iters=n_iters, |
| | max_candidates=60, |
| | verbose=True |
| | ) |
| | high_ind, _ = nnd.neighbor_graph |
| | nnd = NNDescent( |
| | embedding, |
| | n_neighbors=n_neighbors, |
| | metric=metric, |
| | n_trees=n_trees, |
| | n_iters=n_iters, |
| | max_candidates=60, |
| | verbose=True |
| | ) |
| | low_ind, _ = nnd.neighbor_graph |
| | lens_data = len(data) |
| |
|
| | if selected_indices: |
| | lens_data = len(selected_indices) |
| | high_ind = high_ind[selected_indices] |
| | low_ind = low_ind[selected_indices] |
| |
|
| | border_pres = np.zeros(lens_data) |
| | for i in range(lens_data): |
| | border_pres[i] = len(np.intersect1d(high_ind[i], low_ind[i])) |
| |
|
| | |
| | return border_pres.mean() |
| |
|
| | from sklearn.metrics import pairwise_distances |
| | import numpy as np |
| |
|
| | def subset_trustworthiness(X, X_subset, X_embedded, X_subset_embedded, n_neighbors=5): |
| | |
| | dist_X = pairwise_distances(X) |
| | dist_X_embedded = pairwise_distances(X_embedded) |
| |
|
| | |
| | subset_indices = np.where(np.isin(X, X_subset))[0] |
| |
|
| | |
| | neighbors_X = np.argsort(dist_X)[:, 1:n_neighbors+1] |
| | subset_neighbors_X = neighbors_X[subset_indices] |
| |
|
| | |
| | neighbors_X_embedded = np.argsort(dist_X_embedded)[:, 1:n_neighbors+1] |
| | subset_neighbors_X_embedded = neighbors_X_embedded[subset_indices] |
| |
|
| | |
| | trustworthiness_val = 0 |
| | for i, subset_index in enumerate(subset_indices): |
| | for j, neighbor in enumerate(subset_neighbors_X_embedded[i]): |
| | rank_in_X = np.where(neighbors_X[subset_index] == neighbor)[0][0] |
| | if rank_in_X >= n_neighbors: |
| | trustworthiness_val += rank_in_X - n_neighbors + 1 |
| |
|
| | trustworthiness_val = trustworthiness_val / (X_subset.shape[0] * n_neighbors) |
| | trustworthiness_val = 1 - (2. / n_neighbors) * trustworthiness_val |
| |
|
| | return trustworthiness_val |
| |
|
| |
|
| |
|
| |
|
| |
|
| |
|
| | def evaluate_proj_nn_perseverance_trustworthiness(data, embedding, n_neighbors, metric="euclidean"): |
| | """ |
| | evaluate projection function, nn preserving property using trustworthiness formula |
| | :param data: ndarray, high dimensional representations |
| | :param embedding: ndarray, low dimensional representations |
| | :param n_neighbors: int, the number of neighbors |
| | :param metric: str, by default "euclidean" |
| | :return nn property: float, nn preserving property |
| | """ |
| | t = trustworthiness(data, embedding, n_neighbors=n_neighbors, metric=metric) |
| | return t |
| |
|
| |
|
| | def evaluate_proj_boundary_perseverance_knn(data, embedding, high_centers, low_centers, n_neighbors): |
| | """ |
| | evaluate projection function, boundary preserving property |
| | :param data: ndarray, high dimensional representations |
| | :param embedding: ndarray, low dimensional representations |
| | :param high_centers: ndarray, border points high dimensional representations |
| | :param low_centers: ndarray, border points low dimensional representations |
| | :param n_neighbors: int, the number of neighbors |
| | :return boundary preserving property: float,boundary preserving property |
| | """ |
| | high_neigh = NearestNeighbors(n_neighbors=n_neighbors, radius=0.4) |
| | high_neigh.fit(high_centers) |
| | high_ind = high_neigh.kneighbors(data, n_neighbors=n_neighbors, return_distance=False) |
| |
|
| | low_neigh = NearestNeighbors(n_neighbors=n_neighbors, radius=0.4) |
| | low_neigh.fit(low_centers) |
| | low_ind = low_neigh.kneighbors(embedding, n_neighbors=n_neighbors, return_distance=False) |
| |
|
| | border_pres = np.zeros(len(data)) |
| | for i in range(len(data)): |
| | border_pres[i] = len(np.intersect1d(high_ind[i], low_ind[i])) |
| |
|
| | |
| | return border_pres.mean() |
| |
|
| |
|
| | def evaluate_proj_temporal_perseverance_corr(alpha, delta_x): |
| | """ |
| | Evaluate temporal preserving property, |
| | calculate the correlation between neighbor preserving rate and moving distance in low dim in a time sequence |
| | :param alpha: ndarray, shape(N,) neighbor preserving rate |
| | :param delta_x: ndarray, shape(N,), moved distance in low dim for each point |
| | :return corr: ndarray, shape(N,), correlation for each point from temporal point of view |
| | """ |
| | alpha = alpha.T |
| | delta_x = delta_x.T |
| | shape = alpha.shape |
| | data_num = shape[0] |
| | corr = np.zeros(data_num) |
| | for i in range(data_num): |
| | |
| | correlation, pvalue = pearsonr(alpha[i], delta_x[i]) |
| | if np.isnan(correlation): |
| | correlation = 0.0 |
| | corr[i] = correlation |
| | return corr.mean(), corr.std() |
| |
|
| |
|
| | def evaluate_inv_distance(data, inv_data): |
| | """ |
| | The distance between original data and reconstruction data |
| | :param data: ndarray, high dimensional data |
| | :param inv_data: ndarray, reconstruction data |
| | :return err: mse, reconstruction error |
| | """ |
| | return np.linalg.norm(data-inv_data, axis=1).mean() |
| |
|
| |
|
| | def evaluate_inv_accu(labels, pred): |
| | """ |
| | prediction accuracy of reconstruction data |
| | :param labels: ndarray, shape(N,), label for each point |
| | :param pred: ndarray, shape(N,), prediction for each point |
| | :return accu: float, the reconstruction accuracy |
| | """ |
| | return np.sum(labels == pred) / len(labels) |
| |
|
| |
|
| | def evaluate_inv_conf(labels, ori_pred, new_pred): |
| | """ |
| | the confidence difference between original data and reconstruction data |
| | :param labels: ndarray, shape(N,), the original prediction for each point |
| | :param ori_pred: ndarray, shape(N,10), the prediction of original data |
| | :param new_pred: ndarray, shape(N,10), the prediction of reconstruction data |
| | :return diff: float, the mean of confidence difference for each point |
| | """ |
| | old_conf = [ori_pred[i, labels[i]] for i in range(len(labels))] |
| | new_conf = [new_pred[i, labels[i]] for i in range(len(labels))] |
| | old_conf = np.array(old_conf) |
| | new_conf = np.array(new_conf) |
| |
|
| | diff = np.abs(old_conf - new_conf) |
| | |
| | return diff.mean() |
| |
|
| |
|
| | def evaluate_proj_temporal_perseverance_entropy(alpha, delta_x): |
| | """ |
| | (discard) |
| | calculate the temporal preserving property |
| | based on the correlation between the entropy of moved distance and neighbor preserving rate(alpha) |
| | :param alpha: ndarray, shape(N,), neighbor preserving rate for each point |
| | :param delta_x: ndarray, shape(N,), the moved distance in low dim for each point |
| | :return corr: float, the mean of all correlation |
| | """ |
| | alpha = alpha.T |
| | delta_x = delta_x.T |
| | shape = alpha.shape |
| | data_num = shape[0] |
| | |
| | |
| | |
| | delta_x_norm = delta_x.max() |
| | delta_x_norm = delta_x / delta_x_norm |
| |
|
| | alpha = np.floor(alpha*10) |
| | delta_x_norm = np.floor(delta_x_norm*10) |
| |
|
| | corr = np.zeros(len(alpha)) |
| | |
| | for i in range(len(alpha)): |
| | |
| | index = list() |
| | entropy = list() |
| | for j in range(11): |
| | dx = delta_x_norm[i][np.where(alpha[i] == j)] |
| | entropy_x = np.zeros(11) |
| | for k in range(11): |
| | entropy_x[k] = np.sum(dx == k) |
| | if np.sum(entropy_x) == 0: |
| | continue |
| | else: |
| | entropy_x = entropy_x / np.sum(entropy_x+10e-8) |
| | entropy_x = np.sum(entropy_x*np.log(entropy_x+10e-8)) |
| | entropy.append(-entropy_x) |
| | index.append(j) |
| | if len(index) < 2: |
| | print("no enough data to form a correlation, setting correlation to be 0") |
| | corr[i] = 0 |
| | else: |
| | correlation, _ = pearsonr(index, entropy) |
| | corr[i] = correlation |
| |
|
| | return corr.mean() |
| |
|
| |
|
| | def evaluate_proj_temporal_global_corr(high_rank, low_rank): |
| | l = len(high_rank) |
| | tau_l = np.zeros(l) |
| | p_l = np.zeros(l) |
| | for i in range(l): |
| | r1 = high_rank[i] |
| | r2 = low_rank[i] |
| | tau, p = spearmanr(r1, r2) |
| | tau_l[i] = tau |
| | p_l[i] = p |
| | return tau_l, p_l |
| |
|
| |
|
| | def _wcov(x, y, w, ms): |
| | return np.sum(w * (x - ms[0]) * (y - ms[1])) |
| | def _wpearson(x, y, w): |
| | mx, my = (np.sum(i * w) / np.sum(w) for i in [x, y]) |
| | return _wcov(x, y, w, [mx, my]) / np.sqrt(_wcov(x, x, w, [mx, mx]) * _wcov(y, y, w, [my, my])) |
| | def evaluate_proj_temporal_weighted_global_corr(high_rank, low_rank): |
| | k = len(high_rank) |
| | r = rankdata(high_rank).astype("int")-1 |
| | tau = _wpearson(high_rank[r], low_rank[r], 1/np.arange(1, k+1)) |
| | return tau |
| |
|
| |
|
| | def evaluate_keep_B(low_B, grid_view, decision_view, threshold=0.8): |
| | """ |
| | evaluate whether high dimensional boundary points still lying on Boundary in low-dimensional space or not |
| | find the nearest grid point of boundary points, and check whether the color of corresponding grid point is white or not |
| | |
| | :param low_B: ndarray, (n, 2), low dimension position of boundary points |
| | :param grid_view: ndarray, (resolution^2, 2), the position array of grid points |
| | :param decision_view: ndarray, (resolution^2, 3), the RGB color of grid points |
| | :param threshold: |
| | :return: |
| | """ |
| | if len(low_B) == 0 or low_B is None: |
| | return .0 |
| | |
| | grid_view = grid_view.reshape(-1, 2) |
| | decision_view = decision_view.reshape(-1, 3) |
| |
|
| | |
| | nbs = NearestNeighbors(n_neighbors=1, algorithm="ball_tree").fit(grid_view) |
| | _, indices = nbs.kneighbors(low_B) |
| | indices = indices.squeeze() |
| | sample_colors = decision_view[indices] |
| |
|
| | |
| | c1 = np.zeros(indices.shape[0], dtype=np.bool) |
| | c1[sample_colors[:, 0] > threshold] = 1 |
| |
|
| | c2 = np.zeros(indices.shape[0], dtype=np.bool) |
| | c2[sample_colors[:, 1] > threshold] = 1 |
| |
|
| | c3 = np.zeros(indices.shape[0], dtype=np.bool) |
| | c3[sample_colors[:, 2] > threshold] = 1 |
| | c = np.logical_and(c1, c2) |
| | c = np.logical_and(c, c3) |
| |
|
| | |
| | return np.sum(c)/len(c) |