"""
Job runners are subclasses of task runners. See the tasks module for more
general information.
"""
import copy
import enum
import os
import schrodinger
from schrodinger import project
from schrodinger.infra import jobhub
from schrodinger.job import jobcontrol
from schrodinger.job import jobwriter
from schrodinger.Qt import QtCore
from schrodinger.ui.qt.appframework2 import tasks
from schrodinger.ui.qt.appframework2.validation import validator
from schrodinger.utils import fileutils
# Value returned by schrodinger.get_maestro(). Code in this module tests it
# for truthiness and skips the Maestro-only flags/calls when it is falsy.
maestro = schrodinger.get_maestro()
# Unexpanded placeholder used when building commands outside of START mode
SCHRODINGER_VAR = '${SCHRODINGER}'
# Module-wide mutex that a runner takes (via tryLock) before it chdirs into a
# job directory, and releases again when it resets its state.
CHDIR_MUTEX = QtCore.QMutex()
# Warning text shown when CHDIR_MUTEX could not be acquired
CHDIR_LOCKED_TEXT = ('Another panel is submitting a job. Please wait a few '
'seconds and try again.')
# The action a runner is currently performing: starting a job, writing it to
# disk, or writing a STU test.
RunMode = enum.Enum('RunMode', ('START', 'WRITE', 'STU'))
class JobOptions(object):
    """
    Plain container for the options of a job runner. The options describe the
    runner (job type) rather than any individual job; e.g. `host` only says
    whether a host may be chosen for this job type, not which host a given
    run will use.
    """

    def __init__(self):
        """
        Populate every option with its default value.
        """
        self.incorporation = True
        self.default_disp = 'ignore'
        self.host = True
        self.cpus = False
        self.gpus = False
        self.create_job_dir = True

    def toDict(self):
        """
        Return a new dictionary mapping each instance attribute name to its
        current value.

        :rtype: dict
        """
        return {name: getattr(self, name) for name in self.__dict__}
class JobConfig(object):
    """
    Standard configuration values for one particular job run. Which of the
    values are actually used is decided by the associated job options.
    """

    def __init__(self, job_options):
        """
        :param job_options: the options for the associated job type.
        :type job_options: JobOptions
        """
        self.disp = job_options.default_disp
        self.job_options = job_options
        self.jobname = ''
        self.host = 'localhost'
        self.cpus = 1
        self.viewname = 'job'

    def applySettings(self, settings):
        """
        Copy each value in `settings` onto the attribute of the same name.

        Values are applied one at a time, so entries processed before an
        unknown name have already been set when the error is raised.

        :param settings: mapping of attribute name to new value
        :type settings: dict
        :raise KeyError: if a name is not an existing attribute
        """
        for attr_name, new_value in settings.items():
            if hasattr(self, attr_name):
                setattr(self, attr_name, new_value)
                continue
            raise KeyError('Alias not found: %s' % attr_name)

    def hostFlag(self):
        """
        Return the -HOST flag, as "host:cpus" when the job options allow a
        cpu count and as the bare host otherwise.

        :rtype: list
        """
        host_arg = self.host
        if self.job_options.cpus:
            host_arg = '%s:%s' % (self.host, self.cpus)
        return ['-HOST', host_arg]

    def dispFlag(self):
        """
        Return the -DISP flag, or an empty list when maestro is unavailable.

        :rtype: list
        """
        return ['-DISP', self.disp] if maestro else []

    def projFlag(self):
        """
        Return the -PROJ flag for the current project table. The result is an
        empty list when maestro is unavailable or the project table cannot be
        retrieved.

        :rtype: list
        """
        if not maestro:
            return []
        try:
            project_table = maestro.project_table_get()
        except project.ProjectException:
            return []
        return ['-PROJ', project_table.project_name]

    def viewnameFlag(self):
        """
        Return the -VIEWNAME flag, or an empty list when maestro is
        unavailable.

        :rtype: list
        """
        return ['-VIEWNAME', self.viewname] if maestro else []

    def appendFlags(self, cmdlist, run_mode=RunMode.START):
        """
        Extend `cmdlist` in place with the standard configuration flags that
        fit the context and return it. The flags depend on whether the job is
        being started or written and on whether maestro is available.

        :param cmdlist: the original command list
        :type cmdlist: list
        :param run_mode: which action is being taken - start, write, or STU
        :type run_mode: RunMode
        :return: the same list object that was passed in
        :rtype: list
        """
        opts = self.job_options
        # Note: no -HOST flag for STU jobs
        if opts.host and run_mode != RunMode.STU:
            cmdlist.extend(self.hostFlag())
        # The remaining flags only make sense when actually launching
        if run_mode == RunMode.START:
            cmdlist.extend(self.projFlag())
            cmdlist.extend(self.viewnameFlag())
            if opts.incorporation:
                cmdlist.extend(self.dispFlag())
            # Tell job control that launch directory should be removed as
            # well when removing all job files, PANEL-2164:
            if opts.create_job_dir:
                cmdlist.append('-TMPLAUNCHDIR')
        return cmdlist

    def summaryText(self):
        """
        Build the comma-separated summary of the current settings that is
        displayed in the status bar via the updateStatusText method.

        :rtype: str
        """
        opts = self.job_options
        pieces = []
        if opts.host:
            cpu_suffix = ':' + str(self.cpus) if opts.cpus else ''
            pieces.append('Host=' + self.host + cpu_suffix)
        if opts.incorporation:
            pieces.append('Incorporate=' + self.disp)
        return ', '.join(pieces)
#===============================================================================
# Job Wrapper Classes
#===============================================================================
class JobWrapper(tasks.AbstractTaskWrapper):
    """
    Adapts a jobcontrol.Job object to the common task interface used by
    af2.tasks. See tasks.AbstractTaskWrapper for more information.
    """
    TASK_CLASS = jobcontrol.Job

    def __init__(self, job, settings=None, name='', **kwargs):
        """
        All arguments are forwarded to tasks.AbstractTaskWrapper.__init__.

        :param job: the job to wrap
        :type job: jobcontrol.Job
        """
        tasks.AbstractTaskWrapper.__init__(self, job, settings, name, **kwargs)
        self._status = 'None'

    def _syncWithJobRecord(self):
        """
        Replace the wrapped job with the cached job record for its job id.
        The wrapped job is left untouched when the record is missing.
        """
        # Preemptive job ID
        try:
            refreshed = jobhub.get_cached_job(self._task.job_id)
        except jobhub.StdException:
            # Job record is missing, so do not update self._task
            return
        self._task = refreshed

    def isRunning(self):
        """
        :return: True if the (refreshed) job has not completed yet
        :rtype: bool
        """
        self._syncWithJobRecord()
        return not self._task.isComplete()

    def status(self):
        """
        :return: the exit status once the (refreshed) job's status is
            'completed', otherwise the job's current status
        """
        self._syncWithJobRecord()
        job = self._task
        return job.ExitStatus if job.Status == 'completed' else job.Status

    def jobId(self):
        """
        :return: the job id of the wrapped job
        """
        return self._task.job_id
class WrittenJobWrapper(tasks.AbstractTaskWrapper):
    """
    A near-empty wrapper that stands in for a job that was written to disk
    rather than launched.
    """
    # The wrapped "task" is just the filename of the written .sh script
    TASK_CLASS = str

    def isRunning(self):
        """
        A written job never runs, so this is always False.
        """
        return False

    def status(self):
        """
        :return: the constant status string for written jobs
        :rtype: str
        """
        return 'written'

    def jobId(self):
        """
        A written job has no job id, so this is always None.
        """
        return None

    def filename(self):
        """
        :return: the wrapped .sh script filename
        :rtype: str
        """
        return self._task
#===============================================================================
# Runner classes
#===============================================================================
class BaseJobRunner(tasks.AbstractTaskRunner):
    """
    A job runner is a type of task runner that performs its task via launching
    a job under job control.
    """

    def __init__(self, messaging_callback=None, settings_callback=None):
        """
        Both callbacks are forwarded unchanged to
        tasks.AbstractTaskRunner.__init__.
        """
        tasks.AbstractTaskRunner.__init__(
            self,
            messaging_callback=messaging_callback,
            settings_callback=settings_callback)
        # Maps job id -> task wrapper for every added task that has a job id
        self.tasks_by_job_id = {}
        self.run_mode = None
        # Non-empty only while we are chdir'ed into a job dir (see _preFlight)
        self.orig_dir = ''
        options = self.jobOptions()
        self.job_config = JobConfig(options)
        self.job_config.viewname = self.viewname()
        # Fires self.update every 1000 ms while running
        self.job_timer = QtCore.QTimer()
        self.job_timer.setInterval(1000)
        self.job_timer.timeout.connect(self.update)
        self.cached_namelist = os.listdir(".")

    def _getTakenNames(self):
        """
        Returns the cached contents of the cwd as names that are "taken" in the
        sense that using them as job names would result in an overwrite. See
        parent class for more information.
        """
        return self.cached_namelist

    def _preFlight(self):
        """
        In addition to the parent class _preFlight logic, this will also create
        the job directory (prompt for overwrite if necessary and offer to
        abort), and chdir into the job directory. The original directory will be
        stored as self.orig_dir.

        self.orig_dir having a value signifies that CHDIR_MUTEX is held and
        that a chdir may have been performed. It is the responsibility of the
        calling code to call resetState(), which restores the original
        directory, releases the mutex and sets self.orig_dir back to an empty
        string. This is required even when False is returned.

        :return: True if the job may proceed, False otherwise
        :rtype: bool
        """
        if not tasks.AbstractTaskRunner._preFlight(self):
            return False
        if not self.jobOptions().create_job_dir:
            return True
        if not CHDIR_MUTEX.tryLock():
            self.warning(CHDIR_LOCKED_TEXT)
            return False
        try:
            self.orig_dir = os.getcwd()
        except OSError:
            # orig_dir was not set, so resetState() would not release the
            # mutex; release it here to avoid blocking every later job.
            CHDIR_MUTEX.unlock()
            raise
        job_dir = self.createJobDir()
        if not job_dir:
            return False
        os.chdir(job_dir)
        return True

    def start(self):
        """
        Start the job in START mode. The runner state is always reset
        afterwards, whether or not the start succeeded. See parent class for
        details.
        """
        self.run_mode = RunMode.START
        try:
            success = tasks.AbstractTaskRunner.start(self)
        finally:
            self.resetState()
        if success:
            self.status('Job started')

    def postProcess(self, task):
        """
        Download job outputs after the job completes. Tasks without a job id
        are skipped.

        :param task: the finished task wrapper
        """
        super().postProcess(task)
        jobid = task.jobId()
        if jobid:
            job = jobhub.get_cached_job(jobid)
            job.download()

    def resetState(self):
        """
        Clear the run mode and, if _preFlight moved us into a job directory,
        chdir back to the original directory and release CHDIR_MUTEX. Emits
        stateChanged when done.
        """
        self.run_mode = None
        if self.orig_dir:
            try:
                os.chdir(self.orig_dir)
            finally:
                # Always release the lock, even if the original directory
                # has disappeared; otherwise no panel could submit again.
                CHDIR_MUTEX.unlock()
                self.orig_dir = ''
        self.stateChanged.emit()

    @validator()
    def validateJobName(self):
        """
        Validate the name that the next job will use.

        :return: True if valid, otherwise a (False, message) tuple
        """
        jobname = self.nextName()
        if not fileutils.is_valid_jobname(jobname):
            return False, fileutils.INVALID_JOBNAME_ERR % jobname
        return True

    #===========================================================================
    # Monitoring
    #===========================================================================

    def addTask(self, task):
        """
        Register the task, refresh the cached directory listing and index
        the task by its job id (if it has one). See parent class for
        documentation.
        """
        tasks.AbstractTaskRunner.addTask(self, task)
        # Update the cached file list with the new job dir. While chdir'ed
        # into a job dir, the listing must come from the original directory.
        self.cached_namelist = os.listdir(self.orig_dir or ".")
        jobid = task.jobId()
        if jobid:
            self.tasks_by_job_id[jobid] = task

    def findJob(self, jobid):
        """
        Finds a wrapped job by its job id.

        :param jobid: the job id
        :type jobid: str
        :return: the job, or None if no task with that id has been added
        :rtype: JobWrapper
        """
        return self.tasks_by_job_id.get(jobid)

    def viewname(self):
        """
        :return: the viewname used for this runner's jobs, which is
            str(self)
        :rtype: str
        """
        return str(self)

    def update(self):
        """
        Slot method to update the state of the job runner. Stops the polling
        timer once there are no active tasks left.
        """
        self._update()
        if not self.active_tasks:
            self.job_timer.stop()
        self.stateChanged.emit()

    #===========================================================================
    # Job writing
    #===========================================================================

    def write(self):
        """
        Call this to write out a job to be run later.

        :return: True if the job was written, False otherwise
        :rtype: bool
        """
        self.startRequested.emit()
        self.run_mode = RunMode.WRITE
        try:
            if not self._preFlight():
                self.startFailed.emit()
                return False
            # Capture the job dir now; resetState() below chdirs back out.
            job_dir = os.getcwd()
            cmdlist = self._makeCmdList()
            filename = self._writeCmd(cmdlist)
            if not filename:
                self.startFailed.emit()
                return False
        finally:
            self.resetState()
        task = WrittenJobWrapper(filename, settings=self.settings())
        self.addTask(task)
        self.taskStarted.emit(task)
        self.stateChanged.emit()
        self.status('Job written to {}'.format(job_dir))
        return True

    def _writeCmd(self, cmdlist):
        """
        Writes the given cmd to disk as the .sh file for the next job.

        :param cmdlist: the command to write
        :type cmdlist: list
        :return: the name of the file that was written
        :rtype: str
        """
        filename = self.getSHFilename()
        cmd = cmdlist_to_cmd(cmdlist)
        with open(filename, 'w') as f:
            f.write(cmd)
        set_sh_file_flags(filename)
        return filename

    # TODO: make locale independent
    def getSHFilename(self):
        """
        Returns the .sh filename for the next job.
        """
        return os.path.join(self.nextJobDir(), self.nextName() + '.sh')

    def writeSTU(self):
        """
        Writes out a STU test set.

        :return: False if the pre-flight checks fail; otherwise None
        """
        self.run_mode = RunMode.STU
        try:
            # NOTE(review): this chdir happens before _preFlight() records
            # orig_dir, so resetState() does not undo it - confirm that this
            # is intended.
            os.chdir(self.nextJobDir())
            if not self._preFlight():
                return False
            cmdlist = self._makeCmdList()
            self._writeSTU(cmdlist)
        finally:
            self.resetState()

    def _writeSTU(self, cmdlist=None):
        """
        Hook called by writeSTU with the finished command list. The base
        implementation does nothing.

        :param cmdlist: the command list for the STU test
        :type cmdlist: list
        """
        pass

    #===========================================================================
    # Job Directory
    #===========================================================================

    def createJobDir(self):
        """
        Create the directory for the next job. If it already exists, the user
        is asked whether its contents may be deleted.

        :return: the directory name, or False if the user declined to
            overwrite an existing directory
        """
        dirname = self.nextJobDir()
        if os.path.exists(dirname):
            qtext = ('The job directory, %s, already exists.\nWould you like '
                     'to delete its contents and continue?' % dirname)
            overwrite = self.question(qtext, caption='Overwrite contents?')
            if not overwrite:
                return False
            fileutils.force_rmtree(dirname)
        os.mkdir(dirname)
        return dirname

    def nextJobDir(self):
        """
        :return: the directory the next job will use. This is a subdirectory
            named after the next job when the job options request a job dir,
            and the base directory itself otherwise.
        :rtype: str
        """
        if self.orig_dir:  # We've chdir'ed into the jobdir already
            base_dir = self.orig_dir
        else:
            base_dir = os.getcwd()
        if self.jobOptions().create_job_dir:
            return os.path.join(base_dir, self.nextName())
        return base_dir

    #===========================================================================
    # Forming the command
    #===========================================================================

    def makeSchrodingerCmd(self, *args):
        """
        Builds a $SCHRODINGER command string from all the args passed in. The
        resulting string is suitable for use in a cmdlist and formatted for
        use in starting or writing the job depending on self.run_mode. In START
        mode, the $SCHRODINGER environment variable will be expanded. In any
        other mode, it will stay as ${SCHRODINGER} and the path will be
        delimited with linux-style forward slashes.

        Example: self.makeSchrodingerCmd('utilities', 'my_utility') will return
        "${SCHRODINGER}/utilities/my_utility" in WRITE mode.
        """
        if self.run_mode == RunMode.START:
            schrodinger_path = os.environ['SCHRODINGER']
            return os.path.join(schrodinger_path, *args)
        return SCHRODINGER_VAR + '/' + '/'.join(args)

    def getSchrodingerRun(self):
        """
        Returns the correct version of the $SCHRODINGER/run string. This will
        depend on whether the intent is to start or to write the job.
        """
        return self.makeSchrodingerCmd('run')

    def _makeCmdList(self):
        """
        Gets the cmdlist and appends the standard flags. Items that are str
        instances are normalized through str(); items of any other type are
        passed through unchanged.

        :return: a new list holding the complete command
        :rtype: list
        """
        cmdlist = self.makeCmdList()
        cmdlist = self.appendConfigFlags(cmdlist)
        # NOTE(review): an earlier docstring promised a str cast on *every*
        # element, but only str instances are touched here - confirm whether
        # non-str items should be converted as well.
        return [
            str(item) if isinstance(item, str) else item for item in cmdlist
        ]

    def makeCmdList(self):
        """
        Implement this to generate a cmdlist. This cmdlist will be used for
        write functionality.

        :raise NotImplementedError: always, unless overridden
        """
        raise NotImplementedError()

    #===========================================================================
    # Job options - options for job runner
    #===========================================================================

    def setupJobOptions(self, options):
        """
        Override this to set the job options for this job. The options is passed
        in by the framework. Modify and return the options object. The options
        object will determine, for example, what the config dialog should look
        like. For example::

            options.incorporation = False
            options.create_job_dir = False
            return options

        If this method is not overridden, default options will be used.

        :param options: the options object to be customized.
        :type options: JobOptions
        """
        return options

    def jobOptions(self):
        """
        :return: a fresh JobOptions object after customization by
            setupJobOptions
        :rtype: JobOptions
        """
        options = JobOptions()
        return self.setupJobOptions(options)

    #===========================================================================
    # Job config - options for launching the next job
    #===========================================================================

    def setConfig(self, config):
        """
        Adopt `config` as the configuration for the next job. If its job name
        differs from the name the runner would use next, it is applied as a
        custom name. The status text is refreshed afterwards.

        :param config: the new configuration
        :type config: JobConfig
        """
        self.job_config = config
        if config.jobname != self.nextName():
            self.setCustomName(config.jobname)
        self.updateStatusText()

    def getNextConfig(self):
        """
        :return: a deep copy of the runner's configuration, with the job name
            and viewname brought up to date first
        :rtype: JobConfig
        """
        config = self.job_config
        config.jobname = self.nextName()
        config.viewname = self.viewname()
        # Make a copy so that modifying the return value will not affect the
        # runner directly.
        return copy.deepcopy(config)

    def appendConfigFlags(self, cmdlist):
        """
        Append the standard configuration flags for the current run mode to
        `cmdlist` and return it.

        :param cmdlist: the command list to extend
        :type cmdlist: list
        """
        config = self.getNextConfig()
        return config.appendFlags(cmdlist, run_mode=self.run_mode)

    def setCustomName(self, name):
        """
        Set a custom job name and mirror the resulting next name into the
        job configuration. See parent class for details.
        """
        tasks.AbstractTaskRunner.setCustomName(self, name)
        self.job_config.jobname = self.nextName()

    def updateStatusText(self):
        """
        Show the summary of the next job's configuration as the status text.
        """
        config = self.getNextConfig()
        self.status(config.summaryText(), timeout=0)
class CmdJobRunner(BaseJobRunner):
    """
    The basic job runner for setting up and running a command line job. Jobs
    are launched through jobcontrol.launch_job().
    """

    def _launchCmd(self, cmd):
        """
        Launch `cmd` under job control and return whatever
        jobcontrol.launch_job returns.

        :param cmd: the complete command list
        :type cmd: list
        """
        return jobcontrol.launch_job(cmd)

    def _start(self):
        """
        Build the command, launch it, start the polling timer and return the
        wrapped job. If building or launching raises, startFailed is emitted
        and the exception is re-raised.

        :rtype: JobWrapper
        """
        try:
            job = self._launchCmd(self._makeCmdList())
        except BaseException:
            # Signal the failure to listeners, then let the error propagate
            self.startFailed.emit()
            raise
        self.job_timer.start()
        wrapped_job = JobWrapper(job, settings=self.settings())
        if maestro and job:
            maestro.job_started(job.JobId)
        return wrapped_job

    def makeCmdList(self):
        """
        The one method a concrete cmd job runner has to implement. It should
        simply return the complete cmd list of the job to launch, without the
        standard job options.

        :raise NotImplementedError: always, unless overridden
        """
        raise NotImplementedError()
#===============================================================================
# Job-related Utility Functions
#===============================================================================
def cmdlist_to_cmd(cmdlist):
    """
    Turn a command list into a single command string by delegating to
    jobwriter.cmdlist_to_cmd. Avoid doing this whenever you possibly can.

    :param cmdlist: a list of commands
    :type cmdlist: list
    :return: str
    """
    command = jobwriter.cmdlist_to_cmd(cmdlist)
    return command
def set_sh_file_flags(filename):
    """
    Delegate to jobwriter.set_sh_file_flags for the given script file.

    :param filename: path of the .sh file
    :type filename: str
    """
    jobwriter.set_sh_file_flags(filename)
def get_first_hostname(host):
    """
    Convert a host string into jobcontrol's hosts list and return the
    hostname of the first entry of that list.

    :type host: string
    :param host:
        Hosts string which determines the string value of the result
        server argument. These values usually come from the configuration
        dialog and have the form "galina" or "galina:1" or
        "galina,monica" or "galina:2,monica:3" or "galina monica" or
        "galina:2 monica:3".

    :rtype: str
    :return: Hostname of the first entry in the jobcontrol hosts list, or
        None if that list is empty.
    """
    # FIXME Multiple hosts are being deprecated per JOBCON-4605
    # This function should be removed once that is implemented.
    host_entries = jobcontrol.host_str_to_list(host)
    if not host_entries:
        return None
    return host_entries[0][0]
def job_belongs_to_panel(jobid, viewname):
    """
    Tell whether the job with the given id was launched under the given panel
    viewname. Incorporation callbacks use this to decide whether a job is
    theirs. A job without a viewname never belongs to any panel.

    :param jobid: jobid for a given job
    :type jobid: str
    :param viewname: viewname corresponding to panel
    :type viewname: str
    :rtype: bool
    """
    job_viewname = jobhub.get_cached_job(jobid).Viewname
    if not job_viewname:
        return False
    return viewname == job_viewname