schrodinger.stepper.stepper module

Framework for writing computational workflows and running them in a highly distributed manner. Each step of the workflow is either a “mapping” operation (see MapStep) or a “reducing” operation (see ReduceStep). These steps can then be chained together using the Chain class.

For a more complete introduction, see the WordCount tutorial.

For documentation on specific stepper features, see the following feature list. You can ctrl+f the feature tag to jump to the relevant docstrings.

=================  ===================
Feature            Tag
=================  ===================
MapStep            _map_step_
ReduceStep         _reduce_step_
Chain              _chain_
Settings           _settings_
Serialization      _serialization_
File Handling      _file_handling_
Generic Steps      _generics_
Custom Workflow    _custom_workflows_
Double Batching    _dbl_batching_
=================  ===================

#===============================================================================
# Running stepper with custom, undistributed workflows <_custom_workflows_>
#===============================================================================

To run steps that aren’t defined in the core suite, the script should be executed inside the working directory and import steps from a local package in the working directory.

Working dir contents:

script.py
my_lib/
    __init__.py
    steps.py

Minimal code in script.py if it needs to run under job control:

from schrodinger.job import launchapi
from schrodinger.ui.qt.appframework2 import application
from my_lib.steps import MyStep

def get_job_spec_from_args(argv):
    jsb = launchapi.JobSpecificationArgsBuilder(argv)
    jsb.setInputFile(__file__)
    jsb.setInputDirectory('my_lib')
    return jsb.getJobSpec()

def main():
    step = MyStep()
    step.getOutputs()

if __name__ == '__main__':
    application.run_application(main)

#===============================================================================
# Generic Steps <_generics_>
#===============================================================================

Steps that can be run on any data type can be created by using the GENERIC sentinel type as the input or output type of a step. The step will then receive all inputs as serialized strings and should return all outputs as strings.

Example:

>>> class DeduplicateStep(ReduceStep):
...     Input = Output = GENERIC
...
...     def reduceFunction(self, inps):
...         yield from sorted(set(inps))
...
>>> class IdentityStep(MapStep):
...     Input = Output = int
...
...     def mapFunction(self, value):
...         yield value
...
>>> class DeduplicatedIdentityChain(Chain):
...     def buildChain(self):
...         self.addStep(IdentityStep())
...         self.addStep(DeduplicateStep())
...         self.addStep(IdentityStep())
...
>>> chain = DeduplicatedIdentityChain()
>>> chain.setInputs([1, 2, 3, 1, 2, 3])
>>> chain.getOutputs()
[1, 2, 3]

These steps can be chained together with any steps, regardless of the input or output types of the other steps.

NOTE::

No validation is done to guarantee that the input type of the step after a generic step is the same as the output type of the step preceding the generic step. The behavior in this circumstance is undefined and will likely result in an error.

#===============================================================================
# Double Batching <_dbl_batching_>
#===============================================================================

Job launch speed at the time of writing is about one job every 3 or 4 seconds. This rate becomes insufficient once we need more than a few hundred workers. To get around this, stepper employs a pattern we call “double batching”, where we create subjobs whose sole purpose is to create the subjobs that actually run the steps.

NOTE:: We use double-batching for the PubSub implementation of stepper as well as the file-based implementation. The literal meaning of “double-batching” doesn’t apply as well to the PubSub implementation but the general pattern of launching subjobs to launch more subjobs still applies.
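
A rough back-of-the-envelope sketch of why the pattern helps. It assumes 3.5 seconds per launch (the midpoint of the figure quoted above), that launcher subjobs run in parallel with each other, and that each launcher starts an equal share of the workers. The function names are illustrative and are not part of stepper; job startup overhead is ignored.

```python
import math

LAUNCH_SECONDS = 3.5  # assumed midpoint of "3 or 4 seconds" per job


def direct_launch_time(num_workers: int) -> float:
    """Seconds for one parent to launch every worker itself, one at a time."""
    return num_workers * LAUNCH_SECONDS


def double_batched_launch_time(num_workers: int, num_launchers: int) -> float:
    """Seconds until the last worker starts when the parent launches
    `num_launchers` subjobs and each of those launches its share of workers."""
    workers_per_launcher = math.ceil(num_workers / num_launchers)
    # The last launcher starts after all launchers have been launched, then
    # spends one launch interval per worker it is responsible for.
    return (num_launchers + workers_per_launcher) * LAUNCH_SECONDS


print(direct_launch_time(1000))              # 3500.0
print(double_batched_launch_time(1000, 25))  # 227.5
```

Under these assumptions, launching 1000 workers drops from roughly an hour to under four minutes.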

#===============================================================================
# Environment variables and global settings
#===============================================================================

Settings:

  • SCHRODINGER_STEPPER_DEBUG

    Set to 1 to have most files brought back from a workflow run. Set to 2 to have _all_ files brought back.

  • SCHRODINGER_STEPPER_LOG_LEVEL

    Set to an integer or name representing the log level to use for the logger. Valid level names are, in decreasing order of verbosity: NOTSET, DEBUG, INFO, WARN, ERROR, FATAL, and CRITICAL.

  • SCHRODINGER_STEPPER_ERROR_THRESHOLD

    Sets the batching error threshold. If not set, 0.9 will be used. Note that this value can still be overridden by a workflow’s configuration.
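
As an illustration only, the settings above could be read along these lines. The helper names are hypothetical, and the fallback log level used when the variable is unset is an assumption (the text above does not state one); stepper's actual parsing may differ.

```python
import logging
import os

DEFAULT_ERROR_THRESHOLD = 0.9  # default stated in the settings list above


def read_error_threshold(environ=os.environ) -> float:
    """Return the batching error threshold, falling back to the default."""
    raw = environ.get("SCHRODINGER_STEPPER_ERROR_THRESHOLD")
    return DEFAULT_ERROR_THRESHOLD if raw is None else float(raw)


def read_log_level(environ=os.environ, default=logging.WARNING) -> int:
    """Return a numeric log level from an integer or a level name.

    `default` is an assumed fallback, not documented stepper behavior.
    """
    raw = environ.get("SCHRODINGER_STEPPER_LOG_LEVEL")
    if raw is None:
        return default
    if raw.isdigit():
        return int(raw)
    # logging.getLevelName maps a known name such as "DEBUG" to its number
    # and returns a "Level ..." string for names it does not recognize.
    level = logging.getLevelName(raw.upper())
    if not isinstance(level, int):
        raise ValueError(f"Unknown log level: {raw!r}")
    return level
```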

schrodinger.stepper.stepper.get_debug_level()
class schrodinger.stepper.stepper.ElapsedFormatter

Bases: object

A stepper logging formatter that includes how much time has elapsed since a start time in all messages. FORMATTER.start() is safe to call multiple times; the start time will be set to the time of the earliest call.

Example usage:

logger.debug("Message before debug.")
# Message before debug.
FORMATTER.start()
time.sleep(1)
logger.debug("My message")
# <STEPPER> DEBUG +00:00:01: My message
__init__()
start()
format(record)
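
The behavior described above can be approximated with the standard logging module. This is a minimal sketch, not the actual ElapsedFormatter: the class name is hypothetical, and the output format is inferred from the example usage.

```python
import logging
import time


class ElapsedSketchFormatter(logging.Formatter):
    """Hypothetical formatter that prefixes records with time since start()."""

    def __init__(self):
        super().__init__()
        self._start_time = None

    def start(self):
        # Only the earliest call sets the start time; later calls are no-ops.
        if self._start_time is None:
            self._start_time = time.time()

    def format(self, record):
        message = record.getMessage()
        if self._start_time is None:
            # Before start() is called, pass the message through unchanged.
            return message
        elapsed = max(0, int(record.created - self._start_time))
        hours, remainder = divmod(elapsed, 3600)
        minutes, seconds = divmod(remainder, 60)
        return (f"<STEPPER> {record.levelname} "
                f"+{hours:02d}:{minutes:02d}:{seconds:02d}: {message}")
```
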
schrodinger.stepper.stepper.ichunked(iterable, n)

Reimplementation of more_itertools.ichunked that does not cache n items of iterable at a time.

Breaks iterable into sub-iterables with n elements each.

Note that unlike more_itertools.ichunked, an error will be raised if you try to iterate over a chunk before its previous chunk has been consumed.
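
One way such a non-caching chunker could work is sketched below. This is not the stepper implementation: the function name is hypothetical, the sketch peeks at a single item per chunk to detect exhaustion, and it raises when the next chunk is requested before the previous one has been fully consumed.

```python
import itertools


def ichunked_sketch(iterable, n):
    """Yield sub-iterators of up to n items without buffering a whole chunk."""
    source = iter(iterable)
    consumed = [True]  # whether the most recently yielded chunk is exhausted

    def make_chunk(first_item):
        yield first_item
        # Pull the remaining n - 1 items lazily, straight from the source.
        yield from itertools.islice(source, n - 1)
        consumed[0] = True

    while True:
        if not consumed[0]:
            raise RuntimeError("Previous chunk must be consumed first.")
        try:
            first_item = next(source)  # peek one item to detect exhaustion
        except StopIteration:
            return
        consumed[0] = False
        yield make_chunk(first_item)


for chunk in ichunked_sketch(range(5), 2):
    print(list(chunk))
# [0, 1]
# [2, 3]
# [4]
```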

class schrodinger.stepper.stepper.StepperFile(value='', *args, **kwargs)

Bases: schrodinger.tasks.tasks.TaskFile

class schrodinger.stepper.stepper.StepperFolder(value='', *args, **kwargs)

Bases: schrodinger.tasks.tasks.TaskFolder

class schrodinger.stepper.stepper.StepTaskInput(*args, _param_type=<object object>, **kwargs)

Bases: schrodinger.models.parameters.CompoundParam

dehydrated_step: schrodinger.stepper.stepper._DehydratedStep

A parameter of the class.

debug_mode: bool

A parameter of the class.

misc_input_filenames: List[schrodinger.tasks.tasks.TaskFile]

A parameter of the class.

misc_input_folders: List[schrodinger.tasks.tasks.TaskFolder]

A parameter of the class.

debug_modeChanged

A pyqtSignal emitted by instances of the class.

debug_modeReplaced

A pyqtSignal emitted by instances of the class.

dehydrated_stepChanged

A pyqtSignal emitted by instances of the class.

dehydrated_stepReplaced

A pyqtSignal emitted by instances of the class.

misc_input_filenamesChanged

A pyqtSignal emitted by instances of the class.

misc_input_filenamesReplaced

A pyqtSignal emitted by instances of the class.

misc_input_foldersChanged

A pyqtSignal emitted by instances of the class.

misc_input_foldersReplaced

A pyqtSignal emitted by instances of the class.

class schrodinger.stepper.stepper.StepTaskOutput(*args, _param_type=<object object>, **kwargs)

Bases: schrodinger.models.parameters.CompoundParam

output_file: schrodinger.tasks.tasks.TaskFile

A parameter of the class.

run_info: dict

A parameter of the class.

misc_output_filenames: List[schrodinger.tasks.tasks.TaskFile]

A parameter of the class.

misc_output_filenamesChanged

A pyqtSignal emitted by instances of the class.

misc_output_filenamesReplaced

A pyqtSignal emitted by instances of the class.

output_fileChanged

A pyqtSignal emitted by instances of the class.

output_fileReplaced

A pyqtSignal emitted by instances of the class.

run_infoChanged

A pyqtSignal emitted by instances of the class.

run_infoReplaced

A pyqtSignal emitted by instances of the class.

class schrodinger.stepper.stepper.StepTaskMixin(*args, step=None, **kwargs)

Bases: schrodinger.models.parameters.CompoundParamMixin

This class must be mixed in with a subclass of AbstractComboTask. The resulting task class may be used to run any step as a task, provided the input, output, and settings classes are all JSONable.

input: schrodinger.stepper.stepper.StepTaskInput
output: schrodinger.stepper.stepper.StepTaskOutput
DEFAULT_TASKDIR_SETTING = 1
__init__(*args, step=None, **kwargs)
addLicenseReservation(license, num_tokens=1)
setStep(step)
getStepClass()
mainFunction()
class schrodinger.stepper.stepper.StepSubprocessTask(*args, _param_type=<object object>, **kwargs)

Bases: schrodinger.stepper.stepper.StepTaskMixin, schrodinger.tasks.tasks.ComboSubprocessTask

calling_contextChanged

A pyqtSignal emitted by instances of the class.

calling_contextReplaced

A pyqtSignal emitted by instances of the class.

failure_infoChanged

A pyqtSignal emitted by instances of the class.

failure_infoReplaced

A pyqtSignal emitted by instances of the class.

input: schrodinger.stepper.stepper.StepTaskInput

A parameter of the class.

inputChanged

A pyqtSignal emitted by instances of the class.

inputReplaced

A pyqtSignal emitted by instances of the class.

max_progressChanged

A pyqtSignal emitted by instances of the class.

max_progressReplaced

A pyqtSignal emitted by instances of the class.

nameChanged

A pyqtSignal emitted by instances of the class.

nameReplaced

A pyqtSignal emitted by instances of the class.

output: schrodinger.stepper.stepper.StepTaskOutput

A parameter of the class.

outputChanged

A pyqtSignal emitted by instances of the class.

outputReplaced

A pyqtSignal emitted by instances of the class.

progressChanged

A pyqtSignal emitted by instances of the class.

progressReplaced

A pyqtSignal emitted by instances of the class.

progress_stringChanged

A pyqtSignal emitted by instances of the class.

progress_stringReplaced

A pyqtSignal emitted by instances of the class.

statusChanged

A pyqtSignal emitted by instances of the class.

statusReplaced

A pyqtSignal emitted by instances of the class.

class schrodinger.stepper.stepper.StepJobTask(*args, _param_type=<object object>, **kwargs)

Bases: schrodinger.stepper.stepper.StepTaskMixin, schrodinger.tasks.jobtasks.ComboJobTask

input: schrodinger.stepper.stepper.StepTaskInput

A parameter of the class.

output: schrodinger.stepper.stepper.StepTaskOutput

A parameter of the class.

isCanceled() → bool

Whether the job underlying the task is canceled.

If there is no job associated with the task, then this returns False.

mainFunction()
calling_contextChanged

A pyqtSignal emitted by instances of the class.

calling_contextReplaced

A pyqtSignal emitted by instances of the class.

failure_infoChanged

A pyqtSignal emitted by instances of the class.

failure_infoReplaced

A pyqtSignal emitted by instances of the class.

inputChanged

A pyqtSignal emitted by instances of the class.

inputReplaced

A pyqtSignal emitted by instances of the class.

job_configChanged

A pyqtSignal emitted by instances of the class.

job_configReplaced

A pyqtSignal emitted by instances of the class.

max_progressChanged

A pyqtSignal emitted by instances of the class.

max_progressReplaced

A pyqtSignal emitted by instances of the class.

nameChanged

A pyqtSignal emitted by instances of the class.

nameReplaced

A pyqtSignal emitted by instances of the class.

outputChanged

A pyqtSignal emitted by instances of the class.

outputReplaced

A pyqtSignal emitted by instances of the class.

progressChanged

A pyqtSignal emitted by instances of the class.

progressReplaced

A pyqtSignal emitted by instances of the class.

progress_stringChanged

A pyqtSignal emitted by instances of the class.

progress_stringReplaced

A pyqtSignal emitted by instances of the class.

statusChanged

A pyqtSignal emitted by instances of the class.

statusReplaced

A pyqtSignal emitted by instances of the class.

class schrodinger.stepper.stepper.Topic(*args, _param_type=<object object>, **kwargs)

Bases: schrodinger.models.parameters.CompoundParam

An abstraction over a container of messages in a Publish/Subscribe model.

Variables
  • name – the name of the topic

  • num_uploaded_msgs – the number of messages that were uploaded to this topic; it is not reflective of the number of messages currently in the topic.

name: str

A parameter of the class.

num_uploaded_msgs: int

A parameter of the class.

nameChanged

A pyqtSignal emitted by instances of the class.

nameReplaced

A pyqtSignal emitted by instances of the class.

num_uploaded_msgsChanged

A pyqtSignal emitted by instances of the class.

num_uploaded_msgsReplaced

A pyqtSignal emitted by instances of the class.

class schrodinger.stepper.stepper.BatchSettings(*args, _param_type=<object object>, **kwargs)

Bases: schrodinger.models.parameters.CompoundParam

size: int

A parameter of the class.

task_class: type

A parameter of the class.

hostname: str

A parameter of the class.

use_pubsub: bool

A parameter of the class.

num_pubsub_workers: int

A parameter of the class.

error_threshold: float

A parameter of the class.

initializeValue()

Override to dynamically set up the default value of the param. Useful for default values that are determined at runtime. This is called any time the param is reset.

error_thresholdChanged

A pyqtSignal emitted by instances of the class.

error_thresholdReplaced

A pyqtSignal emitted by instances of the class.

hostnameChanged

A pyqtSignal emitted by instances of the class.

hostnameReplaced

A pyqtSignal emitted by instances of the class.

num_pubsub_workersChanged

A pyqtSignal emitted by instances of the class.

num_pubsub_workersReplaced

A pyqtSignal emitted by instances of the class.

sizeChanged

A pyqtSignal emitted by instances of the class.

sizeReplaced

A pyqtSignal emitted by instances of the class.

task_classChanged

A pyqtSignal emitted by instances of the class.

task_classReplaced

A pyqtSignal emitted by instances of the class.

use_pubsubChanged

A pyqtSignal emitted by instances of the class.

use_pubsubReplaced

A pyqtSignal emitted by instances of the class.

class schrodinger.stepper.stepper.Serializer

Bases: object

<_serialization_> A class for defining special serialization for some datatype. Serialization by default uses the json protocol, but if a specialized protocol is wanted instead, users can subclass this class to do so.

Subclasses should:

  • Define DataType. This is the class that this serializer can encode/decode.

  • Define toString(self, output), which defines how to serialize an output.

  • Define fromString(self, input_str), which defines how to deserialize an input.

This can then be used as the InputSerializer or OutputSerializer for any step.

Here’s an example for defining an int that’s serialized in base-two as opposed to base-ten:

class IntBaseTwoSerializer(Serializer):
    DataType = int

    def toString(self, output):
        return bin(output) # 7 -> '0b111'

    def fromString(self, input_str):
        return int(input_str[2:], 2) # '0b111' -> 7

This can then be used anywhere you’d use an int as the output or input in a step. For example:

class SquaringStep(MapStep):
    Input = int
    InputSerializer = IntBaseTwoSerializer
    Output = int
    OutputSerializer = IntBaseTwoSerializer

    def mapFunction(self, inp):
        yield inp**2

Now, any time that a SquaringStep would read its inputs from a file or write its outputs to a file, it’ll do so using a base-two representation.
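
To make the file round trip concrete, here is a self-contained sketch of how serialize and deserialize might build on toString and fromString. It assumes one serialized item per line, which is an assumption about the file layout and not something stated above; the class names are hypothetical stand-ins for Serializer and its subclass.

```python
import os
import tempfile


class LineSerializerSketch:
    """Hypothetical stand-in for Serializer; assumes one item per line."""

    def toString(self, output):
        raise NotImplementedError

    def fromString(self, input_str):
        raise NotImplementedError

    def serialize(self, items, fname):
        # Write each item on its own line using the subclass's encoding.
        with open(fname, "w") as outfile:
            for item in items:
                outfile.write(self.toString(item) + "\n")

    def deserialize(self, fname):
        # Lazily decode each line back into an item.
        with open(fname) as infile:
            for line in infile:
                yield self.fromString(line.rstrip("\n"))


class IntBaseTwoSketch(LineSerializerSketch):

    def toString(self, output):
        return bin(output)  # 7 -> '0b111'

    def fromString(self, input_str):
        # int() accepts the '0b' prefix when the base is 2.
        return int(input_str, 2)  # '0b111' -> 7


path = os.path.join(tempfile.mkdtemp(), "ints.txt")
serializer = IntBaseTwoSketch()
serializer.serialize([1, 4, 9], path)
print(list(serializer.deserialize(path)))  # [1, 4, 9]
```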

DataType = NotImplemented
serialize(items, fname)

Write items to a file named fname.

deserialize(fname)

Read in items from fname.

Parameters

fname (str)

Return type

iterable[self.DataType]

fromString(input_str)
toString(output)
exception schrodinger.stepper.stepper.ValidationIssue(source_step, msg)

Bases: RuntimeError

__init__(source_step, msg)
exception schrodinger.stepper.stepper.SettingsError(source_step, msg)

Bases: schrodinger.stepper.stepper.ValidationIssue

Used in conjunction with _BaseStep.validateSettings to report an error with settings. Constructed with the step with the invalid settings and an error message, e.g. SettingsError(bad_step, "Step does not have required settings.")

exception schrodinger.stepper.stepper.SettingsWarning(source_step, msg)

Bases: schrodinger.stepper.stepper.ValidationIssue

Used in conjunction with _BaseStep.validateSettings to report a warning with settings. Constructed with the step with the invalid settings and an error message, e.g. SettingsWarning(bad_step, "Step setting FOO should ideally be positive")

exception schrodinger.stepper.stepper.ResourceError(source_step, msg)

Bases: schrodinger.stepper.stepper.ValidationIssue

Used in conjunction with _BaseStep.validateSettings to report an error with a resource setting. Constructed with the step with the invalid setting and an error message, e.g., ResourceError(bad_step, "Step setting 'file' has not been set.")

exception schrodinger.stepper.stepper.LocalResourceError(source_step, msg)

Bases: schrodinger.stepper.stepper.ResourceError

A ResourceError specifically for local StepperFile and StepperFolder validations, i.e., resources that are on a job submission host and may have to be transferred to compute resources.

exception schrodinger.stepper.stepper.StaticResourceError(source_step, msg)

Bases: schrodinger.stepper.stepper.ResourceError

A ResourceError specifically for static StepperFile and StepperFolder validations, i.e., resources that are not necessarily available on a job submission host.

class schrodinger.stepper.stepper.PubsubEnabledStepMixin(*args, **kwargs)

Bases: object

A mixin that allows a step to be run using PubSub.

Steps with this mixin will have batch settings that have a use_pubsub flag and a num_pubsub_workers integer. Flipping use_pubsub to on will have the step load up all its inputs into a pubsub topic before spinning up num_pubsub_workers subjobs that will all take from the input topic, run the step’s computation on it, and upload it to an output topic.

Calling my_pubsub_step.getOutputs() will return all the outputs from the output topic, so to a user this will all be implementation detail.
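
The flow described above (load inputs into a topic, start workers that pull from it, collect results from an output topic) can be sketched in-process. In this sketch, in-memory queues stand in for PubSub topics and threads stand in for subjobs; the function name is hypothetical and none of this reflects the real transport.

```python
import queue
import threading


def run_with_topics(inputs, compute, num_workers):
    """Run `compute` over `inputs` using an input queue and an output queue."""
    input_topic = queue.Queue()
    output_topic = queue.Queue()

    # Load every input into the input topic before any worker starts.
    for item in inputs:
        input_topic.put(item)

    def worker():
        while True:
            try:
                item = input_topic.get_nowait()
            except queue.Empty:
                return  # nothing left to process
            for result in compute(item):
                output_topic.put(result)

    threads = [threading.Thread(target=worker) for _ in range(num_workers)]
    for thread in threads:
        thread.start()
    for thread in threads:
        thread.join()

    # Drain the output topic; ordering across workers is not guaranteed.
    outputs = []
    while not output_topic.empty():
        outputs.append(output_topic.get())
    return outputs


print(sorted(run_with_topics(range(5), lambda x: [x * x], num_workers=3)))
# [0, 1, 4, 9, 16]
```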

__init__(*args, **kwargs)
property topic_prefix
property topic_suffix
outputs(*args, **kwargs)
usingPubsub()
getInputTopic() → Optional[schrodinger.stepper.stepper.Topic]
setInputTopic(inp_topic: Optional[schrodinger.stepper.stepper.Topic])
getOutputTopic() → Optional[schrodinger.stepper.stepper.Topic]
setOutputTopic(outp_topic: Optional[schrodinger.stepper.stepper.Topic])
initializeTopics()
class schrodinger.stepper.stepper.UnbatchedReduceStep(settings=None, config=None, step_id=None, metrics_logger_depth=None, _run_info=None, **kwargs)

Bases: schrodinger.stepper.stepper._BaseStep

An unbatchable ReduceStep. See ReduceStep for more information.

reduceFunction(inputs)
class schrodinger.stepper.stepper.ReduceStep(*args, **kwargs)

Bases: schrodinger.stepper.stepper._BatchableStepMixin, schrodinger.stepper.stepper.UnbatchedReduceStep

<_reduce_step_> A computational step that performs a function on a collection of inputs to produce output items.

To construct a ReduceStep:

  • Implement reduceFunction

  • Define Input (the type expected by the reduceFunction)

  • Define Output (the type of item produced by the reduceFunction)

  • Define Settings (data class for any settings needed by the reduceFunction)

reduceFunction(inputs)

The main computation for this step. This function should take in an iterable of inputs and return an iterable of outputs.

Example:

def reduceFunction(self, words):
    # Find all unique words
    seen_words = set()
    for word in words:
        if word not in seen_words:
            seen_words.add(word)
            yield word
class schrodinger.stepper.stepper.UnbatchedMapStep(settings=None, config=None, step_id=None, metrics_logger_depth=None, _run_info=None, **kwargs)

Bases: schrodinger.stepper.stepper.UnbatchedReduceStep

<_unbatchability_> An unbatchable MapStep. See MapStep for more information.

reduceFunction(inputs)
mapFunction(input)
class schrodinger.stepper.stepper.MapStep(*args, **kwargs)

Bases: schrodinger.stepper.stepper._BatchableStepMixin, schrodinger.stepper.stepper.UnbatchedMapStep

<_map_step_> A computational step that performs a function on input items from an input source to produce output items.

To construct a MapStep:

  • Implement mapFunction

  • Define Input (the type expected by the mapFunction)

  • Optionally define an InputSerializer (see Serializer for more info.)

  • Define Output (the type of item produced by the mapFunction)

  • Optionally define an OutputSerializer (see Serializer for more info.)

  • Define Settings (data class for any settings needed by the mapFunction)

mapFunction(input)

The main computation for this step. This function should take in a single input item and return an iterable of outputs. This allows a single input to produce multiple outputs (e.g. enumeration).

The output may be yielded as a generator, in order to reduce memory usage.

If only a single output is produced for each input, return it as a single-element list.

Parameters

input

This will be a single input item from the input source. Implementers are encouraged to use a more descriptive, context-specific variable name. Example:

def mapFunction(self, starting_smiles):
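
The contract above amounts to a flat-map: each input yields zero or more outputs, and the results are concatenated. The sketch below illustrates this with a hypothetical stand-in class; it assumes a map step's reduceFunction simply loops over inputs and delegates to mapFunction, which is not confirmed by the text above.

```python
class MapSketch:
    """Hypothetical stand-in for a map step (not the stepper class)."""

    def mapFunction(self, input):
        raise NotImplementedError

    def reduceFunction(self, inputs):
        # Assumed relationship: flatten the per-item outputs into one stream.
        for item in inputs:
            yield from self.mapFunction(item)


class EnumerateCaseSketch(MapSketch):

    def mapFunction(self, word):
        # One input produces several outputs (enumeration).
        yield word.lower()
        yield word.upper()


class LengthSketch(MapSketch):

    def mapFunction(self, word):
        # A single output is returned as a single-element list.
        return [len(word)]


print(list(EnumerateCaseSketch().reduceFunction(["Ab", "c"])))
# ['ab', 'AB', 'c', 'C']
print(list(LengthSketch().reduceFunction(["Ab", "c"])))
# [2, 1]
```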

class schrodinger.stepper.stepper.UnbatchedChain(*args, **kwargs)

Bases: schrodinger.stepper.stepper.UnbatchedReduceStep

property Input
property Output
property InputSerializer

The default serializer that simply uses json.loads and json.dumps

property OutputSerializer

The default serializer that simply uses json.loads and json.dumps

setInputs(inputs: Iterable[Any], starting_step_id: str = None)

Set the inputs for the chain. If starting_step_id is specified, then all steps before the specified starting step will be skipped. This is useful for resuming a chain’s computation.

setInputFile(input_file: str, starting_step_id: str = None)

Set the input file for the chain. If starting_step_id is specified, then all steps before the specified starting step will be skipped. This is useful for resuming a chain’s computation.

setStartingStep(starting_step: str)
validateSettings()

Check whether the chain settings are valid and return a list of SettingsError and SettingsWarning to report any invalid settings. Default implementation simply returns problems from all child steps.

Return type

list[TaskError or TaskWarning]

getResources(param_type, resource_type)

Get the stepper resources in the settings for the chain as well as for every step in the chain that are instances of param_type and have a resource_type attribute that is resource_type.

Note: this does not work for list/set/tuple subparams in the settings.

Parameters
  • param_type (tasks._TaskResource) – the resource parameter type

  • resource_type (ResourceType) – the type of resource to get

Returns

the set of stepper resources of resource_type

Return type

set of tasks._TaskResource

__init__(*args, **kwargs)

See class docstring for info on the different constructor arguments.

__len__()
addStep(step)
report(prefix='')

Report the workflow steps and their settings (recursively).

Parameters

prefix (str) – the text to start each line with

validateChain()

Checks that the declaration of the chain is internally consistent - i.e. that each step is valid and each step’s Input class matches the preceding step’s Output class.
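
The type-matching part of this check can be illustrated as follows. This is a sketch under assumptions: the GENERIC object here is a local stand-in for the sentinel, the step classes and the function name are hypothetical, and pairs involving a generic step are skipped because no validation is done across generic steps.

```python
GENERIC = object()  # local stand-in for stepper's GENERIC sentinel


class IntToStrSketch:
    Input, Output = int, str


class StrToStrSketch:
    Input, Output = str, str


class AnySketch:
    Input = Output = GENERIC


def find_type_mismatches(steps):
    """Return messages for adjacent steps whose Output/Input types differ."""
    problems = []
    for previous, following in zip(steps, steps[1:]):
        if previous.Output is GENERIC or following.Input is GENERIC:
            continue  # generic steps accept or produce any type
        if previous.Output is not following.Input:
            problems.append(
                f"{type(previous).__name__} outputs "
                f"{previous.Output.__name__} but {type(following).__name__} "
                f"expects {following.Input.__name__}")
    return problems


print(find_type_mismatches([IntToStrSketch(), StrToStrSketch()]))
# []
print(find_type_mismatches([StrToStrSketch(), IntToStrSketch()]))
# ['StrToStrSketch outputs str but IntToStrSketch expects int']
```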

reduceFunction(inputs)
buildChain()

This method must be implemented by subclasses to build the chain. The chain is built by modifying self.steps. The chain’s composition may be dependent on self.settings.

class schrodinger.stepper.stepper.Chain(*args, **kwargs)

Bases: schrodinger.stepper.stepper._BatchableStepMixin, schrodinger.stepper.stepper.UnbatchedChain

<_chain_> Run a series of steps. The steps must be created by overriding buildChain.

getLicenseRequirements()
schrodinger.stepper.stepper.get_all_steps_and_chains(step: schrodinger.stepper.stepper._BaseStep) → Set[schrodinger.stepper.stepper._BaseStep]

Given a step, return a set of all steps it contains and itself. For example, given a chain A with the following topology:

    A
|-------|
B       C
    |-------|
    D       E

this method will return:

A -> set([A, B, C, D, E])
B -> set([B])
C -> set([C, D, E])
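
The traversal can be sketched recursively. The node class below is a hypothetical stand-in for steps and chains, and the sketch assumes a chain exposes its children through a steps attribute, as mentioned for buildChain; the real function may differ.

```python
class NodeSketch:
    """Hypothetical stand-in for a step; chains hold children in `steps`."""

    def __init__(self, name, steps=()):
        self.name = name
        self.steps = list(steps)


def collect_all(step):
    """Return the step itself plus every step nested inside it."""
    found = {step}
    for child in getattr(step, "steps", []):
        found |= collect_all(child)
    return found


# Rebuild the topology shown above: A contains B and C; C contains D and E.
b, d, e = NodeSketch("B"), NodeSketch("D"), NodeSketch("E")
c = NodeSketch("C", [d, e])
a = NodeSketch("A", [b, c])

print(sorted(node.name for node in collect_all(a)))  # ['A', 'B', 'C', 'D', 'E']
print(sorted(node.name for node in collect_all(b)))  # ['B']
print(sorted(node.name for node in collect_all(c)))  # ['C', 'D', 'E']
```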