"""
Functions to support multi-route enumeration (AKA "simple reaction enumeration"
or "automated reaction enumeration").
"""
import contextlib
import functools
import hashlib
import itertools
import math
import os
import random
import sys
import tempfile
import time
import zlib
import networkx as nx
import numpy as np
import requests
from rdkit import Chem
from schrodinger import structure
from schrodinger.application.combinatorial_screen import fingerprint_utils
from schrodinger.application.pathfinder import aligner
from schrodinger.application.pathfinder import analysis
from schrodinger.application.pathfinder import filtering
from schrodinger.application.pathfinder import molio
from schrodinger.application.pathfinder import reaction
from schrodinger.application.pathfinder import synthesis
from schrodinger.job import jobcontrol
from schrodinger.structutils import analyze
from schrodinger.structutils import transform
from schrodinger.thirdparty import rdkit_adapter
from schrodinger.utils import fileutils
from schrodinger.utils import log
logger = log.get_output_logger('pathfinder')
# Special reagent class for core hopping.
CORES = reaction.ReagentClass('cores')
ROUTE_ID_PROP = 'i_reaction_route_index'
ROUTE_PROP = 's_reaction_route'
# Maximum analysis depth to consider when the user doesn't specify a depth.
AUTO_MAX_DEPTH = 5
# Other defaults
DEFAULT_MAX_ROUTES = 100
DEFAULT_MAX_PER_ROUTE = 1000
# How many bytes to read at a time when downloading / hashing a file.
CHUNK_SIZE = 65536
class MultiRouteEnumerator:
"""
A generator of products following the multiroute enumeration protocol.
"""
def __init__(self,
mol,
reactions_dict,
*,
dedup=True,
depth=None,
descriptors=filtering.DEFAULT_DESCRIPTORS,
frozen_atoms=frozenset(),
libpath=None,
max_per_route=DEFAULT_MAX_PER_ROUTE,
max_routes=DEFAULT_MAX_ROUTES,
no_core_hopping=False,
product_property_filter_file=None,
product_smarts_filter_file=None,
ref_mols=None,
ch_dist_tol=1.0,
ch_ang_tol=15.0,
bond_reactions=None,
prefilter_reagents=None,
fp_dir=None,
fp_url=None,
fp_namespace=None,
forward=False,
prng=random,
route_prng=None,
**unused_args):
"""
:param mol: input molecule to be used for the initial retrosynthetic or
forward analysis.
:type mol: rdkit.Chem.Mol
:param reactions_dict: reactions to use for the analysis; if None or
empty, the default reaction dictionary is used
:type reactions_dict: dict or NoneType
:param dedup: skip duplicate products (using SMILES for comparison)
:type dedup: bool
:param depth: analysis depth (if None, increasing depths will be
attempted until enough routes are found)
:type depth: int or NoneType
:param descriptors: names of RDKit descriptors to compute for each
product
:type descriptors: list of str
:param frozen_atoms: indexes (1-based) of atoms to keep in the product
:type frozen_atoms: set of int
:param libpath: directories to search for reactant files
:type libpath: list of str
:param max_per_route: maximum number of products per route
:type max_per_route: int
:param max_routes: maximum number of routes to sample
:type max_routes: int
:param no_core_hopping: don't use the special core hopping mode even
when possible
:type no_core_hopping: bool
:param product_property_filter_file: name of JSON file with product
property filters
:type product_property_filter_file: str
:param product_smarts_filter_file: name of .cflt file with SMARTS
patterns
:type product_smarts_filter_file: str
:param ref_mols: reference molecules for similarity calculations
:type ref_mols: list of rdkit.Chem.Mol or str
:param ch_dist_tol: core-hopping distance tolerance in Angstroms
(maximum allowed change in the distance between side
chains, relative to the input structure)
:type ch_dist_tol: float
:param ch_ang_tol: core-hopping angle tolerance in degrees (maximum
allowed change in the bond vector angle for side chains,
relative to the input structure)
:type ch_ang_tol: float
:param bond_reactions: dict specifying which reactions are allowed to
break certain bonds. Keys are tuples of two ints
(sorted atom indexes); values are sets of
reaction names.
:type bond_reactions: {(int, int): set(str)}
:param prefilter_reagents: if given, keep only this many of the most
similar reagents from each reagent file, based on similarity
to the first reference molecule
:type prefilter_reagents: int or NoneType
:param fp_dir: directory to search for fingerprint files in addition to CWD
:type fp_dir: str or NoneType
:param fp_url: base URL to try to download fingerprint files from
:type fp_url: str or NoneType
:param fp_namespace: optional subdirectory of `fp_dir` and of the URL to
use when looking for fingerprint files
:type fp_namespace: str or NoneType
:param forward: use forward analysis mode (routes start from
`mol` instead of ending there)
:type forward: bool
:param prng: pseudo-random number generator to be used for the
enumeration, for picking the route and reactants to try at
each iteration
:type prng: random.Random
:param route_prng: pseudo-random number generator to be used for
selecting the subset of routes to use, based on
max_routes. If not supplied, `prng` will be used.
:type route_prng: random.Random or NoneType
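
Example (an illustrative sketch, assuming the default reaction
dictionary; `mol` is an RDKit molecule)::

    enumerator = MultiRouteEnumerator(mol, None, max_routes=50)
    for product in enumerator.generate_mols():
        print(Chem.MolToSmiles(product))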
"""
self.mol = mol
self.reactions_dict = reactions_dict or reaction.get_reactions()
self.dedup = dedup
self.depth = depth
self.descriptors = descriptors
self.forward = forward
self.frozen_atoms = frozen_atoms
self.libpath = libpath
self.max_per_route = max_per_route
self.max_routes = max_routes
self.no_core_hopping = no_core_hopping
self.product_property_filter_file = product_property_filter_file
self.product_smarts_filter_file = product_smarts_filter_file
self.ch_dist_tol = ch_dist_tol
self.ch_ang_tol = ch_ang_tol
self.bond_reactions = bond_reactions
self.prefilter_reagents = prefilter_reagents
self.fp_dir = fp_dir
self.fp_url = fp_url
self.fp_namespace = fp_namespace
self.prng = prng
self.route_prng = route_prng or prng
if isinstance(ref_mols, str):
self.ref_mols = list(molio.get_mol_reader(ref_mols))
self.ref_mol = self.ref_mols[0]
else:
self.ref_mols = ref_mols
self.ref_mol = ref_mols[0] if ref_mols else None
if self.no_core_hopping:
self.ref_measurements, self.core_atoms, self.core_neighbors = (
{}, set(), set())
else:
self.ref_measurements, self.core_atoms, self.core_neighbors = \
analyze_frozen_atoms(mol, frozen_atoms)
self.frozen_atom_maps = []
def generate_mols(self):
"""
:rtype: generator of Mol
"""
samples = self._generate_samples()
unfiltered_products = meta_sample(samples, self.dedup, prng=self.prng)
products = filtering.add_filters(
unfiltered_products,
ref_mols=self.ref_mols,
property_filter_file=self.product_property_filter_file,
smarts_filter_file=self.product_smarts_filter_file,
descriptors=self.descriptors)
products = self._apply_core_hopping_filters(products)
return products
def _apply_core_hopping_filters(self, products):
"""
Generator to filter `products` to exclude those in which any measurement
of distance and angles between side chains differs too much from the
reference measurements. (If there are no reference measurements, all
products pass).
:param products: products to filter
:type products: iterator of rdkit.Chem.Mol
:return: molecules meeting the geometric criteria
:rtype: generator of rdkit.Chem.Mol
"""
for mol in products:
ok = True
if self.ref_measurements:
edge_atoms = self._get_edge_atoms(mol) # 1-based
st = st_from_mol(mol)
for (r1, tgt_r1), (r2, tgt_r2) in itertools.combinations(
        edge_atoms, 2):
path = Chem.GetShortestPath(mol, r1 - 1, r2 - 1) # 0-based
if len(path) < 3:
continue
c1, c2 = path[1] + 1, path[-2] + 1
dist, angle = measure_vectors(st, r1, c1, r2, c2)
key = tuple(sorted([tgt_r1, tgt_r2]))
logger.debug('prod: %.2f, %.1f' % (dist, angle))
ref_dist, ref_angle = self.ref_measurements[key]
logger.debug('ref: %.2f, %.1f' % (ref_dist, ref_angle))
if (abs(dist - ref_dist) > self.ch_dist_tol or
abs(angle - ref_angle) > self.ch_ang_tol):
ok = False
break
if ok:
yield mol
def _get_edge_atoms(self, mol):
"""
Return a list of tuples (index, ref_index) for atoms which were labeled
as attachment atoms and came from frozen components.
:param mol: molecule
:type mol: rdkit.Chem.Mol
:return: list of (index, ref_index), both 1-based, where "index" refers to
`mol` and "ref_index" is the index of the corresponding atom in the
input molecule.
:rtype: list of (int, int)
"""
atoms = []
route_idx = mol.GetIntProp(ROUTE_ID_PROP)
for atom in mol.GetAtoms():
try:
label = atom.GetIntProp(synthesis.ATOM_LABEL_PROP)
except KeyError:
continue
try:
ref_idx = self.frozen_atom_maps[route_idx][label]
except KeyError:
# Not all frozen atoms are edge atoms, so skip.
continue
atoms.append((atom.GetIdx() + 1, ref_idx))
return atoms
def _getRetrosyntheticRoutes(self):
"""
Perform a retrosynthetic analysis of `self.mol` and return routes.
"""
logger.info('Performing retrosynthetic analysis...')
if self.depth is not None:
depth_range = [self.depth]
else:
depth_range = range(1, AUTO_MAX_DEPTH + 1)
prev_nroutes = -1
for curr_depth in depth_range:
tree = analysis.retrosynthesize(self.mol,
self.reactions_dict,
max_depth=curr_depth,
frozen_atoms=self.frozen_atoms,
label_attachment_atoms=True,
broken_bonds={})
routes = [
r for r in tree.getRoutes(bond_reactions=self.bond_reactions)
if has_variable_reactants(r)
]
nroutes = len(routes)
logger.info(f'{nroutes} routes found with depth <= {curr_depth}.')
if nroutes == prev_nroutes or nroutes >= self.max_routes:
break
prev_nroutes = nroutes
return routes
def _getSyntheticRoutes(self):
"""
Perform a "forward analysis" of `self.mol` and return routes.
"""
logger.info('Performing forward analysis...')
transformer = analysis.Transformer(self.reactions_dict)
transform_routes = transformer.apply(self.mol, self.depth)
logger.info('Functional group transformations generated '
f'{len(transform_routes)} initial routes.')
analyzer = analysis.ForwardAnalyzer(self.reactions_dict)
routes = analyzer.analyzeRoutes(transform_routes, self.depth)
logger.info(f'Generated {len(routes)} synthetic routes.')
return routes
def _generate_samples(self):
"""
Analyze `self.mol`, generate all routes and pick up to max_routes at
random, and finally turn each route into a RandomSampleIterator.
:return: samples
:rtype: list of RandomSampleIterator
"""
if self.forward:
routes = self._getSyntheticRoutes()
else:
routes = self._getRetrosyntheticRoutes()
if len(routes) > self.max_routes:
routes = self.route_prng.sample(routes, self.max_routes)
logger.info(f'Will sample {len(routes)} routes.')
samples = []
# Zeroth element placeholder because routes are numbered from 1.
self.frozen_atom_maps = [{}]
for i, route in enumerate(routes, 1):
logger.debug(
f'Setting up route {i}: {route.getOneLineRepresentation()}')
sources, atom_map = self._get_reagent_sources(route)
self.frozen_atom_maps.append(atom_map)
syn = synthesis.Synthesizer(route,
allow_multiple_products=True,
keep_labels=True)
sample = synthesis.RandomSampleIterator(syn,
sources,
self.max_per_route,
prng=self.prng)
sample.index = i
samples.append(sample)
return samples
def _get_reagent_sources(self, route):
"""
Find the reagent sources for a route.
:param route: route to analyze
:type route: schrodinger.application.pathfinder.route.RouteNode
:return: reagent sources and frozen atom map, the latter being a dict
with frozen atom labels as keys and input molecule atom indexes
as values.
:rtype: ([ReagentSource], {int: int})
"""
sources = []
sizes = []
frozen_atom_map = {}
logger.debug(route.treeAsString())
for index, sm in enumerate(route.getStartingNodes(), 1):
if sm.reagent_class:
if is_core_sm(sm, self.core_atoms):
reagent_class = CORES
else:
reagent_class = sm.reagent_class
filename = reagent_class.findReagentFile(self.libpath)
mols = self._get_mol_supplier(filename)
source = filename
else:
route.updateTargetIndexMap(sm.getTargetIndexMap(index))
mols = sm.getLabeledMols(index)[:1]
self._update_frozen_atom_map(mols[0], self.core_neighbors,
frozen_atom_map)
source = 'SMILES'
size = len(mols)
logger.debug(f'{index}: {source} ({size:,})')
sources.append(
synthesis.ReagentSource(mols,
size,
source,
sm.reagent_class,
index,
has_frozen_atoms=bool(sm.frozen_atoms)))
return sources, frozen_atom_map
def _update_frozen_atom_map(self, mol, core_neighbors, atom_map):
"""
Update the `atom_map` by adding keys for each frozen atom that was
labeled as an "attachment atom" by the retrosynthesis and with the
label (which is the atom index in the original input molecule) as a
value.
As a side effect, also removes the attachment atom labels.
:param mol: molecule to modify (the attachment atom map numbers are
cleared in place)
:type mol: rdkit.Chem.rdchem.Mol
:param core_neighbors: indices of atoms directly bound to the core
:type core_neighbors: set of int
:param atom_map: {frozen_atom_label: target_atom_idx}, updated in place
:type atom_map: {int: int}
"""
for atom in mol.GetAtoms():
num = atom.GetAtomMapNum()
if num:
atom.SetAtomMapNum(0)
if num in core_neighbors:
iso = atom.GetIsotope()
atom_map[iso] = num
@functools.lru_cache(maxsize=256)
def _get_mol_supplier(self, filename):
"""
Read a reagent file. Calls are cached so if the function is called again
with the same arguments, the return value will be reused for speed.
If a positive value is given for prefilter_reagents and a reference
molecule is provided, the reactant file is filtered down to the top
`prefilter_reagents` molecules, ranked by similarity to the reference.
:param filename: filename
:type filename: str
:return: reactants
:rtype: rdkit.Chem.rdmolfiles.SmilesMolSupplier or list of
rdkit.Chem.rdchem.Mol
"""
# The caching speeds the setup of the job by at least an order of
# magnitude. In practice, let's say we are using 100 routes, but most of
# them share the same reagent files, so each file might be read at least
# 20 times. Currently we only have 82 reagent files, so even if a ligand
# managed to use them all (which is unlikely), we are still pretty far
# from filling the cache.
logger.debug(f'reading {filename}')
mols = molio.get_mol_reader(filename, skip_bad=False)
if self.prefilter_reagents:
size = self.prefilter_reagents
if self.fp_namespace is not None:
subdir = self.fp_namespace
else:
subdir = ''
fp_path = get_fp_file(filename,
self.fp_dir,
url_base=self.fp_url,
subdir=subdir)
if fp_path:
logger.info(
f'Using top {size} reagents by similarity: {filename}')
query_smiles = Chem.MolToSmiles(self.ref_mol)
query_fp = fingerprint_utils.smiles_to_fingerprint(query_smiles)
_, _, indexes, _ = fingerprint_utils.rank_reactants(fp_path,
query_fp,
size,
alpha=1.0,
beta=1.0)
mols = [mols[i] for i in indexes]
else:
logger.warning(
f'Fingerprint file not found for: {filename}. '
'Will skip similarity prefiltering for this reagent class.')
return mols
def get_fp_file(reactant_file, cache_dir, url_base=None, subdir=''):
"""
Get a fingerprint file for a given reactant file. First look at the cache
dir; if not found, and a URL is supplied, try to download it from the server
and write it to the cache dir.
For a reactant file named foo.pfx, the fingerprint file must be named
foo-<sha1>.fp, where <sha1> is the SHA1 hash of foo.pfx.
:param reactant_file: reactant file for which we are looking for
fingerprints
:type reactant_file: str
:param cache_dir: local directory where fingerprint files are/will be stored
:type cache_dir: str
:param url_base: base URL to try to download fingerprint files from
:type url_base: str or NoneType
:param subdir: optionally, subdirectory of cache_dir and URL to use
:type subdir: str
:return: path to fingerprint file, if found; else None
:rtype: str or NoneType
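
Example (an illustrative sketch; the file names and URL are
hypothetical)::

    fp_path = get_fp_file('amines.pfx', '/tmp/fp_cache',
                          url_base='https://example.com/fps')
    if fp_path is None:
        pass  # no cached or downloadable fingerprints for this file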
"""
if cache_dir is None:
return None
fp_basename = get_fp_basename(reactant_file)
fp_dir = os.path.join(cache_dir, subdir)
fp_path = os.path.join(fp_dir, fp_basename)
# Try locally
if os.path.isfile(fp_path):
logger.info(f'Using cached {fp_path}')
return fp_path
# Try remote
if url_base:
if subdir:
fp_url = f'{url_base}/{subdir}/{fp_basename}.gz'
else:
fp_url = f'{url_base}/{fp_basename}.gz'
fileutils.mkdir_p(fp_dir)
with get_lock(fp_path, max_wait=180):
if os.path.isfile(fp_path):
logger.info(f'Using cached {fp_path}')
return fp_path
else:
try:
return download_and_decompress(fp_url, fp_path)
except requests.exceptions.ConnectionError as e:
logger.error(e)
return None
return None
def download_and_decompress(url, dest):
"""
GET a gzip-compressed file from a URL and write it out, decompressed, to
the local filesystem. The contents are first downloaded to a temporary file and
then renamed atomically. A locking mechanism is employed to try to prevent
concurrent downloads of the same file.
:param url: URL to download
:type url: str
:param dest: destination filename
:type dest: str
:return: `dest` if successful, else None (e.g. in case of 404)
:rtype: str or NoneType
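
Example (an illustrative sketch; the URL and path are hypothetical)::

    path = download_and_decompress(
        'https://example.com/fps/amines-1a2b3c.fp.gz',
        '/tmp/fp_cache/amines-1a2b3c.fp')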
"""
logger.info(f'GET {url}')
response = requests.get(url, stream=True, timeout=10)
if response.status_code == 200:
# wbits=31 means maximum window size, expect gzip header.
decompressor = zlib.decompressobj(wbits=31)
fp_dir = os.path.dirname(dest)
with tempfile.NamedTemporaryFile(dir=fp_dir, delete=False) as fh:
for chunk in response.iter_content(CHUNK_SIZE):
fh.write(decompressor.decompress(chunk))
try:
os.rename(fh.name, dest)
except FileExistsError:
# On Windows, someone else wrote the file already (maybe
# thanks to our imperfect locking scheme), but that's OK.
pass
return dest
else:
logger.info(f'GET failed: {response.status_code}')
return None
def get_fp_basename(reactant_file):
"""
Return the basename of the fingerprint file corresponding to the given
reactant file, following the convention that for a reactant file named
foo.pfx, the fingerprint file must be named foo-<sha1>.fp, where <sha1> is
the SHA1 hash of foo.pfx.
:param reactant_file: reactant file
:type reactant_file: str
:return: basename of fingerprint file
:rtype: str
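
Example (an illustrative sketch; the hash shown is hypothetical)::

    get_fp_basename('/path/to/amines.pfx')
    # -> 'amines-<40-character hex SHA1>.fp'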
"""
base = fileutils.get_basename(reactant_file)
checksum = get_sha1(reactant_file)
return f'{base}-{checksum}.fp'
def get_sha1(filename):
"""
Return the SHA1 hash of a file.
:param filename: input file
:type filename: str
:return: hex SHA1 digest of file contents
:rtype: str
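
Example (a minimal doctest-style sketch)::

    >>> import tempfile
    >>> with tempfile.NamedTemporaryFile(delete=False) as fh:
    ...     _ = fh.write(b'hello')
    >>> get_sha1(fh.name)
    'aaf4c61ddcc5e8a2dabede0f3b482cd9aea9434d'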
"""
h = hashlib.sha1()
with open(filename, 'rb') as fh:
while True:
chunk = fh.read(CHUNK_SIZE)
if not chunk:
break
h.update(chunk)
return h.hexdigest()
@contextlib.contextmanager
def get_lock(basename, max_wait, interval=1.0):
"""
Create a <basename>.lock file on entry and remove it on exit. If the file
already exists, wait up to `max_wait` seconds for the lock to clear. If the
timeout is exceeded, assume that the lock is stale and ignore it.
This is a very rudimentary mechanism, but is good enough for the purposes of
this module, which is just to _try_ to prevent simultaneous downloads of the
same file, but where occasional collisions don't hurt beyond the slight
waste of bandwidth and temporary use of disk space.
:param basename: basename of lock file
:type basename: str
:param max_wait: maximum wait in seconds
:type max_wait: float
:param interval: time to sleep between attempts, in seconds
:type interval: float
:return: context manager that removes lock file on exit
:rtype: contextlib._GeneratorContextManager
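
Example (an illustrative sketch; the path is hypothetical)::

    with get_lock('/tmp/fp_cache/amines.fp', max_wait=180):
        pass  # download or write /tmp/fp_cache/amines.fp here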
"""
lockfile = basename + '.lock'
try:
deadline = os.stat(lockfile).st_mtime + max_wait
except FileNotFoundError:
deadline = None
if deadline is not None:
logger.info(f'Waiting for lock {lockfile} to clear...')
while time.time() <= deadline:
time.sleep(interval)
try:
deadline = os.stat(lockfile).st_mtime + max_wait
except FileNotFoundError:
break # Lock is gone
else:
logger.info('Lock file is presumed stale '
f'(older than {max_wait} s).')
with open(lockfile, 'wb') as fh:
pass
try:
yield
finally:
fileutils.force_remove(lockfile)
def has_variable_reactants(route):
"""
Check if the route has at least one variable reactant.
:param route: route to analyze
:type route: schrodinger.application.pathfinder.route.RouteNode
:return: does the route have at least one variable reactant?
:rtype: bool
"""
return any(sm.reagent_class for sm in route.getStartingNodes())
def is_core_sm(sm, core_atoms):
"""
Check if a starting material node corresponds to a core.
:param sm: starting material
:type sm: schrodinger.application.pathfinder.route.ReagentNode
:param core_atoms: core atom indices (1-based)
:type core_atoms: set of int
:rtype: bool
"""
if core_atoms:
mol = Chem.MolFromSmiles(sm.smiles_list[0])
attachment_atoms = set()
for atom in mol.GetAtoms():
num = atom.GetAtomMapNum()
if num:
attachment_atoms.add(num)
return attachment_atoms <= core_atoms
return False
def measure_vectors(st, r1, c1, r2, c2):
"""
Measure the distance and angle between two bond vectors. The distance is
measured between atoms r1 and r2; the angle is between the c1-r1 and c2-r2
vectors.
:param st: structure to measure
:type st: schrodinger.structure.Structure
:param r1: R-group attachment atom 1
:type r1: int
:param c1: core atom 1
:type c1: int
:param r2: R-group attachment atom 2
:type r2: int
:param c2: core atom 2
:type c2: int
:return: distance and angle
:rtype: float, float
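
Example (an illustrative sketch; the atom indexes refer to a
hypothetical structure with bond vectors 2->1 and 4->5)::

    dist, angle = measure_vectors(st, 1, 2, 5, 4)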
"""
dist = st.measure(r1, r2)
v1 = np.array(st.atom[r1].xyz) - np.array(st.atom[c1].xyz)
v2 = np.array(st.atom[r2].xyz) - np.array(st.atom[c2].xyz)
angle = math.degrees(transform.get_angle_between_vectors(v1, v2))
return dist, angle
def st_from_mol(mol):
"""
Convert a Mol into a Structure, with 3D coordinates but no added hydrogens.
Missing stereochemistry is tolerated.
:param mol: molecule
:type mol: rdkit.Chem.rdchem.Mol
:return: Structure
:rtype: schrodinger.structure.Structure
"""
st = rdkit_adapter.from_rdkit(mol)
st.generate3dConformation(require_stereo=False)
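# generate3dConformation() adds explicit hydrogens, which are appended
# after the original atoms; delete them so the atom numbering stays
# aligned with `mol`.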
added_hydrogens = range(mol.GetNumAtoms() + 1, st.atom_total + 1)
st.deleteAtoms(added_hydrogens)
return st
def is_core(graph, free_component, frozen_components):
"""
Check if the `free_component` subgraph should be considered a core, meaning
that it is connected to more than one of the `frozen_components`.
:param graph: molecular graph
:type graph: networkx.classes.graph.Graph
:param free_component: possible core atom indices
:type free_component: set of int
:param frozen_components: possible sidechain atom indexes
:type frozen_components: list of set of int
:return: is it a core?
:rtype: bool
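
Example (a toy path graph 0-1-2-3-4 whose two ends are frozen)::

    >>> import networkx as nx
    >>> graph = nx.path_graph(5)
    >>> is_core(graph, {2}, [{0, 1}, {3, 4}])
    True
    >>> is_core(graph, {0, 1}, [{3, 4}])
    False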
"""
neighbors = set().union(*[graph.neighbors(n) for n in free_component])
n = 0
for comp in frozen_components:
n += bool(neighbors & comp)
return n > 1
def apply_similarity_filters(products, args):
"""
Implement the -sim_keep_percent and -sim_discard_percent functionality.
:param products: molecules to filter
:type products: iterator of rdkit.Chem.Mol
:param args: command-line arguments
:type args: `argparse.Namespace`
:return: filtered products and number of products to keep
:rtype: generator of rdkit.Chem.Mol, int
"""
nkeep = None
if args.sim_keep_percent is not None:
nkeep = args.max_products * args.sim_keep_percent / 100.0
reverse = False
elif args.sim_discard_percent is not None:
nkeep = args.max_products * (1.0 - args.sim_discard_percent / 100.0)
reverse = True
if nkeep is not None:
filtered_products = filtering.keep_top_n(products,
nkeep,
args.max_products,
prop='r_reaction_similarity_1',
reverse=reverse)
return filtered_products, nkeep
else:
return products, args.max_products
def analyze_frozen_atoms(mol, frozen_atoms):
"""
Examine the molecular graph of the input molecule to partition it, based on
the set of frozen atoms, into free regions and frozen regions. Also
determine which free region is the core, if any.
A core in this context is a contiguous set of non-frozen atoms which is
adjacent to two or more sets of frozen atoms (the side chains).
Jobs with two or more cores will abort immediately.
When there is a core, also measure the distances and angles between all the
pairs of vectors leading from the core to the side chains. The resulting
dict has pairs of atoms as keys, and (distance, angle) tuples as values.
In addition to the set of core atoms, a set of "core neighbors" is also
returned. These are the non-core atoms that are directly connected to the
core.
:param mol: input molecule
:type mol: rdkit.Chem.rdchem.Mol
:param frozen_atoms: frozen atom indices
:type frozen_atoms: set of int
:return: measurements, core atoms, core neighbors
:rtype: dict {(int, int): (float, float)}, set of int, set of int
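
Example (an illustrative sketch; the frozen atom indexes are
hypothetical)::

    measurements, core_atoms, core_neighbors = analyze_frozen_atoms(
        mol, frozen_atoms={1, 2, 3, 10, 11, 12})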
"""
st = st_from_mol(mol)
graph = analyze.create_nx_graph(st)
frozen_graph = nx.subgraph(graph, frozen_atoms)
frozen_components = list(nx.connected_components(frozen_graph))
free_graph = graph.copy()
free_graph.remove_nodes_from(n for n in graph if n in frozen_graph)
free_components = list(nx.connected_components(free_graph))
logger.info(f'Frozen regions: {frozen_components}')
logger.info(f'Free regions: {free_components}')
# Figure out which of these are cores...
core_atoms = set()
core_neighbors = set()
measurements = {}
for comp in free_components:
if is_core(graph, comp, frozen_components):
if core_atoms:
raise ValueError('Too many cores.')
core_atoms = comp
core_neighbors = set().union(
*[graph.neighbors(n) for n in core_atoms]) - core_atoms
if core_atoms:
logger.info(f'Core_atoms: {sorted(core_atoms)}')
# Find attachment atoms
attachment_atoms = []
for comp in frozen_components:
att = list(comp & core_neighbors)
if len(att) != 1:
raise ValueError(
'Core-hopping only supports R-groups attached via one bond')
attachment_atoms.append(att[0])
for r1, r2 in itertools.combinations(attachment_atoms, 2):
path = nx.shortest_path(graph, r1, r2)
logger.debug(f'Core path: {path}')
dist, angle = measure_vectors(st, path[0], path[1], path[-1],
path[-2])
key = tuple(sorted([r1, r2]))
measurements[key] = (dist, angle)
logger.info('Reference measurements:')
logger.info(' atom1 atom2 dist angle')
for (a1, a2), (d, ang) in measurements.items():
logger.info('%6d %6d %8.2f %8.1f' % (a1, a2, d, ang))
return measurements, core_atoms, core_neighbors
def generate_mols(*a, **d):
"""
A generator of products following the multiroute enumeration protocol.
This is a thin functional wrapper around the MultiRouteEnumerator class. For
arguments, see MultiRouteEnumerator.__init__.
:rtype: generator of Mol
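
Example (an illustrative sketch, assuming the default reaction
dictionary)::

    from rdkit import Chem
    mol = Chem.MolFromSmiles('CC(=O)Nc1ccc(O)cc1')
    for product in generate_mols(mol, None, max_routes=10):
        print(Chem.MolToSmiles(product))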
"""
enumerator = MultiRouteEnumerator(*a, **d)
return enumerator.generate_mols()
def write_products(products, filename, max_products, has_frozen_atoms=False):
"""
Write out up to `max_products` from a mol iterator to a file.
:param products: molecules to write
:type products: iterator of Mol
:param filename: filename
:type filename: str
:param max_products: maximum number of structures to write
:type max_products: int
:param has_frozen_atoms: True if user specified frozen atoms
:type has_frozen_atoms: bool
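
Example (an illustrative sketch; `mol` is an RDKit molecule and the
output file name is arbitrary)::

    products = generate_mols(mol, None)
    write_products(products, 'products.smi', max_products=1000)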
"""
nproducts = 0
backend = jobcontrol.get_backend()
percent_done = 0
align_products = (fileutils.is_maestro_file(filename) and has_frozen_atoms)
with get_output_writer(align_products, filename) as writer:
mol_aligner = aligner.AlignedMolStructureGenerator()
for product in products:
logger.debug(
'%s %s' %
(product.GetIntProp(ROUTE_ID_PROP), Chem.MolToSmiles(product)))
output = product
if align_products:
output = mol_aligner.generateAlignedStructure(product)
writer.append(output)
nproducts += 1
new_percent = int(nproducts / max_products * 100)
if new_percent > percent_done:
percent_done = new_percent
logger.info(f'{percent_done}% done')
if backend and nproducts % 100 == 0:
backend.setJobProgress(steps=nproducts, totalsteps=max_products)
if nproducts >= max_products:
break
logger.info(f'Wrote {nproducts:,} products to {filename}')
def get_output_writer(align_products, filename):
"""
If we're generating Maestro structures (.mae, .mae.gz, .maegz), we need
to generate structures, align them, and then write them out via a
StructureWriter. If not, we can write products out with the MolWriter.
:param align_products: whether products will be aligned before writing
:type align_products: bool
:param filename: output filename
:type filename: str
:return: writer for the output file
:rtype: schrodinger.structure.StructureWriter or MolWriter
"""
try:
# We need to generate and align structures
if align_products:
return structure.StructureWriter(filename)
# we can let the MolWriter handle structure generation
return molio.get_mol_writer(filename)
except ValueError as e:
sys.exit(e)