"""
Utilities for handling gDesmond config files.
Copyright Schrodinger, LLC. All rights reserved.
"""
# Contributors: Yujie Wu
from copy import deepcopy
from past.utils import old_div
from typing import List, Optional, Tuple
import numpy as np
import scipy.interpolate
import schrodinger.application.desmond.cms as cms
import schrodinger.application.desmond.config as config
import schrodinger.utils.sea as sea
from schrodinger.application.desmond import cmj
from schrodinger.application.desmond.constants import GCMC_BATCH_SIZE_DEFAULT
from schrodinger.application.desmond.constants import SystemType
from schrodinger.structure import StructureReader
DEFAULT_CONFIG = """{
app = mdsim # mdsim | remd | vrun | reinit
boot.file = filename
global_cell = {
reference_time = 0.0
topology = periodic
n_replica = 1
partition = [1 1 1]
r_clone = 5.903366
clone_policy = rounded
margin = 1.0
}
migration = {
first = 0.0
interval = 0.018
}
spatial_order = auto # cells | conpablo
#spatial_order = {
# style = foxels
# indexing = snake # snake | linear
#}
integrator = {
type = Multigrator
dt = 0.2
respa = {
near_timesteps = 1
far_timesteps = 2
outer_timesteps = 3
migrate_timesteps = 9
}
temperature = {
T_ref = 300.0
use_molecular_KE = false
}
pressure = {
isotropy = isotropic # isotropic | semi_isotropic | anisotropic | constant_area | flexible | monoclinic
P_ref = 1.01325
tension_ref = 0
max_margin_contraction = 0.9
}
Multigrator = {
thermostat = { # none or a map
type = NoseHoover # Langevin | NoseHoover | DPD
timesteps = 12
Langevin = {
tau = 0.016129 # Gamma = 1/tau = 62 is default -- collision rate of water.
seed = 2012
}
NoseHoover = {
mts = 2
tau = [0.1 0.1 0.1]
}
DPD = {
seed = 2012
}
}
barostat = { # none or a map
type = MTK
timesteps = 48
temperature = {
T_ref = 300.0
use_molecular_KE = false
}
MTK = {
tau = 0.05
thermostat = {
type = NoseHoover
NoseHoover = {
mts = 2
tau = [0.1 0.1 0.1]
}
}
}
Langevin = {
tau = 0.020833333 # Gamma = 1/tau = 48 ps
thermostat = {
type = Langevin
tau = 0.016129 # Gamma = 1/tau = 62 is default -- collision rate of water.
seed = 2012
}
}
}
nve.type = Verlet
}
brownie = {
delta_max = 0.1
thermostat = {
seed = 2012
}
barostat = { # none or a map
delta_max = "@*.delta_max"
tau = 1.0
thermostat = "@*.thermostat"
}
}
brownie_NVT = {
thermostat.seed = 2014
delta_max = 0.1
}
brownie_NPT = {
thermostat.seed = 2014
delta_max = 0.1
barostat = {
thermostat.seed = 2014
tau = 0.016129
T_ref = 300.0
}
}
posre_scaling = 1.0
}
force = {
nonbonded = {
type = useries # ewald | useries
sigma = 2.791856 # only needed when force.type = ewald
r_cut = 9.0
r_lazy = 10.0
n_zone = 1024 # only needed when force.type = ewald
accuracy_level = 0 # only needed when force.type = useries
near = {
type = minimax # default | force-only | table | alchemical:softcore | binding:softcore
# minimax | alchemical:softcore-minimax | binding:softcore-minimax
# use minimax's with useries
r_tap = 9.0
taper = none # none | shift | c1switch | c2switch
#average_dispersion = 69.5 # In kcal/mol A^6. If not set, it will be calculated by Desmond.
correct_average_dispersion = true
}
far = {
Nterms = 32
type = QuadS # pme | pme-cpu | binding:pme | QuadS
sigma_s = 0.85
r_spread = 4.0
n_k = [16 16 16]
order = [4 4 4]
kappa = [0.333333 0.333333 0.333333]
spreading_style = scatter_gather # scatter_gather | scatter
}
}
bonded = {
exclude = []
include = []
}
virtual = {
exclude = []
include = []
}
ignore_com_dofs = true # subtract 3 from the degrees of freedom.
term = {
list = []
gibbs = {
type = none # none | alchemical | binding
alpha_vdw = 0.5
window = -1 # window index
# flexible = {} # flexible lambdas
weights = {
vdw = [] # binding
es = [] # binding
vdwA = [] # alchemical
vdwB = [] # alchemical
qA = [] # alchemical
qB = [] # alchemical
qC = [] # alchemical
bondA = [] # alchemical
bondB = [] # alchemical
}
output = {
name = "fep.dE"
first = 0.0
interval = 0.1
}
}
}
constraint = {
maxit = 16
tol = 1.0e-8
}
}
PLUGIN_COMMON = {
list = [status eneseq trajectory remove_com_motion]
anneal = {
type = anneal
first = 0.0
interval = 1.2
schedule = {
time = [0.0 30.0 60.0 90.0 600.0]
value = [0.0 300.0 600.0 900.0 300.0]
}
}
status = {
type = status
first = 0.0
interval = 1.2
}
eneseq = {
type = eneseq
name = "system.ene"
first = 0.0
interval = 1.2
flush_interval = "@interval"
}
gcmc = {
type = gcmc
first = 0.0
name = "$JOBNAME$[_replica$REPLICA$].gcmc"
eneseq = {
name = "$JOBNAME$[_replica$REPLICA$]_gcmc.ene"
}
interval = 4.8
nsteps = 5000
# ratio of trans/rot moves to ins/del moves
# .3333 = 1:2 = 1:1:1 ratio (trans:ins:del)
# a:b = a:b/2:b/2 such that ins/del is fixed
# move_fraction = 0.0
mu_excess = -6.180
solvent_density = 0.03262
# number of trans/rot moves to perform on an inserted water
# post_insertion_moves = 0
grid = {
track_voids = true
spacing = .22
exclusion_radius = 2.2
region_buffer = 4.0
global_switching = {
frequency = 0.2
move_factor = 3.0
spacing_factor = 2.0
}
}
verbose = 0
batch_size = %s
temperature = 300.0
# for randomize velocities
seed = 2007
restore_engrps = false
}
trajectory = {
type = trajectory
format = dtr
name = "out_trj"
write_velocity = true
first = 0.0
interval = 4.8
center = []
glue = []
mode = noclobber # append | noclobber | clobber
periodicfix = true
write_last_step = true
write_first_step = true
frames_per_file = 250
write_last_vel = false
}
maeff_output = {
# note - this is not a real plugin in GPU desmond. instead, this info is
# read by the function extract_cms_impl in desmondbackend.py
type = maeff_output
first = 0.0
interval = 120.0
name = ""
write_last_step = true
periodicfix = true
precision = 8
full_system_only = false
trjdir = ""
center_atoms = ""
}
maeff_snapshot = {
type = maeff_snapshot
name = ""
first = 0.0
interval = 1.2
}
randomize_velocities = {
type = randomize_velocities
first = 0.0
interval = inf
seed = 2007
temperature = 300.0
}
simbox_output = {
type = simbox_output
name = ""
first = 0.0
interval = 1.2
}
energy_groups = {
type = energy_groups
name = ""
first = 0.0
interval = 1.2
options = [corr_energy ] # [pressure_tensor corr_energy self_energy]
write_report = true
}
remove_com_motion = {
type = remove_com_motion
first = 0.0
interval = inf
}
ptensor = {
type = ptensor
name = ""
first = 0.0
interval = 1.2
print_volume = false
}
dipole_moment = {
type = dipole_moment
first = 0.0
interval = 1.2
name = ""
}
}
CHECKPT_COMMON = {
name = "checkpt.cpt"
first = 0.0
interval = 240.06
wall_interval = 7200.0
write_first_step = false
write_last_step = true
}
mdsim = {
title = "Desmond MD simulation"
last_time = 1200.0 # In the unit of ps
checkpt = "@*.CHECKPT_COMMON"
plugin = "@*.PLUGIN_COMMON"
}
reinit = {
title = "Desmond reinit/MD simulation"
last_time = inf
checkpt = "@*.CHECKPT_COMMON"
plugin = "@*.PLUGIN_COMMON"
frameset = {
first = 0.0
interval = 1200.0
configurations_file = ""
}
}
remd = {
title = "Desmond REMD simulation"
last_time = 1200.0
first = 1.2
interval = 1.2
seed = 2012 # random number seed
checkpt = "@*.CHECKPT_COMMON"
plugin = "@*.PLUGIN_COMMON"
graph = {
edges = {type = linear nodes = []}
}
deltaE = {
first = "@*.first"
interval = "@*.interval"
name = "deltaE.txt"
}
exchange_dat = {
name = "exchange.dat"
}
}
vrun = {
title = "Desmond Vrun"
input = frameset # bootfile | frameset (|stdin to be a feature added later)
checkpt = "@*.CHECKPT_COMMON"
plugin = "@*.PLUGIN_COMMON"
last_time= inf
frameset = {
first = 0.0
interval = 0.0
name = ""
last_time = "@*.last_time"
}
}
gui.ewald_tol = 1E-10
}""" % (GCMC_BATCH_SIZE_DEFAULT)
get_default_setting = config.get_default_setting
parse_lambda = config.parse_lambda
get_exec_name = config.get_exec_name
get_exec = config.get_exec
is_concat = config.is_concat
is_minimize = config.is_minimize
is_meta = config.is_meta
__set_meta_file = config.__set_meta_file
__set_meta = config.__set_meta
__set_reinit_frameset = config.__set_reinit_frameset
is_vrun = config.is_vrun
is_reinit = config.is_reinit
is_remd = config.is_remd
has_plugin = config.has_plugin
remove_plugin = config.remove_plugin
get_homebox = config.get_homebox
is_triclinic_box = config.is_triclinic_box
def add_plugin(desmond_exec, plugin_name, position=None):
"""
Wrap config.add_plugin, making sure to place added plugins before GCMC
"""
# by default we always keep GCMC plugin at the end because it modifies the
# system and can produce seemingly inconsistent outputs between different
# reporting plugins if it executes between them. This is a sensible default,
# not a requirement.
if position is None and 'gcmc' in desmond_exec.plugin.list:
position = desmond_exec.plugin.list.index('gcmc')
config.add_plugin(desmond_exec, plugin_name, position)
def real_to_int(val):
"""
    Truncate to an integer, rounding up when the value is within 1e-7 of the next integer.
"""
return int(val + 1.0e-7)
def optimize_key(msj, cfg, box: List[float]):
"""
Optimizes the simulation parameters in 'cfg', where 'cfg' must represent
a complete config file.
:param box: Simulation box dimensions.
"""
import math
r_cut = cfg.force.nonbonded.r_cut.val
cpu = cfg.global_cell.partition.val
dt = cfg.integrator.dt.val
far_ts = cfg.integrator.respa.far_timesteps.val
out_dt = dt * cfg.integrator.respa.outer_timesteps.val
far_dt = dt * far_ts
epsilon = cfg.gui.ewald_tol.val
# Below is the idea how to optimize the margin. -YW
# 1. The "margin" parameter's value can be optimized as k * rmsm + C, where k is a prefactor, rmsm is the root mean square
    #    motion, and C is the maximum distance that a particle can travel without colliding with another one. Below, let's first
# consider the root mean square motion, and then k and C.
# 2. The root-mean-square motion (rmsm) can be calculated from the diffusion constant using the equation:
# rmsm = sqrt( <x^2> - <x>^2 ) = sqrt( 6 * D * dt )
# where D is diffusion constant, and dt is the time interval.
# 3. How to get dt and D?
# - dt is easy. We just get its value from `cfg' since it is a settable parameter.
# - D is a little bit troublesome. First of all, different molecules have different diffusion constants. In principle,
    #      we should use the largest D, but unfortunately we don't rigorously know what the largest D is. Nevertheless, for
# aqueous systems, it is probably safe to assume water is the most diffusive molecule. So we use water's D.
# - A more difficult problem is that D is temperature-dependent. For water molecules, D = 0.0187 (T = 242.5 K),
# 0.105 (T = 273.5 K), 0.223 (T = 298.0 K), 0.3575 (T = 318.0 K). The unit of D here is A^2/ps. So we need to somehow
# predict D based on the temperature. For this, we can use the Frenkel-Eyring theory, which basically says
# D = A * exp( -E / RT )
# where A and E presumably are constants (this assumption is an oversimplification, but should be good enough for our
# purpose here). By fitting to the experimental data, we get A = 807.015 A^2/ps and E = 4.87 kcal/mol.
# 4. For very high temperature, D is overestimated a lot by the Frenkel-Eyring theory. So we use the following empirical
# formula to correct the result: D = D - 0.03 * (T - 350), for T > 350.
# 5. How do we choose value for the prefactor k?
    #    - First of all, commonly used water models such as TIP3P and SPC tend to overestimate the water diffusion constant. So to
# be safe, we should assume the D of water model is at least 2 times the experimental value.
# - Second, remember that mean square displacement formula for D is correct only for time -> inf; for time -> 0, the
    #      value would be much larger, presumably ~100% larger.
# - The above thoughts lead to a working value for diffusion constant = 4 D, where D is the experimental value. So this
# gives k = sqrt( 4 ) = 2.
# 6. How about C?
# - For the value of C, we can look at the radial distribution function. The width of the first peak is the maximum
# radial space that first shell of water molecules can occupy. Because for any direction only one molecule can occupy
# that radial space, we can safely take the width as the value of C. For liquid water, the value is ~0.4 Angstrom.
# Gets the highest temperature.
desmond_exec = cfg[cfg.app.val]
if (has_plugin(desmond_exec, "anneal")):
temp = max(desmond_exec.plugin.anneal.schedule.value.val)
else:
if ("T_groups" in cfg.integrator.temperature):
# FIXME: This won't work for gDesmond
temp_list = []
for e in cfg.integrator.temperature.T_groups:
temp_list.append(e.T_ref.val)
temp = max(temp_list)
else:
temp = cfg.integrator.temperature.T_ref.val
box_x, box_y, box_z = cms.get_boxsize(box)
homebox = get_homebox(box, cpu)
diffusion = 807.015 * math.exp(-4.87 / 1.986E-3 / temp) - 0.03 * max(
0, temp - 350)
migration_interval = cfg.migration.interval.val
    # Adjusts the migration interval to be `N x out_dt', where `N' is an integer >= 1.
N = real_to_int(old_div(migration_interval, out_dt))
N = 1 if (1 > N) else N
cfg.integrator.respa.migrate_timesteps.val = N * cfg.integrator.respa.outer_timesteps.val
migration_interval = N * out_dt
rmsm = math.sqrt(6 * diffusion * migration_interval)
margin = rmsm * 2.0 + 0.8
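    # Worked example (illustrative): at T = 300 K the fit above gives
    # D = 807.015 * exp(-4.87 / (1.986E-3 * 300)) ~= 0.23 A^2/ps, so for a
    # migration interval of, say, 0.018 ps (the template default),
    # rmsm = sqrt(6 * 0.23 * 0.018) ~= 0.16 A and margin ~= 2 * 0.16 + 0.8 ~= 1.1 A.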
# We don't want to transfer a small amount of data between nodes very frequently. This is because of the transfer latency
# overhead, which might exceed the actual data-transferring time. On the other hand we don't want a big margin either,
    # because a lot of particle pairs would have distances larger than the cutoff. This thought leads to a further check:
    # If the margin is still ``big'' and the migration interval can be decreased, we will reduce the margin even further.
# How ``big'' is ``big''?
# - An arbitrary yet reasonable choice is this: If the margin causes > 50% extra computation, we consider it as big.
# - pow( 1.5, 1.0/3.0 ) = 1.1447142425533319
while (margin > r_cut * 0.1447142425533319 and
migration_interval >= far_dt * 1.9):
migration_interval -= far_dt
        # Adjusts the migration interval to be `N x out_dt', where `N' is an integer >= 1.
N = real_to_int(old_div(migration_interval, out_dt))
N = 1 if (1 > N) else N
cfg.integrator.respa.migrate_timesteps.val = N * cfg.integrator.respa.outer_timesteps.val
migration_interval = N * out_dt
rmsm = math.sqrt(6 * diffusion * migration_interval)
margin = rmsm * 2.0 + 0.8
r_clone = (r_cut + margin) * 0.5
cfg.global_cell.margin.val = margin
cfg.migration.interval.val = migration_interval
r_cut4 = r_cut * 4
if (is_triclinic_box(box) and
        (box_x < r_cut4 or box_y < r_cut4 or box_z < r_cut4)):
cfg.global_cell.clone_policy.val = "unrounded"
else:
cfg.global_cell.clone_policy.val = "rounded"
if (cfg.force.nonbonded.far.type.val != "none"):
toler = math.sqrt(abs(math.log(epsilon * r_cut)))
sigma = old_div(math.sqrt(abs(math.log(epsilon * r_cut * toler))),
r_cut)
toler = math.sqrt(-math.log(epsilon * r_cut * (2.0 * toler * sigma)**2))
fac = sigma * toler / 3.1415926
n_k = [
0.25 + fac * box_x,
0.25 + fac * box_y,
0.25 + fac * box_z,
]
for i, n in enumerate(n_k):
p = cpu[i]
n = int(math.ceil(n))
# Each grid dimension must satisfy the following conditions:
# 1. if p is given, it must divide n
# 2. n must have no integer factors greater than 2, 3, or 5
while (True):
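            # For example (illustrative): with p = 1, a candidate n = 37 is
            # bumped to 40, the next value whose only prime factors are 2, 3, or 5.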
# check that n is commensurate with p
if (p):
if (n % p):
n += 1
continue
N = n
for radix in (
2,
3,
5,
):
while (not N % radix):
N /= radix
if (N == 1):
break
n += 1
continue
n_k[i] = n
cfg.force.nonbonded.far.n_k.val = n_k
cfg.force.nonbonded.sigma.val = old_div(1, sigma)
pme_order = cfg.force.nonbonded.far.order.val
margin_ = old_div(margin, 1.414)
if (cfg.force.nonbonded.far.type.val[:3] == "gse"):
r_clone = max(r_clone, cfg.force.nonbonded.far.r_spread.val + margin_)
r_lazy = r_clone * 2.0
elif (cfg.force.nonbonded.far.type.val[:3] == "pme"):
a = cpu[0] * (pme_order[0] - 1) * 0.5 / n_k[0]
b = cpu[1] * (pme_order[1] - 1) * 0.5 / n_k[1]
c = cpu[2] * (pme_order[2] - 1) * 0.5 / n_k[2]
tmpx = homebox[0] * a + homebox[1] * b + homebox[2] * c
tmpy = homebox[3] * a + homebox[4] * b + homebox[5] * c
tmpz = homebox[6] * a + homebox[7] * b + homebox[8] * c
r_pme = math.sqrt(tmpx * tmpx + tmpy * tmpy + tmpz * tmpz) + margin_
r_clone = max(r_clone, r_pme + 0.01)
mmc = cfg.integrator.pressure.max_margin_contraction.val
try:
if (msj.bigger_rclone.val):
r_clone /= mmc
except AttributeError:
pass
r_lazy = r_clone * 2.0
if (r_lazy > old_div(box_x, cpu[0]) and
r_lazy > old_div(box_y, cpu[1]) and
r_lazy > old_div(box_z, cpu[2])):
print("WARNING: Cutoff radius is too large relative to box size.\n" \
" Try either increasing box size or reducing cutoff radius.")
cfg.global_cell.r_clone.val = r_clone + 1E-6
cfg.force.nonbonded.r_lazy.val = cfg.global_cell.r_clone.val * 2
default_flex_vdw = r""" {
vdwA = "@*.*.weights.vdwA"
vdwB = "@*.*.weights.vdwB"
qA = "@*.*.weights.qA"
qB = "@*.*.weights.qB"
name = "%s"} """
default_flex_bond = r""" {
A = "@*.*.weights.bondA"
B = "@*.*.weights.bondB"
name = "%s" } """
default_flex_dihe = r""" {
A = "@*.*.flexible[0].A"
B = "@*.*.flexible[0].B"
name = "%s"} """
def __get_epsilon_lambda_schedule(n_win, B=False):
"""
generate lambda schedule for epsilons in soft-core
potentials
n_win: number of windows
B: final state
"""
tmp_list = [0.1] * n_win
if B:
tmp_list[0] = 0.0
else:
tmp_list[-1] = 0.0
return sea.List(tmp_list)
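# For example (illustrative): __get_epsilon_lambda_schedule(4) gives the
# sea.List [0.1, 0.1, 0.1, 0.0], and __get_epsilon_lambda_schedule(4, B=True)
# gives [0.0, 0.1, 0.1, 0.1].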
class LambdaSchedule:
"""Base class for lambda schedules."""
lambdas: List[float] = []
    @classmethod
def schedule(cls, n_win: int) -> List[float]:
"""Return schedule with the desired number of windows.
This method constructs a piecewise linear spline interpolator of the
prototype lambda schedule and evaluates it on the specified number of
windows to obtain the new lambda schedule.
:param n_win: Number of lambda windows to use.
:return: New lambda schedule.
"""
lambdas = cls.lambdas
values = np.linspace(0.0, 1.0, len(lambdas))
interp = scipy.interpolate.interp1d(values, lambdas, kind='slinear')
new_values = np.linspace(0.0, 1.0, n_win)
return interp(new_values).tolist()
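    # For example (illustrative): with lambdas = [1.0, 0.0] (see ScheduleLinear
    # below), schedule(5) returns [1.0, 0.75, 0.5, 0.25, 0.0].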
class ScheduleAngleCorePhysical(LambdaSchedule):
"""Schedule for angle_core_physical terms."""
lambdas = [
1.0, 0.8, 0.6, 0.35, 0.2, 0.1, 0.03, 0.01, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0,
0.0, 0.0
]
class ScheduleAngleCoreDummy(LambdaSchedule):
"""Schedule for angle_core_dummy terms."""
lambdas = [
1.0, 0.8, 0.65, 0.5, 0.4, 0.3, 0.2, 0.16, 0.12, 0.08, 0.04, 0.01, 0.0,
0.0, 0.0, 0.0
]
class ScheduleBondCorePhysical(LambdaSchedule):
"""Schedule for bond_core_physical terms."""
lambdas = [
1.0, 0.3, 0.1, 0.05, 0.03, 0.01, 0.002, 0.001, 0.0, 0.0, 0.0, 0.0, 0.0,
0.0, 0.0, 0.0
]
class ScheduleBondCoreDummy(LambdaSchedule):
"""Schedule for bond_core_dummy terms."""
lambdas = [
1.0, 0.6, 0.35, 0.22, 0.18, 0.14, 0.12, 0.11, 0.09, 0.08, 0.06, 0.02,
0.005, 0.0, 0.0, 0.0
]
class ScheduleAngleAttachment(LambdaSchedule):
"""Schedule for angle_attachment terms."""
lambdas = [
1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 0.95, 0.9, 0.85, 0.8, 0.75, 0.6,
0.35, 0.1, 0.0
]
class ScheduleLinear(LambdaSchedule):
"""Linear lambda schedule."""
lambdas = [1.0, 0.0]
def __get_flexible_lambda_schedule(n_win):
"""
Return sea.List object for flexible lambda schedules
"""
    def sched2str(schedule: List[float]) -> Tuple[str, str]:
s = [str(sched) for sched in schedule]
return ' '.join(s), ' '.join(reversed(s))
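    # For example (illustrative): sched2str([1.0, 0.5, 0.0]) returns the pair
    # ("1.0 0.5 0.0", "0.0 0.5 1.0"), i.e. the A (forward) and B (reversed) strings.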
angle_core_physical = ScheduleAngleCorePhysical.schedule(n_win)
angle_core_physicalA, angle_core_physicalB = sched2str(angle_core_physical)
angle_core_dummy = ScheduleAngleCoreDummy.schedule(n_win)
angle_core_dummyA, angle_core_dummyB = sched2str(angle_core_dummy)
bond_core_physical = ScheduleBondCorePhysical.schedule(n_win)
bond_core_physicalA, bond_core_physicalB = sched2str(bond_core_physical)
bond_core_dummy = ScheduleBondCoreDummy.schedule(n_win)
bond_core_dummyA, bond_core_dummyB = sched2str(bond_core_dummy)
angle_attachment = ScheduleAngleAttachment.schedule(n_win)
angle_attachmentA, angle_attachmentB = sched2str(angle_attachment)
alchemical_restraint = ScheduleLinear.schedule(n_win)
alchemical_restraintA, alchemical_restraintB = sched2str(
alchemical_restraint)
flex_lambda_list = [
f""" {{
A = [ {angle_core_physicalA}]
B = [ {angle_core_physicalB}]
name = "angle_core_physical" }} """, f""" {{
A = [ {angle_core_dummyA}]
B = [ {angle_core_dummyB}]
name = "angle_core_dummy" }} """, f""" {{
A = [ {bond_core_physicalA}]
B = [ {bond_core_physicalB}]
name = "bond_core_physical" }} """, f""" {{
A = [ {bond_core_dummyA}]
B = [ {bond_core_dummyB}]
name = "bond_core_dummy" }} """, f""" {{
A = [ {angle_attachmentA}]
B = [ {angle_attachmentB}]
name = "angle_attachment" }} """, f""" {{
A = [ {alchemical_restraintA}]
B = [ {alchemical_restraintB}]
name = "alchemical_restraint" }} """
]
flex_lambda_list += [
default_flex_dihe % x
for x in ["dihedral_core_physical", "dihedral_core_dummy"]
]
flex_lambda_list += [
default_flex_bond % x
for x in ["dihedral_attachment", "dihedral_restraint"]
]
flex_lambda_list += [
default_flex_vdw % x for x in [
"excluded2pair", "pair2nonbonded", "excluded2nonbonded",
"nonbonded2pair", "pair2excluded", "nonbonded2excluded", "unmatched"
]
]
flex_sea = sea.List(list(map(sea.Map, flex_lambda_list)))
for sched in flex_sea:
if sched.name.val in [
"angle_core_physical", "angle_core_dummy",
"dihedral_core_physical", "dihedral_core_dummy",
"dihedral_attachment", "angle_attachment", "dihedral_restraint"
]:
sched.EpsA = __get_epsilon_lambda_schedule(n_win)
sched.EpsB = __get_epsilon_lambda_schedule(n_win, B=True)
return flex_sea
def __set_fep(msj, cfg, model):
"""
"""
if (model is None):
return
sys_type = cms.get_model_system_type(model.comp_ct)
fep_type = "alchemical" if (sys_type
== SystemType.ALCHEMICAL) else "binding"
cfg.force.nonbonded.near["type"] = fep_type + ":softcore"
far_type = cfg.force.nonbonded.far.type.val
if (fep_type == "binding" and far_type in ["pme", "gse", "QuadS"]):
cfg.force.nonbonded.far.type.val = "binding:" + far_type
if (cfg.force.nonbonded.type.val == "useries"):
cfg.force.nonbonded.near.type.val += "-minimax"
if (sys_type == SystemType.OTHER):
return
desmond_exec = get_exec(msj, cfg)
if ("gibbs" not in desmond_exec.plugin.list.val):
cfg.force.term.list.append("gibbs")
gibbs = cfg.force.term.gibbs
gibbs.type.val = fep_type
gibbs.output.name.val = msj.fep.output.name.val
gibbs.output.first.val = msj.fep.output.first.val
gibbs.output.interval.val = msj.fep.output.interval.val
if (msj.fep.i_window.val is not None):
gibbs.window.val = msj.fep.i_window.val
if (isinstance(msj.fep.lambda_, sea.Atom)):
scheme, n_win = parse_lambda(msj.fep.lambda_.val)
if (n_win < 4):
raise ValueError("Total number of windows should be 4 at least")
fs = config.get_fep_lambdas(n_win, fep_type, scheme)
la = gibbs.weights
la["vdw"] = fs.vdw
la["es"] = fs.coulomb
la["vdwA"] = fs.vdwA
la["vdwB"] = fs.vdwB
la["qA"] = fs.chargeA
la["qB"] = fs.chargeB
la["qC"] = [1 - eA - eB for eA, eB in zip(fs.chargeA, fs.chargeB)]
la["bondA"] = fs.bondedA
la["bondB"] = fs.bondedB
la["N"] = n_win
if scheme == "flexible":
gibbs.flexible = __get_flexible_lambda_schedule(n_win)
else:
if (fep_type != config.lambda_type(msj.fep.lambda_)):
raise ValueError("Wrong lambda schedule type")
if (fep_type == "alchemical"):
fs = msj.fep.lambda_
la = gibbs.weights
la["vdwA"] = fs.vdwA
la["vdwB"] = fs.vdwB
la["qA"] = fs.chargeA
la["qB"] = fs.chargeB
la["qC"] = [
1 - eA.val - eB.val for eA, eB in zip(fs.chargeA, fs.chargeB)
]
la["bondA"] = fs.bondedA
la["bondB"] = fs.bondedB
la["N"] = len(fs.vdwA)
else:
fs = msj.fep.lambda_
la = gibbs.weights
la["vdw"] = fs.vdw
la["es"] = fs.coulomb
la["N"] = len(fs.vdw)
def __set_cutoff_radius(msj, cfg, model):
"""
"""
cfg.force.nonbonded.r_cut.val = msj.cutoff_radius.val
def __set_bigger_rclone(msj, cfg, model):
"""
"""
def __set_taper(msj, cfg, model):
"""
"""
default_taper_setting = sea.Map("method = shift width = 1.0")
if (isinstance(msj.taper, sea.Atom)):
if (msj.taper.val):
cfg.force.nonbonded.near.taper.val = default_taper_setting.method.val
cfg.force.nonbonded.near.r_tap.val = msj.cutoff_radius.val - default_taper_setting.width.val
else:
cfg.force.nonbonded.near.taper.val = "none"
cfg.force.nonbonded.near.r_tap.val = msj.cutoff_radius.val
else:
custom_taper_setting = msj.taper
msj.taper = default_taper_setting
msj.taper.update(custom_taper_setting)
cfg.force.nonbonded.near.taper.val = msj.taper.method.val
cfg.force.nonbonded.near.r_tap.val = msj.cutoff_radius.val - msj.taper.width.val
if (cfg.force.nonbonded.near.r_tap.val < 0):
raise ValueError("Invalid value for 'r_tap': %f" %
cfg.force.nonbonded.near.r_tap.val)
def __set_coulomb_method(msj, cfg, model):
"""
"""
if (isinstance(msj.coulomb_method, sea.Atom)):
if (msj.coulomb_method.val == "pme"):
cfg.force.nonbonded.type.val = "ewald"
cfg.force.nonbonded.near.type.val = "default"
cfg.force.nonbonded.far.type.val = "pme"
elif (msj.coulomb_method.val == "useries"):
if model and is_triclinic_box(model.box):
# Automatically resets the method to "pme" if the system uses a
# triclinic box because the useries method currently doesn't
# work with triclinic box. DESMOND-6842.
print(
"NOTE: The simulation box is triclinic. Changed coulomb_method from 'useries'\n"
" to 'pme'.")
msj.coulomb_method.val = "pme"
return __set_coulomb_method(msj, cfg, model)
cfg.force.nonbonded.type.val = "useries"
cfg.force.nonbonded.near.type.val = "minimax"
cfg.force.nonbonded.far.type.val = "QuadS"
else:
cfg.force.nonbonded.type.val = "vacuum"
cfg.force.nonbonded.far.type.val = "none"
cfg.force.nonbonded.sigma.val = "inf"
else:
method_name, ewald_tol = msj.coulomb_method.val
if (method_name == "pme"):
cfg.force.nonbonded.type.val = "ewald"
cfg.force.nonbonded.near.type.val = "default"
cfg.force.nonbonded.far.type.val = "pme"
cfg.gui.ewald_tol.val = ewald_tol
else:
raise ValueError("Unknown coulomb method: %s" % method_name)
def __set_temperature(msj, cfg, model):
"""
"""
if ("annealing" in msj and msj.annealing.val):
cfg.mdsim.plugin.anneal.schedule["time"] = sea.List()
cfg.mdsim.plugin.anneal.schedule["value"] = sea.List()
time = cfg.mdsim.plugin.anneal.schedule.time
value = cfg.mdsim.plugin.anneal.schedule.value
for e in msj.temperature:
value.append(e[0].val)
time.append(e[1].val)
else:
if (isinstance(msj.temperature, sea.Atom)):
cfg.integrator.temperature.T_ref.val = msj.temperature.val
else:
# This part probably does not work. But we don't know the correct syntax.
temperature = sea.Map()
if (1 < len(msj.temperature)):
t_list = sea.List()
for e in msj.temperature:
temp = sea.Map()
temp["T_ref"] = e[0].val
temp["groups"] = sea.List([
e[1].val,
])
t_list.append(temp)
temperature["T_groups"] = t_list
temperature["T_ref"] = msj.temperature[0][0].val
cfg.integrator["temperature"] = temperature
# Low temperature of the barostat can result in unstable simulations.
# We set the minimum temperature of the barostat to 100 K.
# For detail, refer to DESMOND-10477 and DESMOND-10543.
barostat = cfg.integrator.Multigrator.barostat
if isinstance(barostat, sea.Map):
barostat.temperature.T_ref.val = max(
100.0, cfg.integrator.temperature.T_ref.val)
def __set_annealing(msj, cfg, model):
"""
"""
if (msj.annealing.val):
add_plugin(cfg.mdsim, "anneal")
else:
remove_plugin(cfg.mdsim, "anneal")
def __set_pressure(msj, cfg, model):
"""
"""
if (isinstance(msj.pressure, sea.Atom)):
cfg.integrator.pressure.P_ref.val = msj.pressure.val
else:
cfg.integrator.pressure.P_ref.val = msj.pressure[0].val
if (isinstance(msj.ensemble, sea.Atom)):
if (msj.ensemble.val in [
"NPT",
"NPT_L",
]):
cfg.integrator.pressure.isotropy.val = msj.pressure[1].val
else:
if (msj.ensemble.class_.val == "NPT"):
cfg.integrator.pressure.isotropy.val = msj.pressure[1].val
def __set_surface_tension(msj, cfg, model):
"""
"""
if ((isinstance(msj.ensemble, sea.Atom) and msj.ensemble.val == "NPgT") or
(isinstance(msj.ensemble, sea.Map) and
msj.ensemble.class_.val == "NPgT")):
cfg.integrator.pressure.tension_ref.val = msj.surface_tension.val
else:
return
if ("tension_ref" in cfg.integrator.pressure):
del cfg.integrator.pressure["tension_ref"]
def get_ensemble_class_method(msj):
"""
"""
if (isinstance(msj.ensemble, sea.Atom)):
ens_type_map = {
"NPT": (
"NPT",
"MTK",
),
"NVT": (
"NVT",
"NH",
),
"NVE": (
"NVE",
"NH",
), # Method for "NVE" will be ignored.
"NPgT": (
"NPgT",
"MTK",
),
"NPAT": (
"NPAT",
"MTK",
),
"NPT_L": (
"NPT",
"Langevin",
),
"NVT_L": (
"NVT",
"Langevin",
),
"NPT_DPD": (
"NPT",
"DPD",
),
"NVT_DPD": (
"NVT",
"DPD",
),
}
return ens_type_map[msj.ensemble.val]
else:
return (
msj.ensemble.class_.val,
msj.ensemble.method.val,
)
def __set_ensemble(msj, cfg, model):
"""
"""
integrator = cfg.integrator
ens = "NVE"
tau = None
if (isinstance(msj.ensemble, sea.Atom)):
ens_type_map = {
"NPT": "MTK_NPT",
"NVT": "NH_NVT",
"NVE": "V_NVE",
"NPgT": "MTK_NPT",
"NPAT": "MTK_NPT",
"NPT_L": "L_NPT",
"NVT_L": "L_NVT",
"NPT_Brownie": "brownie_NPT",
"NVT_Brownie": "brownie_NVT",
"NPT_DPD": "DPD_NPT",
# FIXME: should likely be DPD_NVT
"NVT_DPD": "DPD",
}
ens = key = msj.ensemble.val
else:
if (msj.ensemble.class_.val == "NVE"):
ens_type_map = {
"NVE": "V_NVE",
}
key = msj.ensemble.class_.val
else:
# FIXME: Check the consistency between the "class" and "method" parameters when job is launched.
ens_type_map = {
(
"NPT",
"MTK",
): "MTK_NPT",
(
"NPgT",
"MTK",
): "MTK_NPT",
(
"NPAT",
"MTK",
): "MTK_NPT",
(
"NVT",
"NH",
): "NH_NVT",
(
"NPT",
"Langevin",
): "L_NPT",
(
"NPgT",
"Langevin",
): "L_NPT",
(
"NPAT",
"Langevin",
): "L_NPT",
(
"NVT",
"Langevin",
): "L_NVT",
(
"NPT",
"DPD",
): "DPD_NPT",
(
"NVT",
"DPD",
# FIXME: should be DPD_NVT
): "DPD",
(
"NPT",
"Brownie",
): "brownie_NPT",
(
"NVT",
"Brownie",
): "brownie_NVT",
}
ens = msj.ensemble.class_.val
method = msj.ensemble.method.val
ttau = msj.ensemble.thermostat.tau.val if (
"thermostat" in msj.ensemble and
method not in ["Brownie", "DPD"]) else None
btau = msj.ensemble.barostat.tau.val if ("barostat"
in msj.ensemble) else None
key = (
ens,
method,
)
tau = (
ttau,
btau,
)
try:
t = ens_type_map[key]
except KeyError:
raise ValueError(f"Unsupported ensemble class or method {key}")
if (ens == "NPgT"):
integrator.pressure.isotropy.val = "semi_isotropic"
elif (ens == "NPAT"):
integrator.pressure.isotropy.val = "constant_area"
if t in ["V_NVE"]:
integrator.Multigrator["thermostat"] = "none"
if t in ["NH_NVT", "L_NVT", "V_NVE", "NVT_DPD", "DPD"]:
integrator.Multigrator["barostat"] = "none"
if (tau is not None):
# tau[0] is for thermostat, tau[1] for barostat.
if t in ["MTK_NPT", "NH_NVT"]:
if (tau[0]):
ts = integrator.Multigrator.thermostat.timesteps.val
for e in integrator.Multigrator.thermostat.NoseHoover.tau:
e.val = old_div(tau[0], float(ts))
if (t == "MTK_NPT"):
ts = integrator.Multigrator.barostat.timesteps.val
if (tau[0]):
for e in integrator.Multigrator.barostat.MTK.thermostat.NoseHoover.tau:
e.val = old_div(tau[0], float(ts))
if (tau[1]):
integrator.Multigrator.barostat.MTK.tau.val = old_div(
tau[1], float(ts))
elif (t == "L_NVT"):
integrator.Multigrator.thermostat.type.val = "Langevin"
if (tau[0] is not None):
ts = integrator.Multigrator.thermostat.timesteps.val
integrator.Multigrator.thermostat.Langevin.tau.val = old_div(
tau[0], float(ts))
elif t == "DPD":
integrator.Multigrator.thermostat.type.val = "DPD"
elif (t == "L_NPT"):
integrator.Multigrator.thermostat.type.val = "Langevin"
integrator.Multigrator.barostat.type.val = "Langevin"
if (tau[0] is not None):
ts = integrator.Multigrator.thermostat.timesteps.val
integrator.Multigrator.thermostat.Langevin.tau.val = old_div(
tau[0], float(ts))
if (tau[1] is not None):
ts = integrator.Multigrator.barostat.timesteps.val
integrator.Multigrator.barostat.Langevin.thermostat.tau.val = old_div(
tau[0], float(ts))
integrator.Multigrator.barostat.Langevin.tau.val = old_div(
tau[1], float(ts))
elif (t == "brownie_NVT"):
integrator.brownie_NVT.delta_max.val = msj.ensemble.brownie.delta_max.val
elif (t == "brownie_NPT"):
if (tau[1] is not None):
integrator.brownie_NPT.barostat.tau.val = tau[1]
integrator.brownie_NPT.delta_max.val = msj.ensemble.brownie.delta_max.val
if (isinstance(msj.temperature, sea.Atom)):
integrator.brownie_NPT.barostat.T_ref.val = msj.temperature.val
else:
integrator.brownie_NPT.barostat.T_ref.val = msj.temperature[0][
0].val
if (t == "brownie_NVT"):
integrator.type.val = "brownie_NVT"
elif (t == "brownie_NPT"):
integrator.type.val = "brownie_NPT"
elif t in ["DPD", "DPD_NPT"]:
set_dpd_params(msj, cfg, ensemble=t)
def set_dpd_params(msj, cfg, ensemble):
"""
Set parameters for DPD simulation
:param str ensemble: name of the ensemble
"""
# Add special keywords from MATSCI-8136 and DESMOND-9886
cfg.force.nonbonded.far.type.val = "none"
cfg.force.nonbonded.near['average_dispersion'] = 0.0
cfg.force.nonbonded.near.correct_average_dispersion.val = "false"
cfg.force.nonbonded.near.type.val = 'polynomial'
cfg.force.nonbonded.near['seed'] = 2020
cfg.force.nonbonded.near['acceleration'] = 'dpd'
cfg.force.nonbonded.near['dt'] = msj.timestep.val[0]
cfg.force.nonbonded.near['T_ref'] = get_tempearture(msj)
cfg.integrator.Multigrator.nve.type.val = 'Verlet'
cfg.integrator.Multigrator['thermostat'] = 'none'
if ensemble == "DPD":
cfg.integrator.Multigrator.barostat.val = 'none'
elif ensemble == "DPD_NPT":
cfg.integrator.Multigrator.barostat.type.val = "MTK"
def get_tempearture(msj):
"""
get temperature from msj
:rtype: float
:return: temperature
"""
return (msj.temperature.val if isinstance(msj.temperature, sea.Atom) else
msj.temperature[0][0].val)
def __set_time(msj, cfg, model):
"""
"""
desmond_exec = get_exec(msj, cfg)
desmond_exec.last_time.val = msj.time.val
def __set_elapsed_time(msj, cfg, model):
"""
"""
cfg.global_cell.reference_time.val = msj.elapsed_time.val
def __set_timestep(msj, cfg, model):
"""
"""
timestep = msj.timestep.val
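    # For example (illustrative): timestep = [0.002 0.002 0.006] (ps) gives
    # dt = 0.002 and near/far/outer RESPA multiples of 1, 3, and 3 below.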
cfg.integrator.dt.val = timestep[0]
cfg.integrator.respa.near_timesteps.val = int(
old_div(timestep[1], timestep[0]) + 0.1)
cfg.integrator.respa.far_timesteps.val = int(
old_div(timestep[2], timestep[0]) + 0.1)
cfg.integrator.respa.outer_timesteps.val = int(
old_div(timestep[2], timestep[0]) + 0.1)
def __set_cpu(msj, cfg, model):
"""
"""
if (model is None):
return
if (isinstance(msj.cpu, sea.Atom)):
import schrodinger.application.desmond.autopartition as autopartition
x, y, z = cms.get_boxsize(model.box)
nx, ny, nz = autopartition.auto_partition([
x,
y,
z,
], msj.cpu.val)
else:
nx, ny, nz = msj.cpu.val
cfg.global_cell["partition"] = (
nx,
ny,
nz,
)
def __set_glue(msj, cfg, model):
"""
"""
def __set_vrun_frameset(msj, cfg, model):
"""
"""
cfg.vrun.frameset.name.val = msj.vrun_frameset.val
def __set_gcmc(msj, cfg, model):
"""
"""
desmond_exec = get_exec(msj, cfg)
gcmc = desmond_exec.plugin.gcmc
mgcmc = msj.gcmc
# set up plugin interval
gcmc.first.val = mgcmc.first.val
gcmc.interval.val = mgcmc.interval.val
# set up eneseq
gcmc.eneseq.name.val = mgcmc.ene_name.val
# setup moves
moves = mgcmc.moves
gcmc.nsteps.val = moves.moves_per_cycle.val
# disabled until functionality restored
# move_ratio_str = moves.trans_to_ins_del_ratio.val
# num_trans, num_ins_del = map(int, move_ratio_str.split(':'))
# gcmc.move_fraction.val = float(num_trans) / (num_trans + num_ins_del)
# gcmc.post_insertion_moves.val = \
# moves.post_insertion_translation_moves.val
# simulation values
gcmc.mu_excess.val = mgcmc.mu_excess.val
gcmc.solvent_density.val = mgcmc.solvent_density.val
# setup grid
gcmc.grid.spacing.val = mgcmc.gcmc_region.cell_size.val
gcmc.grid.exclusion_radius.val = mgcmc.gcmc_region.exclusion_radius.val
gcmc.grid.track_voids.val = mgcmc.gcmc_region.track_voids.val
gcmc.grid.region_buffer.val = mgcmc.gcmc_region.region_buffer.val
# check if deprecated spec is passed
if 'whole_box_frequency' in mgcmc.gcmc_region:
if 'global_switching' in mgcmc.gcmc_region:
raise ValueError(
"gcmc.gcmc_region: Only one of global_switching "
"and deprecated whole_box_frequency can be defined")
else:
global_switching = sea.Map()
global_switching.frequency = deepcopy(
mgcmc.gcmc_region.whole_box_frequency)
else:
global_switching = deepcopy(mgcmc.gcmc_region.global_switching)
# names should be the same - explicitly loop over keys instead of copying
# entire map to ensure keys are equivalent
for k in global_switching.keys():
gcmc.grid.global_switching[k].val = global_switching[k].val
gcmc.verbose.val = int(mgcmc.verbose.val)
# setup random seed
seed = mgcmc.seed.val
if seed == 'random':
# in case someone else has seeded the generator
state = np.random.get_state()
np.random.seed()
seed = np.random.randint(np.iinfo(np.int32).max)
# being a good neighbor
np.random.set_state(state)
gcmc.seed.val = seed
# set temperature from global msj value
ref_temp = msj.temperature.val if (isinstance(
msj.temperature, sea.Atom)) else msj.temperature[0][0].val
gcmc.temperature.val = ref_temp
add_plugin(desmond_exec, 'gcmc')
# if we use the GCMC plugin, we should tell energy_groups to output gcmc
# info. we can do this whether or not energy_groups is actually enabled (ie
# added to plugin.list)
if 'energy_groups' in desmond_exec.plugin:
desmond_exec.plugin.energy_groups.options.append('gcmc_info')
def _skip_replica_trajectory(msj: sea.Map) -> bool:
"""
Return True if the trajectory should not be
written for this replica based on the
    `fep.trajectory.record_windows` setting.
"""
try:
fep_traj = msj.fep.trajectory.record_windows.val
i_window = msj.fep.i_window.val
except AttributeError:
pass
else:
if i_window is not None:
if isinstance(msj.fep.lambda_, sea.Atom):
_, n_win = parse_lambda(msj.fep.lambda_.val)
else:
fs = msj.fep.lambda_
n_win = len(
fs.vdwA) if config.lambda_type(fs) == "alchemical" else len(
fs.vdw)
if isinstance(fep_traj, str):
if fep_traj == "all":
fep_traj = list(range(n_win))
else:
fep_traj = [x if x >= 0 else n_win + x for x in fep_traj]
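                # For example (illustrative): with 12 lambda windows,
                # record_windows = [0, -1] keeps trajectories only for
                # windows 0 and 11; all other replicas skip trajectory output.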
if i_window not in fep_traj:
return True
return False
def __set_trajectory(msj, cfg, model):
"""
"""
from schrodinger.application.desmond.packages import restraint
if (is_minimize(msj)):
return
desmond_exec = get_exec(msj, cfg)
if (isinstance(msj.trajectory, sea.Atom)):
remove_plugin(desmond_exec, "trajectory")
return
if _skip_replica_trajectory(msj):
for t_val in [msj.trajectory.first, msj.trajectory.interval]:
# use update in case t_val is of int type
t_val.update(sea.Atom(float("inf")))
desmond_exec.plugin.trajectory.first.val = msj.trajectory.first.val
desmond_exec.plugin.trajectory.interval.val = msj.trajectory.interval.val
desmond_exec.plugin.trajectory.periodicfix.val = msj.trajectory.periodicfix.val
desmond_exec.plugin.trajectory.write_velocity.val = msj.trajectory.write_velocity.val
has_last_vel = getattr(msj.trajectory, 'write_last_vel', False)
if has_last_vel:
desmond_exec.plugin.trajectory.write_last_vel.val = msj.trajectory.write_last_vel.val
try:
desmond_exec.plugin.trajectory.frames_per_file.val = msj.trajectory.frames_per_file.val
except AttributeError:
pass
try:
center = msj.trajectory.center
if (isinstance(center, sea.List)):
desmond_exec.plugin.trajectory.center.val = [
model.gid(e) for e in center.val
]
else:
desmond_exec.plugin.trajectory.center.val = [
model.gid(e) for e in model.select_atom(center.val)
]
except AttributeError:
pass
# Centering will mess up positional restraints, so disable if any are present
if model is not None and restraint.has_positional_restraints(model):
desmond_exec.plugin.trajectory.center.val = []
try:
format = "dtr"
format = msj.trajectory.format.val
desmond_exec.plugin.trajectory.format.val = format.lower()
except AttributeError:
pass
# Adjusts trajectory file name for its consistency with the format setting.
traj_fname = msj.trajectory.name.val
if format == "dtr":
        # DTR trajectories don't use a file extension. Conventionally, we name
        # them after the pattern: <jobname>_trj.
if ".xtc" == traj_fname[-4:]:
traj_fname = traj_fname[:-4] + "_trj"
elif format == "xtc":
if "_trj" == traj_fname[-4:]:
traj_fname = traj_fname[:-4] + ".xtc"
elif ".xtc" != traj_fname[-4:]:
traj_fname = traj_fname + ".xtc"
if traj_fname != msj.trajectory.name.val:
print("NOTE: Renamed trajectory file name to %s to be consistent\n"
" with the specified format: %s." % (traj_fname, format))
desmond_exec.plugin.trajectory.name.val = traj_fname
# Set up the trajectory name to store the last frame velocity
if has_last_vel:
desmond_exec.plugin.trajectory['last_vel_name'] = sea.Atom(
f'{traj_fname[:-4]}_vel_trj')
def __set_eneseq(msj, cfg, model):
"""
"""
desmond_exec = get_exec(msj, cfg)
if (isinstance(msj.eneseq, sea.Atom)):
remove_plugin(desmond_exec, "eneseq")
return
desmond_exec.plugin.eneseq.name.val = msj.eneseq.name.val
desmond_exec.plugin.eneseq.first.val = msj.eneseq.first.val
desmond_exec.plugin.eneseq.interval.val = msj.eneseq.interval.val
try:
desmond_exec.plugin.eneseq["precision"] = msj.eneseq.precision.val
except AttributeError:
pass
def __set_checkpt(msj, cfg, model):
"""
"""
if (is_minimize(msj)):
return
desmond_exec = get_exec(msj, cfg)
if (isinstance(msj.checkpt, sea.Atom)):
if (msj.checkpt.val):
__set_checkpt(get_default_setting(msj), cfg, model)
else:
desmond_exec.checkpt.first.val = "inf"
desmond_exec.checkpt.interval.val = "inf"
desmond_exec.checkpt.wall_interval.val = "inf"
desmond_exec.checkpt.write_last_step.val = False
else:
desmond_exec.checkpt.name.val = msj.checkpt.name.val
desmond_exec.checkpt.write_last_step.val = msj.checkpt.write_last_step.val
if ("wall_interval" in msj.checkpt and
desmond_exec.checkpt.wall_interval.val != float("inf")):
desmond_exec.checkpt.first.val = "inf"
desmond_exec.checkpt.interval.val = "inf"
desmond_exec.checkpt.wall_interval.val = msj.checkpt.wall_interval.val
else:
desmond_exec.checkpt.first.val = msj.checkpt.first.val
desmond_exec.checkpt.interval.val = msj.checkpt.interval.val
desmond_exec.checkpt.wall_interval.val = "inf"
def __set_maeff_output(msj, cfg, model):
"""
"""
from schrodinger.application.desmond.packages import restraint
desmond_exec = get_exec(msj, cfg)
if isinstance(msj.maeff_output,
sea.Atom) or (is_remd(msj) and _skip_replica_trajectory(msj)):
remove_plugin(desmond_exec, "maeff_output")
return
desmond_exec.plugin.maeff_output.name.val = msj.maeff_output.name.val
desmond_exec.plugin.maeff_output.first.val = msj.maeff_output.first.val
desmond_exec.plugin.maeff_output.interval.val = msj.maeff_output.interval.val
try:
        # If the user runs desmond directly without multisim and the center_atoms
# is not set in the input .cfg file, we will get an AttributeError.
desmond_exec.plugin.maeff_output[
"center_atoms"] = msj.maeff_output.center_atoms.val
except AttributeError:
desmond_exec.plugin.maeff_output[
"center_atoms"] = config.DEFAULT_SETTING.MD.maeff_output.center_atoms.val
# Centering will mess up positional restraints, so disable if any are present
if model is not None and restraint.has_positional_restraints(model):
desmond_exec.plugin.maeff_output["center_atoms"] = sea.List()
if is_minimize(msj):
desmond_exec.plugin.maeff_output.trjdir.val = ""
else:
# read trjidx key to support front-end cfg files generated with
# prior versions (<15-4)
if 'trjidx' in msj.maeff_output:
desmond_exec.plugin.maeff_output.trjdir.val = msj.maeff_output.trjidx.val.replace(
'-out.idx', '_trj')
else:
desmond_exec.plugin.maeff_output.trjdir.val = msj.maeff_output.trjdir.val
def __set_randomize_velocity(msj, cfg, model):
"""
"""
if (is_minimize(msj)):
return
desmond_exec = get_exec(msj, cfg)
if (isinstance(msj.randomize_velocity, sea.Atom)):
if (msj.randomize_velocity.val):
__set_randomize_velocity(get_default_setting(msj), cfg, model)
else:
remove_plugin(desmond_exec, "randomize_velocity")
else:
desmond_exec.plugin.randomize_velocities.first.val = msj.randomize_velocity.first.val
desmond_exec.plugin.randomize_velocities.interval.val = msj.randomize_velocity.interval.val
desmond_exec.plugin.remove_com_motion.first.val = msj.randomize_velocity.first.val
desmond_exec.plugin.remove_com_motion.interval.val = msj.randomize_velocity.interval.val
try:
desmond_exec.plugin.randomize_velocities.seed.val = msj.randomize_velocity.seed.val
except AttributeError:
pass
temp = msj.randomize_velocity.temperature.val
if (isinstance(temp, list)):
desmond_exec.plugin.randomize_velocities.temperature.val = temp[0][
0]
else:
desmond_exec.plugin.randomize_velocities.temperature.val = temp
com_idx = None
if has_plugin(desmond_exec, 'remove_com_motion'):
com_idx = desmond_exec.plugin.list.val.index('remove_com_motion')
add_plugin(desmond_exec, 'randomize_velocities', com_idx)
def __set_simbox(msj, cfg, model):
"""
"""
return
if (is_minimize(msj)):
return
desmond_exec = get_exec(msj, cfg)
if (isinstance(msj.simbox, sea.Atom)):
if (msj.simbox.val):
__set_simbox(config.get_default_setting(msj), cfg, model)
else:
remove_plugin(desmond_exec, "simbox_output")
else:
desmond_exec.plugin.simbox_output.name.val = msj.simbox.name.val
desmond_exec.plugin.simbox_output.first.val = msj.simbox.first.val
desmond_exec.plugin.simbox_output.interval.val = msj.simbox.interval.val
add_plugin(desmond_exec, "simbox_output")
def get_simbox_output_filename(msj):
"""
Returns 'None' if the simbox_output plugin is turned off.
"""
if (is_minimize(msj)):
return None
if (isinstance(msj.simbox, sea.Atom)):
        if (msj.simbox.val):
return get_simbox_output_filename(get_default_setting(msj))
else:
return msj.simbox.name.val
def __set_energy_group(msj, cfg, model):
"""
"""
default_energy_group_setting = sea.Map("""
name = "$JOBNAME$[_replica$REPLICA$]_enegrp.dat"
first = 0.0
interval = 1.2
self_energy = false
corr_energy = true
""")
desmond_exec = get_exec(msj, cfg)
if (isinstance(msj.energy_group, sea.Atom)):
if (msj.energy_group.val):
msj["energy_group"] = default_energy_group_setting
__set_energy_group(msj, cfg, model)
else:
remove_plugin(desmond_exec, "energy_groups")
else:
custom_energy_group_setting = msj.energy_group
msj.energy_group = default_energy_group_setting
msj.energy_group.update(custom_energy_group_setting)
desmond_exec.plugin.energy_groups.name.val = msj.energy_group.name.val
desmond_exec.plugin.energy_groups.first.val = msj.energy_group.first.val
desmond_exec.plugin.energy_groups.interval.val = msj.energy_group.interval.val
eg = desmond_exec.plugin.energy_groups
def set_options(term):
if (term in msj.energy_group):
if (msj.energy_group[term].val):
eg.options.append(term)
else:
try:
eg.options.remove(term)
except ValueError:
pass
set_options("self_energy")
set_options("corr_energy")
        # The energy_groups plugin slows down the simulation.
add_plugin(desmond_exec, "energy_groups")
# if we use the energy groups plugin, we need to tell GCMC that it must
# restore energy groups. we can do this whether or not GCMC is actually
# enabled (ie added to plugin.list)
if 'gcmc' in desmond_exec.plugin:
desmond_exec.plugin.gcmc.restore_engrps.val = True
def __set_pressure_tensor(msj, cfg, model):
"""
Set ptensor plugin and corresponding variables from MSJ pressure_tensor
block.
"""
default_settings = sea.Map("""
name = "$JOBNAME$[_replica$REPLICA$].ptensor.gz"
first = 0.0
interval = 1.2
print_volume = false
""")
desmond_exec = get_exec(msj, cfg)
if (isinstance(msj.pressure_tensor, sea.Atom)):
if (msj.pressure_tensor.val):
msj["pressure_tensor"] = default_settings
__set_pressure_tensor(msj, cfg, model)
else:
remove_plugin(desmond_exec, "ptensor")
else:
custom_settings = msj.pressure_tensor
msj.pressure_tensor = default_settings
msj.pressure_tensor.update(custom_settings)
desmond_exec.plugin.ptensor.name.val = msj.pressure_tensor.name.val
desmond_exec.plugin.ptensor.first.val = msj.pressure_tensor.first.val
desmond_exec.plugin.ptensor.interval.val = msj.pressure_tensor.interval.val
desmond_exec.plugin.ptensor.print_volume.val = msj.pressure_tensor.print_volume.val
add_plugin(desmond_exec, "ptensor")
def __set_dipole_moment(msj, cfg, model):
"""
Set dipole moment plugin and corresponding variables from MSJ block.
"""
default_settings = sea.Map("""
first = 0.0
interval = 1.2
name = "$JOBNAME$[_replica$REPLICA$]_dipole.dat"
""")
desmond_exec = get_exec(msj, cfg)
if (isinstance(msj.dipole_moment, sea.Atom)):
if (msj.dipole_moment.val):
msj["dipole_moment"] = default_settings
__set_dipole_moment(msj, cfg, model)
else:
remove_plugin(desmond_exec, "dipole_moment")
else:
custom_settings = msj.dipole_moment
msj.dipole_moment = default_settings
msj.dipole_moment.update(custom_settings)
desmond_exec.plugin.dipole_moment.name.val = msj.dipole_moment.name.val
desmond_exec.plugin.dipole_moment.first.val = msj.dipole_moment.first.val
desmond_exec.plugin.dipole_moment.interval.val = msj.dipole_moment.interval.val
add_plugin(desmond_exec, "dipole_moment")
def get_energy_group_output_filename(msj):
"""
Returns 'None' if the energy_groups plugin is turned off.
"""
if (isinstance(msj.energy_group, sea.Atom)):
if (msj.energy_group.val):
return get_energy_group_output_filename(get_default_setting(msj))
else:
return msj.energy_group.name.val if (
msj.time.val > msj.energy_group.first.val) else None
def get_fep_output_filename(msj):
"""
"""
if (not is_minimize(msj)):
try:
if (msj.time.val > msj.fep.output.first.val):
return msj.fep.output.name.val
except AttributeError:
pass
# For minimization
def __set_max_steps(msj, cfg, model):
"""
"""
cfg.minimize.maxsteps.val = msj.max_steps.val
def __set_convergence(msj, cfg, model):
"""
"""
cfg.minimize.tol.val = msj.convergence.val
def __set_steepest_descent_steps(msj, cfg, model):
"""
"""
cfg.minimize.sdsteps.val = msj.steepest_descent_steps.val
def __set_num_vector(msj, cfg, model):
"""
"""
cfg.minimize.m.val = msj.num_vector.val
def __set_backend(msj, cfg, model):
"""
"""
if ("backend" in msj and isinstance(msj.backend, sea.Map)):
cfg.update(msj.backend.dval)
# For remd
def gen_temperature_for_solute_tempering(setting,
model,
base_temp,
asl,
printer=None):
"""
"""
from . import remd
if (model is None):
print(
"WARNING: Cannot predict REMD temperature ladder without model system."
)
return [
300,
400,
500,
600,
700,
800,
]
if ("highest_temperature" in setting and "exchange_probability" in setting):
top_temp = setting.highest_temperature.val
temp, prob = remd.predict_with_temp_and_exch((
base_temp,
top_temp,
), setting.exchange_probability.val, model, asl)
if ("highest_temperature" in setting and "n_replica" in setting):
top_temp = setting.highest_temperature.val
temp, prob = remd.predict_with_temp_and_nreplica((
base_temp,
top_temp,
), setting.n_replica.val, model, asl)
if ("exchange_probability" in setting and "n_replica" in setting):
temp, prob = remd.predict_with_nreplica_and_exch(
setting.n_replica.val, setting.exchange_probability.val, base_temp,
model, asl)[0]
if (printer):
printer("Temperature ladder (exchange probability):")
for t, p in zip(temp, prob):
printer(" %.3g (%.3g)" % (
t,
p,
))
printer(" %.3g (n/a)" % temp[-1])
return temp
def gen_replica(msj, model=None):
"""
"""
replica = msj.replica
generator = replica.generator.val
if (generator in [
"solute_tempering",
"parallel_tempering",
]):
ref_temp = msj.temperature.val if (isinstance(
msj.temperature, sea.Atom)) else msj.temperature[0][0].val
ref_temp = float(ref_temp)
asl = replica.atom.val if (generator == "solute_tempering") else "all"
if (isinstance(replica.temperature, sea.Map)):
rep_temp = gen_temperature_for_solute_tempering(
replica.temperature, model, ref_temp, asl)
else:
rep_temp = replica.temperature.val
replica = sea.List()
if (ref_temp not in rep_temp):
print(
"WARNING: Reference temperature (%f) not found in replicas (%s)"
% (
ref_temp,
str(rep_temp),
))
if (generator == "solute_tempering"):
rep_fname = sea.Atom(
"$JOBNAME_solute_tempering_replica%d-in.cms").val
for i, e in enumerate(rep_temp):
fname = rep_fname % i
if (model):
from . import remd
remd.write_ff_solute_tempering(model, fname, asl,
old_div(ref_temp, e))
replica.append(sea.Map("model_file = %s" % fname))
else:
for e in rep_temp:
replica.append(sea.Map("temperature = %.3g model_file = ?" %
e))
else:
raise ValueError("Unknown replica generator: %s" %
replica.generator.val)
return replica
def __set_replica(msj, cfg, model):
"""
"""
msj = deepcopy(msj)
replica_msj = []
if (is_remd(msj)):
replica = msj.replica
if (isinstance(replica, sea.Map)):
replica = gen_replica(msj, model)
if (not isinstance(replica, sea.List)):
raise ValueError("Unknown type for 'replica' parameter: %s" %
type(replica))
for e in replica:
msj_ = deepcopy(msj)
msj_.update(e)
replica_msj.append(msj_)
return (
msj,
replica_msj,
)
def get_replica_setting(msj):
"""
"""
ref, replica = __set_replica(msj, None, None)
return replica
def __set_model_file(msj, cfg, model):
"""
"""
if ("model_file" in msj):
desmond_exec = get_exec(msj, cfg)
desmond_exec.plugin.maeff_output["bootfile"] = msj.model_file.val
def __attach_msj(msj, cfg, model):
"""
"""
cfg["ORIG_CFG"] = msj
def __set_restrain(msj, cfg, model):
# only concatenated stages need this
pass
def __set_restraints(msj, cfg, model):
# only concatenated stages need this
pass
def __set_box(msj, cfg, model):
pass
def concatenate_cfgs(simulate_cfgs, base_cfg):
"""
Create a single backend config file from a list of simulate configs created
from the simulate block of a concatenate front-end config and the backend
config created from the rest of the concatenate front-end config.
:param simulate_cfgs: The list of backend config files created from the
simulate block of a concatenate front-end config
:type simulate_cfgs: `list` of `sea.Map`
:param base_cfg: The backend config file created from a concatenate
front-end config, with its simulate block removed.
:type base_cfg: `sea.Map`
:return: A copy of the base concatenate config with the information from the
simulate configs merged in.
:rtype: `sea.Map`
"""
base_cfg = deepcopy(base_cfg)
integrator = base_cfg.integrator
concat = sea.Map()
concat.sequence = sea.List()
first_restrain = None
posre_scaling = None
for i, simulate_cfg in enumerate(simulate_cfgs):
# we get a copy of the integrator that was modified and add it to the
# concatenator
simulate_integrator = simulate_cfg.integrator
this_integrator_type = simulate_integrator.type.val
this_integrator = simulate_integrator[this_integrator_type]
this_integrator_name = 'Stage_{}_'.format(i) + this_integrator_type
# copy default values from config's integrator level to each
# integrator-specific level
for cfg_attr in [
'dt', 'respa', 'temperature', 'pressure', 'thermostat',
'barostat', 'posre_scaling'
]:
if cfg_attr in simulate_integrator:
this_integrator[cfg_attr] = simulate_integrator[cfg_attr]
restraints = simulate_cfg.restraints
# not a valid part of backend config, just temporary attribute
del simulate_cfg.restraints
first_restrain, posre_scaling = set_relative_restrain_scaling(
first_restrain, posre_scaling, restraints, this_integrator)
# This updates values in base_cfg to be equal to the value of the last
# simulate cfg on which it is set.
# TODO This allows silent conflicts to be overwritten unseen. We should
# replace this with consistency checks, or refactor and use the
# consistency checks from cmj.get_concat_stages here
# Note: This will overwrite mdsim.last_time, which will
# be restored below.
for key in list(simulate_cfg):
if key not in ['integrator', 'ORIG_CFG', 'boot']:
base_cfg[key].update(simulate_cfg[key])
concat[this_integrator_name] = this_integrator
desmond_exec = get_exec(simulate_cfg.ORIG_CFG, simulate_cfg)
this_time = desmond_exec.last_time.val
# The concatenate stage runs cyclically.
# If the total concatenate stage simulation time is
# the sum of the sub-stage times, the last step will be
# one step run for the first sub-stage. This can cause
# stability problems (DESMOND-8606). Modify the last
# stage so it runs a little longer, preventing this issue.
if i == len(simulate_cfgs) - 1:
# Must be at least 1 timestep, larger is fine.
# Only 1 timestep of this is run, the rest is cut-off
# by the last_time.
this_time += 1.0
sequence_info = sea.Map(
dict(name=this_integrator_name,
time=this_time,
type=this_integrator_type))
concat.sequence.append(sequence_info)
integrator.Concatenator = concat
integrator.type.val = 'Concatenator'
    # Restore the total time, which was overwritten in the update above.
__set_time(base_cfg.ORIG_CFG, base_cfg, None)
return base_cfg
def set_relative_restrain_scaling(first_restrain, previous_posre_scaling,
restrain, this_integrator):
"""
Set each integrator's posre_scaling attribute to the ratio between the
desired restraint force constant and the first force constant. This allows
us to modulate the restraint force constant between concatenate stages.
:param first_restrain: the restrain parameter from the first stage's
config, or None if updating the first stage
:type first_restrain: `sea.Map`
:param previous_posre_scaling: posre_scaling parameter from the previous
stage's config, or None if unknown
:type previous_posre_scaling: float or NoneType
:param restrain: this stage's restrain parameter
:type restrain: `sea.Map`
:param this_integrator: this stage's integrator
:type this_integrator: `sea.Map`
:return: the first restrain parameter and posre scaling factor
"""
if first_restrain is None:
assert previous_posre_scaling is None
if restrain.existing.val == 'retain':
raise ValueError('We cannot retain non-existing restraints')
first_restrain = restrain
first_fc = restrain_force_constant(first_restrain)
if 'posre_scaling' in this_integrator:
        # It should never have been set to anything other than 1.0 before this point.
assert this_integrator.posre_scaling.val == 1.
if restrain != first_restrain:
if not first_fc:
            raise ValueError('We cannot change the restraints unless the first '
                             'restraint has a non-zero force constant')
if restrain.existing.val == 'retain':
assert previous_posre_scaling is not None
posre_scaling = previous_posre_scaling
else:
this_fc = restrain_force_constant(restrain)
if this_fc:
msg = cmj.check_restrain_diffs(restrain.new, first_restrain.new)
if msg:
raise ValueError(msg)
posre_scaling = this_fc / first_fc
else:
posre_scaling = 1.
this_integrator['posre_scaling'] = posre_scaling
return first_restrain, posre_scaling
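# For example (illustrative): if the first concatenate stage restrains atoms
# with fc = 50.0 and a later stage uses fc = 5.0 on the same selection, that
# later integrator gets posre_scaling = 5.0 / 50.0 = 0.1.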
def restrain_force_constant(restraints: sea.Map) -> float:
"""
Get restraints force constant.
:param restraints: the restraints block for a given stage
:return: the (first) restraint force constant
"""
if restraints.new:
# we can take the first here because we only allow lists when
# all restrain blocks are equal or none
r = restraints.new[0]
if 'fc' in r:
return r.fc.val
elif 'force_constant' in r:
return r.force_constant.val
else:
return r.force_constants.val
else:
return 0
def __optimize_cfg(msj, cfg, box: Optional[List[float]] = None):
"""
Update the cfg settings based on the simulation box.
If box is None, do nothing.
"""
if box is not None:
optimize_key(msj, cfg, box)
# FIXME: replace optimize_key with forceconfig.
def __clean_cfg(cfg):
"""
"""
app_map = {
"mdsim": [
"PLUGIN_COMMON",
"CHECKPT_COMMON",
"minimize",
"remd",
"vrun",
"reinit",
],
"vrun": [
"PLUGIN_COMMON",
"CHECKPT_COMMON",
"minimize",
"remd",
"mdsim",
"reinit",
],
"reinit": [
"PLUGIN_COMMON",
"CHECKPT_COMMON",
"minimize",
"remd",
"mdsim",
"vrun",
],
"remd": [
"PLUGIN_COMMON",
"CHECKPT_COMMON",
"minimize",
"vrun",
"mdsim",
"reinit",
],
"minimize": [
"PLUGIN_COMMON",
"CHECKPT_COMMON",
"remd",
"vrun",
"mdsim",
"reinit",
],
}
for e in app_map[cfg.app.val]:
if (e in cfg):
del cfg[e]
# Deletes unused plugin settings.
desmond_exec = cfg[cfg.app.val]
used_plugin = desmond_exec.plugin.list.val
used_plugin += [
"list",
]
# for e in desmond_exec.plugin :
# if (e not in used_plugin) :
# del desmond_exec.plugin[e]
def __fix_cfg(msj, cfg):
"""
"""
# Special treatment for annealing
if ("annealing" in msj and msj.annealing.val):
ens_type = cfg.integrator.type.val
ens = cfg.integrator[ens_type]
del cfg.integrator[ens_type]
cfg.integrator.type.val = "anneal"
cfg.integrator["anneal"] = sea.Map("type = " + ens_type)
cfg.integrator.anneal[ens_type] = ens
# Special treatment for force.nonbonded.far
if (cfg.force.nonbonded.far.type.val == "none"):
cfg.force.nonbonded["far"] = "none"
    # Adjusts the remd.interval and remd.deltaE.interval values to be divisible by migration.interval. DESMOND-3600
def justify_interval(val, againstee, description=""):
try:
old_val = val.val
val.val = (int(old_div(val.val, againstee) - 1E-7) + 1) * againstee
# Prints out warning. DESMOND-5196
print(
"NOTE: Adjusted %s to be dividible by migration.interval: %g => %g"
% (
description,
old_val,
val.val,
))
except OverflowError:
pass
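    # For example (illustrative): justify_interval(cfg.remd.interval, 0.018, ...)
    # adjusts a value of 1.2 to 67 * 0.018 = 1.206 ps.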
try:
mi_interval_list = []
for replica in cfg.remd.graph.values():
try:
mi_interval_list.append(replica.migration.interval.val)
except AttributeError:
mi_interval_list.append(cfg.migration.interval.val)
mi_interval = min(mi_interval_list)
for replica in cfg.remd.graph.values():
try:
replica.migration.interval.val = mi_interval
except AttributeError:
# This exception happens when the replica uses the default migration.interval setting and doesn't have its
# own migration.interval data field.
pass
cfg.migration.interval.val = mi_interval
justify_interval(cfg.remd.first, mi_interval, "cfg.remd.first")
justify_interval(cfg.remd.interval, mi_interval, "cfg.remd.interval")
justify_interval(cfg.remd.deltaE.first, mi_interval,
"cfg.remd.deltaE.first")
justify_interval(cfg.remd.deltaE.interval, mi_interval,
"cfg.remd.deltaE.interval")
except AttributeError:
pass
def __msj2cfg_helper(msj, cfg, model, macro):
"""
"""
sea.set_macro_dict(macro)
cfg = sea.Map(DEFAULT_CONFIG) if (cfg is None) else cfg
cfg = cfg.dval
this_mod = globals()
for k in list(msj):
if (k not in [
"replica",
"backend",
]):
this_mod["__set_" + k](msj, cfg, model)
return cfg
def msj2cfg(msj, cfg, model, macro=None):
"""
"""
if 'simulate' in msj: # assumed to be a concatenate config
simulate_cfgs = []
for msj_ in msj.simulate:
simulate_cfg = msj2cfg(msj_, deepcopy(cfg), model, macro)
simulate_cfg.restraints = cmj.get_restraints_xor_convert_restrain(
msj_)
simulate_cfgs.append(simulate_cfg)
base_concat_msj = deepcopy(msj)
del base_concat_msj['simulate']
base_concat_cfg = msj2cfg(base_concat_msj, deepcopy(cfg), model, macro)
return concatenate_cfgs(simulate_cfgs, base_concat_cfg)
macro = macro or {}
msj_ref, msj_replica = __set_replica(msj, cfg, model)
cfg_ref = __msj2cfg_helper(msj_ref, deepcopy(cfg), model, macro)
cfg_replica = []
for i, e in enumerate(msj_replica):
macro["$REPLICA"] = str(i)
cfg_replica.append(__msj2cfg_helper(e, deepcopy(cfg), model, macro))
cfg_ref.app.val = get_exec_name(msj)
__optimize_cfg(msj_ref, cfg_ref, model and model.box)
__set_backend(msj_ref, cfg_ref, model)
cfg_ref = cfg_ref.bval
cfg_rep = []
for m, c in zip(msj_replica, cfg_replica):
c.app.val = get_exec_name(msj)
if m.model_file.val:
if m.has_key('box') and m.box.val is not None: # noqa: W601
# Use cached box info if available
box = m.box.val
else:
# Use StructureReader as it is faster than cms.Cms
rep_ct = StructureReader.read(m.model_file.val)
box = cms.get_box(rep_ct)
__optimize_cfg(m, c, box)
else:
__optimize_cfg(m, c, model and model.box)
__set_backend(m, c, model)
cfg_rep.append(c.bval)
cfg_diff = []
for e in cfg_rep:
changed, referred, added, lost = sea.diff(e, cfg_ref)
change = changed.update(added)
if ("mdsim" in change):
del change["mdsim"]
if ("minimize" in change):
del change["minimize"]
if ("vrun" in change):
del change["vrun"]
if ("reinit" in change):
del change["reinit"]
cfg_diff.append(change)
for i, e in enumerate(cfg_diff):
if cfg_ref.remd.graph.val.get("T%02d" % i):
cfg_ref.remd.graph["T%02d" % i].update(e)
else:
cfg_ref.remd.graph["T%02d" % i] = e
cfg_ref.remd.graph.edges.nodes.append("T%02d" % i)
__clean_cfg(cfg_ref)
__fix_cfg(msj, cfg_ref)
__attach_msj(msj, cfg_ref, model)
return cfg_ref
if ("__main__" == __name__):
import sys
msj = sea.Map(open(sys.argv[1], "r").read())
model = cms.Cms(file=sys.argv[2])
cfg = msj2cfg(msj, None, model, {
"$JOBNAME": "test",
})
print(cfg)
#optimize_key( cfg, model )