"""
Workflow and Step classes to aid in running a series of Jaguar jobs.
For each molecule, a Workflow object is established. The Steps the Workflow will
run depend on the options chosen. Some Steps depend on other Steps to finish
before starting. It is the job of the Workflow to submit jobs when all the
required dependencies have finished successfully.
Workflow objects submit jobs to a JobDJ queue.
Copyright Schrodinger, LLC. All rights reserved.
"""
import logging
import os.path
import re
import numpy
from schrodinger import structure
from schrodinger.application.jaguar import autots_constants
from schrodinger.application.jaguar import constants as jaguar_constants
from schrodinger.application.jaguar import input as jaguar_input
from schrodinger.application.jaguar import output as jaguar_output
from schrodinger.application.jaguar import utils as jagutils
from schrodinger.application.matsci import jobutils
from schrodinger.application.matsci import msutils
from schrodinger.application.matsci import parserutils
from schrodinger.application.matsci import msprops
from schrodinger.infra import mm
from schrodinger.job import jobcontrol
from schrodinger.job import queue
from schrodinger.structure import workflow_action_menu as wam
from schrodinger.utils import fileutils
from schrodinger.utils import units
JAGUAR_INPATH_PROP = 's_j_Jaguar_input_file'
# see jaguar-src/main/restart.f for definitions of the following
GAS_PHASE_ENERGY_PROP = 'r_j_Gas_Phase_Energy'
# This is the canonical Jaguar final energy regardless of method/solvent/etc
FINAL_ENERGY_PROP = autots_constants.PROPERTY_KEY_ENERGY
HOMO_ENERGY_PROP = 'r_j_HOMO'
LUMO_ENERGY_PROP = 'r_j_LUMO'
ALPHA_HOMO_ENERGY_PROP = 'r_j_alpha_HOMO'
ALPHA_LUMO_ENERGY_PROP = 'r_j_alpha_LUMO'
BETA_HOMO_ENERGY_PROP = 'r_j_beta_HOMO'
BETA_LUMO_ENERGY_PROP = 'r_j_beta_LUMO'
LOWEST_EXCITATION_PROP = 'r_j_lowest_excitation'
LOWEST_SINGLET_EXCITATION_PROP = 'r_j_lowest_singlet_excitation'
LOWEST_TRIPLET_EXCITATION_PROP = 'r_j_lowest_triplet_excitation'
LOWEST_FREQUENCY_PROP = 'r_j_Lowest_Frequency_(cm-1)'
ZERO_POINT_ENERGY_PROP = 'r_j_Zero_Point_Energy_(kcal/mol)'
# sum of gas phase energy and zero point energy
GAS_PHASE_ZERO_POINT_ENERGY_PROP = 'r_matsci_Gas_Phase_Zero_Point_Energy_(au)'
THERMO_TOTAL_STARTER = 'r_j_Total'
TOTAL_INTERNAL_ENERGY_STARTER = THERMO_TOTAL_STARTER + '_Internal_Energy_(au)'
TOTAL_ENTHALPY_STARTER = THERMO_TOTAL_STARTER + '_Enthalpy_(au)'
INTERNAL_ENERGY_STARTER = 'r_j_Internal_Energy_(kcal/mol)'
ENTHALPY_STARTER = 'r_j_Enthalpy_(kcal/mol)'
LNQ_STARTER = 'r_j_ln(Q)'
TOTAL_FREE_ENERGY_STARTER = THERMO_TOTAL_STARTER + '_Free_Energy_(au)'
FREE_ENERGY_STARTER = 'r_j_Free_Energy_(kcal/mol)'
ENTROPY_STARTER = 'r_j_Entropy_(cal/mol/K)'
HEAT_CAPACITY_STARTER = 'r_j_Heat_Capacity_(cal/mol/K)'
SOLUTION_ENERGY_PROP = 'r_j_Solution_Phase_Energy'
SOLVATION_ENERGY_PROP = 'r_j_Solvation_Energy_(kcal/mol)'
GAS_PHASE_GROUND_ENERGY_PROP = 'r_j_Gas_Phase_Ground_Energy'
GAS_EXCITED_ENERGY_PROP = 'r_j_Gas_Phase_Target_Excited_Energy'
SOLUTION_GROUND_ENERGY_PROP = 'r_j_Solution_Phase_Ground_Energy'
SOL_EXCITED_ENERGY_PROP = 'r_j_Solution_Phase_Target_Excited_Energy'
GROUND_SOLVATION_ENERGY_PROP = 'r_j_Ground_Solvation_Energy_(kcal/mol)'
PCM_SOLVATION_ENERGY_PROP = 'r_j_Solvation_Energy_(electrostatic,kcal/mol)'
GROUND_PCM_SOLVATION_ENERGY_PROP = ('r_j_Ground_Solvation_Energy_'
'(electrostatic,kcal/mol)')
SINGLET_SPM_ENDING = '_uvv_singlet.spm'
WILDCARD = '*'
TEMP_PRESS_KEY_EXT = '_{temp}K_{press}atm'
ALL_TEMP_PRESS_KEY_EXT = TEMP_PRESS_KEY_EXT.format(temp=WILDCARD,
press=WILDCARD)
GEOPT_WAM = jobutils.WAM_TYPES.JAG_GEOPT
ROBUST_JAGUAR_DRIVER_PATH = 'reaction_workflow_gui_dir/robust_jaguar_driver.py'
ROBUST_JOB_ENDING = '_robust'
PRESS_RE = re.compile(r'(\d+\.?\d*E?[+-]?\d*)atm')
TEMP_RE = re.compile(r'(\d+\.?\d*)K')
[docs]def get_jaguar_max_atoms():
"""
Get maximum number of atoms currently supported by Jaguar
:rtype: int
:return: Maximum number of atoms
"""
return mm.NPA
[docs]def get_temp_press_key_ext(temp, press):
"""
Return Jaguar's temperature and pressure thermochemistry key
extension.
:type temp: float
:param temp: the temperature in K
:type press: float
:param press: the pressure in atm
:rtype: str
:return: the key extension
"""
# see MATSCI-9004 and JAGUAR-9540 where pressure number format
# changed
press = format_pressure(press)
return TEMP_PRESS_KEY_EXT.format(temp=temp, press=press)
[docs]def get_temperature(energy_key):
"""
Return the temperature (K) for the given energy key.
:type energy_key: str
:param energy_key: structure property energy key
:rtype: float, None
:return: the temperature (K) if there is one
"""
# temperature like '298.15' in '298.15K'
match = re.search(TEMP_RE, energy_key)
if match:
return float(match.group(1))
[docs]def get_pressure(energy_key):
"""
Return the pressure (atm) for the given energy key.
:type energy_key: str
:param energy_key: structure property energy key
:rtype: float, None
:return: the pressure (atm) if there is one
"""
# pressure like '1.00E+00' in '1.00E+00atm'
match = re.search(PRESS_RE, energy_key)
if match:
return float(match.group(1))
[docs]def get_wildcard_energy_key(energy_key):
"""
Return the wildcard version of the given energy key.
:type energy_key: str
:param energy_key: structure property energy key
:rtype: str, None
:return: the wildcard version of the energy key
if there is one, else None
"""
temp = get_temperature(energy_key)
press = get_pressure(energy_key)
if None not in (temp, press):
temp = str(temp)
press = format_pressure(press)
return energy_key.replace(temp, WILDCARD).replace(press, WILDCARD)
[docs]def get_internal_energy_key(temp, press):
"""
Return Jaguar's thermochemistry internal energy key.
:type temp: float
:param temp: the temperature in K
:type press: float
:param press: the pressure in atm
:rtype: str
:return: the key
"""
ext = get_temp_press_key_ext(temp, press)
return f'{TOTAL_INTERNAL_ENERGY_STARTER}{ext}'
[docs]def get_enthalpy_key(temp, press, total=True):
"""
Return Jaguar's thermochemistry enthalpy key.
:type temp: float
:param temp: the temperature in K
:type press: float
:param press: the pressure in atm
:param bool total: If True, return key for total enthalpy, otherwise
vibrational enthalpy key is returned
:rtype: str
:return: the key
"""
starter = TOTAL_ENTHALPY_STARTER if total else ENTHALPY_STARTER
ext = get_temp_press_key_ext(temp, press)
return f'{starter}{ext}'
[docs]def get_free_energy_key(temp, press, total=True):
"""
Return Jaguar's thermochemistry free energy key.
:type temp: float
:param temp: the temperature in K
:type press: float
:param press: the pressure in atm
:param bool total: If True, return key for free energy, otherwise
vibrational free energy key is returned
:rtype: str
:return: the key
"""
starter = TOTAL_FREE_ENERGY_STARTER if total else FREE_ENERGY_STARTER
ext = get_temp_press_key_ext(temp, press)
return f'{starter}{ext}'
[docs]def get_entropy_key(temp, press):
"""
Return Jaguar's thermochemistry entropy key.
:type temp: float
:param temp: the temperature in K
:type press: float
:param press: the pressure in atm
:rtype: str
:return: the key
"""
ext = get_temp_press_key_ext(temp, press)
return f'{ENTROPY_STARTER}{ext}'
[docs]def get_lnq_key(temp, press):
"""
Return Jaguar's thermochemistry ln(Q) key.
:type temp: float
:param temp: the temperature in K
:type press: float
:param press: the pressure in atm
:rtype: str
:return: the key
"""
ext = get_temp_press_key_ext(temp, press)
return f'{LNQ_STARTER}{ext}'
[docs]def get_gas_phase_zpe_key(temp, press):
"""
Return matsci gas phase + ZPE key.
:type temp: float
:param temp: the temperature in K
:type press: float
:param press: the pressure in atm
:rtype: str
:return: the key
"""
ext = get_temp_press_key_ext(temp, press)
return f'{GAS_PHASE_ZERO_POINT_ENERGY_PROP}{ext}'
[docs]def checkChargeAndMultiplicity(astructure, charge, multiplicity):
"""
Check the charge and multiplicity.
:type astructure: `schrodinger.structure.Structure`
:param astructure: the structure to check
:type charge: int
:param charge: net molecular charge
:type multiplicity: int
:param multiplicity: net molecular multiplicity
:raise: ValueError if there is an issue
"""
n_elec = sum([aatom.atomic_number for aatom in astructure.atom]) - charge
# (1) n_elec is less than 1
if n_elec < 1:
msg = ('With the specified charge, %s, the system has no electrons.')
raise ValueError(msg % charge)
# (2) n_elec is odd and multiplicity is odd and (2) n_elec
# is even and multiplicity is even
if (n_elec % 2 and multiplicity % 2) or \
(not n_elec % 2 and not multiplicity % 2):
msg = ('The given charge, %s, and multiplicity, %s, are '
'inconsistent with each other.')
raise ValueError(msg % (charge, multiplicity))
[docs]class JaguarFailedException(Exception):
"""
An exception that is thrown when either reading the Jaguar output file
fails for some reason, or a successful reading shows that Jaguar failed.
"""
[docs]class JaguarOutputNotFound(JaguarFailedException):
""" Raised when the .out file can't be located """
[docs]class JaguarOutputUnreadable(JaguarFailedException):
""" Raised when the .out file can't be read for some reason """
[docs]class JaguarStuckGeometry(JaguarFailedException):
""" Raised when a stuck geometry optimization is detected """
[docs]class JaguarSCFNotConverged(JaguarFailedException):
""" Raised when the SCF did not converge """
[docs]class JaguarGeomNotConverged(JaguarFailedException):
""" Raised when the Geometry did not converge (ran out of iterations) """
[docs]class JaguarInvalidKeyword(JaguarFailedException):
""" Raised when the Geometry did not converge (ran out of iterations) """
[docs]def is_jaguar_file_property(prop):
"""
Check whether this structure property is one of the Jaguar properties and
links to a file
:param str prop: The property name
:rtype: bool
:return: True if it is, False if it isn't
"""
return prop.startswith('s_j_Jaguar') and prop.endswith('_file')
[docs]def get_jaguar_output(path=None, step_info="", st=None):
"""
Get a JaguarOutput object for the given path or structure.
:type path: str or None
:param path: The path to the output file. May be just the base name of the
output file (file instead of file.out). If None then an input structure
must be specified.
:type step_info: str
:param step_info: The step name - optional, and only used to create more
informative error messages.
:type st: `schrodinger.structure.Structure` or None
:param st: The structure from which to get the output file. If None then an
input path must be specified.
:rtype: `schrodinger.application.jaguar.output.JaguarOutput`
:return: The JaguarOutput object for path
:raises JaguarFailedException: If a problem is detected with the output.
Subclasses of this exception are raised to specify what type of error
"""
assert bool(path) ^ bool(st)
if step_info:
step_tag = ' in ' + step_info
else:
step_tag = ""
if st:
path = st.property.get(msprops.JAGUAR_OUTPATH_PROP)
if not path:
msg = ('Structure is missing the path to the Jaguar output file.')
raise JaguarOutputNotFound(msg)
if not path.endswith('.out'):
path = f'{path}.out'
if not os.path.exists(path):
msg = ('File does not exist: %s%s' % (path, step_tag))
raise JaguarOutputNotFound(msg)
pathtag = f'{path}{step_tag}'
try:
output = jaguar_output.JaguarOutput(path)
except Exception as err:
# I know that at least ValueError can occur here, but not really
# sure what other errors can occur. Regardless, we don't want to
# choke - we want to continue and process other jobs - so we catch
# them all and print a message.
msg = ('Unable to read Jaguar output:\n%s\n %s has failed' %
(str(err), pathtag))
raise JaguarOutputUnreadable(msg)
if output.geopt_stuck:
msg = (f'Detected stuck Jaguar optimization: {pathtag}')
raise JaguarStuckGeometry(msg)
errnums = jaguar_constants.FatalErrors
if output.status != output.OK or output.fatal_errorno:
errno = int(output.fatal_errorno) if output.fatal_errorno else None
errmsg = output.fatal_error
if errno in (errnums.SCF_MAX_ITERS, errnums.LARGE_DIIS_ERROR,
errnums.LARGE_DENSITY_CHANGE):
msg = (f'Jaguar SCF failed to converge ({errmsg}): {pathtag}')
raise JaguarSCFNotConverged(msg)
elif errno == errnums.GEO_MAX_ITERS:
msg = (f'Jaguar geometry failed to converge: {pathtag}')
raise JaguarGeomNotConverged(msg)
elif errmsg is not None and 'not a valid keyword' in errmsg:
msg = (f'Jaguar could not run due to {errmsg}: {pathtag}')
raise JaguarInvalidKeyword(msg)
else:
msg = (f'Detected failed Jaguar job: {pathtag}\n'
f'failure was: {errmsg}')
raise JaguarFailedException(msg)
return output
[docs]def get_out_mae_path(base):
"""
Get the output Maestro structure path for the jaguar calculation
:param str base: The base name of the jaguar job
:return str: The path to the mae file
"""
restart = jaguar_output.restart_name(base)
maefile = restart + '.mae'
if not os.path.exists(maefile):
maefile = restart + '.maegz'
return maefile
[docs]def get_jaguar_out_mae(path, require_success=False):
"""
Get the output Maestro structure for the jaguar calculation given by path
:type path: str
:param path: The path to the desired .mae or .out file or the base
name of the Jaguar job
:param bool require_success: Whether to check related Jaguar output for
success
:rtype: `schrodinger.structure.Structure` or None
:return: The output structure, or None if the file doesn't exist.
"""
# see MATSCI-6982 we need to distinguish between incoming file paths
# and base names
if os.path.exists(path) or path.endswith('.out'):
base = fileutils.splitext(path)[0]
else:
base = path
if require_success:
try:
get_jaguar_output(base)
except JaguarFailedException:
return None
maefile = get_out_mae_path(base)
try:
numstructs = structure.count_structures(maefile)
except (IOError, ValueError, mm.MmException):
return None
try:
struct = structure.Structure.read(maefile, index=numstructs)
except Exception:
# schrodinger/structure.py can raise Exception
return None
# see MATSCI-5508 and MATSCI-11357 - Jaguar does not add the
# i_m_Molecular_charge structure property to the output .01.mae in the case
# where the property value would be the same as the sum of the atomic formal
# charges
struct.property['i_m_Molecular_charge'] = jagutils.get_total_charge(struct)
return struct
[docs]def add_jaguar_files_to_jc_backend(base_name,
backend=None,
spm=False,
others=None,
restart=True):
"""
Add the typical jaguar files for a job to the backend so they are returned
to the working directory.
:type base_name: str
:param base_name: The base name of the files
:type backend: `schrodinger.job.jobcontrol._Backend`
:param backend: The jobcontrol backend (from jobcontrol.get_backend()). If
not supplied, an attempt will be made to obtain one.
:type spm: bool
:param spm: Whether to add the _uvv_singlet.spm file
:type others: list
:param others: List of additional extensions for files named
base_name.extension that should be added to the job control backend. For
example: others=['_vib.spm', '_vcd.spm'] will add base_name_vib.spm and
basename_vcd.spm. Note that any extensions need to include the leading '.'.
:type restart: bool
:param restart: Whether to include the .0x.in file. These files are very
large, so it's best not to keep them unless necessary.
"""
if not backend:
backend = jobcontrol.get_backend()
if backend:
backend.addOutputFile(base_name + '.in')
backend.addOutputFile(base_name + '.out')
backend.addOutputFile(base_name + '.log')
backend.addOutputFile(base_name + '.mae')
if spm:
backend.addOutputFile(base_name + SINGLET_SPM_ENDING)
# Get the name with the correct .0x extension (.01 typically)
restart_name = jaguar_output.restart_name(base_name)
if restart:
backend.addOutputFile(restart_name + '.in')
backend.addOutputFile(restart_name + '.mae')
if others:
for ext in others:
backend.addOutputFile(base_name + ext)
[docs]def create_job(options,
filename,
jobclass=jobutils.RobustSubmissionJob,
serial_only=False,
path=None,
robust=False):
"""
Create a job of class jobclass that will run the Jaguar input file filename
with options
:type options: `argparse.Namespace`
:param options: The input options.
:type filename: str
:param filename: The name of the input file
:type jobclass: `schrodinger.job.JobControlJob`
:param jobclass: The class used to create the job
:type serial_only: bool
:param serial_only: Whether to force the job to run in serial. If False
(default), parallel options will be used if available in options.
:param str path: Set the subjob command directory to path (this is where the
subjob will be run from)
:param bool robust: Use the robust Jaguar driver rather than Jaguar itself
:rtype: jobclass
:return: The created job
"""
if robust:
# get_basename removes both the extension and any path information since
# we don't want either in the job name.
jobname = fileutils.get_basename(filename) + ROBUST_JOB_ENDING
cmd = [ROBUST_JAGUAR_DRIVER_PATH, '-JOBNAME', jobname]
else:
cmd = ['jaguar', 'run']
if not serial_only:
try:
if options.PARALLEL > 1:
cmd += ['-PROCS', str(options.PARALLEL)]
except AttributeError:
pass
tppval = jobutils.get_option(options, parserutils.FLAG_TPP)
if tppval and tppval > 1:
cmd += ['-TPP', str(tppval)]
cmd += [filename]
return jobclass(cmd, command_dir=path)
[docs]class Results(object):
"""
A low memory results object - because the driver ends up holding on to
results for a long time and can be simultaneously holding results for a
large number of calculations, we want to keep the memory footprint of each
result low. Mainly, we don't want to hold structures in memory, but also
orbital eigenvectors, etc.
This class mimics a limited subset of the jaguar.output.JaguarResults class
API. Future needs might increase which properties are kept, but do not keep
any large-memory properties.
"""
[docs] def __init__(self, path):
"""
Create a Results object.
:type path: str
:param path: path to the Jaguar Output file, or a path to the input
file, as the JaguarOutput class can find the output file from that.
"""
self.path = path
struct = self.getMaeStructure()
if not struct:
raise ValueError('Could not find MAE file for the path: %s' %
self.path)
(gas_ground_en, sol_ground_en, gas_excited_en_1,
sol_excited_en_1) = (struct.property.get(GAS_PHASE_ENERGY_PROP),
struct.property.get(SOLUTION_ENERGY_PROP),
struct.property.get(GAS_EXCITED_ENERGY_PROP),
struct.property.get(SOL_EXCITED_ENERGY_PROP))
# Only for CT coupling calculations, energy and HOMO/LUMO can be missing
ct_coupling = struct.property.get('r_matsci_Charge_Transfer_Coupling')
self.energy = self.gas_energy = self.solution_energy = None
if sol_excited_en_1 is not None:
self.energy = self.solution_energy = sol_excited_en_1
elif gas_excited_en_1 is not None:
self.energy = self.gas_energy = gas_excited_en_1
elif sol_ground_en is not None:
self.energy = self.solution_energy = sol_ground_en
elif gas_ground_en is not None:
self.energy = self.gas_energy = gas_ground_en
elif ct_coupling is None:
raise ValueError('Could not find any energies in MAE file with the '
'path: %s' % self.path)
# For open shell
self.homo_alpha, self.lumo_alpha, self.homo_beta, self.lumo_beta = (
struct.property.get(ALPHA_HOMO_ENERGY_PROP),
struct.property.get(ALPHA_LUMO_ENERGY_PROP),
struct.property.get(BETA_HOMO_ENERGY_PROP),
struct.property.get(BETA_LUMO_ENERGY_PROP))
uhf_orbs = (self.homo_alpha, self.lumo_alpha, self.homo_beta,
self.lumo_beta)
# For closed shell
self.homo = struct.property.get(HOMO_ENERGY_PROP)
self.lumo = struct.property.get(LUMO_ENERGY_PROP)
hf_orbs = (self.homo, self.lumo)
# It's either all of them present or None
if (not all(orb is None for orb in uhf_orbs) and
any(orb is None for orb in uhf_orbs)):
raise ValueError('Could not find all alpha/beta HOMO/LUMO energies '
'in MAE file with the path: %s' % self.path)
# It's either all of them present or None
elif (not all(orb is None for orb in hf_orbs) and
any(orb is None for orb in hf_orbs)):
raise ValueError('Could not find all HOMO/LUMO energies '
'in MAE file with the path: %s' % self.path)
elif ct_coupling is None and (all(orb is None for orb in hf_orbs) and
all(orb is None for orb in uhf_orbs)):
raise ValueError('Could not find any HOMO/LUMO energies in MAE '
'file with the path: %s' % self.path)
# Sometimes the dipole is None
self.dipole = struct.property.get('r_j_QM_Dipole_(debye)')
[docs] def getResultWithThisEnergy(self, energy=None):
"""
Return the JaguarResults object for the geometry optimization step with
the given energy
:type energy: float or None
:param energy: The gas phase energy of the desired step. If None, the
gas_energy property of this Result object will be used. If that value is
None, the energy of the last step in the geometry optimization will be
used.
:rtype: `schrodinger.application.jaguar.output.JaguarResults`
:return: The results object with this energy, or the only results object
if the output contains a single point.
"""
jagout = get_jaguar_output(self.path)
if energy is None:
if self.gas_energy is None:
# No structure with energy was found, just use the last step of
# the geometry optimization
energy = jagout.last_results.energy
else:
energy = self.gas_energy
if jagout.geopt_step:
for index in range(len(jagout.geopt_step) - 1, -1, -1):
result = jagout.geopt_step[index]
if abs(result.energy - energy) < result.energy_precision:
return result
raise ValueError('Could not find result with energy: %s' %
str(energy))
else:
# Only a single point, so just return that result
return jagout.last_results
[docs] def getStructure(self):
"""
Get the structure associated with these results
:rtype: schrodinger.structure.Structure
:return: The output structure object for this step
"""
jag_result = self.getResultWithThisEnergy()
return jag_result.getStructure()
[docs] def getMaeStructure(self):
"""
Get the structure associated with these results from the .01.mae file -
this may have some associated properties on it.
:rtype: schrodinger.structure.Structure
:return: The output structure object for this step
"""
return get_jaguar_out_mae(self.path)
[docs]class Step(object):
"""
Manages the start, monitoring and finish of a single step in a workflow
"""
# Files with these extensions are not deleted after archiving
ARCHIVED_KEEPERS = set(['.spm', '.smap', '.vib', '.vis'])
# Files with these endings are the only archived input files
ARCHIVED_INPUT = ['.in', '.mae', '.maegz']
[docs] def __init__(self,
workflow,
parent=None,
noninheritable_parents=None,
optimization=True,
charge=0,
multiplicity=1,
property_name=None,
step_name="",
job_name="",
kcal=True,
solvent=None,
keystring="",
serial_only=False,
keep_jag_restart=True,
need_spm=False,
archive_files=False):
"""
Create a Step object
:type workflow: Workflow
:param workflow: The workflow that owns this step
:type parent: Step
:param parent: The parent job that must finish successfully before this
job can start, this parent has information that is inherited
:type noninheritable_parents: list of Step
:param noninheritable_parents: The parent jobs that must finish successfully
before this job can start, these parents do not have information that is
inherited
:type optimization: bool
:param optimization: True if this step should optimize the geometry,
False if not
:type charge: int
:param charge: The molecular charge for this step
:type multiplicity: int
:param multiplicity: The spin multiplicity for this step
:type property_name: str
:param property_name: The name of the property this step should create
when finished, None if no property will be created
:type step_name: str
:param step_name: The user-readable name of this step to use in messages
:type job_name: str
:param job_name: The base name of the file.
:type kcal: bool
:param kcal: True if the property should be in kcal/mol, False if not
:type solvent: dict
:param solvent: Dictionary of keyword/value pairs for solvent keywords.
If not given, a gas phase calculation will be run
:type keystring: str
:param keystring: Space separated keyword=value pairs. Each pair must
contain an equals sign
:type serial_only: bool
:param serial_only: If True, do not use any parallel options when
running Jaguar
:type keep_jag_restart: bool
:param keep_jag_restart: If True, add .01.in files Jaguar restart files
to the backend (that will get them copied to the original folder)
:type need_spm: bool
:param need_spm: If True, add Jaguar spm file
to the backend (that will get them copied to the original folder)
"""
self.finished = False
# self.ok - if a job hasn't failed yet
self.ok = True
# self.dead - True if job will never be able to start
self.doa = False
self.workflow = workflow
self.parent = parent
self.noninheritable_parents = noninheritable_parents
self.input = None
self.job = None
self.results = None
self.step_name = step_name
self.job_name = job_name
self.optimization = optimization
self.charge = charge
self.multiplicity = multiplicity
self.property_name = property_name
self.kcal = kcal
self.solvent = solvent
self.keystring = keystring
self.serial_only = serial_only
self.keep_jag_restart = keep_jag_restart
self.need_spm = need_spm
self.job_input_filenames = []
self.job_output_filenames = []
self.archive_files = archive_files
self.archive_filename = None
self.robust = workflow.robust
[docs] def log(self, msg, prefix=True, level=logging.INFO):
"""
Add a message to the parent workflow's log file
:type msg: str
:param msg: The message to add
:type prefix: bool
:param prefix: Whether to add information about the workflow and step
name to the front of the message string
:type level: int
:param level: A `logging` constant indicating the priority level of the
message
"""
if not self.workflow.logger:
return
if prefix:
msg = self.step_name + ': ' + msg
self.workflow.log(msg, prefix=prefix, level=level)
[docs] def setKeywords(self, input, keystring):
"""
Set the keywords for this job
:type input: jaguar_input.JaguarInput
:param input: The JaguarInput object to set the keywords on
:type keystring: str
:param keystring: Space separated keyword=value pairs. Each pair must
contain an equals sign
"""
for pair in keystring.split():
try:
keyword, value = pair.split('=')
input.setValue(keyword, value)
except ValueError:
print('Skipping malformed keyword %s' % pair)
[docs] def getStructure(self):
"""
Get the starting structure for this step
:rtype: schrodinger.structure.Structure
:return: The starting structure for this step
"""
if self.parent:
return self.parent.results.getMaeStructure()
def _hasStartableStatus(self):
"""
Check if this step has basic status attributes that mean it might be
startable
:rtype: bool
:return: True if the step might be startable based on status,
False if there is no sense in checking any further if it can start
"""
# self.finished = True if the step has fully completed its work already
# self.doa = True if it was previously determined this step will
# never be startable
# self.job = object if the subjob is currently running or is finished
# but we haven't had a chance to clean it up yet
# self.ok = False if this step failed already
return not any([self.finished, self.doa, self.job, not self.ok])
[docs] def canStart(self):
"""
Check to see if this job can start - if the parent job(s) have finished
successfully.
:rtype: bool
:return: True if the job can start, False if not
"""
if not self._hasStartableStatus():
return False
parents = []
if self.parent:
parents.append(self.parent)
if self.noninheritable_parents:
parents.extend(self.noninheritable_parents)
for parent in parents:
if not parent.ok:
self.log(
'Will not be started due to failure of parent step %s' %
parent.step_name)
# This job will never be able to start
self.doa = True
self.ok = False
return False
if not parent.finished:
return False
return True
[docs] def createJob(self):
"""
Submit a jaguar job under job control
:type jaginput: `schrodinger.application.jaguar.input.JaguarInput`
:param jaginput: The JaguarInput object to submit
:rtype: `schrodinger.job.jobcontrol.Job` object
:return: The Job object for the launched job
"""
options = self.workflow.options
filename = self.input.filename
return create_job(options,
filename,
serial_only=self.serial_only,
robust=self.robust)
[docs] def start(self):
"""
Start the job - create the input and write it, adding necessary output
files to make sure they get copied back
"""
self.input = self.getInput()
# First check to see if there is an existing output file for this step -
# this might have been created in a previous aborted run.
if self.getOutput(quiet=True):
self.log('Completed output found, will not run again')
self.finish()
return
if self.input:
self.writeInput()
self.job = self.createJob()
self.workflow.jobq.addJob(self.job)
add_jaguar_files_to_jc_backend(self.input.name,
spm=self.need_spm,
backend=self.workflow.backend,
restart=self.keep_jag_restart)
self.log('Added to job queue')
[docs] def storeFilenames(self):
"""
Store file names associated with this job before we delete the job
object
"""
try:
jcjob = self.job.getJob()
except AttributeError:
# Some Meta Workflow steps can be subprocess jobs rather than job
# control jobs and therefore will not have any getJob method
return
self.job_input_filenames = list(jcjob.InputFiles)
self.job_output_filenames = jcjob.OutputFiles + jcjob.LogFiles
[docs] def calcsDone(self):
"""
Check to see if the calculation finished
If finished and the job failed, self.ok will be set to False
:rtype: bool
:return: True if the calculation finished, False if not
"""
if self.job.hasExited():
self.finishProcessingJobControlJob()
if self.job.state == queue.JobState.DONE:
self.storeFilenames()
else:
self.log('Run failed - job state is %s' % str(self.job.state))
self.ok = False
return True
return False
[docs] def getOutput(self, quiet=False):
"""
Read in the results of the calculation
:type quiet: bool
:param quiet: If True, no error messages will be printed. If False,
(default) error messages will be printed. Also, if True, self.ok will
not be set to False if the output file cannot be read.
:rtype: None or JaguarOutput
:return: None if the calculation failed, or JaguarOutput object for
successful calculations.
"""
if not self.ok:
return None
if not quiet:
self.ok = False
try:
output = get_jaguar_output(self.input.name,
step_info=self.step_name)
except JaguarFailedException as msg:
if not quiet:
self.log(str(msg))
return None
try:
self.results = Results(self.input.name + '.out')
except ValueError as msg:
if not quiet:
self.log(str(msg))
return None
self.ok = True
self.finished = True
return output
[docs] def periodicMaintenance(self):
""" This method is periodically called while the workflow is running """
pass
[docs] def finishProcessingJobControlJob(self):
"""
Finish processing the job control job object before we release our
handle to it
"""
if self.robust:
jobutils.add_subjob_files_to_backend(self.job)
[docs] def finish(self):
"""
Do any work required to create properties when the calculation has
finished.
If property_name was provided to the constructor, this computes the
energy difference between this step and the inherited parent step and
stores it in the property name.
"""
output = self.getOutput()
if output and self.property_name and self.parent:
parent_energy = self.parent.results.energy
my_energy = self.results.energy
if self.kcal:
delta_energy = units.KCAL_PER_MOL_PER_HARTREE * (my_energy -
parent_energy)
e_units = 'kcal/mol'
else:
delta_energy = units.EV_PER_HARTREE * (my_energy -
parent_energy)
e_units = 'eV'
self.workflow.properties[self.property_name] = delta_energy
self.log('dE = %.6f Hartree - %.6f Hartree = %.2f %s' %
(my_energy, parent_energy, delta_energy, e_units))
self.finished = True
self.log('Work completed')
[docs] def write(self, writer, props=None, hierarchy=None):
"""
Add the final structure for this step to the output structure file
:type writer: schrodinger.StructureWriter
:param writer: The writer to use to write the structure
:type props: dict
:param props: A dictionary of property/value pairs to add to the
property dictionary of this object.
:param list hierarchy: The project group hierarchy for this result -
each item is a str
"""
if self.ok and self.finished:
struct = self.results.getMaeStructure()
if not struct:
# mae file can't be found or read
# Something went wrong with the step and wasn't detected
self.log('Unable to obtain results')
return
struct.title = self.job_name
# Ensure the structure does not have an entry name of
# Scratch, which prevents the structure from being incorporated
struct.property['s_m_entry_name'] = self.job_name
self.handleFileLinkProperties(struct)
if props:
struct.property.update(props)
if hierarchy:
msutils.set_project_group_hierarchy(struct,
hierarchy,
collapsed=True)
# Change GEOPT WAM from file-level to structure-level
# Subclasses of Step can have different out mae file names. We are
# only interested in mae files coming from Jaguar
st_path = get_out_mae_path(self.job_name)
if os.path.exists(st_path):
if wam.read_workflow_menu_header(st_path) == GEOPT_WAM:
jobutils.set_structure_wam(struct, GEOPT_WAM)
writer.append(struct)
if self.archive_files:
# Remove the structure file now that we are done with it
fileutils.force_remove(st_path)
[docs] def handleFileLinkProperties(self, struct):
"""
Fix existing Jaguar file link properties and add any new ones
:param `structure.Structure` struct: The structure with the properties
"""
# Fix the Jaguar file links
fix_jaguar_file_links(struct)
jobutils.set_source_path(struct)
if self.archive_filename:
# Remove any jaguar file links because those files are archived
for prop in list(struct.property.keys()):
if is_jaguar_file_property(prop):
del struct.property[prop]
[docs] def archiveFiles(self):
"""
Create a tar.gz archive of all jaguar files and then removes any files
that are no longer needed.
"""
if not self.archive_files or not self.finished or not self.ok:
return
# Gather the names of all files to potentially archive
allfiles = set(self.job_output_filenames)
for ifile in self.job_input_filenames:
# InputFiles also contains a number of jobcontrol files which
# are not Jaguar files, so filter them out
for ending in self.ARCHIVED_INPUT:
if ifile.endswith(ending):
allfiles.add(os.path.basename(ifile))
files = set()
# Filter out Jaguar files that should not be saved or don't exist
for afile in allfiles:
if not os.path.exists(afile):
continue
if afile.endswith('.01.in') and not self.keep_jag_restart:
continue
files.add(afile)
# Create the archive
self.archive_filename = self.job_name + '.tar.gz'
jobutils.archive_job_data(self.archive_filename, files)
jobutils.add_outfile_to_backend(self.archive_filename)
# Remove files we don't need anymore - we keep some files because
# Maestro needs those for visualization.
for afile in files:
ext = fileutils.splitext(afile)[1]
# We also keep .01.mae becuase that's needed later when we write
# the structure to the master file
if ext in self.ARCHIVED_KEEPERS or afile.endswith('.01.mae'):
continue
fileutils.force_remove(afile)
[docs]class FrozenStep(Step):
"""
A step that does not perform geometry optimization but just runs a
calculation at the geometry of the parent step.
"""
[docs] def __init__(self, *args, **kwargs):
"""
Create a Vertical Step object.
Overwrites any value of optimization that is passed in.
"""
kwargs['optimization'] = False
Step.__init__(self, *args, **kwargs)
[docs]class OptStep(Step):
"""
A step that performs a geometry optimization
"""
[docs]class WorkFlow(object):
"""
A class to hold the data for and shepherd a single structure through all
the jobs required to gather its data.
For a typical workflow, the job dependency tree may look like::
Neutral Optimization
|
---------------------------------------------------------- ----
| | | | | |
Cation Opt Cation Froz Anion Opt Anion Froz Triplet Opt TD-DFT
| |
Solution Cat Solution An
| |
Final Reorg Step Final Reorg Step
Any job may be submitted by the Workflow to the Queue as long as the jobs
above it on its branch of the tree have completed successfully.
"""
FALLBACK_BASE_NAME = 'structure'
[docs] def __init__(self,
struct,
options,
count,
jobq,
strcleaner=None,
logger=None,
subhierarchy=None,
robust=False):
"""
Create a Workflow object
:type struct: schrodinger.structure.Structure
:param struct: The initial structure to use for the workflow
:type options: argparse Namespace object
:param options: The input options.
:type count: int
:param count: A serial number to distinguish this workflow from other
workflows. May be used to create a unique base name.
:type jobq: JobDJ
:param jobq: The queue to submit jobs to
:type strcleaner: schrodinger.application.matsci.jobutils.StringCleaner
or None
:param strcleaner: A StringCleaner instance used to guarantee unique
workflow base names
:type logger: `logging.Logger`
:param logger: The logger for this class
:param str subhierarchy: If given, the final structure for all steps
other than the main step will be placed in a PT subgroup with this
name.
:param bool robust: If True, use the robust Jaguar driver to run Jaguar
jobs. If false, use Jaguar directly.
"""
self.logger = logger
self.steps = []
self.options = options
# Properties for the main 'base' structure
self.properties = {}
# Properties for children structures (anion, cation, triplet, etc.)
self.child_properties = {}
self.struct = struct
# Remove any success/failure properties from previous jobs
if self.struct:
msutils.remove_properties(
self.struct,
props=[msprops.WORKFLOW_FAILURE_FILE_PROP],
matches=[msprops.STEP_SUCCESS_START])
self.generateBaseName(strcleaner, count)
self.backend = jobcontrol.get_backend()
self.jobq = jobq
# Set self.main_step to the step that should get all the properties
# generated by this workflow. It will also be the step that is included
# in the main PT group instead of the subgroup specified by subhierarchy
self.main_step = None
self.subhierarchy = subhierarchy
self.robust = robust
self.getSteps()
self.check(log_zero_steps=True)
[docs] def generateBaseName(self, strcleaner, count):
"""
Come up with a base name for this workflow based on the workflow
structure title or a fallback name if the title can't be used. Pass
in a StringCleaner object to guarantee uniqueness.
:type strcleaner: schrodinger.application.matsci.jobutils.StringCleaner
or None
:param strcleaner: a StringCleaner instance to use for uniquifying names
:param int count: A serial number to distinguish this workflow from
other workflows. May be used to create a unique base name.
"""
fallback_used = False
# Create base name from the title if possible
if self.struct:
self.base_name = self.struct.title.strip().replace(' ', '_')
else:
self.base_name = None
if not self.base_name:
# Title couldn't be used, use a generic fallback
self.base_name = (jobutils.get_option(self.options, 'name') or
self.FALLBACK_BASE_NAME)
fallback_used = True
# Make sure the name works as a file name and is unique
if strcleaner:
self.base_name = strcleaner.cleanAndUniquify(self.base_name)
elif fallback_used:
self.base_name = f'{self.base_name}-{count}'
[docs] def log(self, msg, prefix=True, level=logging.INFO):
"""
Add a message to the log file
:type msg: str
:param msg: The message to add
:type prefix: bool
:param prefix: Whether to add information about the workflow and step
name to the front of the message string
:type level: int
:param level: A `logging` constant indicating the priority level of the
message
"""
if not self.logger:
return
if prefix:
msg = self.base_name + ': ' + msg
self.logger.log(level, msg)
[docs] def getSteps(self):
"""
Create all the steps required for this workflow
This method should almost certainly be overridden by any child class.
The example given here is just that - an example
"""
geopt = OptStep(self, keystring='dftname=B3LYP basis=MIDIX')
self.steps.append(geopt)
frozen_step = FrozenStep(self,
parent=geopt,
keystring='dftname=B3LYP basis=6-311+G*')
self.steps.append(frozen_step)
[docs] def periodicMaintenance(self):
"""
The `run_workflows` function will call this method periodically - it can
be used to perform operations while one of the workflow steps is running
"""
for step in self.steps:
try:
step.periodicMaintenance()
except AttributeError:
pass
[docs] def check(self, log_zero_steps=False):
"""
Check if this workflow is complete. Also, submit the next step(s) if the
previous step has finished.
:type log_zero_steps: bool
:param log_zero_steps: log a message if there are zero steps
:rtype: bool
:return: True if the workflow is complete, False if not
"""
if log_zero_steps and not self.steps:
eid = self.struct.property[msprops.ENTRY_ID_PROP]
msg = ('No matching steps could be found for the structure '
'with entry ID {eid} and so no jobs will be run. This '
'workflow will be marked as finished.').format(eid=eid)
self.log(msg)
return True
running_step = None
for step in self.steps:
# Cycle through all steps, finding the first one not yet finished
if step.finished or step.doa:
# Step is done or will never start due to failed parent,
# ignore it
continue
if step.job is not None:
# step.job is None if step hasn't started or has finished, so
# step must be running at this point
#
# calcsDone checks to see if the subjob for this step has
# finished
if step.calcsDone():
# Step calcs have completed since the last check, finish it
step.finish()
# finish() might intentionally NOT set step.finished =
# True because the step is waiting on some other step to
# complete (for example). If finished is not True,
# then the step is not...finished, so we do not want to
# clean things up yet
if step.finished:
# Release any memory associated with this job object
step.job = None
step.archiveFiles()
else:
# Found a step that isn't done yet - make note of it, but
# continue to check for other steps that can be submitted
running_step = step
else:
# The first unfinished step we found has not yet been started
if step.canStart():
# This step can run (because parent finished OK)
step.start()
running_step = step
if running_step:
# Currently running a step
return False
return True
[docs] def recordFailureStatus(self):
"""
Set properties based on the success/failure of each step, and write
workflow structures to a summary failed file if any step failed
"""
success_props = {}
for step in self.steps:
status = step.ok and step.finished
propname = msprops.STEP_SUCCESS_PROP.format(
stepname=step.step_name.replace(' ', '_'))
success_props[propname] = status
self.properties.update(success_props)
if self.struct and not all(success_props.values()):
# Note: some meta workflows do not have an associated structure
# because the input requires more than one structure object
if self.backend:
base = self.backend.getJob().Name
else:
try:
base = self.options.name
except AttributeError:
base = 'workflow'
failname = base + '-failed.maegz'
scopy = self.struct.copy()
scopy.property.update(success_props)
scopy.append(failname)
jobutils.add_outfile_to_backend(failname)
sourcepath = jobutils.determine_source_path(backend=self.backend)
orig_fail_path = sourcepath + '/' + failname
self.properties[msprops.WORKFLOW_FAILURE_FILE_PROP] = orig_fail_path
[docs] def write(self, writer):
"""
Write out the structure for this workflow and all the child structures
:type writer: schrodinger.StructureWriter
:param writer: The writer to use to write the structure
"""
self.recordFailureStatus()
if self.subhierarchy:
# If any structure has a hierarchy set, they all must have one set
# or else the entry groups are not grouped together in the project
# table when Maestro incorporates the job output.
if self.backend:
hierarchy_root = self.backend.getJob().Name
else:
base_file_name = os.path.basename(writer.filename)
hierarchy_root = fileutils.splitext(base_file_name)[0]
main_hierarchy = [hierarchy_root]
sub_hierarchy = [hierarchy_root, self.subhierarchy]
else:
main_hierarchy = sub_hierarchy = None
# Write the main step first if one was specified
if self.main_step:
main_step = self.main_step
elif self.steps:
main_step = self.steps[0]
else:
return
if main_step:
main_step.write(writer,
props=self.properties,
hierarchy=main_hierarchy)
for step in self.steps:
if step != main_step:
step.write(writer,
props=self.child_properties,
hierarchy=sub_hierarchy)
[docs]def run_workflows(jobq, active_workflows, writer):
"""
Run all the workflows and return when they are finished. At the end of this
function, active_workflows will be empty. Returns True if at least one job
did not fail (or there were no jobs to run), otherwise False.
:type jobq: `schrodinger.job.queue.JobDJ`
:param jobq: The JobDJ object to which jobs are added
:type active_workflows: list
:param active_workflows: List of Workflow objects that should be run
:type writer: `schrodinger.structure.StructureWriter`
:param writer: The StructureWriter object to use to right output structures
:rtype: bool
:return: True if at least one job did not fail, otherwise False
"""
def check_all_workflows(*args):
"""
Check each remaining workflow to see if it has completed. If so, write
out the final structure and remove it from the active_workflows list
"""
for workflow in active_workflows[:]:
if workflow.check():
workflow.write(writer)
# Make sure any final periodic maintenance is done
workflow.periodicMaintenance()
active_workflows.remove(workflow)
def maintenance():
""" Periodically call each workflow's periodicMaintenance method """
for workflow in active_workflows[:]:
workflow.periodicMaintenance()
if not jobq.waiting_jobs:
# Avoids a traceback from JobDJ if there's nothing to do.
# In the case of restart, when all the files are already present
# (except maybe the final maegz output, i.e. -opto.maegz
# or -marcus.maegz), this is needed to (re)generate this final maegz
# (MATSCI-2695)
# Check any remaining workflows that have steps that need to clean up
check_all_workflows()
return True
jobq.run(periodic_callback=maintenance,
status_change_callback=check_all_workflows)
# Check any remaining workflows that have steps that need to clean up
check_all_workflows()
# if all jobs failed, return False
if jobq.total_added == len(jobq._failed):
return False
else:
return True
# from original wave_function_analysis.py in MATSCI-8896
[docs]def parse_matrix(out_file):
"""
Parse a matrix from a jaguar output file
:param iterable out_file: An iterable such as the file handle. Should be
at the beginning of the matrix part of the file.
:rtype: `numpy.array`
:return: Numpy array containing the correlation matrix
"""
next(out_file) # Skip column numbers row
data_length = 0
update_length = True
extend = False # Whether values should be extended or appended
rows = []
while True:
line = next(out_file).strip()
if line == '':
break # End of matrix data
data = line.split()
if update_length:
data_length = len(data)
update_length = False
elif len(data) != data_length:
# Column numbers row. Get the data length for next row and skip
update_length = True
extend = True # Extend existing lists from now on
continue
values = [float(x) for x in data[1:]]
if extend:
row_num = int(data[0]) - 1
rows[row_num].extend(values)
else:
rows.append(values)
num_rows = len(rows)
assert all([len(row) == num_rows for row in rows])
return numpy.array(rows)
[docs]def fix_jaguar_file_links(struct, subdir=None):
"""
Change the Jaguar file link properties to point to the original job
directory instead of the subjob directory. If not running under job
control, the current working directory is used as the original job
directory.
:param schrodinger.structure.Structure struct: The structure to fix
:param str subdir: The subdirectory for these files in the main job
directory
"""
backend = jobcontrol.get_backend()
if backend:
orig_dir = jobutils.determine_source_path(backend=backend)
else:
orig_dir = os.getcwd()
if subdir:
orig_dir = '/'.join((orig_dir, subdir))
for prop, val in struct.property.items():
if is_jaguar_file_property(prop):
fname = os.path.basename(val)
# We use '/'.join rather than os.path.join in order to create paths
# that work on any system
struct.property[prop] = '/'.join((orig_dir, fname))
[docs]def get_delta_energy(reactants, products):
"""
Compute delta energy as: dE = E(Products) - E(Reactants) in kcal/mol
:param list[structure.Structure] reactants: List of reactants
:param list[structure.Structure] products: List of products
:rtype: float
:return: Energy in kcal/mol
"""
energy = sum(product.property[FINAL_ENERGY_PROP] for product in products)
energy -= sum(
reactant.property[FINAL_ENERGY_PROP] for reactant in reactants)
return energy * units.KCAL_PER_MOL_PER_HARTREE