"""
This module provides infrastructure for expanding a stepper
workflow into an application for sharing and deploying with customers.
To use, subclass StepperApplication and implement the abstract methods (see
the docstring for StepperApplication for more info). By using StepperApplication,
you'll be implementing an interface with other stepper applications making
deployment and use of your stepper workflows much easier.
========
GLOSSARY
========
Constants dict: These are dictionaries providing values that are constant
within a deployment. These often define static/shared resources that
are available to all compute nodes on a cluster.
User dict: Dictionaries defining user inputs and settings for a particular
workflow run. These often include parameters for how to run a workflow
along with input files.
Configuration dict: Configuration dicts are the combination of a user
dict with a constants dict. They should define all information necessary
to run the stepper application.
"""
from ruamel import yaml
import sys
import argparse
from typing import Iterable
from schrodinger.job import launchapi
from schrodinger.job import jobcontrol
def _get_schrodinger_product():
    """
    Return the value following the `-FROM` flag in `sys.argv`.

    :returns: The token that immediately follows `-FROM`, or None when the
        flag is absent or is the last token on the command line (i.e. it has
        no value).
    """
    argv = sys.argv
    if '-FROM' not in argv:
        return None
    value_idx = argv.index('-FROM') + 1
    # A trailing `-FROM` with nothing after it is treated like a missing flag
    # rather than letting an IndexError escape.
    if value_idx >= len(argv):
        return None
    return argv[value_idx]
class StepperApplication:
    """
    Base class for all stepper applications. To use, subclasses *must*
    implement the following abstract methods:

        - runWorkflow
        - deploymentCheck
        - setUpTestUserDict
        - writeConfiguration

    The following methods are optional to implement but are highly
    recommended:

        - validate
        - getLocalInputFiles
        - getLocalInputFolders

    See the docstrings of the individual methods to see what is expected
    for each.

    Note that the docstrings for the abstract methods will be used as the help
    message for their associated subcommands
    (run/run_deployment_checks/test/write_config)

    After subclassing, you can expose your StepperApplication cmdline interface
    by creating a python file under `$SCHRODINGER/python/scripts` and
    setting `get_job_spec_from_args` and calling your app's `main` method.
    This is better described through example::

        # my_stepper_app.py
        class MyApp(StepperApplication):
            ... # Implement abstract methods here.

        get_job_spec_from_args = MyApp.get_job_spec_from_args

        if __name__ == '__main__':
            MyApp.main()
    """

    # Subcommands understood by the command line parser. Kept in one place so
    # that `parseArgs` and `_getParser` cannot drift apart.
    _SUBCOMMANDS = ('run', 'run_deployment_checks', 'test', 'write_config')

    @classmethod
    def validate(cls, config_dict: dict):
        """
        Validate that the `config_dict` is properly configured. This is
        where subclasses will construct a stepper workflow and
        call `validateSettings()`. If there are any issues with the
        configuration, it's expected that this method will raise an
        exception.

        When the application is run as a job, validation will be run
        on the job-launching machine. The job itself will skip validation
        by default. If an application would like to validate during
        job execution, it's free to call this method within `runWorkflow`.

        NOTE::
            This is a good place to call `my_workflow.report()` so users
            can see the topology of the workflow they will run.

        :param config_dict: The configuration dictionary to run the application.
            See the module glossary for more info.
        """
        pass

    @classmethod
    def getLocalInputFiles(cls, config_dict: dict) -> Iterable[str]:
        """
        Return the local files named in `config_dict` that should be
        registered as job input files when the application is launched as a
        job. The default implementation registers no extra files.

        :param config_dict: The configuration dictionary to run the application.
        :returns: An iterable of file paths.
        """
        return []

    @classmethod
    def getLocalInputFolders(cls, config_dict: dict) -> Iterable[str]:
        """
        Return the local folders named in `config_dict` that should be
        registered as job input directories when the application is launched
        as a job. The default implementation registers no extra folders.

        :param config_dict: The configuration dictionary to run the application.
        :returns: An iterable of directory paths.
        """
        return []

    @classmethod
    def runWorkflow(cls, config_dict: dict):
        """
        This method does the actual running of the stepper workflow(s)
        associated with this stepper application.

        Often times the workflows will generate output files that users are
        interested in. To add output files or folders to be brought back
        from a job run, use `registerOutputFile` and `registerOutputFolder`.

        :param config_dict: The configuration dictionary used for setting
            up and running the workflow. Usually includes settings and inputs.
        """
        raise NotImplementedError

    @classmethod
    def deploymentCheck(cls, constants_dict: dict):
        """
        This method checks that a particular deployment of this application is
        set up correctly. It's expected that SAs will run this method on new
        deployments before beginning to run small tests.

        Some potentially useful checks that can go here:

            - Confirm all static files noted in `constants_dict` exist
            - Confirm the license capacity of the license server
            - Confirm cloud service (aws/gcp) credentials are set up correctly

        :param constants_dict: Dictionary providing values that are constant
            within a deployment. These often define static/shared resources that
            are available to all compute nodes on a cluster.
        """
        raise NotImplementedError

    @classmethod
    def setUpTestUserDict(cls, constants_dict: dict, large=False):
        """
        This method is used to create a user_dict for starting a
        test run of the application. The user dict will be used in
        conjunction with `constants_dict` to create a configuration to
        start a test run.

        Implementations of this method should be able to create a
        user dict for both small and large runs. Small runs should ideally
        run in <30m but should still exercise as much functionality as possible
        of the workflow(s).

        :param constants_dict: Dictionary providing values that are constant
            within a deployment. These often define static/shared resources that
            are available to all compute nodes on a cluster.
        :param large: Whether to set up a large test run instead of a small
            one.
        :returns: The user dict for the test run.
        """
        raise NotImplementedError

    @classmethod
    def writeConfiguration(cls, user_dict: dict, constants_yaml: str,
                           config_fname: str):
        """
        Given a user_dict and a constants yaml file, implementations of this
        method should write out a fully-fledged configuration file at
        `config_fname`.

        :param user_dict: Dictionary defining user inputs and settings for a
            particular workflow run.
        :param constants_yaml: Filepath to yaml file providing values that are
            constant within a deployment. These often define static/shared
            resources that are available to all compute nodes on a cluster.
        :param config_fname: Where to write the configuration dict to.
        """
        raise NotImplementedError

    @classmethod
    def get_job_spec_from_args(cls, argv):
        """
        Build the job specification for `argv`.

        :param argv: Full command line, with the script name at index 0.
        :returns: The job spec produced by the builder from
            `get_job_spec_builder_from_args`, or None when no builder was
            created (e.g. a `--dry-run`).
        """
        if jsb := cls.get_job_spec_builder_from_args(argv):
            return jsb.getJobSpec()
        return None

    @classmethod
    def get_job_spec_builder_from_args(cls, argv):
        """
        Implements the LaunchAPI method get_job_spec_from_args. Sets up
        a job to run on a host. If the user specifies `--dry-run` however,
        validation will be run instead and no job will be submitted.

        Note that `argv` is modified in place: a `test` subcommand is
        rewritten into the equivalent `run` subcommand, and
        `--skip-validation` is appended once validation has been done here.

        :param argv: Full command line, with the script name at index 0.
        :returns: JobSpecificationArgsBuilder or None
        """
        args = cls.parseArgs(argv[1:])
        if args.subcmd == 'test':
            # Turn the test request into a plain `run <config>` command line.
            argv[1:] = cls._setUpRemoteTest(args)
            args = cls.parseArgs(argv[1:])

        if args.subcmd == 'run' and not args.skip_validation:
            config_dict = load_yaml(args.config_yaml)
            cls.validate(config_dict)
            if args.dry_run:
                return None
            # Validation already happened here, so the launched command line
            # is told to skip it.
            argv.append('--skip-validation')
            args = cls.parseArgs(argv[1:])

        inp_files = []
        inp_dirs = []
        # Only some subcommands define `config_yaml`/`constants_yaml`, hence
        # the getattr lookups.
        if getattr(args, 'config_yaml', None):
            inp_files.append(args.config_yaml)
            config_dict = load_yaml(args.config_yaml)
            inp_files.extend(cls.getLocalInputFiles(config_dict))
            inp_dirs.extend(cls.getLocalInputFolders(config_dict))

        schrodinger_product = _get_schrodinger_product()
        jsb = launchapi.JobSpecificationArgsBuilder(
            argv, schrodinger_product=schrodinger_product, use_jobname_log=True)
        if not jsb.getJobname():
            # Fall back to the application class name as the job name.
            jsb.setJobname(cls.__name__)

        if getattr(args, 'constants_yaml', None):
            inp_files.append(args.constants_yaml)
        for fname in inp_files:
            jsb.setInputFile(fname)
        for dir_ in inp_dirs:
            jsb.setInputDirectory(dir_)
        return jsb

    @classmethod
    def main(cls, args=None):
        """
        Command line entry point: parse `args` and dispatch to the handler of
        the selected subcommand.

        :param args: Command line arguments without the script name. When
            None, `sys.argv[1:]` is used.
        """
        parsed = cls.parseArgs(args=args)
        handler = getattr(parsed, 'func', None)
        if handler is None:
            # No subcommand was selected (e.g. an empty command line), so no
            # handler was attached; report a usage error instead of failing
            # with an AttributeError.
            cls._getParser().error(
                'a subcommand is required: ' + ', '.join(cls._SUBCOMMANDS))
        handler(parsed)

    @classmethod
    def parseArgs(cls, args):
        """
        Parse command line arguments for this application.

        If the first argument is not one of the known subcommands, `run` is
        assumed and prepended before parsing.

        :param args: Command line arguments without the script name. When
            None, `sys.argv[1:]` is used.
        :returns: The parsed `argparse.Namespace`.
        """
        parser = cls._getParser()
        if args is None:
            args = sys.argv[1:]
        if args and args[0] not in cls._SUBCOMMANDS:
            return parser.parse_args(args=['run'] + args)
        return parser.parse_args(args=args)

    @classmethod
    def registerOutputFile(cls, fname):
        """
        Register `fname` as an output file of the current job. Does nothing
        when `jobcontrol.get_backend()` returns no backend.

        :param fname: Path of the output file.
        """
        if backend := jobcontrol.get_backend():
            backend.addOutputFile(fname)

    @classmethod
    def registerOutputFolder(cls, dir_):
        """
        Register `dir_` as an output folder of the current job. Does nothing
        when `jobcontrol.get_backend()` returns no backend.

        :param dir_: Path of the output folder.
        """
        if backend := jobcontrol.get_backend():
            # NOTE(review): folders go through the same backend call as
            # files; confirm the backend accepts directories here.
            backend.addOutputFile(dir_)

    @classmethod
    def _getParser(cls):
        """
        Build the argument parser with one subparser per subcommand. Each
        subparser stores its handler in the `func` default.
        """
        raw_fmt = argparse.RawDescriptionHelpFormatter
        parser = argparse.ArgumentParser(description=cls.__doc__,
                                         formatter_class=raw_fmt)
        subparsers = parser.add_subparsers(dest='subcmd')

        subparser_run = subparsers.add_parser(
            'run',
            description=cls.runWorkflow.__doc__,
            formatter_class=raw_fmt)
        subparser_run.add_argument('config_yaml', type=str)
        # A dry run only validates, so it cannot be combined with skipping
        # validation.
        grp = subparser_run.add_mutually_exclusive_group()
        grp.add_argument('--dry-run', action='store_true')
        grp.add_argument('--skip-validation', action='store_true')
        subparser_run.set_defaults(func=cls._invokeRunWorkflow)

        subparser_write_config = subparsers.add_parser(
            'write_config',
            description=cls.writeConfiguration.__doc__,
            formatter_class=raw_fmt)
        subparser_write_config.add_argument('user_yaml', type=str)
        subparser_write_config.add_argument('constants_yaml', type=str)
        subparser_write_config.add_argument('config_yaml', type=str)
        subparser_write_config.set_defaults(func=cls._invokeWriteConfig)

        subparser_run_deployment_checks = subparsers.add_parser(
            'run_deployment_checks',
            description=cls.deploymentCheck.__doc__,
            formatter_class=raw_fmt)
        subparser_run_deployment_checks.add_argument('constants_yaml', type=str)
        subparser_run_deployment_checks.set_defaults(
            func=cls._invokeDeploymentCheck)

        subparser_test = subparsers.add_parser(
            'test',
            description=cls.setUpTestUserDict.__doc__,
            formatter_class=raw_fmt)
        subparser_test.add_argument('constants_yaml', type=str)
        subparser_test.add_argument('--large',
                                    action='store_true',
                                    default=False)
        subparser_test.set_defaults(func=cls._invokeTest)
        return parser

    @classmethod
    def _invokeWriteConfig(cls, args):
        """Handler for `write_config`: load the user yaml and write the config."""
        user_dict = load_yaml(args.user_yaml)
        cls.writeConfiguration(user_dict, args.constants_yaml, args.config_yaml)

    @classmethod
    def _invokeDeploymentCheck(cls, args):
        """Handler for `run_deployment_checks`: load constants and check them."""
        constants_dict = load_yaml(args.constants_yaml)
        cls.deploymentCheck(constants_dict)

    @classmethod
    def _invokeTest(cls, args):
        """
        Handler for `test`: build a test user dict, write `config.yaml` from
        it, then validate and run the workflow with that configuration.
        """
        constants_dict = load_yaml(args.constants_yaml)
        user_dict = cls.setUpTestUserDict(constants_dict, args.large)
        cls.writeConfiguration(user_dict, args.constants_yaml, 'config.yaml')
        config_dict = load_yaml('config.yaml')
        cls.validate(config_dict)
        cls.runWorkflow(config_dict)

    @classmethod
    def _setUpRemoteTest(cls, args):
        """
        Write the configuration file for a test run that will be launched as
        a job.

        :param args: Parsed arguments of the `test` subcommand.
        :returns: The argument list of the equivalent `run` subcommand.
        """
        # setUpTestUserDict expects the loaded constants dict, not the path
        # to the yaml file.
        constants_dict = load_yaml(args.constants_yaml)
        user_dict = cls.setUpTestUserDict(constants_dict, args.large)
        config_fname = 'test_config.yaml'
        cls.writeConfiguration(user_dict, args.constants_yaml, config_fname)
        return ['run', config_fname]

    @classmethod
    def _invokeRunWorkflow(cls, args):
        """
        Handler for `run`: validate the configuration unless
        `--skip-validation` was given, then run the workflow unless
        `--dry-run` was given.
        """
        config_dict = load_yaml(args.config_yaml)
        if not args.skip_validation:
            cls.validate(config_dict)
        if not args.dry_run:
            cls.runWorkflow(config_dict)
def load_yaml(filename):
    """
    Read the yaml file at `filename` and parse it with ruamel's round-trip
    loader.

    :param filename: Path of the yaml file to read.
    :returns: The parsed yaml document.
    """
    with open(filename) as stream:
        text = stream.read()
    return yaml.load(text, Loader=yaml.RoundTripLoader)
def write_yaml(config_dict, filename):
    """
    Dump `config_dict` to `filename` as yaml using ruamel's round-trip
    dumper. Any existing file is overwritten.

    :param config_dict: The data to serialize.
    :param filename: Path of the yaml file to write.
    :returns: Whatever `yaml.dump` returns when given an output stream.
    """
    with open(filename, 'w') as stream:
        dumped = yaml.dump(config_dict, stream, Dumper=yaml.RoundTripDumper)
    return dumped