Source code for microbetag.config


import os
import logging
from .helpers import *
from .networks import get_edgelist
from .utils import resolve_file_path, detect_separator



# Configure the root logger once, at import time. Records are rendered as
# "<LEVEL>: <message>", i.e. without the default "root:" logger-name prefix,
# and only WARNING and above are emitted.
logging.basicConfig(
    level=logging.WARNING,
    format="%(levelname)s: %(message)s",
)


def load_abundance(abd_file):
    """
    Load a tsv/csv format abundance table, assuming the sequence id is
    provided in the first column and the taxonomy in the last one.

    :param abd_file: Path to the abundance table; its column separator is
        obtained from ``detect_separator``.

    :return seq_id_to_taxonomy: A pd.DataFrame with two columns renamed to
        ``sequence_id`` and ``taxonomy`` (the first and last columns of the table)
    :return sequence_id_column_name: The original name of the first column,
        i.e. the one with the sequence identifier (e.g. ``seqId``)
    :return taxonomy_column_name: The original name of the last column,
        i.e. the one with the taxonomy
    :return delimiter: The separator returned by ``detect_separator`` and
        used to parse the table
    """
    delimiter = detect_separator(abd_file)
    abd_tab_df = pd.read_csv(abd_file, sep=delimiter)

    sequence_id_column_name = abd_tab_df.columns[0]
    taxonomy_column_name = abd_tab_df.columns[-1]

    # Take an explicit copy: the result is handed to callers that may add or
    # modify columns, and a bare column slice of ``abd_tab_df`` would raise
    # SettingWithCopyWarning on such writes (pandas without copy-on-write).
    seq_id_to_taxonomy = abd_tab_df[
        [sequence_id_column_name, taxonomy_column_name]
    ].copy()
    seq_id_to_taxonomy.columns = ["sequence_id", "taxonomy"]

    return seq_id_to_taxonomy, sequence_id_column_name, taxonomy_column_name, delimiter
class Config:
    """
    Parses a microbetag configuration file (yaml) to init a microbetag run.

    The constructor reads the already-loaded yaml object, resolves the input
    and output paths against a base directory, loads the sequence-id to
    taxonomy mapping, and then merges the attributes of several handler
    objects (bins, pathway complementarity, abundance table, network,
    mappings, FAPROTAX, seed complementarity) into the instance.
    """

    def __init__(self, conf, config_file):
        """
        :param conf: The loaded yaml configuration (a nested ``dict``-like object).
        :param config_file: Path to the yaml file ``conf`` was loaded from; used
            for the owner's user/group ids and, outside a container, as the
            base directory for relative paths.

        :raises ValueError: If no output directory is given, or if the inputs
            required for a full run are missing while ``precalulations_only``
            is not enabled.
        """
        # Pass loaded yaml object
        self.yaml = conf

        # Load emojis
        emojis = Emojis()

        # User's group and user id
        self.yaml_file = os.stat(config_file)
        self.user_id = self.yaml_file.st_uid
        self.group_id = self.yaml_file.st_gid

        config_wd = os.path.dirname(os.path.realpath(__file__))
        self.cwd = os.path.dirname(config_wd)

        # Check if microbetag runs as a container
        self.mount = None
        if self.cwd == "/microbetag":
            self.mount = "/data"
            self.base_dir = self.mount
        else:
            self.base_dir = os.path.dirname(config_file)

        # Threads to be used (falls back to 2 when the value is empty/0)
        self.threads = conf["threads"]["value"] or 2

        # Output dir
        output_dir = conf.get("output_directory", {}).get("dir_path")
        if not output_dir:
            raise ValueError("Output directory needs to be specified.")
        self.output_dir = os.path.join(self.base_dir, output_dir)

        # Bins/MAGs/genomes
        bins_fasta = conf.get("bins_fasta", {}).get("dir_path")
        self.bins_path = resolve_file_path(self.base_dir, bins_fasta)

        # The abundance table is optional; if neither an abundance table nor a
        # network is provided, only the pre-calculations can run.
        abd_tbl_filename = conf.get("abundance_table_file", {}).get("file_path")
        self.abundance_table = resolve_file_path(self.base_dir, abd_tbl_filename)

        # Edgelist of provided network
        edge_list = conf.get("edge_list", {}).get("file_path")
        self.network = resolve_file_path(self.base_dir, edge_list)

        # NOTE: Setting precalculations only as true also allows to get a Config
        # instance without providing an abundance table or network, meaning you
        # can use the Config instance for partial/specific tasks of microbetag.
        # (The yaml key spelling "precalulations_only" is kept as-is on purpose.)
        precalc_only = conf.get("precalulations_only", {}).get("value")
        self.precalc_only = precalc_only if precalc_only in [0, 1] else False

        # Validate against the normalised flag: the raw yaml value may be
        # missing (None) or 0, neither of which ``is False``.
        if self.abundance_table is None and self.network is None and not self.precalc_only:
            raise ValueError(
                "You need to provide at least one between an abundance table and a network's edgelist in 3-column format."
            )

        # IMPORTANT: Sequence to taxonomy map -- required if no abundance table is provided
        sequence_taxonomy_map = conf.get("sequence_id_taxonomy_map", {}).get("file_path")
        self.sequence_taxonomy_map = resolve_file_path(self.base_dir, sequence_taxonomy_map)
        if self.abundance_table is None and self.sequence_taxonomy_map is None and not self.precalc_only:
            raise ValueError(
                "Since an abundance table is not provided, you need to provide a 2-column file with the sequence id (e.g bin ids)"
                "and their corresponding taxonomy or taxon name."
            )

        # IMPORTANT: Sequence id to taxonomy map
        self._load_sequence_ids(emojis)

        # Set bins --- NOTE: CHECK FOR CONFLICTS
        # Each handler below receives this (partially built) instance and its
        # attributes are merged back in, so the statement order matters.
        self.bins_ids = None
        if self.bins_path is not None:
            bn = BinsHandler(config=self)
            self.__dict__.update(vars(bn))

        # Get pathway complementarity related variables (enabled unless set to 0/False)
        pcompl = conf.get("pathway_complementarity", {}).get("value")
        self.pathway_complementarity = pcompl if pcompl in [0, 1] else True
        pc = PathwayComplementarity(config=self)
        self.__dict__.update(vars(pc))

        # Load abundance table with taxonomy
        nsc = AbdTableHandler(config=self)
        self.__dict__.update(vars(nsc))

        # Check whether bin names are the same in both abundance and edgelist files
        nh = NetworkHandler(config=self)
        self.__dict__.update(vars(nh))

        # Build output dir
        os.makedirs(self.output_dir, exist_ok=True)
        self.predictions_path = os.path.join(self.output_dir, "phen_predictions")
        os.makedirs(self.predictions_path, exist_ok=True)

        # Open Reading Frames: use the provided path, else create <output_dir>/ORFs
        orfs = conf.get("orfs", {}).get("path")
        if orfs is None:
            self.prodigal = os.path.join(self.output_dir, "ORFs")
            os.makedirs(self.prodigal, exist_ok=True)
        else:
            self.prodigal = os.path.join(self.base_dir, orfs)

        # ModelSEEDpy arguments
        self.gapfill_model = conf["gapfill_model"]["value"]
        self.gapfill_media = conf["gapfill_media"]["value"]

        # Flashweave arguments
        # NOTE(review): ``metadata_file`` is looked up without a default, so a
        # missing section raises here; left unchanged because a silent ``None``
        # would turn ``metadata`` into "true".
        self.metadata_file = conf.get("metadata_file").get("file_path")
        self.metadata = "false" if self.metadata_file == "false" else "true"
        self.flashweave_args = conf["flashweave_args"]

        # Phenotrex
        self.phen_classes = os.path.join(config_wd, "mtg_maps_models/phenDB/classes/")
        self.genotypes_file = os.path.join(self.output_dir, "train.genotype")
        min_proba = conf.get("min_proba", {}).get("value")
        # Fall back to 0.6 only when no value was configured.
        self.min_proba = min_proba if min_proba is not None else 0.6

        # Mappings
        mappings = MappingPaths(config=self)
        self.__dict__.update(vars(mappings))

        # FAPROTAX
        if self.abundance_table is not None:
            faprotax = Faprotax(config=self)
            self.__dict__.update(vars(faprotax))

        # Manta
        net_clust = conf.get("network_clustering", {}).get("value")
        self.network_clustering = net_clust if net_clust in [0, 1] else False
        if self.network_clustering:
            self.prev_manta_net = conf.get("prev_clustered_network", {}).get("file_path")
            if self.prev_manta_net is not None:
                self.manta_net = os.path.join(self.base_dir, self.prev_manta_net)
            else:
                self.manta_net = os.path.join(self.output_dir, 'manta_annotated.cyjs')
        self.base_network_file = os.path.join(self.output_dir, "basenet.cyjs")

        # Seed complementarity
        sc = SeedComplementarityHandler(config=self)
        self.__dict__.update(vars(sc))

        # Intermediate annotated network file name
        self.microbetag_annotated_network_file = os.path.join(self.output_dir, "pseudo_cx_annotated_net.cx")

        self.tinyurl = conf.get("tinyurl", {}).get("value") or False

        # ==========
        # Init torch -- machine learning library
        # ==========
        import torch
        from deepnog.utils import get_weights_path
        from deepnog.utils import set_device

        device = set_device('auto')
        try:
            weights_path = get_weights_path(
                database="eggNOG5",
                level=str(2),
                architecture="deepencoding",
            )
            _ = torch.load(weights_path, map_location=device)
        except Exception:
            # Best effort: a failure to load the weights is reported but does
            # not abort the configuration. ``Exception`` (rather than a bare
            # ``except``) lets KeyboardInterrupt/SystemExit propagate.
            logging.warning("Could not load the deepnog weights. Please check the deepnog installation and setup.")

        logging.info("Configuration file loaded successfully.")

    def _set_taxonomy_from_abundance_table(self):
        """Load the abundance table and store the taxonomy frame, column names and delimiter."""
        (
            self.seq_to_taxon_df,
            self.sequence_id_column_name,
            self.taxonomy_column_name,
            self.delimiter,
        ) = load_abundance(self.abundance_table)

    def _load_sequence_ids(self, emojis):
        """Set ``seq_to_taxon_df`` and ``seq_ids`` from whichever of abundance table / network is available."""
        if self.network is None and self.abundance_table:
            self._set_taxonomy_from_abundance_table()
            self.seq_ids = self.seq_to_taxon_df["sequence_id"].unique().tolist()

        elif self.abundance_table is None and self.network:
            # No abundance table: the taxonomy comes from the 2-column map, so
            # its separator has to be detected here (``self.delimiter`` is not
            # set by anything earlier in this case).
            self.delimiter = detect_separator(self.sequence_taxonomy_map)
            seq_to_taxon_df = pd.read_csv(self.sequence_taxonomy_map, sep=self.delimiter)
            seq_to_taxon_df.columns = ["sequence_id", "taxonomy"]
            self.seq_to_taxon_df = seq_to_taxon_df
            self.seq_ids = self.seq_to_taxon_df["sequence_id"].unique().tolist()

        elif self.abundance_table and self.network:
            # NOTE: Not all sequence ids in seq_ids need to have a taxonomy in
            # this case -- only those coming from the abundance table. Yet, in
            # case the network has taxa not present in the abundance table,
            # apparently it will lead to errors.
            network_df = get_edgelist(self)
            net_seq_ids = pd.concat([network_df.iloc[:, 0], network_df.iloc[:, 1]]).unique().tolist()
            self._set_taxonomy_from_abundance_table()
            abd_seq_ids = self.seq_to_taxon_df["sequence_id"].unique().tolist()
            # Ids present in both sources are kept once (first-seen order), in
            # line with the unique id lists built in the other branches.
            self.seq_ids = list(dict.fromkeys(net_seq_ids + abd_seq_ids))

        else:
            logging.warning(
                "Neither an abundance table nor a network was procided.\n"
                "microbetag will only run some pre-calculations not requiring them.\n"
                f"This is only good to use if you are are quite familiar with microbetag and you know what you are doing. {emojis.WARNING_EMOJI}"
            )

    def export_to_log(self, log_file="parameters.log"):
        """
        Print every instance attribute as ``name: value`` and append the same
        lines to ``log_file``.

        :param log_file: Path of the file the attribute listing is appended to.
        """
        header = "Instance attribute values:"
        lines = [f"{key}: {value}" for key, value in self.__dict__.items()]

        print(header)
        for line in lines:
            print(line)

        # ``logging.basicConfig(filename=...)`` is a no-op once the root logger
        # has handlers (it is configured when this module is imported), so the
        # file is written through a dedicated, temporary handler instead.
        handler = logging.FileHandler(log_file)
        handler.setFormatter(logging.Formatter('%(message)s'))
        param_logger = logging.getLogger(f"{__name__}.parameters")
        param_logger.setLevel(logging.INFO)
        param_logger.propagate = False  # keep the dump out of the console handlers
        param_logger.addHandler(handler)
        try:
            param_logger.info(header)
            for line in lines:
                param_logger.info(line)
        finally:
            # Always detach and close so repeated calls do not stack handlers
            # or leak the file descriptor.
            param_logger.removeHandler(handler)
            handler.close()