"""
Module with common functionality for all Phase backends.
Copyright Schrodinger LLC, All Rights Reserved.
"""
import gzip
import os
import schrodinger.utils.subprocess as subprocess
from schrodinger import structure
from schrodinger.infra import phase
from schrodinger.job import jobcontrol
from schrodinger.utils import fileutils
class RestrictedRange(object):
    """
    Provides generalized range checking suitable for the add_argument function
    of argparse.ArgumentParser. For example::

        parser = argparse.ArgumentParser()
        parser.add_argument("-dihed", type=float, metavar="<degrees>",
                            choices=[RestrictedRange(-180.0, 180.0)],
                            help="Dihedral angle in degrees.")
        parser.add_argument("-path", type=int, metavar="<length>",
                            choices=[RestrictedRange(1, None)],
                            help="Non-zero path length in bonds.")

    More general usage is as follows::

        legal_range = RestrictedRange(-180.0, 180.0)
        dihed = 120.0
        if dihed in legal_range:
            print("Dihedral is legal")

    Values that cannot be ordered against the limits (for example a string
    tested against a numeric range) are treated as being outside the range
    rather than triggering a TypeError.
    """

    def __init__(self,
                 lower_limit,
                 upper_limit,
                 lower_inclusive=True,
                 upper_inclusive=True):
        """
        Constructor taking lower and upper limits and whether those limits
        are inclusive. Use None for a limit that doesn't exist.

        :param lower_limit: Lower limit of legal range
        :type lower_limit: Any numeric type or None

        :param upper_limit: Upper limit of legal range
        :type upper_limit: Any numeric type or None

        :param lower_inclusive: True if lower limit is inclusive
        :type lower_inclusive: bool

        :param upper_inclusive: True if upper limit is inclusive
        :type upper_inclusive: bool

        :raise ValueError: If both limits are None
        """
        if lower_limit is None and upper_limit is None:
            raise ValueError("Lower and upper limits cannot both be None")
        self.lower_limit = lower_limit
        self.upper_limit = upper_limit
        self.lower_inclusive = lower_inclusive
        self.upper_inclusive = upper_inclusive
        # The representation is built once, from the limits supplied here.
        self._make_repr()

    def _is_within(self, value):
        """
        Performs the raw limit comparisons for the provided value. Any
        TypeError raised by the comparisons is allowed to propagate so that
        each caller can decide how to report an incomparable value.

        :param value: The value to test
        :type value: Any type that can be ordered against the limits

        :return: Whether value satisfies both limits
        :rtype: bool
        """
        lower_ok = True
        if self.lower_limit is not None:
            if self.lower_inclusive:
                lower_ok = value >= self.lower_limit
            else:
                lower_ok = value > self.lower_limit
        upper_ok = True
        # The upper limit is only consulted if the lower limit is satisfied.
        if lower_ok and self.upper_limit is not None:
            if self.upper_inclusive:
                upper_ok = value <= self.upper_limit
            else:
                upper_ok = value < self.upper_limit
        return lower_ok and upper_ok

    def __contains__(self, value):
        """
        The "in" operator for the provided value. A value that cannot be
        ordered against the limits is reported as not being in the range.
        """
        try:
            return self._is_within(value)
        except TypeError:
            return False

    def __eq__(self, value):
        """
        The "==" operator for the provided value, which is equivalent to "in".
        argparse needs this because it tests the parsed value for membership
        in the choices list, which compares the value to each choice.

        If the value cannot be ordered against the limits (e.g., another
        RestrictedRange or a string), NotImplemented is returned so that
        Python falls back to its default comparison instead of raising a
        TypeError. Note that defining __eq__ without __hash__ makes instances
        unhashable under Python 3.
        """
        try:
            return self._is_within(value)
        except TypeError:
            return NotImplemented

    def __repr__(self):
        """
        Returns the representation string.
        """
        return self.repr

    def _make_repr(self):
        """
        Constructs a representation string that's suitable for display in an
        argparse error message, and stores it in self.repr.
        """
        if self.lower_limit is None:
            # Only an upper limit exists: use <= or <.
            comp = "<="
            if not self.upper_inclusive:
                comp = "<"
            self.repr = "values {} {}".format(comp, self.upper_limit)
        elif self.upper_limit is None:
            # Only a lower limit exists: use >= or >.
            comp = ">="
            if not self.lower_inclusive:
                comp = ">"
            self.repr = "values {} {}".format(comp, self.lower_limit)
        else:
            # Both limits exist: use interval notation, with square brackets
            # for inclusive limits and parentheses for exclusive ones.
            left_bracket = "["
            if not self.lower_inclusive:
                left_bracket = "("
            right_bracket = "]"
            if not self.upper_inclusive:
                right_bracket = ")"
            self.repr = "the interval {}{}, {}{}".format(
                left_bracket, self.lower_limit, self.upper_limit, right_bracket)
class ValidationError(Exception):
    """
    Exception type raised by argument validation code, so that validation
    failures can be passed up and handled uniformly.
    """
def combine_log_files(subjobs, logger):
    """
    Writes the contents of each subjob's log file to the provided logger.

    The log file for a subjob is expected to be named <subjob>.log. Each
    existing log file is echoed line by line (with trailing whitespace
    removed) between a header and a footer message. A message is logged in
    place of the contents for any log file that does not exist.

    :param subjobs: Subjob names
    :type subjobs: list(str)

    :param logger: Logger to which concatenated log files are to be written
    :type logger: Logger
    """
    logger.info("Combining subjob log files")
    for name in subjobs:
        log_path = name + ".log"
        if not os.path.isfile(log_path):
            logger.info("\n*** %s is not a file ***\n" % log_path)
            continue
        logger.info("\n*** Contents of %s ***\n" % log_path)
        with open(log_path, 'r') as log_handle:
            for raw_line in log_handle:
                logger.info(raw_line.rstrip())
        logger.info("\n*** End of %s ***" % log_path)
def convert_to_sd(maefile, sdfile):
    """
    Converts a Maestro file to a compressed SD file. Fails only if maefile
    is missing or defective. Any existing sdfile is removed first.

    :param maefile: Maestro file to be converted
    :type maefile: str

    :param sdfile: Output SD file, which is assumed to be compressed
    :type sdfile: str

    :raise OSError: If maefile is not found or the sdconvert command fails
    """
    fileutils.force_remove(sdfile)
    if not os.path.isfile(maefile):
        raise OSError("Cannot convert to SD: \"%s\" not found" % maefile)

    # An empty maefile is acceptable, but sdconvert would fail on it with an
    # error, so check for at least one structure before running sdconvert.
    with structure.StructureReader(maefile) as reader:
        try:
            next(reader)
        except StopIteration:
            has_structures = False
        else:
            has_structures = True

    if not has_structures:
        # Opening and closing a gzip file without writing anything yields a
        # valid, empty compressed SD file.
        with gzip.open(sdfile, 'wb'):
            pass
        return

    # maefile has at least one structure. If conversion fails, it's because
    # something else is wrong with it.
    sdconvert = os.path.join(os.environ["SCHRODINGER"], "utilities",
                             "sdconvert")
    command = [sdconvert, "-imae", maefile, "-osd", sdfile]
    # TODO: Use subprocess.getstatusoutput when we switch over to Python 3.
    if subprocess.call(command):
        raise OSError("Command failed: %s" % " ".join(command))
def get_default_feature_definitions():
    """
    Reads the Phase feature definitions from the default feature definition
    file and returns them as a list.

    :return: Default Phase feature definitions.
    :rtype: list(phase.PhpFeatureDefinition)
    """
    default_fd_path = phase.getDefaultFdFilePath()
    definitions = phase.read_feature_definitions(default_fd_path)
    return list(definitions)
def get_file_names_from_list_file(list_file, prefer_cwd=False):
    """
    Returns the names of the files in the provided list file, taking proper
    account of whether the current process is running under job control and
    whether any of the files are Phase databases.

    :param list_file: Name of the .list file
    :type list_file: str

    :param prefer_cwd: Use file in CWD if it exists, even if the file also
                       exists in the location specified in list_file
    :type prefer_cwd: bool

    :return: Names of files in the .list file
    :rtype: list(str)

    :raise OSError: If the list file is not found
    """
    local_list_file = get_proper_path(list_file)
    if not os.path.isfile(local_list_file):
        raise OSError("File \"%s\" not found" % local_list_file)

    resolved = [
        get_proper_path(name) for name in read_lines_in_file(local_list_file)
    ]
    if not prefer_cwd:
        return resolved

    # Replace each path with its base name whenever a file of that name
    # exists in the current working directory.
    for index, path in enumerate(resolved):
        base_name = os.path.basename(path)
        if os.path.isfile(base_name):
            resolved[index] = base_name
    return resolved
def get_jobname(args, filename):
    """
    Returns the subjob name from the command line arguments if one is
    defined; otherwise asks jobcontrol for the job name.

    :param args: Command line arguments
    :type args: argparse.Namespace

    :param filename: Name of file to use as a last resort job name
    :type filename: str

    :return: job name
    :rtype: str
    """
    subjob = args.subjob
    if subjob:
        return subjob
    return jobcontrol.get_jobname(filename)
def get_internal_zip_path(*argv):
    """
    Builds a path within a Zip file by joining the provided components with
    forward slashes, as per the Zip spec.

    :param argv: Components in path
    :type argv: tuple of str

    :return: Components in path joined with forward slashes
    :rtype: str
    """
    zip_separator = "/"
    return zip_separator.join(argv)
def get_proper_path(file_path, use_runtime=False):
    """
    Returns the appropriate path to use for the provided file, taking into
    account whether the current process is running under job control and
    whether file_path is a Phase database.

    An empty file_path is returned as is. Otherwise trailing slashes and
    backslashes are removed, and the result is returned unchanged if it is a
    Phase database path, or if neither use_runtime is set nor the process is
    under job control. In the remaining case the jobcontrol runtime path is
    returned.

    :param file_path: File whose proper path is being sought
    :type file_path: str

    :param use_runtime: Forces use of runtime path even if not running as job
    :type use_runtime: bool

    :return: The proper path to use for the provided file
    :rtype: str
    """
    # Allow empty string.
    if not file_path:
        return file_path

    trimmed_path = file_path.rstrip('\\/')

    # A database path is never altered.
    if is_phase_database_path(trimmed_path):
        return trimmed_path

    if use_runtime or jobcontrol.under_job_control():
        return jobcontrol.get_runtime_path(trimmed_path)
    return trimmed_path
def get_subjob_names(num_subjobs, prefix):
    """
    Builds the list of subjob names <prefix>_sub_1 through
    <prefix>_sub_<num_subjobs>.

    :param num_subjobs: Number of subjobs
    :type num_subjobs: int

    :param prefix: Prefix for all subjob names
    :type prefix: str

    :return: Subjob names
    :rtype: list(str)
    """
    return [
        "%s_sub_%d" % (prefix, index + 1) for index in range(num_subjobs)
    ]
def is_numeric(value):
    """
    Returns True if the provided value can be cast to a float.

    The test is whatever float() accepts, so strings such as "1e5", " 3 ",
    "nan" and "inf" are considered numeric. Values that float() rejects for
    any reason, including non-string values such as None, yield False.

    :param value: The value to be tested, normally a string
    :type value: str

    :return: Whether value is numeric
    :rtype: bool
    """
    try:
        float(value)
    except (TypeError, ValueError, OverflowError):
        # ValueError: malformed string. TypeError: unsupported type such as
        # None or a list. OverflowError: an integer too large for a float.
        return False
    return True
def is_phase_database_path(source):
    """
    Determines whether the provided source of structures has the Phase
    database file format.

    :param source: Path to source of structures
    :type source: str

    :return: whether the source is a Phase database
    :rtype: bool
    """
    return (phase.get_phase_file_format(source) ==
            phase.PhpFileFormat_PHP_FORMAT_PHDB)
def is_phase_project_path(file_path, zipped=False):
    """
    Returns True if the file_path corresponds to a Phase project. Set zipped
    to True to check for a zipped project.

    :param file_path: Path to file
    :type file_path: str

    :param zipped: If True, check for the zipped project format rather than
                   the project format
    :type zipped: bool

    :return: Whether file_path is a Phase project (or zipped project)
    :rtype: bool
    """
    actual_format = phase.get_phase_file_format(file_path)
    wanted_format = (phase.PhpFileFormat_PHP_FORMAT_PHZIP
                     if zipped else phase.PhpFileFormat_PHP_FORMAT_PHPRJ)
    return actual_format == wanted_format
def read_lines_in_file(file_name):
    """
    Reads a file and returns its lines with leading/trailing whitespace
    removed, skipping any line that is empty after that removal.

    :param file_name: The file to read
    :type file_name: str

    :return: The lines in the file, minus any leading/trailing whitespace
    :rtype: list(str)
    """
    kept_lines = []
    with open(file_name, 'r') as fh:
        for raw_line in fh:
            text = raw_line.strip()
            if text:
                kept_lines.append(text)
    return kept_lines
def write_list_to_file(file_name, list_of_strings):
    """
    Writes a list of strings to a file, one string per line. The strings are
    joined with newlines and handed to write_string_to_file.

    :param file_name: Name of file to which the strings should be written
    :type file_name: str

    :param list_of_strings: The list of strings to write
    :type list_of_strings: list(str)
    """
    contents = "\n".join(list_of_strings)
    write_string_to_file(file_name, contents)
def write_string_to_file(file_name, s):
    """
    Writes a string followed by a newline to a file, replacing any existing
    contents of that file.

    :param file_name: Name of file to which the string should be written
    :type file_name: str

    :param s: The string to write
    :type s: str
    """
    with open(file_name, 'w') as out_handle:
        terminated = s + "\n"
        out_handle.write(terminated)