"""
Utilities for order parameter analysis.
Copyright Schrodinger, LLC. All rights reserved.
"""
import copy
import json
import os
import warnings
from collections import namedtuple
from collections import defaultdict
import numpy
from schrodinger.application.desmond import cms
from schrodinger.application.desmond.packages import analysis
from schrodinger.application.desmond.packages import staf
from schrodinger.application.desmond.packages import topo
from schrodinger.application.desmond.packages import traj
from schrodinger.application.matsci import jobutils
from schrodinger.application.matsci import parserutils
from schrodinger.application.matsci import msprops
from schrodinger.application.matsci import textlogger
from schrodinger.structutils import analyze
from schrodinger.structutils import transform
from schrodinger.utils import fileutils
# see SHARED-4320 which is an export to excel formatting issue
with warnings.catch_warnings():
warnings.simplefilter('ignore', UserWarning)
import pandas
ORDER_PARAMETER_OUTPUT_FILES_PROP = 's_matsci_order_parameter_output_files'
ORDER_PARAMETER_OUTPUT_FILES_SEP = ','
RESULTS_FILE_EXT = 'csv'
OUT_FILE_TAG = 'op'
PER_MOL_OUT_FILE_TAG = f'per_mol_{OUT_FILE_TAG}'
MOL_TAG = '_mol_'
INDEX_NAME = 'Frame'
Result = namedtuple('Result', ['total_op', 'per_mol_op'])
PerMolOP = namedtuple('PerMolOP', ['mol_numbers', 'mol_ops'])
[docs]def log(msg, **kwargs):
"""
Add a message to the log file
:param str msg: The message to log
Additional keyword arguments are passed to the textlogger.log_msg function
"""
textlogger.log_msg(msg, **kwargs)
[docs]def get_name_w_mol_n(name, mol_n):
"""
Return the name with the molecule number.
:type name: str
:param name: the name
:type mol_n: int
:param mol_n: the molecule number
:rtype: str
:return: the name with the molecule number
"""
return f'{name}{MOL_TAG}{mol_n}'
[docs]def get_name_wo_mol_n(name):
"""
Return the name without the molecule number.
:type name: str
:param name: the name
:rtype: (str, int) or (str, None)
:return: the name without the molecule number and
the molecule number if there is one
"""
if MOL_TAG in name:
name, mol_n = name.split(MOL_TAG)
return name, int(mol_n)
return name, None
[docs]class ReduceVecMixin:
"""
Manages reducing vectors.
"""
[docs] def reduce_vec(self, n, m):
"""
Specify how to reduce the reference director vector, n, and all of the
description vectors, m, into an order parameter.
:type n: numpy.array
:param n: the reference director vector (row vector of length 3)
:type m: numpy.array
:param m: the description vectors (matrix of size N X 3), where N is
the number of molecules
:rtype: float
:return: the order parameter
"""
# molecule numbers can't be cached here because they are dynamic
mol_numbers = getattr(self, '_mol_numbers', set())
if not mol_numbers:
for aid in self._aids:
mol_numbers.add(self._cms_model.atom[aid].getMolecule().number)
mol_numbers = sorted(mol_numbers)
self.per_mol_op = getattr(self, 'per_mol_op', [])
# need to shape the incoming array
n.shape = (1, 3)
mol_ops = [
analysis.reduce_vec(n, numpy.array([mol_m])) / len(m) for mol_m in m
]
self.per_mol_op.append(
PerMolOP(mol_numbers=mol_numbers, mol_ops=mol_ops))
return analysis.reduce_vec(n, m)
[docs]def get_trj_from_cms_file(cms_file):
"""
Return the trajectory from the given .cms file.
:type cms_file: str
:param cms_file: the .cms file (can be a path)
:rtype: list
:return: contains the trajectory as a list of
schrodinger.application.desmond.packages.traj.Frame
"""
path = os.path.split(cms_file)[0]
cms_obj = cms.Cms(file=cms_file)
trj_dir = parserutils.get_trj_dir_name(cms_obj.fsys_ct)
trj_dir = os.path.join(path, trj_dir)
return traj.read_traj(trj_dir)
[docs]class AslException(Exception):
pass
[docs]class SmartsException(Exception):
pass
[docs]class DipoleDirector(analysis.DipoleDirector, ReduceVecMixin):
"""
Manage a dipole director.
"""
def _dyninit(self):
"""
A method that is automatically called after the atomic ids are
updated and allows for dynamically redefining geometry
calculations (per frame) for cases where the ASL is geometry dependent,
i.e. contains a within or beyond argument.
:raise AslException: if there is an issue with the ASL
"""
if not set(self._aids):
msg = ('The given ASL of {asl} does not match any atoms.')
raise AslException(msg.format(asl=self._asl))
super()._dyninit()
[docs]class MomentOfInertiaDirector(analysis.MomentOfInertiaDirector, ReduceVecMixin):
"""
Manage a moment of inertia director.
"""
def _dyninit(self):
"""
A method that is automatically called after the atomic ids are
updated and allows for dynamically redefining geometry
calculations (per frame) for cases where the ASL is geometry dependent,
i.e. contains a within or beyond argument.
:raise AslException: if there is an issue with the ASL
"""
if not set(self._aids):
msg = ('The given ASL of {asl} does not match any atoms.')
raise AslException(msg.format(asl=self._asl))
super()._dyninit()
[docs]class UniqueSmartsDirector(analysis.SmartsDirector, ReduceVecMixin):
"""
Manage a unique SMARTS director.
"""
[docs] def __init__(self, msys_model, cms_model, asl, smarts):
"""
Create an instance.
:type msys_model: Desmond msys .System
:param msys_model: the msys object (from msys.LoadMAE)
:type cms_model: schrodinger.application.desmond.cms.Cms
:param cms_model: the cms object
:type asl: str
:param asl: the ASL for the descriptor
:type smarts: str
:param smarts: the SMARTS for the descriptor
"""
self._smarts = smarts
# note the following atom_pairs is a dict with completely arbitrary ordering
self._atom_pairs = analyze.evaluate_smarts_by_molecule(
cms_model.fsys_ct,
self._smarts,
uniqueFilter=True,
matches_by_mol=True)
staf.CompositeDynamicAslAnalyzer.__init__(self, msys_model, cms_model,
asl)
def _dyninit(self):
"""
A method that is automatically called after the atomic ids are
updated and allows for dynamically redefining geometry
calculations (per frame) for cases where the ASL is geometry dependent,
i.e. contains a within or beyond argument.
:raise AslException: if there is an issue with the ASL
:raise SmartsException: if there is an issue with the SMARTS
"""
if not set(self._aids):
msg = ('The given ASL of {asl} does not match any atoms.')
raise AslException(msg.format(asl=self._asl))
# use Canvas SMARTS even though it is slower than mmpatty SMARTS,
# because it supports component-level SMARTS, i.e. containing '.',
# as here we need to support matching nonbonded pairs within the same
# molecule
#
# note that in some applications the actual direction of the descriptor
# could matter little as the individual order parameter values are
# meaningless (only relative order parameters have meaning), in other
# applications it is expected that individual order parameters have
# meaning, for example as in 1.0 pointing along the reference director,
# 0.0 pointing orthogonal to it, and -1.0 pointing anti to it, the
# uniquification here ensure this
self._analyzers = []
self._mol_numbers = []
aids = set(self._aids)
fsys_ct = self._cms_model.fsys_ct
if not self._atom_pairs:
msg = ('The given SMARTS of {smarts} does not match any atoms.')
raise SmartsException(msg.format(smarts=self._smarts))
self._match_len = len(list(self._atom_pairs.values())[0][0])
if self._match_len not in [2, 3]:
msg = (
'The given SMARTS of {smarts} is not for pairs or triples of atoms.'
)
raise SmartsException(msg.format(smarts=self._smarts))
unique_match_found = False
for matches in self._atom_pairs.values():
# in order to unambiguously reduce the description vector only a single
# match per molecule is allowed
if len(matches) != 1:
continue
else:
unique_match_found = True
match = matches[0]
if set(match).issubset(aids):
root = match[0]
analyzers = [
analysis.Vector(self._msys_model, self._cms_model, root,
match[1])
]
if self._match_len == 3:
analyzers.append(
analysis.Vector(self._msys_model, self._cms_model, root,
match[2]))
self._analyzers.extend(analyzers)
self._mol_numbers.append(
fsys_ct.atom[root].getMolecule().number)
if not self._analyzers:
if not unique_match_found:
msg = (
'The given SMARTS of {smarts} does not uniquely match any of '
'the molecules.')
raise SmartsException(msg.format(smarts=self._smarts))
else:
msg = ('The given SMARTS of {smarts} does not overlap with the '
'given ASL of {asl}.')
raise SmartsException(
msg.format(smarts=self._smarts, asl=self._asl))
[docs] def reduce_vec(self, n, m):
"""
Specify how to reduce the reference director vector, n, and all of the
description vectors, m, into an order parameter.
:type n: numpy.array
:param n: the reference director vector (row vector of length 3)
:type m: numpy.array
:param m: the description vectors (matrix of size N X 3), where N is
the number of molecules
:rtype: float
:return: the order parameter
"""
# m has dimensions of the number of molecules (matches) or twice that depending
# on the match length
if self._match_len == 3:
new_m = []
for idx in range(0, len(m), 2):
normal = transform.get_normalized_vector(
numpy.cross(m[idx], m[idx + 1]))
new_m.append(normal)
m = numpy.array(new_m)
return ReduceVecMixin.reduce_vec(self, n, m)
[docs]class SmartsDirector(analysis.SmartsDirector, ReduceVecMixin):
"""
Manage a SMARTS director.
"""
def _dyninit(self):
"""
A method that is automatically called after the atomic ids are
updated and allows for dynamically redefining geometry
calculations (per frame) for cases where the ASL is geometry dependent,
i.e. contains a within or beyond argument.
:raise AslException: if there is an issue with the ASL
:raise SmartsException: if there is an issue with the SMARTS
"""
if not set(self._aids):
msg = ('The given ASL of {asl} does not match any atoms.')
raise AslException(msg.format(asl=self._asl))
self._analyzers = []
self._mol_numbers = []
aids = set(self._aids)
fsys_ct = self._cms_model.fsys_ct
if not self._atom_pairs:
msg = ('The given SMARTS of {smarts} does not match any atoms.')
raise SmartsException(msg.format(smarts=self._smarts))
match_len = len(self._atom_pairs[0])
if match_len != 2:
msg = ('The given SMARTS of {smarts} is not for bonding pairs '
'of atoms.')
raise SmartsException(msg.format(smarts=self._smarts))
for a1, a2 in self._atom_pairs:
if a1 in aids and a2 in aids:
self._analyzers.append(
analysis.Vector(self._msys_model, self._cms_model, a1, a2))
self._mol_numbers.append(fsys_ct.atom[a1].getMolecule().number)
[docs]class Descriptor:
"""
Manage a descriptor.
"""
DIPOLE = 'dipole'
MOMENT_OF_INERTIA = 'moment_of_inertia'
SMARTS_NONUNIQUE_BONDS = 'SMARTS_nonunique_bonds'
SMARTS_UNIQUE_PAIR = 'SMARTS_unique_pair'
SMARTS_UNIQUE_TRIPLE_NORMAL = 'SMARTS_unique_triple_normal'
TYPES_TO_CLASSES = {
DIPOLE: DipoleDirector,
MOMENT_OF_INERTIA: MomentOfInertiaDirector,
SMARTS_NONUNIQUE_BONDS: SmartsDirector,
SMARTS_UNIQUE_PAIR: UniqueSmartsDirector,
SMARTS_UNIQUE_TRIPLE_NORMAL: UniqueSmartsDirector
}
_SMARTS_MATCH_DICT = {
SMARTS_NONUNIQUE_BONDS: 2,
SMARTS_UNIQUE_PAIR: 2,
SMARTS_UNIQUE_TRIPLE_NORMAL: 3
}
GROUP_KWARG = 'group'
ASL_KWARG = 'asl'
ATYPE_KWARG = 'atype'
SMARTS_KWARG = 'smarts'
[docs] def __init__(self, name, group=None, asl=None, atype=None, smarts=None):
"""
Create an instance.
:type name: str
:param name: the name of the descriptor
:type group: str or None
:param group: the group of the descriptor or
None if there isn't one
:type asl: str
:param asl: the ASL for the descriptor or
None if there isn't one
:type atype: str
:param atype: the type of descriptor or
None if there isn't one
:type smarts: str or None
:param smarts: the SMARTS for the descriptor or
None if there isn't one
"""
self.name = name
self.group = group
self.asl = asl
self.atype = atype
self.smarts = smarts
self.descriptor = None
[docs] def getDescriptor(self, msys_obj, cms_obj):
"""
Get the descriptor.
:type msys_obj: Desmond msys .System
:param msys_obj: the msys object (from msys.LoadMAE)
:type cms_obj: schrodinger.application.desmond.cms.Cms
:param cms_obj: the cms object
:rtype: schrodinger.application.desmond.packages.staf.CompositeDynamicAslAnalyzer
:return: descriptor subclasses of the given type
"""
aclass = self.TYPES_TO_CLASSES[self.atype]
args = (msys_obj, cms_obj, self.asl)
if self.atype in self._SMARTS_MATCH_DICT:
args += (self.smarts,)
self.descriptor = aclass(*args)
return self.descriptor
[docs] def getFileName(self, basename):
"""
Return a file name for this descriptor using the given base name.
:type basename: str
:param basename: base name to use in naming the file
:rtype: str
:return: the descriptor file name
"""
return '.'.join([basename, OUT_FILE_TAG, self.group, RESULTS_FILE_EXT])
[docs] def getPerMolFileName(self, basename):
"""
Return a per molecule file name for this descriptor using the given
base name.
:type basename: str
:param basename: base name to use in naming the file
:rtype: str
:return: the descriptor per molecule file name
"""
return '.'.join(
[basename, PER_MOL_OUT_FILE_TAG, self.group, RESULTS_FILE_EXT])
[docs]def get_descriptors_from_file(descriptors_file):
"""
Return a list of descriptors from the given
descriptors file.
:type descriptors_file: str
:param descriptors_file: .json file containing specifications
for descriptors, i.e. ways to determine vectors used in
computing the order parameters with respect to the director,
a specification includes information like name, group, ASL,
type, and SMARTS (can be a path)
:rtype: list
:return: contains Descriptor
"""
# descriptor files are formatted like
#
# {
# "some_name_1":{
# "group":"some_group",
# "asl":"some_ASL",
# "atype":"some_type",
# "smarts":"some_SMARTS'
# },
# "some_name_2": ...
# }
with open(descriptors_file, 'r') as afile:
adict = json.load(afile, object_pairs_hook=dict)
return [Descriptor(name=name, **kwargs) for name, kwargs in adict.items()]
[docs]class Director(staf.GeomAnalyzerBase):
"""
Manage a director.
"""
[docs] def __init__(self, vec):
"""
Create an instance.
:type vec: numpy.array
:param vec: the director vector
"""
self._result = vec
[docs]class OrderParameter:
"""
Manage order parameter analysis.
"""
[docs] def __init__(self,
cms_file,
director_abc_coeffs,
descriptors_file,
logger=None):
"""
Create an instance.
:type cms_file: str
:param cms_file: the .cms file for the simulation on which
to run the order parameter analysis (can be a path)
:type director_abc_coeffs: tuple
:param director_abc_coeffs: coefficients of the static reference
director vector in the lattice vector basis, for example (0, 0, 1)
for the c-lattice vector or z-axis of a cubic cell
:type descriptors_file: str
:param descriptors_file: .json file containing specifications
for descriptors, i.e. ways to determine vectors used in
computing the order parameters with respect to the director,
a specification includes information like name, group, ASL,
type, and SMARTS (can be a path) (see get_descriptors_from_file
for more information)
:type logger: logging.Logger or None
:param logger: output logger or None if there isn't one
"""
self.cms_file = cms_file
self.director_abc_coeffs = director_abc_coeffs
self.descriptors_file = descriptors_file
self.logger = logger
self.basename = jobutils.get_jobname(
fileutils.get_basename(self.cms_file))
self.msys_obj = None
self.cms_obj = None
self.director = None
self.trajectory = None
self.descriptors = None
self.results = None
[docs] @staticmethod
def getDirector(cms_obj, director_abc_coeffs):
"""
Return the unit director vector object for the given cms
(in Angstrom).
:type cms_obj: schrodinger.application.desmond.cms.Cms
:param cms_obj: the cms object
:type director_abc_coeffs: tuple
:param director_abc_coeffs: coefficients of the static reference
director vector in the lattice vector basis, for example (0, 0, 1)
for the c-lattice vector or z-axis of a cubic cell
:rtype: Director
:return: the unit director vector object
"""
chorus = cms.get_box(cms_obj.fsys_ct)
step = 3
vecs = [
numpy.array(chorus[i:i + step])
for i in range(0, len(chorus), step)
]
director = numpy.zeros(3)
for coeff, vec in zip(director_abc_coeffs, vecs):
director += coeff * vec
return Director(transform.get_normalized_vector(director))
def _getResults(self):
"""
Return the order parameter results.
:rtype: list[Result]
:return: contains a Result for each descriptor
"""
# order parameters are calculated using the following equation
#
# S_k = (1/N) sum_i=1^N [3(M dot m_i^k)^2 - 1]/2
#
# where k is frame index, N is the number of descriptor vectors (typically
# the number of molecules if there is one descriptor per molecule), M is
# the static director reference vector, for example like the z-axis, m is
# a descriptor vector, for example dipole, head-to-tail vector, etc., S is
# the order parameter in units of Angstrom^4
analyses = []
for descriptor in self.descriptors:
d = descriptor.getDescriptor(self.msys_obj, self.cms_obj)
analyses.append(
analysis.OrderParameter(self.director, d, d.reduce_vec))
results = analysis.analyze(self.trajectory, *analyses)
if len(analyses) == 1:
results = [results]
all_results = []
for result, _analysis in zip(results, analyses):
all_results.append(
Result(total_op=result, per_mol_op=_analysis._vec2.per_mol_op))
return all_results
def _writeResultFiles(self):
"""
Write the result files.
:rtype: list
:return: contains the file names of the written files
"""
# there two types of files, those with total order parameters
# per frame and those with molecular order parameters per frame,
# each can have multiple types of order parameters, collect the
# types of order parameters and their total or molecular values
# per file name in order to prepare csv output files
results_by_file = defaultdict(list)
for descriptor, result in zip(self.descriptors, self.results):
# total
file_name = descriptor.getFileName(self.basename)
results_by_file[file_name].append(
(descriptor.name, result.total_op))
# molecular
file_name = descriptor.getPerMolFileName(self.basename)
results_by_mol_number = defaultdict(list)
for per_mol_op in result.per_mol_op:
for mol_number, mol_op in zip(per_mol_op.mol_numbers,
per_mol_op.mol_ops):
results_by_mol_number[mol_number].append(mol_op)
for mol_number, mol_ops in results_by_mol_number.items():
results_by_file[file_name].append(
(get_name_w_mol_n(descriptor.name, mol_number), mol_ops))
for afile, names_results in results_by_file.items():
names, results = list(zip(*names_results))
results = list(zip(*results))
df = pandas.DataFrame.from_records(data=results, columns=names)
df.index += 1
df.index.name = INDEX_NAME
df.to_csv(afile)
jobutils.add_outfile_to_backend(afile)
return list(results_by_file)
def _writeOutputCMS(self, files):
"""
Write the output cms file.
:type files: list
:param files: contains the file names of the order parameter
result files
"""
cms_obj = copy.copy(self.cms_obj)
for prop in (msprops.ORIGINAL_CMS_PROP, msprops.TRAJECTORY_FILE_PROP):
cms_obj.remove_cts_property(prop)
files_str = ORDER_PARAMETER_OUTPUT_FILES_SEP.join(files)
cms_obj.set_cts_property(ORDER_PARAMETER_OUTPUT_FILES_PROP, files_str)
jobutils.set_source_path(cms_obj)
cms_out_file = '.'.join([self.basename, OUT_FILE_TAG, 'cms'])
jobutils.write_cms_with_wam(
cms_obj,
cms_out_file,
wam_type=jobutils.WAM_TYPES.MS_ORDER_PARAMETER)
jobutils.add_outfile_to_backend(cms_out_file, set_structure_output=True)
[docs] def run(self):
"""
Run the order parameter analysis.
"""
self.msys_obj, self.cms_obj = topo.read_cms(self.cms_file)
self.director = OrderParameter.getDirector(self.cms_obj,
self.director_abc_coeffs)
self.trajectory = get_trj_from_cms_file(self.cms_file)
self.descriptors = get_descriptors_from_file(self.descriptors_file)
try:
self.results = self._getResults()
except (AslException, SmartsException) as err:
if self.logger:
self.logger.error(str(err))
raise
files = self._writeResultFiles()
self._writeOutputCMS(files)
log('All finished', timestamp=True, pad=True, logger=self.logger)