"""
Utilities for handling Desmond config files.
Copyright Schrodinger, LLC. All rights reserved.
"""
# Contributors: Yujie Wu
import collections
import warnings
from copy import deepcopy
from past.utils import old_div
import schrodinger.application.desmond.cmj as cmj
import schrodinger.application.desmond.cms as cms
import schrodinger.application.desmond.fep_schedule as fep_schedule
import schrodinger.utils.sea as sea
from schrodinger.application.desmond import constants
from schrodinger.infra import mm
FEP_LAMBDA_SCHEMES = [
"default", "flexible", "charge", "quickcharge", "superquickcharge"
]
DEFAULT_CONFIG = """{
Desmond = {
config_version = 3
}
app = "mdsim"
boot.file = ""
global_cell = {
reference_time = 0.0
topology = periodic
n_replica = 1
partition = [1 1 1]
r_clone = 5.903366
clone_policy = rounded
margin = 1.0
#est_pdens = 0.1
#est_n_atom_per_voxel = 1.0
}
force = {
constraint = {
tol = 1e-08
maxit = 8
}
nonbonded = {
sigma = 2.791856
r_cut = 9.0
n_zone = 1024
near = {
type = default # default | force-only | table | alchemical:softcore | binding:softcore
r_tap = 9.0
taper = none # none | shift | c1switch | c2switch
#average_dispersion = 69.5 # In kcal/mol A^6. If not set, it will be caculated by Desmond.
correct_average_dispersion = true
}
far = {
type = pme # none | pme | gse
sigma_s = 0.85
r_spread = 4.0
n_k = [16 16 16]
order = [4 4 4]
}
}
bonded = {}
virtual = {}
ignore_com_dofs = false # subtract 3 from the degrees of freedom.
term = {
list = []
gibbs = {
type = none # none | alchemical | binding
alpha_vdw = 0.5
window = -1 # window index
weights = {
N = 1 # number of windows
vdw = [] # binding
es = [] # binding
vdwA = [] # alchemical
vdwB = [] # alchemical
qA = [] # alchemical
qB = [] # alchemical
qC = [] # alchemical
bondA = [] # alchemical
bondB = [] # alchemical
}
output = {
name = "fep.dE"
first = 0.0
interval = 0.1
}
}
}
}
migration = {
first = 0.0
interval = 0.012
}
integrator = {
type = Multigrator # Multigrator | Ber_NPT | Ber_NVT | MTK_NPT | NH_NVT | V_NVE | L_NVT | L_NPT | Piston_NPH | anneal
dt = 0.002
temperature = {T_ref = 300.0}
respa = {
near_timesteps = 1
far_timesteps = 3
outer_timesteps = 3
}
pressure = {
isotropy = isotropic # anisotropic | constant_area | isotropic | semi_isotropic | flexible | monoclinic
max_margin_contraction = 0.9
P_ref = 1.01325
#tension_ref = 4E3
}
Multigrator = {
thermostat = { # none or a map
type = NoseHoover # Langevin | NoseHoover
timesteps = 12
Langevin = {
tau = 0.016129 # Gamma = 1/tau = 62 is default -- collision rate of water.
seed = 2012
}
NoseHoover = {
mts = 2
tau = [0.1 0.1 0.1]
}
}
barostat = { # none or a map
type = MTK
timesteps = 48
MTK = {
T_ref = 300.0
tau = 0.05
thermostat = {
type = NoseHoover # NoseHoover | Langevin
NoseHoover = {
mts = 2
tau = [0.1 0.1 0.1]
}
Langevin = {
tau = 0.016129 # Gamma = 1/tau = 62 is default -- collision rate of water.
seed = 2012
}
}
}
}
nve.type = Verlet # none | Verlet | PLS
}
MTK_NPT = {
barostat = {
tau = 2.0
thermostat = {
mts = 2
tau = [1.0 1.0 1.0 ]
}
}
thermostat = {mts = 2 tau = [1.0 1.0 1.0 ]}
}
NH_NVT = {
thermostat = {mts = 2 tau = [1.0 1.0 1.0 ]}
}
Ber_NPT = {
tau = 1.0
min_velocity_scaling = 0.85
max_velocity_scaling = 1.2
barostat = {
tau = 2.0
kappa = 4.5e-05
min_contraction_per_step = 0.95
max_expansion_per_step = 1.1
}
}
Ber_NVT = {
tau = 1.0
min_velocity_scaling = 0.85
max_velocity_scaling = 1.2
}
L_NVT = {
thermostat = {
tau = 0.016129 # Gamma = 1/tau = 62 is default -- collision rate or water.
seed = 2007
}
}
L_NPT = {
thermostat = {
tau = 0.016129 # Gamma = 1/tau = 62 is default.
seed = 2007
}
barostat = {
tau = 1.0
T_ref = 300.0
thermostat = {
tau = 0.016129
seed = 2007
}
}
}
brownie_NVT = {
thermostat.seed = 2014
delta_max = 0.1
}
brownie_NPT = {
thermostat.seed = 2014
delta_max = 0.1
barostat = {
thermostat.seed = 2014
tau = 0.016129
T_ref = 300.0
}
}
Piston_NPH = {
barostat = {
tau = 1.0
T_ref = 300.0
}
}
V_NVE = {}
}
PLUGIN_COMMON = {
list = [status randomize_velocities remove_com_motion eneseq trajectory maeff_output simbox_output]
#
anneal = {
type = anneal
first = 0.0
interval = 1.2
schedule = {
time = [0.0 30.0 60.0 90.0 600.0]
value = [0.0 300.0 600.0 900.0 300.0]
}
}
#
status = {
type = status
first = 0.0
interval = 1.2
}
#
eneseq = {
type = eneseq
name = "system.ene"
first = 0.0
interval = 1.2
}
#
trajectory = {
type = trajectory
name = "out_trj"
write_velocity = false
first = 0.0
interval = 4.8
center = []
glue = []
mode = noclobber # append | noclobber | clobber
periodicfix = true
write_last_step = true
frames_per_file = 250
write_last_vel = false
}
#
maeff_output = {
type = maeff_output
first = 0.0
interval = 120.0
name = ""
write_last_step = true
periodicfix = true
precision = 8
full_system_only = false
trjdir = ""
center_atoms = ""
}
#
maeff_snapshot = {
type = maeff_snapshot
name = ""
first = 0.0
interval = 1.2
}
#
randomize_velocities = {
type = randomize_velocities
first = 0.0
interval = inf
seed = 2007
temperature = 300.0
}
#
simbox_output = {
type = simbox_output
name = ""
first = 0.0
interval = 1.2
}
#
energy_groups = {
type = energy_groups
name = ""
first = 0.0
interval = 1.2
options = [corr_energy ] # [pressure_tensor corr_energy self_energy]
write_report = true
}
#
remove_com_motion = {
type = remove_com_motion
first = 0.0
interval = inf
}
#
ptensor = {
type = ptensor
name = ""
first = 0.0
interval = 1.2
print_volume = false
}
#
dipole_moment = {
type = dipole_moment
first = 0.0
interval = 1.2
name = ""
}
}
CHECKPT_COMMON = {
name = "checkpt.cpt"
first = 0.0
interval = 240.06
wall_interval = 7200.0
write_first_step = false
write_last_step = true
}
mdsim = {
title = "Desmond MD simulation"
last_time = 1200.0 # In the unit of ps
checkpt = "@*.CHECKPT_COMMON"
plugin = "@*.PLUGIN_COMMON"
}
reinit = {
title = "Desmond reinit/MD simulation"
last_time = inf
checkpt = "@*.CHECKPT_COMMON"
plugin = "@*.PLUGIN_COMMON"
frameset = {
first = 0.0
interval = 1200.0
configurations_file = ""
}
}
remd = {
title = "Desmond REMD simulation"
last_time = 1200.0
first = 1.2
interval = 1.2
exchange_type = neighbors # neighbors | random
seed = 2008 # random number seed
cfg = []
checkpt = "@*.CHECKPT_COMMON"
plugin = "@*.PLUGIN_COMMON"
deltaE = {
first = "@*.first"
interval = "@*.interval"
name = "deltaE.txt"
}
}
vrun = {
title = "Desmond Simulation"
input = frameset # bootfile | frameset (|stdin to be a feature added later)
checkpt = "@*.CHECKPT_COMMON"
plugin = "@*.PLUGIN_COMMON"
last_time= inf
frameset = {
first = 0.0
interval = 0.0
name = ""
last_time = "@*.last_time"
}
}
minimize = {
m = 3 # Number of L-BFGS vectors to use
maxsteps = 200 # Max number of iterations
tol = 1.0 # Converging criteria for gradient norm (kcal/mol)
stepsize = 0.005 # Norm of first step
switch = 25.0 # Minimal gradient norm before switching to LBFGS (kcal/mol/Angstrom)
sdsteps = 10 # Min number of initial SD steps
plugin = {
list = [maeff_output eneseq]
#
eneseq = {
type = eneseq
name = "system.ene"
first = 0.0
interval = 1.2
}
#
maeff_output = {
type = maeff_output
first = inf
interval = inf
name = ""
write_last_step = true
periodicfix = true
precision = 8
full_system_only = false
trjdir = ""
center_atoms = ""
}
#
maeff_snapshot = {
type = maeff_snapshot
name = ""
first = 0.0
interval = 1.0
}
energy_groups = {
type = energy_groups
name = ""
first = 0.0
interval = 1.2
options = [corr_energy ] # [pressure_tensor corr_energy self_energy]
write_report = true
}
}
}
gui = {
ewald_tol = 1E-9
}
}"""
[docs]def has_plugin(desmond_exec, plugin_name):
"""
"""
try:
plugin_list = desmond_exec.plugin.list.val
return (plugin_name in plugin_list)
except AttributeError:
pass
return False
[docs]def add_plugin(desmond_exec, plugin_name, position=None):
"""
"""
if (position is None):
plugin_list = desmond_exec.plugin.list
if (plugin_name in plugin_list.val):
return
plugin_list.append(plugin_name)
else:
remove_plugin(desmond_exec, plugin_name)
desmond_exec.plugin.list.insert(position, plugin_name)
[docs]def remove_plugin(desmond_exec, plugin_name):
"""
"""
plugin_list = desmond_exec.plugin.list
to_be_removed = []
for i, p in enumerate(plugin_list):
if (p.val == plugin_name):
to_be_removed.append(i)
to_be_removed.reverse()
for i in to_be_removed:
plugin_list.pop(i)
[docs]def get_homebox(box, cpu_top):
"""
"""
x = cpu_top[0]
y = cpu_top[1]
z = cpu_top[2]
a = [
old_div(box[0], x),
old_div(box[3], y),
old_div(box[6], z),
]
b = [
old_div(box[1], x),
old_div(box[4], y),
old_div(box[7], z),
]
c = [
old_div(box[2], x),
old_div(box[5], y),
old_div(box[8], z),
]
return a + b + c
[docs]def is_triclinic_box(box):
"""
"""
box_ = deepcopy(box)
box_[0] = 0.0
box_[4] = 0.0
box_[8] = 0.0
for b in box_:
if (b != 0.0):
return True
return False
# lambda exp( x/2.95) * 0.024-0.024
# 0 0
# 1 0.0096844694
# 2 0.023276811
# 3 0.04235393
# 4 0.069129038
# 5 0.10670843
# 6 0.15945183
# 7 0.23347823
# 8 0.33737574
# 9 0.48319791
# 10 0.68786219
# 11 0.9751125
[docs]def optimize_key(msj, cfg, model):
"""
Optimizes the simulation parameters in 'cfg', where 'cfg' must represent
a complete config file.
"""
import math
r_cut = cfg.force.nonbonded.r_cut.val
cpu = cfg.global_cell.partition.val
dt = cfg.integrator.dt.val
far_ts = cfg.integrator.respa.far_timesteps.val
far_dt = dt * far_ts
out_dt = dt * cfg.integrator.respa.outer_timesteps.val
epsilon = min(cfg.gui.ewald_tol.val, 0.1)
# Below is the idea how to optimize the margin. -YW
# 1. The "margin" parameter's value can be optimized as k * rmsm + C, where k is a prefactor, rmsm is the root mean square
# motion, C is the maximum distance that a particle can travel without colliding to another one. Below, let's first
# consider the root mean square motion, and then k and C.
# 2. The root-mean-square motion (rmsm) can be calculated from the diffusion constant using the equation:
# rmsm = sqrt( <x^2> - <x>^2 ) = sqrt( 6 * D * dt )
# where D is diffusion constant, and dt is the time interval.
# 3. How to get dt and D?
# - dt is easy. We just get its value from `cfg' since it is a settable parameter.
# - D is a little bit troublesome. First of all, different molecules have different diffusion constants. In principle,
# We should use the largest D, but unfortunately we don't rigorously know what the largest D is. Nevertheless, for
# aqueous systems, it is probably safe to assume water is the most diffusive molecule. So we use water's D.
# - A more difficult problem is that D is temperature-dependent. For water molecules, D = 0.0187 (T = 242.5 K),
# 0.105 (T = 273.5 K), 0.223 (T = 298.0 K), 0.3575 (T = 318.0 K). The unit of D here is A^2/ps. So we need to somehow
# predict D based on the temperature. For this, we can use the Frenkel-Eyring theory, which basically says
# D = A * exp( -E / RT )
# where A and E presumably are constants (this assumption is an oversimplification, but should be good enough for our
# purpose here). By fitting to the experimental data, we get A = 807.015 A^2/ps and E = 4.87 kcal/mol.
# 4. For very high temperature, D is overestimated a lot by the Frenkel-Eyring theory. So we use the following empirical
# formula to correct the result: D = D - 0.03 * (T - 350), for T > 350.
# 5. How do we choose value for the prefactor k?
# - First of all, commonly used water models such as TIP3P and SPC tend to overestimate water diffusion constant. So to
# be safe, we should assume the D of water model is at least 2 times the experimental value.
# - Second, remember that mean square displacement formula for D is correct only for time -> inf; for time -> 0, the
# value would be much larger, presumedly ~100% larger.
# - The above thoughts lead to a working value for diffusion constant = 4 D, where D is the experimental value. So this
# gives k = sqrt( 4 ) = 2.
# 6. How about C?
# - For the value of C, we can look at the radial distribution function. The width of the first peak is the maximum
# radial space that first shell of water molecules can occupy. Because for any direction only one molecule can occupy
# that radial space, we can safely take the width as the value of C. For liquid water, the value is ~0.4 Angstrom.
# Gets the highest temperature.
desmond_exec = cfg[cfg.app.val]
if (has_plugin(desmond_exec, "anneal")):
temp = max(desmond_exec.plugin.anneal.schedule.value.val)
else:
if ("T_groups" in cfg.integrator.temperature):
temp_list = []
for e in cfg.integrator.temperature.T_groups:
temp_list.append(e.T_ref.val)
temp = max(temp_list)
else:
temp = cfg.integrator.temperature.T_ref.val
box_x, box_y, box_z = cms.get_boxsize(model.box)
homebox = get_homebox(model.box, cpu)
diffusion = 807.015 * math.exp(-4.87 / 1.986E-3 / temp) - 0.03 * max(
0, temp - 350)
migration_interval = cfg.migration.interval.val
rmsm = math.sqrt(6 * diffusion * migration_interval)
margin = rmsm * 2.0 + 0.4
# We don't want to transfer a small amount of data between nodes very frequently. This is because of the transfer latency
# overhead, which might exceed the actual data-transferring time. On the other hand we don't want a big margin either,
# because a lot of particle pairs would have distances larger than the cutoff. This thought leads to a further checking:
# If the margin is still ``big'' and the migration interval can be decreased, we will reduce the margin further more.
# How ``big'' is ``big''?
# - An arbitrary yet reasonable choice is this: If the margin causes > 50% extra computation, we consider it as big.
# - pow( 1.5, 1.0/3.0 ) = 1.1447142425533319
while (margin > r_cut * 0.1447142425533319 and
migration_interval >= far_dt * 1.9):
migration_interval -= far_dt
# Adjusts the migration interval to be `N x out_dt', where `N' is an integer number >= 1.
N = int(old_div(migration_interval, out_dt))
N = 1 if (1 > N) else N
migration_interval = N * out_dt
rmsm = math.sqrt(6 * diffusion * migration_interval)
margin = rmsm * 2.0 + 0.4
r_clone = (r_cut + margin) * 0.5
cfg.global_cell.margin.val = margin
cfg.migration.interval.val = migration_interval
r_cut4 = r_cut * 4
if (is_triclinic_box(model.box) and
(box_x < r_cut4 or box_y < r_cut4 or box_y < r_cut4)):
cfg.global_cell.clone_policy.val = "unrounded"
else:
cfg.global_cell.clone_policy.val = "rounded"
if (cfg.force.nonbonded.far.type.val != "none"):
toler = math.sqrt(abs(math.log(epsilon * r_cut)))
sigma = old_div(math.sqrt(abs(math.log(epsilon * r_cut * toler))),
r_cut)
toler = math.sqrt(-math.log(epsilon * r_cut * (2.0 * toler * sigma)**2))
fac = sigma * toler / 3.1415926
n_k = [
0.25 + fac * box_x,
0.25 + fac * box_y,
0.25 + fac * box_z,
]
for i, n in enumerate(n_k):
p = cpu[i]
n = int(math.ceil(n))
# Each grid dimension must satisfy the following conditions:
# 1. if p is given, it must divide n
# 2. n must have no integer factors greater than 2, 3, or 5
while (True):
# check that n is commensurate with p
if (p):
if (n % p):
n += 1
continue
N = n
for radix in (
2,
3,
5,
):
while (not N % radix):
N /= radix
if (N == 1):
break
n += 1
continue
n_k[i] = n
cfg.force.nonbonded.far.n_k.val = n_k
cfg.force.nonbonded.sigma.val = old_div(1, sigma)
pme_order = cfg.force.nonbonded.far.order.val
margin_ = old_div(margin, 1.414)
if (cfg.force.nonbonded.far.type.val[-3:] == "gse"):
r_clone = max(r_clone, cfg.force.nonbonded.far.r_spread.val + margin_)
r_lazy = r_clone * 2.0
elif (cfg.force.nonbonded.far.type.val[-3:] == "pme"):
a = cpu[0] * (pme_order[0] - 1) * 0.5 / n_k[0]
b = cpu[1] * (pme_order[1] - 1) * 0.5 / n_k[1]
c = cpu[2] * (pme_order[2] - 1) * 0.5 / n_k[2]
tmpx = homebox[0] * a + homebox[1] * b + homebox[2] * c
tmpy = homebox[3] * a + homebox[4] * b + homebox[5] * c
tmpz = homebox[6] * a + homebox[7] * b + homebox[8] * c
r_pme = math.sqrt(tmpx * tmpx + tmpy * tmpy + tmpz * tmpz) + margin_
r_clone = max(r_clone, r_pme + 0.01)
mmc = cfg.integrator.pressure.max_margin_contraction.val
# Ensures r_clone * max_margin_contraction > r_pme if the simulation is NPT.
if (r_clone * mmc < r_pme and -1 < cfg.integrator.type.val.find("_NP")):
delta = 0.0
while (old_div(r_pme, r_clone) > 0.97 and delta < 0.3):
delta += 0.001
r_clone += 0.001
mmc = old_div(r_pme, r_clone)
cfg.integrator.pressure.max_margin_contraction.val = mmc
try:
if (msj.bigger_rclone.val):
r_clone /= mmc
except AttributeError:
pass
r_lazy = r_clone * 2.0
if (r_lazy > old_div(box_x, cpu[0]) and
r_lazy > old_div(box_y, cpu[1]) and
r_lazy > old_div(box_z, cpu[2])):
print("WARNING: Cutoff radius is too large relative to box size.\n" \
" Try either increasing box size or reducing cutoff radius.")
cfg.global_cell.r_clone.val = r_clone + 1E-6
[docs]def parse_lambda(s):
"""
"""
try:
scheme, num_window = s.split(':')
except ValueError:
raise ValueError(
"value not in the form of '<scheme>:<number of windows>'")
scheme = scheme.strip().lower()
if (scheme not in FEP_LAMBDA_SCHEMES):
raise ValueError("unknown lambda scheme '%s'" % scheme)
try:
num_window = int(num_window)
except ValueError:
raise ValueError(
"value not in the form of '<scheme>:<number of windows>'")
if (num_window < 1):
raise ValueError("number of lambda windows is less than 1")
return scheme, num_window
[docs]def lambda_type(lambda_):
"""
"""
if (isinstance(lambda_, sea.Atom)):
return "generic"
has_vdw = ("vdw" in lambda_)
has_vdwA = ("vdwA" in lambda_)
if (has_vdw and has_vdwA):
return "ambiguous"
if (not has_vdw and not has_vdwA):
return "wrong"
if (not has_vdw and has_vdwA):
return "alchemical"
if (has_vdw and not has_vdwA):
return "binding"
return "impossible"
[docs]def num_lambda_window(lambda_):
"""
"""
if (isinstance(lambda_, sea.Atom)):
scheme, num_window = parse_lambda(lambda_.val)
else:
try:
num_window_vdw = len(lambda_.vdw)
except AttributeError:
num_window_vdw = None
try:
num_window_vdwA = len(lambda_.vdwA)
except AttributeError:
num_window_vdwA = None
if (num_window_vdw is not None and num_window_vdwA is not None):
raise ValueError("ambiguous lambda schedule")
if (num_window_vdw is None and num_window_vdwA is None):
raise ValueError("lambda schedule not set")
if (num_window_vdw is None):
num_window = num_window_vdwA
else:
num_window = num_window_vdw
return num_window
[docs]def get_fep_lambda_schedule(lambda_, fep_type):
"""
"""
if (isinstance(lambda_, sea.Atom)):
val = lambda_.val
if (isinstance(val, bool) and val):
val = "default:12"
if (val):
scheme, n_win = parse_lambda(val)
return get_fep_lambdas(n_win, fep_type, scheme)
else:
if (fep_type != lambda_type(lambda_)):
raise ValueError("Wrong lambda schedule type")
if (fep_type == "alchemical"):
la = collections.namedtuple(
'Helper',
['vdwA', 'vdwB', 'chargeA', 'chargeB', 'bondedA', 'bondedB'])
return la(vdwA=lambda_.vdwA.val,
vdwB=lambda_.vdwB.val,
chargeA=lambda_.chargeA.val,
chargeB=lambda_.chargeB.val,
bondedA=lambda_.bondedA.val,
bondedB=lambda_.bondedB.val)
else:
la = collections.namedtuple('Helper', ['vdw', 'coulomb'])
return la(vdw=lambda_.vdw.val, coulomb=lambda_.coulomb.val)
def _xchk_check_lambda(map, valid, ev, prefix):
"""
"""
try:
parse_lambda(map.val)
except ValueError as e:
ev.record_error(prefix, "Wrong value: %s" % str(e))
def _xchk_check_iwindow(map, valid, ev, prefix):
"""
"""
try:
num_window = num_lambda_window(ev._map.fep.lambda_)
except ValueError as e:
ev.record_error(prefix, "Wrong values: %s" % str(e))
else:
v = map.val
if (v >= num_window):
ev.record_error(
prefix + "[" + str(v) + "]",
"Value out of range: expecting within [0, %d), but got '%d'" % (
num_window,
v,
))
def _xchk_check_annealing(map, valid, ev, prefix):
"""
"""
if (is_remd(map.parent()) and map.val):
ev.record_error(prefix, "Cannot run similuated annealing with REMD")
def _xchk_check_remd_temp_generator(map, valid, ev, prefix):
"""
"""
try:
n = 0
if ("highest_temperature" in map):
n += 1
if ("exchange_probability" in map):
n += 1
if ("n_replica" in map):
n += 1
if (n != 2):
raise SyntaxError(
"Invalid setting for temperature-ladder generator")
except Exception as e:
ev.record_error(prefix, "%s" % e)
def _xchk_check_remd_temp_generator2(map, valid, ev, prefix):
"""
"""
try:
n = 0
if ("generator" in map and "PvdS" != map.generator.val):
raise SyntaxError("Invalid temperature-ladder generator")
if ("highest_temperature" in map):
n += 1
if ("exchange_probability" in map):
n += 1
if (n != 1):
raise SyntaxError(
"Invalid setting for temperature-ladder generator")
except Exception as e:
ev.record_error(prefix, "%s" % e)
def _xchk_check_gcmc_region(param, valid, ev, prefix):
if 'whole_box_frequency' in param:
# TODO for some reason param.global_switching.has_tag("setbyuser") is
# always true even when it is not set, but the filtered keys work.
if param.global_switching.keys(tag="setbyuser"):
ev.record_error(
prefix, "Cannot define both global_switching and deprecated "
"whole_box_frequency")
return
warnings.warn(
UserWarning(
f"{prefix[1:]}: whole_box_frequency is deprecated, please use "
f"global_switching.frequency"))
param.global_switching.frequency = deepcopy(param.whole_box_frequency)
del param.whole_box_frequency
def _xchk_check_forcefield(param, valid, ev, prefix):
# Allowed alternate force fields
if param.val in (
"CHARMM",
"AMBER",
"amber03",
"amber99",
"amber94",
"amber96",
"amber99SB",
"amber99SB-ILDN",
"charmm22nocmap",
"charmm22star",
"charmm36_lipids",
"charmm36_protein",
"charmm27",
"charmm32",
"oplsaa_ions_Jensen_2006",
"oplsaa_impact_2005",
):
return
_xchk_check_opls_forcefield(param, valid, ev, prefix)
def _xchk_check_opls_forcefield(param, valid, ev, prefix):
# Valid OPLS force field names will correctly map to an OPLS version
try:
mm.opls_name_to_version(param.val)
except IndexError as e:
ev.record_error(prefix, str(e))
sea.reg_xcheck("check_lambda", _xchk_check_lambda)
sea.reg_xcheck("check_iwindow", _xchk_check_iwindow)
sea.reg_xcheck("check_annealing", _xchk_check_annealing)
sea.reg_xcheck("check_remd_temp_generator", _xchk_check_remd_temp_generator)
sea.reg_xcheck("check_remd_temp_generator2", _xchk_check_remd_temp_generator2)
sea.reg_xcheck("check_gcmc_region", _xchk_check_gcmc_region)
sea.reg_xcheck("check_forcefield", _xchk_check_forcefield)
sea.reg_xcheck("check_opls_forcefield", _xchk_check_opls_forcefield)
MD = """
##################################################
DATA = {
fep = {
lambda = "default:12"
i_window = ?
output = {
name = "$JOBNAME$[_replica$REPLICA$].dE"
first = 0.0
interval = 1.2
}
trajectory = {
record_windows = [0 -1]
}
}
box = ?
cutoff_radius = 9.0
bigger_rclone = no
taper = off
coulomb_method = useries
temperature = 300.0
annealing = off
pressure = 1.01325
surface_tension = 0.0
ensemble = NPT
time = 1200.0
elapsed_time = 0.0
timestep = [0.002 0.002 0.006]
cpu = 1
glue = "solute"
trajectory = {
name = "$JOBNAME$[_replica$REPLICA$]_trj"
first = 0.0
interval = 4.8
periodicfix = true
write_velocity = false
frames_per_file= 250
center = []
format = dtr
write_last_vel = false
}
eneseq = {
name = "$JOBNAME$[_replica$REPLICA$].ene"
first = 0.0
interval = 1.2
}
checkpt = {
name = "$JOBNAME.cpt"
first = 0.0
interval = 240.06
write_last_step = yes
}
maeff_output = {
name = "$JOBNAME$[_replica$REPLICA$]-out.cms"
trjdir = "$JOBNAME$[_replica$REPLICA$]_trj"
first = 0.0
interval = 120.0
periodicfix = true
center_atoms = "solute"
}
randomize_velocity = {
first = 0.0
interval = inf
seed = 2007
temperature = '@*.temperature'
}
simbox = {
name = "$JOBNAME$[_replica$REPLICA$]_simbox.dat"
first = 0.0
interval = 1.2
}
# energy_group = {
# name = "$JOBNAME$[_replica$REPLICA$]_enegrp.dat"
# first = 0.0
# interval = 1.2
# self_energy = false
# corr_energy = true
# }
energy_group = off
pressure_tensor = off
dipole_moment = off
meta = off
meta_file = ?
backend = {
}
restrain = none
restraints = {
existing = ignore
new = []
}
}
##################################################
VALIDATE = {
fep = {
lambda = [
{type = str range = [3 10000000000] _check = check_lambda}
{vdw = {type = list size = [sizeof @DATA.fep.lambda.coulomb] elem = {type = float0_1}}
coulomb = {type = list size = [sizeof @DATA.fep.lambda.vdw ] elem = {type = float0_1}}
}
{vdwA = {type = list size = [sizeof @DATA.fep.lambda.vdwB ] elem = {type = float0_1}}
vdwB = {type = list size = [sizeof @DATA.fep.lambda.chargeA] elem = {type = float0_1}}
chargeA = {type = list size = [sizeof @DATA.fep.lambda.chargeB] elem = {type = float0_1}}
chargeB = {type = list size = [sizeof @DATA.fep.lambda.bondedA] elem = {type = float0_1}}
bondedA = {type = list size = [sizeof @DATA.fep.lambda.bondedB] elem = {type = float0_1}}
bondedB = {type = list size = [sizeof @DATA.fep.lambda.vdwA ] elem = {type = float0_1}}
}
]
i_window = [
{type = none}
{type = int0 _check = check_iwindow}
]
output = {
name = {type = str1}
first = {type = "float+"}
interval = {type = "float+"}
}
trajectory = {
record_windows = [
{type = list size = 0 elem = {type = int}}
{type = enum range = [all]}
]
}
}
box = [{type = list elem = {type = "float+"}} {type = none}]
cutoff_radius = {type = "float+"}
bigger_rclone = {type = bool}
taper = [
{type = bool0}
{method = {type = enum range = [shift c1switch c2switch]}
width = {type = "float+"}
}
]
coulomb_method = [
{type = enum range = [pme cutoff useries]}
{type = list size = 2 elem = [{type = enum range = [pme]} {type = "float+"}]}
]
temperature = [
{_if = ["!" @.DATA.annealing] type = "float+"}
{_if = @.DATA.annealing type = list size = -1
elem = {type = list size = 2 elem = {type = "float+"}}
}
{_if = ["!" @.DATA.annealing] type = list size = -1
elem = {type = list size = 2 elem = [{type = "float+"}
{type = int range = [0 7]}
]
}
}
#{_if = @.DATA.replica type = list size = -2 elem = {type = "float+"}}
]
annealing = {type = bool _check = check_annealing}
pressure = [
{type = "float+"}
{type = list size = 2
elem = [
{type = "float+"}
{type = enum range = [anisotropic isotropic flexible monoclinic]}
]
}
]
surface_tension = {type = "float+"}
ensemble = [
{type = enum range = [NPT NVT NVE NPgT NPAT NPT_L NVT_L NPT_Brownie NVT_Brownie NVT_DPD]}
{class = {type = enum range = [NVE]}}
{_enforce = [class method]
class = {type = enum range = [NVE NPT NVT NPgT NPAT]}
method = {type = enum range = [MTK NH Langevin Brownie DPD]}
thermostat = {
tau = {type = "float+"}
}
barostat = {
tau = {type = "float+"}
}
brownie = {
delta_max = {type = "float+"}
}
}
]
time = {type = "float+"}
elapsed_time = {type = "float+"}
timestep = {type = list size = 3 elem = {type = "float+"}}
cpu = [
{type = int1}
{type = list size = 3
elem = {type = int1}}
]
glue = [
{type = enum range = [solute retain none]}
#{type = str1}
#{type = list size = 2 elem = {type = str1}}
]
trajectory = [
{type = bool0}
{name = {type = str1}
first = {type = "float+"}
interval = {type = "float+"}
periodicfix = {type = bool}
write_velocity = {type = bool}
write_last_vel = {type = bool}
frames_per_file= {type = int1}
center = [
{type = list size = 0 elem = {type = int1}}
{type = str1}
]
format = {type = enum range = [dtr DTR xtc XTC]}
}
]
eneseq = [
{type = bool0}
{name = {type = str1}
first = {type = "float+"}
interval = {type = "float+"}
precision= {type = int1}
}
]
checkpt = [
{type = bool}
{name = {type = str1}
first = {type = "float+"}
interval = {type = "float+"}
wall_interval = {type = "float+"}
write_last_step = {type = bool}
}
]
maeff_output = [
{type = bool0}
{name = {type = str1}
trjdir = {type = str1}
first = {type = "float+"}
interval = {type = "float+"}
periodicfix = {type = bool}
center_atoms = [
{type = list size = 0 elem = {type = int1}}
{type = str}
]
}
]
randomize_velocity = {
_skip = [temperature]
first = {type = "float+"}
interval = {type = "float+"}
seed = {type = int}
}
simbox = [
{type = bool}
{name = {type = str1}
first = {type = "float+"}
interval = {type = "float+"}
}
]
energy_group = [
{type = bool}
{name = {type = str1}
first = {type = "float+"}
interval = {type = "float+"}
self_energy = {type = bool}
corr_energy = {type = bool}
}
]
pressure_tensor = [
{type = bool}
{name = {type = str1}
first = {type = "float+"}
interval = {type = "float+"}
print_volume = {type = bool}
}
]
dipole_moment = [
{type = bool}
{name = {type = str1}
first = {type = "float+"}
interval = {type = "float+"}
}
]
meta_file = [{type = str1} {type = none}]
meta = [
{type = enum range = [FILE]}
{height = {type = "float+"}
first = {type = "float+"}
interval = {type = "float+"}
last = {type = "float+"}
kTemp = {type = "float+"}
name = {type = str1}
cv_name = {type = str }
cv = {type = list
elem = {type = {type = enum range = [dist angle dihedral rmsd rmsd_alt rmsd_symm zdist0 zdist
rgyr rgyr_mass whim1 whim2]}
atom = {type = list size = -1 elem = {type = str1}}
width = {type = float+}
floor = {type = float }
wall = {type = float }
}
size = 0
}
}
{type = bool0}
]
backend = [
{_skip = all}
{type = none}
]
restrain = [
{type = enum range = [retain none]}
{_mapcheck = check_restrain _skip = all}
{type = list size = -1
elem = {_mapcheck = check_restrain _skip = all}
}
]
restraints = {
existing = {type = enum range = [retain ignore ignore_posre]}
new = {type = list size = 0 elem = {_skip = all}}
}
}"""
MINIMIZE = [
MD,
"""
DATA = {
max_steps = 2000
convergence = 1.0
steepest_descent_steps = 10
num_vector = 3
maeff_output.first = inf
maeff_output.name = "$JOBNAME-out.cms"
}
VALIDATE = {
max_steps = {type = int1}
convergence = {type = "float+"}
steepest_descent_steps = {type = int0}
num_vector = {type = int1}
}""",
]
BROWNIE_MINIMIZE = [
MD,
"""
DATA = {
ensemble.brownie.delta_max = 0.1
ensemble.method = Brownie
ensemble.class = NVT
ensemble.thermostat.tau = 1.0
time = 100.0
timestep = [0.001 0.001 0.003 ]
maeff_output.name = "$JOBNAME-out.cms"
maeff_output.first = inf
temperature = 10.0
}""",
]
REMD = [
MD,
"""
DATA = {
replica = [
{model_file = ? temperature = 300}
{model_file = ? temperature = 310}
]
}
VALIDATE = {
replica = [{type = list size = -1}
{generator = {type = enum range = [solute_tempering parallel_tempering]}
atom = {type = str1}
temperature = [
{generator = {type = enum range = [PvdS]}
highest_temperature = {type = float+ }
exchange_probability = {type = float0_1}
n_replica = {type = int0 }
_mapcheck = check_remd_temp_generator
}
{type = list size = -1 elem = {type = float+}}
]
}
]
}
""",
]
META = [
MD, """
DATA = {
meta = {
height = 0.03
first = 0.0
interval = 0.09
name = "$JOBNAME$[_replica$REPLICA$].kerseq"
cv_name = "$JOBNAME$[_replica$REPLICA$].cvseq"
cv = []
}
}
"""
]
VRUN = [
MD,
"""
DATA = {
vrun_frameset = "$FRAMESET"
}
VALIDATE = {
vrun_frameset = [
{type = none}
{type = str1}
]
}
""",
]
REINIT = [
MD,
"""
DATA = {
reinit_frameset = {
first = 0.0
interval = 1200.0
configurations_file = "$REINIT_CONFIGURATIONS_FILE"
}
}
VALIDATE = {
reinit_frameset = {
first = {type = "float+"}
interval = {type = "float+"}
configurations_file = {type = "str1"}
}
}
""",
]
CONCAT = [
MD, """
DATA = {
simulate = []
}
VALIDATE = {
simulate = {type = list}
}
"""
]
GCMC = [
MD,
"""
DATA = {
gcmc = {
first = 0.0
interval = 4.8
ene_name = "$JOBNAME$[_replica$REPLICA$]_gcmc.ene"
moves = {
moves_per_cycle = 5000
# number of trans/rot moves to ins/del moves
# 1:2 = 1:1:1 ratio (trans:ins:del)
# a:b = a:b/2:b/2 such that ins/del is fixed
### DISABLED UNTIL FUNCTIONALITY RESTORED
# trans_to_ins_del_ratio = 0:1
# post_insertion_translation_moves = 0
}
# the chemical potential in kcal/mol
mu_excess = -6.180
# the number density of the solvent in Angstroms^-3
solvent_density = 0.03262
gcmc_region = {
track_voids = true
cell_size = .22
exclusion_radius = 2.2
# setting this to a value larger than half the largest box dimension
# will cause gcmc to be done over the whole box.
region_buffer = 4.0
# randomly switch to performing global GCMC, aka over the whole
# simulation box.
# has no effect if system is already "whole box" as determined by
# the region_buffer and/or the absence of "gcmc_ligand" atoms.
global_switching = {
# percentage of time to do global
frequency = 0.2
# when performing random switches, multiply the number of
# attempts by this amount
move_factor = 3.0
# when performing random switches, multiply the spacing by this
# amount to conserve memory
spacing_factor = 2.0
}
}
verbose = 0
# use random seed by default for multisim for testing statistics
seed = random
solvent = {
s_file = ''
}
ligand_file = ?
}
}
VALIDATE = {
gcmc = {
first = {type = "float+"}
interval = {type = "float+"}
ene_name = {type = "str1"}
moves = {
moves_per_cycle = {type = int1}
# trans_to_ins_del_ratio = {type = str}
# post_insertion_translation_moves = {type = int}
}
mu_excess = {type = float}
solvent_density = {type = "float+"}
gcmc_region = {
track_voids = {type = bool}
cell_size = {type = "float+"}
exclusion_radius = {type = "float+"}
region_buffer = {type = "float+"}
global_switching = {
frequency = {type = "float0_1"}
move_factor = {type = "float+"}
spacing_factor = {type = "float+"}
}
# deprecated in favor of above spec
whole_box_frequency = {type = "float0_1"}
_mapcheck = check_gcmc_region
}
verbose = {type = enum range = [0 1 2]}
seed = [
{type = int}
{type = str}
]
solvent = {
# we cannot use multisim file exists because the file is not
# guaranteed to exist in the launch directory (it may be in the
# ALT_DIR). But we don't need to worry, because the ADD_FILE
# mechanism checks this anyway, and looks in the ALT_DIR.
s_file = {type = str}
num_copies = {type = int1}
}
ligand_file = [
{type = none}
{type = str1}
]
}
}""",
]
class _create_when_needed(object):
def __init__(self, name, should_return_data=True):
self._name = name
self._should_return_data = should_return_data
def __get__(self, obj, cls):
if (self._name == "MD"):
obj = sea.Map(MD)
else:
obj = sea.Map()
obj["MD"] = cls.MD
obj["VALIDATE"] = cls.VALIDATE_MD
obj = deepcopy(obj)
obj.update(globals()[self._name])
setattr(cls, self._name, obj.DATA)
setattr(cls, "VALIDATE_" + self._name, obj.VALIDATE)
return obj.DATA if (self._should_return_data) else obj.VALIDATE
[docs]class DEFAULT_SETTING(object):
"""
"""
MD = _create_when_needed("MD")
MINIMIZE = _create_when_needed("MINIMIZE")
BROWNIE_MINIMIZE = _create_when_needed("BROWNIE_MINIMIZE")
REMD = _create_when_needed("REMD")
META = _create_when_needed("META")
VRUN = _create_when_needed("VRUN")
REINIT = _create_when_needed("REINIT")
CONCAT = _create_when_needed("CONCAT")
GCMC = _create_when_needed("GCMC")
VALIDATE_MD = _create_when_needed("MD", False)
VALIDATE_MINIMIZE = _create_when_needed("MINIMIZE", False)
VALIDATE_BROWNIE_MINIMIZE = _create_when_needed("BROWNIE_MINIMIZE", False)
VALIDATE_REMD = _create_when_needed("REMD", False)
VALIDATE_META = _create_when_needed("META", False)
VALIDATE_VRUN = _create_when_needed("VRUN", False)
VALIDATE_REINIT = _create_when_needed("REINIT", False)
VALIDATE_CONCAT = _create_when_needed("CONCAT", False)
VALIDATE_GCMC = _create_when_needed("GCMC", False)
[docs]def get_default_setting(msj):
"""
"""
gcmc_map = sea.Map()
if is_gcmc(msj):
gcmc_map = DEFAULT_SETTING.GCMC
if is_minimize(msj):
return DEFAULT_SETTING.MINIMIZE
if is_remd(msj):
return DEFAULT_SETTING.REMD.update(gcmc_map)
if is_vrun(msj):
return DEFAULT_SETTING.VRUN
if is_reinit(msj):
return DEFAULT_SETTING.REINIT
if is_concat(msj):
return DEFAULT_SETTING.CONCAT.update(gcmc_map)
return DEFAULT_SETTING.MD.update(gcmc_map)
def _tag(map, tag, keys):
for k in keys:
if k in map:
map[k].add_tag(tag)
[docs]def tag_sim_map(map, sim_tag_spec, **kwargs):
tag = kwargs.get("tag", sim_tag_spec.tag)
_tag(map, tag, sim_tag_spec.keys)
class _create_tag_when_needed(object):
def __init__(self, *args, **kwargs):
self.tag = TagSpec(*args, **kwargs)
def __get__(self, obj, cls):
return deepcopy(self.tag)
_minimize_remove_keys = [
"temperature",
"annealing",
"pressure",
"surface_tension",
"ensemble",
"time",
"elapsed_time",
"timestep",
"trajectory",
"checkpt",
"randomize_velocity",
"simbox",
]
_brownie_remove_keys = ["annealing", "pressure", "surface_tension", "simbox"]
[docs]class TAG_SPECS(object):
md = _create_tag_when_needed("MD")
minimize = _create_tag_when_needed("MINIMIZE", _minimize_remove_keys)
brownie_minimize = _create_tag_when_needed("BROWNIE_MINIMIZE",
_brownie_remove_keys)
remd = _create_tag_when_needed("REMD", ["annealing"])
meta = _create_tag_when_needed("META")
vrun = _create_tag_when_needed("VRUN")
reinit = _create_tag_when_needed("REINIT")
concat = _create_tag_when_needed("CONCAT")
gcmc = _create_tag_when_needed("GCMC")
[docs]def is_minimize(msj):
"""
"""
return ("steepest_descent_steps" in msj)
[docs]def is_remd(msj):
"""
"""
return ("replica" in msj)
[docs]def is_vrun(msj):
"""
"""
return ("vrun_frameset" in msj and msj.vrun_frameset.val is not None)
[docs]def is_reinit(msj):
"""
Does `msj` require the "reinit" app?
"""
return ("reinit_frameset" in msj and
isinstance(msj.reinit_frameset, sea.Map))
[docs]def is_concat(msj):
"""
"""
return "simulate" in msj
[docs]def is_gcmc(msj):
return "gcmc" in msj
[docs]def is_fep(msj):
md_for_fep = msj.val.get("backend", {}).get("is_for_fep", False)
return "fep" in msj or md_for_fep
[docs]def get_exec(msj, cfg):
"""
"""
if is_minimize(msj):
return cfg.minimize
if is_remd(msj):
return cfg.remd
if is_vrun(msj):
return cfg.vrun
if is_reinit(msj):
return cfg.reinit
return cfg.mdsim
[docs]def get_exec_name(msj):
"""
"""
if is_minimize(msj):
return "minimize"
if is_remd(msj):
return "remd"
if is_vrun(msj):
return "vrun"
if is_reinit(msj):
return "reinit"
return "mdsim"
[docs]def num_replica(msj, model, printer=None):
"""
"""
if (not is_remd(msj)):
return 1
if (isinstance(msj.replica, sea.Map)):
generator = msj.replica.generator.val
if (generator in [
"solute_tempering",
"parallel_tempering",
]):
if (isinstance(msj.replica.temperature, sea.Map)):
ref_temp = msj.temperature.val if (isinstance(
msj.temperature, sea.Atom)) else msj.temperature[0][0].val
ref_temp = float(ref_temp)
asl = msj.replica.atom.val if (generator
== "solute_tempering") else "all"
temp = gen_temperature_for_solute_tempering(
msj.replica.temperature, model, ref_temp, asl, printer)
else:
temp = msj.replica.temperature
num_replica = len(temp)
else:
num_replica = len(msj.replica)
return num_replica
[docs]def get_rest_replica(atom_selection=f"atom.{constants.REST_HOTREGION} 1",
exchange_probability=0.3,
n_replica=2):
"""
Return REST-specific replicas
:rtype: `sea.Map`
"""
REST_REPLICA_TEMPLATE = """
replica = {
generator = solute_tempering
atom = "asl: %s"
temperature = {
generator = PvdS
exchange_probability = %g
n_replica = %i
}
}
"""
return sea.Map(REST_REPLICA_TEMPLATE %
(atom_selection, exchange_probability, n_replica))
[docs]def num_cpu(msj):
"""
"""
cpu = msj.cpu.val
n = 1
for e in (cpu if (isinstance(cpu, list)) else [
cpu,
]):
n *= e
return n
[docs]def get_fep_lambdas(n_win, fep_type, scheme):
"""
return default lambdas
"""
if scheme == "flexible":
scheme = "default"
return fep_schedule.get_fep_schedule(n_win, fep_type, scheme).get_lambda()
def __set_fep(msj, cfg, model):
"""
"""
if (model is None):
return
sys_type = cms.get_model_system_type(model.comp_ct)
fep_type = "alchemical" if (
sys_type == constants.SystemType.ALCHEMICAL) else "binding"
cfg.force.nonbonded.near.type.val = fep_type + ":softcore"
far_type = cfg.force.nonbonded.far.type.val
if (fep_type == "binding" and far_type in [
"pme",
"gse",
]):
cfg.force.nonbonded.far.type.val = "binding:" + far_type
if sys_type == constants.SystemType.OTHER:
return
if ("gibbs" not in cfg.force.term.list.val):
cfg.force.term.list.append("gibbs")
gibbs = cfg.force.term.gibbs
gibbs.type.val = fep_type
gibbs.output.name.val = msj.fep.output.name.val
gibbs.output.first.val = msj.fep.output.first.val
gibbs.output.interval.val = msj.fep.output.interval.val
if (msj.fep.i_window.val is not None):
gibbs.window.val = msj.fep.i_window.val
if (isinstance(msj.fep.lambda_, sea.Atom)):
scheme, n_win = parse_lambda(msj.fep.lambda_.val)
if (n_win < 4):
raise ValueError("Total number of windows should be 4 at least")
fs = fep_schedule.get_fep_schedule(n_win, fep_type, scheme).get_lambda()
la = gibbs.weights
la["vdw"] = fs.vdw
la["es"] = fs.coulomb
la["vdwA"] = fs.vdwA
la["vdwB"] = fs.vdwB
la["qA"] = fs.chargeA
la["qB"] = fs.chargeB
la["qC"] = [1 - eA - eB for eA, eB in zip(fs.chargeA, fs.chargeB)]
la["bondA"] = fs.bondedA
la["bondB"] = fs.bondedB
la["N"] = n_win
else:
if (fep_type != lambda_type(msj.fep.lambda_)):
raise ValueError("Wrong lambda schedule type")
if (fep_type == "alchemical"):
fs = msj.fep.lambda_
la = gibbs.weights
la["vdwA"] = fs.vdwA
la["vdwB"] = fs.vdwB
la["qA"] = fs.chargeA
la["qB"] = fs.chargeB
la["qC"] = [
1 - eA.val - eB.val for eA, eB in zip(fs.chargeA, fs.chargeB)
]
la["bondA"] = fs.bondedA
la["bondB"] = fs.bondedB
la["N"] = len(fs.vdwA)
else:
fs = msj.fep.lambda_
la = gibbs.weights
la["vdw"] = fs.vdw
la["es"] = fs.coulomb
la["N"] = len(fs.vdw)
def __set_meta_file(msj, cfg, model):
"""
"""
if msj.meta.val == 'FILE':
with open(msj.meta_file.val) as fh:
m_expr = fh.read()
from schrodinger.application.desmond.enhsamp import parseStr
s_expr = parseStr(model, m_expr)
cfg.force.term.list.append("ES")
cfg.force.term.ES = sea.Map(s_expr)
def __set_meta(msj, cfg, model):
"""
"""
if not model or isinstance(msj.meta, sea.Atom) or len(msj.meta.cv) == 0:
return
# Converts the ASL expressions in the `cv[*].atom' list to lists of atoms.
meta = deepcopy(msj.meta)
for cv in meta.cv:
new_atom = sea.List()
for atom in cv.atom:
atom_list = model.select_atom(atom.val)
new_atom.append(atom_list)
cv["atom"] = new_atom
from schrodinger.application.desmond.meta import generate_meta_cfg
cfg.force.term.list.append("ES")
cfg.force.term["ES"] = sea.Map(generate_meta_cfg(meta, model))
last = meta.last.val if ("last" in meta) else "inf"
for e in cfg.force.term.ES.metadynamics_accumulators:
e["last"] = last
def __set_cutoff_radius(msj, cfg, model):
"""
"""
cfg.force.nonbonded.r_cut.val = msj.cutoff_radius.val
def __set_bigger_rclone(msj, cfg, model):
"""
"""
def __set_taper(msj, cfg, model):
"""
"""
default_taper_setting = sea.Map("method = shift width = 1.0")
if (isinstance(msj.taper, sea.Atom)):
if (msj.taper.val):
cfg.force.nonbonded.near.taper.val = default_taper_setting.method.val
cfg.force.nonbonded.near.r_tap.val = msj.cutoff_radius.val - default_taper_setting.width.val
else:
cfg.force.nonbonded.near.taper.val = "none"
cfg.force.nonbonded.near.r_tap.val = msj.cutoff_radius.val
else:
custom_taper_setting = msj.taper
msj.taper = default_taper_setting
msj.taper.update(custom_taper_setting)
cfg.force.nonbonded.near.taper.val = msj.taper.method.val
cfg.force.nonbonded.near.r_tap.val = msj.cutoff_radius.val - msj.taper.width.val
if (cfg.force.nonbonded.near.r_tap.val < 0):
raise ValueError("Invalid value for 'r_tap': %f" %
cfg.force.nonbonded.near.r_tap.val)
def __set_coulomb_method(msj, cfg, model):
"""
"""
if (isinstance(msj.coulomb_method, sea.Atom)):
if (msj.coulomb_method.val in [
"useries",
"pme",
]):
cfg.force.nonbonded.far.type.val = "pme"
else:
cfg.force.nonbonded.far.type.val = "none"
cfg.force.nonbonded.sigma.val = "inf"
else:
method_name, ewald_tol = msj.coulomb_method.val
if (method_name == "pme"):
cfg.force.nonbonded.far.type.val = "pme"
cfg.gui.ewald_tol.val = ewald_tol
else:
raise ValueError("Unknown coulomb method: %s" % method_name)
def __set_temperature(msj, cfg, model):
"""
"""
if ("annealing" in msj and msj.annealing.val):
cfg.mdsim.plugin.anneal.schedule["time"] = sea.List()
cfg.mdsim.plugin.anneal.schedule["value"] = sea.List()
time = cfg.mdsim.plugin.anneal.schedule.time
value = cfg.mdsim.plugin.anneal.schedule.value
for e in msj.temperature:
value.append(e[0].val)
time.append(e[1].val)
else:
if (isinstance(msj.temperature, sea.Atom)):
# FIXME: Here we consider only the case of 1 thermostat.
temp = sea.Map()
temp["T_ref"] = msj.temperature.val
cfg.integrator["temperature"] = temp
if not isinstance(cfg.integrator.Multigrator.barostat, sea.Atom):
cfg.integrator.Multigrator.barostat.MTK.T_ref.val = msj.temperature.val
else:
temperature = sea.Map()
if (1 < len(msj.temperature)):
t_list = sea.List()
for e in msj.temperature:
temp = sea.Map()
temp["T_ref"] = e[0].val
temp["groups"] = sea.List([
e[1].val,
])
t_list.append(temp)
temperature["T_groups"] = t_list
temperature["T_ref"] = msj.temperature[0][0].val
cfg.integrator["temperature"] = temperature
def __set_annealing(msj, cfg, model):
"""
"""
if (msj.annealing.val):
add_plugin(cfg.mdsim, "anneal")
else:
remove_plugin(cfg.mdsim, "anneal")
def __set_pressure(msj, cfg, model):
"""
"""
if (isinstance(msj.pressure, sea.Atom)):
cfg.integrator.pressure.P_ref.val = msj.pressure.val
else:
cfg.integrator.pressure.P_ref.val = msj.pressure[0].val
if (isinstance(msj.ensemble, sea.Atom)):
if (msj.ensemble.val in [
"NPT",
"NPT_Ber",
"NPT_L",
]):
cfg.integrator.pressure.isotropy.val = msj.pressure[1].val
else:
if (msj.ensemble.class_.val == "NPT"):
cfg.integrator.pressure.isotropy.val = msj.pressure[1].val
def __set_surface_tension(msj, cfg, model):
"""
"""
if ((isinstance(msj.ensemble, sea.Atom) and msj.ensemble.val == "NPgT") or
(isinstance(msj.ensemble, sea.Map) and
msj.ensemble.class_.val == "NPgT")):
cfg.integrator.pressure["tension_ref"] = msj.surface_tension.val
else:
if ("tension_ref" in cfg.integrator.pressure):
del cfg.integrator.pressure["tension_ref"]
[docs]def get_ensemble_class_method(msj):
"""
"""
if (isinstance(msj.ensemble, sea.Atom)):
ens_type_map = {
"NPT": (
"NPT",
"MTK",
),
"NVT": (
"NVT",
"NH",
),
"NVE": (
"NVE",
"NH",
), # Method for "NVE" will be ignored.
"NPgT": (
"NPgT",
"MTK",
),
"NPAT": (
"NPAT",
"MTK",
),
"NPT_Ber": (
"NPT",
"Berendsen",
),
"NVT_Ber": (
"NVT",
"Berendsen",
),
"NPT_L": (
"NPT",
"Langevin",
),
"NVT_L": (
"NVT",
"Langevin",
),
}
return ens_type_map[msj.ensemble.val]
else:
return (
msj.ensemble.class_.val,
msj.ensemble.method.val,
)
def __set_ensemble(msj, cfg, model):
"""
"""
integrator = cfg.integrator
ens = "NVE"
tau = None
if (isinstance(msj.ensemble, sea.Atom)):
ens_type_map = {
"NPT": "MTK_NPT",
"NVT": "NH_NVT",
"NVE": "V_NVE",
"NPgT": "MTK_NPT",
"NPAT": "MTK_NPT",
"NPT_Ber": "Ber_NPT",
"NVT_Ber": "Ber_NVT",
"NPT_L": "L_NPT",
"NVT_L": "L_NVT",
"NPT_Brownie": "brownie_NPT",
"NVT_Brownie": "brownie_NVT",
"NVT_DPD": "DPD_NVT"
}
key = msj.ensemble.val
ens = key
else:
if (msj.ensemble.class_.val == "NVE"):
ens_type_map = {
"NVE": "V_NVE",
}
key = msj.ensemble.class_.val
else:
# FIXME: Check the consistency between the "class" and "method" parameters when job is launched.
ens_type_map = {
(
"NPT",
"MTK",
): "MTK_NPT",
(
"NPgT",
"MTK",
): "MTK_NPT",
(
"NPAT",
"MTK",
): "MTK_NPT",
(
"NVT",
"NH",
): "NH_NVT",
(
"NPT",
"Berendsen",
): "Ber_NPT",
(
"NPgT",
"Berendsen",
): "Ber_NPT",
(
"NPAT",
"Berendsen",
): "Ber_NPT",
(
"NVT",
"Berendsen",
): "Ber_NVT",
(
"NPT",
"Langevin",
): "L_NPT",
(
"NPgT",
"Langevin",
): "L_NPT",
(
"NPAT",
"Langevin",
): "L_NPT",
(
"NVT",
"Langevin",
): "L_NVT",
(
"NPT",
"Brownie",
): "brownie_NPT",
(
"NVT",
"Brownie",
): "brownie_NVT",
}
ens = msj.ensemble.class_.val
method = msj.ensemble.method.val
ttau = msj.ensemble.thermostat.tau.val if (
"thermostat" in msj.ensemble and method != "Brownie") else None
btau = msj.ensemble.barostat.tau.val if ("barostat"
in msj.ensemble) else None
key = (
ens,
method,
)
tau = (
ttau,
btau,
)
t = ens_type_map[key]
if (ens == "NPgT"):
integrator.pressure.isotropy.val = "semi_isotropic"
elif (ens == "NPAT"):
integrator.pressure.isotropy.val = "constant_area"
if (t in [
"V_NVE",
]):
integrator.Multigrator["thermostat"] = "none"
if (t in [
"NH_NVT",
"Ber_NVT",
"L_NVT",
"V_NVE",
]):
integrator.Multigrator["barostat"] = "none"
if (tau is not None):
# tau[0] is for thermostat, tau[1] for barostat.
if (t in [
"MTK_NPT",
"NH_NVT",
]):
if (tau[0]):
ts = integrator.Multigrator.thermostat.timesteps.val
for e in integrator.Multigrator.thermostat.NoseHoover.tau:
e.val = old_div(tau[0], float(ts))
if (t == "MTK_NPT"):
ts = integrator.Multigrator.barostat.timesteps.val
if (tau[0]):
for e in integrator.Multigrator.barostat.MTK.thermostat.NoseHoover.tau:
e.val = old_div(tau[0], float(ts))
if (tau[1]):
integrator.Multigrator.barostat.MTK.tau.val = old_div(
tau[1], float(ts))
elif (t in [
"Ber_NPT",
"Ber_NVT",
]):
if (tau[0]):
integrator[t].tau.val = tau[0]
if (t == "Ber_NPT" and tau[1]):
integrator.Ber_NPT.barostat.tau.val = tau[1]
elif t == "L_NVT":
integrator.Multigrator.thermostat.type.val = "Langevin"
if tau[0] is not None:
ts = integrator.Multigrator.thermostat.timesteps.val
integrator.Multigrator.thermostat.Langevin.tau.val = old_div(
tau[0], float(ts))
elif t == "L_NPT":
integrator.Multigrator.thermostat.type.val = "Langevin"
integrator.Multigrator.barostat.MTK.thermostat.type.val = "Langevin"
if tau[0] is not None:
ts = integrator.Multigrator.thermostat.timesteps.val
integrator.Multigrator.thermostat.Langevin.tau.val = old_div(
tau[0], float(ts))
integrator.Multigrator.barostat.MTK.thermostat.Langevin.tau.val = old_div(
tau[0], float(ts))
if tau[1] is not None:
ts = integrator.Multigrator.barostat.timesteps.val
integrator.Multigrator.barostat.MTK.tau.val = old_div(
tau[1], float(ts))
elif (t == "brownie_NVT"):
integrator.brownie_NVT.delta_max.val = msj.ensemble.brownie.delta_max.val
elif (t == "brownie_NPT"):
if (tau[1] is not None):
integrator.brownie_NPT.barostat.tau.val = tau[1]
integrator.brownie_NPT.delta_max.val = msj.ensemble.brownie.delta_max.val
if (isinstance(msj.temperature, sea.Atom)):
integrator.brownie_NPT.barostat.T_ref.val = msj.temperature.val
else:
integrator.brownie_NPT.barostat.T_ref.val = msj.temperature[0][
0].val
if t in ["Ber_NPT", "Ber_NVT", "brownie_NVT", "brownie_NPT"]:
integrator.type.val = t
desmond_exec = get_exec(msj, cfg)
if (t in [
"Ber_NPT",
"Ber_NVT",
]):
cfg.force.ignore_com_dofs.val = True
desmond_exec.plugin.remove_com_motion.first.val = 0.0
desmond_exec.plugin.remove_com_motion.interval.val = 0.0
def __set_time(msj, cfg, model):
"""
"""
desmond_exec = get_exec(msj, cfg)
desmond_exec.last_time.val = msj.time.val
def __set_elapsed_time(msj, cfg, model):
"""
"""
cfg.global_cell.reference_time.val = msj.elapsed_time.val
def __set_timestep(msj, cfg, model):
"""
"""
timestep = msj.timestep.val
cfg.integrator.dt.val = timestep[0]
cfg.integrator.respa.near_timesteps.val = int(
old_div(timestep[1], timestep[0]) + 0.1)
cfg.integrator.respa.far_timesteps.val = int(
old_div(timestep[2], timestep[0]) + 0.1)
cfg.integrator.respa.outer_timesteps.val = int(
old_div(timestep[2], timestep[0]) + 0.1)
def __set_cpu(msj, cfg, model):
"""
"""
if (model is None):
return
if (isinstance(msj.cpu, sea.Atom)):
import schrodinger.application.desmond.autopartition as autopartition
x, y, z = cms.get_boxsize(model.box)
nx, ny, nz = autopartition.auto_partition([
x,
y,
z,
], msj.cpu.val)
else:
nx, ny, nz = msj.cpu.val
cfg.global_cell["partition"] = (
nx,
ny,
nz,
)
def __set_glue(msj, cfg, model):
"""
"""
def __set_vrun_frameset(msj, cfg, model):
"""
"""
cfg.vrun.frameset.name.val = msj.vrun_frameset.val
def __set_reinit_frameset(msj, cfg, model):
for name in ('first', 'interval', 'configurations_file'):
cfg.reinit.frameset[name].val = msj.reinit_frameset[name].val
def __set_trajectory(msj, cfg, model):
"""
"""
if (is_minimize(msj)):
return
desmond_exec = get_exec(msj, cfg)
if (isinstance(msj.trajectory, sea.Atom)):
remove_plugin(desmond_exec, "trajectory")
return
desmond_exec.plugin.trajectory.name.val = msj.trajectory.name.val
desmond_exec.plugin.trajectory.first.val = msj.trajectory.first.val
desmond_exec.plugin.trajectory.interval.val = msj.trajectory.interval.val
desmond_exec.plugin.trajectory.periodicfix.val = msj.trajectory.periodicfix.val
desmond_exec.plugin.trajectory.write_velocity.val = msj.trajectory.write_velocity.val
try:
desmond_exec.plugin.trajectory.frames_per_file.val = msj.trajectory.frames_per_file.val
except AttributeError:
pass
try:
center = msj.trajectory.center
if (isinstance(center, sea.List)):
desmond_exec.plugin.trajectory.center.val = [
model.gid(e) for e in center.val
]
else:
desmond_exec.plugin.trajectory.center.val = [
model.gid(e) for e in model.select_atom(center.val)
]
except AttributeError:
pass
def __set_eneseq(msj, cfg, model):
"""
"""
desmond_exec = get_exec(msj, cfg)
if (isinstance(msj.eneseq, sea.Atom)):
remove_plugin(desmond_exec, "eneseq")
return
desmond_exec.plugin.eneseq.name.val = msj.eneseq.name.val
desmond_exec.plugin.eneseq.first.val = msj.eneseq.first.val
desmond_exec.plugin.eneseq.interval.val = msj.eneseq.interval.val
try:
desmond_exec.plugin.eneseq["precision"] = msj.eneseq.precision.val
except AttributeError:
pass
def __set_checkpt(msj, cfg, model):
"""
"""
if (is_minimize(msj)):
return
desmond_exec = get_exec(msj, cfg)
if (isinstance(msj.checkpt, sea.Atom)):
if (msj.checkpt.val):
__set_checkpt(get_default_setting(msj), cfg, model)
else:
desmond_exec.checkpt.first.val = "inf"
desmond_exec.checkpt.interval.val = "inf"
desmond_exec.checkpt.wall_interval.val = "inf"
desmond_exec.checkpt.write_last_step.val = False
else:
desmond_exec.checkpt.name.val = msj.checkpt.name.val
desmond_exec.checkpt.write_last_step.val = msj.checkpt.write_last_step.val
if ("wall_interval" in msj.checkpt and
desmond_exec.checkpt.wall_interval.val != float("inf")):
desmond_exec.checkpt.first.val = "inf"
desmond_exec.checkpt.interval.val = "inf"
desmond_exec.checkpt.wall_interval.val = msj.checkpt.wall_interval.val
else:
desmond_exec.checkpt.first.val = msj.checkpt.first.val
desmond_exec.checkpt.interval.val = msj.checkpt.interval.val
desmond_exec.checkpt.wall_interval.val = "inf"
def __set_maeff_output(msj, cfg, model):
"""
"""
desmond_exec = get_exec(msj, cfg)
if (isinstance(msj.maeff_output, sea.Atom)):
remove_plugin(desmond_exec, "maeff_output")
return
desmond_exec.plugin.maeff_output.name.val = msj.maeff_output.name.val
desmond_exec.plugin.maeff_output.first.val = msj.maeff_output.first.val
desmond_exec.plugin.maeff_output.interval.val = msj.maeff_output.interval.val
try:
# If user runs desmond directly without multisim and if the center_atoms
# is not set in the input .cfg file, we will get an AttributeError.
desmond_exec.plugin.maeff_output[
"center_atoms"] = msj.maeff_output.center_atoms.val
except AttributeError:
desmond_exec.plugin.maeff_output[
"center_atoms"] = DEFAULT_SETTING.MD.maeff_output.center_atoms.val
if is_minimize(msj):
desmond_exec.plugin.maeff_output.trjdir.val = ""
else:
# read trjidx key to support front-end cfg files generated with
# prior versions (<15-4)
if 'trjidx' in msj.maeff_output:
desmond_exec.plugin.maeff_output.trjdir.val = msj.maeff_output.trjidx.val.replace(
'-out.idx', '_trj')
else:
desmond_exec.plugin.maeff_output.trjdir.val = msj.maeff_output.trjdir.val
if ("periodicfix" in msj.maeff_output):
desmond_exec.plugin.maeff_output.periodicfix.val = msj.maeff_output.periodicfix.val
def __set_randomize_velocity(msj, cfg, model):
"""
"""
if (is_minimize(msj)):
return
desmond_exec = get_exec(msj, cfg)
if (isinstance(msj.randomize_velocity, sea.Atom)):
if (msj.randomize_velocity.val):
__set_randomize_velocity(get_default_setting(msj), cfg, model)
else:
remove_plugin(desmond_exec, "randomize_velocity")
else:
desmond_exec.plugin.randomize_velocities.first.val = msj.randomize_velocity.first.val
desmond_exec.plugin.randomize_velocities.interval.val = msj.randomize_velocity.interval.val
desmond_exec.plugin.remove_com_motion.first.val = msj.randomize_velocity.first.val
desmond_exec.plugin.remove_com_motion.interval.val = msj.randomize_velocity.interval.val
try:
desmond_exec.plugin.randomize_velocities.seed.val = msj.randomize_velocity.seed.val
except AttributeError:
pass
temp = msj.randomize_velocity.temperature.val
if (isinstance(temp, list)):
desmond_exec.plugin.randomize_velocities.temperature.val = temp[0][
0]
else:
desmond_exec.plugin.randomize_velocities.temperature.val = temp
def __set_simbox(msj, cfg, model):
"""
"""
if (is_minimize(msj)):
return
desmond_exec = get_exec(msj, cfg)
if (isinstance(msj.simbox, sea.Atom)):
if (msj.simbox.val):
__set_simbox(get_default_setting(msj), cfg, model)
else:
remove_plugin(desmond_exec, "simbox_output")
else:
desmond_exec.plugin.simbox_output.name.val = msj.simbox.name.val
desmond_exec.plugin.simbox_output.first.val = msj.simbox.first.val
desmond_exec.plugin.simbox_output.interval.val = msj.simbox.interval.val
add_plugin(desmond_exec, "simbox_output")
[docs]def get_simbox_output_filename(msj):
"""
Returns 'None' if the simbox_output plugin is turned off.
"""
if (is_minimize(msj)):
return None
if (isinstance(msj.simbox, sea.Atom)):
if (sea.simbox.val):
return get_simbox_output_filename(get_default_setting(msj))
else:
return msj.simbox.name.val
def __set_energy_group(msj, cfg, model):
"""
"""
default_energy_group_setting = sea.Map("""
name = "$JOBNAME$[_replica$REPLICA$]_enegrp.dat"
first = 0.0
interval = 1.2
self_energy = false
corr_energy = true
""")
desmond_exec = get_exec(msj, cfg)
if (isinstance(msj.energy_group, sea.Atom)):
if (msj.energy_group.val):
msj["energy_group"] = default_energy_group_setting
__set_energy_group(msj, cfg, model)
else:
remove_plugin(desmond_exec, "energy_groups")
else:
custom_energy_group_setting = msj.energy_group
msj.energy_group = default_energy_group_setting
msj.energy_group.update(custom_energy_group_setting)
desmond_exec.plugin.energy_groups.name.val = msj.energy_group.name.val
desmond_exec.plugin.energy_groups.first.val = msj.energy_group.first.val
desmond_exec.plugin.energy_groups.interval.val = msj.energy_group.interval.val
eg = desmond_exec.plugin.energy_groups
def set_options(term):
if (term in msj.energy_group):
if (msj.energy_group[term].val):
eg.options.append(term)
else:
try:
eg.options.remove(term)
except ValueError:
pass
set_options("self_energy")
set_options("corr_energy")
add_plugin(desmond_exec, "energy_groups")
def __set_pressure_tensor(msj, cfg, model):
"""
Set ptensor plugin and corresponding variables from MSJ pressure_tensor
block.
"""
default_settings = sea.Map("""
name = "$JOBNAME$[_replica$REPLICA$].ptensor.gz"
first = 0.0
interval = 1.2
print_volume = false
""")
desmond_exec = get_exec(msj, cfg)
if (isinstance(msj.pressure_tensor, sea.Atom)):
if (msj.pressure_tensor.val):
msj["pressure_tensor"] = default_settings
__set_pressure_tensor(msj, cfg, model)
else:
remove_plugin(desmond_exec, "ptensor")
else:
custom_settings = msj.pressure_tensor
msj.pressure_tensor = default_settings
msj.pressure_tensor.update(custom_settings)
desmond_exec.plugin.ptensor.name.val = msj.pressure_tensor.name.val
desmond_exec.plugin.ptensor.first.val = msj.pressure_tensor.first.val
desmond_exec.plugin.ptensor.interval.val = msj.pressure_tensor.interval.val
desmond_exec.plugin.ptensor.print_volume.val = msj.pressure_tensor.print_volume.val
add_plugin(desmond_exec, "ptensor")
def __set_dipole_moment(msj, cfg, model):
"""
Set dipole moment plugin and corresponding variables from MSJ block.
"""
default_settings = sea.Map("""
first = 0.0
interval = 1.2
name = "$JOBNAME$[_replica$REPLICA$]_dipole.dat"
""")
desmond_exec = get_exec(msj, cfg)
if (isinstance(msj.dipole_moment, sea.Atom)):
if (msj.dipole_moment.val):
msj["dipole_moment"] = default_settings
__set_dipole_moment(msj, cfg, model)
else:
remove_plugin(desmond_exec, "dipole_moment")
else:
custom_settings = msj.dipole_moment
msj.dipole_moment = default_settings
msj.dipole_moment.update(custom_settings)
desmond_exec.plugin.dipole_moment.name.val = msj.dipole_moment.name.val
desmond_exec.plugin.dipole_moment.first.val = msj.dipole_moment.first.val
desmond_exec.plugin.dipole_moment.interval.val = msj.dipole_moment.interval.val
add_plugin(desmond_exec, "dipole_moment")
[docs]def get_energy_group_output_filename(msj):
"""
Returns 'None' if the energy_groups plugin is turned off.
"""
if (isinstance(msj.energy_group, sea.Atom)):
if (msj.energy_group.val):
return get_energy_group_output_filename(get_default_setting(msj))
else:
return msj.energy_group.name.val if (
msj.time.val > msj.energy_group.first.val) else None
[docs]def get_fep_output_filename(msj):
"""
"""
if not is_minimize(msj) and not is_vrun(msj):
try:
if (msj.time.val > msj.fep.output.first.val):
return msj.fep.output.name.val
except AttributeError:
pass
# For minimization
def __set_max_steps(msj, cfg, model):
"""
"""
cfg.minimize.maxsteps.val = msj.max_steps.val
def __set_convergence(msj, cfg, model):
"""
"""
cfg.minimize.tol.val = msj.convergence.val
def __set_steepest_descent_steps(msj, cfg, model):
"""
"""
cfg.minimize.sdsteps.val = msj.steepest_descent_steps.val
def __set_num_vector(msj, cfg, model):
"""
"""
cfg.minimize.m.val = msj.num_vector.val
def __set_backend(msj, cfg, model):
"""
"""
if ("backend" in msj and isinstance(msj.backend, sea.Map)):
cfg.update(msj.backend.dval)
# For remd
[docs]def gen_temperature_for_solute_tempering(setting,
model,
base_temp,
asl,
printer=None):
"""
"""
from . import remd
if (model is None):
print(
"WARNING: Cannot predict REMD temperature ladder without model system."
)
return [
300,
400,
500,
600,
700,
800,
]
if ("highest_temperature" in setting and "exchange_probability" in setting):
top_temp = setting.highest_temperature.val
temp, prob = remd.predict_with_temp_and_exch((
base_temp,
top_temp,
), setting.exchange_probability.val, model, asl)
if ("highest_temperature" in setting and "n_replica" in setting):
top_temp = setting.highest_temperature.val
temp, prob = remd.predict_with_temp_and_nreplica((
base_temp,
top_temp,
), setting.n_replica.val, model, asl)
if ("exchange_probability" in setting and "n_replica" in setting):
temp, prob = remd.predict_with_nreplica_and_exch(
setting.n_replica.val, setting.exchange_probability.val, base_temp,
model, asl)
if (printer):
printer("Temperature ladder (exchange probability):")
for t, p in zip(temp, prob):
printer(" %.3g (%.3g)" % (
t,
p,
))
printer(" %.3g (n/a)" % temp[-1])
return temp
[docs]def gen_replica(msj, model=None):
"""
"""
replica = msj.replica
generator = replica.generator.val
if (generator in [
"solute_tempering",
"parallel_tempering",
]):
ref_temp = msj.temperature.val if (isinstance(
msj.temperature, sea.Atom)) else msj.temperature[0][0].val
ref_temp = float(ref_temp)
asl = replica.atom.val if (generator == "solute_tempering") else "all"
if (isinstance(replica.temperature, sea.Map)):
rep_temp = gen_temperature_for_solute_tempering(
replica.temperature, model, ref_temp, asl)
else:
rep_temp = replica.temperature.val
replica = sea.List()
if (ref_temp not in rep_temp):
print(
"WARNING: Reference temperature (%f) not found in replicas (%s)"
% (
ref_temp,
str(rep_temp),
))
if (generator == "solute_tempering"):
rep_fname = sea.Atom(
"$JOBNAME_solute_tempering_replica%d-in.cms").val
for i, e in enumerate(rep_temp):
fname = rep_fname % i
if (model):
from . import remd
remd.write_ff_solute_tempering(model, fname, asl,
old_div(ref_temp, e))
replica.append(sea.Map("model_file = %s" % fname))
else:
for e in rep_temp:
replica.append(sea.Map("temperature = %.3g model_file = ?" %
e))
else:
raise ValueError("Unknown replica generator: %s" %
replica.generator.val)
return replica
def __set_replica(msj, cfg, model):
"""
"""
msj = deepcopy(msj)
replica_msj = []
if (is_remd(msj)):
replica = msj.replica
if (isinstance(replica, sea.Map)):
replica = gen_replica(msj, model)
if (not isinstance(replica, sea.List)):
raise ValueError("Unknown type for 'replica' parameter: %s" %
type(replica))
for e in replica:
msj_ = deepcopy(msj)
msj_.update(e)
replica_msj.append(msj_)
return (
msj,
replica_msj,
)
[docs]def get_replica_setting(msj):
"""
"""
ref, replica = __set_replica(msj, None, None)
return replica
def __set_model_file(msj, cfg, model):
"""
"""
if ("model_file" in msj):
desmond_exec = get_exec(msj, cfg)
desmond_exec.plugin.maeff_output["bootfile"] = msj.model_file.val
def __attach_msj(msj, cfg, model):
"""
"""
cfg["ORIG_CFG"] = msj
def __set_restrain(msj, cfg, model):
"""
restrain only used in gconfig.py for concatenator
"""
def __set_restraints(msj, cfg, model):
"""
restraints only used in gconfig.py for concatenator
"""
def __set_box(msj, cfg, model):
pass
def __optimize_cfg(msj, cfg, model):
"""
"""
if (model):
optimize_key(msj, cfg, model)
# FIXME: replace optimize_key with forceconfig.
def __clean_cfg(cfg):
"""
"""
app_map = {
"mdsim": [
"PLUGIN_COMMON",
"CHECKPT_COMMON",
"minimize",
"remd",
"vrun",
],
"vrun": [
"PLUGIN_COMMON",
"CHECKPT_COMMON",
"minimize",
"remd",
"mdsim",
],
"remd": [
"PLUGIN_COMMON",
"CHECKPT_COMMON",
"minimize",
"vrun",
"mdsim",
],
"minimize": [
"PLUGIN_COMMON",
"CHECKPT_COMMON",
"remd",
"vrun",
"mdsim",
],
}
for e in app_map[cfg.app.val]:
if (e in cfg):
del cfg[e]
# Deletes unused plugin settings.
desmond_exec = cfg[cfg.app.val]
used_plugin = desmond_exec.plugin.list.val
used_plugin += [
"list",
"maeff_output",
]
for e in desmond_exec.plugin:
if (e not in used_plugin):
del desmond_exec.plugin[e]
# Deletes unused ensemble settings
ens_list = [
"Ber_NPT",
"Ber_NVT",
"MTK_NPT",
"NH_NVT",
"V_NVE",
"L_NVT",
"L_NPT",
"Piston_NPH",
]
try:
ens_index = ens_list.index(cfg.integrator.type.val)
del ens_list[ens_index]
except ValueError:
pass
for e in ens_list:
del cfg.integrator[e]
# Deletes the "gibbs" setting if it is not used.
if ("gibbs" not in cfg.force.term.list.val and "gibbs" in cfg.force.term):
del cfg.force.term["gibbs"]
def __fix_cfg(msj, cfg):
"""
"""
# Special treatment for annealing
if ("annealing" in msj and msj.annealing.val):
ens_type = cfg.integrator.type.val
ens = cfg.integrator[ens_type]
del cfg.integrator[ens_type]
cfg.integrator.type.val = "anneal"
cfg.integrator["anneal"] = sea.Map("type = " + ens_type)
cfg.integrator.anneal[ens_type] = ens
# Special treatment for force.nonbonded.far
if (cfg.force.nonbonded.far.type.val == "none"):
cfg.force.nonbonded["far"] = "none"
def __msj2cfg_helper(msj, cfg, model, macro):
"""
"""
sea.set_macro_dict(macro)
cfg = sea.Map(DEFAULT_CONFIG) if (cfg is None) else cfg
cfg = cfg.dval
this_mod = globals()
for k in list(msj):
if (k not in [
"replica",
"backend",
]):
this_mod["__set_" + k](msj, cfg, model)
return cfg.bval
[docs]def msj2cfg(msj, cfg, model, macro=None, is_gdesmond=False):
"""
"""
macro = macro or {}
if (is_gdesmond):
import schrodinger.application.desmond.gconfig as gconfig
return gconfig.msj2cfg(msj, cfg, model, macro)
if 'simulate' in msj: # assumed to be a concatenate config
# TODO: eliminate this code -- it should not be needed
# with CPU desmond gone (still called from tests though)
import schrodinger.application.desmond.gconfig as gconfig
simulate_cfgs = []
for msj_ in msj.simulate:
simulate_cfg = msj2cfg(msj_, deepcopy(cfg), model, macro)
simulate_cfg.restraints = cmj.get_restraints_xor_convert_restrain(
msj_)
simulate_cfgs.append(simulate_cfg)
base_concat_msj = deepcopy(msj)
del base_concat_msj['simulate']
base_concat_cfg = msj2cfg(base_concat_msj, deepcopy(cfg), model, macro)
return gconfig.concatenate_cfgs(simulate_cfgs, base_concat_cfg)
msj_ref, msj_replica = __set_replica(msj, cfg, model)
cfg_ref = __msj2cfg_helper(msj_ref, deepcopy(cfg), model, macro)
cfg_replica = []
for i, e in enumerate(msj_replica):
macro["$REPLICA"] = str(i)
cfg_replica.append(__msj2cfg_helper(e, deepcopy(cfg), model, macro))
cfg_ref.app.val = get_exec_name(msj)
__optimize_cfg(msj_ref, cfg_ref, model)
__set_backend(msj_ref, cfg_ref, model)
cfg_ref = cfg_ref.bval
cfg_rep = []
for m, c in zip(msj_replica, cfg_replica):
c.app.val = get_exec_name(msj)
if (m.model_file.val):
__optimize_cfg(m, c, cms.Cms(file=m.model_file.val))
else:
__optimize_cfg(m, c, model)
__set_backend(m, c, model)
cfg_rep.append(c.bval)
cfg_diff = []
for e in cfg_rep:
changed, referred, added, lost = sea.diff(e, cfg_ref)
change = changed.update(added)
if ("mdsim" in change):
del change["mdsim"]
if ("minimize" in change):
del change["minimize"]
if ("vrun" in change):
del change["vrun"]
if ("remd" in change):
remd = change.remd
del change["remd"]
change.update(remd)
cfg_diff.append(change)
for e in cfg_diff:
cfg_ref.remd.cfg.append(e)
__clean_cfg(cfg_ref)
__fix_cfg(msj, cfg_ref)
__attach_msj(msj, cfg_ref, model)
return cfg_ref
[docs]def translate_output_file_names_helper(msj, macro_dict):
sea.set_macro_dict(macro_dict)
cfg = sea.Map()
try:
desmond_exec = cfg[get_exec_name(msj)]
except KeyError:
cfg[get_exec_name(msj)] = sea.Map()
desmond_exec = cfg[get_exec_name(msj)]
for k in list(msj):
if (k == "fep"):
cfg.set_value("force.term.gibbs.output.name",
msj.fep.output.name.val)
elif (k == "trajectory"):
if (not (is_minimize(msj) or isinstance(msj.trajectory, sea.Atom))):
desmond_exec.set_value("plugin.trajectory.name",
msj.trajectory.name.val)
elif (k == "eneseq"):
if (not isinstance(msj.eneseq, sea.Atom)):
desmond_exec.set_value("plugin.eneseq.name",
msj.eneseq.name.val)
elif (k == "checkpt"):
if (not (is_minimize(msj))):
if (isinstance(msj.checkpt, sea.Atom)):
if (msj.checkpt.val):
desmond_exec.set_value("checkpt.name", "$JOBNAME.cpt")
else:
desmond_exec.set_value("checkpt.name", msj.checkpt.name.val)
elif (k == "maeff_output"):
if (not isinstance(msj.maeff_output, sea.Atom)):
desmond_exec.set_value("plugin.maeff_output.name",
msj.maeff_output.name.val)
desmond_exec.set_value("plugin.maeff_output.trjdir",
msj.trajectory.name.val)
elif (k == "simbox"):
if (not is_minimize(msj)):
if (isinstance(msj.simbox, sea.Atom)):
if (msj.simbox.val):
desmond_exec.set_value(
"plugin.simbox_output.name",
"$JOBNAME$[_replica$REPLICA$]_simbox.dat")
else:
desmond_exec.set_value("plugin.simbox_output.name",
msj.simbox.name.val)
elif (k == "energy_group"):
if (isinstance(msj.energy_group, sea.Atom)):
if (msj.energy_group.val):
desmond_exec.set_value(
"plugin.energy_groups.name",
"$JOBNAME$[_replica$REPLICA$]_enegrp.dat")
else:
desmond_exec.set_value("plugin.energy_groups.name",
msj.energy_group.name.val)
elif (k == "meta"):
if (not (isinstance(msj.meta, sea.Atom) or len(msj.meta.cv) == 0)):
#FIXME: uncomment below statements when the backend can reconfigure the file names
#DESMOND-2772
#cfg["force.term.ES.name"] = msj.meta.cv_name.val
#cfg["force.term.ES.metadynamics_accumulators.name"] = msj.meta.name.val
pass
elif (k == "vrun_frameset"):
cfg.set_value("vrun.frameset.name", msj.vrun_frameset.val)
return cfg.bval
[docs]def translate_output_file_names(msj, macro_dict, num_replica=1):
"""
"""
msj_ref = deepcopy(msj)
replica_msj = []
if (is_remd(msj)):
replica = msj.replica
if (isinstance(replica, sea.Map)):
replica = sea.List()
for i in len(num_replica):
replica.append(sea.Map())
if (not isinstance(replica, sea.List)):
raise ValueError("Unknown type for 'replica' parameter: %s" %
type(replica))
for e in replica:
msj_ = deepcopy(msj)
msj_.update(e)
replica_msj.append(msj_)
cfg_ref = translate_output_file_names_helper(msj, macro_dict)
cfg_replica = []
for i, e in enumerate(replica_msj):
macro_dict["$REPLICA"] = str(i)
cfg_replica.append(translate_output_file_names_helper(msj, macro_dict))
__set_backend(msj_ref, cfg_ref, None)
cfg_ref = cfg_ref.bval
cfg_rep = []
for m, c in zip(replica_msj, cfg_replica):
__set_backend(m, c, None)
cfg_rep.append(c.bval)
cfg_diff = []
for e in cfg_rep:
changed, referred, added, lost = sea.diff(e, cfg_ref)
change = changed.update(added)
if ("mdsim" in change):
del change["mdsim"]
if ("minimize" in change):
del change["minimize"]
if ("vrun" in change):
del change["vrun"]
if ("remd" in change):
remd = change.remd
del change["remd"]
change.update(remd)
cfg_diff.append(change)
for e in cfg_diff:
try:
cfg_ref.remd.cfg.append(e)
except AttributeError:
cfg_ref.remd["cfg"] = sea.List()
cfg_ref.remd.cfg.append(e)
return cfg_ref
def __canonicalize_fep(msj):
"""
"""
# Canonicalization of "fep.lambda" depends on the model system, so we temporarily don't do it for "fep.lambda".
def __canonicalize_temperature(msj):
"""
"""
if (isinstance(msj.temperature, sea.Atom)):
msj["temperature"] = sea.List([
[
msj.temperature.val,
0,
],
])
def __canonicalize_pressure(msj):
"""
"""
if (isinstance(msj.pressure, sea.Atom)):
msj["pressure"] = sea.List([
msj.pressure.val,
"isotropic",
])
def __canonicalize_ensemble(msj):
"""
"""
if (isinstance(msj.ensemble, sea.Atom)):
ens_class, method = get_ensemble_class_method(msj)
msj["ensemble"] = sea.Map(
"class = %s method = %s thermostat.tau = 1.0 barostat.tau = 2.0" % (
ens_class,
method,
))
def __canonicalize_coulomb_method(msj):
"""
"""
if (isinstance(msj.coulomb_method, sea.Atom) and
msj.coulomb_method.val == "pme"):
msj["coulomb_method"] = [
"pme",
1E-9,
]
[docs]def canonicalize(msj):
"""
"""
PARAM_TO_CANONICALIZE = [
"fep",
"temperature",
"pressure",
"ensemble",
"coulomb_method",
]
msj_copy = deepcopy(msj)
this_mod = globals()
for e in PARAM_TO_CANONICALIZE:
if (e in msj):
this_mod["__canonicalize_" + e](msj_copy)
return msj_copy
if ("__main__" == __name__):
# for n_win in range( 26, 26, 2 ) :
# print "default:%d" % n_win
# __get_fep_schedule( n_win, "binding", "default" )._print()
# print "quickcharge:%d" % n_win
# __get_fep_schedule( n_win, "binding", "quickcharge" )._print()
# print "superquickcharge:%d" % n_win
# __get_fep_schedule( n_win, "binding", "superquickcharge" )._print()
# print "\n"
# for n_win in range( 6, 26, 2 ) :
# print "default:%d" % n_win
# __get_fep_schedule( n_win, "alchemical", "default" )._print()
# print "quickcharge:%d" % n_win
# __get_fep_schedule( n_win, "alchemical", "quickcharge" )._print()
# print "superquickcharge:%d" % n_win
# __get_fep_schedule( n_win, "alchemical", "superquickcharge" )._print()
# print "\n"
#canonicalize( DEFAULT_SETTING.MD )
#remd = sea.Map( REMD )
#print sea.check_map( remd.DATA, remd.VALIDATE )
#print md.DATA.bval
#msj = DEFAULT_SETTING.META
#model = cms.Cms( file = "test-out.cms" )
#msj.meta.cv.append( sea.Map( "type = dist atom = [1 2] width = 0.8 wall = 9.0" ) )
#cfg = msj2cfg( msj, None, model, {"$JOBNAME" : "test",} )
import sys
msj = sea.Map(open(sys.argv[1], "r").read())
cfg = translate_output_file_names(msj, {"$JOBNAME": "test"})
print(cfg)
#model = cms.Cms( file = sys.argv[2] )
#cfg = msj2cfg( msj, None, model, {"$JOBNAME" : "test",}, is_gdesmond = True )
#cfg = msj2cfg( msj, None, model, {"$JOBNAME" : "test",} )
#print cfg
#optimize_key( cfg, model )
# remd.DATA.replica[0].temperature.val = 100.0
# remd.DATA.replica[1].temperature.val = 200.0
# print msj2cfg( remd.DATA, None, model, {"$JOBNAME" : "test",} )