Source code for schrodinger.application.watermap.watermap_inp
"""
This is a class that generates watermap input files and runs them through jobcontrol. This also supports all the options in GUI.
Copyright Schrodinger, LLC. All rights reserved.
"""
# Contributors: Byungchan Kim
import argparse
import os
import sys
from typing import Optional
import schrodinger.application.desmond.cmj as cmj
import schrodinger.application.desmond.envir as envir
import schrodinger.application.desmond.smarts as smarts
# stage.py needs to be imported for cmj module to know the stages
import schrodinger.application.desmond.stage as stage  # noqa: F401
import schrodinger.application.desmond.system_builder_util as system_builder_util
import schrodinger.job.jobcontrol as jobcontrol
import schrodinger.structure as structure
import schrodinger.structutils.transform as transform
import schrodinger.utils.cmdline as cmdline
from schrodinger.application.desmond import constants
from schrodinger.infra import mm
FFLD14 = mm.OPLS_NAME_F14
FFLD16 = mm.OPLS_NAME_F16
_SOLVATE_POCKET_TITLE = "GCMC solvate pocket"
# Define water SMARTS patterns
water_smarts = ["[H]O[H]"]
zob_water_smarts = ["[H]O([H])_[*]", "[H]O[H]_[*]"]
[docs]def print_deprecate(option, opt, value, parser):
    print()
    print("%s option has been DEPRECATED. Please check help message." %
          (opt[1:],))
    print()
[docs]class WaterMapInput:
[docs]    def __init__(self, protein_st, ligand_st, **kwargs):
        self._protein = protein_st
        self._ligand = ligand_st
        self._solute = None
        self._crystal_water = None
        self.retain_ligand = False
        self.do_not_truncate = False
        self.ligand_distance = 10.0
        self.truncate_distance = 20.0
        self.treat_solvent = 'as solvent'
        self.simulation_time = 2.0
        self.do_not_return_trajectory = False
        self.solvent_buffer_distance = 5.0
        self.has_membrane = False
        self.turn_off_solvate_pocket = False
        self.twobody = False
        self.forcefield = FFLD16
        self.num_solvate_pocket = 1
        self.extended_gcmc = False
        self.update(**kwargs)
[docs]    def update(self, **kwargs):
        '''
        updates states of options
        '''
        if 'retain_ligand' in kwargs:
            self.retain_ligand = kwargs['retain_ligand']
        if 'do_not_truncate' in kwargs:
            self.do_not_truncate = kwargs['do_not_truncate']
        if 'ligand_distance' in kwargs:
            self.ligand_distance = kwargs['ligand_distance']
        if 'truncate_distance' in kwargs:
            self.truncate_distance = kwargs['truncate_distance']
        if 'simulation_time' in kwargs:
            self.simulation_time = kwargs['simulation_time']
        if 'treat_solvent' in kwargs:
            self.treat_solvent = kwargs['treat_solvent']
        if 'do_not_return_trajectory' in kwargs:
            self.do_not_return_trajectory = kwargs['do_not_return_trajectory']
        if 'solvent_buffer_distance' in kwargs:
            print(
                "solvent_buffer_distance is ignored. 'ligand_distance' should be used to set solvent buffer distance."
            )
        if 'has_membrane' in kwargs:
            self.has_membrane = kwargs['has_membrane']
        if 'turn_off_solvate_pocket' in kwargs:
            self.turn_off_solvate_pocket = kwargs['turn_off_solvate_pocket']
        if 'twobody' in kwargs:
            self.twobody = kwargs['twobody']
        if 'forcefield' in kwargs:
            self.forcefield = kwargs['forcefield']
        if 'num_solvate_pocket' in kwargs:
            self.num_solvate_pocket = kwargs['num_solvate_pocket']
        if 'extended_gcmc' in kwargs:
            self.extended_gcmc = kwargs['extended_gcmc']
        if self.do_not_truncate:
            self.truncate_distance = 0.0
[docs]    def reorder_solute(self, st):
        '''
        reorder atoms so that the very first atom is located close to origin to prevent solute to be shifted to other images.
        '''
        new_st = st.copy()
        reorder_array = [0]
        for i in range(self._solute.atom_total):
            reorder_array.append(i + 1)
        min_dist = 1.0E10
        min_index = 0
        for a in self._solute.atom:
            if "b_wmap_ligand" in a.property and a.property["b_wmap_ligand"]:
                continue
            dist = a.x * a.x + a.y * a.y + a.z * a.z
            if dist < min_dist:
                min_dist = dist
                min_index = a.index
        reorder_array[1] = min_index
        reorder_array[min_index] = 1
        mm.mmct_ct_reorder(new_st, st, reorder_array)
        return new_st
[docs]    def createTip4pFfio(self, ct):
        mm.mmffio_initialize(mm.MMERR_DEFAULT_HANDLER)
        mmshare_exec = os.environ.get('MMSHARE_EXEC')
        # FIXME this will not work on Windows:
        mmshare_data = mmshare_exec + '/../../data/system_builder'
        tip4p_fname = os.path.join(mmshare_data, 'tip4p.box.mae')
        st_reader = structure.StructureReader(tip4p_fname)
        st = next(st_reader)
        ff = mm.mmffio_ff_mmct_get(st.handle)
        mm.mmffio_ff_mmct_put(ff, ct)
        mm.mmffio_terminate()
[docs]    def make_canonical_solvent(self, st):
        '''
        Fix atomic order to O H H
        '''
        new_st = st.copy()
        reorder = [0]
        for m in st.molecule:
            for i in m.getAtomIndices():
                reorder.append(i)
        mm.mmct_ct_reorder(new_st, st, reorder)
        return new_st
[docs]    def prepareStructures(self):
        '''
        Prepare -solute, -protein, -ligand.mae files.
        '''
        # tag ligand atoms when it is retained
        if self.retain_ligand:
            for a in self._ligand.atom:
                a.property["b_wmap_ligand"] = True
        st = system_builder_util.truncateProtein(self._protein, self._ligand,
                                                 self.retain_ligand,
                                                 self.truncate_distance)
        if self.truncate_distance > 0.0:
            for a in st.atom:
                if "b_wmap_ligand" in a.property and a.property["b_wmap_ligand"]:
                    continue
                else:
                    a.property[constants.FF_USE_EXISTING_CHARGE] = 1
        if self.retain_ligand:
            st.property["b_wmap_retain_ligand"] = True
        # Allow some structural waters to be kept when running WaterMap [Ev:72892]
        st.property[constants.CT_TYPE] = constants.CT_TYPE.VAL.SOLUTE
        self._ligand.property[constants.CT_TYPE] = constants.CT_TYPE.VAL.LIGAND
        self._protein.property[
            constants.CT_TYPE] = constants.CT_TYPE.VAL.RECEPTOR
        water_atoms = smarts.evaluate_net_smarts(st, water_smarts,
                                                 zob_water_smarts)
        if len(water_atoms):
            # Separate water to crystal water CT
            tmp_water_st = st.extract(water_atoms)
            self._crystal_water = self.make_canonical_solvent(tmp_water_st)
            self._crystal_water.property['i_ffio_num_component'] = 1
            if self.treat_solvent == 'as solvent':
                self._crystal_water.property[
                    constants.CT_TYPE] = constants.CT_TYPE.VAL.CRYSTAL_WATER
                self.createTip4pFfio(self._crystal_water.handle)
                for a in self._crystal_water.atom:
                    a.pdbres = 'T4P '
                # remove water from protein
                del_atoms = smarts.evaluate_net_smarts(self._protein,
                                                       water_smarts,
                                                       zob_water_smarts)
                self._protein.deleteAtoms(del_atoms)
                # remove water from solute
                st.deleteAtoms(water_atoms)
            # Restrain solute water
            elif self.treat_solvent == 'as solute':
                self._crystal_water = None
            # Remove water from solute and protein
            elif self.treat_solvent == 'delete':
                self._crystal_water = None
                # remove water from protein
                del_atoms = smarts.evaluate_net_smarts(self._protein,
                                                       water_smarts,
                                                       zob_water_smarts)
                self._protein.deleteAtoms(del_atoms)
                # remove water from solute
                st.deleteAtoms(water_atoms)
        # set all solute waters including zero-order bonded waters
        # with T3P resname and restraint
        water_atoms = smarts.evaluate_net_smarts(st, water_smarts, [])
        for i in water_atoms:
            a = st.atom[i]
            a.pdbres = 'T3P '
            if a.atomic_number > 1:
                a.property['i_wmap_restrain'] = 1
        self._solute = st
    def _writeStructures(self, jobname):
        '''
        write structures as mae files
        '''
        if not self._solute:
            try:
                self.prepareStructures()
            except Exception:
                raise
        if (not self._protein) or (self._protein.atom_total == 0):
            raise RuntimeError('Protein atoms do not exist.')
        if (not self._ligand) or (self._ligand.atom_total == 0):
            raise RuntimeError('Ligand atoms do not exist.')
        centroid = [0.0, 0.0, 0.0]
        if not self.has_membrane:
            centroid = transform.get_centroid(self._ligand)
        x = -centroid[0]
        y = -centroid[1]
        z = -centroid[2]
        transform.translate_structure(self._ligand, x, y, z)
        self._ligand.title = "sampling_coordinates"
        self._ligand.property['r_watermap_trans1'] = -x
        self._ligand.property['r_watermap_trans2'] = -y
        self._ligand.property['r_watermap_trans3'] = -z
        transform.translate_structure(self._solute, x, y, z)
        self._solute.property['r_watermap_trans1'] = -x
        self._solute.property['r_watermap_trans2'] = -y
        self._solute.property['r_watermap_trans3'] = -z
        if self._crystal_water:
            transform.translate_structure(self._crystal_water, x, y, z)
            self._crystal_water.property['r_watermap_trans1'] = -x
            self._crystal_water.property['r_watermap_trans2'] = -y
            self._crystal_water.property['r_watermap_trans3'] = -z
        if (not self._solute) or (self._solute.atom_total == 0):
            raise RuntimeError(
                'Truncation distance is too short to include any protein atoms. Please increase the truncation distance.'
            )
        self._solute = self.reorder_solute(self._solute)
        self._solute.write(self.in_fname)
        self._ligand.append(self.in_fname)
        self._protein.append(self.in_fname)
        if self._crystal_water:
            self._crystal_water.append(self.in_fname)
[docs]    def writeMSJ(self) -> Optional[str]:
        """
        Write multisim file to the returned str.
        If there was a problem generating the msj, return None.
        """
        # TODO should we support extended gcmc in the future or remove this
        # entirely?
        assert not self.extended_gcmc, "Extended GCMC is currently not " \
                                       "supported"
        if self.extended_gcmc:
            template_fname = os.path.join(envir.CONST.MMSHARE_DATA_DESMOND_DIR,
                                          "watermap_extended_template.msj")
        else:
            template_fname = os.path.join(envir.CONST.MMSHARE_DATA_DESMOND_DIR,
                                          "watermap_gpu_template.msj")
        with open(template_fname) as f:
            s = f.read()
        s = s.replace('$SIMULATION_TIME', str(self.simulation_time * 1000.0))
        s = s.replace('$LIGAND_DISTANCE', str(self.ligand_distance))
        all_stages = cmj.parse_msj(None, msj_content=s)
        water_model = None
        for stg in all_stages:
            if stg.NAME == "system_builder" or stg.NAME == "build_geometry":
                if self.has_membrane:
                    stg.param.should_skip.add_tag("setbyuser")
                    stg.param.should_skip.val = True
                if self.treat_solvent == "as solute":
                    stg.param.distil_solute.add_tag("setbyuser")
                    stg.param.distil_solute.val = False
            if stg.NAME == "build_geometry" and self.forcefield != FFLD14:
                stg.param.override_forcefield.add_tag("setbyuser")
                stg.param.override_forcefield.val = self.forcefield
            if stg.NAME == "assign_forcefield":
                if self.forcefield != FFLD14:
                    stg.param.forcefield.add_tag("setbyuser")
                    stg.param.forcefield.val = self.forcefield
                water_model = stg.param.water.val
            # the GCMC solvate_pocket replacement cannot be identified by stg.NAME
            # which is simply "simulate"
            if stg.param.title.val == _SOLVATE_POCKET_TITLE:
                if self.turn_off_solvate_pocket:
                    stg.param.should_skip.add_tag("setbyuser")
                    stg.param.should_skip.val = True
                else:
                    water_model = water_model or 'SPC'
                    timestep = int(stg.param.timestep.val[0] * 1000)
                    key = (water_model, timestep)
                    if key not in constants.GCMC_CHEMICAL_POTENTIALS:
                        print(
                            f"ERROR: Water model {water_model} not calibrated for GCMC at a {timestep} fs timestep. "
                            "Turn off solvate pocket to run with this model.")
                        return None
                    mu_excess, solvent_density = constants.GCMC_CHEMICAL_POTENTIALS[
                        key]
                    stg.param.gcmc.mu_excess.val = mu_excess
                    stg.param.gcmc.mu_excess.add_tag("setbyuser")
                    stg.param.gcmc.solvent_density.val = solvent_density
                    stg.param.gcmc.solvent_density.add_tag("setbyuser")
            if stg.NAME == "watermap_post_analysis" and self.do_not_return_trajectory:
                stg.param.do_not_return_trajectory.add_tag("setbyuser")
                stg.param.do_not_return_trajectory.val = True
            if stg.NAME == "watermap_cluster":
                if self.twobody:
                    stg.param.twobody.add_tag("setbyuser")
                    stg.param.twobody.val = 1
            if stg.NAME == "simulate" and self.extended_gcmc:
                try:
                    stg.param.backend.mdsim.plugin.SpatialActiveSite  # production stage
                    stg.param.time.val = 200.0
                except AttributeError:
                    pass
        s = cmj.write_msj(all_stages, to_str=True)
        return s
[docs]    def write(self, jobname, suffix="maegz", hostname=None, cpus=None):
        """
        Call all write functions
        :param jobname: The name of the job in the command example
        :type jobname: str
        :param suffix: The file extension for the input file
        :type suffix: str
        :param hostname: The name of the host in the command example
        :type hostname: str
        :param cpus: The number of cpus in the command example
        :type cpus: int
        """
        self.in_fname = jobname + "-in." + suffix
        self.msj_fname = jobname + '.msj'
        self._writeStructures(jobname)
        host_and_cpus_args = ""
        if hostname and cpus:
            host_and_cpus_args = " -HOST %s -cpu %s" % (hostname, str(cpus))
        s = self.writeMSJ()
        if not s:
            raise RuntimeError("Could not generate msj for job submission.")
        s += '# command example:\n'
        s += '# $SCHRODINGER/watermap -JOBNAME %s%s -m %s %s\n\n' % (
            jobname, host_and_cpus_args, self.msj_fname, self.in_fname)
        fh = open(self.msj_fname, "w")
        print(s, file=fh)
        fh.close()
[docs]    def run(self, jobname, host, cpu):
        '''
        Run WaterMap Job
        '''
        watermap_cmd = os.path.join(os.environ["SCHRODINGER"], "watermap")
        cmd_line = [
            watermap_cmd,
            "-JOBNAME",
            jobname,
            "-HOST",
            host,
            "-cpu",
            str(cpu),
            "-m",
            self.msj_fname,
            self.in_fname,
        ]
        job = jobcontrol.launch_job(cmd_line)
        if not job:
            raise RuntimeError(
                "Desmond submission failed. Please check the accessibility of the cluster machine and of Desmond engine over there."
            )
        return job
[docs]def main(opt):
    if not os.path.exists(opt.protein):
        print('%s does not exists.' % opt.protein)
        print('Use -protein to specify a protein file name.')
        sys.exit(1)
    if not os.path.exists(opt.ligand):
        print('Ligand, %s,  does not exists.' % opt.ligand)
        print('Use -ligand to specify a ligand file name.')
        sys.exit(1)
    if opt.extended_gcmc:
        print('WaterMap does not currently support the extended_gcmc option.')
        sys.exit(1)
    protein_st = None
    for st in structure.StructureReader(opt.protein):
        if protein_st:
            protein_st.extend(st)
        else:
            protein_st = st
    ligand_st = None
    for st in structure.StructureReader(opt.ligand):
        if ligand_st:
            ligand_st.extend(st)
        else:
            ligand_st = st
    wm_inp = WaterMapInput(
        protein_st,
        ligand_st,
        retain_ligand=opt.retain_ligand,
        do_not_truncate=opt.do_not_truncate,
        ligand_distance=opt.ligand_distance,
        truncate_distance=opt.truncate_distance,
        treat_solvent=opt.treat_solvent,
        simulation_time=opt.simulation_time,
        do_not_return_trajectory=opt.do_not_return_trajectory,
        turn_off_solvate_pocket=opt.turn_off_solvate_pocket,
        twobody=opt.twobody,
        forcefield=opt.forcefield,
        num_solvate_pocket=opt.num_solvate_pocket,
        extended_gcmc=opt.extended_gcmc)
    jobname = opt.jobname
    if not jobname:
        jobname = 'watermap'
    host_list = jobcontrol.get_command_line_host_list()
    host = host_list[0][0]
    if (not host):
        host = 'localhost'
    wm_inp.write(jobname)
    if not opt.do_not_run:
        job = wm_inp.run(jobname, opt.host, opt.cpu)
        print("   JobId  : %s" % job.job_id)
if (__name__ == "__main__"):
    usage = '''
  $SCHRODINGER/run -FROM watermap %prog -protein protein.mae -ligand ligand.mae [options]
Description:
  %prog prepares and submits WaterMap Jobs
Example to retain ligand:
  %prog -JOBNAME watermap_test -protein protein.mae -ligand ligand.mae -retain_ligand
'''
    parser = cmdline.SingleDashOptionParser(usage)
    jc_options = [
        cmdline.HOST,
    ]
    parser.add_option('-JOBNAME',
                      dest='jobname',
                      default='watermap',
                      help='JOBNAME')
    parser.add_option('-HOST',
                      dest='host',
                      default='localhost',
                      help='host name')
    parser.add_option('-cpu', dest='cpu', default=1, help='# of cpus')
    parser.add_option('-protein',
                      dest='protein',
                      default='',
                      help='protein file name')
    parser.add_option('-ligand',
                      dest='ligand',
                      default='',
                      help='ligand file name')
    parser.add_option('-retain_ligand',
                      action='store_true',
                      dest='retain_ligand',
                      default=False,
                      help='to retain ligand')
    parser.add_option('-do_not_truncate',
                      action='store_true',
                      dest='do_not_truncate',
                      default=False,
                      help='do no truncate protein')
    parser.add_option('-ligand_distance',
                      dest='ligand_distance',
                      default=10.0,
                      help='binding site definition')
    parser.add_option('-truncate_distance',
                      dest='truncate_distance',
                      default=20.0,
                      help='protein truncation distance')
    parser.add_option('-treat_solvent',
                      dest='treat_solvent',
                      default='as solvent',
                      help='"as solvent", "as solute", or "delete"')
    parser.add_option('-simulation_time',
                      dest='simulation_time',
                      default=2.0,
                      help='simulation time in ns')
    parser.add_option('-do_not_return_trajectory',
                      action='store_true',
                      dest='do_not_return_trajectory',
                      default=False,
                      help='do not return trajectory')
    parser.add_option('-solvent_buffer_distance',
                      dest='solvent_buffer_distance',
                      default=10.0,
                      type='float',
                      action='callback',
                      callback=print_deprecate,
                      help="""solvent buffer distance to solvate protein\n
                               (This option has been DEPRECATED, please use 'ligand_distance' option)"""
                     )
    parser.add_option(
        '-do_not_run',
        action='store_true',
        dest='do_not_run',
        default=False,
        help='do not run a job and stop after writing input files')
    parser.add_option(
        '-continuous',
        action='callback',
        dest='continuous',
        default=False,
        callback=print_deprecate,
        help=
        "This option is not needed anymore. Continuous WM runs automatically.")
    parser.add_option('-twobody',
                      action='store_true',
                      dest='twobody',
                      default=False,
                      help='calculate twobody term')
    parser.add_option('-forcefield',
                      dest='forcefield',
                      default=FFLD14,
                      help=(FFLD14 + " or " + FFLD16))
    parser.add_option('-num_solvate_pocket',
                      dest='num_solvate_pocket',
                      default=1,
                      help='number of solvate_pocket outputs')
    parser.add_option(
        '-extended_gcmc',
        action='store_true',
        dest='extended_gcmc',
        default=False,
        help=
        'Turn on extended gcmc protocol. The protocol is set to run more thorough GCMC sampling and generate 20 representative structures. Relaxation and production simulations continue on each output structure to obtain total of 4 ns trajectory and analysis is performed on the trajectory.'
    )
    # This value is ignored, use_gpu is assumed by the code
    parser.add_option('-use_gpu', action='store_true', help=argparse.SUPPRESS)
    options, args = parser.parse_args()
    main(options)