Source code for microbetag.build_mtg_cx2

"""
Aim:
    Takes an edge list as input, along with a dataframe giving the NCBI Taxonomy ids and the GTDB ids of the corresponding taxa,
    and, based on the annotation types requested, builds a cx-format graph that is the final object returned by microbetag.

Author:
    Haris Zafeiropoulos

Based on:
    https://github.com/msysbio/microbetagApp/blob/develop/services/web/microbetag/scripts/buid_cx_annotated_graph.py
"""
import os
import pickle
import json
import logging
import datetime
import ndex2.cx2
import pyshorteners
import pandas as pd
from typing import Dict

from .networks import get_edgelist, read_cyjson
from .utils import extend_complements, extend_faprotax, load_phenotypic_traits
from .seed_complementarity import build_url_with_seed_complements, load_seed_complement_files



def build_pseudo_cx(conf):
    """
    Build the microbetag-annotated network as a list of CX-style aspects.

    The returned list can be dumped to a file the user loads on Cytoscape and
    parses through the MGG app, or passed to ``build_ndex2_net`` to be
    converted to a .cx2 file.

    Important! In every case, we need a way to map sequence id to a taxonomy.
    Either an abundance table or a sequence id to taxonomy map
    (``sequence_id_taxonomy_map``) is required.

    Arguments
    ---------
    conf: configuration object. This function reads ``seq_to_taxon_df``,
        ``abundance_table``, ``pathway_complementarity``,
        ``seed_complementarity``, ``network_clustering`` and the file paths
        that go with the enabled annotation types; it is also handed to the
        helpers that load the data.

    Returns
    -------
    annotated_cx (list): one dict per aspect, in this fixed order:
        numberVerification, metaData, cyTableColumn, networkAttributes,
        nodes, nodeAttributes, edges, edgeAttributes, metaData (post), status.
        ``UpdateCX2Netork`` reads the nodes/edges aspects by position (4 to 7),
        so the order must not change.
    """
    # Load edgelist
    edgelist = get_edgelist(conf)

    # Get pairs of nodes with an association (first two columns of the edgelist)
    unique_associated_pairs = set(edgelist.iloc[:, :2].itertuples(index=False, name=None))

    # Make the map sequence to taxon df a dictionary: first column -> second column
    seq_id_to_taxonomy_dic = conf.seq_to_taxon_df.set_index(
        conf.seq_to_taxon_df.columns[0]
    )[
        conf.seq_to_taxon_df.columns[1]
    ].to_dict()

    # Init formatting microbetag annotations
    annotated_cx = []
    init = {"numberVerification": [{"longNumber": 281474976710655}]}
    annotated_cx.append(init)

    metadata = {
        "metaData": [
            {"name": "cyTableColumn", "version": "1.0"},
            {"name": "nodes", "version": "1.0"},
            {"name": "edges", "version": "1.0"},
            {"name": "nodeAttributes", "version": "1.0"},
            {"name": "edgeAttributes", "version": "1.0"},
            {"name": "networkAttributes", "version": "1.0"},
            {"name": "cartesianLayout", "version": "1.0"},
        ]
    }
    annotated_cx.append(metadata)

    # ===========
    # GET COLUMN NAMES FOR ALL TABLES
    # ===========
    table_columns = {"cyTableColumn": []}
    cyTableColumns = [
        # for node table mandatory
        {"applies_to": "node_table", "n": "@id", "d": "string"},
        {"applies_to": "node_table", "n": "name", "d": "string"},
        # microbetag-oriented columns
        {"applies_to": "node_table", "n": "microbetag::taxon name", "d": "string"},
        {"applies_to": "node_table", "n": "microbetag::namespace"},
        {"applies_to": "node_table", "n": "microbetag::taxonomy", "d": "string"},
        {"applies_to": "node_table", "n": "microbetag::gtdb-genomes", "d": "list_of_string"},
        {"applies_to": "node_table", "n": "microbetag::ncbi-tax-level", "d": "string"},
        # for edge table mandatory
        {"applies_to": "edge_table", "n": "shared name"},
        {"applies_to": "edge_table", "n": "interaction type"},
        {"applies_to": "edge_table", "n": "microbetag::weight", "d": "double"}
    ]

    # --- Phen traits ---
    # IF PHEND ASKED..?
    logging.info("Loading phenotypic traits")
    bin_phen_traits, phentraits = load_phenotypic_traits(conf)
    for term in phentraits:
        cyTableColumns.extend([
            {"applies_to": "node_table", "n": "::".join(["phendb", term]), "d": "boolean"},
            {"applies_to": "node_table", "n": "::".join(["phendbScore", "".join([term, "Score"])]), "d": "double"}
        ])

    # --- FAPROTAX traits (only when an abundance table is available) ---
    bin_faprotax_traits = faprotax_traits = None
    if conf.abundance_table is not None:
        logging.info("Loading FAPROTAX traits")
        bin_faprotax_traits, faprotax_traits = extend_faprotax(conf=conf)
        # Node columns reg. FAPROTAX annotations
        for term in faprotax_traits:
            column = {"applies_to": "node_table", "n": "::".join(["faprotax", term]), "d": "boolean"}
            cyTableColumns.append(column)

    # --- PATHWAY complements ---
    logging.info("Loading patwhay complementarities")
    complements_dict_ext = None
    if conf.pathway_complementarity:
        complements_dict_ext = extend_complements(
            complements_json=conf.compl_file,
            descrps_path=conf.module_descriptions,
            max_scratch_alt=conf.max_scratch_alt,
            pathway_complements_dir=conf.pathway_complements_dir,
            pathway_complement_percentage=conf.pathway_complement_percentage,
        )
        # NOTE(review): nesting reconstructed from a flattened listing; the column
        # declarations are assumed to belong to the pathway-complementarity branch.
        for edge in unique_associated_pairs:
            edge_col = {"applies_to": "edge_table", "n": "".join(["compl::", edge[0], ":", edge[1]]), "d": "list_of_string"}
            cyTableColumns.append(edge_col)

    # --- SEED complements ---
    logging.info("Loading seeds complementarities")
    kmap = seed_scores = non_seed_sets = seed_complements_dict = None
    if conf.seed_complementarity:
        kmap = load_seed_complement_files(conf.kegg_mappings)

        # SECURITY NOTE: pickle.load can execute arbitrary code. These paths come
        # from the configuration and are presumably trusted resources -- verify
        # before pointing them at user-supplied files.
        with open(conf.module_related_non_seeds, "rb") as f:
            non_seed_sets = pickle.load(f)

        with open(conf.seed_complements, "rb") as f:
            seed_complements = pickle.load(f)

        # NOTE: rows of the df from pickle will be keys -- beneficiary ; columns the donor
        seed_complements_dict = seed_complements.to_dict(orient="index")
        for beneficiary, donors in seed_complements_dict.items():
            for donor, complements in donors.items():
                if len(complements) > 0:
                    cyTableColumns.append(
                        {"applies_to": "edge_table", "n": "".join(["seedCompl::", beneficiary, ":", donor]), "d": "list_of_string"}
                    )

        # --- SEED scores ---
        cyTableColumns.extend([
            {"applies_to": "edge_table", "n": "::".join(["seed", "competition"]), "d": "double"},
            {"applies_to": "edge_table", "n": "::".join(["seed", "cooperation"]), "d": "double"}
        ])
        # The file's own header row is skipped and replaced by fixed column names
        seed_scores = pd.read_csv(conf.phylomint_scores, sep="\t", header=None, skiprows=1)
        seed_scores.columns = ["A", "B", "Competition", "Complementarity"]

    # --- MANTA CLUSTERS ---
    manta_annotations = cartesianLayout = None
    if conf.network_clustering:
        cartesianLayout = {"cartesianLayout": []}
        m1 = {"applies_to": "node_table", "n": "::".join(["manta", "cluster"]), "d": "integer"}
        m2 = {"applies_to": "node_table", "n": "::".join(["manta", "assignment"]), "d": "string"}
        cyTableColumns.append(m1)
        cyTableColumns.append(m2)

        manta_net = read_cyjson(conf.manta_net)
        clusters = list(manta_net.nodes(data="cluster"))
        assignments = list(manta_net.nodes(data="assignment"))
        positions = list(manta_net.nodes(data="position"))

        # One entry per node of the manta network: cluster, assignment, position
        manta_annotations = {}
        for pair in clusters:
            manta_annotations[pair[0]] = {"cluster": pair[1]}
        for pair in assignments:
            manta_annotations[pair[0]]["assignment"] = pair[1]
        for pair in positions:
            manta_annotations[pair[0]]["position"] = pair[1]

    table_columns["cyTableColumn"] = cyTableColumns
    annotated_cx.append(table_columns)

    # ===========
    # NETWORK TABLE
    # ===========
    networkAttributes = {
        "networkAttributes": [
            {"n": "network type", "v": "microbetagAnnotated"},
            {"n": "name", "v": "microbetagNetwork"},
            {"n": "uri", "v": "https://hariszaf.github.io/microbetag/"},
            {"n": "version", "v": "1.0"}
        ]
    }
    annotated_cx.append(networkAttributes)

    # ===========
    # NODES TABLE
    # ===========
    nodes, nodeAttributes, seq_to_nodeID, node_counter = build_mtg_nodes(
        conf, edgelist, seq_id_to_taxonomy_dic, bin_phen_traits, bin_faprotax_traits, manta_annotations, cartesianLayout
    )
    annotated_cx.append(nodes)
    annotated_cx.append(nodeAttributes)
    # NOTE(review): cartesianLayout is filled in by build_mtg_nodes and announced in
    # the pre-metadata, but it is never appended to annotated_cx -- confirm whether
    # the layout aspect is meant to be part of the output.

    # ===========
    # EDGES TABLE
    # ===========
    edges, edgeAttributes = build_mtg_edges(conf, edgelist, seq_to_nodeID, node_counter, complements_dict_ext,
                                            seed_scores, seed_complements_dict, non_seed_sets, kmap)
    annotated_cx.append(edges)
    annotated_cx.append(edgeAttributes)

    # POST-metadata
    post_metadata = {
        "metaData": [
            {"name": "nodeAttributes", "elementCount": len(nodeAttributes["nodeAttributes"]), "version": 1.0},
            {"name": "edgeAttributes", "elementCount": len(edgeAttributes["edgeAttributes"]), "version": 1.0},
            {"name": "cyTableColumn", "elementCount": len(table_columns["cyTableColumn"]), "version": 1.0},
            {"name": "edges", "elementCount": len(edges["edges"]), "idCounter": node_counter + 1000, "version": 1.0},
            {"name": "nodes", "elementCount": len(nodes["nodes"]), "idCounter": 1001, "version": 1.0},
            # Fixed: the name used to be a garbled string that matched no aspect;
            # it has to be the name of the networkAttributes aspect it counts.
            {"name": "networkAttributes", "elementCount": len(networkAttributes["networkAttributes"]), "version": 1.0},
        ]
    }
    annotated_cx.append(post_metadata)

    # Status
    status = {"status": [{"error": "", "success": True}]}
    annotated_cx.append(status)

    return annotated_cx
def build_mtg_nodes(conf, edgelist, seq_id_to_taxonomy, phen_traits, faprotax_traits, manta_annotations=None, cartesian_layout=None):
    """
    Builds the nodes and attributes for the pseudo-CX Microbetag network.

    Node ids start at 1001 and are assigned in order of first appearance in the
    first two columns of ``edgelist`` (row by row), so the numbering is
    reproducible between runs.

    Arguments
    ---------
    conf: configuration object; ``seq_ids``, ``abundance_table``,
        ``network_clustering`` and ``metadata_file`` are read here.
    edgelist (pd.DataFrame): the first two columns hold the node names of each association.
    seq_id_to_taxonomy (dict): node name -> taxonomy string.
    phen_traits (dict): node name -> {trait: {"presence": ..., "confidence": ...}}.
    faprotax_traits (dict | None): node name -> iterable of FAPROTAX terms;
        only consulted when ``conf.abundance_table`` is not None.
    manta_annotations (dict | None): node name -> {"cluster", "assignment", "position"}.
    cartesian_layout (dict | None): ``{"cartesianLayout": [...]}``; extended in place
        with the position of every node that has a manta annotation.

    Returns
    -------
    (nodes, node_attributes, seq_to_node_id, node_counter):
        the ``{"nodes": [...]}`` aspect, the ``{"nodeAttributes": [...]}`` aspect,
        the node name -> node id map, and the last node id that was assigned.
    """

    def add_node_attribute(attributes, node_id, name, value, dtype):
        """Helper function to add a node attribute."""
        attributes.append({"po": node_id, "n": name, "v": value, "d": dtype})

    def handle_phen_traits(node_id, seq_id, phen_traits, attributes):
        """Handles phenotypic traits for a node; returns True when the node has an entry."""
        if seq_id not in phen_traits:
            return False
        for trait, data in phen_traits[seq_id].items():
            presence = data.get("presence") == "YES"
            confidence = data.get("confidence", 0.0)
            add_node_attribute(attributes, node_id, f"phendb::{trait}", presence, "boolean")
            # The score is stored as a string even though its declared type is "double"
            add_node_attribute(attributes, node_id, f"phendbScore::{trait}Score", str(confidence), "double")
        return True

    def handle_faprotax_traits(node_id, seq_id, faprotax_traits, attributes):
        """Handles FAPROTAX traits for a node."""
        if seq_id in faprotax_traits:
            for term in faprotax_traits[seq_id]:
                add_node_attribute(attributes, node_id, f"faprotax::{term}", True, "boolean")

    def handle_manta_annotations(node_id, seq_id, manta_annotations, layout, attributes):
        """Handles manta cluster annotations and Cartesian layout."""
        if seq_id in manta_annotations:
            annotation = manta_annotations[seq_id]
            add_node_attribute(attributes, node_id, "manta::cluster", int(annotation["cluster"]), "integer")
            add_node_attribute(attributes, node_id, "manta::assignment", annotation["assignment"], "string")
            layout["cartesianLayout"].append({
                "node": node_id,
                "x": annotation["position"]["x"],
                "y": annotation["position"]["y"]
            })

    # dict.fromkeys de-duplicates while keeping first-appearance order. A plain set
    # would make the id given to each node depend on string hash randomisation.
    ordered_node_names = list(dict.fromkeys(edgelist.iloc[:, :2].values.ravel()))

    nodes = {"nodes": []}
    node_attributes = {"nodeAttributes": []}
    seq_to_node_id = {}
    node_counter = 1000

    for node_name in ordered_node_names:
        node_counter += 1
        node_id = node_counter
        seq_to_node_id[node_name] = node_id

        # Add basic node information
        nodes["nodes"].append({"@id": node_id, "n": node_name})
        add_node_attribute(node_attributes["nodeAttributes"], node_id, "@id", node_name, "string")
        add_node_attribute(node_attributes["nodeAttributes"], node_id, "name", node_name, "string")

        if node_name in conf.seq_ids:
            # Handle phenotypic traits; a node with an entry is flagged as "mspecies"
            check_mspecies = handle_phen_traits(node_id, node_name, phen_traits, node_attributes["nodeAttributes"])

            # Handle FAPROTAX traits
            if conf.abundance_table is not None:
                handle_faprotax_traits(node_id, node_name, faprotax_traits, node_attributes["nodeAttributes"])

            # Add taxonomy information
            tax_level = "mspecies" if check_mspecies else "other"
            add_node_attribute(node_attributes["nodeAttributes"], node_id, "microbetag::ncbi-tax-level", tax_level, "string")

            taxonomy = seq_id_to_taxonomy.get(node_name)
            if taxonomy:
                add_node_attribute(node_attributes["nodeAttributes"], node_id, "microbetag::taxonomy", taxonomy, "string")
            else:
                logging.info(f"No taxonomy found for node {node_name}")

        # Handle manta annotations if clustering is enabled
        # NOTE(review): nesting reconstructed from a flattened listing; this check is
        # assumed to apply to every node, not only to those in conf.seq_ids -- confirm.
        if conf.network_clustering and manta_annotations:
            handle_manta_annotations(node_id, node_name, manta_annotations, cartesian_layout, node_attributes["nodeAttributes"])

        # Assign metavariables if metadata is provided
        if conf.metadata_file is not None and node_name not in conf.seq_ids:
            add_node_attribute(node_attributes["nodeAttributes"], node_id, "microbetag::ncbi-tax-level", "metavar", "string")

    return nodes, node_attributes, seq_to_node_id, node_counter
def build_mtg_edges(conf, edgelist, seq_to_nodeID, node_counter, complements_dict_ext, seed_scores,
                    seed_complements_dict, non_seed_sets, kmap):
    """
    Builds the edges and the edge attributes of the pseudo-CX microbetag network.

    Every row of ``edgelist`` yields one undirected association edge
    ("cooccurrence" when the weight is > 0, "depletion" otherwise). When pathway
    and/or seed complementarity is enabled, up to two more directed edges are
    added per row, one for each direction, going from the donor (source) to the
    beneficiary (target).

    Arguments
    ---------
    conf: configuration object; ``tinyurl``, ``pathway_complementarity`` and
        ``seed_complementarity`` are read here.
    edgelist (pd.DataFrame): columns, by position: node A, node B, weight.
    seq_to_nodeID (dict): node name -> node id, as returned by ``build_mtg_nodes``.
    node_counter (int): last node id assigned; edge ids start at ``node_counter + 1000``.
    complements_dict_ext: pathway complements, beneficiary -> donor -> complements.
    seed_scores (pd.DataFrame | None): columns "A", "B", "Competition", "Complementarity".
    seed_complements_dict (dict | None): beneficiary -> donor -> seed complements.
    non_seed_sets, kmap: passed through to ``add_edge_seed_complements``.

    Returns
    -------
    (edges, edgeAttributes): the ``{"edges": [...]}`` and ``{"edgeAttributes": [...]}`` aspects.
    """
    edges = {"edges": []}
    edgeAttributes = {"edgeAttributes": []}
    edge_counter = node_counter + 1000

    shortener = pyshorteners.Shortener() if conf.tinyurl else None
    logging.info("Shortener in the main build_cx function is %s", shortener)

    for _, row in edgelist.iterrows():
        # Positional access: the edgelist columns are not assumed to have integer labels
        id_a = row.iloc[0]
        id_b = row.iloc[1]
        score = row.iloc[2]

        # NOTE: In this case, we do not care about the source and the target since it is actually undirectional
        edge = {"@id": edge_counter, "s": seq_to_nodeID[id_a], "t": seq_to_nodeID[id_b], "i": "cooccurrence/depletion"}
        edges["edges"].append(edge)

        edgeAttributes["edgeAttributes"].append({"po": edge_counter, "n": "microbetag::weight", "v": str(score), "d": "double"})
        if score > 0:
            edgeAttributes["edgeAttributes"].extend([
                {"po": edge_counter, "n": "shared name", "v": " ".join([id_a, "(cooccurss with)", id_b]), "d": "string"},
                {"po": edge_counter, "n": "interaction type", "v": "cooccurrence", "d": "string"}
            ])
        else:
            edgeAttributes["edgeAttributes"].extend([
                {"po": edge_counter, "n": "shared name", "v": " ".join([id_a, "(depletes)", id_b]), "d": "string"},
                {"po": edge_counter, "n": "interaction type", "v": "depletion", "d": "string"}
            ])
        edge_counter += 1

        # Complementarity edges, first with A as the beneficiary, then with B.
        # NOTE: we want as source of the edge the DONOR taxon -- since it is the one
        # providing to the BENEFICIARY (target)
        for beneficiary, donor in ((id_a, id_b), (id_b, id_a)):
            # Always define the candidate edge. It used to be created only under
            # pathway_complementarity, which raised a NameError when only seed
            # complementarity was enabled.
            pot_edge = {"@id": edge_counter, "t": seq_to_nodeID[beneficiary], "s": seq_to_nodeID[donor], "i": "comp_coop"}

            has_pathway_compl = False
            if conf.pathway_complementarity:
                has_pathway_compl = add_edge_pathway_complements(
                    beneficiary, donor, complements_dict_ext, edges, edgeAttributes, pot_edge, edge_counter
                )

            has_seed_compl = False
            if conf.seed_complementarity:
                has_seed_compl = add_edge_seed_complements(
                    beneficiary, donor, seed_complements_dict, edges, edgeAttributes, pot_edge, edge_counter,
                    kmap, non_seed_sets, shortener
                )

            if has_pathway_compl or has_seed_compl:
                # Seed scores are attached only when the directed edge was emitted.
                # Otherwise they would carry an id with no edge behind it and end up
                # on whichever edge is given that id next.
                if conf.seed_complementarity:
                    add_seed_edge_attributes(beneficiary, donor, seed_scores, edgeAttributes, edge_counter)
                edge_counter += 1

    return edges, edgeAttributes
def seqId_faprotax_functions_assignment(path_to_subtables):
    """
    Parse the sub tables of the faprotax analysis to assign the biological processes related to each sequence id

    Each file in ``path_to_subtables`` stands for one process: the process name is
    the file name up to its first ".", with underscores replaced by spaces. The
    first two lines of every file are skipped, and the second tab-separated field
    of each remaining line is taken as the sequence id.

    Arguments
    ---------
    path_to_subtables (str): directory holding one sub table per process

    Returns
    -------
    seqId_faprotax_assignments (dict): sequence id -> list of process names.
        Files are read in sorted name order, so the lists are reproducible.
    """
    seqId_faprotax_assignments = {}
    # sorted(): os.listdir gives no ordering guarantee
    for subtable_file in sorted(os.listdir(path_to_subtables)):
        process_name = subtable_file.split(".")[0].replace("_", " ")
        # Context manager so the file handle is closed (it used to be left open)
        with open(os.path.join(path_to_subtables, subtable_file), "r") as table_file:
            lines = table_file.readlines()
        for line in lines[2:]:
            seqId = line.split("\t")[1]
            seqId_faprotax_assignments.setdefault(seqId, []).append(process_name)
    return seqId_faprotax_assignments
def add_edge_pathway_complements(id_x, id_y, complements_dict_ext, edges, edgeAttributes, pot_edge, edge_counter):
    """
    Appends the pathway complementarities between two taxa as a new edge with its attributes.

    Arguments
    ---------
    id_x: beneficiary
    id_y: donor
    complements_dict_ext (dict): beneficiary -> donor -> {key: list of strings};
        a value that is not a dict means there are no complements for the pair
    edges (dict): ``{"edges": [...]}``; ``pot_edge`` is appended in place
    edgeAttributes (dict): ``{"edgeAttributes": [...]}``; extended in place
    pot_edge (dict): the edge to add if complements are found
    edge_counter (int): id of ``pot_edge``, used as "po" of the attributes

    Returns
    -------
    (bool): True if the edge and its attributes were added, False otherwise
    """
    if id_x not in complements_dict_ext or id_y not in complements_dict_ext[id_x]:
        return False

    pathway_complements = complements_dict_ext[id_x][id_y]
    # Builtin dict instead of the deprecated typing.Dict alias; the check is the same
    if not isinstance(pathway_complements, dict):
        logging.info(f"Taxa {id_x} and {id_y} found to have not pathway complements")
        return False

    edges["edges"].append(pot_edge)
    # Each complement (a list of strings) is flattened to one "^"-separated string
    merged_compl = ["^".join(gcompl) for gcompl in pathway_complements.values()]
    edgeAttributes["edgeAttributes"].extend([
        {"po": edge_counter, "n": "shared name", "v": f"{id_x} (completes/competes with) {id_y}", "d": "string"},
        {"po": edge_counter, "n": "interaction type", "v": "completes/competes with", "d": "string"},
        {"po": edge_counter, "n": f"compl::{id_x}:{id_y}", "v": merged_compl, "d": "list_of_string"},
    ])
    return True
def add_edge_seed_complements(
    id_x, id_y, complements_dict, edges, edgeAttributes, pot_edge, edge_counter, kmap, non_seed_sets, shortener
):
    """
    Appends the seed complementarities between two taxa as attributes to their corresponding edge

    Arguments
    ---------
    id_x: first key of ``complements_dict``; also the row of ``non_seed_sets`` that is read
    id_y: second key of ``complements_dict``
    complements_dict (dict): id_x -> id_y -> collection of ModelSEED compound ids
    edges (dict): ``{"edges": [...]}``; ``pot_edge`` is appended unless already there
    edgeAttributes (dict): ``{"edgeAttributes": [...]}``; extended in place
    pot_edge (dict): candidate edge carrying the attributes
    edge_counter (int): id of ``pot_edge``, used as "po" of the attributes
    kmap (pd.DataFrame): columns "modelseed", "map", "kegg_compound", "description", "category"
    non_seed_sets (pd.DataFrame): indexed by taxon; the first value of the row is
        used as that taxon's non-seed compounds
    shortener: passed through to ``build_url_with_seed_complements``

    Returns
    -------
    (bool): True when the pair is present in ``complements_dict``, False otherwise
    """
    pair_known = id_x in complements_dict and id_y in complements_dict[id_x]
    if not pair_known:
        return False

    attribute_rows = edgeAttributes["edgeAttributes"]

    # The edge may already be there, added for the pathway complements of the same pair
    if pot_edge not in edges["edges"]:
        edges["edges"].append(pot_edge)
        attribute_rows.extend([
            {"po": edge_counter, "n": "shared name", "v": f"{id_x} (completes/competes with) {id_y}", "d": "string"},
            {"po": edge_counter, "n": "interaction type", "v": "completes/competes with", "d": "string"}
        ])

    complements = complements_dict[id_x][id_y]
    complements_map = kmap[kmap['modelseed'].isin(complements)]
    maps_in = list(complements_map["map"].unique())

    beneficiarys_nonseed = non_seed_sets.loc[id_x].to_list()[0]
    beneficiarys_nonseeds_map = kmap[kmap['modelseed'].isin(beneficiarys_nonseed)]

    # One "^"-joined record per KEGG map: category, description, ModelSEED ids, KEGG ids, url
    merged_compl = []
    for kegg_map in maps_in:
        compl_rows = complements_map[complements_map["map"] == kegg_map]
        nonseed_rows = beneficiarys_nonseeds_map[beneficiarys_nonseeds_map["map"] == kegg_map]

        ksc = list(compl_rows["kegg_compound"])
        msc = ";".join(set(compl_rows["modelseed"]))
        ns = list(nonseed_rows["kegg_compound"])
        surl = build_url_with_seed_complements(ksc, ns, kegg_map, shortener)

        # .item() requires a single distinct description / category per map
        map_rows = kmap[kmap["map"] == kegg_map]
        des = map_rows["description"].unique().item()
        cat = map_rows["category"].unique().item()

        merged_compl.append("^".join([cat, des, msc, ";".join(set(ksc)), surl]))

    attribute_rows.append(
        {"po": edge_counter, "n": f"seedCompl::{id_x}:{id_y}", "v": merged_compl, "d": "list_of_string"}
    )
    return True
def add_seed_edge_attributes(id_x, id_y, seed_scores, edgeAttributes, edge_counter):
    """
    Appends the seed competition and cooperation scores of a pair as edge attributes.

    Looks up the row of ``seed_scores`` whose "A" equals ``id_x`` and whose "B"
    equals ``id_y``. If there is no such row nothing is added. The scores are
    written as strings with declared type "double".

    Arguments
    ---------
    id_x: value matched against column "A"
    id_y: value matched against column "B"
    seed_scores (pd.DataFrame): columns "A", "B", "Competition", "Complementarity"
    edgeAttributes (dict): ``{"edgeAttributes": [...]}``; extended in place
    edge_counter (int): id of the edge the attributes belong to
    """
    pair_mask = (seed_scores['A'] == id_x) & (seed_scores['B'] == id_y)
    hits = seed_scores[pair_mask]
    if hits.empty:
        return

    # .item() expects exactly one matching row
    competition = hits["Competition"].item()
    cooperation = hits["Complementarity"].item()

    named_scores = (("seed::competition", competition), ("seed::cooperation", cooperation))
    edgeAttributes["edgeAttributes"].extend(
        [{"po": edge_counter, "n": label, "v": str(value), "d": "double"} for label, value in named_scores]
    )
class UpdateCX2Netork():
    """
    Convert the initial microbetag-annotated network to a .cx2 format file.

    The initial network is the list of aspects returned by ``build_pseudo_cx``
    (or a JSON file holding it). Aspects are read by position: index 4 is
    "nodes", 5 "nodeAttributes", 6 "edges" and 7 "edgeAttributes".
    """

    def __init__(self, microbetag_cx, outfile=None):
        """
        Initializes a cx2 microbetag-network converter.

        Attributes
        -----------
        microbetag_cx (list | str): microbetag-annotated network in initial format,
            or the path to a JSON file holding it
        outfile (str | None): where ``build_cx`` writes the .cx2 file; when None
            a time-stamped file name is generated

        Raises
        ------
        ValueError: ``microbetag_cx`` is not a list and cannot be read as a JSON file
        TypeError: the network does not have the expected aspects
        """
        # Despite its name, this holds the directory of the input file (None when
        # the network was given as a list); build_cx writes next to the input.
        self.graphml_file = None
        self.outfile = outfile

        if not isinstance(microbetag_cx, list):
            logging.debug("microbetag_cx: %s", microbetag_cx)
            try:
                with open(microbetag_cx, "r") as f:
                    self.initial_cx = json.load(f)
                self.graphml_file = os.path.dirname(microbetag_cx)
            except (OSError, TypeError, ValueError) as err:
                # OSError: unreadable path; TypeError: not path-like;
                # ValueError: invalid JSON (json.JSONDecodeError) or bad encoding
                raise ValueError(
                    "Please provide either a path to a initial cx file or the list returned when this is loaded"
                ) from err
        else:
            self.initial_cx = microbetag_cx

        try:
            # Build nodes
            self.cx2_nodes = self.get_nodes()
            # Build edges
            self.cx2_edges = self.get_edges()
        except (LookupError, TypeError, AttributeError) as err:
            # Missing aspect or key, or aspects of an unexpected type
            raise TypeError("File provided is not a valid json.") from err

    @staticmethod
    def _group_attributes(attributes):
        """Group a CX attribute list by the element it points to: po -> {name: value}."""
        grouped = {}
        for attribute in attributes:
            grouped.setdefault(attribute["po"], {})[attribute["n"]] = attribute["v"]
        return grouped

    def get_nodes(self):
        """
        Builds cx2-like nodes based on the initial network

        A "display name" attribute, when present, replaces "name". A
        "microbetag::taxonomy" string with exactly 7 ";"-separated levels is
        also split into one ``taxonomy::<rank>`` attribute per level.

        Returns
        -------
        nodes (list): List with cx2-like nodes after the ndex2 library;
            each one is ``{"id": <node id>, "v": {attribute name: value}}``
        """
        # NOTE(review): "taxonomy::oder" looks like a typo for "order", but it is an
        # emitted attribute name; kept as is in case a consumer relies on it.
        rank_keys = (
            "taxonomy::domain", "taxonomy::phylum", "taxonomy::class", "taxonomy::oder",
            "taxonomy::family", "taxonomy::genus", "taxonomy::species",
        )
        # Single pass over the attributes instead of one scan per node
        attributes_by_node = self._group_attributes(self.initial_cx[5]["nodeAttributes"])

        nodes = []
        for nid in self.initial_cx[4]["nodes"]:
            node = {"id": nid["@id"], "v": dict(attributes_by_node.get(nid["@id"], {}))}
            values = node["v"]

            if "display name" in values:
                values["name"] = values.pop("display name")
            else:
                logging.info("No display name found for node %s" % node["id"])

            taxonomy = values.get("microbetag::taxonomy")
            if isinstance(taxonomy, str):
                levels = taxonomy.split(";")
                if len(levels) == 7:
                    values.update(zip(rank_keys, levels))
            else:
                logging.info("No taxonomy found for node %s" % node["id"])

            nodes.append(node)
        return nodes

    def get_edges(self):
        """
        Builds cx2-like edges based on the initial network

        The "interaction type" of an edge starts as its "i" field and is
        overwritten by an edge attribute of the same name, if there is one.

        Returns
        -------
        edges (list): List with cx2-like edges after the ndex2 library; each one is
            ``{"id": <edge id>, "s": <source>, "t": <target>, "v": {attribute name: value}}``
        """
        attributes_by_edge = self._group_attributes(self.initial_cx[7]["edgeAttributes"])

        edges = []
        for edgeid in self.initial_cx[6]["edges"]:
            values = {"interaction type": edgeid["i"]}
            values.update(attributes_by_edge.get(edgeid["@id"], {}))
            edges.append({"id": edgeid["@id"], "s": edgeid["s"], "t": edgeid["t"], "v": values})
        return edges

    def build_cx(self):
        """
        Writes a .cx2 network using the nodes and edges returned by the get_nodes() and get_edges() attributes

        The file goes to ``self.outfile`` when set. Otherwise it is named
        ``mbtag_net_<YYYY-mm-dd_HH-MM>.cx2`` and is written in the directory of
        the input file, or in the current directory when the network was given
        as a list.
        """
        # Create an empty net cx
        net_cx = ndex2.cx2.CX2Network()

        # Add nodes on the net_cx, keeping track of the ids on the initial mgg net
        mggId2netid = {}
        for node in self.cx2_nodes:
            mggId2netid[node["id"]] = net_cx.add_node(attributes=node["v"])

        # Add edges on the net_cx
        for edge in self.cx2_edges:
            attributes = edge["v"].copy()
            # Association edges carry their weight as a string; cx2 gets a number
            if attributes["interaction type"] in ["depletion", "cooccurrence"]:
                attributes['microbetag::weight'] = float(edge["v"]['microbetag::weight'])
            # Attributes that are empty lists are left out
            filtered_attributes = {
                key: value for key, value in attributes.items()
                if not (isinstance(value, list) and len(value) == 0)
            }
            net_cx.add_edge(
                source=mggId2netid[edge["s"]],
                target=mggId2netid[edge["t"]],
                attributes=filtered_attributes,
            )

        if self.outfile is None:
            timepoint = datetime.datetime.now().strftime("%Y-%m-%d_%H-%M")
            netfile = ".".join(["_".join(["mbtag_net", timepoint]), "cx2"])
            if self.graphml_file is not None:
                output_path = os.path.join(self.graphml_file, netfile)
            else:
                output_path = netfile
        else:
            output_path = self.outfile

        net_cx.set_network_attributes({'name': 'microbetag annotated network'})
        net_cx.write_as_raw_cx2(output_path)
def build_ndex2_net(microbetag_pseudo_cx, outfile=None):
    """
    Wrapper to fire an instance of UpdateCX2Netork class aiming to export a
    microbetag-annotated network file to .cx2 format

    Any exception raised while converting or writing is logged and reported
    through the return value instead of being propagated.

    Arguments
    ---------
    microbetag_pseudo_cx (str | list): path to initial microbetag-annotated network file,
        or the list of aspects itself
    outfile (str | None): path of the .cx2 file to write; passed on to UpdateCX2Netork

    Returns
    --------
    (boolean): A .cx2 format file was successfully saved or not
    """
    succeeded = True
    try:
        converter = UpdateCX2Netork(microbetag_pseudo_cx, outfile)
        converter.build_cx()
    except Exception as e:
        logging.error('Error occurred when building cx2. %s', e)
        succeeded = False
    return succeeded