"""
Steps for use in Workflows
Copyright Schrodinger, LLC. All rights reserved.
"""
import glob
import math
import os
import pathlib
import shutil
from schrodinger import structure
from schrodinger.application.desmond import cms
from schrodinger.application.macromodel import utils as mmodutils
from schrodinger.application.matsci import desmondutils
from schrodinger.application.matsci import fragments
from schrodinger.application.matsci import \
jaguar_multistage_workflow_utils as jmswfu
from schrodinger.application.matsci import jaguarworkflows
from schrodinger.application.matsci import jobutils
from schrodinger.application.matsci import msutils
from schrodinger.application.matsci import parserutils
from schrodinger.application.matsci import msprops
from schrodinger.application.matsci import uq_utils
from schrodinger.infra import mm
from schrodinger.job import jobcontrol
from schrodinger.job import queue
from schrodinger.utils import fileutils
from schrodinger.utils import imputils
from schrodinger.utils import sea
# Import drivers
def import_driver(dpath):
    """
    Import a driver module that lives under the mmshare scripts directory

    :param str dpath: Path to the driver file, relative to the directory
        returned by fileutils.get_mmshare_scripts_dir()

    :return: The result of imputils.import_module_from_file for the joined
        path (presumably the imported module object)
    """
    driver_file = os.path.join(fileutils.get_mmshare_scripts_dir(), dpath)
    return imputils.import_module_from_file(driver_file)
# Sentinel flag value: BaseStep.createCommand leaves any flag whose value
# equals this constant out of the command line
DO_NOT_USE_FLAG = 'do_not_use_this_flag'
# Structure property read by BaseStep.processOutput. Its value is split on
# whitespace into property names to save to the workflow properties.
SAVE_PROPS_PROPERTY = 's_matsci_save_props'
class JobCreationError(Exception):
    """ Signals that the job for a step could not be created """
class BaseWorkflow(jaguarworkflows.WorkFlow):
    """
    Base Workflow class

    Unlike the parent class, the first step is not assumed to be the main
    step. Set the main_step property to the step that should receive all the
    workflow properties.

    The main step is the only step placed in the main PT group; every other
    step goes in a "subjobs" subgroup.
    """

    def __init__(self, *args, **kwargs):
        """
        Create a BaseWorkflow, defaulting the subhierarchy keyword to
        'subjobs' when the caller does not supply it

        See parent class for additional documentation
        """
        if 'subhierarchy' not in kwargs:
            kwargs['subhierarchy'] = 'subjobs'
        super().__init__(*args, **kwargs)
class SubdirectoryStepMixin:
    """
    Mixin providing shared behavior for steps that run inside their own
    subdirectory
    """

    def __init__(self, *args, **kwargs):
        """ See parent class for documentation """
        super().__init__(*args, **kwargs)
        # Names of input files (relative to the step subdirectory) that
        # should be registered with the backend as output files
        self.input_files = []

    def finishProcessingJobControlJob(self):
        """ Register all the subjob files with the job control backend """
        backend = jobcontrol.get_backend()
        if not backend:
            return
        if not self.USES_JC:
            # Subprocess jobs don't have a way to record files to copy
            # back, so just copy back the entire directory
            for path in glob.iglob(f'{self.job_name}/*'):
                backend.addOutputFile(path)
            return
        for name in self.input_files:
            backend.addOutputFile(os.path.join(self.job_name, name))
        jobutils.add_subjob_files_to_backend(self.job, path=self.job_name)

    def setSubDir(self):
        """
        Store the path to this step's subdirectory in self.subdir so it can
        be reached regardless of the current working directory. The path is
        rooted at the backend job directory when running under job control,
        otherwise at the current working directory.
        """
        backend = jobcontrol.get_backend()
        job_path = backend.getJob().JobDir if backend else os.getcwd()
        self.subdir = os.path.join(job_path, self.job_name)
class JaguarOptimization(SubdirectoryStepMixin, jaguarworkflows.OptStep):
    """
    Step that runs a Jaguar optimization and possibly thermochemistry

    The parent class expects the job to run in the main workflow job
    directory. Many parent methods are overridden here so that the step runs
    in its own subdirectory instead.
    """

    STEP_NAME = 'Optimization'
    JOB_BASE = 'opt'
    USES_JC = True

    def __init__(self, *args, **kwargs):
        """ See parent class for documentation """
        super().__init__(*args, **kwargs)
        if not self.job_name:
            self.job_name = self.workflow.base_name + '_' + self.JOB_BASE
        if not self.step_name:
            self.step_name = self.STEP_NAME
        self.setSubDir()

    def createJob(self):
        """ Create the parent's job, then point it at the subdirectory """
        job = super().createJob()
        job._command_dir = self.subdir
        return job

    def start(self):
        """ Run the parent start method from inside the subdirectory """
        os.makedirs(self.subdir, exist_ok=True)
        with fileutils.chdir(self.subdir):
            return super().start()

    def getOutput(self, *args, **kwargs):
        """
        Run the parent getOutput method from inside the subdirectory and
        prefix the results path with the subdirectory
        """
        os.makedirs(self.subdir, exist_ok=True)
        with fileutils.chdir(self.subdir):
            result = super().getOutput(*args, **kwargs)
        if self.results:
            self.results.path = os.path.join(self.subdir, self.results.path)
        return result

    def createSmapFile(self):
        """
        Write the smap files that let surfaces import automatically when the
        structure is manually imported into Maestro
        """
        # Note that this method is not currently used, but is left here as an
        # example of how to fix subjobs run in a subdirectory so that surfaces
        # and vibrational files can be attached properly to the structures in
        # the main results file in the main job directory.
        # Writing the smap file manually is necessary due to JAGUAR-9464
        vis_stems = ('potential', 'density')

        def _write_lines(smap_name, st_name, stub, num):
            # Assemble the whole smap text, then write it in one call
            pieces = ['# smap version 1.0\n', st_name + '\n']
            pieces.extend(f'{stub}_{stem}.vis: {num}\n' for stem in vis_stems)
            pieces.append('#end\n')
            with open(smap_name, 'w') as sfile:
                sfile.write(''.join(pieces))

        # First smap: for the subjob .01.mae file in the subjob subdirectory.
        # While not technically necessary, it lets the user import that
        # structure manually and get the associated data. The structure
        # index is 1 on the expectation that the .01.mae file holds a single
        # structure.
        sub_smap = self.job_name + '.01.smap'
        with fileutils.chdir(self.subdir):
            _write_lines(sub_smap, self.job_name + '.01.mae', self.job_name, 1)
        jobutils.add_outfile_to_backend(os.path.join(self.subdir, sub_smap))

        # Second smap: for the main output structure file
        main_st = self.workflow.output_name
        main_stub = fileutils.get_basename(main_st)
        main_smap = main_stub + '.smap'
        # The index should be that of the structure in the main results file.
        # The value 2 is completely made up for this example.
        _write_lines(main_smap, main_st, main_stub, 2)

        # Copy the .vis files to the main directory and add the main
        # directory files to the backend
        for stem in vis_stems:
            source = os.path.join(self.subdir, f'{self.job_name}_{stem}.vis')
            target = f'{main_stub}_{stem}.vis'
            shutil.copy(source, target)
            jobutils.add_outfile_to_backend(target)
        jobutils.add_outfile_to_backend(main_smap)
class BaseResults:
    """
    Minimal results class supplying the part of the jaguarworkflows.Results
    functionality that non-Jaguar jobs still need
    """

    def __init__(self, parent):
        """
        Create a BaseResults instance

        :param `jaguarworkflows.Step` parent: The step these results belong to
        """
        self.parent = parent

    def getMaeStructure(self):
        """
        Get the resulting Maestro structure from the parent step

        :return: Whatever the parent's getStructForWriting method returns
        """
        step = self.parent
        return step.getStructForWriting()
class BaseStep(SubdirectoryStepMixin, jaguarworkflows.Step):
    """
    A base class for Steps that are not simple Jaguar jobs. Because this step
    fires off a Python script that itself fires off Jaguar jobs, we have to
    modify a number of methods that dealt with the jaguar .in file directly or
    expected standard Jaguar output.
    """

    PROCS = 1
    RESULTS_CLASS = BaseResults
    STEP_NAME = 'Base Step'
    JOB_BASE = 'base'

    # When no full_command is supplied, createCommand forms the command line
    # in the following order:
    #   - driver path (if not None)
    #   - flags in the self.flags dictionary, which is formed from:
    #       - The class variable FLAGS
    #       - flags passed in to the __init__ via the flags keyword (which
    #         override values in the class variable FLAGS)
    #   - -TPP flag if FLAGS_ADD_TPP is True and the workflow TPP option is
    #     greater than 1
    #   - "--" if FLAGS_ADD_DOUBLE_DASH is True
    #   - FLAGS_INPUT_NAME_FLAG if it is not blank
    #   - input name if FLAGS_ADD_INPUT_NAME is True
    #   - values in the self.cmd_args list
    #   - -JOBNAME flag if FLAGS_ADD_JOB_NAME is True
    DRIVER_PATH = 'driver_dir/driver.py'
    DRIVER = None

    # For "-flag value" type flags, use FLAG[flag] = value
    # For "-flag_with_no_value" flags, use FLAG[flag] = None
    FLAGS = {}

    # Add the string given by getInputName() after the command line flags
    FLAGS_ADD_INPUT_NAME = True

    # Preface the input name with this command line flag - leave blank if the
    # input name is a positional parameter rather than prefaced by a -flag
    FLAGS_INPUT_NAME_FLAG = ""

    # Add -JOBNAME self.job_name after the command line flags
    FLAGS_ADD_JOB_NAME = True

    # Add -TPP options.TPP after the command line flags
    FLAGS_ADD_TPP = False

    # Use -- right before input file name to ensure keywords are terminated
    FLAGS_ADD_DOUBLE_DASH = False

    # Step might create a trajectory based on flag settings
    CAN_CREATE_TRAJECTORY = False

    # If step requires CMS input
    REQUIRES_CMS_INPUT = False

    # If step requires TRAJECTORY input
    REQUIRES_TRAJECTORY_INPUT = False

    # If the step outputs CMS
    OUTPUTS_CMS = False

    # If the step uses Job Control
    USES_JC = True

    # Property names in SAVE_PROPS will be saved to the main workflow structure
    SAVE_PROPS = set()

    # Property names that start with anything in SAVE_PROP_STARTS will be saved
    # to the main workflow structure
    SAVE_PROP_STARTS = set()

    def __init__(self,
                 *args,
                 procs=PROCS,
                 tag=None,
                 flags=None,
                 archive=False,
                 monitor_globs=None,
                 full_command=None,
                 local_files=None,
                 input_file=None,
                 **kwargs):
        """
        Create a BaseStep object

        :param int procs: The number of processors to allocate to steps of this
            class

        :param tag: str(tag) will be added to the end of the job name and step
            name

        :param dict flags: Command line argument flags in addition to, or that
            override, the class FLAGS variable. Keys are flag names ("-flag"),
            values are values for that flag. For flags that take no value, use
            None as the value. To turn off (not include in the command line) a
            flag in the class FLAGS variable, use the DO_NOT_USE_FLAG constant
            as the value.

        :param bool archive: If True, instead of copying the subdirectory and
            individual files, will tar.gz up the entire subdirectory and copy
            that back.

        :param list monitor_globs: List of valid file globs (i.e. `["*.zip"]`)
            that should be monitored for and copied back to the original job
            directory when they appear. Note that the interaction of archive and
            monitor_globs is such that if both are specified, the result will be
            a subdirectory with the monitored files AND an archive of the entire
            subdirectory (including any monitored files that remain when this
            step completes).

        :param list full_command: the full command line for this step including
            the driver path, etc. but without $SCHRODINGER/run. The command will
            be used "as is" except for replacing some pre-defined tokens with
            job-specific strings - see the createCommand function. Each item of
            the list is a word of the command line invocation.

        :param list local_files: A list of file paths to copy into the job
            subdirectory before running the command

        :param str input_file: The name of the file containing the input
            structures/data. It is ignored if there is a parent for this step.

        See parent class for additional documentation
        """
        # NOTE(review): the procs default is bound to BaseStep.PROCS when this
        # method is defined, so a subclass that only overrides PROCS does not
        # change the default - confirm that is intended.
        self.procs = procs
        super().__init__(*args, **kwargs)
        # self.results is set to None in the super class so set it afterwards
        self.results = self.RESULTS_CLASS(self)
        self.job_name = self.job_name or (self.workflow.base_name + '_' +
                                          self.JOB_BASE)
        self.step_name = self.step_name or self.STEP_NAME
        # Check tag not None instead of "if tag" because it could be 0
        if tag is None:
            self.tag = tag
        else:
            self.tag = str(tag)
            self.job_name += '_' + self.tag
            self.step_name += ' ' + self.tag
        self.setSubDir()
        # Copy so that per-instance changes never leak into the class FLAGS
        self.flags = self.FLAGS.copy()
        if flags:
            self.flags.update(flags)
        # These are command line positional arguments that will appear after all
        # the items in self.flags and after the input file name (if requested).
        self.cmd_args = []
        self.full_command = full_command
        self.archive = archive
        self.monitor_globs = monitor_globs
        self.files_monitored = set()
        self.cmd = []
        self.local_files = local_files or []
        # Associate with input file only if there is no parent
        self.input_file = None if self.parent else input_file

    def getParentalSubdirectories(self):
        """
        Get the subdirectory names for parents and required stages

        :rtype: str, str
        :return: The parent subdirectory, and a space-separated string of all
            required stage subdirectories. The subdirectory paths are relative
            to the overall job directory. Each is an empty string if there is
            no such parent.
        """
        if self.parent:
            parent_subdir = os.path.basename(self.parent.subdir)
        else:
            parent_subdir = ""
        if self.noninheritable_parents:
            requires_subdirs = ' '.join(
                os.path.basename(nih_parent.subdir)
                for nih_parent in self.noninheritable_parents)
        else:
            requires_subdirs = ""
        return parent_subdir, requires_subdirs

    def addParentalSubdirProperties(self, struct):
        """
        Add parental subdirectory properties to the structure

        :param `schrodinger.structure.Structure` struct: The structure to add
            properties to
        """
        parent_subdir, requires_subdirs = self.getParentalSubdirectories()
        if parent_subdir:
            struct.property[msprops.PARENT_SUBDIRECTORY] = parent_subdir
        if requires_subdirs:
            struct.property[msprops.REQUIRES_SUBDIRECTORIES] = requires_subdirs

    def start(self):
        """
        Start the job - create the input and write it, adding necessary output
        files to make sure they get copied back
        """
        # First check to see if there is an existing output file for this step -
        # this might have been created in a previous aborted run.
        if self.getOutput(quiet=True):
            self.log('Completed output found, will not run again')
            self.finish()
            return
        os.makedirs(self.subdir, exist_ok=True)
        for lfile in self.local_files:
            shutil.copy(lfile, self.subdir)
        struct = self.getStructure()
        with fileutils.chdir(self.subdir):
            self.writeInput(struct)
            try:
                self.job = self.createJob()
            except JobCreationError:
                self.log('Will not be run')
                return
        self.workflow.jobq.addJob(self.job)
        self.log('Added to job queue')

    def removeIncomingProperties(self, struct):
        """
        Remove properties on the structure that may have come from previous
        steps and that we do not want to propagate through this step

        :param `structure.Structure` struct: The structure to remove properties
            from
        """
        if isinstance(struct, cms.Cms):
            # If this is a CMS-type structure, remove the trajectory property
            struct.remove_cts_property(desmondutils.PROP_TRJ)
        else:
            # This is an mae structure, perhaps from Jaguar
            msutils.remove_properties(struct, matches=['_j_'])

    def getStructure(self):
        """
        Get the starting structure for this step

        Overrides the parent class to get the initial structure for this
        workflow if the Step has no parent.

        :rtype: schrodinger.structure.Structure or None
        :return: The starting structure for this step. None if input is not a
            structure.
        """
        struct = super().getStructure()
        if not struct:
            struct = self.workflow.struct
        # In case there is no parent input structure and step input structure.
        if struct:
            self.removeIncomingProperties(struct)
        return struct

    def _getOutputData(self):
        """
        Return the list of output structures from this calculation

        :rtype: list or None
        :return: The output structures from this step, or None if the output
            file does not exist
        """
        oname = os.path.join(self.subdir, self.getOutputName())
        if not os.path.exists(oname):
            return
        output = list(structure.StructureReader(oname))
        return output

    def getOutput(self, quiet=False):
        """
        Read in the results of the calculation

        :type quiet: bool
        :param quiet: If True, no error messages will be printed. If False,
            (default) error messages will be printed. Also, if True, self.ok
            will not be set to False if the output file cannot be read.

        :rtype: None or list
        :return: None if the calculation failed, or a list of output structures
            from a successful calculation.
        """
        if not self.ok:
            return None
        if not quiet:
            # Assume failure until the output has actually been read
            self.ok = False
        output = self._getOutputData()
        if not output:
            if not quiet:
                oname = os.path.join(os.path.basename(self.subdir),
                                     self.getOutputName())
                self.log(f'No output structures found, looked for {oname}')
            return None
        self.ok = True
        self.finished = True
        return output

    def getStructForWriting(self):
        """
        Get the structure from this step to write to the final Workflow file

        :rtype: `schrodinger.structure.Structure` or None
        :return: The first output structure, or None if there is no output
        """
        structs = self._getOutputData()
        if structs:
            return structs[0]
        else:
            return None

    def getOutputName(self):
        """
        Get the name of the output structure file

        :rtype: str
        :return: The output structure file name
        """
        return self.job_name + '-out.maegz'

    def finish(self):
        """
        Finish this step, including processing any output
        """
        output = self.getOutput()
        self.processOutput(output)
        self.finished = True
        self.log('Work completed')

    def write(self, writer, props=None, hierarchy=None):
        """
        Add the final structure for this step to the output structure file

        :type writer: schrodinger.StructureWriter
        :param writer: The writer to use to write the structure

        :type props: dict
        :param props: A dictionary of property/value pairs to add to the
            property dictionary of this object.

        :param list hierarchy: The project group hierarchy for this result -
            each item is a str
        """
        if self.ok and self.finished:
            struct = self.getStructForWriting()
            if not struct:
                self.log('Unable to obtain results')
                return
            struct.title = self.job_name
            # Ensure the structure does not have an entry name of
            # Scratch, which prevents the structure from being incorporated
            struct.property['s_m_entry_name'] = self.job_name
            if props:
                struct.property.update(props)
            if hierarchy:
                msutils.set_project_group_hierarchy(struct,
                                                    hierarchy,
                                                    collapsed=True)
            # Add a property to help downstream analysis find files from this
            # subjob when given the overall job directory
            subbase = os.path.basename(self.subdir)
            struct.property[msprops.SUBDIRECTORY_PROP] = subbase
            writer.append(struct)

    def createCommand(self):
        """
        Create the command line

        If a full command was supplied it is used as given, except that the
        input mask is replaced with the input name minus its extension and
        the trajectory mask is replaced with the parent step's trajectory
        name. Otherwise the command is assembled from the driver path, flags
        and class settings.

        :rtype: list
        :return: The command line in list form

        :raise JobCreationError: If the command cannot be created
        """
        if self.full_command:
            # Use the command exactly as specified
            # Replacing the input name
            input_name = self.getInputName()
            ibase = fileutils.splitext(input_name)[0]
            mask = jmswfu.CommandLine.INPUT_MASK
            cmd = [x.replace(mask, ibase) for x in self.full_command]
            # Replace the parent step trajectory info. Look for the mask
            # inside each word, not just as a whole word, because the
            # replacement below (like the input mask replacement above)
            # works on substrings.
            tmask = jmswfu.CommandLine.TRAJ_MASK
            if any(tmask in word for word in cmd):
                if not self.parent:
                    self.log(f'Command contains {tmask} but this step has no '
                             'parent so there is no parent trajectory.')
                    raise JobCreationError(f'No parent for {tmask}')
                try:
                    trajdir = self.parent.getTrajectoryName()
                except AttributeError:
                    # The parent step type has no trajectory support
                    trajdir = None
                if not trajdir:
                    self.log('Failed to create job because command contains '
                             f'{tmask} but parent step has no trajectory '
                             'information.')
                    raise JobCreationError(f'Unable to replace {tmask}')
                if not os.path.exists(trajdir):
                    self.log('Cannot find parent trajectory required for this '
                             f'step: {trajdir}')
                    raise JobCreationError(f'Unable to find {trajdir}')
                cmd = [x.replace(tmask, trajdir) for x in cmd]
        else:
            cmd = []
            if self.DRIVER_PATH:
                cmd += [self.DRIVER_PATH]
            for flag, value in self.flags.items():
                if value == DO_NOT_USE_FLAG:
                    continue
                cmd += [flag]
                if value is not None:
                    cmd += [str(value)]
            if (self.FLAGS_ADD_TPP and self.workflow.options.TPP and
                    self.workflow.options.TPP > 1):
                cmd += [parserutils.FLAG_TPP, str(self.workflow.options.TPP)]
            # Add to ensure parsing of the keywords ends before positional
            # arguments like the input file
            if self.FLAGS_ADD_DOUBLE_DASH:
                cmd += ['--']
            # Add the input file name
            if self.FLAGS_INPUT_NAME_FLAG:
                cmd += [self.FLAGS_INPUT_NAME_FLAG]
            if self.FLAGS_ADD_INPUT_NAME:
                cmd += [self.getInputName()]
            cmd.extend(self.cmd_args)
            if self.FLAGS_ADD_JOB_NAME:
                cmd += ['-JOBNAME', self.job_name]
        return cmd

    def createJob(self):
        """
        Create the job command and object. The command is stored in self.cmd.

        :rtype: `jobutils.RobustSubmissionJob` or `queue.SubprocessJob`
        :return: The job object to run

        :raise JobCreationError: If the command or job cannot be created
        """
        self.cmd = self.createCommand()
        return self.createQJobFromCommand(self.cmd)

    def createQJobFromCommand(self, cmd):
        """
        Create the JobDJ job from the list of command line arguments. The job
        will be set to run in the Step subdirectory.

        :param list cmd: The command line in list form

        :rtype: `jobutils.RobustSubmissionJob` or `queue.SubprocessJob`
        :return: A job object that can be added to JobDJ

        :raise JobCreationError: If the step does not use job control but
            asks for more than one processor
        """
        if self.USES_JC:
            return jobutils.RobustSubmissionJob(cmd,
                                                command_dir=self.subdir,
                                                procs=self.procs)
        else:
            if self.procs > 1:
                raise JobCreationError('Cannot run steps that do not use '
                                       'job control but use more than 1 '
                                       'processor')
            return queue.SubprocessJob(cmd, command_dir=self.subdir)

    def processOutput(self, output):
        """
        Process the output of a job during the finish part of the step

        Copies to the workflow properties every property of the first output
        structure that is named in SAVE_PROPS, is named in the structure's
        SAVE_PROPS_PROPERTY value, or starts with an item of SAVE_PROP_STARTS.

        :param output: The output of the job. Type may vary in subclasses
        """
        if not output:
            return
        struct = output[0]
        propnames = struct.property.get(SAVE_PROPS_PROPERTY, "")
        userprops = set(propnames.split())
        if not any((self.SAVE_PROPS, userprops, self.SAVE_PROP_STARTS)):
            return
        # str.startswith accepts a tuple and is False for an empty tuple
        starts = tuple(self.SAVE_PROP_STARTS)
        for prop, value in struct.property.items():
            if (prop in self.SAVE_PROPS or prop in userprops or
                    prop.startswith(starts)):
                self.workflow.properties[prop] = value

    def archiveSubDir(self):
        """
        Archive this step's subdirectory and add it to the backend for copy back
        """
        aname = self.subdir + '.tar.gz'
        jobutils.archive_job_data(aname, [self.subdir])
        jobutils.add_outfile_to_backend(aname)

    def finishProcessingJobControlJob(self):
        """
        Override the parent method to archive the directory if requested
        """
        if self.archive:
            self.archiveSubDir()
        else:
            super().finishProcessingJobControlJob()

    def periodicMaintenance(self):
        """ This method is periodically called while the workflow is running """
        self.monitorFiles()

    def monitorFiles(self):
        """
        Check for any requested files that need to be copied back immediately
        """
        if not self.monitor_globs:
            return
        backend = jobcontrol.get_backend()
        if not backend:
            return
        # Run the recursive glob from the subjob directory not the main
        # directory - we only want to find files for this step
        with fileutils.chdir(self.subdir):
            for mglob in self.monitor_globs:
                for mfile in pathlib.Path().rglob(mglob):
                    # rglob results are relative to the subjob directory, so
                    # prefix them with it.
                    # NOTE(review): the original comment said jobcontrol
                    # paths must be relative to the main directory, but
                    # self.subdir is built from the full job path in
                    # setSubDir - confirm copyOutputFile accepts this.
                    mpath = os.path.join(self.subdir, mfile)
                    if mpath not in self.files_monitored:
                        backend.copyOutputFile(mpath)
                        self.files_monitored.add(mpath)
class MacromodelConfSearchStep(BaseStep):
    """ Step that runs a MacroModel conformational search """

    STEP_NAME = 'ConfSearch'
    JOB_BASE = 'csearch'
    DRIVER_PATH = 'bmin'
    FLAGS = {}
    FLAGS_ADD_INPUT_NAME = False
    FLAGS_ADD_JOB_NAME = False

    def __init__(self, *args, **kwargs):
        """ See parent class for documentation """
        super().__init__(*args, **kwargs)
        # The job name is passed as a positional command line argument
        self.cmd_args.append(self.job_name)

    def getComUtil(self):
        """
        Build the macromodel com utility holding the settings to write

        :rtype: `schrodinger.application.macromodel.utils.comUtil`
        :return: The comUtil object with settings
        """
        # Note - these settings reproduce
        # the Conformational search GUI default settings when no solvent is
        # selected
        mcu = mmodutils.ComUtil(
            serial=True,
            ffld=desmondutils.OPLS3,
            # Energy window for saving structures (kJ/mol)
            demx_final=21.0,
            demx_prelim=42.0,
            # Write the EXNB line (cutoff parameters)
            exnb=False,
            # Don't write the NANT line (enantiomers)
            nant=True)
        mcu.serial_nonconf_input = True
        # (opcode attribute, argument index, value), applied in order
        opcode_settings = (
            ('BDCO', 5, 41.5692),
            # pure low modes explored
            ('LMCS', 3, 0),
            # allowed approach distance in A
            ('LMCS', 6, 0),
            ('DEMX', 2, 833),
            ('MCNV', 2, 5),
            # Do not use symmetry library
            ('MSYM', 1, 0),
            # Number of steps per rotatable bond
            ('AUOP', 5, 100),
            # tors, new setup
            ('AUTO', 5, 0.0),
            # minimum ring size to setup
            ('AUTO', 7, 0),
            # 'non-restrictive' sampling (MacroModel 9.0 default)
            ('AUTO', 8, 1),
            # max energy diff required to skip geometric comp
            ('CRMS', 5, 0.0),
            # max dist between 'equivalent' corresponding atoms
            ('CRMS', 6, 0.5),
            # maximum dihedral angle change for polar hydrogens
            ('CRMS', 7, 0.0),
            # minimize converge on gradient
            ('CONV', 5, 0.05),
            # iterations
            ('MINI', 3, 2500),
            # hessian cutoff, where appropriate
            ('MINI', 6, 0.0),
        )
        for opcode, argnum, value in opcode_settings:
            getattr(mcu, opcode)[argnum] = value
        return mcu

    def writeCom(self):
        """
        Write out the Macromodel com file and record it as an input file
        """
        com_name = self.getComName()
        self.getComUtil().mcmmlmod(mae_file=self.getInputName(),
                                   com_file=com_name,
                                   out_file=self.getOutputName())
        self.input_files.append(com_name)

    def getComName(self):
        """
        Get the name of the com file for this step

        :rtype: str
        :return: The name of the com file
        """
        return f'{self.job_name}.com'
class VoidStep(BaseStep):
    """ Step that runs the Distributed Voids driver """

    STEP_NAME = 'Distribute voids'
    JOB_BASE = 'voids'
    DRIVER_PATH = 'distribute_voids_gui_dir/distribute_voids_driver.py'
    DRIVER = import_driver(DRIVER_PATH)
    FLAGS = {
        DRIVER.FLAG_VOID_PCT: 10,
        DRIVER.FLAG_FORCE: None,
    }

    def getOutputName(self):
        """
        Get the name of the output structure file

        :rtype: str
        :return: The job name followed by '-voids.maegz'
        """
        return f'{self.job_name}-voids.maegz'
class BDEStep(BaseMetaJaguarWorkflowStep):
    """ A step for running the Bond Dissociation Energy workflow """

    # Jaguar keywords, passed to the driver via the -keywords flag
    KEYS = {
        'dftname': 'B3LYP',
        'basis': 'MIDIXL',
        'igeopt': 1,
        'iaccg': 3,
        'maxit': 100,
        'nofail': 1,
        'isymm': 0,
        'nops_opt_switch': 10
    }
    STEP_NAME = 'BDE'
    JOB_BASE = 'bde'
    DRIVER_PATH = 'bond_dissociation_gui_dir/bond_dissociation_driver.py'
    DRIVER = import_driver(DRIVER_PATH)
    FLAGS = {'-states': 's0', '-bonds': None, '-allow_hx': 'no'}
    FLAGS_ADD_DOUBLE_DASH = True

    def __init__(self, *args, **kwargs):
        """
        Create a BDEStep instance.

        See parent class for additional documentation.
        """
        super().__init__(*args, **kwargs)
        keystring = msutils.keyword_dict_to_string(self.KEYS)
        self.flags['-keywords'] = keystring
        # The output file name is passed as a positional argument
        self.cmd_args.append(self.getOutputName())

    def getStructsOfType(self, stype):
        """
        A generator for structures of the given type from BDE results

        :param str stype: The type of structure to get 'Reactants', 'Reactions',
            'Fragments'

        :rtype: Iterator
        :return: Iterator of `schrodinger.Structure` for the request type.
            Yields nothing if there is no output.
        """
        # Read the output once and iterate over that result
        structs = self.getOutput()
        if not structs:
            return
        for struct in structs:
            archy = msutils.get_project_group_hierarchy(st=struct)
            # A structure with no group hierarchy cannot match any type
            if archy and archy[-1] == stype:
                yield struct

    def processOutput(self, output):
        """
        Find the minimum BDE and store it as a workflow property

        :param output: Unused, kept for API compatibility
        """
        struct = self.getStructForWriting()
        if struct is None:
            # No reactant structure in the output, nothing to record
            return
        bde = struct.property.get(msprops.WEAKEST_BDE_PROP)
        if bde is not None:
            self.workflow.properties[msprops.WEAKEST_BDE_PROP] = bde

    def getStructForWriting(self):
        """
        Get the structure to write for this step. It will be the reactant
        structure

        :rtype: `schrodinger.Structure` or None
        :return: The first reactant structure, or None if there is none.
            Returning None rather than letting StopIteration escape matches
            what callers such as BaseStep.write check for.
        """
        return next(self.getStructsOfType(fragments.REACTANTS), None)
class HOFStep(BaseMetaJaguarWorkflowStep):
    """ Step that runs the Heat of Formation workflow """

    PROCS = 2
    STEP_NAME = 'HOF'
    JOB_BASE = 'hof'
    DRIVER_PATH = 'jaguar'
    FLAGS = {
        'run': None,
        'deltah.py': None,
        '-zpe': None,
        '-scalfr': '0.9806',
        '-method_sp': 'M06-2X',
    }
    FLAGS_ADD_JOB_NAME = False
    HOF_PROP = 'r_j_Delta_H_of_Formation(298K)_(kcal/mol)'
    SAVE_PROPS = {HOF_PROP}

    def __init__(self, *args, high=False, **kwargs):
        """
        Create a HOFStep instance.

        :param bool high: True if we are computing heat of formation at the high
            level of accuracy

        See parent class for additional documentation.
        """
        super().__init__(*args, **kwargs)
        self.flags['-jobname'] = self.job_name
        if high:
            self.flags['-basis_sp'] = '6-311++G-3df-3pd'
        else:
            # dict.update inserts the keys in the order given
            self.flags.update({'-basis_sp': 'LACVP*', '-ps': None})

    def outputStructurePath(self):
        """
        Get the path to the output structure file, which sits in a
        '<job name>_deltah' directory inside this step's subdirectory

        :rtype: str
        :return: The path to the output structure file
        """
        results_dir = self.job_name + '_deltah'
        return os.path.join(self.subdir, results_dir, self.getOutputName())

    def _getOutputData(self):
        """
        Read the output structures

        :rtype: list or None
        :return: The structures read from the output file, or None if the
            file does not exist
        """
        path = self.outputStructurePath()
        if os.path.exists(path):
            return list(structure.StructureReader(path))
        return None

    def getOutputName(self):
        """
        Get the name of the output structure file

        :rtype: str
        :return: The output structure file name
        """
        return f'{self.job_name}_deltah.mae'
class CustomMaeStep(BaseStep):
    """ Step that runs a custom script whose output is a .mae file """

    STEP_NAME = 'Custom'
    JOB_BASE = 'custom'
    # This step does not use Job Control
    USES_JC = False
class BaseDesmondResults(BaseResults):
    """
    Minimal results class supplying the part of the jaguarworkflows.Results
    functionality that Desmond jobs still need
    """

    def getMaeStructure(self):
        """
        Get the resulting structure from the parent step

        :return: Whatever the parent's getCMSSystem method returns
        """
        step = self.parent
        return step.getCMSSystem()
[docs]class BaseDesmondStep(BaseStep):
"""
Base class for steps that use Desmond as the main engine or produce a
cms file
"""
RESULTS_CLASS = BaseDesmondResults
# If step requires CMS input
REQUIRES_CMS_INPUT = True
# If the step outputs CMS
OUTPUTS_CMS = True
[docs] def __init__(self, *args, maestro_in=False, **kwargs):
"""
Create a BaseDesmondStep instance
:param bool maestro_in: If true, the class will write a Maestro input
file (.maegz) rather than a CMS (.cms) file
See parent class for additional documentation
"""
self.maestro_in = maestro_in
if maestro_in:
self.REQUIRES_CMS_INPUT = False
super().__init__(*args, **kwargs)
[docs] def addParentalSubdirProperties(self, struct):
"""
Add parental subdirectory properties to the structure
:type struct: `cms.Cms` or `schrodinger.structure.Structure`
:param struct: The structure to add properties to
"""
if isinstance(struct, cms.Cms):
parent_subdir, requires_subdirs = self.getParentalSubdirectories()
if parent_subdir:
struct.set_cts_property(msprops.PARENT_SUBDIRECTORY,
parent_subdir)
if requires_subdirs:
struct.set_cts_property(msprops.REQUIRES_SUBDIRECTORIES,
requires_subdirs)
else:
super().addParentalSubdirProperties(struct)
def _getOutputData(self):
"""
Return the list of output structures from this calculation
:rtype: list
:return: The single item is the output structure from this step as a Cms
system
"""
oname = os.path.join(self.subdir, self.getOutputName())
if not fileutils.is_cms_file(oname):
# This handles steps like the Disordered System Builder whose output
# type can change based on command line flags
return super()._getOutputData()
if not os.path.exists(oname):
return
output = [cms.Cms(oname)]
return output
[docs] def getCMSSystem(self):
"""
Get the output CMS system
:rtype: `cms.Cms`
:return: The output system
"""
return super().getStructForWriting()
def getStructForWriting(self):
    """
    Return the output Maestro structure derived from the output CMS system

    :rtype: `schrodinger.Structure`
    :return: The output Maestro structure
    """
    output = super().getStructForWriting()
    # Some steps can output mae or cms depending on options; a non-CMS
    # structure has no fsys_ct attribute and is used as is
    struct = getattr(output, 'fsys_ct', output)

    # Desmond adds project group hierarchy properties, strip them off
    hierarchy_props = [
        mm.M2IO_DATA_SUBGROUP_TITLE,
        mm.M2IO_DATA_SUBGROUPID,
        mm.M2IO_DATA_SUBGROUP_COLLAPSED,
    ]
    msutils.remove_properties(struct, props=hierarchy_props)

    # Point the trajectory path at the Step subdirectory
    trj = struct.property.get(desmondutils.PROP_TRJ)
    if trj:
        # A literal / is used rather than os.path.join so the result works
        # on all platforms
        struct.property[desmondutils.PROP_TRJ] = f'{self.job_name}/{trj}'
    return struct
def getOutputName(self):
    """
    Build the name of the output structure file from the job name

    :rtype: str
    :return: The output structure file name
    """
    suffix = '-out.cms'
    return self.job_name + suffix
def getTrajectoryName(self):
    """
    Build the path of the output trajectory directory

    :rtype: str or None
    :return: The output trajectory directory name (inside the step
        subdirectory), or None if this step does not create trajectories
    """
    if not self.CAN_CREATE_TRAJECTORY:
        return None
    trj_dir = self.job_name + '_trj'
    return os.path.join(self.subdir, trj_dir)
class DisorderedSystemBuilderStep(BaseDesmondStep):
    """
    Workflow step that runs the Disordered System Builder driver.
    """
    STEP_NAME = 'Disordered system'
    JOB_BASE = 'dsb'
    DRIVER_PATH = ('disordered_system_builder_gui_dir/'
                   'disordered_system_builder_driver.py')
    DRIVER = import_driver(DRIVER_PATH)
    FLAGS = {
        '-grid': None,
        '-pack': None,
        '-split_components': None,
        '-forcefield': desmondutils.OPLS3,
    }
    # This step does not require CMS input
    REQUIRES_CMS_INPUT = False

    def __init__(self, *args, **kwargs):
        """
        Initialize a DisorderedSystemBuilderStep instance

        The step is always created with maestro_in=True.

        See parent class for additional documentation
        """
        super().__init__(*args, maestro_in=True, **kwargs)
        # The job name is added to the command arguments
        self.cmd_args.append(self.job_name)

    def getOutputName(self):
        """
        Build the name of the output structure file

        The name depends on whether the driver's NO_SYSTEM_FLAG is part of
        the command.

        :rtype: str
        :return: The output structure file name
        """
        if self.DRIVER.NO_SYSTEM_FLAG in self.cmd:
            suffix = '_amorphous.maegz'
        else:
            suffix = '_system-out.cms'
        return self.job_name + suffix
class PolymerBuilderStep(BaseDesmondStep):
    """
    Workflow step that runs the Polymer Builder driver.
    """
    STEP_NAME = 'Polymer Builder'
    JOB_BASE = 'polybuild'
    DRIVER_PATH = 'polymer_builder_gui_dir/polymer_builder_driver.py'
    DRIVER = import_driver(DRIVER_PATH)
    # This step does not require CMS input
    REQUIRES_CMS_INPUT = False

    def __init__(self, *args, **kwargs):
        """
        Initialize a PolymerBuilderStep instance

        The step is always created with maestro_in=True.

        :raise RuntimeError: If the step has neither a parent nor an input
            file

        See parent class for additional documentation
        """
        super().__init__(*args, maestro_in=True, **kwargs)
        if not (self.parent or self.input_file):
            raise RuntimeError('The structure file must be provided')

    def getOutputName(self):
        """
        Build the name of the output structure file

        The name depends on whether the driver's FLAG_NO_SYSTEM is part of
        the command.

        :rtype: str
        :return: The output structure file name
        """
        if self.DRIVER.FLAG_NO_SYSTEM in self.cmd:
            suffix = '-amcell.maegz'
        else:
            suffix = '_system-out.cms'
        return self.job_name + suffix

    def removeIncomingProperties(self, *args, **kwargs):
        """
        Intentionally do nothing.

        This overrides the parent method because there is no single simple
        input structure and the properties do not survive the polymer
        builder.
        """
        return None
class MolecularDynamicsStep(BaseDesmondStep):
    """ A step that runs one or more Desmond MD stages """
    STEP_NAME = 'Relaxation'
    JOB_BASE = 'relax'
    # The last percent of the frames that are used for analysis property
    # averaging
    ANALYSIS_FRAME_PERCENT = 20
    CAN_CREATE_TRAJECTORY = True

    def __init__(self, *args, analysis=False, average=None, **kwargs):
        """
        Create a MolecularDynamicsStep object

        :param bool analysis: Whether to perform a trajectory analysis as the
            last stage
        :param int average: What percent of the final stage trajectory to
            average the cell size over. A value that evaluates to False
            (None or 0) means no cell averaging stage is added.

        :raise RuntimeError: If average is negative

        See parent class for documentation
        """
        super().__init__(*args, **kwargs)
        # Set for real in writeMSJFile once the stages are known
        self.num_stages = 0
        self.analysis = analysis
        if average and average < 0:
            raise RuntimeError('average must be > 0')
        self.average = average

    def addVelocityIfNeeded(self, kwargs):
        """
        Add the velocity write keyword if there will be an analysis stage

        :param dict kwargs: dict of current keyword/value pairs - modified in
            place if velocity is needed
        """
        if self.analysis:
            kwargs[desmondutils.MSJStringer.TRJ_WRITE_VEL] = True

    def getStringers(self):
        """
        Create the stringers that define each stage

        :rtype: list
        :return: A list of MSJStringer objects
        """
        # Note - it's important for velocities to be written if this stage will
        # have analysis done
        kwargs = {}
        self.addVelocityIfNeeded(kwargs)
        example = desmondutils.MDMSJStringer(last=True, **kwargs)
        return [example]

    def writeMSJFile(self):
        """
        Write the MSJ file for this step

        The stages come from getStringers, followed by an analysis stage if
        analysis was requested and a cell averaging stage if average was
        requested. The MSJ file is added to the step input files and
        num_stages is updated.
        """
        stringers = self.getStringers()
        if self.analysis:
            stringers.append(desmondutils.MSAnalysisMSJStringer())
        if self.average:
            kwargs = {
                desmondutils.AveCellMSJStringer.PERCENT_TO_AVG: self.average
            }
            stringers.append(desmondutils.AveCellMSJStringer(**kwargs))
        mname = self.getMSJName()
        desmondutils.create_msj(stringers, mname)
        self.input_files.append(mname)
        # Add 1 for the initial "auto" stage
        self.num_stages = len(stringers) + 1

    def getMSJName(self):
        """
        Get the name of the MSJ file

        :rtype: str
        :return: The MSJ file name
        """
        return self.job_name + '.msj'

    def createJob(self):
        """
        Form the command line and create a job for the queue

        :rtype: `jobutils.RobustSubmissionJob`
        :return: The job to add to the JobDJ
        """
        cmd = desmondutils.get_multisim_command(self.getInputName(),
                                                self.getOutputName(),
                                                msj_name=self.getMSJName(),
                                                job_name=self.job_name)
        return self.createQJobFromCommand(cmd)

    def processOutput(self, output):
        """
        Process the output of a job during the finish part of the step

        When analysis was requested, the analysis output file is read and the
        bulk properties found in it are stored in the workflow properties.
        Time series properties are averaged over the last
        ANALYSIS_FRAME_PERCENT percent of the trajectory frames.

        :param output: The output of the job. If it evaluates to False, no
            processing is done. Otherwise it is unused.
        """
        super().processOutput(output)
        if not output or not self.analysis:
            return

        # Map the analysis result keywords to the properties to save
        properties = {
            'DensityResult': msprops.DENSITY_PROP,
            'VolumeResult': msprops.VOLUME_PROP,
            'EneCohesiveResult': msprops.COHESIVE_PROP,
            'HeatVaporizationResult': msprops.HEAT_VAP_PROP,
            'SolubilityParameterResult': msprops.SOLUBILITY_PROP,
            'SolubilityParameterVdwSqResult': msprops.SOLUBILITY_VDW_PROP,
            'SolubilityParameterEleSqResult': msprops.SOLUBILITY_ELE_PROP,
            'SpecificHeatResult': msprops.SPECIFIC_HEAT_PROP
        }

        # Locate the analysis output file - the first existing candidate wins
        tried_paths = []
        for propext in ('eaf', 'st2'):
            eaf_name = f'{self.job_name}_{self.num_stages}-out.{propext}'
            eaf_path = os.path.join(self.subdir, eaf_name)
            tried_paths.append(eaf_path)
            if os.path.exists(eaf_path):
                break
        else:
            self.log('Could not find analysis output file, tried:')
            for tpath in tried_paths:
                self.log(tpath)
            return

        with open(eaf_path, 'r') as eaf_file:
            analysis = sea.Map(eaf_file.read())

        # Number of trailing frames requested for the averages
        total_frames = analysis.TrajectoryNumFrames.val
        ave_frames = math.ceil(total_frames * self.ANALYSIS_FRAME_PERCENT / 100)

        # Compute all the properties and save them
        bulk = analysis.Keywords[0].Bulk
        for seaprop, stprop in properties.items():
            if stprop == msprops.SPECIFIC_HEAT_PROP:
                # Specific heat is read from a single entry rather than
                # averaged over frames
                try:
                    average = bulk[seaprop][1].val
                except KeyError:
                    continue
            else:
                if ave_frames < 1:
                    # No frames to average over. A slice of [-0:] would
                    # silently select every entry, so skip instead.
                    continue
                try:
                    frame_data = bulk[seaprop][-ave_frames:]
                except KeyError:
                    continue
                values = [x.val for x in frame_data]
                if not values:
                    continue
                # Divide by the number of values actually summed; the slice
                # can hold fewer entries than ave_frames
                average = sum(values) / len(values)
            self.workflow.properties[stprop] = average
class BaseRelaxationStep(MolecularDynamicsStep):
    """Common base for the relaxation protocol steps"""
    STEP_NAME = 'BaseRelaxation'
    JOB_BASE = 'baserelax'

    def __init__(self, *args, temp=300.0, **kwargs):
        """
        Initialize a base relaxation object

        :param float temp: Temperature for the final relaxation step

        See parent class for documentation of the remaining arguments
        """
        super().__init__(*args, **kwargs)
        self.temp = temp

    def getStringers(self):
        """
        Create the stringers that define each stage

        Subclasses must override this method.

        :raise NotImplementedError: Always, in this base class

        :rtype: list
        :return: A list of MSJStringer objects
        """
        raise NotImplementedError
class MatSciMDRelaxationStep(BaseRelaxationStep):
    """ Step that runs the MatSci MD relaxation protocol """
    STEP_NAME = 'MSRelaxation'
    JOB_BASE = 'msrelax'

    def getStringers(self):
        """
        Create the stringers that define each stage

        :rtype: list
        :return: A list of MSJStringer objects, as returned by
            desmondutils.get_materials_relaxation_stringers
        """
        stringers = desmondutils.get_materials_relaxation_stringers(
            temp=self.temp, compress_last=False)
        return stringers
class CompressiveRelaxationStep(BaseRelaxationStep):
    """ Step that runs the compressive relaxation protocol """
    STEP_NAME = 'CompressRelaxation'
    JOB_BASE = 'comprelax'

    def getStringers(self):
        """
        Create the stringers that define each stage

        :rtype: list
        :return: A list of MSJStringer objects, as returned by
            desmondutils.get_compressive_relaxation_stringers
        """
        stringers = desmondutils.get_compressive_relaxation_stringers(
            temp=self.temp, compress_last=False)
        return stringers
class SemiCrystalRelaxation1Step(BaseRelaxationStep):
    """ Step that runs the first Semi-Crystalline relaxation protocol """
    STEP_NAME = 'SemiCrystalRelaxation1'
    JOB_BASE = 'semicrystalrelax1'

    def getStringers(self):
        """
        Create the stringers that define each stage

        :rtype: list
        :return: A list of MSJStringer objects, as returned by
            desmondutils.get_semicrystalline_relaxation1_stringers
        """
        stringers = desmondutils.get_semicrystalline_relaxation1_stringers(
            temp=self.temp, compress_last=False)
        return stringers
class SemiCrystalRelaxation2Step(BaseRelaxationStep):
    """ Step that runs the second Semi-Crystalline relaxation protocol """
    STEP_NAME = 'SemiCrystalRelaxation2'
    JOB_BASE = 'semicrystalrelax2'

    def getStringers(self):
        """
        Create the stringers that define each stage

        :rtype: list
        :return: A list of MSJStringer objects, as returned by
            desmondutils.get_semicrystalline_relaxation2_stringers
        """
        stringers = desmondutils.get_semicrystalline_relaxation2_stringers(
            temp=self.temp, compress_last=False)
        return stringers
class SingleMDStep(MolecularDynamicsStep):
    """ A step that runs a single MD simulation """
    STEP_NAME = 'md'
    JOB_BASE = 'md'
    STRINGER_CLASS = desmondutils.MDMSJStringer

    def __init__(self, *args, params=None, **kwargs):
        """
        Create a SingleMDStep object

        :param dict params: Keyword arguments to pass to the `STRINGER_CLASS`.
            The dict is copied, so the caller's dict is never modified.

        See parent class for documentation
        """
        super().__init__(*args, **kwargs)
        # Work on a private copy: 'last' is added below and getStringers may
        # add the velocity keyword, neither of which should leak into a dict
        # the caller may share between several steps
        self.params = dict(params) if params else {}
        # Don't compress this step
        self.params.setdefault('last', True)

    def getStringers(self):
        """
        Create the stringers that define each stage

        :rtype: list
        :return: A list with the single `STRINGER_CLASS` object built from
            the step parameters
        """
        # Note - it's important for velocities to be written if this stage will
        # have analysis done
        self.addVelocityIfNeeded(self.params)
        return [self.STRINGER_CLASS(**self.params)]
class SingleBrownieStep(SingleMDStep):
    """
    A step that runs a single Brownie step

    Identical to the parent step except that the stage is built with
    desmondutils.BrownieMSJStringer.
    """
    STEP_NAME = 'brownie'
    JOB_BASE = 'brownie'
    STRINGER_CLASS = desmondutils.BrownieMSJStringer
class TgStep(BaseDesmondStep):
    """ Step that runs a Tg calculation """
    STEP_NAME = 'Tg'
    JOB_BASE = 'tg'
    DRIVER_PATH = ('thermophysical_properties_gui_dir/'
                   'thermophysical_properties_driver.py')
    DRIVER = import_driver(DRIVER_PATH)
    # Default driver flag values for this step
    FLAGS = {
        DRIVER.MIN_TEMP_FLAG: 200,
        DRIVER.MAX_TEMP_FLAG: 600,
        DRIVER.TEMP_STEP_FLAG: 25,
        DRIVER.DENSITY_TIME_FLAG: 20,
        DRIVER.DENSITY_CYCLES_FLAG: 5,
        DRIVER.JOB_ORDER_FLAG: DRIVER.MAX_FIRST,
        DRIVER.LOAD_STRUC_FLAG: 200.0,
        DRIVER.INTERVAL_FLAG: 200.0,
    }
    FLAGS_INPUT_NAME_FLAG = '-icms'
    SAVE_PROP_STARTS = {msprops.TG_DENSITY_LEAD}
    CAN_CREATE_TRAJECTORY = True

    def __init__(self, *args, melting=False, bilinear=False, **kwargs):
        """
        Initialize a TgStep object

        :param bool melting: If True, temps will run from low to high.
            Otherwise, (default), they will run from high to low.
        :param bool bilinear: If True, the Tg fit will be done via the
            bilinear method. Otherwise, (default), it will be done via
            hyperbola

        See parent class for additional documentation
        """
        super().__init__(*args, **kwargs)
        self.bilinear = bilinear
        self.melting = melting
        if self.melting:
            # Run the lowest temperature first
            self.flags['-job_order'] = self.DRIVER.MIN_FIRST

    def processOutput(self, output):
        """
        Process the output of a job during the finish part of the step. The
        Tg is computed from a fit of temperature vs density and stored in the
        workflow properties.

        :param list output: The output of the job. The first item should be
            the structure (Cms or Structure) that contains the density vs
            temperature properties.
        """
        super().processOutput(output)
        if not output:
            return

        # Collect (temperature, density) points; the temperature is read from
        # the fifth underscore-separated field of the property name
        system = output[0]
        data = [(float(key.split('_')[4]), float(value))
                for key, value in system.property.items()
                if key.startswith(msprops.TG_DENSITY_LEAD)]

        fit = (uq_utils.get_bilinear_prop_from_points
               if self.bilinear else uq_utils.get_hyperbola_prop_from_points)
        tg = fit(data)
        if not tg:
            self.log('Unable to obtain Tg value from fit to density data')
            return

        # Melting runs are stored under the TM property instead of TG
        prop = msprops.TM_PROP if self.melting else msprops.TG_PROP
        if self.tag:
            prop += '_' + self.tag
        self.workflow.properties[prop] = tg
class StressStrainStep(BaseDesmondStep):
    """ Step that runs a Stress-Strain calculation """
    STEP_NAME = 'Stress'
    JOB_BASE = 'stress'
    DRIVER_PATH = 'stress_strain_gui_dir/stress_strain_driver.py'
    DRIVER = import_driver(DRIVER_PATH)
    # Prefixes of the property names associated with this step
    SAVE_PROP_STARTS = {'r_matsci_Stress', 'r_matsci_Strain'}
    # This step creates trajectories
    CAN_CREATE_TRAJECTORY = True
class PolymerCrosslinkStep(BaseDesmondStep):
    """ Step that runs a polymer crosslink calculation """
    STEP_NAME = 'Crosslinking'
    JOB_BASE = 'xlink'
    DRIVER_PATH = 'polymer_crosslink_gui_dir/polymer_crosslink_driver.py'
    DRIVER = import_driver(DRIVER_PATH)
    FLAGS_INPUT_NAME_FLAG = '-icms'
class ElasticConstantsStressStep(BaseDesmondStep):
    """ Step that runs a stress-based elastic constants calculation """
    STEP_NAME = 'Estress'
    JOB_BASE = 'estress'
    DRIVER_PATH = 'elastic_constants_gui_dir/elastic_constants_driver2.py'
    DRIVER = import_driver(DRIVER_PATH)
    FLAGS_INPUT_NAME_FLAG = '-icms'
    # Names of the elastic properties associated with this step
    SAVE_PROPS = {
        msprops.EL_Y_MOD,
        msprops.EL_UNI_ANI,
        msprops.EL_P_RATIO,
        msprops.EL_LAMBDA,
        msprops.EL_MU,
        msprops.EL_SHEAR,
        msprops.EL_BULK,
    }
    # This step creates trajectories
    CAN_CREATE_TRAJECTORY = True
class ElasticConstantsStep(ElasticConstantsStressStep):
    """ Step that runs a non-stress elastic constants calculation """
    STEP_NAME = 'Elastic'
    JOB_BASE = 'elastic'
    DRIVER_PATH = 'elastic_constants_gui_dir/elastic_constants_driver.py'
    DRIVER = import_driver(DRIVER_PATH)
    SAVE_PROP_STARTS = {'r_matsci_Strain'}
    # This step does not output a CMS file
    OUTPUTS_CMS = False

    def getOutputName(self):
        """
        Build the name of the output structure file

        :rtype: str
        :return: The output structure file name
        """
        suffix = '_md-out.mae'
        return self.job_name + suffix
class PenetrantLoadingStep(BaseDesmondStep):
    """ Step that runs a Penetrant Loading calculation """
    STEP_NAME = 'pload'
    JOB_BASE = 'pload'
    DRIVER_PATH = 'penetrant_loading_gui_dir/penetrant_loading_driver.py'
    DRIVER = import_driver(DRIVER_PATH)
    # This step creates trajectories
    CAN_CREATE_TRAJECTORY = True

    def getOutputName(self):
        """
        Build the name of the output structure file from the job name and the
        driver's OUTFILE_EXTENSION

        :rtype: str
        :return: The output structure file name
        """
        extension = self.DRIVER.OUTFILE_EXTENSION
        return self.job_name + extension

    def getTrajectoryName(self):
        """
        Build the path of the output trajectory directory

        :rtype: str
        :return: The output trajectory directory name, located in the step
            subdirectory
        """
        trj_dir = self.job_name + '-multisim_trj'
        return os.path.join(self.subdir, trj_dir)
class PolymerChainAnalysisStep(BaseDesmondStep):
    """ Step that runs a Polymer Chain Analysis """
    STEP_NAME = 'pcan'
    JOB_BASE = 'pcan'
    DRIVER_PATH = ('polymer_chain_analysis_gui_dir/'
                   'polymer_chain_analysis_driver.py')
    DRIVER = import_driver(DRIVER_PATH)
    # Driver-defined property names associated with this step
    SAVE_PROPS = {
        DRIVER.PERSISTENCE_LENGTH,
        DRIVER.END_TO_END_DISTANCE,
        DRIVER.MEAN_SQUARED_END_DISTANCE,
        DRIVER.EXTENDED_CHAIN_LENGTH,
        DRIVER.RADIUS_OF_GYRATION,
        DRIVER.MEAN_SQUARED_RADIUS_OF_GYRATION,
        DRIVER.MEAN_FRACTIONAL_ANISOTROPY,
        DRIVER.END_DIST_FILE,
        DRIVER.END_N_DIST_FILE,
        DRIVER.RG_FILE,
        DRIVER.TORSIONAL_FILE,
        DRIVER.ORDER_FILE,
        DRIVER.START_TIME,
        DRIVER.END_TIME,
    }
    # This step requires trajectory input
    REQUIRES_TRAJECTORY_INPUT = True

    def getOutputName(self):
        """
        Build the name of the output structure file

        :rtype: str
        :return: The output structure file name
        """
        suffix = '-trj.cms'
        return self.job_name + suffix
class StructureFactorStep(BaseDesmondStep):
    """ Step that runs a Structure Factor calculation """
    STEP_NAME = 'sfactor'
    JOB_BASE = 'sfactor'
    DRIVER_PATH = 'structure_factor_gui_dir/structure_factor_driver.py'
    DRIVER = import_driver(DRIVER_PATH)
    FLAGS_INPUT_NAME_FLAG = '-cms_file'
    # Names of the properties associated with this step
    SAVE_PROPS = {'s_matsci_sq_method', 'r_matsci_sq_resolution'}
    # This step requires trajectory input
    REQUIRES_TRAJECTORY_INPUT = True
class SimulationProfileStep(BaseDesmondStep):
    """ Step that runs a Simulation Profile calculation """
    STEP_NAME = 'profile'
    JOB_BASE = 'profile'
    DRIVER_PATH = ('trajectory_density_analysis_gui_dir/'
                   'trajectory_density_analysis_driver.py')
    DRIVER = import_driver(DRIVER_PATH)
    FLAGS_INPUT_NAME_FLAG = '-cms_file'
    # This step requires trajectory input
    REQUIRES_TRAJECTORY_INPUT = True
class CustomCmsStep(BaseDesmondStep):
    """ Step for running custom scripts that output a .cms file """
    STEP_NAME = 'Custom'
    JOB_BASE = 'custom'
    USES_JC = False
class CustomCmsToMaeStep(CustomCmsStep):
    """
    Step for running custom scripts that take in a .cms file and output a
    .maegz file
    """
    STEP_NAME = 'Custom'
    JOB_BASE = 'custom'
    # This step does not output a CMS file
    OUTPUTS_CMS = False

    def getOutputName(self):
        """
        Build the name of the output structure file

        :rtype: str
        :return: The output structure file name
        """
        suffix = '-out.maegz'
        return self.job_name + suffix
class CustomTrajectoryStep(CustomCmsStep):
    """
    Step for running custom scripts that output a .cms file and also create
    trajectories
    """
    # This step creates trajectories
    CAN_CREATE_TRAJECTORY = True
class ClusterAnalysisStep(BaseDesmondStep):
    """ Step that runs a Cluster Analysis calculation """
    STEP_NAME = 'cluster'
    JOB_BASE = 'cluster'
    DRIVER_PATH = 'cluster_analysis_gui_dir/cluster_analysis_driver.py'
    DRIVER = import_driver(DRIVER_PATH)
    FLAGS_INPUT_NAME_FLAG = '-cms_file'
    # This step does not output a CMS file
    OUTPUTS_CMS = False

    def getOutputName(self):
        """
        Build the name of the output structure file

        :rtype: str
        :return: The output structure file name
        """
        suffix = '_out.maegz'
        return self.job_name + suffix