Jobcontrol¶
This document gives an overview of job control. For Jobcontrol FAQs, see Job Control Frequently Asked Questions.
Jobcontrol is a tool for running tasks asynchronously and (optionally) on different machines. Schrödinger wrote Jobcontrol instead of using off-the-shelf tools for distributed computation in order to deal with the unusual requirements of our software, especially the size of the binaries required for our computations.
You might use Jobcontrol to launch a task from a laptop (running Maestro) to a compute node, so that the task runs on several cores. Jobcontrol takes care of transferring input files from your machine to the cluster, monitoring progress of the job, and collecting results and log files once the job is complete.
Jobcontrol is also used to write more complicated multistage workflows that treat the outputs of some jobs as inputs for later stages. This is discussed below.
Running a Job¶
Jobs are usually launched by running a command with -HOST <host entry argument>. Any executable that supports this option can be run under Jobcontrol.
Host entries are defined in schrodinger.hosts files that live at the root of your installation. Most executables will run the job on localhost if -HOST is not specified; others may require an explicit -HOST localhost argument.
Example:
$SCHRODINGER/ligprep -imae in.mae -omae out.mae
This will run ligprep locally on in.mae and produce an out.mae output file. Adding -HOST bolt_cpu would submit the job to the bolt host defined in the hosts file.
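If you want to check which host entries are available from a script, they can be listed from Python. The sketch below assumes the schrodinger.job.jobcontrol.get_hosts() helper, which parses the schrodinger.hosts files visible to the current installation, and that each entry exposes a name attribute:

# Minimal sketch: list the host entries known to this installation.
# Assumes jobcontrol.get_hosts() and a `name` attribute on each entry.
from schrodinger.job import jobcontrol

for host in jobcontrol.get_hosts():
    print(host.name)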
Ordinary scripts¶
It is possible to add Jobcontrol support to simple Python scripts. This section will show how to add such support to a simple script.

In this example, myscript.py will simply print out the hostname of the machine it is running on, to show that our script will have different outputs on different machines.
import socket


def main():
    print(socket.gethostname())


if __name__ == "__main__":
    main()
$SCHRODINGER/run myscript.py will print out your local hostname. (In this case, jobcontrol is not involved at all, and the value of $SCHRODINGER/run is just that it allows the use of Schrödinger's Python API.)
Add jobcontrol API¶
If we want to execute our script under jobcontrol, locally or remotely, we need to add a function at the top level that jobcontrol can use to understand the requirements of the job. This function must be called get_job_spec_from_args and it must return a launchapi.JobSpecification. Note that the script must be importable as a module for Jobcontrol to be able to run the script's function.
An important requirement for running our script under jobcontrol is to set the script itself as an input file, since it is not already in the distribution. We do this in the example below using setInputFile("myscript.py"). Note that this is necessary even when we run with -HOST localhost, since even in that case the job will run in a temporary working directory.
In the example below, in addition to adding the script itself as an input file, we also direct stderr and stdout to a file named "myscript.log" that can be streamed as the job progresses (using jsc tail-file) and will be downloaded to the current working directory as output upon completion of the job:
import socket

from schrodinger.job import launchapi


def get_job_spec_from_args(argv):
    """
    Return a JobSpecification necessary to run this script on a remote
    machine (e.g. under job control with the launch.py script).

    :type argv: list(str)
    :param argv: The list of command line arguments, including the script name
        at [0], matching $SCHRODINGER/run __file__ sys.argv
    """
    job_builder = launchapi.JobSpecificationArgsBuilder(argv)
    job_builder.setStderr("myscript.log")
    job_builder.setStdout("myscript.log")
    job_builder.setInputFile("myscript.py")
    return job_builder.getJobSpec()


def main():
    print(socket.gethostname())


if __name__ == "__main__":
    main()
Note that we specify input and log file names as relative paths, not absolute paths.

$SCHRODINGER/run myscript.py will run the script without Jobcontrol, and will print out your local hostname.

$SCHRODINGER/run myscript.py -HOST localhost will run the script under Jobcontrol on your local machine, and will log your local hostname to myscript.log.

$SCHRODINGER/run myscript.py -HOST bolt_cpu will log the hostname of the bolt compute node to myscript.log.
Register input and output files¶
As mentioned above, files that are transferred from the launch machine to the compute machine need to be registered with job control, including the script itself if it is not within the SCHRODINGER installation on the compute machine.
Input files are registered with the setInputFile method when building a job specification. Within this document, we use the invocation setInputFile("myscript.py") to register the current jobcontrol script as an input file, intending the script file to be saved and run as myscript.py. Other input files for the job can be added in the same way.
Similarly, output files can be registered with the setOutputFile method when building a job specification.
In the following example, in addition to the script itself, we register an input Maestro file and an output Maestro file:
import os
import sys

from schrodinger import structure
from schrodinger.job import launchapi


def get_job_spec_from_args(argv):
    job_builder = launchapi.JobSpecificationArgsBuilder(argv)
    input_file = argv[1]
    output_mae_file = os.path.basename(input_file) + "processed.mae"
    job_builder.setInputFile("myscript.py")
    job_builder.setInputFile(input_file)
    job_builder.setOutputFile(output_mae_file)
    job_builder.setStderr("myscript.log")
    job_builder.setStdout("myscript.log")
    return job_builder.getJobSpec()


def main():
    input_file = sys.argv[1]
    output_file = os.path.basename(input_file) + "processed.mae"
    with structure.StructureReader(input_file) as reader:
        with structure.StructureWriter(output_file) as writer:
            for ct in reader:
                ct.title = ct.title + "processed"
                writer.append(ct)


if __name__ == "__main__":
    main()
Execute using: $SCHRODINGER/run myscript.py foo.mae -HOST localhost
Output files will be downloaded to the current working directory.
Using a jobname¶
Any job can be given a jobname via the -JOBNAME option, and many scripts use it to determine the names of the log files and output files. If the -JOBNAME option is not specified, some scripts derive the job name from the name of the main input file, while others use a default job name.
Above, we explicitly called setStderr and setStdout in get_job_spec_from_args. In the example below, we instead pass use_jobname_log=True to the constructor of launchapi.JobSpecificationArgsBuilder:
import socket

from schrodinger.job import launchapi


def get_job_spec_from_args(argv):
    job_builder = launchapi.JobSpecificationArgsBuilder(argv, use_jobname_log=True)
    job_builder.setInputFile("myscript.py")
    return job_builder.getJobSpec()


def main():
    print(socket.gethostname())


if __name__ == "__main__":
    main()
Execute using: $SCHRODINGER/run myscript.py -JOBNAME foo -HOST localhost
Because we passed use_jobname_log=True to the JobSpecificationArgsBuilder constructor, our log file will get a suitable name without having to call setStderr and setStdout, as in the examples above.
Note that the use of use_jobname_log in the example above requires us to specify a jobname in some way. This can be done via the command line (as we just did above), by passing default_job_name into the constructor of JobSpecificationArgsBuilder, or by calling setJobname on an instance of JobSpecificationArgsBuilder.
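For instance, here is a minimal sketch of the last option, calling setJobname on the builder; the "hostname_job" name is just an illustration:

from schrodinger.job import launchapi


def get_job_spec_from_args(argv):
    job_builder = launchapi.JobSpecificationArgsBuilder(argv, use_jobname_log=True)
    job_builder.setInputFile("myscript.py")
    # Fall back to an explicit jobname if none was given on the command line.
    if not job_builder.getJobname():
        job_builder.setJobname("hostname_job")  # illustrative default
    return job_builder.getJobSpec()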
Maestro Incorporation¶
Maestro files from a job can be marked for incorporation into Maestro, meaning that their structures will show up in the Project Table.
def get_job_spec_from_args(argv):
    job_builder = launchapi.JobSpecificationArgsBuilder(argv)
    job_builder.setOutputFile("foo.mae", incorporate=True)
    return job_builder.getJobSpec()
Note that it is possible to set multiple output files to be incorporated as separate entries. Only Maestro-formatted files are supported for incorporation.
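For example, here is a sketch that registers two Maestro output files for incorporation as separate entries; the file names are illustrative:

from schrodinger.job import launchapi


def get_job_spec_from_args(argv):
    job_builder = launchapi.JobSpecificationArgsBuilder(argv)
    # Each incorporated Maestro file becomes its own entry in the Project Table.
    job_builder.setOutputFile("ligands.mae", incorporate=True)
    job_builder.setOutputFile("receptor.mae", incorporate=True)
    return job_builder.getJobSpec()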
Using $SCHRODINGER/run -FROM <product>¶
Some scripts require $SCHRODINGER/run -FROM <product> to run (when they need to load libraries from one of the product directories). In such cases, we record the product when we create the JobSpecification:
def get_job_spec_from_args(argv):
    job_builder = launchapi.JobSpecificationArgsBuilder(
        argv, schrodinger_product="scisol")
    return job_builder.getJobSpec()
Integration with an Argument Parser¶
An argument parser is useful when we want to document, validate, and access command line arguments within a script. It is easy to integrate an argument parser into a script that uses jobcontrol. Also notice how, in this example, the default job name is derived from the name of the input file using the fileutils.get_jobname() function.
import argparse
import os
import sys

from schrodinger import structure
from schrodinger.job import launchapi
from schrodinger.utils import fileutils


def parse_args(argv):
    parser = argparse.ArgumentParser()
    parser.add_argument("inputfile", help="Input Maestro file")
    args = parser.parse_args(argv)
    return args


def get_job_spec_from_args(argv):
    # first argument is this script
    args_namespace = parse_args(argv[1:])
    job_builder = launchapi.JobSpecificationArgsBuilder(argv, use_jobname_log=True)
    job_builder.setInputFile(__file__)
    job_builder.setInputFile(args_namespace.inputfile)
    if not job_builder.getJobname():
        job_builder.setJobname(fileutils.get_jobname(args_namespace.inputfile))
    return job_builder.getJobSpec()


def main(argv):
    args = parse_args(argv)
    with structure.StructureReader(args.inputfile) as reader:
        for ct in reader:
            print(f"ct title={ct.title}")


if __name__ == '__main__':
    main(sys.argv[1:])
JobDJ¶
Introduction to JobDJ¶
The foregoing has shown us how to run a single job, with options, on a specified host. But often jobs are knit together into larger workflows. For this we use JobDJ, a workflow tool that makes it possible to run multiple, potentially simultaneous jobs, and provides a mechanism for enforcing dependencies between jobs representing different stages of a computation.
A JobDJ instance can submit individual jobs to a queueing system (like SLURM or UGE) or an explicit list of compute machines.
This document will cover a few types of more complex distributed workflows.
Basic Usage¶
In the simplest case, a driver script running under jobcontrol just needs to define one or more subjobs and call the JobDJ object’s run() method. JobDJ will run the jobs on the hosts specified on the command line. For example, you might write a driver script driver.py for running a set of Jaguar jobs like this:
import argparse
import sys

from schrodinger.job import launchapi
from schrodinger.job import queue


def parse_args(argv):
    parser = argparse.ArgumentParser()
    parser.add_argument("inputfiles", nargs='+', help="Jaguar input files")
    args = parser.parse_args(argv)
    return args


def get_job_spec_from_args(argv):
    args_namespace = parse_args(argv[1:])
    job_builder = launchapi.JobSpecificationArgsBuilder(
        argv, default_jobname="test_jaguar_driver", use_jobname_log=True)
    job_builder.setInputFile("driver.py")
    for infile in args_namespace.inputfiles:
        job_builder.setInputFile(infile)
    return job_builder.getJobSpec()


def main(argv):
    args = parse_args(argv)
    jobdj = queue.JobDJ()
    for infile in args.inputfiles:
        cmd = ['jaguar', 'run', infile]
        jobdj.addJob(cmd)
    jobdj.run()


if __name__ == '__main__':
    main(sys.argv[1:])
Note that jobdj.run() will block until all subjobs have completed, which is why driver scripts themselves should be run under jobcontrol.
Workflow Basics¶
Complex job workflows are organized by a driver script that is typically run under job control on a remote host. Using a remote host avoids the disruptions to the network connection between the driver script and the cluster that can complicate launching jobs from, e.g., a personal laptop.
When any stage of a distributed computation completes, its output files will be returned to the current working directory of the driver script. In the event of network glitches, jobcontrol will attempt to return files for some time (currently about 30 minutes), after which the job is considered failed.
A common pattern in driver scripts for collecting the output files for a job into a single location is to change the current working directory of the driver, for the duration of the job, to a directory that uniquely identifies that job (using, e.g., the job_id). This prevents later uses of the same workflow from accidentally overwriting the outputs of previous ones.
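A minimal sketch of that pattern is shown below; the run_in_job_dir helper is illustrative (it is not part of the Jobcontrol API), and how the driver obtains a unique identifier for the job is up to you:

import contextlib
import os


@contextlib.contextmanager
def run_in_job_dir(job_id):
    # Temporarily switch the driver's working directory to a per-job
    # directory named after `job_id`, creating it if needed.
    orig_dir = os.getcwd()
    job_dir = os.path.join(orig_dir, job_id)
    os.makedirs(job_dir, exist_ok=True)
    os.chdir(job_dir)
    try:
        yield job_dir
    finally:
        os.chdir(orig_dir)

The driver would then launch and wait for the subjob inside the with run_in_job_dir(...) block, so that the subjob's outputs land in that directory.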
When a job completes successfully or is cancelled by a user, the job directory is cleaned up. Files not set as output files are not returned but rather are simply removed. If a job fails we leave the output files in the directory to aid investigation.
There is no support for restartability currently built into jobcontrol but drivers are free to implement their own solutions.
Working With The Workflow Graph¶
Jobs are added to a workflow by calling addJob on a JobDJ object. In the simplest case, a job command given as a list of strings can be passed to addJob, and it is turned into a Job object internally. A Job is almost always an instance of JobControlJob. You can think of a job as a node in the workflow graph.
Running addJob in this way will create isolated job nodes in the workflow graph with no dependencies on each other. Ultimately, this means that calling jobdj.run() will run all added jobs concurrently, because the order in which they run doesn't matter.
In more complex cases, we can create a Job object and pass it to addJob. Relationships between stages of a computation are specified with the addPrereq method on a Job object. (A job's immediate prerequisites can be inspected by calling getPrereqs on it.):
>>> from schrodinger.job import queue
>>> jobdj = queue.JobDJ()
>>> # A job that will produce a "producer.out" output file.
>>> producer_job = queue.JobControlJob(["testapp", "-t", "1", "-j", "producer"])
>>> # A job that will consume a "producer.out" and create a "consumer.out" output file.
>>> consumer_job = queue.JobControlJob(["testapp", "-t", "1", "-j", "consumer", "-i", "producer.out"])
>>> # Because consumer_job requires the output of producer_job as input, add
>>> # producer_job as a prerequisite.
>>> consumer_job.addPrereq(producer_job)
>>> consumer_job.getPrereqs() == {producer_job}
True
>>> # addJob has a kwarg `add_connected=True` which will automatically add
>>> # all prerequisite jobs to the jobdj instance, so we only need to add
>>> # consumer_job.
>>> jobdj.addJob(consumer_job)
>>> sorted(jobdj.all_jobs) == sorted([producer_job, consumer_job])
True
>>> # Calling jobdj.run() will now run producer_job to completion, download its output,
>>> # and only then run consumer_job.
The JobDJ.run() method accepts two optional arguments that allow a caller to register callbacks in connection with the workflow.
The first, status_change_callback, will be called whenever any Job's status changes. This callback will be called with the schrodinger.job.queue.BaseJob whose status has changed.
For periodic actions while the workflow is in progress, the periodic_callback can be used. This callback will be called with no arguments. The periodic_callback is run every callback_interval seconds (approximately); the default value is every 300 seconds.
Note that status_change_callback and periodic_callback will be called by JobDJ in a blocking fashion; job updates will stop being processed and won't resume until your callable returns!
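Here is a minimal sketch of registering both callbacks; the testapp command and the print statements are placeholders:

from schrodinger.job import queue


def on_status_change(job):
    # Called with the schrodinger.job.queue.BaseJob whose status changed.
    # Keep this quick; JobDJ stops processing updates until it returns.
    print(f"status changed: {job}")


def on_heartbeat():
    # Called periodically, with no arguments, while the workflow runs.
    print("workflow still in progress...")


jobdj = queue.JobDJ()
jobdj.addJob(["testapp", "-t", "10", "-j", "callback_demo"])
jobdj.run(status_change_callback=on_status_change,
          periodic_callback=on_heartbeat)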
Intermediate actions can also be added by calling the addFinalizer method on the job object and passing in a callable. The callable will be invoked when the job completes successfully. For example, if a job outputs a file named “foo.txt” but a job in the next stage expects this file as an input file named “bar.txt”, you can add a finalizer to the initial job that will rename “foo.txt” to “bar.txt”:
import shutil

from schrodinger.job import queue

job = queue.JobControlJob(["myscript.py"])


def renameFile(job: queue.BaseJob):
    shutil.move("foo.txt", "bar.txt")


job.addFinalizer(renameFile)
Like the JobDJ.run() callbacks, finalizers are blocking calls and should ideally be short operations. For longer operations, it may instead be preferable to run the follow-up work as its own JobControlJob and add the job to be finalized as its prerequisite with addPrereq, as sketched below.
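Here is a sketch of that alternative; the postprocess.py command is hypothetical:

from schrodinger.job import queue

jobdj = queue.JobDJ()
main_job = queue.JobControlJob(["myscript.py"])
# Run the lengthy post-processing step as its own subjob rather than a finalizer.
postprocess_job = queue.JobControlJob(["postprocess.py"])  # hypothetical command
postprocess_job.addPrereq(main_job)
# Prerequisite jobs are added to the JobDJ automatically (see the graph example above).
jobdj.addJob(postprocess_job)
jobdj.run()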
Failure Tolerance¶
Jobs can fail for a variety of reasons such as incorrect scientific assumptions or network issues. Failures can be acceptable in some workflows but detrimental in others. JobDJ allows for flexibility in failure handling and job retrying through some simple configuration parameters.
When constructing a JobDJ, you can use the max_retries, default_max_retries, and max_failures parameters to control failure and retry behavior. The default behavior is that no subjobs will be retried and no failures are tolerated.
- The max_retries parameter is the number of allowed retries per subjob. If you wish to disable retries, set this value to zero.
- The default_max_retries parameter is the same as the max_retries parameter, except that it allows the SCHRODINGER_MAX_RETRIES environment variable to override its value.
- The max_failures parameter is the total number of allowed independent subjob failures before JobDJ exits. Subjob failures that were retried successfully do not add to this count. To allow an unlimited number of subjob failures, use the module-level NOLIMIT constant (schrodinger.job.queue.NOLIMIT).
For example, if you wish to create a JobDJ that always allows 3 retries per subjob and allows any number of total failures, use the invocation:
from schrodinger.job import queue
jobdj = queue.JobDJ(max_retries=3, max_failures=queue.NOLIMIT)
The max_retries parameter is also available when creating an instance of a JobControlJob, and will override the value set in JobDJ.
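For example, here is a sketch that gives one subjob more retries than the JobDJ-wide setting; the testapp commands are placeholders:

from schrodinger.job import queue

# Allow one retry per subjob by default...
jobdj = queue.JobDJ(max_retries=1)
# ...but permit extra attempts for a subjob known to fail transiently.
flaky_job = queue.JobControlJob(["testapp", "-t", "1", "-j", "flaky"],
                                max_retries=5)
jobdj.addJob(flaky_job)
jobdj.addJob(["testapp", "-t", "1", "-j", "steady"])
jobdj.run()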