schrodinger.pipeline.pipeline module
Classes for running pipelines.
The main class is called Pipeline. This class parses the input file, creates appropriate stages, and runs them in their own subdirectories.
The StageJob class represents a pipeline job linked to a specific stage.
The IO (input/output object) classes (defined in pipeio.py) represent information that is passed from one stage to another, such as a list of files. They are also called Variables.
Input Object Syntax
The Pipeline input file is used to specify which stages to run, how to run them (parameters), what to use for input, and where to send the output. An example input file looks like:
SET MY_INPUT
VARCLASS Structures
FILE /home/adzhigir/vsw_testing/20confs.mae
The SET line value (MY_INPUT) specifies the name of the IO object.
The VARCLASS value (Structures) specifies the PipeIO class to create.
Pipeline uses VARCLASS to determine which variable to create. Pipeline will search the schrodinger.pipeline.pipeio module for the class name specified on this line. If the class is not found there, Pipeline assumes that a custom class is specified as an absolute path. (In this case, make sure the custom module is in your PYTHONPATH.)
All lines following VARCLASS define what information to put into this variable; in this case it is a Maestro file (20confs.mae).
Stage Syntax
An example stage file looks like:
STAGE MY_STAGE
STAGECLASS macromodel.ConfSearchStage
INPUT MY_INPUT
OUTPUT MY_OUTPUT
FFLD MMFFS
The STAGE line value (MY_STAGE) specifies the name of the stage. The STAGECLASS keyword specifies the <module>.<class> that defines the stage. Pipeline uses STAGECLASS to determine which stage to create. Pipeline will search the schrodinger.pipeline.stages namespace as well. Please make sure the module is in your PYTHONPATH.
See schrodinger.pipeline.stages.combine for an example of how to write a stage module.
Input variables for the stage are specified via INPUT keywords, and outputs via OUTPUT keywords. The rest of the keywords tell the stage how to run.
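The block structure described above (a SET or STAGE header line followed by keyword lines) could be read roughly as follows. This is a minimal sketch, not the parser that Pipeline itself uses: parse_blocks is a hypothetical helper, and it assumes one keyword and value per line, separated by whitespace. Other top-level keywords of the real format are not handled.

```python
def parse_blocks(text):
    """Group input-file lines into blocks that start at SET or STAGE."""
    blocks = []
    current = None
    for raw_line in text.splitlines():
        line = raw_line.strip()
        if not line:
            continue  # skip blank lines between blocks
        parts = line.split(None, 1)
        keyword = parts[0]
        value = parts[1].strip() if len(parts) > 1 else ""
        if keyword in ("SET", "STAGE"):
            # A header line opens a new variable or stage block.
            current = {"kind": keyword, "name": value, "keywords": []}
            blocks.append(current)
        elif current is None:
            raise RuntimeError(
                "Keyword %r appears before any SET or STAGE line" % keyword
            )
        else:
            # Every other line is stored as a (keyword, value) tuple.
            current["keywords"].append((keyword, value))
    return blocks
```

Each block keeps its keywords as a list of (keyword, value) tuples, which is the shape that createStage() is documented to accept for its keywords argument.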
If you wish to run the Pipeline without using the pipeline startup machinery:
p = pipeline.Pipeline([options])
p.readFile(<input file>)
try:
    p.run()
except RuntimeError:
    ...
If restartability is important, specify the restart_file when constructing the Pipeline object.
To restart Pipeline, do:
p = pipeline.Restart(restart_file [, new options])
try:
    p.run()
except RuntimeError:
    ...
where restart_file is the same file that you specified to the Pipeline constructor when the initial instance was created.
Copyright Schrodinger, LLC. All rights reserved.
- schrodinger.pipeline.pipeline.log(text)¶
Print the specified text to the log pipe, followed by a newline.
- schrodinger.pipeline.pipeline.logn(text)¶
Print the specified text to the log pipe with no newline. This is especially useful when printing progress periods.
- schrodinger.pipeline.pipeline.add_host_lists(list1, list2)¶
Append hosts in list2 to list1.
- Example:
list1 = a:5,b:10
list2 = a:2,c:12
output = a:7,b:10,c:12
The order of hosts is retained (first list is given priority).
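The merge rule in the example above can be sketched in a few lines. This is an illustration only, not the module's implementation: add_host_lists_sketch is a hypothetical name, and representing each host list as a list of (hostname, cpu_count) tuples is an assumption.

```python
def add_host_lists_sketch(list1, list2):
    """Sum CPU counts per host, keeping first-seen host order."""
    totals = {}
    for host, cpus in list(list1) + list(list2):
        # dicts preserve insertion order, so hosts from list1 come first
        totals[host] = totals.get(host, 0) + cpus
    return list(totals.items())
```

Because list1 is walked first, its hosts keep their positions and any host that appears only in list2 is appended at the end.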
- schrodinger.pipeline.pipeline.subtract_host_lists(list1, dict2)¶
Return available (not used) hosts. This function subtracts the host dict dict2 from the host dict list1.
- Parameters
list1 (dict) – All available hosts (specified by user), with hostname as key and cpu count as value.
dict2 (dict) – All used hosts (used by stages)
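A rough sketch of the subtraction, assuming both arguments are dicts that map hostname to CPU count as documented above. subtract_host_lists_sketch is a hypothetical name, and leaving fully used hosts out of the result is an assumption rather than documented behavior.

```python
def subtract_host_lists_sketch(available, used):
    """Return the CPUs still free on each host."""
    remaining = {}
    for host, cpus in available.items():
        free = cpus - used.get(host, 0)
        if free > 0:  # assumption: hosts with no free CPUs are dropped
            remaining[host] = free
    return remaining
```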
- schrodinger.pipeline.pipeline.importName(modulename, name)¶
Import a named object from a module in the context of this function.
For example, if you would like to create an instance of the Foo class from the bar.py module:
foo_class = importName("bar", "Foo")
foo_instance = foo_class()
- Raises
ImportError – Raised when the object can not be imported.
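For readers curious how such a helper can work, the following sketch uses the standard-library importlib. It is not the source of importName; import_name_sketch is a hypothetical stand-in that mirrors the documented behavior of raising ImportError when the object cannot be imported.

```python
import importlib


def import_name_sketch(modulename, name):
    """Return the object called `name` from the module `modulename`."""
    # import_module raises ImportError itself if the module is missing
    module = importlib.import_module(modulename)
    try:
        return getattr(module, name)
    except AttributeError as err:
        # Report a missing attribute as ImportError as well
        raise ImportError(
            "cannot import %r from %r" % (name, modulename)
        ) from err
```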
- class schrodinger.pipeline.pipeline.StageJob(stageobj, pipeline, module)¶
Bases: object
A “Job” that is used by Pipeline to run a Stage.
Each StageJob has a Stage object associated with it. This object is periodically dumped to a file in order to support restartability. The process is called “dumping” and the file is the dump file. When Pipeline is restarted, each stage object is recovered from the associated dump file to get the latest state.
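The dump and recover cycle described above might look like the sketch below. The serialization format is not stated in this documentation, so the use of pickle here is an assumption, and dump_state and load_state are hypothetical helpers. Writing to a temporary file first means an interrupted dump does not leave a truncated dump file behind.

```python
import os
import pickle
import tempfile


def dump_state(obj, dump_path):
    """Write obj to dump_path, replacing any previous dump atomically."""
    directory = os.path.dirname(os.path.abspath(dump_path))
    fd, tmp_path = tempfile.mkstemp(dir=directory, suffix=".tmp")
    try:
        with os.fdopen(fd, "wb") as fh:
            pickle.dump(obj, fh)
        os.replace(tmp_path, dump_path)  # atomic rename over the old dump
    except Exception:
        if os.path.exists(tmp_path):
            os.remove(tmp_path)
        raise


def load_state(dump_path):
    """Recover the most recently dumped object."""
    with open(dump_path, "rb") as fh:
        return pickle.load(fh)
```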
- __init__(stageobj, pipeline, module)¶
- Parameters
pipeline – Reference to the Pipeline object.
stageobj (Stage) – Stage to run.
module – The module where the above stage is defined.
- setUsedHosts(new_host_list)¶
- getUnusedHostList()¶
- updateStageCpus()¶
Based on current host usage and number of needed cpus, determine which hosts this stage should use and send them to it in a message.
- sendHostsToUse()¶
Send a message to the stage job telling it how many CPUs to use. Gets called periodically in case messages don’t go through.
- restart(action)¶
Mark this job to be restarted by the Pipeline.
- finish()¶
Sets the pipe’s stage object to the final finished stage object from the dump file, and parses all of the outputs.
- readyToRun(objects)¶
Return True if this StageJob has all inputs that are required for it to start.
- printAction(action)¶
Call the Pipeline’s printAction method.
- died(action)¶
Mark this stage as failed. The “action” gets saved in the “died_action” attribute, and will be printed out at the end of the workflow.
This gets called every time a StageJob dies by raising a RuntimeError exception.
- printFailureMessage()¶
Print the failure status of the stage and the path to the log file.
- updateFromDump(quiet=False)¶
Update this stage of the pipeline to the latest state from the dump file.
- class schrodinger.pipeline.pipeline.Pipeline(jobname='pipeline', prog=None, logfh=None, restart_file=None)¶
Bases: object
A controller responsible for running the stages in the correct order.
Pipeline parses the input file, creates instances of all IO objects, stage objects, and stage job objects, submits the stages in the appropriate directories, and waits for them to finish. Once a stage finishes, it starts any stages that depend on its output. When all stages are complete, it presents the user with the USER OUTPUT objects - IO output objects that are to be returned by the pipeline.
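The dependency-driven ordering described above can be illustrated with a small, sequential sketch. It is a simplification under stated assumptions: the real Pipeline submits stages as jobs and waits for them, whereas run_order (a hypothetical function) only computes an order in which stages become startable. Stages are modeled as a dict that maps a stage name to an (inputs, outputs) pair of variable names.

```python
def run_order(stages, initial_inputs):
    """Return stage names in an order that respects input dependencies."""
    available = set(initial_inputs)
    waiting = dict(stages)
    order = []
    while waiting:
        # A stage is ready once all of its inputs have been produced.
        ready = [
            name for name, (inputs, _) in waiting.items()
            if set(inputs) <= available
        ]
        if not ready:
            raise RuntimeError(
                "Stages can never start: " + ", ".join(sorted(waiting))
            )
        for name in ready:
            _, outputs = waiting.pop(name)
            order.append(name)
            available.update(outputs)  # outputs unlock dependent stages
    return order
```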
- __init__(jobname='pipeline', prog=None, logfh=None, restart_file=None)¶
- setOptions(subjob_hosts=None, njobs=None, adjust=None, force=None, cleanup=None, max_retries=None)¶
Set the options of the pipeline.
Call this function before calling pipeline.run() to set hosts/njobs/etc. When restarting, call this function to modify the options.
- readNewFormatVariable(varname, keywords)¶
- readNewFormatStage(stagename, keywords)¶
- readNewFormat(command_file)¶
- readFile(command_file)¶
Read a Pipeline input file.
- Raises
RuntimeError – Raised if there is a problem with the input file.
- checkUserOutputs()¶
Make sure that all specified user outputs are variable names that are returned by a stage. This check catches typos in the input file.
- Raises
RuntimeError – Raised on invalid USEROUT name.
- createStage(stagename, inIOnames, outIOnames, stage_class, keywords)¶
Create a stage object and add it to the pipeline.
- Parameters
stagename – Name of the stage.
inIOnames – Input pipeio object names.
outIOnames – Output pipeio object names.
stage_class – module.class defining the stage.
keywords (list) – All keywords for the stage, a list of (keyword, value) tuples.
- Raises
RuntimeError or ImportError – Raised on input file error.
- reportParameters()¶
Print the parameters of each stage.
- inputsForStagesDefined()¶
Check if the inputs for all stages are either specified in the input variables or are outputs from other stages.
- Raises
RuntimeError – Raised if inputs are not defined, as this indicates the input file is invalid.
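A check of this kind reduces to set membership. The sketch below is illustrative only: check_inputs_defined is a hypothetical function, and the stage model (a dict that maps a stage name to an (inputs, outputs) pair of variable names) is an assumption made for the example.

```python
def check_inputs_defined(stages, input_variables):
    """Raise RuntimeError if any stage input has no source."""
    defined = set(input_variables)
    for stage_name, (inputs, _) in stages.items():
        # Variables produced by every stage other than this one
        from_others = {
            out
            for other, (_, outputs) in stages.items()
            if other != stage_name
            for out in outputs
        }
        missing = [
            name for name in inputs
            if name not in defined and name not in from_others
        ]
        if missing:
            raise RuntimeError(
                "Stage %s has undefined inputs: %s"
                % (stage_name, ", ".join(missing))
            )
```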
- startReadyStages()¶
Start all stages that are ready to be run.
When restarting, start WAITING and RESTARTING stages. When NOT restarting, start only WAITING stages. Return the number of stages that were started (not currently used).
- setStageOptions(stageobj)¶
Propagate the pipeline options (hosts, ncpus, etc) to the specified stage.
- requestCpus()¶
- updateStagesCpus()¶
Send messages to stages (if necessary) telling them how many processors to use from each host.
- dump()¶
Dump the Pipeline instance to a restart file.
- getStageByJobid(jobid)¶
- handleStageMessages()¶
- distributeCpus()¶
Called when extra CPUs become available (as given back by the stages using them). Will distribute the freed-up CPUs to other stages.
- run(idle_function=None)¶
Run the Pipeline.
- Parameters
idle_function – A routine to call periodically while running the pipeline.
- Raises
RuntimeError – Raised if Pipeline failed for any reason.
- getUserOutputs()¶
Return a list of pipeio objects that are to be presented to the user at the end of the Pipeline run.
- getStructureOutput()¶
- getUserOutputFiles()¶
Return a list of files for all user (final) outputs of Pipeline.
- printAction(stagename, stagejobid, action)¶
Print an action for stage stagename.
- Parameters
stagejobid – The jobid of the stage.
action – The latest action of the stage.
- cleanupIntermediateFiles(stagejob)¶
Remove any stage outputs that are no longer needed. Intermediate files are any outputs of a previously completed stage. They are no longer needed if they are not marked as a USEROUT, and they are not needed as input for any yet-to-be-run stage.
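The rule above can be written as a small filter. This sketch only decides which outputs are removable and deletes nothing; removable_outputs and its arguments are hypothetical names introduced for illustration.

```python
def removable_outputs(finished_outputs, user_outputs, pending_stage_inputs):
    """Return outputs of completed stages that nothing still needs.

    finished_outputs: variable names produced by completed stages
    user_outputs: variable names marked as USEROUT
    pending_stage_inputs: one list of input names per yet-to-be-run stage
    """
    still_needed = set(user_outputs)
    for inputs in pending_stage_inputs:
        still_needed.update(inputs)
    return [name for name in finished_outputs if name not in still_needed]
```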
- getUsedHosts(host_pool)¶
Return a dictionary of hosts that are CURRENTLY checked out (used) by all stages combined (within specified host_pool).
- schrodinger.pipeline.pipeline.Restart(restart_file, restartbeg=False)¶
Recover a saved Pipeline instance.
Specify new options only if the settings need to change.
Returns a Pipeline instance recovered from the restart_file. You need to call pipeline.run() in order to get the pipeline running.
- Raises
RuntimeError – Raised if a Pipeline can’t be loaded from the specified file.
- Parameters
restartbeg – Whether to start failed stages from beginning.