schrodinger.stepper.stepper module¶
Framework for writing computational workflows and running them in a highly
distributed manner. Each step of the workflow is either a “mapping” operation
(see MapStep) or a “reducing” operation (see ReduceStep). These steps can then
be chained together using the Chain class.
For a more complete introduction, see the WordCount tutorial.
For documentation on specific stepper features, see the following feature list. You can ctrl+f the feature tag to jump to the relevant docstrings.
| Feature | Tag |
|---|---|
| MapStep | _map_step_ |
| ReduceStep | _reduce_step_ |
| Chain | _chain_ |
| Settings | _settings_ |
| Serialization | _serialization_ |
| File Handling | _file_handling_ |
| Generic Steps | _generics_ |
| Custom Workflow | _custom_workflows_ |
| Double Batching | _dbl_batching_ |
#===============================================================================
# Running stepper with custom, undistributed workflows <_custom_workflows_>
#===============================================================================
To run steps that aren’t defined in the core suite, the script should be executed inside the working directory and should import its steps from a local package in that directory.
Working dir contents:
    script.py
    my_lib/
        __init__.py
        steps.py
Minimal code in script.py if it needs to run under job control:
    from schrodinger.job import launchapi
    from schrodinger.ui.qt.appframework2 import application

    from my_lib.steps import MyStep


    def get_job_spec_from_args(argv):
        jsb = launchapi.JobSpecificationArgsBuilder(argv)
        jsb.setInputFile(__file__)
        # Register the local package as an input directory of the job.
        jsb.setInputDirectory('my_lib')
        return jsb.getJobSpec()


    def main():
        step = MyStep()
        step.getOutputs()


    if __name__ == '__main__':
        application.run_application(main)
#===============================================================================
# Generic Steps <_generics_>
#===============================================================================
Steps that can be run on any data type can be created by using the GENERIC
sentinel type as the input or output type of a step. The step will then
receive all inputs as serialized strings and should return all of its outputs as strings.
Example:
>>> class DeduplicateStep(ReduceStep):
...     Input = Output = GENERIC
...
...     def reduceFunction(self, inps):
...         yield from sorted(set(inps))
...
>>> class IdentityStep(MapStep):
...     Input = Output = int
...
...     def mapFunction(self, value):
...         yield value
...
>>> class DeduplicatedIdentityChain(Chain):
...     def buildChain(self):
...         self.addStep(IdentityStep())
...         self.addStep(DeduplicateStep())
...         self.addStep(IdentityStep())
...
>>> chain = DeduplicatedIdentityChain()
>>> chain.setInputs([1, 2, 3, 1, 2, 3])
>>> chain.getOutputs()
[1, 2, 3]
These steps can be chained together with any steps, regardless of the input or output types of the other steps.
NOTE:: No validation is done to guarantee that the input type of the step after a generic step is the same as the output type of the step preceding the generic step. The behavior in this circumstance is undefined and will likely result in an error.
#===============================================================================
# Double Batching <_dbl_batching_>
#===============================================================================
At the time of writing, jobs launch at a rate of about one every 3 or 4 seconds. This rate becomes insufficient once we need more than a few hundred workers. To get around this, stepper employs a pattern we call “double batching”, where we create subjobs whose sole purpose is to create the subjobs that actually run the steps.
NOTE:: We use double-batching for the PubSub implementation of stepper as well as the file-based implementation. The literal meaning of “double-batching” doesn’t apply as well to the PubSub implementation but the general pattern of launching subjobs to launch more subjobs still applies.
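To see why the extra layer of subjobs helps, here is a rough back-of-the-envelope model. It assumes (as a simplification, not as a description of stepper's actual scheduling) that every launcher starts jobs one at a time at the same fixed rate and that the second-level launchers work in parallel. The function names and the 3.5-second figure are illustrative only.

```python
import math

SECONDS_PER_LAUNCH = 3.5  # assumed midpoint of "one job every 3 or 4 seconds"


def single_batch_launch_time(num_workers):
    """Time for one process to launch every worker itself."""
    return num_workers * SECONDS_PER_LAUNCH


def double_batch_launch_time(num_workers, num_launchers):
    """Time when launcher subjobs each start a share of the workers.

    The parent launches the launchers one after another; the launchers then
    start their own workers in parallel with each other. The last launcher
    to start is the last one to finish launching.
    """
    workers_per_launcher = math.ceil(num_workers / num_launchers)
    return (num_launchers + workers_per_launcher) * SECONDS_PER_LAUNCH


print(single_batch_launch_time(1000))      # 3500.0 seconds
print(double_batch_launch_time(1000, 25))  # 227.5 seconds
```

Under these assumptions, launching 1000 workers drops from close to an hour to under four minutes.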
#===============================================================================
# Environment variables and global settings
#===============================================================================
Settings:
- SCHRODINGER_STEPPER_DEBUG
Set to 1 to have most files brought back from a workflow run. Set to 2 to have _all_ files brought back.
- SCHRODINGER_STEPPER_LOG_LEVEL
Set to an integer or name representing the log level to use for the logger. Valid log names are, in decreasing order of verbosity: NOTSET, DEBUG, INFO, WARN, ERROR, FATAL, and CRITICAL.
- SCHRODINGER_STEPPER_ERROR_THRESHOLD
Sets the batching error threshold. If not set, 0.9 will be used. Note that this value can still be overridden by a workflow’s configuration.
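As an illustration, these variables can be set from Python before a workflow is launched (exporting them in the shell works equally well). The values below are examples only. The loop uses the standard logging module to show how the level names rank against each other; how stepper itself parses the variable is not shown here.

```python
import logging
import os

# Example values: bring back most files, log at DEBUG, lower the threshold.
os.environ["SCHRODINGER_STEPPER_DEBUG"] = "1"
os.environ["SCHRODINGER_STEPPER_LOG_LEVEL"] = "DEBUG"
os.environ["SCHRODINGER_STEPPER_ERROR_THRESHOLD"] = "0.5"

# Numeric values of the level names in the standard logging module.
# A lower number means more verbose output.
for name in ("NOTSET", "DEBUG", "INFO", "WARN", "ERROR", "FATAL", "CRITICAL"):
    print(name, getattr(logging, name))
```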
- schrodinger.stepper.stepper.get_debug_level()¶
- class schrodinger.stepper.stepper.ElapsedFormatter¶
Bases:
object
A stepper logging formatter that includes how much time has elapsed since a start time in all messages.
FORMATTER.start() is safe to call multiple times; the start time will be set to the time of the earliest call.
Example usage:
    logger.debug("Message before debug.")
    # Message before debug.
    FORMATTER.start()
    time.sleep(1)
    logger.debug("My message")
    # <STEPPER> DEBUG +00:00:01: My message
- __init__()¶
- start()¶
- format(record)¶
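The behavior described for ElapsedFormatter can be approximated with the standard logging module. The class below is a hypothetical stand-in written for illustration, not the stepper implementation; the output layout is copied from the usage example above and everything else is an assumption.

```python
import logging
import time


class ElapsedFormatterSketch(logging.Formatter):
    """Prefix messages with the time elapsed since start() was first called."""

    def __init__(self):
        super().__init__()
        self._start_time = None

    def start(self):
        # Keep the earliest start time so that repeated calls are harmless.
        if self._start_time is None:
            self._start_time = time.time()

    def format(self, record):
        msg = record.getMessage()
        if self._start_time is None:
            return msg  # start() has not been called: leave the message as is
        elapsed = max(0, int(record.created - self._start_time))
        hours, rem = divmod(elapsed, 3600)
        minutes, seconds = divmod(rem, 60)
        return (f"<STEPPER> {record.levelname} "
                f"+{hours:02d}:{minutes:02d}:{seconds:02d}: {msg}")
```

An instance can be attached to any handler with `handler.setFormatter(...)`.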
- schrodinger.stepper.stepper.ichunked(iterable, n)¶
Reimplementation of more_itertools.ichunked that does not cache n items of iterable at a time.
Breaks iterable into sub-iterables with n elements each.
Note that unlike more_itertools.ichunked, an error will be raised if you try to iterate over a chunk before its previous chunk has been consumed.
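The general idea of chunking without caching can be sketched with itertools. This is a simplified illustration rather than the stepper implementation: in particular it does not raise the error described above, so reading chunks out of order would silently mix up their items.

```python
import itertools


def ichunked_sketch(iterable, n):
    """Lazily yield sub-iterables of up to n items each.

    Only one item is pulled ahead of time (to detect the end of the input);
    the rest of each chunk is read from the shared iterator on demand.
    """
    it = iter(iterable)
    while True:
        try:
            first = next(it)
        except StopIteration:
            return
        yield itertools.chain([first], itertools.islice(it, n - 1))


# Each chunk is fully consumed before the next one is requested.
print([list(chunk) for chunk in ichunked_sketch(range(7), 3)])
# [[0, 1, 2], [3, 4, 5], [6]]
```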
- class schrodinger.stepper.stepper.StepperFolder(value='', *args, **kwargs)¶
Bases:
TaskFolder
- class schrodinger.stepper.stepper.StepTaskInput(*args, _param_type=<object object>, **kwargs)¶
Bases:
CompoundParam
- dehydrated_step: _DehydratedStep¶
A parameter of the class.
- debug_mode: bool¶
A parameter of the class.
- misc_input_folders: List[TaskFolder]¶
A parameter of the class.
- debug_modeChanged¶
A pyqtSignal emitted by instances of the class.
- debug_modeReplaced¶
A pyqtSignal emitted by instances of the class.
- dehydrated_stepChanged¶
A pyqtSignal emitted by instances of the class.
- dehydrated_stepReplaced¶
A pyqtSignal emitted by instances of the class.
- misc_input_filenamesChanged¶
A pyqtSignal emitted by instances of the class.
- misc_input_filenamesReplaced¶
A pyqtSignal emitted by instances of the class.
- misc_input_foldersChanged¶
A pyqtSignal emitted by instances of the class.
- misc_input_foldersReplaced¶
A pyqtSignal emitted by instances of the class.
- class schrodinger.stepper.stepper.StepTaskOutput(*args, _param_type=<object object>, **kwargs)¶
Bases:
CompoundParam
- run_info: dict¶
A parameter of the class.
- misc_output_filenamesChanged¶
A pyqtSignal emitted by instances of the class.
- misc_output_filenamesReplaced¶
A pyqtSignal emitted by instances of the class.
- output_fileChanged¶
A pyqtSignal emitted by instances of the class.
- output_fileReplaced¶
A pyqtSignal emitted by instances of the class.
- run_infoChanged¶
A pyqtSignal emitted by instances of the class.
- run_infoReplaced¶
A pyqtSignal emitted by instances of the class.
- class schrodinger.stepper.stepper.StepTaskMixin(*args, step=None, **kwargs)¶
Bases:
CompoundParamMixin
This class must be mixed in with a subclass of AbstractComboTask. The resulting task class may be used to run any step as a task, provided the input, output, and settings classes are all JSONable.
- input: StepTaskInput¶
- output: StepTaskOutput¶
- DEFAULT_TASKDIR_SETTING = 1¶
- __init__(*args, step=None, **kwargs)¶
- addLicenseReservation(license, num_tokens=1)¶
- setStep(step)¶
- getStepClass()¶
- mainFunction()¶
- class schrodinger.stepper.stepper.StepSubprocessTask(*args, _param_type=<object object>, **kwargs)¶
Bases:
StepTaskMixin, ComboSubprocessTask
- calling_contextChanged¶
A pyqtSignal emitted by instances of the class.
- calling_contextReplaced¶
A pyqtSignal emitted by instances of the class.
- failure_infoChanged¶
A pyqtSignal emitted by instances of the class.
- failure_infoReplaced¶
A pyqtSignal emitted by instances of the class.
- input: StepTaskInput¶
A parameter of the class.
- inputChanged¶
A pyqtSignal emitted by instances of the class.
- inputReplaced¶
A pyqtSignal emitted by instances of the class.
- max_progressChanged¶
A pyqtSignal emitted by instances of the class.
- max_progressReplaced¶
A pyqtSignal emitted by instances of the class.
- nameChanged¶
A pyqtSignal emitted by instances of the class.
- nameReplaced¶
A pyqtSignal emitted by instances of the class.
- output: StepTaskOutput¶
A parameter of the class.
- outputChanged¶
A pyqtSignal emitted by instances of the class.
- outputReplaced¶
A pyqtSignal emitted by instances of the class.
- progressChanged¶
A pyqtSignal emitted by instances of the class.
- progressReplaced¶
A pyqtSignal emitted by instances of the class.
- progress_stringChanged¶
A pyqtSignal emitted by instances of the class.
- progress_stringReplaced¶
A pyqtSignal emitted by instances of the class.
- statusChanged¶
A pyqtSignal emitted by instances of the class.
- statusReplaced¶
A pyqtSignal emitted by instances of the class.
- class schrodinger.stepper.stepper.StepJobTask(*args, _param_type=<object object>, **kwargs)¶
Bases:
StepTaskMixin, ComboJobTask
- input: StepTaskInput¶
A parameter of the class.
- output: StepTaskOutput¶
A parameter of the class.
- isCanceled() bool¶
Whether the job underlying the task is canceled.
If there is no job associated with the task, then this returns False.
- mainFunction()¶
- calling_contextChanged¶
A pyqtSignal emitted by instances of the class.
- calling_contextReplaced¶
A pyqtSignal emitted by instances of the class.
- failure_infoChanged¶
A pyqtSignal emitted by instances of the class.
- failure_infoReplaced¶
A pyqtSignal emitted by instances of the class.
- inputChanged¶
A pyqtSignal emitted by instances of the class.
- inputReplaced¶
A pyqtSignal emitted by instances of the class.
- job_configChanged¶
A pyqtSignal emitted by instances of the class.
- job_configReplaced¶
A pyqtSignal emitted by instances of the class.
- max_progressChanged¶
A pyqtSignal emitted by instances of the class.
- max_progressReplaced¶
A pyqtSignal emitted by instances of the class.
- nameChanged¶
A pyqtSignal emitted by instances of the class.
- nameReplaced¶
A pyqtSignal emitted by instances of the class.
- outputChanged¶
A pyqtSignal emitted by instances of the class.
- outputReplaced¶
A pyqtSignal emitted by instances of the class.
- progressChanged¶
A pyqtSignal emitted by instances of the class.
- progressReplaced¶
A pyqtSignal emitted by instances of the class.
- progress_stringChanged¶
A pyqtSignal emitted by instances of the class.
- progress_stringReplaced¶
A pyqtSignal emitted by instances of the class.
- statusChanged¶
A pyqtSignal emitted by instances of the class.
- statusReplaced¶
A pyqtSignal emitted by instances of the class.
- class schrodinger.stepper.stepper.Topic(*args, _param_type=<object object>, **kwargs)¶
Bases:
CompoundParam
An abstraction over a container of messages in a Publish/Subscribe model.
- Variables:
name – the name of the topic
num_uploaded_msgs – the number of messages that were uploaded to this topic; it is not reflective of the number of messages currently in the topic.
- name: str¶
A parameter of the class.
- num_uploaded_msgs: int¶
A parameter of the class.
- nameChanged¶
A pyqtSignal emitted by instances of the class.
- nameReplaced¶
A pyqtSignal emitted by instances of the class.
- num_uploaded_msgsChanged¶
A pyqtSignal emitted by instances of the class.
- num_uploaded_msgsReplaced¶
A pyqtSignal emitted by instances of the class.
- class schrodinger.stepper.stepper.BatchSettings(*args, _param_type=<object object>, **kwargs)¶
Bases:
CompoundParam
- size: int¶
A parameter of the class.
- task_class: type¶
A parameter of the class.
- hostname: str¶
A parameter of the class.
- use_pubsub: bool¶
A parameter of the class.
- num_pubsub_workers: int¶
A parameter of the class.
- error_threshold: float¶
A parameter of the class.
- initializeValue()¶
Override to dynamically set up the default value of the param. Useful for default values that are determined at runtime. This is called any time the param is reset.
- error_thresholdChanged¶
A pyqtSignal emitted by instances of the class.
- error_thresholdReplaced¶
A pyqtSignal emitted by instances of the class.
- hostnameChanged¶
A pyqtSignal emitted by instances of the class.
- hostnameReplaced¶
A pyqtSignal emitted by instances of the class.
- num_pubsub_workersChanged¶
A pyqtSignal emitted by instances of the class.
- num_pubsub_workersReplaced¶
A pyqtSignal emitted by instances of the class.
- sizeChanged¶
A pyqtSignal emitted by instances of the class.
- sizeReplaced¶
A pyqtSignal emitted by instances of the class.
- task_classChanged¶
A pyqtSignal emitted by instances of the class.
- task_classReplaced¶
A pyqtSignal emitted by instances of the class.
- use_pubsubChanged¶
A pyqtSignal emitted by instances of the class.
- use_pubsubReplaced¶
A pyqtSignal emitted by instances of the class.
- class schrodinger.stepper.stepper.Serializer¶
Bases:
object
<_serialization_> A class for defining special serialization for some datatype. Serialization by default uses the json protocol, but if a specialized protocol is wanted instead, users can subclass this class to do so.
Subclasses should:
- Define DataType. This is the class that this serializer can encode/decode.
- Define toString(self, output), which defines how to serialize an output.
- Define fromString(self, input_str), which defines how to deserialize an input.
This can then be used as the InputSerializer or OutputSerializer for any step.
Here’s an example for defining an int that’s serialized in base-two as opposed to base-ten:
    class IntBaseTwoSerializer(Serializer):
        DataType = int

        def toString(self, output):
            return bin(output)  # 7 -> '0b111'

        def fromString(self, input_str):
            return int(input_str[2:], 2)  # '0b111' -> 7
This can then be used anywhere you’d use an int as the output or input in a step. For example:
    class SquaringStep(MapStep):
        Input = int
        InputSerializer = IntBaseTwoSerializer
        Output = int
        OutputSerializer = IntBaseTwoSerializer

        def mapFunction(self, inp):
            yield inp**2
Now, any time that a SquaringStep would read its inputs from a file or write its outputs to a file, it’ll do so using a base-two representation.
- DataType = NotImplemented¶
- serialize(items, fname)¶
Write items to a file named fname.
- deserialize(fname)¶
Read in items from fname.
- Parameters:
fname (str)
- Return type:
iterable[self.DataType]
- fromString(input_str)¶
- toString(output)¶
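To make the relationship between toString/fromString and serialize/deserialize concrete, here is a self-contained round trip. SerializerSketch is a hypothetical stand-in for the real base class, and the one-item-per-line file layout is an assumption made for this illustration rather than a documented format.

```python
import os
import tempfile


class SerializerSketch:
    """Stand-in for Serializer; assumes one serialized item per line."""
    DataType = NotImplemented

    def toString(self, output):
        raise NotImplementedError

    def fromString(self, input_str):
        raise NotImplementedError

    def serialize(self, items, fname):
        with open(fname, "w") as outfile:
            for item in items:
                outfile.write(self.toString(item) + "\n")

    def deserialize(self, fname):
        with open(fname) as infile:
            for line in infile:
                yield self.fromString(line.rstrip("\n"))


class IntBaseTwoSerializerSketch(SerializerSketch):
    DataType = int

    def toString(self, output):
        return bin(output)

    def fromString(self, input_str):
        return int(input_str, 2)  # int() accepts the '0b' prefix for base 2


def round_trip(values):
    """Write values to a temporary file and read them back."""
    serializer = IntBaseTwoSerializerSketch()
    with tempfile.TemporaryDirectory() as tmpdir:
        fname = os.path.join(tmpdir, "values.txt")
        serializer.serialize(values, fname)
        return list(serializer.deserialize(fname))


print(round_trip([7, 8, 9]))  # [7, 8, 9]
```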
- exception schrodinger.stepper.stepper.ValidationIssue(source_step, msg)¶
Bases:
RuntimeError
- __init__(source_step, msg)¶
- exception schrodinger.stepper.stepper.SettingsError(source_step, msg)¶
Bases:
ValidationIssue
Used in conjunction with _BaseStep.validateSettings to report an error with settings. Constructed with the step with the invalid settings and an error message, e.g. SettingsError(bad_step, "Step does not have required settings.")
- exception schrodinger.stepper.stepper.SettingsWarning(source_step, msg)¶
Bases:
ValidationIssue
Used in conjunction with _BaseStep.validateSettings to report a warning with settings. Constructed with the step with the invalid settings and an error message, e.g. SettingsWarning(bad_step, "Step setting FOO should ideally be positive")
- exception schrodinger.stepper.stepper.ResourceError(source_step, msg)¶
Bases:
ValidationIssue
Used in conjunction with _BaseStep.validateSettings to report an error with a resource setting. Constructed with the step with the invalid setting and an error message, e.g., ResourceError(bad_step, "Step setting 'file' has not been set.")
- exception schrodinger.stepper.stepper.LocalResourceError(source_step, msg)¶
Bases:
ResourceError
A ResourceError specifically for local StepperFile and StepperFolder validations, i.e., resources that are on a job submission host and may have to be transferred to compute resources.
- exception schrodinger.stepper.stepper.StaticResourceError(source_step, msg)¶
Bases:
ResourceError
A ResourceError specifically for static StepperFile and StepperFolder validations, i.e., resources that are not necessarily available on a job submission host.
- class schrodinger.stepper.stepper.PubsubEnabledStepMixin(*args, **kwargs)¶
Bases:
object
A mixin that allows a step to be run using PubSub.
Steps with this mixin will have batch settings that have a use_pubsub flag and a num_pubsub_workers integer. Turning use_pubsub on will have the step load all its inputs into a pubsub topic before spinning up num_pubsub_workers subjobs. Each subjob takes inputs from the input topic, runs the step’s computation on them, and uploads the results to an output topic.
Calling my_pubsub_step.getOutputs() will return all the outputs from the output topic, so to a user this will all be implementation detail.
- __init__(*args, **kwargs)¶
- property topic_prefix¶
- property topic_suffix¶
- outputs(*args, **kwargs)¶
- usingPubsub()¶
- initializeTopics()¶
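The flow described for PubsubEnabledStepMixin (load every input into a topic, start a fixed number of workers that pull from it, collect results from an output topic) can be imitated in a single process. In this sketch, in-memory queues stand in for pubsub topics and threads stand in for subjobs; none of the names below come from stepper.

```python
import queue
import threading


def run_with_workers(inputs, compute, num_workers):
    """Process inputs with worker threads pulling from a shared input queue."""
    input_topic = queue.Queue()
    output_topic = queue.Queue()
    # Load every input before any worker starts, as described above.
    for item in inputs:
        input_topic.put(item)

    def worker():
        while True:
            try:
                item = input_topic.get_nowait()
            except queue.Empty:
                return  # input topic drained, so this worker exits
            for output in compute(item):
                output_topic.put(output)

    threads = [threading.Thread(target=worker) for _ in range(num_workers)]
    for thread in threads:
        thread.start()
    for thread in threads:
        thread.join()

    outputs = []
    while not output_topic.empty():
        outputs.append(output_topic.get())
    return outputs


def square(value):
    yield value ** 2


# Output order depends on thread scheduling, so sort before printing.
print(sorted(run_with_workers(range(5), square, num_workers=3)))
# [0, 1, 4, 9, 16]
```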
- class schrodinger.stepper.stepper.UnbatchedReduceStep(settings=None, config=None, step_id=None, metrics_logger_depth=None, _run_info=None, **kwargs)¶
Bases:
_BaseStep
An unbatchable ReduceStep. See ReduceStep for more information.
- reduceFunction(inputs)¶
- class schrodinger.stepper.stepper.ReduceStep(*args, **kwargs)¶
Bases:
_BatchableStepMixin, UnbatchedReduceStep
<_reduce_step_> A computational step that performs a function on a collection of inputs to produce output items.
To construct a ReduceStep:
Implement reduceFunction
Define Input (the type expected by the reduceFunction)
Define Output (the type of item produced by the reduceFunction)
Define Settings (data class for any settings needed by the reduceFunction)
- reduceFunction(inputs)¶
The main computation for this step. This function should take in an iterable of inputs and return an iterable of outputs.
Example:
    def reduceFunction(self, words):
        # Find all unique words
        seen_words = set()
        for word in words:
            if word not in seen_words:
                seen_words.add(word)
                yield word
- class schrodinger.stepper.stepper.UnbatchedMapStep(settings=None, config=None, step_id=None, metrics_logger_depth=None, _run_info=None, **kwargs)¶
Bases:
UnbatchedReduceStep
<_unbatchability_> An unbatchable MapStep. See MapStep for more information.
- reduceFunction(inputs)¶
- mapFunction(input)¶
- class schrodinger.stepper.stepper.MapStep(*args, **kwargs)¶
Bases:
_BatchableStepMixin, UnbatchedMapStep
<_map_step_> A computational step that performs a function on input items from an input source to produce output items.
To construct a MapStep:
Implement mapFunction
Define Input (the type expected by the mapFunction)
Optionally define an InputSerializer (see Serializer for more info.)
Define Output (the type of item produced by the mapFunction)
Optionally define an OutputSerializer (see Serializer for more info.)
Define Settings (data class for any settings needed by the mapFunction)
- mapFunction(input)¶
The main computation for this step. This function should take in a single input item and return an iterable of outputs. This allows a single input to produce multiple outputs (e.g. enumeration).
The output may be yielded as a generator, in order to reduce memory usage.
If only a single output is produced for each input, return it as a single-element list.
- Parameters:
input – this will be a single input item from the input source. Implementer is encouraged to use a more descriptive, context-specific variable name. Example:
    def mapFunction(self, starting_smiles):
        …
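The two return styles described for mapFunction (yielding several outputs from a generator, or returning a single-element list) can be shown with plain functions. These are illustrative stand-ins that run without stepper; apply_map is a hypothetical helper that mimics how the per-input iterables are flattened into one output stream.

```python
def enumerate_prefixes(word):
    """One input, several outputs: yield every non-empty prefix."""
    for end in range(1, len(word) + 1):
        yield word[:end]


def uppercase(word):
    """One input, one output: return a single-element list."""
    return [word.upper()]


def apply_map(map_function, inputs):
    """Flatten the per-input iterables into one stream of outputs."""
    for inp in inputs:
        yield from map_function(inp)


print(list(apply_map(enumerate_prefixes, ["ab", "c"])))  # ['a', 'ab', 'c']
print(list(apply_map(uppercase, ["ab", "c"])))           # ['AB', 'C']
```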
- class schrodinger.stepper.stepper.UnbatchedChain(*args, **kwargs)¶
Bases:
UnbatchedReduceStep
- property Input¶
- property Output¶
- property InputSerializer¶
The default serializer that simply uses json.loads and json.dumps.
- property OutputSerializer¶
The default serializer that simply uses json.loads and json.dumps.
- setInputs(inputs: Iterable[Any], starting_step_id: str = None)¶
Set the inputs for the chain. If starting_step_id is specified, then all steps before the specified starting step will be skipped. This is useful for resuming a chain’s computation.
- setInputFile(input_file: str, starting_step_id: str = None)¶
Set the input file for the chain. If starting_step_id is specified, then all steps before the specified starting step will be skipped. This is useful for resuming a chain’s computation.
- setStartingStep(starting_step: str)¶
- validateSettings()¶
Check whether the chain settings are valid and return a list of SettingsError and SettingsWarning to report any invalid settings. Default implementation simply returns problems from all child steps.
- Return type:
list[TaskError or TaskWarning]
- getResources(param_type, resource_type)¶
Get the stepper resources in the settings for the chain as well as for every step in the chain that are instances of param_type and have a resource_type attribute that is resource_type.
Note: this does not work for list/set/tuple subparams in the settings.
- Parameters:
param_type (tasks._TaskResource) – the resource parameter type
resource_type (ResourceType) – the type of resource to get
- Returns:
the set of stepper resources of resource_type
- Return type:
set of tasks._TaskResource
- __init__(*args, **kwargs)¶
See class docstring for info on the different constructor arguments.
- __len__()¶
- addStep(step)¶
- report(prefix='')¶
Report the workflow steps and their settings (recursively).
- Parameters:
prefix (str) – the text to start each line with
- validateChain()¶
Checks that the declaration of the chain is internally consistent - i.e. that each step is valid and each step’s Input class matches the preceding step’s Output class.
- reduceFunction(inputs)¶
- buildChain()¶
This method must be implemented by subclasses to build the chain. The chain is built by modifying self.steps. The chain’s composition may be dependent on self.settings.
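The consistency check described for validateChain (each step’s Input class matches the preceding step’s Output class) can be sketched as follows. StepSketch and find_type_mismatches are hypothetical names used only for this illustration, and the sketch makes no attempt to handle GENERIC steps or nested chains.

```python
class StepSketch:
    """Stand-in for a step that only declares its Input and Output types."""

    def __init__(self, name, input_type, output_type):
        self.name = name
        self.Input = input_type
        self.Output = output_type


def find_type_mismatches(steps):
    """Return a message for each step whose Input differs from the previous Output."""
    problems = []
    for prev, curr in zip(steps, steps[1:]):
        if curr.Input is not prev.Output:
            problems.append(
                f"{curr.name} expects {curr.Input.__name__} but "
                f"{prev.name} produces {prev.Output.__name__}")
    return problems


good = [StepSketch("Parse", str, int), StepSketch("Square", int, int)]
bad = [StepSketch("Parse", str, int), StepSketch("Upper", str, str)]
print(find_type_mismatches(good))  # []
print(find_type_mismatches(bad))   # ['Upper expects str but Parse produces int']
```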
- class schrodinger.stepper.stepper.Chain(*args, **kwargs)¶
Bases:
_BatchableStepMixin, UnbatchedChain
<_chain_> Run a series of steps. The steps must be created by overriding buildChain.
- getLicenseRequirements()¶
- schrodinger.stepper.stepper.get_all_steps_and_chains(step: _BaseStep) Set[_BaseStep]¶
Given a step, return a set of all steps it contains and itself. For example, given a chain A with the following topology:
        A
    |-------|
    B       C
        |-------|
        D       E
this method will return:
    A -> set([A, B, C, D, E])
    B -> set([B])
    C -> set([C, D, E])
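The traversal can be sketched as a short recursion over the example topology. NodeSketch is a hypothetical stand-in; the only assumption carried over from the text above is that a chain keeps its child steps in a `steps` attribute.

```python
class NodeSketch:
    """Stand-in for a step; chains hold their child steps in `steps`."""

    def __init__(self, name, steps=()):
        self.name = name
        self.steps = list(steps)


def collect_all(step):
    """Return the step itself plus every step nested beneath it."""
    found = {step}
    for child in step.steps:
        found |= collect_all(child)
    return found


# Rebuild the topology from the example: A contains B and C; C contains D and E.
d, e, b = NodeSketch("D"), NodeSketch("E"), NodeSketch("B")
c = NodeSketch("C", [d, e])
a = NodeSketch("A", [b, c])

print(sorted(node.name for node in collect_all(a)))  # ['A', 'B', 'C', 'D', 'E']
print(sorted(node.name for node in collect_all(c)))  # ['C', 'D', 'E']
```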