Source code for schrodinger.trajectory.trajectory_gui_dir.playertoolbar

"""
Trajectory viewer player toolbar.

Copyright Schrodinger, LLC. All rights reserved.
"""

import glob
import itertools
import os
import textwrap
from enum import Enum
from functools import partial
from typing import Union

import numpy

import schrodinger
from schrodinger import structure
from schrodinger.application.desmond import cms
from schrodinger.application.desmond.constants import IS_INFINITE
from schrodinger.application.desmond.packages import topo
from schrodinger.application.desmond.packages import traj
from schrodinger.application.desmond.packages import traj_util
from schrodinger.application.matsci import msprops
from schrodinger.application.matsci.nano import xtal
from schrodinger.infra import mm
from schrodinger.infra import projectmodel
from schrodinger.infra.mmbitset import Bitset
from schrodinger.project import utils as projutils
from schrodinger.Qt import QtCore
from schrodinger.Qt import QtGui
from schrodinger.Qt import QtWidgets
from schrodinger.structutils import analyze
from schrodinger.trajectory import utils
from schrodinger.trajectory import validate_trajectory as vt
from schrodinger.trajectory.trajectory_gui_dir import export_movie
from schrodinger.trajectory.trajectory_gui_dir import frame_structure_exporter
from schrodinger.trajectory.trajectory_gui_dir import playback_settings
from schrodinger.trajectory.trajectory_gui_dir import playback_settings_data
from schrodinger.trajectory.trajectory_gui_dir import plots as tplots
from schrodinger.trajectory.trajectory_gui_dir import stylesheet
from schrodinger.trajectory.trajectory_gui_dir import traj_plot_gui
from schrodinger.trajectory.trajectory_gui_dir import trajectory_messages
from schrodinger.trajectory.trajectory_gui_dir.export_structure_enums import \
    ExportMode
from schrodinger.trajectory.trajectory_gui_dir.playback_settings_data import \
    Direction
from schrodinger.trajectory.trajectory_gui_dir.playback_settings_data import \
    Loop
from schrodinger.trajectory.trajectory_gui_dir.secondary_structure_data import \
    SecondaryStructureData
from schrodinger.trajectory.trajectory_gui_dir.trajectory_messages import \
    MessageButton
from schrodinger.ui import maestro_ui
from schrodinger.ui.qt import messagebox
from schrodinger.ui.qt import utils as qt_utils
from schrodinger.ui.qt.decorators import wait_cursor

# Maestro interface object returned by schrodinger.get_maestro(). This module
# uses it for workspace access, warnings, command options and commands.
maestro = schrodinger.get_maestro()

# Prompt text asking whether the trajectory behind a clicked plot should be
# loaded into the Trajectory Player (presumably shown by the plot handling
# code; the caller is not visible here).
LOAD_TRAJ_MSG = ("The trajectory corresponding to the clicked plot is not "
                 "currently loaded into the Trajectory Player. Do you want "
                 "to load it now?\n\nLoading the trajectory will exclude all "
                 "other entries from the Workspace.")
# Prompt text for the case where the trajectory behind a clicked plot cannot
# be found in the project; it offers to delete the associated plots.
REQUESTED_TRAJ_NOT_FOUND_MSG = ("The trajectory corresponding to the clicked "
                                "plot cannot be found in this project. The "
                                "entry may have been deleted or its "
                                "trajectory data may have been moved.\n\n"
                                "Delete all plots associated with this "
                                "missing trajectory?")


def get_mmct_id_map_from_dict(atom_id_map_dict: dict):
    """
    Build an mmct id map from an atom renumbering dictionary.

    :param atom_id_map_dict: Mapping of old atom number to new atom number.
        A value of None marks an atom that was deleted.

    :return: A newly created id map filled from the dictionary, or
        mm.MMCT_INVALID_ID_MAP when the dictionary is empty or None.
    :rtype: MM_ID_Map

    Note: The caller must delete the returned map explicitly using the
    mmct_id_map_delete() method.
    """
    if not atom_id_map_dict:
        return mm.MMCT_INVALID_ID_MAP

    id_map = mm.mmct_id_map_new(len(atom_id_map_dict))
    for old_index, new_index in atom_id_map_dict.items():
        # Any falsy target (None for a deleted atom) is recorded as -1.
        mm.mmct_id_map_set(id_map, old_index, new_index or -1)
    return id_map
def unroll_pos(ct, rep_vec, xyz0, start_atom):
    """
    Set coordinates for all copies of the replicated CT.

    Every copy receives the coordinates in xyz0 shifted by an integer
    combination of the box vectors read from the CT.

    :type ct: `structure.Structure`
    :param ct: Replicated structure whose coordinates are overwritten.

    :type rep_vec: `tuple` of 3 `int`
    :param rep_vec: Number of copies along each of the three box vectors.

    :type xyz0: Nx3 `numpy.array` where N is the number of atoms in the
        un-replicated CT.
    :param xyz0: Coordinates of the un-replicated CT.

    :type start_atom: int
    :param start_atom: 1-based index of the first atom in `ct` that receives
        the unrolled coordinates.
    """
    # FIXME: DESMOND-9705 should remove it.
    cell = numpy.reshape(cms.get_box(ct), [3, 3])
    copy_total = numpy.prod(rep_vec)

    # One translation vector per copy, kept as (copy, 1, 3) so that it
    # broadcasts against the (N, 3) coordinates below.
    offsets = numpy.empty((copy_total, 1, 3))
    lattice_indices = itertools.product(*(range(count) for count in rep_vec))
    for copy_index, abc in enumerate(lattice_indices):
        offsets[copy_index][0] = numpy.dot(abc, cell)

    atoms_per_copy = len(xyz0)
    unrolled = numpy.empty((copy_total, atoms_per_copy, 3))
    numpy.copyto(unrolled, offsets + xyz0)
    unrolled.shape = (copy_total * atoms_per_copy, 3)

    first = start_atom - 1
    all_xyz = ct.getXYZ(copy=False)
    all_xyz[first:first + (copy_total * atoms_per_copy)] = unrolled
    ct.setXYZ(all_xyz)
def set_structure_atoms_entry_id(st: structure.Structure, eid: int,
                                 start_atom: int, end_atom: int):
    """
    Assign an entry id to a contiguous range of atoms of a structure.

    :param st: Structure whose atoms are updated.

    :param eid: Entry id to be set; it is stored as its string form.

    :param start_atom: First atom of the range. This atom is included in the
        update.

    :param end_atom: Last atom of the range. This atom is included in the
        update.
    """
    affected_atoms = Bitset(size=st.atom_total)
    affected_atoms.range(start_atom, end_atom)
    entry_name = str(eid)
    mm.mmct_ct_set_entry_name(st, entry_name, affected_atoms)
def get_workspace_structure_and_entry_total():
    """
    Fetch the workspace structure together with its entry count.

    The workspace structure is requested with copy=False.

    :return: Tuple of the workspace ct and the entry total reported for it.
    :rtype: tuple(structure.Structure, int)
    """
    workspace_ct = maestro.workspace_get(copy=False)
    return (workspace_ct, mm.mmct_ct_get_entry_total(workspace_ct))
def is_replica_atom_count(main_ct_atom_total: int,
                          frame_ct_atom_total: int) -> bool:
    """
    Whether the main ct atom count is a result of replicating the frame ct,
    i.e. it is a strictly larger exact multiple of the frame ct atom count.

    :param main_ct_atom_total: Main CT atom count to verify.

    :param frame_ct_atom_total: Frame ct atom count to be used for reference.
        A count of zero (or less) can never be replicated, so False is
        returned instead of attempting a modulo by zero.

    :rtype: bool
    :return: Whether the main ct atom count is a perfect replication of the
        frame ct atom count.
    """
    if frame_ct_atom_total <= 0:
        return False
    return (main_ct_atom_total > frame_ct_atom_total and
            main_ct_atom_total % frame_ct_atom_total == 0)
def set_distance_cell_props(
        proj: schrodinger.project.Project, eid: int,
        box_values: schrodinger.infra.projectmodel.TriclinicBox):
    """
    Set distance cell properties value in the project for the given entry id.

    box_values represents ax, ay, az, bx, by, bz, cx, cy, cz respectively.

    :param proj: Project holding the entry.

    :param eid: Entry id whose cell properties are updated.

    :param box_values: Box to store. Nothing is written when it is None or
        reports itself as invalid.

    :rtype: bool
    :return: True if values are set appropriately. False when the box is
        missing or invalid, or when no project row exists for the entry.
    """
    if not (box_values and box_values.isValid()):
        return False

    row = get_entry_row(proj, eid)
    # get_entry_row() yields None for a missing project or entry id, so the
    # row has to be checked before its index is used.
    if row is None:
        return False
    return projectmodel.mm_set_distance_cell_props(proj.project_model,
                                                   row.index, box_values)
def get_distance_cell_props(proj: schrodinger.project.Project, eid: int):
    """
    Get distance cell properties value from the project for the given entry
    id.

    :param proj: Project holding the entry.

    :param eid: Entry id whose cell properties are requested.

    :rtype: schrodinger.infra.projectmodel.TriclinicBox or None
    :return: The box on success. None when no project row exists for the
        entry or when the stored box is not valid.
    """
    row = get_entry_row(proj, eid)
    # get_entry_row() yields None for a missing project or entry id.
    if row is None:
        return None

    box_values = projectmodel.mm_get_distance_cell_props(
        proj.project_model, row.index)
    if box_values.isValid():
        return box_values
    return None
class ReferenceEntryStatistics:
    """
    Helper data class describing the reference entry inside the workspace
    structure: the position of its first atom, its atom total and its entry
    id.
    """

    def __init__(self, workspace_st: structure.Structure,
                 frame_st_entry_id: int, frame_st_atom_total: int,
                 frame_st_replica_count: int):
        """
        :param workspace_st: Workspace structure. Expected exactly two
            entries in the workspace.

        :param frame_st_entry_id: Frame structure associated entry id.

        :param frame_st_atom_total: Atom total of the frame present in the
            workspace structure.

        :param frame_st_replica_count: Number of replicas of the frame
            structure in the workspace structure.
        """
        # Replication may not have been applied yet, so the trajectory atom
        # total cannot always be derived by multiplying the frame atom total.
        # E.g. include the trajectory entry, set replica = 2, include another
        # entry, save the project and reopen it: the replica value is 2 but
        # the workspace structure has not been updated yet.
        # A workspace that holds more atoms than all replicas together must
        # contain the other entry in addition to the replicated trajectory.
        expected_total = frame_st_atom_total * frame_st_replica_count
        if workspace_st.atom_total > expected_total:
            trajectory_atom_total = expected_total
        else:
            trajectory_atom_total = frame_st_atom_total

        def _belongs_to_frame(atom_index):
            owner = int(workspace_st.atom[atom_index].entry_id)
            return owner == frame_st_entry_id

        # When the frame structure was included first, atoms
        # 1..trajectory_atom_total carry the frame entry id and the reference
        # entry starts right after them. The second lookup only happens when
        # the first atom already matches.
        trajectory_first = (_belongs_to_frame(1) and
                            _belongs_to_frame(trajectory_atom_total))
        self.start_atom = trajectory_atom_total + 1 if trajectory_first else 1
        self.atom_total = workspace_st.atom_total - trajectory_atom_total
        self.entry_id = int(workspace_st.atom[self.start_atom].entry_id)

    @property
    def reference_entry_included_first(self):
        """
        :return: True if reference entry is included first in the workspace.
        :rtype: bool
        """
        # A reference entry that was included first owns atom number 1.
        return self.start_atom == 1
def compare_structures_connectivity(st1: structure.Structure,
                                    st2: structure.Structure):
    """
    Tell whether two structures have the same connectivity between atoms.

    E.g. if atoms 1-2 are connected in st1, the same pair of atoms must be
    connected in st2 as well.

    :param st1: Structure1 to compare
    :param st2: Structure2 to compare

    :rtype: bool
    :return: Whether both structures have same connectivity.
    """
    comparison = mm.mmct_ct_compare_connect(st1.handle, st2.handle)
    return comparison == mm.MMCT_SAME
def get_entry_row_property_value(row: schrodinger.project.ProjectRow,
                                 prop_name: str):
    """
    Get project row's property value.

    :param row: Project row to read from; a falsy row yields None.

    :param prop_name: Name of the property to look up.

    :return: The property value, or None when the row is missing or does not
        have the property.
    """
    if not row:
        return None
    try:
        return row.property[prop_name]
    except KeyError:
        return None
def get_entry_row(proj: schrodinger.project.Project, eid: int):
    """
    Get project entry row.

    :param proj: The current project.

    :param eid: Entry id to look up.

    :return: Result of proj.getRow(eid), or None when either the project or
        the entry id is falsy.
    """
    if not (proj and eid):
        return None
    return proj.getRow(eid)
def get_entry_ct(proj: schrodinger.project.Project, eid: int, props: bool,
                 copy: bool):
    """
    Provide entry structure corresponding to given entry id in the project.

    The structure is requested without workspace synchronization.

    :param proj: The current project.

    :param eid: Entry id associated with current project.

    :param props: True if properties should also be copied.

    :param copy: True if new copy of entry structure is returned.

    :return: Entry structure stored in the project, or None when the entry
        row cannot be found.
    :rtype: structure.Structure or None
    """
    entry_row = get_entry_row(proj, eid)
    if entry_row is None:
        return None
    return entry_row.getStructure(props=props,
                                  copy=copy,
                                  workspace_sync=False)
def set_entry_ct(proj: schrodinger.project.Project, st: structure.Structure,
                 eid: int, props: bool, sync_workspace: bool, copy: bool):
    """
    Set entry structure corresponding to given entry id in the project.

    :param proj: The current project.

    :param st: Entry structure to be set.

    :param eid: Entry id associated with current project.

    :param props: True if properties should also be copied.

    :param sync_workspace: Synchronize workspace as well.

    :param copy: Forwarded unchanged as the `copy` argument of
        row.setStructure().

    :rtype: bool
    :return: Whether the structure is set successfully in the project entry
        ct. False when the entry row or the structure is missing.
    """
    entry_row = get_entry_row(proj, eid)
    can_store = entry_row is not None and st is not None
    if can_store:
        entry_row.setStructure(st,
                               props=props,
                               sync_workspace=sync_workspace,
                               copy=copy)
    return can_store
class MovieInterruptionType(Enum):
    """
    Type defining a save movie interruption type.
    """
    # Values are plain ints. A trailing comma after a value would silently
    # turn it into a one-element tuple.

    # Movie is paused by clicking over Pause button.
    PAUSED = 0
    # Player itself is closed by clicking over X button.
    CLOSED = 1
class MovieInterruptionDialogRetCode(Enum):
    """
    Type defining a return code from save movie interruption warning dialog.
    """
    # Values are plain ints. A trailing comma after a value would silently
    # turn it into a one-element tuple.

    # Save movie using only currently saved frames and stop saving movie.
    PARTIAL_SAVE = 0
    # Continue saving movie until all requested frames are saved.
    RESUME = 1
    # Cancel save movie operation and discard currently saved frames.
    CANCEL = 2
class MovieExportInterruptionDialog(messagebox.MessageBox):
    """
    A dialog that displays a warning when movie export operation is
    interrupted.
    """

    def __init__(self, parent, interruption_type):
        """
        :type parent: QtWidgets.QWidget
        :param parent: Parent widget of this dialog.

        :type interruption_type: enum(MovieInterruptionType)
        :param interruption_type: Interruption source button.
        """
        if interruption_type == MovieInterruptionType.CLOSED:
            action_text = 'Closing the Trajectory Player'
        else:
            action_text = 'Pausing now'

        # NOTE(review): the line breaks inside this message were lost in the
        # copy of the source this was restored from - confirm the layout.
        message = textwrap.dedent(f"""
            {action_text} will terminate the export and truncate the
            resulting movie; only the previously displayed portion will be
            saved. To save the partial movie, click Save. To continue the
            export, click Resume. Cancel will terminate the export without
            saving.
            """)
        super().__init__(parent=parent,
                         title='Movie Export Interrupted',
                         text=message,
                         icon=QtWidgets.QMessageBox.Question)

        save = self.addButton('Save', QtWidgets.QMessageBox.AcceptRole)
        cancel = self.addButton('Cancel', QtWidgets.QMessageBox.RejectRole)
        resume = self.addButton('Resume', QtWidgets.QMessageBox.AcceptRole)
        self.setDefaultButton(save)
        self.setEscapeButton(cancel)

        # Maps each button to the code exec_() reports when it is clicked.
        self.button_ret_code_dict = {
            save: MovieInterruptionDialogRetCode.PARTIAL_SAVE,
            resume: MovieInterruptionDialogRetCode.RESUME,
            cancel: MovieInterruptionDialogRetCode.CANCEL
        }

    def exec_(self):
        """
        Show the warning dialog.

        :rtype: enum(MovieInterruptionDialogRetCode)
        :return: Return one of the enum value defined by
            MovieInterruptionDialogRetCode.
        """
        super().exec_()
        return self.button_ret_code_dict[self.clickedButton()]
class QWidgetStyled(QtWidgets.QWidget):
    """
    QWidget that can be styled through a stylesheet. The custom paintEvent
    below is what makes the stylesheet take effect.
    """

    def paintEvent(self, e):
        """
        Draw the widget primitive through the widget's style.

        See `QtWidgets.QWidget` documentation for more information.
        """
        style_option = QtWidgets.QStyleOption()
        style_option.initFrom(self)
        painter = QtGui.QPainter(self)
        self.style().drawPrimitive(QtWidgets.QStyle.PE_Widget, style_option,
                                   painter, self)
class TrajectoryQMenu(QtWidgets.QMenu):
    """
    QMenu that opens above its parent tool button. It intercepts showEvent()
    and moves the menu to the desired position.
    """

    def showEvent(self, event):
        """
        Move the menu so that its bottom edge sits at the top of the parent
        tool button, then continue with the default handling.

        See QtWidgets.QMenu documentation for more information.
        """
        tool_button = self.parent()
        button_origin = tool_button.mapToGlobal(QtCore.QPoint(0, 0))
        menu_height = self.sizeHint().height()
        target = QtCore.QPoint(button_origin.x(),
                               button_origin.y() - menu_height)
        self.move(target)
        return super().showEvent(event)
class TrajectoryMaestroSettings:
    """
    Keeps track of the Maestro settings that the player overrides. The
    current values are saved and trajectory specific values are applied when
    the player is set up for a trajectory entry; restore() brings the
    original values back when the player state is cleared.
    """

    def __init__(self, proj, eid, adjust_pos_func,
                 translate_to_first_unit_cell):
        """
        Save original values of maestro settings and apply new settings
        according to trajectory mode.

        :type proj: project.Project handle.
        :param proj: Project handle

        :type eid: int
        :param eid: Entry id

        :type adjust_pos_func: method(bool)
        :param adjust_pos_func: Function callback to be called to update the
            adjust position.

        :type translate_to_first_unit_cell: bool
        :param translate_to_first_unit_cell: Indicates whether 'Translate to
            first unit cell' is toggled on or off.
        """
        entry_row = get_entry_row(proj, eid)
        periodic_fix = get_entry_row_property_value(entry_row,
                                                    msprops.PERIODIC_FIX)
        is_infinite = bool(
            get_entry_row_property_value(entry_row, IS_INFINITE))

        # Remember original max pbc atoms used in maestro.
        self._orig_max_pbc_atoms = maestro.get_command_option(
            "trajectoryplayersettings", "maxpbcatoms")

        # MATSCI-3894, MATSCI-6234
        # TODO remove periodicfix check
        # It is here for backward compatibility (18-4 and older)
        self._pbc_atoms_set = is_infinite or periodic_fix is False
        if self._pbc_atoms_set or translate_to_first_unit_cell:
            self._setMaxPBCAtoms(0)

        # Always set adjust position to none whenever entry is loaded.
        adjust_pos_func(False)

        # Remember original crystal unit cell marker state and turn the
        # marker off if it is currently on.
        marker_option = maestro.get_command_option("crystalunitcell",
                                                   "showmarker")
        self._orig_crystal_unit_cell_state = marker_option == 'True'
        if self._orig_crystal_unit_cell_state:
            self._setCrystalUnitCellMarkerState(False)

    def _runMaestroCommand(self, cmd):
        """
        Executes maestro command. A maestro.MaestroCommand error raised by
        the command is ignored.

        :param cmd: Maestro command to execute.
        :type cmd: str
        """
        try:
            maestro.command(cmd)
        except maestro.MaestroCommand:
            pass

    def _setCrystalUnitCellMarkerState(self, crystal_unit_cell_show):
        """
        Set crystal unit cell marker state in maestro.

        :param crystal_unit_cell_show: Indicates whether to show crystal unit
            cell marker in maestro or not.
        :type crystal_unit_cell_show: bool
        """
        if crystal_unit_cell_show:
            val = 'true'
        else:
            val = 'false'
        self._runMaestroCommand(f"crystalunitcell showmarker={val}")

    def _setMaxPBCAtoms(self, max_pbc_atoms):
        """
        Set maximum pbc atoms in maestro.

        If max_pbc_atoms is 0, maestro always checks for PBC bonds regardless
        of how many atoms are in the workspace. MAE-40798

        :param max_pbc_atoms: Maximum pbc atoms to consider.
        :type max_pbc_atoms: int
        """
        command = f"trajectoryplayersettings maxpbcatoms={max_pbc_atoms}"
        self._runMaestroCommand(command)

    def setMaxPBCAtomsForUnitCell(self, translate_to_first_unit_cell):
        """
        Set maxpbcatoms based on translate_to_first_unit_cell. maxpbcatoms
        will be set to 0, if True, otherwise to the original maxpbcatoms.

        :param translate_to_first_unit_cell: Indicates whether 'Translate to
            first unit cell' is toggled on or off.
        :type translate_to_first_unit_cell: bool
        """
        # Do nothing, if maxpbcatoms was set to 0 because of periodicfix and
        # is_infinite properties at the construction time.
        if self._pbc_atoms_set:
            return

        if translate_to_first_unit_cell:
            limit = 0
        else:
            limit = self._orig_max_pbc_atoms
        self._setMaxPBCAtoms(limit)

    def restore(self):
        """
        Restore maestro settings using original values which were initially
        changed.
        """
        # Display crystal unit cell marker if it was visible earlier.
        if self._orig_crystal_unit_cell_state:
            self._setCrystalUnitCellMarkerState(True)

        self._setMaxPBCAtoms(self._orig_max_pbc_atoms)
[docs]class EntryTrajectory: """ It encapsulates all the data related to trajectory inside this object. Primary information stored are trajectory entry settings, cms model, msys model, and list of trajectory frames. - Supports both fixed and varying atoms frame trajectory. - When every frame has the same number of atoms, it is called fixed atoms trajectory. - When all frames do not have same number of atoms, it is called varying atoms trajectory. - Supports full system ct validation against entry ct. - Atoms visibility handling. - Trajectory frame positioning (center or superimpose, no-position) - Secondary structure update - Restoriation of original entry ct. """
def __init__(self, proj: schrodinger.project.Project, eid: int,
             matsci_profile: bool):
    """
    :param proj: The project containing this trajectory.

    :param eid: The entry id attached to this trajectory.

    :param matsci_profile: Whether current maestro profile is MatSci. It is
        used to configure default settings based on the profile.
    """
    self.proj = proj
    self.eid = eid
    self.cms_model = None
    self.msys_model = None
    self.trajectory = None
    self._has_inactive_atoms = False
    self.settings_data = None
    self.total_frame = None
    self.matsci_profile = matsci_profile

    # Filled by initFrameToActiveAtomTotalMap() once the trajectory is read.
    # It is defined here so getFrameToActiveAtomTotalMap() does not raise
    # AttributeError on an object whose trajectory could not be read.
    self._frame_to_atom_total_map = []

    # This CT is used to track original entry ct. Visibility changes outside
    # of trajectory are updated in this ct. This information is used to
    # restore default entry ct visibility.
    self.orig_entry_ct = None

    # This CT is a primary ct which is updated using Desmond APIs.
    # TrajectoryPlayer uses this as a base ct during trajectory player
    # operations.
    self.frame_ct = None

    self.ss_data = SecondaryStructureData()
    self._has_secondary_structure = False
    self._reference_entry_statistics = None
    self._orig_box_values = None

    if self.readTrajectory():
        self.total_frame = len(self.trajectory)
        self.detectVaryingAtomsFrame()
        self.initFrameToActiveAtomTotalMap()
        self.settings_data = playback_settings_data.PlaybackSettingsData(
            self.proj,
            self.eid,
            self.total_frame,
            matsci_profile=matsci_profile)
        self._updateSavedStructures()

        # Save original box values from project because these will change
        # according to current frame when player is active.
        self._orig_box_values = get_distance_cell_props(self.proj, self.eid)

        # Optimization - cache if structure has secondary structure
        # property.
        self._has_secondary_structure = any(
            at.secondary_structure != structure.SS_NONE
            for at in self.orig_entry_ct.atom)

    # Dictionary of the dgo change type and the callback method which should
    # be called on that particular dgo change. It is built unconditionally
    # because canAcceptWorkspaceStructureChange() reads it without checking
    # whether the trajectory was read successfully.
    self.dgo_change_type_callback_dict = {
        maestro_ui.MM_DGO_RIBBON_SETTINGS_CHANGED:
            maestro_ui.clone_atoms_ribbon_settings,
        maestro_ui.MM_DGO_COLOR_CHANGED:
            maestro_ui.clone_atoms_color_settings,
        maestro_ui.MM_DGO_REP_CHANGED:
            maestro_ui.clone_atoms_representation_settings,
        maestro_ui.MM_DGO_ATOM_LABEL_SETTINGS_CHANGED:
            maestro_ui.clone_atoms_label_settings
    }
def detectVaryingAtomsFrame(self):
    """
    Identify if trajectory contains varying atoms frame or not and
    accordingly set the EntryTrajectory._has_inactive_atoms data member.

    The flag is raised as soon as one frame reports a total number of atoms
    that differs from its active atom total. It is left untouched when no
    such frame exists.
    """
    frame_numbers = range(1, self.total_frame + 1)
    if any(
            self.getFrame(number).natoms !=
            self.getFrameActiveAtomTotal(number)
            for number in frame_numbers):
        self._has_inactive_atoms = True
@property
def reference_entry_statistics(self):
    """
    :return: Return reference entry statistics object if available.
    :rtype: ReferenceEntryStatistics
    """
    return self._reference_entry_statistics


@reference_entry_statistics.setter
def reference_entry_statistics(self, ref_stat: ReferenceEntryStatistics):
    """
    Set reference entry statistics object instance and update dependent
    flags.

    Replication stays allowed when there is no reference entry. Otherwise it
    requires the reference entry to be included first and the trajectory not
    to have varying atoms frames.
    """
    self._reference_entry_statistics = ref_stat
    if ref_stat is None:
        replication_allowed = True
    else:
        replication_allowed = (self.reference_entry_included_first and
                               not self.isVaryingAtomsFrame())
    self.settings_data.allow_replication = replication_allowed


@property
def reference_entry_included_first(self):
    """
    :return: True if reference entry is included first in the workspace.
    :rtype: bool
    """
    reference_allowed = self.allow_reference_entry
    return (reference_allowed and
            self.reference_entry_statistics.reference_entry_included_first)


@property
def has_secondary_structure(self):
    """
    :return: bool indicating if structure has secondary structure properties
        or not.
    """
    return self._has_secondary_structure


def _updateSavedFrameStructure(self):
    """
    Save original frame structure.
    """
    # Optimization: If we are using fixed frame trajectory and there is no
    # replica, we directly operate on workspace structure, so there is no
    # need to have a temporary frame structure - MAE-42350
    needs_private_copy = self.usingTemporaryFrameStructure()
    self.frame_ct = (self.orig_entry_ct.copy() if needs_private_copy else
                     maestro.workspace_get(copy=False))
def updateSavedFrameStructureForExport(self, st: structure.Structure):
    """
    Update active frame structure for export operation.

    :param st: Structure to use as frame structure. A copy of it is stored
        when a temporary frame structure is in use, otherwise the structure
        itself is stored.
    """
    if self.usingTemporaryFrameStructure():
        self.frame_ct = st.copy()
    else:
        self.frame_ct = st
def _updateSavedStructures(self):
    """
    Save original entry structure and default frame structure.

    The entry structure is stored as a copy taken without properties.
    """
    entry_copy = get_entry_ct(self.proj, self.eid, props=False, copy=True)
    self.orig_entry_ct = entry_copy
    self._updateSavedFrameStructure()
def updateTrajectoryEntryStructure(self, workspace_st, atoms_bs,
                                   change_type):
    """
    Update saved original entry structure according to change type from the
    workspace structure.

    Nothing happens for change types this class does not handle, or when the
    bitset handle is not in use.

    :param workspace_st: The workspace structure
    :type workspace_st: schrodinger.structure.Structure

    :param atoms_bs: The MMbs handle of atoms which are affected by this
        change. Note that atoms_bs represents indices in the workspace
        structure and it is possible that workspace structure contains
        replica or external reference structure, so mapping for workspace
        structure atom indices to original entry structure needs to be taken
        care in this method.
    :type atoms_bs: MMbs handle

    :param change_type: Type of workspace change (color, label, or ribbon)
    :type change_type: maestro_ui.MMENUM_DGO_CHANGED_TYPE
    """
    if (not self.canAcceptWorkspaceStructureChange(change_type) or
            not mm.mmbs_in_use(atoms_bs)):
        return

    clone_settings = self.dgo_change_type_callback_dict[change_type]
    clone_settings(workspace_st, atoms_bs, self.orig_entry_ct)
def canAcceptWorkspaceStructureChange(self, change_type):
    """
    Find out if given change type can be accepted by EntryTrajectory.

    :param change_type: Type of workspace change (color, label, or ribbon,
        representation)
    :type change_type: maestro_ui.MMENUM_DGO_CHANGED_TYPE

    :return: True if change type is allowed to be accepted by this class,
        i.e. a callback is registered for it.
    :rtype: bool
    """
    # Membership is tested on the dict itself; no key view is needed.
    return change_type in self.dgo_change_type_callback_dict
def hasPlaybackSettingsData(self):
    """
    Check if the current entry trajectory has playback settings as entry
    properties.

    :return: True if eid has playback settings set otherwise False. False is
        also returned when the project has no row for the entry.
    :rtype: bool
    """
    entry_row = get_entry_row(self.proj, self.eid)
    # get_entry_row() yields None for a missing project or entry id.
    if entry_row is None:
        return False
    return playback_settings_data.STEP_PROP in entry_row.property
def __del__(self):
    """
    Destructor

    - Make sure that trajectory entry data is written back in the project.
    - Drop the reference to the trajectory frames, otherwise memory is not
      released.
    """
    settings = self.settings_data
    if settings:
        settings.writeSettings()

    self._clearTrajectoryFrames()


def _getCMSFilePath(self, eid):
    """
    Return trajectory file path if there is any. For Material Science case,
    it generates cms file, if required.

    :param eid: Entry id to look up.

    :return: Path of the cms file, or None when the entry has neither a
        Desmond nor a materials trajectory.
    """
    project = self.proj

    # Get cms file for Desmond case
    if projutils.has_desmond_trajectory(project, eid):
        return utils.get_cms_file_path(project, eid)

    if not projutils.has_materials_trajectory(project, eid):
        return None

    # Get cms file for Material Science case
    hidden_cms_path = utils.get_hidden_cms_file_path(project, eid)
    if hidden_cms_path:
        return hidden_cms_path

    # If not present, then generate and re-query
    utils.generate_cms_file_from_entry(project, eid)
    return utils.get_hidden_cms_file_path(project, eid)
def readTrajectory(self):
    """
    Read trajectory using desmond APIs and initialize trajectory entry
    information.

    A warning is shown when the entry has no cms file. When the trajectory is
    unreadable, a dialog is shown and the entry's trajectory path is cleared
    if the user chooses to remove it.

    :rtype: bool
    :return: True if the models and trajectory were read, False otherwise.
    """
    cms_file_path = self._getCMSFilePath(self.eid)
    if not cms_file_path:
        maestro.warning("Entry does not contain any trajectory.")
        return False

    try:
        models_and_frames = traj_util.read_cms_and_traj(cms_file_path)
        self.msys_model, self.cms_model, self.trajectory = models_and_frames
        topo.make_glued_topology(self.msys_model, self.cms_model)
    except traj_util.TrajectoryUnreadableError:
        unreadable_path = utils.get_trajectory_path(self.proj, self.eid)
        user_choice = trajectory_messages.show_invalid_trajectory_file_dlg(
            unreadable_path)
        if user_choice == MessageButton.REMOVE:
            utils.set_trajectory_path(self.eid, "")
        return False

    return True
def isSameTrajectory(self, eid: int):
    """
    Find out if trajectory associated with given entry id is same as this
    object.

    Both the entry id and the trajectory path have to match, and this object
    has to be valid.

    :param eid: The entry id attached to this trajectory.

    :rtype: bool
    :return: Whether the trajectory of same entry id is already loaded.
    """
    requested_path = utils.get_trajectory_path(self.proj, eid)
    loaded_path = utils.get_trajectory_path(self.proj, self.eid)
    return (self.isValid() and self.eid == eid and
            loaded_path == requested_path)
def isValid(self):
    """
    Check if trajectory is initialized properly or not.

    The object counts as valid when the entry id, the cms model, the
    trajectory and the msys model are all truthy.

    :rtype: bool
    :return: Whether the trajectory object is properly constructed.
    """
    # bool() keeps the documented return type; a bare `and` chain would
    # hand back one of the operands (e.g. None or a model object).
    return bool(self.eid and self.cms_model and self.trajectory and
                self.msys_model)
def isSingleFrameTrajectory(self):
    """
    Check if trajectory has a single frame or more than one frame.

    :rtype: bool
    :return: Whether the trajectory object has exactly one frame.
    """
    frame_total = self.total_frame
    return frame_total == 1
def _clearTrajectoryFrames(self):
    """
    Drop this object's reference to the trajectory frames.
    """
    # As of DESMOND-8903, cached native frames are automatically
    # garbage-collected (when all `Frame` objects referencing the `Source`
    # objects are garbage collected). No need to clear the cache explicitly.
    self.trajectory = None
def reload(self):
    """
    Reload trajectory.

    Align reference frame and Center view position modify the trajectory
    data, so when the user switches between No Position, Align reference
    frame, or Center view positions the original trajectory has to be read
    again.

    This function is expected to be called only if desmond system has already
    been loaded once; it does nothing on an object that is not valid.
    """
    if not self.isValid():
        return

    self._clearTrajectoryFrames()
    trajectory_dir = utils.get_trajectory_path(self.proj, self.eid)
    self.trajectory = traj.read_traj(trajectory_dir)
def getFrame(self, frame_number: int):
    """
    Returns a frame object corresponding to frame number.

    :param frame_number: 1-based frame number.

    :return: The frame, or None when no trajectory is loaded or the frame
        number lies outside 1..total_frame.
    """
    if not self.trajectory:
        return None
    if not (0 < frame_number <= self.total_frame):
        return None
    return self.trajectory[frame_number - 1]
def getFrameTime(self, frame_number: int):
    """
    Return frame chemical time in nano seconds.

    The frame's `time` value is multiplied by 0.001.

    :param frame_number: 1-based frame number.

    :return: Scaled frame time, or 0 when there is no such frame.
    """
    frame = self.getFrame(frame_number)
    if frame is None:
        return 0
    return frame.time * 0.001
def getSmoothingFrames(self, frame_number: int):
    """
    Return smoothing frames based on given frame_number and smoothing setting
    value, or None if smoothing is not greater than 1.

    The window starts at frame_number and holds at most `smoothing` frames;
    it is cut short at the last frame of the trajectory.

    :param frame_number: Frame number for which smoothing frames are
        required.

    :rtype: list(schrodinger.application.desmond.packages.traj.Frame) or None
    :return: List of frames to be used for smoothing.
    """
    window = self.settings_data.smoothing
    if not (window > 1):
        return None

    last_frame = min(self.total_frame, frame_number + window - 1)
    return [
        self.getFrame(number)
        for number in range(frame_number, last_frame + 1)
    ]
def updateViewPosition(self, forced_update: bool, asl: str):
    """
    Update trajectory based on view position tab settings and adjust frame.

    To center molecule, ASL must be valid and matched atoms must be at
    least 1.
    To align molecules, ASL must be valid and matched atoms must be at
    least 3.

    :param forced_update: Reload frames from default.

    :param asl: ASL to be used.

    :rtype: bool
    :return: Whether the trajectory view positions are updated successfully.
        False is returned only when the ASL is invalid.
    """
    data = self.settings_data
    if forced_update:
        # Restore atoms and box position in the frames.
        self.reload()

    if data.adjust_view_position and asl:
        # Check if ASL is valid or not.
        if not analyze.validate_asl(asl):
            maestro.warning("Invalid ASL '%s'" % asl)
            return False

        gids = topo.asl2gids(self.cms_model, asl, False)
        matched_atoms = len(gids)
        if data.avp_center_molecules and matched_atoms >= 1:
            self.trajectory = topo.center(self.msys_model, gids,
                                          self.trajectory)
        elif data.avp_align_on_frame and matched_atoms >= 3:
            # The reference positions come from the configured reference
            # frame (1-based setting, 0-based list index).
            ref_pos = self.trajectory[data.avp_ref_frame - 1].pos(gids)
            self.trajectory = topo.superimpose(self.msys_model, gids,
                                               self.trajectory, ref_pos)
    # NOTE(review): the nesting of this step was lost in the copy of the
    # source this was restored from. It is assumed to run independently of
    # the adjust-view-position branch above - confirm.
    if data.translate_to_first_unit_cell:
        self.cms_model, self.trajectory = utils.wrap_trajectory(
            self.cms_model, self.trajectory)
    return True
def _sameNumberAtoms(self, st1: structure.Structure,
                     st2: structure.Structure):
    """
    Whether given structures have same number of atoms.

    :param st1: Structure1 to compare
    :param st2: Structure2 to compare

    :rtype: bool
    """
    first_total, second_total = st1.atom_total, st2.atom_total
    return first_total == second_total
def getAtomTotal(self, frame_ct: structure.Structure):
    """
    :param frame_ct: Frame ct

    :rtype: int
    :return: Given frame ct atom total, or 0 when the frame ct is falsy
        (e.g. None).
    """
    if frame_ct:
        return frame_ct.atom_total
    return 0
def getFrameActiveAtomTotal(self, frame_number: int):
    """
    :param frame_number: 1-based frame number.

    :return: Return total active atom count in the given frame, as computed
        by the cms model from the frame's `nactive` and `natoms` values.
    """
    frame = self.getFrame(frame_number)
    return self.cms_model.active_total_from_nactive_gids(
        frame.nactive, frame.natoms)
def initFrameToActiveAtomTotalMap(self):
    """
    Initialize a map of frame to its associated atom total.

    The map is a list with one entry per frame; the entry at index i holds
    the active atom total of frame number i + 1.
    """
    frame_numbers = range(1, self.total_frame + 1)
    self._frame_to_atom_total_map = list(
        map(self.getFrameActiveAtomTotal, frame_numbers))
def getFrameToActiveAtomTotalMap(self):
    """
    :return: A list of active atom total of all frames, as built by
        initFrameToActiveAtomTotalMap().
    :rtype: list(int)
    """
    atom_totals = self._frame_to_atom_total_map
    return atom_totals
def isFrameStructureChanged(self, main_ct: structure.Structure):
    """
    Whether the given structure changed with respect to the frame structure.
    Only atom count is taken into consideration when comparing structures for
    changes.

    It is possible to have atoms total in the workspace structure be the same
    as full system structure because frame structure could have been reset to
    full system structure for varying atoms frame.

    :param main_ct: Main ct.

    :rtype: bool
    :return: Whether the given structure changed with respect to frame ct.
    """
    if self.usingTemporaryFrameStructure():
        reference_st = self.frame_ct
    else:
        reference_st = self.orig_entry_ct

    reference_total = self.getAtomTotal(reference_st)
    workspace_total = self.getTrajectoryAtomTotalInWorkspaceStructure(
        main_ct)
    full_system_total = self.cms_model.fsys_ct.atom_total

    # Unchanged when the count matches the reference structure, the full
    # system structure, or an exact replication of the reference structure.
    if workspace_total == reference_total:
        return False
    if workspace_total == full_system_total:
        return False
    return not is_replica_atom_count(workspace_total, reference_total)
def restoreEntryCT(self, eid: int, sync_workspace: bool,
                   is_snapshot_mode_active: bool):
    """
    Put the saved original trajectory ct back into the project entry,
    because atom positions or visibility might have changed during play.
    The ct is not restored when the user modified the structure.

    :param eid: Entry id associated with current project.
    :param sync_workspace: Whether or not synchronize changes in the
        workspace.
    :param is_snapshot_mode_active: True if player was showing frame in the
        snapshot viewing mode.
    """
    # The entry ct is read without properties and without copying.
    current_ct = get_entry_ct(self.proj, eid, False, False)
    if current_ct is None:
        # Entry ct will be None only if entry is deleted.
        return

    # Original cell values are restored even for a user-edited structure.
    set_distance_cell_props(self.proj, eid, self._orig_box_values)

    if not self.isUserEditedEntryCt(current_ct, is_snapshot_mode_active):
        set_entry_ct(self.proj,
                     self.orig_entry_ct,
                     eid,
                     props=False,
                     sync_workspace=sync_workspace,
                     copy=True)
def isUserEditedEntryCt(self, entry_ct: structure.Structure,
                        is_snapshot_mode_active: bool):
    """
    Tell whether the entry structure was edited by the user.

    Automatic mechanisms such as replication or frame snapshot viewing also
    alter the structure; those must not be reported as user edits. Only
    atom totals are compared.

    :param entry_ct: Current entry ct stored in the project.
    :param is_snapshot_mode_active: True if the frame is shown in snapshot
        viewing mode; without replication this alone marks the structure as
        not edited.

    :rtype: bool
    :return: True if structure is edited by the user.
    """
    original_ct = self.orig_entry_ct
    replicated = self.getNumberOfReplica() > 1
    atom_totals = self.getFrameToActiveAtomTotalMap()
    frame_index = self.settings_data.current_frame - 1
    current_frame_total = atom_totals[frame_index]

    if replicated:
        # With replication the entry may hold a multiple of the original
        # atom total, or - for a varying atoms frame - a multiple of the
        # current frame's atom total.
        if self._sameNumberAtoms(entry_ct, original_ct):
            return False
        if is_replica_atom_count(entry_ct.atom_total, original_ct.atom_total):
            return False
        if self.isVaryingAtomsFrame() and is_replica_atom_count(
                entry_ct.atom_total, current_frame_total):
            return False
        return True

    if self._sameNumberAtoms(entry_ct, original_ct):
        return False
    if is_snapshot_mode_active:
        return False
    if (self.isVaryingAtomsFrame() and
            entry_ct.atom_total == current_frame_total):
        return False
    return True
def getNumberOfReplica(self):
    """
    Count the replicas over all three dimensions.

    :rtype: int
    :return: Product of the x, y and z replication settings.
    """
    settings = self.settings_data
    replica_count = settings.replicate_x * settings.replicate_y
    replica_count = replica_count * settings.replicate_z
    return replica_count
def getReplicateVector(self):
    """
    Collect the replication settings into one vector.

    :rtype: tuple(int, int, int)
    :return: The (x, y, z) replication values.
    """
    settings = self.settings_data
    return tuple(getattr(settings, "replicate_" + axis) for axis in "xyz")
def visibilityChanged(self, displayed: list, undisplayed: list,
                      entry_start_atom: int):
    """
    Bring the saved atoms visibility in line with a new workspace state.

    Both lists hold workspace atom numbers, so only the atoms belonging to
    this entry end up being updated.

    :param displayed: List of atoms which are displayed.
    :param undisplayed: List of atoms which are undisplayed.
    :param entry_start_atom: First entry atom number in the workspace ct.
    """
    for atoms, is_visible in ((displayed, True), (undisplayed, False)):
        self._updateSavedStructuresVisiblity(atoms, is_visible,
                                             entry_start_atom)
def _updateSavedStructuresVisiblity(self, atoms: list, visible: bool,
                                    entry_start_atom: int):
    """
    Apply a visibility flag to the saved original entry and frame
    structures.

    :param atoms: Workspace atom numbers to be updated in the saved
        structures.
    :param visible: Visibility flag.
    :param entry_start_atom: First entry atom number in the workspace ct.
    """
    first_atom = entry_start_atom
    last_atom = first_atom + self.getEntryAtomTotal() - 1
    for workspace_atom in atoms:
        # The workspace may hold more atoms than the original entry
        # structure (system replication, multiple trajectories), so atoms
        # outside this entry's range are skipped.
        if workspace_atom < first_atom or workspace_atom > last_atom:
            continue
        # Translate the workspace atom number into a 1-based entry number.
        local_atom = workspace_atom - first_atom + 1
        for saved_st in (self.orig_entry_ct, self.frame_ct):
            mm.mmctg_atom_set_visible(saved_st, local_atom, visible)
def getEntryAtomTotal(self):
    """
    Report the atom count of the trajectory entry.

    :rtype: int
    :return: Atom total of the saved original entry ct.
    """
    original_ct = self.orig_entry_ct
    return original_ct.atom_total
def usingTemporaryFrameStructure(self):
    """
    Tell whether a temporary frame ct is used instead of the workspace
    structure.

    With a fixed atoms frame, no replica and no reference entry, the
    workspace structure (i.e. main ct) is operated on directly. Otherwise a
    temporary frame ct is used.

    :return: True if frame structure is not intended to be used same as
        workspace structure.
    """
    uses_temporary = self.isVaryingAtomsFrame()
    if not uses_temporary:
        uses_temporary = self.getNumberOfReplica() > 1
    if not uses_temporary:
        uses_temporary = self.allow_reference_entry
    return uses_temporary
@property
def allow_reference_entry(self):
    """
    Whether a reference entry is allowed next to the trajectory entry.

    If trajectory is superimposed on external structure, one more entry is
    allowed in the workspace, so that total entries in the workspace are 2.
    True exactly when reference entry statistics are present.
    """
    statistics = self.reference_entry_statistics
    return statistics is not None


@property
def reference_entry_id(self):
    """
    :return: The reference entry id if there is any, otherwise None.
    """
    if not self.allow_reference_entry:
        return None
    return self.reference_entry_statistics.entry_id


@property
def trajectory_start_atom_in_workspace_structure(self):
    """
    :return: Starting atom position of the frame structure in the workspace
        structure: right after the reference entry atoms when the reference
        entry starts at atom 1, otherwise 1.
    :rtype: int
    """
    if not self.allow_reference_entry:
        return 1
    statistics = self.reference_entry_statistics
    if statistics.start_atom == 1:
        return statistics.atom_total + 1
    return 1
def getTrajectoryAtomTotalInWorkspaceStructure(
        self, main_ct: structure.Structure):
    """
    Work out how many workspace atoms belong to the trajectory.

    When an external structure is allowed to be superimposed on the frame
    structure, the main_ct atom total also counts the reference entry
    atoms, so those are subtracted here.

    :param main_ct: Workspace structure

    :return: Trajectory atoms total in the workspace structure including
        replica or varying atoms.
    :rtype: int
    """
    workspace_total = main_ct.atom_total
    if not self.allow_reference_entry:
        return workspace_total
    return workspace_total - self.reference_entry_statistics.atom_total
def isVaryingAtomsFrame(self):
    """
    Tell whether the trajectory contains inactive atoms (i.e. it is a
    varying atoms frame trajectory).

    :rtype: bool
    :return:
        - For fixed atoms trajectory, it returns False.
        - For varying atoms trajectory, it returns True.
    """
    has_inactive_atoms = self._has_inactive_atoms
    return has_inactive_atoms
def restoreDefaultVisibility(self, frame_ct: structure.Structure):
    """
    Give the frame ct the default atoms visibility of the trajectory entry
    by cloning it from the saved original entry ct.

    :param frame_ct: Frame structure.
    """
    source_handle = self.orig_entry_ct.handle
    maestro_ui.clone_atoms_visibility(source_handle, frame_ct.handle)
def applyVisibilityAsl(self, frame_ct: structure.Structure, asl: str,
                       display_only: bool, visible: bool):
    """
    Update atoms visibility of the frame ct associated with this trajectory
    entry according to an ASL expression.

    :param frame_ct: Frame structure.
    :param asl: ASL to be evaluated for this entry.
    :param display_only: Forwarded unchanged to
        `maestro_ui.update_atoms_visibility_by_asl()`.
    :param visible: Visibility value of atoms matching the asl.
    """
    frame_handle = frame_ct.handle
    maestro_ui.update_atoms_visibility_by_asl(frame_handle, asl, display_only,
                                              visible)
def updateFrameStructure(self, frame_number: int):
    """
    Update the stored frame ct to the given frame and return it.

    The function assumes frame_number is a valid frame in the trajectory.
    The caller should always use the returned ct for updating its own data,
    but should not modify the returned ct.

    :param frame_number: Frame number of the requested frame.

    :rtype: structure.Structure
    :return: The updated frame ct.
    """
    target_ct = self.frame_ct
    smoothing_frames = self.getSmoothingFrames(frame_number)
    return self.getUpdatedFrameStructure(frame_number, target_ct,
                                         smoothing_frames)
def getUpdatedFrameStructure(self, frame_number: int,
                             frame_ct: structure.Structure,
                             frames_to_smooth: list):
    """
    Update the given frame ct to the given frame and return it.

    When no frame exists for frame_number the ct is returned untouched.

    :param frame_number: Frame number of the requested frame.
    :param frame_ct: Frame structure to be updated.
    :param frames_to_smooth: Frames whose atom coordinates are to be
        smoothed to update the atom coordinates in frame_ct.

    :rtype: structure.Structure
    :return: The (possibly updated) frame ct.
    """
    frame = self.getFrame(frame_number)
    if frame is None:
        return frame_ct
    # Updates position and smoothing.
    topo.update_fsys_ct_from_frame_GF(frame_ct, self.cms_model, frame,
                                      frames_to_smooth)
    return frame_ct
def resetStructure(self, sync_workspace: bool):
    """
    Reset entry's structure with the trajectory (full system) structure.

    On success the saved structures are refreshed through
    `_updateSavedStructures()`.

    :param sync_workspace: Synchronize workspace after resetting a
        structure in the project.

    :rtype: bool
    :return: Whether the entry structure is reset successfully.
    """
    was_reset = set_entry_ct(self.proj,
                             self.cms_model.fsys_ct,
                             self.eid,
                             props=False,
                             sync_workspace=sync_workspace,
                             copy=True)
    if not was_reset:
        return False
    self._updateSavedStructures()
    return True
def entryStructureIsValid(self, frame_number: int):
    """
    Validate the entry structure against the trajectory frame or the full
    system structure by comparing their connectivity.

    Note: As a side effect, it may update EntryTrajectory.frame_ct (when
    the frame structure has to be used for the comparison).

    :param frame_number: The frame number relative to that entry structure
        will be validated for varying atoms frame.

    :rtype: bool
    :return: Whether the structure is valid.
    """
    entry_ct = self.orig_entry_ct
    reference_ct = self.cms_model.fsys_ct

    # For a varying atoms frame the full system atom total is not
    # guaranteed to equal the entry atom total (while playing, or if the
    # user modified the frame ct), so fall back to the updated frame
    # structure in that case.
    if self.isVaryingAtomsFrame():
        if reference_ct.atom_total != entry_ct.atom_total:
            reference_ct = self.updateFrameStructure(frame_number)

    return compare_structures_connectivity(entry_ct, reference_ct)
def assignSecondaryStructure(self):
    """
    Quick assign secondary structure properties, delegating to
    `ss_data.assign()` with the original entry ct, the frame ct and the
    varying-atoms flag.
    """
    original_ct = self.orig_entry_ct
    frame_ct = self.frame_ct
    varying_atoms = self.isVaryingAtomsFrame()
    self.ss_data.assign(original_ct, frame_ct, varying_atoms)
def getBoxProperties(self, frame_number: int):
    """
    Collect the box properties for the given frame number.

    When there is no frame for the frame number, the cms_model box
    properties are used instead.

    :param frame_number: The frame number of the box for which points will
        be computed.

    :rtype: list
    :return: List of 9 properties (ax, ay, az, bx, by, bz, cx, cy, cz)
    """
    frame = self.getFrame(frame_number)
    if not frame:
        model_box = self.cms_model.box
        return [model_box[index] for index in range(9)]
    # The frame box is indexed [row][column]; flatten it row by row.
    return [frame.box[row][col] for row in range(3) for col in range(3)]
def getCoordsMinMax(self, frame_number: int):
    """
    Find the minimum and maximum coordinates along the three axes for a
    frame.

    When there is no frame for the frame number, the coordinates of the
    cms_model full system ct are used.

    :param frame_number: The frame number

    :rtype: list, list
    :return: List of minimum coordinates, list of maximum coordinates
    """
    frame = self.getFrame(frame_number)
    coords = frame.pos() if frame else self.cms_model.fsys_ct.getXYZ(
        copy=False)
    lower = coords.min(axis=0)
    upper = coords.max(axis=0)
    return lower, upper
def getCrystalUnitCellBoxPoints(self, frame_number: int,
                                pbc_position: Union[None, str]):
    """
    Compute crystal unit cell points.

    All eight cell corners are generated and shifted according to
    pbc_position; the corners at fractional coordinates (1, 1, 0),
    (0, 1, 0), (1, 0, 0) and (1, 1, 1) are returned.

    :param frame_number: The frame number of the box for which points will
        be computed.
    :param pbc_position: PBC position anchored at some origin defined by
        its value. None corresponds to the default value

    :rtype: list(list)
    :return: List of 4 box points
    """
    box_properties = self.getBoxProperties(frame_number)
    params = xtal.get_params_from_chorus(box_properties)

    lo, hi = 0.0, 1.0
    fractional_corners = (
        [hi, hi, lo],
        [lo, hi, lo],
        [lo, lo, lo],
        [hi, lo, lo],
        [hi, hi, hi],
        [lo, hi, hi],
        [lo, lo, hi],
        [hi, lo, hi],
    )
    corners = [
        xtal.trans_fract_to_cart(fractional, *params)
        for fractional in fractional_corners
    ]

    if pbc_position is None or pbc_position == xtal.CENTER_PBC_POSITION:
        # Implements maestro-src :: mm_mcscrystalunitcell.cxx :: center_on_structure
        cell_center = xtal.trans_fract_to_cart([0.5, 0.5, 0.5], *params)
        lower, upper = self.getCoordsMinMax(frame_number)
        for corner in corners:
            for axis in range(len(cell_center)):
                # Move the cell center onto the center of the structure's
                # coordinate extent.
                corner[axis] += (0.5 * (lower[axis] + upper[axis]) -
                                 cell_center[axis])
    elif pbc_position.startswith(xtal.ANCHOR_PREFIX):
        anchor_shift = xtal.get_carts_from_anchor_string(pbc_position)
        for corner in corners:
            for axis, delta in enumerate(anchor_shift):
                corner[axis] += delta

    return [corners[0], corners[1], corners[3], corners[4]]
def getDesmondBoxPoints(self, frame_number: int):
    """
    Compute desmond box points for the given frame number.

    The box is centered on the origin. Only the four corners that are
    returned get computed: the corner at -(a + b + c) / 2 followed by its
    three neighbours reached by adding the a, b and c box vectors
    respectively.

    :param frame_number: The frame number of the box for which points will
        be computed.

    :rtype: list(list)
    :return: List of 4 box points
    """
    box_properties = self.getBoxProperties(frame_number)
    half_a = [v / 2 for v in box_properties[0:3]]
    half_b = [v / 2 for v in box_properties[3:6]]
    half_c = [v / 2 for v in box_properties[6:9]]
    half_vectors = list(zip(half_a, half_b, half_c))

    base_corner = [-a - b - c for a, b, c in half_vectors]
    corner_along_a = [a - b - c for a, b, c in half_vectors]
    corner_along_b = [-a + b - c for a, b, c in half_vectors]
    corner_along_c = [-a - b + c for a, b, c in half_vectors]
    return [base_corner, corner_along_a, corner_along_b, corner_along_c]
def getBoxPoints(self, frame_number: int):
    """
    Compute simulation box points.

    For a materials science profile the crystal unit cell properties are
    used; otherwise (or when those yield no points) the Desmond frame box
    properties are used. The WaterMap translation vector is added to every
    point.

    :param frame_number: The frame number of the box for which points will
        be computed.

    :rtype: list(maestro_ui.MM_GraphicsVec3d) or None
    :return: List of 4 box points or None
    """
    corner_points = None

    # Treat matsci system differently (MATSCI-10398)
    if self.matsci_profile:
        entry_row = get_entry_row(self.proj, self.eid)
        pbc_position = get_entry_row_property_value(entry_row,
                                                    xtal.PBC_POSITION_KEY)
        corner_points = self.getCrystalUnitCellBoxPoints(
            frame_number, pbc_position)

    if corner_points is None:
        corner_points = self.getDesmondBoxPoints(frame_number)

    shift = self.getWaterMapBoxTranslationVector()
    box_points = [maestro_ui.MM_GraphicsVec3d() for _ in corner_points]
    for graphics_point, corner in zip(box_points, corner_points):
        graphics_point.setX(corner[0] + shift[0])
        graphics_point.setY(corner[1] + shift[1])
        graphics_point.setZ(corner[2] + shift[2])
    return box_points
def getWaterMapBoxTranslationVector(self):
    """
    Return the WaterMap translation to apply to the simulation box.

    If trajectory is generated from WaterMap, then trajectory cms file
    also contains 3 watermap specific properties which define translation
    values.

    r_watermap_trans1 -> x direction
    r_watermap_trans2 -> y direction
    r_watermap_trans3 -> z direction

    This setting is applicable only if player view position setting is
    not center molecule in workspace.

    :rtype: list
    :return: The [x, y, z] translation read from the entry row when all
        three properties hold a number, otherwise [0, 0, 0].
    """
    data = self.settings_data
    if data is None or data.avp_center_molecules:
        return [0, 0, 0]

    entry_row = get_entry_row(self.proj, self.eid)
    xyz = [
        get_entry_row_property_value(entry_row, prop)
        for prop in ('r_watermap_trans1', 'r_watermap_trans2',
                     'r_watermap_trans3')
    ]
    # A component is "present" when it is a number. A plain truthiness test
    # would discard the whole vector as soon as one component is exactly
    # 0.0, which is a valid translation.
    # NOTE(review): assumes a missing property is reported as a non-numeric
    # value such as None - confirm against get_entry_row_property_value().
    if all(isinstance(component, (int, float)) for component in xyz):
        return xyz
    return [0, 0, 0]
def updateBoxPropertiesInProject(self, frame_number: int):
    """
    Store the box properties of the given frame in the project entry.

    :param frame_number: Frame number whose box properties (see
        `getBoxProperties()`) are written to the project.
    """
    cell = self.getBoxProperties(frame_number)
    # Split the flat 9-value list into the a, b and c box vectors.
    vector_a = [cell[0], cell[1], cell[2]]
    vector_b = [cell[3], cell[4], cell[5]]
    vector_c = [cell[6], cell[7], cell[8]]
    box_values = projectmodel.TriclinicBox(vector_a, vector_b, vector_c)
    set_distance_cell_props(self.proj, self.eid, box_values)
class TrajectoryPlayer(QWidgetStyled):
    """
    This is a toolbar for the Trajectory player

    :cvar currentFrameChanged: A signal emitted when the current frame
        changes in the player.
    :vartype currentFrameChanged: QtCore.pyqtSignal

    :cvar showTrajectorySnapshotPanel: A signal emitted when user clicks on
        View Snapshot menu from EXPORT button.
    :vartype showTrajectorySnapshotPanel: QtCore.pyqtSignal

    :cvar trajectoryLoaded: A signal emitted when a new trajectory is loaded
        in the player. Emits the total number of frames in the new
        trajectory
    :vartype trajectoryLoaded: QtCore.pyqtSignal

    :cvar trajectoryUnloaded: A signal emitted when a trajectory is unloaded
        from the player.
    :vartype trajectoryUnloaded: QtCore.pyqtSignal

    :cvar closed: A signal emitted when player toolbar is closed.
    :vartype closed: QtCore.pyqtSignal

    :cvar createPlot: A signal carrying an Enum value; it is connected to
        the trajectory plot panel's `createPlot` in `__init__`.
    :vartype createPlot: QtCore.pyqtSignal

    :cvar atomsSelectedToggled: A signal emitted when atom selection
        changes. Emits a bool indicating if there are any atoms selected in
        the workspace
    :vartype atomsSelectedToggled: QtCore.pyqtSignal

    :cvar multiAtomsSelectedToggled: A signal emitting a bool; presumably
        whether more than one atom is selected - TODO confirm against the
        emitting code.
    :vartype multiAtomsSelectedToggled: QtCore.pyqtSignal
    """
    currentFrameChanged = QtCore.pyqtSignal(int)
    showTrajectorySnapshotPanel = QtCore.pyqtSignal()
    trajectoryLoaded = QtCore.pyqtSignal(int)
    trajectoryUnloaded = QtCore.pyqtSignal()
    closed = QtCore.pyqtSignal()
    createPlot = QtCore.pyqtSignal(Enum)
    atomsSelectedToggled = QtCore.pyqtSignal(bool)
    multiAtomsSelectedToggled = QtCore.pyqtSignal(bool)
    # NOTE(review): presumably the trajectory entry plus one reference
    # entry - confirm against the code that reads this constant.
    MAX_ALLOWED_ENTRIES_IN_WORKSPACE = 2
def __init__(self, trj_hub=None, matsci_profile=False, parent=None):
    """
    Sets up all the tool buttons in the toolbar.

    See parent QtWidgets.QWidget for documentation.

    :param trj_hub: The TrajectoryViewerHub which provides interaction with
        maestro.
    :type trj_hub: maestro_ui.TrajectoryViewerHub

    :param matsci_profile: A flag indicating if it is matsci profile. It is
        used to configure default setting in the profile.
    :type matsci_profile: bool

    :param parent: Parent widget, forwarded to the base class.
    """
    super(TrajectoryPlayer, self).__init__(parent)

    # Trajectory Plot Panel initialization and setup
    self.trajectory_plot_panel = (
        traj_plot_gui.TrajAnalysisPlotPanel.getPanelInstance())
    self.trajectory_plot_panel.displayFrameAndAsl.connect(
        self.displayPlotSelection)
    self.trajectory_plot_panel.displayAsl.connect(self.displayPlotSelection)
    self.createPlot.connect(self.trajectory_plot_panel.createPlot)
    self.trajectoryLoaded.connect(
        self.trajectory_plot_panel.trajectoryChanged)
    self.currentFrameChanged.connect(
        self.trajectory_plot_panel.onCurrentFrameChanged)

    hub = maestro_ui.WorkspaceHub.instance()
    hub.atomSelectionChanged.connect(self._onAtomSelectChanged)

    self.matsci_profile = matsci_profile
    self.trj_hub = trj_hub
    self.setObjectName("trj_player_toolbar_form")
    base_layout = QtWidgets.QHBoxLayout(self)
    base_layout.setContentsMargins(7, 0, 0, 0)
    self._trj_maestro_settings = None
    self.setEntryTraj(None)
    self.setSnapshotPanelVisibility(False)
    self.is_snapshot_mode_active = False
    self.all_included = None

    # Construct dual state player buttons. Each button is stored on self
    # under its name, its action under "<name>_action".
    btn_info = ('start_or_step_back', 'play_or_pause', 'end_or_step_forward')
    for btn_name in btn_info:
        setattr(self, btn_name, QtWidgets.QToolButton())
        btn = getattr(self, btn_name)
        btn.setCheckable(True)
        btn_action = QtWidgets.QAction()
        setattr(self, btn_name + "_action", btn_action)
        btn_action.setCheckable(True)
        btn.setDefaultAction(btn_action)
        base_layout.addWidget(btn)
        btn.setObjectName('trj_' + btn_name)
    # NOTE(review): spacing assumed to follow the whole button group rather
    # than each button - confirm against the original layout.
    base_layout.addSpacing(5)

    # Vertical line
    self.addVerticalLine(base_layout)
    base_layout.addSpacing(5)

    # Current frame label
    self.current_frame_label = QtWidgets.QLabel("Current\nFrame:")
    base_layout.addWidget(self.current_frame_label)
    self.current_frame_label.setObjectName("trj_current_frame_label")
    base_layout.addSpacing(5)

    # Current frame edit box
    self.current_frame = maestro_ui.MM_QLineEdit(self)
    base_layout.addWidget(self.current_frame)
    self.current_frame.setObjectName("trj_current_frame")
    current_frame_tooltip = textwrap.dedent("""
    Click inside to edit the current frame value
    Press Enter or click out when done
    """)
    self.current_frame.setToolTip(current_frame_tooltip)
    self.frame_validator = QtGui.QIntValidator(self)
    self.current_frame.setValidator(self.frame_validator)
    # NOTE(review): empty_width is never read. The call is kept in case
    # minimumSizeHint() is relied on for its side effects before the font
    # metrics are queried - confirm before removing.
    empty_width = self.current_frame.minimumSizeHint().width()
    # Make the edit box wide enough for about five average characters.
    min_width = self.current_frame.fontMetrics().averageCharWidth() * 5
    self.current_frame.setMinimumWidth(min_width)
    self.current_frame.setSizePolicy(QtWidgets.QSizePolicy.Fixed,
                                     QtWidgets.QSizePolicy.Fixed)
    base_layout.addSpacing(5)

    # Total frame label
    self.total_frame_label = QtWidgets.QLabel("of X")
    base_layout.addWidget(self.total_frame_label)
    self.total_frame_label.setObjectName("trj_total_frame")
    base_layout.addSpacing(5)

    # Vertical box layout to place slider and current frame, current elapsed
    # time and current frame state data.
    slider_vertical_box = QtWidgets.QVBoxLayout()
    slider_vertical_box.setContentsMargins(0, 0, 0, 0)
    base_layout.addLayout(slider_vertical_box)

    # Add Slider
    slider_vertical_box.addSpacing(10)
    self.frame_slider = maestro_ui.TripleMarkerSlider(self)
    slider_vertical_box.addWidget(self.frame_slider)
    self.frame_slider.setObjectName("trj_frame_slider")
    self.frame_slider.updateToolTipText.connect(
        self.updateFrameSliderToolTip)
    self.frame_slider.setSizePolicy(QtWidgets.QSizePolicy.Minimum,
                                    QtWidgets.QSizePolicy.Fixed)
    base_layout.setStretchFactor(slider_vertical_box, 2)

    # Slider elapsed time (two types of value depending on if playing or
    # dragging)
    slider_time_hbox = QtWidgets.QHBoxLayout()
    slider_time_hbox.setContentsMargins(0, 0, 0, 0)
    slider_vertical_box.addLayout(slider_time_hbox)
    self.slider_time = QtWidgets.QLabel("0 of 5 ns")
    slider_time_hbox.addWidget(self.slider_time, 5, QtCore.Qt.AlignCenter)
    self.slider_time.setObjectName("trj_slider_time")
    base_layout.addSpacing(5)

    # Add vertical line separator
    self.addVerticalLine(base_layout)
    base_layout.addSpacing(5)

    # Add menu button hbox (displayed in default state)
    self.menu_button_hbox = QtWidgets.QHBoxLayout()
    self.menu_button_hbox.setContentsMargins(0, 0, 0, 0)
    base_layout.addLayout(self.menu_button_hbox)
    base_layout.addSpacing(5)

    # Add Export and playback settings tool button with menu.
    btn_info = (('export_button', "Open Export menu"),
                ('playback_settings_button', "Show Playback Settings pane"),
                ('interactive_plots_button', 'Open plots menu'))
    for name, tooltip in btn_info:
        setattr(self, name, QtWidgets.QToolButton(self))
        btn = getattr(self, name)
        btn.setObjectName('trj_' + name)
        btn.setPopupMode(QtWidgets.QToolButton.InstantPopup)
        self.menu_button_hbox.addWidget(btn, 2, QtCore.Qt.AlignLeft)
        btn.setToolTip(tooltip)

    # Hook up the interactive plots panel to its button
    self.setupPlotMenu(self.interactive_plots_button)

    self._movie_saver = MovieSaver(self)
    self._structure_exporter = frame_structure_exporter.FrameStructureExporter(
        player_obj=self,
        export_mode=ExportMode.TRAJECTORY_VIEWER,
        parent=self)

    # Setup export menu.
    self.setupExportMenu(self.export_button)
    base_layout.addStretch(1)
    self.playback_settings_button.setCheckable(True)

    self.close_button = QtWidgets.QPushButton("X")
    self.close_button.setObjectName("trj_player_close")
    # Alignment flags are combined with bitwise OR; a boolean `or` would
    # evaluate to AlignTop alone and silently drop AlignRight.
    base_layout.addWidget(self.close_button, 0,
                          QtCore.Qt.AlignTop | QtCore.Qt.AlignRight)
    self.close_button.clicked.connect(self.closePlayerIfAllowed)

    self.setStyleSheet(stylesheet.PLAYER_TOOLBAR_STYLESHEET)

    self.frame_slider.valuesChanged.connect(self.sliderValuesChanged)
    self.current_frame.returnPressed.connect(self.currentFrameValueChanged)
    self.current_frame.lostFocus.connect(self.ensureValidCurrentFrame)
    # Both slots listen to toggled(); they run in connection order.
    self.play_or_pause.toggled.connect(self.playOrPauseButtonClicked)
    self.play_or_pause.toggled.connect(self.playOrPauseButtonToggled)
    self.start_or_step_back.clicked.connect(
        self.startOrStepBackButtonClicked)
    self.end_or_step_forward.clicked.connect(
        self.endOrStepForwardButtonClicked)
    self.current_frame.clicked.connect(self.pausePlayer)
    self.frame_slider.mouseDragged.connect(self.pausePlayer)
    self.frame_slider.mouseReleased.connect(self.resumePlayer)

    self.frame_timer = QtCore.QTimer(self)
    self.frame_timer.setSingleShot(True)
    self.frame_timer.timeout.connect(self._showNextFrame)

    # Initialize all variables.
    self.clearTrajectory()
    if maestro:
        maestro.project_close_callback_add(self.projectAboutToClose)
    self.setVisible(False)
    self.playback_settings_popup = playback_settings.PlaybackSettings(
        self.matsci_profile, self.playback_settings_button)

    # Refer to MAE-43922 why we have single shot timer for inclusion change.
    self.inclusion_update_timer = QtCore.QTimer()
    self.inclusion_update_timer.timeout.connect(self._handleInclusionUpdate)
    self.inclusion_update_timer.setInterval(10)
    self.inclusion_update_timer.setSingleShot(True)
    self.setupContextMenu()
def projectAboutToClose(self):
    """
    Handle trajectory entry when project is about to be closed: leave
    snapshot mode and restore the default state of the entry ct. Nothing
    happens when no trajectory entry is set.
    """
    if not self.entry_traj:
        return
    self.is_snapshot_mode_active = False
    self._restoreDefaultStateOfEntryCt(False)
def closePlayerIfAllowed(self):
    """
    Called when user clicks on X button.

    If player is saving a movie, the user is warned before the player is
    closed. Otherwise the player is hidden, clients are notified and the
    default entry ct is restored.
    """
    # Stop frame timer if player was active.
    if self.player_active:
        self.frame_timer.stop()

    should_close = not self.savingMovie()
    if self.savingMovie():
        answer = self.showMovieInterruptionDialog(MovieInterruptionType.CLOSED)
        should_close = answer != MovieInterruptionDialogRetCode.RESUME

    if not should_close:
        return

    self.hide()
    pbc_manager = self.trj_hub.getPBCMeasurementManager()
    pbc_manager.setPBCMeasurementActive(False)
    self.closed.emit()

    # The entry ct is restored here only if snapshot mode is not active,
    # otherwise the snapshot panel takes care of it.
    if self.entry_traj and (not self.is_snapshot_mode_active):
        self.restoreEntryCT(eid=self.entry_traj.eid, sync_workspace=True)

    # The trajectory must not be cleared while the snapshot panel is still
    # visible, because the underlying (memory intensive) trajectory is
    # shared between player and snapshot panel.
    if not self.is_snapshot_panel_visible:
        self.clearTrajectory()
def setEntryTraj(self, entry_traj=None):
    """
    Set the EntryTrajectory for this panel and share it with the
    trajectory plot panel.

    :param entry_traj: EntryTrajectory to be set.
    :type entry_traj: EntryTrajectory or None
    """
    self.entry_traj = entry_traj
    plot_panel = self.trajectory_plot_panel
    plot_panel.entry_traj = entry_traj
def showMovieInterruptionDialog(self, interruption_type):
    """
    Show movie interruption warning dialog and act on the user's choice.

    - Save: keep only the frames saved so far.
    - Resume: continue saving all frames.
    - Anything else (Cancel): discard all saved frames.

    :type interruption_type: enum(MovieInterruptionType)
    :param interruption_type: Interruption source button.

    :rtype: enum(MovieInterruptionDialogRetCode)
    :return: Button pressed by user.
    """
    dialog = MovieExportInterruptionDialog(self, interruption_type)
    answer = dialog.exec_()
    if answer == MovieInterruptionDialogRetCode.PARTIAL_SAVE:
        keep_saved_frames = True
    elif answer == MovieInterruptionDialogRetCode.RESUME:
        self.player_active = True
        return answer
    else:
        keep_saved_frames = False
    self._movie_saver.stopSavingMovie(ok_status=keep_saved_frames)
    return answer
def restoreEntryCT(self, eid: int, sync_workspace: bool):
    """
    Ask the current trajectory entry to restore its original ct in the
    project entry, because atom positions or visibility might have changed
    during play. Nothing happens when no trajectory entry is set.

    :param eid: Entry id associated with current project.
    :param sync_workspace: Whether or not synchronize changes in the
        workspace.
    """
    trajectory = self.entry_traj
    if not trajectory:
        return
    trajectory.restoreEntryCT(eid, sync_workspace,
                              self.is_snapshot_mode_active)
def pausePlayer(self):
    """
    Pause the player and remember that it was playing.

    Slot of maestro_ui.MM_QLineEdit.clicked() signal caused by current
    frame text edit box, and of
    maestro_ui.TripleMarkerSlider.mouseDragged() signal caused by slider
    drag. Does nothing when the player is not active.
    """
    if not self.player_active:
        return
    self.player_was_paused = True
    self.player_active = False
def resumePlayer(self):
    """
    Resume playing if the player was paused, otherwise show the desired
    frame immediately.

    Used when:
    1. The player was paused due to current frame text edit box.
    2. maestro_ui.TripleMarkerSlider.mouseReleased() is emitted because a
       slider drag stopped.
    3. Advance Setting dialog is displayed.
    """
    # While the current frame edit box still has focus, closing maestro or
    # the project first triggers the project close callback (which clears
    # the trajectory data) and only then this method, because the edit box
    # loses focus. There is no trajectory left in that case, so bail out.
    if not self.entry_traj:
        return

    if not self.player_was_paused:
        # Show desired frame immediately.
        self.updateCurrentFrame()
        return

    self.player_active = True
    self.player_was_paused = False
def playOrPauseButtonClicked(self, checked):
    """
    Rewind the player when the user clicks on play, the loop option is
    NONE and the next frame is not a valid frame.

    :param checked: Checked state of the play button.
    """
    if not (checked and self.loop_option == Loop.NONE):
        return

    upcoming_frame = self.getNextFrameNumber(self.play_direction,
                                             self.current_frame_number)
    if self.isValidFrame(upcoming_frame):
        return

    # Nothing left to play in this direction: jump back to where playback
    # in that direction begins.
    if self.play_direction == Direction.FORWARD:
        self.current_value = self.getStartFrame()
    else:
        self.current_value = self.getEndFrame()
def updatePlayerTooltip(self):
    """
    Refresh the player button tooltips, which depend on the player state
    (and, for the play button, on the play direction).
    """
    if self.player_active:
        back_tip = "Jump to start of trajectory"
        forward_tip = "Jump to end of trajectory"
        play_tip = "Pause trajectory playback"
    else:
        back_tip = "Move one step back"
        forward_tip = "Move one step forward"
        if self.play_direction == Direction.FORWARD:
            play_tip = "Play trajectory"
        else:
            play_tip = "Play trajectory in reverse"

    self.start_or_step_back.setToolTip(back_tip)
    self.end_or_step_forward.setToolTip(forward_tip)
    self.play_or_pause.setToolTip(play_tip)
def playOrPauseButtonToggled(self):
    """
    React to the play button being toggled: notify the trajectory hub,
    then either start playing or stop the frame timer and update the
    player.
    """
    # Notify trajectory hub that player state has changed.
    self.trj_hub.trajectoryPlayerStateChanged.emit(self.player_active)

    if not self.player_active:
        self.frame_timer.stop()
        if self.savingMovie():
            self.showMovieInterruptionDialog(MovieInterruptionType.PAUSED)
        self.updatePlayer()
        return

    # The user requested to play the trajectory. The first frame should be
    # displayed immediately instead of waiting until the timer expires.
    self.updateCurrentFrame()
def startOrStepBackButtonClicked(self):
    """
    Jump to start while playing; otherwise step back by the step size, or
    jump to start when such a step would leave the valid frames.
    """
    jump_to_start = self.player_active or not self.isValidFrame(
        self.current_value - self.step_size)
    if jump_to_start:
        self.current_value = self.getStartFrame()
    else:
        self.current_value = self.current_value - self.step_size

    # Show desired frame immediately.
    self.updateCurrentFrame()
def updateCurrentFrame(self):
    """
    1. Show current frame in the workspace.
    2. Update player gui.

    When the frame cannot be shown, clearing of the trajectory is
    scheduled and False is returned.

    :rtype: bool
    :return: Whether the current frame is updated.
    """
    workspace_ct = maestro.workspace_get(copy=False)
    self.frame_timer.stop()

    frame_shown = self.updateStructureFromFrame(self.current_frame_number,
                                                workspace_ct,
                                                notify_maestro=True)
    if not frame_shown:
        # Use single shot timer to call clearTrajectory() to avoid issue
        # with disabling play_or_pause button
        QtCore.QTimer.singleShot(0, self.clearTrajectory)
        return False

    self.updatePlayer()
    if self.player_active:
        # Re-arm the timer so that the next frame follows while playing.
        self.frame_timer.start()
    return True
def loadEntryID(self, eid):
    """
    Loads an entry ID, displaying a message if the trajectory is not found

    The user is asked before the trajectory is loaded, and is offered to
    delete the plots of an entry whose trajectory cannot be found.

    :param eid: Entry ID of the requested trajectory
    :type eid: int

    :rtype: bool
    :return: Whether the entry ID was found and included
    """
    current = self.entry_traj
    if not current:
        return False
    if current.eid == eid:
        # The requested trajectory is the one already loaded.
        return True

    project_table = maestro.project_table_get()
    if not project_table.getRow(eid):
        if messagebox.show_question(self,
                                    REQUESTED_TRAJ_NOT_FOUND_MSG,
                                    title="Requested Trajectory Not Found"):
            self.trajectory_plot_panel.deletePlotsForEntry(eid)
        return False

    wants_load = messagebox.show_question(
        self,
        LOAD_TRAJ_MSG,
        "Requested Trajectory Not Loaded",
        icon=messagebox.MessageBox.Warning)
    if not wants_load:
        return False

    candidate = EntryTrajectory(self.proj,
                                eid,
                                matsci_profile=self.matsci_profile)
    if not candidate.isValid():
        if messagebox.show_question(self,
                                    REQUESTED_TRAJ_NOT_FOUND_MSG,
                                    title="Requested Trajectory Not Found",
                                    icon=messagebox.MessageBox.Warning):
            self.trajectory_plot_panel.deletePlotsForEntry(eid)
        return False

    # Updating inclusion to this new trajectory will load it.
    project_table.includeRows([eid])
    return True
def displayPlotSelection(self, fit_asl, eid, frame_idx=None):
    """
    Loads the given entry ID, updates the current frame, and display an
    asl selection

    :param fit_asl: ASL to fit Workspace to after loading frame
    :type fit_asl: str

    :param eid: Entry ID of the requested trajectory
    :type eid: int

    :param frame_idx: Frame to be loaded with 1-based indexing. If None, no
        frame is loaded
    :type frame_idx: int or None
    """
    if not self.loadEntryID(eid):
        return

    if frame_idx is not None:
        self.current_frame_number = frame_idx
        self.frame_slider.setMiddleValue(frame_idx)
        self.updateCurrentFrame()

    if not fit_asl:
        return

    # Explicitly optimize fog until completion of MAE-45427
    cmd = (f'fit {fit_asl}\n'
           f'workspaceselectionreplace {fit_asl}\n'
           f'optimizefog')
    maestro.command(cmd)
def _togglePlotVisibleInteractions(self, use_visible):
    """
    Toggle whether interactions plots should be generated using only
    visible Workspace atoms.

    The flag is stored on the trajectory plot panel.

    :param use_visible: Whether only visible atoms should be considered.
    :type use_visible: bool
    """
    self.trajectory_plot_panel.interactions_use_visible_atoms = use_visible
def endOrStepForwardButtonClicked(self):
    """
    Jump to the end frame while playing; otherwise advance by one step,
    falling back to the end frame when the step would leave the valid
    range. The resulting frame is shown immediately.
    """
    target_frame = self.getEndFrame()
    if not self.player_active:
        stepped_frame = self.current_value + self.step_size
        if self.isValidFrame(stepped_frame):
            target_frame = stepped_frame
    self.current_value = target_frame

    # Show desired frame immediately.
    self.updateCurrentFrame()
def updatePlayerBackAndForwardButtons(self, player_active, current_value):
    """
    Update the checked and enabled state of the back and forward buttons.

    While playing, the buttons jump to the start/end frame and are enabled
    when the current value is strictly inside that bound. While paused,
    they step by the step size and are enabled when the stepped frame is
    valid. Neither button is ever enabled while a movie is being saved.

    :param player_active: Whether the player is running.
    :type player_active: bool

    :param current_value: Current frame value of the slider.
    :type current_value: int
    """
    self.end_or_step_forward.setChecked(player_active)
    self.start_or_step_back.setChecked(player_active)

    if player_active:
        can_go_back = current_value > self.getStartFrame()
        can_go_forward = current_value < self.getEndFrame()
    else:
        can_go_back = self.isValidFrame(current_value - self.step_size)
        can_go_forward = self.isValidFrame(current_value + self.step_size)

    self.start_or_step_back.setEnabled(can_go_back and
                                       (not self.savingMovie()))
    self.end_or_step_forward.setEnabled(can_go_forward and
                                        (not self.savingMovie()))
def savingMovie(self):
    """
    Report whether the movie saver is currently saving a movie.

    :rtype: bool
    :return: True if saving a movie.
    """
    movie_saver = self._movie_saver
    return movie_saver.savingMovie()
def ensureValidCurrentFrame(self):
    """
    Make sure a valid value is shown in the current frame text box.

    A typed value outside the slider's min/max range is replaced by the
    slider's current value; a valid value that differs from the slider is
    pushed to the slider.
    """
    typed_frame = self.current_frame_number
    in_range = self.current_min_value <= typed_frame <= self.current_max_value
    if not in_range:
        self.current_frame_number = self.current_value
    elif typed_frame != self.current_value:
        self.current_value = typed_frame
    # NOTE(review): resuming the player is assumed to happen in every case,
    # mirroring currentFrameValueChanged - confirm the original indentation.
    self.resumePlayer()
def sliderValuesChanged(self):
    """
    Slot which gets called whenever any slider point value changes.

    Updates the current frame text box and its validator range, mirrors the
    slider's start/end/current frames into the playback settings data,
    emits ``currentFrameChanged``, and refreshes the player.

    The playback settings popup may be missing (the ``current_value``
    setter guards against that case as well), so every access to it is
    guarded here instead of only the ``resetMaxStepSize()`` call.
    """
    self.current_frame_number = self.current_value
    self.frame_validator.setRange(self.current_min_value,
                                  self.current_max_value)

    popup = self.playback_settings_popup
    if popup:
        data = popup.data
        min_max_change = (data.start_frame != self.current_min_value or
                          data.end_frame != self.current_max_value)
        data.start_frame = self.current_min_value
        data.end_frame = self.current_max_value
        if min_max_change:
            # The allowed step size depends on the selected frame range.
            popup.resetMaxStepSize()
        data.current_frame = self.current_value

    self.currentFrameChanged.emit(self.current_frame_number)
    self.updatePlayer()
def updatePlayer(self):
    """
    Update all components of the player toolbar: time labels, slider time
    style, back/forward buttons, export button, simulation box and
    tooltip.
    """
    self.updateTotalTime()
    self.updateElapsedTime()

    if self.player_active:
        time_style = stylesheet.SLIDER_TIME_STYLE_BRIGHT
    else:
        time_style = stylesheet.SLIDER_TIME_STYLE_NORMAL
    self.slider_time.setStyleSheet(time_style)

    self.updatePlayerBackAndForwardButtons(self.player_active,
                                           self.current_value)

    # Export is available only when the player is neither active nor
    # saving a movie.
    self.export_button.setEnabled(
        not (self.player_active or self.savingMovie()))

    # Update simulation box if player is not active because automatic
    # update happens during play.
    if not self.player_active:
        self.updateSimulationBox()

    # Update tooltip based on player state.
    self.updatePlayerTooltip()
def getTimeValueInRange(self, start_frame: int, end_frame: int):
    """
    Compute total time for a given range.

    The time of a range is the sum of the per-frame time increments
    ``getFrameTime(i) - getFrameTime(i - 1)`` for every frame ``i`` in
    ``[start_frame, end_frame]``. That sum telescopes, so it is computed
    from the two boundary frame times instead of visiting every frame.

    :param start_frame: Starting frame number.
    :param end_frame: Ending frame number.

    :return: Total time in the given range, or 0 if there is no trajectory,
        either frame number is invalid, or the range is empty.
    :rtype: float
    """
    if not (self.entry_traj and self.isValidFrame(start_frame) and
            self.isValidFrame(end_frame)):
        return 0
    if end_frame < start_frame:
        # An empty range contributes no time.
        return 0
    return (self.entry_traj.getFrameTime(end_frame) -
            self.entry_traj.getFrameTime(start_frame - 1))
def updateTotalTime(self):
    """
    Update the total time from the slider's start and end frames.
    """
    first_frame = self.getStartFrame()
    last_frame = self.getEndFrame()
    self.total_time = self.getTimeValueInRange(first_frame, last_frame)
def updateElapsedTime(self):
    """
    Update the elapsed time from the slider's start frame and current
    value.
    """
    first_frame = self.getStartFrame()
    shown_frame = self.current_value
    self.elapsed_time = self.getTimeValueInRange(first_frame, shown_frame)
def currentFrameValueChanged(self):
    """
    Slot which gets called whenever the current frame text changes.

    Pushes the typed frame number to the slider's middle point and resumes
    the player.
    """
    typed_frame = self.current_frame_number
    self.current_value = typed_frame
    self.resumePlayer()
@property
def current_min_value(self):
    """
    Slider's left point value, as an int.
    """
    return int(self.frame_slider.leftValue())


@current_min_value.setter
def current_min_value(self, value):
    """
    Set slider's left point value.
    """
    self.frame_slider.setLeftValue(value)


@property
def current_max_value(self):
    """
    Slider's right point value, as an int.
    """
    return int(self.frame_slider.rightValue())


@current_max_value.setter
def current_max_value(self, value):
    """
    Set slider's right point value.
    """
    self.frame_slider.setRightValue(value)


@property
def current_value(self):
    """
    Slider's middle point value, as an int.
    """
    return int(self.frame_slider.middleValue())


@current_value.setter
def current_value(self, value):
    """
    Set slider's middle point value, mirror it into the frame text box and
    the playback settings data, and emit ``currentFrameChanged``.
    """
    self.frame_slider.setMiddleValue(value)
    self.current_frame_number = value
    # Current value is reset when clearing a trajectory, i.e. deleting a
    # trajectory object, so there may be no playback settings object.
    popup = self.playback_settings_popup
    if popup:
        popup.data.current_frame = value
    self.currentFrameChanged.emit(self.current_frame_number)


@property
def current_frame_number(self):
    """
    Frame number typed in the current frame text box; 0 when it is empty.
    """
    frame_text = self.current_frame.text()
    if not frame_text:
        return 0
    return int(frame_text)


@current_frame_number.setter
def current_frame_number(self, frame_number):
    """
    Set current frame number to be displayed.
    """
    self.current_frame.setText(str(frame_number))


@property
def total_frame(self):
    """
    Total number of frames in the trajectory.
    """
    return self._total_frame


@total_frame.setter
def total_frame(self, value):
    """
    Store the total frame count and show it in the total frame label.
    """
    self.total_frame_label.setText(f"of {value!s}")
    self._total_frame = value


@property
def total_time(self):
    """
    Total time to view the trajectory.
    """
    return self._total_time


def formattedValue(self, value):
    """
    Return the value formatted as a string with 2 decimal places.
    """
    return f"{value:.2f}"


@total_time.setter
def total_time(self, value):
    """
    Store the total time and show it in the slider time label with the
    elapsed part reset to 0.
    """
    self.slider_time.setText(f"0 of {self.formattedValue(value)} ns")
    self._total_time = value


@property
def elapsed_time(self):
    """
    Elapsed time of the trajectory view.
    """
    return self._elapsed_time


@elapsed_time.setter
def elapsed_time(self, value):
    """
    Store the elapsed time and show "<elapsed> of <total> ns" in the slider
    time label.
    """
    elapsed_text = self.formattedValue(value)
    total_text = self.formattedValue(self.total_time)
    self.slider_time.setText(f"{elapsed_text} of {total_text} ns")
    self._elapsed_time = value


@property
def player_active(self):
    """
    True if the player is running (play/pause button is checked).
    """
    return self.play_or_pause.isChecked()


@player_active.setter
def player_active(self, state):
    """
    Set player active or inactive state.
    """
    self.play_or_pause.setChecked(state)
def updateSimulationBox(self):
    """
    Notify maestro to show/hide the simulation box and, when it is visible,
    to draw it using the current frame's box points.

    TODO: MultipleTrajectory - We need to update all trajectories
    simulation box.
    """
    hub = self.trj_hub
    if hub is None:
        return

    hub.setDisplaySimulationBox.emit(self.simbox_visible)

    # Nothing to draw without a trajectory or with the box hidden.
    if self.entry_traj is None or not self.simbox_visible:
        return

    box_points = self.entry_traj.getBoxPoints(self.current_frame_number)
    if not box_points:
        return
    hub.setSimulationBox.emit(
        maestro_ui.MM_GraphicsParallelepiped(box_points),
        self.playback_settings_popup.data.include_vector_lengths)
def getStartAtom(self, entry_traj: EntryTrajectory):
    """
    Get the starting atom of this trajectory entry in the workspace ct.

    :param entry_traj: Trajectory object whose first atom is looked up in
        the workspace structure. It is currently not consulted.

    :rtype: int
    :return: First atom in the workspace structure associated with this
        trajectory entry; always 1 for now.

    Note: This will change for multiple trajectories.
    """
    first_atom_index = 1
    return first_atom_index
@property
def simbox_visible(self):
    """
    True if the simulation box is visible.
    """
    return self._simbox_visible


@simbox_visible.setter
def simbox_visible(self, value):
    """
    Set the simulation box visibility flag and notify maestro.
    """
    self._simbox_visible = value
    self.updateSimulationBox()


def _showNextFrame(self):
    """
    Calculate the next frame number according to step, direction and loop
    options and display that frame in the workspace.

    When maestro asks to stop playing, or no valid next frame exists, the
    player is stopped (finishing a movie that is being saved) and the
    toolbar is refreshed.
    """
    # Stop frame timer and setup current frame for maestro
    # to display in the workspace.
    self.frame_timer.stop()

    if self.trj_hub.continueTrajectoryPlay():
        next_frame = self.takeTrajectoryStep(self.current_frame_number)
        # Only a valid next frame is shown.
        if self.isValidFrame(next_frame):
            self.current_value = next_frame
            self.updateCurrentFrame()
            return

    # Stop player if we reached the end.
    if self.savingMovie():
        self._movie_saver.stopSavingMovie(ok_status=True)
    self.player_active = False
    self.updatePlayer()
def setDisplayAtomsASL(self, unused, forced_update=True):
    """
    Store the state of the matching-atoms ASL.

    The ASL acts as a display-only ASL if Playback Settings -> Advanced tab
    -> Display Only is checked, otherwise it specifies the atoms to hide.

    :param unused: It is not used.
    :type unused: bool

    :type forced_update: bool
    :param forced_update: Update current frame in workspace if player is
        not active.
    """
    popup = self.playback_settings_popup
    asl = (popup.getDisplayAtomsAsl()
           if popup.data.display_only else popup.getHideAtomsAsl())

    self._display_atoms_asl_changed = True
    self._display_atoms_asl_is_valid = False
    # Any ASL is treated as dynamic unless the mmasl_is_dynamic_expression
    # API check below says otherwise.
    self._display_atoms_asl_is_dynamic = asl is not None
    if asl:
        asl_is_valid = analyze.validate_asl(asl)
        self._display_atoms_asl_is_valid = asl_is_valid
        self._display_atoms_asl_is_dynamic = (
            asl_is_valid and mm.mmasl_is_dynamic_expression(asl) == mm.TRUE)

    # If player is not active and user modified asl, then apply it
    # immediately.
    if forced_update and not self.player_active:
        self.updateCurrentFrame()
def canAcceptVisibilityChange(self, all_included):
    """
    Check whether a visibility change can be accepted.

    A change is rejected when there is no trajectory, the trajectory has no
    entry id, visibility changes are currently being ignored, or the
    trajectory entry is not among the included entries.

    :param all_included: Currently included entries list. None skips the
        inclusion check.
    :type all_included: list

    :return: True if visibility change can be accepted
    :rtype: bool
    """
    traj = self.entry_traj
    if traj is None:
        return False

    eid = traj.eid
    if eid is None or self._ignore_visibility_change:
        return False

    return all_included is None or eid in all_included
def visibilityChanged(self, displayed, undisplayed):
    """
    Update saved atoms visibility according to the new state.

    Changes are forwarded to the entry trajectory only when
    ``canAcceptVisibilityChange`` accepts them for the currently included
    entries, i.e. changes made by the player toolbar itself are ignored.

    :type displayed: list(int)
    :param displayed: List of atoms which are displayed.

    :type undisplayed: list(int)
    :param undisplayed: List of atoms which are undisplayed.
    """
    if not self.canAcceptVisibilityChange(self.all_included):
        return

    self.entry_traj.visibilityChanged(displayed, undisplayed,
                                      self.getStartAtom(self.entry_traj))
def workspaceEntryStructureChanged(self, atoms_bs, change_type):
    """
    Slot to handle workspace entry structure changes.

    The change (color, ribbon setting, label setting, representation) is
    transferred to the saved original entry structure. Nothing is
    propagated when there is no trajectory, or while snapshot mode is
    active because those changes are transient.

    :param atoms_bs: The MMbs handle of atoms which are affected by this
        change.
    :type atoms_bs: MMbs handle

    :param change_type: Type of workspace change (color, label, or ribbon)
    :type change_type: maestro_ui.MMENUM_DGO_CHANGED_TYPE
    """
    if self.entry_traj is None:
        return
    if self.is_snapshot_mode_active:
        return

    workspace_st, _entry_total = get_workspace_structure_and_entry_total()
    self.entry_traj.updateTrajectoryEntryStructure(workspace_st, atoms_bs,
                                                   change_type)
def applyDisplayAtomsASLOnCT(self, entry_traj: EntryTrajectory,
                             frame_ct: structure.Structure):
    """
    Display or hide atoms in the frame_ct based on the stored atoms ASL.

    The ASL acts as a display-only ASL if Playback Settings -> Advanced tab
    -> Display Only is checked, otherwise it specifies the atoms to hide.

    :param entry_traj: Entry trajectory to be used to update visibility.
    :param frame_ct: The ct to be updated.
    """
    popup = self.playback_settings_popup
    display_only = popup.data.display_only
    asl = (popup.getDisplayAtomsAsl()
           if display_only else popup.getHideAtomsAsl())
    # The "visible" argument always equals the display-only flag.
    entry_traj.applyVisibilityAsl(frame_ct, asl, display_only, display_only)
def getOriginalEntryCT(self, copy: bool):
    """
    Return the initial entry ct of the trajectory.

    :param copy: True if caller needs a copy. The copy gets the trajectory
        entry id set on all of its atoms.

    :rtype: structure.Structure
    :return: Copy of initial entry ct or reference to it.
    """
    if not copy:
        return self.entry_traj.orig_entry_ct

    st_copy = self.entry_traj.orig_entry_ct.copy()
    set_structure_atoms_entry_id(st_copy, self.entry_traj.eid, 1,
                                 st_copy.atom_total)
    return st_copy
def isVaryingAtomsFrame(self):
    """
    Report whether the trajectory contains inactive atoms (i.e. it is a
    varying atoms frame trajectory).

    :rtype: bool
    :return:
        - For fixed atoms trajectory, it returns False.
        - For varying atoms trajectory, it returns True.
    """
    traj = self.entry_traj
    return traj.isVaryingAtomsFrame()
def getUpdatedFrameSnapshot(self, frame_number: int,
                            frame_ct: structure.Structure,
                            forced_update: bool, use_display_asl: bool):
    """
    Update frame ct using the frame associated with the given frame number
    and, optionally, update atom visibility based on the player defined
    visibility ASL.

    :param frame_number: Frame number of the frame to be used for the
        update.
    :param frame_ct: Frame structure which will be updated.
    :param forced_update: Forcefully update visibility (it is required when
        the first frame is loaded).
    :param use_display_asl: Determine if display asl should also be applied
        or not.

    :rtype: structure.Structure
    :return: Updated frame structure.
    """
    traj = self.entry_traj
    atoms_before = frame_ct.atom_total
    snapshot_ct = traj.getUpdatedFrameStructure(frame_number=frame_number,
                                                frame_ct=frame_ct,
                                                frames_to_smooth=None)
    atoms_after = snapshot_ct.atom_total

    # Update entry id if new atoms are added in the frame.
    if atoms_after > atoms_before:
        set_structure_atoms_entry_id(snapshot_ct, traj.eid, atoms_before + 1,
                                     atoms_after)

    if use_display_asl:
        # NOTE(review): default visibility is restored on frame_ct while
        # the ASL is applied to the returned structure; these match only
        # if getUpdatedFrameStructure updates frame_ct in place - confirm.
        if self.shouldRestoreVisibility(traj) or forced_update:
            traj.restoreDefaultVisibility(frame_ct)
        if (self.needVisibilityUpdate(traj) or
            (self._display_atoms_asl_is_valid and forced_update)):
            self.applyDisplayAtomsASLOnCT(traj, snapshot_ct)

    return snapshot_ct
def shouldUpdateVisibility(self, entry_traj: EntryTrajectory):
    """
    Check if visibility should be updated or not. Applicable to both
    restoring original visibility and applying ASL based visibility.

    :param entry_traj: Entry trajectory to be used to know if fixed atoms
        frame ct or varying atoms frame ct.

    :rtype: bool
    :return: True if visibility should be updated.
    """
    # A dynamic ASL has to be evaluated every time because distances
    # between atoms can vary from frame to frame.
    asl_is_dynamic = self._display_atoms_asl_is_dynamic
    if asl_is_dynamic:
        return asl_is_dynamic
    # A varying atoms frame has to be restored because new atoms might
    # have been added.
    return entry_traj and entry_traj.isVaryingAtomsFrame()
def shouldRestoreVisibility(self, entry_traj: EntryTrajectory):
    """
    Determine if frame ct visibility should be restored or not.

    :param entry_traj: Entry trajectory to be used to know if fixed atoms
        frame ct or varying atoms frame ct.

    :rtype: bool
    :return: True if visibility should be restored.
    """
    # If asl changed, we have to restore to start from scratch.
    asl_changed = self._display_atoms_asl_changed
    if asl_changed:
        return asl_changed
    return self.shouldUpdateVisibility(entry_traj)
def needVisibilityUpdate(self, entry_traj: EntryTrajectory):
    """
    Determine if frame ct visibility should be updated or not.

    An update is needed only when the stored ASL is valid and
    ``shouldUpdateVisibility`` asks for one.

    :param entry_traj: Entry trajectory to be used to know if fixed atoms
        frame ct or varying atoms frame ct.

    :rtype: bool
    :return: True if visibility should be updated.
    """
    asl_is_valid = self._display_atoms_asl_is_valid
    if not asl_is_valid:
        return asl_is_valid
    return self.shouldUpdateVisibility(entry_traj)
def applyDisplayAtomsASL(self, entry_traj: EntryTrajectory,
                         changed_structure: bool):
    """
    Display or hide atoms in the entry_traj's frame ct based on the stored
    atoms ASL.

    The ASL acts as a display-only ASL if Playback Settings -> Advanced tab
    -> Display Only is checked, otherwise it specifies the atoms to hide.
    Calling this method clears the "ASL changed" flag.

    :param entry_traj: Entry trajectory to be used to update visibility.
    :param changed_structure: Indicates if structure itself is changed or
        not.

    :rtype: bool
    :return: Whether visibility changed, i.e. default visibility was
        restored and/or the ASL was applied.
    """
    # TODO: MultipleTrajectory - We need to iterate over all trajectories
    # and update visibility.
    visibility_changed = False
    if self.shouldRestoreVisibility(entry_traj):
        entry_traj.restoreDefaultVisibility(entry_traj.frame_ct)
        visibility_changed = True

    # Remember the flag for the optimization check below, then consume it.
    display_atoms_asl_changed = self._display_atoms_asl_changed
    self._display_atoms_asl_changed = False

    if not self._display_atoms_asl_is_valid:
        return visibility_changed

    # Visibility does not depend on atom coordinates. If neither the ASL
    # nor the structure changed (except atom coordinates), the set of
    # visible atoms cannot have changed since the last call, so evaluating
    # and applying the ASL for each played frame is skipped.
    if (not self.needVisibilityUpdate(entry_traj) and
        (not changed_structure) and self.player_active and
        (not display_atoms_asl_changed)):
        return visibility_changed

    self.applyDisplayAtomsASLOnCT(entry_traj, entry_traj.frame_ct)
    return True
def updatePBCMeasurement(self, entry_traj, frame_number):
    """
    Update data in maestro_ui.TrajectoryViewerHub, so that maestro
    calculates measurements keeping PBC in consideration.

    :param entry_traj: Entry trajectory providing the replica count, the
        frame box and the active atom total.
    :type entry_traj: EntryTrajectory

    :param frame_number: Frame number to be displayed in workspace.
        Possible values are 1 to self.total_frame
    :type frame_number: int
    """
    # TODO: MultipleTrajectory - We need to update PBC measurement box for
    # all trajectories.

    # Maestro measurements are calculated keeping PBC in mind only if the
    # player is visible, the frame is valid and replica is one. Coerce to
    # bool so the manager never receives a non-bool falsy value such as
    # None.
    honor_pbc = bool(entry_traj.getNumberOfReplica() == 1 and
                     self.isVisible() and self.isValidFrame(frame_number))
    pbc_measurement_manager = self.trj_hub.getPBCMeasurementManager()
    pbc_measurement_manager.setPBCMeasurementActive(honor_pbc)
    if not honor_pbc:
        return

    # Pass the 3x3 box as 9 values in row-major order, so that maestro can
    # compute measurements keeping PBC in mind.
    box = entry_traj.getFrame(frame_number).box
    flat_box = [box[row][col] for row in range(3) for col in range(3)]
    pbc_measurement_manager.updatePBC(
        flat_box, entry_traj.getFrameActiveAtomTotal(frame_number))
def getUpdatedCT(self, entry_traj: EntryTrajectory, frame_number: int,
                 main_ct: structure.Structure, honor_replica: bool):
    """
    Update entry_traj frame ct using given frame number and apply changes
    in the main_ct either by transferring delta change or by copying
    frame ct from scratch.

    :param entry_traj: Entry trajectory object.
    :param frame_number: Frame to be used to update main ct.
    :param main_ct: The main ct which will be updated using entry traj's
        frame.
    :param honor_replica: Whether to honor replica. Export structure does
        not support replica.

    :rtype: tuple(structure.Structure, bool, bool,
        maestro_ui.MMENUM_DGO_CHANGED_TYPE, dict)
    :return: (Modified main_ct or a copy of frame_ct,
        True if original structure is modified,
        True if visibility changed,
        DGO change type (connectivity changed or coordinates changed),
        renumbering dictionary (keys are atom number before deleting and
        value for each is the new atom number or None if that atom was
        deleted)).
    """
    # NOTE(review): the nesting of this method was reconstructed from a
    # source whose indentation was lost; ambiguous spots are marked below.
    old_frame_atom_total = entry_traj.getAtomTotal(entry_traj.frame_ct)
    trajectory_atom_total_in_main_ct = entry_traj.getTrajectoryAtomTotalInWorkspaceStructure(
        main_ct)
    had_replication = is_replica_atom_count(
        trajectory_atom_total_in_main_ct, old_frame_atom_total)
    varying_atoms_frame = entry_traj.isVaryingAtomsFrame()
    num_replica = entry_traj.getNumberOfReplica()

    # Optimization - we need to make a copy of original ct in the frame ct.
    # Switching from replica == 1 to replica > 1 - MAE-42350
    if not varying_atoms_frame and not had_replication and num_replica > 1:
        entry_traj._updateSavedFrameStructure()

    # Get updated frame ct from trajectory.
    frame_ct = entry_traj.updateFrameStructure(frame_number)
    ct_changed = False
    visibility_changed = False
    # We assume we do not have a connectivity change (e.g. a structural
    # change, also the highest level change we have). If we do have a
    # connectivity change, then we will set this flag.
    connectivity_changed = False
    atom_id_map = None
    if frame_ct is not None:
        trajectory_start_atom = entry_traj.trajectory_start_atom_in_workspace_structure
        frame_ct_changed = old_frame_atom_total != frame_ct.atom_total
        self.updateFrameCTTitle(main_ct, frame_number)
        visibility_changed = self.applyDisplayAtomsASL(
            entry_traj, frame_ct_changed)

        # Assign secondary structure
        update_ss = (
            self.playback_settings_popup.data.update_secondary_structure and
            entry_traj.has_secondary_structure)
        if update_ss:
            entry_traj.assignSecondaryStructure()

        if frame_ct_changed:
            # The number of atoms has changed, so we have a connectivity
            # change
            connectivity_changed = True
            main_ct, atom_id_map = self.syncCTAtomsFromFrameAtoms(
                frame_ct, main_ct, old_frame_atom_total, entry_traj.eid,
                trajectory_atom_total_in_main_ct, trajectory_start_atom)
            if main_ct is None:
                # The sync declined to edit main_ct in place, so start
                # over from a copy of the frame ct.
                main_ct = frame_ct.copy()
                #FIXMEREFENTRY - Varying atoms main ct is entirely replaced,
                # so we won't have reference structure entry in the main ct.
                # MAE-43247 should fix it.
                entry_traj.reference_entry_statistics = None
                set_structure_atoms_entry_id(main_ct, entry_traj.eid, 1,
                                             main_ct.atom_total)
            # NOTE(review): assumed to run for every atom count change
            # (the in-place sync also changes the total) rather than only
            # when main_ct was replaced - confirm the original nesting.
            trajectory_atom_total_in_main_ct = self.entry_traj.getTrajectoryAtomTotalInWorkspaceStructure(
                main_ct)

        if honor_replica:
            # Update main ct according to replicas.
            replica_vector = entry_traj.getReplicateVector()
            main_ct, main_ct_changed, imap = self._performReplication(
                frame_ct, main_ct, numpy.prod(replica_vector),
                entry_traj.eid, trajectory_atom_total_in_main_ct,
                trajectory_start_atom)
            if not entry_traj.isVaryingAtomsFrame():
                atom_id_map = imap
            # Optimization - we need to set reference to workspace
            # ct to frame ct, so we don't have to transfer frame change to ct.
            # Switching from replica > 1 to replica == 1 - MAE-42350
            if (had_replication and not varying_atoms_frame and
                    num_replica == 1):
                entry_traj._updateSavedFrameStructure()
                connectivity_changed = True
            else:
                connectivity_changed = connectivity_changed or main_ct_changed
        else:
            # Ignore current replica values, so set explicitly [1,1,1]
            replica_vector = [1, 1, 1]
            main_ct_changed = False

        fr = entry_traj.getFrame(frame_number)
        topo.update_ct_box(main_ct, fr.box)
        # Optimization: If we are not using temporary frame structure, it
        # implies that we are directly updating workspace structure,
        # so there is no need to transfer frame change - MAE-42350
        if entry_traj.usingTemporaryFrameStructure():
            if not connectivity_changed:
                connectivity_changed = not compare_structures_connectivity(
                    frame_ct, main_ct)
            self.transferFrameChangeToCT(frame_ct, main_ct, replica_vector,
                                         update_ss, trajectory_start_atom)
        ct_changed = frame_ct_changed or main_ct_changed

    # Connectivity is the highest level change; otherwise only coordinates
    # are reported as changed.
    if connectivity_changed:
        dgo_change = maestro_ui.MM_DGO_CONNECTIVITY_CHANGED
    else:
        dgo_change = maestro_ui.MM_DGO_COORDINATES_CHANGED
    return (main_ct, ct_changed, visibility_changed, dgo_change,
            atom_id_map)
def syncCTAtomsFromFrameAtoms(self, frame_ct: structure.Structure,
                              main_ct: structure.Structure,
                              old_frame_atom_total: int, eid: int,
                              trajectory_atom_total_in_main_ct: int,
                              trajectory_start_atom: int):
    """
    Adjust the main_ct atoms to the atom count of a new frame ct.

    A varying atoms frame does not always contain the same number of atoms
    as the previous frame, and main_ct represents the previously displayed
    frame atoms. If main_ct contains replica atoms of the previous frame,
    nothing is edited and None is returned instead of a structure, because
    it is simpler for the caller to clone frame_ct than to selectively
    delete/add atoms in the middle of main_ct.

    :param frame_ct: New frame ct to be used for syncing atoms.
    :param main_ct: Main ct to be updated.
    :param old_frame_atom_total: Previously displayed frame atom total.
    :param eid: Entry id to be set when new atoms are added in the main_ct.
    :param trajectory_atom_total_in_main_ct: Total trajectory atoms in the
        workspace structure (including replica and varying atoms).
    :param trajectory_start_atom: Starting position of trajectory atom in
        the workspace structure.

    :rtype: tuple(structure.Structure, dict)
    :return: Modified main ct, unchanged main ct, or None (see above), and
        the renumbering dictionary. Keys are atom number before deleting
        and value for each is the new atom number or None if that atom was
        deleted.
    """
    renumber_map = None
    new_total = frame_ct.atom_total
    current_total = trajectory_atom_total_in_main_ct

    # Nothing to do if main_ct is a replica of frame_ct or already has the
    # same atom total.
    if (is_replica_atom_count(current_total, new_total) or
            current_total == new_total):
        return (main_ct, renumber_map)

    # If main ct contains replica of previous frame ct, do nothing.
    if is_replica_atom_count(current_total, old_frame_atom_total):
        return (None, renumber_map)

    if current_total > new_total:
        # Delete extra atoms from the tail of the trajectory atoms,
        # highest index first.
        last_existing = trajectory_start_atom + current_total - 1
        last_kept = trajectory_start_atom + new_total - 1
        renumber_map = main_ct.deleteAtoms(range(last_existing, last_kept,
                                                 -1),
                                           renumber_map=True)
    elif current_total < new_total:
        # Add the atoms that exist only in the new frame.
        missing_atoms = range(current_total + 1, new_total + 1)
        delta_st = frame_ct.extract(missing_atoms, copy_props=False)
        main_ct.extend(delta_st)
        set_structure_atoms_entry_id(main_ct, eid,
                                     trajectory_start_atom + current_total,
                                     trajectory_start_atom + new_total - 1)

    return (main_ct, renumber_map)
def updateStructureFromFrame(self, frame_number: int,
                             main_ct: structure.Structure,
                             notify_maestro: bool):
    """
    Validate main ct and update main_ct using given trajectory frame. If
    notify_maestro is True, notify maestro to display updated main ct in
    the workspace as well.

    :param frame_number: Frame number to be displayed in workspace.
        Possible values are 1 to self.total_frame
    :param main_ct: Workspace ct to be updated using given frame.
    :param notify_maestro: Whether to notify maestro about workspace
        changes.

    :rtype: bool
    :return: False if the structure is invalid or no updated structure
        could be produced, True otherwise.
    """
    # TODO: MultipleTrajectory
    # Validate structure
    if (self.entry_traj.isFrameStructureChanged(main_ct) and
        (not self._handleInvalidStructures(self.entry_traj))):
        return False

    (main_ct, changed_structure, visibility_changed, dgo_change,
     atom_id_map) = self.getUpdatedCT(self.entry_traj,
                                      frame_number,
                                      main_ct,
                                      honor_replica=True)
    self.updatePBCMeasurement(self.entry_traj, frame_number)

    if main_ct is None:
        # NOTE(review): elsewhere movie saving is stopped through
        # self._movie_saver; presumably this class also exposes
        # stopSavingMovie - confirm.
        if self.savingMovie():
            self.stopSavingMovie(ok_status=False)
        return False

    # Visibility changes caused by the player itself must be ignored. The
    # flag is reset in `finally` so that an exception raised while
    # notifying maestro cannot leave visibility changes ignored forever.
    self._ignore_visibility_change = True
    try:
        if notify_maestro:
            # Update box properties in the project according to current
            # frame, so that workspace drawing can update entry pbcs
            self.entry_traj.updateBoxPropertiesInProject(frame_number)
            self.notifyMaestro(main_ct, changed_structure,
                               visibility_changed, dgo_change, atom_id_map)
    finally:
        self._ignore_visibility_change = False

    # Save movie frame if save movie option is active.
    if self.savingMovie():
        self._movie_saver.saveFrameForMovie()
    return True
def transferFrameChangeToCT(self, frame_ct: structure.Structure,
                            main_ct: structure.Structure,
                            replica_vector: tuple, update_ss: bool,
                            trajectory_start_atom: int):
    """
    Copy frame ct changes - position, secondary structure, and visibility -
    into the main ct. The main ct may not have exactly the same number of
    atoms as the frame ct if replica_vector != (1,1,1).

    :param frame_ct: Frame ct to be used as reference.
    :param main_ct: Workspace ct to be updated using frame_ct.
    :param replica_vector: Replication vector in a,b,c direction.
    :param update_ss: Whether secondary structure properties need to be
        updated.
    :param trajectory_start_atom: Starting position of trajectory atom in
        the workspace structure.

    Note: This function modifies the main_ct.
    """
    # TODO: MultipleTrajectory
    atoms_per_copy = frame_ct.atom_total
    copy_count = numpy.prod(replica_vector)
    first_index = trajectory_start_atom - 1

    # Per-atom visibility (and secondary structure) goes to the matching
    # atom of every replica copy.
    for frame_atom in range(1, atoms_per_copy + 1):
        is_visible = mm.mmctg_atom_get_visible(frame_ct, frame_atom)
        if update_ss:
            sec_struct = mm.mmct_atom_get_secondary_struct(
                frame_ct, frame_atom)
        base_atom = first_index + frame_atom
        for copy_idx in range(copy_count):
            target_atom = base_atom + (atoms_per_copy * copy_idx)
            mm.mmctg_atom_set_visible(main_ct, target_atom, is_visible)
            if update_ss:
                mm.mmct_atom_set_secondary_struct(main_ct, target_atom,
                                                  sec_struct)

    # Update xyz position in the entire ct.
    frame_xyz = frame_ct.getXYZ(copy=False)
    replicated = copy_count > 1
    if self.entry_traj.allow_reference_entry:
        if replicated:
            unroll_pos(main_ct, replica_vector, frame_xyz,
                       trajectory_start_atom)
        else:
            # Overwrite only the trajectory's slice of the coordinates.
            all_xyz = main_ct.getXYZ(copy=False)
            all_xyz[first_index:first_index + atoms_per_copy] = frame_xyz
            main_ct.setXYZ(all_xyz)
    elif replicated:
        utils.unroll_pos(main_ct, replica_vector, frame_xyz)
    else:
        main_ct.setXYZ(frame_xyz)
def notifyMaestro(self, main_ct: structure.Structure,
                  changed_structure: bool, visibility_changed: bool,
                  dgo_change: maestro_ui.MMENUM_DGO_CHANGED_TYPE,
                  atom_id_map: dict):
    """
    Notify maestro about workspace ct changes and update simulation box.

    :param main_ct: Structure to be set as workspace ct if structure was
        changed.
    :param changed_structure: True if st is different than workspace ct.
    :param visibility_changed: True if atoms visibility changed in the st.
    :param dgo_change: Type of dgo change.
    :param atom_id_map: A dictionary of old to new atoms. Keys are atom
        number before deleting and value for each is the new atom number or
        None if that atom was deleted.
    """
    if changed_structure:
        maestro.workspace_set(main_ct,
                              copy=False,
                              regenerate_markers=False,
                              check_scratch_entry=False)

    mmct_imap = get_mmct_id_map_from_dict(atom_id_map)
    try:
        self.trj_hub.allAtomsChanged.emit(main_ct, dgo_change, mmct_imap)
    finally:
        # Release the id map even if a connected slot raises, so the
        # handle is not leaked.
        if mm.mmct_id_map_in_use(mmct_imap):
            mm.mmct_id_map_delete(mmct_imap)

    # A visibility-only change needs its own notification when the
    # workspace structure itself was not replaced.
    if visibility_changed and not changed_structure:
        self.trj_hub.clearAtomsVisibility.emit()
        self.trj_hub.allAtomsChanged.emit(
            main_ct, maestro_ui.MM_DGO_VISIBILITY_CHANGED)

    self.updateSimulationBox()
def updateFrameCTTitle(self, st, frame_number):
    """
    Set the title of the given structure to "frame<frame_number>".

    :type st: structure.Structure
    :param st: Frame structure

    :type frame_number: int
    :param frame_number: Frame number to be displayed in workspace.
    """
    frame_title = f"frame{frame_number}"
    st.title = frame_title
def getStartFrame(self):
    """
    Return the start frame number, i.e. the slider's left point value.
    """
    start_frame = self.current_min_value
    return start_frame
def getEndFrame(self):
    """
    Return the end frame number, i.e. the slider's right point value.
    """
    end_frame = self.current_max_value
    return end_frame
def getNextFrameNumber(self, play_direction, curr_frame):
    """
    Return the next frame number based on player direction and step size.

    :param play_direction: Direction of play; anything other than
        Direction.FORWARD steps backward.
    :param curr_frame: Frame number to step from.

    :return: The stepped frame number, or None if it is not a valid frame.
    """
    step = self.step_size
    if play_direction != Direction.FORWARD:
        step = -step
    candidate = curr_frame + step
    if self.isValidFrame(candidate):
        return candidate
    return None
def takeTrajectoryStep(self, curr_frame):
    """
    Return the next frame number based on player direction, step size, and
    loop option.

    When the plain step leaves the valid range:

    - Loop.NONE: there is no next frame.
    - Loop.OSCILLATE: the play direction is reversed and a step is taken
      in the new direction.
    - otherwise: playback wraps around to the start frame (forward) or
      the end frame (backward).

    :param curr_frame: Frame number to step from.

    :return: Next valid frame number, or None if there is none.
    """
    next_frame = self.getNextFrameNumber(self.play_direction, curr_frame)

    # An invalid frame implies we have reached the end of the forward or
    # backward direction.
    if not self.isValidFrame(next_frame) and self.loop_option != Loop.NONE:
        if self.loop_option == Loop.OSCILLATE:
            # Reverse play direction when oscillating.
            if self.play_direction == Direction.BACKWARD:
                self.play_direction = Direction.FORWARD
            else:
                self.play_direction = Direction.BACKWARD
            next_frame = self.getNextFrameNumber(self.play_direction,
                                                 curr_frame)
        elif self.play_direction == Direction.FORWARD:
            # The previous arithmetic, curr - ((curr - start) / step) *
            # step, is exactly the start frame; use it directly rather
            # than round-tripping through float division and int()
            # truncation.
            # NOTE(review): if a wrap that preserves the stride phase was
            # intended, floor division would be needed - confirm.
            next_frame = self.getStartFrame()
        else:
            next_frame = self.getEndFrame()

    if self.isValidFrame(next_frame):
        return next_frame
    return None
def isValidFrame(self, curr_frame):
    """
    Return True if the given frame number is valid.

    A frame is valid when it is truthy (so None and 0 are invalid) and
    lies between the start and end frames, inclusive.

    :param curr_frame: Frame number to check; may be None.
    :type curr_frame: int or None

    :rtype: bool
    :return: Whether the frame number is valid. Always a real bool, never
        the falsy input itself, so the result is safe to hand to APIs that
        require a boolean.
    """
    start_frame = self.getStartFrame()
    end_frame = self.getEndFrame()
    return bool(curr_frame and start_frame <= curr_frame <= end_frame)
def _performReplication(self, frame_ct: structure.Structure,
                        main_ct: structure.Structure,
                        num_required_copies: int, eid: int,
                        trajectory_atom_total_in_main_ct: int,
                        trajectory_start_atom: int):
    """
    Extend main ct or delete atoms from it based on the number of required
    replicas.

    :param frame_ct: Source ct to be used for replication.
    :param main_ct: Destination ct to be updated.
    :param num_required_copies: Number of replicas to be created.
    :param eid: is the entry id to be set on newly added atoms.
    :param trajectory_atom_total_in_main_ct: Total trajectory atoms in the
        workspace structure (including replica and varying atoms).
    :param trajectory_start_atom: Starting position of trajectory atom in
        the workspace structure.

    :rtype: tuple (structure.Structure, bool, dict)
    :return: Modified main ct and whether atom count changed in the main
        ct, and renumbering dictionary. Keys are atom number before deleting
        and value for each is the new atom number or None if that atom was
        deleted. The dictionary is None unless atoms were deleted.
    """
    atoms_per_copy = frame_ct.atom_total
    existing_copies = trajectory_atom_total_in_main_ct // atoms_per_copy

    # Nothing to do when the workspace already holds the right number.
    if num_required_copies == existing_copies:
        return (main_ct, False, None)

    if num_required_copies < existing_copies:
        # Too many copies: drop the trailing atoms, highest index first.
        surplus_atoms = range(trajectory_atom_total_in_main_ct,
                              atoms_per_copy * num_required_copies, -1)
        renumbering = main_ct.deleteAtoms(surplus_atoms, renumber_map=True)
        return (main_ct, True, renumbering)

    # Too few copies: append the missing ones and tag the new atoms with
    # the entry id.
    for _ in range(num_required_copies - existing_copies):
        main_ct.extend(frame_ct)
    first_new_atom = trajectory_start_atom + (existing_copies *
                                              atoms_per_copy)
    last_new_atom = trajectory_start_atom + (num_required_copies *
                                             atoms_per_copy) - 1
    set_structure_atoms_entry_id(main_ct, eid, first_new_atom, last_new_atom)
    return (main_ct, True, None)
def setSnapshotPanelVisibility(self, visible: bool):
    """
    Record whether the snapshot panel is currently visible.

    :param visible: True if the snapshot panel is visible.
    """
    panel_visible = visible
    self._snapshot_panel_visible = panel_visible
@property
def is_snapshot_panel_visible(self):
    """
    Visibility state last stored by `setSnapshotPanelVisibility`.

    :return: True if snapshot panel is visible.
    :rtype: bool
    """
    panel_visible = self._snapshot_panel_visible
    return panel_visible
def setReferenceEntryStatistics(self, use_full_system_ct: bool):
    """
    Set reference entry statistics for the current trajectory if allowed.

    Statistics are only created when the workspace holds exactly
    MAX_ALLOWED_ENTRIES_IN_WORKSPACE entries; otherwise they are cleared.

    :param use_full_system_ct: When trajectory is initially loaded, this
        flag is True indicating that full system ct should be used as
        reference because for varying atoms, frame ct may not be in sync
        with full system ct.
    """
    main_ct, entry_total = get_workspace_structure_and_entry_total()
    entry_traj = self.entry_traj

    if entry_total != self.MAX_ALLOWED_ENTRIES_IN_WORKSPACE:
        # Need to save the frame structure only when switching from
        # reference entry + trajectory entry to only trajectory entry.
        # Read the flag before the statistics are cleared.
        needs_frame_save = entry_traj.allow_reference_entry
        entry_traj.reference_entry_statistics = None
    else:
        # When setting up a trajectory, frame ct is not yet loaded in the
        # main ct, so the full system ct atom total stands in for the frame
        # ct atom total for varying atoms frames.
        if use_full_system_ct:
            frame_atom_total = entry_traj.cms_model.fsys_ct.atom_total
        else:
            frame_atom_total = entry_traj.getFrameActiveAtomTotal(
                self.current_value)
        num_replica = entry_traj.getNumberOfReplica()
        entry_traj.reference_entry_statistics = ReferenceEntryStatistics(
            main_ct, self.getEid(), frame_atom_total, num_replica)
        needs_frame_save = True

    if needs_frame_save:
        entry_traj._updateSavedFrameStructure()
def inclusionUpdated(self, included, excluded, all_included,
                     previous_included):
    """
    Slot associated with WorkspaceHub.inclusionChanged() signal.

    Only stores the inclusion lists and starts `inclusion_update_timer`;
    nothing is done when neither the toolbar nor the snapshot panel is
    visible.

    :param included: List of included entry ids in the workspace.
    :type included: list(int)

    :param excluded: List of excluded entry ids from the workspace.
    :type excluded: list(int)

    :param all_included: List of currently included entry ids in the
        workspace ct, it does not guarantee that project also has same
        included entries at the moment because this slot may be triggered
        before updating the entry workspace state in the project.
    :type all_included: list(int)

    :param previous_included: List of previously included entry ids in the
        workspace ct.
    :type previous_included: list(int)
    """
    if not (self.isVisible() or self.is_snapshot_panel_visible):
        return

    # Loading a trajectory changes the workspace, which may trigger this
    # slot again, so the actual handling is deferred to the timer instead
    # of being done here.
    self.all_included = all_included
    self.previous_included = previous_included
    self.inclusion_update_timer.start()
def _getTrajectoryEntryIdFromIncludedEntries(self, all_included: list,
                                             previous_included: list):
    """
    Pick the trajectory entry id out of the included entries.

    If exactly 2 entries are included and one is a trajectory entry and
    another is a regular entry, it returns trajectory entry id.
    If exactly one entry is included and it is a trajectory entry, it
    returns trajectory entry id.
    If exactly 2 entries are included and both are trajectory entry, then
    returns a newly included entry's entry id.
    In all other case, it returns None.

    Example other cases:-
    If more than 2 entries are included, it always return None.
    If there is no included trajectory entry, it always return None.

    :param all_included: Entry ids currently included in the workspace.
    :param previous_included: Entry ids that were included before the
        change; may be empty.
    :return: trajectory entry from all_included entries, otherwise None.
    :rtype: int
    """
    # Check if there is only one entry included and it has trajectory
    ws_entry_total = len(all_included)
    allowed_entries_total = (
        ws_entry_total == self.MAX_ALLOWED_ENTRIES_IN_WORKSPACE)
    if ws_entry_total == 1 and projutils.has_trajectory(
            self.proj, all_included[0]):
        return all_included[0]
    elif allowed_entries_total:
        # Both must not be a trajectory entry because we do not support
        # multiple trajectory at a time, but other can be a regular entry.
        first_has_trajectory = projutils.has_trajectory(
            self.proj, all_included[0])
        second_has_trajectory = projutils.has_trajectory(
            self.proj, all_included[1])
        both_has_trajectory = first_has_trajectory and second_has_trajectory
        if (not both_has_trajectory) and (first_has_trajectory or
                                          second_has_trajectory):
            return (all_included[0]
                    if first_has_trajectory else all_included[1])
        if both_has_trajectory:
            # previous_included is empty when both entries were included
            # at once; guard the index so that case falls back to the
            # first entry instead of raising IndexError.
            first_was_included = (bool(previous_included) and
                                  previous_included[0] == all_included[0])
            return all_included[1] if first_was_included else all_included[0]
    return None


def _handleInclusionUpdate(self):
    """
    Load the trajectory only if <= MAX_ALLOWED_ENTRIES_IN_WORKSPACE are
    included and one of these is a trajectory entry.

    Consumes (and resets to None) the `all_included` and
    `previous_included` lists stored by `inclusionUpdated`.
    """
    if self.all_included is None:
        return
    # Non-positive ids are treated as scratch entries.
    all_included = [eid for eid in self.all_included if eid > 0]
    all_included_count = len(all_included)
    has_scratch_entry = all_included_count != len(self.all_included)
    previous_included = [eid for eid in self.previous_included if eid > 0]
    self.all_included = None
    self.previous_included = None
    eid = self._getTrajectoryEntryIdFromIncludedEntries(
        all_included, previous_included)
    has_trajectory = eid is not None

    traj_eid = None
    # Always bound so later branches never depend on entry_traj having
    # been truthy; only used when traj_eid is not None.
    sync_workspace = False
    if self.entry_traj:
        traj_eid = self.entry_traj.eid
        sync_workspace = traj_eid in all_included

    # Trajectory player does not support scratch entry.
    if has_scratch_entry:
        if traj_eid is not None:
            self._restoreDefaultStateOfEntryCt(sync_workspace=sync_workspace)
        return

    # If two entries are already included and user is including 3rd entry.
    if all_included_count > self.MAX_ALLOWED_ENTRIES_IN_WORKSPACE:
        # If newly included entry is a trajectory entry, warn user that
        # multiple entries are not allowed.
        new_included = list(set(all_included) - set(previous_included))
        for entry_id in new_included:
            if projutils.has_trajectory(self.proj, entry_id):
                self._validateIfMultipleEntriesAllowed(None)
                break
        # Previously there were two entries in the workspace, but one
        # of those was a loaded trajectory, we have to unload existing
        # trajectory as well.
        if traj_eid is not None:
            self._restoreDefaultStateOfEntryCt(sync_workspace=sync_workspace)
        return

    # Entry's trajectory is already loaded, do nothing
    if traj_eid == eid and has_trajectory:
        if (self.reference_entry_id is None or
                self.reference_entry_id not in all_included):
            # Trajectory is already loaded and user included another entry,
            # so it is possible that trajectory is present with replica, so
            # don't use full system ct.
            self.setReferenceEntryStatistics(use_full_system_ct=False)

        # Trajectory entry with varying atoms is already included and
        # user is attempting to include another reference entry.
        # This should not be allowed to user.
        # NOTE(review): nesting of this check relative to the `if` above
        # was reconstructed from flattened source - confirm.
        if not self._validateIfMultipleEntriesAllowed(self.entry_traj):
            self._restoreDefaultStateOfEntryCt(sync_workspace=sync_workspace)
            return

        # We enter in this if block only if there is a single trajectory
        # entry in the workspace or two entries (one trajectory and
        # another regular entry). So as long as we are allowing other entry
        # we can return safely.
        if all_included_count == 1 or self.entry_traj.allow_reference_entry:
            return

    if traj_eid is not None:
        self._restoreDefaultStateOfEntryCt(sync_workspace=sync_workspace)

    # If trajectory was already loaded and was allowed to have another
    # entry or only trajectory entry was present, then we should have
    # early returned.
    # If we reach here, it implies that we are not allow to keep
    # trajectory active.
    # Example use case: varying atoms trajectory is already loaded and
    # later user includes regular entry.
    if (all_included_count == self.MAX_ALLOWED_ENTRIES_IN_WORKSPACE and
            not has_trajectory):
        return

    if has_trajectory and vt.validate_trajectory_path(
            utils.get_trajectory_path(self.proj, eid), eid, self.parent()):
        self.setupTrajectory([eid], False)


def _restoreDefaultStateOfEntryCt(self, sync_workspace):
    """
    Restore default state of entry ct and clear trajectory viewer.

    Does nothing when no trajectory is loaded.

    :type sync_workspace: bool
    :param sync_workspace: Whether or not synchronize changes in the
        workspace.
    """
    if self.entry_traj:
        self.restoreEntryCT(eid=self.entry_traj.eid,
                            sync_workspace=sync_workspace)
        self.clearTrajectory()


@property
def is_snapshot_mode_active(self):
    """
    :rtype: bool
    :return: True if display frames from snapshot panel is activated.
    """
    return self._snapshot_mode_active


@is_snapshot_mode_active.setter
def is_snapshot_mode_active(self, state: bool):
    """
    Set snapshot mode.

    :param state: True to mark snapshot mode as active.
    """
    self._snapshot_mode_active = state
def clearTrajectory(self):
    """
    Stop current trajectory play if player is active.
    Delete trajectory object.
    Disable player controls.
    Clear all trajectory related gui control values.

    Does nothing while snapshot mode is active. Emits
    `trajectoryUnloaded` once the state has been reset.
    """
    # When snapshot mode is active, we should not clear trajectory
    # because snapshot panel depends on same trajectory used by player
    # toolbar. Ideally, player toolbar, project table, entrylist, main
    # window are disabled when snapshot mode is active, but it is still
    # possible to trigger clearTrajectory() when user excludes all frames
    # from the snapshot panel.
    if self.is_snapshot_mode_active:
        return

    self.frame_timer.stop()
    # NOTE(review): whether setEntryTraj(None) sits inside this `if` was
    # reconstructed from flattened source - confirm.
    if self.entry_traj:
        del self.entry_traj
        self.setEntryTraj(None)
    self.trj_hub.getPBCMeasurementManager().setPBCMeasurementActive(False)
    if self._trj_maestro_settings:
        self._trj_maestro_settings.restore()

    # Reset playback bookkeeping to the "nothing loaded" state.
    self.total_time = 0
    self.total_frame = 0
    self.elapsed_time = 0
    self.step_size = 0
    # Range must be > 0 and between 1 to N.
    self.frame_slider.setRange(1, 2)
    self.player_active = False
    self.setToolbarEnabled(False)
    self.loop_option = Loop.NONE
    self.play_direction = Direction.FORWARD
    self._display_atoms_asl_is_valid = False
    self._display_atoms_asl_is_dynamic = True
    self._display_atoms_asl_changed = False
    self.simbox_visible = False
    self._ignore_visibility_change = False
    self._movie_saver.clearData()
    self.trajectoryUnloaded.emit()
def setToolbarEnabled(self, enable):
    """
    Enable or disable all toolbar controls except X button.

    The play/pause button follows the same state unless the player is
    currently saving a movie, in which case it is left untouched.

    :param enable: True to enable the controls, False to disable them.
    """
    controls = [
        self.start_or_step_back,
        self.end_or_step_forward,
        self.current_frame_label,
        self.current_frame,
        self.total_frame_label,
        self.frame_slider,
        self.slider_time,
        self.export_button,
        self.playback_settings_button,
        self.interactive_plots_button,
    ]
    for control in controls:
        control.setEnabled(enable)

    if self.savingMovie():
        return
    self.play_or_pause.setEnabled(enable)
@property
def proj(self):
    """
    Return current maestro project object.
    """
    project = maestro.project_table_get()
    return project


def _validateStructure(self, entry_traj: EntryTrajectory):
    """
    Validates entry structure with trajectory structure, resets it if
    required based on user selection.

    :param entry_traj: Entry trajectory whose structure is checked.
    :rtype: bool
    :return: Whether the structure is valid.
    """
    frame = entry_traj.settings_data.current_frame
    if entry_traj.entryStructureIsValid(frame):
        return True

    # Until MAE-41745 and MAE-41746 are fixed, we can not reliably
    # ensure that entry structure is valid or not, so we
    # unconditionally set it to full system ct.
    if entry_traj.isVaryingAtomsFrame():
        if entry_traj.resetStructure(sync_workspace=True):
            return True

    return self._handleInvalidStructures(entry_traj)


def _handleInvalidStructures(self, entry_traj: EntryTrajectory):
    """
    Display invalid structure dialog with 'Cancel' and 'Reset' options,
    and perform action accordingly.

    :param entry_traj: Entry trajectory whose structure may be reset.
    :rtype: bool
    :return: Whether invalid structure is reset.

    TODO: MultipleTrajectory
    """
    choice = trajectory_messages.show_invalid_structure_dlg()
    if choice != MessageButton.RESET:
        return False
    # This will change to loop for multiple structures.
    return entry_traj.resetStructure(sync_workspace=True)
def getEid(self):
    """
    Return current trajectory entry id.

    :return: The entry id of the loaded trajectory, or None when no
        trajectory is loaded.
    """
    if not self.entry_traj:
        return None
    return self.entry_traj.eid
@property
def reference_entry_id(self):
    """
    :return: Reference entry id if there is any, otherwise None.
    """
    if self.entry_traj:
        return self.entry_traj.reference_entry_id
    return None


def _validateIfMultipleEntriesAllowed(self, entry_traj: EntryTrajectory):
    """
    Verify if multiple entries are allowed when trajectory is loaded and
    display warning appropriately.

    :param entry_traj: Entry trajectory instance. May be None, in which
        case only the "too many entries" check is performed.
    :return: True if user is allowed to keep multiple entries.
    :rtype: bool
    """
    _, entry_total = get_workspace_structure_and_entry_total()
    if entry_total > self.MAX_ALLOWED_ENTRIES_IN_WORKSPACE:
        # NOTE(review): line breaks inside the message were reconstructed
        # from flattened source - confirm against the original layout.
        text = textwrap.dedent(f"""
            The Trajectory cannot be played when there are more than
            {self.MAX_ALLOWED_ENTRIES_IN_WORKSPACE} entries included in the Workspace.
            """)
        title = 'Trajectory Player - Too Many Entries'
        QtWidgets.QMessageBox.warning(None, title, text)
        return False
    elif (entry_total == self.MAX_ALLOWED_ENTRIES_IN_WORKSPACE and
          # _handleInclusionUpdate passes None here; without this guard
          # the attribute access below would raise AttributeError.
          entry_traj is not None and entry_traj.isVaryingAtomsFrame() and
          self.reference_entry_id and
          (not entry_traj.reference_entry_included_first)):
        text = textwrap.dedent("""
            The current trajectory has a variable number of atoms per frame.
            To play this trajectory with a second entry in the Workspace,
            the trajectory must be included after the other entry.
            To enable the player, exclude the trajectory entry and include
            it again.
            """)
        title = 'Cannot Run with Second Entry'
        QtWidgets.QMessageBox.warning(None, title, text)
        return False
    return True
@wait_cursor
def setupTrajectory(self, eids, show_player):
    """
    Setup trajectory settings in the player for a given list of entry ids.

    Emits `trajectoryLoaded` with the total frame count once the
    trajectory has been set up. Returns early, without loading, when the
    trajectory is invalid, has a single frame, its structure cannot be
    validated, or multiple entries are not allowed.

    :param eids: The list of entry ids to be used for trajectory playing.
    :type eids: list(int)

    :param show_player: Show player if True.
    :type show_player: bool

    Note: current only single trajectory is supported, so eids is a list of
    single entry.
    """
    if not eids:
        maestro.warning(
            "Trajectory can be visualized with valid project entry.")
        return

    # Only single trajectory support for now.
    eid = eids[0]

    # If user clicks on same entry's T button and its trajectory is already
    # visible, then don't do anything.
    if self.entry_traj and self.entry_traj.isSameTrajectory(eid):
        # It is possible that trajectory was loaded by snapshot panel
        # and now user clicked on T button to show the player toolbar,
        # so we should honor user request to display player with current
        # frame.
        # NOTE(review): whether updateCurrentFrame() belongs inside this
        # `if` was reconstructed from flattened source - confirm.
        if show_player:
            self.show()
            self.updateCurrentFrame()
        return

    entry_traj = EntryTrajectory(self.proj,
                                 eid,
                                 matsci_profile=self.matsci_profile)
    if not entry_traj.isValid() or entry_traj.isSingleFrameTrajectory():
        return

    valid_structure = self._validateStructure(entry_traj)
    if not valid_structure:
        return

    self.setEntryTraj(entry_traj)
    # Trajectory is being loaded, so workspace ct won't always represent
    # same number of atoms as in the frame ct for varying atoms frame, so for
    # the consistency, always use full system ct.
    self.setReferenceEntryStatistics(use_full_system_ct=True)
    if not self._validateIfMultipleEntriesAllowed(entry_traj):
        # Roll back the entry trajectory that was just installed.
        self.setEntryTraj(None)
        return

    self.setToolbarEnabled(True)
    if show_player:
        self.show()

    self.total_frame = self.entry_traj.total_frame

    # Create PlaybackSettings popup
    has_pbsettings_data = self.entry_traj.hasPlaybackSettingsData()

    # Set playback settings data
    self.playback_settings_popup.setPlaybackSettingsData(
        self.entry_traj.settings_data, has_pbsettings_data,
        self.entry_traj.cms_model.fsys_ct.atom_total)
    data = self.playback_settings_popup.data
    self._trj_maestro_settings = TrajectoryMaestroSettings(
        self.proj, eid,
        self.playback_settings_popup.updateAdjustPositionCheckBox,
        data.translate_to_first_unit_cell)

    # Connect to 'Playback Settings' popup for change settings
    self.connectToPlaybackSettingsPopup()

    self.player_was_paused = False
    # The slider is 1-based and spans every frame of the trajectory.
    self.frame_slider.setRange(1, self.total_frame)
    self.updateSliderValues(data.start_frame, data.end_frame,
                            data.current_frame)
    self.settingsChanged(forced_update=False)
    if self.isVisible():
        self.updateCurrentFrame()
    self.trajectoryLoaded.emit(self.total_frame)
def updateSliderValues(self, start_frame, end_frame, current_frame):
    """
    Update slider values based on start, end, and current frame.

    The frame validator range is then refreshed from
    `current_min_value` / `current_max_value`.

    :param start_frame: First frame of the playback range.
    :param end_frame: Last frame of the playback range.
    :param current_frame: Frame to make current.
    """
    slider = self.frame_slider
    slider.setCurrentRange(start_frame, end_frame)
    self.current_value = current_frame
    lowest, highest = self.current_min_value, self.current_max_value
    self.frame_validator.setRange(lowest, highest)
def updateFrameTimerInterval(self, frame_duration):
    """
    Update the frame_timer interval using the frame_duration.

    :param frame_duration: Duration of one frame; it is multiplied by 1000
        before being handed to the timer.
    :type frame_duration: float
    """
    # The timer interval is an integer number of milliseconds. Adding 0.5
    # and truncating rounds to the nearest millisecond; the explicit int()
    # avoids passing a float where an int is required.
    interval_ms = int(frame_duration * 1000 + 0.5)
    self.frame_timer.setInterval(interval_ms)
def updateStepSize(self, step_size, update_player=True):
    """
    Store a new step size and optionally refresh the player.

    :param step_size: Step size to be used while stepping in the
        trajectory. Possible values are 1-99 or
        self.endFrame() - self.startFrame().
    :type step_size: int

    :param update_player: Indicates whether or not update player toolbar
        gui.
    :type update_player: bool
    """
    self.step_size = step_size
    if not update_player:
        return
    self.updatePlayer()
def setupContextMenu(self):
    """
    Setup player toolbar context menu.

    The menu offers "Close" and, after a separator, "Help".
    """
    menu = self._context_menu = QtWidgets.QMenu(self)
    menu.setObjectName("trj_player_context_menu")
    menu.addAction("Close", self.closePlayerIfAllowed)
    menu.addSeparator()
    show_help = partial(qt_utils.help_dialog, "TRAJECTORY_PLAYER", self)
    menu.addAction("Help", show_help)
def contextMenuEvent(self, event):
    """
    This function redefines context menu event for the player toolbar.
    Context menu is displayed only if there is no child widget at right
    clicked position.

    :param event: Qt context menu event
    :type event: QtGui.QContextMenuEvent
    """
    child_under_cursor = self.childAt(event.pos())
    if child_under_cursor is not None:
        return
    global_pos = self.mapToGlobal(event.pos())
    self._context_menu.exec_(global_pos)
def setupPlotMenu(self, plot_button):
    """
    Sets up the plot menu

    Builds a titled menu with one submenu per plot category, wires each
    entry to its callback, and attaches the menu to `plot_button`.

    :param plot_button: Plot button to hook up menu to
    :type plot_button: QToolButton
    """
    # Set up formatting of menu
    self.plot_menu = TrajectoryQMenu(plot_button)
    self.plot_menu.setObjectName('trj_plot_menu')
    self.plot_menu.aboutToShow.connect(self._checkShowPlotsPanel)
    waction = QtWidgets.QWidgetAction(self.plot_menu)
    layout_widget = QtWidgets.QWidget()
    layout_widget.setObjectName("plot_menu_title_layout_widget")
    vboxlayout = QtWidgets.QVBoxLayout()
    vboxlayout.setContentsMargins(0, 0, 0, 0)
    layout_widget.setLayout(vboxlayout)
    self.plot_menu_title = QtWidgets.QLabel(
        "COMPUTE PROPERTIES OVER TRAJECTORY")
    self.plot_menu_title.setObjectName("trj_plot_menu_title")
    vboxlayout.addWidget(self.plot_menu_title)
    waction.setDefaultWidget(layout_widget)
    self.plot_menu.insertAction(None, waction)

    # Define data for menu and submenus
    plots = tplots.TrajectoryPlotType
    # Sentinel value: any entry whose value is SEPARATE becomes a menu
    # separator. Keys of separator entries only need to be unique within
    # their dict.
    SEPARATE = object()
    HYDROGEN_BONDS = 'Hydrogen Bonds'
    HALOGEN_BONDS = 'Halogen Bonds'
    SALT_BRIDGE = 'Salt Bridges'
    PI_PI_INTERACTIONS = 'Pi-Pi Stacking'
    CAT_PI_INTERACTIONS = 'Pi-Cation'
    INTERACTIONS_USE_VISIBLE_ATOMS = "For Visible Atoms Only"
    RMSD = 'RMSD'
    RMSF_ATOM = 'Per Atom'
    RMSF_RES = 'Per Residue'
    RADIUS_OF_GYRATION = 'Radius of Gyration'
    POLAR_SURFACE_AREA = 'Polar Surface Area'
    SOLVENT_ACCESSIBLE_SURFACE_AREA = 'Solvent Accessible Surface Area'
    MOLECULAR_SURFACE_AREA = 'Molecular Surface Area'
    CUSTOM_SUBSTRUCTURE_SETS = "Custom Substructure Sets..."

    def open_plot(plot_type):
        """
        Return a function that emits self.createPlot signal with the given
        `plot_type` as the only parameter.
        """
        return lambda: self.createPlot.emit(plot_type)

    # Separator keys are numbered per submenu: a dict literal keeps only
    # one entry per key, so repeating "SEPARATOR" silently dropped every
    # separator after the first one.
    menu_items = {
        "Measurements": {
            "Currently in Workspace": open_plot(
                plots.MEASUREMENT_WORKSPACE),
            "Add Workspace Measurements":
                self.trajectory_plot_panel.addWorkspaceMeasurements,
            "SEPARATOR": SEPARATE,
            "Planar Angle": open_plot(plots.MEASUREMENT_PLANAR_ANGLE),
        },
        "Interaction Counts": {
            "All Types": open_plot(plots.INTERACTIONS_ALL),
            "SEPARATOR1": SEPARATE,
            HYDROGEN_BONDS: open_plot(plots.INTERACTIONS_HYDROGEN_BONDS),
            HALOGEN_BONDS: open_plot(plots.INTERACTIONS_HALOGEN_BONDS),
            SALT_BRIDGE: open_plot(plots.INTERACTIONS_SALT_BRIDGE),
            PI_PI_INTERACTIONS: open_plot(plots.INTERACTIONS_PI_PI),
            CAT_PI_INTERACTIONS: open_plot(plots.INTERACTIONS_CAT_PI),
            "SEPARATOR2": SEPARATE,
            INTERACTIONS_USE_VISIBLE_ATOMS:
                self._togglePlotVisibleInteractions,
        },
        "Descriptors (Selected Atoms)": {
            RMSD: open_plot(plots.DESCRIPTORS_RMSD),
            RADIUS_OF_GYRATION: open_plot(
                plots.DESCRIPTORS_RADIUS_GYRATION),
            "SEPARATOR1": SEPARATE,
            POLAR_SURFACE_AREA: open_plot(plots.DESCRIPTORS_PSA),
            SOLVENT_ACCESSIBLE_SURFACE_AREA: open_plot(
                plots.DESCRIPTORS_SASA),
            MOLECULAR_SURFACE_AREA: open_plot(
                plots.DESCRIPTORS_MOLECULAR_SA),
            "SEPARATOR2": SEPARATE,
            "View RMSD Settings...":
                self.trajectory_plot_panel.showRmsdSettingsDialog,
        },
        "SEPARATOR": SEPARATE,
        "RMSF (Selected Atoms)": {
            RMSF_ATOM: open_plot(plots.DESCRIPTORS_ATOM_RMSF),
            RMSF_RES: open_plot(plots.DESCRIPTORS_RES_RMSF),
            "SEPARATOR": SEPARATE,
            "View Settings...":
                self.trajectory_plot_panel.showRmsdSettingsDialog,
        },
        "Energy Calculations": {
            "All Substructure Sets (Grouped)": open_plot(
                plots.ENERGY_ALL_GROUPED),
            "All Substructure Sets (Individual)": open_plot(
                plots.ENERGY_ALL_INDIVIDUAL),
            # Disabled for 2021-4; see PANEL-20280
            #"All Individual Molecules": open_plot(
            #    plots.ENERGY_INDIVIDUAL_MOLECULES),
            "SEPARATOR1": SEPARATE,
            CUSTOM_SUBSTRUCTURE_SETS: open_plot(
                plots.ENERGY_CUSTOM_SUBSTRUCTURE_SETS),
            "Custom ASL Sets...": open_plot(plots.ENERGY_CUSTOM_ASL_SETS),
            "SEPARATOR2": SEPARATE,
            "Job Settings...":
                self.trajectory_plot_panel.showEnergySettingsDialog,
        },
    }

    hub = maestro_ui.MaestroHub.instance()
    view_state = hub.getInteractionViewState()
    atomsSelected, multiAtomsSelected = (
        self._areSingleAndMultiAtomsSelected())

    # List of menu items that are disabled in current profile:
    DISABLED_ACTIONS = []
    if hub.isMaterialsScienceProfileMode():
        DISABLED_ACTIONS.append(CUSTOM_SUBSTRUCTURE_SETS)

    # Map of actions that should be checkable menu items and their initial
    # check state.
    CHECKABLE_ACTIONS = {INTERACTIONS_USE_VISIBLE_ATOMS: True}

    # Enabling signals maps a menu item to its toggle function and initial
    # state
    ENABLING_SIGNALS = {
        HYDROGEN_BONDS: (hub.nonCovalentInteractionsToggled,
                         view_state.non_covalent_bonds),
        HALOGEN_BONDS: (hub.nonCovalentInteractionsToggled,
                        view_state.non_covalent_bonds),
        SALT_BRIDGE: (hub.nonCovalentInteractionsToggled,
                      view_state.non_covalent_bonds),
        PI_PI_INTERACTIONS: (hub.piInteractionsToggled,
                             view_state.pi_interactions),
        CAT_PI_INTERACTIONS: (hub.piInteractionsToggled,
                              view_state.pi_interactions),
        RMSD: (self.multiAtomsSelectedToggled, multiAtomsSelected),
        RMSF_ATOM: (self.multiAtomsSelectedToggled, multiAtomsSelected),
        RMSF_RES: (self.multiAtomsSelectedToggled, multiAtomsSelected),
        RADIUS_OF_GYRATION: (self.multiAtomsSelectedToggled,
                             multiAtomsSelected),
        POLAR_SURFACE_AREA: (self.atomsSelectedToggled, atomsSelected),
        SOLVENT_ACCESSIBLE_SURFACE_AREA: (self.atomsSelectedToggled,
                                          atomsSelected),
        MOLECULAR_SURFACE_AREA: (self.atomsSelectedToggled, atomsSelected),
    }

    # Wire up menu and submenus
    for text, item in menu_items.items():
        if item is SEPARATE:
            self.plot_menu.addSeparator()
        elif isinstance(item, dict):
            submenu = QtWidgets.QMenu(text, self.plot_menu)
            self.plot_menu.addMenu(submenu)
            for subtext, callback in item.items():
                if callback is SEPARATE:
                    submenu.addSeparator()
                elif subtext in DISABLED_ACTIONS:
                    continue
                else:
                    action = submenu.addAction(subtext)
                    if subtext in CHECKABLE_ACTIONS:
                        action.setCheckable(True)
                        action.setChecked(CHECKABLE_ACTIONS[subtext])
                    if subtext in ENABLING_SIGNALS:
                        signal, initial_state = ENABLING_SIGNALS[subtext]
                        # Partial is used here to retain the current local
                        # value of action (a bare lambda would late-bind
                        # to the last action of the loop).
                        toggleEnableAction = partial(
                            lambda act, enable: act.setEnabled(enable),
                            action)
                        signal.connect(toggleEnableAction)
                        action.setEnabled(initial_state)
                    if subtext in CHECKABLE_ACTIONS:
                        action.toggled.connect(callback)
                    else:
                        action.triggered.connect(callback)
        else:
            action = self.plot_menu.addAction(text)
            action.triggered.connect(item)
    plot_button.setMenu(self.plot_menu)
def _checkShowPlotsPanel(self):
    """
    Check if the Plots panel should be shown and, if so, show it.

    The panel is shown only when it is hidden and holds at least one plot.
    """
    panel = self.trajectory_plot_panel
    if panel.isVisible():
        return
    if panel.getPlotCount() > 0:
        panel.show()


def _onAtomSelectChanged(self):
    """
    Re-emit the current atom selection state through
    `atomsSelectedToggled` and `multiAtomsSelectedToggled`.
    """
    any_selected, several_selected = self._areSingleAndMultiAtomsSelected()
    self.atomsSelectedToggled.emit(any_selected)
    self.multiAtomsSelectedToggled.emit(several_selected)


def _areSingleAndMultiAtomsSelected(self):
    """
    Report whether at least one and whether more than one atom is
    selected.

    :return: Tuple (at least one atom selected, more than one atom
        selected); (False, False) when `maestro` is not available.
    :rtype: tuple(bool, bool)
    """
    if not maestro:
        return False, False
    selected_count = len(maestro.selected_atoms_get())
    return selected_count > 0, selected_count > 1
def setupExportMenu(self, export_button):
    """
    Sets up Export button menu.

    :param export_button: Button the export menu is attached to.
    """
    menu = self.export_menu = TrajectoryQMenu(export_button)
    menu.setObjectName("trj_export_menu")

    # Title row, embedded as a widget action at the top of the menu.
    title_action = QtWidgets.QWidgetAction(menu)
    title_container = QtWidgets.QWidget()
    title_container.setObjectName("export_menu_title_layout_widget")
    title_layout = QtWidgets.QVBoxLayout()
    title_layout.setContentsMargins(0, 0, 0, 0)
    title_container.setLayout(title_layout)
    self.export_menu_title = QtWidgets.QLabel("EXPORT")
    self.export_menu_title.setObjectName("trj_export_menu_title")
    title_layout.addWidget(self.export_menu_title)
    title_action.setDefaultWidget(title_container)
    menu.insertAction(None, title_action)

    # Entries in display order; a None label marks a separator.
    entries = (
        ("Structures...", self._structure_exporter.showExportStructuresDlg),
        ("Trajectory...", self._structure_exporter.exportTrajectory),
        ("Image...", self.trj_hub.saveImage),
        ("Movie...", self._movie_saver.exportMovie),
        (None, None),
        ("View Snapshots...", self.showTrajectorySnapshotPanel.emit),
    )
    for label, slot in entries:
        if label is None:
            menu.addSeparator()
            continue
        menu.addAction(label).triggered.connect(slot)
    export_button.setMenu(menu)
def connectToPlaybackSettingsPopup(self):
    """
    Connect to PlaybackSettings popup (self.playback_settings_popup) for
    change in playback settings.
    """
    popup = self.playback_settings_popup
    self.playback_settings_button.toggled.connect(popup.setVisible)

    # (popup signal, toolbar slot) pairs, connected in this order.
    wiring = (
        (popup.closed, self.uncheckPlaybackSettingsButton),
        (popup.frameDurationChanged, self.updateFrameTimerInterval),
        (popup.stepChanged, self.updateStepSize),
        (popup.smoothingChanged, self.updateCurrentFrame),
        (popup.playDirectionChanged, self.updatePlayDirection),
        (popup.loopChanged, self.updateLoopOption),
        (popup.beyondBindingSiteChanged, self.setDisplayAtomsASL),
        (popup.solventsOnlyChanged, self.setDisplayAtomsASL),
        (popup.nonpolarHydrogensChanged, self.setDisplayAtomsASL),
        (popup.proteinOnlyChanged, self.setDisplayAtomsASL),
        (popup.translateToFirstUnitCellChanged,
         self.translateToFirstUnitCellChanged),
        (popup.viewPositioningChanged, self.updateViewPosition),
        (popup.allSettingsChanged, self.settingsChanged),
        (popup.aboutToShowAdvancedSettings, self.pausePlayer),
        (popup.advancedSettingsDismissed, self.resumePlayer),
        (popup.advancedSettingsChanged, self.handleAdvancedSettings),
    )
    for signal, slot in wiring:
        signal.connect(slot)
def updateViewPosition(self, forced_update=True):
    """
    Update trajectory based on view position tab settings and adjust frame.

    To center molecule, ASL must be valid and matched atoms must be
    at least 1.
    To align molecules, ASL must be valid and matched atoms must be
    at least 3.

    :param forced_update: Copy all frame's position array and adjust frame
        in the workspace using new view position setting.
    :type forced_update: bool
    """
    asl = self.playback_settings_popup.getAVPAsl()
    position_updated = self.entry_traj.updateViewPosition(forced_update, asl)
    # If player is not active and user modified asl, then apply it
    # immediately.
    if position_updated and not self.player_active and forced_update:
        self.updateCurrentFrame()
def translateToFirstUnitCellChanged(self):
    """
    Update player according to 'Translate to first unit cell' and
    simulation box settings
    """
    popup = self.playback_settings_popup
    # Update simulation box
    self.simbox_visible = popup.data.show_simulation_box
    translate = popup.data.translate_to_first_unit_cell
    self._trj_maestro_settings.setMaxPBCAtomsForUnitCell(translate)
def settingsChanged(self, forced_update=True):
    """
    Update player according to current PlaybackSettings

    Applies frame duration, step, direction, loop, view position and
    advanced settings, in that order.

    :param forced_update: Whether to update current frame
    :type forced_update: bool
    """
    settings = self.playback_settings_popup.data
    self.updateFrameTimerInterval(settings.frame_duration)
    self.updateStepSize(settings.step, False)
    self.updatePlayDirection(settings.direction, False)
    self.updateLoopOption(settings.loop)
    self.updateViewPosition(False)
    self.handleAdvancedSettings(False)
    if not forced_update:
        return
    self.updateCurrentFrame()
def handleAdvancedSettings(self, forced_update=True):
    """
    Update player according to the advanced settings data

    :param forced_update: Accepted for signature compatibility; the
        display-atoms ASL is always refreshed with forced_update=False.
    :type forced_update: bool
    """
    settings = self.playback_settings_popup.data
    self.simbox_visible = settings.show_simulation_box
    self.setDisplayAtomsASL(None, forced_update=False)
    if settings.update_secondary_structure:
        return
    # If user turned off option, clear data.
    self.entry_traj.ss_data.clearData()
def updateLoopOption(self, loop_option):
    """
    Update loop option (NONE, SINGLE, OSCILLATE)

    :param loop_option: New loop option.
    """
    self.loop_option = loop_option
    # Whenever loop option is changed, also update play direction
    # because we may be using transient play direction due to
    # loop -> bidirectional option.
    configured_direction = self.playback_settings_popup.data.direction
    self.updatePlayDirection(configured_direction, update_player=False)
def updatePlayDirection(self, play_direction, update_player=True):
    """
    Update play direction icon and direction member (FORWARD, BACKWARD)

    :param play_direction: Direction of play as specified by
        trajectory_gui_dir.playback_settings_popup.data.Direction.
    :type play_direction: enum(Direction)

    :param update_player: Indicates whether or not update player toolbar
        gui.
    :type update_player: bool
    """
    button = self.play_or_pause
    # The "direction" property is set before the style refresh below.
    button.setProperty("direction", play_direction.value)
    qt_utils.update_widget_style(button)
    self.play_direction = play_direction
    if not update_player:
        return
    self.updatePlayer()
def showAdvancedSettings(self):
    """
    Shows 'Advanced Playback Settings' dialog

    Does nothing when there is no playback settings popup.
    """
    if not self.playback_settings_popup:
        return
    self.playback_settings_popup.showAdvancedSettings()
def uncheckPlaybackSettingsButton(self):
    """
    Unchecks 'Playback Settings' button when 'Playback Settings' popup
    is hidden/closed.
    """
    button = self.playback_settings_button
    # If 'Playback Settings' popup is hidden by mouse click then button
    # will be automatically unchecked, so uncheck only when button is not
    # under mouse.
    if button.underMouse():
        return
    button.setChecked(False)
def getTimeTooltip(self, slider_index, frame_number):
    """
    Return tooltip text for time.

    The text is the formatted time of the frame. It is followed by
    "of <total time>" for the middle slider, for the left slider when the
    current minimum is not 1, and for the right slider when the current
    maximum is not the last frame.

    :param slider_index: The frame slider index (left, middle, or right)
    :type slider_index: int

    :param frame_number: The current valid frame number represented by
        slider point.
    :type frame_number: int

    :return: Time tooltip text; an empty string when the frame number is
        not valid.
    :rtype: str
    """
    if not self.isValidFrame(frame_number):
        # No time is looked up for an invalid frame. Returning early
        # keeps the documented str return type instead of falling through
        # with the local name never assigned.
        return ""

    time_value = self.formattedValue(
        self.entry_traj.getFrameTime(frame_number))

    show_total = (
        slider_index == 1 or
        (slider_index == 0 and self.current_min_value != 1) or
        (slider_index == 2 and self.current_max_value != self.total_frame))
    if show_total:
        total_time = self.formattedValue(
            self.entry_traj.getFrameTime(self.total_frame))
        time_value = f'{time_value} of {total_time}'
    return time_value
def updateFrameSliderToolTip(self, slider_index):
    """
    Set the tooltip of the frame slider for one of its handles.

    The tooltip shows a heading, the frame number and the elapsed time.
    For the middle slider it can also show the position of the current
    frame within the played range and, when the step size is greater
    than one, the step position.

    :param slider_index: the frame slider index (left, middle, or right)
    :type slider_index: int
    """
    per_slider = {
        0: ("Start\n", self.current_min_value),
        1: ("Current\n", self.current_value),
        2: ("End\n", self.current_max_value),
    }
    heading, frame_number = per_slider[slider_index]
    time_value = self.getTimeTooltip(slider_index, frame_number)

    if slider_index in (0, 2):
        details = "Frame: %d\nTime: %s" % (frame_number, time_value)
    else:
        total_frames = self.getEndFrame() - self.getStartFrame() + 1
        visited_frames = self.current_value - self.getStartFrame() + 1

        # NOTE(review): the minimum is compared with 0 here while
        # getTimeTooltip compares it with 1 - confirm which is intended.
        whole_range = (self.getEndFrame() == self.total_frame and
                       self.current_min_value == 0)
        if whole_range:
            details = "Frame: %d\nTime: %s" % (frame_number, time_value)
        else:
            details = "Frame: %d (%d of %d)\nTime: %s" % (
                frame_number, visited_frames, total_frames, time_value)

        if self.step_size > 1:
            # %d drops the fractional part of the divisions.
            details += "\nStep: %d / %d" % (visited_frames / self.step_size,
                                            total_frames / self.step_size)

    self.frame_slider.setToolTip(heading + details)
def addVerticalLine(self, base_layout):
    """
    Append a plain vertical separator line, one unit wide, to a layout.

    :param base_layout: Layout that receives the separator widget.
    """
    separator = QtWidgets.QFrame()
    separator.setObjectName("vertical_line")
    separator.setFrameStyle(QtWidgets.QFrame.VLine)
    separator.setFrameShadow(QtWidgets.QFrame.Plain)
    separator.setLineWidth(1)

    base_layout.addWidget(separator)
def snapshotModeChanged(self, state: bool):
    """
    React to the snapshot panel mode being switched on or off.

    While the mode is active the player toolbar is disabled and the
    simulation box is hidden. When the mode ends, the original entry
    structure is put back into the workspace.

    :param state: Determine if snapshot panel mode activated or
        deactivated.
    """
    # Entering the mode: the state change is announced first, so that
    # workspace structure changes made during the mode are not reflected
    # in the SH and do not trigger updates in other maestro components.
    if state:
        self.trj_hub.trajectoryPlayerStateChanged.emit(state)

    self.is_snapshot_mode_active = state
    self.setToolbarEnabled(not self.is_snapshot_mode_active)

    popup = self.playback_settings_popup
    box_requested = popup is not None and popup.data.show_simulation_box
    self._ignore_visibility_change = state

    # Only a box that the settings ask for is touched: it is hidden on
    # activation and shown again on deactivation.
    if box_requested:
        self.simbox_visible = not state

    if not self.is_snapshot_mode_active:
        original_st = self.getOriginalEntryCT(copy=True)
        # The structure is already a copy, so the workspace may own it.
        maestro.workspace_set(original_st,
                              copy=False,
                              check_scratch_entry=False)
        # Leaving snapshot mode: make the trajectory's frame structure
        # refer to the appropriate structure again.
        self.entry_traj._updateSavedFrameStructure()
        # With a visible toolbar the workspace follows the current frame.
        if self.isVisible():
            self.updateCurrentFrame()

    # Leaving the mode: the state change is announced last, so the SH and
    # other components pick up the current workspace ct rather than the
    # one that was present during snapshot mode.
    if not state:
        self.trj_hub.trajectoryPlayerStateChanged.emit(state)
def snapshotPanelClosed(self):
    """
    Handle the snapshot panel being closed.

    The snapshot panel can load a trajectory without the player toolbar
    being shown, so the default entry state is restored when the player
    is not visible.
    """
    # A visible player shares the trajectory, so it must stay loaded.
    if self.isVisible():
        return
    self._restoreDefaultStateOfEntryCt(True)
def getFrameToActiveAtomTotalMap(self):
    """
    Forward the request to the entry trajectory.

    :return: A list of active atom total of all frames, or None when no
        entry trajectory is set.
    :rtype: list(int)
    """
    trajectory = self.entry_traj
    if not trajectory:
        return None
    return trajectory.getFrameToActiveAtomTotalMap()
class MovieSaver:
    """
    This class provides the ability to save a trajectory movie.

    It works on a trajectory player toolbar object: the export settings
    are collected through an export movie dialog, frames are saved one by
    one as image files, and the trajectory hub is then asked to turn those
    images into the movie file.
    """
def __init__(self, player_obj):
    """
    Remember the player and its trajectory hub and reset the movie state.

    :type player_obj: TrajectoryPlayer
    :param player_obj: Trajectory player toolbar object.
    """
    # The export dialog is created on first use.
    self._dlg = None
    self.player_obj = player_obj
    self.trj_hub = player_obj.trj_hub
    self.clearData()
def clearData(self):
    """
    Reset the movie state: no frames saved, not saving, no movie file
    path.
    """
    self._saved_frame_count = 0
    self._saving_movie = False
    self._movie_file_path = None
def exportMovie(self):
    """
    Show the export movie dialog and start saving a movie when the dialog
    returns a movie file path.
    """
    # A movie that is already being saved must finish first.
    if self.savingMovie():
        return

    # The dialog is created once and reused afterwards.
    if self._dlg is None:
        self._dlg = export_movie.ExportMovieDialog(self.player_obj)

    player = self.player_obj
    self._movie_file_path = self._dlg.showDlg(player.current_min_value,
                                              player.current_max_value,
                                              player.step_size,
                                              player.total_frame)
    if not self._movie_file_path:
        return
    self._startSavingMovie()
def _startSavingMovie(self):
    """
    Remember the player's current frame, start, end, direction and loop
    option, switch the player to the values specified by the user in the
    export dialog, and start saving the movie frame by frame.
    """
    # A movie that is already being saved must finish first.
    if self.savingMovie():
        return

    player = self.player_obj

    # Keep the current player settings.
    self.start_frame = player.current_min_value
    self.end_frame = player.current_max_value
    self.current_frame = player.current_value
    self.loop_option = player.loop_option
    self.play_direction = player.play_direction

    # Directory in which the movie frames are stored: frames left in an
    # existing directory are deleted, a missing directory is created.
    frames_dir = maestro_ui.mm_get_movie_frames_dir_path()
    if os.path.exists(frames_dir):
        self._cleanDirectory(frames_dir, False)
    else:
        os.mkdir(frames_dir)

    # Bring the player into the state asked for by the movie settings;
    # the first movie frame becomes the current frame.
    first_frame = self._dlg.getStartFrameValue()
    last_frame = self._dlg.getEndFrameValue()
    player.updateSliderValues(first_frame, last_frame, first_frame)
    player.updateLoopOption(Loop.NONE)
    player.updatePlayDirection(Direction.FORWARD, False)

    # Start playing.
    self._saving_movie = True
    player.setToolbarEnabled(False)
    player.player_active = True


def _cleanDirectory(self, dir_path, remove_dir=True):
    """
    Delete the "*.jpeg" files found directly inside a directory.

    Nothing is done when the path is empty or does not exist.

    :type dir_path: str
    :param dir_path: Directory path to be cleaned up.

    :type remove_dir: bool
    :param remove_dir: Remove directory also.
    """
    if not dir_path or not os.path.exists(dir_path):
        return

    jpeg_pattern = os.path.join(dir_path, "*.jpeg")
    for frame_file in glob.glob(jpeg_pattern):
        os.remove(frame_file)

    if remove_dir:
        os.rmdir(dir_path)
def stopSavingMovie(self, ok_status=True):
    """
    Restore current frame, start, end, direction, and loop options set
    before saving movie and notify maestro to save in the movie file.
    Function is a no-op if we were not saving a movie.

    :type ok_status: bool
    :param ok_status: True if stopping movie gracefully (i.e. all frames
        are saved successfully)
    """
    # If we are not saving movie, don't proceed.
    if not self.savingMovie():
        return

    self._saving_movie = False
    self.player_obj.player_active = False

    if ok_status:
        # All frames were saved, so the movie file is built from them.
        total = self._dlg.getTotalMovieFrames()
        image_files = maestro_ui.mm_get_movie_files_path(total)
        data = self._dlg.getMovieData()
        success = self.trj_hub.requestToMakeVideo(self._movie_file_path,
                                                  image_files, data)
        if not success:
            maestro.warning("Failed to save movie in %s" %
                            self._movie_file_path)

    self.clearData()
    self.player_obj.setToolbarEnabled(True)

    # The saved frame images are no longer needed.
    dir_path = maestro_ui.mm_get_movie_frames_dir_path()
    self._cleanDirectory(dir_path)

    # Put back the player state that was recorded when saving started.
    # The loop option and play direction are restored from the recorded
    # values instead of being forced to Loop.NONE / Direction.FORWARD,
    # which are only the temporary values used while saving.
    self.player_obj.updateSliderValues(self.start_frame, self.end_frame,
                                       self.current_frame)
    self.player_obj.updateLoopOption(self.loop_option)
    self.player_obj.updatePlayDirection(self.play_direction, False)
    self.player_obj.updateCurrentFrame()
    self.trj_hub.requestToRestoreWorkspaceSizeFromMovieMode.emit()
def savingMovie(self):
    """
    Report the saving flag of this object.

    :rtype: bool
    :return: True if saving movie.
    """
    currently_saving = self._saving_movie
    return currently_saving
def saveFrameForMovie(self):
    """
    Save current trajectory frame for movie.

    Function is a no-op if we are not saving a movie. Saving stops when a
    frame cannot be saved, or once as many frames have been saved as the
    export dialog reports for the movie.
    """
    if not self.savingMovie():
        return

    total = self._dlg.getTotalMovieFrames()
    image_file_path = maestro_ui.mm_get_movie_file_path(
        total, self._saved_frame_count)
    self._saved_frame_count += 1
    size = self._dlg.getResolution()

    # If we are saving first frame, we need to notify maestro to resize
    # workspace according to requested image resolution.
    if self._saved_frame_count == 1:
        self.trj_hub.requestToResizeWorkspaceForMovieMode.emit(size)

    success = self.trj_hub.requestToSaveFrame(image_file_path, size)
    if not success:
        self.stopSavingMovie(ok_status=False)
        # The path is interpolated into the message; the "f" prefix used
        # to sit inside the quotes, so the placeholder was shown verbatim.
        maestro.warning(f"Failed to save movie frame {image_file_path}.")
        # Saving has been stopped, so there is nothing left to check.
        return

    if total == self._saved_frame_count:
        # Saved all frames, so we can stop saving movie.
        self.stopSavingMovie(ok_status=True)