"""
<<<<< DEPRECATED >>>>>
This module should not be used for new code. Instead, consider using
`schrodinger.tasks.tasks`
<<<<< !!!!!!!!! >>>>>
Task runner classes are designed to be subclassed to define runners for specific
tasks. A "task" is a generic term that encompasses jobs, threads, and subprocess
calls.
"""
from schrodinger.Qt import QtCore
from schrodinger.ui.qt.appframework2 import jobnames
from schrodinger.ui.qt.appframework2 import validation
# Message-type codes passed as the first argument to the messaging callback.
ERROR, WARNING, QUESTION, INFO, STATUS = range(5)
#===============================================================================
# Task Wrapper
#===============================================================================
class Status:
    """
    String constants describing the state of a task. These are the values
    stored on a task wrapper's status by the function-based runners.
    """

    NONE = '-'
    RUNNING = 'Running'
    FAILED = 'Failed'
    DONE = 'Done'
    ERROR = 'Error'
class AbstractTaskWrapper(object):
    """
    Provides a common interface for tasks that is independent of the underlying
    task object.

    The main methods are self.isRunning() and self.status(), plus
    self.getName() and self.setName().
    """

    # Sentinel meaning "no task class defined". Concrete subclasses must
    # override this with the type of the object they wrap.
    TASK_CLASS = type(None)

    def __init__(self, task, settings=None, name='', test_mode=False):
        """
        :param task: the underlying task object (depends on subclass)
        :type task: see derived class

        :param settings: the settings used to run this task. A shallow copy
            is stored, so later changes to the passed dict are not reflected.
        :type settings: dict

        :param name: the task name
        :type name: str

        :param test_mode: disables type-checking of the task object. Used for
            mocking that task in tests
        :type test_mode: bool

        :raises RuntimeError: if TASK_CLASS has not been overridden by a
            subclass
        :raises TypeError: if the task is not an instance of TASK_CLASS and
            test_mode is off
        """
        # The class-level default is type(None) (i.e. NoneType), not None, so
        # both must be checked to detect a class that never set TASK_CLASS.
        if self.TASK_CLASS is None or self.TASK_CLASS is type(None):
            raise RuntimeError('Cannot instantiate AbstractTaskWrapper.')
        if not test_mode and not isinstance(task, self.TASK_CLASS):
            raise TypeError('Task %s must be of type %s.' %
                            (task, self.TASK_CLASS.__name__))
        self._task = task
        self._status = None
        self._settings = {} if settings is None else settings.copy()
        self.setName(name)

    def isRunning(self):
        """
        Whether this task is currently running. Must be implemented by
        derived classes.
        """
        raise NotImplementedError()

    def status(self):
        """
        The current status of the task. The schema is flexible and can be agreed
        upon with the corresponding runner.
        """
        return self._status

    def settings(self):
        """
        Returns the settings that were used to run this task.
        """
        return self._settings

    def getName(self):
        """
        Returns the task name.
        """
        return self._name

    def setName(self, name):
        """
        Sets the task name.

        :param name: the new task name
        :type name: str
        """
        self._name = name

    def __str__(self):
        return self.getName()

    def __repr__(self):
        return '%s: %s' % (self.__class__.__name__, self.getName())
#===============================================================================
# Task Runner Base Class
#===============================================================================
class AbstractTaskRunner(validation.ValidationMixin, QtCore.QObject):
    """
    Base class for task runners. It handles task naming, pre-start
    validation, exchanging settings with a parent object, sending messages to
    the user and keeping track of the tasks that were started. Derived classes
    implement _start() to launch a task with their specific mechanism.
    """

    # Emitted after a task starts and whenever running tasks complete
    stateChanged = QtCore.pyqtSignal()
    # Emitted at the very beginning of start()
    startRequested = QtCore.pyqtSignal()
    # Emitted when pre-flight checks or _start() fail or raise
    startFailed = QtCore.pyqtSignal()
    # Emitted with the task once it has been started and registered
    taskStarted = QtCore.pyqtSignal(AbstractTaskWrapper)
    # Emitted with the task once it is detected as no longer running
    taskEnded = QtCore.pyqtSignal(AbstractTaskWrapper)
    # Emitted when a custom name is set
    nameChanged = QtCore.pyqtSignal()
    # Emitted by resetAll()
    resetAllRequested = QtCore.pyqtSignal()

    def __init__(self, messaging_callback=None, settings_callback=None):
        """
        Initializes a new task runner

        :param messaging_callback: callback used for interacting with the user.
            This includes both reporting results or errors and asking questions
            to the user. The callback should be of the form:

            f(message_type, text, options=None, runner=None)

            where message_type is one of ERROR, WARNING, QUESTION, INFO or
            STATUS, text is the text of the message to be displayed, and
            options is an optional dict which can be used by the callback. For
            example, a dialog title could be passed to the callback via the
            options dict. The callback should never depend on the presence or
            absence of any options. The runner is simply the runner that is
            making the call, so that the callback can tell which runner is
            invoking the callback.

            When the message is a question, the function should return the
            user's response, typically True or False for a yes/no question.
        :type messaging_callback: function

        :param settings_callback: callback for communicating state with the
            parent object. This callback can be used both to push state to or
            pull state from the parent object. The callback should be of the
            form:

            f(settings=None, runner=None)

            If no settings are passed in, the callback should return the state
            of the parent object (i.e. the panel state) in the form of a
            dictionary. The runner can be passed in as well if the callback
            needs to access the runner to properly process the callback.

            If a settings dictionary is passed in, the callback should apply
            any settings in the dictionary to the parent object (thus altering
            its state). Passing in an empty dictionary is a no-op.
        :type settings_callback: function
        """
        QtCore.QObject.__init__(self)
        validation.ValidationMixin.__init__(self)
        self.setCallbacks(messaging_callback, settings_callback)
        self._settings = self.defaultSettings()
        self.active_tasks = []
        self.past_tasks = []
        self.tasks_by_name = {}
        self.custom_name = None
        # Defaults for the runner options; setRunnerOptions() may override.
        # NOTE(review): history_length is not enforced anywhere in this
        # class; past_tasks grows without bound here. Confirm whether a
        # subclass is expected to trim it.
        self.history_length = 5
        self.allow_concurrent = True
        self.base_name = 'task'
        self.runner_name = None
        self.allow_custom_name = True
        self.setRunnerOptions()
        if self.runner_name is None:
            self.runner_name = self.base_name

    def setCallbacks(self, messaging_callback=None, settings_callback=None):
        """
        Sets the messaging and settings callbacks. Any callback left as None
        is replaced by a function that accepts any arguments and returns an
        empty dict.

        :param messaging_callback: see __init__
        :type messaging_callback: function

        :param settings_callback: see __init__
        :type settings_callback: function
        """

        def no_op(*args, **kwargs):
            return {}

        if messaging_callback is None:
            messaging_callback = no_op
        if settings_callback is None:
            settings_callback = no_op
        # TODO: use weakref? callbacks disappeared when I tried.
        self._settingsCallback = settings_callback
        self._messagingCallback = messaging_callback

    def setRunnerOptions(self):
        """
        Optional override to set options for the runner. Not overriding this
        at all results in using all default values.

        self.allow_custom_name - whether name is user-editable.
            Default: True

        self.allow_concurrent - whether another task can be started while
            one is still running. Default: True

        self.history_length - how many past jobs to keep track of.
            Default: 5

        self.base_name - the base for task names. The base name gets
            modified to generate unique task names. Ex. MyTask_3.
            Default: "task"

        self.runner_name - a name to describe the type of task. Equivalent
            to program_name for jobs. Default: the value of self.base_name
        """

    def nextName(self, name_list=None):
        """
        Returns the name that will be assigned to the next task that gets run.
        There is no currentName(), as multiple tasks might be running
        concurrently. To get the name of an existing task, use task.getName().

        If a custom name has been set, that will be used as the next name.
        Otherwise, the base name will be used to generate a new unique name.

        This method can be overridden to alter the task naming behavior.

        :param name_list: Optional list of names to uniquify against. If not
            given (or empty), the name will be compared against the names
            returned by self._getTakenNames()
        :type name_list: list of str

        :return: the next task name, or an empty string if neither a base
            name nor a custom name is set
        :rtype: str
        """
        if not self.base_name and not self.custom_name:
            return ''
        # Determine the next automatically generated name
        if not name_list:
            name_list = self._getTakenNames()
        next_name = jobnames.update_jobname(self.base_name,
                                            self.base_name,
                                            name_list=name_list)
        # If the custom name matches the generated one, return to standard
        # naming so that later names keep being generated
        if self.custom_name == next_name:
            self.custom_name = None
        if self.custom_name:
            return self.custom_name
        return next_name

    def setCustomName(self, name):
        """
        Sets a custom name for the next task to be run. Does nothing if
        self.allow_custom_name is False.

        :param name: the custom name. Pass in an empty string to return to
            standard naming.
        :type name: str
        """
        name = str(name)  # This value is usually unicode from a QLineEdit
        if not self.allow_custom_name:
            return
        self.custom_name = name
        self.nameChanged.emit()

    def preValidate(self):
        """
        Override this to include any logic that should be run prior to the
        validation step.

        :return: Whether this step has succeeded. Returning False will result
            in aborting the task
        :rtype: bool
        """
        return True

    def postStart(self, task):
        """
        Override this to include any logic that should be run immediately after
        a task is started. This will only be run after a task actually starts.

        The started task is passed in as a parameter to allow interaction with
        the task instance. Note that there is no guarantee that the task is
        still running when this method is called.

        :param task: the task that was just started
        :type task: AbstractTaskWrapper
        """

    def postProcess(self, task):
        """
        Override this to include any logic that should be run whenever a task
        completes. This method is called whenever a task stops running, whether
        it succeeded, failed, or encountered an error.

        The completed task is passed in as a parameter to allow querying and
        modification of the task instance.

        There is currently no mechanism for ensuring this logic gets run between
        maestro sessions. If a session is closed while the job is running, this
        method will never be called. Use this method to perform only actions
        that make sense in the context of a single session.

        :param task: the task that has ended
        :type task: AbstractTaskWrapper
        """

    def start(self):
        """
        Starts the task. This includes the preliminary work of calling
        preValidate() and running validation before attempting to actually
        start the task itself.

        The actual starting of the task should be handled in the _start method
        in the derived classes and will vary depending on the type of runner.

        :return: True if a task was started, False if the pre-flight checks
            failed or _start() did not return a task
        :rtype: bool
        """
        self.startRequested.emit()
        try:
            # _start() is only attempted once the pre-flight checks pass
            task = self._start() if self._preFlight() else None
        except BaseException:
            # Listeners are told about the failure before the exception
            # continues to propagate
            self.startFailed.emit()
            raise
        if not task:
            self.startFailed.emit()
            return False
        self.addTask(task)
        self.taskStarted.emit(task)
        self.stateChanged.emit()
        self.postStart(task)
        return True

    def _start(self):
        """
        Should be implemented by derived class. Whereas the public start()
        method performs the preliminary tasks of validation and job setup, as
        well as tracking the started task, this method, _start(), should only
        contain the code to start the task using the mechanism specific to the
        type of runner (i.e. start a thread, launch a job, create a subprocess,
        etc.)

        :return: the task that has been started, or None, if the task could not
            be started.
        :rtype: AbstractTaskWrapper
        """
        raise NotImplementedError()

    #===========================================================================
    # Task list
    #===========================================================================

    def names(self):
        """
        Returns the names of all tasks tracked by this runner.

        :rtype: list of str
        """
        return list(self.tasks_by_name)

    def _getTakenNames(self):
        """
        Returns a list of names that are already taken. By default, this is just
        the names of all the tasks started by this runner; however, this method
        may be overridden to include other names as well (for example, all the
        dir names in the cwd, so that jobs will not accidentally overwrite each
        other).
        """
        return self.names()

    def tasks(self):
        """
        Returns all tasks tracked by this runner.

        :rtype: list of AbstractTaskWrapper
        """
        return list(self.tasks_by_name.values())

    def addTask(self, task):
        """
        Add a new task to be tracked. This should be called whenever a task is
        started.

        :param task: the task
        :type task: AbstractTaskWrapper
        """
        name = task.getName()
        # we can try and set the name here, but it might be too late if the
        # task has already retrieved its name.
        if not name:
            name = self.nextName()
            task.setName(name)
        self.tasks_by_name[name] = task
        if task.isRunning():
            self.active_tasks.append(task)

    def findTask(self, name):
        """
        Returns the tracked task with the given name, or None if there is no
        such task.

        :param name: the task name
        :type name: str
        """
        return self.tasks_by_name.get(name)

    #===========================================================================
    # Messages
    #===========================================================================

    def showMessage(self, message_type, text, options=None):
        """
        Communicates with the parent object via the messaging_callback.
        This method generally doesn't need to be called; call error, warning,
        question, or info instead.

        :param message_type: the type of message to send
        :type message_type: int

        :param text: the main text of the message
        :type text: str

        :param options: a dictionary of other options to be processed by the
            messaging_callback.
        :type options: dict

        :return: whatever the messaging callback returns
        """
        # Pass the runner along, as every other messaging method does, so the
        # callback can tell which runner is invoking it.
        return self._messagingCallback(message_type,
                                       text,
                                       options,
                                       runner=self)

    def error(self, text, caption='Error'):
        """
        Sends an ERROR message with the given caption via the messaging
        callback and returns the callback's result.
        """
        return self._messagingCallback(ERROR,
                                       text, {'caption': caption},
                                       runner=self)

    def warning(self, text, caption='Warning'):
        """
        Sends a WARNING message with the given caption via the messaging
        callback and returns the callback's result.
        """
        return self._messagingCallback(WARNING,
                                       text, {'caption': caption},
                                       runner=self)

    def question(self, text, caption='Question'):
        """
        Sends a QUESTION message with the given caption via the messaging
        callback and returns the callback's result (the user's response).
        """
        return self._messagingCallback(QUESTION,
                                       text, {'caption': caption},
                                       runner=self)

    def info(self, text, caption='Info'):
        """
        Sends an INFO message with the given caption via the messaging
        callback and returns the callback's result.
        """
        return self._messagingCallback(INFO,
                                       text, {'caption': caption},
                                       runner=self)

    def status(self, text, timeout=3000, color=None):
        """
        Request a status message to be displayed by the runner's parent.

        :param text: the text to display
        :type text: str

        :param timeout: duration in ms to display the status. A timeout of 0
            results in a permanent message.
        :type timeout: int

        :param color: color of the status message.
        :type color: QtGui.QColor
        """
        options = {'timeout': timeout, 'color': color}
        return self._messagingCallback(STATUS, text, options, runner=self)

    def updateStatusText(self):
        """
        Override this to update the status, for example, when settings have
        changed or the current task runner is switched.
        """

    #===========================================================================
    # Task settings - retrieving the specifications for a job
    #===========================================================================

    def pullSettings(self):
        """
        This method calls the settings callback, which should return the user's
        input for this job, such as input files, options, etc. For GUI panels,
        this is how the panel state is applied to the job runner.

        A callback that returns None or an empty dictionary leaves the stored
        settings untouched.
        """
        settings = self._settingsCallback(runner=self)
        # Tolerate callbacks that return nothing instead of failing inside
        # dict.update()
        if settings:
            self._settings.update(settings)

    def pushSettings(self, settings=None):
        """
        Pushes a settings dictionary via the settings callback. Doing this will
        alter the state of the parent object (generally the panel). This
        function can be used to reset the panel or load saved presets.

        If a settings dictionary is not passed in, the current job settings will
        be used.

        :param settings: a settings dictionary to push to the parent object
        :type settings: dict
        """
        if settings is None:
            settings = self._settings
        self._settingsCallback(settings, runner=self)

    def settings(self):
        """
        Returns the runner's current settings dictionary (not a copy).
        """
        return self._settings

    def defaultSettings(self):
        """
        Override this method to define default values for any settings. This
        dictionary of default settings will be used to reset the parent.
        """
        return {}

    def resetAll(self):
        """
        Emits resetAllRequested.
        """
        self.resetAllRequested.emit()

    def reset(self):
        """
        Resets the parent object using the default settings defined by the task
        runner.
        """
        self._settings = self.defaultSettings()
        self.pushSettings()

    #===========================================================================
    # Internal use
    #===========================================================================

    def _preFlight(self):
        """
        Runs the checks that must pass before a task may be started: the
        concurrency restriction, pulling the settings, preValidate() and
        validation.

        :return: whether the task may be started
        :rtype: bool
        """
        if not self.allow_concurrent and self.isRunning():
            self.error('Please wait until the last task is complete.')
            return False
        self.pullSettings()
        # Only a value equal to False aborts; an override that returns None
        # is treated as success.
        if self.preValidate() == False:
            return False
        if not self.runValidation():
            return False
        return True

    def _update(self):
        """
        Moves tasks that are no longer running from active_tasks to
        past_tasks, calling postProcess() and emitting taskEnded for each one.
        Emits stateChanged if any task completed.
        """
        # PANEL-14099: Linux machines may return task.isRunning() True, while
        # task.status() is DONE. Tested in GUI, this bandage works.
        completed_tasks = [
            task for task in self.active_tasks
            if (not task.isRunning() or task.status() == Status.DONE)
        ]
        for task in completed_tasks:
            self.active_tasks.remove(task)
            self.past_tasks.append(task)
            self.postProcess(task)
            self.taskEnded.emit(task)
        if completed_tasks:
            self.stateChanged.emit()

    def reportValidation(self, results):
        """
        Present validation messages to the user. This is an implementation of
        the ValidationMixin interface and does not need to be called directly.

        Results are processed in order. The first failed result is reported as
        an error and stops processing. A passing result that carries a message
        is presented as a question, and a negative answer stops processing.

        :param results: Set of results generated by validate()
        :type results: ValidationResults

        :return: False if a result failed or the user declined to continue,
            True otherwise
        :rtype: bool
        """
        for result in results:
            if not result:
                message = result.message
                if not message:
                    message = 'Validation failed. Check settings and try again.'
                self.error(message)
                return False
            if result.message:
                cont = self.question(result.message, caption='Warning')
                if not cont:
                    return False
        return True

    def isRunning(self):
        """
        Whether any task started by this runner is still tracked as active.

        :rtype: bool
        """
        return bool(self.active_tasks)

    def __str__(self):
        return '%s.%s' % (self.__module__, self.__class__.__name__)
#===============================================================================
# Base Function Runner
#===============================================================================
class BaseFunctionRunner(AbstractTaskRunner):
    """
    Base class for runners whose task logic is a Python callable. The logic
    is either supplied as a callable at instantiation or defined by
    implementing runMain() in a subclass. A callable passed at instantiation
    takes precedence over any runMain() implementation.
    """

    def __init__(self,
                 func=None,
                 messaging_callback=None,
                 settings_callback=None):
        """
        :param func: the callable that will be run as the main task. Overrides
            self.runMain(). If self.runMain() is not defined, a func must be
            provided.
        :type func: callable

        :param messaging_callback: passed on to AbstractTaskRunner
        :type messaging_callback: function

        :param settings_callback: passed on to AbstractTaskRunner
        :type settings_callback: function
        """
        AbstractTaskRunner.__init__(self, messaging_callback, settings_callback)
        if func is not None:
            # Shadow the method on this instance with the supplied callable
            self.runMain = func

    def _runMain(self, task):
        """
        Runs runMain() on the task and takes care of the bookkeeping around
        it: the task status is updated before and after the call, a status
        message is sent on completion, and _update is scheduled. Exceptions
        raised by runMain() mark the task as errored and are re-raised.

        :param task: the wrapped task object
        :type task: AbstractTaskWrapper
        """
        task._status = Status.RUNNING
        try:
            outcome = self.runMain(task)
        except BaseException:
            task._status = Status.ERROR
            QtCore.QTimer.singleShot(0, self._update)
            raise
        # A runMain() that returns nothing counts as a successful run;
        # anything else is stored as the task status.
        task._status = Status.DONE if outcome is None else outcome
        if task.status() == Status.DONE:
            message = 'Task completed'
        else:
            message = 'Task status: %s' % task.status()
        self.status(message)
        # Singleshot timer so that _update is called after thread is finished
        QtCore.QTimer.singleShot(0, self._update)

    def runMain(self, task):
        """
        The main task logic. Must be implemented by subclasses unless a
        callable was passed in at instantiation.

        :param task: the wrapped task object
        :type task: AbstractTaskWrapper
        """
        raise NotImplementedError()
#===============================================================================
# Blocking Runner
#===============================================================================
class BlockingRunner(BaseFunctionRunner):
    """
    Runner whose main function is called synchronously, blocking until it
    returns. Intended for quick calculations.

    Either subclass it and put the main logic in runMain(), or use it directly
    by passing in a callable.
    """

    def _start(self):
        """
        Creates the wrapper, runs the main function to completion and returns
        the wrapper.
        """
        wrapper = BlockingWrapper(settings=self.settings())
        self._runMain(wrapper)
        return wrapper
class BlockingWrapper(AbstractTaskWrapper):
    """
    A blocking call has no real underlying task object, so this wrapper holds
    a placeholder and exists only to give the runner the interface it expects.
    """

    TASK_CLASS = dict  # Just need something other than None

    def __init__(self, settings=None, name='', **kwargs):
        """
        :param settings: the settings used to run this task
        :type settings: dict

        :param name: the task name
        :type name: str

        Any further keyword arguments are passed on to AbstractTaskWrapper.
        """
        # An empty dict serves as the wrapped object; it only has to be an
        # instance of TASK_CLASS.
        placeholder = dict()
        AbstractTaskWrapper.__init__(self, placeholder, settings, name,
                                     **kwargs)
        self._status = Status.NONE

    def isRunning(self):
        """
        Always False for a blocking wrapper.
        """
        return False
#===============================================================================
# Thread Runner
#===============================================================================
class ThreadRunner(BaseFunctionRunner):
    """
    Runs tasks in threads. Either subclass it and implement runMain() with the
    logic to be executed by the thread, or use it directly by passing in a
    callable.

    Runner options can be set by overriding setRunnerOptions(). See the parent
    class for more information.

    When use_event_loop is set, a local event loop is executed while tasks
    are active and exited once none remain.
    """

    use_event_loop = False

    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        self.event_loop = QtCore.QEventLoop()

    def _start(self):
        """
        Implements the creation and starting of the thread.
        """
        qthread = QtCore.QThread()
        wrapper = ThreadWrapper(qthread,
                                settings=self.settings(),
                                name=self.nextName())
        wrapper._status = Status.NONE

        def run_task():
            self._runMain(wrapper)

        # Replace the thread's run() so that it executes the main function
        qthread.run = run_task
        qthread.start()
        return wrapper

    def addTask(self, task):
        """
        Tracks the task and, if the event loop is in use and there are active
        tasks, schedules the event loop to start.

        :param task: the task
        :type task: AbstractTaskWrapper
        """
        super().addTask(task)
        if not (self.use_event_loop and self.active_tasks):
            return
        # we need to use a singleshot because starting the event loop will
        # block any code that follows it
        QtCore.QTimer.singleShot(0, self._startEventLoop)

    def _startEventLoop(self):
        """
        Executes the event loop unless it is already running.
        """
        if self.event_loop.isRunning():
            return
        self.event_loop.exec()

    def _update(self):
        """
        Performs the parent's update, then exits the event loop if it is in
        use and no active tasks remain.
        """
        super()._update()
        if self.use_event_loop and not self.active_tasks:
            self.event_loop.exit()
class ThreadWrapper(AbstractTaskWrapper):
    """
    Task wrapper around a QtCore.QThread, exposing it through the common task
    API.
    """

    TASK_CLASS = QtCore.QThread

    def isRunning(self):
        """
        Whether the wrapped thread reports itself as running.
        """
        qthread = self._task
        return qthread.isRunning()