Source code for schrodinger.application.matsci.deswidgets

"""
Contains widgets that are useful in MatSci desmond panels.

Copyright Schrodinger, LLC. All rights reserved.
"""

import os
import shutil
from types import SimpleNamespace

import schrodinger
from schrodinger.application.desmond import cms
from schrodinger.application.matsci import appbase
from schrodinger.application.matsci import jobutils
from schrodinger.application.matsci import parserutils
from schrodinger.application.matsci import codeutils
from schrodinger.application.matsci import msprops
from schrodinger.application.matsci.desmondutils import cms_writer
from schrodinger.infra import mm
from schrodinger.project import utils as pt_utils
from schrodinger.infra import canvas2d
from schrodinger.ui.qt import swidgets
from schrodinger.ui.qt.appframework2 import af2
from schrodinger.utils import fileutils

traj = codeutils.get_safe_package('desmond.traj')

maestro = schrodinger.get_maestro()


def get_cms_and_trj_path(maestro_struct, source_path=None):
    """
    Get trajectory and cms path for the passed structure in maestro

    The project table row of the structure (when the structure has an entry
    id and a project table is available) is consulted first. If the row does
    not provide both values, the structure's own properties are used. Paths
    that do not exist are then looked up by base name inside the source path.

    :param maestro_struct `schrodinger.structure.Structure`: Structure to get
        associated cms and trajectory of

    :param source_path str: Path to source cms and trj folder in case they do
        not exist in the working folder. Structure source path will be used in
        case it is not passed.

    :return str,str: Path to the cms file and path to the trajectory, if both
        are found.

    :raise ValueError: Raises error if the cms file or trajectory is not
        defined, does not exist, or if the trajectory cannot be read
    """
    # Set source path
    if not source_path:
        source_path = jobutils.get_source_path(maestro_struct)

    # Get CMS path
    original_cms_prop = mm.M2IO_DATA_ORIGINAL_CMS_FILE
    entry_id = maestro_struct.property.get(msprops.ENTRY_ID_PROP)
    from_workspace = entry_id is not None

    # Get Project Table
    if from_workspace:
        prj_table = pt_utils.get_PT()
        from_workspace = prj_table is not None

    # Get Row
    if from_workspace:
        try:
            row = prj_table[entry_id]
        except KeyError:
            # Row not present
            from_workspace = False

    # Get cms and trj from row
    if from_workspace:
        cms_file = row.cms_file
        trj_dir = row[msprops.TRAJECTORY_FILE_PROP]
        from_workspace = bool(cms_file) and bool(trj_dir)

    # Load cms and trajectory file using structure property and its source path
    if not from_workspace:
        cms_file = maestro_struct.property.get(original_cms_prop)
        trj_dir = maestro_struct.property.get(msprops.TRAJECTORY_FILE_PROP)

    # Check if cms and trj properties were found. Empty values are rejected
    # as well as None: an empty string would otherwise be joined to the source
    # path below and resolve to the source directory itself.
    if not cms_file:
        raise ValueError(f'The "{original_cms_prop}" property is not '
                         'defined in the model system.')
    if not trj_dir:
        raise ValueError(
            f'The "{msprops.TRAJECTORY_FILE_PROP}" property is not '
            'defined in the model system.')

    if not os.path.exists(cms_file) and from_workspace:
        # The row's cms path is not on disk, so ask jobutils to resolve the
        # original cms property for this row instead.
        second_try = jobutils.get_file_path(row, original_cms_prop)
        if second_try:
            cms_file = second_try

    if not os.path.exists(cms_file):
        # Attempt to find cms path using source path
        cms_file = os.path.join(source_path, os.path.basename(cms_file))
        if not os.path.exists(cms_file):
            raise ValueError(f'The cms file {cms_file} does not exist.')

    # Check trajectory path
    if not os.path.exists(trj_dir):
        trj_dir = os.path.join(source_path, os.path.basename(trj_dir))
        if not os.path.exists(trj_dir):
            raise ValueError(f'The trajectory {trj_dir} does not exist.')

    # Try loading the trajectory; only readability matters here, so the
    # frames themselves are discarded.
    try:
        traj.read_traj(trj_dir)
    except Exception as msg_error:
        msg = "Cannot load trajectory: %s" % msg_error
        raise ValueError(msg) from msg_error

    return cms_file, trj_dir
class FrameSpinBox(swidgets.SSpinBox):
    """
    Spin box used to change and set a trajectory frame number
    """

    def __init__(self, layout, commands=None):
        """
        Create a FrameSpinBox instance

        :type layout: QLayout
        :param layout: Layout the new spin box is handed to on construction

        :type commands: list
        :param commands: Callbacks to connect to the valueChanged signal. No
            connection is made when this is None or empty.
        """
        swidgets.SSpinBox.__init__(self, parent=None, layout=layout)
        self.setMinimumWidth(80)

        # Hook each supplied callback up to the valueChanged signal
        for callback in commands or ():
            self.valueChanged.connect(callback)
class TrajRangeSelectorDialog(swidgets.SDialog):
    """
    Trajectory range selector dialog with fixed labels, a spinbox for step, and
    no overlapping of sliders
    """

    # Fixed (width, height) used as both the minimum and maximum dialog size
    DIALOG_SIZE = (490, 160)

    def __init__(self, master):
        """
        Create a TrajRangeSelectorDialog instance

        :param master: The parent passed on to the `swidgets.SDialog`
            constructor
        """
        swidgets.SDialog.__init__(self, master, title='Trajectory Frames')
        self.setMaximumSize(*self.DIALOG_SIZE)
        self.setMinimumSize(*self.DIALOG_SIZE)

    def layOut(self):
        """
        Layout the widgets in the Dialog
        """
        # Defaults for state that is otherwise first assigned in reset(),
        # accept() and updateRangeLabel(); setting them here keeps reject()
        # and readers of time_range from failing on a missing attribute.
        self.step_val = 1
        self.time_range = ''

        layout = self.mylayout
        # Horizontal layout to add trajectory range selection widget
        selection_layout = swidgets.SHBoxLayout(layout=layout)

        # Slider to adjust the start and end limit of frames
        self.slider = canvas2d.ChmDoubleSlider()

        # SpinBox to set the start limit of frames. Its value is pushed to
        # the left slider handle and then used to re-limit the end spin box.
        self.start_sb = FrameSpinBox(
            layout=selection_layout,
            commands=[self.slider.leftPos, self.changeLimitsEnd])
        selection_layout.addWidget(self.slider)
        self.slider.setRange(0.0, 100.0)

        # SpinBox to set the end limit of frames
        self.end_sb = FrameSpinBox(
            layout=selection_layout,
            commands=[self.slider.rightPos, self.changeLimitsStart])

        self.step_sb = swidgets.SLabeledSpinBox('Step size:',
                                                minimum=1,
                                                value=1,
                                                tip=parserutils.TRJ_STEP_HELP,
                                                layout=selection_layout,
                                                command=self.updateRangeLabel)
        self.label = swidgets.SLabel('', layout=layout)
        layout.addSpacing(25)
        self.traj_range = SimpleNamespace(initial_min=0,
                                          initial_max=100.0,
                                          temp_min=0,
                                          temp_max=100.0,
                                          step_size=0.1)

        # Connecting signals to the slots
        self.slider.valuesChanged.connect(self.slidersMoved)

    def updateRangeLabel(self):
        """
        Function to change label when spinbox values are changed or slider is
        moved. Also refreshes the `time_range` attribute.
        """
        step_size = self.traj_range.step_size
        minT = self.start_sb.value() * step_size
        maxT = self.end_sb.value() * step_size
        self.time_range = "%.2f - %.2f ns" % (minT, maxT)

        # Show at most the first three and last three frame numbers
        frames = list(map(str, self.getFrames()))
        if len(frames) > 6:
            frames = frames[:3] + ['... '] + frames[-3:]
        interval = self.getStep() * step_size
        text = '\nFrames: ' + ', '.join(frames)
        text += f'\n\nTime: {self.time_range} at every {interval:.2f} ns'
        self.label.setText(text)

    def slidersMoved(self):
        """
        Sets values in spinboxes and label when the sliders are moved
        """
        start_pos = round(self.slider.leftPos())
        end_pos = round(self.slider.rightPos())
        # Each spin box is limited by the other handle so they cannot cross
        self.start_sb.setRange(self.traj_range.initial_min, end_pos)
        self.start_sb.setValue(start_pos)
        self.end_sb.setRange(start_pos, self.traj_range.initial_max)
        self.end_sb.setValue(end_pos)
        self.updateRangeLabel()

    def reset(self):
        """
        Reset the widgets
        """
        self.step_sb.reset()
        self.step_val = 1

    def getStep(self):
        """
        Get the trajectory step

        :return int: The step
        """
        return self.step_sb.value()

    def getFrames(self):
        """
        Get the frame numbers based on the range and step

        :return list: list of frames numbers. The end frame is always appended
            as the last item, whether or not the step lands on it.
        """
        start_val = self.start_sb.value()
        end_val = self.end_sb.value()
        step = self.getStep()
        frames = list(range(start_val, end_val, step)) + [end_val]
        return frames

    def accept(self):
        """
        Save the step value in addition to what parent does
        """
        self.setTempTrajRange()
        self.step_val = self.getStep()
        super().accept()

    def reject(self):
        """
        Restore step value in addition to what parent does
        """
        self.setTempRangeToSlider(self.traj_range)
        self.step_sb.setValue(self.step_val)
        super().reject()

    def setTrajLimits(self, min_f, max_f, step_size):
        """
        Set the limits for trajectory range

        :type min_f: float
        :param min_f: minimum value of the trajectory frames

        :type max_f: float
        :param max_f: maximum value of the trajectory frames

        :type step_size: float
        :param step_size: step size of the frames
        """
        # Never let the step maximum drop below 1: a maximum of 0 would drag
        # the step value to 0, and getFrames() cannot build a range with a
        # zero step.
        self.step_sb.setMaximum(max(1, max_f))
        self.slider.setRange(min_f, max_f)
        self.traj_range = SimpleNamespace(initial_min=min_f,
                                          initial_max=max_f,
                                          temp_min=min_f,
                                          temp_max=max_f,
                                          step_size=step_size)
        self.slidersMoved()
        self.update()

    def setTempRangeToSlider(self, traj_range):
        """
        Set temporary trajectory range values to the range selection widgets

        :type traj_range: SimpleNamespace
        :param traj_range: set of values for the minimum and maximum number of
            frames and the step size.
        """
        self.slider.leftPos(traj_range.temp_min)
        self.slider.rightPos(traj_range.temp_max)
        self.slidersMoved()

    def setTempTrajRange(self):
        """
        Read values from widgets and store the temporary range value in
        traj_range property
        """
        self.traj_range.temp_min = self.start_sb.value()
        self.traj_range.temp_max = self.end_sb.value()

    def changeLimitsEnd(self):
        """
        Change limits of end time spin box to reflect the left slider position
        """
        lpos = self.slider.leftPos()
        self.end_sb.setRange(lpos, self.traj_range.initial_max)
        self.updateRangeLabel()

    def changeLimitsStart(self):
        """
        Change limits of start time spin box to reflect the right slider
        position
        """
        rpos = self.slider.rightPos()
        self.start_sb.setRange(self.traj_range.initial_min, rpos)
        self.updateRangeLabel()

    def getRangeLabel(self):
        """
        This function returns the text currently shown in the dialog label,
        which lists the selected frames and the time range in ns.

        :return: range string
        :rtype: str
        """
        return self.label.text()
class TrajRangeSelectorFrame(swidgets.SFrame):
    """
    Frame that adds trajectory selection button and label. It is connected to
    a dialog to select the trajectory range
    """

    def __init__(self, layout):
        """
        :param `QLayout` layout: layout to add the button to
        """
        super().__init__(layout=layout)
        trj_layout = swidgets.SHBoxLayout(layout=self.mylayout)
        self.traj_btn = swidgets.SPushButton(
            'Trajectory Frames...',
            layout=trj_layout,
            command=self.showTrajRangeSelectorDialog)
        self.traj_label = swidgets.SLabel('', layout=trj_layout)
        trj_layout.addStretch()
        self.traj_dialog = TrajRangeSelectorDialog(self)
        # Refresh the summary label whenever the dialog is accepted
        self.traj_dialog.dbb.accepted.connect(self.updateTrajectoryRangeLabel)

    def updateTrajectoryRangeLabel(self):
        """
        Update the trajectory range label with new range from traj dialog and
        correct the text
        """
        time_range = self.traj_dialog.time_range
        num_frames = len(self.traj_dialog.getFrames())
        self.traj_label.setText(f'{num_frames} frames, {time_range}')

    def showTrajRangeSelectorDialog(self):
        """
        Show traj dialog
        """
        self.traj_dialog.exec_()

    def updateTrj(self, trj_path):
        """
        Load new trajectory range

        :param str trj_path: path to the trajectory.

        :return bool: True if trajectory range was loaded, else False. False
            is returned when the trajectory cannot be read or when it has
            fewer than two frames, since the time between frames is taken
            from the first two.
        """
        try:
            trj = traj.read_traj(trj_path)
        except Exception:
            return False

        nframes = len(trj)
        if nframes < 2:
            # The frame interval below needs both the first and second frame
            return False

        # Interval between the first two frames, scaled by 1/1000. The dialog
        # labels this value as ns, so frame times are presumably in ps.
        frame_interval = trj[1].time / 1000.0 - trj[0].time / 1000.0
        self.traj_dialog.setTrajLimits(0, nframes - 1, frame_interval)
        self.updateTrajectoryRangeLabel()
        return True

    def getFlags(self):
        """
        Get flags for current selected trajectory range

        :return list: Command line flags and values for the minimum frame,
            maximum frame and, when it is not 1, the step
        """
        start_val, end_val = self.getRange()
        flags = []
        flags += [parserutils.FLAG_TRJ_MIN, str(start_val)]
        flags += [parserutils.FLAG_TRJ_MAX, str(end_val)]
        # Only add step if non-default
        step = self.traj_dialog.getStep()
        if step != 1:
            flags += [parserutils.FLAG_TRJ_STEP, str(step)]
        return flags

    def getRange(self):
        """
        Get current range of frames selected

        :rtype: tuple(int)
        :return: lower limit and upper limit of the selected range of frames
        """
        start_val = self.traj_dialog.start_sb.value()
        end_val = self.traj_dialog.end_sb.value()
        return start_val, end_val

    def setRange(self, start_val=None, end_val=None):
        """
        Set the frame range (in number and not time) for trajectory selection

        :type start_val: int
        :param start_val: lower limit for frame in trajectory selection. Left
            unchanged when None.

        :type end_val: int
        :param end_val: upper limit for frame in trajectory selection. Left
            unchanged when None.
        """
        # Set lower limit
        if start_val is not None:
            self.traj_dialog.start_sb.setValue(start_val)
        # Set upper limit
        if end_val is not None:
            self.traj_dialog.end_sb.setValue(end_val)
        # Update the dialog
        self.traj_dialog.setTempTrajRange()
        self.updateTrajectoryRangeLabel()

    def setEnabled(self, state):
        """
        Enable or disable the button and label for showing the traj dialog

        :param bool state: True to enable the button, and False to disable
        """
        self.traj_btn.setEnabled(state)
        self.traj_label.setEnabled(state)

    def reset(self):
        """
        Reset the frame
        """
        self.traj_dialog.reset()
        self.traj_label.setText('')
class TrajAnalysisGuiMixin(appbase.BaseAnalysisGui):
    """
    Class for extension of af2 to add widgets to gui for desmond trajectory
    analysis.
    """

    def addLoadTrajButton(self, layout, setup_method=None, allow_gcmc=True):
        """
        Load button to load structure from workspace and associated trajectory

        :param `QLayout` layout: layout to add the button and range selector to
        :param callable setup_method: The method to call to setup the panel
        :param bool allow_gcmc: Whether gcmc cms's should be allowed
        """
        self.load_btn = appbase.StructureLoadButton(command=self._importTrjFile,
                                                    layout=layout)
        self.trj_range_selector = TrajRangeSelectorFrame(layout=layout)
        self.setup_method = setup_method
        self.allow_gcmc = allow_gcmc

    def _importTrjFile(self):
        """
        Load the workspace file and get the trajectory path and cms file from
        the structure.

        :return `schrodinger.structure.Structure`: Structure that was imported,
            or None if nothing could be imported
        """
        self.resetPanel()
        struct = self.getIncludedEntry()
        if struct is None:
            return

        try:
            cms_file, trj_dir = get_cms_and_trj_path(struct)
        except ValueError as err:
            self.error(str(err))
            return

        if not self.allow_gcmc:
            cms_model = cms.Cms(cms_file)
            if cms_model.is_for_gcmc:
                self.error(
                    'The input Desmond model is for Grand-Canonical Monte '
                    'Carlo, which is not supported by this feature.')
                return

        if not self.trj_range_selector.updateTrj(trj_dir):
            # Report the failure like every other failure path in this method
            # instead of returning without any feedback.
            self.error(f'Unable to load the trajectory frames from {trj_dir}.')
            return

        self.cms_file = cms_file
        self.trj_path = trj_dir
        self.struct = struct

        # Do any extra setup steps if necessary
        if self.setup_method:
            if self.setup_method() is False:
                # Setup failed. Reset the panel to clear the changes.
                self.resetPanel()
                return

        self.toggleStateMain(True)
        return struct

    def resetPanel(self):
        """
        Reset the panel variables and widgets set by this mixin

        :raise NotImplementedError: Will raise error if load button is not
            added
        """
        # The range selector only exists once addLoadTrajButton has run
        if not hasattr(self, 'trj_range_selector'):
            raise NotImplementedError('addLoadTrajButton not called.')
        super().resetPanel()
        self.cms_file = None
        self.trj_path = None
        self.struct = None
        self.trj_range_selector.reset()

    def toggleStateMain(self, state):
        """
        Enable and disable trajectory traj button and label

        :param bool state: True to enable the button, and False to disable
        """
        self.trj_range_selector.setEnabled(state)

    def getTrajFlags(self):
        """
        Get the command line flags for cms file, trajectory path, and
        trajectory range.

        The cms file and the trajectory are copied to files named after the
        job name, and the copied cms is rewritten to point at the copied
        trajectory.

        :return list: A list of command line flags and values
        """
        flags = []
        jobname = self.jobname()

        # Get cms extension
        cms_ext = fileutils.get_file_extension(self.cms_file)
        cms_name = jobname + cms_ext
        shutil.copy2(self.cms_file, cms_name)

        # Get trj extension
        trj_ext = traj.get_trajectory_extname(ref_fname=self.trj_path)
        trj_name = jobname + trj_ext
        # The trajectory may be either a directory or a single file
        copy_func = (shutil.copytree
                     if os.path.isdir(self.trj_path) else shutil.copy2)
        copy_func(self.trj_path, trj_name)

        # Associate trajectory to cms and write it
        with cms_writer(cms_name) as model:
            model.set_cts_property(msprops.TRAJECTORY_FILE_PROP, trj_name)

        flags += [parserutils.FLAG_CMS_PATH, cms_name]
        flags += [parserutils.FLAG_TRJ_PATH, trj_name]
        flags += self.trj_range_selector.getFlags()
        return flags

    @af2.validator(-1000)
    def validateCMS(self):
        """
        Check if valid structure has been loaded.

        :rtype: bool or (bool, str)
        :return: The bool is True if file is loaded. False with message pop up
            if file is not loaded.
        """
        return self._validateCMS()

    def _validateCMS(self):
        """
        Check if valid structure has been loaded. This is a separate method as
        jobtasks.preprocessor does not work with a function that returns an af2
        validation object. Also it can be used independently of any decorator.

        :rtype: bool or (bool, str)
        :return: The bool is True if file is loaded. False with message pop up
            if file is not loaded.
        """
        # cms_file is missing until resetPanel runs and None until a load
        # succeeds; both mean nothing is loaded.
        if getattr(self, 'cms_file', None) is None:
            return (False, 'No structure loaded.')
        return True