# Source code for schrodinger.application.glide.ligand_designer
"""
This module provides the APIs behind the Ligand Designer panel and workflow.
It combines binding site Phase hypothesis generation with R-group enumeration
and Glide grid generation and docking. The docking uses a Glide HTTP server for
speed.
"""
import glob
import hashlib
import json
import os
from pathlib import Path
import schrodinger
from schrodinger import structure
from schrodinger.application.glide import glide
from schrodinger.application.glide import http_client
from schrodinger.application.glide import utils as glide_utils
from schrodinger.models import parameters
from schrodinger.Qt import QtCore
from schrodinger.tasks import jobtasks
from schrodinger.tasks import tasks
from schrodinger.utils import fileutils
from schrodinger.utils import log
from schrodinger.utils import mmutil
# Maximum time in seconds to wait for Glide grid generation to finish
GRIDGEN_WAIT = 600
# Default number of Glide server workers; used by num_glide_workers() when
# the environment does not supply a usable value
DEFAULT_NUM_WORKERS = 2
# Module-level logger; verbosity is raised to DEBUG in development
# environments
logger = log.get_output_logger(__file__)
if schrodinger.in_dev_env():
    logger.setLevel(log.DEBUG)
# File name of the reference ligand structure inside a task directory
REFLIG_NAME = 'reflig.maegz'
# File name of the Glide grid inside a task directory
GRIDFILE_NAME = 'grid.grd'
# Separator between the job directory stem and the suffix that follows it
JOBDIR_SEP = '-'
def num_glide_workers():
    """
    Determine the number of Glide server workers from the environment.

    Reads the ``LIGAND_DESIGNER_PROCESSORS`` environment variable. If the
    variable is unset, is not an integer, or is <= 0, `DEFAULT_NUM_WORKERS`
    is used instead.

    :return: number of workers
    :rtype: int
    """
    nworkers = DEFAULT_NUM_WORKERS
    requested = os.environ.get('LIGAND_DESIGNER_PROCESSORS')
    if requested is not None:
        try:
            value = int(requested)
        except ValueError:
            # Not an integer: keep the default
            pass
        else:
            # Zero and negative counts are meaningless: keep the default
            if value > 0:
                nworkers = value
    logger.debug('Using %s Glide server workers', nworkers)
    return nworkers
class GridgenRunningException(RuntimeError):
    """
    Error indicating that grid generation has not finished yet.

    `BuildServerTask.mainFunction` records this as the task failure when no
    usable grid is available and the grid generation task is still running.
    """

    def __init__(self, message="Grid generation is still running"):
        """
        :param message: error message
        :type message: str
        """
        super().__init__(message)
class BuildServerTask(tasks.BlockingFunctionTask):
    """
    Task to set up and start a glide server. The server performs
    core-constrained Glide docking.

    The server state and intermediate files are in a scratch
    directory. A unique subdirectory is created for each ligand-receptor
    complex; if another object is created for the same complex, it will share
    the same directory. This allows the reuse of existing grid files, for
    example. However, only one object at a time can be performing an
    enumeration because the underlying Glide server process is single-threaded.

    :ivar gridJobStarted: Signal emitted when the grid job has launched.
        Emitted with the grid generation task.
    :ivar gg_task: Grid generation task; None until one is created during
        preprocessing.
    """
    # NOTE(review): the methods below read input.start_gridgen,
    # input.ligand_st, input.receptor_st and input.docking_keywords, but no
    # Input class is declared in this class body - confirm where it is defined.
    gridJobStarted = QtCore.pyqtSignal(jobtasks.CmdJobTask)
    gg_task = parameters.NonParamAttribute()

    class Output(parameters.CompoundParam):
        # Server manager; assigned by _startServer once the server is up
        server: http_client.NonBlockingGlideServerManager = None

    def initConcrete(self, tmpdir=None, *args, **kwargs):
        """
        :param tmpdir: Base temporary directory for server directories
        """
        super().initConcrete(*args, **kwargs)
        self._tmpdir = tmpdir
        self.gg_task = None

    @tasks.preprocessor(order=tasks.BEFORE_TASKDIR)
    def _initGridgenTask(self):
        """
        Create the grid generation task from the input ligand and receptor.

        Does nothing if grid generation was not requested or if a grid
        generation task already exists.
        """
        if not self.input.start_gridgen or self.gg_task is not None:
            return
        self.gg_task = GridgenTask()
        self.gg_task.input.ligand_st = self.input.ligand_st
        self.gg_task.input.receptor_st = self.input.receptor_st

    @tasks.preprocessor(order=tasks.BEFORE_TASKDIR + 1)
    def _findTaskDir(self):
        """
        Check existing task dirs to see if any have a grid compatible with the
        current ligand_st and receptor_st. If none does, pick the next unused
        directory name. The chosen directory is set as the task dir of both
        this task and the grid generation task.
        """
        if self.taskDirSetting() is not self.DEFAULT_TASKDIR_SETTING:
            # Skip if we have already set a taskdir
            return
        # NOTE(review): from here on self.gg_task is assumed to exist, but
        # _initGridgenTask only creates it when input.start_gridgen is true -
        # confirm that callers always run with start_gridgen first.
        recepname = fileutils.get_jobname(self.input.receptor_st.title)
        signature = get_structure_digest(self.input.receptor_st)
        jobname = '-'.join(['ld', recepname, signature])
        tmpdir = self._tmpdir or fileutils.get_directory_path(fileutils.TEMP)
        jobdir_stem = Path(tmpdir) / 'ligand_designer' / jobname
        jobdir_stem = jobdir_stem.absolute()
        jobdir = self._checkExistingJobDirs(jobdir_stem)
        if not jobdir:
            jobdir = fileutils.get_next_filename(str(jobdir_stem), JOBDIR_SEP)
        jobdir = Path(jobdir)
        logger.debug('Jobdir: %s', jobdir)
        self.specifyTaskDir(jobdir)
        self.gg_task.specifyTaskDir(jobdir)

    def _checkExistingJobDirs(self, jobdir_stem):
        """
        Look for an existing job directory that can be reused.

        Each candidate is set as the task dir of the grid generation task so
        that its grid file and reference ligand can be checked; the caller
        sets the final task dir afterwards.

        :param jobdir_stem: job directory path without separator and suffix
        :type jobdir_stem: pathlib.Path
        :return: first directory with a compatible grid file or an equivalent
            reference ligand, or None if there is none
        :rtype: str or None
        """
        dir_pattern = f"{jobdir_stem}{JOBDIR_SEP}*"
        for jobdir in glob.iglob(dir_pattern):
            self.gg_task.specifyTaskDir(jobdir)
            if self.gg_task.checkGridfile() or self.gg_task.checkReflig():
                return jobdir
        return None

    def mainFunction(self):
        """
        Start grid generation or the Glide server, depending on the input.

        With input.start_gridgen set, launch the grid generation task and
        emit `gridJobStarted` if there is no usable grid; otherwise only bring
        the reference ligand file up to date. Without it, start the server if
        the grid is usable, and record a failure if it is not.
        """
        self.output.reset()
        gg_task = self.gg_task
        grid_ok = gg_task.checkGridfile()
        if self.input.start_gridgen:
            if not grid_ok:
                gg_task.start()
                self.gridJobStarted.emit(gg_task)
            else:
                gg_task.updateReflig()
        elif grid_ok:
            self._startServer()
        else:
            # Fail if the gridfile isn't acceptable but we aren't supposed to
            # run gridgen
            if gg_task.status is gg_task.RUNNING:
                exc = GridgenRunningException()
            else:
                exc = RuntimeError("Gridgen failed")
            self._recordFailure(exc)

    def _startServer(self):
        """
        Create the Glide server manager and start the server unless it is
        already running. On success the manager is stored in
        ``self.output.server``; if starting raises, the exception is recorded
        as the task failure and the output is left unset.
        """
        gg_task = self.gg_task
        docking_keywords = {
            'PRECISION': 'HTVS',
            'GRIDFILE': gg_task.getTaskFilename(GRIDFILE_NAME),
            'REF_LIGAND_FILE': gg_task.getTaskFilename(REFLIG_NAME),
            'CORE_DEFINITION': 'mcssmarts',
            'CORE_RESTRAIN': True,
            'CORECONS_FALLBACK': True,
            'WRITE_RES_INTERACTION': True,
        }
        # Keywords supplied through the input override the defaults above
        docking_keywords.update(self.input.docking_keywords)
        kwargs = dict(
            jobdir=self.getTaskDir(),
            use_jc=False,
        )
        if kwargs['use_jc']:
            # Only set jobname for jobcontrol to allow reattaching
            kwargs['jobname'] = 'glide_server'
        if mmutil.feature_flag_is_enabled(mmutil.FAST_LIGAND_DESIGNER):
            ServerCls = http_client.NonBlockingGlideServerManagerZmq
            kwargs['nworkers'] = num_glide_workers()
        else:
            ServerCls = http_client.NonBlockingGlideServerManager
            kwargs['timeout'] = 1200  # 20 minutes
        server = ServerCls(docking_keywords, **kwargs)
        # Only start the server if it is not already running
        if not server.isReady():
            try:
                server.start()
            except Exception as exc:
                self._recordFailure(exc)
                return
        self.output.server = server
class GridgenTask(jobtasks.CmdJobTask):
    """
    Task to run glide grid generation.

    The methods read ``input.ligand_st`` and ``input.receptor_st``.

    :cvar RUNNING_MESSAGE: Message for RUNNING status
    :cvar FAILED_MESSAGE: Message for FAILED status
    :ivar infile: name of the Glide input file, relative to the task dir
    """
    # NOTE(review): no Input class declaring ligand_st and receptor_st is
    # visible in this class body - confirm where it is defined.
    RUNNING_MESSAGE = 'Generating grid...'
    FAILED_MESSAGE = 'Grid generation failed'
    infile = parameters.NonParamAttribute()

    class Output(jobtasks.CmdJobTask.Output):
        # Path of the generated grid file; set by _checkOutput
        gridfile: str = None

    def initConcrete(self):
        """
        Derive the task name and the input file name from `GRIDFILE_NAME`.
        """
        super().initConcrete()
        self.name = os.path.splitext(GRIDFILE_NAME)[0]
        self.infile = self.name + ".in"

    @tasks.preprocessor(order=tasks.AFTER_TASKDIR)
    def _writeInputs(self):
        """
        Write the reference ligand, the receptor/ligand structure file and
        the Glide input file into the task dir.
        """
        logger.debug("Writing gridgen input files")
        self._writeReflig()
        pvfile = self.name + "_in.maegz"
        # The receptor goes first and the ligand second, which is what
        # LIGAND_INDEX below refers to
        with structure.StructureWriter(self.getTaskFilename(pvfile)) as writer:
            writer.append(self.input.receptor_st)
            writer.append(self.input.ligand_st)
        keywords = {
            'RECEP_FILE': pvfile,
            'LIGAND_INDEX': 2,
            'GRIDFILE': GRIDFILE_NAME,
        }
        glide_job = glide.get_glide_job(keywords)
        infile = self.getTaskFilename(self.infile)
        glide_job.writeSimplified(infile)

    def makeCmd(self):
        """
        :return: command line that runs Glide on the input file
        :rtype: list[str]
        """
        return ['$SCHRODINGER/glide', self.infile]

    @tasks.postprocessor
    def _checkOutput(self):
        """
        Store the grid file path in the output if the grid file exists.

        :return: ``(False, message)`` if the grid file is missing, otherwise
            None
        """
        # NOTE(review): the (False, message) return presumably marks the task
        # as failed - confirm against the tasks.postprocessor contract.
        gridfile = self.getTaskFilename(GRIDFILE_NAME)
        if not os.path.isfile(gridfile):
            return False, "Gridfile not found"
        self.output.gridfile = gridfile

    def checkGridfile(self) -> bool:
        """
        Return whether the specified taskdir contains a gridfile compatible
        with the input ligand.

        :raises ValueError: if no taskdir has been specified as a str or Path,
            or if the specified taskdir does not exist
        """
        taskdir = self.taskDirSetting()
        if not isinstance(taskdir, (str, Path)):
            raise ValueError("Can only be used with a specified taskdir")
        elif not os.path.exists(taskdir):
            raise ValueError("Can only be used with an existing taskdir")
        self._createTaskDir()  # will be a no-op but allows calling getTaskDir
        return self._checkGridfile()

    def _checkGridfile(self) -> bool:
        """
        Check whether the taskdir contains a gridfile compatible with the input
        ligand. If so, writes the ref ligand file if it's missing.
        """
        grid_file = self.getTaskFilename(GRIDFILE_NAME)
        if (glide_utils.check_required_gridfiles(grid_file) and
                glide_utils.is_grid_good_for_ligand(grid_file,
                                                    self.input.ligand_st)):
            # Keep an existing ref ligand file; only fill in a missing one
            self._writeReflig(overwrite=False)
            return True
        return False

    def checkReflig(self) -> bool:
        """
        Whether the ref ligand file exists and is equivalent to the input
        ligand
        """
        ref_lig_file = self.getTaskFilename(REFLIG_NAME)
        if os.path.isfile(ref_lig_file):
            ref_lig_st = structure.Structure.read(ref_lig_file)
            if ref_lig_st.isEquivalent(self.input.ligand_st):
                return True
        return False

    def _writeReflig(self, overwrite: bool = True):
        """
        Write the input ligand to the ref ligand file.

        :param overwrite: if False, an existing ref ligand file is left
            untouched
        """
        ref_lig_file = self.getTaskFilename(REFLIG_NAME)
        if overwrite or not os.path.isfile(ref_lig_file):
            self.input.ligand_st.write(ref_lig_file)

    def updateReflig(self):
        """
        Overwrite the ref ligand if it isn't equivalent to the input ligand
        """
        if not self.checkReflig():
            self._writeReflig()
def read_json_file(filename):
    """
    Read a JSON file. If there are issues reading it (doesn't exist, can't be
    decoded, syntax errors...) quietly return an empty dict.

    :param filename: path of the JSON file
    :type filename: str
    :return: the decoded JSON content, or an empty dict on failure
    :rtype: object
    """
    try:
        # Read as UTF-8 explicitly rather than relying on the locale
        # encoding, which differs between platforms
        with open(filename, encoding='utf-8') as fh:
            return json.load(fh)
    except (OSError, ValueError):
        # OSError: missing or unreadable file. ValueError: JSON syntax
        # errors and undecodable bytes (both are ValueError subclasses).
        return {}
def md5sum(input_str):
    """
    MD5 hex digest of a string. The string is encoded as UTF-8 before
    hashing.

    :param input_str: text to hash
    :type input_str: str
    :return: hex digest (32 hex digits)
    :rtype: str
    """
    return hashlib.md5(input_str.encode('utf-8')).hexdigest()
def get_structure_digest(st, length=8):
    """
    Return an abbreviated MD5 hex digest given a Structure, considering only
    element, formal charge, and XYZ coordinates.

    :param st: input structure (not modified)
    :type st: schrodinger.structure.Structure
    :param length: digest length in hex digits
    :type length: int
    :return: hex digest
    :rtype: str
    """
    # One line per atom: element, formal charge, then the three coordinates.
    # NOTE(review): '{:.3}' formats each coordinate to three significant
    # digits, not three decimal places (12.345 -> '12.3'). Left as is because
    # changing it would change the digests used to name existing directories;
    # confirm whether '.3f' was intended.
    atom_lines = '\n'.join(
        '{} {:d} {:.3} {:.3} {:.3}'.format(a.element, a.formal_charge, *a.xyz)
        for a in st.atom)
    return md5sum(atom_lines)[:length]