import enum
import os
import re
import warnings
from typing import Union
from schrodinger.utils import mmutil
@enum.unique
class ProjectDisposition(enum.Enum):
    """
    How (and whether) a job's results should be incorporated into a
    Maestro project. The string values are what appears on the cmdline
    (-DISP) and in SCHRODINGER_JOB_DISPOSITION.
    """
    NONE = "None"  # Unspecified
    APPEND = "append"  # append new entries
    APPEND_FIT = "append:fit"  # force a workspace fitting after incorporation
    IGNORE = "ignore"  # don't incorporate structures
    CUSTOM = "custom"  # Incorporate the job with custom incorporation only,
    # as set by maestro.job_incorporation_function_add
    # You probably don't want to use this option, think
    # carefully before doing so.
    ADDTOENTRY = "addtoentry"  # To use this value, you must set by a string to
    # a value like addtoentry_{entry_id} since the
    # entry_id belongs to the current project.

    # The following status MAY be supported by maestro, if you use one of
    # them, please add a comment saying WHAT the purpose of it is. Otherwise,
    # try not to use them.
    APPENDINPLACE = "appendinplace"
    APPENDINPLACE_FIT = "appendinplace:fit"  # force a workspace fitting after incorporation
    APPENDUNGROUPED = "appendungrouped"
    REPLACE = "replace"
    REPLACE_FIT = "replace:fit"  # force a workspace fitting after incorporation
    WORKSPACE = "workspace"
class _MaestroParameters:
    """
    Maestro-specific launch state (project incorporation and panel
    viewname), grouped so LaunchParameters can carry it as one object.
    """

    def __init__(self):
        # Path of the project that should incorporate results (or None).
        self._project_name = None
        # Incorporation mode; a ProjectDisposition enum member.
        self._project_disposition = ProjectDisposition.NONE
        # Entry id used when disposition is ADDTOENTRY; this is the
        # attribute actually read by
        # LaunchParameters.getMaestroProjectDispositionString(). The
        # original initialized a misspelled, never-read name
        # (_project_dispostion_add_to_entry) instead.
        self._project_disposition_entry_id = None
        # Name of the Maestro panel owning the job (or None).
        self._viewname = None
class LaunchParameters:
    """
    These are parameters that can change with each job launch.
    """

    def __init__(self):
        self._hostname = None
        self._maestro_params = _MaestroParameters()
        # Initialize to none, since there is a different meaning in
        # -HOST localhost:1 and -HOST localhost, sometimes we have
        # none as "as many as possible"
        self._nsubjobs = None
        self._nprocessors_many_nodes = 1
        self._nprocessors_one_node = 1
        self._launch_directory = None
        self._jobname = None
        self._save_output = False
        self._save_subjob_output = False
        self._debug_level = 0
        self._delete_launch_dir = False
        self._oplsdir = None
        self._original_hosts = None
        self._driverhost = None
        self._subhost = None
        self._tpp = None

    def setDebugLevel(self, debug_level):
        """
        Set the debugging verbosity for the job.

        :param debug_level: 0 = off, 1 = -DEBUG, 2 = -DDEBUG
        :type debug_level: int
        """
        self._debug_level = debug_level

    def getDebugLevel(self):
        """
        Returns debug level (integer)
        """
        return self._debug_level

    def setDeleteAfterIncorporation(self, delete):
        """
        :param delete: if True, the directory the job was launched from
            is deleted after incorporation
        :type delete: bool
        """
        self._delete_launch_dir = delete

    def setSaveAllJobOutput(self, save):
        """
        :param save: if True, copy back full output directory as zip
        :type save: bool
        """
        self._save_output = save

    def setSaveAllSubJobOutput(self, save):
        """
        :param save: if True, copy back full output directories of subjobs as zip
        :type save: bool
        """
        self._save_subjob_output = save

    def setLaunchDirectory(self, launch_directory):
        """
        Sets cwd for job launch. This avoids a chdir command before calling
        launch potentially allowing for more than one launch from maestro.
        """
        self._launch_directory = launch_directory

    def getLaunchDirectory(self):
        """
        Returns the cwd used for job launch. This avoids a chdir command
        before calling launch, potentially allowing for more than one launch
        from maestro.
        """
        return self._launch_directory

    def _setOriginalHosts(self, cmdline_str):
        """
        Sets the original string from the cmdline for the -HOST entry since
        it may include multiple entries.
        """
        self._original_hosts = cmdline_str

    def _setDriverHostname(self, hostname):
        """
        Sets a DRIVERHOST hostname. This is an artifact of when we supported
        remote and local drivers.

        NOTE: This option is ONLY utilized by toplevel.py since it only is
        used as an environment variable

        :raise RuntimeError: if hostname contains a space, colon, or comma
            (i.e. is not a single hostname)
        """
        if re.search('[ :,].*', hostname):
            raise RuntimeError(f"{hostname} must be a single hostname")
        self._driverhost = hostname

    def _setSubHost(self, hostname):
        """
        -HOST argument typically refers to the HOST that will run the driver.
        This host is used to run subjobs.

        NOTE: This option is ONLY utilized by toplevel.py since it only is
        used as an environment variable
        """
        self._subhost = hostname

    def setHostname(self, hostname):
        """
        Hostname is the name of the host to run the job on. This name
        needs to correspond to something in schrodinger.hosts file or
        DNS-resolveable.

        :param hostname: name of host
        :type hostname: str
        """
        self._hostname = hostname

    def getHostname(self):
        """
        Returns hostname is the name of the host to run the job on. This name
        needs to correspond to something in schrodinger.hosts file or
        DNS-resolveable.

        Note: a DRIVERHOST, if set, takes precedence over the plain hostname.
        """
        if self._driverhost:
            return self._driverhost
        return self._hostname

    def getSubHostName(self) -> Union[str, None]:
        """
        Returns None if -SUBHOST isn't set and the hostname if -SUBHOST is set.
        """
        if self._subhost:
            return self._subhost

    def getJobname(self):
        """
        Return the jobname.
        """
        return self._jobname

    def setJobname(self, jobname):
        """
        Jobname is a non mandatory variable that can be used to indicate
        name of job in job record.

        :param jobname: name of job
        :type jobname: str
        """
        self._jobname = jobname

    def getMaestroViewname(self):
        """
        Viewname is used by a maestro panel to identify a job
        as belonging to a panel.

        :returns: viewname name of panel
        :rtype: str
        """
        return self._maestro_params._viewname

    def setMaestroViewname(self, viewname):
        """
        Viewname is used by a maestro panel to identify a job
        as belonging to a panel.

        :param viewname: name of panel
        :type viewname: str
        """
        # Should we assert this is a str or could be coerced into one?
        # Nearly all python objects can be coerced into strings, but we have
        # a bug if you point this to a instance value because it will change
        # everytime
        self._maestro_params._viewname = viewname

    def getMaestroProjectDisposition(self):
        """
        :returns ProjectDisposition
        """
        return self._maestro_params._project_disposition

    def getMaestroProjectDispositionString(self):
        """
        :returns str matching project disposition, appropriate for cmdline
        """
        disp = self.getMaestroProjectDisposition()
        if disp is not ProjectDisposition.ADDTOENTRY:
            return disp.value
        # ADDTOENTRY is encoded as "addtoentry_{entry_id}" on the cmdline.
        return "{disp}_{entry_id}".format(
            disp=disp.value,
            entry_id=self._maestro_params._project_disposition_entry_id)

    def setMaestroProjectDisposition(self, disp):
        """
        Mark the state of project incorporation.

        :param disp: should we incorporate?
        :type disp: ProjectDisposition (or str form of one, e.g. "append:fit"
            or "addtoentry_12")
        :raise KeyError: if a string is given that does not map to any
            ProjectDisposition
        """
        if not isinstance(disp, ProjectDisposition):
            try:
                disp = ProjectDisposition[disp.upper()]
            except KeyError:
                addtoentry_prefix = ProjectDisposition.ADDTOENTRY.value.upper(
                ) + "_"
                # We can't use ":" in a enum name, so
                # we special case this
                if disp.upper() == "APPEND:FIT":
                    disp = ProjectDisposition.APPEND_FIT
                elif disp.upper() == "APPENDINPLACE:FIT":
                    disp = ProjectDisposition.APPENDINPLACE_FIT
                elif disp.upper() == "REPLACE:FIT":
                    disp = ProjectDisposition.REPLACE_FIT
                elif disp.upper().startswith(addtoentry_prefix):
                    # "addtoentry_{entry_id}": remember the entry id and
                    # normalize to the plain ADDTOENTRY member.
                    entry_id = int(disp[len(addtoentry_prefix):])
                    disp = ProjectDisposition.ADDTOENTRY
                    self._maestro_params._project_disposition_entry_id = entry_id
                else:
                    raise
        # Should we assert this is the enum, or the enum's values?
        self._maestro_params._project_disposition = disp

    def getMaestroProjectName(self):
        """
        Returns associated project for a job, marking which project should do
        the incorporation of results.

        :returns: path to project
        :rtype: str
        """
        return self._maestro_params._project_name

    def setMaestroProjectName(self, project):
        """
        Set associated project for a job, marking which project should do
        the incorporation of results.

        :param project: path to project
        :type project: str
        """
        # Should we assert this path exists?
        self._maestro_params._project_name = project

    def setNumberOfProcessorsManyNodes(self, nprocessors):
        """
        Set number of processors to allocate to a job. These may be across
        multiple nodes. Used by MPI jobs to indicate how many processors the
        queuing system needs to allocate.

        :param nprocessors: number of processors requested for this job
        :type nprocessors: int greater than 0
        """
        self._nprocessors_many_nodes = int(nprocessors)

    def setNumberOfSubjobs(self, nsubjobs):
        """
        Set number of subjobs that a job could run. This is used by workflow jobs
        to indicate that a driver job might run and use subjobs. This value is NOT
        used by the queueing system, but can be accessed from inside the job.

        :param nsubjobs: number of subjobs
        :type nsubjobs: int greater than 0
        """
        self._nsubjobs = int(nsubjobs)

    def getNumberOfProcessorsManyNodes(self):
        """
        Get number of processors to allocate to a job. Used by MPI jobs.
        For example 4 processors = 4 nodes allocated by single jobs.
        """
        return self._nprocessors_many_nodes

    def getNumberOfSubjobs(self):
        """
        Get number of subjobs that a job may run. This value is used by product backend
        (not jobcontrol).
        In a workflow job, 4 process = 4 jobs running simultaneously.
        """
        return self._nsubjobs

    def setNumberOfProcessorsOneNode(self, nprocessors):
        """
        Set number of processors on one node. Useful for OpenMP-type jobs,
        indicates to the queueing system to allocate the requisite number of
        processors on the same node.

        :param nprocessors: number of processors requested for this job
        :type nprocessors: int greater than 0
        """
        self._nprocessors_one_node = int(nprocessors)

    def getNumberOfProcessorsOneNode(self):
        """
        Return number of processors on one node. Useful for OpenMP-type jobs,
        indicates to the queueing system to allocate the requisite number of
        processors on the same node.

        :returns: number of processors requested for this job
        :rtype: int greater than 0
        """
        return self._nprocessors_one_node

    def getTPP(self):
        """
        Return the TPP option set by toplevel.py only.
        """
        return self._tpp

    def setTPP(self, tpp):
        """
        Sets the TPP option on the cmdline. This will not be used by the parent
        job unless jobUsesTPP is set in the job specification.
        """
        self._tpp = tpp

    def setOPLSDir(self, oplsdir):
        """
        Set the custom OPLS force-field directory (-OPLSDIR).
        """
        self._oplsdir = oplsdir

    def getNumberOfQueueSlots(self):
        """
        Return the number of slots we need to request for the queueing system.
        This value current maps to NPROC.

        NOTE(review): the docstring historically claimed "If we don't know
        how many, return None", but both factors default to 1, so this
        always returns an int — confirm intended contract.
        """
        return self._nprocessors_many_nodes * self._nprocessors_one_node

    def verify(self):
        """
        Validate the current parameter state. Warns (DeprecationWarning /
        UserWarning) on discouraged usage and raises RuntimeError for
        invalid combinations. May mutate state: a missing HOST is filled in
        when a SUBHOST was given.

        :raise RuntimeError: if a disposition other than NONE/IGNORE is set
            without a project name
        """
        disp = self._maestro_params._project_disposition
        if disp != ProjectDisposition.NONE and not self._maestro_params._project_name:
            if disp == ProjectDisposition.IGNORE:
                # IGNORE without a project is deprecated but tolerated.
                warnings.warn((
                    "No project was specified, but a disposition of {} was given. "
                    + "You should not be setting disposition without a project."
                ).format(disp), DeprecationWarning)
            else:
                raise RuntimeError(
                    "Setting a project disposition {} without a project name is invalid"
                    .format(disp))
        if self._subhost:
            if not self._hostname:
                # SUBHOST without HOST: fall back to the driver host (or
                # localhost) and warn that this will become an error.
                hostarg = self._driverhost or "localhost"
                warnings.warn((
                    "No HOST was specified, but a SUBHOST was given. A HOST of {} is being used for this job, "
                    +
                    "but this usage will result in an error in future releases."
                ).format(hostarg), DeprecationWarning)
                self._hostname = hostarg
        if self._original_hosts:
            # Multiple entries may be separated by spaces or commas.
            first_host = self._original_hosts.split()[0].split(',')[0]
            if first_host != self._original_hosts:
                warnings.warn((
                    "HOST arguments containing multiple entries (\"{}\") are discouraged. "
                    + "They may not be supported in a future release").format(
                        self._original_hosts), UserWarning)
        if self._subhost:
            first_subhost = self._subhost.split()[0].split(',')[0]
            if first_subhost != self._subhost:
                warnings.warn((
                    "SUBHOST arguments containing multiple entries (\"{}\") are discouraged. "
                    + "They may not be supported in a future release").format(
                        self._subhost), UserWarning)

    def constructHostArg(self):
        """
        Build the -HOST argument value ("host" or "host:nsubjobs").
        Returns None when no hostname is set.
        """
        if self._hostname:
            host = self._hostname
            if self._nsubjobs:
                host += f":{self._nsubjobs}"
            return host

    def convertToJLaunchOptions(self):
        """
        Validate state of Launch Parameters (may throw RuntimeError)
        and return list of cmdline parameters.

        :returns: list of str
        """
        self.verify()
        cmdline = []
        if self.getHostname():
            cmdline.extend(["-HOST", self.getHostname()])
            env = self.convertToEnv()
            cmdline.extend(
                ["-schrodinger_nodelist", env["SCHRODINGER_NODELIST"]])
        if self._maestro_params._project_name:
            cmdline.extend(["-PROJ", self._maestro_params._project_name])
        if self._maestro_params._viewname:
            cmdline.extend(["-VIEWNAME", self._maestro_params._viewname])
        if self.getMaestroProjectDisposition() != ProjectDisposition.NONE:
            cmdline.extend(["-DISP", self.getMaestroProjectDispositionString()])
        if self._nprocessors_one_node > 1:
            cmdline.extend(["-TPP", f"{self._nprocessors_one_node}"])
        if self._oplsdir:
            cmdline.extend(["-OPLSDIR", self._oplsdir])
        if self._save_output:
            cmdline.append("-SAVE")
        if self._save_subjob_output and mmutil.feature_flag_is_enabled(
                mmutil.JOB_SERVER):
            # Check the feature flag, to make sure the new argument doesn't get passed
            # to legacy jobcontrol
            cmdline.append("-SAVESUBJOBS")
        if self._debug_level:
            # -DDEBUG is only valid to toplevel.py, it gets stripped
            # to -DEBUG for jlaunch
            cmdline.append("-DEBUG")
        if self._jobname:
            cmdline.extend(["-name", self._jobname])
        if self._delete_launch_dir:
            cmdline.append("-tmplaunchdir")
        return cmdline

    def convertToEnv(self):
        """
        Converts LaunchParameters to environment. Not all parameters
        are supported. This option should be limited to use by toplevel.py
        if possible.

        :returns: dict of environment variable name -> value
        """
        self.verify()
        # Should we raise if there's a an argument that's not suitable?
        # NJOBS, OPLSDIR ?
        env = {}
        if self._save_output:
            env["SCHRODINGER_SAVE_JOBDIR"] = "yes"
        if self._save_subjob_output:
            env["SCHRODINGER_SAVE_SUBJOBDIR"] = "yes"
        if self._debug_level:
            env["SCHRODINGER_JOB_DEBUG"] = f"{self._debug_level}"
        if self._maestro_params._project_name:
            env["SCHRODINGER_PROJECT"] = self._maestro_params._project_name
        if self._maestro_params._viewname:
            env["SCHRODINGER_VIEWNAME"] = self._maestro_params._viewname
        if self.getMaestroProjectDisposition() != ProjectDisposition.NONE:
            env["SCHRODINGER_JOB_DISPOSITION"] = self.getMaestroProjectDispositionString(
            )
        if self._nprocessors_one_node > 1:
            env["SCHRODINGER_TPP"] = f"{self._nprocessors_one_node}"
        if self._jobname:
            env["SCHRODINGER_JOBNAME"] = self._jobname
        if self._delete_launch_dir:
            env["SCHRODINGER_TMP_LAUNCHDIR"] = "yes"
        if self._oplsdir:
            env["OPLS_DIR"] = self._oplsdir
        if self.getHostname():
            toplevel_host_args = []
            original_hosts = self._original_hosts or self.constructHostArg()
            env["JOBHOST"] = self.getHostname()
            if self._driverhost:
                toplevel_host_args.extend(["-DRIVERHOST", self._driverhost])
            if self._subhost:
                # SUBHOST supersedes the original -HOST list for subjobs.
                env["SCHRODINGER_NODELIST"] = self._subhost
                toplevel_host_args.extend(["-SUBHOST", self._subhost])
            else:
                env["SCHRODINGER_NODELIST"] = original_hosts or ""
                if original_hosts:
                    toplevel_host_args.extend(["-HOST", original_hosts])
            env['TOPLEVEL_HOST_ARGS'] = " ".join(toplevel_host_args)
            env['HOST_ARGS'] = "-HOST {}".format(env["JOBHOST"])
            env['ORIG_HOST_ARGS'] = '-HOST "{}"'.format(
                env["SCHRODINGER_NODELIST"])
        return env

    def consumeCommandLine(self, cmdline):
        """
        This consumes arguments from cmdline. The primary purpose is to consume
        arguments from toplevel $SCHRODINGER/run.

        :returns cmdline with options filtered out.
        """
        output_cmdline = []
        # When True, the current arg is the value of the previous option
        # and has already been consumed.
        skip_arg = False
        for i, arg in enumerate(cmdline):
            if skip_arg:
                skip_arg = False
            elif arg in ("-DETACHED", "-ATTACHED"):
                continue
            elif arg == "-SAVE":
                self.setSaveAllJobOutput(True)
            elif arg == "-SAVESUBJOBS":
                self.setSaveAllSubJobOutput(True)
            # Debugging output
            elif arg in ['-D', '-DEBUG']:
                self.setDebugLevel(1)
                output_cmdline.append("-DEBUG")
            # Debugging output
            elif arg in ['-DD', '-DDEBUG']:
                self.setDebugLevel(2)
                output_cmdline.append("-DEBUG")
            elif arg == "-JOBNAME":
                _require_argument(arg, cmdline, i)
                self.setJobname(cmdline[i + 1])
                skip_arg = True
            elif arg == "-OPLSDIR":
                _require_argument(arg, cmdline, i)
                oplsdir = cmdline[i + 1]
                if oplsdir == 'custom':
                    # keep this rare import here so launchparams doesn't
                    # require an mmlibs license in the general case.
                    from schrodinger.utils import fileutils
                    local_appdata_dir = fileutils.get_local_appdata_dir()
                    oplsdir = os.path.join(local_appdata_dir, 'opls_dir')
                self.setOPLSDir(oplsdir)
                skip_arg = True
            elif arg == '-PROJ':
                _require_argument(arg, cmdline, i)
                self.setMaestroProjectName(cmdline[i + 1])
                skip_arg = True
            elif arg == '-DISP':
                _require_argument(arg, cmdline, i)
                self.setMaestroProjectDisposition(cmdline[i + 1])
                skip_arg = True
            elif arg == '-VIEWNAME':
                _require_argument(arg, cmdline, i)
                self.setMaestroViewname(cmdline[i + 1])
                skip_arg = True
            # Send back an archive of the job directory
            elif arg == '-TMPLAUNCHDIR':
                self.setDeleteAfterIncorporation(True)
            # Host args
            elif arg == "-HOST":
                _require_argument(arg, cmdline, i)
                host_arg = cmdline[i + 1].strip("\"'")
                self._setOriginalHosts(host_arg)
                if mmutil.feature_flag_is_enabled(mmutil.JOB_SERVER):
                    if "," in host_arg or " " in host_arg.strip():
                        raise SyntaxError(
                            f"Only a single -HOST <host> argument can be used, the currently supplied argument is {host_arg}"
                        )
                # Apparently, these are valid
                # -HOST myhost:2,myotherhost
                # -HOST myhost:2 myotherhost
                # -HOST myhost,myotherhost
                first_host = host_arg.split()[0].split(',')[0]
                if ":" in first_host:
                    first_host, nproc = first_host.split(":")
                    self.setNumberOfSubjobs(nproc)
                self.setHostname(first_host)
                skip_arg = True
            elif arg == "-DRIVERHOST":
                _require_argument(arg, cmdline, i)
                self._setDriverHostname(cmdline[i + 1])
                skip_arg = True
            elif arg == "-SUBHOST":
                _require_argument(arg, cmdline, i)
                self._setSubHost(cmdline[i + 1])
                skip_arg = True
            elif arg == "-MPICORES":
                _require_argument(arg, cmdline, i)
                self.setNumberOfProcessorsManyNodes(cmdline[i + 1])
                skip_arg = True
            else:
                # -TPP is recorded but deliberately passed through (both
                # the flag and its value stay on the output cmdline).
                if arg == "-TPP":
                    _require_argument(arg, cmdline, i)
                    self.setTPP(cmdline[i + 1])
                output_cmdline.append(arg)
        return output_cmdline
[docs]class ArgumentParserError(Exception):
pass
def _require_argument(arg_name, cmdline, index):
if index + 1 >= len(cmdline):
raise ArgumentParserError(f"Argument {arg_name} requires an argument")