"""
Utilities for reaction workflows.
Copyright Schrodinger, LLC. All rights reserved.
"""
import argparse
import copy
import csv
import enum
import glob
import itertools
import json
import os
import pandas
import re
import shutil
import math
import warnings
from collections import OrderedDict
from collections import namedtuple
from types import SimpleNamespace
import networkx as nx
import numpy
import yaml
from collections import defaultdict
from scipy import constants
from schrodinger import structure
from schrodinger import adapter
from schrodinger.application.jaguar.input import JaguarInput
from schrodinger.application.jaguar.output import JaguarOutput
from schrodinger.application.macromodel import utils as mmodutils
from schrodinger.application.matsci import anharmonic
from schrodinger.application.matsci import desmondutils
from schrodinger.application.matsci import etarotamers
from schrodinger.application.matsci import etatoggle
from schrodinger.application.matsci import \
jaguar_multistage_workflow_utils as jmswfu
from schrodinger.application.matsci import jaguar_restart
from schrodinger.application.matsci import textlogger
from schrodinger.application.matsci import jaguarworkflows
from schrodinger.application.matsci import jobutils
from schrodinger.application.matsci import msutils
from schrodinger.application.matsci import parserutils
from schrodinger.application.matsci import msprops
from schrodinger.application.matsci import \
genetic_optimization as go
from schrodinger.application.matsci import swap_fragments_utils as sfu
from schrodinger.infra import mm
from schrodinger.job import jobcontrol
from schrodinger.structutils import analyze
from schrodinger.structutils import rmsd
from schrodinger.structutils import rings
from schrodinger.structutils import rgroup_enumerate
from schrodinger.structutils import build
from schrodinger.math import mathutils
from schrodinger.test.stu.common import zip_directory
from schrodinger.utils import csv_unicode
from schrodinger.utils import fileutils
from schrodinger.utils import subprocess
from schrodinger.utils import units
from schrodinger.utils.license import is_opls2_available
FLAG_N_CONFORMERS = '-n_conformers'
FLAG_PP_REL_ENERGY_THRESH = '-pp_rel_energy_thresh'
FLAG_RETURN_CSEARCH_FILES = '-return_csearch_files'
FLAG_SKIP_ETA_ROTAMERS = '-skip_eta_rotamers'
FLAG_JAGUAR = '-jaguar'
FLAG_JAGUAR_KEYWORDS = '-jaguar_keywords'
FLAG_TEMP_START = '-temp_start'
FLAG_TEMP_STEP = '-temp_step'
FLAG_TEMP_N = '-temp_n'
FLAG_PRESS_START = '-press_start'
FLAG_PRESS_STEP = '-press_step'
FLAG_PRESS_N = '-press_n'
FLAG_MAX_I_FREQ = anharmonic.FLAG_MAX_I_FREQ
FLAG_TPP = parserutils.FLAG_TPP
FLAG_DESCRIPTORS = '-descriptors'
FLAG_LIGFILTER = '-ligfilter'
FLAG_CANVAS = '-canvas'
FLAG_MOLDESCRIPTORS = '-moldescriptors'
FLAG_INCLUDE_VECTORIZED = '-include_vectorized'
FLAG_FINGERPRINTS = '-fingerprints'
FLAG_SAVEFILES = '-savefiles'
FLAG_STPROPS = '-stprops'
FLAG_INPUT_FILE = '-input_file'
DEFAULT_JOB_NAME = 'reaction_workflow'
DEFAULT_N_CONFORMERS = 5
# keys and values should be strings
DEFAULT_JAGUAR_KEYWORDS = OrderedDict([('dftname', 'B3LYP'), ('iuhf', '1'),
('basis', 'LACVP**')])
DEFAULT_TPP = 1
# OPLS_2005 does not allow ZOB to sp3 carbon while OPLS3 does
DEFAULT_FORCE_FIELD_NAME = mm.OPLS_NAME_F16 # OPLS3
RESERVED_JAGUAR_KEYS = set([
'molchg', 'multip', 'igeopt', 'ifreq', 'itrvec', 'nhesref', 'ntemp',
'tmpini', 'tmpstp', 'npress', 'press', 'press_step'
])
REACTION_WF_TAG = '_rxnwf'
REACTION_WF_STRUCTURE_KEY = sfu.REACTION_WF_STRUCTURE_KEY
CONFORMERS_GROUP_KEY = sfu.CONFORMERS_GROUP_KEY
SIBLING_GROUP_KEY = sfu.SIBLING_GROUP_KEY
PARENT_SIBLING_GROUPS_KEY = sfu.PARENT_SIBLING_GROUPS_KEY
CHARGE_KEY = msprops.CHARGE_PROP
MULTIPLICITY_KEY = msprops.MULTIPLICITY_PROP
KEEP_ATOM_KEY = sfu.KEEP_ATOM_PROP
SUPERPOSITION_ATOM_KEY = sfu.SUPERPOSITION_ATOM_PROP
RESTRAINED_ATOM_KEY = 'b_matsci_Reaction_Workflow_Restrained_Atom'
RESTRAINED_DISTANCES_KEY = sfu.RESTRAINED_DISTANCES_KEY
RESTRAINED_ANGLES_KEY = sfu.RESTRAINED_ANGLES_KEY
RESTRAINED_DIHEDRALS_KEY = sfu.RESTRAINED_DIHEDRALS_KEY
TRANSITION_STATE_STRUCTURE_KEY = sfu.TRANSITION_STATE_STRUCTURE_KEY
INDEX_SEPARATOR = sfu.INDEX_SEPARATOR
PARENT_SEPARATOR = ','
SEPARATOR = sfu.SEPARATOR
ENTRY_ID_KEY = msprops.ENTRY_ID_PROP
RATE_CONSTANT_STRING = 'Rate_Constant'
CONF_AVG_CTST_RATE_CONSTANT = 'Conf_Avg_CTST_Rate_Constant'
TempData = namedtuple('TempData', ['temp_start', 'temp_step', 'temp_n'])
PressData = namedtuple('PressData', ['press_start', 'press_step', 'press_n'])
ReactProdTS = namedtuple('ReactProdTS', ['ts_name', 'other_name'])
SameTypeGSTS = namedtuple('SameTypeGSTS', ['ts_name', 'other_name', 'is_gsgs'])
Pair = namedtuple('Pair', ['first', 'second'])
DF_COL_TS_SIBLING, DF_COL_OTHER_SIBLING = 'TS_Sibling', 'Other_Sibling'
DF_COL_EN_PROP = 'Energy_Property'
DfCustomRateColumns = namedtuple('DfCustomRateColumns', [
DF_COL_TS_SIBLING, DF_COL_OTHER_SIBLING, 'Temperature_K', 'Pressure_atm',
DF_COL_EN_PROP, 'Energy_Barrier_kcal_per_mol', 'TS_Partition_F',
'Other_Partition_F', RATE_CONSTANT_STRING, 'Rate_Constant_Type',
'Rate_Constant_Units'
])
DF_COL_SIBLING = 'Sibling'
DfCustomEqColumns = namedtuple('DfCustomEqColumns', [
DF_COL_SIBLING, DF_COL_OTHER_SIBLING, 'Temperature_K', 'Pressure_atm',
'Energy_Barrier_kcal_per_mol', DF_COL_EN_PROP, 'Keq'
])
HARTREE_UNITS = ['au']
KCAL_PER_MOL_UNITS = ['kcal/mol']
EV_UNITS = ['eV']
KJOULES_PER_MOL_UNITS = ['kJ/mol']
KNOWN_UNITS = (HARTREE_UNITS + KCAL_PER_MOL_UNITS + EV_UNITS +
KJOULES_PER_MOL_UNITS)
# kcal/mol/K
IDEAL_GAS_CONSTANT = (constants.R / constants.calorie) / constants.kilo
STAGE_SEPARATOR = '_stage_'
REL_REACTANTS_W_X_EXT = '_conf_avg_rel_reactants.csv'
REL_REACTANTS_WO_X_EXT = '_conf_avg_wo_x_rel_reactants.csv'
REL_REACTANTS_LOWEST_EXT = '_lowest_conf_rel_reactants.csv'
REL_PARENTS_W_X_EXT = '_conf_avg_rel_parents.json'
REL_PARENTS_WO_X_EXT = '_conf_avg_wo_x_rel_parents.json'
REL_PARENTS_LOWEST_EXT = '_lowest_conf_rel_parents.json'
REL_REACTANTS_EXTS = [
REL_REACTANTS_W_X_EXT, REL_REACTANTS_WO_X_EXT, REL_REACTANTS_LOWEST_EXT
]
REL_PARENTS_EXTS = [
REL_PARENTS_W_X_EXT, REL_PARENTS_WO_X_EXT, REL_PARENTS_LOWEST_EXT
]
REL_EXT_TO_KWARG_DICT = {
REL_REACTANTS_W_X_EXT: 'edict_w_x',
REL_REACTANTS_WO_X_EXT: 'edict_wo_x',
REL_REACTANTS_LOWEST_EXT: 'edict_lowest',
REL_PARENTS_W_X_EXT: 'edict_w_x',
REL_PARENTS_WO_X_EXT: 'edict_wo_x',
REL_PARENTS_LOWEST_EXT: 'edict_lowest'
}
SIBLING_GROUP_NAME = 'Sibling_Group_Name'
ANHARMONIC_TAG = '_anharmonic'
RATE_CONSTANTS_W_X_EXT = '_conf_avg_rate_constants.csv'
CUSTOM_RATE_CONSTS_EXT = '_custom_rate_constants.csv'
CUSTOM_KEQ_CONSTS_EXT = '_custom_keq.csv'
FFLD_ENERGY_GLOBAL_TAG = '_global'
MmodEnergyKeys = namedtuple('MmodEnergyKeys', ['absolute', 'relative'])
# kJ/mol
CSEARCH_REL_ENERGY_THRESH = 50.
# Ang.
CSEARCH_RMSD_THRESH = 0.1
JMSWF_LAUNCH_DIR = 'jaguar_multistage_workflow'
ANHARM_LAUNCH_DIR = 'anharmonic'
CTST_RATE_CONSTANTS_DIR = 'CTST_rate_constants'
# the variable name pp_rel_energy_thresh uses the prefix pp to indicate
# that this is the relative energy used in postprocessing MacroModel results
# and is different from the variable name rel_energy_thresh, used elsewhere in this
# code, which is the relative energy used in the MacroModel csearch
PP_CSEARCH_REL_ENERGY_THRESH = None # kJ/mol or None
DESCRIPTORS_LAUNCH_DIR_HEADER = 'descriptors_'
EnergyAnalysisProperty = namedtuple('EnergyAnalysisProperty', [
'sibling_group', 'conformer_groups', 'representative_conformers',
'temperature', 'pressure', 'energy_key', 'property_key', 'avg_property_key',
'atom_idx', 'ensemble', 'include_x_terms', 'only_lowest_energy'
])
OUT_EXT = '-out'
DEFAULT_ENUM_RXNWF_JOB_NAME = 'enumerate_reaction_workflow'
# from_idx specifies the direction of the substitution relative
# to to_idx, to_idx specifies the substitution site, hash_idx
# is an enumeration index which maps the site to an R-group file,
# structure_idx indicates which structure in the input file this
# site applies to
Site = namedtuple('Site', ['from_idx', 'to_idx', 'hash_idx', 'structure_idx'])
Source = namedtuple('Source', ['rgroup_st', 'site_idxs'])
get_msg = lambda site, msg: (
f'Invalid atom indices ({site.from_idx}, '
f'{site.to_idx}) for enumeration index {site.hash_idx} '
f'for structure index {site.structure_idx}. {msg}')
RGROUP_SEPARATOR = '_'
NOVEL = 'novel'
REFERENCE = 'reference'
DEFAULT_AUTO_RXNWF_JOB_NAME = 'automatic_reaction_workflow'
ENERGY_KEYS = 'ENERGIES'
ANHARMONIC_ENERGY_KEYS = 'ANHARMONIC_ENERGIES'
CUSTOM_ENERGY_KEYS = "CUSTOM_ENERGIES"
RATE_CONSTANT_KEY = "RATE_CONSTANT_KEY"
MASS_N_DECIMAL = 3
# hard code was decided in MATSCI-10586
KEQ_MAX = 10e100
[docs]def log(msg, **kwargs):
"""
Add a message to the log file
:param str msg: The message to log
Additional keyword arguments are passed to the textlogger.log_msg function
"""
textlogger.log_msg(msg, **kwargs)
[docs]def get_mmod_energy_key(ffld_name=None):
"""
Returns the corresponding MacroModel energy key for the given forcefield.
If no forcefield is given and the license is available, it defaults to
S-OPLS. If no forcefield is given and the license is NOT available, it
defaults to OPLS2005.
:param str ffld_name: The forcefield to get the energy key for. If `None`,
defaults to the latest forcefield available given your licenses.
:return str energy_key: The MacroModel suffix `key` for the default
forcefield. Units are in kJ/mol
"""
keys = get_mmod_energy_keys(ffld_name)
return keys.absolute
[docs]def get_mmod_rel_energy_key(ffld_name=None):
"""
Returns the corresponding MacroModel relative energy key for the given
forcefield. If no forcefield is given and the license is available, it
defaults to S-OPLS. If no forcefield is given and the license is NOT
available, it defaults to OPLS2005.
:param str ffld_name: The forcefield to get the energy key for. If `None`,
defaults to the latest forcefield available given your licenses.
:return str energy_key: The MacroModel suffix `key` for the default
forcefield.
"""
keys = get_mmod_energy_keys(ffld_name)
return keys.relative
[docs]def get_mmod_energy_keys(ffld_name=None):
"""
Returns the corresponding MacroModel energy key for the given forcefield.
If no forcefield is given and the license is available, it defaults to
S-OPLS. If no forcefield is given and the license is NOT available, it
defaults to OPLS2005.
:param str ffld_name: The forcefield to get the energy key for. If `None`,
defaults to the latest forcefield available given your licenses.
:return namedtuple energy_keys: A named two-tuple where the first element
(with a key of `absolute`) is the MacroModel suffix for the absolute
energy (in kJ/mol), and the second element (with a key of `relative`)
is the MacroModel suffix for the relative energy.
"""
sopls_key = f'r_mmod_Potential_Energy-{mm.OPLS_NAME_F16}'
sopls_rel_key = f'r_mmod_Relative_Potential_Energy-{mm.OPLS_NAME_F16}'
s_opls_keys = MmodEnergyKeys(sopls_key, sopls_rel_key)
# The value for `mm.OPLS_NAME_F16` matches the suffix for the energy keys,
# but the value for `mm.OPLS_NAME_F14` does not match its corresponding
# suffix. Here we hard-code the correct suffix (suffix is 'OPLS-2005'
# while name is 'OPLS_2005').
opls2005_key = 'r_mmod_Potential_Energy-OPLS-2005'
opls2005_rel_key = 'r_mmod_Relative_Potential_Energy-OPLS-2005'
opls2005_keys = MmodEnergyKeys(opls2005_key, opls2005_rel_key)
if ffld_name == mm.OPLS_NAME_F16:
return s_opls_keys
elif ffld_name == mm.OPLS_NAME_F14:
return opls2005_keys
elif ffld_name is None:
if is_opls2_available():
return s_opls_keys
else:
return opls2005_keys
else:
raise ValueError(f'{ffld_name} is not a recognize forcefield name.')
[docs]@enum.unique
class DF_RATE_TYPE(enum.Enum):
DEFAULT = 'default'
LNQ = 'lnq'
ANH_LNQ = 'anharmonic_lnq'
[docs]def get_reactions_data(sts):
"""
Given conformers dictionary return list of reactions.
:param list[structure.Structure] sts: List of structures
:rtype: list[Pair]: List of pairs that holds two ReactProdTS (first, second)
or SameTypeGSTS (for the Int-Int or TS-TS case) which in turn hold the
data for calculating both two rate constants and two equilibrium constants
"""
sibling_conformers_dict = bin_structures_by_property(
sts, key=SIBLING_GROUP_KEY, inner_key=CONFORMERS_GROUP_KEY)
ts_sibling_groups, intermediate_sibling_groups = [], []
for sibling_group, conformers_dict in sibling_conformers_dict.items():
rep_sts = [conformers[0] for conformers in conformers_dict.values()]
if has_transition_state(rxnwf_sts=rep_sts):
ts_sibling_groups.append(sibling_group)
else:
intermediate_sibling_groups.append(sibling_group)
react_prod_ts = []
for ts_name, other_name in itertools.product(ts_sibling_groups,
intermediate_sibling_groups):
react_prod_ts.append(ReactProdTS(ts_name, other_name))
# Make pairs of same type of siblings, GS-GS, TS-TS (MATSCI-12024)
for gs_name_a, gs_name_b in itertools.combinations(
intermediate_sibling_groups, 2):
react_prod_ts.append(SameTypeGSTS(gs_name_a, gs_name_b, True))
for ts_name_a, ts_name_b in itertools.combinations(ts_sibling_groups, 2):
react_prod_ts.append(SameTypeGSTS(ts_name_a, ts_name_b, False))
if len(react_prod_ts) == 1:
# If there is only one react_prod_ts, duplicate it in the pair
return [Pair(react_prod_ts[0], react_prod_ts[0])]
ret = []
for rpts_a, rpts_b in itertools.combinations(react_prod_ts, 2):
ret.append(Pair(rpts_a, rpts_b))
# Note that list of pairs can contain repeating siblings, both ts and other,
# since all the combinations are generated
return ret
[docs]def add_remove_props(sts, prop, add_props):
"""
Set additional properties equal to the original property from input
structures, original property will be deleted.
:param list[structure.Structure] sts: List of structures to be modified
:param str prop: Value of this property will be used, the property will be
deleted
:param list[str] add_props: List of properties to be added to the structure
"""
for struct in sts:
val = struct.property.pop(prop)
for add_prop in add_props:
struct.property[add_prop] = val
[docs]def get_pressures_temps(press_data, temp_data):
"""
Get list of pressures and temepratures from input data.
:param PressData press_data: Pressure data
:param TempData temp_data: Temperature data
:rtype: list[float], list[float]
:return: List of pressures, list of temperatures
"""
pressures = [
press_data.press_start + i * press_data.press_step
for i in range(0, press_data.press_n)
]
temps = [
temp_data.temp_start + i * temp_data.temp_step
for i in range(0, temp_data.temp_n)
]
return pressures, temps
[docs]def set_gas_phase_zpe_props(sts, press_data, temp_data, stage_idx):
"""
Set temp/pressure dependent gas phase + ZPE property in the input structures
based on the original property
:param list[structure.Structure] sts: List of structures to be modified
:param PressData press_data: Pressure data
:param TempData temp_data: Temperature data
:param int stage_idx: Stage index
"""
pressures, temps = get_pressures_temps(press_data, temp_data)
e_zpe_props = [
'%s%s%d' % (jaguarworkflows.get_gas_phase_zpe_key(
temp, press), STAGE_SEPARATOR, stage_idx)
for press, temp in itertools.product(pressures, temps)
]
prop = '%s%s%d' % (jaguarworkflows.GAS_PHASE_ZERO_POINT_ENERGY_PROP,
STAGE_SEPARATOR, stage_idx)
add_remove_props(sts, prop, e_zpe_props)
[docs]def get_eprop_data(eprop):
"""
Get conversion factor to kcal/mol, temperature in K, pressure in atm,
from energy property
:param str eprop: Energy property
:rtype: float, float, float
:return: conversion factor to kcal/mol, temperature in K, pressure in atm
"""
conv = ReactionWorkflowEnergyAnalysis.getKcalPerMolConversion(eprop)
assert conv is not None # Should not happen
temp = jaguarworkflows.get_temperature(eprop)
assert temp is not None # Should not happen
press = jaguarworkflows.get_pressure(eprop)
assert press is not None # Should not happen
return conv, temp, press
[docs]def add_custom_eq_rows(name, other_name, avg_props, other_avg_props, eprop,
dframe):
"""
Add data with kEQ to the dataframe
:param str name: Sibling name
:param str other_name: Other sibling name
:param list[EnergyAnalysisProperty] avg_props: List of average properties
of sibling
:param list[EnergyAnalysisProperty] other_avg_props: List of average
properties of the other sibling
:param str eprop: Energy property
:param pandas.DataFrame dframe: Input dataframe
:rtype: pandas.DataFrame
:return: Updated dataframe
"""
def keq_in_dframe(dframe, sibling, other_sibling, eprop):
query = ('%s == "%s" and %s == "%s" and %s == "%s"' %
(DF_COL_SIBLING, sibling, DF_COL_OTHER_SIBLING, other_sibling,
DF_COL_EN_PROP, eprop))
return bool(len(dframe.query(query)))
from schrodinger.application.matsci import boltzmann_avg_props as boltz
conv, temp, press = get_eprop_data(eprop)
denergy = (boltz.get_averaged_value(eprop, avg_props) -
boltz.get_averaged_value(eprop, other_avg_props)) * conv
# Forward and backward
for factor, (name_a, name_b) in zip([1, -1], [(name, other_name),
(other_name, name)]):
if keq_in_dframe(dframe, name_a, name_b, eprop):
# Skip already present row
continue
if denergy < -100:
# hard code was decided in MATSCI-10586
keq = KEQ_MAX
else:
denergy *= factor
keq = get_keq(denergy, temp)
keq = min(keq, KEQ_MAX)
df_row = DfCustomEqColumns(Sibling=name_a,
Other_Sibling=name_b,
Temperature_K=temp,
Pressure_atm=press,
Energy_Property=eprop,
Energy_Barrier_kcal_per_mol=denergy,
Keq=keq)
dframe = dframe.append(df_row._asdict(), ignore_index=True)
return dframe
[docs]def add_custom_rate_rows(reaction,
ts_avg_props,
other_avg_props,
eprop,
dframe,
is_anharm,
wigner_tunnel_corr=False):
"""
Add data with custom rates to the dataframe
:param ReactProdTS reaction: Reaction/product - TS
:param list[EnergyAnalysisProperty] ts_avg_props: List of average properties
:param list[EnergyAnalysisProperty] other_avg_props: List of other average
properties
:param str eprop: Energy property
:param pandas.DataFrame dframe: Input dataframe
:param bool is_anharm: Whether it is anharmonic calculation
:type wigner_tunnel_corr: bool
:param wigner_tunnel_corr: whether to include the Wigner tunneling correction
when computing rate constant(s)
:rtype: pandas.DataFrame
:return: Updated dataframe
"""
from schrodinger.application.matsci import boltzmann_avg_props as boltz
conv, temp, press = get_eprop_data(eprop)
denergy = (boltz.get_averaged_value(eprop, ts_avg_props) -
boltz.get_averaged_value(eprop, other_avg_props)) * conv
rate = get_custom_rate(denergy, temp)
df_row = DfCustomRateColumns(TS_Sibling=reaction.ts_name,
Other_Sibling=reaction.other_name,
Temperature_K=temp,
Pressure_atm=press,
Energy_Property=eprop,
Energy_Barrier_kcal_per_mol=denergy,
TS_Partition_F=None,
Other_Partition_F=None,
Rate_Constant=rate,
Rate_Constant_Type=DF_RATE_TYPE.DEFAULT.value,
Rate_Constant_Units='1/s')
if not is_anharm:
# rate without Q is only computed for the harmonic case
dframe = dframe.append(df_row._asdict(), ignore_index=True)
ifreq = boltz.get_averaged_value(eprop, ts_avg_props,
jaguarworkflows.LOWEST_FREQUENCY_PROP)
ts_lnq = get_lnq(eprop, ts_avg_props, is_anharm)
other_lnq = get_lnq(eprop, other_avg_props, is_anharm)
dlnq = ts_lnq - other_lnq
rate_type = (DF_RATE_TYPE.ANH_LNQ if is_anharm else DF_RATE_TYPE.LNQ)
rate = get_custom_rate(denergy,
temp,
dlnq=dlnq,
ifreq=ifreq,
wigner_tunnel_corr=wigner_tunnel_corr)
df_row = df_row._replace(TS_Partition_F=ts_lnq,
Other_Partition_F=other_lnq,
Rate_Constant_Type=rate_type.value,
Rate_Constant=rate)
dframe = dframe.append(df_row._asdict(), ignore_index=True)
return dframe
[docs]def get_custom_keq_rates(out_mae,
press_data,
temp_data,
extra_stages_file=None,
compute_rates=True,
wigner_tunnel_corr=False):
"""
Given output mae file, write custom equilibrium constants and optionally
custom rates to CSV dataframes.
:param str out_mae: Output file name
:param PressData press_data: Pressure data
:param TempData temp_data: Temperature data
:type extra_stages_file: str or None
:param extra_stages_file: Extra staged file name or None
:param bool compute_rates: Whether to compute rates or only equilibrium
constants
:type wigner_tunnel_corr: bool
:param wigner_tunnel_corr: whether to include the Wigner tunneling correction
when computing rate constant(s)
"""
def rate_in_dframe(dframe, rpts, eprop):
query = ('%s == "%s" and %s == "%s" and %s == "%s"' %
(DF_COL_TS_SIBLING, rpts.ts_name, DF_COL_OTHER_SIBLING,
rpts.other_name, DF_COL_EN_PROP, eprop))
return bool(len(dframe.query(query)))
from schrodinger.application.matsci import boltzmann_avg_props as boltz
sts = list(structure.StructureReader(out_mae))
stage_idx = get_rep_stage_idx(rxnwf_sts=sts)
extra_eprops = set()
if extra_stages_file:
extra_eprops = set_extra_energy_props(sts, extra_stages_file,
press_data, temp_data)
set_gas_phase_zpe_props(sts, press_data, temp_data, stage_idx)
eprops = set()
for starter in (jaguarworkflows.TOTAL_FREE_ENERGY_STARTER,
jaguarworkflows.TOTAL_ENTHALPY_STARTER,
jaguarworkflows.GAS_PHASE_ZERO_POINT_ENERGY_PROP,
jaguarworkflows.TOTAL_INTERNAL_ENERGY_STARTER):
eprops.update(
get_present_props(starter, sts[0].property, stage_idx=stage_idx))
anh_eprops = set()
for starter in (anharmonic.U_STARTER, anharmonic.H_STARTER,
anharmonic.G_STARTER):
anh_eprops.update(
get_present_props(starter, sts[0].property, stage_idx=stage_idx))
all_eprops = eprops | anh_eprops | extra_eprops # add sets
# see MATSCI-12044 - for a frequency calculation on an atom the following
# property is not defined by Jaguar in the restart Maestro file, yet for
# proper handling of imaginary frequencies for transition states it needs
# to be defined so arbitrarily define it to be zero
for st in sts:
if st.atom_total == 1:
st.property[jaguarworkflows.LOWEST_FREQUENCY_PROP] = 0
# do not dedup by geometry for now
avg_props = boltz.get_averaged_properties(sts,
all_eprops,
allow_sibling_groups=True,
dedup_geom_eps=0)
dframe = pandas.DataFrame(columns=DfCustomRateColumns._fields)
dframe_eq = pandas.DataFrame(columns=DfCustomEqColumns._fields)
reactions = get_reactions_data(sts)
data = dict()
for reaction in reactions:
for name in (reaction.first.ts_name, reaction.first.other_name,
reaction.second.ts_name, reaction.second.other_name):
if name not in data:
data[name] = [
ea_prop for ea_prop in avg_props
if ea_prop.sibling_group == name
]
for eprop in all_eprops:
is_anharm = eprop in anh_eprops
if compute_rates:
# Only add if sibling + other sibling + energy property is not in
# the dframe. reactions obtained from get_reactions_data can contain
# repeating siblings, both ts and other.
if (isinstance(reaction.first, ReactProdTS) and
not rate_in_dframe(dframe, reaction.first, eprop)):
dframe = add_custom_rate_rows(
reaction.first,
data[reaction.first.ts_name],
data[reaction.first.other_name],
eprop,
dframe,
is_anharm,
wigner_tunnel_corr=wigner_tunnel_corr)
if (isinstance(reaction.second, ReactProdTS) and
not rate_in_dframe(dframe, reaction.second, eprop)):
dframe = add_custom_rate_rows(
reaction.second,
data[reaction.second.ts_name],
data[reaction.second.other_name],
eprop,
dframe,
is_anharm,
wigner_tunnel_corr=wigner_tunnel_corr)
if reaction.first == reaction.second:
# This is the case where there is only one pair, see
# get_reactions_data
if reaction.first.ts_name != reaction.first.other_name:
dframe_eq = add_custom_eq_rows(
reaction.first.ts_name, reaction.first.other_name,
data[reaction.first.ts_name],
data[reaction.first.other_name], eprop, dframe_eq)
else:
if reaction.first.ts_name != reaction.second.ts_name:
dframe_eq = add_custom_eq_rows(
reaction.first.ts_name, reaction.second.ts_name,
data[reaction.first.ts_name],
data[reaction.second.ts_name], eprop, dframe_eq)
if reaction.first.other_name != reaction.second.other_name:
dframe_eq = add_custom_eq_rows(
reaction.first.other_name, reaction.second.other_name,
data[reaction.first.other_name],
data[reaction.second.other_name], eprop, dframe_eq)
if len(dframe):
# Only write non-empty file
out_fn = '%s%s' % (jobutils.get_jobname(DEFAULT_JOB_NAME),
CUSTOM_RATE_CONSTS_EXT)
dframe.to_csv(out_fn)
jobutils.add_outfile_to_backend(out_fn)
if len(dframe_eq):
# Only write non-empty file
out_eq_fn = '%s%s' % (jobutils.get_jobname(DEFAULT_JOB_NAME),
CUSTOM_KEQ_CONSTS_EXT)
dframe_eq.to_csv(out_eq_fn)
jobutils.add_outfile_to_backend(out_eq_fn)
[docs]def get_keq(energy, temp):
"""
Get k_equilibrium given energy and temperature.
:param float energy: energy in kcal/mol
:param float temp: Temperature in Kelvin
:rtype: float
:return: k_equilibrium
"""
return numpy.exp(-1 * energy / IDEAL_GAS_CONSTANT / temp)
[docs]def get_custom_rate(energy,
temp,
dlnq=None,
ifreq=None,
wigner_tunnel_corr=False):
"""
Get custom rate.
:param float energy: Energy in kcal/mol (!)
:param temp: Temperature in K
:type lnq: None or float
:param lnq: lnQ to compute rate with partition function, unitless
:type ifreq: None or float
:param ifreq: Lowest negative frequency to compute rate with
partition function, in 1/cm
:type wigner_tunnel_corr: bool
:param wigner_tunnel_corr: whether to include the Wigner tunneling correction
when computing rate constant(s)
:rtype: float
:return: Rate in 1/s
"""
if energy < -100:
# hard code was decided in MATSCI-10586
return 10e100
# k = kBh * T * np.exp(-1.0 * energy / kB1 / T) # 1/s
kBh = constants.value('Boltzmann constant in Hz/K')
chkB = constants.value('second radiation constant') # = 0.01438776877 m * K
kval = kBh * temp * get_keq(energy, temp)
if dlnq is None:
kval = min(kval, KEQ_MAX)
return kval
if wigner_tunnel_corr:
assert ifreq is not None
assert ifreq < 0, 'Found positive frequency: %f' % ifreq
ifreq_m = ifreq / constants.centi # from 1/cm to 1/m
w_corr = (-1 * ifreq_m * chkB / temp)**2 / 24.0 + 1.0
else:
w_corr = 1
# k = w * kBh * T * exp( D(lnQ) ) * exp(-1.0 * energy / kB1 / T) # 1/s
kval = w_corr * numpy.exp(dlnq) * kval
kval = min(kval, KEQ_MAX)
return kval
[docs]def get_present_props(en_starter, properties, stage_idx=None):
"""
Get all properties with a certain starter and optionally stage index
:param str en_starter: Property must start with this
:param list[str] properties: List of properties
:type stage_idx: None or int
:param stage_idx: Stage index
:rtype: list[str]
:return: List of matching properties
"""
postfix = ('' if stage_idx is None else '%s%d' %
(STAGE_SEPARATOR, stage_idx))
return [
prop for prop in properties
if (prop.startswith(en_starter) and prop.endswith(postfix))
]
[docs]def get_present_props_from_sts(sts, prefix, stage_idx=None):
"""
Get properties that start with prefix and end with stage index
(if provided). Check if pressure/temperature is present. Ensure that all
the structures have at least the same list of properties
:param list[structure.Structure] sts: List of structures
:param str prefix: Property must start with this
:type stage_idx: None or int
:param stage_idx: Stage index
:rtype: set[str]
:return: Set of properties
"""
props = set()
for aprop in get_present_props(prefix, sts[0].property,
stage_idx=stage_idx):
press = jaguarworkflows.get_pressure(aprop)
temp = jaguarworkflows.get_temperature(aprop)
assert press is not None and temp is not None, (
'Pressure or '
'temperature is missing from %s' % aprop)
props.add(aprop)
# All structures must have these properties
assert all(props.issubset(struct.property) for struct in sts), (
'Some '
'structures are missing properties: %s' % props)
return props
[docs]def get_lnq(eprop, averaged_properties, is_anharm):
"""
Get lnQ Boltzmann averaged over energy property.
:param str e_prop: Energy property
:param dict averaged_properties: Averaged Boltzmann properties
:param bool is_anharm: Whether to use anharmonic approximation
:rtype: float
:return: Averaged lnQ
"""
temp = jaguarworkflows.get_temperature(eprop)
assert temp is not None # Should not happen
press = jaguarworkflows.get_pressure(eprop)
assert press is not None # Should not happen
if is_anharm:
press = jaguarworkflows.format_pressure(press)
lnq_prop = anharmonic.LNQ_KEY.format(temp=temp, press=press)
# Stage is set in anharmonic case, but not it harmonic case
lnq_prop += '%s%d' % (STAGE_SEPARATOR, get_stage_idx(eprop))
else:
lnq_prop = jaguarworkflows.get_lnq_key(temp, press)
from schrodinger.application.matsci import boltzmann_avg_props as boltz
lnq = boltz.get_averaged_value(eprop, averaged_properties, lnq_prop)
return lnq
[docs]def get_restrain_atom_idxs(st):
"""
Return a list of indices of restrain atoms
in the given structure.
:type st: schrodinger.structure.Structure
:param st: the structure
:rtype: list
:return: contains indices of restrain atoms
"""
return sfu.get_idxs_marked_atoms(st, RESTRAINED_ATOM_KEY)
[docs]def get_idx_groups(text):
"""
Get index groups from the given string.
:type text: str
:param text: the string
:raise: InvalidInput if there is a formatting issue
:rtype: list
:return: contains list of indices
"""
try:
return sfu.get_idx_groups(text)
except sfu.SwapFragmentsException as err:
raise InvalidInput(str(err))
[docs]def get_restrain_distance_idxs(st):
"""
Return a list of lists of indices of restrain distances
in the given structure.
:type st: schrodinger.structure.Structure
:param st: the structure
:rtype: list
:return: contains lists of restrain distances
"""
return sfu._get_restrain_group_idxs(st, RESTRAINED_DISTANCES_KEY)
[docs]def get_restrain_angle_idxs(st):
"""
Return a list of lists of indices of restrain angles
in the given structure.
:type st: schrodinger.structure.Structure
:param st: the structure
:rtype: list
:return: contains lists of restrain angles
"""
return sfu._get_restrain_group_idxs(st, RESTRAINED_ANGLES_KEY)
[docs]def get_restrain_dihedral_idxs(st):
"""
Return a list of lists of indices of restrain dihedrals
in the given structure.
:type st: schrodinger.structure.Structure
:param st: the structure
:rtype: list
:return: contains lists of restrain dihedrals
"""
return sfu._get_restrain_group_idxs(st, RESTRAINED_DIHEDRALS_KEY)
[docs]def get_jaguar_keywords_list(jaguar_keywords_dict):
"""
Return the Jaguar keywords list from the given dict.
:type jaguar_keywords_dict: OrderedDict
:param jaguar_keywords_dict: the Jaguar keywords dict
:rtype: list
:return: the Jaguar keywords list
"""
return ['='.join(item) for item in jaguar_keywords_dict.items()]
[docs]def type_cast_seed(seed):
"""
Type cast the seed.
:type seed: str or unicode
:param seed: seed for random number generator
:rtype: int
:return: the seed
"""
return parserutils.type_random_seed(seed,
seed_min=go.CONF_SEARCH_SEED_RANGE[0],
seed_max=go.CONF_SEARCH_SEED_RANGE[1])
[docs]def type_cast_jaguar_keywords(jaguar_keywords,
reserved_keys=RESERVED_JAGUAR_KEYS,
exception_type=argparse.ArgumentTypeError):
"""
Type cast the Jaguar keywords.
:type jaguar_keywords: str or unicode or list
:param jaguar_keywords: the Jaguar keywords, a whitespace
delimited string of '<key>=<value>' tokens or a list
of such tokens
:type reserved_keys: set
:param reserved_keys: contains reserved Jaguar keys
:type exception_type: type
:param exception_type: the exception type to raise if invalid
:rtype: OrderedDict
:return: the Jaguar keywords OrderedDict
"""
if not jaguar_keywords:
msg = ('Jaguar keywords must be provided.')
raise exception_type(msg)
if isinstance(jaguar_keywords, list):
jaguar_keywords = ' '.join(jaguar_keywords)
jaguar_keywords = str(jaguar_keywords)
try:
msutils.keyword_string_to_dict(jaguar_keywords)
except ValueError as err:
msg = ('The following issue has been found with the '
'specified Jaguar keywords: {err}. Such options '
'must be specified using whitespace separated '
'<key>=<value> pairs.').format(err=str(err))
raise exception_type(msg)
adict = OrderedDict([token.split('=') for token in jaguar_keywords.split()])
if reserved_keys.intersection(set(adict.keys())):
msg = ('The following Jaguar keys are reserved for this workflow: '
'{keys}.').format(keys=sorted(reserved_keys))
raise exception_type(msg)
return adict
[docs]def check_ff_assignment(sts, ffld_name=None):
"""
Check the assignment of the given force field to the given
structures.
:type sts: list
:param sts: contains schrodinger.structure.Structure
:type ffld_name: str
:param ffld_name: the force field name.
:raise ValueError: if invalid
"""
if ffld_name is None:
ffld_name = msutils.get_default_forcefield().name
# in case incoming structures are in the centroid representation
sts = [st.copy() for st in sts]
for st in sts:
etatoggle.toggle_structure(st, out_rep=parserutils.ETA)
# currently the following does not support an mmlewis_apply
# clean up layer (which can change the incoming atom types),
# see minimize.Minimizer.setStructure for more details
ffld_version = mm.opls_name_to_version(ffld_name)
invalid_sts = desmondutils.find_forcefield_invalid_structures(
sts, ffld_version)
invalid_idxs = [str(sts.index(st) + 1) for st in invalid_sts]
if invalid_idxs:
idxs = ','.join(invalid_idxs)
msg = ('The {name} force field could not be assigned to '
'the following structures: {idxs}.').format(name=ffld_name,
idxs=idxs)
raise ValueError(msg)
[docs]def get_molecular_weight(st, idxs=None, decimal=None):
"""
Return the molecular weight (amu) taken over the given atom
indices in the given structure.
:type st: schrodinger.structure.Structure
:param st: the structure
:type idxs: list
:param idxs: the atom indices
:type decimal: None or int
:param decimal: an optional number of decimal places to which
to round the weight
:rtype: float
:return: the molecular weight (amu)
"""
# both st.total_weight and atom.atomic_weight use implicit hydrogens
# so do it by hand
if not idxs:
idxs = range(1, st.atom_total + 1)
weight = 0
for idx in idxs:
weight += mm.mmelement_get_atomic_weight_by_atomic_number(
st.atom[idx].atomic_number)
if decimal is not None:
return round(weight, decimal)
return weight
[docs]def check_centroid_rep(st):
"""
Check the centroid representation of the given structure.
:type st: schrodinger.structure.Structure
:param st: the structure
:raise ValueError: if invalid
"""
dummies = etatoggle.get_dummy_atoms(st)
if not dummies:
return
dummy_idxs = [dummy.index for dummy in dummies]
# if using the centroid representation then those dummy
# atoms should be at the end of the atom list
non_dummies = set(range(1, st.atom_total + 1)).difference(dummy_idxs)
if len(non_dummies) < max(non_dummies):
msg = ('Dummy atoms used for centroid representations must be at the '
'end of the atom list.')
raise ValueError(msg)
# dummies should have no metadata set
for key in (KEEP_ATOM_KEY, RESTRAINED_ATOM_KEY, SUPERPOSITION_ATOM_KEY):
for atom in dummies:
if atom.property.get(key):
msg = ('Dummy atoms used for centroid representations can not '
'be used as keep, restrained, or superposition atoms.')
raise ValueError(msg)
# dummies should not be used in restraints
for key in (RESTRAINED_DISTANCES_KEY, RESTRAINED_ANGLES_KEY,
RESTRAINED_DIHEDRALS_KEY):
jdxs = set([
idx for idxs in sfu._get_restrain_group_idxs(st, key)
for idx in idxs
])
if jdxs.intersection(dummy_idxs):
msg = ('Restrained distances, angles, or dihedrals can not use '
'dummy atoms.')
raise ValueError(msg)
[docs]def check_reaction_wf_structures(rxn_sts,
ffld_name=None,
mass_conserved=False,
keep_atoms_only=False):
"""
Check the given reaction workflow structues.
:type rxn_sts: str or list
:param rxn_sts: the reaction workflow structures, a file
name or list of schrodinger.structure.Structure
:type ffld_name: str
:param ffld_name: the force field name to use when optionally
checking its assignment to the given structures
:type mass_conserved: bool
:param mass_conserved: check that mass is conserved (see also
keep_atoms_only kwarg)
:type keep_atoms_only: bool
:param keep_atoms_only: specifies that only keep atoms be considered
when checking if mass is conserved (see also mass_conserved kwarg)
:raise ValueError: if invalid
"""
if isinstance(rxn_sts, str):
if not fileutils.is_maestro_file(rxn_sts):
msg = ('The file {afile} is not a Maestro file.').format(
afile=rxn_sts)
raise ValueError(msg)
if not fileutils.get_basename(rxn_sts).endswith(REACTION_WF_TAG):
msg = ('The file {afile} is not a reaction workflow file '
'as its base name does not end in {tag}.').format(
afile=rxn_sts, tag=REACTION_WF_TAG)
raise ValueError(msg)
rxn_sts = [st for st in structure.StructureReader(rxn_sts)]
titles = set()
for idx, st in enumerate(rxn_sts, 1):
if not st.property.get(REACTION_WF_STRUCTURE_KEY):
msg = ('Structure number {idx} does not belong to a '
'reaction workflow.').format(idx=idx)
raise ValueError(msg)
if not st.property.get(CONFORMERS_GROUP_KEY):
msg = ('Structure number {idx} does not belong to a '
'conformer group.').format(idx=idx)
raise ValueError(msg)
titles.add(st.title)
if len(titles) < len(rxn_sts):
msg = ('Structures must have unique titles.')
raise ValueError(msg)
sibling_conformers_dict = bin_structures_by_property(
rxn_sts, key=SIBLING_GROUP_KEY, inner_key=CONFORMERS_GROUP_KEY)
getters = [
get_restrain_distance_idxs, get_restrain_angle_idxs,
get_restrain_dihedral_idxs
]
found_parents = True
masses = set()
for sibling_group, conformers_dict in sibling_conformers_dict.items():
num_tss = 0
parents = set()
mass = 0
for conformer_group, conformers in conformers_dict.items():
# check conformers
check_conformers(conformers, conformer_group)
first_conformer = conformers[0]
num_tss += first_conformer.property.get(
TRANSITION_STATE_STRUCTURE_KEY, 0)
parents.add(first_conformer.property.get(PARENT_SIBLING_GROUPS_KEY))
# check restraint idxs
keep_idxs = sfu.get_idxs_marked_atoms(first_conformer,
KEEP_ATOM_KEY)
if keep_idxs:
all_restrain_idxs = set()
for getter in getters:
for restrain_idxs in getter(first_conformer):
all_restrain_idxs.update(restrain_idxs)
if not all_restrain_idxs.issubset(keep_idxs):
msg = (
f'The structures in conformer group {conformer_group} '
'have atoms involved in distance, angle, and/or dihedral '
'restraints that are not among the found keep atoms.')
raise ValueError(msg)
if keep_idxs and keep_atoms_only:
mass += get_molecular_weight(first_conformer, idxs=keep_idxs)
else:
mass += get_molecular_weight(first_conformer)
mass = round(mass, MASS_N_DECIMAL)
masses.add(mass)
# check transition states
if num_tss > 1:
msg = (
f'The structures in sibling group {sibling_group} can not have '
'more than a single transition state.')
raise ValueError(msg)
# check parent equivalency
if len(parents) > 1:
msg = (
f'The structures in sibling group {sibling_group} do not have '
'identical parents.')
raise ValueError(msg)
# check parent existence
parents_tmp = list(parents)[0]
if parents_tmp:
for parent in parents_tmp.split(PARENT_SEPARATOR):
if parent not in sibling_conformers_dict.keys():
msg = (
f'Sibling group {sibling_group} specifies a non-existent parent '
f'sibling group {parent}.')
raise ValueError(msg)
# check reactant siblings
if None in parents:
if not found_parents:
msg = (f'Structures in sibling group {sibling_group} '
'have no parents specified. All groups '
'other than reactant siblings must '
'have parent groups defined.')
raise ValueError(msg)
found_parents = False
if found_parents:
msg = ('No reactants were found.')
raise ValueError(msg)
if mass_conserved and len(masses) > 1:
msg = ('Mass is not conserved along the reaction path.')
raise ValueError(msg)
if ffld_name:
check_ff_assignment(rxn_sts, ffld_name=ffld_name)
[docs]def bin_structures_by_property(sts, key=CONFORMERS_GROUP_KEY, inner_key=None):
"""
Return a dictionary of structures binned by a property with
the given key. If inner_key is provided then return a dictionary
of dictionaries of structures with the inner dictionaries keyed
by inner_key and outer dictionaries keyed by key.
:type sts: list
:param sts: the structures
:type key: str
:param key: the key for the property by which to bin
:type inner_key: str
:param inner_key: additionally bin by this inner_key
:rtype: dict or dict of dict
:return: dictionary where keys are properties and values
are lists of structures or dictionary of dictionaries
where the outer dictionary is keyed by key and inner
dictionary is keyed by inner_key and values of the inner
dictionary are lists of structures
"""
# for backwards compatibility in the data structure if the inner
# property exists but the outer property does not then for the
# outer property use the inner property and update the structure
adict = {}
for st in sts:
aprop = st.property.get(key)
if inner_key:
inner_aprop = st.property.get(inner_key)
if aprop is None and inner_aprop:
aprop = inner_aprop
st.property[key] = aprop
adict.setdefault(aprop, {}).setdefault(inner_aprop, []).append(st)
else:
adict.setdefault(aprop, []).append(st)
return adict
def _check_structures_for_property_equivalence(sts,
key=RESTRAINED_ATOM_KEY,
is_atom_key=True):
"""
Check that the given structures have equivalent property values
for the given structure or atom key. If it is an atom key then
the key must be a boolean key as this function only ensures that
the groups of atoms for which the given property exists are
equivalent as opposed to checking equivalence among the property
values.
:type sts: list
:param sts: the structures
:type key: str
:param key: the key for the property for which to check
equivalence
:type is_atom_key: bool
:param is_atom_key: True if the given key is an atom key in which
case only property existence is checked as opposed to property
values
:raise ValueError: if invalid
"""
aprops = set()
for st in sts:
if is_atom_key:
aprop = tuple(sfu.get_idxs_marked_atoms(st, key))
if not aprop:
aprop = None
else:
aprop = st.property.get(key)
aprops.add(aprop)
if len(aprops) > 1:
msg = (
'The given structures have inequivalent {key} properties.').format(
key=key)
raise ValueError(msg)
[docs]def postprocess_conformational_search(conformer, out_mae_path, ffld_name=None):
"""
Postprocess a MacroModel conformational search. Rewrite the given
MacroModel out `*mae` file so that the conformers in it have properly
updated properties.
:type conformer: structure.Structure
:param conformer: a representative conformer that seeded the
search being postprocessed
:type out_mae_path: str
:param out_mae_path: the file path to the MacroModel out `*mae` file
:type ffld_name: str
:param ffld_name: the name of the force field to use for the search. If
`None`, then defaults to the latest forcefield available.
"""
energy_key, rel_energy_key = get_mmod_energy_keys(ffld_name)
# see MATSCI-5508 where it was seen that charge and multiplicity
# properties sometimes do not propagate
charge = conformer.property.get(CHARGE_KEY)
multiplicity = conformer.property.get(MULTIPLICITY_KEY)
# see MATSCI-7495 since using MacroModel's "serial" search relative energies
# need updating, energies in kJ/mol
min_energy = min(st.property[energy_key]
for st in structure.StructureReader(out_mae_path))
ff_rel_energy_key_g = f'{rel_energy_key}{FFLD_ENERGY_GLOBAL_TAG}'
conformers_out = []
for st in structure.StructureReader(out_mae_path):
if charge is not None:
st.property[CHARGE_KEY] = charge
if multiplicity is not None:
st.property[MULTIPLICITY_KEY] = multiplicity
st.property[ff_rel_energy_key_g] = st.property[energy_key] - min_energy
conformers_out.append(st)
structure.write_cts(conformers_out, out_mae_path)
[docs]def get_int_tuples_from_str_property(st, key, separator=SEPARATOR):
"""
Return a list of tuples of integers from the given string structure
property.
:type st: schrodinger.structure.Structure
:param st: the structure
:type key: str
:param key: the property key
:type separator: str
:param separator: the tuple separator used for the given property
:rtype: list
:return: contains tuples of integers
"""
aproperty = st.property.get(key)
if not aproperty:
return []
else:
return [eval(token) for token in aproperty.split(separator)]
[docs]def update_index_properties(st, old_to_new):
"""
Update the index properties of the given structure.
:type st: `structure.Structure`
:param st: the structure
:type old_to_new: dict
:param old_to_new: a map of old-to-new atom indices
"""
keys = (RESTRAINED_DISTANCES_KEY, RESTRAINED_ANGLES_KEY,
RESTRAINED_DIHEDRALS_KEY)
for key in keys:
idxs = _get_new_restrain_group_idxs(st, key, old_to_new)
text = sfu.get_idx_groups_str(idxs)
if text:
st.property[key] = text
[docs]def get_core_idxs(st):
"""
Return a set of atom indices for the core of the given structure.
:type st: schrodinger.structure.Structure
:param st: the structure
:rtype: set
:return: core atom indices
"""
# keep atoms are not to be considered as core atoms
atom_keys = [SUPERPOSITION_ATOM_KEY, RESTRAINED_ATOM_KEY]
st_keys = [
RESTRAINED_DISTANCES_KEY, RESTRAINED_ANGLES_KEY,
RESTRAINED_DIHEDRALS_KEY
]
idxs = set()
for atom in st.atom:
for key in atom_keys:
if atom.property.get(key):
idxs.add(atom.index)
for key in st_keys:
for jdxs in sfu._get_restrain_group_idxs(st, key):
idxs.update(jdxs)
return idxs
def _get_new_restrain_group_idxs(st, key, old_to_new, offset=0):
"""
Return new restrain group indices.
:type st: schrodinger.structure.Structure
:param st: the structure
:type key: str
:param key: determines the type of restrain coordinate
:type old_to_new: dict
:param old_to_new: the old-to-new atom index map
:type offset: int
:param offset: an offset added to all new restrain
group indices, useful when assembling structures
from parts of other structures
:rtype: list
:return: contains new restrain group indices
"""
return sfu._get_new_restrain_group_idxs(st, key, old_to_new, offset=offset)
[docs]class ReactionWorkflowFile(RepresentativeConformersMixin):
"""
Manage a reaction workflow file.
"""
[docs] def __init__(self, rxn_sts):
"""
Create an instance.
:type rxn_sts: str or list
:param rxn_sts: the reaction workflow structures, a file
name or list of schrodinger.structure.Structure
"""
if isinstance(rxn_sts, str):
rxn_sts = [st for st in structure.StructureReader(rxn_sts)]
self.sibling_conformers_dict = bin_structures_by_property(
rxn_sts, key=SIBLING_GROUP_KEY, inner_key=CONFORMERS_GROUP_KEY)
[docs] def getReactantsSiblingGroupName(self):
"""
Return the reactants sibling group name.
:rtype: str
:return: the reactants sibling group name
"""
for sibling_group, conformer_group, conformer in self.representativeConformers(
):
parents = conformer.property.get(PARENT_SIBLING_GROUPS_KEY)
if not parents:
return sibling_group
[docs]class UniqueGeomMixin:
"""
Manage uniqueifying structures by geometry.
"""
def _setUniqueGeomDict(self, sts):
"""
Set the unique geometry dictionary.
:type sts: list[`schrodinger.structure.Structure`]
:param sts: the structures
"""
if not self.dedup_geom_eps:
# this means deduplicating is not being performed
return
# see MATSCI-10591 - set up a cache for unique geometries to dedup
# expensive calculations like MacroModel, Jaguar, etc., arbitrarily
# use the first geometry
#
# see MATSCI-11251 - JMSWF uses cleaned structure titles to name
# Jaguar subjob input files, while the title (and entry name) on the
# structure in the input file is unchanged, the titles (and entry names)
# of structures in the main JMSWF output Maestro file are cleaned, so
# used cleaned titles here
unique_geom_dict = bin_by_geometry(sts, eps=self.dedup_geom_eps)
self.unique_geom_dict = {
jobutils.clean_string(ref_st.title): [ref_st] + other_sts
for ref_st, other_sts in unique_geom_dict.items()
}
def _getUniqueGeomSts(self, sts):
"""
Return representative structures with unique geometries.
:type sts: list[`schrodinger.structure.Structure`]
:param sts: the structures
:rtype: list[`schrodinger.structure.Structure`]
:return: the structures
"""
# arbitrarily use the first geometry
return [sts[0] for sts in self.unique_geom_dict.values()] or sts
def _duplicateUniqueGeomSts(self):
"""
Duplicate the structures choosen as unique geometries.
:rtype: list[`schrodinger.structure.Structure`]
:return: the structures
"""
# for conformers created from an original structure JMSWF
# will append '-<idx>'
new_rep_sts = []
for rep_st in self.rep_sts:
new_rep_sts.append(rep_st)
if not self.unique_geom_dict:
continue
orig_title, title_ext = get_orig_title(rep_st.title,
self.unique_geom_dict)
_, ename_ext = get_orig_title(
rep_st.property[msprops.ENTRY_NAME_PROP], self.unique_geom_dict)
sts = self.unique_geom_dict[orig_title]
orig_st, other_sts = sts[0], sts[1:]
for other_st in other_sts:
_other_st = rep_st.copy()
_other_st.property.update(other_st.property)
_other_st.title = f'{jobutils.clean_string(other_st.title)}{title_ext}'
_other_st.property[msprops.ENTRY_NAME_PROP] = \
f'{jobutils.clean_string(other_st.title)}{ename_ext}'
new_rep_sts.append(_other_st)
return new_rep_sts
def _updateUniqueGeoms(self, sibling_conformers_dict, title_dict):
"""
Update the unique geometry dictionary and use it to update the
given structure dictionary.
:type sibling_conformers_dict: dict
:param sibling_conformers_dict: dictionary of dictionaries where the
outer dictionary is keyed by sibling and inner dictionary is keyed by
conformer and values of the inner dictionary are lists of structures
:type title_dict: dict
:param title_dict: dictionary mapping the structure titles of generated
rotamers to the structure title of the structure from which they were
generated
:rtype: dict
:return: dictionary of dictionaries where the outer dictionary is keyed
by sibling and inner dictionary is keyed by conformer and values of
the inner dictionary are lists of structures
"""
# relative to all original structures the following list contains
# those representatives that have unique geometries plus any of
# their guess conformers
new_sts = flatten_sibling_conformers(sibling_conformers_dict)
# guess conformers have been generated for the representatives, now
# generate the same for the structures being represented
all_sts = []
for new_st in new_sts:
all_sts.append(new_st)
if not self.unique_geom_dict:
continue
# structure titles of the generated guess conformers for the
# representative may be appended with some identifier, for
# example 'Fe-complex', 'Fe-complex_252_324', etc.
orig_title = title_dict[new_st.title]
old_sts = self.unique_geom_dict[orig_title]
new_xyz = new_st.getXYZ()
# 0th structure is the original old structure
for old_st in old_sts[1:]:
tmp_st = old_st.copy()
tmp_st.setXYZ(new_xyz)
all_sts.append(tmp_st)
self._setUniqueGeomDict(all_sts)
sts = self._getUniqueGeomSts(all_sts)
return bin_structures_by_property(sts,
key=SIBLING_GROUP_KEY,
inner_key=CONFORMERS_GROUP_KEY)
[docs]class ReactionWorkflowEnergyAnalysis(ReactionWorkflowFile, UniqueGeomMixin):
"""
Manage a reaction workflow energy analysis.
"""
[docs] def __init__(self,
rxn_sts,
energy_keys,
dedup_geom_eps=parserutils.DEFAULT_DEDUP_GEOM_EPS):
"""
Create an instance.
:type rxn_sts: str or list
:param rxn_sts: the reaction workflow structures, a file
name or list of schrodinger.structure.Structure
:type energy_keys: list
:param energy_keys: structure property energy keys to consider, if it
is temperature dependent then include the temperature (K) as a number
followed by 'K' in the key and the corresponding energy must be in
supported units (au, kcal/mol, eV, kJ/mol) and must be present in the
key as '(<units>)'
:type dedup_geom_eps: float
:param dedup_geom_eps: reduce the number of calculations by deduplicating
the input structures based on geometry, using this threshold
in Ang., and only calculating the representatives, a value of
zero means no deduplicating
"""
super().__init__(rxn_sts)
self.energy_keys = energy_keys
self.dedup_geom_eps = dedup_geom_eps
# Cached units, see getUnitsData
self._units_data = {}
self.unique_geom_dict = {}
[docs] def getUnitsData(self, prop):
"""
Get conversion factor to kcal/mol, pressure and temperature from a
property, saving the data into a dict.
:param str prop: Property to use
:rtype: float or None, float or None, float or None
:return: Conversion to kcal/mol, pressure, temperature
"""
try:
return self._units_data[prop]
except KeyError:
val = (self.getKcalPerMolConversion(prop), self.getPressure(prop),
self.getTemperature(prop))
self._units_data[prop] = val
return val
[docs] @staticmethod
def getTemperature(energy_key):
"""
Return the temperature (K) for the given energy key.
:type energy_key: str
:param energy_key: structure property energy key
:rtype: float, None
:return: the temperature (K) if there is one
"""
return jaguarworkflows.get_temperature(energy_key)
[docs] @staticmethod
def getPressure(energy_key):
"""
Return the pressure (atm) for the given energy key.
:type energy_key: str
:param energy_key: structure property energy key
:rtype: float, None
:return: the pressure (atm) if there is one
"""
return jaguarworkflows.get_pressure(energy_key)
[docs] @staticmethod
def getUnits(energy_key):
"""
Return the units for the given energy key.
:type energy_key: str
:param energy_key: structure property energy key
:rtype: str, None
:return: the units if there is one
"""
# units like 'kcal/mol' in '(kcal/mol)'
match = re.search(r'\((\D+)\)', energy_key)
if match:
return match.group(1)
base_energy_key = energy_key.split(STAGE_SEPARATOR)[0]
_units = jmswfu.JAGUAR_PROP_UNITS_DICT.get(base_energy_key)
if _units:
return _units[1:-1]
[docs] @staticmethod
def getKcalPerMolConversion(energy_key):
"""
Return the kcal/mol conversion factor for the given energy key.
:type energy_key: str
:param energy_key: structure property energy key
:rtype: float, None
:return: the kcal/mol conversion factor if there is one
"""
_units = ReactionWorkflowEnergyAnalysis.getUnits(energy_key)
if _units in HARTREE_UNITS:
conversion = units.KCAL_PER_MOL_PER_HARTREE
elif _units in KCAL_PER_MOL_UNITS:
conversion = 1
elif _units in EV_UNITS:
conversion = units.KCAL_PER_MOL_PER_EV
elif _units in KJOULES_PER_MOL_UNITS:
conversion = units.KCAL_PER_MOL_PER_EV
conversion /= units.KJOULES_PER_MOL_PER_EV
else:
conversion = None
return conversion
[docs] def getPropertyEnsemble(self,
conformers_dict,
energy_key,
conversion,
temp,
do_boltzmann,
include_x_terms=False,
only_lowest_energy=False,
property_key=None,
atom_idx=None):
"""
Return an ensemble of properties for the given conformers
dictionary of siblings and given energy key.
:type conformers_dict: dict
:param conformers_dict: keys are conformer group names, values are
lists of `structure.Structure`
:type energy_key: str
:param energy_key: the relevant energy key
:type conversion: float
:param conversion: the energy conversion factor to kcal/mol
:type temp: float
:param temp: the temperature in K
:type do_boltzmann: bool
:param do_boltzmann: if True perform a Boltzmann average, otherwise an
algebraic average
:type include_x_terms: bool
:param include_x_terms: whether to include cross terms in the
conformational averaging
:type only_lowest_energy: bool
:param only_lowest_energy: use only the lowest energy conformer
rather than averaging over conformers
:type property_key: str
:param property_key: the relevant property key, if not specified
it is the same as the energy key
:type atom_idx: int or None
:param atom_idx: if an integer then the given property key is for an
atomic property and this is the atom index, if None then the
given property key is a structure property
:raise ReactionWorkflowException: if there is an issue
:rtype: list
:return: ensemble of properties
"""
# being able to use both siblings and a property key depends
# on if the property key is for a size extensive property,
# i.e. property(ABC) = property(A) + property(B) + property(C),
# such is the case for energy but not for example bite angle, etc.
if include_x_terms and only_lowest_energy:
msg = ('Simultaneous use of include_x_terms and '
'only_lowest_energy is not supported.')
raise ReactionWorkflowException(msg)
if atom_idx and not property_key:
msg = ('Atomic properties require an input property key.')
raise ReactionWorkflowException(msg)
if atom_idx and len(conformers_dict) > 1:
msg = ('Atomic properties for sibling groups is not supported.')
raise ReactionWorkflowException(msg)
property_key = property_key or energy_key
# the following getter takes a list that contains a sublist of conformers
# for each reactant, if including cross terms then return the iterator
# for all possible collections of mixed conformers of reactants else return
# the original list
getter = lambda x: itertools.product(*x) if include_x_terms else x
ensemble = []
for sample in getter(conformers_dict.values()):
# see MATSCI-12187 - deduplicate conformational averaging by geometry
self._setUniqueGeomDict(sample)
sample = self._getUniqueGeomSts(sample)
energies_properties = []
for st in sample:
energy = st.property.get(energy_key)
if atom_idx:
if atom_idx <= st.atom_total:
aproperty = st.atom[atom_idx].property.get(property_key)
else:
aproperty = None
else:
aproperty = st.property.get(property_key)
pair = (energy, aproperty)
if None in pair:
return []
energies_properties.append(pair)
min_energy, min_property = min(energies_properties,
key=lambda x: x[0])
energies, properties = list(zip(*energies_properties))
if include_x_terms:
ensemble.append(sum(properties))
elif only_lowest_energy:
ensemble.append(min_property)
elif do_boltzmann:
# calling code needs to ensure that conversion brings energies
# to kcal/mol and that if the energy is temperature dependent
# that temperature in K should be used, wp is the Boltzmann
# averaged property, i runs over conformers, p_i is the property
# being Boltzmann averaged, beta is 1/(kT) in units of mol/kcal,
# E_i and E_min are in kcal/mol, and Z is the partition function
#
# wp = (sum_i p_i * exp(-beta * (E_i - E_min))) / Z
# Z = sum_i exp(-beta * (E_i - E_min))
#
deltas = conversion * (numpy.array(energies) - min_energy)
factors = numpy.exp(-1 * deltas / (IDEAL_GAS_CONSTANT * temp))
weighted_property = sum(properties * factors)
partition_f = sum(factors)
weighted_property /= partition_f
ensemble.append(weighted_property)
else:
ensemble.append(numpy.mean(properties))
if not include_x_terms and ensemble:
# see MATSCI-11763 - For sibling groups containing a mix of intermediates
# and transition states treat the former as a spectator and use the
# most negative frequency as the representative for the group, otherwise
# assume the property is size-extensive and can be summed over the
# siblings to arrive at the total property
positive = [0 <= value for value in ensemble]
if (property_key == jaguarworkflows.LOWEST_FREQUENCY_PROP and
any(positive) and not all(positive)):
afunc = min
else:
afunc = sum
ensemble = [afunc(ensemble)]
return ensemble
[docs] def getProperties(self,
include_x_terms=False,
only_lowest_energy=False,
property_key=None,
atomic=False,
temps=None):
"""
Return the properties.
:type include_x_terms: bool
:param include_x_terms: whether to include cross terms
:type only_lowest_energy: bool
:param only_lowest_energy: use only the lowest energy conformer
rather than averaging over conformers
:type property_key: str
:param property_key: the property key, if not specified energy
energy keys are used
:type atomic: bool
:param atomic: if True then the given property key is an atomic
property, otherwise is a structure property
:type temps: list
:param temps: temperatures in K, only used for temperature independent
energy and property keys
:raise ReactionWorkflowException: if there is an issue
:rtype: list[EnergyAnalysisProperty]
:return: the properties
"""
if include_x_terms and only_lowest_energy:
msg = ('Simultaneous use of include_x_terms and '
'only_lowest_energy is not supported.')
raise ReactionWorkflowException(msg)
if atomic and not property_key:
msg = ('Atomic properties require an input property key.')
raise ReactionWorkflowException(msg)
temps = temps or [jmswfu.DEFAULT_TEMP_START]
properties = []
for energy_key in self.energy_keys:
conversion, energy_key_press, energy_key_temp = \
self.getUnitsData(energy_key)
do_boltzmann = bool(conversion)
conversion = conversion or 1
this_property_key = property_key or energy_key
_, this_property_key_press, this_property_key_temp = \
self.getUnitsData(this_property_key)
# if property temperature or pressure differs from that
# of the energy used for averaging then skip it
t_state = (energy_key_temp and this_property_key_temp and
energy_key_temp != this_property_key_temp)
p_state = (energy_key_press and this_property_key_press and
energy_key_press != this_property_key_press)
if t_state or p_state:
continue
# get the temperature used for averaging, comes from either
# the given energy key, property key, or specified temperatures
if energy_key_temp:
these_temps = [energy_key_temp]
elif this_property_key_temp:
these_temps = [this_property_key_temp]
else:
these_temps = list(temps)
press = this_property_key_press or energy_key_press or 1
for temp in these_temps:
# get the key that will hold the average property
if this_property_key_temp:
avg_property_key = this_property_key
else:
ext = jaguarworkflows.get_temp_press_key_ext(temp, press)
avg_property_key = f'{this_property_key}{ext}'
for sibling_group, conformers_dict in self.sibling_conformers_dict.items(
):
representative_conformers = tuple(
min(conformers, key=lambda x: x.property[energy_key])
for conformers in conformers_dict.values())
# get atom indices if doing atomic properties
if atomic:
if len(conformers_dict) > 1:
msg = (
'Atomic properties for sibling groups is not '
'supported.')
raise ReactionWorkflowException(msg)
atom_idxs = list(
range(1,
representative_conformers[0].atom_total + 1))
else:
atom_idxs = [None]
for atom_idx in atom_idxs:
ensemble = tuple(
self.getPropertyEnsemble(
conformers_dict,
energy_key,
conversion,
temp,
do_boltzmann,
include_x_terms=include_x_terms,
only_lowest_energy=only_lowest_energy,
property_key=this_property_key,
atom_idx=atom_idx))
if not ensemble:
continue
# define property object
aproperty = EnergyAnalysisProperty(
sibling_group=sibling_group,
conformer_groups=tuple(conformers_dict.keys()),
representative_conformers=representative_conformers,
temperature=temp,
pressure=press,
energy_key=energy_key,
property_key=this_property_key,
avg_property_key=avg_property_key,
atom_idx=atom_idx,
ensemble=ensemble,
include_x_terms=include_x_terms,
only_lowest_energy=only_lowest_energy)
properties.append(aproperty)
return properties
[docs] def getConfAvgRelEnergies(self,
include_x_terms=True,
only_lowest_energy=False):
"""
Return the conformationally averaged energies relative to that
of the reactants.
:type include_x_terms: bool
:param include_x_terms: whether to include cross terms in the
conformational averaging
:type only_lowest_energy: bool
:param only_lowest_energy: use only the lowest energy conformer
rather than averaging over conformers
:raise ReactionWorkflowException: if there is an issue
:rtype: dict
:return: keys are sibling group names, values are dicts with
energy keys as keys and energy values as values
"""
if include_x_terms and only_lowest_energy:
msg = ('Simultaneous use of include_x_terms and '
'only_lowest_energy is not supported.')
raise ReactionWorkflowException(msg)
# with cross terms in the conformational averaging all possible
# collections of conformers within a sibling group as well
# between sibling groups are included in the averaging which
# is done last, for example a high energy conformer of A is
# allowed to react with a low energy conformer of B, etc. and that
# relative to some collection of conformers from another sibling
# group both (high, low), (low, high), etc. collections of (A, B)
# are included, without cross terms the averaging is done first
# over each molecule in the sibling group thereby reducing the
# energy of the group down to a single number
# canonical ensemble
# relevant energies reported relative to reactants
reactants_sibling_group = self.getReactantsSiblingGroupName()
reactants_conformers_dict = self.getReactantsConformersDict()
# for each energy key for both reactants and non-reactants
# obtain a conformeric ensemble of energies summed over siblings
# and collect energy differences for all pairs then perform a
# Boltzmann average if the energy is both temperature dependent
# and of known units and including cross terms, otherwise perform
# an algebraic average
edict = {}
for energy_key in self.energy_keys:
conversion = self.getKcalPerMolConversion(energy_key)
temp = self.getTemperature(energy_key)
do_boltzmann = conversion and temp
conversion = conversion or 1
reactants_energies = self.getPropertyEnsemble(
reactants_conformers_dict,
energy_key,
conversion,
temp,
do_boltzmann,
include_x_terms=include_x_terms,
only_lowest_energy=only_lowest_energy)
if not reactants_energies:
# this means that at least a single reactant doesn't
# have the given energy_key defined, so skip the energy_key
continue
header = self.getHeader(energy_key)
edict.setdefault(reactants_sibling_group, {})[header] = 0.
for sibling_group, conformers_dict in self.sibling_conformers_dict.items(
):
if sibling_group == reactants_sibling_group:
continue
energies = self.getPropertyEnsemble(
conformers_dict,
energy_key,
conversion,
temp,
do_boltzmann,
include_x_terms=include_x_terms,
only_lowest_energy=only_lowest_energy)
if not energies:
# this means that at least a single non-reactant child doesn't
# have the given energy_key defined, so first undefine it for
# all sibling groups and then skip the energy_key by breaking
# out of the non-reactant child loop
for _edict in edict.values():
_edict.pop(header, None)
break
deltas = numpy.array([
conversion * (enr - er) for enr, er in itertools.product(
energies, reactants_energies)
])
if do_boltzmann and include_x_terms:
factors = numpy.exp(-1 * deltas /
(IDEAL_GAS_CONSTANT * temp))
avg_delta = sum([
delta * factor
for delta, factor in zip(deltas, factors)
])
partition_f = sum(factors)
# see MATSCI-7153 - where the denominator can be problematic
with warnings.catch_warnings():
warnings.filterwarnings('error')
try:
avg_delta /= partition_f
except RuntimeWarning:
msg = (
'Large energy differences with respect to reactants '
'have been detected. This is typically a sign of a severely '
'imbalanced chemical reaction in the reaction workflow input '
'file.')
raise ReactionWorkflowException(msg)
else:
avg_delta = numpy.mean(deltas)
edict.setdefault(sibling_group, {})[header] = avg_delta
return edict
[docs] def getGraph(self):
"""
Return a NetworkX directed graph of the reaction workflow
energy level diagram.
:rtype: networkx.DiGraph
:return: nodes are sibling group names and have energy
dictionary kwargs, edges are directed (parent,
child) pairs
"""
edict_w_x = self.getConfAvgRelEnergies(include_x_terms=True,
only_lowest_energy=False)
edict_wo_x = self.getConfAvgRelEnergies(include_x_terms=False,
only_lowest_energy=False)
edict_lowest = self.getConfAvgRelEnergies(include_x_terms=False,
only_lowest_energy=True)
graph = nx.DiGraph()
for sibling_group, conformers_dict in self.sibling_conformers_dict.items(
):
graph.add_node(sibling_group,
edict_w_x=edict_w_x[sibling_group],
edict_wo_x=edict_wo_x[sibling_group],
edict_lowest=edict_lowest[sibling_group])
# parents are equivalent just grab an arbitrary first item
parents = next(iter(conformers_dict.values()))[0].property.get(
PARENT_SIBLING_GROUPS_KEY)
if not parents:
continue
for parent_sibling_group in parents.split(PARENT_SEPARATOR):
graph.add_edge(parent_sibling_group, sibling_group)
return graph
[docs] @staticmethod
def getOrderedNodeNames(graph):
"""
Return an ordered list of node names from the given graph.
:type graph: networkx.DiGraph
:param graph: nodes are sibling group names and have energy
dictionary kwargs, edges are directed (parent,
child) pairs
:rtype: list
:return: contains ordered node names
"""
# for example, all lines point right, up-right, or down-right
# (just a non-physical example):
#
# I2 -- TS2
# / \ / \
# TS1 I3 P
# / \ /
# R I1
#
# is ordered as R,TS1,I1,I2,I3,TS2,P
# find the reactant node
for node in graph.nodes():
if not list(graph.predecessors(node)):
reactant_node = node
break
# start with the reactant node
nodes = [reactant_node]
current_nodes = [reactant_node]
# collect nodes in order by adding for the current nodes
# all successor nodes that have no predecessor nodes that
# have not already been collected, currently successor nodes
# are sorted by name rather than energy, avoid adding the
# same node twice which can happen for loops in the graph,
# the current nodes to search are updated until all nodes
# have been collected
while len(nodes) < graph.number_of_nodes():
tmp = []
for current_node in current_nodes:
for successor_node in sorted(graph.successors(current_node)):
if not set(graph.predecessors(successor_node)).difference(
nodes):
if successor_node not in nodes:
nodes.append(successor_node)
tmp.append(successor_node)
current_nodes = list(tmp)
return nodes
[docs] def writeDataFiles(self):
"""
Write data files.
"""
# get ordered nodes
graph = self.getGraph()
node_names = self.getOrderedNodeNames(graph)
base_name = jobutils.get_jobname(DEFAULT_JOB_NAME)
out_files = []
# handle the conformationally averaged energies relative to reactants
for ext in REL_REACTANTS_EXTS:
conf_avg_rel_reactants_file = base_name + ext
kwarg = REL_EXT_TO_KWARG_DICT[ext]
with csv_unicode.writer_open(conf_avg_rel_reactants_file) as afile:
writer = csv.writer(afile)
for idx, name in enumerate(node_names):
edict = graph.nodes[name][kwarg]
if not idx:
headers = sorted(edict.keys())
writer.writerow([SIBLING_GROUP_NAME] + headers)
values = [name] + [edict[header] for header in headers]
writer.writerow(values)
out_files.append(conf_avg_rel_reactants_file)
# handle the conformationally averaged energies relative to parents
if len(node_names) > 1:
for ext in REL_PARENTS_EXTS:
conf_avg_rel_parents_file = base_name + ext
kwarg = REL_EXT_TO_KWARG_DICT[ext]
final_edict = {}
for child_name in node_names:
child_edict = graph.nodes[child_name][kwarg]
parents_edict = {}
for parent_name in graph.predecessors(child_name):
parent_edict = graph.nodes[parent_name][kwarg]
parents_edict[parent_name] = dict([
(key, child_edict[key] - parent_edict[key])
for key in child_edict.keys()
])
if parents_edict:
final_edict[child_name] = parents_edict
with open(conf_avg_rel_parents_file, 'w') as afile:
json.dump(final_edict,
afile,
sort_keys=True,
indent=4,
separators=(',', ':'))
out_files.append(conf_avg_rel_parents_file)
for out_file in out_files:
jobutils.add_outfile_to_backend(out_file)
[docs]def get_stage_idx(astr, is_filename=False):
"""
Return the stage index from the given string, can be a filename.
:type astr: str
:param astr: the string
:type is_filename: bool
:param is_filename: Whether astr is filename or not
:rtype: int
:return: the stage index
"""
if is_filename:
astr = fileutils.get_basename(astr)
head, stage_hash = astr.split(STAGE_SEPARATOR)
stage_idx = stage_hash.split('_')[0]
return int(stage_idx)
[docs]def check_TS_vetting(out_file):
"""
Return False if TS vetting failed in the given Jaguar
out file.
:type out_file: str
:param out_file: Jaguar out file
:rtype: bool
:return: False if TS vetting failed
"""
with open(out_file, 'r') as afile:
for line in afile:
if line.startswith('Unable to find any valid transition vectors.'):
return False
return True
[docs]def get_sub_host_str(obj, sub_host_attr, n_procs_attr):
"""
Return the command line -HOST argument for using a subhost.
:type obj: object
:param obj: the object, possibly having the given attributes
defined
:type sub_host_attr: str
:param sub_host_attr: the attribute for the subhost
:type n_procs_attr: str
:param n_procs_attr: the attribute for the number of processors
"""
try:
host = f'{getattr(obj, sub_host_attr)}:{getattr(obj, n_procs_attr)}'
except AttributeError:
host = jobutils.AUTOHOST
return host
[docs]class JMSWFException(Exception):
pass
[docs]class JMSWFMixin(RepresentativeConformersMixin):
"""
Manage a Jaguar multistage workflow.
"""
# various structure and atom properties dictate the behavior of this class
def _overwriteConformerEIDS(self):
"""
Overwrite the Maestro entry ID properties for the conformers.
"""
# needed for Jaguar multistage workflow
# see MATSCI-11669 - it is possible that input structures may have the
# entry ID property undefined, handle that by grabbing the maximum value
# that is defined in the input and incrementing from there
max_eid = 0
for conformers_dict in self.sibling_conformers_dict.values():
for conformers in conformers_dict.values():
for conformer in conformers:
eid = int(conformer.property.get(ENTRY_ID_KEY, 0))
max_eid = max(max_eid, eid)
idx = 0
fall_back_eid = max_eid
for conformers_dict in self.sibling_conformers_dict.values():
for conformers in conformers_dict.values():
fall_back_eid += 1
for conformer in conformers:
idx += 1
eid = conformer.property.get(ENTRY_ID_KEY,
str(fall_back_eid))
self.eids_dict.setdefault(eid, []).append(str(idx))
conformer.property[ENTRY_ID_KEY] = str(idx)
def _getJMSWFGenResKeywords(self, st, include_temp_press_data=True):
"""
Return the general reserved Jaguar multistage workflow keywords
for the given structure.
:type st: schrodinger.structure.Structure
:param st: the structure
:type include_temp_press_data: bool
:param include_temp_press_data: whether to include the temperature
and pressure data
:rtype: OrderedDict
:return: the keywords
"""
adict = OrderedDict([('molchg', str(st.property.get(CHARGE_KEY, 0))),
('multip',
str(st.property.get(MULTIPLICITY_KEY, 1)))])
temp_data = getattr(self, 'temp_data', None)
press_data = getattr(self, 'press_data', None)
if include_temp_press_data and temp_data and press_data:
adict['tmpini'] = str(temp_data.temp_start)
adict['tmpstp'] = str(temp_data.temp_step)
adict['ntemp'] = str(temp_data.temp_n)
adict['press'] = str(press_data.press_start)
adict['press_step'] = str(press_data.press_step)
adict['npress'] = str(press_data.press_n)
# see MATSCI-10584 and linked cases, if the Jaguar input file has
# an initial guess wavefunction with a given dftname and basis, for
# example from a previous calculation, that are the same as the dftname
# and basis for the new job then igonly=1 (skip SCF) is set automatically
# if igonly was not set, if the new job asks for properties, which will be
# the general case, then the job will fail due to missing matrices required
# for the property evaluation, prevent this by always setting igonly=0
adict['igonly'] = '0'
return adict
def _getJMSWFOptKeywords(self, st, set_freq=True):
"""
Return the Jaguar multistage workflow optimization keywords
for the given structure.
:type st: schrodinger.structure.Structure
:param st: the structure
:type set_freq: bool
:param set_freq: if True then set 'ifreq=1'
:rtype: str
:return: the string of keywords
"""
reserved_jaguar_keywords = self._getJMSWFGenResKeywords(
st, include_temp_press_data=set_freq)
reserved_jaguar_keywords['igeopt'] = '1'
if set_freq:
reserved_jaguar_keywords['ifreq'] = '1'
jaguar_keywords = self.jaguar_keywords.copy()
jaguar_keywords.update(reserved_jaguar_keywords)
return ' '.join(get_jaguar_keywords_list(jaguar_keywords))
@staticmethod
def _getJMSWFCoords(st, are_restraints=True):
"""
Return the Jaguar multistage workflow coords for the given structure.
:type st: schrodinger.structure.Structure
:param st: the structure
:type are_restraints: bool
:param are_restraints: True if the coords are restraint coords, as
opposed to active coords
:rtype: list
:return: contains strings defining the coords
"""
atype_key_pairs = [(4, RESTRAINED_DISTANCES_KEY),
(5, RESTRAINED_ANGLES_KEY),
(6, RESTRAINED_DIHEDRALS_KEY)]
fields = ['{atype}', '{idxs}']
if are_restraints:
fields = [jmswfu.NONE] + fields
aformat = jmswfu.DELIM.join(fields)
coords = []
for atype, key in atype_key_pairs:
params = get_int_tuples_from_str_property(st, key)
for param in params:
idxs = jmswfu.DELIM.join(str(idx) for idx in param)
coord = aformat.format(atype=atype, idxs=idxs)
coords.append(coord)
return coords
def _getJMSWFTSKeywords(self, st):
"""
Return the Jaguar multistage workflow transition state search
keywords for the given structure.
:type st: schrodinger.structure.Structure
:param st: the structure
:rtype: str
:return: the string of keywords
"""
reserved_jaguar_keywords = self._getJMSWFGenResKeywords(
st, include_temp_press_data=True)
reserved_jaguar_keywords['igeopt'] = '2'
reserved_jaguar_keywords['ifreq'] = '1'
reserved_jaguar_keywords['itrvec'] = '-6'
reserved_jaguar_keywords['nhesref'] = '-1'
# Jaguar TS vetting options, intentionally use the default
# for ts_vet_olap_cut, for ts_vet use the value +3 which considers
# both the eigenvector overlap with active atoms and distances
# among active atoms, despite the Jaguar documentation using +3
# as opposed to -3 does not cause a job failure so check
# afterwards
reserved_jaguar_keywords['ts_vet'] = '3'
jaguar_keywords = self.jaguar_keywords.copy()
jaguar_keywords.update(reserved_jaguar_keywords)
return ' '.join(get_jaguar_keywords_list(jaguar_keywords))
def _getJMSWFStages(self):
"""
Return the Jaguar multistage workflow stages.
:rtype: list
:return: contains the jmswfu.StageData for the Jaguar
multistage workflow
"""
has_ts = getattr(self, 'has_ts', False)
# if there is a TS stage then prevent divergence in the stage
# indices by breaking up the work needed for non-TS stages
# into two stages
aformat = jmswfu.DELIM.join(['{eid}', '{rest}'])
opt_jaguar_keywords_lines = []
opt_restraints_lines = []
ts_jaguar_keywords_lines = []
ts_restraints_lines = []
ts_actives_lines = []
for sibling_group, conformers_dict in self.sibling_conformers_dict.items(
):
for conformer_group, conformers in conformers_dict.items():
for conformer in conformers:
eid = conformer.property[ENTRY_ID_KEY]
all_opt_restraints = self._getJMSWFCoords(
conformer, are_restraints=True)
set_freq = not has_ts and not bool(all_opt_restraints)
opt_jaguar_keywords = self._getJMSWFOptKeywords(
conformer, set_freq=set_freq)
opt_jaguar_keywords_lines.append(
aformat.format(eid=eid, rest=opt_jaguar_keywords))
for opt_restraints in all_opt_restraints:
opt_restraints_lines.append(
aformat.format(eid=eid, rest=opt_restraints))
is_ts = conformer.property.get(
TRANSITION_STATE_STRUCTURE_KEY)
if has_ts or is_ts:
if not is_ts:
set_freq = not bool(all_opt_restraints)
ts_jaguar_keywords = self._getJMSWFOptKeywords(
conformer, set_freq=set_freq)
for opt_restraints in all_opt_restraints:
ts_restraints_lines.append(
aformat.format(eid=eid,
rest=opt_restraints))
else:
ts_jaguar_keywords = self._getJMSWFTSKeywords(
conformer)
for ts_actives in self._getJMSWFCoords(
conformer, are_restraints=False):
ts_actives_lines.append(
aformat.format(eid=eid, rest=ts_actives))
ts_jaguar_keywords_lines.append(
aformat.format(eid=eid, rest=ts_jaguar_keywords))
# the final hessian from a restrained optimization may not be a good
# guess for a transition state search so do not inherit it from the
# parent
in_txt_file = os.path.join(self._jmswf_launch_dir,
self._jmswf_in_txt_file)
with open(in_txt_file, 'w') as afile:
afile.write(jmswfu.NEW_STAGE + '\n')
afile.write(jmswfu.KEYWORDS + '\n')
for opt_jaguar_keywords_line in opt_jaguar_keywords_lines:
afile.write(opt_jaguar_keywords_line + '\n')
if opt_restraints_lines:
afile.write(jmswfu.GEOM_CONSTRAINTS + '\n')
for opt_restraints_line in opt_restraints_lines:
afile.write(opt_restraints_line + '\n')
if ts_jaguar_keywords_lines:
afile.write(jmswfu.NEW_STAGE + '\n')
afile.write(jmswfu.PARENT + '\n')
afile.write(
jmswfu.DELIM.join(['1', jmswfu.WAVEFUNCTION]) + '\n')
afile.write(jmswfu.KEYWORDS + '\n')
for ts_jaguar_keywords_line in ts_jaguar_keywords_lines:
afile.write(ts_jaguar_keywords_line + '\n')
if ts_restraints_lines:
afile.write(jmswfu.GEOM_CONSTRAINTS + '\n')
for ts_restraints_line in ts_restraints_lines:
afile.write(ts_restraints_line + '\n')
if ts_actives_lines:
afile.write(jmswfu.ACTIVE_COORDINATES + '\n')
for ts_actives_line in ts_actives_lines:
afile.write(ts_actives_line + '\n')
stages = jmswfu.read_stage_datafile(in_txt_file)
fileutils.force_remove(in_txt_file)
return stages
def _updateJMSWFStages(self, stages):
"""
Update the Jaguar multistage workflow stages.
:type stages: list
:param stages: contains the jmswfu.StageData for the Jaguar
multistage workflow
:raise JMSWFException: if there is an issue
:rtype: list
:return: contains the jmswfu.StageData for the Jaguar
multistage workflow
"""
unique_geom_dict = getattr(self, 'unique_geom_dict', {})
n_unique_geom = len(unique_geom_dict)
# here we need to read in the extra stages file and modify
# the returned jmswfu.StageData objects so that they can
# be seamlessly appended to the given stages, the first
# of these extra stages is skipped so as to allow analysis
# to potentially be the first extra stage
extra_stages = jmswfu.read_stage_datafile(self.extra_stages_file)[1:]
n_prev_stages = len(stages) # will be either 1 or 2
existing_names = {x.info.name for x in stages}
for index, extra_stage in enumerate(extra_stages, n_prev_stages + 1):
# update the index of this stage
extra_stage.index = index
# We are merging stages from two different sources and the job
# name depends on the stage name, so make sure all the extra
# stages have names that are unique
count = index
while extra_stage.info.name in existing_names:
extra_stage.info.name = jmswfu.GENERIC_STAGE_TAG + str(count)
count += 1
existing_names.add(extra_stage.info.name)
# update any parent indices
for stage in extra_stage.parent_data:
stage.stage += n_prev_stages - 1
# update the parenting indices for any analysis stages
for stage in extra_stage.analyze_data:
stage.parent_st_idx += n_prev_stages - 1
stage.parent_idxs = [
i + n_prev_stages - 1 for i in stage.parent_idxs
]
# update entry IDs, the original entry IDs have been reset to start at
# 1 and now include values for any generated conformers, see
# ._overwriteConformerEIDS() and .eids_dict which for example
# might contain something like eids_dict[17] = [4,5,6,7] if for the
# original entry ID 17 there were 4 conformers generated which were
# assigned entry IDs 4, 5, 6, and 7, all entry_data data needs
# updated entry IDs
for stage_subsection, eid_dict in extra_stage.entry_data.items():
# see MATSCI-11251 - if using geometry deduplication then for relevant
# stages in the extra stages file the number of structure entries will
# be greater than it should be due to the deduplication so update it
if n_unique_geom:
all_eid_pairs = list(eid_dict.items())
unique_eid_pairs = all_eid_pairs[:n_unique_geom]
eid_dict = dict(unique_eid_pairs)
# see MATSCI-6429 - since the main interface and all known use cases
# force equivalent Jaguar keyword data for all structure entries in an
# extra stage we will overwrite stale entry IDs by making this assumption
# and choosing an arbitrary mapping of stale entry IDs to current entry
# IDs
uncommon = set(eid_dict.keys()).difference(
self.eids_dict.keys())
if len(eid_dict) == len(self.eids_dict) and uncommon:
eid_dict = dict(
zip(self.eids_dict.keys(), eid_dict.values()))
new_eid_dict = {}
for eid, datas in eid_dict.items():
for new_eid in self.eids_dict.get(eid, []):
new_datas = []
for data in datas:
new_data = copy.deepcopy(data)
new_data.eid = new_eid
new_datas.append(new_data)
new_eid_dict[new_eid] = new_datas
if not new_eid_dict:
msg = (
f'The extra stages file {self.extra_stages_file} contains '
'at least a single stage that does not define stage data '
'for any of the entries in the input file.')
raise JMSWFException(msg)
extra_stage.entry_data[stage_subsection] = new_eid_dict
return stages + extra_stages
def _setUpJMSWF(self):
"""
Set up the Jaguar multistage workflow.
:rtype: list
:return: contains the jmswfu.StageData for the Jaguar
multistage workflow
"""
extra_stages_file = getattr(self, 'extra_stages_file', None)
os.makedirs(self._jmswf_launch_dir, exist_ok=True)
with structure.StructureWriter(
os.path.join(self._jmswf_launch_dir,
self._jmswf_in_mae_file)) as writer:
for sibling_group, conformers_dict in self.sibling_conformers_dict.items(
):
for conformer_group, conformers in conformers_dict.items():
for conformer in conformers:
writer.append(conformer)
# jmswfu manages the global smap file which if there aren't any
# frequency jobs will be removed at the end
base_name = os.path.join(self._jmswf_launch_dir,
self._jmswf_base_name) + OUT_EXT
jmswfu.create_smap(base_name, self._jmswf_out_mae_file)
stages = self._getJMSWFStages()
if extra_stages_file:
stages = self._updateJMSWFStages(stages)
jmswfu.write_stages_file(
stages, os.path.join(self._jmswf_launch_dir,
self._jmswf_in_txt_file))
return stages
[docs] def runJMSWF(self):
"""
Run the Jaguar multistage workflow.
:raise JMSWFException: if there is an issue
"""
stages = self._setUpJMSWF()
options = argparse.Namespace(TPP=self.tpp,
name=self._jmswf_base_name,
input_file=self._jmswf_in_mae_file)
host = get_sub_host_str(self, 'subhost', 'n_jmswf_subjobs')
job_dj = jobutils.create_queue(options=options, host=host)
# uses jobutils.RobustSubmissionJob by default
with fileutils.chdir(self._jmswf_launch_dir):
workflows = jmswfu.create_workflows(options,
job_dj,
stages,
self._jmswf_out_smap_file,
hierarchical=False,
robust=True,
tmp_logger=self.logger)
with structure.StructureWriter(self._jmswf_out_mae_file) as writer:
jaguarworkflows.run_workflows(job_dj, workflows, writer)
backend = None
jmswfu.finalize_smap(self._jmswf_out_smap_file, backend)
# for restarts it is possible that nothing was done
all_failed = job_dj.total_added and len(
job_dj._failed) == job_dj.total_added
# handle partial failures in self.setRepresentatives
if self.return_jaguar_files or all_failed:
jobutils.add_zipfile_to_backend(self._jmswf_launch_dir)
if all_failed:
msg = ('All Jaguar multistage workflow jobs have failed. '
'All output files will be returned.')
raise JMSWFException(msg)
[docs] def prepareJMSWFOutput(self):
"""
Prepare Jaguar multistage workflow output.
"""
exts = [f'{OUT_EXT}.maegz', f'{OUT_EXT}.smap'
] + jmswfu.SMAP_ELIGIBLE_EXTENSIONS
for ext in exts:
pattern = os.path.join(self._jmswf_launch_dir, '*' + ext)
for apath in glob.glob(pattern):
afile = os.path.split(apath)[1]
shutil.copy(apath, afile)
jobutils.add_outfile_to_backend(afile)
[docs] def checkJMSWFOutputs(self, out_files):
"""
Raises JMSWFException if any of the given Jaguar out files
should be treated as a failure.
:type out_files: list
:param out_files: contains Jaguar output files
:raises JMSWFException: If any of the given Jaguar out files
should be treated as a failure
"""
# for transition state structures vetting is used however a bad
# result doesn't cause a Jaguar failure, nor does multiple imaginary
# frequencies for either optimizations or transition states
for out_file in out_files:
# Jaguar failures have already been handled in the calling code
# so missing Jaguar *out files means that the stage is an analysis
# stage
out_path = os.path.join(self._jmswf_launch_dir, out_file)
if not os.path.exists(out_path):
continue
# skip non-frequency jobs
jag_out = JaguarOutput(out_path)
normal_modes = anharmonic.get_normal_modes(jag_out)
if not normal_modes:
continue
# check TS, when asking for vetting there are three outcomes:
# (1) vetting performed and a vetted vector found
# (2) vetting performed and a vetted vector not found
# (3) vetting not performed (see MATSCI-5145)
#
# see MATSCI-10176 - jag_out.getStructure() does NOT include input
# Maestro structure properties
mae_path = os.path.join(self._jmswf_launch_dir,
f'{jag_out.restart}.mae')
st = structure.Structure.read(mae_path)
if st.property.get(TRANSITION_STATE_STRUCTURE_KEY):
if not check_TS_vetting(out_path):
# the found transition state does not involve the requested
# active atoms
raise JMSWFException(
"The found transition state does not involve the requested active atoms"
)
normal_mode = jag_out.vetted_ts_vector
if normal_mode and normal_mode.frequency != min(
[x.frequency for idx, x in normal_modes]):
# the normal mode involving the active atoms is not the lowest
# frequency mode
raise JMSWFException(
"The normal mode involving the active atoms is not the lowest frequency mode"
)
# check imaginary frequencies
in_path = os.path.join(self._jmswf_launch_dir,
f'{jag_out.restart}.in')
if not os.path.exists(in_path):
raise JMSWFException(f'The file {in_path} does not exists')
jag_in = JaguarInput(input=in_path)
try:
anharmonic.check_imaginary_frequencies(
jag_out, jag_in, max_i_freq=self.max_i_freq)
except anharmonic.AnharmonicException as err:
raise JMSWFException(str(err))
@staticmethod
def _setProperties(st, key, stage_idx, value):
"""
Set properties on the given structure.
:type st: schrodinger.structure.Structure
:param st: the structure
:type key: str
:param key: the property key
:type stage_idx: int
:param stage_idx: the stage index for the property
:type value: float
:param value: the property value
:rtype: set
:return: the created property keys
"""
rep_key = key + STAGE_SEPARATOR + str(stage_idx)
adict = {rep_key: value}
st.property.update(adict)
return set(adict.keys())
def _updateSiblingConformersDict(self):
"""
Update the sibling conformers dict.
"""
# currently the self.sibling_conformers_dict contains conformers with
# redundant structure titles if the conformers came from MacroModel, while
# the conformers collected in the representatives coming from the Jaguar
# multistage workflow job have been uniqueified using a '-<index>' appended
# to the title, here we need to update the self.sibling_conformers_dict to
# have unique titles, additionally the current self.sibling_conformers_dict
# contains conformers considered as failures in the calling code and so also
# remove any bad conformers
rep_titles = set(rep_st.title for rep_st in self.rep_sts)
for sibling_group, conformers_dict in list(
self.sibling_conformers_dict.items()):
for conformer_group, conformers in list(conformers_dict.items()):
prev_title = None
good_conformers = []
for idx, conformer in enumerate(conformers, 1):
title = conformer.title
if title == prev_title and idx > 1:
title += f'-{idx}'
else:
prev_title = title
if title in rep_titles:
conformer.title = title
good_conformers.append(conformer)
self.sibling_conformers_dict[sibling_group][
conformer_group] = good_conformers
[docs] def setRepresentatives(self):
"""
Associated with each structure is output data from potentially multiple
Jaguar multistage workflow stages. Pick representative structures to
carry the data for all stages.
:raise JMSWFException: if there is an issue
"""
# bin stage structures by original structure title,
# titles are like 'reactant_stage_1', 'transition_state-2_stage_4_analysis_2',
# etc.
title_stage_sts_dict = {}
for st in structure.StructureReader(self._jmswf_out_mae_file):
title, stage_hash = st.title.split(STAGE_SEPARATOR)
title_stage_sts_dict.setdefault(title, []).append((st, stage_hash))
stages = jmswfu.read_stage_datafile(
os.path.join(self._jmswf_launch_dir, self._jmswf_in_txt_file))
rep_stage_idx = get_rep_stage_idx(
rxnwf_file_path=self._jmswf_out_mae_file)
warned = False
copied = self.return_jaguar_files
for title, pairs in title_stage_sts_dict.items():
skip_msg = False
# the IndexError indicates that the required original workflow
# Jaguar calculation has failed for this conformer, continue as there
# may be other successful conformers
try:
rep_st = pairs[rep_stage_idx - 1][0].copy()
except IndexError:
skip_msg = ('Some Jaguar multistage workflow jobs have failed.')
if not skip_msg:
out_files = [f'{pair[0].title}.out' for pair in pairs]
try:
self.checkJMSWFOutputs(out_files)
except JMSWFException as err:
skip_msg = str(err)
skip_msg += (
'Postprocessing of at least a single Jaguar multistage workflow '
'job has failed.')
if skip_msg:
if not warned:
if self.logger:
skip_msg += (' All output files will be returned.')
self.logger.warning(skip_msg)
warned = True
if not copied:
jobutils.add_zipfile_to_backend(self._jmswf_launch_dir)
copied = True
continue
rep_st.title = title
for st, stage_hash in pairs[rep_stage_idx - 1:]:
stage_idx = int(stage_hash.split('_')[0])
stage = stages[stage_idx - 1]
keys = stage.getPropertyKeys(st=st)
# see MATSCI-10926 where we need to add a key to the keys from a
# standard JMSWF
if set([
jaguarworkflows.GAS_PHASE_ENERGY_PROP,
jaguarworkflows.ZERO_POINT_ENERGY_PROP
]).issubset(keys):
keys.append(
jaguarworkflows.GAS_PHASE_ZERO_POINT_ENERGY_PROP)
for key in keys:
rep_st.property.pop(key, None)
if key == jaguarworkflows.GAS_PHASE_ZERO_POINT_ENERGY_PROP:
gpe = st.property.get(
jaguarworkflows.GAS_PHASE_ENERGY_PROP)
zpe = st.property.get(
jaguarworkflows.ZERO_POINT_ENERGY_PROP)
if gpe is not None and zpe is not None:
zpe *= units.HARTREE_PER_KCAL_PER_MOL
value = gpe + zpe
else:
value = None
else:
value = st.property.get(key)
if value is not None:
_rep_keys = self._setProperties(rep_st, key, stage_idx,
value)
self.rep_keys.update(_rep_keys)
self.rep_sts.append(rep_st)
if not self.rep_sts:
msg = (
'Postprocessing of all Jaguar multistage workflow jobs has failed.'
)
raise JMSWFException(msg)
self._updateSiblingConformersDict()
# validate the output workflow file
sibling_conformers_dict = bin_structures_by_property(
self.rep_sts, key=SIBLING_GROUP_KEY, inner_key=CONFORMERS_GROUP_KEY)
for sibling_group, conformer_group, conformer in self.representativeConformers(
):
conformers_dict = sibling_conformers_dict.get(sibling_group)
if not conformers_dict:
msg = (
'All Jaguar multistage workflow calculations for sibling '
f'group {sibling_group} have failed.')
raise JMSWFException(msg)
if not conformers_dict.get(conformer_group):
msg = (
'All Jaguar multistage workflow calculations for conformer '
f'group {conformer_group} have failed.')
raise JMSWFException(msg)
[docs] def finalizeJMSWFOutput(self):
"""
Finalize the Jaguar multistage workflow output.
:rtype: str
:return: the Jaguar multistage workflow output file
"""
jmswf_tag = getattr(self, 'jmswf_tag', '')
# create a valid non-redundant output file from the
# Jaguar multistage workflow output file
base_name = jobutils.get_jobname(
DEFAULT_JOB_NAME) + f'{OUT_EXT}{jmswf_tag}'
out_mae_file = base_name + '.mae'
structure.write_cts(self.rep_sts, out_mae_file)
# the difference between the jmswf and rxnwf output Maestro files is that
# the latter, rather than providing a structure for each stage, chooses a
# single structure to be the representative structure in the rxnwf output,
# that representative comes from the last stage of the rxnwf excluding any
# stages from a potential extra stages file, this representative carries
# the properties of all stages, as a result of this if both a rxnwf and
# extra stage specify keywords to create files with smap extensions, like
# *vib, *vis, etc., only a single file can be linked, since it is unlikely
# that this will arise as extra stages are typically for energy
# post-processing choose to link those from the rxnwf stages, all will
# still be available in the jmswf output files which are copied back to the
# launch host
unique_geom_dict = getattr(self, 'unique_geom_dict', {})
rep_stage_idx = get_rep_stage_idx(rxnwf_sts=self.rep_sts)
rep_sts_dict = {
rep_st.title: idx for idx, rep_st in enumerate(self.rep_sts, 1)
}
# since the Jaguar calculations were only performed on the representative
# structure geometries the relevant files do not exist for the structures
# that the representatives represent, so for these structures point
# the files of the representatives
smap_dict = {}
for title, idx in rep_sts_dict.items():
title = get_unique_geom_title(title, unique_geom_dict)
for ext in jmswfu.SMAP_ELIGIBLE_EXTENSIONS:
pattern = f'{title}{STAGE_SEPARATOR}{rep_stage_idx}*{ext}'
for afile in glob.glob(pattern):
smap_dict.setdefault(afile, []).append(idx)
if smap_dict:
out_smap_file = jmswfu.create_smap(base_name,
out_mae_file,
smap_dict=smap_dict)
jobutils.add_outfile_to_backend(out_smap_file)
jobutils.add_outfile_to_backend(out_mae_file, set_structure_output=True)
return out_mae_file
[docs] def getFreqStageIdxs(self):
"""
Return the stage indices of frequency stages.
:rtype: tuple[int]
:return: the stage indices
"""
freq_stage_idxs = set()
pattern = os.path.join(self._jmswf_launch_dir, '*.out')
for out_path in glob.glob(pattern):
jag_out = JaguarOutput(out_path)
if anharmonic.get_normal_modes(jag_out):
out_file = os.path.split(out_path)[1]
stage_idx = get_stage_idx(out_file, is_filename=True)
freq_stage_idxs.add(stage_idx)
return tuple(freq_stage_idxs)
[docs]class DescriptorsMixin(object):
"""
Manage running descriptors.
"""
DEFAULT_JOB_NAME = 'automatic_reaction_workflow'
[docs] def runDescriptors(self, files):
"""
Run descriptors.
:type files: list
:param files: the files on which to run descriptors
:rtype: list
:return: The output structure file paths. The returned list will be the
same length as the input files list.
"""
# handle restarts
job_name = jobutils.get_jobname(self.DEFAULT_JOB_NAME)
zip_files = jaguar_restart.get_restart_zip_files(
base_name_patterns=[f'{DESCRIPTORS_LAUNCH_DIR_HEADER}{job_name}*'])
jaguar_restart.prepare_restart_dirs(zip_files)
host = get_sub_host_str(self, 'jmswf_host', 'n_rxnwf_subjobs')
# keep Jaguar &gen section to a minimum for now
keys = ['dftname', 'basis']
jaguar_keywords = {key: self.jaguar_keywords[key] for key in keys}
jaguar_keywords = get_jaguar_keywords_list(jaguar_keywords)
jaguar_keywords = ' '.join(jaguar_keywords)
options = argparse.Namespace(TPP=self.tpp)
job_dj = jobutils.create_queue(options=options, host=host)
base_cmd = ['complex_descriptors.py']
base_cmd += [FLAG_JAGUAR]
base_cmd += [FLAG_JAGUAR_KEYWORDS, jaguar_keywords]
base_cmd += [FLAG_TPP, str(self.tpp)]
base_cmd += [FLAG_LIGFILTER]
base_cmd += [FLAG_CANVAS]
base_cmd += [FLAG_MOLDESCRIPTORS]
base_cmd += [FLAG_INCLUDE_VECTORIZED]
base_cmd += [FLAG_FINGERPRINTS]
if self.return_descriptor_files:
base_cmd += [FLAG_SAVEFILES]
if host != jobutils.AUTOHOST:
base_cmd += ['-HOST', self.jmswf_host]
out_rep = getattr(self, 'out_rep', None)
if out_rep:
base_cmd += [parserutils.FLAG_OUT_REP, out_rep]
# collect jobs, the complex descriptors driver does more than
# just running Jaguar jobs so currently restart each job even
# if all Jaguar *out files are available, for such cases the Jaguar
# jobs are not restarted
launch_dirs = []
for afile in files:
job_name = fileutils.get_basename(afile)
launch_dir = job_name = f'{DESCRIPTORS_LAUNCH_DIR_HEADER}{job_name}'
cmd = list(base_cmd)
cmd += ['-JOBNAME', job_name]
cmd += [FLAG_INPUT_FILE, afile]
cmd += [FLAG_STPROPS, f'{job_name}.mae']
os.makedirs(launch_dir, exist_ok=True)
shutil.copy(afile, os.path.join(launch_dir, afile))
job = jobutils.RobustSubmissionJob(cmd, command_dir=launch_dir)
job_dj.addJob(job)
launch_dirs.append(launch_dir)
if self.logger:
self.logger.info('Running descriptors on all files.')
self.logger.info('')
job_dj.run()
for launch_dir in launch_dirs:
zip_file = launch_dir + '.zip'
zip_directory(launch_dir, fileobj=zip_file)
jobutils.add_outfile_to_backend(zip_file)
file_paths = []
for launch_dir in launch_dirs:
file_path = os.path.join(launch_dir, f'{launch_dir}.mae')
if not os.path.exists(file_path):
msg = (f'Descriptor output structure file {file_path} '
'can not be found.')
raise ReactionWorkflowException(msg)
file_paths.append(file_path)
return file_paths
[docs]def has_transition_state(rxnwf_file_path=None, rxnwf_sts=None):
"""
Return True if the reaction workflow features a transition state,
False otherwise.
:type rxnwf_file_path: str or None
:type rxnwf_file_path: the path to a reaction workflow file
:type rxnwf_sts: list[`schrodinger.structure.Structure`] or None
:type rxnwf_sts: reaction workflow structures
:rtype: bool
:return: True if the reaction workflow features a transition state,
False otherwise
"""
assert bool(rxnwf_file_path) ^ bool(rxnwf_sts)
if rxnwf_file_path:
rxnwf_sts = list(structure.StructureReader(rxnwf_file_path))
for rxnwf_st in rxnwf_sts:
if rxnwf_st.property.get(TRANSITION_STATE_STRUCTURE_KEY):
return True
return False
[docs]def get_rep_stage_idx(rxnwf_file_path=None, rxnwf_sts=None):
"""
Return the representative stage index.
:type rxnwf_file_path: str or None
:type rxnwf_file_path: the path to a reaction workflow file
:type rxnwf_sts: list[`schrodinger.structure.Structure`] or None
:type rxnwf_sts: reaction workflow structures
:rtype: int
:return: the representative stage index
"""
assert bool(rxnwf_file_path) ^ bool(rxnwf_sts)
has_ts = has_transition_state(rxnwf_file_path=rxnwf_file_path,
rxnwf_sts=rxnwf_sts)
if has_ts:
rep_stage_idx = 2
else:
rep_stage_idx = 1
return rep_stage_idx
[docs]class ReactionWorkflowException(Exception):
pass
[docs]class ReactionWorkflow(ConformationalSearchMixin, JMSWFMixin, UniqueGeomMixin):
"""
Manage a reaction workflow.
"""
[docs] def __init__(self,
reaction_wf_input_sts,
dedup_geom_eps=parserutils.DEFAULT_DEDUP_GEOM_EPS,
n_conformers=DEFAULT_N_CONFORMERS,
skip_eta_rotamers=False,
pp_rel_energy_thresh=PP_CSEARCH_REL_ENERGY_THRESH,
seed=parserutils.RANDOM_SEED_DEFAULT,
only_rings=True,
return_csearch_files=False,
jaguar_keywords=DEFAULT_JAGUAR_KEYWORDS,
temp_data=None,
press_data=None,
return_jaguar_files=False,
anharm=False,
return_anharm_files=False,
anharm_max_freq=anharmonic.DEFAULT_MAX_FREQ,
anharm_factor_data=None,
rate_constants=False,
return_rate_constant_files=False,
custom_rate_constants=False,
wigner_tunnel_corr=False,
extra_stages_file=None,
max_i_freq=anharmonic.DEFAULT_MAX_I_FREQ,
out_rep=None,
n_jmswf_subjobs=1,
subhost=None,
tpp=DEFAULT_TPP,
ffld_name=None,
logger=None):
"""
Create an instance.
:type reaction_wf_input_sts: list
:param reaction_wf_input_sts: reaction workflow input structures
:type dedup_geom_eps: float
:param dedup_geom_eps: reduce the number of calculations by deduplicating
the input structures based on geometry, using this threshold
in Ang., and only calculating the representatives, a value of
zero means no deduplicating
:type n_conformers: int
:param n_conformers: number of conformers to search for
:type skip_eta_rotamers: bool
:param skip_eta_rotamers: skip eta rotamers generation if true
:type pp_rel_energy_thresh: None or float
:param pp_rel_energy_thresh: relative energy threshold in kJ/mol, if None then
only the n_conformers lowest energy conformers are returned
:type seed: int
:param seed: seed for random number generator
:type only_rings: bool
:param only_rings: if True then only allow rotation of eta-bound rings,
if False then also allow rotation of ligands where the eta-bound
motif is acyclic, for example ethene, etc.
:type return_csearch_files: bool
:param return_csearch_files: whether to return all output files
from the conformational search subjobs
:type jaguar_keywords: OrderedDict
:param jaguar_keywords: Jaguar keywords
:type temp_data: TempData
:param temp_data: the temperature data for thermochemical properties
:type press_data: PressData
:param press_data: the pressure data for thermochemical properties
:type return_jaguar_files: bool
:param return_jaguar_files: whether to return all output files
from the Jaguar subjobs
:type anharm: bool
:param anharm: whether to run the anharmonic workflow
:type return_anharm_files: bool
:param return_anharm_files: whether to return all output files
from the anharmonic workflow
:type anharm_max_freq: float
:param anharm_max_freq: anharmonic potentials are created for normal modes with
harmonic frequencies less than this value in wavenumbers (cm^-1)
:type anharm_factor_data: anharmonic.SeqData or None
:param anharm_factor_data: unitless factor data for factors that multiply
a normal mode displacement, if None then the defaults are used,
the number of points is in the positive direction only, excluding
zero and the negative direction, for example using a value
of 4 in turn means 2 * 4 + 1 = 9 points total
:type rate_constants: bool
:param rate_constants: whether to report rate constant(s) for the rate
determining step of the reaction using canonical transition state
theory
:type return_rate_constant_files: bool
:param return_rate_constant_files: whether to return all output files
from the rate constant subjobs
:param bool custom_rate_constants: whether to compute custom rate
constants
:type wigner_tunnel_corr: bool
:param wigner_tunnel_corr: whether to include the Wigner tunneling correction
when computing rate constant(s)
:type extra_stages_file: str
:param extra_stages_file: the name of a file containing extra
stages for a Jaguar Multistage Workflow subjob that will
be performed on all output structures from the reaction workflow,
the first of these extra stages is always skipped so as to allow
analysis to potentially be the first extra stage
:type max_i_freq: float
:param max_i_freq: tolerate small imaginary frequencies less than this value in
wavenumbers (cm^-1)
:param out_rep: if a string then must be either module constant
parserutils.CENTROID or parserutils.ETA, if None then do nothing
:type n_jmswf_subjobs: int
:param n_jmswf_subjobs: the maximum number of simultaneous Jaguar
multistage workflow subjobs
:type subhost: str
:param subhost: the host to use for subjobs
:type tpp: int
:param tpp: the number of threads to use for Jaguar
subjobs, i.e. -TPP (threads-per-process)
:type ffld_name: str
:param ffld_name: the name of the force field to use (e.g., OPLS2005 or
S-OPLS)
:type logger: logging.Logger or None
:param logger: output logger or None if there isn't one
"""
if ffld_name is None:
ffld_name = msutils.get_default_forcefield().name
self.reaction_wf_input_sts = reaction_wf_input_sts
self.dedup_geom_eps = dedup_geom_eps
self.n_conformers = n_conformers
self.skip_eta_rotamers = skip_eta_rotamers
self.pp_rel_energy_thresh = pp_rel_energy_thresh
self.seed = seed
self.only_rings = only_rings
self.return_csearch_files = return_csearch_files
self.jaguar_keywords = jaguar_keywords
self.temp_data = temp_data or TempData(
temp_start=jmswfu.DEFAULT_TEMP_START,
temp_step=jmswfu.DEFAULT_TEMP_STEP,
temp_n=jmswfu.DEFAULT_TEMP_N)
self.press_data = press_data or PressData(
press_start=jmswfu.DEFAULT_PRESS_START,
press_step=jmswfu.DEFAULT_PRESS_STEP,
press_n=jmswfu.DEFAULT_PRESS_N)
self.return_jaguar_files = return_jaguar_files
self.anharm = anharm
self.return_anharm_files = return_anharm_files
self.anharm_max_freq = anharm_max_freq
self.anharm_factor_data = anharm_factor_data or anharmonic.SeqData(
start=anharmonic.DEFAULT_F_START,
step=anharmonic.DEFAULT_F_STEP,
n_points=anharmonic.DEFAULT_F_N_POINTS)
self.rate_constants = rate_constants
self.return_rate_constant_files = return_rate_constant_files
self.custom_rate_constants = custom_rate_constants
self.wigner_tunnel_corr = wigner_tunnel_corr
self.extra_stages_file = extra_stages_file
self.max_i_freq = max_i_freq
self.out_rep = out_rep
self.n_jmswf_subjobs = n_jmswf_subjobs
self.subhost = subhost
self.tpp = tpp
self.ffld_name = ffld_name
self.logger = logger
self.unique_geom_dict = {}
self.sibling_conformers_dict = None
self.eids_dict = {}
self._jmswf_launch_dir = JMSWF_LAUNCH_DIR
self._jmswf_base_name = 'jaguar_multistage_workflow'
self._jmswf_in_mae_file = self._jmswf_base_name + '.maegz'
self._jmswf_in_txt_file = self._jmswf_base_name + '.txt'
self._jmswf_out_mae_file = f'{self._jmswf_base_name}{OUT_EXT}.maegz'
self._jmswf_out_smap_file = f'{self._jmswf_base_name}{OUT_EXT}.smap'
self.has_ts = False
self.rdss = None
self._anharm_dir = ANHARM_LAUNCH_DIR
self._ctst_dir = CTST_RATE_CONSTANTS_DIR
self.rep_keys = set()
self.rep_sts = []
self.jmswf_tag = REACTION_WF_TAG
[docs] def validateRateConstants(self):
"""
Validate rate constants.
:raise ReactionWorkflowException: if there is an issue
"""
if not self.rate_constants:
return
sibling_conformers_dict = bin_structures_by_property(
self.reaction_wf_input_sts,
key=SIBLING_GROUP_KEY,
inner_key=CONFORMERS_GROUP_KEY)
eligible_for_rate_constants = False
# In order to be eligible we need a sibling group containing 1 TS whose parent
# sibling group contains 0 TSs
for sibling_group, conformers_dict in sibling_conformers_dict.items():
# ensure a single TS in the sibling group
is_ts = []
for conformers in conformers_dict.values():
rep_conf = conformers[0] # all same here
is_ts.append(
bool(rep_conf.property.get(TRANSITION_STATE_STRUCTURE_KEY)))
if is_ts.count(True) != 1:
continue
# ensure the sibling group containing a single TS has
# a parent sibling group with no TSs
rep_conf = list(conformers_dict.values())[0][0] # all same here
parent_sibling_group_key = rep_conf.property.get(
PARENT_SIBLING_GROUPS_KEY)
if (not parent_sibling_group_key or
parent_sibling_group_key not in sibling_conformers_dict):
continue
parent_conformers_dict = sibling_conformers_dict[
parent_sibling_group_key]
is_not_ts = []
for conformers in parent_conformers_dict.values():
rep_conf = conformers[0] # all same here
is_not_ts.append(not bool(
rep_conf.property.get(TRANSITION_STATE_STRUCTURE_KEY)))
if all(is_not_ts):
eligible_for_rate_constants = True
break
if not eligible_for_rate_constants:
msg = ('Rate constant calculations require child transition state '
'structures.')
raise ReactionWorkflowException(msg)
[docs] def validateAnharmonic(self):
"""
Validate anharmonic.
:raise ReactionWorkflowException: if there is an issue
"""
sibling_conformers_dict = bin_structures_by_property(
self.reaction_wf_input_sts,
key=SIBLING_GROUP_KEY,
inner_key=CONFORMERS_GROUP_KEY)
if not self.has_ts and self.anharm:
getters = [
get_restrain_distance_idxs, get_restrain_angle_idxs,
get_restrain_dihedral_idxs
]
only_restrained_opts = True
for sibling_group, conformer_group, first_conformer in self.representativeConformers(
sibling_conformers_dict=sibling_conformers_dict):
for getter in getters:
if getter(first_conformer):
break
else:
only_restrained_opts = False
if not only_restrained_opts:
break
if only_restrained_opts:
msg = (
'The anharmonic workflow requires frequencies but '
'all Jaguar multistage jobs are restrained optimizations.')
raise ReactionWorkflowException(msg)
[docs] def validateSubhost(self):
"""
Validate subhost.
:raise ReactionWorkflowException: if there is an issue
"""
host = 'localhost'
if jobcontrol.get_backend():
host_list = jobcontrol.get_backend_host_list()
if host_list:
host = host_list[0][0]
if host != 'localhost' and self.subhost == 'localhost':
msg = ('Jobs requesting a remote driver host and local '
'subjob host are not supported.')
raise ReactionWorkflowException(msg)
if not self.subhost:
self.subhost = host
[docs] def validate(self):
"""
Validate.
:raise ReactionWorkflowException: if there is an issue
"""
self.validateRateConstants()
self.validateAnharmonic()
self.validateSubhost()
def _setUpAnharmonic(self):
"""
Set up anharmonic workflow calculations.
:raise ReactionWorkflowException: if there is an issue
"""
# determine which JMSWF stages calculate frequencies, currently allow
# for the possibility of an extra stage calculating frequencies (so
# consider not only the first one or two reaction workflow stages),
# for each stage collect all Jaguar *.out and *.01.in files
in_files_by_stage = {}
for stage_idx in self.getFreqStageIdxs():
pattern = os.path.join(self._jmswf_launch_dir,
f'*{STAGE_SEPARATOR}{stage_idx}.out')
for out_path in glob.glob(pattern):
jag_out = JaguarOutput(out_path)
# see MATSCI-12044 - anharmonic calculations on atomic systems will
# fall through, ensure that here so that the properties defined for all
# structures are consistent
st = jag_out.getStructure()
if st.atom_total > 1 and not anharmonic.get_normal_modes(
jag_out):
continue
out_files = [os.path.split(out_path)[1]]
try:
self.checkJMSWFOutputs(out_files)
except JMSWFException:
continue
rin_path = os.path.splitext(out_path)[0] + '.01.in'
if not os.path.exists(rin_path):
msg = (f'The Jaguar restart input file {rin_path} can '
'not be found.')
raise ReactionWorkflowException(msg)
out_file = os.path.split(out_path)[1]
rin_file = os.path.split(rin_path)[1]
in_files_by_stage.setdefault(stage_idx, []).append(
(out_file, rin_file))
# ensure that for each of these stages each conformer in the reaction
# workflow has an anharmonic input deck
for idx, all_files in in_files_by_stage.items():
if len(all_files) != len(self.rep_sts):
msg = (
'Some Jaguar output files from Jaguar Multistage Workflow '
f'stage {idx} are missing frequency data.')
raise ReactionWorkflowException(msg)
# prepare anharmonic subjobs in a separate directory
os.makedirs(self._anharm_dir, exist_ok=True)
for all_files in in_files_by_stage.values():
for files in all_files:
for afile in files:
apath = os.path.join(self._jmswf_launch_dir, afile)
bpath = os.path.join(self._anharm_dir, afile)
shutil.copy(apath, bpath)
@staticmethod
def _updateAnharmonicJaguarKwargs(rin_file_path):
"""
Update anharmonic Jaguar keyword arguments.
:type rin_file_path: str
:param rin_file_path: the restart Jaguar input file path (`*.01.in`) whose
&gen keywords arguments will be updated
"""
# see MATSCI-11151 - where *vis, etc. property files are
# not needed for rxnwf
rin_jag = JaguarInput(input=rin_file_path)
rin_jag.deleteKeys(['iplotden', 'iplotesp', 'iorb1a', 'iorb2a', 'nbo'])
rin_jag.saveAs(rin_file_path)
[docs] def runAnharmonic(self):
"""
Run the anharmonic workflow.
"""
self._setUpAnharmonic()
# allow multiple simultaneous anharmonic subjobs
options = argparse.Namespace(TPP=self.tpp)
host = f'{self.subhost}:{self.n_jmswf_subjobs}'
job_dj = jobutils.create_queue(options=options, host=host)
# allow multiple simultaneous Jaguar single point subjobs for each
# anharmonic subjob, continue processing even if there are zero
# normal modes to be treated anharmonically
cmd = ['anharmonic_workflow_driver.py']
cmd += [anharmonic.FLAG_MAX_FREQ, str(self.anharm_max_freq)]
cmd += [
anharmonic.FLAG_F_DATA,
str(self.anharm_factor_data.start),
str(self.anharm_factor_data.step),
str(self.anharm_factor_data.n_points)
]
cmd += [parserutils.FLAG_ROBUST]
cmd += [FLAG_MAX_I_FREQ, str(self.max_i_freq)]
cmd += [anharmonic.FLAG_PROCESS_NO_ANHARMONICITIES]
cmd += [FLAG_TPP, str(self.tpp)]
cmd += ['-HOST', host]
# collect jobs, restart *.out files are marked with ANHARMONIC_TAG
# and should be skipped here, in contrast to JMSWF which only runs
# Jaguar jobs the anharmonic driver does more than just running
# Jaguar jobs so currently restart each job even if all Jaguar *out
# files are available, for such cases the Jaguar jobs are not restarted
pattern = os.path.join(self._anharm_dir, '*.out')
for out_path in glob.glob(pattern):
if ANHARMONIC_TAG in out_path:
continue
out_file = os.path.split(out_path)[1]
base_name, ext = os.path.splitext(out_file)
rin_file = base_name + '.01.in'
rin_file_path = os.path.join(self._anharm_dir, rin_file)
self._updateAnharmonicJaguarKwargs(rin_file_path)
job_name = base_name + ANHARMONIC_TAG
_cmd = list(cmd)
_cmd += [anharmonic.FLAG_JAG_OUT_FILE, out_file]
_cmd += [anharmonic.FLAG_JAG_RIN_FILE, rin_file]
_cmd += ['-JOBNAME', job_name]
jc_job = jobutils.RobustSubmissionJob(_cmd,
command_dir=self._anharm_dir,
name=job_name)
job_dj.addJob(jc_job)
# run jobs
job_dj.run()
[docs] def processAnharmonic(self):
"""
Process the anharmonic workflow.
:raise ReactionWorkflowException: if there is an issue
"""
# check output, for now require success for each conformer
pattern = os.path.join(self._anharm_dir, f'*{anharmonic.OUT_EXT}')
mae_paths = glob.glob(pattern)
stage_idxs = {
get_stage_idx(mae_path, is_filename=True) for mae_path in mae_paths
}
force_return_anharm_files = len(mae_paths) != len(stage_idxs) * len(
self.rep_sts)
# copy output
if self.return_anharm_files or force_return_anharm_files:
jobutils.add_zipfile_to_backend(self._anharm_dir)
if force_return_anharm_files:
msg = ('At least a single anharmonic workflow subjob has failed.')
raise ReactionWorkflowException(msg)
# update representative structures with anharmonic properties for relevant
# stages, update keys, and store energy keys
for rep_st in self.rep_sts:
for stage_idx in stage_idxs:
mae_path = os.path.join(self._anharm_dir,
f'{rep_st.title}{STAGE_SEPARATOR}{stage_idx}{ANHARMONIC_TAG}' \
f'{anharmonic.OUT_EXT}')
st = structure.Structure.read(mae_path)
for key, value in st.property.items():
if key.startswith(anharmonic.KEY_STARTER):
_rep_keys = self._setProperties(rep_st, key, stage_idx,
value)
if 'Total' in key:
self.rep_keys.update(_rep_keys)
def _setRateDeterminingSteps(self):
"""
Set a list of rate determining steps.
:raise ReactionWorkflowException: if there is an issue
"""
rxnwfea = ReactionWorkflowEnergyAnalysis
# use conformationally averaged energies relative to parents
# including cross terms to determine the rate determining step
# of the reaction
base_name = jobutils.get_jobname(DEFAULT_JOB_NAME)
rel_parents_file = base_name + REL_PARENTS_W_X_EXT
with open(rel_parents_file, 'r') as afile:
rel_parents_dict = json.load(afile)
# bin (child - parent) energy differences by relevant stages to
# determine the rate determining step for each stage, the energy
# to use is U_0 = E_SCF + E_ZPE which is temperature independent
# and thus previously algebraically averaged over conformers and
# is now linearly summable, currently harmonic ZPE is used even
# if modeling anharmonicities
scf_energy_h = rxnwfea.getHeader(jaguarworkflows.GAS_PHASE_ENERGY_PROP)
zpe_energy_h = rxnwfea.getHeader(jaguarworkflows.ZERO_POINT_ENERGY_PROP)
stage_child_parent_energies_dict = {}
for child_name, parent_dict in rel_parents_dict.items():
for parent_name, edict in parent_dict.items():
stage_energies_dict = {}
for h, energy in edict.items():
if h.startswith(scf_energy_h) or h.startswith(zpe_energy_h):
stage_idx = get_stage_idx(h)
stage_energies_dict.setdefault(stage_idx,
[]).append(energy)
for stage_idx, energies in stage_energies_dict.items():
if len(energies) == 2:
stage_child_parent_energies_dict.setdefault(
stage_idx, []).append(
(child_name, parent_name, sum(energies)))
if not stage_child_parent_energies_dict:
msg = ('Rate constant calulations require at least a single stage '
'in which Jaguar frequencies are calculated.')
raise ReactionWorkflowException(msg)
# collect rate determining steps by sorting by energy and finding the
# highest energy transition state
rdss = []
for stage_idx, values in stage_child_parent_energies_dict.items():
rds = None
for child_name, parent_name, energy in sorted(values,
key=lambda x: x[2],
reverse=True):
for sibling_group, conformer_group, rep_child_conf in self.representativeConformers(
specific_sibling_group=child_name):
if rep_child_conf.property.get(
TRANSITION_STATE_STRUCTURE_KEY):
rds = SimpleNamespace(stage_idx=stage_idx,
child_name=child_name,
parent_name=parent_name)
break
if rds:
rdss.append(rds)
break
if not rdss:
msg = ('Rate constant calulations require at least a single child '
'structure to be marked as a transition state structure.')
raise ReactionWorkflowException(msg)
self.rdss = rdss
def _getJaguarOutputPathsChildAndParent(self, rds):
"""
Return paths to the Jaguar output files for child and
parent conformers for the given rate determining step.
:type rds: SimpleNamespace
:param rds: the rate determining step
:rtype: list, list
:return: the child and parent conformer paths the latter of
which is a list of lists of paths for each parent structure
"""
# among the child sibling group collect spectator masses and
# for the found TS collect conformer titles
child_spectator_masses = []
child_ts_conformer_titles = []
for conformer_group, conformers in self.sibling_conformers_dict[
rds.child_name].items():
first_conformer = conformers[0]
if first_conformer.property.get(TRANSITION_STATE_STRUCTURE_KEY):
for conformer in conformers:
child_ts_conformer_titles.append(conformer.title)
else:
child_spectator_masses.append(
get_molecular_weight(first_conformer,
decimal=MASS_N_DECIMAL))
# isolate spectators and determine which parent siblings
# made the TS
binned_parent_conformer_titles = {}
for conformer_group, conformers in self.sibling_conformers_dict[
rds.parent_name].items():
first_conformer = conformers[0]
parent_mass = get_molecular_weight(first_conformer,
decimal=MASS_N_DECIMAL)
if parent_mass in child_spectator_masses:
child_spectator_masses.remove(parent_mass)
else:
for conformer in conformers:
binned_parent_conformer_titles.setdefault(
conformer_group, []).append(conformer.title)
binned_parent_conformer_titles = \
list(binned_parent_conformer_titles.values())
get_paths = lambda x: [
os.path.join(
self._jmswf_launch_dir,
get_unique_geom_title(title, self.unique_geom_dict) +
STAGE_SEPARATOR + str(rds.stage_idx) + '.out') for title in x
]
child_paths = get_paths(child_ts_conformer_titles)
binned_parent_paths = [
get_paths(titles) for titles in binned_parent_conformer_titles
]
return child_paths, binned_parent_paths
def _setUpCTST(self):
"""
Set up canonical transition state theory calculations.
"""
self._setRateDeterminingSteps()
# the Wigner tunneling correction is computed from the imaginary
# frequency of the transition state and ultimately multiples its
# partition function to alter the rate constant
wigner_tunnel_corr = 'yes' if self.wigner_tunnel_corr else 'no'
temps = ' '.join([
str(self.temp_data.temp_start + i * self.temp_data.temp_step)
for i in range(0, self.temp_data.temp_n)
])
# collect all required CTST jobs
get_conf_title = lambda x: fileutils.get_basename(x).split(
STAGE_SEPARATOR)[0]
for rds in self.rdss:
child_paths, binned_parent_paths = self._getJaguarOutputPathsChildAndParent(
rds)
rds.jobs = []
for child_path in child_paths:
child_title = get_conf_title(child_path)
for parent_paths in itertools.product(*binned_parent_paths):
parent_title = '_'.join(
get_conf_title(parent_path)
for parent_path in parent_paths)
job_name = (f'tst{STAGE_SEPARATOR}{rds.stage_idx}_'
f'{parent_title}_{child_title}')
cmd = [os.path.join(os.environ['SCHRODINGER'], 'run')]
cmd += ['tst_rate_startup.py']
cmd += ['-wigner_tunnel_corr', wigner_tunnel_corr]
cmd += [FLAG_MAX_I_FREQ, str(self.max_i_freq)]
cmd += ['-temperature', temps]
cmd += ['-COMPLEX_OUTPUT', child_path]
cmd += ['-COMPLEX_TRV', 'trv']
if self.anharm:
base_name = fileutils.get_basename(child_path)
base_name = get_unique_geom_title(
base_name, self.unique_geom_dict)
mae_path = os.path.join(
self._anharm_dir,
f'{base_name}{ANHARMONIC_TAG}{anharmonic.OUT_EXT}')
cmd += ['-COMPLEX_ANHARM_OUTPUT_MAE', mae_path]
for idx, parent_path in enumerate(parent_paths):
cmd += [f'-REAGENT_OUTPUT_{idx}', parent_path]
cmd += [f'-REAGENT_TRV_{idx}', 'trv']
if self.anharm:
base_name = fileutils.get_basename(parent_path)
base_name = get_unique_geom_title(
base_name, self.unique_geom_dict)
mae_path = os.path.join(
self._anharm_dir,
f'{base_name}{ANHARMONIC_TAG}{anharmonic.OUT_EXT}'
)
cmd += [
f'-REAGENT_ANHARM_OUTPUT_MAE_{idx}', mae_path
]
cmd += ['-DEBUG'] # enable verbose logging
cmd += [job_name]
rds.jobs.append(cmd)
[docs] def runCTST(self):
"""
Run canonical transition state theory calculations to determine rate
constant(s) for the rate determining step of the reaction.
"""
self._setUpCTST()
# see MATSCI-6611 - originally jobs were launched simultaneously on
# self.subhost using a JobDJ filled with SubprocessJob types but it
# was found that starting the first of such jobs was hanging
# indefinitely on Windows and so it was switched to use subprocess
# (so running on the original -HOST) in serial
for rds in self.rdss:
rds.stdouts = []
for job in rds.jobs:
try:
stdout = subprocess.check_output(job,
stderr=subprocess.STDOUT,
universal_newlines=True)
except subprocess.CalledProcessError:
stdout = ''
rds.stdouts.append(stdout)
[docs] def prepareCTSTOutput(self):
"""
Prepare CTST output.
:raise ReactionWorkflowException: if there is an issue
"""
# process raw output data
failed_stage_idx = None
for rds in self.rdss:
temp_data_dict = {}
for stdout in rds.stdouts:
for line in stdout.splitlines():
if line.startswith('{'):
data = yaml.safe_load(line.strip())
temperature = float(data['Temperature'][0])
energy = float(data['Energy Barrier'][0])
r_partition_f = float(
data['Reagents Partition Function'][0])
ts_partition_f = float(
data['Complex Partition Function'][0])
mantissa, order_of_magnitude = data[
'TST Rate Constant'][0].split(' x ')
rate_constant = float(mantissa) * eval(
order_of_magnitude.replace('^', '**'))
rate_constant_units = data['TST Rate Constant'][1]
data = (energy, r_partition_f, ts_partition_f,
rate_constant, rate_constant_units)
temp_data_dict.setdefault(temperature, []).append(data)
if not temp_data_dict:
failed_stage_idx = rds.stage_idx
break
for temp, data in temp_data_dict.items():
energies, r_partition_fs, ts_partition_fs, rate_constants, \
rate_constant_units = [numpy.array(i) for i in zip(*data)]
# energies are temperature independent so perform an algebraic
# average
avg_energy = numpy.mean(energies)
# Boltzmann average the temperature dependent partition functions
# and rate constants
factors = numpy.exp(-1 * energies / (IDEAL_GAS_CONSTANT * temp))
partition_f = sum(factors)
avg_r_partition_f = numpy.dot(r_partition_fs,
factors) / partition_f
avg_ts_partition_f = numpy.dot(ts_partition_fs,
factors) / partition_f
avg_rate_constant = numpy.dot(rate_constants,
factors) / partition_f
temp_data_dict[temp] = (avg_energy, avg_r_partition_f,
avg_ts_partition_f, avg_rate_constant,
rate_constant_units[0])
rds.temp_data_dict = temp_data_dict
# process raw output files
if failed_stage_idx is not None or self.return_rate_constant_files:
os.makedirs(self._ctst_dir, exist_ok=True)
for rds in self.rdss:
for job, stdout in zip(rds.jobs, rds.stdouts):
log_file = os.path.join(self._ctst_dir, job[-1] + '.log')
with open(log_file, 'w') as afile:
afile.write(stdout)
jobutils.add_zipfile_to_backend(self._ctst_dir)
if failed_stage_idx:
msg = (
'All rate constant calculations for stage '
f'{failed_stage_idx} have failed for all child and parent '
'conformers.')
raise ReactionWorkflowException(msg)
# process summary output file
base_name = jobutils.get_jobname(DEFAULT_JOB_NAME)
rate_constants_file = base_name + RATE_CONSTANTS_W_X_EXT
with csv_unicode.writer_open(rate_constants_file) as afile:
writer = csv.writer(afile)
writer.writerow([
'Stage_Index', 'Parent_Sibling_Group_Name',
'Child_Sibling_Group_Name', 'Temperature/K',
'Conf_Avg_SCF+ZPE_Energy_Barrier/kcal/mol',
'Conf_Avg_Reactants_Partition_F', 'Conf_Avg_TS_Partition_F',
CONF_AVG_CTST_RATE_CONSTANT, 'CTST_Rate_Constant_Units'
])
for rds in self.rdss:
for temp, data in rds.temp_data_dict.items():
writer.writerow(
[rds.stage_idx, rds.parent_name, rds.child_name, temp] +
list(data))
jobutils.add_outfile_to_backend(rate_constants_file)
[docs] def exportEnergyDiagrams(self):
"""
Export the energy diagrams if the parents file exists
"""
base_name = jobutils.get_jobname(DEFAULT_JOB_NAME)
json_path = base_name + REL_PARENTS_WO_X_EXT
if os.path.exists(json_path):
csv_path = base_name + REL_REACTANTS_WO_X_EXT
plotter = EnergyDiagramPlotter(json_path,
csv_path,
logger=self.logger)
plotter.run()
[docs] def run(self):
"""
Run the reaction workflow.
:raise ReactionWorkflowException: if there is an issue
"""
zip_files = jaguar_restart.get_restart_zip_files(
base_names=[JMSWF_LAUNCH_DIR, ANHARM_LAUNCH_DIR])
jaguar_restart.prepare_restart_dirs(zip_files)
self.has_ts = has_transition_state(rxnwf_sts=self.reaction_wf_input_sts)
self.validate()
self._setUniqueGeomDict(self.reaction_wf_input_sts)
sts = self._getUniqueGeomSts(self.reaction_wf_input_sts)
try:
self.createConformers(sts)
except ConformationalSearchException as err:
raise ReactionWorkflowException(str(err))
try:
self._overwriteConformerEIDS()
self.runJMSWF()
self.prepareJMSWFOutput()
self.setRepresentatives()
except JMSWFException as err:
raise ReactionWorkflowException(str(err))
if self.anharm:
self.runAnharmonic()
self.processAnharmonic()
if self.out_rep:
for st in self.rep_sts:
etatoggle.toggle_structure(st, out_rep=self.out_rep)
self.rep_sts = self._duplicateUniqueGeomSts()
self.sibling_conformers_dict = bin_structures_by_property(
self.rep_sts, key=SIBLING_GROUP_KEY, inner_key=CONFORMERS_GROUP_KEY)
out_mae_file = self.finalizeJMSWFOutput()
ReactionWorkflowEnergyAnalysis(
out_mae_file, self.rep_keys,
dedup_geom_eps=self.dedup_geom_eps).writeDataFiles()
self.exportEnergyDiagrams()
compute_custom_rates = (self.rate_constants and
self.custom_rate_constants)
# Defined in MATSCI-10586
try:
get_custom_keq_rates(out_mae_file,
self.press_data,
self.temp_data,
extra_stages_file=self.extra_stages_file,
compute_rates=compute_custom_rates,
wigner_tunnel_corr=self.wigner_tunnel_corr)
except ValueError as err:
raise ReactionWorkflowException(str(err))
if self.rate_constants and not self.custom_rate_constants:
self.runCTST()
self.prepareCTSTOutput()
log('All finished', timestamp=True, pad=True, logger=self.logger)
[docs]class Sites:
"""
Manage enumeration sites.
"""
[docs] @staticmethod
def getSites(sites_data, n_structures=1):
"""
Return a list of Site from the given sites data.
:type sites_data: list
:param sites_data: contains for each site a list of
data [from_idx, to_idx, hash_idx] with an optional
fourth item structure_idx
:type n_structures: int
:param n_structures: the number of structures, used if the given
sites lack the optional fourth item
:raise InvalidInput: if there is an issue
:rtype: list
:return: contains `Site`
"""
sites = []
for site_data in sites_data:
n_idxs = len(site_data)
# 3 or 4 indices must be given (see below for cases)
if not (n_idxs == 3 or n_idxs == 4):
msg = ('Sites must be defined using either 3 or 4 indices.')
raise InvalidInput(msg)
elif n_idxs == 3:
# backwards compatibility, no structure specific indexing given,
# all structures have the same indexing
i, j, k = site_data
for l in range(1, n_structures + 1): # noqa: E741, bond atom
sites.append(
Site(from_idx=i, to_idx=j, hash_idx=k, structure_idx=l))
elif n_idxs == 4:
# structure specific indexing given
i, j, k, l = site_data # noqa: E741, bond atom
sites.append(
Site(from_idx=i, to_idx=j, hash_idx=k, structure_idx=l))
return sites
[docs] @staticmethod
def delete_substitution_site_bonds(st, sites):
"""
Delete bonds in the given structure that occur after
the given substitution sites and return extracted
core information.
:type st: `structure.Structure`
:param st: the structure
:type sites: list
:param sites: contains `Site`
:rtype: `structure.Structure`, dict
:return: the extracted core and old-to-new atom index
map
"""
root_remove_idxs = set()
for site in sites:
from_atom = st.atom[site.from_idx]
to_atom = st.atom[site.to_idx]
bonded_atoms = [
atom for atom in to_atom.bonded_atoms
if atom.index != from_atom.index
]
for atom in bonded_atoms:
st.deleteBond(to_atom, atom)
root_remove_idxs.add(atom.index)
remove_idxs = set()
for root_remove_idx in root_remove_idxs:
mol = st.atom[root_remove_idx].getMolecule()
remove_idxs.update(mol.getAtomIndices())
core_st = st.copy()
old_to_new = core_st.deleteAtoms(remove_idxs, renumber_map=True)
for idx in list(old_to_new.keys()):
if old_to_new[idx] is None:
old_to_new.pop(idx)
return core_st, old_to_new
[docs] @staticmethod
def validateSitesData(sites, st):
"""
Validate the given sites data.
:type sites: list
:param sites: contains `Site`
:type st: `structure.Structure`
:param st: the structure
:raise InvalidInput: if there is an issue
"""
st = st.copy()
# validate sites
ring_bonds = rings.find_ring_bonds(st)
for site in sites:
idxs = (site.from_idx, site.to_idx)
if max(idxs) > st.atom_total:
msg = get_msg(site, 'At least a single index does not exist.')
raise InvalidInput(msg)
bond = st.getBond(*idxs)
if not bond:
msg = get_msg(site, 'They are not bound.')
elif bond.order != 1:
msg = get_msg(site, 'It is not a single bond.')
elif set(idxs) in ring_bonds:
msg = get_msg(site, 'The bond is in a ring.')
elif msutils.is_dummy_atom(st.atom[site.to_idx]):
msg = get_msg(site, 'Can not enumerate on a centroid atom.')
else:
msg = None
if msg:
raise InvalidInput(msg)
mol_total = st.mol_total
# delete bonds
core_st, old_to_new = Sites.delete_substitution_site_bonds(st, sites)
# validate molecule
molecule_idxs = set(
st.atom[site.to_idx].getMolecule().number for site in sites)
if len(molecule_idxs) > mol_total:
msg = (
'At least a single site is invalid. Sites should all be '
'("from", "to") index pairs where the "from" index is kept and the '
'"to" index replaced.')
raise InvalidInput(msg)
# validate core
core_idxs = get_core_idxs(st)
to_idx_is_core = any(site.to_idx in core_idxs for site in sites)
if to_idx_is_core or not core_idxs.issubset(old_to_new.keys()):
msg = ('At least a single site is invalid. The core of the '
'molecule can not be modified.')
raise InvalidInput(msg)
[docs] @staticmethod
def getBinnedSites(sites):
"""
Get the sites binned firstly by structure_idx and secondly by hash_idx.
:type sites: list
:param sites: contains `Site`
:rtype: dict
:return: keys are structure_idx, values are dicts whose keys
are hash_idx and values are lists of `Site`
"""
binned_sites = {}
for site in sites:
adict = binned_sites.setdefault(site.structure_idx, {})
adict.setdefault(site.hash_idx, []).append(site)
return binned_sites
[docs]def get_RGE_sources(rgroup_sts, binned_sites, old_to_new):
"""
Return a list of Source that is prepared for enumeration
using the R-Group Enumeration module.
:type rgroup_sts: list of `structure.Structure`
:param rgroup_sts: the R-group structures
:type binned_sites: list of lists of `Site`
:param binned_sites: the binned sites
:type old_to_new: dict
:param old_to_new: a map of old-to-new atom indices
:rtype: list
:return: contains `Source`
"""
sources = []
for rgroup_st, sites in zip(rgroup_sts, binned_sites):
site_idxs = [old_to_new[site.to_idx] for site in sites]
sources.append(Source(rgroup_st=rgroup_st, site_idxs=site_idxs))
return sources
[docs]def substitute(st, rgroups_dict, sites_dict):
"""
Return a copy of the given structure that has been substituted
with the given R-groups at the given sites.
:type st: `structure.Structure`
:param st: the structure on which the substitution is performed
:type rgroups_dict: dict
:param rgroups_dict: keys are integer hashes relating to sites,
values are `structure.Structure`
:type sites_dict: dict
:param sites_dict: keys are integer hashes relating to R-groups,
values are lists of `Site`
:raise InvalidInput: if there is an issue
:rtype: `structure.Structure`
:return: the substituted structure
"""
# the rgroup_enumerate module automatically substitutes
# so that the leaving group is the smaller group and
# because we can't be certain of the input structure
# and substitution sites we are going to first prepare
# the structure by trimming after the substitution sites
all_sites = [site for sites in sites_dict.values() for site in sites]
st, old_to_tmp = Sites.delete_substitution_site_bonds(st, all_sites)
# collect substitution sources
rgroup_sts, binned_sites = [], []
for hash_idx in sorted(sites_dict):
rgroup_sts.append(rgroups_dict[hash_idx])
binned_sites.append(sites_dict[hash_idx])
sources = get_RGE_sources(rgroup_sts, binned_sites, old_to_tmp)
sources = [[[source.rgroup_st]] + source.site_idxs for source in sources]
# do the substitution
try:
obj = rgroup_enumerate.RgroupEnumerator(st,
sources,
optimize_sidechains=True,
enumerate_cistrans=False,
yield_renum_maps=True)
out_st, tmp_to_new = next(iter(obj))
except rgroup_enumerate.RgroupError as err:
msg = ('The R-group enumeration has failed for structure '
f'{st.title}. ') + str(err)
raise InvalidInput(msg)
amap = dict([(k, tmp_to_new[v]) for k, v in old_to_tmp.items()])
update_index_properties(out_st, amap)
tokens = out_st.title.replace(' ', '').split(',')
rgroup_title = RGROUP_SEPARATOR.join(tokens[1:])
out_st.title = tokens[0] + '_' + rgroup_title
return out_st
[docs]class EnumerateReactionWorkflow:
"""
Manage enumeration of a reaction workflow.
"""
[docs] def __init__(self,
rxnwf_file,
rgroup_files,
sites,
force_hetero_substitution=False,
out_rep=None,
base_name=DEFAULT_ENUM_RXNWF_JOB_NAME,
logger=None):
"""
Create an instance.
:type rxnwf_file: str
:param rxnwf_file: the reaction workflow file
:type rgroup_files: dict
:param rgroup_files: keys are hash_idx (see sites), values are file names
:type sites: list
:param sites: contains `Site`
:type force_hetero_substitution: bool
:param force_hetero_substitution: if True then for hetero-eumeration
do not additionally include homo-enumeration results
:param out_rep: if a string then must be either module constant
parserutils.CENTROID or parserutils.ETA, if None then do nothing
:type base_name: str
:param base_name: the base name to use in naming the enumerated output files
:type logger: logging.Logger
:param logger: the logger
"""
self.rxnwf_file = rxnwf_file
self.rgroup_files = rgroup_files
self.binned_sites = Sites.getBinnedSites(sites)
self.force_hetero_substitution = force_hetero_substitution
self.out_rep = out_rep
self.base_name = base_name
self.logger = logger
self.binned_rgroup_sts = {}
self.has_keep_atoms = False
self.out_structure_files = []
self.has_structure_specific_sites = True
[docs] def validate(self):
"""
Validate.
:raise InvalidInput: if there is an issue
"""
# check reaction workflow input file
try:
check_reaction_wf_structures(self.rxnwf_file, ffld_name=None)
except ValueError as err:
msg = str(err) + (' A valid reaction workflow file must '
'first be prepared.')
raise InvalidInput(msg)
# check consistency between hash indices coming from both r-groups and sites
hash_idxs = set(hash_idx for adict in self.binned_sites.values()
for hash_idx in adict.keys())
rgroup_files_tmp = self.rgroup_files.copy()
for hash_idx in hash_idxs:
rgroup_file = rgroup_files_tmp.pop(hash_idx, None)
if not rgroup_file:
msg = (f'The site enumeration index {hash_idx} does not '
'have a corresponding R-group file defined.')
raise InvalidInput(msg)
if rgroup_files_tmp:
hash_idxs = ','.join(map(str, sorted(rgroup_files_tmp.keys())))
msg = ('The following enumeration indices given for R-group '
f'files are not used by any sites: {hash_idxs}.')
raise InvalidInput(msg)
# check r-group files
self.binned_rgroup_sts = {}
for hash_idx, rgroup_file in self.rgroup_files.items():
self.binned_rgroup_sts.setdefault(rgroup_file, []).append(hash_idx)
self.binned_rgroup_sts = {
tuple(v): k for k, v in self.binned_rgroup_sts.items()
}
nfiles = 1
for hash_idxs, rgroup_file in self.binned_rgroup_sts.items():
n_needed = len(hash_idxs)
n_available = structure.count_structures(rgroup_file)
if n_available < n_needed:
msg = (
f'There are only {n_available} R-groups present in the '
f'file {rgroup_file} but {n_needed} different structures '
'are needed.')
raise InvalidInput(msg)
if self.force_hetero_substitution:
nfiles *= int(
math.factorial(n_available) /
math.factorial(n_available - n_needed))
else:
nfiles *= n_available**n_needed
# check sites format
sites = [
site for adict in self.binned_sites.values()
for sites in adict.values() for site in sites
]
Sites.validateSitesFormat(sites)
# check sites structure index
max_structure_idx = max(self.binned_sites.keys())
n_structures = structure.count_structures(self.rxnwf_file)
if max_structure_idx > n_structures:
msg = (
'At least a single site specifies a structure index of '
f'{max_structure_idx} but there are only {n_structures} in {self.rxnwf_file}.'
)
raise InvalidInput(msg)
# check sites
for idx, st in enumerate(structure.StructureReader(self.rxnwf_file), 1):
sites_dict = self.binned_sites.get(idx)
if not sites_dict:
continue
sites = [site for sites in sites_dict.values() for site in sites]
Sites.validateSitesData(sites, st)
if sfu.get_idxs_marked_atoms(st, KEEP_ATOM_KEY):
self.has_keep_atoms = True
if self.logger:
self.logger.info(f'A total of {nfiles} reaction workflow input '
'files will be created.')
self.logger.info('')
if nfiles > 500 and self.logger:
self.logger.warning('Enumerating more than 500 files can be time '
'consuming.')
[docs] def setRGroupStructures(self):
"""
Set the R-group structures.
"""
idx = 0
for hash_idxs in list(self.binned_rgroup_sts.keys()):
rgroup_file = self.binned_rgroup_sts[hash_idxs]
sts = []
for jdx, st in enumerate(structure.StructureReader(rgroup_file), 1):
if not st.title:
st.title = 'R' + str(idx + jdx)
build.add_hydrogens(st)
# if keep atoms are present then set all atoms in the R-group
# as keep atoms as it doesn't make sense to enumerate and not
# keep the enumerated result
if self.has_keep_atoms:
for atom in st.atom:
atom.property[KEEP_ATOM_KEY] = 1
sts.append(st)
self.binned_rgroup_sts[hash_idxs] = sts
idx += len(sts)
[docs] def setHasStructureSpecificSites(self):
"""
Set has structure specific sites.
"""
all_sites_list = []
for sites_dict in self.binned_sites.values():
sites_list = []
for hash_idx in sorted(sites_dict):
sites = [(site.from_idx, site.to_idx)
for site in sites_dict[hash_idx]]
sites_list.append((hash_idx, tuple(sites)))
all_sites_list.append(tuple(sites_list))
n_structures = structure.count_structures(self.rxnwf_file)
self.has_structure_specific_sites = not (len(all_sites_list)
== n_structures and
len(set(all_sites_list)) == 1)
[docs] def getStructures(self):
"""
Generates structure dictionaries where keys are enumeration indices
and values are structures.
:rtype: dict
:return: keys are enumeration indices, values are `structure.Structure`
"""
if self.force_hetero_substitution:
getter = lambda x, y: itertools.permutations(x, r=y)
else:
getter = lambda x, y: itertools.product(x, repeat=y)
getters, all_hash_idxs = [], []
for hash_idxs, sts in self.binned_rgroup_sts.items():
getters.append(getter(sts, len(hash_idxs)))
all_hash_idxs.extend(hash_idxs)
for tuples in itertools.product(*getters):
yield dict(
zip(all_hash_idxs,
tuple(st for atuple in tuples for st in atuple)))
[docs] def doEnumeration(self):
"""
Do the enumeration.
"""
rgroup_titles_count_dict = {}
for rgroup_sts_dict in self.getStructures():
out_sts, rgroup_titles = [], []
for idx, st in enumerate(structure.StructureReader(self.rxnwf_file),
1):
# handle no enumeration case
sites_dict = self.binned_sites.get(idx)
if not sites_dict:
out_sts.append(st)
continue
# in order to ensure that any dummy centroid atoms remain
# at the end of the atom list temporarily convert to eta
changed = etatoggle.toggle_structure(st,
out_rep=parserutils.ETA)
out_st = substitute(st, rgroup_sts_dict, sites_dict)
if changed:
etatoggle.toggle_structure(out_st,
out_rep=parserutils.CENTROID)
out_sts.append(out_st)
rgroup_titles.append(out_st.title.replace(st.title + '_', ''))
if self.has_structure_specific_sites:
rgroups = []
for rgroup_title in rgroup_titles:
for rgroup in rgroup_title.split(RGROUP_SEPARATOR):
if rgroup not in rgroups:
rgroups.append(rgroup)
rgroup_titles = RGROUP_SEPARATOR.join(rgroups)
else:
rgroup_titles = rgroup_titles[0] # they are all they same
rgroup_titles_count_dict.setdefault(rgroup_titles, []).append(1)
count = len(rgroup_titles_count_dict[rgroup_titles])
if count == 1:
idx_str = ''
else:
idx_str = f'_{count}'
if self.out_rep:
for st in out_sts:
etatoggle.toggle_structure(st, out_rep=self.out_rep)
file_name = f'{self.base_name}_{rgroup_titles}{idx_str}{REACTION_WF_TAG}.mae'
structure.write_cts(out_sts, file_name)
self.out_structure_files.append(file_name)
[docs] def run(self):
"""
Run it.
"""
self.validate()
self.setRGroupStructures()
self.setHasStructureSpecificSites()
self.doEnumeration()
job_be = jobcontrol.get_backend()
if job_be or self.logger:
for afile in self.out_structure_files:
if job_be:
job_be.addOutputFile(afile)
if self.logger:
msg = (f'Created enumerated file {afile}.')
self.logger.info(msg)
[docs]def replace_rxnwf_file_suffix(rxnwf_file, suffix):
"""
Replace the suffix in the given reaction workflow file
with a new suffix.
:type rxnwf_file: str
:param rxnwf_file: the reaction workflow file
:type suffix: str
:param suffix: the new suffix
:rtype: str
:return: the new string
"""
return rxnwf_file.replace(f'{REACTION_WF_TAG}.mae', suffix)
[docs]class EnumerateSwapMixin:
"""
Manage enumeration and swapping.
"""
[docs] def runEnumerateRXNWF(self, tag):
"""
Run enumerate reaction workflow.
:type tag: str
:param tag: either the REFERENCE or NOVEL module constant
:raise ReactionWorkflowException: if there is an issue
:rtype: set
:return: the names of the enumerated reaction workflow files
"""
assert tag in [REFERENCE, NOVEL]
if tag == REFERENCE:
sites = self.reference_sites
rxnwf_file = self.reference_rxnwf_file
# do not allow this type of hetero substitution because
# it should not be needed
force_hetero_substitution = False
elif tag == NOVEL:
sites = self.novel_sites
rxnwf_file = self.novel_rxnwf_file
force_hetero_substitution = self.force_hetero_substitution
rgroup_files = {}
for site in sites:
rgroup_file = self.rgroup_files.get(site.hash_idx)
if rgroup_file:
rgroup_files[site.hash_idx] = rgroup_file
if not (rgroup_files and sites):
return set([rxnwf_file])
if self.logger:
self.logger.info(
f'R-group enumerating the {tag} reaction workflow file.')
self.logger.info('')
out_rep = getattr(self, 'out_rep', None)
base_name = replace_rxnwf_file_suffix(rxnwf_file, '')
obj = EnumerateReactionWorkflow(
rxnwf_file,
rgroup_files,
sites,
force_hetero_substitution=force_hetero_substitution,
out_rep=out_rep,
base_name=base_name,
logger=self.logger)
try:
obj.run()
except InvalidInput as err:
raise ReactionWorkflowException(str(err))
if self.logger:
self.logger.info('')
return set(obj.out_structure_files)
[docs] def runSwapFragments(self, enumerated_novel_files, reference_rxnwf_file):
"""
Run swap fragments.
:type enumerated_novel_files: set
:param enumerated_novel_files: the names of the enumerated novel
files
:type reference_rxnwf_file: str
:param reference_rxnwf_file: the reference reaction workflow file
:raise ReactionWorkflowException: if there is an issue
:rtype: set
:return: the names of the enumerated reaction workflow files
"""
if self.logger:
self.logger.info(
f'Swapping fragments in {reference_rxnwf_file} with each '
'of the available novel fragments.')
self.logger.info('')
enumerated_rxnwf_files = set()
for enumerated_novel_file in enumerated_novel_files:
novel_st = structure.Structure.read(enumerated_novel_file)
novel_keep_idxs = sfu.get_keep_idxs(novel_st)
novel_superposition_idxs = sfu.get_superposition_idxs(novel_st)
enumerated_rxnwf_file = replace_rxnwf_file_suffix(
enumerated_novel_file, '_' + reference_rxnwf_file)
enumerated_rxnwf_files.add(enumerated_rxnwf_file)
group_name = replace_rxnwf_file_suffix(enumerated_rxnwf_file, '')
with structure.StructureWriter(enumerated_rxnwf_file) as writer:
for reference_st in structure.StructureReader(
reference_rxnwf_file):
reference_keep_idxs = sfu.get_keep_idxs(reference_st)
reference_superposition_idxs = sfu.get_superposition_idxs(
reference_st)
# only superpose relevant structures and copy the others (for
# example spectators)
if reference_keep_idxs and reference_superposition_idxs:
title = novel_st.title + '_' + reference_st.title
# in order to ensure that any dummy centroid atoms remain
# at the end of the atom list temporarily convert to eta
novel_changed = etatoggle.toggle_structure(
novel_st, out_rep=parserutils.ETA)
reference_changed = etatoggle.toggle_structure(
reference_st, out_rep=parserutils.ETA)
try:
st = sfu.get_assembled_structure(
novel_st,
reference_st,
novel_superposition_idxs,
reference_superposition_idxs,
novel_keep_idxs,
reference_keep_idxs,
title=title,
require_identical_bonds=False)
except sfu.SwapFragmentsException as err:
raise ReactionWorkflowException(str(err))
if novel_changed or reference_changed:
etatoggle.toggle_structure(
st, out_rep=parserutils.CENTROID)
# for the purposes of running the reaction workflow in the next
# step of this workflow force the eid to be consistent with the
# reference as it will be used in the underlying Jaguar multistage
# workflow
st.property[
msprops.ENTRY_ID_PROP] = reference_st.property[
msprops.ENTRY_ID_PROP]
else:
st = reference_st
# update Maestro grouping hierarchy
groups = msutils.get_project_group_hierarchy(
st=reference_st)
groups[0] = group_name
msutils.set_project_group_hierarchy(st, groups)
# handle output representation
out_rep = getattr(self, 'out_rep', None)
if out_rep:
etatoggle.toggle_structure(st, out_rep=out_rep)
writer.append(st)
return enumerated_rxnwf_files
[docs]class EnumerateSwap(EnumerateSwapMixin):
"""
Manage enumeration and swapping.
"""
[docs] def __init__(self,
reference_rxnwf_file,
novel_rxnwf_file=None,
rgroup_files=None,
reference_sites=None,
novel_sites=None,
force_hetero_substitution=False,
logger=None):
"""
Create an instance.
:type reference_rxnwf_file: str
:param reference_rxnwf_file: the reaction workflow file containing the reference
structures
:type novel_rxnwf_file: str
:param novel_rxnwf)file: the reaction workflow file containing the single
novel structure
:type rgroup_files: dict
:param rgroup_files: keys are hash_idx (see sites), values are file names
:type reference_sites: list
:param reference_sites: contains `Site` for the reference structures
:type novel_sites: list
:param novel_sites: contains `Site` for the novel structure
:type force_hetero_substitution: bool
:param force_hetero_substitution: if True then for hetero-eumeration
do not additionally include homo-enumeration results
:type logger: logging.Logger or None
:param logger: output logger or None if there isn't one
"""
self.reference_rxnwf_file = reference_rxnwf_file
self.novel_rxnwf_file = novel_rxnwf_file
self.rgroup_files = rgroup_files
self.reference_sites = reference_sites
self.novel_sites = novel_sites
self.force_hetero_substitution = force_hetero_substitution
self.logger = logger
[docs] def run(self):
"""
Run it.
:rtype: set
:return: all reaction workflow input files
"""
return self.getRXNWFInputFiles()
[docs]def enumerate_swap(reference_rxnwf_file,
novel_rxnwf_file=None,
rgroup_files=None,
reference_sites=None,
novel_sites=None,
force_hetero_substitution=False):
"""
Return all reaction workflow files created by R-group enumerating the given
reference and novel structures and swapping reference fragments for novel
fragments.
:type reference_rxnwf_file: str
:param reference_rxnwf_file: the reaction workflow file containing the reference
structures
:type novel_rxnwf_file: str
:param novel_rxnwf)file: the reaction workflow file containing the single
novel structure
:type rgroup_files: dict
:param rgroup_files: keys are hash_idx (see sites), values are file names
:type reference_sites: list
:param reference_sites: contains `Site` for the reference structures
:type novel_sites: list
:param novel_sites: contains `Site` for the novel structure
:type force_hetero_substitution: bool
:param force_hetero_substitution: if True then for hetero-eumeration
do not additionally include homo-enumeration results
:rtype: set
:return: all reaction workflow input files
"""
eswap = EnumerateSwap(reference_rxnwf_file,
novel_rxnwf_file=novel_rxnwf_file,
rgroup_files=rgroup_files,
reference_sites=reference_sites,
novel_sites=novel_sites,
force_hetero_substitution=force_hetero_substitution)
return eswap.run()
[docs]def get_sites(sites):
"""
Return the reference and novel sites.
:type sites: list
:param sites: contains for each site a list of data [from_idx, to_idx,
hash_idx] with an optional fourth item structure_idx and an optional
fifth item reaction_workflow_utils.REFERENCE or reaction_workflow_utils.NOVEL
module constants
:raise ReactionWorkflowException: if there is an issue
:rtype: list, list
:return: the reference and novel sites, each containing `rxnwfu.Site`
"""
reference_sites, novel_sites = [], []
for site in sites:
items = []
for item in site:
try:
items.append(int(item))
except ValueError:
items.append(item)
if len(items) == 5:
if items[-1] == REFERENCE:
reference_sites.append(items[:-1])
elif items[-1] == NOVEL:
if items[-2] != 1:
msg = (
f'Sites designated as "{NOVEL}" must specify a single '
'structure index with a value of 1.')
raise ReactionWorkflowException(msg)
novel_sites.append(items[:-2])
else:
msg = (f'Sites can only be designated as "{REFERENCE}" or '
f'"{NOVEL}".')
raise ReactionWorkflowException(msg)
elif len(items) == 4:
if items[-1] != 1:
msg = (f'Sites designated as "{NOVEL}" must specify a single '
'structure index with a value of 1.')
raise ReactionWorkflowException(msg)
novel_sites.append(items[:-1])
elif len(items) == 3:
novel_sites.append(items)
else:
msg = (
'Sites must specify three indices with an optional fourth index '
f'plus an optional fifth item, either "{REFERENCE}" or '
f'"{NOVEL}".')
raise ReactionWorkflowException(msg)
reference_sites = Sites.getSites(reference_sites)
novel_sites = Sites.getSites(novel_sites)
return reference_sites, novel_sites
[docs]def validate_auto_reaction_workflow_files(reaction_wf_input,
novel_rxnwf_file=None,
mass_conserved=False,
sites=None):
"""
Validate workflow input file, novel reaction workflow file
:type reaction_wf_input: str
:param reaction_wf_input: reference reaction workflow file name
:type novel_rxnwf_file: str
:param novel_rxnwf_file: the reaction workflow file containing the single
novel structure
:type mass_conserved: bool
:param mass_conserved: True if mass is conserved else false
:type sites: list
:param sites: contains for each site a list of data [from_idx, to_idx,
hash_idx] with an optional fourth item structure_idx and an optional
fifth item reaction_workflow_utils.REFERENCE or reaction_workflow_utils.NOVEL
module constants
:raise ReactionWorkflowException: if there is an issue
"""
# check reference rxnwf file
type_cast_reaction_wf_input(reaction_wf_input,
exception_type=ReactionWorkflowException,
mass_conserved=mass_conserved)
# check novel rxnwf file
if novel_rxnwf_file and structure.count_structures(novel_rxnwf_file) > 1:
msg = (
'The novel reaction workflow file must contain a single structure.')
raise ReactionWorkflowException(msg)
# Validate sites
if sites:
get_sites(sites)
[docs]def has_keep_and_superpose_atoms(input_st):
"""
Return True if the given structure has at least a single keep atom
and a single superpose atom.
:type input_st: `structure.Structure`
:param input_st: the structure
:rtype: bool
:return: True if the structure has at least a single keep atom
and a single superpose atom
"""
# if there are superpose atoms the requirement that there be at
# least three of them is enforced in the Reaction Workflow Input GUI
# which prepares the input here
has_keep = has_superpose = False
for atom in input_st.atom:
if atom.property.get(KEEP_ATOM_KEY):
has_keep = True
if atom.property.get(SUPERPOSITION_ATOM_KEY):
has_superpose = True
if has_keep and has_superpose:
return True
return False
[docs]def validate_auto_reaction_workflow_options(
reaction_wf_input,
novel_rxnwf_file=None,
n_conformers=DEFAULT_N_CONFORMERS,
jaguar_keywords=DEFAULT_JAGUAR_KEYWORDS):
"""
Validate auto reaction workflow options.
:type reaction_wf_input: str
:param reaction_wf_input: reference reaction workflow file name
:type novel_rxnwf_file: str
:param novel_rxnwf_file: the reaction workflow file containing the single
novel structure
:type n_conformers: int
:param n_conformers: number of conformers to search for
:type jaguar_keywords: str
:param jaguar_keywords: whitespace separated Jaguar <key>=<value> tokens
:raise ReactionWorkflowException: if there is an issue
"""
try:
sts = list(structure.StructureReader(reaction_wf_input))
except IOError as err:
raise ReactionWorkflowException(str(err))
if novel_rxnwf_file:
# check references
for st in sts:
if has_keep_and_superpose_atoms(st):
break
else:
msg = (
'At least a single reference structure must contain atoms marked with '
f'the "{KEEP_ATOM_KEY}" and "{SUPERPOSITION_ATOM_KEY}" '
'properties which specify how to swap fragments with the novel '
'structure.')
raise ReactionWorkflowException(msg)
# check novel
if structure.count_structures(novel_rxnwf_file) > 1:
msg = (
'The novel reaction workflow file must contain a single structure.'
)
raise ReactionWorkflowException(msg)
novel_st = structure.Structure.read(novel_rxnwf_file)
if not has_keep_and_superpose_atoms(novel_st):
msg = (
'The novel structure must contain atoms marked with the '
f'"{KEEP_ATOM_KEY}" and "{SUPERPOSITION_ATOM_KEY}" '
'properties which specify how to swap fragments with the reference '
'structures.')
raise ReactionWorkflowException(msg)
sts.append(novel_st)
# check FF
if n_conformers:
try:
check_ff_assignment(sts, ffld_name=DEFAULT_FORCE_FIELD_NAME)
except ValueError as err:
raise ReactionWorkflowException(str(err))
# check Jaguar kwargs
type_cast_jaguar_keywords(jaguar_keywords,
exception_type=ReactionWorkflowException)
[docs]def validate_all_auto_reaction_workflow_options(
reaction_wf_input,
novel_rxnwf_file=None,
mass_conserved=False,
sites=None,
n_conformers=5,
jaguar_keywords=DEFAULT_JAGUAR_KEYWORDS):
"""
Validate all auto reaction workflow options or raise
ReactionWorkflowException error.
:type reaction_wf_input: str
:param reaction_wf_input: reference reaction workflow file name
:type novel_rxnwf_file: str
:param novel_rxnwf_file: the reaction workflow file containing the single
novel structure
:type mass_conserved: bool
:param mass_conserved: True if mass is conserved else false
:type sites: list
:param sites: contains for each site a list of data [from_idx, to_idx,
hash_idx] with an optional fourth item structure_idx and an optional
fifth item reaction_workflow_utils.REFERENCE or reaction_workflow_utils.NOVEL
module constants
:type n_conformers: int
:param n_conformers: number of conformers to search for
:type jaguar_keywords: str
:param jaguar_keywords: whitespace separated Jaguar <key>=<value> tokens
:raise rxnwfu.ReactionWorkflowException: if there is an issue
"""
validate_auto_reaction_workflow_files(reaction_wf_input=reaction_wf_input,
novel_rxnwf_file=novel_rxnwf_file,
mass_conserved=mass_conserved,
sites=sites)
validate_auto_reaction_workflow_options(reaction_wf_input=reaction_wf_input,
novel_rxnwf_file=novel_rxnwf_file,
n_conformers=n_conformers,
jaguar_keywords=jaguar_keywords)
[docs]def get_smiles(st):
"""
Return the smiles of the given metal complex.
:type st: `schrodinger.structure.Structure`
:param st: the metal complex
:raise: (`rdkit_adapter.InconsistentStructureError`,
`rdkit_adapter.UnsupportedStructureError`)
:rtype: str
:return: smiles
"""
# smiles generation requires centroid representation
_st = st.copy()
etatoggle.toggle_structure(_st, out_rep=parserutils.CENTROID)
return adapter.to_smiles(_st, adapter.StereoChemistry_Copy,
adapter.Canonicalize_Enable,
adapter.Sanitize_Enable)
[docs]def bin_by_geometry(sts, eps=parserutils.DEFAULT_DEDUP_GEOM_EPS):
"""
Return a dictionary of structures binned by common geometry.
:type sts: list[`schrodinger.structure.Structure`]
:param sts: the structures
:type eps: float
:param eps: the RMSD precision in Angstrom that controls the size
of the clusters, see sklearn.cluster.DBSCAN documentation for
more details
:rtype: dict
:return: keys are representative structures from each geometry bin,
values are lists of other structures in the same geometry bin
"""
# while SMILES allows atomic charges they do not allow total charge
# nor total multiplicity (nor atomic multiplicity) so generalize the
# hash
hash_dict = {}
for st in sts:
smiles = get_smiles(st)
charge = st.property.get(CHARGE_KEY, 0)
multip = st.property.get(MULTIPLICITY_KEY, 1)
ahash = f'{smiles}_charge_{charge}_multip_{multip}'
hash_dict.setdefault(ahash, []).append(st)
st_dict = {}
for conformers in hash_dict.values():
ref_st = conformers[0] # choose first conformer as the reference
idxs = list(range(1, ref_st.atom_total + 1))
key = lambda x: rmsd.superimpose(
ref_st, idxs, x, idxs, use_symmetry=False, move_which=rmsd.CT)
groups = mathutils.dbscan_cluster(conformers, eps, key=key)
for group in groups.values():
ref_st = group[0] # choose first geometry as the reference
st_dict[ref_st] = group[1:]
return st_dict
[docs]def get_orig_title(title, unique_geom_dict):
"""
Return the original title of the given title.
:type title: str
:param title: the title
:type unique_geom_dict: dict
:param unique_geom_dict: keys are original titles, values are structures
in the same geometry bin, with the representative structure at
index 0
:rtype: str, str
:return: the original title and the extension
"""
# titles come in like the following
#
# 'my_reactant' (structure title directly from user input for the job)
#
# 'my_reactant_stage_<x>' where x > 0, if the stage is an analysis stage
# then there will be a trailing '_analysis_<y>' where y > 0 is the parent
# stage whose output structure is used to store the analysis property,
# (structure title directly from JMSWF output)
#
# 'my_reactant-<x>_stage_<y>' where x > 1 is the conformer number, otherwise
# same as above (structure title directly from JMSWF output or structure
# JMSWF output file base name)
#
# 'my_reactant-<x>' where x > 1 is the conformer number, (structure title
# after postprocessing JMSWF)
#
# 'my_reactant-<x>_stage_<y>_anharmonic' where x > 1 is the conformer number,
# otherwise same as above (structure anharmonic output file base name)
#
# where 'my_reactant' has been jobutils.clean_string'd
# see MATSCI-11362 - we need to distinguish whether titles like 'my_reactant-2'
# were original titles or those generated by the conformational search which
# uses an '-<idx>' convention
orig_titles = set()
for orig_sts in unique_geom_dict.values():
orig_titles.update(
[jobutils.clean_string(orig_st.title) for orig_st in orig_sts])
if title in orig_titles:
return title, ''
if STAGE_SEPARATOR in title:
conf_title, stage_hash = title.split(STAGE_SEPARATOR)
stage_hash = f'{STAGE_SEPARATOR}{stage_hash}'
else:
conf_title, stage_hash = title, ''
match = re.search(r'(-\d+)$', conf_title)
if match:
ext = match.group(1)
orig_title = conf_title.rpartition(ext)[0]
else:
ext = ''
orig_title = conf_title
return orig_title, f'{ext}{stage_hash}'
[docs]def get_unique_geom_title(title, unique_geom_dict):
"""
Return the title of the unique geometry representative for the
given title.
:type title: str
:param title: the title
:type unique_geom_dict: dict
:param unique_geom_dict: keys are original titles, values are structures
in the same geometry bin, with the representative structure at
index 0
:rtype: str
:return: the title of the representative
"""
orig_title, ext = get_orig_title(title, unique_geom_dict)
if not unique_geom_dict:
return title
elif unique_geom_dict.get(orig_title):
return title
else:
for rep_title, orig_sts in unique_geom_dict.items():
orig_st, orig_other_sts = orig_sts[0], orig_sts[1:]
for orig_other_st in orig_other_sts:
if orig_other_st.title == orig_title:
return f'{rep_title}{ext}'
[docs]def get_energy_db_term(rxnwf_file_path=None,
rxnwf_sts=None,
tstart=jmswfu.DEFAULT_TEMP_START,
tstep=jmswfu.DEFAULT_TEMP_STEP,
ntemp=jmswfu.DEFAULT_TEMP_N,
pstart=jmswfu.DEFAULT_PRESS_START,
pstep=jmswfu.DEFAULT_PRESS_STEP,
npress=jmswfu.DEFAULT_PRESS_N,
anharm=False,
extra_stages_file=False,
**kwargs):
"""
Get a dict containing all the energy term generated by reaction workflow
:type rxnwf_file_path: str or None
:param rxnwf_file_path: the path to a reaction workflow file
:type rxnwf_sts: list[`schrodinger.structure.Structure`] or None
:param rxnwf_sts: reaction workflow structures
:type tstart: float
:param tstart: the starting temperature (K)
:type tstep: float
:param tstep: the step size of the temperature (K)
:type ntemp: int
:param ntemp: the number of temperature points
:type pstart: float
:param pstart: the starting pressure (atm)
:type pstep: float
:param pstep: the step size of the pressure (atm)
:type npress: int
:param npress: the number of pressure points
:type anharm: bool
:param anharm: True if anharmonic calculation is performed else False
:type extra_stages_file: string
:param extra_stages_file: name of stage file
:rtype energy_term_dict: dict
:return: dictionary of energy keys.
"""
msg = ('Both input file and input structures are present or absent. '
'Please specify either input file or input structures')
assert bool(rxnwf_file_path) ^ bool(rxnwf_sts), msg
rate_constants = kwargs.pop('rate_constants', False)
custom_rate_constants = kwargs.pop('custom_rate_constants', False)
if rxnwf_file_path:
rxnwf_sts = list(structure.StructureReader(rxnwf_file_path))
keywords = {
'ifreq': 1,
'tmpini': tstart,
'tmpstp': tstep,
'ntemp': ntemp,
'press': pstart,
'press_step': pstep,
'npress': npress
}
not_include = (jaguarworkflows.HOMO_ENERGY_PROP,
jaguarworkflows.ALPHA_HOMO_ENERGY_PROP,
jaguarworkflows.BETA_HOMO_ENERGY_PROP,
jaguarworkflows.LUMO_ENERGY_PROP,
jaguarworkflows.ALPHA_LUMO_ENERGY_PROP,
jaguarworkflows.BETA_LUMO_ENERGY_PROP,
jaguarworkflows.ENTROPY_STARTER)
# Collect all key info from Jaguar
keys = jmswfu.get_property_keys_from_keywords(keywords)
keys.append(jaguarworkflows.GAS_PHASE_ZERO_POINT_ENERGY_PROP)
# Create dict to collect info
energy_term_dict = {}
energy_term_dict.setdefault(ENERGY_KEYS, {})
energy_term_dict.setdefault(ANHARMONIC_ENERGY_KEYS, {})
energy_term_dict.setdefault(CUSTOM_ENERGY_KEYS, {})
# Collect energy keys
stage_idx = get_rep_stage_idx(rxnwf_sts=rxnwf_sts)
energy_terms = [
key + STAGE_SEPARATOR + str(stage_idx)
for key in keys
if not key.startswith(not_include)
]
for term in energy_terms:
key = ReactionWorkflowEnergyAnalysis.getHeader(term)
energy_term_dict[ENERGY_KEYS][key] = term
# Collect anharmonic keys
if anharm:
not_include_anharmonic_terms = (
jaguarworkflows.ZERO_POINT_ENERGY_PROP,
jaguarworkflows.GAS_PHASE_ENERGY_PROP,
jaguarworkflows.GAS_PHASE_ZERO_POINT_ENERGY_PROP)
filtered_terms = [
key for key in energy_terms
if not key.startswith(not_include_anharmonic_terms)
]
for term in filtered_terms:
anharmonic_energy_term = anharmonic.KEY_STARTER + "_".join(
term.split("_")[2:])
key = ReactionWorkflowEnergyAnalysis.getHeader(
anharmonic_energy_term)
energy_term_dict[ANHARMONIC_ENERGY_KEYS][
key] = anharmonic_energy_term
# Collect custom key
if extra_stages_file:
press_data = PressData(press_start=pstart,
press_step=pstep,
press_n=npress)
temp_data = TempData(temp_start=tstart, temp_step=tstep, temp_n=ntemp)
pressures, temps = get_pressures_temps(press_data, temp_data)
extra_stages = jmswfu.read_stage_datafile(extra_stages_file)
for extra_stage in extra_stages:
if not extra_stage.analyze_data:
continue
for analyze_data in extra_stage.analyze_data:
# if using wildcards collect keys over the entire pressure and
# temperature matrix, doing this for a single line is sufficient
# hence the break
if jaguarworkflows.ALL_TEMP_PRESS_KEY_EXT in analyze_data.parent_key:
keys = []
for press, temp in itertools.product(pressures, temps):
ext = jaguarworkflows.get_temp_press_key_ext(
temp, press)
keys.append(f'{analyze_data.key}{ext}')
break
else:
keys = [extra_stage.analyze_data[0].key]
for key in keys:
term = f'{key}{STAGE_SEPARATOR}{stage_idx + extra_stage.index - 1}'
key = ReactionWorkflowEnergyAnalysis.getHeader(term)
energy_term_dict[CUSTOM_ENERGY_KEYS][key] = term
if rate_constants:
if custom_rate_constants:
value = RATE_CONSTANT_STRING
else:
value = CONF_AVG_CTST_RATE_CONSTANT
else:
value = None
energy_term_dict[RATE_CONSTANT_KEY] = value
return energy_term_dict
[docs]class SiblingNode:
"""
Contains parent-child information for a sibling group as well as features
for creating an energy diagram
"""
LINE_LENGTH = 1
X_GAP = LINE_LENGTH / 2
[docs] def __init__(self, name, axes):
"""
:param str name: The name of the sibling group
:param `matplotlib.axes.Axes` axes: The plot's axes object
"""
self.name = name
self.axes = axes
self.generation = None
self.csv_idx = None
self.color = None
self.parents = []
self.children = []
[docs] def addToPlot(self, y_val, energy, text_offset):
"""
Add this sibling group to the plot
:param float y_val: The y-value of the sibling on the plot
:param float energy: The energy to show on the label
:param float text_offset: The offset for showing the label above
horizontal lines
"""
self.y_val = y_val
x_mid = (self.X_GAP + self.LINE_LENGTH) * self.generation
x_1 = x_mid - self.LINE_LENGTH / 2
self.x_2 = x_mid + self.LINE_LENGTH / 2
color = self.color or 'k'
self.axes.plot((x_1, self.x_2), (y_val, y_val),
linewidth=3,
color=color)
label_text = self.name + ': {0:.2f}'.format(energy)
self.label = self.axes.text(x_mid,
y_val + text_offset,
label_text,
ha='center',
size='small')
color = self.color or '0.75'
for parent in self.parents:
self.axes.plot((x_1, parent.x_2), (y_val, parent.y_val),
color=color)
[docs]class EnergyDiagramPlotter:
"""
Class for exporting energy level diagrams
"""
TOTAL_FREE_ENERGY = 'Total_Free_Energy'
TEXT_OFFSET_DIVISOR = 50
VERTICAL_SPACING_DIVISOR = 23
PDF_FILE_ENDING = '_e_diagrams.pdf'
PNG_FILE_ENDING = '_std_e_diagram.png'
[docs] def __init__(self, json_path, csv_path, logger=None):
"""
:param str json_path: Path to _conf_avg_wo_x_rel_parents.json file
:param str csv_path: Path to _conf_avg_wo_x_rel_reactants.csv file
"""
# We need the csv in addition to the json because the former contains
# energies relative to reactants while the latter contains the needed
# parenting information
self.json_path = json_path
self.csv_path = csv_path
self.logger = logger
[docs] def run(self):
"""
Export the energy level diagrams
"""
# Agg rather than QTAgg is supposed to be default backend (MATSCI-9819)
# to prevent requiring graphics libraries in case running on a remote compute host
import matplotlib
from matplotlib import figure
from matplotlib.backends import backend_pdf
from matplotlib.backends.backend_agg import FigureCanvasAgg
self.color_map = matplotlib.cm.get_cmap('gist_rainbow')
fig = figure.Figure(figsize=(10, 7.5), dpi=200)
self.canvas = FigureCanvasAgg(fig)
self.axes = fig.add_subplot(111)
fig.subplots_adjust(left=0.05, right=0.95)
# Need to read the sibling names as str otherwise 000 will change to 0
# and it won't match the json file anymore (MATSCI-11196)
self.ene_df = pandas.read_csv(self.csv_path,
dtype={SIBLING_GROUP_NAME: str})
self.findTotalEnergyProp()
self.parseSiblings()
job_name = jobutils.get_jobname(DEFAULT_JOB_NAME)
pdf_path = job_name + self.PDF_FILE_ENDING
e_png_path = job_name + self.PNG_FILE_ENDING
with backend_pdf.PdfPages(pdf_path) as pdf_pages:
for prop in self.ene_df.columns[1:]:
self.plotDiagram(prop)
pdf_pages.savefig(fig)
if prop == self.tot_ene_prop:
self.canvas.print_figure(e_png_path)
jobutils.add_outfile_to_backend(pdf_path)
jobutils.add_outfile_to_backend(e_png_path)
[docs] def findTotalEnergyProp(self):
"""
Find the Total Free Energy property with temperature and pressure
closest to 298.15 K and 1.0 atm
"""
def get_closest_key(the_dict, value):
keys = list(the_dict.keys())
array = numpy.array(keys)
idx = (numpy.abs(array - value)).argmin()
return keys[idx]
props = self.ene_df.columns[1:]
tot_ene_props = [p for p in props if self.TOTAL_FREE_ENERGY in p]
temp_press_dict = defaultdict(dict)
for prop in tot_ene_props:
temp = jaguarworkflows.get_temperature(prop)
press = jaguarworkflows.get_pressure(prop)
temp_press_dict[temp][press] = prop
target_temp = get_closest_key(temp_press_dict, 298.15)
target_press = get_closest_key(next(iter(temp_press_dict.values())),
1.0)
self.tot_ene_prop = temp_press_dict[target_temp][target_press]
[docs] def getSibling(self, name):
"""
Get an existing sibling or create a new one and return it
:param str name: The name of the sibling
:rtype: `SiblingNode`
:return: A SiblingNode object
"""
if name in self.siblings:
return self.siblings[name]
else:
sibling = SiblingNode(name, self.axes)
self.siblings[name] = sibling
return sibling
[docs] def parseSiblings(self):
"""
Read the siblings and parent child info from json file and find
the generation of each sibling
"""
with open(self.json_path) as json_fh:
json_data = json.load(json_fh)
self.siblings = {}
for sibling_name, parents_data in json_data.items():
child = self.getSibling(sibling_name)
for parent_name in parents_data:
parent = self.getSibling(parent_name)
parent.children.append(child)
child.parents.append(parent)
# Find indexes of each sibling in the csv file
for idx, name in enumerate(self.ene_df[SIBLING_GROUP_NAME]):
try:
self.siblings[name].csv_idx = idx
except KeyError:
error = (f'Cannot find "{name}" sibling group in '
f'{os.path.basename(self.csv_path)}.')
raise ReactionWorkflowException(error)
self.findGenerations()
self.findSameGenerationIndices()
self.setColors()
[docs] def setColors(self):
"""
Set colors for paths that have one parent and one child for all nodes,
excluding the reactant
"""
node_paths = []
for first_gen in self.generations[1]:
path = []
node = first_gen
while True:
if len(node.parents) > 1 or len(node.children) > 1:
path.clear()
break
if len(node.children) == 0:
# last node. Wrap up.
path.append(node)
break
path.append(node)
node = node.children[0]
if path:
node_paths.append(path)
num_paths = len(node_paths)
for idx, path in enumerate(node_paths):
color = self.color_map(idx / num_paths)
for node in path:
node.color = color
[docs] def findGenerations(self):
"""
Determine the generations of the siblings
Each child's generation is the largest generation of its parents, plus 1
which means that if we have the following ownerships:
1 -> 2 -> 3 -> 4
1 -> 5 -> 4
"4" will be a 4th generation and "5" will be a 2nd generation, so the
plot will look like this:
1 -- 2 -- 3 -- 4
1 -- 5 ------- 4
as opposed to
1 ------- 5 -- 4
"""
def set_generation(sibling, generation):
if sibling.generation is not None:
if generation <= sibling.generation:
# Nothing to do
return
sibling.generation = generation
for child in sibling.children:
set_generation(child, generation + 1)
source = [x for x in self.siblings.values() if not x.parents][0]
set_generation(source, 0)
self.generations = defaultdict(list)
for sibling in self.siblings.values():
self.generations[sibling.generation].append(sibling)
[docs] def findSameGenerationIndices(self):
"""
Find groups of csv indices that belong to the same generation
"""
self.same_gen_indices = []
for idx in range(1, max(self.generations) + 1):
self.same_gen_indices.append(
[sibling.csv_idx for sibling in self.generations[idx]])
[docs] def plotDiagram(self, prop):
"""
Plot the energy diagram for the passed property
"""
self.axes.clear()
self.axes.axis('off')
self.axes.autoscale(axis="y")
self.canvas.figure.suptitle(prop)
disclaimer = ('Lines and labels are moved vertically to avoid overlaps.'
' The Y-axis does not have a uniform scale.')
self.axes.text(0.5,
-0.05,
disclaimer,
size=6,
ha="center",
transform=self.axes.transAxes)
y_vals = self.getAdjustedYValues(prop)
text_offset = self.ene_df[prop].values.ptp() / self.TEXT_OFFSET_DIVISOR
for idx in range(max(self.generations) + 1):
for sibling in self.generations[idx]:
csv_index = sibling.csv_idx
sibling.addToPlot(y_val=y_vals[csv_index],
energy=self.ene_df[prop][csv_index],
text_offset=text_offset)
self.canvas.draw()
self.labels = [sibling.label for sibling in self.siblings.values()]
self.adjustLabels()
[docs] def adjustLabels(self):
"""
Adjust the size of labels. If a label's width is large enough to overlap
with a neighbour's label, reduce the font and redraw
"""
adjust = False
max_width = SiblingNode.LINE_LENGTH + SiblingNode.X_GAP
for label in self.labels:
extents = label.get_window_extent()
width = extents.transformed(self.axes.transData.inverted()).width
if width > max_width:
adjust = True
break
if adjust:
for label in self.labels:
label.set_size(label.get_size() - 0.5)
self.canvas.draw()
self.adjustLabels()
[docs] def getAdjustedYValues(self, prop):
"""
Adjust the y values so that there is enough gap between all consecutive
values in the same generation so there is no overlap.
:param str prop: The property to adjust values for
:rtype: numpy.array
:return: The adjusted Y values as a numpy array
"""
divisor = self.VERTICAL_SPACING_DIVISOR
values = self.ene_df[prop].values.copy()
base_range = values.ptp()
# The required gap is a fixed portion of the plot size.
required_diff = base_range / divisor
last_num_raised = 0
while True:
num_raised = 0
idx_to_deficit = {}
for indices in self.same_gen_indices:
sorted_indices = sorted(indices, key=lambda x: values[x])
generation_values = values[sorted_indices]
generation_diffs = numpy.diff(generation_values)
for diff_idx, diff in enumerate(generation_diffs):
if diff < required_diff:
# Not enough room. All values above need to be raised
num_raised += 1
array_idx = sorted_indices[diff_idx + 1]
idx_to_deficit[array_idx] = required_diff - diff
# Raising the values will increase the value range and shrink all
# spacings because the plot height remains constant, so we will need
# more space for labels to not overlap. Now we calculate the
# extra spacing (x) required. Adding extra_spacing to deficits will
# fix the problem if no new labels overlap
# new_range = base_range + sum(deficits) + num_raised * x
# new_required_diff = x + required_diff = new_height / divisor
# divisor * x + divisor * required_diff =
# base_range + sum(deficits) + num_raised * x
# (divisor-num_raised) * x = base_range + sum(deficits) -
# divisor * required_diff
denom = divisor - num_raised
if denom <= 0:
# Too many colliding labels. Will need to reduce the spacing.
divisor += 1
continue
numerator = (base_range + sum(idx_to_deficit.values()) -
divisor * required_diff)
extra_spacing = numerator / denom
if last_num_raised == num_raised:
if abs(extra_spacing) > 1e-5:
if self.logger:
warning = (
'Error during adjustment of energy diagram'
f' positions for "{prop}". Using original values.')
self.logger.warning(warning)
return values
# Adjusting the sizes causes no new labels to overlap. Use the
# current deficits
break
last_num_raised = num_raised
# Raise the requirement and see if any other labels overlap now
required_diff += extra_spacing
for idx, deficit in idx_to_deficit.items():
val = values[idx]
values[values >= val] += deficit
return values