Source code for schrodinger.application.desmond.packages.energygroup

import copy
import os
import re
import shutil
import tempfile
from collections import defaultdict
from collections import namedtuple
from contextlib import contextmanager
from itertools import combinations
from pathlib import Path
from typing import Dict
from typing import List
from typing import Optional
from typing import Tuple

import numpy

import schrodinger.job.jobcontrol as jobcontrol
import schrodinger.utils.fileutils as fileutils
from schrodinger import gpgpu
from schrodinger.structutils.analyze import validate_asl
from schrodinger.application.desmond import config
from schrodinger.application.desmond import constants
from schrodinger.application.desmond.cms import AtomGroup
from schrodinger.application.desmond.cms import Cms
from schrodinger.application.desmond.packages import traj
from schrodinger.infra import licensing
from schrodinger.utils import log
from schrodinger.utils import sea

logger = log.get_output_logger("energygroup")

P_TENSOR_PATTERN = r"(Pressure_Tensor)"
SIM_BOX_PATTERN = r"(Simulation_Box)"


[docs]class EnergyGroupError(Exception): pass
class _Const: DEFAULT_P = 1.01325 DEFAULT_T = 300. CAN_USE_GPU_DESMOND = licensing.licenseExists( licensing.DESMOND_GPGPU) and gpgpu.is_any_gpu_available()
[docs]class EnergyGroupBase: """ Its subclasses work with `EnergyFacade` to perform energy group calculations. The subclass instance should have the following public attributes: - `key`: immutable and iterable. It is used as the key to retrieve calculation result from cache in `EnergyFacade` - `kwargs`: `dict`. It is the keyword arguments for `get_energies`. Its roles include: - store and validate the parameters - retrieve result from the cache in `EnergyFacade` """ DEFAULT_OPTIONS = { constants.ENERGY_GROUPS.PRESSURE_TENSOR, constants.ENERGY_GROUPS.CORR_ENERGY }
[docs] def getResult(self, result): """ Retrieve and format the result from the cache in `EnergyFacade` """ raise NotImplementedError # pragma: no cover
[docs]class MoleculeEnergy(EnergyGroupBase): """ Analyzer for energy group calculation with single molecule selection. The pre-defined energy groups are (see COMPONENT_LIST) 1. All atoms - group 2, 3, 4, 5 2. Solute atoms - group 5 3. Membrane atoms - group 5 4. Solvent atoms - group 5 5. Selected molecule """ TYPE_MAP = { 'Coulomb': 'elec', 'vdW': 'vdw', 'Bond': 'stretch', 'Angle': 'angle', 'Torsion': 'dihedral', 'Total': 'Total' } COMPONENT_LIST = ['Other', 'Solute', 'Membrane', 'Solvent', 'Self']
[docs] def __init__(self, cms_model, molecule_number, type, type2): """ :type cms_model: `Cms` :param molecule_number: index of the molecule :type molecule_number: `int` :param type: pre-defined energy type, see TYPE_MAP :type type: `str` :param type2: pre-defined atom group, see COMPONENT_LIST :type type2: `str` """ self.key = (molecule_number,) aids = set(cms_model.select_atom('mol.num %d' % molecule_number)) if not aids: raise EnergyGroupError(f"Molecule {molecule_number} doesn't exist!") solvent_aids = set(cms_model.select_atom('solvent')) - aids solute_aids = set(cms_model.select_atom('solute')) - aids membrane_aids = set(cms_model.select_atom('membrane')) - aids other_aids = set(cms_model.select_atom('all')) - aids - solvent_aids - \ solute_aids - membrane_aids groups = [other_aids, solute_aids, membrane_aids, solvent_aids, aids] self.kwargs = {'options': self.DEFAULT_OPTIONS, 'groups': groups} try: self._idx = frozenset([4, self.COMPONENT_LIST.index(type2)]) self._attr = self.TYPE_MAP[type] except KeyError: raise EnergyGroupError('Energy type is not valid: %s.' % type) except ValueError: raise EnergyGroupError('Component name is not valid: %s.' % type2)
[docs] def getResult(self, result) -> List[float]: """ :return: frame-by-frame result of the `type` energy (see TYPE_MAP) of the atom group `type2` (see COMPONENT_LIST) """ return [getattr(c[self._idx], self._attr) for c in result]
[docs]class SelectionEnergyMatrix(EnergyGroupBase): """ Analyzer for energy group matrix calculation (all self and cross terms). The pre-defined energy groups are calculated for the specified ASL selection. """
[docs] def __init__(self, cms_model: Cms, asl_selections: Optional[List] = None): """ :param asl_selection: List of ASL srings to include in the energy group calculation. """ self.key = frozenset(['MATRIX']) if not asl_selections or not isinstance(asl_selections, list): raise EnergyGroupError( 'ERROR: SelectionEnergyMatrix `asl_selection` cannot be empty ' 'and must be iterable.') for asl in asl_selections: if asl is None or not validate_asl(asl): raise EnergyGroupError( 'ERROR: SelectionEnergyMatrix analyzer cannot validate' f'an ASL: {asl}') self.asl_selections = asl_selections self.groups = [ set(cms_model.select_atom(asl)) for asl in asl_selections ] ngroups = len(self.groups) if not any(self.groups) or ngroups > 999: raise EnergyGroupError( 'ERROR: SelectionEnergyMatrix analyzer cannot be empty or ' f'supports upto 999 energy groups: {ngroups} energy groups ' 'provided.') self._self_groups = [frozenset([i]) for i in range(ngroups)] self._self_group_keys = [f'sel_{grp:03}' for grp in range(ngroups)] self._cross_groups = [ frozenset(c) for c in combinations(range(ngroups), 2) ] self._cross_group_keys = [ f'sel_{g0:03}-sel_{g1:03}' for g0, g1 in combinations(range(ngroups), 2) ] self.kwargs = {'options': self.DEFAULT_OPTIONS, 'groups': self.groups}
[docs] def getResult(self, result) -> Dict[str, Dict[str, float]]: """ :return: frame-by-frame result all self and cross energy terms """ # self terms self_angle = defaultdict(list) self_stretch = defaultdict(list) self_dihed = defaultdict(list) self_elec = defaultdict(list) self_vdw = defaultdict(list) # cross terms cross_angle = defaultdict(list) cross_stretch = defaultdict(list) cross_dihed = defaultdict(list) cross_elec = defaultdict(list) cross_vdw = defaultdict(list) for frame in result: for key, grp in zip(self._self_group_keys, self._self_groups): self_angle[key].append(frame[grp].angle) self_stretch[key].append(frame[grp].stretch) self_dihed[key].append(frame[grp].dihedral) self_elec[key].append(frame[grp].elec) self_vdw[key].append(frame[grp].vdw) for key, grp in zip(self._cross_group_keys, self._cross_groups): cross_angle[key].append(frame[grp].angle) cross_stretch[key].append(frame[grp].stretch) cross_dihed[key].append(frame[grp].dihedral) cross_elec[key].append(frame[grp].elec) cross_vdw[key].append(frame[grp].vdw) res = { 'self_ene': { 'angle': self_angle, 'stretch': self_stretch, 'dihedral': self_dihed, 'elec': self_elec, 'vdw': self_vdw, }, 'cross_ene': { 'elec': cross_elec, 'vdw': cross_vdw, 'angle': cross_angle, 'stretch': cross_stretch, 'dihedral': cross_dihed, }, } return {'Result': res}
[docs]class MoleculeEnergyMatrix(EnergyGroupBase): """ Analyzer for energy group matrix calculation (all self and cross terms). The pre-defined energy groups are calculated for the specified molecules. If molecules are not specified then all molecules will be used. """
[docs] def __init__(self, cms_model: Cms, molecule_numbers: Optional[List] = None): """ :param molecule_numbers: Set of molecules to include in the energy group calculation. If `None` or empty, all molecules will be used. """ self.key = frozenset(['MATRIX']) self.mols = molecule_numbers or range(1, cms_model.mol_total + 1) ngroups = len(self.mols) if ngroups > 999: raise EnergyGroupError( 'ERROR: MoleculeEnergyMatrix analyzer ' f'supports upto 999 energy groups: {ngroups} energy groups ' 'provided.') _grps = range(ngroups) self._self_dict = {frozenset([m]): self.mols[m] for m in _grps} self._cross_dict = { frozenset([m0, m1]): (self.mols[m0], self.mols[m1]) for (m0, m1) in combinations(_grps, 2) } self.groups = [ set(cms_model.molecule[m].getAtomIndices()) for m in self.mols ] self.kwargs = {'options': self.DEFAULT_OPTIONS, 'groups': self.groups}
[docs] def getResult(self, result) -> Dict[str, Dict[str, float]]: """ :return: frame-by-frame result all self and cross energy terms """ self_angle = defaultdict(list) self_stretch = defaultdict(list) self_dihed = defaultdict(list) self_elec = defaultdict(list) self_vdw = defaultdict(list) cross_elec = defaultdict(list) cross_vdw = defaultdict(list) for frame in result: for mol_idx, mol_num in self._self_dict.items(): key = f'mol_{mol_num:03}' self_angle[key].append(frame[mol_idx].angle) self_stretch[key].append(frame[mol_idx].stretch) self_dihed[key].append(frame[mol_idx].dihedral) self_elec[key].append(frame[mol_idx].elec) self_vdw[key].append(frame[mol_idx].vdw) for mol_idx_pair, mol_pair in self._cross_dict.items(): key = f'mol_{mol_pair[0]:03}-mol_{mol_pair[1]:03}' cross_elec[key].append(frame[mol_idx_pair].elec) cross_vdw[key].append(frame[mol_idx_pair].vdw) res = { 'self_ene': { 'angle': self_angle, 'stretch': self_stretch, 'dihedral': self_dihed, 'elec': self_elec, 'vdw': self_vdw, }, 'cross_ene': { 'elec': cross_elec, 'vdw': cross_vdw }, } return {'Result': res}
[docs]class InteractionEnergy(EnergyGroupBase): """ Analyzer for interaction energy group calculation. """
[docs] def __init__(self, cms_model, asl1, asl2): """ :type cms_model: `Cms` :param asl1: atom selection for group 1 :param asl2: atom selection for group 2 :type asl1: `str` :type asl2: `str` """ self.key = (asl1, asl2) groups = [set(cms_model.select_atom(asl)) for asl in self.key] self.kwargs = {'options': self.DEFAULT_OPTIONS, 'groups': groups} self._idx = frozenset([0, 1])
[docs] def getResult(self, result) -> List[Tuple[float, float]]: """ :return: frame-by-frame result of the electric and van der Walls energies (both non-bonded and pair) between the two groups """ return [(getattr(c[self._idx], 'elec'), getattr(c[self._idx], 'vdw')) for c in result]
def _get_ensemble(cfg_obj): """ Extract ensemble type from the original cfg block :type cfg_obj: `sea.Map` :param cfg_obj: 'ORIG_CFG' block as ark object :rtype: `str` """ # the ensemble tag may be missing from the ORIG_CFG block cfg = copy.deepcopy(config.DEFAULT_SETTING.MD) cfg.update(cfg_obj) cfg = config.canonicalize(cfg) return cfg['ensemble']['class'].val def _get_cfg_prop(cfg_obj, key, default): """ Extract target simulation condition from the original cfg block. If not present, use `default` value. :type key: `str` :type cfg_obj: `sea.Map` :param cfg_obj: 'ORIG_CFG' block as ark object :rtype: `float` """ def extract(t): if isinstance(t, list): t = extract(t[0]) return float(t) t = default if key in cfg_obj: t = extract(cfg_obj[key].val) return t
[docs]class Bulk(EnergyGroupBase): """ Analyzer for material science energy group calculation """ OPTIONS = { "TOTAL_E", "INTRA_E", # these two are not used "VOLUME", "DENSITY", "PRESSURE_TENSOR", "SPECIFIC_HEAT", "COHES_E", "SOL_PARAM", "HEAT_VAP" } ENERGY_TERM = r"(far_terms|nonbonded_elec|nonbonded_vdw|Total)" FLOATING_NUMBER = r"([-+]?\b(?:[0-9]*\.)?[0-9]+(?:[eE][-+]?[0-9]+)?\b)" E_PATTERN = r"%s\s+\(%s\)" % (ENERGY_TERM, FLOATING_NUMBER)
[docs] def __init__(self, cms_model, type, last_n_percent=70): """ @param last_n_percent: Only use `last_n_percent` of the trajectory to compute specific heat """ self.key = frozenset([t.upper() for t in type]) invalid_opt = self.key - self.OPTIONS if invalid_opt: raise EnergyGroupError('Invalid options: %s' % invalid_opt) if {'HEAT_VAP', 'COHES_E', 'SOL_PARAM'}.intersection(self.key): # make each molecule an energy group groups = [set(mol.getAtomIndices()) for mol in cms_model.molecule] else: # one group for all molecules groups = [set(cms_model.getAtomIndices())] self._has_vol = bool( {'DENSITY', 'VOLUME', 'SPECIFIC_HEAT', 'SOL_PARAM'}.intersection(self.key)) self._has_p_tensor = 'PRESSURE_TENSOR' in self.key self._has_density = 'DENSITY' in self.key self._has_c = 'SPECIFIC_HEAT' in self.key if self._has_c: self._num_atom = cms_model.atom_total if self._has_density or self._has_c: self._tot_weight = cms_model.total_weight options = (self.DEFAULT_OPTIONS | {constants.ENERGY_GROUPS.SELF_ENERGY_ONLY}) self.kwargs = { 'options': options, 'groups': groups, 'parse': self.parseEnergies, } self._times = [] self._tot_energies = [] self._vdw_energies = [] self._ele_energies = [] self._num_mol = cms_model.mol_total self._is_gcmc = cms_model.is_for_gcmc self._last_n_percent = last_n_percent if 'SPECIFIC_HEAT' in self.key and self._is_gcmc: raise EnergyGroupError('Cannot perform specific heat calculation ' 'for GCMC simulations.')
[docs] def getResult(self, result): return result
[docs] def getCfgParams(self, cfg_file): """ :param cfg_file: path to the simulation configuration file """ cfg = _read_orig_cfg(cfg_file) self._ensemble = _get_ensemble(cfg) if self._is_gcmc and self._ensemble not in {'NVT', 'NPT'}: raise EnergyGroupError( 'Expecting ensemble=NPT or ensemble=NVT for' f' GCMC simulations, but got ensemble={self._ensemble}.') self._target_pressure = None if self._ensemble == 'NPT': self._target_pressure = _get_cfg_prop(cfg, 'pressure', _Const.DEFAULT_P) self._target_temperature = _get_cfg_prop(cfg, 'temperature', _Const.DEFAULT_T)
def _reduceEnergies(self, energy_series, num_groups): """ To save memory, reduce all pairwise energies to total interaction energy, total vdw interaction energy, and total electric interaction energy """ time, ene_pairs = energy_series.popitem() tot_intra_e = vdw_intra_e = ele_intra_e = 0. for i in range(num_groups): e = ene_pairs[frozenset([i])] tot_intra_e += e.Total vdw_intra_e += getattr(e, 'nonbonded_vdw', 0) ele_intra_e += getattr(e, 'far_terms', 0) + getattr( e, 'nonbonded_elec', 0) all_e = ene_pairs['all'] self._times.append(time) # Here all interaction energies should be negative for stable configurations self._tot_energies.append(abs(all_e.Total - tot_intra_e)) self._vdw_energies.append( abs(getattr(all_e, 'nonbonded_vdw', 0) - vdw_intra_e)) self._ele_energies.append( abs( getattr(all_e, 'far_terms', 0) + getattr(all_e, 'nonbonded_elec', 0) - ele_intra_e)) def _reduceSelfEnergies(self, energy_series, num_groups): """ Retrieve total interaction energy, total vdw interaction energy, and total electric interaction energy, when the self_energy_only option is set. """ time, inter_mol_e = energy_series.popitem() self._times.append(time) # Here all interaction energies should be negative for stable configurations vdw_e = getattr(inter_mol_e, 'nonbonded_vdw', 0) ele_e = getattr(inter_mol_e, 'far_terms', 0) + getattr( inter_mol_e, 'nonbonded_elec', 0) self._tot_energies.append(abs(inter_mol_e.Total)) self._vdw_energies.append(abs(vdw_e)) self._ele_energies.append(abs(ele_e)) def _calcSpecHeat(self, tot_e): """ Calculates specific heat for NVT (Cv) and NPT (Cp) ensembles. :param tot_e: total energy :type tot_e: 1D `numpy.ndarray` :rtype: `float` """ deltaH = numpy.mean(tot_e**2) - numpy.mean(tot_e)**2 kT_sq = 0.001987 * self._target_temperature**2 Cp = numpy.fabs(deltaH) / kT_sq return Cp * constants.Conversion.KCAL_TO_JOUL / self._tot_weight def _calcSpecHeatNVE(self, kin_e): """ Calculates specific heat for NVE (Cv) ensemble. :param kin_e: kinetic energy :type kin_e: `numpy.ndarray` :rtype: `float` """ Z = numpy.mean(kin_e) * numpy.mean(kin_e**-1) dof = self._num_atom * 6 Cv = (1. - (1. - ((2. / dof) * Z)))**-1 return Cv / self._tot_weight * constants.GAS_CONSTANT_JOUL def _getResultsForLastNPercent(self, energy, func) -> List[float]: """ calculate the specific heat for the last N percent of the trajectory :param energy: ensemble-dependent energy :type energy: `numpy.array` :param func: callable that calculates specific heat """ idx = int(len(energy) * (1.0 - self._last_n_percent / 100.)) return func(energy[idx:])
[docs] def parseEnergies(self, fname, num_groups): """ Parse the vrun output.engp file for total molecular interaction energies, and other requested quantities. See for example https://opengrok.schrodinger.com/xref/desmond-gpu-src/test/data/bulktest/gpu https://opengrok.schrodinger.com/xref/desmond-gpu-src/test/data/bulktest/cpu :type num_groups: `int` :rtype: `dict` """ bulk_pattern = r"(time)" n_mol_pattern = r'Num_active_molecules\s+\({0}\)\s+{0}'.format( self.FLOATING_NUMBER) mass_pattern = r'Active_mass\s+\({0}\)\s+{0}'.format( self.FLOATING_NUMBER) re_energy = re.compile(self.E_PATTERN) volumes = [] pot_energies = [] kin_energies = [] parse_header = False if self._has_vol or self._has_c: parse_header = True re_header = re.compile(bulk_pattern) if self._has_p_tensor: re_p_tensor = re.compile(P_TENSOR_PATTERN) p_tensors = [] if self._is_gcmc: n_mols_per_frame = [] mass_per_frame = [] re_num_mol = re.compile(n_mol_pattern) re_mass = re.compile(mass_pattern) # num_atom may be needed for gcmc specific heat calculation #n_atoms_per_frame = [] #re_num_atom = re.compile(rf'Num_active_atoms\s+\({floating_number}\)\s+{floating_number}') energy_parser = _parse_self_energy_line reducer = self._reduceSelfEnergies energy_series = {} with open(fname) as f: for line in f: # total inter-molecular energies match = re_energy.match(line) if match: energy_parser(energy_series, match, line, num_groups) if line.startswith('Total'): reducer(energy_series, num_groups) # other bulk properties elif self._has_p_tensor and re_p_tensor.match(line): time, p_tensor = _parse_p_tensor_line(line) p_tensors.append(p_tensor) else: if parse_header and re_header.match(line): data = _parse_header_line(line) volumes.append(data['V']) pot_energies.append(data['E_p']) kin_energies.append(data['E_k']) if self._is_gcmc: match = re_num_mol.match(line) if match: n_mols_per_frame.append(float(match.group(2))) match = re_mass.match(line) if match: mass_per_frame.append(float(match.group(2))) # This may be needed for gcmc specific heat calculation #match = re_num_atom.match(line) #if match: # n_atoms_per_frame.append(float(match.group(2))) result = {} if self._has_vol: result['VolumeResult'] = volumes result['VolumeUnit'] = "Angstrom^3" if self._has_density: if self._is_gcmc: result['DensityResult'] = [ constants.Conversion.AU_TO_KG * tot_weight / v for tot_weight, v in zip(mass_per_frame, volumes) ] else: result['DensityResult'] = [ constants.Conversion.AU_TO_KG * self._tot_weight / v for v in volumes ] result['DensityUnit'] = "g/cm^3" if self._has_p_tensor: result['PressureTensorResult'] = p_tensors result['PressureTensorUnit'] = 'bar' if self._has_c: # usually the first frame contains no velocities, thus we copy the # values form the second frame kin_energies[0] = kin_energies[1] kin_e = numpy.array(kin_energies) tot_e = kin_e + numpy.array(pot_energies) if self._ensemble == 'NVE': c_type = 'Cv' c = self._getResultsForLastNPercent(kin_e, self._calcSpecHeatNVE) elif self._ensemble == 'NVT': c_type = 'Cv' c = self._getResultsForLastNPercent(tot_e, self._calcSpecHeat) elif self._ensemble == 'NPT': c_type = 'Cp' # 1 bar = 10^5 Pa = 10^5 kg/(m s^2) # 1A^3 = 10^-30 m^3 # PV = 10^5 * 10^-30 J # 1J = 1.44e20 kcal/mol # PV(kcal/mol) = P(bar)*V(A^3)* 1.44e-5 pv = numpy.array(volumes) * self._target_pressure * 1.44e-5 c = self._getResultsForLastNPercent(tot_e + pv, self._calcSpecHeat) else: raise EnergyGroupError('Unable to compute specific heat ' f'when the ensemble is {self._ensemble}') result['SpecificHeatResult'] = [c_type, c] result['SpecificHeatUnit'] = 'J / (g * K)' if {'HEAT_VAP', 'COHES_E'}.intersection(self.key): if self._is_gcmc: cohes_tot_e = [ e / num_mol for e, num_mol in zip(self._tot_energies, n_mols_per_frame) ] else: cohes_tot_e = [e / self._num_mol for e in self._tot_energies] if 'COHES_E' in self.key: result['EneCohesiveResult'] = cohes_tot_e result['EneCohesiveUnit'] = 'kcal/mol' if 'HEAT_VAP' in self.key: RT = self._target_temperature * constants.GAS_CONSTANT result['HeatVaporizationResult'] = [e + RT for e in cohes_tot_e] result['HeatVaporizationUnit'] = 'kcal/mol' if 'SOL_PARAM' in self.key: norm_vol = 10000. / (numpy.array(volumes) * 6.022) sol_param_tot = ( numpy.sqrt(norm_vol * numpy.array(self._tot_energies)) / constants.Conversion.CAL_CM_TO_MPA) sol_param_vdw = ( numpy.sqrt(norm_vol * numpy.array(self._vdw_energies)) / constants.Conversion.CAL_CM_TO_MPA) sol_param_ele = ( numpy.sqrt(norm_vol * numpy.array(self._ele_energies)) / constants.Conversion.CAL_CM_TO_MPA) result['SolubilityParameterResult'] = sol_param_tot.tolist() result['SolubilityParameterVdwSqResult'] = sol_param_vdw.tolist() result['SolubilityParameterEleSqResult'] = sol_param_ele.tolist() result['SolubilityParameterUnit'] = "MPa^(1/2)" result['SolubilityParameterVdwSqUnit'] = "MPa^(1/2)" result['SolubilityParameterEleSqUnit'] = "MPa^(1/2)" return self._times, result
[docs]class PartialCohesiveEnergy(EnergyGroupBase): """ Compute total cohesive energy (not per molecule) for a set of molecules defined by ASL. For this, nonbonding cross terms must be summed up. """
[docs] def __init__(self, cms_model, asl): """ :param str asl: ASL that defines set of molecules. :raise EnergyGroupError: If ASL defines incomplete molecules """ self.key = (asl,) molnums = sorted({ cms_model.atom[x].molecule_number for x in cms_model.select_atom(asl) }) mol_asl = 'mol.num %s' % ','.join(map(str, molnums)) # Molecules defined by ASL must be complete. if (set(cms_model.select_atom(mol_asl)) != set( cms_model.select_atom(asl))): raise EnergyGroupError( 'Cannot perform partial cohesive energy ' 'calculation for ASL that defines incomplete molecules.') self.groups = [ set(cms_model.molecule[molnum].getAtomIndices()) for molnum in molnums ] # cross_energy_only sums up all the cross terms between groups # (in this case a group is a molecule), except cross terms with 0 group self.kwargs = { 'options': {constants.ENERGY_GROUPS.CROSS_ENERGY_ONLY}, 'groups': self.groups, 'parse': self.parseEnergies } self._times = [] self._tot_energies = []
def _reduceEnergies(self, energy_series, num_groups): """ Retrieve total interaction energy as the sum of nonbonding energy (vdw + elec + far). """ time, inter_mol_e = energy_series.popitem() self._times.append(time) # Here all interaction energies should be negative for stable configurations vdw_e = getattr(inter_mol_e, 'nonbonded_vdw', 0) ele_e = (getattr(inter_mol_e, 'nonbonded_elec', 0) + getattr(inter_mol_e, 'far_terms', 0)) tot_en = vdw_e + ele_e self._tot_energies.append(tot_en)
[docs] def parseEnergies(self, fname, num_groups): """ Parse the vrun output.engp file for nonbonding energies. Don't keep everything in the memory. :param str fname: Energy group file name :param int num_groups: Number of groups :return: list of times and dictionary with results :rtype: tuple(list, dict) """ re_energy = re.compile(Bulk.E_PATTERN) with open(fname) as f: one_time_dict = {} for line in f: # total inter-molecular energies match = re_energy.match(line) if match: _parse_cross_energy_line(one_time_dict, match, line, num_groups) if line.startswith('Total'): self._reduceEnergies(one_time_dict, num_groups) # Free memory one_time_dict = {} return self._times, {'EneCohesivePartialResult': self._tot_energies}
[docs] def getResult(self, result): return result
def _parse_header_line(line): """ Parse header line that looks like this: time=19.2 en=3.641e+04 E_p=-1.937e+02 E_k=7.039e+03 E_x=1.071e+02 P=5.999e+02 V=9.654+04 and return a dictionary :param line: a frame header str that comes from energy-group output data file """ data = {} for token in line.split(): if '=' in token: key, value = token.split('=') data[key] = float(value) return data def _parse_p_tensor_line(line): """ Process lines that look like this: Pressure_Tensor(4.800000) -1138.7 39.734 300.07 116.98 -1124.3 -120.67 259.95 -65.07 -963.18 :param line: a pressure tensor line that comes from energy-group output file :return: chemical time and 9 pressure tensor components :rtype: (float, list of floats) """ stripped = line.split('(')[1].split(')') l = stripped[1].split() return (float(stripped[0]), [float(x) for x in l]) def _read_orig_cfg(cfg_file): """ :param cfg_file: path to the simulation configuration file """ try: with open(cfg_file) as f: cfg = sea.Map(f.read()) except Exception as e: raise EnergyGroupError('Reading configuration file %s failed: %s.' % (cfg_file, e)) if 'ORIG_CFG' not in cfg: raise EnergyGroupError('ORIG_CFG is missing from %s' % cfg_file) return copy.copy(cfg['ORIG_CFG']) # first, interval, last are chemical times instead of frame indices SliceParams = namedtuple('SliceParams', 'first interval last') def _prepare_cfg(cfg_file, tr_path, slicing, options, is_gcmc): """ Read the -out.cfg file, and write to a temporary energy.cfg file with the vrun options. :type slicing: `SliceParams` :param set options: Set of energygroup options :type is_gcmc: bool """ cfg = _read_orig_cfg(cfg_file) fout = 'output.engrp' ene_cms_file = 'energy.cms' try: is_external_potential = ( cfg.backend.force.nonbonded.type.val == 'external') except AttributeError: is_external_potential = False if (is_external_potential and constants.ENERGY_GROUPS.SELF_ENERGY_ONLY not in options): raise RuntimeError(f'"{constants.ENERGY_GROUPS.SELF_ENERGY_ONLY}" ' 'energy_groups plugin option must ' 'be enabled for the "external" potential') with open('energy.cfg', 'w') as f: cfg.update('energy_group = on') cfg.update('vrun_frameset="%s"' % tr_path) cfg.update('time="inf"') cfg.update('trajectory.interval=0') cfg.update('backend.vrun.checkpt.interval=inf') cfg.update('backend.vrun.plugin.list = ["energy_groups" "status"]') cfg.update('backend.vrun.plugin.energy_groups.name="%s"' % fout) # frame slicing cfg.update('backend.vrun.plugin.energy_groups.first=%s' % slicing.first) cfg.update('backend.vrun.plugin.energy_groups.interval=%s' % slicing.interval) cfg.update('backend.vrun.plugin.status.first=%s' % slicing.first) cfg.update('backend.vrun.plugin.status.interval=%s' % slicing.interval) cfg.update('backend.vrun.frameset.first=%s' % slicing.first) cfg.update('backend.vrun.frameset.interval=%s' % slicing.interval) if slicing.last: cfg.update('backend.vrun.last_time=%s' % slicing.last) cfg.update('backend.vrun.frameset.last_time=%s' % slicing.last) cfg.update('backend.vrun.plugin.status.last_time=%s' % slicing.last) cfg.update('backend.vrun.plugin.energy_groups.last_time=%s' % slicing.last) is_bulk = constants.ENERGY_GROUPS.SELF_ENERGY_ONLY in options if is_gcmc and is_bulk: options = options | {constants.ENERGY_GROUPS.GCMC_INFO} # Sort options to make them deterministic in the cfg options_str = ('backend.vrun.plugin.energy_groups.options = [%s]' % ' '.join(sorted(options))) cfg.update(options_str) cfg.update('backend.vrun.plugin.maeff_output.bootfile="%s"' % ene_cms_file) f.write(str(cfg)) def _prepare_cms(cms_model, groups): """ Create a temporary energy.cms file with atom property energy group id :type groups: list[set[int]] or list(str(AIDs)) """ atom_groups = [ AtomGroup( cms_model.select_atom_comp('(atom.num %s)' % ','.join(map(str, g))), 'energy', group_id) for group_id, g in enumerate(groups, start=1) if g ] cms_model.set_atom_group(atom_groups) cms_model.write('energy.cms') # FIXME: since Aleks is considering changing the format of energy-group output file # the old parsing code are copied here without much change, i.e., # EnergyComponent, _parse_line(), _parse_energies(), _sort_dict() # _parse_header_line, _parse_p_tensor_line, etc -- DZ Aug.2017
[docs]class EnergyComponent: """ Class for storing different energy components """
[docs] def add(self, term, val): self.__dict__[term] = val
@property def elec(self): return self.nonbonded_elec + getattr(self, 'pair_elec', 0) + getattr( self, 'far_terms', 0) + getattr(self, 'far_exclusion', 0) @property def vdw(self): return self.nonbonded_vdw + getattr(self, 'pair_vdw', 0)
def _parse_self_energy_line(energy_series, energy_match, line, ngroups): """ Parse one line of the energygroup output .engrp file with the `self_energy_only` option on. See docstring of `_parse_line` for arguments Inter-molecular energy is saved in `energy_series[time]` as side effect. """ energy_comp_name = energy_match.group(1) time = float(energy_match.group(2)) if time not in energy_series: energy_series[time] = EnergyComponent() inter_mol_result = energy_series[time] # with n groups specified by user, skip n+3 columns, which are # energy tag, (time), E00, E11 ... Enn, since group 0 is reserved l = line.split() values = l[3:-1] inter_mol = sum(map(float, values)) energy_comp_total = float(l[-1]) inter_mol_result.add(energy_comp_name, energy_comp_total - inter_mol) def _parse_cross_energy_line(energy_series, energy_match, line, ngroups): """ Parse one line of the energygroup output .engrp file with the `cross_energy_only` option on. See docstring of `_parse_line` for arguments Inter-molecular energy is saved in `energy_series[time]` as side effect. """ energy_comp_name = energy_match.group(1) time = float(energy_match.group(2)) if time not in energy_series: energy_series[time] = EnergyComponent() inter_mol_result = energy_series[time] # line is: energy tag, (time), X-Y, Total tmp = line.split() inter_mol_result.add(energy_comp_name, float(tmp[2])) def _parse_line(energy_series, energy_match, line, ngroups): """ Parse one line of the energygroup output file and save the results to `energy_series` :type energy_series: `dict`, key = simulation time, `float` value = Dict[component name, `EnergyComponent`] The component name could be (i, j), or 'all' :param energy_series: Holder of energies result :type energy_match: `re.MatchObject` :param energy_match: The first group is the energy component name and the second group is time :type line: `str` :param line: A single line of text from the energy-group output data file. It will be parsed to get the energy data. Example: angle (0.0) 0.0 0.0 0.0 3.0 1.0 2.5 6.5 :type ngroups: `int` :param ngroups: Number of energy groups """ energy_comp_name = energy_match.group(1) time = float(energy_match.group(2)) if time not in energy_series: energy_series[time] = {} energy_pair = energy_series[time] # with n groups specified by user, skip n+3 columns, which are # energy tag, (time), E00, E01 ... E0n since group 0 is reserved l = line.split() values = l[ngroups + 3:-1] offset = 0 for i in range(ngroups): for j in range(i, ngroups): if frozenset([i, j]) not in energy_pair: energy_pair[frozenset([i, j])] = EnergyComponent() try: v = float(values[offset]) except IndexError: v = 0.0 energy_pair[frozenset([i, j])].add(energy_comp_name, v) offset += 1 energy_comp_total = float(l[-1]) if 'all' not in energy_pair: energy_pair['all'] = EnergyComponent() energy_pair['all'].add(energy_comp_name, energy_comp_total) energy_pair['all'].add('time', energy_comp_total) def _sort_dict(d): """ Convert the dictionary of {time: energy} to a list of two tuples: [(times), (energies)] """ return list(zip(*sorted(d.items()))) def _parse_energies(fname, num_groups): """ :type num_groups: `int` """ energy_term = r"(angle|dihedral|far_exclusion|far_terms|nonbonded_elec|nonbonded_vdw|pair_elec|pair_vdw|stretch|Total)" floating_number = r"([-+]?\b(?:[0-9]*\.)?[0-9]+(?:[eE][-+]?[0-9]+)?\b)" pattern = r"%s\s+\(%s\)" % (energy_term, floating_number) re_energy = re.compile(pattern) energy_series = {} with open(fname) as f: for line in f: match = re_energy.match(line) if match: _parse_line(energy_series, match, line, num_groups) return _sort_dict(energy_series) def _run_vrun(): """ :raise: `EnergyGroupError` """ cmd = ["desmond", "-PROCS", "1", "-in", 'energy.cms', "-c", "energy.cfg"] logger.info("launch vrun: %s" % cmd) job = None try: job = jobcontrol.launch_job(cmd) except Exception as e: logger.info("Failed launching vrun: %s." % e) job and job.wait() try: if job.ExitCode != 0: raise EnergyGroupError("Vrun job exited with code %d, %s" % (job.ExitCode, job.summary())) except AttributeError: raise EnergyGroupError("Vrun job prematurely exited.")
[docs]@contextmanager def temp_dir(): d = tempfile.mkdtemp(dir=fileutils.get_directory_path(fileutils.TEMP)) oldpwd = os.getcwd() os.chdir(d) try: yield d finally: os.chdir(oldpwd) try: debug = int(os.getenv("SCHRODINGER_JOB_DEBUG", "0")) except ValueError: # e.g: int("abc") debug = 0 if debug == 0: shutil.rmtree(d) else: logger.info("%s are kept for debugging purpose." % d)
[docs]def get_energies(sim_cfg, tr_path, cms_model, slicing, groups, parse=_parse_energies, options=None): """ Calculate energies between groups of atoms specified by ASL selections, list of list of AIDs, or molecule number. Note there should be no overlap between any pair of the atom groups. :type slicing: `SliceParams` :type groups: list(set(AIDs)) :type parse: callable :param parse: parser for the energy-group output data file :param set options: Set of energy group options :rtype: tuple(list(float), list(`EnergyComponent`)) :return: simulation times, and various energies for each frame See `EnergyFacade` for other arguments """ if not groups: raise EnergyGroupError('No energy groups were specified') if not _Const.CAN_USE_GPU_DESMOND: raise EnergyGroupError('Energy group calculations require a GPU host.') if len(groups) != 1 and set.intersection(*groups): raise EnergyGroupError('Overlap among groups is not allowed.') if options is None or not options.issubset(constants.ENERGY_GROUPS): raise EnergyGroupError('Unknown analysis options.') # GPU desmond supports self_energy_only option for bulk analysis with temp_dir(): _prepare_cfg(sim_cfg, tr_path, slicing, options, cms_model.is_for_gcmc) _prepare_cms(cms_model, groups) _run_vrun() return parse('output.engrp', len(groups))
[docs]class EnergyFacade: """ A helper class to manage energy group calculations. Different energy group calculations have different input/output formats. All these details are hidden by using this class and the subclasses of `EnergyGroupBase`. To get full control over the energy group calculation, call `get_energies` directly. """
[docs] def __init__(self, sim_cfg, tr_path, cms_model, slicing): """ :type sim_cfg: `str` or `None` :param sim_cfg: Path to Desmond simulaiton configuration. :type tr_path: `str` :param tr_path: Path to trajectory folder :type cms_model: `Cms` :type slicing: `SliceParams` """ self._args = [sim_cfg, tr_path] if sim_cfg: self._args = list(map(os.path.abspath, self._args)) if cms_model.is_for_gcmc: # put inactive molecules into the fsys_ct so that they all count as groups cms_model = Cms(string=cms_model.write_to_string(), remove_inactive_atoms_in_fsys=False) self._args.extend([cms_model, slicing]) self._cache = {} self.times = None
[docs] def get(self, ene): """ :type ene: instance inherited from `EnergyGroupBase` """ if ene.key not in self._cache: if 'SPECIFIC_HEAT' in ene.key or 'PRESSURE_TENSOR' in ene.key: trj_path = self._args[1] tr = traj.read_traj(trj_path) if tr[0].vel() is None: raise EnergyGroupError( 'Velocity is needed for specific heat and pressure tensor' f' calculation. The trajectory {trj_path} does ' 'not contain velocity information.') # Bulk needs simulation parameters for specific heat if 'SPECIFIC_HEAT' in ene.key or 'HEAT_VAP' in ene.key: ene.getCfgParams(self._args[0]) self.times, self._cache[ene.key] = get_energies( *self._args, **ene.kwargs) return ene.getResult(self._cache[ene.key])
[docs]def analyze(tr: List[traj.Frame], cms_model: Cms, *analyzers, sim_cfg: Optional[str] = None): """ Do analyses on the given trajectory `tr`, and return the results. The analyses are specified as one or more positional arguements. Each analyzer should satisfy the interface requirements. :param tr: The simulation trajectory to analyze :param analyzers: A list of analyzer objects :param sim_cfg: Path to Desmond simulation configuration. :rtype: list :return: For a single analyzer, this function will return a list of analysis results, and each element in the list corresponds to the result of the corresponding frame. For multiple analyzers, this function will return a list of lists, and each element is a list of results of the corresponding analyzer. """ results = [] if sim_cfg is not None: sim_cfg = str(Path(sim_cfg).absolute()) with temp_dir(): tr_path = 'energygroup_trj' traj.write_traj(tr, tr_path) ef = EnergyFacade(sim_cfg, tr_path, cms_model, SliceParams(0, 0, 0)) # select all frames for ene in analyzers: try: results.append(ef.get(ene)) except EnergyGroupError as e: print("Cannot compute energygroup: %s." % e) results.append(None) # FIXME: do we really want this special treatment? # (Make consistent with analysis.analyze function) return results[0] if 1 == len(analyzers) else results