r"""
=================
Job incorporation
=================
If a CmdJobTask's job registers an incorporation file with job control, the file
can be accessed after the task is done via output.incorporation_file.
ComboJobTasks support the option to specify an incorporation file in the backend
of the task using JobOutput. Example::
class MyJobTask(jobtasks.ComboJobTask):
output: jobtasks.JobOutput
def mainFunction(self):
self.output.incorporation_file = 'foo.maegz' # In backend
Specifying an incorporation file in the frontend of the task will have no
effect and thus should never be done.
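
The registered file can then be read back in the frontend once the task
finishes. A minimal sketch (assumes ``MyJobTask`` from above and that the task
is launched from a script rather than from Maestro)::

    task = MyJobTask()
    task.start()
    task.wait()
    print(task.output.incorporation_file)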
"""
import enum
import os
import typing
from typing import List
import schrodinger
from schrodinger.job import jobcontrol
from schrodinger.job import jobhandler
from schrodinger.job import jobwriter
from schrodinger.job import launchapi
from schrodinger.job import launchparams
from schrodinger.models import json
from schrodinger.models import jsonable
from schrodinger.models import parameters
from schrodinger.tasks import _filepaths
from schrodinger.tasks import cmdline
from schrodinger.tasks import hosts
from schrodinger.tasks import tasks
from schrodinger.tasks.hosts import strip_gpu_from_localhost
# Imported so other modules can access tasks features here.
from schrodinger.tasks.tasks import AFTER_TASKDIR # noqa: F401
from schrodinger.tasks.tasks import AUTO_TASKDIR # noqa: F401
from schrodinger.tasks.tasks import BEFORE_TASKDIR
from schrodinger.tasks.tasks import FINISHED_STATUSES # noqa: F401
from schrodinger.tasks.tasks import TEMP_TASKDIR # noqa: F401
from schrodinger.tasks.tasks import TaskFailure # noqa: F401
from schrodinger.tasks.tasks import TaskFile
from schrodinger.tasks.tasks import TaskFolder # noqa: F401
from schrodinger.tasks.tasks import TaskKilled
from schrodinger.tasks.tasks import postprocessor # noqa: F401
from schrodinger.tasks.tasks import preprocessor
from schrodinger.utils import fileutils
from schrodinger.utils import mmutil
maestro = schrodinger.get_maestro()


def is_jobtask(task):
    """
    Utility function to check if an object is a jobtask.
    """
    return isinstance(task, _AbstractJobMixin)
#===============================================================================
# Job Config - Hosts
#===============================================================================
class AllowedHostTypes(jsonable.JsonableEnum):
    CPU_ONLY = enum.auto()
    GPU_ONLY = enum.auto()
    CPU_AND_GPU = enum.auto()


class HostParam(parameters.Param):
    DataClass = hosts.Host

    def __init__(self,
                 default_value='localhost',
                 allowed_types=AllowedHostTypes.CPU_ONLY,
                 *args,
                 **kwargs):
        self.allowed_types = allowed_types
        super().__init__(default_value, *args, **kwargs)


def get_hosts():
    return hosts.get_hosts(excludeGPGPUs=False)


def get_default_host(allowed_host_types=AllowedHostTypes.CPU_AND_GPU):
    """
    Get the default host for a job to run on, which will be the jobhost if
    this function is called from within a jobcontrol backend, or the
    localhost otherwise. If the specified host type is GPU_ONLY, and neither
    the localhost nor the jobhost has GPUs, then the returned host will be
    the first GPU-enabled host returned from `get_hosts()`.
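
    Example (a sketch; assumes at least one GPU-capable host is configured)::

        host = get_default_host(AllowedHostTypes.GPU_ONLY)
        if host is None:
            print('No GPU-capable host available.')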
"""
    default_host = hosts.get_host_by_name('localhost')
    if allowed_host_types is AllowedHostTypes.GPU_ONLY and not default_host.num_gpus:
        default_host = next((host for host in get_hosts() if host.num_gpus),
                            None)
    return default_host
"""
NOT_SUPPORTED is a sentinel that can be used to signal that a host or
subjob are not supported for a particular host settings. For example::
class MyJobConfig(JobConfig):
host_settings = HostSettings(allowed_host_types=NOT_SUPPORTED)
or:
class MyJobConfig(JobConfig):
host_settings = HostSettings(num_subjobs=NOT_SUPPORTED)
"""
NOT_SUPPORTED = None


class HostSettings(parameters.CompoundParam):
    """
    :cvar HOST_PLACEHOLDER_ARGS: Placeholders to pass into the -HOST argument
        when no host is available based on the currently allowed host types.
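
    Example of building the -HOST argument (a sketch; the exact string depends
    on the hosts defined in the local host file)::

        settings = HostSettings(num_subjobs=4)
        settings.toCmdArg()  # e.g. 'localhost:4'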
"""
    host = HostParam()
    num_subjobs: int = None
    allowed_host_types: AllowedHostTypes = AllowedHostTypes.CPU_ONLY

    CPU_PLACEHOLDER_ARG = '<CPU-host-placeholder>'
    GPU_PLACEHOLDER_ARG = '<GPU-host-placeholder>'
    CPU_AND_GPU_PLACEHOLDER_ARG = '<host-placeholder>'
    HOST_PLACEHOLDER_ARGS = {
        AllowedHostTypes.CPU_ONLY: CPU_PLACEHOLDER_ARG,
        AllowedHostTypes.GPU_ONLY: GPU_PLACEHOLDER_ARG,
        AllowedHostTypes.CPU_AND_GPU: CPU_AND_GPU_PLACEHOLDER_ARG
    }

    def toCmdArg(self):
        if self.host is None:
            return self.getHostPlaceholderArg()
        host_arg = self.host.name
        if self.num_subjobs is not None:
            host_arg += ':' + str(self.num_subjobs)
        return strip_gpu_from_localhost(host_arg)

    def initializeValue(self):
        if self.allowed_host_types is not None:
            self.host = get_default_host(self.allowed_host_types)

    @json.adapter(version=55039)
    def adapter55040(self, json_dict):
        if json_dict['host'] is not None:
            json_dict['host'] = json_dict['host']['name']
        return json_dict

    def toJsonImplementation(self):
        json_dict = super().toJsonImplementation()
        if self.host is not None:
            json_dict['host'] = self.host.name
        return json_dict

    @classmethod
    def fromJsonImplementation(cls, json_dict):
        host_name = json_dict.pop('host')
        host_settings = super().fromJsonImplementation(json_dict)
        if host_name:
            host_settings.host = hosts.get_host_by_name(host_name)
        else:
            host_settings.host = None
        return host_settings

    def getHostPlaceholderArg(self) -> str:
        """
        Return the host placeholder argument for the currently allowed host
        types.
        """
        return self.HOST_PLACEHOLDER_ARGS.get(self.allowed_host_types, '')
#===============================================================================
# Job Config - Incorporation
#===============================================================================
class IncorporationMode(jsonable.JsonableEnum):
    APPEND = 'append'
    APPENDINPLACE = 'appendinplace'
    REPLACE = 'replace'
    IGNORE = 'ignore'


class IncorporationParam(parameters.EnumParam):
    DEFAULT_ALLOWED_MODES = (
        IncorporationMode.APPEND,
        IncorporationMode.APPENDINPLACE,
        IncorporationMode.IGNORE,
    )

    def __init__(self, *args, allowed_modes=DEFAULT_ALLOWED_MODES, **kwargs):
        super().__init__(IncorporationMode, *args, **kwargs)
        self.allowed_modes = allowed_modes


INCORPORATION_MODE_MAP = {
    IncorporationMode.APPEND: launchparams.ProjectDisposition.APPEND,
    IncorporationMode.APPENDINPLACE:
        launchparams.ProjectDisposition.APPENDINPLACE,
    IncorporationMode.IGNORE: launchparams.ProjectDisposition.IGNORE,
}
#===============================================================================
# Job Config
#===============================================================================
class JobConfig(parameters.CompoundParam):
    """
    Subclass JobConfig to customize what job settings are available for a
    given jobtask. To disable an option, set an ordinary (non-param) class
    variable with value None for that option.

    Subclasses may add any arbitrary options as desired; it is the
    responsibility of the task to handle those options.
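
    Example (a sketch of a config that enables incorporation and disables
    the host option)::

        class MyJobConfig(JobConfig):
            incorporation = IncorporationParam(IncorporationMode.APPEND)
            host_settings = None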
"""
    viewname: str = None  # A default value will be assigned by the task
    jobname: str
    host_settings: HostSettings
    incorporation = None  # Override with an IncorporationParam to enable

    @json.adapter(version=52085)
    def _adaptHost(self, json_dict):
        """
        In 52085, we switched from defining host at the top level of
        JobConfig to inside a HostSettings param.
        """
        json_dict['host_settings'] = {'host': json_dict.pop('host')}
        return json_dict


def job_config_factory(allowed_host_types=AllowedHostTypes.CPU_ONLY,
                       default_incorp_mode=None,
                       supports_subjobs=False,
                       viewname=None):
    """
    Generate JobConfig objects with typical options.

    :param allowed_host_types: Whether this job accepts CPU hosts, GPU hosts,
        or both. Pass None to disable remote hosts (always run on localhost).
    :type allowed_host_types: AllowedHostTypes or None
    :param default_incorp_mode: The default disposition. Pass None for jobs
        that do not incorporate at all.
    :type default_incorp_mode: IncorporationMode or None
    :param supports_subjobs: Whether this job can be split into subjobs.
    :type supports_subjobs: bool
    :param viewname: What viewname should be used for this type of job.
    :type viewname: str or None
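
    Example (a sketch of assigning the generated config in a task class)::

        class MyTask(ComboJobTask):
            job_config = job_config_factory(
                allowed_host_types=AllowedHostTypes.CPU_AND_GPU,
                supports_subjobs=True)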
"""
    # Have to give `viewname` a different variable name before assigning it
    # inside of `NewJobConfig`, otherwise you get a weird NameError.
    default_viewname = viewname
    default_allowed_host_types = allowed_host_types

    class NewJobConfig(JobConfig):
        if default_incorp_mode is not None:
            incorporation = IncorporationParam(default_incorp_mode)
        viewname: str = default_viewname
        host_settings = HostSettings(
            allowed_host_types=default_allowed_host_types)

        def initializeValue(self):
            super().initializeValue()
            if not allowed_host_types:
                self.host_settings.host = None
            if supports_subjobs:
                self.host_settings.num_subjobs = 0

    return NewJobConfig()
#===============================================================================
# Job Task Execution Mixins
#===============================================================================
class JobOutput(parameters.CompoundParam):
    """
    Base class for jobtask output.
    """
    incorporation_file: TaskFile = None


class _CmdJobTaskOutput(JobOutput):
    """
    Base class for CmdJobTask output. All params defined thus far are intended
    to be read-only, as CmdJobTask sets them during postprocessing.

    Any new params defined by subclasses are safe to use as normal.
    """
    output_files: List[TaskFile]
    log_file: TaskFile = None


class SetJobRuntimeError(RuntimeError):
    """
    An error raised while trying to set a job on a task.
    """


class _AbstractJobMixin(parameters.CompoundParamMixin):
    """
    Base class for running tasks via job control.

    Child classes must be mixed in with `AbstractCmdTask`.
    """
    # If PROGRAM_NAME is not overridden, it will default to the class name.
    # Used for specifying the program name field of the job visible in the
    # job monitor.
    PROGRAM_NAME = None

    job_config: JobConfig
    input: parameters.CompoundParam
    job_id = parameters.NonParamAttribute()
    _use_async_jobhandler: bool = False

    @json.adapter(50002)
    def configToJobConfigAdapter(cls, json_dict):
        json_dict['job_config'] = json_dict.pop('config')
        return json_dict

    @classmethod
    def configureParam(cls):
        """
        @overrides: parameters.CompoundParam
        """
        super().configureParam()
        cls.setReference(cls.name, cls.job_config.jobname)

    def initializeValue(self):
        """
        @overrides: parameters.CompoundParam
        """
        super().initializeValue()
        self._write_mode = False
        if self.job_config.viewname is None:
            self.job_config.viewname = type(self).__name__

    def inWriteMode(self):
        return self._write_mode

    @classmethod
    def runFromCmdLine(cls):
        """
        @overrides: tasks.AbstractTask
        """
        return cmdline.run_jobtask_from_cmdline(cls)

    @classmethod
    def _populateClassParams(cls):
        """
        @overrides: tasks.AbstractTask
        """
        cls._convertNestedClassToDescriptor('JobConfig', 'job_config')
        super()._populateClassParams()

    def runCmd(self, cmd):
        """
        @overrides: tasks.AbstractCmdTask
        """
        wrapped_cmd = self._wrapCmd(cmd)
        self._launchCmd(wrapped_cmd)

    def runToCmd(self, skip_preprocessing=False):
        """
        Does the same thing as start() except it doesn't actually launch the
        job; instead it just returns the final job cmd.

        Intended to be used for running jobtasks on JobDJ, which requires a
        job cmd rather than a task.
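
        A sketch of JobDJ usage (assumes `schrodinger.job.queue.JobDJ`
        accepts a command list, and a preprocessed jobtask `task`)::

            from schrodinger.job import queue
            jobdj = queue.JobDJ()
            jobdj.addJob(task.runToCmd())
            jobdj.run()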
"""
        if not skip_preprocessing:
            self.runPreprocessing()
        return self._wrapCmd(self.makeCmd())

    def write(self, skip_preprocessing=False):
        self._write_mode = True
        try:
            cmd = self.runToCmd(skip_preprocessing=skip_preprocessing)
            sh_fname = self.getTaskFilename(self.name + '.sh')
            jobwriter.write_job_cmd(cmd, sh_fname, self.getTaskDir())
        finally:
            self._write_mode = False

    def replicate(self):
        """
        @overrides: tasks.AbstractTask
        """
        old_task = self
        new_task = super().replicate()
        new_task.job_config.setValue(old_task.job_config)
        return new_task

    @property
    def PROGRAM_NAME(self):
        return type(self).__name__

    def _launchCmd(self, cmd):
        if self._use_async_jobhandler:
            JobHandlerClass = jobhandler.AsyncJobHandler
        else:
            JobHandlerClass = jobhandler.JobHandler
        jhandler = JobHandlerClass(cmd,
                                   launch_dir=self.getTaskDir(),
                                   viewname=self.job_config.viewname)
        # Need to keep a reference so that the jobCompleted signal gets
        # emitted:
        self._jhandler = jhandler
        jhandler.jobDownloadFailed.connect(self._onJobDownloadFailed)
        jhandler.jobCompleted.connect(self.__onJobCompletion)
        jhandler.jobProgressChanged.connect(self._onJobProgressChanged)
        if self._use_async_jobhandler:
            jhandler.jobStarted.connect(self._onJobStarted)
            jhandler.jobLaunchFailed.connect(self._onJobLaunchFailed)
        jhandler.launchJob()
        if not self._use_async_jobhandler:
            self._onJobStarted(jhandler.job)

    def _onJobStarted(self, job):
        self.printDebug('job launched:', job.job_id)
        self.job_id = job.job_id

    def _getWrappedCmd(self):
        cmd = self.makeCmd()
        return self._wrapCmd(cmd)

    def _wrapCmd(self, cmd):
        return cmd

    def _onJobProgressChanged(self, job, steps, total_steps, message):
        self.progress = steps
        self.max_progress = total_steps
        self.progress_string = message

    def writeStuZipFile(self):
        # TODO
        raise NotImplementedError

    def _onJobDownloadFailed(self, job, error: str):
        """
        Mark the task as failed if the job fails to download.
        """
        self.printDebug('job download failed')
        self._recordFailure(RuntimeError(error))

    @typing.final
    def __onJobCompletion(self):
        self.printDebug('job completed')
        with self.guard():
            self._onJobCompletion()
        self._finish()

    def _onJobCompletion(self):
        """
        Hook for subclasses to customize behavior on job completion.
        """
        pass

    def _onJobLaunchFailed(self, exception):
        with self.guard():
            raise exception
        if self.failure_info is not None:
            self.status = self.FAILED

    def kill(self):
        """
        @overrides: tasks.AbstractTask
        """
        if self.status is not self.RUNNING:
            raise RuntimeError("Can't kill a task that's not running.")
        # Need to record failure before calling job.cancel(), because it can
        # cause jobmanager.jobCompleted to be emitted
        self._recordFailure(TaskKilled())
        try:
            job = self._jhandler.job
            job.cancel()
            if schrodinger.get_maestro():
                # Non-GUI-blocking wait for the jobCompleted signal
                self.wait()
            else:
                job.wait()
        finally:
            self._finish()

    def stop(self):
        if self.status is not self.RUNNING:
            raise RuntimeError("Can't stop a task that's not running.")
        # Need to record failure before calling job.stop(), because it can
        # cause jobmanager.jobCompleted to be emitted
        self._recordFailure(TaskKilled())
        try:
            job = self._jhandler.job
            if mmutil.feature_flag_is_enabled(mmutil.JOB_SERVER):
                job.stop()
            else:
                job.kill()
            if schrodinger.get_maestro():
                # Non-GUI-blocking wait for the jobCompleted signal
                self.wait()
            else:
                job.wait()
        finally:
            self._finish()

    def setJob(self, job: jobcontrol.Job):
        """
        Use the given `jobcontrol.Job` to incorporate job results into the
        task and run postprocessors. Example::

            task = FooTask()
            cmd = task.runToCmd()
            job = jobcontrol.launch_job(cmd)
            job.wait()
            task.setJob(job)

        If the job has not been downloaded, the task will be set to FAILED
        with a SetJobRuntimeError.

        :param job: `jobcontrol.Job` with results to incorporate into the
            task.
        """
        self._jhandler = jobhandler.JobHandler([])
        self._jhandler.job = job
        self.specifyTaskDir(job.Dir)
        self.name = job.Name
        with self.guard():
            if not job.isDownloaded():
                err_msg = ("Job has not been downloaded. Check job for "
                           f"further info: {self._jhandler.job.JobId}")
                raise SetJobRuntimeError(err_msg)
        self.__onJobCompletion()


class JobBackendCmdMixin(_AbstractJobMixin):
    """
    Base class for running backends that already support job control. Combine
    with an AbstractCmdTask. To use, override `makeCmd`.
    """

    def __init__(self, *args, cmd_list=None, **kwargs):
        super().__init__(*args, **kwargs)
        if cmd_list is not None:
            # If this turns out to be too restrictive, feel free to remove it.
            assert self.__class__ is CmdJobTask
        self._cmd_list = cmd_list

    def makeCmd(self):
        """
        @overrides: tasks.AbstractCmdTask

        Child classes must override.
        """
        if self._cmd_list is not None:
            return self._cmd_list
        else:
            return []

    def _wrapCmd(self, cmd):
        """
        @overrides: tasks._AbstractJobMixin
        """
        cmd.extend(['-JOBNAME', self.name])
        if self.job_config.host_settings.host is not None:
            cmd.extend(['-HOST', self.job_config.host_settings.toCmdArg()])
        if self.job_config.incorporation is not None and not self.inWriteMode():
            cmd.extend(['-DISP', self.job_config.incorporation.value])
        return cmd

    def _onJobCompletion(self):
        """
        @overrides: _AbstractJobMixin
        """
        if not self._jhandler.job.succeeded():
            self._recordFailure(RuntimeError("Job returned nonzero exit code."))
        super()._onJobCompletion()


class _LaunchAPIMixin(_AbstractJobMixin):
    """
    Base class for running python code under job control by wrapping with
    launchapi. Combine with an AbstractCmdTask. To use, override `makeCmd`.
    """
    _input_files: list
    _output_files: list

    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        self._license_reservations = {}

    def runCmd(self, cmd):
        """
        @overrides: tasks._AbstractJobMixin

        Override runCmd to set SCHRODINGER_COMMANDLINE, which is ultimately
        used to set the job.Commandline attribute. Without setting
        SCHRODINGER_COMMANDLINE ourselves, jobcontrol reports an incorrect
        command.

        .. WARNING:: There's a potential race condition when using
            asynchronous job launching which may result in subjobs with
            incorrect Commandlines.
        """
        old_SCHRODINGER_COMMANDLINE = os.environ.get('SCHRODINGER_COMMANDLINE',
                                                     None)
        os.environ['SCHRODINGER_COMMANDLINE'] = ' '.join(cmd)
        try:
            return super().runCmd(cmd)
        finally:
            os.environ.pop('SCHRODINGER_COMMANDLINE')
            if old_SCHRODINGER_COMMANDLINE is not None:
                os.environ[
                    'SCHRODINGER_COMMANDLINE'] = old_SCHRODINGER_COMMANDLINE

    def addLicenseReservation(self, license, num_tokens=1):
        """
        Add a license reservation for this job. This information is used by
        job control to ensure the job is only started once the required
        licenses become available.

        In a preprocessor (i.e. before launching the backend), a reservation
        should be added for each license that will be checked out directly by
        that backend. Example::

            class GlideTask(ComboJobTask):

                @preprocessor
                def _reserveGlideLicense(self):
                    # Reserve a Glide license.
                    self.addLicenseReservation(license.GLIDE_MAIN)

                def mainFunction(self):
                    # Check out the Glide license
                    lic = license.License(license.GLIDE_MAIN)
                    # ... Do computations requiring Glide ...
                    lic.checkin()

        Licenses that will be checked out by subjobs of this job do not need
        reservations added here; subjobs are responsible for their own
        license reservations.

        :param license: a license that will be used by the backend
        :type license: module-constant from schrodinger.utils.license (e.g.
            license.AUTODESIGNER)
        :param num_tokens: number of tokens for this license reservation
        :type num_tokens: int
        """
        self._license_reservations[license] = num_tokens

    def _addInputFile(self, filename):
        """
        Register the given file with job control as an input file, so that
        it gets copied to the job directory when the task starts.

        Note: Only meant to be used within `jobtasks.py`

        :param filename: Input file path.
        :type filename: str
        """
        self._input_files.append(filename)

    def _addOutputFile(self, filename):
        """
        Register the given file with job control as an output file, so that
        it gets copied back to the launch directory after the task completes.

        Note: Only meant to be used within `jobtasks.py`

        :param filename: Output file path.
        :type filename: str
        """
        self._output_files.append(filename)

    def initConcrete(self):
        super().initConcrete()
        self._input_dirs = []

    def addInputDirectory(self, directory):
        """
        Add an input directory to be copied over with the job.
        """
        self._input_dirs.append(directory)

    def wrapCmdInLaunchApi(self, cmd):
        job_spec = self.makeJobSpecFromCmd(cmd)
        launch_parameters = self.makeLaunchParams()
        if self._write_mode:
            # This is a temporary fix to achieve portability of written jobs
            # and can be removed after PANEL-19244
            with fileutils.chdir(self.getTaskDir()):
                full_cmd = jobcontrol._get_job_spec_launch_command(
                    job_spec, launch_parameters, write_output=True)
        else:
            full_cmd = jobcontrol._get_job_spec_launch_command(
                job_spec, launch_parameters)
        return full_cmd

    def makeJobSpecFromCmd(self, cmd):
        cmd = list(map(str, cmd))
        job_builder = launchapi.JobSpecificationArgsBuilder(cmd)
        for license, num_tokens in self._license_reservations.items():
            job_builder.addLicense(license, num_tokens)
        logfilename = self._getLogFilename()
        job_builder.setStderr(logfilename, stream=True)
        job_builder.setStdout(logfilename, stream=True)
        for filename in self._input_files:
            job_builder.setInputFile(filename)
        for dir in self._input_dirs:
            job_builder.setInputDirectory(dir)
        self._output_files = list(set(self._output_files))
        for filename in self._output_files:
            job_builder.setOutputFile(os.path.basename(filename))
        job_builder.setProgramName(self.PROGRAM_NAME)
        return job_builder.getJobSpec()

    def makeLaunchParams(self):
        launch_parameters = launchparams.LaunchParameters()
        if self.job_config.host_settings.host is not None:
            launch_parameters.setHostname(
                self.job_config.host_settings.host.name)
        launch_parameters.setJobname(self.name)
        if self.job_config.host_settings.num_subjobs is not None:
            launch_parameters.setNumberOfSubjobs(
                self.job_config.host_settings.num_subjobs)
        if maestro and self.job_config.incorporation is not None and not \
                self.inWriteMode():
            proj_disp = INCORPORATION_MODE_MAP[self.job_config.incorporation]
            launch_parameters.setMaestroProjectDisposition(proj_disp)
            pt = maestro.project_table_get()
            launch_parameters.setMaestroProjectName(pt.project_name)
        return launch_parameters

    def _wrapCmd(self, cmd):
        """
        @overrides: _AbstractJobMixin
        """
        return self.wrapCmdInLaunchApi(cmd)

    def _getLogFilename(self):
        return self.name + '.log'

    def getLogAsString(self) -> str:
        log_fn = self.getTaskFilename(self._getLogFilename())
        if not os.path.isfile(log_fn):
            return f'Log file not found: {log_fn}'
        with open(log_fn) as log_file:
            return log_file.read()

    @json.adapter(version=55013)
    def adapter55013(self, json_dict):
        json_dict['_input_files'] = json_dict.pop('input_files')
        json_dict['_output_files'] = json_dict.pop('output_files')
        return json_dict

    @json.adapter(version=56046)
    def _adaptIncorporationFile(self, json_dict):
        """
        In 56046, we switched from defining incorporation_file as a top-level
        param to only allowing it inside of `output`.
        """
        if 'incorporation_file' in json_dict.keys():
            json_dict['output'] = {
                'incorporation_file': json_dict.pop('incorporation_file')
            }
        return json_dict


class ComboJobMixin(_LaunchAPIMixin):
    """
    Base class for running python code using the "combo" task pattern.
    Combine with AbstractComboTask. To use, define:

    mainFunction (or, equivalently, backendMain): the python function that
    will be executed in the backend process under job control.
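
    Example (a sketch using the prepackaged ComboJobTask; the Input/Output
    params here are purely illustrative)::

        class TimesTwoTask(jobtasks.ComboJobTask):

            class Input(parameters.CompoundParam):
                value: int = 0

            class Output(parameters.CompoundParam):
                value: int = 0

            def mainFunction(self):
                self.output.value = self.input.value * 2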
"""
    # Jobtask-specific params to serialize in frontend/backend conversions
    _JOBTASK_SERIALIZATION_PARAMS = [
        'job_config', '_use_async_jobhandler', '_input_files', '_output_files'
    ]

    def setJob(self, job: jobcontrol.Job):
        for filename in job.OutputFiles:
            if filename.endswith('_backend.json') and filename.startswith('.'):
                _, self._combo_id, _ = filename.rsplit('_', maxsplit=2)
        super().setJob(job)

    def write(self, skip_preprocessing=False):
        try:
            super().write(skip_preprocessing)
        finally:
            self._regenerateComboId()

    def _writeFrontendJsonFile(self):
        """
        @overrides: AbstractComboTask
        """
        self._addInputFile(_filepaths.get_launch_path(self.json_filename))
        self._addOutputFile(os.path.basename(self.json_out_filename))

        def register_input(path, launchdir):
            if os.path.isdir(path):
                dummy_file = os.path.join(path, ".dummy_file")
                with open(dummy_file, 'w'):
                    pass
                self.addInputDirectory(path)
            else:
                self._addInputFile(path)
            return path

        # Need to do this in preprocessing for now. In PANEL-19244 we will
        # register files in the launchapi layer so we can remove this.
        self._processTaskFilesForFrontendWrite()
        self._assertTaskFileExistence(self.input)
        self._processTaskFiles(self.input, process_func=register_input)
        super()._writeFrontendJsonFile()

    def _copyScriptToBackend(self):
        script_filename = super()._copyScriptToBackend()
        self._addInputFile(script_filename)
        return script_filename

    def makeCmd(self):
        """
        @overrides: tasks.AbstractCmdTask

        Child classes must not override this method.
        """
        cmd = super().makeCmd()
        assert cmd[0] == 'run'
        cmd.pop(0)
        return cmd

    def _getFrontEndJsonArg(self):
        arg = super()._getFrontEndJsonArg()
        return _filepaths.get_launch_path(arg)

    def isBackendMode(self):
        is_backend_mode = self._run_as_backend
        return is_backend_mode

    def getTaskDir(self):
        taskdir = super().getTaskDir()
        if jobcontrol.get_backend():
            # workaround for JOBCON-6136
            return os.path.relpath(taskdir)
        return taskdir

    def getTaskFilename(self, fname):
        task_fname = super().getTaskFilename(fname)
        if jobcontrol.get_backend():
            # workaround for JOBCON-6136
            return os.path.relpath(task_fname)
        return task_fname

    def _onJobCompletion(self):
        """
        @overrides: _AbstractJobMixin
        """
        self._processBackend()
        super()._onJobCompletion()

    def runBackend(self):
        """
        @overrides: AbstractComboTask
        """
        super().runBackend()
        self._registerBackendOutputFiles()

    def _onBackendProgressChanged(self):
        backend = jobcontrol.get_backend()
        backend.setJobProgress(self.progress,
                               self.max_progress,
                               description=self.progress_string)

    def _registerBackendOutputFiles(self):
        """
        Called by the backend to make sure any dynamically added output files
        get registered with job control to be copied back to the launch dir.
        """
        backend = jobcontrol.get_backend()
        if backend is None:
            # Running in-process, usually as a test or debugging.
            return
        for file in self._output_files:
            if isinstance(self.output,
                          JobOutput) and file == self.output.incorporation_file:
                backend.setStructureOutputFile(file)
            else:
                backend.addOutputFile(file)

    def _getSerializationParamNames(self) -> List[str]:
        """
        @overrides AbstractComboTask

        Include jobtask-specific param names in the list of param names to
        serialize.
        """
        return super()._getSerializationParamNames() + \
            self._JOBTASK_SERIALIZATION_PARAMS
#===============================================================================
# Prepackaged Job Task Classes
#===============================================================================
class CmdJobTask(JobBackendCmdMixin, tasks.AbstractCmdTask):
    """
    Class for running backends that already support jobcontrol.

    CmdJobTask can either be subclassed to implement custom input/output
    params and other behavior, or can be instantiated and run directly by
    supplying the optional cmd_list constructor argument. For example::

        task = jobtasks.CmdJobTask(cmd_list=['testapp', '-t', '1'])
        task.start()

    CmdJobTask has a standard output that auto-populates with output files
    from its respective job. Custom output classes must inherit
    CmdJobTask.Output.
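
    Example of a custom output class (a sketch; the `score` param is purely
    illustrative)::

        class MyOutput(jobtasks.CmdJobTask.Output):
            score: float = None
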
    Note that specifying cmd_list will bypass some custom functionality and
    should not be used with CmdJobTask subclasses.
    """
    Output = _CmdJobTaskOutput

    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        if not isinstance(self.output, CmdJobTask.Output):
            msg = (f'Expected {self.__class__.__name__}.Output to inherit'
                   f' CmdJobTask.Output.')
            raise TypeError(msg)

    def _onJobCompletion(self):
        self._populateOutput()
        super()._onJobCompletion()

    def _populateOutput(self):
        """
        Populate output with the files available from the completed or failed
        job.
        """
        try:
            job = self._jhandler.job
        except AttributeError:
            # _jhandler may not exist when running just postprocessors
            return
        struct_file = self.getTaskFilename(job.StructureOutputFile)
        if struct_file and os.path.exists(struct_file):
            self.output.incorporation_file = struct_file
        if job.LogFiles:
            log_file = self.getTaskFilename(job.LogFiles[0])
            self.output.log_file = log_file if os.path.exists(
                log_file) else None
        output_files = [
            self.getTaskFilename(fname) for fname in job.OutputFiles
        ]
        self.output.output_files = [
            fname for fname in output_files if os.path.exists(fname)
        ]
WINDOWS_SEP = '\\'
POSIX_SEP = '/'


class ComboJobTask(ComboJobMixin, tasks.AbstractComboTask):

    #===========================================================================
    # TaskFile Processing
    #===========================================================================
    def _processTaskFilesForFrontendWrite(self):
        # Override parent class behavior to make all paths relative to the CWD
        # instead of the launch dir. This will be fixed in PANEL-19244, where
        # the launchdir will be changed to the taskdir.
        self._assertTaskFileExistence(self.input)
        self._processTaskFiles(self.input,
                               process_func=_filepaths.get_launch_path,
                               dir=os.getcwd())

    def _processTaskFilesForBackendExecution(self):
        self._processTaskFiles(self.input,
                               process_func=_filepaths.get_job_backend_path)
        self._assertTaskFileExistence(self.input)

    def _processTaskFilesForBackendWrite(self):
        # Extends parent class behavior to add a dummy file to all directories
        # because jobserver will not copy empty directories to the backend.
        super()._processTaskFilesForBackendWrite()

        def process_output(path, launchdir):
            if os.path.isdir(path):
                dummy_file = os.path.join(path, ".dummy_file")
                with open(dummy_file, 'w'):
                    pass
            self._addOutputFile(path)
            return path

        self._assertTaskFileExistence(self.output)
        self._processTaskFiles(self.output, process_func=process_output)

    def _processTaskFilesForBackendRehydration(self):
        self._processTaskFiles(self.output,
                               process_func=_filepaths.get_job_output_path)
        self._assertTaskFileExistence(self.output)

    def runBackend(self):
        # Specify the task dir as the cwd since we've already chdir'd into
        # the directory with all the task files
        self.specifyTaskDir(None)
        return super().runBackend()

    def getTaskDir(self):
        if self.isBackendMode():
            return ''
        return super().getTaskDir()

    def _updateFromBackend(self, rehydrated_backend):
        super()._updateFromBackend(rehydrated_backend)
        self._output_files = rehydrated_backend._output_files

    @preprocessor(BEFORE_TASKDIR - 1000)
    def _clearInputAndOutputFiles(self):
        self._input_files.clear()
        self._output_files.clear()