"""
Classes and functions for building reaction channels.
Copyright Schrodinger, LLC. All rights reserved."""
# Contributor: Thomas F. Hughes
import os
import re
import sys
import time
import warnings
from collections import OrderedDict
import numpy
import schrodinger.structure as structure
from schrodinger.application.matsci import amorphous
from schrodinger.application.matsci import msprops
from schrodinger.infra import fast3d
from schrodinger.infra import mm
from schrodinger.structutils import analyze
from schrodinger.structutils import build
from schrodinger.structutils import minimize
from schrodinger.structutils import rings
from schrodinger.structutils import ringspear
from schrodinger.structutils import transform
from schrodinger.test import ioredirect
from schrodinger.utils import fileutils
# MATSCI-5124 - in contrast to the random module
# the methods in the numpy.random module are consistent
# between Python 2 and 3
my_random = numpy.random.RandomState()
# ring spear elimination failure key
FAILED_SPEAR_ELIM_KEY = 'b_matsci_Failed_Ring_Spear_Elimination'
# Two vectors with a dot product < this are considered perpendicular
PERPENDICULAR_DOT_PROD_THRESH = 0.05
[docs]def detect_tetrahedral(bond_vecs):
"""
Detect tetrahedral geometry
:param list(numpy.array) bond_vecs: Each item is a numpy.array(x, y, z) for
the vector from the atom in question along a bond
:rtype: bool
:return: Whether the atom with these bond vectors is tetrahedral (4 bonds
with a bond angle sum >> 360
"""
# Must have 4 bonds to be tetrahedral
if len(bond_vecs) != 4:
return False
# Accumulate the angles between all bond vectors
angles = []
for index, vec1 in enumerate(bond_vecs):
for vec2 in bond_vecs[index + 1:]:
radians = transform.get_angle_between_vectors(vec1, vec2)
angles.append(numpy.degrees(radians))
# For the square planar case (which is what we are trying to distinguish
# from, there will be 4 nearest neighbor (cis) angles that are near 90, and
# 2 angles with trans bonds that are near 180. The tetrahedral case will
# have all 6 angles near 109.5. So sort the bond angles and check the sum of
# the smallest 4 angles.
angles.sort()
nearest_neighbor_angles = angles[:4]
angle_sum = sum(nearest_neighbor_angles)
# Square planer = 360, tetrahedral ~= 438
return angle_sum > 400
[docs]def trigonal_bonding_vector(bond_vecs, director=None):
"""
Find the optimal 4th bond vector to add to a system with 3 existing bond
vectors.
For the case of an atom with 3 bonds, the 4th bond vector should be normal
to the plane formed by the 3 bonded atoms. The is true whether the system is
trigonal planar or trigonal pyramidal.
:param list(numpy.array) bond_vecs: Each item is a numpy.array(x, y, z) for
the vector from the atom in question along a bond
:param numpy.array director: In the case where the chosen bond vector is at
a 90 degree angle to the existing bond_vecs, the up/down direction of
the bond vector is ambiguous. Choose the direction that best aligns with
director.
:rtype: numpy.array
:return: The optimal next bonding vector pointing away from the atom
:raise RuntimeError: If bond_vecs doesn't have 3 items
"""
if len(bond_vecs) != 3:
raise RuntimeError('bond_vecs must be of length 3')
# vec1 points from the first bonding atom to the second, vec2 from the
# second to the third. The two vectors define the plane of the three atoms
vec1 = bond_vecs[0] - bond_vecs[1]
vec2 = bond_vecs[1] - bond_vecs[2]
# Find the normal to the plane.
normal = numpy.cross(vec1, vec2)
vector = transform.get_normalized_vector(normal)
# The normal can point above or below the plane. Place the normal on the
# central atom and check wither it is aligned (dot product > 0) or
# anti-aligned (dot product < 0). We want the vector to be anti-aligned so
# that it points away from the other bonds. (Note - reversing vectors with
# dot product = 0 (completely planar bond vecs) keeps results consistent
# with those from 19-4 and previous)
dotprod = vector.dot(bond_vecs[0])
if abs(dotprod) < PERPENDICULAR_DOT_PROD_THRESH and director is not None:
# The vector is almost perpendicular to a bond vector, which means all
# bond vectors lie almost in the same plane. Choose the perpendicular
# direction that is best aligned with the supplied director
vector = find_best_match_with_director([vector, -vector], director)
elif dotprod >= 0:
# The vector points in the same direction as the bond vectors, reverse
# it
vector = -vector
return vector
[docs]def tetrahedral_bonding_vector(bond_vecs, director=None):
"""
Find the optimal 5th bond vector to add to a system with 4 existing
tetrahedral bond vectors.
For tetrahedral systems, the 5th vector gets added anti-parallel to one of
the bonding vectors (classic Sn2-type reaction geometry). The first bond is
chosen unless director is given, in which case the bond is chosen so that
the new bond vector best aligns with the director.
:param list(numpy.array) bond_vecs: Each item is a numpy.array(x, y, z) for
the vector from the atom in question along a bond
:param numpy.array director: Chose the antiparallel bond vector that best
aligns with this vector.
:rtype: numpy.array
:return: The optimal next bonding vector pointing away from the atom
:raise RuntimeError: If bond_vecs doesn't have 4 items
"""
if len(bond_vecs) != 4:
raise RuntimeError('bond_vecs must be of length 4')
if director is None:
return -bond_vecs[0]
# Take the negative of each bond vector and use the one with the most
# positive dot product (best alignment) with the director
candidates = [-x for x in bond_vecs]
return find_best_match_with_director(candidates, director)
[docs]def find_best_match_with_director(candidates, director):
"""
Find the vector in candidates that has the best alignment with director
:param list(numpy.array) candidates: The vectors to choose from
:param numpy.array director: The chosen vector should be the one that aligns
most strongly with this vector
:rtype: numpy.array
:return: The vector from candidates that best aligns with director
"""
best_dot_prod = -numpy.inf
best_vector = None
for candidate in candidates:
dotp = director.dot(candidate)
if dotp > best_dot_prod:
best_dot_prod = dotp
best_vector = candidate
return best_vector
def _find_perpendicular_bond_vector(bond_vecs, director=None):
"""
For the given bond vectors, find pairs (or pairs of (bond vector, cardinal
axis)) that are non-colinear and produce perpendicular vectors by taking the
cross product. Choose an arbitrary resulting vector or the one that best
aligns with the director if supplied.
Because the three cardinal axes are used as a backup, this method is
gauranteed to find a pependicular vector.
:param list bond_vecs: Each item is a numpy.array for one bond vector
:param numpy.array director: If supplied, choose the resulting perpendicular
vector that best aligns (positive dot product) with this vector
:rtype: numpy.array
:return: The perpendicular bond vector (not normalized)
"""
def colinear_vectors(vec_one, vec_two):
"""
Check if two vectors are colinear or not
:param numpy.array vec_one: The first vector
:param numpy.array vec_two: The first vector
:rtype: bool
:return: If the two vector are colinear
"""
colinear_thresh = 0.01
if abs(1 - abs(numpy.dot(vec_one, vec_two))) > colinear_thresh:
return False
else:
return True
def _find_candidates(vec1, other_vecs, candidates):
"""
Find cross products between vec1 and and other vector that is
not co-linear with it
:param numpy.array vec1: The reference vector
:param list other_vecs: numpy arrays that represent other
vectors than can be used to obtain a cross product with vec1
:param list candidates: A list of cross-products that have been
found so far. Any new cross products will be added to it
"""
for ovec in other_vecs:
if not colinear_vectors(vec1, ovec):
# Cross product gives a vector normal to the plane of
# the two vectors. There's no reason to assume one of the two
# possible directions for a cross product is the better one, so
# use them both.
crossp = numpy.cross(vec1, ovec)
candidates.append(crossp)
candidates.append(-crossp)
candidates = []
# Prefer to use existing bond vectors
for index, bv1 in enumerate(bond_vecs):
_find_candidates(bv1, bond_vecs[index + 1:], candidates)
# If no two bonds are not colinear, use cardinal axes. It's guaranteed one
# of them will be non-colinear
if not candidates:
cards = [transform.X_AXIS, transform.Y_AXIS, transform.Z_AXIS]
for bv1 in bond_vecs:
_find_candidates(bv1, cards, candidates)
if director is None:
# If there is no director, choose an arbitrary vector
new_bond_vector = candidates[0]
else:
# If there is a director, choose the vector that aligns best
# with the director
new_bond_vector = find_best_match_with_director(candidates, director)
return new_bond_vector
[docs]def find_good_bond_vector(atom, length=1.0, director=None, pbc=None):
"""
Find a good vector to add a new bond given the existing bonds to this atom
:param 'schrodinger.structure._StructureAtom` atom: The atom to find the
bond to
:param float length: The desired length of the bond vector
:param numpy.array director: In some cases, such as trigonal planar and
tetrahedral, a simple choice is made by this function between several
directions. If a director is supplied, the choice that best aligns with
the director will be chosen.
:param `infrastructure.PBC` pbc: If given, will be used to find the closest
image of all neighboring atoms
:rtype: numpy.array
:return: An array pointing from the given atom along the new bond vector
"""
bond_sum_thresh = 0.1
atom_vec = numpy.array(atom.xyz)
# if the molecule is just an atom then grab the x-axis as a
# handle, otherwise obtain the handle from the list of existing
# bonds taking account for symmetric cases. there are a few
# different protocols that can be used here but I think this
# one will work nicely though not exhaustively
if not atom.bond_total:
if director is None:
new_bond_vector = numpy.array(transform.X_AXIS)
else:
new_bond_vector = numpy.copy(director)
else:
# make a handle that is opposite the sum of existing bond
# vectors
bond_vecs = []
for neighbor in atom.bonded_atoms:
neighbor_vec = numpy.array(neighbor.xyz)
if pbc:
# Make sure the correct image is used for the neighbor atom
bond_vec = pbc.getShortestVector(*atom_vec, *neighbor_vec)
else:
bond_vec = neighbor_vec - atom_vec
bond_vec = transform.get_normalized_vector(bond_vec)
bond_vecs.append(bond_vec)
num_bonds = len(bond_vecs)
if num_bonds == 3:
new_bond_vector = trigonal_bonding_vector(bond_vecs,
director=director)
elif detect_tetrahedral(bond_vecs):
new_bond_vector = tetrahedral_bonding_vector(bond_vecs,
director=director)
else:
new_bond_vector = -1 * sum(bond_vecs)
# if that handle vector is small, for example for a fully symmetric
# system, then pick a preexisting bond and find another non-colinear
# bond or cardinal vector for which their normal is non-zero.
if transform.get_vector_magnitude(new_bond_vector) < bond_sum_thresh:
new_bond_vector = _find_perpendicular_bond_vector(
bond_vecs, director)
new_bond_vector = transform.get_normalized_vector(new_bond_vector)
return new_bond_vector * length
[docs]def random_choice(collection):
"""
Return a random element from the given collection. Wraps
numpy.random.RandomState.choice to allow for multi-dimensional
input, i.e. cases where elements are also iterables. Choices
are made using a uniform distribution.
:type collection: iterable
:param collection: the collection from which to randomly choose
an element
:rtype: any
:return: the chosen element, potentially an iterable
"""
# numpy.random.RandomState.choice requires that input be 1D so
# choose from an iterable of iterables by indexing
elements = list(collection)
idx = my_random.randint(len(elements))
return elements[idx]
[docs]class Constants(object):
"""
Manage some constants.
"""
CHANNEL_FILE = 'CHANNEL_FILE'
NUM_RANDOM_CHANNELS = 0
ALL_RANDOM_CHANNELS = 'all'
NUM_RANDOM_CHANNELS_METAVAR = 'INT or \'%s\'' % ALL_RANDOM_CHANNELS
ASSOCIATION_TYPE = 'association'
DISSOCIATION_TYPE = 'dissociation'
SINGLE_DISPLACEMENT_TYPE = 'single_displacement'
DOUBLE_DISPLACEMENT_TYPE = 'double_displacement'
ALL_TYPES = 'all'
RANDOM_TYPES_CHOICES = [
ASSOCIATION_TYPE, DISSOCIATION_TYPE, SINGLE_DISPLACEMENT_TYPE,
DOUBLE_DISPLACEMENT_TYPE, ALL_TYPES
]
DEFAULT_RANDOM_TYPES = [DOUBLE_DISPLACEMENT_TYPE]
RANDOM_SEED = None
ALLOW_ADSORPTION_ONTO = OrderedDict([
('graphene', ('[c-0X3][c-0X3]([c-0X3]([c-0X3]([c-0X3])'
'[c-0X3])[c-0X3]([c-0X3])[c-0X3])[c-0X3]'))
])
ALL_SMARTS = 'all'
SMARTS_WILDCARD = '[*]'
ALLOW_ADSORPTION_ONTO_METAVAR = 'SMARTS or \'%s\'' % ALL_SMARTS
DISSOCIATION_BOND_LENGTH = 10.0
NO_MINIMIZATION = False
ISOLATE_PRODUCTS = False
NO_REACTIVE_H = False
SUPPORTED_IN_EXTS = ['.mae', '.mae.gz', '.maegz']
UNIQUE = False
[docs]def init_random_seed(random_seed, logger):
"""
Process the seed to the random number generator.
:type random_seed: None or int
:param random_seed: the seed for the random number generator
:type logger: logging.Logger
:param logger: output logger
:rtype: int
:return: final random seed
"""
# system time comes out with 6 digits, just use the integer part
if random_seed is None:
random_seed = int(time.time())
# print some stuff
SEED_MSG = ('Seed used is:')
if logger:
logger.info(SEED_MSG)
logger.info('-' * len(SEED_MSG))
logger.info('%s' % random_seed)
logger.info('')
return random_seed
[docs]def is_polymer_builder_grow_atom(atom):
"""
Return True if the given atom is a grow atom prepared by
the polymer builder module.
:type atom: schrodinger.structure._StructureAtom
:param atom: the atom
:rtype: bool
:return: True if a polymer builder grow atom
"""
head = atom.property.get(amorphous.HEAD_ATOM_PROP)
return ((head and atom.property.get(amorphous.COUPLER_ATOM_PROP)) or
(head and atom.property.get(amorphous.GROWER_ATOM_PROP)))
[docs]def is_mark_monomer_grow_atom(atom):
"""
Return True if the given atom is a grow atom prepared by
the mark monomer module.
:type atom: schrodinger.structure._StructureAtom
:param atom: the atom
:rtype: bool
:return: True if a mark monomer grow atom
"""
head_or_tail = atom.property.get(msprops.ROLE_PROP)
return head_or_tail in [msprops.HEAD, msprops.TAIL]
[docs]def get_grow_idxs(st, idxs=None):
"""
Return a list of grow indices for the given structure.
:type st: schrodinger.structure.Structure
:param st: the structure
:type idxs: list
:param idxs: the indices to check
:rtype: list
:return: grow indices
"""
if idxs is None:
idxs = range(1, st.atom_total + 1)
grow_idxs = []
for idx in idxs:
atom = st.atom[idx]
if is_polymer_builder_grow_atom(atom) or is_mark_monomer_grow_atom(
atom):
grow_idxs.append(idx)
return grow_idxs
[docs]def remove_grow_properties(st, idxs=None):
"""
Remove the grow properties from atoms in the given structure.
:type st: schrodinger.structure.Structure
:param st: the structure
:type idxs: list
:param idxs: the atom indices
"""
keys = (amorphous.HEAD_ATOM_PROP, amorphous.COUPLER_ATOM_PROP,
amorphous.GROWER_ATOM_PROP, msprops.ROLE_PROP)
if idxs is None:
idxs = range(1, st.atom_total + 1)
for idx in idxs:
atom = st.atom[idx]
for key in keys:
atom.property.pop(key, None)
[docs]def serves_as_backbone_bond(st, idx, jdx):
"""
Return True if the given bond serves as a backbone bond.
:type st: schrodinger.structure.Structure
:param st: the structure
:type idx: int
:param idx: the first atom index of the bond
:type jdx: int
:param jdx: the second atom index of the bond
:rtype: bool
:return: True if serves as a backbone bond
"""
# if the molecule containing the given bond is not a
# monomer then return True
mol_idxs = st.atom[idx].getMolecule().getAtomIndices()
if not get_grow_idxs(st, idxs=mol_idxs):
return True
a_grow_idxs = get_grow_idxs(st,
idxs=indicies_from_bonds_deep(st, idx, [jdx]))
b_grow_idxs = get_grow_idxs(st,
idxs=indicies_from_bonds_deep(st, jdx, [idx]))
return a_grow_idxs and b_grow_idxs
[docs]def serves_as_sidechain_bond(st, idx, jdx):
"""
Return True if the given bond serves as a sidechain bond.
:type st: schrodinger.structure.Structure
:param st: the structure
:type idx: int
:param idx: the first atom index of the bond
:type jdx: int
:param jdx: the second atom index of the bond
:rtype: bool
:return: True if serves as a sidechain bond
"""
# if the molecule containing the given bond is not a
# monomer then return True
mol_idxs = st.atom[idx].getMolecule().getAtomIndices()
if not get_grow_idxs(st, idxs=mol_idxs):
return True
a_grow_idxs = get_grow_idxs(st,
idxs=indicies_from_bonds_deep(st, idx, [jdx]))
b_grow_idxs = get_grow_idxs(st,
idxs=indicies_from_bonds_deep(st, jdx, [idx]))
# if the given bond contains grow atoms then return False,
# this could happen for a bond that acts as an end cap
# for the growth
if idx in a_grow_idxs or jdx in b_grow_idxs:
return False
return (len(a_grow_idxs) >= 2 and
not b_grow_idxs) or (not a_grow_idxs and len(b_grow_idxs) >= 2)
[docs]def valid_polymer_monomer_double_displacement_rxn(st,
idx,
jdx,
kdx,
ldx,
allow_backbone=True,
allow_sidechain=True):
"""
Return True if the given input is for a valid double displacement
polymer monomer reaction.
:type st: schrodinger.structure.Structure
:param st: the structure containing two molecules
:type idx: int
:param idx: the first atom index of the bond in the first
molecule
:type jdx: int
:param jdx: the second atom index of the bond in the first
molecule
:type kdx: int
:param kdx: the first atom index of the bond in the second
molecule
:type ldx: int
:param ldx: the second atom index of the bond in the second
molecule
:type allow_backbone: bool
:param allow_backbone: whether to allow backbone reactions
:type allow_sidechain: bool
:param allow_sidechain: whether to allow sidechain reactions
:rtype: bool
:return: True if a valid reaction
"""
# if not a polymer monomer reaction
if not get_grow_idxs(st):
return True
# use only the minimal number of polymer atom properties, for example
# those that are used in get_grow_idxs, so as to prevent having to update
# all of them as new monomers are successively created, specifically
# during fragment mutation for example
if allow_backbone and serves_as_backbone_bond(st, idx, jdx) and \
serves_as_backbone_bond(st, kdx, ldx):
return True
if allow_sidechain and serves_as_sidechain_bond(st, idx, jdx) and \
serves_as_sidechain_bond(st, kdx, ldx):
return True
return False
[docs]def query_monomers(sts):
"""
Return a list of bools indicating whether the given structures
are monomers, i.e. have at least two grow indices.
:type sts: list
:param sts: contains schrodinger.structure.Structure
:rtype: list
:return: contains bools indicating whether the given structures
are monomers
"""
return [len(get_grow_idxs(st)) >= 2 for st in sts]
[docs]def are_monomers(sts):
"""
Return True if the given structures are monomers, i.e.
have at least two grow indices.
:type sts: list
:param sts: contains schrodinger.structure.Structure
:rtype: bool
:return: True if the given structures are monomers
"""
return all(query_monomers(sts))
[docs]def get_deduped_monomers(sts):
"""
Return a list of smiles deduped monomers.
:type sts: list
:param sts: contains schrodinger.structure.Structure
:rtype: list, list
:return: contains smiles deduped monomers, contains filtered
titles
"""
# this is a safety measure for symmetric reactants to ensure
# that when deduping that a true monomer, with proper atom
# properties, isn't replaced by a fake monomer with the same
# smiles but that lacks proper atom properties
smiles_sts_dict = OrderedDict()
for st in sts:
smiles = analyze.generate_smiles(st)
smiles_sts_dict.setdefault(smiles, []).append(st)
f_sts = []
titles = []
for t_sts in smiles_sts_dict.values():
f_st = None
for st in t_sts:
if not f_st and are_monomers([st]):
f_st = st
else:
titles.append(st.property[CheckInput.TITLE_KEY])
if not f_st:
f_st = t_sts[0]
titles.remove(f_st.property[CheckInput.TITLE_KEY])
f_sts.append(f_st)
return f_sts, titles
[docs]class CacheInfo(object):
""" Holds caches of computed data to reduce timings """
[docs] def __init__(self):
""" Create a CacheInfo instance """
# keys = atom indexes, values = SMARTS pattern for that atom
self.smarts_cache = {}
# atom objects that are adsorption sites
self.ads_cache = set()
# Whether the ads_cache has been filled yet or not
self.ads_cache_filled = False
# keys = (atom index1, atom index2) for bonds, values = (SMILES, SMILES)
# for the two fragments created by breaking that bond
self.smiles_cache = {}
# keys = frozenset([atom index1, atom index2]) for bonds,
# see checkIfNotSimpleBond for values
self.bond_cache = {}
[docs] def fillAdsCache(self, struct, matches):
"""
Fill the adsorption site cache with atoms from struct
:type struct: `schrodinger.structure.Structure`
:param struct: The structure with the atoms to check
:param set matches: Atom indexes that are automatically considered
adsorption sites
"""
if not self.ads_cache_filled:
data = [
x for x in struct.atom if
not CheckInput().checkIfNotAdsorbable(struct, x.index, matches)
]
self.ads_cache.update(data)
self.ads_cache_filled = True
[docs]class ChannelDefinitions(object):
"""
Manage a collection of reaction channel definitions.
"""
BLANK_PATTERN = re.compile(r'\s*\n$')
RANDOM_ALL_TO_ON_THE_FLY_THRESH = 50
MAX_NUM_UNPRODUCTIVE_CYCLES = 1000
DESCRIPTOR_DICT = {
Constants.DISSOCIATION_TYPE: 'Dissociation: ',
Constants.ASSOCIATION_TYPE: 'Association: ',
Constants.SINGLE_DISPLACEMENT_TYPE: 'Single Displacement: ',
Constants.DOUBLE_DISPLACEMENT_TYPE: 'Double Displacement: '
}
[docs] def __init__(self, logger=None):
"""
Create an instance.
:type logger: logging.getLogger
:param logger: output logger
"""
self.logger = logger
self.definitions = []
self.bad_defs = []
self.allow_adsorption_onto = None
self.matches = None
[docs] @staticmethod
def flattenChannels(all_channel_defs):
"""
Return a flattened list of channels.
:type all_channel_defs: OrderedDict
:param all_channel_defs: all possible channel definitions,
keyed first by channel type and second by reaction type
:rtype: list
:return: a flattened list of channels
"""
return [
rxn for rxn_dict in list(all_channel_defs.values())
for rxns in list(rxn_dict.values()) for rxn in rxns
]
[docs] def addDefinition(self, str_definition):
"""
Add a reaction channel definition to the list of definitions.
:type str_definition: str
:param str_definition: the string representation of the definition
:rtype: bool
:return: not_accepted, False for an accepted definition, True for
a definition that was not accepted
"""
definition = ChannelDefinition(str_definition, self.logger)
if not definition.not_valid and definition not in self.definitions:
self.definitions.append(definition)
not_accepted = False
else:
self.bad_defs.append(str_definition.strip('\n'))
not_accepted = True
return not_accepted
[docs] def addDefinitionsFromFile(self, channels_file):
"""
Add reaction channel definitions from an input file to the
list of definitions.
:type channels_file: str
:param channels_file: input file containing the definitions
of the reaction channels
"""
channels_handle = open(channels_file, 'r')
for aline in channels_handle:
if not self.BLANK_PATTERN.match(aline):
self.addDefinition(aline)
channels_handle.close()
[docs] def buildChannelStrDef(self,
atom_a_one,
atom_b_one,
atom_a_two=None,
atom_b_two=None):
"""
Build the string representation of a reaction channel from a
sequence of integers.
:type atom_a_one: int
:param atom_a_one: the first atom of the first part of the
definition
:type atom_b_one: int
:param atom_b_one: the first atom of the second part of the
definition
:type atom_a_two: int
:param atom_a_two: the second atom of the first part of the
definition
:type atom_b_two: int
:param atom_b_two: the second atom of the second part of the
definition
:rtype: str
:return: str_def, the string representation of the provided
reaction channel
"""
FIRST_SEP = ChannelDefinition.FIRST_SEP
SECOND_SEP = ChannelDefinition.SECOND_SEP
str_def = str(atom_a_one)
if atom_a_two:
str_def += SECOND_SEP + str(atom_a_two)
str_def += FIRST_SEP + SECOND_SEP + str(atom_b_one)
if atom_b_two:
str_def += SECOND_SEP + str(atom_b_two)
return str_def
[docs] def getDissociationChannels(self, reactants, no_reactive_h, caches=None):
"""
Return an OrderedDict of dissociation channel string
definitions keyed by type.
:type reactants: schrodinger.structure.Structure
:param reactants: the reactants from which channels will be
enumerated
:type no_reactive_h: bool
:param no_reactive_h: specify that R-H bonds be considered inert
:param `CacheInfo` caches: Data caches to speed up the calculation
:rtype: OrderedDict
:return: keys are type, values are dissociation channels
"""
NO_DISSOCIATION_DEFS = """There are no dissociation channels
available for the reactants that you have provided. This
is likely because there are no single bonds outside of
rings or because you have disabled reactivity of R-H bonds.
Please reconsider the types of channels that you are trying
to sample."""
dissociation_defs = OrderedDict()
if not caches:
caches = CacheInfo()
# these are expensive functions and so are only called for small
# systems
for bond in reactants.bond:
if not any(CheckInput().checkIfNotSimpleBond(
reactants,
bond.atom1.index,
bond.atom2.index,
no_reactive_h,
bond_cache=caches.bond_cache)):
idxs = [(bond.atom1.index, bond.atom2.index)]
ahash = tuple(
get_sorted_atomic_smarts(reactants,
idxs,
smarts_cache=caches.smarts_cache))
str_def = self.buildChannelStrDef(*idxs[0])
dissociation_defs.setdefault(ahash, []).append(str_def)
if not dissociation_defs and self.logger:
self.logger.warning(NO_DISSOCIATION_DEFS)
return dissociation_defs
[docs] def getAssociationChannels(self, reactants, caches=None):
"""
Return an OrderedDict of association channel string
definitions keyed by type.
:type reactants: schrodinger.structure.Structure
:param reactants: the reactants from which channels will be
enumerated
:param `CacheInfo` caches: Data caches to speed up the calculation
:rtype: OrderedDict
:return: keys are type, values are association channels
"""
NO_ASSOCIATION_DEFS = """There are no association channels
available for the reactants that you have provided. This
is likely because you either have only a single reactant
molecule or you do not have at least two adsorbable atoms.
Please reconsider the types of channels that you are
trying to sample as well as the possibility of adding
custom adsorption sites."""
association_defs = OrderedDict()
if caches is None:
caches = CacheInfo()
caches.fillAdsCache(reactants, self.matches)
# these are expensive functions and so are only called for small
# systems
for molecule_one in reactants.molecule:
for molecule_two in reactants.molecule:
if molecule_one.number >= molecule_two.number:
continue
for atom_one in molecule_one.atom:
if atom_one not in caches.ads_cache:
continue
for atom_two in molecule_two.atom:
if atom_two not in caches.ads_cache:
continue
idxs = [(atom_one.index, atom_two.index)]
ahash = tuple(
get_sorted_atomic_smarts(
reactants,
idxs,
smarts_cache=caches.smarts_cache))
str_def = self.buildChannelStrDef(*idxs[0])
association_defs.setdefault(ahash, []).append(str_def)
if not association_defs and self.logger:
self.logger.warning(NO_ASSOCIATION_DEFS)
return association_defs
[docs] def getSingleDisplacementChannels(self,
reactants,
no_reactive_h,
unique,
caches=None):
"""
Return an OrderedDict of single displacement channel string
definitions keyed by type.
:type reactants: schrodinger.structure.Structure
:param reactants: the reactants from which channels will be
enumerated
:type no_reactive_h: bool
:param no_reactive_h: specify that R-H bonds be considered inert
:type unique: bool
:param unique: provide only unique products
:param `CacheInfo` caches: Data caches to speed up the calculation
:rtype: OrderedDict
:return: keys are type, values are single displacement channels
"""
NO_SINGLE_DISPLACEMENT_DEFS = """There are no single displacement
channels available for the reactants that you have provided.
This is likely because you either have only a single reactant
molecule, have no single bonds outside of rings, have disabled
the reactivity of R-H bonds, have no adsorbable atoms, or if
unique has been specified have no unique products. Please
reconsider the types of channels that you are trying to sample
as well as the possibility of adding custom adsorption sites."""
single_displacement_defs = OrderedDict()
if not caches:
caches = CacheInfo()
caches.fillAdsCache(reactants, self.matches)
# these are expensive functions and so are only called for small
# systems, this type of channel involves an atom in one molecule
# and a bond in another and so to handle the asymmetry we loop over
# all pairs of molecules and take an atom from the first and a bond
# from the second
for molecule_one in reactants.molecule:
for molecule_two in reactants.molecule:
if molecule_one.number != molecule_two.number:
for atom in molecule_one.atom:
if not atom in caches.ads_cache:
continue
for bond in get_bonds_in_molecule(molecule_two):
if any(CheckInput().checkIfNotSimpleBond(
reactants,
bond.atom1.index,
bond.atom2.index,
no_reactive_h,
bond_cache=caches.bond_cache)):
continue
if unique:
if not any(CheckInput().checkChannelUniqueness(
reactants,
atom.index,
None,
bond.atom1.index,
bond.atom2.index,
smiles_cache=caches.smiles_cache)):
continue
idxs = [(atom.index,),
(bond.atom1.index, bond.atom2.index)]
ahash = tuple(
get_sorted_atomic_smarts(
reactants,
idxs,
smarts_cache=caches.smarts_cache))
str_def = self.buildChannelStrDef(
atom.index, bond.atom1.index, None,
bond.atom2.index)
single_displacement_defs.setdefault(
ahash, []).append(str_def)
if not single_displacement_defs and self.logger:
self.logger.warning(NO_SINGLE_DISPLACEMENT_DEFS)
return single_displacement_defs
[docs] def getDoubleDisplacementChannels(
self,
reactants,
no_reactive_h,
unique,
no_reactive_polymer_monomer_backbone=False,
no_reactive_polymer_monomer_sidechain=False,
caches=None):
"""
Return an OrderedDict of double displacement channel string
definitions keyed by type.
:type reactants: schrodinger.structure.Structure
:param reactants: the reactants from which channels will be
enumerated
:type no_reactive_h: bool
:param no_reactive_h: specify that R-H bonds be considered inert
:type unique: bool
:param unique: provide only unique products
:type no_reactive_polymer_monomer_backbone: bool
:param no_reactive_polymer_monomer_backbone: specify that polymer monomer
backbone bonds be considered inert
:type no_reactive_polymer_monomer_sidechain: bool
:param no_reactive_polymer_monomer_sidechain: specify that polymer monomer
sidechain bonds be considered inert
:param `CacheInfo` caches: Data caches to speed up the calculation
:rtype: OrderedDict
:return: keys are type, values are double displacement channels
"""
NO_DOUBLE_DISPLACEMENT_DEFS = """There are no double displacement
channels available for the reactants that you have provided.
This is likely because you either have only a single reactant
molecule, have no single bonds outside of rings, have disabled
the reactivity of R-H bonds, have disabled the reactivity of
polymer monomer backbone and/or sidechain bonds, or if unique
has been specified have no unique products. Please reconsider
the types of channels that you are trying to sample."""
double_displacement_defs = OrderedDict()
allow_backbone = not no_reactive_polymer_monomer_backbone
allow_sidechain = not no_reactive_polymer_monomer_sidechain
if caches is None:
caches = CacheInfo()
# these are expensive functions and so are only called for small
# systems
is_polymer = get_grow_idxs(reactants)
for molecule_one in reactants.molecule:
for molecule_two in reactants.molecule:
if molecule_one.number < molecule_two.number:
for bond_one in get_bonds_in_molecule(molecule_one):
b1i1 = bond_one.atom1.index
b1i2 = bond_one.atom2.index
if any(CheckInput().checkIfNotSimpleBond(
reactants,
b1i1,
b1i2,
no_reactive_h,
bond_cache=caches.bond_cache)):
continue
for bond_two in get_bonds_in_molecule(molecule_two):
b2i1 = bond_two.atom1.index
b2i2 = bond_two.atom2.index
if any(CheckInput().checkIfNotSimpleBond(
reactants,
b2i1,
b2i2,
no_reactive_h,
bond_cache=caches.bond_cache)):
continue
# It's important speed-wise to check is_polymer
# first before calling valid_polymer
if (is_polymer and
not valid_polymer_monomer_double_displacement_rxn(
reactants,
b1i1,
b1i2,
b2i1,
b2i2,
allow_backbone=allow_backbone,
allow_sidechain=allow_sidechain)):
continue
if unique:
if not any(CheckInput().checkChannelUniqueness(
reactants,
b1i1,
b1i2,
b2i1,
b2i2,
smiles_cache=caches.smiles_cache)):
continue
idxs = [(b1i1, b1i2), (b2i1, b2i2)]
str_def = self.buildChannelStrDef(
b1i1, b2i1, b1i2, b2i2)
ahash = tuple(
get_sorted_atomic_smarts(
reactants,
idxs,
outer_sort=True,
smarts_cache=caches.smarts_cache))
double_displacement_defs.setdefault(
ahash, []).append(str_def)
if not double_displacement_defs and self.logger:
self.logger.warning(NO_DOUBLE_DISPLACEMENT_DEFS)
return double_displacement_defs
[docs] def getAllChannels(self,
reactants,
random_types,
no_reactive_h,
unique,
no_reactive_polymer_monomer_backbone=False,
no_reactive_polymer_monomer_sidechain=False):
"""
Return a nested OrderedDict of all channel string
definitions keyed first by type of channel and second
by type of reaction.
:type reactants: schrodinger.structure.Structure
:param reactants: the reactants from which channels will be
enumerated
:type random_types: list
:param random_types: list of strings specifying which
types of channels should be sampled when generating the
random reaction channels
:type no_reactive_h: bool
:param no_reactive_h: specify that R-H bonds be considered
inert
:type unique: bool
:param unique: provide only unique products
:type no_reactive_polymer_monomer_backbone: bool
:param no_reactive_polymer_monomer_backbone: specify that polymer monomer
backbone bonds be considered inert
:type no_reactive_polymer_monomer_sidechain: bool
:param no_reactive_polymer_monomer_sidechain: specify that polymer monomer
sidechain bonds be considered inert
:rtype: OrderedDict
:return: all possible channel definitions, keyed first by
channel type and second by reaction type
"""
channel_defs_to_sample = OrderedDict()
caches = CacheInfo()
if Constants.DISSOCIATION_TYPE in random_types:
dissociation_defs = self.getDissociationChannels(reactants,
no_reactive_h,
caches=caches)
if dissociation_defs:
channel_defs_to_sample[Constants.DISSOCIATION_TYPE] = \
dissociation_defs
if Constants.ASSOCIATION_TYPE in random_types:
association_defs = self.getAssociationChannels(reactants,
caches=caches)
if association_defs:
channel_defs_to_sample[Constants.ASSOCIATION_TYPE] = \
association_defs
if Constants.SINGLE_DISPLACEMENT_TYPE in random_types:
single_displacement_defs = self.getSingleDisplacementChannels(
reactants, no_reactive_h, unique, caches=caches)
if single_displacement_defs:
channel_defs_to_sample[Constants.SINGLE_DISPLACEMENT_TYPE] = \
single_displacement_defs
if Constants.DOUBLE_DISPLACEMENT_TYPE in random_types:
double_displacement_defs = self.getDoubleDisplacementChannels(
reactants,
no_reactive_h,
unique,
no_reactive_polymer_monomer_backbone=
no_reactive_polymer_monomer_backbone,
no_reactive_polymer_monomer_sidechain=
no_reactive_polymer_monomer_sidechain,
caches=caches)
if double_displacement_defs:
channel_defs_to_sample[Constants.DOUBLE_DISPLACEMENT_TYPE] = \
double_displacement_defs
return channel_defs_to_sample
[docs] def printAllChannels(self, all_channel_defs):
"""
For each type of sampled channel log the total number
of such channels.
:type all_channel_defs: OrderedDict
:param all_channel_defs: all possible channel definitions,
keyed first by channel type and second by reaction type
"""
types_to_sizes_dict = OrderedDict()
for channel_type, rxn_types in all_channel_defs.items():
total = sum([len(rxns) for rxns in rxn_types.values()])
types_to_sizes_dict[channel_type] = total
TOTAL_TAG = 'Total:'
max_descriptor = max(
[len(descriptor) for descriptor in self.DESCRIPTOR_DICT.values()])
max_size = max([len(str(size)) for size in \
list(types_to_sizes_dict.values())])
HEADER_ONE = 'Number of possible reaction channels by type'
HEADER_TWO = '-' * len(HEADER_ONE)
if self.logger:
self.logger.info(HEADER_ONE)
self.logger.info(HEADER_TWO)
self.logger.info('')
for key, value in types_to_sizes_dict.items():
descriptor = self.DESCRIPTOR_DICT[key]
aline = descriptor.ljust(max_descriptor) + \
str(value).rjust(max_size)
self.logger.info(aline)
if len(types_to_sizes_dict) > 1:
self.logger.info('-' * (max_descriptor + max_size))
total = sum(types_to_sizes_dict.values())
total_line = TOTAL_TAG.ljust(max_descriptor) + \
str(total).rjust(max_size)
self.logger.info(total_line)
self.logger.info('')
[docs] def addRandomChannelsFromAll(self,
num_random_channels,
random_seed,
all_channel_defs,
bin_rxn_types=False):
"""
Add the specified number of random channels to the list
of channels to be computed by picking a random sample of
the size given from the list of all possible channels.
If the number of random channels is 'all' or greater than
or equal to the number of possible channels then pick all
of them in order.
:type num_random_channels: int or str 'all'
:param num_random_channels: the desired number of random
channels to generate
:type random_seed: None or int
:param random_seed: the seed for the random number generator
:type all_channel_defs: OrderedDict
:param all_channel_defs: all possible channel definitions,
keyed first by channel type and second by reaction type
:type bin_rxn_types: bool
:param bin_rxn_types: if True then when generating random
channels first bin by reaction type followed by selecting a
random type followed by a random instance, if False then just
select a random instance from all instances
"""
ASKED_FOR_TOO_MANY_MSG = """
You have specified that %s random reaction channels
should be generated but this system, given the other
options you have specified, only has %s possible
reaction channels. Proceeding with all possible
reaction channels."""
flattened_channels = self.flattenChannels(all_channel_defs)
total_num_channels = len(flattened_channels)
if num_random_channels == Constants.ALL_RANDOM_CHANNELS or \
num_random_channels == total_num_channels:
defs_to_add = list(flattened_channels)
elif num_random_channels > total_num_channels:
if self.logger:
self.logger.warning(ASKED_FOR_TOO_MANY_MSG %
(num_random_channels, total_num_channels))
defs_to_add = list(flattened_channels)
else:
random_seed = init_random_seed(random_seed, self.logger)
my_random.seed(random_seed)
if bin_rxn_types:
defs_to_add = []
while len(defs_to_add) < num_random_channels:
channel_type = random_choice(all_channel_defs)
rxn_type = random_choice(all_channel_defs[channel_type])
rxn = random_choice(
all_channel_defs[channel_type][rxn_type])
if rxn not in defs_to_add:
defs_to_add.append(rxn)
else:
defs_to_add = list(
my_random.choice(flattened_channels,
size=num_random_channels,
replace=False))
for def_to_add in defs_to_add:
self.addDefinition(def_to_add)
def _getRandomAtomsFromBins(self, mols, cache=None):
"""
Return a list of random atom indicies, one per given
molecule, drawn by first binning by atom type.
:type mols: tuple
:param mols: contains schrodinger.structure._Molecule
:type cache: dict
:param cache: an atom type dict cache, keys are molecule
numbers, values are atom type dicts
:rtype: list
:return: contains single tuples of atom indices, i.e. (1,)
"""
singles = []
cache = cache or {}
for mol in mols:
atom_dict = cache.get(mol.number)
if not atom_dict:
st = mol.extractStructure()
atom_dict = get_atom_type_dict(st)
if atom_dict:
old = mol.getAtomIndices()
new = list(range(1, len(old) + 1))
new_to_old = dict(list(zip(new, old)))
atom_dict = _update_type_dict(atom_dict, new_to_old)
cache[mol.number] = atom_dict
atype = random_choice(atom_dict)
single = random_choice(atom_dict[atype])
singles.append(single)
return singles
def _getRandomBondsFromBins(self, mols, cache=None):
"""
Return a list of random bonding atom index pairs, one
per given molecule, drawn by first binning by bond type.
:type mols: tuple
:param mols: contains schrodinger.structure._Molecule
:type cache: dict
:param cache: a bond type dict cache, keys are molecule
numbers, values are bond type dicts
:rtype: list
:return: contains pair tuples of atom indices, i.e. (1, 2)
"""
pairs = []
cache = cache or {}
for mol in mols:
bond_dict = cache.get(mol.number)
if not bond_dict:
st = mol.extractStructure()
bond_dict = get_bond_type_dict(st)
if bond_dict:
old = mol.getAtomIndices()
new = list(range(1, len(old) + 1))
new_to_old = dict(list(zip(new, old)))
bond_dict = _update_type_dict(bond_dict, new_to_old)
cache[mol.number] = bond_dict
atype = random_choice(bond_dict)
pair = random_choice(bond_dict[atype])
pairs.append(pair)
return pairs
[docs] def addRandomChannelsOnTheFly(self,
reactants,
num_random_channels,
random_types,
no_reactive_h,
unique,
bin_rxn_types=False,
no_reactive_polymer_monomer_backbone=False,
no_reactive_polymer_monomer_sidechain=False):
"""
Generate the specified number of random reaction channels
without ever precomputing the list of all possible channels.
:type reactants: schrodinger.structure.Structure
:param reactants: the reactants from which channels will be
enumerated
:type num_random_channels: int
:param num_random_channels: the desired number of random
channels to generate
:type random_types: list
:param random_types: list of strings specifying which types of
channels should be sampled when generating the random reaction
channels
:type no_reactive_h: bool
:param no_reactive_h: specify that R-H bonds be considered inert
:type unique: bool
:param unique: provide only unique products
:type bin_rxn_types: bool
:param bin_rxn_types: if True then when generating random
channels first bin by reaction type followed by selecting a
random type followed by a random instance, if False then just
select a random instance from all instances
:type no_reactive_polymer_monomer_backbone: bool
:param no_reactive_polymer_monomer_backbone: specify that polymer monomer
backbone bonds be considered inert
:type no_reactive_polymer_monomer_sidechain: bool
:param no_reactive_polymer_monomer_sidechain: specify that polymer monomer
sidechain bonds be considered inert
"""
def wrapper_check_bonding():
"""
This is just a convenience wrapper to check the validity of
any random bond(s) generated on-the-fly for the current random
reaction channel type.
:rtype: boolean
:return: True if at least one of the bonds is not a simple bond
and therefore to skip this random definition, False otherwise
"""
if atom_a_two:
if any(CheckInput().checkIfNotSimpleBond(
reactants, atom_a_one, atom_a_two, no_reactive_h)):
return True
if atom_b_two:
if any(CheckInput().checkIfNotSimpleBond(
reactants, atom_b_one, atom_b_two, no_reactive_h)):
return True
def get_bonded_atoms(central_atom_ind):
"""
Return a list of atom indicies for atoms bound to the central
atom. This function just collects indicies provided by the
schrodinger.structure.Structure._StructureAtom.bonded_atoms
generator so that a sample from which we can randomly choose
can be obtained.
:type central_atom_ind: int
:param central_atom_ind: the central atom for which the bonded
atoms will be returned
:rtype: list of ints
:return: atoms bonded to the central atom
"""
central_atom = reactants.atom[central_atom_ind]
bonded_atoms = []
for bonded_atom in central_atom.bonded_atoms:
bonded_atoms.append(bonded_atom.index)
return bonded_atoms
def break_out():
if num_unproductive_cycles > self.MAX_NUM_UNPRODUCTIVE_CYCLES:
UNPRODUCTIVE_MSG = """Over the last %d attempts this script
has been unable to generate a valid randomly defined
reaction channel. It is likely that you have either
asked for a number of random channels that is greater
than the number of available channels given your input
or you are trying to sample a rare channel. This
script will proceed with the %d random channels that
have been determined thus far out of the %d that you
asked for."""
if self.logger:
self.logger.warning(
UNPRODUCTIVE_MSG %
(self.MAX_NUM_UNPRODUCTIVE_CYCLES,
num_random_channels_added, num_random_channels))
return True
def get_random_molecule_and_atom():
mol = random_choice(reactant_molecules)
atom_one, atom_two = random_choice(mol.getAtomIndices()), None
atom_one_bonded_atoms = get_bonded_atoms(atom_one)
return mol, atom_one, atom_two, atom_one_bonded_atoms
allow_backbone = not no_reactive_polymer_monomer_backbone
allow_sidechain = not no_reactive_polymer_monomer_sidechain
# begin generating random reaction channels
#
# in this case we have to use a while loop because we have no
# upper bound given that it is too expensive to calculate by
# default (if the user wants to they can pass 'all' as the
# argument to the number of random reaction channels)
num_random_channels_added = 0
num_unproductive_cycles = 0
reactant_molecules = [molecule for molecule in reactants.molecule]
atom_dict_cache = {}
bond_dict_cache = {}
while num_random_channels_added < num_random_channels:
if break_out():
break
# pick a random type of channel
channel_type = random_choice(random_types)
# pick a random molecule and a random atom
mol_a, atom_a_one, atom_a_two, atom_a_one_bonded_atoms = \
get_random_molecule_and_atom()
if channel_type != Constants.DISSOCIATION_TYPE and \
len(reactant_molecules) != 1:
# pick another random molecule and a random atom
# and compare
mol_b, atom_b_one, atom_b_two, \
atom_b_one_bonded_atoms = \
get_random_molecule_and_atom()
if mol_a.number == mol_b.number:
num_unproductive_cycles += 1
continue
# check association type
if channel_type == Constants.ASSOCIATION_TYPE:
if bin_rxn_types:
mols = (mol_a, mol_b)
(atom_a_one,), (atom_b_one,) = \
self._getRandomAtomsFromBins(mols,
cache=atom_dict_cache)
if CheckInput().checkIfNotAdsorbable(
reactants, atom_a_one, self.matches):
num_unproductive_cycles += 1
continue
if CheckInput().checkIfNotAdsorbable(
reactants, atom_b_one, self.matches):
num_unproductive_cycles += 1
continue
# check single displacement type
if channel_type == Constants.SINGLE_DISPLACEMENT_TYPE:
if not (atom_a_one_bonded_atoms or atom_b_one_bonded_atoms):
num_unproductive_cycles += 1
continue
elif atom_a_one_bonded_atoms:
if not bin_rxn_types:
atom_a_two = random_choice(atom_a_one_bonded_atoms)
else:
mols = (mol_a,)
(atom_a_one, atom_a_two), = \
self._getRandomBondsFromBins(mols,
cache=bond_dict_cache)
mols = (mol_b,)
(atom_b_one,), = self._getRandomAtomsFromBins(
mols, cache=atom_dict_cache)
if CheckInput().checkIfNotAdsorbable(
reactants, atom_b_one, self.matches):
num_unproductive_cycles += 1
continue
elif atom_b_one_bonded_atoms:
if not bin_rxn_types:
atom_b_two = random_choice(atom_b_one_bonded_atoms)
else:
mols = (mol_a,)
(atom_a_one,), = self._getRandomAtomsFromBins(
mols, cache=atom_dict_cache)
mols = (mol_b,)
(atom_b_one, atom_b_two), = \
self._getRandomBondsFromBins(mols,
cache=bond_dict_cache)
if CheckInput().checkIfNotAdsorbable(
reactants, atom_a_one, self.matches):
num_unproductive_cycles += 1
continue
# check double displacement type
if channel_type == Constants.DOUBLE_DISPLACEMENT_TYPE:
if not atom_a_one_bonded_atoms or not atom_b_one_bonded_atoms:
num_unproductive_cycles += 1
continue
elif not bin_rxn_types:
atom_a_two = random_choice(atom_a_one_bonded_atoms)
atom_b_two = random_choice(atom_b_one_bonded_atoms)
else:
mols = (mol_a, mol_b)
(atom_a_one, atom_a_two), (atom_b_one, atom_b_two) = \
self._getRandomBondsFromBins(mols,
cache=bond_dict_cache)
# check bonding
if wrapper_check_bonding():
num_unproductive_cycles += 1
continue
# check polymer monomers
if channel_type == Constants.DOUBLE_DISPLACEMENT_TYPE and \
not valid_polymer_monomer_double_displacement_rxn(
reactants, atom_a_one, atom_a_two, atom_b_one, atom_b_two,
allow_backbone=allow_backbone, allow_sidechain=allow_sidechain):
num_unproductive_cycles += 1
continue
# check channel uniqueness
if unique and (channel_type == Constants.SINGLE_DISPLACEMENT_TYPE or \
channel_type == Constants.DOUBLE_DISPLACEMENT_TYPE):
if not any(CheckInput().checkChannelUniqueness(
reactants, atom_a_one, atom_a_two, atom_b_one,
atom_b_two)):
num_unproductive_cycles += 1
continue
elif channel_type == Constants.DISSOCIATION_TYPE:
# check bonding
if not atom_a_one_bonded_atoms:
num_unproductive_cycles += 1
continue
elif not bin_rxn_types:
atom_b_one, atom_b_two = \
random_choice(atom_a_one_bonded_atoms), None
else:
mols = (mol_a,)
(atom_a_one, atom_b_one), = self._getRandomBondsFromBins(
mols, cache=bond_dict_cache)
if any(CheckInput().checkIfNotSimpleBond(
reactants, atom_a_one, atom_b_one, no_reactive_h)):
num_unproductive_cycles += 1
continue
# wrap up, do not store any random defs that are bad defs
# because we are directly creating the definitions from
# structural considerations and the contents here will
# only be due to duplicates and the user doesn't need to
# see them
random_def = self.buildChannelStrDef(atom_a_one, atom_b_one,
atom_a_two, atom_b_two)
if self.addDefinition(random_def):
num_unproductive_cycles += 1
self.bad_defs.remove(random_def.strip('\n'))
continue
else:
num_random_channels_added += 1
num_unproductive_cycles = 0
[docs] def addRandomChannels(self,
reactants,
num_random_channels=Constants.NUM_RANDOM_CHANNELS,
random_types=Constants.DEFAULT_RANDOM_TYPES,
random_seed=Constants.RANDOM_SEED,
allow_adsorption_onto=None,
no_reactive_h=Constants.NO_REACTIVE_H,
unique=Constants.UNIQUE,
bin_rxn_types=False,
no_reactive_polymer_monomer_backbone=False,
no_reactive_polymer_monomer_sidechain=False):
"""
Generate the specified number of random reaction channels.
:type reactants: schrodinger.structure.Structure
:param reactants: the reactants from which channels will be
enumerated
:type num_random_channels: int or str
:param num_random_channels: the desired number of random
channels to generate or 'all'
:type random_types: list
:param random_types: list of strings specifying which types of
channels should be sampled when generating the random reaction
channels
:type random_seed: None or int
:param random_seed: the seed for the random number generator
:type allow_adsorption_onto: list
:param allow_adsorption_onto: SMARTS patterns of arbitrary
adsorption sites to support
:type no_reactive_h: bool
:param no_reactive_h: specify that R-H bonds be considered inert
:type unique: bool
:param unique: provide only unique products
:type bin_rxn_types: bool
:param bin_rxn_types: if True then when generating random
channels first bin by reaction type followed by selecting a
random type followed by a random instance, if False then just
select a random instance from all instances
:type no_reactive_polymer_monomer_backbone: bool
:param no_reactive_polymer_monomer_backbone: specify that polymer monomer
backbone bonds be considered inert
:type no_reactive_polymer_monomer_sidechain: bool
:param no_reactive_polymer_monomer_sidechain: specify that polymer monomer
sidechain bonds be considered inert
"""
# check input needed for generating random channels
num_random_channels, random_types, \
self.allow_adsorption_onto, self.matches = \
CheckInput().checkRandomChannelsInput(reactants,
num_random_channels, random_types, random_seed,
allow_adsorption_onto, self.logger)
if not num_random_channels:
# MATSCI-1310 - reset allow_adsorption_onto just to be safe
self.allow_adsorption_onto = None
return
# log adsorption channels
print_allow_adsorption_onto(self.allow_adsorption_onto, self.logger)
# if all possible channels are wanted or if it is
# inexpensive to calculate them all then calculate
# them all and print them. note that this use of the
# word 'all' when referring to reaction channels means
# 'all' of each type specified in the list of random
# types to be sampling. if the latter is also 'all'
# then that will literally be all of the possible
# reaction channels that are calculable by this script.
#
# if the user either asked for all of the channels or
# for an inexpensive system they specified a number of
# reaction channels that is larger than or equal to the
# number of possible channels then all of the channels
# will be computed. otherwise a random sample of channels
# will be selected from the list of all possible channels
# or are generated on the fly, without ever precomputing the
# list of all possible channels, depending on system size.
# we do it this way to avoid having to precompute the huge
# list of all possible channels for large systems when the
# user might only want a small number of them. the sacrifice
# being that the number of channels by type will never be
# reported to the user and the exit conditions from the on
# the fly version are indeterministic. for example, without
# the implemented time-out it would be possible to enter an
# infinite loop in a situation where the user has asked for
# a number of channels that is greater than the number of
# available channels given that the latter is never computed
# for large systems. also, it will be possible that the
# script will prematurely give up on trying to randomly
# find valid reaction channels if too much time is spent
# trying to find a rare channel
if num_random_channels == Constants.ALL_RANDOM_CHANNELS or \
reactants.atom_total < self.RANDOM_ALL_TO_ON_THE_FLY_THRESH:
all_channel_defs = self.getAllChannels(
reactants,
random_types,
no_reactive_h,
unique,
no_reactive_polymer_monomer_backbone=
no_reactive_polymer_monomer_backbone,
no_reactive_polymer_monomer_sidechain=
no_reactive_polymer_monomer_sidechain)
if not all_channel_defs:
return
self.printAllChannels(all_channel_defs)
self.addRandomChannelsFromAll(num_random_channels,
random_seed,
all_channel_defs,
bin_rxn_types=bin_rxn_types)
else:
random_seed = init_random_seed(random_seed, self.logger)
my_random.seed(random_seed)
self.addRandomChannelsOnTheFly(
reactants,
num_random_channels,
random_types,
no_reactive_h,
unique,
bin_rxn_types=bin_rxn_types,
no_reactive_polymer_monomer_backbone=
no_reactive_polymer_monomer_backbone,
no_reactive_polymer_monomer_sidechain=
no_reactive_polymer_monomer_sidechain)
[docs]class ChannelDefinition(object):
"""
Manage a reaction channel definition.
"""
PATTERN = \
re.compile(r'\s*(\d+)\s*(\d+)?\s*;\s*(\d+)\s*(\d+)?\s*$')
FIRST_SEP = ';'
SECOND_SEP = ' '
[docs] def __init__(self, str_definition, logger=None):
"""
Create an instance.
:type str_definition: str
:param str_definition: the string representation of
the definition
:type logger: logging.getLogger
:param logger: output logger
"""
self.str_definition = str_definition
self.logger = logger
self.not_valid = False
self.r_one_a_one = None
self.r_one_a_two = None
self.r_two_a_one = None
self.r_two_a_two = None
self.r_one = []
self.r_two = []
self.listdef = []
self.flatdef = []
self.representation = ''
self.do_which_types = None
self.checkDefinition()
def __eq__(self, other):
"""
Define class equality.
"""
state = self.flatdef == other.flatdef
return state
def __repr__(self):
"""
Define representation format.
"""
return self.representation
[docs] def setChannelAttrs(self,
r_one_a_one,
r_two_a_one,
r_one_a_two=None,
r_two_a_two=None):
"""
Set up some attributes for this class.
:type r_one_a_one: int
:param r_one_a_one: the first atom of the first reactant
:type r_two_a_one: int
:param r_two_a_one: the first atom of the second reactant
:type r_one_a_two: int
:param r_one_a_two: the second atom of the first reactant
:type r_two_a_two: int
:param r_two_a_two: the second atom of the second reactant
"""
# sort the lists on either side of the separator and then
# do a final sort so that the pair with the smallest index
# comes first, this is just a convention
self.r_one_a_one = int(r_one_a_one)
self.r_one = [self.r_one_a_one]
self.r_two_a_one = int(r_two_a_one)
self.r_two = [self.r_two_a_one]
self.r_one_a_two = r_one_a_two
if self.r_one_a_two:
self.r_one_a_two = int(self.r_one_a_two)
self.r_one += [self.r_one_a_two]
self.r_one.sort()
self.r_two_a_two = r_two_a_two
if self.r_two_a_two:
self.r_two_a_two = int(self.r_two_a_two)
self.r_two += [self.r_two_a_two]
self.r_two.sort()
if self.r_one[0] <= self.r_two[0]:
self.listdef = [self.r_one, self.r_two]
self.flatdef = self.r_one + self.r_two
else:
self.listdef = [self.r_two, self.r_one]
self.flatdef = self.r_two + self.r_one
r_one, r_two = self.listdef
r_one = self.SECOND_SEP.join([str(index) for index in r_one])
r_two = self.SECOND_SEP.join([str(index) for index in r_two])
self.representation = r_one + self.FIRST_SEP + \
self.SECOND_SEP + r_two
[docs] def checkAtomUniqueness(self):
"""
Check for unique atoms.
"""
if len(self.flatdef) != len(set(self.flatdef)):
self.not_valid = True
return self.not_valid
[docs] def checkAtomZeroIndex(self):
"""
Check for an atom index that is zero.
"""
if 0 in self.flatdef:
self.not_valid = True
return self.not_valid
[docs] def checkDefinition(self):
"""
Check this reaction channel definition.
"""
if self.checkDefStrFormat():
return
if self.checkAtomUniqueness():
return
if self.checkAtomZeroIndex():
return
[docs]def get_fragment_smiles(astructure, idx1, idx2):
"""
Return two SMILES patterns of the two molecule fragments that
would result from breaking the bond specified by the given two
atoms or return the SMILES pattern of the molecule containing
idx1 if idx2 is None.
:type astructure: schrodinger.structure.Structure
:param astructure: the structure containing the molecule for which
fragment SMILES are wanted
:type idx1: int
:param idx1: an atom index
:type idx2: int
:param idx2: the atom index for the target bond formed with idx1 or
None
:rtype: str, str or str, None
:return: either two strings one for each of two fragments or a single
string for the molecule containing idx1 and None
"""
mol = astructure.atom[idx1].getMolecule()
orig_to_new = dict(
list(zip(mol.getAtomIndices(), list(range(1,
len(mol.atom) + 1)))))
mol = mol.extractStructure()
if idx2:
newidx1 = orig_to_new[idx1]
newidx2 = orig_to_new[idx2]
frag_a_indicies = indicies_from_bonds_deep(mol, newidx1, [newidx2])
frag_a_st = mol.extract(frag_a_indicies)
frag_a_smiles = analyze.generate_smiles(frag_a_st)
frag_b_indicies = indicies_from_bonds_deep(mol, newidx2, [newidx1])
frag_b_st = mol.extract(frag_b_indicies)
frag_b_smiles = analyze.generate_smiles(frag_b_st)
else:
frag_a_st = mol.copy()
frag_a_smiles = analyze.generate_smiles(frag_a_st)
frag_b_smiles = None
return frag_a_smiles, frag_b_smiles
[docs]def indicies_from_bonds_deep(astructure,
start_atom_index,
exclude_atom_indicies=None,
depth=None):
"""
Return a list of atom indicies obtained by collecting all atoms
that are connected, in the bond traversal sense, to the given
start atom by a number of bonds that is less than or equal to
that given by the provided depth parameter. The start atom is
included in the returned list. One can set a direction for the
traversal by specifying exclusion atoms that are bound to the given
start atom; the traversal will be done away from these atoms.
Useful for doing SMARTS analysis from a central atom and proceeding
outwards from that atom, as in nearest-neighbor,
next-nearest-neighbor, etc., i. e. shells. Also useful for
splitting a molecule into fragments about a given bond.
:type astructure: schrodinger.structure.Structure
:param astructure: the structure containing all atoms
:type start_atom_index: int
:param start_atom_index: the atom index from which to start the traversal
:type exclude_atom_indicies: list of int
:param exclude_atom_indicies: atom indicies to exclude from the bond
traversal which basically sets the direction of the bond traversal, for
example in the direction of a certain bond
:type depth: int
:param depth: the maximum depth that the traversal is allowed to traverse
in terms of the number of bonds from the starting atom
:rtype: list of ints
:return: indicies, atom indicies connected to the start atom by up to the
specified bond depth
"""
# if only radial shells are wanted then the exclusion list
# should be empty
if exclude_atom_indicies is None:
exclude_atom_indicies = []
# if only fragments from bond breaking are wanted then the
# depth doesn't matter so we will set it to somthing
# guaranteed to be larger than the number of connections,
# i.e. set it to the number of atoms plus one (the latter
# needed to handle the case where the number of atoms is
# the same as the number of connections)
if depth is None:
depth = astructure.atom_total + 1
# recursion function, if needed set the direction with
# exclusion atoms, and if needed set the number of times
# through the recursion with the depth. The num_times_through
# variable gives the current depth of the traversal, i.e. it
# tells you which 'shell' you are on either the first shell
# (nearest neighbors to the start atom), the second shell
# (next nearest neighbors to the start atom), etc. It is
# bounded by the depth argument. It starts at zero and is
# incremented by one each time a valid bonding atom is
# found in the next shell. Each next shell enters the problem
# with the next call of continue_traversal_recursion. This is
# done until the desired depth is acheived at which point we
# to return from the current call and get back to the most recent
# branch point. We first need to decrement the num_times_through
# counter by 2, once for incrementing-by-one that was just done
# with the current call and once to get back to the last branch
# point. If the current atom marks the end of a branch then
# we only decrement by one to get back to the last branch point.
# Remember that the structure of this set of recursive calls is
# that starting from atom A, with first shell [B, C, D], and
# second shell [E, F, G, H, I, J] and is greedy, that is that
# the code block will reach A->B->E before it reaches even A->C,
# etc.
def continue_traversal_recursion(start_atom_index, exclude_atom_indicies):
global num_times_through
for bonded_atom in \
astructure.atom[start_atom_index].bonded_atoms:
if bonded_atom.index not in indicies and \
bonded_atom.index not in exclude_atom_indicies:
num_times_through += 1
if num_times_through <= depth:
indicies.add(bonded_atom.index)
continue_traversal_recursion(bonded_atom.index,
[start_atom_index])
else:
num_times_through -= 2
return
num_times_through -= 1
return
# put the start atom in
indicies = set([start_atom_index])
# this global will track the number of times the
# resursion is called
global num_times_through
num_times_through = 0
# start it off
continue_traversal_recursion(start_atom_index, exclude_atom_indicies)
# return a sorted list
indicies = list(indicies)
indicies.sort()
return indicies
[docs]def set_zob_to_sob(astructure, atom_one, atom_two):
"""
If the given structure has a zero-order bond for the given
atom indicies then make this bond a single-order bond.
:type astructure: schrodinger.structure.Structure
:param astructure: the structure for which the bond order
may be changed
:type atom_one: int
:param atom_one: the first atom index of the bond whose order
needs changing
:type atom_two: int
:param atom_two: the second atom index of the bond whose order
needs changing
"""
bond = astructure.getBond(atom_one, atom_two)
if bond.order == 0:
bond.order = 1
[docs]def wrapper_build_attach_structure(mol_one_st, mol_one_from_atom,
mol_one_to_atom, mol_two_st,
mol_two_from_atom, mol_two_to_atom):
"""
A wrapper that corrects the atom reorder map returned from
build.attach_structure to account for deleted fragment atoms
and wraps the case where the active bonds may be of zero-order.
:type mol_one_st: schrodinger.structure.Structure
:param mol_one_st: the base structure to which the fragment
will be added
:type mol_one_from_atom: int
:param mol_one_from_atom: defines the point at which the
fragment structure will be added to the base structure
:type mol_one_to_atom: int
:param mol_one_to_atom: this atom defines the part of the
base structure that will be replaced by the fragment
:type mol_two_st: schrodinger.structure.Structure
:param mol_two_st: the fragment structure to be added
to the base structure
:type mol_two_from_atom: int
:param mol_two_from_atom: defines the point at which the
base structure will be added to the fragment structure
:type mol_two_to_atom: int
:param mol_two_to_atom: this atom defines the part of the
fragment structure that will be replaced by the base
:rtype: dict, dict
:return: two mapping dictionaries,
(1) orig_to_new_map, a mapping of original indices of atoms in input
structures to atom indices in the newly constructed structure. Indices
for atoms from the second structure were offset by the number of atoms
in the first structure, and
(2) new_to_orig_map, the reverse of (1).
"""
# handle zero-order bonds for both structures
set_zob_to_sob(mol_one_st, mol_one_from_atom, mol_one_to_atom)
set_zob_to_sob(mol_two_st, mol_two_from_atom, mol_two_to_atom)
# add atom indices to structures as properties before modification
orig_to_new_map = {}
atom_renumbering_index_property_key = 'i_matsci_atomRenumberingIndex'
mol_two_offset = 0
# initialize all fragment atoms to None in the renumber map
for at in mol_one_st.atom:
at.property[atom_renumbering_index_property_key] = at.index
orig_to_new_map[at.index] = None
mol_two_offset = at.index
for at in mol_two_st.atom:
at.property[
atom_renumbering_index_property_key] = mol_two_offset + at.index
orig_to_new_map[mol_two_offset + at.index] = None
build.attach_structure(mol_one_st, mol_one_from_atom, mol_one_to_atom,
mol_two_st, mol_two_from_atom, mol_two_to_atom)
# track atom renumbering and update the renumber map
for at in mol_one_st.atom:
if at.property.get(atom_renumbering_index_property_key) is not None:
orig_to_new_map[at.property.get(
atom_renumbering_index_property_key)] = at.index
mol_one_st.deletePropertyFromAllAtoms(atom_renumbering_index_property_key)
# while in here return the new to original index map for
# convenience, it will skip None atoms
new_to_orig_map = {}
for key, value in orig_to_new_map.items():
if value is not None:
new_to_orig_map[value] = key
return orig_to_new_map, new_to_orig_map
[docs]def dict_delete_and_decrement_keys(indict, key_to_delete):
"""
Return the dictionary obtained from the input dictionary
by deleting an entry with the specified key and then
decrementing all keys larger than the specified key.
Leave the values unchanged.
:type indict: dict
:param indict: dictionary with integer keys and values
from which to remove and decrement the specified key
:type key_to_delete: int
:param key_to_delete: the key to delete from the input
dictionary
:rtype: dict
:return outdict: the new dictionary without the specifed
key and with decremented keys
"""
outdict = {}
for key, value in indict.items():
if key < key_to_delete:
outdict[key] = value
elif key > key_to_delete:
outdict[key - 1] = value
return outdict
[docs]def open_bonding_sites(astructure, atom_index):
"""
Return the number of open bonding sites available for the
provided atom in the given structure. This number is the
same as the number of unpaired electrons for the atom in
its current state in the given structure.
:type astructure: schrodinger.structure.Structure
:param astructure: the structure that contains the provided
atom
:type atom_index: int
:param atom_index: the index of the atom for which the
number of open bonding sites will be reported
:rtype: int
:return: atom_open_val, the number of open bonding sites
for the provided atom
"""
# using older MacroModel atom types because mmlewis doesn't
# have an API to compute this
atom_obj = astructure.atom[atom_index]
atom_type = mm.mmat_get_atom_type(atom_obj.atom_type_name)
atom_max_val = mm.mmat_get_valence(atom_type)
atom_curr_val = sum([bond.order for bond in atom_obj.bond])
atom_open_val = atom_max_val - atom_curr_val
return atom_open_val
[docs]def minimize_geometry(astructure):
"""
Perform a geometry minimization on the provided structure.
:type astructure: schrodinger.structure.Structure
:param astructure: the structure object needing minimization
:rtype: bool
:return: False if the minimization was successful, True
otherwise
"""
version = Channels.OPLS_VERSION
# using a bare except because I am not entirely sure of the
# different ways that this function will traceback
try:
astructure.retype()
with ioredirect.IOSilence():
with minimize.minimizer_context(ffld_version=version,
struct=astructure,
cleanup=False) as minimize_obj:
min_res = minimize_obj.minimize()
astructure.property[
mm.OPLS_POTENTIAL_ENERGY] = min_res.potential_energy
except:
return True
[docs]def get_bonds_in_molecule(amolecule):
"""
This is a convenience function to get a list of bond objects
from a molecule object, since no such function exists in our
Python API.
:type amolecule: schrodinger.structure._Molecule
:param amolecule: the molecule object from which bonds will
be defined
:rtype: list of schrodinger.structure._StructureBond
:return: bonds, list of bond objects for bonds in the specified
molecule
"""
# originally this was a set but it was later discovered while
# testing the GA that consecutive runs using the same random seed
# gave different results, not because of the seed but due to the
# fact that the sets were being ordered differently for the two
# runs
bonds = OrderedDict()
for atom in amolecule.atom:
for bond in atom.bond:
key = [bond.atom1.index, bond.atom2.index]
key.sort()
key = tuple(key)
if key not in list(bonds):
bonds[key] = bond
return list(bonds.values())
[docs]def get_sorted_atomic_smarts(st, all_idxs, outer_sort=False, smarts_cache=None):
"""
Return a tuple of sorted atomic SMARTS for the given
collection of indicies in the given structure.
:type st: schrodinger.structure.Structure
:param st: the structure
:type all_idxs: list
:param all_idxs: contains tuples of atom indices
:type outer_sort: bool
:param outer_sort: whether to sort the tuples in the
returned list
:type dict smarts_cache: keys are atom indexes, values are the SMARTS
pattern for that atom. Used to speed up multiple calls to this function.
This function will populate the dictionary as new atoms are computed.
:rtype: list
:return: contains tuples of sorted atomic SMARTS having
the same structure as the input indicies
"""
smarts = []
if smarts_cache is None:
smarts_cache = {}
for idxs in all_idxs:
patterns = []
for index in idxs:
try:
patterns.append(smarts_cache[index])
except KeyError:
with warnings.catch_warnings():
warnings.filterwarnings("ignore",
category=DeprecationWarning,
message="analyze.generate_smarts")
atsm = analyze.generate_smarts(st, atom_subset=[index])
patterns.append(atsm)
smarts_cache[index] = atsm
smarts.append(tuple(sorted(patterns)))
if outer_sort:
smarts = sorted(smarts, key=lambda x: x[0])
return smarts
def _update_type_dict(adict, amap):
"""
Return a copy of the given type dictionary with
atom indicies updated according to the given index
map.
:type adict: OrderedDict
:param adict: the type dictionary to be updated
:type amap: dict
:param amap: maps the indicies used in the given
dict to another set of indicies
:rtype: OrderedDict
:return: the updated dict
"""
new_adict = OrderedDict()
for key, values in adict.items():
new_values = []
for value in values:
new_value = [amap[x] for x in value]
new_values.append(tuple(new_value))
new_adict[key] = new_values
return new_adict
[docs]def get_atom_type_dict(st):
"""
Return an atom type dictionary for the given structure.
:type st: schrodinger.structure.Structure
:param st: the structure
:rtype: OrderedDict
:return: atom type dict, keys are single tuples of SMARTS atom
types, like (smarts,), values are lists of single tuples of
atom indicies, like (1,)
"""
atom_type_dict = OrderedDict()
for atom in st.atom:
idxs = [(atom.index,)]
ahash = tuple(get_sorted_atomic_smarts(st, idxs))
atom_type_dict.setdefault(ahash, []).append(idxs[0])
return atom_type_dict
[docs]def get_bond_type_dict(st):
"""
Return a bond type dictionary for the given structure.
:type st: schrodinger.structure.Structure
:param st: the structure
:rtype: OrderedDict
:return: bond type dict, keys are pair tuples of SMARTS bond
types, like (smarts, smarts), values are lists of pair tuples
of atom indicies, like (1, 2)
"""
bond_type_dict = OrderedDict()
for bond in st.bond:
idxs = [(bond.atom1.index, bond.atom2.index)]
ahash = tuple(get_sorted_atomic_smarts(st, idxs))
bond_type_dict.setdefault(ahash, []).append(idxs[0])
return bond_type_dict
[docs]def print_allow_adsorption_onto(allow_adsorption_onto, logger=None):
"""
Log the list of allowed adsorption sites.
:type allow_adsorption_onto: list
:param allow_adsorption_onto: SMARTS patterns of arbitrary
adsorption sites to support
:type logger: logging.getLogger
:param logger: output logger
"""
if not allow_adsorption_onto or not logger:
return
INDEX_SEP = '. '
CUSTOM_ADSORPTION_SITE_DESCRIPTOR = 'custom_'
HEADER1 = 'Allowed adsorption sites'
HEADER2 = '-' * len(HEADER1)
logger.info(HEADER1)
logger.info(HEADER2)
logger.info('')
if Constants.SMARTS_WILDCARD in allow_adsorption_onto:
logger.info(Constants.ALL_SMARTS.capitalize())
else:
allow_adsorption_onto = list(allow_adsorption_onto)
total_dict = OrderedDict()
for key, value in Constants.ALLOW_ADSORPTION_ONTO.items():
if value in allow_adsorption_onto:
total_dict[key] = value
allow_adsorption_onto.remove(value)
for key, value in enumerate(allow_adsorption_onto, 1):
descriptor = \
CUSTOM_ADSORPTION_SITE_DESCRIPTOR + str(key)
total_dict[descriptor] = value
max_index = len(str(len(total_dict)) + INDEX_SEP)
for index, pair in enumerate(list(total_dict.items()), 1):
key, value = pair
index_part = str(str(index) + INDEX_SEP).ljust(max_index)
logger.info(index_part + key + ': ' + value)
logger.info('')
[docs]def get_structure_with_contiguous_molecules(molecules):
"""
Return a single structure object containing the given molecules but
such that the atom ordering per molecule is contiguous.
:type molecules: list of schrodinger.structure._Molecule
:param molecules: molecules that need to be contiguously assembled
into a single structure
:rtype: schrodinger.structure.Structure and two dicts
:return: mols_st, single structure containing all molecules in contiguous
form, and mols_new_to_old and mols_old_to_new, two reordering dictionaries
"""
def extract_molecule(molecule, start=1):
old_order = list(molecule.getAtomIndices())
new_order = list(range(start, len(old_order) + start))
new_to_old = dict(list(zip(new_order, old_order)))
old_to_new = dict(list(zip(old_order, new_order)))
molecule_st = molecule.extractStructure(copy_props=True)
return molecule_st, new_to_old, old_to_new
mols_st, mols_new_to_old, mols_old_to_new = extract_molecule(molecules[0])
for molecule in molecules[1:]:
mol_st, mol_new_to_old, mol_old_to_new = \
extract_molecule(molecule, mols_st.atom_total + 1)
mols_st.extend(mol_st)
mols_new_to_old.update(mol_new_to_old)
mols_old_to_new.update(mol_old_to_new)
return mols_st, mols_new_to_old, mols_old_to_new
[docs]class ReactantMolecule(object):
"""
Manage a reactant molecule in the reactants structure
object.
"""
HYDROGEN_SYM = 'H'
HYDROGEN_BOND_ORDER = 1
[docs] def __init__(self, molecule):
"""
Create an instance.
:type molecule: schrodinger.structure._Molecule
:param molecule: reactant molecule to be extracted from
the reactants structure
"""
self.mol_obj = molecule
self.new_to_orig_inds = {}
self.orig_to_new_inds = {}
for new, orig in enumerate(molecule.getAtomIndices(), 1):
self.new_to_orig_inds[new] = orig
self.orig_to_new_inds[orig] = new
self.mol_struct = molecule.extractStructure(copy_props=True)
self.reactive_inds_orig = []
self.reactive_inds_new = []
self.tmp_hydrogen_index = None
[docs] def setReactiveInds(self, reactive_inds_orig):
"""
Set the two lists of reactive indicies attributes using
the provided list of original reactive indicies
:type reactive_inds_orig: list
:param reactive_inds_orig: original reactive indicies for
this reactant molecule
"""
self.reactive_inds_orig = list(reactive_inds_orig)
self.reactive_inds_new = [
self.orig_to_new_inds[orig] for orig in reactive_inds_orig
]
[docs] def combineMapWith(self, other_molecule):
"""
Return the new to orig atom reordering map for this molecule
extended by that of the other molecule by offsetting the
keys of the map of the other molecule.
:type other_molecule: schrodinger.structure._Molecule
:param other_molecule: molecule whose new to orig atom
reordering map will be combined with that of the current
molecule
:rtype: dict
:return: final_map, the combined new to original atom
reordering maps of this molecule and the other molecule
"""
current_num_atoms = self.mol_struct.atom_total
final_map = self.new_to_orig_inds.copy()
for key, value in other_molecule.new_to_orig_inds.items():
final_map[key + current_num_atoms] = value
return final_map
[docs] def addTempHydrogens(self):
"""
Add a temporary hydrogen to the reactive atom of this molecule
to act as a bond handle for performing the reaction. All traces
of this temporary hydrogen will be removed from this instance
once the reactions are complete. Note that the family of
functions that handles the temporary hydrogens are put in
place so that all reaction channels can be handled just
like the bond-bond double displacement type, i.e. the
association and single displacement types then become
nothing more than double displacement with a fake hydrogen.
"""
atom_to_hydrogenate = self.mol_struct.atom[self.reactive_inds_new[0]]
atom_to_hydrogenate_vec = numpy.array(atom_to_hydrogenate.xyz)
vec_to_hydrogen = find_good_bond_vector(atom_to_hydrogenate)
# set the absolute position and bonding of the new temporary
# hydrogen handle, store the new atom index, and append it to
# the list of reactive atoms
hydrogen_xyz = atom_to_hydrogenate_vec + vec_to_hydrogen
hydrogen = self.mol_struct.addAtom(self.HYDROGEN_SYM, hydrogen_xyz[0],
hydrogen_xyz[1], hydrogen_xyz[2])
self.mol_struct.addBond(atom_to_hydrogenate, hydrogen,
self.HYDROGEN_BOND_ORDER)
self.tmp_hydrogen_index = hydrogen.index
self.reactive_inds_new.append(hydrogen.index)
[docs] def delTempHydrogens(self):
"""
Delete all traces of the previously added temporary
hydrogen.
"""
self.mol_struct.deleteAtoms([self.tmp_hydrogen_index])
self.reactive_inds_new.remove(self.tmp_hydrogen_index)
self.tmp_hydrogen_index = None
[docs] def reactWith(self, other_molecule, reverse=False):
"""
React this molecule with the provided other molecule
according to their reactive indicies and return the product
object.
:type other_molecule: ReactantMolecule
:param other_molecule: the input molecule that this molecule
will react with
:type reverse: bool
:param reverse: specifies to reverse the order of reactive
indicies for the other molecule
:rtype: Products
:return products: the products object containing the
components that result from reacting this molecule with
the input molecule
"""
# reverse handled whether A in the following A-B + C-D
# reaction is bound to C or D in the products
mol_one_from, mol_one_to = self.reactive_inds_new
if reverse:
mol_two_to, mol_two_from = other_molecule.reactive_inds_new
else:
mol_two_from, mol_two_to = other_molecule.reactive_inds_new
# call our wrapped verion of build.attach_structure, this
# is the first product (A-C or A-D), for which the A part
# will be fixed in space
product_st = self.mol_struct.copy()
mol_two_st = other_molecule.mol_struct.copy()
product_map, rev_product_map = \
wrapper_build_attach_structure(product_st,
mol_one_from, mol_one_to, mol_two_st, mol_two_from,
mol_two_to)
# call our wrapped verion of build.attach_structure, this
# is the second product (D-B or C-B), for which the D or C
# parts will be fixed in space
mol_one_st = self.mol_struct.copy()
other_product_st = other_molecule.mol_struct.copy()
other_product_map, rev_other_product_map = \
wrapper_build_attach_structure(other_product_st,
mol_two_to, mol_two_from, mol_one_st, mol_one_to,
mol_one_from)
# get Products object to finish this off
products = Products(product_st, product_map, rev_product_map,
other_product_st, other_product_map,
rev_other_product_map)
return products
[docs] def dissociate(self, dissociation_bond_length):
"""
Dissociate this molecule according to its reactive
indicies.
:type dissociation_bond_length: float
:param dissociation_bond_length: final bond length to use
for dissociative reaction channels
:rtype: Products
:return products: the products object containing the
components that result from reacting this molecule with
the input molecule
"""
product_st = self.mol_struct.copy()
# adjust the bond length
atom_one_ind, atom_two_ind = self.reactive_inds_orig
product_st.adjust(dissociation_bond_length, atom_one_ind, atom_two_ind)
# delete the bond
product_st.deleteBond(atom_one_ind, atom_two_ind)
# even though the Products class isn't perfect here we can
# easily still use it by passing the map twice, the map is
# lame because no real structure manipulations have been
# performed and so indexing hasn't changed
product_map = self.new_to_orig_inds.copy()
products = Products(product_st, product_map, product_map)
return products
[docs]class Products(object):
"""
Manage the properties of the products that result from
applying a reaction channel definition to the reactants.
"""
RXN_REPRESENTATION_KEY = 's_matsci_RXN_representation'
PRODUCT_NAME_BASE = 'product'
[docs] def __init__(self,
prod_one_st,
prod_one_map,
prod_one_rev_map,
prod_two_st=None,
prod_two_map=None,
prod_two_rev_map=None):
"""
Create an instance.
"""
self.prod_one_st = prod_one_st.copy()
self.prod_one_map = prod_one_map.copy()
self.prod_one_rev_map = prod_one_rev_map.copy()
if prod_two_st:
self.prod_two_st = prod_two_st.copy()
else:
self.prod_two_st = None
if prod_two_map:
self.prod_two_map = prod_two_map.copy()
if prod_two_rev_map:
self.prod_two_rev_map = prod_two_rev_map.copy()
self.new_total_reorder = []
self.product_st = self.prod_one_st.copy()
self.rxn_representation = None
self.index = None
self.product_sts = []
[docs] def deleteTmpHydrogens(self, mol_one, mol_two):
"""
Remove any of the previously added temporary hydrogens from
the product structures.
:type mol_one: schrodinger.structure._Molecule
:param mol_one: the first reactant molecule from which these
products came
:type mol_two: schrodinger.structure._Molecule
:param mol_two: the second reactant molecule from which these
products came
"""
def delete_tmp_hydrogens(amap, hydrogen, arevmap, struct):
hydrogen_new = amap.get(hydrogen)
if hydrogen_new:
arevmap = dict_delete_and_decrement_keys(arevmap, hydrogen_new)
struct.deleteAtoms([hydrogen_new])
return arevmap
# if the reactant molecule one had a temporary hydrogen
# then that hydrogen is now on product two from which
# it must be removed, for product two it is the second
# reactant molecules indexing which comes first so use
# that to offset the hydrogen index from reactant molecule
# one to get the final original hydrogen index to remove
# from product two, then find the new verion of this index
# and delete the atom and update the reverse atom map. Note
# that the reactant molecule one will never be left with
# its own temporary hydrogen in a product because of the
# way that the reverse option is applied only to reactant
# molecule two (that is why in the code block following
# the code block directly below you will see two called
# to delete_tmp_hydrogens, one for product one and one
# for product two, while in the block directly below you
# will only see one call).
if mol_one.tmp_hydrogen_index:
final_hydrogen_index = mol_one.tmp_hydrogen_index + \
mol_two.mol_struct.atom_total
self.prod_two_rev_map = \
delete_tmp_hydrogens(self.prod_two_map,
final_hydrogen_index, self.prod_two_rev_map,
self.prod_two_st)
# if the reactant molecule two had a temporary hydrogen
# then that hydrogen can now be on either product one or
# two from which it must be removed, for the product one
# it is the first reactant molecules indexing which comes
# first so use that to offset the hydrogen index from
# reactant molecule two to get the final original hydrogen
# index to remove from product # one, for the product two
# no offsetting will be needed because the temporary
# hydrogen belonged to the reactant from which the product
# came, then find the new verions of these indicies and
# delete the atoms and update the reverse atom maps
if mol_two.tmp_hydrogen_index:
final_hydrogen_index = mol_two.tmp_hydrogen_index + \
mol_one.mol_struct.atom_total
self.prod_one_rev_map = \
delete_tmp_hydrogens(self.prod_one_map,
final_hydrogen_index, self.prod_one_rev_map,
self.prod_one_st)
final_hydrogen_index = mol_two.tmp_hydrogen_index
self.prod_two_rev_map = \
delete_tmp_hydrogens(self.prod_two_map,
final_hydrogen_index, self.prod_two_rev_map,
self.prod_two_st)
[docs] def buildReorderList(self, mol_one, mol_two):
"""
Build the atom reorder list needed to make the product
structure ordering consistent with that of the original
reactants.
:type mol_one: schrodinger.structure._Molecule
:param mol_one: the first reactant molecule from which these
products came
:type mol_two: schrodinger.structure._Molecule
:param mol_two: the second reactant molecule from which these
products came
"""
# get the reorder map for product one using two maps,
# one from product to reactant molecules and another from
# reactant molecules to reactants
prod_one_map_to_orig = mol_one.combineMapWith(mol_two)
prod_one_reorder = []
for atom in self.prod_one_st.atom:
if atom.index in self.prod_one_rev_map:
orig_ind = \
prod_one_map_to_orig[self.prod_one_rev_map[atom.index]]
prod_one_reorder.append(orig_ind)
# get the reorder map for product two using two maps,
# one from product to reactant molecules and another from
# reactant molecules to reactants
prod_two_map_to_orig = mol_two.combineMapWith(mol_one)
prod_two_reorder = []
for atom in self.prod_two_st.atom:
if atom.index in self.prod_two_rev_map:
orig_ind = \
prod_two_map_to_orig[self.prod_two_rev_map[atom.index]]
prod_two_reorder.append(orig_ind)
# sum and invert the two product maps to get something that
# build.reorder_atoms can process
total_reorder = prod_one_reorder + prod_two_reorder
for element in range(1, len(total_reorder) + 1):
index = total_reorder.index(element)
self.new_total_reorder.append(index + 1)
[docs] def combineAndReorder(self):
"""
Assemble the final product structure for this reaction
channel by combining the structure objects for each
product molecule followed by reordering the product
atoms to be consistent with the reactants.
"""
# merge the product structures and apply the reorder map
self.product_st = self.prod_one_st.copy()
self.product_st.extend(self.prod_two_st)
self.product_st = build.reorder_atoms(self.product_st,
self.new_total_reorder)
[docs] def extendWithSpectator(self, spectator_st, map_to_orig):
"""
Extend the product structure object with the spectator
structure object and reorder the atoms to be consistent
with that of the reactants from which they came.
:type spectator_st: schrodinger.structure.Structure
:param spectator_st: the structure object containing
all spectator molecules
:type map_to_orig: dict
:param map_to_orig: an atom reordering dictionary where
the keys are the reactant atom indicies followed by the
spectator atom indicies and the values are their values
"""
# if there were non-reactive spectator reactant structures
# then append them to the product structure
if spectator_st:
self.product_st.extend(spectator_st)
# build the reorder map from the map back to original atom
# indicies, i.e. prior to splitting off the spectators and
# prior to reaction
reorder_map = []
for atom in self.product_st.atom:
orig_ind = map_to_orig[atom.index]
reorder_map.append(orig_ind)
# invert the reorder map
new_reorder_map = []
for index in range(1, len(reorder_map) + 1):
new_index = reorder_map.index(index)
new_reorder_map.append(new_index + 1)
# apply reorder map
self.product_st = build.reorder_atoms(self.product_st, new_reorder_map)
[docs] def updateProperties(self, index, channeldef, reverse=False):
"""
Set some properties on this product structure.
:type index: int
:param index: index of this set of products
:type channeldef: ChannelDefinition
:param channeldef: the channel definition from which
this product came
:type reverse: bool
:param reverse: specifies that the order of the reactive
indicies for the second molecule were reversed
"""
def separate_mol_part(mol):
mol = mol.strip().split(SECOND_SEP)
if len(mol) == 2:
atom_a, atom_b = mol
else:
atom_a, atom_b = mol[0], None
return atom_a, atom_b
def get_bond(atom_a, atom_b):
if atom_b:
bond_ab = atom_a + BOND_SEP + atom_b
else:
bond_ab = atom_a
return bond_ab
NAME_SEP = '.'
self.index = index
current_name = self.product_st.property[CheckInput.TITLE_KEY]
name_tag = current_name + NAME_SEP + self.PRODUCT_NAME_BASE + \
NAME_SEP + str(self.index)
self.product_st.property[CheckInput.TITLE_KEY] = name_tag
self.product_st.property[CheckInput.ENTRY_NAME_KEY] = name_tag
FIRST_SEP = ChannelDefinition.FIRST_SEP
SECOND_SEP = ChannelDefinition.SECOND_SEP
BOND_SEP = '-'
REACT_SEP = ' + '
RXN_SEP = ' --> '
mol_one_def, mol_two_def = channeldef.__repr__().split(FIRST_SEP)
atom_a, atom_b = separate_mol_part(mol_one_def)
atom_c, atom_d = separate_mol_part(mol_two_def)
left_side = get_bond(atom_a, atom_b) + REACT_SEP + \
get_bond(atom_c, atom_d)
if atom_b and atom_d:
if reverse:
right_side = get_bond(atom_a, atom_d) + REACT_SEP + \
get_bond(atom_b, atom_c)
else:
right_side = get_bond(atom_a, atom_c) + REACT_SEP + \
get_bond(atom_b, atom_d)
elif atom_b:
if reverse:
right_side = get_bond(atom_a, atom_d) + REACT_SEP + \
get_bond(atom_b, atom_c)
else:
right_side = get_bond(atom_a, atom_c) + REACT_SEP + \
get_bond(atom_b, atom_d)
elif atom_d:
if reverse:
right_side = get_bond(atom_c, atom_b) + REACT_SEP + \
get_bond(atom_a, atom_d)
else:
right_side = get_bond(atom_a, atom_c) + REACT_SEP + \
get_bond(atom_d, atom_b)
else:
right_side = get_bond(atom_a, atom_c)
if self.prod_two_st:
self.rxn_representation = left_side + RXN_SEP + right_side
else:
self.rxn_representation = right_side + RXN_SEP + left_side
self.product_st.property[self.RXN_REPRESENTATION_KEY] = \
self.rxn_representation
[docs] def isolateProducts(self, isolate_products, flatdef):
"""
If requested isolate the individual products to their
own structure objects.
:type isolate_products: bool
:param isolate_products: specifies that products should be
returned individually rather than as a whole
:type flatdef: list
:param flatdef: list containing the original reactive atom
indicies of the input structure
"""
if not isolate_products:
self.product_sts = [self.product_st]
return
NAME_SEP = '.'
title_tag = self.product_st.property[CheckInput.TITLE_KEY]
title_tag += NAME_SEP
entry_tag = self.product_st.property[CheckInput.ENTRY_NAME_KEY]
entry_tag += NAME_SEP
# from all molecules find the product molecules that
# resulted from a reaction and extract them, update
# properties, and append to list of product structures,
# skip over products that are atoms
flatdef_set = set(flatdef)
index = 1
for molecule in self.product_st.molecule:
if set(molecule.getAtomIndices()).intersection(flatdef_set) \
and len(molecule.atom) > 1:
product_st = molecule.extractStructure(True)
product_st.property[CheckInput.TITLE_KEY] = \
title_tag + str(index)
product_st.property[CheckInput.ENTRY_NAME_KEY] = \
entry_tag + str(index)
self.product_sts.append(product_st)
index += 1
[docs]class Reactants(object):
"""
Manage the properties of the reactants.
"""
[docs] def __init__(self, allreactants, listdef):
"""
Create an instance.
:type allreactants: schrodinger.structure.Structure
:param allreactants: the structure object of the input
reactants
:type listdef: list
:param listdef: a list of two lists containing the reaction
channel definitions
"""
self.allreactants = allreactants
self.def_part_one = list(listdef[0])
self.def_part_two = list(listdef[1])
self.reactant_st = None
self.reactant_new_to_orig = {}
self.reactant_orig_to_new = {}
self.spectator_st = None
self.spectator_new_to_orig = {}
self.spectator_orig_to_new = {}
self.map_to_orig = {}
self.mol_one = None
self.mol_two = None
self.splitIntoComponents()
self.updateDefParts()
self.setReactiveMolecules()
[docs] def splitIntoComponents(self):
"""
Split the input set of all reactants into reactive
molecules and a set of spectator molecules.
"""
# get reactive molecules
mol_one = \
self.allreactants.atom[self.def_part_one[0]].getMolecule()
mol_two = \
self.allreactants.atom[self.def_part_two[0]].getMolecule()
mol_reactants = [mol_one]
if mol_two.number != mol_one.number:
mol_reactants.append(mol_two)
self.reactant_st, self.reactant_new_to_orig, self.reactant_orig_to_new = \
get_structure_with_contiguous_molecules(mol_reactants)
# get spectator molecules
mol_spectators = []
for molecule in self.allreactants.molecule:
if molecule.number != mol_one.number and \
molecule.number != mol_two.number:
mol_spectators.append(molecule)
if mol_spectators:
self.spectator_st, self.spectator_new_to_orig, self.spectator_orig_to_new = \
get_structure_with_contiguous_molecules(mol_spectators)
# set reorder map used to later reorder products plus
# spectators back to original ordering
self.map_to_orig = self.reactant_new_to_orig.copy()
for key, value in self.spectator_new_to_orig.items():
self.map_to_orig[key + self.reactant_st.atom_total] = \
value
[docs] def updateDefParts(self):
"""
Update the indicies in the reaction channel definition parts
to be consistent with the new reactive molecule ordering.
"""
self.def_part_one = [
self.reactant_orig_to_new[index] for index in self.def_part_one
]
self.def_part_two = [
self.reactant_orig_to_new[index] for index in self.def_part_two
]
[docs] def setReactiveMolecules(self):
"""
Set the reactive molecule attributes.
"""
self.mol_one = \
self.reactant_st.atom[self.def_part_one[0]].getMolecule()
self.mol_two = \
self.reactant_st.atom[self.def_part_two[0]].getMolecule()
if self.mol_one.number == self.mol_two.number:
self.mol_two = None
[docs]class Channels(object):
"""
Manage the enumeration of reaction channels.
"""
# use OPLS 2005
OPLS_VERSION = minimize.OPLS_2005
OPLS_ENERGY_KEY = mm.OPLS_POTENTIAL_ENERGY
[docs] def __init__(self,
reactants,
channeldefs,
allow_adsorption_onto=None,
dissociation_bond_length=Constants.DISSOCIATION_BOND_LENGTH,
no_minimization=Constants.NO_MINIMIZATION,
isolate_products=Constants.ISOLATE_PRODUCTS,
no_reactive_h=Constants.NO_REACTIVE_H,
unique=Constants.UNIQUE,
mae_out_file=None,
logger=None):
"""
Create an instance.
:type reactants: schrodinger.structure.Structure
:param reactants: the reactants from which channels will be
enumerated
:type channeldefs: ChannelDefinitions
:param channeldefs: list of ChannelDefinitions
:type allow_adsorption_onto: None or list
:param allow_adsorption_onto: SMARTS patterns of arbitrary
adsorption sites to support
:type dissociation_bond_length: float
:param dissociation_bond_length: final bond length to use
for dissociative reaction channels
:type no_minimization: bool
:param no_minimization: specifies that product geometries
should not be minimized
:type isolate_products: bool
:param isolate_products: specifies that products should be
returned individually rather than as a whole
:type no_reactive_h: bool
:param no_reactive_h: specify that R-H bonds be considered inert
:type unique: bool
:param unique: provide only unique products, firstly do not
calculate redundant products and secondly filter out
duplicate final structures using SMILES
:type mae_out_file: str
:param mae_out_file: the full path, including filename, that
will be used to write the output `*mae` file that will contain
the final set of structures, i.e. reactants plus all sets of
products
:type logger: logging.getLogger
:param logger: output logger
"""
self.reactants = reactants
self.channeldefs = channeldefs
self.allow_adsorption_onto = allow_adsorption_onto
self.dissociation_bond_length = dissociation_bond_length
self.no_minimization = no_minimization
self.isolate_products = isolate_products
self.no_reactive_h = no_reactive_h
self.unique = unique
if not mae_out_file:
self.mae_out_file = os.path.basename(__file__).rstrip('.py')
else:
self.mae_out_file = mae_out_file
self.logger = logger
# A set of SMILES for the structures that have been processed
self.unique_smiles = set()
# Titles for the structures that have been filtered out due to
# duplicate SMILES
self.filtered_titles = []
self.not_minimized = []
self.rxn_representations = []
self.mae_writer = None
self.fast3d_obj = None
self.add_hydrogens = False
self.run_stereo = False
[docs] def setUpFast3D(self):
"""
Set up the fast 3D object.
"""
optimize = True
self.fast3d_obj = fast3d.Volumizer('default', optimize)
[docs] def updateChannelDefReprs(self):
"""
Update the representations of the reaction channel
definitions.
"""
FIRST_SEP = ChannelDefinition.FIRST_SEP
SECOND_SEP = ChannelDefinition.SECOND_SEP
def get_repr_with_names(reactant):
START = '('
END = ')'
repr_with_names = []
for index in reactant:
atom = self.reactants.atom[index]
if atom.name:
name = atom.name
else:
name = atom.element
repr_str = name + START + str(index) + END
repr_with_names.append(repr_str)
return repr_with_names
for channeldef in self.channeldefs.definitions:
r_one, r_two = channeldef.listdef
r_one = SECOND_SEP.join(get_repr_with_names(r_one))
r_two = SECOND_SEP.join(get_repr_with_names(r_two))
channeldef.representation = \
r_one + FIRST_SEP + SECOND_SEP + r_two
[docs] def printChannelDefs(self):
"""
Provide a formatted log of the final reaction channel
definitions.
"""
HEADER = 'Reaction channel definitions'
SEPARATOR = ':'
defs = self.channeldefs.definitions
index_max_char = len(str(len(defs)))
index_width = index_max_char + len(SEPARATOR)
definition_max_char = [
len(definition.__repr__()) for definition in defs
]
definition_width = max(definition_max_char) + 1
if self.logger:
self.logger.info(HEADER)
self.logger.info('-' * len(HEADER))
self.logger.info('')
for index, definition in enumerate(defs, 1):
index_str = \
str(str(index) + SEPARATOR).ljust(index_width)
definition_str = \
definition.__repr__().rjust(definition_width)
self.logger.info(index_str + definition_str)
self.logger.info('')
[docs] def initMaestroWriter(self):
"""
Initialize the Maestro writer for the output `*mae` file and
put the reactants structure in there.
"""
self.mae_writer = structure.MaestroWriter(self.mae_out_file)
[docs] def termMaestroWriter(self):
"""
Terminate the Maestro writer for the output `*mae` file.
"""
self.mae_writer.close()
[docs] def processProducts(self, product_sts):
"""
Process the product structures.
:type product_sts: list of schrodinger.structure.Structure
:param product_sts: contains all product structures for a
given reaction channel
"""
if self.unique:
f_product_sts, titles = get_deduped_monomers(product_sts)
self.filtered_titles.extend(titles)
else:
f_product_sts = list(product_sts)
for product_st in f_product_sts:
title = product_st.property[CheckInput.TITLE_KEY]
# if the final structures are to be uniquified then do the
# filtering here, since per reaction channel deduping is done
# above this controls the deduping of products from different
# reaction channels, only write the unique structures and collect
# redundancies to be logged later.
if self.unique:
smiles = analyze.generate_smiles(product_st)
if smiles not in self.unique_smiles:
self.unique_smiles.add(smiles)
else:
self.filtered_titles.append(title)
continue
# by default before any minimization eliminate any intra- or
# inter-molecular ring spears or concatenations, if we know
# that this elimination protocol has failed then just proceed
# with the ring speared structure but mark it with a structure
# property, for cases see MATSCI-1758
if ringspear.check_for_spears(product_st):
tmp_st = product_st.copy()
try:
self.fast3d_obj.run(tmp_st, self.add_hydrogens,
self.run_stereo)
product_st = tmp_st.copy()
except Exception:
product_st.property[FAILED_SPEAR_ELIM_KEY] = True
# all product structure objects are copied from the reactant
# structure object so if they can't be optimized then delete
# some OPLS properties in case they exist
if not self.no_minimization:
if minimize_geometry(product_st):
self.not_minimized.append(title)
product_st.property.pop(self.OPLS_ENERGY_KEY, None)
self.mae_writer.append(product_st)
[docs] def enumerateChannels(self):
"""
Main function to enumerate reaction channels.
"""
# loop over channel definitions and create the product
# structures
index = 1
for channeldef in self.channeldefs.definitions:
# split the reactive system into one or two reactive
# molecules and spectators
components = Reactants(self.reactants, channeldef.listdef)
# handle dissociations, i.e. only one reactive
# molecule, or associations, single displacements, and
# double displacements all of which involve two reactive
# molecules
if not components.mol_two:
mol_one = ReactantMolecule(components.mol_one)
mol_one.setReactiveInds(components.def_part_one + \
components.def_part_two)
products = mol_one.dissociate(self.dissociation_bond_length)
products.extendWithSpectator(components.spectator_st,
components.map_to_orig)
products.updateProperties(index, channeldef)
self.rxn_representations.append(products.rxn_representation)
products.isolateProducts(self.isolate_products,
channeldef.flatdef)
# loop over products, minimize and append to list
self.processProducts(products.product_sts)
index += 1
else:
mol_one = ReactantMolecule(components.mol_one)
mol_two = ReactantMolecule(components.mol_two)
mol_one.setReactiveInds(components.def_part_one)
mol_two.setReactiveInds(components.def_part_two)
# if association or single displacement then add a
# temporary hydrogen handle so that they can be handled
# using the double displacment protocol
if len(mol_one.reactive_inds_new) == 1:
mol_one.addTempHydrogens()
if len(mol_two.reactive_inds_new) == 1:
mol_two.addTempHydrogens()
# for each single or double displacement channel, i.e.
# A-B + C-D, there are two possible products, i.e.
# A-C + B-D or A-D + B-C, this is handled by do_which_types
# which is affected by unique, if unique has not been specified
# then we want both channels regardless and so we set both
# to True
if self.unique:
do_which_types = channeldef.do_which_types
else:
do_which_types = [True, True]
# do_which_types is only ever set for single and double
# displacement channels, for association we want to skip
# the swapping of fake hydrogens and so we pass True and False
if mol_one.tmp_hydrogen_index and mol_two.tmp_hydrogen_index:
do_which_types = [True, False]
# reverse will indicate whether the definition should be
# reversed (necessary for the second type)
is_reverse = [False, True]
# loop over sub-channel cases
for atype, reverse in zip(do_which_types, is_reverse):
if atype:
products = mol_one.reactWith(mol_two, reverse)
products.deleteTmpHydrogens(mol_one, mol_two)
products.buildReorderList(mol_one, mol_two)
products.combineAndReorder()
products.extendWithSpectator(components.spectator_st,
components.map_to_orig)
products.updateProperties(index, channeldef, reverse)
self.rxn_representations.append(
products.rxn_representation)
products.handleMetalBonding(channeldef.listdef, reverse)
products.isolateProducts(self.isolate_products,
channeldef.flatdef)
# loop over products, minimize and append to list
self.processProducts(products.product_sts)
index += 1
# post-process those molecules for which temporary
# hydrogens were added
if mol_one.tmp_hydrogen_index:
mol_one.delTempHydrogens()
if mol_two.tmp_hydrogen_index:
mol_two.delTempHydrogens()
[docs] def checkOutputFile(self):
"""
Check the output file.
"""
NO_PRODUCTS_MSG = """This calculation has resulted in zero
relevant product structures. Please check your input and
settings. This script will now exit."""
if structure.count_structures(self.mae_out_file) == 1:
if self.logger:
self.logger.error(NO_PRODUCTS_MSG)
fileutils.force_remove(self.mae_out_file)
sys.exit(1)
[docs] def printRxnReprs(self):
"""
Log a formatted print of all reaction channels in their
reaction representation.
"""
BUFFER = 1
SEP = '.'
# get some max field widths
max_repr = 0
for rxn_repr in self.rxn_representations:
length = len(rxn_repr)
if length > max_repr:
max_repr = length
max_index = len(str(len(self.rxn_representations))) + len(SEP)
HEADER1 = 'Reactions'
HEADER2 = '-' * (len(HEADER1))
self.logger.info(HEADER1)
self.logger.info(HEADER2)
self.logger.info('')
for index, rxn_repr in enumerate(self.rxn_representations, 1):
index_repr = str(index) + SEP
rxn_line = index_repr.ljust(max_index) + ' ' * BUFFER + \
rxn_repr.rjust(max_repr)
self.logger.info(rxn_line)
self.logger.info('')
[docs] def printFiltered(self):
"""
Log the structures that were filtered by SMILES.
"""
if self.filtered_titles and self.logger:
FILTERED_MSG = """The following product structures will not
be returned because they have non-unique SMILES, i.e. they
are redundant conformers. If you wish to have these structures
as well then do not pass the -unique flag. Here are the
filtered structures: %s."""
self.logger.warning(FILTERED_MSG % self.filtered_titles)
[docs] def printNotMinimized(self):
"""
Log the structures that were not minimized.
"""
num_total_structs = structure.count_structures(self.mae_out_file)
ALL_STRUCTURES = '[ <all structures> ]'
if self.not_minimized:
NOT_MINIMIZED_MSG = """The geometries for the following
list of structures were not able to be minimized likely
due to problems with the defined atom types: %s. Please
try redefining those atom types, for example if your
system contains metals then all bonds to those metals
may need to be of zero-order. The latter can be done
with the Edit > Build > Create Zero-Order Bonds to Metals
option in Maestro or with the bond decrement button.
Note that reactive bonds can only be single bonds."""
if self.logger:
if len(self.not_minimized) == num_total_structs:
self.logger.warning(NOT_MINIMIZED_MSG % ALL_STRUCTURES)
else:
self.logger.warning(NOT_MINIMIZED_MSG % self.not_minimized)
[docs] def orchestrate(self):
"""
Orchestrate all of the components of this main class.
"""
self.setUpFast3D()
self.checkInputParams()
self.updateChannelDefReprs()
self.printChannelDefs()
# set up writer
self.initMaestroWriter()
# minimize reactant structure so that total energies can be
# compared with that of products, think of reactants as just
# another set of products for the time being
self.processProducts([self.reactants])
# do the enumeration
self.enumerateChannels()
# tear down writer
self.termMaestroWriter()
if self.logger:
self.printRxnReprs()
self.printFiltered()
self.printNotMinimized()
# now that the output Maestro file is closed ensure that we have
# products, this check was implemented when it was found that
# running a unique dissociation calculation on zero-order bonds
# produced no products (unless products are isolated), just
# run this check all of the time
self.checkOutputFile()