Source code for mito.pp.preprocessing

"""
Pre-process AFMs.
"""

from igraph import Graph
from typing import Dict, Iterable, Any
from .filters import *
from .distances import call_genotypes, compute_distances
from .kNN import *
from ..ut.positions import transitions, transversions
from ..tl.phylo import build_tree, AFM_to_seqs


##


[docs] def filter_cells( afm: AnnData, cell_subset: Iterable[str] = None, cell_filter: str = 'filter1', nmads: int = 5, mean_cov_all: float = 20, median_cov_target: int = 25, min_perc_covered_sites: float = .75 ) -> AnnData: """ Filter cells from a MAESTER/RedeeM Allele Frequency Matrix. Parameters ---------- afm : AnnData Allele Frequency Matrix. cell_subset : Iterable[str], optional Subset of cells to retain. Default is None. cell_filter : str, optional Cell filtering strategy. Options are: - "filter1": Filter cells based on mean MT-genome coverage (all sites). - "filter2": Filter cells based on median target MT-sites coverage and minimum percentage of target sites covered (MAESTER only). Default is None. nmads : int, optional Number of Minimum Absolute Deviations to filter cells with high MT-library UMI counts. Default is 5. mean_coverage : int, optional Minimum mean consensus (at least 3-supporting-reads) UMI coverage across the MT-genome per cell. Default is 20. median_cov_target : int, optional Minimum median UMI coverage at target MT-sites (only for MAESTER data). Default is 25. min_perc_covered_sites : float, optional Minimum fraction of MT target sites covered (only for MAESTER data). Default is 0.75. Returns ------- AnnData Filtered Allele Frequency Matrix. """ if cell_subset is not None: cells = list(set(cell_subset) & set(afm.obs_names)) logging.info(f'Filter provided cell subset. Valid CBs: {len(cells)}') afm = afm[cells,:].copy() scLT_system = afm.uns['scLT_system'] logging.info(f'scLT system: {scLT_system}') # Cell filters if cell_filter == 'filter1': if scLT_system == 'MAESTER' or scLT_system == 'RedeeM': x = afm.obs['mean_site_coverage'] median = np.median(x) MAD = np.median(np.abs(x-median)) test = (x>=mean_cov_all) & (x<=median+nmads*MAD) afm = afm[test,:].copy() logging.info(f'Filtered cells (i.e., mean MT-genome coverage >={mean_cov_all} and <={median+nmads*MAD:.2f}): {afm.shape[0]}') afm.uns['cell_filter'] = { 'cell_subset':cell_subset, 'cell_filter':cell_filter, 'nmads':nmads, 'mean_cov_all':mean_cov_all } else: raise ValueError(f'Cell filter {cell_filter} is not available for scLT_system {scLT_system}') elif cell_filter == 'filter2': if scLT_system == 'MAESTER': test1 = afm.obs['median_target_site_coverage'] >= median_cov_target test2 = afm.obs['frac_target_site_covered'] >= min_perc_covered_sites afm = afm[(test1) & (test2),:].copy() logging.info(f'Filtered cells (i.e., median target MT-genome coverage >={median_cov_target} and fraction covered sites >={min_perc_covered_sites}: {afm.shape[0]}') afm.uns['cell_filter'] = { 'cell_subset':cell_subset, 'cell_filter':cell_filter, 'median_cov_target':median_cov_target, 'min_perc_covered_sites':min_perc_covered_sites } else: raise ValueError(f'Cell filter {cell_filter} is not available for scLT_system {scLT_system}') else: afm.uns['cell_filter'] = {} logging.info(f'Skipping cell filters: {cell_filter} not available. Filtered cells: {afm.shape[0]}') # Ensure each site has been observed from at least one cell test_atleastone = (afm.X>0).sum(axis=0).A1>0 afm = afm[:,test_atleastone].copy() return afm
## def compute_metrics_raw(afm): """ Compute raw dataset metrics and update .uns. """ # Compute general cell-site coverage metrics d = {} if afm.uns["scLT_system"] == "MAESTER": if afm.uns['pp_method'] in ['mito_preprocessing', 'maegatk']: d['median_site_cov'] = afm.obs['median_target_site_coverage'].median() d['median_target_untarget_coverage_logratio'] = np.median( np.log10( afm.obs['median_target_site_coverage'] / \ (afm.obs['median_untarget_site_coverage']+0.000001) ) ).round(2) else: logging.info(f'Skip general metrics for pp_method {afm.uns["pp_method"]}.') elif afm.uns["scLT_system"] == "redeem": d['median_site_cov'] = afm.obs['mean_site_coverage'].median() else: logging.info(f'Skip raw metrics (scLT_system: {afm.uns["scLT_system"]}).') afm.uns['dataset_metrics'] = d ## def compute_connectivity_metrics(X): """ Calculate the connectivity metrics presented in Weng et al., 2024. """ # Create connectivity graph A = np.dot(X, X.T) np.fill_diagonal(A, 0) A.diagonal() g = Graph.Adjacency((A>0).tolist(), mode='undirected') edges = g.get_edgelist() weights = [A[i][j] for i, j in edges] g.es['weight'] = weights # Calculate metrics average_degree = sum(g.degree()) / g.vcount() # avg_path_length if g.is_connected(): average_path_length = g.average_path_length() else: largest_component = g.clusters().giant() average_path_length = largest_component.average_path_length() # avg_path_length transitivity = g.transitivity_undirected() # transitivity components = g.clusters() largest_component_size = max(components.sizes()) proportion_largest_component = largest_component_size / g.vcount() # % cells in largest subgraph return average_degree, average_path_length, transitivity, proportion_largest_component ## def compute_metrics_filtered(afm, spatial_metrics=False, tree_kwargs={}): """ Compute additional metrics on selected MT-SNVs feature space. """ d = {} assert 'bin' in afm.layers X_bin = afm.layers['bin'].toarray() # n cells and vars d['n_cells'] = X_bin.shape[0] d['n_vars'] = X_bin.shape[1] # n cells per var and n vars per cell (mean, median, std) d['median_n_vars_per_cell'] = np.median((X_bin>0).sum(axis=1)) d['mean_n_vars_per_cell'] = np.mean((X_bin>0).sum(axis=1)) d['std_n_vars_per_cell'] = np.std((X_bin>0).sum(axis=1)) d['mean_n_cells_per_var'] = np.mean((X_bin>0).sum(axis=0)) d['median_n_cells_per_var'] = np.median((X_bin>0).sum(axis=0)) d['std_n_cells_per_var'] = np.std((X_bin>0).sum(axis=0)) # AFM sparseness and genotypes uniqueness d['density'] = (X_bin>0).sum() / (X_bin.shape[0] * X_bin.shape[1]) seqs = AFM_to_seqs(afm) unique_genomes_occurrences = pd.Series(seqs).value_counts(normalize=True) d['genomes_redundancy'] = 1-(unique_genomes_occurrences.size / X_bin.shape[0]) d['median_genome_prevalence'] = unique_genomes_occurrences.median() # Mutational spectra class_annot = afm.var_names.map(lambda x: x.split('_')[1]).value_counts().astype('int') class_annot.index = class_annot.index.map(lambda x: f'mut_class_{x}') n_transitions = class_annot.loc[class_annot.index.str.contains('|'.join(transitions))].sum() n_transversions = class_annot.loc[class_annot.index.str.contains('|'.join(transversions))].sum() # % lineage-biased mutations if afm.var.columns.str.startswith('FDR').any(): freq_lineage_biased_muts = (afm.var.loc[:,afm.var.columns.str.startswith('FDR')]<=.1).any(axis=1).sum() / afm.shape[1] else: freq_lineage_biased_muts = np.nan # Collect d = pd.concat([ pd.Series(d), class_annot, pd.Series({'transitions_vs_transversions_ratio':n_transitions/n_transversions}), pd.Series({'freq_lineage_biased_muts':freq_lineage_biased_muts}), ]) # Spatial metrics tree = None if spatial_metrics: # Cell connectedness average_degree, average_path_length, transitivity, proportion_largest_component = compute_connectivity_metrics(X_bin) d['average_degree'] = average_degree d['average_path_length'] = average_path_length d['transitivity'] = transitivity d['proportion_largest_component'] = proportion_largest_component # Baseline tree internal nodes mutations support tree = build_tree(afm, precomputed=True, **tree_kwargs) # To .uns afm.uns['dataset_metrics'].update(d) return tree ##
[docs] def filter_afm( afm: AnnData, lineage_column: str = None, min_cell_number: int = 0, cells: Iterable[str] = None, filtering: str = 'MiTo', filtering_kwargs: Dict[str,Any] = {}, filter_moran: bool = True, moran_I_pvalue: float = .01, max_AD_counts: int = 2, variants: Iterable[str] = None, min_n_var: int = 1, fit_mixtures: bool = False, only_positive_deltaBIC: bool = False, filter_dbs: bool = True, compute_enrichment: bool = True, bin_method: str = 'MiTo', binarization_kwargs: Dict[str,Any] = {}, metric: str = 'weighted_jaccard', ncores: int = 8, spatial_metrics: bool = False, tree_kwargs: Dict[str,Any] = {}, return_tree: bool = False ): """ Filter an Allele Frequency Matrix for downstream analysis. This function implements different strategies to subset the detected cells and MT-SNVs to those that exhibit optimal properties for single-cell lineage tracing (scLT). The user can tune filtering method defaults via the `filtering_kwargs` argument. Pre-computed sets of cells and variants can be selected without relying on any specific method (the function ensures integrity of the AFM `AnnData` object after subsetting). Parameters ---------- afm : AnnData Allele Frequency Matrix. lineage_column : str, optional Lineage column of interest in afm.obs. Default is None. min_cell_number : int, optional Minimum number of cells required for groups in afm.obs[lineage_column]. Default is 0. cells : Iterable[str], optional Pre-defined list of cells to retain. Default is None. filtering : str, optional MT-SNVs filtering strategy. See mito.pp.filters for available strategies and parameters. Default is MiTo. filtering_kwargs : dict, optional Additional keyword arguments for the selected filtering method. Default is {}. filter_moran : bool, optional Whether to remove MT-SNVs that are not spatially auto-correlated. Default is True. moran_I_pvalue : float, optional P-value threshold for Moran's I statistics. Default is 0.01. max_AD_counts : int, optional Retain an MT-SNV if at least one cell has this number of alternative allele counts. Default is 2. variants : Iterable[str], optional Pre-defined list of variants to retain. Default is None. min_n_var : int, optional Retain cells with at least this number of MT-SNVs. Default is 1. fit_mixtures : bool, optional Whether to fit MQuad (Kwock et al., 2022) binomial mixtures. Default is False. only_positive_deltaBIC : bool, optional Retain only MT-SNVs with positive deltaBIC (from MQuad). Default is False. filter_dbs : bool, optional Filter MT-SNVs from dbSNP and REDIdb database. Default is True. compute_enrichment : bool, optional Whether to compute MT-SNVs enrichment in the lineage_column. Default is True. bin_method : str, optional Genotyping method. Default is MiTo. binarization_kwargs : dict, optional Additional keyword arguments for genotyping. Default is {}. metric : str, optional Distance metric to use. Default is weighted_jaccard. ncores : int, optional Number of cores to use for distance computations and fitting MQuad mixtures, if necessary. Default is 1. spatial_metrics : bool, optional Whether to compute "spatial" connectivity metrics for filtered MT-SNVs. Default is False. tree_kwargs : dict, optional Additional keyword arguments for tree inference (i.e., mito.tl.build_tree). Default is {}. return_tree : bool, optional Whether to return a CassiopeiaTree if spatial_metrics is True. Default is False. Returns ------- AnnData Filtered Allelic Frequency Matrix. """ T = Timer() T.start() logging.info('Compute general dataset metrics...') compute_metrics_raw(afm) logging.info('Compute vars_df as in Weng et al., 2024') annotate_vars(afm) logging.info(f'Filter MT-SNVs...') scLT_system = afm.uns['scLT_system'] pp_method = afm.uns['pp_method'] if 'pp_method' in afm.uns else 'previously pre-processed (public data)' logging.info(f'scLT_system: {scLT_system}') logging.info(f'pp_method: {pp_method}') logging.info(f'Feature selection method: {filtering}') logging.info(f'Original afm: n cells={afm.shape[0]}, n features={afm.shape[1]}') # Cells from <lineage_column> with at least min_cell_number cells, if necessary if min_cell_number>0 and lineage_column not in [None, 'null']: afm = filter_cell_clones(afm, column=lineage_column, min_cell_number=min_cell_number) annotate_vars(afm, overwrite=True) # Baseline filter afm = filter_baseline(afm) logging.info(f'afm after baseline filter: n cells={afm.shape[0]}, n features={afm.shape[1]}') # Custom filters if filtering in filtering_options: if filtering == 'baseline': pass if filtering == 'CV': afm = filter_CV(afm, **filtering_kwargs) elif filtering == 'miller2022': afm = filter_miller2022(afm, **filtering_kwargs) elif filtering == 'weng2024': afm = filter_weng2024(afm, **filtering_kwargs) elif filtering == 'MQuad': afm = filter_MQuad(afm, ncores=ncores, path_=os.getcwd(), **filtering_kwargs) elif filtering == 'MiTo': afm = filter_MiTo(afm, **filtering_kwargs) elif filtering == 'GT_enriched': afm = filter_GT_enriched(afm, lineage_column=lineage_column, **filtering_kwargs) elif filtering in [None, 'null']: logging.info(f'Filtering custom sets of cells and variants') rows = cells if cells is not None else afm.obs_names cols = variants if variants is not None else afm.var_names afm = afm[ [x for x in rows if x in afm.obs_names], [x for x in cols if x in afm.var_names] ].copy() else: raise ValueError( f'''The provided filtering method {filtering} is not supported. Choose another one...''' ) logging.info(f'afm after {filtering} filter: n cells={afm.shape[0]}, n features={afm.shape[1]}') # Filter common SNVs and possible RNA-edits if filter_dbs: afm, n_dbSNP = filter_dbSNP_common(afm) afm, n_REDIdb = filter_REDIdb_edits(afm) else: n_REDIdb = np.nan n_dbSNP = np.nan # Genotype cells, and filter the one with less than min_n_var mutations logging.info(f'Assign MT-genotypes with {bin_method} method') call_genotypes(afm, bin_method=bin_method, **binarization_kwargs) afm = afm[(afm.layers['bin']>0).sum(axis=1).A1>=min_n_var,:].copy() logging.info(f'Retain only cells with at least {min_n_var} MT-SNVs: {afm.shape[0]}') # Bimodal mixture modelling: deltaBIC (MQuad-like) and max AD in at least one cell (Weng et al., 2024) if fit_mixtures: afm.var = afm.var.join(fit_MQuad_mixtures(afm, ncores=ncores).dropna()[['deltaBIC']]) if only_positive_deltaBIC: afm = afm[:,afm.var['deltaBIC']>0].copy() logging.info(f'Remove MT-SNVs with MQuad deltaBIC<0') if max_AD_counts>1: afm = afm[:,afm.layers['AD'].max(axis=0).toarray()>=max_AD_counts].copy() logging.info(f'Remove MT-SNVs with no +cells having at least {max_AD_counts} AD counts') # Compute cell-cell distances and filter variants significantly auto-correlated. compute_distances(afm, precomputed=True, metric=metric, ncores=ncores) if filter_moran: logging.info(f'Filter only MT-SNVs with significant spatial auto-correlation (i.e., Moran I statistics)') n0 = afm.shape[1] afm = filter_variant_moransI(afm, pval_treshold=moran_I_pvalue, n_cores=ncores) n1 = afm.shape[1] n_not_autocorrelated = n0-n1 else: n_not_autocorrelated = np.nan # Final cell removal afm = afm[(afm.layers['bin']>0).sum(axis=1).A1>=min_n_var,:].copy() annotate_vars(afm, overwrite=True) logging.info(f'Retain cells with at least {min_n_var} MT-SNVs: {afm.shape[0]}') logging.info(f'Final afm after all filters: n cells={afm.shape[0]}, n features={afm.shape[1]}') ## # Lineage bias if lineage_column in afm.obs.columns and compute_enrichment: logging.info(f'Compute MT-SNVs enrichment for {lineage_column} categories') lineages = afm.obs[lineage_column].dropna().unique() for target_lineage in lineages: res = compute_lineage_biases(afm, lineage_column, target_lineage, bin_method=bin_method, binarization_kwargs=binarization_kwargs) afm.var[f'FDR_{target_lineage}'] = res['FDR'] afm.var[f'odds_ratio_{target_lineage}'] = res['odds_ratio'] # Compute final metrics logging.info(f'Compute last (filtered) statistics.') tree = compute_metrics_filtered( afm, spatial_metrics=spatial_metrics, tree_kwargs=tree_kwargs ) # Add params to .uns afm.uns['char_filter'] = { 'lineage_column' : lineage_column, 'min_cell_number' : min_cell_number, 'filtering' : filtering if not((cells is not None) or (variants is not None)) else 'predefined_sets', 'max_AD_counts' : max_AD_counts, 'only_positive_deltaBIC' : only_positive_deltaBIC, 'compute_enrichment' : compute_enrichment, 'filter_dbs' : filter_dbs, 'filter_moran' : filter_moran, 'spatial_metrics' : spatial_metrics, 'n_dbSNP' : n_dbSNP, 'n_REDIdb' : n_REDIdb, 'n_not_autocorrelated' : n_not_autocorrelated, 'min_n_var' : min_n_var } afm.uns['char_filter'].update(filtering_kwargs) logging.info(f'AFM filtering complete: {T.stop()}') if return_tree: return afm, tree else: return afm
##