Source code for schrodinger.application.matsci.multiapp

"""
Module for af2 App classes that run multiple jobs

Copyright Schrodinger, LLC. All rights reserved.
"""

from collections import namedtuple
import os

import schrodinger
from schrodinger.application.desmond import util as demond_util
from schrodinger.application.matsci import configdlgs
from schrodinger.application.matsci import equilibrium_md as emd
from schrodinger.application.matsci import parserutils
from schrodinger.job import jobcontrol
from schrodinger.job import jobwriter
from schrodinger.job import launchparams
from schrodinger.Qt import QtCore
from schrodinger.Qt import QtGui
from schrodinger.ui.qt import forcefield
from schrodinger.ui.qt.appframework2 import af2
from schrodinger.ui.qt.appframework2 import baseapp
from schrodinger.ui.qt.appframework2 import jobs
from schrodinger.utils import fileutils

# Handle returned by schrodinger.get_maestro(). Code in this module tests its
# truthiness before making any Maestro call, so it is expected to be falsy
# when running outside of Maestro.
maestro = schrodinger.get_maestro()

# Status bar color used by MultiJobApp.showLaunchStatus() when only some of
# the requested jobs were launched/written successfully.
DARK_YELLOW = QtGui.QColor(QtCore.Qt.darkYellow)

# Filter string for chemical model system files (presumably for file dialogs;
# it is not used within this module - confirm against callers).
CMS_EXT_FILTER = 'Chemical model system files (*cms *cms.gz *cmsgz)'

# Alternative return type for getJobSpec(): MultiJobApp.makeJobSpec() converts
# it into a job specification via driver.get_job_spec_from_args(cmd, ...).
CMD_DRIVER_DATA = namedtuple('CMD_DRIVER_DATA', 'driver cmd')


class MultiJobApp(af2.JobApp):
    """
    Subclass of AF2 JobApp that allows launching multiple simultaneous jobs.

    Every panel subclassing this should re-implement the getJobCount() and
    getJobSpec() methods. A start method should NOT be implemented; the
    decorated multiJobStart() method of this class takes that role.
    """

    def _startOrWrite(self):
        """
        Overridden method from the JobApp class. Launches/writes single or
        multiple jobs. The value of self.start_mode determines which to do.
        """
        # TODO: Either move this functionality into AF2, or factor out
        # duplication between this method and base JobApp._startOrWrite().
        if not self._preLaunchValidate():
            return

        njobs = self.getJobCount()

        # Create the outer/master job directory:
        if not jobs.CHDIR_MUTEX.tryLock():
            self.warning(jobs.CHDIR_LOCKED_TEXT)
            return
        if self.createJobDir() is False:
            # User has cancelled the job start/write; we don't chdir into jobdir
            jobs.CHDIR_MUTEX.unlock()
            return

        s_if_multijob = 's' if njobs > 1 else ''
        if self.start_mode == af2.FULL_START:
            msg = 'Submitting job%s...' % s_if_multijob
        elif self.start_mode == af2.ONLY_WRITE:
            msg = 'Writing job%s...' % s_if_multijob

        self.orig_dir = os.getcwd()
        self.status_bar.showMessage(msg)
        start_bn = self.bottom_bar.start_bn
        start_bn.setEnabled(False)
        settings_bn = self.bottom_bar.settings_bn
        settings_bn.setEnabled(False)
        # Force some QT event processing to ensure these state changes show up
        # in the GUI - PANEL-7556
        self.application.processEvents(QtCore.QEventLoop.ExcludeUserInputEvents)

        try:
            # The chdir is done inside the try block so that a failure to
            # enter the job directory still releases the chdir mutex and
            # re-enables the buttons in the finally clause below.
            os.chdir(self.jobDir())
            ret_list = self.multiJobStart()
        except BaseException:
            # Report the failure in the status bar, then let the exception
            # propagate unchanged.
            self.showLaunchStatus(0, njobs)
            raise
        finally:
            os.chdir(self.orig_dir)
            jobs.CHDIR_MUTEX.unlock()
            start_bn.setEnabled(True)
            settings_bn.setEnabled(True)
            self.orig_dir = ''

        num_succeeded = sum(1 for ret in ret_list if ret)
        self.showLaunchStatus(num_succeeded, njobs)

        if self.start_mode == af2.ONLY_WRITE and num_succeeded == 0:
            # Nothing was written; remove the now useless job directory
            fileutils.force_rmtree(self.jobDir())
            self.status_bar.showMessage('Failed to write job')
            return

        self.updateJobname()

        if self.start_mode == af2.FULL_START:
            job_list = [job for job in ret_list if job]
            if not job_list or len(job_list) > 1:
                # Job tracking is not supported with multiple jobs; stop
                # any previous tracking:
                self.last_job = None
            else:
                # If a single job was launched; track it:
                self.last_job = job_list[0]

            if maestro:
                # Monitor started jobs:
                for job in job_list:
                    maestro.job_started(job.JobId)

    @af2.validator(-1000)
    def validateJobCount(self):
        """
        Validate the number of jobs returned by getJobCount().

        :rtype: bool or (bool, str)
        :return: False if fewer than one job is requested (an error is shown
            via self.error()), (False, msg) if more than 100 jobs are
            requested, (True, msg) if more than 10 jobs are requested, and
            True otherwise.
        """
        njobs = self.getJobCount()
        if njobs < 1:
            self.error("At least one job must be requested")
            return False
        if njobs > 100:
            msg = ("This panel supports at most 100 jobs; while "
                   "%i jobs were requested" % njobs)
            return (False, msg)
        if njobs > 10:
            msg = ("Launching over 10 simultaneous jobs with this panel "
                   "is not recommended; you requested %i." % njobs)
            return (True, msg)
        return True

    def _preLaunchValidate(self):
        """
        Run validation before starting/writing jobs.

        Checks, in order: the job name, the panel validators (stopping at the
        first failure) and, when a config dialog exists, the config dialog.

        :return: True if validation passed, False if operation should be
            cancelled.
        :rtype: bool
        """
        if not fileutils.is_valid_jobname(self.jobname()):
            msg = fileutils.INVALID_JOBNAME_ERR % self.jobname()
            self.warning(msg)
            return False
        if not self.runValidation(stop_on_fail=True):
            return False
        if self.config_dlg and not self.config_dlg.validate():
            return False
        return True

    def showLaunchStatus(self, num_succeeded, njobs):
        """
        Show a label at the bottom of the panel with status of the launch.

        :param int num_succeeded: Number of jobs that were successfully
            launched (or written, in write mode)
        :param int njobs: Total number of jobs
        """
        s_if_multijob = 's' if njobs > 1 else ''
        if num_succeeded == 0:
            # All jobs failed
            if self.start_mode == af2.FULL_START:
                msg = 'Failed to start job%s' % s_if_multijob
            else:
                msg = 'Failed to write job%s' % s_if_multijob
            self.status_bar.showMessage(msg)
        elif num_succeeded == njobs:
            # All jobs were launched/written successfully
            if self.start_mode == af2.FULL_START:
                if njobs == 1:
                    msg = "Job started"
                else:
                    msg = "%i jobs started" % njobs
            else:
                if njobs == 1:
                    msg = "Job written"
                else:
                    msg = "%i jobs written" % njobs
            self.status_bar.showMessage(msg, 3000, af2.DARK_GREEN)
        else:
            # Some of the jobs launched, some failed
            if self.start_mode == af2.FULL_START:
                template = "%i jobs started; %i failed to start"
            else:
                template = "%i jobs written; %i failed to write"
            msg = template % (num_succeeded, njobs - num_succeeded)
            self.status_bar.showMessage(msg, 3000, DARK_YELLOW)

    def getJobCount(self):
        """
        Override in the subclass to return the number of jobs that the user
        would like to start. Used for validation.

        :return: the number of jobs that the user requests.
        :rtype: int
        """
        raise NotImplementedError()

    # start() decorator is required for AF2 to show the Run button.
    @af2.appmethods.start()
    def multiJobStart(self):
        """
        Custom "start" method. For a single job, the job is launched in the
        current directory under the master job name. For several jobs, a
        sub-directory is created for each job, and the job is launched from
        within it.

        :rtype: list
        :return: list of return values from self.launchFromJobSpec() calls.
            Each value is a Job object or False, in start mode, or True/False
            in write mode.
        """
        njobs = self.getJobCount()
        master_jobname = self.jobname()

        if njobs == 1:
            ret = self.launchFromJobSpec(master_jobname, 1)
            return [ret]

        ret_list = []
        for job_num in range(1, njobs + 1):
            sub_jobname = '%s-%03d' % (master_jobname, job_num)
            os.mkdir(sub_jobname)
            with fileutils.chdir(sub_jobname):
                ret = self.launchFromJobSpec(sub_jobname, job_num)
            ret_list.append(ret)
        return ret_list

    def getValidatedOPLSDir(self, oplsdir=None):
        """
        Validate the existing oplsdir or get a requested oplsdir from the
        user's selection in the force field selector widget

        :param str oplsdir: The currently requested oplsdir

        :rtype: False, None or str
        :return: False if no valid oplsdir can be found, None if there is no
            custom dir requested, or the path to the custom directory
        """
        # PANEL-8401 has been filed to improve the AF2 infrastructure for using
        # the FF Selector. That case may eventually result in changes here.

        # Detect any forcefield selector if requested
        sanitized_opls_dir = False
        if oplsdir is None:
            child = self.findChild(forcefield.ForceFieldSelector)
            if child:
                if not child.sanitizeCustomOPLSDir():
                    return False
                sanitized_opls_dir = True
                oplsdir = child.getCustomOPLSDIR()

        # verify the oplsdir method argument's validity and allow using default
        if oplsdir and not sanitized_opls_dir:
            opls_dir_result = self.validateOPLSDir(oplsdir)
            if opls_dir_result == forcefield.OPLSDirResult.ABORT:
                return False
            elif opls_dir_result == forcefield.OPLSDirResult.DEFAULT:
                oplsdir = None
        return oplsdir

    def makeJobSpec(self, sub_jobname, job_num):
        """
        Return job spec from job spec itself or some other data.

        If getJobSpec() returns a CMD_DRIVER_DATA, its command is converted
        into a job specification by the driver; otherwise the value returned
        by getJobSpec() is returned unchanged.

        :param str sub_jobname: The job name for the subjob
        :param int job_num: The subjob number

        :rtype: `launchapi.JobSpecification`
        :return: JobSpecification object

        :raise SystemExit: On driver.get_job_spec_from_args error
        """
        job_spec = self.getJobSpec(sub_jobname, job_num)
        if isinstance(job_spec, CMD_DRIVER_DATA):
            # Copy the command so the caller's data is not modified
            cmd, driver = list(job_spec.cmd), job_spec.driver
            # The config dialog is treated as optional elsewhere in this
            # class (see _preLaunchValidate), so guard against its absence.
            lic_host = self.config_dlg.getLicHost() if self.config_dlg else None
            if lic_host:
                cmd += [parserutils.FLAG_MD_UMBRELLA]
            job_spec = driver.get_job_spec_from_args(cmd, license_host=lic_host)
        return job_spec

    def launchFromJobSpec(self, sub_jobname, job_num, oplsdir=None):
        """
        Re-implemented from JobApp; options and behavior are somewhat
        different. Starts or writes the command for the given subjob. The
        current working directory is used as the launch directory.

        :param str sub_jobname: The job name for the subjob
        :param int job_num: The subjob number
        :param str oplsdir: Path to OPLS directory

        :return: Job object on successful start; True on successful write,
            False on failure.
        """
        jobdir = os.getcwd()

        # TODO: Remove duplication between this method and JobApp method.
        try:
            job_spec = self.makeJobSpec(sub_jobname, job_num)
        except SystemExit as e:
            self.error('Error launching job {}'.format(e))
            return False

        launch_params = launchparams.LaunchParameters()
        launch_params.setJobname(sub_jobname)

        cd_params = self.configDialogSettings()
        if 'host' in cd_params:
            launch_params.setHostname(cd_params['host'])

        if 'product_hosts' in cd_params:
            status = self.config_dlg.setLaunchParams(job_spec, launch_params)
            # Error already was shown if status is False
            if status is False:
                return False

        if 'openmpcpus' in cd_params:
            threads = cd_params['threads']
            cpus = cd_params['openmpcpus']
            if threads:
                launch_params.setNumberOfSubjobs(cd_params['openmpsubjobs'])
                if job_spec.jobUsesTPP():
                    launch_params.setNumberOfProcessorsOneNode(threads)
                # NOTE: If the driver is not using the TPP option, but passing
                # to subjobs, this needs to go as part of command in getJobSpec
                # (use _addJaguarOptions)
            else:
                # NOTE: this is the right thing to do for matsci GUIs but
                # maybe be the wrong thing to do for jaguar GUIs, since
                # they may want ONLY the -PARALLEL N option and not also
                # -HOST foo:N as well
                launch_params.setNumberOfSubjobs(cpus)
        elif 'cpus' in cd_params:
            launch_params.setNumberOfSubjobs(cd_params['cpus'])

        if self.runMode() == baseapp.MODE_MAESTRO:
            if 'proj' in cd_params:
                launch_params.setMaestroProjectName(cd_params['proj'])
                # Setting disposition is only valid if we have a project
                if 'disp' in cd_params:
                    launch_params.setMaestroProjectDisposition(
                        cd_params['disp'])
            launch_params.setMaestroViewname(self.viewname)

        if maestro and maestro.get_command_option(
                "prefer", "enablejobdebugoutput") == "True":
            launch_params.setDebugLevel(2)

        oplsdir = self.getValidatedOPLSDir(oplsdir=oplsdir)
        if oplsdir is False:
            # No valid OPLS directory could be determined; cancel
            return False
        elif oplsdir:
            launch_params.setOPLSDir(oplsdir)

        launch_params.setDeleteAfterIncorporation(True)
        launch_params.setLaunchDirectory(jobdir)

        # Call private function here because there's not guaranteed a great
        # analog for cmdline launching.
        cmdlist = jobcontrol._get_job_spec_launch_command(job_spec,
                                                          launch_params,
                                                          write_output=True)

        self.writeJobCmd(cmdlist,
                         job_spec=job_spec,
                         launch_params=launch_params)

        if self.start_mode == af2.FULL_START:
            try:
                job = jobcontrol.launch_from_job_spec(
                    job_spec,
                    launch_params,
                    display_commandline=jobwriter.cmdlist_to_cmd(cmdlist))
            except jobcontrol.JobLaunchFailure:
                # NOTE: jobcontrol.launch_job() already showed an error dialog
                # to the user by this point.
                return False
            # NOTE: AF2 implementation calls registerJob(job) here. Not needed
            # here as when multiple jobs are launched, job tracking is not
            # possible. Instead we call maestro.job_started() at a later time,
            # in order for the job to show up in monitor panels.
            return job
        elif self.start_mode == af2.ONLY_WRITE:
            return True

    def _getSHFilename(self):
        """
        Return the name of the `*.sh` file that should be written. Assumes that
        the CWD is the job directory for the (sub)job.

        :rtype: str
        :return: Path of the form <cwd>/<basename of cwd>.sh
        """
        jobdir = os.getcwd()
        jobname = os.path.basename(jobdir)
        return os.path.join(jobdir, jobname + '.sh')
class CmdJobMixin:
    """
    Mixin for multi apps that launch their jobs through launchJobCmd() rather
    than through launchFromJobSpec().
    """

    @af2.appmethods.start()
    def multiJobStart(self):
        """
        Custom "start" method. A single job is launched directly under the
        master job name; for several jobs a sub-directory is created per job
        and the job is launched from inside it.

        :rtype: list
        :return: one entry per job, holding the value returned by
            self.launchJobCmd(). In write mode a None return value is reported
            as True.
        """
        total = self.getJobCount()
        base_name = self.jobname()

        if total == 1:
            command = self.getJobCmd(base_name, 1)
            results = [self.launchJobCmd(command)]
        else:
            results = []
            for index in range(1, total + 1):
                name = '%s-%03d' % (base_name, index)
                os.mkdir(name)
                with fileutils.chdir(name):
                    command = self.getJobCmd(name, index)
                    outcome = self.launchJobCmd(command, jobdir=os.getcwd())
                results.append(outcome)

        if self.start_mode == af2.ONLY_WRITE:
            # launchJobCmd returns None on ONLY_WRITE mode. Convert it to True
            results = [True if item is None else item for item in results]
        return results
class MultiDesmondJobApp(MultiJobApp):
    """
    MultiJobApp specialization that sets up, validates and runs multiple
    Desmond jobs, one per loaded model system.
    """

    def setup(self):
        """
        Run the parent setup and start with an empty list of model systems.
        """
        MultiJobApp.setup(self)
        self._models = []

    def setDefaults(self):
        """
        Restore the parent defaults and forget any loaded model systems.
        """
        MultiJobApp.setDefaults(self)
        self._models = []

    def getModel(self, index):
        """
        Look up a loaded model system.

        :param index: Index of the system to return
        :type index: int

        :return: Model system at the specified job index
        :rtype: `cms.Cms`
        """
        return self._models[index]

    def getJobCount(self):
        """
        Report how many jobs the user would like to run.

        :return: Number of jobs to run
        :rtype: int
        """
        # Subclasses are free to reimplement this; the generic logic below is
        # expected to cover most panels.
        if not (maestro and hasattr(self, 'input_selector')):
            return len(self._models)

        project_table = maestro.project_table_get()
        selector = self.input_selector
        state = selector.inputState()
        if state == selector.SELECTED_ENTRIES:
            return len(project_table.selected_rows)
        if state == selector.FILE:
            return 1
        return len(project_table.included_rows)

    def getStructFromPtEntry(self):
        """
        Pick one structure from the project table. With selected entries as
        input, a selected entry that is also included in the Workspace is
        preferred, falling back to the first selected entry. Otherwise an
        included entry is used.

        :rtype: (`schrodinger.structure.Structure`, string) or (None, None)
        :return: one structure from selected or included entries, the
            structure entry id
        """
        if not maestro:
            return None, None

        # File input is not supported here, so assert right away
        assert self.input_selector.options['file'] is False

        ptable = maestro.project_table_get()
        included_eids = maestro.get_included_entry_ids()
        input_state = self.input_selector.inputState()

        chosen_row = None
        if input_state == self.input_selector.SELECTED_ENTRIES:
            # A selected row that is also in the Workspace wins
            for eid in included_eids:
                candidate = ptable.getRow(eid)
                if candidate.is_selected:
                    chosen_row = candidate
                    break
            if not chosen_row and ptable.selected_rows:
                # No selected row is included; take the first selected one
                chosen_row = next(iter(ptable.selected_rows))
        elif included_eids:
            chosen_row = ptable.getRow(included_eids.pop())

        if not chosen_row:
            return None, None
        return chosen_row.getStructure(), chosen_row.entry_id

    def customOPLSDirForModel(self, job_num):
        """
        Find the custom OPLS directory (if any) for the model that belongs to
        the given job.

        :param int job_num: The job number; job 1 maps to the first model

        :rtype: str or None
        :return: The opls directory to use, or None if this model does not use
            it
        """
        model = self._models[job_num - 1]
        # MATSCI-8027 / DESMOND-10004
        if not demond_util.use_custom_oplsdir(model.fsys_ct):
            return None
        return forcefield.get_custom_opls_dir()

    def launchFromJobSpec(self, sub_jobname, job_num, oplsdir=None):
        """
        See parent class for documentation. The difference is that, when no
        OPLS directory is passed in, it is derived from the properties of the
        model structure for this job.
        """
        effective_oplsdir = oplsdir or self.customOPLSDirForModel(job_num)
        return super().launchFromJobSpec(sub_jobname,
                                         job_num,
                                         oplsdir=effective_oplsdir)

    @af2.validator(-999)
    def validateModelLoaded(self):
        """
        At validation time, read the user's input selection and load the
        corresponding model systems, making sure at least one valid model
        was specified.

        :rtype: bool or (bool, str)
        :return: True on success (or when the panel has no input_selector),
            otherwise False together with an error message
        """
        if not hasattr(self, 'input_selector'):
            # Panel does not use input_selector for specifying input.
            return True

        try:
            self._models = list(self.input_selector.cmsModels())
        except TypeError as err:
            return False, str(err)

        if self._models:
            return True
        return False, "No valid model systems have been specified."
class MultiCmdJobApp(CmdJobMixin, MultiDesmondJobApp):
    """
    Class with functionality for setting up, validating and running multiple
    Desmond jobs with GPU subhosts via commands.

    Note: multiJobStart() fires off multiple jobs by launchJobCmd(), which
    calls setupJobCmd() to add HOST and SUBHOST flags.
    """

    def getJobCmd(self, jobname, job_number, cmd=None):
        """
        Must be extended by subclasses to yield a command list. Subclasses
        build the driver-specific part of the command and pass it in as
        `cmd`; this method appends the job name and OPLS directory flags to
        that list in place.

        :param str jobname: The job name
        :param int job_number: the job number
        :param list cmd: The list of command args to this point

        :rtype: list of str
        :return: a list of the command args for job submission

        :raise NotImplementedError: If no (or an empty) cmd is supplied
        """
        if not cmd:
            raise NotImplementedError("Subclasses must implement this method")
        cmd += ['-JOBNAME', jobname]

        # Add a custom OPLSDIR if requested
        model_oplsdir = self.customOPLSDirForModel(job_number)
        oplsdir = self.getValidatedOPLSDir(oplsdir=model_oplsdir)
        if oplsdir:
            cmd += ['-OPLSDIR', oplsdir]
        return cmd

    def setupJobCmd(self,
                    cmdlist,
                    auto_add_host=True,
                    auto_add_subhost=True,
                    **kwargs):
        """
        Adds arguments such as HOST, SUBHOST, and GPU flags to cmdlist beyond
        the parent class method if they are set in the config dialog. Settings
        pre-existing in the cmdlist take precedence over the config dialog
        settings.

        :param cmdlist: the command list
        :type cmdlist: list

        :param auto_add_host: Whether or not to automatically add -HOST flag
            to command when it is not already included.
        :type auto_add_host: bool

        :param auto_add_subhost: Whether or not to automatically add -SUBHOST
            flag to command when it is not already included.
        :type auto_add_subhost: bool

        :return: The return value of the parent class setupJobCmd()
        """
        # MultiCmdJobApp has been tested with PerStructDesmondSubhostConfigDialog
        # Please only remove this block after confirming other config dialog working
        if self.config_dlg:
            assert isinstance(self.config_dlg,
                              configdlgs.PerStructDesmondSubhostConfigDialog)

        cd_params = self.configDialogSettings()

        if cd_params.get('gpus'):
            is_full = True
            try:
                rtype_idx = cmdlist.index(emd.FLAG_RUN_TYPE)
            except ValueError:
                # No run type given; treated as a full run
                pass
            else:
                # Post analysis, line fitting, and replica averaging don't
                # request GPU resources
                is_full = cmdlist[rtype_idx + 1] == emd.FULL_RUN
            if is_full:
                cmdlist += [parserutils.FLAG_GPU]

        maxjobs = cd_params.get('maxjobs')
        subjob_host = cd_params.get('subjob_host')
        if subjob_host and '-SUBHOST' not in cmdlist and auto_add_subhost:
            if maxjobs:
                cmdlist.extend(['-SUBHOST', '%s:%s' % (subjob_host, maxjobs)])
            else:
                cmdlist.extend(['-SUBHOST', subjob_host])

        # The flag is spelled '-HOST' in the command list (as for '-SUBHOST'
        # above); testing for it lets a pre-existing host setting take
        # precedence instead of appending a second -HOST flag.
        if cd_params.get('host') and '-HOST' not in cmdlist and auto_add_host:
            host = cd_params['host']
            if maxjobs and not subjob_host:
                # Without a subjob host, the job count limit goes on the host
                cmdlist.extend(['-HOST', '%s:%s' % (host, maxjobs)])
            else:
                cmdlist.extend(['-HOST', host])

        return super().setupJobCmd(cmdlist,
                                   auto_add_host=auto_add_host,
                                   **kwargs)