schrodinger.job.queue module

Provides a JobDJ class for running multiple concurrent jobs. Jobs can have dependencies and JobDJ can avoid starting a job until its prerequisites are met.

Step by step instructions for basic use:

  1. Create a JobDJ instance. For example:

    job_dj = queue.JobDJ()
    
  2. Add jobs to the JobDJ instance by calling the JobDJ.addJob method. For example:

    job_dj.addJob(["jaguar", "input1.inp"])
    
  3. Run all jobs with the JobDJ.run method. This is simple:

    job_dj.run()
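
A fuller sketch combining these steps (the host tuple and input files are illustrative):

    from schrodinger.job import queue

    job_dj = queue.JobDJ(hosts=[("localhost", 2)], verbosity="normal")
    job_dj.addJob(["jaguar", "input1.inp"])
    job_dj.addJob(["jaguar", "input2.inp"])
    job_dj.run()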
    

Copyright Schrodinger, LLC. All rights reserved.

class schrodinger.job.queue.PrintableStatus(value)

Bases: schrodinger.StrEnum

An enumeration.

FAILED_TO_LAUNCH = 'failed to launch'
LAUNCHED = 'launched'
RESTARTED = 'restarted'
RESTARTING = 'restarting'
STARTED = 'started'
class schrodinger.job.queue.CancelSubmittedStatus(value)

Bases: enum.Enum

An enumeration.

SUCCESS = 1
FAILED_TO_ACQUIRE_LICENSE = 2
FAILED = 3
class schrodinger.job.queue.JobState(value)

Bases: schrodinger.StrEnum

An enumeration.

WAITING = 'waiting'
LAUNCHING = 'launching'
ACTIVE = 'active'
FAILED = 'failed'
FAILED_RETRYABLE = 'failed but retryable'
DONE = 'done'
exception schrodinger.job.queue.MissingResourceError(requirement: Union[None, str, schrodinger.job.resource.ComputeType], available_resources: List[schrodinger.job.queue._ComputeResource])

Bases: RuntimeError

template = "A job requires a '{}' type of compute resource not provided by any hosts in JobDJ: {}"
__init__(requirement: Union[None, str, schrodinger.job.resource.ComputeType], available_resources: List[schrodinger.job.queue._ComputeResource])

Create a MissingResourceError instance

Parameters
  • requirement – The requested resource, if any

  • available_resources – The list of available hosts that failed to meet the resource request

args
with_traceback()

Exception.with_traceback(tb) – set self.__traceback__ to tb and return self.

exception schrodinger.job.queue.HostTypeError

Bases: TypeError

Raised if hosts are provided to JobDJ incorrectly

__init__(*args, **kwargs)
args
with_traceback()

Exception.with_traceback(tb) – set self.__traceback__ to tb and return self.

exception schrodinger.job.queue.MaxJobFailureError

Bases: RuntimeError

msg = 'Maximum number of failed jobs has been reached. All other subjobs will be killed and program will stop.'
__init__()
args
with_traceback()

Exception.with_traceback(tb) – set self.__traceback__ to tb and return self.

schrodinger.job.queue.get_default_launch_timeout() int

Get the default launch timeout in seconds.

schrodinger.job.queue.get_update_delay() int

Return the delay to use for jobdb reads in seconds.

schrodinger.job.queue.backup_file(log_file: pathlib.Path, copy: bool)

Backs up the file log_file by copying or renaming it to .1, .2, etc.

Parameters
  • log_file – name of file to backup

  • copy – If True, copy file to backup location, otherwise rename

schrodinger.job.queue.get_command(base_command: List[str], host: str = 'localhost', procs: int = 1, local: bool = False) List[str]

Given a base command and additional launching specifications, return a modified command that is ready to pass to jobcontrol.launch_job.
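
For example (host name and processor count are illustrative; the exact modifications made to the command depend on the launch settings):

    cmd = queue.get_command(["jaguar", "input1.inp"], host="compute-host", procs=4)
    # cmd is now in a form suitable for jobcontrol.launch_job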

class schrodinger.job.queue.BaseJob(command_dir: Optional[pathlib.Path] = None, resource_requirement: Optional[Union[str, schrodinger.job.resource.ComputeType]] = None)

Bases: object

A base job class for jobs run under JobDJ.

The main methods to be implemented in subclasses are:

  1. doCommand - The method that does the real work of the job, either running a simple local calculation or submitting a job to job control.

  2. update - A method called periodically while a job is running to update its current state.

  3. _getState - The get method used in the state property, used by JobDJ to determine the job’s current state.

A few additional methods only need to be implemented in special situations:

  1. finalize - If you want custom behavior in your finalize method, override this method.

  2. cancelSubmitted - If the job can run under a queue, implementing this method allows jobs that are waiting in the submitted state to be restarted immediately on a newly available non-queue host.

  3. getStatusStrings - If you want to use the JobDJ print summary, this method should be updated to provide more useful information.

The execution point for all jobs is in the BaseJob.run method. That method calls preCommand, doCommand and postCommand in order.

For jobs that are run locally, all main computation should be done in the doCommand method. Note that the doCommand method blocks until completion, so no additional work (e.g. job updates or submissions) will be done until it returns. For this reason, only short jobs should be run locally without job control.
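
A skeletal subclass sketch (the class and its bookkeeping attribute are hypothetical; it only illustrates which methods the list above refers to):

    class ToyLocalJob(queue.BaseJob):
        """Toy job that does its work directly in doCommand."""

        def runsLocally(self):
            # All work happens in-process, so no compute host is needed.
            return True

        def doCommand(self, *args, **kwargs):
            # Keep local work short -- doCommand blocks until it returns.
            self._finished = True

        def update(self):
            # Nothing asynchronous to poll for a purely local job.
            pass

        def _getState(self):
            if getattr(self, "_finished", False):
                return queue.JobState.DONE
            return queue.JobState.WAITING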

init_count = 0
__init__(command_dir: Optional[pathlib.Path] = None, resource_requirement: Optional[Union[str, schrodinger.job.resource.ComputeType]] = None)
Parameters
  • command_dir – The directory from which to run the command.

  • resource_requirement – Whether the job will require a special compute resource, such as GPU.

runsLocally() bool

Return True if the job runs on the JobDJ control host, False if not. Jobs that run locally don’t need hosts.

There is no limit on the number of locally run jobs.

update()

Update the current job status, stored in the state property.

When a job is running, this method will be called periodically by JobDJ until the job state property is JobState.DONE.

maxFailuresReached(msg: str)

This method will be called after the job has failed and the maximum number of failures per JobDJ run has been reached. After invoking this method, JobDJ will raise a RuntimeError and the process will exit.

finalize()

Clean up after a job successfully runs.

doCommand(*args, **kwargs)

Execute the command associated with this job.

run(*args, **kwargs)

Run the job.

The steps taken are as follows:
  1. Execute the preCommand method for things like changing the working directory.

  2. Call the doCommand to do the actual work of computation or job launching.

  3. Call the postCommand method to undo the changes from the preCommand that need to be undone.

getCommandDir() str

Return the launch/command directory name. If None is returned, the job will be launched in the current directory.

getCommand() List[str]

Return the command used to run this job.

preCommand()

A method to make pre-command changes, like cd’ing to the correct directory to run the command in.

setup()

A method to do initial setup; executed after preCommand, just before doCommand.

postCommand()

A method to restore things to the pre-command state.

property state: schrodinger.job.queue.JobState

Return the current state of the job.

Note that this method can be overridden by subclasses that wish to provide for restartability at a higher level than unpickling BaseJob instances. For example, by examining some external condition (e.g. presence of output files) the state JobState.DONE could be returned immediately and the job would not run.

hasExited() bool

Returns True if this job finished, successfully or not.

isComplete() bool

Returns True if this job finished successfully

hasStarted() bool

Returns True if this job has started (not waiting)

getStatusStrings() Tuple[str, str, str]

Return a tuple of status strings for printing by JobDJ.

The strings returned are (status, jobid, host).

getJobDJ() schrodinger.job.queue.JobDJ

Return the JobDJ instance that this job has been added to.

addPrereq(job: schrodinger.job.queue.BaseJob)

Add a job that is an immediate prerequisite for this one.
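
For example (job_a and job_b are hypothetical jobs already created by the caller):

    job_b.addPrereq(job_a)  # job_a must complete before job_b is started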

addGroupPrereq(job: schrodinger.job.queue.BaseJob)

Make all jobs connected to job prerequisites of all jobs connected to this Job.

getPrereqs()

Return a set of all immediate prerequisites for this job.

genAllPrereqs(seen=None) Generator[schrodinger.job.queue.BaseJob, None, None]

A generator that yields all jobs that are prerequisites of this one.

genAllJobs(seen: Optional[Set[schrodinger.job.queue.BaseJob]] = None) Generator[schrodinger.job.queue.BaseJob, None, None]

A generator that yields all jobs connected to this one.

addFinalizer(function: Callable[[schrodinger.job.queue.BaseJob], None], run_dir: Optional[str] = None)

Add a function to be invoked when the job completes successfully.

See also the add_multi_job_finalizer function.
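
For example (the finalizer function is hypothetical; it receives the finished job):

    def report_done(job):
        # Invoked only after the job completes successfully.
        print("finished:", job.getCommand())

    job.addFinalizer(report_done)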

class schrodinger.job.queue.SubprocessJob(command: List[str], command_dir: Optional[str] = None, timeout: Optional[int] = None, stdout=- 1, stderr=- 1)

Bases: schrodinger.job.queue.BaseJob

A job for running an external process. By default, stdout and stderr are collected and made available as the ‘stdout’ and ‘stderr’ attributes when the job is completed.

init_count = 0
__init__(command: List[str], command_dir: Optional[str] = None, timeout: Optional[int] = None, stdout=- 1, stderr=- 1)

If stdout or stderr are expected to be large, you can pass an open file object instead of using PIPE.

Parameters
  • command – The command to be run.

  • timeout – Timeout (in seconds) after which the subprocess will be killed. If None, the subprocess is allowed to run indefinitely.

  • stdout – The stdout argument to be passed to the subprocess Popen constructor.

  • stderr – The stderr argument to be passed to the subprocess Popen constructor.
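
A minimal usage sketch (the command is illustrative); by default stdout and stderr are captured and exposed as attributes once the job completes:

    sp_job = queue.SubprocessJob(["echo", "hello"], timeout=60)
    job_dj = queue.JobDJ()
    job_dj.addJob(sp_job)
    job_dj.run()
    print(sp_job.stdout)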

update()

Update the current job status and set state.

doCommand(*args, **kwargs)

Execute the command associated with this job via subprocess.

kill()

Send termination request to subprocess managed job.

cancel()

Send termination request to subprocess managed job. This method will eventually deprecate SubprocessJob.kill

getDuration() Optional[int]

Return the CPU time of the job in seconds.

If the job is still running, returns None.

addFinalizer(function: Callable[[schrodinger.job.queue.BaseJob], None], run_dir: Optional[str] = None)

Add a function to be invoked when the job completes successfully.

See also the add_multi_job_finalizer function.

addGroupPrereq(job: schrodinger.job.queue.BaseJob)

Make all jobs connected to job prerequisites of all jobs connected to this Job.

addPrereq(job: schrodinger.job.queue.BaseJob)

Add a job that is an immediate prerequisite for this one.

finalize()

Clean up after a job successfully runs.

genAllJobs(seen: Optional[Set[schrodinger.job.queue.BaseJob]] = None) Generator[schrodinger.job.queue.BaseJob, None, None]

A generator that yields all jobs connected to this one.

genAllPrereqs(seen=None) Generator[schrodinger.job.queue.BaseJob, None, None]

A generator that yields all jobs that are prerequisites of this one.

getCommand() List[str]

Return the command used to run this job.

getCommandDir() str

Return the launch/command directory name. If None is returned, the job will be launched in the current directory.

getJobDJ() schrodinger.job.queue.JobDJ

Return the JobDJ instance that this job has been added to.

getPrereqs()

Return a set of all immediate prerequisites for this job.

getStatusStrings() Tuple[str, str, str]

Return a tuple of status strings for printing by JobDJ.

The strings returned are (status, jobid, host).

hasExited() bool

Returns True if this job finished, successfully or not.

hasStarted() bool

Returns True if this job has started (not waiting)

isComplete() bool

Returns True if this job finished successfully

maxFailuresReached(msg: str)

This method will be called after the job has failed and the maximum number of failures per JobDJ run has been reached. After invoking this method, JobDJ will raise a RuntimeError and the process will exit.

postCommand()

A method to restore things to the pre-command state.

preCommand()

A method to make pre-command changes, like cd’ing to the correct directory to run the command in.

run(*args, **kwargs)

Run the job.

The steps taken are as follows:
  1. Execute the preCommand method for things like changing the working directory.

  2. Call the doCommand to do the actual work of computation or job launching.

  3. Call the postCommand method to undo the changes from the preCommand that need to be undone.

runsLocally() bool

Return True if the job runs on the JobDJ control host, False if not. Jobs that run locally don’t need hosts.

There is no limit on the number of locally run jobs.

setup()

A method to do initial setup; executed after preCommand, just before doCommand.

property state: schrodinger.job.queue.JobState

Return the current state of the job.

Note that this method can be overridden by subclasses that wish to provide for restartability at a higher level than unpickling BaseJob instances. For example, by examining some external condition (e.g. presence of output files) the state JobState.DONE could be returned immediately and the job would not run.

class schrodinger.job.queue.JobControlJob(command: List[str], command_dir: Optional[str] = None, name: Optional[str] = None, max_retries: Optional[int] = None, timeout: Optional[int] = None, launch_timeout: Optional[int] = None, launch_env_variables: Optional[Dict[str, str]] = None, resource_requirement: Optional[Union[str, schrodinger.job.resource.ComputeType]] = None, license_requirement: Optional[List[str]] = None, smart_dist_eligible: Optional[bool] = True, **kwargs)

Bases: schrodinger.job.queue.BaseJob

This class defines a job control job to be run under JobDJ.

__init__(command: List[str], command_dir: Optional[str] = None, name: Optional[str] = None, max_retries: Optional[int] = None, timeout: Optional[int] = None, launch_timeout: Optional[int] = None, launch_env_variables: Optional[Dict[str, str]] = None, resource_requirement: Optional[Union[str, schrodinger.job.resource.ComputeType]] = None, license_requirement: Optional[List[str]] = None, smart_dist_eligible: Optional[bool] = True, **kwargs)

Job constructor.

Parameters
  • command – The command that runs the job.

  • command_dir – The directory from which to run the command.

  • name – The name of the job.

  • max_retries – Number of allowed retries for this job. If this is set, it is never overridden by the SCHRODINGER_MAX_RETRIES environment variable. If it is not set, the value of max_retries defined in JobDJ is used, and SCHRODINGER_MAX_RETRIES can be used to override this value at runtime. To prevent this job from being restarted altogether, set max_retries to zero.

  • timeout – Timeout (in seconds) after which the job will be killed. If None, the job is allowed to run indefinitely.

  • launch_timeout – Timeout (in seconds) for the job launch process to complete. If None, a default timeout will be used for job server and old jobcontrol jobs (see get_default_launch_timeout()) unless a value for the timeout parameter is passed and it is not greater than the default timeout.

  • launch_env_variables – A dictionary with the environment variables to add when the jobcontrol job is launched. The name of any additional variables to set should be in the keyword of the dict and the value should be the corresponding value. These will be added to any environment variables already present, but removed after the job has been launched.

  • kwargs – Additional keyword arguments. Provided for consistency of interface in subclasses.

  • resource_requirement – Whether the job will require special compute resources, such as GPU.

  • license_requirement – List of license tokens required for the job to be used for license checking when SMART_LICENSE_CHECK feature flag is turned on. This is useful for license checking the first job of the smart distribution launched directly to the localhost without canceling from the queue. The license requirements are not known until the job is launched. Each license token is in the form ‘TOKEN’ or ‘TOKEN:n’ where TOKEN is the name of the license, and n is the number of tokens.

  • smart_dist_eligible – Whether this job can be submitted via smart distribution (True) or not (False). This setting only comes into play if all other requirements (such as the resource_requirement, license requirement, number of processors, and smart distribution being turned on) are met. In other words, setting it to True will not force the job to run via smart distribution, but setting it to False will ensure that it does not.
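
A construction sketch with retry and timeout options (the command, name, and values are illustrative):

    jc_job = queue.JobControlJob(
        ["jaguar", "input1.inp"],
        name="jaguar-subjob-1",
        max_retries=2,    # retried at most twice before counting as failed
        timeout=3600,     # killed if it runs longer than an hour
    )
    job_dj.addJob(jc_job)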

addLaunchEnv(key: str, val: str)

Adds the given environment key and value to the launch environment.

Parameters
  • key – environment key to add to the launch environment.

  • val – environment value associated with the key to add to the launch environment.

getJob() Optional[schrodinger.job.jobcontrol.Job]

Return the job record as a schrodinger.job.jobcontrol.Job instance.

Returns None if the job hasn’t been launched.

getDuration() Optional[int]

Return the duration of the Job as recorded by job server. The duration does not include queue wait time.

If the job is running or has not launched, returns None.

Note that this method makes a blocking call to the job server.

runsLocally() bool

Return True if the job runs on the JobDJ control host, False if not. Jobs that run locally don’t need hosts.

There is no limit on the number of locally run jobs.

usesJobServer() bool

Detect, by looking at the jobId, whether this job uses a job server.

update()

Checks for changes in job status, and updates the object appropriately (marks for restart, etc).

Raises

RuntimeError – if an unknown Job Status or ExitStatus is encountered.

doCommand(host: str, local: bool = False)

Launch job on specified host using jobcontrol.launch_job().

Parameters
  • host – Host on which the job will be executed.

  • local – Removed in JOB_SERVER.

acquireLicenseForSmartDistribution() bool

Acquire and hold licenses for a smart distribution job. This makes sure the job won’t fail due to unavailable licenses.

Returns True if the licenses registered for the job are acquired, and False if they are not. If no licenses are registered, it always returns True to avoid preventing jobs from using the smart distribution feature. For legacy jobcontrol, the license check is not performed, and True is always returned. We want to use this feature as a pitch to move users to JOB_SERVER.

cancelSubmitted(do_license_check: bool = False) schrodinger.job.queue.CancelSubmittedStatus

If the job is still in the ‘submitted’ state, cancel it, purge the jobrecord and set the job handle to None. This tries to acquire licenses for the job before canceling from the queue if do_license_check is turned on.

Parameters

do_license_check – Acquire licenses for the job before canceling from the queue.

Returns one of the CancelSubmittedStatus values.

retryFailure(max_retries: int = 0) bool

This method will be called when the job has failed, and JobDJ needs to know whether the job should be retried or not.

JobDJ’s value for the max_retries parameter is passed in, to be used when the job doesn’t have its own max_retries value.

Return True if this job should be retried, otherwise False.

kill()

Send kill request to jobcontrol managed job

cancel()

Send kill request to jobcontrol managed job. This method will eventually deprecate JobControlJob.kill

maxFailuresReached(msg: str)

Print an error summary, including the last 20 lines from each log file in the LogFiles list of the job record.

getStatusStrings() Tuple[str, str, str]

Return a tuple of status strings for printing by JobDJ.

The strings returned are (status, jobid, host).

addFinalizer(function: Callable[[schrodinger.job.queue.BaseJob], None], run_dir: Optional[str] = None)

Add a function to be invoked when the job completes successfully.

See also the add_multi_job_finalizer function.

addGroupPrereq(job: schrodinger.job.queue.BaseJob)

Make all jobs connected to job prerequisites of all jobs connected to this Job.

addPrereq(job: schrodinger.job.queue.BaseJob)

Add a job that is an immediate prerequisite for this one.

finalize()

Clean up after a job successfully runs.

genAllJobs(seen: Optional[Set[schrodinger.job.queue.BaseJob]] = None) Generator[schrodinger.job.queue.BaseJob, None, None]

A generator that yields all jobs connected to this one.

genAllPrereqs(seen=None) Generator[schrodinger.job.queue.BaseJob, None, None]

A generator that yields all jobs that are prerequisites on this one.

getCommand() List[str]

Return the command used to run this job.

getCommandDir() str

Return the launch/command directory name. If None is returned, the job will be launched in the current directory.

getJobDJ() schrodinger.job.queue.JobDJ

Return the JobDJ instance that this job has been added to.

getPrereqs()

Return a set of all immediate prerequisites for this job.

hasExited() bool

Returns True if this job finished, successfully or not.

hasStarted() bool

Returns True if this job has started (not waiting)

init_count = 0
isComplete() bool

Returns True if this job finished successfully

postCommand()

A method to restore things to the pre-command state.

preCommand()

A method to make pre-command changes, like cd’ing to the correct directory to run the command in.

run(*args, **kwargs)

Run the job.

The steps taken are as follows:
  1. Execute the preCommand method for things like changing the working directory.

  2. Call the doCommand to do the actual work of computation or job launching.

  3. Call the postCommand method to undo the changes from the preCommand that need to be undone.

setup()

A method to do initial setup; executed after preCommand, just before doCommand.

property state: schrodinger.job.queue.JobState

Return the current state of the job.

Note that this method can be overridden by subclasses that wish to provide for restartability at a higher level than unpickling BaseJob instances. For example, by examining some external condition (e.g. presence of output files) the state JobState.DONE could be returned immediately and the job would not run.

class schrodinger.job.queue.LinkedListNode(value, prev=None)

Bases: object

A node for the LinkedList class, holding a value, and a reference to the previous and next node in the list.

__init__(value, prev=None)
class schrodinger.job.queue.LinkedList

Bases: object

A doubly linked list, providing constant time addition, size, and truth checks. It provides for constant time removal if you have the node object in hand. It provides for linear time iteration without copying while allowing removals or additions to the list during iteration.

__init__()
__len__()
reverse_iter()

Iterate from tail to head over the list, yielding a (node, value) tuple for each element.

remove(node: schrodinger.job.queue.LinkedListNode)

Remove a node from the list.

add(value: schrodinger.job.queue.BaseJob)

Add a node to the list.

class schrodinger.job.queue.RunningJobs

Bases: schrodinger.job.queue.LinkedList

A LinkedList subclass that tracks running jobs and keeps a tally of jobs running on each machine.

__init__()
add(job: schrodinger.job.queue.BaseJob)

Add a running job.

remove(node: schrodinger.job.queue.LinkedListNode)

Remove a linked list node.

jobsCount() Dict[str, schrodinger.job.queue._RunningJobHost]

Return a dict telling how many jobs are running on each host.

__len__()
reverse_iter()

Iterate from tail to head over the list, yielding a (node, value) tuple for each element.

schrodinger.job.queue.add_multi_job_finalizer(function: Callable[[schrodinger.job.queue.BaseJob], None], jobs: List[schrodinger.job.queue.BaseJob], run_dir: Optional[str] = None)

Create a finalizer function that will be called when all jobs in the jobs iterator are complete.
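
For example (the function name and jobs are hypothetical); the finalizer is invoked once, after all of the listed jobs are complete:

    def group_done(job):
        # Receives a BaseJob argument when the whole group has completed.
        print("group finished")

    queue.add_multi_job_finalizer(group_done, [job_a, job_b, job_c])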

class schrodinger.job.queue.PriorityQueue

Bases: object

This is a general priority queue.

__init__()
__len__()
push(item)

Add an item to the heap. This item must have a __lt__ method as per the heapq module requirement.

pop()

Get the highest priority item, removing it from the heap.

remove(item)

Remove any copies of item from the heap.

class schrodinger.job.queue.ResourceHost(name: str, procs: int, resource: Optional[str] = None)

Bases: object

Tracks the allowed number of processors for each resource on a given host

Use a ResourceHost object to specify a host for JobDJ that has processors reserved for jobs that request a specific resource. For instance, one could reserve some hosts to run only CPU jobs and some to run only GPU jobs. Or one could specify that a single host can run X CPU jobs and Y GPU jobs. Or that some hosts can run parallel jobs and some can only run serial jobs.

To reserve processors on a host for jobs that request a specific resource, pass that resource into the __init__ method or the addProcs method of this class. Pass the created ResourceHost instances into JobDJ via the hosts argument to the JobDJ constructor or the setHostList method. Then pass that same resource into the JobControlJob class as the resource_requirement.

A “resource” can be any custom string, and JobDJ will match jobs with a resource requirement and hosts that have the same resource.

Example code snippets::

    CPU = 'cpu'
    GPU = 'gpu'
    cpu_host = queue.ResourceHost(cpu_hostname, cpu_procs, resource=CPU)
    gpu_host = queue.ResourceHost(gpu_hostname, gpu_procs, resource=GPU)
    hosts = [cpu_host, gpu_host]
    jobq = queue.JobDJ(hosts=hosts, smart_dist_resources=[CPU])
    gpujob = queue.JobControlJob(cmd, resource_requirement=GPU)

Note - jobs can also have a resource_requirement that is a special schrodinger.job.resource.ComputeType enum for CPU and GPU. In that case JobDJ will use host file information and GPU detection to determine automatically which machines can run CPU or GPU jobs, and the ResourceHost class should not be used by calling code to define CPU and GPU hosts. While more convenient, that method has limitations: it cannot differentiate hosts with fast GPUs from hosts with slow GPUs, and it cannot define different processor limits for CPU and GPU on the same host (-cpu_host a_host:8 -gpu_host a_host:2 will not work properly, for instance). It also has issues with smart distribution: there is no way to control whether CPU or GPU jobs run on localhost, and processor limits on other hosts cannot be properly decremented to account for the type of the running smart distribution job.

__init__(name: str, procs: int, resource: Optional[str] = None)

Create a ResourceHost instance

Parameters
  • name – The name of the host as found in the .hosts file

  • procs – The number of processors this host has for the given resource

  • resource – A resource this host provides. None means jobs with no resource requirement can use these processors. Jobs can only request one type of resource although hosts can provide more than one type of resource. To add additional provided resource types for this host use the addProcessors method.

getManagedResource(resource: Union[str, schrodinger.job.resource.ComputeType]) Optional[str]

Translate the given resource name into a resource name this class manages. This is typically just the same resource name as input. However, since this class doesn’t manage jobresource.ComputeType resources, None is returned for those resources.

Parameters

resource – The resource to check

Returns

The managed resource type

addProcessors(procs: int, resource: Optional[str] = None)

Add some allowed processors for the given resource. These processors will be added to any previously added processors for the same resource

Parameters
  • procs – The number of processors to add. Use UNLIMITED_CPUS if there is no limit to the number of processors

  • resource – The job resource request these processors satisfy

removeProcessors(procs: int, resource: Optional[str] = None)

Remove some allowed processors for the given resource

Parameters
  • procs – The number of processors to remove

  • resource – The job resource to remove the processors from

getAllowedProcessors(resource: Optional[Union[str, schrodinger.job.resource.ComputeType]] = None) int

Get the number of processors that satisfy the given resource

Parameters

resource – The job resource to check

Returns

The number of processors on this host that satisfy this resource. The processors may or may not be currently in use. 0 is returned if this host does not provide the given resource.

getTotalProcessors()

Get all the processors this host provides across all resources

Returns

The total number of processors

matchesRequirement(requirement: Union[str, schrodinger.job.resource.ComputeType]) bool

Determine whether this host can meet the resource requirements requested.

All hosts will meet jobresource.ComputeType.CPU resource requirements.

schrodinger.job.queue.create_cpu_and_gpu_hosts(cpu_info: Tuple[str, int], gpu_info: Tuple[str, int]) List[schrodinger.job.queue.ResourceHost]

Convenience function to create ResourceHosts for a cpu host and a gpu host that will allow the JobDJ to manage CPU and GPU jobs separately but simultaneously. These host objects can then be passed in when creating the JobDJ via the hosts keyword argument. Jobs can be specified to run on one type of host or the other by passing in resource_requirement=queue.CPU_RESOURCE or resource_requirement=queue.GPU_RESOURCE when creating the JobControlJob

Parameters
  • cpu_info (tuple) – The name of the cpu host and the number of processors it provides (hostname, processors)

  • gpu_info (tuple) – The name of the gpu host and the number of processors it provides (hostname, processors)

Return type

list

Returns

The first item of the list is the cpu ResourceHost and the second item of the list is the gpu ResourceHost

Raises

HostTypeError – if the provided info has the wrong format
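
For example (host names, processor counts, and the command are hypothetical):

    hosts = queue.create_cpu_and_gpu_hosts(("cpu-host", 8), ("gpu-host", 2))
    job_dj = queue.JobDJ(hosts=hosts)
    gpu_job = queue.JobControlJob(["my_gpu_app", "input.in"],
                                  resource_requirement=queue.GPU_RESOURCE)
    job_dj.addJob(gpu_job)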

class schrodinger.job.queue.JobDJ(hosts: Optional[List[Union[Tuple[str, int], schrodinger.job.queue.ResourceHost]]] = None, local: bool = False, max_retries: Optional[int] = None, default_max_retries: int = 0, max_failures: Optional[int] = None, verbosity: str = 'quiet', job_class: schrodinger.job.queue.BaseJob = <class 'schrodinger.job.queue.JobControlJob'>, update_delay: Optional[int] = None, smart_dist_resources: Optional[list] = None)

Bases: object

Class for running commands/jobs in parallel under jobcontrol.

Create an instance of this class, add commands to run with .addJob(), and then call run().

__init__(hosts: Optional[List[Union[Tuple[str, int], schrodinger.job.queue.ResourceHost]]] = None, local: bool = False, max_retries: Optional[int] = None, default_max_retries: int = 0, max_failures: Optional[int] = None, verbosity: str = 'quiet', job_class: schrodinger.job.queue.BaseJob = <class 'schrodinger.job.queue.JobControlJob'>, update_delay: Optional[int] = None, smart_dist_resources: Optional[list] = None)

Constructor.

Parameters
  • hosts – A list of hosts to run on. Items of the list can be a (<hostname>, <maximum_concurrent_subjobs>) tuple that gives a host name and the maximum number of processors that can be used on that host. Or, items of the list can be a ResourceHost object that provides a detailed listing of the number of processors on each host that can be used for jobs requesting specific resources. The list can mix tuple and ResourceHost items. If a host appears more than once (whether as a tuple or ResourceHost), the information in the multiple entries is added together. Processors in the tuple form are only considered usable by jobs that have resource_requirement=None (The default state for jobs). The default value of None for this parameter means the host information is determined automatically from the command line and is usually desired.

  • local – No longer functional in JOB_SERVER.

  • max_retries – Number of allowed retries per subjob. If this is set, it is never overridden by the SCHRODINGER_MAX_RETRIES environment variable. If it is not set, the value in default_max_retries is used, and SCHRODINGER_MAX_RETRIES is allowed to override. If you wish to disable restarting altogether, set this value to zero.

  • default_max_retries – Number of allowed retries per subjob. This value can always be overridden by the SCHRODINGER_MAX_RETRIES environment variable. Default is zero.

  • max_failures – Total number of allowed subjob failures before JobDJ exits. If it is not defined, a default of zero will be used (exit on any failure after attempting to restart), but this can be overridden with the SCHRODINGER_MAX_FAILURES environment variable. To allow an unlimited number of subjob failures, set max_failures to the module level NOLIMIT constant.

  • verbosity – There are three allowed verbosity levels: “quiet” - only warnings and errors are printed; “normal” - JobDJ progress is printed; and “verbose” - additional debugging info is printed. Default is “quiet”.

  • job_class – The class to use as the default job constructor when the addJob argument is not a BaseJob instance.

  • update_delay – The number of seconds to wait between job control database reads for JobControlJob jobs. (This delay is for an individual job, not for any job database read.) Default is None, which causes the module level constant UPDATE_DELAY to be used.

  • smart_dist_resources – Custom resources that the localhost provides for smart distribution. Jobs that require a custom resource that is not included in this list will not be run on the localhost even if smart distribution is turned on. This information is unused if smart distribution is off. Each item of the list is a string that defines a custom resource.

Raises

HostTypeError – if the host argument or one of its items is of the wrong type
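
A construction sketch mixing the two host formats (host names and limits are hypothetical):

    hosts = [("cpu-host", 16),
             queue.ResourceHost("gpu-host", 4, resource="gpu")]
    job_dj = queue.JobDJ(hosts=hosts, default_max_retries=1, verbosity="normal")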

hasStarted() bool

Returns True if JobDJ has started already

isComplete() bool

Returns True if JobDJ has completed, False otherwise.

markForRestart(job: schrodinger.job.queue.BaseJob, action: str)

Mark a job as dead, but make sure that it gets restarted.

Parameters

action – Describes the reason the job is being restarted.

property waiting_jobs: List[schrodinger.job.queue.BaseJob]

Jobs waiting to be started.

property done_jobs: List[schrodinger.job.queue.BaseJob]

Successfully completed jobs, sorted into the order they were marked as completed by JobDJ.

property active_jobs: List[schrodinger.job.queue.BaseJob]
property failed_jobs: List[schrodinger.job.queue.BaseJob]
property all_jobs: List[schrodinger.job.queue.BaseJob]
killJobs()

Kill all active jobs

property total_added: int

The number of individual jobs that have been added to the JobDJ instance.

property total_active: int

The number of jobs currently running.

property total_finished: int

The number of jobs that have finished successfully.

property total_failed: int

The number of jobs that have failed.

addJob(job: Union[schrodinger.job.queue.BaseJob, List], add_connected: bool = True, **kwargs)

Add a job to run. If job is not a BaseJob instance, a BaseJob instance is constructed with job as the first argument. The default BaseJob class for the JobDJ instance can be specified in the constructor for JobDJ.

Additional keyword arguments are passed on to the job constructor.

All job prerequisites and dependencies need to be specified before adding a job to JobDJ.

Parameters

add_connected – If True, for jobs with dependencies only one job per connected group should be added and all connected jobs will be discovered and added automatically. If False, it is the user’s responsibility to make sure that any prerequisites of a job are also added.
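
For example (job_a and job_b are hypothetical jobs with a dependency); with the default add_connected=True, adding one job of a connected group is sufficient:

    job_b.addPrereq(job_a)
    job_dj.addJob(job_a)  # job_b is discovered and added via the dependency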

dump(filename: pathlib.Path)

Pickle the JobDJ instance to the specified file name.

setSmartDistribution(state: bool)

Set smart distribution of jobs.

Parameters

state (bool) – Whether to enable smart distribution

disableSmartDistribution()

Disable smart distribution of jobs.

Smart distribution allows subjobs to run on the machine that JobDJ is running on when JobDJ itself is running under a queuing system. This is usually desirable since the JobDJ process doesn’t generally consume significant computational resources and you don’t want to leave a queue slot mostly idle.

getActiveProcCounts() Dict[str, int]

Return a dictionary containing the number of active jobs on each host.

Any resource requirement of the active jobs is ignored.

getActiveProcsOnHost(hostname: str, requirement: Optional[Union[str, schrodinger.job.resource.ComputeType]]) int

Get the number of processors on the given host that are currently occupied by jobs that request the given requirement

Parameters
  • hostname – The host to check

  • requirement – The job requirement to check

Returns

The number of currently used processors on hostname that provide requirement

static createResourceHost(host_info: Tuple[str, int], resource: Optional[str] = None) schrodinger.job.queue.ResourceHost

Given a tuple of (hostname, procs), return a ResourceHost.

Parameters
  • host_info (tuple) – The hostname and number of processors it has

  • resource (str) – The resource the host provides, if any. If not given, the host will only run jobs that have no resource_requirement.

Return type

ResourceHost

Returns

The ResourceHost created from the input data

Raises

HostTypeError – If the provided host_info has the wrong format

setHostList(host_list: List[Union[Tuple[str, int], schrodinger.job.queue.ResourceHost]])

Define compute hosts to run subjobs on.

Active jobs are not affected by a change in the host list.

Parameters

host_list – A list of hosts to run on. See the documentation for the hosts parameter to the __init__ method of this class for additional information.

Raises

HostTypeError – If one of the host_list items is not of the correct type

getFirstHost()

Get first host from the lists of hosts.

Return type

str

Returns

First host

printStatus(job: Optional[schrodinger.job.queue.BaseJob] = None, action: Optional[str] = None)

Prints the status of JobDJ and the action/status for the job.

If no job is specified, prints the status header.

If no action is specified, the status_string attribute of the job is used.

run(*, status_change_callback: Optional[Callable[[schrodinger.job.queue.BaseJob], None]] = None, periodic_callback: Optional[Callable[[schrodinger.job.queue.BaseJob], None]] = None, callback_interval: int = 300, restart_failed: bool = True)

Call this method to run all jobs that have been added. The method will return control when all jobs have completed.

Parameters
  • status_change_callback – A function to call every time a job status changes, for example JobState.ACTIVE -> JobState.DONE. This function takes a single argument of a schrodinger.job.queue.BaseJob object.

  • periodic_callback – A command to call periodically, regardless of whether job status has changed or not. The function will be called without any arguments.

  • callback_interval – The interval at which the periodic callback will be called. This time is only approximately enforced and will depend on the timing delay settings (e.g. MONITOR_DELAY).

  • restart_failed – True (default) if previously failed jobs should be restarted, False if not.
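
A sketch of run() with callbacks (the callback functions are hypothetical):

    def on_change(job):
        print(*job.getStatusStrings())

    def heartbeat():
        print("active jobs:", job_dj.total_active)

    job_dj.run(status_change_callback=on_change,
               periodic_callback=heartbeat,
               callback_interval=60)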

schrodinger.job.queue.get_current_time() float

Return time, suitable for mocking.