Source code for microbetag.tools

"""
Functions invoking other software/tools in the microbetag pipeline.
"""
import os
import sys
import glob
import shutil

import subprocess
import multiprocessing

from tqdm import tqdm

from julia.api import Julia
from typing import TYPE_CHECKING, List

from .utils import (
    get_files_with_suffixes,
    get_library_version,
    get_tool_location,
    ensure_flashweave_format,
    ensure_same_namespace_after_fw,
    mtg_logger,
)
from .seed_complementarity import ExportSeedComplementarities

if TYPE_CHECKING:
    from .config import Config


_logger_ = mtg_logger(__name__)


[docs] def run_seed_complementarity(config: "Config") -> None: """ Based on the type of files to be used for the seed complementarity step, adjustments are needed in terms of inner architecture of the files, then invokes :class:`.ExportSeedComplementarities` to get seed and non-seed sets, and based on those export seed complementarities. Args: config: Instance of the microbetag :class:`.Config` class """ if config.prev_conf is None or os.path.exists(config.prev_conf) is False: config.skip_sets = False if config.user_models: genre_files = [ os.path.join(config.for_reconstructions, file) for file in os.listdir(config.for_reconstructions) ] for file in genre_files: dest_path = os.path.join(config.genres, os.path.basename(file)) try: shutil.copy(file, dest_path) except Exception: pass else: all_files = [ os.path.join(config.reconstructions, file) for file in os.listdir(config.reconstructions) ] genre_files = [file for file in all_files if file.startswith(".xml")] for file in genre_files: dest_path = os.path.join(config.genres, os.path.basename(file)) shutil.move(file, dest_path) seeds = ExportSeedComplementarities(config) # If no seed and/or non-seed sets are missing, get them if config.skip_sets is False and config.api is False: seeds.get_sets() # If either the phylomint scores file or the one with the seed complementarities (pckl) is missing, exract them if seeds.get_scores or seeds.get_complements: seeds.get_scores_and_compls()
[docs] def hmmsearch(params: List) -> None: """ Function to invoke hmmsearch software. Arguments: params: list of parameters to be passed to the hmmsearch function. Note: We use only 1 cpu since we use a muliprocessing.Pool in the kegg_annotation(). Cite: HMMER 3.4 (Aug 2023); http://hmmer.org/ Copyright (C) 2023 Howard Hughes Medical Institute. Freely distributed under the BSD open source license. """ (threshold_method, threshold, outtype, output, hmm_db, faa) = params cmd_para = [ "hmmsearch", threshold_method, threshold, "--cpu", "1", "-o /dev/null", outtype, output, hmm_db, faa, ] cmd = " ".join(cmd_para) try: os.system(cmd) except Exception: _logger_.warning("Something wrong with KEGG hmmsearch!")
[docs] def run_prodigal(fasta: str, basename: str, outdir: str) -> None: """ Function to predict ORFs using Prodigal. By default outdir is the ORFs folder fna FASTA nucleic acid Used generically to specify nucleic acids ffn FASTA nucleotide of gene regions Contains coding regions for a genome Cite: Hyatt D, Chen GL, LoCascio PF, Land ML, Larimer FW, Hauser LJ. Prodigal: prokaryotic gene recognition and translation initiation site identification. BMC bioinformatics. 2010 Dec;11:1-1. """ faa_file = os.path.join(outdir, basename + ".faa") ffn_file = os.path.join(outdir, basename + ".ffn") fna_file = os.path.join(outdir, basename + ".fna") gbk_file = os.path.join(outdir, basename + ".gbk") PRODIGAL = get_tool_location("prodigal") cmd_para = [ PRODIGAL, "-q", "-i", fasta, "-p", "meta", "-a", faa_file, "-d", ffn_file, "-o", gbk_file, ] cmd = " ".join(cmd_para) if os.path.exists(faa_file) or os.path.exists(fna_file) or os.path.exists(ffn_file): _logger_.info("ORFs already predicted for bin: %s", basename) else: _logger_.info("ORFs to be predicted for bin: %s", basename) try: os.system(cmd) except Exception: _logger_.warning("Something wrong with prodigal annotation!")
[docs] def kegg_annotation( faa: str, basename: str, out_dir: str, db_dir: str, ko_dic: dict, threads: int ) -> bool: """ Function to perform KEGG annotation in parallel. The function invokes `hmmsearch`. Args: faa: Filepath to the .faa file of the bin in process basename: Bin id out_dir: Path to output directory where .hmmout files will be stored db_dir: Path to KEGG database directory ko_dic: threads: Number of threads to be used """ _logger_.info("KEGG annotation for %s", basename) params = [] if not os.path.exists(faa): _logger_.warn( f"A .faa file for {basename} is not availavle. microbetag will skip it." ) return False for knum, info in ko_dic.items(): # Check if hmmout for particular KO of a certain bin is already there hmmout_filename = ".".join([knum, str(basename), "hmmout"]) output = os.path.join(out_dir, basename, hmmout_filename) if os.path.exists(output): continue # Get hmm for KO under consideration hmm_db = os.path.join(db_dir, "profiles", knum + ".hmm") if not os.path.exists(hmm_db): continue # Set params for hmmsearch based on the ko_list annotation if info[1] == "full": threshold_method = "-T" outtype = "--tblout" elif info[1] == "domain": threshold_method = "--domT" outtype = "--domtblout" elif info[1] == "custom": threshold_method = "-E" outtype = "--tblout" params.append((threshold_method, info[0], outtype, output, hmm_db, faa)) _logger_.info("Number of KEGG processes to be performed: %s", str(len(params))) # process = multiprocessing.Pool(threads) # process.map(hmmsearch, params) # i.e. hmmsearch() with multiprocessing.Pool(threads) as pool: for _ in tqdm(pool.imap_unordered(hmmsearch, params), total=len(params)): pass return True
[docs] def phenotrex_genotype(config: "Config") -> None: """ Runs the `compute-genotype` program of phenotrex to get COGs present in the list of genomes under study. Cite: Feldbauer R, Schulz F, Horn M, Rattei T. Prediction of microbial phenotypes based on comparative genomics. BMC bioinformatics. 2015 Dec;16:1-8. https://phenotrex.readthedocs.io """ from . import _MTG_PHEN_ENV if not os.path.exists(config.genotypes_file): _logger_.info("Get phenotrex predictions") suffixes = [".fa", ".fasta", ".gz"] bin_files = get_files_with_suffixes(config.bins_path, suffixes) bin_files_in_a_row = " ".join(bin_files) # Build genotypes compute_genotype_params = [] if not config.cwd.startswith("/microbetag"): compute_genotype_params += ["conda run -n", _MTG_PHEN_ENV] compute_genotype_params += [ "phenotrex", "compute-genotype", "--out", config.genotypes_file, "--threads", str(config.threads), bin_files_in_a_row, ] # Container - case if config.cwd.startswith("/microbetag"): sk_init_version = get_library_version("scikit-learn") pheno_init_version = get_library_version("phenotrex") sk_required_version = "1.3.2" if sk_init_version != sk_required_version: # os.system("python3 -m pip install scikit-learn==1.3.2") get_sklearn_v = "==".join(["scikit-learn", sk_required_version]) subprocess.run([sys.executable, "-m", "pip", "install", get_sklearn_v]) pheno_required_version = "0.6.0" if pheno_init_version != pheno_required_version: get_pheno_v = "==".join(["phenotrex[fasta]", pheno_required_version]) subprocess.run([sys.executable, "-m", "pip", "install", get_pheno_v]) # Run the command compute_genotype_command = " ".join(compute_genotype_params) if os.system(compute_genotype_command) != 0: _logger_.info("Try phenotrex genotype for the second time.") if os.system(compute_genotype_command) != 0: _logger_.error("Phenotrex compute genotype command failed.") sys.exit(0) # Local case else: compute_genotype_command = " ".join(compute_genotype_params) try: subprocess.run(compute_genotype_command, shell=True, check=True) except Exception as e: _logger_.error(e) sys.exit(1)
[docs] def phenotrex_predict(config: "Config") -> None: """ Runs the :func:`predict` program of `phenotrex` to predict whether a genome does have a trait or not. Cite: Feldbauer R, Schulz F, Horn M, Rattei T. Prediction of microbial phenotypes based on comparative genomics. BMC bioinformatics. 2015 Dec;16:1-8. https://phenotrex.readthedocs.io """ from . import _MTG_PHEN_ENV # Get predictions phen_models = [ os.path.join(config.phen_classes, model) for model in os.listdir(config.phen_classes) ] # Parse through the phen models (classes) for model in phen_models: model_name = os.path.basename(model) model_predictions_output = "".join( [config.predictions_path, "/", model_name[:-4], ".prediction.tsv"] ) # Skip if predictions already computed if os.path.exists(model_predictions_output): _logger_.info("Predictions already exist for model: %s", model_name) else: predict_traits_params = [] # Local case if not config.cwd.startswith("/microbetag"): predict_traits_params = ["conda run -n", _MTG_PHEN_ENV] # All cases: local and container predict_traits_params += [ "phenotrex", "predict", "--genotype", config.genotypes_file, "--classifier", model, "--min_proba", str(config.min_proba), "--verb >", model_predictions_output, ] predict_trait_command = " ".join(predict_traits_params) try: subprocess.run(predict_trait_command, shell=True, check=True) except subprocess.CalledProcessError as e: _logger_.warn(f"Command execution failed with return code {e}") # If no predictions file is present for any classes, probably something went off. if not any(glob.glob(os.path.join(config.predictions_path, "*.prediction.tsv"))): _logger_.error( "No prediction was able to be retrieved with phenotrex. Check your input files." ) sys.exit(0)
[docs] def run_manta(config: "Config") -> None: """ Runs the manta package to perform network clustering. Cite: Röttjers L, Faust K. Manta: A clustering algorithm for weighted ecological networks. Msystems. 2020 Feb 25;5(1):10-128. """ # Build the manta command _logger_.info("Running manta clustering algorithm.") manta_output_file = "/".join([config.output_dir, "manta_annotated"]) manta_params = [ "manta", "-i", config.base_network_file, "-f", "cyjs", "-o", manta_output_file, "--layout", ] manta_command = " ".join(manta_params) # Run manta try: if os.system(manta_command) != 0: warning_msg = ( "The manta clustering algorithm failed." "Most likely this is because clusters could not be grouped based on the provided network " "and the parameters setup of manta." "Yet, microbetag will continue to the following steps without considering for clusters." ) _logger_.warning(warning_msg) # Changed cfg so the buld_cx_annotated_graph function will not fail. config.network_clustering = False else: _logger_.info("""manta ran fine.""") except Exception as e: _logger_.warning(e) config.network_clustering = False
[docs] def run_flashweave(config: "Config") -> None: """ Runs FlashWeave to infer co-occurrence network. Cite: Tackmann J, Rodrigues JF, von Mering C. Rapid inference of direct interactions in large-scale ecological networks from heterogeneous microbial sequencing data. Cell systems. 2019 Sep 25;9(3):286-96. """ _logger_.info("Fix FlashWeave arguments from config.") # Checking for format support. if not config.onthefly: ensure_flashweave_format(conf=config) pair_args = set() for arg, values in config.flashweave_args.items(): if values["required"]: if isinstance(values["value"], bool): pair_args.add((arg, str(values["value"]).lower())) else: _logger_.error( f'You need to provide values for "{arg}" argument of FlashWeave.' ) sys.exit(0) else: if values["value"] is not None: if values["type"] == "Bool": pair_args.add((arg, str(values["value"]).lower())) else: pair_args.add((arg, values["value"])) pair_args.add(("transposed", "true")) learn_in = ",".join(f"{arg[0]}={arg[1]}" for arg in pair_args) # Checking for Julia instance if not hasattr(config, 'julia_instance'): _logger_.info("Init Julia through Python!") config.jl = Julia(compiled_modules=False) config.jl.using("FlashWeave") # Run FlashWeave based on presence/absence of a metadata file try: if config.metadata_file: _logger_.info("Running FlashWeaeve along with a metadata file.") flashweave_cmd = ( f'save_network("{config.network}", ' f'learn_network("{config.flashweave_abd_table}", ' f'"{config.metadata_file}", {learn_in}))' ) else: _logger_.info("Running FlashWeaeve.") flashweave_cmd = f'save_network("{config.network}", learn_network("{config.flashweave_abd_table}", {learn_in}))' # Run flashweave _logger_.info(flashweave_cmd) config.jl.eval(flashweave_cmd) except Exception: error_msg = ( "FlashWeave failed to build a co-occurrence network. \n" "Please check on your abundance data and/or metadata files format.\n " ) raise RuntimeError(error_msg) try: ensure_same_namespace_after_fw(config) except Exception: error_msg = ( "FlashWeave ran but ended up with an empty edge file, meaning no co-occurrence or co-exclusive associations were found" "Consider looking at https://github.com/meringlab/FlashWeave.jl for FlashWeave parameterization and limitations." ) raise RuntimeError(error_msg)
[docs] def run_faprotax(config: "Config") -> None: """ Runs FAPROTAX collapse_table.py script to annotate taxa based on their taxonomy using literature. Cite: Louca S, Parfrey LW, Doebeli M. Decoupling function and taxonomy in the global ocean microbiome. Science. 2016 Sep 16;353(6305):1272-7. """ faprotax_params = [ "python", config.faprotax_script, "-i", config.abundance_table, "-o", config.faprotax_funct_table, "-g", config.faprotax_txt, "-c", '"' + "#" + '"', "-d", '"' + config.taxonomy_column_name + '"', "-v", "--force", "-s", config.faprotax_sub_tables, ] faprotax_command = " ".join(faprotax_params) _logger_.info(faprotax_command) _logger_.info(os.system("which python")) try: os.system(faprotax_command) except Exception: raise "FAPROTAX failed."