Source code for schrodinger.application.desmond.starter.generator.bpfep

"""
Binding-pose FEP generator

Copyright Schrodinger, LLC. All rights reserved.
"""
import os
import subprocess
import sys
from typing import List

from schrodinger.application.desmond import cmdline
from schrodinger.application.desmond import cmj
from schrodinger.application.desmond import envir
from schrodinger.application.desmond import launch_utils
from schrodinger.application.desmond import struc
from schrodinger.application.desmond import util
from schrodinger.application.desmond.constants import FEP_ABSOLUTE_ENERGY
from schrodinger.application.desmond.constants import FEP_ABSOLUTE_LIGAND
from schrodinger.application.desmond.constants import FEP_FRAGNAME
from schrodinger.application.desmond.constants import FEP_MAPPING
from schrodinger.application.desmond.constants import GCMC_LIGAND
from schrodinger.application.desmond.constants import REST_HOTREGION
from schrodinger.application.desmond.starter import ui
from schrodinger.structutils import analyze


def _reorder_water(st):
    """
    Put water atoms to the end of the structure, and return a new `Structure`
    object. No effects if there is no water, and `st` itself will be returned.
    """
    # FIXME: Replace this function with a public function in mmshare.
    #        DESMOND-9409.
    water_aids = analyze.evaluate_asl(st, 'water')
    if water_aids:
        water_st = st.extract(water_aids)
        st.deleteAtoms(water_aids)
        st = st.merge(water_st)
    return st


def _read_strucs(ref_complex_fname, mut_complex_fname):
    ref_st = struc.read_all_ct(ref_complex_fname)
    mut_st = struc.read_all_ct(mut_complex_fname)

    if 1 != len(ref_st):
        sys.exit(f"ERROR: {ref_complex_fname} must contain 1 structure.")
    if 1 != len(mut_st):
        sys.exit(f"ERROR: {mut_complex_fname} must contain 1 structure.")

    return ref_st[0], mut_st[0]


def _prepare_inpmae(inp_fnames, ligand_asl, mut_ligand_asl, receptor_asl):
    # FIXME: Refactor this function to mmshare?
    from schrodinger.application.scisol.packages.fep.utils import \
        set_atom_mapping_property

    ref_st, mut_st = map(_reorder_water, _read_strucs(*inp_fnames))
    ref_st.property[FEP_FRAGNAME] = "none"
    mut_st.property[FEP_FRAGNAME] = "mutant"

    # Cleans up FEP-related atom properties.
    FEP_ATOM_PROPNAMES = [
        FEP_MAPPING, REST_HOTREGION, FEP_ABSOLUTE_ENERGY, FEP_ABSOLUTE_LIGAND,
        GCMC_LIGAND
    ]
    struc.set_atom_properties(ref_st.atom, FEP_ATOM_PROPNAMES)
    struc.set_atom_properties(mut_st.atom, FEP_ATOM_PROPNAMES)

    # Sets the FEP atom-mapping property.
    ref_matched_atoms = analyze.evaluate_asl(
        ref_st, f"not (({ligand_asl}) or ({receptor_asl}))")
    mut_matched_atoms = analyze.evaluate_asl(
        mut_st, f"not (({mut_ligand_asl}) or ({receptor_asl}))")
    set_atom_mapping_property(ref_st, mut_st, ref_matched_atoms,
                              mut_matched_atoms)

    # Sets the `FEP_ABSOLUTE_LIGAND` property for unmapped ligand atoms. This
    # is for setting the crosslink restraints.
    struc.set_atom_properties(struc.selected_atoms(ref_st, asl=ligand_asl),
                              [FEP_ABSOLUTE_LIGAND], [1])
    struc.set_atom_properties(struc.selected_atoms(mut_st, asl=mut_ligand_asl),
                              [FEP_ABSOLUTE_LIGAND], [1])

    # Sets the `GCMC_LIGAND` property for all unmapped atoms.
    ASL_GCMC_ATOMS = f"a.{FEP_MAPPING} = 0"
    struc.set_atom_properties(struc.selected_atoms(ref_st, asl=ASL_GCMC_ATOMS),
                              [GCMC_LIGAND], [1])
    struc.set_atom_properties(struc.selected_atoms(mut_st, asl=ASL_GCMC_ATOMS),
                              [GCMC_LIGAND], [1])

    return ref_st, mut_st


def _prepare_inpmsj() -> cmj.sea.Map:
    """
    Parse the template .msj file, and edit the workflow per requests.
    Return a `sea.Map` object that contains the msj settings.
    Usage example:

       raw = _prepare_inpmsj()
       raw.stage[0].set_family    # First stage's set_family setting
       raw.stage[1].__NAME__      # Second stage's name, e.g., "simulate"
       raw.stage[1].__CLS__       # Second stage's class, e.g., `stage.Simulate`

    """
    # This function works now as a skeleton for future extensions.
    # Right now, it just parses the template file and return the `sea.Map`
    # object.
    msj_fname = os.path.join(envir.CONST.MMSHARE_DATA_DESMOND_DIR,
                             "bpfep_template.msj")
    return cmj.msj2sea_full(msj_fname)


def _cmd_for_restarting_job(args: ui.bpfep.Args) -> List[str]:
    """
    Return a command for launching a restarted multisim job.
    """
    cpt_fname, raw_restart_index = \
        launch_utils.get_checkpoint_file_and_restart_number(args.checkpoint)
    engine = launch_utils.read_checkpoint_file(cpt_fname)

    restart_index = (raw_restart_index or
                     launch_utils.get_restart_stage_from_engine(engine))
    launch_utils.validate_restart_stage(engine, restart_index)

    jobname = args.JOBNAME or engine.jobname
    cmd = [
        envir.CONST.MULTISIM_EXEC,
        "-HOST",
        args.HOST,
        "-SUBHOST",
        args.SUBHOST,
        "-JOBNAME",
        jobname,
        "-RESTART",
        f"{cpt_fname}:{raw_restart_index}" if raw_restart_index else cpt_fname,
    ]

    if args.maxjob is not None:
        cmd += ["-maxjob", str(args.maxjob)]

    prior_stage = engine.stage[restart_index - 1]
    if prior_stage.NAME in ["trim", "desmond_extend"]:
        prior_stage = engine.stage[restart_index - 2]
    needed_prior_stages = [prior_stage]
    if raw_restart_index is None:
        # We restart the specified stage from where it was interrupted.
        # We also need the data file from this stage (if it's not a trim stage)
        # for restarting.
        prior_stage = engine.stage[restart_index]
        if prior_stage.NAME not in ["trim", "desmond_extend"]:
            needed_prior_stages.append(engine.stage[restart_index])

    prev_jobname = engine.jobname
    for stg in needed_prior_stages:
        cmd += [
            "-d",
            "%s_%d-out%s" % (prev_jobname, stg._INDEX, cmj.PACKAGE_SUFFIX)
        ]

    forcefield = None
    cmd += launch_utils.additional_command_arguments([], args.RETRIES,
                                                     args.WAIT, args.LOCAL,
                                                     args.DEBUG, args.TMPDIR,
                                                     forcefield, args.OPLSDIR,
                                                     args.NICE, args.SAVE)

    return cmd


def _cmd_for_new_job(args: ui.bpfep.Args) -> List[str]:
    """
    Return a command for launching a new multisim job.
    """
    ref_st, mut_st = _prepare_inpmae(args.inp_file, args.ligand_asl,
                                     args.mut_ligand_asl, args.receptor_asl)
    inpmae_fname = args.JOBNAME + "-in.maegz"
    util.write_n_ct(inpmae_fname, [ref_st, mut_st])

    msj = _prepare_inpmsj()
    inpmsj_fname = args.JOBNAME + "-in.msj"
    cmj.write_sea2msj(msj.stage, inpmsj_fname)

    cmd = launch_utils.prepare_command_for_launch(args.HOST,
                                                  args.SUBHOST,
                                                  args.JOBNAME,
                                                  inpmsj_fname,
                                                  args.maxjob,
                                                  input_fname=inpmae_fname)

    stage_data_fnames = []
    forcefield = None
    cmd += launch_utils.additional_command_arguments(
        stage_data_fnames, args.RETRIES, args.WAIT, args.LOCAL, args.DEBUG,
        args.TMPDIR, forcefield, args.OPLSDIR, args.NICE, args.SAVE)

    return cmd


[docs]def generate(args: ui.bpfep.Args) -> List[str]: """ Generate the files and a command to run multisim for the binding-pose FEP workflow. :param args: Command line arguments :return: Command to launch a multisim job """ cmd = _cmd_for_restarting_job(args) if args.RESTART else \ _cmd_for_new_job(args) # Adds extra options. cmd += ['-mode', 'umbrella', '-o', args.JOBNAME + "-out.mae"] print( "Launch command:", subprocess.list2cmdline(cmd).replace(os.environ["SCHRODINGER"], "$SCHRODINGER")) cmd.extend([ "-encoded_description", cmdline.get_b64encoded_str(cmdline.get_job_command_in_startup()), ]) return cmd