"""
Provides argument parsing, job setup/cleanup, and other and miscellaneous
functionality for phase_mmp3d_driver.py.
Copyright Schrodinger LLC, All Rights Reserved.
"""
import argparse
import base64
import csv
import json
import os
import shutil
import tempfile
from enum import Enum
from schrodinger import structure
from schrodinger.application.phase.packages import mmp2d
from schrodinger.application.phase.packages import phase_utils
from schrodinger.infra import phase
from schrodinger.job import jobcontrol
from schrodinger.utils import cmdline
from schrodinger.utils import fileutils
from schrodinger.utils import log
logger = log.get_output_logger(__file__)
PHASE_MMP3D = "phase_mmp3d"
# Extensions of various files:
SMILES_EXT = "_smiles.csv"
SMILES_TO_3D_EXT = "_smiles_to_3d.maegz"
PAIRS_EXT = "_pairs.csv"
MMP3D_PHPRJ_EXT = "_mmp3d" + phase.PHASE_PRJ_FILE_EXT
MMP3D_PHZIP_EXT = "_mmp3d" + phase.PHASE_ZPRJ_FILE_EXT
MMP3D_DB_EXT = "_mmp3d.maegz"
# Properties stored in each 3D structure created from SMILES:
MMP3D_ID_PROP = "i_mmp3d_id"
MMP3D_PUBLIC_ID_PROP = "s_mmp3d_public_id"
MMP3D_SMILES_PROP = "s_mmp3d_smiles"
MMP3D_ACTIVITY_PROP = "r_mmp3d_activity"
# If -trans is used, the following base64-encoded property will be present
# in each structure in the file of aligned 3D pairs.
MMP2D_TRANSFORMATIONS = "s_mmp2d_transformations"
SubjobType = Enum("SubjobType", "smiles_to_3d align_pairs")
SUBJOB_FILE_EXT = {
SubjobType.smiles_to_3d: SMILES_EXT,
SubjobType.align_pairs: PAIRS_EXT
}
# Default minimum numbers of inputs per subjob:
DEFAULT_MIN_SMILES = 500
DEFAULT_MIN_PAIRS = 25
[docs]def align_pairs(project_path, id_pairs_file, out_mae, verbose=False):
"""
Aligns pairs of multi-conformer ligands and writes the alignments to
a Maestro file.
:param project_path: Phase project containing the ligands
:type project_path: str
:param id_pairs_file: CSV file with sorted pairs of ligand IDs
:type id_pairs: str
:param out_mae: Output Maestro file for alignments
:type out_mae: str
:param verbose: If true, a one-line summary will be printed for each pair
:type verbose: bool
"""
ligand_id1_prev = 0
bus1 = phase.PhpStructureBus()
project = phase.PhpProject()
project.openProject(project_path)
if verbose:
logger.info('\nid1,public_id1,id2,public_id2,shape_sim')
with structure.StructureWriter(out_mae) as writer:
with open(id_pairs_file, 'r') as fh:
for ligand_id1, ligand_id2 in csv.reader(fh):
if ligand_id1 != ligand_id1_prev:
bus1 = phase.PhpStructureBus(
project.getConformers(int(ligand_id1)), False)
ligand_id1_prev = ligand_id1
bus2 = phase.PhpStructureBus(
project.getConformers(int(ligand_id2)), False)
try:
sim_pair = phase.get_max_pairwise_similarity(
bus1.getCtVector(), bus2.getCtVector())
except phase.PhpException as exc:
if verbose:
s = f'Pair {ligand_id1}, {ligand_id2}: {exc}'
logger.info(s)
continue
st1 = structure.Structure(sim_pair.ctA)
st2 = structure.Structure(sim_pair.ctB)
public_id1 = st1.property[MMP3D_PUBLIC_ID_PROP]
public_id2 = st2.property[MMP3D_PUBLIC_ID_PROP]
st1.title = public_id1
st2.title = public_id2
if verbose:
cmpd_id1 = st1.property[MMP3D_ID_PROP]
cmpd_id2 = st2.property[MMP3D_ID_PROP]
sim = "%.6f" % sim_pair.sim_trans[0]
s = f'{cmpd_id1},{public_id1},{cmpd_id2},{public_id2},{sim}'
logger.info(s)
writer.extend((st1, st2))
project.closeProject()
[docs]def combine_alignments(args, nsub):
"""
Combines pairwise alignments from subjobs.
:param args: argparser.Namespace with command line options
:type args: argparser.Namespace
:param nsub: Number of subjobs
:type nsub: int
"""
infiles = []
jobname = phase_utils.get_jobname(args, args.mmp2d_path)
subjob_prefix = jobname + "_align_pairs_sub_"
for i in range(nsub):
infile = f'{subjob_prefix}{i + 1}{MMP3D_DB_EXT}'
infiles.append(infile)
outfile = jobname + MMP3D_DB_EXT
fileutils.cat(infiles, outfile)
[docs]def convert_smiles_to_3d(in_smiles, out_mae, verbose=False):
"""
Creates 3D structures from SMILES.
:param in_smiles: Input CSV file with id, public_id, smiles, and prop_value
:type in_smiles: str
:param out_mae: Output Maestro file for 3D structures
:type out_mae: str
:param verbose: If true, each input row from infile will be printed
:type verbose: bool
"""
if verbose:
logger.info('\nid,public_id,smiles,prop_value')
with structure.StructureWriter(out_mae) as writer:
with open(in_smiles, 'r') as fh:
for id, public_id, smiles, prop_value in csv.reader(fh):
if verbose:
logger.info(f'{id},{public_id},{smiles},{prop_value}')
st = structure.SmilesStructure(smiles).get3dStructure(
require_stereo=False)
st.property[MMP3D_ID_PROP] = int(id)
st.property[MMP3D_PUBLIC_ID_PROP] = public_id
st.property[MMP3D_SMILES_PROP] = smiles
st.property[MMP3D_ACTIVITY_PROP] = float(prop_value)
writer.append(st)
[docs]def create_phase_project(project_path, maefiles):
"""
Creates a multi-conformer Phase project from the supplied Maestro files.
:param project_path: Path to project to be created
:type project_path: str
:param maefiles: List of Maestro files with one conformer per compound
:type maefiles: str
"""
with tempfile.TemporaryDirectory() as list_dir:
list_file = os.path.join(list_dir, "maefiles.list")
phase_utils.write_list_to_file(list_file, maefiles)
project = phase.PhpProject()
fd_file = phase.getDefaultFdFilePath()
read_mode = phase.FILE_SINGLE_CONF
title_prop = ""
act_prop = ""
jobname = "import"
save = False
hypo_fmt = "single"
energy_prop = ""
project.newProject(project_path, list_file, fd_file, read_mode,
title_prop, act_prop, jobname, save, hypo_fmt,
energy_prop)
conf_options = phase.PhpConfOptions()
conf_options.setMaxConfs(50)
jobname = "revise"
project.generateConfs(conf_options, jobname, save)
project.closeProject()
[docs]def get_compound_id_pairs(maefile):
"""
Given a Maestro file with pairs of aligned structures, this function reads
pairs of compound ids from MMP3D_ID_PROP and returns the pairs in a set.
:param maefile: Maestro file with pairs of alignments
:type maefile: str
:return: Set containing pairs of compound ids
:rtype: set((int, int))
"""
id_pairs = set()
prop = MMP3D_ID_PROP
with structure.StructureReader(maefile) as reader:
for st1 in reader:
st2 = next(reader)
id_pairs.add((st1.property[prop], st2.property[prop]))
return id_pairs
[docs]def get_parser():
"""
Creates argparse.ArgumentParser with supported command line options.
:return: Argument parser object
:rtype: argparser.ArgumentParser
"""
parser = argparse.ArgumentParser(
prog=PHASE_MMP3D, formatter_class=argparse.RawDescriptionHelpFormatter)
parser.add_argument(
"mmp2d_path",
metavar="<mmp2d>",
help="Matched molecular pairs 2D SQLite database created using "
"LiveDesign. This database must contain the following tables: "
"\"compound\", \"property_name\", \"compound_property\", and "
"\"pair\". The 3D matched molecular pairs database created by this "
"program will be <jobname>_mmp3d.maegz, where <jobname> is derived "
"from the base name of <mmp2d>.")
parser.add_argument(
"propname",
metavar="<propname>",
help="The name of the property in <mmp2d> that will be used to "
"identify activity cliffs.")
parser.add_argument(
"cutoff",
type=float,
metavar="<cutoff>",
choices=[phase_utils.RestrictedRange(0, None, True)],
help="Activity cliff cutoff. All matched molecular pairs which exhibit "
"an absolute property difference of at least this value will be "
"considered.")
parser.add_argument(
"-min_smiles",
type=int,
metavar="<n>",
choices=[phase_utils.RestrictedRange(1, None, True)],
default=DEFAULT_MIN_SMILES,
help="Reduce the number of SMILES->3D subjobs, as necessary, to ensure "
"that at least this many SMILES are processed per subjob (default: "
"%d)." % DEFAULT_MIN_SMILES)
parser.add_argument(
"-min_pairs",
type=int,
metavar="<n>",
choices=[phase_utils.RestrictedRange(1, None, True)],
default=DEFAULT_MIN_PAIRS,
help="Reduce the number of activity cliff alignment subjobs, as "
"necessary, to ensure that at least this many pairs are aligned per "
"subjob (default: %d)." % DEFAULT_MIN_PAIRS)
parser.add_argument(
"-trans",
action="store_true",
help="Add to each structure in a given pair the base64-encoded "
"property %s with details of the transformation(s) linking the two "
"structures." % MMP2D_TRANSFORMATIONS)
jobcontrol_options = [cmdline.HOST, cmdline.JOBNAME, cmdline.TMPDIR]
cmdline.add_jobcontrol_options(parser, options=jobcontrol_options)
# Hidden options for subjobs:
parser.add_argument("-subjob", help=argparse.SUPPRESS)
parser.add_argument("-smiles_to_3d",
action="store_true",
help=argparse.SUPPRESS)
parser.add_argument("-align_pairs",
action="store_true",
help=argparse.SUPPRESS)
return parser
[docs]def get_ligand_id_pairs(project_path, mmp_id_pairs):
"""
Returns pairs of ligand IDs in the supplied Phase project that correspond
to the provided pairs of compound IDs from the MMP 2D database. A pair will
be skipped if either of the compounds in the pair failed to be imported
into the project due to size or other characteristics that are unacceptable
to phase_database.
:param project_path: Path to Phase project
:type project_path: str
:param mmp_id_pairs: Pairs of compounds IDs from MMP 2D database
:type mmp_id_pairs: list((int, int))
:return: Pairs of ligand IDs
:rtype: list((int, int))
"""
mmp_to_ligand = {}
project = phase.PhpProject()
project.openProject(project_path)
for ligand_id in project.getLigandIDs("all"):
st = structure.Structure(project.getConformer(ligand_id, 1))
mmp_to_ligand[st.property[MMP3D_ID_PROP]] = ligand_id
ligand_id_pairs = []
for id1, id2 in mmp_id_pairs:
try:
ligand_id_pairs.append((mmp_to_ligand[id1], mmp_to_ligand[id2]))
except KeyError:
pass
return ligand_id_pairs
[docs]def get_num_subjobs(args, total_inputs, subjob_type):
"""
Returns the number of subjobs to run, taking into account the requested
number of CPUs and the minimum allowed inputs per subjob.
:param args: argparser.Namespace with command line options
:type args: argparser.Namespace
:param total_inputs: Total number of inputs to be distributed over subjobs
:type total_inputs: int
:param subjob_type: Subjob type
:type subjob_type: SubjobType
:return: Number of subjobs
:rtype: int
"""
min_inputs = args.min_smiles if subjob_type == SubjobType.smiles_to_3d else args.min_pairs
# Ensure that multiple subjobs won't be run unless each of them is assigned
# at least min_inputs.
max_subjobs = 1 if min_inputs > total_inputs else total_inputs // min_inputs
# And don't exceed the number of CPUs requested.
ncpu = jobcontrol.calculate_njobs(jobcontrol.get_backend_host_list())
return min(ncpu, max_subjobs)
[docs]def get_parent_jobname(args):
"""
Returns parent job name of the current subjob.
:param args: argparser.Namespace with command line options
:type args: argparser.Namespace
:return: Parent job name
:rtype: str
"""
pattern = "_align_pairs_sub_" if args.align_pairs else "_smiles_to_3d_sub_"
subjob = args.subjob
return subjob[:subjob.rfind(pattern)]
[docs]def setup_distributed_align_pairs(args):
"""
Does setup for distributed alignment of activity cliff pairs.
:param args: argparser.Namespace with command line options
:type args: argparser.Namespace
:return: list of subjob commands
:rtype: list(list(str))
"""
mmpdb = args.mmp2d_path
mmp_id_pairs = mmp2d.get_activity_cliffs(mmpdb, args.propname, args.cutoff)
jobname = phase_utils.get_jobname(args, mmpdb)
project_path = jobname + MMP3D_PHPRJ_EXT
ligand_id_pairs = get_ligand_id_pairs(project_path, mmp_id_pairs)
if not ligand_id_pairs:
mesg = "No activity cliff pairs survived import into Phase project"
raise RuntimeError(mesg)
project = phase.PhpProject()
project.zipProject(".", project_path)
return split_inputs(args, ligand_id_pairs, SubjobType.align_pairs)
[docs]def setup_distributed_smiles_to_3d(args):
"""
Does setup for a distributed conversion of SMILES to 3D.
:param args: argparser.Namespace with command line options
:type args: argparser.Namespace
:return: list of subjob commands
:rtype: list(list(str))
"""
mmpdb = args.mmp2d_path
pairs = mmp2d.get_activity_cliffs(mmpdb, args.propname, args.cutoff)
cmpd_ids = mmp2d.get_ids_from_pairs(pairs)
public_dict = mmp2d.get_public_ids(mmpdb)
public_ids = [public_dict[cmpd_id] for cmpd_id in cmpd_ids]
smiles_dict = mmp2d.get_compound_smiles(mmpdb)
smiles_strings = [smiles_dict[cmpd_id] for cmpd_id in cmpd_ids]
prop_dict = mmp2d.get_property_values(mmpdb, args.propname)
prop_values = [prop_dict[cmpd_id] for cmpd_id in cmpd_ids]
rows = list(zip(cmpd_ids, public_ids, smiles_strings, prop_values))
return split_inputs(args, rows, SubjobType.smiles_to_3d)
[docs]def validate_args(args):
"""
Checks the validity of command line arguments.
:param args: argparser.Namespace with command line arguments
:type args: argparser.Namespace
:return: tuple of validity and non-empty error message if not valid
:rtype: bool, str
"""
if not os.path.isfile(args.mmp2d_path):
mesg = f'MMP 2D database \"{args.mmp2d_path}\" not found'
return False, mesg
for table in ("compound", "property_name", "compound_property", "pair"):
if not mmp2d.table_exists(args.mmp2d_path, table):
mesg = f'Table \"{table}\" not found in {args.mmp2d_path}'
return False, mesg
if not args.propname in mmp2d.get_property_names(args.mmp2d_path):
mesg = f'Property \"{args.propname}\" not found in {args.mmp2d_path}'
return False, mesg
cutoff = args.cutoff
if not mmp2d.get_activity_cliffs(args.mmp2d_path, args.propname, cutoff):
return False, f"No activity cliffs found for <cutoff> = {cutoff}"
return True, ""