Source code for schrodinger.application.watermap.watermap_inp
"""
This is a class that generates watermap input files and runs them through jobcontrol. This also supports all the options in GUI.
Copyright Schrodinger, LLC. All rights reserved.
"""
# Contributors: Byungchan Kim
import argparse
import os
import sys
from typing import Optional
import schrodinger.application.desmond.cmj as cmj
import schrodinger.application.desmond.envir as envir
import schrodinger.application.desmond.smarts as smarts
# stage.py needs to be imported for cmj module to know the stages
import schrodinger.application.desmond.stage as stage # noqa: F401
import schrodinger.application.desmond.system_builder_util as system_builder_util
import schrodinger.job.jobcontrol as jobcontrol
import schrodinger.structure as structure
import schrodinger.structutils.transform as transform
import schrodinger.utils.cmdline as cmdline
from schrodinger.application.desmond import constants
from schrodinger.infra import mm
FFLD14 = mm.OPLS_NAME_F14
FFLD16 = mm.OPLS_NAME_F16
_SOLVATE_POCKET_TITLE = "GCMC solvate pocket"
# Define water SMARTS patterns
water_smarts = ["[H]O[H]"]
zob_water_smarts = ["[H]O([H])_[*]", "[H]O[H]_[*]"]
[docs]def print_deprecate(option, opt, value, parser):
print()
print("%s option has been DEPRECATED. Please check help message." %
(opt[1:],))
print()
[docs]class WaterMapInput:
[docs] def __init__(self, protein_st, ligand_st, **kwargs):
self._protein = protein_st
self._ligand = ligand_st
self._solute = None
self._crystal_water = None
self.retain_ligand = False
self.do_not_truncate = False
self.ligand_distance = 10.0
self.truncate_distance = 20.0
self.treat_solvent = 'as solvent'
self.simulation_time = 2.0
self.do_not_return_trajectory = False
self.solvent_buffer_distance = 5.0
self.has_membrane = False
self.turn_off_solvate_pocket = False
self.twobody = False
self.forcefield = FFLD16
self.num_solvate_pocket = 1
self.extended_gcmc = False
self.update(**kwargs)
[docs] def update(self, **kwargs):
'''
updates states of options
'''
if 'retain_ligand' in kwargs:
self.retain_ligand = kwargs['retain_ligand']
if 'do_not_truncate' in kwargs:
self.do_not_truncate = kwargs['do_not_truncate']
if 'ligand_distance' in kwargs:
self.ligand_distance = kwargs['ligand_distance']
if 'truncate_distance' in kwargs:
self.truncate_distance = kwargs['truncate_distance']
if 'simulation_time' in kwargs:
self.simulation_time = kwargs['simulation_time']
if 'treat_solvent' in kwargs:
self.treat_solvent = kwargs['treat_solvent']
if 'do_not_return_trajectory' in kwargs:
self.do_not_return_trajectory = kwargs['do_not_return_trajectory']
if 'solvent_buffer_distance' in kwargs:
print(
"solvent_buffer_distance is ignored. 'ligand_distance' should be used to set solvent buffer distance."
)
if 'has_membrane' in kwargs:
self.has_membrane = kwargs['has_membrane']
if 'turn_off_solvate_pocket' in kwargs:
self.turn_off_solvate_pocket = kwargs['turn_off_solvate_pocket']
if 'twobody' in kwargs:
self.twobody = kwargs['twobody']
if 'forcefield' in kwargs:
self.forcefield = kwargs['forcefield']
if 'num_solvate_pocket' in kwargs:
self.num_solvate_pocket = kwargs['num_solvate_pocket']
if 'extended_gcmc' in kwargs:
self.extended_gcmc = kwargs['extended_gcmc']
if self.do_not_truncate:
self.truncate_distance = 0.0
[docs] def reorder_solute(self, st):
'''
reorder atoms so that the very first atom is located close to origin to prevent solute to be shifted to other images.
'''
new_st = st.copy()
reorder_array = [0]
for i in range(self._solute.atom_total):
reorder_array.append(i + 1)
min_dist = 1.0E10
min_index = 0
for a in self._solute.atom:
if "b_wmap_ligand" in a.property and a.property["b_wmap_ligand"]:
continue
dist = a.x * a.x + a.y * a.y + a.z * a.z
if dist < min_dist:
min_dist = dist
min_index = a.index
reorder_array[1] = min_index
reorder_array[min_index] = 1
mm.mmct_ct_reorder(new_st, st, reorder_array)
return new_st
[docs] def createTip4pFfio(self, ct):
mm.mmffio_initialize(mm.MMERR_DEFAULT_HANDLER)
mmshare_exec = os.environ.get('MMSHARE_EXEC')
# FIXME this will not work on Windows:
mmshare_data = mmshare_exec + '/../../data/system_builder'
tip4p_fname = os.path.join(mmshare_data, 'tip4p.box.mae')
st_reader = structure.StructureReader(tip4p_fname)
st = next(st_reader)
ff = mm.mmffio_ff_mmct_get(st.handle)
mm.mmffio_ff_mmct_put(ff, ct)
mm.mmffio_terminate()
[docs] def make_canonical_solvent(self, st):
'''
Fix atomic order to O H H
'''
new_st = st.copy()
reorder = [0]
for m in st.molecule:
for i in m.getAtomIndices():
reorder.append(i)
mm.mmct_ct_reorder(new_st, st, reorder)
return new_st
[docs] def prepareStructures(self):
'''
Prepare -solute, -protein, -ligand.mae files.
'''
# tag ligand atoms when it is retained
if self.retain_ligand:
for a in self._ligand.atom:
a.property["b_wmap_ligand"] = True
st = system_builder_util.truncateProtein(self._protein, self._ligand,
self.retain_ligand,
self.truncate_distance)
if self.truncate_distance > 0.0:
for a in st.atom:
if "b_wmap_ligand" in a.property and a.property["b_wmap_ligand"]:
continue
else:
a.property[constants.FF_USE_EXISTING_CHARGE] = 1
if self.retain_ligand:
st.property["b_wmap_retain_ligand"] = True
# Allow some structural waters to be kept when running WaterMap [Ev:72892]
st.property[constants.CT_TYPE] = constants.CT_TYPE.VAL.SOLUTE
self._ligand.property[constants.CT_TYPE] = constants.CT_TYPE.VAL.LIGAND
self._protein.property[
constants.CT_TYPE] = constants.CT_TYPE.VAL.RECEPTOR
water_atoms = smarts.evaluate_net_smarts(st, water_smarts,
zob_water_smarts)
if len(water_atoms):
# Separate water to crystal water CT
tmp_water_st = st.extract(water_atoms)
self._crystal_water = self.make_canonical_solvent(tmp_water_st)
self._crystal_water.property['i_ffio_num_component'] = 1
if self.treat_solvent == 'as solvent':
self._crystal_water.property[
constants.CT_TYPE] = constants.CT_TYPE.VAL.CRYSTAL_WATER
self.createTip4pFfio(self._crystal_water.handle)
for a in self._crystal_water.atom:
a.pdbres = 'T4P '
# remove water from protein
del_atoms = smarts.evaluate_net_smarts(self._protein,
water_smarts,
zob_water_smarts)
self._protein.deleteAtoms(del_atoms)
# remove water from solute
st.deleteAtoms(water_atoms)
# Restrain solute water
elif self.treat_solvent == 'as solute':
self._crystal_water = None
# Remove water from solute and protein
elif self.treat_solvent == 'delete':
self._crystal_water = None
# remove water from protein
del_atoms = smarts.evaluate_net_smarts(self._protein,
water_smarts,
zob_water_smarts)
self._protein.deleteAtoms(del_atoms)
# remove water from solute
st.deleteAtoms(water_atoms)
# set all solute waters including zero-order bonded waters
# with T3P resname and restraint
water_atoms = smarts.evaluate_net_smarts(st, water_smarts, [])
for i in water_atoms:
a = st.atom[i]
a.pdbres = 'T3P '
if a.atomic_number > 1:
a.property['i_wmap_restrain'] = 1
self._solute = st
def _writeStructures(self, jobname):
'''
write structures as mae files
'''
if not self._solute:
try:
self.prepareStructures()
except Exception:
raise
if (not self._protein) or (self._protein.atom_total == 0):
raise RuntimeError('Protein atoms do not exist.')
if (not self._ligand) or (self._ligand.atom_total == 0):
raise RuntimeError('Ligand atoms do not exist.')
centroid = [0.0, 0.0, 0.0]
if not self.has_membrane:
centroid = transform.get_centroid(self._ligand)
x = -centroid[0]
y = -centroid[1]
z = -centroid[2]
transform.translate_structure(self._ligand, x, y, z)
self._ligand.title = "sampling_coordinates"
self._ligand.property['r_watermap_trans1'] = -x
self._ligand.property['r_watermap_trans2'] = -y
self._ligand.property['r_watermap_trans3'] = -z
transform.translate_structure(self._solute, x, y, z)
self._solute.property['r_watermap_trans1'] = -x
self._solute.property['r_watermap_trans2'] = -y
self._solute.property['r_watermap_trans3'] = -z
if self._crystal_water:
transform.translate_structure(self._crystal_water, x, y, z)
self._crystal_water.property['r_watermap_trans1'] = -x
self._crystal_water.property['r_watermap_trans2'] = -y
self._crystal_water.property['r_watermap_trans3'] = -z
if (not self._solute) or (self._solute.atom_total == 0):
raise RuntimeError(
'Truncation distance is too short to include any protein atoms. Please increase the truncation distance.'
)
self._solute = self.reorder_solute(self._solute)
self._solute.write(self.in_fname)
self._ligand.append(self.in_fname)
self._protein.append(self.in_fname)
if self._crystal_water:
self._crystal_water.append(self.in_fname)
[docs] def writeMSJ(self) -> Optional[str]:
"""
Write multisim file to the returned str.
If there was a problem generating the msj, return None.
"""
# TODO should we support extended gcmc in the future or remove this
# entirely?
assert not self.extended_gcmc, "Extended GCMC is currently not " \
"supported"
if self.extended_gcmc:
template_fname = os.path.join(envir.CONST.MMSHARE_DATA_DESMOND_DIR,
"watermap_extended_template.msj")
else:
template_fname = os.path.join(envir.CONST.MMSHARE_DATA_DESMOND_DIR,
"watermap_gpu_template.msj")
with open(template_fname) as f:
s = f.read()
s = s.replace('$SIMULATION_TIME', str(self.simulation_time * 1000.0))
s = s.replace('$LIGAND_DISTANCE', str(self.ligand_distance))
all_stages = cmj.parse_msj(None, msj_content=s)
water_model = None
for stg in all_stages:
if stg.NAME == "system_builder" or stg.NAME == "build_geometry":
if self.has_membrane:
stg.param.should_skip.add_tag("setbyuser")
stg.param.should_skip.val = True
if self.treat_solvent == "as solute":
stg.param.distil_solute.add_tag("setbyuser")
stg.param.distil_solute.val = False
if stg.NAME == "build_geometry" and self.forcefield != FFLD14:
stg.param.override_forcefield.add_tag("setbyuser")
stg.param.override_forcefield.val = self.forcefield
if stg.NAME == "assign_forcefield":
if self.forcefield != FFLD14:
stg.param.forcefield.add_tag("setbyuser")
stg.param.forcefield.val = self.forcefield
water_model = stg.param.water.val
# the GCMC solvate_pocket replacement cannot be identified by stg.NAME
# which is simply "simulate"
if stg.param.title.val == _SOLVATE_POCKET_TITLE:
if self.turn_off_solvate_pocket:
stg.param.should_skip.add_tag("setbyuser")
stg.param.should_skip.val = True
else:
water_model = water_model or 'SPC'
timestep = int(stg.param.timestep.val[0] * 1000)
key = (water_model, timestep)
if key not in constants.GCMC_CHEMICAL_POTENTIALS:
print(
f"ERROR: Water model {water_model} not calibrated for GCMC at a {timestep} fs timestep. "
"Turn off solvate pocket to run with this model.")
return None
mu_excess, solvent_density = constants.GCMC_CHEMICAL_POTENTIALS[
key]
stg.param.gcmc.mu_excess.val = mu_excess
stg.param.gcmc.mu_excess.add_tag("setbyuser")
stg.param.gcmc.solvent_density.val = solvent_density
stg.param.gcmc.solvent_density.add_tag("setbyuser")
if stg.NAME == "watermap_post_analysis" and self.do_not_return_trajectory:
stg.param.do_not_return_trajectory.add_tag("setbyuser")
stg.param.do_not_return_trajectory.val = True
if stg.NAME == "watermap_cluster":
if self.twobody:
stg.param.twobody.add_tag("setbyuser")
stg.param.twobody.val = 1
if stg.NAME == "simulate" and self.extended_gcmc:
try:
stg.param.backend.mdsim.plugin.SpatialActiveSite # production stage
stg.param.time.val = 200.0
except AttributeError:
pass
s = cmj.write_msj(all_stages, to_str=True)
return s
[docs] def write(self, jobname, suffix="maegz", hostname=None, cpus=None):
"""
Call all write functions
:param jobname: The name of the job in the command example
:type jobname: str
:param suffix: The file extension for the input file
:type suffix: str
:param hostname: The name of the host in the command example
:type hostname: str
:param cpus: The number of cpus in the command example
:type cpus: int
"""
self.in_fname = jobname + "-in." + suffix
self.msj_fname = jobname + '.msj'
self._writeStructures(jobname)
host_and_cpus_args = ""
if hostname and cpus:
host_and_cpus_args = " -HOST %s -cpu %s" % (hostname, str(cpus))
s = self.writeMSJ()
if not s:
raise RuntimeError("Could not generate msj for job submission.")
s += '# command example:\n'
s += '# $SCHRODINGER/watermap -JOBNAME %s%s -m %s %s\n\n' % (
jobname, host_and_cpus_args, self.msj_fname, self.in_fname)
fh = open(self.msj_fname, "w")
print(s, file=fh)
fh.close()
[docs] def run(self, jobname, host, cpu):
'''
Run WaterMap Job
'''
watermap_cmd = os.path.join(os.environ["SCHRODINGER"], "watermap")
cmd_line = [
watermap_cmd,
"-JOBNAME",
jobname,
"-HOST",
host,
"-cpu",
str(cpu),
"-m",
self.msj_fname,
self.in_fname,
]
job = jobcontrol.launch_job(cmd_line)
if not job:
raise RuntimeError(
"Desmond submission failed. Please check the accessibility of the cluster machine and of Desmond engine over there."
)
return job
[docs]def main(opt):
if not os.path.exists(opt.protein):
print('%s does not exists.' % opt.protein)
print('Use -protein to specify a protein file name.')
sys.exit(1)
if not os.path.exists(opt.ligand):
print('Ligand, %s, does not exists.' % opt.ligand)
print('Use -ligand to specify a ligand file name.')
sys.exit(1)
if opt.extended_gcmc:
print('WaterMap does not currently support the extended_gcmc option.')
sys.exit(1)
protein_st = None
for st in structure.StructureReader(opt.protein):
if protein_st:
protein_st.extend(st)
else:
protein_st = st
ligand_st = None
for st in structure.StructureReader(opt.ligand):
if ligand_st:
ligand_st.extend(st)
else:
ligand_st = st
wm_inp = WaterMapInput(
protein_st,
ligand_st,
retain_ligand=opt.retain_ligand,
do_not_truncate=opt.do_not_truncate,
ligand_distance=opt.ligand_distance,
truncate_distance=opt.truncate_distance,
treat_solvent=opt.treat_solvent,
simulation_time=opt.simulation_time,
do_not_return_trajectory=opt.do_not_return_trajectory,
turn_off_solvate_pocket=opt.turn_off_solvate_pocket,
twobody=opt.twobody,
forcefield=opt.forcefield,
num_solvate_pocket=opt.num_solvate_pocket,
extended_gcmc=opt.extended_gcmc)
jobname = opt.jobname
if not jobname:
jobname = 'watermap'
host_list = jobcontrol.get_command_line_host_list()
host = host_list[0][0]
if (not host):
host = 'localhost'
wm_inp.write(jobname)
if not opt.do_not_run:
job = wm_inp.run(jobname, opt.host, opt.cpu)
print(" JobId : %s" % job.job_id)
if (__name__ == "__main__"):
usage = '''
$SCHRODINGER/run -FROM watermap %prog -protein protein.mae -ligand ligand.mae [options]
Description:
%prog prepares and submits WaterMap Jobs
Example to retain ligand:
%prog -JOBNAME watermap_test -protein protein.mae -ligand ligand.mae -retain_ligand
'''
parser = cmdline.SingleDashOptionParser(usage)
jc_options = [
cmdline.HOST,
]
parser.add_option('-JOBNAME',
dest='jobname',
default='watermap',
help='JOBNAME')
parser.add_option('-HOST',
dest='host',
default='localhost',
help='host name')
parser.add_option('-cpu', dest='cpu', default=1, help='# of cpus')
parser.add_option('-protein',
dest='protein',
default='',
help='protein file name')
parser.add_option('-ligand',
dest='ligand',
default='',
help='ligand file name')
parser.add_option('-retain_ligand',
action='store_true',
dest='retain_ligand',
default=False,
help='to retain ligand')
parser.add_option('-do_not_truncate',
action='store_true',
dest='do_not_truncate',
default=False,
help='do no truncate protein')
parser.add_option('-ligand_distance',
dest='ligand_distance',
default=10.0,
help='binding site definition')
parser.add_option('-truncate_distance',
dest='truncate_distance',
default=20.0,
help='protein truncation distance')
parser.add_option('-treat_solvent',
dest='treat_solvent',
default='as solvent',
help='"as solvent", "as solute", or "delete"')
parser.add_option('-simulation_time',
dest='simulation_time',
default=2.0,
help='simulation time in ns')
parser.add_option('-do_not_return_trajectory',
action='store_true',
dest='do_not_return_trajectory',
default=False,
help='do not return trajectory')
parser.add_option('-solvent_buffer_distance',
dest='solvent_buffer_distance',
default=10.0,
type='float',
action='callback',
callback=print_deprecate,
help="""solvent buffer distance to solvate protein\n
(This option has been DEPRECATED, please use 'ligand_distance' option)"""
)
parser.add_option(
'-do_not_run',
action='store_true',
dest='do_not_run',
default=False,
help='do not run a job and stop after writing input files')
parser.add_option(
'-continuous',
action='callback',
dest='continuous',
default=False,
callback=print_deprecate,
help=
"This option is not needed anymore. Continuous WM runs automatically.")
parser.add_option('-twobody',
action='store_true',
dest='twobody',
default=False,
help='calculate twobody term')
parser.add_option('-forcefield',
dest='forcefield',
default=FFLD14,
help=(FFLD14 + " or " + FFLD16))
parser.add_option('-num_solvate_pocket',
dest='num_solvate_pocket',
default=1,
help='number of solvate_pocket outputs')
parser.add_option(
'-extended_gcmc',
action='store_true',
dest='extended_gcmc',
default=False,
help=
'Turn on extended gcmc protocol. The protocol is set to run more thorough GCMC sampling and generate 20 representative structures. Relaxation and production simulations continue on each output structure to obtain total of 4 ns trajectory and analysis is performed on the trajectory.'
)
# This value is ignored, use_gpu is assumed by the code
parser.add_option('-use_gpu', action='store_true', help=argparse.SUPPRESS)
options, args = parser.parse_args()
main(options)