import os
import tempfile
import schrodinger
import schrodinger.protein._reliability as structure_reliability
from schrodinger import project
from schrodinger import structure
from schrodinger.infra import mm
from schrodinger.protein import reliability as prot_reliability
from schrodinger.Qt import QtCore
from schrodinger.Qt import QtGui
from schrodinger.Qt import QtWidgets
from schrodinger.Qt.QtCore import Qt
from schrodinger.structutils import analyze
from schrodinger.ui.qt import widgetmixins
from schrodinger.ui.qt.protein_health_viewer import ProteinHealthViewer
from schrodinger.ui.qt.standard.icons import icons
from schrodinger.utils import fileutils
from schrodinger.utils import subprocess
from . import constants
# Handle returned by schrodinger.get_maestro(). import_pt_entries() treats a
# falsy value as "nothing to import" (presumably the case when running
# outside of Maestro - confirm).
maestro = schrodinger.get_maestro()
# Structure property used by import_pt_entries() to tag each imported
# structure with the entry id of its Project Table row.
ENTRY_ID_KEY = 's_fepmapper_entryid'
NO_CACHE_STRING = 'No cache string'
# Flag appended to the FEP command when ao['relative_solvation'] is set; it is
# also carried over into the restart/extend scripts by generate_scripts().
VACUUM_OPTION = '-vacuum'
# Flags emitted by generate_ffbuilder_options().
FFBUILDER_ARG = '-ffbuilder'
FFBUILDER_HOST_ARG = '-ff-host'
# Keys read from the config dialog parameters by make_fep_cmd() when
# use_ffbuilder is True.
FFBUILDER_HOST_SETTING = 'ffb_host'
FFBUILDER_SUBJOBS_SETTING = 'max_ffb_subjobs'
def run_protein_reliability(protein_file):
    """
    Run the protein reliability report and wait for it to finish.

    WARNING: This function runs *very* slowly (i.e., easily around 20
    seconds), because it blocks until the child process has exited.

    :param protein_file: the protein to assess. The value is forwarded
        unchanged to `launch_protein_reliability`, which places it on the
        command line of the child process.
        NOTE(review): it is therefore presumably a file path rather than a
        `schrodinger.structure.Structure` - confirm against callers.

    :return: A tuple that contains a message indicating any problems with the
        protein and the model that can be used to bring up the protein
        reliability panel.
    :rtype: tuple
    """
    outputfile, process, reportfile = launch_protein_reliability(protein_file)
    # Only the error stream is needed; it is quoted in the failure message.
    std_err = process.communicate()[1]
    msg, model = read_protein_reliability(process, std_err, reportfile,
                                          outputfile)
    return msg, model
def read_protein_reliability(p, std_err, reportfile, outputfile):
    """
    Collect the results of a finished protein reliability process.

    The report and output files are always removed before returning, whether
    the process failed, the results were read successfully, or reading them
    raised an exception.

    :param p: the finished process; only its `returncode` attribute is read
    :param std_err: the error output of the process, quoted in the failure
        message
    :type std_err: str
    :param reportfile: path of the text report, one problem per line
    :type reportfile: str
    :param outputfile: path of the structure file written by the process
    :type outputfile: str

    :return: a `(message, model)` tuple. On failure the message describes the
        error and the model is None. On success the message lists the
        reported problems (empty string if there are none) and the model is a
        `ModelCheck` built from the output structure.
    :rtype: tuple
    """
    try:
        if p.returncode != 0:
            msg = ("ERROR: Failed to run protein reliability report.\n\n"
                   "Error output:\n%s" % std_err)
            return msg, None

        # generate message that will be used as a tooltip
        with open(reportfile, 'r') as fh:
            prot_probs = [line.strip() for line in fh]
        msg = ""
        if prot_probs:
            separator = '\n - '
            msg = ("Potential receptor issues:" + separator +
                   separator.join(prot_probs))

        # store model that will be used to show protein reliability report
        protein = structure.Structure.read(outputfile)
        model = structure_reliability.ModelCheck(protein, do_calc=False)
        return msg, model
    finally:
        # Clean up the temporary files on every path so that a failed run
        # does not leave them behind.
        fileutils.force_remove(reportfile, outputfile)
def launch_protein_reliability(protein_file):
    """
    Start the protein reliability script in a child process.

    The script is run in a separate process in order to prevent blocking of
    the GUI, see PANEL-2976.

    :param protein_file: the protein to assess; passed to the script as its
        first command line argument

    :return: `(outputfile, popen, reportfile)`: the path the structure output
        is written to, the process object, and the path of the text report.
        The caller is responsible for removing both files.
    :rtype: tuple
    """
    launcher = os.path.join(os.environ['SCHRODINGER'], 'run')
    script_path = prot_reliability.__file__
    # NOTE(review): tempfile.mktemp only generates names and is documented as
    # deprecated/racy; kept as-is because the child process creates the files.
    reportfile = tempfile.mktemp(suffix='.txt', prefix='prot_probs_')
    outputfile = tempfile.mktemp(suffix='.mae', prefix='prot_rel_')
    command = [launcher, script_path, protein_file, reportfile, outputfile]
    process = subprocess.Popen(command,
                               stdout=subprocess.PIPE,
                               stderr=subprocess.PIPE,
                               universal_newlines=True)
    return outputfile, process, reportfile
def is_prepped(protein):
    """
    Report whether a protein carries the 'b_ppw_prepared' property, i.e.
    whether it has been prepared in Protein PrepWizard.

    :param protein: protein to check
    :type protein: `schrodinger.structure.Structure`

    :return: the value of the 'b_ppw_prepared' property, or False if the
        property is not set
    """
    properties = protein.property
    return properties.get('b_ppw_prepared', False)
class ProteinProcessManager(object):
    """
    Lightweight manager that spawns the protein reliability process so that
    it runs in the background, and collects its results once it is done.

    This is not a thread: the work happens in a child process and the owner
    is expected to call `poll` periodically until it reports completion.
    """

    def __init__(self, protein_file):
        """
        :param protein_file: the protein to assess; forwarded to
            `launch_protein_reliability` when `start` is called
        """
        self.protein_file = protein_file
        self.message = ''
        self.model = None
        self.popen = None
        # Paths of the temporary files of the child process. Initialized here
        # so that terminate() is safe to call before start().
        self.outputfile = None
        self.reportfile = None
        self._is_running = False

    def start(self):
        """
        Launch the protein reliability process.
        """
        outputfile, popen, reportfile = launch_protein_reliability(
            self.protein_file)
        self.outputfile, self.reportfile = outputfile, reportfile
        self.popen = popen
        self._is_running = True

    def poll(self):
        """
        Check whether the process has finished and, if it just did, read its
        results into `message` and `model`.

        The process is only examined while it is marked as running, so calls
        made before `start`, after completion or after `terminate` simply
        report that nothing is running instead of re-reading result files
        that have already been removed.

        :return: True if no process is running (any more), False otherwise
        :rtype: bool
        """
        if self._is_running and self.popen.poll() is not None:
            _, std_err = self.popen.communicate()
            self.message, self.model = read_protein_reliability(
                self.popen, std_err, self.reportfile, self.outputfile)
            self._is_running = False
        return not self._is_running

    def terminate(self):
        """
        Stop the process, if one was started, and remove its temporary files.
        """
        if self.popen is not None:
            # subprocess needs to be terminated or it will be stranded
            self.popen.terminate()
        if self.outputfile and self.reportfile:
            fileutils.force_remove(self.reportfile, self.outputfile)
        self._is_running = False

    def isRunning(self):
        """
        :return: whether a started process has not yet been collected by
            `poll` or stopped by `terminate`
        :rtype: bool
        """
        return self._is_running

    def results(self):
        """
        :return: the `(message, model)` pair stored by the last successful
            `poll`; `('', None)` if no results have been read yet
        :rtype: tuple
        """
        return self.message, self.model
class LabelSpinner(QtWidgets.QLabel):
    """
    A label that can replace its icon with a spinning progress icon.
    """
    # Frame size used when the label has no pixmap to take the height from.
    DEFAULT_ICON_HEIGHT = 10
    # Interval handed to the QTimer that advances the animation
    # (QTimer intervals are in milliseconds).
    ANIMATION_TIME = 250
    # Emitted on every timer tick; connected to `updateAnimation`.
    animationTimeElapsed = QtCore.pyqtSignal()

    def __init__(self, text=None, parent=None):
        """
        :param text: the label text, passed on to QLabel
        :param parent: the parent widget, passed on to QLabel
        """
        super().__init__(text, parent)
        # Caches the current pixmap and, as part of that, builds the
        # animation frames; no separate updateAnimationSize() call is needed.
        self.updatePermanentPic()
        self.current_num = 0
        self.timer = QtCore.QTimer()
        self.timer.setInterval(self.ANIMATION_TIME)
        self.timer.timeout.connect(self.animationTimeElapsed)
        self.animationTimeElapsed.connect(self.updateAnimation)

    def updateAnimationSize(self):
        """
        Set the size of the spinner animation to the height of the current
        permanent icon, or else some default value if it is not available.
        """
        if self.permanent_pic:
            height = self.permanent_pic.height()
        else:
            height = self.DEFAULT_ICON_HEIGHT
        # Generate the animation frames (resources 1.png through 8.png). The
        # local is deliberately not called `icons`, to avoid shadowing the
        # module of the same name imported at the top of the file.
        path_template = ":/images/toolbuttons/jobrunning/{0}.png"
        frame_icons = [
            QtGui.QIcon(path_template.format(num)) for num in range(1, 9)
        ]
        self.pics = [icon.pixmap(height, height) for icon in frame_icons]

    def updatePermanentPic(self):
        """
        Set the permanent icon picture to the current pixmap value and update
        the size of the spinner animation to match.
        """
        if self.pixmap():
            self.permanent_pic = self.pixmap().copy()
        else:
            self.permanent_pic = None
        self.updateAnimationSize()

    def setPixmap(self, pixmap):
        """
        Set a new pixmap, and update the spinner animation frames in response.
        The `setTemporaryPixmap()` method should be called internally when
        setting a temporary pixmap for the spinner animation frames and other
        temporary icon images.

        :param pixmap: the icon to be displayed by this label
        :type pixmap: QtGui.QPixmap
        """
        super().setPixmap(pixmap)
        self.updatePermanentPic()

    def setTemporaryPixmap(self, pixmap):
        """
        Set the displayed pixmap without overwriting the "permanent" cached
        pixmap value or altering the size of the animation frames. Meant to be
        used to display animation frames or other temporary icon images.

        :param pixmap: the icon to be displayed by this label
        :type pixmap: QtGui.QPixmap
        """
        super().setPixmap(pixmap)

    def updateAnimation(self):
        """
        Advance the animation by one frame. This method gets called
        periodically while the spinner is running.
        """
        pixmap = self.pics[self.current_num]
        # Wrap around to the first frame after the last one.
        self.current_num = (self.current_num + 1) % len(self.pics)
        self.setTemporaryPixmap(pixmap)

    def start(self):
        """
        Start the spinner animation.
        """
        self.current_num = 0
        # Show the first frame immediately rather than after one interval.
        self.updateAnimation()
        self.timer.start()

    def stop(self):
        """
        Stop the spinner animation and restore the original icon, if there
        was one.
        """
        self.timer.stop()
        if self.permanent_pic:
            self.setTemporaryPixmap(self.permanent_pic)
def truncate_label(label, label_string, max_chars=40):
    """
    Show label_string on a label, shortened to max_chars characters with a
    trailing "..." when it is too long. When the text had to be shortened,
    the full text is stored as the label's tooltip; otherwise the tooltip is
    left untouched.

    :param label: the label to modify
    :type label: QtWidgets.QLabel

    :param label_string: the text for the label
    :type label_string: str

    :param max_chars: maximum number of characters in the label text
    :type max_chars: int

    :return: the tooltip text the label had before this call
    :rtype: str
    """
    previous_tooltip = label.toolTip()
    too_long = len(label_string) > max_chars
    if too_long:
        # Reserve three characters for the ellipsis.
        display_text = label_string[:max_chars - 3] + "..."
    else:
        display_text = label_string
    label.setText(display_text)
    # An empty full text never replaces the existing tooltip.
    if too_long and label_string:
        label.setToolTip(label_string)
    return previous_tooltip
def get_protein_label_string(ct):
    """
    Build the string used in the GUI to identify a protein: its title if it
    has a non-blank one, otherwise a short summary of its chain, residue and
    atom counts. A false value (e.g. None) yields "--".

    :param ct: the protein
    :type ct: `schrodinger.structure.Structure`

    :return: the text to display
    :rtype: str
    """
    if not ct:
        return '--'
    if ct.title.strip():
        return ct.title

    # No usable title: describe the structure by its size instead.
    n_chains = len(ct.chain)
    n_residues = len(ct.residue)
    chain_suffix = "s" if n_chains > 1 else ""
    residue_suffix = "s" if n_residues > 1 else ""
    parts = [
        f"{n_chains} chain{chain_suffix}",
        f"{n_residues} residue{residue_suffix}",
        f"{len(ct.atom)} atoms",
    ]
    return "; ".join(parts)
def get_proteins(ct_list):
    """
    Filter a list of structures down to the proteins, i.e. the structures in
    which the "protein" ASL expression matches at least one atom.

    :param ct_list: a list of structures
    :type ct_list: list

    :return: the structures that contain protein atoms, in input order
    :rtype: list
    """
    return [ct for ct in ct_list if analyze.evaluate_asl(ct, "protein")]
def get_ligands(ct_list):
    """
    Filter a list of structures down to the ligands.

    A structure is kept when the "ligand" ASL expression matches at least one
    of its atoms and it has fewer than 50 non-hydrogen protein atoms, so that
    small peptidic ligands are still accepted while full proteins are not.

    :param ct_list: a list of structures
    :type ct_list: list

    :return: the structures considered ligands, in input order
    :rtype: list
    """

    def peptidic_ligand(ligand):
        # Evaluate the structure that was passed in rather than relying on
        # the enclosing loop variable.
        n_protein_atoms = len(
            analyze.evaluate_asl(ligand, "protein and not a.e H"))
        return n_protein_atoms < 50

    return [
        ct for ct in ct_list
        if analyze.evaluate_asl(ct, "ligand") and peptidic_ligand(ct)
    ]
def import_pt_entries():
    """
    Imports selected entries from the project table. For convenience,
    workspace-included proteins are also imported.

    An empty list is returned when Maestro is not available, when the
    project table cannot be obtained, or when no rows are selected.

    :return: a list of imported structures. Each structure is tagged with a
        property 's_fepmapper_entryid' to store the entry id.
    :rtype: list
    """
    if not maestro:
        return []
    try:
        pt = maestro.project_table_get()
    except project.ProjectException:
        return []
    if not pt.selected_rows:
        return []

    ct_list = []
    # A set keeps the duplicate check cheap for large selections.
    imported_entry_ids = set()

    # Only proteins are imported via inclusion; ligands must be selected
    for row in pt.included_rows:
        ct = row.getStructure()
        if analyze.evaluate_asl(ct, "protein"):
            ct.property[ENTRY_ID_KEY] = row.entry_id
            ct_list.append(ct)
            imported_entry_ids.add(row.entry_id)

    for row in pt.selected_rows:
        if row.entry_id in imported_entry_ids:
            # Don't double-import workspace-included proteins; skip before
            # fetching the structure to avoid needless work.
            continue
        ct = row.getStructure()
        ct.property[ENTRY_ID_KEY] = row.entry_id
        ct_list.append(ct)
        imported_entry_ids.add(row.entry_id)
    return ct_list
def import_pv_file(filename):
    """
    Read every structure from a structure file into a list. Typically used
    on PV files, but can be used on any structure file.

    :param filename: the filename
    :type filename: str

    :return: the structures in the file, in file order
    :rtype: list
    """
    reader = structure.StructureReader(filename)
    return list(reader)
def get_opls_dir_cmd(opls_dir):
    """
    Build the command line arguments that select the given OPLS directory.

    The directory is first converted with `mm.get_archive_path`.

    :param opls_dir: OPLS directory path
    :type opls_dir: str

    :return: a command list for the OPLS directory
    :rtype: list[str]
    """
    return ['-OPLSDIR', mm.get_archive_path(opls_dir)]
def get_restart_opls_dir(jobname):
    """
    Return the OPLS directory name used by the restart/extend scripts of a
    job, which is the job name followed by '-out.opls'.

    :param jobname: Jobname
    :type jobname: str

    :return: the OPLS directory name for the job (a single string, not a
        command list)
    :rtype: str
    """
    return '{}-out.opls'.format(jobname)
def make_fep_cmd(cd_params,
                 ao,
                 jobname,
                 struct_fname,
                 opt=None,
                 opls_dir=None,
                 use_ffbuilder=False):
    """
    Generates an FEP command list based on the specified parameters.

    As a side effect the start, restart and extend scripts for the job are
    written to the current directory via `generate_scripts`.

    :param cd_params: config dialog parameters. The keys 'host',
        'subjob_host' and 'cpus' are required; 'maxjobs' is optional. When
        use_ffbuilder is True the 'ffb_host' and 'max_ffb_subjobs' keys are
        required as well.
    :type cd_params: dict

    :param ao: FEP Advanced Options (AO) parameters. The value of 'sim_time'
        is multiplied by 1000 before being passed as '-time' (presumably a
        ns to ps conversion - confirm). Options whose value equals the
        hard-coded default (time 5000.0, seed 2014, buffer 5.0, custom charge
        mode 'assign') are left off the command line.
    :type ao: dict

    :param jobname: the jobname
    :type jobname: str

    :param struct_fname: the filename for the input structure
    :type struct_fname: str

    :param opt: additional arguments appended to the end of the command;
        None (the default) adds nothing
    :type opt: list or None

    :param opls_dir: OPLS directory path
    :type opls_dir: str or None

    :param use_ffbuilder: whether the FFBuilder arguments should be added to
        the command (and therefore to the written start script)
    :type use_ffbuilder: bool

    :return: a command list for launching the job
    :rtype: list
    """
    cmd = [
        f'{os.environ["SCHRODINGER"]}/fep_plus', '-HOST', cd_params['host'],
        '-SUBHOST', cd_params['subjob_host'], '-ppj',
        str(cd_params['cpus'])
    ]
    if use_ffbuilder:
        ffb_host = cd_params[FFBUILDER_HOST_SETTING]
        ffb_subjobs = cd_params[FFBUILDER_SUBJOBS_SETTING]
        cmd += generate_ffbuilder_options(ffb_host, ffb_subjobs)
    maxjob = cd_params.get('maxjobs')
    if maxjob:
        cmd += ['-maxjob', str(maxjob)]

    sim_time = ao['sim_time'] * 1000
    if sim_time != 5000.0:
        cmd += ['-time', str(sim_time)]
    cmd += ['-ensemble', ao['ensemble']]
    if ao['random_seed'] != 2014:
        cmd += ['-seed', str(ao['random_seed'])]
    if ao['buffer_size'] != 5.0:
        cmd += ['-buffer', str(ao['buffer_size'])]
    if ao.get('add_salt', False):
        cmd += ['-salt', str(ao['salt_molarity'])]
    if ao['relative_solvation']:
        cmd.append(VACUUM_OPTION)
    if ao['membrane_equilibration']:
        cmd.append('-membrane')
    if ao.get('modify_dihe', False):
        cmd.append('-modify_dihe')
    custom_charge_mode = ao.get('custom_charge', 'assign')
    if custom_charge_mode != 'assign':
        cmd += ['-custom-charge-mode', custom_charge_mode]
    cmd += ['-lambda_windows', ao[constants.ALIAS_FEP_NUM_LW_DEFAULT]]
    cmd += ['-core_hopping_lambda_windows', ao[constants.ALIAS_FEP_NUM_LW_CORE]]
    cmd += ['-charged_lambda_windows', ao[constants.ALIAS_FEP_NUM_LW_CHARGE]]

    opls_dir_setting = []
    if opls_dir:
        opls_dir_setting = get_opls_dir_cmd(opls_dir)
        cmd += opls_dir_setting
    cmd += [
        '-JOBNAME',
        jobname,
        struct_fname,
    ]
    if opt is not None:
        cmd += opt

    # The scripts are derived from the finished command, so this has to come
    # last.
    generate_scripts(cd_params,
                     jobname,
                     cmd,
                     opls_dir_setting,
                     use_ffbuilder=use_ffbuilder)
    return cmd
def write_script(fname, cmd):
    """
    Write a command to a script file as a single command line and mark the
    file as executable for user, group and others.

    :param fname: the filename of the script file
    :type fname: str

    :param cmd: the list of commands
    :type cmd: list of str
    """
    with open(fname, 'w') as script:
        command_line = subprocess.list2cmdline(cmd)
        script.write(command_line + '\n')
    # Add the execute bits on top of whatever permissions the file got.
    current_mode = os.stat(fname).st_mode
    os.chmod(fname, current_mode | 0o111)
def generate_scripts(cd_params,
                     jobname,
                     cmd,
                     opls_dir_setting,
                     use_ffbuilder=False):
    """
    Write the start, restart and extend scripts for a job into the current
    directory as '<jobname>.sh', 'restart_<jobname>.sh' and
    'extend_<jobname>.sh'.

    :param cd_params: the configuration dialog parameters; the 'host' and
        'subjob_host' keys are read
    :type cd_params: dict

    :param jobname: the job name
    :type jobname: str

    :param cmd: the command list; its first element is replaced by
        "$SCHRODINGER/fep_plus" in the start script
    :type cmd: list of str

    :param opls_dir_setting: the opls_dir_setting; when non-empty, the
        restart/extend scripts point -OPLSDIR at the job's output OPLS
        directory
    :type opls_dir_setting: list of str

    :param use_ffbuilder: whether ffbuilder arguments should be used; also
        makes the restart/extend scripts pass -OPLSDIR
    :type use_ffbuilder: bool
    """
    # start script: the full command, with the executable expressed through
    # the environment variable instead of an expanded path
    start_cmd = list(cmd)
    start_cmd[0] = "$SCHRODINGER/fep_plus"
    write_script(f"{jobname}.sh", start_cmd)

    # arguments shared by the restart and extend scripts
    shared_cmd = [
        "$SCHRODINGER/fep_plus", "-HOST", cd_params['host'], "-SUBHOST",
        cd_params['subjob_host'], '-JOBNAME', jobname, "-checkpoint",
        jobname + "-multisim_checkpoint"
    ]
    if opls_dir_setting or use_ffbuilder:
        shared_cmd += ['-OPLSDIR', get_restart_opls_dir(jobname)]
    if VACUUM_OPTION in cmd:
        shared_cmd.append(VACUUM_OPTION)

    # restart
    write_script(f"restart_{jobname}.sh", shared_cmd + ["-RESTART"])

    # extend
    extend_args = ["-extend", jobname + ".edge", "-time", "5000.0"]
    write_script(f"extend_{jobname}.sh", shared_cmd + extend_args)
def generate_ffbuilder_options(ffb_host, ffb_subjobs=0):
    """
    Build the command line arguments that enable FFBuilder in the FEP
    executable.

    :param ffb_host: the host to run FFBuilder on
    :type ffb_host: str

    :param ffb_subjobs: the number of FFBuilder subjobs; when non-zero it is
        appended to the host as 'host:N'
    :type ffb_subjobs: int

    :return: the FFBuilder flag followed by the host flag and its value
    :rtype: list[str]
    """
    host_spec = ffb_host
    if ffb_subjobs:
        host_spec = host_spec + ':' + str(ffb_subjobs)
    return [FFBUILDER_ARG, FFBUILDER_HOST_ARG, host_spec]