"""
Provides miscellaneous functionality for combinatorial_screen_driver.py.
Copyright Schrodinger LLC, All Rights Reserved.
"""
import argparse
import csv
import json
import os
import shutil
import zipfile
from operator import itemgetter
from rdkit import Chem
from schrodinger import structure
from schrodinger.application.combinatorial_screen import combinatorial_screener
from schrodinger.application.pathfinder import molio
from schrodinger.application.pathfinder import route
from schrodinger.application.pathfinder import synthesis
from schrodinger.infra import canvas
from schrodinger.job import jobcontrol
from schrodinger.utils import cmdline
from schrodinger.utils import csv_unicode
from schrodinger.utils import fileutils
COMBINATORIAL_SCREEN = "combinatorial_screen"
COMBINATORIAL_SYNTHESIS = "combinatorial_synthesis"
DEFAULT_REPORT = 10
DEFAULT_PRODUCTS = 1000
DEFAULT_RETRY = 3
LEGAL_STRUCTURE_FILE_TYPES = [
fileutils.MAESTRO, fileutils.SD, fileutils.SMILESCSV, fileutils.SMILES
]
LEGAL_REACTANT_FILE_EXT = [".csv", ".pfx"]
SIM_PROP = "r_screen_SimTanimoto"
# Hard-coded file names within archive:
ROUTE_FILE = "route.json"
FP_LIST_FILE = "fp_files.csv"
# Parameters for adaptive adjustment of the number of requested products:
ADAPTIVE_MULTIPLIER_LIMIT = 4.0
ADAPTIVE_BUMP = 1.1
# Environment variable for directory containing reactant fingerprint files:
REACTANT_FP_DIR = "SCHRODINGER_REACTANT_FP_DIR"
def combine_log_files(subjobs, logger):
    """
    Concatenates the contents of subjob log files.

    :param subjobs: Subjob names
    :type subjobs: list(str)

    :param logger: Logger to which concatenated log files are to be written
    :type logger: Logger
    """
    logger.info("Combining subjob log files")
    for name in subjobs:
        log_name = name + ".log"
        if not os.path.isfile(log_name):
            logger.info(f"\n*** {log_name} Not found ***\n")
            continue
        logger.info(f"\n*** Contents of {log_name} ***\n")
        with open(log_name, 'r') as log_fh:
            for text in log_fh:
                logger.info(text.rstrip())
        logger.info(f"*** End of {log_name} ***")
def create_clib_archive(archive_file, route_node, fp_dir=None):
    """
    Creates a combinatorial library archive (.czip) from a synthetic route
    node and fingerprint files for the reactants. It is assumed that for each
    reactant file <path>/<reactant>.* referenced in route_node, there's an
    associated fingerprint file <reactant>.fp in the CWD or in fp_dir.

    :param archive_file: The name of the archive to create.
    :type archive_file: str

    :param route_node: Synthetic route node
    :type route_node: route.RouteNode

    :param fp_dir: The directory in which to look for a given fingerprint
        file if it's not found in the CWD.
    :type fp_dir: str

    :raises OSError: If archive_file lacks a ".czip" extension
    :raises FileNotFoundError: If a reactant fingerprint file is missing
    """
    if not archive_file.endswith(".czip"):
        raise OSError('Combinatorial library archive must end with ".czip"')
    fp_files = []
    for source in synthesis.get_reagent_sources(route_node):
        # Fingerprint files share the reactant file's base name.
        fp_base = fileutils.get_basename(source.filename) + ".fp"
        fp_file = get_existing_fp_file(fp_base, fp_dir)
        if not fp_file:
            # Bug fix: report the name that was searched for; the original
            # interpolated fp_file, which is None on this branch.
            raise FileNotFoundError(f'Fingerprint file "{fp_base}" not found')
        fp_files.append(fp_file)
    archive_dir = archive_file[:-4] + "clib"
    fileutils.force_rmtree(archive_dir)
    os.mkdir(archive_dir)
    fp_list_file = os.path.join(archive_dir, FP_LIST_FILE)
    with csv_unicode.writer_open(fp_list_file) as fh:
        writer = csv_unicode.writer(fh)
        for fp_file in fp_files:
            shutil.copy(fp_file, archive_dir)
            # The fingerprint row count is the number of reactants.
            reactant_count = canvas.ChmFPIn32(fp_file).getRowCount()
            row = [os.path.basename(fp_file), reactant_count]
            writer.writerow(row)
    # Create a route file within archive_dir which references reactant
    # files that will ultimately contain subsets of reactants identified
    # when a screen is run.
    archive_route_node = replace_reactant_file_names(route_node)
    archive_route_file = os.path.join(archive_dir, ROUTE_FILE)
    archive_route_node.write(archive_route_file, self_contained=True)
    zip_library_dir(archive_dir)
    fileutils.force_rmtree(archive_dir)
def create_reactant_files(reactant_files, smiles, titles, counts):
    """
    Creates reactant CSV files containing subsets of reactants. If N is the
    number of reactants, then reactant_files, smiles, titles and counts
    should each have a length of N. The number of smiles and titles written
    for the ith reactant is counts[i].

    :param reactant_files: The N reactant files to create
    :type reactant_files: list(str)

    :param smiles: N lists of reactant SMILES
    :type smiles: list(list(str))

    :param titles: N lists of reactant titles
    :type titles: list(list(str))

    :param counts: The number of reactants in each of the N subsets
    :type counts: list(int)
    """
    files_written = set()
    for i, reactant_file in enumerate(reactant_files):
        # Skip duplicate occurrences of the same reactant file.
        if reactant_file in files_written:
            continue
        files_written.add(reactant_file)
        with csv_unicode.writer_open(reactant_file) as fh:
            writer = csv_unicode.writer(fh)
            writer.writerow(["SMILES", "s_m_title"])
            for smi, title in zip(smiles[i][:counts[i]],
                                  titles[i][:counts[i]]):
                writer.writerow([smi, title])
def find_reactions(reaction_dict, reactions):
    """
    Recursive function for descending a reaction route dictionary and
    finding reaction names.

    :param reaction_dict: Dictionary containing "reaction_name" and
        "precursors" keys, where the latter points to a list of
        dictionaries, one of which may also contain "reaction_name"
        and "precursors" keys, hence the need for recursion.
    :type reaction_dict: dict

    :param reactions: List of reaction names. Updated by this function.
    :type reactions: list(str)
    """
    if "reaction_name" in reaction_dict:
        reactions.append(reaction_dict["reaction_name"])
    # Descend into any precursor that is itself a reaction.
    for precursor in reaction_dict.get("precursors", []):
        if "reaction_name" in precursor:
            find_reactions(precursor, reactions)
def get_clib_dir(archive_file):
    """
    Returns the name of the .clib directory within the supplied .czip
    archive. The normal convention is <library>.czip <--> <library>.clib,
    but a user may decide to change the name of the root portion of
    the archive so that this is no longer true.

    :param archive_file: The name of the combinatorial library archive
    :type archive_file: str

    :return: The name of the .clib directory within the archive
    :rtype: str

    :raises OSError: If no .clib directory is found
    """
    suffix = ".clib"
    with zipfile.ZipFile(archive_file) as zfile:
        for member in zfile.namelist():
            pos = member.find(suffix)
            if pos >= 0:
                # Keep everything up to and including ".clib".
                return member[:pos + len(suffix)]
        raise OSError(f'No .clib directory found in "{archive_file}"')
def get_combinatorial_synthesis_command(args):
    """
    Returns the command that should be used to run combinatorial_synthesis
    minus the -max_products option.

    :param args: Command line arguments
    :type args: argparse.Namespace

    :return: combinatorial_synthesis command
    :rtype: list(str)
    """
    jobname = get_jobname(args) + "_products"
    command = [
        COMBINATORIAL_SYNTHESIS, ROUTE_FILE, "-out", jobname + ".smi",
        "-no_multiple_products", "-JOBNAME", jobname
    ]
    host_list = jobcontrol.get_backend_host_list()
    if host_list:
        # Propagate the host list and job count when running under
        # job control with a host specification.
        host_str = jobcontrol.host_list_to_str(host_list)
        njobs = jobcontrol.calculate_njobs(host_str)
        command.extend(["-HOST", host_str, "-NJOBS", str(njobs)])
    return command
def get_distributed_fp_commands(args):
    """
    Returns lists of subjob commands for running distributed reactant
    fingerprint generation. Commands are not returned for fingerprint files
    that already exist in the CWD or in args.fp_dir.

    :param args: Command line arguments
    :type args: argparse.Namespace

    :return: list of subjob commands
    :rtype: list(list(str))
    """
    jobname = get_jobname(args)
    route_file = jobcontrol.get_runtime_path(args.route_file)
    commands = []
    fps_seen = set()
    for reactant_file in get_reactant_files(route_file):
        fp_file = fileutils.get_basename(reactant_file) + ".fp"
        # Skip reactants whose fingerprints already exist, as well as
        # duplicate occurrences of the same reactant file.
        if fp_file in fps_seen or get_existing_fp_file(fp_file, args.fp_dir):
            continue
        fps_seen.add(fp_file)
        subjob = f"{jobname}_sub_{len(fps_seen)}"
        commands.append([
            COMBINATORIAL_SCREEN, "setup", "-route", route_file, "-subjob",
            subjob, "-irct", reactant_file, "-ofp", fp_file, args.lib_file
        ])
    return commands
def get_existing_fp_file(fp_file, fp_dir=None):
    """
    Returns the name of a reactant fingerprint file if the file exists in
    the CWD or in fp_dir. Returns None if the file doesn't exist in either
    location.

    :param fp_file: The reactant fingerprint file. Only the basename is used.
    :type fp_file: str

    :param fp_dir: The directory in which to look for the fingerprint file.
        If None, only the CWD is checked.
    :type fp_dir: str

    :return: The name of the existing fingerprint file, or None. If found
        in fp_dir, the leading path to fp_dir is included.
    :rtype: str
    """
    basename = os.path.basename(fp_file)
    # CWD takes precedence over fp_dir.
    if os.path.isfile(basename):
        return basename
    if fp_dir:
        candidate = os.path.join(fp_dir, basename)
        if os.path.isfile(candidate):
            return candidate
    return None
def get_existing_fp_files(route_node, fp_dir=None):
    """
    Returns the names of any existing reactant fingerprint files in the
    CWD or in fp_dir that will be used in lieu of generating from scratch.

    :param route_node: Synthetic route node
    :type route_node: route.RouteNode

    :param fp_dir: The directory in which to look for the fingerprint file.
        If None, only the CWD is checked.
    :type fp_dir: str

    :return: Existing fingerprint file names, including the leading
        path if found in fp_dir.
    :rtype: list(str)
    """
    existing = []
    for source in synthesis.get_reagent_sources(route_node):
        fp_base = fileutils.get_basename(source.filename) + ".fp"
        found = get_existing_fp_file(fp_base, fp_dir)
        if found:
            existing.append(found)
    return existing
def get_fp_files(archive_dir):
    """
    Returns the names of the fingerprint files within the directory of an
    extracted combinatorial library archive.

    :param archive_dir: Path to archive directory (.clib)
    :type archive_dir: str

    :return: Fingerprint files names, including leading path
    :rtype: list(str)
    """
    fp_list_file = os.path.join(archive_dir, FP_LIST_FILE)
    # Column 0 of each row holds the fingerprint file's base name.
    with open(fp_list_file) as fh:
        return [os.path.join(archive_dir, row[0]) for row in csv.reader(fh)]
def get_jobname(args):
    """
    Returns an appropriate name based on args.subjob, SCHRODINGER_JOBNAME,
    the job control backend, or the base name of args.lib_file. In the last
    case, "_setup" or "_run" is appended to the string.

    :param args: Command line arguments
    :type args: argparse.Namespace

    :return: job name
    :rtype: str
    """
    # An explicit subjob name takes precedence over everything else.
    if args.subjob:
        return args.subjob
    default_name = f"{fileutils.get_basename(args.lib_file)}_{args.task}"
    return jobcontrol.get_jobname(default_name)
def get_num_license_tokens() -> int:
    """
    Returns the number of license tokens to check out, which is equal to
    the number of CPUs over which the job is to be distributed.
    """
    host_list = jobcontrol.get_backend_host_list()
    if not host_list:
        # Not running under job control with a host list; single CPU.
        return 1
    return jobcontrol.calculate_njobs(host_list)
def get_parser():
    """
    Creates argparse.ArgumentParser with supported command line options.

    :return: Argument parser object
    :rtype: argparse.ArgumentParser
    """
    parser = argparse.ArgumentParser(
        prog=COMBINATORIAL_SCREEN,
        formatter_class=argparse.RawDescriptionHelpFormatter)
    # One subparser per task: "setup", "run" and "summarize".
    subparsers = parser.add_subparsers(
        dest="task",
        metavar="<task>",
        help="The task to perform. For detailed help on a specific task, use "
        f"{COMBINATORIAL_SCREEN} <task> -h.")
    parser_setup = subparsers.add_parser(
        "setup",
        help="Set up a screen by creating a combinatorial library archive.")
    parser_setup.add_argument(
        "-route",
        dest="route_file",
        metavar="<route>.json",
        required=True,
        help="JSON route file containing a reaction path that utilizes two or "
        "more reactants. Required.")
    parser_setup.add_argument(
        "-fp_dir",
        metavar="<path>",
        help="Use reactant fingerprints in the indicated directory, rather "
        "than generating from scratch. Fingerprints will still be generated "
        "for any reactant which has no fingerprint file in <path>. Equivalent "
        "behavior may be achieved by storing this path in the environment "
        f"variable {REACTANT_FP_DIR}.")
    # Hidden options used internally when "setup" spawns fingerprint
    # subjobs (mirrored on the top-level parser below).
    parser_setup.add_argument("-subjob", help=argparse.SUPPRESS)
    parser_setup.add_argument("-irct", help=argparse.SUPPRESS)
    parser_setup.add_argument("-ofp", help=argparse.SUPPRESS)
    parser_run = subparsers.add_parser(
        "run",
        help="Run a similarity screen against an existing combinatorial "
        "library archive.")
    parser_run.add_argument("-query",
                            metavar="<smiles>",
                            required=True,
                            help="SMILES string of query structure. Required.")
    parser_run.add_argument(
        "-report",
        metavar="<n>",
        type=int,
        default=DEFAULT_REPORT,
        help="The number of hits to report. These are the <n> products with "
        "the highest similarities to the query, where the subset of products "
        "enumerated is determined by a heuristic ranking algorithm applied to "
        "the reactants (default: %(default)d).")
    parser_run.add_argument(
        "-products",
        metavar="<p>",
        type=int,
        help="The minimum number of products that must be successfully "
        "enumerated before reporting hits. This number should normally be "
        "large compared to the number of hits in order to increase the chances "
        "of finding the true best <n> hits among the products. The default is "
        "the larger of %d and 100 times the number of hits." % DEFAULT_PRODUCTS)
    parser_run.add_argument(
        "-retry",
        metavar="<m>",
        type=int,
        default=DEFAULT_RETRY,
        help="The maximum number of times to rerun the combinatorial synthesis "
        "in order to obtain the minimum required number of products. Each "
        "retry adaptively increases the number of products requested to "
        "compensate for reactant combinations that fail to yield products "
        "(default: %(default)d).")
    max_combos = combinatorial_screener.MAX_COMBOS
    parser_run.add_argument(
        "-max_reactants",
        metavar="<r>",
        type=int,
        help="The maximum number of reactants of each type to select. For "
        "example, if there are two reactants A and B, setting this parameter "
        "to 100 will result in a maximum of 100 reactants of type A and a "
        "maximum of 100 reactants of type B. For a synthetic route with N "
        f"reactants, the default is {max_combos:,}**1/N.")
    parser_run.add_argument(
        "-out",
        dest="out_file",
        metavar="<outfile>",
        help="Output Maestro, SD, CSV or SMILES file for hits. The default "
        "is <jobname>.csv.")
    parser_run.add_argument("-no3d",
                            dest="gen_coords",
                            action="store_false",
                            help="Skip 3D coordinate generation for hits.")
    parser_run.add_argument("-v3000",
                            action="store_true",
                            help="Write SD file hits in V3000 "
                            "format.")
    parser_summarize = subparsers.add_parser(
        "summarize",
        help="Summarize the contents of a combinatorial library archive.")
    parser_summarize.add_argument("-osum",
                                  metavar="<sumfile>",
                                  help="Write summary to a file.")
    # Positional archive argument is shared by all tasks.
    parser.add_argument(
        "lib_file",
        metavar="<lib>.czip",
        help="The combinatorial library archive to create, screen or summarize."
        " A default job name is assigned from the base name of this file.")
    jobcontrol_options = [cmdline.HOST, cmdline.JOBNAME, cmdline.TMPDIR]
    cmdline.add_jobcontrol_options(parser, options=jobcontrol_options)
    # Name of subjob spawned by "setup" job:
    parser.add_argument("-subjob", help=argparse.SUPPRESS)
    # Input reactant file for subjob:
    parser.add_argument("-irct", help=argparse.SUPPRESS)
    # Output fingerprint file for subjob:
    parser.add_argument("-ofp", help=argparse.SUPPRESS)
    return parser
def get_reactant_files(route_file):
    """
    Returns the list of runtime paths for the reactant files referenced in
    the supplied JSON route file.

    :param route_file: JSON route file
    :type route_file: str

    :return: List of reactant file names
    :rtype: list(str)
    """
    route_node = route.read_route_file(route_file, None)
    # Each source.filename already holds a runtime path.
    return [
        source.filename
        for source in synthesis.get_reagent_sources(route_node)
    ]
def replace_reactant_file_name(reaction_dict):
    """
    Recursive function for descending a reaction route dictionary and
    replacing reagent names or reagent file names with CSV file names which
    have a matching root but no leading path. For example, the reagent name
    name "thiols" would be replaced with "thiols.csv", as would the reagent
    file name "/path/to/thiols.pfx".

    :param reaction_dict: Dictionary containing "reaction_name" and
        "precursors" keys, where the latter points to a list of
        dictionaries, one of which may also contain "reaction_name"
        and "precursors" keys, hence the need for recursion. The
        dictionary is modified in-place.
    :type reaction_dict: dict
    """
    for p_dict in reaction_dict.get("precursors", []):
        if "reagent" in p_dict:
            # A bare reagent name becomes a file entry: <name>.csv.
            p_dict["file"] = p_dict.pop("reagent") + ".csv"
        elif "file" in p_dict:
            # Strip any leading path and swap the extension for ".csv".
            p_dict["file"] = fileutils.get_basename(p_dict["file"]) + ".csv"
        if "reaction_name" in p_dict:
            # This precursor is itself a reaction; descend into it.
            replace_reactant_file_name(p_dict)
def replace_reactant_file_names(route_node):
    """
    Returns a synthetic route node which is identical to the supplied one,
    except for the replacement of the reagent names or reagent file names
    with matching CSV file names which have no leading path.

    :param route_node: Synthetic route node
    :type route_node: route.RouteNode

    :return: Synthetic route node containing new reactant file names
    :rtype: route.RouteNode
    """
    # Work on a serialized copy so the original node is left untouched.
    data = route_node.getTreeData(self_contained=True)
    replace_reactant_file_name(data["route"])
    return route.parse_route_data(data)
def summarize_archive(archive_file):
    """
    Returns a string that summarizes the contents of a combinatorial library
    archive.

    :param archive_file: The name of the combinatorial library archive
    :type archive_file: str

    :return: A summary of the archive contents
    :rtype: str
    """
    clib_dir = get_clib_dir(archive_file)
    summary = ["Reactions:"]
    with zipfile.ZipFile(archive_file) as zfile:
        # Path separators within Zip archives are always '/'.
        with zfile.open(f"{clib_dir}/{ROUTE_FILE}") as fh:
            json_data = json.load(fh)
        reaction_dict = route.route_file_schema(json_data["route"])
        reactions = []
        find_reactions(reaction_dict, reactions)
        summary.extend(
            f"{i}. {name}" for i, name in enumerate(reactions, 1))
        summary.append("\nReactants:")
        total_products = 1
        with zfile.open(f"{clib_dir}/{FP_LIST_FILE}") as fh:
            rows = csv.reader(str(fh.read(), 'utf-8').splitlines())
            for i, row in enumerate(rows, 1):
                # Drop the ".fp" extension to recover the reactant name.
                name = row[0][:-3]
                count = int(row[1])
                summary.append(f"{i}. {name}: {count:,}")
                total_products *= count
    summary.append(f"Theoretical number of products: {total_products:,}")
    return "\n".join(summary)
def validate_args(args):
    """
    Checks the validity of command line arguments.

    :param args: argparse.Namespace with command line arguments
    :type args: argparse.Namespace

    :return: tuple of validity and non-empty error message if not valid
    :rtype: bool, str
    """
    if not args.lib_file.endswith(".czip"):
        mesg = 'Combinatorial library archive must end with ".czip"'
        return False, mesg
    if args.task == "setup":
        if not os.path.isfile(args.route_file):
            mesg = f'JSON route file "{args.route_file}" not found'
            return False, mesg
        # Ensure that reactants are file names and not SMILES strings.
        for reactant in get_reactant_files(args.route_file):
            ext = os.path.splitext(reactant)[1]
            if ext not in LEGAL_REACTANT_FILE_EXT:
                mesg = f'Illegal reactant file name: "{reactant}"'
                return False, mesg
    else:
        # "run" and "summarize" require an existing archive.
        if not os.path.isfile(args.lib_file):
            mesg = f'Combinatorial library archive "{args.lib_file}" not found'
            return False, mesg
    if args.task == "run":
        try:
            # Parsed only for validation; the molecule itself is unused.
            canvas.ChmMol.fromSMILES(args.query)
        except Exception:
            # Bug fix: was a bare "except:", which would also swallow
            # KeyboardInterrupt and SystemExit.
            return False, f'Invalid query SMILES: "{args.query}"'
        if args.report < 1:
            mesg = "The number of hits to report must be 1 or greater"
            return False, mesg
        if args.products is not None and args.products < args.report:
            mesg = "The minimum number of products cannot be less than " + \
                   "the number of hits"
            return False, mesg
        if args.retry < 0:
            return False, "The maximum number of retries cannot be negative"
        if args.max_reactants is not None and args.max_reactants < 1:
            mesg = "The maximum number of reactants must be 1 or greater"
            return False, mesg
        if args.out_file:
            ftype = fileutils.get_structure_file_format(args.out_file)
            if ftype not in LEGAL_STRUCTURE_FILE_TYPES:
                return False, f'Illegal output file type: "{args.out_file}"'
    return True, ""
def write_hits(args, num_hits, logger):
    """
    Writes the products with the highest similarities to the query to the
    hit file.

    :param args: argparse.Namespace with command line arguments
    :type args: argparse.Namespace

    :param num_hits: The number of hits to write
    :type num_hits: int

    :param logger: Logger to which messages are to be written
    :type logger: Logger
    """
    products_file = get_jobname(args) + "_products.smi"
    fp_generator = canvas.ChmDendriticOut32()
    query_mol = canvas.ChmMol.fromSMILES(args.query)
    query_fp = fp_generator.generate(query_mol)
    # Read products file into memory to allow sorting by similarity.
    rows = []
    logger.info("\nComputing fingerprints and similarities for products")
    with open(products_file) as fh:
        for i, row in enumerate(csv.reader(fh), 1):
            # Each row contains a SMILES and title, but the title contains
            # embedded spaces without protecting quotes, so split manually
            # on the first space. Bug fix: str.partition handles a row with
            # no space correctly, whereas find(" ") == -1 silently dropped
            # the final SMILES character.
            smiles, _, title = row[0].partition(" ")
            mol = canvas.ChmMol.fromSMILES(smiles)
            product_fp = fp_generator.generate(mol)
            rows.append([query_fp.simTanimoto(product_fp), smiles, title])
            if i % 10000 == 0:
                logger.info(f"Processed {i:,} products")
    rows.sort(key=itemgetter(0), reverse=True)
    hit_file = args.out_file if args.out_file else get_jobname(args) + ".csv"
    writer = molio.get_mol_writer(hit_file,
                                  generate_coordinates=args.gen_coords,
                                  require_stereo=False,
                                  v3000=args.v3000)
    # Robustness: never index past the number of available products.
    num_to_write = min(num_hits, len(rows))
    logger.info(f"Writing top {num_to_write:,} products to {hit_file}")
    with writer:
        for sim, smiles, title in rows[:num_to_write]:
            chem_mol = Chem.MolFromSmiles(smiles)
            if chem_mol is None:
                # Should not happen since the SMILES parsed above, but skip
                # rather than crash on an RDKit parse failure.
                logger.warning(f"Skipping unparsable SMILES: {smiles}")
                continue
            chem_mol.SetProp("_Name", title)
            chem_mol.SetDoubleProp(SIM_PROP, sim)
            try:
                writer.append(chem_mol)
            except structure.UndefinedStereoChemistry as e:
                # Bug fix: logger.warn is a deprecated alias of warning.
                logger.warning(e)
def zip_library_dir(library_dir):
    """
    Creates a Zip file <library>.czip from a directory <library>.clib.

    :param library_dir: The library directory to zip
    :type library_dir: str
    """
    # Swap the trailing "clib" for "czip" to name the archive.
    zip_name = library_dir[:-4] + "czip"
    with zipfile.ZipFile(zip_name, 'w', zipfile.ZIP_DEFLATED) as zfile:
        for dir_path, _, file_names in os.walk(library_dir):
            for file_name in file_names:
                zfile.write(os.path.join(dir_path, file_name))