"""
Metrics.
"""
from joblib import cpu_count, parallel_backend, Parallel, delayed
import numpy as np
import pandas as pd
from typing import Dict, Iterable, Tuple, Any
from scipy.stats import chi2
from scipy.special import binom
from sklearn.metrics import (
normalized_mutual_info_score, recall_score, precision_score, auc
)
from networkx import shortest_path_length
from scipy.stats import pearsonr
from sklearn.metrics.pairwise import pairwise_distances
from cassiopeia.data import CassiopeiaTree
from .utils import rescale
from ..pp.kNN import kNN_graph
##
def chunker(n):
"""
Create an np.array of starting indeces for parallel computation.
"""
n_jobs = cpu_count()
starting_indeces = np.zeros(n_jobs + 1, dtype=int)
quotient = n // n_jobs
remainder = n % n_jobs
for i in range(n_jobs):
starting_indeces[i+1] = starting_indeces[i] + quotient + (1 if i < remainder else 0)
return starting_indeces
##
def kbet_one_chunk(index, batch, null_dist):
"""
kBET calculation for a single index chunk.
"""
dof = null_dist.size-1
n = index.shape[0]
k = index.shape[1]-1
results = np.zeros((n, 2))
for i in range(n):
observed_counts = (
pd.Series(batch[index[i,:]]).value_counts(sort=False).values
)
expected_counts = null_dist * k
stat = np.sum(
np.divide(
np.square(np.subtract(observed_counts, expected_counts)),
expected_counts,
)
)
p_value = 1 - chi2.cdf(stat, dof)
results[i, 0] = stat
results[i, 1] = p_value
return results
##
[docs]
def kbet(
index: np.array,
batch: pd.Series,
alpha: float = 0.05,
only_score: bool = True
) -> Tuple[float, float, float]:
"""
Computes the kBET metric (Buttner et al., 2018) to assess batch effects for an index matrix of a KNN graph.
Parameters
----------
index : np.array
Array of shape (n_cells, n_neighbors) containing kNN indices.
batch : pd.Series
Discrete-valued batch annotation for each cell (length n_cells).
alpha : float, optional
Significance level of the chi-squared test. Default is 0.05.
only_score : bool, optional
If True, return only the accept rate; otherwise, return full kBET results. Default is True.
Returns
-------
tuple of (stat_mean, pvalue_mean, accept_rate)
kBET statistics, where:
- stat_mean is the mean test statistic,
- pvalue_mean is the mean p-value,
- accept_rate is the overall acceptance rate.
"""
# Compute null batch distribution
batch = batch.astype('category')
null_dist = batch.value_counts(normalize=True, sort=False).values
# Parallel computation of kBET metric (pegasus code)
starting_idx = chunker(len(batch))
n_jobs = cpu_count()
with parallel_backend("loky", inner_max_num_threads=1):
kBET_arr = np.concatenate(
Parallel(n_jobs=n_jobs)(
delayed(kbet_one_chunk)(
index[starting_idx[i] : starting_idx[i + 1], :],
batch,
null_dist
)
for i in range(n_jobs)
)
)
# Gather results
stat_mean, pvalue_mean = kBET_arr.mean(axis=0)
accept_rate = (kBET_arr[:,1] >= alpha).sum() / len(batch)
if only_score:
return accept_rate
else:
return (stat_mean, pvalue_mean, accept_rate)
##
[docs]
def NN_entropy(index: np.array, labels: np.array) -> float:
"""
Calculate the median (over cells) lentiviral-labels Shannon Entropy,
given an index matrix of a KNN graph.
Parameters
----------
index : np.array
Array of shape (n_cells, k-neighbors) containing cell neighbors indeces.
labels : pd.Series
Discrete-valued batch annotation for each cell (length n_cells).
Returns
-------
float : NN Shannon Entropy score.
"""
SH = []
for i in range(index.shape[0]):
freqs = labels[index[i,:]].value_counts(normalize=True).values
SH.append(-np.sum(freqs * np.log(freqs + 0.00001))) # Avoid 0 division
return np.median(SH)
##
def NN_purity(index: np.array, labels: np.array) -> float:
"""
Calculate the median purity of cells neighborhoods.
Parameters
----------
index : np.array
Array of shape (n_cells, k-neighbors) containing cell neighbors indeces.
labels : pd.Series
Discrete-valued batch annotation for each cell (length n_cells).
Returns
-------
float : NN purity score.
"""
kNN_purities = []
n_cells = index.shape[0]
k = index.shape[1]-1
for i in range(n_cells):
l = labels[i]
idx = index[i, 1:]
l_neighbors = labels[idx]
kNN_purities.append(np.sum(l_neighbors == l) / k)
return np.median(kNN_purities)
##
def binom_sum(x, k=2):
return binom(x, k).sum()
##
[docs]
def custom_ARI(g1: Iterable[Any], g2: Iterable[Any]) -> float:
"""
Compute scIB (Luecken et al., 2022) modified Adjusted Rand Index.
"""
# Contingency table
n = len(g1)
contingency = pd.crosstab(g1, g2)
# Calculate and rescale ARI
ai_sum = binom_sum(contingency.sum(axis=0))
bi_sum = binom_sum(contingency.sum(axis=1))
index = binom_sum(np.ravel(contingency))
expected_index = ai_sum * bi_sum / binom_sum(n, 2)
max_index = 0.5 * (ai_sum + bi_sum)
return (index - expected_index) / (max_index - expected_index)
##
[docs]
def distance_AUPRC(D: np.array, labels: Iterable[Any]) -> float:
"""
Uses a n x n distance matrix D as a binary classifier for a set of labels (1,...,n).
Reports Area Under Precision Recall Curve. Used in Ludwig et al., 2019.
Parameters
----------
D : np.array
Array of shape (n_cells, n_cells) containing cell-cell distances.
labels : pd.Series
Discrete-valued batch annotation for each cell (length n_cells).
Returns
-------
float : AUPRC score.
"""
labels = pd.Categorical(labels)
final = {}
for alpha in np.linspace(0,1,10):
p_list = []
gt_list = []
for i in range(D.shape[0]):
x = rescale(D[i,:])
p_list.append(np.where(x<=alpha, 1, 0))
c = labels.codes[i]
gt_list.append(np.where(labels.codes==c, 1, 0))
predicted = np.concatenate(p_list)
gt = np.concatenate(gt_list)
p = precision_score(gt, predicted)
r = recall_score(gt, predicted)
final[alpha] = (p, r)
df = pd.DataFrame(final).T.reset_index(drop=True)
df.columns = ['precision', 'recall']
auc_score = auc(df['recall'], df['precision'])
return auc_score
##
[docs]
def calculate_corr_distances(tree: CassiopeiaTree) -> float:
"""
Calculate correlation between tree and character matrix cell-cell distances.
Used in Yang et al., 2023.
"""
if tree.get_dissimilarity_map() is not None:
D = tree.get_dissimilarity_map()
D = D.loc[tree.leaves, tree.leaves] # In case root is there...
else:
raise ValueError('No precomputed character distance. Add one...')
L = []
undirected = tree.get_tree_topology().to_undirected()
for node in tree.leaves:
d = shortest_path_length(undirected, source=node)
L.append(d)
D_phylo = pd.DataFrame(L, index=tree.leaves).loc[tree.leaves, tree.leaves]
assert (D_phylo.index == D.index).all()
scale = lambda x: (x-x.mean())/x.std()
x = scale(D.values.flatten())
y = scale(D_phylo.values.flatten())
corr, p = pearsonr(x, y)
return corr, p
##
def _compatibility_metric(x, y):
"""
Custom metric to calculate the compatibility between two characters.
Returns the fraction of compatible leaf pairs.
"""
return np.sum((x == x[:, None]) == (y == y[:, None])) / len(x) ** 2
##
def char_compatibility(tree):
"""
Compute a matrix of pairwise-compatibility scores between characters.
"""
return pairwise_distances(
tree.character_matrix.T,
metric=lambda x, y: _compatibility_metric(x, y),
force_all_finite=False
)
##
[docs]
def CI(tree: CassiopeiaTree) -> float:
"""
Calculate the Consistency Index (CI) of tree characters.
"""
tree.reconstruct_ancestral_characters()
observed_changes = np.zeros(tree.n_character)
for parent, child in tree.depth_first_traverse_edges():
p_states = np.array(tree.get_character_states(parent))
c_states = np.array(tree.get_character_states(child))
changes = (p_states != c_states).astype(int)
observed_changes += changes
return 1 / observed_changes # Assumes both characters are present (1,0)
##
[docs]
def RI(tree: CassiopeiaTree) -> float:
"""
Calculate the Consistency Index (RI) of tree characters.
"""
tree.reconstruct_ancestral_characters()
observed_changes = np.zeros(tree.n_character)
for parent, child in tree.depth_first_traverse_edges():
p_states = np.array(tree.get_character_states(parent))
c_states = np.array(tree.get_character_states(child))
changes = (p_states != c_states).astype(int)
observed_changes += changes
# Calculate the maximum number of changes (G)
max_changes = len(tree.nodes)-1 # If every node had a unique state
return (max_changes-observed_changes) / (max_changes-1)
##
def AOC_one_cell(idx_mito: np.array, CAS: np.array, i: int, k: int = 10, n_trials: int = 1000):
"""
Agreement of Closeness (AOC) calculation, for one cell.
"""
mt_neighbors = idx_mito[i,:]
dist_cas9_neighbors = CAS[i, mt_neighbors].mean()
obs_rank = np.sum(CAS[i,:] < dist_cas9_neighbors) + 1
random_ranks = np.zeros(n_trials)
for trial in range(n_trials):
cas9_random = np.random.choice(CAS.shape[0], size=k, replace=False)
dist_cas9_random = CAS[i, cas9_random].mean()
rank = np.sum(CAS[i,:] < dist_cas9_random) + 1
random_ranks[trial] = rank
aoc = np.mean((random_ranks - obs_rank) / idx_mito.shape[0])
p_value = np.sum(random_ranks < obs_rank) / n_trials
return aoc, p_value
##
[docs]
def AOC(D1: np.array, D2: np.array, k: int = 10, n_trials: int = 1000):
"""
Agreement of Closeness (AOC) metric calculation. See Weng et al., 2024.
"""
n = D1.shape[0]
idx_D2, _, _ = kNN_graph(D=D2, k=k, from_distances=True)
AOC = np.zeros(n)
pvals = np.zeros(n)
for i in range(n):
aoc, p = AOC_one_cell(idx_D2, D1, i, k=k, n_trials=n_trials)
AOC[i] = aoc
pvals[i] = p
return AOC, pvals
##