"""
Module for af2 App classes that run multiple jobs
Copyright Schrodinger, LLC. All rights reserved.
"""
from collections import namedtuple
import os
import schrodinger
from schrodinger.application.desmond import util as demond_util
from schrodinger.application.matsci import configdlgs
from schrodinger.application.matsci import equilibrium_md as emd
from schrodinger.application.matsci import parserutils
from schrodinger.job import jobcontrol
from schrodinger.job import jobwriter
from schrodinger.job import launchparams
from schrodinger.Qt import QtCore
from schrodinger.Qt import QtGui
from schrodinger.ui.qt import forcefield
from schrodinger.ui.qt.appframework2 import af2
from schrodinger.ui.qt.appframework2 import baseapp
from schrodinger.ui.qt.appframework2 import jobs
from schrodinger.utils import fileutils
# Maestro module handle; it is tested for truthiness before every use in this
# module, so it is presumably falsy when not running inside Maestro.
maestro = schrodinger.get_maestro()
# Color used for the "partially succeeded" launch status message
DARK_YELLOW = QtGui.QColor(QtCore.Qt.darkYellow)
# File dialog filter for chemical model system (*.cms) files
CMS_EXT_FILTER = 'Chemical model system files (*cms *cms.gz *cmsgz)'
# Lightweight container pairing a driver module with its command list; see
# MultiJobApp.makeJobSpec() for how it is turned into a job specification.
CMD_DRIVER_DATA = namedtuple('CMD_DRIVER_DATA', ['driver', 'cmd'])
class MultiJobApp(af2.JobApp):
    """
    Subclass of AF2 JobApp that allows launching multiple simultaneous jobs.

    Every panel subclassing this should re-implement getJobCount() and
    getJobSpec() methods. A start method should NOT be implemented.
    """

    # validateJobCount() rejects requests for more jobs than this
    MAX_JOB_COUNT = 100
    # validateJobCount() warns (but allows) requests for more jobs than this
    WARN_JOB_COUNT = 10

    def _startOrWrite(self):
        """
        Over-ridden method from JobApp class. Launches/writes single or
        multiple jobs. The value of self.start_mode determines which to do.

        The outer (master) job directory is created and made the CWD while
        multiJobStart() runs. jobs.CHDIR_MUTEX is held for that period. The
        mutex is released, the CWD restored and the buttons re-enabled even
        if launching raises.
        """
        # TODO: Either move this functionality into AF2, or factor out
        # duplication between this method and base JobApp._startOrWrite().
        if not self._preLaunchValidate():
            return
        njobs = self.getJobCount()

        # Create the outer/master job directory:
        if not jobs.CHDIR_MUTEX.tryLock():
            self.warning(jobs.CHDIR_LOCKED_TEXT)
            return
        try:
            created = self.createJobDir()
        except BaseException:
            # Never leave the mutex locked if job dir creation fails
            jobs.CHDIR_MUTEX.unlock()
            raise
        if created is False:
            # User has cancelled the job start/write; we don't chdir into jobdir
            jobs.CHDIR_MUTEX.unlock()
            return

        s_if_multijob = 's' if njobs > 1 else ''
        if self.start_mode == af2.ONLY_WRITE:
            msg = 'Writing job%s...' % s_if_multijob
        else:
            msg = 'Submitting job%s...' % s_if_multijob
        self.orig_dir = os.getcwd()
        self.status_bar.showMessage(msg)
        start_bn = self.bottom_bar.start_bn
        start_bn.setEnabled(False)
        settings_bn = self.bottom_bar.settings_bn
        settings_bn.setEnabled(False)
        try:
            # Force some QT event processing to ensure these state changes
            # show up in the GUI - PANEL-7556
            self.application.processEvents(
                QtCore.QEventLoop.ExcludeUserInputEvents)
            # chdir inside the try so that a failure here still runs the
            # cleanup in the finally clause (unlock mutex, re-enable buttons)
            os.chdir(self.jobDir())
            ret_list = self.multiJobStart()
        except BaseException:
            self.showLaunchStatus(0, njobs)
            raise
        finally:
            os.chdir(self.orig_dir)
            jobs.CHDIR_MUTEX.unlock()
            start_bn.setEnabled(True)
            settings_bn.setEnabled(True)
            self.orig_dir = ''

        num_succeeded = sum(1 for ret in ret_list if ret)
        self.showLaunchStatus(num_succeeded, njobs)
        if self.start_mode == af2.ONLY_WRITE and num_succeeded == 0:
            # Nothing was written; don't leave the job directory behind
            fileutils.force_rmtree(self.jobDir())
            self.status_bar.showMessage('Failed to write job')
            return
        self.updateJobname()
        if self.start_mode == af2.FULL_START:
            job_list = [job for job in ret_list if job]
            if len(job_list) == 1:
                # If a single job was launched; track it:
                self.last_job = job_list[0]
            else:
                # Job tracking is not supported with multiple (or zero)
                # jobs; stop any previous tracking:
                self.last_job = None
            if maestro:
                # Monitor started jobs:
                for job in job_list:
                    maestro.job_started(job.JobId)

    @af2.validator(-1000)
    def validateJobCount(self):
        """
        Validate the number of jobs requested via getJobCount().

        :return: (False, msg) if fewer than 1 or more than MAX_JOB_COUNT jobs
            are requested; (True, msg) warning if more than WARN_JOB_COUNT
            jobs are requested; True otherwise.
        :rtype: bool or (bool, str)
        """
        njobs = self.getJobCount()
        if njobs < 1:
            return (False, "At least one job must be requested")
        if njobs > self.MAX_JOB_COUNT:
            msg = ("This panel supports at most %i jobs; while "
                   "%i jobs were requested" % (self.MAX_JOB_COUNT, njobs))
            return (False, msg)
        if njobs > self.WARN_JOB_COUNT:
            msg = ("Launching over %i simultaneous jobs with this panel "
                   "is not recommended; you requested %i." %
                   (self.WARN_JOB_COUNT, njobs))
            return (True, msg)
        return True

    def _preLaunchValidate(self):
        """
        Run validation before starting/writing jobs.

        Checks the job name, runs all AF2 validators (stopping on the first
        failure) and then validates the config dialog, if there is one.

        :return: True if validation passed, False if operation should be
            cancelled.
        :rtype: bool
        """
        if not fileutils.is_valid_jobname(self.jobname()):
            msg = fileutils.INVALID_JOBNAME_ERR % self.jobname()
            self.warning(msg)
            return False
        if not self.runValidation(stop_on_fail=True):
            return False
        if self.config_dlg and not self.config_dlg.validate():
            return False
        return True

    def showLaunchStatus(self, num_succeeded, njobs):
        """
        Show a label at the bottom of the panel with status of the launch.

        :param int num_succeeded: Number of jobs that were successfully launched
        :param int njobs: Total number of jobs
        """
        s_if_multijob = 's' if njobs > 1 else ''
        if num_succeeded == 0:
            # All jobs failed
            if self.start_mode == af2.FULL_START:
                msg = 'Failed to start job%s' % s_if_multijob
            else:
                msg = 'Failed to write job%s' % s_if_multijob
            self.status_bar.showMessage(msg)
        elif num_succeeded == njobs:
            # All jobs were launched/written successfully
            if self.start_mode == af2.FULL_START:
                if njobs == 1:
                    msg = "Job started"
                else:
                    msg = "%i jobs started" % njobs
            else:
                if njobs == 1:
                    msg = "Job written"
                else:
                    msg = "%i jobs written" % njobs
            self.status_bar.showMessage(msg, 3000, af2.DARK_GREEN)
        else:
            # Some of the jobs launched, some failed
            if self.start_mode == af2.FULL_START:
                template = "%i jobs started; %i failed to start"
            else:
                template = "%i jobs written; %i failed to write"
            msg = template % (num_succeeded, njobs - num_succeeded)
            self.status_bar.showMessage(msg, 3000, DARK_YELLOW)

    def getJobCount(self):
        """
        Over-ride in the subclass to return the number of jobs that the user
        would like to start. Used for validation.

        :return: the number of jobs that the user requests.
        :rtype: int
        """
        raise NotImplementedError()

    # start() decorator is required for AF2 to show the Run button.
    @af2.appmethods.start()
    def multiJobStart(self):
        """
        Custom "start" method. For each job, creates a job sub-directory,
        cd's into it, and launches the job there. A single job is launched
        directly in the CWD (the master job directory) under the master job
        name.

        :return: list of return values from self.launchFromJobSpec() calls.
            Each value is a Job object or False, in start mode, or True/False
            in write mode.
        :rtype: list
        """
        njobs = self.getJobCount()
        master_jobname = self.jobname()
        if njobs == 1:
            ret = self.launchFromJobSpec(master_jobname, 1)
            return [ret]
        ret_list = []
        # Job numbers are 1-based and appear zero-padded in subjob names
        for job_num in range(1, njobs + 1):
            sub_jobname = '%s-%03d' % (master_jobname, job_num)
            os.mkdir(sub_jobname)
            with fileutils.chdir(sub_jobname):
                ret = self.launchFromJobSpec(sub_jobname, job_num)
            ret_list.append(ret)
        return ret_list

    def getValidatedOPLSDir(self, oplsdir=None):
        """
        Validate the existing oplsdir or get a requested oplsdir from the user's
        selection in the force field selector widget

        :param str oplsdir: The currently requested oplsdir
        :rtype: False, None or str
        :return: False if no valid oplsdir can be found, None if there is no
            custom dir requested, or the path to the custom directory
        """
        # PANEL-8401 has been filed to improve the AF2 infrastructure for using
        # the FF Selector. That case may eventually result in changes here.
        # Detect any forcefield selector if requested
        sanitized_opls_dir = False
        if oplsdir is None:
            child = self.findChild(forcefield.ForceFieldSelector)
            if child:
                if not child.sanitizeCustomOPLSDir():
                    return False
                # The selector already validated its own directory
                sanitized_opls_dir = True
                oplsdir = child.getCustomOPLSDIR()
        # verify the oplsdir method argument's validity and allow using default
        if oplsdir and not sanitized_opls_dir:
            opls_dir_result = self.validateOPLSDir(oplsdir)
            if opls_dir_result == forcefield.OPLSDirResult.ABORT:
                return False
            elif opls_dir_result == forcefield.OPLSDirResult.DEFAULT:
                oplsdir = None
        return oplsdir

    def makeJobSpec(self, sub_jobname, job_num):
        """
        Return job spec from job spec itself or some other data.

        If getJobSpec() returns a CMD_DRIVER_DATA, the job spec is built from
        its driver and command. The umbrella flag and license host are added
        when the config dialog provides a license host.

        :param str sub_jobname: The job name for the subjob
        :param int job_num: The subjob number
        :rtype: `launchapi.JobSpecification`
        :return: JobSpecification object
        :raise SystemExit: On driver.get_job_spec_from_args error
        """
        job_spec = self.getJobSpec(sub_jobname, job_num)
        if isinstance(job_spec, CMD_DRIVER_DATA):
            # Copy so the caller's command list is not modified
            cmd, driver = list(job_spec.cmd), job_spec.driver
            # Panels may not have a config dialog (see _preLaunchValidate)
            lic_host = self.config_dlg.getLicHost() if self.config_dlg else None
            if lic_host:
                cmd += [parserutils.FLAG_MD_UMBRELLA]
            job_spec = driver.get_job_spec_from_args(cmd, license_host=lic_host)
        return job_spec

    def launchFromJobSpec(self, sub_jobname, job_num, oplsdir=None):
        """
        Re-implemented from JobApp; options and behavior are somewhat
        different. Starts or writes the command for the given subjob, using
        the current working directory as the launch directory.

        :param str sub_jobname: The job name for the subjob
        :param int job_num: The subjob number
        :param str oplsdir: Path to OPLS directory
        :return: Job object on successful start; True on successful write,
            False on failure.
        """
        jobdir = os.getcwd()
        # TODO: Remove duplication between this method and JobApp method.
        try:
            job_spec = self.makeJobSpec(sub_jobname, job_num)
        except SystemExit as e:
            self.error('Error launching job {}'.format(e))
            return False
        launch_params = launchparams.LaunchParameters()
        launch_params.setJobname(sub_jobname)
        cd_params = self.configDialogSettings()
        host = None
        if 'host' in cd_params:
            host = cd_params['host']
            launch_params.setHostname(host)
        if 'product_hosts' in cd_params:
            status = self.config_dlg.setLaunchParams(job_spec, launch_params)
            # Error already was shown if status is False
            if status is False:
                return False
        if 'openmpcpus' in cd_params:
            threads = cd_params['threads']
            cpus = cd_params['openmpcpus']
            if threads:
                launch_params.setNumberOfSubjobs(cd_params['openmpsubjobs'])
                if job_spec.jobUsesTPP():
                    launch_params.setNumberOfProcessorsOneNode(threads)
                # NOTE: If the driver is not using the TPP option, but passing
                # to subjobs, this needs to go as part of command in getJobSpec
                # (use _addJaguarOptions)
            else:
                # NOTE: this is the right thing to do for matsci GUIs but
                # maybe be the wrong thing to do for jaguar GUIs, since
                # they may want ONLY the -PARALLEL N option and not also
                # -HOST foo:N as well
                launch_params.setNumberOfSubjobs(cpus)
        elif 'cpus' in cd_params:
            launch_params.setNumberOfSubjobs(cd_params['cpus'])
        if self.runMode() == baseapp.MODE_MAESTRO:
            if 'proj' in cd_params:
                launch_params.setMaestroProjectName(cd_params['proj'])
                # Setting disposition is only valid if we have a project
                if 'disp' in cd_params:
                    launch_params.setMaestroProjectDisposition(
                        cd_params['disp'])
            launch_params.setMaestroViewname(self.viewname)
            if maestro and maestro.get_command_option(
                    "prefer", "enablejobdebugoutput") == "True":
                launch_params.setDebugLevel(2)
        oplsdir = self.getValidatedOPLSDir(oplsdir=oplsdir)
        if oplsdir is False:
            return False
        elif oplsdir:
            launch_params.setOPLSDir(oplsdir)
        launch_params.setDeleteAfterIncorporation(True)
        launch_params.setLaunchDirectory(jobdir)
        # Call private function here because there's not guaranteed a great analog
        # for cmdline launching.
        cmdlist = jobcontrol._get_job_spec_launch_command(job_spec,
                                                          launch_params,
                                                          write_output=True)
        self.writeJobCmd(cmdlist,
                         job_spec=job_spec,
                         launch_params=launch_params)
        if self.start_mode == af2.FULL_START:
            try:
                job = jobcontrol.launch_from_job_spec(
                    job_spec,
                    launch_params,
                    display_commandline=jobwriter.cmdlist_to_cmd(cmdlist))
            except jobcontrol.JobLaunchFailure:
                # NOTE: jobcontrol.launch_from_job_spec() already showed an
                # error dialog to the user by this point.
                return False
            # NOTE: AF2 implementation calls registerJob(job) here. Not needed
            # here as when multiple jobs are launched, job tracking is not
            # possible. Instead we call maestro.job_started() at a later time,
            # in order for the job to show up in monitor panels.
            return job
        elif self.start_mode == af2.ONLY_WRITE:
            return True

    def _getSHFilename(self):
        """
        Return the name of the `*.sh` file that should be written. Assumes that
        the CWD is the job directory for the (sub)job.

        :return: <cwd>/<basename of cwd>.sh
        :rtype: str
        """
        jobdir = os.getcwd()
        jobname = os.path.basename(jobdir)
        return os.path.join(jobdir, jobname + '.sh')
class CmdJobMixin:
    """
    Mixin for multi apps that uses launchJobCmd() instead of launchFromJobSpec()
    """

    @af2.appmethods.start()
    def multiJobStart(self):
        """
        Custom "start" method. For each job, creates a job sub-directory,
        cd's into it, and launches the job there via launchJobCmd(). A single
        job is launched directly in the CWD under the master job name.

        :rtype: list
        :return: list of return values from self.launchJobCmd() calls.
            Each value is a Job object or False, in start mode, or True/False
            in write mode.
        """
        njobs = self.getJobCount()
        master_jobname = self.jobname()
        if njobs == 1:
            cmd = self.getJobCmd(master_jobname, 1)
            ret_list = [self.launchJobCmd(cmd)]
        else:
            ret_list = []
            # Job numbers are 1-based and appear zero-padded in subjob names
            for job_num in range(1, njobs + 1):
                sub_jobname = '%s-%03d' % (master_jobname, job_num)
                os.mkdir(sub_jobname)
                with fileutils.chdir(sub_jobname):
                    cmd = self.getJobCmd(sub_jobname, job_num)
                    ret = self.launchJobCmd(cmd, jobdir=os.getcwd())
                ret_list.append(ret)
        if self.start_mode == af2.ONLY_WRITE:
            # launchJobCmd returns None on ONLY_WRITE mode. Convert it to True
            ret_list = [True if ret is None else ret for ret in ret_list]
        return ret_list
class MultiDesmondJobApp(MultiJobApp):
    """
    Class with functionality for setting up, validating and running multiple
    Desmond jobs.
    """

    def setup(self):
        MultiJobApp.setup(self)
        # Model systems loaded by validateModelLoaded(), one per job
        self._models = []

    def setDefaults(self):
        MultiJobApp.setDefaults(self)
        self._models = []

    def getModel(self, index):
        """
        Return the model system at the specified index.

        :param index: Index of the system to return (0-based)
        :type index: int
        :return: Model system at the specified job index
        :rtype: `cms.Cms`
        """
        return self._models[index]

    def getJobCount(self):
        """
        Return the number of jobs that the user would like to run.

        Outside Maestro, or without an input selector, this is the number of
        loaded models. Otherwise it depends on the input selector state:
        one job per selected entry, one for a file, or one per included entry.

        :return: Number of jobs to run
        :rtype: int
        """
        # Note that subclasses may choose to reimplement this function as needed;
        # this general implementation should work for most cases.
        if not maestro or not hasattr(self, 'input_selector'):
            return len(self._models)
        pt = maestro.project_table_get()
        input_state = self.input_selector.inputState()
        if input_state == self.input_selector.SELECTED_ENTRIES:
            return len(pt.selected_rows)
        elif input_state == self.input_selector.FILE:
            return 1
        else:
            return len(pt.included_rows)

    def getStructFromPtEntry(self):
        """
        Get the first included entry in the Workspace if that entry is one of the
        chosen entries, or the first selected entry if no included entry is
        chosen. "First" means first in project table order.

        :rtype: (`schrodinger.structure.Structure`, string) or (None, None)
        :return: one structure from selected or included entries, the structure
            entry id
        """
        val_st = None
        val_eid = None
        if not maestro:
            return val_st, val_eid
        # File is not supported here, so assert right away
        assert self.input_selector.options['file'] is False
        ptable = maestro.project_table_get()
        input_state = self.input_selector.inputState()
        # Walk the project table's included rows rather than popping from
        # maestro.get_included_entry_ids(): popping mutates the collection and
        # does not yield the *first* included entry.
        if input_state == self.input_selector.SELECTED_ENTRIES:
            # Give priority to the first selected row that is also included
            # in the Workspace
            val_row = next(
                (row for row in ptable.included_rows if row.is_selected), None)
            if val_row is None and ptable.selected_rows:
                # None of the selected rows were included; use the first one:
                val_row = next(iter(ptable.selected_rows))
        else:
            val_row = next(iter(ptable.included_rows), None)
        if val_row is not None:
            val_st = val_row.getStructure()
            val_eid = val_row.entry_id
        return val_st, val_eid

    def launchFromJobSpec(self, sub_jobname, job_num, oplsdir=None):
        """
        See parent class for documentation. Here mainly OPLS directory is
        obtained from structure properties.
        """
        if not oplsdir:
            # NOTE(review): job_num is 1-based (see multiJobStart) while
            # getModel() indexes self._models from 0; confirm that
            # customOPLSDirForModel expects a 1-based job number.
            oplsdir = self.customOPLSDirForModel(job_num)
        return super().launchFromJobSpec(sub_jobname, job_num, oplsdir=oplsdir)

    @af2.validator(-999)
    def validateModelLoaded(self):
        """
        At runtime we check the user's input selection and attempt to load the
        input, ensuring that all specified inputs are valid models.

        :return: True on success, else (False, error message)
        :rtype: bool or (bool, str)
        """
        if not hasattr(self, 'input_selector'):
            # Does not use input_selector for specifying input.
            return True
        try:
            self._models = list(self.input_selector.cmsModels())
        except TypeError as err:
            return False, str(err)
        if not self._models:
            return False, "No valid model systems have been specified."
        return True
class MultiCmdJobApp(CmdJobMixin, MultiDesmondJobApp):
    """
    Class with functionality for setting up, validating and running multiple
    Desmond jobs with GPU subhosts via commands.

    Note: multiJobStart() fires off multiple jobs by launchJobCmd(), which calls
    setupJobCmd() to add HOST and SUBHOST flags.
    """

    def getJobCmd(self, jobname, job_number, cmd=None):
        """
        Must be inherited by subclasses to yield a command list. Subclasses
        build the driver-specific command and pass it in as ``cmd``; this
        method appends -JOBNAME and, if one is requested, -OPLSDIR.

        :param str jobname: The job name
        :param int job_number: the job number
        :param list cmd: The list of command args to this point; it is
            extended in place and also returned
        :rtype: list of str
        :return: a list of the command args for job submission
        :raise NotImplementedError: If ``cmd`` is empty or None
        """
        if not cmd:
            raise NotImplementedError("Subclasses must implement this method")
        cmd += ['-JOBNAME', jobname]
        # Add a custom OPLSDIR if requested
        model_oplsdir = self.customOPLSDirForModel(job_number)
        oplsdir = self.getValidatedOPLSDir(oplsdir=model_oplsdir)
        # NOTE(review): getValidatedOPLSDir() returns False when the user
        # aborts OPLS validation; unlike MultiJobApp.launchFromJobSpec(), the
        # abort is not propagated here and the default OPLS dir is used.
        if oplsdir:
            cmd += ['-OPLSDIR', oplsdir]
        return cmd

    def setupJobCmd(self,
                    cmdlist,
                    auto_add_host=True,
                    auto_add_subhost=True,
                    **kwargs):
        """
        Adds arguments such as HOST, SUBHOST, and GPU flags to cmdlist beyond the
        parent class method if they are set in the config dialog. Settings pre-existing
        in the cmdlist take precedence over the config dialog settings.

        :param cmdlist: the command list
        :type cmdlist: list
        :param auto_add_host: Whether or not to automatically add -HOST flag to
            command when it is not already included.
        :type auto_add_host: bool
        :param auto_add_subhost: Whether or not to automatically add -SUBHOST flag to
            command when it is not already included.
        :type auto_add_subhost: bool
        :return: the result of the parent class setupJobCmd()
        """
        # MultiCmdJobApp has been tested with PerStructDesmondSubhostConfigDialog
        # Please only remove this block after confirming other config dialog working
        if self.config_dlg:
            assert isinstance(self.config_dlg,
                              configdlgs.PerStructDesmondSubhostConfigDialog)
        cd_params = self.configDialogSettings()
        if cd_params.get('gpus') and parserutils.FLAG_GPU not in cmdlist:
            is_full = True
            try:
                rtype_idx = cmdlist.index(emd.FLAG_RUN_TYPE)
            except ValueError:
                pass
            else:
                # Post analysis, line fitting, and replica averaging don't
                # request GPU resources
                is_full = cmdlist[rtype_idx + 1] == emd.FULL_RUN
            if is_full:
                cmdlist += [parserutils.FLAG_GPU]
        maxjobs = cd_params.get('maxjobs')
        subjob_host = cd_params.get('subjob_host')
        if subjob_host and '-SUBHOST' not in cmdlist and auto_add_subhost:
            if maxjobs:
                cmdlist.extend(['-SUBHOST', '%s:%s' % (subjob_host, maxjobs)])
            else:
                cmdlist.extend(['-SUBHOST', subjob_host])
        # Check for the actual flag ('-HOST'); checking 'HOST' never matched
        # and caused a duplicate -HOST to be appended
        if cd_params.get('host') and '-HOST' not in cmdlist and auto_add_host:
            host = cd_params['host']
            # When a subjob host exists, maxjobs limits the subhost instead
            if maxjobs and not subjob_host:
                cmdlist.extend(['-HOST', '%s:%s' % (host, maxjobs)])
            else:
                cmdlist.extend(['-HOST', host])
        return super().setupJobCmd(cmdlist,
                                   auto_add_host=auto_add_host,
                                   **kwargs)