Source code for schrodinger.application.phase.packages.shape_screen_gpu_update

'''
Module to merge, append, or split Shape data files (.bin).
'''

import json
import math
import os

from schrodinger import shape
from schrodinger.application.phase.packages import phase_utils
from schrodinger.application.phase.packages import shape_binary_utils
from schrodinger.application.phase.packages import shape_creation_logic
from schrodinger.infra import phase
from schrodinger.job import jobcontrol
from schrodinger.utils import cmdline
from schrodinger.utils import log

# Shape data file treatment
COPY_BINARY = shape_binary_utils.COPY_BINARY
REMOTE_BINARY = shape_binary_utils.REMOTE_BINARY

# Task names
MERGE_TASK = 'merge'
SPLIT_TASK = 'split'

logger = log.get_output_logger('shape.shape_screen_gpu_update')


class ShapeDataUpdateDriver:

    def __init__(self, args):
        """
        Initializes the driver for merging or appending Shape data file(s).

        :param args: argument namespace with command line options
        :type args: argparse.Namespace
        """
        self._task = args.task
        self._append = args.append
        self._target_file = args.target_file
        self._infiles = args.infile
        self._source = args.source

    def runMerge(self):
        """
        Merge or append Shape data files in self._infiles to
        self._target_file.
        """
        with shape_binary_utils.ShapeFileReader(self._infiles[0]) as reader:
            metadata = json.loads(reader.metadata)
            compress = not metadata.get('compress_conformers', False)
        if not self._append:
            sub_task = "merge"
            metadata['source'] = self._source if self._source else ':'.join(
                self._infiles)
            metadata = json.dumps(metadata)
            writer = shape_binary_utils.ShapeFileWriter(self._target_file,
                                                        metadata,
                                                        compress=compress)
        else:
            sub_task = "append"
            writer = shape_binary_utils.ShapeFileWriter(self._target_file,
                                                        '',
                                                        compress=compress)
        total_num_entries, total_num_shapes = 0, 0
        with writer:
            for fn in self._infiles:
                num_entries, num_shapes = 0, 0
                with shape_binary_utils.SingularShapeFileReader(fn) as reader:
                    for (title, shapes) in reader:
                        writer.append(title, shapes)
                        num_entries += 1
                        num_shapes += shapes.size()
                logger.info(' %s: %d entries, %d shapes.' %
                            (fn, num_entries, num_shapes))
                total_num_entries += num_entries
                total_num_shapes += num_shapes
        logger.info('>>>Total %s counts for %s: %d entries, %d shapes.' %
                    (sub_task, self._target_file, total_num_entries,
                     total_num_shapes))
        logger.info('%s finished successfully.' % sub_task)
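

# Illustrative sketch (not part of the original module): driving
# ShapeDataUpdateDriver directly from a hand-built argparse namespace. The
# attribute names match exactly what __init__ reads; the filenames 'a.bin',
# 'b.bin' and 'merged.bin' are assumptions standing in for real Shape data
# files produced elsewhere.
def _example_merge_two_files():
    """Hedged usage sketch for the merge task."""
    from argparse import Namespace
    args = Namespace(task=MERGE_TASK,
                     append=False,
                     target_file='merged.bin',
                     infile=['a.bin', 'b.bin'],
                     source=None)
    ShapeDataUpdateDriver(args).runMerge()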


def split_shape_data_file(args):
    """
    The "split" task.

    :param args: argument namespace with command line options
    :type args: argparse.Namespace
    """
    infile = phase_utils.get_proper_path(args.infile)
    num_molecules = 0
    with shape_binary_utils.ShapeFileReader(infile) as reader:
        for (shapes, titles) in reader:
            num_molecules += len(titles)
    too_few_molecules = f'{args.infile}: too few molecules'
    if num_molecules == 0:
        logger.info(too_few_molecules)
        return
    num_molecules_per_outfile = int(math.ceil(num_molecules / args.num_parts))
    num_parts = int(math.ceil(num_molecules / num_molecules_per_outfile))
    if num_parts < args.num_parts:
        logger.info(too_few_molecules)
        return
    if args.shape_data_treatment == REMOTE_BINARY:
        backend = None
    else:
        backend = jobcontrol.get_backend()
    num_digits = len(str(num_parts))
    outfile_fmt = f'{args.outprefix}_{{:0{num_digits}d}}.bin'
    with shape_binary_utils.SingularShapeFileReader(infile) as _readable:
        metadata = json.loads(_readable.metadata)
        metadata['source'] = args.infile
        compress = not metadata.get('compress_conformers', False)
        reader = iter(_readable)
        for part_index in range(num_parts):
            outfile = outfile_fmt.format(part_index + 1)
            writer = shape_binary_utils.ShapeFileWriter(outfile,
                                                        json.dumps(metadata),
                                                        compress=compress)
            num_written = 0
            with writer:
                for i in range(num_molecules_per_outfile):
                    try:
                        writer.append(*next(reader))
                        num_written += 1
                    except StopIteration:
                        break
            logger.info(f"wrote {num_written} entries to '{outfile}'")
            if backend:
                backend.addRequiredOutputFile(outfile)
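

# Illustrative sketch (not part of the original module): the part-sizing and
# zero-padded naming arithmetic used by split_shape_data_file() above, reduced
# to plain numbers. For example, 10 molecules split into 4 parts gives
# ceil(10 / 4) == 3 molecules per file and ceil(10 / 3) == 4 actual parts,
# i.e. parts_1.bin .. parts_4.bin.
def _example_split_naming(num_molecules=10, num_parts=4, outprefix='parts'):
    """Hedged sketch reproducing only the naming arithmetic."""
    per_file = int(math.ceil(num_molecules / num_parts))
    actual_parts = int(math.ceil(num_molecules / per_file))
    num_digits = len(str(actual_parts))
    fmt = f'{outprefix}_{{:0{num_digits}d}}.bin'
    return [fmt.format(i + 1) for i in range(actual_parts)]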


# =============================================================================
# Command line
# =============================================================================
def add_merge_arguments(parser):
    """
    Adds arguments for merging Shape data file(s) to target Shape data file.

    :param parser: Argument parser object
    :type parser: argparse.ArgumentParser
    """
    # Required arguments
    req_grp = parser.add_argument_group("Required Arguments")
    req_grp.add_argument("-target_file",
                         metavar="<target_file>",
                         required=True,
                         help="Path of the merged Shape data file. \
        If -shape_data_treatment is set to remote, \
        a valid absolute path at the execution host must be provided.")
    req_grp.add_argument("-infile",
                         metavar="<infile>",
                         action="append",
                         help="Path(s) of Shape data file(s) to be merged to \
        the target_file. Multiple Shape data files \
        can be included by specifying multiple -infile options, \
        one per Shape data file. If -shape_data_treatment is set to \
        remote, valid absolute paths at the execution host \
        must be provided.")
    req_grp.add_argument(
        "-infile_list",
        metavar="<infile_list>",
        help="Name of the file that lists path(s) of Shape data file(s) \
        to be merged to the target_file (one per line). \
        If -shape_data_treatment is set to remote, \
        valid absolute paths at the execution host must be provided.")
    req_grp.add_argument("-source",
                         metavar='<string>',
                         help="String to be set as the 'source' attribute in \
        the target_file. If unspecified, names of the input files \
        will be used. This option does not apply in append mode.")
    req_grp.add_argument(
        "-append",
        action="store_true",
        help="Append the infile(s) to the existing target_file instead of \
        merging into a new Shape data file. Merging instead of \
        appending is recommended for better compression.")
    # Jobcontrol options
    jobcontrol_options = [cmdline.JOBNAME, cmdline.HOST]
    cmdline.add_jobcontrol_options(parser, options=jobcontrol_options)


def validate_merge_args(args):
    """
    Validates merge/append task command-line arguments.

    :param args: argument namespace with command line options
    :type args: argparse.Namespace

    :return: tuple of validation success, and error message
    :rtype: bool, str
    """
    args.infile = [phase_utils.get_proper_path(fn) for fn in args.infile]
    args.target_file = phase_utils.get_proper_path(args.target_file)
    to_be_validated = args.infile.copy()
    copy_shapes = args.shape_data_treatment != REMOTE_BINARY
    if not to_be_validated:
        return (False, "no Shape data file to %s" % args.task)
    if not copy_shapes and not jobcontrol.under_job_control():
        for f in args.infile + [args.target_file]:
            if not os.path.isabs(f):
                return (False,
                        "Absolute path is expected for remote %s" % args.task)
        return True, ""
    if args.append:
        to_be_validated += [args.target_file]
    else:
        if os.path.isfile(args.target_file):
            return (False,
                    "target_file %s: already exists" % args.target_file)
        target_format = phase.get_phase_file_format(args.target_file)
        if target_format != phase.PhpFileFormat_PHP_FORMAT_SHAPE_BIN:
            return (False,
                    "target_file %s: not a Shape data file" % args.target_file)
    for fn in to_be_validated:
        if not os.path.isfile(fn):
            return (False, "%s: does not exist" % fn)
        if not shape.is_shape_binary_file(fn):
            return (False, "%s: not a Shape data file" % fn)
    first_metadata = None
    for fn in to_be_validated:
        with shape_binary_utils.ShapeFileReader(fn) as reader:
            metadata = json.loads(reader.metadata)
        ok, msg = shape_creation_logic.validate_shape_data_file_metadata(
            metadata)
        if not ok:
            return False, f"{fn}: {msg}"
        if first_metadata is None:
            first_metadata = metadata
        else:
            ok, msg = shape_creation_logic.validate_shape_data_file_metadata_compatibility(
                first_metadata, metadata)
            if not ok:
                return False, f"{to_be_validated[0]}, {fn}: {msg}"
            ok, msg = validate_conformer_related_compatibility(
                first_metadata, metadata)
            if not ok:
                return False, f"{to_be_validated[0]}, {fn}: {msg}"
    return True, ""
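

# Illustrative sketch (not part of the original module): wiring
# add_merge_arguments() and validate_merge_args() together. Note that
# validate_merge_args() also reads args.task and args.shape_data_treatment,
# which are not added by add_merge_arguments() itself; the values assigned
# below are assumptions standing in for whatever the real parent parser sets.
def _example_parse_and_validate_merge(argv):
    """Hedged sketch; argv must contain at least one -infile option."""
    import argparse
    parser = argparse.ArgumentParser()
    add_merge_arguments(parser)
    args = parser.parse_args(argv)
    args.task = MERGE_TASK  # assumption: normally set by the parent CLI
    args.shape_data_treatment = COPY_BINARY  # assumption, as above
    ok, msg = validate_merge_args(args)
    if not ok:
        raise SystemExit(msg)
    return args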


def add_split_arguments(parser):
    """
    Adds arguments for splitting Shape data files.

    :param parser: Argument parser
    :type parser: argparse.ArgumentParser
    """
    # Required arguments
    req_grp = parser.add_argument_group("Required Arguments")
    req_grp.add_argument("-infile",
                         metavar="<path>",
                         required=True,
                         help="Shape data filename.")
    req_grp.add_argument("-outprefix",
                         metavar="<string>",
                         required=True,
                         help="Output filename prefix. \
        If -shape_data_treatment is set to \"remote\", \
        it may include a valid absolute path at the execution host.")
    req_grp.add_argument(
        "-num_parts",
        metavar="<number>",
        type=int,
        required=True,
        help="Number of parts for the shape data to be split into.")
    # Jobcontrol options
    jobcontrol_options = [cmdline.HOST]
    cmdline.add_jobcontrol_options(parser, options=jobcontrol_options)


def validate_split_args(args):
    """
    Validates "split" task command-line arguments.

    :param args: arguments to be validated
    :type args: argparse.Namespace

    :return: tuple of validation success, and error message
    :rtype: bool, str
    """
    outdir, outbase = os.path.split(args.outprefix)
    if args.shape_data_treatment == COPY_BINARY:
        if outdir:
            return (False, "-outprefix may include a path component "
                    "only with \"remote\" shape data treatment.")
    else:
        if not os.path.isabs(args.infile):
            return False, "-infile must be an absolute path."
        if not os.path.isabs(outdir):
            return False, "-outprefix path component must be absolute."
    if (args.shape_data_treatment == COPY_BINARY or
            jobcontrol.under_job_control()):
        infile = phase_utils.get_proper_path(args.infile)
        if not shape.is_shape_binary_file(infile):
            return False, f"{infile}: not a Shape data file."
    if not args.num_parts > 1:
        return False, "-num_parts argument must be greater than one."
    return True, ""
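

# Illustrative sketch (not part of the original module): exercising
# validate_split_args() on a hand-built namespace. With REMOTE_BINARY
# treatment outside job control, only the absolute-path and num_parts checks
# run, so the assumed POSIX-style paths below (not real files) are enough to
# reach the num_parts check, which rejects values below two.
def _example_validate_split_rejects_one_part():
    """Hedged sketch; the paths are assumptions, not real files."""
    from argparse import Namespace
    args = Namespace(infile='/data/ligands.bin',
                     outprefix='/data/parts',
                     num_parts=1,
                     shape_data_treatment=REMOTE_BINARY)
    ok, msg = validate_split_args(args)
    assert not ok  # "-num_parts argument must be greater than one."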