Source code for schrodinger.application.desmond.fep_scholar_msj_generator
import os
import re
import shutil
import schrodinger.application.desmond.cmj as cmj
import schrodinger.utils.sea as sea
from schrodinger.application.desmond import constants
from schrodinger.application.desmond import stage
# Here to make sure import is not removed
_ = stage.Task
MMSHARE_DIR = os.path.dirname(os.path.dirname(os.environ["MMSHARE_EXEC"]))
DESMOND_DATA_DIR = os.path.join(MMSHARE_DIR, "data", "desmond")
ENSEMBLE = {'NVT': "desmond_nvt_fep.msj", 'NPT': "desmond_npt_fep.msj"}
[docs]class FEPScholarMsjGenerator(object):
"""
This class generates main msj file that manages both solvent and complex
legs of the FEP simulation.
"""
[docs] def __init__(self,
jobname,
cd_params,
structure_fname,
force_field="OPLS_2005",
ensemble="NPT",
sim_time="5000",
rand_seed="2015",
complex_box_buffer_width=5.0,
trj_interval=24.0):
"""
Valid FF : OPLS_2005 CHARMM AMBER
"""
self.jobname = jobname
self.cd_params = cd_params
self.structure_fname = structure_fname
self.force_field = force_field
self.ensemble = ENSEMBLE[ensemble]
self.sim_time = sim_time
self.rand_seed = rand_seed
self.trj_interval = trj_interval
self.from_main_msj = os.path.join(DESMOND_DATA_DIR,
"fep_scholar_launcher.msj")
# By default we should be running npt
self.from_subjob_msj = os.path.join(DESMOND_DATA_DIR, self.ensemble)
self.main_msj_fname = self.jobname + ".msj"
self.complexmsj_fname = self.jobname + "_complex.msj"
self.solventmsj_fname = self.jobname + "_solvent.msj"
self.pv_fname = self.jobname + '_pv.maegz'
# always use predefined REST selection region
self.asl = f"asl: atom.{constants.REST_HOTREGION} 1"
self.solvent_box_buffer_width = 10.0
self.complex_box_buffer_width = complex_box_buffer_width
[docs] def write_complex_msj(self):
"""
"""
self.generate_subjob_msj(self.complexmsj_fname,
self.complex_box_buffer_width)
return self.complexmsj_fname
[docs] def write_solvent_msj(self):
"""
"""
# Note we are using solvent box of 5 angstroms too since it will be a
# large protein fragment
self.generate_subjob_msj(self.solventmsj_fname,
self.solvent_box_buffer_width,
solvent_leg=True)
return self.solventmsj_fname
[docs] def write_main_msj(self):
"""
"""
shutil.copyfile(self.from_main_msj, self.main_msj_fname)
self.generate_main_msj()
return self.main_msj_fname
[docs] def generate_subjob_msj(self, fname, buffer_width, solvent_leg=False):
with open(self.from_subjob_msj) as fh:
subjobmsj_contents = fh.read()
if solvent_leg:
subjobmsj_contents = re.sub(
"buffer_width *= *5.0",
" buffer_width = {bf}".format(bf=buffer_width),
subjobmsj_contents,
count=1)
subjobmsj_contents = re.sub(
"extract_structures *{",
"extract_structures {\n keep = [ligand]",
subjobmsj_contents,
count=1)
with open(fname, 'w') as fh:
fh.write(subjobmsj_contents)
raw = cmj.msj2sea_full(fname)
# Default is 5.0 if it is larger then we set it
if float(self.complex_box_buffer_width) > 5.0:
self.set_box_buffer_width(raw, self.complex_box_buffer_width)
self.set_lambda_hopping(raw)
self.set_ff(raw)
cmj.write_sea2msj(raw.stage, fname)
[docs] def generate_main_msj(self):
"""
"""
# 'raw' is an object of sea.Sea (or its subclass).
raw = cmj.msj2sea_full(self.main_msj_fname)
self.modify_multisim_stage(raw)
cmj.write_sea2msj(raw.stage, self.main_msj_fname)
[docs] def modify_multisim_stage(self, raw):
"""
Method modifies 'multisim' stage in the main msj file
"""
for stage in raw.stage:
if stage.__NAME__ == "multisim":
multisim_stage = stage
break
else:
raise Exception("no multisim stage in the msj file")
multisim_stage.job[0][1].val = self.complexmsj_fname
multisim_stage.job[1][1].val = self.solventmsj_fname
multisim_stage.job[0][3].val = self.cd_params['processors_per_replica']
multisim_stage.job[1][3].val = self.cd_params['processors_per_replica']
subhost_ncpu = "$SUBHOST:%d" % int(self.cd_params['cpus'])
umbrella_options = \
[sea.Atom(x) for x in ["-HOST", subhost_ncpu, "-mode", "umbrella"]]
multisim_stage.job[0].extend(umbrella_options)
multisim_stage.job[1].extend(umbrella_options)
[docs] def get_total_proc(self):
return self.cd_params["cpus"]
[docs] @staticmethod
def set_box_buffer_width(raw, width):
"""
"""
for stage in raw.stage:
if stage.__NAME__ == "build_geometry":
stage["buffer_width"] = width
stage.buffer_width.add_tag("setbyuser")
break
[docs] def set_ff(self, raw):
for stage in raw.stage:
if stage.__NAME__ == "assign_forcefield":
assign_forcefield_stage = stage
break
else:
raise Exception("No assign_forcefield stage in the msj file")
assign_forcefield_stage["forcefield"] = self.force_field
assign_forcefield_stage.forcefield.add_tag("setbyuser")
assign_forcefield_stage["core_hopping_fepio"] = "off"
assign_forcefield_stage.core_hopping_fepio.add_tag("setbyuser")
[docs] def set_lambda_hopping(self, raw):
# Finds the last ``simulate'' or ``lambda_hopping'' stage.
last_simulate = None
for stage in raw.stage:
if stage.__NAME__ in ["simulate", "lambda_hopping"]:
last_simulate = stage
# Renames the last simulate stage to lambda_hopping. If it is
# already lambda_hopping, we just assign the same name back to
# itself. It doesn't matter.
last_simulate.__NAME__ = "lambda_hopping"
last_simulate["solute_tempering"] = sea.Map("""
atom = "%s"
temperature = {
generator = PvdS
exchange_probability = 0.3
}
""" % self.asl)
last_simulate["time"] = self.sim_time
last_simulate["trajectory"]["interval"] = self.trj_interval
last_simulate["trajectory"].add_tag("setbyuser")
last_simulate["randomize_velocity"]["seed"] = self.rand_seed
last_simulate["randomize_velocity"].add_tag("setbyuser")
last_simulate.solute_tempering.add_tag("setbyuser")
last_simulate["time"].add_tag("setbyuser")