"""
A module to support reaction enumeration.
Examples::
# combinatorial enumeration from a route file
route = route.read_route('route.json')
synthesizer = Synthesizer(route)
# Assuming acids and amines are iterables with the reactants...
for product in synthesizer.synthesizeCombinations([acids, amines]):
print Chem.MolToSmiles(product)
"""
import collections
import copy
import csv
import functools
import itertools
import json
import os
import random
import re
from functools import reduce
from operator import mul
from rdkit import Chem
from rdkit.Chem import AllChem
from rdkit.Chem.Descriptors import MolWt
from schrodinger import structure
from schrodinger.application.pathfinder import molio
from schrodinger.application.pathfinder import reaction as reaction_mod
from schrodinger.application.pathfinder import route as route_mod
from schrodinger.thirdparty import rdkit_adapter
from schrodinger.utils import fileutils
from schrodinger.utils import log
logger = log.get_output_logger("pathfinder")
TITLE_PROP = 's_m_title'
ATOM_LABEL_PROP = 'i_reaction_atom_label'
PRODUCT_PROP = 'b_reaction_product'
H = Chem.MolFromSmarts('[H]')
[docs]class SystematicIterator:
"""
Generate all combinations (or a slice thereof) of products that
can be synthesized by a Synthesizer using the given sets of starting
materials.
"""
[docs] def __init__(self,
synthesizer,
reagent_sources,
start=0,
stop=None,
dedup=True):
"""
:param synthesizer: Synthesizer
:type synthesizer: Synthesizer
:param reagent_sources: List of reagent sources.
:type reagent_sources: iterable of ReagentSource or iterable of
list of Mol.
:param start: Start yielding from this reagent combination index
(counting from zero).
:type start: int
:param stop: Stop yielding when this product reagent combination index
is reached. None means unlimited.
:type stop: int or NoneType
:param dedup: skip duplicate products
:type dedup: bool
"""
def get_iter():
nonlocal reagent_sources
try:
check_frozen = any(r.has_frozen_atoms for r in reagent_sources)
reagent_sources = [r.mols for r in reagent_sources]
except AttributeError:
check_frozen = False
ranges = [range(len(source)) for source in reagent_sources]
cartesian_product = itertools.product(*ranges)
combinations = itertools.islice(cartesian_product, start, stop)
prods = set()
self.ntries = 0
for idxs in combinations:
self.ntries += 1
starting_materials = [
source[i] for source, i in zip(reagent_sources, idxs)
]
if not all(starting_materials):
continue # skip false mols which come from parsing errors
for product in synthesizer.run(starting_materials,
check_frozen):
smiles = Chem.MolToSmiles(product)
if not dedup:
yield product
elif smiles not in prods:
prods.add(smiles)
yield product
self.ntries = 0
self.route = synthesizer.route
self._iter = get_iter()
def __iter__(self):
return self._iter
def __next__(self):
return next(self._iter)
[docs]class RandomSampleIterator:
"""
Generate a random sample of unique products by repeatedly choosing a
random reagent from each reagent source and keeping track of which
products have been seen so far. Therefore, it is most efficient when
only a fraction of the possible space of reagent combinations is
sampled, because otherwise there may be many unproductive duplicates.
Also, since it keeps the set of products seen so far, it uses memory
proportional to 'size', so it is best if that number is not too large
(thousands is fine).
"""
[docs] def __init__(self,
synthesizer,
reagent_sources,
size,
max_tries=None,
dedup=True,
prng=random):
"""
:param synthesizer: Synthesizer
:type synthesizer: Synthesizer
:param reagent_sources: List of reagent sources
:type reagent_sources: iterable of ReagentSource or iterable of
list of Mol.
:param: size: desired sample size (number of products).
:type size: int
:param max_tries: maximum number of random attempts to make before
giving up, even if not enough products have been synthesized.
The default is a function of 'size' and the number of reactant
combinations.
:type max_tries: int
:param dedup: skip duplicate products
:type dedup: bool
:param prng: pseudo-random number generator
:type prng: random.Random
"""
def get_iter():
if self.ncombs == 0:
return
prods = set()
nprods = 0
while nprods < size and self.ntries < self.max_tries:
starting_materials = self._pickReagents()
if starting_materials is None:
break # Couldn't find any good combination.
self.npass_filters += 1
for product in synthesizer.run(starting_materials,
check_frozen):
smiles = Chem.MolToSmiles(product)
if not dedup:
nprods += 1
yield product
elif smiles not in prods:
nprods += 1
prods.add(smiles)
yield product
self.prng = prng
self.ntries = 0
self.npass_filters = 0
self.tries_per_source = [0] * len(reagent_sources)
self.route = synthesizer.route
try:
self.mol_lists = [r.mols for r in reagent_sources]
self.filter_lists = [r.filters for r in reagent_sources]
check_frozen = any(r.has_frozen_atoms for r in reagent_sources)
except AttributeError as e:
self.mol_lists = reagent_sources
self.filter_lists = [[] for s in reagent_sources]
check_frozen = False
self.ncombs = reduce(mul, (len(r) for r in self.mol_lists), 1)
self.max_tries = max_tries
if max_tries is None:
self.max_tries = min(size * 100, self.ncombs * 2)
self._iter = get_iter()
def __iter__(self):
return self._iter
def __next__(self):
return next(self._iter)
def _pickReagents(self):
"""
Find a random combination of reactants that pass the filters.
If none can be found within the alloted number of attempts,
return None.
:return: reactants
:rtype: list of Mol or None
"""
ret = []
self.ntries += 1
for i, mols, filts in zip(itertools.count(), self.mol_lists,
self.filter_lists):
while self.ntries < self.max_tries:
mol = self._pickMol(mols, filts)
self.tries_per_source[i] += 1
if mol:
ret.append(mol)
break
self.ntries += 1
else:
return None
return ret
def _pickMol(self, mols, filters):
"""
Pick a random molecule from a list and apply the filters.
If any of the filters don't pass, return None.
:param mols: molecules to pick from
:type mols: list of Mol
:param filters: filters to apply. Each is a callable taking an iterable
of Mol and yielding Mol.
:type filters: list of callable
:returns: selected Mol or None
:rtype: Mol or None
"""
mol = self.prng.choice(mols)
if not mol:
return None # false mols which come from parsing errors
passed = [mol]
for filt in filters:
passed = list(filt(passed))
if not passed:
return None
return passed[0]
[docs]class Synthesizer(object):
"""
A Synthesizer is a "machine" that given a RouteNode, knows how to apply all
the reactions in the RouteNode in the right order to a list of starting
materials to give the final products.
"""
[docs] def __init__(self,
route,
id_property=TITLE_PROP,
allow_multiple_products=False,
ncycles=None,
keep_labels=False,
failed_reaction_handle=None):
"""
Create a synthesizer from a route. The value of id_property is
propagated from the starting materials to the product, to make it
possible to keep track of where a product came from.
:param route: synthetic route to apply
:type route: RouteNode
:param id_property: name of the property that gets propagated to the
product for identification purposes
:type id_property: str
:param allow_multiple_products: if True, the synthesis won't fail if a
reaction generates more than one product
:type allow_multiple_products: bool
:param ncycles: how many times to try to apply each reaction; when
multiple substitution or polymerization is desired, use
ncycles > 1. The default is to use the ncycles specified
by each reaction node, or 1 if the node doesn't specify
it either.
:type ncycles: int or None
:param keep_labels: keep the frozen atom labels? If so, they are added
to the i_reaction_atom_label property.
:type keep_labels: bool
"""
self._sm_counter = itertools.count()
self.instructions = []
self._compile(route)
self.id_property = id_property
self.route = route
self.ncycles = ncycles
self.allow_multiple_products = allow_multiple_products
self.keep_labels = keep_labels
self.extra_props = collections.defaultdict(dict)
self.reaction_failure_tracker = None
if failed_reaction_handle is not None:
self.reaction_failure_tracker = ReactionFailureTracker(
failed_reaction_handle)
def _compile(self, route):
"""
Turn the route tree into a list of instructions for the stack-based
machine implemented in the run() method.
"""
if route.isStartingMaterial():
self.instructions.append(next(self._sm_counter))
else:
rxn_inst = route.reaction_instance
for precursor in route.precursors:
self._compile(precursor)
self.instructions.append((rxn_inst.reaction.inverse(),
route.ncycles, route.fallback_reagent))
[docs] def run(self, starting_materials, check_frozen=False):
"""
Return the final product of the RouteNode given a list of starting
materials.
:type starting_materials: iterable of Mol
:param check_frozen: if True, only return products for which all frozen
atoms from the reactants are still present in the product. This also
clears the frozen atom info (currently encoded as isotopes) from the
atoms.
:type check_frozen: bool
:return: list of products (may be empty)
:rtype: list of Mol
"""
reaction_mod.debug_mols('Synthesize from ', starting_materials)
self.has_nulls = any(is_null(mol) for mol in starting_materials)
stack = []
for step in self.instructions:
if isinstance(step, tuple):
reaction, rxn_ncycles, fallback_reagent = step
ncycles = self.ncycles or rxn_ncycles or 1
reactants_lists = self._popReactants(reaction, stack)
products = []
for reactants in itertools.product(*reactants_lists):
products += self._multiApplyReaction(
reaction, reactants, ncycles, fallback_reagent)
reaction_mod.debug_mols("pushing products: ", products)
stack.append(products)
if not products:
break
else:
stack.append([starting_materials[step]])
products = stack.pop()
if check_frozen:
products = self._checkFrozen(products, starting_materials)
products = uniquify_mols(products)
self._setProductProperties(products, starting_materials)
return products
def _popReactants(self, reaction, stack):
n = reaction.rxn.GetNumReactantTemplates()
reactants = stack[-n:]
del stack[-n:]
return reactants
def _multiApplyReaction(self, reaction, reactants, ncycles,
fallback_reagent):
"""
Apply a reaction up to `ncycles` times and return the
products from the last cycle that yielded products. Each cycle takes
each of the products from the previous cycle and tries to reuse it
as each of the reactants.
For example, if the first cycle was
OCC(C)O + CBr -> COCC(C)O + OCC(C)OC
the second cycle would try to do
COCC(C)O + CBr -> COCC(C)OC
OCC(C)OC + CBr -> COCC(C)OC
COCC(C)O + OCC(C)O + -> fail
OCC(C)OC + OCC(C)O + -> fail
(Multiple paths lead to the same polysubstituted product, but they
get deduplicated.)
When the reaction fails to generate any products, if fallback_reagent
is not None, the reactant with that index (1-based) is considered to be
the product. This is used when a route has an optional step (try the
reaction, but don't mind if it fails).
"""
products = self._applyReaction(reaction, reactants)
products = uniquify_mols(products)
for cycle in range(ncycles - 1):
new_products = []
for product in products:
for reactant_idx in range(len(reactants)):
curr_reactants = list(reactants)
curr_reactants[reactant_idx] = product
new_products += self._applyReaction(reaction,
curr_reactants)
if new_products:
# Uniquify here to reduce the combinatorial explosion due to
# many paths leading to the same products.
products = uniquify_mols(new_products)
else:
break
if not products:
# When products were not generated because the reaction didn't
# apply, turn the "fallback reactant", if any, into the product.
# We deliberately don't do this when the reaction is failed due
# to generating multiple products when allow_multiple_products is
# False.
if fallback_reagent:
# Copy because reactant is owned by caller.
product = copy.deepcopy(reactants[fallback_reagent - 1])
products = [product]
logger.debug(" optional %s failed" % reaction)
else:
logger.debug(" %s failed" % reaction)
elif len(products) > 1 and not self.allow_multiple_products:
logger.debug(" %s produces multiple products" % reaction)
reaction_mod.debug_mols(" reactants: ", reactants)
reaction_mod.debug_mols(" products: ", products)
products = [] # synthesis failed due to selectivity issues
if not products and self.reaction_failure_tracker is not None:
self.reaction_failure_tracker.writeFailure(reaction, reactants)
return products
def _applyReaction(self, reaction, reactants):
products_lists = reaction.apply(reactants)
# flatten products_lists because synthetic reactions have a
# a single product on the RHS.
products = [p[0] for p in products_lists]
if not products and self.has_nulls:
products = self._applyNull(reactants)
for product in products:
product.SetBoolProp(PRODUCT_PROP, True)
return products
def _applyNull(self, reactants):
"""
Apply a "reaction" using special rules for null reagents. The reaction
is considered successful in any of these cases:
- all the reactants but one are null;
- exactly one reactant is a product from a previous step (i.e., not a
starting material); or
- all the reactants are null.
The qualifying reactant becomes the product of the reaction. (In the
last case, we just pick the first null, under the assumption that all
nulls are the same.)
"""
nulls = []
products = []
starting_materials = []
for mol in reactants:
if is_null(mol):
nulls.append(mol)
elif mol.HasProp(PRODUCT_PROP) and mol.GetProp(PRODUCT_PROP):
products.append(mol)
else:
starting_materials.append(mol)
non_nulls = starting_materials + products
if nulls and len(non_nulls) == 1:
ret = non_nulls
elif len(products) == 1:
ret = products
elif len(reactants) == len(nulls):
ret = nulls[:1]
else:
ret = []
# Copy because starting materials are owned by the caller and we
# may modify them.
return copy.deepcopy(ret)
[docs] def synthesizeCombinations(self, reagent_sources, **kwargs):
"""
Create a SystematicIterator using the Synthesizer and the other
arguments. See SystematicIterator for details. (This method is redundant
but was kept for backward compatibility.)
"""
return SystematicIterator(self, reagent_sources, **kwargs)
[docs] def synthesizeRandomSample(self, reagent_sources, size, **kwargs):
"""
Crate a RandomSampleIterator using the Synthesizer and the other
arguments. See RandomSampleIterator for details. (This method is
redundant but was kept for backward compatibility.)
"""
return RandomSampleIterator(self, reagent_sources, size, **kwargs)
def _appendExtraProperties(self, product):
product_key = self._MolToKey(product)
if product_key in self.extra_props:
for prop_name, prop_value in self.extra_props[product_key].items():
product.SetProp(prop_name, prop_value)
def _setExtraProductProperties(self, product, prop_name, prop_value):
product_key = self._MolToKey(product)
self.extra_props[product_key][prop_name] = prop_value
def _MolToKey(self, product):
return id(product)
def _setProductProperties(self, products, starting_materials):
for product_idx, product in enumerate(products, 1):
self._clearProductProperties(product)
reagent_ids = [self._getReagentId(sm) for sm in starting_materials]
for i, reagent_id in enumerate(reagent_ids, 1):
prop = 's_reaction_reagent_%d' % i
product.SetProp(prop, reagent_id)
title = ' + '.join(reagent_ids)
title += f' [{product_idx}/{len(products)}]'
product.SetProp(TITLE_PROP, title)
product.SetProp('_Name', title) # used by RDKit file writers
product.SetIntProp('i_reaction_num_reagents',
len(starting_materials))
product.SetIntProp('i_reaction_num_products', len(products))
self._addReagentProperties(product, starting_materials)
# add extra product properties
self._appendExtraProperties(product)
def _addReagentProperties(self, product, starting_materials):
for sm_idx, sm in enumerate(starting_materials, 1):
try:
# Reactants labelled by ReagentNode.getLabeledMols() have a
# CXSMILES property based on the original reactant before
# applying the labels.
cxsmiles = sm.GetProp(route_mod.CXSMILES_PROP)
except KeyError:
cxsmiles = Chem.MolToCXSmiles(sm)
product.SetProp(f's_reaction_reagent_{sm_idx}_cxsmiles', cxsmiles)
props = rdkit_adapter.translate_rdkit_props_dict(
sm.GetPropsAsDict())
for name, value in props.items():
typechar = name[0]
new_name = '%s_reaction_reagent_%d_%s' % (typechar, sm_idx,
name)
set_prop(product, new_name, value)
def _checkFrozen(self, products, starting_materials):
"""
Check that:
1) All frozen atoms present in the starting materials are present in the
product.
2) All bonds between frozen atoms in the starting materials are present
in the product.
3) No new bonds invovling innerfrozen atoms are present in the product.
:param products: products to check
:type products: iterable of Mol
:param starting_materials: mols used to synthesize the products
:type products: iterable of Mol
:return: products that respect the frozen atom rules above
:rtype: generator of Mol
"""
sm_bonds = set()
sm_atoms = set()
for mol in starting_materials:
for atom in mol.GetAtoms():
iso = atom.GetIsotope()
if iso:
sm_atoms.add(iso)
for iso1, iso2 in get_bond_labels(mol):
if iso1 and iso2:
sm_bonds.add(tuple(sorted([iso1, iso2])))
for i, product in enumerate(products, 1):
product_bonds = set()
product_atoms = set()
forbidden_bonds = set()
frozen_info = {}
for iso1, iso2 in get_bond_labels(product):
innerfrozen1 = iso1 and iso1 > route_mod.INNERFROZEN_OFFSET
innerfrozen2 = iso2 and iso2 > route_mod.INNERFROZEN_OFFSET
if iso1 and iso2:
product_bonds.add(tuple(sorted([iso1, iso2])))
elif innerfrozen1 or innerfrozen2:
forbidden_bonds.add(iso1 or iso2)
for atom in product.GetAtoms():
iso = atom.GetIsotope()
if iso:
product_atoms.add(iso)
atom.SetIsotope(0)
if self.keep_labels:
atom.SetIntProp(ATOM_LABEL_PROP, iso)
if iso > route_mod.FROZEN_FACTOR:
frozen_attr = route_mod._decode_frozen_props(iso)
source_index = frozen_attr['source_index']
reagent_index = frozen_attr['reagent_index']
alignment_key = str((reagent_index, source_index))
# If the route has different reagent indices, map the
# (reagent_index, source_index) combination to the target
# index.
if self.route.target_atom_index_map:
alignment_key = self.route.target_atom_index_map[
alignment_key]
atom_prop = {'alignment_key': alignment_key}
frozen_info[atom.GetIdx() + 1] = atom_prop
missing_bonds = sm_bonds - product_bonds
missing_atoms = sm_atoms - product_atoms
if missing_bonds:
logger.debug(
"Skipping product %d due to missing frozen bonds: %s", i,
missing_bonds)
if missing_atoms:
logger.debug(
"Skipping product %d due to missing frozen atoms: %s", i,
missing_atoms)
if forbidden_bonds:
logger.debug(
"Skipping product %d due to new bonds to innerfrozen "
"atoms: %s", i, forbidden_bonds)
if not (missing_atoms or missing_bonds or forbidden_bonds):
if frozen_info:
self._setExtraProductProperties(product,
's_reaction_frozen_atoms',
json.dumps(frozen_info))
yield product
def _clearProductProperties(self, mol):
for prop in mol.GetPropNames():
if re.match(r'^._reaction_', prop):
mol.ClearProp(prop)
def _getReagentId(self, mol):
return reaction_mod.get_title(mol, self.id_property) or '<none>'
[docs]def get_bond_labels(mol):
"""
Yield the "labels" (isotopes) from the two atoms in each bond
"""
for bond in mol.GetBonds():
a1 = bond.GetBeginAtom()
a2 = bond.GetEndAtom()
yield a1.GetIsotope(), a2.GetIsotope()
[docs]class ReagentSource:
"""
A ReagentSource is just a struct-like object with the following fields:
- mols: a generator or list of Mol objects;
- size: (int) the number of molecules in mols;
- filename: (str) file where the mols came from (or as a special case,
"SMILES" if they came from the SMILES list hard-coded into the route);
- reagent_class: (str) name of the reagent class.
- index: the 1-based index that identifies the starting material in a route.
- filters: a list of filters to apply to this reagent source. Each filter is
a callable that takes an iterable of Mol and generating Mol.
- has_frozen_atoms: if true, the molecules have frozen atoms
"""
[docs] def __init__(self,
mols,
size,
filename,
reagent_class,
index,
has_frozen_atoms=False):
self.mols = mols
self.size = size
self.filename = filename
self.reagent_class = str(reagent_class)
self.index = index
self.filters = []
self.has_frozen_atoms = has_frozen_atoms
[docs]def get_reagent_sources(route, r_dict=None, libpath=None):
"""
Return a list of reagent sources given a route and, optionally, a dictionary
mapping reagent index to filename/SMILES (e.g,, {1: 'foo.smi', 2: 'CCO'}).
The special filename '' (empty string) means use SMILES associated with the
node. Values in r_dict take precedence, followed by filename associated with
the node, reagent class associated with the node, and SMILES associated with
the node.
:type r_dict: dict {int: str}
:param libpath: list of directories to prepend to the standard reagent
library search path
:type libpath: list of str
:return: reagent sources
:rtype: list of ReagentSource
:raises: ValueError when no file is found in the search path for a given
reagent class or there is neither reagent class nor SMILES.
"""
sources = []
r_dict = r_dict or {}
for index, sm in enumerate(route.getStartingNodes(), 1):
source = sm.getReagentSource(r_dict.get(index), libpath)
if source != route_mod.SMILES:
mols = molio.get_mol_reader(source, skip_bad=False)
try:
size = len(mols)
except TypeError:
size = structure.count_structures(source)
else:
route.updateTargetIndexMap(sm.getTargetIndexMap(index))
mols = sm.getLabeledMols(index)
size = len(mols)
sources.append(
ReagentSource(mols,
size,
source,
sm.reagent_class,
index,
has_frozen_atoms=bool(sm.frozen_atoms)))
return sources
[docs]def get_one_step_route(rxn):
"""
Create a route for a one-step synthesis given a Reaction.
:param rxn: reaction object
:type rxn: Reaction
:return: new route object
:rtype: RouteNode
"""
route = route_mod.RouteNode()
precursors = []
for cls in rxn.rhs_classes:
prec = route_mod.ReagentNode(reagent_class=cls)
precursors.append(prec)
route.setReactionInstance(rxn, precursors)
return route
[docs]def uniquify_mols(mols):
"""
Return a list of unique molecules from 'mols', using the canonical SMILES as
the comparison key.
:param mols: molecules to uniquify
:type mols: iterable of Mol
:return: unique molecules
:rtype: list of Mol
"""
unique_mols = []
seen = set()
for mol in mols:
smiles = Chem.MolToSmiles(mol)
if smiles in seen:
continue
seen.add(smiles)
unique_mols.append(mol)
return unique_mols
[docs]def desalt_mol(mol):
"""
Desalt molecule (keep first largest fragment).
:param mol: molecule to desalt
:type mol: rdkit.Chem.Mol
:return: desalted molecule (original is not modified)
:rtype: rdkit.Chem.Mol
"""
frags = Chem.GetMolFrags(mol, asMols=True, sanitizeFrags=True)
if len(frags) <= 1:
return mol
else:
desalted = max(frags, key=lambda f: (f.GetNumHeavyAtoms(), MolWt(f)))
for name in mol.GetPropNames(includePrivate=True):
desalted.SetProp(name, mol.GetProp(name))
return desalted
[docs]@functools.lru_cache()
def get_neutralization_patterns():
"""
Return the default neturalization patterns.
:return: list of (query, replacement)
:rtype: [(Mol, Mol)]
"""
# Patterns taken from
# https://www.rdkit.org/docs/Cookbook.html#neutralizing-charged-molecules
patts = (
# (SMARTS query, SMILES replacement)
# Imidazoles
('[n+;H]', 'n'),
# Amines
('[N+;!H0]', 'N'),
# Carboxylic acids and alcohols
('[$([O-]);!$([O-][#7])]', 'O'),
# Thiols
('[S-;X1]', 'S'),
# Sulfonamides
('[$([N-;X2]S(=O)=O)]', 'N'),
# Enamines
('[$([N-;X2][C,N]=C)]', 'N'),
# Tetrazoles
('[n-]', '[nH]'),
# Sulfoxides
('[$([S-]=O)]', 'S'),
# Amides
('[$([N-]C=O)]', 'N'),
)
return [(Chem.MolFromSmarts(smarts), Chem.MolFromSmiles(smiles, False))
for smarts, smiles in patts]
[docs]def neutralize_mol(mol):
"""
Neutralize a molecule.
:param mol: molecule to neutralize
:type mol: rdkit.Chem.Mol
:return: neutralized molecule (original is not modified)
:rtype: rdkit.Chem.Mol
"""
replaced = False
for reactant, product in get_neutralization_patterns():
while mol.HasSubstructMatch(reactant):
replaced = True
mol = AllChem.ReplaceSubstructs(mol, reactant, product)[0]
if replaced:
# This is needed to make sure the substructure search in
# create_reagent_library works properly.
Chem.SanitizeMol(mol)
return mol
[docs]def create_reagent_library(name,
input_file,
smarts,
id_property=TITLE_PROP,
properties=None,
require_single_match=False,
pfx=False,
add=False,
dedup=True,
desalt=False,
neutralize=False):
"""
Create a reagent library file for one reagent class by searching through
an input file using the provided SMARTS patterns. Writes a file called
<name>.csv or <name>.pfx, depending on the value of the `pfx` flag.
A structure is selected for the library when it matches ANY of the
specified patterns.
:param name: name of the reagent class.
:type name: str
:param input_file: name of structure file to process
:type input_file: str
:param smarts: SMARTS pattern (or patterns) to search
:type smarts: str or list of str
:param id_property: name of structure property to store as the title of each
reagent.
:type id_property: str
:param properties: properties to store in reaction file. None means "use all
properties from the first structure".
:type properties: (list of str) or None
:param require_single_match: skip input structures matching the SMARTS
pattern multiple times
:type require_single_match: bool
:param pfx: write pfx library file
:type pfx: bool
:param add: add new structures to output file if it exists, instead of
clobbering
:type add: bool
:param dedup: skip duplicate compunds, identified by canonical SMILES
:type dedup: bool
:param desalt: keep first largest molecule. This is done _before_ doing
the the SMARTS query.
:type desalt: bool
:param neutralize: neutralize the reactants. This is done _before_ doing
the the SMARTS query.
:type neutralize: bool
:return: output filename
:rtype: str
"""
def _write_mol(mol):
if dedup:
smiles = Chem.MolToSmiles(mol)
if smiles in seen:
return
seen.add(smiles)
try:
title = mol.GetProp(id_property)
except KeyError:
try:
title = mol.GetProp('_Name')
except KeyError:
title = f'{name}_{writer.written_count}'
mol.SetProp('_Name', title)
writer.append(mol)
seen = set()
if isinstance(smarts, str):
smarts = [smarts]
patts = [Chem.MolFromSmarts(s) for s in smarts]
explicit_h = any(patt.HasSubstructMatch(H) for patt in patts)
output_file = name
output_file += molio.PFX if pfx else '.csv'
writer_class = molio.PfxMolWriter if pfx else molio.CsvMolWriter
if add and os.path.isfile(output_file):
tmpout = '.tmp' + output_file
writer = writer_class(tmpout, properties=properties)
with molio.get_mol_reader(output_file, random_access=False) as reader:
for mol in reader:
_write_mol(mol)
old = writer.written_count
else:
writer = writer_class(output_file, properties=properties)
tmpout = None
with molio.get_mol_reader(input_file, random_access=False) as reader:
for mol in reader:
if desalt:
mol = desalt_mol(mol)
if neutralize:
mol = neutralize_mol(mol)
match_mol = Chem.AddHs(mol) if explicit_h else mol
for patt in patts:
if require_single_match:
keep = (1 == len(
match_mol.GetSubstructMatches(patt, maxMatches=2)))
else:
keep = match_mol.HasSubstructMatch(patt)
if keep:
_write_mol(mol)
break
writer.close()
total = writer.written_count
if tmpout:
fileutils.force_rename(tmpout, output_file)
new = total - old
logger.info(f'Wrote {total} structures ({new} new, {old} old) '
f'to {output_file}')
else:
logger.info(f'Wrote {total} structures to {output_file}')
return output_file
[docs]def set_prop(mol, name, value):
"""
Set a property for a Mol object, with the property type deduced from the
name, which should follow m2io conventions.
:param mol: molecule to modify
:type mol: rdkit.Chem.Mol
:param name: property name following m2io convention
:type name: str
:param value: property value
:type value: int, bool, float, or str, depending on `name`
"""
if name[0] == 'b':
mol.SetBoolProp(name, value)
elif name[0] == 'i':
mol.SetIntProp(name, value)
elif name[0] == 'r':
mol.SetDoubleProp(name, value)
else:
mol.SetProp(name, value)
[docs]def is_null(mol):
"""
Check if a molecule is considered a null reagent. The convention is that
null reagents are molecules consisting only of a dummy atom.
:param mol: molecule to test
:type mol: rdkit.Chem.Mol
:return: is the molecule a null reagent?
:rtype: bool
"""
return mol.GetNumAtoms() == 1 and mol.GetAtomWithIdx(0).GetAtomicNum() == 0
[docs]class ReactionFailureTracker:
"""
Helper class to track failed reactions, and print them out to a csv for
easy debugging. It currently also tracks total failures recorded, and is
intended to be a convenience class to track and report other statistics.
"""
[docs] def __init__(self, output_handle):
"""
:param output_handle: file handle for output
:type output_handle: _io.TextIOWrapper
"""
self.num_failures = 0
self.writer = csv.writer(output_handle, delimiter=',')
[docs] def writeFailure(self, reaction, reactants):
"""
Write failures into the output file handle provided at construction
:param reaction: reaction type
:type reaction: reaction.Reaction
:param reactants: list of reactants
:type reactants: list(rdkit.Chem.Mol)
"""
reactant_smiles = [Chem.MolToSmiles(reactant) for reactant in reactants]
self.writer.writerow([str(reaction)] + reactant_smiles)
self.num_failures += 1