"""
Trajectory viewer player toolbar.
Copyright Schrodinger, LLC. All rights reserved.
"""
import glob
import itertools
import os
import textwrap
from enum import Enum
from functools import partial
from typing import Union
import numpy
import schrodinger
from schrodinger import structure
from schrodinger.application.desmond import cms
from schrodinger.application.desmond.constants import IS_INFINITE
from schrodinger.application.desmond.packages import topo
from schrodinger.application.desmond.packages import traj
from schrodinger.application.desmond.packages import traj_util
from schrodinger.application.matsci import msprops
from schrodinger.application.matsci.nano import xtal
from schrodinger.infra import mm
from schrodinger.infra import projectmodel
from schrodinger.infra.mmbitset import Bitset
from schrodinger.project import utils as projutils
from schrodinger.Qt import QtCore
from schrodinger.Qt import QtGui
from schrodinger.Qt import QtWidgets
from schrodinger.structutils import analyze
from schrodinger.trajectory import utils
from schrodinger.trajectory import validate_trajectory as vt
from schrodinger.trajectory.trajectory_gui_dir import export_movie
from schrodinger.trajectory.trajectory_gui_dir import frame_structure_exporter
from schrodinger.trajectory.trajectory_gui_dir import playback_settings
from schrodinger.trajectory.trajectory_gui_dir import playback_settings_data
from schrodinger.trajectory.trajectory_gui_dir import plots as tplots
from schrodinger.trajectory.trajectory_gui_dir import stylesheet
from schrodinger.trajectory.trajectory_gui_dir import traj_plot_gui
from schrodinger.trajectory.trajectory_gui_dir import trajectory_messages
from schrodinger.trajectory.trajectory_gui_dir.export_structure_enums import \
ExportMode
from schrodinger.trajectory.trajectory_gui_dir.playback_settings_data import \
Direction
from schrodinger.trajectory.trajectory_gui_dir.playback_settings_data import \
Loop
from schrodinger.trajectory.trajectory_gui_dir.secondary_structure_data import \
SecondaryStructureData
from schrodinger.trajectory.trajectory_gui_dir.trajectory_messages import \
MessageButton
from schrodinger.ui import maestro_ui
from schrodinger.ui.qt import messagebox
from schrodinger.ui.qt import utils as qt_utils
from schrodinger.ui.qt.decorators import wait_cursor
maestro = schrodinger.get_maestro()
# Prompt text asking whether the trajectory behind a clicked plot should be
# loaded into the Trajectory Player.
LOAD_TRAJ_MSG = (
    "The trajectory corresponding to the clicked plot is not currently "
    "loaded into the Trajectory Player. "
    "Do you want to load it now?"
    "\n\n"
    "Loading the trajectory will exclude all other entries from the "
    "Workspace.")

# Prompt text used when the trajectory behind a clicked plot cannot be
# located; it offers to delete the plots tied to that trajectory.
REQUESTED_TRAJ_NOT_FOUND_MSG = (
    "The trajectory corresponding to the clicked plot cannot be found in "
    "this project. "
    "The entry may have been deleted or its trajectory data may have been "
    "moved."
    "\n\n"
    "Delete all plots associated with this missing trajectory?")
def get_mmct_id_map_from_dict(atom_id_map_dict: dict):
    """
    Build an mmct id map from an old-to-new atom renumbering dictionary.

    :param atom_id_map_dict: Renumbering dictionary. Keys are atom numbers
        before deletion; each value is the new atom number, or None if that
        atom was deleted.
    :return: A newly created id map when the dictionary is non-empty,
        otherwise `mm.MMCT_INVALID_ID_MAP`.
    :rtype: MM_ID_Map

    Note: Caller must delete it explicitly using mmct_id_map_delete() method.
    """
    if not atom_id_map_dict:
        return mm.MMCT_INVALID_ID_MAP

    id_map = mm.mmct_id_map_new(len(atom_id_map_dict))
    for old_index, new_index in atom_id_map_dict.items():
        # A falsy value (None for a deleted atom) is stored as -1.
        mapped_index = new_index if new_index else -1
        mm.mmct_id_map_set(id_map, old_index, mapped_index)
    return id_map
def unroll_pos(ct, rep_vec, xyz0, start_atom):
    """
    Write coordinates for every copy of a replicated CT.

    Each copy is `xyz0` shifted by an integer combination of the box
    vectors obtained from `cms.get_box(ct)`.

    :type ct: `structure.Structure`
    :type rep_vec: `tuple` of 3 `int`
    :type xyz0: Nx3 `numpy.array` where N is the number of atoms in the
        un-replicated CT.
    :param start_atom: 1-based atom number in `ct` at which the block of
        replicated coordinates begins.
    :type start_atom: int
    """
    # FIXME: DESMOND-9705 should remove it.
    cell = numpy.reshape(cms.get_box(ct), [3, 3])
    n_copies = numpy.prod(rep_vec)

    # One translation vector per replica, shaped to broadcast over atoms.
    offsets = numpy.empty((n_copies, 1, 3))
    for idx, multiples in enumerate(itertools.product(*map(range, rep_vec))):
        offsets[idx][0] = numpy.dot(multiples, cell)

    atoms_per_copy = len(xyz0)
    replicated = numpy.empty((n_copies, atoms_per_copy, 3))
    numpy.copyto(replicated, offsets + xyz0)
    replicated.shape = (n_copies * atoms_per_copy, 3)

    first = start_atom - 1
    coords = ct.getXYZ(copy=False)
    coords[first:first + (n_copies * atoms_per_copy)] = replicated
    ct.setXYZ(coords)
def set_structure_atoms_entry_id(st: structure.Structure, eid: int,
                                 start_atom: int, end_atom: int):
    """
    Assign an entry id to a contiguous range of atoms of a structure.

    :param st: Structure whose atoms are updated.
    :param eid: Entry id to assign; it is passed on as a string.
    :param start_atom: First atom of the range (inclusive).
    :param end_atom: Last atom of the range (inclusive).
    """
    atom_mask = Bitset(size=st.atom_total)
    atom_mask.range(start_atom, end_atom)
    entry_name = str(eid)
    mm.mmct_ct_set_entry_name(st, entry_name, atom_mask)
def get_workspace_structure_and_entry_total():
    """
    Fetch the workspace structure (not a copy) and its entry count.

    :return: Tuple of the workspace ct and the number of entries reported
        for it by `mm.mmct_ct_get_entry_total`.
    :rtype: tuple(structure.Structure, int)
    """
    workspace_ct = maestro.workspace_get(copy=False)
    return (workspace_ct, mm.mmct_ct_get_entry_total(workspace_ct))
def is_replica_atom_count(main_ct_atom_total: int, frame_ct_atom_total: int):
    """
    Whether the main_ct atom count is a result of number of atoms obtained by
    replicating frame_ct.

    :param main_ct_atom_total: Main CT atom count to verify
    :param frame_ct_atom_total: Frame ct atom count to be used for reference.
    :rtype: bool
    :return: True when the main ct atom count is strictly larger than the
        frame ct atom count and an exact multiple of it. False otherwise,
        including when the frame ct atom count is not positive.
    """
    # An empty frame cannot be replicated; bail out before the modulo so a
    # zero atom count does not raise ZeroDivisionError.
    if frame_ct_atom_total <= 0:
        return False
    return (main_ct_atom_total > frame_ct_atom_total and
            main_ct_atom_total % frame_ct_atom_total == 0)
def set_distance_cell_props(
        proj: schrodinger.project.Project, eid: int,
        box_values: schrodinger.infra.projectmodel.TriclinicBox):
    """
    Set distance cell properties value in the project for the given entry id.
    box_values represents ax, ay, az, bx, by, bz, cx, cy, cz respectively.

    :param proj: The current project.
    :param eid: Entry id whose row receives the cell properties.
    :param box_values: Box values to store; ignored when falsy or invalid.
    :rtype: bool
    :return: Result of `projectmodel.mm_set_distance_cell_props`, or False
        when the box values are missing/invalid or the entry row cannot be
        found.
    """
    if not box_values or not box_values.isValid():
        return False
    row = get_entry_row(proj, eid)
    # get_entry_row() yields None for a missing project/entry id; without
    # this guard `row.index` would raise AttributeError.
    if row is None:
        return False
    return projectmodel.mm_set_distance_cell_props(proj.project_model,
                                                   row.index, box_values)
def get_distance_cell_props(proj: schrodinger.project.Project, eid: int):
    """
    Get distance cell properties value from the project for the given entry id.

    :param proj: The current project.
    :param eid: Entry id whose row is queried.
    :rtype: schrodinger.infra.projectmodel.TriclinicBox or None
    :return: The box values when they are valid; None when they are invalid
        or the entry row cannot be found.
    """
    row = get_entry_row(proj, eid)
    # get_entry_row() yields None for a missing project/entry id; without
    # this guard `row.index` would raise AttributeError.
    if row is None:
        return None
    box_values = projectmodel.mm_get_distance_cell_props(
        proj.project_model, row.index)
    if box_values.isValid():
        return box_values
    return None
class ReferenceEntryStatistics:
    """
    Helper record describing the reference entry inside the workspace
    structure: the atom number at which it starts, how many atoms it has,
    and its entry id.
    """

    def __init__(self, workspace_st: structure.Structure,
                 frame_st_entry_id: int, frame_st_atom_total: int,
                 frame_st_replica_count: int):
        """
        :param workspace_st: Workspace structure. Expected exactly two entries
            in the workspace.
        :param frame_st_entry_id: Entry id associated with the frame
            structure.
        :param frame_st_atom_total: Atom total of the frame present in the
            workspace structure.
        :param frame_st_replica_count: Number of replicas of the frame
            structure in the workspace structure.
        """
        # Replication may not have been applied to the workspace yet, so the
        # trajectory atom count cannot always be derived by multiplying the
        # frame atom total by the replica count.
        # E.g. include trajectory entry, set replica = 2, include another
        # entry, save project, reopen project - replica values are set to 2
        # but the workspace structure is not yet updated.
        # Only when the workspace holds more atoms than the fully replicated
        # trajectory do we trust the replicated count; the surplus then
        # belongs to the other entry.
        workspace_atom_total = workspace_st.atom_total
        replicated_total = frame_st_atom_total * frame_st_replica_count
        if workspace_atom_total > replicated_total:
            trajectory_atoms = replicated_total
        else:
            trajectory_atoms = frame_st_atom_total

        # When the frame structure was included first, both atom 1 and the
        # last trajectory atom carry the frame entry id, and the reference
        # entry starts right after them.
        trajectory_is_first = (
            int(workspace_st.atom[1].entry_id) == frame_st_entry_id and
            int(workspace_st.atom[trajectory_atoms].entry_id)
            == frame_st_entry_id)
        self.start_atom = trajectory_atoms + 1 if trajectory_is_first else 1

        self.atom_total = workspace_atom_total - trajectory_atoms
        self.entry_id = int(workspace_st.atom[self.start_atom].entry_id)

    @property
    def reference_entry_included_first(self):
        """
        :return: True if reference entry is included first in the workspace,
            i.e. its atoms start at atom number 1.
        :rtype: bool
        """
        return self.start_atom == 1
def compare_structures_connectivity(st1: structure.Structure,
                                    st2: structure.Structure):
    """
    Tell whether two structures have the same connectivity between atoms.
    E.g. if atoms 1-2 are connected in st1, the same pair must be connected
    in st2 as well.

    :param st1: First structure to compare.
    :param st2: Second structure to compare.
    :rtype: bool
    :return: True when `mm.mmct_ct_compare_connect` reports `mm.MMCT_SAME`.
    """
    comparison = mm.mmct_ct_compare_connect(st1.handle, st2.handle)
    return comparison == mm.MMCT_SAME
def get_entry_row_property_value(row: schrodinger.project.ProjectRow,
                                 prop_name: str):
    """
    Look up a property value on a project row.

    :param row: Project row to read from; may be None.
    :param prop_name: Name of the property to read.
    :return: The property value, or None when the row is falsy or does not
        have the property.
    """
    if not row:
        return None
    try:
        return row.property[prop_name]
    except KeyError:
        return None
def get_entry_row(proj: schrodinger.project.Project, eid: int):
    """
    Fetch the project row for an entry id.

    :param proj: The current project.
    :param eid: Entry id to look up.
    :return: Result of `proj.getRow(eid)`, or None when either the project
        or the entry id is falsy.
    """
    if not (proj and eid):
        return None
    return proj.getRow(eid)
def get_entry_ct(proj: schrodinger.project.Project, eid: int, props: bool,
                 copy: bool):
    """
    Return the structure stored in the project for the given entry id.

    The structure is requested with workspace synchronisation disabled.

    :param proj: The current project.
    :param eid: Entry id associated with current project.
    :param props: True if properties should also be copied.
    :param copy: True if new copy of entry structure is returned.
    :return: Entry structure stored in the project, or None when no row is
        found for the entry id.
    :rtype: structure.Structure or None
    """
    entry_row = get_entry_row(proj, eid)
    if entry_row is None:
        return None
    return entry_row.getStructure(props=props,
                                  copy=copy,
                                  workspace_sync=False)
def set_entry_ct(proj: schrodinger.project.Project, st: structure.Structure,
                 eid: int, props: bool, sync_workspace: bool, copy: bool):
    """
    Store a structure in the project row of the given entry id.

    :param proj: The current project.
    :param st: Entry structure to be set.
    :param eid: Entry id associated with current project.
    :param props: True if properties should also be copied.
    :param sync_workspace: Synchronize workspace as well.
    :param copy: Forwarded to `row.setStructure` as its `copy` argument.
    :rtype: bool
    :return: True when the structure was handed to the row; False when the
        row could not be found or `st` is None.
    """
    entry_row = get_entry_row(proj, eid)
    can_set = entry_row is not None and st is not None
    if can_set:
        entry_row.setStructure(st,
                               props=props,
                               sync_workspace=sync_workspace,
                               copy=copy)
    return can_set
class MovieInterruptionType(Enum):
    """
    Type defining a save movie interruption type.
    """
    # NOTE: values are plain ints. A trailing comma after a value would turn
    # it into a one-element tuple, e.g. `(0,)`.

    # Movie is paused by clicking over Pause button.
    PAUSED = 0
    # Player itself is closed by clicking over X button.
    CLOSED = 1
class MovieInterruptionDialogRetCode(Enum):
    """
    Type defining a return code from save movie interruption warning dialog.
    """
    # NOTE: values are plain ints. A trailing comma after a value would turn
    # it into a one-element tuple, e.g. `(0,)`.

    # Save movie using only currently saved frames and stop saving movie.
    PARTIAL_SAVE = 0
    # Continue saving movie until all requested frames are saved.
    RESUME = 1
    # Cancel save movie operation and discard currently saved frames.
    CANCEL = 2
class MovieExportInterruptionDialog(messagebox.MessageBox):
    """
    A dialog that displays a warning when movie export operation is interrupted.
    """

    def __init__(self, parent, interruption_type):
        """
        :type parent: QtWidgets.QWidget
        :param parent: Parent widget of this dialog.

        :type interruption_type: enum(MovieInterruptionType)
        :param interruption_type: What interrupted the export; selects the
            wording of the warning.
        """
        if interruption_type == MovieInterruptionType.CLOSED:
            action_text = 'Closing the Trajectory Player'
        else:
            action_text = 'Pausing now'
        message = textwrap.dedent(f"""
            {action_text} will terminate the export and truncate the resulting movie; only the previously displayed portion will be saved.
            To save the partial movie, click Save. To continue the export, click Resume.
            Cancel will terminate the export without saving.
        """)
        super().__init__(parent=parent,
                         title='Movie Export Interrupted',
                         text=message,
                         icon=QtWidgets.QMessageBox.Question)
        save = self.addButton('Save', QtWidgets.QMessageBox.AcceptRole)
        cancel = self.addButton('Cancel', QtWidgets.QMessageBox.RejectRole)
        resume = self.addButton('Resume', QtWidgets.QMessageBox.AcceptRole)
        self.setDefaultButton(save)
        self.setEscapeButton(cancel)
        # Maps each button to the code exec_() reports for it.
        self.button_ret_code_dict = {
            save: MovieInterruptionDialogRetCode.PARTIAL_SAVE,
            resume: MovieInterruptionDialogRetCode.RESUME,
            cancel: MovieInterruptionDialogRetCode.CANCEL
        }

    def exec_(self):
        """
        Show the warning dialog.

        :rtype: enum(MovieInterruptionDialogRetCode)
        :return: The code mapped to the clicked button. If the clicked
            button is not one of the three registered buttons, CANCEL is
            returned (the same outcome as the escape button).
        """
        super().exec_()
        clicked_button = self.clickedButton()
        # Fall back to CANCEL instead of raising KeyError when no known
        # button is reported.
        return self.button_ret_code_dict.get(
            clicked_button, MovieInterruptionDialogRetCode.CANCEL)
class TrajectoryMaestroSettings:
    """
    Snapshot of the Maestro settings the player overrides.

    Constructing an instance records the current values and applies the
    trajectory-specific ones; `restore()` puts the recorded values back.
    """

    def __init__(self, proj, eid, adjust_pos_func,
                 translate_to_first_unit_cell):
        """
        Save original values of maestro settings and apply new settings
        according to trajectory mode.

        :type proj: project.Project handle.
        :param proj: Project handle

        :type eid: int
        :param eid: Entry id

        :type adjust_pos_func: method(bool)
        :param adjust_pos_func: Callback used to update the adjust position
            state; it is invoked with False here.

        :type translate_to_first_unit_cell: bool
        :param translate_to_first_unit_cell: Indicates whether 'Translate to
            first unit cell' is toggled on or off.
        """
        entry_row = get_entry_row(proj, eid)
        periodic_fix = get_entry_row_property_value(entry_row,
                                                    msprops.PERIODIC_FIX)
        is_infinite = bool(
            get_entry_row_property_value(entry_row, IS_INFINITE))

        # Remember original max pbc atoms used in maestro.
        self._orig_max_pbc_atoms = maestro.get_command_option(
            "trajectoryplayersettings", "maxpbcatoms")

        # MATSCI-3894, MATSCI-6234
        # TODO remove periodicfix check
        # It is here for backward compatibility (18-4 and older)
        self._pbc_atoms_set = is_infinite or periodic_fix is False
        if self._pbc_atoms_set or translate_to_first_unit_cell:
            self._setMaxPBCAtoms(0)

        # Always set adjust position to none whenever entry is loaded.
        adjust_pos_func(False)

        # Remember the crystal unit cell marker state and switch the marker
        # off if it is currently shown.
        marker_option = maestro.get_command_option("crystalunitcell",
                                                   "showmarker")
        self._orig_crystal_unit_cell_state = marker_option == 'True'
        if self._orig_crystal_unit_cell_state:
            self._setCrystalUnitCellMarkerState(False)

    def _runMaestroCommand(self, cmd):
        """
        Execute a maestro command, ignoring `maestro.MaestroCommand` errors.

        :param cmd: Maestro command to execute.
        :type cmd: str
        """
        try:
            maestro.command(cmd)
        except maestro.MaestroCommand:
            # Best effort: a failing command is deliberately not propagated.
            pass

    def _setCrystalUnitCellMarkerState(self, crystal_unit_cell_show):
        """
        Set crystal unit cell marker state in maestro.

        :param crystal_unit_cell_show: Indicates whether to show crystal unit
            cell marker in maestro or not.
        :type crystal_unit_cell_show: bool
        """
        if crystal_unit_cell_show:
            val = 'true'
        else:
            val = 'false'
        self._runMaestroCommand(f"crystalunitcell showmarker={val}")

    def _setMaxPBCAtoms(self, max_pbc_atoms):
        """
        Set maximum pbc atoms in maestro. If max_pbc_atoms is 0, maestro always
        checks for PBC bonds regardless of how many atoms are in the workspace.
        MAE-40798

        :param max_pbc_atoms: Maximum pbc atoms to consider.
        :type max_pbc_atoms: int
        """
        command = f"trajectoryplayersettings maxpbcatoms={max_pbc_atoms}"
        self._runMaestroCommand(command)

    def setMaxPBCAtomsForUnitCell(self, translate_to_first_unit_cell):
        """
        Set maxpbcatoms based on translate_to_first_unit_cell: 0 when it is
        True, otherwise the original maxpbcatoms value.

        :param translate_to_first_unit_cell: Indicates whether 'Translate to
            first unit cell' is toggled on or off.
        :type translate_to_first_unit_cell: bool
        """
        # Do nothing, if maxpbcatoms was set to 0 because of periodicfix and
        # is_infinite properties at the construction time.
        if self._pbc_atoms_set:
            return
        if translate_to_first_unit_cell:
            limit = 0
        else:
            limit = self._orig_max_pbc_atoms
        self._setMaxPBCAtoms(limit)

    def restore(self):
        """
        Restore maestro settings using original values which were initially
        changed.
        """
        # Display crystal unit cell marker if it was visible earlier.
        if self._orig_crystal_unit_cell_state:
            self._setCrystalUnitCellMarkerState(True)
        self._setMaxPBCAtoms(self._orig_max_pbc_atoms)
[docs]class EntryTrajectory:
"""
It encapsulates all the data related to trajectory inside this object.
Primary information stored are trajectory entry settings, cms model,
msys model, and list of trajectory frames.
- Supports both fixed and varying atoms frame trajectory.
- When every frame has the same number of atoms, it is called fixed atoms
trajectory.
- When all frames do not have same number of atoms, it is called varying
atoms trajectory.
- Supports full system ct validation against entry ct.
- Atoms visibility handling.
- Trajectory frame positioning (center or superimpose, no-position)
- Secondary structure update
- Restoriation of original entry ct.
"""
def __init__(self, proj: schrodinger.project.Project, eid: int,
             matsci_profile: bool):
    """
    Read the trajectory of the given entry and, on success, initialise the
    playback settings and the saved structures.

    :param proj: The project containing this trajectory.
    :param eid: The entry id attached to this trajectory.
    :param matsci_profile: Whether current maestro profile is MatSci.
        It is used to configure default settings based on the profile.
    """
    self.proj = proj
    self.eid = eid
    self.cms_model = None
    self.msys_model = None
    self.trajectory = None
    self._has_inactive_atoms = False
    self.settings_data = None
    self.total_frame = None
    self.matsci_profile = matsci_profile

    # This CT is used to track original entry ct. Visibility changes outside
    # of trajectory are updated in this ct. This information is used to
    # restore default entry ct visibility.
    self.orig_entry_ct = None

    # This CT is a primary ct which is updated using Desmond APIs.
    # TrajectoryPlayer uses this as a base ct during trajectory player
    # operations.
    self.frame_ct = None
    self.ss_data = SecondaryStructureData()
    self._has_secondary_structure = False
    self._reference_entry_statistics = None
    self._orig_box_values = None

    # Start with an empty per-frame atom total list so the accessor works
    # even when the trajectory cannot be read.
    self._frame_to_atom_total_map = []

    if self.readTrajectory():
        self.total_frame = len(self.trajectory)
        self.detectVaryingAtomsFrame()
        self.initFrameToActiveAtomTotalMap()
        self.settings_data = playback_settings_data.PlaybackSettingsData(
            self.proj,
            self.eid,
            self.total_frame,
            matsci_profile=matsci_profile)
        self._updateSavedStructures()

        # Save original box values from project because these will change
        # according to current frame when player is active.
        self._orig_box_values = get_distance_cell_props(self.proj, self.eid)

        # Optimization - cache if structure has secondary structure
        # property.
        self._has_secondary_structure = any(
            at.secondary_structure != structure.SS_NONE
            for at in self.orig_entry_ct.atom)

    # Dictionary of the dgo change type and the callback method
    # which should be called on that particular dgo change. It is set
    # regardless of whether the trajectory was read, so lookups against it
    # never fail with AttributeError.
    self.dgo_change_type_callback_dict = {
        maestro_ui.MM_DGO_RIBBON_SETTINGS_CHANGED:
            maestro_ui.clone_atoms_ribbon_settings,
        maestro_ui.MM_DGO_COLOR_CHANGED:
            maestro_ui.clone_atoms_color_settings,
        maestro_ui.MM_DGO_REP_CHANGED:
            maestro_ui.clone_atoms_representation_settings,
        maestro_ui.MM_DGO_ATOM_LABEL_SETTINGS_CHANGED:
            maestro_ui.clone_atoms_label_settings
    }
def detectVaryingAtomsFrame(self):
    """
    Flag the trajectory as having inactive atoms when any frame's atom
    count differs from its active atom total.

    Sets `_has_inactive_atoms` to True on the first such frame; the flag is
    left untouched otherwise.
    """
    frame_numbers = range(1, self.total_frame + 1)
    if any(
            self.getFrame(number).natoms !=
            self.getFrameActiveAtomTotal(number)
            for number in frame_numbers):
        self._has_inactive_atoms = True
@property
def reference_entry_statistics(self):
    """
    :return: The stored reference entry statistics object, or None when
        none has been set.
    :rtype: ReferenceEntryStatistics
    """
    return self._reference_entry_statistics

@reference_entry_statistics.setter
def reference_entry_statistics(self, ref_stat: ReferenceEntryStatistics):
    """
    Store the reference entry statistics and refresh the
    `allow_replication` flag of the settings data.

    Replication is allowed when there is no reference entry, or when the
    reference entry was included first and the trajectory does not have
    varying atoms frames.
    """
    self._reference_entry_statistics = ref_stat
    if ref_stat is None:
        replication_allowed = True
    else:
        replication_allowed = (self.reference_entry_included_first and
                               (not self.isVaryingAtomsFrame()))
    self.settings_data.allow_replication = replication_allowed
@property
def reference_entry_included_first(self):
    """
    :return: True if a reference entry is allowed and it was included first
        in the workspace.
    :rtype: bool
    """
    allowed = self.allow_reference_entry
    if not allowed:
        return allowed
    return self.reference_entry_statistics.reference_entry_included_first
@property
def has_secondary_structure(self):
    """
    :return: The cached flag telling whether the structure has secondary
        structure properties.
    :rtype: bool
    """
    cached_flag = self._has_secondary_structure
    return cached_flag
def _updateSavedFrameStructure(self):
    """
    Set `frame_ct` to the structure the player should operate on.
    """
    # Optimization: If we are using fixed frame trajectory and there is no
    # replica, we directly operate on workspace structure, so there is no
    # need to have a temporary frame structure - MAE-42350
    self.frame_ct = (self.orig_entry_ct.copy()
                     if self.usingTemporaryFrameStructure() else
                     maestro.workspace_get(copy=False))
def updateSavedFrameStructureForExport(self, st: structure.Structure):
    """
    Update active frame structure for export operation.

    :param st: Structure to use as the frame structure. A copy is stored
        when a temporary frame structure is in use, otherwise `st` itself.
    """
    if self.usingTemporaryFrameStructure():
        self.frame_ct = st.copy()
    else:
        self.frame_ct = st
def _updateSavedStructures(self):
    """
    Take a property-less copy of the entry structure as the original entry
    ct, then refresh the default frame structure.
    """
    entry_copy = get_entry_ct(self.proj, self.eid, props=False, copy=True)
    self.orig_entry_ct = entry_copy
    self._updateSavedFrameStructure()
def updateTrajectoryEntryStructure(self, workspace_st, atoms_bs,
                                   change_type):
    """
    Propagate a workspace change of the given type into the saved original
    entry structure.

    Nothing happens when the change type is not handled or when `atoms_bs`
    is not in use.

    :param workspace_st: The workspace structure
    :type workspace_st: schrodinger.structure.Structure

    :param atoms_bs: The MMbs handle of atoms which are affected by
        this change. Note that atoms_bs represents indices in the workspace
        structure and it is possible that workspace structure contains
        replica or external reference structure, so mapping for workspace
        structure atom indices to original entry structure needs to be
        taken care in this method.
    :type atoms_bs: MMbs handle

    :param change_type: Type of workspace change (color, label, or ribbon)
    :type change_type: maestro_ui.MMENUM_DGO_CHANGED_TYPE
    """
    applicable = (self.canAcceptWorkspaceStructureChange(change_type) and
                  mm.mmbs_in_use(atoms_bs))
    if applicable:
        clone_settings = self.dgo_change_type_callback_dict[change_type]
        clone_settings(workspace_st, atoms_bs, self.orig_entry_ct)
def canAcceptWorkspaceStructureChange(self, change_type):
    """
    Tell whether a callback is registered for the given change type.

    :param change_type: Type of workspace change (color, label, or ribbon,
        representation)
    :type change_type: maestro_ui.MMENUM_DGO_CHANGED_TYPE

    :return: True if change type is allowed to be accepted by this class.
    :rtype: bool
    """
    handlers = self.dgo_change_type_callback_dict
    return change_type in handlers
def hasPlaybackSettingsData(self):
    """
    Check if the current entry trajectory has playback settings
    as entry properties

    :return: True if the entry row has the playback step property set;
        False otherwise, including when the entry row cannot be found.
    :rtype: bool
    """
    entry_row = get_entry_row(self.proj, self.eid)
    # get_entry_row() yields None for a missing project/entry id; report
    # "no settings" instead of failing on `entry_row.property`.
    if entry_row is None:
        return False
    return playback_settings_data.STEP_PROP in entry_row.property
def __del__(self):
    """
    Destructor
    - Write the playback settings back when settings data exists.
    - Drop the reference to the trajectory frames so memory can be
      released.
    """
    settings = self.settings_data
    if settings:
        settings.writeSettings()
    self._clearTrajectoryFrames()
def _getCMSFilePath(self, eid):
    """
    Return the cms file path for the entry, if there is one. For the
    Material Science case the hidden cms file is generated when it is not
    present yet.

    :param eid: Entry id to look up.
    :return: Path of the cms file; None when the entry has neither a
        Desmond nor a materials trajectory.
    """
    project = self.proj

    # Desmond case
    if projutils.has_desmond_trajectory(project, eid):
        return utils.get_cms_file_path(project, eid)

    if not projutils.has_materials_trajectory(project, eid):
        return None

    # Material Science case
    hidden_path = utils.get_hidden_cms_file_path(project, eid)
    if hidden_path:
        return hidden_path

    # Not present: generate and re-query.
    utils.generate_cms_file_from_entry(project, eid)
    return utils.get_hidden_cms_file_path(project, eid)
def readTrajectory(self):
    """
    Read the trajectory using desmond APIs and populate the msys model,
    cms model and trajectory of this object.

    :rtype: bool
    :return: True when the trajectory was read; False when the entry has no
        cms file or the trajectory is unreadable.
    """
    cms_file_path = self._getCMSFilePath(self.eid)
    if not cms_file_path:
        maestro.warning("Entry does not contain any trajectory.")
        return False

    try:
        loaded = traj_util.read_cms_and_traj(cms_file_path)
        self.msys_model, self.cms_model, self.trajectory = loaded
        topo.make_glued_topology(self.msys_model, self.cms_model)
    except traj_util.TrajectoryUnreadableError:
        # Let the user decide whether the broken trajectory path should be
        # cleared from the entry.
        trj_path = utils.get_trajectory_path(self.proj, self.eid)
        choice = trajectory_messages.show_invalid_trajectory_file_dlg(
            trj_path)
        if choice == MessageButton.REMOVE:
            utils.set_trajectory_path(self.eid, "")
        return False
    else:
        return True
def isSameTrajectory(self, eid: int):
    """
    Find out if the trajectory associated with the given entry id is the
    one held by this object.

    :param eid: The entry id to compare against.
    :rtype: bool
    :return: Truthy only when this object is valid, the entry ids match and
        both entries resolve to the same trajectory path.
    """
    requested_path = utils.get_trajectory_path(self.proj, eid)
    loaded_path = utils.get_trajectory_path(self.proj, self.eid)

    valid = self.isValid()
    if not valid:
        return valid
    same_entry = self.eid == eid
    if not same_entry:
        return same_entry
    return loaded_path == requested_path
def isValid(self):
    """
    Check whether the trajectory is initialized properly.

    :rtype: bool
    :return: True when the entry id, cms model, trajectory and msys model
        are all truthy.
    """
    # bool() makes the result match the documented return type; the bare
    # `and` chain would return one of its operands (e.g. None or a model).
    return bool(self.eid and self.cms_model and self.trajectory and
                self.msys_model)
def isSingleFrameTrajectory(self):
    """
    Check whether the trajectory consists of exactly one frame.

    :rtype: bool
    :return: True when the total frame count equals 1.
    """
    frame_count = self.total_frame
    return frame_count == 1
def _clearTrajectoryFrames(self):
    """
    Drop this object's reference to the trajectory frames.
    """
    # As of DESMOND-8903, cached native frames are automatically
    # garbage-collected (when all `Frame` objects referencing the `Source`
    # objects are garbage collected). No need to clear the cache explicitly.
    self.trajectory = None
def reload(self):
    """
    Re-read the trajectory frames from the entry's trajectory path.

    Align reference frame and Center view position modify the trajectory
    data, so when the user switches between No Position, Align reference
    frame, or Center view positions the original frames have to be pulled
    again. This function is expected to be called only if the desmond
    system has already been loaded once; it does nothing when this object
    is not valid.
    """
    if not self.isValid():
        return
    self._clearTrajectoryFrames()
    trajectory_dir = utils.get_trajectory_path(self.proj, self.eid)
    self.trajectory = traj.read_traj(trajectory_dir)
def getFrame(self, frame_number: int):
    """
    Return the frame object for a 1-based frame number.

    :param frame_number: Frame number, starting at 1.
    :return: The frame, or None when no trajectory is loaded or the frame
        number is outside 1..total_frame.
    """
    frames = self.trajectory
    if not frames:
        return None
    if not (0 < frame_number <= self.total_frame):
        return None
    return frames[frame_number - 1]
def getFrameTime(self, frame_number: int):
    """
    Return frame chemical time in nano seconds.

    :param frame_number: Frame number, starting at 1.
    :return: The frame's `time` multiplied by 0.001, or 0 when there is no
        such frame.
    """
    frame = self.getFrame(frame_number)
    if frame is None:
        return 0
    return frame.time * 0.001
def getSmoothingFrames(self, frame_number: int):
    """
    Collect the frames used for smoothing, starting at `frame_number` and
    spanning at most `smoothing` frames without running past the last
    frame.

    :param frame_number: Frame number for which smoothing frames are
        required.

    :rtype: list(schrodinger.application.desmond.packages.traj.Frame) or None
    :return: List of frames to be used for smoothing, or None if smoothing
        is not greater than 1.
    """
    window = self.settings_data.smoothing
    if not window > 1:
        return None
    last_frame = min(self.total_frame, frame_number + window - 1)
    return list(map(self.getFrame, range(frame_number, last_frame + 1)))
def updateViewPosition(self, forced_update: bool, asl: str):
    """
    Update trajectory based on view position tab settings and adjust frame.

    To center molecules, the ASL must be valid and match at least 1 atom.
    To align molecules, the ASL must be valid and match at least 3 atoms.

    :param forced_update: Reload frames from default.
    :param asl: ASL to be used.
    :rtype: bool
    :return: False when the ASL is invalid, True otherwise.
    """
    settings = self.settings_data

    if forced_update:
        # Restore atoms and box position in the frames.
        self.reload()

    if settings.adjust_view_position and asl:
        # Check if ASL is valid or not.
        if not analyze.validate_asl(asl):
            maestro.warning("Invalid ASL '%s'" % asl)
            return False

        gids = topo.asl2gids(self.cms_model, asl, False)
        n_matched = len(gids)
        if settings.avp_center_molecules and n_matched >= 1:
            self.trajectory = topo.center(self.msys_model, gids,
                                          self.trajectory)
        elif settings.avp_align_on_frame and n_matched >= 3:
            reference_frame = self.trajectory[settings.avp_ref_frame - 1]
            reference_pos = reference_frame.pos(gids)
            self.trajectory = topo.superimpose(self.msys_model, gids,
                                               self.trajectory,
                                               reference_pos)

    if settings.translate_to_first_unit_cell:
        wrapped = utils.wrap_trajectory(self.cms_model, self.trajectory)
        self.cms_model, self.trajectory = wrapped
    return True
def _sameNumberAtoms(self, st1: structure.Structure,
                     st2: structure.Structure):
    """
    Tell whether two structures have the same atom total.

    :param st1: First structure to compare.
    :param st2: Second structure to compare.
    :rtype: bool
    """
    first_total = st1.atom_total
    second_total = st2.atom_total
    return first_total == second_total
def getAtomTotal(self, frame_ct: structure.Structure):
    """
    :param frame_ct: Frame ct; may be None.
    :rtype: int
    :return: Atom total of the given frame ct, or 0 when it is falsy.
    """
    if frame_ct:
        return frame_ct.atom_total
    return 0
def getFrameActiveAtomTotal(self, frame_number: int):
    """
    :param frame_number: Frame number, starting at 1.
    :return: Total active atom count in the given frame, as computed by the
        cms model from the frame's `nactive` and `natoms`.
    """
    frame = self.getFrame(frame_number)
    return self.cms_model.active_total_from_nactive_gids(
        frame.nactive, frame.natoms)
def initFrameToActiveAtomTotalMap(self):
    """
    Build the per-frame list of active atom totals, ordered by frame
    number (frame 1 first).
    """
    frame_numbers = range(1, self.total_frame + 1)
    self._frame_to_atom_total_map = list(
        map(self.getFrameActiveAtomTotal, frame_numbers))
def getFrameToActiveAtomTotalMap(self):
    """
    :return: The stored list of active atom totals, one per frame.
    :rtype: list(int)
    """
    totals = self._frame_to_atom_total_map
    return totals
def isFrameStructureChanged(self, main_ct: structure.Structure):
    """
    Tell whether the given structure changed with respect to the frame
    structure. Only atom counts are compared.

    The trajectory atom total in the workspace structure may legitimately
    equal the full system atom total, because the frame structure could
    have been reset to the full system structure for varying atoms frame.

    :param main_ct: Main ct.
    :rtype: bool
    :return: True when the trajectory atom total in `main_ct` matches
        neither the frame ct total, nor the full system total, nor a
        replica multiple of the frame ct total.
    """
    if self.usingTemporaryFrameStructure():
        reference_st = self.frame_ct
    else:
        reference_st = self.orig_entry_ct
    reference_total = self.getAtomTotal(reference_st)
    workspace_total = self.getTrajectoryAtomTotalInWorkspaceStructure(
        main_ct)
    full_system_total = self.cms_model.fsys_ct.atom_total

    return (workspace_total != reference_total and
            workspace_total != full_system_total and
            not is_replica_atom_count(workspace_total, reference_total))
def restoreEntryCT(self, eid: int, sync_workspace: bool,
                   is_snapshot_mode_active: bool):
    """
    Write the original trajectory ct back into the project entry, because
    atom positions or visibility might have changed during play. The ct is
    not restored when the user edited the structure.

    :param eid: Entry id associated with current project.
    :param sync_workspace: Whether or not synchronize changes in the
        workspace.
    :param is_snapshot_mode_active: True if player was showing frame in the
        snapshot viewing mode.
    """
    current_entry_ct = get_entry_ct(self.proj, eid, False, False)
    # Entry ct will be None only if entry is deleted.
    if current_entry_ct is None:
        return

    # Restore original cell values.
    set_distance_cell_props(self.proj, eid, self._orig_box_values)

    # Only overwrite the entry when the user has not edited the structure.
    if not self.isUserEditedEntryCt(current_entry_ct,
                                    is_snapshot_mode_active):
        set_entry_ct(self.proj,
                     self.orig_entry_ct,
                     eid,
                     props=False,
                     sync_workspace=sync_workspace,
                     copy=True)
def isUserEditedEntryCt(self, entry_ct: structure.Structure,
                        is_snapshot_mode_active: bool):
    """
    Decide, from atom counts, whether the entry structure was edited by the
    user. Automatic mechanisms such as replication or frame snapshot
    viewing also change the structure and must not count as user edits.

    :param entry_ct: Current entry ct stored in the project.
    :param is_snapshot_mode_active: True if player was showing frame in the
        snapshot viewing mode.
    :rtype: bool
    :return: True if structure is edited by the user.
    """
    original_st = self.orig_entry_ct
    replicated = self.getNumberOfReplica() > 1
    atom_totals = self.getFrameToActiveAtomTotalMap()
    current_frame_total = atom_totals[self.settings_data.current_frame - 1]

    # Same atom count as the original: never treated as a user edit.
    if self._sameNumberAtoms(entry_ct, original_st):
        return False

    if replicated:
        # With replication the entry may hold an exact multiple of the
        # original atoms.
        if is_replica_atom_count(entry_ct.atom_total,
                                 original_st.atom_total):
            return False
        # In case of varying atoms frame the entry atom total depends on
        # the current frame's atom total rather than on the original.
        return not (self.isVaryingAtomsFrame() and is_replica_atom_count(
            entry_ct.atom_total, current_frame_total))

    if is_snapshot_mode_active:
        return False
    return not (self.isVaryingAtomsFrame() and
                entry_ct.atom_total == current_frame_total)
def getNumberOfReplica(self):
    """
    Return the number of replicas over all dimensions, i.e. the product of
    the x, y and z replicate settings.

    :rtype: int
    :return: Number of replica
    """
    settings = self.settings_data
    count = settings.replicate_x * settings.replicate_y
    count = count * settings.replicate_z
    return count
def getReplicateVector(self):
    """
    Return the replicate settings as an (x, y, z) tuple.

    :rtype: tuple(int, int, int)
    :return: Return tuple of replication vector
    """
    settings = self.settings_data
    along_x = settings.replicate_x
    along_y = settings.replicate_y
    along_z = settings.replicate_z
    return (along_x, along_y, along_z)
def visibilityChanged(self, displayed: list, undisplayed: list,
                      entry_start_atom: int):
    """
    Update saved atoms visibility according to new state.

    The atom lists refer to workspace atoms; the displayed atoms are
    processed first, then the undisplayed ones.

    :param displayed: List of atoms which are displayed.
    :param undisplayed: List of atoms which are undisplayed.
    :param entry_start_atom: First entry atom number in the workspace ct.
    """
    for atoms, visible in ((displayed, True), (undisplayed, False)):
        self._updateSavedStructuresVisiblity(atoms, visible,
                                             entry_start_atom)
def _updateSavedStructuresVisiblity(self, atoms: list, visible: bool,
                                    entry_start_atom: int):
    """
    Apply a visibility flag to the saved original entry and frame
    structures.

    :param atoms: Workspace atom numbers to be updated in the saved
        structures.
    :param visible: Visibility flag.
    :param entry_start_atom: First entry atom number in the workspace ct.
    """
    first_atom = entry_start_atom
    last_atom = first_atom + self.getEntryAtomTotal() - 1
    for workspace_atom in atoms:
        # The workspace may hold more atoms than the original entry
        # (system replication or multiple trajectories), so anything
        # outside this entry's atom range is ignored.
        if not (first_atom <= workspace_atom <= last_atom):
            continue
        # Convert the workspace atom number to a 1-based entry atom number.
        entry_atom = workspace_atom - first_atom + 1
        for saved_st in [self.orig_entry_ct, self.frame_ct]:
            mm.mmctg_atom_set_visible(saved_st, entry_atom, visible)
def getEntryAtomTotal(self):
    """
    Return the atom count of the saved original entry structure.

    :rtype: int
    :return: Trajectory entry atom total
    """
    original_ct = self.orig_entry_ct
    return original_ct.atom_total
def usingTemporaryFrameStructure(self):
    """
    Tell whether a temporary frame ct is used instead of the workspace
    structure.

    For a fixed-atoms trajectory without replicas and without a reference
    entry, the workspace structure (i.e. main ct) is operated on directly.
    Otherwise a temporary frame ct is used.

    :return: True if frame structure is not intended to be used same as
        workspace structure.
    """
    varying = self.isVaryingAtomsFrame()
    if varying:
        return varying
    if self.getNumberOfReplica() > 1:
        return True
    return self.allow_reference_entry
@property
def allow_reference_entry(self):
    """
    Whether a reference entry is present alongside the trajectory.

    If the trajectory is superimposed on an external structure, one more
    entry is allowed in the workspace, so that the total number of
    entries in the workspace is 2.

    :return: True when reference entry statistics are set.
    """
    statistics = self.reference_entry_statistics
    return statistics is not None
@property
def reference_entry_id(self):
    """
    :return: Entry id of the reference entry, or None when there is no
        reference entry.
    """
    if not self.allow_reference_entry:
        return None
    return self.reference_entry_statistics.entry_id
@property
def trajectory_start_atom_in_workspace_structure(self):
    """
    :return: Starting atom position of the frame structure in the
        workspace structure. It is 1 unless a reference entry occupies
        the start of the workspace structure, in which case it is the
        atom right after the reference entry's atoms.
    :rtype: int
    """
    reference_comes_first = (
        self.allow_reference_entry and
        self.reference_entry_statistics.start_atom == 1)
    if not reference_comes_first:
        return 1
    return self.reference_entry_statistics.atom_total + 1
def getTrajectoryAtomTotalInWorkspaceStructure(
        self, main_ct: structure.Structure):
    """
    Compute how many workspace atoms belong to the trajectory.

    When an external structure is superimposed on the frame structure,
    the main_ct atom total also counts the reference entry atoms, so
    those are subtracted here.

    :param main_ct: Workspace structure
    :return: Return trajectory atoms total in the workspace structure
        including replica or varying atoms.
    :rtype: int
    """
    if not self.allow_reference_entry:
        return main_ct.atom_total
    reference_atom_total = self.reference_entry_statistics.atom_total
    return main_ct.atom_total - reference_atom_total
def isVaryingAtomsFrame(self):
    """
    Tell whether the trajectory contains inactive atoms (i.e. varying
    atoms frame).

    :rtype: bool
    :return: - For fixed atoms trajectory, it returns False.
             - For varying atoms trajectory, it returns True.
    """
    has_inactive_atoms = self._has_inactive_atoms
    return has_inactive_atoms
def restoreDefaultVisibility(self, frame_ct: structure.Structure):
    """
    Copy the atom visibility of the saved original entry structure onto
    the given frame ct.

    :param frame_ct: Frame structure.
    """
    maestro_ui.clone_atoms_visibility(self.orig_entry_ct.handle,
                                      frame_ct.handle)
def applyVisibilityAsl(self, frame_ct: structure.Structure, asl: str,
                       display_only: bool, visible: bool):
    """
    Update atom visibility in the frame ct associated with this
    trajectory entry, based on the atoms matching the given ASL.

    :param frame_ct: Frame structure.
    :param asl: ASL to be evaluated for this entry.
    :param display_only: Forwarded unchanged to
        maestro_ui.update_atoms_visibility_by_asl (presumably "display
        only the matching atoms" -- confirm against that API).
    :param visible: Visibility value of atoms matching the asl.
    """
    ct_handle = frame_ct.handle
    maestro_ui.update_atoms_visibility_by_asl(ct_handle, asl, display_only,
                                              visible)
def updateFrameStructure(self, frame_number: int):
    """
    Update the stored frame ct for the given frame and return it.

    The function assumes frame_number is a valid frame in the trajectory.
    Callers should use the returned ct for updating their own data, but
    should not modify it.

    :param frame_number: Frame number of the requested frame.
    :rtype: structure.Structure
    :return: Return an updated frame ct.
    """
    stored_frame_ct = self.frame_ct
    smoothing_frames = self.getSmoothingFrames(frame_number)
    return self.getUpdatedFrameStructure(frame_number, stored_frame_ct,
                                         smoothing_frames)
def getUpdatedFrameStructure(self, frame_number: int,
                             frame_ct: structure.Structure,
                             frames_to_smooth: list):
    """
    Update the given frame ct from the requested frame and return it.

    If no frame is found for frame_number, frame_ct is returned
    untouched.

    :param frame_number: Frame number of the requested frame.
    :param frame_ct: Frame structure to be updated.
    :param frames_to_smooth: Frames whose atom coordinates are to be
        smoothed to update the atom coordinates in frame_ct.
    :rtype: structure.Structure
    :return: Return an updated frame ct.
    """
    requested_frame = self.getFrame(frame_number)
    if requested_frame is None:
        return frame_ct
    # Updates position and smoothing.
    topo.update_fsys_ct_from_frame_GF(frame_ct, self.cms_model,
                                      requested_frame, frames_to_smooth)
    return frame_ct
def resetStructure(self, sync_workspace: bool):
    """
    Replace the entry's structure in the project with the trajectory's
    full system structure.

    :param sync_workspace: Synchronize workspace after resetting a
        structure in the project.
    :rtype: bool
    :return: Whether the entry structure is reset successfully.
    """
    # NOTE(review): this value is never read in this method; the call is
    # kept in case getAtomTotal() matters beyond its return value.
    old_frame_atom_total = self.getAtomTotal(self.frame_ct)
    was_reset = set_entry_ct(self.proj,
                             self.cms_model.fsys_ct,
                             self.eid,
                             props=False,
                             sync_workspace=sync_workspace,
                             copy=True)
    if not was_reset:
        return False
    # Refresh the saved structures now that the entry ct has changed.
    self._updateSavedStructures()
    return True
def entryStructureIsValid(self, frame_number: int):
    """
    Validate the entry structure against the full system structure or,
    for varying atoms trajectories, against the frame structure.

    Note: when the frame structure is used for the comparison, it is
    obtained through updateFrameStructure(), which updates
    EntryTrajectory.frame_ct as a side effect.

    :param frame_number: The frame number relative to that entry structure
        will be validated for varying atoms frame.
    :rtype: bool
    :return: Whether the structure is valid.
    """
    entry_ct = self.orig_entry_ct
    reference_st = self.cms_model.fsys_ct
    # For a varying atoms frame the full system atom total is not
    # guaranteed to match the entry atom total (while playing or if the
    # user modifies the frame ct), so fall back to the updated frame
    # structure in that case.
    if self.isVaryingAtomsFrame():
        if reference_st.atom_total != entry_ct.atom_total:
            reference_st = self.updateFrameStructure(frame_number)
    return compare_structures_connectivity(entry_ct, reference_st)
def assignSecondaryStructure(self):
    """
    Quick assign secondary structure properties using the saved original
    entry structure, the frame structure and the varying-atoms flag.
    """
    secondary_structure = self.ss_data
    secondary_structure.assign(self.orig_entry_ct, self.frame_ct,
                               self.isVaryingAtomsFrame())
def getBoxProperties(self, frame_number: int):
    """
    Return box properties for the given frame number.

    When no frame is available for frame_number, the cms_model box
    properties are used instead.

    :param frame_number: The frame number of the box for which points will
        be computed.
    :rtype: list
    :return: List of 9 properties (ax, ay, az, bx, by, bz, cx, cy, cz)
    """
    frame = self.getFrame(frame_number)
    if not frame:
        # The model box is indexed as a flat sequence of 9 values.
        model_box = self.cms_model.box
        return [model_box[index] for index in range(9)]
    # The frame box is indexed as 3 rows of 3 values; flatten row by row.
    frame_box = frame.box
    return [frame_box[row][col] for row in range(3) for col in range(3)]
def getCoordsMinMax(self, frame_number: int):
    """
    Return the minimum and maximum coordinates along the three axes.

    Coordinates come from the requested frame; when no frame is
    available for frame_number, the cms_model full system structure is
    used.

    :param frame_number: The frame number
    :rtype: list, list
    :return: List of minimum coordinates, list of maximum coordinates
    """
    frame = self.getFrame(frame_number)
    coords = (frame.pos()
              if frame else self.cms_model.fsys_ct.getXYZ(copy=False))
    lowest = coords.min(axis=0)
    highest = coords.max(axis=0)
    return lowest, highest
def getCrystalUnitCellBoxPoints(self, frame_number: int,
                                pbc_position: Union[None, str]):
    """
    Compute crystal unit cell points.

    The eight cell corners are built from fractional coordinates, then
    shifted according to pbc_position, and four of them are returned.

    :param frame_number: The frame number of the box for which points will
        be computed.
    :param pbc_position: PBC position anchored at some origin defined by
        its value. None corresponds to the default value
    :rtype: list(list)
    :return: List of 4 box points
    """
    lattice_params = xtal.get_params_from_chorus(
        self.getBoxProperties(frame_number))
    lo, hi = 0.0, 1.0
    fractional_corners = (
        [hi, hi, lo],
        [lo, hi, lo],
        [lo, lo, lo],
        [hi, lo, lo],
        [hi, hi, hi],
        [lo, hi, hi],
        [lo, lo, hi],
        [hi, lo, hi],
    )
    corners = [
        xtal.trans_fract_to_cart(fractional, *lattice_params)
        for fractional in fractional_corners
    ]
    if pbc_position is None or pbc_position == xtal.CENTER_PBC_POSITION:
        # Implements maestro-src :: mm_mcscrystalunitcell.cxx :: center_on_structure
        cell_center = xtal.trans_fract_to_cart([0.5, 0.5, 0.5],
                                               *lattice_params)
        coords_min, coords_max = self.getCoordsMinMax(frame_number)
        for corner in corners:
            for axis, center_value in enumerate(cell_center):
                # Move the cell so its center sits on the midpoint of the
                # structure's coordinate extent along this axis.
                corner[axis] += (
                    0.5 * (coords_min[axis] + coords_max[axis]) -
                    center_value)
    elif pbc_position.startswith(xtal.ANCHOR_PREFIX):
        anchor_offset = xtal.get_carts_from_anchor_string(pbc_position)
        for corner in corners:
            for axis, delta in enumerate(anchor_offset):
                corner[axis] += delta
    return [corners[0], corners[1], corners[3], corners[4]]
def getDesmondBoxPoints(self, frame_number: int):
    """
    Compute desmond box points for the given frame number.

    The box is centered on the origin: each returned point is a signed
    sum of the half box vectors a/2, b/2 and c/2. Only the four points
    that are returned are computed.

    :param frame_number: The frame number of the box for which points will
        be computed.
    :rtype: list(list)
    :return: List of 4 box points, in order:
        (-a - b - c)/2, (a - b - c)/2, (-a + b - c)/2, (-a - b + c)/2
    """
    ax, ay, az, bx, by, bz, cx, cy, cz = self.getBoxProperties(frame_number)
    half_a = [component / 2 for component in (ax, ay, az)]
    half_b = [component / 2 for component in (bx, by, bz)]
    half_c = [component / 2 for component in (cx, cy, cz)]
    half_vectors = list(zip(half_a, half_b, half_c))

    origin_corner = [-a - b - c for a, b, c in half_vectors]
    a_corner = [a - b - c for a, b, c in half_vectors]
    b_corner = [-a + b - c for a, b, c in half_vectors]
    c_corner = [-a - b + c for a, b, c in half_vectors]
    return [origin_corner, a_corner, b_corner, c_corner]
def getBoxPoints(self, frame_number: int):
    """
    Compute simulation box points.

    For the matsci profile the crystal unit cell properties are used;
    otherwise (or if no points were produced) the Desmond frame box
    properties are used. The WaterMap translation vector is added to
    every point.

    :param frame_number: The frame number of the box for which points will
        be computed.
    :rtype: list(maestro_ui.MM_GraphicsVec3d) or None
    :return: List of 4 box points or None
    """
    raw_points = None
    # Treat matsci system differently (MATSCI-10398)
    if self.matsci_profile:
        entry_row = get_entry_row(self.proj, self.eid)
        pbc_position = get_entry_row_property_value(entry_row,
                                                    xtal.PBC_POSITION_KEY)
        raw_points = self.getCrystalUnitCellBoxPoints(frame_number,
                                                      pbc_position)
    if raw_points is None:
        raw_points = self.getDesmondBoxPoints(frame_number)

    shift_x, shift_y, shift_z = self.getWaterMapBoxTranslationVector()
    box_points = []
    for raw_point in raw_points:
        graphics_point = maestro_ui.MM_GraphicsVec3d()
        graphics_point.setX(raw_point[0] + shift_x)
        graphics_point.setY(raw_point[1] + shift_y)
        graphics_point.setZ(raw_point[2] + shift_z)
        box_points.append(graphics_point)
    return box_points
def getWaterMapBoxTranslationVector(self):
    """
    Return the WaterMap translation to apply to the simulation box.

    If trajectory is generated from WaterMap, then trajectory cms file also
    contains 3 watermap specific properties which define translation values.
    r_watermap_trans1 -> x direction
    r_watermap_trans2 -> y direction
    r_watermap_trans3 -> z direction
    This setting is applicable only if player view position setting is
    not center molecule in workspace.

    :rtype: list
    :return: [x, y, z] translation read from the entry row when all three
        properties are present and the setting applies, else [0, 0, 0].
    """
    settings = self.settings_data
    if settings is None or settings.avp_center_molecules:
        return [0, 0, 0]

    entry_row = get_entry_row(self.proj, self.eid)
    translation = [
        get_entry_row_property_value(entry_row, prop)
        for prop in ('r_watermap_trans1', 'r_watermap_trans2',
                     'r_watermap_trans3')
    ]
    # A component of exactly 0.0 is a valid translation, so test for
    # presence rather than truthiness (assumes a missing property is
    # reported as None -- confirm against get_entry_row_property_value).
    if all(component is not None for component in translation):
        return translation
    return [0, 0, 0]
def updateBoxPropertiesInProject(self, frame_number: int):
    """
    Update box property's value in the project.

    :param frame_number: Frame number whose box properties are written to
        the project entry.
    """
    ax, ay, az, bx, by, bz, cx, cy, cz = self.getBoxProperties(frame_number)
    triclinic_box = projectmodel.TriclinicBox([ax, ay, az], [bx, by, bz],
                                              [cx, cy, cz])
    set_distance_cell_props(self.proj, self.eid, triclinic_box)
[docs]class TrajectoryPlayer(QWidgetStyled):
"""
This is a toolbar for the Trajectory player
:cvar currentFrameChanged: A signal emitted when the current frame changes
in the player.
:vartype currentFrameChanged: QtCore.pyqtSignal
:cvar showTrajectorySnapshotPanel: A signal emitted when user clicks on View
Snapshot menu from EXPORT button.
:vartype showTrajectorySnapshotPanel: QtCore.pyqtSignal
:cvar trajectoryLoaded: A signal emitted when a new trajectory is loaded in
the player. Emits the total number of frames in the new trajectory
:vartype trajectoryLoaded: QtCore.pyqtSignal
:cvar trajectoryUnloaded: A signal emitted when a trajectory is unloaded from
the player.
:vartype trajectoryUnloaded: QtCore.pyqtSignal
:cvar closed: A signal emitted when player toolbar is closed.
:vartype closed: QtCore.pyqtSignal
:cvar atomsSelectedToggled: A signal emitted when atom selection changes. Emits
a bool indicating if there are any atoms selected in the workspace
:vartype atomsSelectedToggled: QtCore.pyqtSignal
"""
currentFrameChanged = QtCore.pyqtSignal(int)
showTrajectorySnapshotPanel = QtCore.pyqtSignal()
trajectoryLoaded = QtCore.pyqtSignal(int)
trajectoryUnloaded = QtCore.pyqtSignal()
closed = QtCore.pyqtSignal()
createPlot = QtCore.pyqtSignal(Enum)
atomsSelectedToggled = QtCore.pyqtSignal(bool)
multiAtomsSelectedToggled = QtCore.pyqtSignal(bool)
MAX_ALLOWED_ENTRIES_IN_WORKSPACE = 2
def __init__(self, trj_hub=None, matsci_profile=False, parent=None):
    """
    Sets up all the tool buttons in the toolbar.

    Builds, left to right: the three dual-state player buttons, the
    current/total frame controls, the frame slider with its time label,
    the Export / Playback Settings / Plots menu buttons and the close
    button; then wires the signals and resets the player state.
    See parent QtWidgets.QWidget for documentation.

    :param trj_hub: The TrajectoryViewerHub which provides interaction
        with maestro.
    :type trj_hub: maestro_ui.TrajectoryViewerHub
    :param matsci_profile: A flag indicating if it is matsci profile. It is
        used to configure default setting in the profile.
    :type matsci_profile: bool
    :param parent: Parent widget passed to the base class.
    """
    super(TrajectoryPlayer, self).__init__(parent)

    # Trajectory Plot Panel initialization and setup
    self.trajectory_plot_panel = (
        traj_plot_gui.TrajAnalysisPlotPanel.getPanelInstance())
    self.trajectory_plot_panel.displayFrameAndAsl.connect(
        self.displayPlotSelection)
    self.trajectory_plot_panel.displayAsl.connect(self.displayPlotSelection)
    self.createPlot.connect(self.trajectory_plot_panel.createPlot)
    self.trajectoryLoaded.connect(
        self.trajectory_plot_panel.trajectoryChanged)
    self.currentFrameChanged.connect(
        self.trajectory_plot_panel.onCurrentFrameChanged)
    hub = maestro_ui.WorkspaceHub.instance()
    hub.atomSelectionChanged.connect(self._onAtomSelectChanged)

    self.matsci_profile = matsci_profile
    self.trj_hub = trj_hub
    self.setObjectName("trj_player_toolbar_form")
    base_layout = QtWidgets.QHBoxLayout(self)
    base_layout.setContentsMargins(7, 0, 0, 0)
    self._trj_maestro_settings = None
    self.setEntryTraj(None)
    self.setSnapshotPanelVisibility(False)
    self.is_snapshot_mode_active = False
    self.all_included = None

    # Construct dual state player buttons
    btn_info = ('start_or_step_back', 'play_or_pause',
                'end_or_step_forward')
    for btn_name in btn_info:
        btn = QtWidgets.QToolButton()
        setattr(self, btn_name, btn)
        btn.setCheckable(True)
        btn_action = QtWidgets.QAction()
        setattr(self, btn_name + "_action", btn_action)
        btn_action.setCheckable(True)
        btn.setDefaultAction(btn_action)
        base_layout.addWidget(btn)
        btn.setObjectName('trj_' + btn_name)
    base_layout.addSpacing(5)

    # Vertical line
    self.addVerticalLine(base_layout)
    base_layout.addSpacing(5)

    # Current frame label
    self.current_frame_label = QtWidgets.QLabel("Current\nFrame:")
    base_layout.addWidget(self.current_frame_label)
    self.current_frame_label.setObjectName("trj_current_frame_label")
    base_layout.addSpacing(5)

    # Current frame edit box
    self.current_frame = maestro_ui.MM_QLineEdit(self)
    base_layout.addWidget(self.current_frame)
    self.current_frame.setObjectName("trj_current_frame")
    current_frame_tooltip = textwrap.dedent("""
        Click inside to edit the current frame value
        Press Enter or click out when done
        """)
    self.current_frame.setToolTip(current_frame_tooltip)
    self.frame_validator = QtGui.QIntValidator(self)
    self.current_frame.setValidator(self.frame_validator)
    # NOTE(review): empty_width is never read below; the call is kept
    # as-is in case querying the size hint matters before fontMetrics().
    empty_width = self.current_frame.minimumSizeHint().width()
    min_width = self.current_frame.fontMetrics().averageCharWidth() * 5
    self.current_frame.setMinimumWidth(min_width)
    self.current_frame.setSizePolicy(QtWidgets.QSizePolicy.Fixed,
                                     QtWidgets.QSizePolicy.Fixed)
    base_layout.addSpacing(5)

    # Total frame label
    self.total_frame_label = QtWidgets.QLabel("of X")
    base_layout.addWidget(self.total_frame_label)
    self.total_frame_label.setObjectName("trj_total_frame")
    base_layout.addSpacing(5)

    # Vertical box layout to place slider and current frame, current elapsed
    # time and current frame state data.
    slider_vertical_box = QtWidgets.QVBoxLayout()
    slider_vertical_box.setContentsMargins(0, 0, 0, 0)
    base_layout.addLayout(slider_vertical_box)

    # Add Slider
    slider_vertical_box.addSpacing(10)
    self.frame_slider = maestro_ui.TripleMarkerSlider(self)
    slider_vertical_box.addWidget(self.frame_slider)
    self.frame_slider.setObjectName("trj_frame_slider")
    self.frame_slider.updateToolTipText.connect(
        self.updateFrameSliderToolTip)
    self.frame_slider.setSizePolicy(QtWidgets.QSizePolicy.Minimum,
                                    QtWidgets.QSizePolicy.Fixed)
    base_layout.setStretchFactor(slider_vertical_box, 2)

    # Slider elapsed time (two types of value depending on if playing or
    # dragging)
    slider_time_hbox = QtWidgets.QHBoxLayout()
    slider_time_hbox.setContentsMargins(0, 0, 0, 0)
    slider_vertical_box.addLayout(slider_time_hbox)
    self.slider_time = QtWidgets.QLabel("0 of 5 ns")
    slider_time_hbox.addWidget(self.slider_time, 5, QtCore.Qt.AlignCenter)
    self.slider_time.setObjectName("trj_slider_time")
    base_layout.addSpacing(5)

    # Add vertical line separator
    self.addVerticalLine(base_layout)
    base_layout.addSpacing(5)

    # Add menu button hbox (displayed in default state)
    self.menu_button_hbox = QtWidgets.QHBoxLayout()
    self.menu_button_hbox.setContentsMargins(0, 0, 0, 0)
    base_layout.addLayout(self.menu_button_hbox)
    base_layout.addSpacing(5)

    # Add Export and playback settings tool button with menu.
    btn_info = (('export_button', "Open Export menu"),
                ('playback_settings_button', "Show Playback Settings pane"),
                ('interactive_plots_button', 'Open plots menu'))
    for name, tooltip in btn_info:
        btn = QtWidgets.QToolButton(self)
        setattr(self, name, btn)
        btn.setObjectName('trj_' + name)
        btn.setPopupMode(QtWidgets.QToolButton.InstantPopup)
        self.menu_button_hbox.addWidget(btn, 2, QtCore.Qt.AlignLeft)
        btn.setToolTip(tooltip)

    # Hook up the interactive plots panel to its button
    self.setupPlotMenu(self.interactive_plots_button)
    self._movie_saver = MovieSaver(self)
    self._structure_exporter = frame_structure_exporter.FrameStructureExporter(
        player_obj=self,
        export_mode=ExportMode.TRAJECTORY_VIEWER,
        parent=self)

    # Setup export menu.
    self.setupExportMenu(self.export_button)
    base_layout.addStretch(1)
    self.playback_settings_button.setCheckable(True)

    self.close_button = QtWidgets.QPushButton("X")
    self.close_button.setObjectName("trj_player_close")
    # Alignment flags must be combined with bitwise OR; a boolean `or`
    # would evaluate to AlignTop alone and silently drop AlignRight.
    base_layout.addWidget(self.close_button, 0,
                          QtCore.Qt.AlignTop | QtCore.Qt.AlignRight)
    self.close_button.clicked.connect(self.closePlayerIfAllowed)
    self.setStyleSheet(stylesheet.PLAYER_TOOLBAR_STYLESHEET)

    # Signal wiring for slider, frame edit box and player buttons.
    self.frame_slider.valuesChanged.connect(self.sliderValuesChanged)
    self.current_frame.returnPressed.connect(self.currentFrameValueChanged)
    self.current_frame.lostFocus.connect(self.ensureValidCurrentFrame)
    self.play_or_pause.toggled.connect(self.playOrPauseButtonClicked)
    self.play_or_pause.toggled.connect(self.playOrPauseButtonToggled)
    self.start_or_step_back.clicked.connect(
        self.startOrStepBackButtonClicked)
    self.end_or_step_forward.clicked.connect(
        self.endOrStepForwardButtonClicked)
    self.current_frame.clicked.connect(self.pausePlayer)
    self.frame_slider.mouseDragged.connect(self.pausePlayer)
    self.frame_slider.mouseReleased.connect(self.resumePlayer)

    # Single-shot timer that drives playback one frame at a time.
    self.frame_timer = QtCore.QTimer(self)
    self.frame_timer.setSingleShot(True)
    self.frame_timer.timeout.connect(self._showNextFrame)

    # Initialize all variables.
    self.clearTrajectory()
    if maestro:
        maestro.project_close_callback_add(self.projectAboutToClose)
    self.setVisible(False)
    self.playback_settings_popup = playback_settings.PlaybackSettings(
        self.matsci_profile, self.playback_settings_button)

    # Refer to MAE-43922 why we have single shot timer for inclusion change.
    self.inclusion_update_timer = QtCore.QTimer()
    self.inclusion_update_timer.timeout.connect(self._handleInclusionUpdate)
    self.inclusion_update_timer.setInterval(10)
    self.inclusion_update_timer.setSingleShot(True)
    self.setupContextMenu()
def projectAboutToClose(self):
    """
    Handle trajectory entry when project is about to be closed.

    Does nothing when no trajectory is loaded; otherwise leaves snapshot
    mode and restores the default state of the entry ct.
    """
    if not self.entry_traj:
        return
    self.is_snapshot_mode_active = False
    self._restoreDefaultStateOfEntryCt(False)
def closePlayerIfAllowed(self):
    """
    Called when user clicks on X button.

    If player is saving a movie, warn user before closing a player,
    otherwise hide player, notify clients and restore default entry ct.
    """
    # Stop frame timer if player was active.
    if self.player_active:
        self.frame_timer.stop()

    if self.savingMovie():
        # Closing is allowed unless the user chose to resume saving.
        answer = self.showMovieInterruptionDialog(
            MovieInterruptionType.CLOSED)
        should_close = answer != MovieInterruptionDialogRetCode.RESUME
    else:
        should_close = True
    if not should_close:
        return

    self.hide()
    pbc_manager = self.trj_hub.getPBCMeasurementManager()
    pbc_manager.setPBCMeasurementActive(False)
    self.closed.emit()
    # We should restore entry ct if snapshot panel is not active,
    # otherwise snapshot panel takes care of it.
    if self.entry_traj and (not self.is_snapshot_mode_active):
        self.restoreEntryCT(eid=self.entry_traj.eid, sync_workspace=True)
    # Player toolbar should not clear trajectory if snapshot panel is
    # still visible because underlying trajectory (too much memory
    # intensive) is shared between player and snapshot panel.
    if not self.is_snapshot_panel_visible:
        self.clearTrajectory()
def setEntryTraj(self, entry_traj=None):
    """
    Set the EntryTrajectory for this panel and for the trajectory plot
    panel.

    :param entry_traj: EntryTrajectory to be set.
    :type entry_traj: EntryTrajectory or None
    """
    for owner in (self, self.trajectory_plot_panel):
        owner.entry_traj = entry_traj
def showMovieInterruptionDialog(self, interruption_type):
    """
    Show movie interruption warning dialog and act on the user's choice.

    If user pressed Save, save only currently saved frames.
    If user pressed Resume, continue saving all frames.
    If user pressed Cancel, discard all saved frames.

    :type interruption_type: enum(MovieInterruptionType)
    :param interruption_type: Interruption source button.
    :rtype: enum(MovieInterruptionDialogRetCode)
    :return: Button pressed by user.
    """
    dialog = MovieExportInterruptionDialog(self, interruption_type)
    answer = dialog.exec_()
    if answer == MovieInterruptionDialogRetCode.RESUME:
        self.player_active = True
    else:
        # Any other answer stops the export; only a partial save keeps
        # the frames saved so far.
        keep_saved_frames = (
            answer == MovieInterruptionDialogRetCode.PARTIAL_SAVE)
        self._movie_saver.stopSavingMovie(ok_status=keep_saved_frames)
    return answer
def restoreEntryCT(self, eid: int, sync_workspace: bool):
    """
    Save original trajectory ct in the project entry because atoms position
    or visibility might have changed during play. CT will not be restored in
    case of any modification.

    The work is delegated to the loaded EntryTrajectory; nothing happens
    when no trajectory is loaded.

    :param eid: Entry id associated with current project.
    :param sync_workspace: Whether or not synchronize changes in the
        workspace.
    """
    loaded_traj = self.entry_traj
    if not loaded_traj:
        return
    loaded_traj.restoreEntryCT(eid, sync_workspace,
                               self.is_snapshot_mode_active)
def pausePlayer(self):
    """
    Pause a player and remember its current state.

    Slot of maestro_ui.MM_QLineEdit.clicked() signal caused by current frame
    text edit box.
    Slot of maestro_ui.TripleMarkerSlider.mouseDragged() signal caused by
    slider drag.
    """
    if not self.player_active:
        return
    # Remember that playback was interrupted so it can be resumed later.
    self.player_was_paused = True
    self.player_active = False
def resumePlayer(self):
    """
    Resume playing if the player was paused, otherwise show the current
    frame immediately.

    Used when:
    1. Player was paused due to current frame text edit box.
    2. maestro_ui.TripleMarkerSlider.mouseReleased() signal is emitted on
       stopping slider drag.
    3. Advance Setting dialog is displayed.
    """
    # The user may edit the current frame control and then, without
    # changing focus, close maestro or the project. The project close
    # callback clears the trajectory data first, and this method runs
    # afterwards because the edit box lost focus. There is no trajectory
    # at that point, so return early.
    if not self.entry_traj:
        return
    if not self.player_was_paused:
        # Show desired frame immediately.
        self.updateCurrentFrame()
        return
    self.player_active = True
    self.player_was_paused = False
def updateCurrentFrame(self):
    """
    1. Show current frame in the workspace.
    2. Update player gui.

    :rtype: bool
    :return: Whether the current frame is updated.
    """
    workspace_ct = maestro.workspace_get(copy=False)
    self.frame_timer.stop()
    frame_shown = self.updateStructureFromFrame(self.current_frame_number,
                                                workspace_ct,
                                                notify_maestro=True)
    if not frame_shown:
        # Use single shot timer to call clearTrajectory() to avoid issue
        # with disabling play_or_pause button
        QtCore.QTimer.singleShot(0, self.clearTrajectory)
        return False

    self.updatePlayer()
    # Keep playback going when the player is active.
    if self.player_active:
        self.frame_timer.start()
    return True
def loadEntryID(self, eid):
    """
    Loads an entry ID, displaying a message if the trajectory is
    not found.

    Returns False right away when no trajectory is currently loaded and
    True when the requested entry is already the loaded one. Otherwise
    the user is asked before the entry is included in the workspace.

    :param eid: Entry ID of the requested trajectory
    :type eid: int
    :rtype: bool
    :return: Whether the entry ID was found and included
    """
    loaded_traj = self.entry_traj
    if not loaded_traj:
        return False
    if loaded_traj.eid == eid:
        return True

    project_table = maestro.project_table_get()
    if not project_table.getRow(eid):
        # The entry no longer exists; offer to delete its plots.
        if messagebox.show_question(self,
                                    REQUESTED_TRAJ_NOT_FOUND_MSG,
                                    title="Requested Trajectory Not Found"):
            self.trajectory_plot_panel.deletePlotsForEntry(eid)
        return False

    user_wants_load = messagebox.show_question(
        self,
        LOAD_TRAJ_MSG,
        "Requested Trajectory Not Loaded",
        icon=messagebox.MessageBox.Warning)
    if not user_wants_load:
        return False

    candidate_traj = EntryTrajectory(self.proj,
                                     eid,
                                     matsci_profile=self.matsci_profile)
    if not candidate_traj.isValid():
        if messagebox.show_question(self,
                                    REQUESTED_TRAJ_NOT_FOUND_MSG,
                                    title="Requested Trajectory Not Found",
                                    icon=messagebox.MessageBox.Warning):
            self.trajectory_plot_panel.deletePlotsForEntry(eid)
        return False

    # Updating inclusion to this new trajectory will load it.
    project_table.includeRows([eid])
    return True
def displayPlotSelection(self, fit_asl, eid, frame_idx=None):
    """
    Loads the given entry ID, updates the current frame, and display
    an asl selection.

    Nothing is done when the entry cannot be loaded.

    :param fit_asl: ASL to fit Workspace to after loading frame
    :type fit_asl: str
    :param eid: Entry ID of the requested trajectory
    :type eid: int
    :param frame_idx: Frame to be loaded with 1-based indexing.
        If None, no frame is loaded
    :type frame_idx: int or None
    """
    if not self.loadEntryID(eid):
        return

    if frame_idx is not None:
        self.current_frame_number = frame_idx
        self.frame_slider.setMiddleValue(frame_idx)
        self.updateCurrentFrame()

    if not fit_asl:
        return
    # Explicitly optimize fog until completion of MAE-45427
    command_lines = [
        f'fit {fit_asl}',
        f'workspaceselectionreplace {fit_asl}',
        'optimizefog',
    ]
    maestro.command('\n'.join(command_lines))
def _togglePlotVisibleInteractions(self, use_visible):
"""
Toggle whether interactions plots should be generated using only
visible Workspace atoms.
:param viuse_visible: Whether only visible atoms should be considered.
:type use_visible: bool
"""
self.trajectory_plot_panel.interactions_use_visible_atoms = use_visible
def savingMovie(self):
    """
    Report the movie saver's state.

    :rtype: bool
    :return: True if saving a movie.
    """
    movie_saver = self._movie_saver
    return movie_saver.savingMovie()
def ensureValidCurrentFrame(self):
    """
    Make sure that valid value is set in the current frame text edit box.

    A value outside the slider's left/right range is replaced by the
    slider's current value; a valid value that differs from the slider is
    pushed to the slider. The player is resumed afterwards.
    """
    typed_frame = self.current_frame_number
    in_range = (typed_frame >= self.current_min_value and
                typed_frame <= self.current_max_value)
    if not in_range:
        # Discard the typed value and show the slider's value again.
        self.current_frame_number = self.current_value
    elif typed_frame != self.current_value:
        self.current_value = typed_frame
    self.resumePlayer()
def sliderValuesChanged(self):
    """
    Slot which gets called whenever any slider point value changes.

    Updates current frame text box, the valid range of that text box, the
    playback settings data (start, end and current frame) and finally the
    player components.
    """
    slider_value = self.current_value
    range_start = self.current_min_value
    range_end = self.current_max_value

    self.current_frame_number = slider_value
    self.frame_validator.setRange(range_start, range_end)

    settings = self.playback_settings_popup.data
    range_changed = (settings.start_frame != range_start or
                     settings.end_frame != range_end)
    settings.start_frame = range_start
    settings.end_frame = range_end
    if range_changed and self.playback_settings_popup:
        self.playback_settings_popup.resetMaxStepSize()
    self.playback_settings_popup.data.current_frame = slider_value

    self.currentFrameChanged.emit(self.current_frame_number)
    self.updatePlayer()
def updatePlayer(self):
    """
    Update all components of player toolbar.
    """
    playing = self.player_active
    self.updateTotalTime()
    self.updateElapsedTime()
    if playing:
        time_style = stylesheet.SLIDER_TIME_STYLE_BRIGHT
    else:
        time_style = stylesheet.SLIDER_TIME_STYLE_NORMAL
    self.slider_time.setStyleSheet(time_style)
    self.updatePlayerBackAndForwardButtons(self.player_active,
                                           self.current_value)
    # Export is only available while the player is idle and no movie is
    # being saved.
    export_allowed = not self.player_active and (not self.savingMovie())
    self.export_button.setEnabled(export_allowed)
    # Update simulation box if player is not active because automatic
    # update happens during play.
    if not self.player_active:
        self.updateSimulationBox()
    # Update tooltip based on player state.
    self.updatePlayerTooltip()
def getTimeValueInRange(self, start_frame: int, end_frame: int):
    """
    Compute total time for a given range.

    The result is the sum, over every frame in the inclusive range, of
    that frame's time minus the previous frame's time. A valid value is
    returned only if there is a trajectory and both frame numbers are
    valid; otherwise 0 is returned.

    :param start_frame: Starting frame number.
    :param end_frame: Ending frame number.
    :return: Total time in the given range.
    :rtype: float
    """
    range_is_usable = (self.entry_traj and
                       self.isValidFrame(start_frame) and
                       self.isValidFrame(end_frame))
    if not range_is_usable:
        return 0
    frame_time = self.entry_traj.getFrameTime
    return sum(
        frame_time(number) - frame_time(number - 1)
        for number in range(start_frame, end_frame + 1))
def updateTotalTime(self):
    """
    Update total time label keeping slider min and max in the consideration.
    """
    first_frame = self.getStartFrame()
    last_frame = self.getEndFrame()
    self.total_time = self.getTimeValueInRange(first_frame, last_frame)
def updateElapsedTime(self):
    """
    Update elapsed time label keeping slider current and min
    size in the consideration.
    """
    first_frame = self.getStartFrame()
    shown_frame = self.current_value
    self.elapsed_time = self.getTimeValueInRange(first_frame, shown_frame)
def currentFrameValueChanged(self):
    """
    Slot which gets called whenever current frame value text changes.

    Pushes the typed frame number to the slider's middle point and then
    resumes the player.
    """
    typed_frame = self.current_frame_number
    self.current_value = typed_frame
    self.resumePlayer()
@property
def current_min_value(self):
    """
    Return slider's left point value, converted to int.
    """
    left_value = self.frame_slider.leftValue()
    return int(left_value)


@current_min_value.setter
def current_min_value(self, value):
    """
    Set slider's left point value.
    """
    slider = self.frame_slider
    slider.setLeftValue(value)
@property
def current_max_value(self):
    """
    Right point value of the frame slider, converted to ``int``.
    """
    right_value = self.frame_slider.rightValue()
    return int(right_value)

@current_max_value.setter
def current_max_value(self, value):
    """
    Move the right point of the frame slider to ``value``.
    """
    self.frame_slider.setRightValue(value)
@property
def current_value(self):
    """
    Middle point value of the frame slider, converted to ``int``.
    """
    middle_value = self.frame_slider.middleValue()
    return int(middle_value)

@current_value.setter
def current_value(self, value):
    """
    Move the slider's middle point to ``value``, show the same value as the
    current frame number, mirror it into the playback settings (when they
    exist) and emit ``currentFrameChanged``.
    """
    self.frame_slider.setMiddleValue(value)
    self.current_frame_number = value
    # The current value is also reset while a trajectory is being cleared
    # (i.e. the trajectory object is deleted); no playback settings object
    # exists at that point, hence the guard.
    settings_popup = self.playback_settings_popup
    if settings_popup:
        settings_popup.data.current_frame = value
    self.currentFrameChanged.emit(self.current_frame_number)
@property
def current_frame_number(self):
    """
    Frame number shown in the current frame text field, or 0 when the field
    is empty.
    """
    shown_text = self.current_frame.text()
    if not shown_text:
        return 0
    return int(shown_text)

@current_frame_number.setter
def current_frame_number(self, frame_number):
    """
    Display ``frame_number`` in the current frame text field.
    """
    self.current_frame.setText(str(frame_number))
@property
def total_frame(self):
    """
    Stored total number of frames.
    """
    return self._total_frame

@total_frame.setter
def total_frame(self, value):
    """
    Show ``value`` in the total frame label (as "of <value>") and store it.
    """
    label_text = " ".join(("of", str(value)))
    self.total_frame_label.setText(label_text)
    self._total_frame = value
@property
def total_time(self):
    """
    Stored total time of the trajectory view.
    """
    return self._total_time

@total_time.setter
def total_time(self, value):
    """
    Show "0 of <value> ns" in the slider time label, with ``value``
    rendered by ``formattedValue()``, and store ``value``.
    """
    label_parts = [str(0), "of", self.formattedValue(value), "ns"]
    self.slider_time.setText(" ".join(label_parts))
    self._total_time = value
@property
def elapsed_time(self):
    """
    Stored elapsed time of the trajectory view.
    """
    return self._elapsed_time

@elapsed_time.setter
def elapsed_time(self, value):
    """
    Show "<value> of <total_time> ns" in the slider time label, with both
    numbers rendered by ``formattedValue()``, and store ``value``.
    """
    elapsed_text = self.formattedValue(value)
    total_text = self.formattedValue(self.total_time)
    self.slider_time.setText(" ".join([elapsed_text, "of", total_text, "ns"]))
    self._elapsed_time = value
@property
def player_active(self):
    """
    Checked state of the play/pause button.
    """
    is_checked = self.play_or_pause.isChecked()
    return is_checked

@player_active.setter
def player_active(self, state):
    """
    Set the checked state of the play/pause button.
    """
    self.play_or_pause.setChecked(state)
def updateSimulationBox(self):
    """
    Tell maestro whether the simulation box is displayed and, when it is,
    send the box of the current frame.

    Nothing happens without a trajectory hub. The box itself is only sent
    when there is an entry trajectory, the box is visible and the
    trajectory returns box points for the current frame number.

    TODO: MultipleTrajectory - We need to update all trajectories simulation
    box.
    """
    hub = self.trj_hub
    if hub is None:
        return
    hub.setDisplaySimulationBox.emit(self.simbox_visible)
    if self.entry_traj is None or not self.simbox_visible:
        return
    box_points = self.entry_traj.getBoxPoints(self.current_frame_number)
    if not box_points:
        return
    # To do pass the vector lengths here
    hub.setSimulationBox.emit(
        maestro_ui.MM_GraphicsParallelepiped(box_points),
        self.playback_settings_popup.data.include_vector_lengths)
def getStartAtom(self, entry_traj: EntryTrajectory):
    """
    Return the first atom of the given trajectory entry in the workspace ct.

    The value is currently the constant 1.
    Note: This will change for multiple trajectories.

    :param entry_traj: Trajectory object whose first atom in the workspace
        structure is requested.
    :rtype: int
    :return: First atom in the workspace structure associated with this
        trajectory entry.
    """
    first_atom = 1
    return first_atom
@property
def simbox_visible(self):
    """
    Stored simulation box visibility flag.
    """
    return self._simbox_visible

@simbox_visible.setter
def simbox_visible(self, value):
    """
    Store the simulation box visibility flag, then refresh the simulation
    box through ``updateSimulationBox()``.
    """
    self._simbox_visible = value
    self.updateSimulationBox()
def _showNextFrame(self):
"""
Advance playback by one step.
The frame timer is stopped first. If the trajectory hub reports that
playback should continue, the next frame number is computed with
takeTrajectoryStep() (which honors step, direction and loop options). A
valid next frame becomes the current value and is displayed through
updateCurrentFrame(). Otherwise the stop handling below runs: movie saving,
if active, is stopped with an OK status and the player is marked inactive.
"""
# Stop frame timer and setup current frame for maestro
# to display in the workspace.
self.frame_timer.stop()
continue_play = self.trj_hub.continueTrajectoryPlay()
if continue_play:
new_curr_frame = self.takeTrajectoryStep(self.current_frame_number)
# Update only if the next frame is valid. new_curr_frame is unbound when
# continue_play is false; the short-circuiting "and" keeps it from being read.
if continue_play and self.isValidFrame(new_curr_frame):
self.current_value = new_curr_frame
self.updateCurrentFrame()
else:
# Stop the player when the end has been reached.
if self.savingMovie():
self._movie_saver.stopSavingMovie(ok_status=True)
self.player_active = False
self.updatePlayer()
[docs] def setDisplayAtomsASL(self, unused, forced_update=True):
"""
Re-read the atoms ASL from the playback settings and refresh the cached
ASL state flags.
The ASL acts as a display-only ASL if Playback Settings -> Advanced tab ->
Display Only is checked, otherwise it specifies the atoms to hide.
The method marks the ASL as changed and recomputes whether it is valid and
whether it is dynamic.
:param unused: It is not used.
:type unused: bool
:type forced_update: bool
:param forced_update: Update current frame in workspace if player is
not active.
"""
if self.playback_settings_popup.data.display_only:
asl = self.playback_settings_popup.getDisplayAtomsAsl()
else:
asl = self.playback_settings_popup.getHideAtomsAsl()
self._display_atoms_asl_changed = True
self._display_atoms_asl_is_valid = False
# Any ASL that is not None is treated as dynamic by default; a non-empty
# ASL is then re-classified below using validate_asl() and the
# mmasl_is_dynamic_expression API check.
self._display_atoms_asl_is_dynamic = asl is not None
if asl:
self._display_atoms_asl_is_valid = analyze.validate_asl(asl)
self._display_atoms_asl_is_dynamic = (
self._display_atoms_asl_is_valid and
mm.mmasl_is_dynamic_expression(asl) == mm.TRUE)
# If player is not active and user modified asl, then apply it
# immediately.
if not self.player_active and forced_update:
self.updateCurrentFrame()
def canAcceptVisibilityChange(self, all_included):
    """
    Tell whether a visibility change can be accepted.

    A change is rejected when there is no entry trajectory, when the
    trajectory has no entry id, while visibility changes are being ignored,
    or when ``all_included`` is given and does not contain the trajectory
    entry id.

    :param all_included: Currently included entries list.
    :type all_included: list
    :return: True if visibility change can be accepted
    :rtype: bool
    """
    traj_entry = self.entry_traj
    if traj_entry is None:
        return False
    entry_id = traj_entry.eid
    if entry_id is None or self._ignore_visibility_change:
        return False
    return all_included is None or entry_id in all_included
def visibilityChanged(self, displayed, undisplayed):
    """
    Forward an atom visibility change to the entry trajectory.

    The change is honored only when ``canAcceptVisibilityChange()`` accepts
    it for the currently included entries, i.e. when visibility was updated
    by a component which is not a part of the player toolbar.

    :type displayed: list(int)
    :param displayed: List of atoms which are displayed.
    :type undisplayed: list(int)
    :param undisplayed: List of atoms which are undisplayed.
    """
    if not self.canAcceptVisibilityChange(self.all_included):
        return
    self.entry_traj.visibilityChanged(displayed, undisplayed,
                                      self.getStartAtom(self.entry_traj))
def workspaceEntryStructureChanged(self, atoms_bs, change_type):
    """
    Slot handling workspace entry structure changes.

    The current workspace structure, the affected atoms and the change type
    are handed to the entry trajectory so it can update its saved entry
    structure. Nothing is propagated when there is no entry trajectory or
    while snapshot mode is active, because changes made in snapshot mode
    are transient.

    :param atoms_bs: The MMbs handle of atoms which are affected by
        this change.
    :type atoms_bs: MMbs handle
    :param change_type: Type of workspace change (color, label, or ribbon)
    :type change_type: maestro_ui.MMENUM_DGO_CHANGED_TYPE
    """
    nothing_to_update = (self.entry_traj is None or
                         self.is_snapshot_mode_active)
    if nothing_to_update:
        return
    workspace_st = get_workspace_structure_and_entry_total()[0]
    self.entry_traj.updateTrajectoryEntryStructure(workspace_st, atoms_bs,
                                                   change_type)
def applyDisplayAtomsASLOnCT(self, entry_traj: EntryTrajectory,
                             frame_ct: structure.Structure):
    """
    Apply the stored atoms ASL to ``frame_ct`` through the entry trajectory.

    With Playback Settings -> Advanced tab -> Display Only checked, the
    display ASL is used; otherwise the hide ASL is used. The display-only
    flag is passed on both as the display-only and as the visible argument.

    :param entry_traj: Entry trajectory to be used to update visibility.
    :param frame_ct: The ct to be updated.
    """
    popup = self.playback_settings_popup
    display_only = popup.data.display_only
    asl = (popup.getDisplayAtomsAsl()
           if display_only else popup.getHideAtomsAsl())
    entry_traj.applyVisibilityAsl(frame_ct, asl, display_only, display_only)
def getOriginalEntryCT(self, copy: bool):
    """
    Return the original entry ct of the entry trajectory.

    :param copy: True if caller needs a copy. The copy gets the trajectory
        entry id set on all of its atoms.
    :rtype: structure.Structure
    :return: Copy of initial entry ct or reference to it.
    """
    if not copy:
        return self.entry_traj.orig_entry_ct
    entry_ct_copy = self.entry_traj.orig_entry_ct.copy()
    set_structure_atoms_entry_id(entry_ct_copy, self.entry_traj.eid, 1,
                                 entry_ct_copy.atom_total)
    return entry_ct_copy
def isVaryingAtomsFrame(self):
    """
    Tell whether the trajectory contains inactive atoms, i.e. is a varying
    atoms trajectory. The answer is delegated to the entry trajectory.

    :rtype: bool
    :return: - For fixed atoms trajectory, it returns False.
             - For varying atoms trajectory, it returns True.
    """
    varying = self.entry_traj.isVaryingAtomsFrame()
    return varying
def getUpdatedFrameSnapshot(self, frame_number: int,
                            frame_ct: structure.Structure,
                            forced_update: bool, use_display_asl: bool):
    """
    Update frame ct using given frame number's associated frame.
    Also update atoms visibility based on player defined visibility asl.

    :param frame_number: Frame number of the frame to be used for the update.
    :param frame_ct: Frame structure which will be updated.
    :param forced_update: Forcefully update visibility (it is required when
        first frame is loaded).
    :param use_display_asl: Determine if display asl should also be applied
        or not.
    :rtype: structure.Structure
    :return: Updated frame structure.
    """
    old_atom_total = frame_ct.atom_total
    updated_frame_ct = self.entry_traj.getUpdatedFrameStructure(
        frame_number=frame_number, frame_ct=frame_ct, frames_to_smooth=None)
    new_atom_total = updated_frame_ct.atom_total
    # Update entry id if new atoms are added in the frame.
    if new_atom_total > old_atom_total:
        set_structure_atoms_entry_id(updated_frame_ct, self.entry_traj.eid,
                                     old_atom_total + 1, new_atom_total)
    if not use_display_asl:
        return updated_frame_ct
    # Visibility is restored on the structure that is returned (and that the
    # ASL is applied to below), not on the input structure, so both steps
    # act on the same object even if the trajectory hands back a new one.
    if self.shouldRestoreVisibility(self.entry_traj) or forced_update:
        self.entry_traj.restoreDefaultVisibility(updated_frame_ct)
    if (self.needVisibilityUpdate(self.entry_traj) or
            (self._display_atoms_asl_is_valid and forced_update)):
        self.applyDisplayAtomsASLOnCT(self.entry_traj, updated_frame_ct)
    return updated_frame_ct
def shouldUpdateVisibility(self, entry_traj: EntryTrajectory):
    """
    Check if visibility should be updated or not. Applicable in both
    restoring original visibility and apply asl based visibility.

    :param entry_traj: Entry trajectory to be used to know if fixed atoms
        frame ct or varying atoms frame ct. May be None.
    :rtype: bool
    :return: True if visibility should be updated.
    """
    # If ASL is dynamic, we have to evaluate ASL every time because distance
    # can vary between atoms frame to frame.
    # If varying atoms frame, we have to restore because new atoms might
    # have been added.
    # bool() keeps the documented return type even when entry_traj is None
    # (the bare expression would evaluate to None in that case).
    return bool(self._display_atoms_asl_is_dynamic or
                (entry_traj and entry_traj.isVaryingAtomsFrame()))
def shouldRestoreVisibility(self, entry_traj: EntryTrajectory):
    """
    Determine if frame ct visibility should be restored or not.

    :param entry_traj: Entry trajectory to be used to know if fixed atoms
        frame ct or varying atoms frame ct.
    :rtype: bool
    :return: True if visibility should be restored.
    """
    # A changed ASL means visibility has to be rebuilt from scratch.
    asl_changed = self._display_atoms_asl_changed
    if asl_changed:
        return asl_changed
    return self.shouldUpdateVisibility(entry_traj)
def needVisibilityUpdate(self, entry_traj: EntryTrajectory):
    """
    Determine if frame ct visibility should be updated or not. That is the
    case when the stored ASL is valid and ``shouldUpdateVisibility()``
    agrees.

    :param entry_traj: Entry trajectory to be used to know if fixed atoms
        frame ct or varying atoms frame ct.
    :rtype: bool
    :return: True if visibility should be updated.
    """
    asl_is_valid = self._display_atoms_asl_is_valid
    if not asl_is_valid:
        return asl_is_valid
    return self.shouldUpdateVisibility(entry_traj)
def applyDisplayAtomsASL(self, entry_traj: EntryTrajectory,
                         changed_structure: bool):
    """
    Display or hide atoms based on stored atoms asl in the entry_traj's
    frame ct.
    ASL acts as display only asl if Playback Settings -> Advanced tab
    -> Display Only checked, otherwise acts as hide specified atoms.

    As a side effect the "ASL changed" flag is cleared.

    :param entry_traj: Entry trajectory to be used to update visibility.
    :param changed_structure: Indicates if structure itself is changed or
        not.
    :rtype: bool
    :return: True if default visibility was restored or the asl was applied.
    """
    # TODO: MultipleTrajectory - We need to iterate over all trajectories
    # and update visiblity.
    visibility_changed = False
    if self.shouldRestoreVisibility(entry_traj):
        entry_traj.restoreDefaultVisibility(entry_traj.frame_ct)
        visibility_changed = True
    # Remember whether the ASL changed since the last call before clearing
    # the flag; the optimization check below still needs it.
    display_atoms_asl_changed = self._display_atoms_asl_changed
    self._display_atoms_asl_changed = False
    if not self._display_atoms_asl_is_valid:
        return visibility_changed
    # Visibility does not depend on atom coordinate.
    # If ASL did not change, structure did not change (except atom
    # coordinate), so set of visible atoms should not have changed since
    # the last call of this function.
    # This check is intended for optimization purpose to avoid evaluating
    # and applying asl for each frame.
    if (not self.needVisibilityUpdate(entry_traj) and
            (not changed_structure) and self.player_active and
            (not display_atoms_asl_changed)):
        return visibility_changed
    self.applyDisplayAtomsASLOnCT(entry_traj, entry_traj.frame_ct)
    return True
def updatePBCMeasurement(self, entry_traj, frame_number):
    """
    Update data in maestro_ui.TrajectoryViewerHub, so that maestro
    calculate measurements keeping PBC in consideration.

    :param entry_traj: Entry trajectory supplying replica count, frame box
        and active atom total.
    :param frame_number: Frame number to be displayed in workspace.
        Possible values are 1 to self.total_frame
    :type frame_number: int
    """
    #TODO MultipleTrajectory - We need to update PBC measurement box for all
    # trajectories.
    # PBC is honored for maestro measurements only when there is a single
    # replica, the player is visible and the frame number is valid.
    honor_pbc = (entry_traj.getNumberOfReplica() == 1 and
                 self.isVisible() and self.isValidFrame(frame_number))
    manager = self.trj_hub.getPBCMeasurementManager()
    manager.setPBCMeasurementActive(honor_pbc)
    if not honor_pbc:
        return
    # Hand the box of this frame, flattened row by row, to the manager so
    # that maestro can compute measurements keeping PBC in mind.
    box = entry_traj.getFrame(frame_number).box
    flat_box = [box[row][col] for row in range(3) for col in range(3)]
    manager.updatePBC(flat_box,
                      entry_traj.getFrameActiveAtomTotal(frame_number))
[docs] def getUpdatedCT(self, entry_traj: EntryTrajectory, frame_number: int,
main_ct: structure.Structure, honor_replica: bool):
"""
Update entry_traj frame ct using given frame number and apply changes
in the main_ct either by transferring delta change or by copying frame
ct from scratch.
The work is done in this order: the frame ct is updated from the trajectory,
the frame title and the display/hide ASL are applied, secondary structure is
assigned when enabled, atoms are added to/removed from main_ct if the frame
atom count changed, replicas are created or removed when honor_replica is
True, the box of main_ct is updated, and finally frame changes are
transferred to main_ct when a temporary frame structure is in use.
:param entry_traj: Entry trajectory object.
:param frame_number: Frame to be used to update main ct.
:param main_ct: The main ct which will be updated using entry traj's
frame.
:param honor_replica: Whether to honor replica. Export structure does
not support replica.
:rtype: tuple(structure.Structure, bool, bool,
maestro_ui.MMENUM_DGO_CHANGED_TYPE, dict)
:return: (Modified main_ct or a copy of frame_ct, True if the atom count
of the frame ct or of the main ct changed, True if visibility changed,
maestro_ui.MM_DGO_CONNECTIVITY_CHANGED if connectivity changed and
maestro_ui.MM_DGO_COORDINATES_CHANGED otherwise,
renumbering dictionary (Keys are atom number before deleting and
value for each is the new atom number or None if that atom was
deleted; None if no atoms were deleted)).
"""
old_frame_atom_total = entry_traj.getAtomTotal(entry_traj.frame_ct)
trajectory_atom_total_in_main_ct = entry_traj.getTrajectoryAtomTotalInWorkspaceStructure(
main_ct)
had_replication = is_replica_atom_count(
trajectory_atom_total_in_main_ct, old_frame_atom_total)
varying_atoms_frame = entry_traj.isVaryingAtomsFrame()
num_replica = entry_traj.getNumberOfReplica()
# Optimization - we need to make a copy of original ct in the frame ct.
# Switching from replica == 1 to replica > 1 - MAE-42350
if not varying_atoms_frame and not had_replication and num_replica > 1:
entry_traj._updateSavedFrameStructure()
# Get updated frame ct from trajectory.
frame_ct = entry_traj.updateFrameStructure(frame_number)
ct_changed = False
visibility_changed = False
# We assume we do not have a connectivity change (e.g. a structural change,
# also the highest level change we have). If we do have a connectivity
# change, then we will set this flag
connectivity_changed = False
atom_id_map = None
if frame_ct is not None:
trajectory_start_atom = entry_traj.trajectory_start_atom_in_workspace_structure
frame_ct_changed = old_frame_atom_total != frame_ct.atom_total
self.updateFrameCTTitle(main_ct, frame_number)
visibility_changed = self.applyDisplayAtomsASL(
entry_traj, frame_ct_changed)
# Assign secondary structure
update_ss = (
self.playback_settings_popup.data.update_secondary_structure and
entry_traj.has_secondary_structure)
if update_ss:
entry_traj.assignSecondaryStructure()
if frame_ct_changed:
# The number of atoms has changed, so we have a connectivity
# change
connectivity_changed = True
main_ct, atom_id_map = self.syncCTAtomsFromFrameAtoms(
frame_ct, main_ct, old_frame_atom_total, entry_traj.eid,
trajectory_atom_total_in_main_ct, trajectory_start_atom)
if main_ct is None:
main_ct = frame_ct.copy()
#FIXMEREFENTRY - Varying atoms main ct is entirely replaced,
# so we won't have reference structure entry in the main ct.
# MAE-43247 should fix it.
entry_traj.reference_entry_statistics = None
set_structure_atoms_entry_id(main_ct, entry_traj.eid, 1,
main_ct.atom_total)
# NOTE(review): this reads self.entry_traj while the rest of the method
# uses the entry_traj parameter - confirm they are always the same object.
trajectory_atom_total_in_main_ct = self.entry_traj.getTrajectoryAtomTotalInWorkspaceStructure(
main_ct)
if honor_replica:
# Update main ct according to replicas.
replica_vector = entry_traj.getReplicateVector()
main_ct, main_ct_changed, imap = self._performReplication(
frame_ct, main_ct, numpy.prod(replica_vector),
entry_traj.eid, trajectory_atom_total_in_main_ct,
trajectory_start_atom)
if not entry_traj.isVaryingAtomsFrame():
atom_id_map = imap
# Optimization - we need to set reference to workspace
# ct to frame ct, so we don't have to transfer frame change to ct.
# Switching from replica > 1 to replica == 1 - MAE-42350
if (had_replication and not varying_atoms_frame and
num_replica == 1):
entry_traj._updateSavedFrameStructure()
connectivity_changed = True
else:
connectivity_changed = connectivity_changed or main_ct_changed
else:
# Ignore current replica values, so set explicitly [1,1,1]
replica_vector = [1, 1, 1]
main_ct_changed = False
fr = entry_traj.getFrame(frame_number)
topo.update_ct_box(main_ct, fr.box)
# Optimization: If we are not using temporary frame structure, it
# implies that we are directly updating workspace structure,
# so there is no need to transfer frame change - MAE-42350
if entry_traj.usingTemporaryFrameStructure():
if not connectivity_changed:
connectivity_changed = not compare_structures_connectivity(
frame_ct, main_ct)
self.transferFrameChangeToCT(frame_ct, main_ct, replica_vector,
update_ss, trajectory_start_atom)
ct_changed = frame_ct_changed or main_ct_changed
if connectivity_changed:
dgo_change = maestro_ui.MM_DGO_CONNECTIVITY_CHANGED
else:
dgo_change = maestro_ui.MM_DGO_COORDINATES_CHANGED
return (main_ct, ct_changed, visibility_changed, dgo_change,
atom_id_map)
def syncCTAtomsFromFrameAtoms(self, frame_ct: structure.Structure,
                              main_ct: structure.Structure,
                              old_frame_atom_total: int, eid: int,
                              trajectory_atom_total_in_main_ct: int,
                              trajectory_start_atom: int):
    """
    Make the trajectory atoms of main_ct match the atom count of a new
    varying atoms frame.

    A varying atoms frame does not always contain the same number of atoms
    as the previous frame, while main_ct still represents the previously
    displayed frame. Trailing trajectory atoms are deleted from main_ct, or
    the missing atoms are extracted from frame_ct and appended.
    If main_ct holds replicas of the previous frame, None is returned
    instead of a structure, because it is simpler to clone frame_ct than to
    selectively delete/add atoms in the middle of main_ct.

    :param frame_ct: New frame ct to be used for synching atom.
    :param main_ct: Main ct to be updated.
    :param old_frame_atom_total: Previously displayed frame atom total.
    :param eid: Entry id to be set when new atoms are added in the main_ct.
    :param trajectory_atom_total_in_main_ct: Total trajectory atoms in the
        workspace structure (including replica and varying atoms).
    :param trajectory_start_atom: Starting position of trajectory atom in
        the workspace structure.
    :rtype: tuple(structure.Structure, dict)
    :return: Modified main ct or unchanged main ct and renumbering
        dictionary. Keys are atom number before deleting and value for each
        is the new atom number or None if that atom was deleted.
    """
    new_total = frame_ct.atom_total
    current_total = trajectory_atom_total_in_main_ct
    # main_ct already is a replica of frame_ct, or holds exactly as many
    # trajectory atoms as frame_ct: nothing to do.
    if (is_replica_atom_count(current_total, new_total) or
            current_total == new_total):
        return (main_ct, None)
    # main_ct holds replicas of the previous frame ct: leave it untouched
    # and signal that with None.
    if is_replica_atom_count(current_total, old_frame_atom_total):
        return (None, None)
    renumbering = None
    if current_total > new_total:
        # Delete the surplus atoms from the end of the trajectory atoms.
        surplus_atoms = range(trajectory_start_atom + current_total - 1,
                              trajectory_start_atom + new_total - 1, -1)
        renumbering = main_ct.deleteAtoms(surplus_atoms, renumber_map=True)
    elif current_total < new_total:
        # Append the atoms that only exist in the new frame ct.
        missing_atoms = range(current_total + 1, new_total + 1)
        main_ct.extend(frame_ct.extract(missing_atoms, copy_props=False))
        set_structure_atoms_entry_id(
            main_ct, eid, trajectory_start_atom + current_total,
            trajectory_start_atom + new_total - 1)
    return (main_ct, renumbering)
def updateStructureFromFrame(self, frame_number: int,
                             main_ct: structure.Structure,
                             notify_maestro: bool):
    """
    Validate main ct and update main_ct using given trajectory frame.
    If notify_maestro is True, notify maestro to display updated main ct
    in the workspace as well.

    :param frame_number: Frame number to be displayed in workspace.
        Possible values are 1 to self.total_frame
    :param main_ct: Workspace ct to be updated using given frame.
    :param notify_maestro: Whether to notify maestro about workspace changes.
    :rtype: bool
    :return: True if the structure was updated; False if the structure was
        invalid and could not be handled, or no updated ct was produced.
    """
    # TODO: MultipleTrajectory
    # Validate structure
    if (self.entry_traj.isFrameStructureChanged(main_ct) and
            (not self._handleInvalidStructures(self.entry_traj))):
        return False
    (main_ct, changed_structure, visibility_changed, dgo_change,
     atom_id_map) = self.getUpdatedCT(self.entry_traj,
                                      frame_number,
                                      main_ct,
                                      honor_replica=True)
    self.updatePBCMeasurement(self.entry_traj, frame_number)
    if main_ct is None:
        if self.savingMovie():
            # NOTE(review): elsewhere in this file movie saving is stopped
            # through self._movie_saver.stopSavingMovie(); confirm that
            # self.stopSavingMovie() exists.
            self.stopSavingMovie(ok_status=False)
        return False
    # Visibility changes caused by our own workspace update must be ignored.
    # The flag is reset in "finally" so that a failure while notifying
    # maestro cannot leave visibility changes ignored from then on.
    self._ignore_visibility_change = True
    try:
        if notify_maestro:
            # Update box properties in the project according to current
            # frame, so that workspace drawing can update entry pbcs
            self.entry_traj.updateBoxPropertiesInProject(frame_number)
            self.notifyMaestro(main_ct, changed_structure,
                               visibility_changed, dgo_change, atom_id_map)
    finally:
        self._ignore_visibility_change = False
    # Save movie frame if save movie option is active.
    if self.savingMovie():
        self._movie_saver.saveFrameForMovie()
    return True
def transferFrameChangeToCT(self, frame_ct: structure.Structure,
                            main_ct: structure.Structure,
                            replica_vector: tuple, update_ss: bool,
                            trajectory_start_atom: int):
    """
    Copy frame ct changes - position, secondary structure, and visibility
    in the main ct.
    The main ct may not exactly have same number of atoms as in the frame
    ct if replica_vector != (1,1,1)

    :param frame_ct: Frame ct to be used as reference.
    :param main_ct: Workspace ct to be updated using frame_ct.
    :param replica_vector: Replication vector in a,b,c direction.
    :param update_ss: Whether secondary structure properties needs to be
        updated.
    :param trajectory_start_atom: Starting position of trajectory atom in
        the workspace structure.

    Note: This function modifies the main_ct.
    """
    # TODO: MultipleTrajectory
    atoms_per_copy = frame_ct.atom_total
    num_replica = numpy.prod(replica_vector)
    first_dest = trajectory_start_atom - 1
    # Copy visibility (and optionally secondary structure) of every frame
    # atom onto the matching atom of each replica in the main ct.
    for src_at in range(1, atoms_per_copy + 1):
        visible = mm.mmctg_atom_get_visible(frame_ct, src_at)
        if update_ss:
            ss = mm.mmct_atom_get_secondary_struct(frame_ct, src_at)
        for replica in range(num_replica):
            dest_at = first_dest + src_at + (atoms_per_copy * replica)
            mm.mmctg_atom_set_visible(main_ct, dest_at, visible)
            if update_ss:
                mm.mmct_atom_set_secondary_struct(main_ct, dest_at, ss)
    # Update xyz position in the entire ct.
    xyz = frame_ct.getXYZ(copy=False)
    with_reference_entry = self.entry_traj.allow_reference_entry
    replicated = num_replica > 1
    if with_reference_entry and replicated:
        unroll_pos(main_ct, replica_vector, xyz, trajectory_start_atom)
    elif with_reference_entry:
        # Only the slice that belongs to the trajectory atoms is replaced.
        main_ct_xyz = main_ct.getXYZ(copy=False)
        main_ct_xyz[first_dest:first_dest + atoms_per_copy] = xyz
        main_ct.setXYZ(main_ct_xyz)
    elif replicated:
        utils.unroll_pos(main_ct, replica_vector, xyz)
    else:
        main_ct.setXYZ(xyz)
[docs] def notifyMaestro(self, main_ct: structure.Structure,
changed_structure: bool, visibility_changed: bool,
dgo_change: maestro_ui.MMENUM_DGO_CHANGED_TYPE,
atom_id_map: dict):
"""
Notify maestro about workspace ct changes and update simulation box.
When changed_structure is True, main_ct is set as the workspace structure
(without copying, without regenerating markers and without the scratch entry
check). Atom changes are announced through the trajectory hub's
allAtomsChanged signal, and the simulation box is refreshed with
updateSimulationBox().
:param main_ct: Structure to be set as workspace ct if structure was
changed.
:param changed_structure: True if st is different than workspace ct.
:param visibility_changed: True if atoms visibility changed in the st.
:param dgo_change: Type of dgo change.
:type dgo_change: maestro_ui.MMENUM_DGO_CHANGED_TYPE
:param atom_id_map: A dictionary of old to new atoms. Keys are atom
number before deleting and value for each is the new atom number or None
if that atom was deleted.
"""
if changed_structure:
maestro.workspace_set(main_ct,
copy=False,
regenerate_markers=False,
check_scratch_entry=False)
# Build an mmct id map from the renumbering dictionary for the signal; it is
# deleted again below if it is still in use after the emit.
mmct_imap = get_mmct_id_map_from_dict(atom_id_map)
self.trj_hub.allAtomsChanged.emit(main_ct, dgo_change, mmct_imap)
if mm.mmct_id_map_in_use(mmct_imap):
mm.mmct_id_map_delete(mmct_imap)
if not changed_structure:
if visibility_changed:
self.trj_hub.clearAtomsVisibility.emit()
# NOTE(review): this emit passes two arguments while the one above passes
# three - confirm the signal accepts both forms.
self.trj_hub.allAtomsChanged.emit(
main_ct, maestro_ui.MM_DGO_VISIBILITY_CHANGED)
self.updateSimulationBox()
def updateFrameCTTitle(self, st, frame_number):
    """
    Set the title of the given structure to "frame" followed by the frame
    number.

    :type st: structure.Structure
    :param st: Frame structure
    :type frame_number: int
    :param frame_number: Frame number to be displayed in workspace.
    """
    frame_title = "frame{}".format(frame_number)
    st.title = frame_title
def getStartFrame(self):
    """
    Return the start frame number, i.e. the slider's left point value
    (``current_min_value``).
    """
    start_frame = self.current_min_value
    return start_frame
def getEndFrame(self):
    """
    Return the end frame number, i.e. the slider's right point value
    (``current_max_value``).
    """
    end_frame = self.current_max_value
    return end_frame
def takeTrajectoryStep(self, curr_frame):
    """
    Return next frame number based on player direction, step size, and
    loop option.

    When oscillating, reaching the end of the range reverses
    ``play_direction`` as a side effect.

    :param curr_frame: Frame number to step from.
    :return: The next frame number, or None when no valid next frame
        exists.
    """
    candidate = self.getNextFrameNumber(self.play_direction, curr_frame)
    if not self.isValidFrame(candidate):
        # An invalid candidate means the end of the forward or backward
        # direction has been reached.
        if self.loop_option == Loop.NONE:
            pass
        elif self.loop_option == Loop.OSCILLATE:
            # Reverse play direction when oscillating.
            self.play_direction = (Direction.FORWARD if
                                   self.play_direction == Direction.BACKWARD
                                   else Direction.BACKWARD)
            candidate = self.getNextFrameNumber(self.play_direction,
                                                curr_frame)
        elif self.play_direction == Direction.FORWARD:
            start_frame = self.getStartFrame()
            # Get the number of steps back to start frame.
            num_steps = (curr_frame - start_frame) / self.step_size
            candidate = int(curr_frame - (num_steps * self.step_size))
        else:
            end_frame = self.getEndFrame()
            # Get the number of steps back to end frame.
            num_steps = (end_frame - curr_frame) / self.step_size
            candidate = int(curr_frame + (num_steps * self.step_size))
    if self.isValidFrame(candidate):
        return candidate
def isValidFrame(self, curr_frame):
    """
    Return true if given frame number is valid, i.e. it is truthy (not
    None and not 0) and lies between the start and end frame, inclusive.

    :param curr_frame: Frame number to check; may be None.
    :rtype: bool
    :return: True if the frame number is valid.
    """
    start_frame = self.getStartFrame()
    end_frame = self.getEndFrame()
    # bool() ensures a real boolean is returned; the bare expression would
    # evaluate to None or 0 for a falsy frame number.
    return bool(curr_frame and start_frame <= curr_frame <= end_frame)
def _performReplication(self, frame_ct: structure.Structure,
main_ct: structure.Structure,
num_required_copies: int, eid: int,
trajectory_atom_total_in_main_ct: int,
trajectory_start_atom: int):
"""
Extend main ct or delete atoms from it based num of required replicas.
setting.
:param frame_ct: Source ct to be used for replication.
:param main_ct: Destination ct to be updated.
:param num_required_copied: Number of replicas to be created.
:param eid: is the entry id to be set on newly added atoms.
:param trajectory_atom_total_in_main_ct: Total trajectory atoms in the
workspace structure (including replica and varying atoms).
:param trajectory_start_atom: Starting position of trajectory atom in
the workspace structure.
:rtype: tuple (structure.Structure, bool, dict)
:return: Modified main ct and whether atom count changed in the main ct,
and renumbering dictionary. Keys are atom number before deleting
and value for each is the new atom number or None if that atom was
deleted.
"""
atom_id_map = None
orig_atom_total = frame_ct.atom_total
num_existing_copies = trajectory_atom_total_in_main_ct // orig_atom_total
if num_required_copies == num_existing_copies:
return (main_ct, False, atom_id_map)
if num_required_copies > num_existing_copies:
for i in range(num_required_copies - num_existing_copies):
main_ct.extend(frame_ct)
set_structure_atoms_entry_id(
main_ct, eid,
trajectory_start_atom + (num_existing_copies * orig_atom_total),
trajectory_start_atom +
(num_required_copies * orig_atom_total) - 1)
else:
atom_id_map = main_ct.deleteAtoms(range(
trajectory_atom_total_in_main_ct,
orig_atom_total * num_required_copies, -1),
renumber_map=True)
return (main_ct, True, atom_id_map)
def setSnapshotPanelVisibility(self, visible: bool):
    """
    Record whether the snapshot panel is visible.

    :param visible: True if visible.
    """
    self._snapshot_panel_visible = visible
@property
def is_snapshot_panel_visible(self):
    """
    Recorded snapshot panel visibility state.

    :return: True if snapshot panel is visible.
    :rtype: bool
    """
    panel_visible = self._snapshot_panel_visible
    return panel_visible
def setReferenceEntryStatistics(self, use_full_system_ct: bool):
    """
    Set reference entry statistics for the current trajectory if allowed.

    Statistics are created only when the workspace holds exactly
    MAX_ALLOWED_ENTRIES_IN_WORKSPACE entries; otherwise they are cleared.

    :param use_full_system_ct: When trajectory is initially loaded, this
        flag is True indicating that full system ct should be used as
        reference because for varying atoms, frame ct may not be in sync
        with full system ct.
    """
    main_ct, entry_total = get_workspace_structure_and_entry_total()
    entry_traj = self.entry_traj
    if entry_total != self.MAX_ALLOWED_ENTRIES_IN_WORKSPACE:
        # The frame structure needs saving only when switching from
        # reference entry + trajectory entry to only trajectory entry, so
        # the flag is read before the statistics are cleared.
        save_frame_structure = entry_traj.allow_reference_entry
        entry_traj.reference_entry_statistics = None
    else:
        # While a trajectory is being set up, its frame ct is not yet
        # loaded in the main ct, so the full system ct atom total stands in
        # as the frame atom total for varying atoms frames.
        if use_full_system_ct:
            frame_atom_total = entry_traj.cms_model.fsys_ct.atom_total
        else:
            frame_atom_total = entry_traj.getFrameActiveAtomTotal(
                self.current_value)
        num_replica = entry_traj.getNumberOfReplica()
        entry_traj.reference_entry_statistics = ReferenceEntryStatistics(
            main_ct, self.getEid(), frame_atom_total, num_replica)
        save_frame_structure = True
    if save_frame_structure:
        entry_traj._updateSavedFrameStructure()
def inclusionUpdated(self, included, excluded, all_included,
                     previous_included):
    """
    Slot associated with WorkspaceHub.inclusionChanged() signal.

    The included entry lists are stored and the inclusion update timer is
    started; nothing happens when neither the player nor the snapshot panel
    is visible.

    :param included: List of included entry ids in the workspace.
    :type included: list(int)
    :param excluded: List of excluded entry ids from the workspace.
    :type excluded: list(int)
    :param all_included: List of currently included entry ids in the
        workspace ct, it does not guarantee that project also has same
        included entries at the moment because this slot may be triggered
        before updating the entry workspace state in the project.
    :type all_included: list(int)
    :param previous_included: List of previously included entry ids
        in the workspace ct.
    :type previous_included: list(int)
    """
    if not (self.isVisible() or self.is_snapshot_panel_visible):
        return
    # Loading a trajectory changes the workspace, which may trigger this
    # slot again. So the trajectory is not loaded here; the update is
    # handed over to a timer instead.
    self.all_included = all_included
    self.previous_included = previous_included
    self.inclusion_update_timer.start()
def _getTrajectoryEntryIdFromIncludedEntries(self, all_included: list,
                                             previous_included: list):
    """
    Pick the trajectory entry to play from the included entries.

    If exactly one entry is included and it is a trajectory entry, it
    returns that entry id.
    If exactly MAX_ALLOWED_ENTRIES_IN_WORKSPACE entries are included, only
    the first two are inspected:

    - one trajectory entry and one regular entry: returns the trajectory
      entry id.
    - both are trajectory entries: returns the newly included one, i.e. the
      second entry if the first one was already included before, otherwise
      the first entry (this also covers an empty `previous_included`).

    In all other cases it returns None, e.g. when more entries than allowed
    are included or when there is no included trajectory entry.

    :param all_included: Entry ids currently included in the workspace.
    :type all_included: list(int)
    :param previous_included: Entry ids included before the change.
    :type previous_included: list(int)
    :return: Trajectory entry id from all_included entries, otherwise None.
    :rtype: int or None
    """
    ws_entry_total = len(all_included)
    if ws_entry_total == 1:
        only_eid = all_included[0]
        if projutils.has_trajectory(self.proj, only_eid):
            return only_eid
        return None
    if ws_entry_total != self.MAX_ALLOWED_ENTRIES_IN_WORKSPACE:
        return None

    first_eid, second_eid = all_included[0], all_included[1]
    first_has_trajectory = projutils.has_trajectory(self.proj, first_eid)
    second_has_trajectory = projutils.has_trajectory(self.proj, second_eid)
    if first_has_trajectory and second_has_trajectory:
        # Multiple trajectories are not supported at a time, so prefer the
        # entry the user has just included. Membership is checked against
        # the whole previous list (not only its first item) and an empty
        # previous list no longer raises IndexError.
        if first_eid in previous_included:
            return second_eid
        return first_eid
    if first_has_trajectory:
        return first_eid
    if second_has_trajectory:
        return second_eid
    return None
def _handleInclusionUpdate(self):
"""
Load the trajectory only if <= MAX_ALLOWED_ENTRIES_IN_WORKSPACE
are included and one of these is a trajectory entry.
Works on the `all_included` / `previous_included` lists stored by
inclusionUpdated() and resets both attributes to None, so a call without a
new pending inclusion change returns immediately.
"""
# Nothing pending (already consumed, or never set by inclusionUpdated()).
if self.all_included is None:
return
# Entry ids <= 0 are dropped; a length mismatch with the stored list is what
# marks the presence of a scratch entry below.
all_included = [eid for eid in self.all_included if eid > 0]
all_included_count = len(all_included)
has_scratch_entry = all_included_count != len(self.all_included)
previous_included = [eid for eid in self.previous_included if eid > 0]
# Consume the pending update.
self.all_included = None
self.previous_included = None
eid = self._getTrajectoryEntryIdFromIncludedEntries(
all_included, previous_included)
has_trajectory = eid is not None
# Entry id of the trajectory that is loaded right now, if any.
traj_eid = None
if self.entry_traj:
traj_eid = self.entry_traj.eid
# Only synchronize the workspace when the loaded trajectory entry is still
# among the included entries.
sync_workspace = traj_eid in all_included
# Trajectory player does not support scratch entry.
if has_scratch_entry:
if traj_eid is not None:
self._restoreDefaultStateOfEntryCt(
sync_workspace=sync_workspace)
return
# If two entries are already included and user is including 3rd entry.
if all_included_count > self.MAX_ALLOWED_ENTRIES_IN_WORKSPACE:
# If newly included entry is a trajectory entry, warn user that
# multiple entries are not allowed.
new_included = list(set(all_included) - set(previous_included))
for entry_id in new_included:
if projutils.has_trajectory(self.proj, entry_id):
# Called with None as entry_traj; the return value is not used here.
self._validateIfMultipleEntriesAllowed(None)
break
# Previously there were two entries in the workspace, but one
# of those was a loaded trajectory, we have to unload existing
# trajectory as well.
if traj_eid is not None:
self._restoreDefaultStateOfEntryCt(
sync_workspace=sync_workspace)
return
# Entry's trajectory is already loaded, do nothing
if traj_eid == eid and has_trajectory:
if (self.reference_entry_id is None or
self.reference_entry_id not in all_included):
# Trajectory is already loaded and user included another entry,
# so it is possible that trajectory is present with replica, so
# don't use full system ct.
self.setReferenceEntryStatistics(use_full_system_ct=False)
# Trajectory entry with varying atoms is already included and
# user is attempting to include another reference entry.
# This should not be allowed to user.
if not self._validateIfMultipleEntriesAllowed(self.entry_traj):
self._restoreDefaultStateOfEntryCt(
sync_workspace=sync_workspace)
return
# We enter in this if block only if there is a single trajectory
# entry in the workspace or two entries (one trajectory and
# another regular entry). So as long as we are allowing other entry
# we can return safely.
if all_included_count == 1 or self.entry_traj.allow_reference_entry:
return
# From here on any previously loaded trajectory is unloaded before a new
# one is (possibly) set up.
if traj_eid is not None:
self._restoreDefaultStateOfEntryCt(sync_workspace=sync_workspace)
# If trajectory was already loaded and was allowed to have another
# entry or only trajectory entry was present, then we should have
# early returned.
# If we reach here, it implies that we are not allow to keep
# trajectory active.
# Example use case: varying atoms trajectory is already loaded and
# later user includes regular entry.
if (all_included_count == self.MAX_ALLOWED_ENTRIES_IN_WORKSPACE and
not has_trajectory):
return
# Set up the trajectory only when its path validates; the player is not
# shown explicitly (show_player=False).
if has_trajectory and vt.validate_trajectory_path(
utils.get_trajectory_path(self.proj, eid), eid, self.parent()):
self.setupTrajectory([eid], False)
def _restoreDefaultStateOfEntryCt(self, sync_workspace):
"""
Restore default state of entry ct and clear trajectory viewer.
Calls restoreEntryCT() for the currently loaded trajectory entry and
clearTrajectory().
:type sync_workspace: bool
:param sync_workspace: Whether or not synchronize changes in the
workspace.
"""
# Only a loaded trajectory has an entry ct to restore.
if self.entry_traj:
self.restoreEntryCT(eid=self.entry_traj.eid,
sync_workspace=sync_workspace)
# NOTE(review): clearTrajectory() is a no-op while snapshot mode is active.
self.clearTrajectory()
@property
def is_snapshot_mode_active(self):
    """
    Tell whether frames are currently displayed from the snapshot panel.

    :rtype: bool
    :return: True if display frames from snapshot panel is activated.
    """
    snapshot_mode = self._snapshot_mode_active
    return snapshot_mode


@is_snapshot_mode_active.setter
def is_snapshot_mode_active(self, state: bool):
    """
    Store the snapshot mode flag.

    :param state: New value of the snapshot mode flag.
    :type state: bool
    """
    self._snapshot_mode_active = state
[docs] def clearTrajectory(self):
"""
Stop current trajectory play if player is active.
Delete trajectory object.
Disable player controls.
Clear all trajectory related gui control values.
Does nothing while snapshot mode is active. Otherwise the
trajectoryUnloaded signal is emitted once everything is reset.
"""
# When snapshot mode is active, we should not clear trajectory
# because snapshot panel depends on same trajectory used by player
# toolbar. Ideally, player toolbar, project table, entrylist, main
# window are disabled when snapshot mode is active, but it is still
# possible to trigger clearTrajectory() when user excludes all frames
# from the snapshot panel.
if self.is_snapshot_mode_active:
return
self.frame_timer.stop()
# Drop the trajectory object before resetting the attribute through
# setEntryTraj().
if self.entry_traj:
del self.entry_traj
self.setEntryTraj(None)
self.trj_hub.getPBCMeasurementManager().setPBCMeasurementActive(False)
# Settings object is only present once a trajectory has been set up.
if self._trj_maestro_settings:
self._trj_maestro_settings.restore()
# Reset the playback counters.
self.total_time = 0
self.total_frame = 0
self.elapsed_time = 0
self.step_size = 0
# Range must be > 0 and between 1 to N.
self.frame_slider.setRange(1, 2)
self.player_active = False
self.setToolbarEnabled(False)
# Back to the default loop / direction.
self.loop_option = Loop.NONE
self.play_direction = Direction.FORWARD
# Reset the display-atoms ASL bookkeeping flags.
self._display_atoms_asl_is_valid = False
self._display_atoms_asl_is_dynamic = True
self._display_atoms_asl_changed = False
self.simbox_visible = False
self._ignore_visibility_change = False
self._movie_saver.clearData()
self.trajectoryUnloaded.emit()
@property
def proj(self):
    """
    :return: The project object reported by maestro.project_table_get();
        it is looked up again on every access.
    """
    current_project = maestro.project_table_get()
    return current_project
def _validateStructure(self, entry_traj: EntryTrajectory):
    """
    Check the entry structure against the trajectory structure for the
    current frame of the playback settings, and reset it when needed.

    A varying-atoms trajectory is reset without asking. In every other
    invalid case the decision is left to _handleInvalidStructures().

    :param entry_traj: Entry trajectory instance to validate.
    :rtype: bool
    :return: Whether the structure is valid.
    """
    frame = entry_traj.settings_data.current_frame
    if entry_traj.entryStructureIsValid(frame):
        return True

    # Until MAE-41745 and MAE-41746 are fixed, we can not reliably
    # ensure that entry structure is valid or not, so we
    # unconditionally set it to full system ct.
    if entry_traj.isVaryingAtomsFrame():
        if entry_traj.resetStructure(sync_workspace=True):
            return True

    return self._handleInvalidStructures(entry_traj)
def _handleInvalidStructures(self, entry_traj: EntryTrajectory):
    """
    Show the invalid structure dialog ('Cancel' and 'Reset' options) and
    reset the entry structure when the user asks for it.

    :param entry_traj: Entry trajectory whose structure may be reset.
    :rtype: bool
    :return: Whether invalid structure is reset.

    TODO: MultipleTrajectory
    """
    answer = trajectory_messages.show_invalid_structure_dlg()
    reset_requested = answer == MessageButton.RESET
    if not reset_requested:
        return False
    # This will change to loop for multiple structures.
    return entry_traj.resetStructure(sync_workspace=True)
def getEid(self):
    """
    :return: Entry id of the currently loaded trajectory, or None when no
        trajectory is loaded.
    """
    if self.entry_traj:
        return self.entry_traj.eid
    return None
@property
def reference_entry_id(self):
    """
    :return: The reference entry id of the loaded trajectory, or None when
        no trajectory is loaded.
    """
    if not self.entry_traj:
        return None
    return self.entry_traj.reference_entry_id
def _validateIfMultipleEntriesAllowed(self, entry_traj: EntryTrajectory):
    """
    Verify if multiple entries are allowed when trajectory is loaded
    and display warning appropriately.

    Two situations are rejected, each with a warning message box:

    - more than MAX_ALLOWED_ENTRIES_IN_WORKSPACE entries are in the
      workspace;
    - exactly MAX_ALLOWED_ENTRIES_IN_WORKSPACE entries are in the workspace,
      the trajectory has varying atoms per frame, a reference entry is set
      and the reference entry was not included first.

    :param entry_traj: Entry trajectory instance. May be None when only the
        entry-count check is wanted; the varying-atoms check is then
        skipped instead of failing on the missing trajectory.
    :return: True if user is allowed to keep multiple entries.
    :rtype: bool
    """
    max_entries = self.MAX_ALLOWED_ENTRIES_IN_WORKSPACE
    _, entry_total = get_workspace_structure_and_entry_total()
    if entry_total > max_entries:
        text = textwrap.dedent(f"""
            The Trajectory cannot be played when there are more
            than {max_entries} entries included in the Workspace.
            """)
        title = 'Trajectory Player - Too Many Entries'
        QtWidgets.QMessageBox.warning(None, title, text)
        return False

    # entry_traj is None when the caller has no trajectory object at hand
    # (e.g. a new trajectory entry was just included), so guard before
    # asking it anything.
    varying_atoms_conflict = (
        entry_total == max_entries and entry_traj is not None and
        entry_traj.isVaryingAtomsFrame() and self.reference_entry_id and
        (not entry_traj.reference_entry_included_first))
    if varying_atoms_conflict:
        text = textwrap.dedent("""
            The current trajectory has a variable number of atoms per frame.
            To play this trajectory with a second entry in the Workspace,
            the trajectory must be included after the other entry.
            To enable the player, exclude the trajectory entry and include it again.
            """)
        title = 'Cannot Run with Second Entry'
        QtWidgets.QMessageBox.warning(None, title, text)
        return False
    return True
[docs] @wait_cursor
def setupTrajectory(self, eids, show_player):
"""
Setup trajectory settings in the player for a given list of entry ids.
Nothing is loaded when `eids` is empty, when the trajectory of the requested
entry is already loaded, when the trajectory is invalid or has a single
frame, when the structure validation fails, or when the entries currently in
the workspace do not allow the trajectory to be played.
On success the trajectoryLoaded signal is emitted with the total frame count.
:param eids: The list of entry ids to be used for trajectory playing.
:type eids: list(int)
:param show_player: Determine if player should also be shown.
:type show_player: bool
Note: currently only a single trajectory is supported, so only the first
entry id of eids is used.
"""
if not eids:
maestro.warning(
"Trajectory can be visualized with valid project entry.")
return
# Only single trajectory support for now.
eid = eids[0]
# If user clicks on same entry's T button and its trajectory is already
# visible, then don't do anything.
if self.entry_traj and self.entry_traj.isSameTrajectory(eid):
# It is possible that trajectory was loaded by snapshot panel
# and now user clicked on T button to show the player toolbar,
# so we should honor user request to display player with current
# frame.
if show_player:
self.show()
self.updateCurrentFrame()
return
entry_traj = EntryTrajectory(self.proj,
eid,
matsci_profile=self.matsci_profile)
# Single frame trajectories are not played.
if not entry_traj.isValid() or entry_traj.isSingleFrameTrajectory():
return
valid_structure = self._validateStructure(entry_traj)
if not valid_structure:
return
self.setEntryTraj(entry_traj)
# Trajectory is being loaded, so workspace ct won't always represent
# same number of atoms as in the frame ct for varying atoms frame, so for
# the consistency, always use full system ct.
self.setReferenceEntryStatistics(use_full_system_ct=True)
if not self._validateIfMultipleEntriesAllowed(entry_traj):
# Undo the setEntryTraj() call above.
self.setEntryTraj(None)
return
self.setToolbarEnabled(True)
if show_player:
self.show()
self.total_frame = self.entry_traj.total_frame
# Create PlaybackSettings popup
has_pbsettings_data = self.entry_traj.hasPlaybackSettingsData()
# Set playback settings data
self.playback_settings_popup.setPlaybackSettingsData(
self.entry_traj.settings_data, has_pbsettings_data,
self.entry_traj.cms_model.fsys_ct.atom_total)
data = self.playback_settings_popup.data
self._trj_maestro_settings = TrajectoryMaestroSettings(
self.proj, eid,
self.playback_settings_popup.updateAdjustPositionCheckBox,
data.translate_to_first_unit_cell)
# Connect to 'Playback Settings' popup for change settings
self.connectToPlaybackSettingsPopup()
self.player_was_paused = False
# Slider covers frames 1..total_frame.
self.frame_slider.setRange(1, self.total_frame)
self.updateSliderValues(data.start_frame, data.end_frame,
data.current_frame)
# Apply the settings without forcing a frame update; the frame is only
# updated below when the player is visible.
self.settingsChanged(forced_update=False)
if self.isVisible():
self.updateCurrentFrame()
self.trajectoryLoaded.emit(self.total_frame)
def updateSliderValues(self, start_frame, end_frame, current_frame):
    """
    Apply start, end and current frame to the slider, then refresh the
    range of the frame validator from the player's current min/max values.

    :param start_frame: First frame of the slider's current range.
    :param end_frame: Last frame of the slider's current range.
    :param current_frame: Frame to make current.
    """
    self.frame_slider.setCurrentRange(start_frame, end_frame)
    self.current_value = current_frame
    # Read the limits only after the slider has been updated.
    lower_limit = self.current_min_value
    upper_limit = self.current_max_value
    self.frame_validator.setRange(lower_limit, upper_limit)
def updateFrameTimerInterval(self, frame_duration):
    """
    Updates frame_timer interval using the frame_duration.

    The duration is multiplied by 1000 (presumably seconds to the timer's
    milliseconds -- confirm against the playback settings) and rounded half
    up to a whole number.

    :param frame_duration: Duration of a single frame.
    :type frame_duration: float
    """
    # The timer interval is an integer. Convert explicitly rather than
    # handing a float to the Qt binding, which newer bindings/Python
    # versions reject instead of truncating silently.
    interval = int(frame_duration * 1000 + 0.5)
    self.frame_timer.setInterval(interval)
def updateStepSize(self, step_size, update_player=True):
    """
    Store a new step size and optionally refresh the player.

    :param step_size: Step size to be used while stepping in the
        trajectory. Possible values are 1-99 or
        self.endFrame() - self.startFrame().
    :type step_size: int
    :param update_player: Indicates whether or not update player toolbar
        gui.
    :type update_player: bool
    """
    self.step_size = step_size
    if not update_player:
        return
    self.updatePlayer()
def _checkShowPlotsPanel(self):
    """
    Show the Plots panel when it is hidden and holds at least one plot.
    """
    panel = self.trajectory_plot_panel
    if panel.isVisible():
        # Already on screen, nothing to do.
        return
    if panel.getPlotCount() > 0:
        panel.show()
def _onAtomSelectChanged(self):
    """
    Emit atomsSelectedToggled and multiAtomsSelectedToggled with the flags
    reported by _areSingleAndMultiAtomsSelected().
    """
    selection_flags = self._areSingleAndMultiAtomsSelected()
    any_selected, several_selected = selection_flags
    self.atomsSelectedToggled.emit(any_selected)
    self.multiAtomsSelectedToggled.emit(several_selected)
def _areSingleAndMultiAtomsSelected(self):
    """
    Report how many atoms are selected according to
    maestro.selected_atoms_get().

    :rtype: tuple(bool, bool)
    :return: (at least one atom is selected, more than one atom is
        selected); (False, False) when maestro is not available.
    """
    if not maestro:
        return False, False
    selected_total = len(maestro.selected_atoms_get())
    return selected_total > 0, selected_total > 1
def updateViewPosition(self, forced_update=True):
    """
    Update trajectory based on view position tab settings and adjust frame.

    To center molecule, ASL must be valid and matched atoms must be at
    least 1.
    To align molecules, ASL must be valid and matched atoms must be at
    least 3.

    :param forced_update: Copy all frame's position array and adjust frame
        in the workspace using new view position setting.
    :type forced_update: bool
    """
    asl = self.playback_settings_popup.getAVPAsl()
    view_changed = self.entry_traj.updateViewPosition(forced_update, asl)
    # If player is not active and user modified asl, then apply it
    # immediately.
    if view_changed and not self.player_active and forced_update:
        self.updateCurrentFrame()
def translateToFirstUnitCellChanged(self):
    """
    Update player according to 'Translate to first unit cell' and
    simulation box settings of the playback settings popup.
    """
    popup = self.playback_settings_popup
    # Update simulation box
    self.simbox_visible = popup.data.show_simulation_box
    translate = popup.data.translate_to_first_unit_cell
    self._trj_maestro_settings.setMaxPBCAtomsForUnitCell(translate)
def settingsChanged(self, forced_update=True):
    """
    Push the current PlaybackSettings data into the player: timer interval,
    step size, play direction, loop option, view position and advanced
    settings, in that order.

    :param forced_update: Whether to update current frame
    :type forced_update: bool
    """
    settings = self.playback_settings_popup.data
    self.updateFrameTimerInterval(settings.frame_duration)
    # The individual updates do not refresh the player themselves.
    self.updateStepSize(settings.step, False)
    self.updatePlayDirection(settings.direction, False)
    self.updateLoopOption(settings.loop)
    self.updateViewPosition(False)
    self.handleAdvancedSettings(False)
    if not forced_update:
        return
    self.updateCurrentFrame()
def handleAdvancedSettings(self, forced_update=True):
    """
    Update player according to the advanced settings data: simulation box
    visibility, display atoms ASL and secondary structure data.

    :param forced_update: Accepted for interface compatibility.
        NOTE(review): the value is not used; setDisplayAtomsASL() is always
        called with forced_update=False -- confirm this is intended.
    :type forced_update: bool
    """
    advanced = self.playback_settings_popup.data
    self.simbox_visible = advanced.show_simulation_box
    self.setDisplayAtomsASL(None, forced_update=False)
    if advanced.update_secondary_structure:
        return
    # If user turned off option, clear data.
    self.entry_traj.ss_data.clearData()
def updateLoopOption(self, loop_option):
    """
    Update loop option (NONE, SINGLE, OSCILLATE) and re-apply the play
    direction stored in the playback settings.

    :param loop_option: New loop option.
    """
    self.loop_option = loop_option
    # Whenever loop option is changed, also update play direction
    # because we may be using transient play direction due to
    # loop -> bidirectional option.
    configured_direction = self.playback_settings_popup.data.direction
    self.updatePlayDirection(configured_direction, update_player=False)
def updatePlayDirection(self, play_direction, update_player=True):
    """
    Update play direction icon and direction member (FORWARD, BACKWARD).

    The direction value is written to the "direction" property of the
    play/pause button and the button style is refreshed.

    :param play_direction: Direction of play as specified by
        trajectory_gui_dir.playback_settings_popup.data.Direction.
    :type play_direction: enum(Direction)
    :param update_player: Indicates whether or not update player toolbar
        gui.
    :type update_player: bool
    """
    button = self.play_or_pause
    button.setProperty("direction", play_direction.value)
    qt_utils.update_widget_style(button)
    self.play_direction = play_direction
    if not update_player:
        return
    self.updatePlayer()
def showAdvancedSettings(self):
    """
    Shows 'Advanced Playback Settings' dialog, provided the playback
    settings popup exists.
    """
    popup = self.playback_settings_popup
    if not popup:
        return
    popup.showAdvancedSettings()
def addVerticalLine(self, base_layout):
    """
    Append a plain vertical separator line (line width 1, object name
    "vertical_line") to the given layout.

    :param base_layout: Layout that receives the separator widget.
    """
    separator = QtWidgets.QFrame()
    separator.setObjectName("vertical_line")
    separator.setFrameStyle(QtWidgets.QFrame.VLine)
    separator.setFrameShadow(QtWidgets.QFrame.Plain)
    separator.setLineWidth(1)
    base_layout.addWidget(separator)
[docs] def snapshotModeChanged(self, state: bool):
"""
When snapshot panel mode activates, we should disable player toolbar and
hide simulation box.
When it deactivates, the toolbar is enabled again, the simulation box is
shown again if the playback settings ask for it, and the workspace is set
back to a copy of the original entry ct.
trajectoryPlayerStateChanged is emitted first on activation and last on
deactivation.
:param state: Determine if snapshot panel mode activated or deactivated.
"""
# When snapshot mode active, we want to disconnect SH, so that workspace
# structure changes are not reflected in the SH and block updates in
# other maestro components.
if state:
self.trj_hub.trajectoryPlayerStateChanged.emit(state)
self.is_snapshot_mode_active = state
self.setToolbarEnabled(not self.is_snapshot_mode_active)
ps_popup = self.playback_settings_popup
# Simulation box visibility as requested by the playback settings; False
# when there is no popup.
simbox_visible = ps_popup is not None and ps_popup.data.show_simulation_box
self._ignore_visibility_change = state
if state:
# If simulation box is visible, then only hide it.
if simbox_visible:
self.simbox_visible = False
else:
# If simulation box was visible, then only show it.
if simbox_visible:
self.simbox_visible = True
if not self.is_snapshot_mode_active:
st = self.getOriginalEntryCT(copy=True)
# Since we already got a copy of st, so workspace_set can own this
# structure itself.
maestro.workspace_set(st, copy=False, check_scratch_entry=False)
# When coming out of snapshot mode, we need to ensure that
# trajectory's frame structure is now referred to appropriate structure.
# NOTE(review): entry_traj is used without a None check here -- confirm a
# trajectory is always loaded when snapshot mode is deactivated.
self.entry_traj._updateSavedFrameStructure()
# If player toolbar is visible, then set workspace based on current
# frame.
if self.isVisible():
self.updateCurrentFrame()
# When snapshot mode is deactivated, we want to make sure that SH and
# other components are updated using current workspace ct rather
# than workspace ct which was present in the active snapshot
# mode.
if not state:
self.trj_hub.trajectoryPlayerStateChanged.emit(state)
def snapshotPanelClosed(self):
    """
    Snapshot panel can load trajectory without showing a player toolbar, so
    unload the trajectory (with workspace synchronization) when the player
    is not visible.
    """
    if self.isVisible():
        # The trajectory is shared with the visible player toolbar, so it
        # must stay loaded.
        return
    self._restoreDefaultStateOfEntryCt(True)
def getFrameToActiveAtomTotalMap(self):
    """
    :return: A list of active atom total of all frames, as reported by the
        loaded trajectory; None when no trajectory is loaded.
    :rtype: list(int) or None
    """
    if not self.entry_traj:
        return None
    return self.entry_traj.getFrameToActiveAtomTotalMap()
[docs]class MovieSaver:
"""
This class provides ability to save trajectory movie.
"""
def __init__(self, player_obj):
    """
    Keep a reference to the player and its trajectory hub, and start with
    no export dialog and cleared movie data.

    :type player_obj: TrajectoryPlayer
    :param player_obj: Trajectory player toolbar object.
    """
    self.player_obj = player_obj
    self.trj_hub = self.player_obj.trj_hub
    # The export dialog is created lazily on the first exportMovie() call.
    self._dlg = None
    self.clearData()
def clearData(self):
    """
    Reset the movie bookkeeping: no movie file path, not saving, and zero
    saved frames.
    """
    self._movie_file_path = None
    self._saving_movie, self._saved_frame_count = False, 0
def exportMovie(self):
    """
    Display the export movie dialog to collect all necessary inputs and,
    when it returns a movie file path, start saving the movie.

    Does nothing while a movie is already being saved.
    """
    # If we are saving movie already, don't proceed.
    if self.savingMovie():
        return
    # The dialog is created once and reused afterwards.
    if self._dlg is None:
        self._dlg = export_movie.ExportMovieDialog(self.player_obj)
    player = self.player_obj
    chosen_path = self._dlg.showDlg(player.current_min_value,
                                    player.current_max_value,
                                    player.step_size, player.total_frame)
    self._movie_file_path = chosen_path
    if chosen_path:
        self._startSavingMovie()
def _startSavingMovie(self):
    """
    Remember the player's current frame, start, end, direction and loop
    option, apply the values chosen in the export dialog and activate the
    player so the movie is saved frame by frame.

    Does nothing while a movie is already being saved.
    """
    # If we are saving movie already, don't proceed.
    if self.savingMovie():
        return
    player = self.player_obj

    # Save current player settings.
    self.start_frame = player.current_min_value
    self.end_frame = player.current_max_value
    self.current_frame = player.current_value
    self.loop_option = player.loop_option
    self.play_direction = player.play_direction

    # Directory where the movie frames are stored: reuse it after removing
    # old frames, or create it.
    frames_dir = maestro_ui.mm_get_movie_frames_dir_path()
    if os.path.exists(frames_dir):
        self._cleanDirectory(frames_dir, False)
    else:
        os.mkdir(frames_dir)

    # Set player state according to movie settings; playback starts at the
    # first movie frame.
    first_frame = self._dlg.getStartFrameValue()
    last_frame = self._dlg.getEndFrameValue()
    player.updateSliderValues(first_frame, last_frame, first_frame)
    player.updateLoopOption(Loop.NONE)
    player.updatePlayDirection(Direction.FORWARD, False)

    # Start playing entries.
    self._saving_movie = True
    player.setToolbarEnabled(False)
    player.player_active = True
def _cleanDirectory(self, dir_path, remove_dir=True):
    """
    Remove the movie frame images (``*.jpeg`` files) from the given
    directory and optionally the directory itself.

    Does nothing when `dir_path` is empty or does not exist. Files with
    other extensions are left alone, in which case removing the directory
    raises OSError because it is not empty.

    :type dir_path: str
    :param dir_path: Directory path to be cleaned up.
    :type remove_dir: bool
    :param remove_dir: Remove directory also.
    """
    if not dir_path or not os.path.exists(dir_path):
        return
    # Escape the directory part so that glob metacharacters in the path
    # ('[', '*', '?') are taken literally; only the file name is a pattern.
    pattern = os.path.join(glob.escape(dir_path), "*.jpeg")
    for frame_file in glob.glob(pattern):
        os.remove(frame_file)
    if remove_dir:
        os.rmdir(dir_path)
def stopSavingMovie(self, ok_status=True):
    """
    Restore current frame, start, end, direction, and loop options set
    before saving movie and notify maestro to save in the movie file.

    Function is a no-op if we were not saving a movie.

    :type ok_status: bool
    :param ok_status: True if stopping movie gracefully (i.e. all frames are
        saved successfully). Only then the video is requested from the
        trajectory hub; a warning is shown if that request fails.
    """
    # If we are not saving movie, don't proceed.
    if not self.savingMovie():
        return
    player = self.player_obj
    self._saving_movie = False
    player.player_active = False

    if ok_status:
        total = self._dlg.getTotalMovieFrames()
        image_files = maestro_ui.mm_get_movie_files_path(total)
        data = self._dlg.getMovieData()
        success = self.trj_hub.requestToMakeVideo(self._movie_file_path,
                                                  image_files, data)
        if not success:
            maestro.warning("Failed to save movie in %s" %
                            self._movie_file_path)

    self.clearData()
    player.setToolbarEnabled(True)
    # The temporary frame images are no longer needed.
    dir_path = maestro_ui.mm_get_movie_frames_dir_path()
    self._cleanDirectory(dir_path)

    # Put the player back into the state saved by _startSavingMovie(). The
    # loop option and play direction are restored from the saved values
    # rather than being reset to the movie defaults.
    player.updateSliderValues(self.start_frame, self.end_frame,
                              self.current_frame)
    player.updateLoopOption(self.loop_option)
    player.updatePlayDirection(self.play_direction, False)
    player.updateCurrentFrame()
    self.trj_hub.requestToRestoreWorkspaceSizeFromMovieMode.emit()
def savingMovie(self):
    """
    Tell whether a movie export is in progress.

    :rtype: bool
    :return: True if saving movie.
    """
    in_progress = self._saving_movie
    return in_progress