Source code for mito.io.phylo_io

"""
I/O functions to read/write CassiopeiaTrees from annotated (supports) .newick strigs.
"""

import pandas as pd
from cassiopeia.data import CassiopeiaTree
from Bio.Phylo.NewickIO import Parser
from io import StringIO
import networkx as nx


##


def _add_edges(G, clade, parent=None, counter=[1]):
    """
    Update the binary graph recursively.
    """
    if clade.is_terminal():
        node_name = clade.name
    else:
        node_name = f"internal_{counter[0]}"
        counter[0] += 1

    G.add_node(node_name, support=clade.confidence)
    if parent:
        branch_length = clade.branch_length if clade.branch_length is not None else 1
        G.add_edge(parent, node_name, length=branch_length)
    for child in clade.clades:
        _add_edges(G, child, node_name, counter)


##


[docs] def read_newick( path, X_raw: pd.DataFrame = None, X_bin: pd.DataFrame = None, D: pd.DataFrame = None, meta: pd.DataFrame = None ) -> CassiopeiaTree: """ Read a newick string as CassiopeiaTree object. Parameters ---------- path : str Path to newick string. X_raw : pd.DataFrame, optional Raw allelic frequency table. Cell x variants. Default is None. X_bin : pd.DataFrame, optional Binary (1,0) cell genotypes. Cell x variants. Default is None. D : pd.DataFrame, optional Cell x cell distance matrix. Default is None. meta : pd.DataFrame, optional Cell metadata. Cell x covariates. Default is None. Returns ------- afm : AnnData The assembled Allele Frequency Matrix (AFM). """ with open(path, 'r') as f: newick = f.read().strip() parser = Parser(StringIO(newick)) original_tree = list(parser.parse())[0] G = nx.DiGraph() _add_edges(G, original_tree.root, counter=[1]) edge_list = [] for u, v, data in G.edges(data=True): length = data['length'] if 'length' in data else 0.0 edge_list.append((u, v, length)) cells = [ x for x in G.nodes if not x.startswith('internal') ] cassiopeia_tree = CassiopeiaTree( tree=G, character_matrix=X_bin.loc[cells,:] if X_bin is not None else None, dissimilarity_map=D.loc[cells,cells] if D is not None else None, cell_meta=meta.loc[cells,:] if meta is not None else None ) if X_raw is not None and X_bin is not None: cassiopeia_tree.layers['raw'] = X_raw.loc[cells,:] cassiopeia_tree.layers['transformed'] = X_bin.loc[cells,:] for u, v, length in edge_list: cassiopeia_tree.set_branch_length(u, v, length) for node in G.nodes: if 'support' in G.nodes[node]: support = G.nodes[node]['support'] cassiopeia_tree.set_attribute(node, 'support', support) return cassiopeia_tree
## def to_DiGraph(tree: CassiopeiaTree) -> nx.DiGraph: """ Create a nx.DiGraph from annotated (i.e., support, for now) CassiopeiaTree. """ G = nx.DiGraph() for node in tree.nodes: try: G.add_node(node, support=tree.get_attribute(node, 'support')) except: pass for u, v, in tree.edges: G.add_edge(u, v, branch_length=tree.get_branch_length(u, v)) return G ## def _to_newick_str(g, node): is_leaf = g.out_degree(node) == 0 branch_length_str = "" support_str = "" if g.in_degree(node) > 0: parent = list(g.predecessors(node))[0] branch_length_str = ":" + str(g[parent][node]["branch_length"]) if 'support' in g.nodes[node] and g.nodes[node]['support'] is not None: try: support_str = str(int(g.nodes[node]['support'])) except: support_str = "0" _name = str(node) return ( "%s" % (_name,) + branch_length_str if is_leaf else ( "(" + ",".join( _to_newick_str(g, child) for child in g.successors(node) ) + ")" + (support_str if support_str else "") + branch_length_str ) ) ##
[docs] def write_newick(tree: CassiopeiaTree, path: str): """ Write a CassiopeiaTree as a newick string. Parameters ---------- tree : CassiopeiaTree Tree to write. path : str Path to newick string. """ G = to_DiGraph(tree) root = [ node for node in G if G.in_degree(node) == 0 ][0] newick = f'{_to_newick_str(G, root)};' with open(path, 'w') as f: f.write(newick)
##