import copy
import os
import re
import shutil
import tempfile
from collections import defaultdict
from collections import namedtuple
from contextlib import contextmanager
from itertools import combinations
from pathlib import Path
from typing import Dict
from typing import List
from typing import Optional
from typing import Tuple
import numpy
import schrodinger.job.jobcontrol as jobcontrol
import schrodinger.utils.fileutils as fileutils
from schrodinger import gpgpu
from schrodinger.structutils.analyze import validate_asl
from schrodinger.application.desmond import config
from schrodinger.application.desmond import constants
from schrodinger.application.desmond.cms import AtomGroup
from schrodinger.application.desmond.cms import Cms
from schrodinger.application.desmond.packages import traj
from schrodinger.infra import licensing
from schrodinger.utils import log
from schrodinger.utils import sea
logger = log.get_output_logger("energygroup")
P_TENSOR_PATTERN = r"(Pressure_Tensor)"
SIM_BOX_PATTERN = r"(Simulation_Box)"
[docs]class EnergyGroupError(Exception):
pass
class _Const:
DEFAULT_P = 1.01325
DEFAULT_T = 300.
CAN_USE_GPU_DESMOND = licensing.licenseExists(
licensing.DESMOND_GPGPU) and gpgpu.is_any_gpu_available()
[docs]class EnergyGroupBase:
"""
Its subclasses work with `EnergyFacade` to perform energy group calculations.
The subclass instance should have the following public attributes:
- `key`: immutable and iterable. It is used as the key to retrieve
calculation result from cache in `EnergyFacade`
- `kwargs`: `dict`. It is the keyword arguments for `get_energies`.
Its roles include:
- store and validate the parameters
- retrieve result from the cache in `EnergyFacade`
"""
DEFAULT_OPTIONS = {
constants.ENERGY_GROUPS.PRESSURE_TENSOR,
constants.ENERGY_GROUPS.CORR_ENERGY
}
[docs] def getResult(self, result):
"""
Retrieve and format the result from the cache in `EnergyFacade`
"""
raise NotImplementedError # pragma: no cover
[docs]class MoleculeEnergy(EnergyGroupBase):
"""
Analyzer for energy group calculation with single molecule selection. The
pre-defined energy groups are (see COMPONENT_LIST)
1. All atoms - group 2, 3, 4, 5
2. Solute atoms - group 5
3. Membrane atoms - group 5
4. Solvent atoms - group 5
5. Selected molecule
"""
TYPE_MAP = {
'Coulomb': 'elec',
'vdW': 'vdw',
'Bond': 'stretch',
'Angle': 'angle',
'Torsion': 'dihedral',
'Total': 'Total'
}
COMPONENT_LIST = ['Other', 'Solute', 'Membrane', 'Solvent', 'Self']
[docs] def __init__(self, cms_model, molecule_number, type, type2):
"""
:type cms_model: `Cms`
:param molecule_number: index of the molecule
:type molecule_number: `int`
:param type: pre-defined energy type, see TYPE_MAP
:type type: `str`
:param type2: pre-defined atom group, see COMPONENT_LIST
:type type2: `str`
"""
self.key = (molecule_number,)
aids = set(cms_model.select_atom('mol.num %d' % molecule_number))
if not aids:
raise EnergyGroupError(f"Molecule {molecule_number} doesn't exist!")
solvent_aids = set(cms_model.select_atom('solvent')) - aids
solute_aids = set(cms_model.select_atom('solute')) - aids
membrane_aids = set(cms_model.select_atom('membrane')) - aids
other_aids = set(cms_model.select_atom('all')) - aids - solvent_aids - \
solute_aids - membrane_aids
groups = [other_aids, solute_aids, membrane_aids, solvent_aids, aids]
self.kwargs = {'options': self.DEFAULT_OPTIONS, 'groups': groups}
try:
self._idx = frozenset([4, self.COMPONENT_LIST.index(type2)])
self._attr = self.TYPE_MAP[type]
except KeyError:
raise EnergyGroupError('Energy type is not valid: %s.' % type)
except ValueError:
raise EnergyGroupError('Component name is not valid: %s.' % type2)
[docs] def getResult(self, result) -> List[float]:
"""
:return: frame-by-frame result of the `type` energy (see TYPE_MAP)
of the atom group `type2` (see COMPONENT_LIST)
"""
return [getattr(c[self._idx], self._attr) for c in result]
[docs]class SelectionEnergyMatrix(EnergyGroupBase):
"""
Analyzer for energy group matrix calculation (all self and cross terms). The
pre-defined energy groups are calculated for the specified ASL selection.
"""
[docs] def __init__(self, cms_model: Cms, asl_selections: Optional[List] = None):
"""
:param asl_selection: List of ASL srings to include in the energy group
calculation.
"""
self.key = frozenset(['MATRIX'])
if not asl_selections or not isinstance(asl_selections, list):
raise EnergyGroupError(
'ERROR: SelectionEnergyMatrix `asl_selection` cannot be empty '
'and must be iterable.')
for asl in asl_selections:
if asl is None or not validate_asl(asl):
raise EnergyGroupError(
'ERROR: SelectionEnergyMatrix analyzer cannot validate'
f'an ASL: {asl}')
self.asl_selections = asl_selections
self.groups = [
set(cms_model.select_atom(asl)) for asl in asl_selections
]
ngroups = len(self.groups)
if not any(self.groups) or ngroups > 999:
raise EnergyGroupError(
'ERROR: SelectionEnergyMatrix analyzer cannot be empty or '
f'supports upto 999 energy groups: {ngroups} energy groups '
'provided.')
self._self_groups = [frozenset([i]) for i in range(ngroups)]
self._self_group_keys = [f'sel_{grp:03}' for grp in range(ngroups)]
self._cross_groups = [
frozenset(c) for c in combinations(range(ngroups), 2)
]
self._cross_group_keys = [
f'sel_{g0:03}-sel_{g1:03}'
for g0, g1 in combinations(range(ngroups), 2)
]
self.kwargs = {'options': self.DEFAULT_OPTIONS, 'groups': self.groups}
[docs] def getResult(self, result) -> Dict[str, Dict[str, float]]:
"""
:return: frame-by-frame result all self and cross energy terms
"""
# self terms
self_angle = defaultdict(list)
self_stretch = defaultdict(list)
self_dihed = defaultdict(list)
self_elec = defaultdict(list)
self_vdw = defaultdict(list)
# cross terms
cross_angle = defaultdict(list)
cross_stretch = defaultdict(list)
cross_dihed = defaultdict(list)
cross_elec = defaultdict(list)
cross_vdw = defaultdict(list)
for frame in result:
for key, grp in zip(self._self_group_keys, self._self_groups):
self_angle[key].append(frame[grp].angle)
self_stretch[key].append(frame[grp].stretch)
self_dihed[key].append(frame[grp].dihedral)
self_elec[key].append(frame[grp].elec)
self_vdw[key].append(frame[grp].vdw)
for key, grp in zip(self._cross_group_keys, self._cross_groups):
cross_angle[key].append(frame[grp].angle)
cross_stretch[key].append(frame[grp].stretch)
cross_dihed[key].append(frame[grp].dihedral)
cross_elec[key].append(frame[grp].elec)
cross_vdw[key].append(frame[grp].vdw)
res = {
'self_ene': {
'angle': self_angle,
'stretch': self_stretch,
'dihedral': self_dihed,
'elec': self_elec,
'vdw': self_vdw,
},
'cross_ene': {
'elec': cross_elec,
'vdw': cross_vdw,
'angle': cross_angle,
'stretch': cross_stretch,
'dihedral': cross_dihed,
},
}
return {'Result': res}
[docs]class MoleculeEnergyMatrix(EnergyGroupBase):
"""
Analyzer for energy group matrix calculation (all self and cross terms). The
pre-defined energy groups are calculated for the specified molecules. If
molecules are not specified then all molecules will be used.
"""
[docs] def __init__(self, cms_model: Cms, molecule_numbers: Optional[List] = None):
"""
:param molecule_numbers: Set of molecules to include in the energy group
calculation. If `None` or empty, all molecules will be
used.
"""
self.key = frozenset(['MATRIX'])
self.mols = molecule_numbers or range(1, cms_model.mol_total + 1)
ngroups = len(self.mols)
if ngroups > 999:
raise EnergyGroupError(
'ERROR: MoleculeEnergyMatrix analyzer '
f'supports upto 999 energy groups: {ngroups} energy groups '
'provided.')
_grps = range(ngroups)
self._self_dict = {frozenset([m]): self.mols[m] for m in _grps}
self._cross_dict = {
frozenset([m0, m1]): (self.mols[m0], self.mols[m1])
for (m0, m1) in combinations(_grps, 2)
}
self.groups = [
set(cms_model.molecule[m].getAtomIndices()) for m in self.mols
]
self.kwargs = {'options': self.DEFAULT_OPTIONS, 'groups': self.groups}
[docs] def getResult(self, result) -> Dict[str, Dict[str, float]]:
"""
:return: frame-by-frame result all self and cross energy terms
"""
self_angle = defaultdict(list)
self_stretch = defaultdict(list)
self_dihed = defaultdict(list)
self_elec = defaultdict(list)
self_vdw = defaultdict(list)
cross_elec = defaultdict(list)
cross_vdw = defaultdict(list)
for frame in result:
for mol_idx, mol_num in self._self_dict.items():
key = f'mol_{mol_num:03}'
self_angle[key].append(frame[mol_idx].angle)
self_stretch[key].append(frame[mol_idx].stretch)
self_dihed[key].append(frame[mol_idx].dihedral)
self_elec[key].append(frame[mol_idx].elec)
self_vdw[key].append(frame[mol_idx].vdw)
for mol_idx_pair, mol_pair in self._cross_dict.items():
key = f'mol_{mol_pair[0]:03}-mol_{mol_pair[1]:03}'
cross_elec[key].append(frame[mol_idx_pair].elec)
cross_vdw[key].append(frame[mol_idx_pair].vdw)
res = {
'self_ene': {
'angle': self_angle,
'stretch': self_stretch,
'dihedral': self_dihed,
'elec': self_elec,
'vdw': self_vdw,
},
'cross_ene': {
'elec': cross_elec,
'vdw': cross_vdw
},
}
return {'Result': res}
[docs]class InteractionEnergy(EnergyGroupBase):
"""
Analyzer for interaction energy group calculation.
"""
[docs] def __init__(self, cms_model, asl1, asl2):
"""
:type cms_model: `Cms`
:param asl1: atom selection for group 1
:param asl2: atom selection for group 2
:type asl1: `str`
:type asl2: `str`
"""
self.key = (asl1, asl2)
groups = [set(cms_model.select_atom(asl)) for asl in self.key]
self.kwargs = {'options': self.DEFAULT_OPTIONS, 'groups': groups}
self._idx = frozenset([0, 1])
[docs] def getResult(self, result) -> List[Tuple[float, float]]:
"""
:return: frame-by-frame result of the electric and van der Walls
energies (both non-bonded and pair) between the two groups
"""
return [(getattr(c[self._idx], 'elec'), getattr(c[self._idx], 'vdw'))
for c in result]
def _get_ensemble(cfg_obj):
"""
Extract ensemble type from the original cfg block
:type cfg_obj: `sea.Map`
:param cfg_obj: 'ORIG_CFG' block as ark object
:rtype: `str`
"""
# the ensemble tag may be missing from the ORIG_CFG block
cfg = copy.deepcopy(config.DEFAULT_SETTING.MD)
cfg.update(cfg_obj)
cfg = config.canonicalize(cfg)
return cfg['ensemble']['class'].val
def _get_cfg_prop(cfg_obj, key, default):
"""
Extract target simulation condition from the original cfg block. If not
present, use `default` value.
:type key: `str`
:type cfg_obj: `sea.Map`
:param cfg_obj: 'ORIG_CFG' block as ark object
:rtype: `float`
"""
def extract(t):
if isinstance(t, list):
t = extract(t[0])
return float(t)
t = default
if key in cfg_obj:
t = extract(cfg_obj[key].val)
return t
[docs]class Bulk(EnergyGroupBase):
"""
Analyzer for material science energy group calculation
"""
OPTIONS = {
"TOTAL_E",
"INTRA_E", # these two are not used
"VOLUME",
"DENSITY",
"PRESSURE_TENSOR",
"SPECIFIC_HEAT",
"COHES_E",
"SOL_PARAM",
"HEAT_VAP"
}
ENERGY_TERM = r"(far_terms|nonbonded_elec|nonbonded_vdw|Total)"
FLOATING_NUMBER = r"([-+]?\b(?:[0-9]*\.)?[0-9]+(?:[eE][-+]?[0-9]+)?\b)"
E_PATTERN = r"%s\s+\(%s\)" % (ENERGY_TERM, FLOATING_NUMBER)
[docs] def __init__(self, cms_model, type, last_n_percent=70):
"""
@param last_n_percent: Only use `last_n_percent` of the trajectory
to compute specific heat
"""
self.key = frozenset([t.upper() for t in type])
invalid_opt = self.key - self.OPTIONS
if invalid_opt:
raise EnergyGroupError('Invalid options: %s' % invalid_opt)
if {'HEAT_VAP', 'COHES_E', 'SOL_PARAM'}.intersection(self.key):
# make each molecule an energy group
groups = [set(mol.getAtomIndices()) for mol in cms_model.molecule]
else: # one group for all molecules
groups = [set(cms_model.getAtomIndices())]
self._has_vol = bool(
{'DENSITY', 'VOLUME', 'SPECIFIC_HEAT',
'SOL_PARAM'}.intersection(self.key))
self._has_p_tensor = 'PRESSURE_TENSOR' in self.key
self._has_density = 'DENSITY' in self.key
self._has_c = 'SPECIFIC_HEAT' in self.key
if self._has_c:
self._num_atom = cms_model.atom_total
if self._has_density or self._has_c:
self._tot_weight = cms_model.total_weight
options = (self.DEFAULT_OPTIONS |
{constants.ENERGY_GROUPS.SELF_ENERGY_ONLY})
self.kwargs = {
'options': options,
'groups': groups,
'parse': self.parseEnergies,
}
self._times = []
self._tot_energies = []
self._vdw_energies = []
self._ele_energies = []
self._num_mol = cms_model.mol_total
self._is_gcmc = cms_model.is_for_gcmc
self._last_n_percent = last_n_percent
if 'SPECIFIC_HEAT' in self.key and self._is_gcmc:
raise EnergyGroupError('Cannot perform specific heat calculation '
'for GCMC simulations.')
[docs] def getResult(self, result):
return result
[docs] def getCfgParams(self, cfg_file):
"""
:param cfg_file: path to the simulation configuration file
"""
cfg = _read_orig_cfg(cfg_file)
self._ensemble = _get_ensemble(cfg)
if self._is_gcmc and self._ensemble not in {'NVT', 'NPT'}:
raise EnergyGroupError(
'Expecting ensemble=NPT or ensemble=NVT for'
f' GCMC simulations, but got ensemble={self._ensemble}.')
self._target_pressure = None
if self._ensemble == 'NPT':
self._target_pressure = _get_cfg_prop(cfg, 'pressure',
_Const.DEFAULT_P)
self._target_temperature = _get_cfg_prop(cfg, 'temperature',
_Const.DEFAULT_T)
def _reduceEnergies(self, energy_series, num_groups):
"""
To save memory, reduce all pairwise energies to total interaction energy,
total vdw interaction energy, and total electric interaction energy
"""
time, ene_pairs = energy_series.popitem()
tot_intra_e = vdw_intra_e = ele_intra_e = 0.
for i in range(num_groups):
e = ene_pairs[frozenset([i])]
tot_intra_e += e.Total
vdw_intra_e += getattr(e, 'nonbonded_vdw', 0)
ele_intra_e += getattr(e, 'far_terms', 0) + getattr(
e, 'nonbonded_elec', 0)
all_e = ene_pairs['all']
self._times.append(time)
# Here all interaction energies should be negative for stable configurations
self._tot_energies.append(abs(all_e.Total - tot_intra_e))
self._vdw_energies.append(
abs(getattr(all_e, 'nonbonded_vdw', 0) - vdw_intra_e))
self._ele_energies.append(
abs(
getattr(all_e, 'far_terms', 0) +
getattr(all_e, 'nonbonded_elec', 0) - ele_intra_e))
def _reduceSelfEnergies(self, energy_series, num_groups):
"""
Retrieve total interaction energy, total vdw interaction energy, and
total electric interaction energy, when the self_energy_only option is set.
"""
time, inter_mol_e = energy_series.popitem()
self._times.append(time)
# Here all interaction energies should be negative for stable configurations
vdw_e = getattr(inter_mol_e, 'nonbonded_vdw', 0)
ele_e = getattr(inter_mol_e, 'far_terms', 0) + getattr(
inter_mol_e, 'nonbonded_elec', 0)
self._tot_energies.append(abs(inter_mol_e.Total))
self._vdw_energies.append(abs(vdw_e))
self._ele_energies.append(abs(ele_e))
def _calcSpecHeat(self, tot_e):
"""
Calculates specific heat for NVT (Cv) and NPT (Cp) ensembles.
:param tot_e: total energy
:type tot_e: 1D `numpy.ndarray`
:rtype: `float`
"""
deltaH = numpy.mean(tot_e**2) - numpy.mean(tot_e)**2
kT_sq = 0.001987 * self._target_temperature**2
Cp = numpy.fabs(deltaH) / kT_sq
return Cp * constants.Conversion.KCAL_TO_JOUL / self._tot_weight
def _calcSpecHeatNVE(self, kin_e):
"""
Calculates specific heat for NVE (Cv) ensemble.
:param kin_e: kinetic energy
:type kin_e: `numpy.ndarray`
:rtype: `float`
"""
Z = numpy.mean(kin_e) * numpy.mean(kin_e**-1)
dof = self._num_atom * 6
Cv = (1. - (1. - ((2. / dof) * Z)))**-1
return Cv / self._tot_weight * constants.GAS_CONSTANT_JOUL
def _getResultsForLastNPercent(self, energy, func) -> List[float]:
"""
calculate the specific heat for the last N percent of the trajectory
:param energy: ensemble-dependent energy
:type energy: `numpy.array`
:param func: callable that calculates specific heat
"""
idx = int(len(energy) * (1.0 - self._last_n_percent / 100.))
return func(energy[idx:])
[docs] def parseEnergies(self, fname, num_groups):
"""
Parse the vrun output.engp file for total molecular interaction energies,
and other requested quantities.
See for example
https://opengrok.schrodinger.com/xref/desmond-gpu-src/test/data/bulktest/gpu
https://opengrok.schrodinger.com/xref/desmond-gpu-src/test/data/bulktest/cpu
:type num_groups: `int`
:rtype: `dict`
"""
bulk_pattern = r"(time)"
n_mol_pattern = r'Num_active_molecules\s+\({0}\)\s+{0}'.format(
self.FLOATING_NUMBER)
mass_pattern = r'Active_mass\s+\({0}\)\s+{0}'.format(
self.FLOATING_NUMBER)
re_energy = re.compile(self.E_PATTERN)
volumes = []
pot_energies = []
kin_energies = []
parse_header = False
if self._has_vol or self._has_c:
parse_header = True
re_header = re.compile(bulk_pattern)
if self._has_p_tensor:
re_p_tensor = re.compile(P_TENSOR_PATTERN)
p_tensors = []
if self._is_gcmc:
n_mols_per_frame = []
mass_per_frame = []
re_num_mol = re.compile(n_mol_pattern)
re_mass = re.compile(mass_pattern)
# num_atom may be needed for gcmc specific heat calculation
#n_atoms_per_frame = []
#re_num_atom = re.compile(rf'Num_active_atoms\s+\({floating_number}\)\s+{floating_number}')
energy_parser = _parse_self_energy_line
reducer = self._reduceSelfEnergies
energy_series = {}
with open(fname) as f:
for line in f:
# total inter-molecular energies
match = re_energy.match(line)
if match:
energy_parser(energy_series, match, line, num_groups)
if line.startswith('Total'):
reducer(energy_series, num_groups)
# other bulk properties
elif self._has_p_tensor and re_p_tensor.match(line):
time, p_tensor = _parse_p_tensor_line(line)
p_tensors.append(p_tensor)
else:
if parse_header and re_header.match(line):
data = _parse_header_line(line)
volumes.append(data['V'])
pot_energies.append(data['E_p'])
kin_energies.append(data['E_k'])
if self._is_gcmc:
match = re_num_mol.match(line)
if match:
n_mols_per_frame.append(float(match.group(2)))
match = re_mass.match(line)
if match:
mass_per_frame.append(float(match.group(2)))
# This may be needed for gcmc specific heat calculation
#match = re_num_atom.match(line)
#if match:
# n_atoms_per_frame.append(float(match.group(2)))
result = {}
if self._has_vol:
result['VolumeResult'] = volumes
result['VolumeUnit'] = "Angstrom^3"
if self._has_density:
if self._is_gcmc:
result['DensityResult'] = [
constants.Conversion.AU_TO_KG * tot_weight / v
for tot_weight, v in zip(mass_per_frame, volumes)
]
else:
result['DensityResult'] = [
constants.Conversion.AU_TO_KG * self._tot_weight / v
for v in volumes
]
result['DensityUnit'] = "g/cm^3"
if self._has_p_tensor:
result['PressureTensorResult'] = p_tensors
result['PressureTensorUnit'] = 'bar'
if self._has_c:
# usually the first frame contains no velocities, thus we copy the
# values form the second frame
kin_energies[0] = kin_energies[1]
kin_e = numpy.array(kin_energies)
tot_e = kin_e + numpy.array(pot_energies)
if self._ensemble == 'NVE':
c_type = 'Cv'
c = self._getResultsForLastNPercent(kin_e,
self._calcSpecHeatNVE)
elif self._ensemble == 'NVT':
c_type = 'Cv'
c = self._getResultsForLastNPercent(tot_e, self._calcSpecHeat)
elif self._ensemble == 'NPT':
c_type = 'Cp'
# 1 bar = 10^5 Pa = 10^5 kg/(m s^2)
# 1A^3 = 10^-30 m^3
# PV = 10^5 * 10^-30 J
# 1J = 1.44e20 kcal/mol
# PV(kcal/mol) = P(bar)*V(A^3)* 1.44e-5
pv = numpy.array(volumes) * self._target_pressure * 1.44e-5
c = self._getResultsForLastNPercent(tot_e + pv,
self._calcSpecHeat)
else:
raise EnergyGroupError('Unable to compute specific heat '
f'when the ensemble is {self._ensemble}')
result['SpecificHeatResult'] = [c_type, c]
result['SpecificHeatUnit'] = 'J / (g * K)'
if {'HEAT_VAP', 'COHES_E'}.intersection(self.key):
if self._is_gcmc:
cohes_tot_e = [
e / num_mol
for e, num_mol in zip(self._tot_energies, n_mols_per_frame)
]
else:
cohes_tot_e = [e / self._num_mol for e in self._tot_energies]
if 'COHES_E' in self.key:
result['EneCohesiveResult'] = cohes_tot_e
result['EneCohesiveUnit'] = 'kcal/mol'
if 'HEAT_VAP' in self.key:
RT = self._target_temperature * constants.GAS_CONSTANT
result['HeatVaporizationResult'] = [e + RT for e in cohes_tot_e]
result['HeatVaporizationUnit'] = 'kcal/mol'
if 'SOL_PARAM' in self.key:
norm_vol = 10000. / (numpy.array(volumes) * 6.022)
sol_param_tot = (
numpy.sqrt(norm_vol * numpy.array(self._tot_energies)) /
constants.Conversion.CAL_CM_TO_MPA)
sol_param_vdw = (
numpy.sqrt(norm_vol * numpy.array(self._vdw_energies)) /
constants.Conversion.CAL_CM_TO_MPA)
sol_param_ele = (
numpy.sqrt(norm_vol * numpy.array(self._ele_energies)) /
constants.Conversion.CAL_CM_TO_MPA)
result['SolubilityParameterResult'] = sol_param_tot.tolist()
result['SolubilityParameterVdwSqResult'] = sol_param_vdw.tolist()
result['SolubilityParameterEleSqResult'] = sol_param_ele.tolist()
result['SolubilityParameterUnit'] = "MPa^(1/2)"
result['SolubilityParameterVdwSqUnit'] = "MPa^(1/2)"
result['SolubilityParameterEleSqUnit'] = "MPa^(1/2)"
return self._times, result
[docs]class PartialCohesiveEnergy(EnergyGroupBase):
"""
Compute total cohesive energy (not per molecule) for a set of molecules
defined by ASL. For this, nonbonding cross terms must be summed up.
"""
[docs] def __init__(self, cms_model, asl):
"""
:param str asl: ASL that defines set of molecules.
:raise EnergyGroupError: If ASL defines incomplete molecules
"""
self.key = (asl,)
molnums = sorted({
cms_model.atom[x].molecule_number
for x in cms_model.select_atom(asl)
})
mol_asl = 'mol.num %s' % ','.join(map(str, molnums))
# Molecules defined by ASL must be complete.
if (set(cms_model.select_atom(mol_asl)) != set(
cms_model.select_atom(asl))):
raise EnergyGroupError(
'Cannot perform partial cohesive energy '
'calculation for ASL that defines incomplete molecules.')
self.groups = [
set(cms_model.molecule[molnum].getAtomIndices())
for molnum in molnums
]
# cross_energy_only sums up all the cross terms between groups
# (in this case a group is a molecule), except cross terms with 0 group
self.kwargs = {
'options': {constants.ENERGY_GROUPS.CROSS_ENERGY_ONLY},
'groups': self.groups,
'parse': self.parseEnergies
}
self._times = []
self._tot_energies = []
def _reduceEnergies(self, energy_series, num_groups):
"""
Retrieve total interaction energy as the sum of nonbonding energy
(vdw + elec + far).
"""
time, inter_mol_e = energy_series.popitem()
self._times.append(time)
# Here all interaction energies should be negative for stable configurations
vdw_e = getattr(inter_mol_e, 'nonbonded_vdw', 0)
ele_e = (getattr(inter_mol_e, 'nonbonded_elec', 0) +
getattr(inter_mol_e, 'far_terms', 0))
tot_en = vdw_e + ele_e
self._tot_energies.append(tot_en)
[docs] def parseEnergies(self, fname, num_groups):
"""
Parse the vrun output.engp file for nonbonding energies. Don't keep
everything in the memory.
:param str fname: Energy group file name
:param int num_groups: Number of groups
:return: list of times and dictionary with results
:rtype: tuple(list, dict)
"""
re_energy = re.compile(Bulk.E_PATTERN)
with open(fname) as f:
one_time_dict = {}
for line in f:
# total inter-molecular energies
match = re_energy.match(line)
if match:
_parse_cross_energy_line(one_time_dict, match, line,
num_groups)
if line.startswith('Total'):
self._reduceEnergies(one_time_dict, num_groups)
# Free memory
one_time_dict = {}
return self._times, {'EneCohesivePartialResult': self._tot_energies}
[docs] def getResult(self, result):
return result
def _parse_header_line(line):
"""
Parse header line that looks like this:
time=19.2 en=3.641e+04 E_p=-1.937e+02 E_k=7.039e+03 E_x=1.071e+02 P=5.999e+02 V=9.654+04
and return a dictionary
:param line: a frame header str that comes from energy-group output data file
"""
data = {}
for token in line.split():
if '=' in token:
key, value = token.split('=')
data[key] = float(value)
return data
def _parse_p_tensor_line(line):
"""
Process lines that look like this:
Pressure_Tensor(4.800000) -1138.7 39.734 300.07 116.98 -1124.3 -120.67 259.95 -65.07 -963.18
:param line: a pressure tensor line that comes from energy-group output file
:return: chemical time and 9 pressure tensor components
:rtype: (float, list of floats)
"""
stripped = line.split('(')[1].split(')')
l = stripped[1].split()
return (float(stripped[0]), [float(x) for x in l])
def _read_orig_cfg(cfg_file):
"""
:param cfg_file: path to the simulation configuration file
"""
try:
with open(cfg_file) as f:
cfg = sea.Map(f.read())
except Exception as e:
raise EnergyGroupError('Reading configuration file %s failed: %s.' %
(cfg_file, e))
if 'ORIG_CFG' not in cfg:
raise EnergyGroupError('ORIG_CFG is missing from %s' % cfg_file)
return copy.copy(cfg['ORIG_CFG'])
# first, interval, last are chemical times instead of frame indices
SliceParams = namedtuple('SliceParams', 'first interval last')
def _prepare_cfg(cfg_file, tr_path, slicing, options, is_gcmc):
"""
Read the -out.cfg file, and write to a temporary energy.cfg file with the
vrun options.
:type slicing: `SliceParams`
:param set options: Set of energygroup options
:type is_gcmc: bool
"""
cfg = _read_orig_cfg(cfg_file)
fout = 'output.engrp'
ene_cms_file = 'energy.cms'
try:
is_external_potential = (
cfg.backend.force.nonbonded.type.val == 'external')
except AttributeError:
is_external_potential = False
if (is_external_potential and
constants.ENERGY_GROUPS.SELF_ENERGY_ONLY not in options):
raise RuntimeError(f'"{constants.ENERGY_GROUPS.SELF_ENERGY_ONLY}" '
'energy_groups plugin option must '
'be enabled for the "external" potential')
with open('energy.cfg', 'w') as f:
cfg.update('energy_group = on')
cfg.update('vrun_frameset="%s"' % tr_path)
cfg.update('time="inf"')
cfg.update('trajectory.interval=0')
cfg.update('backend.vrun.checkpt.interval=inf')
cfg.update('backend.vrun.plugin.list = ["energy_groups" "status"]')
cfg.update('backend.vrun.plugin.energy_groups.name="%s"' % fout)
# frame slicing
cfg.update('backend.vrun.plugin.energy_groups.first=%s' % slicing.first)
cfg.update('backend.vrun.plugin.energy_groups.interval=%s' %
slicing.interval)
cfg.update('backend.vrun.plugin.status.first=%s' % slicing.first)
cfg.update('backend.vrun.plugin.status.interval=%s' % slicing.interval)
cfg.update('backend.vrun.frameset.first=%s' % slicing.first)
cfg.update('backend.vrun.frameset.interval=%s' % slicing.interval)
if slicing.last:
cfg.update('backend.vrun.last_time=%s' % slicing.last)
cfg.update('backend.vrun.frameset.last_time=%s' % slicing.last)
cfg.update('backend.vrun.plugin.status.last_time=%s' % slicing.last)
cfg.update('backend.vrun.plugin.energy_groups.last_time=%s' %
slicing.last)
is_bulk = constants.ENERGY_GROUPS.SELF_ENERGY_ONLY in options
if is_gcmc and is_bulk:
options = options | {constants.ENERGY_GROUPS.GCMC_INFO}
# Sort options to make them deterministic in the cfg
options_str = ('backend.vrun.plugin.energy_groups.options = [%s]' %
' '.join(sorted(options)))
cfg.update(options_str)
cfg.update('backend.vrun.plugin.maeff_output.bootfile="%s"' %
ene_cms_file)
f.write(str(cfg))
def _prepare_cms(cms_model, groups):
"""
Create a temporary energy.cms file with atom property energy group id
:type groups: list[set[int]] or list(str(AIDs))
"""
atom_groups = [
AtomGroup(
cms_model.select_atom_comp('(atom.num %s)' % ','.join(map(str, g))),
'energy', group_id)
for group_id, g in enumerate(groups, start=1)
if g
]
cms_model.set_atom_group(atom_groups)
cms_model.write('energy.cms')
# FIXME: since Aleks is considering changing the format of energy-group output file
# the old parsing code are copied here without much change, i.e.,
# EnergyComponent, _parse_line(), _parse_energies(), _sort_dict()
# _parse_header_line, _parse_p_tensor_line, etc -- DZ Aug.2017
[docs]class EnergyComponent:
"""
Class for storing different energy components
"""
[docs] def add(self, term, val):
self.__dict__[term] = val
@property
def elec(self):
return self.nonbonded_elec + getattr(self, 'pair_elec', 0) + getattr(
self, 'far_terms', 0) + getattr(self, 'far_exclusion', 0)
@property
def vdw(self):
return self.nonbonded_vdw + getattr(self, 'pair_vdw', 0)
def _parse_self_energy_line(energy_series, energy_match, line, ngroups):
"""
Parse one line of the energygroup output .engrp file with the
`self_energy_only` option on.
See docstring of `_parse_line` for arguments
Inter-molecular energy is saved in `energy_series[time]` as side effect.
"""
energy_comp_name = energy_match.group(1)
time = float(energy_match.group(2))
if time not in energy_series:
energy_series[time] = EnergyComponent()
inter_mol_result = energy_series[time]
# with n groups specified by user, skip n+3 columns, which are
# energy tag, (time), E00, E11 ... Enn, since group 0 is reserved
l = line.split()
values = l[3:-1]
inter_mol = sum(map(float, values))
energy_comp_total = float(l[-1])
inter_mol_result.add(energy_comp_name, energy_comp_total - inter_mol)
def _parse_cross_energy_line(energy_series, energy_match, line, ngroups):
"""
Parse one line of the energygroup output .engrp file with the
`cross_energy_only` option on.
See docstring of `_parse_line` for arguments
Inter-molecular energy is saved in `energy_series[time]` as side effect.
"""
energy_comp_name = energy_match.group(1)
time = float(energy_match.group(2))
if time not in energy_series:
energy_series[time] = EnergyComponent()
inter_mol_result = energy_series[time]
# line is: energy tag, (time), X-Y, Total
tmp = line.split()
inter_mol_result.add(energy_comp_name, float(tmp[2]))
def _parse_line(energy_series, energy_match, line, ngroups):
"""
Parse one line of the energygroup output file and save the results to `energy_series`
:type energy_series: `dict`, key = simulation time, `float`
value = Dict[component name, `EnergyComponent`]
The component name could be (i, j), or 'all'
:param energy_series: Holder of energies result
:type energy_match: `re.MatchObject`
:param energy_match: The first group is the energy component name and the
second group is time
:type line: `str`
:param line: A single line of text from the energy-group output
data file. It will be parsed to get the energy data.
Example: angle (0.0) 0.0 0.0 0.0 3.0 1.0 2.5 6.5
:type ngroups: `int`
:param ngroups: Number of energy groups
"""
energy_comp_name = energy_match.group(1)
time = float(energy_match.group(2))
if time not in energy_series:
energy_series[time] = {}
energy_pair = energy_series[time]
# with n groups specified by user, skip n+3 columns, which are
# energy tag, (time), E00, E01 ... E0n since group 0 is reserved
l = line.split()
values = l[ngroups + 3:-1]
offset = 0
for i in range(ngroups):
for j in range(i, ngroups):
if frozenset([i, j]) not in energy_pair:
energy_pair[frozenset([i, j])] = EnergyComponent()
try:
v = float(values[offset])
except IndexError:
v = 0.0
energy_pair[frozenset([i, j])].add(energy_comp_name, v)
offset += 1
energy_comp_total = float(l[-1])
if 'all' not in energy_pair:
energy_pair['all'] = EnergyComponent()
energy_pair['all'].add(energy_comp_name, energy_comp_total)
energy_pair['all'].add('time', energy_comp_total)
def _sort_dict(d):
"""
Convert the dictionary of {time: energy} to a list of two tuples:
[(times), (energies)]
"""
return list(zip(*sorted(d.items())))
def _parse_energies(fname, num_groups):
"""
:type num_groups: `int`
"""
energy_term = r"(angle|dihedral|far_exclusion|far_terms|nonbonded_elec|nonbonded_vdw|pair_elec|pair_vdw|stretch|Total)"
floating_number = r"([-+]?\b(?:[0-9]*\.)?[0-9]+(?:[eE][-+]?[0-9]+)?\b)"
pattern = r"%s\s+\(%s\)" % (energy_term, floating_number)
re_energy = re.compile(pattern)
energy_series = {}
with open(fname) as f:
for line in f:
match = re_energy.match(line)
if match:
_parse_line(energy_series, match, line, num_groups)
return _sort_dict(energy_series)
def _run_vrun():
"""
:raise: `EnergyGroupError`
"""
cmd = ["desmond", "-PROCS", "1", "-in", 'energy.cms', "-c", "energy.cfg"]
logger.info("launch vrun: %s" % cmd)
job = None
try:
job = jobcontrol.launch_job(cmd)
except Exception as e:
logger.info("Failed launching vrun: %s." % e)
job and job.wait()
try:
if job.ExitCode != 0:
raise EnergyGroupError("Vrun job exited with code %d, %s" %
(job.ExitCode, job.summary()))
except AttributeError:
raise EnergyGroupError("Vrun job prematurely exited.")
[docs]@contextmanager
def temp_dir():
d = tempfile.mkdtemp(dir=fileutils.get_directory_path(fileutils.TEMP))
oldpwd = os.getcwd()
os.chdir(d)
try:
yield d
finally:
os.chdir(oldpwd)
try:
debug = int(os.getenv("SCHRODINGER_JOB_DEBUG", "0"))
except ValueError:
# e.g: int("abc")
debug = 0
if debug == 0:
shutil.rmtree(d)
else:
logger.info("%s are kept for debugging purpose." % d)
[docs]def get_energies(sim_cfg,
tr_path,
cms_model,
slicing,
groups,
parse=_parse_energies,
options=None):
"""
Calculate energies between groups of atoms specified by ASL selections,
list of list of AIDs, or molecule number. Note there should be no overlap
between any pair of the atom groups.
:type slicing: `SliceParams`
:type groups: list(set(AIDs))
:type parse: callable
:param parse: parser for the energy-group output data file
:param set options: Set of energy group options
:rtype: tuple(list(float), list(`EnergyComponent`))
:return: simulation times, and various energies for each frame
See `EnergyFacade` for other arguments
"""
if not groups:
raise EnergyGroupError('No energy groups were specified')
if not _Const.CAN_USE_GPU_DESMOND:
raise EnergyGroupError('Energy group calculations require a GPU host.')
if len(groups) != 1 and set.intersection(*groups):
raise EnergyGroupError('Overlap among groups is not allowed.')
if options is None or not options.issubset(constants.ENERGY_GROUPS):
raise EnergyGroupError('Unknown analysis options.')
# GPU desmond supports self_energy_only option for bulk analysis
with temp_dir():
_prepare_cfg(sim_cfg, tr_path, slicing, options, cms_model.is_for_gcmc)
_prepare_cms(cms_model, groups)
_run_vrun()
return parse('output.engrp', len(groups))
[docs]class EnergyFacade:
"""
A helper class to manage energy group calculations. Different energy group
calculations have different input/output formats. All these details are
hidden by using this class and the subclasses of `EnergyGroupBase`.
To get full control over the energy group calculation, call `get_energies`
directly.
"""
[docs] def __init__(self, sim_cfg, tr_path, cms_model, slicing):
"""
:type sim_cfg: `str` or `None`
:param sim_cfg: Path to Desmond simulaiton configuration.
:type tr_path: `str`
:param tr_path: Path to trajectory folder
:type cms_model: `Cms`
:type slicing: `SliceParams`
"""
self._args = [sim_cfg, tr_path]
if sim_cfg:
self._args = list(map(os.path.abspath, self._args))
if cms_model.is_for_gcmc:
# put inactive molecules into the fsys_ct so that they all count as groups
cms_model = Cms(string=cms_model.write_to_string(),
remove_inactive_atoms_in_fsys=False)
self._args.extend([cms_model, slicing])
self._cache = {}
self.times = None
[docs] def get(self, ene):
"""
:type ene: instance inherited from `EnergyGroupBase`
"""
if ene.key not in self._cache:
if 'SPECIFIC_HEAT' in ene.key or 'PRESSURE_TENSOR' in ene.key:
trj_path = self._args[1]
tr = traj.read_traj(trj_path)
if tr[0].vel() is None:
raise EnergyGroupError(
'Velocity is needed for specific heat and pressure tensor'
f' calculation. The trajectory {trj_path} does '
'not contain velocity information.')
# Bulk needs simulation parameters for specific heat
if 'SPECIFIC_HEAT' in ene.key or 'HEAT_VAP' in ene.key:
ene.getCfgParams(self._args[0])
self.times, self._cache[ene.key] = get_energies(
*self._args, **ene.kwargs)
return ene.getResult(self._cache[ene.key])
[docs]def analyze(tr: List[traj.Frame],
cms_model: Cms,
*analyzers,
sim_cfg: Optional[str] = None):
"""
Do analyses on the given trajectory `tr`, and return the results.
The analyses are specified as one or more positional arguements. Each
analyzer should satisfy the interface requirements.
:param tr: The simulation trajectory to analyze
:param analyzers: A list of analyzer objects
:param sim_cfg: Path to Desmond simulation configuration.
:rtype: list
:return: For a single analyzer, this function will return a list of analysis
results, and each element in the list corresponds to the result of
the corresponding frame. For multiple analyzers, this function will
return a list of lists, and each element is a list of results of
the corresponding analyzer.
"""
results = []
if sim_cfg is not None:
sim_cfg = str(Path(sim_cfg).absolute())
with temp_dir():
tr_path = 'energygroup_trj'
traj.write_traj(tr, tr_path)
ef = EnergyFacade(sim_cfg, tr_path, cms_model,
SliceParams(0, 0, 0)) # select all frames
for ene in analyzers:
try:
results.append(ef.get(ene))
except EnergyGroupError as e:
print("Cannot compute energygroup: %s." % e)
results.append(None)
# FIXME: do we really want this special treatment?
# (Make consistent with analysis.analyze function)
return results[0] if 1 == len(analyzers) else results