"""
Classes and functions to help with workflows that fragment multiple reactions
such as the bond dissociation and adsorption energy workflows.
Copyright Schrodinger, LLC. All rights reserved.
"""
from collections import defaultdict
import inflect
import numpy
from schrodinger.application.jaguar import input as jagin
from schrodinger.application.jaguar import results as jagresults
from schrodinger.application.matsci import buildcomplex
from schrodinger.application.matsci import jaguarworkflows
from schrodinger.application.matsci import jobutils
from schrodinger.application.matsci import msutils
from schrodinger.application.matsci import msprops
from schrodinger.application.matsci import textlogger
from schrodinger.job import jobcontrol
from schrodinger.structutils import analyze
from schrodinger.structutils import rmsd
from schrodinger.test import ioredirect
from schrodinger.utils import units
TARGET_PROP = 'b_matsci_target'
H_TO_KCAL = units.KCAL_PER_MOL_PER_HARTREE
KCAL_TO_H = 1.0 / H_TO_KCAL
GROUND = 's0'
TRIPLET = 't1'
ANION = 'anion'
CATION = 'cation'
S1_STATE = 's1'
EXCITED = 'excited'
OXIDIZED = 'oxidized'
REDUCED = 'reduced'
STATES = [GROUND, TRIPLET, EXCITED, ANION, CATION, S1_STATE]
CHARGED = {ANION, CATION}
REACTANT_CHARGE = {ANION: -1, CATION: 1, GROUND: 0, TRIPLET: 0, EXCITED: 0}
REACTANT_MULTIPLICITY = {ANION: 2, CATION: 2, GROUND: 1, TRIPLET: 3, EXCITED: 1}
UHF = 'iuhf'
REACTANT = 'reactant'
REACTANTS = REACTANT.title() + 's'
TYPE_FRAGMENT = 'fragment'
TYPE_GAS = 'gas'
TYPE_SUBSTRATE = 'substrate'
FLAG_CLOSED_SHELL_CHARGE = '-cs_charge'
FLAG_EXCITED = '-excited'
PROPERTY_FILE_EXTENSION = '-properties.csv'
[docs]def split_spin_state(spinstate):
"""
Split a spin state such as 'S1' into ('S', 1)
:param str spinstate: The string to split - should be in the form of a
single first letter followed by an integer
:rtype: (str, int)
:return: The leading string character and the following int
"""
spin = spinstate[0]
state = int(spinstate[1:])
return (spin, state)
[docs]def spin_multiplicity(struct, charge=0):
"""
Return the multiplicity of the given structure with the given charge.
Structures will either be singlets (even number of electrons) or doublets
(odd number). No attempt is made to determine multiplicity beyond that.
:param `structure.Structure` struct: The structure to check
:param int charge: The charge on the structure
:rtype: int
:return: 1 if there is an even number of electrons, 2 if it is odd
"""
nelec = sum(x.atomic_number for x in struct.atom) - charge
return 1 + abs(nelec % 2)
[docs]def label_bond(bond, value):
"""
Add a label to a bond that will show in the workspace
:param `structure._StructureBond` bond: The bond to label
:param float value: The value to label the bond with
"""
bond.property[msprops.SHOW_BOND_LABEL_PROP] = msprops.BDE_BOND_LABEL
# Note - 1 decimal point due to MATSCI-10025
bond.property[msprops.BDE_BOND_LABEL] = f'{value:.1f}'
[docs]def set_pt_subgroup_info(struct, name, collapsed=True):
"""
Set the properties on the structure to have it incorporate in the proper
subgroup
:type struct: `schrodinger.structure.Structure`
:param struct: The structure to set the properties one
:type name: str
:param name: The name of the subgroup
:type collapsed: bool
:param collapsed: Whether the subgroup should be collapsed
"""
jobname = jobcontrol.get_jobname(filename='bde')
msutils.set_project_group_hierarchy(struct, [jobname, name],
collapsed=collapsed)
[docs]class LoggerUser(object):
""" Mixin for classes that need to use a logger """
[docs] def __init__(self, *args, logger=None, **kwargs):
"""
Create the LoggerUser
:param `log.logger` logger: The logger to use
"""
self.logger = logger
super().__init__(*args, **kwargs)
[docs] def log(self, msg):
"""
Log a message
:param str msg: The message to log
"""
textlogger.log(self.logger, msg)
[docs]class BondEnergies:
""" Keep track of a set of bond energies """
[docs] def __init__(self):
""" Create a BondEnergies instance """
# Keys are energy property names, values are the energy for that
# property
self.energies = defaultdict(lambda: numpy.inf)
[docs] def storeIfLowestEnergy(self, etype, energy):
"""
Check an energy to see if it is the lowest energy of this type, and
store it if it is
:param str etype: The type of energy (an energy property name)
:param float energy: The energy to store if it is the lowest of this
type
"""
self.energies[etype] = min(self.energies[etype], energy)
[docs] def getEnergy(self, etype):
"""
Get the lowest energy associated with the given type
:param str etype: The type of energy (an energy property name)
:rtype: float or numpy.inf
:return: The lowest energy found for the given type. numpy.inf is
returned if no energy of that type is found
"""
return self.energies[etype]
[docs] def getEnergies(self):
"""
Get an iterator of all energy type/energy combinations
:rtype: iterator
:return: Each item is (energy type, energy)
"""
return self.energies.items()
[docs] def hasFreeEnergy(self):
"""
Check if free energies exist
:rtype: bool
:return: Whether free energy has been recorded
"""
return msprops.BDE_FREE_ENERGY in self.energies
[docs]class BreakingBond(object):
"""
Defines and tracks the information for a bond that breaks
"""
[docs] def __init__(self, struct, bond, preserve_order=False):
"""
Create a BreakingBond object
:type struct: `schrodinger.structure.Structure`
:param struct: The structure containing the breaking bond
:type bond: `schrodinger.structure._StructureBond`
:param bond: The bond that will be broken
:param bool preserve_order: Preserve the found order of fragments. If
False, fragments will be sorted based on SMILES string.
"""
self.preserve_order = preserve_order
self.fragments = self.getFragments(struct, bond)
self.is_valid = all([
(x is not None and x.is_valid) for x in self.fragments
])
[docs] def getFragments(self, struct, bond):
"""
Define the two fragments that will be created when the bond breaks
:type struct: `schrodinger.structure.Structure`
:param struct: The structure containing the breaking bond
:type bond: `schrodinger.structure._StructureBond`
:param bond: The bond that will be broken
:rtype: list
:return: Each item of the list is a `Fragment` object defining one of
the fragments that is created by breaking the bond - or each item is
None if the bond is actually part of a macrocycle.
"""
ind1 = bond.atom1.index
ind2 = bond.atom2.index
# Find which structure atoms belong to which fragment
frag1_indexes = buildcomplex.find_atoms_to_remove(struct, ind2, ind1)
if len(frag1_indexes) == 1:
# Check to see if a macrocycle was found, in which case this is not
# actually a valid bond
if len(list(struct.atom[ind1].bonded_atoms)) > 1:
return [None, None]
frag1_set = set(frag1_indexes)
frag2_indexes = [
x for x in range(1, struct.atom_total + 1) if x not in frag1_set
]
# Create the fragments
frags = []
for target, indexes in zip([ind1, ind2],
[frag1_indexes, frag2_indexes]):
frag = Fragment(struct, indexes, [target])
frags.append(frag)
# Order the fragments in a meaningless but consistent way
if not self.preserve_order and frags[0].smiles > frags[1].smiles:
frags.reverse()
return frags
[docs]class Fragment(object):
"""
A fragment of a structure that will be created after dissociation
"""
[docs] def __init__(self, struct, indexes, targets):
"""
Create a Fragment object
:type struct: `schrodinger.structure.Structure`
:param struct: The structure containing the fragment
:type indexes: list
:param indexes: The list of atom indexes in struct to include in the
fragment
:type targets: list
:param targets: The atom index(es) in struct that are part of this
fragment and that are directly involved in the dissociation
"""
for target in targets:
struct.atom[target].property[TARGET_PROP] = True
self.struct = struct.extract(indexes)
for target in targets:
struct.atom[target].property[TARGET_PROP] = False
# self.parent_targets = target indexes in the original numbering scheme
self.parent_targets = targets
# self.parent_indexes = indexes in the parent structure that make up
# this fragment
self.parent_indexes = indexes
# self.targets = targets in the Fragment numbering scheme
self.smiles, self.targets = self.getSMILESForFrag()
self.is_valid = self.smiles is not None
self.closed_shell = spin_multiplicity(self.struct) == 1
self.fragment_type = TYPE_FRAGMENT
self.is_proton = (self.struct.atom_total == 1 and
self.struct.atom[1].element == 'H')
[docs] def getSMILESForFrag(self):
"""
Get the smiles string and list of targets for this fragment. Note that
this SMILES string
will have an addition At atom at the point of dissociation. This is done
to increase the robustness of the unique SMILES method. Without it, I
have found that a benzene ring radical can have different unique SMILES
strings depending on what atom the SMILES is on. Since we never generate
a structure from these SMILES strings, the extra atom isn't an issue.
:rtype: (str, list) or (None, [])
:return: (SMILES, list_of_targets). The SMILES string is the SMILES
string for this fragment with an At atom at the dissociation
point(s). Each item in list_of_targets is the atom index of a
target atom (atom at the point of dissociation) using the atom index
numbering in frag. (None, []) is returned if the SMILES string
fails to generate.
"""
fcopy = self.struct.copy()
my_targets = []
for atom in self.struct.atom:
if not atom.property.get(TARGET_PROP):
continue
# We add a halogen at the attachment point because it makes the
# process of forming unique SMILES strings more robust versus
# using a radical.
target = atom.index
fatom = fcopy.addAtom('At', 0.0, 0.0, 0.0)
fcopy.addBond(target, fatom.index, 1)
my_targets.append(atom.index)
try:
with ioredirect.IOSilence():
smiles = analyze.generate_smiles(fcopy)
except RuntimeError:
# A structure we can't obtain the SMILES string for
return None, []
return smiles, my_targets
[docs]class UniqueTracker(LoggerUser):
"""
Tracks the information for, and creates the Jaguar job for, a unique
fragment. Since the same fragment may be generated by multiple
dissociations, we use this class to track each unique fragment.
"""
[docs] def __init__(self,
struct,
keydict,
key,
options,
basename='fragment',
targets=None,
parent_indexes=None,
charge=None,
mult=None,
tddft=False,
vertical=False,
write_input_ok=True,
**kwargs):
"""
Create a UniqueTracker object
:type struct: `schrodinger.structure.Structure`
:param struct: The structure containing the leaving ligand
:type keydict: dict
:param keydict: The dictionary containing the base set of keywords
:type key: str
:param key: a unique identifier for this unique fragment
:param `argparse.Namespace` options: The command line options
:type basename: str
:param basename: The base name to use for files for this fragment.
Will be used as the fragment_type and combined with key to form the
file base names.
:type targets: list
:param targets: The atom indexes in struct that are at the dissociation
point
:type parent_indexes: list
:param parent_indexes: The atom indexes with the parent atom numbering
for the atoms in struct. Note this will only be valid for one
specific reaction, so must be overwritten in any NonUniqueTracker
objects that track this instance.
:type charge: int
:param charge: The charge of the fragment - will be used with the molchg
keyword
:type mult: int
:param mult: The multiplicity of the fragment - will be used with the
multip keyword
:param str tddft: The state to compute via TD-DFT. Should be a
spin-state string like S1, T2, etc. Only singlet (S) and triplet (T)
states are supported.
:type vertical: bool
:param vertical: True if this is tracking an object that should not have
its geometry optimized. If vertical is True, writing the Jaguar
input file is delayed and must be triggered manually later by a call
to setVerticalStructure.
:param bool write_input_ok: If True, write the input files for this
fragment. If False, do not.
"""
super().__init__(**kwargs)
if targets is None:
targets = []
self.targets = targets
self.tddft = tddft
self.title = struct.title
self.fragment_type = basename
self.basename = '%s_%s' % (basename, key)
self.job = None
self.parent_indexes = parent_indexes
self.options = options
self.unique = True
# Define the keywords based on the settings
keywords = keydict.copy()
if charge is not None:
keywords['molchg'] = charge
if mult is not None:
keywords['multip'] = mult
if keywords.get('multip', 0) < 2:
keywords.pop(UHF, None)
if self.tddft:
spin, state = split_spin_state(self.tddft)
if spin == 'S':
rkey = 'rsinglet'
else:
rkey = 'rtriplet'
nroot = state + 4
tddftkeys = {
'itddft': 1,
'itda': 1,
'nroot': nroot,
rkey: 1,
'target': state
}
for jkey, value in tddftkeys.items():
keywords.setdefault(jkey, value)
if vertical:
# Override any geometry optimization keyword if this is a vertical
# dissociation
keywords['igeopt'] = 0
self.keywords = keywords
if write_input_ok and not vertical:
self.writeInput(struct)
# Keys are tuples of bonded atom indexes, values are BondEnergies
self.bond_energies = defaultdict(BondEnergies)
[docs] def addBondEnergy(self, index1, index2, etype, energy):
"""
Add a computed bond dissociation energy for the given energy type. The
energy will not be added if a lower energy has already been found for
this bond and energy type.
:type index1: int or `schrodigner.structure._StructureAtom`
:param index1: The index of the first atom in the bond (or the atom
object)
:type index2: int or `schrodigner.structure._StructureAtom`
:param index2: The index of the second atom in the bond (or the atom
object)
:type etype: str
:param etype: The type of energy to add - typically an energy property
name
:type energy: float
:param energy: The bond dissociation energy for that bond
"""
key = (int(index1), int(index2))
self.bond_energies[key].storeIfLowestEnergy(etype, energy)
[docs] def getWeakestBonds(self, etype=msprops.BDE_ENERGY):
"""
Get the bond with the lowest energy for the given energy type
:param str etype: The energy type to use. Should be consistent with the
etype parameter passed in to addBondEnergy
:rtype: list, float
:return: Each item of the list is a (int, int) tuple that gives the
atom indexes involved the bond with the lowest energy. More than one
item in the list indicates that more than one bond shares that
energy. The energy is the energy of those bonds. The list will be
empty and the energy will be None if there are no BDE's defined for
this Reactant.
"""
energy = numpy.inf
indexes = []
for (ind1, ind2), ben in self.bond_energies.items():
ene = ben.getEnergy(etype)
if ene < energy:
# A new low energy is found
indexes = [(ind1, ind2)]
energy = ene
elif ene != numpy.inf and abs(ene - energy) < 0.001:
# Another bond with the same energy (probably either undetected
# symmetry or a bidentate ligand)
indexes.append((ind1, ind2))
if energy == numpy.inf:
energy = None
return indexes, energy
[docs] def updateVerticalStructure(self, parent_structure):
"""
Grab the structure for these atoms from the parent structure and write
out the Jaguar input file
:type parent_structure: `schrodinger.structure.Structure`
:param parent_structure: The parent structure to extract the fragment
structure from
"""
struct = parent_structure.extract(self.parent_indexes)
struct.title = self.title
self.writeInput(struct)
[docs] def addToQueue(self, jobq, backend):
"""
Check if output file exists and if not create a job for this fragment
and add it to the queue and add its output files to the backend
:type jobq: `schrodinger.job.queue.JobDJ`
:param jobq: The queue to add the job to
:type backend: `schrodinger.job.jobcontrol.Backend`
:param backend: The job control backend, if any
"""
# First check if there is an existing output file for this tracker -
# this might have been created in a previous aborted run.
try:
out_file = self.basename + '.out'
jaguarworkflows.get_jaguar_output(out_file)
self.log('Will use already completed output file: %s' % out_file)
return
except jaguarworkflows.JaguarFailedException:
# Continue submitting the jobs
pass
self.job = jaguarworkflows.create_job(self.options, self.filename)
jobq.addJob(self.job)
jaguarworkflows.add_jaguar_files_to_jc_backend(self.basename,
backend=backend)
[docs] def getStructureWithProps(self):
"""
Get the structure resulting from the Jaguar run including any existing
properties
:rtype: `schrodinger.structure.Structure` or None
:return: The output structure, or None if an error occurs
"""
try:
struct = jaguarworkflows.get_jaguar_out_mae(self.basename)
return struct
except (jaguarworkflows.JaguarFailedException, IOError,
jagresults.IncompleteOutput) as msg:
self.log(msg)
return None
[docs] def getStructureWithoutProps(self):
"""
Get the structure resulting from the Jaguar run but remove any existing
properties
:rtype: `schrodinger.structure.Structure` or None
:return: The output structure, or None if an error occurs
"""
struct = self.getStructureWithProps()
if not struct:
return struct
for prop in list(struct.property):
try:
del struct.property[prop]
except (AttributeError, ValueError, KeyError):
# All these are errors raised by del that we don't care about
pass
return struct
[docs] def superimposeOnParent(self, parent, struct):
"""
Superimpose this fragment on its parent structure to get the orientation
the same. Modifies struct directly
:type parent: `schrodinger.structure.Structure`
:param parent: The parent structure to superimpose on - must have the
same atom ordering as was used for the parent_indexes argument in
the class constructor.
:type struct: `schrodinger.structure.Structure`
:param struct: The structure for this fragment
"""
if len(self.parent_indexes) > 2:
rmsd.superimpose(parent, self.parent_indexes, struct,
list(range(1, struct.atom_total + 1)))
[docs] def getEnergies(self):
"""
Get the various energies from the Jaguar job. Energies will always
include the SCF energy (with solvent effect if included), plus possibly
the free energy, enthalpy and internal energy if frequencies were
computed.
:rtype: dict
:return: Keys of the dict are a BDE energy property, values are the
energy corresponding to that energy property.
"""
energies = {}
try:
output = jaguarworkflows.get_jaguar_output(self.basename)
except (jaguarworkflows.JaguarFailedException,
jagresults.IncompleteOutput):
return energies
results = output.last_results
if isinstance(results.thermo, list) and not self.tddft:
energies[msprops.BDE_INTERNAL_ENERGY] = results.thermo[0].UTotal
energies[msprops.BDE_ENTHALPY] = results.thermo[0].HTotal
energies[msprops.BDE_FREE_ENERGY] = results.thermo[0].GTotal
if results.solution_phase_energy:
energies[msprops.BDE_ENERGY] = results.solution_phase_energy
else:
energies[msprops.BDE_ENERGY] = results.energy
if self.tddft:
spin, state = split_spin_state(self.tddft)
if spin == 'S':
excitations = results.excitation_energies
else:
excitations = results.triplet_excitation_energies
try:
delta = excitations[state - 1]
except IndexError:
self.log(
'No excited state energies were found in %s - skipping' %
self.basename)
return None
delta_hartree = delta / units.EV_PER_HARTREE
energies[msprops.BDE_ENERGY] += delta_hartree
return energies
[docs] def write(self, writer):
"""
Write the structure with properties to the output file
:type writer: L{schrodinger.structure.StructureWriter
:param writer: The writer to use to write the output file
"""
struct = self.getStructureWithProps()
if struct:
# Add a visible BDE label to each bond for each energy type
etypes = set()
for (index1, index2), ben in self.bond_energies.items():
bond = struct.getBond(index1, index2)
for etype, energy in ben.getEnergies():
bond.property[etype] = energy
evtype = msprops.kcal_prop_to_ev_prop(etype)
bond.property[evtype] = energy / units.KCAL_PER_MOL_PER_EV
# Show free energy instead of SCF energy if available
if etype == msprops.BDE_FREE_ENERGY or (
etype == msprops.BDE_ENERGY and
not ben.hasFreeEnergy()):
label_bond(bond, energy)
etypes.add(etype)
# Add a property giving the weakest energy of any bond.
for etype in etypes:
indexes, energy = self.getWeakestBonds(etype=etype)
if energy is not None:
weak_prop = msprops.BDE_ENERGY_TO_WEAKEST_PROP[etype]
struct.property[weak_prop] = energy
# Set the PT group to be reactants or fragments
group = self.basename.split('_')[0].title() + 's'
set_pt_subgroup_info(struct, group)
writer.append(struct)
else:
self.log('Skipping %s, which appears to have failed' %
self.basename)
[docs]class NonUniqueTracker(UniqueTracker):
"""
This is a placeholder for a second (or third, etc.) time a fragment is used
in the reaction. Mimics the UniqueTracker job without creating any new
Jaguar jobs or writing to the output file.
"""
[docs] def __init__(self, unique_master, parent_indexes):
"""
Create a NonUniqueTracker object
:type unique_master: `UniqueTracker`
:param unique_master: The UniqueTracker object this NonUniqueTracker
should mimic
:type parent_indexes: list
:param parent_indexes: The atom indexes with the parent atom numbering
for the atoms in this fragment. Note this is only be valid for the
specific reaction this tracker is for, so must it overwrites the
parent_indexes of the UniqueTracker this object is mimicking
"""
self.targets = unique_master.targets
self.basename = unique_master.basename
self.parent_indexes = parent_indexes
# Fragments never use tddft
self.tddft = None
self.unique_master = unique_master
self.unique = False
[docs] def updateVerticalStructure(self, *args):
"""
Overwrite the parent method because this class doesn't deal with
structures but we want to be able to call this method safely.
"""
[docs] def addToQueue(self, *args, **kwargs):
"""
The whole point of this class is that it doesn't run a Jaguar job but
takes the results from a different job
"""
[docs] def write(self, writer):
"""
Don't write anything out - we're vaporware of the best kind
"""
[docs]class Reaction(LoggerUser):
"""
Holds the information for a single dissociation reaction
"""
[docs] def __init__(self, reactant_index, reactant_targets, product_trackers,
**kwargs):
"""
Create a Reaction object
:type reactant_index: str
:param reactant_index: the key into the reactants dictionary for the
reactant in this reaction
:type reactant_targets: list
:param reactant_targets: The atom indexes in the reactant structure that
dissociate in this reaction
:type product_trackers: list
:param product_trackers: Each item of this list is a UniqueTracker or
NonUniqueTracker object for one of the product fragments
:param `logging.logger` logger: The logger to use
"""
super().__init__(**kwargs)
self.reactant_index = reactant_index
self.product_trackers = product_trackers
self.reactant_targets = reactant_targets
[docs] def updateProductGeometries(self, reactants):
"""
Update the geometry of all product fragments to have the geometry of
that fragment in the reactant
:type reactants: dict
:param reactants: keys are reactant_index strings, values are
UniqueTracker objects for that reactant
"""
reactant = reactants[self.reactant_index]
rstruct = reactant.getStructureWithProps()
if rstruct:
for prod in self.product_trackers:
prod.updateVerticalStructure(rstruct)
else:
self.log('No structure found for %s, skipping' %
self.reactant_index)
[docs]class FragmentReactor(LoggerUser):
"""
Create reactions, track all reactant and fragment objects, and run jobs
on all reactants and products
Reactants are turned into products by the createProductsGroup method,
which must be implemented in subclasses
"""
ALL = 'all'
REACTANTS = 'reactants'
PRODUCTS = 'products'
[docs] def __init__(self,
structs,
states,
options,
keywords=None,
vertical=False,
logger=None,
tracker_class=UniqueTracker,
reactant_title_base=REACTANT):
"""
Create a fragment reactor instance
:param list structs: Each item is a reactant `Structure` object
:param list states: Each item is a module constant from the STATES list
:param `argparse.Namespace` options: The command line options
:param dict keywords: A dictionary of Jaguar keyword/value pairs
:param bool vertical: True if the products will use the reactant
geometry for those atoms, False if products will be optimized
:param `logging.logger` logger: The logger to use
:param tracker_class: The class to use for tracking unique fragments
:param reactant_title_base: The base title for reactant structures
"""
super().__init__(logger=logger)
self.structs = structs
self.tracker_class = tracker_class
self.reactant_title_base = reactant_title_base
self.vertical = vertical
self.keywords = keywords
self.options = options
self.states = states
# All fragments, keys are SMILES, values are index number
self.unique_fragments = {}
# Keys are SMILES with possibly a state modifier, values are
# tracker_class objects. There may be more than one unique tracker
# for each unique fragment because each state requires its own tracker
self.unique_trackers = {}
# Keys are strings formed from the index number of reactant plus
# possibly a state modifier, values are tracker_class objects
self.reactants = {}
# List of Reaction objects
self.reactions = []
[docs] def getAllUniqueTrackers(self):
"""
Get all unique reactant and product trackers
:rtype: list
:return: The list has one item per unique reactant or product,
the type of object is given by self.tracker_class
"""
return self.getReactantTrackers() + self.getUniqueProductTrackers()
[docs] def getReactantTrackers(self):
"""
Get all reactant trackers
:rtype: list
:return: The list has one item per reactant, the type of object is
given by self.tracker_class
"""
return list(self.reactants.values())
[docs] def getUniqueProductTrackers(self):
"""
Get all unique product trackers
:rtype: list
:return: The list has one item per unique product, the type of object is
given by self.tracker_class
"""
return list(self.unique_trackers.values())
[docs] def createAllReactions(self):
""" Create all reactions for all structures and states"""
for rind, struct in enumerate(self.structs, 1):
self.preprocessInputStruct(struct)
product_groups = self.createProductGroups(struct)
for state in self.states:
if state == EXCITED:
# tddft should be something like S1, T3, etc
tddft = jobutils.get_option(self.options, FLAG_EXCITED)
if not tddft:
raise RuntimeError('No excited state specified in '
f'options for state={EXCITED}')
else:
tddft = ""
self.reactions.extend(
self.createStateReactions(struct, product_groups, state,
rind, tddft))
[docs] def createStateReactions(self, struct, product_groups, state, rind, tddft):
"""
Create the reactions for this structure and state
:param structure.Structure struct: The reactant structure
:param list product_groups: Each item is a group of products
:param str state: The current state, should be one of the STATES items
:param int rind: The index for this reactant
:param str tddft: The target excited state
:rtype: list
:return: A list of Reaction objects created
"""
rtracker, unique_key, charge, mult = self.createReactantTracker(
struct, state, rind, tddft)
self.reactants[unique_key] = rtracker
product_index = 1
skipped_protons = 0
all_rxns = []
for products in product_groups:
reactant_targets = []
# For neutral, closed shell reactants, we need to create one
# reaction.
# Rxn1: Reactant -> P1(.) + P2(.)
# For charged or open shell reactants, we need to create
# two reactions, each reaction has the charge
# or unpaired electron on a different fragment. ex. for a
# radical cation:
# Rxn1: Reactant -> P1(.) + P2(+)
# Rxn2: Reactant -> P1(+) + P2(.)
potential_reactions = {0: [], 1: []}
skipped_protons += self.preventUnwantedReactions(
potential_reactions, products, charge)
# Iterate over both product fragments
for findex, fragment in enumerate(products.fragments):
# The order we create product trackers is:
# fragment 1, reaction 1 (uncharged)
# fragment 1, reaction 2 (charged)
# fragment 2, reaction 1 (charged)
# fragment 2, reaction 2 (uncharged)
reactant_targets.extend(fragment.parent_targets)
# Iterate over both reactions
for radical in [0, 1]:
if radical not in potential_reactions:
# This is an unwanted reaction
continue
if findex == radical:
this_charge = 0
else:
this_charge = charge
ptracker = self.getFragmentTracker(fragment,
state,
product_index,
charge=this_charge)
product_index += 1
potential_reactions[radical].append(ptracker)
rxns = [
Reaction(unique_key, reactant_targets, x, logger=self.logger)
for x in potential_reactions.values()
]
all_rxns.extend(rxns)
if skipped_protons and self.logger:
rword = inflect.engine().plural('reaction', skipped_protons)
self.log(f'Skipped {skipped_protons} {rword} involving H+')
return all_rxns
[docs] def getStateModifier(self, state, tddft):
"""
Get a tag modifier based on the current state
:param str state: The current state, should be one of the STATES items
:param str tddft: The target excited state
:rtype: str
:return: A tag modifier based on the current state
"""
if state == GROUND:
state_modifier = ""
elif state == EXCITED:
state_modifier = f' {tddft}'
elif state == CATION:
state_modifier = f' {OXIDIZED.title()}'
elif state == ANION:
state_modifier = f' {REDUCED.title()}'
else:
state_modifier = ' %s' % state.capitalize()
return state_modifier
[docs] def getStateChargeAndMultiplicity(self, state):
"""
Get the charge and multiplicity for the current state
:param str state: The current state, should be one of the STATES items
:rtype: (int, int)
:return: The charge and multiplicity for this state
"""
state_charge = REACTANT_CHARGE[state]
cs_charge = jobutils.get_option(self.options, FLAG_CLOSED_SHELL_CHARGE)
if cs_charge is None:
cs_charge = 0
charge = state_charge + cs_charge
if abs(charge) > 1:
raise RuntimeError('System charge must be -1, 0 or 1')
mult = REACTANT_MULTIPLICITY[state]
return charge, mult
[docs] def createReactantTracker(self, struct, state, rind, tddft):
"""
Create the reactant tracker for this structure and state
:param structure.Structure struct: The reactant structure
:param str state: The current state, should be one of the STATES items
:param int rind: The index for this reactant
:param str tddft: The target excited state
:rtype: (object, str, int, int)
:return: The object is the tracker for the reactant. Object type is
given by self.tracker_class. The string is the unique key for
this reactant. The two ints are charge and multiplicity
"""
reactant_struct = struct.copy()
# Create a unique identifier for this reactant and state
state_modifier = self.getStateModifier(state, tddft)
unique_key = str(rind) + state_modifier
# Give the reactant a descriptive title
title = reactant_struct.title
if title:
title = title + state_modifier
else:
title = f'Reactant {unique_key}'
reactant_struct.title = title
charge, mult = self.getStateChargeAndMultiplicity(state)
reactant_tracker = self.tracker_class(reactant_struct,
self.keywords,
unique_key.replace(' ',
'_').lower(),
self.options,
basename=self.reactant_title_base,
charge=charge,
mult=mult,
tddft=tddft)
return reactant_tracker, unique_key, charge, mult
[docs] def createProductGroups(self, struct):
"""
Form all the products that can be created from struct
Base class implementation does nothing
:param `structure.Structure` struct: The input structure
:rtype: list
:return: Each item should be a BreakingBond or LeavingLigand object,
one for each possible fragmentation
"""
raise NotImplementedError
return []
[docs] def preventUnwantedReactions(self, potential_reactions, products, charge):
"""
Modify the potential_reactions dictionary to prevent creating reactions
that we would not want
:param dict potential_reactions: keys are reaction index. This
dictionary may be modified by this function to remove the key for
an unwanted reaction
:type products: `BreakingBond` or `LeavingLigand`
:param products: The products that will be formed
:param int charge: The overall charge of this reaction
:rtype: int
:return: The number of reactions skipped due to avoiding H+
"""
skipped_protons = 0
# Do not create any reaction that would involve an H+
if charge == 1:
for frag, is_cation_site in zip(products.fragments, (1, 0)):
if frag.is_proton:
del potential_reactions[is_cation_site]
skipped_protons += 1
elif charge == 0:
# What we have is a (possibly radical) neutral. There's only one
# possible reaction that doesn't involve adding charges to
# both products. We'll just create one reaction with both
# fragments neutral. If the reactant is a radical, which product
# gets the radical will work out naturally based on which fragment
# has an odd number of protons
del potential_reactions[1]
# Don't create two reactions if they would be identical
if len(potential_reactions) == 2 and not self.vertical:
if products.fragments[0].smiles == products.fragments[1].smiles:
del potential_reactions[1]
return skipped_protons
[docs] def getExistingFragmentTracker(self, fragment, tagged_smiles):
"""
Return any tracker for this fragment with the given state and charge
:type fragment: `BreakingBond` or `LeavingLigand`
:param fragment: The fragment to generate tags for
:param str tagged_smiles: The smiles string for this fragment that
has been modified by adding the fragment tag
:rtype: object or None, int
:return: A tracker for this fragment with the given tagged_smiles if
one exists or None if no such tracker exists, and the fragment
index for this fragment. Note that fragment index is always valid
even if there is no tracker, and the type of tracker object returned
is given by self.tracker_class.
"""
try:
# Determine the fragment index for this smiles string
findex = self.unique_fragments[fragment.smiles]
except KeyError:
# A new smiles string, create a unique index for it
findex = len([
1 for x in self.unique_trackers.values()
if x.fragment_type == fragment.fragment_type
]) + 1
self.unique_fragments[fragment.smiles] = findex
existing_tracker = None
else:
# Check for an existing tracker for this state of this smiles string
existing_tracker = self.unique_trackers.get(tagged_smiles)
return existing_tracker, findex
[docs] def getFragmentTracker(self, fragment, state, index, charge=0):
"""
Return a tracker for this fragment with the given state and charge
:type fragment: `BreakingBond` or `LeavingLigand`
:param fragment: The fragment to generate tags for
:param str state: The current state, should be one of the STATES items
:param int index: The product index for the fragment
:param int charge: The charge of this fragment
:return: A tracker for this fragment with the given charge state. The
type of object returns is given by self.tracker_class
"""
tag, tagged_smiles = self.getFragmentTags(fragment, state, index,
charge)
existing_tracker, findex = self.getExistingFragmentTracker(
fragment, tagged_smiles)
if existing_tracker:
# This fragment already exists and is tracked by another
# reaction. Just return a tracker that mimics the real one.
return NonUniqueTracker(existing_tracker, fragment.parent_indexes)
# No tracker exists for this fragment/state/charge, make one
id_tag = '%d%s' % (findex, tag)
# Create a descriptive structure title
key = id_tag.replace(' ', '_').lower()
title = f'{fragment.fragment_type} {id_tag}'
backend = jobcontrol.get_backend()
if backend:
job = backend.getJob()
title = '%s %s' % (title, job.Name)
fragment.struct.title = title
# The below statement is True if only ONE of the two things is True
if fragment.closed_shell ^ charge:
mult = 1
else:
mult = 2
tracker = self.tracker_class(fragment.struct,
self.keywords,
key,
self.options,
basename=fragment.fragment_type,
targets=fragment.targets,
parent_indexes=fragment.parent_indexes,
charge=charge,
mult=mult,
vertical=self.vertical,
logger=self.logger)
self.unique_trackers[tagged_smiles] = tracker
return tracker
[docs] def runJobs(self, logfn, what=ALL):
"""
Create all jaguar jobs and run them
:param callable logfn: A logging function - should take a string to
log and an optional timestamp argument
:param str what: A class constant (ALL, REACTANTS, PRODUCTS) that says
which jobs to run
:rtype: bool or None
:return: None if no jobs were run, True if at least one job did not
fail, otherwise False
"""
jobq = jobutils.create_queue(self.options, host=jobutils.AUTOHOST)
backend = jobcontrol.get_backend()
trackers = []
if what in (self.ALL, self.REACTANTS):
trackers.extend(self.getReactantTrackers())
if what in (self.ALL, self.PRODUCTS):
trackers.extend(self.getUniqueProductTrackers())
for tracker in trackers:
tracker.addToQueue(jobq, backend)
logfn("")
try:
textlogger.run_jobs_with_update_logging(jobq, logfn)
except RuntimeError as msg:
return None
if jobq.total_added == 0 or len(jobq._failed) < jobq.total_added:
return True
else:
return False
[docs] def writeSingleStructures(self, writer):
"""
Write all single structures to the output file
:type writer: `schrodinger.structure.StructureWriter`
:param writer: The structure writer to use to write structures with
"""
for tracker in self.getAllUniqueTrackers():
tracker.write(writer)