"""
Glide HTTP client
This module implements objects that connect to a Glide HTTP server (see
schrodinger.application.glide.http_server) and use the server to dock
ligands remotely.
Sample usage::
    from schrodinger import structure
    from schrodinger.application.glide import http_client
    client = http_client.HTTPClient(host='localhost', port=8000)
    ct = structure.Structure.read('mylig.mae')
    poses = client.dock(ct)
    for pose in poses:
        print("gscore=%f" % pose.properties['r_i_glide_gscore'])
For a higher level API that also starts up and monitors the server itself,
see `GlideServerManager`.
"""
import http
import http.client
import itertools
import json
import os
import random
import socket
import sys
import time
import urllib
import urllib.parse
import uuid
import zlib
from pathlib import Path

import backoff
import zmq

from schrodinger import structure
from schrodinger.application.glide import glide
from schrodinger.job import files as jobfiles
from schrodinger.job import jobcontrol
from schrodinger.Qt import QtCore
from schrodinger.Qt import QtNetwork
from schrodinger.utils import fileutils
from schrodinger.utils import log
from schrodinger.utils import subprocess
# Module-level logger used for the info/debug messages emitted below.
logger = log.get_output_logger(__file__)
# Maximum time in seconds to wait for the Glide server to be ready.
MAX_WAIT = 60
# Launcher paths under the Schrodinger installation. NOTE: these are evaluated
# at import time, so importing this module raises KeyError when the
# SCHRODINGER environment variable is not set.
# GLIDE is the command used when launching the server under job control.
GLIDE = os.path.join(os.environ['SCHRODINGER'], 'glide')
# RUN is the command used when launching the backend as a plain subprocess.
RUN = os.path.join(os.environ['SCHRODINGER'], 'run')
# Message used by the backend checker when the server job has status
# 'finished'.
SERVER_FINISHED_MESSAGE = "Server timed out while waiting for ligands"
# Fallback error text used by the non-blocking manager when the backend is not
# ready and no more specific error was recorded.
SERVER_NOT_READY_MESSAGE = "Glide server was not ready"
class ServerTimedOutError(RuntimeError):
    """
    Exception raised if the Glide server is not ready after `MAX_WAIT`
    seconds.
    """
class GlideResult:
    """
    Sequence-like object containing the poses returned by Glide for one ligand.

    Supports `len()`, indexing, and iteration. When no poses were produced,
    the reason is available in the .message property.
    """

    def __init__(self, poses, message=''):
        """
        :param poses: poses returned by glide (may be empty)
        :type poses: list of schrodinger.structure.Structure
        :param message: status message explaining why no poses were returned
        :type message: str
        """
        self.poses = poses
        self.message = message

    def __len__(self):
        """
        :return: number of poses in the result
        :rtype: int
        """
        return len(self.poses)

    def __getitem__(self, idx):
        """
        :return: the pose (or slice of poses) at position `idx`
        """
        return self.poses[idx]

    def __iter__(self):
        """
        :return: iterator over the poses that also carries the message
        :rtype: GlideResultIterator
        """
        # A GlideResultIterator (rather than a plain list iterator) keeps the
        # status message available to callers that only hold the iterator.
        return GlideResultIterator(self.poses, self.message)

    def toJson(self):
        """
        Return a JSON representation of the docking result.

        The poses are serialized as one concatenated string under the
        'poses' key; the status message is stored under 'message'.

        :return: JSON
        :rtype: str
        """
        m2io = ''.join(structure.write_ct_to_string(st) for st in self.poses)
        obj = {'message': self.message, 'poses': m2io}
        return json.dumps(obj)

    @classmethod
    def fromJson(cls, json_str):
        """
        Construct a GlideResult from its JSON representation.

        :param json_str: JSON representation
        :type json_str: str
        :return: GlideResult
        :rtype: GlideResult
        """
        obj = json.loads(json_str)
        if obj['poses']:
            poses = list(structure.MaestroReader('', input_string=obj['poses']))
        else:
            # An empty 'poses' string means no structures; skip the reader.
            poses = []
        return cls(poses, obj['message'])
class GlideResultIterator:
    """
    Iterator over the poses returned by Glide for one ligand. When no poses were
    produced, the reason is available in the .message property.
    """

    def __init__(self, poses, message=''):
        """
        :param poses: poses returned by glide (may be empty)
        :type poses: list of schrodinger.structure.Structure
        :param message: status message explaining why no poses were returned
        :type message: str
        """
        self.poses = poses
        self.message = message
        # Iteration state is kept separately so that .poses always holds the
        # full list, regardless of how far the iterator has advanced.
        self._iter = iter(poses)

    def __next__(self):
        return next(self._iter)

    def __iter__(self):
        return self

    def asGlideResult(self):
        """
        Convert the iterator back into a GlideResult.

        The result contains all the poses, including any that were already
        consumed from this iterator.

        :return: GlideResult
        :rtype: GlideResult
        """
        return GlideResult(self.poses, self.message)
class AbstractHTTPClient:
    """
    Interface for connecting to a Glide HTTP server.
    """

    def dock(self, ct):
        """
        Dock the ligand in Structure object `ct` using the remote Glide server.
        """
        raise NotImplementedError()

    def shutdown_server(self):
        """
        Ask the Glide HTTP server to shut down.
        """
        raise NotImplementedError()

    @staticmethod
    def ct_to_multipart(ct):
        """
        Encode a CT in multipart/form-data format, ready to POST.

        :param ct: Structure to encode
        :type ct: structure.Structure
        :return: The body of the request and the boundary
        :rtype: tuple(str, str)
        """
        cts = structure.write_ct_to_string(ct)
        boundary = "----Boundary%d" % random.randint(0, 100000)
        # The boundary must not occur in the payload; keep extending it until
        # it is unique (unlikely to be needed, but you never know...).
        while boundary in cts:
            boundary += "%d" % random.randint(0, 100000)
        # Adjacent literals are concatenated before the % operator is applied,
        # so the three placeholders are filled from the single tuple below.
        body = ('--%s\r\n'
                'Content-Disposition: form-data; name="lig"; '
                'filename="lig.mae"\r\n'
                'Content-Type: application/octet-stream\r\n\r\n'
                '%s--%s--\r\n' % (boundary, cts, boundary))
        return body, boundary
class HTTPClient(AbstractHTTPClient):
    """
    This class provides an API to connect to an existing Glide HTTP server.
    For a higher level API that also starts up and monitors the server itself,
    see `GlideServerManager`.
    """

    def __init__(self, con=None, host="localhost", port=8000, timeout=1000):
        """
        Initialize a new HTTPClient object. The optional 'con'
        is an existing http.client.HTTPConnection object. If not provided,
        then 'host', 'port', and 'timeout' will be used to create one.
        The default timeout value is very large to make sure that it is
        enough for most docking jobs to finish.

        :param con: existing connection to reuse
        :type con: http.client.HTTPConnection or NoneType
        :param host: server hostname (ignored if `con` is given)
        :type host: str
        :param port: server port (ignored if `con` is given)
        :type port: int
        :param timeout: connection timeout in seconds (ignored if `con` is
            given)
        :type timeout: int
        """
        if con:
            self.con = con
        else:
            self.con = http.client.HTTPConnection(host, port, timeout=timeout)

    def dock(self, ct):
        """
        Dock the ligand in Structure object 'ct' using the remote Glide server.

        :param ct: input ligand
        :type ct: schrodinger.structure.Structure
        :return: docking results as an iterator
        :rtype: GlideResultIterator
        :raises IOError: if the server response status is not OK
        """
        # This method returns an iterator for backward compatibility; it used to
        # return a MaestroReader, and some callers expect to be able to call
        # next() on it. The higher-level version in the newer GlideServerManager
        # class returns the sequence-like GlideResult, which is more versatile.
        return self._postLigand('/dock_ligand', ct)

    def setReferenceLigand(self, ct):
        """
        Tell the server to use a new reference ligand when docking subsequent
        ligands. (This only has an effect if the job started with a reference
        ligand, for example when the job uses core constraints.)

        :param ct: new reference ligand
        :type ct: schrodinger.structure.Structure
        :return: Glide results iterator (empty, but with an "Updated reflig"
                 message)
        :rtype: GlideResultIterator
        :raises IOError: if the server response status is not OK
        """
        return self._postLigand('/set_reflig', ct)

    def _postLigand(self, path, ct):
        """
        POST a structure to `path` as multipart/form-data and parse the reply.

        :param path: URL path of the server endpoint
        :type path: str
        :param ct: structure to send
        :type ct: schrodinger.structure.Structure
        :rtype: GlideResultIterator
        """
        body, boundary = self.ct_to_multipart(ct)
        headers = {
            "Content-type": "multipart/form-data; boundary=%s" % boundary
        }
        self.con.request("POST", path, body, headers=headers)
        return self._processResponse()

    def dockSmiles(self, smiles):
        """
        Dock a ligand from SMILES. For best results, the server should have been
        launched with the LIGPREP keyword enabled.

        :param  smiles: ligand SMILES
        :type smiles: str
        :return: docking results as an iterator
        :rtype: GlideResultIterator
        :raises IOError: if the server response status is not OK
        """
        qs = urllib.parse.urlencode({'smiles': smiles})
        self.con.request("GET", "/dock_smiles?" + qs)
        return self._processResponse()

    @staticmethod
    def _checkStatus(res):
        """
        Raise IOError unless the response status is 200 OK.

        :param res: response to check
        :type res: http.client.HTTPResponse
        """
        if res.status != http.HTTPStatus.OK:
            raise IOError("Bad response %d %s" % (res.status, res.reason))

    def _processResponse(self):
        """
        Read the pending response and decode its JSON body.

        :return: iterator over the poses in the response
        :rtype: GlideResultIterator
        :raises IOError: if the response status is not OK
        """
        res = self.con.getresponse()
        self._checkStatus(res)
        body = res.read().decode('utf-8')
        return iter(GlideResult.fromJson(body))

    def shutdown_server(self):
        """
        Ask the Glide HTTP server to shut down.

        :raises IOError: if the server response status is not OK
        """
        self.con.request("GET", "/shutdown")
        self._checkStatus(self.con.getresponse())
class NonBlockingHTTPClient(AbstractHTTPClient, QtCore.QObject):
    """
    Class for connecting to a Glide HTTP server and docking poses without
    blocking.

    :ivar posesDocked: Signal emitted when a ligand finishes docking. Emitted
        with a list of pose structures.
    :ivar noPosesDocked: Signal emitted when a ligand goes through the docking
        workflow, but does not return any valid poses
    :ivar errorOccurred: Signal emitted when there is an error communicating
        with the server.
    :ivar finished: Signal emitted when an HTTP request finishes. Note: aliased
        from self.manager.finished
    """
    posesDocked = QtCore.pyqtSignal(list)
    noPosesDocked = QtCore.pyqtSignal()
    errorOccurred = QtCore.pyqtSignal(str)

    def __init__(self, host=None, port=8000):
        """
        :param host: Hostname for server
        :type host: str
        :param port: Port for server
        :type port: int
        """
        super().__init__()
        # Make sure the host carries a scheme so it parses as a URL.
        if host is None:
            host = "http://localhost"
        elif "://" not in host:
            host = f"http://{host}"
        self.dock_url = self._makeQUrl(host, port, "dock_ligand")
        self.shutdown_url = self._makeQUrl(host, port, "shutdown")
        self.dock_smiles_url = self._makeQUrl(host, port, "dock_smiles")
        self.manager = QtNetwork.QNetworkAccessManager()
        self.manager.finished.connect(self._onDockingFinished)
        self.finished = self.manager.finished

    @staticmethod
    def _makeQUrl(host: str, port: int, path: str):
        """
        Converts a host, port, and path into a QUrl

        :param host: Host (e.g. http://localhost)
        :param port: Port to connect to
        :param path: Path relative to the host
        :rtype: QtCore.QUrl
        """
        url_parts = list(urllib.parse.urlparse(host))
        # Index 2 of the urlparse() result is the path component.
        url_parts[2] = path
        qurl = QtCore.QUrl(urllib.parse.urlunparse(url_parts))
        qurl.setPort(port)
        return qurl

    def dock(self, ct):
        """
        Docks the structure without blocking.

        :param ct: Structure to dock
        :type ct: schrodinger.structure.Structure
        """
        body, boundary = self.ct_to_multipart(ct)
        request = QtNetwork.QNetworkRequest(self.dock_url)
        content_key = "Content-type"
        content_value = f"multipart/form-data; boundary={boundary}"
        request.setRawHeader(content_key.encode("utf-8"),
                             content_value.encode("utf-8"))
        self.manager.post(request, body.encode("utf-8"))

    def dockSmiles(self, smiles):
        """
        Dock a ligand from SMILES without blocking.

        :param  smiles: ligand SMILES
        :type smiles: str
        """
        query = QtCore.QUrlQuery()
        query.addQueryItem('smiles', smiles)
        # NOTE: this updates the stored URL in place; each call replaces the
        # query set by the previous one.
        self.dock_smiles_url.setQuery(query)
        request = QtNetwork.QNetworkRequest(self.dock_smiles_url)
        self.manager.get(request)

    def _onDockingFinished(self, reply):
        """
        Checks reply and emits poses if any were produced.

        :param reply: Network reply
        :type reply: QtNetwork.QNetworkReply
        """
        try:
            self._handleReply(reply)
        finally:
            # The reply is owned by the caller of get()/post() and must be
            # released on every path, not only after a successful docking.
            reply.deleteLater()

    def _handleReply(self, reply):
        """
        Inspect a finished reply and emit the matching signal.

        :param reply: Network reply
        :type reply: QtNetwork.QNetworkReply
        """
        error = reply.error()
        if error != reply.NoError:
            self.errorOccurred.emit(reply.errorString())
            return
        status = reply.attribute(
            QtNetwork.QNetworkRequest.HttpStatusCodeAttribute)
        if status != http.HTTPStatus.OK:
            self.errorOccurred.emit(f"Bad response: {status}")
            return
        # A GET without a query string is the shutdown request (SMILES docking
        # always carries a query); it has no docking payload to process.
        is_get = (reply.operation() ==
                  QtNetwork.QNetworkAccessManager.GetOperation)
        if is_get and not reply.url().hasQuery():
            return
        data = reply.readAll()
        data_str = str(data.data(), encoding="utf-8")
        if data_str.strip() == "":
            self.errorOccurred.emit("Server didn't produce an error but "
                                    "returned no structure data")
            return
        result = GlideResult.fromJson(data_str)
        if len(result) == 0:
            # TODO: pass result.message so client knows why there are no poses.
            self.noPosesDocked.emit()
            return
        self.posesDocked.emit(list(result))

    def shutdown_server(self):
        """
        Requests shutting down the server without blocking.
        """
        request = QtNetwork.QNetworkRequest(self.shutdown_url)
        self.manager.get(request)
class SubprocessJobAdapter:
    """
    An adapter that starts a subprocess but makes it look a bit like a
    jobcontrol.Job object. This class does not attempt to expose the full Job
    API; it only has what is actually used within this module. Its purpose is so
    the GlideServerManager class can treat job control jobs and subprocesses
    equally.
    """

    def __init__(self, cmd, logfile):
        """
        :param cmd: command line to execute
        :type cmd: list of str
        :param logfile: file to use for log stdout/stderr
        :type logfile: file
        """
        self.proc = subprocess.Popen(cmd,
                                     stdout=logfile,
                                     stderr=subprocess.STDOUT,
                                     stdin=subprocess.DEVNULL)
        # The process ID doubles as the "job ID".
        self.JobId = str(self.proc.pid)

    def readAgain(self):
        """
        No-op, present for compatibility with the Job API; the status is
        polled from the process every time `Status` is read.
        """
        pass

    @property
    def Status(self):
        """
        :return: 'running' while the process is alive; afterwards 'finished'
            for a zero exit code and 'died' for anything else.
        :rtype: str
        """
        retcode = self.proc.poll()
        if retcode is None:
            return 'running'
        elif retcode == 0:
            return 'finished'
        else:
            return 'died'

    def kill(self):
        """
        Terminate the subprocess, escalating to a hard kill if it does not
        exit within a few seconds.
        """
        self.proc.terminate()
        # Wait to collect return status and prevent warnings about subprocesses
        # "still running". Normally this only takes about 0.2 s, but let's wait
        # a few seconds and send a hard kill if that fails.
        try:
            self.proc.wait(timeout=3.0)
        except subprocess.TimeoutExpired:
            self.proc.kill()
            self.proc.wait()

    def isComplete(self):
        """
        :return: whether the process has exited (successfully or not)
        :rtype: bool
        """
        return self.Status != 'running'
class AbstractGlideServerManager:
    """
    Base class for objects that start, stop, and monitor a Glide server job.

    Subclasses must define the `config_ext` class attribute (suffix of the
    server config file), a `client` attribute/property used to talk to the
    server, and implement `_startJC` and `_startNoJC`.
    """

    def __init__(self, keywords, jobdir='.', jobname=None, use_jc=True):
        """
        :param keywords: Glide keywords to use for the job. The only required
            keyword is GRIDFILE.
        :type keywords: dict
        :param jobdir: job directory
        :type jobdir: str
        :param jobname: basename for input and output files and for job
            control. By default, a random jobname is chosen.
        :type jobname: str
        :param use_jc: run the Glide backend under job control?
        :type use_jc: bool
        :raises FileNotFoundError: if `jobdir` does not exist
        """
        super().__init__()
        self.keywords = keywords
        # Convert jobdir to absolute path and raise error if does not exist
        self.jobdir = Path(jobdir).resolve(strict=True)
        self.jobname = jobname or str(uuid.uuid4())
        self.job = None
        self.use_jc = use_jc
        self.config_file = self.jobdir / (self.jobname + self.config_ext)
        self._readConfig()

    def start(self, wait=None):
        """
        Launch the Glide Server job. By default, returns as soon as the job is
        launched, but to make sure that the job is ready to dock call .isReady()
        until True.

        :param wait: if given, wait for the server to be ready up to `wait`
            seconds; if the server is still not ready then, raise a
            RuntimeError.
        :type wait: int or NoneType
        """
        # TODO: we could consider support for launching to a remote host.
        self._clearConfig()
        if self.use_jc:
            self._startJC()
        else:
            self._startNoJC()
        logger.info("Launched Glide server job: %s", self.job.JobId)
        if wait:
            self.waitUntilReady(wait)

    def isReady(self):
        """
        The server is considered ready when its config data is available and
        its job is running.

        :return: is the server ready to dock?
        :rtype: bool
        """
        logger.debug('isReady: %s', self.config)
        if self.config and self.job:
            self.job.readAgain()
            logger.debug('job.Status: %s', self.job.Status)
            return self.job.Status == 'running'
        else:
            return False

    def waitUntilReady(self, timeout=60):
        """
        Wait for the server to be ready. If the timeout is reached
        and the server is still not ready, raise a RuntimeError.

        :param timeout: maximum wait in seconds
        :type timeout: int
        :raises RuntimeError: if the backend exits before becoming ready, or
            if the timeout is reached
        """
        t0 = time.time()
        while time.time() - t0 < timeout:
            time.sleep(1)
            ready = self.isReady()
            # Measure after the sleep and the check so that the reported time
            # reflects how long we actually waited.
            elapsed = time.time() - t0
            if ready:
                logger.info("Glide server ready after %.1f s", elapsed)
                break
            elif self.job is not None and self.job.isComplete():
                raise RuntimeError(
                    f"Glide backend failed to start after {elapsed:.1f} s")
        else:
            # There may be no job at all (e.g. start() was never called).
            if self.job is not None:
                self.job.kill()
            raise RuntimeError("Timed out while waiting for Glide server")

    def stop(self, wait=0.0):
        """
        Stop the server. First it will try to send it a shutdown request via
        the network; if that doesn't work, it will kill via job control.

        :param float wait: wait up to this time in seconds for the backend
                           process to exit before killing it and returning.
                           If wait==0.0 and the shutdown message was sent
                           successfully via the network, return immediately
                           without killing.
        """
        if self.job is None:
            return
        self.job.readAgain()
        if self.job.Status == "running":
            try:
                self.client.shutdown_server()
            except (socket.error, ValueError):
                pass  # Ignore; will try job control below.
            else:
                if not wait:
                    return

                # Poll every 0.5 s, for up to `wait` seconds, until the job
                # reports completion.
                @backoff.on_predicate(backoff.constant,
                                      max_time=wait,
                                      interval=0.5)
                def _wait():
                    self.job.readAgain()
                    return self.job.isComplete()

                if _wait():
                    return
        # NOTE: call to shutdown_server() could have caused self.job to be
        # set to None, even if it failed with exception.
        if self.job is not None and not self.job.isComplete():
            self.job.kill()

    def _startJC(self):
        """
        Launch the Glide server using job control.
        """
        raise NotImplementedError()

    def _startNoJC(self):
        """
        Launch the Glide server as a subprocess, without job control.
        """
        raise NotImplementedError()

    def _clearConfig(self):
        """
        Clear the server config data (e.g., host/port) and remove the config
        file.
        """
        fileutils.force_remove(self.config_file)
        self.job = None
        self._readConfig()

    def _readConfig(self):
        """
        Read the server config file, if it exists.

        Under job control, the file is first fetched from the job when it is
        not present locally. If no job is attached yet, try to attach to the
        one named by the 'jobid' entry of the config.
        """
        if (self.use_jc and self.job is not None and
                not self.config_file.exists()):
            config_base = os.path.basename(self.config_file)
            self.job.readAgain()
            try:
                with jobfiles.get_file(self.job, config_base) as tmpconfig:
                    fileutils.force_rename(tmpconfig, self.config_file)
            except RuntimeError:
                # This is a normal sign that the server isn't ready yet.
                # It is up to the caller to retry.
                pass
        try:
            self._config = json.loads(self.config_file.read_text())
        except (IOError, ValueError):
            # Either the file isn't there or doesn't contain valid JSON. The
            # latter has been known to happen due to race conditions.
            self._config = {}
        # We only support "attaching" to an already running server when
        # using job control. We could in principle do the same using the PID
        # when not using job control, but YAGNI plus PIDs may be reused.
        if self.job is None and self.use_jc:
            jobid = self._config.get('jobid')
            if jobid:
                try:
                    self.job = jobcontrol.Job(jobid)
                except RuntimeError:
                    pass  # Maybe job is too old and job record is gone

    @property
    def config(self):
        """
        A dictionary with the information needed to connect with the server:
        host, port, and jobid. This data is obtained from a JSON file written
        by the server. If the file does not exist (yet?), the dict will be
        empty.
        """
        # Keep retrying the read until the file yields data; once loaded, the
        # cached dict is returned.
        if not self._config:
            self._readConfig()
        return self._config
class GlideServerManager(AbstractGlideServerManager):
    """
    A class to start, stop, monitor, and use a Glide HTTP server. Sample use::

        server = GlideServerManager({'GRIDFILE': 'grid.zip'})
        server.start()
        while not server.isReady():
            time.sleep(1)
        poses = server.dock(st)
        server.stop()
    """
    # Module passed to the Glide backend as the CLIENT_MODULE keyword.
    client_module = 'schrodinger.application.glide.http_server'
    # Suffix appended to the jobname to form the server config file name.
    config_ext = '_http.json'

    def __init__(self, *args, host='localhost', port=0, timeout=1000, **kwargs):
        """
        See parent class for additional arguments.

        :param host: host to which the server should bind to
        :type host: str
        :param port: port where the server should listen. If zero, pick one
                     automatically.
        :type port: int
        :param timeout: the server will shut down automatically if this time,
                        in seconds, passes without receiving any connections.
        :type timeout: int
        """
        super().__init__(*args, **kwargs)
        options = 'host=%s;port=%d;timeout=%d' % (host, port, timeout)
        # The server-specific keywords take precedence over any same-named
        # keywords supplied by the caller.
        server_keywords = {
            **self.keywords,
            'CLIENT_MODULE': self.client_module,
            'CLIENT_OPTIONS': options,
            'JOBNAME': self.jobname,
        }
        self.glide_job = glide.Dock(server_keywords)

    def _startJC(self):
        """
        Write the simplified input file and launch the server using job
        control.
        """
        infile = os.path.join(self.jobdir, self.jobname + '.in')
        self.glide_job.writeSimplified(infile)
        cmd = [GLIDE, infile]
        logger.debug("Launching: %s", ' '.join(cmd))
        self.job = jobcontrol.launch_job(cmd)

    def _startNoJC(self):
        """
        Write the JSON input file and launch the backend as a subprocess,
        sending its stdout/stderr to `<jobname>.log` in the job directory.
        """
        infile = os.path.join(self.jobdir, self.jobname + '.js')
        self.glide_job.writeJSON(infile)
        cmd = [RUN, '-FROM', 'glide', 'glide_backend', infile]
        logger.debug("Launching: %s", ' '.join(cmd))
        logfile = os.path.join(self.jobdir, self.jobname + '.log')
        # Our handle on the log file is only needed while spawning the
        # subprocess.
        with open(logfile, 'w') as fh:
            self.job = SubprocessJobAdapter(cmd, fh)

    def dock(self, st):
        """
        Dock a ligand. Return a list of poses, which may be empty. If there was
        a problem connecting to the server, socket.error exceptions may be
        propagated.

        :param st: Structure to dock
        :type st: schrodinger.structure.Structure
        :return: docking result
        :rtype: GlideResult
        """
        logger.debug('Docking ligand: %s', st.title)
        result_iter = self.client.dock(st)
        glide_result = result_iter.asGlideResult()
        logger.debug('   got %d poses', len(glide_result))
        return glide_result

    def dockSmiles(self, smiles):
        """
        Dock a ligand from SMILES. For best results, the server should have been
        launched with the LIGPREP keyword enabled.

        :param  smiles: ligand SMILES
        :type smiles: str
        :return: docking result
        :rtype: GlideResult
        """
        logger.debug('Docking ligand SMILES: %s', smiles)
        result_iter = self.client.dockSmiles(smiles)
        glide_result = result_iter.asGlideResult()
        logger.debug('   got %d poses', len(glide_result))
        return glide_result

    def setReferenceLigand(self, st):
        """
        Tell the server to use a new reference ligand when docking subsequent
        ligands. (This only has an effect if the job started with a reference
        ligand, for example when the job uses core constraints.)

        :param st: new reference ligand
        :type st: schrodinger.structure.Structure
        :return: Glide result (empty, but with an "Updated reflig" message)
        :rtype: GlideResult
        """
        result_iter = self.client.setReferenceLigand(st)
        return result_iter.asGlideResult()

    @property
    def client(self):
        """
        Client object to be used for connecting to the Glide server process.
        A new client is created on every access.

        :rtype: HTTPClient
        :return: the client object.
        :raises ValueError: if the server config is not available yet
        """
        if not self.config:
            raise ValueError('Glide server is not ready')
        return HTTPClient(host=self.config['host'], port=self.config['port'])
class NonBlockingGlideServerManager(GlideServerManager, QtCore.QObject):
    """
    A class to use a Glide HTTP server without blocking.

    :ivar posesDocked: Signal emitted when a ligand finishes docking. Emitted
        with a list of pose structures.
    :ivar noPosesDocked: Signal emitted when a ligand goes through the docking
        workflow, but does not return any valid poses
    :ivar errorOccurred: Signal emitted when there is an error communicating
        with the server.
    :ivar batchFinished: Signal emitted when a docking batch finishes. Emitted
        with the number of ligands that were docked.
    """
    posesDocked = QtCore.pyqtSignal(list)
    noPosesDocked = QtCore.pyqtSignal()
    batchFinished = QtCore.pyqtSignal(int)
    errorOccurred = QtCore.pyqtSignal(str)

    def __init__(self, *args, **kwargs):
        # See GlideServerManager for argument documentation
        super().__init__(*args, **kwargs)
        self._docking_active = False
        self._resetBatch()
        self._client = None
        self._backend_checker = NonBlockingBackendChecker()
        self._backend_checker.setBackend(self)

    def stop(self, *args, **kwargs):
        """
        Stop polling for backend readiness, then stop the server. See
        `GlideServerManager.stop` for argument documentation.
        """
        self._backend_checker.stopChecking()
        super().stop(*args, **kwargs)

    def dock(self, st):
        """
        Docks a given structure

        :param st: Structure or SMILES to dock
        :type st: structure.Structure or str
        :raises ValueError: if the server config is not available
        """
        # NOTE(review): earlier documentation mentioned a RuntimeError when
        # stop() is called during startup; that is not evident from this class.
        self._docking_active = True
        try:
            client = self._getClient()
            if isinstance(st, str):
                client.dockSmiles(st)
            else:
                client.dock(st)
        except Exception:
            # No request went out, so no `finished` signal will arrive to clear
            # the flag; reset it here so that later docking is not blocked.
            self._docking_active = False
            raise

    def dockBatch(self, structures, done_adding=True):
        """
        Set a batch of structures to dock.

        :param structures: Structures to dock
        :type structures: iterable[schrodinger.structure.Structure]
        :param bool done_adding: Whether to emit batchFinished when all sts in
            `structures` finish docking. Pass False if structures is a
            generator that can have more structures added to it at runtime.
        """
        self._resetBatch()
        self._structure_queue = iter(structures)
        if done_adding:
            self.setDoneAdding()
        else:
            # setDoneAdding calls _update so we manually call it here
            self._update()

    def cancelBatch(self):
        """
        Request cancellation of the current batch. The batch is finished as
        soon as no docking request is active.
        """
        self._cancel_requested = True
        self._update()

    def addStructures(self, structures):
        """
        Add more structures to dock. Should call `setDoneAdding` once all
        structures have been added.

        :param structures: Structures to dock
        :type structures: iterable[schrodinger.structure.Structure]
        """
        self._structure_queue = itertools.chain(self._structure_queue,
                                                structures)
        self._update()

    def setDoneAdding(self):
        """
        Call this when done adding structures to a batch; once all structures
        have been docked, `batchFinished` will be emitted.
        """
        self._done_adding = True
        self._update()

    def _update(self):
        """
        Update the server manager state.
        - Docks the next structure if not already docking
        - Emits batchFinished if all structures are docked and no more
            structures will be added
        """
        if self._docking_active:
            # Return early if already docking
            return
        if self._cancel_requested:
            self._backend_checker.stopChecking()
            self._finishBatch()
            return
        next_st = next(self._structure_queue, None)
        if next_st is not None:
            self._batch_size += 1
            self.dock(next_st)
        elif self._done_adding:
            self._finishBatch()

    def _finishBatch(self):
        """
        Emit `batchFinished` with the number of structures submitted in this
        batch and reset the batch state.
        """
        self.batchFinished.emit(self._batch_size)
        self._resetBatch()

    def _resetBatch(self):
        """
        Reset the batch bookkeeping to an empty, not-yet-finished batch.
        """
        self._batch_size = 0
        self._structure_queue = iter(())
        self._done_adding = False
        self._cancel_requested = False

    @QtCore.pyqtSlot()
    def _onDockFinished(self):
        """
        When docking finishes, set docking active to False and update state.
        """
        self._docking_active = False
        self._update()

    def _getClient(self):
        """
        Wait for the backend to be ready and return the client.

        If the backend does not become ready, `errorOccurred` is emitted and
        the current batch is finished before the client is looked up.

        :rtype: NonBlockingHTTPClient
        :raises ValueError: if the server config is not available
        """
        self._backend_checker.wait()
        if not self._backend_checker.ready:
            exc = self._backend_checker.error or SERVER_NOT_READY_MESSAGE
            self.errorOccurred.emit(str(exc))
            self._finishBatch()
        return self.client

    @property
    def client(self):
        """
        Client object used to talk to the server. It is created on first
        access, with its signals forwarded to this object's signals, and then
        reused.

        :rtype: NonBlockingHTTPClient
        :raises ValueError: if the server config is not available
        """
        if not self.config:
            raise ValueError('Glide server is not ready')
        if self._client is None:
            self._client = NonBlockingHTTPClient(host=self.config['host'],
                                                 port=self.config['port'])
            self._client.posesDocked.connect(self.posesDocked)
            self._client.noPosesDocked.connect(self.noPosesDocked)
            self._client.errorOccurred.connect(self.errorOccurred)
            self._client.finished.connect(self._onDockFinished)
        return self._client
[docs]class NonBlockingBackendChecker(QtCore.QObject):
    """
    Class to get a GlideServerManager object to the ready state without blocking
    the GUI.
    :ivar ready: Whether the backend is ready
    """
[docs]    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        self.ready = False
        self.error = None
        self._backend = None
        self._timer = QtCore.QTimer()
        self._timer.setInterval(2000)
        self._timer.timeout.connect(self._checkBackend)
        self._event_loop = QtCore.QEventLoop() 
[docs]    def wait(self, timeout=MAX_WAIT):
        """
        Block python execution (but not the GUI) until the backend is ready or
        the timeout is reached.
        :param timeout: Timeout in seconds or None to not timeout.
        :type timeout: int or NoneType
        :return: Whether the backend is ready
        """
        if self._backend is None:
            raise RuntimeError("Must call `setBackend` before `wait`")
        self.ready = False
        self.error = None
        # Check once and return early if it's already ready or an exception
        # occurred
        self._checkBackend()
        if self.ready or self.error is not None:
            return self.ready
        # Otherwise, check on a timer
        if timeout == 0:
            self.onServerTimeout()
            return self.ready
        elif timeout is not None:
            QtCore.QTimer.singleShot(timeout * 1000, self.onServerTimeout)
        self._timer.start()
        self._event_loop.exec()
        return self.ready 
[docs]    def setBackend(self, backend):
        """
        Set the backend object to call isReady on
        :type backend: GlideServerManager
        """
        self.stopChecking()
        self._backend = backend
        self.ready = False 
[docs]    def stopChecking(self):
        """
        Stop checking whether the backend is ready
        """
        self._timer.stop()
        self._event_loop.quit() 
[docs]    @QtCore.pyqtSlot()
    def onServerTimeout(self, *, msg=None):
        if msg is None:
            msg = "Server was not ready in time"
        self.error = ServerTimedOutError(msg)
        self.stopChecking() 
    def _checkBackend(self):
        try:
            self.ready = self._backend.isReady()
        except jobcontrol.JobLaunchFailure as exc:
            self.error = exc
            self.stopChecking()
        if self.ready:
            self.stopChecking()
        else:
            job = self._backend.job
            if job and job.Status in ('finished', 'died'):
                if job.Status == 'finished':
                    msg = SERVER_FINISHED_MESSAGE
                else:
                    msg = "Server died"
                self.onServerTimeout(msg=msg) 
class ZmqClient(QtCore.QObject):
    """
    Connect to a Glide ZMQ driver and dock ligands asynchronously: ligands
    are sent with `dock` and the results come back through signals.

    A Glide ZMQ driver is a Glide job launched with the -mq and -server flags.

    :ivar posesDocked: Signal emitted when a ligand finishes docking. Emitted
                       with a list of pose structures.
    :ivar noPosesDocked: Signal emitted when a ligand goes through the docking
                         workflow, but does not return any valid poses
    :ivar noMoreLigs: Signal emitted when there are no more ligands left to
                      dock in the server's queue.
    :ivar gotStatus: Signal emitted when the server sends a status message.
                     Emitted with a dict with status information. Current keys
                     include "elapsed_time", "ligs_done", "poses_stored",
                     "ligs_per_sec", "lps_per_worker", "active_workers",
                     "idle_workers", and "lost_workers".
    """
    posesDocked = QtCore.pyqtSignal(list)
    noPosesDocked = QtCore.pyqtSignal()
    noMoreLigs = QtCore.pyqtSignal()
    gotStatus = QtCore.pyqtSignal(dict)

    def __init__(self, url):
        """
        :param url: URL of server to connect to.
        :type url: str
        """
        super().__init__()
        self._lignum = 0
        context = zmq.Context.instance()
        self._sock = context.socket(zmq.DEALER)
        self._sock.connect(url)
        fd = self._sock.getsockopt(zmq.FD)
        self._notifier = QtCore.QSocketNotifier(fd, QtCore.QSocketNotifier.Read,
                                                self)
        self._notifier.activated.connect(self._getMessages)
        # For some reason, I can't get the QSocketNotifier above to work
        # reliably; when I test this class by itself it works, but hooked up to
        # the server manager / ligand designer, I don't get any notifications.
        #
        # Perhaps the raw socket by this point already has some readable data
        # due to low-level ZMQ traffic, and the notifier only notifies about
        # the transition from "no data" to "data", and not from "data" to
        # "more data"?
        #
        # Polling the ZMQ socket here sometimes seemed to clear that up, but
        # even that was not reliable enough, so to be safe, we'll check for
        # incoming messages periodically using a timer.
        self._timer = QtCore.QTimer(self)
        self._timer.setInterval(1000)
        self._timer.timeout.connect(self._getMessages)
        self._timer.start()

    def dock(self, st, lignum=None):
        """
        Send a ligand to the server to dock. Poses are returned asynchronously
        via the posesDocked and noPosesDocked signals.

        :param st: Structure to dock
        :type st: schrodinger.structure.Structure
        :param lignum: ligand index (stored by Glide in the i_i_glide_lignum
                       property). It can be used to tell which ligand is which,
                       because the poses are not in general returned in the same
                       order in which the ligands are sent to the server.
                       If not provided, a sequential number will be used. Note
                       that unique numbers should be used, because lignum is
                       used as a key by the server!
        :type lignum: int or NoneType
        """
        # Compare against None so that an explicit lignum of 0 is honored
        # instead of being mistaken for "not provided".
        if lignum is not None:
            self._lignum = lignum
        else:
            self._lignum += 1
        blob = st.writeToString(structure.MAESTRO).encode('utf-8')
        msg = {'cmd': 'LIGS', 'ligs': [(self._lignum, blob)]}
        self._sock.send_pyobj(msg)

    @QtCore.pyqtSlot()
    def _getMessages(self):
        """
        Receive and process any messages from the ZMQ socket.

        Called from both the socket notifier and the periodic timer; drains
        every message that is currently available.
        """
        # NOTE(review): recv_pyobj unpickles whatever arrives on the socket,
        # so this is only safe as long as the server at `url` is trusted.
        while self._sock.poll(1):
            msg = self._sock.recv_pyobj()
            if msg['cmd'] == 'POSES':
                poses = [st_from_blob(blob) for blob in msg['blobs']]
                if poses:
                    self.posesDocked.emit(poses)
                else:
                    self.noPosesDocked.emit()
                if msg['queue_len'] == 0:
                    self.noMoreLigs.emit()
            elif msg['cmd'] == 'INFO':
                self.gotStatus.emit(msg['status'])

    def shutdown_server(self):
        """
        Send the server a 'STOP' command.
        """
        self._sock.send_pyobj({'cmd': 'STOP'})

    def cancel(self):
        """
        Send the server a 'CANCEL' command.
        """
        self._sock.send_pyobj({'cmd': 'CANCEL'})

    def getStatus(self):
        """
        Request a status update from the server. When the response comes, the
        gotStatus signal is emitted with a dict containing status information.
        """
        self._sock.send_pyobj({'cmd': 'STATUS'})

    def disconnect(self):
        """
        Disconnect the ZMQ socket and stop listening.
        """
        # NOTE(review): this name shadows QObject.disconnect on instances of
        # this class; kept as is because callers rely on it.
        self._notifier.setEnabled(False)
        self._timer.stop()
        self._sock.close()
class NonBlockingGlideServerManagerZmq(AbstractGlideServerManager,
                                       QtCore.QObject):
    """
    Launch and use Glide ZMQ server without blocking. The server and workers
    are run on localhost, but future versions might relax this restriction.

    This class is meant as a drop-in replacement for
    NonBlockingGlideServerManager, but there are a couple of subtle differences
    in behavior: 1) when docking a batch, all ligands are sent immediately to
    the server, which keeps its own queue. This could be a problem when feeding
    an infinite iterable to dockBatch(). 2) cancelBatch() causes any currently
    docking ligands to be aborted, unlike NonBlockingGlideServerManager which
    waits for the current ligand to complete.

    :ivar posesDocked: Signal emitted when a ligand finishes docking. Emitted
        with a list of pose structures.
    :ivar noPosesDocked: Signal emitted when a ligand goes through the docking
        workflow, but does not return any valid poses
    :ivar errorOccurred: Signal emitted when there is an error communicating
        with the server.
    :ivar batchFinished: Signal emitted when a docking batch finishes. Emitted
        with the number of ligands that were docked.
    :ivar gotStatus: Signal emitted when the server sends a status message.
                     Emitted with a dict with status information. Current keys
                     include "elapsed_time", "ligs_done", "poses_stored",
                     "ligs_per_sec", "lps_per_worker", "active_workers",
                     "idle_workers", and "lost_workers".
    """
    config_ext = '_zmq.json'
    posesDocked = QtCore.pyqtSignal(list)
    noPosesDocked = QtCore.pyqtSignal()
    batchFinished = QtCore.pyqtSignal(int)
    errorOccurred = QtCore.pyqtSignal(str)
    gotStatus = QtCore.pyqtSignal(dict)

    def __init__(self, *args, nworkers=2, **kwargs):
        """
        Most arguments are passed through to GlideServerManager; see that class
        for documentation. Arguments specific to this class:

        :param nworkers: number of workers to launch.
        :type nworkers: int
        """
        super().__init__(*args, **kwargs)
        self.nworkers = nworkers
        self._client = None
        self._backend_checker = NonBlockingBackendChecker()
        self._backend_checker.setBackend(self)
        self._resetBatch()
        server_keywords = {**self.keywords, 'JOBNAME': self.jobname}
        self.glide_job = glide.Dock(server_keywords)

    def stop(self, *args, **kwargs):
        """
        Stop checking for readiness, delegate to the base class `stop` (all
        arguments are passed through), and close the ZMQ connection if one
        was opened.
        """
        self._backend_checker.stopChecking()
        super().stop(*args, **kwargs)
        if self._client is not None:
            self._client.disconnect()
            # Forget the closed client so that `client` opens a fresh
            # connection if the server is started again, instead of handing
            # out an object whose socket is already closed.
            self._client = None

    def dock(self, st):
        """
        Send one structure to the server, after waiting for the backend to
        be ready. Results arrive via `posesDocked` / `noPosesDocked`.

        :param st: Structure to dock
        :type st: schrodinger.structure.Structure
        """
        client = self._getClient()
        client.dock(st)
        self._sent += 1

    def dockBatch(self, structures, done_adding=True, reset=True):
        """
        Set a batch of structures to dock.

        :param structures: Structures to dock
        :type structures: iterable[schrodinger.structure.Structure]
        :param bool done_adding: Whether to emit batchFinished when all sts in
            `structures` finish docking. Pass False if structures is a
            generator that can have more structures added to it at runtime.
        :param bool reset: Whether to reset the batch before adding. Pass
            False to expand the current batch instead of starting a new one.
        """
        if reset:
            self._resetBatch()
        # Every structure is sent right away; the server keeps the queue.
        for st in structures:
            self.dock(st)
        if done_adding:
            self.setDoneAdding()

    def cancelBatch(self):
        """
        Ask the server (if its config is available) to cancel, then finish
        the current batch.
        """
        if self.config:
            self.client.cancel()
        self._finishBatch()

    def addStructures(self, structures):
        """
        Add more structures to dock. Should call `setDoneAdding` once all
        structures have been added.

        :param structures: Structures to dock
        :type structures: iterable[schrodinger.structure.Structure]
        """
        self._done_adding = False
        self.dockBatch(structures, done_adding=False, reset=False)

    def setDoneAdding(self):
        """
        Call this when done adding structures to a batch; once all structures
        have been docked, `batchFinished` will be emitted.
        """
        self._done_adding = True
        # Everything sent so far has already come back: finish immediately.
        if self._sent == self._received:
            self._finishBatch()

    @property
    def client(self):
        """
        Lazily created `ZmqClient` connected to the ``url`` entry of `config`.

        :raise ValueError: if `config` is empty or unset
        """
        if not self.config:
            raise ValueError('Glide server is not ready')
        if self._client is None:
            self._client = ZmqClient(self.config['url'])
            self._client.posesDocked.connect(self._onPosesDocked)
            self._client.noPosesDocked.connect(self._onNoPosesDocked)
            self._client.noMoreLigs.connect(self._onNoMoreLigs)
            self._client.gotStatus.connect(self.gotStatus)
        return self._client

    def getStatus(self):
        """
        Request a status update from the server; the answer is delivered via
        the `gotStatus` signal.
        """
        client = self._getClient()
        client.getStatus()

    def _getClient(self):
        """
        Wait for the backend checker and return the `client` property. If the
        server is not ready, the reason is emitted via `errorOccurred` and the
        batch is finished before `client` is accessed.
        """
        self._backend_checker.wait()
        if not self._backend_checker.ready:
            exc = self._backend_checker.error or SERVER_NOT_READY_MESSAGE
            self.errorOccurred.emit(str(exc))
            self._finishBatch()
        return self.client

    def _finishBatch(self):
        """
        Emit `batchFinished` with the number of results received so far and
        reset the batch counters.
        """
        self.batchFinished.emit(self._received)
        self._resetBatch()

    def _resetBatch(self):
        """
        Reset the sent/received counters and the done-adding flag.
        """
        self._sent = 0
        self._received = 0
        self._done_adding = False

    def _setupJob(self):
        """
        Write the input file and return the command line arguments which are
        used whether launching the driver under job control or not.
        """
        infile = os.path.join(self.jobdir, self.jobname + '.in')
        self.glide_job.writeSimplified(infile)
        cmd = [
            '-HOST', f'localhost:{self.nworkers}', '-NJOBS',
            str(self.nworkers), '-server', '-mq', '-bind_local', infile
        ]
        return cmd

    def start(self, *args, **kwargs):
        """
        Run the base class `start` from inside the job directory. All
        arguments are passed through and its return value is returned.
        """
        with fileutils.chdir(self.jobdir):
            return super().start(*args, **kwargs)

    def _startJC(self):
        """
        Launch the driver under job control and store the job in `self.job`.
        """
        common_args = self._setupJob()
        cmd = [GLIDE] + common_args
        logger.debug("Launching: %s", ' '.join(cmd))
        self.job = jobcontrol.launch_job(cmd)

    def _startNoJC(self):
        """
        Launch the driver without job control, wrapped in a
        `SubprocessJobAdapter` that is given a log file in the job directory.
        """
        common_args = self._setupJob()
        cmd = [RUN, '-FROM', 'glide', 'glide_driver.py'] + common_args
        logger.debug("Launching: %s", ' '.join(cmd))
        logfile = os.path.join(self.jobdir, self.jobname + '.log')
        # NOTE(review): the handle is closed as soon as the adapter has been
        # constructed; presumably the child process keeps its own copy of the
        # descriptor - confirm against SubprocessJobAdapter.
        with open(logfile, 'w') as fh:
            self.job = SubprocessJobAdapter(cmd, fh)

    @QtCore.pyqtSlot(list)
    def _onPosesDocked(self, poses):
        """
        Count one received result and re-emit the poses via `posesDocked`.
        """
        self._received += 1
        self.posesDocked.emit(poses)

    @QtCore.pyqtSlot()
    def _onNoPosesDocked(self):
        """
        Count one received result and re-emit `noPosesDocked`.
        """
        self._received += 1
        self.noPosesDocked.emit()

    @QtCore.pyqtSlot()
    def _onNoMoreLigs(self):
        """
        Finish the batch when the client reports that no ligands are left to
        dock, but only if the caller has declared that it is done adding.
        """
        if self._done_adding:
            self._finishBatch()
def st_from_blob(blob):
    """
    Build a Structure from a zlib-compressed blob.

    :param blob: compressed m2io representation of a structure
    :type blob: bytes
    :return: the first structure read from the decompressed text
    :rtype: schrodinger.structure.Structure
    """
    text = zlib.decompress(blob).decode('utf-8')
    reader = structure.StructureReader.fromString(text)
    return next(reader)
if __name__ == '__main__':
    # for standalone testing, we'll read a ligand file, POST it to the server
    # one ligand at a time, and write the poses we get back to an output file.
    if len(sys.argv) != 5:
        sys.exit("Usage: %s <host> <port> <ligfile> <output-posefile>" %
                 sys.argv[0])
    host, port, infile, outfile = sys.argv[1:]
    client = HTTPClient(host=host, port=port)
    # Use the writer as a context manager so that the pose file is flushed
    # and closed even if docking raises part way through the input.
    with structure.StructureWriter(outfile) as writer:
        for ct in structure.StructureReader(infile):
            print("docking ligand: %s" % ct.title)
            poses = client.dock(ct)
            for pose in poses:
                # Report 0.0 for poses that carry no GlideScore property.
                gscore = 0.0
                try:
                    gscore = pose.property['r_i_glide_gscore']
                except KeyError:
                    pass
                print("  writing pose; gscore=%f" % gscore)
                writer.append(pose)