import functools
import itertools
import math
import os
import random
from functools import reduce
from operator import mul
from rdkit import Chem
from rdkit.Chem import AllChem
from schrodinger import stepper
from schrodinger.models import json
from schrodinger.stepper import sideinputs
from schrodinger.analysis.transformations import TransformsRepository
from schrodinger.application.pathfinder import analysis
from schrodinger.application.pathfinder import filtering
from schrodinger.application.pathfinder import reaction
from schrodinger.application.pathfinder import synthesis
from schrodinger.application.pathfinder import route
from schrodinger.models import parameters
from . import filters
from . import utils
from .basesteps import LoggerMixin
from .basesteps import MolMapStep
from .dataclasses import FileInMixin
from .dataclasses import MolOutMixin
from .dataclasses import MolInMixin
from .dataclasses import MolMolMixin
from .dataclasses import MolToSmilesSerializer
# default R-group smarts for decorator step settings
DEFAULT_RGROUP_ATOM_SMARTS = '#6,#7,#8,#9,#16,#17,#35,#53'
def _auto_max_tries(max_products, total, tries_per_product=100, recovery=0.95):
"""
Determine a reasonable number of maximum number of combinations to try.
:param max_products: requested maximum number of products
:type max_products: int
:param total: the total number of reactant combinations
:type total: int
:param tries_per_product: number of tries per each generated product, on
average. Can be seen as an "inverse success rate".
:type tries_per_product: int
:param recovery: fraction of products which we would like to recover on
average when using random sampling in the limiting case where
max_products == total
:type tries_per_product: float
:return: max tries
:rtype: int
"""
# from python/scripts/combinatorial_synthesis.py
assert 0.0 < recovery < 1.0
# This is based on the observation that if we roll an N-sided die X*N
# times, we'll see 1 - exp(-X) of the faces at least once.
fac = -math.log(1.0 - recovery)
return min(
min(max_products, total) * tries_per_product, math.ceil(total * fac))
[docs]class Synthesizer(MolMolMixin, stepper.Chain):
"""
Enumerates unique sanitized molecules from a combinatorial synthesis using
routes based on the input molecules using the default reaction dictionary
and reagent library.
If the maximum number of products is less than the total number of
combinations the route synthesis will be done by random sampling, which may
yield fewer products than requested. Otherwise a systematic set of unique
products will be yielded.
The settings contain:
- `core_smarts`: the SMARTS that the products should have and needs to be
part of the input molecule.
- `depth`: the maximum depth of the retrosynthetic routes to use.
- `reagent_lib`: an optional directory to prepend to the standard reagent
library search path
- `max_products`: the maximum number of products try to synthesize for
each input molecule per route. Use 0 to force an exhaustive synthesis.
- `seed`: seed for random number generator. If `None`, the random number
generator will not be seeded.
- `yield_input`: whether the input molecule should be returned first
"""
[docs] class Settings(parameters.CompoundParam):
core_smarts: str = None
depth: int = 2
reagent_lib: stepper.StepperFolder = None # optional
max_products: int = 100
seed: int = None
yield_input: bool = True
deduplicate_routes: bool = False
[docs] def buildChain(self):
settings = self.settings
if self.settings.yield_input:
fork = sideinputs.ForkStep(step=MolMolMixin)
self.addStep(fork)
self.addStep(
RouteEnumerator(core_smarts=settings.core_smarts,
depth=settings.depth))
if self.settings.deduplicate_routes:
self.addStep(RouteDeduplicator())
self.addStep(
RouteEvaluator(max_products=settings.max_products,
reagent_lib=settings.reagent_lib,
seed=settings.seed))
if self.settings.yield_input:
self.addStep(sideinputs.JoinStep(fork))
[docs]class RouteSerializer(stepper.Serializer):
"""
A serializer for RouteNodes
"""
DataType = route.RouteNode
[docs] def toString(self, route: route.RouteNode) -> str:
return json.dumps(route.getTreeData(self_contained=True))
[docs] def fromString(self, route_str: str) -> route.RouteNode:
return route.parse_route_data(json.loads(route_str))
[docs]class RouteDeduplicator(stepper.ReduceStep):
Input = Output = route.RouteNode
InputSerializer = OutputSerializer = RouteSerializer
[docs] def setUp(self):
super().setUp()
self._seen_routes = set()
[docs] def reduceFunction(self, routes):
for rt in routes:
rt_str = rt.getOneLineRepresentation()
if rt_str in self._seen_routes:
continue
self._seen_routes.add(rt_str)
yield rt
[docs]class RouteEnumerator(LoggerMixin, MolInMixin, stepper.MapStep):
"""
Mol -> (RouteEnumerator) -> RouteNode
Takes a Mol and enumerates the retrosynthetic routes that can produce it.
"""
Output = route.RouteNode
OutputSerializer = RouteSerializer
[docs] class Settings(parameters.CompoundParam):
core_smarts: str = None
depth: int = 2
[docs] def validateSettings(self):
issues = utils.validate_core_smarts(self, self.settings.core_smarts)
if not reaction.get_reactions():
issues.append(stepper.SettingsError(self, 'no reactions defined'))
return issues
[docs] def setUp(self):
super().setUp()
self._reactions = reaction.get_reactions()
self._core_smarts = Chem.MolFromSmarts(self.settings.core_smarts)
[docs] def mapFunction(self, mol):
frozen_heavy_atoms = self._getFrozenHeavyAtoms(mol)
if frozen_heavy_atoms:
tree = analysis.retrosynthesize(mol,
self._reactions,
max_depth=self.settings.depth,
frozen_atoms=frozen_heavy_atoms)
yield from self._getRoutes(tree)
def _getFrozenHeavyAtoms(self, mol):
"""
The 1-based index of frozen heavy atoms to use in the retrosynthesis.
:param mol:
:type mol: Chem.Mol
:return: the set of frozen heavy atoms to use
:rtype: Set[int]
"""
mol_H = Chem.AddHs(mol)
return [
idx + 1
for idx in mol_H.GetSubstructMatch(self._core_smarts)
if mol_H.GetAtomWithIdx(idx).GetAtomicNum() != 1
]
def _getRoutes(self, tree):
"""
:param tree: the retrosynthesis result for a molecule
:type tree: schrodinger.application.pathfinder.route.RetroSynthesisNode
:return: routes that have not been synthesized before
:rtype: generator of schrodinger.application.pathfinder.route.RouteNode
"""
routes = tree.getRoutes()
for route in routes:
route_str = route.getOneLineRepresentation()
self.logger.debug(f'{self.getStepId()} ROUTE: {route_str}')
yield route
[docs]class RouteEvaluator(MolOutMixin, stepper.MapStep):
"""
RouteNode -> (RouteEvaluator) -> Mol
Takes a route and produces all products from it.
"""
InputSerializer = RouteSerializer
Input = route.RouteNode
[docs] class Settings(parameters.CompoundParam):
max_products: int = 100
seed: int = None
reagent_lib: stepper.StepperFolder = None # optional
[docs] def validateSettings(self):
issues = []
issues += utils.validate_folder(self, 'reagent_lib')
return issues
[docs] def setUp(self):
super().setUp()
reagent_lib = self.settings.reagent_lib
self._reagent_lib = [reagent_lib] if reagent_lib else None
def _getProducts(self, route):
"""
:param route: the route to get the products for
:type route: schrodinger.application.pathfinder.route.RouteNode
:return: if the maximum number of products requested in the settings is
less than the total number of products this route generates a random
sample of unique products, otherwise all products
:rtype: Iterable[Chem.Mol]
"""
settings = self.settings
max_products = settings.max_products
synthesizer = synthesis.Synthesizer(route)
reagent_sources = synthesis.get_reagent_sources(
route, libpath=self._reagent_lib)
if max_products:
total = reduce(mul, [src.size for src in reagent_sources])
if max_products < total:
return synthesis.RandomSampleIterator(
synthesizer,
reagent_sources,
max_products,
max_tries=_auto_max_tries(max_products, total),
prng=random.Random(settings.seed)
if settings.seed else random)
return synthesis.SystematicIterator(synthesizer, reagent_sources)
[docs] def mapFunction(self, route):
yield from self._getProducts(route)
[docs]class Decorator(MolMapStep):
"""
Enumerates unique sanitized molecules formed by replacing a hydrogen on a
C, N, or O atom in the ligand with an R-group that was attached to an Ar.
The `rgroup_atom_smarts` setting allows for filtering of which reagents in
the `rgroup_file` are used for the decoration reaction. The default value of
'#6,#7,#8,#9,#16,#17,#35,#53' is used if the `rgroup_atom_smarts` is an
empty string or None.
The settings is a `filters.ProfileSettings` instance whose `property_ranges`
are used to determine with R-groups are allowed to react. The
`property_ranges` are optional.
seealso:: filters.ProfileSettings
Example R-group reagents::
C[Ar]
N[Ar]
Example of a Decorator definition in a yaml file::
Decorator:
rgroup_atom_smarts: '*' # allow all unique rgroup reagents
rgroup_file: rgroups_small.smi
core_smarts: c1ccccc1
property_ranges:
MolWt: [250, 500]
RingCount: [0, 5]
NumAromaticRings: [0, 3]
NumAliphaticRings: [0, 5]
NumSpiroAtoms: [0, 0]
"""
LEAVING_MOL_WT = filtering.DESCRIPTORS_DICT['MolWt'](
Chem.MolFromSmiles('[ArH]'))
[docs] class Settings(filters.ProfileSettings):
rgroup_file: stepper.StepperFile
core_smarts: str = None
rgroup_atom_smarts: str = DEFAULT_RGROUP_ATOM_SMARTS
[docs] def validate(self, step):
issues = utils.validate_core_smarts(step, self.core_smarts)
rgroup_smarts_mol = self.getReagentMol()
if not rgroup_smarts_mol:
issues.append(
stepper.SettingsError(step, 'invalid rgroup_atom_smarts'))
file_issues = utils.validate_file(step,
'rgroup_file',
required=True)
issues += file_issues
if not file_issues and rgroup_smarts_mol and not self.getRGroups():
issues.append(
stepper.SettingsError(step, 'no matching R-groups'))
if self.property_ranges:
issues += super().validate(step)
return issues
[docs] def getReagentSmarts(self):
atoms = self.rgroup_atom_smarts or '*'
return f'[Ar]-[{atoms}:1]'
[docs] def getReagentMol(self):
return Chem.MolFromSmarts(self.getReagentSmarts())
[docs] def getReactionSmarts(self):
return f'{self.getReagentSmarts()}.[#6,#7,#8;h,H1,H2,H3:3]>>[*:1]-[*:3]'
[docs] def getRGroups(self):
rgroup_mol = self.getReagentMol()
if not rgroup_mol:
return []
rgroups = []
if os.path.isfile(self.rgroup_file):
supplier = Chem.SmilesMolSupplier(self.rgroup_file,
titleLine=0,
nameColumn=-1)
rgroups = [
m for m in supplier
if m is not None and m.HasSubstructMatch(rgroup_mol)
]
return synthesis.uniquify_mols(rgroups)
[docs] def validateSettings(self):
return self.settings.validate(self)
[docs] def setUp(self):
super().setUp()
self._seen_smiles = set()
self._core_smarts = Chem.MolFromSmarts(self.settings.core_smarts)
reaction_smarts = self.settings.getReactionSmarts()
self.logger.debug(
f'{self.getStepId()}: reaction smarts: {reaction_smarts}')
self._reaction = AllChem.ReactionFromSmarts(reaction_smarts)
self._rgroups = self.settings.getRGroups()
self.logger.info(
f'{self.getStepId()}: {len(self._rgroups)} unique R-group(s)')
def _getRGroupsForMol(self, mol):
"""
Generator yielding R-group reagents that will not violate the required
property ranges for the decorated mol.
:param mol: the molecule to get the appropriate R-group reagents for
:return: collections.abc.Iterable[Chem.Mol]
"""
for reagent in self._rgroups:
for prop, (prop_min,
prop_max) in self.settings.property_ranges.items():
mol_prop = filtering.DESCRIPTORS_DICT[prop](mol)
reagent_prop = filtering.DESCRIPTORS_DICT[prop](reagent)
if prop in ('MolWt', 'MVCorrMW'):
reagent_prop -= self.LEAVING_MOL_WT
total_prop = mol_prop + reagent_prop
if total_prop < prop_min or total_prop > prop_max:
break
else:
yield reagent
[docs] def mapFunction(self, mol):
for rgroup in self._getRGroupsForMol(mol):
reactants = (rgroup, mol)
for product, in self._reaction.RunReactants(reactants):
if Chem.SanitizeMol(product, catchErrors=True):
continue # silently ignore products that can't be sanitized
smiles = Chem.MolToSmiles(product)
if smiles not in self._seen_smiles:
if product.HasSubstructMatch(self._core_smarts):
self._seen_smiles.add(smiles)
yield product
[docs]class FragmenterMixin:
"""
A mixin providing trimmer functionality for fragmenters.
"""
BOND_SMARTS = NotImplementedError
[docs] def getBreakableBonds(self, mol):
"""
Return the bond indices in the molecule that are allowed to be broken.
:param mol: the molecule to fragment
:type mol: Chem.Mol
:return: a generator of bonds to break in the molecule
:rtype: generator of int
"""
return (mol.GetBondBetweenAtoms(x, y).GetIdx(
) for x, y in itertools.chain(
*[mol.GetSubstructMatches(smarts) for smarts in self.BOND_SMARTS]))
[docs] def fragmentToMolecule(self, fragment):
"""
Return the molecule version of the fragment.
:param fragment: the fragment
:type fragment: Chem.Mol
:return: the molecule version of the fragment
:rtype: Chem.Mol
"""
return utils.fragment_to_molecule(fragment)
[docs] def isFragmentableMol(self, mol):
"""
Use this method to determine whether the molecule may be fragmented.
:param mol: the molecule to check
:type mol: Chem.Mol
:return: whether the molecule can/should be further fragmented
:rtype: bool
"""
return True
[docs] def isAcceptableFragment(self, mol):
"""
Use this method to determine which fragments are acceptable.
:param mol: the molecule to check
:type mol: Chem.Mol
:return: whether the fragment is acceptable
:rtype: bool
"""
return True
def _trimmer(self, mol, seen_smiles=None):
"""
Recursively trim mol to generate unique fragment molecules.
:param mol: Mol to be fragmented.
:type mol: Chem.Mol
:param seen_smiles: SMILES of molecules that should not be returned. Use
NoneType if not
:type seen_smiles: set(str) or NoneType
:return: generator of trimmed molecules
:rtype: generator of Chem.Mol
"""
seen_smiles = seen_smiles or set()
for bond in self.getBreakableBonds(mol):
fragments = Chem.FragmentOnBonds(mol, [bond], dummyLabels=[(0, 0)])
for fragment in Chem.GetMolFrags(fragments, asMols=True):
molecule = self.fragmentToMolecule(fragment)
smiles = Chem.MolToSmiles(molecule)
if smiles not in seen_smiles:
seen_smiles.add(smiles)
if self.isAcceptableFragment(molecule):
yield molecule
if self.isFragmentableMol(molecule):
for frag in self._trimmer(molecule, seen_smiles):
yield frag
[docs] def trimmer(self, mol, max_fragments=500):
"""
Recursively trim the specified mol to generate maximally max_fragments
fragment molecules.
Returns the input molecule first if it is an acceptable fragment.
:param mol: Mol to be fragmented.
:type mol: Chem.Mol
:param max_fragments: the total number of fragment molecules to generate
:type max_fragments: int
:return: generator of trimmed molecules
:rtype: generator of Chem.Mol
"""
yield_count = 0
if self.isAcceptableFragment(mol):
yield_count += 1
yield mol
if self.isFragmentableMol(mol):
for fragment_mol in self._trimmer(
mol, seen_smiles={Chem.MolToSmiles(mol)}):
yield fragment_mol
yield_count += 1
if yield_count >= max_fragments:
break
[docs]class Fragmenter(filters.MaxMolWtMixin, FragmenterMixin, MolMapStep):
"""
Recursively fragment molecules.
Unless it is filtered due to it's molecular weight or core smarts, this step
returns the original molecule first, followed by unique fragment molecules
that contain the SMARTS substructure defined in settings and have a
molecular weight less than or equal to the optional max_mol_wt setting.
The number of molecules returned is limited by max_out in the settings.
Fragmentation will only take place for bonds defined in `BOND_SMARTS`.
"""
BOND_SMARTS = tuple(
Chem.MolFromSmarts(smarts) for smarts in [
'[#6;!R]-[!R]', '[!R]-[R]', '[R]!@&-[R]',
'[C;R1,R2;r3,r4,r5,r6]-&@[C,N,O,S;R1;r3,r4,r5,r6]'
])
[docs] class Settings(filters.MaxMolWtSettings):
core_smarts: str = None
max_out: int = 500
[docs] def validateSettings(self):
issues = utils.validate_core_smarts(self, self.settings.core_smarts)
if self.settings.max_out <= 1:
issues.append(
stepper.SettingsError(self, 'max_out should be larger than 1'))
max_mol_wt_issue = self.validateMaxMolWtSettings()
if max_mol_wt_issue:
issues.append(max_mol_wt_issue)
return issues
[docs] def setUp(self):
super().setUp()
self._core_smarts = Chem.MolFromSmarts(self.settings.core_smarts)
self._need_Hs = None
[docs] @functools.lru_cache()
def hasCoreSmarts(self, mol):
if self._need_Hs:
mol = Chem.AddHs(mol)
return mol.HasSubstructMatch(self._core_smarts)
[docs] def isAcceptableFragment(self, mol):
return self.hasAcceptableMolWt(mol) and self.hasCoreSmarts(mol)
[docs] def isFragmentableMol(self, mol):
# only fragments with the core smarts can be further fragmented
return self.hasCoreSmarts(mol)
[docs] def getBreakableBonds(self, mol):
# only breakable bonds that are not in the core may be broken
all_breakable_bonds = set(super().getBreakableBonds(mol))
breakable_core_bonds = set(self.getBreakableCoreBonds(mol))
return sorted(all_breakable_bonds - breakable_core_bonds)
[docs] def getBreakableCoreBonds(self, mol):
"""
Return the set of breakable core bond indices in the molecule.
:param mol: the molecule to fragment
:type mol: Chem.Mol
:return: a generator of breakable core bond indices in the molecule
:rtype: generator of int
"""
if self._need_Hs:
mol = Chem.AddHs(mol)
core_atoms = mol.GetSubstructMatch(self._core_smarts)
for x, y in itertools.combinations(core_atoms, 2):
if mol.GetBondBetweenAtoms(x, y) is not None:
yield mol.GetBondBetweenAtoms(x, y).GetIdx()
[docs] def mapFunction(self, mol):
# determine whether we need to add Hs to get core smarts matching
if self._need_Hs is None:
self._need_Hs = utils.need_Hs(mol, self._core_smarts)
if self._need_Hs is None: # wasn't able to match
return
yield from self.trimmer(mol, max_fragments=self.settings.max_out)