'''
Module to merge, append, or split Shape data files (.bin).
'''
import json
import math
import os
from schrodinger import shape
from schrodinger.application.phase.packages import phase_utils
from schrodinger.application.phase.packages import shape_binary_utils
from schrodinger.application.phase.packages import shape_creation_logic
from schrodinger.infra import phase
from schrodinger.job import jobcontrol
from schrodinger.utils import cmdline
from schrodinger.utils import log
# Shape data file treatment
COPY_BINARY = shape_binary_utils.COPY_BINARY
REMOTE_BINARY = shape_binary_utils.REMOTE_BINARY
# Task names
MERGE_TASK = 'merge'
SPLIT_TASK = 'split'
logger = log.get_output_logger('shape.shape_screen_gpu_update')
class ShapeDataUpdateDriver:

    def __init__(self, args):
        """
        Initializes the driver for merging or appending Shape data file(s).

        :param args: argument namespace with command line options
        :type args: argparse.Namespace
        """
self._task = args.task
self._append = args.append
self._target_file = args.target_file
self._infiles = args.infile
self._source = args.source
    def runMerge(self):
        """
        Merge or append Shape data files in self._infiles to self._target_file.
        """
with shape_binary_utils.ShapeFileReader(self._infiles[0]) as reader:
metadata = json.loads(reader.metadata)
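            # Derive the writer's compress flag from the first input file's
            # metadata.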
compress = not metadata.get('compress_conformers', False)
if not self._append:
sub_task = "merge"
metadata['source'] = self._source if self._source else ':'.join(
self._infiles)
metadata = json.dumps(metadata)
writer = shape_binary_utils.ShapeFileWriter(self._target_file,
metadata,
compress=compress)
else:
sub_task = "append"
writer = shape_binary_utils.ShapeFileWriter(self._target_file,
'',
compress=compress)
total_num_entries, total_num_shapes = 0, 0
with writer:
for fn in self._infiles:
num_entries, num_shapes = 0, 0
with shape_binary_utils.SingularShapeFileReader(fn) as reader:
for (title, shapes) in reader:
writer.append(title, shapes)
num_entries += 1
num_shapes += shapes.size()
logger.info(' %s: %d entries, %d shapes.' %
(fn, num_entries, num_shapes))
total_num_entries += num_entries
total_num_shapes += num_shapes
logger.info(
'>>>Total %s counts for %s: %d entries, %d shapes.' %
(sub_task, self._target_file, total_num_entries, total_num_shapes))
logger.info('%s finished successfully.' % sub_task)
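

# A minimal usage sketch (illustration only, not part of the production
# flow): build the namespace that ShapeDataUpdateDriver expects and run a
# merge. The file names below are hypothetical placeholders.
def _example_merge_usage():
    import argparse
    args = argparse.Namespace(task=MERGE_TASK,
                              append=False,
                              target_file='merged_shapes.bin',
                              infile=['shapes_a.bin', 'shapes_b.bin'],
                              source=None)
    ShapeDataUpdateDriver(args).runMerge()
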
def split_shape_data_file(args):
    """
    The "split" task: splits a Shape data file into a number of parts.

    :param args: argument namespace with command line options
    :type args: argparse.Namespace
    """
infile = phase_utils.get_proper_path(args.infile)
num_molecules = 0
with shape_binary_utils.ShapeFileReader(infile) as reader:
for (shapes, titles) in reader:
num_molecules += len(titles)
too_few_molecules = f'{args.infile}: too few molecules'
if num_molecules == 0:
logger.info(too_few_molecules)
return
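    # Distribute molecules as evenly as possible: ceil(N / num_parts) per
    # output file, then recompute how many parts will actually be non-empty.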
num_molecules_per_outfile = int(math.ceil(num_molecules / args.num_parts))
num_parts = int(math.ceil(num_molecules / num_molecules_per_outfile))
if num_parts < args.num_parts:
logger.info(too_few_molecules)
return
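    # Output files are registered with job control (below) only when the
    # shape data is handled locally rather than remotely.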
if args.shape_data_treatment == REMOTE_BINARY:
backend = None
else:
backend = jobcontrol.get_backend()
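    # Zero-pad part indices so the output files sort lexicographically,
    # e.g. prefix_01.bin ... prefix_12.bin when num_parts is 12.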
num_digits = len(str(num_parts))
outfile_fmt = f'{args.outprefix}_{{:0{num_digits}d}}.bin'
with shape_binary_utils.SingularShapeFileReader(infile) as _readable:
metadata = json.loads(_readable.metadata)
metadata['source'] = args.infile
compress = not metadata.get('compress_conformers', False)
reader = iter(_readable)
for part_index in range(num_parts):
outfile = outfile_fmt.format(part_index + 1)
writer = shape_binary_utils.ShapeFileWriter(outfile,
json.dumps(metadata),
compress=compress)
num_written = 0
with writer:
for i in range(num_molecules_per_outfile):
try:
writer.append(*next(reader))
num_written += 1
except StopIteration:
break
logger.info(f"wrote {num_written} entries to '{outfile}'")
if backend:
backend.addRequiredOutputFile(outfile)
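

# A minimal usage sketch (illustration only; hypothetical file names):
# split 'all_shapes.bin' into four parts named all_shapes_part_1.bin
# through all_shapes_part_4.bin, with shape data copied locally.
def _example_split_usage():
    import argparse
    args = argparse.Namespace(infile='all_shapes.bin',
                              outprefix='all_shapes_part',
                              num_parts=4,
                              shape_data_treatment=COPY_BINARY)
    split_shape_data_file(args)
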
# =============================================================================
# Command line
# =============================================================================
def add_merge_arguments(parser):
    """
    Adds arguments for merging Shape data file(s) into a
    target Shape data file.

    :param parser: Argument parser object
    :type parser: argparse.ArgumentParser
    """
# Required arguments
req_grp = parser.add_argument_group("Required Arguments")
req_grp.add_argument("-target_file",
metavar="<target_file>",
required=True,
help="Path of the merged Shape data file. \
If -shape_data_treatment is set to remote, \
valid absolute paths at the execution host must be provided.")
req_grp.add_argument("-infile",
metavar="<infile>",
action="append",
help="Path(s) of Shape data file(s) to be merged to \
the target_file. Multiple Shape data files \
can be included by specifying multiple -infile options, \
one per Shape data file. If -shape_data_treatment is set to \
remote, valid absolute paths at the execution host \
must be provided.")
req_grp.add_argument(
"-infile_list",
metavar="<infile_list>",
help="Name of the file that lists path(s) of Shape data file(s) \
to be merged to the target_file (one per line). \
If -shape_data_treatment is set to remote, \
valid absolute paths at the execution host must be provided.")
req_grp.add_argument("-source",
metavar='<string>',
help="String to be set as the 'source' attribute in \
the target_file. If unspecified, names of the input files \
                         will be used. This option is ignored in append mode.")
req_grp.add_argument(
"-append",
action="store_true",
help="Append the infile(s) to the exsiting target_file instead of \
merging to a new Shape data file. Merging instead of \
appending is recommended to for better compression")
# Jobcontrol options
jobcontrol_options = [cmdline.JOBNAME, cmdline.HOST]
cmdline.add_jobcontrol_options(parser, options=jobcontrol_options)
def validate_merge_args(args):
    """
    Validates merge/append task command-line arguments.

    :param args: argument namespace with command line options
    :type args: argparse.Namespace
    :return: tuple of validation success, and error message
    :rtype: bool, str
    """
    args.infile = [phase_utils.get_proper_path(fn) for fn in args.infile]
args.target_file = phase_utils.get_proper_path(args.target_file)
to_be_validated = args.infile.copy()
copy_shapes = args.shape_data_treatment != REMOTE_BINARY
if not to_be_validated:
return (False, "no Shape data file to %s" % args.task)
if not copy_shapes and not jobcontrol.under_job_control():
for f in args.infile + [args.target_file]:
if not os.path.isabs(f):
return (False,
"Absolute path is expected for remote %s" % args.task)
return True, ""
if args.append:
to_be_validated += [args.target_file]
else:
if os.path.isfile(args.target_file):
return (False, "target_file %s: already exists" % args.target_file)
target_format = phase.get_phase_file_format(args.target_file)
if target_format != phase.PhpFileFormat_PHP_FORMAT_SHAPE_BIN:
return (False,
"target_file %s: not a Shape data file" % args.target_file)
for fn in to_be_validated:
if not os.path.isfile(fn):
return (False, "%s: does not exist" % fn)
if not shape.is_shape_binary_file(fn):
return (False, "%s: not a Shape data file" % fn)
first_metadata = None
for fn in to_be_validated:
with shape_binary_utils.ShapeFileReader(fn) as reader:
metadata = json.loads(reader.metadata)
ok, msg = shape_creation_logic.validate_shape_data_file_metadata(
metadata)
if not ok:
return False, f"{fn}: {msg}"
if first_metadata is None:
first_metadata = metadata
else:
ok, msg = shape_creation_logic.validate_shape_data_file_metadata_compatibility(
first_metadata, metadata)
if not ok:
return False, f"{to_be_validated[0]}, {fn}: {msg}"
ok, msg = validate_conformer_related_compatibility(
first_metadata, metadata)
if not ok:
return False, f"{to_be_validated[0]}, {fn}: {msg}"
return True, ""
def add_split_arguments(parser):
    """
    Adds arguments for splitting Shape data files.

    :param parser: Argument parser
    :type parser: argparse.ArgumentParser
    """
# Required arguments
req_grp = parser.add_argument_group("Required Arguments")
req_grp.add_argument("-infile",
metavar="<path>",
required=True,
help="Shape data filename.")
req_grp.add_argument("-outprefix",
metavar="<string>",
required=True,
help="Output filenames prefix. \
If -shape_data_treatment is set to \"remote\", \
it may include valid absolute path at the execution host.")
req_grp.add_argument(
"-num_parts",
metavar="<number>",
type=int,
required=True,
help="Number of parts for the shape data to be split into.")
# Jobcontrol options
jobcontrol_options = [cmdline.HOST]
cmdline.add_jobcontrol_options(parser, options=jobcontrol_options)
def validate_split_args(args):
    """
    Validates "split" task command-line arguments.

    :param args: arguments to be validated
    :type args: argparse.Namespace
    :return: tuple of validation success, and error message
    :rtype: bool, str
    """
outdir, outbase = os.path.split(args.outprefix)
if args.shape_data_treatment == COPY_BINARY:
if outdir:
return (False, "-outprefix may include path component "
"only with \"remote\" shape data treatment.")
else:
if not os.path.isabs(args.infile):
return False, "-infile must be an absolute path."
if not os.path.isabs(outdir):
return False, "-outprefix path component must be absolute."
if (args.shape_data_treatment == COPY_BINARY or
jobcontrol.under_job_control()):
infile = phase_utils.get_proper_path(args.infile)
if not shape.is_shape_binary_file(infile):
return False, f"{infile}: not a Shape data file."
    if args.num_parts <= 1:
return False, "-num_parts argument must be greater than one."
return True, ""