"""
Structure reading/writing.
`StructureReader` provides a convenient iterator to read structures from
files, and `StructureWriter` provides an efficient means of writing many
files.
`MultiFileStructureReader` iterates through all the structures in multiple files
using `StructureReader`.
Copyright Schrodinger LLC, All Rights Reserved.
"""
import collections
import contextlib
import csv # For SmilesCsvReader
import enum
import gzip
import os
import re
import tempfile
import warnings
from contextlib import contextmanager
from functools import partial
from schrodinger import adapter
from schrodinger.infra import canvas
from schrodinger.infra import mm
from schrodinger.infra import mmcheck
from schrodinger.infra import structure as infrastructure
from schrodinger.structure._structure import Structure
from schrodinger.structure._structure import _StructureProperty
from schrodinger.utils import csv_unicode
from schrodinger.utils import fileutils
from schrodinger.utils import mmutil
from schrodinger.utils import subprocess
from schrodinger.utils.fileutils import CIF
from schrodinger.utils.fileutils import MAESTRO
from schrodinger.utils.fileutils import MOL2
from schrodinger.utils.fileutils import PDB
from schrodinger.utils.fileutils import PHASE_HYPO
from schrodinger.utils.fileutils import SD
from schrodinger.utils.fileutils import SMILES
from schrodinger.utils.fileutils import SMILESCSV
# Placeholders for lazy (circular) module imports.
smiles = None

# Values understood by the "stereo" option of the structure writers.
NO_STEREO = "none"
STEREO_FROM_GEOMETRY = "geometry"
STEREO_FROM_ANNOTATION = "annotation"
STEREO_FROM_ANNOTATION_AND_GEOM = "annotation_and_geom"
# Deprecated spelling of STEREO_FROM_GEOMETRY; deliberately left out of
# `stereo_options` below.
STEREO_FROM_3D = "3d"

stereo_options = {
    NO_STEREO,
    STEREO_FROM_GEOMETRY,
    STEREO_FROM_ANNOTATION,
    STEREO_FROM_ANNOTATION_AND_GEOM,
}
@contextmanager
def _suppress_error_reporting(error_handler):
    """
    Temporarily switch off reporting on the given mmerr handler.

    The handler's current level is recorded, the level is set to
    `mm.MMERR_OFF` for the duration of the `with` body, and the recorded
    level is put back on exit, even if the body raises.

    :type error_handler: int
    :param error_handler:
        The handle of the mmerr object whose level is changed.
    """
    previous_level = mm.mmerr_get_level(error_handler)
    mm.mmerr_level(error_handler, mm.MMERR_OFF)
    try:
        yield
    finally:
        mm.mmerr_level(error_handler, previous_level)
class _ReaderWriterContextManager(object):
"""
A mixin to enable context manager usage in reader and writer classes.
"""
def __enter__(self):
return self
def __exit__(self, type, value, tb):
if hasattr(self, 'close'):
self.close()
class MaestroTextReader(_ReaderWriterContextManager):
    """
    Iterate over the structures of a Maestro format file as text.

    Each item produced is a TextualStructure object. These allow read-only
    access to the Structure-level properties but not to atoms or any
    properties which rely on atoms.
    """

    read_mode = mm.M2IO_READ_FORWARD

    def __init__(self, filename, index=1, error_handler=None):
        """
        Initialize the reader. The file is not opened here; that happens on
        the first call to `__next__`.

        :type filename: string
        :param filename:
            The filename to read.

        :type index: int
        :param index:
            The index of the first structure to read.

        :type error_handler: int
        :param error_handler:
            The handle of the mmerr object to use for error logging.
            Defaults to schrodinger.infra.mm.error_handler.
        """
        self._index = index
        handler = mm.error_handler if error_handler is None else error_handler
        mm.m2io_initialize(handler)
        self.error_handler = handler
        self.fh = None
        self.filename = filename

    def __del__(self, _m2io_terminate=mm.m2io_terminate):
        # The terminate function is bound as a default argument, presumably
        # so that it is still reachable if module globals have already been
        # cleared when the object is finalized.
        self.close()
        _m2io_terminate()

    # required for iterator support
    def __iter__(self):
        return self

    def _m2io_open_file(self):
        """
        Open `self.filename` and, if a starting index above 1 was requested,
        move to the block preceding it and leave that block.

        :raise StopIteration:
            If opening fails with M2IO_ERR and the file has zero size.
        """
        try:
            self.fh = mm.m2io_open_file(self.filename, self.read_mode)
        except mm.MmException as e:
            # An M2IO_ERR on a zero-size file just means there is nothing
            # to iterate over; every other failure is passed on unchanged.
            if e.rc != mm.M2IO_ERR or os.path.getsize(self.filename) != 0:
                raise
            raise StopIteration()

        if self._index > 1:
            mm.m2io_goto_block(self.fh, mm.M2IO_BLOCK_WILDCARD_CT,
                               self._index - 1)
            mm.m2io_leave_block(self.fh)

    def __next__(self):
        """
        Return the next structure in the file as a TextualStructure.

        :raise StopIteration:
            On end of file (or on an empty file at first use).
        :raise Exception:
            On any other m2io error while reading the block.
        """
        parse_list = []

        if self.fh is None:  # First iteration; open the file:
            self._m2io_open_file()

        try:
            block_text = mm.m2io_goto_next_block_as_text(
                self.fh, mm.M2IO_BLOCK_WILDCARD_CT, parse_list, True)
            ct = infrastructure.create_structure(0)

            # Title is a special case - we should store that with the CT.
            # A missing title (KeyError) is simply skipped.
            with contextlib.suppress(KeyError):
                title_values = mm.m2io_get_string(self.fh, ['s_m_title'])
                mm.mmct_ct_set_title(ct.getHandle(), title_values[0])

            unrequested = mm.m2io_new_unrequested_handle(self.fh)
            mm.mmct_ct_m2io_set_unrequested_handle(ct.getHandle(),
                                                   unrequested)
            textual_st = TextualStructure(ct, block_text)
            mm.m2io_leave_block(self.fh)
        except mm.MmException as e:
            if e.rc == mm.M2IO_EOF:
                raise StopIteration()
            raise Exception("Could not read the next structure from file")

        return textual_st

    def close(self):
        """
        Close the file, if it is open. Safe to call more than once.
        """
        # getattr guards against __init__ having failed before `fh` was set.
        if getattr(self, "fh", None) is None:
            return
        mm.m2io_close_file(self.fh)
        self.fh = None
class MaestroReader(_ReaderWriterContextManager):
    """ A class for reading structures from a Maestro (M2io) format file. """

    # Make this setting a class variable so people can set it to M2IO_READ
    # if needed.
    read_mode = mm.M2IO_READ_FORWARD

    def __init__(self,
                 filename,
                 index=1,
                 error_handler=None,
                 input_string=None):
        """
        Initialize the reader. The file (or buffer) is opened lazily on the
        first read or seek.

        :type filename: string
        :param filename:
            The filename to read.

        :type index: int
        :param index:
            The index of the first structure to read.

        :type error_handler: int
        :param error_handler:
            Accepted for interface compatibility but not used by this
            class: the handler is taken from `getErrorHandler`.

        :type input_string: string
        :param input_string:
            A string with the contents of a Maestro format file. If
            provided, the filename argument is ignored.

        :raise ValueError:
            If neither `filename` nor `input_string` is given.
        :raise TypeError:
            If `input_string` is given but is not a `str`.
        """
        self.error_handler = self.getErrorHandler()
        self._index = index
        # The libraries are initialized before the arguments are validated
        # so that the terminate calls in __del__ stay balanced even when
        # validation raises.
        mm.m2io_initialize(self.error_handler)
        mm.mmct_initialize(self.error_handler)
        self.fh = None
        self.filename = filename
        self.input_string = input_string
        if not filename and not input_string:
            raise ValueError(
                "Neither filename nor input text for MaestroReader is given")
        if self.input_string and not isinstance(self.input_string, str):
            t = type(self.input_string)
            raise TypeError(
                f"input_string of {self.__class__} needs to be type(str) but is {t}"
            )

    def __del__(self,
                _mmct_terminate=mm.mmct_terminate,
                _m2io_terminate=mm.m2io_terminate):
        self.close()
        _mmct_terminate()
        _m2io_terminate()

    # required for iterator support
    def __iter__(self):
        return self

    def getErrorHandler(self):
        """
        Return the error handler in use by the m2io library when its
        refcount is > 0; otherwise return mm.MMERR_DEFAULT_HANDLER.
        """
        if mm.m2io_refcount() > 0:
            return mm.m2io_get_errhandler()
        else:
            return mm.MMERR_DEFAULT_HANDLER

    def _open(self):
        """
        Internal function to open the file or the input string.

        :raise EOFError:
            If opening a file fails and the file has zero size.
        :raise mm.MmException:
            On any other failure to open.
        """
        try:
            if self.input_string:
                self.fh = mm.m2io_open_read_from_buffer(self.input_string)
            else:
                # Filenames ending in "dat" are always opened with M2IO_READ
                # rather than the class-level read_mode.
                if self.filename.endswith("dat"):
                    read_mode = mm.M2IO_READ
                else:
                    read_mode = self.read_mode
                self.fh = mm.m2io_open_file(self.filename, read_mode)
            self.type = mm.m2io_get_file_type(self.fh)
        except mm.MmException:
            # Check to see if this is due to an empty file.
            if self.input_string:
                raise
            elif os.path.getsize(self.filename) == 0:
                raise EOFError(
                    "Could not open structure file due to zero size.")
            else:
                raise

    def seek(self, position):
        """
        Set the file position to the given position, opening the file first
        if necessary.

        This raises an exception for a zero size file.
        """
        if self.fh is None:
            self._open()
        self.last_position = position
        mm.m2io_set_file_pos(self.fh, position)

    def read(self, position=None):
        """
        Return the next Structure object. If position is given,
        this will be honoured. Otherwise the current position is taken.

        This raises an exception for a zero size file, for reading beyond
        the end of file indicator and for m2io errors.

        :raise EOFError:
            on EOF or zero size file.

        :raise Exception:
            otherwise.
        """
        try:
            if position is not None:
                self.seek(position)
            return next(self)
        except StopIteration:
            raise EOFError("Could not read the next structure from file "
                           "due to EOF")

    def __next__(self):
        """
        Return the next Structure object from the file. Set
        self.last_position to the file offset just before it was read.

        :raise StopIteration:
            on EOF or zero size file.

        :raise mm.MmException or Exception:
            otherwise.
        """
        if self.fh is None:
            # First iteration; open the file:
            try:
                self._open()
            except EOFError:
                raise StopIteration()
            if self._index > 1:
                mm.m2io_goto_block(self.fh, mm.M2IO_BLOCK_WILDCARD_CT,
                                   (self._index - 1))
                mm.m2io_leave_block(self.fh)

        if self.type == mm.M2IO_DISK_FILE or self.type == mm.M2IO_STRING:
            # File position is only recorded for these file types. For any
            # other type no exception is raised here; self.last_position is
            # simply left unset, so reading it raises AttributeError unless
            # seek() has set it.
            self.last_position = mm.m2io_get_file_pos(self.fh)

        try:
            mm.m2io_goto_next_block(self.fh, mm.M2IO_BLOCK_WILDCARD_CT)
            ct = Structure(mm.mmct_ct_m2io_get(self.fh))
        except mm.MmException as e:
            if e.rc == mm.M2IO_EOF:
                raise StopIteration()
            # Keep the underlying m2io error attached as the cause.
            raise Exception(
                f"Could not read the next structure from file: {e}") from e
        return ct

    def close(self):
        """
        Close the file, if it is open.
        """
        if getattr(self, "fh", None) is not None:
            mm.m2io_close_file(self.fh)
            self.fh = None
class OptionError(Exception):
    """
    Base class for exceptions that signal a failure to set an option.
    """
class UnsupportedOption(OptionError):
    """
    Raised on an attempt to set an option that is not supported.
    """

    def __init__(self, option_name, class_name):
        """
        :param option_name: The name of the option that was rejected.
        :param class_name: The name of the class that rejected it.
        """
        message = "The '%s' option is not supported by '%s'." % (
            option_name, class_name)
        super().__init__(message)
class UnsupportedOptionValue(OptionError):
    """
    Raised on an attempt to set an option to a value that is not supported.
    """

    def __init__(self, option_name, option_value, class_name):
        """
        :param option_name: The name of the option being set.
        :param option_value: The value that was rejected.
        :param class_name: The name of the class that rejected it.
        """
        message = "The '%s' value for the '%s' option is not supported by '%s'." % (
            option_value, option_name, class_name)
        super().__init__(message)
class _BaseWriter(_ReaderWriterContextManager):
    """
    This class provides a common implementation for structure writers.
    """

    def setOption(self, option, value):
        """
        Set a single option for this writer. This method is meant for
        options that may not be supported for all writer formats. See the
        `StructureWriter` class documentation for details on the available
        options.

        Raises an OptionError subclass (either UnsupportedOption or
        UnsupportedOptionValue) if unsuccessful.

        :type option: str
        :param option:
            The name of the option to set.

        :param value:
            The value for the option. The data type of this parameter
            depends on the option being set.
        """
        # This default implementation always raises an UnsupportedOption
        # exception. Override in the subclass to support option setting.
        # UnsupportedOption takes (option_name, class_name); passing `value`
        # as well would raise TypeError instead of the documented
        # OptionError.
        raise UnsupportedOption(option, self.__class__.__name__)

    def _initFilename(self, filename, overwrite=True):
        """
        Store the output path on `self.filename` and optionally remove an
        existing file at `filename`.

        The stored path is whatever `fileutils.extended_windows_path`
        returns for `filename` (PYTHON-934).

        :type filename: str
        :param filename:
            The filename to write to.

        :type overwrite: bool
        :param overwrite:
            If True and `filename` is an existing file, remove it.
        """
        # retrieving the absolute path and extending Windows OS path with tag
        # if number of characters in the path > 259
        self.filename = fileutils.extended_windows_path(filename,
                                                        only_if_required=True)
        if overwrite and os.path.isfile(filename):
            # Don't use force_remove here; if write permissions are removed
            # we want to honor them.
            os.remove(filename)
class MaestroWriter(_BaseWriter):
    """
    Writer for appending many structures to a single Maestro structure
    file more efficiently than repeated Structure.append calls.

    For writing single structures, just use the Structure.write method.
    For appending a small (less than a thousand) number of structures, the
    Structure.append method will perform acceptably.
    """

    # Timings suggest a 5-10% speedup in write times on local disk (for 2500
    # drug-like structure with a write time of about 5s) when compared with
    # multiple structure.append() calls.
    #
    # For NFS mounted dirs (on /home in nyc) timings showed approximately a
    # 30% speedup. (For 2500 structures write time goes from 17 to 11s, for
    # 10000 structures write time goes from 63s to 46s.)

    def __init__(self, filename, overwrite=True):
        """
        Initialize the needed mmlibs and record the output filename. The
        file itself is only opened on the first `append`.

        Note that the file will not be completely written until it is
        explicitly closed or the object is garbage collected.

        :type filename: str
        :param filename:
            The filename to write to.

        :type overwrite: bool
        :param overwrite:
            If False, append to an existing file if it exists.
        """
        self.fh = None
        self._initFilename(filename, overwrite=overwrite)
        mm.m2io_initialize(mm.error_handler)

    def append(self, ct):
        """
        Append the provided structure to the mae file, opening the file
        first if this is the first structure. Set self.last_position to the
        file offset just before the structure was appended.

        The use of this class and method should be preferred for large
        numbers of structures (say, >1000), but for smaller numbers of
        structures you can use the Structure.append method directly.
        """
        # Opening is deferred until something is actually appended. This
        # avoids creating a maestro file holding only the s_m_m2io_version
        # block when append is never called.
        file_is_open = self.fh is not None
        if not file_is_open:
            self.fh = mm.m2io_open_file(self.filename, mm.M2IO_APPEND)

        ct.closeBlockIfNecessary(self.fh)

        # Writing is delegated to the structure object so that Structure
        # and TextualStructure can each do their own thing.
        self.last_position = mm.m2io_get_file_pos(self.fh)
        ct.putToM2ioFile(self.fh)

    def close(self):
        """
        Close the file, if it is open.
        """
        if getattr(self, "fh", None) is None:
            return
        mm.m2io_close_file(self.fh)
        self.fh = None

    def __del__(self, _m2io_terminate=mm.m2io_terminate):
        """
        Close the file and terminate the mmlibs.
        """
        self.close()
        _m2io_terminate()
class SDWriter(_BaseWriter):
    """
    Writer for appending many structures to a single SD structure file more
    efficiently than repeated Structure.append calls.

    For writing single structures, just use the Structure.write method.
    For appending a small (less than a thousand) number of structures, the
    Structure.append method will perform acceptably.
    """

    # subclass str to allow passing in str values (legacy)
    class Options(str, enum.Enum):
        stereo = 'stereo'
        # When assume_3d or write_v3000 is never set, no corresponding
        # mmmdl option is passed by this class.
        # NOTE(review): earlier documentation disagreed on whether the
        # effective assume_3d default is True or False - confirm.
        assume_3d = 'assume_3d'
        write_v3000 = 'write_v3000'

    def __init__(self, filename, overwrite=True):
        """
        Initialize the needed mmlibs and record the output filename. The
        file itself is only opened on the first `append`.

        Note that the file will not be completely written until it is
        explicitly closed or the object is garbage collected.

        :type filename: str
        :param filename:
            The filename to write to.

        :type overwrite: bool
        :param overwrite:
            If False, append to an existing file if it exists.
        """
        self.fh = None

        # Save filename as absolute path to make sure relative paths are
        # always relative to the cwd when the SDWriter is created. (EV 72534)
        self.filename = os.path.abspath(filename)
        if overwrite and os.path.isfile(filename):
            os.remove(filename)

        # None means "not set": no matching mmmdl option is sent on open.
        self.stereo = None
        self.assume_3d = None
        self.write_v3000 = None
        mm.mmmdl_initialize(mm.error_handler)

    def setOption(self, option, value):
        """
        Set an option not supported for all StructureWriter formats.

        The supported options for SDWriter are:

        * SDWriter.Options.stereo: NO_STEREO, STEREO_FROM_ANNOTATION,
          STEREO_FROM_ANNOTATION_AND_GEOM
        * SDWriter.Options.assume_3d: True, False
        * SDWriter.Options.write_v3000: True, False

        Options are stored on the writer and applied in `append`.

        :raise UnsupportedOption:
            If `option` is not a member of `SDWriter.Options`.
        :raise UnsupportedOptionValue:
            If `value` is not accepted for `option`.
        """
        known = type(self).Options
        writer_name = type(self).__name__

        if option not in list(known):
            raise UnsupportedOption(option, writer_name)

        if option == known.stereo:
            # Every stereo option except STEREO_FROM_GEOMETRY is accepted.
            allowed = stereo_options - {STEREO_FROM_GEOMETRY}
            if value in allowed:
                self.stereo = value
                return
        elif value in (True, False):
            if option == known.assume_3d:
                self.assume_3d = value
            elif option == known.write_v3000:
                self.write_v3000 = value
            return

        raise UnsupportedOptionValue(option, value, writer_name)

    def append(self, ct):
        """
        Append the provided structure to the file, opening the file first
        if this is the first structure.

        :raise Exception:
            If `ct` is a TextualStructure, which cannot be written to SD.
        """
        # First check CT is able to be written. TextualStructure objects
        # are not:
        if isinstance(ct, TextualStructure):
            raise Exception("TextualStructure objects can not be written to "
                            "an SD format file")

        if self.fh is None:
            self.fh = mm.mmmdl_new(self.filename, mm.MMMDL_APPEND)

            # NOTE(review): the options are assumed to be applied once,
            # when the file is opened - confirm against the original
            # layout.
            if self.stereo == NO_STEREO:
                mm.mmmdl_set_option(self.fh, mm.MMMDL_NO_STEREO)
            elif self.stereo == STEREO_FROM_ANNOTATION_AND_GEOM:
                mm.mmmdl_set_option(self.fh, mm.MMMDL_STEREO)
            elif self.stereo == STEREO_FROM_ANNOTATION:
                mm.mmmdl_set_option(self.fh, mm.MMMDL_STEREO_BY_ANNOTATION)

            # assume_3d is tri-state: True, False, or None (leave alone).
            if self.assume_3d is True:
                mm.mmmdl_set_option(self.fh, mm.MMMDL_ASSUME_3D)
            elif self.assume_3d is False:
                mm.mmmdl_set_option(self.fh, mm.MMMDL_DONT_ASSUME_3D)

            if self.write_v3000:
                mm.mmmdl_set_option(self.fh, mm.MMMDL_WRITE_V3000)

        mm.mmmdl_sdfile_put_ct(self.fh, ct)

    def close(self):
        """
        Close the file, if it is open.
        """
        if getattr(self, "fh", None) is None:
            return
        mm.mmmdl_delete(self.fh)
        self.fh = None

    def __del__(self, _mmmdl_terminate=mm.mmmdl_terminate):
        """
        Close the file and terminate the mmlibs.
        """
        self.close()
        _mmmdl_terminate()
class Mol2Writer(_BaseWriter):
    """
    Mol2 support for the StructureWriter class.
    """

    def __init__(self, filename, overwrite=True):
        """
        Initialize the needed mmlibs and record the output filename. The
        file itself is only opened on the first `append`.

        :type filename: str
        :param filename:
            The filename to write to.

        :type overwrite: bool
        :param overwrite:
            If False, append to an existing file if it exists.
        """
        self.filename = os.path.abspath(filename)
        if overwrite and os.path.isfile(filename):
            os.remove(filename)
        self.error_handler = mm.error_handler
        self.fh = None
        mm.mmmol2_initialize(self.error_handler)

    def append(self, st):
        """
        Append the provided structure to the file, opening the file first
        if this is the first structure.

        :raise Exception:
            If `st` is a TextualStructure, which cannot be written to mol2.
        """
        # First check CT is able to be written. TextualStructure objects
        # are not:
        if isinstance(st, TextualStructure):
            raise Exception("TextualStructure objects can not be written to "
                            "a mol2 format file.")

        file_is_open = self.fh is not None
        if not file_is_open:
            self.fh = mm.mmmol2_new(self.filename, mm.MMMOL2_APPEND)
        mm.mmmol2_put_ct(self.fh, st)

    def close(self):
        """
        Close the file, if it is open.
        """
        if getattr(self, "fh", None) is None:
            return
        mm.mmmol2_delete(self.fh)
        self.fh = None

    def __del__(self, _mmmol2_terminate=mm.mmmol2_terminate):
        self.close()
        _mmmol2_terminate()
class PDBWriter(_BaseWriter):
    """
    A class for writing PDB-formatted files. Only one structure can be written
    to a PDB file. While this class offers no speed increase over the
    Structure.write() method, it provides more options.
    """

    def __init__(self,
                 filename,
                 reorder_by_sequence=False,
                 first_occ=False,
                 translate_pdb_resnames=True):
        """
        Record the output filename and the writing options. The mmlibs are
        initialized and the file is written in `write`.

        :type filename: str
        :param filename:
            The filename to write to.

        :type reorder_by_sequence: bool
        :param reorder_by_sequence:
            Whether to re-order the residues by sequence before writing the
            PDB file.

        :type first_occ: bool
        :param first_occ:
            If True and there are alternate occupancy sites, only the first
            occupancy site will be included in the output PDB file. Otherwise,
            all occupancy sites will be included.

        :type translate_pdb_resnames: bool
        :param translate_pdb_resnames:
            If True, the pdb residue names get converted to a standard set.
            If False, the translation is turned off.

        NOTE: Any existing file will be overwritten when the class instance
        is created.
        """
        self._reorder_by_sequence = reorder_by_sequence
        self.first_occ = first_occ
        self.translate_pdb_resnames = translate_pdb_resnames
        self._initFilename(filename)
        self._num_structures_written = 0

    def write(self, ct):
        """
        Write the provided structure to the PDB file.

        :raise RuntimeError:
            If a structure has already been written by this writer.
        :raise Exception:
            If `ct` is a TextualStructure, which cannot be written to PDB.
        """
        if self._num_structures_written > 0:
            raise RuntimeError(
                "Cannot write more than one structure to PDB file.")

        # First check CT is able to be written. TextualStructure objects
        # are not:
        if isinstance(ct, TextualStructure):
            raise Exception("TextualStructure objects can not be written to "
                            "an PDB format file")

        # NOTE(review): no matching mmpdb terminate call is visible in this
        # class - confirm whether that is intentional.
        mm.mmpdb_initialize(mm.error_handler)
        fh = mm.mmpdb_new()
        try:
            if self._reorder_by_sequence:
                mm.mmpdb_set(fh, mm.MMPDB_REORDER_BY_SEQUENCE)
            if self.first_occ:
                mm.mmpdb_set(fh, mm.MMPDB_FIRST_OCC)
            if not self.translate_pdb_resnames:
                mm.mmpdb_set(fh, mm.MMPDB_NO_TRANSLATE_PDB_RESNAMES)
            mm.mmpdb_write(fh, ct, self.filename)
        finally:
            # Release the mmpdb handle even when setting an option or
            # writing fails, so the handle is not leaked.
            mm.mmpdb_delete(fh)

        # Only counted once the write has succeeded.
        self._num_structures_written += 1

    def append(self, ct):
        """
        Alias to the write() method (for consistency with the other Writer
        classes).
        """
        self.write(ct)

    def close(self):
        """
        Does nothing. Added for consistency with other Writer classes.
        """
@contextlib.contextmanager
def _add_pdb_pbc_properties(st):
"""
Within a scope, adds the PDB-like PBC properties to a structure if there is a
way to determine the PBC data for the structure.
Properties that had to be added are deleted again when the scope is left, and
a space group name that was rewritten is put back to its original value. If
no PBC can be built from the structure, the body runs with the structure
left as it is.
:type st: `schrodinger.Structure`
:param st: Structure to be updated within a context.
"""
def get_HM_space_group(st):
"""Replace the structure's space group property with the short name of the
matching space group object, with spaces removed when that name is longer
than mm.M2IO_PDB_SPG_NAME_MAX_LEN. The property is left alone when no
matching space group object is found.
:return str: Return original space group name
"""
# Imported here rather than at module level.
from schrodinger.application.matsci.nano import space_groups
spgname_original = st.property[mm.M2IO_PDB_CRYSTAL_SPACE_GROUP]
spgobj = space_groups.get_spacegroups().getSpgObjByName(
spgname_original)
if spgobj:
# Short name is HM name (MATSCI-9091)
spgname = spgobj.space_group_short_name
if len(spgname) > mm.M2IO_PDB_SPG_NAME_MAX_LEN:
spgname = spgname.replace(' ', '')
st.property[mm.M2IO_PDB_CRYSTAL_SPACE_GROUP] = spgname
return spgname_original
try:
pbc = infrastructure.PBC(st)
# NOTE(review): bare except also catches KeyboardInterrupt/SystemExit;
# consider narrowing to `except Exception`.
except:
yield
else:
# Names of the properties added here; they are removed again on exit.
added = set()
original_spgname = None
# A property counts as missing when .get() returns a falsy value.
if not all(
st.property.get(n)
for n in infrastructure.LENGTHS_AND_ANGLES_PROPERTIES):
# update all if any are missing to ensure that they are consistent
# NOTE(review): the values are angles followed by lengths while the
# property tuple is named LENGTHS_AND_ANGLES - confirm the two orders
# agree.
for name, value in zip(infrastructure.LENGTHS_AND_ANGLES_PROPERTIES,
pbc.getBoxAngles() + pbc.getBoxLengths()):
st.property[name] = value
added.add(name)
pbc.applyToStructure(st)
if mm.M2IO_PDB_CRYSTAL_SPACE_GROUP in st.property:
original_spgname = get_HM_space_group(st)
else:
st.property[mm.M2IO_PDB_CRYSTAL_SPACE_GROUP] = mm.P1_SPACE_GROUP
added.add(mm.M2IO_PDB_CRYSTAL_SPACE_GROUP)
if mm.M2IO_PDB_CRYSTAL_Z not in st.property:
st.property[mm.M2IO_PDB_CRYSTAL_Z] = 1
added.add(mm.M2IO_PDB_CRYSTAL_Z)
try:
yield
finally:
# Undo the additions, then restore a rewritten space group name.
for p in added:
del st.property[p]
if original_spgname:
st.property[mm.M2IO_PDB_CRYSTAL_SPACE_GROUP] = original_spgname
class MMCIFWriter(_BaseWriter):
    """
    Write a structure to macromolecular cif aka pdbx format.

    Suitable for use with applications that expect the cif format used
    by the RCSB PDB, for instance. Can be read by the Schrodinger .cif
    reader.

    Currently uses openbabel and .pdb as a shim.
    """

    # Output-format flag passed to obabel.
    _FMT = '-ommcif'

    def __init__(self, filename):
        """
        :type filename: str
        :param filename: Name of file to which structures should be written
        """
        self._initFilename(filename)

    def write(self, st):
        """
        Write a Structure to the file specified in the constructor.

        The structure is first written to a temporary .pdb file, which is
        then converted with `obabel`. The temporary file is removed
        afterwards.

        :type st: `schrodinger.Structure`
        :param st: Structure to write to a file

        :raise RuntimeError:
            If the obabel output contains "err" (case-insensitive).
        """
        # Only the name is needed; the handle is closed straight away and
        # the file is removed explicitly in the finally clause below.
        with tempfile.NamedTemporaryFile(suffix='.pdb', delete=False) as tf:
            pdb_path = tf.name

        try:
            with PDBWriter(pdb_path) as writer, _add_pdb_pbc_properties(st):
                writer.append(st)

            # --title is passed explicitly; otherwise the title is the name
            # of the temporary file.
            command = [
                'obabel', '-ipdb', pdb_path, '--title', st.title, self._FMT,
                '-O', self.filename
            ]
            output = subprocess.check_output(command,
                                             stderr=subprocess.STDOUT)

            # The pdb to cif route seems to always return a 0 exit code, so
            # read the text output.
            report = output.decode()
            if 'err' in report.lower():
                raise RuntimeError(report)
        finally:
            fileutils.force_remove(pdb_path)

    def append(self, st):
        """
        Not supported; always raises AttributeError.
        """
        raise AttributeError(
            ".cif files store a single structure, append is not allowed")
class CIFWriter(MMCIFWriter):
    """
    Write a structure to small-molecule cif format.

    Suitable for use with applications that expect the cif format used
    by the Cambridge Crystallographic Database, for instance. Can be
    read by the Schrodinger .cif reader.

    Currently uses openbabel and .pdb as a shim.
    """

    # Only the obabel output-format flag differs from MMCIFWriter.
    _FMT = '-ocif'
class StructureWriter(_ReaderWriterContextManager):
    """
    A class for efficient writing of multiple structures to a single
    structure file. If you are writing a single structure, you can more
    easily use the `Structure.write` method.

    Options that are not supported for all formats can be set with the
    setOption method, for example::

        writer = StructureWriter(filename)
        try:
            writer.setOption("stereo", STEREO_FROM_ANNOTATION)
        except OptionError:
            # take action based on unsupported option/value here

    Currently, the following options are available:

      - `stereo`

        - This option controls how stereochemical properties are written. It
          does not affect the output geometry.
        - This option is supported for `SD`, `SMILES`, and `SMILESCSV`,
          although not all options are supported for `SD`.
        - Option values are `NO_STEREO`,
          `STEREO_FROM_ANNOTATION_AND_GEOM`, `STEREO_FROM_ANNOTATION`,
          and `STEREO_FROM_GEOMETRY`.
        - The default value is `STEREO_FROM_ANNOTATION_AND_GEOM`.
        - With `STEREO_FROM_ANNOTATION_AND_GEOM`, current annotation
          properties of the Structure are used when present. Chiral atoms
          without annotation properties will have their stereochemistry
          determined from geometry (if possible) and will be written with
          definite stereochemical configuration.
        - With `NO_STEREO`, no stereochemical information will be written.
        - With `STEREO_FROM_ANNOTATION`, stereochemical information will
          be written based only on the current annotations. Use this option to
          allow for specification of stereochemistry on some centers while
          leaving others undefined. This should be faster than identifying
          stereochemistry from the 3D geometry.
        - With `STEREO_FROM_GEOMETRY`, stereochemistry will be written
          for all chiral atoms based on the 3D geometry. This option is not
          supported for `SD` format.
    """

    def __init__(self, filename, overwrite=True, format=None, stereo=None):
        """
        Create the format-specific writer that this object delegates to.

        :type filename: str or pathlib.Path
        :param filename:
            The filename to write to.

        :type overwrite: bool
        :param overwrite:
            If False, append to an existing file instead of overwriting it.
            Must be True for the PDB and SMILESCSV formats.

        :type format: str
        :param format:
            The format of the file. Values should be specified by one of the
            module-level constants PDB, MAESTRO, MOL2, SD, SMILES, or
            SMILESCSV. If the format is not explicitly specified it will be
            determined from the suffix of the filename. Multi-structure PDB
            files are not supported.

        :type stereo: enum
        :param stereo:
            Use of the stereo option in the constructor is pending
            deprecation. Please use the setOption method instead.
            See the class docstring for documentation on the stereo options.
            It is only forwarded to the SMILES and SMILESCSV writers.

        :raise ValueError:
            If the format is not supported, or if `overwrite` is False for
            a format that cannot be appended to.
        """
        fname = str(filename)
        fmt = _check_format(fname, format)

        if stereo is not None:
            warnings.warn(
                "Use of the stereo option in the constructor is "
                "deprecated. Please use either the setOption method "
                "instead.",
                PendingDeprecationWarning,
                stacklevel=2)

        if fmt == PDB:
            if not overwrite:
                raise ValueError("PDB-formatted files can not be appended to")
            self.writer = PDBWriter(fname)
        elif fmt == SD:
            self.writer = SDWriter(fname, overwrite)
        elif fmt == MAESTRO:
            self.writer = MaestroWriter(fname, overwrite)
        elif fmt == SMILES:
            self.writer = SmilesWriter(fname, overwrite, stereo)
        elif fmt == SMILESCSV:
            if not overwrite:
                raise ValueError(
                    "Smiles CSV-formatted files can not be appended to")
            self.writer = SmilesCsvWriter(fname, stereo)
        elif fmt == MOL2:
            self.writer = Mol2Writer(fname, overwrite)
        else:
            raise ValueError(
                "'%s' format is not supported by the StructureWriter" % fmt)

        # Exposed so that the user can easily get the filename and the
        # number of structures written so far.
        self.filename = fname
        self.written_count = 0

    def append(self, ct):
        """
        Append the provided structure to the open file and count it.
        """
        self.writer.append(ct)
        self.written_count += 1

    def extend(self, cts):
        """
        Append all provided structures to the open file, counting each one
        as it is written.
        """
        for ct in cts:
            self.writer.append(ct)
            self.written_count += 1

    def close(self):
        """
        Close the file.
        """
        self.writer.close()

    def setOption(self, option, value):
        # The docstring is copied from _BaseWriter.setOption right below
        # instead of being duplicated here.
        self.writer.setOption(option, value)

    setOption.__doc__ = _BaseWriter.setOption.__doc__

    @staticmethod
    def write(st, filename):
        """
        Writes the given Structure to the specified file, overwriting the file
        if it already exists.

        :param st: structure object to write to file
        :type st: structure.Structure

        :param filename: filename to write to
        :type filename: str or pathlib.Path
        """
        with StructureWriter(filename) as writer:
            writer.append(st)
class PDBReader(_ReaderWriterContextManager):
    """
    A class for reading structures from a PDB format file.
    """

    def __init__(self,
                 filename,
                 index=1,
                 error_handler=None,
                 all_occ=True,
                 use_strict_resname=False):
        """
        Initialize the mmpdb library and open the file for reading.

        :param filename: The filename to read.
        :param index: The model to start reading at (default of 1).
        :param error_handler:
            Accepted but not used by this class: the handler is taken from
            `getErrorHandler`.
        :param all_occ:
            Whether to include alternative positions (default=True).
        :param use_strict_resname:
            Limit the residue name to 18-20 columns of pdb record.
        """
        self.error_handler = self.getErrorHandler()
        self.fh = None
        mm.mmpdb_initialize(self.error_handler)
        self.fh = mm.mmpdb_new()

        occupancy_mode = mm.MMPDB_ALL_OCC if all_occ else mm.MMPDB_FIRST_OCC
        mm.mmpdb_set(self.fh, occupancy_mode)
        if use_strict_resname:
            mm.mmpdb_set(self.fh, mm.MMPDB_STRICT_RESNAME)

        mm.mmpdb_open(self.fh, filename, "r")
        self.current_model = index
        self.filename = filename

    def __del__(self, _mmpdb_terminate=mm.mmpdb_terminate):
        self.close()
        _mmpdb_terminate()

    def close(self):
        """
        Close the file, if it is open.
        """
        if getattr(self, "fh", None) is None:
            return
        mm.mmpdb_delete(self.fh)
        self.fh = None

    # required for iterator support
    def __iter__(self):
        return self

    def _seek_current_model(self):
        """
        Move to model `self.current_model` when it is above 1, with error
        reporting switched off for the duration.

        :return: False if the move hit end of file, True otherwise.
        """
        with _suppress_error_reporting(self.getErrorHandler()):
            try:
                if self.current_model > 1:
                    mm.mmpdb_goto(self.fh, self.current_model)
            except mm.MmException as e:
                if e.rc != mm.MMPDB_EOF:
                    raise
                return False
        return True

    def __next__(self):
        """
        Return the next Structure object from the file.

        :raise StopIteration:
            On an empty file or at end of file.
        :raise Exception:
            On any other mmpdb error while reading.
        """
        try:
            mm.mmerr_suppress_print(self.getErrorHandler())

            file_is_empty = os.path.getsize(self.filename) == 0
            if file_is_empty or not self._seek_current_model():
                # Empty file, or reached end of file
                raise StopIteration()

            try:
                mm.mmpdb_read(self.fh, mm.MMPDB_OVERWRITE)
                pdb_ct = infrastructure.create_structure(0)
                mm.mmpdb_to_mmct(self.fh, pdb_ct.getHandle())
            except mm.MmException as e:
                if e.rc == mm.MMPDB_EOF:
                    raise StopIteration()
                raise Exception(
                    f"Could not read the next structure from file: {e}")
        finally:
            mm.mmerr_restore_print(self.getErrorHandler())

        # The model counter only advances once a structure has been built.
        structure = Structure(pdb_ct)
        self.current_model += 1
        return structure

    def getErrorHandler(self):
        """
        Return the error handler in use by the mmpdb library when its
        refcount is > 0; otherwise return mm.MMERR_DEFAULT_HANDLER.
        """
        if mm.mmpdb_refcount() > 0:
            return mm.mmpdb_get_errhandler()
        return mm.MMERR_DEFAULT_HANDLER
[docs]class SDReader(_ReaderWriterContextManager):
"""
A class for reading structures from a SD format file.
"""
[docs] def __init__(self,
filename,
index=1,
error_handler=None,
ignore_errors=False,
input_string=None,
import_sdprop_as_string=False,
import_sdprop_per_file=True,
ignore_structureless=True):
"""
Initialize the reader.
:type filename: string
:param filename:
The filename to read.
:type index: int
:param index:
The index of the first structure to read.
:type error_handler: int
:param error_handler:
The handle of the mmerr object to use for error logging.
Defaults to schrodinger.infra.mm.error_handler.
:type ignore_errors: bool
:param ignore_errors:
If True, bad structures will be skipped instead of raising an
exception. If False, the caller may set self._previous_structure
to None to continue reading past the error.
:type ignore_structureless: bool
:param ignore_structureless:
If False, exception will be raised for SD entries without atoms
in case ignore_errors is also False (ignore_errors
takes precedence, ignore_structureless makes atomless
entries to be considered as errors).
:type input_string: string
:param input_string:
A string with the contents of an SD format file. If provided,
the filename argument is ignored.
:type import_sdprop_as_string: bool
:param import_sdprop_as_string:
Import all properties as strings. Setting this to True speeds
file reading.
:type import_sdprop_per_file: bool
:param import_sdprop_per_file:
Setting this to True indicates that all structures in the file
will have the same set of properties. If this can be guaranteed,
it speeds file reading.
"""
self.error_handler = self.getErrorHandler()
self.fh = None
mm.mmmdl_initialize(self.error_handler)
if input_string:
self.fh = mm.mmmdl_new_from_string(input_string)
else:
self.fh = mm.mmmdl_new(filename, mm.MMMDL_READ)
if import_sdprop_as_string:
mm.mmmdl_set_option(self.fh, mm.MMMDL_IMPORT_SDPROP_AS_STRING)
elif import_sdprop_per_file:
mm.mmmdl_set_option(self.fh, mm.MMMDL_IMPORT_SDPROP_PER_FILE)
mm.mmmdl_sdfile_fix_prop_types(self.fh, "1:")
# mm.MMMDL_STEREO is now the MMMDL default
self.current_structure = index
self._previous_structure = None
self.ignore_errors = ignore_errors
self.ignore_structureless = ignore_structureless
self.structures_skipped = 0 # number of SD structures that were skipped
[docs] def getErrorHandler(self):
"""
Returns the error handler by querying the mmmdl library and
if the refcount is > 0 then return the error handler that
is in use by mmmdl. Otherwise None is returned.
"""
if mm.mmmdl_refcount() > 0:
return mm.mmmdl_get_errhandler()
else:
return mm.MMERR_DEFAULT_HANDLER
def __del__(self, _mmmdl_terminate=mm.mmmdl_terminate):
    """
    Close the file handle, then call `mmmdl_terminate()`.
    """
    # NOTE(review): mmmdl_terminate is bound as a default argument when the
    # method is defined, presumably so it is still reachable if the `mm`
    # module global has already been cleared at interpreter shutdown --
    # confirm before changing the signature.
    self.close()
    # Pairs with the mm.mmmdl_initialize() call made in __init__.
    _mmmdl_terminate()
def close(self):
    """
    Close the file.

    Does nothing if the instance has no `fh` attribute or if it is
    already None, so it can be called repeatedly.
    """
    if getattr(self, "fh", None) is None:
        return
    mm.mmmdl_delete(self.fh)
    self.fh = None
# required for iterator support
def __iter__(self):
    """
    Return the reader itself; structures are produced by `__next__`.
    """
    return self
def __next__(self):
    """
    Return the next Structure object from the file.

    Entries that cannot be read are skipped (and counted in
    `self.structures_skipped`) when `self.ignore_errors` is set, or when
    `self.ignore_structureless` is set and mmmdl reported
    `MMMDL_NOSTRUCTURE` for the entry. Skipping is done in a loop rather
    than by recursion, so the number of consecutive bad entries that can be
    skipped is not bounded by the interpreter recursion limit.

    :raises StopIteration: when the end of the file is reached.
    :raises Exception: if an entry can not be read and is not ignored.
    """
    while True:
        try:
            p = self._previous_structure
            if p is None or p != self.current_structure:
                # For performance reasons, only do a goto if this is the
                # first structure we read, or if someone changed
                # self.current_structure behind our back.
                mm.mmmdl_sdfile_goto(self.fh, self.current_structure)
            sd_ct = mm.mmmdl_sdfile_get_ct(self.fh)
        except mm.MmException as e:
            if e.rc == mm.MMMDL_EOF:  # EOF
                raise StopIteration()

            # Could not read the next structure from the SD file.
            # If __next__() gets called again, read the NEXT structure:
            self.current_structure += 1
            self._previous_structure = self.current_structure
            ignore = self.ignore_errors or (self.ignore_structureless and
                                            e.rc == mm.MMMDL_NOSTRUCTURE)
            if not ignore:
                raise Exception(
                    "Could not read the next structure from file") from e

            # Skip the bad structure and try the following entry.
            self.structures_skipped += 1
            # Will force a call to mmmdl_sdfile_goto() - Ev:123004
            self._previous_structure = None
            continue

        self.current_structure += 1
        self._previous_structure = self.current_structure
        return Structure(sd_ct)
class StructureReader(_ReaderWriterContextManager):
    """
    Read structures from files of various types.

    Example usage::

        # Read the first structure in a file:
        st = structure.StructureReader.read('myfile.pdb')

        # Read all structures from a file:
        for st in structure.StructureReader('myfile.sdf'):
            <do something with st>

        # Start reading at the second structure entry in the file
        for st in structure.StructureReader('myfile.sdf', index=2):
            <do something with st>

        # Assign iterator to a variable and read first 2 structures:
        st_reader = structure.StructureReader('myfile.mae')
        st1 = next(st_reader)
        st2 = next(st_reader)
    """

    def __init__(self, filename, index=1):
        """
        :param filename: path of the file to read. An already constructed
            `infrastructure.StructureReader` is also accepted and is used
            as the underlying reader directly.
        :type filename: str, pathlib.Path or infrastructure.StructureReader

        :param index: index of the first structure to read. `setIndex` is
            only called on the underlying reader when this is not 1.
        :type index: int

        :raises IOError: if `filename` is a path that is not an existing
            file.
        """
        if not isinstance(filename, infrastructure.StructureReader):
            path = str(filename)
            if not os.path.isfile(path):
                raise IOError("File does not exist: %s" % path)
            self.reader = infrastructure.StructureReader.getReader(path)
        else:
            # Hack: accept reader as "filename"; used by fromString()
            self.reader = filename

        if index != 1:
            self.reader.setIndex(index)

    def __iter__(self):
        """
        Return the reader itself; structures are produced by `__next__`.
        """
        return self

    def __next__(self):
        """
        Read the next entry from the underlying reader and return it wrapped
        in a `Structure`.
        """
        return Structure(self.reader.readNext())

    def setIndex(self, index):
        """
        Forward `index` to the underlying reader's `setIndex`.
        """
        self.reader.setIndex(index)

    def close(self):
        """
        Drop the reference to the underlying reader.
        """
        self.reader = None

    @staticmethod
    def read(filename, index=1):
        """
        Read a single Structure from the given file.

        :param filename: filename to read from
        :type filename: str or pathlib.Path

        :param index: the positional index of the structure to read
        :type index: int

        :return: the structure at position `index` in the given file
            (the first one by default)
        :rtype: structure.Structure
        """
        path = str(filename)
        # When reading a single structure from SD, use an SDReader that
        # does not precalculate property types for all structures in the
        # file, which might be very slow.
        reading_sd = _check_format(path) == SD
        make_reader = (partial(SDReader, import_sdprop_per_file=False)
                       if reading_sd else StructureReader)
        with make_reader(path, index=index) as single_reader:
            return next(single_reader)

    @staticmethod
    def fromString(input_string, index=1, format=MAESTRO):
        """
        Create a reader iterator from an input string. This is only
        supported for Maestro and SD formats.

        :param input_string: the string representation of the Structure.
        :type input_string: str

        :param index: the index of the first structure to read.
        :type index: int

        :param format: the string format, either MAESTRO or SD.
        :type format: str

        :raises TypeError: if `input_string` is not a str.
        :raises NotImplementedError: for any other `format`.
        """
        if not isinstance(input_string, str):
            raise TypeError("Invalid type for input_string: "
                            f"{type(input_string)}")

        if format == SD:
            return SDReader(None, index=index, input_string=input_string)
        if format == MAESTRO:
            mae_reader = infrastructure.MaestroReader.fromString(input_string)
            return StructureReader(mae_reader, index=index)
        raise NotImplementedError("StructureReader.fromString() does not "
                                  f"support `{format}` format.")
def _check_format(filename, format=None):
    """
    Get the format to use for `filename`.

    If `format` isn't None it is returned as provided. Otherwise the format
    is looked up from the filename with
    `fileutils.get_structure_file_format`; per the original documentation
    this is one of "maestro", "pdb", "sd", "mol2", "smiles" or "smilescsv".

    :raises ValueError: if no format is recognized for the filename.
    """
    if format is not None:
        return format

    detected = fileutils.get_structure_file_format(filename)
    if detected is None:
        raise ValueError(f"Unsupported file extension for file {filename}")
    return detected
def write_cts(sts, filename):
    """
    Write multiple structures to a single file.

    :param sts: An iterable containing the structures to write
    :type sts: iter

    :param filename: The filename to write the structures to. File format
        will be determined from the filename suffix.
    :type filename: str
    """
    with StructureWriter(filename) as out:
        out.extend(sts)
def count_structures(filename):
    """
    Return the number of structures in the specified file.

    For PDB files, returns the number of MODELs. For SMILES files, blank
    lines are not counted. For SMILES CSV files the header row is not
    counted; a file whose first row has no "SMILES" field is treated as
    header-less (as `SmilesCsvReader` does), so that first row is counted
    as a structure.

    :param filename: path of the structure file
    :type filename: str

    :raises ValueError: if the file extension is not recognized or the
        format is not supported.
    :raises IOError: if the file does not exist.
    """
    file_format = _check_format(filename)

    if not os.path.isfile(filename):
        raise IOError("File does not exist: %s" % filename)

    if file_format in (MAESTRO, PHASE_HYPO, SD, MOL2, PDB, CIF):
        return infrastructure.StructureReader.countStructures(filename)

    if file_format == SMILES:
        with _get_file_handle(filename) as fh:
            return sum(1 for line in fh if line.strip())

    if file_format == SMILESCSV:
        # Use the Python csv module to count rows, as each row can span
        # multiple lines:
        with csv_unicode.reader_open(filename) as fh:
            rows = csv.reader(fh)
            first_row = next(rows, None)
            if first_row is None:  # Empty file
                return 0
            num_structures = sum(1 for _ in rows)
        # The first row is only a header if it names a SMILES column;
        # otherwise it is data and must be counted too.
        has_header = any(
            _unquote_string(field).lower() == 'smiles'
            for field in first_row)
        if not has_header:
            num_structures += 1
        return num_structures
        # FIXME use ChmDelimitedPatterns.calculateRowCount() instead?

    # Otherwise invalid format
    raise ValueError(f"Unsupported file extension: {filename}")
class TextualStructure(Structure):
    """
    A sub-class of Structure for use when reading from a Maestro format file
    and only the structure-level properties are needed. The actual atom and
    bond records are not parsed from the file and so can't actually be
    accessed. The only things possible with this type of Structure are to
    access the structure level properties or to write it out unchanged to a
    file. Attempts to access the atom or bond data, directly or indirectly,
    will raise an exception.

    The only useful way to create a TextualStructure object is via the
    MaestroTextReader.
    """

    def __init__(self, ct, txt):
        """
        Initialize the TextualStructure object. The Structure handle will
        usually have no atoms but will have an unrequested data handle
        associated with it which can be used to access the Structure-level
        properties. 'txt' should be the full textual representation of the
        f_m_ct block as read from the Maestro format file.
        """
        # Initialize the base class
        Structure.__init__(self, ct)
        self._text_rep = txt

    def __str__(self):
        """
        Return the structure object as a text string
        """
        return self._text_rep

    # Redefine atom, molecule, chain, residue and ring so they raise
    # exceptions
    @property
    def atom(self):
        raise AttributeError(
            "It is not possible to access atoms for TextualStructure objects")

    @property
    def atom_total(self):
        raise AttributeError(
            "It is not possible to access atoms for TextualStructure objects")

    @property
    def molecule(self):
        raise AttributeError(
            "It is not possible to access molecules for TextualStructure "
            "objects")

    @property
    def chain(self):
        raise AttributeError(
            "It is not possible to access chains for TextualStructure objects")

    @property
    def residue(self):
        raise AttributeError("It is not possible to access residues for "
                             "TextualStructure objects")

    @property
    def ring(self):
        raise AttributeError(
            "It is not possible to access rings for TextualStructure objects")

    @property
    def property(self):
        """
        Dictionary-like container of structure properties. Keys are strings of
        the form `type_family_name` as described in the `PropertyName`
        documentation.

        :note: Unlike the `Structure.property` dictionary, this dictionary is
            read-only.
        """
        # Created lazily on first access.
        if self._property is None:
            self._property = _StructureProperty(self, read_only=True)
        return self._property

    @staticmethod
    def _checkMaestroFormat(filename, format):
        """
        Raise an Exception unless the output format is Maestro.

        The format is taken from `format` when given, otherwise it is
        derived from the suffix of `filename`. Shared by `write` and
        `append`.
        """
        fmt = _check_format(filename) if format is None else format
        if fmt != 'maestro':
            raise Exception("Textual Structure objects can only be written to "
                            "Maestro format files.")

    def _write_ct_as_text(self, filename, mode=mm.M2IO_WRITE):
        """
        Write a TextualStructure object to a Maestro format file.

        :param filename: path of the file to write
        :param mode: m2io open mode (`mm.M2IO_WRITE` or `mm.M2IO_APPEND`)
        """
        fh = mm.m2io_open_file(filename, mode)
        try:
            # From 62436
            # We may need to close the top-level block. If the file
            # has just been opened then we'll need to close the header
            # block. Turn off error handling as there'll be an error if the
            # block wasn't actually open:
            with _suppress_error_reporting(mm.error_handler):
                try:
                    mm.m2io_close_block(fh)
                except mm.MmException:
                    pass
            mm.m2io_put_text_block(fh, str(self))
        finally:
            # Always release the file, even if writing the block failed.
            mm.m2io_close_file(fh)

    def write(self, filename, format=None):
        """
        Write the structure to a file, overwriting any previous content.
        File will only be written to Maestro format.

        :raises Exception: if the requested or implied format is not
            Maestro.
        """
        self._checkMaestroFormat(filename, format)
        self._write_ct_as_text(filename, mm.M2IO_WRITE)

    def append(self, filename, format=None):
        """
        Append the structure to the file.
        File will only be written to Maestro format.

        :raises Exception: if the requested or implied format is not
            Maestro.
        """
        self._checkMaestroFormat(filename, format)
        self._write_ct_as_text(filename, mm.M2IO_APPEND)

    def putToM2ioFile(self, filehandle):
        """
        Used by the Maestro writer - put a single structure to
        the (already open) filehandle
        """
        mm.m2io_put_text_block(filehandle, self._text_rep)

    def closeBlockIfNecessary(self, filehandle):
        """
        Used by the Maestro writer to leave the header block if necessary.
        For TextualStructure objects this calls `mm.m2io_close_block` on the
        given filehandle.
        """
        mm.m2io_close_block(filehandle)

    def getStructure(self):
        """
        Return a Structure object for this TextualStructure by parsing the
        internal text representation into an mmct.
        """
        with MaestroReader("", input_string=self._text_rep) as reader:
            return next(reader)

    @staticmethod
    def read(filename):
        """
        Reads the first structure from a Maestro file. TextualStructure
        will only read from files in Maestro format.

        :raises ValueError: if the file is not in Maestro format.
        """
        if _check_format(filename) != MAESTRO:
            raise ValueError("TextualStructure can only read from "
                             "Maestro format files.")
        with MaestroTextReader(filename) as reader:
            return next(reader)
class SmilesStructure(object):
    """
    SMILES representation of a Structure that is returned by SmilesReader
    and SmilesCsvReader. When written to a SMILES-formatted file, properties
    other than the title are not retained.

    When the USE_RDKIT_FOR_SMILESSTRUCTURE feature flag is enabled, CXSMILES
    are supported (the extension string is part of the 'smiles' member).
    """
    mmsmiles_initialized = False

    def __init__(self, pattern, properties=None):
        """
        :param pattern: the SMILES string
        :type pattern: str

        :param properties: structure properties. A plain string is accepted
            for backwards compatibility and is used as the title. If empty
            or None, only an empty title is stored.
        :type properties: dict or str
        """
        self.smiles = pattern
        if not properties:
            self.property = {"s_m_title": ""}
        elif isinstance(properties, str):
            # Support for previous API: a bare title string
            self.property = {"s_m_title": properties}
        else:
            # Copy, so later edits do not affect the caller's mapping
            self.property = dict(properties)

    def __str__(self):
        """
        Return a string representation of this structure.
        """
        return "SmilesStructure(%s)" % (self.smiles)

    def write(self, filename):
        """
        Write the structure to a SMILES formatted file.
        """
        with _get_file_handle(filename, 'wt') as fh:
            self._writeSmiles(fh)

    def append(self, filename):
        """
        Append the structure to a SMILES formatted file.
        """
        with _get_file_handle(filename, 'at') as fh:
            self._writeSmiles(fh)

    def _writeSmiles(self, fh):
        """
        Write one "<smiles> <title>" line to the open file handle.

        A structure without an "s_m_title" property (possible when the
        properties mapping passed to the constructor had no title) is
        written with an empty title instead of raising KeyError.
        """
        text = "%s %s\n" % (self.smiles, self.property.get("s_m_title", ""))
        fh.write(text)

    def _writeSmilesCsv(self, filename, append):
        """
        Placeholder; does nothing.
        """
        pass

    def get2dStructure(self, add_hydrogens=False):
        """
        Return a 2D Structure object for this SMILES. The structure will have
        only 2D coordinates, with stereo annotation properties for chiral
        atoms with specified chirality. NOTE: Use for 2D applications only.

        :param add_hydrogens: whether hydrogens are added and made visible.
            Only consulted when the USE_RDKIT_FOR_SMILESSTRUCTURE feature
            flag is disabled.
        :type add_hydrogens: bool

        :rtype: `Structure.Structure`
        :return: 2D structure.

        :raises ValueError: if self.smiles is set to an invalid SMILES string.
        """
        st = None
        if mmutil.feature_flag_is_enabled(mmutil.USE_RDKIT_FOR_SMILESSTRUCTURE):
            st = adapter.to_structure(self.smiles,
                                      adapter.Generate2DCoordinates.Enable)
        else:
            adaptor = canvas.ChmMmctAdaptor()
            try:
                # Only the first whitespace-separated token is parsed
                chmmol = canvas.ChmMol.fromSMILES(
                    self.smiles.split(maxsplit=1)[0])
            except RuntimeError as err:
                if str(err).startswith("Unable to parse SMILES"):
                    raise ValueError(err) from err
                raise

            if add_hydrogens:
                atom_option = canvas.ChmAtomOption.H_All
                h_visibility = canvas.optionMDL.H_Visible
            else:
                atom_option = canvas.ChmAtomOption.H_Default
                h_visibility = canvas.optionMDL.H_AsWritten

            canvas.Chm2DCoordGen.generateAndApply(chmmol, atom_option)
            st = Structure(adaptor.create(chmmol, False, h_visibility))

        st.property.update(self.property)
        return st

    def get3dStructure(self, require_stereo=True):
        """
        Return a 3D Structure object for this SMILES with all hydrogens
        added.

        :type require_stereo: bool
        :param require_stereo: Whether to require all chiral centers to have
            defined stereochemistry via annotation properties. Defaults to
            True. UndefinedStereochemistry exception is raised if any chiral
            atom has ambiguous chirality. If set to False, ambiguous
            chiralities will be expanded arbitrarily.

        :rtype: `Structure.Structure`
        :return: Volumized 3D structure.
        """
        st = self.get2dStructure()
        st.generate3dConformation(require_stereo)
        return st

    @property
    def title(self):
        """
        The "s_m_title" property; an empty string if it is not set.
        """
        return self.property.get('s_m_title', '')

    @title.setter
    def title(self, title):
        self.property['s_m_title'] = title
class SmilesReader(_ReaderWriterContextManager):
    """
    A class for reading structures from a SMILES formatted file.
    Returns instances of SmilesStructure.

    When the USE_RDKIT_FOR_SMILESSTRUCTURE feature flag is enabled,
    this class will parse CXSMILES strings.
    """

    def __init__(self, filename, index=1):
        """
        Initialize with a filename, an optional starting index (default of 1).

        Blank lines are not structures, so they are not counted when
        skipping forward to `index`.

        :raises Exception: if the file is not SMILES-formatted, or if the
            end of the file is reached before `index`.
        """
        file_format = _check_format(filename)
        if file_format != SMILES:
            raise Exception("SmilesReader can read only SMILES-formatted files")

        self.fh = _get_file_handle(filename)

        remaining_to_skip = index - 1
        while remaining_to_skip > 0:
            line = self.fh.readline()
            if not line:
                raise Exception("SmilesReader: reached EOF before reaching "
                                "specified position (%i)" % index)
            if line.strip():
                remaining_to_skip -= 1

    def close(self):
        """
        Close the file. Safe to call more than once.
        """
        if getattr(self, "fh", None) is not None:
            self.fh.close()
            self.fh = None

    def __del__(self):
        self.close()

    # required for iterator support
    def __iter__(self):
        """
        Return the iterator for all SmilesStructures from the file
        """
        return self

    def __next__(self):
        """
        Return the next SmilesStructure from the file.
        Raises StopIteration on EOF.
        """
        # Skip blank lines with a loop (not recursion), so long runs of
        # blank lines can not exhaust the recursion limit.
        while True:
            line = self.fh.readline()
            if not line:  # EOF
                raise StopIteration
            if line.strip():
                break

        s = line.rstrip("\r\n").split(None, 1)  # Fix for PYAPP-4659
        pattern = s[0]
        if len(s) == 1:
            return SmilesStructure(pattern, {"s_m_title": ''})

        rest = s[1]
        # Check for CXSMILES extensions
        if rest.startswith('|') and rest.count('|') >= 2:
            # The character that follows the SMILES token. Leading
            # whitespace is removed first so the offset is relative to the
            # start of the token.
            separator = line.lstrip()[len(pattern)]
            try:
                # CXSMILES can have other CXSMILES embedded inside curly
                # braces, see examples in the R Group section in
                # https://docs.chemaxon.com/display/docs/chemaxon-extended-smiles-and-smarts-cxsmiles-and-cxsmarts.md
                # Find the CXSMILES delimiter + the .smi file separator
                ext_end = rest.index(f'|{separator}')
            except ValueError:
                if rest.endswith('|'):
                    # The extension runs to the end of the line: no title
                    pattern = f'{pattern}{separator}{rest}'
                    return SmilesStructure(pattern, {"s_m_title": ''})
                # Otherwise fall through: treat the remainder as the title
            else:
                pattern = f'{pattern}{separator}{rest[:ext_end + 1]}'
                properties = {
                    "s_m_title": _unquote_string(rest[ext_end + 2:])
                }
                return SmilesStructure(pattern, properties)

        properties = {"s_m_title": _unquote_string(rest)}
        return SmilesStructure(pattern, properties)
class SmilesCsvReader(_ReaderWriterContextManager):
    """
    A class for reading structures from a SMILES CSV formatted file.
    This format is used by Canvas.
    Returns instances of SmilesStructure.

    When the USE_RDKIT_FOR_SMILESSTRUCTURE feature flag is enabled,
    this class will parse CXSMILES strings. The extension string
    must be part of the SMILES field, and must be enclosed in double
    quotes in case it contains any commas.
    """

    def __init__(self, filename, index=1):
        """
        Initialize with a filename, an optional starting index (default of 1).

        If the first row has no "SMILES" field, a warning is issued and the
        file is read as header-less with the field names [smiles, title].

        :raises Exception: if the file is not SMILES CSV-formatted.
        :raises ValueError: if `index` is past the last structure.
        """
        file_format = _check_format(filename)
        if file_format != SMILESCSV:
            raise Exception(
                "SmilesCsvReader can read only SMILES CSV-formatted files")

        self.fh = _get_file_handle(filename)
        self.reader = csv.DictReader(self.fh)
        # DictReader.fieldnames is None for an empty file
        self.reader.fieldnames = [
            _unquote_string(key) for key in (self.reader.fieldnames or [])
        ]
        if not any(key.lower() == 'smiles' for key in self.reader.fieldnames):
            warnings.warn(
                f'Could not find a header row in {filename}; assuming field names are [SMILES, TITLE]'
            )
            # Rewind so the first row is read as data
            self.fh.seek(0)
            self.reader = csv.DictReader(self.fh,
                                         fieldnames=['smiles', 'title'])

        # Increment to specified index
        skipped = 0
        try:
            for _ in range(1, index):
                next(self.reader)
                skipped += 1
        except StopIteration:
            # `skipped` rows were read before running out, which is the
            # total number of structures available.
            raise ValueError("Structure index %i is not in input file "
                             "(total %i structures)" %
                             (index, skipped)) from None

    def __del__(self):
        self.close()

    # required for iterator support
    def __iter__(self):
        """
        Return the iterator for all SmilesStructures from the file
        """
        return self

    def __next__(self):
        """
        Return the next SmilesStructure from the file.
        Raises StopIteration on EOF.
        """
        row = next(self.reader)
        prop_dict = {}
        pattern = None
        for key, value in row.items():
            # Skip keys with missing values or extra values without a key
            if key is None or value is None:
                continue
            key_lower = key.lower()
            value = _unquote_string(value)
            if key_lower == 'smiles':
                pattern = value
            elif key_lower in ('name', 's_m_title', 'title', 'idnumber'):
                prop_dict['s_m_title'] = value
            else:
                prop_key, prop_value = _csv_parse_prop_value(key, value)
                prop_dict[prop_key] = prop_value
        return SmilesStructure(pattern, prop_dict)

    def close(self):
        """
        Close the file. Safe to call more than once.
        """
        if getattr(self, "fh", None) is not None:
            self.fh.close()
            self.fh = None
class SmilesWriter(_BaseWriter):
    """
    More efficient writing of a large number of structures to a single SMILES
    file.
    """

    def __init__(self, filename, overwrite=True, stereo=None):
        """
        :type filename: str
        :param filename:
            The filename to write to.

        :type overwrite: bool
        :param overwrite:
            If False, append to an existing file if it exists.

        :type stereo: enum
        :param stereo:
            See the `StructureWriter` class for documentation on the
            allowed values.
        """
        # The file is opened lazily by the first append()
        self.fh = None
        self.filename = os.path.abspath(filename)
        self._smiles_generator = None  # for writing Structure objects
        self._stereo = (STEREO_FROM_ANNOTATION_AND_GEOM
                        if stereo is None else stereo)

        if overwrite and os.path.isfile(filename):
            fileutils.force_remove(filename)

    def append(self, st):
        """
        Append the provided structure to the open SMILES file.

        `SmilesStructure` objects are written as they are; for anything
        else a SMILES string is generated first.
        """
        _lazy_import_smiles()

        if self.fh is None:
            self.fh = _get_file_handle(self.filename, 'at')

        if isinstance(st, SmilesStructure):
            st._writeSmiles(self.fh)
            return

        # Assume st is a Structure object (generate SMILES)
        if not self._smiles_generator:
            self._smiles_generator = smiles.SmilesGenerator(
                stereo=self._stereo, unique=True)
        generated = self._smiles_generator.getSmiles(st)
        self.fh.write("%s %s\n" % (generated, st.title))

    def close(self):
        """
        Close the file.
        """
        if getattr(self, "fh", None) is None:
            return
        self.fh.close()
        self.fh = None

    def __del__(self):
        """
        Close the file when instance is deleted.
        """
        self.close()
class SmilesCsvWriter(_BaseWriter):
    """
    More efficient writing of a large number of structures to a single
    SMILES CSV file.
    """

    def __init__(self, filename, stereo=None, props=None):
        """
        :note:
            Excessive memory may be used by this class if the props argument
            is not specified and many structures are appended.

        :type filename: str
        :param filename:
            The filename to write to.

        :type stereo: enum
        :param stereo:
            See the `StructureWriter` class for documentation on the
            allowed values.

        :type props: list
        :param props:
            List of property names to export. If specified, then the CSV header
            is derived from this list, and structure lines are written by the
            append() method. If not specified, then CSV header will include all
            properties of all structures, and the output file will only be
            written when the close() method is called. (All structures will
            be cached in memory until flushed to disk.)
        """
        self.fh = None
        self.filename = os.path.abspath(filename)
        self._smiles_generator = None  # for writing Structure objects
        if stereo is None:
            self._stereo = STEREO_FROM_ANNOTATION_AND_GEOM
        else:
            self._stereo = stereo
        self._props = props
        self._ct_data_list = []
        self._ct_prop_names = []
        # True while cached structures have not been written to disk yet
        self._needs_flush = False

        # NOTE: Always overwriting, because of the Canvas CSV header line:
        if os.path.isfile(filename):
            fileutils.force_remove(filename)

    def append(self, st):
        """
        Append the provided structure to the open SMILES CSV file.
        """
        pattern, prop_dict = self._getCtData(st)
        if self._props:
            # props argument was specified
            if self.fh is None:
                # write header:
                self._ct_prop_names = self._props
                self._openWriter(self._ct_prop_names)
            # write st:
            self._writeRow(pattern, prop_dict)
        else:
            # Expand internal prop list:
            new_props = [p for p in prop_dict if p not in self._ct_prop_names]
            self._ct_prop_names.extend(new_props)
            self._ct_data_list.append((pattern, prop_dict))
            self._needs_flush = True

    def _openWriter(self, propnames):
        """
        Open the CSV writer and write the header derived from
        the specified property names.
        """
        self.fh = _get_file_handle(self.filename,
                                   'wt',
                                   encoding="utf-8",
                                   newline="")
        header = ['SMILES']
        # Always call the title "NAME":
        if 's_m_title' in propnames:
            header.append('NAME')
        for propname in propnames:
            if propname.startswith('s_csv_'):
                propname = propname[6:]
            if propname != 's_m_title':
                header.append(propname)
        self.writer = csv.DictWriter(self.fh,
                                     fieldnames=header,
                                     extrasaction='ignore')
        self.writer.writeheader()

    def _writeRow(self, pattern, prop_dict):
        """
        Write a row to the CSV file, include all properties in
        self._ct_prop_names.

        Property names are mapped to header names the same way
        `_openWriter` builds the header (the "s_csv_" prefix is dropped),
        so that the values of such properties end up in their columns
        instead of being written as empty cells.
        """
        row = {}
        for propname, value in prop_dict.items():
            if propname.startswith('s_csv_'):
                propname = propname[6:]
            row[propname] = value
        # Set last, so these always win over same-named properties
        row['SMILES'] = pattern
        row['NAME'] = prop_dict.get('s_m_title', '')
        self.writer.writerow(row)

    def _getCtData(self, st):
        """
        Return a (SMILES pattern, ordered copy of properties) tuple for `st`.
        """
        _lazy_import_smiles()
        prop_dict = collections.OrderedDict(st.property)
        if isinstance(st, SmilesStructure):
            pattern = st.smiles
        else:  # Assume st is a Structure object (generate SMILES)
            if not self._smiles_generator:
                self._smiles_generator = smiles.SmilesGenerator(
                    stereo=self._stereo, unique=True)
            pattern = self._smiles_generator.getSmiles(st)
        return (pattern, prop_dict)

    def close(self):
        """
        Close the file.

        When no props list was given, this is where the cached structures
        are written out. Calling close() again without appending anything
        in between does not rewrite the file.
        """
        # getattr: __del__ may call this on a partially initialized instance
        if getattr(self, "_props", None):
            # props argument was specified
            if self.fh is not None:
                self.fh.close()
                self.fh = None
            return

        if not getattr(self, "_needs_flush", False):
            return

        # write header row:
        self._openWriter(self._ct_prop_names)
        try:
            for pattern, prop_dict in self._ct_data_list:
                # write row
                self._writeRow(pattern, prop_dict)
        finally:
            # Release the handle even if a row could not be written
            self.fh.close()
            self.fh = None
        self._needs_flush = False

    def __del__(self):
        """
        Close the file when instance is deleted.
        """
        self.close()
class MultiFileStructureReader(_ReaderWriterContextManager):
    """
    Provides a single iterator that reads structure from multiple files.

    Typical usage is identical to typical usage of the StructureReader class
    except that the class is instantiated with a python list of file names
    rather than a single file name.

    By default, the StructureReader class is used to read the files, but this is
    customizable with the reader_class keyword.

    API Example::

        names = ['file1.mae', 'file2.mae', 'file3.pdb']
        reader = MultiFileStructureReader(names)
        first_struct = next(reader)
        for struct in reader:
            do stuff

    If pass_errors is True, the reader skips files that raise Exceptions and
    stores the list of skipped files in the failed_files property. By default
    (pass_errors=False) the exceptions are raised.

    The current StructureReader can be accessed with the reader property
    """

    def __init__(self, files, *args, **kwargs):
        """
        Create a MultiFileStructureReader

        :type files: list
        :param files: A list of paths to files to be read

        :type reader_class: Reader class
        :keyword reader_class: By default, StructureReader is used to read the
            files. A more specific class can be provided, such as PDBReader

        :type pass_errors: bool
        :keyword pass_errors: If True, any filename that raises an expected
            exception will be skipped. Skipped filenames are stored in the
            failed_files property and can be retrieved after reading. Items of
            the failed_files list are tuples (filename, error_message).
            Expected Exceptions include: IOError (does not exist, or
            unreadable), ValueError (unknown extension), MmException (error
            opening file) or an Exception while reading structures. The
            default of False will cause the exceptions to be raise'd.

        :type skip_receptors: bool
        :keyword skip_receptors: Whether to skip receptors of PV files.

        Any additional parameters and keyword arguments are passed to the
        structure reader class.
        """
        self.reader_class = kwargs.pop('reader_class', StructureReader)
        """ The class used to read files """
        self.pass_errors = kwargs.pop('pass_errors', False)
        """ False if exceptions should be raised, True if they should be caught
        """
        self.skip_receptors = kwargs.pop('skip_receptors', False)
        self.args = args
        self.kwargs = kwargs
        self.files = files[:]
        """ List of files remaining to be read """
        self.current_filename = ""
        """ The file currently being read """
        self.index_in_current_file = None
        """ Index of current structure in current file """
        self.failed_files = []
        """ List of (failed_file_name, error_message) """
        self.reader = None
        """ Current file reader """
        self._createNewReader()

    def __iter__(self):
        """
        Required to make the class an iterator
        """
        return self

    def _dropReader(self):
        """
        Close and unset the current reader, if there is one.
        """
        if self.reader is not None:
            # Explicitly closing and unsetting the reader helps with garbage
            # collection in fast loops.
            self.reader.close()
        self.reader = None
        self.index_in_current_file = None

    def _createNewReader(self):
        """
        Create a file reader for the next file. Sets self.reader = None if there
        are no more files to be read.
        """
        self._dropReader()
        while self.reader is None:
            try:
                # Raises IndexError if the file list is now empty
                self.current_filename = self.files.pop(0)
            except IndexError:
                # The file list was empty
                self.reader = None
                self.index_in_current_file = None
                return

            try:
                self.reader = self.reader_class(self.current_filename,
                                                *self.args, **self.kwargs)
                self.index_in_current_file = 0
                if self.skip_receptors and fileutils.is_poseviewer_file(
                        self.current_filename):
                    try:
                        next(self.reader)
                    except StopIteration:
                        # The file has no entries at all. Do not let the
                        # StopIteration escape (it would end the whole
                        # iteration early); move on to the next file.
                        self._dropReader()
                        continue
                    self.index_in_current_file = 1
            except (IOError, ValueError, mmcheck.MmException) as exc:
                # Possible expected errors
                if self.pass_errors:
                    self.failed_files.append((self.current_filename, str(exc)))
                    # The reader may have been created before the failure;
                    # discard it so the loop advances to the next file.
                    self._dropReader()
                else:
                    raise

    def __next__(self):
        """
        Get the next structure to process. This might either be the next
        structure in the currently open file, or might result in the next file
        being opened.

        :raise StopIteration: When all structures in all files have been read
        """
        while True:
            # Just looping through until a reader is successfully created and we
            # return a structure, or we hit the end of all the files.
            if self.reader is None:
                # No more files, we're done
                raise StopIteration()
            try:
                self.index_in_current_file += 1
                return next(self.reader)
            except StopIteration:
                # No more structures in the current file, start the next file
                self._createNewReader()
            except Exception as exc:
                if self.pass_errors:
                    self.failed_files.append((self.current_filename, str(exc)))
                    self._createNewReader()
                else:
                    # This raises the caught Exception because we don't
                    # recognize it
                    raise
class MultiFileStructureWriter(_ReaderWriterContextManager):
    """
    Similar to StructureWriter, except that it writes to multiple files,
    while keeping the number of structures per file under sts_per_file.

    Files will be named <basename>-NNN<extension>. Default extension is .maegz.

    Options:

    basename - The base name of the written files
    extension - The extension of the written files (default ".maegz")
    sts_per_file - Maximum number of structures to write to each file

    Usage::

        writer = MultiFileStructureWriter(out_basename, ".maegz", 50)
        for st in sts:
            writer.append(st)
        writer.close()
        written_files = writer.getFiles()
    """

    def __init__(self, basename, extension=".maegz", sts_per_file=100000):
        self._basename = basename
        self._extension = extension
        self._max_file_size = sts_per_file
        # Bookkeeping for what has been written so far
        self._files = []
        self._total_sts_written = 0
        # State of the file currently being written; no file is opened
        # until the first append()
        self.current_filename = None
        self.current_writer = None
        self.index_in_current_file = 0

    def _startNextFile(self):
        """
        Close the current writer, if any, and open a writer for the next
        numbered file.
        """
        if self.current_writer:
            self.current_writer.close()
        number = str(len(self._files) + 1).zfill(3)
        self.current_filename = '%s-%s%s' % (self._basename, number,
                                             self._extension)
        self.current_writer = StructureWriter(self.current_filename)
        self._files.append(self.current_filename)
        self.index_in_current_file = 0

    def append(self, st):
        """
        Write a structure, starting a new file first if no file is open yet
        or the current one already holds `sts_per_file` structures.
        """
        no_file_yet = not self.current_filename
        if no_file_yet or self.index_in_current_file >= self._max_file_size:
            self._startNextFile()
        self.current_writer.append(st)
        self.index_in_current_file += 1
        self._total_sts_written += 1

    def getFiles(self):
        """
        Return a list of file paths for the written files.
        """
        return self._files

    def getNumStructures(self):
        """
        Return the total number of structures that were written.
        """
        return self._total_sts_written

    def close(self):
        """
        Close any open file handles
        """
        writer = self.current_writer
        if writer:
            writer.close()
def _lazy_import_smiles():
    """
    Import schrodinger.structutils.smiles on first use and bind it to the
    module-level `smiles` placeholder. Calls made after that do nothing.
    """
    # Can not be done earlier due to circular import
    global smiles
    if smiles is None:
        import schrodinger.structutils.smiles as smiles
def _get_file_handle(filename, mode='rt', *, encoding=None, newline=None):
    """
    Open `filename` and return the file object, using `gzip.open` when the
    name ends in "gz" (case-insensitive) and the builtin `open` otherwise.

    Text mode is required for csv files, so it is enforced for gzipped
    files.

    :param filename: path to open (str or pathlib object)
    :param mode: open mode; must contain 't' for gzipped files
    :param encoding: passed through to the opener
    :param newline: passed through to the opener

    :raises IOError: if a gzipped file is requested in a non-text mode.
    """
    path_text = str(filename)  # filename may be a pathlib obj
    is_gzipped = path_text.lower().endswith('gz')
    if is_gzipped and 't' not in mode:
        raise IOError('Gzipped files must be opened in text mode.')
    opener = gzip.open if is_gzipped else open
    return opener(filename, mode=mode, encoding=encoding, newline=newline)
def _csv_parse_bool(value):
    """
    Interpret a CSV cell as a boolean.

    Empty, "0" and "false" (any case, surrounding whitespace ignored) are
    False; every other text is True. Plain `bool()` can not be used because
    any non-empty string, including "0", is truthy.
    """
    return value.strip().lower() not in ('', '0', 'false')


def _csv_parse_prop_value(key, value):
    """
    Parse a property key, and if it has a proper type prefix, cast
    the associated value into the proper type. If there is no type
    prefix, or the cast fails, mark the key as being a string from
    a csv file.

    :param key: column name from the CSV header
    :type key: str

    :param value: cell text
    :type value: str

    :return: (property name, value) tuple
    :rtype: tuple
    """
    # A typed name looks like "<type>_<family>_<name>". Keys shorter than
    # two characters can not match, and must not be indexed at [1].
    if len(key) < 2 or key[1] != '_' or key.find('_', 3) == -1:
        return _format_custom_property(key), value

    type_prefix = key[0]
    if type_prefix == 's':
        return key, value

    casts = {'i': int, 'r': float, 'b': _csv_parse_bool}
    cast_function = casts.get(type_prefix)
    if cast_function is None:
        return _format_custom_property(key), value

    try:
        return key, cast_function(value)
    except ValueError:
        return _format_custom_property(key), value
def _unquote_string(s):
    """
    Strip surrounding whitespace from `s` and then, if what is left is
    wrapped in a matching pair of single or double quotes, remove that one
    pair of quotes.
    """
    text = s.strip()
    if len(text) <= 1:
        return text
    opening, closing = text[0], text[-1]
    if opening == closing and opening in ('"', "'"):
        return text[1:-1]
    return text
def _format_custom_property(key):
    """
    Build the "s_csv_" property name used for a CSV column that has no
    recognized type prefix.

    Underscores that are not already preceded by a backslash are escaped
    with one, then spaces are replaced by underscores.

    :param key: column name from the CSV header
    :type key: str

    :return: the property name
    :rtype: str
    """
    # escape all free standing underscores. A negative lookbehind is used
    # so that an underscore at the very start of the key (which has no
    # preceding character) is escaped as well.
    key = re.sub(r'(?<!\\)_', r'\\_', key)
    key = key.replace(' ', '_')
    return f's_csv_{key}'