"""
Contains widgets that are useful in MatSci desmond panels.
Copyright Schrodinger, LLC. All rights reserved.
"""
import os
import shutil
from types import SimpleNamespace
import schrodinger
from schrodinger.application.desmond import cms
from schrodinger.application.matsci import appbase
from schrodinger.application.matsci import jobutils
from schrodinger.application.matsci import parserutils
from schrodinger.application.matsci import codeutils
from schrodinger.application.matsci import msprops
from schrodinger.application.matsci.desmondutils import cms_writer
from schrodinger.infra import mm
from schrodinger.project import utils as pt_utils
from schrodinger.infra import canvas2d
from schrodinger.ui.qt import swidgets
from schrodinger.ui.qt.appframework2 import af2
from schrodinger.utils import fileutils
traj = codeutils.get_safe_package('desmond.traj')
maestro = schrodinger.get_maestro()
[docs]def get_cms_and_trj_path(maestro_struct, source_path=None):
"""
Get trajectory and cms path for the passed structure in maestro
:param maestro_struct `schrodinger.structure.Structure`: Structure to get
associated cms and trajectory of
:param source_path str: Path to source cms and trj folder in case they do
not exist in the working folder. Structure source path will be used
in case it is not passed.
:return str,str: Path to the cms file trajectory frames
if both are found.
:raise ValueError: Raises error if the cms file or trajectory is not found
"""
# Set source path
if not source_path:
source_path = jobutils.get_source_path(maestro_struct)
# Get CMS path
original_cms_prop = mm.M2IO_DATA_ORIGINAL_CMS_FILE
entry_id = maestro_struct.property.get(msprops.ENTRY_ID_PROP)
from_workspace = entry_id is not None
# Get Project Table
if from_workspace:
prj_table = pt_utils.get_PT()
from_workspace &= prj_table is not None
# Get Row
if from_workspace:
try:
row = prj_table[entry_id]
from_workspace = True
except KeyError:
# Row not present
from_workspace = False
# Get cms and trj from row
if from_workspace:
cms_file = row.cms_file
trj_dir = row[msprops.TRAJECTORY_FILE_PROP]
from_workspace &= bool(cms_file) & bool(trj_dir)
# Load cms and trajectory file using structure property and its source path
if not from_workspace:
cms_file = maestro_struct.property.get(original_cms_prop)
trj_dir = maestro_struct.property.get(msprops.TRAJECTORY_FILE_PROP)
# Check if cms and trj properties were found
if cms_file is None:
raise ValueError(f'The "{original_cms_prop}" property is not '
'defined in the model system.')
if trj_dir is None:
raise ValueError(
f'The "{msprops.TRAJECTORY_FILE_PROP}" property is not '
'defined in the model system.')
if not os.path.exists(cms_file) and from_workspace:
# Attempt to find it in the original job (sub)directory rather than
# whether Maestro thinks it stored a copy of the cms file.
second_try = jobutils.get_file_path(row, original_cms_prop)
if second_try:
cms_file = second_try
if not os.path.exists(cms_file):
# Attempt to find cms path using source path
cms_file = os.path.basename(cms_file)
cms_file = os.path.join(source_path, cms_file)
if not os.path.exists(cms_file):
raise ValueError(f'The cms file {cms_file} does not exist.')
# Check trajectory path
if not os.path.exists(trj_dir):
trj_dir = os.path.basename(trj_dir)
trj_dir = os.path.join(source_path, trj_dir)
if not os.path.exists(trj_dir):
raise ValueError(f'The trajectory {trj_dir} does not exist.')
# Try loading trajectory path
try:
trj = traj.read_traj(trj_dir)
except Exception as msg_error:
msg = "Cannot load trajectory: %s" % msg_error
raise ValueError(msg)
return cms_file, trj_dir
[docs]class FrameSpinBox(swidgets.SSpinBox):
"""
A spin box to change and set the frames for the trajectory
"""
[docs] def __init__(self, layout, commands=None):
"""
Create a FrameSpinBox instance
:type layout: QLayout
:param layout: If supplied, the FrameSpinBox created will be added to this layout
:type commands: list
:param commands: The list of callbacks for the valueChanged signal.
"""
swidgets.SSpinBox.__init__(self, parent=None, layout=layout)
self.setMinimumWidth(80)
# Connecting signals to slots
if commands:
for command in commands:
self.valueChanged.connect(command)
[docs]class TrajRangeSelectorDialog(swidgets.SDialog):
"""
Trajectory range selector dialog with fixed labels, a spinbox for step, and no overlapping of
sliders
"""
DIALOG_SIZE = (490, 160)
[docs] def __init__(self, master):
"""
Create a TrajRangeSelectorDialog instance
"""
swidgets.SDialog.__init__(self, master, title='Trajectory Frames')
self.setMaximumSize(*self.DIALOG_SIZE)
self.setMinimumSize(*self.DIALOG_SIZE)
[docs] def layOut(self):
"""
Layout the widgets in the Dialog
"""
layout = self.mylayout
# Horizontal layout to add trajectory range selection widget
selection_layout = swidgets.SHBoxLayout(layout=layout)
# Slider to adjust the start and end limit of frames
self.slider = canvas2d.ChmDoubleSlider()
# SpinBox to set the start limit of frames
self.start_sb = FrameSpinBox(
layout=selection_layout,
commands=[self.slider.leftPos, self.changeLimitsEnd])
selection_layout.addWidget(self.slider)
self.slider.setRange(0.0, 100.0)
# SpinBox to set the end limit of frames
self.end_sb = FrameSpinBox(
layout=selection_layout,
commands=[self.slider.rightPos, self.changeLimitsStart])
self.step_sb = swidgets.SLabeledSpinBox('Step size:',
minimum=1,
value=1,
tip=parserutils.TRJ_STEP_HELP,
layout=selection_layout,
command=self.updateRangeLabel)
self.label = swidgets.SLabel('', layout=layout)
layout.addSpacing(25)
self.traj_range = SimpleNamespace(initial_min=0,
initial_max=100.0,
temp_min=0,
temp_max=100.0,
step_size=0.1)
# Connecting signals to the slots
self.slider.valuesChanged.connect(self.slidersMoved)
[docs] def updateRangeLabel(self):
"""
Function to change label when spinbox values are changed or slider is moved
"""
minT = self.start_sb.value() * self.traj_range.step_size
maxT = self.end_sb.value() * self.traj_range.step_size
self.time_range = "%.2f - %.2f ns" % (minT, maxT)
self.label.setText(self.time_range)
frames = list(map(str, self.getFrames()))
if len(frames) > 6:
frames = frames[:3] + ['... '] + frames[-3:]
interval = self.getStep() * self.traj_range.step_size
text = '\nFrames: ' + ', '.join(frames)
text += f'\n\nTime: {self.time_range} at every {interval:.2f} ns'
self.label.setText(text)
[docs] def slidersMoved(self):
"""
Sets values in spinboxes and label when the sliders are moved
"""
start_pos = round(self.slider.leftPos())
end_pos = round(self.slider.rightPos())
self.start_sb.setRange(self.traj_range.initial_min, end_pos)
self.start_sb.setValue(start_pos)
self.end_sb.setRange(start_pos, self.traj_range.initial_max)
self.end_sb.setValue(end_pos)
self.updateRangeLabel()
[docs] def reset(self):
"""
Reset the widgets
"""
self.step_sb.reset()
self.step_val = 1
[docs] def getStep(self):
"""
Get the trajectory step
:return int: The step
"""
return self.step_sb.value()
[docs] def getFrames(self):
"""
Get the frame numbers based on the range and step
:return list: list of frames numbers
"""
start_val = self.start_sb.value()
end_val = self.end_sb.value()
step = self.getStep()
frames = list(range(start_val, end_val, step)) + [end_val]
return frames
[docs] def accept(self):
"""
Save the step value in addition to what parent does
"""
self.setTempTrajRange()
self.step_val = self.getStep()
super().accept()
[docs] def reject(self):
"""
Restore step value in addition to what parent does
"""
self.setTempRangeToSlider(self.traj_range)
self.step_sb.setValue(self.step_val)
super().reject()
[docs] def setTrajLimits(self, min_f, max_f, step_size):
"""
Set the limits for trajectory range
:type min_f: float
:param min_f: minimum value of the trajectory frames
:type max_f: float
:param max_f: maximum value of the trajectory frames
:type step_size: float
:param step_size: step size of the frames
"""
self.step_sb.setMaximum(max_f)
self.slider.setRange(min_f, max_f)
self.traj_range = SimpleNamespace(initial_min=min_f,
initial_max=max_f,
temp_min=min_f,
temp_max=max_f,
step_size=step_size)
self.slidersMoved()
self.update()
[docs] def setTempRangeToSlider(self, traj_range):
"""
Set temporary trajectory range values to the range selection widgets
:type traj_range: SimpleNamespace
:param traj_range: set of values for the minimum and maximum number of frames
and the step size.
"""
self.slider.leftPos(traj_range.temp_min)
self.slider.rightPos(traj_range.temp_max)
self.slidersMoved()
[docs] def setTempTrajRange(self):
"""
Read values from widgets and store the temporary range value in traj_range property
"""
self.traj_range.temp_min = self.start_sb.value()
self.traj_range.temp_max = self.end_sb.value()
[docs] def changeLimitsEnd(self):
"""
Change limits of end time spin box to reflect the left slider position
"""
lpos = self.slider.leftPos()
self.end_sb.setRange(lpos, self.traj_range.initial_max)
self.updateRangeLabel()
[docs] def changeLimitsStart(self):
"""
Change limits of start time spin box to reflect the right slider position
"""
rpos = self.slider.rightPos()
self.start_sb.setRange(self.traj_range.initial_min, rpos)
self.updateRangeLabel()
[docs] def getRangeLabel(self):
"""
This function returns text string showing trajectory
range in ns.
:return: range string
:rtype: str
"""
return self.label.text()
[docs]class TrajRangeSelectorFrame(swidgets.SFrame):
"""
Frame that adds trajectory selection button and label. It is connected to
a dialog to select the trajectory range
"""
[docs] def __init__(self, layout):
"""
:param `QLayout` layout: layout to add the button to
"""
super().__init__(layout=layout)
trj_layout = swidgets.SHBoxLayout(layout=self.mylayout)
self.traj_btn = swidgets.SPushButton(
'Trajectory Frames...',
layout=trj_layout,
command=self.showTrajRangeSelectorDialog)
self.traj_label = swidgets.SLabel('', layout=trj_layout)
trj_layout.addStretch()
self.traj_dialog = TrajRangeSelectorDialog(self)
self.traj_dialog.dbb.accepted.connect(self.updateTrajectoryRangeLabel)
[docs] def updateTrajectoryRangeLabel(self):
"""
Update the trajectory range label with new range from traj dialog and
correct the text
"""
time_range = self.traj_dialog.time_range
num_frames = len(self.traj_dialog.getFrames())
self.traj_label.setText(f'{num_frames} frames, {time_range}')
[docs] def showTrajRangeSelectorDialog(self):
"""
Show traj dialog
"""
self.traj_dialog.exec_()
[docs] def updateTrj(self, trj_path):
"""
Load new trajectory range
:param str trj_path: path to the trajectory.
:return bool: True if trajectory range was loaded, else False
"""
try:
trj = traj.read_traj(trj_path)
except Exception:
return False
nframes = len(trj)
time_sec = [fr.time / 1000.0 for fr in trj]
self.traj_dialog.setTrajLimits(0, nframes - 1,
time_sec[1] - time_sec[0])
self.updateTrajectoryRangeLabel()
return True
[docs] def getFlags(self):
"""
Get flags for current selected trajectory range
"""
start_val, end_val = self.getRange()
flags = []
flags += [parserutils.FLAG_TRJ_MIN, str(start_val)]
flags += [parserutils.FLAG_TRJ_MAX, str(end_val)]
# Only add step if non-default
step = self.traj_dialog.getStep()
if step != 1:
flags += [parserutils.FLAG_TRJ_STEP, str(step)]
return flags
[docs] def getRange(self):
"""
Get current range of frames selected
:rtype: tuple(int)
:return: lower limit and upper limit of the selected range of frames
"""
start_val = self.traj_dialog.start_sb.value()
end_val = self.traj_dialog.end_sb.value()
return start_val, end_val
[docs] def setRange(self, start_val=None, end_val=None):
"""
Set the frame range (in number and not time) for trajectory selection
:type start_val: int
:param start_val: lower limit for frame in trajectory selection
:type end_val: int
:param end_val: upper limit for frame in trajectory selection
"""
# Set lower limit
if start_val is not None:
self.traj_dialog.start_sb.setValue(start_val)
# Set upper limit
if end_val is not None:
self.traj_dialog.end_sb.setValue(end_val)
# Update the dialog
self.traj_dialog.setTempTrajRange()
self.updateTrajectoryRangeLabel()
[docs] def setEnabled(self, state):
"""
Enable or disable the button and label for showing the traj dialog
:param bool state: True to enable the button, and False to disable
"""
self.traj_btn.setEnabled(state)
self.traj_label.setEnabled(state)
[docs] def reset(self):
"""
Reset the frame
"""
self.traj_dialog.reset()
self.traj_label.setText('')
[docs]class TrajAnalysisGuiMixin(appbase.BaseAnalysisGui):
"""
Class for extension of af2 to add widgets to gui for desmond trajectory
analysis.
"""
def _importTrjFile(self):
"""
Load the workspace file and get the trajectory path and cms file from
the structure.
:return `schrodinger.structure.Structure`: Structure that was imported
"""
self.resetPanel()
struct = self.getIncludedEntry()
if struct is None:
return
try:
cms_file, trj_dir = get_cms_and_trj_path(struct)
except ValueError as err:
self.error(str(err))
return
if not self.allow_gcmc:
cms_model = cms.Cms(cms_file)
if cms_model.is_for_gcmc:
self.error(
'The input Desmond model is for Grand-Canonical Monte '
'Carlo, which is not supported by this feature.')
return
result = self.trj_range_selector.updateTrj(trj_dir)
if not result:
return
self.cms_file = cms_file
self.trj_path = trj_dir
self.struct = struct
# Do any extra setup steps if necessary
if self.setup_method:
if self.setup_method() is False:
# Setup failed. Reset the panel to clear the changes.
self.resetPanel()
return
self.toggleStateMain(True)
return struct
[docs] def resetPanel(self):
"""
Reset the panel variables and widgets set by this mixin
:raise NotImplementedError: Will raise error if load button is not added
"""
try:
self.trj_range_selector
except AttributeError:
raise NotImplementedError('addLoadTrajButton not called.')
super().resetPanel()
self.cms_file = None
self.trj_path = None
self.struct = None
self.trj_range_selector.reset()
[docs] def toggleStateMain(self, state):
"""
Enable and disable trajectory traj button and label
:param bool state: True to enable the button, and False to disable
"""
self.trj_range_selector.setEnabled(state)
[docs] def getTrajFlags(self):
"""
Get the command line flags for cms file, trajectory path, and trajectory
range.
:return list: A list of command line flags and values
"""
flags = []
jobname = self.jobname()
# Get cms extension
cms_ext = fileutils.get_file_extension(self.cms_file)
cms_name = jobname + cms_ext
shutil.copy2(self.cms_file, cms_name)
# Get trj extension
trj_ext = traj.get_trajectory_extname(ref_fname=self.trj_path)
trj_name = jobname + trj_ext
copy_func = (shutil.copytree
if os.path.isdir(self.trj_path) else shutil.copy2)
copy_func(self.trj_path, trj_name)
# Associate trajectory to cms and write it
with cms_writer(cms_name) as model:
model.set_cts_property(msprops.TRAJECTORY_FILE_PROP, trj_name)
flags += [parserutils.FLAG_CMS_PATH, cms_name]
flags += [parserutils.FLAG_TRJ_PATH, trj_name]
flags += self.trj_range_selector.getFlags()
return flags
@af2.validator(-1000)
def validateCMS(self):
"""
Check if valid structure has been loaded.
:rtype: bool or (bool, str)
:return: The bool is True if file is loaded. False with message pop up
if file is not loaded.
"""
return self._validateCMS()
def _validateCMS(self):
"""
Check if valid structure has been loaded.
This is a separate method as jobtasks.preprocessor does not work with a
function that returns an af2 validation object. Also it can be used
independently of any decorator.
:rtype: bool or (bool, str)
:return: The bool is True if file is loaded. False with message pop up
if file is not loaded.
"""
if not hasattr(self, 'cms_file') or self.cms_file is None:
return (False, 'No structure loaded.')
return True