"""
Access non-scalar structured data stored on a Structure.
Copyright Schrodinger, LLC. All rights reserved.
"""
import contextlib
from schrodinger.infra import mm
def _get_unrequested_handle(st):
    """
    Return the unrequested m2io data handle for the given structure, or None
    if only additional data is available.

    :param st: Input structure, or a raw mmct handle
    :type st: structure.Structure or int

    :return: m2io data handle (or None)
    :rtype: int or None

    :raise mm.MmException: if mmct fails with any return code other than
        MMCT_WARNING
    """
    # The public entry points in this module document `st` as either a
    # Structure or a bare mmct handle, so only dereference `.handle` when
    # the argument actually has one.
    ct_handle = getattr(st, "handle", st)
    try:
        return mm.mmct_ct_m2io_get_unrequested_handle(ct_handle)
    except mm.MmException as err:
        # mmct will return MMCT_WARNING if unrequested handle is not present
        if err.rc == mm.MMCT_WARNING:
            return None
        raise  # Re-raise exceptions of type MMCT_ERROR
def get_blocks(st, truncate=False):
    """
    Return a dict representation of the data blocks stored on a Structure.

    Each block is represented as a dict, and its sub-blocks are stored as
    values inside the parent block's dict. Data from both the unrequested
    and the additional data handles is included; when the same top-level
    key appears in both, the additional data wins.

    :param st: structure object or mmct handle (whether a raw handle works
        depends on `_get_unrequested_handle` accepting one)
    :type st: structure.Structure or int

    :param truncate: if True, each block name maps to a single dict read
        from the first instance of that block; if False, each block name
        maps to a list holding one dict per instance
    :type truncate: bool

    :return: combined block data
    :rtype: dict
    """
    unrequested_handle = _get_unrequested_handle(st)
    if unrequested_handle is None:
        combined = {}
    else:
        combined = _get_blocks(unrequested_handle, truncate)

    # Get an open handle if available, otherwise open a new one
    additional_handle = mm.mmct_ct_get_or_open_additional_data(st, True)

    # Layer the additional data on top so it overrides unrequested entries
    combined.update(_get_blocks(additional_handle, truncate))
    return combined
def write_blocks(st, data, truncate=False):
    """
    NOTE: This is provided for working with legacy products only; please
    use structure properties instead of introducing new m2io data blocks.

    Inverse function to get_blocks(). Writes a dict representation of data
    blocks to a Structure object.

    Top-level blocks that already exist on the structure and are also
    present in `data` are deleted first, and therefore overwritten. To
    merge instead of overwrite, export the existing blocks with
    get_blocks(), merge them with the desired blocks yourself, and pass the
    merged dict here. Automatic merging is not provided because the
    operation is ambiguous and may be product-dependent.

    Note that a get_blocks() - write_blocks() - get_blocks() cycle on a
    structure may return an error if there are 'None' values in the data
    blocks. To circumvent the problem, after write_blocks() try writing the
    structure to a string and reading it back; afterwards get_blocks() may
    be performed safely.

    :param st: structure object or mmct handle (whether a raw handle works
        depends on `_get_unrequested_handle` accepting one)
    :type st: structure.Structure or int

    :param data: data structure containing data blocks in the same format
        as returned by get_blocks()
    :type data: dict

    :param truncate: whether `data` uses the truncated representation
        described in get_blocks()
    :type truncate: bool
    """
    additional_handle = mm.mmct_ct_get_or_open_additional_data(st, True)
    _delete_existing_blocks(additional_handle, data)

    # Same-named blocks in the unrequested data are removed as well
    unrequested_handle = _get_unrequested_handle(st)
    if unrequested_handle is not None:
        _delete_existing_blocks(unrequested_handle, data)

    # All new data is written to the additional data handle
    writer = _write_blocks if truncate else _write_blocks_as_lists_of_dicts
    writer(additional_handle, data)
def append_row_to_data(data, table_name, row_object):
    """
    Helper function to present a table-like interface for interacting
    with blocks. Handles the matching of object properties to block columns,
    and raises an exception if the object doesn't contain all table columns.

    The row is validated in full before anything is appended, so a failed
    call leaves the columns of an existing table untouched.

    :param data: data object to be written to the maestro block. If no table
        with the specified name is present, one is created whose columns are
        the keys of `row_object`.
    :type data: dict

    :param table_name: name of the table to be appended.
    :type table_name: string

    :param row_object: object corresponding to the new row to be appended;
        maps column name to the value for that column
    :type row_object: dict

    :raise ValueError: if the row does not have exactly the table's columns
    """
    if table_name not in data:
        data[table_name] = {column_name: [] for column_name in row_object}
    column_objects = data[table_name]

    if len(column_objects) != len(row_object):
        raise ValueError(
            f"Row mismatch: the row has {len(row_object)} columns "
            f"whereas the object has {len(column_objects)}.")

    # Check every column up front; appending while checking would leave the
    # table with columns of unequal length when a later column is missing.
    for column_name in column_objects:
        if column_name not in row_object:
            raise ValueError(f"Row mismatch: row missing column {column_name}")

    for column_name, column_values in column_objects.items():
        column_values.append(row_object[column_name])
def _delete_existing_blocks(handle, data):
    """
    Delete from `handle` every named block whose name is a key of `data`.

    :param handle: m2io data handle to delete blocks from
    :param data: mapping whose keys are the block names to remove
    :type data: dict
    """
    for block_name in data:
        # Only blocks that m2io reports as present are deleted
        if not mm.m2io_inquire_block_name(handle, block_name):
            continue
        mm.m2io_delete_named_block(handle, block_name)
def _get_blocks(handle, truncate):
    """
    Read the properties and sub-blocks at the current position of `handle`.

    :param handle: m2io data handle to read from
    :param truncate: if True, sub-blocks are read as single dicts;
        otherwise as lists of dicts
    :type truncate: bool

    :return: properties and sub-blocks keyed by name
    :rtype: dict
    """
    contents = {}
    # Properties go in first, then the sub-blocks are added to the same dict
    _get_props(handle, contents)
    read_subblocks = (_get_blocks_as_dicts
                      if truncate else _get_blocks_as_lists_of_dicts)
    read_subblocks(handle, contents)
    return contents
@contextlib.contextmanager
def _m2io_goto_block(handle, block, iblock):
    """
    Context manager that enters an m2io block and leaves it again on exit.

    :param handle: m2io data handle
    :param block: name of the block to enter
    :param iblock: index passed to m2io_goto_block to select the block
        instance (callers in this module count from 1)
    """
    # Entered before the try: if the goto itself raises, there is no block
    # to leave and m2io_leave_block is not called.
    mm.m2io_goto_block(handle, block, iblock)
    try:
        yield None
    finally:
        # Always step back out, even if the body of the `with` raised
        mm.m2io_leave_block(handle)
def _get_blocks_as_dicts(handle, data):
    """
    Read each named sub-block of the current block into `data` as one dict.

    Only instance 1 of each block name is read (truncated representation).

    :param handle: m2io data handle to read from
    :param data: dict that receives one entry per block name
    :type data: dict
    """
    first_instance = 1
    block_names = mm.m2io_get_block_names(handle, 1)
    for block_name in block_names:
        with _m2io_goto_block(handle, block_name, first_instance):
            data[block_name] = _get_blocks(handle, truncate=True)
def _write_blocks(handle, data):
    """
    Write `data`, given in truncated representation, to `handle`.

    A dict value is written recursively as a sub-block, a list value as an
    indexed property (indices start at 1), and any other value as a scalar
    property.

    :param handle: m2io data handle to write to
    :param data: block contents keyed by block or property name
    :type data: dict
    """
    for key, value in data.items():
        if isinstance(value, dict):
            mm.m2io_open_block(handle, key)
            _write_blocks(handle, value)
            mm.m2io_close_block(handle)
        elif isinstance(value, list):
            # The index dimension is set to this list's length before its
            # elements are written.
            mm.m2io_set_index_dimension(handle, len(value))
            for position, item in enumerate(value, start=1):
                _write_prop_idx(handle, key, item, position)
        else:
            _write_prop(handle, key, value)
def _write_blocks_as_lists_of_dicts(handle, data):
    """
    Write `data`, given in non-truncated representation, to `handle`.

    A non-list value is written as a scalar property. A list whose first
    element is a dict is written as repeated sub-blocks, one per element.
    Any other list is written as an indexed property (indices start at 1).

    :param handle: m2io data handle to write to
    :param data: block contents keyed by block or property name
    :type data: dict
    """
    for key, value in data.items():
        if not isinstance(value, list):
            _write_prop(handle, key, value)
            continue

        # NOTE(review): an empty list raises IndexError on the lookup below;
        # confirm whether empty lists can reach this function.
        if isinstance(value[0], dict):
            for instance in value:
                mm.m2io_open_block(handle, key)
                _write_blocks_as_lists_of_dicts(handle, instance)
                mm.m2io_close_block(handle)
            continue

        mm.m2io_set_index_dimension(handle, len(value))
        for position, item in enumerate(value, start=1):
            _write_prop_idx(handle, key, item, position)
def _get_blocks_as_lists_of_dicts(handle, data):
    """
    Read each named sub-block of the current block into `data` as a list
    with one dict per instance of that block.

    :param handle: m2io data handle to read from
    :param data: dict that receives one list per block name
    :type data: dict
    """

    def read_instance(block_name, instance):
        # Enter the given instance, read it, and leave again
        with _m2io_goto_block(handle, block_name, instance):
            return _get_blocks(handle, truncate=False)

    for block_name in mm.m2io_get_block_names(handle, 1):
        instance_count = mm.m2io_get_number_blocks(handle, block_name)
        # Instances are addressed starting from 1
        data[block_name] = [
            read_instance(block_name, instance)
            for instance in range(1, instance_count + 1)
        ]
def _get_props(handle, data):
    """
    Read the properties at the current position of `handle` into `data`.

    If the index dimension is positive, every property is read as a list
    with one value per index (1..dimension); otherwise each property is
    read as a single value. A failure to query the index dimension is
    treated as dimension 0.

    :param handle: m2io data handle to read from
    :param data: dict that receives one entry per property name
    :type data: dict
    """
    try:
        dimension = mm.m2io_get_index_dimension(handle)
    except mm.MmException:
        dimension = 0
    is_indexed = dimension > 0

    for prop_name in mm.m2io_get_data_names(handle, -1):
        if not is_indexed:
            data[prop_name] = _get_prop(handle, prop_name)
            continue
        # Register the list first so values are stored as they are read
        values = data[prop_name] = []
        values.extend(
            _get_prop_idx(handle, prop_name, row)
            for row in range(1, dimension + 1))
def _write_prop(handle, name, val):
    """
    Write a scalar property to `handle`.

    The m2io put function is chosen from the property name prefix:
    "r_" real, "i_" int, "b_" boolean, "s_" string. A value of None is
    recorded through m2io_set_no_value without looking at the prefix.

    :param handle: m2io data handle to write to
    :param name: property name, including its type prefix
    :type name: str
    :param val: value to write, or None for "no value"

    :raise ValueError: if `val` is not None and `name` has no recognized
        type prefix
    """
    if val is None:
        mm.m2io_set_no_value(handle, [name])
        return

    if name.startswith("r_"):
        put_func = mm.m2io_put_real
    elif name.startswith("i_"):
        put_func = mm.m2io_put_int
    elif name.startswith("b_"):
        put_func = mm.m2io_put_boolean
    elif name.startswith("s_"):
        put_func = mm.m2io_put_string
    else:
        # Slice instead of indexing so that an empty name is reported as
        # a ValueError rather than escaping as an IndexError.
        raise ValueError("Invalid property type: '%s'" % name[:1])
    _m2io_put(put_func, handle, name, val)
def _write_prop_idx(handle, name, val, index):
    """
    Write one element of an indexed property to `handle`.

    The indexed m2io put function is chosen from the property name prefix:
    "r_" real, "i_" int, "b_" boolean, "s_" string.

    :param handle: m2io data handle to write to
    :param name: property name, including its type prefix
    :type name: str
    :param val: value to write
    :param index: index of the element (callers in this module count
        from 1)
    :type index: int

    :raise ValueError: if `name` has no recognized type prefix
    """
    if name.startswith("r_"):
        put_func = mm.m2io_put_real_indexed
    elif name.startswith("i_"):
        put_func = mm.m2io_put_int_indexed
    elif name.startswith("b_"):
        put_func = mm.m2io_put_boolean_indexed
    elif name.startswith("s_"):
        put_func = mm.m2io_put_string_indexed
    else:
        # Slice instead of indexing so that an empty name is reported as
        # a ValueError rather than escaping as an IndexError.
        raise ValueError("Invalid property type: '%s'" % name[:1])
    _m2io_put(put_func, handle, name, val, index)
def _get_prop(handle, name):
    """
    Read a scalar property from `handle`.

    The m2io get function is chosen from the property name prefix:
    "r_" real, "i_" int, "b_" boolean, "s_" string.

    :param handle: m2io data handle to read from
    :param name: property name, including its type prefix
    :type name: str

    :return: the value as returned by `_m2io_get`

    :raise ValueError: if `name` has no recognized type prefix
    """
    if name.startswith("r_"):
        get_func = mm.m2io_get_real
    elif name.startswith("i_"):
        get_func = mm.m2io_get_int
    elif name.startswith("b_"):
        get_func = mm.m2io_get_boolean
    elif name.startswith("s_"):
        get_func = mm.m2io_get_string
    else:
        raise ValueError("Invalid property type: '%s'" % name)
    return _m2io_get(get_func, handle, name)
def _get_prop_idx(handle, name, index):
    """
    Read one element of an indexed property from `handle`.

    The indexed m2io get function is chosen from the property name prefix:
    "r_" real, "i_" int, "b_" boolean, "s_" string.

    :param handle: m2io data handle to read from
    :param name: property name, including its type prefix
    :type name: str
    :param index: index of the element (callers in this module count
        from 1)
    :type index: int

    :return: the value as returned by `_m2io_get`

    :raise ValueError: if `name` has no recognized type prefix
    """
    if name.startswith("r_"):
        get_func = mm.m2io_get_real_indexed
    elif name.startswith("i_"):
        get_func = mm.m2io_get_int_indexed
    elif name.startswith("b_"):
        get_func = mm.m2io_get_boolean_indexed
    elif name.startswith("s_"):
        get_func = mm.m2io_get_string_indexed
    else:
        raise ValueError("Invalid property type: '%s'" % name)
    return _m2io_get(get_func, handle, name, index)
def _m2io_put(m2io_put_func, handle, name, value, index=None):
    """
    Call an m2io put function for a single name/value pair.

    The name and value are each wrapped in a one-element list. When
    `index` is given, it is passed right after the handle.

    Any exception raised by the put function is printed and otherwise
    ignored, so a failed write does not propagate to the caller.

    :param m2io_put_func: m2io put function to call
    :type m2io_put_func: callable
    :param handle: m2io data handle to write to
    :param name: property name
    :param value: value to write
    :param index: element index for indexed puts, or None for scalar puts
    """
    leading_args = (handle,) if index is None else (handle, index)
    try:
        m2io_put_func(*leading_args, [name], [value])
    except Exception as err:
        # Best effort: report the failure and carry on
        print(err)
def _m2io_get(m2io_get_func, handle, name, index=None):
    """
    Call an m2io get function for a single name and return its value.

    The name is wrapped in a one-element list, and the last element of the
    returned list is popped and returned. When `index` is given, it is
    passed right after the handle.

    :param m2io_get_func: m2io get function to call
    :type m2io_get_func: callable
    :param handle: m2io data handle to read from
    :param name: property name
    :param index: element index for indexed gets, or None for scalar gets

    :return: the property value, or None if m2io reports M2IO_NO_VALUE

    :raise mm.MmException: for any other m2io failure
    """
    if index is None:
        call_args = (handle, [name])
    else:
        call_args = (handle, index, [name])
    try:
        values = m2io_get_func(*call_args)
        return values.pop()
    except mm.MmException as exc:
        if exc.rc != mm.M2IO_NO_VALUE:
            raise exc
        # "No value" is represented as None
        return None