# Source code for schrodinger.application.phase.packages.conformer_storage
'''
Support for serialization of "multiconformer structures".
'''
import re
import struct
import zlib
from enum import Enum
from functools import partial
import numpy
from schrodinger import structure
from schrodinger.infra import phase
# Pre-compiled packer for a single float in native byte order (the format
# string has no byte-order prefix). The lossless format uses it for the
# header counts as well as for every coordinate.
_encoder = struct.Struct('f')  # "f" => "float", 4 bytes
#------------------------------------------------------------------------------#
def _str_to_bytes(s):
    '''
    Encodes text as latin-1, i.e. one byte per character.
    :param s: Text to encode.
    :type s: str
    :return: Encoded bytes.
    :rtype: bytes
    '''
    encoded = s.encode('latin-1')
    return encoded
def _bytes_to_str(b):
    '''
    Decodes latin-1 bytes into text, one character per byte.
    :param b: Bytes to decode.
    :type b: bytes or bytearray
    :return: Decoded text.
    :rtype: str
    '''
    decoded = b.decode('latin-1')
    return decoded
#------------------------------------------------------------------------------#
def _copy_connectivity(st):
    '''
    Builds a new CT that mirrors the title, the per-atom attributes
    (atom type, atomic number, formal charge, color) and the bonds
    (including bond orders) of `st`. All atoms of the new CT are placed
    at the origin.
    :param st: Structure to mirror.
    :type st: `structure.Structure`
    :return: Newly created structure.
    :rtype: `structure.Structure`
    '''
    # Listed in the order in which the attributes are transferred.
    copied_attrs = ('atom_type', 'atomic_number', 'formal_charge', 'color')
    skeleton = structure.create_new_structure(st.atom_total)
    skeleton.title = st.title
    for source_atom, target_atom in zip(st.atom, skeleton.atom):
        for attr_name in copied_attrs:
            setattr(target_atom, attr_name, getattr(source_atom, attr_name))
        target_atom.xyz = (0.0, 0.0, 0.0)
    skeleton.addBonds([(bond.atom1.index, bond.atom2.index, bond.order)
                       for bond in st.bond])
    return skeleton
#------------------------------------------------------------------------------#
def _conformers_to_bytes(cts, keep_properties=False):
    '''
    Serializes conformers into one binary buffer.
    Assumes that CTs are conformers (have same connectivity
    and Lewis structure); only the atom count of each CT is verified.
    Buffer layout: number of atoms and number of conformers (each stored
    as a native-order 4-byte float), then x, y, z of every atom, conformer
    by conformer (also 4-byte floats), then the text of the lead CT as
    produced by `structure.write_ct_to_string`.
    :param cts: Conformers to serialize (must support indexing and `len`).
    :type cts: container of `structure.Structure`
    :param keep_properties: Keep properties that would be discarded otherwise.
    :type keep_properties: bool
    :return: Bytes.
    :rtype: bytearray
    :raise RuntimeError: If a conformer has a different number of atoms
        than the first one.
    '''
    lead_ct = cts[0] if keep_properties else _copy_connectivity(cts[0])
    lead_ct_str = structure.write_ct_to_string(lead_ct)
    # Shorten zero-valued decimals ("0.000000 " => "0 "). Note that the
    # whitespace character that follows the zeros is replaced by a space.
    lead_ct_str = re.sub(r'0\.0+\s', '0 ', lead_ct_str)
    lead_ct_bytes = lead_ct_str.encode()
    num_confs = len(cts)
    num_atoms = lead_ct.atom_total
    header_size = 2 * _encoder.size
    # Packs all coordinates of one conformer in a single call instead of
    # one call per coordinate; the produced bytes are the same.
    xyz_encoder = struct.Struct(f'{3 * num_atoms}f')
    num_bytes = header_size + num_confs * xyz_encoder.size + len(lead_ct_bytes)
    outcome = bytearray(num_bytes)
    _encoder.pack_into(outcome, 0, num_atoms)
    _encoder.pack_into(outcome, _encoder.size, num_confs)
    offset = header_size
    for ct in cts:
        if ct.atom_total != num_atoms:
            raise RuntimeError('different number of atoms in a conformer')
        # struct raises if the number of coordinates is not 3 * num_atoms.
        xyz_encoder.pack_into(outcome, offset,
                              *numpy.ravel(ct.getXYZ(copy=False)))
        offset += xyz_encoder.size
    outcome[offset:] = lead_ct_bytes
    return outcome
#------------------------------------------------------------------------------#
def _bytes_to_conformers(data):
    '''
    Deserialize conformers serialized by `_conformers_to_bytes`.
    The header (number of atoms, number of conformers) is read first; the
    lead CT text that follows the coordinates is parsed once and copied for
    every conformer, whose atoms then receive the stored coordinates.
    :param data: Bytes.
    :type data: bytes or bytearray
    :return: List of the conformers.
    :rtype: list(structure.Structure)
    :raise ValueError: If `data` is too short for the header, the header
        holds negative counts, or `data` is too short for the coordinates
        announced by the header.
    '''
    header_size = 2 * _encoder.size
    # Explicit check rather than `assert`, which is removed under `-O`.
    if len(data) < header_size:
        raise ValueError('serialized conformers: header is incomplete')
    num_atoms = int(_encoder.unpack_from(data, 0)[0])
    num_confs = int(_encoder.unpack_from(data, _encoder.size)[0])
    if num_atoms < 0 or num_confs < 0:
        raise ValueError('serialized conformers: negative count in header')
    # x, y, z of one atom are unpacked with a single call.
    xyz_decoder = struct.Struct('3f')
    offset = header_size
    lead_ct_offset = offset + xyz_decoder.size * num_atoms * num_confs
    if lead_ct_offset > len(data):
        raise ValueError('serialized conformers: coordinates are truncated')
    lead_ct_str = data[lead_ct_offset:].decode()
    lead_ct = next(structure.StructureReader.fromString(lead_ct_str))
    outcome = []
    for _ in range(num_confs):
        st = lead_ct.copy()
        for atom in st.atom:
            atom.x, atom.y, atom.z = xyz_decoder.unpack_from(data, offset)
            offset += xyz_decoder.size
        outcome.append(st)
    return outcome
#------------------------------------------------------------------------------#
def serialize_lossless(conformers, keep_properties=False, deflate=True):
    '''
    Serializes conformers to be deserialized by `deserialize_lossless()`.
    The binary form produced by `_conformers_to_bytes` is optionally
    zlib-compressed and then returned as latin-1 decoded text.
    :param conformers: List of conformer structures.
    :type conformers: list(structure.Structure)
    :param keep_properties: Keep properties that would be discarded otherwise.
    :type keep_properties: bool
    :param deflate: Deflate using zlib?
    :type deflate: bool
    :return: Serialized conformers.
    :rtype: str
    '''
    raw = _conformers_to_bytes(conformers, keep_properties=keep_properties)
    if deflate:
        payload = zlib.compress(raw, level=zlib.Z_BEST_COMPRESSION)
    else:
        payload = raw
    return _bytes_to_str(payload)
#------------------------------------------------------------------------------#
def deserialize_lossless(data, inflate=True):
    '''
    Deserializes conformers serialized by `serialize_lossless()`.
    `inflate` must match the `deflate` value used for serialization.
    :param data: Serialized conformers.
    :type data: str
    :param inflate: Decompress using zlib?
    :type inflate: bool
    :return: Conformer structures.
    :rtype: list(structure.Structure)
    '''
    payload = _str_to_bytes(data)
    if inflate:
        payload = zlib.decompress(payload)
    return _bytes_to_conformers(payload)
#------------------------------------------------------------------------------#
def serialize_compact(conformers, keep_properties=False, deflate=True):
    '''
    Serializes conformers using approach from PHASE-2096.
    Unless `keep_properties` is set, the conformers are replaced by
    copies made with `_copy_connectivity` that carry the original
    coordinates.
    :param conformers: List of conformer structures.
    :type conformers: list(structure.Structure)
    :param keep_properties: Keep properties that would be discarded otherwise.
    :type keep_properties: bool
    :param deflate: Deflate using zlib?
    :type deflate: bool
    :return: Serialized conformers.
    :rtype: str
    '''
    if keep_properties:
        to_be_deflated = conformers
    else:
        # Single pass, so that a one-shot iterable of conformers is not
        # exhausted before the coordinates are transferred.
        to_be_deflated = []
        for src in conformers:
            dst = _copy_connectivity(src)
            dst.setXYZ(src.getXYZ(copy=False))
            to_be_deflated.append(dst)
    deflator = phase.PhpConformerDeflator()
    blob = deflator.deflate(to_be_deflated)
    data = blob.getData()
    if not deflate:
        # blob is zlib-compressed
        data = zlib.decompress(data)
    return _bytes_to_str(data)
#------------------------------------------------------------------------------#
def deserialize_compact(data, inflate=True):
    '''
    Deserializes conformers serialized by `serialize_compact`.
    The text is converted back to bytes, optionally decompressed with zlib
    and handed to `phase.PhpConformerInflator` (features are omitted).
    :param data: Serialized conformers.
    :type data: str
    :param inflate: Decompress using zlib?
    :type inflate: bool
    :return: Conformer structures.
    :rtype: list(structure.Structure)
    '''
    payload = _str_to_bytes(data)
    if inflate:
        payload = zlib.decompress(payload)
    inflator = phase.PhpConformerInflator(payload,
                                          features=phase.PhpFeatures_OMIT)
    return inflator.getStructureConformers()
#------------------------------------------------------------------------------#
def get_api(fmt, keep_properties=False, compress=True):
    '''
    Returns conformer serializer/deserializer for format `fmt`.
    Both callables are pre-configured: the serializer with
    `keep_properties` and `compress`, the deserializer with `compress`.
    :param fmt: Desired data format.
    :type fmt: `Format`
    :param keep_properties: Keep properties that would be discarded otherwise.
    :type keep_properties: bool
    :param compress: Apply zlib compression.
    :type compress: bool
    :return: Couple of callables to serialize/deserialize conformers.
    :rtype: (iterable(structure.Structure) -> str, str -> list(structure.Structure))
    :raise ValueError: If `fmt` is not a supported format.
    '''
    API = {
        Format.COMPACT: (serialize_compact, deserialize_compact),
        Format.LOSSLESS: (serialize_lossless, deserialize_lossless)
    }
    # Only the lookup can raise KeyError, so only the lookup is guarded.
    try:
        serialize, deserialize = API[fmt]
    except KeyError:
        raise ValueError(f'invalid conformer storage format {fmt}')
    bound_serialize = partial(serialize,
                              keep_properties=keep_properties,
                              deflate=compress)
    bound_deserialize = partial(deserialize, inflate=compress)
    return (bound_serialize, bound_deserialize)
#------------------------------------------------------------------------------#