"""
Utility functions and classes for MatSci workflows.
Copyright Schrodinger, LLC. All rights reserved.
"""
import collections
import copy
import contextlib
import csv
import functools
import os
import re
import shutil
import textwrap
import warnings
from distutils import util as dutil
from collections.abc import Mapping
import numpy
from rdkit import Chem
from requests.packages.urllib3.exceptions import InsecureRequestWarning
import shlex
from contextlib import nullcontext
import schrodinger
from schrodinger import structure
# Matsci modules except msprops and codeutils should not be imported here
from schrodinger.application.desmond import cms
from schrodinger.application.matsci import codeutils
from schrodinger.application.matsci import msprops
from schrodinger.infra import mm
from schrodinger.structutils import analyze
from schrodinger.thirdparty import rdkit_adapter
from schrodinger.utils import fileutils
from schrodinger.utils.license import is_opls2_available
from schrodinger.test import ioredirect
maestro = schrodinger.get_maestro()
APP_DIR = 'matsci_templates'
# -2 is the official dummy atomic number, but some sources use 0, such as
# Maestro builder ligands (MATSCI-6328, SHARED-6160)
DUMMY_ATOMIC_NUMBERS = {0, -2}
DUMMY_ELEMENT_SYMBOLS = {'Du', 'DU'}
DUMMY_ATOM_NAMES = {'Xpi'} # Jaguar mmjag uses this
PROP_TYPE_GET_DICT = {
'r': mm.mmct_atom_property_get_real,
'b': mm.mmct_atom_property_get_bool,
'i': mm.mmct_atom_property_get_int,
's': mm.mmct_atom_property_get_string,
}
RMTREE_FAILED = 'rmtree_failed'
Forcefield = collections.namedtuple('Forcefield', 'version name')
SDGR_INDEX = 'schrodinger_index'
[docs]def get_default_forcefield():
"""
Returns information for S-OPLS if license is found. If no license is found,
returns information for OLPS2005, which requires no license.
:returns namedtuple forcefield: Named 2-tuple containing the `version` and
`name` of the default forcefield, respectively
"""
# OPLS_2005 does not allow ZOB to sp3 carbon while OPLS3 (now known as
# S-OPLS) does
if is_opls2_available():
forcefield = Forcefield(mm.OPLSVersion.F16, mm.OPLS_NAME_F16)
# Fall back to OPLS_2005 if an S-OPLS license is not available
else:
forcefield = Forcefield(mm.OPLSVersion.F14, mm.OPLS_NAME_F14)
return forcefield
[docs]def remove_properties(struct,
props=None,
matches=None,
atom_props=None,
atom_matches=None):
"""
Remove all the matching structure and atom properties. No error is thrown if
the given properties do not actually exist on the structure.
:type struct: `structure.Structure` or `cms.Cms`
:param struct: The structure to remove properties from
:type props: list
:param props: A list of structure properties to delete
:type matches: list
:param matches: Remove all structure properties whose name contains any of
these strings
:type atom_props: list
:param atom_props: A list of atom properties to delete
:type atom_matches: list
:param atom_matches: Remove all atom properties whose name contains any of
these strings
"""
def _get_proplist(prop_names, given, strings):
if given is None:
proplist = []
else:
proplist = given[:]
if strings is not None:
for name in prop_names:
for astring in strings:
if astring in name:
proplist.append(name)
return proplist
is_cms = isinstance(struct, cms.Cms)
for prop in _get_proplist(struct.getPropertyNames(), props, matches):
if is_cms:
struct.remove_cts_property(prop)
else:
struct.property.pop(prop, None)
for prop in _get_proplist(struct.getAtomPropertyNames(), atom_props,
atom_matches):
remove_atom_property(struct, prop)
[docs]def remove_atom_property(struct, prop):
"""
Delete atom property from all atoms in a structure
(structure will be modified).
:type struct: `structure.Structure` or `cms.Cms`
:param struct: Structure object to be modified
:type prop: str
:param prop: Atom property to be removed
"""
if isinstance(struct, cms.Cms):
remove_cms_atom_property(struct, prop)
try:
# The below call is vastly faster than iterating over all atoms
mm.mmct_atom_property_delete(struct, prop, mm.MMCT_ATOMS_ALL)
except mm.MmException as mmexc:
if mmexc.rc != mm.MMCT_ATOM_PROPERTY_NOT_DEFINED_IN_CT:
# Should not happen, but just to be sure we don't silently
# ignore important errors...
raise
[docs]@codeutils.deprecate(to_remove_in='22-1',
replacement=cms.Cms.remove_cts_property)
def remove_cms_property(cms_model, propname):
"""
Delete a property from a cms model
:param cms_model: cms model
:type cms_model: `cms.Cms`
:param propname: property name
:type propname: str
"""
cms_model.remove_cts_property(propname)
[docs]def remove_cms_atom_property(cms_model, propname):
"""
Delete an atom property from a cms model
:param cms_model: cms model
:type cms_model: `cms.Cms`
:param propname: property name
:type propname: str
"""
for struct in [cms_model._raw_fsys_ct, cms_model.fsys_ct
] + cms_model.comp_ct:
remove_atom_property(struct, propname)
[docs]def has_atom_property(struct, prop):
"""
Check if structure has any atom with the property set.
:param structure.Structure: Input structure
:param str: Property name
:raise KeyError: If property name doesn't start with: `s_`, `r_`, `i_`, `b_`
:raise mm.MmException: If unexpected error occurred
:return bool: True of property is present, False otherwise
"""
if struct.atom_total == 0:
return False
# get function based on the property first char
funct = PROP_TYPE_GET_DICT[prop[0]]
try:
funct(struct, prop, 1)
except mm.MmException as mmexc:
if mmexc.rc == mm.MMCT_ATOM_PROPERTY_NOT_DEFINED_IN_CT:
# Property does not exist on any atom
return False
elif mmexc.rc == mm.MMCT_ATOM_PROPERTY_UNDEFINED_ATOM:
# Property exists on at least one atom, but not this one
return True
else:
# An unexpected error occurred, don't be silent about it
raise
return True
[docs]def getstr(ret):
"""
Convert binary string (or other data) to str.
:type ret: binary_type or any other type convertable to str
:param ret: Value to be converted to str
:rtype: str
:return: Value converted to str
"""
if isinstance(ret, bytes):
ret = ret.decode()
return str(ret)
[docs]def get_project_group_hierarchy(st=None, row=None):
"""
Return the project group hierarchy for the given structure or
row.
:type st: `schrodinger.structure.Structure`
:param st: the structure
:type row: `schrodinger.project.ProjectRow`
:param row: the project row
:raise ValueError: if there is an issue
:rtype: list
:return: the hierarchy (outermost to innermost)
"""
if (not st and not row) or (st and row):
msg = ('Either a structure or a project row must be given.')
raise ValueError(msg)
if st:
hierarchy = st.property.get(mm.M2IO_DATA_SUBGROUP_TITLE)
if hierarchy:
return hierarchy.split(mm.M2IO_SUBGROUP_SEPARATOR)
eid = st.property.get(msprops.ENTRY_ID_PROP)
if eid is None:
msg = ('Structure has no entry ID.')
raise ValueError(msg)
if not maestro:
msg = ('No Maestro session is active.')
raise ValueError(msg)
pt = maestro.project_table_get()
row = pt.getRow(eid)
if not row:
msg = ('Structure is not in the project.')
raise ValueError(msg)
if row:
hierarchy = row.property.get(mm.M2IO_DATA_SUBGROUP_TITLE)
if hierarchy:
return hierarchy.split(mm.M2IO_SUBGROUP_SEPARATOR)
hierarchy = []
group = row.group
while group:
hierarchy.append(group.name)
group = group.getParentGroup()
hierarchy.reverse()
return hierarchy
[docs]def set_project_group_hierarchy(st, hierarchy, collapsed=False):
"""
Set the project group hierarchy for the given structure.
:type st: `schrodinger.structure.Structure`
:param st: the structure
:type hierarchy: list
:param hierarchy: the hierarchy (outermost to innermost)
:param bool collapsed: Whether the group should initially be collapsed
"""
if hierarchy:
hierarchy = mm.M2IO_SUBGROUP_SEPARATOR.join(hierarchy)
st.property[mm.M2IO_DATA_SUBGROUP_TITLE] = hierarchy
st.property[mm.M2IO_DATA_SUBGROUPID] = hierarchy
st.property[mm.M2IO_DATA_SUBGROUP_COLLAPSED] = collapsed
[docs]def get_matsci_user_data_dir():
"""
Get the absolute path to the user's local MatSci data directory for storing
custom templates, protocols, etc. Directory is created if it doesn't exist.
:rtype: str
:return: The absolute path the Materials Science data parent directory
"""
data_dir = os.path.join(
fileutils.get_directory_path(fileutils.LOCAL_APPDATA), APP_DIR)
fileutils.mkdir_p(data_dir)
return data_dir
[docs]def structure_reader(filename, log=None, do_raise=False):
"""
Read structures from a file until the end or the first structure with an
error.
:param str filename: filename
:param function log: Log function, if None, nothing is called
:param bool do_raise: Whether to raise on an exception
:yield schrodinger.structure.Structure: Next structure in the file
"""
# Send to /dev/null if log is None
log = log if log else lambda x: None
try:
with structure.StructureReader(filename) as reader:
yield from reader
except:
log('Failed to read a structure from %s.' % filename)
if do_raise:
raise
[docs]def is_dummy_atom(atom):
"""
Return True if the given atom is a dummy atom.
:type atom: schrodinger.structure._StructureAtom
:param atom: the atom
:rtype: bool
:return: return True if the given atom is a dummy atom
"""
return (atom.atomic_number in DUMMY_ATOMIC_NUMBERS or
atom.element in DUMMY_ELEMENT_SYMBOLS or
atom.name in DUMMY_ATOM_NAMES)
[docs]def validate_no_dummy_atoms(structs):
"""
Validate that the passed structures don't have dummy atoms
:param iterable structs: Structures to check
:rtype: bool or (bool, str)
:rtype: True if no structures has dummy atoms, False and error message if
at least one structure does
"""
for struct in structs:
if any(is_dummy_atom(atom) for atom in struct.atom):
return False, (f'At least one structure ({struct.title}) has dummy '
'atoms and cannot be used with this workflow.')
return True
[docs]def add_or_update_bond_order(ct, atom1, atom2, bond_order):
"""
Create a new bond, or update the existing bond order of this bond.
:type ct: schrodinger.structure.Structure
:type atom1: schrodinger.structure._StructureAtom
:type atom2: schrodinger.structure._StructureAtom
:type bond_type: int (0-3)
:rtype: schrodinger.structure._Bond
"""
bond = ct.getBond(atom1, atom2)
if not bond:
return ct.addBond(atom1, atom2, bond_order)
bond.order = bond_order
return bond
[docs]def add_or_update_bond_type(ct, atom1, atom2, bond_type):
"""
Create a new bond, or update the existing bond type of this bond.
:type ct: schrodinger.structure.Structure
:type atom1: schrodinger.structure._StructureAtom
:type atom2: schrodinger.structure._StructureAtom
:type bond_type: schrodinger.structure.BondType
:rtype: schrodinger.structure._Bond
"""
bond = ct.getBond(atom1, atom2)
if not bond:
return ct.addBond(atom1, atom2, bond_type)
bond.type = bond_type
return bond
[docs]def trim_str(text, max_len, suffix='...'):
"""
Trim the string to approximately max_len. Add a suffix if the string is
longer than max_len.
:param text: String to trim
:param int max_len: Max length of the string
:param str suffix: Suffix to add if the string is to be trimmed
:return str: Trimmed string
"""
# For usage see msutils_test.py :: test_trim_str
trim_len = len(suffix)
if max_len <= len(suffix):
trim_len = 0
if len(text) > max_len:
return text[:max_len - trim_len] + suffix
return text
[docs]@contextlib.contextmanager
def mmlewis_apply(quiet=True):
"""
Context manager that initializes mm and returns mm.mmlews_apply method.
Example usage:
with msutils.mmlewis_apply() as lewis_apply:
assert lewis_apply(struct) is None
:yield: mm.mmlewis_apply method.
:raise mm.MmException: On mmlewis_apply failure
"""
con_man = ioredirect.IOSilence() if quiet else nullcontext()
mm.mmerr_initialize()
mm.mmlewis_initialize(mm.MMERR_DEFAULT_HANDLER)
with con_man:
yield mm.mmlewis_apply
mm.mmlewis_terminate()
mm.mmerr_terminate()
[docs]def get_atom_ffio_velocity(atom):
"""
Get FFIO atom velocities.
:param structure._StructureAtom atom: Input atom
:return numpy.array: Array of velocities
"""
vel = [
atom.property.get(msprops.FFIO_ATOM_VEL(axis), 0.)
for axis in ['x', 'y', 'z']
]
return numpy.array(vel)
[docs]def set_atom_ffio_velocity(atom, velocity):
"""
Set FFIO atom velocities.
:param structure._StructureAtom atom: Atom to modify
:param list velocity: List of velocities (x, y, z components)
"""
for val, axis in zip(velocity, ['x', 'y', 'z']):
atom.property[msprops.FFIO_ATOM_VEL(axis)] = val
[docs]def get_unique_name(new_name, existing_names):
"""
Add a suffix to new_name to make a unique name if it already exists in
existing_names.
:param str new_name: The new name
:param list existing_names: Existing names
:rtype: str
:return: The unique version of new_name
"""
names_set = set(existing_names)
unique_name = new_name
index = 1
while unique_name in names_set:
unique_name = new_name + f"_{index}"
index += 1
return unique_name
[docs]def get_next_name(name):
"""
Get the next customer facing name. For example, 'xx' gives 'xx (1)' and
'xx (n)' gives 'xx (n + 1)'
:param str name: The name based on which the next is generated
:rtype: str
:return: The unique version of new_name
"""
matches = re.compile('\\(([0-9]*?)\\)$').search(name)
if not matches:
name += ' (1)'
return name
splitted = name.split('(')
splitted[-1] = str(int(splitted[-1][:-1]) + 1) + ')'
name = '('.join(splitted)
return name
[docs]def setting_to_bool(string, empty_is_false=True):
"""
Convert a yes/no/true/false/1/0/on/off type string to a Python boolean
:param str string: The string to convert
:param empty_is_false: If the string is empty or None, return False
:rtype: bool
:return: True if the string is a "true"-y word (TRUE, true, t, yes, on, 1,
etc), False if it is a "false"-y word (FALSE, false, f, no, off, 0).
:raise ValueError: If the string cannot be interpreted in a True/False
manner
:raise AttributeError: If something other than a string is passed in
"""
if empty_is_false and (string is None or string == ""):
return False
return bool(dutil.strtobool(string))
[docs]def flatten(alist, afunc=None):
"""
Flatten the given list into a set.
:type alist: list
:param alist: elements contain iterable data
:type afunc: function or None
:param afunc: function used to extract iterable data
from the given list elements or None if there isn't one
:rtype: set
:return: a flattened set of data from the given list
"""
if afunc is None:
afunc = lambda x: x
return set([j for i in alist for j in afunc(i)])
[docs]def get_unique_ordered_list(l_values):
"""
Remove the duplicates from the list while maintaining the order. If there
are duplicates then the value with lower index is kept
:param l_values: The list of values to make unique
:type l_values: list
:returns: The unique ordered list.
:rtype: list
"""
seen_vals = set()
seen_add = seen_vals.add
return [x for x in l_values if not (x in seen_vals or seen_add(x))]
[docs]def get_atomic_element(atomic_number):
"""Given atomic number return chemical element.
:param int atomic_number: Atomic number
:rtype: str
:return: Chemical element
"""
# TODO: Remove this when SHARED-7629 is implemented
return mm.mmat_get_element_by_atomic_number(atomic_number).rstrip()
[docs]@contextlib.contextmanager
def ignore_ssl_warnings():
"""
Context manager to temporarily ignore InsecureRequestWarning warning.
"""
# Disable InsecureRequestWarning - currently our ssl distribution is platform
# dependent, and our clients are not uniform in their ssl configuration,
# so we cannot use ssl verification.
with warnings.catch_warnings():
warnings.simplefilter('ignore', InsecureRequestWarning)
yield
[docs]def get_index_from_default_name(atom_name):
"""
Find the atom index from string of element name and atom index
:type atom_name: str
:param atom_name: concatenated string of element symbol with
the atom index
:rtype index: int or None
:para atom_index: Atom index
"""
index = re.findall(r'\d+', atom_name)
if not index:
return
return int(index[0])
[docs]def title_case(original,
exceptions=('an', 'of', 'the', 'for'),
skip_single_letters=True):
"""
Convert the string to title case, optionally ignoring articles and single
letters
Examples with default kwargs:
"Number of molecules": "Number of Molecules"
"axis b": "Axis b"
:param str original: The string to make title case
:param tuple exceptions: The words to not capitalize
:param bool skip_single_letters: Whether single letters should not be
capitalized
:rtype: str
:return: The string in title case
"""
parts = re.split(' ', original)
title_parts = [parts[0].capitalize()]
for part in parts[1:]:
if part in exceptions or part.isupper() or \
(skip_single_letters and len(part) == 1):
title_parts.append(part)
else:
title_parts.append(part.capitalize())
return ' '.join(title_parts)
[docs]def generate_smiles(struct):
"""
Return a SMILES string for `st`.
For more options, see the
`schrodinger.structutils.smiles.SmilesGenerator` class.
:type struct: `Structure`
:param struct: Structure for which SMILES string is desired.
:rtype: str
:return: SMILES string representing `st`.
"""
# TODO: investigate RKDIT for generating smiles
return analyze.generate_smiles(struct)
[docs]def get_common_property_names(sts):
"""
Return the property names that all of the given structures
have in common.
:type sts: list[`schrodinger.structure.Structure`]
:param sts: the structures to search
:rtype: set[str]
:return: the common property names
"""
names = set(sts[0].property)
for st in sts[1:]:
names = names.intersection(st.property)
return names
[docs]def get_common_float_property_names(sts):
"""
Return the float property names that all of the given
structures have in common.
:type sts: list[`schrodinger.structure.Structure`]
:param sts: the structures to search
:rtype: set[str]
:return: the common float property names
"""
names = set()
for name in get_common_property_names(sts):
prop = structure.PropertyName(dataname=name)
if prop.type == structure.PROP_FLOAT:
names.add(name)
return names
[docs]def get_common_atom_property_names(sts):
"""
Return the property names that all atoms of the given structures
have in common.
:type sts: list[`schrodinger.structure.Structure`]
:param sts: the structures to search
:rtype: set[str]
:return: the common atom property names
"""
names = None
for st in sts:
# this can be slow for large structures
for atom in st.atom:
if names is None:
names = set(atom.property)
else:
names = names.intersection(atom.property)
return names
[docs]def get_common_float_atom_property_names(sts):
"""
Return the float atom property names that all of the given
structures have in common.
:type sts: list[`schrodinger.structure.Structure`]
:param sts: the structures to search
:rtype: set[str]
:return: the common float atom property names
"""
names = set()
for name in get_common_atom_property_names(sts):
prop = structure.PropertyName(dataname=name)
if prop.type == structure.PROP_FLOAT:
names.add(name)
return names
[docs]def is_coarse_grain(struct, by_atom=False):
"""
Check if struct is a coarse grain structure
:type struct: `schrodinger.structure.Structure`
:param struct: The structure to check
:type by_atom: bool
:param by_atom: If True, check each atom to see if it is coarse grain and
return True if any atom is coarse grin. If False, check only for the coarse
grain structure property. True is useful when the structure has been
obtained via maestro.workspace_get, which removes structure-level
properties, or if the structure may be a mixed atomistic/coarse-grained
structure.
:rtype: bool
:return: True if it is a coarse grain structure, False if not
"""
if not struct:
# Handle cases where None is passed in by panels that do not have
# structures loaded yet
return False
if by_atom:
return mm.mmct_ct_has_coarse_grain_atom(struct)
else:
return struct.property.get(msprops.REP_TYPE_KEY) == msprops.CG_REP_TYPE
[docs]def structure_reader_to_3d(file_path,
require_stereo=False,
out_rep=None,
debug=False):
"""
Read structures from a file and return 3D representations.
:param str file_path: the file, can be of any format supported by
`schrodinger.structure.StructureReader` or
`schrodinger.structure.SmilesReader`
:param bool require_stereo: see
`schrodinger.structure.Structure.generate3dConformation`
:type out_rep: str or None
:param out_rep: For eta-bound transition metal complexes, if None then
the conversion is to the opposite of the given representation, eta
to centroid or centroid to eta, if a string then must be either module
constant parserutils.CENTROID or parserutils.ETA in which case the
conversion will always provide an output representation of the given type
:param bool debug: If debug is True it prints output else it doesn't
:rtype: list[`schrodinger.structure.Structure`]
:return: 3D structures
"""
# importing locally to avoid circular imports
from schrodinger.application.matsci import buildcomplex2d
from schrodinger.application.matsci import etatoggle as tesu
# see MATSCI-10912 where fast3d can not be used to volumize eta-bound
# metal complexes
if file_path.endswith(fileutils.SDF_EXT):
con_man = nullcontext() if debug else ioredirect.IOSilence()
with con_man:
try:
st = buildcomplex2d.ComplexSdfToMae(file_path).run()
except buildcomplex2d.ComplexSdfToMaeException:
# the input sdf could have been of a type not handled by
# buildcomplex2d and so try the standard approach
pass
else:
tesu.toggle_structure(st, out_rep=out_rep)
return [st]
# see SHARED-7822 for adding SMILES support to StructureReader,
# but for now do this manually
try:
sts = list(structure.StructureReader(file_path))
except ValueError as err:
try:
reader = structure.SmilesReader(file_path)
except Exception:
# intentionally only raise the original ValueError rather
# than this Exception
raise err from None
else:
# uses canvas
sts = [smi_st.get2dStructure() for smi_st in reader]
# uses fast3d
for st in sts:
st.generate3dConformation(require_stereo=require_stereo)
return sts
[docs]def keyword_string_to_dict(keystring):
"""
Return a dictionary whose keys are keywords and values are keyword
values
:type keystring: str
:param keystring: The keywords are taken from this string - keywords
must be in the keyword=value format and whitespace delimited.
:rtype: dict
:return: Dictionary of keyword/value pairs
:raise ValueError: if any tokens do not match the keyword=value format
"""
keylist = keystring.split()
keydict = {}
for keyvalue in keylist:
if not keyvalue[0].isalnum():
raise ValueError('Keywords must begin with a letter or number, '
'%s does not' % keyvalue)
msg = 'An invalid keyword syntax was encountered: %s' % keyvalue
try:
key, value = keyvalue.split('=')
except ValueError:
raise ValueError(msg)
if not key or not value:
raise ValueError(msg)
keydict[key] = value
return keydict
[docs]def keyword_dict_to_string(keydict):
"""
Return a string of keywords specified by keydict.
:type keydict: dict
:param keydict: Dictionary - keys are Jaguar keywords, values are keyword
values of str type
:rtype: str
:return: A string of space-separated keyword=value pairs
"""
# 'fix_for_cmdline' argument has been removed (MATSCI-3026)
keystring = ""
for key, value in keydict.items():
keystring += '%s=%s ' % (key, str(value))
keystring = keystring.rstrip()
return keystring
[docs]def get_st_rdkit_mappers(rd_mol):
"""
Gets mapper dictionaries for converting between schrodinger structure atom
index and rdkit molecule atom index
:type rd_mol: `rdkit.Chem.rdchem.Mol`
:param rd_mol: RDMol structure derived from schrodinger structure
:returns: The two dictionaries first one has key as schrodinger structure
atom id and value is the rdkit molecule atom id. The second dictionary
has key as rdkit molecule atom id and value as schrodinger structure
atom id
:rtype: dict, dict
"""
map_st_to_rd, map_rd_to_st = {}, {}
for rd_atom in rd_mol.GetAtoms():
rd_index = rd_atom.GetIdx()
st_index = rd_atom.GetIntProp(SDGR_INDEX)
map_st_to_rd[st_index] = rd_index
map_rd_to_st[rd_index] = st_index
return map_st_to_rd, map_rd_to_st
[docs]class DictCache(collections.OrderedDict):
"""
A first in first out dictionary cache which caches the key and associated
value.
"""
[docs] def __init__(self, maxcount=10000):
"""
Constructs a new lru cache.
:param maxcount: The maximum number of data that the cache can hold
:type maxcount: int
"""
super().__init__()
self.maxcount = maxcount
def __setitem__(self, key, value):
"""
Set the value for the passed key
:param key: The key
:type key: string or tuple
:param value: The value than any dictionary can hold
:type value: string
"""
super().__setitem__(key, value)
# Maintain size limit for the cache
if len(self) > self.maxcount:
self.popitem(last=False)
[docs]class RdkitAdapter:
"""
A class to calculate calculate SMARTS and SMILES pattern for a structure
multiple times. The class can be memory intensive to allow for increased
speed.
"""
PROTECTED_PATTERN_BIT = ['D', 'R', 'r', 'v', 'x', 'X', 'H']
[docs] def __init__(self, struct):
"""
Initiate PatternEvaluator class
:type struct: `schrodinger.structure.Structure`
:param struct: Structure for which patterns need to be selected
"""
self.struct = struct
self.is_coarse_grain = mm.mmct_ct_has_coarse_grain_atom(struct)
if self.is_coarse_grain:
self.rd_mol = self._createCGRdMol()
else:
self.rd_mol = rdkit_adapter.to_rdkit(struct)
self.map_st_to_rd, self.map_rd_to_st = get_st_rdkit_mappers(self.rd_mol)
self._setCacheVariables()
def _setCacheVariables(self):
"""
Create SMILES and SMARTS pattern cache for storing already calculated
values
"""
self._smiles = DictCache()
self._smarts = DictCache()
[docs] def toRdIndices(self, atom_indices):
"""
Convert list of Schrodinger structure atom indices to RDMol atom
indices
:type atom_indices: list
:param atom_indices: list of Schrodinger structure atom indices
:rtype: list
:return: list of RDMol atom indices
"""
return [self.map_st_to_rd[x] for x in atom_indices]
[docs] def toStIndices(self, atom_indices):
"""
Convert list of RDMol atom indices to Schrodinger structure atom
indices
:type atom_indices: list
:param atom_indices: list of RDMol atom indices
:rtype: list
:return: list of Schrodinger structure atom indices
"""
return [self.map_rd_to_st[x] for x in atom_indices]
def _setEleMapper(self):
"""
Create a mapper between cg atom name and element name
"""
# Get the element name and atomic number of RDkit elements which do
# not have valency restrictions
rd_pt = Chem.rdchem.GetPeriodicTable()
rd_ele_allowed = {
atomic_number: rd_pt.GetElementSymbol(atomic_number)
for atomic_number in range(1, 118)
if rd_pt.GetDefaultValence(atomic_number) == -1
}
# Get the coarse grain atom names in the structure
atom_names = sorted((set(x.name for x in self.struct.atom)))
# Remove atom names from the elements allowed due to possible conflict
for aname in atom_names:
if aname in self.PROTECTED_PATTERN_BIT:
raise IndexError(
f'{aname} is a protected name, and coarse grain structures '
f'containing particle named {aname} are not supported.')
if aname in rd_ele_allowed:
rd_ele_allowed.pop(aname)
# Check if enough elements are available to map the atom names
if len(atom_names) > len(rd_ele_allowed):
raise IndexError('Unique coarse grain atom cannot be more than '
f'{len(rd_ele_allowed)}')
# Create mapper between element and atom name
self.ele_name_mapper = {}
self.ele_to_proxy = {}
self.proxy_to_ele = {}
for aname, pt_id in zip(atom_names, rd_ele_allowed):
self.ele_name_mapper[aname] = rd_ele_allowed[pt_id]
self.ele_to_proxy[str(pt_id)] = aname
self.ele_to_proxy[rd_ele_allowed[pt_id]] = aname
self.proxy_to_ele[aname] = rd_ele_allowed[pt_id]
def _createCGRdMol(self):
"""
Create RDMol from coarse grain structure
:type rd_mol: `rdkit.Chem.rdchem.Mol`
:param rd_mol: RDMol representing the coarse grain structure
"""
# Create structure where each atom in the coarse grain structure is a
# different element
self._setEleMapper()
struct = self.struct.copy()
remove_atom_property(struct, prop=mm.MMCT_ATOM_PROPERTY_COARSE_GRAIN)
for atom in struct.atom:
atom.element = self.ele_name_mapper[atom.name]
# Convert the coarse grain structure to RDMol
rd_mol = rdkit_adapter.to_rdkit(struct, sanitize=False)
for atom in rd_mol.GetAtoms():
struct_idx = atom.GetIntProp(SDGR_INDEX)
struct_atom = struct.atom[struct_idx]
atom.name = struct_atom.name
atom.SetProp('smilesSymbol', struct_atom.name)
atom.SetNoImplicit(True)
rd_mol.UpdatePropertyCache(strict=False)
return rd_mol
def _patternTranslate(self, s_pattern, mapper):
"""
Replace passed SMARTS/SMILES such that the mapper key values are
replaced by
:param s_pattern: The SMARTS/SMILES pattern to change
:type s_pattern: str
:param mapper: The mapper used to convert the SMARTS/SMILES pattern
:type mapper: dictionary where the key is the element name to find in
the pattern and value is the name to replace it with
:returns: the converted SMARTS/SMILES pattern
:rtype: str
"""
split_pattern = [x for x in re.split(r'\[|\]', s_pattern) if x]
for sidx, smarts_val in enumerate(split_pattern):
# Check if name is part of the split pattern, if not continue
if smarts_val[0] not in self.PROTECTED_PATTERN_BIT:
for mapper_name in mapper.keys():
if mapper_name in smarts_val:
break
else:
continue
# Check if pattern matches mapper atom name
if smarts_val in mapper.keys():
split_pattern[sidx] = f'[{mapper[smarts_val]}]'
continue
# Remove # for representing atomic number
if smarts_val.startswith('#'):
smarts_val = smarts_val[1:]
# Get position where the element/atom name string ends
search_text = re.search(r'\W+', smarts_val)
pos = search_text.start() if search_text else len(smarts_val)
alpha_num_str = smarts_val[:pos]
non_alpha_str = smarts_val[pos:]
# Do not change anything for protected SMARTS pattern
alpha_str = ''.join(x for x in alpha_num_str if x.isalpha())
if (alpha_str not in self.PROTECTED_PATTERN_BIT and
alpha_str in mapper.keys()):
# Replace the element/atom name with the mapper value
smarts_val = mapper[alpha_num_str] + non_alpha_str
split_pattern[sidx] = f'[{smarts_val}]'
converted_smarts = ''.join(split_pattern)
return converted_smarts
[docs] def elementToAtomName(self, s_pattern):
"""
If the structure is a coarse grain structure convert the SMARTS/SMILES
pattern of elements to coarse grain atom name. Does nothing for all
atom structures
:param smarts: The SMARTS/SMILES pattern
:type smarts: str
:returns: The translated SMARTS/SMILES pattern
:rtype: str
"""
if not self.is_coarse_grain:
return s_pattern
return self._patternTranslate(s_pattern, self.ele_to_proxy)
[docs] def atomNameToElement(self, s_pattern):
"""
If the structure is a coarse grain structure convert the SMARTS/SMILES
pattern of coarse grain atom name to mapped elements. Does nothing for
all atom structures
:param s_pattern: The SMARTS/SMILES pattern
:type s_pattern: str
:returns: The translated SMARTS/SMILES pattern
:rtype: str
"""
if not self.is_coarse_grain:
return s_pattern
return self._patternTranslate(s_pattern, self.proxy_to_ele)
@property
def smiles(self):
"""
Get the SMILES for the passed structure
:returns: SMILES pattern for the passed structure
:rtype: str
"""
return self.toSmiles()
@smiles.setter
def smiles(self, value):
"""
Overwrite SMILES setter to not allow user to set it
:param value: SMILES
:type value: str
:raises AttributeError: Always raise error since setting SMILES is
not allowed
"""
raise AttributeError('SMILES cannot be set for a structure')
@property
def smarts(self):
"""
Get the SMARTS for the passed structure
:returns: SMARTS pattern for the passed structure
:rtype: str
"""
return self.toSmarts()
@smarts.setter
def smarts(self, value):
"""
Overwrite SMARTS setter to not allow user to set it
:param value: SMARTS
:type value: str
:raises AttributeError: Always raise error since setting SMARTS is
not allowed
"""
raise AttributeError('SMARTS cannot be set for a structure')
[docs] def toSmiles(self, atom_ids=None):
"""
Get SMILES for subset of atom ids
:type atom_ids: list
:param atom_ids: list of atom indices
:rtype: str
:return: SMILES pattern for the atom ids provided
"""
atom_ids = tuple(atom_ids) if atom_ids else None
if atom_ids in self._smiles:
return self._smiles[atom_ids]
if atom_ids:
self._smiles[atom_ids] = Chem.MolFragmentToSmiles(
self.rd_mol, self.toRdIndices(atom_ids))
else:
self._smiles[atom_ids] = Chem.MolToSmiles(self.rd_mol)
return self._smiles[atom_ids]
[docs] def toSmarts(self, atom_ids=None):
"""
Get SMARTS for subset of atom ids
:type atom_ids: list
:param atom_ids: list of atom indices
:rtype: str
:return: SMARTS pattern for the atom ids provided
"""
atom_ids = tuple(atom_ids) if atom_ids else None
if atom_ids in self._smarts:
return self._smarts[atom_ids]
if atom_ids:
smarts = Chem.MolFragmentToSmarts(self.rd_mol,
self.toRdIndices(atom_ids))
else:
smarts = Chem.MolToSmarts(self.rd_mol)
self._smarts[atom_ids] = self.elementToAtomName(smarts)
return self._smarts[atom_ids]
def _getPatternMatches(self, s_pattern):
"""
Search for the passed pattern in the current structure
:param s_pattern: The s pattern
:type s_pattern: `rdkit.Chem.rdchem.Mol`
:returns: The list of atom ids matches which matches the pattern
:rtype: list
"""
# Checking if the pattern matches in the structure
if s_pattern is None:
raise ValueError('Pattern matching failed due to invalid pattern')
# Changing maxMatches to reflect MATSCI-6657
rd_matches = self.rd_mol.GetSubstructMatches(s_pattern,
uniquify=True,
useChirality=True,
useQueryQueryMatches=False,
maxMatches=10000)
matches = []
for rd_indices in rd_matches:
matches.append(self.toStIndices(rd_indices))
return matches
[docs] @functools.lru_cache
def getSMILESMatches(self, smiles):
"""
Get the list of matches for the passed SMILES pattern in the reference
structure
:type smiles: str
:param smiles: SMILES pattern to find
:rtype: list or None
:return: list of list atom indices with matching SMILES.
"""
smiles = self.atomNameToElement(smiles)
patt = Chem.MolFromSmiles(smiles, sanitize=False)
return self._getPatternMatches(patt)
[docs] @functools.lru_cache
def getSMARTSMatches(self, smarts):
"""
Get the list of matches for the passed SMARTS pattern in the reference
structure
:type smarts: str
:param smarts: SMARTS pattern to find
:rtype: list or None
:return: list of list atom indices with matching SMARTS.
"""
smarts = self.atomNameToElement(smarts)
patt = Chem.MolFromSmarts(smarts)
return self._getPatternMatches(patt)
def _getMolecularPattern(self, cache_dict, search_func):
"""
Gets the SMILES/SMARTS pattern for all molecules
:param cache_dict: The cache SMILES or SMARTS dictionary to check if
value has already been calculated
:type cache_dict: DictCache
:param search_func: The rdkit function to calculate pattern
:type search_func: function
:returns: The dictionary where the key is the molecule number and
pattern is the associated SMILES or SMARTS pattern for the molecule
:rtype: dict
"""
mol_patt = {}
for mol_frag in Chem.rdmolops.GetMolFrags(self.rd_mol, asMols=True):
# Get the corresponding atom schrodinger structure indices
st_aid = tuple(
sorted(x.GetIntProp(SDGR_INDEX) for x in mol_frag.GetAtoms()))
# Set pattern if it is not already set in the cache
if st_aid in cache_dict:
patt_val = cache_dict[st_aid]
else:
patt_val = search_func(mol_frag)
cache_dict[st_aid] = patt_val
# Get the schrodinger structure molecule number and set the pattern
# value
st_mol_num = self.struct.atom[st_aid[0]].molecule_number
mol_patt[st_mol_num] = patt_val
return mol_patt
[docs] def getMoleculeSmiles(self):
"""
Get SMILES for each molecule in the structure
:returns: The dictionary where the key is the molecule number and the
value is the corresponding SMILES pattern
:rtype: dict
"""
return self._getMolecularPattern(self._smiles, Chem.MolToSmiles)
[docs] def getMoleculeSmarts(self):
"""
Get SMARTS for each molecule in the structure
:returns: The dictionary where the key is the molecule number and the
value is the corresponding SMARTS pattern
:rtype: dict
"""
return self._getMolecularPattern(self._smiles, Chem.MolToSmarts)
[docs] def getUniqueMolNums(self, use_smarts=False):
"""
Get the unique representative molecules in the structure
:param bool use_smarts: If true the unique molecules will share the
same SMARTS pattern. If false the unique molecules will share the
same SMILES pattern.
:rtype: list
:return: list of molecule numbers that are unique
"""
name_func = (self.getMoleculeSmarts
if use_smarts else self.getMoleculeSmiles)
name_mol = {y: x for x, y in name_func().items()}
return list(name_mol.values())
[docs]def deep_update_dict(source, overrides):
"""
Override/append source dict values using overrides dict, return a new dict.
Everything is deepcopied to prevent unexpected changes.
:type source: dict
:param source: Source dictionary
:type overrides: dict
:param overrides: Dictionary to override with
:rtype: dict
:return: Updated dictionary
"""
ret = copy.deepcopy(source)
for key, value in overrides.items():
if isinstance(value, Mapping) and value:
returned = deep_update_dict(source.get(key, {}), value)
ret[key] = copy.deepcopy(returned)
else:
ret[key] = copy.deepcopy(overrides[key])
return ret
[docs]def get_interface_normal(struct):
"""
Define normal interface vector in c direction.
:type struct: `schrodinger.structure.Structure`
:param struct: structure
"""
max_z = struct.getXYZ().max(axis=0)[2]
return '0:0:%.3f:0:0:%.3f' % (max_z, max_z + 1.)
[docs]def get_val_from_cmdline(args, var, default=None):
"""
Get value from the command line given variable name.
:param args: Arguments. If string it will be split into a list
:type args: list or str
:param str var: Variable name
:param Any default: Default value if var is not found
"""
if isinstance(args, str):
args = shlex.split(args)
for idx, arg in enumerate(args):
if arg == var and len(args) > idx + 1:
return args[idx + 1]
return default
[docs]def count_waters(filename=None, struct=None):
"""
Count the number of waters in the first structure in the given file
:type filename: str
:param filename: The path to a structure file
:type struct: `schrodinger.structure.Structure`
:param filename: A structure object. Either filename or struct must be given
:rtype: `schrodinger.structure.Structure`, int
:return: The first structure in the file and the number of waters in it
"""
if not bool(filename) ^ bool(struct):
raise RuntimeError('Either filename or struct must be given but not '
'both')
if filename:
struct = structure.Structure.read(filename)
# Note - can't use the 'water' alias because it detects bridging O as water
# SHARED-6231, MATSCI-6464
water_asl = ('(/H0-O0-H0/ and not /H0-O0(-H0)-H0/) and atom.ele O')
return struct, len(analyze.evaluate_asl(struct, water_asl))
# see MATSCI-2737 and SUPPORT-71364
[docs]def force_rmtree_resist_nfs(removal_dir, logger=None, failed_dir=RMTREE_FAILED):
"""
Force remove a directory tree or if it contains stale NFS handles then move
it to the specified failure repository.
:param str removal_dir: The directory tree to be removed
:param logging.Logger logger: The logging object you want to throw an error
to if we need to move the folder rather than delete it.
:param str failed_dir: The name of a failure repository/directory to put
the removed directory into
"""
msg = ('Removal of directory %s has failed because '
'it contains open files. Moving this directory '
'to %s and proceeding.')
try:
fileutils.force_rmtree(removal_dir)
except OSError:
if not os.path.exists(failed_dir):
os.mkdir(failed_dir)
if logger is not None:
logger.warning(msg % (removal_dir, failed_dir))
shutil.move(removal_dir, failed_dir)
[docs]def dedent(msg, rm_breaks=True):
'''
Light wrapper for the `textwrapper.dedent` function, but run with with a
`strip()` afterwards too. Can also remove mid-string line breaks. Useful
when paired with a pattern such as:
msg = ("""
Lorem ipsum dolor sit amet, consectetur adipiscing elit, sed do
eiusmod tempor incididunt ut labore et dolore magna aliqua. Ut enim
ad minim veniam, quis nostrud exercitation ullamco laboris nisi ut
aliquip ex ea commodo consequat. Duis aute irure dolor in
reprehenderit in voluptate velit esse cillum dolore eu fugiat nulla
pariatur. Excepteur sint occaecat cupidatat non proident, sunt in
culpa qui officia deserunt mollit anim id est laborum.
""")
This function will remove the leading & trailing linebreaks, and it will
also fix the indentations at the beginning of each new line.
:param str msg: The message string that you want to dedent.
:param bool rm_breaks: Whether or not you want to replace all linebreaks
with spaces. Typically more useful to turn off when making tooltips.
'''
dedented_msg = textwrap.dedent(msg).strip()
if rm_breaks:
dedented_msg = dedented_msg.replace('\n', ' ')
return dedented_msg
[docs]def write_csv_from_dicts(filename, rows):
"""
Write a csv file from a list of dicts. Column headers will be the
superset of all keys in the dictionaries in the rows list.
:param str filename: The name of the file to write to
:param list rows: A list of dicts, one for each row in the csv. Keys are
property names (column headers), values are row values for that
column. Each dictionary need not have key/value pairs for all columns.
"""
proplist = set()
for row in rows:
proplist.update(row.keys())
headers = sorted(proplist)
with open(filename, 'w', newline='') as propfile:
writer = csv.DictWriter(propfile, fieldnames=headers)
writer.writeheader()
for row in rows:
writer.writerow(row)