Source code for schrodinger.application.phase.packages.mmp3d_driver_utils

"""
Provides argument parsing, job setup/cleanup, and other and miscellaneous
functionality for phase_mmp3d_driver.py.

Copyright Schrodinger LLC, All Rights Reserved.
"""

import argparse
import base64
import csv
import json
import os
import shutil
import tempfile
from enum import Enum

from schrodinger import structure
from schrodinger.application.phase.packages import mmp2d
from schrodinger.application.phase.packages import phase_utils
from schrodinger.infra import phase
from schrodinger.job import jobcontrol
from schrodinger.utils import cmdline
from schrodinger.utils import fileutils
from schrodinger.utils import log

logger = log.get_output_logger(__file__)

PHASE_MMP3D = "phase_mmp3d"

# Extensions of various files:
SMILES_EXT = "_smiles.csv"
SMILES_TO_3D_EXT = "_smiles_to_3d.maegz"
PAIRS_EXT = "_pairs.csv"
MMP3D_PHPRJ_EXT = "_mmp3d" + phase.PHASE_PRJ_FILE_EXT
MMP3D_PHZIP_EXT = "_mmp3d" + phase.PHASE_ZPRJ_FILE_EXT
MMP3D_DB_EXT = "_mmp3d.maegz"

# Properties stored in each 3D structure created from SMILES:
MMP3D_ID_PROP = "i_mmp3d_id"
MMP3D_PUBLIC_ID_PROP = "s_mmp3d_public_id"
MMP3D_SMILES_PROP = "s_mmp3d_smiles"
MMP3D_ACTIVITY_PROP = "r_mmp3d_activity"

# If -trans is used, the following base64-encoded property will be present
# in each structure in the file of aligned 3D pairs.
MMP2D_TRANSFORMATIONS = "s_mmp2d_transformations"

SubjobType = Enum("SubjobType", "smiles_to_3d align_pairs")
SUBJOB_FILE_EXT = {
    SubjobType.smiles_to_3d: SMILES_EXT,
    SubjobType.align_pairs: PAIRS_EXT
}

# Default minimum numbers of inputs per subjob:
DEFAULT_MIN_SMILES = 500
DEFAULT_MIN_PAIRS = 25


[docs]def add_transformations(mmp2d_path, maefile): """ Adds MMP transformation dictionaries to each of the aligned pairs in maefile and overwrites the file with the updated structures. The transformations (see mmp2d.get_transformations) are stored in the property MMP2D_TRANSFORMATIONS as a base64-encoded string which holds a JSON-encoded representation of the underlying Python data structure. Use decode_transformations to extract the data. :param mmp2d_path: Path to MMP 2D database :type mmp2d_path: str :param maefile: Maestro file with pairs of alignments. Overwritten. :type maefile: str """ id_pairs = get_compound_id_pairs(maefile) pair_to_rule_env_ids = mmp2d.get_pair_rule_env_ids(mmp2d_path, id_pairs) env_id_to_rule_id = mmp2d.get_rule_env_rule_id(mmp2d_path) rule_id_to_rule_smiles = mmp2d.get_rule_id_rule_smiles(mmp2d_path) env_id_to_stats = mmp2d.get_rule_env_stats(mmp2d_path) prop = MMP3D_ID_PROP with tempfile.TemporaryDirectory() as maefile_dir: transfile = os.path.join(maefile_dir, "transfile.maegz") with structure.StructureReader(maefile) as reader, \ structure.StructureWriter(transfile) as writer: for st1 in reader: st2 = next(reader) pair = (st1.property[prop], st2.property[prop]) trans = mmp2d.get_transformations(pair, pair_to_rule_env_ids, env_id_to_rule_id, rule_id_to_rule_smiles, env_id_to_stats) json_bytes = bytes(json.dumps(trans), 'ascii') json_base64 = str(base64.b64encode(json_bytes), 'ascii') # We store the same string in both structures. st1.property[MMP2D_TRANSFORMATIONS] = json_base64 st2.property[MMP2D_TRANSFORMATIONS] = json_base64 writer.extend((st1, st2)) shutil.copyfile(transfile, maefile)
[docs]def align_pairs(project_path, id_pairs_file, out_mae, verbose=False): """ Aligns pairs of multi-conformer ligands and writes the alignments to a Maestro file. :param project_path: Phase project containing the ligands :type project_path: str :param id_pairs_file: CSV file with sorted pairs of ligand IDs :type id_pairs: str :param out_mae: Output Maestro file for alignments :type out_mae: str :param verbose: If true, a one-line summary will be printed for each pair :type verbose: bool """ ligand_id1_prev = 0 bus1 = phase.PhpStructureBus() project = phase.PhpProject() project.openProject(project_path) if verbose: logger.info('\nid1,public_id1,id2,public_id2,shape_sim') with structure.StructureWriter(out_mae) as writer: with open(id_pairs_file, 'r') as fh: for ligand_id1, ligand_id2 in csv.reader(fh): if ligand_id1 != ligand_id1_prev: bus1 = phase.PhpStructureBus( project.getConformers(int(ligand_id1)), False) ligand_id1_prev = ligand_id1 bus2 = phase.PhpStructureBus( project.getConformers(int(ligand_id2)), False) try: sim_pair = phase.get_max_pairwise_similarity( bus1.getCtVector(), bus2.getCtVector()) except phase.PhpException as exc: if verbose: s = f'Pair {ligand_id1}, {ligand_id2}: {exc}' logger.info(s) continue st1 = structure.Structure(sim_pair.ctA) st2 = structure.Structure(sim_pair.ctB) public_id1 = st1.property[MMP3D_PUBLIC_ID_PROP] public_id2 = st2.property[MMP3D_PUBLIC_ID_PROP] st1.title = public_id1 st2.title = public_id2 if verbose: cmpd_id1 = st1.property[MMP3D_ID_PROP] cmpd_id2 = st2.property[MMP3D_ID_PROP] sim = "%.6f" % sim_pair.sim_trans[0] s = f'{cmpd_id1},{public_id1},{cmpd_id2},{public_id2},{sim}' logger.info(s) writer.extend((st1, st2)) project.closeProject()
[docs]def combine_alignments(args, nsub): """ Combines pairwise alignments from subjobs. :param args: argparser.Namespace with command line options :type args: argparser.Namespace :param nsub: Number of subjobs :type nsub: int """ infiles = [] jobname = phase_utils.get_jobname(args, args.mmp2d_path) subjob_prefix = jobname + "_align_pairs_sub_" for i in range(nsub): infile = f'{subjob_prefix}{i + 1}{MMP3D_DB_EXT}' infiles.append(infile) outfile = jobname + MMP3D_DB_EXT fileutils.cat(infiles, outfile)
[docs]def convert_smiles_to_3d(in_smiles, out_mae, verbose=False): """ Creates 3D structures from SMILES. :param in_smiles: Input CSV file with id, public_id, smiles, and prop_value :type in_smiles: str :param out_mae: Output Maestro file for 3D structures :type out_mae: str :param verbose: If true, each input row from infile will be printed :type verbose: bool """ if verbose: logger.info('\nid,public_id,smiles,prop_value') with structure.StructureWriter(out_mae) as writer: with open(in_smiles, 'r') as fh: for id, public_id, smiles, prop_value in csv.reader(fh): if verbose: logger.info(f'{id},{public_id},{smiles},{prop_value}') st = structure.SmilesStructure(smiles).get3dStructure( require_stereo=False) st.property[MMP3D_ID_PROP] = int(id) st.property[MMP3D_PUBLIC_ID_PROP] = public_id st.property[MMP3D_SMILES_PROP] = smiles st.property[MMP3D_ACTIVITY_PROP] = float(prop_value) writer.append(st)
[docs]def create_phase_project(project_path, maefiles): """ Creates a multi-conformer Phase project from the supplied Maestro files. :param project_path: Path to project to be created :type project_path: str :param maefiles: List of Maestro files with one conformer per compound :type maefiles: str """ with tempfile.TemporaryDirectory() as list_dir: list_file = os.path.join(list_dir, "maefiles.list") phase_utils.write_list_to_file(list_file, maefiles) project = phase.PhpProject() fd_file = phase.getDefaultFdFilePath() read_mode = phase.FILE_SINGLE_CONF title_prop = "" act_prop = "" jobname = "import" save = False hypo_fmt = "single" energy_prop = "" project.newProject(project_path, list_file, fd_file, read_mode, title_prop, act_prop, jobname, save, hypo_fmt, energy_prop) conf_options = phase.PhpConfOptions() conf_options.setMaxConfs(50) jobname = "revise" project.generateConfs(conf_options, jobname, save) project.closeProject()
[docs]def decode_transformations(st): """ Decodes the MMP transformations in the provided structure and returns them as a list of dictionaries. Each dictionary contains the following key, value pairs, which describe a single transformation linking the provided structure to its associated MMP: Key Value --- ----- TRANS_KEYS.FROM_SMILES MMP fragment SMIRKS for the first compound (str) TRANS_KEYS.TO_SMILES MMP fragment SMIRKS for the second compound (str) TRANS_KEYS.MIN The min statistic for the transformation (float) TRANS_KEYS.MAX The max statistic for the transformation (float) TRANS_KEYS.AVG The avg statistic for the transofrmation (float) TRANS_KEYS.STD The std statistic for the transformation (float) TRANS_KEYS.COUNT The count statistic for the transformation (int) Note that TRANS_KEYS is defined in the mmp2d module. :param st: Structure containing the property MMP2D_TRANSFORMATIONS :type st: structure.Structure :return: List of transformation dictionaries :rtype: list[dict{str: str/str/float/float/float/float/int}] """ json_bytes = base64.b64decode(st.property[MMP2D_TRANSFORMATIONS]) return json.loads(str(json_bytes, 'ascii'))
[docs]def get_compound_id_pairs(maefile): """ Given a Maestro file with pairs of aligned structures, this function reads pairs of compound ids from MMP3D_ID_PROP and returns the pairs in a set. :param maefile: Maestro file with pairs of alignments :type maefile: str :return: Set containing pairs of compound ids :rtype: set((int, int)) """ id_pairs = set() prop = MMP3D_ID_PROP with structure.StructureReader(maefile) as reader: for st1 in reader: st2 = next(reader) id_pairs.add((st1.property[prop], st2.property[prop])) return id_pairs
[docs]def get_parser(): """ Creates argparse.ArgumentParser with supported command line options. :return: Argument parser object :rtype: argparser.ArgumentParser """ parser = argparse.ArgumentParser( prog=PHASE_MMP3D, formatter_class=argparse.RawDescriptionHelpFormatter) parser.add_argument( "mmp2d_path", metavar="<mmp2d>", help="Matched molecular pairs 2D SQLite database created using " "LiveDesign. This database must contain the following tables: " "\"compound\", \"property_name\", \"compound_property\", and " "\"pair\". The 3D matched molecular pairs database created by this " "program will be <jobname>_mmp3d.maegz, where <jobname> is derived " "from the base name of <mmp2d>.") parser.add_argument( "propname", metavar="<propname>", help="The name of the property in <mmp2d> that will be used to " "identify activity cliffs.") parser.add_argument( "cutoff", type=float, metavar="<cutoff>", choices=[phase_utils.RestrictedRange(0, None, True)], help="Activity cliff cutoff. All matched molecular pairs which exhibit " "an absolute property difference of at least this value will be " "considered.") parser.add_argument( "-min_smiles", type=int, metavar="<n>", choices=[phase_utils.RestrictedRange(1, None, True)], default=DEFAULT_MIN_SMILES, help="Reduce the number of SMILES->3D subjobs, as necessary, to ensure " "that at least this many SMILES are processed per subjob (default: " "%d)." % DEFAULT_MIN_SMILES) parser.add_argument( "-min_pairs", type=int, metavar="<n>", choices=[phase_utils.RestrictedRange(1, None, True)], default=DEFAULT_MIN_PAIRS, help="Reduce the number of activity cliff alignment subjobs, as " "necessary, to ensure that at least this many pairs are aligned per " "subjob (default: %d)." % DEFAULT_MIN_PAIRS) parser.add_argument( "-trans", action="store_true", help="Add to each structure in a given pair the base64-encoded " "property %s with details of the transformation(s) linking the two " "structures." % MMP2D_TRANSFORMATIONS) jobcontrol_options = [cmdline.HOST, cmdline.JOBNAME, cmdline.TMPDIR] cmdline.add_jobcontrol_options(parser, options=jobcontrol_options) # Hidden options for subjobs: parser.add_argument("-subjob", help=argparse.SUPPRESS) parser.add_argument("-smiles_to_3d", action="store_true", help=argparse.SUPPRESS) parser.add_argument("-align_pairs", action="store_true", help=argparse.SUPPRESS) return parser
[docs]def get_ligand_id_pairs(project_path, mmp_id_pairs): """ Returns pairs of ligand IDs in the supplied Phase project that correspond to the provided pairs of compound IDs from the MMP 2D database. A pair will be skipped if either of the compounds in the pair failed to be imported into the project due to size or other characteristics that are unacceptable to phase_database. :param project_path: Path to Phase project :type project_path: str :param mmp_id_pairs: Pairs of compounds IDs from MMP 2D database :type mmp_id_pairs: list((int, int)) :return: Pairs of ligand IDs :rtype: list((int, int)) """ mmp_to_ligand = {} project = phase.PhpProject() project.openProject(project_path) for ligand_id in project.getLigandIDs("all"): st = structure.Structure(project.getConformer(ligand_id, 1)) mmp_to_ligand[st.property[MMP3D_ID_PROP]] = ligand_id ligand_id_pairs = [] for id1, id2 in mmp_id_pairs: try: ligand_id_pairs.append((mmp_to_ligand[id1], mmp_to_ligand[id2])) except KeyError: pass return ligand_id_pairs
[docs]def get_num_subjobs(args, total_inputs, subjob_type): """ Returns the number of subjobs to run, taking into account the requested number of CPUs and the minimum allowed inputs per subjob. :param args: argparser.Namespace with command line options :type args: argparser.Namespace :param total_inputs: Total number of inputs to be distributed over subjobs :type total_inputs: int :param subjob_type: Subjob type :type subjob_type: SubjobType :return: Number of subjobs :rtype: int """ min_inputs = args.min_smiles if subjob_type == SubjobType.smiles_to_3d else args.min_pairs # Ensure that multiple subjobs won't be run unless each of them is assigned # at least min_inputs. max_subjobs = 1 if min_inputs > total_inputs else total_inputs // min_inputs # And don't exceed the number of CPUs requested. ncpu = jobcontrol.calculate_njobs(jobcontrol.get_backend_host_list()) return min(ncpu, max_subjobs)
[docs]def get_parent_jobname(args): """ Returns parent job name of the current subjob. :param args: argparser.Namespace with command line options :type args: argparser.Namespace :return: Parent job name :rtype: str """ pattern = "_align_pairs_sub_" if args.align_pairs else "_smiles_to_3d_sub_" subjob = args.subjob return subjob[:subjob.rfind(pattern)]
[docs]def setup_distributed_align_pairs(args): """ Does setup for distributed alignment of activity cliff pairs. :param args: argparser.Namespace with command line options :type args: argparser.Namespace :return: list of subjob commands :rtype: list(list(str)) """ mmpdb = args.mmp2d_path mmp_id_pairs = mmp2d.get_activity_cliffs(mmpdb, args.propname, args.cutoff) jobname = phase_utils.get_jobname(args, mmpdb) project_path = jobname + MMP3D_PHPRJ_EXT ligand_id_pairs = get_ligand_id_pairs(project_path, mmp_id_pairs) if not ligand_id_pairs: mesg = "No activity cliff pairs survived import into Phase project" raise RuntimeError(mesg) project = phase.PhpProject() project.zipProject(".", project_path) return split_inputs(args, ligand_id_pairs, SubjobType.align_pairs)
[docs]def setup_distributed_smiles_to_3d(args): """ Does setup for a distributed conversion of SMILES to 3D. :param args: argparser.Namespace with command line options :type args: argparser.Namespace :return: list of subjob commands :rtype: list(list(str)) """ mmpdb = args.mmp2d_path pairs = mmp2d.get_activity_cliffs(mmpdb, args.propname, args.cutoff) cmpd_ids = mmp2d.get_ids_from_pairs(pairs) public_dict = mmp2d.get_public_ids(mmpdb) public_ids = [public_dict[cmpd_id] for cmpd_id in cmpd_ids] smiles_dict = mmp2d.get_compound_smiles(mmpdb) smiles_strings = [smiles_dict[cmpd_id] for cmpd_id in cmpd_ids] prop_dict = mmp2d.get_property_values(mmpdb, args.propname) prop_values = [prop_dict[cmpd_id] for cmpd_id in cmpd_ids] rows = list(zip(cmpd_ids, public_ids, smiles_strings, prop_values)) return split_inputs(args, rows, SubjobType.smiles_to_3d)
[docs]def split_inputs(args, rows, subjob_type, nsub=None): """ Divides rows of input data over subjob CSV files and returns the commands to run the subjobs. :param args: argparser.Namespace with command line arguments :type args: argparser.Namespace :param rows: Rows to split :type rows: list(tuple) :param subjob_type: Subjob type :type subjob_type: SubjobType :param nsub: Overrides automatic determination of number of subjobs :type nsub: int :return: list of subjob commands :rtype: list(list(str)) """ jobname = phase_utils.get_jobname(args, args.mmp2d_path) num_subjobs = nsub or get_num_subjobs(args, len(rows), subjob_type) counts = phase.partitionValues(len(rows), num_subjobs) isub = 0 subjobs = [] subjob_rows = [] for row in rows: subjob_rows.append(row) if len(subjob_rows) == counts[isub]: isub += 1 subjob = f'{jobname}_{subjob_type.name}_sub_{isub}' subjobs.append(subjob) subjob_file = subjob + SUBJOB_FILE_EXT[subjob_type] with open(subjob_file, 'w', newline='') as fh: writer = csv.writer(fh) for subjob_row in subjob_rows: writer.writerow(subjob_row) subjob_rows = [] common_args = [ PHASE_MMP3D, args.mmp2d_path, args.propname, str(args.cutoff), f'-{subjob_type.name}' ] if args.trans: common_args.append("-trans") return [common_args + ["-subjob", subjob] for subjob in subjobs]
[docs]def validate_args(args): """ Checks the validity of command line arguments. :param args: argparser.Namespace with command line arguments :type args: argparser.Namespace :return: tuple of validity and non-empty error message if not valid :rtype: bool, str """ if not os.path.isfile(args.mmp2d_path): mesg = f'MMP 2D database \"{args.mmp2d_path}\" not found' return False, mesg for table in ("compound", "property_name", "compound_property", "pair"): if not mmp2d.table_exists(args.mmp2d_path, table): mesg = f'Table \"{table}\" not found in {args.mmp2d_path}' return False, mesg if not args.propname in mmp2d.get_property_names(args.mmp2d_path): mesg = f'Property \"{args.propname}\" not found in {args.mmp2d_path}' return False, mesg cutoff = args.cutoff if not mmp2d.get_activity_cliffs(args.mmp2d_path, args.propname, cutoff): return False, f"No activity cliffs found for <cutoff> = {cutoff}" return True, ""