import contextlib
import copy
import os
import re
import shutil
import warnings
from pathlib import Path
from typing import Dict
from typing import Generator
from typing import List
from typing import Optional
from typing import Tuple
from typing import Union
import schrodinger.application.desmond.cmj as cmj
import schrodinger.utils.sea as sea
from schrodinger.application.desmond import constants
from schrodinger.application.desmond import license
from schrodinger.application.desmond import stage
from schrodinger.application.desmond import struc
from schrodinger.application.desmond import util
from schrodinger.application.desmond.util import get_msj_filename
from schrodinger.application.desmond.constants import ABSOLUTE_BINDING_LEGS
from schrodinger.application.desmond.constants import FepLegTypes
from schrodinger.application.desmond.constants import CUSTOM_CHARGE_MODE
from schrodinger.application.desmond.constants import \
DEFAULT_LAMBDAS_BY_SCHEDULE
from schrodinger.application.desmond.constants import \
FEP_ABSOLUTE_BINDING_LIGAND
from schrodinger.application.desmond.constants import FEP_STRUC_TAG
from schrodinger.application.desmond.constants import FEP_TYPES
from schrodinger.application.desmond.constants import FRAGMENT_LINKING_JOBS
from schrodinger.application.desmond.constants import \
FRAGMENT_LINKING_SOLVENT_JOBS
from schrodinger.application.desmond.constants import REST_HOTREGION
from schrodinger.application.desmond.constants import REST_PROPERTIES
from schrodinger.application.desmond.constants import SIMULATION_PROTOCOL
from schrodinger.application.desmond.constants import Ensembles
from schrodinger.application.desmond.gcmc_utils import NUM_COPIES_DEFAULT
from schrodinger.application.desmond.multisim import parse
from schrodinger.application.desmond import solubility_utils
from schrodinger.infra import mm
MMSHARE_DIR = os.path.dirname(os.path.dirname(os.environ["MMSHARE_EXEC"]))
DESMOND_DATA_DIR = os.path.join(MMSHARE_DIR, "data", "desmond")
PROTOCOL_FNAME = {
Ensembles.NVT: "desmond_nvt_fep.msj",
Ensembles.NPT: "desmond_npt_fep.msj",
Ensembles.MUVT: "desmond_muvt_fep.msj"
}
# Only these FEP types have ligands in separate cts
CUSTOM_CHARGE_FEP_TYPES = (
FEP_TYPES.SMALL_MOLECULE,
FEP_TYPES.LIGAND_SELECTIVITY,
)
_SIMULATE_STAGE_NAMES = [
stage.Simulate.NAME, stage.LambdaHopping.NAME, stage.ReplicaExchange.NAME
]
_NET_CHARGE_COMPLEX_BUFFER_WIDTH = 8.0
_MIN_CHARGED_SALT_CONC = 0.15
_REST_ASL_TEMPLATE = "asl: atom.%s 1"
_MACRO_CYCLE_OPTION = 'stage[1].set_family.desmond.backend.soft_bond_alpha="0.5"'
_SET_BY_USER_TAG = "setbyuser"
_HMR_TIMESTEPS = [0.004, 0.004, 0.008]
_HMR_MIGRATION_INTERVAL = 0.024
_GCMC_SOLVENT_VDW_SCALE_FACTOR = 0.75
_GCMC_EQUILIBRATION_TOTAL_SOLVENT = NUM_COPIES_DEFAULT + 2000
_MEMBRANE = "membrane"
def _find_stage(stages: List[sea.Map], stage_name: str) -> Generator:
for s in stages:
if s.__NAME__ == stage_name:
yield s
def _set_stage_params(msj: sea.Map, stage_name: str, params: Dict):
"""
Set the params for the first stage that matches the given stage_name.
"""
for s in msj.stage:
if s.__NAME__ == stage_name:
for k, v in params.items():
s.set_value(k, v, tag=_SET_BY_USER_TAG)
break
[docs]class BaseMsjGenerator:
"""
Base class.
"""
[docs] def __init__(self, jobname: str, cd_params: Dict[str, object], **kwargs):
"""
# TODO consider renaming `cpus` to `gpus` or more generic `slots`
:param jobname: The jobname.
:param cd_params: A dictionary with `cpus` set to the number of gpu
slots to use, and 'mps_factor` set to the number of processes per
GPU.
:param forcefield: The name of the forcefield to use.
:type forcefield: str
:param sim_time: The simulation time for the production
stage in ps.
:type sim_time: int
:param sim_time_complex: The simulation time for the complex leg in ps
:type sim_time_complex: int
:param sim_time_solvent: The simulation time for the solvent leg in ps
:type sim_time_solvent: int
:param rand_seed: The random seed.
:type rand_seed: int
:param ensemble: The ensemble, NPT or NVT.
:type ensemble: str
:param buffer_width: The buffer width in Angstrom determines
how much solvent is added to the solute.
This is only used if it is larger than
the default buffer width.
:type buffer_width: float
:param lambda_windows: Number of lambda replicas to use for
the FEP stages.
:type lambda_windows: int
:param custom_charge_mode: Set to CUSTOM_CHARGE_MODE.ASSIGN to assign custom charges.
Set to CUSTOM_CHARGE_MODE.CLEAR to clear custom charges if present, without reassigning.
Set to CUSTOM_CHARGE_MODE.KEEP to keep existing custom charges if present.
:type custom_charge_mode: CUSTOM_CHARGE_MODE
:param h_mass: Set to True to enable hydrogen mass repartitioning
and False to disable it. Default is True.
:type h_mass: bool
:param concatenate: Set to True to concatenate the equilibration
FEP stages. Default is True.
:type concatenate: bool
:param max_walltime: Maximum walltime in seconds before the subjobs
are automatically checkpointed and restarted. The default of 0
means to not set a maximum walltime.
"""
self.jobname = jobname
self.cd_params = cd_params
self._DEFAULTS = {
'forcefield': mm.OPLS_NAME_F16,
'rand_seed': 2014,
'ensemble': Ensembles.NPT,
'buffer_width': 0.0,
'lambda_windows': None,
'custom_charge_mode': CUSTOM_CHARGE_MODE.ASSIGN,
'h_mass': True,
'concatenate': True,
'sim_time': 5000,
'sim_time_complex': None,
'sim_time_solvent': None,
'sim_time_vacuum': None,
'water_model': None, # Using default water model (SPC).
'molecules': stage.DisorderedSystemBuilder.N_MOLECULES,
"skip_legs": [],
'max_walltime': 0,
'ffbuilder': False,
'ff_host': None,
'ff_structure_file': None,
}
kwargs = copy.deepcopy(kwargs)
# Update the defaults
self._DEFAULTS.update(kwargs.pop('defaults', {}))
for k, v in self._DEFAULTS.items():
kwargs.setdefault(k, v)
for t in ['sim_time_complex', 'sim_time_solvent', 'sim_time_vacuum']:
kwargs[t] = kwargs[t] or kwargs['sim_time']
self.sim_params = kwargs
self._check_sim_params()
def _check_sim_params(self):
"""
Raise a ValueError if extra kwargs are specified.
Must be called by each subclass initializer after
setting sim_params and _DEFAULTS.
"""
keys_diff = set(self.sim_params) - set(self._DEFAULTS)
if keys_diff:
raise ValueError(
f'Invalid kwarg(s) specified: {",".join(keys_diff)}')
def _find_production_stages(self, msj: sea.Map) -> List[sea.Map]:
"""
Returns a list of production ``simulate`` or ``lambda_hopping`` stages
in the input.
:param msj: Msj input.
:return: Production simulate or lambda_hopping stage.
"""
_PRODUCTION_TITLE = "production"
prod_stages = []
last_prod_stage = None
for s in msj.stage:
if s.title.val == _PRODUCTION_TITLE:
prod_stages.append(s)
if s.__NAME__ in _SIMULATE_STAGE_NAMES:
last_prod_stage = s
if last_prod_stage is not None and \
last_prod_stage.title.val != _PRODUCTION_TITLE:
prod_stages.append(last_prod_stage)
return prod_stages
def _set_water_model(self, msj: sea.Map):
water = self.sim_params["water_model"]
if water is not None:
water = water.upper()
for s in _find_stage(msj.stage, stage.BuildGeometry.NAME):
s.set_value("solvent", water, tag=_SET_BY_USER_TAG)
for s in _find_stage(msj.stage, stage.AssignForcefield.NAME):
s.set_value("water", water, tag=_SET_BY_USER_TAG)
def _set_hmr(self, msj: sea.Map, only_last_stage: bool = False):
"""
Enable Hydrogen Mass Repartitioning in the simulation msj.
:param msj: Msj to update in place.
:param only_last_stage: Set to True to set HMR for the last production
simulate stage. Default is to set for all
production stages.
"""
for i, s in enumerate(msj.stage):
if s.__NAME__ == stage.AssignForcefield.NAME:
s.set_value("hydrogen_mass_repartition",
True,
tag=_SET_BY_USER_TAG)
break
prod_stages = list(self._find_production_stages(msj))
if only_last_stage:
prod_stages = [prod_stages[-1]]
additional_hmr_stages = self._get_additional_hmr_stages(msj)
for s in prod_stages + additional_hmr_stages:
s.set_value("timestep", _HMR_TIMESTEPS, tag=_SET_BY_USER_TAG)
s["backend"]["migration.interval"] = _HMR_MIGRATION_INTERVAL
s["backend"].add_tag(_SET_BY_USER_TAG)
def _get_additional_hmr_stages(self, msj: sea.Map) -> List[sea.Map]:
"""
Find any non-production stages which should be updated by _set_hmr
:param msj: The msj to search for stages in.
:return: A list of the stages to be udpated
"""
return []
def _set_buffer_width(self, msj: sea.Map, min_buffer_width: float):
"""
Set the buffer width for the simulation, if the custom_buffer width
is larger than the given default and the one in the msj.
:param msj: Msj to update in place.
:param min_buffer_width: Minimum buffer width in Angstrom.
"""
cur_buffer_width = 0
for i, s in enumerate(msj.stage):
if s.__NAME__ in [
stage.BuildGeometry.NAME, stage.SystemBuilder.NAME
]:
cur_buffer_width = s.buffer_width.val
buffer_width = max(cur_buffer_width, min_buffer_width,
self.sim_params["buffer_width"])
s.set_value("buffer_width", buffer_width, tag=_SET_BY_USER_TAG)
break
def _set_scale_solvent_vdw(self, msj: sea.Map, val: float):
# FIXME: Set all geometry builder params in a single function?
for s in _find_stage(msj.stage, stage.BuildGeometry.NAME):
s.set_value("scale_solvent_vdw", val, tag=_SET_BY_USER_TAG)
def _set_remove_overlapped_crystal_water(self, msj: sea.Map, val: bool):
for s in _find_stage(msj.stage, stage.BuildGeometry.NAME):
s.set_value("remove_overlapped_crystal_water",
val,
tag=_SET_BY_USER_TAG)
def _set_extract_structures(self, msj: sea.Map, keep_struc_tags: List[str]):
"""
Set the properties for the first extract_structures stage.
:param msj: Msj to update in place.
:param keep_struc_tags: List of structure tags for structures to keep.
Structure tags are as defined by `FEP_STRUC_TAG`.
"""
for s in _find_stage(msj.stage, stage.ExtractStructures.NAME):
if keep_struc_tags:
s.set_value("keep", keep_struc_tags, tag=_SET_BY_USER_TAG)
break
def _set_production_parameters(self, msj: sea.Map, sim_time: float,
rand_seed: int):
"""
Set the simulation time and random seed for the
production simulations.
:param msj: Msj to update in place.
:param sim_time: Simulation time in ps.
:param rand_seed: Random seed.
"""
self._set_seed(msj, rand_seed)
for s in self._find_production_stages(msj):
s.set_value("time", sim_time, tag=_SET_BY_USER_TAG)
s.set_value("title",
f"{self.sim_params.get('ensemble')}, {int(sim_time)}ps",
tag=_SET_BY_USER_TAG)
def _set_seed(self, msj: sea.Map, rand_seed: int):
"""
Set the random seed for the production stages.
:param msj: Msj to update in place.
:param rand_seed: Random seed.
"""
for s in self._find_production_stages(msj):
s["randomize_velocity"]["seed"] = rand_seed
s["randomize_velocity"].add_tag(_SET_BY_USER_TAG)
def _set_ff(self, msj: sea.Map):
"""
Set the forcefield to use in the simulation.
This updates the assign_forcefield and
disordered_system_builder stages.
:param msj: Msj to update in place.
"""
for s in msj.stage:
if s.__NAME__ == stage.BuildGeometry.NAME:
s.set_value("override_forcefield",
self.sim_params["forcefield"],
tag=_SET_BY_USER_TAG)
if s.__NAME__ in [
stage.AssignForcefield.NAME,
stage.DisorderedSystemBuilder.NAME
]:
s.set_value("forcefield",
self.sim_params["forcefield"],
tag=_SET_BY_USER_TAG)
def _set_permanent_restraint(self, msj: sea.Map, restraints: Dict):
"""
Set (permanent) restraints in the assign forcefield stage.
"""
for s in _find_stage(msj.stage, stage.AssignForcefield.NAME):
s.set_value("restraints", sea.Map(restraints), tag=_SET_BY_USER_TAG)
def _set_dsb_params(self, msj: sea.Map, params: Dict):
"""
Set the params for the disordered_system_builder stage.
"""
_set_stage_params(msj, stage.DisorderedSystemBuilder.NAME, params)
def _set_lambda_windows(self,
msj: sea.Map,
num_windows: Optional[int],
schedule_type: str = 'default'):
"""
Set the number of lambda windows and lambda schedule
to use in the simulation.
:param msj: Msj to update in place.
:param num_windows: Number of lambda windows or None to use default
for the given schedule.
:param schedule_type: Type of lambda schedule, 'default', 'flexible', or 'charge'.
"""
num_windows = num_windows or DEFAULT_LAMBDAS_BY_SCHEDULE[schedule_type]
task_stage = None
for s in _find_stage(msj.stage, 'task'):
task_stage = s
if task_stage is None:
raise RuntimeError("Could not find task stage.")
for stage_name in _SIMULATE_STAGE_NAMES:
task_stage["set_family"][stage_name] = sea.Map(
f'fep.lambda = "{schedule_type}:{num_windows}"')
task_stage.set_family[stage_name].add_tag(_SET_BY_USER_TAG)
def _set_assign_custom_charge(self, msj: sea.Map, mode: CUSTOM_CHARGE_MODE):
"""
Set the assign custom charge stage if
supported by the given forcefield.
:param msj: Msj to update in place.
:param mode: Custom charge mode to use.
"""
custom_charge_assigned = False
for s in msj.stage:
if s.__NAME__ == stage.AssignCustomCharge.NAME:
if mode in [CUSTOM_CHARGE_MODE.KEEP, CUSTOM_CHARGE_MODE.CLEAR]:
s.set_value("mode", mode, _SET_BY_USER_TAG)
continue
# Default to clear for unsupported forcefields
s.set_value("mode", CUSTOM_CHARGE_MODE.CLEAR, _SET_BY_USER_TAG)
forcefield = self.sim_params['forcefield']
ff_version = mm.opls_name_to_version(forcefield)
if ff_version >= 16:
s.set_value("mode", CUSTOM_CHARGE_MODE.ASSIGN,
_SET_BY_USER_TAG)
custom_charge_assigned = True
if not custom_charge_assigned:
for s in msj.stage:
if s.__NAME__ == stage.GenerateSolubilityFepStructures.NAME:
s.set_value("require_custom_charge", False,
_SET_BY_USER_TAG)
def _set_job_num_cpus(self,
jobs: List[sea.List],
num_cpus: int,
num_windows: int,
umbrella: bool = True,
mps_factor: Union[int, str] = 1,
max_walltime: int = 0):
"""
Set the number of cpus on each job. The number
of windows is used to determine the number of
cpus per job.
:param jobs: Jobs to update in place.
:param num_cpus: Number of cpus.
:param num_windows: Number of lambda windows.
:param umbrella: True to enable umbrella mode.
:param mps_factor: The oversubscription factor for CUDA MPS. 1
(default) is no oversubscription, higher values will allocate more
CPU processes and also add QARGs to the command that convey the
number of GPUs requested and the value of the oversubscription. If
mps_factor == `auto` the value will be determined from `num_cpus`
and `num_windows`
:param max_walltime: Maximum walltime in seconds before the subjobs are
automatically checkpointed and restarted. The default of 0 means to
not set a maximum walltime.
"""
num_cpus_arg = max(num_cpus // num_windows, 1)
if mps_factor == 'auto':
mps_factor = self._get_auto_mps_factor(num_cpus, num_windows)
def update(key, value, cmd):
key = sea.Atom(key)
if key in cmd:
cmd[cmd.index(key) + 1] = sea.Atom(value)
else:
cmd.extend([key, value])
for cmd in jobs:
subhost_ncpu = f"$SUBHOST:{num_cpus * mps_factor}"
update("-HOST", subhost_ncpu, cmd)
update("-cpu", str(num_cpus_arg), cmd)
if umbrella:
update("-mode", "umbrella", cmd)
if max_walltime:
update("-max-walltime", str(max_walltime), cmd)
if mps_factor > 1:
# in mps mode, the number of GPUs requested (mps_gpu) is equal
# to the number of slots (cpus) requested, and the actual number
# of cpus requested will be multiplied by the oversubscription
# factor. MPS=1 instructs the queue to create MPS daemons. (it
# is possible to submit non-mps jobs to the mps queue using
# mps_gpu as the resource, so we must pass MPS as well)
# TODO this code is queue-specific and should be removed when
# we have support for MPS in jobcontrol (JOBCON-5748)
update("-QARG", f"-l mps_gpu={num_cpus} -v MPS={mps_factor}",
cmd)
def _get_auto_mps_factor(self, num_cpus, num_windows):
# automatically determine smart value of oversubscription from ppj and
# lambda windows
if num_cpus >= num_windows:
warnings.warn(
'WARNING: There are not enough processors per job to make'
'use of MPS oversubscription.')
return 1
# search for oversubscription such that ppj * oversubscription
# evenly divides lambda windows. In order of preferability.
for oversub_factor in (4, 3, 2, 5):
if not num_windows % (num_cpus * oversub_factor):
break
else:
def round_up_to_int(a, b):
return ((a + b - 1) // b) * b
# find value of oversubscription that results in the least
# number of effective processes. Using MPS tends to give a 25%+
# speedup over single process version, so we penalize the single
# process effective processes by that amount
best_oversub = 1, round(1.25 *
round_up_to_int(num_windows, num_cpus) + .5)
# omit 5, it's likely only preferable if it evenly divides
for test_factor in (4, 3, 2):
effective_num_processes = round_up_to_int(
num_windows, num_cpus * test_factor)
if effective_num_processes < best_oversub[1]:
best_oversub = test_factor, effective_num_processes
oversub_factor = best_oversub[0]
if oversub_factor == 1:
warnings.warn(
'WARNING: No sensible value of MPS oversubscription '
'could be found based on the lambda_windows and ppj '
'values, running without MPS.')
return oversub_factor
def _set_compress_cms(self, msj: sea.Map):
"""
Set up a job to compress intermediate cms files.
:param msj: Msj to update in place.
"""
for s in msj.stage:
if s.__NAME__ in _SIMULATE_STAGE_NAMES:
name = s.get_value("maeff_output.name").raw_val
if not name.endswith('.gz'):
name += '.gz'
s.set_value("maeff_output.name", name, tag=_SET_BY_USER_TAG)
def _set_ffbuilder(self, msj: sea.Map):
"""
Set up the ffbuilder stage for the top level msj.
:param msj: Msj to update in place.
"""
_set_stage_params(
msj, stage.ForcefieldBuilderLauncher.NAME, {
'should_skip': False,
'host': self.sim_params['ff_host'],
'structure_file': self.sim_params['ff_structure_file']
})
def _get_msj_filename(
self,
leg: Optional[FepLegTypes] = None,
protocol: Optional[SIMULATION_PROTOCOL] = None) -> str:
return get_msj_filename(jobname=self.jobname,
leg=leg,
protocol=protocol)
[docs]class BaseFepMsjGenerator(BaseMsjGenerator):
def _set_is_for_fep(self, msj: sea.Map):
"""
Set the is for fep option.
:param msj: Msj to update in place.
"""
for s in msj.stage:
if s.__NAME__ in _SIMULATE_STAGE_NAMES + [
stage.Concatenate.NAME, stage.ReplicaExchange.NAME
]:
s.set_value("backend.is_for_fep", True, tag=_SET_BY_USER_TAG)
def _set_print_expected_memory(self, msj: sea.Map):
for s in msj.stage:
if s.__NAME__ in _SIMULATE_STAGE_NAMES + [
stage.Concatenate.NAME, stage.ReplicaExchange.NAME
]:
s.set_value("print_expected_memory", True, tag=_SET_BY_USER_TAG)
break
def _set_fep_type(self, msj: sea.Map):
"""
Set the fep_type in the input msj.
:param msj: Msj to update in place.
"""
last_task = None
for s in msj.stage:
# only use last task, as generic tasks may be appended before
# (for example in protein fep)
if s.__NAME__ == "task":
last_task = s
elif s.__NAME__ == "fep_analysis":
s.set_value("fep_type", self.fep_type, _SET_BY_USER_TAG)
if last_task:
last_task.set_value("set_family.simulate.fep.type", self.fep_type,
_SET_BY_USER_TAG)
def _set_fep_analysis(self, msj: sea.Map, include_sid: bool):
_set_stage_params(msj, stage.FepAnalysis.NAME,
{'include_sid': include_sid})
def _set_lambda_hopping(self,
msj: sea.Map,
sim_time: float,
rest_property: str = ''):
"""
Set the lambda hopping specific parameters in the simulation msj.
:param msj: Msj to update in place.
:param rest_property: Name of the ct property for custom REST scaling.
Default of empty str means to not set REST scaling.
"""
for production_stage in self._find_production_stages(msj):
# Renames the last simulate stage to lambda_hopping. If it is
# already lambda_hopping, we just assign the same name back to
# itself. It doesn't matter.
production_stage.__NAME__ = stage.LambdaHopping.NAME
if rest_property:
production_stage["solute_tempering"] = sea.Map("""
atom = "%s"
temperature = {
generator = PvdS
exchange_probability = 0.3
}
""" % (self.asl % rest_property))
production_stage.solute_tempering.add_tag(_SET_BY_USER_TAG)
self._set_production_parameters(msj, sim_time,
self.sim_params['rand_seed'])
[docs]class MapperMsjGenerator(BaseFepMsjGenerator):
"""
"""
[docs] def __init__(self, jobname, cd_params, **kwargs):
"""
In addition to the parameters passed to `BaseMsjGenerator`, the
`MapperMsjGenerator` supports the following.
:param atom_mapping: The SMARTS string used to determine the
core for atom mapping.
Default is to determine the atom mapping
automatically.
:param align_core_only: Do not adjust the non-core atoms when aligning the core atoms.
Default is False.
:param ats: (deprecated) Set to True to enable automated torsion scaling. Default is False.
:param charged_lambda_windows: Set the number of lambda windows used for charged perturbations.
Default is 24.
:param core_hopping_lambda_windows: Set the number of lambda windows used for core-hopping perturbations.
Default is 16.
:param membrane: Set to the filename containing the membrane structures.
Default is None, there is no membrane.
:param salt_concentration: Concentration of ions to add in M.
Default is 0.0, don't add any salt to non-charged protocols.
:param modify_dihe: Set to True to modify retained dihedral angle
interactions for customized core. Default is False.
:param minimize_volume: Set to True to turn on volume minimization
during the system_builder or build_geometry
stages. Default is False.
"""
_DEFAULTS = {
'atom_mapping': "",
'align_core_only': False,
'ats': False,
'charged_lambda_windows': None,
'core_hopping_lambda_windows': None,
'membrane': None,
'salt_concentration': 0.0,
'modify_dihe': False,
'minimize_volume': False,
}
kwargs.setdefault('defaults', {})
kwargs['defaults'].update(_DEFAULTS)
super(MapperMsjGenerator, self).__init__(jobname, cd_params, **kwargs)
self.fmp_fname = ""
# REST property name changes with different simulation leg
self.asl = _REST_ASL_TEMPLATE
self.solvent_box_buffer_width = 10.0
self.fep_type = FEP_TYPES.SMALL_MOLECULE
# Paths to template msj files
# May be modfied by subclasses to change the template in use.
self.from_main_msj = os.path.join(DESMOND_DATA_DIR,
"leadoptmap_launcher.msj")
self.from_subjob_msj = os.path.join(
DESMOND_DATA_DIR, PROTOCOL_FNAME[self.sim_params['ensemble']])
[docs] @contextlib.contextmanager
def patch_ensemble(self):
"""
A context manager to temporarily change the ensemble and subjob_msj
template for this instance to 'NPT' if the ensemble is 'muVT'.
NOTE: This is useful to allow the complex and solvent legs to have
different ensembles, but in the future this should be replaced with code
that natively supports different ensembles for the complex and solvent
leg.
"""
if self.sim_params['ensemble'] == Ensembles.MUVT:
self.sim_params['ensemble'] = Ensembles.NPT
self.from_subjob_msj = os.path.join(
DESMOND_DATA_DIR, PROTOCOL_FNAME[self.sim_params['ensemble']])
yield
self.sim_params['ensemble'] = Ensembles.MUVT
self.from_subjob_msj = os.path.join(
DESMOND_DATA_DIR, PROTOCOL_FNAME[self.sim_params['ensemble']])
else:
yield
[docs] def generate_main_msj(self):
"""
"""
msj_fname = self._get_msj_filename()
# 'raw is an object of sea.Sea (or its subclass).
raw = cmj.msj2sea_full(msj_fname)
self.modify_graph_stage(raw)
if self.sim_params['membrane']:
self._insert_stage_membrane_launcher(raw)
if self.sim_params['ffbuilder']:
self._set_ffbuilder(raw)
self.modify_fep_launcher_stage(raw)
cmj.write_sea2msj(raw.stage, msj_fname)
[docs] def generate_complex_msj(self, net_charge, ch=False, **kwargs):
buffer_width = 5.0
if net_charge:
buffer_width = _NET_CHARGE_COMPLEX_BUFFER_WIDTH
return self.generate_subjob_msj(buffer_width,
is_complex_leg=True,
net_charge=net_charge,
ch=ch,
**kwargs)
[docs] def generate_solvent_msj(self, net_charge, ch=False, **kwargs):
with self.patch_ensemble():
return self.generate_subjob_msj(self.solvent_box_buffer_width,
is_complex_leg=False,
net_charge=net_charge,
ch=ch,
**kwargs)
[docs] def generate_subjob_msj(self,
buffer_width,
is_complex_leg=False,
net_charge=False,
ch=False,
keep_struc_tags=None,
treat_solvent_like_complex=False,
**kwargs):
"""
:param keep_struc_tags: List of structure tags for structures to keep.
Structure tags are as defined by `FEP_STRUC_TAG`.
`None` means to use the default set of tags for
this fep type.
"""
with open(self.from_subjob_msj) as fh:
subjobmsj_contents = fh.read()
if keep_struc_tags is None:
if is_complex_leg:
keep_struc_tags = [
FEP_STRUC_TAG.VAL.RECEPTOR, FEP_STRUC_TAG.VAL.MEMBRANE,
FEP_STRUC_TAG.VAL.SOLVENT, FEP_STRUC_TAG.VAL.LIGAND
]
else:
keep_struc_tags = [FEP_STRUC_TAG.VAL.LIGAND]
subjobmsj_contents = re.sub(
"buffer_width *= *5.0",
" buffer_width = {bf}".format(bf=buffer_width),
subjobmsj_contents,
count=1)
return self._set_subjob_msj(
subjobmsj_contents,
buffer_width,
net_charge,
core_hopping=ch,
is_complex_leg=is_complex_leg,
keep_struc_tags=keep_struc_tags,
treat_solvent_like_complex=treat_solvent_like_complex,
**kwargs)
[docs] def generate_membrane_msj(self):
msj_fname = Path(DESMOND_DATA_DIR, "desmond_membrane_fep.msj")
msj = cmj.msj2sea_full(msj_fname)
self._set_is_for_fep(msj)
mode = self.sim_params['custom_charge_mode']
if mode == CUSTOM_CHARGE_MODE.ASSIGN and self.fep_type not in CUSTOM_CHARGE_FEP_TYPES:
# If fep_type does not support custom charges, keep them
mode = CUSTOM_CHARGE_MODE.KEEP
self._set_assign_custom_charge(msj, mode)
return msj.stage
[docs] def write_main_msj(self):
"""
"""
msj_fname = self._get_msj_filename()
shutil.copyfile(self.from_main_msj, msj_fname)
self.generate_main_msj()
return msj_fname
[docs] def write_subjob_msjs(self):
"""
Write the msjs for each leg type. Subclasses which run additional legs
should override and write the additional msjs
"""
self.write_solvent_msj()
self.write_complex_msj()
if self.sim_params['membrane']:
self.write_membrane_msj()
[docs] def write_complex_msj(self):
cmj.write_sea2msj(
self.generate_complex_msj(False, False),
self._get_msj_filename(FepLegTypes.COMPLEX,
SIMULATION_PROTOCOL.DEFAULT))
cmj.write_sea2msj(
self.generate_complex_msj(True, False),
self._get_msj_filename(FepLegTypes.COMPLEX,
SIMULATION_PROTOCOL.CHARGED))
cmj.write_sea2msj(
self.generate_complex_msj(False, True),
self._get_msj_filename(FepLegTypes.COMPLEX,
SIMULATION_PROTOCOL.COREHOPPING))
[docs] def write_solvent_msj(self):
cmj.write_sea2msj(
self.generate_solvent_msj(False, False),
self._get_msj_filename(FepLegTypes.SOLVENT,
SIMULATION_PROTOCOL.DEFAULT))
cmj.write_sea2msj(
self.generate_solvent_msj(True, False),
self._get_msj_filename(FepLegTypes.SOLVENT,
SIMULATION_PROTOCOL.CHARGED))
cmj.write_sea2msj(
self.generate_solvent_msj(False, True),
self._get_msj_filename(FepLegTypes.SOLVENT,
SIMULATION_PROTOCOL.COREHOPPING))
[docs] def write_membrane_msj(self):
cmj.write_sea2msj(self.generate_membrane_msj(),
self._get_msj_filename(_MEMBRANE))
[docs] def modify_fep_launcher_stage(self, raw):
COMPLEX = 0
SOLVENT = 1
MSJ_FNAME = 1
for s in _find_stage(raw.stage, "fep_launcher"):
fep_launcher_stage = s
break
else:
raise Exception("no fep_launcher stage in the template msj file")
num_cpus = int(self.cd_params['cpus'])
mps_factor = self.cd_params['mps_factor']
dispatch_template = copy.deepcopy(
fep_launcher_stage["dispatch"].default)
default_fep = fep_launcher_stage["dispatch"].default
default_fep[COMPLEX][MSJ_FNAME].val = self._get_msj_filename(
FepLegTypes.COMPLEX, SIMULATION_PROTOCOL.DEFAULT)
default_fep[SOLVENT][MSJ_FNAME].val = self._get_msj_filename(
FepLegTypes.SOLVENT, SIMULATION_PROTOCOL.DEFAULT)
num_windows = self.sim_params['lambda_windows'] or 12
self._set_job_num_cpus(default_fep,
num_cpus,
num_windows,
mps_factor=mps_factor,
max_walltime=self.sim_params['max_walltime'])
charge_fep = fep_launcher_stage["dispatch"].charge
charge_fep[COMPLEX][MSJ_FNAME].val = self._get_msj_filename(
FepLegTypes.COMPLEX, SIMULATION_PROTOCOL.CHARGED)
charge_fep[SOLVENT][MSJ_FNAME].val = self._get_msj_filename(
FepLegTypes.SOLVENT, SIMULATION_PROTOCOL.CHARGED)
num_windows = self.sim_params['charged_lambda_windows'] or 16
self._set_job_num_cpus(charge_fep,
num_cpus,
num_windows,
mps_factor=mps_factor,
max_walltime=self.sim_params['max_walltime'])
# formal change on dummy atoms, but no net charge change
charge0_fep = copy.deepcopy(charge_fep)
fep_launcher_stage["dispatch"]["charge0"] = charge0_fep
ch_fep = copy.deepcopy(dispatch_template)
ch_fep[COMPLEX][MSJ_FNAME].val = self._get_msj_filename(
FepLegTypes.COMPLEX, SIMULATION_PROTOCOL.COREHOPPING)
ch_fep[SOLVENT][MSJ_FNAME].val = self._get_msj_filename(
FepLegTypes.SOLVENT, SIMULATION_PROTOCOL.COREHOPPING)
num_windows = (self.sim_params['core_hopping_lambda_windows'] or
DEFAULT_LAMBDAS_BY_SCHEDULE['flexible'])
self._set_job_num_cpus(ch_fep,
num_cpus,
num_windows,
mps_factor=mps_factor,
max_walltime=self.sim_params['max_walltime'])
fep_launcher_stage["dispatch"]["core-hopping"] = ch_fep
macro_ch_fep = copy.deepcopy(ch_fep)
for k in [COMPLEX, SOLVENT]:
macro_ch_fep[k].extend(["-set", _MACRO_CYCLE_OPTION])
fep_launcher_stage["dispatch"]["macrocycle-core-hopping"] = macro_ch_fep
if self.sim_params["skip_legs"]:
fep_launcher_stage.set_value("skip_legs",
self.sim_params["skip_legs"],
tag=_SET_BY_USER_TAG)
def _insert_stage_membrane_launcher(self, raw):
"""
Insert membrane_launcher before the fep_launcher stage
"""
msj_str = cmj.write_sea2msj(raw.stage)
msj = parse(string=msj_str)
fep_launcher_stage = msj.find_stages(stage.FepLauncher.NAME)[0]
membrane_launcher_stage = cmj.msj2sea_full(
None, f"{stage.FepMembraneLauncher.NAME} {{ }}").stage[0]
membrane_launcher_stage.set_value("job", [[
'-HOST',
'$SUBHOST:1',
'-m',
self._get_msj_filename(_MEMBRANE),
'-JOBNAME',
'$JOBNAME',
'-mode',
'umbrella',
'-o',
stage.FepMembraneLauncher.CMS_OUT_FNAME,
]],
tag=_SET_BY_USER_TAG)
# STAGE_INDEX is 1-based but Msj indexing is 0-based
raw.stage.insert(fep_launcher_stage.STAGE_INDEX - 1,
membrane_launcher_stage)
[docs] def modify_graph_stage(self, raw):
"""
"""
for s in _find_stage(raw.stage, "fep_mapper"):
if self.fmp_fname:
s.set_value("graph_file", self.fmp_fname, tag=_SET_BY_USER_TAG)
if membrane_fname := self.sim_params['membrane']:
from schrodinger.application.scisol.packages.fep import fepmae
if not membrane_fname.endswith('.fmp'):
# Get number of environment CTs
fsts = fepmae.filter_structures(
struc.read_all_structures(membrane_fname))
num_env_sts = 1 + bool(fsts.solvent) + bool(fsts.membrane)
s.set_value("receptor", num_env_sts, tag=_SET_BY_USER_TAG)
if self.sim_params['atom_mapping']:
s.set_value("atom_mapping",
self.sim_params["atom_mapping"],
tag=_SET_BY_USER_TAG)
if self.sim_params['align_core_only']:
s.set_value("align_core_only", True, tag=_SET_BY_USER_TAG)
if self.sim_params['ats']:
s.set_value("ats", self.sim_params["ats"], tag=_SET_BY_USER_TAG)
break
else:
raise Exception("No fep_mapper stage in the msj file")
def _set_subjob_msj(self,
subjobmsj_contents,
buffer_width,
net_charge,
core_hopping: bool = False,
is_complex_leg: bool = False,
keep_struc_tags: List[str] = None,
treat_solvent_like_complex: bool = False):
"""
:param keep_struc_tags: List of structure tags for structures to keep.
Structure tags are as defined by `FEP_STRUC_TAG`.
`None` means to use the default set of tags for
this fep type.
"""
raw = cmj.msj2sea_full(None, msj_content=subjobmsj_contents)
sim_time = self.sim_params['sim_time_complex'] if is_complex_leg else \
self.sim_params['sim_time_solvent']
self._set_water_model(raw)
if self.sim_params['modify_dihe']:
self._set_modify_dihedral(raw)
if self.sim_params['h_mass']:
self._set_hmr(raw)
if self.sim_params['minimize_volume']:
self._set_minimize_volume(raw)
self._set_buffer_width(raw, min_buffer_width=buffer_width)
rest_property = REST_PROPERTIES.COMPLEX_HOTREGION if is_complex_leg else REST_PROPERTIES.SOLVENT_HOTREGION
self._set_lambda_hopping(raw, sim_time, rest_property)
self._set_ff(raw)
if net_charge:
self._set_lambda_windows(raw,
self.sim_params['charged_lambda_windows'],
'charge')
elif core_hopping:
self._set_lambda_windows(
raw, self.sim_params['core_hopping_lambda_windows'], 'flexible')
else:
self._set_lambda_windows(raw, self.sim_params['lambda_windows'])
if self.sim_params['salt_concentration'] or net_charge:
self._set_net_charge_box(raw, net_charge=net_charge)
if self.sim_params.get('membrane') and (is_complex_leg or
treat_solvent_like_complex):
self._set_box_parameters(raw)
self._set_fep_type(raw)
mode = self.sim_params['custom_charge_mode']
if mode == CUSTOM_CHARGE_MODE.ASSIGN and self.fep_type not in CUSTOM_CHARGE_FEP_TYPES:
# If fep_type does not support custom charges, keep them
mode = CUSTOM_CHARGE_MODE.KEEP
self._set_assign_custom_charge(raw, mode)
self._set_extract_structures(raw, keep_struc_tags)
if self.sim_params['ensemble'] == Ensembles.MUVT:
self._set_gcmc(raw)
self._set_compress_cms(raw)
if self.sim_params['concatenate']:
new_raw = cmj.concatenate_relaxation_stages(raw)
if new_raw is not None:
raw = new_raw
self._set_print_expected_memory(raw)
return raw.stage
def _set_net_charge_box(self,
msj: sea.Map,
make_alchemical_water: bool = True,
add_alchemical_ions: Optional[Dict[str,
bool]] = None,
net_charge: bool = True):
"""
Set the net charge box including ions in the simulation msj.
:param msj: Msj to update in place.
:param make_alchemical_water: Set the `make_alchemical_water` parameter
of the assign_forcefield stage.
:param add_alchemical_ions: Set the `add_alchemical_ions` parameter of
the build_geometry stage. This is only used by absolute binding FEP.
"""
assert not (make_alchemical_water and add_alchemical_ions)
build_geometry = next(_find_stage(msj.stage, stage.BuildGeometry.NAME))
build_geometry.set_value("box_shape", "cubic", tag=_SET_BY_USER_TAG)
if add_alchemical_ions:
build_geometry.set_value("add_alchemical_ions",
sea.Map(add_alchemical_ions),
tag=_SET_BY_USER_TAG)
assign_forcefield = next(
_find_stage(msj.stage, stage.AssignForcefield.NAME))
assign_forcefield.set_value("make_alchemical_water",
make_alchemical_water,
tag=_SET_BY_USER_TAG)
self._set_neutralize_box(msj, net_charge=net_charge)
def _set_neutralize_box(self, msj: sea.Map, net_charge: bool = True):
"""
Set the box to neutralize ions.
:param msj: Msj to update in place.
"""
build_geometry = next(_find_stage(msj.stage, stage.BuildGeometry.NAME))
build_geometry.set_value("neutralize_system",
"true",
tag=_SET_BY_USER_TAG)
build_geometry["salt"] = sea.Map()
build_geometry.salt["negative_ion"] = "Cl"
build_geometry.salt["positive_ion"] = "Na"
if net_charge:
build_geometry.salt["concentration"] = max(
_MIN_CHARGED_SALT_CONC, self.sim_params["salt_concentration"])
else:
build_geometry.salt["concentration"] = self.sim_params[
"salt_concentration"]
build_geometry.salt.add_tag(_SET_BY_USER_TAG)
def _set_modify_dihedral(self, msj: sea.Map):
"""
Set the modify dihedral parameter in the simulation msj.
:param msj: Msj to update in place.
"""
for i, s in enumerate(msj.stage):
if s.__NAME__ == stage.AssignForcefield.NAME:
s.set_value("fep_enhance_sampling_dihedral",
True,
tag=_SET_BY_USER_TAG)
break
def _set_minimize_volume(self, msj: sea.Map):
"""
Set the minimize volume parameter in the simulation msj.
:param msj: Msj to update in place.
"""
for i, s in enumerate(msj.stage):
if s.__NAME__ in [
stage.BuildGeometry.NAME, stage.SystemBuilder.NAME
]:
s.set_value("minimize_volume", "true", tag=_SET_BY_USER_TAG)
break
def _set_box_parameters(self,
msj: sea.Map,
box: Optional[Tuple[float]] = None):
"""
Set the build geometry parameters given the optional box information.
:param msj: Msj to update in place.
:param box: If provided, the absolute orthorhombic box parameters.
Otherwise, set up the build_geometry stage to read in the box
from the input structure.
"""
for s in _find_stage(msj.stage, stage.BuildGeometry.NAME):
s.set_value("solvate_system", False, tag=_SET_BY_USER_TAG)
s.set_value("distil_solute", True, tag=_SET_BY_USER_TAG)
s.set_value("rezero_system", False, tag=_SET_BY_USER_TAG)
del s["box_shape"]
del s["buffer_width"]
if box is None:
s["box"] = sea.Atom("read")
else:
s["box"] = sea.Map("""
shape = "orthorhombic"
size_type = "absolute"
size = [%s %s %s]
""" % box)
s.box.add_tag(_SET_BY_USER_TAG)
break
def _get_additional_hmr_stages(self, msj: sea.Map) -> List[sea.Map]:
"""
Here we return the GCMC equilibration stages, since they need to use
the same timestep as the production stage as the chemical potential
depends slightly on the timestep.
See super method for additional documentation.
"""
return self._find_gcmc_equil_stages(msj)
def _find_gcmc_equil_stages(self, msj):
gcmc_equil_stages = []
prod_stages = set(self._find_production_stages(msj))
for s in msj.stage:
if (s.__NAME__ in [stage.Simulate.NAME, stage.Concatenate.NAME] and
s not in prod_stages):
if "gcmc" in s.keys(tag=_SET_BY_USER_TAG):
gcmc_equil_stages.append(s)
return gcmc_equil_stages
def _set_gcmc_stage_params(self, stg: sea.Map, production=False):
"""
Enable GCMC and set the random seed, chemical potential, and solvent
density parameters on a simulate stage based on `self.sim_params`.
:param stg: A map representing a simulate stage
"""
rand_seed = self.sim_params['rand_seed']
water = self.sim_params['water_model']
water = water or 'SPC'
water = water.upper().replace('-', '')
timestep = int(stg.timestep.val[0] * 1000)
try:
potential, density = constants.GCMC_CHEMICAL_POTENTIALS[(water,
timestep)]
except KeyError:
raise ValueError("ERROR: No calibrated GCMC chemical potential "
"found for this water model: "
f"{water} at a {timestep} fs timestep.")
# set explicit seed for reproducibility at FEP+ level.
# need to change type from string to int so use new Atom object
stg.gcmc.seed.update(sea.Atom(rand_seed), tag=_SET_BY_USER_TAG)
# set chemical potential and density according to water model
stg.gcmc.mu_excess.update(potential, tag=_SET_BY_USER_TAG)
stg.gcmc.solvent_density.update(density, tag=_SET_BY_USER_TAG)
# production stage gcmc params are not defined by template and need to
# be updated
if not production:
# we need extra solvent copies during equilibration as the output of
# system builder tends to be under-solvated.
# TODO this number is probably less now with changes to vdw scaling
stg.gcmc.solvent.num_copies.update(
_GCMC_EQUILIBRATION_TOTAL_SOLVENT)
# The gcmc block is always defined, but if it is not
# explicitly set, it will be set to none by the stage.
# This is because a normal simulate stage is 'gcmc enabled'
# but may not use gcmc.
# The user can pass
# simulate {
# gcmc = {
# }
# }
# for gcmc behavior, otherwise it will not be used.
# So the _SET_BY_USER_TAG mediates if gcmc is enabled.
stg.gcmc.add_tag(_SET_BY_USER_TAG, propagate=False)
def _set_gcmc(self, msj: sea.Map):
"""
Configure the GCMC equilibration and production stages
:param msj: The top level msj content.
"""
self._set_scale_solvent_vdw(msj, _GCMC_SOLVENT_VDW_SCALE_FACTOR)
self._set_remove_overlapped_crystal_water(msj, True)
gcmc_equil_stages = self._find_gcmc_equil_stages(msj)
for s in gcmc_equil_stages:
self._set_gcmc_stage_params(s)
production_stages = self._find_production_stages(msj)
for production_stage in production_stages:
self._set_gcmc_stage_params(production_stage, production=True)
def _set_restraint_retain(self, msj: sea.Map):
"""
Update the simulate stages to retain the restraints
:param msj: Msj to update in place.
"""
for stage_name in _SIMULATE_STAGE_NAMES:
for s in _find_stage(msj.stage, stage_name):
s.set_value("print_restraint", False, _SET_BY_USER_TAG)
try:
existing = s.get_value("restraints.existing").val
except KeyError:
existing = "ignore"
if existing == "ignore":
s.set_value("restraints.existing", "ignore_posre",
_SET_BY_USER_TAG)
def _insert_stage_load_restraints_from_structure(self, msj: sea.Map):
"""
Insert the load_restraints_from_structure stage after the
assign_forcefield stage.
:param msj: Msj to update in place.
"""
for idx, s in enumerate(msj.stage):
if s.__NAME__ == stage.AssignForcefield.NAME:
break
load_restraints_msj = cmj.msj2sea_full(
None, msj_content=f'{stage.LoadRestraintsFromStructure.NAME} {{}}')
msj.stage.insert(idx + 1, load_restraints_msj.stage[0])
[docs]class SmallMoleculeMsjGenerator(MapperMsjGenerator):
[docs] def modify_fep_launcher_stage(self, raw):
super().modify_fep_launcher_stage(raw)
for s in _find_stage(raw.stage, stage.FepLauncher.NAME):
fep_launcher_stage = s
break
ch_fep = fep_launcher_stage["dispatch"][SIMULATION_PROTOCOL.COREHOPPING]
msj_fname_idx = ch_fep[0].val.index('-m') + 1
complex_idx = 0 if 'complex' in ch_fep[0][msj_fname_idx].val else 1
solvent_idx = 1 if complex_idx == 0 else 0
# The fragment linking complex stage is the same as core-hopping.
# There are three custom solvent stages, which are based on
# the core-hopping stage.
fl_fep = copy.deepcopy(ch_fep)
fl_fep[complex_idx][msj_fname_idx].val = self._get_msj_filename(
FepLegTypes.COMPLEX, SIMULATION_PROTOCOL.FRAGMENT_LINKING)
fl_solvent_orig = copy.deepcopy(fl_fep[solvent_idx])
del fl_fep[solvent_idx]
for job in FRAGMENT_LINKING_SOLVENT_JOBS:
fl_solvent = copy.deepcopy(fl_solvent_orig)
fl_solvent[msj_fname_idx].val = self._get_msj_filename(
job, SIMULATION_PROTOCOL.FRAGMENT_LINKING)
output_idx = fl_solvent.val.index('-o') + 1
fl_solvent[output_idx].val = f'$MAINJOBNAME_$JOBTAG_{job}-out.mae'
jobname_idx = fl_solvent.val.index('-JOBNAME') + 1
fl_solvent[jobname_idx].val = f'$MAINJOBNAME_$JOBTAG_{job}'
fl_fep.append(fl_solvent)
fep_launcher_stage["dispatch"][
SIMULATION_PROTOCOL.FRAGMENT_LINKING] = fl_fep
[docs] def write_complex_msj(self):
"""
Write the complex subjob msjs.
"""
super().write_complex_msj()
cmj.write_sea2msj(
self.generate_complex_msj(
net_charge=False,
ch=True,
fragment_linking_job=FRAGMENT_LINKING_JOBS.VAL.COMPLEX),
self._get_msj_filename(FepLegTypes.COMPLEX,
SIMULATION_PROTOCOL.FRAGMENT_LINKING))
[docs] def write_solvent_msj(self):
"""
Write the solvent subjob msjs.
"""
super().write_solvent_msj()
for job in FRAGMENT_LINKING_SOLVENT_JOBS:
cmj.write_sea2msj(
self.generate_solvent_msj(net_charge=False,
fragment_linking_job=job),
self._get_msj_filename(job,
SIMULATION_PROTOCOL.FRAGMENT_LINKING))
def _set_subjob_msj(self,
subjobmsj_contents: str,
buffer_width: float,
net_charge: bool,
core_hopping: bool = False,
is_complex_leg: bool = False,
keep_struc_tags: List[str] = None,
treat_solvent_like_complex: bool = False,
fragment_linking_job: FRAGMENT_LINKING_JOBS.VAL = None):
"""
:param subjobmsj_contents: Contents of the template msj for this subjob.
:param buffer_width: Size of the box buffer in Angstrom.
:param net_charge: True for net charge simulation.
:param core_hopping: True for core-hopping simulation.
:param is_complex_leg: True for complex leg. Default is False.
:param keep_struc_tags: List of structure tags for structures to keep.
Structure tags are as defined by `FEP_STRUC_TAG`.
`None` means to use the default set of tags for
this fep type.
:param treat_solvent_like_complex: Set to True to treat the solvent leg
more like the complex leg in terms of membrane parameters and gcmc.
Default is False.
:param fragment_linking_job: If not `None`, the type of fragment linking job.
"""
if fragment_linking_job is not None:
raw = cmj.msj2sea_full(None, msj_content=subjobmsj_contents)
if fragment_linking_job != FRAGMENT_LINKING_JOBS.VAL.COMPLEX:
self._set_fragment_linking_solvent(raw, fragment_linking_job)
subjobmsj_contents = cmj.write_sea2msj(raw.stage)
if fragment_linking_job in [
FRAGMENT_LINKING_JOBS.VAL.COMPLEX,
FRAGMENT_LINKING_JOBS.VAL.SOLVENT
]:
core_hopping = True
else:
# FIXME: Let user control number of lambdas
num_windows = self.sim_params['lambda_windows']
self.sim_params['lambda_windows'] = 24
stages = super()._set_subjob_msj(
subjobmsj_contents,
buffer_width,
net_charge,
core_hopping=core_hopping,
is_complex_leg=is_complex_leg,
keep_struc_tags=keep_struc_tags,
treat_solvent_like_complex=treat_solvent_like_complex)
# Custom charge is not currently supported by fragment linking
if fragment_linking_job is not None:
for s in _find_stage(stages, stage.AssignCustomCharge.NAME):
s.set_value("mode", CUSTOM_CHARGE_MODE.KEEP, _SET_BY_USER_TAG)
if not core_hopping:
self.sim_params['lambda_windows'] = num_windows
return stages
def _set_fragment_linking_solvent(self, msj: sea.Map,
job: FRAGMENT_LINKING_JOBS.VAL):
"""
Set up the job for the solvent leg of fragment linking.
:param msj: Msj to update in place.
:param job: Fragment linking job.
"""
fragment_linking_header = """task {
task = "generic"
}
# Set up structures for task below
extract_structures {
}
fragment_linking_primer {
job = %s
}
""" % job
restrained = job in [
FRAGMENT_LINKING_JOBS.VAL.SOLVENT,
FRAGMENT_LINKING_JOBS.VAL.RESTRAINED_FRAGMENT_HYDRATION
]
# Insert the header stages
header_stages = cmj.msj2sea_full(None,
msj_content=fragment_linking_header)
for i, header_stage in enumerate(header_stages.stage):
msj.stage.insert(i, header_stage)
if job in [
FRAGMENT_LINKING_JOBS.VAL.RESTRAINED_FRAGMENT_HYDRATION,
FRAGMENT_LINKING_JOBS.VAL.FRAGMENT_HYDRATION
]:
self._set_fep_analysis(msj, False)
# Insert LoadRestraintsFromStructure stage after AssignForcefield
if restrained:
self._insert_stage_load_restraints_from_structure(msj)
self._set_restraint_retain(msj)
[docs]class VacuumMsjGenerator(MapperMsjGenerator):
"""
"""
[docs] def __init__(self, jobname, cd_params, **kwargs):
"""
See `MapperMsjGenerator` for a description of the input parameters.
"""
kwargs.setdefault('ensemble', Ensembles.NVT)
super(VacuumMsjGenerator, self).__init__(jobname, cd_params, **kwargs)
if self.sim_params['ensemble'] != Ensembles.NVT:
raise NotImplementedError("Vacuum simulations do not support NPT.")
# Paths to template msj files
self.from_main_msj = os.path.join(DESMOND_DATA_DIR,
"vacuum_launcher.msj")
self.from_subjob_msj = os.path.join(DESMOND_DATA_DIR,
"desmond_nvt_fep_vacuum.msj")
self.min_buffer_width = 100
self.sim_params['buffer_width'] *= 20
self.sim_params['atom_mapping'] = ""
self.asl = _REST_ASL_TEMPLATE
[docs] def generate_vacuum_msj(self, net_charge=False, core_hopping=False):
with open(self.from_subjob_msj) as fh:
subjobmsj_contents = fh.read()
keep_struc_tags = [FEP_STRUC_TAG.VAL.LIGAND]
return self._set_subjob_msj(subjobmsj_contents,
net_charge,
core_hopping,
keep_struc_tags=keep_struc_tags)
[docs] def write_vacuum_msj(self, concatenate=False):
cmj.write_sea2msj(self.generate_vacuum_msj(),
self._get_msj_filename(FepLegTypes.VACUUM))
cmj.write_sea2msj(
self.generate_vacuum_msj(net_charge=True),
self._get_msj_filename(FepLegTypes.VACUUM,
SIMULATION_PROTOCOL.CHARGED))
cmj.write_sea2msj(
self.generate_vacuum_msj(core_hopping=True),
self._get_msj_filename(FepLegTypes.VACUUM,
SIMULATION_PROTOCOL.COREHOPPING))
[docs] def modify_fep_launcher_stage(self, raw):
MSJ_FNAME = 1
for s in raw.stage:
if (s.__NAME__ == "fep_launcher"):
fep_launcher_stage = s
break
else:
raise Exception("no fep_launcher stage in the template msj file")
num_cpus = int(self.cd_params['cpus'])
mps_factor = self.cd_params['mps_factor']
dispatch_template = copy.deepcopy(
fep_launcher_stage["dispatch"].default)
def get_protocol_cmd(launcher_stage,
protocol,
template=dispatch_template):
return (protocol in launcher_stage['dispatch'] and
launcher_stage['dispatch'][protocol] or
copy.deepcopy(template))
default_fep = fep_launcher_stage["dispatch"].default
for leg_index, f in enumerate(default_fep):
if f[f.val.index('-m') + 1].val.endswith('vacuum.msj'):
break
default_fep[leg_index][MSJ_FNAME].val = self._get_msj_filename(
FepLegTypes.VACUUM)
num_windows = self.sim_params['lambda_windows'] or 12
self._set_job_num_cpus(default_fep,
num_cpus,
num_windows,
mps_factor=mps_factor,
max_walltime=self.sim_params['max_walltime'])
charge_fep = get_protocol_cmd(fep_launcher_stage, 'charge')
charge_fep[leg_index][MSJ_FNAME].val = self._get_msj_filename(
FepLegTypes.VACUUM, SIMULATION_PROTOCOL.CHARGED)
num_windows = self.sim_params['charged_lambda_windows'] or 16
self._set_job_num_cpus(charge_fep,
num_cpus,
num_windows,
mps_factor=mps_factor,
max_walltime=self.sim_params['max_walltime'])
# formal change on dummy atoms, but no net charge change
charge0_fep = copy.deepcopy(charge_fep)
charge_fep = get_protocol_cmd(fep_launcher_stage,
'charge0',
template=charge_fep)
fep_launcher_stage["dispatch"]["charge0"] = charge0_fep
# delete protocols that are not supported for vacuum
for key in ["charge", "fragment-linking"]:
if key not in fep_launcher_stage["dispatch"]:
continue
protocol = fep_launcher_stage["dispatch"][key]
for leg_cmd in protocol:
val = leg_cmd.val
jobname = val[val.index('-JOBNAME') + 1]
if jobname.endswith('$JOBTAG_vacuum'):
protocol.remove(leg_cmd)
if not protocol:
del fep_launcher_stage["dispatch"][key]
ch_fep = get_protocol_cmd(fep_launcher_stage, 'core-hopping')
ch_fep[leg_index][MSJ_FNAME].val = self._get_msj_filename(
FepLegTypes.VACUUM, SIMULATION_PROTOCOL.COREHOPPING)
num_windows = (self.sim_params['core_hopping_lambda_windows'] or
DEFAULT_LAMBDAS_BY_SCHEDULE['flexible'])
self._set_job_num_cpus(ch_fep,
num_cpus,
num_windows,
mps_factor=mps_factor,
max_walltime=self.sim_params['max_walltime'])
fep_launcher_stage["dispatch"]["core-hopping"] = ch_fep
macro_ch_fep = get_protocol_cmd(fep_launcher_stage,
"macrocycle-core-hopping", ch_fep)
macro_ch_fep[leg_index].extend(["-set", _MACRO_CYCLE_OPTION])
macro_ch_fep[leg_index][MSJ_FNAME].val = self._get_msj_filename(
FepLegTypes.VACUUM, SIMULATION_PROTOCOL.MACROCYCLE_COREHOPPING)
fep_launcher_stage["dispatch"]["macrocycle-core-hopping"] = macro_ch_fep
[docs] def modify_graph_stage(self, raw):
"""
"""
for s in _find_stage(raw.stage, "fep_mapper"):
s.set_value("graph_file", self.fmp_fname, tag=_SET_BY_USER_TAG)
s.set_value("ignore_ddg", True, tag=_SET_BY_USER_TAG)
break
else:
raise Exception("No fep_mapper stage in the msj file")
def _set_subjob_msj(self,
subjobmsj_contents,
net_charge,
core_hopping,
keep_struc_tags: List[str] = None):
"""
:param keep_struc_tags: List of structure tags for structures to keep.
Structure tags are as defined by `FEP_STRUC_TAG`.
`None` means to use the default set of tags for
this fep type.
"""
raw = cmj.msj2sea_full(None, msj_content=subjobmsj_contents)
self._set_hmr(raw)
self._set_buffer_width(raw, self.min_buffer_width)
self._set_lambda_hopping(
raw,
self.sim_params['sim_time_vacuum'],
rest_property=REST_PROPERTIES.SOLVENT_HOTREGION)
self._set_ff(raw)
if net_charge:
self._set_lambda_windows(raw,
self.sim_params['charged_lambda_windows'],
'charge')
elif core_hopping:
self._set_lambda_windows(
raw, self.sim_params['core_hopping_lambda_windows'], 'flexible')
else:
self._set_lambda_windows(raw, self.sim_params['lambda_windows'])
self._set_fep_type(raw)
mode = self.sim_params['custom_charge_mode']
if mode == CUSTOM_CHARGE_MODE.ASSIGN and self.fep_type not in CUSTOM_CHARGE_FEP_TYPES:
# If fep_type does not support custom charges, keep them
mode = CUSTOM_CHARGE_MODE.KEEP
self._set_assign_custom_charge(raw, mode)
self._set_extract_structures(raw, keep_struc_tags)
self._set_compress_cms(raw)
if self.sim_params['concatenate']:
new_raw = cmj.concatenate_relaxation_stages(raw)
if new_raw is not None:
raw = new_raw
self._set_print_expected_memory(raw)
return raw.stage
[docs]class SmallMoleculeWithVacuumMsjGenerator(SmallMoleculeMsjGenerator):
[docs] def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)
vacuum_kwargs = copy.deepcopy(kwargs)
for key in ['membrane', 'ensemble', 'atom_mapping']:
vacuum_kwargs.pop(key, None)
self.vg = VacuumMsjGenerator(*args, **vacuum_kwargs)
self.from_main_msj = os.path.join(DESMOND_DATA_DIR,
"vacuum2_launcher.msj")
[docs] def generate_main_msj(self):
raw = cmj.msj2sea_full(self.from_main_msj)
self.modify_graph_stage(raw)
if self.sim_params['membrane']:
self._insert_stage_membrane_launcher(raw)
if self.sim_params['ffbuilder']:
self._set_ffbuilder(raw)
self.modify_fep_launcher_stage(raw)
self.vg.modify_fep_launcher_stage(raw)
cmj.write_sea2msj(raw.stage, self._get_msj_filename())
[docs] def write_subjob_msjs(self):
"""
Write the vacuum msj in addition to the complex+solvent msjs
"""
self.write_vacuum_msj()
super().write_subjob_msjs()
[docs] def write_vacuum_msj(self):
return self.vg.write_vacuum_msj()
[docs]class CovalentMsjGenerator(MapperMsjGenerator):
"""
"""
[docs] def __init__(self, jobname: str, cd_params: Dict[str, object],
fmp_fname: str, **kwargs):
"""
See `MapperMsjGenerator` for a description of the input parameters.
In addition to these parameters, the following are supported.
:param fmp_fname: Name of the fmp file to use for covalent fep.
"""
super(CovalentMsjGenerator, self).__init__(jobname, cd_params, **kwargs)
# Path to template msj file
self.from_main_msj = os.path.join(DESMOND_DATA_DIR, "covalent.msj")
self.fep_type = FEP_TYPES.COVALENT_LIGAND
self.fmp_fname = fmp_fname
[docs] def generate_subjob_msj(self,
buffer_width,
is_complex_leg=False,
net_charge=False,
ch=False,
keep_struc_tags=None,
treat_solvent_like_complex=False):
"""
:param keep_struc_tags: List of structure tags for structures to keep.
Structure tags are as defined by `FEP_STRUC_TAG`.
`None` means to use the default set of tags for
this fep type.
"""
with open(self.from_subjob_msj) as fh:
subjobmsj_contents = fh.read()
subjobmsj_contents = COVALENT_HEAD + re.sub(
"buffer_width *= *5.0",
" buffer_width = {bf}".format(bf=buffer_width),
subjobmsj_contents,
count=1)
if keep_struc_tags is None:
if is_complex_leg:
keep_struc_tags = [
FEP_STRUC_TAG.VAL.RECEPTOR, FEP_STRUC_TAG.VAL.MEMBRANE,
FEP_STRUC_TAG.VAL.SOLVENT, FEP_STRUC_TAG.VAL.COMPLEX
]
else:
keep_struc_tags = [FEP_STRUC_TAG.VAL.LIGAND]
return self._set_subjob_msj(
subjobmsj_contents,
buffer_width,
net_charge,
core_hopping=ch,
is_complex_leg=is_complex_leg,
keep_struc_tags=keep_struc_tags,
treat_solvent_like_complex=treat_solvent_like_complex)
[docs] def modify_graph_stage(self, raw):
"""
Not support custom .pkl file for now
"""
for s in raw.stage:
if (s.__NAME__ == "covalent_fep_mapper"):
s.set_value("graph_file", self.fmp_fname, tag=_SET_BY_USER_TAG)
if self.sim_params['atom_mapping']:
s.set_value("atom_mapping",
self.sim_params["atom_mapping"],
tag=_SET_BY_USER_TAG)
break
else:
raise Exception("No covalent_fep_mapper stage in the msj file")
[docs]class ProteinMapperMsjGenerator(CovalentMsjGenerator):
"""
"""
[docs] def __init__(self,
jobname: str,
cd_params: Dict[str, object],
fep_type: str,
solvent_asl: Optional[str] = None,
mutation_filename: str = "mutations.txt",
residue_file: str = "",
neighbor: Optional[int] = None,
structure_file: str = "",
**kwargs):
"""
See `MapperMsjGenerator` for a description of the input parameters.
In addition to these parameters, the following are supported.
:param fep_type: Type of protein fep.
:param neighbor: Number of residues to flank the mutated residue when
creating the mutant peptides. Its default value will be
set based on `fep_type`.
:param solvent_asl: Specify ASL to put in solvent leg for protein residue
mutation.
:param mutation_filename: Filename containing the list of mutations.
:param residue_file: Mae filename which contains the non-standard
amino-acids structures.
:param structure_file: If present, the filename for the input structure.
This is used to expand a previous run, where
additional mutations are added to the graph
and run.
"""
super(ProteinMapperMsjGenerator, self).__init__(jobname,
cd_params,
fmp_fname=None,
**kwargs)
self.solvent_asl = solvent_asl
self.mutation_filename = mutation_filename
self.residue_file = residue_file
self.structure_file = structure_file
self.neighbor = neighbor
if self.neighbor is None:
self.neighbor = 1 if fep_type == FEP_TYPES.PROTEIN_STABILITY else 0
# Path to template msj file
self.from_main_msj = os.path.join(DESMOND_DATA_DIR,
"protein_mut_launcher.msj")
self.fep_type = fep_type
self.is_selectivity = self.fep_type in [
FEP_TYPES.PROTEIN_SELECTIVITY, FEP_TYPES.LIGAND_SELECTIVITY
]
# For PRM jobs that do not include PROTEIN_STABILITY, both FEP legs will
# contain proteins. For that reason we want to use a smaller box.
if self.is_selectivity:
self.solvent_box_buffer_width = 5.0
[docs] def generate_solvent_msj(self, net_charge, ch=False):
buffer_width = self.solvent_box_buffer_width
# Because there are proteins in the solvent leg of non-PROTEIN_STABILITY jobs,
# buffer width needs to be the same as complex leg's.
if net_charge and self.is_selectivity:
buffer_width = _NET_CHARGE_COMPLEX_BUFFER_WIDTH
# For selectivity fep, the solvent leg includes the environment
# so we need to treat it more like the complex leg.
if not self.is_selectivity:
with self.patch_ensemble():
return self.generate_subjob_msj(buffer_width,
is_complex_leg=False,
net_charge=net_charge,
ch=ch)
else:
return self.generate_subjob_msj(buffer_width,
is_complex_leg=False,
net_charge=net_charge,
ch=ch)
[docs] def generate_subjob_msj(self,
buffer_width,
is_complex_leg=False,
net_charge=False,
ch=False,
keep_struc_tags=None):
"""
The subjob msj file will be similar to the covalent bond workflow.
For protein/ligand selectivity calculations, both complex and solvent
legs contain the receptor, so they will be treated like the complex leg.
:param keep_struc_tags: List of structure tags for structures to keep.
Structure tags are as defined by `FEP_STRUC_TAG`.
`None` means to use the default set of tags for
this fep type.
"""
if self.is_selectivity:
# protein/ligand selectivity uses a different
# set of structures from covalent fep.
if keep_struc_tags is None:
if is_complex_leg:
keep_struc_tags = [
FEP_STRUC_TAG.VAL.RECEPTOR, FEP_STRUC_TAG.VAL.MEMBRANE,
FEP_STRUC_TAG.VAL.SOLVENT, FEP_STRUC_TAG.VAL.COMPLEX
]
else:
keep_struc_tags = [
FEP_STRUC_TAG.VAL.MEMBRANE, FEP_STRUC_TAG.VAL.SOLVENT,
FEP_STRUC_TAG.VAL.COMPLEX
]
treat_solvent_like_complex = True
else:
# protein stability uses the same structures as covalent fep
treat_solvent_like_complex = False
return super().generate_subjob_msj(
buffer_width,
is_complex_leg=is_complex_leg,
net_charge=net_charge,
ch=ch,
keep_struc_tags=keep_struc_tags,
treat_solvent_like_complex=treat_solvent_like_complex)
[docs] def modify_graph_stage(self, raw):
"""
"""
for s in raw.stage:
if s.__NAME__ == stage.ProteinMutationGenerator.NAME:
s.set_value("mutation_file",
self.mutation_filename,
tag=_SET_BY_USER_TAG)
if self.residue_file:
s.set_value("residue_file",
self.residue_file,
tag=_SET_BY_USER_TAG)
if self.fmp_fname:
s.set_value("graph_file",
self.fmp_fname,
tag=_SET_BY_USER_TAG)
if self.structure_file:
s.set_value("structure_file",
self.structure_file,
tag=_SET_BY_USER_TAG)
elif s.__NAME__ == stage.ProteinFepMapper.NAME:
if self.solvent_asl:
s.set_value("asl", self.solvent_asl, tag=_SET_BY_USER_TAG)
if self.sim_params['atom_mapping'] == 'sidechain':
s.set_value("sidechain", True, tag=_SET_BY_USER_TAG)
s.set_value("fep_type", self.fep_type, tag=_SET_BY_USER_TAG)
s.set_value("neighbor", self.neighbor, tag=_SET_BY_USER_TAG)
if self.fmp_fname:
s.set_value("graph_file",
self.fmp_fname,
tag=_SET_BY_USER_TAG)
break
else:
raise Exception("No protein_fep_mapper stage in the msj file")
[docs]class SolubilityMsjGenerator(BaseFepMsjGenerator):
"""
Generates the msj files needed to run the Solubility FEP workflow.
"""
[docs] def __init__(self, jobname: str, cd_params: Dict[str, object], **kwargs):
"""
See `BaseMsjGenerator` for a description of the input parameters.
In addition to these parameters, the following are supported.
:param hydration_fep_sim_time: The simulation time for the
Hydration FEP production stage.
:param sublimation_fep_sim_time: The simulation time for the
Sublimation FEP production stage.
:param hydration_only: Set to True to only run the hydration leg.
:param solvation_only: Set to True to run the solvation protocol.
Similar to `hydration_only`, but for non-water solvents.
"""
kwargs.setdefault('defaults', {})
_DEFAULTS = {
'hydration_fep_sim_time': 5000,
'sublimation_fep_sim_time': 10000,
'solvation_fep_sim_time': 10000,
'hydration_only': False,
'solvation_only': False,
'crystal_structure': None,
'graph_file': '',
'solvent_composition': '',
'solvent_template_structure': None,
}
kwargs['defaults'].update(_DEFAULTS)
self.fep_type = FEP_TYPES.SOLUBILITY
super(SolubilityMsjGenerator, self).__init__(jobname, cd_params,
**kwargs)
# Only NPT is supported
if self.sim_params['ensemble'] != Ensembles.NPT:
raise NotImplementedError(
'Only NPT ensemble is supported for solubility workflow.')
# Paths to template msj files
self.from_main_msj = os.path.join(DESMOND_DATA_DIR,
"solubility_launcher.msj")
self.from_subjob_msj = os.path.join(
DESMOND_DATA_DIR, PROTOCOL_FNAME[self.sim_params['ensemble']])
self.from_md_msj = os.path.join(DESMOND_DATA_DIR, "solubility_md.msj")
self.from_solvation_msj = os.path.join(DESMOND_DATA_DIR,
"desmond_solvation_fep.msj")
self.min_buffer_width = 10.0
[docs] def generate_main_msj(self) -> sea.Map:
"""
Generate the main msj.
"""
raw = cmj.msj2sea_full(self.from_main_msj)
mps_factor = self.cd_params['mps_factor']
md_launcher_stage = next(
_find_stage(raw.stage, stage.SolubilityMdLauncher.NAME))
fep_launcher_stage = next(
_find_stage(raw.stage, stage.SolubilityFepLauncher.NAME))
analysis_stage = next(
_find_stage(raw.stage, stage.SolubilityFepAnalysis.NAME))
if self.sim_params['ffbuilder']:
self._set_ffbuilder(raw)
# Set number of cpus
num_cpus = int(self.cd_params['cpus'])
num_windows = self.sim_params['lambda_windows'] or 12
# MD can only use 1 CPU/GPU
self._set_job_num_cpus([md_launcher_stage["md_job"]],
1,
num_windows,
max_walltime=self.sim_params['max_walltime'])
# FEP jobs can use more than 1 CPU/GPU
subjob_args = self._get_subjob_args()
self._set_job_num_cpus(subjob_args,
num_cpus,
num_windows,
mps_factor=mps_factor,
max_walltime=self.sim_params['max_walltime'])
fep_launcher_stage.set_value(f"dispatch.{SIMULATION_PROTOCOL.DEFAULT}",
subjob_args,
tag=_SET_BY_USER_TAG)
# Add subjob msjs
md_launcher_stage["md_job"].extend(
list(map(sea.Atom,
["-m", self._get_msj_filename(FepLegTypes.MD)])))
if self.sim_params["hydration_only"]:
analysis_stage.set_value("hydration_only",
True,
tag=_SET_BY_USER_TAG)
if self.sim_params['solvation_only']:
analysis_stage.set_value("solvation_only",
True,
tag=_SET_BY_USER_TAG)
md_launcher_stage.set_value("input_graph_file",
self.sim_params['graph_file'],
tag=_SET_BY_USER_TAG)
analysis_stage.set_value("input_graph_file",
self.sim_params['graph_file'],
tag=_SET_BY_USER_TAG)
return raw.stage
def _get_subjob_args(self):
jobs = sea.List()
if self.sim_params['solvation_only']:
for job_idx in range(solubility_utils.NUM_POLYMER_SOLVATION_JOBS):
jobs.append(self._create_solvation_job_args(job_idx))
else:
jobs.append(self._create_hydration_job_args())
if not (self.sim_params["hydration_only"] or
self.sim_params['solvation_only']):
for job_idx in range(solubility_utils.NUM_SUBLIMATION_JOBS):
jobs.append(self._create_sublimation_job_args(job_idx))
return jobs
def _create_hydration_job_args(self):
subjobname = "$MAINJOBNAME_$JOBTAG_hydration"
return self._create_subjob_args(
self._get_msj_filename(FepLegTypes.HYDRATION), subjobname, 0)
def _create_solvation_job_args(self, job_idx):
subjobname = f"$MAINJOBNAME_$JOBTAG_solvation_{job_idx}"
args = self._create_subjob_args(
self._get_msj_filename(FepLegTypes.SOLVATION), subjobname, 0)
args.extend([
'-set',
f'stage[1].set_family.disordered_system_builder.seed={self.sim_params["rand_seed"]+job_idx}',
'-ADD_FILE', self.sim_params['solvent_template_structure']
])
return args
def _create_sublimation_job_args(self, job_idx):
subjobname = f"$MAINJOBNAME_$JOBTAG_sublimation_{job_idx}"
return self._create_subjob_args(
self._get_msj_filename(FepLegTypes.SUBLIMATION), subjobname,
job_idx + 1)
@staticmethod
def _create_subjob_args(msj_fname, subjobname, start_index):
job_args = ["-m", msj_fname]
job_args.extend(["-o", subjobname + "-out.mae", "-JOBNAME", subjobname])
job_args.extend([
"-set",
f'stage[1].set_family.extract_structures.start_index={start_index}',
"-set",
f'stage[1].set_family.extract_structures.end_index={start_index +1}'
])
return job_args
[docs] def generate_solvation_msj(self) -> sea.Map:
msj_content = cmj.msj2sea_full(self.from_solvation_msj)
sim_time = self.sim_params['solvation_fep_sim_time']
# Setup
self._set_ff(msj_content)
self._set_dsb_params(
msj_content, {
'composition': self.sim_params['solvent_composition'],
'solvent_template_file':
self.sim_params['solvent_template_structure'],
})
self._set_assign_custom_charge(msj_content, CUSTOM_CHARGE_MODE.KEEP)
# Equilibration
self._set_compress_cms(msj_content)
self._set_fep_type(msj_content)
# Lambda hopping
self._set_lambda_hopping(msj_content, sim_time)
self._set_lambda_windows(msj_content, self.sim_params['lambda_windows'])
# Analysis
# For now skip sid/fmpdb generation
self._set_fep_analysis(msj_content, include_sid=False)
return msj_content.stage
[docs] def generate_subjob_msj(self,
is_fep: bool = True,
hydration: bool = False) -> sea.Map:
"""
Generate the msj and return as a sea.Map.
:param is_fep: Set to True for FEP subjob,
False for MD.
:param hydration: Set to True to for hydration FEP
and False for sublimation FEP.
Ignored if not FEP.
"""
msj_fname = self.from_subjob_msj if is_fep else self.from_md_msj
raw = cmj.msj2sea_full(msj_fname)
if self.sim_params['h_mass']:
self._set_hmr(raw, only_last_stage=True)
#If it is larger then default then we set it
self._set_buffer_width(raw, self.min_buffer_width)
if is_fep:
sim_time = self.sim_params[
'hydration_fep_sim_time'] if hydration else self.sim_params[
'sublimation_fep_sim_time']
else:
sim_time = self.sim_params['sim_time']
self._set_production_parameters(raw, sim_time,
self.sim_params['rand_seed'])
self._set_ff(raw)
self._set_dsb_params(raw, {'molecules': self.sim_params["molecules"]})
if is_fep:
self._set_lambda_windows(raw, self.sim_params['lambda_windows'])
self._set_assign_custom_charge(raw, CUSTOM_CHARGE_MODE.KEEP)
for production_stage in self._find_production_stages(raw):
production_stage.__NAME__ = stage.LambdaHopping.NAME
self._set_compress_cms(raw)
self._set_fep_type(raw)
else:
# MD stage, assign custom charges
self._set_assign_custom_charge(
raw, self.sim_params['custom_charge_mode'])
self._set_is_for_fep(raw)
if self.sim_params['concatenate'] and is_fep:
new_raw = cmj.concatenate_relaxation_stages(raw)
if new_raw is not None:
raw = new_raw
if not is_fep and self.sim_params["crystal_structure"] is not None:
raw = self._update_crystal_workflow(raw)
# For hydration energy workflow, need to update
# MD subjob to only generate hydration structure.
if not is_fep and (self.sim_params["hydration_only"] or
self.sim_params['solvation_only']):
num_stages = len(raw.stage)
for idx, s in enumerate(raw.stage[::-1]):
if s.__NAME__ not in [
stage.Task.NAME,
stage.ExtractStructures.NAME,
stage.AssignCustomCharge.NAME,
stage.GenerateSolubilityFepStructures.NAME,
stage.Trim.NAME,
]:
del raw.stage[num_stages - 1 - idx]
params = {
'hydration_only': self.sim_params["hydration_only"],
'solvation_only': self.sim_params['solvation_only']
}
_set_stage_params(raw, stage.GenerateSolubilityFepStructures.NAME,
params)
if is_fep:
self._set_print_expected_memory(raw)
return raw.stage
[docs] def write_main_msj(self) -> str:
"""
Write out the main msj and return the filename.
"""
msj_fname = self._get_msj_filename()
cmj.write_sea2msj(self.generate_main_msj(), msj_fname)
return msj_fname
[docs] def write_md_msj(self) -> str:
"""
Write out the md msj and return the filename.
"""
msj_fname = self._get_msj_filename(FepLegTypes.MD)
cmj.write_sea2msj(self.generate_subjob_msj(is_fep=False), msj_fname)
return msj_fname
[docs] def write_fep_msj(self):
"""
Write out the fep msj files.
"""
cmj.write_sea2msj(self.generate_subjob_msj(is_fep=True, hydration=True),
self._get_msj_filename(FepLegTypes.HYDRATION))
cmj.write_sea2msj(
self.generate_subjob_msj(is_fep=True, hydration=False),
self._get_msj_filename(FepLegTypes.SUBLIMATION))
if self.sim_params['solvation_only']:
cmj.write_sea2msj(self.generate_solvation_msj(),
self._get_msj_filename(FepLegTypes.SOLVATION))
def _update_crystal_workflow(self, raw):
msj = cmj.write_sea2msj(raw.stage)
m = parse(string=msj)
# STAGE_INDEX is 1-based but Msj indexing is 0-based
start = m.find_stages("disordered_system_builder")[0].STAGE_INDEX - 1
end = m.find_stages("extract_solute_structure")[0].STAGE_INDEX - 1
for i in range(end, start - 1, -1):
m.delete(f"[{i}]")
m.put(
f"stage[@]{start}.replicate_structure",
sea.Map("""{
mae_file = "%s"
}""" % self.sim_params["crystal_structure"]))
return cmj.msj2sea_full(None, msj_content=str(m))
[docs]class MixedSolventMsjGenerator(BaseMsjGenerator):
[docs] def __init__(self, jobname: str, cd_params: Dict[str, object],
builder_params: Dict[str, object], **kwargs):
kwargs = copy.deepcopy(kwargs)
self.subhost = kwargs.pop('subhost', 'localhost')
self.equilibrate_time = kwargs.pop('equilibrate_time', 15000.0)
self.custom_probe_dir = kwargs.pop('custom_probe_dir', None)
super(MixedSolventMsjGenerator, self).__init__(jobname, cd_params,
**kwargs)
self.from_main_msj = os.path.join(DESMOND_DATA_DIR, 'mxmd.msj')
self.from_md_msj = os.path.join(DESMOND_DATA_DIR, 'mxmd_npt.msj')
self._builder_params = builder_params
[docs] def write_main_msj(self) -> str:
msj_fname = self._get_msj_filename()
cmj.write_sea2msj(self.generate_main_msj(), msj_fname)
return msj_fname
[docs] def write_md_msj(self) -> str:
msj_fname = self._get_msj_filename(FepLegTypes.MD)
cmj.write_sea2msj(self.generate_subjob_msj(), msj_fname)
return msj_fname
[docs] def generate_main_msj(self) -> List[sea.Map]:
raw = cmj.msj2sea_full(self.from_main_msj)
self._set_system_builder_params(raw, **self._builder_params)
s = next(_find_stage(raw.stage, 'mixed_solvent_setup'))
s.set_value("custom_probe_dir",
self.custom_probe_dir,
tag=_SET_BY_USER_TAG)
# update msj filename
for s in _find_stage(raw.stage, stage.Multisim.NAME):
for job in s.job:
cmd = job.val
msj_index = cmd.index("-m") + 1
cmd[msj_index] = self._get_msj_filename(FepLegTypes.MD)
cmd += ['-lic', license.md_lic(self.subhost)]
job.val = cmd
return raw.stage
[docs] def generate_subjob_msj(self) -> List[sea.Map]:
raw = cmj.msj2sea_full(self.from_md_msj)
self._set_ff(raw)
self._set_time(raw)
return raw.stage
def _set_time(self, raw: sea.Map):
# Set equilibrate time
s = list(_find_stage(raw.stage, 'concatenate'))[-1]
s.simulate[-1].time.val = self.equilibrate_time
s.simulate[
-1].title.val = f'NPT and no restraints, {self.equilibrate_time/1000.}ns'
# Set simulation time
for s in self._find_production_stages(raw):
s.time.val = self.sim_params['sim_time']
def _set_system_builder_params(self, msj, **kwargs):
for s in _find_stage(msj.stage, stage.MixedSolventSetup.NAME):
for k, v in kwargs.items():
try:
attr = s[k]
attr.val = v
attr.add_tag(_SET_BY_USER_TAG)
except KeyError:
util.fatal(f"'{k}' is not a valid keyword")
[docs]class AbsoluteBindingMsjGenerator(MapperMsjGenerator):
"""
Generates the msj files needed to run the Absolute FEP workflow.
"""
# Default of True, fc = 0 means turn off the ligand rotatable
# dihedrals and do not add a restraint.
_DEFAULT_POSE_CONF_RESTRAINT = {
'enable': True,
'name': 'soft',
'sigma': 0.0,
'alpha': 1.0,
'fc': 0.0,
}
_DEEFAULT_MD_RESTRAINT = {
'new': [{
'name': 'posre_harm',
'atoms': 'asl:protein and backbone and not a.ele H',
'force_constants': 50.0
}],
'existing': 'ignore'
}
[docs] def __init__(self, jobname: str, cd_params: Dict[str, object], **kwargs):
"""
See `MapperMsjGenerator` for a description of the input parameters.
In addition to these parameters, the following are supported.
:param md_sim_time: The simulation time for the md production stage.
:param ligand_asl: Set the ASL used to identify the ligand.
:param ligand_restraint: Set to the parameters for the ligand restraints.
:type ligand_restraint: Dictionary with the keys: 'enable', 'name',
'sigma', 'alpha', 'fc'.
:param adaptive_ligand_restraint: Set to the parameters for the adaptive ligand restraints.
:type adaptive_ligand_restraint: Dictionary with the keys: 'enable', 'name',
'sigma', 'alpha', 'fc'.
:param use_representative_structure: Set to True to use the representative structure
from MD to prepare the FEP simulations. The default of False means
to use the last frame from MD.
"""
kwargs.setdefault('defaults', {})
kwargs['defaults'].update({
'md_sim_time': 1000,
'ligand_restraint': {
'enable': False
},
'adaptive_ligand_restraint': {
'enable': False
},
'use_representative_structure': True,
'use_centroid': True,
'use_pl_interactions': True,
'min_angle': 45,
'ligand_asl': f'a.{FEP_ABSOLUTE_BINDING_LIGAND} 1',
'receptor_asl': 'protein',
'graph_file': '',
})
super().__init__(jobname, cd_params, **kwargs)
if self.sim_params['concatenate']:
raise NotImplementedError(
'Concatenate is not supported for the absolute binding workflow.'
)
self.fep_type = FEP_TYPES.ABSOLUTE_BINDING
# Paths to template msj files
self.from_main_msj = Path(DESMOND_DATA_DIR, "abfep_launcher.msj")
self.from_subjob_msj = Path(DESMOND_DATA_DIR,
PROTOCOL_FNAME[self.sim_params['ensemble']])
self.from_md_msj = Path(DESMOND_DATA_DIR, "abfep_md.msj")
def _get_schedule(self, leg: str, protocol: SIMULATION_PROTOCOL):
schedule_path = Path(DESMOND_DATA_DIR, "abfep")
postfix = '_restrained' if self.restraint_enabled else ''
if protocol == SIMULATION_PROTOCOL.CHARGED:
postfix += '_chg'
return sea.Map(
(schedule_path /
f'{leg}_schedule{postfix}.msj').read_text('latin-1')).schedule
def _get_num_lambda_windows(self, leg: str, protocol: SIMULATION_PROTOCOL):
return self._get_schedule(leg, protocol).weights.N.val
def _get_solvent_restrained_lambda_windows(self,
protocol: SIMULATION_PROTOCOL):
sched = self._get_schedule('solvent', protocol)
if self.restraint_enabled:
return len([x for x in sched.flexible[0].A.val if x])
return 0
@property
def restraint_enabled(self):
return bool(self.sim_params['ligand_restraint']['enable'] or
self.sim_params['adaptive_ligand_restraint']['enable'])
[docs] def generate_main_msj(self) -> sea.Map:
msj = cmj.msj2sea_full(self.from_main_msj)
mps_factor = self.cd_params['mps_factor']
md_launcher_stage = next(
_find_stage(msj.stage, stage.FepAbsoluteBindingMdLauncher.NAME))
fep_launcher_stage = next(
_find_stage(msj.stage, stage.FepAbsoluteBindingFepLauncher.NAME))
analysis_stage = next(
_find_stage(msj.stage, stage.FepAbsoluteBindingAnalysis.NAME))
if self.sim_params['ffbuilder']:
self._set_ffbuilder(msj)
# Set number of cpus
num_cpus = self.cd_params['cpus']
# FEP jobs can use more than 1 CPU/GPU
for protocol in [
SIMULATION_PROTOCOL.DEFAULT,
SIMULATION_PROTOCOL.CHARGED,
]:
num_windows = max(
self._get_num_lambda_windows('complex', protocol),
self._get_num_lambda_windows('solvent', protocol),
)
# MD can only use 1 CPU/GPU
self._set_job_num_cpus(md_launcher_stage["dispatch"][protocol],
1,
1,
max_walltime=self.sim_params['max_walltime'])
self._set_job_num_cpus(fep_launcher_stage["dispatch"][protocol],
num_cpus,
num_windows,
mps_factor=mps_factor,
max_walltime=self.sim_params['max_walltime'])
for i, leg in enumerate(['complex', 'solvent']):
subjobname = f'$MAINJOBNAME_$JOBTAG_{leg}'
job = fep_launcher_stage["dispatch"][protocol][i]
job.extend([
'-m',
self._get_msj_filename(leg, protocol),
"-o",
f'{subjobname}-out.mae',
"-JOBNAME",
subjobname,
])
subjobname = '$MAINJOBNAME_$JOBTAG_md'
# MD is the only protocol here
job = md_launcher_stage["dispatch"][protocol][0]
job.extend([
'-m',
self._get_msj_filename('md', protocol),
"-o",
f'{subjobname}-out.mae',
"-JOBNAME",
subjobname,
])
md_launcher_stage.set_value("input_graph_file",
self.sim_params['graph_file'],
tag=_SET_BY_USER_TAG)
fep_launcher_stage.set_value("input_graph_file",
self.sim_params['graph_file'],
tag=_SET_BY_USER_TAG)
analysis_stage.set_value("input_graph_file",
self.sim_params['graph_file'],
tag=_SET_BY_USER_TAG)
return msj.stage
[docs] def generate_subjob_msj(
self,
leg: ABSOLUTE_BINDING_LEGS.VAL,
protocol: SIMULATION_PROTOCOL,
) -> sea.Map:
"""
:param leg: Generate the msj for the given leg.
"""
if leg == ABSOLUTE_BINDING_LEGS.VAL.SOLVENT:
# solvent leg uses NPT
with self.patch_ensemble():
fep_msj = self._generate_fep_msj(leg, protocol)
else:
fep_msj = self._generate_fep_msj(leg, protocol)
return fep_msj
[docs] def generate_md_msj(self, protocol: SIMULATION_PROTOCOL) -> sea.Map:
msj = cmj.msj2sea_full(self.from_md_msj)
subjob_msj = cmj.msj2sea_full(self.from_subjob_msj)
excluded_stages = [stage.Task.NAME, stage.FepAnalysis.NAME]
msj.stage.extend(
[s for s in subjob_msj.stage if s.__NAME__ not in excluded_stages])
if self.sim_params['h_mass']:
self._set_hmr(msj)
# Set up box
self._set_water_model(msj)
if self.sim_params.get('membrane'):
self._set_box_parameters(msj)
else:
buffer_width = 5.0
if protocol == SIMULATION_PROTOCOL.CHARGED:
self._set_neutralize_box(msj)
buffer_width = 8.0
self._set_buffer_width(msj, buffer_width)
# Set up forcefield
self._set_ff(msj)
self._set_assign_custom_charge(msj,
self.sim_params['custom_charge_mode'])
self._set_permanent_restraint(
msj, AbsoluteBindingMsjGenerator._DEEFAULT_MD_RESTRAINT)
# Set up production parameters
self._set_production_parameters(msj, self.sim_params['md_sim_time'],
self.sim_params['rand_seed'])
# Save MD traj more frequently, it is used to determine the restraints
self._set_trajectory_interval(msj, 3.6)
# Prepare for FEP using the MD outputs
self._set_fep_absolute_binding_fep_primer(msj)
keep_struc_tags = [
FEP_STRUC_TAG.VAL.RECEPTOR, FEP_STRUC_TAG.VAL.SOLVENT,
FEP_STRUC_TAG.VAL.MEMBRANE, FEP_STRUC_TAG.VAL.LIGAND
]
self._set_extract_structures(msj, keep_struc_tags)
# Set up ensemble
if self.sim_params['ensemble'] == Ensembles.MUVT:
self._set_gcmc(msj)
self._set_restraint_retain(msj)
self._set_fep_analysis(msj, False)
# Mark the MD stages as being preperation to run fep
self._set_is_for_fep(msj)
return msj.stage
def _generate_fep_msj(self, leg: ABSOLUTE_BINDING_LEGS.VAL,
protocol: SIMULATION_PROTOCOL) -> sea.Map:
msj = cmj.msj2sea_full(self.from_subjob_msj)
if self.sim_params['h_mass']:
self._set_hmr(msj)
# Set up box
self._set_water_model(msj)
# For Absolute binding FEP, we still use the old alchemical ion
# mechanism until DESMOND-9873 is resolved.
net_charge = (protocol == SIMULATION_PROTOCOL.CHARGED)
if self.sim_params['salt_concentration'] or net_charge:
alchemical_ions_params = None
if net_charge:
alchemical_ions_params = {
'map_to_neutral_atoms': False,
'use_alchemical_water': False
}
self._set_net_charge_box(msj,
False,
alchemical_ions_params,
net_charge=net_charge)
# Set up production parameters
lambda_windows = self._get_num_lambda_windows(leg, protocol)
schedule = self._get_schedule(leg, protocol)
if leg == ABSOLUTE_BINDING_LEGS.VAL.COMPLEX:
# The box parameters are read in from the MD stage
self._set_box_parameters(msj)
end_win = -1
else:
# Set up box
if net_charge:
self._set_neutralize_box(msj)
self._set_buffer_width(msj, min_buffer_width=10.0)
# solvent leg uses a custom rest schedule that omits
# scaling the lambdas where the ligand is restrained.
end_win = (lambda_windows -
self._get_solvent_restrained_lambda_windows(protocol))
# Set up forcefield
self._set_ff(msj)
self._set_assign_forcefield_pose_conf_restraint(
msj, AbsoluteBindingMsjGenerator._DEFAULT_POSE_CONF_RESTRAINT)
# DESMOND-10796 - set fc for pose_conf_restraint for ligand_restraint
# and not adaptive_ligand_restraint. The adaptive restraint uses
# flat bottom potential while the pose conf restraint uses a
# soft harmonic potential. In case that a torsion samples a
# wide range in the MD stage, the sigma for the flat potential
# is set to be large so that the torsion can still sample a
# wide range in FEP. But if the pose conf restraint is on with
# fc>0, the harmonic potential is always there so that the
# torsion is restrained in FEP. In other words, if the
# pose conf restraint is on with fc>0, the restraints is
# not adaptive as we intend to.
if self.sim_params['ligand_restraint']['enable']:
self._set_assign_forcefield_pose_conf_restraint(
msj, self.sim_params['ligand_restraint'])
self._set_assign_custom_charge(msj, CUSTOM_CHARGE_MODE.KEEP)
for production_stage in self._find_production_stages(msj):
production_stage.__NAME__ = stage.LambdaHopping.NAME
self._set_solute_tempering_temperature_ladder(msj,
start_win=0,
end_win=end_win)
if leg == FepLegTypes.COMPLEX:
sim_time = self.sim_params['sim_time_complex']
else:
sim_time = self.sim_params['sim_time_solvent']
self._set_production_parameters(msj, sim_time,
self.sim_params['rand_seed'])
self._set_lambda_windows(msj, lambda_windows)
self._set_custom_lambda_schedule(msj, schedule)
self._set_print_expected_memory(msj)
self._set_fep_type(msj)
# Set up ensemble
if self.sim_params['ensemble'] == Ensembles.MUVT:
self._set_gcmc(msj)
self._set_restraint_retain(msj)
self._insert_stage_load_restraints_from_structure(msj)
self._set_trim_stage(msj)
self._set_compress_cms(msj)
# This is the header used to filter the structures so only those
# for the given leg remain.
msj.stage.insert(
0,
cmj.msj2sea_full(
None,
msj_content=f"{stage.Task.NAME} {{task = generic}}").stage[0])
msj.stage.insert(
1,
cmj.msj2sea_full(
None,
msj_content=
f"{stage.FepAbsoluteBindingStructurePrimer.NAME} {{leg={leg}}}"
).stage[0])
return msj.stage
[docs] def write_main_msj(self) -> str:
"""
Write out the main msj and return the filename.
"""
msj_fname = self._get_msj_filename()
cmj.write_sea2msj(self.generate_main_msj(), msj_fname)
return msj_fname
[docs] def write_md_msj(self):
"""
Write out the md msjs.
"""
for protocol in [
SIMULATION_PROTOCOL.DEFAULT,
SIMULATION_PROTOCOL.CHARGED,
]:
msj_fname = self._get_msj_filename('md', protocol)
cmj.write_sea2msj(self.generate_md_msj(protocol), msj_fname)
[docs] def write_complex_msj(self):
"""
Write out the complex msjs
"""
for protocol in [
SIMULATION_PROTOCOL.DEFAULT,
SIMULATION_PROTOCOL.CHARGED,
]:
cmj.write_sea2msj(
self.generate_subjob_msj(ABSOLUTE_BINDING_LEGS.VAL.COMPLEX,
protocol),
self._get_msj_filename('complex', protocol))
[docs] def write_solvent_msj(self):
"""
Write out the solvent msjs
"""
for protocol in [
SIMULATION_PROTOCOL.DEFAULT,
SIMULATION_PROTOCOL.CHARGED,
]:
cmj.write_sea2msj(
self.generate_subjob_msj(ABSOLUTE_BINDING_LEGS.VAL.SOLVENT,
protocol),
self._get_msj_filename('solvent', protocol))
def _set_fep_absolute_binding_fep_primer(self, msj: sea.Map):
"""
Set the parameters for the absolute binding fep primer stage.
"""
primer = cmj.msj2sea_full(
None, msj_content=f"{stage.FepAbsoluteBindingFepPrimer.NAME} {{}}"
).stage[0]
primer.set_value("ligand_restraint.enable",
self.sim_params['ligand_restraint']['enable'],
tag=_SET_BY_USER_TAG)
primer.set_value("adaptive_ligand_restraint.enable",
self.sim_params['adaptive_ligand_restraint']['enable'],
tag=_SET_BY_USER_TAG)
primer.set_value("use_representative_structure",
self.sim_params['use_representative_structure'],
tag=_SET_BY_USER_TAG)
primer.set_value("cross_link_restraint.use_centroid",
self.sim_params['use_centroid'],
tag=_SET_BY_USER_TAG)
primer.set_value("cross_link_restraint.use_pl_interactions",
self.sim_params['use_pl_interactions'],
tag=_SET_BY_USER_TAG)
primer.set_value("cross_link_restraint.min_angle",
self.sim_params['min_angle'],
tag=_SET_BY_USER_TAG)
primer.set_value("cross_link_restraint.receptor_asl",
self.sim_params['receptor_asl'],
tag=_SET_BY_USER_TAG)
# Ignored with new restraints DESMOND-10760
primer.set_value("cross_link_restraint.use_bonded_atoms",
False,
tag=_SET_BY_USER_TAG)
primer.set_value("ligand_asl",
self.sim_params['ligand_asl'],
tag=_SET_BY_USER_TAG)
msj.stage.insert(-1, primer)
def _set_custom_lambda_schedule(self, msj: sea.Map, schedule: Dict):
"""
Set the custom lambda schedule on the task stage.
"""
for s in _find_stage(msj.stage, stage.Task.NAME):
s.set_value("set_family.desmond.backend.force.term.gibbs",
schedule,
tag=_SET_BY_USER_TAG)
def _set_assign_forcefield_pose_conf_restraint(self, msj: sea.Map,
pose_conf_restraint: Dict):
"""
Set the pose_conf_restraint parameters for the assign_forcefield stage.
:param pose_conf_restraint: Dictionary containing the parameters for the
pose_conf_restraint. See the `AssignForcefield` stage for
more information on valid parameters.
"""
for s in _find_stage(msj.stage, stage.AssignForcefield.NAME):
s.set_value("pose_conf_restraint",
sea.Map(pose_conf_restraint),
tag=_SET_BY_USER_TAG)
def _set_solute_tempering_temperature_ladder(
self,
msj: sea.Map,
rest_property: str = REST_HOTREGION,
start_win=0,
end_win=-1):
"""
Set the lambda hopping specific parameters in the simulation msj.
:param msj: Msj to update in place.
:param rest_property: Name of the ct property for custom REST scaling.
"""
for production_stage in self._find_production_stages(msj):
production_stage["solute_tempering"] = sea.Map("""
atom = "%s"
temperature = {
generator = PvdS
exchange_probability = 0.3
start_win = "%d"
end_win = "%d"
}
""" % (self.asl % rest_property, start_win, end_win))
production_stage.solute_tempering.add_tag(_SET_BY_USER_TAG)
def _set_trim_stage(self, msj: sea.Map):
"""
Add the trim stage after the last simulate or concatenate stage
to clear the cms files.
"""
indices = [
idx for idx, s in enumerate(msj.stage)
if s.__NAME__ in [stage.Simulate.NAME, stage.Concatenate.NAME]
]
# Add the trim stage after the last simulate/concatenate stage
trim_stage = cmj.msj2sea_full(None, msj_content=_TRIM_TEMPLATE)
trim_idx = indices[-1] + 1
erase_list = trim_stage.stage[0].erase.val
# Use the last item as the template to erase the cms files
erase_cms = erase_list[-2]
erase_tgz = erase_list[-1]
# Keep the first item to erase the previous stage input cms files
erase_list = [erase_list[0]]
# Update the stage index
for idx in indices[:-1]:
erase_cms_template = copy.copy(erase_cms)
erase_cms_template[0] = idx - trim_idx
erase_list.append(erase_cms_template)
for idx in indices[:-1]:
erase_tgz_template = copy.copy(erase_tgz)
erase_tgz_template[0] = idx - trim_idx
erase_list.append(erase_tgz_template)
trim_stage.stage[0].erase.val = erase_list
# Insert the trim stage
msj.stage.insert(trim_idx, trim_stage.stage[0])
def _set_trajectory_interval(self, msj: sea.Map,
trajectory_interval: float):
"""
Set the trajectory interval for the production simulations.
:param msj: Msj to update in place.
:param trajectory_interval: Frequency to save the trajectory in ps.
"""
for s in self._find_production_stages(msj):
s.set_value("trajectory.interval",
trajectory_interval,
tag=_SET_BY_USER_TAG)
_LAMBDA_PLUGIN_PATH = "set_family.desmond.backend.mdsim.plugin.lambda_plugin"
_REMD_PLUGIN_PATH = "set_family.replica_exchange.backend.remd.plugin.lambda_plugin"
[docs]class ConstantpHMsjGenerator(BaseFepMsjGenerator):
[docs] def __init__(self, jobname: str, cd_params: Dict[str, object], **kwargs):
kwargs.setdefault('defaults', {})
kwargs['defaults'].update({
'phs': [],
'ph_md': 7.0,
'order': constants.CONSTANT_PH_PMF_ORDER,
'site_info': {},
})
super().__init__(jobname, cd_params, **kwargs)
self.from_msj = os.path.join(DESMOND_DATA_DIR, 'constant_ph.msj')
self.from_cfg = os.path.join(DESMOND_DATA_DIR, 'constant_ph.cfg')
self.cfg_fname = f'{jobname}.cfg'
[docs] def write_msj(self) -> str:
msj_fname = self._get_msj_filename()
cmj.write_sea2msj(self.generate_msj(), msj_fname)
return msj_fname
[docs] def write_cfg(self) -> str:
Path(self.cfg_fname).write_text(str(self.generate_cfg()))
return self.cfg_fname
[docs] def generate_msj(self) -> List[sea.Map]:
msj = cmj.msj2sea_full(self.from_msj)
self._set_is_for_fep(msj)
self._set_compress_cms(msj)
# Set up lambda plugin via task stage
params = {}
for plugin in (_LAMBDA_PLUGIN_PATH, _REMD_PLUGIN_PATH):
params[plugin + '.sites'] = list(
self.sim_params['site_info'].keys())
params[plugin + '.order'] = self.sim_params['order']
for site, info in self.sim_params['site_info'].items():
for k, v in info.items():
params[f'{plugin}.{site}.{k}'] = v
# Only set pH for the equilibrium MD stages
params[_LAMBDA_PLUGIN_PATH + '.pH'] = self.sim_params['ph_md']
_set_stage_params(msj, 'task', params)
# Set up replica exchange parameters
replica_values = [sea.Map({'temperature': 300.0})] * len(
self.sim_params['phs'])
params = {
'replica': replica_values,
'cfg_file': self.cfg_fname,
'time': self.sim_params['sim_time'],
'backend.seed': self.sim_params['rand_seed']
}
# REMD pH graph
for replica_idx, pH in enumerate(self.sim_params['phs']):
replica_key = f'remd.graph.T{replica_idx:02d}.remd.plugin.lambda_plugin'
replica_val = {
'name': f"$JOBNAME_replica{replica_idx}_lam.txt",
'pH': pH
}
params[f'backend.{replica_key}'] = sea.Map(replica_val)
_set_stage_params(msj, stage.ReplicaExchange.NAME, params)
params = {'phs': self.sim_params['phs']}
_set_stage_params(msj, stage.ConstantpHAnalysis.NAME, params)
self._set_seed(msj, rand_seed=self.sim_params['rand_seed'])
return msj.stage
[docs] def generate_cfg(self) -> List[sea.Map]:
cfg = sea.Map(Path(self.from_cfg).read_text())
return cfg
COVALENT_HEAD = """task {
set_family = {
desmond = {
checkpt = {
write_last_step = false
}
}
}
task = "generic"
}
# Set up structures for task below
extract_structures {
}
"""
_TRIM_TEMPLATE = """
trim {
erase = [
[-1 "$MAINJOBNAME*/$MAINJOBNAME_$STAGENO*_lambda*/*-in.cms.gz" ]
[-2 "$MAINJOBNAME*/$MAINJOBNAME_$STAGENO*_lambda*/*.cms.gz" ]
[-2 "$MAINJOBNAME_$STAGENO-out.tgz" ]
]
title = "Remove cms and tgz files in the prior stages to save disk space."
}
"""