"""
Utils supporting tasks for file parsing and formating
"""
import os, re
import json, csv
import time
import copy
import glob
import shutil
import random
import logging
import subprocess
import pandas as pd
from typing import List
# Handling data related
[docs]
def get_library_version(library_name):
import pkg_resources
try:
version = pkg_resources.get_distribution(library_name).version
return version
except pkg_resources.DistributionNotFound:
return "Library not found"
except Exception as e:
return str(e)
[docs]
def resolve_relative_path(base_dir, file_path):
# Count the number of "../" at the beginning
steps_back = 0
while file_path.startswith('../'):
steps_back += 1
file_path = file_path[3:] # Remove the leading "../"
# Move back "steps_back" levels from base_dir
for _ in range(steps_back):
base_dir = os.path.dirname(base_dir)
# Combine the modified base_dir with the remaining file_path
return os.path.join(base_dir, file_path)
[docs]
def resolve_file_path(base_dir, file_path):
# If the file_path is None, return None
if file_path is None:
return None
# Otherwise, resolve the file path based on the base directory and the user's input
if file_path.startswith('/'):
path = file_path
if file_path.startswith('~'):
path = os.path.expanduser(file_path)
if file_path.startswith('../'):
path = resolve_relative_path(base_dir, file_path)
else:
path = os.path.join(base_dir, file_path)
# Check if the file exists
if os.path.exists(path):
return path
else:
raise FileNotFoundError(f"File not found: {path}")
[docs]
class SetEncoder(json.JSONEncoder):
"""
Custom JSON encoder that handles serialization of Python sets.
This encoder extends the functionality of the standard JSONEncoder to support
serializing Python sets. JSON does not have a native representation for sets,
so this encoder converts sets to lists before serializing them.
Usage:
When serializing data to JSON using json.dump() or json.dumps(), specify
cls=SetEncoder to use this custom encoder.
References:
- json.JSONEncoder: https://docs.python.org/3/library/json.html#json.JSONEncoder
"""
[docs]
def default(self, obj):
"""
Override the default method of JSONEncoder to handle serialization of sets.
Notes:
If the object is a set, it is converted to a list before serialization.
Otherwise, the default behavior of JSONEncoder.default() is used. Notes:
If the object is a set, it is converted to a list before serialization.
Otherwise, the default behavior of JSONEncoder.default() is used.
"""
if isinstance(obj, set):
return list(obj)
return json.JSONEncoder.default(self, obj)
[docs]
def get_files_with_suffixes(directory, suffixes):
"""
Recursively retrieves files from a specified directory and its subdirectories
that have extensions matching a given list of suffixes.
Parameters:
directory (str): The root directory to start the search.
suffixes (list of str): A list of file suffixes (extensions) to match.
Each suffix should include the dot (e.g., '.txt', '.csv').
Returns:
list of str: A list of full paths to files that match any of the specified suffixes.
Example:
>>> get_files_with_suffixes('/path/to/directory', ['.txt', '.csv'])
['/path/to/directory/file1.txt', '/path/to/directory/subdir/file2.csv']
"""
matching_files = []
for root, _, files in os.walk(directory):
for file in files:
if any(file.endswith(suffix) for suffix in suffixes):
matching_files.append(os.path.join(root, file))
return matching_files
[docs]
def flatten(list_of_lists: List):
"""
This function takes a list of lists and flattens it until it returns a list with
all the components of the initial one.
"""
if len(list_of_lists) == 0:
return list_of_lists
if isinstance(list_of_lists[0], list):
return flatten(list_of_lists[0]) + flatten(list_of_lists[1:])
return list_of_lists[:1] + flatten(list_of_lists[1:])
[docs]
def run_until_done(command: str):
"""
Function to run recursively a command until
"""
if os.system(command) == 0:
return 1
else:
time.sleep(random.randint(2, 10))
logging.warning("recurscive run of: %s", command)
run_until_done(command)
[docs]
def file_exists_and_nonzero(filename: str):
"""
Check if a file exists and its size is nonzero.
Args:
filename (str): The path to the file.
Returns:
bool: True if the file exists and its size is nonzero, False otherwise.
"""
return os.path.exists(filename) and os.path.getsize(filename) > 0
[docs]
def split_list(input_list: List, chunk_size: int):
"""
Split a list to sublists of a size.
"""
return [input_list[i:i + chunk_size] for i in range(0, len(input_list), chunk_size)]
[docs]
def many_to_one_files(dir_with_files, merged_file):
"""
Makes a single file out of all files in a directory by concatenating having rows of one after the other
"""
command = " ".join([
"find", dir_with_files,
"-type", "f",
"-name", 'K*',
"-print0",
"|",
"xargs", "-0", "cat", ">", merged_file
])
os.system(command)
[docs]
def ko_list_parser(ko_list: str):
"""
Parses ko_list file into a dict object - based on DiTing
:param ko_list: path to the ko_list file that comes from the kofam database https://www.genome.jp/ftp/db/kofam/
:return: a dictionary mapping knum to threshold and score_type
:rtype: dict
"""
ko_dic = {} # { knum : [threshold, score_type] }
with open(ko_list) as fi:
next(fi) # skip the first line (header)
for line in fi:
knum, threshold, score_type = line.split('\t')[0:3]
if threshold == '-':
continue
else:
ko_dic[knum] = [threshold, score_type]
return ko_dic
[docs]
def merge_ko(hmmout_dir, output):
"""
Parses the KO<>.<bin>.hmmout files produced by the kegg_annotation() function
to create a single 3-column file (output) with the bin_id, the corresponding conting and the KO that wa mapped to it.
The function then returns a dictionary with the bin ids as the keys and the set of KOs found to each as the value.
:param hmmout_dir (str): path to the .hmmout files
:param output (str): path/filename to save the output file
"""
# Under any circumstances microbetag will overwrite the ko_merged.txt file
with open(output, 'w') as fo:
fo.write('bin_id\tcontig_id\tko_term\n')
# Iterate through the bin folders in the hmmout folder
for bin_id in os.listdir(hmmout_dir):
bin_folder = os.path.join(hmmout_dir, bin_id)
bin_file = "_".join([bin_id, "kos.tsv"])
bin_kos_file = os.path.join(bin_folder, bin_file)
# Append
os.system(
" ".join([
"cat", bin_kos_file, ">>", output
])
)
[docs]
def bin_kos_to_file(hmmout_dir, bin_id):
"""
Builds a 3-col file for a bin and remove the KO-specific output files of hmmsearch
:param hmmout_dir: Directory to the hmmout files
:param ko_tmp:
"""
# Write 3-cols entries in tmp file
bin_kos_file = os.path.join(hmmout_dir, "".join([bin_id, "_kos.tsv"]))
if not os.path.exists(bin_kos_file):
open(bin_kos_file, 'w').close()
for hmmout_file in os.listdir(hmmout_dir):
try:
basename, gene_id, k_number = parse_hmmout(hmmout_file, hmmout_dir)
with open(bin_kos_file, 'a') as fo:
fo.write(basename + '\t' + gene_id + '\t' + k_number + '\n')
except:
# Ignore non-informative lines
pass
# Remove .hmmout files
bin_hmmout = os.path.join(hmmout_dir, ".".join([bin_id, "hmmout.all"]))
print(bin_hmmout)
many_to_one_files(hmmout_dir, bin_hmmout)
for p in glob.glob(hmmout_dir, recursive=True):
print(p)
if os.path.isfile(p) and p.endswith(".hmmout"):
os.remove(p)
[docs]
def parse_hmmout(hmmout_file, hmmout_dir):
"""
Parses the output of the hmmsearch
:param hmmout_file (str): Filename of the .hmmout file
:param hmmout_dir (str): Directory where hmmout_file is located
:return basename (str): Bin id
:return gene_id (str): Gene id
:retrun k_number (str): KEGG ORTHOLOGY term found
"""
if hmmout_file.endswith('.hmmout'):
kobasename = hmmout_file.rsplit('.', 1)[0]
basename = kobasename.split('.', 1)[1]
hmmout_file_path = os.path.join(hmmout_dir, hmmout_file)
with open(hmmout_file_path, 'r') as fi:
for line in fi:
if not line.startswith('#'):
gene_id, _ = line.split()[0:2] # under _ the accession
lines = line.split()
if re.match(r'[0-9]+$', lines[2]):
k_number = lines[3]
else:
k_number = lines[2]
return basename, gene_id, k_number
[docs]
def load_merged_ko_file(merged_ko):
"""
Load the 3-columns KEGG annotations file as built from the merge_ko()
Input:
merged_ko (str): path to 3-columns output file of the merge_ko()
Returns:
pivot_df (pd.DataFrame): a presence-absence (1/0) df where KOs are the rows and bin_ids the columns
"""
if merged_ko.endswith('.gz'):
os.system(f"gunzip {merged_ko}")
merged_ko = merged_ko.rsplit('.gz', 1)[0]
df = pd.read_csv(merged_ko, sep="\t")
column_names = df.columns.tolist()
bin_id, _, ko = column_names[:3]
# Pivot the DataFrame to have 'kegg_id' as rows and 'bin_id' as columns
unique_combinations = df.drop_duplicates().copy()
unique_combinations.loc[:, 'presence'] = 1
pivot_df = unique_combinations.pivot_table(index=ko, columns=bin_id, values='presence', fill_value=0)
os.system(f"gzip {merged_ko}")
return pivot_df # keep one | used to alse return the bins_kos
[docs]
def convert_to_json_serializable(obj):
"""
Recursively serializes entries of an object, i.e. a set is converted to a list, a list is split to its items
and a dictionary keeps its key and their values get serialized
"""
if isinstance(obj, (int, float, str, bool, type(None))):
return obj
elif isinstance(obj, set):
return list(obj)
elif isinstance(obj, list):
return [convert_to_json_serializable(item) for item in obj]
elif isinstance(obj, dict):
return {key: convert_to_json_serializable(value) for key, value in obj.items()}
else:
try:
return json.dumps(obj)
except TypeError:
return str(obj)
[docs]
def ensure_same_namespace_after_fw(conf):
"""
[TODO] can be removed but let's wait
Inconsistencies from D300244:bin_000023 in the abundance table to D300244.bin_000023 in FlashWeave.
Keep the routine in general along with the find_id_differences().
Apparently, the conf.network is a FlashWeave network file - thus the skip 2 rows
"""
import difflib
# Function to find the closest match and its index
def find_closest_match_with_index(element, list2, cutoff=0.6):
matches = difflib.get_close_matches(element, list2, n=1, cutoff=cutoff)
if matches:
closest_match = matches[0]
index = list2.index(closest_match)
return closest_match, index
return None, None
abd_df = pd.read_csv(conf.flashweave_abd_table, sep="\t")
abd_df_seqids = abd_df[abd_df.columns[0]].tolist()
net_df = pd.read_csv(conf.network, sep="\t", skiprows=2, header = None)
net_df.columns = ["bin_a", "bind_b", "weight"]
col1 = net_df["bin_a"].tolist()
col2 = net_df["bind_b"].tolist()
weight = net_df["weight"].tolist()
# Replace closest match in both col1 and col2 with the element from abd_df_seqids
for element in abd_df_seqids:
for col in [col1, col2]: # Iterate over both columns
closest_match, index = find_closest_match_with_index(element, col)
if closest_match:
# Replace the closest match in the current column
col[index] = element
net_df = pd.DataFrame(list(zip(col1, col2, weight)), columns=["bin_a", "bind_b", "microbetag::weight"])
net_df.to_csv(conf.network, sep="\t", index=False, header=False)
return 1
[docs]
def extend_complements(complements_json,
descrps_path,
max_scratch_alt,
pathway_complement_percentage,
pathway_complements_dir):
"""
Extends pathway complement annotations based on given settings and descriptions.
Parameters:
- complements_json: Path to the complements JSON file.
- descrps_path: Path to the KEGG MODULES description file.
- max_scratch_alt: Maximum number of alternative complements allowed.
- pathway_complement_percentage: Maximum allowable percentage of required KOs that must be present.
- pathway_complements_dir: Directory to save the extended complements JSON file.
complements_dict (dict): Dictionary of complements loaded from a JSON file.
descrps_path (str): Path to the module descriptions file (tab-separated file with no header).
Returns:
dict: Pathway Complementarities in a dictionary to be assigned in the mgg format
Builds:
pathway_complements_extended: JSON file with the dictionary returned
"""
# Load and process module descriptions
descrps = pd.read_csv(descrps_path, sep="\t", header=None)
descrps.columns = ["category", "moduleId", "description"]
column_order = ["moduleId", "description", "category"]
descrps = descrps[column_order]
# Deep copy the complements dictionary
with open(complements_json, 'r') as file:
complements_dict = json.load(file)
# complements_dict = json.load(open(complements_json))
complements_dict_ext = copy.deepcopy(complements_dict)
# Process complements
for beneficiary_bin, potential_donors in complements_dict.items():
for potential_donor, compls in potential_donors.items():
if compls:
complements_dict_ext[beneficiary_bin][potential_donor] = {}
for compl in compls:
module_id = compl[0][3:] # Extract module ID
kos_to_get = compl[1] # KOs required to complete the pathway
complet_alt = compl[2] # Alternative complements
# Skip if the complement is too complex based on settings
if len(complet_alt) == len(kos_to_get) > max_scratch_alt:
continue
# Skip if the percentage exceeds the threshold
perce = len(kos_to_get) / len(complet_alt)
if perce > pathway_complement_percentage:
continue
# Prepare the complement string
compl_str = [x if isinstance(x, str) else ";".join(x) for x in compl[1:]]
# Fetch module description details
triplet = descrps[descrps["moduleId"] == module_id].values.tolist()[0]
# Add extended complement details
complements_dict_ext[beneficiary_bin][potential_donor][
len(complements_dict_ext[beneficiary_bin][potential_donor])
] = triplet + compl_str
# Save extended complements to JSON
extended_path_compl_json = os.path.join(pathway_complements_dir, "pathway_complements_extended.json")
with open(extended_path_compl_json, "w") as f:
json.dump(complements_dict_ext, f)
return complements_dict_ext
[docs]
def extend_faprotax(conf):
bin_faprotax_traits = {}
fapro_sub_tables = [os.path.join(conf.faprotax_sub_tables, file) for file in os.listdir(conf.faprotax_sub_tables)]
for file in fapro_sub_tables:
trait_name, _ = os.path.splitext(os.path.basename(file))
trait = pd.read_csv(file, sep="\t", skiprows=1)
bins_with_trait = trait[conf.sequence_id_column_name].dropna()
for bin_id in bins_with_trait:
bin_faprotax_traits.setdefault(bin_id, []).append(trait_name)
faprotax_traits = list(flatten_list(bin_faprotax_traits.values()))
return bin_faprotax_traits, faprotax_traits
[docs]
def load_phenotypic_traits(conf):
"""PhenDB-like traits"""
logging.info("Loading phenotypic traits")
bin_phen_traits = {}
phentraits = set()
all_phen_df = os.path.join(conf.predictions_path, "phen_traits.tsv") # TODO: consider giving this also as an argument in the config -- MAKE SURE THEY GIVE SEQUENCE ID AND NOT GTDB ID !!
if os.path.exists(all_phen_df):
df = pd.read_csv(all_phen_df, sep="\t")
df = df.drop(["NCBI_ID", "gtdb_id"], axis=1)
phentraits = {x for x in df.columns if "Score" not in x}
phentraits.remove("sequence_id")
list_of_dics = df.to_dict(orient="records")
for entry in list_of_dics:
bin_phen_traits[entry["sequence_id"]] = {}
for trait in phentraits:
bin_phen_traits[entry["sequence_id"]][trait] = {
"presence": entry[trait],
"confidence": entry["".join([trait, "Score"])]
}
else:
prediction_files = [os.path.join(conf.predictions_path, file) for file in os.listdir(conf.predictions_path)]
for file in prediction_files:
if os.path.getsize(file) == 0:
continue
trait = pd.read_csv(file, sep="\t", skiprows=1)
trait_name = os.path.basename(file).split(".prediction.tsv")[0]
trait_filtered = trait[trait['Trait present'].notna()]
trait_dict = trait_filtered.to_dict(orient="records")
for case in trait_dict:
bin_id, _ = os.path.splitext(case["Identifier"])
if bin_id not in bin_phen_traits:
bin_phen_traits[bin_id] = {}
phentraits.add(trait_name)
bin_phen_traits[bin_id][trait_name] = {
"presence": case["Trait present"],
"confidence": case["Confidence"]
}
return bin_phen_traits, phentraits
[docs]
def flatten_list(lista, flat_list=[]):
"""
Recursive function taking as input a nested list and returning a flatten one.
E.g. ['GCF_003252755.1', 'GCF_900638025.1', 'GCF_003252725.1', 'GCF_003253005.1', 'GCF_003252795.1', ['GCF_000210895.1'], ['GCF_000191405.1']]
becomes ['GCF_003252755.1', 'GCF_900638025.1', 'GCF_003252725.1', 'GCF_003253005.1', 'GCF_003252795.1'].
"""
for i in lista:
if isinstance(i, list):
flatten_list(i, flat_list)
else:
flat_list.append(i)
return set(flat_list)
[docs]
def detect_separator(file_path):
"""
Detect the separator used in a text file, i.e `\t`, `,` , `;` etc.
"""
try:
with open(file_path, 'r') as file:
# Get the total file size
file.seek(0, 2) # Move to the end of the file
file_size = file.tell()
# Calculate 1% of the file size: 1e6 is 1MB
percent_size = (
file_size if file_size < 1e5 else
int(file_size * 0.2) if file_size < 1e6 else
int(file_size * 0.1) if 1e7 < file_size < 1e8 else
int(file_size * 0.01)
)
percent_size = max(percent_size, int(1e5))
# Log sizes
logging.info(f"file_size of {file_path}: {file_size}") ; logging.info(f"percent_size: {percent_size}")
# # Move to the start of the file
file.seek(0)
sample = file.read(percent_size)
# Use csv.Sniffer to detect the dialect
sniffer = csv.Sniffer()
dialect = sniffer.sniff(sample)
return dialect.delimiter
except:
raise TypeError(f"Cannot get delimiter for file {file_path}")