Source code for mito.tl.phenotype

"""
Tools to map phenotype to lineage structures.
"""

import numpy as np
import pandas as pd
from tqdm import tqdm
from typing import Any, Iterable
from cassiopeia.data import CassiopeiaTree
from cassiopeia.tools import score_small_parsimony
from scipy.stats import fisher_exact
from statsmodels.sandbox.stats.multicomp import multipletests
import statsmodels.api as sm
import statsmodels.formula.api as smf
from ..ut.phylo_utils import get_internal_node_stats


##


[docs] def compute_clonal_fate_bias( tree: CassiopeiaTree, state_column: str, clone_column: str, target_state: str|Any ) -> pd.DataFrame: """ Compute -log10(FDR) Fisher's exact test: clonal fate biases towards some target_state. """ n = len(tree.leaves) clones = np.sort(tree.cell_meta[clone_column].unique()) target_ratio_array = np.zeros(clones.size) oddsratio_array = np.zeros(clones.size) pvals = np.zeros(clones.size) # Here we go for i, clone in enumerate(clones): test_clone = tree.cell_meta[clone_column] == clone test_state = tree.cell_meta[state_column] == target_state clone_size = test_clone.sum() clone_state_size = (test_clone & test_state).sum() target_ratio = clone_state_size / clone_size target_ratio_array[i] = target_ratio other_clones_state_size = (~test_clone & test_state).sum() # Fisher oddsratio, pvalue = fisher_exact( [ [clone_state_size, clone_size - clone_state_size], [other_clones_state_size, n - other_clones_state_size], ], alternative='greater', ) oddsratio_array[i] = oddsratio pvals[i] = pvalue # Correct pvals --> FDR pvals = multipletests(pvals, alpha=0.05, method="fdr_bh")[1] # Results results = pd.DataFrame({ 'perc_in_target_state' : target_ratio_array, 'odds_ratio' : oddsratio_array, 'FDR' : pvals, 'fate_bias' : -np.log10(pvals) }).sort_values('fate_bias', ascending=False) return results
##
[docs] def compute_scPlasticity(tree: CassiopeiaTree, meta_column: str): """ Compute scPlasticity as in Yang et al., 2022. https://www.sc-best-practices.org/trajectories/lineage_tracing.html# """ # Format column of interest tree.cell_meta[meta_column] = pd.Categorical(tree.cell_meta[meta_column]) # parsimony = score_small_parsimony(tree, meta_item=meta_column) # compute plasticities for each node in the tree for node in tree.depth_first_traverse_nodes(): effective_plasticity = score_small_parsimony( tree, meta_item=meta_column, root=node ) size_of_subtree = len(tree.leaves_in_subtree(node)) tree.set_attribute( node, "effective_plasticity", effective_plasticity / size_of_subtree ) tree.cell_meta["scPlasticity"] = 0 for leaf in tree.leaves: plasticities = [] parent = tree.parent(leaf) while True: plasticities.append(tree.get_attribute(parent, "effective_plasticity")) if parent == tree.root: break parent = tree.parent(parent) tree.cell_meta.loc[leaf, "scPlasticity"] = np.mean(plasticities)
##
[docs] def nb_regression(df: pd.DataFrame, features: Iterable[str], predictor: str) -> pd.DataFrame: """ Negative binomial regression approach to associate clonal-level features to gene expression. Parameters ---------- afm : pd.DataFrame (clone/sample x features/covariates) Input data table. Contains raw counts for all genes, and covariates of interest. features : list, str List of variables to test the GLM model coefficients on. predictor : str Model specification via formula interface. Formula is in the form: "gene ~ predictor". Example predictor: "fitness + counts" Returns ------- AnnData Filtered Allelic Frequency Matrix. """ L = [] for gene in tqdm(features, total=len(features), desc="Features"): try: formula = f'{gene} ~ {predictor}' family = sm.families.NegativeBinomial() model = ( smf.glm(formula=formula, data=df, family=family) .fit() ) df_ = ( model.params.to_frame('coef') .join(model.pvalues.to_frame('pval')) .reset_index(names='param') .query('param!="Intercept"') .assign(gene=gene) ) L.append(df_) except: pass results = pd.concat(L) results['-logp10'] = -np.log10(results['pval']) results = results[['gene', 'param', 'coef', 'pval', '-logp10']] return results
## def _find_partitions(df, n_cells): partitions = [] i = 0 while i<=df.shape[0]: partitions.append( df.iloc[i:min(i+n_cells,df.shape[0]),:].index ) i += n_cells return partitions ## def agg_pseudobulk(tree, adata, agg_method='mean', min_n_cells=10, n_cells=None, n_samples=None): """ Aggragete expression data into psudobulk samples. """ meta = tree.cell_meta.copy() top_clones = meta['MiTo clone'].value_counts().loc[lambda x: x>=min_n_cells].index.to_list() meta_top = meta.loc[meta['MiTo clone'].isin(top_clones)] meta_top['MiTo clone'] = meta_top['MiTo clone'].astype('str') cells = meta_top.index if n_cells is None and n_samples is None: agg = ( pd.DataFrame( adata[cells,:].layers['raw'].toarray(), index=cells, columns=adata.var_names ) .join(meta_top[['MiTo clone']]) .groupby('MiTo clone') .agg(agg_method) .round() ) elif n_cells is not None and n_samples is not None: pseudobulk_samples = [] for clone in top_clones: df_ = meta_top.loc[meta_top['MiTo clone']==clone] for i in range(n_samples): cells = df_.sample(n_cells).index profile = ( pd.DataFrame( adata[cells,:].layers['raw'].toarray(), index=cells, columns=adata.var_names ) .agg(agg_method, axis=0) .round() .to_frame('counts') .reset_index(names='gene') .assign(sample=f'{clone}_{i}') ) pseudobulk_samples.append(profile) agg = ( pd.concat(pseudobulk_samples) .pivot_table(index='sample', columns='gene', values='counts') ) elif n_cells is not None and n_samples is None: pseudobulk_samples = [] for clone in top_clones: df_ = meta_top.loc[meta_top['MiTo clone']==clone] partitions = _find_partitions(df_, n_cells) for i,cells in enumerate(partitions): profile = ( pd.DataFrame( adata[cells,:].layers['raw'].toarray(), index=cells, columns=adata.var_names ) .agg(agg_method, axis=0) .round() .to_frame('counts') .reset_index(names='gene') .assign(sample=f'{clone}_{i}') ) pseudobulk_samples.append(profile) agg = ( pd.concat(pseudobulk_samples) .pivot_table(index='sample', columns='gene', values='counts') ) else: raise ValueError('Wrong combo of n_cells, n_samples') # Filter genes and add counts columns if agg_method == 'sum': total_clone_counts = agg.sum(axis=1) agg_norm = agg.apply(lambda x: x/(total_clone_counts+1)*10**6, axis=0) norm_mean_expression = agg_norm.mean(axis=0) test = (norm_mean_expression >= np.percentile(norm_mean_expression, 10)) agg = agg.loc[:,test].copy() agg['counts'] = total_clone_counts elif agg_method == 'mean': mean_expression = agg.mean(axis=0) test = mean_expression > 0 agg = agg.loc[:,test] agg['counts'] = agg.mean(axis=1) # Add clone level covariates, and re-scale them clone_features = ( get_internal_node_stats(tree) .loc[lambda x: x['clonal_node']].reset_index(names='lca') .merge(tree.cell_meta[['MiTo clone', 'lca', 'n cells']], on='lca') .drop_duplicates() .loc[lambda x: x['MiTo clone'].isin(top_clones)] [['MiTo clone', 'fitness', 'clade_size']] .drop_duplicates() .reset_index(drop=True) .set_index('MiTo clone') ) if n_cells is not None or n_samples is not None: agg['MiTo clone'] = agg.index.map(lambda x: x.split('_')[0]) agg = ( agg .reset_index().set_index('MiTo clone') .join(clone_features) .reset_index().set_index('sample') .drop(columns=['MiTo clone']) ) else: agg = agg.join(clone_features) # Rescale predictor variables rescale = lambda x: (x-x.mean()) / x.std() agg['fitness'] = rescale(agg['fitness']) agg['clade_size'] = rescale(agg['clade_size']) agg['counts'] = rescale(agg['counts']) return agg ##