"""
Classes and functions for building reaction channels.
Copyright Schrodinger, LLC. All rights reserved."""
# Contributor: Thomas F. Hughes
import os
import re
import sys
import time
import warnings
from collections import OrderedDict
import numpy
import schrodinger.structure as structure
from schrodinger.application.matsci import amorphous
from schrodinger.application.matsci import msprops
from schrodinger.infra import fast3d
from schrodinger.infra import mm
from schrodinger.structutils import analyze
from schrodinger.structutils import build
from schrodinger.structutils import minimize
from schrodinger.structutils import rings
from schrodinger.structutils import ringspear
from schrodinger.structutils import transform
from schrodinger.test import ioredirect
from schrodinger.utils import fileutils
# MATSCI-5124 - in contrast to the random module
# the methods in the numpy.random module are consistent
# between Python 2 and 3
my_random = numpy.random.RandomState()
# ring spear elimination failure key
FAILED_SPEAR_ELIM_KEY = 'b_matsci_Failed_Ring_Spear_Elimination'
# Two vectors with a dot product < this are considered perpendicular
PERPENDICULAR_DOT_PROD_THRESH = 0.05
[docs]def detect_tetrahedral(bond_vecs):
    """
    Detect tetrahedral geometry
    :param list(numpy.array) bond_vecs: Each item is a numpy.array(x, y, z) for
        the vector from the atom in question along a bond
    :rtype: bool
    :return: Whether the atom with these bond vectors is tetrahedral (4 bonds
        with a bond angle sum >> 360
    """
    # Must have 4 bonds to be tetrahedral
    if len(bond_vecs) != 4:
        return False
    # Accumulate the angles between all bond vectors
    angles = []
    for index, vec1 in enumerate(bond_vecs):
        for vec2 in bond_vecs[index + 1:]:
            radians = transform.get_angle_between_vectors(vec1, vec2)
            angles.append(numpy.degrees(radians))
    # For the square planar case (which is what we are trying to distinguish
    # from, there will be 4 nearest neighbor (cis) angles that are near 90, and
    # 2 angles with trans bonds that are near 180. The tetrahedral case will
    # have all 6 angles near 109.5. So sort the bond angles and check the sum of
    # the smallest 4 angles.
    angles.sort()
    nearest_neighbor_angles = angles[:4]
    angle_sum = sum(nearest_neighbor_angles)
    # Square planer = 360, tetrahedral ~= 438
    return angle_sum > 400 
[docs]def trigonal_bonding_vector(bond_vecs, director=None):
    """
    Find the optimal 4th bond vector to add to a system with 3 existing bond
    vectors.
    For the case of an atom with 3 bonds, the 4th bond vector should be normal
    to the plane formed by the 3 bonded atoms. The is true whether the system is
    trigonal planar or trigonal pyramidal.
    :param list(numpy.array) bond_vecs: Each item is a numpy.array(x, y, z) for
        the vector from the atom in question along a bond
    :param numpy.array director: In the case where the chosen bond vector is at
        a 90 degree angle to the existing bond_vecs, the up/down direction of
        the bond vector is ambiguous. Choose the direction that best aligns with
        director.
    :rtype: numpy.array
    :return: The optimal next bonding vector pointing away from the atom
    :raise RuntimeError: If bond_vecs doesn't have 3 items
    """
    if len(bond_vecs) != 3:
        raise RuntimeError('bond_vecs must be of length 3')
    # vec1 points from the first bonding atom to the second, vec2 from the
    # second to the third. The two vectors define the plane of the three atoms
    vec1 = bond_vecs[0] - bond_vecs[1]
    vec2 = bond_vecs[1] - bond_vecs[2]
    # Find the normal to the plane.
    normal = numpy.cross(vec1, vec2)
    vector = transform.get_normalized_vector(normal)
    # The normal can point above or below the plane. Place the normal on the
    # central atom and check wither it is aligned (dot product > 0) or
    # anti-aligned (dot product < 0). We want the vector to be anti-aligned so
    # that it points away from the other bonds. (Note - reversing vectors with
    # dot product = 0 (completely planar bond vecs) keeps results consistent
    # with those from 19-4 and previous)
    dotprod = vector.dot(bond_vecs[0])
    if abs(dotprod) < PERPENDICULAR_DOT_PROD_THRESH and director is not None:
        # The vector is almost perpendicular to a bond vector, which means all
        # bond vectors lie almost in the same plane. Choose the perpendicular
        # direction that is best aligned with the supplied director
        vector = find_best_match_with_director([vector, -vector], director)
    elif dotprod >= 0:
        # The vector points in the same direction as the bond vectors, reverse
        # it
        vector = -vector
    return vector 
[docs]def tetrahedral_bonding_vector(bond_vecs, director=None):
    """
    Find the optimal 5th bond vector to add to a system with 4 existing
    tetrahedral bond vectors.
    For tetrahedral systems, the 5th vector gets added anti-parallel to one of
    the bonding vectors (classic Sn2-type reaction geometry). The first bond is
    chosen unless director is given, in which case the bond is chosen so that
    the new bond vector best aligns with the director.
    :param list(numpy.array) bond_vecs: Each item is a numpy.array(x, y, z) for
        the vector from the atom in question along a bond
    :param numpy.array director: Chose the antiparallel bond vector that best
        aligns with this vector.
    :rtype: numpy.array
    :return: The optimal next bonding vector pointing away from the atom
    :raise RuntimeError: If bond_vecs doesn't have 4 items
    """
    if len(bond_vecs) != 4:
        raise RuntimeError('bond_vecs must be of length 4')
    if director is None:
        return -bond_vecs[0]
    # Take the negative of each bond vector and use the one with the most
    # positive dot product (best alignment) with the director
    candidates = [-x for x in bond_vecs]
    return find_best_match_with_director(candidates, director) 
[docs]def find_best_match_with_director(candidates, director):
    """
    Find the vector in candidates that has the best alignment with director
    :param list(numpy.array) candidates: The vectors to choose from
    :param numpy.array director: The chosen vector should be the one that aligns
        most strongly with this vector
    :rtype: numpy.array
    :return: The vector from candidates that best aligns with director
    """
    best_dot_prod = -numpy.inf
    best_vector = None
    for candidate in candidates:
        dotp = director.dot(candidate)
        if dotp > best_dot_prod:
            best_dot_prod = dotp
            best_vector = candidate
    return best_vector 
def _find_perpendicular_bond_vector(bond_vecs, director=None):
    """
    For the given bond vectors, find pairs (or pairs of (bond vector, cardinal
    axis)) that are non-colinear and produce perpendicular vectors by taking the
    cross product. Choose an arbitrary resulting vector or the one that best
    aligns with the director if supplied.
    Because the three cardinal axes are used as a backup, this method is
        gauranteed to find a pependicular vector.
    :param list bond_vecs: Each item is a numpy.array for one bond vector
    :param numpy.array director: If supplied, choose the resulting perpendicular
        vector that best aligns (positive dot product) with this vector
    :rtype: numpy.array
    :return: The perpendicular bond vector (not normalized)
    """
    def colinear_vectors(vec_one, vec_two):
        """
        Check if two vectors are colinear or not
        :param numpy.array vec_one: The first vector
        :param numpy.array vec_two: The first vector
        :rtype: bool
        :return: If the two vector are colinear
        """
        colinear_thresh = 0.01
        if abs(1 - abs(numpy.dot(vec_one, vec_two))) > colinear_thresh:
            return False
        else:
            return True
    def _find_candidates(vec1, other_vecs, candidates):
        """
        Find cross products between vec1 and and other vector that is
        not co-linear with it
        :param numpy.array vec1: The reference vector
        :param list other_vecs: numpy arrays that represent other
            vectors than can be used to obtain a cross product with vec1
        :param list candidates: A list of cross-products that have been
            found so far. Any new cross products will be added to it
        """
        for ovec in other_vecs:
            if not colinear_vectors(vec1, ovec):
                # Cross product gives a vector normal to the plane of
                # the two vectors. There's no reason to assume one of the two
                # possible directions for a cross product is the better one, so
                # use them both.
                crossp = numpy.cross(vec1, ovec)
                candidates.append(crossp)
                candidates.append(-crossp)
    candidates = []
    # Prefer to use existing bond vectors
    for index, bv1 in enumerate(bond_vecs):
        _find_candidates(bv1, bond_vecs[index + 1:], candidates)
    # If no two bonds are not colinear, use cardinal axes. It's guaranteed one
    # of them will be non-colinear
    if not candidates:
        cards = [transform.X_AXIS, transform.Y_AXIS, transform.Z_AXIS]
        for bv1 in bond_vecs:
            _find_candidates(bv1, cards, candidates)
    if director is None:
        # If there is no director, choose an arbitrary vector
        new_bond_vector = candidates[0]
    else:
        # If there is a director, choose the vector that aligns best
        # with the director
        new_bond_vector = find_best_match_with_director(candidates, director)
    return new_bond_vector
[docs]def find_good_bond_vector(atom, length=1.0, director=None, pbc=None):
    """
    Find a good vector to add a new bond given the existing bonds to this atom
    :param 'schrodinger.structure._StructureAtom` atom: The atom to find the
        bond to
    :param float length: The desired length of the bond vector
    :param numpy.array director: In some cases, such as trigonal planar and
        tetrahedral, a simple choice is made by this function between several
        directions. If a director is supplied, the choice that best aligns with
        the director will be chosen.
    :param `infrastructure.PBC` pbc: If given, will be used to find the closest
        image of all neighboring atoms
    :rtype: numpy.array
    :return: An array pointing from the given atom along the new bond vector
    """
    bond_sum_thresh = 0.1
    atom_vec = numpy.array(atom.xyz)
    # if the molecule is just an atom then grab the x-axis as a
    # handle, otherwise obtain the handle from the list of existing
    # bonds taking account for symmetric cases.  there are a few
    # different protocols that can be used here but I think this
    # one will work nicely though not exhaustively
    if not atom.bond_total:
        if director is None:
            new_bond_vector = numpy.array(transform.X_AXIS)
        else:
            new_bond_vector = numpy.copy(director)
    else:
        # make a handle that is opposite the sum of existing bond
        # vectors
        bond_vecs = []
        for neighbor in atom.bonded_atoms:
            neighbor_vec = numpy.array(neighbor.xyz)
            if pbc:
                # Make sure the correct image is used for the neighbor atom
                bond_vec = pbc.getShortestVector(*atom_vec, *neighbor_vec)
            else:
                bond_vec = neighbor_vec - atom_vec
            bond_vec = transform.get_normalized_vector(bond_vec)
            bond_vecs.append(bond_vec)
        num_bonds = len(bond_vecs)
        if num_bonds == 3:
            new_bond_vector = trigonal_bonding_vector(bond_vecs,
                                                      director=director)
        elif detect_tetrahedral(bond_vecs):
            new_bond_vector = tetrahedral_bonding_vector(bond_vecs,
                                                         director=director)
        else:
            new_bond_vector = -1 * sum(bond_vecs)
        # if that handle vector is small, for example for a fully symmetric
        # system, then pick a preexisting bond and find another non-colinear
        # bond or cardinal vector for which their normal is non-zero.
        if transform.get_vector_magnitude(new_bond_vector) < bond_sum_thresh:
            new_bond_vector = _find_perpendicular_bond_vector(
                bond_vecs, director)
        new_bond_vector = transform.get_normalized_vector(new_bond_vector)
    return new_bond_vector * length 
[docs]def random_choice(collection):
    """
    Return a random element from the given collection.  Wraps
    numpy.random.RandomState.choice to allow for multi-dimensional
    input, i.e. cases where elements are also iterables.  Choices
    are made using a uniform distribution.
    :type collection: iterable
    :param collection: the collection from which to randomly choose
        an element
    :rtype: any
    :return: the chosen element, potentially an iterable
    """
    # numpy.random.RandomState.choice requires that input be 1D so
    # choose from an iterable of iterables by indexing
    elements = list(collection)
    idx = my_random.randint(len(elements))
    return elements[idx] 
[docs]class Constants(object):
    """
    Manage some constants.
    """
    CHANNEL_FILE = 'CHANNEL_FILE'
    NUM_RANDOM_CHANNELS = 0
    ALL_RANDOM_CHANNELS = 'all'
    NUM_RANDOM_CHANNELS_METAVAR = 'INT or \'%s\'' % ALL_RANDOM_CHANNELS
    ASSOCIATION_TYPE = 'association'
    DISSOCIATION_TYPE = 'dissociation'
    SINGLE_DISPLACEMENT_TYPE = 'single_displacement'
    DOUBLE_DISPLACEMENT_TYPE = 'double_displacement'
    ALL_TYPES = 'all'
    RANDOM_TYPES_CHOICES = [
        ASSOCIATION_TYPE, DISSOCIATION_TYPE, SINGLE_DISPLACEMENT_TYPE,
        DOUBLE_DISPLACEMENT_TYPE, ALL_TYPES
    ]
    DEFAULT_RANDOM_TYPES = [DOUBLE_DISPLACEMENT_TYPE]
    RANDOM_SEED = None
    ALLOW_ADSORPTION_ONTO = OrderedDict([
        ('graphene', ('[c-0X3][c-0X3]([c-0X3]([c-0X3]([c-0X3])'
                      '[c-0X3])[c-0X3]([c-0X3])[c-0X3])[c-0X3]'))
    ])
    ALL_SMARTS = 'all'
    SMARTS_WILDCARD = '[*]'
    ALLOW_ADSORPTION_ONTO_METAVAR = 'SMARTS or \'%s\'' % ALL_SMARTS
    DISSOCIATION_BOND_LENGTH = 10.0
    NO_MINIMIZATION = False
    ISOLATE_PRODUCTS = False
    NO_REACTIVE_H = False
    SUPPORTED_IN_EXTS = ['.mae', '.mae.gz', '.maegz']
    UNIQUE = False 
[docs]def init_random_seed(random_seed, logger):
    """
    Process the seed to the random number generator.
    :type random_seed: None or int
    :param random_seed: the seed for the random number generator
    :type logger: logging.Logger
    :param logger: output logger
    :rtype: int
    :return: final random seed
    """
    # system time comes out with 6 digits, just use the integer part
    if random_seed is None:
        random_seed = int(time.time())
    # print some stuff
    SEED_MSG = ('Seed used is:')
    if logger:
        logger.info(SEED_MSG)
        logger.info('-' * len(SEED_MSG))
        logger.info('%s' % random_seed)
        logger.info('')
    return random_seed 
[docs]def is_polymer_builder_grow_atom(atom):
    """
    Return True if the given atom is a grow atom prepared by
    the polymer builder module.
    :type atom: schrodinger.structure._StructureAtom
    :param atom: the atom
    :rtype: bool
    :return: True if a polymer builder grow atom
    """
    head = atom.property.get(amorphous.HEAD_ATOM_PROP)
    return ((head and atom.property.get(amorphous.COUPLER_ATOM_PROP)) or
            (head and atom.property.get(amorphous.GROWER_ATOM_PROP))) 
[docs]def is_mark_monomer_grow_atom(atom):
    """
    Return True if the given atom is a grow atom prepared by
    the mark monomer module.
    :type atom: schrodinger.structure._StructureAtom
    :param atom: the atom
    :rtype: bool
    :return: True if a mark monomer grow atom
    """
    head_or_tail = atom.property.get(msprops.ROLE_PROP)
    return head_or_tail in [msprops.HEAD, msprops.TAIL] 
[docs]def get_grow_idxs(st, idxs=None):
    """
    Return a list of grow indices for the given structure.
    :type st: schrodinger.structure.Structure
    :param st: the structure
    :type idxs: list
    :param idxs: the indices to check
    :rtype: list
    :return: grow indices
    """
    if idxs is None:
        idxs = range(1, st.atom_total + 1)
    grow_idxs = []
    for idx in idxs:
        atom = st.atom[idx]
        if is_polymer_builder_grow_atom(atom) or is_mark_monomer_grow_atom(
                atom):
            grow_idxs.append(idx)
    return grow_idxs 
[docs]def remove_grow_properties(st, idxs=None):
    """
    Remove the grow properties from atoms in the given structure.
    :type st: schrodinger.structure.Structure
    :param st: the structure
    :type idxs: list
    :param idxs: the atom indices
    """
    keys = (amorphous.HEAD_ATOM_PROP, amorphous.COUPLER_ATOM_PROP,
            amorphous.GROWER_ATOM_PROP, msprops.ROLE_PROP)
    if idxs is None:
        idxs = range(1, st.atom_total + 1)
    for idx in idxs:
        atom = st.atom[idx]
        for key in keys:
            atom.property.pop(key, None) 
[docs]def serves_as_backbone_bond(st, idx, jdx):
    """
    Return True if the given bond serves as a backbone bond.
    :type st: schrodinger.structure.Structure
    :param st: the structure
    :type idx: int
    :param idx: the first atom index of the bond
    :type jdx: int
    :param jdx: the second atom index of the bond
    :rtype: bool
    :return: True if serves as a backbone bond
    """
    # if the molecule containing the given bond is not a
    # monomer then return True
    mol_idxs = st.atom[idx].getMolecule().getAtomIndices()
    if not get_grow_idxs(st, idxs=mol_idxs):
        return True
    a_grow_idxs = get_grow_idxs(st,
                                idxs=indicies_from_bonds_deep(st, idx, [jdx]))
    b_grow_idxs = get_grow_idxs(st,
                                idxs=indicies_from_bonds_deep(st, jdx, [idx]))
    return a_grow_idxs and b_grow_idxs 
[docs]def serves_as_sidechain_bond(st, idx, jdx):
    """
    Return True if the given bond serves as a sidechain bond.
    :type st: schrodinger.structure.Structure
    :param st: the structure
    :type idx: int
    :param idx: the first atom index of the bond
    :type jdx: int
    :param jdx: the second atom index of the bond
    :rtype: bool
    :return: True if serves as a sidechain bond
    """
    # if the molecule containing the given bond is not a
    # monomer then return True
    mol_idxs = st.atom[idx].getMolecule().getAtomIndices()
    if not get_grow_idxs(st, idxs=mol_idxs):
        return True
    a_grow_idxs = get_grow_idxs(st,
                                idxs=indicies_from_bonds_deep(st, idx, [jdx]))
    b_grow_idxs = get_grow_idxs(st,
                                idxs=indicies_from_bonds_deep(st, jdx, [idx]))
    # if the given bond contains grow atoms then return False,
    # this could happen for a bond that acts as an end cap
    # for the growth
    if idx in a_grow_idxs or jdx in b_grow_idxs:
        return False
    return (len(a_grow_idxs) >= 2 and
            not b_grow_idxs) or (not a_grow_idxs and len(b_grow_idxs) >= 2) 
[docs]def valid_polymer_monomer_double_displacement_rxn(st,
                                                  idx,
                                                  jdx,
                                                  kdx,
                                                  ldx,
                                                  allow_backbone=True,
                                                  allow_sidechain=True):
    """
    Return True if the given input is for a valid double displacement
    polymer monomer reaction.
    :type st: schrodinger.structure.Structure
    :param st: the structure containing two molecules
    :type idx: int
    :param idx: the first atom index of the bond in the first
        molecule
    :type jdx: int
    :param jdx: the second atom index of the bond in the first
        molecule
    :type kdx: int
    :param kdx: the first atom index of the bond in the second
        molecule
    :type ldx: int
    :param ldx: the second atom index of the bond in the second
        molecule
    :type allow_backbone: bool
    :param allow_backbone: whether to allow backbone reactions
    :type allow_sidechain: bool
    :param allow_sidechain: whether to allow sidechain reactions
    :rtype: bool
    :return: True if a valid reaction
    """
    # if not a polymer monomer reaction
    if not get_grow_idxs(st):
        return True
    # use only the minimal number of polymer atom properties, for example
    # those that are used in get_grow_idxs, so as to prevent having to update
    # all of them as new monomers are successively created, specifically
    # during fragment mutation for example
    if allow_backbone and serves_as_backbone_bond(st, idx, jdx) and \
        
serves_as_backbone_bond(st, kdx, ldx):
        return True
    if allow_sidechain and serves_as_sidechain_bond(st, idx, jdx) and \
        
serves_as_sidechain_bond(st, kdx, ldx):
        return True
    return False 
[docs]def query_monomers(sts):
    """
    Return a list of bools indicating whether the given structures
    are monomers, i.e. have at least two grow indices.
    :type sts: list
    :param sts: contains schrodinger.structure.Structure
    :rtype: list
    :return: contains bools indicating whether the given structures
        are monomers
    """
    return [len(get_grow_idxs(st)) >= 2 for st in sts] 
[docs]def are_monomers(sts):
    """
    Return True if the given structures are monomers, i.e.
    have at least two grow indices.
    :type sts: list
    :param sts: contains schrodinger.structure.Structure
    :rtype: bool
    :return: True if the given structures are monomers
    """
    return all(query_monomers(sts)) 
[docs]def get_deduped_monomers(sts):
    """
    Return a list of smiles deduped monomers.
    :type sts: list
    :param sts: contains schrodinger.structure.Structure
    :rtype: list, list
    :return: contains smiles deduped monomers, contains filtered
        titles
    """
    # this is a safety measure for symmetric reactants to ensure
    # that when deduping that a true monomer, with proper atom
    # properties, isn't replaced by a fake monomer with the same
    # smiles but that lacks proper atom properties
    smiles_sts_dict = OrderedDict()
    for st in sts:
        smiles = analyze.generate_smiles(st)
        smiles_sts_dict.setdefault(smiles, []).append(st)
    f_sts = []
    titles = []
    for t_sts in smiles_sts_dict.values():
        f_st = None
        for st in t_sts:
            if not f_st and are_monomers([st]):
                f_st = st
            else:
                titles.append(st.property[CheckInput.TITLE_KEY])
        if not f_st:
            f_st = t_sts[0]
            titles.remove(f_st.property[CheckInput.TITLE_KEY])
        f_sts.append(f_st)
    return f_sts, titles 
[docs]class CacheInfo(object):
    """ Holds caches of computed data to reduce timings """
[docs]    def __init__(self):
        """ Create a CacheInfo instance """
        # keys = atom indexes, values = SMARTS pattern for that atom
        self.smarts_cache = {}
        # atom objects that are adsorption sites
        self.ads_cache = set()
        # Whether the ads_cache has been filled yet or not
        self.ads_cache_filled = False
        # keys = (atom index1, atom index2) for bonds, values = (SMILES, SMILES)
        # for the two fragments created by breaking that bond
        self.smiles_cache = {}
        # keys = frozenset([atom index1, atom index2]) for bonds,
        # see checkIfNotSimpleBond for values
        self.bond_cache = {} 
[docs]    def fillAdsCache(self, struct, matches):
        """
        Fill the adsorption site cache with atoms from struct
        :type struct: `schrodinger.structure.Structure`
        :param struct: The structure with the atoms to check
        :param set matches: Atom indexes that are automatically considered
            adsorption sites
        """
        if not self.ads_cache_filled:
            data = [
                x for x in struct.atom if
                not CheckInput().checkIfNotAdsorbable(struct, x.index, matches)
            ]
            self.ads_cache.update(data)
            self.ads_cache_filled = True  
[docs]class ChannelDefinitions(object):
    """
    Manage a collection of reaction channel definitions.
    """
    BLANK_PATTERN = re.compile(r'\s*\n$')
    RANDOM_ALL_TO_ON_THE_FLY_THRESH = 50
    MAX_NUM_UNPRODUCTIVE_CYCLES = 1000
    DESCRIPTOR_DICT = {
        Constants.DISSOCIATION_TYPE: 'Dissociation: ',
        Constants.ASSOCIATION_TYPE: 'Association: ',
        Constants.SINGLE_DISPLACEMENT_TYPE: 'Single Displacement: ',
        Constants.DOUBLE_DISPLACEMENT_TYPE: 'Double Displacement: '
    }
[docs]    def __init__(self, logger=None):
        """
        Create an instance.
        :type logger: logging.getLogger
        :param logger: output logger
        """
        self.logger = logger
        self.definitions = []
        self.bad_defs = []
        self.allow_adsorption_onto = None
        self.matches = None 
[docs]    @staticmethod
    def flattenChannels(all_channel_defs):
        """
        Return a flattened list of channels.
        :type all_channel_defs: OrderedDict
        :param all_channel_defs: all possible channel definitions,
            keyed first by channel type and second by reaction type
        :rtype: list
        :return: a flattened list of channels
        """
        return [
            rxn for rxn_dict in list(all_channel_defs.values())
            for rxns in list(rxn_dict.values()) for rxn in rxns
        ] 
[docs]    def addDefinition(self, str_definition):
        """
        Add a reaction channel definition to the list of definitions.
        :type str_definition: str
        :param str_definition: the string representation of the definition
        :rtype: bool
        :return: not_accepted, False for an accepted definition, True for
            a definition that was not accepted
        """
        definition = ChannelDefinition(str_definition, self.logger)
        if not definition.not_valid and definition not in self.definitions:
            self.definitions.append(definition)
            not_accepted = False
        else:
            self.bad_defs.append(str_definition.strip('\n'))
            not_accepted = True
        return not_accepted 
[docs]    def addDefinitionsFromFile(self, channels_file):
        """
        Add reaction channel definitions from an input file to the
        list of definitions.
        :type channels_file: str
        :param channels_file: input file containing the definitions
            of the reaction channels
        """
        channels_handle = open(channels_file, 'r')
        for aline in channels_handle:
            if not self.BLANK_PATTERN.match(aline):
                self.addDefinition(aline)
        channels_handle.close() 
[docs]    def buildChannelStrDef(self,
                           atom_a_one,
                           atom_b_one,
                           atom_a_two=None,
                           atom_b_two=None):
        """
        Build the string representation of a reaction channel from a
        sequence of integers.
        :type atom_a_one: int
        :param atom_a_one: the first atom of the first part of the
            definition
        :type atom_b_one: int
        :param atom_b_one: the first atom of the second part of the
            definition
        :type atom_a_two: int
        :param atom_a_two: the second atom of the first part of the
            definition
        :type atom_b_two: int
        :param atom_b_two: the second atom of the second part of the
            definition
        :rtype: str
        :return: str_def, the string representation of the provided
            reaction channel
        """
        FIRST_SEP = ChannelDefinition.FIRST_SEP
        SECOND_SEP = ChannelDefinition.SECOND_SEP
        str_def = str(atom_a_one)
        if atom_a_two:
            str_def += SECOND_SEP + str(atom_a_two)
        str_def += FIRST_SEP + SECOND_SEP + str(atom_b_one)
        if atom_b_two:
            str_def += SECOND_SEP + str(atom_b_two)
        return str_def 
[docs]    def getDissociationChannels(self, reactants, no_reactive_h, caches=None):
        """
        Return an OrderedDict of dissociation channel string
        definitions keyed by type.
        :type reactants: schrodinger.structure.Structure
        :param reactants: the reactants from which channels will be
            enumerated
        :type no_reactive_h: bool
        :param no_reactive_h: specify that R-H bonds be considered inert
        :param `CacheInfo` caches: Data caches to speed up the calculation
        :rtype: OrderedDict
        :return: keys are type, values are dissociation channels
        """
        NO_DISSOCIATION_DEFS = """There are no dissociation channels
            available for the reactants that you have provided.  This
            is likely because there are no single bonds outside of
            rings or because you have disabled reactivity of R-H bonds.
            Please reconsider the types of channels that you are trying
            to sample."""
        dissociation_defs = OrderedDict()
        if not caches:
            caches = CacheInfo()
        # these are expensive functions and so are only called for small
        # systems
        for bond in reactants.bond:
            if not any(CheckInput().checkIfNotSimpleBond(
                    reactants,
                    bond.atom1.index,
                    bond.atom2.index,
                    no_reactive_h,
                    bond_cache=caches.bond_cache)):
                idxs = [(bond.atom1.index, bond.atom2.index)]
                ahash = tuple(
                    get_sorted_atomic_smarts(reactants,
                                             idxs,
                                             smarts_cache=caches.smarts_cache))
                str_def = self.buildChannelStrDef(*idxs[0])
                dissociation_defs.setdefault(ahash, []).append(str_def)
        if not dissociation_defs and self.logger:
            self.logger.warning(NO_DISSOCIATION_DEFS)
        return dissociation_defs 
[docs]    def getAssociationChannels(self, reactants, caches=None):
        """
        Return an OrderedDict of association channel string
        definitions keyed by type.
        :type reactants: schrodinger.structure.Structure
        :param reactants: the reactants from which channels will be
            enumerated
        :param `CacheInfo` caches: Data caches to speed up the calculation
        :rtype: OrderedDict
        :return: keys are type, values are association channels
        """
        NO_ASSOCIATION_DEFS = """There are no association channels
            available for the reactants that you have provided.  This
            is likely because you either have only a single reactant
            molecule or you do not have at least two adsorbable atoms.
            Please reconsider the types of channels that you are
            trying to sample as well as the possibility of adding
            custom adsorption sites."""
        association_defs = OrderedDict()
        if caches is None:
            caches = CacheInfo()
        caches.fillAdsCache(reactants, self.matches)
        # these are expensive functions and so are only called for small
        # systems
        for molecule_one in reactants.molecule:
            for molecule_two in reactants.molecule:
                if molecule_one.number >= molecule_two.number:
                    continue
                for atom_one in molecule_one.atom:
                    if atom_one not in caches.ads_cache:
                        continue
                    for atom_two in molecule_two.atom:
                        if atom_two not in caches.ads_cache:
                            continue
                        idxs = [(atom_one.index, atom_two.index)]
                        ahash = tuple(
                            get_sorted_atomic_smarts(
                                reactants,
                                idxs,
                                smarts_cache=caches.smarts_cache))
                        str_def = self.buildChannelStrDef(*idxs[0])
                        association_defs.setdefault(ahash, []).append(str_def)
        if not association_defs and self.logger:
            self.logger.warning(NO_ASSOCIATION_DEFS)
        return association_defs 
[docs]    def getSingleDisplacementChannels(self,
                                      reactants,
                                      no_reactive_h,
                                      unique,
                                      caches=None):
        """
        Return an OrderedDict of single displacement channel string
        definitions keyed by type.
        :type reactants: schrodinger.structure.Structure
        :param reactants: the reactants from which channels will be
            enumerated
        :type no_reactive_h: bool
        :param no_reactive_h: specify that R-H bonds be considered inert
        :type unique: bool
        :param unique: provide only unique products
        :param `CacheInfo` caches: Data caches to speed up the calculation
        :rtype: OrderedDict
        :return: keys are type, values are single displacement channels
        """
        NO_SINGLE_DISPLACEMENT_DEFS = """There are no single displacement
            channels available for the reactants that you have provided.
            This is likely because you either have only a single reactant
            molecule, have no single bonds outside of rings, have disabled
            the reactivity of R-H bonds, have no adsorbable atoms, or if
            unique has been specified have no unique products.  Please
            reconsider the types of channels that you are trying to sample
            as well as the possibility of adding custom adsorption sites."""
        single_displacement_defs = OrderedDict()
        if not caches:
            caches = CacheInfo()
        caches.fillAdsCache(reactants, self.matches)
        # these are expensive functions and so are only called for small
        # systems, this type of channel involves an atom in one molecule
        # and a bond in another and so to handle the asymmetry we loop over
        # all pairs of molecules and take an atom from the first and a bond
        # from the second
        for molecule_one in reactants.molecule:
            for molecule_two in reactants.molecule:
                if molecule_one.number != molecule_two.number:
                    for atom in molecule_one.atom:
                        if not atom in caches.ads_cache:
                            continue
                        for bond in get_bonds_in_molecule(molecule_two):
                            if any(CheckInput().checkIfNotSimpleBond(
                                    reactants,
                                    bond.atom1.index,
                                    bond.atom2.index,
                                    no_reactive_h,
                                    bond_cache=caches.bond_cache)):
                                continue
                            if unique:
                                if not any(CheckInput().checkChannelUniqueness(
                                        reactants,
                                        atom.index,
                                        None,
                                        bond.atom1.index,
                                        bond.atom2.index,
                                        smiles_cache=caches.smiles_cache)):
                                    continue
                            idxs = [(atom.index,),
                                    (bond.atom1.index, bond.atom2.index)]
                            ahash = tuple(
                                get_sorted_atomic_smarts(
                                    reactants,
                                    idxs,
                                    smarts_cache=caches.smarts_cache))
                            str_def = self.buildChannelStrDef(
                                atom.index, bond.atom1.index, None,
                                bond.atom2.index)
                            single_displacement_defs.setdefault(
                                ahash, []).append(str_def)
        if not single_displacement_defs and self.logger:
            self.logger.warning(NO_SINGLE_DISPLACEMENT_DEFS)
        return single_displacement_defs 
[docs]    def getDoubleDisplacementChannels(
            self,
            reactants,
            no_reactive_h,
            unique,
            no_reactive_polymer_monomer_backbone=False,
            no_reactive_polymer_monomer_sidechain=False,
            caches=None):
        """
        Return an OrderedDict of double displacement channel string
        definitions keyed by type.
        :type reactants: schrodinger.structure.Structure
        :param reactants: the reactants from which channels will be
            enumerated
        :type no_reactive_h: bool
        :param no_reactive_h: specify that R-H bonds be considered inert
        :type unique: bool
        :param unique: provide only unique products
        :type no_reactive_polymer_monomer_backbone: bool
        :param no_reactive_polymer_monomer_backbone: specify that polymer monomer
            backbone bonds be considered inert
        :type no_reactive_polymer_monomer_sidechain: bool
        :param no_reactive_polymer_monomer_sidechain: specify that polymer monomer
            sidechain bonds be considered inert
        :param `CacheInfo` caches: Data caches to speed up the calculation
        :rtype: OrderedDict
        :return: keys are type, values are double displacement channels
        """
        NO_DOUBLE_DISPLACEMENT_DEFS = """There are no double displacement
            channels available for the reactants that you have provided.
            This is likely because you either have only a single reactant
            molecule, have no single bonds outside of rings, have disabled
            the reactivity of R-H bonds, have disabled the reactivity of
            polymer monomer backbone and/or sidechain bonds, or if unique
            has been specified have no unique products.  Please reconsider
            the types of channels that you are trying to sample."""
        double_displacement_defs = OrderedDict()
        allow_backbone = not no_reactive_polymer_monomer_backbone
        allow_sidechain = not no_reactive_polymer_monomer_sidechain
        if caches is None:
            caches = CacheInfo()
        # these are expensive functions and so are only called for small
        # systems
        is_polymer = get_grow_idxs(reactants)
        for molecule_one in reactants.molecule:
            for molecule_two in reactants.molecule:
                if molecule_one.number < molecule_two.number:
                    for bond_one in get_bonds_in_molecule(molecule_one):
                        b1i1 = bond_one.atom1.index
                        b1i2 = bond_one.atom2.index
                        if any(CheckInput().checkIfNotSimpleBond(
                                reactants,
                                b1i1,
                                b1i2,
                                no_reactive_h,
                                bond_cache=caches.bond_cache)):
                            continue
                        for bond_two in get_bonds_in_molecule(molecule_two):
                            b2i1 = bond_two.atom1.index
                            b2i2 = bond_two.atom2.index
                            if any(CheckInput().checkIfNotSimpleBond(
                                    reactants,
                                    b2i1,
                                    b2i2,
                                    no_reactive_h,
                                    bond_cache=caches.bond_cache)):
                                continue
                            # It's important speed-wise to check is_polymer
                            # first before calling valid_polymer
                            if (is_polymer and
                                    not valid_polymer_monomer_double_displacement_rxn(
                                        reactants,
                                        b1i1,
                                        b1i2,
                                        b2i1,
                                        b2i2,
                                        allow_backbone=allow_backbone,
                                        allow_sidechain=allow_sidechain)):
                                continue
                            if unique:
                                if not any(CheckInput().checkChannelUniqueness(
                                        reactants,
                                        b1i1,
                                        b1i2,
                                        b2i1,
                                        b2i2,
                                        smiles_cache=caches.smiles_cache)):
                                    continue
                            idxs = [(b1i1, b1i2), (b2i1, b2i2)]
                            str_def = self.buildChannelStrDef(
                                b1i1, b2i1, b1i2, b2i2)
                            ahash = tuple(
                                get_sorted_atomic_smarts(
                                    reactants,
                                    idxs,
                                    outer_sort=True,
                                    smarts_cache=caches.smarts_cache))
                            double_displacement_defs.setdefault(
                                ahash, []).append(str_def)
        if not double_displacement_defs and self.logger:
            self.logger.warning(NO_DOUBLE_DISPLACEMENT_DEFS)
        return double_displacement_defs 
[docs]    def getAllChannels(self,
                       reactants,
                       random_types,
                       no_reactive_h,
                       unique,
                       no_reactive_polymer_monomer_backbone=False,
                       no_reactive_polymer_monomer_sidechain=False):
        """
        Return a nested OrderedDict of all channel string
        definitions keyed first by type of channel and second
        by type of reaction.
        :type reactants: schrodinger.structure.Structure
        :param reactants: the reactants from which channels will be
            enumerated
        :type random_types: list
        :param random_types: list of strings specifying which
            types of channels should be sampled when generating the
            random reaction channels
        :type no_reactive_h: bool
        :param no_reactive_h: specify that R-H bonds be considered
            inert
        :type unique: bool
        :param unique: provide only unique products
        :type no_reactive_polymer_monomer_backbone: bool
        :param no_reactive_polymer_monomer_backbone: specify that polymer monomer
            backbone bonds be considered inert
        :type no_reactive_polymer_monomer_sidechain: bool
        :param no_reactive_polymer_monomer_sidechain: specify that polymer monomer
            sidechain bonds be considered inert
        :rtype: OrderedDict
        :return: all possible channel definitions, keyed first by
            channel type and second by reaction type
        """
        channel_defs_to_sample = OrderedDict()
        caches = CacheInfo()
        if Constants.DISSOCIATION_TYPE in random_types:
            dissociation_defs = self.getDissociationChannels(reactants,
                                                             no_reactive_h,
                                                             caches=caches)
            if dissociation_defs:
                channel_defs_to_sample[Constants.DISSOCIATION_TYPE] = \
                    
dissociation_defs
        if Constants.ASSOCIATION_TYPE in random_types:
            association_defs = self.getAssociationChannels(reactants,
                                                           caches=caches)
            if association_defs:
                channel_defs_to_sample[Constants.ASSOCIATION_TYPE] = \
                    
association_defs
        if Constants.SINGLE_DISPLACEMENT_TYPE in random_types:
            single_displacement_defs = self.getSingleDisplacementChannels(
                reactants, no_reactive_h, unique, caches=caches)
            if single_displacement_defs:
                channel_defs_to_sample[Constants.SINGLE_DISPLACEMENT_TYPE] = \
                    
single_displacement_defs
        if Constants.DOUBLE_DISPLACEMENT_TYPE in random_types:
            double_displacement_defs = self.getDoubleDisplacementChannels(
                reactants,
                no_reactive_h,
                unique,
                no_reactive_polymer_monomer_backbone=
                no_reactive_polymer_monomer_backbone,
                no_reactive_polymer_monomer_sidechain=
                no_reactive_polymer_monomer_sidechain,
                caches=caches)
            if double_displacement_defs:
                channel_defs_to_sample[Constants.DOUBLE_DISPLACEMENT_TYPE] = \
                    
double_displacement_defs
        return channel_defs_to_sample 
[docs]    def printAllChannels(self, all_channel_defs):
        """
        For each type of sampled channel log the total number
        of such channels.
        :type all_channel_defs: OrderedDict
        :param all_channel_defs: all possible channel definitions,
            keyed first by channel type and second by reaction type
        """
        types_to_sizes_dict = OrderedDict()
        for channel_type, rxn_types in all_channel_defs.items():
            total = sum([len(rxns) for rxns in rxn_types.values()])
            types_to_sizes_dict[channel_type] = total
        TOTAL_TAG = 'Total:'
        max_descriptor = max(
            [len(descriptor) for descriptor in self.DESCRIPTOR_DICT.values()])
        max_size = max([len(str(size)) for size in \
            
list(types_to_sizes_dict.values())])
        HEADER_ONE = 'Number of possible reaction channels by type'
        HEADER_TWO = '-' * len(HEADER_ONE)
        if self.logger:
            self.logger.info(HEADER_ONE)
            self.logger.info(HEADER_TWO)
            self.logger.info('')
            for key, value in types_to_sizes_dict.items():
                descriptor = self.DESCRIPTOR_DICT[key]
                aline = descriptor.ljust(max_descriptor) + \
                    
str(value).rjust(max_size)
                self.logger.info(aline)
            if len(types_to_sizes_dict) > 1:
                self.logger.info('-' * (max_descriptor + max_size))
                total = sum(types_to_sizes_dict.values())
                total_line = TOTAL_TAG.ljust(max_descriptor) + \
                    
str(total).rjust(max_size)
                self.logger.info(total_line)
            self.logger.info('') 
[docs]    def addRandomChannelsFromAll(self,
                                 num_random_channels,
                                 random_seed,
                                 all_channel_defs,
                                 bin_rxn_types=False):
        """
        Add the specified number of random channels to the list
        of channels to be computed by picking a random sample of
        the size given from the list of all possible channels.
        If the number of random channels is 'all' or greater than
        or equal to the number of possible channels then pick all
        of them in order.
        :type num_random_channels: int or str 'all'
        :param num_random_channels: the desired number of random
            channels to generate
        :type random_seed: None or int
        :param random_seed: the seed for the random number generator
        :type all_channel_defs: OrderedDict
        :param all_channel_defs: all possible channel definitions,
            keyed first by channel type and second by reaction type
        :type bin_rxn_types: bool
        :param bin_rxn_types: if True then when generating random
            channels first bin by reaction type followed by selecting a
            random type followed by a random instance, if False then just
            select a random instance from all instances
        """
        ASKED_FOR_TOO_MANY_MSG = """
            You have specified that %s random reaction channels
            should be generated but this system, given the other
            options you have specified, only has %s possible
            reaction channels.  Proceeding with all possible
            reaction channels."""
        flattened_channels = self.flattenChannels(all_channel_defs)
        total_num_channels = len(flattened_channels)
        if num_random_channels == Constants.ALL_RANDOM_CHANNELS or \
            
num_random_channels == total_num_channels:
            defs_to_add = list(flattened_channels)
        elif num_random_channels > total_num_channels:
            if self.logger:
                self.logger.warning(ASKED_FOR_TOO_MANY_MSG %
                                    (num_random_channels, total_num_channels))
            defs_to_add = list(flattened_channels)
        else:
            random_seed = init_random_seed(random_seed, self.logger)
            my_random.seed(random_seed)
            if bin_rxn_types:
                defs_to_add = []
                while len(defs_to_add) < num_random_channels:
                    channel_type = random_choice(all_channel_defs)
                    rxn_type = random_choice(all_channel_defs[channel_type])
                    rxn = random_choice(
                        all_channel_defs[channel_type][rxn_type])
                    if rxn not in defs_to_add:
                        defs_to_add.append(rxn)
            else:
                defs_to_add = list(
                    my_random.choice(flattened_channels,
                                     size=num_random_channels,
                                     replace=False))
        for def_to_add in defs_to_add:
            self.addDefinition(def_to_add) 
    def _getRandomAtomsFromBins(self, mols, cache=None):
        """
        Return a list of random atom indicies, one per given
        molecule, drawn by first binning by atom type.
        :type mols: tuple
        :param mols: contains schrodinger.structure._Molecule
        :type cache: dict
        :param cache: an atom type dict cache, keys are molecule
            numbers, values are atom type dicts
        :rtype: list
        :return: contains single tuples of atom indices, i.e.  (1,)
        """
        singles = []
        cache = cache or {}
        for mol in mols:
            atom_dict = cache.get(mol.number)
            if not atom_dict:
                st = mol.extractStructure()
                atom_dict = get_atom_type_dict(st)
                if atom_dict:
                    old = mol.getAtomIndices()
                    new = list(range(1, len(old) + 1))
                    new_to_old = dict(list(zip(new, old)))
                    atom_dict = _update_type_dict(atom_dict, new_to_old)
                    cache[mol.number] = atom_dict
            atype = random_choice(atom_dict)
            single = random_choice(atom_dict[atype])
            singles.append(single)
        return singles
    def _getRandomBondsFromBins(self, mols, cache=None):
        """
        Return a list of random bonding atom index pairs, one
        per given molecule, drawn by first binning by bond type.
        :type mols: tuple
        :param mols: contains schrodinger.structure._Molecule
        :type cache: dict
        :param cache: a bond type dict cache, keys are molecule
            numbers, values are bond type dicts
        :rtype: list
        :return: contains pair tuples of atom indices, i.e. (1, 2)
        """
        pairs = []
        cache = cache or {}
        for mol in mols:
            bond_dict = cache.get(mol.number)
            if not bond_dict:
                st = mol.extractStructure()
                bond_dict = get_bond_type_dict(st)
                if bond_dict:
                    old = mol.getAtomIndices()
                    new = list(range(1, len(old) + 1))
                    new_to_old = dict(list(zip(new, old)))
                    bond_dict = _update_type_dict(bond_dict, new_to_old)
                    cache[mol.number] = bond_dict
            atype = random_choice(bond_dict)
            pair = random_choice(bond_dict[atype])
            pairs.append(pair)
        return pairs
[docs]    def addRandomChannelsOnTheFly(self,
                                  reactants,
                                  num_random_channels,
                                  random_types,
                                  no_reactive_h,
                                  unique,
                                  bin_rxn_types=False,
                                  no_reactive_polymer_monomer_backbone=False,
                                  no_reactive_polymer_monomer_sidechain=False):
        """
        Generate the specified number of random reaction channels
        without ever precomputing the list of all possible channels.
        :type reactants: schrodinger.structure.Structure
        :param reactants: the reactants from which channels will be
            enumerated
        :type num_random_channels: int
        :param num_random_channels: the desired number of random
            channels to generate
        :type random_types: list
        :param random_types: list of strings specifying which types of
            channels should be sampled when generating the random reaction
            channels
        :type no_reactive_h: bool
        :param no_reactive_h: specify that R-H bonds be considered inert
        :type unique: bool
        :param unique: provide only unique products
        :type bin_rxn_types: bool
        :param bin_rxn_types: if True then when generating random
            channels first bin by reaction type followed by selecting a
            random type followed by a random instance, if False then just
            select a random instance from all instances
        :type no_reactive_polymer_monomer_backbone: bool
        :param no_reactive_polymer_monomer_backbone: specify that polymer monomer
            backbone bonds be considered inert
        :type no_reactive_polymer_monomer_sidechain: bool
        :param no_reactive_polymer_monomer_sidechain: specify that polymer monomer
            sidechain bonds be considered inert
        """
        def wrapper_check_bonding():
            """
            This is just a convenience wrapper to check the validity of
            any random bond(s) generated on-the-fly for the current random
            reaction channel type.
            :rtype: boolean
            :return: True if at least one of the bonds is not a simple bond
                and therefore to skip this random definition, False otherwise
            """
            if atom_a_two:
                if any(CheckInput().checkIfNotSimpleBond(
                        reactants, atom_a_one, atom_a_two, no_reactive_h)):
                    return True
            if atom_b_two:
                if any(CheckInput().checkIfNotSimpleBond(
                        reactants, atom_b_one, atom_b_two, no_reactive_h)):
                    return True
        def get_bonded_atoms(central_atom_ind):
            """
            Return a list of atom indicies for atoms bound to the central
            atom.  This function just collects indicies provided by the
            schrodinger.structure.Structure._StructureAtom.bonded_atoms
            generator so that a sample from which we can randomly choose
            can be obtained.
            :type central_atom_ind: int
            :param central_atom_ind: the central atom for which the bonded
                atoms will be returned
            :rtype: list of ints
            :return: atoms bonded to the central atom
            """
            central_atom = reactants.atom[central_atom_ind]
            bonded_atoms = []
            for bonded_atom in central_atom.bonded_atoms:
                bonded_atoms.append(bonded_atom.index)
            return bonded_atoms
        def break_out():
            if num_unproductive_cycles > self.MAX_NUM_UNPRODUCTIVE_CYCLES:
                UNPRODUCTIVE_MSG = """Over the last %d attempts this script
                    has been unable to generate a valid randomly defined
                    reaction channel.  It is likely that you have either
                    asked for a number of random channels that is greater
                    than the number of available channels given your input
                    or you are trying to sample a rare channel.  This
                    script will proceed with the %d random channels that
                    have been determined thus far out of the %d that you
                    asked for."""
                if self.logger:
                    self.logger.warning(
                        UNPRODUCTIVE_MSG %
                        (self.MAX_NUM_UNPRODUCTIVE_CYCLES,
                         num_random_channels_added, num_random_channels))
                return True
        def get_random_molecule_and_atom():
            mol = random_choice(reactant_molecules)
            atom_one, atom_two = random_choice(mol.getAtomIndices()), None
            atom_one_bonded_atoms = get_bonded_atoms(atom_one)
            return mol, atom_one, atom_two, atom_one_bonded_atoms
        allow_backbone = not no_reactive_polymer_monomer_backbone
        allow_sidechain = not no_reactive_polymer_monomer_sidechain
        # begin generating random reaction channels
        #
        # in this case we have to use a while loop because we have no
        # upper bound given that it is too expensive to calculate by
        # default (if the user wants to they can pass 'all' as the
        # argument to the number of random reaction channels)
        num_random_channels_added = 0
        num_unproductive_cycles = 0
        reactant_molecules = [molecule for molecule in reactants.molecule]
        atom_dict_cache = {}
        bond_dict_cache = {}
        while num_random_channels_added < num_random_channels:
            if break_out():
                break
            # pick a random type of channel
            channel_type = random_choice(random_types)
            # pick a random molecule and a random atom
            mol_a, atom_a_one, atom_a_two, atom_a_one_bonded_atoms = \
                
get_random_molecule_and_atom()
            if channel_type != Constants.DISSOCIATION_TYPE and \
                
len(reactant_molecules) != 1:
                # pick another random molecule and a random atom
                # and compare
                mol_b, atom_b_one, atom_b_two, \
                    
atom_b_one_bonded_atoms = \
                    
get_random_molecule_and_atom()
                if mol_a.number == mol_b.number:
                    num_unproductive_cycles += 1
                    continue
                # check association type
                if channel_type == Constants.ASSOCIATION_TYPE:
                    if bin_rxn_types:
                        mols = (mol_a, mol_b)
                        (atom_a_one,), (atom_b_one,) = \
                            
self._getRandomAtomsFromBins(mols,
                            cache=atom_dict_cache)
                    if CheckInput().checkIfNotAdsorbable(
                            reactants, atom_a_one, self.matches):
                        num_unproductive_cycles += 1
                        continue
                    if CheckInput().checkIfNotAdsorbable(
                            reactants, atom_b_one, self.matches):
                        num_unproductive_cycles += 1
                        continue
                # check single displacement type
                if channel_type == Constants.SINGLE_DISPLACEMENT_TYPE:
                    if not (atom_a_one_bonded_atoms or atom_b_one_bonded_atoms):
                        num_unproductive_cycles += 1
                        continue
                    elif atom_a_one_bonded_atoms:
                        if not bin_rxn_types:
                            atom_a_two = random_choice(atom_a_one_bonded_atoms)
                        else:
                            mols = (mol_a,)
                            (atom_a_one, atom_a_two), = \
                                
self._getRandomBondsFromBins(mols,
                                cache=bond_dict_cache)
                            mols = (mol_b,)
                            (atom_b_one,), = self._getRandomAtomsFromBins(
                                mols, cache=atom_dict_cache)
                        if CheckInput().checkIfNotAdsorbable(
                                reactants, atom_b_one, self.matches):
                            num_unproductive_cycles += 1
                            continue
                    elif atom_b_one_bonded_atoms:
                        if not bin_rxn_types:
                            atom_b_two = random_choice(atom_b_one_bonded_atoms)
                        else:
                            mols = (mol_a,)
                            (atom_a_one,), = self._getRandomAtomsFromBins(
                                mols, cache=atom_dict_cache)
                            mols = (mol_b,)
                            (atom_b_one, atom_b_two), = \
                                
self._getRandomBondsFromBins(mols,
                                cache=bond_dict_cache)
                        if CheckInput().checkIfNotAdsorbable(
                                reactants, atom_a_one, self.matches):
                            num_unproductive_cycles += 1
                            continue
                # check double displacement type
                if channel_type == Constants.DOUBLE_DISPLACEMENT_TYPE:
                    if not atom_a_one_bonded_atoms or not atom_b_one_bonded_atoms:
                        num_unproductive_cycles += 1
                        continue
                    elif not bin_rxn_types:
                        atom_a_two = random_choice(atom_a_one_bonded_atoms)
                        atom_b_two = random_choice(atom_b_one_bonded_atoms)
                    else:
                        mols = (mol_a, mol_b)
                        (atom_a_one, atom_a_two), (atom_b_one, atom_b_two) = \
                            
self._getRandomBondsFromBins(mols,
                            cache=bond_dict_cache)
                # check bonding
                if wrapper_check_bonding():
                    num_unproductive_cycles += 1
                    continue
                # check polymer monomers
                if channel_type == Constants.DOUBLE_DISPLACEMENT_TYPE and \
                    
not valid_polymer_monomer_double_displacement_rxn(
                    reactants, atom_a_one, atom_a_two, atom_b_one, atom_b_two,
                    allow_backbone=allow_backbone, allow_sidechain=allow_sidechain):
                    num_unproductive_cycles += 1
                    continue
                # check channel uniqueness
                if unique and (channel_type == Constants.SINGLE_DISPLACEMENT_TYPE or \
                    
channel_type == Constants.DOUBLE_DISPLACEMENT_TYPE):
                    if not any(CheckInput().checkChannelUniqueness(
                            reactants, atom_a_one, atom_a_two, atom_b_one,
                            atom_b_two)):
                        num_unproductive_cycles += 1
                        continue
            elif channel_type == Constants.DISSOCIATION_TYPE:
                # check bonding
                if not atom_a_one_bonded_atoms:
                    num_unproductive_cycles += 1
                    continue
                elif not bin_rxn_types:
                    atom_b_one, atom_b_two = \
                        
random_choice(atom_a_one_bonded_atoms), None
                else:
                    mols = (mol_a,)
                    (atom_a_one, atom_b_one), = self._getRandomBondsFromBins(
                        mols, cache=bond_dict_cache)
                if any(CheckInput().checkIfNotSimpleBond(
                        reactants, atom_a_one, atom_b_one, no_reactive_h)):
                    num_unproductive_cycles += 1
                    continue
            # wrap up, do not store any random defs that are bad defs
            # because we are directly creating the definitions from
            # structural considerations and the contents here will
            # only be due to duplicates and the user doesn't need to
            # see them
            random_def = self.buildChannelStrDef(atom_a_one, atom_b_one,
                                                 atom_a_two, atom_b_two)
            if self.addDefinition(random_def):
                num_unproductive_cycles += 1
                self.bad_defs.remove(random_def.strip('\n'))
                continue
            else:
                num_random_channels_added += 1
                num_unproductive_cycles = 0 
[docs]    def addRandomChannels(self,
                          reactants,
                          num_random_channels=Constants.NUM_RANDOM_CHANNELS,
                          random_types=Constants.DEFAULT_RANDOM_TYPES,
                          random_seed=Constants.RANDOM_SEED,
                          allow_adsorption_onto=None,
                          no_reactive_h=Constants.NO_REACTIVE_H,
                          unique=Constants.UNIQUE,
                          bin_rxn_types=False,
                          no_reactive_polymer_monomer_backbone=False,
                          no_reactive_polymer_monomer_sidechain=False):
        """
        Generate the specified number of random reaction channels.
        :type reactants: schrodinger.structure.Structure
        :param reactants: the reactants from which channels will be
            enumerated
        :type num_random_channels: int or str
        :param num_random_channels: the desired number of random
            channels to generate or 'all'
        :type random_types: list
        :param random_types: list of strings specifying which types of
            channels should be sampled when generating the random reaction
            channels
        :type random_seed: None or int
        :param random_seed: the seed for the random number generator
        :type allow_adsorption_onto: list
        :param allow_adsorption_onto: SMARTS patterns of arbitrary
            adsorption sites to support
        :type no_reactive_h: bool
        :param no_reactive_h: specify that R-H bonds be considered inert
        :type unique: bool
        :param unique: provide only unique products
        :type bin_rxn_types: bool
        :param bin_rxn_types: if True then when generating random
            channels first bin by reaction type followed by selecting a
            random type followed by a random instance, if False then just
            select a random instance from all instances
        :type no_reactive_polymer_monomer_backbone: bool
        :param no_reactive_polymer_monomer_backbone: specify that polymer monomer
            backbone bonds be considered inert
        :type no_reactive_polymer_monomer_sidechain: bool
        :param no_reactive_polymer_monomer_sidechain: specify that polymer monomer
            sidechain bonds be considered inert
        """
        # check input needed for generating random channels
        num_random_channels, random_types, \
           
self.allow_adsorption_onto, self.matches = \
           
CheckInput().checkRandomChannelsInput(reactants,
           num_random_channels, random_types, random_seed,
           allow_adsorption_onto, self.logger)
        if not num_random_channels:
            # MATSCI-1310 - reset allow_adsorption_onto just to be safe
            self.allow_adsorption_onto = None
            return
        # log adsorption channels
        print_allow_adsorption_onto(self.allow_adsorption_onto, self.logger)
        # if all possible channels are wanted or if it is
        # inexpensive to calculate them all then calculate
        # them all and print them.  note that this use of the
        # word 'all' when referring to reaction channels means
        # 'all' of each type specified in the list of random
        # types to be sampling.  if the latter is also 'all'
        # then that will literally be all of the possible
        # reaction channels that are calculable by this script.
        #
        # if the user either asked for all of the channels or
        # for an inexpensive system they specified a number of
        # reaction channels that is larger than or equal to the
        # number of possible channels then all of the channels
        # will be computed.  otherwise a random sample of channels
        # will be selected from the list of all possible channels
        # or are generated on the fly, without ever precomputing the
        # list of all possible channels, depending on system size.
        # we do it this way to avoid having to precompute the huge
        # list of all possible channels for large systems when the
        # user might only want a small number of them.  the sacrifice
        # being that the number of channels by type will never be
        # reported to the user and the exit conditions from the on
        # the fly version are indeterministic.  for example, without
        # the implemented time-out it would be possible to enter an
        # infinite loop in a situation where the user has asked for
        # a number of channels that is greater than the number of
        # available channels given that the latter is never computed
        # for large systems.  also, it will be possible that the
        # script will prematurely give up on trying to randomly
        # find valid reaction channels if too much time is spent
        # trying to find a rare channel
        if num_random_channels == Constants.ALL_RANDOM_CHANNELS or \
            
reactants.atom_total < self.RANDOM_ALL_TO_ON_THE_FLY_THRESH:
            all_channel_defs = self.getAllChannels(
                reactants,
                random_types,
                no_reactive_h,
                unique,
                no_reactive_polymer_monomer_backbone=
                no_reactive_polymer_monomer_backbone,
                no_reactive_polymer_monomer_sidechain=
                no_reactive_polymer_monomer_sidechain)
            if not all_channel_defs:
                return
            self.printAllChannels(all_channel_defs)
            self.addRandomChannelsFromAll(num_random_channels,
                                          random_seed,
                                          all_channel_defs,
                                          bin_rxn_types=bin_rxn_types)
        else:
            random_seed = init_random_seed(random_seed, self.logger)
            my_random.seed(random_seed)
            self.addRandomChannelsOnTheFly(
                reactants,
                num_random_channels,
                random_types,
                no_reactive_h,
                unique,
                bin_rxn_types=bin_rxn_types,
                no_reactive_polymer_monomer_backbone=
                no_reactive_polymer_monomer_backbone,
                no_reactive_polymer_monomer_sidechain=
                no_reactive_polymer_monomer_sidechain)  
[docs]class ChannelDefinition(object):
    """
    Manage a reaction channel definition.
    """
    PATTERN = \
        
re.compile(r'\s*(\d+)\s*(\d+)?\s*;\s*(\d+)\s*(\d+)?\s*$')
    FIRST_SEP = ';'
    SECOND_SEP = ' '
[docs]    def __init__(self, str_definition, logger=None):
        """
        Create an instance.
        :type str_definition: str
        :param str_definition: the string representation of
            the definition
        :type logger: logging.getLogger
        :param logger: output logger
        """
        self.str_definition = str_definition
        self.logger = logger
        self.not_valid = False
        self.r_one_a_one = None
        self.r_one_a_two = None
        self.r_two_a_one = None
        self.r_two_a_two = None
        self.r_one = []
        self.r_two = []
        self.listdef = []
        self.flatdef = []
        self.representation = ''
        self.do_which_types = None
        self.checkDefinition() 
    def __eq__(self, other):
        """
        Define class equality.
        """
        state = self.flatdef == other.flatdef
        return state
    def __repr__(self):
        """
        Define representation format.
        """
        return self.representation
[docs]    def setChannelAttrs(self,
                        r_one_a_one,
                        r_two_a_one,
                        r_one_a_two=None,
                        r_two_a_two=None):
        """
        Set up some attributes for this class.
        :type r_one_a_one: int
        :param r_one_a_one: the first atom of the first reactant
        :type r_two_a_one: int
        :param r_two_a_one: the first atom of the second reactant
        :type r_one_a_two: int
        :param r_one_a_two: the second atom of the first reactant
        :type r_two_a_two: int
        :param r_two_a_two: the second atom of the second reactant
        """
        # sort the lists on either side of the separator and then
        # do a final sort so that the pair with the smallest index
        # comes first, this is just a convention
        self.r_one_a_one = int(r_one_a_one)
        self.r_one = [self.r_one_a_one]
        self.r_two_a_one = int(r_two_a_one)
        self.r_two = [self.r_two_a_one]
        self.r_one_a_two = r_one_a_two
        if self.r_one_a_two:
            self.r_one_a_two = int(self.r_one_a_two)
            self.r_one += [self.r_one_a_two]
            self.r_one.sort()
        self.r_two_a_two = r_two_a_two
        if self.r_two_a_two:
            self.r_two_a_two = int(self.r_two_a_two)
            self.r_two += [self.r_two_a_two]
            self.r_two.sort()
        if self.r_one[0] <= self.r_two[0]:
            self.listdef = [self.r_one, self.r_two]
            self.flatdef = self.r_one + self.r_two
        else:
            self.listdef = [self.r_two, self.r_one]
            self.flatdef = self.r_two + self.r_one
        r_one, r_two = self.listdef
        r_one = self.SECOND_SEP.join([str(index) for index in r_one])
        r_two = self.SECOND_SEP.join([str(index) for index in r_two])
        self.representation = r_one + self.FIRST_SEP + \
            
self.SECOND_SEP + r_two 
[docs]    def checkAtomUniqueness(self):
        """
        Check for unique atoms.
        """
        if len(self.flatdef) != len(set(self.flatdef)):
            self.not_valid = True
            return self.not_valid 
[docs]    def checkAtomZeroIndex(self):
        """
        Check for an atom index that is zero.
        """
        if 0 in self.flatdef:
            self.not_valid = True
            return self.not_valid 
[docs]    def checkDefinition(self):
        """
        Check this reaction channel definition.
        """
        if self.checkDefStrFormat():
            return
        if self.checkAtomUniqueness():
            return
        if self.checkAtomZeroIndex():
            return  
[docs]def get_fragment_smiles(astructure, idx1, idx2):
    """
    Return two SMILES patterns of the two molecule fragments that
    would result from breaking the bond specified by the given two
    atoms or return the SMILES pattern of the molecule containing
    idx1 if idx2 is None.
    :type astructure: schrodinger.structure.Structure
    :param astructure: the structure containing the molecule for which
        fragment SMILES are wanted
    :type idx1: int
    :param idx1: an atom index
    :type idx2: int
    :param idx2: the atom index for the target bond formed with idx1 or
        None
    :rtype: str, str or str, None
    :return: either two strings one for each of two fragments or a single
        string for the molecule containing idx1 and None
    """
    mol = astructure.atom[idx1].getMolecule()
    orig_to_new = dict(
        list(zip(mol.getAtomIndices(), list(range(1,
                                                  len(mol.atom) + 1)))))
    mol = mol.extractStructure()
    if idx2:
        newidx1 = orig_to_new[idx1]
        newidx2 = orig_to_new[idx2]
        frag_a_indicies = indicies_from_bonds_deep(mol, newidx1, [newidx2])
        frag_a_st = mol.extract(frag_a_indicies)
        frag_a_smiles = analyze.generate_smiles(frag_a_st)
        frag_b_indicies = indicies_from_bonds_deep(mol, newidx2, [newidx1])
        frag_b_st = mol.extract(frag_b_indicies)
        frag_b_smiles = analyze.generate_smiles(frag_b_st)
    else:
        frag_a_st = mol.copy()
        frag_a_smiles = analyze.generate_smiles(frag_a_st)
        frag_b_smiles = None
    return frag_a_smiles, frag_b_smiles 
[docs]def indicies_from_bonds_deep(astructure,
                             start_atom_index,
                             exclude_atom_indicies=None,
                             depth=None):
    """
    Return a list of atom indicies obtained by collecting all atoms
    that are connected, in the bond traversal sense, to the given
    start atom by a number of bonds that is less than or equal to
    that given by the provided depth parameter.  The start atom is
    included in the returned list.  One can set a direction for the
    traversal by specifying exclusion atoms that are bound to the given
    start atom; the traversal will be done away from these atoms.
    Useful for doing SMARTS analysis from a central atom and proceeding
    outwards from that atom, as in nearest-neighbor,
    next-nearest-neighbor, etc., i. e. shells.  Also useful for
    splitting a molecule into fragments about a given bond.
    :type astructure: schrodinger.structure.Structure
    :param astructure: the structure containing all atoms
    :type start_atom_index: int
    :param start_atom_index: the atom index from which to start the traversal
    :type exclude_atom_indicies: list of int
    :param exclude_atom_indicies: atom indicies to exclude from the bond
        traversal which basically sets the direction of the bond traversal, for
        example in the direction of a certain bond
    :type depth: int
    :param depth: the maximum depth that the traversal is allowed to traverse
        in terms of the number of bonds from the starting atom
    :rtype: list of ints
    :return: indicies, atom indicies connected to the start atom by up to the
        specified bond depth
    """
    # if only radial shells are wanted then the exclusion list
    # should be empty
    if exclude_atom_indicies is None:
        exclude_atom_indicies = []
    # if only fragments from bond breaking are wanted then the
    # depth doesn't matter so we will set it to somthing
    # guaranteed to be larger than the number of connections,
    # i.e. set it to the number of atoms plus one (the latter
    # needed to handle the case where the number of atoms is
    # the same as the number of connections)
    if depth is None:
        depth = astructure.atom_total + 1
    # recursion function, if needed set the direction with
    # exclusion atoms, and if needed set the number of times
    # through the recursion with the depth.  The num_times_through
    # variable gives the current depth of the traversal, i.e. it
    # tells you which 'shell' you are on either the first shell
    # (nearest neighbors to the start atom), the second shell
    # (next nearest neighbors to the start atom), etc.  It is
    # bounded by the depth argument.  It starts at zero and is
    # incremented by one each time a valid bonding atom is
    # found in the next shell.  Each next shell enters the problem
    # with the next call of continue_traversal_recursion.  This is
    # done until the desired depth is acheived at which point we
    # to return from the current call and get back to the most recent
    # branch point.  We first need to decrement the num_times_through
    # counter by 2, once for incrementing-by-one that was just done
    # with the current call and once to get back to the last branch
    # point.  If the current atom marks the end of a branch then
    # we only decrement by one to get back to the last branch point.
    # Remember that the structure of this set of recursive calls is
    # that starting from atom A, with first shell [B, C, D], and
    # second shell [E, F, G, H, I, J] and is greedy, that is that
    # the code block will reach A->B->E before it reaches even A->C,
    # etc.
    def continue_traversal_recursion(start_atom_index, exclude_atom_indicies):
        global num_times_through
        for bonded_atom in \
            
astructure.atom[start_atom_index].bonded_atoms:
            if bonded_atom.index not in indicies and \
                
bonded_atom.index not in exclude_atom_indicies:
                num_times_through += 1
                if num_times_through <= depth:
                    indicies.add(bonded_atom.index)
                    continue_traversal_recursion(bonded_atom.index,
                                                 [start_atom_index])
                else:
                    num_times_through -= 2
                    return
        num_times_through -= 1
        return
    # put the start atom in
    indicies = set([start_atom_index])
    # this global will track the number of times the
    # resursion is called
    global num_times_through
    num_times_through = 0
    # start it off
    continue_traversal_recursion(start_atom_index, exclude_atom_indicies)
    # return a sorted list
    indicies = list(indicies)
    indicies.sort()
    return indicies 
[docs]def set_zob_to_sob(astructure, atom_one, atom_two):
    """
    If the given structure has a zero-order bond for the given
    atom indicies then make this bond a single-order bond.
    :type astructure: schrodinger.structure.Structure
    :param astructure: the structure for which the bond order
        may be changed
    :type atom_one: int
    :param atom_one: the first atom index of the bond whose order
        needs changing
    :type atom_two: int
    :param atom_two: the second atom index of the bond whose order
        needs changing
    """
    bond = astructure.getBond(atom_one, atom_two)
    if bond.order == 0:
        bond.order = 1 
[docs]def wrapper_build_attach_structure(mol_one_st, mol_one_from_atom,
                                   mol_one_to_atom, mol_two_st,
                                   mol_two_from_atom, mol_two_to_atom):
    """
    A wrapper that corrects the atom reorder map returned from
    build.attach_structure to account for deleted fragment atoms
    and wraps the case where the active bonds may be of zero-order.
    :type mol_one_st: schrodinger.structure.Structure
    :param mol_one_st: the base structure to which the fragment
        will be added
    :type mol_one_from_atom: int
    :param mol_one_from_atom: defines the point at which the
        fragment structure will be added to the base structure
    :type mol_one_to_atom: int
    :param mol_one_to_atom: this atom defines the part of the
        base structure that will be replaced by the fragment
    :type mol_two_st: schrodinger.structure.Structure
    :param mol_two_st: the fragment structure to be added
        to the base structure
    :type mol_two_from_atom: int
    :param mol_two_from_atom: defines the point at which the
        base structure will be added to the fragment structure
    :type mol_two_to_atom: int
    :param mol_two_to_atom: this atom defines the part of the
        fragment structure that will be replaced by the base
    :rtype: dict, dict, dict
    :return: three mapping dictionaries, (1) original_map, which
        is the original map returned from build.attach_structure,
        (2) orig_to_new_map, which is the corrected version of (1),
        and (3) new_to_orig_map, the reverse of (2).
    """
    # handle zero-order bonds for both structures
    set_zob_to_sob(mol_one_st, mol_one_from_atom, mol_one_to_atom)
    set_zob_to_sob(mol_two_st, mol_two_from_atom, mol_two_to_atom)
    mol_one_size = mol_one_st.atom_total
    # no depth parameter because we are just returning fragments
    # after bond breaking
    mol_two_del_atoms = indicies_from_bonds_deep(mol_two_st, mol_two_to_atom,
                                                 [mol_two_from_atom])
    mol_two_del_atoms = [ind + mol_one_size for ind in mol_two_del_atoms]
    # call original function that returns faulty map
    original_map = build.attach_structure(mol_one_st, mol_one_from_atom,
                                          mol_one_to_atom, mol_two_st,
                                          mol_two_from_atom, mol_two_to_atom)
    # set deleted fragment atoms to None in the returned map
    orig_to_new_map = original_map.copy()
    mol_two_offset = 0
    for key, value in list(orig_to_new_map.items()):
        if key > mol_one_size:
            orig_to_new_map[key] = value - mol_two_offset
            if key in mol_two_del_atoms:
                orig_to_new_map[key] = None
                mol_two_offset += 1
    # while in here return the new to original index map for
    # convenience, it will skip None atoms
    new_to_orig_map = {}
    for key, value in orig_to_new_map.items():
        if value is not None:
            new_to_orig_map[value] = key
    return original_map, orig_to_new_map, new_to_orig_map 
[docs]def dict_delete_and_decrement_keys(indict, key_to_delete):
    """
    Return the dictionary obtained from the input dictionary
    by deleting an entry with the specified key and then
    decrementing all keys larger than the specified key.
    Leave the values unchanged.
    :type indict: dict
    :param indict: dictionary with integer keys and values
        from which to remove and decrement the specified key
    :type key_to_delete: int
    :param key_to_delete: the key to delete from the input
        dictionary
    :rtype: dict
    :return outdict: the new dictionary without the specifed
        key and with decremented keys
    """
    outdict = {}
    for key, value in indict.items():
        if key < key_to_delete:
            outdict[key] = value
        elif key > key_to_delete:
            outdict[key - 1] = value
    return outdict 
[docs]def open_bonding_sites(astructure, atom_index):
    """
    Return the number of open bonding sites available for the
    provided atom in the given structure.  This number is the
    same as the number of unpaired electrons for the atom in
    its current state in the given structure.
    :type astructure: schrodinger.structure.Structure
    :param astructure: the structure that contains the provided
        atom
    :type atom_index: int
    :param atom_index: the index of the atom for which the
        number of open bonding sites will be reported
    :rtype: int
    :return: atom_open_val, the number of open bonding sites
        for the provided atom
    """
    # using older MacroModel atom types because mmlewis doesn't
    # have an API to compute this
    atom_obj = astructure.atom[atom_index]
    atom_type = mm.mmat_get_atom_type(atom_obj.atom_type_name)
    atom_max_val = mm.mmat_get_valence(atom_type)
    atom_curr_val = sum([bond.order for bond in atom_obj.bond])
    atom_open_val = atom_max_val - atom_curr_val
    return atom_open_val 
[docs]def minimize_geometry(astructure):
    """
    Perform a geometry minimization on the provided structure.
    :type astructure: schrodinger.structure.Structure
    :param astructure: the structure object needing minimization
    :rtype: bool
    :return: False if the minimization was successful, True
        otherwise
    """
    version = Channels.OPLS_VERSION
    # using a bare except because I am not entirely sure of the
    # different ways that this function will traceback
    try:
        astructure.retype()
        with ioredirect.IOSilence():
            with minimize.minimizer_context(ffld_version=version,
                                            struct=astructure,
                                            cleanup=False) as minimize_obj:
                min_res = minimize_obj.minimize()
                astructure.property[
                    mm.OPLS_POTENTIAL_ENERGY] = min_res.potential_energy
    except:
        return True 
[docs]def get_bonds_in_molecule(amolecule):
    """
    This is a convenience function to get a list of bond objects
    from a molecule object, since no such function exists in our
    Python API.
    :type amolecule: schrodinger.structure._Molecule
    :param amolecule: the molecule object from which bonds will
        be defined
    :rtype: list of schrodinger.structure._StructureBond
    :return: bonds, list of bond objects for bonds in the specified
        molecule
    """
    # originally this was a set but it was later discovered while
    # testing the GA that consecutive runs using the same random seed
    # gave different results, not because of the seed but due to the
    # fact that the sets were being ordered differently for the two
    # runs
    bonds = OrderedDict()
    for atom in amolecule.atom:
        for bond in atom.bond:
            key = [bond.atom1.index, bond.atom2.index]
            key.sort()
            key = tuple(key)
            if key not in list(bonds):
                bonds[key] = bond
    return list(bonds.values()) 
[docs]def get_sorted_atomic_smarts(st, all_idxs, outer_sort=False, smarts_cache=None):
    """
    Return a tuple of sorted atomic SMARTS for the given
    collection of indicies in the given structure.
    :type st: schrodinger.structure.Structure
    :param st: the structure
    :type all_idxs: list
    :param all_idxs: contains tuples of atom indices
    :type outer_sort: bool
    :param outer_sort: whether to sort the tuples in the
        returned list
    :type dict smarts_cache: keys are atom indexes, values are the SMARTS
        pattern for that atom. Used to speed up multiple calls to this function.
        This function will populate the dictionary as new atoms are computed.
    :rtype: list
    :return: contains tuples of sorted atomic SMARTS having
        the same structure as the input indicies
    """
    smarts = []
    if smarts_cache is None:
        smarts_cache = {}
    for idxs in all_idxs:
        patterns = []
        for index in idxs:
            try:
                patterns.append(smarts_cache[index])
            except KeyError:
                with warnings.catch_warnings():
                    warnings.filterwarnings("ignore",
                                            category=DeprecationWarning,
                                            message="analyze.generate_smarts")
                    atsm = analyze.generate_smarts(st, atom_subset=[index])
                patterns.append(atsm)
                smarts_cache[index] = atsm
        smarts.append(tuple(sorted(patterns)))
    if outer_sort:
        smarts = sorted(smarts, key=lambda x: x[0])
    return smarts 
def _update_type_dict(adict, amap):
    """
    Return a copy of the given type dictionary with
    atom indicies updated according to the given index
    map.
    :type adict: OrderedDict
    :param adict: the type dictionary to be updated
    :type amap: dict
    :param amap: maps the indicies used in the given
        dict to another set of indicies
    :rtype: OrderedDict
    :return: the updated dict
    """
    new_adict = OrderedDict()
    for key, values in adict.items():
        new_values = []
        for value in values:
            new_value = [amap[x] for x in value]
            new_values.append(tuple(new_value))
        new_adict[key] = new_values
    return new_adict
[docs]def get_atom_type_dict(st):
    """
    Return an atom type dictionary for the given structure.
    :type st: schrodinger.structure.Structure
    :param st: the structure
    :rtype: OrderedDict
    :return: atom type dict, keys are single tuples of SMARTS atom
        types, like (smarts,), values are lists of single tuples of
        atom indicies, like (1,)
    """
    atom_type_dict = OrderedDict()
    for atom in st.atom:
        idxs = [(atom.index,)]
        ahash = tuple(get_sorted_atomic_smarts(st, idxs))
        atom_type_dict.setdefault(ahash, []).append(idxs[0])
    return atom_type_dict 
[docs]def get_bond_type_dict(st):
    """
    Return a bond type dictionary for the given structure.
    :type st: schrodinger.structure.Structure
    :param st: the structure
    :rtype: OrderedDict
    :return: bond type dict, keys are pair tuples of SMARTS bond
        types, like (smarts, smarts), values are lists of pair tuples
        of atom indicies, like (1, 2)
    """
    bond_type_dict = OrderedDict()
    for bond in st.bond:
        idxs = [(bond.atom1.index, bond.atom2.index)]
        ahash = tuple(get_sorted_atomic_smarts(st, idxs))
        bond_type_dict.setdefault(ahash, []).append(idxs[0])
    return bond_type_dict 
[docs]def print_allow_adsorption_onto(allow_adsorption_onto, logger=None):
    """
    Log the list of allowed adsorption sites.
    :type allow_adsorption_onto: list
    :param allow_adsorption_onto: SMARTS patterns of arbitrary
        adsorption sites to support
    :type logger: logging.getLogger
    :param logger: output logger
    """
    if not allow_adsorption_onto or not logger:
        return
    INDEX_SEP = '. '
    CUSTOM_ADSORPTION_SITE_DESCRIPTOR = 'custom_'
    HEADER1 = 'Allowed adsorption sites'
    HEADER2 = '-' * len(HEADER1)
    logger.info(HEADER1)
    logger.info(HEADER2)
    logger.info('')
    if Constants.SMARTS_WILDCARD in allow_adsorption_onto:
        logger.info(Constants.ALL_SMARTS.capitalize())
    else:
        allow_adsorption_onto = list(allow_adsorption_onto)
        total_dict = OrderedDict()
        for key, value in Constants.ALLOW_ADSORPTION_ONTO.items():
            if value in allow_adsorption_onto:
                total_dict[key] = value
                allow_adsorption_onto.remove(value)
        for key, value in enumerate(allow_adsorption_onto, 1):
            descriptor = \
                
CUSTOM_ADSORPTION_SITE_DESCRIPTOR + str(key)
            total_dict[descriptor] = value
        max_index = len(str(len(total_dict)) + INDEX_SEP)
        for index, pair in enumerate(list(total_dict.items()), 1):
            key, value = pair
            index_part = str(str(index) + INDEX_SEP).ljust(max_index)
            logger.info(index_part + key + ': ' + value)
    logger.info('') 
[docs]def get_structure_with_contiguous_molecules(molecules):
    """
    Return a single structure object containing the given molecules but
    such that the atom ordering per molecule is contiguous.
    :type molecules: list of schrodinger.structure._Molecule
    :param molecules: molecules that need to be contiguously assembled
        into a single structure
    :rtype: schrodinger.structure.Structure and two dicts
    :return: mols_st, single structure containing all molecules in contiguous
        form, and mols_new_to_old and mols_old_to_new, two reordering dictionaries
    """
    def extract_molecule(molecule, start=1):
        old_order = list(molecule.getAtomIndices())
        new_order = list(range(start, len(old_order) + start))
        new_to_old = dict(list(zip(new_order, old_order)))
        old_to_new = dict(list(zip(old_order, new_order)))
        molecule_st = molecule.extractStructure(copy_props=True)
        return molecule_st, new_to_old, old_to_new
    mols_st, mols_new_to_old, mols_old_to_new = extract_molecule(molecules[0])
    for molecule in molecules[1:]:
        mol_st, mol_new_to_old, mol_old_to_new = \
            
extract_molecule(molecule, mols_st.atom_total + 1)
        mols_st.extend(mol_st)
        mols_new_to_old.update(mol_new_to_old)
        mols_old_to_new.update(mol_old_to_new)
    return mols_st, mols_new_to_old, mols_old_to_new 
[docs]class ReactantMolecule(object):
    """
    Manage a reactant molecule in the reactants structure
    object.
    """
    HYDROGEN_SYM = 'H'
    HYDROGEN_BOND_ORDER = 1
[docs]    def __init__(self, molecule):
        """
        Create an instance.
        :type molecule: schrodinger.structure._Molecule
        :param molecule: reactant molecule to be extracted from
            the reactants structure
        """
        self.mol_obj = molecule
        self.new_to_orig_inds = {}
        self.orig_to_new_inds = {}
        for new, orig in enumerate(molecule.getAtomIndices(), 1):
            self.new_to_orig_inds[new] = orig
            self.orig_to_new_inds[orig] = new
        self.mol_struct = molecule.extractStructure(copy_props=True)
        self.reactive_inds_orig = []
        self.reactive_inds_new = []
        self.tmp_hydrogen_index = None 
[docs]    def setReactiveInds(self, reactive_inds_orig):
        """
        Set the two lists of reactive indicies attributes using
        the provided list of original reactive indicies
        :type reactive_inds_orig: list
        :param reactive_inds_orig: original reactive indicies for
            this reactant molecule
        """
        self.reactive_inds_orig = list(reactive_inds_orig)
        self.reactive_inds_new = [
            self.orig_to_new_inds[orig] for orig in reactive_inds_orig
        ] 
[docs]    def combineMapWith(self, other_molecule):
        """
        Return the new to orig atom reordering map for this molecule
        extended by that of the other molecule by offsetting the
        keys of the map of the other molecule.
        :type other_molecule: schrodinger.structure._Molecule
        :param other_molecule: molecule whose new to orig atom
            reordering map will be combined with that of the current
            molecule
        :rtype: dict
        :return: final_map, the combined new to original atom
            reordering maps of this molecule and the other molecule
        """
        current_num_atoms = self.mol_struct.atom_total
        final_map = self.new_to_orig_inds.copy()
        for key, value in other_molecule.new_to_orig_inds.items():
            final_map[key + current_num_atoms] = value
        return final_map 
[docs]    def addTempHydrogens(self):
        """
        Add a temporary hydrogen to the reactive atom of this molecule
        to act as a bond handle for performing the reaction.  All traces
        of this temporary hydrogen will be removed from this instance
        once the reactions are complete.  Note that the family of
        functions that handles the temporary hydrogens are put in
        place so that all reaction channels can be handled just
        like the bond-bond double displacement type, i.e. the
        association and single displacement types then become
        nothing more than double displacement with a fake hydrogen.
        """
        atom_to_hydrogenate = self.mol_struct.atom[self.reactive_inds_new[0]]
        atom_to_hydrogenate_vec = numpy.array(atom_to_hydrogenate.xyz)
        vec_to_hydrogen = find_good_bond_vector(atom_to_hydrogenate)
        # set the absolute position and bonding of the new temporary
        # hydrogen handle, store the new atom index, and append it to
        # the list of reactive atoms
        hydrogen_xyz = atom_to_hydrogenate_vec + vec_to_hydrogen
        hydrogen = self.mol_struct.addAtom(self.HYDROGEN_SYM, hydrogen_xyz[0],
                                           hydrogen_xyz[1], hydrogen_xyz[2])
        self.mol_struct.addBond(atom_to_hydrogenate, hydrogen,
                                self.HYDROGEN_BOND_ORDER)
        self.tmp_hydrogen_index = hydrogen.index
        self.reactive_inds_new.append(hydrogen.index) 
[docs]    def delTempHydrogens(self):
        """
        Delete all traces of the previously added temporary
        hydrogen.
        """
        self.mol_struct.deleteAtoms([self.tmp_hydrogen_index])
        self.reactive_inds_new.remove(self.tmp_hydrogen_index)
        self.tmp_hydrogen_index = None 
[docs]    def reactWith(self, other_molecule, reverse=False):
        """
        React this molecule with the provided other molecule
        according to their reactive indicies and return the product
        object.
        :type other_molecule: ReactantMolecule
        :param other_molecule: the input molecule that this molecule
            will react with
        :type reverse: bool
        :param reverse: specifies to reverse the order of reactive
            indicies for the other molecule
        :rtype: Products
        :return products: the products object containing the
            components that result from reacting this molecule with
            the input molecule
        """
        # reverse handled whether A in the following A-B + C-D
        # reaction is bound to C or D in the products
        mol_one_from, mol_one_to = self.reactive_inds_new
        if reverse:
            mol_two_to, mol_two_from = other_molecule.reactive_inds_new
        else:
            mol_two_from, mol_two_to = other_molecule.reactive_inds_new
        # call our wrapped verion of build.attach_structure, this
        # is the first product (A-C or A-D), for which the A part
        # will be fixed in space
        product_st = self.mol_struct.copy()
        mol_two_st = other_molecule.mol_struct.copy()
        orig_map, product_map, rev_product_map = \
            
wrapper_build_attach_structure(product_st,
            mol_one_from, mol_one_to, mol_two_st, mol_two_from,
            mol_two_to)
        # call our wrapped verion of build.attach_structure, this
        # is the second product (D-B or C-B), for which the D or C
        # parts will be fixed in space
        mol_one_st = self.mol_struct.copy()
        other_product_st = other_molecule.mol_struct.copy()
        orig_map, other_product_map, rev_other_product_map = \
            
wrapper_build_attach_structure(other_product_st,
            mol_two_to, mol_two_from, mol_one_st, mol_one_to,
            mol_one_from)
        # get Products object to finish this off
        products = Products(product_st, product_map, rev_product_map,
                            other_product_st, other_product_map,
                            rev_other_product_map)
        return products 
[docs]    def dissociate(self, dissociation_bond_length):
        """
        Dissociate this molecule according to its reactive
        indicies.
        :type dissociation_bond_length: float
        :param dissociation_bond_length: final bond length to use
            for dissociative reaction channels
        :rtype: Products
        :return products: the products object containing the
            components that result from reacting this molecule with
            the input molecule
        """
        product_st = self.mol_struct.copy()
        # adjust the bond length
        atom_one_ind, atom_two_ind = self.reactive_inds_orig
        product_st.adjust(dissociation_bond_length, atom_one_ind, atom_two_ind)
        # delete the bond
        product_st.deleteBond(atom_one_ind, atom_two_ind)
        # even though the Products class isn't perfect here we can
        # easily still use it by passing the map twice, the map is
        # lame because no real structure manipulations have been
        # performed and so indexing hasn't changed
        product_map = self.new_to_orig_inds.copy()
        products = Products(product_st, product_map, product_map)
        return products  
[docs]class Products(object):
    """
    Manage the properties of the products that result from
    applying a reaction channel definition to the reactants.
    """
    RXN_REPRESENTATION_KEY = 's_matsci_RXN_representation'
    PRODUCT_NAME_BASE = 'product'
[docs]    def __init__(self,
                 prod_one_st,
                 prod_one_map,
                 prod_one_rev_map,
                 prod_two_st=None,
                 prod_two_map=None,
                 prod_two_rev_map=None):
        """
        Create an instance.
        """
        self.prod_one_st = prod_one_st.copy()
        self.prod_one_map = prod_one_map.copy()
        self.prod_one_rev_map = prod_one_rev_map.copy()
        if prod_two_st:
            self.prod_two_st = prod_two_st.copy()
        else:
            self.prod_two_st = None
        if prod_two_map:
            self.prod_two_map = prod_two_map.copy()
        if prod_two_rev_map:
            self.prod_two_rev_map = prod_two_rev_map.copy()
        self.new_total_reorder = []
        self.product_st = self.prod_one_st.copy()
        self.rxn_representation = None
        self.index = None
        self.product_sts = [] 
[docs]    def deleteTmpHydrogens(self, mol_one, mol_two):
        """
        Remove any of the previously added temporary hydrogens from
        the product structures.
        :type mol_one: schrodinger.structure._Molecule
        :param mol_one: the first reactant molecule from which these
            products came
        :type mol_two: schrodinger.structure._Molecule
        :param mol_two: the second reactant molecule from which these
            products came
        """
        def delete_tmp_hydrogens(amap, hydrogen, arevmap, struct):
            hydrogen_new = amap.get(hydrogen)
            if hydrogen_new:
                arevmap = dict_delete_and_decrement_keys(arevmap, hydrogen_new)
                struct.deleteAtoms([hydrogen_new])
            return arevmap
        # if the reactant molecule one had a temporary hydrogen
        # then that hydrogen is now on product two from which
        # it must be removed, for product two it is the second
        # reactant molecules indexing which comes first so use
        # that to offset the hydrogen index from reactant molecule
        # one to get the final original hydrogen index to remove
        # from product two, then find the new verion of this index
        # and delete the atom and update the reverse atom map.  Note
        # that the reactant molecule one will never be left with
        # its own temporary hydrogen in a product because of the
        # way that the reverse option is applied only to reactant
        # molecule two (that is why in the code block following
        # the code block directly below you will see two called
        # to delete_tmp_hydrogens, one for product one and one
        # for product two, while in the block directly below you
        # will only see one call).
        if mol_one.tmp_hydrogen_index:
            final_hydrogen_index = mol_one.tmp_hydrogen_index + \
                
mol_two.mol_struct.atom_total
            self.prod_two_rev_map = \
                
delete_tmp_hydrogens(self.prod_two_map,
                final_hydrogen_index, self.prod_two_rev_map,
                self.prod_two_st)
        # if the reactant molecule two had a temporary hydrogen
        # then that hydrogen can now be on either product one or
        # two from which it must be removed, for the product one
        # it is the first reactant molecules indexing which comes
        # first so use that to offset the hydrogen index from
        # reactant molecule two to get the final original hydrogen
        # index to remove from product # one, for the product two
        # no offsetting will be needed because the temporary
        # hydrogen belonged to the reactant from which the product
        # came, then find the new verions of these indicies and
        # delete the atoms and update the reverse atom maps
        if mol_two.tmp_hydrogen_index:
            final_hydrogen_index = mol_two.tmp_hydrogen_index + \
                
mol_one.mol_struct.atom_total
            self.prod_one_rev_map = \
                
delete_tmp_hydrogens(self.prod_one_map,
                final_hydrogen_index, self.prod_one_rev_map,
                self.prod_one_st)
            final_hydrogen_index = mol_two.tmp_hydrogen_index
            self.prod_two_rev_map = \
                
delete_tmp_hydrogens(self.prod_two_map,
                final_hydrogen_index, self.prod_two_rev_map,
                self.prod_two_st) 
[docs]    def buildReorderList(self, mol_one, mol_two):
        """
        Build the atom reorder list needed to make the product
        structure ordering consistent with that of the original
        reactants.
        :type mol_one: schrodinger.structure._Molecule
        :param mol_one: the first reactant molecule from which these
            products came
        :type mol_two: schrodinger.structure._Molecule
        :param mol_two: the second reactant molecule from which these
            products came
        """
        # get the reorder map for product one using two maps,
        # one from product to reactant molecules and another from
        # reactant molecules to reactants
        prod_one_map_to_orig = mol_one.combineMapWith(mol_two)
        prod_one_reorder = []
        for atom in self.prod_one_st.atom:
            orig_ind = \
                
prod_one_map_to_orig[self.prod_one_rev_map[atom.index]]
            prod_one_reorder.append(orig_ind)
        # get the reorder map for product two using two maps,
        # one from product to reactant molecules and another from
        # reactant molecules to reactants
        prod_two_map_to_orig = mol_two.combineMapWith(mol_one)
        prod_two_reorder = []
        for atom in self.prod_two_st.atom:
            orig_ind = \
                
prod_two_map_to_orig[self.prod_two_rev_map[atom.index]]
            prod_two_reorder.append(orig_ind)
        # sum and invert the two product maps to get something that
        # build.reorder_atoms can process
        total_reorder = prod_one_reorder + prod_two_reorder
        for element in range(1, len(total_reorder) + 1):
            index = total_reorder.index(element)
            self.new_total_reorder.append(index + 1) 
[docs]    def combineAndReorder(self):
        """
        Assemble the final product structure for this reaction
        channel by combining the structure objects for each
        product molecule followed by reordering the product
        atoms to be consistent with the reactants.
        """
        # merge the product structures and apply the reorder map
        self.product_st = self.prod_one_st.copy()
        self.product_st.extend(self.prod_two_st)
        self.product_st = build.reorder_atoms(self.product_st,
                                              self.new_total_reorder) 
[docs]    def extendWithSpectator(self, spectator_st, map_to_orig):
        """
        Extend the product structure object with the spectator
        structure object and reorder the atoms to be consistent
        with that of the reactants from which they came.
        :type spectator_st: schrodinger.structure.Structure
        :param spectator_st: the structure object containing
            all spectator molecules
        :type map_to_orig: dict
        :param map_to_orig: an atom reordering dictionary where
            the keys are the reactant atom indicies followed by the
            spectator atom indicies and the values are their values
        """
        # if there were non-reactive spectator reactant structures
        # then append them to the product structure
        if spectator_st:
            self.product_st.extend(spectator_st)
        # build the reorder map from the map back to original atom
        # indicies, i.e. prior to splitting off the spectators and
        # prior to reaction
        reorder_map = []
        for atom in self.product_st.atom:
            orig_ind = map_to_orig[atom.index]
            reorder_map.append(orig_ind)
        # invert the reorder map
        new_reorder_map = []
        for index in range(1, len(reorder_map) + 1):
            new_index = reorder_map.index(index)
            new_reorder_map.append(new_index + 1)
        # apply reorder map
        self.product_st = build.reorder_atoms(self.product_st, new_reorder_map) 
[docs]    def updateProperties(self, index, channeldef, reverse=False):
        """
        Set some properties on this product structure.
        :type index: int
        :param index: index of this set of products
        :type channeldef: ChannelDefinition
        :param channeldef: the channel definition from which
            this product came
        :type reverse: bool
        :param reverse: specifies that the order of the reactive
            indicies for the second molecule were reversed
        """
        def separate_mol_part(mol):
            mol = mol.strip().split(SECOND_SEP)
            if len(mol) == 2:
                atom_a, atom_b = mol
            else:
                atom_a, atom_b = mol[0], None
            return atom_a, atom_b
        def get_bond(atom_a, atom_b):
            if atom_b:
                bond_ab = atom_a + BOND_SEP + atom_b
            else:
                bond_ab = atom_a
            return bond_ab
        NAME_SEP = '.'
        self.index = index
        current_name = self.product_st.property[CheckInput.TITLE_KEY]
        name_tag = current_name + NAME_SEP + self.PRODUCT_NAME_BASE + \
            
NAME_SEP + str(self.index)
        self.product_st.property[CheckInput.TITLE_KEY] = name_tag
        self.product_st.property[CheckInput.ENTRY_NAME_KEY] = name_tag
        FIRST_SEP = ChannelDefinition.FIRST_SEP
        SECOND_SEP = ChannelDefinition.SECOND_SEP
        BOND_SEP = '-'
        REACT_SEP = ' + '
        RXN_SEP = ' --> '
        mol_one_def, mol_two_def = channeldef.__repr__().split(FIRST_SEP)
        atom_a, atom_b = separate_mol_part(mol_one_def)
        atom_c, atom_d = separate_mol_part(mol_two_def)
        left_side = get_bond(atom_a, atom_b) + REACT_SEP + \
            
get_bond(atom_c, atom_d)
        if atom_b and atom_d:
            if reverse:
                right_side = get_bond(atom_a, atom_d) + REACT_SEP + \
                    
get_bond(atom_b, atom_c)
            else:
                right_side = get_bond(atom_a, atom_c) + REACT_SEP + \
                    
get_bond(atom_b, atom_d)
        elif atom_b:
            if reverse:
                right_side = get_bond(atom_a, atom_d) + REACT_SEP + \
                    
get_bond(atom_b, atom_c)
            else:
                right_side = get_bond(atom_a, atom_c) + REACT_SEP + \
                    
get_bond(atom_b, atom_d)
        elif atom_d:
            if reverse:
                right_side = get_bond(atom_c, atom_b) + REACT_SEP + \
                    
get_bond(atom_a, atom_d)
            else:
                right_side = get_bond(atom_a, atom_c) + REACT_SEP + \
                    
get_bond(atom_d, atom_b)
        else:
            right_side = get_bond(atom_a, atom_c)
        if self.prod_two_st:
            self.rxn_representation = left_side + RXN_SEP + right_side
        else:
            self.rxn_representation = right_side + RXN_SEP + left_side
        self.product_st.property[self.RXN_REPRESENTATION_KEY] = \
            
self.rxn_representation 
[docs]    def isolateProducts(self, isolate_products, flatdef):
        """
        If requested isolate the individual products to their
        own structure objects.
        :type isolate_products: bool
        :param isolate_products: specifies that products should be
            returned individually rather than as a whole
        :type flatdef: list
        :param flatdef: list containing the original reactive atom
            indicies of the input structure
        """
        if not isolate_products:
            self.product_sts = [self.product_st]
            return
        NAME_SEP = '.'
        title_tag = self.product_st.property[CheckInput.TITLE_KEY]
        title_tag += NAME_SEP
        entry_tag = self.product_st.property[CheckInput.ENTRY_NAME_KEY]
        entry_tag += NAME_SEP
        # from all molecules find the product molecules that
        # resulted from a reaction and extract them, update
        # properties, and append to list of product structures,
        # skip over products that are atoms
        flatdef_set = set(flatdef)
        index = 1
        for molecule in self.product_st.molecule:
            if set(molecule.getAtomIndices()).intersection(flatdef_set) \
                
and len(molecule.atom) > 1:
                product_st = molecule.extractStructure(True)
                product_st.property[CheckInput.TITLE_KEY] = \
                    
title_tag + str(index)
                product_st.property[CheckInput.ENTRY_NAME_KEY] = \
                    
entry_tag + str(index)
                self.product_sts.append(product_st)
                index += 1  
[docs]class Reactants(object):
    """
    Manage the properties of the reactants.
    """
[docs]    def __init__(self, allreactants, listdef):
        """
        Create an instance.
        :type allreactants: schrodinger.structure.Structure
        :param allreactants: the structure object of the input
            reactants
        :type listdef: list
        :param listdef: a list of two lists containing the reaction
            channel definitions
        """
        self.allreactants = allreactants
        self.def_part_one = list(listdef[0])
        self.def_part_two = list(listdef[1])
        self.reactant_st = None
        self.reactant_new_to_orig = {}
        self.reactant_orig_to_new = {}
        self.spectator_st = None
        self.spectator_new_to_orig = {}
        self.spectator_orig_to_new = {}
        self.map_to_orig = {}
        self.mol_one = None
        self.mol_two = None
        self.splitIntoComponents()
        self.updateDefParts()
        self.setReactiveMolecules() 
[docs]    def splitIntoComponents(self):
        """
        Split the input set of all reactants into reactive
        molecules and a set of spectator molecules.
        """
        # get reactive molecules
        mol_one = \
            
self.allreactants.atom[self.def_part_one[0]].getMolecule()
        mol_two = \
            
self.allreactants.atom[self.def_part_two[0]].getMolecule()
        mol_reactants = [mol_one]
        if mol_two.number != mol_one.number:
            mol_reactants.append(mol_two)
        self.reactant_st, self.reactant_new_to_orig, self.reactant_orig_to_new = \
            
get_structure_with_contiguous_molecules(mol_reactants)
        # get spectator molecules
        mol_spectators = []
        for molecule in self.allreactants.molecule:
            if molecule.number != mol_one.number and \
                
molecule.number != mol_two.number:
                mol_spectators.append(molecule)
        if mol_spectators:
            self.spectator_st, self.spectator_new_to_orig, self.spectator_orig_to_new = \
                
get_structure_with_contiguous_molecules(mol_spectators)
        # set reorder map used to later reorder products plus
        # spectators back to original ordering
        self.map_to_orig = self.reactant_new_to_orig.copy()
        for key, value in self.spectator_new_to_orig.items():
            self.map_to_orig[key + self.reactant_st.atom_total] = \
                
value 
[docs]    def updateDefParts(self):
        """
        Update the indicies in the reaction channel definition parts
        to be consistent with the new reactive molecule ordering.
        """
        self.def_part_one = [
            self.reactant_orig_to_new[index] for index in self.def_part_one
        ]
        self.def_part_two = [
            self.reactant_orig_to_new[index] for index in self.def_part_two
        ] 
[docs]    def setReactiveMolecules(self):
        """
        Set the reactive molecule attributes.
        """
        self.mol_one = \
            
self.reactant_st.atom[self.def_part_one[0]].getMolecule()
        self.mol_two = \
            
self.reactant_st.atom[self.def_part_two[0]].getMolecule()
        if self.mol_one.number == self.mol_two.number:
            self.mol_two = None  
[docs]class Channels(object):
    """
    Manage the enumeration of reaction channels.
    """
    # use OPLS 2005
    OPLS_VERSION = minimize.OPLS_2005
    OPLS_ENERGY_KEY = mm.OPLS_POTENTIAL_ENERGY
[docs]    def __init__(self,
                 reactants,
                 channeldefs,
                 allow_adsorption_onto=None,
                 dissociation_bond_length=Constants.DISSOCIATION_BOND_LENGTH,
                 no_minimization=Constants.NO_MINIMIZATION,
                 isolate_products=Constants.ISOLATE_PRODUCTS,
                 no_reactive_h=Constants.NO_REACTIVE_H,
                 unique=Constants.UNIQUE,
                 mae_out_file=None,
                 logger=None):
        """
        Create an instance.
        :type reactants: schrodinger.structure.Structure
        :param reactants: the reactants from which channels will be
            enumerated
        :type channeldefs: ChannelDefinitions
        :param channeldefs: list of ChannelDefinitions
        :type allow_adsorption_onto: None or list
        :param allow_adsorption_onto: SMARTS patterns of arbitrary
            adsorption sites to support
        :type dissociation_bond_length: float
        :param dissociation_bond_length: final bond length to use
            for dissociative reaction channels
        :type no_minimization: bool
        :param no_minimization: specifies that product geometries
            should not be minimized
        :type isolate_products: bool
        :param isolate_products: specifies that products should be
            returned individually rather than as a whole
        :type no_reactive_h: bool
        :param no_reactive_h: specify that R-H bonds be considered inert
        :type unique: bool
        :param unique: provide only unique products, firstly do not
            calculate redundant products and secondly filter out
            duplicate final structures using SMILES
        :type mae_out_file: str
        :param mae_out_file: the full path, including filename, that
            will be used to write the output `*mae` file that will contain
            the final set of structures, i.e. reactants plus all sets of
            products
        :type logger: logging.getLogger
        :param logger: output logger
        """
        self.reactants = reactants
        self.channeldefs = channeldefs
        self.allow_adsorption_onto = allow_adsorption_onto
        self.dissociation_bond_length = dissociation_bond_length
        self.no_minimization = no_minimization
        self.isolate_products = isolate_products
        self.no_reactive_h = no_reactive_h
        self.unique = unique
        if not mae_out_file:
            self.mae_out_file = os.path.basename(__file__).rstrip('.py')
        else:
            self.mae_out_file = mae_out_file
        self.logger = logger
        # A set of SMILES for the structures that have been processed
        self.unique_smiles = set()
        # Titles for the structures that have been filtered out due to
        # duplicate SMILES
        self.filtered_titles = []
        self.not_minimized = []
        self.rxn_representations = []
        self.mae_writer = None
        self.fast3d_obj = None
        self.add_hydrogens = False
        self.run_stereo = False 
[docs]    def setUpFast3D(self):
        """
        Set up the fast 3D object.
        """
        optimize = True
        self.fast3d_obj = fast3d.Volumizer('default', optimize) 
[docs]    def updateChannelDefReprs(self):
        """
        Update the representations of the reaction channel
        definitions.
        """
        FIRST_SEP = ChannelDefinition.FIRST_SEP
        SECOND_SEP = ChannelDefinition.SECOND_SEP
        def get_repr_with_names(reactant):
            START = '('
            END = ')'
            repr_with_names = []
            for index in reactant:
                atom = self.reactants.atom[index]
                if atom.name:
                    name = atom.name
                else:
                    name = atom.element
                repr_str = name + START + str(index) + END
                repr_with_names.append(repr_str)
            return repr_with_names
        for channeldef in self.channeldefs.definitions:
            r_one, r_two = channeldef.listdef
            r_one = SECOND_SEP.join(get_repr_with_names(r_one))
            r_two = SECOND_SEP.join(get_repr_with_names(r_two))
            channeldef.representation = \
                
r_one + FIRST_SEP + SECOND_SEP + r_two 
[docs]    def printChannelDefs(self):
        """
        Provide a formatted log of the final reaction channel
        definitions.
        """
        HEADER = 'Reaction channel definitions'
        SEPARATOR = ':'
        defs = self.channeldefs.definitions
        index_max_char = len(str(len(defs)))
        index_width = index_max_char + len(SEPARATOR)
        definition_max_char = [
            len(definition.__repr__()) for definition in defs
        ]
        definition_width = max(definition_max_char) + 1
        if self.logger:
            self.logger.info(HEADER)
            self.logger.info('-' * len(HEADER))
            self.logger.info('')
            for index, definition in enumerate(defs, 1):
                index_str = \
                    
str(str(index) + SEPARATOR).ljust(index_width)
                definition_str = \
                    
definition.__repr__().rjust(definition_width)
                self.logger.info(index_str + definition_str)
            self.logger.info('') 
[docs]    def initMaestroWriter(self):
        """
        Initialize the Maestro writer for the output `*mae` file and
        put the reactants structure in there.
        """
        self.mae_writer = structure.MaestroWriter(self.mae_out_file) 
[docs]    def termMaestroWriter(self):
        """
        Terminate the Maestro writer for the output `*mae` file.
        """
        self.mae_writer.close() 
[docs]    def processProducts(self, product_sts):
        """
        Process the product structures.
        :type product_sts: list of schrodinger.structure.Structure
        :param product_sts: contains all product structures for a
            given reaction channel
        """
        if self.unique:
            f_product_sts, titles = get_deduped_monomers(product_sts)
            self.filtered_titles.extend(titles)
        else:
            f_product_sts = list(product_sts)
        for product_st in f_product_sts:
            title = product_st.property[CheckInput.TITLE_KEY]
            # if the final structures are to be uniquified then do the
            # filtering here, since per reaction channel deduping is done
            # above this controls the deduping of products from different
            # reaction channels, only write the unique structures and collect
            # redundancies to be logged later.
            if self.unique:
                smiles = analyze.generate_smiles(product_st)
                if smiles not in self.unique_smiles:
                    self.unique_smiles.add(smiles)
                else:
                    self.filtered_titles.append(title)
                    continue
            # by default before any minimization eliminate any intra- or
            # inter-molecular ring spears or concatenations, if we know
            # that this elimination protocol has failed then just proceed
            # with the ring speared structure but mark it with a structure
            # property, for cases see MATSCI-1758
            if ringspear.check_for_spears(product_st):
                tmp_st = product_st.copy()
                try:
                    self.fast3d_obj.run(tmp_st, self.add_hydrogens,
                                        self.run_stereo)
                    product_st = tmp_st.copy()
                except Exception:
                    product_st.property[FAILED_SPEAR_ELIM_KEY] = True
            # all product structure objects are copied from the reactant
            # structure object so if they can't be optimized then delete
            # some OPLS properties in case they exist
            if not self.no_minimization:
                if minimize_geometry(product_st):
                    self.not_minimized.append(title)
                    product_st.property.pop(self.OPLS_ENERGY_KEY, None)
            self.mae_writer.append(product_st) 
[docs]    def enumerateChannels(self):
        """
        Main function to enumerate reaction channels.
        """
        # loop over channel definitions and create the product
        # structures
        index = 1
        for channeldef in self.channeldefs.definitions:
            # split the reactive system into one or two reactive
            # molecules and spectators
            components = Reactants(self.reactants, channeldef.listdef)
            # handle dissociations, i.e. only one reactive
            # molecule, or associations, single displacements, and
            # double displacements all of which involve two reactive
            # molecules
            if not components.mol_two:
                mol_one = ReactantMolecule(components.mol_one)
                mol_one.setReactiveInds(components.def_part_one + \
                    
components.def_part_two)
                products = mol_one.dissociate(self.dissociation_bond_length)
                products.extendWithSpectator(components.spectator_st,
                                             components.map_to_orig)
                products.updateProperties(index, channeldef)
                self.rxn_representations.append(products.rxn_representation)
                products.isolateProducts(self.isolate_products,
                                         channeldef.flatdef)
                # loop over products, minimize and append to list
                self.processProducts(products.product_sts)
                index += 1
            else:
                mol_one = ReactantMolecule(components.mol_one)
                mol_two = ReactantMolecule(components.mol_two)
                mol_one.setReactiveInds(components.def_part_one)
                mol_two.setReactiveInds(components.def_part_two)
                # if association or single displacement then add a
                # temporary hydrogen handle so that they can be handled
                # using the double displacment protocol
                if len(mol_one.reactive_inds_new) == 1:
                    mol_one.addTempHydrogens()
                if len(mol_two.reactive_inds_new) == 1:
                    mol_two.addTempHydrogens()
                # for each single or double displacement channel, i.e.
                # A-B + C-D, there are two possible products, i.e.
                # A-C + B-D or A-D + B-C, this is handled by do_which_types
                # which is affected by unique, if unique has not been specified
                # then we want both channels regardless and so we set both
                # to True
                if self.unique:
                    do_which_types = channeldef.do_which_types
                else:
                    do_which_types = [True, True]
                # do_which_types is only ever set for single and double
                # displacement channels, for association we want to skip
                # the swapping of fake hydrogens and so we pass True and False
                if mol_one.tmp_hydrogen_index and mol_two.tmp_hydrogen_index:
                    do_which_types = [True, False]
                # reverse will indicate whether the definition should be
                # reversed (necessary for the second type)
                is_reverse = [False, True]
                # loop over sub-channel cases
                for atype, reverse in zip(do_which_types, is_reverse):
                    if atype:
                        products = mol_one.reactWith(mol_two, reverse)
                        products.deleteTmpHydrogens(mol_one, mol_two)
                        products.buildReorderList(mol_one, mol_two)
                        products.combineAndReorder()
                        products.extendWithSpectator(components.spectator_st,
                                                     components.map_to_orig)
                        products.updateProperties(index, channeldef, reverse)
                        self.rxn_representations.append(
                            products.rxn_representation)
                        products.handleMetalBonding(channeldef.listdef, reverse)
                        products.isolateProducts(self.isolate_products,
                                                 channeldef.flatdef)
                        # loop over products, minimize and append to list
                        self.processProducts(products.product_sts)
                        index += 1
                # post-process those molecules for which temporary
                # hydrogens were added
                if mol_one.tmp_hydrogen_index:
                    mol_one.delTempHydrogens()
                if mol_two.tmp_hydrogen_index:
                    mol_two.delTempHydrogens() 
[docs]    def checkOutputFile(self):
        """
        Check the output file.
        """
        NO_PRODUCTS_MSG = """This calculation has resulted in zero
            relevant product structures.  Please check your input and
            settings.  This script will now exit."""
        if structure.count_structures(self.mae_out_file) == 1:
            if self.logger:
                self.logger.error(NO_PRODUCTS_MSG)
            fileutils.force_remove(self.mae_out_file)
            sys.exit(1) 
[docs]    def printRxnReprs(self):
        """
        Log a formatted print of all reaction channels in their
        reaction representation.
        """
        BUFFER = 1
        SEP = '.'
        # get some max field widths
        max_repr = 0
        for rxn_repr in self.rxn_representations:
            length = len(rxn_repr)
            if length > max_repr:
                max_repr = length
        max_index = len(str(len(self.rxn_representations))) + len(SEP)
        HEADER1 = 'Reactions'
        HEADER2 = '-' * (len(HEADER1))
        self.logger.info(HEADER1)
        self.logger.info(HEADER2)
        self.logger.info('')
        for index, rxn_repr in enumerate(self.rxn_representations, 1):
            index_repr = str(index) + SEP
            rxn_line = index_repr.ljust(max_index) + ' ' * BUFFER + \
                
rxn_repr.rjust(max_repr)
            self.logger.info(rxn_line)
        self.logger.info('') 
[docs]    def printFiltered(self):
        """
        Log the structures that were filtered by SMILES.
        """
        if self.filtered_titles and self.logger:
            FILTERED_MSG = """The following product structures will not
                be returned because they have non-unique SMILES, i.e. they
                are redundant conformers.  If you wish to have these structures
                as well then do not pass the -unique flag.  Here are the
                filtered structures: %s."""
            self.logger.warning(FILTERED_MSG % self.filtered_titles) 
[docs]    def printNotMinimized(self):
        """
        Log the structures that were not minimized.
        """
        num_total_structs = structure.count_structures(self.mae_out_file)
        ALL_STRUCTURES = '[ <all structures> ]'
        if self.not_minimized:
            NOT_MINIMIZED_MSG = """The geometries for the following
                list of structures were not able to be minimized likely
                due to problems with the defined atom types: %s.  Please
                try redefining those atom types, for example if your
                system contains metals then all bonds to those metals
                may need to be of zero-order.  The latter can be done
                with the Edit > Build > Create Zero-Order Bonds to Metals
                option in Maestro or with the bond decrement button.
                Note that reactive bonds can only be single bonds."""
            if self.logger:
                if len(self.not_minimized) == num_total_structs:
                    self.logger.warning(NOT_MINIMIZED_MSG % ALL_STRUCTURES)
                else:
                    self.logger.warning(NOT_MINIMIZED_MSG % self.not_minimized) 
[docs]    def orchestrate(self):
        """
        Orchestrate all of the components of this main class.
        """
        self.setUpFast3D()
        self.checkInputParams()
        self.updateChannelDefReprs()
        self.printChannelDefs()
        # set up writer
        self.initMaestroWriter()
        # minimize reactant structure so that total energies can be
        # compared with that of products, think of reactants as just
        # another set of products for the time being
        self.processProducts([self.reactants])
        # do the enumeration
        self.enumerateChannels()
        # tear down writer
        self.termMaestroWriter()
        if self.logger:
            self.printRxnReprs()
        self.printFiltered()
        self.printNotMinimized()
        # now that the output Maestro file is closed ensure that we have
        # products, this check was implemented when it was found that
        # running a unique dissociation calculation on zero-order bonds
        # produced no products (unless products are isolated), just
        # run this check all of the time
        self.checkOutputFile()