Source code for schrodinger.tasks.taskmanager

"""
A `TaskManager` manages multiple instances of a given task class.

One main responsibility of `TaskManager` is to assign unique names to its task
instances, which it does using a task namer. The namer uses a base name to
generate unique names for subsequent task instances. There are two kinds of
base names: standard base names and custom base names. Both can be used as the
current base name, but they behave quite differently.

Current base name: The base name that is applied to each subsequently loaded
    task. It is either the standard base name or a custom base name, and thus
    must be set using either `setStandardBaseName` or `setCustomBaseName`.

Standard base name: A base name that gets set as the current base name
    whenever `resetBaseName` is called. It may be set using
    `setStandardBaseName`, which also updates the current base name. The default
    standard base name is the task class name.

Custom base name: A base name to use as the current base name that will be lost
    when any other type of base name is specified or when `resetBaseName` is
    called. It may be set using `setCustomBaseName`.
"""

import os

from schrodinger.models import mappers
from schrodinger.models import parameters
from schrodinger.Qt import QtCore
from schrodinger.tasks import tasks
from schrodinger.ui.qt.appframework2 import jobnames
from schrodinger.utils import qt_utils
from schrodinger.utils import scollections


class _TaskNamer(parameters.CompoundParam):
    """
    Naming helper owned by a TaskManager.

    Holds the current `base_name`, the list of `taken_names`, and the derived
    `name` that should be given to the next task. `name` is recomputed
    whenever `base_name` or `taken_names` changes, or on demand through
    `updateName`.
    """
    base_name = parameters.StringParam()
    name = parameters.StringParam()
    taken_names = parameters.ListParam()

    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        # Default to naming against `taken_names` rather than the CWD.
        self.enableDirBasedNaming(False)
        # Any change to the inputs of the name must refresh the name itself.
        for changed_signal in (self.base_nameChanged, self.taken_namesChanged):
            changed_signal.connect(self.updateName)

    def enableDirBasedNaming(self, name_by_dir):
        """
        Choose the source of "taken" names.

        :param name_by_dir: If true, names are checked against the entries of
            the current working directory; otherwise they are checked against
            `self.taken_names`.
        """
        self._name_by_dir = name_by_dir

    def updateName(self):
        """
        Recompute `self.name` from the current base name and taken names.
        """
        self.name = self._getUniqueName()

    def _getUniqueName(self):
        """
        Ask `jobnames.update_jobname` for a name derived from the base name,
        passing the currently taken names as `name_list`.
        """
        base = self.base_name
        return jobnames.update_jobname(base,
                                       base,
                                       name_list=self._getTakenNames())

    def _getTakenNames(self):
        """
        Return the names that are considered already in use.

        Kept as a separate method so subclasses can customize where taken
        names come from.
        """
        if not self._name_by_dir:
            return self.taken_names
        # Directory-based naming: every entry of the CWD counts as taken.
        return os.listdir('.')

@qt_utils.add_enums_as_attributes(tasks.Status)
class TaskManager(mappers.TargetMixin, parameters.CompoundParam):
    """
    Manages multiple instances of a given task class.

    The manager always holds one "next task" (see `nextTask`). Once that task
    reports a status other than WAITING it is recorded in `task_list`, its
    name is added to the namer's taken names, and a fresh next task is loaded.
    """
    # Aggregate status over `task_list`; see `_getAggregateStatus`.
    status = parameters.EnumParam(tasks.Status, tasks.Status.WAITING)
    # Produces the name assigned to the next task.
    namer = _TaskNamer()
    # Emitted the first time a task reports a non-WAITING status.
    taskStarted = QtCore.pyqtSignal(tasks.AbstractTask)
    # Emitted when a task reports a status in `tasks.FINISHED_STATUSES`.
    taskEnded = QtCore.pyqtSignal(tasks.AbstractTask)
    # Emitted whenever a new next task is loaded or set.
    newTaskLoaded = QtCore.pyqtSignal(tasks.AbstractTask)
    # Tasks that have been started through this manager.
    task_list = parameters.ParamListParam(tasks.AbstractTask)
    # Re-emits every status change of a connected task as (task, status).
    taskStatusChanged = QtCore.pyqtSignal(tasks.AbstractTask, tasks.Status)
    NAME_NOT_UNIQUE_MSG = 'Task name is not unique.'
    TaskClass = parameters.Param()

    def __init__(self, TaskClass, directory_management=False, **kwargs):
        """
        :param TaskClass: The class that is instantiated to create the first
            next task.
        :param directory_management: If true, task names are uniquified
            against the contents of the current working directory and every
            connected task gets `specifyTaskDir(tasks.AUTO_TASKDIR)`.
        :param kwargs: Forwarded to the superclass initializer.
        """
        super().__init__(TaskClass=TaskClass, **kwargs)
        self._next_task = None
        self._directory_management = directory_management
        if directory_management:
            self.namer.enableDirBasedNaming(True)
        # Setting the base name also names the next task, which loads a task
        # through `nextTask` if none exists yet.
        self.setStandardBaseName(self.TaskClass.__name__)
        self._started_tasks = scollections.IdSet()
        self.loadNewTask()

    def resetBaseName(self):
        """
        Set the standard base name as the current base name and apply it to
        the next task.
        """
        self.namer.base_name = self._standard_base_name
        self.uniquifiyTaskName()

    def setStandardBaseName(self, new_base_name):
        """
        Set a new standard base name and apply it to the next task. This base
        name will be used whenever `resetBaseName` is called.
        """
        self._standard_base_name = new_base_name
        self.resetBaseName()

    def setCustomBaseName(self, new_base_name):
        """
        Set a new current base name. This name does not immediately apply to
        the next task, and will be lost if `resetBaseName` is called.
        """
        self.namer.base_name = new_base_name

    def uniquifiyTaskName(self):
        """
        Update the next task's name to a unique name. A unique name is defined
        as a name that hasn't been previously used by this taskmanager and one
        that doesn't already have a directory with the same name in the
        current directory.
        """
        self.namer.updateName()
        self.nextTask().name = self.namer.name

    def wait(self, timeout=None):
        """
        Wait on this manager by delegating to `tasks._wait`.

        :param timeout: Passed through to `tasks._wait`.
        :return: Whatever `tasks._wait` returns.
        """
        # Call the module-level wait function
        return tasks._wait(self, timeout=timeout)

    def _onTaskStatusChanged(self, status):
        """
        Slot connected to the `statusChanged` signal of every managed task.

        :param status: The new status of the task that emitted the signal.
        """
        # The emitting task is not passed as an argument; recover it from Qt.
        task = self.sender()
        self.taskStatusChanged.emit(task, status)
        if status in tasks.FINISHED_STATUSES:
            self.taskEnded.emit(task)
        if task not in self._started_tasks and status != tasks.Status.WAITING:
            # First non-WAITING status: record the task as started, reserve
            # its name and prepare the task that will be started next.
            self._started_tasks.add(task)
            self.task_list.append(task)
            self.taskStarted.emit(task)
            self.namer.taken_names.append(task.name)
            self.loadNewTask()
        self.status = self._getAggregateStatus()

    def removeTask(self, task):
        """
        Remove `task` from `task_list` and stop listening to its status.
        """
        self.task_list.remove(task)
        task.statusChanged.disconnect(self._onTaskStatusChanged)

    def _getAggregateStatus(self):
        """
        Collapse the statuses of all tasks in `task_list` into one status.

        WAITING if every task is waiting (including when there are no tasks);
        otherwise the first of RUNNING, FAILED, DONE that any task has.

        :raises AssertionError: If none of the above applies.
        """
        #TODO: consider moving status logic into a component class
        statuses = [task.status for task in self.task_list]
        if all(status is tasks.Status.WAITING for status in statuses):
            return tasks.Status.WAITING
        # Precedence matters: RUNNING wins over FAILED, FAILED over DONE.
        precedence = (tasks.Status.RUNNING, tasks.Status.FAILED,
                      tasks.Status.DONE)
        for candidate in precedence:
            if any(status is candidate for status in statuses):
                return candidate
        # Raise explicitly instead of `assert False` so the check is not
        # stripped when Python runs with -O.
        raise AssertionError('This shouldnt be reachable: %r' % (statuses,))

    def loadNewTask(self):
        """
        Replace the next task with a fresh one and announce it.

        The first task is created by calling `TaskClass`; later ones are
        created by replicating the previous next task.
        """
        old_task = self._next_task
        if old_task is None:
            new_task = self.TaskClass()
        else:
            new_task = old_task.replicate()
        self._next_task = new_task
        self._connectTask(new_task)
        if old_task is not None:
            owner = old_task.owner()
            if isinstance(owner, parameters.CompoundParam):
                # Put the new task where the old one lived on its owner.
                setattr(owner, old_task.paramName(), new_task)
        self.uniquifiyTaskName()
        self.newTaskLoaded.emit(new_task)

    def nextTask(self):
        """
        Return the task that will be started next, loading one if necessary.

        :raises RuntimeError: If no task class has been set.
        """
        if self.TaskClass is None:
            raise RuntimeError("Can't get next task before a task class has "
                               "been set.")
        if self._next_task is None:
            self.loadNewTask()
        return self._next_task

    def isStartable(self):
        """
        Always returns True.
        """
        return True

    def startNextTask(self):
        """
        Start the current next task.
        """
        self.nextTask().start()

    def _connectTask(self, task):
        """
        Hook `task` up to this manager so its status changes are tracked.
        """
        if self._directory_management:
            task.specifyTaskDir(tasks.AUTO_TASKDIR)
        task.statusChanged.connect(self._onTaskStatusChanged)

    def targetGetValue(self):
        """
        Return the current next task without loading a new one.
        """
        return self._next_task

    def targetSetValue(self, value):
        """
        Make `value` the next task unless it already is.
        """
        if self._next_task is value:
            return
        self.setNextTask(value)

    def setNextTask(self, task):
        """
        Use `task` as the next task and give it a unique name.

        :param task: An instance of `TaskClass`.
        :raises ValueError: If `task` is not an instance of `TaskClass`.
        """
        if task is not self._next_task:
            if not isinstance(task, self.TaskClass):
                raise ValueError(
                    'Expected an instance of %s, got %s.' %
                    (self.TaskClass.__name__, type(task).__name__))
            self._next_task = task
            self._connectTask(task)
            self.newTaskLoaded.emit(task)
        # NOTE(review): the name is refreshed even when `task` is already the
        # next task -- confirm this matches the intended layout.
        self.uniquifiyTaskName()

    def __len__(self):
        """
        Return the number of tasks in `task_list`.
        """
        return len(self.task_list)

    def __repr__(self):
        # Fall back to the raw value so repr() never raises when TaskClass
        # has no `__name__` (e.g. it is None).
        task_class_name = getattr(self.TaskClass, '__name__', self.TaskClass)
        return '<%s: %s>' % (self.__class__.__name__, task_class_name)