Source code for schrodinger.tasks.taskmanager
"""
A `TaskManager` manages multiple instances of a given task class.
One main responsibility of `TaskManager` is to assign unique names to its task
instances, which it does using a task namer. The namer uses a base name to
generate unique names for subsequent task instances. There are two kinds of
base names: standard base names and custom base names. Both can be used as the
current base name, but they behave quite differently.
Current base name: The base name that is applied to each subsequently loaded
task. It is either the standard base name or a custom base name, and thus
must be set using either `setStandardBaseName` or `setCustomBaseName`.
Standard base name: A base name that gets set as the current base name
whenever `resetBaseName` is called. It may be set using
`setStandardBaseName`, which also updates the current base name. The default
standard base name is the task class name.
Custom base name: A base name to use as the current base name that will be lost
when any other type of base name is specified or when `resetBaseName` is
called. It may be set using `setCustomBaseName`.
"""
import os
from schrodinger.models import mappers
from schrodinger.models import parameters
from schrodinger.Qt import QtCore
from schrodinger.tasks import tasks
from schrodinger.ui.qt.appframework2 import jobnames
from schrodinger.utils import qt_utils
from schrodinger.utils import scollections
class _TaskNamer(parameters.CompoundParam):
"""
Component class for TaskManager. Is responsible for choosing the name for
the next task run from the task manager. The base_name is used to construct
standard names which automatically increment.
"""
base_name = parameters.StringParam()
name = parameters.StringParam()
taken_names = parameters.ListParam()
def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)
self.enableDirBasedNaming(False)
self.base_nameChanged.connect(self.updateName)
self.taken_namesChanged.connect(self.updateName)
def enableDirBasedNaming(self, name_by_dir):
"""
Set whether to update names baesd on `self.taken_names` or by files
and directories in the current working directory.
"""
self._name_by_dir = name_by_dir
def updateName(self):
self.name = self._getUniqueName()
def _getUniqueName(self):
unique_name = jobnames.update_jobname(self.base_name,
self.base_name,
name_list=self._getTakenNames())
return unique_name
def _getTakenNames(self):
"""
This simple wrapper method is provided so behavior can be customized in
subclasses. For example, for job tasks, "taken names" are determined
from the contents of the CWD.
"""
if self._name_by_dir:
return os.listdir('.')
else:
return self.taken_names
[docs]@qt_utils.add_enums_as_attributes(tasks.Status)
class TaskManager(mappers.TargetMixin, parameters.CompoundParam):
status = parameters.EnumParam(tasks.Status, tasks.Status.WAITING)
namer = _TaskNamer()
taskStarted = QtCore.pyqtSignal(tasks.AbstractTask)
taskEnded = QtCore.pyqtSignal(tasks.AbstractTask)
newTaskLoaded = QtCore.pyqtSignal(tasks.AbstractTask)
task_list = parameters.ParamListParam(tasks.AbstractTask)
taskStatusChanged = QtCore.pyqtSignal(tasks.AbstractTask, tasks.Status)
NAME_NOT_UNIQUE_MSG = 'Task name is not unique.'
TaskClass = parameters.Param()
[docs] def __init__(self, TaskClass, directory_management=False, **kwargs):
super().__init__(TaskClass=TaskClass, **kwargs)
self._next_task = None
self._directory_management = directory_management
if directory_management:
self.namer.enableDirBasedNaming(True)
self.setStandardBaseName(self.TaskClass.__name__)
self._started_tasks = scollections.IdSet()
self.loadNewTask()
[docs] def resetBaseName(self):
"""
Set the standard base name as the current base name and apply it to the
next task.
"""
self.namer.base_name = self._standard_base_name
self.uniquifiyTaskName()
[docs] def setStandardBaseName(self, new_base_name):
"""
Set a new standard base name and apply it to the next task. This base
name will be used whenever `resetBaseName` is called.
"""
self._standard_base_name = new_base_name
self.resetBaseName()
[docs] def setCustomBaseName(self, new_base_name):
"""
Set a new current base name. This name does not immediately apply to the
next task, and will be lost if `resetBaseName` is called.
"""
self.namer.base_name = new_base_name
[docs] def uniquifiyTaskName(self):
"""
Update the next tasks name to a unique name. A unique name is defined
as a name that hasn't been previously used by this taskmanager and
one that doesn't already have a directory with the same name in
the current directory.
"""
self.namer.updateName()
self.nextTask().name = self.namer.name
[docs] def wait(self, timeout=None):
# Call the module-level wait function
return tasks._wait(self, timeout=timeout)
def _onTaskStatusChanged(self, status):
task = self.sender()
self.taskStatusChanged.emit(task, status)
if status in tasks.FINISHED_STATUSES:
self.taskEnded.emit(task)
if task not in self._started_tasks and status != tasks.Status.WAITING:
self._started_tasks.add(task)
self.task_list.append(task)
self.taskStarted.emit(task)
self.namer.taken_names.append(task.name)
self.loadNewTask()
self.status = self._getAggregateStatus()
[docs] def removeTask(self, task):
self.task_list.remove(task)
task.statusChanged.disconnect(self._onTaskStatusChanged)
def _getAggregateStatus(self):
#TODO: consider moving status logic into a component class
if all(task.status is tasks.Status.WAITING for task in self.task_list):
return tasks.Status.WAITING
elif any(
task.status is tasks.Status.RUNNING for task in self.task_list):
return tasks.Status.RUNNING
elif any(task.status is tasks.Status.FAILED for task in self.task_list):
return tasks.Status.FAILED
elif any(task.status is tasks.Status.DONE for task in self.task_list):
return tasks.Status.DONE
else:
print([task.status for task in self.task_list])
assert False, 'This shouldnt be reachable'
[docs] def loadNewTask(self):
old_task = self._next_task
if old_task is None:
new_task = self.TaskClass()
else:
new_task = old_task.replicate()
self._next_task = new_task
self._connectTask(new_task)
if old_task is not None:
if isinstance(old_task.owner(), parameters.CompoundParam):
setattr(old_task.owner(), old_task.paramName(), new_task)
self.uniquifiyTaskName()
self.newTaskLoaded.emit(new_task)
[docs] def nextTask(self):
if self.TaskClass is None:
raise RuntimeError("Can't get next task before a task class has "
"been set.")
if self._next_task is None:
self.loadNewTask()
return self._next_task
[docs] def isStartable(self):
return True
[docs] def startNextTask(self):
self.nextTask().start()
def _connectTask(self, task):
if self._directory_management:
task.specifyTaskDir(tasks.AUTO_TASKDIR)
task.statusChanged.connect(self._onTaskStatusChanged)
[docs] def targetGetValue(self):
return self._next_task
[docs] def targetSetValue(self, value):
if self._next_task is value:
return
self.setNextTask(value)
[docs] def setNextTask(self, task):
if task is not self._next_task:
if not isinstance(task, self.TaskClass):
raise ValueError
self._next_task = task
self._connectTask(task)
self.newTaskLoaded.emit(task)
self.uniquifiyTaskName()
[docs] def __len__(self):
return len(self.task_list)
def __repr__(self):
return '<%s: %s>' % (self.__class__.__name__, self.TaskClass.__name__)