Source code for drugforge.docking.analysis

import os
import pickle as pkl
import re
from enum import Enum
from pathlib import Path

import numpy as np
import pandas as pd
from drugforge.data.backend.openeye import oechem, oeshape
from drugforge.data.schema.ligand import Ligand


[docs] class DockingDataset:
[docs] def __init__(self, pkl_fn, dir_path): self.pkl_fn = pkl_fn self.dir_path = dir_path for path in [self.pkl_fn, self.dir_path]: assert os.path.exists(path)
def read_pkl(self): self.compound_ids, self.xtal_ids, self.res_ranks = pkl.load( open(self.pkl_fn, "rb") ) def get_cmpd_dir_path(self, cmpd_id): # make sure this directory exists cmpd_dir = os.path.join(self.dir_path, cmpd_id) assert os.path.exists(cmpd_dir) return cmpd_dir def organize_docking_results(self): # Make lists we want to use to collect information about docking results cmpd_ids = [] xtal_ids = [] chain_ids = [] mcss_ranks = [] sdf_fns = [] cmplx_ids = [] # dictionary of reference sdf files for each compound id ref_fn_dict = {} # since each compound has its own directory, we can use that directory for cmpd_id in self.compound_ids: cmpd_id = cmpd_id.rstrip() cmpd_dir = self.get_cmpd_dir_path(cmpd_id) # get list of files in this directory fn_list = os.listdir(cmpd_dir) # Process this list into info # TODO: use REGEX instead sdf_list = [fn for fn in fn_list if os.path.splitext(fn)[1] == ".sdf"] # For each sdf file in this list, get all the information # This is very fragile to the file naming scheme for fn in sdf_list: info = fn.split(".")[0].split("_") xtal = info[3] chain = info[4] cmpd_id = info[7] # the ligand re-docked to their original xtal have no mcss rank, so this # will fail try: mcss_rank = info[9] except IndexError: # however its a convenient way of identifying which is the original # xtal ref_xtal = xtal ref_pdb_fn = f"{ref_xtal}_{chain}/{ref_xtal}_{chain}_bound.pdb" # save the ref filename to the dictionary and make the mcss_rank -1 ref_fn_dict[cmpd_id] = ref_pdb_fn mcss_rank = -1 # append all the stuff to lists! cmpd_ids.append(cmpd_id) xtal_ids.append(xtal) chain_ids.append(chain) mcss_ranks.append(mcss_rank) sdf_fns.append(fn) cmplx_ids.append(f"{cmpd_id}_{xtal}_{chain}_{mcss_rank}") ref_fns = [ref_fn_dict[cmpd_id] for cmpd_id in cmpd_ids] self.df = pd.DataFrame( { "Complex_ID": cmplx_ids, "Compound_ID": cmpd_ids, "Crystal_ID": xtal_ids, "Chain_ID": chain_ids, "MCSS_Rank": mcss_ranks, "SDF_Filename": sdf_fns, "Reference": ref_fns, } ) def write_csv(self, output_csv_fn): self.df.to_csv(output_csv_fn, index=False) def analyze_docking_results(self, fragalysis_dir, output_csv_fn, test=False): self.organize_docking_results() if test: self.df = self.df.head() self.write_csv(output_csv_fn=output_csv_fn)
[docs] def get_good_score(score): """ The idea here is that given an array from a DataFrame, x, that is a particular score type, there should be a single function that will tell you whether we think this score is "good" or not. I'm not sure this is the best way to do this but someway to make it obvious when we are using which cutoff would be good. Parameters ---------- score Returns ------- """ if score == "RMSD": lambda_func = lambda x: x[(x <= 2.5)].count() # noqa: E731 elif score == "POSIT": lambda_func = lambda x: x[(x > 0.7)].count() # noqa: E731 elif score == "Chemgauss4": lambda_func = lambda x: x[(x < 0)].count() # noqa: E731 elif score == "POSIT_R": lambda_func = lambda x: x[(x < 0.3)].count() # noqa: E731 else: raise NotImplementedError( f"good score acquisition not implemented for {score}. " f'Try using one of ["RMSD", "POSIT", "Chemgauss4", "POSIT_R"]' ) return lambda_func
[docs] class DockingResults: """ This is a class to parse docking results from a csv file. Mainly for mainipulating the data in various useful ways. """ column_names_dict = { "legacy": [ "ligand_id", "du_structure", "docked_file", "docked_RMSD", "POSIT_prob", "chemgauss4_score", "clash", ], "legacy_smiles": [ "ligand_id", "du_structure", "docked_file", "docked_RMSD", "POSIT_prob", "chemgauss4_score", "clash", "SMILES", ], "legacy_cleaned": [ "Compound_ID", "Structure_Source", "Docked_File", "RMSD", "POSIT", "Chemgauss4", "Clash", ], "legacy_cleaned_dimer": [ "Compound_ID", "Structure_Source", "Dimer", "Docked_File", "RMSD", "POSIT", "Chemgauss4", "Clash", ], "default_dimer": [ "Compound_ID", "Structure_Source", "Dimer", "Docked_File", "RMSD", "POSIT", "Chemgauss4", "Clash", "SMILES", ], "default": [ "Compound_ID", "Structure_Source", "Docked_File", "RMSD", "POSIT", "Chemgauss4", "Clash", "SMILES", ], }
[docs] def __init__(self, csv_path=None, df=None, column_names="default"): """ Parameters ---------- csv_path: path to csv file Optional df: pd.DataFrame """ if type(csv_path) is str: # load in data and replace the annoying `-1.0` and `-1` values with nans self.df = pd.read_csv(csv_path).replace(-1.0, np.nan).replace(-1, np.nan) elif isinstance(df, pd.DataFrame): if self.column_names_dict.get(column_names): df.columns = self.column_names_dict.get(column_names) self.df = df else: raise Exception("Must pass either a dataframe or a csv path")
[docs] def get_grouped_df( self, groupby_ID_column="Compound_ID", score_columns=("RMSD", "POSIT_R", "Chemgauss4", "MCSS_Rank"), ): """ The purpose of this function is to get a dataframe with meaningful information grouped by either the Compound_ID or by the Structure_Source. Parameters ---------- groupby_ID_column score_columns Returns ------- """ # TODO: Default argument `score_columns` is a mutable. This can lead to # unexpected behavior in Python. # TODO: I think this can be done without a for loop by # replacing [[score]] with [score_columns] score_df_list = [] for score in score_columns: if not score in self.df.columns: # noqa: E713 print(f"Skipping {score}") continue if not self.df[score].any(): print(f"Skipping {score} since no non-NA scores were found") continue # Group by the groupby_ID_column and then get either the number, mean, # or mean of the identified score not_na = self.df.groupby(groupby_ID_column)[[score]].count() mean = self.df.groupby(groupby_ID_column)[[score]].mean() min = self.df.groupby(groupby_ID_column)[[score]].min() # I barely understand how this works but basically it # applies the lambda function returned by `get_good_score` and then groups # the results, which means that # it can count how many "good" scores there were for each group. good = self.df.groupby(groupby_ID_column)[[score]].apply( get_good_score(score) ) feature_df = pd.concat([not_na, good, mean, min], axis=1) feature_df.columns = [ f"{score}_{name}" for name in ["Not_NA", "Good", "Mean", "Min"] ] score_df_list.append(feature_df) grouped_df = pd.concat(score_df_list, axis=1) grouped_df[groupby_ID_column] = grouped_df.index return grouped_df
def get_compound_df(self, csv_path=None, **kwargs): if csv_path: self.compound_df = pd.read_csv(csv_path) else: self.compound_df = self.get_grouped_df( groupby_ID_column="Compound_ID", **kwargs )
[docs] def get_structure_df(self, csv_path=None, resolution_csv=None, **kwargs): """ Either pull the structure_df from the csv file or generate it using the get_grouped_df function in addition to what the function normally does it also adds the resolution Parameters ---------- csv_path kwargs Returns ------- """ if csv_path: self.structure_df = pd.read_csv(csv_path) else: self.structure_df = self.get_grouped_df( groupby_ID_column="Structure_Source", **kwargs ) if resolution_csv: # Get a dictionary with {"PDB ID": "Resolution", ..} with open(resolution_csv) as f: resolution_df = pd.read_csv(f) resolution_df.index = resolution_df["PDB ID"] resolution_dict = resolution_df.to_dict()["Resolution"] # Iterates over the dataframe # Uses regex to pull out the PDB ID from each Structure_Source # and makes a list of the Resolutions that is saved to a column # of the new structure_df self.structure_df["Resolution"] = [ resolution_dict.get( re.search( pattern=r"[\d][A-Za-z0-9]{3}", string=pdb, ).group(0) ) for pdb in self.structure_df.Structure_Source.to_list() ]
[docs] def get_best_structure_per_compound( self, filter_score="RMSD", filter_value=2.5, score_order=("POSIT_R", "Chemgauss4", "RMSD"), score_ascending=(True, True, True), ): """ Gets the best structure by first filtering based on the filter_score and filter_value, then sorts in order of the scores listed in score_order. As with everything else, lower scores are assumed to be better, requiring a conversion of some scores. Parameters ---------- filter_score filter_value score_order Returns ------- """ # TODO: also this is really fragile # first do filtering print(filter_score, filter_value) if filter_score and type(filter_value) is float: print(f"Filtering by {filter_score} less than {filter_value}") filtered_df = self.df[self.df[filter_score] < filter_value] sort_list = ["Compound_ID"] + score_order # sort dataframe, ascending (smaller / better scores will move to the top) sorted_df = filtered_df.sort_values( sort_list, ascending=[True] + score_ascending ) # group by compound id and return the top row for each group g = sorted_df.groupby("Compound_ID") self.best_df = g.head(1) return self.best_df
def write_dfs_to_csv(self, output_dir): self.df.to_csv(os.path.join(output_dir, "all_results_cleaned.csv"), index=False) self.compound_df.to_csv( os.path.join(output_dir, "by_compound.csv"), index=False ) self.structure_df.to_csv( os.path.join(output_dir, "by_structure.csv"), index=False ) self.best_df.to_csv(os.path.join(output_dir, "best_results.csv"), index=False)
[docs] def load_dataframes(input_dir): """ Load csv files from an input directory Parameters ---------- input_dir Returns ------- dictionary of dataframes """ all_results_csv = os.path.join(input_dir, "all_results_cleaned.csv") by_compound_csv = os.path.join(input_dir, "by_compound.csv") by_structure_csv = os.path.join(input_dir, "by_structure.csv") df = pd.read_csv(all_results_csv) df.index = df.Complex_ID tidy = df.melt(id_vars="Complex_ID") df = df.round({"Chemgauss4": 3, "POSIT": 3, "POSIT_R": 3, "RMSD": 3}) by_compound = pd.read_csv(by_compound_csv) by_compound_tidy = by_compound.melt(id_vars="Compound_ID") by_structure = pd.read_csv(by_structure_csv) by_structure_tidy = by_structure.melt(id_vars="Structure_Source") return { "tidy": tidy, "df": df, "by_compound_tidy": by_compound_tidy, "by_compound": by_compound, "by_structure_tidy": by_structure_tidy, "by_structure": by_structure, }
[docs] def calculate_rmsd_openeye( reference_ligand: oechem.OEMol or oechem.OEGraphMol, docked_ligand: oechem.OEMol or oechem.OEGraphMol, ) -> float: """ Calculate the root-mean-square deviation (RMSD) between the coordinates of two OpenEye molecules. The OEMol or OEGraphMol objects are converted to coordinates using the .GetCoords() method and then oechem.OERMSD is used. Parameters ---------- reference_ligand : oechem.OEMol or oechem.OEGraphMol The reference molecule to which the docked molecule is compared. docked_ligand : oechem.OEMol or oechem.OEGraphMol The docked molecule to be compared to the reference molecule. Returns ------- float The RMSD between the two molecules' coordinates, with hydrogens excluded. """ # Calculate RMSD oechem.OECanonicalOrderAtoms(reference_ligand) oechem.OECanonicalOrderBonds(reference_ligand) oechem.OECanonicalOrderAtoms(docked_ligand) oechem.OECanonicalOrderBonds(docked_ligand) # Get coordinates, filtering out Hs predocked_coords = [ c for a in reference_ligand.GetAtoms() for c in reference_ligand.GetCoords()[a.GetIdx()] if a.GetAtomicNum() != 1 ] docked_coords = [ c for a in docked_ligand.GetAtoms() for c in docked_ligand.GetCoords()[a.GetIdx()] if a.GetAtomicNum() != 1 ] rmsd = oechem.OERMSD( oechem.OEDoubleArray(predocked_coords), oechem.OEDoubleArray(docked_coords), len(predocked_coords) // 3, ) return rmsd
[docs] class TanimotoType(str, Enum): """ Enum for the different types of Tanimoto coefficients that can be calculated. """ SHAPE = "TanimotoShape" COLOR = "TanimotoColor" COMBO = "TanimotoCombo"
[docs] def calculate_tanimoto_oe( refmol: Ligand, fitmol: Ligand, compute_type: TanimotoType = TanimotoType.COMBO, ): """ Calculate the Tanimoto coefficient between two molecules using OpenEye's shape toolkit. Parameters ---------- refmol : Ligand The reference molecule to which the docked molecule is compared. fitmol : Ligand The docked molecule to be compared to the reference molecule. Returns ------- float The Tanimoto coefficient between the two molecules. """ refmol = refmol.to_oemol() fitmol = fitmol.to_oemol() # Prepare reference molecule for calculation # With default options this will remove any explicit hydrogens present prep = oeshape.OEOverlapPrep() prep.Prep(refmol) # Get appropriate function to calculate exact shape shapeFunc = oeshape.OEOverlapFunc() shapeFunc.SetupRef(refmol) res = oeshape.OEOverlapResults() prep.Prep(fitmol) shapeFunc.Overlap(fitmol, res) if compute_type == TanimotoType.SHAPE: return res.GetShapeTanimoto() elif compute_type == TanimotoType.COLOR: return res.GetColorTanimoto() elif compute_type == TanimotoType.COMBO: return res.GetTanimotoCombo()
[docs] def write_all_rmsds_to_reference( ref_mol: oechem.OEGraphMol, docked_mols: [oechem.OEGraphMol], output_dir: Path, compound_id, ): """ Calculates the RMSD between a reference molecule and a list of docked molecules, and saves the results to a .npy file. Parameters ---------- ref_mol: oechem.OEGraphMol The reference molecule, for which the RMSD is calculated. docked_mols: [oechem.OEGraphMol] A list of docked molecules, for which the RMSD is calculated with respect to the reference molecule. output_dir: Path The directory where the output file will be saved. compound_id: str A unique identifier for the compound being analyzed, used as the name of the output file. Returns ------- None """ rmsds = [ (docked_mol.GetTitle(), calculate_rmsd_openeye(ref_mol, docked_mol)) for docked_mol in docked_mols ] np.save(str(output_dir / f"{compound_id}.npy"), rmsds)