# Source code for schrodinger.application.desmond.queue
import os
import time
from pathlib import Path
from typing import List
from typing import Optional
from typing import Tuple
from schrodinger.job import jobcontrol
from schrodinger.job import queue as jobcontrol_queue
# File names for automatic checkpointing.
# `Queue._status_change_callback` compares these against the base names of a
# finished job's output files:
# - CHECKPOINT_REQUESTED_FILENAME: the job is processed and finished without
#   being requeued.
# - CHECKPOINT_WITH_RESTART_REQUESTED_FILENAME: the job is processed and then
#   requeued so that it runs again.
CHECKPOINT_REQUESTED_FILENAME = 'CHECKPOINT_REQUESTED'
CHECKPOINT_WITH_RESTART_REQUESTED_FILENAME = 'CHECKPOINT_WITH_RESTART_REQUESTED'
# Element type of the list returned by `_parse_hosts`; presumably
# (host name, number of cpus or None) -- TODO confirm against
# `jobcontrol.host_str_to_list`.
Host = Tuple[str, Optional[int]]
class Queue:
    """
    Run multisim jobs stage by stage, using a separate JobDJ for each stage.

    Jobs are added with `push`, executed with `run` and can be interrupted
    with `stop`.
    """

    # Seconds to wait between two polls of the running subjobs in `stop`.
    _stop_poll_interval = 1

    def __init__(self,
                 hosts: str,
                 max_job: int,
                 max_retries: int,
                 periodic_callback=None):
        """
        :param hosts: string passed to -HOST.
        :param max_job: Maximum number of jobs to run simultaneously.
        :param max_retries: Maximum number of times to retry a failed job.
        :param periodic_callback: Function to call periodically as the jobs run.
            This can be used to handle the halt message for stopping
            a running workflow.
        """
        self.hosts = _parse_hosts(hosts, max_job)
        self.max_retries = max_retries
        # JobDJ of the stage currently (or most recently) run; None until
        # the first stage is launched.
        self.jobdj = None
        self.periodic_callback = periodic_callback
        # Multisim jobs waiting to be handed to the next JobDJ run.
        self._queued_jobs = []
        # Seconds `stop` waits for subjobs to stop before killing them.
        self._max_stop_time = 3600

    def run(self):
        """
        Run jobs for all multisim stages.
        Starts a separate JobDJ for each multisim stage.::

            queue.push(jobs)
            queue.run()
                while jobs:  <---------------|
                    jobdj.run()              |
                    multisim_jobs.finish()   |
                      stage.capture()        |
                      next_stage.push()      |
                      next_stage.release()   |
                      queue.push(next_jobs) --
        """
        while self._queued_jobs:
            self._run_stage(self._queued_jobs)

    def stop(self) -> int:
        """
        Attempt to stop the subjobs, but kill them if they
        do not stop in time.

        Each running subjob is asked to stop once. Subjobs still running
        more than `self._max_stop_time` seconds after this call started are
        killed instead. Once nothing is running any more, the jobs of the
        current stage are processed. Calling this before any stage was
        launched is a no-op that returns 0.

        :return: Number of subjobs killed due to a failure to stop.
        """
        stop_time = time.time()
        stopped_jobs = set()
        killed_jobs = set()
        # New jobs may be launched, so loop until
        # no more jobs are running.
        while running := self.running_jobs:
            for job in running:
                jctrl = job.getJob()
                if time.time() - stop_time > self._max_stop_time:
                    # Keep requesting the kill on every poll, but count
                    # each subjob only once.
                    killed_jobs.add(job)
                    jctrl.kill()
                elif job not in stopped_jobs:
                    stopped_jobs.add(job)
                    jctrl.stop()
            # Pause between polls instead of spinning on the job state.
            time.sleep(self._stop_poll_interval)
        # Process the finished jobs
        self._finish_stage()
        return len(killed_jobs)

    def push(self, jobs: List["cmj.Job"]):  # noqa: F821
        """
        Queue the given multisim jobs for the next JobDJ run.

        :param jobs: Jobs to queue. Jobs whose `jlaunch_cmd` is None are
            ignored.
        """
        self._queued_jobs.extend(
            job for job in jobs if job.jlaunch_cmd is not None)

    @property
    def running_jobs(self) -> List["JobAdapter"]:
        """
        Active jobs of the current JobDJ that have a job control record and
        are not complete yet. Empty if no JobDJ has been created.
        """
        if self.jobdj is None:
            return []
        return [
            job for job in self.jobdj.active_jobs
            if (jctrl := job.getJob()) and not jctrl.isComplete()
        ]

    def _run_stage(self, jobs: List["cmj.Job"]):  # noqa: F821
        """
        Launch JobDJ for a given set of jobs.

        Jobs requeued while the JobDJ is running are run with the same
        JobDJ until none are left; then the stage is finished.

        :param jobs: Multisim jobs to run for this stage.
        """
        # TODO: max_failures=jobcontrol_queue.NOLIMIT matches the current behavior
        # but in some cases it would be better to just exit on the first failure.
        self.jobdj = jobcontrol_queue.JobDJ(
            hosts=self.hosts,
            verbosity="normal",
            job_class=JobAdapter,
            max_failures=jobcontrol_queue.NOLIMIT,
            max_retries=self.max_retries)
        # The host running this often does not have a GPU
        # so smart distribution should be disabled.
        self.jobdj.disableSmartDistribution()
        # Run all jobs for this stage
        while jobs:
            # Add the jobs, linking to the multisim jobs
            for job in jobs:
                self.jobdj.addJob(job.jlaunch_cmd,
                                  multisim_job=job,
                                  command_dir=job.dir)
            # Clear queue, new jobs will be added if the current jobs
            # are requeued.
            self._queued_jobs = []
            try:
                self.jobdj.run(
                    status_change_callback=self._status_change_callback,
                    periodic_callback=self.periodic_callback,
                    callback_interval=60)
            except RuntimeError:
                # Use multisim error handling for failed jobs
                pass
            # Run any requeued jobs
            jobs = self._queued_jobs
        # Finish the stage and add new jobs (if any) to self._queued_jobs
        self._finish_stage()

    def _finish_stage(self):
        """
        Process all jobs of the current JobDJ.

        Jobs without a job control record are marked as launch failures,
        checkpointed intermediate jobs are skipped, and every other job is
        processed and finished. Does nothing if no JobDJ has been created.
        """
        if self.jobdj is None:
            # Nothing was launched yet (e.g. `stop` called before `run`).
            return
        for job in self.jobdj.all_jobs:
            jctrl = job.getJob()
            if jctrl is None:
                # Check for jobs that never ran.
                # Imported locally, as in the rest of this class; presumably
                # to avoid a circular import with cmj -- TODO confirm.
                from schrodinger.application.desmond.cmj import JobStatus
                job.multisim_job.status.set(JobStatus.LAUNCH_FAILURE)
                continue
            # Skip intermediate jobs that were checkpointed
            if job.is_checkpointed:
                continue
            job.multisim_job.process_completed_job(jctrl)
            job.multisim_job.finish()

    def _status_change_callback(self, job: "JobAdapter"):
        """
        Process the job on a status change.

        Mirrors the JobDJ state in the multisim job status. For a job that
        is done, the output files are scanned for a checkpoint marker: the
        first marker found decides whether the multisim job is requeued
        (checkpoint with restart) or finished (checkpoint only).

        :param job: JobDJ job whose state changed.
        """
        from schrodinger.application.desmond.cmj import JobStatus
        job_state = jobcontrol_queue.JobState
        state = job.state
        if state == job_state.WAITING:
            job.multisim_job.status.set(JobStatus.WAITING)
        elif state == job_state.ACTIVE:
            job.multisim_job.status.set(JobStatus.RUNNING)
        elif state == job_state.FAILED_RETRYABLE:
            job.multisim_job.status.set(JobStatus.RETRIABLE_FAILURE)
        elif state == job_state.DONE:
            # Nothing is done here for FAILED jobs, nor for DONE jobs
            # without a checkpoint marker; `_finish_stage` processes them.
            jctrl = job.getJob()
            for out_fname in jctrl.getOutputFiles():
                base_name = Path(out_fname).name
                if base_name == CHECKPOINT_WITH_RESTART_REQUESTED_FILENAME:
                    job.multisim_job.process_completed_job(
                        jctrl, restart_requested=True)
                    job.multisim_job.requeue(jctrl)
                    job.is_checkpointed = True
                    self.push([job.multisim_job])
                    return
                if base_name == CHECKPOINT_REQUESTED_FILENAME:
                    job.multisim_job.process_completed_job(
                        jctrl, checkpoint_requested=True)
                    job.multisim_job.finish()
                    job.is_checkpointed = True
                    return
class JobAdapter(jobcontrol_queue.JobControlJob):
    """
    JobDJ job that is linked to the multisim job it runs.
    """

    def __init__(self, *args, multisim_job=None, **kwargs):
        """
        :param multisim_job: Multisim job this JobDJ job belongs to.

        All other arguments are forwarded to the base class. The
        `launch_timeout` keyword is always overwritten: it is taken from
        the SCHRODINGER_MULTISIM_LAUNCH_TIMEOUT environment variable when
        that is set to a non-empty value, and is 1800 otherwise.
        """
        self.multisim_job = multisim_job
        # Becomes True once the auto restart mechanism checkpoints this job
        self.is_checkpointed = False
        env_timeout = os.getenv('SCHRODINGER_MULTISIM_LAUNCH_TIMEOUT')
        kwargs['launch_timeout'] = int(env_timeout) if env_timeout else 1800
        super().__init__(*args, **kwargs)

    def getCommand(self) -> List[str]:
        """
        Return the current launch command of the linked multisim job.
        """
        # Always read from the multisim job, so that a restart command has
        # priority over the original command.
        return self.multisim_job.jlaunch_cmd

    def maxFailuresReached(*args, **kwargs):
        """
        Do nothing: multisim failure reporting is used for now.
        """
        return None
def _parse_hosts(hosts: str, max_job: int) -> List[Host]:
    """
    Turn the -HOST string into a host list, honoring `max_job`.

    See `Queue` for the meaning of the arguments.

    :return: List of Host tuples.
    :raise ValueError: If several different hosts are given.
    """
    # Several whitespace separated entries, e.g. "localhost localhost",
    # are only accepted when they all name the same host; they collapse to
    # "<host>:<number of entries>".
    entries = hosts.strip().split()
    if len(entries) > 1:
        if len(set(entries)) != 1:
            raise ValueError("Different hosts are no longer supported. "
                             "All jobs must be run on the same host.")
        hosts = f'{entries[0]}:{len(entries)}'

    host, colon, _ = hosts.partition(':')

    if max_job:
        # An explicit max_job overrides any count given in the host string.
        return jobcontrol.host_str_to_list(f'{host}:{max_job}')

    # Without max_job and without an explicit count, a non-queue host gets
    # as many slots as it has processors (JobDJ itself would use 1).
    host_entry = jobcontrol.get_host(host)
    count_missing = not colon
    if count_missing and not host_entry.isQueue():
        hosts = f'{host}:{host_entry.processors}'
    return jobcontrol.host_str_to_list(hosts)