"""
Provide an API that allows a definition of a job.
A job is composed of two pieces:
1) A task specification that defines a generic command-line invocation
and defines what input files are needed and what output files are
produced.
2) Runtime parameters that include specific input and output filenames.
To launch a job, you must also provide a set of launch parameters, which
specify things like the job host, number of threads per process, etc.
To construct a job specification for a script that you want to run under job
control, use the JobSpecificationArgsBuilder. This creates a
JobSpecification from a list of command-line arguments. You must indicate
the specific input and output files in the command-line arguments so they
can be transferred and retrieved.
"""
# A short time after writing this, it feels too complicated. But, we think
# this module will go away with a shift to protobuf-based JobSpecifications
# and so refactoring is likely to be a waste.
#
# In case this doesn't happen, some refactoring notes that may or may not
# make sense to others:
#
# 1) TaskSpecificationBuilder and JobSpecificationBuilder are unused
# outside of tests, which makes them good candidates for elimination.
# 2) The only point of the conversion to/from JSON is to make sure that
# the state is serializable; i.e. an attempt to ensure separation of
# job specification/state from launching behavior. If we can guarantee
# this some other way, the JSON can go away. We could possibly instead
# add an introspective test that the specification class doesn't gain
# any unwanted behavior.
# 3) _RuntimeParameters is unnecessary if the JobSpecification stops
# trying to be a template for jobs and instead becomes an instance of a
# specific job. Job templates are currently unused!
import errno
import json
import os
import pathlib
import re
import urllib
from typing import Optional
from urllib.parse import urljoin
import schrodinger.job.jobcontrol as jobcontrol
from schrodinger.infra import licensing
from schrodinger.utils import fileutils
from schrodinger.utils.env import prepend_sys_path
with prepend_sys_path(os.environ['MMSHARE_EXEC']):
import toplevel
# Substitution keys must start with a letter and contain at least one more
# character, drawn from letters, digits, or underscores.
_KEY_RE = re.compile("[a-zA-Z][a-zA-Z_0-9]+")
# A key wrapped in angle brackets, e.g. "<JOBNAME>"; group 1 is the bare key.
_VAR_RE = re.compile("<(%s)>" % _KEY_RE.pattern)
# Keys with special meaning that users may not define themselves.
_JOBNAME_KEY = "JOBNAME"
_STDOUT_KEY = "STDOUT"
_STDERR_KEY = "STDERR"
_RESERVED_KEYS = {_JOBNAME_KEY, _STDOUT_KEY, _STDERR_KEY}
# Default stdout/stderr capture filenames; user output files may not reuse
# these names (see TaskSpecificationBuilder.setOutputFile).
_DEFAULT_STDERR = "stderr"
_DEFAULT_STDOUT = "stdout"
_RESERVED_OUTPUT_FILENAMES = (_DEFAULT_STDERR, _DEFAULT_STDOUT)
# Keys used in the JSON serialization.
_COMMAND_LINE_ARGUMENTS = "command_line_arguments"
_FILENAME_SOURCE = "filename_source"
_HOST = "host"
_INCORPORATE = "incorporate"
_INPUT = "input"
_INPUT_FILE = "input_file"
_INPUT_FILENAME = "input_filename"
_JOB_USES_TPP = "job_uses_tpp"
_JOBNAME = "jobname"
_LICENSES = "licenses"
_OUTPUT = "output"
_OUTPUT_FILE = "output_file"
_OUTPUT_FILENAME = "output_filename"
_DRIVER_RESERVES_CORES = "processes_consumed_by_driver"
_PRODUCT = "product"
_PROGRAM_NAME = "progname"
_RUNTIME_PARAMETERS = "runtime_parameters"
_RUNTIME_PATH = "runtime_path"
_SCHRODINGER = "schrodinger"
_SOURCE = "source"
_STREAM = "stream"
_TASK_SPECIFICATION = "task_specification"
_TYPE = "type"
_USE_RUN = "use_run"
_VALUE = "value"
_JOB_STOPPABLE = "stoppable"
# Allowed values of the "type" field for input specs in a task
# specification vs. in runtime parameters.
_TASK_INPUT_TYPES = {_INPUT_FILENAME, _OUTPUT_FILENAME}
_RUNTIME_INPUT_TYPES = {_INPUT_FILE, _OUTPUT_FILENAME}
class TaskSpecificationError(RuntimeError):
    """
    Raised when a task specification is malformed or inconsistent.
    """
class RuntimeParametersError(RuntimeError):
    """
    Raised when runtime parameters are missing or inconsistent with the
    task specification.
    """
class SpecificationKeyError(KeyError):
    """
    An error thrown when a key is missing from a serialized specification.
    """

    # Human-readable name of the specification kind; subclasses override.
    spec_description = "specification"

    def __init__(self, key, context=None):
        """
        :param key: The missing key name.
        :type key: str

        :param context: Description of where the key was expected; defaults
            to "the <spec_description>".
        :type context: str or None
        """
        if context is None:
            context = "the %s" % self.spec_description
        super().__init__(f"No '{key}' key present in {context}.")
class TaskSpecificationKeyError(SpecificationKeyError):
    """
    A key is missing from a serialized task specification.
    """
    spec_description = "task specification"
class RuntimeParametersKeyError(SpecificationKeyError):
    """
    A key is missing from serialized runtime parameters.
    """
    spec_description = "runtime parameters"
class JobSpecificationKeyError(SpecificationKeyError):
    """
    A key is missing from a serialized job specification.
    """
    spec_description = "job specification"
def check_valid_key(key):
    """
    Make sure the provided key is valid. Raise a RuntimeError if it's not.

    A valid key starts with a letter, is at least two characters long, and
    contains only letters, digits, or underscores. The reserved key names
    (JOBNAME, STDOUT, STDERR) are rejected.

    :param key: The candidate substitution key.
    :type key: str

    :raise RuntimeError: If the key is reserved or malformed.
    """
    if key in _RESERVED_KEYS:
        raise RuntimeError("%s is a reserved key name." % key)
    # fullmatch ensures the ENTIRE key is well-formed; a bare match() would
    # accept any string with a valid prefix (e.g. "abc-def"), contradicting
    # the error message below.
    if not _KEY_RE.fullmatch(key):
        raise RuntimeError(
            "'%s' is a bad key name. It must start with a-z (upper or "
            "lowercase) and contain only letters, numbers, or "
            "underscores." % key)
class TaskSpecification:
    """
    A class that holds the information necessary to run a task on a compute
    resource.

    It holds a templated shell command along with info about required input,
    output created, and information about how to generate a jobname. It
    doesn't hold any information specific to a job run - e.g. hostname,
    input filenames, output filenames.

    This class has no methods that modify instance state. Use a builder
    class (TaskSpecificationBuilder or JobSpecificationBuilder) to set up a
    TaskSpecification.
    """

    def __init__(self):
        # Templated argument list; variables appear as "<KEY>".
        self._command_line_arguments = []
        # Mapping of input key -> input spec dict.
        self._input = {}
        # Input key used to derive the jobname, if any.
        self._jobname_source = None
        self._program_name = None
        # Mapping of output key -> output spec dict.
        self._output = {}
        # Settings controlling use of $SCHRODINGER/run (_USE_RUN, _PRODUCT).
        self._schrodinger = {}
        # List of (license_name, num_tokens) tuples.
        self._licenses = []
        self._driver_reserves_cores = False
        self._job_uses_tpp = False
        self._stoppable = False

    def validate(self):
        """
        Make sure that the variables used in the command-line template are
        defined with input or output specifications. Raise a
        TaskSpecificationError if not.
        """
        commandline_keys = set()
        for arg in self.getCommandTemplate():
            # NOTE(review): only the first <KEY> in each argument is
            # examined; an argument embedding two keys would be only
            # partially validated -- confirm this is intended.
            m = _VAR_RE.search(arg)
            if m:
                key = m.group(1)
                # _JOBNAME_KEY is special and not in input/output files.
                if key != _JOBNAME_KEY:
                    commandline_keys.add(key)
        input_and_output_keys = set()
        input_and_output_keys.update(self.getInputKeys())
        input_and_output_keys.update(self.getOutputKeys())
        if not commandline_keys.issubset(input_and_output_keys):
            raise TaskSpecificationError(
                "The variables used in the command-line template don't match "
                "up with the input and output keys. Unspecified files: %s" %
                ",".join(commandline_keys - input_and_output_keys))

    @staticmethod
    def fromJSON(json_spec):
        """
        Return a TaskSpecification created from a JSON string.
        """
        data = json.loads(json_spec)
        return TaskSpecificationBuilder(data).getTaskSpec()

    def asJSON(self, **kwargs):
        """
        Validate and serialize this specification to a JSON string. kwargs
        are passed through to json.dumps.
        """
        self.validate()
        data = {}
        data[_COMMAND_LINE_ARGUMENTS] = self._command_line_arguments
        data[_INPUT] = self._input
        if self._jobname_source:
            data[_JOBNAME] = {_FILENAME_SOURCE: self._jobname_source}
        if self._program_name:
            data[_PROGRAM_NAME] = self._program_name
        if self._output:
            data[_OUTPUT] = self._output
        data[_SCHRODINGER] = self._schrodinger
        if self._licenses:
            data[_LICENSES] = self._licenses
        if self._driver_reserves_cores:
            data[_DRIVER_RESERVES_CORES] = self._driver_reserves_cores
        if self._job_uses_tpp:
            data[_JOB_USES_TPP] = self._job_uses_tpp
        if self._stoppable:
            data[_JOB_STOPPABLE] = self._stoppable
        return json.dumps(data, **kwargs)

    def getCommandTemplate(self):
        """
        Return the command-line as a list of arguments with keys in the form
        of <KEY> that will be replaced with runtime parameters.
        """
        return self._command_line_arguments

    def needsExplicitJobname(self):
        """
        Return True if a jobname must be specified as part of the runtime
        parameters.
        """
        if self._jobname_source:
            return False
        for output_file in self._output.values():
            if _filename_uses_jobname(output_file[_RUNTIME_PATH]):
                return True
        for arg in self.getCommandTemplate():
            if _filename_uses_jobname(arg):
                return True
        return False

    def getJobnameSource(self):
        """
        Return the input key that should be used to derive the jobname.
        """
        return self._jobname_source

    def getProgramName(self):
        """
        Return the program name, or None if unset.
        """
        return self._program_name

    def isStoppable(self):
        """
        If True, a job is stoppable by sending 'halt' message to the backend.
        And transfer output files if any. Otherwise, job supervisor kills the backend
        and transfer output files.
        """
        return self._stoppable

    def getLicenses(self):
        """
        Returns licenses as a list of tuples of ( license_name, num_tokens)
        """
        return self._licenses

    def driverReservesCores(self):
        """
        If True, the driver will allocate N slots matching
        -HOST hostname: N processes. This is useful when the driver
        allocates parallel slots on one node so it can resubmit jobs to
        localhost. If no N is specified, assume to be 1.

        This is an optimization to avoid resubmission back to the queue
        if we know we are running a set of parallel jobs with the same
        CPU requirements.
        """
        return self._driver_reserves_cores

    def jobUsesTPP(self):
        """
        If True, this job will allocate N slots on one node matching -TPP N.
        If False, this job only passes TPP on as an option for subjobs.
        TPP stands for Threads Per Process.
        """
        return self._job_uses_tpp

    def getInputKeys(self):
        """
        Return a list of keys that define inputs.

        (Added: validate() and JobSpecification._validate() call this
        method; it was missing from the class.)
        """
        return list(self._input)

    def getOutputKeys(self):
        """
        Return a list of keys that define outputs.
        """
        return list(self._output)

    def useSchrodingerRun(self):
        """
        Return whether to run the command under $SCHRODINGER/run.
        """
        return self._schrodinger.get(_USE_RUN)

    def useSchrodingerProduct(self):
        """
        Return the product directory to pass to $SCHRODINGER/run via
        -FROM, or None if no product was specified.
        """
        return self._schrodinger.get(_PRODUCT)
def var(string):
    """
    Format the provided string to be used as a variable in the
    TaskSpecification command-line template.
    """
    return "".join(("<", string, ">"))
def _filename_uses_jobname(filename):
    """
    Check whether the file depends on the JOBNAME key.

    :param filename: A filename template that may contain "<JOBNAME>".
    :type filename: str
    """
    jobname_token = var(_JOBNAME_KEY)
    return jobname_token in filename
class TaskSpecificationBuilder:
    """
    A class for building up a TaskSpecification from a specific list of
    command-line arguments.
    """

    def __init__(self, data=None):
        """
        :param data: Optional deserialized JSON dict to populate the task
            specification from.
        :type data: dict or None
        """
        self._task_spec = TaskSpecification()
        if data:
            self._fromData(data)

    def getTaskSpec(self):
        """
        Return a TaskSpecification from this builder.

        :raise TaskSpecificationError: If the specification is inconsistent.
        """
        self._task_spec.validate()
        return self._task_spec

    def setCommandLine(self,
                       args,
                       use_schrodinger_run=True,
                       schrodinger_product=None):
        """
        Set the command line as provided. Add a SCHRODINGER/run if
        requested.

        :param args: The command-line script and arguments.
        :type args: list of str

        :param use_schrodinger_run: If True, run the command under
            $SCHRODINGER/run.
        :type use_schrodinger_run: bool

        :param schrodinger_product: A product directory to search for the
            script/executable. This should be the name of a directory under
            SCHRODINGER without the trailing version (i.e. the "-v*" part).
        :type schrodinger_product: str
        """
        # Copy so later mutation of the caller's list can't change the spec.
        self._task_spec._command_line_arguments = args[:]
        self._task_spec._schrodinger[_USE_RUN] = use_schrodinger_run
        if schrodinger_product:
            if not use_schrodinger_run:
                raise RuntimeError(
                    "Specifying a Schrodinger product without using "
                    "'$SCHRODINGER/run' is inconsistent")
            self._task_spec._schrodinger[_PRODUCT] = schrodinger_product

    def addLicense(self, license_name, license_tokens):
        """
        :param license_name: Name of a license token that is checked out.
        :type license_name: schrodinger.infra.licensing.MMLIC3*

        :param license_tokens: The number of tokens required by this job type.
        :type license_tokens: int
        """
        license_string = licensing.getFeatureName(license_name)
        self._task_spec._licenses.append((license_string, license_tokens))

    def setOutputFile(self, key, runtime_path, stream=False, incorporate=False):
        """
        :param key: A key to be used to reference this output file.
        :type key: str

        :param runtime_path: The path of an output file that will be
            generated. May contain input keys in the form <KEY> or <JOBNAME>
            (if a jobname source has been specified).
        :type runtime_path: str

        :param stream: If True and if possible, the output file will be streamed
            back while the job is running. Defaults to False. Optional.
        :type stream: bool

        :param incorporate: If True, mark this file for incorporation; at
            most one output file may be incorporated.
        :type incorporate: bool
        """
        check_valid_key(key)
        if runtime_path in _RESERVED_OUTPUT_FILENAMES:
            raise TaskSpecificationError("'%s' is a reserved output filename." %
                                         runtime_path)
        _validate_output_in_subdir(runtime_path)
        # Don't allow the same output destination except for stderr/stdout.
        stdout_stderr = {_STDOUT_KEY, _STDERR_KEY}
        for (output_key, value) in self._task_spec._output.items():
            if (value[_RUNTIME_PATH] == runtime_path and
                    {output_key, key} != stdout_stderr):
                raise TaskSpecificationError(
                    "The output path of '%s' has already "
                    "been added for %s." % (runtime_path, output_key))
            if value.get(_INCORPORATE) and incorporate:
                raise TaskSpecificationError(
                    "The output file of '{}' already has incorporate "
                    "status and only one file can be incorporated".format(
                        output_key))
        self._task_spec._output[key] = {
            _TYPE: _OUTPUT_FILE,
            _RUNTIME_PATH: runtime_path,
            _STREAM: stream,
            _INCORPORATE: incorporate,
        }

    def _setStdStreamLocation(self, key, runtime_path, stream=False):
        """
        Specify a file for stderr or stdout to be redirected to.

        :param key: _STDERR_KEY or _STDOUT_KEY
        :type key: str

        :param runtime_path: A runtime_path with input keys in the form
            <KEY>.
        :type runtime_path: str

        :param stream: If True and if possible, the file will be streamed
            back while the job is running. Defaults to False. Optional.
        :type stream: bool
        """
        redirect_keys = {_STDERR_KEY: "stderr", _STDOUT_KEY: "stdout"}
        if key not in redirect_keys:
            raise RuntimeError("Redirection of %s is unknown." % key)
        # Don't allow redirection to a file listed as a non-redirect output
        # file.
        for (output_key, value) in self._task_spec._output.items():
            if (value[_RUNTIME_PATH] == runtime_path and output_key != key and
                    output_key not in redirect_keys):
                raise RuntimeError(
                    "The output path of '%s' has already been used for %s." %
                    (runtime_path, output_key))
        if not _filename_uses_jobname(runtime_path):
            # If stderr/stdout don't use a <JOBNAME> template they must use
            # a hardwired pathname.
            if _VAR_RE.search(runtime_path):
                raise RuntimeError(
                    "A %s specification must be based on JOBNAME or "
                    "a hardwired value." % redirect_keys[key])
        self._task_spec._output[key] = {
            _TYPE: _OUTPUT_FILE,
            _RUNTIME_PATH: runtime_path,
            _STREAM: stream,
            _INCORPORATE: False,
        }

    def setStderr(self, runtime_path, stream=False):
        """
        :param runtime_path: A runtime_path template with input keys in
            the form <KEY>.
        :type runtime_path: str

        :param stream: If True and if possible, the file will be streamed
            back while the job is running. Defaults to False. Optional.
        :type stream: bool
        """
        return self._setStdStreamLocation(_STDERR_KEY, runtime_path, stream)

    def setStdout(self, runtime_path, stream=False):
        """
        :param runtime_path: A runtime_path template with input keys in
            the form <KEY>.
        :type runtime_path: str

        :param stream: If True and if possible, the file will be streamed
            back while the job is running. Defaults to False. Optional.
        :type stream: bool
        """
        return self._setStdStreamLocation(_STDOUT_KEY, runtime_path, stream)

    def setStoppable(self, stoppable):
        """
        If passed True, the job supervisor sends the 'halt' message to the
        backend to stop. And transfer output files if any.
        Otherwise, the supervisor kills the backend and transfer output files.
        """
        self._task_spec._stoppable = stoppable

    def setProgramName(self, program_name):
        """
        Set the program name; raise if empty or already set. Programs that
        job control knows to be stoppable are marked stoppable.
        """
        if not program_name:
            raise ValueError("The value of program name cannot be empty.")
        if self._task_spec._program_name:
            raise RuntimeError(
                "The value <%s> has already been set as the program name." %
                self._task_spec._program_name)
        self._task_spec._program_name = program_name
        if program_name in jobcontrol.STOPPABLE_PROGRAMS:
            self.setStoppable(True)

    def setDriverReservesCores(self, reserved):
        """
        If passed True, the driver will allocate N slots matching
        -HOST hostname: N processes. This is useful when the driver
        allocates parallel slots on one node so it can resubmit jobs to
        localhost. If no N is specified, assume to be 1.

        This is an optimization to avoid resubmission back to the queue
        if we know we are running a set of parallel jobs with the same
        CPU requirements.
        """
        self._task_spec._driver_reserves_cores = reserved

    def setJobUsesTPP(self, uses_tpp):
        """
        If passed True, this job will reserve the number of cores specified
        from -TPP on the cmdline. Unfortunately, we use -TPP N to mean two
        things

        1) this job will reserve N processors on one node directly
        2) this job takes -TPP as a cmdline argument, but only to pass along
        to subjobs

        In case 1, the uses_tpp should be set to True.
        """
        self._task_spec._job_uses_tpp = uses_tpp

    def _fromData(self, data):
        """
        Populate the task spec from a deserialized JSON dict.
        """
        self._loadCommandLine(data)
        self._loadInputSpec(data)
        self._loadJobname(data)
        self._loadOutputSpec(data)
        if _PROGRAM_NAME in data:
            self._task_spec._program_name = data[_PROGRAM_NAME]
        if _LICENSES in data:
            self._task_spec._licenses = data[_LICENSES]
        if _DRIVER_RESERVES_CORES in data:
            self._task_spec._driver_reserves_cores = data[
                _DRIVER_RESERVES_CORES]
        if _JOB_USES_TPP in data:
            self._task_spec._job_uses_tpp = data[_JOB_USES_TPP]
        if _JOB_STOPPABLE in data:
            self._task_spec._stoppable = data[_JOB_STOPPABLE]

    def _loadCommandLine(self, data):
        """
        Verify and load the command-line specification from the JSON data.
        """
        key = _COMMAND_LINE_ARGUMENTS
        if key not in data:
            raise TaskSpecificationKeyError(key)
        try:
            # Duck-type check that the value is a list of strings: both
            # operations raise TypeError for any other types.
            data[key] + []
            [arg + "" for arg in data[key]]
        except TypeError:
            raise TaskSpecificationError(
                "Value for '%s' is not a list of strings." % key) from None
        # Fix: default these when the "schrodinger" section is absent;
        # previously they were left unbound, causing a NameError below.
        use_run = False
        schrodinger_product = None
        if _SCHRODINGER in data:
            use_run = data[_SCHRODINGER].get(_USE_RUN, False)
            schrodinger_product = data[_SCHRODINGER].get(_PRODUCT)
        self.setCommandLine(data[key],
                            use_schrodinger_run=use_run,
                            schrodinger_product=schrodinger_product)

    def _loadInputSpec(self, data):
        """
        Verify and load the optional input specification.
        """
        key = _INPUT
        if key not in data:
            return
        input_ = data[key]
        type_key = _TYPE
        for (input_key, value) in input_.items():
            if type_key not in value:
                raise TaskSpecificationKeyError(
                    type_key, "the '%s' input value." % input_key)
            if value[type_key] not in _TASK_INPUT_TYPES:
                raise TaskSpecificationError(
                    "Unrecognized input type '%s' for '%s' input value." %
                    (value[type_key], input_key))
        self._task_spec._input = input_

    def _loadJobname(self, data):
        """
        Load the optional jobname spec.
        """
        key = _JOBNAME
        if key not in data:
            return
        jobname = data[key]
        if _FILENAME_SOURCE not in jobname:
            raise TaskSpecificationKeyError(_FILENAME_SOURCE,
                                            "the jobname spec")
        jobname_source = jobname[_FILENAME_SOURCE]
        self._task_spec._jobname_source = jobname_source
        if jobname_source not in self._task_spec._input:
            raise TaskSpecificationError(
                "The jobname source file is not in the input specification.")
        elif self._task_spec._input[jobname_source][_TYPE] != \
                _INPUT_FILENAME:
            raise TaskSpecificationError(
                "The jobname source is not an input file name.")

    def _loadOutputSpec(self, data):
        """
        Load the optional output spec.
        """
        key = _OUTPUT
        if key not in data:
            return
        output = data[key]
        for (output_key, value) in output.items():
            if value[_TYPE] != _OUTPUT_FILE:
                raise TaskSpecificationError(
                    "Unrecognized output type '{}' for {}.".format(
                        value[_TYPE], output_key))
        self._task_spec._output = output
class _RuntimeParameters:
"""
A class to hold job-specific input and output filenames.
"""
def __init__(self):
self._input = {}
self._jobname_val = None
def asJSON(self, **kwargs):
data = {}
data[_INPUT] = self._input
if self._jobname_val:
data[_JOBNAME] = {_VALUE: self._jobname_val}
return json.dumps(data, **kwargs)
@staticmethod
def fromJSON(json_params):
data = json.loads(json_params)
return _RuntimeParametersBuilder(data).getRuntimeParams()
def getInputKeys(self):
return list(self._input)
def getInputFiles(self):
input_files = []
for value in self._input.values():
if value[_TYPE] == _INPUT_FILE:
input_files.append((value[_SOURCE], value[_RUNTIME_PATH]))
return input_files
def getRuntimePath(self, key):
input_file_spec = self._input[key]
if input_file_spec[_TYPE] not in _RUNTIME_INPUT_TYPES:
raise RuntimeError("'%s' has an unrecognized input type of '%s'." %
(key, input_file_spec[_TYPE]))
return input_file_spec[_RUNTIME_PATH]
def getJobnameFromSource(self, jobname_source):
if jobname_source not in self._input:
raise RuntimeError(
"The jobname can't be determined because the '%s' input "
"key is not specified in the runtime parameters." %
jobname_source)
return fileutils.get_jobname(self._input[jobname_source][_RUNTIME_PATH])
def getJobname(self):
return self._jobname_val
def get_file_url(path):
    """
    Get a file:// URL for a file path.

    :param path: A relative or absolute filesystem path.
    :type path: str

    :return: A "file:" URL for the absolute form of the path.
    :rtype: str
    """
    # Import the submodule explicitly: the module-level "import urllib"
    # does not guarantee that urllib.request has been loaded, so relying
    # on the attribute alone is a latent AttributeError.
    import urllib.request
    return urljoin("file:",
                   urllib.request.pathname2url(os.path.abspath(path)))
class _RuntimeParametersBuilder:
    """
    A builder for _RuntimeParameters, optionally populated from
    deserialized JSON data.
    """

    def __init__(self, data=None):
        """
        :param data: Optional deserialized JSON dict of runtime parameters.
        :type data: dict or None
        """
        self._runtime_params = _RuntimeParameters()
        # Seed the jobname from the environment if available; a value in
        # the provided data takes precedence because _fromData runs after.
        self._runtime_params._jobname_val = \
            os.getenv("SCHRODINGER_JOBNAME") or self._runtime_params._jobname_val
        if data:
            self._fromData(data)

    def _fromData(self, data):
        """
        Populate the runtime parameters from a deserialized JSON dict.
        """
        self._loadInput(data)
        if _JOBNAME in data and _VALUE in data[_JOBNAME]:
            self._runtime_params._jobname_val = data[_JOBNAME][_VALUE]

    def _loadInput(self, data):
        """
        Load the required input section; raise if it is missing.
        """
        key = _INPUT
        if key not in data:
            raise RuntimeParametersKeyError(key)
        self._runtime_params._input = data[key]

    def getRuntimeParams(self):
        """
        Return the _RuntimeParameters built so far.
        """
        return self._runtime_params

    def setJobname(self, jobname):
        """
        Set the jobname. Raise if the value is empty or already set.
        """
        if not jobname:
            raise ValueError("The value of jobname cannot be empty.")
        if self._runtime_params._jobname_val:
            raise RuntimeError(
                "The value <%s> has already been set as the jobname." %
                self._runtime_params._jobname_val)
        self._runtime_params._jobname_val = jobname

    def getJobname(self):
        """
        Get the currently set job name

        :rtype: str or None
        :return: The current set job name, or None if no name is set
        """
        return self._runtime_params.getJobname()

    def setInputFile(self, key, source_file, runtime_path=None):
        """
        Specify the input file used.

        :param key: The key that identifies the input file name in the
            task command-line template.
        :type key: str

        :param source_file: Path to an existing local file to transfer.
        :type source_file: str

        :param runtime_path: The filename the file should have in the job
            directory. If not given, a default is derived from source_file.
        :type runtime_path: str or None
        """
        if not os.path.exists(source_file):
            raise RuntimeError("Missing input file: %s" % source_file)
        check_valid_key(key)
        input_spec = {
            _TYPE: _INPUT_FILE,
            _SOURCE: get_file_url(source_file),
        }
        if runtime_path:
            input_spec[_RUNTIME_PATH] = runtime_path
        else:
            input_spec[_RUNTIME_PATH] = _get_default_runtime_path(source_file)
        self._runtime_params._input[key] = input_spec

    def setOutputFile(self, key, runtime_path):
        """
        Define an output file whose name is specified at runtime.

        :param key: The key that identifies the outputfile name in the
            task command-line template.
        :type key: str

        :param runtime_path: The runtime filename for the output file.
        :type runtime_path: str
        """
        # Output files that don't rely on jobname need to be included in the
        # runtime input section because the filenames must be specified at
        # runtime.
        if not _filename_uses_jobname(runtime_path):
            self._runtime_params._input[key] = {
                _TYPE: _OUTPUT_FILENAME,
                _RUNTIME_PATH: runtime_path,
            }
class JobSpecification:
    """
    This class provides a serializable job definition and consists of a task
    specification along with runtime parameters.

    Currently unsupported features:
    - Runtime requirements (e.g. GPU, memory requirements).

    Features that will probably never be supported:
    - The equivalent of job control's '-LOCAL' option.
    - Full shell commands. (Arguments must be provided as a list of
      strings. Chaining commands together is not supported. Redirection
      is provided only through APIs.)
    """

    def __init__(self, task_spec, runtime_params):
        """
        :param task_spec: The generic task definition.
        :type task_spec: TaskSpecification

        :param runtime_params: The job-specific input/output filenames.
        :type runtime_params: _RuntimeParameters

        :raise RuntimeParametersError: If the runtime parameters don't
            satisfy the task specification's input keys.
        """
        self._task_spec = task_spec
        self._runtime_params = runtime_params
        self._validate()

    def asJSON(self, **kwargs):
        """
        Serialize the job specification (task spec plus runtime parameters)
        to a JSON string. kwargs are passed through to json.dumps.
        """
        task_spec = json.loads(self._task_spec.asJSON(**kwargs))
        runtime_params = json.loads(self._runtime_params.asJSON(**kwargs))
        return json.dumps(
            {
                _TASK_SPECIFICATION: task_spec,
                _RUNTIME_PARAMETERS: runtime_params
            }, **kwargs)

    @staticmethod
    def fromJSON(json_job_spec):
        """
        Return a JobSpecification instance created from the provided JSON.
        """
        data = json.loads(json_job_spec)
        if _TASK_SPECIFICATION not in data:
            raise JobSpecificationKeyError(_TASK_SPECIFICATION)
        if _RUNTIME_PARAMETERS not in data:
            raise JobSpecificationKeyError(_RUNTIME_PARAMETERS)
        task_spec = TaskSpecificationBuilder(
            data[_TASK_SPECIFICATION]).getTaskSpec()
        runtime_params = _RuntimeParametersBuilder(
            data[_RUNTIME_PARAMETERS]).getRuntimeParams()
        return JobSpecification(task_spec, runtime_params)

    def _validate(self):
        """
        Validate that all the necessary inputs are present in the runtime
        parameters. Raise RuntimeParametersError if not.
        """
        runtime_input_keys = set(self._runtime_params.getInputKeys())
        missing_keys = []
        for input_key in self._task_spec.getInputKeys():
            if input_key not in runtime_input_keys:
                missing_keys.append(input_key)
            else:
                runtime_input_keys.remove(input_key)
        if missing_keys:
            if len(missing_keys) > 1:
                raise RuntimeParametersError(
                    "The following input keys are missing from the runtime "
                    "parameters: %s" % ", ".join(missing_keys))
            else:
                raise RuntimeParametersError(
                    "The '%s' input key is missing from the runtime "
                    "parameters." % missing_keys[0])
        # Any keys left over are runtime inputs the task spec never declared.
        if runtime_input_keys:
            if len(runtime_input_keys) > 1:
                raise RuntimeParametersError(
                    "The following input keys are not recognized by the task "
                    "specification: %s" % ", ".join(sorted(runtime_input_keys)))
            else:
                raise RuntimeParametersError(
                    "The '%s' input key is not recognized by the task "
                    "specification." % runtime_input_keys.pop())

    def validate(self):
        """
        A validation method that makes sure the JobSpecification has all the
        data it needs and is self-consistent.
        """
        self._validate()
        self._validateOutputFilenames()
        if (self._task_spec.needsExplicitJobname() and
                not self._runtime_params.getJobname()):
            raise RuntimeParametersError(
                "A jobname is needed but none was specified.")

    def _validateOutputFilenames(self):
        """
        Check for runtime name collisions.
        """
        non_jobname_output_files = {}
        jobname_output_files = {}
        for key in self._task_spec.getOutputKeys():
            if _filename_uses_jobname(
                    self._task_spec._output[key][_RUNTIME_PATH]):
                if not self._task_spec._jobname_source and \
                        not self._runtime_params._jobname_val:
                    raise RuntimeParametersError(
                        "A jobname source hasn't been specified.")
                jobname_output_files[self.getOutputFile(key)] = key
            else:
                non_jobname_output_files[self.getOutputFile(key)] = key
        collisions = set(jobname_output_files).intersection(
            non_jobname_output_files)
        if collisions:
            raise RuntimeParametersError(
                "The jobname '%s' conflicts with the hardwired output "
                "file(s) %s." % (self.getJobname(), ", ".join(collisions)))

    def _getKeyValue(self, match):
        """
        Return the value of the key in the provided match object.

        :param match: A match of a template key.
        :type match: re.MatchObject
        """
        key = match.group(1)
        if key == _JOBNAME_KEY:
            return self.getJobname()
        else:
            return self._runtime_params.getRuntimePath(key)

    def _subInputOrJobname(self, string):
        """
        Replace template keys with the appropriate values.
        """
        return _VAR_RE.sub(self._getKeyValue, string)

    def getCommand(self):
        """
        Return the shell command that will run the job.

        :return: list of str
        """
        if self._task_spec.useSchrodingerRun():
            cmd = [
                # Always use / here, not os.path.join.
                # To be expanded by jlaunch/jmonitor
                "%SCHRODINGER%/run"
            ]
            product = self._task_spec.useSchrodingerProduct()
            if product:
                cmd.extend(["-FROM", product])
        else:
            cmd = []
        for arg in self._task_spec.getCommandTemplate():
            cmd.append(self._subInputOrJobname(arg))
        return cmd

    def getOutputFile(self, key, stream=None, incorporate=None):
        """
        Get the output file corresponding to the given key from the
        job specification.

        :param stream: If stream=True, the file is returned only if it is
            a log file. If stream=False, the file is returned only if it is
            an output file. If stream=None, the file is returned.

        :param incorporate: If incorporate=True/False, the file is returned
            only if its incorporate status matches; if None, the status is
            not checked.

        :return: The substituted runtime filename, or None if the file does
            not match the requested filters.
        """
        output_spec = self._task_spec._output[key]
        streamval = output_spec.get(_STREAM, False)
        incorporateval = output_spec.get(_INCORPORATE, False)
        if (stream is None or
                streamval == stream) and (incorporate is None or
                                          incorporateval == incorporate):
            return self._subInputOrJobname(output_spec[_RUNTIME_PATH])
        else:
            return None

    def getOutputFiles(self, stream=None, incorporate=None):
        """
        Get all the output files from the job specification.

        :param stream: If stream=None, both output and log files are
            returned. If stream=True, only log files are returned.
            If stream=False, only output files are returned.
            Defaults to None. Optional.
        :type stream: True, False or None(Default)

        :param incorporate: If incorporate=None, incorporatable and
            other files are returned. If incorporate=True,
            only files to be incorporated are returned.
            If incorporate=False, only output files not incorporated
            are returned.
        :type incorporate: True, False or None(Default)
        """
        # Use a set here because stderr/stdout can be the same.
        output_files = set()
        for key in self._task_spec.getOutputKeys():
            outfile = self.getOutputFile(key, stream, incorporate)
            if outfile is not None:
                output_files.add(outfile)
        return list(output_files)

    def _getRedirect(self, key):
        """
        Return the redirect target for _STDERR_KEY/_STDOUT_KEY, or None if
        no redirection was specified.
        """
        if key in self._task_spec._output:
            return self.getOutputFile(key)
        else:
            return None

    def getStderr(self):
        """Return the stderr redirect filename, or None."""
        return self._getRedirect(_STDERR_KEY)

    def getStdout(self):
        """Return the stdout redirect filename, or None."""
        return self._getRedirect(_STDOUT_KEY)

    def getDefaultStderr(self):
        """Return the default stderr capture filename."""
        return _DEFAULT_STDERR

    def getDefaultStdout(self):
        """Return the default stdout capture filename."""
        return _DEFAULT_STDOUT

    def useSchrodingerRun(self):
        """Return whether to run the command under $SCHRODINGER/run."""
        return self._task_spec.useSchrodingerRun()

    def getHost(self):
        # NOTE(review): _RuntimeParameters in this module does not define
        # getHost(); presumably it is provided elsewhere -- confirm before
        # relying on this method.
        return self._runtime_params.getHost()

    def debugEnabled(self):
        # NOTE(review): _RuntimeParameters in this module does not define
        # debugEnabled(); presumably it is provided elsewhere -- confirm.
        return self._runtime_params.debugEnabled()

    def getJobname(self):
        """
        Return the jobname: the explicit runtime value if set, otherwise a
        name derived from the jobname source input file, otherwise None.
        """
        jobname_value = self._runtime_params._jobname_val
        if jobname_value:
            return jobname_value
        jobname_source = self._task_spec.getJobnameSource()
        if not jobname_source:
            return None
        else:
            return self._runtime_params.getJobnameFromSource(jobname_source)

    def setJobname(self, jobname):
        """Set the explicit runtime jobname."""
        self._runtime_params._jobname_val = jobname

    def getProgramName(self):
        """Return the program name, or None if unset."""
        return self._task_spec.getProgramName()

    def isStoppable(self):
        """
        If True, a job is stoppable by sending 'halt' message to the backend.
        And transfer output files if any. Otherwise, job supervisor kills the backend
        and transfer output files.
        """
        return self._task_spec.isStoppable()

    def driverReservesCores(self):
        """
        If True, the driver will allocate N slots matching
        -HOST hostname: N processes. This is useful when the driver
        allocates parallel slots on one node so it can resubmit jobs to
        localhost. If no N is specified, assume to be 1.

        This is an optimization to avoid resubmission back to the queue
        if we know we are running a set of parallel jobs with the same
        CPU requirements.
        """
        return self._task_spec.driverReservesCores()

    def jobUsesTPP(self):
        """
        If True, this job will allocate N slots on one node matching -TPP N.
        If False, this job uses TPP as an option to pass along to subjobs
        """
        return self._task_spec.jobUsesTPP()

    def getLicenses(self):
        """
        Returns licenses as a list of tuples of ( license_name, num_tokens)
        """
        return self._task_spec.getLicenses()
class JobSpecificationBuilder:
    """
    A helper class to create a JobSpecification from an existing
    TaskSpecification.
    """

    def __init__(self, task_spec):
        """
        :param task_spec: The task specification to build a job from.
        :type task_spec: TaskSpecification
        """
        self._task_spec = task_spec
        self._runtime_params_builder = _RuntimeParametersBuilder()

    def _getTaskSpec(self):
        return self._task_spec

    def _getRuntimeParams(self):
        return self._runtime_params_builder.getRuntimeParams()

    def getJobSpec(self):
        """Build and return the JobSpecification."""
        return JobSpecification(self._getTaskSpec(), self._getRuntimeParams())

    def setJobname(self, jobname):
        """Set the jobname on the runtime parameters."""
        self._runtime_params_builder.setJobname(jobname)

    def setOutputFiles(self, **output_files):
        """
        Indicate the filenames to be used for the output keys.

        :param output_files: Keyword arguments with the key as OUTPUT_KEY
            and the value as the associated filename.
        """
        valid_keys = set(self._task_spec.getOutputKeys())
        for output_key, filename in output_files.items():
            if output_key not in valid_keys:
                raise KeyError("'%s' is not a valid output file key." %
                               output_key)
            self._runtime_params_builder.setOutputFile(output_key, filename)
def _get_default_runtime_path(path):
"""
Absolute paths and relative paths not in a subdirectory get a runtime path
of the file basename. Relative paths in subdirs are used as is.
"""
# Determining whether something is in a subdir is not foolproof with this
# algorithm (e.g. you are in "dir" but specify a file in "subdir" as
# "../dir/subdir/filename"). In this case, you'll get runtime path of
# basename.
if os.path.isabs(path) or os.path.relpath(path).startswith(os.pardir):
return os.path.basename(path)
else:
return path
class JobSpecificationArgsBuilder:
    """
    A helper class to create a JobSpecification from a specific (i.e.
    non-generic) set of command-line arguments.
    """

    def __init__(self,
                 args,
                 use_schrodinger_run=True,
                 schrodinger_product=None,
                 program_name=None,
                 default_jobname=None,
                 use_jobname_log=False):
        """
        See TaskSpecificationBuilder.setCommandLine for argument
        descriptions.

        :param use_jobname_log: If True, set the STDOUT, STDERR as
            <JOBNAME>.log and stream it. Default is False.
        :type use_jobname_log: bool
        """
        self._task_spec_builder = TaskSpecificationBuilder()
        self._task_spec_builder.setCommandLine(
            args, use_schrodinger_run, schrodinger_product=schrodinger_product)
        self._runtime_params_builder = _RuntimeParametersBuilder()
        # A script that's not findable by toplevel.find_startup won't work
        # in a remote execution environment and is not supported.
        script = args[0]
        if schrodinger_product:
            search_product = schrodinger_product
        else:
            # Fall back to the mmshare product tree when no product is given.
            search_product = "mmshare"
        if not toplevel.find_startup(script, search_product, os.environ):
            raise RuntimeError(f"Could not find '{script}'. "
                               "Scripts must be findable by $SCHRODINGER/run.")
        if program_name:
            self.setProgramName(program_name)
        if default_jobname:
            # runtime_params_builder may already have the jobname set
            # (via the SCHRODINGER_JOBNAME environment variable);
            # if it does, don't attempt to override it
            if not self.getJobname():
                self.setJobname(default_jobname)
        if use_jobname_log:
            self._task_spec_builder.setStdout("<JOBNAME>.log", stream=True)
            self._task_spec_builder.setStderr("<JOBNAME>.log", stream=True)

    def _getTaskSpec(self):
        # Snapshot of the task specification built so far.
        return self._task_spec_builder.getTaskSpec()

    def _getRuntimeParams(self):
        # Snapshot of the runtime parameters built so far.
        return self._runtime_params_builder.getRuntimeParams()

    def getJobSpec(self):
        """
        :return: a JobSpecification combining the current task
            specification and runtime parameters.
        """
        return JobSpecification(self._getTaskSpec(), self._getRuntimeParams())

    def setJobname(self, jobname):
        """
        Set the runtime jobname for this job.
        """
        self._runtime_params_builder.setJobname(jobname)

    def getJobname(self):
        """
        Get the job name set from runtime parameters if there is one.

        This will return any jobname set via command line or the setJobname
        function. It will not return any jobname set using jobname_source=True
        when setting an input/output file.

        :rtype: str or None
        :return: The current set job name, or None if no name is set
        """
        return self._runtime_params_builder.getJobname()

    def setProgramName(self, program_name):
        """
        Set the program name reported for this job.
        """
        self._task_spec_builder.setProgramName(program_name)

    def setStoppable(self, stoppable):
        """
        Mark the job as stoppable, meaning the backend has been designed to
        listen for and respond to graceful stop requests through the
        jobcontrol API.

        Jobs that are not marked stoppable will be sent a SIGKILL and
        immediately terminated upon a user stop request.

        Please be aware that jobs that are marked stoppable but do not
        actually implement graceful stops are bad actors and will never be
        stopped by a user request.
        """
        self._task_spec_builder.setStoppable(stoppable)

    def _updateCommandLineArgs(self, filename, replacement):
        """
        Update the input paths in the command-line arguments specified by
        the user-level job invocation to create a command-line that will
        work in the environment set up by job control to run the backend.

        For example, absolute paths are turned into the proper relative
        paths that will be available from the job directory.
        """
        task_spec = self._task_spec_builder._task_spec
        new_args = []
        # Count of replaced occurrences; not currently consumed, kept for
        # parity/debugging.
        modified = 0
        for arg in task_spec._command_line_arguments:
            if arg == filename:
                # Bare filename argument: replace it wholesale.
                new_args.append(replacement)
                modified += 1
            elif arg.endswith("=" + filename):
                # "--option=filename" style argument: keep the option part.
                new_args.append(arg[:-len(filename)] + replacement)
                modified += 1
            else:
                new_args.append(arg)
        task_spec._command_line_arguments = new_args

    def setStderr(self, runtime_path, stream=False):
        self._task_spec_builder.setStderr(runtime_path, stream)

    def setStdout(self, runtime_path, stream=False):
        self._task_spec_builder.setStdout(runtime_path, stream)

    def setOutputFile(self,
                      runtime_path,
                      key=None,
                      stream=False,
                      incorporate=False):
        """
        :param runtime_path: The path of an output file that will be
            created by the job.
        :type runtime_path: str

        :param stream: If True and if possible, the output file will be
            streamed back while the job is running. Defaults to False.
            Optional.
        :type stream: bool

        :param incorporate: If True, mark this file for incorporation in
            maestro. NOTE: Only one file can be declared incorporatable.
            Defaults to False. Optional.
        :type incorporate: bool
        """
        task_spec = self._task_spec_builder._task_spec
        if key is None:
            # Auto-generate a key from the number of outputs declared so far.
            key = "OUTPUT_{}".format(len(task_spec._output))
        self._task_spec_builder.setOutputFile(key, runtime_path, stream,
                                              incorporate)
        if not _filename_uses_jobname(runtime_path):
            # Output files not using jobname are specified on the
            # command-line.
            self._updateCommandLineArgs(runtime_path, "<%s>" % key)
            task_spec._input[key] = {_TYPE: _OUTPUT_FILENAME}
        self._runtime_params_builder.setOutputFile(key, runtime_path)

    def addLicense(self, license_string, license_tokens):
        """
        Register a requirement of license_tokens tokens for license_string.
        """
        self._task_spec_builder.addLicense(license_string, license_tokens)

    def setDriverReservesCores(self, reserved):
        """
        If passed True, the driver will allocate N slots matching
        -HOST hostname: N processes. This is useful when the driver
        allocates parallel slots on one node so it can resubmit jobs to
        localhost. If no N is specified, assume to be 1.

        This is an optimization to avoid resubmission back to the queue
        if we know we are running a set of parallel jobs with the same
        CPU requirements.
        """
        self._task_spec_builder.setDriverReservesCores(reserved)

    def setJobUsesTPP(self, uses_tpp):
        """
        If uses_tpp is True, this job will reserve the number of cores
        specified from -TPP (Threads Per Process) on the command line.

        Currently we use -TPP N to mean two different things:
        1) Reserve N processors for the job being launched.
        2) Reserve N processors for each subjob of the job being launched.
        Case 1 is the case that needs uses_tpp=True.
        """
        self._task_spec_builder.setJobUsesTPP(uses_tpp)
def is_file_not_found_exception(exc: OSError) -> bool:
    """
    Return True if the exception indicates that a file does not exist.
    """
    if exc.errno == errno.ENOENT:
        return True
    # On Windows an invalid filename surfaces as winerror 123
    # (ERROR_INVALID_NAME); the attribute is absent on other platforms,
    # in which case getattr yields None and this is False.
    return getattr(exc, 'winerror', None) == 123
def are_same_file(path1: pathlib.Path, path2: pathlib.Path) -> bool:
    """
    Compare if two file paths are the same, by examining the filesystem. This
    does not raise an exception if either path does not exist.
    """
    try:
        same = path1.samefile(path2)
    except OSError as err:
        # A missing file simply means the paths cannot refer to the same
        # file; any other filesystem error is re-raised.
        if not is_file_not_found_exception(err):
            raise
        return False
    return same
def resolve_to_abspath(path: pathlib.Path) -> pathlib.Path:
    """
    Attempt resolution to absolute path. If the provided path does not exist,
    return original path.
    """
    try:
        resolved = path.resolve()
    except OSError as err:
        # Only swallow "file not found"-style failures; anything else
        # (permissions, I/O errors) propagates.
        if is_file_not_found_exception(err):
            return path
        raise
    return resolved
def _validate_output_in_subdir(output_filename: str):
    """
    Raise a TaskSpecificationError if output_filename would be created outside
    the cwd.

    :param output_filename: output path to validate, absolute or relative
        to the current working directory.
    :raises TaskSpecificationError: if no ancestor of the (resolved) path
        is the current working directory.
    """
    # NOTE: the previous implementation also checked "if not ancestor:
    # break", but pathlib.Path defines no __bool__, so Path instances are
    # always truthy and that branch was unreachable; it has been removed.
    cwd = pathlib.Path.cwd()
    ancestor = pathlib.Path(output_filename)
    # Walk up the ancestor chain until we meet the launch directory (valid)
    # or the filesystem root (outside the launch directory).
    while True:
        ancestor = resolve_to_abspath(ancestor)
        if are_same_file(ancestor, cwd):
            return
        elif ancestor == ancestor.parent:
            # Reached the filesystem root without meeting cwd.
            break
        ancestor = ancestor.parent
    raise TaskSpecificationError(
        f"Output filename {output_filename} cannot be registered outside "
        f"of the launch directory {cwd}")