Source code for schrodinger.application.desmond.queue
import os
import time
from pathlib import Path
from typing import List
from typing import Optional
from typing import Tuple
from schrodinger.job import jobcontrol
from schrodinger.job import queue as jobcontrol_queue
# File names for automatic checkpointing
CHECKPOINT_REQUESTED_FILENAME = 'CHECKPOINT_REQUESTED'
CHECKPOINT_WITH_RESTART_REQUESTED_FILENAME = 'CHECKPOINT_WITH_RESTART_REQUESTED'
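# A subjob requests checkpointing by producing one of these files as a job
# output; `_status_change_callback` checks finished jobs for them.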
Host = Tuple[str, Optional[int]]
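# Illustrative examples: ("localhost", 4) caps the host at four simultaneous
# subjobs, while a count of None means no explicit limit was parsed.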
class Queue:
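    """
    Run jobs for all multisim stages, using a separate `JobDJ`
    instance for each stage.
    """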
    def __init__(self,
hosts: str,
max_job: int,
max_retries: int,
periodic_callback=None):
"""
        :param hosts: Host string passed to -HOST.
:param max_job: Maximum number of jobs to run simultaneously.
:param max_retries: Maximum number of times to retry a failed job.
:param periodic_callback: Function to call periodically as the jobs run.
This can be used to handle the halt message for stopping
a running workflow.
"""
self.hosts = _parse_hosts(hosts, max_job)
self.max_retries = max_retries
self.jobdj = None
self.periodic_callback = periodic_callback
self._queued_jobs = []
        self._max_stop_time = 3600  # seconds to wait for subjobs to stop
    def run(self):
"""
        Run the jobs for all multisim stages, starting a separate
        JobDJ for each stage.::
queue.push(jobs)
queue.run()
while jobs: <---------------|
jobdj.run() |
multisim_jobs.finish() |
stage.capture() |
next_stage.push() |
next_stage.release() |
queue.push(next_jobs) --
"""
while self._queued_jobs:
self._run_stage(self._queued_jobs)
    def stop(self) -> int:
"""
Attempt to stop the subjobs, but kill them if they
do not stop in time.
:return: Number of subjobs killed due to a failure to stop.
"""
stop_time = time.time()
num_subjobs_killed = 0
# New jobs may be launched, so loop until
# no more jobs are running.
stopped_jobs = set()
while self.running_jobs:
for job in self.running_jobs:
jctrl = job.getJob()
if time.time() - stop_time > self._max_stop_time:
num_subjobs_killed += 1
jctrl.kill()
elif job not in stopped_jobs:
stopped_jobs.add(job)
jctrl.stop()
# Process the finished jobs
self._finish_stage()
return num_subjobs_killed
    def push(self, jobs: List["cmj.Job"]):  # noqa: F821
        """Queue jobs that have a launch command; others are skipped."""
        self._queued_jobs.extend(
            filter(lambda j: j.jlaunch_cmd is not None, jobs))
@property
    def running_jobs(self) -> List["JobAdapter"]:
        """Jobs that have been launched and are not yet complete."""
        if self.jobdj is None:
            return []
        return [
            j for j in self.jobdj.active_jobs
            if j.getJob() and not j.getJob().isComplete()
        ]
def _run_stage(self, jobs: List["JobAdapter"]):
"""
Launch JobDJ for a given set of jobs.
"""
# TODO: max_failures=jobcontrol_queue.NOLIMIT matches the current behavior
# but in some cases it would be better to just exit on the first failure.
self.jobdj = jobcontrol_queue.JobDJ(
hosts=self.hosts,
verbosity="normal",
job_class=JobAdapter,
max_failures=jobcontrol_queue.NOLIMIT,
max_retries=self.max_retries)
        # The host running this driver often does not have a GPU,
        # so smart distribution should be disabled.
self.jobdj.disableSmartDistribution()
# Run all jobs for this stage
while jobs:
# Add the jobs, linking to the multisim jobs
for job in jobs:
self.jobdj.addJob(job.jlaunch_cmd,
multisim_job=job,
command_dir=job.dir)
            # Clear the queue; new jobs will be added if the current
            # jobs are requeued.
self._queued_jobs = []
try:
self.jobdj.run(
status_change_callback=self._status_change_callback,
periodic_callback=self.periodic_callback,
callback_interval=60)
except RuntimeError:
# Use multisim error handling for failed jobs
pass
# Run any requeued jobs
jobs = self._queued_jobs
# Finish the stage and add new jobs (if any) to self._queued_jobs
self._finish_stage()
def _finish_stage(self):
from schrodinger.application.desmond.cmj import JobStatus
for job in self.jobdj.all_jobs:
jctrl = job.getJob()
if jctrl is None:
                # Jobs that never ran are marked as launch failures
job.multisim_job.status.set(JobStatus.LAUNCH_FAILURE)
continue
# Skip intermediate jobs that were checkpointed
if job.is_checkpointed:
continue
job.multisim_job.process_completed_job(jctrl)
job.multisim_job.finish()
def _status_change_callback(self, job: "JobAdapter"):
"""
Process the job on a status change.
"""
from schrodinger.application.desmond.cmj import JobStatus
if job.state == jobcontrol_queue.JobState.WAITING:
job.multisim_job.status.set(JobStatus.WAITING)
elif job.state == jobcontrol_queue.JobState.ACTIVE:
job.multisim_job.status.set(JobStatus.RUNNING)
elif job.state == jobcontrol_queue.JobState.FAILED_RETRYABLE:
job.multisim_job.status.set(JobStatus.RETRIABLE_FAILURE)
elif job.state in (jobcontrol_queue.JobState.DONE,
jobcontrol_queue.JobState.FAILED):
if job.state == jobcontrol_queue.JobState.DONE:
jctrl = job.getJob()
for out_fname in jctrl.getOutputFiles():
if Path(out_fname
).name == CHECKPOINT_WITH_RESTART_REQUESTED_FILENAME:
job.multisim_job.process_completed_job(
jctrl, restart_requested=True)
job.multisim_job.requeue(jctrl)
job.is_checkpointed = True
self.push([job.multisim_job])
return
elif Path(out_fname).name == CHECKPOINT_REQUESTED_FILENAME:
job.multisim_job.process_completed_job(
jctrl, checkpoint_requested=True)
job.multisim_job.finish()
job.is_checkpointed = True
return
class JobAdapter(jobcontrol_queue.JobControlJob):
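    """
    `JobControlJob` subclass that ties a launched subjob back to the
    multisim job it runs for, so that status changes and checkpoint
    requests can be propagated.
    """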
    def __init__(self, *args, multisim_job=None, **kwargs):
self.multisim_job = multisim_job
# Set to True if this job is checkpointed by the auto restart mechanism
self.is_checkpointed = False
        # Launch timeout in seconds; defaults to 1800 but can be overridden
        # via the SCHRODINGER_MULTISIM_LAUNCH_TIMEOUT environment variable.
        if launch_timeout := os.getenv('SCHRODINGER_MULTISIM_LAUNCH_TIMEOUT'):
            kwargs['launch_timeout'] = int(launch_timeout)
        else:
            kwargs['launch_timeout'] = 1800
super().__init__(*args, **kwargs)
    def getCommand(self) -> List[str]:
# Restart command has priority over the original command
return self.multisim_job.jlaunch_cmd
    def maxFailuresReached(self, *args, **kwargs):
# Use multisim failure reporting for now
pass
def _parse_hosts(hosts: str, max_job: int) -> List[Host]:
"""
Parse the hosts while also respecting the given max_job.
See `Queue` for the meaning of the arguments.
:return: List of Host tuples.
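
    Illustrative examples (not doctests)::

        _parse_hosts("localhost:2", max_job=4)          # -> [("localhost", 4)]
        _parse_hosts("localhost localhost", max_job=0)  # -> [("localhost", 2)]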
"""
    # Handle multiple hosts, e.g. "localhost localhost"
split_hosts = hosts.strip().split()
if len(split_hosts) > 1:
# Only running on the same host is supported
if len(set(split_hosts)) == 1:
hosts = f'{split_hosts[0]}:{len(split_hosts)}'
else:
raise ValueError("Different hosts are no longer supported. "
"All jobs must be run on the same host.")
split_host = hosts.split(':')
host = split_host[0]
if max_job:
# Max job takes priority if specified
hosts = f'{host}:{max_job}'
else:
        # For non-queue hosts, if neither max_job nor the number of
        # CPUs is specified, set the count to the number of processors.
        # This differs from JobDJ, which sets it to 1.
host_entry = jobcontrol.get_host(host)
if len(split_host) == 1 and not host_entry.isQueue():
hosts = f'{host}:{host_entry.processors}'
return jobcontrol.host_str_to_list(hosts)
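# A minimal usage sketch (illustrative only; `stage_jobs` and `check_for_halt`
# are hypothetical names supplied by the multisim driver, not part of this
# module):
#
#     queue = Queue(hosts="localhost:4", max_job=4, max_retries=3,
#                   periodic_callback=check_for_halt)
#     queue.push(stage_jobs)   # only jobs with a jlaunch_cmd are queued
#     queue.run()              # one JobDJ per multisim stage until no jobs remain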