"""
All filters: variants/cells.
"""
import os
import logging
import numpy as np
import pandas as pd
from scipy.stats import fisher_exact
from statsmodels.sandbox.stats.multicomp import multipletests
from mquad.mquad import *
from joblib import Parallel, delayed, parallel_backend, cpu_count
from .distances import *
from ..ut.positions import mask_mt_sites
from ..ut.utils import load_edits_REDIdb, load_common_dbSNP
##
filtering_options = [
'baseline',
'CV',
'miller2022',
'weng2024',
'MQuad',
'MiTo',
'GT_enriched'
# DEPRECATED
# 'ludwig2019',
# 'velten2021',
# 'seurat',
# 'MQuad_optimized',
# 'density',
# 'GT_stringent'
]
##
def nans_as_zeros(afm):
"""
Fill nans with zeros.
"""
X_copy = afm.X.copy()
X_copy[np.isnan(X_copy)] = 0
afm.X = X_copy
return afm
##
def filter_cells_with_at_least_one(
afm: AnnData,
bin_method: str = 'vanilla',
binarization_kwargs: Dict[str,Any] = {}
) -> AnnData:
"""
Filter cells with at least one variant (genotypes from `bin_method`).
"""
X = call_genotypes(a=afm.copy(), bin_method=bin_method, **binarization_kwargs)
afm = afm[afm.obs_names[X.sum(axis=1).A1>=1],:]
afm.uns['per_position_coverage'] = afm.uns['per_position_coverage'].loc[afm.obs_names,:]
afm.uns['per_position_quality'] = afm.uns['per_position_quality'].loc[afm.obs_names,:]
return afm
##
[docs]
def filter_cell_clones(
afm: AnnData,
column: str = 'GBC',
min_cell_number: int = 10
) -> AnnData:
"""
Filter only cells from groups in afm.obs[`column`] with more
than `min_cell_number` cells.
"""
logging.info(f'Filtering cells from {column} groups with >={min_cell_number} cells')
n0 = afm.shape[0]
cell_counts = afm.obs.groupby(column).size()
clones_to_retain = cell_counts[cell_counts>=min_cell_number].index
test = afm.obs[column].isin(clones_to_retain)
afm = afm[test,:].copy()
logging.info(f'Removed other {n0-afm.shape[0]} cells')
logging.info(f'Retaining {afm.obs[column].unique().size} discrete categories (i.e., {column}) for the analysis.')
return afm
##
[docs]
def annotate_vars(afm: AnnData, overwrite: bool = False):
"""
Annotate MT-SNVs properties as in in Weng et al., 2024, and Miller et al. 2022 before.
Create vars_df and update .var.
"""
if 'mean_af' in afm.var.columns:
if not overwrite:
return
else:
logging.info('Re-annotate variants in afm')
afm.var = afm.var.iloc[:,:4].copy() # Retain mean coverage, if present
# Initialize vars_df
# vars.tib <- tibble(var = rownames(af.dm),
# mean_af = rowMeans(af.dm),
# mean_cov = rowMeans(assays(maegtk)[["coverage"]])[as.numeric(cutf(rownames(af.dm), d = "_"))],
# quality = qual.num)
afm.var['mean_af'] = afm.X.mean(axis=0).A1
if 'site_coverage' in afm.layers:
afm.var['mean_cov'] = afm.layers['site_coverage'].mean(axis=0).A1
if 'qual' in afm.layers: # NB: not computed for redeem data
qual = afm.layers['qual'].toarray()
afm.var['quality'] = np.nanmean(np.where(qual>0, qual, np.nan), axis=0)
del qual
# Calculate the number of cells that exceed VAF thresholds 0, 1, 5, 10, 50 as in Weng et al., 2024
# vars.tib <- vars.tib %>%
# mutate(n0 = apply(af.dm, 1, function(x) sum(x == 0))) %>% # NEGATIVE CELLS
# mutate(n1 = apply(af.dm, 1, function(x) sum(x > 1))) %>%
# mutate(n5 = apply(af.dm, 1, function(x) sum(x > 5))) %>%
# mutate(n10 = apply(af.dm, 1, function(x) sum(x > 10))) %>%
# mutate(n50 = apply(af.dm, 1, function(x) sum(x > 50)))
# Variant_CellN<-apply(af.dm,1,function(x){length(which(x>0))})
# vars.tib<-cbind(vars.tib,Variant_CellN)
afm.var['n0'] = (afm.X==0).sum(axis=0).A1 # NEGATIVE CELLS
afm.var['n1'] = (afm.X>.01).sum(axis=0).A1
afm.var['n2'] = (afm.X>.02).sum(axis=0).A1
afm.var['n5'] = (afm.X>.05).sum(axis=0).A1
afm.var['n10'] = (afm.X>.1).sum(axis=0).A1
afm.var['n50'] = (afm.X>.5).sum(axis=0).A1
afm.var['Variant_CellN'] = (afm.X>0).sum(axis=0).A1
# Add mean AF, AD and DP in +cells
X = afm.X.toarray()>0
afm.var['median_af_in_positives'] = np.nanmean(np.where(X>0, X, np.nan), axis=0)
afm.var['mean_AD_in_positives'] = np.nanmean(
np.where(X>0, afm.layers['AD'].toarray(), np.nan), axis=0
)
afm.var['mean_DP_in_positives'] = np.nanmean(
np.where(X>0, afm.layers['DP'].toarray(), np.nan), axis=0
)
del X
##
[docs]
def filter_baseline(
afm: AnnData,
min_site_cov: int = 5,
min_var_quality: int = 30,
min_n_positive: int = 2,
only_genes: bool = True
) -> AnnData:
"""
Compute summary stats and baseline filter MT-SNVs (MAESTER, redeem).
"""
if afm.uns['scLT_system'] == 'MAESTER':
if only_genes:
test_sites = mask_mt_sites(afm.var['pos'])
afm = afm[:,test_sites].copy()
# Basic filter as in Weng et al., 2024
if afm.uns['pp_method'] in ['mito_preprocessing', 'maegatk']:
test_baseline = (
(afm.var['mean_cov']>=min_site_cov) & \
(afm.var['quality']>=min_var_quality) & \
(afm.var['Variant_CellN']>=min_n_positive)
)
afm = afm[:,test_baseline].copy()
else:
logging.info('Baseline filter only exlcudes MT-SNVs in un-targeted sites.')
elif afm.uns['scLT_system'] == 'RedeeM':
test_baseline = (
(afm.var['mean_cov']>=min_site_cov) & \
(afm.var['Variant_CellN']>=min_n_positive)
)
afm = afm[:,test_baseline].copy()
else:
raise ValueError(f'Baseline filter not available for scLT_system current scLT_system and pp_method')
# Exclude sites with more than one alt alleles observed
var_sites = afm.var_names.map(lambda x: x.split('_')[0])
test = var_sites.value_counts()[var_sites]==1
afm = afm[:,afm.var_names[test]].copy()
# Exclude variants sites not observed in any cells and vice versa
afm = afm[(afm.X>0).sum(axis=1).A1>0,:].copy()
afm = afm[:,(afm.X>0).sum(axis=0).A1>0].copy()
return afm
##
def filter_CV(afm: AnnData, n_top: int = 100) -> AnnData:
"""
Filter top `n_top` MT-SNVs (MAESTER, redeem), ranked by coefficient of variation (CV).
"""
scLT_system = afm.uns['scLT_system']
pp_method = afm.uns['pp_method']
if scLT_system == 'MAESTER' and pp_method in ['mito_preprocessing', 'maegatk']:
pass
else:
raise ValueError(f'CV filter not available for scLT_system {scLT_system} and pp_method {pp_method}')
X = afm.X.toarray()
CV = (np.std(X, axis=0)**2 / np.mean(X, axis=0))
idx_vars = np.argsort(CV)[::-1][:n_top]
afm = afm[:,idx_vars].copy()
del X
return afm
##
def filter_miller2022(
afm: AnnData,
min_site_cov: float = 100,
min_var_quality: float = 30,
p1: int = 1,
p2: int = 99,
perc1: float = 0.01,
perc2: float = 0.1
) -> AnnData:
"""
Filter MT-SNVs (MAESTER only) based on adaptive tresholds adopted in Miller et al., 2022.
"""
scLT_system = afm.uns['scLT_system']
pp_method = afm.uns['pp_method']
if scLT_system == 'MAESTER' and pp_method in ['mito_preprocessing', 'maegatk']:
pass
else:
raise ValueError(f'miller2022 filter not available for scLT_system {scLT_system} and pp_method {pp_method}')
X = afm.X.toarray()
test = (
(afm.var['mean_cov']>=min_site_cov) & \
(afm.var['quality']>=min_var_quality) & \
((np.percentile(X, q=p1, axis=0) < perc1) & \
(np.percentile(X, q=p2, axis=0) > perc2))
)
afm = afm[:,test].copy()
return afm
##
def fit_MQuad_mixtures(afm, n_top=None, path_=None, ncores=8, minDP=10, minAD=1, with_M=False):
"""
Filter MT-SNVs (MAESTER, redeem) with the MQuad method (Kwock et al., 2022)
"""
if n_top is not None:
afm = filter_CV(afm, n_top=n_top) # Prefilter again, if still too much MT-SNVs
# Fit models
M = Mquad(AD=afm.layers['AD'].T, DP=afm.layers['DP'].T)
path_ = os.getcwd() if path_ is None else path_
df = M.fit_deltaBIC(out_dir=path_, nproc=ncores, minDP=minDP, minAD=minAD)
df.index = afm.var_names
df['deltaBIC_rank'] = df['deltaBIC'].rank(ascending=False)
if with_M:
return df.sort_values('deltaBIC', ascending=False), M
else:
return df.sort_values('deltaBIC', ascending=False)
##
def filter_MQuad(
afm: AnnData,
ncores: int = 8,
minDP: int = 5,
minAD: int = 1,
minCell: int = 2,
path_: str = None,
n_top: int = None
) -> AnnData:
"""
Filter MT-SNVs (MAESTER, redeem) with the MQuad method (Kwock et al., 2022).
Parameters
----------
afm : AnnData
The Allele Frequency Matrix.
ncores : int, optional
Number of cores to use for computation. Default is 8.
minDP : int, optional
Minimum depth (DP) required. Default is 5.
minAD : int, optional
Minimum alternative allele (AD) count required. Default is 1.
minCell : int, optional
Minimum number of cells required. Default is 2.
path_ : str, optional
Path for saving output or intermediate results. Default is None.
n_top : int, optional
Number of top MT-SNVs to select. Default is None.
Returns
-------
afm : AnnData
Filtered Allele Frequency Matrix.
"""
scLT_system = afm.uns['scLT_system']
pp_method = afm.uns['pp_method']
if scLT_system == 'MAESTER' and pp_method in ['mito_preprocessing', 'maegatk', 'cellsnp-lite']:
pass
elif scLT_system == 'redeem':
pass
else:
raise ValueError(f'MQuad filter not available for scLT_system {scLT_system} and pp_method {pp_method}')
_, M = fit_MQuad_mixtures(
afm, n_top=n_top, path_=path_, ncores=ncores, minDP=minDP, minAD=minAD, with_M=True
)
_, _ = M.selectInformativeVariants(
min_cells=minCell, out_dir=path_, tenx_cutoff=None,
export_heatmap=False, export_mtx=False
)
idx = M.final_df.index.to_list()
selected = [ afm.var_names[i] for i in idx ]
afm = afm[:,selected].copy()
afm.var['deltaBIC'] = M.final_df['deltaBIC']
os.system(f'rm {os.path.join(path_, "*BIC*")}')
return afm
##
def filter_weng2024(
afm: AnnData,
min_site_cov: float = 5,
min_var_quality: float = 30,
min_frac_negative: float = .9,
min_n_positive: int = 2,
low_confidence_af: float = .1,
high_confidence_af: float = .5,
min_prevalence_low_confidence_af: float = .1,
min_cells_high_confidence_af: int = 2
) -> AnnData:
"""
Filter MT-SNVs (MAESTER only) as in Weng et al., 2024, and Miller et al. 2022.
Filters variants using the following criteria:
- At least `min_site_cov` mean site coverage (across cells)
- At least `min_var_quality` mean variant allele basecall quality (across cells)
- At least n cells * `min_frac_negative` negative cells
- At least `min_n_positive` (AF > 0) cells
- At least `min_prevalence_low_confidence_af` prevalence at AF less than `low_confidence_af`
- At least `min_cells_high_confidence_af` cells with AF greater than `high_confidence_af`
Parameters
----------
afm : AnnData
Allele Frequency Matrix.
min_site_cov : float
Minimum mean site coverage across cells.
min_var_quality : float
Minimum mean variant allele basecall quality across cells.
min_frac_negative : float
Fraction of cells that must be negative.
min_n_positive : int
Minimum number of cells with AF > 0.
min_prevalence_low_confidence_af : float
Minimum prevalence (fraction of cells) with AF less than low_confidence_af.
low_confidence_af : float
Threshold for low confidence allele frequency.
min_cells_high_confidence_af : int
Minimum number of cells required with AF greater than high_confidence_af.
high_confidence_af : float
Threshold for high confidence allele frequency.
Returns
-------
afm : AnnData
Filtered Allele Frequency Matrix.
"""
scLT_system = afm.uns['scLT_system']
pp_method = afm.uns['pp_method']
if scLT_system == 'MAESTER' and pp_method in ['mito_preprocessing', 'maegatk']:
pass
else:
raise ValueError(f'weng2024 filter not available for scLT_system {scLT_system} and pp_method {pp_method}')
# Filter Weng et al., 2024
# vars_filter.tib <- vars.tib %>% filter(mean_cov > 5, quality >= 30, n0 > 0.9*ncol(af.dm),Variant_CellN>=2)
## Apply the same filter as in MAESTER
# IsInfo<-function(x){
# total<-length(x)
# if(length(which(x<10))/total>0.1 & length(which(x>50))>10){
# return("Variable")
# }else{
# return("Non")
# }
# }
# Variability<-apply(af.dm,1,IsInfo) %>% data.frame(Info=.)
# vars_filter.tib<-Tomerge_v2(vars_filter.tib,Variability)
annotate_vars(afm, overwrite=True)
test = (
(afm.var['mean_cov']>min_site_cov) & \
(afm.var['quality']>=min_var_quality) & \
(afm.var['n0']>min_frac_negative*afm.shape[0]) & \
(afm.var['Variant_CellN']>=min_n_positive)
)
afm = afm[:,test].copy()
# Detect "Variable" variants as in MAESTER
# IsInfo<-function(x){
# total<-length(x)
# if(length(
# which(x<10))/total>0.1 # Test1 : low prevalence of minimal detection.
# &
# length(which(x>50))>10) # Test2 : enough cells with confident detection.
# {
# return("Variable")
# }else{
# return("Non")
# }
# }
# Variability<-apply(af.dm,1,IsInfo) %>% data.frame(Info=.)
t1 = (afm.X<low_confidence_af).sum(axis=0).A1 / afm.shape[0] > min_prevalence_low_confidence_af
t2 = (afm.X>high_confidence_af).sum(axis=0).A1 > min_cells_high_confidence_af
test = t1 & t2
afm = afm[:,test].copy()
return afm
##
[docs]
def filter_MiTo(
afm: AnnData,
min_cov: float = 5,
min_var_quality: float = 30,
min_frac_negative: float = 0.2,
min_n_positive: int = 5,
af_confident_detection: float = .02,
min_n_confidently_detected: int = 2,
min_mean_AD_in_positives: float = 1.25,
min_mean_DP_in_positives: float = 25
) -> AnnData:
"""
MiTo custom filter. Filter variants with:
- At least `min_cov` mean site coverage (across cells)
- At least `min_var_quality` mean variant allele basecall quality (across cells)
- At least n cells * `min_frac_negative` negative cells
- At least `min_n_positive` (AF > 0) cells
- At least `min_n_confidently_detected` cells in which the variant has been detected with AF greater than `af_confident_detection`
- At least `min_mean_AD_in_positives` mean AD in positive cells
- At least `min_mean_DP_in_positives` mean DP in positive cells
Parameters
----------
afm : AnnData
Allele Frequency Matrix.
min_cov : float
Minimum mean site coverage (across cells). Default is 5.
min_var_quality : float
Minimum mean variant allele basecall quality (across cells). Default is 30.
min_frac_negative : float
Minimum fraction of negative cells (expressed as a fraction of total cells). Default is 0.2.
min_n_positive : int
Minimum number of cells with AF > 0. Default is 5.
min_n_confidently_detected : int
Minimum number of cells in which the variant has been detected with an AF greater than
`af_confident_detection`. Default is 2.
af_confident_detection : float
Allele frequency threshold for confident detection. Default is 0.02.
min_mean_AD_in_positives : float
Minimum mean alternative allele count (AD) in positive cells. Default is 1.25.
min_mean_DP_in_positives : float
Minimum mean total UMI counts (DP) in positive cells. Default is 25.
Returns
-------
afm : AnnData
Filtered Allele Frequency Matrix.
"""
scLT_system = afm.uns['scLT_system']
pp_method = afm.uns['pp_method']
annotate_vars(afm, overwrite=True)
afm.var['n_confidently_detected'] = (afm.X>=af_confident_detection).sum(axis=0).A1
if scLT_system == 'MAESTER':
if pp_method in ['mito_preprocessing', 'maegatk']:
test = (
(afm.var['mean_cov']>=min_cov) & \
(afm.var['quality']>=min_var_quality) & \
(afm.var['n0']>=min_frac_negative*afm.shape[0]) & \
(afm.var['Variant_CellN']>=min_n_positive) & \
(afm.var['n_confidently_detected']>=min_n_confidently_detected) & \
(afm.var['mean_AD_in_positives']>=min_mean_AD_in_positives) & \
(afm.var['mean_DP_in_positives']>=min_mean_DP_in_positives)
)
afm = afm[:,test].copy()
else:
raise ValueError(f'MiTo filter not available for pp_method: {pp_method}')
elif scLT_system == 'RedeeM':
test = (
(afm.var['mean_cov']>=min_cov) & \
(afm.var['n0']>=min_frac_negative*afm.shape[0]) & \
(afm.var['Variant_CellN']>=min_n_positive) & \
(afm.var['n_confidently_detected']>=min_n_confidently_detected) & \
(afm.var['mean_AD_in_positives']>=min_mean_AD_in_positives) & \
(afm.var['mean_DP_in_positives']>=min_mean_DP_in_positives)
)
afm = afm[:,test].copy()
else:
raise ValueError(f'MiTo filter not available for scLT_system: {scLT_system}')
return afm
##
[docs]
def compute_lineage_biases(
afm: AnnData,
lineage_column: str,
target_lineage: str,
bin_method: str = 'MiTo',
binarization_kwargs: Dict[str,Any] = {},
alpha: float = .05
) -> pd.DataFrame:
"""
Compute MT-SNVs enrichment scores for a given lineage category using Fisher's exact test.
Parameters
----------
afm : AnnData
Allele Frequency Matrix.
lineage_column : str
Field in afm.obs containing the 'lineage' categorical variable.
target_lineage : str
The category in afm.obs[lineage_column] to test for MT-SNV enrichment.
bin_method : str, optional
Genotyping method. Default is "MiTo".
binarization_kwargs : dict, optional
Additional keyword arguments for genotyping. Default is {}.
alpha : float, optional
Family-wise error rate for p-value correction. Default is 0.05.
Returns
-------
results : pd.DataFrame
DataFrame containing computed statistics (e.g., -log10(FDR) from Fisher's exact test).
"""
if lineage_column not in afm.obs.columns:
raise ValueError(f'{lineage_column} not present in cell metadata!')
muts = afm.var_names
prevalences_array = np.zeros(muts.size)
target_ratio_array = np.zeros(muts.size)
oddsratio_array = np.zeros(muts.size)
pvals = np.zeros(muts.size)
if 'bin' not in afm.layers:
call_genotypes(afm, bin_method=bin_method, **binarization_kwargs)
# Here we go
G = afm.layers['bin'].toarray()
for i in range(muts.size):
test_mut = G[:,i] == 1
test_lineage = afm.obs[lineage_column] == target_lineage
n_mut_lineage = np.sum(test_mut & test_lineage)
n_mut_no_lineage = np.sum(test_mut & ~test_lineage)
n_no_mut_lineage = np.sum(~test_mut & test_lineage)
n_no_mut_no_lineage = np.sum(~test_mut & ~test_lineage)
prevalences_array[i] = n_mut_lineage / test_lineage.sum()
target_ratio_array[i] = n_mut_lineage / test_mut.sum()
# Fisher
oddsratio, pvalue = fisher_exact(
[
[n_mut_lineage, n_mut_no_lineage],
[n_no_mut_lineage, n_no_mut_no_lineage],
],
alternative='greater',
)
oddsratio_array[i] = oddsratio
pvals[i] = pvalue
# Correct pvals --> FDR
pvals = multipletests(pvals, alpha=alpha, method="fdr_bh")[1]
# Results
results = (
pd.DataFrame({
'prevalence' : prevalences_array,
'perc_in_target_lineage' : target_ratio_array,
'odds_ratio' : oddsratio_array,
'FDR' : pvals,
'lineage_bias' : -np.log10(pvals)
}, index=muts
)
.sort_values('lineage_bias', ascending=False)
)
return results
##
def filter_GT_enriched(
afm: AnnData,
lineage_column: str = None,
fdr_treshold: float = .1,
n_enriched_groups: int = 2,
bin_method: str = 'MiTo',
binarization_kwargs: Dict[str,Any] = {}
) -> AnnData:
"""
Compute MT-SNVs enrichment scores for a given lineage category using
-log10(FDR) from Fisher's exact test.
Parameters
----------
afm : AnnData
Allele Frequency Matrix.
lineage_column : str
Field in afm.obs that contains the 'lineage' categorical variable.
target_lineage : str
The category in afm.obs[lineage_column] to test for MT-SNV enrichment.
bin_method : str, optional
Genotyping method. Default is "MiTo".
binarization_kwargs : dict, optional
Additional keyword arguments for genotyping. Default is {}.
alpha : float, optional
Family-wise error rate for p-value correction. Default is 0.05.
Returns
-------
results : pd.DataFrame
Computed statistics.
"""
if lineage_column is not None and lineage_column in afm.obs.columns:
pass
else:
raise ValueError(f'{lineage_column} not available in afm.obs!')
L = []
lineages = afm.obs[lineage_column].dropna().unique()
for target_lineage in lineages:
print(f'Computing variants enrichment for lineage {target_lineage}...')
res = compute_lineage_biases(afm, lineage_column, target_lineage,
bin_method=bin_method, binarization_kwargs=binarization_kwargs)
L.append(res['FDR']<=fdr_treshold)
df_enrich = pd.concat(L, axis=1)
df_enrich.columns = lineages
test = df_enrich.apply(lambda x: np.sum(x>0)>0 and np.sum(x>0)<=n_enriched_groups, axis=1)
vois = df_enrich.loc[test].index.unique()
id_lineages = df_enrich.loc[test].sum(axis=0).loc[lambda x: x>0].index.to_list()
cells = afm.obs[lineage_column].loc[lambda x: x.isin(id_lineages)].index
afm = afm[cells, vois].copy()
return afm
##
def moran_I(W, x, num_permutations=100):
"""
Calculate normalized Moran's I statistics and permutation-based pvalue.
"""
W = W / W.sum()
x_stdzd = (x-np.mean(x)) / np.std(x,ddof=0)
I_obs = x_stdzd.T @ W @ x_stdzd
# Perform permutation test
permuted_Is = np.zeros(num_permutations)
for i in range(num_permutations):
x_perm = np.random.permutation(x_stdzd)
permuted_Is[i] = x_perm.T @ W @ x_perm
p_value = np.sum(permuted_Is >= I_obs) / num_permutations
return I_obs, p_value
##
def _compute_moran_batch(start_pos, end_pos, W, X, num_permutations):
"""
Compute Moran's I for a batch of variants using joblib.
Parameters:
- start_pos: start index for variant batch
- end_pos: end index for variant batch
- W: spatial weights matrix
- X: data matrix
- num_permutations: number of permutations for p-value
Returns:
- List of tuples: [(I, p_value), ...]
"""
results = []
for i in range(start_pos, end_pos):
I, p_value = moran_I(W, X[:,i], num_permutations=num_permutations)
results.append((I, p_value))
return results
##
def filter_variant_moransI(
afm: AnnData,
num_permutations: int = 100,
pval_treshold: float = .01,
n_cores: int = None
) -> AnnData:
"""
Filter out MT-SNVs if not significantly auto-correlated (PARALLEL VERSION).
Uses joblib for parallel processing.
"""
assert 'distances' in afm.obsp
W = 1-afm.obsp['distances'].toarray()
X = afm.X.toarray()
# Set number of cores
if n_cores is None:
n_cores = max(1, os.cpu_count()-1)
# Prepare batches
nvars = X.shape[1]
quotient = nvars // n_cores
residue = nvars % n_cores
intervals = []
start = end = 0
for i in range(n_cores):
end = start + quotient + (i < residue)
if end == start:
break
intervals.append((start, end))
start = end
# Parallel computation
with parallel_backend("loky", inner_max_num_threads=1):
result_list = Parallel(n_jobs=len(intervals))(
delayed(_compute_moran_batch)(
start_pos,
end_pos,
W,
X,
num_permutations
)
for start_pos, end_pos in intervals
)
# Flatten results from all batches
I_list = []
P_list = []
for batch_results in result_list:
for I, P in batch_results:
I_list.append(I)
P_list.append(P)
# Store results in afm
afm.var['Moran I '] = I_list
afm.var['Moran I pvalue'] = P_list
# Filter variants by pvalue
var_to_retain = afm[:,afm.var['Moran I pvalue']<=pval_treshold].var_names
afm = afm[:,var_to_retain].copy()
return afm
##
def filter_dbSNP_common(afm: AnnData):
"""
Filter Allele Frequency Matrix from "COMMON" MT-SNVs (annotated dbSNP database).
"""
common = load_common_dbSNP()
n_dbSNP = afm.var_names.isin(common).sum()
logging.info(f'Exclude {n_dbSNP} common SNVs events (dbSNP)')
variants = afm.var_names[~afm.var_names.isin(common)]
afm = afm[:,variants].copy()
return afm, n_dbSNP
##
def filter_REDIdb_edits(afm: AnnData):
"""
Filter Allele Frequency Matrix from previously annotated RNA-edits (REDIdb database).
"""
edits = load_edits_REDIdb()
n_REDIdb = afm.var_names.isin(edits).sum()
logging.info(f'Exclude {n_REDIdb} common RNA editing events (REDIdb)')
variants = afm.var_names[~afm.var_names.isin(edits)]
afm = afm[:,variants].copy()
return afm, n_REDIdb
##