Source code for schrodinger.application.phase.packages.shape_creation_logic

'''
Creation of "binary shape files" from several kinds of sources (PHASE-2070).
'''

import argparse
import json
import os
import time

from schrodinger.application.phase.packages import conformer_reader
from schrodinger.application.phase.packages import conformer_storage
from schrodinger.application.phase.packages import phase_screen_utils
from schrodinger.application.phase.packages import phase_utils
from schrodinger.application.phase.packages import shape_binary_utils
from schrodinger.application.phase.packages import shape_generator
from schrodinger.application.phase.packages import shape_ligprep
from schrodinger.infra import mm
from schrodinger.infra import phase
from schrodinger.job import jobcontrol
from schrodinger.structutils import smiles
from schrodinger.utils import fileutils
from schrodinger.utils import log

# Acceptable input file types (consulted by `validate_args` when "-ligprep"
# is not requested)
LEGAL_SOURCE_FORMATS = (phase.PhpFileFormat_PHP_FORMAT_MAE,
                        phase.PhpFileFormat_PHP_FORMAT_SD,
                        phase.PhpFileFormat_PHP_FORMAT_PHDB)

# Module-level output logger
logger = log.get_output_logger('shape.shape_creation_logic')

# Conformer sampling method name used when none is given on the command line
DEFAULT_CONFORMER_SAMPLING_METHOD_NAME = phase.CONF_SAMPLE_FINE_NAME

# =============================================================================
# Command line
# =============================================================================


def add_arguments(p):
    '''
    Adds shape-creation-specific arguments to the parser.

    :param p: Parser-like instance.
    :type p: argparse.ArgumentParser

    :return: The parser that was passed in (with the arguments added).
    :rtype: argparse.ArgumentParser
    '''

    # Required arguments
    g = p.add_argument_group("Required Named Arguments")
    g.add_argument(
        "-source",
        dest="screen",
        metavar="<path>",
        required=True,
        help="Source of the structures to generate the shapes for. "
        "Supported formats: compressed and uncompressed SMILES files, "
        "2D and 3D Maestro and Mol/SDF files, Phase Database "
        "(conformers must be included).")

    # Shape generation options
    shape_generator.add_shape_generation_options(p)

    # General options
    g = p.add_argument_group(title="General Options")
    g.add_argument("-titles_only",
                   action='store_false',
                   dest="store_conformers",
                   help=argparse.SUPPRESS)
    # A sorted list (rather than a set) keeps the order of the choices shown
    # in the usage/help/error messages the same from run to run.
    g.add_argument(
        "-conformer_format",
        choices=sorted(x.value for x in conformer_storage.Format),
        default=conformer_storage.Format.COMPACT.value,
        help="Format to be used for the conformers. Default: %(default)s.")
    g.add_argument("-keep_properties",
                   action="store_true",
                   help="Do not strip structure/atom/bond properties from the "
                   "stored structures.")
    g.add_argument("-no_compress",
                   action="store_false",
                   dest="compress_shape_data_file",
                   help="Do not compress shape data file.")
    g.add_argument("-limit",
                   metavar="<n>",
                   type=int,
                   default=10,
                   choices=[phase_utils.RestrictedRange(1, None)],
                   help="Store shapes of no more than the first <n> conformers."
                   " Default: %(default)d.")

    # File input options
    g = p.add_argument_group(
        title="Ligand Preparation and Conformer Perception Options")
    conformer_reader.add_file_options(g, create_group=False)
    shape_ligprep.add_ligprep_arguments(g)

    # Conformer generation
    conformer_reader.add_confgen_options(
        p,
        False,
        default_conf_sample_name=DEFAULT_CONFORMER_SAMPLING_METHOD_NAME)

    # Phase DB input
    g = p.add_argument_group(title="Phase Database Subset Option")
    g.add_argument(
        "-isub",
        metavar="<subset>",
        help="Instead of creating shapes for all records in the input "
        "Phase Database, process only the subset defined in the "
        "<subset>_phase.inp file.")

    return p
# -----------------------------------------------------------------------------
def validate_args(args):
    '''
    Checks the command-line options registered by `add_arguments`.

    The input paths in `args` are runtimized (see `runtimize_paths`) before
    any check is made.

    :param args: Namespace holding command line options.
    :type args: argparse.Namespace

    :return: Tuple of validation success and error message (empty string
        on success).
    :rtype: (bool, str)
    '''

    def file_options_given():
        # True if any of the file-only options was supplied
        return bool(args.distinct or args.connect or args.stereo or
                    args.title)

    runtimize_paths(args)

    # Source validation: which formats are acceptable depends on "-ligprep"
    if args.ligprep:
        source_format = shape_ligprep.get_structure_file_format(args.screen)
        format_ok = source_format in shape_ligprep.LIGPREP_INPUT_OPTION
    else:
        source_format = phase.get_phase_file_format(args.screen)
        format_ok = source_format in LEGAL_SOURCE_FORMATS

    if not format_ok:
        ligprep_format = shape_ligprep.get_structure_file_format(args.screen)
        if ligprep_format in shape_ligprep.LIGPREP_INPUT_OPTION:
            return (False, "-ligprep flag is needed for input file format: " +
                    args.screen)
        return False, "Unexpected input file format: " + args.screen

    if phase_utils.is_phase_database_path(args.screen):
        # Database source validation
        if file_options_given():
            return False, "File options are not legal for database input"
        if args.flex:
            # Due to screening shape binary format
            return False, "Conformer generation is not legal when processing " \
                          "a database; see $SCHRODINGER/phase_database " \
                          "-help_revise"
        ok, message = phase_screen_utils.validate_source_dbs([args.screen])
        if not ok:
            return False, message
        if args.ligprep:
            return False, "-ligprep is not supported for database input"
    else:
        # File source validation
        if args.ligprep and file_options_given():
            return (False, "File options are not compatible with -ligprep")
        if args.isub:
            return False, "Database options are not legal for file input"
        if not os.path.isfile(args.screen):
            return False, "File not found: " + args.screen

    # Remaining checks are delegated; the first failure is reported
    delegated_checks = (
        shape_generator.validate_shape_gen_options,
        conformer_reader.validate_confgen_conflicts,
        conformer_reader.validate_title_option,
        phase_screen_utils.validate_subset,
        shape_ligprep.validate_ligprep_arguments,
    )
    for check in delegated_checks:
        ok, message = check(args)
        if not ok:
            return False, message

    return True, ""
# -----------------------------------------------------------------------------
def runtimize_paths(args):
    '''
    Replaces the input file paths held in `args` with their runtime
    counterparts (as returned by `phase_utils.get_proper_path`).

    Attributes that are absent or empty are left untouched.

    :param args: Namespace populated with command line arguments.
    :type args: argparse.Namespace
    '''

    # Update input files with appropriate path if running under job control
    for name in ('shape', 'screen', 'fd', 'rad'):
        current = getattr(args, name, None)
        if not current:
            continue
        setattr(args, name, phase_utils.get_proper_path(current))
# -----------------------------------------------------------------------------
def get_input_files(args):
    '''
    Collects the input file arguments (for job control purposes).

    :param args: Namespace populated with command line arguments.
    :type args: argparse.Namespace

    :return: Values of the `screen`, `fd` and `rad` arguments (in this
        order, exactly as stored in `args`).
    :rtype: list
    '''

    return [getattr(args, name) for name in ('screen', 'fd', 'rad')]
# -----------------------------------------------------------------------------
def get_jobname_and_outfile(args):
    """
    Returns the job name and the output file name derived from it.

    The job name is obtained from `jobcontrol.get_jobname`, with the input
    file name passed in for the case when no job name is available.
    The output file is "<jobname>.bin", placed into `args.shape_data_dir`
    if that attribute is present and not empty.

    :param args: Namespace populated with command line arguments.
    :type args: argparse.Namespace

    :return: job name, output file name
    :rtype: str, str
    """

    jobname = jobcontrol.get_jobname(args.screen)
    outfile = jobname + '.bin'

    target_dir = getattr(args, 'shape_data_dir', None)
    if not target_dir:
        return jobname, outfile

    return jobname, os.path.join(target_dir, outfile)
# =============================================================================
# Business logic
# =============================================================================
class ShapeCreator(object):
    '''
    Iterable that reads conformers from the source and yields their shapes
    (see `__iter__`), optionally together with the serialized conformers.
    '''

    def __init__(self, args):
        '''
        Expects validated and runtimized command line arguments.

        Note that `args` is modified: `args.sample` gets the default
        sampling method name if it is `None`, and `args.shape` is set to
        an empty string.

        :param args: Namespace populated with command line arguments.
        :type args: argparse.Namespace
        '''

        self._source = args.screen
        self._limit = args.limit

        if args.sample is None:
            args.sample = DEFAULT_CONFORMER_SAMPLING_METHOD_NAME

        self._reader_options = conformer_reader.ConformerReaderOptions(
            args, phase.SHAPE_SCREEN_SUBSET)

        # both remain `None` unless the conformers are to be stored
        self.serializeConformers = None
        self.deserializeConformers = None

        if args.store_conformers:
            # conformers are compressed only if the file as a whole is not
            self._compress_conformers = not args.compress_shape_data_file
            self._conformer_format = conformer_storage.Format(
                args.conformer_format)

            serializer, deserializer = conformer_storage.get_api(
                self._conformer_format,
                keep_properties=args.keep_properties,
                compress=self._compress_conformers)

            # no round trip through the serialized form for the lossless
            # format (presumably it reproduces the input exactly)
            if self._conformer_format == conformer_storage.Format.LOSSLESS:
                deserializer = None

            self.serializeConformers = serializer
            self.deserializeConformers = deserializer

        # set "shape" to empty string (expected in ShapeGenerator.__init__
        # but not needed for shape generation)
        args.shape = ''

        self._generator = shape_generator.SerializableShapeGenerator(args)
        self._smiles_generator = smiles.SmilesGenerator(unique=True)

    def getMetadata(self):
        '''
        Returns JSON-able representation of the `self`.

        :return: Dictionary with the software version/location, the source,
            the shape generator settings and the conformer storage settings.
        :rtype: dict
        '''

        with_conformers = bool(self.serializeConformers)

        metadata = {
            'mmshare': mm.mmfile_get_product_version('mmshare'),
            'schrodinger': os.getenv('SCHRODINGER'),
            'source': self._source,
            'generator': self._generator.toDict(),
            'conformers': with_conformers,
        } # yapf: disable

        if self.serializeConformers:
            metadata.update(
                conformer_format=self._conformer_format.value,
                compress_conformers=self._compress_conformers)

        return metadata

    def _getStructureTitleOrSMILES(self, st):
        '''
        Returns the structure title, or the SMILES (with a warning logged)
        if the title is empty.
        '''

        title = st.title
        if not title:
            logger.warning("empty structure title, using SMILES instead")
            return self._smiles_generator.getSmiles(st)
        return title

    def __iter__(self):
        '''
        Yields (title, shapes) tuples. If the conformers are being stored
        (`self.serializeConformers` is set), the first element of the tuple
        holds the serialized conformers instead of the actual structure
        title.

        Structures for which serialization, deserialization or shape
        generation fails are logged and skipped.
        '''

        reader = conformer_reader.ConformerReader(self._source,
                                                  self._reader_options)
        try:
            for conformers in reader.getConformers():
                sts = conformers[:self._limit]
                title = self._getStructureTitleOrSMILES(sts[0])
                serialized = None

                try:
                    if self.serializeConformers:
                        serialized = self.serializeConformers(sts)
                        if self.deserializeConformers:
                            # shapes are generated for the structures as
                            # they come back from the stored form
                            sts = self.deserializeConformers(serialized)
                    shapes = self._generator.getConformerShapes(sts)
                except (RuntimeError, phase.PhpException) as e:
                    logger.info("skipping '%s': %s.", title, e)
                    continue

                label = serialized if self.serializeConformers else title
                yield (label, shapes)
        finally:
            reader.close()
# =============================================================================
def validate_shape_data_file_metadata(meta):
    '''
    Validates shape data file metadata.

    The checks are: `meta` is a dictionary, it has the "conformers" key,
    the conformer format (if conformers are present) is a valid
    `conformer_storage.Format` value, and "generator" is a dictionary
    accepted by `shape_generator.validate_shape_generator_dict`.

    :param meta: Shape data file metadata.
    :type meta: dict

    :return: Validation outcome and complaint (if any).
    :rtype: (bool, str)
    '''

    # isinstance() so that dict subclasses (e.g. OrderedDict) are accepted
    if not isinstance(meta, dict):
        return False, 'is not a dictionary'

    if 'conformers' not in meta:
        return False, 'lacks "conformers"'

    if meta['conformers']:
        # missing "conformer_format" is treated as the lossless format
        fmt_str = meta.get('conformer_format',
                           conformer_storage.Format.LOSSLESS.value)
        try:
            conformer_storage.Format(fmt_str)
        except ValueError:
            return False, f'"{fmt_str}" is not a valid conformer format'

    if 'generator' not in meta:
        return False, 'lacks "generator"'

    if not isinstance(meta['generator'], dict):
        return False, '"generator" is not a dictionary'

    return shape_generator.validate_shape_generator_dict(meta['generator'])
# =============================================================================
def validate_shape_data_file_metadata_compatibility(meta1, meta2):
    '''
    Checks whether two shape data file metadata dictionaries are compatible.

    Only the "generator" entries are compared (by means of
    `shape_generator.validate_shape_generator_dict_compatibility`).

    :param meta1: Validated shape data file metadata #1.
    :type meta1: dict

    :param meta2: Validated shape data file metadata #2.
    :type meta2: dict

    :return: Validation outcome and complaint (if any).
    :rtype: (bool, str)
    '''

    # conformers availability may vary
    generator1 = meta1['generator']
    generator2 = meta2['generator']

    return shape_generator.validate_shape_generator_dict_compatibility(
        generator1, generator2)
# =============================================================================
def estimate_shape_data_size(shape_type,
                             num_conformers,
                             compress,
                             conformer_format=None):
    '''
    Estimates number of bytes needed to store the shape data for a
    single molecule. Based on observations for 10,000 molecules picked
    from Enamine REAL.

    The estimate is the size for the first conformer plus the size of an
    additional conformer times the number of additional conformers.

    :param shape_type: Shape assignment scheme name ("pharm",
        "atom_color" or "atom_no_color").
    :type shape_type: str

    :param num_conformers: Number of conformers per molecule.
    :type num_conformers: int

    :param compress: File-level compression.
    :type compress: bool

    :param conformer_format: Format used to store conformer structures
        (`None` for structureless shape data).
    :type conformer_format: `conformer_storage.Format` or NoneType

    :return: Estimated shape data size for a single molecule (in bytes).
    :rtype: int

    :raise ValueError: If `shape_type` is not one of the known schemes.
    '''

    # name : (first conformer, any additional conformer)
    sizes = {
        'pharm_titles'      : ( 298, 132),
        'pharm_compact'     : (1508, 134),
        'pharm_lossless'    : (1300, 492),
        'pharm_titles_nc'   : ( 405, 379),
        'pharm_compact_nc'  : (2839, 396),
        'pharm_lossless_nc' : (2441, 798),
        'atom_titles'       : ( 354, 144),
        'atom_compact'      : (1551, 137),
        'atom_lossless'     : (1328, 501),
        'atom_titles_nc'    : ( 485, 458),
        'atom_compact_nc'   : (2919, 475),
        'atom_lossless_nc'  : (2521, 873)
    } # yapf: disable

    if shape_type == shape_generator.SHAPE_PHARM_COLOR:
        name = 'pharm'
    elif shape_type in (shape_generator.SHAPE_ATOM_COLOR,
                        shape_generator.SHAPE_ATOM_NO_COLOR):
        name = 'atom'
    else:
        raise ValueError(f'unexpected shape_type: "{shape_type}"')

    fmt = 'titles' if conformer_format is None else conformer_format.value
    name += '_' + fmt

    # "_nc" entries hold the sizes observed without file-level compression
    if not compress:
        name += '_nc'

    first, per_conf = sizes[name]

    # `first` already covers one conformer: only the remaining
    # (num_conformers - 1) contribute `per_conf` bytes each; the count of
    # additional conformers is clamped at zero so that num_conformers <= 1
    # yields `first`
    additional = max(num_conformers - 1, 0)

    return first + additional * per_conf
# =============================================================================
def execute(args):
    '''
    The "main" subroutine for the "create" task: generates the shapes for
    the input structures and writes them into the shape data file.

    The output file is removed if no shapes were written into it.

    :param args: Namespace populated with command line arguments.
    :type args: argparse.Namespace
    '''

    if args.verbose:
        for verbose_logger in (logger, shape_ligprep.logger,
                               shape_generator.logger):
            verbose_logger.setLevel(log.DEBUG)

    # note that if creator uses ligprep with epik
    # we cannot iterate over it twice (LIGPREP-1919)
    creator = ShapeCreator(args)
    metadata = creator.getMetadata()

    outfile = get_jobname_and_outfile(args)[1]

    logger.info("\ninput structures: '%s'", args.screen)
    logger.info("shape data file: '%s'", outfile)
    logger.info("gzip-compressed: %s", args.compress_shape_data_file)

    logger.info('\nstore_conformers: %s', args.store_conformers)
    if args.store_conformers:
        logger.info('conformers format: %s', args.conformer_format)

    logger.info('')
    for key, value in metadata['generator'].items():
        # drop the leading underscore (if any) from the setting name
        name = key[1:] if key.startswith('_') else key
        if name.startswith('feature'):
            logger.debug('%s:\n%s', name, value)
        else:
            logger.info('%s: %s', name, value)

    # writes out metadata header to the `outfile`
    writer = shape_binary_utils.ShapeFileWriter(
        outfile, json.dumps(metadata), compress=args.compress_shape_data_file)

    num_entries = 0
    num_shapes = 0

    started = time.process_time()

    # progress is reported after 1, 10, 100, 1000 structures,
    # and after every 10000 structures
    milestones = (1, 10, 100, 1000, 10000)
    report_every = milestones[-1]

    logger.info('')

    with writer:
        for num_entries, (ID, shapes) in enumerate(creator, start=1):
            writer.append(ID, shapes)
            num_shapes += len(shapes)
            if num_entries in milestones or num_entries % report_every == 0:
                elapsed = time.process_time() - started
                logger.info('%8.1f sec: %8d structures %10d conformers',
                            elapsed, num_entries, num_shapes)

    if num_shapes:
        logger.info("\n'%s': %d shapes for %d structures", outfile,
                    num_shapes, num_entries)
    else:
        # remove `outfile` if it contains only a header
        fileutils.force_remove(outfile)
# =============================================================================