"""
Classes for running pipelines.
The main class is called Pipeline. This class parses the input file, creates
appropriate stages, and runs them in their own subdirectories.
The StageJob class represents a pipeline job linked to a specific stage.
The IO (input/output object) classes (defined in pipeio.py) represent information
that is passed from one stage to another, such as a list of files. They are also
called Variables.
Input Object Syntax
===================
The Pipeline input file is used to specify which stages to run,
how to run them (parameters), what to use for input, and where
to send the output. An example input file looks like::
SET MY_INPUT
VARCLASS Structures
FILE /home/adzhigir/vsw_testing/20confs.mae
The `SET` line value (`MY_INPUT`) specifies the name of the IO object.
The `VARCLASS` value (`Structures`) specifies the PipeIO class to create.
Pipeline uses VARCLASS to determine which variable class to create. Pipeline
will search the schrodinger.pipeline.pipeio module for the class name specified
on this line. If it is not found there, a custom class is assumed to be
specified as an absolute path. (In this case, make sure the custom module is
in your `PYTHONPATH`.)
All lines following `VARCLASS` are used to define what information to put into
this variable; in this case, a Maestro file (`20confs.mae`).
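Programmatically, resolving a built-in variable class amounts to roughly the
following (a sketch using the `importName` helper defined in this module)::
    var_class = importName("schrodinger.pipeline.pipeio", "Structures")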
Stage Syntax
============
An example stage definition looks like::
STAGE MY_STAGE
STAGECLASS macromodel.ConfSearchStage
INPUT MY_INPUT
OUTPUT MY_OUTPUT
FFLD MMFFS
The `STAGE` line value (`MY_STAGE`) specifies the name of the stage. The
`STAGECLASS` keyword specifies `<module>.<class name>` that defines the
stage. Pipeline uses `STAGECLASS` to determine which stage to create.
Pipeline will also search the schrodinger.pipeline.stages namespace. If a
custom stage module is used, make sure it is in your `PYTHONPATH`.
See `schrodinger.pipeline.stages.combine` for an example on how to
write a stage module.
Input variables for the stage are specified via `INPUT` keywords, and outputs
via `OUTPUT` keywords. The rest of the keywords tell the stage how to run.
If you wish to run the Pipeline without using the pipeline startup
machinery::
p = pipeline.Pipeline([options])
p.readFile(<input file>)
try:
p.run()
except RuntimeError:
...
If restartability is important, specify the `restart_file` when
constructing the Pipeline object.
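For example, a restartable Pipeline could be set up like this (a minimal
sketch; the job name and file names are illustrative)::
    p = pipeline.Pipeline(jobname='myjob', restart_file='myjob.restart')
    p.readFile('myjob.inp')
    try:
        p.run()
    except RuntimeError:
        ...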
To restart Pipeline, do::
p = pipeline.Restart(restart_file [, new options])
try:
p.run()
except RuntimeError:
...
where `restart_file` is the same file that was specified when the original
Pipeline instance was constructed.
Copyright Schrodinger, LLC. All rights reserved.
"""
import os
import pickle
import shutil
import sys
import time
import weakref # For weak references to prevent cyclic references
import configobj
import schrodinger.pipeline.stage as stage_module
import schrodinger.utils.fileutils as fileutils
from schrodinger.application import inputconfig
from schrodinger.infra import mm
from schrodinger.infra import mmjob
from schrodinger.job import jobcontrol
from schrodinger.job import launcher
from schrodinger.utils import log
from schrodinger.utils import mmutil
# Check whether SCHRODINGER_PYTHON_DEBUG is set for debugging:
DEBUG = (log.get_environ_log_level() <= log.DEBUG)
# JOB_SERVER does not support -LOCAL option, so stages will not be restartable
USING_JOB_SERVER = mmutil.feature_flag_is_enabled(mmutil.JOB_SERVER)
# Statuses of stage jobs:
WAITING = 'WAITING' # Not run yet
RUNNING = 'RUNNING' # Running right now
COMPLETED = 'COMPLETED' # Completed successfully
FAILED = 'FAILED' # Failed
RESTARTING = 'RESTARTING' # Failed earlier and marked for restart
# Dictionary of stages that were updated from the *.dump files.
# key: stagename, value: stage object.
# This variable is global because we don't want this info to go into
# the Pipeline *.restart file, as it is only temporary
updated_from_dump = {}
# For compatibility with old names/locations for stages:
# (keys are old names, values are new names)
old_stage_classes = {
"merge.MergeStage": "glide.MergeStage", # r2006
"ligfilter.LigFilterStage": "filtering.LigFilterStage", # r2006
"phase.DBManage": "phase.DBManageStage", # only some old r2008 builds
# only some old r2008 builds
"phase.DBConfSites": "phase.DBConfSitesStage",
# Moved for r2010 alpha, Ev:95682
"chargefilter.ChargeFilterStage": "filtering.ChargeFilterStage",
"mmgbsa.MMGBSAStage": "prime.MMGBSAStage", # Ev:95727, r2011 alpha
"mopac.MopacStage": "semiemp.SemiEmpStage", # Ev:112709 r2012 alpha
"gencodes.GenCodesStage": "gencodes.Recombine", # Ev:83703
}
# Pipeline can be told to print the logging information to any pipe,
# but the default is to print it to stdout, which corresponds to the
# main log file when running under jobcontrol:
global_logfh = sys.stdout
def log(text):
""" Prints specified text to the log pipe; adds a return at the end """
global_logfh.write(text + '\n')
global_logfh.flush()
def logn(text):
"""
Print the specified text to the log pipe with no newline. This is
especially useful when printing progress periods.
"""
global_logfh.write(text)
global_logfh.flush()
def add_host_lists(list1, list2):
"""
Append hosts in list2 to list1.
Example::
list1 = a:5,b:10
list2 = a:2,c:12
output = a:7,b:10,c:12
The order of hosts is retained (first list is given priority).
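In terms of the actual data structures (lists of (hostname, ncpus) tuples),
the same example reads::
    list1 = [('a', 5), ('b', 10)]
    list2 = [('a', 2), ('c', 12)]
    add_host_lists(list1, list2)  # -> [('a', 7), ('b', 10), ('c', 12)]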
"""
out_list = []
list1_hostnames = []
for host, cpus in list1:
list1_hostnames.append(host)
for newhost, newcpus in list2:
if host == newhost:
cpus += newcpus
out_list.append((host, cpus))
for host, cpus in list2:
if host not in list1_hostnames:
out_list.append((host, cpus))
return out_list
def subtract_host_lists(list1, dict2):
"""
Return available (not used) hosts.
This function subtracts the used-host dict `dict2` from the available
host list `list1`.
:type list1: list
:param list1:
    All available hosts (specified by the user), as a list of
    (hostname, ncpus) tuples; an ncpus of None means unlimited.
:type dict2: dict
:param dict2:
    All hosts currently used by stages, with hostname as key and used
    CPU count as value.
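For example (a sketch with illustrative hosts; None means unlimited)::
    subtract_host_lists([('a', 5), ('b', None)], {'a': 3})
    # -> [('a', 2), ('b', None)]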
"""
avail_list = []
for host, cpus in list1:
used_cpus = dict2.get(host, 0)
if used_cpus == 0:
avail_list.append((host, cpus))
elif cpus is None: # Unlimited
avail_list.append((host, None))
else:
avail_cpus = cpus - used_cpus
if avail_cpus:
avail_list.append((host, avail_cpus))
return avail_list
def _host_is_queue(hostname):
"""
Return True if `hostname` is a queue.
"""
hostobj = jobcontrol.get_host(hostname)
if hostobj:
return hostobj.isQueue()
else:
return False
_job_objects = {} # Key: jobid, Value: Job object
def _get_job(jobid):
"""
Return the job object for job with `jobid`.
"""
if jobid not in _job_objects:
jobobj = jobcontrol.Job(jobid)
_job_objects[jobid] = jobobj
return _job_objects[jobid]
def importName(modulename, name):
"""
Import a named object from a module in the context of this function.
For example, if you would like to create an instance of the Foo class
from the bar.py module::
foo_class = importName("bar", "Foo")
foo_instance = foo_class()
:raises ImportError:
Raised when the object can not be imported.
"""
# Described in Python Cookbook (2002, section 15.3)
try:
module = __import__(modulename, globals(), locals(), [name])
except ImportError:
msg = "Could not import module: %s" % modulename
raise ImportError(msg)
try:
obj = vars(module)[name]
except KeyError:
msg = "Class %s is not defined in module %s!" % (name, modulename)
raise ImportError(msg)
return obj
class StageJob:
"""
A "Job" that is used by Pipeline to run a Stage.
Each StageJob has a Stage object associated with it. This object is
periodically dumped to a file in order to support restartability. The
process is called "dumping" and the file is the dump file. When Pipeline
is restarted, each stage object is recovered from the associated dump
file to get the latest state.
"""
def __init__(self, stageobj, pipeline, module):
"""
:type stageobj: Stage
:param stageobj: Stage to run.
:param pipeline: Reference to the Pipeline object.
:param module: The module where the above stage is defined.
"""
self.stageobj = stageobj # Stage object that is run by this stage job
self.pipeline = pipeline
self.module = module
self.jobid = "" # JobID of this stage job
self.status = WAITING # Can be: WAITING, RUNNING, COMPLETED, FAILED, or RESTARTING
# Hosts that are used by this stage AT THIS MOMENT
self.checked_out_hosts = []
self.num_needed_cpus = 0 # Number of CPUS needed by the stage
self.num_given_cpus = 0 # Number of CPUS given to the stage to use
self.host_pool = None
# The reason why the stage failed (e.g. "missing")
self.died_action = None
# Can't set the host_pool here, since self.pipeline.host_dict is not
# set yet
def _setHostPool(self):
"""
Set the self.host_pool variable based on the stage's product.
"""
# This method was created due to Ev:105135. The self.pipeline.host_dict
# needs to be read AFTER it is set, so can't be done in the constructor
# of this class, since pipeline.setOptions() is called afterwards.
if self.host_pool is not None:
return
# Can be equal to None as well:
product = self.stageobj.mainProduct()
# User specified special host list for this product
if product in self.pipeline.host_dict:
self.host_pool = product
else: # Use the general host list
self.host_pool = 'general'
def setUsedHosts(self, new_host_list):
diff = (new_host_list == self.checked_out_hosts)
self.checked_out_hosts = new_host_list
self.num_given_cpus = 0
for host, cpus in self.checked_out_hosts:
self.num_given_cpus += cpus
return diff
def getUnusedHostList(self):
# List of all AVAILABLE hosts for this product:
self._setHostPool()
hostlist = self.pipeline.host_dict[self.host_pool]
# Dict of all hosts used by all stages for this product
used_hosts_dict = self.pipeline.getUsedHosts(self.host_pool)
# List of hosts/cpus that are not used (available):
unused_hosts_list = subtract_host_lists(hostlist, used_hosts_dict)
return unused_hosts_list
def updateStageCpus(self):
"""
Based on current host usage and number of needed cpus, determine
which hosts this stage should use and send them to it in a message.
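For example (illustrative numbers): if the stage needs 6 CPUs, already
holds 2, and ('hostA', 3) and ('hostB', 10) are free, it is granted 3 more
CPUs on hostA and 1 on hostB.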
"""
requested_cpus = self.num_needed_cpus - self.num_given_cpus
# List of new hosts for the stage to use:
additional_hosts_to_use = []
for hostname, available_cpus in self.getUnusedHostList():
sys.stdout.flush()
if available_cpus is None or (available_cpus >= requested_cpus):
# Unlimited CPUS OR there are enough processors available
additional_hosts_to_use.append((hostname, requested_cpus))
requested_cpus = 0
break # Satisfied the request
elif available_cpus:
# At least some are available
additional_hosts_to_use.append((hostname, available_cpus))
requested_cpus -= available_cpus
if DEBUG:
print(' Total CPUs needed: %i, Addnl hosts granted: %s' %
(self.num_needed_cpus, additional_hosts_to_use))
# Append additional cpus to the checked_out_hosts list:
new_host_list = add_host_lists(self.checked_out_hosts,
additional_hosts_to_use)
self.setUsedHosts(new_host_list)
# Send message to stage updating its host list:
self.sendHostsToUse()
def sendHostsToUse(self):
"""
Send a message to the stage job telling it how many CPUS to use.
Gets called periodically in case messages don't go through.
"""
name = self.stageobj.stagename
if DEBUG:
print('Telling stage %s to use hosts: %s' %
(name, self.checked_out_hosts))
use_hosts_str = jobcontrol.host_list_to_str(self.checked_out_hosts)
msg = 'pipeline use %s' % use_hosts_str
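# The resulting message looks something like 'pipeline use localhost:2'
# (illustrative; the exact host string comes from host_list_to_str).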
if DEBUG:
print('DEBUG SENDING MESSAGE: "%s"' % msg)
mmjob.mmjobbe_send(self.jobid, msg)
sys.stdout.flush()
def restart(self, action):
"""
Mark this job to be restarted by the Pipeline.
"""
self.status = RESTARTING
self.printAction(action)
def finish(self):
"""
Mark this stage job as completed, read the stage's output objects from
its output dump file, and register them with the Pipeline.
"""
self.status = COMPLETED
self.printAction('completed')
sjname = self.stageobj.stagename
output_dump_file = sjname + ".out"
sys.stdout.flush()
if not os.path.isfile(output_dump_file):
print("ERROR: No output from stage!")
sys.stdout.flush()
self.died('failed') # quits the pipeline.
# The stage executable wrote the output objects to a file.
# Try to read that file:
with open(output_dump_file, "rb") as fh:
outputs = pickle.load(fh)
sys.stdout.flush()
output_objects = {}
for position, obj in outputs.items():
obj.check()
outnames = self.stageobj.getOutputNames()
if position not in outnames: # User did not request this output
continue
varname = outnames[position]
sys.stdout.flush()
obj.name = varname
output_objects[varname] = obj
if obj.isFilled():
count = obj.getCount()
if count is not None:
logn(" Output: %s (%s)" % (varname, count))
else:
logn(" Output: %s" % varname)
else:
logn(" Output: %s (empty)" % varname)
# FIXME: Add ability to return the number of structures:
if obj.type == 'ligands': # New is "structures"
logn(" Counting.")
count = obj.count()
log(" Number of structures: %i" % count)
else:
log('')
sys.stdout.flush()
for varname, var_obj in output_objects.items():
self.pipeline._objects[varname] = var_obj
self.pipeline.intermediate_varnames.append(varname)
def readyToRun(self, objects):
"""
Return True if this StageJob has all inputs that are required
for it to start.
"""
if self.status == RESTARTING:
sys.stdout.flush()
return True
elif self.status == WAITING:
inputs_ready = True
for varname in self.stageobj.getInputNames().values():
if varname not in objects:
inputs_ready = False
break
if not inputs_ready:
return False
return True
else: # Is running or has finished:
return False
def printAction(self, action):
"""
Call the Pipeline's printAction method.
"""
self.pipeline.printAction(self.stageobj.stagename, self.jobid, action)
def died(self, action):
"""
Mark this stage as failed. The "action" gets saved in the "died_action"
attribute, and will be printed out at the end of the workflow.
This gets called every time a StageJob dies by raising a RuntimeError
exception.
"""
self.printAction(action)
self.died_action = action
# When the Pipeline will be restarted, this stage will be restarted
# too:
self.status = FAILED
# Throw the CPUs used by this stage back into the common pool:
self.num_needed_cpus = 0 # Number of CPUS needed by the stage
self.setUsedHosts([])
self.pipeline.distributeCpus()
self.pipeline.dump() # Save the state of the pipeline
def printFailureMessage(self):
"""
Print the failure status of the stage and the path to the log file.
"""
print("Stage %s %s" % (self.stageobj.stagename, self.died_action))
log_file = self.stageobj.stagename + '.log'
if os.path.exists(log_file):
print(" SEE LOG FILE:", os.path.abspath(log_file))
else:
print(" No log file to examine; missing file: %s" % log_file)
sys.stdout.flush()
def updateFromDump(self, quiet=False):
"""
Update this stage of the pipeline to the latest state from the dump
file.
"""
sjname = self.stageobj.stagename
stage_dump_file = sjname + ".dump"
if os.path.isfile(stage_dump_file):
try:
latest_stageobj = stage_module.Restart(stage_dump_file)
except Exception as err:
if DEBUG:
print('EXCEPTION FROM stage.Restart():', err)
msg = 'Pipeline: WARNING: Failed to update %s from dump file! Will restart from beginning' % sjname
log(msg)
else: # update successful
updated_from_dump[sjname] = latest_stageobj
if not quiet:
msg = 'Pipeline: Updated stage %s from dump file successfully.' % sjname
log(msg)
else:
msg = 'Pipeline: WARNING: No dump file exists for stage %s! Will restart from beginning' % sjname
log(msg)
def _writeStageExecutable(self, destination):
"""
Write a Stage executable python script that will be submitted under
jobcontrol, and save it to `destination`.
:type destination: str
:param destination: The filename to write the stage executable python
script to.
"""
stage_class_module = self.module
#######################################################################
string = """
import os
import pickle
import sys
script = sys.argv[0]
try:
import {0}
except ImportError:
raise ImportError(script + ': Could not import module <{0}>.')
stagename = os.path.splitext(script)[0]
restart_file = stagename + '.dump'
try: # Load the stage dump file:
with open(restart_file, "rb") as fh:
stage = pickle.load(fh)
except Exception:
raise RuntimeError(script + ': Could not load stage from dump file')
######### MODIFY THIS SO THAT THE OPTIONS ARE UPGRADED EVEN WHEN RESTARTING ###
if not stage.hasStarted(): # If NOT restarting
print('Stage', stage.stagename, 'initializing...')
for position, obj in stage.iterInputs():
obj.check() # Check to see if the object is valid
else: # Restarting
print('Stage', stage.stagename, 'preparing to restart...')
# Periodically dump this instance to the dump file:
# Run the instance:
try:
outputs = stage.run(restart_file=restart_file)
except RuntimeError as err:
print(err) # Print the error without traceback
sys.exit(1) # Exit this script
# Dump the outputs to a dump file:
try:
with open(stagename + '.out', 'wb') as fh:
pickle.dump(outputs, fh, protocol=2)
except Exception:
raise RuntimeError(script + ': Could not write the output file')
""".format(stage_class_module)
with open(destination, "w") as fh:
fh.write(string)
class Pipeline:
"""
A controller responsible for running the stages in the correct order.
Pipeline parses the input file, creates instances of all IO objects,
stage objects, and stage job objects, submits the stages in the
appropriate directories, and waits for them to finish. Once a stage
finishes, it starts any stages that depend on its output. When all
stages are complete, it presents the user with the USER OUTPUT objects -
IO output objects that are to be returned by the pipeline.
"""
def __init__(
self,
jobname="pipeline", # Jobname of the pipeline
prog=None, # Name of the program
logfh=None, # Pipe where log text is to be sent to
restart_file=None # Periodically dump pipeline here
):
self.jobname = jobname
self.prog = prog
# If no restart file specified, use <jobname>.restart in CWD:
if restart_file:
self.restart_file = restart_file
else:
# Ev:63865 Save the restart file as local path:
self.restart_file = jobname + ".restart"
# If log file specified, dump all output to that file:
global global_logfh
if logfh:
global_logfh = logfh
else:
global_logfh = sys.stdout # Write to standard out by default
self.idle_function = None # Function to run periodically
self.stagejobs = [] # List of all stage jobs.
# Dictionary containing the actual data objects, where keys are string
# variable names:
self._objects = {}
# Variable names for variables that are stage outputs (workflow inputs
# are excluded):
self.intermediate_varnames = []
# variable names that need to be available to the user when the job
# finishes.
self._user_outputs = set()
# Job control will copy these files back to the
# launch directory.
# Variable name that is to be incorporated into Maestro
self._structure_output = None
self.host_dict = {} # key: product pool; value: host string
# Default value for -host (and -HOST sometimes)
self.host_dict['general'] = [('localhost', 1)]
# Default value for -njobs (Use number of hosts as -NJOBS), Ev:117548
self.njobs = None
# Default value for -adjust (adjust only if -NJOBS was not specified),
# Ev:117548
self.adjust = None
self.force = None
self.cleanup = True # Default value for -cleanup
self.max_retries = None # What the user specified for -max_retries.
# Default: use SCHRODINGER_MAX_RETRIES or 2.
self._has_started = False
self.restart_from_beginning = False # Ev:83981
def setOptions(
self,
subjob_hosts=None, # hosts to run subjobs on. Dictionary
# key: product pool; value: host list
# must include "general" host list
njobs=None,  # number of subjobs to create
adjust=None,  # whether to adjust njobs to a reasonable number
force=None,  # whether to continue in the event of subjob failure
cleanup=None,  # should each stage clean up after itself
max_retries=None,  # how many times to restart subjobs
):
"""
Set the options of the pipeline.
Call this function before calling pipeline.run() to set
hosts/njobs/etc. When restarting, call this function to modify the
options.
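For example (a sketch; the host string and counts are illustrative)::
    p.setOptions(subjob_hosts={'general': 'localhost:4'}, njobs=8,
                 max_retries=3)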
"""
# Change the options only when new values have been supplied.
# Otherwise run as previously (when restarting)
if subjob_hosts is not None:
for host_pool, hoststr in subjob_hosts.items():
# For every non-queued host, if NCPUS was NOT specified (None),
# set it to 1. Leave it as None (unlimited) for queued hosts:
try:
hostlist = jobcontrol.host_str_to_list(hoststr)
except:
raise RuntimeError("Invalid host entry: '%s'" % hoststr)
hostlist2 = []
for (hostname, ncpus) in hostlist:
if ncpus is None and not _host_is_queue(hostname):
ncpus = 1
hostlist2.append((hostname, ncpus))
self.host_dict[host_pool] = hostlist2
if njobs is not None:
self.njobs = njobs
print('Setting njobs to:', njobs)
if adjust is not None:
print('Setting adjust to:', adjust)
self.adjust = adjust
if force is not None:
print('Setting force to:', force)
self.force = force
if cleanup is not None:
print('Setting cleanup to:', cleanup)
self.cleanup = cleanup
if max_retries is not None:
print('Setting max_retries to:', max_retries)
self.max_retries = max_retries
print('')
def _addUserOut(self, value):
value = str(value) # just in case
self._user_outputs.add(value)
def _setStructOut(self, value):
value = str(value) # just in case
self._structure_output = value
def readFile(self, command_file):
"""
Read a Pipeline input file.
:raise RuntimeError:
Raised if there is a problem with input file.
"""
# Determine if new or old format input file is specified:
configobj_format = False
with open(command_file) as fh:
for line in fh:
if line.strip().startswith('['):
configobj_format = True
break
if configobj_format:
self.readNewFormat(command_file)
else:
msg = "ERROR: Invalid input file. This version of VSW does not support this file format."
print(msg)
sys.stdout.flush()
log(msg)
raise RuntimeError(msg)
self.inputsForStagesDefined()
self.checkUserOutputs()
for sjob in self.stagejobs:
sjob.stageobj.initNonPersistent(self)
def checkUserOutputs(self):
"""
Make sure that all specified user outputs are variable names that
are returned by a stage. This is done to fail on typos in input
file.
:raise RuntimeError:
Raised on invalid USEROUT name.
"""
# Initialize to input object names:
output_varnames = []
for fullname in list(self._objects):
output_varnames.append(fullname)
for stagejob in self.stagejobs:
for varname in stagejob.stageobj.getOutputNames().values():
output_varnames.append(varname)
for shortvarname in self._user_outputs:
fullname = '%s-%s' % (self.jobname, shortvarname)
if fullname not in output_varnames:
raise RuntimeError("Invalid USEROUT: %s" % shortvarname)
if self._structure_output is not None:
fullname = '%s-%s' % (self.jobname, self._structure_output)
if fullname not in output_varnames:
raise RuntimeError("Invalid STRUCTOUT: %s" %
self._structure_output)
def createStage(self, stagename, inIOnames, outIOnames, stage_class,
keywords):
"""
Create a stage object and add it to the pipeline.
:param stagename:
Name of the stage.
:param inIOnames:
Input pipeio object names.
:param outIOnames:
Output pipeio object names.
:param stage_class:
module.class defining the stage.
:type keywords: list
:param keywords:
All keywords for the stage, a list of (keyword, value) tuples.
:raise RuntimeError or ImportError:
Raised on input file error.
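For example (a sketch reusing the stage shown in the module docstring)::
    pipe.createStage('MY_STAGE', ['MY_INPUT'], ['MY_OUTPUT'],
                     'macromodel.ConfSearchStage', [('FFLD', 'MMFFS')])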
"""
for s in self.stagejobs:
if s.stageobj.stagename == stagename:
raise RuntimeError("Duplicate stage:" + stagename)
s = stage_class.split('.')
stage_class_name = s[-1]
stage_class_module = ".".join(s[:-1])
if not stage_class:
raise RuntimeError("ERROR: No class specified for stage " +
stagename)
if not stage_class_module: # Use default stage module:
raise RuntimeError("ERROR: No stage module specified for stage " +
stagename)
# Import the class dynamically to avoid exec():
# Will raise ImportError if can't import:
try:
StageClass = importName(stage_class_module, stage_class_name)
except:
tmp = "schrodinger.pipeline.stages.%s" % stage_class_module
# Will raise ImportError if can't import:
try:
StageClass = importName(tmp, stage_class_name)
except:
raise ImportError("Could not import %s" % stage_class)
stage_class_module = tmp
# Create an instance of the StageClass:
self_weakref = weakref.proxy(self) # weak reference to pipeline
driver_dir = os.getcwd()
stageobj = StageClass(stagename, inpipeline=True, driver_dir=driver_dir)
# Will work for new and old formats:
for keyname, value in keywords:
if keyname.endswith('_FILE') and value:
# Ev:68946 Any file specified as a local path will be invalid
# when we enter the stage's directory, so make it absolute:
if DEBUG:
print("DEBUG: Converting to absolute path: %s" % value)
value = os.path.abspath(value)
stageobj[keyname] = value
for key, varname in enumerate(inIOnames):
fullname = '%s-%s' % (self.jobname, varname)
stageobj.setInput(key + 1, name=fullname)
for key, varname in enumerate(outIOnames):
fullname = '%s-%s' % (self.jobname, varname)
stageobj.setOutputName(key + 1, fullname)
stagejob = StageJob(stageobj, self_weakref, stage_class_module)
self.stagejobs.append(stagejob)
# Ev:45421 make sure all products that stage requires are installed:
stageobj.checkProducts()
def _addVariable(self, var_obj, varname):
"""
Add an initiation variable to the pipeline.
:raise RuntimeError:
Raised if there is a problem with the variable name or if the
variable name is already used.
"""
for letter in varname:
if not letter.isalnum() and letter not in ['_', '-']:
msg = "pipeline.py: varname must contain only alpha-numeric characters, dashes, and underscores."
msg += "\n invalid variable name: %s" % varname
raise RuntimeError(msg)
fullname = '%s-%s' % (self.jobname, varname)
if fullname in list(self._objects):
raise RuntimeError("Duplicate variable:" + fullname)
var_obj.name = varname
self._objects[fullname] = var_obj
self._objects[fullname].check()
def reportParameters(self):
"""
Print the parameters of each stage.
"""
#### Move these to Stage.run() #############################
log("General subjob hosts: %s" % str(self.host_dict['general']))
for host_pool, hosts in self.host_dict.items():
if host_pool != 'general':
log("%s hosts: %s" % (host_pool, hosts))
if not self.njobs:
log("Number of subjobs: Same as number of processors")
else:
log("Number of subjobs: %i" % self.njobs)
if self.adjust is None:
log("Adjust subjobs: False")
# log( "Adjust subjobs: Only if neither #subjobs nor #CPUs are
# specified")
else:
log("Adjust subjobs: %s" % str(self.adjust))
force_msg = ("Force job: Glide Docking job will continue in the event"
" of Glide subjob failure (merge output from successful"
" subjobs).")
if not self.force:
force_msg = ("No force job: Will terminate upon first docking"
" subjob failure instead of merging output from"
" successful Glide subjobs.")
log(force_msg)
# Determine what the stages will use for max_retries:
if self.max_retries is None: # User did not specify -max_retries:
env_max_retries = os.getenv('SCHRODINGER_MAX_RETRIES')
if env_max_retries:
max_retries = int(env_max_retries)
else:
max_retries = 2
else:
max_retries = self.max_retries
log("Maximum retries: %i" % max_retries)
log("Cleanup: %s" % str(self.cleanup))
for stagejob in self.stagejobs:
log('')
stagejob.stageobj.reportParameters(global_logfh)
log('')
log("Status of stages:")
log(" Stage Name Status")
log(" ------------------------------ ---------")
for stagejob in self.stagejobs:
s = " %-30s %s" % (stagejob.stageobj.stagename, stagejob.status)
log(s)
log('')
def startReadyStages(self):
"""
Start all stages that are ready to be run.
When restarting, start WAITING and RESTARTING stages. When NOT
restarting, start only WAITING stages. Return the number of stages
that were started (not currently used).
"""
num_started = 0
for stagejob in self.stagejobs:
if not stagejob.readyToRun(self._objects):
continue
# RESTARTING or WAITING
num_started += 1
self._submitStage(stagejob)
# Must be called after cd'ing into Pipeline's dir:
self.dump() # dump the Pipeline instance
return num_started
def setStageOptions(self, stageobj):
"""
Propagate the pipeline options (hosts, ncpus, etc) to the specified
stage.
"""
product = stageobj.mainProduct()
if product == 'mmod':
product = 'macromodel'
# Return hosts list specific to the product (if specified);
# otherwise the general host list:
hosts = self.host_dict.get(product, self.host_dict['general'])
if DEBUG:
print('DEBUG: Setting stage (%s) [%s] hosts to %s' %
(stageobj.stagename, stageobj.mainProduct(), hosts))
sys.stdout.flush()
# If max_retries passed to stageobj is None, then
# SCHRODINGER_MAX_RETRIES (or 2) is used.
stageobj.setJobOptions(hosts, self.njobs, self.adjust, self.force,
self.max_retries, self.cleanup)
def requestCpus(self):
pass
def updateStagesCpus(self):
"""
Send messages to stages (if necessary) telling them how many
processors to use from each host.
"""
def _submitStage(self, stagejob):
"""
Start the specified stage.
"""
# We are in the stage directory during this function execution...
###### ALL PATHS ARE RELATIVE TO STAGE'S DIRECTORY ######
# At this point we are in the stage's directory already #
#########################################################
# Ev:52954
# When a stage starts, it is not allocated any processors.
# When it needs to use a host/processor, it needs to check it out
# from the Pipeline (jobdj will do that) by sending a jobcontrol
# message to it with the number of processors needed.
# Pipeline will respond by sending a message stating how many
# processors to use.
# When the stage no longer needs as many processors (or needs more)
# it sends another message to Pipeline asking for a different number,
# after which Pipeline sends a message with updated numbers.
sjname = stagejob.stageobj.stagename # Just for shorter lines
log_file = sjname + ".log"
dump_file = sjname + ".dump"
stage_script = sjname + ".py"
out_file = sjname + ".out"
stagejob._writeStageExecutable(stage_script)
stagelauncher = launcher.Launcher(
script=stage_script,
copyscript=True, # To allow use of custom (not built-in) stages
jobname=sjname,
host='localhost',
prog=self.prog, # Stage program name
local=not USING_JOB_SERVER, # For restartability
)
# stagejob.stageobj is already updated from the dump file (if exists)
# ####
# stored in Pipeline, NOT restart file
if stagejob.status == RESTARTING:
if self.restart_from_beginning:
# Ev:83981
print('Restarting stage "%s" from beginning' % sjname)
stagejob.status = WAITING
elif sjname not in updated_from_dump:
# Dump file did not exist for this stage
print(
'WARNING: No dump file for stage "%s"; starting from beginning'
% sjname)
stagejob.status = WAITING
if stagejob.status == RESTARTING:
# Restarting; use recovered instance:
stageobj = updated_from_dump.get(sjname)
stageobj.initNonPersistent(self)
else: # NOT restarting
# Use the not-started instance:
stageobj = stagejob.stageobj
# Will submit the instance of the stage that was created when the
# Pipeline was created. This instance has never run before. This
# is because when the stage runs, it never modifies the Pipeline
# instance, since it is running in its own process.
# Set the input objects for the stage and add the files of input
# objects to the jobcontrol -in file list:
for position, varname in stageobj.getInputNames().items():
var_obj = self._objects[varname]
stageobj.setInput(position, obj=var_obj)
for f in var_obj.getFiles():
stagelauncher.addInputFile(f)
# Set the names for output objects for the stage:
for position, varname in stageobj.getOutputNames().items():
stageobj.setOutputName(position, varname)
###### MODIFY THE STAGE INSTANCE - launch options ##########
# Any that are default are set to None:
self.setStageOptions(stageobj)
if os.path.isfile(dump_file):
# Dump the modified instance of the stage to tmp file:
tmp_file = dump_file + '.tmp'
stagejob.pipeline = None
with open(tmp_file, "wb") as fh:
pickle.dump(stageobj, fh, protocol=2)
# weak reference to pipeline
stagejob.pipeline = weakref.proxy(self)
# Backup the old file & copy new from tmp:
dump_bu = dump_file + '.bu'
if os.path.isfile(dump_bu):
os.remove(dump_bu) # Required for Win
os.rename(dump_file, dump_bu)
os.rename(tmp_file, dump_file)
else: # No need to back up the old instance:
with open(dump_file, "wb") as fh:
pickle.dump(stageobj, fh, protocol=2)
stagelauncher.addScriptArgs([dump_file])
stagelauncher.setStdOutErrFile(log_file)
# Now registers the dump file as an input file instead of log file,
# because jlaunch.pl now deletes the registered log files when
# launching the job. The problem with this approach is that the dump
# file will not be copied to the launch dir at the end of the job.
# Dump file is registered as output file as well, at the end of
# stage's run(), to be copied back to driver's job dir.
stagelauncher.addInputFile(dump_file)
stagelauncher.addOutputFile(dump_file)
stagelauncher.addOutputFile(out_file)
job = stagelauncher.launch(print_jobid=False)
stagejob.jobid = job.job_id
if stagejob.status == RESTARTING:
stagejob.printAction('restarted')
else:
stagejob.printAction('launched')
stagejob.status = RUNNING
sys.stdout.flush()
def _updateStageStatusRestarting(self):
"""
Update the status of each stage job.
This runs when restarting a pipeline. If a job has failed for any
reason, it marks it to be restarted. If a job is still running or
submitted, it does nothing.
"""
for stagejob in self.stagejobs:
stagename = stagejob.stageobj.stagename
if stagejob.status == FAILED:
stagejob.restart('failed')
elif stagejob.status == RUNNING:
# Stage.stageobj is already UPDATED from the dump file!!!
# Was RUNNING, now COMPLETED.
if stagejob.stageobj.hasCompleted():
_get_job(stagejob.jobid).download()
stagejob.finish()
self.dump()
self.cleanupIntermediateFiles(stagejob)
else:
# NOT completed successfully (failed or still running):
try:
jobobj = _get_job(stagejob.jobid)
jobobj.readAgain()
except mm.MmException as e:
if e.rc == mmjob.MMJOB_FILE_NOT_FOUND:
stagejob.restart('missing')
else:
raise # any other error
else: # Have gotten the job object.
# The job is under jobcontrol and the last time I
# checked was RUNNING:
if jobobj.Status in ['running', 'submitted']:
print("Stage %s is still %s" %
(stagename, jobobj.Status))
pass # The job is still running
else: # Failed or completed not successfully:
stagejob.restart(jobobj.Status)
def _updateStageStatus(self):
"""
Update the status of each RUNNING stage job object.
This is periodically run by the pipeline. It checks the status of
RUNNING jobs and updates them if they have completed or failed.
"""
for stagejob in self.stagejobs:
if stagejob.status != RUNNING:
continue
try:
jobobj = _get_job(stagejob.jobid)
jobobj.readAgain()
except mm.MmException as e:
if e.rc == mmjob.MMJOB_FILE_NOT_FOUND:
stagejob.died('missing')
continue
else:
raise # any other error
# Have acquired the job object.
# The job is under jobcontrol and the last time I checked was
# RUNNING:
if jobobj.Status in ["completed", "exited", "stranded", "fizzled"]:
try:
exitstatus = jobobj.ExitStatus
except:
pass # No exit status -- most likely still running
else: # Got exit status successfully.
if exitstatus in ["died", "stranded", "killed", "fizzled"]:
stagejob.died(exitstatus)
else: # good exit status:
# Download the output files of the stage job.
# This will allow the updated dump from stage job to be read.
_get_job(stagejob.jobid).download()
stagejob.updateFromDump(quiet=True)
latest_stageobj = updated_from_dump.get(
stagejob.stageobj.stagename)
if latest_stageobj is None:
print(
'ERROR: Stage has completed but could not be updated from the dump file'
)
stagejob.died('failed') # Will quit the Pipeline
elif not latest_stageobj.hasCompleted():
print(
'ERROR: Stage has completed but stageobj.hasCompleted() is False'
)
stagejob.died('failed') # Will quit the Pipeline
else:
# Stage object completed successfully
stagejob.finish() # Was RUNNING, now COMPLETED.
self.dump()
self.cleanupIntermediateFiles(stagejob)
else:
pass # The job is still running
def __getstate__(self):
state_dict = dict(self.__dict__)
# Backend objects can't be usefully pickled as they depend on the
# jmonitor instance the job was originally launched under.
state_dict['_backend'] = None
return state_dict
def dump(self):
""" Dumps the Pipeline instance to a restart file """
idle_function_bu = self.idle_function
# Make the object picklable:
self.idle_function = None
for stagejob in self.stagejobs:
stagejob.pipeline = None
# Dump to restart file (if any):
if self.restart_file:
temp_file = self.restart_file + '.tmp'
with open(temp_file, "wb") as fh:
pickle.dump(self, fh, protocol=2)
# VSW-843 Must use force_rename() on Windows
# (it will replace the existing file too):
fileutils.force_rename(temp_file, self.restart_file)
# Run idle callback (if any):
if idle_function_bu:
idle_function_bu()
# Restore to original state:
self.idle_function = idle_function_bu
for stagejob in self.stagejobs:
stagejob.pipeline = weakref.proxy(self)
def _getMaxRetries(self):
"""
Unimplemented.
"""
def getStageByJobid(self, jobid):
for stagejob in self.stagejobs:
if stagejob.jobid == jobid:
return stagejob
return None
def handleStageMessages(self):
# Read messages from stages:
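# Each message is expected to look roughly like
#   'pipeline <stage_jobid> <total_ncpus_requested> [<hosts_in_use>]'
# and is split into at most four fields below.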
cpus_freed_up = False
message = self._backend.nextMessage()
while message:
s = message.split(None, 3)
# s[0] will be "pipeline"
if len(s) < 3:
print("PIPELINE ERROR: Could not handle message:")
print(' "%s"' % message)
raise RuntimeError("Cound not handle message: %s" % message)
stage_jobid = s[1]
total_ncpus_requested = int(s[2])
if len(s) < 4: # hosts list is empty:
stage_hosts_str = ''
else:
stage_hosts_str = s[3]
stagejob = self.getStageByJobid(stage_jobid)
if not stagejob:
print('ERROR: Could not find stage with jobid: %s' %
stage_jobid)
sys.stdout.flush()
# Update Pipeline's list of currently used hosts from the info
# sent to it by the stage:
new_hosts = jobcontrol.host_str_to_list(stage_hosts_str)
stagejob.setUsedHosts(new_hosts)
# Update Pipeline's list of NCPUS requested by this stage:
if total_ncpus_requested < stagejob.num_needed_cpus:
cpus_freed_up = True
if total_ncpus_requested != stagejob.num_needed_cpus:
if DEBUG:
print('Stage %s requested %i cpu(s) total' %
(stagejob.stageobj.stagename, total_ncpus_requested))
stagejob.num_needed_cpus = total_ncpus_requested
# Update the list of hosts/cpus that the stage can use:
stagejob.updateStageCpus()
message = self._backend.nextMessage()
if cpus_freed_up:
if DEBUG:
print('Some CPUs freed up, distributing...')
sys.stdout.flush()
self.distributeCpus()
def distributeCpus(self):
"""
Called when extra CPUs become available (as given back by the stages
using them). Will distribute the freed-up CPUs to other stages.
"""
for stagejob in self.stagejobs:
avail_host_list = stagejob.getUnusedHostList()
more_cpus_needed = (stagejob.num_needed_cpus >
stagejob.num_given_cpus)
if avail_host_list and more_cpus_needed:
stagejob.updateStageCpus()
def run(self, idle_function=None):
"""
Run the Pipeline.
:param idle_function:
A routine to call periodically while running the pipeline.
:raise RuntimeError:
Raised if Pipeline failed for any reason.
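For example (a sketch; the callback is illustrative)::
    def report_alive():
        print('pipeline driver is still running')
    p.run(idle_function=report_alive)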
"""
if DEBUG:
print("Running in debug mode")
self.idle_function = idle_function
restarting = self._has_started
if restarting:
print("Attempting to restart the pipeline...")
else:
print("Starting the pipeline...")
self._has_started = True
# Get the backend of this Pipeline driver job:
self._backend = jobcontrol.get_backend()
if self._backend:
print('\nWill listen to "pipeline" messages from stages')
self._backend.addMessageName('pipeline')
# Tell jobcontrol to periodically copy back the stage log files:
# (previously this was done in the startup script, which caused
# the jlaunch argument to be way too long) Ev:100401
if self._backend:
# We are running under job control
for stagejob in self.stagejobs:
log_file = stagejob.stageobj.stagename + '.log'
self._backend.addLogFile(log_file)
num_stages = 0
for stagejob in self.stagejobs:
num_stages += 1
# If restarting, the pipeline reference needs to be updated:
stagejob.pipeline = weakref.proxy(self)
if num_stages == 0:
log("ERROR: No stages to run! Please use readFile() to read an input file"
)
raise RuntimeError
# Update the statuses of stages based on latest jobcontrol info:
# Re-throw exception to reduce traceback printouts:
try:
if restarting:
# Will set status of failed stages to RESTARTING
# and status of completed stages to COMPLETED:
self._updateStageStatusRestarting()
else:
self._updateStageStatus()
except RuntimeError as e:
raise RuntimeError(str(e))
self.reportParameters()
self.printAction(None, None, None)
# ***** PIPELINE STARTED *****
old_statuses = []
while True:
# Run this loop until no more stages are running:
new_statuses = [(stagejob.status) for stagejob in self.stagejobs]
if old_statuses != new_statuses:
# Status of any stage changed, start ready stages:
self.startReadyStages()
self.dump()
tmp_statuses = [(stagejob.status) for stagejob in self.stagejobs
]
if RUNNING not in tmp_statuses:
# No stages are running/ready:
break # exit the loop
# Check for messages (and handle them, if any):
if self._backend:
self.handleStageMessages()
# Sleep for 3 seconds - prevents us from hammering the system.
time.sleep(3)
old_statuses = [(stagejob.status) for stagejob in self.stagejobs]
try:
# get updated status from jobcontrol for each stage:
self._updateStageStatus()
except RuntimeError as e:
raise RuntimeError(str(e))
# ***** PIPELINE FINISHED *****
num_waiting = new_statuses.count(WAITING)
num_failed = new_statuses.count(FAILED)
if num_failed:
# Print the list of stages that have failed:
failed_stages = [s for s in self.stagejobs if s.status == FAILED]
print("")
for stagejob in failed_stages:
stagejob.printFailureMessage()
print("")
self.dump() # Save the state of the pipeline before exiting
# Kill the Pipeline, since there are no more stages to run:
snames = [sj.stageobj.stagename for sj in failed_stages]
raise RuntimeError("Stage(s) failed: %s. Exiting." %
', '.join(snames))
elif num_waiting:
print(
"No more running stages, %i waiting stages, and none of them are ready."
% num_waiting)
print("User outputs until this point:", self._user_outputs)
if self.prog:
print(self.prog, end=' ')
print("EXITING: no ready stages!")
sys.stdout.flush()
raise RuntimeError("ERROR: no ready stages!")
else:
log("All stages have completed sucessfully.")
uouts = self.getUserOutputs()
structout = self.getStructureOutput()
if uouts:
output_files = []
incorporatable_file = None
log("User outputs:")
for obj in uouts:
# For each output file, if it's a USEROUT file, move it to
# the driver's directory (from stage's directory) and
# register it with jobcontrol so that it gets copied back:
obj_files = obj.getOutputPaths()
for filename in obj_files:
output_files.append(filename)
# Figure out whether this file needs to be incorporated:
is_struct_out = structout and (obj.name == structout.name)
if is_struct_out and len(obj_files) == 1:
incorporatable_file = obj_files[0]
if is_struct_out:
msg = " %s (incorporatable) %s" % (obj.name, str(obj))
else:
msg = " %s %s" % (obj.name, str(obj))
log(msg)
# Make a link (or copy) each output file into the CWD and
# add them to the job record:
for filename in output_files:
# Make a link to the output file to CWD (driver job
# directory):
directory, local_filename = os.path.split(filename)
if os.path.abspath(filename) != os.path.abspath(
local_filename):
# Only if this file is not already in the CWD,
# create a link to it in the CWD. If creating a
# link failed for whatever reason, make a copy:
# Ev:84675 & Ev:97589
try:
fileutils.create_hard_link(filename, local_filename)
except:
if os.path.isdir(filename):
# Typically a Phase directory
shutil.copytree(filename, local_filename)
else:
shutil.copy(filename, local_filename)
# Register the link with jobcontrol:
if self._backend: # Driver running under jobcontrol
# NOTE: For this file to be copied to launch dir
# correctly it needs to be in driver's job dir:
self._backend.addOutputFile(local_filename)
if filename == incorporatable_file:
self._backend.setStructureOutputFile(local_filename)
sys.stdout.flush()
def getUserOutputs(self):
"""
Return a list of pipeio objects that are to be presented to the
user at the end of the Pipeline run.
"""
outs = []
for varname in self._user_outputs:
fullname = '%s-%s' % (self.jobname, varname)
try:
obj = self._objects[fullname]
except KeyError: # Stage did not set this output object
continue
obj.name = varname
outs.append(obj)
return outs
def getStructureOutput(self):
if self._structure_output is None:
return None
else:
fullname = '%s-%s' % (self.jobname, self._structure_output)
try:
obj = self._objects[fullname]
except KeyError: # Stage did not set this output object
return None
obj.name = self._structure_output
return obj
def getUserOutputFiles(self):
"""
Return a list of files for all user (final) outputs of Pipeline.
"""
output_files = []
for obj in self.getUserOutputs():
output_files.extend(obj.getFiles())
return output_files
def printAction(self, stagename, stagejobid, action):
"""
Print an action for stage `stagename`. If `stagename` is None, print the
table header instead.
:param stagename:
    The name of the stage (None to print the header).
:param stagejobid:
    The jobid of the stage.
:param action:
    The latest action of the stage.
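A typical activity line looks like (illustrative values)::
    1 1 2 | MY_STAGE (<jobid>) launched.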
"""
# m is the width of the stage-count columns:
# m==1 when the number of stages is 1-9, 2 when 10-99, 3 when 100-999.
m = len(str(len(self.stagejobs)))
if stagename is None:
log("Stage activity keys...")
log(" C: Number of completed stages")
log(" A: Number of active/running stages")
log(" W: Number of waiting/pending stages")
line = "\n%*s %*s %*s | %s" % (m, "C", m, "A", m, "W",
"Stage name and activity")
log(line)
s = "-" * m
line = "%*s %*s %*s | %s" % (m, s, m, s, m, s,
"-----------------------")
log(line)
else:
new_statuses = [(stagejob.status) for stagejob in self.stagejobs]
num_completed = new_statuses.count(COMPLETED)
num_running = new_statuses.count(RUNNING)
num_waiting = new_statuses.count(WAITING)
line = "%*d %*d %*d | %s (%s) %s." % (m, num_completed, m,
num_running, m, num_waiting,
stagename, stagejobid, action)
log(line)
sys.stdout.flush()
def getUsedHosts(self, host_pool):
"""
Return a dictionary of hosts that are CURRENTLY checked out (used)
by all stages combined (within specified host_pool).
"""
used_hosts = {}
for stagejob in self.stagejobs:
if stagejob.host_pool == host_pool:
for hostname, njobs in stagejob.checked_out_hosts:
try:
used_hosts[hostname] += njobs
except KeyError:
used_hosts[hostname] = njobs
return used_hosts
def Restart(restart_file, restartbeg=False):
"""
Recover a saved Pipeline instance.
To change any settings, call setOptions() on the returned Pipeline before running it.
Returns a Pipeline instance recovered from the restart_file. You need
to call `pipeline.run()` in order to get the pipeline running.
:raise RuntimeError:
Raised if a Pipeline can't be loaded from the specified file.
:param restartbeg:
Whether to start failed stages from beginning.
"""
if not os.path.isfile(restart_file):
err = "Pipeline.Restart(): file does not exist: %s" % restart_file
raise IOError(err)
log('Pipeline: Loading from a saved state...')
try:
with open(restart_file, "rb") as fh:
pipeline = pickle.load(fh)
except Exception as err:
msg = str(err)
msg += '\n' + \
"Pipeline.Restart(): could not load job from restart file."
raise RuntimeError(msg)
log('Successfully loaded Pipeline from restart file.')
global updated_from_dump
updated_from_dump = {} # Reset the dictionary
# Update stages from the dump file if not WAITING:
for stagejob in pipeline.stagejobs:
if stagejob.status != WAITING:
stagejob.updateFromDump()
if updated_from_dump:
log(' Some stages were updated from dump files')
log('') # new-line to separate from pipeline.run() messages
if restartbeg:
# Ev:83981
pipeline.restart_from_beginning = True
return pipeline
# EOF