# Source code for schrodinger.application.phase.packages.shape_screen_gpu_generate

'''
Utility to generate Shape data files (.bin) suitable for GPU Shape screening.
Shape data files include representation of molecular shapes along with
the corresponding chemical structures.
'''

import argparse
import json
import os

from schrodinger import shape
from schrodinger import structure
from schrodinger.application.hitexpander import \
    LogFormatter as PrefixingLogFormatter
from schrodinger.application.phase.packages import shape_binary_utils
from schrodinger.application.phase.packages import shape_creation_logic
from schrodinger.application.phase.packages import shape_ligprep
from schrodinger.application.phase.packages.shape_creation_logic import \
    get_jobname_and_outfile
from schrodinger.job import jobcontrol
from schrodinger.job import queue
from schrodinger.utils import cmdline
from schrodinger.utils import fileutils
from schrodinger.utils import log

GENERATE_TASK = 'generate'
SHAPE_SCREEN_DRIVER_GPU_PY = 'shape_screen_driver_gpu.py'

logger = log.get_output_logger('shape.shape_screen_gpu_generate')

# =============================================================================
# Command line
# =============================================================================


def add_jobcontrol_arguments(parser):
    '''
    Adds job control and standard arguments to the `parser`.
    '''
    jc_options = [
        cmdline.HOST,
        cmdline.JOBNAME,
        cmdline.SAVE,
        cmdline.DEBUG,
    ]
    cmdline.add_jobcontrol_options(parser, options=jc_options)

    std_options = [
        cmdline.NJOBS,
        cmdline.NSTRUCTS,
        cmdline.STRICT,
        cmdline.RETRIES,
    ]
    cmdline.add_standard_options(parser, std_options, default_retries=2)
# -----------------------------------------------------------------------------
def add_generate_arguments(parser):
    '''
    Adds shape generation arguments to the `parser`.

    :param parser: Command line arguments parser.
    :type parser: argparse.ArgumentParser
    '''
    # Hidden verbosity flag: accepted, but not shown in --help output.
    parser.add_argument('-v',
                        '--verbose',
                        action='store_true',
                        help=argparse.SUPPRESS)
    shape_creation_logic.add_arguments(parser)
    add_jobcontrol_arguments(parser)
# -----------------------------------------------------------------------------
def validate_jobcontrol_args(args):
    '''
    Checks job control command line arguments.

    :param args: Namespace holding the command line options.
    :type args: argparse.Namespace

    :return: Tuple of validation success and error message.
    :rtype: (bool, str)
    '''
    wants_remote = (
        args.shape_data_treatment == shape_binary_utils.REMOTE_BINARY)
    if wants_remote:
        if args.shape_data_dir is None:
            return (False, "'-shape_data_dir' must be provided.")
        if not os.path.isabs(args.shape_data_dir):
            return (False, "'-shape_data_dir' must be an absolute path.")
        # The directory is only required to exist on the execution host,
        # so check it only when already running under job control.
        if (jobcontrol.under_job_control() and
                not os.path.isdir(args.shape_data_dir)):
            return (False, "'-shape_data_dir' must point to a directory.")
    elif args.shape_data_dir:
        return (False, "'-shape_data_dir' must not be provided.")

    # Input splitting is only meaningful when each structure expands into
    # multiple conformers/states via ligprep and/or flexible treatment.
    wants_split = bool(args.nstructs or args.njobs)
    can_split = bool(args.ligprep or args.flex)
    if wants_split and not can_split:
        return (False, "distributed execution is supported only"
                " with '-ligprep' and/or '-flex'.")
    return True, ""
# -----------------------------------------------------------------------------
def validate_generate_args(args):
    '''
    Checks generate task command line arguments.

    :param args: Namespace holding the command line options.
    :type args: argparse.Namespace

    :return: Tuple of validation success and error message.
    :rtype: (bool, str)
    '''
    # Run the generic creation checks first, then the job-control ones;
    # report the first failure encountered.
    for check in (shape_creation_logic.validate_args,
                  validate_jobcontrol_args):
        ok, msg = check(args)
        if not ok:
            return False, msg
    return True, ""
# =============================================================================
def get_shapes_file_metadata(filename):
    '''
    Reads metadata from shape data file and parses it from JSON.

    :param filename: Shape file name.
    :type filename: str

    :return: Deserialized shape file metadata.
    :rtype: dict
    '''
    reader = shape_binary_utils.ShapeFileReader(filename)
    with reader:
        raw_metadata = reader.metadata
    return json.loads(raw_metadata)
# -----------------------------------------------------------------------------
def get_backend_argv(argv=None):
    '''
    Isolates backend command line arguments.

    :param argv: Command line arguments (not including script name).
    :type argv: list(str)

    :return: Arguments that are not job-control related, not '-source',
        '-shape_data_dir', or '-shape_data_treatment'.
    :rtype: list(str)
    '''
    # Build a throwaway parser that recognizes only the options we want
    # to strip; parse_known_args() then yields everything else.
    sieve = argparse.ArgumentParser()
    task_subparsers = sieve.add_subparsers(dest="task")
    gen_parser = task_subparsers.add_parser(GENERATE_TASK)
    add_jobcontrol_arguments(gen_parser)
    for flag in ('-source', '-shape_data_dir', '-shape_data_treatment'):
        gen_parser.add_argument(flag)
    _, leftover = sieve.parse_known_args(argv)
    return leftover
# -----------------------------------------------------------------------------
def echo_log_files(subjobnames):
    '''
    Echoes subjob log files content.

    :param subjobnames: Names of the subjobs to consider.
    :type subjobnames: list(str)
    '''
    if not subjobnames:
        return

    def print_separator():
        logger.info('=' * 80)

    print_separator()
    for subjobname in subjobnames:
        logfile = f'{subjobname}.log'
        logger.info('SUBJOB: %s', subjobname)
        if not os.path.exists(logfile):
            logger.info('<no log file>')
        else:
            with open(logfile) as handle:
                for text in handle:
                    logger.info('%s', text.rstrip())
        print_separator()
# -----------------------------------------------------------------------------
def split_input_and_run_subjobs(args, nstructs):
    '''
    Splits input and runs subjobs for each piece.

    :param args: Namespace that holds (validated) command line arguments.
    :type args: argparse.Namespace

    :param nstructs: Number of structures per subjob.
    :type nstructs: int

    :return: Subjob names.
    :rtype: list(str)
    '''
    jobname, _ = get_jobname_and_outfile(args)
    # njobs=0: split strictly by structure count per piece.
    inputs = shape_ligprep.split_structures(jobname=jobname,
                                            infile=args.screen,
                                            njobs=0,
                                            nstructs=nstructs)
    # Arguments forwarded verbatim to every subjob (job-control and
    # source/destination options are stripped and re-added per subjob).
    subjob_argv = get_backend_argv()
    max_failures = 0 if args.strict else queue.NOLIMIT
    jobdj = queue.JobDJ(
        verbosity='normal',
        max_retries=args.retries,
        max_failures=max_failures,
    )
    subjobnames = []
    for (infile, _) in inputs:
        subjobname, _ = fileutils.splitext(infile)
        subjobnames.append(subjobname)
        cmd = ['run', SHAPE_SCREEN_DRIVER_GPU_PY, GENERATE_TASK,
               '-JOBNAME', subjobname,
               '-source', infile,
               '-shape_data_treatment', shape_binary_utils.COPY_BINARY
               ] + subjob_argv  # yapf:disable
        logger.debug('Launching: %s', str(cmd))
        jobdj.addJob(cmd)
    try:
        jobdj.run()
    except RuntimeError:
        # Fixed: exception was bound to an unused name (`as e`).
        # The error is deliberately swallowed: the caller merges whatever
        # subjobs did succeed; surface the subjob logs for diagnosis.
        logger.error('Too many subjob failures.')
        echo_log_files(subjobnames)
    if jobdj._failed:
        logger.warning('The following subjob(s) failed:')
        for job in jobdj._failed:
            logger.warning('%s', job.name)
        logger.warning('Examine earlier records in this file for details.')
    return subjobnames
# -----------------------------------------------------------------------------
def run_distributed(args, nstructs):
    '''
    Runs shape generation as subjobs and merges the subjob shape files.

    :param args: Namespace that holds (validated) command line arguments.
    :type args: argparse.Namespace

    :param nstructs: Number of structures per subjob.
    :type nstructs: int
    '''
    # Collect the shape files produced by the successful subjobs.
    shape_files = []
    for subjobname in split_input_and_run_subjobs(args, nstructs):
        binfile = subjobname + '.bin'
        if not shape.is_shape_binary_file(binfile):
            logger.warning("Could not load shapes obtained in subjob '%s'",
                           subjobname)
            continue
        shape_files.append(binfile)
    _, outfile = get_jobname_and_outfile(args)
    if not shape_files:
        return
    # Reuse the first subjob's metadata, but point 'source' at the
    # original (unsplit) input file.
    metadata = get_shapes_file_metadata(shape_files[0])
    metadata['source'] = args.screen
    writer = shape_binary_utils.ShapeFileWriter(
        outfile,
        metadata=json.dumps(metadata),
        compress=args.compress_shape_data_file)
    logger.info('Merging subjob shapes into: %s', outfile)
    grand_entries = 0
    grand_shapes = 0
    with writer:
        for binfile in shape_files:
            entries = 0
            shape_count = 0
            with shape_binary_utils.SingularShapeFileReader(binfile) as src:
                for (title, shapes) in src:
                    writer.append(title, shapes)
                    entries += 1
                    shape_count += shapes.size()
            logger.info(' %s: %d molecules, %d shapes', binfile, entries,
                        shape_count)
            grand_entries += entries
            grand_shapes += shape_count
    logger.info('\n%s : %d molecules, %d shapes', outfile, grand_entries,
                grand_shapes)
# -----------------------------------------------------------------------------
def shape_generate(args):
    '''
    Runs the generate task, distributing over subjobs when requested.

    :param args: Namespace that holds command line arguments.
    :type args: argparse.Namespace
    '''
    handler = log.get_output_logger_handler()
    handler.setFormatter(PrefixingLogFormatter())
    if args.nstructs or args.njobs:
        # Derive the missing one of (njobs, nstructs) from the structure
        # count, rounding up so no structure is left out.
        nligs = structure.count_structures(args.screen)
        if args.njobs:
            njobs = args.njobs
            nstructs = (nligs + njobs - 1) // njobs
        else:
            nstructs = args.nstructs
            njobs = (nligs + nstructs - 1) // nstructs
    else:
        njobs = 1
    if njobs > 1:
        run_distributed(args, nstructs)
    else:
        shape_creation_logic.execute(args)
# =============================================================================