"""
Functionalities to handle molecular topologies
Copyright Schrodinger, LLC. All rights reserved.
"""
import copy
import os
from collections.abc import Iterable
import numpy
import schrodinger.application.desmond.packages.msys.pfx as pfx_module
from schrodinger.application.desmond import cms
from schrodinger.application.desmond import constants
from schrodinger.application.desmond.packages import msys
from schrodinger.infra import mm
# - We face two topology models: cms and msys.
# - The cms topology model is more compatible with all Schrodinger stuff,
# whereas the msys model is with the Desmond backend and the trajectory data
# model.
# - A major goal of this module is to bridge the two models.
[docs]class DuckFrame:
"""
A duck-type frame with limited interface.
We can duck-type a msys model (i.e., a `msys.System` object) into a
`DuckFrame` object, example::
dfr = DuckFrame(msys_model)
Changes on the `DuckFrame' object should NOT affect the original `model`
object.
"""
# FIXME: I only implemented the interface for what is needed now. We may
# need enhancements down the road, but that should be easy.
# -YW (Jun 2, 2016)
[docs] def __init__(self, model):
"""
Copy the coordinates, the velocities, and the box matrix of the original
model.
FIXME: We only support `msys.System` type of `model` for now.
"""
if isinstance(model, msys.System):
self._pos = model.getPositions()
self._vel = model.getVelocities()
self._box = model.getCell()
self._natoms = model.natoms
self._nactive = model.nactive
else:
NotImplementedError("Unsupported model type: %s" % type(model))
[docs] def pos(self, i=None):
"""
Return the position vector(s).
i can be any of the following types:
- `None`: Return the whole position-vector array;
- `int` : `i` should be a GID, and this function return
position vector of the particle `i`.
- `Iterable`: `i` should be a "list" of GIDs, and this function
returns a NEW numpy array of position-vectors of the
specified particles.
"""
i = numpy.asarray(i) if isinstance(i, Iterable) else i
return self._pos if i is None else self._pos[i]
[docs] def vel(self):
return self._vel
@property
def natoms(self):
return self._natoms
@property
def nactive(self):
return self._nactive
@property
def box(self):
"""
Return a row-majored 3x3 matrix. The primitive cell vectors are the
rows of the matrix, which should be consistent with `traj.Frame.box`.
"""
return self._box
[docs]def cms_atom(cms_model):
"""
Returns an iterator through all atoms in a CMS model. At each iteration, we
get a tuple of `(fsys_atom, comp_atom, comp_ct, ct_index)`, where
- fsys_atom: atom in full-system CT
- comp_atom: atom in component CT
- comp_ct: component CT to which `comp_atom` belong
- ct_index: index of the component CT
The iterator breaks right before returning any inactive atoms, unlike the
function below that returns indices of all atoms (including inactive atoms).
"""
i = 1
for ct_index, ct in enumerate(cms_model.comp_ct):
for a in ct.atom:
if i > cms_model.atom_total:
# We have iterated through all active atoms.
break
yield (cms_model.atom[i], a, ct, ct_index)
i += 1
[docs]def cms_atom_index(cms_model):
"""
Returns an iterator through all atom indices in a CMS model. At each
iteration, we get a tuple of
`(fsys_atom_index, comp_atom_index, comp_ct, ct_index)`, where
- fsys_atom_index: atom index in full-system CT
- comp_atom_index: atom index in component CT
- comp_ct: component CT to which `comp_atom_index` belong
- ct_index: index of the component CT
This function differs from the above in that it returns indices of all atoms
including inactive atoms. Note that the `fsys_atom_index` for inactive atoms is
merely a "projected" index as the fullsystem CT doesn't really contain any
inactive atoms.
"""
fsys_idx = 1
for ct_index, ct in enumerate(cms_model.comp_ct):
for comp_atom_index in range(1, ct.atom_total + 1):
yield (fsys_idx, comp_atom_index, ct, ct_index)
fsys_idx += 1
[docs]def aid_match(cms_model):
"""
:rtype: 1D '1-indexed' `numpy.ndarray` of ints
:return: Returns an array of atom indices. Index = 1-based atom index as in
the full-system CT, value = index of the matched atom (0 means not
matched).
"""
match = numpy.zeros(cms_model.comp_atom_total + 1, dtype=int)
if cms_model.ref_ct and cms_model.mut_ct:
aid_delta = [
0,
]
for ct in cms_model.comp_ct:
aid_delta.append(aid_delta[-1] + ct.atom_total)
ref_ct = cms_model.ref_ct
mut_ct = cms_model.mut_ct
ref_ct_index = cms_model.comp_ct.index(ref_ct)
mut_ct_index = cms_model.comp_ct.index(mut_ct)
ref_aid_delta = aid_delta[ref_ct_index]
mut_aid_delta = aid_delta[mut_ct_index]
for atom in mut_ct.atom:
aid = mut_aid_delta + atom.index
matched_aid = atom.property[constants.FEP_MAPPING]
if matched_aid:
matched_aid += ref_aid_delta
match[aid] = matched_aid
match[matched_aid] = aid
return match
[docs]def pseudoatom_match(msys_model, cms_model):
"""
This function will find the match between pseudoatoms and return it as two
lists of pseudoatom indices. The first list is for the reference CT, and the
second for the mutant CT. ::
match = pseudoatom_match(msys_model, cms_model)
j = match[0][i]
where `j` is the matched pseudoatom's index in the ffio_pseudo block of the
mutant CT, `i` is the pseudoatom's index in that of the reference CT. If
i-th pseudoatom is not matched, j's value is zero. `match[0][0]` is
always 0, which is junk and should be ignored. ::
j = match[1][i]
is similar, except that `i` is of the mutant CT, and `j` is of the reference
CT.
For non-alchemical-FEP systems, this function returns `[]`.
:type msys_model: `msys.System`
:type cms_model: `cms.Cms`. This object should be created by the
`read_cms(...)` function (see below).
:rtype: `[list[int], list[int]]` or `[]`
"""
if cms_model.fep_ct is None:
return []
# The system may contain many more pseudoatoms than what we care about; for
# example, the system may use TIP4P water model and then have millions of
# pseudoatoms, and also the protein molecules may have pseudoatoms. We
# must NOT include those in our returned list. So we need to first figure
# out a rough range of the pseudoatoms and then get pseudatom match for
# those within the range.
ref_ct = cms_model.ref_ct
mut_ct = cms_model.mut_ct
ref_ct_index = cms_model.comp_ct.index(ref_ct)
mut_ct_index = cms_model.comp_ct.index(mut_ct)
# We should NOT assume which CT comes first.
first_ct_index = min(ref_ct_index, mut_ct_index)
last_ct_index = max(ref_ct_index, mut_ct_index) + 1
atom_totals = [ct.atom_total for ct in cms_model.comp_ct]
first_aid = 1 + sum(atom_totals[:first_ct_index])
first_gid = cms_model.gid(first_aid)
if len(cms_model.comp_ct) <= last_ct_index:
# If `ref_ct' or `mut_ct' is the last CT (e.g., in vacuum alchemical
# FEP simulations), this is the correct way to get `last_gid'.
last_gid = msys_model.natoms
else:
last_aid = 1 + sum(atom_totals[:last_ct_index])
last_gid = cms_model.gid(last_aid)
raw_match = [
msys_model.atom(i)["fep_mapping"]
for i in range(first_gid, last_gid)
if msys_model.atom(i).atomic_number == 0
]
mut_pseudomatch = [0] * (len(mut_ct.ffio.pseudo) + 1)
ref_pseudomatch = [0]
for i in range(len(ref_ct.ffio.pseudo)):
j = raw_match[i]
ref_pseudomatch.append(j)
if j > 0:
mut_pseudomatch[j] = i + 1
return [ref_pseudomatch, mut_pseudomatch]
# FIXME: this action should happen somewhere before using the protein-ligand
# interaction analyzers if constants.ORIGINAL_INDEX is missing in cms_model
[docs]def set_original_index_property(cms_model):
"""
For each atom in the full-system CT, add an atom-level property:
constants.ORIGINAL_INDEX, and set its value to the atom's ID.
"""
for a in cms_model.fsys_ct.atom:
a.property[constants.ORIGINAL_INDEX] = a.index
for a in cms_model._raw_fsys_ct.atom:
a.property[constants.ORIGINAL_INDEX] = a.index
[docs]def find_traj_path(cms_model, base_dir=None):
"""
Return path of the trajectory file or dir, whose name was saved into a
CT-level property in the output cms file. Return `None` is valid path cannot
be found.
Note that this function is temporary. In the long run the coupling between
trajectory directory and cms file will be removed. The user should be
explicit about where the trajectory file/folder is.
If you are developing a new command line tool, please do not rely on this
function to get the trajectory path. Instead, explicitly pass in the trajectory
path. See an example below:
$SCHRODINGER/run analyze_simulation.py -h
:type cms_model: `cms.Cms`
:type base_dir: `str`
:rtype: `str` or `None`
"""
try:
traj_path = cms_model.property['s_chorus_trajectory_file']
except KeyError:
return None
traj_path = cms.fix_idx_traj_path(traj_path)
if base_dir:
traj_path = os.path.join(base_dir, os.path.basename(traj_path))
if os.path.exists(traj_path):
return traj_path
[docs]def find_traj_path_from_cms_path(cms_path):
"""
Return path of the trajectory directory/file that is "associated" with the
given CMS file. This works only if both are in the same parent directory.
:param cms_path: Path to the CMS file
:type cms_path: `str`
:return: Path to the trajectory directory or file.
:rtype: `str` or `None`
"""
if not os.path.isfile(cms_path):
return None
st = cms.structure.StructureReader.read(cms_path)
return find_traj_path(st, os.path.dirname(cms_path))
[docs]def read_cms(fname=None, from_string=None, remove_inactive_atoms_in_fsys=True):
"""
Read a .cms file from the given file name `fname`, or from a string buffer.
How does this differ from `cms.Cms(fname)`?
- Here, two models will be created for the given .cms file. So you will
get 2-tuple return value: The first member is the msys model, and the
second is the cms model.
- The cms model returned by this function will give you correct gids,
whereas the model returned by `cms.Cms(fname)` might not do so when
there are virtual sites.
- The cms model returned by this function will have three extra attributes:
- `pseudoatoms` This is a map from the gid of a physical atom to that
of its pseudoatom(s).
- `pseudomatch` This is pseudoatom match between the two alchemical
molecules. (See the docstring of `pseudoatom_match` for detail)
Will we unite this with `cms.Cms(fname)`? Maybe. But I don't see a
strong motivation right now, and I don't want to overengineer.
-YW (May 26, 2016)
FIXME: `msys.LoadMAE(...)` is unbelievably slow and may take up to 10 sec
for typically-sized alchemical FEP systems.
"""
assert bool(fname) ^ bool(from_string)
if fname:
model0 = msys.LoadMAE(path=fname)
model1 = cms.Cms(
file=fname,
remove_inactive_atoms_in_fsys=remove_inactive_atoms_in_fsys)
else:
model0 = msys.LoadMAE(buffer=from_string.encode())
model1 = cms.Cms(
string=from_string,
remove_inactive_atoms_in_fsys=remove_inactive_atoms_in_fsys)
if model1.nactive_gids >= 0:
model0.nactive = model1.nactive_gids
# The gids in `model1' is incorrect when there are pseudoatoms, and it's
# unfortunately very difficult and error-prone to get the correct gids.
# Here we use the msys model to help us to fix the gids in the cms model.
# The fix here is a lot simpler and more robust.
cms_comp_atom_total = model1.comp_atom_total
gid = numpy.ones(cms_comp_atom_total + 1, dtype=int) * numpy.iinfo(int).min
# `gid[0]` is junk and ignored.
match = aid_match(model1)
curr_gid = 0
pseudoatoms = {} # Value = GIDs of pseudoatoms; key = GID of host atom
for aid in range(1, cms_comp_atom_total + 1):
if gid[aid] >= 0:
continue
a = model0.atom(curr_gid)
while a.atomic_number < 1: # skip pseudoatoms
for b in a.bonded_atoms:
pseudoatoms.setdefault(b.id, []).append(a.id)
curr_gid += 1
a = model0.atom(curr_gid)
gid[aid] = curr_gid
matched = match[aid]
if matched:
gid[matched] = curr_gid
curr_gid += 1
while curr_gid < model0.natoms:
a = model0.atom(curr_gid)
if a.atomic_number < 1:
for b in a.bonded_atoms:
pseudoatoms.setdefault(b.id, []).append(a.id)
else:
raise RuntimeError("Inconsistent number of atoms between cms and"
" msys models")
curr_gid += 1
pseudoatoms = {k: sorted(v) for k, v in pseudoatoms.items()}
model1.pseudoatoms = pseudoatoms
# don't let anyone else muck it up since it's directly exposed
gid.flags.writeable = False
model1.gid_map = gid
# Must be called after the `gid' is correctly set for `model1'.
model1.pseudomatch = pseudoatom_match(model0, model1)
# a list that contains all AIDs which also have pseudoatoms.
model1.aids_with_vs = get_aids_with_virtuals(model1)
set_original_index_property(model1)
return model0, model1
[docs]def matched_gids(cms_model, sub_cms, sub_msys):
"""
Return GIDs of `cms_model` that match the GIDs of `sub_cms` and preserve
the ordering, which is essential for trajectory extraction/reduction.
:param cms_model: Original system where `subsystem` was extracted from
:type cms_model: `cms.Cms`
:param sub_cms: A subsystem extracted from `cms_model`. The atoms are
expected to have a property called `i_des_orig_aid`, and the values
should be the AIDs in `cms_model`.
:type sub_cms: `cms.Cms`
:param sub_msys: This object must match `sub_cms`.
:type sub_msys: `msys.System`
@rtype: `list` of `int`
"""
# All this trouble originates from the fact that the AID to GID map is
# neither onto (pseudoatoms), nor 1-to-1 (FEP core atoms).
# Here the (retrospective) strategy is to use the reloaded subsytem which
# contains the desired GID order to figure out the original cms_model's GID
# order for extraction.
sub_gids_count = sub_msys.natoms
# There are three mappings involved:
# subsystem GID
# -> (subsystem AID, idx) map1: onto but not 1-to-1
# -> (original-system AID, idx) map2: 1-to-1 and onto
# -> original-system GID map3: 1-to-1 but most likely not onto
# Here idx is to identify pseudoatoms associtated with the AID.
# There are two situations when map1 is not 1-to-1:
# (Yuqing confirmed that pseudoatoms associated with dummy atoms are not
# mapped, i.e., same GID, different parent AIDs, and different parent GIDs
# does not occur.)
# 1. FEP core physical atoms (same GID, different AIDs)
# 2. FEP core mapped pseudoatoms (same GID, different parent AIDs,
# but same parent GID)
# To make it 1-to-1, we pick either AID. This does not affect the uniqueness
# of the output of map3.
# Another assumption here is that sibling pseudoatoms' GID order does not
# change after extraction.
# FIXME: Use a variable (constant) for 'i_des_orig_aid', no string literal.
sub_gid_2_orig_aid = {
gid: sub_cms.atom[aid].property['i_des_orig_aid']
for aid, gid in enumerate(sub_cms.allaid_gids, start=1)
}
pseudoatom_gid_2_parent_gid = {}
for parent_gid, pseudoatom_gids in sub_cms.pseudoatoms.items():
for idx, gid in enumerate(pseudoatom_gids):
pseudoatom_gid_2_parent_gid[gid] = (cms_model.gid(
sub_gid_2_orig_aid[parent_gid]), idx)
orig_gids = []
for sub_gid in range(sub_gids_count):
try:
orig_gid = cms_model.gid(sub_gid_2_orig_aid[sub_gid])
except KeyError: # sub_gid must be a pseudoatom
orig_parent_gid, idx = pseudoatom_gid_2_parent_gid[sub_gid]
orig_gid = cms_model.pseudoatoms[orig_parent_gid][idx]
orig_gids.append(orig_gid)
return orig_gids
[docs]def comp_atoms(cms_model):
"""
A coroutine to iterate through all atoms in all component CTs of the
`cms_model`.
:type cms_model: `cms.Cms`
"""
for ct in cms_model.comp_ct:
for a in ct.atom:
yield a
[docs]def update_ct_box(ct, box):
"""
Given a `ct`, set the following CT-level properties using the values from
`box`::
"r_chorus_box_ax", "r_chorus_box_ay", "r_chorus_box_az",
"r_chorus_box_bx", "r_chorus_box_by", "r_chorus_box_bz",
"r_chorus_box_cx", "r_chorus_box_cy", "r_chorus_box_cz",
:type box: Any type that supports double subscriptions like `box[0][1]`,
where both indices are zero-based.
:rtype: `structure.Structure`
:return: The same input `ct` object, with its "r_chorus_box_*" CT-level
properties updated.
"""
PREFIX = "r_chorus_box_"
for i, a in enumerate("abc"):
for j, x in enumerate("xyz"):
ct.property[PREFIX + a + x] = box[i][j]
return ct
[docs]def update_ct(ct,
cms_model,
fr,
allaid_gids=None,
is_fullsystem=True,
update_vel=False):
"""
Updates coordinates and simulation-box-matrix of the input `ct` with the
given trajectory frame. Here `cms_model` and `fr` should correspond to the
same simulation, and `ct` could be full system CT, component CT, or other
subsystems extracted from the system. Note GCMC systems with multiple
types of solvent are not supported.
:type ct: `structure.Structure`
:param ct: structure to be updated
:type fr: `traj.Frame` or `DuckFrame`
:type cms_model: `cms.Cms`
:param cms_model: It should be generated by the `read_cms` function.
:param allaid_gids: GIDs of all AIDs
:type allaid_gids: `numpy.ndarray`, or `list` of `int` or `None`. The caller
needs to make sure that its length and order match the
atoms in `ct`. `None` means `cms_model.allaid_gids` is
used.
:param is_fullsystem: whether or not this represents a fullsystem CT.
Knowing this can keep us from doing extra work
:type is_fullsystem: `bool`
:param update_vel: Update the atom velocities.
:type update_vel: `bool`
:rtype: `structure.Structure`
:return: The same input `ct` object, with the atom coordinates and
simulation box matrix updated from the frame.
Example:
fsys_ct = cms_model.fsys_ct.copy()
update_ct(fsys_ct, cms_model, tr[i])
"""
if allaid_gids is not None:
assert len(allaid_gids) == ct.atom_total, \
"Atom index array does not match atom count."
gids = allaid_gids
else:
# Only works for single solvent block with inactive atoms
gids = cms_model.allaid_gids[:ct.atom_total]
if is_fullsystem and fr.natoms == cms_model.comp_atom_total and \
cms_model.ref_ct is None:
# For simulation systems without pesudo-atoms and without FEP, the AIDs
# and the GIDs have one-to-one correspondence. This enables a much
# faster atom coordinate update.
ct.setXYZ(fr.pos()[:ct.atom_total])
else:
if allaid_gids is None and not is_fullsystem:
raise ValueError("allaid_gids must be supplied when ct is not a "
"full system ct")
ct.setXYZ(fr.pos(gids))
update_ct_box(ct, fr.box)
vel = fr.vel()
if update_vel and vel is not None:
allaid_vel = vel[gids]
for a, v in zip(ct.atom, allaid_vel):
set_atom_velocity(a, v)
return ct
[docs]def update_fsys_ct_from_frame_GF(fsys_ct,
cms_model,
fr,
frames_to_smooth=None,
aids_to_smooth=None,
update_vel=False):
"""
This function is the future version of `update_fsys_ct_from_frame` and the
difference is that `fsys_ct` will be inactive-free.
"""
orig_pos = None
# temporally write the smoothed positions into the frame
if frames_to_smooth and len(frames_to_smooth) > 1:
assert fr in frames_to_smooth, \
"The frames for smoothing should contain the frame of interest."
non_solvent_gids = asl2gids(cms_model, 'not solvent', False)
if aids_to_smooth is None:
gids = non_solvent_gids
else:
# FIXME: The input cms_model is not in sync with the input fr.
# It may be an overkill to call cms_model.set_nactive(fr)
# here though. Maybe we just dis-allow averaging on solvent
# and filter them out here?
gids = aids2gids(cms_model, aids_to_smooth, False)
if set(gids) - set(non_solvent_gids):
print(
'WARNING: Solvent atoms are selected for smoothing. It may not work for GCMC trajectories.'
)
if gids:
allpos = numpy.empty([len(frames_to_smooth), len(gids), 3])
for i, fr_s in enumerate(frames_to_smooth):
allpos[i] = fr_s.pos(gids)
pos = fr.pos()
orig_pos = pos[gids]
pos[gids] = allpos.mean(axis=0)
# TODO: Maybe all logic below should go into update_ct() at later point
active_total = cms_model.active_total_from_nactive_gids(
fr.nactive, fr.natoms)
fsys_ct_atom_total = fsys_ct.atom_total
comp_atom_total = cms_model.comp_atom_total
if fsys_ct_atom_total != active_total:
solvent_cts = cms_model.get_solvent_cts()
if len(solvent_cts) > 1:
raise ValueError("Only one gcmc solvent is currently supported")
solvent_ct = solvent_cts[0]
if fsys_ct_atom_total < active_total:
other_atom_total = comp_atom_total - solvent_ct.atom_total
solvent_active_total = active_total - other_atom_total
fsys_ct_atom_total_in_solvent = fsys_ct_atom_total - other_atom_total
solvent_extraction_indices = range(solvent_active_total,
fsys_ct_atom_total_in_solvent,
-1)
extracted = solvent_ct.extract(solvent_extraction_indices,
copy_props=True)
# perform the equivalent of mark_fullsystem_ct in cms.py on new
# atoms only
solvent_ct_type = solvent_ct.property[constants.CT_TYPE]
solvent_ct_index = cms_model.comp_ct.index(solvent_ct) + 1
for atom in extracted.atom:
atom.property[constants.CT_TYPE] = solvent_ct_type
atom.property[constants.CT_INDEX] = solvent_ct_index
fsys_ct.extend(extracted)
elif fsys_ct_atom_total > active_total:
fsys_ct.deleteAtoms(range(fsys_ct_atom_total, active_total, -1))
update_ct(fsys_ct,
cms_model,
fr,
allaid_gids=cms_model.gid_map[1:active_total + 1],
update_vel=update_vel)
if orig_pos is not None: # remove side effect on the input fr
pos[gids] = orig_pos
[docs]def update_fsys_ct_from_frame(fsys_ct,
cms_model,
fr,
frames_to_smooth=None,
aids_to_smooth=None):
"""
Update a full system CT using a frame object, including atom positions,
simulation box, atom-level properties i_des_atom_domain and i_m_visibility.
If inactive atoms are present, they are labelled and set invisible. If
`frames_to_smooth` is given, the smoothed coordinates are used to update
the full system CT's coordinates instead.
:type frames_to_smooth: `list` of `traj.Frame`
:param frames_to_smooth: Frames whose atom coordinates are to be smoothed
to update the atom coordinates in `fsys_ct`. Note
it should contain `fr`.
:type aids_to_smooth: `list` of `int` or `None`
:param aids_to_smooth: AIDs of atoms whose coordinates are to be smoothed.
If `None`, default to non-solvent atoms.
"""
if not cms_model._show_inactive:
update_fsys_ct_from_frame_GF(fsys_ct, cms_model, fr, frames_to_smooth,
aids_to_smooth)
return
orig_pos = None
# temporally write the smoothed positions into the frame
if frames_to_smooth and len(frames_to_smooth) > 1:
assert fr in frames_to_smooth, \
"The frames for smoothing should contain the frame of interest."
if aids_to_smooth is None:
gids = asl2gids(cms_model, 'not solvent', False)
else:
gids = aids2gids(cms_model, aids_to_smooth, False)
if gids:
allpos = numpy.empty([len(frames_to_smooth), len(gids), 3])
for i, fr_s in enumerate(frames_to_smooth):
allpos[i] = fr_s.pos(gids)
pos = fr.pos()
orig_pos = pos[gids]
pos[gids] = allpos.mean(axis=0)
update_ct(fsys_ct, cms_model, fr)
if orig_pos is not None: # remove side effect on the input fr
pos[gids] = orig_pos
# label inactive atoms
if fr.natoms != fr.nactive:
active_total = cms_model.active_total_from_nactive_gids(
fr.nactive, fr.natoms)
first_inactive_aid = active_total + 1
# Only works for single solvent
first_solvent_aid = cms_model.comp_atom_total - \
cms_model.comp_ct[-1].atom_total + 1
for i in range(first_solvent_aid, first_inactive_aid):
fsys_ct.atom[i].property[
constants.
DES_ATOM_DOMAIN] = constants.DES_ATOM_DOMAIN.VAL.SOLVENT
for i in range(first_inactive_aid, cms_model.comp_atom_total + 1):
fsys_ct.atom[i].property[
constants.
DES_ATOM_DOMAIN] = constants.DES_ATOM_DOMAIN.VAL.INACTIVE
# FIXME: this function will become obsolete when inactive-free fsys_ct is default
[docs]def get_active_fsys_ct_from_frame(fsys_ct, cms_model, fr):
"""
Return a `Structure` object that contains only the active atoms from the
given frame `fr`. Note that the atom coordinates in the input `fsys_ct` are
updated with those from the frame. For GCMC simulations, this active full
system CT is extracted from the input full system CT to exclude inactive
(inactive) atoms. A new Structure object will be returned in this case. For
normal MD simulations, the active full system CT is the entire system, and
the updated `fsys_ct` object will be returned. This function has the same
interface as `update_ct`.
:param fsys_ct: full system CT
:type fr: `traj.Frame`
"""
if not cms_model._show_inactive:
update_fsys_ct_from_frame_GF(fsys_ct, cms_model, fr)
return fsys_ct
update_ct(fsys_ct, cms_model, fr)
if fr.natoms != fr.nactive:
cms_model.set_nactive_gids(fr.nactive, fr.natoms)
active_aids = numpy.arange(1, cms_model.active_total + 1)
return fsys_ct.extract(active_aids)
else:
return fsys_ct
[docs]def set_atom_velocity(atom, vel):
"""
This function will set the following atom properties::
r_ffio_x_vel
r_ffio_y_vel
r_ffio_z_vel
with `vel[0]`, `vel[1]`, and `vel[2]`, respectively.
"""
atom.property["r_ffio_x_vel"] = vel[0]
atom.property["r_ffio_y_vel"] = vel[1]
atom.property["r_ffio_z_vel"] = vel[2]
[docs]def update_cms_physical(cms_model, pos, vel, box):
"""
Update the physical data of a cms model. The physical data include all atom
positions and velocities and also the box matrix.
:type cms_model: `cms.Cms`
:param cms_model: The cms model to be updated. This model should be created
by the `read_cms` function, and its atom count and order
should match to the `pos`, `vel` inputs.
:type pos: `numpy.ndarray`. 3xN matrix, where N is number of atoms.
:type vel: `numpy.ndarray`. 3xN matrix, where N is number of atoms.
:type box: `numpy.ndarray`. 3x3 matrix whose ROWS are primitive cell vectors.
"""
allaid_pos = pos[cms_model.allaid_gids]
cms_model.setXYZ(allaid_pos[:cms_model.atom_total])
cms_model._raw_fsys_ct.setXYZ(allaid_pos[:cms_model.atom_total])
start = 0
for ct in cms_model.comp_ct:
end = start + ct.atom_total
ct.setXYZ(allaid_pos[start:end])
start = end
if vel is not None:
atom0 = cms_model.atom
atom1 = cms_model._raw_fsys_ct.atom
atom2 = comp_atoms(cms_model)
allaid_vel = vel[cms_model.allaid_gids]
# zip truncates to smallest length so we needn't worry about mismatch
# between comp_atom_total and atom_total
for a0, a1, a2 in zip(atom0, atom1, atom2):
v = allaid_vel[a0.index - 1]
set_atom_velocity(a0, v)
set_atom_velocity(a1, v)
set_atom_velocity(a2, v)
for ct in [cms_model, cms_model._raw_fsys_ct] + cms_model.comp_ct:
update_ct_box(ct, box)
cms_model.box = box.flatten()
[docs]def check_consistency(cms_model, frame):
"""
Return `None` if `cms_model` and `frame` are consistent, or a `str` object
describing the inconsistency if they are not consistent.
:type cms_model: `cms.Cms`
:param cms_model: It should be generated by the `read_cms` function.
:type frame: `traj.Frame`
"""
all_gids = aids2gids(cms_model,
range(1, cms_model.comp_atom_total + 1),
include_pseudoatoms=True)
# FEP systems can have multiple AIDs sharing the same GID.
num_gids = len(set(all_gids))
if num_gids != frame.natoms:
return f"Number of GIDs is {num_gids} in CMS, whereas {frame.natoms} " \
f"in frame"
[docs]def update_cms(cms_model, frame, update_pseudoatoms=True):
"""
Update the given `cms_model` with the atom coordinates and the simulation
box matrix from a simulation frame `frame`. For GCMC systems, it also
updates the CT-level property 'i_des_active_total'.
N.B.: If you call this function for every frame of a long trajectory, you
might find this function is quite slow. Also keep in mind that its
performance has a strong dependency on the number of atoms in the system.
If all you want is a full-system CT with atom coordinates updated by the
trajectory frame, consider using the `get_active_fsys_ct_from_frame` or
`update_ct` functions above, which are much faster.
The input `cms_model` and `frame` objects should match. When in doubt, call
`check_consistency`.
:type cms_model: `cms.Cms`
:param cms_model: It should be generated by the `read_cms` function.
:type frame: `traj.Frame` or duck type (see `DuckFrame` for the
interface requirements).
:rtype: `structure.Structure`
:return: The same input `cms_model` object, with the atom coordinates and
the simulation box matrix updated from the frame.
"""
# FIXME: Can we make this function run faster?
pos = frame.pos()
vel = frame.vel() # `vel' could be `None'.
box = frame.box
update_cms_physical(cms_model, pos, vel, box)
cms_model.set_nactive_gids(frame.nactive, frame.natoms)
if not update_pseudoatoms:
return cms_model
# Updates the ffio_pseudo block. Damn, it's complicated.
# First, let's figure out number of pseudo atoms in each component CT.
has_vel = vel is not None
pseudo_counts = []
for ct in cms_model.comp_ct:
# Solvent block's ct.ffio.site has only 1 molecule to reduce redundancy.
num_site = len([e for e in ct.ffio.site if e.type != "pseudo"])
num_instance = ct.atom_total // num_site
pseudo_counts.append((len(ct.ffio.site) - num_site) * num_instance)
# Next, let's get the coordinates of all pseudoatoms from the trajectory
# frame. To do this, we first get the gids of all pseudoatoms and then use
# it as an index-array to get the coordinates.
pseudo_gids = sorted(sum(cms_model.pseudoatoms.values(), []))
pseudo_pos = pos[pseudo_gids]
pseudo_vel = has_vel and vel[pseudo_gids]
# Now, update the ffio_pseudo block with `pseudo_pos' and `pseudo_vel'.
# - We have to treat an alchemical FEP system specially because of the
# potential matches of pseudoatoms.
if cms_model.fep_ct:
comp_ct = cms_model.comp_ct
i_ref = comp_ct.index(cms_model.ref_ct)
i_mut = comp_ct.index(cms_model.mut_ct)
mut2ref_pseudomatch = cms_model.pseudomatch[1]
num_matched = len([_f for _f in mut2ref_pseudomatch if _f])
num_unmatched_in_mut_ct = pseudo_counts[i_mut] - num_matched
k = 0
k_mut_ct = None
pseudo_pos_in_ref_ct = ["0th-elem-is-junk"]
pseudo_vel_in_ref_ct = ["0th-elem-is-junk"]
for i, ct in enumerate(cms_model.comp_ct):
if cms_model.fep_ct:
if i == i_ref:
# Only updates the pseudoatoms in `ref_ct'.
for j in range(1, pseudo_counts[i] + 1):
a = ct.ffio.pseudo[j]
a.x_coord = pseudo_pos[k][0]
a.y_coord = pseudo_pos[k][1]
a.z_coord = pseudo_pos[k][2]
if has_vel:
a.x_vel = pseudo_vel[k][0]
a.y_vel = pseudo_vel[k][1]
a.z_vel = pseudo_vel[k][2]
pseudo_vel_in_ref_ct.append(pseudo_vel[k])
pseudo_pos_in_ref_ct.append(pseudo_pos[k])
k += 1
# Now a few elements after the `k'-th are coordinates of the
# pseudoatoms in `mut_ct'. Skip them.
k_mut_ct = k
k += num_unmatched_in_mut_ct
elif i == i_mut:
# We cannot update for `mut_ct' right now. Because its
# pseudoatoms' coordinates might depend on those of the matched
# pseudoatoms in `ref_ct', which may be behind `mut_ct' in the
# component CT list and thus hasn't been updated at this moment.
# We will update `mut_ct' at the end.
continue
else:
for j in range(1, pseudo_counts[i] + 1):
a = ct.ffio.pseudo[j]
a.x_coord = pseudo_pos[k][0]
a.y_coord = pseudo_pos[k][1]
a.z_coord = pseudo_pos[k][2]
if has_vel:
a.x_vel = pseudo_vel[k][0]
a.y_vel = pseudo_vel[k][1]
a.z_vel = pseudo_vel[k][2]
k += 1
if cms_model.fep_ct:
k = k_mut_ct
ct = cms_model.mut_ct
for j in range(1, pseudo_counts[i_mut] + 1):
matched_index = mut2ref_pseudomatch[j]
if matched_index > 0:
pos = pseudo_pos_in_ref_ct[matched_index]
vel = has_vel and pseudo_vel_in_ref_ct[matched_index]
else:
pos = pseudo_pos[k]
vel = has_vel and pseudo_vel[k]
k += 1
a = ct.ffio.pseudo[j]
a.x_coord = pos[0]
a.y_coord = pos[1]
a.z_coord = pos[2]
if has_vel:
a.x_vel = vel[0]
a.y_vel = vel[1]
a.z_vel = vel[2]
return cms_model
[docs]def update_msys(msys_model, frame):
"""
Updates the given msys model `msys_model` with the atom coordinates and
velocities and the simulation box matrix from a simulation frame `frame`.
:type msys_model: `msys.System`
:type frame: `traj.Frame` or duck type (see `DuckFrame` for the
interface requirements).
:rtype: `msys.System`
:return: The same input `msys_model` object, with the atom coordinates,
velocities and the simulation box matrix updated from the frame.
"""
msys_model.setPositions(frame.pos())
msys_model.setVelocities(frame.vel())
msys_model.setCell(frame.box)
msys_model.nactive = frame.nactive
return msys_model
[docs]def get_aids_with_virtuals(cms_model):
"""
:type cms_model: `cms.Cms`
:return: a set of full-system AIDs that have virtual sites attached to them.
:rtype: set of `int`
"""
atom_ids_with_virtuals = {}
for ict, comp_ct in enumerate(cms_model.comp_ct):
component_atom_ids = set()
if len(comp_ct.ffio.virtual) > 0:
# unroll ffio_sites if needed
natom = len(comp_ct.ffio.site) - len(comp_ct.ffio.virtual)
ncomponent = comp_ct.atom_total // natom
for v in comp_ct.ffio.virtual:
for offset in range(ncomponent):
component_atom_ids.add(v.ai + (offset * natom))
atom_ids_with_virtuals[ict] = component_atom_ids
# Full system atom ids that have virtual sites.
aids_with_virtuals = set()
for full_system_idx, component_idx, _, ct_index in cms_atom_index(
cms_model):
if component_idx in atom_ids_with_virtuals[ct_index]:
aids_with_virtuals.add(full_system_idx)
return aids_with_virtuals
[docs]def aids2gids(cms_model, aids, include_pseudoatoms=True):
"""
Convert a list of atom IDs (`aids`) into a list of gids. If any selected
atoms have pseudoatoms associated to them, the pseudoatoms will be included
into the list of gids. It puts all pseudoatoms' GIDs after all physical
atoms' GIDs with arbitrary ordering.
:type cms_model: `cms.Cms`
:param cms_model: A cms model generated by `read_cms` (see above).
:rtype: `list` of `int`
:return: A list of gids of the selected particles
"""
gids = [cms_model.gid(i) for i in aids]
if include_pseudoatoms:
gids_set = set(gids)
g2a = {k: v for k, v in zip(gids, aids)}
# Includes gids of the involved pseudoatoms.
for gid_phys, gid_pseudo in cms_model.pseudoatoms.items():
if gid_phys in gids_set and g2a[gid_phys] in cms_model.aids_with_vs:
gids.extend(gid_pseudo)
return gids
[docs]def asl2gids(cms_model, asl, include_pseudoatoms=True):
"""
Evaluate an ASL expression, and return a list of gids of particles selected
by `asl`. If any selected atoms have pseudoatoms associated to them, the
pseudoatoms will be included into the list of gids.
:type cms_model: `cms.Cms`
:param cms_model: A cms model generated by `read_cms` (see above).
:type asl: `str`
:param asl: An ASL expression
:rtype: `list` of `int`
:return: An unsorted list of gids of the selected particles
"""
aids = cms_model.select_atom(asl)
return aids2gids(cms_model, aids, include_pseudoatoms)
def _glue_action(pfx, gids):
pfx.glue(gids)
def _align_action(pfx, gids, ref_pos=None, weights=None):
pfx.align(gids, coords=ref_pos, weights=weights)
[docs]def make_glued_topology(msys_model, cms_model):
"""
Add glued topology to msys_model, which contains with extra bonds based on
proximity of the solute molecules. It works with `_pfx_apply` to properly
center disjoint solute molecules (e.g., proteins with missing segments,
dimers) The assumption is that the input cms_model has the correct spatial
configurations.
"""
gid_pairs = cms.get_gluepoints(cms_model)
msys_model.glue(gid_pairs)
def _pfx_apply(msys_model, tr, *actions):
"""
Return modified trajectory. Possible actions are `_glue_action` and
`_align_action`. All actions utilize {msys_model.glued_topology}.
"""
pfx = pfx_module.Pfx(msys_model.glued_topology, fixbonds=True)
for a in actions:
a(pfx)
for e in tr:
pfx.apply(e.pos(), e.box, e.vel())
return tr
[docs]def make_whole(msys_model, tr):
"""
In MD simulation, molecules can be broken due to the periodic boundary
condition, which makes some atoms be at one side of the simulation box and
the other atoms at the opposite side. This function will edit the atom
coordinates so to make broken molecules whole again for each frame of the
MD trajectory `tr`.
:type msys_model: msys.System
:param msys_model: The msys model, which must be consistent with the
trajectory in terms of the molecular topology. This
object will not be mutated.
:type tr: `list`
:param tr: The simulation trajectory to be modified
:rtype: `list`
:return: Modified simulation trajectory
"""
return tr and _pfx_apply(msys_model, tr)
[docs]def glue(msys_model, gids, tr):
"""
First, make-whole all molecules in the simulation system, and then glue
the selected molecules together.
:type msys_model: msys.System
:param msys_model: The msys model, which must be consistent with the
trajectory in terms of the molecular topology. This
object will not be mutated.
:type gids: `list`
:param gids: A list of gids to specify the molecules to be glued
:type tr: `list`
:param tr: The simulation trajectory to be modified
:rtype: `list`
:return: Modified simulation trajectory
"""
return tr and _pfx_apply(msys_model, tr,
lambda pfx: _glue_action(pfx, gids))
[docs]def center(msys_model, gids, tr, dims=None):
"""
This function will do what `glue` does, but it will do one more thing:
It will translate the coordinates of all atoms so that the specified
molecules will be placed at the center (origin) of the simulation box.
:type msys_model: msys.System
:param msys_model: The msys model, which must be consistent with the
trajectory in terms of the molecular topology. This
object will not be mutated.
:type gids: `list`
:param gids: A list of gids to specify the molecules to be glued and
centered
:type tr: `list`
:param tr: The simulation trajectory to be modified
:type dim: `None` or `list` of `int`, e.g., [2] for z axis,
[0, 1] for x-y plane. If set to `None`, center on all three
spatial dimensions
:param dim: dimensions to be centered on
:rtype: `list`
:return: Modified simulation trajectory
"""
if dims:
all_pos = [fr.pos() for fr in tr]
dims_to_keep = list({0, 1, 2} - set(dims))
cached = [pos[:, dims_to_keep] for pos in all_pos]
tr and _pfx_apply(msys_model, tr, lambda pfx: _glue_action(pfx, gids),
lambda pfx: _align_action(pfx, gids))
if dims:
for i, pos_dims_to_keep in enumerate(cached):
all_pos[i][:, dims_to_keep] = pos_dims_to_keep
return tr
[docs]def superimpose(msys_model, gids, tr, ref_pos, weights=None):
"""
This function will do what `center` does, but in addition it will align and
superimpose all the coordinates in the frame with respect to a reference
coordinates (ref_pos). This operation will be applied to all frames in the
trajectory.
:type msys_model: msys.System
:param msys_model: The msys model, which must be consistent with the
trajectory in terms of the molecular topology. This
object will not be mutated.
:type gids: `list`
:param gids: A list of gids to specify the molecules to be glued and
centered
:type tr: `list`
:param tr: The simulation trajectory to be modified
:type ref_pos: `numpy.array`. 3xN matrix, where N is length of `gids`.
:param ref_pos: Reference coordinates used for alignments
:type weights: `list` or None
:param weights: Atom weights used for alignments. If it's `None`, all
atoms will be weighted equally.
:rtype: `list`
:return: Modified simulation trajectory
"""
return tr and _pfx_apply(
msys_model, tr, lambda pfx: _glue_action(pfx, gids),
lambda pfx: _align_action(pfx, gids, ref_pos=ref_pos, weights=weights))
[docs]def make_whole_cms(msys_model, cms_model):
"""
Similar to `make_whole` (see above), but on a cms model instead of a
simulation trajectory.
Both `msys_model` and `cms_model` must be previously obtained through the
`read_cms` function. They both should have the same atom coordinates and
the same simulation box matrix.
:rtype: `structure.Structure`
:return: Modified cms model
"""
fr = DuckFrame(msys_model)
_pfx_apply(msys_model, [fr])
update_cms(cms_model, fr)
return cms_model
[docs]def glue_cms(msys_model, gids, cms_model):
"""
Similar to `glue` (see above), but on a cms model instead of a
simulation trajectory.
Both `msys_model` and `cms_model` must be previously obtained through the
`read_cms` function. They both should have the same atom coordinates and
the same simulation box matrix.
:rtype: `structure.Structure`
:return: Modified cms model
"""
fr = DuckFrame(msys_model)
_pfx_apply(msys_model, [fr], lambda pfx: _glue_action(pfx, gids))
update_cms(cms_model, fr)
return cms_model
[docs]def center_cms(msys_model, gids, cms_model, dims=None):
"""
Similar to `center` (see above), but on a cms model instead of a
simulation trajectory.
Both `msys_model` and `cms_model` must be previously obtained through the
`read_cms` function. They both should have the same atom coordinates and
the same simulation box matrix.
:param dim: dimensions to be centered on
:type dim: `None` or `list` of `int`, e.g., [2] for z axis,
[0, 1] for x-y plane. If set to `None`, center on all three
spatial dimensions
:rtype: `structure.Structure`
:return: Modified cms model
"""
fr = DuckFrame(msys_model)
if dims:
pos = fr.pos()
dims_to_keep = list({0, 1, 2} - set(dims))
cached = pos[:, dims_to_keep]
_pfx_apply(msys_model, [fr], lambda pfx: _glue_action(pfx, gids),
lambda pfx: _align_action(pfx, gids))
if dims:
pos[:, dims_to_keep] = cached
update_cms(cms_model, fr)
return cms_model
[docs]def superimpose_cms(msys_model, gids, cms_model, ref_pos, weights=None):
"""
Similar to `superimpose` (see above), but on a cms model instead of a
simulation trajectory.
Both `msys_model` and `cms_model` must be previously obtained through the
`read_cms` function. They both should have the same atom coordinates and
the same simulation box matrix.
:rtype: `structure.Structure`
:return: Modified cms model
"""
fr = DuckFrame(msys_model)
_pfx_apply(
msys_model, [fr], lambda pfx: _glue_action(pfx, gids),
lambda pfx: _align_action(pfx, gids, ref_pos=ref_pos, weights=weights))
update_cms(cms_model, fr)
return cms_model
[docs]def is_dynamic_asl(cms_model, asl) -> bool:
"""
Return True if ASL could evaluate to different results on different frames.
:type cms_model: `cms.Cms`
:type asl: `str`
"""
asl = cms.Cms.META_ASL.get(asl, asl)
mm.mmss_initialize(mm.error_handler)
if mm.mmasl_is_dynamic_expression(asl):
return True
if cms_model.active_total != cms_model.comp_atom_total:
aids = cms_model.select_atom(f'({asl}) and solvent')
return bool(aids)
return False