schrodinger.job.queue module¶
Provides a JobDJ class for running multiple concurrent jobs. Jobs can have dependencies, and JobDJ can avoid starting a job until its prerequisites are met.
Step-by-step instructions for basic use:
- Create a JobDJ instance. For example: job_dj = queue.JobDJ()
- Add jobs to the JobDJ instance by calling the JobDJ.addJob method. For example: job_dj.addJob(["jaguar", "input1.inp"])
- Run all jobs with the JobDJ.run method. For example: job_dj.run()
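The basic workflow can be collected into one small helper. This is a minimal sketch, not part of the module: the function name `run_inputs` is hypothetical, and it relies only on the `addJob` and `run` calls documented here. In a Schrodinger Python environment the `job_dj` argument would typically be created with `queue.JobDJ()` after `from schrodinger.job import queue`.

```python
from typing import Iterable, List


def run_inputs(job_dj, input_files: Iterable[str]) -> List[List[str]]:
    """Queue one jaguar command per input file, then run them all.

    `job_dj` is expected to be a JobDJ instance (or any object with
    compatible addJob() and run() methods). Hypothetical helper.
    """
    commands = [["jaguar", name] for name in input_files]
    for command in commands:
        # Each command is a list of strings.
        job_dj.addJob(command)
    # run() returns control once all added jobs have completed.
    job_dj.run()
    return commands
```

Passing the JobDJ instance in as an argument keeps the helper independent of how the hosts and retry limits were configured.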
Copyright Schrodinger, LLC. All rights reserved.
- class schrodinger.job.queue.PrintableStatus(value)[source]¶
Bases: str, enum.Enum
An enumeration.
- FAILED_TO_LAUNCH = 'failed to launch'¶
- LAUNCHED = 'launched'¶
- RESTARTED = 'restarted'¶
- RESTARTING = 'restarting'¶
- STARTED = 'started'¶
- class schrodinger.job.queue.JobState(value)[source]¶
Bases: str, enum.Enum
An enumeration.
- WAITING = 'waiting'¶
- LAUNCHING = 'launching'¶
- ACTIVE = 'active'¶
- FAILED = 'failed'¶
- FAILED_RETRYABLE = 'failed but retryable'¶
- DONE = 'done'¶
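Because both enumerations derive from str as well as enum.Enum, their members compare equal to plain strings. The sketch below uses a stand-in class, `DemoState`, that copies three of the values listed above; it is not the module's own class.

```python
import enum


class DemoState(str, enum.Enum):
    """Stand-in that mirrors how a str-based Enum behaves."""
    WAITING = 'waiting'
    ACTIVE = 'active'
    DONE = 'done'


# Members compare equal to their string values.
assert DemoState.DONE == 'done'
# A member can be looked up from its value, which is what the (value)
# argument in the class signature refers to.
assert DemoState('waiting') is DemoState.WAITING
# The raw string is always available as .value.
print(DemoState.ACTIVE.value)  # prints: active
```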
- exception schrodinger.job.queue.MissingResourceError(required_type, available_resources)[source]¶
Bases:
RuntimeError
- template = "A job requires a '{}' type of compute resource not provided by any hosts in JobDJ: {}"¶
- args¶
- with_traceback()¶
Exception.with_traceback(tb) – set self.__traceback__ to tb and return self.
- exception schrodinger.job.queue.MaxJobFailureError[source]¶
Bases:
RuntimeError
- msg = 'Maximum number of failed jobs has been reached. All other subjobs will be killed and program will stop.'¶
- args¶
- with_traceback()¶
Exception.with_traceback(tb) – set self.__traceback__ to tb and return self.
- schrodinger.job.queue.get_default_launch_timeout() int [source]¶
Get the default launch timeout in seconds.
- schrodinger.job.queue.get_update_delay() int [source]¶
Return the delay to use for jobdb reads in seconds.
- schrodinger.job.queue.backup_file(log_file: pathlib.Path, copy: bool)[source]¶
Backs up the file log_file copying it to .1, .2, etc.
- Parameters
log_file – name of file to backup
copy – If True, copy file to backup location, otherwise rename
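The numbered-backup idea can be sketched with the standard library. This is an illustration under stated assumptions, not the module's implementation: the name `backup_file_sketch` is hypothetical, and it assumes the numeric suffix is appended to the full file name and that the first unused number is chosen.

```python
import shutil
from pathlib import Path


def backup_file_sketch(log_file: Path, copy: bool) -> Path:
    """Back up log_file to the first unused '<name>.N' path (N = 1, 2, ...).

    Assumption: numbering starts at 1 and an existing backup is never
    overwritten. The real backup_file may behave differently.
    """
    n = 1
    while True:
        backup = log_file.with_name(f"{log_file.name}.{n}")
        if not backup.exists():
            break
        n += 1
    if copy:
        shutil.copy2(log_file, backup)  # original stays in place
    else:
        log_file.rename(backup)  # original is moved away
    return backup
```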
- schrodinger.job.queue.get_command(base_command: List[str], host: str = 'localhost', procs: int = 1, local: bool = False) List[str] [source]¶
Given a base command and additional launching specifications, return a modified command that is ready to pass to jobcontrol.launch_job.
- class schrodinger.job.queue.BaseJob(command_dir: Optional[pathlib.Path] = None, resource_requirement: Optional[schrodinger.job.resource.ComputeRequirement] = None)[source]¶
Bases:
object
A base job class for jobs run under JobDJ.
The main methods to be implemented in subclasses are:
- doCommand - The method that does the real work of the job, either running a simple local calculation or submitting a job to job control.
- update - A method called periodically while a job is running to update its current state.
- _getState - The get method used in the state property, used by JobDJ to determine the job’s current state.
A few additional methods only need to be implemented in special situations:
- finalize - Override this method if you want custom behavior when the job is finalized.
- cancelSubmitted - If the job can run under a queue, implementing this method allows jobs that are waiting in the submitted state to be restarted immediately on a newly available non-queue host.
- getStatusStrings - If you want to use the JobDJ print summary, this method should be updated to provide more useful information.
The execution point for all jobs is in the BaseJob.run method. That method calls preCommand, doCommand and postCommand in order.
For jobs that are run locally, all main computation should be done in the doCommand method. Note that the doCommand method blocks until completion, so no additional work will be done (e.g. job updates or submissions) until it returns. For this reason, only short jobs should be run locally without job control.
- init_count = 0¶
- __init__(command_dir: Optional[pathlib.Path] = None, resource_requirement: Optional[schrodinger.job.resource.ComputeRequirement] = None)[source]¶
- Parameters
command_dir – The directory from which to run the command.
resource_requirement – Whether the job will require special compute resources, such as GPU.
- runsLocally() bool [source]¶
Return True if the job runs on the JobDJ control host, False if not. Jobs that run locally don’t need hosts.
There is no limit on the number of locally run jobs.
- update()[source]¶
Update the current job status, stored in the state property.
When a job is running, this method will be called periodically by JobDJ until the job state property is JobState.DONE.
- maxFailuresReached(msg: str)[source]¶
This method is called after the job has failed and the maximum number of failures per JobDJ run has been reached. After invoking this method, JobDJ will raise a RuntimeError and the process will exit.
- run(*args, **kwargs)[source]¶
Run the job.
- The steps taken are as follows:
Execute the preCommand method for things like changing the working directory.
Call the doCommand to do the actual work of computation or job launching.
Call the postCommand method to undo the changes from the preCommand that need to be undone.
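The call order described for run can be illustrated with a stand-in class. `SketchJob` below is hypothetical and does not derive from BaseJob; it only mimics the sequence of preCommand, doCommand and postCommand, using a change of working directory as the thing that preCommand sets up and postCommand undoes.

```python
import os


class SketchJob:
    """Stand-in showing the preCommand -> doCommand -> postCommand order."""

    def __init__(self, command_dir=None):
        self.command_dir = command_dir
        self.calls = []
        self._orig_dir = None

    def preCommand(self):
        self.calls.append("preCommand")
        if self.command_dir is not None:
            # Remember where we started so postCommand can go back.
            self._orig_dir = os.getcwd()
            os.chdir(self.command_dir)

    def doCommand(self):
        # A real job would compute something or launch a job here.
        self.calls.append("doCommand")

    def postCommand(self):
        self.calls.append("postCommand")
        if self._orig_dir is not None:
            os.chdir(self._orig_dir)
            self._orig_dir = None

    def run(self):
        self.preCommand()
        self.doCommand()
        self.postCommand()
```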
- getCommandDir() str [source]¶
Return the launch/command directory name. If None is returned, the job will be launched in the current directory.
- preCommand()[source]¶
A method to make pre-command changes, like cd’ing to the correct directory to run the command in.
- setup()[source]¶
A method to do initial setup; executed after preCommand, just before doCommand.
- property state: schrodinger.job.queue.JobState¶
Return the current state of the job.
Note that this property can be overridden by subclasses that wish to provide for restartability at a higher level than unpickling BaseJob instances. For example, by examining some external condition (e.g. presence of output files), the state JobState.DONE could be returned immediately and the job would not run.
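A minimal sketch of that restartability idea follows. `OutputCheckingJob` is a hypothetical stand-in, not a BaseJob subclass, and it uses the plain string values of JobState ('waiting', 'done') so that it runs without the Schrodinger libraries.

```python
from pathlib import Path


class OutputCheckingJob:
    """Stand-in job whose state is derived from an output file."""

    def __init__(self, output_file: Path):
        self.output_file = output_file
        self._state = "waiting"  # mirrors the value of JobState.WAITING

    @property
    def state(self) -> str:
        # If the output already exists, report the job as done so that
        # a scheduler would skip it instead of running it again.
        if self.output_file.exists():
            return "done"  # mirrors the value of JobState.DONE
        return self._state
```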
- getStatusStrings() Tuple[str, str, str] [source]¶
Return a tuple of status strings for printing by JobDJ.
The strings returned are (status, jobid, host).
- getJobDJ() schrodinger.job.queue.JobDJ [source]¶
Return the JobDJ instance that this job has been added to.
- addPrereq(job: schrodinger.job.queue.BaseJob)[source]¶
Add a job that is an immediate prerequisite for this one.
- addGroupPrereq(job: schrodinger.job.queue.BaseJob)[source]¶
Make all jobs connected to job prerequisites of all jobs connected to this job.
- genAllPrereqs(seen=None) Generator[schrodinger.job.queue.BaseJob, None, None] [source]¶
A generator that yields all jobs that are prerequisites of this one.
- genAllJobs(seen: Optional[Set[schrodinger.job.queue.BaseJob]] = None) Generator[schrodinger.job.queue.BaseJob, None, None] [source]¶
A generator that yields all jobs connected to this one.
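The role of the seen argument in these generators can be illustrated with a small stand-in. `PrereqNode` is hypothetical and only tracks immediate prerequisites; the traversal shown is one plausible way to yield every direct and indirect prerequisite exactly once, not necessarily how the module does it.

```python
class PrereqNode:
    """Stand-in job that only tracks its immediate prerequisites."""

    def __init__(self, name):
        self.name = name
        self.prereqs = set()

    def addPrereq(self, job):
        self.prereqs.add(job)

    def genAllPrereqs(self, seen=None):
        # `seen` prevents a shared prerequisite from being yielded twice.
        if seen is None:
            seen = set()
        for job in self.prereqs:
            if job not in seen:
                seen.add(job)
                yield job
                # Recurse into the prerequisites of the prerequisite.
                yield from job.genAllPrereqs(seen)
```

The order in which jobs are yielded by this sketch is not defined, because the prerequisites are stored in a set.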
- addFinalizer(function: Callable[[schrodinger.job.queue.BaseJob], None], run_dir: Optional[str] = None)[source]¶
Add a function to be invoked when the job completes successfully.
See also the add_multi_job_finalizer function.
- class schrodinger.job.queue.SubprocessJob(command: List[str], command_dir: Optional[List[str]] = None, timeout: Optional[int] = None, stdout=-1, stderr=-1)[source]¶
Bases:
schrodinger.job.queue.BaseJob
A job for running an external process. By default, stdout and stderr are collected and made available as the ‘stdout’ and ‘stderr’ attributes when the job is completed.
- init_count = 0¶
- __init__(command: List[str], command_dir: Optional[List[str]] = None, timeout: Optional[int] = None, stdout=-1, stderr=-1)[source]¶
If stdout or stderr are expected to be large, you can pass an open file object instead of using PIPE.
- Parameters
command – The command to be run.
timeout – Timeout (in seconds) after which the subprocess will be killed. If None, the subprocess is allowed to run indefinitely.
stdout – The stdout argument to be passed to the subprocess Popen constructor.
stderr – The stderr argument to be passed to the subprocess Popen constructor.
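The default of -1 for stdout and stderr is the value of subprocess.PIPE. The standard-library sketch below shows the two options the note above describes, a pipe and an open file object, using Popen directly rather than SubprocessJob; the child command is an arbitrary placeholder.

```python
import subprocess
import sys
import tempfile

# The default of -1 in the signature is the value of subprocess.PIPE.
assert subprocess.PIPE == -1

command = [sys.executable, "-c", "print('hello')"]

# Collect output in memory through a pipe (fine for small outputs).
proc = subprocess.Popen(command, stdout=subprocess.PIPE,
                        stderr=subprocess.PIPE)
out, err = proc.communicate()
piped_text = out.decode().strip()

# Send output to an open file object instead (better for large outputs).
with tempfile.TemporaryFile() as log:
    subprocess.Popen(command, stdout=log, stderr=subprocess.STDOUT).wait()
    log.seek(0)
    file_text = log.read().decode().strip()
```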
- cancel()[source]¶
Send a termination request to the subprocess-managed job. This method is intended to eventually supersede SubprocessJob.kill.
- getDuration() Optional[int] [source]¶
Return the CPU time of the job in seconds.
If the job is still running, returns None.
- addFinalizer(function: Callable[[schrodinger.job.queue.BaseJob], None], run_dir: Optional[str] = None)¶
Add a function to be invoked when the job completes successfully.
See also the add_multi_job_finalizer function.
- addGroupPrereq(job: schrodinger.job.queue.BaseJob)¶
Make all jobs connected to job prerequisites of all jobs connected to this job.
- addPrereq(job: schrodinger.job.queue.BaseJob)¶
Add a job that is an immediate prerequisite for this one.
- finalize()¶
Clean up after a job successfully runs.
- genAllJobs(seen: Optional[Set[schrodinger.job.queue.BaseJob]] = None) Generator[schrodinger.job.queue.BaseJob, None, None] ¶
A generator that yields all jobs connected to this one.
- genAllPrereqs(seen=None) Generator[schrodinger.job.queue.BaseJob, None, None] ¶
A generator that yields all jobs that are prerequisites of this one.
- getCommand() List[str] ¶
Return the command used to run this job.
- getCommandDir() str ¶
Return the launch/command directory name. If None is returned, the job will be launched in the current directory.
- getJobDJ() schrodinger.job.queue.JobDJ ¶
Return the JobDJ instance that this job has been added to.
- getPrereqs()¶
Return a set of all immediate prerequisites for this job.
- getStatusStrings() Tuple[str, str, str] ¶
Return a tuple of status strings for printing by JobDJ.
The strings returned are (status, jobid, host).
- hasExited() bool ¶
Returns True if this job finished, successfully or not.
- hasStarted() bool ¶
Returns True if this job has started (not waiting)
- isComplete() bool ¶
Returns True if this job finished successfully
- maxFailuresReached(msg: str)¶
This method is called after the job has failed and the maximum number of failures per JobDJ run has been reached. After invoking this method, JobDJ will raise a RuntimeError and the process will exit.
- postCommand()¶
A method to restore things to the pre-command state.
- preCommand()¶
A method to make pre-command changes, like cd’ing to the correct directory to run the command in.
- run(*args, **kwargs)¶
Run the job.
- The steps taken are as follows:
Execute the preCommand method for things like changing the working directory.
Call the doCommand to do the actual work of computation or job launching.
Call the postCommand method to undo the changes from the preCommand that need to be undone.
- runsLocally() bool ¶
Return True if the job runs on the JobDJ control host, False if not. Jobs that run locally don’t need hosts.
There is no limit on the number of locally run jobs.
- setup()¶
A method to do initial setup; executed after preCommand, just before doCommand.
- property state: schrodinger.job.queue.JobState¶
Return the current state of the job.
Note that this property can be overridden by subclasses that wish to provide for restartability at a higher level than unpickling BaseJob instances. For example, by examining some external condition (e.g. presence of output files), the state JobState.DONE could be returned immediately and the job would not run.
- class schrodinger.job.queue.JobControlJob(command: List[str], command_dir: Optional[str] = None, name: Optional[str] = None, max_retries: Optional[int] = None, timeout: Optional[int] = None, launch_timeout: Optional[int] = None, launch_env_variables: Optional[Dict[str, str]] = None, resource_requirement: Optional[schrodinger.job.resource.ComputeRequirement] = None, **kwargs)[source]¶
Bases:
schrodinger.job.queue.BaseJob
This class defines a job control job to be run under JobDJ.
- __init__(command: List[str], command_dir: Optional[str] = None, name: Optional[str] = None, max_retries: Optional[int] = None, timeout: Optional[int] = None, launch_timeout: Optional[int] = None, launch_env_variables: Optional[Dict[str, str]] = None, resource_requirement: Optional[schrodinger.job.resource.ComputeRequirement] = None, **kwargs)[source]¶
Job constructor.
- Parameters
command – The command that runs the job.
command_dir – The directory from which to run the command.
name – The name of the job.
max_retries – Number of allowed retries for this job. If this is set, it is never overridden by the SCHRODINGER_MAX_RETRIES environment variable. If it is not set, the value of max_retries defined in JobDJ is used, and SCHRODINGER_MAX_RETRIES can be used to override this value at runtime. To prevent this job from being restarted altogether, set max_retries to zero.
timeout – Timeout (in seconds) after which the job will be killed. If None, the job is allowed to run indefinitely.
launch_timeout – Timeout (in seconds) for the job launch process to complete. If None, a default timeout will be used for jobserver and old jobcontrol jobs (see get_default_launch_timeout()) unless a value for the job timeout parameter is passed and is not greater than the default timeout.
launch_env_variables – A dictionary of environment variables to add when the jobcontrol job is launched. Each key is the name of a variable to set, and each value is the corresponding value. These are added to any environment variables already present, but removed after the job has been launched.
kwargs – Additional keyword arguments. Provided for consistency of interface in subclasses.
resource_requirement – Whether the job will require special compute resources, such as GPU.
- getJob() Optional[schrodinger.job.jobcontrol.Job] [source]¶
Return the job record as a schrodinger.job.jobcontrol.Job instance.
Returns None if the job hasn’t been launched.
- getDuration() Optional[int] [source]¶
Return the duration of the Job as recorded by job server. The duration does not include queue wait time.
If the job is running or has not launched, returns None.
Note that this method makes a blocking call to the job server.
- runsLocally() bool [source]¶
Return True if the job runs on the JobDJ control host, False if not. Jobs that run locally don’t need hosts.
There is no limit on the number of locally run jobs.
- update()[source]¶
Checks for changes in job status, and updates the object appropriately (marks for restart, etc).
- Raises
RuntimeError – if an unknown Job Status or ExitStatus is encountered.
- doCommand(host: str, local: bool = False)[source]¶
Launch job on specified host using jobcontrol.launch_job().
- Parameters
host – Host on which the job will be executed.
local – Removed in JOB_SERVER.
- cancelSubmitted() bool [source]¶
If the job is still in the ‘submitted’ state, cancel it, purge the job record and set the job handle to None.
Return True if this was successful, False otherwise.
- retryFailure(max_retries: int = 0) bool [source]¶
This method will be called when the job has failed, and JobDJ needs to know whether the job should be retried or not.
JobDJ’s value for the max_retries parameter is passed in, to be used when the job doesn’t have its own max_retries value.
Return True if this job should be retried, otherwise False.
- cancel()[source]¶
Send a kill request to the jobcontrol-managed job. This method is intended to eventually supersede JobControlJob.kill.
- maxFailuresReached(msg: str)[source]¶
Print an error summary, including the last 20 lines from each log file in the LogFiles list of the job record.
- getStatusStrings() Tuple[str, str, str] [source]¶
Return a tuple of status strings for printing by JobDJ.
The strings returned are (status, jobid, host).
- addFinalizer(function: Callable[[schrodinger.job.queue.BaseJob], None], run_dir: Optional[str] = None)¶
Add a function to be invoked when the job completes successfully.
See also the add_multi_job_finalizer function.
- addGroupPrereq(job: schrodinger.job.queue.BaseJob)¶
Make all jobs connected to job prerequisites of all jobs connected to this job.
- addPrereq(job: schrodinger.job.queue.BaseJob)¶
Add a job that is an immediate prerequisite for this one.
- finalize()¶
Clean up after a job successfully runs.
- genAllJobs(seen: Optional[Set[schrodinger.job.queue.BaseJob]] = None) Generator[schrodinger.job.queue.BaseJob, None, None] ¶
A generator that yields all jobs connected to this one.
- genAllPrereqs(seen=None) Generator[schrodinger.job.queue.BaseJob, None, None] ¶
A generator that yields all jobs that are prerequisites of this one.
- getCommand() List[str] ¶
Return the command used to run this job.
- getCommandDir() str ¶
Return the launch/command directory name. If None is returned, the job will be launched in the current directory.
- getJobDJ() schrodinger.job.queue.JobDJ ¶
Return the JobDJ instance that this job has been added to.
- getPrereqs()¶
Return a set of all immediate prerequisites for this job.
- hasExited() bool ¶
Returns True if this job finished, successfully or not.
- hasStarted() bool ¶
Returns True if this job has started (not waiting)
- init_count = 0¶
- isComplete() bool ¶
Returns True if this job finished successfully
- postCommand()¶
A method to restore things to the pre-command state.
- preCommand()¶
A method to make pre-command changes, like cd’ing to the correct directory to run the command in.
- run(*args, **kwargs)¶
Run the job.
- The steps taken are as follows:
Execute the preCommand method for things like changing the working directory.
Call the doCommand to do the actual work of computation or job launching.
Call the postCommand method to undo the changes from the preCommand that need to be undone.
- setup()¶
A method to do initial setup; executed after preCommand, just before doCommand.
- property state: schrodinger.job.queue.JobState¶
Return the current state of the job.
Note that this property can be overridden by subclasses that wish to provide for restartability at a higher level than unpickling BaseJob instances. For example, by examining some external condition (e.g. presence of output files), the state JobState.DONE could be returned immediately and the job would not run.
- class schrodinger.job.queue.LinkedListNode(value, prev=None)[source]¶
Bases:
object
A node for the LinkedList class, holding a value, and a reference to the previous and next node in the list.
- class schrodinger.job.queue.LinkedList[source]¶
Bases:
object
A doubly linked list, providing constant time addition, size, and truth checks. It provides for constant time removal if you have the node object in hand. It provides for linear time iteration without copying while allowing removals or additions to the list during iteration.
- reverse_iter()[source]¶
Iterate from tail to head over the list, yielding a (node, value) tuple for each element.
- remove(node: schrodinger.job.queue.LinkedListNode)[source]¶
Remove a node from the list.
- add(value: schrodinger.job.queue.LinkedListNode)[source]¶
Add a node to the list.
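The properties claimed for LinkedList (constant-time addition, size and truth checks, constant-time removal given the node, and iteration that tolerates removals) can be illustrated with a compact stand-in. `SketchNode` and `SketchLinkedList` are hypothetical names, and this is one possible design rather than the module's code.

```python
class SketchNode:
    """Holds a value plus links to the previous and next node."""

    def __init__(self, value, prev=None):
        self.value = value
        self.prev = prev
        self.next = None


class SketchLinkedList:
    """Doubly linked list with O(1) add, len, truth check and remove."""

    def __init__(self):
        self.head = None
        self.tail = None
        self.size = 0

    def __len__(self):
        # Also makes `if lst:` a constant-time truth check.
        return self.size

    def add(self, value):
        node = SketchNode(value, prev=self.tail)
        if self.tail is None:
            self.head = node
        else:
            self.tail.next = node
        self.tail = node
        self.size += 1
        return node

    def remove(self, node):
        # Relink the neighbours; the removed node keeps its own links,
        # so an iteration that is currently on it can still move on.
        if node.prev is None:
            self.head = node.next
        else:
            node.prev.next = node.next
        if node.next is None:
            self.tail = node.prev
        else:
            node.next.prev = node.prev
        self.size -= 1

    def reverse_iter(self):
        node = self.tail
        while node is not None:
            yield node, node.value
            node = node.prev
```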
- class schrodinger.job.queue.RunningJobs[source]¶
Bases:
schrodinger.job.queue.LinkedList
A LinkedList subclass that tracks running jobs and keeps a tally of jobs running on each machine.
- add(job: schrodinger.job.queue.BaseJob)[source]¶
Add a running job.
- remove(node: schrodinger.job.queue.LinkedListNode)[source]¶
Remove a linked list node.
- __len__()¶
- reverse_iter()¶
Iterate from tail to head over the list, yielding a (node, value) tuple for each element.
- schrodinger.job.queue.add_multi_job_finalizer(function: Callable[[schrodinger.job.queue.BaseJob], None], jobs: List[schrodinger.job.queue.BaseJob], run_dir: Optional[str] = None)[source]¶
Create a finalizer function that will be called when all jobs in the jobs iterator are complete.
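One way to obtain this behavior is a per-job finalizer that counts completions and calls the wrapped function only after the last job in the group. The sketch below is an assumption about the general technique, not the module's implementation; `make_multi_job_finalizer` is a hypothetical name, and passing the last completed job to the function is a choice made here for illustration.

```python
def make_multi_job_finalizer(function, jobs):
    """Return a per-job finalizer that fires `function` once, at the end."""
    remaining = set(jobs)

    def per_job_finalizer(job):
        # Called once for each job as it completes.
        remaining.discard(job)
        if not remaining:
            # Every job in the group has completed.
            function(job)

    return per_job_finalizer
```

With real jobs, the returned callable would be registered on each job in the group, for example through each job's addFinalizer method.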
- class schrodinger.job.queue.PriorityQueue[source]¶
Bases:
object
This is a general priority queue.
- class schrodinger.job.queue.JobDJ(hosts: Optional[List[Tuple[str, int]]] = None, local: bool = False, max_retries: Optional[int] = None, default_max_retries: int = 0, max_failures: Optional[int] = None, verbosity: str = 'quiet', job_class: schrodinger.job.queue.BaseJob = <class 'schrodinger.job.queue.JobControlJob'>, update_delay: Optional[int] = None)[source]¶
Bases:
object
Class for running commands/jobs in parallel under jobcontrol.
Create an instance of this class, add commands to run with .addJob(), and then call run().
- __init__(hosts: Optional[List[Tuple[str, int]]] = None, local: bool = False, max_retries: Optional[int] = None, default_max_retries: int = 0, max_failures: Optional[int] = None, verbosity: str = 'quiet', job_class: schrodinger.job.queue.BaseJob = <class 'schrodinger.job.queue.JobControlJob'>, update_delay: Optional[int] = None)[source]¶
Constructor.
- Parameters
hosts – A list of (<hostname>, <maximum_concurrent_subjobs>) tuples, where <hostname> is a string and <maximum_concurrent_subjobs> is an integer. With the default value of None, the host list is determined automatically, which is usually the desired behavior.
local – No longer functional in JOB_SERVER.
max_retries – Number of allowed retries per subjob. If this is set, it is never overridden by the SCHRODINGER_MAX_RETRIES environment variable. If it is not set, the value in default_max_retries is used, and SCHRODINGER_MAX_RETRIES is allowed to override. If you wish to disable restarting altogether, set this value to zero.
default_max_retries – Number of allowed retries per subjob. This value can always be overridden by the SCHRODINGER_MAX_RETRIES environment variable. Default is zero.
max_failures – Total number of allowed subjob failures before JobDJ exits. If it is not defined, a default of zero will be used (exit on any failure after attempting to restart), but this can be overridden with the SCHRODINGER_MAX_FAILURES environment variable. To allow an unlimited number of subjob failures, set max_failures to the module level NOLIMIT constant.
verbosity – There are three allowed verbosity levels: “quiet” - only warnings and errors are printed; “normal” - JobDJ progress is printed; and “verbose” - additional debugging info is printed. Default is “quiet”.
job_class – The class to use as the default job constructor when the addJob argument is not a BaseJob instance.
update_delay – The number of seconds to wait between job control database reads for JobControlJob jobs. (This delay is for an individual job, not for any job database read.) Default is None, which causes the module level constant UPDATE_DELAY to be used.
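The precedence among max_retries, default_max_retries and the SCHRODINGER_MAX_RETRIES environment variable can be written out as a small function. This is a sketch of the rules as described in the parameter list, under the assumption that the environment variable holds a plain integer; `effective_max_retries` is a hypothetical name and not part of the module.

```python
import os
from typing import Mapping, Optional


def effective_max_retries(max_retries: Optional[int],
                          default_max_retries: int = 0,
                          environ: Optional[Mapping[str, str]] = None) -> int:
    """Resolve the retry limit following the documented precedence."""
    if environ is None:
        environ = os.environ
    # An explicit max_retries always wins, including an explicit zero.
    if max_retries is not None:
        return max_retries
    # Otherwise the environment variable may override the default.
    env_value = environ.get("SCHRODINGER_MAX_RETRIES")
    if env_value is not None:
        return int(env_value)  # assumption: the value is a plain integer
    return default_max_retries
```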
- markForRestart(job: schrodinger.job.queue.BaseJob, action: str)[source]¶
Mark a job as dead, but make sure that it gets restarted.
- Parameters
action – Describes the reason the job is being restarted.
- property waiting_jobs: List[schrodinger.job.queue.BaseJob]¶
Jobs waiting to be started.
- property done_jobs: List[schrodinger.job.queue.BaseJob]¶
Successfully completed jobs, sorted into the order they were marked as completed by JobDJ.
- property active_jobs: List[schrodinger.job.queue.BaseJob]¶
- property failed_jobs: List[schrodinger.job.queue.BaseJob]¶
- property all_jobs: List[schrodinger.job.queue.BaseJob]¶
- property total_added: int¶
The number of individual jobs that have been added to the JobDJ instance.
- property total_active: int¶
The number of jobs currently running.
- property total_finished: int¶
The number of jobs that have finished successfully.
- property total_failed: int¶
The number of jobs that have failed.
- addJob(job: Union[schrodinger.job.queue.BaseJob, List], add_connected: bool = True, **kwargs)[source]¶
Add a job to run. If job is not a BaseJob instance, a BaseJob instance is constructed with job as the first argument. The default BaseJob class for the JobDJ instance can be specified in the constructor for JobDJ.
Additional keyword arguments are passed on to the job constructor.
All job prerequisites and dependencies need to be specified before adding a job to JobDJ.
- Parameters
add_connected – If True, for jobs with dependencies only one job per connected group should be added and all connected jobs will be discovered and added automatically. If False, it is the user’s responsibility to make sure that any prerequisites of a job are also added.
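The requirement to declare prerequisites before adding a job, combined with the add_connected default, suggests a pattern like the following. `add_chain` is a hypothetical helper; it uses only addPrereq and addJob, and expects job objects such as JobControlJob instances together with a JobDJ instance.

```python
from typing import Sequence


def add_chain(job_dj, jobs: Sequence) -> None:
    """Make each job a prerequisite of the next, then add the chain."""
    if not jobs:
        return
    # Declare all prerequisites first, as required before adding.
    for earlier, later in zip(jobs, jobs[1:]):
        later.addPrereq(earlier)
    # With add_connected=True (the default), adding one job from the
    # connected group is enough; the others are discovered automatically.
    job_dj.addJob(jobs[-1])
```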
- setSmartDistribution(state: bool)[source]¶
Set smart distribution of jobs.
- Parameters
state (bool) – Whether to enable smart distribution
- disableSmartDistribution()[source]¶
Disable smart distribution of jobs.
Smart distribution allows subjobs to run on the machine that JobDJ is running on when JobDJ itself is running under a queuing system. This is usually desirable since the JobDJ process doesn’t generally consume significant computational resources and you don’t want to leave a queue slot mostly idle.
- getActiveProcCounts() Dict[str, int] [source]¶
Return a dictionary containing the number of active jobs on each host.
- setHostList(host_list: List[Tuple[str, int]])[source]¶
Define compute hosts to run subjobs on.
Active jobs are not affected by a change in the host list.
- Parameters
host_list – A list of (<host_entry_name>, <maximum_concurrent_subjobs>) tuples, where <host_entry_name> is a string and <maximum_concurrent_subjobs> is an integer.
- printStatus(job: Optional[schrodinger.job.queue.BaseJob] = None, action: Optional[str] = None)[source]¶
Prints the status of JobDJ and the action/status for the job.
If no job is specified, prints the status header.
If no action is specified, the status_string attribute of the job is used.
- run(*, status_change_callback: Optional[Callable[[schrodinger.job.queue.BaseJob], None]] = None, periodic_callback: Optional[Callable[[schrodinger.job.queue.BaseJob], None]] = None, callback_interval: int = 300, restart_failed: bool = True)[source]¶
Call this method to run all jobs that have been added. The method will return control when all jobs have completed.
- Parameters
status_change_callback – A function to call every time a job status changes, for example JobState.ACTIVE -> JobState.DONE. This function takes a single argument, a schrodinger.job.queue.BaseJob object.
periodic_callback – A function to call periodically, regardless of whether job status has changed or not. The function will be called without any arguments.
callback_interval – The interval at which the periodic callback will be called. This time is only approximately enforced and will depend on the timing delay settings (e.g. MONITOR_DELAY).
restart_failed – True (default) if previously failed jobs should be restarted, False if not.
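A status-change callback can be used to keep a simple history of state transitions. The helper below is a hypothetical sketch: `run_and_record` is not part of the module, and it uses only the keyword-only run arguments listed above and the state property of the job passed to the callback.

```python
from typing import List, Tuple


def run_and_record(job_dj, restart_failed: bool = True) -> List[Tuple[object, object]]:
    """Run all jobs and return a list of (job, state) change records."""
    history = []

    def on_status_change(job):
        # Called with the job whose status has just changed.
        history.append((job, job.state))

    # Both arguments must be passed by keyword.
    job_dj.run(status_change_callback=on_status_change,
               restart_failed=restart_failed)
    return history
```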