"""
Glide HTTP client
This module implements objects that connect to a Glide HTTP server (see
schrodinger.application.glide.http_server) and use the server to dock
ligands remotely.
Sample usage::
from schrodinger import structure
from schrodinger.application.glide import http_client
client = http_client.HTTPClient(host='localhost', port=8000)
ct = structure.Structure.read('mylig.mae')
poses = client.dock(ct)
for pose in poses:
print("gscore=%f" % pose.properties['r_i_glide_gscore'])
For a higher level API that also starts up and monitors the server itself,
see `GlideServerManager`.
"""
import http
import itertools
import json
import os
import random
import socket
import sys
import time
import urllib
import uuid
import zlib
from pathlib import Path
import backoff
import zmq
from schrodinger import structure
from schrodinger.application.glide import glide
from schrodinger.job import files as jobfiles
from schrodinger.job import jobcontrol
from schrodinger.Qt import QtCore
from schrodinger.Qt import QtNetwork
from schrodinger.utils import fileutils
from schrodinger.utils import log
from schrodinger.utils import subprocess
logger = log.get_output_logger(__file__)
# Maximum time in seconds to wait for the Glide server to be ready.
MAX_WAIT = 60
GLIDE = os.path.join(os.environ['SCHRODINGER'], 'glide')
RUN = os.path.join(os.environ['SCHRODINGER'], 'run')
SERVER_FINISHED_MESSAGE = "Server timed out while waiting for ligands"
SERVER_NOT_READY_MESSAGE = "Glide server was not ready"
[docs]class ServerTimedOutError(RuntimeError):
"""
Exception raised if the Glide server is not ready after `MAX_WAIT`
"""
[docs]class GlideResult:
"""
Sequence-like object containing the poses returned by Glide for one ligand.
When no poses were produced, the reason is available in the .message
property.
"""
[docs] def __init__(self, poses, message=''):
"""
:param poses: poses returned by glide (may be empty)
:type poses: list of schrodinger.structure.Structure
:param message: status message explaining why no poses were returned
:type message: str
"""
self.poses = poses
self.message = message
[docs] def __len__(self):
return len(self.poses)
def __getitem__(self, idx):
return self.poses[idx]
def __iter__(self):
return GlideResultIterator(self.poses, self.message)
[docs] def toJson(self):
"""
Return a JSON representation of the docking result.
:return: JSON
:rtype: str
"""
m2io = ''.join(structure.write_ct_to_string(st) for st in self.poses)
obj = {'message': self.message, 'poses': m2io}
return json.dumps(obj)
[docs] @classmethod
def fromJson(cls, json_str):
"""
Construct a GlideResult from its JSON representation.
:param json_str: JSON representation
:type json_str: str
:return: GlideResult
:rtype: GlideResult
"""
obj = json.loads(json_str)
if obj['poses']:
poses = list(structure.MaestroReader('', input_string=obj['poses']))
else:
poses = []
return cls(poses, obj['message'])
[docs]class GlideResultIterator:
"""
Iterator over the poses returned by Glide for one ligand. When no poses were
produced, the reason is available in the .message property.
"""
[docs] def __init__(self, poses, message=''):
"""
:param poses: poses returned by glide (may be empty)
:type poses: list of schrodinger.structure.Structure
:param message: status message explaining why no poses were returned
:type message: str
"""
self.poses = poses
self.message = message
self._iter = iter(poses)
def __next__(self):
return next(self._iter)
def __iter__(self):
return self
[docs] def asGlideResult(self):
"""
Convert the iterator back into a GlideResult.
:return: GlideResult
:rype: GlideResult
"""
return GlideResult(self.poses, self.message)
[docs]class AbstractHTTPClient:
"""
Interface for connecting to a Glide HTTP server.
"""
[docs] def dock(self, ct):
"""
Dock the ligand in Structure object `ct` using the remote Glide server.
"""
raise NotImplementedError()
[docs] def shutdown_server(self):
"""
Ask the Glide HTTP server to shut down.
"""
raise NotImplementedError()
[docs] @staticmethod
def ct_to_multipart(ct):
"""
Encode a CT in multipart/form-data format, ready to POST.
:param ct: Structure to encode
:type ct: structure.Structure
:return: The body of the request and the boundary
:rtype: tuple(str, str)
"""
cts = structure.write_ct_to_string(ct)
boundary = "----Boundary%d" % random.randint(0, 100000)
while boundary in cts: # unlikely, but you never know...
boundary += "%d" % random.randint(0, 100000)
body = '--%s\r\n' \
'Content-Disposition: form-data; name="lig"; ' \
'filename="lig.mae"\r\n' \
'Content-Type: application/octet-stream\r\n\r\n' \
'%s--%s--\r\n' % (boundary, cts, boundary)
return body, boundary
[docs]class HTTPClient(AbstractHTTPClient):
"""
This class provides an API to connect to an existing Glide HTTP server.
For a higher level API that also starts up and monitors the server itself,
see `GlideServerManager`.
"""
[docs] def __init__(self, con=None, host="localhost", port=8000, timeout=1000):
"""
Initialize a new HTTPClient object. The optional 'con'
is an existing httplib.HTTPConnection object. If not provided,
then 'host', 'port', and 'timeout' will be used to create one.
The default timeout value is very large to make sure that it is
enough for most docking jobs to finish.
"""
if con:
self.con = con
else:
self.con = http.client.HTTPConnection(host, port, timeout=timeout)
[docs] def dock(self, ct):
"""
Dock the ligand in Structure object 'ct' using the remote Glide server.
:param ct: input ligand
:type ct: schrodinger.structure.Structure
:return: docking results as an iterator
:rtype: GlideResultIterator
"""
# This method returns an iterator for backward compatibility; it used to
# return a MaestroReader, and some callers expect to be able to call
# next() on it. The higher-level version in the newer GlideServerManager
# class returns the sequence-like GlideResult, which is more versatile.
return self._postLigand('/dock_ligand', ct)
[docs] def setReferenceLigand(self, ct):
"""
Tell the server to use a new reference ligand when docking subsequent
ligands. (This only has an effect if the job started with a reference
ligand, for example when the job uses core constraints.)
:param ct: new reference ligand
:type ct: schrodinger.structure.Structure
:return: Glide results iterator (empty, but with an "Updated reflig"
message)
:rtype: GlideResultIterator
"""
return self._postLigand('/set_reflig', ct)
def _postLigand(self, path, ct):
body, boundary = self.ct_to_multipart(ct)
self.con.request(
"POST",
path,
body,
headers={
"Content-type": "multipart/form-data; boundary=%s" % boundary
})
return self._processResponse()
[docs] def dockSmiles(self, smiles):
"""
Dock a ligand from SMILES. For best results, the server should have been
launched with the LIGPREP keyword enabled.
:param smiles: ligand SMILES
:type smiles: str
:return: docking results as an iterator
:rtype: GlideResultIterator
"""
qs = urllib.parse.urlencode({'smiles': smiles})
self.con.request("GET", "/dock_smiles?" + qs)
return self._processResponse()
def _processResponse(self):
res = self.con.getresponse()
if res.status != http.HTTPStatus.OK:
raise IOError("Bad response %d %s" % (res.status, res.reason))
body = res.read().decode('utf-8')
return iter(GlideResult.fromJson(body))
[docs] def shutdown_server(self):
"""
Ask the Glide HTTP server to shut down.
"""
self.con.request("GET", "/shutdown")
res = self.con.getresponse()
if res.status != http.HTTPStatus.OK:
raise IOError("Bad response %d %s" % (res.status, res.reason))
[docs]class NonBlockingHTTPClient(AbstractHTTPClient, QtCore.QObject):
"""
Class for connecting to a Glide HTTP server and docking poses without
blocking.
:ivar posesDocked: Signal emitted when a ligand finishes docking. Emitted
with a list of pose structures.
:ivar noPosesDocked: Signal emitted when a ligand goes through the docking
workflow, but does not return any valid poses
:ivar errorOccurred: Signal emitted when there is an error communicating
with the server.
:ivar finished: Signal emitted when an HTTP request finishes. Note: aliased
from self.manager.finished
"""
posesDocked = QtCore.pyqtSignal(list)
noPosesDocked = QtCore.pyqtSignal()
errorOccurred = QtCore.pyqtSignal(str)
[docs] def __init__(self, host=None, port=8000):
"""
:param host: Hostname for server
:type host: str
:param port: Port for server
:type port: int
"""
super().__init__()
if host is None:
host = "http://localhost"
elif "://" not in host:
host = f"http://{host}"
self.dock_url = self._makeQUrl(host, port, "dock_ligand")
self.shutdown_url = self._makeQUrl(host, port, "shutdown")
self.dock_smiles_url = self._makeQUrl(host, port, "dock_smiles")
self.manager = QtNetwork.QNetworkAccessManager()
self.manager.finished.connect(self._onDockingFinished)
self.finished = self.manager.finished
@staticmethod
def _makeQUrl(host: str, port: int, path: str):
"""
Converts a host, port, and path into a QUrl
:param host: Host (e.g. http://localhost)
:param port: Port to connect to
:param path: Path relative to the host
:rtype: QtCore.QUrl
"""
parsed_host = urllib.parse.urlparse(host)
url_parts = list(parsed_host)
url_parts[2] = path
url_str = urllib.parse.urlunparse(url_parts)
qurl = QtCore.QUrl(url_str)
qurl.setPort(port)
return qurl
[docs] def dock(self, ct):
"""
Docks the structure without blocking.
:param ct: Structure to dock
:type ct: schrodinger.structure.Structure
"""
body, boundary = self.ct_to_multipart(ct)
request = QtNetwork.QNetworkRequest(self.dock_url)
content_key = "Content-type"
content_value = f"multipart/form-data; boundary={boundary}"
request.setRawHeader(content_key.encode("utf-8"),
content_value.encode("utf-8"))
self.manager.post(request, body.encode("utf-8"))
[docs] def dockSmiles(self, smiles):
"""
Dock a ligand from SMILES without blocking.
:param smiles: ligand SMILES
:type smiles: str
"""
query = QtCore.QUrlQuery()
query.addQueryItem('smiles', smiles)
self.dock_smiles_url.setQuery(query)
request = QtNetwork.QNetworkRequest(self.dock_smiles_url)
self.manager.get(request)
def _onDockingFinished(self, reply):
"""
Checks reply and emits poses if any were produced.
:param reply: Network reply
:type reply: QtNetwork.QNetworkReply
"""
error = reply.error()
if error != reply.NoError:
self.errorOccurred.emit(reply.errorString())
return
status = reply.attribute(
QtNetwork.QNetworkRequest.HttpStatusCodeAttribute)
if status != http.HTTPStatus.OK:
self.errorOccurred.emit(f"Bad response: {status}")
return
if reply.operation() == QtNetwork.QNetworkAccessManager.GetOperation and\
not reply.url().hasQuery():
return
data = reply.readAll()
data_str = str(data.data(), encoding="utf-8")
if data_str.strip() == "":
self.errorOccurred.emit("Server didn't produce an error but "
"returned no structure data")
return
result = GlideResult.fromJson(data_str)
if len(result) == 0:
# TODO: pass result.message so client knows why there are no poses.
self.noPosesDocked.emit()
return
self.posesDocked.emit(list(result))
reply.deleteLater()
[docs] def shutdown_server(self):
"""
Requests shutting down the server without blocking.
"""
request = QtNetwork.QNetworkRequest(self.shutdown_url)
self.manager.get(request)
[docs]class SubprocessJobAdapter:
"""
An adapter that starts a subprocess but makes it look a bit like a
jobcontrol.Job object. This class does not attempt to expose the full Job
API; it only has what is actually used within this module. Its purpose is so
the GlideServerManager class can treat job control jobs and subprocesses
equally.
"""
[docs] def __init__(self, cmd, logfile):
"""
:param cmd: command line to execute
:type cmd: list of str
:param logfile: file to use for log stdout/stderr
:type logfile: file
"""
self.proc = subprocess.Popen(cmd,
stdout=logfile,
stderr=subprocess.STDOUT,
stdin=subprocess.DEVNULL)
self.JobId = str(self.proc.pid)
[docs] def readAgain(self):
pass
@property
def Status(self):
retcode = self.proc.poll()
if retcode is None:
return 'running'
elif retcode == 0:
return 'finished'
else:
return 'died'
[docs] def kill(self):
self.proc.terminate()
# Wait to collect return status and prevent warnings about suprocesses
# "still running". Normally this only takes about 0.2 s, but let's wait
# a few seconds and send a hard kill if that fails.
try:
self.proc.wait(timeout=3.0)
except subprocess.TimeoutExpired:
self.proc.kill()
self.proc.wait()
[docs] def isComplete(self):
return self.Status != 'running'
[docs]class AbstractGlideServerManager:
[docs] def __init__(self, keywords, jobdir='.', jobname=None, use_jc=True):
"""
:param keywords: Glide keywords to use for the job. The only required
keyword is GRIDFILE.
:type keywords: dict
:param jobdir: job directory
:type jobdir: str
:param jobname: basename for input and output files and for job
control. By default, a random jobname is chosen.
:type jobname: str
:param use_jc: run the Glide backend under job control?
:type use_jc: bool
"""
super().__init__()
self.keywords = keywords
# Convert jobdir to absolute path and raise error if does not exist
self.jobdir = Path(jobdir).resolve(strict=True)
self.jobname = jobname or str(uuid.uuid4())
self.job = None
self.use_jc = use_jc
self.config_file = self.jobdir / (self.jobname + self.config_ext)
self._readConfig()
[docs] def start(self, wait=None):
"""
Launch the Glide Server job. By default, returns as soon as the job is
launched, but to make sure that the job is ready to dock call .isReady()
until True.
:param wait: if given, wait for the server to be ready up to `wait`
seconds; if the server is still not ready then, raise a
RuntimeError.
:type wait: int or NoneType
"""
# TODO: we could consider support for launching to a remote host.
self._clearConfig()
if self.use_jc:
self._startJC()
else:
self._startNoJC()
logger.info("Launched Glide server job: %s", self.job.JobId)
if wait:
self.waitUntilReady(wait)
[docs] def isReady(self):
"""
:return: is the server ready to dock?
:rtype: bool
"""
logger.debug('isReady: %s', self.config)
if self.config and self.job:
self.job.readAgain()
logger.debug('job.Status: %s', self.job.Status)
return self.job.Status == 'running'
else:
return False
[docs] def waitUntilReady(self, timeout=60):
"""
Wait for the server to be ready. If the timeout is reached
and the server is still not ready, raise a RuntimeError.
:param timeout: maximum wait in seconds
:type timeout: int
"""
t0 = time.time()
while (dt := time.time() - t0) < timeout:
time.sleep(1)
if self.isReady():
logger.info("Glide server ready after %.1f s", dt)
break
elif self.job.isComplete():
raise RuntimeError(
f"Glide backend failed to start after {dt:.1f} s")
else:
self.job.kill()
raise RuntimeError("Timed out while waiting for Glide server")
[docs] def stop(self, wait=0.0):
"""
Stop the server. First it will try to send it a shutdown request via
the network; if that doesn't work, it will kill via job control.
:param float wait: wait up to this time in seconds for the backend
process to exit before killing it and returning.
If wait==0.0 and the shutdown message was sent
successfully via the network, return immediately
without killing.
"""
if self.job is None:
return
self.job.readAgain()
if self.job.Status == "running":
try:
self.client.shutdown_server()
except (socket.error, ValueError):
pass # Ignore; will try job control below.
else:
if not wait:
return
@backoff.on_predicate(backoff.constant,
max_time=wait,
interval=0.5)
def _wait():
self.job.readAgain()
return self.job.isComplete()
if _wait():
return
# NOTE: call to shutdown_server() could have caused self.job to be
# set to None, even if it failed with exception.
if self.job is not None and not self.job.isComplete():
self.job.kill()
def _startJC(self):
"""
Launch the Glide server using job control.
"""
raise NotImplementedError()
def _startNoJC(self):
"""
Launch the Glide server as a subprocess, without job control.
"""
raise NotImplementedError()
def _clearConfig(self):
"""
Clear the server config data (e.g., host/port) and remove the config
file.
"""
fileutils.force_remove(self.config_file)
self.job = None
self._readConfig()
def _readConfig(self):
"""
Read the server config file, if it exists.
"""
if (self.use_jc and self.job is not None and
not self.config_file.exists()):
config_base = os.path.basename(self.config_file)
self.job.readAgain()
try:
with jobfiles.get_file(self.job, config_base) as tmpconfig:
fileutils.force_rename(tmpconfig, self.config_file)
except RuntimeError:
# This is a normal sign that the server isn't ready yet.
# It is up to the caller to retry.
pass
try:
self._config = json.loads(self.config_file.read_text())
except (IOError, ValueError):
# Either the file isn't there or doesn't contain valid JSON. The
# latter has been known to happen due to race conditions.
self._config = {}
# We only support "attaching" to an already running server when
# using job control. We could in principle do the same using the PID
# when not using job control, but YAGNI plus PIDs may be reused.
if self.job is None and self.use_jc:
jobid = self._config.get('jobid')
if jobid:
try:
self.job = jobcontrol.Job(jobid)
except RuntimeError:
pass # Maybe job is too old and job record is gone
@property
def config(self):
"""
A dictionary with the information needed to connect with the server:
host, port, and jobid. This data is obtained from a JSON file written
by the server. If the file does not exist (yet?), the dict will be
empty.
"""
if not self._config:
self._readConfig()
return self._config
[docs]class GlideServerManager(AbstractGlideServerManager):
"""
A class to start, stop, monitor, and use a Glide HTTP server. Sample use::
server = GlideServerManager({'GRIDFILE': 'grid.zip'})
server.start()
while not server.isReady():
time.sleep(1)
poses = server.dock(st)
server.stop()
"""
client_module = 'schrodinger.application.glide.http_server'
config_ext = '_http.json'
[docs] def __init__(self, *args, host='localhost', port=0, timeout=1000, **kwargs):
"""
See parent class for additional arguments.
:param host: host to which the server should bind to
:type host: str
:param port: port where the server should listen. If zero, pick one
automatically.
:type port: int
:param timeout: the server will shut down automatically if this time,
in seconds, passes without receiving any connections.
:type timeout: int
"""
super().__init__(*args, **kwargs)
options = 'host=%s;port=%d;timeout=%d' % (host, port, timeout)
server_keywords = {
**self.keywords,
'CLIENT_MODULE': 'schrodinger.application.glide.http_server',
'CLIENT_OPTIONS': options,
'JOBNAME': self.jobname,
}
self.glide_job = glide.Dock(server_keywords)
def _startJC(self):
infile = os.path.join(self.jobdir, self.jobname + '.in')
self.glide_job.writeSimplified(infile)
cmd = [GLIDE, infile]
logger.debug("Launching: %s", ' '.join(cmd))
self.job = jobcontrol.launch_job(cmd)
def _startNoJC(self):
infile = os.path.join(self.jobdir, self.jobname + '.js')
self.glide_job.writeJSON(infile)
cmd = [RUN, '-FROM', 'glide', 'glide_backend', infile]
logger.debug("Launching: %s", ' '.join(cmd))
logfile = os.path.join(self.jobdir, self.jobname + '.log')
with open(logfile, 'w') as fh:
self.job = SubprocessJobAdapter(cmd, fh)
[docs] def dock(self, st):
"""
Dock a ligand. Return a list of poses, which may be empty. If there was
a problem connecting to the server, socket.error exceptions may be
propagated.
:param st: Structure to dock
:type st: schrodinger.structure.Structure
:return: docking result
:rtype: GlideResult
"""
logger.debug('Docking ligand: %s', st.title)
result_iter = self.client.dock(st)
glide_result = result_iter.asGlideResult()
logger.debug(' got %d poses', len(glide_result))
return glide_result
[docs] def dockSmiles(self, smiles):
"""
Dock a ligand from SMILES. For best results, the server should have been
launched with the LIGPREP keyword enabled.
:param smiles: ligand SMILES
:type smiles: str
:return: docking result
:rtype: GlideResult
"""
logger.debug('Docking ligand SMILES: %s', smiles)
result_iter = self.client.dockSmiles(smiles)
glide_result = result_iter.asGlideResult()
logger.debug(' got %d poses', len(glide_result))
return glide_result
[docs] def setReferenceLigand(self, st):
"""
Tell the server to use a new reference ligand when docking subsequent
ligands. (This only has an effect if the job started with a reference
ligand, for example when the job uses core constraints.)
:param st: new reference ligand
:type st: schrodinger.structure.Structure
:return: Glide result (empty, but with an "Updated reflig" message)
:rtype: GlideResult
"""
result_iter = self.client.setReferenceLigand(st)
return result_iter.asGlideResult()
@property
def client(self):
"""
Client object to be used for connecting to the Glide server process.
:rtype: HTTPClient
:return: the client object.
"""
if not self.config:
raise ValueError('Glide server is not ready')
return HTTPClient(host=self.config['host'], port=self.config['port'])
[docs]class NonBlockingGlideServerManager(GlideServerManager, QtCore.QObject):
"""
A class to use a Glide HTTP server without blocking.
:ivar posesDocked: Signal emitted when a ligand finishes docking. Emitted
with a list of pose structures.
:ivar noPosesDocked: Signal emitted when a ligand goes through the docking
workflow, but does not return any valid poses
:ivar errorOccurred: Signal emitted when there is an error communicating
with the server.
:ivar batchFinished: Signal emitted when a docking batch finishes. Emitted
with the number of ligands that were docked.
"""
posesDocked = QtCore.pyqtSignal(list)
noPosesDocked = QtCore.pyqtSignal()
batchFinished = QtCore.pyqtSignal(int)
errorOccurred = QtCore.pyqtSignal(str)
[docs] def __init__(self, *args, **kwargs):
# See GlideServerManager for argument documentation
super().__init__(*args, **kwargs)
self._docking_active = False
self._resetBatch()
self._client = None
self._backend_checker = NonBlockingBackendChecker()
self._backend_checker.setBackend(self)
[docs] def stop(self, *args, **kwargs):
self._backend_checker.stopChecking()
super().stop(*args, **kwargs)
[docs] def dock(self, st):
"""
Docks a given structure
:param st: Structure or SMILES to dock
:type st: structure.Structure or str
:raises RuntimeError if stop() is called on the server during startup
"""
self._docking_active = True
client = self._getClient()
if isinstance(st, str):
client.dockSmiles(st)
else:
client.dock(st)
[docs] def dockBatch(self, structures, done_adding=True):
"""
Set a batch of structures to dock.
:param structures: Structures to dock
:type structures: iterable[schrodinger.structure.Structure]
:param bool done_adding: Whether to emit batchFinished when all sts in
`structures` finish docking. Pass False if structures is a
generator that can have more structures added to it at runtime.
"""
self._resetBatch()
self._structure_queue = iter(structures)
if done_adding:
self.setDoneAdding()
else:
# setDoneAdding calls _update so we manually call it here
self._update()
[docs] def cancelBatch(self):
self._cancel_requested = True
self._update()
[docs] def addStructures(self, structures):
"""
Add more structures to dock. Should call `setDoneAdding` once all
structures have been added.
:param structures: Structures to dock
:type structures: iterable[schrodinger.structure.Structure]
"""
self._structure_queue = itertools.chain(self._structure_queue,
structures)
self._update()
[docs] def setDoneAdding(self):
"""
Call this when done adding structures to a batch; once all structures
have been docked, `batchFinished` will be emitted.
"""
self._done_adding = True
self._update()
def _update(self):
"""
Update the server manager state.
- Docks the next structure if not already docking
- Emits batchFinished if all structures are docked and no more
structures will be added
"""
if self._docking_active:
# Return early if already docking
return
if self._cancel_requested:
self._backend_checker.stopChecking()
self._finishBatch()
return
next_st = next(self._structure_queue, None)
if next_st is not None:
self._batch_size += 1
self.dock(next_st)
elif self._done_adding:
self._finishBatch()
def _finishBatch(self):
self.batchFinished.emit(self._batch_size)
self._resetBatch()
def _resetBatch(self):
self._batch_size = 0
self._structure_queue = iter(())
self._done_adding = False
self._cancel_requested = False
@QtCore.pyqtSlot()
def _onDockFinished(self):
"""
When docking finishes, set docking active to False and update state.
"""
self._docking_active = False
self._update()
def _getClient(self):
self._backend_checker.wait()
if not self._backend_checker.ready:
exc = self._backend_checker.error or SERVER_NOT_READY_MESSAGE
self.errorOccurred.emit(str(exc))
self._finishBatch()
return self.client
@property
def client(self):
if not self.config:
raise ValueError('Glide server is not ready')
if self._client is None:
self._client = NonBlockingHTTPClient(host=self.config['host'],
port=self.config['port'])
self._client.posesDocked.connect(self.posesDocked)
self._client.noPosesDocked.connect(self.noPosesDocked)
self._client.errorOccurred.connect(self.errorOccurred)
self._client.finished.connect(self._onDockFinished)
return self._client
[docs]class NonBlockingBackendChecker(QtCore.QObject):
"""
Class to get an GlideServerManager object to the ready state without blocking
the GUI.
:ivar ready: Whether the backend is ready
"""
[docs] def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)
self.ready = False
self.error = None
self._backend = None
self._timer = QtCore.QTimer()
self._timer.setInterval(2000)
self._timer.timeout.connect(self._checkBackend)
self._event_loop = QtCore.QEventLoop()
[docs] def wait(self, timeout=MAX_WAIT):
"""
Block python execution (but not the GUI) until the backend is ready or
the timeout is reached.
:param timeout: Timeout in seconds or None to not timeout.
:type timeout: int or NoneType
:return: Whether the backend is ready
"""
if self._backend is None:
raise RuntimeError("Must call `setBackend` before `wait`")
self.ready = False
self.error = None
# Check once and return early if it's already ready or an exception
# occurred
self._checkBackend()
if self.ready or self.error is not None:
return self.ready
# Otherwise, check on a timer
if timeout == 0:
self.onServerTimeout()
return self.ready
elif timeout is not None:
QtCore.QTimer.singleShot(timeout * 1000, self.onServerTimeout)
self._timer.start()
self._event_loop.exec()
return self.ready
[docs] def setBackend(self, backend):
"""
Set the backend object to call isReady on
:type backend: GlideServerManager
"""
self.stopChecking()
self._backend = backend
self.ready = False
[docs] def stopChecking(self):
"""
Stop checking whether the backend is ready
"""
self._timer.stop()
self._event_loop.quit()
[docs] @QtCore.pyqtSlot()
def onServerTimeout(self, *, msg=None):
if msg is None:
msg = "Server was not ready in time"
self.error = ServerTimedOutError(msg)
self.stopChecking()
def _checkBackend(self):
try:
self.ready = self._backend.isReady()
except jobcontrol.JobLaunchFailure as exc:
self.error = exc
self.stopChecking()
if self.ready:
self.stopChecking()
else:
job = self._backend.job
if job and job.Status in ('finished', 'died'):
if job.Status == 'finished':
msg = SERVER_FINISHED_MESSAGE
else:
msg = "Server died"
self.onServerTimeout(msg=msg)
[docs]class ZmqClient(QtCore.QObject):
"""
Connect to a Glide ZMQ driver and dock ligands synchronously.
A Glide ZMQ driver is a Glide job launched with the -mq and -server flags.
:ivar posesDocked: Signal emitted when a ligand finishes docking. Emitted
with a list of pose structures.
:ivar noPosesDocked: Signal emitted when a ligand goes through the docking
workflow, but does not return any valid poses
:ivar noMoreLigs: Signal emitted when there are no more ligands left to
dock in the server's queue.
:ivar gotStatus: Signal emitted when the server sends a status message.
Emitted with a dict with status information. Current keys
include "elapsed_time", "ligs_done", "poses_stored",
"ligs_per_sec", "lps_per_worker", "active_workers",
"idle_workers", and "lost_workers".
"""
posesDocked = QtCore.pyqtSignal(list)
noPosesDocked = QtCore.pyqtSignal()
noMoreLigs = QtCore.pyqtSignal()
gotStatus = QtCore.pyqtSignal(dict)
[docs] def __init__(self, url):
"""
:param url: URL of server to connect to.
:type url: str
"""
super().__init__()
self._lignum = 0
context = zmq.Context.instance()
self._sock = context.socket(zmq.DEALER)
self._sock.connect(url)
fd = self._sock.getsockopt(zmq.FD)
self._notifier = QtCore.QSocketNotifier(fd, QtCore.QSocketNotifier.Read,
self)
self._notifier.activated.connect(self._getMessages)
# For some reason, I can't get the QSocketNotifier above to work
# reliably; when I test this class by itself it works, but hooked up to
# the server manager / ligand designer, I don't get any notifications.
#
# Perhaps the raw socket by this point already has some readable data
# due to low-level ZMQ traffic, and the notifer only notifies about the
# transition from "no data" to "data", and not from "data" to "more
# data"?
#
# Polling the ZMQ socket here sometimes seemed to clear that up, but
# even that was not reliable enough, so to be safe, we'll check for
# incoming messages periodically using a timer.
self._timer = QtCore.QTimer(self)
self._timer.setInterval(1000)
self._timer.timeout.connect(self._getMessages)
self._timer.start()
[docs] def dock(self, st, lignum=None):
"""
Send a ligand to the server to dock. Poses are returned asynchronously
via the posesDocked and noPosesDocked signals.
:param st: Structure to dock
:type st: schrodinger.structure.Structure
:param lignum: ligand index (stored by Glide in the i_i_glide_lignum
property). It can be used to tell which ligand is which,
because the poses are not in general returned in the same
order in which the ligands are sent to the server.
If not provided, a sequential number will be used. Note
that unique numbers should be used, because lignum is
used as a key by the server!
:type lignum: int or NoneType
"""
if lignum:
self._lignum = lignum
else:
self._lignum += 1
blob = st.writeToString(structure.MAESTRO).encode('utf-8')
msg = {'cmd': 'LIGS', 'ligs': [(self._lignum, blob)]}
self._sock.send_pyobj(msg)
@QtCore.pyqtSlot()
def _getMessages(self):
"""
Receive and process any messages from the ZMQ socket.
"""
while self._sock.poll(1):
msg = self._sock.recv_pyobj()
if msg['cmd'] == 'POSES':
poses = [st_from_blob(blob) for blob in msg['blobs']]
if poses:
self.posesDocked.emit(poses)
else:
self.noPosesDocked.emit()
if msg['queue_len'] == 0:
self.noMoreLigs.emit()
elif msg['cmd'] == 'INFO':
self.gotStatus.emit(msg['status'])
[docs] def shutdown_server(self):
self._sock.send_pyobj({'cmd': 'STOP'})
[docs] def cancel(self):
self._sock.send_pyobj({'cmd': 'CANCEL'})
[docs] def getStatus(self):
"""
Request a status update from the server. When the response comes, the
gotStatus signal is emitted with a dict containg status information.
"""
self._sock.send_pyobj({'cmd': 'STATUS'})
[docs] def disconnect(self):
"""
Disconnect the ZMQ socket and stop listening.
"""
self._notifier.setEnabled(False)
self._timer.stop()
self._sock.close()
[docs]class NonBlockingGlideServerManagerZmq(AbstractGlideServerManager,
QtCore.QObject):
"""
Launch and use Glide ZMQ server without blocking. The server and workers
are run on localhost, but future versions might relax this restriction.
This class is meant as a drop-in replacement for
NonBlockingGlideServerManager, but there are a couple of subtle differences
in behavior: 1) when docking a batch, all ligands are sent immediately to
the server, which keeps its own queue. This could be a problem when feeding
an infinite iterable to dockBatch(). 2) cancelBatch() causes any currently
docking ligands to be aborted, unlike NonBlockingGlideServerManager which
waits for the current ligand to complete.
:ivar posesDocked: Signal emitted when a ligand finishes docking. Emitted
with a list of pose structures.
:ivar noPosesDocked: Signal emitted when a ligand goes through the docking
workflow, but does not return any valid poses
:ivar errorOccurred: Signal emitted when there is an error communicating
with the server.
:ivar batchFinished: Signal emitted when a docking batch finishes. Emitted
with the number of ligands that were docked.
:ivar gotStatus: Signal emitted when the server sends a status message.
Emitted with a dict with status information. Current keys
include "elapsed_time", "ligs_done", "poses_stored",
"ligs_per_sec", "lps_per_worker", "active_workers",
"idle_workers", and "lost_workers".
"""
config_ext = '_zmq.json'
posesDocked = QtCore.pyqtSignal(list)
noPosesDocked = QtCore.pyqtSignal()
batchFinished = QtCore.pyqtSignal(int)
errorOccurred = QtCore.pyqtSignal(str)
gotStatus = QtCore.pyqtSignal(dict)
[docs] def __init__(self, *args, nworkers=2, **kwargs):
"""
Most arguments are passed through to GlideServerManager; see that class
for documentation. Arguments specific to this class:
:param nworkers: number of workers to launch.
:type nworkers: int
"""
super().__init__(*args, **kwargs)
self.nworkers = nworkers
self._client = None
self._backend_checker = NonBlockingBackendChecker()
self._backend_checker.setBackend(self)
self._resetBatch()
server_keywords = {**self.keywords, 'JOBNAME': self.jobname}
self.glide_job = glide.Dock(server_keywords)
[docs] def stop(self, *args, **kwargs):
self._backend_checker.stopChecking()
super().stop(*args, **kwargs)
if self._client is not None:
self._client.disconnect()
[docs] def dock(self, st):
client = self._getClient()
client.dock(st)
self._sent += 1
[docs] def dockBatch(self, structures, done_adding=True, reset=True):
"""
Set a batch of structures to dock.
:param structures: Structures to dock
:type structures: iterable[schrodinger.structure.Structure]
:param bool done_adding: Whether to emit batchFinished when all sts in
`structures` finish docking. Pass False if structures is a
generator that can have more structures added to it at runtime.
:param bool reset: Whether to reset the batch before adding. Pass
False to expand the current batch instead of starting a new one.
"""
if reset:
self._resetBatch()
for st in structures:
self.dock(st)
if done_adding:
self.setDoneAdding()
[docs] def cancelBatch(self):
if self.config:
self.client.cancel()
self._finishBatch()
[docs] def addStructures(self, structures):
"""
Add more structures to dock. Should call `setDoneAdding` once all
structures have been added.
:param structures: Structures to dock
:type structures: iterable[schrodinger.structure.Structure]
"""
self._done_adding = False
self.dockBatch(structures, done_adding=False, reset=False)
[docs] def setDoneAdding(self):
"""
Call this when done adding structures to a batch; once all structures
have been docked, `batchFinished` will be emitted.
"""
self._done_adding = True
if self._sent == self._received:
self._finishBatch()
@property
def client(self):
if not self.config:
raise ValueError('Glide server is not ready')
if self._client is None:
self._client = ZmqClient(self.config['url'])
self._client.posesDocked.connect(self._onPosesDocked)
self._client.noPosesDocked.connect(self._onNoPosesDocked)
self._client.noMoreLigs.connect(self._onNoMoreLigs)
self._client.gotStatus.connect(self.gotStatus)
return self._client
[docs] def getStatus(self):
client = self._getClient()
client.getStatus()
def _getClient(self):
self._backend_checker.wait()
if not self._backend_checker.ready:
exc = self._backend_checker.error or SERVER_NOT_READY_MESSAGE
self.errorOccurred.emit(str(exc))
self._finishBatch()
return self.client
def _finishBatch(self):
self.batchFinished.emit(self._received)
self._resetBatch()
def _resetBatch(self):
self._sent = 0
self._received = 0
self._done_adding = False
def _setupJob(self):
"""
Write the input file and return the command line arguments which are
used whether launching the driver under job control or not.
"""
infile = os.path.join(self.jobdir, self.jobname + '.in')
self.glide_job.writeSimplified(infile)
cmd = [
'-HOST', f'localhost:{self.nworkers}', '-NJOBS',
str(self.nworkers), '-server', '-mq', '-bind_local', infile
]
return cmd
[docs] def start(self, *args, **kwargs):
with fileutils.chdir(self.jobdir):
return super().start(*args, **kwargs)
def _startJC(self):
common_args = self._setupJob()
cmd = [GLIDE] + common_args
logger.debug("Launching: %s", ' '.join(cmd))
self.job = jobcontrol.launch_job(cmd)
def _startNoJC(self):
common_args = self._setupJob()
cmd = [RUN, '-FROM', 'glide', 'glide_driver.py'] + common_args
logger.debug("Launching: %s", ' '.join(cmd))
logfile = os.path.join(self.jobdir, self.jobname + '.log')
with open(logfile, 'w') as fh:
self.job = SubprocessJobAdapter(cmd, fh)
@QtCore.pyqtSlot(list)
def _onPosesDocked(self, poses):
self._received += 1
self.posesDocked.emit(poses)
@QtCore.pyqtSlot()
def _onNoPosesDocked(self):
self._received += 1
self.noPosesDocked.emit()
@QtCore.pyqtSlot()
def _onNoMoreLigs(self):
if self._done_adding:
self._finishBatch()
[docs]def st_from_blob(blob):
"""
Return a Structure from a compressed blob.
:param blob: compressed m2io representation of a structure
:type blob: bytes
:return: Structure
:rtype: schrodinger.structure.Structure
"""
s = zlib.decompress(blob).decode('utf-8')
return next(structure.StructureReader.fromString(s))
if __name__ == '__main__':
# for standalone testing, we'll read a ligand file, POST it to the server
# one ligand at a time, and write the poses we get back to an output file.
if len(sys.argv) != 5:
sys.exit("Usage: %s <host> <port> <ligfile> <output-posefile>" %
sys.argv[0])
host, port, infile, outfile = sys.argv[1:]
client = HTTPClient(host=host, port=port)
writer = structure.StructureWriter(outfile)
for ct in structure.StructureReader(infile):
print("docking ligand: %s" % ct.title)
poses = client.dock(ct)
for pose in poses:
gscore = 0.0
try:
gscore = pose.property['r_i_glide_gscore']
except KeyError:
pass
print(" writing pose; gscore=%f" % gscore)
writer.append(pose)