'''
Creation of "binary shape files" from several kinds of sources (PHASE-2070).
'''
import argparse
import json
import os
import time
from schrodinger.application.phase.packages import conformer_reader
from schrodinger.application.phase.packages import conformer_storage
from schrodinger.application.phase.packages import phase_screen_utils
from schrodinger.application.phase.packages import phase_utils
from schrodinger.application.phase.packages import shape_binary_utils
from schrodinger.application.phase.packages import shape_generator
from schrodinger.application.phase.packages import shape_ligprep
from schrodinger.infra import mm
from schrodinger.infra import phase
from schrodinger.job import jobcontrol
from schrodinger.structutils import smiles
from schrodinger.utils import fileutils
from schrodinger.utils import log
# Acceptable input file types; `validate_args` checks the source against
# this tuple when `args.ligprep` is not set
LEGAL_SOURCE_FORMATS = (phase.PhpFileFormat_PHP_FORMAT_MAE,
phase.PhpFileFormat_PHP_FORMAT_SD,
phase.PhpFileFormat_PHP_FORMAT_PHDB)
# Module-level logger used for progress and diagnostic messages
logger = log.get_output_logger('shape.shape_creation_logic')
# Conformer sampling method name: passed to the confgen option setup in
# `add_arguments` and substituted by `ShapeCreator` when `args.sample` is None
DEFAULT_CONFORMER_SAMPLING_METHOD_NAME = phase.CONF_SAMPLE_FINE_NAME
# =============================================================================
# Command line
# =============================================================================
def add_arguments(p):
    '''
    Adds shape-creation-specific arguments to the parser.

    The options are registered in several groups: the required `-source`
    argument (stored as `args.screen`), the shape generation options,
    general storage options, ligand preparation / conformer perception
    options, conformer generation options and the Phase database subset
    option.

    :param p: Parser-like instance.
    :type p: argparse.ArgumentParser

    :return: The parser `p` that was passed in.
    :rtype: argparse.ArgumentParser
    '''
    # Required arguments
    g = p.add_argument_group("Required Named Arguments")
    g.add_argument(
        "-source",
        dest="screen",
        metavar="<path>",
        required=True,
        help="Source of the structures to generate the shapes for. "
        "Supported formats: compressed and uncompressed SMILES files, "
        "2D and 3D Maestro and Mol/SDF files, Phase Database "
        "(conformers must be included).")

    # Shape generation options
    shape_generator.add_shape_generation_options(p)

    # General options
    g = p.add_argument_group(title="General Options")
    # hidden switch: turns off `store_conformers` (which defaults to True
    # because of the 'store_false' action)
    g.add_argument("-titles_only",
                   action='store_false',
                   dest="store_conformers",
                   help=argparse.SUPPRESS)
    g.add_argument(
        "-conformer_format",
        # a tuple (enum definition order) rather than a set, so that the
        # usage/error text lists the choices in a reproducible order
        choices=tuple(x.value for x in conformer_storage.Format),
        default=conformer_storage.Format.COMPACT.value,
        help="Format to be used for the conformers. Default: %(default)s.")
    g.add_argument("-keep_properties",
                   action="store_true",
                   help="Do not strip structure/atom/bond properties from the "
                   "stored structures.")
    g.add_argument("-no_compress",
                   action="store_false",
                   dest="compress_shape_data_file",
                   help="Do not compress shape data file.")
    g.add_argument("-limit",
                   metavar="<n>",
                   type=int,
                   default=10,
                   choices=[phase_utils.RestrictedRange(1, None)],
                   help="Store shapes of no more than the first <n> conformers."
                   " Default: %(default)d.")

    # File input options
    g = p.add_argument_group(
        title="Ligand Preparation and Conformer Perception Options")
    conformer_reader.add_file_options(g, create_group=False)
    shape_ligprep.add_ligprep_arguments(g)

    # Conformer generation
    conformer_reader.add_confgen_options(
        p,
        False,
        default_conf_sample_name=DEFAULT_CONFORMER_SAMPLING_METHOD_NAME)

    # Phase DB input
    g = p.add_argument_group(title="Phase Database Subset Option")
    g.add_argument(
        "-isub",
        metavar="<subset>",
        help="Instead of creating shapes for all records in the input "
        "Phase Database, process only the subset defined in the "
        "<subset>_phase.inp file.")

    return p
# -----------------------------------------------------------------------------
def validate_args(args):
    '''
    Validates command-line arguments added by `add_arguments`.

    Input paths in `args` are runtimized in place (see `runtimize_paths`)
    before any check is made. Checks run in a fixed order and the first
    failure determines the returned message.

    :param args: Namespace holding command line options.
    :type args: argparse.Namespace

    :return: Tuple of validation success and error message (empty string
        on success).
    :rtype: (bool, str)
    '''
    runtimize_paths(args)

    # Source validation
    if args.ligprep:
        source_format_is_legal = (
            shape_ligprep.get_structure_file_format(args.screen)
            in shape_ligprep.LIGPREP_INPUT_OPTION)
    else:
        source_format_is_legal = (phase.get_phase_file_format(args.screen)
                                  in LEGAL_SOURCE_FORMATS)

    if not source_format_is_legal:
        # tell the user when the format would have been accepted
        # had -ligprep been requested
        if (shape_ligprep.get_structure_file_format(args.screen)
                in shape_ligprep.LIGPREP_INPUT_OPTION):
            return False, "-ligprep flag is needed for input file format: " + args.screen
        else:
            return False, "Unexpected input file format: " + args.screen

    if phase_utils.is_phase_database_path(args.screen):
        # Database source validation
        if args.distinct or args.connect or args.stereo or args.title:
            return False, "File options are not legal for database input"
        if args.flex:  # Due to screening shape binary format
            return False, "Conformer generation is not legal when processing " \
                          "a database; see $SCHRODINGER/phase_database " \
                          "-help_revise"
        validated, msg = phase_screen_utils.validate_source_dbs([args.screen])
        if not validated:
            return False, msg
        if args.ligprep:
            return False, "-ligprep is not supported for database input"
    else:
        # File source validation
        if args.ligprep:
            if args.distinct or args.connect or args.stereo or args.title:
                return (False, "File options are not compatible with -ligprep")
        if args.isub:
            return False, "Database options are not legal for file input"
        if not os.path.isfile(args.screen):
            return False, "File not found: " + args.screen

    # Remaining option-specific validators; each returns (ok, message)
    # and they are consulted in this order
    validators = (
        shape_generator.validate_shape_gen_options,
        conformer_reader.validate_confgen_conflicts,
        conformer_reader.validate_title_option,
        phase_screen_utils.validate_subset,
        shape_ligprep.validate_ligprep_arguments,
    )
    for validator in validators:
        validated, msg = validator(args)
        if not validated:
            return False, msg

    return True, ""
# -----------------------------------------------------------------------------
def runtimize_paths(args):
    '''
    Runtimizes input file paths for the command line arguments.

    The 'shape', 'screen', 'fd' and 'rad' attributes of `args` are replaced
    in place by `phase_utils.get_proper_path(<value>)`. Attributes that are
    missing or hold a false value (`None`, empty string) are left untouched
    and are not created.

    :param args: Namespace populated with command line arguments.
    :type args: argparse.Namespace
    '''
    # Update input files with appropriate path if running under job control
    for attr in ('shape', 'screen', 'fd', 'rad'):
        path = getattr(args, attr, None)
        if path:
            setattr(args, attr, phase_utils.get_proper_path(path))
# -----------------------------------------------------------------------------
# -----------------------------------------------------------------------------
def get_jobname_and_outfile(args):
    """
    Returns the job name and job name-derived output file name. If job
    name is not provided it gets derived from the input file name.

    The output file name is `<jobname>.bin`; when `args` carries a
    non-empty `shape_data_dir` attribute, the file is placed in that
    directory.

    :param args: Namespace populated with command line arguments.
    :type args: argparse.Namespace

    :return: job name, output file name
    :rtype: str, str
    """
    jobname = jobcontrol.get_jobname(args.screen)
    outfile = jobname + '.bin'

    # `shape_data_dir` is optional: not every caller defines it
    shape_data_dir = getattr(args, 'shape_data_dir', None)
    if shape_data_dir:
        outfile = os.path.join(shape_data_dir, outfile)

    return jobname, outfile
# =============================================================================
# Business logic
# =============================================================================
class ShapeCreator(object):
    '''
    Iterable that reads conformers from the source given on the command
    line and yields their shapes, optionally together with the serialized
    conformer structures.
    '''

    def __init__(self, args):
        '''
        Expects validated and runtimized command line arguments.

        Note that `args` is modified: `args.sample` is set to the default
        sampling method name when it is `None`, and `args.shape` is set
        to an empty string.

        :param args: Namespace populated with command line arguments.
        :type args: argparse.Namespace
        '''
        self._limit = args.limit
        self._source = args.screen

        if args.sample is None:
            args.sample = DEFAULT_CONFORMER_SAMPLING_METHOD_NAME

        self._reader_options = conformer_reader.ConformerReaderOptions(
            args, phase.SHAPE_SCREEN_SUBSET)

        # both stay `None` when only titles are to be stored
        self.serializeConformers = None
        self.deserializeConformers = None

        if args.store_conformers:
            # conformers are compressed individually only if the shape
            # data file as a whole is not compressed
            self._compress_conformers = not args.compress_shape_data_file
            self._conformer_format = conformer_storage.Format(
                args.conformer_format)
            self.serializeConformers, self.deserializeConformers = \
                conformer_storage.get_api(
                    self._conformer_format,
                    keep_properties=args.keep_properties,
                    compress=self._compress_conformers)
            if self._conformer_format == conformer_storage.Format.LOSSLESS:
                # no serialize/deserialize round trip before shape
                # generation for the lossless format
                self.deserializeConformers = None

        # set "shape" to empty string (expected in ShapeGenerator.__init__
        # but not needed for shape generation)
        args.shape = ''

        self._generator = shape_generator.SerializableShapeGenerator(args)
        self._smiles_generator = smiles.SmilesGenerator(unique=True)

    def _getStructureTitleOrSMILES(self, st):
        '''
        Returns either structure title or SMILES in case the former is empty.

        :param st: Structure; its `title` attribute is consulted first.

        :return: Structure title, or SMILES if the title is empty.
        :rtype: str
        '''
        title = st.title
        if title:
            return title

        logger.warning("empty structure title, using SMILES instead")
        return self._smiles_generator.getSmiles(st)

    def __iter__(self):
        '''
        Yields (title, shapes) tuples. If `self.serializeConformers` is set
        (conformers are to be stored), the first element holds the
        serialized conformers instead of the actual structure title.

        At most `self._limit` leading conformers of every record are used.
        Records for which serialization, deserialization or shape generation
        raises `RuntimeError` or `phase.PhpException` are logged and skipped.
        '''
        reader = conformer_reader.ConformerReader(self._source,
                                                  self._reader_options)
        try:
            for conformers in reader.getConformers():
                sts = conformers[:self._limit]
                if not sts:
                    # nothing to describe; avoids IndexError on sts[0]
                    logger.info("skipping record without conformers")
                    continue

                title = self._getStructureTitleOrSMILES(sts[0])

                serialized = None
                try:
                    if self.serializeConformers:
                        serialized = self.serializeConformers(sts)
                        if self.deserializeConformers:
                            # generate shapes for exactly what gets stored
                            sts = self.deserializeConformers(serialized)
                    shapes = self._generator.getConformerShapes(sts)
                except (RuntimeError, phase.PhpException) as e:
                    logger.info("skipping '%s': %s.", title, e)
                    continue

                # the yield stays outside of the `try` above so that
                # exceptions thrown into the generator are not swallowed
                if self.serializeConformers:
                    yield (serialized, shapes)
                else:
                    yield (title, shapes)
        finally:
            reader.close()
# =============================================================================
# =============================================================================
# =============================================================================
def estimate_shape_data_size(shape_type,
                             num_conformers,
                             compress,
                             conformer_format=None):
    '''
    Estimates number of bytes needed to store the shape data
    for a single molecule. Based on observations for 10,000
    molecules picked from Enamine REAL.

    The estimate is the size tabulated for the first conformer plus the
    per-conformer size for every conformer beyond the first.

    :param shape_type: Shape assignment scheme name
        ("pharm", "atom_color" or "atom_no_color").
    :type shape_type: str

    :param num_conformers: Number of conformers per molecule.
    :type num_conformers: int

    :param compress: File-level compression.
    :type compress: bool

    :param conformer_format: Format used to store conformer structures
        (`None` for structureless shape data).
    :type conformer_format: `conformer_storage.Format` or NoneType

    :return: Estimated shape data size for a single molecule (in bytes).
    :rtype: int

    :raises ValueError: If `shape_type` is not one of the known schemes.
    '''
    # name : (first conformer, any additional conformer)
    sizes = {
        'pharm_titles'      : ( 298, 132),
        'pharm_compact'     : (1508, 134),
        'pharm_lossless'    : (1300, 492),
        'pharm_titles_nc'   : ( 405, 379),
        'pharm_compact_nc'  : (2839, 396),
        'pharm_lossless_nc' : (2441, 798),
        'atom_titles'       : ( 354, 144),
        'atom_compact'      : (1551, 137),
        'atom_lossless'     : (1328, 501),
        'atom_titles_nc'    : ( 485, 458),
        'atom_compact_nc'   : (2919, 475),
        'atom_lossless_nc'  : (2521, 873)
    } # yapf: disable

    if shape_type == shape_generator.SHAPE_PHARM_COLOR:
        name = 'pharm'
    elif shape_type in (shape_generator.SHAPE_ATOM_COLOR,
                        shape_generator.SHAPE_ATOM_NO_COLOR):
        name = 'atom'
    else:
        raise ValueError(f'unexpected shape_type: "{shape_type}"')

    fmt = 'titles' if conformer_format is None else conformer_format.value
    name += '_' + fmt

    # '_nc' rows were measured without file-level compression
    if not compress:
        name += '_nc'

    first, per_additional = sizes[name]

    # `first` already accounts for one conformer, hence only the
    # conformers beyond it are charged `per_additional` bytes each
    return first + max(num_conformers - 1, 0) * per_additional
# =============================================================================
def execute(args):
    '''
    The "main" subroutine for the "create" task.

    Builds a `ShapeCreator` for `args`, logs the run settings, writes the
    metadata header and then every (ID, shapes) entry to the shape data
    file, reporting progress along the way. If no shapes were written,
    the output file is removed.

    :param args: Namespace populated with command line arguments.
    :type args: argparse.Namespace
    '''
    if args.verbose:
        logger.setLevel(log.DEBUG)
        shape_ligprep.logger.setLevel(log.DEBUG)
        shape_generator.logger.setLevel(log.DEBUG)

    # note that if creator uses ligprep with epik
    # we cannot iterate over it twice (LIGPREP-1919)
    creator = ShapeCreator(args)
    # NOTE(review): `getMetadata` is not among the `ShapeCreator` methods
    # visible in this file -- confirm that it is defined
    metadata = creator.getMetadata()

    _, outfile = get_jobname_and_outfile(args)

    logger.info("\ninput structures: '%s'", args.screen)
    logger.info("shape data file: '%s'", outfile)
    logger.info("gzip-compressed: %s", args.compress_shape_data_file)
    logger.info('\nstore_conformers: %s', args.store_conformers)
    if args.store_conformers:
        logger.info('conformers format: %s', args.conformer_format)
    logger.info('')

    for (k, v) in metadata['generator'].items():
        # drop the leading underscore of "private" names for display
        name = k[1:] if k.startswith('_') else k
        if name.startswith('feature'):
            # feature-related entries are reported at debug level only
            logger.debug('%s:\n%s', name, v)
        else:
            logger.info('%s: %s', name, v)

    # writes out metadata header to the `outfile`
    writer = shape_binary_utils.ShapeFileWriter(
        outfile, json.dumps(metadata), compress=args.compress_shape_data_file)

    num_entries = 0
    num_shapes = 0

    t1 = time.process_time()
    # progress is reported after 1, 10, 100, 1000 and 10000 entries,
    # and after every 10000 entries from then on
    milestones = tuple(10**n for n in range(5))

    logger.info('')
    with writer:
        for ID, shapes in creator:
            writer.append(ID, shapes)
            num_entries += 1
            num_shapes += len(shapes)
            if num_entries in milestones or num_entries % milestones[-1] == 0:
                seconds = time.process_time() - t1
                logger.info('%8.1f sec: %8d structures %10d conformers',
                            seconds, num_entries, num_shapes)

    # the writer context has been exited at this point
    if not num_shapes:
        # remove `outfile` if it contains only a header
        fileutils.force_remove(outfile)
    else:
        logger.info("\n'%s': %d shapes for %d structures", outfile, num_shapes,
                    num_entries)
# =============================================================================