"""
Utilities for reaction workflows.
Copyright Schrodinger, LLC. All rights reserved.
"""
import argparse
import copy
import csv
import enum
import glob
import itertools
import json
import os
import pandas
import re
import shutil
import math
import warnings
from collections import OrderedDict
from collections import namedtuple
from types import SimpleNamespace
import networkx as nx
import numpy
import yaml
from collections import defaultdict
from scipy import constants
from schrodinger import structure
from schrodinger import adapter
from schrodinger.application.jaguar.input import JaguarInput
from schrodinger.application.jaguar.output import JaguarOutput
from schrodinger.application.macromodel import utils as mmodutils
from schrodinger.application.matsci import anharmonic
from schrodinger.application.matsci import desmondutils
from schrodinger.application.matsci import etarotamers
from schrodinger.application.matsci import etatoggle
from schrodinger.application.matsci import \
    jaguar_multistage_workflow_utils as jmswfu
from schrodinger.application.matsci import jaguar_restart
from schrodinger.application.matsci import textlogger
from schrodinger.application.matsci import jaguarworkflows
from schrodinger.application.matsci import jobutils
from schrodinger.application.matsci import msutils
from schrodinger.application.matsci import parserutils
from schrodinger.application.matsci import msprops
from schrodinger.application.matsci import \
    genetic_optimization as go
from schrodinger.application.matsci import swap_fragments_utils as sfu
from schrodinger.infra import mm
from schrodinger.job import jobcontrol
from schrodinger.structutils import analyze
from schrodinger.structutils import rmsd
from schrodinger.structutils import rings
from schrodinger.structutils import rgroup_enumerate
from schrodinger.structutils import build
from schrodinger.math import mathutils
from schrodinger.test.stu.common import zip_directory
from schrodinger.utils import csv_unicode
from schrodinger.utils import fileutils
from schrodinger.utils import subprocess
from schrodinger.utils import units
from schrodinger.utils.license import is_opls2_available
FLAG_N_CONFORMERS = '-n_conformers'
FLAG_PP_REL_ENERGY_THRESH = '-pp_rel_energy_thresh'
FLAG_RETURN_CSEARCH_FILES = '-return_csearch_files'
FLAG_SKIP_ETA_ROTAMERS = '-skip_eta_rotamers'
FLAG_JAGUAR = '-jaguar'
FLAG_JAGUAR_KEYWORDS = '-jaguar_keywords'
FLAG_TEMP_START = '-temp_start'
FLAG_TEMP_STEP = '-temp_step'
FLAG_TEMP_N = '-temp_n'
FLAG_PRESS_START = '-press_start'
FLAG_PRESS_STEP = '-press_step'
FLAG_PRESS_N = '-press_n'
FLAG_MAX_I_FREQ = anharmonic.FLAG_MAX_I_FREQ
FLAG_TPP = parserutils.FLAG_TPP
FLAG_DESCRIPTORS = '-descriptors'
FLAG_LIGFILTER = '-ligfilter'
FLAG_CANVAS = '-canvas'
FLAG_MOLDESCRIPTORS = '-moldescriptors'
FLAG_INCLUDE_VECTORIZED = '-include_vectorized'
FLAG_FINGERPRINTS = '-fingerprints'
FLAG_SAVEFILES = '-savefiles'
FLAG_STPROPS = '-stprops'
FLAG_INPUT_FILE = '-input_file'
DEFAULT_JOB_NAME = 'reaction_workflow'
DEFAULT_N_CONFORMERS = 5
# keys and values should be strings
DEFAULT_JAGUAR_KEYWORDS = OrderedDict([('dftname', 'B3LYP'), ('iuhf', '1'),
                                       ('basis', 'LACVP**')])
DEFAULT_TPP = 1
# OPLS_2005 does not allow ZOB to sp3 carbon while OPLS3 does
DEFAULT_FORCE_FIELD_NAME = mm.OPLS_NAME_F16  # OPLS3
RESERVED_JAGUAR_KEYS = set([
    'molchg', 'multip', 'igeopt', 'ifreq', 'itrvec', 'nhesref', 'ntemp',
    'tmpini', 'tmpstp', 'npress', 'press', 'press_step'
])
REACTION_WF_TAG = '_rxnwf'
REACTION_WF_STRUCTURE_KEY = sfu.REACTION_WF_STRUCTURE_KEY
CONFORMERS_GROUP_KEY = sfu.CONFORMERS_GROUP_KEY
SIBLING_GROUP_KEY = sfu.SIBLING_GROUP_KEY
PARENT_SIBLING_GROUPS_KEY = sfu.PARENT_SIBLING_GROUPS_KEY
CHARGE_KEY = msprops.CHARGE_PROP
MULTIPLICITY_KEY = msprops.MULTIPLICITY_PROP
KEEP_ATOM_KEY = sfu.KEEP_ATOM_PROP
SUPERPOSITION_ATOM_KEY = sfu.SUPERPOSITION_ATOM_PROP
RESTRAINED_ATOM_KEY = 'b_matsci_Reaction_Workflow_Restrained_Atom'
RESTRAINED_DISTANCES_KEY = sfu.RESTRAINED_DISTANCES_KEY
RESTRAINED_ANGLES_KEY = sfu.RESTRAINED_ANGLES_KEY
RESTRAINED_DIHEDRALS_KEY = sfu.RESTRAINED_DIHEDRALS_KEY
TRANSITION_STATE_STRUCTURE_KEY = sfu.TRANSITION_STATE_STRUCTURE_KEY
INDEX_SEPARATOR = sfu.INDEX_SEPARATOR
PARENT_SEPARATOR = ','
SEPARATOR = sfu.SEPARATOR
ENTRY_ID_KEY = msprops.ENTRY_ID_PROP
RATE_CONSTANT_STRING = 'Rate_Constant'
CONF_AVG_CTST_RATE_CONSTANT = 'Conf_Avg_CTST_Rate_Constant'
TempData = namedtuple('TempData', ['temp_start', 'temp_step', 'temp_n'])
PressData = namedtuple('PressData', ['press_start', 'press_step', 'press_n'])
ReactProdTS = namedtuple('ReactProdTS', ['ts_name', 'other_name'])
SameTypeGSTS = namedtuple('SameTypeGSTS', ['ts_name', 'other_name', 'is_gsgs'])
Pair = namedtuple('Pair', ['first', 'second'])
DF_COL_TS_SIBLING, DF_COL_OTHER_SIBLING = 'TS_Sibling', 'Other_Sibling'
DF_COL_EN_PROP = 'Energy_Property'
DfCustomRateColumns = namedtuple('DfCustomRateColumns', [
    DF_COL_TS_SIBLING, DF_COL_OTHER_SIBLING, 'Temperature_K', 'Pressure_atm',
    DF_COL_EN_PROP, 'Energy_Barrier_kcal_per_mol', 'TS_Partition_F',
    'Other_Partition_F', RATE_CONSTANT_STRING, 'Rate_Constant_Type',
    'Rate_Constant_Units'
])
DF_COL_SIBLING = 'Sibling'
DfCustomEqColumns = namedtuple('DfCustomEqColumns', [
    DF_COL_SIBLING, DF_COL_OTHER_SIBLING, 'Temperature_K', 'Pressure_atm',
    'Energy_Barrier_kcal_per_mol', DF_COL_EN_PROP, 'Keq'
])
HARTREE_UNITS = ['au']
KCAL_PER_MOL_UNITS = ['kcal/mol']
EV_UNITS = ['eV']
KJOULES_PER_MOL_UNITS = ['kJ/mol']
KNOWN_UNITS = (HARTREE_UNITS + KCAL_PER_MOL_UNITS + EV_UNITS +
               KJOULES_PER_MOL_UNITS)
# kcal/mol/K
IDEAL_GAS_CONSTANT = (constants.R / constants.calorie) / constants.kilo
STAGE_SEPARATOR = '_stage_'
REL_REACTANTS_W_X_EXT = '_conf_avg_rel_reactants.csv'
REL_REACTANTS_WO_X_EXT = '_conf_avg_wo_x_rel_reactants.csv'
REL_REACTANTS_LOWEST_EXT = '_lowest_conf_rel_reactants.csv'
REL_PARENTS_W_X_EXT = '_conf_avg_rel_parents.json'
REL_PARENTS_WO_X_EXT = '_conf_avg_wo_x_rel_parents.json'
REL_PARENTS_LOWEST_EXT = '_lowest_conf_rel_parents.json'
REL_REACTANTS_EXTS = [
    REL_REACTANTS_W_X_EXT, REL_REACTANTS_WO_X_EXT, REL_REACTANTS_LOWEST_EXT
]
REL_PARENTS_EXTS = [
    REL_PARENTS_W_X_EXT, REL_PARENTS_WO_X_EXT, REL_PARENTS_LOWEST_EXT
]
REL_EXT_TO_KWARG_DICT = {
    REL_REACTANTS_W_X_EXT: 'edict_w_x',
    REL_REACTANTS_WO_X_EXT: 'edict_wo_x',
    REL_REACTANTS_LOWEST_EXT: 'edict_lowest',
    REL_PARENTS_W_X_EXT: 'edict_w_x',
    REL_PARENTS_WO_X_EXT: 'edict_wo_x',
    REL_PARENTS_LOWEST_EXT: 'edict_lowest'
}
SIBLING_GROUP_NAME = 'Sibling_Group_Name'
ANHARMONIC_TAG = '_anharmonic'
RATE_CONSTANTS_W_X_EXT = '_conf_avg_rate_constants.csv'
CUSTOM_RATE_CONSTS_EXT = '_custom_rate_constants.csv'
CUSTOM_KEQ_CONSTS_EXT = '_custom_keq.csv'
FFLD_ENERGY_GLOBAL_TAG = '_global'
MmodEnergyKeys = namedtuple('MmodEnergyKeys', ['absolute', 'relative'])
# kJ/mol
CSEARCH_REL_ENERGY_THRESH = 50.
# Ang.
CSEARCH_RMSD_THRESH = 0.1
JMSWF_LAUNCH_DIR = 'jaguar_multistage_workflow'
ANHARM_LAUNCH_DIR = 'anharmonic'
CTST_RATE_CONSTANTS_DIR = 'CTST_rate_constants'
# the variable name pp_rel_energy_thresh uses the prefix pp to indicate
# that this is the relative energy used in postprocessing MacroModel results
# and is different from the variable name rel_energy_thresh, used elsewhere in this
# code, which is the relative energy used in the MacroModel csearch
PP_CSEARCH_REL_ENERGY_THRESH = None  # kJ/mol or None
DESCRIPTORS_LAUNCH_DIR_HEADER = 'descriptors_'
EnergyAnalysisProperty = namedtuple('EnergyAnalysisProperty', [
    'sibling_group', 'conformer_groups', 'representative_conformers',
    'temperature', 'pressure', 'energy_key', 'property_key', 'avg_property_key',
    'atom_idx', 'ensemble', 'include_x_terms', 'only_lowest_energy'
])
OUT_EXT = '-out'
DEFAULT_ENUM_RXNWF_JOB_NAME = 'enumerate_reaction_workflow'
# from_idx specifies the direction of the substitution relative
# to to_idx, to_idx specifies the substitution site, hash_idx
# is an enumeration index which maps the site to an R-group file,
# structure_idx indicates which structure in the input file this
# site applies to
Site = namedtuple('Site', ['from_idx', 'to_idx', 'hash_idx', 'structure_idx'])
Source = namedtuple('Source', ['rgroup_st', 'site_idxs'])
get_msg = lambda site, msg: (
    f'Invalid atom indices ({site.from_idx}, '
    f'{site.to_idx}) for enumeration index {site.hash_idx} '
    f'for structure index {site.structure_idx}.  {msg}')
RGROUP_SEPARATOR = '_'
NOVEL = 'novel'
REFERENCE = 'reference'
DEFAULT_AUTO_RXNWF_JOB_NAME = 'automatic_reaction_workflow'
ENERGY_KEYS = 'ENERGIES'
ANHARMONIC_ENERGY_KEYS = 'ANHARMONIC_ENERGIES'
CUSTOM_ENERGY_KEYS = "CUSTOM_ENERGIES"
RATE_CONSTANT_KEY = "RATE_CONSTANT_KEY"
MASS_N_DECIMAL = 3
# hard code was decided in MATSCI-10586
KEQ_MAX = 10e100
[docs]def log(msg, **kwargs):
    """
    Add a message to the log file
    :param str msg: The message to log
    Additional keyword arguments are passed to the textlogger.log_msg function
    """
    textlogger.log_msg(msg, **kwargs) 
[docs]def get_mmod_energy_key(ffld_name=None):
    """
    Returns the corresponding MacroModel energy key for the given forcefield.
    If no forcefield is given and the license is available, it defaults to
    S-OPLS. If no forcefield is given and the license is NOT available, it
    defaults to OPLS2005.
    :param str ffld_name: The forcefield to get the energy key for. If `None`,
        defaults to the latest forcefield available given your licenses.
    :return str energy_key: The MacroModel suffix `key` for the default
        forcefield. Units are in kJ/mol
    """
    keys = get_mmod_energy_keys(ffld_name)
    return keys.absolute 
[docs]def get_mmod_rel_energy_key(ffld_name=None):
    """
    Returns the corresponding MacroModel relative energy key for the given
    forcefield.  If no forcefield is given and the license is available, it
    defaults to S-OPLS. If no forcefield is given and the license is NOT
    available, it defaults to OPLS2005.
    :param str ffld_name: The forcefield to get the energy key for. If `None`,
        defaults to the latest forcefield available given your licenses.
    :return str energy_key: The MacroModel suffix `key` for the default
        forcefield.
    """
    keys = get_mmod_energy_keys(ffld_name)
    return keys.relative 
[docs]def get_mmod_energy_keys(ffld_name=None):
    """
    Returns the corresponding MacroModel energy key for the given forcefield.
    If no forcefield is given and the license is available, it defaults to
    S-OPLS. If no forcefield is given and the license is NOT available, it
    defaults to OPLS2005.
    :param str ffld_name: The forcefield to get the energy key for. If `None`,
        defaults to the latest forcefield available given your licenses.
    :return namedtuple energy_keys: A named two-tuple where the first element
        (with a key of `absolute`) is the MacroModel suffix for the absolute
        energy (in kJ/mol), and the second element (with a key of `relative`)
        is the MacroModel suffix for the relative energy.
    """
    sopls_key = f'r_mmod_Potential_Energy-{mm.OPLS_NAME_F16}'
    sopls_rel_key = f'r_mmod_Relative_Potential_Energy-{mm.OPLS_NAME_F16}'
    s_opls_keys = MmodEnergyKeys(sopls_key, sopls_rel_key)
    # The value for `mm.OPLS_NAME_F16` matches the suffix for the energy keys,
    # but the value for `mm.OPLS_NAME_F14` does not match its corresponding
    # suffix. Here we hard-code the correct suffix (suffix is 'OPLS-2005'
    # while name is 'OPLS_2005').
    opls2005_key = 'r_mmod_Potential_Energy-OPLS-2005'
    opls2005_rel_key = 'r_mmod_Relative_Potential_Energy-OPLS-2005'
    opls2005_keys = MmodEnergyKeys(opls2005_key, opls2005_rel_key)
    if ffld_name == mm.OPLS_NAME_F16:
        return s_opls_keys
    elif ffld_name == mm.OPLS_NAME_F14:
        return opls2005_keys
    elif ffld_name is None:
        if is_opls2_available():
            return s_opls_keys
        else:
            return opls2005_keys
    else:
        raise ValueError(f'{ffld_name} is not a recognize forcefield name.') 
[docs]@enum.unique
class DF_RATE_TYPE(enum.Enum):
    DEFAULT = 'default'
    LNQ = 'lnq'
    ANH_LNQ = 'anharmonic_lnq' 
[docs]def get_reactions_data(sts):
    """
    Given conformers dictionary return list of reactions.
    :param list[structure.Structure] sts: List of structures
    :rtype: list[Pair]: List of pairs that holds two ReactProdTS (first, second)
        or SameTypeGSTS (for the Int-Int or TS-TS case) which in turn hold the
        data for calculating both two rate constants and two equilibrium constants
    """
    sibling_conformers_dict = bin_structures_by_property(
        sts, key=SIBLING_GROUP_KEY, inner_key=CONFORMERS_GROUP_KEY)
    ts_sibling_groups, intermediate_sibling_groups = [], []
    for sibling_group, conformers_dict in sibling_conformers_dict.items():
        rep_sts = [conformers[0] for conformers in conformers_dict.values()]
        if has_transition_state(rxnwf_sts=rep_sts):
            ts_sibling_groups.append(sibling_group)
        else:
            intermediate_sibling_groups.append(sibling_group)
    react_prod_ts = []
    for ts_name, other_name in itertools.product(ts_sibling_groups,
                                                 intermediate_sibling_groups):
        react_prod_ts.append(ReactProdTS(ts_name, other_name))
    # Make pairs of same type of siblings, GS-GS, TS-TS (MATSCI-12024)
    for gs_name_a, gs_name_b in itertools.combinations(
            intermediate_sibling_groups, 2):
        react_prod_ts.append(SameTypeGSTS(gs_name_a, gs_name_b, True))
    for ts_name_a, ts_name_b in itertools.combinations(ts_sibling_groups, 2):
        react_prod_ts.append(SameTypeGSTS(ts_name_a, ts_name_b, False))
    if len(react_prod_ts) == 1:
        # If there is only one react_prod_ts, duplicate it in the pair
        return [Pair(react_prod_ts[0], react_prod_ts[0])]
    ret = []
    for rpts_a, rpts_b in itertools.combinations(react_prod_ts, 2):
        ret.append(Pair(rpts_a, rpts_b))
    # Note that list of pairs can contain repeating siblings, both ts and other,
    # since all the combinations are generated
    return ret 
[docs]def add_remove_props(sts, prop, add_props):
    """
    Set additional properties equal to the original property from input
    structures, original property will be deleted.
    :param list[structure.Structure] sts: List of structures to be modified
    :param str prop: Value of this property will be used, the property will be
        deleted
    :param list[str] add_props: List of properties to be added to the structure
    """
    for struct in sts:
        val = struct.property.pop(prop)
        for add_prop in add_props:
            struct.property[add_prop] = val 
[docs]def get_pressures_temps(press_data, temp_data):
    """
    Get list of pressures and temepratures from input data.
    :param PressData press_data: Pressure data
    :param TempData temp_data: Temperature data
    :rtype: list[float], list[float]
    :return: List of pressures, list of temperatures
    """
    pressures = [
        press_data.press_start + i * press_data.press_step
        for i in range(0, press_data.press_n)
    ]
    temps = [
        temp_data.temp_start + i * temp_data.temp_step
        for i in range(0, temp_data.temp_n)
    ]
    return pressures, temps 
[docs]def set_gas_phase_zpe_props(sts, press_data, temp_data, stage_idx):
    """
    Set temp/pressure dependent gas phase + ZPE property in the input structures
    based on the original property
    :param list[structure.Structure] sts: List of structures to be modified
    :param PressData press_data: Pressure data
    :param TempData temp_data: Temperature data
    :param int stage_idx: Stage index
    """
    pressures, temps = get_pressures_temps(press_data, temp_data)
    e_zpe_props = [
        '%s%s%d' % (jaguarworkflows.get_gas_phase_zpe_key(
            temp, press), STAGE_SEPARATOR, stage_idx)
        for press, temp in itertools.product(pressures, temps)
    ]
    prop = '%s%s%d' % (jaguarworkflows.GAS_PHASE_ZERO_POINT_ENERGY_PROP,
                       STAGE_SEPARATOR, stage_idx)
    add_remove_props(sts, prop, e_zpe_props) 
[docs]def get_eprop_data(eprop):
    """
    Get conversion factor to kcal/mol, temperature in K, pressure in atm,
    from energy property
    :param str eprop: Energy property
    :rtype: float, float, float
    :return: conversion factor to kcal/mol, temperature in K, pressure in atm
    """
    conv = ReactionWorkflowEnergyAnalysis.getKcalPerMolConversion(eprop)
    assert conv is not None  # Should not happen
    temp = jaguarworkflows.get_temperature(eprop)
    assert temp is not None  # Should not happen
    press = jaguarworkflows.get_pressure(eprop)
    assert press is not None  # Should not happen
    return conv, temp, press 
[docs]def add_custom_eq_rows(name, other_name, avg_props, other_avg_props, eprop,
                       dframe):
    """
    Add data with kEQ to the dataframe
    :param str name: Sibling name
    :param str other_name: Other sibling name
    :param list[EnergyAnalysisProperty] avg_props: List of average properties
        of sibling
    :param list[EnergyAnalysisProperty] other_avg_props: List of average
        properties of the other sibling
    :param str eprop: Energy property
    :param pandas.DataFrame dframe: Input dataframe
    :rtype: pandas.DataFrame
    :return: Updated dataframe
    """
    def keq_in_dframe(dframe, sibling, other_sibling, eprop):
        query = ('%s == "%s" and %s == "%s" and %s == "%s"' %
                 (DF_COL_SIBLING, sibling, DF_COL_OTHER_SIBLING, other_sibling,
                  DF_COL_EN_PROP, eprop))
        return bool(len(dframe.query(query)))
    from schrodinger.application.matsci import boltzmann_avg_props as boltz
    conv, temp, press = get_eprop_data(eprop)
    denergy = (boltz.get_averaged_value(eprop, avg_props) -
               boltz.get_averaged_value(eprop, other_avg_props)) * conv
    # Forward and backward
    for factor, (name_a, name_b) in zip([1, -1], [(name, other_name),
                                                  (other_name, name)]):
        if keq_in_dframe(dframe, name_a, name_b, eprop):
            # Skip already present row
            continue
        if denergy < -100:
            # hard code was decided in MATSCI-10586
            keq = KEQ_MAX
        else:
            denergy *= factor
            keq = get_keq(denergy, temp)
        keq = min(keq, KEQ_MAX)
        df_row = DfCustomEqColumns(Sibling=name_a,
                                   Other_Sibling=name_b,
                                   Temperature_K=temp,
                                   Pressure_atm=press,
                                   Energy_Property=eprop,
                                   Energy_Barrier_kcal_per_mol=denergy,
                                   Keq=keq)
        dframe = dframe.append(df_row._asdict(), ignore_index=True)
    return dframe 
[docs]def add_custom_rate_rows(reaction, ts_avg_props, other_avg_props, eprop, dframe,
                         is_anharm):
    """
    Add data with custom rates to the dataframe
    :param ReactProdTS reaction: Reaction/product - TS
    :param list[EnergyAnalysisProperty] ts_avg_props: List of average properties
    :param list[EnergyAnalysisProperty] other_avg_props: List of other average
        properties
    :param str eprop: Energy property
    :param pandas.DataFrame dframe: Input dataframe
    :param bool is_anharm: Whether it is anharmonic calculation
    :rtype: pandas.DataFrame
    :return: Updated dataframe
    """
    from schrodinger.application.matsci import boltzmann_avg_props as boltz
    conv, temp, press = get_eprop_data(eprop)
    denergy = (boltz.get_averaged_value(eprop, ts_avg_props) -
               boltz.get_averaged_value(eprop, other_avg_props)) * conv
    rate = get_custom_rate(denergy, temp)
    df_row = DfCustomRateColumns(TS_Sibling=reaction.ts_name,
                                 Other_Sibling=reaction.other_name,
                                 Temperature_K=temp,
                                 Pressure_atm=press,
                                 Energy_Property=eprop,
                                 Energy_Barrier_kcal_per_mol=denergy,
                                 TS_Partition_F=None,
                                 Other_Partition_F=None,
                                 Rate_Constant=rate,
                                 Rate_Constant_Type=DF_RATE_TYPE.DEFAULT.value,
                                 Rate_Constant_Units='1/s')
    if not is_anharm:
        # rate without Q is only computed for the harmonic case
        dframe = dframe.append(df_row._asdict(), ignore_index=True)
    ifreq = boltz.get_averaged_value(eprop, ts_avg_props,
                                     jaguarworkflows.LOWEST_FREQUENCY_PROP)
    ts_lnq = get_lnq(eprop, ts_avg_props, is_anharm)
    other_lnq = get_lnq(eprop, other_avg_props, is_anharm)
    dlnq = ts_lnq - other_lnq
    rate_type = (DF_RATE_TYPE.ANH_LNQ if is_anharm else DF_RATE_TYPE.LNQ)
    rate = get_custom_rate(denergy, temp, dlnq, ifreq)
    df_row = df_row._replace(TS_Partition_F=ts_lnq,
                             Other_Partition_F=other_lnq,
                             Rate_Constant_Type=rate_type.value,
                             Rate_Constant=rate)
    dframe = dframe.append(df_row._asdict(), ignore_index=True)
    return dframe 
[docs]def get_custom_keq_rates(out_mae,
                         press_data,
                         temp_data,
                         extra_stages_file=None,
                         compute_rates=True):
    """
    Given output mae file, write custom equilibrium constants and optionally
    custom rates to CSV dataframes.
    :param str out_mae: Output file name
    :param PressData press_data: Pressure data
    :param TempData temp_data: Temperature data
    :type extra_stages_file: str or None
    :param extra_stages_file: Extra staged file name or None
    :param bool compute_rates: Whether to compute rates or only equilibrium
        constants
    """
    def rate_in_dframe(dframe, rpts, eprop):
        query = ('%s == "%s" and %s == "%s" and %s == "%s"' %
                 (DF_COL_TS_SIBLING, rpts.ts_name, DF_COL_OTHER_SIBLING,
                  rpts.other_name, DF_COL_EN_PROP, eprop))
        return bool(len(dframe.query(query)))
    from schrodinger.application.matsci import boltzmann_avg_props as boltz
    sts = list(structure.StructureReader(out_mae))
    stage_idx = get_rep_stage_idx(rxnwf_sts=sts)
    extra_eprops = set()
    if extra_stages_file:
        extra_eprops = set_extra_energy_props(sts, extra_stages_file,
                                              press_data, temp_data)
    set_gas_phase_zpe_props(sts, press_data, temp_data, stage_idx)
    eprops = set()
    for starter in (jaguarworkflows.TOTAL_FREE_ENERGY_STARTER,
                    jaguarworkflows.TOTAL_ENTHALPY_STARTER,
                    jaguarworkflows.GAS_PHASE_ZERO_POINT_ENERGY_PROP,
                    jaguarworkflows.TOTAL_INTERNAL_ENERGY_STARTER):
        eprops.update(
            get_present_props(starter, sts[0].property, stage_idx=stage_idx))
    anh_eprops = set()
    for starter in (anharmonic.U_STARTER, anharmonic.H_STARTER,
                    anharmonic.G_STARTER):
        anh_eprops.update(
            get_present_props(starter, sts[0].property, stage_idx=stage_idx))
    all_eprops = eprops | anh_eprops | extra_eprops  # add sets
    avg_props = boltz.get_averaged_properties(sts,
                                              all_eprops,
                                              allow_sibling_groups=True)
    dframe = pandas.DataFrame(columns=DfCustomRateColumns._fields)
    dframe_eq = pandas.DataFrame(columns=DfCustomEqColumns._fields)
    reactions = get_reactions_data(sts)
    data = dict()
    for reaction in reactions:
        for name in (reaction.first.ts_name, reaction.first.other_name,
                     reaction.second.ts_name, reaction.second.other_name):
            if name not in data:
                data[name] = [
                    ea_prop for ea_prop in avg_props
                    if ea_prop.sibling_group == name
                ]
        for eprop in all_eprops:
            is_anharm = eprop in anh_eprops
            if compute_rates:
                # Only add if sibling + other sibling + energy property is not in
                # the dframe. reactions obtained from get_reactions_data can contain
                # repeating siblings, both ts and other.
                if (isinstance(reaction.first, ReactProdTS) and
                        not rate_in_dframe(dframe, reaction.first, eprop)):
                    dframe = add_custom_rate_rows(
                        reaction.first, data[reaction.first.ts_name],
                        data[reaction.first.other_name], eprop, dframe,
                        is_anharm)
                if (isinstance(reaction.second, ReactProdTS) and
                        not rate_in_dframe(dframe, reaction.second, eprop)):
                    dframe = add_custom_rate_rows(
                        reaction.second, data[reaction.second.ts_name],
                        data[reaction.second.other_name], eprop, dframe,
                        is_anharm)
            if reaction.first == reaction.second:
                # This is the case where there is only one pair, see
                # get_reactions_data
                if reaction.first.ts_name != reaction.first.other_name:
                    dframe_eq = add_custom_eq_rows(
                        reaction.first.ts_name, reaction.first.other_name,
                        data[reaction.first.ts_name],
                        data[reaction.first.other_name], eprop, dframe_eq)
            else:
                if reaction.first.ts_name != reaction.second.ts_name:
                    dframe_eq = add_custom_eq_rows(
                        reaction.first.ts_name, reaction.second.ts_name,
                        data[reaction.first.ts_name],
                        data[reaction.second.ts_name], eprop, dframe_eq)
                if reaction.first.other_name != reaction.second.other_name:
                    dframe_eq = add_custom_eq_rows(
                        reaction.first.other_name, reaction.second.other_name,
                        data[reaction.first.other_name],
                        data[reaction.second.other_name], eprop, dframe_eq)
    if len(dframe):
        # Only write non-empty file
        out_fn = '%s%s' % (jobutils.get_jobname(DEFAULT_JOB_NAME),
                           CUSTOM_RATE_CONSTS_EXT)
        dframe.to_csv(out_fn)
        jobutils.add_outfile_to_backend(out_fn)
    if len(dframe_eq):
        # Only write non-empty file
        out_eq_fn = '%s%s' % (jobutils.get_jobname(DEFAULT_JOB_NAME),
                              CUSTOM_KEQ_CONSTS_EXT)
        dframe_eq.to_csv(out_eq_fn)
        jobutils.add_outfile_to_backend(out_eq_fn) 
[docs]def get_keq(energy, temp):
    """
    Get k_equilibrium given energy and temperature.
    :param float energy: energy in kcal/mol
    :param float temp: Temperature in Kelvin
    :rtype: float
    :return: k_equilibrium
    """
    return numpy.exp(-1 * energy / IDEAL_GAS_CONSTANT / temp) 
[docs]def get_custom_rate(energy, temp, dlnq=None, ifreq=None):
    """
    Get custom rate.
    :param float energy: Energy in kcal/mol (!)
    :param temp: Temperature in K
    :type lnq: None or float
    :param lnq: lnQ to compute rate with partition function, unitless
    :type ifreq: None or float
    :param ifreq: Lowest negative frequency to compute rate with
        partition function, in 1/cm
    :rtype: float
    :return: Rate in 1/s
    """
    if energy < -100:
        # hard code was decided in MATSCI-10586
        return 10e100
    # k = kBh * T * np.exp(-1.0 * energy / kB1 / T) # 1/s
    kBh = constants.value('Boltzmann constant in Hz/K')
    chkB = constants.value('second radiation constant')  # = 0.01438776877 m * K
    kval = kBh * temp * get_keq(energy, temp)
    if dlnq is None:
        kval = min(kval, KEQ_MAX)
        return kval
    assert ifreq is not None  # required when lnq provided
    assert ifreq < 0, 'Found positive frequency: %f' % ifreq  # Must be imaginary (negative)
    ifreq_m = ifreq / constants.centi  # from 1/cm to 1/m
    w_corr = (-1 * ifreq_m * chkB / temp)**2 / 24.0 + 1.0
    # k = w * kBh * T * exp( D(lnQ) ) * exp(-1.0 * energy / kB1 / T) # 1/s
    kval = w_corr * numpy.exp(dlnq) * kval
    kval = min(kval, KEQ_MAX)
    return kval 
[docs]def get_present_props(en_starter, properties, stage_idx=None):
    """
    Get all properties with a certain starter and optionally stage index
    :param str en_starter: Property must start with this
    :param list[str] properties: List of properties
    :type stage_idx: None or int
    :param stage_idx: Stage index
    :rtype: list[str]
    :return: List of matching properties
    """
    postfix = ('' if stage_idx is None else '%s%d' %
               (STAGE_SEPARATOR, stage_idx))
    return [
        prop for prop in properties
        if (prop.startswith(en_starter) and prop.endswith(postfix))
    ] 
[docs]def get_present_props_from_sts(sts, prefix, stage_idx=None):
    """
    Get properties that start with prefix and end with stage index
    (if provided). Check if pressure/temperature is present. Ensure that all
    the structures have at least the same list of properties
    :param list[structure.Structure] sts: List of structures
    :param str prefix: Property must start with this
    :type stage_idx: None or int
    :param stage_idx: Stage index
    :rtype: set[str]
    :return: Set of properties
    """
    props = set()
    for aprop in get_present_props(prefix, sts[0].property,
                                   stage_idx=stage_idx):
        press = jaguarworkflows.get_pressure(aprop)
        temp = jaguarworkflows.get_temperature(aprop)
        assert press is not None and temp is not None, (
            'Pressure or '
            'temperature is missing from %s' % aprop)
        props.add(aprop)
    # All structures must have these properties
    assert all(props.issubset(struct.property) for struct in sts), (
        'Some '
        'structures are missing properties: %s' % props)
    return props 
[docs]def get_lnq(eprop, averaged_properties, is_anharm):
    """
    Get lnQ Boltzmann averaged over energy property.
    :param str e_prop: Energy property
    :param dict averaged_properties: Averaged Boltzmann properties
    :param bool is_anharm: Whether to use anharmonic approximation
    :rtype: float
    :return: Averaged lnQ
    """
    temp = jaguarworkflows.get_temperature(eprop)
    assert temp is not None  # Should not happen
    press = jaguarworkflows.get_pressure(eprop)
    assert press is not None  # Should not happen
    if is_anharm:
        press = jaguarworkflows.format_pressure(press)
        lnq_prop = anharmonic.LNQ_KEY.format(temp=temp, press=press)
        # Stage is set in anharmonic case, but not it harmonic case
        lnq_prop += '%s%d' % (STAGE_SEPARATOR, get_stage_idx(eprop))
    else:
        lnq_prop = jaguarworkflows.get_lnq_key(temp, press)
    from schrodinger.application.matsci import boltzmann_avg_props as boltz
    lnq = boltz.get_averaged_value(eprop, averaged_properties, lnq_prop)
    return lnq 
[docs]def get_restrain_atom_idxs(st):
    """
    Return a list of indices of restrain atoms
    in the given structure.
    :type st: schrodinger.structure.Structure
    :param st: the structure
    :rtype: list
    :return: contains indices of restrain atoms
    """
    return sfu.get_idxs_marked_atoms(st, RESTRAINED_ATOM_KEY) 
[docs]def get_idx_groups(text):
    """
    Get index groups from the given string.
    :type text: str
    :param text: the string
    :raise: InvalidInput if there is a formatting issue
    :rtype: list
    :return: contains list of indices
    """
    try:
        return sfu.get_idx_groups(text)
    except sfu.SwapFragmentsException as err:
        raise InvalidInput(str(err)) 
[docs]def get_restrain_distance_idxs(st):
    """
    Return a list of lists of indices of restrain distances
    in the given structure.
    :type st: schrodinger.structure.Structure
    :param st: the structure
    :rtype: list
    :return: contains lists of restrain distances
    """
    return sfu._get_restrain_group_idxs(st, RESTRAINED_DISTANCES_KEY) 
[docs]def get_restrain_angle_idxs(st):
    """
    Return a list of lists of indices of restrain angles
    in the given structure.
    :type st: schrodinger.structure.Structure
    :param st: the structure
    :rtype: list
    :return: contains lists of restrain angles
    """
    return sfu._get_restrain_group_idxs(st, RESTRAINED_ANGLES_KEY) 
[docs]def get_restrain_dihedral_idxs(st):
    """
    Return a list of lists of indices of restrain dihedrals
    in the given structure.
    :type st: schrodinger.structure.Structure
    :param st: the structure
    :rtype: list
    :return: contains lists of restrain dihedrals
    """
    return sfu._get_restrain_group_idxs(st, RESTRAINED_DIHEDRALS_KEY) 
[docs]def get_jaguar_keywords_list(jaguar_keywords_dict):
    """
    Return the Jaguar keywords list from the given dict.
    :type jaguar_keywords_dict: OrderedDict
    :param jaguar_keywords_dict: the Jaguar keywords dict
    :rtype: list
    :return: the Jaguar keywords list
    """
    return ['='.join(item) for item in jaguar_keywords_dict.items()] 
[docs]def type_cast_seed(seed):
    """
    Type cast the seed.
    :type seed: str or unicode
    :param seed: seed for random number generator
    :rtype: int
    :return: the seed
    """
    return parserutils.type_random_seed(seed,
                                        seed_min=go.CONF_SEARCH_SEED_RANGE[0],
                                        seed_max=go.CONF_SEARCH_SEED_RANGE[1]) 
[docs]def type_cast_jaguar_keywords(jaguar_keywords,
                              reserved_keys=RESERVED_JAGUAR_KEYS,
                              exception_type=argparse.ArgumentTypeError):
    """
    Type cast the Jaguar keywords.
    :type jaguar_keywords: str or unicode or list
    :param jaguar_keywords: the Jaguar keywords, a whitespace
        delimited string of '<key>=<value>' tokens or a list
        of such tokens
    :type reserved_keys: set
    :param reserved_keys: contains reserved Jaguar keys
    :type exception_type: type
    :param exception_type: the exception type to raise if invalid
    :rtype: OrderedDict
    :return: the Jaguar keywords OrderedDict
    """
    if not jaguar_keywords:
        msg = ('Jaguar keywords must be provided.')
        raise exception_type(msg)
    if isinstance(jaguar_keywords, list):
        jaguar_keywords = ' '.join(jaguar_keywords)
    jaguar_keywords = str(jaguar_keywords)
    try:
        msutils.keyword_string_to_dict(jaguar_keywords)
    except ValueError as err:
        msg = ('The following issue has been found with the '
               'specified Jaguar keywords: {err}.  Such options '
               'must be specified using whitespace separated '
               '<key>=<value> pairs.').format(err=str(err))
        raise exception_type(msg)
    adict = OrderedDict([token.split('=') for token in jaguar_keywords.split()])
    if reserved_keys.intersection(set(adict.keys())):
        msg = ('The following Jaguar keys are reserved for this workflow: '
               '{keys}.').format(keys=sorted(reserved_keys))
        raise exception_type(msg)
    return adict 
[docs]def check_ff_assignment(sts, ffld_name=None):
    """
    Check the assignment of the given force field to the given
    structures.
    :type sts: list
    :param sts: contains schrodinger.structure.Structure
    :type ffld_name: str
    :param ffld_name: the force field name.
    :raise ValueError: if invalid
    """
    if ffld_name is None:
        ffld_name = msutils.get_default_forcefield().name
    # in case incoming structures are in the centroid representation
    sts = [st.copy() for st in sts]
    for st in sts:
        etatoggle.toggle_structure(st, out_rep=parserutils.ETA)
    # currently the following does not support an mmlewis_apply
    # clean up layer (which can change the incoming atom types),
    # see minimize.Minimizer.setStructure for more details
    ffld_version = mm.opls_name_to_version(ffld_name)
    invalid_sts = desmondutils.find_forcefield_invalid_structures(
        sts, ffld_version)
    invalid_idxs = [str(sts.index(st) + 1) for st in invalid_sts]
    if invalid_idxs:
        idxs = ','.join(invalid_idxs)
        msg = ('The {name} force field could not be assigned to '
               'the following structures: {idxs}.').format(name=ffld_name,
                                                           idxs=idxs)
        raise ValueError(msg) 
[docs]def get_molecular_weight(st, idxs=None, decimal=None):
    """
    Return the molecular weight (amu) taken over the given atom
    indices in the given structure.
    :type st: schrodinger.structure.Structure
    :param st: the structure
    :type idxs: list
    :param idxs: the atom indices
    :type decimal: None or int
    :param decimal: an optional number of decimal places to which
        to round the weight
    :rtype: float
    :return: the molecular weight (amu)
    """
    # both st.total_weight and atom.atomic_weight use implicit hydrogens
    # so do it by hand
    if not idxs:
        idxs = range(1, st.atom_total + 1)
    weight = 0
    for idx in idxs:
        weight += mm.mmelement_get_atomic_weight_by_atomic_number(
            st.atom[idx].atomic_number)
    if decimal is not None:
        return round(weight, decimal)
    return weight 
[docs]def check_centroid_rep(st):
    """
    Check the centroid representation of the given structure.
    :type st: schrodinger.structure.Structure
    :param st: the structure
    :raise ValueError: if invalid
    """
    dummies = etatoggle.get_dummy_atoms(st)
    if not dummies:
        return
    dummy_idxs = [dummy.index for dummy in dummies]
    # if using the centroid representation then those dummy
    # atoms should be at the end of the atom list
    non_dummies = set(range(1, st.atom_total + 1)).difference(dummy_idxs)
    if len(non_dummies) < max(non_dummies):
        msg = ('Dummy atoms used for centroid representations must be at the '
               'end of the atom list.')
        raise ValueError(msg)
    # dummies should have no metadata set
    for key in (KEEP_ATOM_KEY, RESTRAINED_ATOM_KEY, SUPERPOSITION_ATOM_KEY):
        for atom in dummies:
            if atom.property.get(key):
                msg = ('Dummy atoms used for centroid representations can not '
                       'be used as keep, restrained, or superposition atoms.')
                raise ValueError(msg)
    # dummies should not be used in restraints
    for key in (RESTRAINED_DISTANCES_KEY, RESTRAINED_ANGLES_KEY,
                RESTRAINED_DIHEDRALS_KEY):
        jdxs = set([
            idx for idxs in sfu._get_restrain_group_idxs(st, key)
            for idx in idxs
        ])
        if jdxs.intersection(dummy_idxs):
            msg = ('Restrained distances, angles, or dihedrals can not use '
                   'dummy atoms.')
            raise ValueError(msg) 
[docs]def check_reaction_wf_structures(rxn_sts,
                                 ffld_name=None,
                                 mass_conserved=False,
                                 keep_atoms_only=False):
    """
    Check the given reaction workflow structues.
    :type rxn_sts: str or list
    :param rxn_sts: the reaction workflow structures, a file
        name or list of schrodinger.structure.Structure
    :type ffld_name: str
    :param ffld_name: the force field name to use when optionally
        checking its assignment to the given structures
    :type mass_conserved: bool
    :param mass_conserved: check that mass is conserved (see also
        keep_atoms_only kwarg)
    :type keep_atoms_only: bool
    :param keep_atoms_only: specifies that only keep atoms be considered
        when checking if mass is conserved (see also mass_conserved kwarg)
    :raise ValueError: if invalid
    """
    if isinstance(rxn_sts, str):
        if not fileutils.is_maestro_file(rxn_sts):
            msg = ('The file {afile} is not a Maestro file.').format(
                afile=rxn_sts)
            raise ValueError(msg)
        if not fileutils.get_basename(rxn_sts).endswith(REACTION_WF_TAG):
            msg = ('The file {afile} is not a reaction workflow file '
                   'as its base name does not end in {tag}.').format(
                       afile=rxn_sts, tag=REACTION_WF_TAG)
            raise ValueError(msg)
        rxn_sts = [st for st in structure.StructureReader(rxn_sts)]
    titles = set()
    for idx, st in enumerate(rxn_sts, 1):
        if not st.property.get(REACTION_WF_STRUCTURE_KEY):
            msg = ('Structure number {idx} does not belong to a '
                   'reaction workflow.').format(idx=idx)
            raise ValueError(msg)
        if not st.property.get(CONFORMERS_GROUP_KEY):
            msg = ('Structure number {idx} does not belong to a '
                   'conformer group.').format(idx=idx)
            raise ValueError(msg)
        titles.add(st.title)
    if len(titles) < len(rxn_sts):
        msg = ('Structures must have unique titles.')
        raise ValueError(msg)
    sibling_conformers_dict = bin_structures_by_property(
        rxn_sts, key=SIBLING_GROUP_KEY, inner_key=CONFORMERS_GROUP_KEY)
    getters = [
        get_restrain_distance_idxs, get_restrain_angle_idxs,
        get_restrain_dihedral_idxs
    ]
    found_parents = True
    masses = set()
    for sibling_group, conformers_dict in sibling_conformers_dict.items():
        num_tss = 0
        parents = set()
        mass = 0
        for conformer_group, conformers in conformers_dict.items():
            # check conformers
            check_conformers(conformers, conformer_group)
            first_conformer = conformers[0]
            num_tss += first_conformer.property.get(
                TRANSITION_STATE_STRUCTURE_KEY, 0)
            parents.add(first_conformer.property.get(PARENT_SIBLING_GROUPS_KEY))
            # check restraint idxs
            keep_idxs = sfu.get_idxs_marked_atoms(first_conformer,
                                                  KEEP_ATOM_KEY)
            if keep_idxs:
                all_restrain_idxs = set()
                for getter in getters:
                    for restrain_idxs in getter(first_conformer):
                        all_restrain_idxs.update(restrain_idxs)
                if not all_restrain_idxs.issubset(keep_idxs):
                    msg = (
                        f'The structures in conformer group {conformer_group} '
                        'have atoms involved in distance, angle, and/or dihedral '
                        'restraints that are not among the found keep atoms.')
                    raise ValueError(msg)
            if keep_idxs and keep_atoms_only:
                mass += get_molecular_weight(first_conformer, idxs=keep_idxs)
            else:
                mass += get_molecular_weight(first_conformer)
        mass = round(mass, MASS_N_DECIMAL)
        masses.add(mass)
        # check transition states
        if num_tss > 1:
            msg = (
                f'The structures in sibling group {sibling_group} can not have '
                'more than a single transition state.')
            raise ValueError(msg)
        # check parent equivalency
        if len(parents) > 1:
            msg = (
                f'The structures in sibling group {sibling_group} do not have '
                'identical parents.')
            raise ValueError(msg)
        # check parent existence
        parents_tmp = list(parents)[0]
        if parents_tmp:
            for parent in parents_tmp.split(PARENT_SEPARATOR):
                if parent not in sibling_conformers_dict.keys():
                    msg = (
                        f'Sibling group {sibling_group} specifies a non-existent parent '
                        f'sibling group {parent}.')
                    raise ValueError(msg)
        # check reactant siblings
        if None in parents:
            if not found_parents:
                msg = (f'Structures in sibling group {sibling_group} '
                       'have no parents specified.  All groups '
                       'other than reactant siblings must '
                       'have parent groups defined.')
                raise ValueError(msg)
            found_parents = False
    if found_parents:
        msg = ('No reactants were found.')
        raise ValueError(msg)
    if mass_conserved and len(masses) > 1:
        msg = ('Mass is not conserved along the reaction path.')
        raise ValueError(msg)
    if ffld_name:
        check_ff_assignment(rxn_sts, ffld_name=ffld_name) 
[docs]def bin_structures_by_property(sts, key=CONFORMERS_GROUP_KEY, inner_key=None):
    """
    Return a dictionary of structures binned by a property with
    the given key.  If inner_key is provided then return a dictionary
    of dictionaries of structures with the inner dictionaries keyed
    by inner_key and outer dictionaries keyed by key.
    :type sts: list
    :param sts: the structures
    :type key: str
    :param key: the key for the property by which to bin
    :type inner_key: str
    :param inner_key: additionally bin by this inner_key
    :rtype: dict or dict of dict
    :return: dictionary where keys are properties and values
        are lists of structures or dictionary of dictionaries
        where the outer dictionary is keyed by key and inner
        dictionary is keyed by inner_key and values of the inner
        dictionary are lists of structures
    """
    # for backwards compatibility in the data structure if the inner
    # property exists but the outer property does not then for the
    # outer property use the inner property and update the structure
    adict = {}
    for st in sts:
        aprop = st.property.get(key)
        if inner_key:
            inner_aprop = st.property.get(inner_key)
            if aprop is None and inner_aprop:
                aprop = inner_aprop
                st.property[key] = aprop
            adict.setdefault(aprop, {}).setdefault(inner_aprop, []).append(st)
        else:
            adict.setdefault(aprop, []).append(st)
    return adict 
def _check_structures_for_property_equivalence(sts,
                                               key=RESTRAINED_ATOM_KEY,
                                               is_atom_key=True):
    """
    Check that the given structures have equivalent property values
    for the given structure or atom key.  If it is an atom key then
    the key must be a boolean key as this function only ensures that
    the groups of atoms for which the given property exists are
    equivalent as opposed to checking equivalence among the property
    values.
    :type sts: list
    :param sts: the structures
    :type key: str
    :param key: the key for the property for which to check
        equivalence
    :type is_atom_key: bool
    :param is_atom_key: True if the given key is an atom key in which
        case only property existence is checked as opposed to property
        values
    :raise ValueError: if invalid
    """
    aprops = set()
    for st in sts:
        if is_atom_key:
            aprop = tuple(sfu.get_idxs_marked_atoms(st, key))
            if not aprop:
                aprop = None
        else:
            aprop = st.property.get(key)
        aprops.add(aprop)
    if len(aprops) > 1:
        msg = (
            'The given structures have inequivalent {key} properties.').format(
                key=key)
        raise ValueError(msg)
[docs]def postprocess_conformational_search(conformer, out_mae_path, ffld_name=None):
    """
    Postprocess a MacroModel conformational search. Rewrite the given
    MacroModel out `*mae` file so that the conformers in it have properly
    updated properties.
    :type conformer: structure.Structure
    :param conformer: a representative conformer that seeded the
        search being postprocessed
    :type out_mae_path: str
    :param out_mae_path: the file path to the MacroModel out `*mae` file
    :type ffld_name: str
    :param ffld_name: the name of the force field to use for the search. If
        `None`, then defaults to the latest forcefield available.
    """
    energy_key, rel_energy_key = get_mmod_energy_keys(ffld_name)
    # see MATSCI-5508 where it was seen that charge and multiplicity
    # properties sometimes do not propagate
    charge = conformer.property.get(CHARGE_KEY)
    multiplicity = conformer.property.get(MULTIPLICITY_KEY)
    # see MATSCI-7495 since using MacroModel's "serial" search relative energies
    # need updating, energies in kJ/mol
    min_energy = min(st.property[energy_key]
                     for st in structure.StructureReader(out_mae_path))
    ff_rel_energy_key_g = f'{rel_energy_key}{FFLD_ENERGY_GLOBAL_TAG}'
    conformers_out = []
    for st in structure.StructureReader(out_mae_path):
        if charge is not None:
            st.property[CHARGE_KEY] = charge
        if multiplicity is not None:
            st.property[MULTIPLICITY_KEY] = multiplicity
        st.property[ff_rel_energy_key_g] = st.property[energy_key] - min_energy
        conformers_out.append(st)
    structure.write_cts(conformers_out, out_mae_path) 
[docs]def get_int_tuples_from_str_property(st, key, separator=SEPARATOR):
    """
    Return a list of tuples of integers from the given string structure
    property.
    :type st: schrodinger.structure.Structure
    :param st: the structure
    :type key: str
    :param key: the property key
    :type separator: str
    :param separator: the tuple separator used for the given property
    :rtype: list
    :return: contains tuples of integers
    """
    aproperty = st.property.get(key)
    if not aproperty:
        return []
    else:
        return [eval(token) for token in aproperty.split(separator)] 
[docs]def update_index_properties(st, old_to_new):
    """
    Update the index properties of the given structure.
    :type st: `structure.Structure`
    :param st: the structure
    :type old_to_new: dict
    :param old_to_new: a map of old-to-new atom indices
    """
    keys = (RESTRAINED_DISTANCES_KEY, RESTRAINED_ANGLES_KEY,
            RESTRAINED_DIHEDRALS_KEY)
    for key in keys:
        idxs = _get_new_restrain_group_idxs(st, key, old_to_new)
        text = sfu.get_idx_groups_str(idxs)
        if text:
            st.property[key] = text 
[docs]def get_core_idxs(st):
    """
    Return a set of atom indices for the core of the given structure.
    :type st: schrodinger.structure.Structure
    :param st: the structure
    :rtype: set
    :return: core atom indices
    """
    # keep atoms are not to be considered as core atoms
    atom_keys = [SUPERPOSITION_ATOM_KEY, RESTRAINED_ATOM_KEY]
    st_keys = [
        RESTRAINED_DISTANCES_KEY, RESTRAINED_ANGLES_KEY,
        RESTRAINED_DIHEDRALS_KEY
    ]
    idxs = set()
    for atom in st.atom:
        for key in atom_keys:
            if atom.property.get(key):
                idxs.add(atom.index)
    for key in st_keys:
        for jdxs in sfu._get_restrain_group_idxs(st, key):
            idxs.update(jdxs)
    return idxs 
def _get_new_restrain_group_idxs(st, key, old_to_new, offset=0):
    """
    Return new restrain group indices.
    :type st: schrodinger.structure.Structure
    :param st: the structure
    :type key: str
    :param key: determines the type of restrain coordinate
    :type old_to_new: dict
    :param old_to_new: the old-to-new atom index map
    :type offset: int
    :param offset: an offset added to all new restrain
        group indices, useful when assembling structures
        from parts of other structures
    :rtype: list
    :return: contains new restrain group indices
    """
    return sfu._get_new_restrain_group_idxs(st, key, old_to_new, offset=offset)
[docs]class ReactionWorkflowFile(RepresentativeConformersMixin):
    """
    Manage a reaction workflow file.
    """
[docs]    def __init__(self, rxn_sts):
        """
        Create an instance.
        :type rxn_sts: str or list
        :param rxn_sts: the reaction workflow structures, a file
            name or list of schrodinger.structure.Structure
        """
        if isinstance(rxn_sts, str):
            rxn_sts = [st for st in structure.StructureReader(rxn_sts)]
        self.sibling_conformers_dict = bin_structures_by_property(
            rxn_sts, key=SIBLING_GROUP_KEY, inner_key=CONFORMERS_GROUP_KEY) 
[docs]    def getReactantsSiblingGroupName(self):
        """
        Return the reactants sibling group name.
        :rtype: str
        :return: the reactants sibling group name
        """
        for sibling_group, conformer_group, conformer in self.representativeConformers(
        ):
            parents = conformer.property.get(PARENT_SIBLING_GROUPS_KEY)
            if not parents:
                return sibling_group 
 
[docs]class ReactionWorkflowEnergyAnalysis(ReactionWorkflowFile):
    """
    Manage a reaction workflow energy analysis.
    """
[docs]    def __init__(self, rxn_sts, energy_keys):
        """
        Create an instance.
        :type rxn_sts: str or list
        :param rxn_sts: the reaction workflow structures, a file
            name or list of schrodinger.structure.Structure
        :type energy_keys: list
        :param energy_keys: structure property energy keys to consider, if it
            is temperature dependent then include the temperature (K) as a number
            followed by 'K' in the key and the corresponding energy must be in
            supported units (au, kcal/mol, eV, kJ/mol) and must be present in the
            key as '(<units>)'
        """
        super().__init__(rxn_sts)
        self.energy_keys = energy_keys
        # Cached units, see getUnitsData
        self._units_data = {} 
[docs]    def getUnitsData(self, prop):
        """
        Get conversion factor to kcal/mol, pressure and temperature from a
        property, saving the data into a dict.
        :param str prop: Property to use
        :rtype: float or None, float or None, float or None
        :return: Conversion to kcal/mol, pressure, temperature
        """
        try:
            return self._units_data[prop]
        except KeyError:
            val = (self.getKcalPerMolConversion(prop), self.getPressure(prop),
                   self.getTemperature(prop))
            self._units_data[prop] = val
            return val 
[docs]    @staticmethod
    def getTemperature(energy_key):
        """
        Return the temperature (K) for the given energy key.
        :type energy_key: str
        :param energy_key: structure property energy key
        :rtype: float, None
        :return: the temperature (K) if there is one
        """
        return jaguarworkflows.get_temperature(energy_key) 
[docs]    @staticmethod
    def getPressure(energy_key):
        """
        Return the pressure (atm) for the given energy key.
        :type energy_key: str
        :param energy_key: structure property energy key
        :rtype: float, None
        :return: the pressure (atm) if there is one
        """
        return jaguarworkflows.get_pressure(energy_key) 
[docs]    @staticmethod
    def getUnits(energy_key):
        """
        Return the units for the given energy key.
        :type energy_key: str
        :param energy_key: structure property energy key
        :rtype: str, None
        :return: the units if there is one
        """
        # units like 'kcal/mol' in '(kcal/mol)'
        match = re.search(r'\((\D+)\)', energy_key)
        if match:
            return match.group(1)
        base_energy_key = energy_key.split(STAGE_SEPARATOR)[0]
        _units = jmswfu.JAGUAR_PROP_UNITS_DICT.get(base_energy_key)
        if _units:
            return _units[1:-1] 
[docs]    @staticmethod
    def getKcalPerMolConversion(energy_key):
        """
        Return the kcal/mol conversion factor for the given energy key.
        :type energy_key: str
        :param energy_key: structure property energy key
        :rtype: float, None
        :return: the kcal/mol conversion factor if there is one
        """
        _units = ReactionWorkflowEnergyAnalysis.getUnits(energy_key)
        if _units in HARTREE_UNITS:
            conversion = units.KCAL_PER_MOL_PER_HARTREE
        elif _units in KCAL_PER_MOL_UNITS:
            conversion = 1
        elif _units in EV_UNITS:
            conversion = units.KCAL_PER_MOL_PER_EV
        elif _units in KJOULES_PER_MOL_UNITS:
            conversion = units.KCAL_PER_MOL_PER_EV
            conversion /= units.KJOULES_PER_MOL_PER_EV
        else:
            conversion = None
        return conversion 
[docs]    def getPropertyEnsemble(self,
                            conformers_dict,
                            energy_key,
                            conversion,
                            temp,
                            do_boltzmann,
                            include_x_terms=False,
                            only_lowest_energy=False,
                            property_key=None,
                            atom_idx=None):
        """
        Return an ensemble of properties for the given conformers
        dictionary of siblings and given energy key.
        :type conformers_dict: dict
        :param conformers_dict: keys are conformer group names, values are
            lists of `structure.Structure`
        :type energy_key: str
        :param energy_key: the relevant energy key
        :type conversion: float
        :param conversion: the energy conversion factor to kcal/mol
        :type temp: float
        :param temp: the temperature in K
        :type do_boltzmann: bool
        :param do_boltzmann: if True perform a Boltzmann average, otherwise an
            algebraic average
        :type include_x_terms: bool
        :param include_x_terms: whether to include cross terms in the
            conformational averaging
        :type only_lowest_energy: bool
        :param only_lowest_energy: use only the lowest energy conformer
            rather than averaging over conformers
        :type property_key: str
        :param property_key: the relevant property key, if not specified
            it is the same as the energy key
        :type atom_idx: int or None
        :param atom_idx: if an integer then the given property key is for an
            atomic property and this is the atom index, if None then the
            given property key is a structure property
        :raise ReactionWorkflowException: if there is an issue
        :rtype: list
        :return: ensemble of properties
        """
        # being able to use both siblings and a property key depends
        # on if the property key is for a size extensive property,
        # i.e. property(ABC) = property(A) + property(B) + property(C),
        # such is the case for energy but not for example bite angle, etc.
        if include_x_terms and only_lowest_energy:
            msg = ('Simultaneous use of include_x_terms and '
                   'only_lowest_energy is not supported.')
            raise ReactionWorkflowException(msg)
        if atom_idx and not property_key:
            msg = ('Atomic properties require an input property key.')
            raise ReactionWorkflowException(msg)
        if atom_idx and len(conformers_dict) > 1:
            msg = ('Atomic properties for sibling groups is not supported.')
            raise ReactionWorkflowException(msg)
        property_key = property_key or energy_key
        # the following getter takes a list that contains a sublist of conformers
        # for each reactant, if including cross terms then return the iterator
        # for all possible collections of mixed conformers of reactants else return
        # the original list
        getter = lambda x: itertools.product(*x) if include_x_terms else x
        ensemble = []
        for sample in getter(conformers_dict.values()):
            energies_properties = []
            for st in sample:
                energy = st.property.get(energy_key)
                if atom_idx:
                    if atom_idx <= st.atom_total:
                        aproperty = st.atom[atom_idx].property.get(property_key)
                    else:
                        aproperty = None
                else:
                    aproperty = st.property.get(property_key)
                pair = (energy, aproperty)
                if None in pair:
                    return []
                energies_properties.append(pair)
            min_energy, min_property = min(energies_properties,
                                           key=lambda x: x[0])
            energies, properties = list(zip(*energies_properties))
            if include_x_terms:
                ensemble.append(sum(properties))
            elif only_lowest_energy:
                ensemble.append(min_property)
            elif do_boltzmann:
                # calling code needs to ensure that conversion brings energies
                # to kcal/mol and that if the energy is temperature dependent
                # that temperature in K should be used, wp is the Boltzmann
                # averaged property, i runs over conformers, p_i is the property
                # being Boltzmann averaged, beta is 1/(kT) in units of mol/kcal,
                # E_i and E_min are in kcal/mol, and Z is the partition function
                #
                # wp = (sum_i p_i * exp(-beta * (E_i - E_min))) / Z
                # Z = sum_i exp(-beta * (E_i - E_min))
                #
                deltas = conversion * (numpy.array(energies) - min_energy)
                factors = numpy.exp(-1 * deltas / (IDEAL_GAS_CONSTANT * temp))
                weighted_property = sum(properties * factors)
                partition_f = sum(factors)
                weighted_property /= partition_f
                ensemble.append(weighted_property)
            else:
                ensemble.append(numpy.mean(properties))
        if not include_x_terms and ensemble:
            # see MATSCI-11763 - For sibling groups containing a mix of intermediates
            # and transition states treat the former as a spectator and use the
            # most negative frequency as the representative for the group, otherwise
            # assume the property is size-extensive and can be summed over the
            # siblings to arrive at the total property
            positive = [0 <= value for value in ensemble]
            if (property_key == jaguarworkflows.LOWEST_FREQUENCY_PROP and
                    any(positive) and not all(positive)):
                afunc = min
            else:
                afunc = sum
            ensemble = [afunc(ensemble)]
        return ensemble 
[docs]    def getProperties(self,
                      include_x_terms=False,
                      only_lowest_energy=False,
                      property_key=None,
                      atomic=False,
                      temps=None):
        """
        Return the properties.
        :type include_x_terms: bool
        :param include_x_terms: whether to include cross terms
        :type only_lowest_energy: bool
        :param only_lowest_energy: use only the lowest energy conformer
            rather than averaging over conformers
        :type property_key: str
        :param property_key: the property key, if not specified energy
            energy keys are used
        :type atomic: bool
        :param atomic: if True then the given property key is an atomic
            property, otherwise is a structure property
        :type temps: list
        :param temps: temperatures in K, only used for temperature independent
            energy and property keys
        :raise ReactionWorkflowException: if there is an issue
        :rtype: list[EnergyAnalysisProperty]
        :return: the properties
        """
        if include_x_terms and only_lowest_energy:
            msg = ('Simultaneous use of include_x_terms and '
                   'only_lowest_energy is not supported.')
            raise ReactionWorkflowException(msg)
        if atomic and not property_key:
            msg = ('Atomic properties require an input property key.')
            raise ReactionWorkflowException(msg)
        temps = temps or [jmswfu.DEFAULT_TEMP_START]
        properties = []
        for energy_key in self.energy_keys:
            conversion, energy_key_press, energy_key_temp = \
                
self.getUnitsData(energy_key)
            do_boltzmann = bool(conversion)
            conversion = conversion or 1
            this_property_key = property_key or energy_key
            _, this_property_key_press, this_property_key_temp = \
                    
self.getUnitsData(this_property_key)
            # if property temperature or pressure differs from that
            # of the energy used for averaging then skip it
            t_state = (energy_key_temp and this_property_key_temp and
                       energy_key_temp != this_property_key_temp)
            p_state = (energy_key_press and this_property_key_press and
                       energy_key_press != this_property_key_press)
            if t_state or p_state:
                continue
            # get the temperature used for averaging, comes from either
            # the given energy key, property key, or specified temperatures
            if energy_key_temp:
                these_temps = [energy_key_temp]
            elif this_property_key_temp:
                these_temps = [this_property_key_temp]
            else:
                these_temps = list(temps)
            press = this_property_key_press or energy_key_press or 1
            for temp in these_temps:
                # get the key that will hold the average property
                if this_property_key_temp:
                    avg_property_key = this_property_key
                else:
                    ext = jaguarworkflows.get_temp_press_key_ext(temp, press)
                    avg_property_key = f'{this_property_key}{ext}'
                for sibling_group, conformers_dict in self.sibling_conformers_dict.items(
                ):
                    representative_conformers = tuple(
                        min(conformers, key=lambda x: x.property[energy_key])
                        for conformers in conformers_dict.values())
                    # get atom indices if doing atomic properties
                    if atomic:
                        if len(conformers_dict) > 1:
                            msg = (
                                'Atomic properties for sibling groups is not '
                                'supported.')
                            raise ReactionWorkflowException(msg)
                        atom_idxs = list(
                            range(1,
                                  representative_conformers[0].atom_total + 1))
                    else:
                        atom_idxs = [None]
                    for atom_idx in atom_idxs:
                        ensemble = tuple(
                            self.getPropertyEnsemble(
                                conformers_dict,
                                energy_key,
                                conversion,
                                temp,
                                do_boltzmann,
                                include_x_terms=include_x_terms,
                                only_lowest_energy=only_lowest_energy,
                                property_key=this_property_key,
                                atom_idx=atom_idx))
                        if not ensemble:
                            continue
                        # define property object
                        aproperty = EnergyAnalysisProperty(
                            sibling_group=sibling_group,
                            conformer_groups=tuple(conformers_dict.keys()),
                            representative_conformers=representative_conformers,
                            temperature=temp,
                            pressure=press,
                            energy_key=energy_key,
                            property_key=this_property_key,
                            avg_property_key=avg_property_key,
                            atom_idx=atom_idx,
                            ensemble=ensemble,
                            include_x_terms=include_x_terms,
                            only_lowest_energy=only_lowest_energy)
                        properties.append(aproperty)
        return properties 
[docs]    def getConfAvgRelEnergies(self,
                              include_x_terms=True,
                              only_lowest_energy=False):
        """
        Return the conformationally averaged energies relative to that
        of the reactants.
        :type include_x_terms: bool
        :param include_x_terms: whether to include cross terms in the
            conformational averaging
        :type only_lowest_energy: bool
        :param only_lowest_energy: use only the lowest energy conformer
            rather than averaging over conformers
        :raise ReactionWorkflowException: if there is an issue
        :rtype: dict
        :return: keys are sibling group names, values are dicts with
            energy keys as keys and energy values as values
        """
        if include_x_terms and only_lowest_energy:
            msg = ('Simultaneous use of include_x_terms and '
                   'only_lowest_energy is not supported.')
            raise ReactionWorkflowException(msg)
        # with cross terms in the conformational averaging all possible
        # collections of conformers within a sibling group as well
        # between sibling groups are included in the averaging which
        # is done last, for example a high energy conformer of A is
        # allowed to react with a low energy conformer of B, etc. and that
        # relative to some collection of conformers from another sibling
        # group both (high, low), (low, high), etc. collections of (A, B)
        # are included, without cross terms the averaging is done first
        # over each molecule in the sibling group thereby reducing the
        # energy of the group down to a single number
        # canonical ensemble
        # relevant energies reported relative to reactants
        reactants_sibling_group = self.getReactantsSiblingGroupName()
        reactants_conformers_dict = self.getReactantsConformersDict()
        # for each energy key for both reactants and non-reactants
        # obtain a conformeric ensemble of energies summed over siblings
        # and collect energy differences for all pairs then perform a
        # Boltzmann average if the energy is both temperature dependent
        # and of known units and including cross terms, otherwise perform
        # an algebraic average
        edict = {}
        for energy_key in self.energy_keys:
            conversion = self.getKcalPerMolConversion(energy_key)
            temp = self.getTemperature(energy_key)
            do_boltzmann = conversion and temp
            conversion = conversion or 1
            reactants_energies = self.getPropertyEnsemble(
                reactants_conformers_dict,
                energy_key,
                conversion,
                temp,
                do_boltzmann,
                include_x_terms=include_x_terms,
                only_lowest_energy=only_lowest_energy)
            if not reactants_energies:
                # this means that at least a single reactant doesn't
                # have the given energy_key defined, so skip the energy_key
                continue
            header = self.getHeader(energy_key)
            edict.setdefault(reactants_sibling_group, {})[header] = 0.
            for sibling_group, conformers_dict in self.sibling_conformers_dict.items(
            ):
                if sibling_group == reactants_sibling_group:
                    continue
                energies = self.getPropertyEnsemble(
                    conformers_dict,
                    energy_key,
                    conversion,
                    temp,
                    do_boltzmann,
                    include_x_terms=include_x_terms,
                    only_lowest_energy=only_lowest_energy)
                if not energies:
                    # this means that at least a single non-reactant child doesn't
                    # have the given energy_key defined, so first undefine it for
                    # all sibling groups and then skip the energy_key by breaking
                    # out of the non-reactant child loop
                    for _edict in edict.values():
                        _edict.pop(header, None)
                    break
                deltas = numpy.array([
                    conversion * (enr - er) for enr, er in itertools.product(
                        energies, reactants_energies)
                ])
                if do_boltzmann and include_x_terms:
                    factors = numpy.exp(-1 * deltas /
                                        (IDEAL_GAS_CONSTANT * temp))
                    avg_delta = sum([
                        delta * factor
                        for delta, factor in zip(deltas, factors)
                    ])
                    partition_f = sum(factors)
                    # see MATSCI-7153 - where the denominator can be problematic
                    with warnings.catch_warnings():
                        warnings.filterwarnings('error')
                        try:
                            avg_delta /= partition_f
                        except RuntimeWarning:
                            msg = (
                                'Large energy differences with respect to reactants '
                                'have been detected.  This is typically a sign of a severely '
                                'imbalanced chemical reaction in the reaction workflow input '
                                'file.')
                            raise ReactionWorkflowException(msg)
                else:
                    avg_delta = numpy.mean(deltas)
                edict.setdefault(sibling_group, {})[header] = avg_delta
        return edict 
[docs]    def getGraph(self):
        """
        Return a NetworkX directed graph of the reaction workflow
        energy level diagram.
        :rtype: networkx.DiGraph
        :return: nodes are sibling group names and have energy
            dictionary kwargs, edges are directed (parent,
            child) pairs
        """
        edict_w_x = self.getConfAvgRelEnergies(include_x_terms=True,
                                               only_lowest_energy=False)
        edict_wo_x = self.getConfAvgRelEnergies(include_x_terms=False,
                                                only_lowest_energy=False)
        edict_lowest = self.getConfAvgRelEnergies(include_x_terms=False,
                                                  only_lowest_energy=True)
        graph = nx.DiGraph()
        for sibling_group, conformers_dict in self.sibling_conformers_dict.items(
        ):
            graph.add_node(sibling_group,
                           edict_w_x=edict_w_x[sibling_group],
                           edict_wo_x=edict_wo_x[sibling_group],
                           edict_lowest=edict_lowest[sibling_group])
            # parents are equivalent just grab an arbitrary first item
            parents = next(iter(conformers_dict.values()))[0].property.get(
                PARENT_SIBLING_GROUPS_KEY)
            if not parents:
                continue
            for parent_sibling_group in parents.split(PARENT_SEPARATOR):
                graph.add_edge(parent_sibling_group, sibling_group)
        return graph 
[docs]    @staticmethod
    def getOrderedNodeNames(graph):
        """
        Return an ordered list of node names from the given graph.
        :type graph: networkx.DiGraph
        :param graph: nodes are sibling group names and have energy
            dictionary kwargs, edges are directed (parent,
            child) pairs
        :rtype: list
        :return: contains ordered node names
        """
        # for example, all lines point right, up-right, or down-right
        # (just a non-physical example):
        #
        #       I2 -- TS2
        #      /  \  /   \
        #   TS1    I3     P
        #  /   \  /
        # R     I1
        #
        # is ordered as R,TS1,I1,I2,I3,TS2,P
        # find the reactant node
        for node in graph.nodes():
            if not list(graph.predecessors(node)):
                reactant_node = node
                break
        # start with the reactant node
        nodes = [reactant_node]
        current_nodes = [reactant_node]
        # collect nodes in order by adding for the current nodes
        # all successor nodes that have no predecessor nodes that
        # have not already been collected, currently successor nodes
        # are sorted by name rather than energy, avoid adding the
        # same node twice which can happen for loops in the graph,
        # the current nodes to search are updated until all nodes
        # have been collected
        while len(nodes) < graph.number_of_nodes():
            tmp = []
            for current_node in current_nodes:
                for successor_node in sorted(graph.successors(current_node)):
                    if not set(graph.predecessors(successor_node)).difference(
                            nodes):
                        if successor_node not in nodes:
                            nodes.append(successor_node)
                            tmp.append(successor_node)
            current_nodes = list(tmp)
        return nodes 
[docs]    def writeDataFiles(self):
        """
        Write data files.
        """
        # get ordered nodes
        graph = self.getGraph()
        node_names = self.getOrderedNodeNames(graph)
        base_name = jobutils.get_jobname(DEFAULT_JOB_NAME)
        out_files = []
        # handle the conformationally averaged energies relative to reactants
        for ext in REL_REACTANTS_EXTS:
            conf_avg_rel_reactants_file = base_name + ext
            kwarg = REL_EXT_TO_KWARG_DICT[ext]
            with csv_unicode.writer_open(conf_avg_rel_reactants_file) as afile:
                writer = csv.writer(afile)
                for idx, name in enumerate(node_names):
                    edict = graph.node[name][kwarg]
                    if not idx:
                        headers = sorted(edict.keys())
                        writer.writerow([SIBLING_GROUP_NAME] + headers)
                    values = [name] + [edict[header] for header in headers]
                    writer.writerow(values)
            out_files.append(conf_avg_rel_reactants_file)
        # handle the conformationally averaged energies relative to parents
        if len(node_names) > 1:
            for ext in REL_PARENTS_EXTS:
                conf_avg_rel_parents_file = base_name + ext
                kwarg = REL_EXT_TO_KWARG_DICT[ext]
                final_edict = {}
                for child_name in node_names:
                    child_edict = graph.node[child_name][kwarg]
                    parents_edict = {}
                    for parent_name in graph.predecessors(child_name):
                        parent_edict = graph.node[parent_name][kwarg]
                        parents_edict[parent_name] = dict([
                            (key, child_edict[key] - parent_edict[key])
                            for key in child_edict.keys()
                        ])
                    if parents_edict:
                        final_edict[child_name] = parents_edict
                with open(conf_avg_rel_parents_file, 'w') as afile:
                    json.dump(final_edict,
                              afile,
                              sort_keys=True,
                              indent=4,
                              separators=(',', ':'))
                out_files.append(conf_avg_rel_parents_file)
        for out_file in out_files:
            jobutils.add_outfile_to_backend(out_file)  
[docs]def get_stage_idx(astr, is_filename=False):
    """
    Return the stage index from the given string, can be a filename.
    :type astr: str
    :param astr: the string
    :type is_filename: bool
    :param is_filename: Whether astr is filename or not
    :rtype: int
    :return: the stage index
    """
    if is_filename:
        astr = fileutils.get_basename(astr)
    head, stage_hash = astr.split(STAGE_SEPARATOR)
    stage_idx = stage_hash.split('_')[0]
    return int(stage_idx) 
[docs]def check_TS_vetting(out_file):
    """
    Return False if TS vetting failed in the given Jaguar
    out file.
    :type out_file: str
    :param out_file: Jaguar out file
    :rtype: bool
    :return: False if TS vetting failed
    """
    with open(out_file, 'r') as afile:
        for line in afile:
            if line.startswith('Unable to find any valid transition vectors.'):
                return False
    return True 
[docs]def get_sub_host_str(obj, sub_host_attr, n_procs_attr):
    """
    Return the command line -HOST argument for using a subhost.
    :type obj: object
    :param obj: the object, possibly having the given attributes
        defined
    :type sub_host_attr: str
    :param sub_host_attr: the attribute for the subhost
    :type n_procs_attr: str
    :param n_procs_attr: the attribute for the number of processors
    """
    try:
        host = f'{getattr(obj, sub_host_attr)}:{getattr(obj, n_procs_attr)}'
    except AttributeError:
        host = jobutils.AUTOHOST
    return host 
[docs]class JMSWFException(Exception):
    pass 
[docs]class JMSWFMixin(RepresentativeConformersMixin):
    """
    Manage a Jaguar multistage workflow.
    """
    # various structure and atom properties dictate the behavior of this class
    def _overwriteConformerEIDS(self):
        """
        Overwrite the Maestro entry ID properties for the conformers.
        """
        # needed for Jaguar multistage workflow
        # see MATSCI-11669 - it is possible that input structures may have the
        # entry ID property undefined, handle that by grabbing the maximum value
        # that is defined in the input and incrementing from there
        max_eid = 0
        for conformers_dict in self.sibling_conformers_dict.values():
            for conformers in conformers_dict.values():
                for conformer in conformers:
                    eid = int(conformer.property.get(ENTRY_ID_KEY, 0))
                    max_eid = max(max_eid, eid)
        idx = 0
        fall_back_eid = max_eid
        for conformers_dict in self.sibling_conformers_dict.values():
            for conformers in conformers_dict.values():
                fall_back_eid += 1
                for conformer in conformers:
                    idx += 1
                    eid = conformer.property.get(ENTRY_ID_KEY,
                                                 str(fall_back_eid))
                    self.eids_dict.setdefault(eid, []).append(str(idx))
                    conformer.property[ENTRY_ID_KEY] = str(idx)
    def _getJMSWFGenResKeywords(self, st, include_temp_press_data=True):
        """
        Return the general reserved Jaguar multistage workflow keywords
        for the given structure.
        :type st: schrodinger.structure.Structure
        :param st: the structure
        :type include_temp_press_data: bool
        :param include_temp_press_data: whether to include the temperature
            and pressure data
        :rtype: OrderedDict
        :return: the keywords
        """
        adict = OrderedDict([('molchg', str(st.property.get(CHARGE_KEY, 0))),
                             ('multip',
                              str(st.property.get(MULTIPLICITY_KEY, 1)))])
        temp_data = getattr(self, 'temp_data', None)
        press_data = getattr(self, 'press_data', None)
        if include_temp_press_data and temp_data and press_data:
            adict['tmpini'] = str(temp_data.temp_start)
            adict['tmpstp'] = str(temp_data.temp_step)
            adict['ntemp'] = str(temp_data.temp_n)
            adict['press'] = str(press_data.press_start)
            adict['press_step'] = str(press_data.press_step)
            adict['npress'] = str(press_data.press_n)
        # see MATSCI-10584 and linked cases, if the Jaguar input file has
        # an initial guess wavefunction with a given dftname and basis, for
        # example from a previous calculation, that are the same as the dftname
        # and basis for the new job then igonly=1 (skip SCF) is set automatically
        # if igonly was not set, if the new job asks for properties, which will be
        # the general case, then the job will fail due to missing matrices required
        # for the property evaluation, prevent this by always setting igonly=0
        adict['igonly'] = '0'
        return adict
    def _getJMSWFOptKeywords(self, st, set_freq=True):
        """
        Return the Jaguar multistage workflow optimization keywords
        for the given structure.
        :type st: schrodinger.structure.Structure
        :param st: the structure
        :type set_freq: bool
        :param set_freq: if True then set 'ifreq=1'
        :rtype: str
        :return: the string of keywords
        """
        reserved_jaguar_keywords = self._getJMSWFGenResKeywords(
            st, include_temp_press_data=set_freq)
        reserved_jaguar_keywords['igeopt'] = '1'
        if set_freq:
            reserved_jaguar_keywords['ifreq'] = '1'
        jaguar_keywords = self.jaguar_keywords.copy()
        jaguar_keywords.update(reserved_jaguar_keywords)
        return ' '.join(get_jaguar_keywords_list(jaguar_keywords))
    @staticmethod
    def _getJMSWFCoords(st, are_restraints=True):
        """
        Return the Jaguar multistage workflow coords for the given structure.
        :type st: schrodinger.structure.Structure
        :param st: the structure
        :type are_restraints: bool
        :param are_restraints: True if the coords are restraint coords, as
            opposed to active coords
        :rtype: list
        :return: contains strings defining the coords
        """
        atype_key_pairs = [(4, RESTRAINED_DISTANCES_KEY),
                           (5, RESTRAINED_ANGLES_KEY),
                           (6, RESTRAINED_DIHEDRALS_KEY)]
        fields = ['{atype}', '{idxs}']
        if are_restraints:
            fields = [jmswfu.NONE] + fields
        aformat = jmswfu.DELIM.join(fields)
        coords = []
        for atype, key in atype_key_pairs:
            params = get_int_tuples_from_str_property(st, key)
            for param in params:
                idxs = jmswfu.DELIM.join(str(idx) for idx in param)
                coord = aformat.format(atype=atype, idxs=idxs)
                coords.append(coord)
        return coords
    def _getJMSWFTSKeywords(self, st):
        """
        Return the Jaguar multistage workflow transition state search
        keywords for the given structure.
        :type st: schrodinger.structure.Structure
        :param st: the structure
        :rtype: str
        :return: the string of keywords
        """
        reserved_jaguar_keywords = self._getJMSWFGenResKeywords(
            st, include_temp_press_data=True)
        reserved_jaguar_keywords['igeopt'] = '2'
        reserved_jaguar_keywords['ifreq'] = '1'
        reserved_jaguar_keywords['itrvec'] = '-6'
        reserved_jaguar_keywords['nhesref'] = '-1'
        # Jaguar TS vetting options, intentionally use the default
        # for ts_vet_olap_cut, for ts_vet use the value +3 which considers
        # both the eigenvector overlap with active atoms and distances
        # among active atoms, despite the Jaguar documentation using +3
        # as opposed to -3 does not cause a job failure so check
        # afterwards
        reserved_jaguar_keywords['ts_vet'] = '3'
        jaguar_keywords = self.jaguar_keywords.copy()
        jaguar_keywords.update(reserved_jaguar_keywords)
        return ' '.join(get_jaguar_keywords_list(jaguar_keywords))
    def _getJMSWFStages(self):
        """
        Return the Jaguar multistage workflow stages.
        :rtype: list
        :return: contains the jmswfu.StageData for the Jaguar
            multistage workflow
        """
        has_ts = getattr(self, 'has_ts', False)
        # if there is a TS stage then prevent divergence in the stage
        # indices by breaking up the work needed for non-TS stages
        # into two stages
        aformat = jmswfu.DELIM.join(['{eid}', '{rest}'])
        opt_jaguar_keywords_lines = []
        opt_restraints_lines = []
        ts_jaguar_keywords_lines = []
        ts_restraints_lines = []
        ts_actives_lines = []
        for sibling_group, conformers_dict in self.sibling_conformers_dict.items(
        ):
            for conformer_group, conformers in conformers_dict.items():
                for conformer in conformers:
                    eid = conformer.property[ENTRY_ID_KEY]
                    all_opt_restraints = self._getJMSWFCoords(
                        conformer, are_restraints=True)
                    set_freq = not has_ts and not bool(all_opt_restraints)
                    opt_jaguar_keywords = self._getJMSWFOptKeywords(
                        conformer, set_freq=set_freq)
                    opt_jaguar_keywords_lines.append(
                        aformat.format(eid=eid, rest=opt_jaguar_keywords))
                    for opt_restraints in all_opt_restraints:
                        opt_restraints_lines.append(
                            aformat.format(eid=eid, rest=opt_restraints))
                    is_ts = conformer.property.get(
                        TRANSITION_STATE_STRUCTURE_KEY)
                    if has_ts or is_ts:
                        if not is_ts:
                            set_freq = not bool(all_opt_restraints)
                            ts_jaguar_keywords = self._getJMSWFOptKeywords(
                                conformer, set_freq=set_freq)
                            for opt_restraints in all_opt_restraints:
                                ts_restraints_lines.append(
                                    aformat.format(eid=eid,
                                                   rest=opt_restraints))
                        else:
                            ts_jaguar_keywords = self._getJMSWFTSKeywords(
                                conformer)
                            for ts_actives in self._getJMSWFCoords(
                                    conformer, are_restraints=False):
                                ts_actives_lines.append(
                                    aformat.format(eid=eid, rest=ts_actives))
                        ts_jaguar_keywords_lines.append(
                            aformat.format(eid=eid, rest=ts_jaguar_keywords))
        # the final hessian from a restrained optimization may not be a good
        # guess for a transition state search so do not inherit it from the
        # parent
        in_txt_file = os.path.join(self._jmswf_launch_dir,
                                   self._jmswf_in_txt_file)
        with open(in_txt_file, 'w') as afile:
            afile.write(jmswfu.NEW_STAGE + '\n')
            afile.write(jmswfu.KEYWORDS + '\n')
            for opt_jaguar_keywords_line in opt_jaguar_keywords_lines:
                afile.write(opt_jaguar_keywords_line + '\n')
            if opt_restraints_lines:
                afile.write(jmswfu.GEOM_CONSTRAINTS + '\n')
                for opt_restraints_line in opt_restraints_lines:
                    afile.write(opt_restraints_line + '\n')
            if ts_jaguar_keywords_lines:
                afile.write(jmswfu.NEW_STAGE + '\n')
                afile.write(jmswfu.PARENT + '\n')
                afile.write(
                    jmswfu.DELIM.join(['1', jmswfu.WAVEFUNCTION]) + '\n')
                afile.write(jmswfu.KEYWORDS + '\n')
                for ts_jaguar_keywords_line in ts_jaguar_keywords_lines:
                    afile.write(ts_jaguar_keywords_line + '\n')
                if ts_restraints_lines:
                    afile.write(jmswfu.GEOM_CONSTRAINTS + '\n')
                    for ts_restraints_line in ts_restraints_lines:
                        afile.write(ts_restraints_line + '\n')
                if ts_actives_lines:
                    afile.write(jmswfu.ACTIVE_COORDINATES + '\n')
                    for ts_actives_line in ts_actives_lines:
                        afile.write(ts_actives_line + '\n')
        stages = jmswfu.read_stage_datafile(in_txt_file)
        fileutils.force_remove(in_txt_file)
        return stages
    def _updateJMSWFStages(self, stages):
        """
        Update the Jaguar multistage workflow stages.
        :type stages: list
        :param stages: contains the jmswfu.StageData for the Jaguar
            multistage workflow
        :raise JMSWFException: if there is an issue
        :rtype: list
        :return: contains the jmswfu.StageData for the Jaguar
            multistage workflow
        """
        unique_geom_dict = getattr(self, 'unique_geom_dict', {})
        n_unique_geom = len(unique_geom_dict)
        # here we need to read in the extra stages file and modify
        # the returned jmswfu.StageData objects so that they can
        # be seamlessly appended to the given stages, the first
        # of these extra stages is skipped so as to allow analysis
        # to potentially be the first extra stage
        extra_stages = jmswfu.read_stage_datafile(self.extra_stages_file)[1:]
        n_prev_stages = len(stages)  # will be either 1 or 2
        existing_names = {x.info.name for x in stages}
        for index, extra_stage in enumerate(extra_stages, n_prev_stages + 1):
            # update the index of this stage
            extra_stage.index = index
            # We are merging stages from two different sources and the job
            # name depends on the stage name, so make sure all the extra
            # stages have names that are unique
            count = index
            while extra_stage.info.name in existing_names:
                extra_stage.info.name = jmswfu.GENERIC_STAGE_TAG + str(count)
                count += 1
            existing_names.add(extra_stage.info.name)
            # update any parent indices
            for stage in extra_stage.parent_data:
                stage.stage += n_prev_stages - 1
            # update the parenting indices for any analysis stages
            for stage in extra_stage.analyze_data:
                stage.parent_st_idx += n_prev_stages - 1
                stage.parent_idxs = [
                    i + n_prev_stages - 1 for i in stage.parent_idxs
                ]
            # update entry IDs, the original entry IDs have been reset to start at
            # 1 and now include values for any generated conformers, see
            # ._overwriteConformerEIDS() and .eids_dict which for example
            # might contain something like eids_dict[17] = [4,5,6,7] if for the
            # original entry ID 17 there were 4 conformers generated which were
            # assigned entry IDs 4, 5, 6, and 7, all entry_data data needs
            # updated entry IDs
            for stage_subsection, eid_dict in extra_stage.entry_data.items():
                # see MATSCI-11251 - if using geometry deduplication then for relevant
                # stages in the extra stages file the number of structure entries will
                # be greater than it should be due to the deduplication so update it
                if n_unique_geom:
                    all_eid_pairs = list(eid_dict.items())
                    unique_eid_pairs = all_eid_pairs[:n_unique_geom]
                    eid_dict = dict(unique_eid_pairs)
                # see MATSCI-6429 - since the main interface and all known use cases
                # force equivalent Jaguar keyword data for all structure entries in an
                # extra stage we will overwrite stale entry IDs by making this assumption
                # and choosing an arbitrary mapping of stale entry IDs to current entry
                # IDs
                uncommon = set(eid_dict.keys()).difference(
                    self.eids_dict.keys())
                if len(eid_dict) == len(self.eids_dict) and uncommon:
                    eid_dict = dict(
                        zip(self.eids_dict.keys(), eid_dict.values()))
                new_eid_dict = {}
                for eid, datas in eid_dict.items():
                    for new_eid in self.eids_dict.get(eid, []):
                        new_datas = []
                        for data in datas:
                            new_data = copy.deepcopy(data)
                            new_data.eid = new_eid
                            new_datas.append(new_data)
                        new_eid_dict[new_eid] = new_datas
                if not new_eid_dict:
                    msg = (
                        f'The extra stages file {self.extra_stages_file} contains '
                        'at least a single stage that does not define stage data '
                        'for any of the entries in the input file.')
                    raise JMSWFException(msg)
                extra_stage.entry_data[stage_subsection] = new_eid_dict
        return stages + extra_stages
    def _setUpJMSWF(self):
        """
        Set up the Jaguar multistage workflow.
        :rtype: list
        :return: contains the jmswfu.StageData for the Jaguar
            multistage workflow
        """
        extra_stages_file = getattr(self, 'extra_stages_file', None)
        os.makedirs(self._jmswf_launch_dir, exist_ok=True)
        with structure.StructureWriter(
                os.path.join(self._jmswf_launch_dir,
                             self._jmswf_in_mae_file)) as writer:
            for sibling_group, conformers_dict in self.sibling_conformers_dict.items(
            ):
                for conformer_group, conformers in conformers_dict.items():
                    for conformer in conformers:
                        writer.append(conformer)
        # jmswfu manages the global smap file which if there aren't any
        # frequency jobs will be removed at the end
        base_name = os.path.join(self._jmswf_launch_dir,
                                 self._jmswf_base_name) + OUT_EXT
        jmswfu.create_smap(base_name, self._jmswf_out_mae_file)
        stages = self._getJMSWFStages()
        if extra_stages_file:
            stages = self._updateJMSWFStages(stages)
        jmswfu.write_stages_file(
            stages, os.path.join(self._jmswf_launch_dir,
                                 self._jmswf_in_txt_file))
        return stages
[docs]    def runJMSWF(self):
        """
        Run the Jaguar multistage workflow.
        :raise JMSWFException: if there is an issue
        """
        stages = self._setUpJMSWF()
        options = argparse.Namespace(TPP=self.tpp,
                                     name=self._jmswf_base_name,
                                     input_file=self._jmswf_in_mae_file)
        host = get_sub_host_str(self, 'subhost', 'n_jmswf_subjobs')
        job_dj = jobutils.create_queue(options=options, host=host)
        # uses jobutils.RobustSubmissionJob by default
        with fileutils.chdir(self._jmswf_launch_dir):
            workflows = jmswfu.create_workflows(options,
                                                job_dj,
                                                stages,
                                                self._jmswf_out_smap_file,
                                                hierarchical=False,
                                                robust=True)
            with structure.StructureWriter(self._jmswf_out_mae_file) as writer:
                jaguarworkflows.run_workflows(job_dj, workflows, writer)
            backend = None
            jmswfu.finalize_smap(self._jmswf_out_smap_file, backend)
        # for restarts it is possible that nothing was done
        all_failed = job_dj.total_added and len(
            job_dj._failed) == job_dj.total_added
        # handle partial failures in self.setRepresentatives
        if self.return_jaguar_files or all_failed:
            jobutils.add_zipfile_to_backend(self._jmswf_launch_dir)
        if all_failed:
            msg = ('All Jaguar multistage workflow jobs have failed.  '
                   'All output files will be returned.')
            raise JMSWFException(msg) 
[docs]    def prepareJMSWFOutput(self):
        """
        Prepare Jaguar multistage workflow output.
        """
        exts = [f'{OUT_EXT}.maegz', f'{OUT_EXT}.smap'
               ] + jmswfu.SMAP_ELIGIBLE_EXTENSIONS
        for ext in exts:
            pattern = os.path.join(self._jmswf_launch_dir, '*' + ext)
            for apath in glob.glob(pattern):
                afile = os.path.split(apath)[1]
                shutil.copy(apath, afile)
                jobutils.add_outfile_to_backend(afile) 
[docs]    def checkJMSWFOutputs(self, out_files):
        """
        Raises JMSWFException if any of the given Jaguar out files
        should be treated as a failure.
        :type out_files: list
        :param out_files: contains Jaguar output files
        :raises JMSWFException: If any of the given Jaguar out files
            should be treated as a failure
        """
        # for transition state structures vetting is used however a bad
        # result doesn't cause a Jaguar failure, nor does multiple imaginary
        # frequencies for either optimizations or transition states
        for out_file in out_files:
            # Jaguar failures have already been handled in the calling code
            # so missing Jaguar *out files means that the stage is an analysis
            # stage
            out_path = os.path.join(self._jmswf_launch_dir, out_file)
            if not os.path.exists(out_path):
                continue
            # skip non-frequency jobs
            jag_out = JaguarOutput(out_path)
            normal_modes = anharmonic.get_normal_modes(jag_out)
            if not normal_modes:
                continue
            # check TS, when asking for vetting there are three outcomes:
            # (1) vetting performed and a vetted vector found
            # (2) vetting performed and a vetted vector not found
            # (3) vetting not performed (see MATSCI-5145)
            #
            # see MATSCI-10176 - jag_out.getStructure() does NOT include input
            # Maestro structure properties
            mae_path = os.path.join(self._jmswf_launch_dir,
                                    f'{jag_out.restart}.mae')
            st = structure.Structure.read(mae_path)
            if st.property.get(TRANSITION_STATE_STRUCTURE_KEY):
                if not check_TS_vetting(out_path):
                    # the found transition state does not involve the requested
                    # active atoms
                    raise JMSWFException(
                        "The found transition state does not involve the requested active atoms"
                    )
                normal_mode = jag_out.vetted_ts_vector
                if normal_mode and normal_mode.frequency != min(
                    [x.frequency for idx, x in normal_modes]):
                    # the normal mode involving the active atoms is not the lowest
                    # frequency mode
                    raise JMSWFException(
                        "The normal mode involving the active atoms is not the lowest frequency mode"
                    )
            # check imaginary frequencies
            in_path = os.path.join(self._jmswf_launch_dir,
                                   f'{jag_out.restart}.in')
            if not os.path.exists(in_path):
                raise JMSWFException(f'The file {in_path} does not exists')
            jag_in = JaguarInput(input=in_path)
            try:
                anharmonic.check_imaginary_frequencies(
                    jag_out, jag_in, max_i_freq=self.max_i_freq)
            except anharmonic.AnharmonicException as err:
                raise JMSWFException(str(err)) 
    @staticmethod
    def _setProperties(st, key, stage_idx, value):
        """
        Set properties on the given structure.
        :type st: schrodinger.structure.Structure
        :param st: the structure
        :type key: str
        :param key: the property key
        :type stage_idx: int
        :param stage_idx: the stage index for the property
        :type value: float
        :param value: the property value
        :rtype: set
        :return: the created property keys
        """
        rep_key = key + STAGE_SEPARATOR + str(stage_idx)
        adict = {rep_key: value}
        st.property.update(adict)
        return set(adict.keys())
    def _updateSiblingConformersDict(self):
        """
        Update the sibling conformers dict.
        """
        # currently the self.sibling_conformers_dict contains conformers with
        # redundant structure titles if the conformers came from MacroModel, while
        # the conformers collected in the representatives coming from the Jaguar
        # multistage workflow job have been uniqueified using a '-<index>' appended
        # to the title, here we need to update the self.sibling_conformers_dict to
        # have unique titles, additionally the current self.sibling_conformers_dict
        # contains conformers considered as failures in the calling code and so also
        # remove any bad conformers
        rep_titles = set(rep_st.title for rep_st in self.rep_sts)
        for sibling_group, conformers_dict in list(
                self.sibling_conformers_dict.items()):
            for conformer_group, conformers in list(conformers_dict.items()):
                prev_title = None
                good_conformers = []
                for idx, conformer in enumerate(conformers, 1):
                    title = conformer.title
                    if title == prev_title and idx > 1:
                        title += f'-{idx}'
                    else:
                        prev_title = title
                    if title in rep_titles:
                        conformer.title = title
                        good_conformers.append(conformer)
                self.sibling_conformers_dict[sibling_group][
                    conformer_group] = good_conformers
[docs]    def setRepresentatives(self):
        """
        Associated with each structure is output data from potentially multiple
        Jaguar multistage workflow stages.  Pick representative structures to
        carry the data for all stages.
        :raise JMSWFException: if there is an issue
        """
        # bin stage structures by original structure title,
        # titles are like 'reactant_stage_1', 'transition_state-2_stage_4_analysis_2',
        # etc.
        title_stage_sts_dict = {}
        for st in structure.StructureReader(self._jmswf_out_mae_file):
            title, stage_hash = st.title.split(STAGE_SEPARATOR)
            title_stage_sts_dict.setdefault(title, []).append((st, stage_hash))
        stages = jmswfu.read_stage_datafile(
            os.path.join(self._jmswf_launch_dir, self._jmswf_in_txt_file))
        rep_stage_idx = get_rep_stage_idx(
            rxnwf_file_path=self._jmswf_out_mae_file)
        warned = False
        copied = self.return_jaguar_files
        for title, pairs in title_stage_sts_dict.items():
            skip_msg = False
            # the IndexError indicates that the required original workflow
            # Jaguar calculation has failed for this conformer, continue as there
            # may be other successful conformers
            try:
                rep_st = pairs[rep_stage_idx - 1][0].copy()
            except IndexError:
                skip_msg = ('Some Jaguar multistage workflow jobs have failed.')
            if not skip_msg:
                out_files = [f'{pair[0].title}.out' for pair in pairs]
                try:
                    self.checkJMSWFOutputs(out_files)
                except JMSWFException as err:
                    skip_msg = str(err)
                    skip_msg += (
                        'Postprocessing of at least a single Jaguar multistage workflow '
                        'job has failed.')
            if skip_msg:
                if not warned:
                    if self.logger:
                        skip_msg += ('  All output files will be returned.')
                        self.logger.warning(skip_msg)
                    warned = True
                if not copied:
                    jobutils.add_zipfile_to_backend(self._jmswf_launch_dir)
                    copied = True
                continue
            rep_st.title = title
            for st, stage_hash in pairs[rep_stage_idx - 1:]:
                stage_idx = int(stage_hash.split('_')[0])
                stage = stages[stage_idx - 1]
                keys = stage.getPropertyKeys(st=st)
                # see MATSCI-10926 where we need to add a key to the keys from a
                # standard JMSWF
                if set([
                        jaguarworkflows.GAS_PHASE_ENERGY_PROP,
                        jaguarworkflows.ZERO_POINT_ENERGY_PROP
                ]).issubset(keys):
                    keys.append(
                        jaguarworkflows.GAS_PHASE_ZERO_POINT_ENERGY_PROP)
                for key in keys:
                    rep_st.property.pop(key, None)
                    if key == jaguarworkflows.GAS_PHASE_ZERO_POINT_ENERGY_PROP:
                        gpe = st.property.get(
                            jaguarworkflows.GAS_PHASE_ENERGY_PROP)
                        zpe = st.property.get(
                            jaguarworkflows.ZERO_POINT_ENERGY_PROP)
                        if gpe is not None and zpe is not None:
                            zpe *= units.HARTREE_PER_KCAL_PER_MOL
                            value = gpe + zpe
                        else:
                            value = None
                    else:
                        value = st.property.get(key)
                    if value is not None:
                        _rep_keys = self._setProperties(rep_st, key, stage_idx,
                                                        value)
                        self.rep_keys.update(_rep_keys)
            self.rep_sts.append(rep_st)
        if not self.rep_sts:
            msg = (
                'Postprocessing of all Jaguar multistage workflow jobs has failed.'
            )
            raise JMSWFException(msg)
        self._updateSiblingConformersDict()
        # validate the output workflow file
        sibling_conformers_dict = bin_structures_by_property(
            self.rep_sts, key=SIBLING_GROUP_KEY, inner_key=CONFORMERS_GROUP_KEY)
        for sibling_group, conformer_group, conformer in self.representativeConformers(
        ):
            conformers_dict = sibling_conformers_dict.get(sibling_group)
            if not conformers_dict:
                msg = (
                    'All Jaguar multistage workflow calculations for sibling '
                    f'group {sibling_group} have failed.')
                raise JMSWFException(msg)
            if not conformers_dict.get(conformer_group):
                msg = (
                    'All Jaguar multistage workflow calculations for conformer '
                    f'group {conformer_group} have failed.')
                raise JMSWFException(msg) 
[docs]    def finalizeJMSWFOutput(self):
        """
        Finalize the Jaguar multistage workflow output.
        :rtype: str
        :return: the Jaguar multistage workflow output file
        """
        jmswf_tag = getattr(self, 'jmswf_tag', '')
        # create a valid non-redundant output file from the
        # Jaguar multistage workflow output file
        base_name = jobutils.get_jobname(
            DEFAULT_JOB_NAME) + f'{OUT_EXT}{jmswf_tag}'
        out_mae_file = base_name + '.mae'
        structure.write_cts(self.rep_sts, out_mae_file)
        # the difference between the jmswf and rxnwf output Maestro files is that
        # the latter, rather than providing a structure for each stage, chooses a
        # single structure to be the representative structure in the rxnwf output,
        # that representative comes from the last stage of the rxnwf excluding any
        # stages from a potential extra stages file, this representative carries
        # the properties of all stages, as a result of this if both a rxnwf and
        # extra stage specify keywords to create files with smap extensions, like
        # *vib, *vis, etc., only a single file can be linked, since it is unlikely
        # that this will arise as extra stages are typically for energy
        # post-processing choose to link those from the rxnwf stages, all will
        # still be available in the jmswf output files which are copied back to the
        # launch host
        unique_geom_dict = getattr(self, 'unique_geom_dict', {})
        rep_stage_idx = get_rep_stage_idx(rxnwf_sts=self.rep_sts)
        rep_sts_dict = {
            rep_st.title: idx for idx, rep_st in enumerate(self.rep_sts, 1)
        }
        # since the Jaguar calculations were only performed on the representative
        # structure geometries the relevant files do not exist for the structures
        # that the representatives represent, so for these structures point
        # the files of the representatives
        smap_dict = {}
        for title, idx in rep_sts_dict.items():
            title = get_unique_geom_title(title, unique_geom_dict)
            for ext in jmswfu.SMAP_ELIGIBLE_EXTENSIONS:
                pattern = f'{title}{STAGE_SEPARATOR}{rep_stage_idx}*{ext}'
                for afile in glob.glob(pattern):
                    smap_dict.setdefault(afile, []).append(idx)
        if smap_dict:
            out_smap_file = jmswfu.create_smap(base_name,
                                               out_mae_file,
                                               smap_dict=smap_dict)
            jobutils.add_outfile_to_backend(out_smap_file)
        jobutils.add_outfile_to_backend(out_mae_file, set_structure_output=True)
        return out_mae_file  
[docs]class DescriptorsMixin(object):
    """
    Manage running descriptors.
    """
    DEFAULT_JOB_NAME = 'automatic_reaction_workflow'
[docs]    def runDescriptors(self, files):
        """
        Run descriptors.
        :type files: list
        :param files: the files on which to run descriptors
        :rtype: list
        :return: The output structure file paths. The returned list will be the
            same length as the input files list.
        """
        # handle restarts
        job_name = jobutils.get_jobname(self.DEFAULT_JOB_NAME)
        zip_files = jaguar_restart.get_restart_zip_files(
            base_name_patterns=[f'{DESCRIPTORS_LAUNCH_DIR_HEADER}{job_name}*'])
        jaguar_restart.prepare_restart_dirs(zip_files)
        host = get_sub_host_str(self, 'jmswf_host', 'n_rxnwf_subjobs')
        # keep Jaguar &gen section to a minimum for now
        keys = ['dftname', 'basis']
        jaguar_keywords = {key: self.jaguar_keywords[key] for key in keys}
        jaguar_keywords = get_jaguar_keywords_list(jaguar_keywords)
        jaguar_keywords = ' '.join(jaguar_keywords)
        options = argparse.Namespace(TPP=self.tpp)
        job_dj = jobutils.create_queue(options=options, host=host)
        base_cmd = ['complex_descriptors.py']
        base_cmd += [FLAG_JAGUAR]
        base_cmd += [FLAG_JAGUAR_KEYWORDS, jaguar_keywords]
        base_cmd += [FLAG_TPP, str(self.tpp)]
        base_cmd += [FLAG_LIGFILTER]
        base_cmd += [FLAG_CANVAS]
        base_cmd += [FLAG_MOLDESCRIPTORS]
        base_cmd += [FLAG_INCLUDE_VECTORIZED]
        base_cmd += [FLAG_FINGERPRINTS]
        if self.return_descriptor_files:
            base_cmd += [FLAG_SAVEFILES]
        if host != jobutils.AUTOHOST:
            base_cmd += ['-HOST', self.jmswf_host]
        out_rep = getattr(self, 'out_rep', None)
        if out_rep:
            base_cmd += [parserutils.FLAG_OUT_REP, out_rep]
        # collect jobs, the complex descriptors driver does more than
        # just running Jaguar jobs so currently restart each job even
        # if all Jaguar *out files are available, for such cases the Jaguar
        # jobs are not restarted
        launch_dirs = []
        for afile in files:
            job_name = fileutils.get_basename(afile)
            launch_dir = job_name = f'{DESCRIPTORS_LAUNCH_DIR_HEADER}{job_name}'
            cmd = list(base_cmd)
            cmd += ['-JOBNAME', job_name]
            cmd += [FLAG_INPUT_FILE, afile]
            cmd += [FLAG_STPROPS, f'{job_name}.mae']
            os.makedirs(launch_dir, exist_ok=True)
            shutil.copy(afile, os.path.join(launch_dir, afile))
            job = jobutils.RobustSubmissionJob(cmd, command_dir=launch_dir)
            job_dj.addJob(job)
            launch_dirs.append(launch_dir)
        if self.logger:
            self.logger.info('Running descriptors on all files.')
            self.logger.info('')
        job_dj.run()
        for launch_dir in launch_dirs:
            zip_file = launch_dir + '.zip'
            zip_directory(launch_dir, fileobj=zip_file)
            jobutils.add_outfile_to_backend(zip_file)
        file_paths = []
        for launch_dir in launch_dirs:
            file_path = os.path.join(launch_dir, f'{launch_dir}.mae')
            if not os.path.exists(file_path):
                msg = (f'Descriptor output structure file {file_path} '
                       'can not be found.')
                raise ReactionWorkflowException(msg)
            file_paths.append(file_path)
        return file_paths  
[docs]def has_transition_state(rxnwf_file_path=None, rxnwf_sts=None):
    """
    Return True if the reaction workflow features a transition state,
    False otherwise.
    :type rxnwf_file_path: str or None
    :type rxnwf_file_path: the path to a reaction workflow file
    :type rxnwf_sts: list[`schrodinger.structure.Structure`] or None
    :type rxnwf_sts: reaction workflow structures
    :rtype: bool
    :return: True if the reaction workflow features a transition state,
        False otherwise
    """
    assert bool(rxnwf_file_path) ^ bool(rxnwf_sts)
    if rxnwf_file_path:
        rxnwf_sts = list(structure.StructureReader(rxnwf_file_path))
    for rxnwf_st in rxnwf_sts:
        if rxnwf_st.property.get(TRANSITION_STATE_STRUCTURE_KEY):
            return True
    return False 
[docs]def get_rep_stage_idx(rxnwf_file_path=None, rxnwf_sts=None):
    """
    Return the representative stage index.
    :type rxnwf_file_path: str or None
    :type rxnwf_file_path: the path to a reaction workflow file
    :type rxnwf_sts: list[`schrodinger.structure.Structure`] or None
    :type rxnwf_sts: reaction workflow structures
    :rtype: int
    :return: the representative stage index
    """
    assert bool(rxnwf_file_path) ^ bool(rxnwf_sts)
    has_ts = has_transition_state(rxnwf_file_path=rxnwf_file_path,
                                  rxnwf_sts=rxnwf_sts)
    if has_ts:
        rep_stage_idx = 2
    else:
        rep_stage_idx = 1
    return rep_stage_idx 
[docs]class UniqueGeomMixin:
    """
    Manage uniqueifying structures by geometry.
    """
    def _setUniqueGeomDict(self, sts):
        """
        Set the unique geometry dictionary.
        :type sts: list[`schrodinger.structure.Structure`]
        :param sts: the structures
        """
        if not self.dedup_geom_eps:
            # this means deduplicating is not being performed
            return
        # see MATSCI-10591 - set up a cache for unique geometries to dedup
        # expensive calculations like MacroModel, Jaguar, etc., arbitrarily
        # use the first geometry
        #
        # see MATSCI-11251 - JMSWF uses cleaned structure titles to name
        # Jaguar subjob input files, while the title (and entry name) on the
        # structure in the input file is unchanged, the titles (and entry names)
        # of structures in the main JMSWF output Maestro file are cleaned, so
        # used cleaned titles here
        unique_geom_dict = bin_by_geometry(sts, eps=self.dedup_geom_eps)
        self.unique_geom_dict = {
            jobutils.clean_string(ref_st.title): [ref_st] + other_sts
            for ref_st, other_sts in unique_geom_dict.items()
        }
    def _getUniqueGeomSts(self, sts):
        """
        Return representative structures with unique geometries.
        :type sts: list[`schrodinger.structure.Structure`]
        :param sts: the structures
        :rtype: list[`schrodinger.structure.Structure`]
        :return: the structures
        """
        # arbitrarily use the first geometry
        return [sts[0] for sts in self.unique_geom_dict.values()] or sts
    def _duplicateUniqueGeomSts(self):
        """
        Duplicate the structures choosen as unique geometries.
        :rtype: list[`schrodinger.structure.Structure`]
        :return: the structures
        """
        # for conformers created from an original structure JMSWF
        # will append '-<idx>'
        new_rep_sts = []
        for rep_st in self.rep_sts:
            new_rep_sts.append(rep_st)
            if not self.unique_geom_dict:
                continue
            orig_title, title_ext = get_orig_title(rep_st.title,
                                                   self.unique_geom_dict)
            _, ename_ext = get_orig_title(
                rep_st.property[msprops.ENTRY_NAME_PROP], self.unique_geom_dict)
            sts = self.unique_geom_dict[orig_title]
            orig_st, other_sts = sts[0], sts[1:]
            for other_st in other_sts:
                _other_st = rep_st.copy()
                _other_st.property.update(other_st.property)
                _other_st.title = f'{jobutils.clean_string(other_st.title)}{title_ext}'
                _other_st.property[msprops.ENTRY_NAME_PROP] = \
                    
f'{jobutils.clean_string(other_st.title)}{ename_ext}'
                new_rep_sts.append(_other_st)
        return new_rep_sts
    def _updateUniqueGeoms(self, sibling_conformers_dict):
        """
        Update the unique geometry dictionary and use it to update the
        given structure dictionary.
        :type sibling_conformers_dict: dict
        :param sibling_conformers_dict: dictionary of dictionaries where the
            outer dictionary is keyed by sibling and inner dictionary is keyed by
            conformer and values of the inner dictionary are lists of structures
        :rtype: dict
        :return: dictionary of dictionaries where the outer dictionary is keyed
            by sibling and inner dictionary is keyed by conformer and values of
            the inner dictionary are lists of structures
        """
        # relative to all original structures the following list contains
        # those representatives that have unique geometries plus any of
        # their guess conformers
        new_sts = flatten_sibling_conformers(sibling_conformers_dict)
        # guess conformers have been generated for the representatives, now
        # generate the same for the structures being represented
        all_sts = []
        for new_st in new_sts:
            all_sts.append(new_st)
            new_xyz = new_st.getXYZ()
            for old_sts in self.unique_geom_dict.values():
                # structure titles of the generated guess conformers for the
                # representative may be appended with some identifier, for
                # example 'Fe-complex', 'Fe-complex_252_324', etc.
                if new_st.title.startswith(old_sts[0].title):
                    # 0th structure is the original old structure
                    for old_st in old_sts[1:]:
                        tmp_st = old_st.copy()
                        tmp_st.setXYZ(new_xyz)
                        all_sts.append(tmp_st)
        self._setUniqueGeomDict(all_sts)
        sts = self._getUniqueGeomSts(all_sts)
        return bin_structures_by_property(sts,
                                          key=SIBLING_GROUP_KEY,
                                          inner_key=CONFORMERS_GROUP_KEY) 
[docs]class ReactionWorkflowException(Exception):
    pass 
[docs]class ReactionWorkflow(ConformationalSearchMixin, JMSWFMixin, UniqueGeomMixin):
    """
    Manage a reaction workflow.
    """
[docs]    def __init__(self,
                 reaction_wf_input_sts,
                 dedup_geom_eps=parserutils.DEFAULT_DEDUP_GEOM_EPS,
                 n_conformers=DEFAULT_N_CONFORMERS,
                 skip_eta_rotamers=False,
                 pp_rel_energy_thresh=PP_CSEARCH_REL_ENERGY_THRESH,
                 seed=parserutils.RANDOM_SEED_DEFAULT,
                 only_rings=True,
                 return_csearch_files=False,
                 jaguar_keywords=DEFAULT_JAGUAR_KEYWORDS,
                 temp_data=None,
                 press_data=None,
                 return_jaguar_files=False,
                 anharm=False,
                 return_anharm_files=False,
                 anharm_max_freq=anharmonic.DEFAULT_MAX_FREQ,
                 anharm_factor_data=None,
                 rate_constants=False,
                 return_rate_constant_files=False,
                 custom_rate_constants=False,
                 wigner_tunnel_corr=False,
                 extra_stages_file=None,
                 max_i_freq=anharmonic.DEFAULT_MAX_I_FREQ,
                 out_rep=None,
                 n_jmswf_subjobs=1,
                 subhost=None,
                 tpp=DEFAULT_TPP,
                 ffld_name=None,
                 logger=None):
        """
        Create an instance.
        :type reaction_wf_input_sts: list
        :param reaction_wf_input_sts: reaction workflow input structures
        :type dedup_geom_eps: float
        :param dedup_geom_eps: reduce the number of calculations by deduplicating
            the input structures based on geometry, using this threshold
            in Ang., and only calculating the representatives, a value of
            zero means no deduplicating
        :type n_conformers: int
        :param n_conformers: number of conformers to search for
        :type skip_eta_rotamers: bool
        :param skip_eta_rotamers: skip eta rotamers generation if true
        :type pp_rel_energy_thresh: None or float
        :param pp_rel_energy_thresh: relative energy threshold in kJ/mol, if None then
            only the n_conformers lowest energy conformers are returned
        :type seed: int
        :param seed: seed for random number generator
        :type only_rings: bool
        :param only_rings: if True then only allow rotation of eta-bound rings,
            if False then also allow rotation of ligands where the eta-bound
            motif is acyclic, for example ethene, etc.
        :type return_csearch_files: bool
        :param return_csearch_files: whether to return all output files
            from the conformational search subjobs
        :type jaguar_keywords: OrderedDict
        :param jaguar_keywords: Jaguar keywords
        :type temp_data: TempData
        :param temp_data: the temperature data for thermochemical properties
        :type press_data: PressData
        :param press_data: the pressure data for thermochemical properties
        :type return_jaguar_files: bool
        :param return_jaguar_files: whether to return all output files
            from the Jaguar subjobs
        :type anharm: bool
        :param anharm: whether to run the anharmonic workflow
        :type return_anharm_files: bool
        :param return_anharm_files: whether to return all output files
            from the anharmonic workflow
        :type anharm_max_freq: float
        :param anharm_max_freq: anharmonic potentials are created for normal modes with
            harmonic frequencies less than this value in wavenumbers (cm^-1)
        :type anharm_factor_data: anharmonic.SeqData or None
        :param anharm_factor_data: unitless factor data for factors that multiply
            a normal mode displacement, if None then the defaults are used,
            the number of points is in the positive direction only, excluding
            zero and the negative direction, for example using a value
            of 4 in turn means 2 * 4 + 1 = 9 points total
        :type rate_constants: bool
        :param rate_constants: whether to report rate constant(s) for the rate
            determining step of the reaction using canonical transition state
            theory
        :type return_rate_constant_files: bool
        :param return_rate_constant_files: whether to return all output files
            from the rate constant subjobs
        :param bool custom_rate_constants: whether to compute custom rate
            constants
        :type wigner_tunnel_corr: bool
        :param wigner_tunnel_corr: whether to include the Wigner tunneling correction
            when computinig rate constant(s)
        :type extra_stages_file: str
        :param extra_stages_file: the name of a file containing extra
            stages for a Jaguar Multistage Workflow subjob that will
            be performed on all output structures from the reaction workflow,
            the first of these extra stages is always skipped so as to allow
            analysis to potentially be the first extra stage
        :type max_i_freq: float
        :param max_i_freq: tolerate small imaginary frequencies less than this value in
            wavenumbers (cm^-1)
        :param out_rep: if a string then must be either module constant
            parserutils.CENTROID or parserutils.ETA, if None then do nothing
        :type n_jmswf_subjobs: int
        :param n_jmswf_subjobs: the maximum number of simultaneous Jaguar
            multistage workflow subjobs
        :type subhost: str
        :param subhost: the host to use for subjobs
        :type tpp: int
        :param tpp: the number of threads to use for Jaguar
            subjobs, i.e. -TPP (threads-per-process)
        :type ffld_name: str
        :param ffld_name: the name of the force field to use (e.g., OPLS2005 or
            S-OPLS)
        :type logger: logging.Logger or None
        :param logger: output logger or None if there isn't one
        """
        if ffld_name is None:
            ffld_name = msutils.get_default_forcefield().name
        self.reaction_wf_input_sts = reaction_wf_input_sts
        self.dedup_geom_eps = dedup_geom_eps
        self.n_conformers = n_conformers
        self.skip_eta_rotamers = skip_eta_rotamers
        self.pp_rel_energy_thresh = pp_rel_energy_thresh
        self.seed = seed
        self.only_rings = only_rings
        self.return_csearch_files = return_csearch_files
        self.jaguar_keywords = jaguar_keywords
        self.temp_data = temp_data or TempData(
            temp_start=jmswfu.DEFAULT_TEMP_START,
            temp_step=jmswfu.DEFAULT_TEMP_STEP,
            temp_n=jmswfu.DEFAULT_TEMP_N)
        self.press_data = press_data or PressData(
            press_start=jmswfu.DEFAULT_PRESS_START,
            press_step=jmswfu.DEFAULT_PRESS_STEP,
            press_n=jmswfu.DEFAULT_PRESS_N)
        self.return_jaguar_files = return_jaguar_files
        self.anharm = anharm
        self.return_anharm_files = return_anharm_files
        self.anharm_max_freq = anharm_max_freq
        self.anharm_factor_data = anharm_factor_data or anharmonic.SeqData(
            start=anharmonic.DEFAULT_F_START,
            step=anharmonic.DEFAULT_F_STEP,
            n_points=anharmonic.DEFAULT_F_N_POINTS)
        self.rate_constants = rate_constants
        self.return_rate_constant_files = return_rate_constant_files
        self.custom_rate_constants = custom_rate_constants
        self.wigner_tunnel_corr = wigner_tunnel_corr
        self.extra_stages_file = extra_stages_file
        self.max_i_freq = max_i_freq
        self.out_rep = out_rep
        self.n_jmswf_subjobs = n_jmswf_subjobs
        self.subhost = subhost
        self.tpp = tpp
        self.ffld_name = ffld_name
        self.logger = logger
        self.unique_geom_dict = {}
        self.sibling_conformers_dict = None
        self.eids_dict = {}
        self._jmswf_launch_dir = JMSWF_LAUNCH_DIR
        self._jmswf_base_name = 'jaguar_multistage_workflow'
        self._jmswf_in_mae_file = self._jmswf_base_name + '.maegz'
        self._jmswf_in_txt_file = self._jmswf_base_name + '.txt'
        self._jmswf_out_mae_file = f'{self._jmswf_base_name}{OUT_EXT}.maegz'
        self._jmswf_out_smap_file = f'{self._jmswf_base_name}{OUT_EXT}.smap'
        self.has_ts = False
        self.rdss = None
        self._anharm_dir = ANHARM_LAUNCH_DIR
        self._ctst_dir = CTST_RATE_CONSTANTS_DIR
        self.rep_keys = set()
        self.rep_sts = []
        self.jmswf_tag = REACTION_WF_TAG 
[docs]    def validateRateConstants(self):
        """
        Validate rate constants.
        :raise ReactionWorkflowException: if there is an issue
        """
        if not self.rate_constants:
            return
        sibling_conformers_dict = bin_structures_by_property(
            self.reaction_wf_input_sts,
            key=SIBLING_GROUP_KEY,
            inner_key=CONFORMERS_GROUP_KEY)
        eligible_for_rate_constants = False
        # In order to be eligible we need a sibling group containing 1 TS whose parent
        # sibling group contains 0 TSs
        for sibling_group, conformers_dict in sibling_conformers_dict.items():
            # ensure a single TS in the sibling group
            is_ts = []
            for conformers in conformers_dict.values():
                rep_conf = conformers[0]  # all same here
                is_ts.append(
                    bool(rep_conf.property.get(TRANSITION_STATE_STRUCTURE_KEY)))
            if is_ts.count(True) != 1:
                continue
            # ensure the sibling group containing a single TS has
            # a parent sibling group with no TSs
            rep_conf = list(conformers_dict.values())[0][0]  # all same here
            parent_sibling_group_key = rep_conf.property.get(
                PARENT_SIBLING_GROUPS_KEY)
            if (not parent_sibling_group_key or
                    parent_sibling_group_key not in sibling_conformers_dict):
                continue
            parent_conformers_dict = sibling_conformers_dict[
                parent_sibling_group_key]
            is_not_ts = []
            for conformers in parent_conformers_dict.values():
                rep_conf = conformers[0]  # all same here
                is_not_ts.append(not bool(
                    rep_conf.property.get(TRANSITION_STATE_STRUCTURE_KEY)))
            if all(is_not_ts):
                eligible_for_rate_constants = True
                break
        if not eligible_for_rate_constants:
            msg = ('Rate constant calculations require child transition state '
                   'structures.')
            raise ReactionWorkflowException(msg) 
[docs]    def validateAnharmonic(self):
        """
        Validate anharmonic.
        :raise ReactionWorkflowException: if there is an issue
        """
        sibling_conformers_dict = bin_structures_by_property(
            self.reaction_wf_input_sts,
            key=SIBLING_GROUP_KEY,
            inner_key=CONFORMERS_GROUP_KEY)
        if not self.has_ts and self.anharm:
            getters = [
                get_restrain_distance_idxs, get_restrain_angle_idxs,
                get_restrain_dihedral_idxs
            ]
            only_restrained_opts = True
            for sibling_group, conformer_group, first_conformer in self.representativeConformers(
                    sibling_conformers_dict=sibling_conformers_dict):
                for getter in getters:
                    if getter(first_conformer):
                        break
                else:
                    only_restrained_opts = False
                if not only_restrained_opts:
                    break
            if only_restrained_opts:
                msg = (
                    'The anharmonic workflow requires frequencies but '
                    'all Jaguar multistage jobs are restrained optimizations.')
                raise ReactionWorkflowException(msg) 
[docs]    def validateSubhost(self):
        """
        Validate subhost.
        :raise ReactionWorkflowException: if there is an issue
        """
        host = 'localhost'
        if jobcontrol.get_backend():
            host_list = jobcontrol.get_backend_host_list()
            if host_list:
                host = host_list[0][0]
        if host != 'localhost' and self.subhost == 'localhost':
            msg = ('Jobs requesting a remote driver host and local '
                   'subjob host are not supported.')
            raise ReactionWorkflowException(msg)
        if not self.subhost:
            self.subhost = host 
[docs]    def validate(self):
        """
        Validate.
        :raise ReactionWorkflowException: if there is an issue
        """
        self.validateRateConstants()
        self.validateAnharmonic()
        self.validateSubhost() 
    def _setUpAnharmonic(self):
        """
        Set up anharmonic workflow calculations.
        :raise ReactionWorkflowException: if there is an issue
        """
        # determine which JMSWF stages calculate frequencies, currently allow
        # for the possibility of an extra stage calculating frequencies (so
        # consider not only the first one or two reaction workflow stages),
        # for each stage collect all Jaguar *.out and *.01.in files
        in_files_by_stage = {}
        pattern = os.path.join(self._jmswf_launch_dir, '*.out')
        for out_path in glob.glob(pattern):
            jag_out = JaguarOutput(out_path)
            if not anharmonic.get_normal_modes(jag_out):
                continue
            out_files = [os.path.split(out_path)[1]]
            try:
                self.checkJMSWFOutputs(out_files)
            except JMSWFException:
                continue
            rin_path = os.path.splitext(out_path)[0] + '.01.in'
            if not os.path.exists(rin_path):
                msg = (f'The Jaguar restart input file {rin_path} can '
                       'not be found.')
                raise ReactionWorkflowException(msg)
            out_file = os.path.split(out_path)[1]
            rin_file = os.path.split(rin_path)[1]
            stage_idx = get_stage_idx(out_file, is_filename=True)
            in_files_by_stage.setdefault(stage_idx, []).append(
                (out_file, rin_file))
        # ensure that for each of these stages each conformer in the reaction
        # workflow has an anharmonic input deck
        for idx, all_files in in_files_by_stage.items():
            if len(all_files) != len(self.rep_sts):
                msg = (
                    'Some Jaguar output files from Jaguar Multistage Workflow '
                    f'stage {idx} are missing frequency data.')
                raise ReactionWorkflowException(msg)
        # prepare anharmonic subjobs in a separate directory
        os.makedirs(self._anharm_dir, exist_ok=True)
        for all_files in in_files_by_stage.values():
            for files in all_files:
                for afile in files:
                    apath = os.path.join(self._jmswf_launch_dir, afile)
                    bpath = os.path.join(self._anharm_dir, afile)
                    shutil.copy(apath, bpath)
    @staticmethod
    def _updateAnharmonicJaguarKwargs(rin_file_path):
        """
        Update anharmonic Jaguar keyword arguments.
        :type rin_file_path: str
        :param rin_file_path: the restart Jaguar input file path (`*.01.in`) whose
            &gen keywords arguments will be updated
        """
        # see MATSCI-11151 - where *vis, etc. property files are
        # not needed for rxnwf
        rin_jag = JaguarInput(input=rin_file_path)
        rin_jag.deleteKeys(['iplotden', 'iplotesp', 'iorb1a', 'iorb2a', 'nbo'])
        rin_jag.saveAs(rin_file_path)
[docs]    def runAnharmonic(self):
        """
        Run the anharmonic workflow.
        """
        self._setUpAnharmonic()
        # allow multiple simultaneous anharmonic subjobs
        options = argparse.Namespace(TPP=self.tpp)
        host = f'{self.subhost}:{self.n_jmswf_subjobs}'
        job_dj = jobutils.create_queue(options=options, host=host)
        # allow multiple simultaneous Jaguar single point subjobs for each
        # anharmonic subjob, continue processing even if there are zero
        # normal modes to be treated anharmonically
        cmd = ['anharmonic_workflow_driver.py']
        cmd += [anharmonic.FLAG_MAX_FREQ, str(self.anharm_max_freq)]
        cmd += [
            anharmonic.FLAG_F_DATA,
            str(self.anharm_factor_data.start),
            str(self.anharm_factor_data.step),
            str(self.anharm_factor_data.n_points)
        ]
        cmd += [parserutils.FLAG_ROBUST]
        cmd += [FLAG_MAX_I_FREQ, str(self.max_i_freq)]
        cmd += [anharmonic.FLAG_PROCESS_NO_ANHARMONICITIES]
        cmd += [FLAG_TPP, str(self.tpp)]
        cmd += ['-HOST', host]
        # collect jobs, restart *.out files are marked with ANHARMONIC_TAG
        # and should be skipped here, in contrast to JMSWF which only runs
        # Jaguar jobs the anharmonic driver does more than just running
        # Jaguar jobs so currently restart each job even if all Jaguar *out
        # files are available, for such cases the Jaguar jobs are not restarted
        pattern = os.path.join(self._anharm_dir, '*.out')
        for out_path in glob.glob(pattern):
            if ANHARMONIC_TAG in out_path:
                continue
            out_file = os.path.split(out_path)[1]
            base_name, ext = os.path.splitext(out_file)
            rin_file = base_name + '.01.in'
            rin_file_path = os.path.join(self._anharm_dir, rin_file)
            self._updateAnharmonicJaguarKwargs(rin_file_path)
            job_name = base_name + ANHARMONIC_TAG
            _cmd = list(cmd)
            _cmd += [anharmonic.FLAG_JAG_OUT_FILE, out_file]
            _cmd += [anharmonic.FLAG_JAG_RIN_FILE, rin_file]
            _cmd += ['-JOBNAME', job_name]
            jc_job = jobutils.RobustSubmissionJob(_cmd,
                                                  command_dir=self._anharm_dir,
                                                  name=job_name)
            job_dj.addJob(jc_job)
        # run jobs
        job_dj.run() 
[docs]    def processAnharmonic(self):
        """
        Process the anharmonic workflow.
        :raise ReactionWorkflowException: if there is an issue
        """
        # check output, for now require success for each conformer
        pattern = os.path.join(self._anharm_dir, f'*{anharmonic.OUT_EXT}')
        mae_paths = glob.glob(pattern)
        stage_idxs = {
            get_stage_idx(mae_path, is_filename=True) for mae_path in mae_paths
        }
        force_return_anharm_files = len(mae_paths) != len(stage_idxs) * len(
            self.rep_sts)
        # copy output
        if self.return_anharm_files or force_return_anharm_files:
            jobutils.add_zipfile_to_backend(self._anharm_dir)
        if force_return_anharm_files:
            msg = ('At least a single anharmonic workflow subjob has failed.')
            raise ReactionWorkflowException(msg)
        # update representative structures with anharmonic properties for relevant
        # stages, update keys, and store energy keys
        for rep_st in self.rep_sts:
            for stage_idx in stage_idxs:
                mae_path = os.path.join(self._anharm_dir,
                    f'{rep_st.title}{STAGE_SEPARATOR}{stage_idx}{ANHARMONIC_TAG}' \
                    
f'{anharmonic.OUT_EXT}')
                st = structure.Structure.read(mae_path)
                for key, value in st.property.items():
                    if key.startswith(anharmonic.KEY_STARTER):
                        _rep_keys = self._setProperties(rep_st, key, stage_idx,
                                                        value)
                        if 'Total' in key:
                            self.rep_keys.update(_rep_keys) 
    def _setRateDeterminingSteps(self):
        """
        Set a list of rate determining steps.
        :raise ReactionWorkflowException: if there is an issue
        """
        rxnwfea = ReactionWorkflowEnergyAnalysis
        # use conformationally averaged energies relative to parents
        # including cross terms to determine the rate determining step
        # of the reaction
        base_name = jobutils.get_jobname(DEFAULT_JOB_NAME)
        rel_parents_file = base_name + REL_PARENTS_W_X_EXT
        with open(rel_parents_file, 'r') as afile:
            rel_parents_dict = json.load(afile)
        # bin (child - parent) energy differences by relevant stages to
        # determine the rate determining step for each stage, the energy
        # to use is U_0 = E_SCF + E_ZPE which is temperature independent
        # and thus previously algebraically averaged over conformers and
        # is now linearly summable, currently harmonic ZPE is used even
        # if modeling anharmonicities
        scf_energy_h = rxnwfea.getHeader(jaguarworkflows.GAS_PHASE_ENERGY_PROP)
        zpe_energy_h = rxnwfea.getHeader(jaguarworkflows.ZERO_POINT_ENERGY_PROP)
        stage_child_parent_energies_dict = {}
        for child_name, parent_dict in rel_parents_dict.items():
            for parent_name, edict in parent_dict.items():
                stage_energies_dict = {}
                for h, energy in edict.items():
                    if h.startswith(scf_energy_h) or h.startswith(zpe_energy_h):
                        stage_idx = get_stage_idx(h)
                        stage_energies_dict.setdefault(stage_idx,
                                                       []).append(energy)
                for stage_idx, energies in stage_energies_dict.items():
                    if len(energies) == 2:
                        stage_child_parent_energies_dict.setdefault(
                            stage_idx, []).append(
                                (child_name, parent_name, sum(energies)))
        if not stage_child_parent_energies_dict:
            msg = ('Rate constant calulations require at least a single stage '
                   'in which Jaguar frequencies are calculated.')
            raise ReactionWorkflowException(msg)
        # collect rate determining steps by sorting by energy and finding the
        # highest energy transition state
        rdss = []
        for stage_idx, values in stage_child_parent_energies_dict.items():
            rds = None
            for child_name, parent_name, energy in sorted(values,
                                                          key=lambda x: x[2],
                                                          reverse=True):
                for sibling_group, conformer_group, rep_child_conf in self.representativeConformers(
                        specific_sibling_group=child_name):
                    if rep_child_conf.property.get(
                            TRANSITION_STATE_STRUCTURE_KEY):
                        rds = SimpleNamespace(stage_idx=stage_idx,
                                              child_name=child_name,
                                              parent_name=parent_name)
                        break
                if rds:
                    rdss.append(rds)
                    break
        if not rdss:
            msg = ('Rate constant calulations require at least a single child '
                   'structure to be marked as a transition state structure.')
            raise ReactionWorkflowException(msg)
        self.rdss = rdss
    def _getJaguarOutputPathsChildAndParent(self, rds):
        """
        Return paths to the Jaguar output files for child and
        parent conformers for the given rate determining step.
        :type rds: SimpleNamespace
        :param rds: the rate determining step
        :rtype: list, list
        :return: the child and parent conformer paths the latter of
            which is a list of lists of paths for each parent structure
        """
        # among the child sibling group collect spectator masses and
        # for the found TS collect conformer titles
        child_spectator_masses = []
        child_ts_conformer_titles = []
        for conformer_group, conformers in self.sibling_conformers_dict[
                rds.child_name].items():
            first_conformer = conformers[0]
            if first_conformer.property.get(TRANSITION_STATE_STRUCTURE_KEY):
                for conformer in conformers:
                    child_ts_conformer_titles.append(conformer.title)
            else:
                child_spectator_masses.append(
                    get_molecular_weight(first_conformer,
                                         decimal=MASS_N_DECIMAL))
        # isolate spectators and determine which parent siblings
        # made the TS
        binned_parent_conformer_titles = {}
        for conformer_group, conformers in self.sibling_conformers_dict[
                rds.parent_name].items():
            first_conformer = conformers[0]
            parent_mass = get_molecular_weight(first_conformer,
                                               decimal=MASS_N_DECIMAL)
            if parent_mass in child_spectator_masses:
                child_spectator_masses.remove(parent_mass)
            else:
                for conformer in conformers:
                    binned_parent_conformer_titles.setdefault(
                        conformer_group, []).append(conformer.title)
        binned_parent_conformer_titles = \
            
list(binned_parent_conformer_titles.values())
        get_paths = lambda x: [
            os.path.join(
                self._jmswf_launch_dir,
                get_unique_geom_title(title, self.unique_geom_dict) +
                STAGE_SEPARATOR + str(rds.stage_idx) + '.out') for title in x
        ]
        child_paths = get_paths(child_ts_conformer_titles)
        binned_parent_paths = [
            get_paths(titles) for titles in binned_parent_conformer_titles
        ]
        return child_paths, binned_parent_paths
    def _setUpCTST(self):
        """
        Set up canonical transition state theory calculations.
        """
        self._setRateDeterminingSteps()
        # the Wigner tunneling correction is computed from the imaginary
        # frequency of the transition state and ultimately multiples its
        # partition function to alter the rate constant
        wigner_tunnel_corr = 'yes' if self.wigner_tunnel_corr else 'no'
        temps = ' '.join([
            str(self.temp_data.temp_start + i * self.temp_data.temp_step)
            for i in range(0, self.temp_data.temp_n)
        ])
        # collect all required CTST jobs
        get_conf_title = lambda x: fileutils.get_basename(x).split(
            STAGE_SEPARATOR)[0]
        for rds in self.rdss:
            child_paths, binned_parent_paths = self._getJaguarOutputPathsChildAndParent(
                rds)
            rds.jobs = []
            for child_path in child_paths:
                child_title = get_conf_title(child_path)
                for parent_paths in itertools.product(*binned_parent_paths):
                    parent_title = '_'.join(
                        get_conf_title(parent_path)
                        for parent_path in parent_paths)
                    job_name = (f'tst{STAGE_SEPARATOR}{rds.stage_idx}_'
                                f'{parent_title}_{child_title}')
                    cmd = [os.path.join(os.environ['SCHRODINGER'], 'run')]
                    cmd += ['tst_rate_startup.py']
                    cmd += ['-wigner_tunnel_corr', wigner_tunnel_corr]
                    cmd += [FLAG_MAX_I_FREQ, str(self.max_i_freq)]
                    cmd += ['-temperature', temps]
                    cmd += ['-COMPLEX_OUTPUT', child_path]
                    cmd += ['-COMPLEX_TRV', 'trv']
                    if self.anharm:
                        base_name = fileutils.get_basename(child_path)
                        base_name = get_unique_geom_title(
                            base_name, self.unique_geom_dict)
                        mae_path = os.path.join(
                            self._anharm_dir,
                            f'{base_name}{ANHARMONIC_TAG}{anharmonic.OUT_EXT}')
                        cmd += ['-COMPLEX_ANHARM_OUTPUT_MAE', mae_path]
                    for idx, parent_path in enumerate(parent_paths):
                        cmd += [f'-REAGENT_OUTPUT_{idx}', parent_path]
                        cmd += [f'-REAGENT_TRV_{idx}', 'trv']
                        if self.anharm:
                            base_name = fileutils.get_basename(parent_path)
                            base_name = get_unique_geom_title(
                                base_name, self.unique_geom_dict)
                            mae_path = os.path.join(
                                self._anharm_dir,
                                f'{base_name}{ANHARMONIC_TAG}{anharmonic.OUT_EXT}'
                            )
                            cmd += [
                                f'-REAGENT_ANHARM_OUTPUT_MAE_{idx}', mae_path
                            ]
                    cmd += ['-DEBUG']  # enable verbose logging
                    cmd += [job_name]
                    rds.jobs.append(cmd)
[docs]    def runCTST(self):
        """
        Run canonical transition state theory calculations to determine rate
        constant(s) for the rate determining step of the reaction.
        """
        self._setUpCTST()
        # see MATSCI-6611 - originally jobs were launched simultaneously on
        # self.subhost using a JobDJ filled with SubprocessJob types but it
        # was found that starting the first of such jobs was hanging
        # indefinitely on Windows and so it was switched to use subprocess
        # (so running on the original -HOST) in serial
        for rds in self.rdss:
            rds.stdouts = []
            for job in rds.jobs:
                try:
                    stdout = subprocess.check_output(job,
                                                     stderr=subprocess.STDOUT,
                                                     universal_newlines=True)
                except subprocess.CalledProcessError:
                    stdout = ''
                rds.stdouts.append(stdout) 
[docs]    def prepareCTSTOutput(self):
        """
        Prepare CTST output.
        :raise ReactionWorkflowException: if there is an issue
        """
        # process raw output data
        failed_stage_idx = None
        for rds in self.rdss:
            temp_data_dict = {}
            for stdout in rds.stdouts:
                for line in stdout.splitlines():
                    if line.startswith('{'):
                        data = yaml.safe_load(line.strip())
                        temperature = float(data['Temperature'][0])
                        energy = float(data['Energy Barrier'][0])
                        r_partition_f = float(
                            data['Reagents Partition Function'][0])
                        ts_partition_f = float(
                            data['Complex Partition Function'][0])
                        mantissa, order_of_magnitude = data[
                            'TST Rate Constant'][0].split(' x ')
                        rate_constant = float(mantissa) * eval(
                            order_of_magnitude.replace('^', '**'))
                        rate_constant_units = data['TST Rate Constant'][1]
                        data = (energy, r_partition_f, ts_partition_f,
                                rate_constant, rate_constant_units)
                        temp_data_dict.setdefault(temperature, []).append(data)
            if not temp_data_dict:
                failed_stage_idx = rds.stage_idx
                break
            for temp, data in temp_data_dict.items():
                energies, r_partition_fs, ts_partition_fs, rate_constants, \
                    
rate_constant_units = [numpy.array(i) for i in zip(*data)]
                # energies are temperature independent so perform an algebraic
                # average
                avg_energy = numpy.mean(energies)
                # Boltzmann average the temperature dependent partition functions
                # and rate constants
                factors = numpy.exp(-1 * energies / (IDEAL_GAS_CONSTANT * temp))
                partition_f = sum(factors)
                avg_r_partition_f = numpy.dot(r_partition_fs,
                                              factors) / partition_f
                avg_ts_partition_f = numpy.dot(ts_partition_fs,
                                               factors) / partition_f
                avg_rate_constant = numpy.dot(rate_constants,
                                              factors) / partition_f
                temp_data_dict[temp] = (avg_energy, avg_r_partition_f,
                                        avg_ts_partition_f, avg_rate_constant,
                                        rate_constant_units[0])
            rds.temp_data_dict = temp_data_dict
        # process raw output files
        if failed_stage_idx is not None or self.return_rate_constant_files:
            os.makedirs(self._ctst_dir, exist_ok=True)
            for rds in self.rdss:
                for job, stdout in zip(rds.jobs, rds.stdouts):
                    log_file = os.path.join(self._ctst_dir, job[-1] + '.log')
                    with open(log_file, 'w') as afile:
                        afile.write(stdout)
            jobutils.add_zipfile_to_backend(self._ctst_dir)
            if failed_stage_idx:
                msg = (
                    'All rate constant calculations for stage '
                    f'{failed_stage_idx} have failed for all child and parent '
                    'conformers.')
                raise ReactionWorkflowException(msg)
        # process summary output file
        base_name = jobutils.get_jobname(DEFAULT_JOB_NAME)
        rate_constants_file = base_name + RATE_CONSTANTS_W_X_EXT
        with csv_unicode.writer_open(rate_constants_file) as afile:
            writer = csv.writer(afile)
            writer.writerow([
                'Stage_Index', 'Parent_Sibling_Group_Name',
                'Child_Sibling_Group_Name', 'Temperature/K',
                'Conf_Avg_SCF+ZPE_Energy_Barrier/kcal/mol',
                'Conf_Avg_Reactants_Partition_F', 'Conf_Avg_TS_Partition_F',
                CONF_AVG_CTST_RATE_CONSTANT, 'CTST_Rate_Constant_Units'
            ])
            for rds in self.rdss:
                for temp, data in rds.temp_data_dict.items():
                    writer.writerow(
                        [rds.stage_idx, rds.parent_name, rds.child_name, temp] +
                        list(data))
        jobutils.add_outfile_to_backend(rate_constants_file) 
[docs]    def exportEnergyDiagrams(self):
        """
        Export the energy diagrams if the parents file exists
        """
        base_name = jobutils.get_jobname(DEFAULT_JOB_NAME)
        json_path = base_name + REL_PARENTS_WO_X_EXT
        if os.path.exists(json_path):
            csv_path = base_name + REL_REACTANTS_WO_X_EXT
            plotter = EnergyDiagramPlotter(json_path,
                                           csv_path,
                                           logger=self.logger)
            plotter.run() 
[docs]    def run(self):
        """
        Run the reaction workflow.
        :raise ReactionWorkflowException: if there is an issue
        """
        zip_files = jaguar_restart.get_restart_zip_files(
            base_names=[JMSWF_LAUNCH_DIR, ANHARM_LAUNCH_DIR])
        jaguar_restart.prepare_restart_dirs(zip_files)
        self.has_ts = has_transition_state(rxnwf_sts=self.reaction_wf_input_sts)
        self.validate()
        self._setUniqueGeomDict(self.reaction_wf_input_sts)
        sts = self._getUniqueGeomSts(self.reaction_wf_input_sts)
        try:
            self.createConformers(sts)
        except ConformationalSearchException as err:
            raise ReactionWorkflowException(str(err))
        try:
            self._overwriteConformerEIDS()
            self.runJMSWF()
            self.prepareJMSWFOutput()
            self.setRepresentatives()
        except JMSWFException as err:
            raise ReactionWorkflowException(str(err))
        if self.anharm:
            self.runAnharmonic()
            self.processAnharmonic()
        if self.out_rep:
            for st in self.rep_sts:
                etatoggle.toggle_structure(st, out_rep=self.out_rep)
        self.rep_sts = self._duplicateUniqueGeomSts()
        self.sibling_conformers_dict = bin_structures_by_property(
            self.rep_sts, key=SIBLING_GROUP_KEY, inner_key=CONFORMERS_GROUP_KEY)
        out_mae_file = self.finalizeJMSWFOutput()
        ReactionWorkflowEnergyAnalysis(out_mae_file,
                                       self.rep_keys).writeDataFiles()
        self.exportEnergyDiagrams()
        compute_custom_rates = (self.rate_constants and
                                self.custom_rate_constants)
        # Defined in MATSCI-10586
        try:
            get_custom_keq_rates(out_mae_file,
                                 self.press_data,
                                 self.temp_data,
                                 extra_stages_file=self.extra_stages_file,
                                 compute_rates=compute_custom_rates)
        except ValueError as err:
            raise ReactionWorkflowException(str(err))
        if self.rate_constants and not self.custom_rate_constants:
            self.runCTST()
            self.prepareCTSTOutput()
        log('All finished', timestamp=True, pad=True, logger=self.logger)  
[docs]class Sites:
    """
    Manage enumeration sites.
    """
[docs]    @staticmethod
    def getSites(sites_data, n_structures=1):
        """
        Return a list of Site from the given sites data.
        :type sites_data: list
        :param sites_data: contains for each site a list of
            data [from_idx, to_idx, hash_idx] with an optional
            fourth item structure_idx
        :type n_structures: int
        :param n_structures: the number of structures, used if the given
            sites lack the optional fourth item
        :raise InvalidInput: if there is an issue
        :rtype: list
        :return: contains `Site`
        """
        sites = []
        for site_data in sites_data:
            n_idxs = len(site_data)
            # 3 or 4 indices must be given (see below for cases)
            if not (n_idxs == 3 or n_idxs == 4):
                msg = ('Sites must be defined using either 3 or 4 indices.')
                raise InvalidInput(msg)
            elif n_idxs == 3:
                # backwards compatibility, no structure specific indexing given,
                # all structures have the same indexing
                i, j, k = site_data
                for l in range(1, n_structures + 1):
                    sites.append(
                        Site(from_idx=i, to_idx=j, hash_idx=k, structure_idx=l))
            elif n_idxs == 4:
                # structure specific indexing given
                i, j, k, l = site_data
                sites.append(
                    Site(from_idx=i, to_idx=j, hash_idx=k, structure_idx=l))
        return sites 
[docs]    @staticmethod
    def delete_substitution_site_bonds(st, sites):
        """
        Delete bonds in the given structure that occur after
        the given substitution sites and return extracted
        core information.
        :type st: `structure.Structure`
        :param st: the structure
        :type sites: list
        :param sites: contains `Site`
        :rtype: `structure.Structure`, dict
        :return: the extracted core and old-to-new atom index
            map
        """
        root_remove_idxs = set()
        for site in sites:
            from_atom = st.atom[site.from_idx]
            to_atom = st.atom[site.to_idx]
            bonded_atoms = [
                atom for atom in to_atom.bonded_atoms
                if atom.index != from_atom.index
            ]
            for atom in bonded_atoms:
                st.deleteBond(to_atom, atom)
                root_remove_idxs.add(atom.index)
        remove_idxs = set()
        for root_remove_idx in root_remove_idxs:
            mol = st.atom[root_remove_idx].getMolecule()
            remove_idxs.update(mol.getAtomIndices())
        core_st = st.copy()
        old_to_new = core_st.deleteAtoms(remove_idxs, renumber_map=True)
        for idx in list(old_to_new.keys()):
            if old_to_new[idx] is None:
                old_to_new.pop(idx)
        return core_st, old_to_new 
[docs]    @staticmethod
    def validateSitesData(sites, st):
        """
        Validate the given sites data.
        :type sites: list
        :param sites: contains `Site`
        :type st: `structure.Structure`
        :param st: the structure
        :raise InvalidInput: if there is an issue
        """
        st = st.copy()
        # validate sites
        ring_bonds = rings.find_ring_bonds(st)
        for site in sites:
            idxs = (site.from_idx, site.to_idx)
            if max(idxs) > st.atom_total:
                msg = get_msg(site, 'At least a single index does not exist.')
                raise InvalidInput(msg)
            bond = st.getBond(*idxs)
            if not bond:
                msg = get_msg(site, 'They are not bound.')
            elif bond.order != 1:
                msg = get_msg(site, 'It is not a single bond.')
            elif set(idxs) in ring_bonds:
                msg = get_msg(site, 'The bond is in a ring.')
            elif msutils.is_dummy_atom(st.atom[site.to_idx]):
                msg = get_msg(site, 'Can not enumerate on a centroid atom.')
            else:
                msg = None
            if msg:
                raise InvalidInput(msg)
        mol_total = st.mol_total
        # delete bonds
        core_st, old_to_new = Sites.delete_substitution_site_bonds(st, sites)
        # validate molecule
        molecule_idxs = set(
            st.atom[site.to_idx].getMolecule().number for site in sites)
        if len(molecule_idxs) > mol_total:
            msg = (
                'At least a single site is invalid.  Sites should all be '
                '("from", "to") index pairs where the "from" index is kept and the '
                '"to" index replaced.')
            raise InvalidInput(msg)
        # validate core
        core_idxs = get_core_idxs(st)
        to_idx_is_core = any(site.to_idx in core_idxs for site in sites)
        if to_idx_is_core or not core_idxs.issubset(old_to_new.keys()):
            msg = ('At least a single site is invalid.  The core of the '
                   'molecule can not be modified.')
            raise InvalidInput(msg) 
[docs]    @staticmethod
    def getBinnedSites(sites):
        """
        Get the sites binned firstly by structure_idx and secondly by hash_idx.
        :type sites: list
        :param sites: contains `Site`
        :rtype: dict
        :return: keys are structure_idx, values are dicts whose keys
            are hash_idx and values are lists of `Site`
        """
        binned_sites = {}
        for site in sites:
            adict = binned_sites.setdefault(site.structure_idx, {})
            adict.setdefault(site.hash_idx, []).append(site)
        return binned_sites  
[docs]def get_RGE_sources(rgroup_sts, binned_sites, old_to_new):
    """
    Return a list of Source that is prepared for enumeration
    using the R-Group Enumeration module.
    :type rgroup_sts: list of `structure.Structure`
    :param rgroup_sts: the R-group structures
    :type binned_sites: list of lists of `Site`
    :param binned_sites: the binned sites
    :type old_to_new: dict
    :param old_to_new: a map of old-to-new atom indices
    :rtype: list
    :return: contains `Source`
    """
    sources = []
    for rgroup_st, sites in zip(rgroup_sts, binned_sites):
        site_idxs = [old_to_new[site.to_idx] for site in sites]
        sources.append(Source(rgroup_st=rgroup_st, site_idxs=site_idxs))
    return sources 
[docs]def substitute(st, rgroups_dict, sites_dict):
    """
    Return a copy of the given structure that has been substituted
    with the given R-groups at the given sites.
    :type st: `structure.Structure`
    :param st: the structure on which the substitution is performed
    :type rgroups_dict: dict
    :param rgroups_dict: keys are integer hashes relating to sites,
        values are `structure.Structure`
    :type sites_dict: dict
    :param sites_dict: keys are integer hashes relating to R-groups,
        values are lists of `Site`
    :raise InvalidInput: if there is an issue
    :rtype: `structure.Structure`
    :return: the substituted structure
    """
    # the rgroup_enumerate module automatically substitutes
    # so that the leaving group is the smaller group and
    # because we can't be certain of the input structure
    # and substitution sites we are going to first prepare
    # the structure by trimming after the substitution sites
    all_sites = [site for sites in sites_dict.values() for site in sites]
    st, old_to_tmp = Sites.delete_substitution_site_bonds(st, all_sites)
    # collect substitution sources
    rgroup_sts, binned_sites = [], []
    for hash_idx in sorted(sites_dict):
        rgroup_sts.append(rgroups_dict[hash_idx])
        binned_sites.append(sites_dict[hash_idx])
    sources = get_RGE_sources(rgroup_sts, binned_sites, old_to_tmp)
    sources = [[[source.rgroup_st]] + source.site_idxs for source in sources]
    # do the substitution
    try:
        obj = rgroup_enumerate.RgroupEnumerator(st,
                                                sources,
                                                optimize_sidechains=True,
                                                enumerate_cistrans=False,
                                                yield_renum_maps=True)
        out_st, tmp_to_new = next(iter(obj))
    except rgroup_enumerate.RgroupError as err:
        msg = ('The R-group enumeration has failed for structure '
               f'{st.title}.  ') + str(err)
        raise InvalidInput(msg)
    amap = dict([(k, tmp_to_new[v]) for k, v in old_to_tmp.items()])
    update_index_properties(out_st, amap)
    tokens = out_st.title.replace(' ', '').split(',')
    rgroup_title = RGROUP_SEPARATOR.join(tokens[1:])
    out_st.title = tokens[0] + '_' + rgroup_title
    return out_st 
[docs]class EnumerateReactionWorkflow:
    """
    Manage enumeration of a reaction workflow.
    """
[docs]    def __init__(self,
                 rxnwf_file,
                 rgroup_files,
                 sites,
                 force_hetero_substitution=False,
                 out_rep=None,
                 base_name=DEFAULT_ENUM_RXNWF_JOB_NAME,
                 logger=None):
        """
        Create an instance.
        :type rxnwf_file: str
        :param rxnwf_file: the reaction workflow file
        :type rgroup_files: dict
        :param rgroup_files: keys are hash_idx (see sites), values are file names
        :type sites: list
        :param sites: contains `Site`
        :type force_hetero_substitution: bool
        :param force_hetero_substitution: if True then for hetero-eumeration
            do not additionally include homo-enumeration results
        :param out_rep: if a string then must be either module constant
            parserutils.CENTROID or parserutils.ETA, if None then do nothing
        :type base_name: str
        :param base_name: the base name to use in naming the enumerated output files
        :type logger: logging.Logger
        :param logger: the logger
        """
        self.rxnwf_file = rxnwf_file
        self.rgroup_files = rgroup_files
        self.binned_sites = Sites.getBinnedSites(sites)
        self.force_hetero_substitution = force_hetero_substitution
        self.out_rep = out_rep
        self.base_name = base_name
        self.logger = logger
        self.binned_rgroup_sts = {}
        self.has_keep_atoms = False
        self.out_structure_files = []
        self.has_structure_specific_sites = True 
[docs]    def validate(self):
        """
        Validate.
        :raise InvalidInput: if there is an issue
        """
        # check reaction workflow input file
        try:
            check_reaction_wf_structures(self.rxnwf_file, ffld_name=None)
        except ValueError as err:
            msg = str(err) + ('  A valid reaction workflow file must '
                              'first be prepared.')
            raise InvalidInput(msg)
        # check consistency between hash indices coming from both r-groups and sites
        hash_idxs = set(hash_idx for adict in self.binned_sites.values()
                        for hash_idx in adict.keys())
        rgroup_files_tmp = self.rgroup_files.copy()
        for hash_idx in hash_idxs:
            rgroup_file = rgroup_files_tmp.pop(hash_idx, None)
            if not rgroup_file:
                msg = (f'The site enumeration index {hash_idx} does not '
                       'have a corresponding R-group file defined.')
                raise InvalidInput(msg)
        if rgroup_files_tmp:
            hash_idxs = ','.join(map(str, sorted(rgroup_files_tmp.keys())))
            msg = ('The following enumeration indices given for R-group '
                   f'files are not used by any sites: {hash_idxs}.')
            raise InvalidInput(msg)
        # check r-group files
        self.binned_rgroup_sts = {}
        for hash_idx, rgroup_file in self.rgroup_files.items():
            self.binned_rgroup_sts.setdefault(rgroup_file, []).append(hash_idx)
        self.binned_rgroup_sts = {
            tuple(v): k for k, v in self.binned_rgroup_sts.items()
        }
        nfiles = 1
        for hash_idxs, rgroup_file in self.binned_rgroup_sts.items():
            n_needed = len(hash_idxs)
            n_available = structure.count_structures(rgroup_file)
            if n_available < n_needed:
                msg = (
                    f'There are only {n_available} R-groups present in the '
                    f'file {rgroup_file} but {n_needed} different structures '
                    'are needed.')
                raise InvalidInput(msg)
            if self.force_hetero_substitution:
                nfiles *= int(
                    math.factorial(n_available) /
                    math.factorial(n_available - n_needed))
            else:
                nfiles *= n_available**n_needed
        # check sites format
        sites = [
            site for adict in self.binned_sites.values()
            for sites in adict.values() for site in sites
        ]
        Sites.validateSitesFormat(sites)
        # check sites structure index
        max_structure_idx = max(self.binned_sites.keys())
        n_structures = structure.count_structures(self.rxnwf_file)
        if max_structure_idx > n_structures:
            msg = (
                'At least a single site specifies a structure index of '
                f'{max_structure_idx} but there are only {n_structures} in {self.rxnwf_file}.'
            )
            raise InvalidInput(msg)
        # check sites
        for idx, st in enumerate(structure.StructureReader(self.rxnwf_file), 1):
            sites_dict = self.binned_sites.get(idx)
            if not sites_dict:
                continue
            sites = [site for sites in sites_dict.values() for site in sites]
            Sites.validateSitesData(sites, st)
            if sfu.get_idxs_marked_atoms(st, KEEP_ATOM_KEY):
                self.has_keep_atoms = True
        if self.logger:
            self.logger.info(f'A total of {nfiles} reaction workflow input '
                             'files will be created.')
            self.logger.info('')
        if nfiles > 500 and self.logger:
            self.logger.warning('Enumerating more than 500 files can be time '
                                'consuming.') 
[docs]    def setRGroupStructures(self):
        """
        Set the R-group structures.
        """
        idx = 0
        for hash_idxs in list(self.binned_rgroup_sts.keys()):
            rgroup_file = self.binned_rgroup_sts[hash_idxs]
            sts = []
            for jdx, st in enumerate(structure.StructureReader(rgroup_file), 1):
                if not st.title:
                    st.title = 'R' + str(idx + jdx)
                build.add_hydrogens(st)
                # if keep atoms are present then set all atoms in the R-group
                # as keep atoms as it doesn't make sense to enumerate and not
                # keep the enumerated result
                if self.has_keep_atoms:
                    for atom in st.atom:
                        atom.property[KEEP_ATOM_KEY] = 1
                sts.append(st)
            self.binned_rgroup_sts[hash_idxs] = sts
            idx += len(sts) 
[docs]    def setHasStructureSpecificSites(self):
        """
        Set has structure specific sites.
        """
        all_sites_list = []
        for sites_dict in self.binned_sites.values():
            sites_list = []
            for hash_idx in sorted(sites_dict):
                sites = [(site.from_idx, site.to_idx)
                         for site in sites_dict[hash_idx]]
                sites_list.append((hash_idx, tuple(sites)))
            all_sites_list.append(tuple(sites_list))
        n_structures = structure.count_structures(self.rxnwf_file)
        self.has_structure_specific_sites = not (len(all_sites_list)
                                                 == n_structures and
                                                 len(set(all_sites_list)) == 1) 
[docs]    def getStructures(self):
        """
        Generates structure dictionaries where keys are enumeration indices
        and values are structures.
        :rtype: dict
        :return: keys are enumeration indices, values are `structure.Structure`
        """
        if self.force_hetero_substitution:
            getter = lambda x, y: itertools.permutations(x, r=y)
        else:
            getter = lambda x, y: itertools.product(x, repeat=y)
        getters, all_hash_idxs = [], []
        for hash_idxs, sts in self.binned_rgroup_sts.items():
            getters.append(getter(sts, len(hash_idxs)))
            all_hash_idxs.extend(hash_idxs)
        for tuples in itertools.product(*getters):
            yield dict(
                zip(all_hash_idxs,
                    tuple(st for atuple in tuples for st in atuple))) 
[docs]    def doEnumeration(self):
        """
        Do the enumeration.
        """
        rgroup_titles_count_dict = {}
        for rgroup_sts_dict in self.getStructures():
            out_sts, rgroup_titles = [], []
            for idx, st in enumerate(structure.StructureReader(self.rxnwf_file),
                                     1):
                # handle no enumeration case
                sites_dict = self.binned_sites.get(idx)
                if not sites_dict:
                    out_sts.append(st)
                    continue
                # in order to ensure that any dummy centroid atoms remain
                # at the end of the atom list temporarily convert to eta
                changed = etatoggle.toggle_structure(st,
                                                     out_rep=parserutils.ETA)
                out_st = substitute(st, rgroup_sts_dict, sites_dict)
                if changed:
                    etatoggle.toggle_structure(out_st,
                                               out_rep=parserutils.CENTROID)
                out_sts.append(out_st)
                rgroup_titles.append(out_st.title.replace(st.title + '_', ''))
            if self.has_structure_specific_sites:
                rgroups = []
                for rgroup_title in rgroup_titles:
                    for rgroup in rgroup_title.split(RGROUP_SEPARATOR):
                        if rgroup not in rgroups:
                            rgroups.append(rgroup)
                rgroup_titles = RGROUP_SEPARATOR.join(rgroups)
            else:
                rgroup_titles = rgroup_titles[0]  # they are all they same
            rgroup_titles_count_dict.setdefault(rgroup_titles, []).append(1)
            count = len(rgroup_titles_count_dict[rgroup_titles])
            if count == 1:
                idx_str = ''
            else:
                idx_str = f'_{count}'
            if self.out_rep:
                for st in out_sts:
                    etatoggle.toggle_structure(st, out_rep=self.out_rep)
            file_name = f'{self.base_name}_{rgroup_titles}{idx_str}{REACTION_WF_TAG}.mae'
            structure.write_cts(out_sts, file_name)
            self.out_structure_files.append(file_name) 
[docs]    def run(self):
        """
        Run it.
        """
        self.validate()
        self.setRGroupStructures()
        self.setHasStructureSpecificSites()
        self.doEnumeration()
        job_be = jobcontrol.get_backend()
        if job_be or self.logger:
            for afile in self.out_structure_files:
                if job_be:
                    job_be.addOutputFile(afile)
                if self.logger:
                    msg = (f'Created enumerated file {afile}.')
                    self.logger.info(msg)  
[docs]def replace_rxnwf_file_suffix(rxnwf_file, suffix):
    """
    Replace the suffix in the given reaction workflow file
    with a new suffix.
    :type rxnwf_file: str
    :param rxnwf_file: the reaction workflow file
    :type suffix: str
    :param suffix: the new suffix
    :rtype: str
    :return: the new string
    """
    return rxnwf_file.replace(f'{REACTION_WF_TAG}.mae', suffix) 
[docs]class EnumerateSwapMixin:
    """
    Manage enumeration and swapping.
    """
[docs]    def runEnumerateRXNWF(self, tag):
        """
        Run enumerate reaction workflow.
        :type tag: str
        :param tag: either the REFERENCE or NOVEL module constant
        :raise ReactionWorkflowException: if there is an issue
        :rtype: set
        :return: the names of the enumerated reaction workflow files
        """
        assert tag in [REFERENCE, NOVEL]
        if tag == REFERENCE:
            sites = self.reference_sites
            rxnwf_file = self.reference_rxnwf_file
            # do not allow this type of hetero substitution because
            # it should not be needed
            force_hetero_substitution = False
        elif tag == NOVEL:
            sites = self.novel_sites
            rxnwf_file = self.novel_rxnwf_file
            force_hetero_substitution = self.force_hetero_substitution
        rgroup_files = {}
        for site in sites:
            rgroup_file = self.rgroup_files.get(site.hash_idx)
            if rgroup_file:
                rgroup_files[site.hash_idx] = rgroup_file
        if not (rgroup_files and sites):
            return set([rxnwf_file])
        if self.logger:
            self.logger.info(
                f'R-group enumerating the {tag} reaction workflow file.')
            self.logger.info('')
        out_rep = getattr(self, 'out_rep', None)
        base_name = replace_rxnwf_file_suffix(rxnwf_file, '')
        obj = EnumerateReactionWorkflow(
            rxnwf_file,
            rgroup_files,
            sites,
            force_hetero_substitution=force_hetero_substitution,
            out_rep=out_rep,
            base_name=base_name,
            logger=self.logger)
        try:
            obj.run()
        except InvalidInput as err:
            raise ReactionWorkflowException(str(err))
        if self.logger:
            self.logger.info('')
        return set(obj.out_structure_files) 
[docs]    def runSwapFragments(self, enumerated_novel_files, reference_rxnwf_file):
        """
        Run swap fragments.
        :type enumerated_novel_files: set
        :param enumerated_novel_files: the names of the enumerated novel
            files
        :type reference_rxnwf_file: str
        :param reference_rxnwf_file: the reference reaction workflow file
        :raise ReactionWorkflowException: if there is an issue
        :rtype: set
        :return: the names of the enumerated reaction workflow files
        """
        if self.logger:
            self.logger.info(
                f'Swapping fragments in {reference_rxnwf_file} with each '
                'of the available novel fragments.')
            self.logger.info('')
        enumerated_rxnwf_files = set()
        for enumerated_novel_file in enumerated_novel_files:
            novel_st = structure.Structure.read(enumerated_novel_file)
            novel_keep_idxs = sfu.get_keep_idxs(novel_st)
            novel_superposition_idxs = sfu.get_superposition_idxs(novel_st)
            enumerated_rxnwf_file = replace_rxnwf_file_suffix(
                enumerated_novel_file, '_' + reference_rxnwf_file)
            enumerated_rxnwf_files.add(enumerated_rxnwf_file)
            group_name = replace_rxnwf_file_suffix(enumerated_rxnwf_file, '')
            with structure.StructureWriter(enumerated_rxnwf_file) as writer:
                for reference_st in structure.StructureReader(
                        reference_rxnwf_file):
                    reference_keep_idxs = sfu.get_keep_idxs(reference_st)
                    reference_superposition_idxs = sfu.get_superposition_idxs(
                        reference_st)
                    # only superpose relevant structures and copy the others (for
                    # example spectators)
                    if reference_keep_idxs and reference_superposition_idxs:
                        title = novel_st.title + '_' + reference_st.title
                        # in order to ensure that any dummy centroid atoms remain
                        # at the end of the atom list temporarily convert to eta
                        novel_changed = etatoggle.toggle_structure(
                            novel_st, out_rep=parserutils.ETA)
                        reference_changed = etatoggle.toggle_structure(
                            reference_st, out_rep=parserutils.ETA)
                        try:
                            st = sfu.get_assembled_structure(
                                novel_st,
                                reference_st,
                                novel_superposition_idxs,
                                reference_superposition_idxs,
                                novel_keep_idxs,
                                reference_keep_idxs,
                                title=title,
                                require_identical_bonds=False)
                        except sfu.SwapFragmentsException as err:
                            raise ReactionWorkflowException(str(err))
                        if novel_changed or reference_changed:
                            etatoggle.toggle_structure(
                                st, out_rep=parserutils.CENTROID)
                        # for the purposes of running the reaction workflow in the next
                        # step of this workflow force the eid to be consistent with the
                        # reference as it will be used in the underlying Jaguar multistage
                        # workflow
                        st.property[
                            msprops.ENTRY_ID_PROP] = reference_st.property[
                                msprops.ENTRY_ID_PROP]
                    else:
                        st = reference_st
                    # update Maestro grouping hierarchy
                    groups = msutils.get_project_group_hierarchy(
                        st=reference_st)
                    groups[0] = group_name
                    msutils.set_project_group_hierarchy(st, groups)
                    # handle output representation
                    out_rep = getattr(self, 'out_rep', None)
                    if out_rep:
                        etatoggle.toggle_structure(st, out_rep=out_rep)
                    writer.append(st)
        return enumerated_rxnwf_files 
 
[docs]class EnumerateSwap(EnumerateSwapMixin):
    """
    Manage enumeration and swapping.
    """
[docs]    def __init__(self,
                 reference_rxnwf_file,
                 novel_rxnwf_file=None,
                 rgroup_files=None,
                 reference_sites=None,
                 novel_sites=None,
                 force_hetero_substitution=False,
                 logger=None):
        """
        Create an instance.
        :type reference_rxnwf_file: str
        :param reference_rxnwf_file: the reaction workflow file containing the reference
            structures
        :type novel_rxnwf_file: str
        :param novel_rxnwf)file: the reaction workflow file containing the single
            novel structure
        :type rgroup_files: dict
        :param rgroup_files: keys are hash_idx (see sites), values are file names
        :type reference_sites: list
        :param reference_sites: contains `Site` for the reference structures
        :type novel_sites: list
        :param novel_sites: contains `Site` for the novel structure
        :type force_hetero_substitution: bool
        :param force_hetero_substitution: if True then for hetero-eumeration
            do not additionally include homo-enumeration results
        :type logger: logging.Logger or None
        :param logger: output logger or None if there isn't one
        """
        self.reference_rxnwf_file = reference_rxnwf_file
        self.novel_rxnwf_file = novel_rxnwf_file
        self.rgroup_files = rgroup_files
        self.reference_sites = reference_sites
        self.novel_sites = novel_sites
        self.force_hetero_substitution = force_hetero_substitution
        self.logger = logger 
[docs]    def run(self):
        """
        Run it.
        :rtype: set
        :return: all reaction workflow input files
        """
        return self.getRXNWFInputFiles()  
[docs]def enumerate_swap(reference_rxnwf_file,
                   novel_rxnwf_file=None,
                   rgroup_files=None,
                   reference_sites=None,
                   novel_sites=None,
                   force_hetero_substitution=False):
    """
    Return all reaction workflow files created by R-group enumerating the given
    reference and novel structures and swapping reference fragments for novel
    fragments.
    :type reference_rxnwf_file: str
    :param reference_rxnwf_file: the reaction workflow file containing the reference
        structures
    :type novel_rxnwf_file: str
    :param novel_rxnwf)file: the reaction workflow file containing the single
        novel structure
    :type rgroup_files: dict
    :param rgroup_files: keys are hash_idx (see sites), values are file names
    :type reference_sites: list
    :param reference_sites: contains `Site` for the reference structures
    :type novel_sites: list
    :param novel_sites: contains `Site` for the novel structure
    :type force_hetero_substitution: bool
    :param force_hetero_substitution: if True then for hetero-eumeration
        do not additionally include homo-enumeration results
    :rtype: set
    :return: all reaction workflow input files
    """
    eswap = EnumerateSwap(reference_rxnwf_file,
                          novel_rxnwf_file=novel_rxnwf_file,
                          rgroup_files=rgroup_files,
                          reference_sites=reference_sites,
                          novel_sites=novel_sites,
                          force_hetero_substitution=force_hetero_substitution)
    return eswap.run() 
[docs]def get_sites(sites):
    """
    Return the reference and novel sites.
    :type sites: list
    :param sites: contains for each site a list of data [from_idx, to_idx,
        hash_idx] with an optional fourth item structure_idx and an optional
        fifth item reaction_workflow_utils.REFERENCE or reaction_workflow_utils.NOVEL
        module constants
    :raise ReactionWorkflowException: if there is an issue
    :rtype: list, list
    :return: the reference and novel sites, each containing `rxnwfu.Site`
    """
    reference_sites, novel_sites = [], []
    for site in sites:
        items = []
        for item in site:
            try:
                items.append(int(item))
            except ValueError:
                items.append(item)
        if len(items) == 5:
            if items[-1] == REFERENCE:
                reference_sites.append(items[:-1])
            elif items[-1] == NOVEL:
                if items[-2] != 1:
                    msg = (
                        f'Sites designated as "{NOVEL}" must specify a single '
                        'structure index with a value of 1.')
                    raise ReactionWorkflowException(msg)
                novel_sites.append(items[:-2])
            else:
                msg = (f'Sites can only be designated as "{REFERENCE}" or '
                       f'"{NOVEL}".')
                raise ReactionWorkflowException(msg)
        elif len(items) == 4:
            if items[-1] != 1:
                msg = (f'Sites designated as "{NOVEL}" must specify a single '
                       'structure index with a value of 1.')
                raise ReactionWorkflowException(msg)
            novel_sites.append(items[:-1])
        elif len(items) == 3:
            novel_sites.append(items)
        else:
            msg = (
                'Sites must specify three indices with an optional fourth index '
                f'plus an optional fifth item, either "{REFERENCE}" or '
                f'"{NOVEL}".')
            raise ReactionWorkflowException(msg)
    reference_sites = Sites.getSites(reference_sites)
    novel_sites = Sites.getSites(novel_sites)
    return reference_sites, novel_sites 
[docs]def validate_auto_reaction_workflow_files(reaction_wf_input,
                                          novel_rxnwf_file=None,
                                          mass_conserved=False,
                                          sites=None):
    """
    Validate workflow input file, novel reaction workflow file
    :type reaction_wf_input: str
    :param reaction_wf_input: reference reaction workflow file name
    :type novel_rxnwf_file: str
    :param novel_rxnwf_file: the reaction workflow file containing the single
        novel structure
    :type mass_conserved: bool
    :param mass_conserved: True if mass is conserved else false
    :type sites: list
    :param sites: contains for each site a list of data [from_idx, to_idx,
        hash_idx] with an optional fourth item structure_idx and an optional
        fifth item reaction_workflow_utils.REFERENCE or reaction_workflow_utils.NOVEL
        module constants
    :raise ReactionWorkflowException: if there is an issue
    """
    # check reference rxnwf file
    type_cast_reaction_wf_input(reaction_wf_input,
                                exception_type=ReactionWorkflowException,
                                mass_conserved=mass_conserved)
    # check novel rxnwf file
    if novel_rxnwf_file and structure.count_structures(novel_rxnwf_file) > 1:
        msg = (
            'The novel reaction workflow file must contain a single structure.')
        raise ReactionWorkflowException(msg)
    # Validate sites
    if sites:
        get_sites(sites) 
[docs]def has_keep_and_superpose_atoms(input_st):
    """
    Return True if the given structure has at least a single keep atom
    and a single superpose atom.
    :type input_st: `structure.Structure`
    :param input_st: the structure
    :rtype: bool
    :return: True if the structure has at least a single keep atom
        and a single superpose atom
    """
    # if there are superpose atoms the requirement that there be at
    # least three of them is enforced in the Reaction Workflow Input GUI
    # which prepares the input here
    has_keep = has_superpose = False
    for atom in input_st.atom:
        if atom.property.get(KEEP_ATOM_KEY):
            has_keep = True
        if atom.property.get(SUPERPOSITION_ATOM_KEY):
            has_superpose = True
        if has_keep and has_superpose:
            return True
    return False 
[docs]def validate_auto_reaction_workflow_options(
        reaction_wf_input,
        novel_rxnwf_file=None,
        n_conformers=DEFAULT_N_CONFORMERS,
        jaguar_keywords=DEFAULT_JAGUAR_KEYWORDS):
    """
    Validate auto reaction workflow options.
    :type reaction_wf_input: str
    :param reaction_wf_input: reference reaction workflow file name
    :type novel_rxnwf_file: str
    :param novel_rxnwf_file: the reaction workflow file containing the single
        novel structure
    :type n_conformers: int
    :param n_conformers: number of conformers to search for
    :type jaguar_keywords: str
    :param jaguar_keywords: whitespace separated Jaguar <key>=<value> tokens
    :raise ReactionWorkflowException: if there is an issue
    """
    try:
        sts = list(structure.StructureReader(reaction_wf_input))
    except IOError as err:
        raise ReactionWorkflowException(str(err))
    if novel_rxnwf_file:
        # check references
        for st in sts:
            if has_keep_and_superpose_atoms(st):
                break
        else:
            msg = (
                'At least a single reference structure must contain atoms marked with '
                f'the "{KEEP_ATOM_KEY}" and "{SUPERPOSITION_ATOM_KEY}" '
                'properties which specify how to swap fragments with the novel '
                'structure.')
            raise ReactionWorkflowException(msg)
        # check novel
        if structure.count_structures(novel_rxnwf_file) > 1:
            msg = (
                'The novel reaction workflow file must contain a single structure.'
            )
            raise ReactionWorkflowException(msg)
        novel_st = structure.Structure.read(novel_rxnwf_file)
        if not has_keep_and_superpose_atoms(novel_st):
            msg = (
                'The novel structure must contain atoms marked with the '
                f'"{KEEP_ATOM_KEY}" and "{SUPERPOSITION_ATOM_KEY}" '
                'properties which specify how to swap fragments with the reference '
                'structures.')
            raise ReactionWorkflowException(msg)
        sts.append(novel_st)
    # check FF
    if n_conformers:
        try:
            check_ff_assignment(sts, ffld_name=DEFAULT_FORCE_FIELD_NAME)
        except ValueError as err:
            raise ReactionWorkflowException(str(err))
    # check Jaguar kwargs
    type_cast_jaguar_keywords(jaguar_keywords,
                              exception_type=ReactionWorkflowException) 
[docs]def validate_all_auto_reaction_workflow_options(
        reaction_wf_input,
        novel_rxnwf_file=None,
        mass_conserved=False,
        sites=None,
        n_conformers=5,
        jaguar_keywords=DEFAULT_JAGUAR_KEYWORDS):
    """
    Validate all auto reaction workflow options or raise
    ReactionWorkflowException error.
    :type reaction_wf_input: str
    :param reaction_wf_input: reference reaction workflow file name
    :type novel_rxnwf_file: str
    :param novel_rxnwf_file: the reaction workflow file containing the single
        novel structure
    :type mass_conserved: bool
    :param mass_conserved: True if mass is conserved else false
    :type sites: list
    :param sites: contains for each site a list of data [from_idx, to_idx,
        hash_idx] with an optional fourth item structure_idx and an optional
        fifth item reaction_workflow_utils.REFERENCE or reaction_workflow_utils.NOVEL
        module constants
    :type n_conformers: int
    :param n_conformers: number of conformers to search for
    :type jaguar_keywords: str
    :param jaguar_keywords: whitespace separated Jaguar <key>=<value> tokens
    :raise rxnwfu.ReactionWorkflowException: if there is an issue
    """
    validate_auto_reaction_workflow_files(reaction_wf_input=reaction_wf_input,
                                          novel_rxnwf_file=novel_rxnwf_file,
                                          mass_conserved=mass_conserved,
                                          sites=sites)
    validate_auto_reaction_workflow_options(reaction_wf_input=reaction_wf_input,
                                            novel_rxnwf_file=novel_rxnwf_file,
                                            n_conformers=n_conformers,
                                            jaguar_keywords=jaguar_keywords) 
[docs]def get_smiles(st):
    """
    Return the smiles of the given metal complex.
    :type st: `schrodinger.structure.Structure`
    :param st: the metal complex
    :raise: (`rdkit_adapter.InconsistentStructureError`,
        `rdkit_adapter.UnsupportedStructureError`)
    :rtype: str
    :return: smiles
    """
    # smiles generation requires centroid representation
    _st = st.copy()
    etatoggle.toggle_structure(_st, out_rep=parserutils.CENTROID)
    return adapter.to_smiles(_st, adapter.StereoChemistry_Copy,
                             adapter.Canonicalize_Enable,
                             adapter.Sanitize_Enable) 
[docs]def bin_by_geometry(sts, eps=parserutils.DEFAULT_DEDUP_GEOM_EPS):
    """
    Return a dictionary of structures binned by common geometry.
    :type sts: list[`schrodinger.structure.Structure`]
    :param sts: the structures
    :type eps: float
    :param eps: the RMSD precision in Angstrom that controls the size
        of the clusters, see sklearn.cluster.DBSCAN documentation for
        more details
    :rtype: dict
    :return: keys are representative structures from each geometry bin,
        values are lists of other structures in the same geometry bin
    """
    # while SMILES allows atomic charges they do not allow total charge
    # nor total multiplicity (nor atomic multiplicity) so generalize the
    # hash
    hash_dict = {}
    for st in sts:
        smiles = get_smiles(st)
        charge = st.property.get(CHARGE_KEY, 0)
        multip = st.property.get(MULTIPLICITY_KEY, 1)
        ahash = f'{smiles}_charge_{charge}_multip_{multip}'
        hash_dict.setdefault(ahash, []).append(st)
    st_dict = {}
    for conformers in hash_dict.values():
        ref_st = conformers[0]  # choose first conformer as the reference
        idxs = list(range(1, ref_st.atom_total + 1))
        key = lambda x: rmsd.superimpose(
            ref_st, idxs, x, idxs, use_symmetry=False, move_which=rmsd.CT)
        groups = mathutils.dbscan_cluster(conformers, eps, key=key)
        for group in groups.values():
            ref_st = group[0]  # choose first geometry as the reference
            st_dict[ref_st] = group[1:]
    return st_dict 
[docs]def get_orig_title(title, unique_geom_dict):
    """
    Return the original title of the given title.
    :type title: str
    :param title: the title
    :type unique_geom_dict: dict
    :param unique_geom_dict: keys are original titles, values are structures
        in the same geometry bin, with the representative structure at
        index 0
    :rtype: str, str
    :return: the original title and the extension
    """
    # titles come in like the following
    #
    # 'my_reactant' (structure title directly from user input for the job)
    #
    # 'my_reactant_stage_<x>' where x > 0, if the stage is an analysis stage
    #     then there will be a trailing '_analysis_<y>' where y > 0 is the parent
    #     stage whose output structure is used to store the analysis property,
    #     (structure title directly from JMSWF output)
    #
    # 'my_reactant-<x>_stage_<y>' where x > 1 is the conformer number, otherwise
    #     same as above (structure title directly from JMSWF output or structure
    #     JMSWF output file base name)
    #
    # 'my_reactant-<x>' where x > 1 is the conformer number, (structure title
    #     after postprocessing JMSWF)
    #
    # 'my_reactant-<x>_stage_<y>_anharmonic' where x > 1 is the conformer number,
    #     otherwise same as above (structure anharmonic output file base name)
    #
    # where 'my_reactant' has been jobutils.clean_string'd
    # see MATSCI-11362 - we need to distinguish whether titles like 'my_reactant-2'
    # were original titles or those generated by the conformational search which
    # uses an '-<idx>' convention
    orig_titles = set()
    for orig_sts in unique_geom_dict.values():
        orig_titles.update(
            [jobutils.clean_string(orig_st.title) for orig_st in orig_sts])
    if title in orig_titles:
        return title, ''
    if STAGE_SEPARATOR in title:
        conf_title, stage_hash = title.split(STAGE_SEPARATOR)
        stage_hash = f'{STAGE_SEPARATOR}{stage_hash}'
    else:
        conf_title, stage_hash = title, ''
    match = re.search(r'(-\d+)$', conf_title)
    if match:
        ext = match.group(1)
        orig_title = conf_title.rpartition(ext)[0]
    else:
        ext = ''
        orig_title = conf_title
    return orig_title, f'{ext}{stage_hash}' 
[docs]def get_unique_geom_title(title, unique_geom_dict):
    """
    Return the title of the unique geometry representative for the
    given title.
    :type title: str
    :param title: the title
    :type unique_geom_dict: dict
    :param unique_geom_dict: keys are original titles, values are structures
        in the same geometry bin, with the representative structure at
        index 0
    :rtype: str
    :return: the title of the representative
    """
    orig_title, ext = get_orig_title(title, unique_geom_dict)
    if not unique_geom_dict:
        return title
    elif unique_geom_dict.get(orig_title):
        return title
    else:
        for rep_title, orig_sts in unique_geom_dict.items():
            orig_st, orig_other_sts = orig_sts[0], orig_sts[1:]
            for orig_other_st in orig_other_sts:
                if orig_other_st.title == orig_title:
                    return f'{rep_title}{ext}' 
[docs]def get_energy_db_term(rxnwf_file_path=None,
                       rxnwf_sts=None,
                       tstart=jmswfu.DEFAULT_TEMP_START,
                       tstep=jmswfu.DEFAULT_TEMP_STEP,
                       ntemp=jmswfu.DEFAULT_TEMP_N,
                       pstart=jmswfu.DEFAULT_PRESS_START,
                       pstep=jmswfu.DEFAULT_PRESS_STEP,
                       npress=jmswfu.DEFAULT_PRESS_N,
                       anharm=False,
                       extra_stages_file=False,
                       **kwargs):
    """
    Get a dict containing all the energy term generated by reaction workflow
    :type rxnwf_file_path: str or None
    :param rxnwf_file_path: the path to a reaction workflow file
    :type rxnwf_sts: list[`schrodinger.structure.Structure`] or None
    :param rxnwf_sts: reaction workflow structures
    :type tstart: float
    :param tstart: the starting temperature (K)
    :type tstep: float
    :param tstep: the step size of the temperature (K)
    :type ntemp: int
    :param ntemp: the number of temperature points
    :type pstart: float
    :param pstart: the starting pressure (atm)
    :type pstep: float
    :param pstep: the step size of the pressure (atm)
    :type npress: int
    :param npress: the number of pressure points
    :type anharm: bool
    :param anharm: True if anharmonic calculation is performed else False
    :type extra_stages_file: string
    :param extra_stages_file: name of stage file
    :rtype energy_term_dict: dict
    :return: dictionary of energy keys.
    """
    msg = ('Both input file and input structures are present or absent. '
           'Please specify either input file or input structures')
    assert bool(rxnwf_file_path) ^ bool(rxnwf_sts), msg
    rate_constants = kwargs.pop('rate_constants', False)
    custom_rate_constants = kwargs.pop('custom_rate_constants', False)
    if rxnwf_file_path:
        rxnwf_sts = list(structure.StructureReader(rxnwf_file_path))
    keywords = {
        'ifreq': 1,
        'tmpini': tstart,
        'tmpstp': tstep,
        'ntemp': ntemp,
        'press': pstart,
        'press_step': pstep,
        'npress': npress
    }
    not_include = (jaguarworkflows.HOMO_ENERGY_PROP,
                   jaguarworkflows.ALPHA_HOMO_ENERGY_PROP,
                   jaguarworkflows.BETA_HOMO_ENERGY_PROP,
                   jaguarworkflows.LUMO_ENERGY_PROP,
                   jaguarworkflows.ALPHA_LUMO_ENERGY_PROP,
                   jaguarworkflows.BETA_LUMO_ENERGY_PROP,
                   jaguarworkflows.ENTROPY_STARTER)
    # Collect all key info from Jaguar
    keys = jmswfu.get_property_keys_from_keywords(keywords)
    keys.append(jaguarworkflows.GAS_PHASE_ZERO_POINT_ENERGY_PROP)
    # Create dict to collect info
    energy_term_dict = {}
    energy_term_dict.setdefault(ENERGY_KEYS, {})
    energy_term_dict.setdefault(ANHARMONIC_ENERGY_KEYS, {})
    energy_term_dict.setdefault(CUSTOM_ENERGY_KEYS, {})
    # Collect energy keys
    stage_idx = get_rep_stage_idx(rxnwf_sts=rxnwf_sts)
    energy_terms = [
        key + STAGE_SEPARATOR + str(stage_idx)
        for key in keys
        if not key.startswith(not_include)
    ]
    for term in energy_terms:
        key = ReactionWorkflowEnergyAnalysis.getHeader(term)
        energy_term_dict[ENERGY_KEYS][key] = term
    # Collect anharmonic keys
    if anharm:
        not_include_anharmonic_terms = (
            jaguarworkflows.ZERO_POINT_ENERGY_PROP,
            jaguarworkflows.GAS_PHASE_ENERGY_PROP,
            jaguarworkflows.GAS_PHASE_ZERO_POINT_ENERGY_PROP)
        filtered_terms = [
            key for key in energy_terms
            if not key.startswith(not_include_anharmonic_terms)
        ]
        for term in filtered_terms:
            anharmonic_energy_term = anharmonic.KEY_STARTER + "_".join(
                term.split("_")[2:])
            key = ReactionWorkflowEnergyAnalysis.getHeader(
                anharmonic_energy_term)
            energy_term_dict[ANHARMONIC_ENERGY_KEYS][
                key] = anharmonic_energy_term
    # Collect custom key
    if extra_stages_file:
        press_data = PressData(press_start=pstart,
                               press_step=pstep,
                               press_n=npress)
        temp_data = TempData(temp_start=tstart, temp_step=tstep, temp_n=ntemp)
        pressures, temps = get_pressures_temps(press_data, temp_data)
        extra_stages = jmswfu.read_stage_datafile(extra_stages_file)
        for extra_stage in extra_stages:
            if not extra_stage.analyze_data:
                continue
            for analyze_data in extra_stage.analyze_data:
                # if using wildcards collect keys over the entire pressure and
                # temperature matrix, doing this for a single line is sufficient
                # hence the break
                if jaguarworkflows.ALL_TEMP_PRESS_KEY_EXT in analyze_data.parent_key:
                    keys = []
                    for press, temp in itertools.product(pressures, temps):
                        ext = jaguarworkflows.get_temp_press_key_ext(
                            temp, press)
                        keys.append(f'{analyze_data.key}{ext}')
                    break
            else:
                keys = [extra_stage.analyze_data[0].key]
            for key in keys:
                term = f'{key}{STAGE_SEPARATOR}{stage_idx + extra_stage.index - 1}'
                key = ReactionWorkflowEnergyAnalysis.getHeader(term)
                energy_term_dict[CUSTOM_ENERGY_KEYS][key] = term
    if rate_constants:
        if custom_rate_constants:
            value = RATE_CONSTANT_STRING
        else:
            value = CONF_AVG_CTST_RATE_CONSTANT
    else:
        value = None
    energy_term_dict[RATE_CONSTANT_KEY] = value
    return energy_term_dict 
[docs]class SiblingNode:
    """
    Contains parent-child information for a sibling group as well as features
    for creating an energy diagram
    """
    LINE_LENGTH = 1
    X_GAP = LINE_LENGTH / 2
[docs]    def __init__(self, name, axes):
        """
        :param str name: The name of the sibling group
        :param `matplotlib.axes.Axes` axes: The plot's axes object
        """
        self.name = name
        self.axes = axes
        self.generation = None
        self.csv_idx = None
        self.color = None
        self.parents = []
        self.children = [] 
[docs]    def addToPlot(self, y_val, energy, text_offset):
        """
        Add this sibling group to the plot
        :param float y_val: The y-value of the sibling on the plot
        :param float energy: The energy to show on the label
        :param float text_offset: The offset for showing the label above
            horizontal lines
        """
        self.y_val = y_val
        x_mid = (self.X_GAP + self.LINE_LENGTH) * self.generation
        x_1 = x_mid - self.LINE_LENGTH / 2
        self.x_2 = x_mid + self.LINE_LENGTH / 2
        color = self.color or 'k'
        self.axes.plot((x_1, self.x_2), (y_val, y_val),
                       linewidth=3,
                       color=color)
        label_text = self.name + ': {0:.2f}'.format(energy)
        self.label = self.axes.text(x_mid,
                                    y_val + text_offset,
                                    label_text,
                                    ha='center',
                                    size='small')
        color = self.color or '0.75'
        for parent in self.parents:
            self.axes.plot((x_1, parent.x_2), (y_val, parent.y_val),
                           color=color)  
[docs]class EnergyDiagramPlotter:
    """
    Class for exporting energy level diagrams
    """
    TOTAL_FREE_ENERGY = 'Total_Free_Energy'
    TEXT_OFFSET_DIVISOR = 50
    VERTICAL_SPACING_DIVISOR = 23
    PDF_FILE_ENDING = '_e_diagrams.pdf'
    PNG_FILE_ENDING = '_std_e_diagram.png'
[docs]    def __init__(self, json_path, csv_path, logger=None):
        """
        :param str json_path: Path to _conf_avg_wo_x_rel_parents.json file
        :param str csv_path: Path to _conf_avg_wo_x_rel_reactants.csv file
        """
        # We need the csv in addition to the json because the former contains
        # energies relative to reactants while the latter contains the needed
        # parenting information
        self.json_path = json_path
        self.csv_path = csv_path
        self.logger = logger 
[docs]    def run(self):
        """
        Export the energy level diagrams
        """
        # Agg rather than QTAgg is supposed to be default backend (MATSCI-9819)
        # to prevent requiring graphics libraries in case running on a remote compute host
        import matplotlib
        from matplotlib import figure
        from matplotlib.backends import backend_pdf
        from matplotlib.backends.backend_agg import FigureCanvasAgg
        self.color_map = matplotlib.cm.get_cmap('gist_rainbow')
        fig = figure.Figure(figsize=(10, 7.5), dpi=200)
        self.canvas = FigureCanvasAgg(fig)
        self.axes = fig.add_subplot(111)
        fig.subplots_adjust(left=0.05, right=0.95)
        # Need to read the sibling names as str otherwise 000 will change to 0
        # and it won't match the json file anymore (MATSCI-11196)
        self.ene_df = pandas.read_csv(self.csv_path,
                                      dtype={SIBLING_GROUP_NAME: str})
        self.findTotalEnergyProp()
        self.parseSiblings()
        job_name = jobutils.get_jobname(DEFAULT_JOB_NAME)
        pdf_path = job_name + self.PDF_FILE_ENDING
        e_png_path = job_name + self.PNG_FILE_ENDING
        with backend_pdf.PdfPages(pdf_path) as pdf_pages:
            for prop in self.ene_df.columns[1:]:
                self.plotDiagram(prop)
                pdf_pages.savefig(fig)
                if prop == self.tot_ene_prop:
                    self.canvas.print_figure(e_png_path)
        jobutils.add_outfile_to_backend(pdf_path)
        jobutils.add_outfile_to_backend(e_png_path) 
[docs]    def findTotalEnergyProp(self):
        """
        Find the Total Free Energy property with temperature and pressure
        closest to 298.15 K and 1.0 atm
        """
        def get_closest_key(the_dict, value):
            keys = list(the_dict.keys())
            array = numpy.array(keys)
            idx = (numpy.abs(array - value)).argmin()
            return keys[idx]
        props = self.ene_df.columns[1:]
        tot_ene_props = [p for p in props if self.TOTAL_FREE_ENERGY in p]
        temp_press_dict = defaultdict(dict)
        for prop in tot_ene_props:
            temp = jaguarworkflows.get_temperature(prop)
            press = jaguarworkflows.get_pressure(prop)
            temp_press_dict[temp][press] = prop
        target_temp = get_closest_key(temp_press_dict, 298.15)
        target_press = get_closest_key(next(iter(temp_press_dict.values())),
                                       1.0)
        self.tot_ene_prop = temp_press_dict[target_temp][target_press] 
[docs]    def getSibling(self, name):
        """
        Get an existing sibling or create a new one and return it
        :param str name: The name of the sibling
        :rtype: `SiblingNode`
        :return: A SiblingNode object
        """
        if name in self.siblings:
            return self.siblings[name]
        else:
            sibling = SiblingNode(name, self.axes)
            self.siblings[name] = sibling
            return sibling 
[docs]    def parseSiblings(self):
        """
        Read the siblings and parent child info from json file and find
        the generation of each sibling
        """
        with open(self.json_path) as json_fh:
            json_data = json.load(json_fh)
        self.siblings = {}
        for sibling_name, parents_data in json_data.items():
            child = self.getSibling(sibling_name)
            for parent_name in parents_data:
                parent = self.getSibling(parent_name)
                parent.children.append(child)
                child.parents.append(parent)
        # Find indexes of each sibling in the csv file
        for idx, name in enumerate(self.ene_df[SIBLING_GROUP_NAME]):
            try:
                self.siblings[name].csv_idx = idx
            except KeyError:
                error = (f'Cannot find "{name}" sibling group in '
                         f'{os.path.basename(self.csv_path)}.')
                raise ReactionWorkflowException(error)
        self.findGenerations()
        self.findSameGenerationIndices()
        self.setColors() 
[docs]    def setColors(self):
        """
        Set colors for paths that have one parent and one child for all nodes,
        excluding the reactant
        """
        node_paths = []
        for first_gen in self.generations[1]:
            path = []
            node = first_gen
            while True:
                if len(node.parents) > 1 or len(node.children) > 1:
                    path.clear()
                    break
                if len(node.children) == 0:
                    # last node. Wrap up.
                    path.append(node)
                    break
                path.append(node)
                node = node.children[0]
            if path:
                node_paths.append(path)
        num_paths = len(node_paths)
        for idx, path in enumerate(node_paths):
            color = self.color_map(idx / num_paths)
            for node in path:
                node.color = color 
[docs]    def findGenerations(self):
        """
        Determine the generations of the siblings
        Each child's generation is the largest generation of its parents, plus 1
        which means that if we have the following ownerships:
        1 -> 2 -> 3 -> 4
        1 -> 5 -> 4
        "4" will be a 4th generation and "5" will be a 2nd generation, so the
        plot will look like this:
        1 -- 2 -- 3 -- 4
        1 -- 5 ------- 4
        as opposed to
        1 ------- 5 -- 4
        """
        def set_generation(sibling, generation):
            if sibling.generation is not None:
                if generation <= sibling.generation:
                    # Nothing to do
                    return
            sibling.generation = generation
            for child in sibling.children:
                set_generation(child, generation + 1)
        source = [x for x in self.siblings.values() if not x.parents][0]
        set_generation(source, 0)
        self.generations = defaultdict(list)
        for sibling in self.siblings.values():
            self.generations[sibling.generation].append(sibling) 
[docs]    def findSameGenerationIndices(self):
        """
        Find groups of csv indices that belong to the same generation
        """
        self.same_gen_indices = []
        for idx in range(1, max(self.generations) + 1):
            self.same_gen_indices.append(
                [sibling.csv_idx for sibling in self.generations[idx]]) 
[docs]    def plotDiagram(self, prop):
        """
        Plot the energy diagram for the passed property
        """
        self.axes.clear()
        self.axes.axis('off')
        self.axes.autoscale(axis="y")
        self.canvas.figure.suptitle(prop)
        disclaimer = ('Lines and labels are moved vertically to avoid overlaps.'
                      ' The Y-axis does not have a uniform scale.')
        self.axes.text(0.5,
                       -0.05,
                       disclaimer,
                       size=6,
                       ha="center",
                       transform=self.axes.transAxes)
        y_vals = self.getAdjustedYValues(prop)
        text_offset = self.ene_df[prop].values.ptp() / self.TEXT_OFFSET_DIVISOR
        for idx in range(max(self.generations) + 1):
            for sibling in self.generations[idx]:
                csv_index = sibling.csv_idx
                sibling.addToPlot(y_val=y_vals[csv_index],
                                  energy=self.ene_df[prop][csv_index],
                                  text_offset=text_offset)
        self.canvas.draw()
        self.labels = [sibling.label for sibling in self.siblings.values()]
        self.adjustLabels() 
[docs]    def adjustLabels(self):
        """
        Adjust the size of labels. If a label's width is large enough to overlap
        with a neighbour's label, reduce the font and redraw
        """
        adjust = False
        max_width = SiblingNode.LINE_LENGTH + SiblingNode.X_GAP
        for label in self.labels:
            extents = label.get_window_extent()
            width = extents.inverse_transformed(self.axes.transData).width
            if width > max_width:
                adjust = True
                break
        if adjust:
            for label in self.labels:
                label.set_size(label.get_size() - 0.5)
            self.canvas.draw()
            self.adjustLabels() 
[docs]    def getAdjustedYValues(self, prop):
        """
        Adjust the y values so that there is enough gap between all consecutive
        values in the same generation so there is no overlap.
        :param str prop: The property to adjust values for
        :rtype: numpy.array
        :return: The adjusted Y values as a numpy array
        """
        divisor = self.VERTICAL_SPACING_DIVISOR
        values = self.ene_df[prop].values.copy()
        base_range = values.ptp()
        # The required gap is a fixed portion of the plot size.
        required_diff = base_range / divisor
        last_num_raised = 0
        while True:
            num_raised = 0
            idx_to_deficit = {}
            for indices in self.same_gen_indices:
                sorted_indices = sorted(indices, key=lambda x: values[x])
                generation_values = values[sorted_indices]
                generation_diffs = numpy.diff(generation_values)
                for diff_idx, diff in enumerate(generation_diffs):
                    if diff < required_diff:
                        # Not enough room. All values above need to be raised
                        num_raised += 1
                        array_idx = sorted_indices[diff_idx + 1]
                        idx_to_deficit[array_idx] = required_diff - diff
            # Raising the values will increase the value range and shrink all
            # spacings because the plot height remains constant, so we will need
            # more space for labels to not overlap. Now we calculate the
            # extra spacing (x) required. Adding extra_spacing to deficits will
            # fix the problem if no new labels overlap
            # new_range = base_range + sum(deficits) + num_raised * x
            # new_required_diff = x + required_diff = new_height / divisor
            # divisor * x + divisor * required_diff =
            #   base_range + sum(deficits) + num_raised * x
            # (divisor-num_raised) * x = base_range + sum(deficits) -
            #   divisor * required_diff
            denom = divisor - num_raised
            if denom <= 0:
                # Too many colliding labels. Will need to reduce the spacing.
                divisor += 1
                continue
            numerator = (base_range + sum(idx_to_deficit.values()) -
                         divisor * required_diff)
            extra_spacing = numerator / denom
            if last_num_raised == num_raised:
                if abs(extra_spacing) > 1e-5:
                    if self.logger:
                        warning = (
                            'Error during adjustment of energy diagram'
                            f' positions for "{prop}". Using original values.')
                        self.logger.warning(warning)
                    return values
                    # Adjusting the sizes causes no new labels to overlap. Use the
                    # current deficits
                break
            last_num_raised = num_raised
            # Raise the requirement and see if any other labels overlap now
            required_diff += extra_spacing
        for idx, deficit in idx_to_deficit.items():
            val = values[idx]
            values[values >= val] += deficit
        return values