"""
A module to perform retrosynthetic analysis.
Examples::
# retrosynthetic analysis
reactions = reaction.read_reactions_file('reactions.json')
retrosynthesis = analysis.retrosynthesize(amide, reactions)
print(retrosynthesis.asString())
for route in retrosynthesis.getRoutes():
print(route.asString())
print("Starting materials:")
for sm in route.getStartingNodes():
print("-", sm)
route.write('route.json')
"""
import collections
import copy
import fnmatch
import more_itertools
from rdkit import Chem
from schrodinger.application.pathfinder import route as route_mod
from schrodinger.utils import log
logger = log.get_output_logger("pathfinder")
# Tag for "auxiliary transforms" which get special treatment during the
# retrosynthetic analysis.
TRANSFORM = 'transformation'
RetrosynthesisState = collections.namedtuple('RetrosynthesisState',
'mols_analyzed depth done tree')
RetrosynthesisState.mols_analyzed.__doc__ = \
"Number of molecules analyzed so far (int)"
RetrosynthesisState.depth.__doc__ = \
"Depth of the last node to be analyzed (int)"
RetrosynthesisState.done.__doc__ = \
"Flag indicating whether the analysis ran to completion (bool)"
RetrosynthesisState.tree.__doc__ = \
"Retrosynthetic tree (RetrosynthesisNode)"
[docs]class ForwardAnalyzer:
"""
Using "applicable" synthetic reactions and a list of routes, generate a new
list of routes up to a certain depth. A reactions is synthetic if it has
more than one reactant; it is "applicable" if any of the reactant templates
match the target molecule of the route.
We don't actually apply the reactions because we don't know what the "other
reactant" would be yet; we just check which atoms would be matched. Multiple
reactions can be applied as long as the atoms matched by each don't overlap
(in other words, the reactions are orthogonal). This works best when the
reaction SMARTS are written in such a way that only relevant atoms are
matched.
"""
[docs] def __init__(self, rxns):
"""
:param rxns: reaction dictionary by name
:type rxns: {str: reaction.Reaction}
"""
self._rxns = rxns
self._syn_rxns = [
rxn.inverse() for rxn in rxns.values() if not is_transformation(rxn)
]
logger.debug(f'Got {len(self._syn_rxns)} synthetic reactions')
[docs] def analyzeRoutes(self, routes, max_depth):
"""
For each input route, generate new routes using "applicable" synthetic
reactions.
:param routes: list of input routes
:type routes: [schrodinger.application.pathfinder.route.RouteNode]
:param max_depth: maximum number of steps. NOTE: this is a total depth
including the steps already in the input route. For
example, if an input route has two steps and
max_depth=3, only 1 more step would be considered.
:type max_depth: int
:return: list of new routes. None of the input routes are returned,
because transformation-only routes are not considered
interesting for the purpose of this class.
:rtype: [schrodinger.application.pathfinder.route.RouteNode]
"""
new_routes = {}
for route in routes:
self._analyzeOne(route, max_depth, new_routes)
return list(new_routes.values())
def _analyzeOne(self, node, max_depth, new_routes):
used_atoms = set()
max_depth -= node.depth()
self._analyze(node.mol, max_depth, 0, node, used_atoms, new_routes)
def _analyze(self, mol, max_depth, start, node, used_atoms, new_routes):
if max_depth <= 0:
return
for i in range(start, len(self._syn_rxns)):
syn_rxn = self._syn_rxns[i]
rxn = self._rxns[syn_rxn.name]
for i, tpl in enumerate(syn_rxn.rxn.GetReactants()):
for match in mol.GetSubstructMatches(tpl):
atoms = set(match)
if not used_atoms & atoms:
new_node = self._makeNode(rxn, i, node)
key = new_node.treeAsString()
if key not in new_routes:
new_routes[key] = new_node
self._analyze(mol, max_depth - 1, i, new_node,
used_atoms | atoms, new_routes)
def _makeNode(self, rxn, prec_idx, prec_node):
new_node = route_mod.RouteNode()
precursors = []
for i, reagent_class in enumerate(rxn.rhs_classes):
if i == prec_idx:
precursors.append(prec_node)
else:
node = route_mod.ReagentNode(reagent_class=reagent_class)
precursors.append(node)
new_node.setReactionInstance(rxn, precursors)
return new_node
def _validate_reactions_for_retrosynthesis(reactions):
"""
Check that the reactions are usable for retrosysnthesis. For example, check
that they are all one-product reactions.
:param reactions: reactions to check
:type reactions: list of Reaction
:raises: ValueError
"""
for rxn in reactions:
numtargets = rxn.rxn.GetNumReactantTemplates()
if numtargets != 1:
raise ValueError(f"Bad reaction '{rxn}'. Retrosynthesis only "
"supports one-product reactions.")
[docs]def retrosynthesize(*a, **d):
"""
Convenience wrapper for retrosynthesize_gen for backward compatibility.
Takes takes the same arguments as retrosynthesize_gen and returns a
RetrosynthesisNode.
"""
for state in retrosynthesize_gen(*a, **d):
pass
return state.tree
[docs]def retrosynthesize_gen(target_mol,
reactions_dict,
max_depth=1,
exclude=None,
require_bonds=None,
frozen_atoms=None,
broken_bonds=None,
fragments=None,
label_attachment_atoms=False):
r"""
Generate a retrosynthetic tree to the desired depth based on the target
molecule. This function is a generator which yields after analyzing each
molecule (node in the tree). The search is done breadth-first and the
caller can break out at any time, getting a valid (but possibly incomplete)
tree.
:type target_mol: Mol
:param reactions_dict: Reaction dictionary.
:type reactions_dict: .reaction.ReactionDict or dict {str: Reaction}
:param exclude: Set of tags or reaction names to exclude. Tags are matched
exactly; names are matched using shell globbing (\*, ?, []).
:type exclude: set of str
:param require_bonds: If provided, only reactions which break at least
one bond in this set will be used for the analysis. Each bond
is a sorted tuple of two 1-based atom indexes.
:type require_bonds: set of tuple of int
:param frozen_atoms: If provided, reactions which break any bond between
two atoms in this set are skipped.
:type frozen_atoms: set of int
:param broken_bonds: If provided, this dict will be filled with a
bond: reactions mapping, where a bond is a pair of atom indexes,
and the reactions a set of strings (reaction names) for all
the bonds that were broken during the analysis.
:type broken_bonds: {(int, int): set(str)}
:param fragments: If provided, this set will be filled with tuples of atom
indexes for the fragments identified during the analysis.
"Fragment" here is understood as a subset of atoms in the original
molecule that is also present in a precursor.
:type fragments: set of tuple of int
:param label_attachment_atoms: if true, add atom mapping numbers to
"attachment atoms", which are atoms that were present in the target
molecule and which were involved in any broken bond.
:type label_attachment_atoms: bool
:return: yields a RetrosynthesisState after analyzing each molecule. The
current tree can be found in the .tree property of the state object.
NOTE: the same (evolving) tree object is returned with each state!
This is for efficiency reasons and because there's not a strong use
case for keeping a "trajectory".
:rtype: generator of RetrosynthesisState
"""
# sort reactions by name to get a predictable, reproducible result
reactions = [reactions_dict[r] for r in sorted(reactions_dict)]
_validate_reactions_for_retrosynthesis(reactions)
if exclude:
reactions = filter_reactions(reactions, exclude)
if (frozen_atoms or require_bonds or broken_bonds is not None or
fragments is not None):
target_mol = _add_atom_ids(target_mol)
if broken_bonds is None:
# We'll set it even if the caller didn't, because it's used by
# _retrosynthesize to switch on the bond/atom tracking.
broken_bonds = {}
if frozen_atoms:
frozen_bonds = _get_target_bonds(target_mol, frozen_atoms)
innerfrozen_atoms = _get_innerfrozen_atoms(target_mol, frozen_atoms)
else:
frozen_bonds = set()
innerfrozen_atoms = set()
mol = Chem.RemoveHs(target_mol)
def get_mol_for_node(mol):
if broken_bonds is not None:
# broken_bonds implies that atoms were labeled
tgt_bonds = _get_target_bonds(mol)
mol_for_node = _strip_atom_ids(mol, frozen_atoms, innerfrozen_atoms)
frozen = bool(tgt_bonds & frozen_bonds)
else:
mol_for_node = mol
frozen = False
tgt_bonds = None
return mol_for_node, frozen, tgt_bonds
mol_for_node, frozen, tgt_bonds = get_mol_for_node(mol)
root = route_mod.RetroSynthesisNode(mol_for_node,
reagent_class=None,
frozen=frozen)
queue = collections.deque([(root, mol, tgt_bonds, 0, set())])
n = 0
inverse_tags = getattr(reactions_dict, 'inverse_tags', None)
while queue:
n += 1
node, mol, tgt_bonds, depth, blacklist = queue.popleft()
if depth >= max_depth:
continue
for rxn in reactions:
new_blacklist = set(blacklist)
if inverse_tags:
if rxn.tags & blacklist:
continue
for tag in rxn.tags:
new_blacklist |= inverse_tags[tag]
for precursors in rxn.apply((mol,)):
if broken_bonds is not None:
prec_bonds = set()
for precursor in precursors:
prec_bonds |= _get_target_bonds(precursor)
if fragments is not None:
if fragment := _get_fragment_atoms(precursor):
fragments.add(fragment)
cur_changed_bonds = tgt_bonds - prec_bonds
# if a bond is missing from the precursor, it is broken
prec_bond_atoms = {bond[0] for bond in prec_bonds}
cur_changed_atoms = {bond[0] for bond in cur_changed_bonds}
cur_broken_bonds = cur_changed_atoms - prec_bond_atoms
logger.debug(
f'{max_depth}: {rxn} broken bonds: {cur_broken_bonds}')
if (TRANSFORM not in rxn.tags and require_bonds and
not (cur_broken_bonds & require_bonds)):
# We did not break any of the required bonds, so skip.
continue
if cur_changed_bonds & frozen_bonds:
# We broke or changed a frozen bond, so skip.
continue
for bond in cur_broken_bonds:
broken_bonds.setdefault(bond, set()).add(rxn.name)
if label_attachment_atoms:
for precursor in precursors:
_label_attachment_atoms(precursor, broken_bonds)
else:
cur_broken_bonds = set()
precursor_nodes = []
for precursor, reagent_class in zip(precursors,
rxn.rhs_classes):
if reagent_class and reagent_class.reactive:
next_depth = max_depth
else:
next_depth = depth + 1
mol_for_node, frozen, prec_tgt_bonds = get_mol_for_node(
precursor)
newnode = route_mod.RetroSynthesisNode(mol_for_node,
reagent_class,
frozen=frozen)
if next_depth < max_depth:
queue.append((newnode, precursor, prec_tgt_bonds,
next_depth, new_blacklist))
precursor_nodes.append(newnode)
node.addReactionInstance(rxn, precursor_nodes, cur_broken_bonds)
yield RetrosynthesisState(n, depth + 1, not queue, root)
def _label_attachment_atoms(mol, broken_bonds):
"""
Label the "attachment atoms", those that were involved in a bond in the
target but no longer are. The labels are added as atom mapping numbers and
the value is the original atom index (1-based).
:param mol: molecule to modify
:type mol: rdkit.Chem.rdchem.Mol
:param broken_bonds: bonds that have been broken so far
:type broken_bonds: set of (int, int)
"""
bond_atoms = set(more_itertools.flatten(broken_bonds))
for atom in mol.GetAtoms():
iso = atom.GetIsotope()
if iso and iso in bond_atoms:
atom.SetAtomMapNum(iso)
def _add_atom_ids(mol):
"""
Return a copy of mol in which a unique ID has been added to each atom
(currently implemented as a hack using isotope information).
:param mol: molecule to modify
:type mol: rdkit.Chem.rdchem.Mol
"""
new_mol = copy.copy(mol)
for i, atom in enumerate(new_mol.GetAtoms(), 1):
# We don't label hydrogens because we will use an implicit hydrogen
# representation, and RemoveHs won't remove hydrogen with isotope info.
if atom.GetAtomicNum() != 1:
atom.SetIsotope(i)
return new_mol
def _strip_atom_ids(mol, frozen_atoms, innerfrozen_atoms):
"""
Return a copy of mol from which the atom IDs have been removed (under the
current hack, this means removing all isotope information from the
molecule). Also, if a truthy `frozen_atoms` is provided, a boolean property
is added to each atom specifying whether the atom is frozen or not,
and the atom index of the atom in the target (user-input) molecule
is stored as an integer property.
:param mol: molecule to modify
:type mol: rdkit.Chem.rdchem.Mol
:param frozen_atoms: set of frozen atom indexes (1-based)
:type frozen_atoms: set of int
:param innerfrozen_atoms: set of innerfrozen atom indexes (1-based)
:type innerfrozen_atoms: set of int
:return: stripped molecule
:rtype: rdkit.Chem.rdchem.Mol
"""
new_mol = copy.copy(mol)
for atom in new_mol.GetAtoms():
if frozen_atoms:
iso = atom.GetIsotope()
atom.SetBoolProp(route_mod.FROZEN_PROP, iso in frozen_atoms)
atom.SetBoolProp(route_mod.INNERFROZEN_PROP, iso
in innerfrozen_atoms)
atom.SetIntProp(route_mod.TARGET_ATOM_IDX_PROP, iso)
atom.SetIsotope(0)
return new_mol
def _get_target_bonds(mol, atoms=None):
"""
Return a set of "target bonds" in the molecule. These are the bonds which
existed in the original target molecule, which means that both of the atoms
in the bond have a unique ID. (New atoms added by reactions don't have an
ID.)
:param mol: molecule
:type mol: rdkit.Chem.rdchem.Mol
:param atoms: if provided, only return bonds for which both atoms belong to
this set of 1-based atom indexes.
:type atoms: set of int
:return: set of bonds; each bond is a sorted tuple of two 1-based atom
indexes.
:rtype: set of tuple of int
"""
bonds = set()
for bond in mol.GetBonds():
a1 = bond.GetBeginAtom()
a2 = bond.GetEndAtom()
iso1 = a1.GetIsotope()
iso2 = a2.GetIsotope()
bond_order = bond.GetBondType()
if iso1 and iso2:
if atoms and not (iso1 in atoms and iso2 in atoms):
continue
atom_pair = tuple(sorted([iso1, iso2]))
bonds.add((atom_pair, bond_order))
return bonds
def _get_innerfrozen_atoms(mol, frozen_atoms):
"""
Find the "innerfrozen" atoms, which are the subset of frozen atoms that are
not connected to any non-frozen atom.
:param mol: molecule
:type mol: rdkit.Chem.rdchem.Mol
:param frozen_atoms: set of 1-based frozen atom indexes
:type frozen_atoms: set of int
:return: set of 1-based innerfrozen atom indexes
:type atoms: set of int
"""
innerfrozen = set(frozen_atoms)
for idx in frozen_atoms:
atom = mol.GetAtomWithIdx(idx - 1)
for nei in atom.GetNeighbors():
nei_idx = nei.GetIdx() + 1
if nei_idx not in frozen_atoms:
innerfrozen.discard(idx)
return innerfrozen
def _get_fragment_atoms(mol):
"""
Return a tuple of atom indices in the "original target molecule" that are
available in `mol` through atom labels (isotopes). Not all atoms in `mol`
carry the labels since only some are related (can be mapped back) to the
"original target molecule".
:param mol: molecule
:type mol: rdkit.Chem.rdchem.Mol
:return: atom indexes (1-based)
:rtype: tuple of int
"""
return tuple(
sorted(a.GetIsotope() for a in mol.GetAtoms() if a.GetIsotope()))
[docs]def reaction_is_excluded(reaction, exclude):
r"""
Check if a reaction meets any of the exclusion criteria.
:type reaction: Reaction
:param exclude: Set of tags or reaction names to exclude. Tags are matched
exactly; names are matched using shell globbing (\*, ?, []).
:type exclude: set of str
:return: True if reaction is meets any of the exclusion criteria
:rtype: bool
"""
if reaction.tags & exclude:
return True
for patt in exclude:
if fnmatch.fnmatch(reaction.name, patt):
return True
return False
[docs]def filter_reactions(reactions, exclude):
r"""
Return a shallow copy of a list of reactions, filtering out those matching
any of the exclusion criteria.
:type reactions: list of Reaction
:param exclude: Set of tags or reaction names to exclude. Tags are matched
exactly; names are matched using shell globbing (\*, ?, []).
:type exclude: set of str
:rtype: list of reaction
"""
return [r for r in reactions if not reaction_is_excluded(r, exclude)]