import collections
import contextlib
import copy
import itertools
import os
import typing
import warnings
import weakref
from collections import defaultdict
from collections import deque
from collections import namedtuple
from functools import partial
import schrodinger
from schrodinger import project
from schrodinger import structure
from schrodinger.application.msv import seqio
from schrodinger.application.msv.gui import gui_alignment
from schrodinger.application.msv.gui.viewconstants import Inclusion
from schrodinger.infra import util
from schrodinger.models import diffy
from schrodinger.protein import align
from schrodinger.protein import alignment
from schrodinger.protein import annotation
from schrodinger.protein import residue
from schrodinger.protein import seqres
from schrodinger.protein import sequence
from schrodinger.Qt import QtCore
from schrodinger.Qt import QtWidgets
from schrodinger.structutils import analyze
from schrodinger.ui import maestro_ui
from schrodinger.ui.qt.appframework2 import maestro_callback
from schrodinger.utils import fileutils
from schrodinger.utils import scollections
# Handle returned by `schrodinger.get_maestro()`. `StructureModel.__new__`
# tests its truthiness to decide whether to build a `MaestroStructureModel`.
maestro = schrodinger.get_maestro()
# Placeholder for a PyMOL handle. It is None here, so the PyMOL branch of
# `StructureModel.__new__` is skipped unless this name is rebound elsewhere.
pymol = None
# NOTE(review): presumably the entry id used for Maestro scratch entries -
# confirm against callers; it is not referenced in this part of the file.
SCRATCH_ENTRY_ID = -1
class _EntryData(dict):
    """
    Per-entry bookkeeping for one entry of the current Maestro project.

    The object is a plain (initially empty) dictionary that additionally
    carries a SEQRES flag.

    :ivar has_seqres: Whether the entry has SEQRES records.
    :vartype has_seqres: bool
    """

    def __init__(self, has_seqres):
        """
        :param has_seqres: Whether the entry currently has SEQRES records.
        :type has_seqres: bool
        """
        self.has_seqres = has_seqres
        # The dictionary part always starts out empty.
        super().__init__()
def _gen_renumbered_res_map(seq, start, increment, preserve_icode):
    """
    Build the old-to-new residue numbering for a "start plus increment"
    renumbering of `seq`.

    Every element of `seq` whose `is_res` flag is set receives the next
    number in the series; other elements are skipped and do not advance the
    counter. The result is split into two dictionaries that can be passed to
    `seq.renumberResidues`.

    :param seq: The sequence to renumber.
    :type seq: protein.sequence.AbstractSingleChainSequence

    :param start: The number assigned to the first residue.
    :type start: int

    :param increment: The step between consecutive residue numbers.
    :type increment: int

    :param preserve_icode: Whether each residue keeps its insertion code. If
        False, every insertion code becomes " ".
    :type preserve_icode: bool

    :return: A tuple of (map for residues without structure, map for
        residues with structure). Both map old (resnum, inscode) keys to new
        (resnum, inscode) tuples.
    :rtype: tuple(dict[(int, str), (int, str)], dict[(int, str), (int,str)])
    """
    unstructured_map = {}
    structured_map = {}
    next_number = start
    for elem in seq:
        if not elem.is_res:
            # Non-residue elements are neither numbered nor counted.
            continue
        inscode = elem.inscode if preserve_icode else " "
        old_key = elem.getChainKey()
        target = structured_map if elem.hasStructure() else unstructured_map
        target[old_key] = (next_number, inscode)
        next_number += increment
    return unstructured_map, structured_map
class RenumberResiduesError(ValueError):
    """
    Raised when residues cannot be renumbered, e.g. when renumbering by
    template would need more insertion codes than are available.
    """
def _gen_renumbered_res_by_template_map(source_seq, template_seq):
    """
    Build the old-to-new residue numbering obtained by aligning `source_seq`
    to `template_seq` and borrowing the template's residue numbers.

    The result is split into two dictionaries that can be passed to
    `seq.renumberResidues`.

    NOTE(review): `template_seq` itself is placed in the alignment and then
    trimmed in place, so the caller's object appears to be modified - confirm
    that callers pass a disposable copy.

    :param source_seq: input sequence to be renumbered
    :type source_seq: schrodinger.protein.sequence.ProteinSequence

    :param template_seq: template sequence
    :type template_seq: schrodinger.protein.sequence.ProteinSequence

    :return: A tuple of (map for residues without structure, map for
        residues with structure).
    :rtype: tuple(dict, dict)

    :raises RenumberResiduesError: if there aren't enough valid insertion codes
        to do the renumbering.
    """
    # Work on a structureless copy of the source so that aligning does not
    # touch the sequence that was passed in.
    working_copy = sequence.ProteinSequence(str(source_seq))
    pair_aln = alignment.ProteinAlignment([working_copy, template_seq])
    align.MaxIdentityAligner().run(pair_aln)

    # Template positions that face a gap in the source have no residue to
    # number, so discard them.
    unmatched = [
        template_seq[gap.idx_in_seq] for gap in working_copy.getGaps()
    ]
    template_seq.removeElements(unmatched)

    assigned = []
    pending_gaps = 0
    for elem in template_seq:
        if not elem.is_res:
            pending_gaps += 1
            continue
        anchor = elem.getChainKey()
        if pending_gaps:
            if assigned:
                # Interior gaps: fit rescodes between the last assigned
                # rescode and the current template residue.
                candidates = sequence.gen_resnums_and_inscodes(
                    *assigned[-1], *anchor)
                if len(candidates) < pending_gaps:
                    raise RenumberResiduesError(
                        "Optimal alignment of template "
                        "sequence requires more insertion codes than "
                        "available.")
                assigned.extend(candidates[:pending_gaps])
            else:
                # Leading gaps: count down from the first template residue.
                first_num = elem.resnum
                assigned.extend(
                    (num, " ")
                    for num in range(first_num - pending_gaps, first_num))
        assigned.append(anchor)
        pending_gaps = 0

    # Pair each source residue with its new rescode and sort by structure.
    unstructured_map = {}
    structured_map = {}
    for cur_res, new_key in zip(source_seq.residues(), assigned):
        old_key = cur_res.getChainKey()
        target = structured_map if cur_res.hasStructure() else unstructured_map
        target[old_key] = new_key
    return unstructured_map, structured_map
def _gen_renumbered_res_by_antibody_cdr(seq, new_res_num_list):
    """
    Build the old-to-new residue numbering for an Antibody CDR numbering
    scheme.

    The residues of `seq` are paired in order with the entries of
    `new_res_num_list`; pairing stops at the shorter of the two. The result
    is split into two dictionaries that can be passed to
    `seq.renumberResidues`.

    :param seq: input sequence to be renumbered
    :type seq: schrodinger.protein.sequence.ProteinSequence

    :param new_res_num_list: List of residue numbers per the Antibody CDR scheme
    :type new_res_num_list: List[str]

    :return: A tuple of (map for residues without structure, map for
        residues with structure).
    :rtype: tuple(dict[(int, str), (int, str)], dict[(int, str), (int,str)])
    """
    unstructured_map = {}
    structured_map = {}
    for cur_res, rescode_str in zip(seq.residues(), new_res_num_list):
        resnum, inscode = annotation.parse_antibody_rescode(rescode_str)
        new_key = residue.ResidueChainKey(resnum=resnum, inscode=inscode)
        old_key = cur_res.getChainKey()
        target = structured_map if cur_res.hasStructure() else unstructured_map
        target[old_key] = new_key
    return unstructured_map, structured_map
class _ChainData(QtCore.QObject):
    """
    Information about a single chain of a single entry from the current Maestro
    project.

    :cvar wsVisibilityChangeRequested: A signal emitted when the workspace
        visibility of a chain should be changed. The `MaestroStructureModel`
        instance is responsible for changing the workspace visibility in
        response to this signal. Emitted with:
        - the entry id of the chain (int)
        - the chain name (str)
        - whether the chain should be shown (True) or hidden (False) (bool)
        - whether the entry is currently in the workspace (bool)
    :vartype wsVisibilityChangeRequested: `QtCore.pyqtSignal`

    :ivar eid: The entry id of the chain.
    :vartype eid: int

    :ivar chain: The chain name.
    :vartype chain: str

    :ivar seqs: A set of all sequences representing this chain.
    :vartype seqs: `weakref.WeakSet`

    :ivar workspace_seq: The sequence representing this chain in the workspace
        alignment (i.e. the alignment shown in the workspace tab). Note that
        this sequence also appears in `seqs`.
    :vartype workspace_seq: schrodinger.protein.sequence.Sequence
    """

    wsVisibilityChangeRequested = QtCore.pyqtSignal(int, str, bool, bool)
    _changingSeqVisibility = util.flag_context_manager(
        "_changing_seq_visibility")

    def __init__(self, eid, chain, all_res, vis_res, included):
        """
        :param eid: The entry id of the chain.
        :type eid: int

        :param chain: The chain name.
        :type chain: str

        :param all_res: A set of all residues in the chain, where each residue
            is a tuple of (residue number, insertion code).
        :type all_res: set(residue.ResidueChainKey)

        :param vis_res: A set of all residues in the chain that are currently
            visible in the Maestro workspace, where each residue is a tuple of
            (residue number, insertion code).
        :type vis_res: set(residue.ResidueChainKey)

        :param included: Whether the entry is currently included in the
            Maestro workspace.
        :type included: bool
        """
        super().__init__()
        self.eid = eid
        self.chain = chain
        self.seqs = weakref.WeakSet()
        self.workspace_seq = None
        self._all_res = all_res
        self._vis_res = vis_res
        self._included = included
        self._visibility = None
        self._visibility_slots = scollections.IdDict()
        self._changing_seq_visibility = False
        # Create every attribute before the first visibility update so that
        # no method can ever observe a partially initialized object.
        self._rescode_to_residues_map = defaultdict(set)
        self._updateVisibility()

    def renumberResiduesByTemplate(self, seq, template_seq):
        """
        Renumber `seq` based on the residue numbers of `template_seq`.

        :param seq: input sequence to be renumbered
        :type seq: schrodinger.protein.sequence.ProteinSequence

        :param template_seq: template sequence
        :type template_seq: schrodinger.protein.sequence.ProteinSequence
        """
        nonst_resnum_mapping, st_resnum_mapping = (
            _gen_renumbered_res_by_template_map(seq, template_seq))
        self._renumberResiduesByMap(seq, nonst_resnum_mapping,
                                    st_resnum_mapping)

    def renumberResiduesByAntibodyCDR(self, seq, new_res_num_list):
        """
        Renumber residues in the sequence based on the given new numbers.

        :param seq: Sequence to be renumbered
        :type seq: protein.sequence.ProteinSequence

        :param new_res_num_list: List of residue numbers based on the Antibody
            CDR numbering scheme.
        :type new_res_num_list: List[str]
        """
        nonst_resnum_mapping, st_resnum_mapping = (
            _gen_renumbered_res_by_antibody_cdr(seq, new_res_num_list))
        self._renumberResiduesByMap(seq, nonst_resnum_mapping,
                                    st_resnum_mapping)

    def _renumberResiduesByMap(self, seq, nonst_resnum_map, st_resnum_map):
        """
        Apply a renumbering to `seq`, to every other sequence linked to this
        chain, and to the structure of `seq`.

        :param seq: The sequence the renumbering was generated for.
        :type seq: protein.sequence.AbstractSingleChainSequence

        :param nonst_resnum_map: Old-to-new rescodes for residues of `seq`
            without structure.
        :type nonst_resnum_map: dict

        :param st_resnum_map: Old-to-new rescodes for residues with structure.
        :type st_resnum_map: dict
        """
        # Renumber both non-structured and structured residues for `seq`
        seq.renumberResidues({**nonst_resnum_map, **st_resnum_map})
        # Renumber only structured residues for every other sequence
        for other_seq in self.seqs:
            if other_seq is seq:
                continue
            other_seq.renumberResidues(st_resnum_map)
        # Apply new residue numbering to the structure
        st = seq.getStructure()
        ch = st.chain[seq.structure_chain]
        for st_res in ch.residue:
            old_key = (st_res.resnum, st_res.inscode)
            if old_key in st_resnum_map:
                new_resnum, new_inscode = st_resnum_map[old_key]
                st_res.resnum = new_resnum
                st_res.inscode = new_inscode
        seq.setStructure(st)
        # Rebuild the rescode to residues map from scratch. A distinct loop
        # variable is used so the `seq` argument is not shadowed.
        self._rescode_to_residues_map.clear()
        for cur_seq in self.seqs:
            self._updateRescodeMap(cur_seq)
            cur_seq.onStructureChanged()

    def renumberResidues(self, seq, start, increment, preserve_icode):
        """
        Renumber the residues for `seq`. New residue numbers and insertion
        codes will be propagated to structured residues in all `seq`s managed
        by this `_ChainData`.

        :param seq: The sequence to renumber.
        :type seq: protein.sequence.AbstractSingleChainSequence

        :param start: The number to start the renumbering with
        :type start: int

        :param increment: The amount to increment while numbering
        :type increment: int

        :param preserve_icode: Whether to keep the inscode for the residues. If
            False, all inscodes will be set to " ".
        :type preserve_icode: bool
        """
        nonst_resnum_mapping, st_resnum_mapping = _gen_renumbered_res_map(
            seq, start, increment, preserve_icode)
        self._renumberResiduesByMap(seq, nonst_resnum_mapping,
                                    st_resnum_mapping)

    @property
    def included(self):
        """
        Whether the entry is currently included in the Maestro workspace.

        :type: bool
        """
        return self._included

    @included.setter
    def included(self, value):
        self._included = value
        self._updateVisibility()

    def updateVisRes(self, added, removed):
        """
        Update the set of residues that are currently visible in the Maestro
        workspace.

        :param added: The set of residues that were just added to the workspace.
        :type added: set

        :param removed: The set of residues that were just removed from the
            workspace.
        :type removed: set

        :note: Each residue must be a tuple of (residue number, insertion code).
        """
        self._vis_res -= removed
        self._vis_res |= added
        self._updateVisibility()

    def clearVisRes(self):
        """
        Clear the set of residues that are currently visible in the Maestro
        workspace.
        """
        self._vis_res.clear()
        self._updateVisibility()

    def _updateVisibility(self):
        """
        Update self.visibility after an inclusion or a residue visibility
        change.
        """
        if not self._all_res:
            # This chain is about to be deleted so don't worry about the
            # visibility
            return
        if not self._included:
            new_visibility = Inclusion.Excluded
        elif len(self._vis_res) == len(self._all_res):
            new_visibility = Inclusion.FullyVisible
        elif not self._vis_res:
            new_visibility = Inclusion.NotVisible
        else:
            new_visibility = Inclusion.PartiallyVisible
        if new_visibility != self._visibility:
            self._visibility = new_visibility
            self._setSequenceVisibility(new_visibility)

    def _setSequenceVisibility(self, visibility):
        """
        Update the visibility of all sequences that represent this chain.

        :param visibility: The new visibility of this chain
        :type visibility: `Inclusion`
        """
        # The flag keeps `_sequenceChangedVisibility` from reacting to the
        # changes made here.
        with self._changingSeqVisibility():
            for cur_seq in self.seqs:
                cur_seq.visibility = visibility

    def updateAllRes(self, added, removed):
        """
        Update the set of residues that exist in this chain.

        :param added: The set of residues that were just added to the chain.
        :type added: set

        :param removed: The set of residues that were just removed from the
            chain.
        :type removed: set

        :note: Each residue must be a tuple of (residue number, insertion code).
        :note: `added` and `removed` are currently ignored; see the TODOs.
        """
        # TODO: update self._all_res
        self._updateVisibility()
        # TODO: update sequences

    @property
    def visibility(self):
        """
        The visibility of this chain in the Maestro workspace. May not be
        updated directly. Use `updateVisRes` or `updateAllRes` instead.

        :type: `Inclusion`
        """
        return self._visibility

    def addSeq(self, seq):
        """
        Monitor a new sequence that represents this chain.

        :param seq: The sequence to monitor
        :type seq: schrodinger.protein.sequence.Sequence
        """
        if seq in self.seqs:
            # we're already monitoring this sequence
            return
        seq.visibility = self.visibility
        self.connectSeq(seq)
        self.seqs.add(seq)
        self._updateRescodeMap(seq)

    @QtCore.pyqtSlot(set)
    def _updateRescodeMap(self, residues):
        """
        Record each of the given residues under its chain key. Gaps and
        residues flagged `seqres_only` are skipped.

        :param residues: The residues to record. May be any iterable of
            residues, including a whole sequence.
        :type residues: Iterable(residue.Residue)
        """
        for res in residues:
            if res.is_gap or res.seqres_only:
                continue
            self._rescode_to_residues_map[res.getChainKey()].add(res)

    @QtCore.pyqtSlot(set)
    def _removeFromRescodeMap(self, residues):
        """
        Forget each of the given residues. Gaps and residues flagged
        `seqres_only` are skipped.

        :param residues: The residues to forget. May be any iterable of
            residues, including a whole sequence.
        :type residues: Iterable(residue.Residue)

        :raises KeyError: If a residue was never recorded.
        """
        for res in residues:
            if res.is_gap or res.seqres_only:
                continue
            self._rescode_to_residues_map[res.getChainKey()].remove(res)

    def removeSeq(self, seq):
        """
        Stop monitoring a sequence that represents this chain and strip it
        of structural data.

        :param seq: The sequence to stop monitoring
        :type seq: schrodinger.protein.sequence.Sequence
        """
        # make sure that the partial doesn't keep this object alive
        self.disconnectSeq(seq)
        seq.visibility = None
        seq._get_structure = None
        seq._set_structure = None
        seq.entry_id = None
        seq.structure_chain = None
        self.seqs.remove(seq)
        seq.onStructureChanged()
        self._removeFromRescodeMap(seq)

    def disconnect(self):
        """
        Disconnect all signals and slots between the sequences and this
        _ChainData.
        """
        for seq in self.seqs:
            self.disconnectSeq(seq)

    def connectSeq(self, seq):
        """
        Connect the signals of `seq` to the slots of this object.

        :param seq: The sequence to connect
        :type seq: schrodinger.protein.sequence.Sequence
        """
        seq.sequenceCopied.connect(self._sequenceCopied)
        seq.residuesAdded.connect(self._updateRescodeMap)
        seq.residuesRemoved.connect(self._removeFromRescodeMap)
        # Partial slots with references to self cause problems with garbage
        # collection. To avoid this, the partial is built from a weakref
        # proxy instead of from self.
        # NOTE(review): attribute access through a weakref.proxy is forwarded
        # to the referent, so the bound method stored in the partial may still
        # reference this object strongly - confirm.
        weak_self = weakref.proxy(self)
        vis_slot = partial(weak_self._sequenceChangedVisibility, seq)
        self._visibility_slots[seq] = vis_slot
        seq.visibilityChanged.connect(vis_slot)

    def disconnectSeq(self, seq):
        """
        Undo `connectSeq` for the given sequence.

        :param seq: The sequence to disconnect
        :type seq: schrodinger.protein.sequence.Sequence
        """
        vis_slot = self._visibility_slots[seq]
        seq.visibilityChanged.disconnect(vis_slot)
        # The partial holds a strong reference to `seq`. Forget it once it is
        # disconnected; otherwise the sequence could never be garbage
        # collected while this object is alive.
        del self._visibility_slots[seq]
        seq.sequenceCopied.disconnect(self._sequenceCopied)
        seq.residuesAdded.disconnect(self._updateRescodeMap)
        seq.residuesRemoved.disconnect(self._removeFromRescodeMap)

    def mapRescodeToResidues(self, chain_key):
        """
        Return residues matching a given residue key relative to entry and chain

        :type chain_key: residue.ResidueChainKey

        :return: The recorded residues for the key (empty if there are none).
        :rtype: set
        """
        return self._rescode_to_residues_map[chain_key]

    def getAllStructuredResiduesExcept(self, chain_keys):
        """
        Get all structured residues in this chain other than those specified.

        :param chain_keys: Residues to exclude from the return value
        :type chain_keys: set(residue.ResidueChainKey)

        :return: All other residues
        :rtype: list(residue.Residue)
        """
        residues = []
        for key, res in self._rescode_to_residues_map.items():
            if key not in chain_keys:
                residues.extend(res)
        return residues

    @util.skip_if("_changing_seq_visibility")
    def _sequenceChangedVisibility(self, seq):
        """
        When a sequence changes visibility, update all other sequences and the
        Maestro workspace.

        :param seq: The sequence that changed visibility.
        :type seq: sequence.Sequence

        :raises RuntimeError: If the new visibility is neither fully visible
            nor not visible.
        """
        old_inclusion = self._included
        self._included = True
        new_visibility = seq.visibility
        if new_visibility is Inclusion.FullyVisible:
            # We can't do "self._vis_res = self._all_res" here, since that would
            # make both attributes point to the same set object.
            self._vis_res |= self._all_res
            show = True
        elif new_visibility is Inclusion.NotVisible:
            self._vis_res.clear()
            show = False
        else:
            raise RuntimeError("Cannot set sequence visibility to %s" %
                               new_visibility)
        self._visibility = new_visibility
        self._setSequenceVisibility(new_visibility)
        self.wsVisibilityChangeRequested.emit(self.eid, self.chain, show,
                                              old_inclusion)

    def chainRemoved(self):
        """
        Respond to the entry being removed from the project. Mark all
        sequences as not having an associated structure and disconnect the
        sequences from visibility updates.
        """
        with self._changingSeqVisibility():
            # Iterate over a copy since removeSeq modifies self.seqs
            for cur_seq in list(self.seqs):
                self.removeSeq(cur_seq)

    @QtCore.pyqtSlot(object, object)
    def _sequenceCopied(self, orig_seq, copy_seq):
        """
        When a sequence that we're monitoring is copied, make sure the copy can
        properly get and set the structure and that it gets monitored for
        changes.

        :param orig_seq: The sequence being copied.
        :type orig_seq: schrodinger.protein.sequence.Sequence

        :param copy_seq: The newly created copy.
        :type copy_seq: schrodinger.protein.sequence.Sequence
        """
        self.addSeq(copy_seq)
        copy_seq._get_structure = orig_seq._get_structure
        copy_seq._set_structure = orig_seq._set_structure
        copy_seq.structure_chain = orig_seq.structure_chain
class AbstractStructureModel(QtCore.QObject):
    """
    Manages interactions between sequences and their associated structures. A
    separate AbstractStructureModel subclass should be created for each program
    MSV can run alongside of (i.e. Maestro, PyMol, standalone). For programs
    with a workspace, this class also maintains the workspace alignment, which
    contains sequences for all structures currently included in the workspace.

    This class should not be instantiated directly. Instead `StructureModel`
    should be instantiated, which will create an object of the appropriate
    `AbstractStructureModel` subclass.

    Subclasses must implement `_readStructures`. Subclasses for programs with a
    workspace should implement `getWorkspaceAlignment`, `getIncludedEntries`,
    and `importStructuresIntoWorkspace` and should set `IMPLEMENTS_GET_INCLUDED`
    to True. Subclasses for programs with a concept of selected entries should
    implement `getSelectedEntries`, and should set `IMPLEMENTS_GET_SELECTED` to
    True.

    Note that there should be one structure model instance per panel, *not* one
    per tab.

    :cvar IMPLEMENTS_GET_SELECTED: Whether `getSelectedEntries` is implemented.
        Should be set to True in any subclass that implements this method.
    :vartype IMPLEMENTS_GET_SELECTED: bool

    :cvar IMPLEMENTS_GET_INCLUDED: Whether `getIncludedEntries` is implemented.
        Should be set to True in any subclass that implements this method.
    :vartype IMPLEMENTS_GET_INCLUDED: bool

    :cvar IMPLEMENTS_AUTOLOAD: Whether this class implements the concept of
        autoloading. Subclasses that implement autoloading should define
        `getMsvAutosaveProjectName` and emit `projectSaveRequested` and
        `projectLoadRequested` whenever an autoload or autosave is required.
    :vartype IMPLEMENTS_AUTOLOAD: bool

    :ivar workspaceColorsChanged: Signal emitted when colors of atoms in the
        workspace change, if the associated program has a concept of a
        workspace.
    :vartype workspaceColorsChanged: QtCore.pyqtSignal

    :ivar seqProjectTitlesChanged: Signal emitted when Project Table entry
        titles change for sequences. Emits a dict mapping sequences whose
        titles have changed to their new title in the Project Table and
        whether an immediate sequence name update should be performed.
    :vartype seqProjectTitlesChanged: QtCore.pyqtSignal(dict(
        sequence.ProteinSequence: str), bool)

    :ivar projectLoadRequested: Signal emitted when the MSV should autoload
        a project.

    :ivar projectSaveRequested: Signal emitted when the MSV should autosave
        a project. Emits if it should reset the last save file name.
    :vartype projectSaveRequested: QtCore.pyqtSignal(bool)

    :ivar structureWarningProduced: Signal emitted when loading a structure
        produces a warning. Emits the warning message.
    :vartype structureWarningProduced: QtCore.pyqtSignal(str)
    """

    IMPLEMENTS_GET_SELECTED = False
    IMPLEMENTS_GET_INCLUDED = False
    IMPLEMENTS_AUTOLOAD = False

    workspaceColorsChanged = QtCore.pyqtSignal()
    seqProjectTitlesChanged = QtCore.pyqtSignal(dict, bool)
    projectLoadRequested = QtCore.pyqtSignal()
    projectSaveRequested = QtCore.pyqtSignal(bool)
    structureWarningProduced = QtCore.pyqtSignal(str)

    def __init__(self, *args, **kwargs):
        """
        All arguments are passed through to the `QtCore.QObject` initializer.
        """
        super().__init__(*args, **kwargs)
        self._gui_model = None

    def renumberResiduesByTemplate(self, seq, template_seq):
        """
        Renumber `seq` based on the residue numbers of `template_seq`.

        :param seq: input sequence to be renumbered
        :type seq: schrodinger.protein.sequence.ProteinSequence

        :param template_seq: template sequence
        :type template_seq: schrodinger.protein.sequence.ProteinSequence
        """
        # Only the map for residues without structure is applied here.
        resmap, _ = _gen_renumbered_res_by_template_map(seq, template_seq)
        seq.renumberResidues(resmap)

    def renumberResidues(self, seq, start, increment, preserve_icode):
        """
        Renumbers residues for a sequence.

        :param seq: The sequence to renumber.
        :type seq: protein.sequence.AbstractSingleChainSequence

        :param start: The number to start the renumbering with
        :type start: int

        :param increment: The amount to increment while numbering
        :type increment: int

        :param preserve_icode: Whether to keep the inscode for the residues. If
            False, all inscodes will be set to " ".
        :type preserve_icode: bool
        """
        # Only the map for residues without structure is applied here.
        resmap, _ = _gen_renumbered_res_map(seq, start, increment,
                                            preserve_icode)
        seq.renumberResidues(resmap)

    def renumberResiduesByAntibodyCDR(self, seq, new_res_num_list):
        """
        Renumber residues in the sequence based on the given new numbers.

        :param seq: Sequence to be renumbered
        :type seq: protein.sequence.ProteinSequence

        :param new_res_num_list: List of residue numbers based on the Antibody
            CDR numbering scheme.
        :type new_res_num_list: List[str]
        """
        # Only the map for residues without structure is applied here.
        resmap, _ = _gen_renumbered_res_by_antibody_cdr(seq, new_res_num_list)
        seq.renumberResidues(resmap)

    def mapResidues(self, residues):
        """
        Map residues to all residues represented by the same structure residue.
        Note that only structures currently included in the workspace are
        considered. If a residue has no structure, the residue is included
        unchanged.

        This base implementation returns `residues` unchanged.
        """
        return residues

    def setGuiModel(self, gui_model):
        """
        Store the GUI model that this structure model works with.
        """
        self._gui_model = gui_model

    @QtCore.pyqtSlot(object, object)
    def onPagesMutated(self, new_pages, old_pages):
        """
        Update state in response to gui_model.pages.mutated signal.

        Note that this method must be connected to using
        `getSignalsAndSlots` rather than `mutated.connect`.
        """
        pass

    def renameSeq(self, seq, new_name):
        """
        Rename the specified sequence

        :param seq: Sequence to be renamed
        :type seq: sequence.ProteinSequence

        :param new_name: New name for the sequence
        :type new_name: str
        """
        raise NotImplementedError

    def getWorkspaceAlignment(self):
        """
        Return an alignment that contains all entries included in the workspace.
        The structure model will ensure that this alignment is always kept in
        sync with the workspace. Returns None if the associated program has no
        concept of a workspace (i.e. StandaloneStructureModel).

        :note: This method will always return a split-chain alignment regardless
            of the current split-chain view setting.

        :rtype: `alignment.BaseAlignment` or NoneType
        """
        return None

    def getLinkedAlnSeqs(self, seq):
        """
        Return a set of linked sequences to the specified sequence

        :type seq: sequence.ProteinSequence
        :rtype: set
        """
        return set()

    def getSelectedEntries(self):
        """
        Returns a list of sequences for all entries that are currently selected
        in the project table. Raises NotImplementedError if the associated
        program has no concept of a selected entry.

        :rtype: list
        """
        raise NotImplementedError

    def getIncludedEntries(self):
        """
        Returns a list of sequences for all entries that are currently included
        in the workspace. Raises NotImplementedError if the associated
        program has no concept of a workspace.

        :rtype: list
        """
        raise NotImplementedError

    def getWorkspaceColors(self):
        """
        Returns a dict mapping residues to their color in the workspace.

        :rtype: dict
        """
        raise NotImplementedError

    def setWorkspaceColors(self, color_map, all_atoms=False):
        """
        Sets the colors in the workspace to the colors given by color_map.

        :type color_map: dict

        :param all_atoms: Whether to color all atoms or just carbons
        :type all_atoms: bool
        """
        raise NotImplementedError

    def importFile(self, filename):
        """
        Return sequences for the specified file. If the file contains
        structural data, then the sequences will have associated structures
        accessible via `sequence.getStructure()`.

        :param filename: The filename to read
        :type filename: str

        :return: All sequences. Note that these sequences *have not*
            been loaded into *any* alignment, including the workspace alignment.
            If `filename` contains structural data and the current structure
            model backend implements a workspace, see
            `importStructuresIntoWorkspace`, which imports a file and returns
            the corresponding workspace alignment sequences.
        :rtype: list(sequence.Sequence)

        :raise IOError: If there was an error importing the file.
        """
        if fileutils.get_structure_file_format(filename) is not None:
            return self._readStructures(filename)
        return self._readSequences(filename)

    def importFiles(self, filenames):
        """
        Return sequences for all specified files. If any of the files contain
        structural data, then those sequences will have associated structures
        accessible via `sequence.getStructure()`.

        :param filenames: The filenames to read
        :type filenames: iterable

        :return: All imported sequences. Note that these sequences *have not*
            been loaded into *any* alignment, including the workspace alignment.
        :rtype: list(sequence.Sequence)

        :raise IOError: If there was an error importing the files.
        :raise TypeError: If `filenames` is a single string.
        """
        if isinstance(filenames, str):
            # A string would otherwise be iterated character by character
            raise TypeError("importFiles expects a non-string iterable. Try "
                            "importFile instead.")
        seqs = []
        for cur_file in filenames:
            seqs.extend(self.importFile(cur_file))
        return seqs

    def _readStructures(self, filename):
        """
        Return sequences for the specified file, which must contain structural
        data. The sequences will have associated structures accessible via
        `sequence.getStructure()`.

        :param filename: The filename to read
        :type filename: str

        :return: All sequences
        :rtype: list(sequence.Sequence)
        """
        raise NotImplementedError

    def _readSequences(self, filename):
        """
        Return sequences for the specified file, which does not contain
        structural data.

        :param filename: The filename to read
        :type filename: str

        :return: All sequences
        :rtype: list(sequence.Sequence)
        """
        return seqio.read_sequences(filename)

    def _convertStructure(self, st, *args, **kwargs):
        """
        Return the sequences converted from the given `st`. args and kwargs will
        be passed to seqio.StructureConverter.convert.

        If the conversion produced a warning, `structureWarningProduced` is
        emitted with the message once control returns to the event loop.
        """
        with seqio.catch_sequence_warnings() as warn_catcher:
            seqs = seqio.StructureConverter.convert(st, *args, **kwargs)
        if warn_catcher.message:
            # This can be called during panel init, so use a single-shot timer
            # so the slot doesn't run until init is done
            QtCore.QTimer.singleShot(
                0, lambda: self.structureWarningProduced.emit(
                    warn_catcher.message))
        return seqs

    def importStructuresIntoWorkspace(self, filename):
        """
        Import all structures from the given file into the workspace and
        include only the first structure.

        :param filename: The filename to read
        :type filename: str

        :return: Sequences from the workspace alignment that correspond to the
            newly imported structures.
        :rtype: list(sequence.Sequence)
        """
        raise NotImplementedError

    @classmethod
    def generateEntryResidueASL(cls, residues_by_entry):
        """
        Generate an ASL string for the given entry IDs and residues.

        Entries for which no residue ASL can be generated (e.g. none of their
        residues has a structure) are left out of the result.

        :param residues_by_entry: Mapping of entry id to residues
        :type residues_by_entry: dict[str, list(protein.residue.Residue)]

        :return: The per-entry clauses joined with " OR ", or an empty string
            if there are none.
        :rtype: str
        """
        asl_parts = []
        for eid, residues in sorted(residues_by_entry.items()):
            entry_asl = cls.generateResidueASL(residues)
            # generateResidueASL returns an empty string rather than None
            # when there is nothing to select. Skip such entries so that an
            # empty "AND ()" clause is never produced.
            if entry_asl:
                entry_asl = f'(entry.id {eid} AND ({entry_asl}))'
                asl_parts.append(entry_asl)
        return " OR ".join(asl_parts)

    @staticmethod
    def generateResidueASL(residues):
        """
        Generate an ASL string for the given residues. Residues should be
        from the same entry.

        :type residues: collections.abc.Iterable(protein.residue.Residue)

        :return: The ASL, or an empty string if none of the residues has a
            structure residue.
        :rtype: str
        """
        sresidues = (
            res.sequence.getStructureResForRes(res) for res in residues)
        sresidues = [sres for sres in sresidues if sres is not None]
        # There are bugs with selecting multiple negative residue numbers at
        # the same time (e.g. "res.num -31, -32". See SHARED-7239). Using the
        # ASL for each negative residue separately works around the bug.
        neg_sresidues = [sres for sres in sresidues if sres.resnum < 0]
        nonneg_sresidues = [sres for sres in sresidues if sres.resnum >= 0]
        asl_parts = [res.getAsl() for res in neg_sresidues]
        if nonneg_sresidues:
            asl_parts.append(analyze.generate_residue_asl(nonneg_sresidues))
        return " OR ".join(asl_parts)

    @classmethod
    def generateMultiEntryResidueASL(cls, residues):
        """
        Generate an ASL string for the given residues. Residues can be from
        different entries. Gaps and residues whose sequence has no entry id
        are ignored.

        :type residues: collections.abc.Iterable(protein.residue.Residue)
        :rtype: str
        """
        resmap = collections.defaultdict(list)
        for res in residues:
            if res.is_gap or res.sequence.entry_id is None:
                continue
            resmap[res.sequence.entry_id].append(res)
        return cls.generateEntryResidueASL(resmap)

    def applyWorkspaceSelectionToSeqs(self, aln, seqs=None):
        """
        Select any residues in the given sequences that are selected in the
        workspace. Sequences without structures or with structures that aren't
        currently included in the workspace are ignored.

        This method is a no-op for structure models without a workspace.

        :param aln: The alignment to select the residues in
        :type aln: gui_alignment._ProteinAlignment

        :param seqs: The sequences to select residues in. If not given, all
            sequences in `aln` will be used.
        :type seqs: Iterable(sequence.Sequence)
        """
        # This method intentionally left blank

    def delayedSyncFromMsvToWorkspace(self, aln):
        """
        Replace residue selection in the workspace with residue selection in the
        given alignment. When new entries are included in the workspace, their
        residue selection is not automatically synchronized until selection is
        changed in either the workspace (in which case residue selection from
        the workspace is applied to the MSV) or the MSV (in which case residue
        selection from the MSV active tab is applied to the workspace and to the
        other MSV tabs). This method forces selection to be immediately
        synchronized.

        This method will also remove workspace selection for any entries without
        a linked sequence in the given alignment.

        This method is a no-op for structure models without a workspace.

        :param aln: The alignment to take residue selection from
        :type aln: gui_alignment._ProteinAlignment
        """
        # This method intentionally left blank

    def getStructSeq(self, entry_id, chain_name):
        """
        Return a sequence for the chain structure specified by `entry_id` and
        `chain_name`. This sequence will *not* be monitored by the structure
        model in any way and will not be kept up to date with any changes to the
        structure.

        This method will always raise a ValueError for structure models without
        a workspace.

        :param entry_id: The entry id of the structure.
        :type entry_id: int or str

        :param chain_name: The name of the chain to create a sequence for.
        :type chain_name: str

        :return: The requested sequence
        :rtype: sequence.Sequence

        :raises ValueError: If the specified entry_id or chain don't exist.
        """
        raise ValueError("No workspace.")

    def linkSequence(self, seq, entry_id, chain_name):
        """
        Link a sequence to the structure specified by entry_id and chain name.

        This method will always raise a ValueError for structure models without
        a workspace.

        :param seq: The sequence to associate with a structure.
        :type seq: sequence.Sequence

        :param entry_id: The entry id of the structure to associate
        :type entry_id: str or int

        :param chain_name: The name of chain of the structure to associate
            with the sequence.
        :type chain_name: str

        :raises ValueError: If the specified entry_id or chain don't exist.
        """
        raise ValueError("No workspace.")

    def loadFileAndLink(self, filename, seq):
        # See child class for documentation
        raise ValueError("No workspace.")
class StructureModel(metaclass=StructureModelMeta):
    """
    Factory class: instantiating it yields an instance of the
    `AbstractStructureModel` subclass that matches the program MSV is running
    alongside of.
    """

    def __new__(cls, parent, undo_stack):
        """
        :param parent: The Qt parent widget
        :type parent: QtWidgets.QWidget

        :param undo_stack: The undo stack
        :type undo_stack: schrodinger.application.msv.command.UndoStack
        """
        # Only the Maestro model receives the parent and the undo stack.
        if maestro:
            return MaestroStructureModel(parent, undo_stack)
        if pymol:
            return PyMolStructureModel()
        return StandaloneStructureModel()
class NewResInfo(
        namedtuple("NewResInfo", ("resnum", "inscode", "resname", "is_na"))):
    """
    Description of a new residue added to the workspace structure during a
    residuesChanged signal.

    Fields are `resnum`, `inscode`, `resname` and `is_na` (whether the
    residue is a nucleic acid); `is_na` is optional and defaults to None.
    """

    # TODO MSV-2379: Consider all items in WHResidue.d_hash

    # Without empty __slots__ every instance of this namedtuple subclass
    # would also carry its own __dict__.
    __slots__ = ()

    def __new__(cls, resnum, inscode, resname, is_na=None):
        # Make is_na (whether the residue is a nucleic acid) optional
        return super().__new__(cls, resnum, inscode, resname, is_na)

    def chainKey(self):
        """
        A key to uniquely identify the residue within the chain

        :return: Key built from the residue number and insertion code
        :rtype: residue.ResidueChainKey
        """
        return residue.ResidueChainKey(self.resnum, self.inscode)
class WHResInfo(typing.NamedTuple):
    """
    Tuple to hash WHResidue appropriately. Used to create `NewResInfo` objects.

    Note: not using WHResidue.getHash() because it also considers molecule
    number
    """

    # TODO MSV-2379: Consider all items in WHResidue.d_hash
    eid: int
    chain: str
    resnum: int
    inscode: str
    resname: str
    is_na: bool

    @classmethod
    def fromWHRes(cls, whres):
        """
        Generate a `WHResInfo` object from a `WHResidue` object.

        :param whres: The workspace hub residue to describe
        :type whres: maestro_ui.WHResidue

        :return: The description of `whres`; the entry id is converted to
            an int.
        :rtype: WHResInfo
        """
        # Build through `cls` so the alternate constructor also works for
        # subclasses
        return cls(int(whres.getEntryID()), whres.getChain(),
                   whres.getResNum(), whres.getInsCode(),
                   whres.getPDBName(),
                   whres.isDNA() or whres.isRNA())

    def entryKey(self):
        """
        A key to uniquely identify the entry chain

        :rtype: tuple(int, str)
        """
        return (self.eid, self.chain)

    def chainKey(self):
        """
        A key to uniquely identify the residue within the chain

        :rtype: residue.ResidueChainKey
        """
        return residue.ResidueChainKey(self.resnum, self.inscode)

    def residueKey(self):
        """
        A key to uniquely identify the residue and chain

        :rtype: residue.ResidueKey
        """
        return residue.ResidueKey(self.eid, self.chain, self.resnum,
                                  self.inscode)
[docs]class MaestroStructureModel(AbstractStructureModel):
# Capability flags, all enabled for this class (it defines
# getSelectedEntries, getIncludedEntries and getMsvAutosaveProjectName).
# NOTE(review): presumably consulted by callers of the structure model -
# confirm against AbstractStructureModel.
IMPLEMENTS_GET_SELECTED = True
IMPLEMENTS_GET_INCLUDED = True
IMPLEMENTS_AUTOLOAD = True
# valid PDB names for protein residues
VALID_AA_NAMES = set(residue.AMINO_ACIDS_THREE_LETTER.keys()) - {"UNK"}
# valid PDB names for nucleic acid residues
VALID_NA_NAMES = set(residue.NA_THREE_LETTER.keys())
# Context managers built from the names of instance flags that __init__
# initializes to False.
# NOTE(review): presumably each flag is True while its context is active -
# confirm against util.flag_context_manager.
_changingMaestroVisibility = util.flag_context_manager(
"_changing_maestro_visibility")
_updatingSeqres = util.flag_context_manager("_updating_seqres")
_updatingColor = util.flag_context_manager("_updating_color")
_syncingSelection = util.flag_context_manager("_syncing_selection")
_syncingInclusion = util.flag_context_manager("_syncing_inclusion")
_renamingProjectEntries = util.flag_context_manager(
'_renaming_project_entries')
def __init__(self, parent, undo_stack):
    """
    Set up the bookkeeping attributes, connect to the Workspace Hub and
    Maestro Hub signals, build the workspace alignment from the entries
    currently included in the workspace and register the Maestro callbacks.

    :param parent: The Qt parent widget
    :type parent: QtWidgets.QWidget

    :param undo_stack: The undo stack. This will be cleared whenever
        Maestro initiates a change that we can't undo.
    :type undo_stack: schrodinger.application.msv.command.UndoStack
    """
    super().__init__(parent)
    self.undo_stack = undo_stack

    # Boolean state flags. Several of these names are handed to
    # util.flag_context_manager at class level and to util.skip_if on
    # _projectChanged.
    self._syncing_selection = False
    self._syncing_inclusion = False
    self._changing_maestro_visibility = False
    self._closing_project = False
    self._updating_seqres = False
    self._updating_color = False
    self._renaming_project_entries = False
    self._request_immediate_rename = False

    # Sequences that shouldn't have their titles synchronized with the
    # Maestro entry name. Everything else about the sequences is still
    # synchronized.
    self.unsynched_seqs = set()
    # a dictionary of [entry id as int][chain name] = _ChainData for chain
    self._entry_chain_map = {}
    self._name_synch_aln = None
    self._valid_seqres_names = self.VALID_AA_NAMES | self.VALID_NA_NAMES

    self._workspace_hub = maestro_ui.WorkspaceHub.instance()
    for hub_signal, hub_slot in self._getWHSignalsAndSlots(
            self._workspace_hub):
        hub_signal.connect(hub_slot)
    self._maestro_hub = maestro_ui.MaestroHub.instance()
    for hub_signal, hub_slot in self._getMHSignalsAndSlots(
            self._maestro_hub):
        hub_signal.connect(hub_slot)

    self._workspace_aln = gui_alignment.GuiProteinAlignment(
        is_workspace=True)
    self._split_workspace_aln = self._workspace_aln
    self._inclusion_changing = set()
    self._delayed_sync_eids = set()

    # Seed the workspace alignment with whatever is included right now
    self._initEidsInWorkspaceAln(maestro.get_included_entry_ids())
    self.applyWorkspaceSelectionToSeqs(self._workspace_aln)

    for callback_info, callback in self._getMaestroCallbacks():
        callback_info.add(callback)
def renumberResiduesByTemplate(self, seq, template_seq):
    """
    Renumber `seq` based on the residue numbers of `template_seq`.

    Sequences with an entry id are renumbered through the `_ChainData` of
    their chain; other sequences are handled by the parent class. The
    residue selection read before renumbering is then synchronized to
    Maestro.

    :param seq: input sequence to be renumbered
    :type seq: schrodinger.protein.sequence.ProteinSequence

    :param template_seq: template sequence
    :type template_seq: schrodinger.protein.sequence.ProteinSequence
    """
    page = self._gui_model.current_page
    saved_selection = page.split_aln.res_selection_model.getSelection()
    if seq.entry_id is None:
        super().renumberResiduesByTemplate(seq, template_seq)
    else:
        entry_chains = self._entry_chain_map[int(seq.entry_id)]
        entry_chains[seq.structure_chain].renumberResiduesByTemplate(
            seq, template_seq)
    self._syncSelectionToMaestro(saved_selection)
def renumberResidues(self, seq, start, increment, preserve_icode):
    """
    Renumbers residues for a sequence and propagates the renumbering
    to all sequences linked to the same chain.

    Sequences without an entry id are handled by the parent class. The
    residue selection read before renumbering is then synchronized to
    Maestro.

    See `_ChainData.renumberResidues` for more documentation.
    """
    page = self._gui_model.current_page
    saved_selection = page.split_aln.res_selection_model.getSelection()
    if seq.entry_id is None:
        super().renumberResidues(seq, start, increment, preserve_icode)
    else:
        entry_chains = self._entry_chain_map[int(seq.entry_id)]
        entry_chains[seq.structure_chain].renumberResidues(
            seq, start, increment, preserve_icode)
    self._syncSelectionToMaestro(saved_selection)
def renumberResiduesByAntibodyCDR(self, seq, new_res_num_list):
    """
    Renumber `seq` using `new_res_num_list`.

    Sequences with an entry id are renumbered through the `_ChainData` of
    their chain; other sequences are handled by the parent class. The
    residue selection read before renumbering is then synchronized to
    Maestro.

    :param seq: The sequence to renumber
    :type seq: sequence.Sequence

    :param new_res_num_list: Passed through unchanged to the renumbering
        implementation
    """
    page = self._gui_model.current_page
    saved_selection = page.split_aln.res_selection_model.getSelection()
    if seq.entry_id is None:
        super().renumberResiduesByAntibodyCDR(seq, new_res_num_list)
    else:
        entry_chains = self._entry_chain_map[int(seq.entry_id)]
        entry_chains[seq.structure_chain].renumberResiduesByAntibodyCDR(
            seq, new_res_num_list)
    self._syncSelectionToMaestro(saved_selection)
def setGuiModel(self, gui_model):
    """
    Set the GUI Model that this structure model should keep up to date. This
    method will update any view pages in the GUI model. If a workspace page
    is present, it will be updated. Otherwise, a new workspace page will be
    created.

    :param gui_model: The GUI model to keep up to date.
    :type gui_model: gui.gui_model.MsvGuiModel
    """
    super().setGuiModel(gui_model)
    if not gui_model.hasWorkspacePage():
        ws_page = gui_model.addWorkspacePage(self._workspace_aln)
    else:
        ws_page = gui_model.getWorkspacePage()
        # _updateWorkspacePage has to run before
        # _onWorkspaceSplitChainViewChanged so that it always sees a
        # split-chain workspace alignment.
        with self._syncingSelection():
            self._updateWorkspacePage(ws_page)
    ws_page.split_chain_viewChanged.connect(
        self._onWorkspaceSplitChainViewChanged)
    # Pick up the current split-chain view setting right away
    self._onWorkspaceSplitChainViewChanged()
    self.updateViewPages(gui_model)
    # Residue selection signals are connected last, i.e. after the
    # workspace selection has been synced to the MSV
    for gui_page in gui_model.pages:
        selection_changed = gui_page.aln_signals.resSelectionChanged
        selection_changed.connect(self._alignmentSelectionChanged)
def _updateWorkspacePage(self, page):
    """
    Refresh an existing workspace page with the current workspace
    sequences, carrying over the gaps of sequences that were already on
    the page. This should be called whenever the MSV panel is reopened,
    since it doesn't monitor workspace changes while it's closed.

    :param page: The workspace page to update
    :type page: gui.gui_model.PageModel

    :raises ValueError: If `page` is not a workspace page
    """
    if not page.is_workspace:
        raise ValueError(
            "This method should only be called on a workspace page")
    # The workspace alignment is guaranteed to be a split-chain alignment
    # here since PageModel.split_chain_view hasn't been consulted yet
    ws_aln = self._workspace_aln
    prev_aln = page.split_aln
    prev_seqs = defaultdict(dict)
    for prev_seq in prev_aln:
        prev_seqs[prev_seq.entry_id][prev_seq.chain] = prev_seq
    current_seqs = list(ws_aln)
    # Unparent every sequence: _alignSeqPair puts them into a temporary
    # alignment
    prev_aln.clear()
    ws_aln.clear()
    for cur_seq in current_seqs:
        chains_of_entry = prev_seqs[cur_seq.entry_id]
        if cur_seq.chain in chains_of_entry:
            # transfer old gaps
            self._alignSeqPair(prev_aln, chains_of_entry[cur_seq.chain],
                               cur_seq)
    ws_aln.addSeqs(current_seqs)
    self.applyWorkspaceSelectionToSeqs(ws_aln)
    page.aln = ws_aln
def applyWorkspaceSelectionToSeqs(self, aln, seqs=None):
    """
    Select, in `aln`, the residues of `seqs` that are selected in the
    workspace. Nothing happens when no such residue is found. See parent
    class for further method documentation.

    :param aln: The alignment whose selection model is updated
    :param seqs: The sequences to consider; defaults to all of `aln`
    """
    target_seqs = aln if seqs is None else seqs
    ws_selected = self._getResiduesSelectedInWorkspace(target_seqs)
    if ws_selected:
        with self._syncingSelection():
            aln.res_selection_model.setSelectionState(ws_selected,
                                                      True,
                                                      _undoable=False)
            # The selection update has to happen inside the
            # _syncingSelection block so that we know to ignore it
            aln.res_selection_model.forceSelectionUpdate()
def _getResiduesSelectedInWorkspace(self, seqs):
    """
    Collect the residues of the given sequences whose residue key matches
    a residue that is selected in the workspace. Sequences with a falsy
    entry id are ignored, as are gaps.

    :param seqs: The sequences to find select residues for.
    :type seqs: Iterable(sequence.Sequence)

    :return: The selected residues
    :rtype: set(residue.Residue)
    """
    seqs_with_entry = [seq for seq in seqs if seq.entry_id]
    if not seqs_with_entry:
        # Don't query the workspace when no sequence has an entry id
        return set()
    ws_selection = self._workspace_hub.getSelAtomsToResSet()
    selected_keys = {
        info.residueKey() for info in self._whresSet(ws_selection, set())
    }
    return {
        res for cur_seq in seqs_with_entry for res in cur_seq
        if not res.is_gap and residue.get_residue_key(
            res, cur_seq.entry_id, res.structure_chain) in selected_keys
    }
def updateViewPages(self, gui_model):
    """
    Re-link the linked sequences of every view (i.e. non-workspace) page
    in the GUI model, then regenerate the page's combined-chain alignment
    and apply the workspace selection to it. This should be called whenever
    the MSV panel is reopened, since it doesn't monitor structure changes
    while it's closed.

    :param gui_model: The model to update.
    :type gui_model: gui.gui_model.MsvGuiModel
    """
    for view_page in gui_model.getViewPages():
        for linked_seq in self._getLinkedSequences(view_page.split_aln):
            # A ValueError means that the entry or chain was deleted while
            # the MSV was closed; such sequences are left alone
            with contextlib.suppress(ValueError):
                self.linkSequence(linked_seq, linked_seq.entry_id,
                                  linked_seq.chain)
        view_page.regenerateCombinedChainAlignment()
        self.applyWorkspaceSelectionToSeqs(view_page.aln)
def _alignSeqPair(self, aln, ref_seq, other_seq):
    """
    Pairwise-align two seqs in a temporary alignment of the same class as
    `aln`. They should be unparented (i.e. not already in an alignment) to
    avoid undefined behavior. Gaps in `other_seq` that don't appear in
    `ref_seq` will be removed.

    :raises ValueError: If either sequence is in `aln`
    """
    if ref_seq in aln or other_seq in aln:
        raise ValueError("Neither seq should be in an alignment.")
    scratch_aln = type(aln)([ref_seq, other_seq])
    align.BiopythonPairwiseAligner().run(scratch_aln)
    # Gaps of other_seq that sit opposite a residue of ref_seq. They are
    # collected first and removed in one go to avoid shifting columns.
    extra_gaps = [
        other_elem for ref_elem, other_elem in scratch_aln.columns()
        if other_elem.is_gap and ref_elem.is_res
    ]
    scratch_aln.removeElements(extra_gaps)
    scratch_aln.clear()
@QtCore.pyqtSlot(object, object)
def onPagesMutated(self, new_pages, old_pages):
    """
    Keep the residue selection signal connections in step with the pages:
    connect `_alignmentSelectionChanged` for added pages and disconnect it
    for removed pages.

    :param new_pages: The pages after the change
    :param old_pages: The pages before the change
    """
    added, removed, _moved = diffy.get_diff(new_pages, old_pages)
    for added_page, _ in added:
        selection_changed = added_page.aln_signals.resSelectionChanged
        selection_changed.connect(self._alignmentSelectionChanged)
    for removed_page, _ in removed:
        selection_changed = removed_page.aln_signals.resSelectionChanged
        selection_changed.disconnect(self._alignmentSelectionChanged)
@QtCore.pyqtSlot()
def _onWorkspaceSplitChainViewChanged(self):
    """
    Point `_workspace_aln` at the alignment currently shown on the GUI
    model's workspace page.
    """
    ws_page = self._gui_model.getWorkspacePage()
    self._workspace_aln = ws_page.aln
def getStructSeq(self, entry_id, chain_name):
    """
    Return an unmonitored sequence for chain `chain_name` of entry
    `entry_id`. See parent class for method documentation.
    """
    entry_seqs, _row, _struc, _proj = self._getUnlinkedSeqsForEid(entry_id)
    return self._getSeqForChain(entry_seqs, chain_name)
def _getSeqForChain(self, seqs, chain_name):
    """
    Pick the sequence with the desired chain name out of `seqs`.

    :param seqs: The sequences to search.
    :type seqs: Iterable(sequence.Sequence)

    :param chain_name: The name of the chain to find.
    :type chain_name: str

    :return: The first sequence with the specified chain name.
    :rtype: sequence.Sequence

    :raises ValueError: If no sequence with the specified chain name is
        present.
    """
    not_found = object()
    match = next((seq for seq in seqs if seq.chain == chain_name),
                 not_found)
    if match is not_found:
        raise ValueError(f"Chain {chain_name} not found")
    return match
def linkSequence(self, seq, entry_id, chain_name):
    """
    Link `seq` to chain `chain_name` of entry `entry_id`. Residue numbers,
    insertion codes, seqres_only values and secondary structure are
    transferred from the structure's sequence, and the undo stack is
    cleared. See parent class for method documentation.

    :raises ValueError: If the specified entry_id or chain don't exist.
    """
    eid = int(entry_id)
    # _getUnlinkedSeqsForEid and _getSeqForChain raise ValueErrors if the
    # entry id or chain name don't exist, so they double as a sanity check
    # of the input before anything gets modified.
    entry_seqs, row, struc, proj = self._getUnlinkedSeqsForEid(eid)
    structure_seq = self._getSeqForChain(entry_seqs, chain_name)

    # initialize data for the project entry if necessary
    if eid not in self._entry_chain_map:
        vis_res = self._getVisResIfNeeded({eid})
        self._entry_chain_map[eid] = self._initDataForEntry(
            eid, row, struc, entry_seqs, vis_res)

    seq.entry_id = str(eid)
    seq.structure_chain = chain_name

    # Copy before adding to chain data to avoid sequenceCopied side effects
    seq_copy = copy.deepcopy(seq)
    aligner = align.MaxIdentityAligner()
    pair_aln = gui_alignment.GuiProteinAlignment([structure_seq, seq_copy])
    aligner(pair_aln)

    # Change residue numbers before adding to chain data
    self._transferResidueInformation(from_seq=structure_seq,
                                     to_seq=seq,
                                     aligned_seq=seq_copy)
    self._addSeqsToChainData([seq], eid, proj)
    # Residue map needs entry ID and structure to be set
    seq.generateResidueMap()
    seq.onStructureChanged()

    # A sequence whose title differs from the Maestro entry title is
    # excluded from title synchronization
    if row.title != seq.name:
        self.unsynched_seqs.add(seq)
    # synchronize residue selection the next time the user changes it
    self._delayed_sync_eids.add(eid)
    self.undo_stack.clear()
def loadFileAndLink(self, filename, seq):
    """
    Read the structures in a file and link `seq` to the first one whose
    structure chain matches `seq`'s chain.

    :param filename: Filename of a structure containing a single protein
        chain that corresponds to `seq`'s chain
    :type filename: str

    :param seq: The sequence to associate with the structure
    :type seq: sequence.Sequence

    :raises ValueError: If the file does not contain the protein chain
        corresponding to the sequence
    """
    loaded_seqs = self._readStructures(filename)
    chain_id = seq.chain
    match = next(
        (cand for cand in loaded_seqs if cand.structure_chain == chain_id),
        None)
    if match is None:
        raise ValueError(
            f"{filename} does not contain the expected chain {chain_id}")
    self.linkSequence(seq, match.entry_id, chain_id)
def _transferResidueInformation(self, *, from_seq, aligned_seq, to_seq):
    """
    Copy residue information from `from_seq` onto `to_seq`, using
    `aligned_seq` to find the counterpart of each residue. `aligned_seq`
    should have the exact same sequence of residues as `to_seq` and be
    aligned to `from_seq`.

    A residue of `to_seq` whose copy in `aligned_seq` is aligned to a
    residue of the same type in `from_seq` takes over that residue's
    number, insertion code and seqres_only value. Every other residue of
    `to_seq` gets no residue number, no insertion code and seqres_only set
    to True. For example, with::

        `from_seq`:    ATCG
        `aligned_seq`: AY~G
        `to_seq`:      AYG

    the `A` and `G` residues of `to_seq` end up with the residue numbers,
    insertion codes, and seqres_only value of the `A` and `G` in
    `from_seq`. The secondary structure assignment of `to_seq` is set as
    well, with None for the residues that could not be matched.

    :param from_seq: The sequence to transfer residue information from.
        Should be aligned to `to_seq`.
    :type from_seq: sequence.Sequence

    :param to_seq: The sequence to transfer residue information to. Should
        have the same sequence of residues as `aligned_seq`
    :type to_seq: sequence.Sequence

    :param aligned_seq: A copy of `to_seq` that is aligned to `from_seq`.
    :type aligned_seq: sequence.Sequence
    """
    unmatched = set()
    sec_struct = []
    residue_pairs = zip(to_seq.residues(), aligned_seq.residues())
    for target_res, aligned_res in residue_pairs:
        source_elem = from_seq[aligned_res.idx_in_seq]
        same_type = (not source_elem.is_gap and
                     aligned_res.type.short_code
                     == source_elem.type.short_code)
        if same_type:
            target_res.resnum = source_elem.resnum
            target_res.inscode = source_elem.inscode
            target_res.seqres_only = source_elem.seqres_only
            sec_struct.append(source_elem.secondary_structure)
        else:
            unmatched.add(target_res)
            sec_struct.append(None)
    # Unmatched residues are reset only after all matches were transferred
    for target_res in unmatched:
        target_res.resnum = None
        target_res.inscode = None
        target_res.seqres_only = True
    to_seq.setSSA(sec_struct)
def getAssociatedChainName(self, seq):
    """
    Look up the name of the chain that `seq` is linked to. Returns None
    if the `seq` doesn't have a structure or if no chain of its entry
    holds it.

    :return: The associated chain name
    :rtype: str
    """
    if not seq.hasStructure():
        return None
    entry_chains = self._entry_chain_map[int(seq.entry_id)]
    return next(
        (name for name, data in entry_chains.items() if seq in data.seqs),
        None)
def unlinkSequence(self, seq):
    """
    Detach a sequence from its structure: remove it from every chain of
    its entry that holds it and reset its residue map to an empty one.

    :param seq: The sequence to unlink.
    :type seq: sequence.Sequence
    """
    entry_chains = self._entry_chain_map[int(seq.entry_id)]
    for data in entry_chains.values():
        if seq not in data.seqs:
            continue
        data.removeSeq(seq)
    seq.setResidueMap({})
def disconnect(self):
    """
    Remove the Maestro callbacks, disconnect the Workspace Hub signals,
    drop the reference to the Workspace Hub and disconnect every
    `_ChainData` object.
    """
    for callback_info, callback in self._getMaestroCallbacks():
        callback_info.remove(callback)
    for hub_signal, hub_slot in self._getWHSignalsAndSlots(
            self._workspace_hub):
        hub_signal.disconnect(hub_slot)
    self._workspace_hub = None
    all_chain_data = itertools.chain.from_iterable(
        entry_chains.values()
        for entry_chains in self._entry_chain_map.values())
    for data in all_chain_data:
        data.disconnect()
def _getMaestroCallbacks(self):
    """
    Pair the Maestro callback info objects for project close, project
    update and workspace changed with the methods that handle them.

    :rtype: list(tuple(maestro_callback.CallbackInfo, callable))
    """
    handlers = (
        (maestro_callback.PROJECT_CLOSE_CALLBACK, self._projectClose),
        (maestro_callback.PROJECT_UPDATE_CALLBACK, self._projectChanged),
        (maestro_callback.WORKSPACE_CHANGED_CALLBACK,
         self.onWorkspaceChanged),
    )
    registry = maestro_callback.CALLBACK_FUNCTIONS
    return [(registry[cb_name], handler) for cb_name, handler in handlers]
def _getMHSignalsAndSlots(self, mh):
    """
    Pair the signals of the given maestro hub with the slots that handle
    them.

    :param mh: The maestro hub to take the signals from

    :rtype: list(tuple(signal, callable))
    """
    pairs = ((mh.projectOpened, self._projectOpened),)
    return list(pairs)
@QtCore.pyqtSlot()
def _projectOpened(self):
    """
    Emit `projectLoadRequested`. Connected to the maestro hub's
    `projectOpened` signal.
    """
    load_requested = self.projectLoadRequested
    load_requested.emit()
def _getWHSignalsAndSlots(self, wh):
    """
    Pair the signals of the given workspace hub with the slots that handle
    them.

    :param wh: The workspace hub to take the signals from

    :rtype: list(tuple(signal, callable))
    """
    pairs = (
        (wh.ligandAtomsChanged, self._ligandAtomsChanged),
        (wh.ligandResiduesChanged, self._ligandResiduesChanged),
        (wh.inclusionChanged, self._inclusionChanged),
        (wh.residuesUpdated, self._residuesUpdated),
        (wh.residueDisplayChanged, self._residueDisplayChanged),
        (wh.residueSelectionChanged, self._residueSelectionChanged),
    )
    return list(pairs)
def _initEidsInWorkspaceAln(self, eids):
    """
    Add sequences for the specified entry ids to the workspace
    alignment and record each of them as the workspace sequence of its
    chain. This method should only be called for entries that have
    never been previously added to the workspace alignment. If an
    entry has been previously added to the workspace alignment, then
    instead add _ChainData.workspace_seq to the alignment for all chains
    in the entry.

    :param eids: The entry ids to add
    :type eids: iterable
    """
    # Scratch entries (non-numeric ids) are dropped, everything else is
    # converted to int since that's how WorkspaceHub provides entry ids
    entry_ids = []
    for eid in eids:
        if isinstance(eid, int) or eid.isdigit():
            entry_ids.append(int(eid))
    vis_res = self._getVisResIfNeeded(entry_ids)
    for entry_id in entry_ids:
        # _getSeqsForEid will populate self._entry_chain_map if needed
        entry_seqs = self._getSeqsForEid(entry_id, vis_res)
        for ws_seq in entry_seqs:
            chain_name = ws_seq.structure_chain
            self._entry_chain_map[entry_id][chain_name].workspace_seq = ws_seq
        self._workspace_aln.addSeqs(entry_seqs, replace_selection=True)
def _getVisResIfNeeded(self, eids):
    """
    Fetch the residues currently visible in the workspace, but only if at
    least one of the specified entries isn't tracked by this structure
    model yet. Otherwise, return None.

    :param eids: A list of entry ids
    :type eids: iterable

    :return: A dictionary of [entry id as integer][chain name] = set of
        residues currently visible in the workspace, or None
    :rtype: defaultdict or NoneType
    """
    untracked = set(eids).difference(self._entry_chain_map)
    if not untracked:
        return None
    return self._getWorkspaceVisRes()
def getSeqsForEid(self, eid):
    """
    Fetch one sequence per chain of the specified entry.

    :param eid: The entry id to fetch sequences for.
    :type eid: int or str

    :return: A list of the requested sequences.
    :rtype: list
    """
    # Entry ids are handled as ints since that's how WorkspaceHub
    # provides them
    entry_id = int(eid)
    return self._getSeqsForEid(entry_id,
                               self._getVisResIfNeeded([entry_id]))
def getSeqsForEids(self, eids, *, ignore_missing=False):
    """
    Fetch one sequence per chain for all of the specified entries.

    :param eids: The entry ids to fetch sequences for.
    :type eids: list

    :param ignore_missing: Whether we should ignore any entry ids that
        aren't present in the project. If this is False and an entry id is
        not present, a ValueError will be raised.
    :type ignore_missing: bool

    :return: A list of the requested sequences.
    :rtype: list

    :raise ValueError: If any of the specified eids are not present in the
        project and `ignore_missing` is `False`.
    """
    # Entry ids are handled as ints since that's how WorkspaceHub
    # provides them
    entry_ids = [int(eid) for eid in eids]
    vis_res = self._getVisResIfNeeded(entry_ids)
    collected = []
    for entry_id in entry_ids:
        try:
            entry_seqs = self._getSeqsForEid(entry_id, vis_res)
        except ValueError:
            if ignore_missing:
                continue
            raise
        collected.extend(entry_seqs)
    return collected
def _readStructures(self, filename):
    """
    Import the structures of the specified file, which must contain
    structural data, and return sequences for them. The sequences will have
    associated structures accessible via `sequence.getStructure()`.

    NOTE Two sets of sequences will be created for the structure.
    One set will be loaded into the workspace tab in response to
    new structures being loaded into Maestro, and one set will be used
    as a return value.

    NOTE If structures with the same title are found using NMR, then
    only the sequence for the structure with the lowest entry ID will be
    returned. For example, if the file has six 5z5q entries found using
    NMR with entry IDs 1-6, and one 1cmy entry not found using NMR, then
    two sequences will be returned: one sequence for one 5z5q entry (entry
    ID 1) and one sequence for 1cmy.
    This is done since files with NMR structures usually have multiple
    conformers which all have the same sequence.

    :param filename: The filename to read
    :type filename: str

    :return: All sequences
    :rtype: list(sequence.Sequence)
    """
    imported_rows = self._importStructuresIntoWorkspace(filename)
    kept_rows = self._filterNMRRows(imported_rows)
    return self.getSeqsForEids([row.entry_id for row in kept_rows])
def _filterNMRRows(self, rows):
    """
    Drop NMR conformers from a list of rows so that only the first row per
    shared entry title remains. Rows whose `s_pdb_PDB_EXPDTA` property
    doesn't contain 'NMR' are always kept. See the notes in
    `_readStructures` for an example.

    :param rows: List of the project rows to filter. Rows should be
        in entry id order.
    :type rows: list(project.ProjectRow)

    :return: Filtered list of project rows
    :rtype: list(project.ProjectRow)
    """
    seen_nmr_titles = set()
    kept = []
    for row in rows:
        is_nmr = 'NMR' in row.property.get('s_pdb_PDB_EXPDTA', '')
        if is_nmr:
            if row.title in seen_nmr_titles:
                # a further conformer of an NMR entry that was kept
                continue
            seen_nmr_titles.add(row.title)
        kept.append(row)
    return kept
def _importStructuresIntoWorkspace(self, filename):
    """
    Import all structures from the given file into the Maestro project and
    include only the first structure in the workspace. Additionally selects
    all the new structures. If a structure is missing the title, the file
    base name is used as title.

    :param filename: The filename to read
    :type filename: str

    :return: list of the new project rows; empty if the file doesn't
        contain any structures
    :rtype: list(project.ProjectRow)
    """
    proj = maestro.project_table_get()
    rows = []
    for st in structure.StructureReader(filename):
        if not st.title.strip():
            st.title = fileutils.get_basename(filename)
        rows.append(proj.importStructure(st))
    if not rows:
        # Nothing was imported, so there is no entry to include or select.
        # Indexing rows[0] below would raise an IndexError.
        return rows
    # include the first new entry
    rows[0].in_workspace = project.IN_WORKSPACE
    # select all new entries
    for row in rows:
        row.is_selected = True
    return rows
def importStructuresIntoWorkspace(self, filename):
    """
    Import the structures of a file and return the workspace alignment
    sequences that belong to the newly imported entries. See
    AbstractStructureModel for method documentation.
    """
    new_rows = self._importStructuresIntoWorkspace(filename)
    new_eids = {row.entry_id for row in new_rows}
    return [
        ws_seq for ws_seq in self._workspace_aln
        if ws_seq.entry_id in new_eids
    ]
def getSelectedEntries(self):
    """
    Return sequences for the entries that are selected in the Maestro
    project table. See AbstractStructureModel for method documentation.
    """
    project_table = maestro.project_table_get()
    selected_eids = [row.entry_id for row in project_table.selected_rows]
    return self.getSeqsForEids(selected_eids)
def getIncludedEntries(self):
    """
    Return sequences for the entries that are included according to the
    Maestro project table. See AbstractStructureModel for method
    documentation.
    """
    project_table = maestro.project_table_get()
    included_eids = [row.entry_id for row in project_table.included_rows]
    return self.getSeqsForEids(included_eids)
def _getSeqsForEid(self, eid, vis_res):
    """
    Create sequences for each chain in the specified entry and register
    them with the entry's chain data. Note that this method will populate
    `self._entry_chain_map` with data for entry `eid` if the entry is not
    already present.

    :param eid: The entry id to fetch sequences for.
    :type eid: int or str

    :param vis_res: If information about `eid` is not already stored
        in `self._entry_chain_map`, a dictionary of
        [entry id as integer][chain name] = set of residues currently
        visible in the workspace. If `self._entry_chain_map` already
        contains information about `eid`, may be None.
    :type vis_res: defaultdict or NoneType

    :return: A list of the requested sequences.
    :rtype: list

    :raise ValueError: If the specified eid is not present in the project
    """
    entry_id = int(eid)
    entry_seqs, row, struc, proj = self._getUnlinkedSeqsForEid(entry_id)
    already_tracked = entry_id in self._entry_chain_map
    if not already_tracked:
        entry_data = self._initDataForEntry(entry_id, row, struc,
                                            entry_seqs, vis_res)
        self._entry_chain_map[entry_id] = entry_data
    self._addSeqsToChainData(entry_seqs, entry_id, proj)
    return entry_seqs
def _getUnlinkedSeqsForEid(self, eid):
    """
    Convert the structure of the specified project entry into sequences
    named after the entry's title. These sequences will not be monitored by
    the structure model in any way.

    :param eid: The entry id to fetch sequences for
    :type eid: str or int

    :return: A tuple of
          - A list of the requested sequences
          - The ProjectRow for the specified entry id
          - The structure for the specified entry id
          - The Maestro project
    :rtype: tuple(list(sequence.Sequence),
                  project.ProjectRow,
                  structure.Structure,
                  project.Project))

    :raises ValueError: If the entry id is not found.
    """
    project_table = maestro.project_table_get()
    try:
        entry_row = project_table[eid]
    except KeyError:
        raise ValueError("Entry id %s not found" % eid)
    entry_struc = entry_row.getStructure()
    entry_seqs = self._convertStructure(entry_struc)
    # Update seqs with project row name
    for entry_seq in entry_seqs:
        entry_seq.name = entry_row.title
    return entry_seqs, entry_row, entry_struc, project_table
def _convertStructure(self, st, *args, **kwargs):
    """
    Convert a structure to sequences via the parent class and remember the
    residue names that occur in them. See parent class for method
    documentation.
    """
    converted = super()._convertStructure(st, *args, **kwargs)
    # Any new residue name is recorded so that it is recognized if the
    # WorkspaceHub emits signals about it
    seen_names = {res.long_code for seq in converted for res in seq}
    seen_names.difference_update({"UNK", "", None})
    self._valid_seqres_names.update(seen_names)
    return converted
def _addSeqsToChainData(self, seqs, eid, proj):
    """
    Register every sequence with the `_ChainData` object of its chain and
    give it a structure getter and setter.

    :param seqs: All sequences to process
    :type seqs: list[sequence.Sequence]

    :param eid: The entry id of the sequences
    :type eid: int

    :param proj: The Maestro project
    :type proj: project.Project
    """

    # row.getStructure can't be used directly because ProjectRow objects
    # are based on entry index, which can become stale when the project
    # changes. The accessors below look the row up by entry id on every
    # call instead, which never stales.
    def fetch_structure():
        return proj[eid].getStructure()

    def store_structure(struc):
        return proj[eid].setStructure(struc)

    entry_chains = self._entry_chain_map[eid] if seqs else None
    for linked_seq in seqs:
        entry_chains[linked_seq.structure_chain].addSeq(linked_seq)
        linked_seq._get_structure = fetch_structure
        linked_seq._set_structure = store_structure
def _getWorkspaceVisRes(self):
    """
    Group the chain keys of the residues that the workspace hub reports as
    displayed by entry id and chain name. Residues rejected by `_isSeqRes`
    are left out.

    :return: A dictionary of [entry id as integer][chain name] = set of
        residues currently visible in the workspace.
    :rtype: defaultdict
    """
    by_entry_and_chain = defaultdict(lambda: defaultdict(set))
    displayed = self._workspace_hub.getDispAtomsToResSet()
    for whres in filter(self._isSeqRes, displayed):
        chain_keys = by_entry_and_chain[whres.getEntryID()][whres.getChain()]
        chain_keys.add(self._getKeyFromWHResidue(whres).chainKey())
    return by_entry_and_chain
def _initDataForEntry(self, eid, row, struc, seqs, vis_res):
    """
    Build the `_ChainData` objects for all chains of the specified entry.

    :param eid: The entry id to generate `_ChainData` objects for.
    :type eid: int

    :param row: The project table row for the specified entry.
    :type row: `project.ProjectRow`

    :param struc: The structure for the specified entry.
    :type struc: `structure.Structure`

    :param seqs: A list of all sequences for the specified entry.
    :type seqs: list

    :param vis_res: A dictionary of [entry id as integer][chain name] = set
        of residues currently visible in the workspace.
    :type vis_res: defaultdict

    :return: A dictionary of {chain name: `_ChainData` object}
    :rtype: dict
    """
    is_included = row.in_workspace != project.NOT_IN_WORKSPACE
    # One WHResidue is created per atom index of the structure and the
    # results are collected in a set
    atom_indices = range(1, struc.atom_total + 1)
    whresidues = {
        maestro_ui.WHResidue(struc.handle, idx) for idx in atom_indices
    }
    chain_keys_by_chain = defaultdict(set)
    for whres in filter(self._isSeqRes, whresidues):
        chain_keys = chain_keys_by_chain[whres.getChain()]
        chain_keys.add(self._getKeyFromWHResidue(whres).chainKey())
    entry_data = _EntryData(seqres.has_seqres(struc))
    for entry_seq in seqs:
        chain_name = entry_seq.structure_chain
        entry_data[chain_name] = self._createChainData(
            eid, chain_name, chain_keys_by_chain[chain_name],
            vis_res[eid][chain_name], is_included)
    return entry_data
def _createChainData(self, eid, chain, all_res, vis_res, included):
    """
    Instantiate a `_ChainData` object and connect its
    `wsVisibilityChangeRequested` signal to `_setWorkspaceVisibility`.

    See `_ChainData.__init__` for argument documentation

    :return: The newly created `_ChainData` object.
    :rtype: _ChainData
    """
    new_chain_data = _ChainData(eid, chain, all_res, vis_res, included)
    visibility_requested = new_chain_data.wsVisibilityChangeRequested
    visibility_requested.connect(self._setWorkspaceVisibility)
    return new_chain_data
def _isSeqRes(self, whres):
    """
    Decide whether the specified residue should be included in a
    sequence, i.e. whether its PDB name is one of the known sequence
    residue names. Only protein and nucleic acid residues are included.
    Solvents, ions, ligands, and others are excluded.

    :param whres: The residue to include or exclude
    :type whres: maestro_ui.WHResidue

    :return: True if the residue should be included. False otherwise.
    :rtype: bool
    """
    pdb_name = whres.getPDBName()
    return pdb_name in self._valid_seqres_names
def getWorkspaceAlignment(self):
    """
    Return the alignment stored as `_split_workspace_aln`. See
    AbstractStructureModel for method documentation.
    """
    split_aln = self._split_workspace_aln
    return split_aln
def _getLinkedSequences(self, aln):
    """
    Pick the sequences of the alignment that are linked to a Maestro entry,
    i.e. whose entry id can be converted to an int. There is no guarantee
    that the entry is still present in the Maestro project, nor that the
    entry still contains the corresponding chain.

    :param aln: Alignment
    :type aln: schrodinger.protein.alignment.BaseAlignment

    :return: Existing sequences that have corresponding entries
    :rtype: list(sequence.Sequence)
    """

    # TODO MSV-1982 mapping may need to include project name
    def has_int_entry_id(seq):
        try:
            int(seq.entry_id)
        except (ValueError, TypeError):
            return False
        return True

    return [seq for seq in aln if has_int_entry_id(seq)]
def _projectClose(self):
    """
    React to the project closing: flag that the project is closing, emit
    `projectSaveRequested`, empty the workspace alignment and drop all
    sequence data stored in this class.
    """
    self._closing_project = True
    save_requested = self.projectSaveRequested
    save_requested.emit(True)
    ws_aln = self._workspace_aln
    ws_aln.clear()
    self._resetEntryChainMap()
    # The undo stack will be cleared by the panel when it resets
def getMsvAutosaveProjectName(self):
    """
    Build the filepath where projects should be autosaved to and autoloaded
    from: `project.msv2` inside the additional data directory of the
    Maestro project.

    :rtype: str
    """
    project_table = maestro.project_table_get()
    maestro.project_table_synchronize()
    data_dir = project_table.getAdditionalDataDir()
    return os.path.join(data_dir, 'project.msv2')
def _resetEntryChainMap(self):
    """
    Notify every `_ChainData` object that its chain was removed, then
    forget all tracked entries and all pending delayed selection syncs.
    """
    for entry_chains in self._entry_chain_map.values():
        for data in entry_chains.values():
            data.chainRemoved()
    self._entry_chain_map.clear()
    self._delayed_sync_eids.clear()
@util.skip_if("_renaming_project_entries")
@util.skip_if("_changing_maestro_visibility")
@util.skip_if("_updating_seqres")
def _projectChanged(self):
    """
    Stop monitoring every tracked entry that is no longer in the project
    and remove its workspace sequences from the workspace alignment, then
    check the project table for renamed entries. Nothing is done while the
    project is closed.
    """
    try:
        proj = maestro.project_table_get()
    except project.ProjectException:
        # The project is currently closed
        return
    self._closing_project = False
    # Iterate over a snapshot since entries get deleted along the way
    for eid, entry_chains in list(self._entry_chain_map.items()):
        if eid in proj:
            continue
        for data in entry_chains.values():
            if data.visibility != Inclusion.Excluded:
                ws_aln = self._workspace_aln
                removed_seq = data.workspace_seq
                if removed_seq == ws_aln.getReferenceSeq():
                    ws_aln.clearAnchors()
                ws_aln.removeSeq(removed_seq)
            data.chainRemoved()
        del self._entry_chain_map[eid]
        self._delayed_sync_eids.discard(eid)
    self._checkProjectTableForRenames()
def _checkProjectTableForRenames(self):
"""
Check whether Project Table entries linked to chains have been renamed
and propagate the new titles to the linked sequences.
Workspace sequences that are in the workspace alignment and whose name
differs from the entry title are renamed through `renameSeq`. Other
linked sequences whose name differs are reported through the
`seqProjectTitlesChanged` signal as a {sequence: new title} dictionary.
Sequences in `self.unsynched_seqs` are skipped, as are sequences that are
not in `self._name_synch_aln` when that alignment is set.
Note: Assumes the caller has already verified the presence of the
Project Table and the presence of current stored entry IDs in it.
"""
pt = maestro.project_table_get()
ws_aln = self.getWorkspaceAlignment()
# Pending renames, keyed by sequence with the new title as the value.
new_name_seqs = {}
new_name_ws_seqs = {}
for eid, chains in self._entry_chain_map.items():
row = pt[eid]
for chain in chains.values():
ws_seq = chain.workspace_seq
# Don't need to rename ws seqs that aren't in the ws aln
# (e.g. linked but excluded)
if ws_seq is not None and ws_seq in ws_aln:
if row.title != ws_seq.name:
new_name_ws_seqs[ws_seq] = row.title
for seq in chain.seqs:
if seq in self.unsynched_seqs:
continue
elif self._name_synch_aln is not None and seq not in self._name_synch_aln:
continue
if row.title != seq.name:
new_name_seqs[seq] = row.title
# renameProjectEntry is decorated with skip_if('_renaming_project_entries');
# presumably this context manager sets that flag so the renames below don't
# write back to the Project Table -- confirm against its definition.
with self._renamingProjectEntries():
if new_name_ws_seqs:
for seq, new_name in new_name_ws_seqs.items():
self.renameSeq(seq, new_name)
if new_name_seqs:
self.seqProjectTitlesChanged.emit(
new_name_seqs, self._request_immediate_rename)
# Reset the one-shot state that renameProjectEntry sets.
self._request_immediate_rename = False
self._name_synch_aln = None
def getLinkedAlnSeqs(self, seq):
    """
    Return a set of sequences linked to the same entry ID

    If the sequence belongs to an alignment known to the GUI model, the
    result holds `seq` plus every synched sequence of that alignment with
    the same entry ID and the same name. Otherwise, if the sequence is in
    the Workspace alignment, the result holds `seq` plus every Workspace
    sequence with the same entry ID (names are not compared).

    :param seq: Split-chain sequence to get a linked sequence set for
    :type seq: sequence.ProteinSequence
    :return: Set of linked sequences, including `seq` itself. Empty if
        `seq` is unsynched or has no entry ID.
    :rtype: set(sequence.ProteinSequence)
    """
    if (seq in self.unsynched_seqs or seq.entry_id is None or
            seq.entry_id == ''):
        return set()
    aln = self._gui_model.getAlignmentOfSequence(seq)
    linked_seqs = {seq}
    if aln is not None:
        linked_seqs.update(
            other_seq for other_seq in aln
            if other_seq not in self.unsynched_seqs and
            other_seq.entry_id == seq.entry_id and
            other_seq.name == seq.name)
    else:
        ws_aln = self.getWorkspaceAlignment()
        if seq in ws_aln:
            linked_seqs.update(other_seq for other_seq in ws_aln
                               if other_seq.entry_id == seq.entry_id)
    return linked_seqs
def unsynchEntryID(self, eid):
    """
    Unsynch all non-Workspace sequences for a specified entry ID from the
    Workspace.

    Every sequence listed in the `seqs` of each chain of the entry is added
    to `self.unsynched_seqs`. Entry IDs that are not being tracked are
    ignored.

    :param eid: Entry ID to unsynchronize.
    :type eid: int
    """
    # `_entry_chain_map[eid]` is itself a mapping of chain name to chain
    # data, so its values (not the map entry itself) must be iterated.
    # `.get` avoids creating an entry for an unknown id.
    chains = self._entry_chain_map.get(eid)
    if not chains:
        return
    for chain in chains.values():
        self.unsynched_seqs.update(chain.seqs)
def renameSeq(self,
              seq,
              new_name,
              rename_linked_seqs=False,
              rename_entry=False):
    """
    Rename the specified sequence.

    A sequence from the Workspace alignment is renamed there and the linked
    Project entry is always renamed as well. For any other sequence, the
    rename goes through the alignment that contains it; if the Project
    entry is not renamed, the renamed sequences are recorded in
    `self.unsynched_seqs`.

    :param seq: Sequence to rename
    :type seq: sequence.ProteinSequence
    :param new_name: New name for the sequence
    :type new_name: str
    :param rename_linked_seqs: Whether to rename linked sequences from the
                               same alignment. Will be ignored if the seq
                               is from the Workspace alignment.
    :type rename_linked_seqs: bool
    :param rename_entry: Whether to rename the linked Project entry. Will
                         be ignored if the seq is from the Workspace
                         alignment.
    :type rename_entry: bool
    """
    ws_aln = self.getWorkspaceAlignment()
    if seq in ws_aln:
        ws_aln.renameSeq(seq, new_name)
        self.renameProjectEntry(seq.entry_id, new_name, ws_aln)
        return
    aln = self._gui_model.getAlignmentOfSequence(seq)
    seqs_to_rename = {seq}
    if rename_linked_seqs:
        # Collect the linked sequences before any rename happens, since
        # linkage is decided by comparing names.
        seqs_to_rename.update(
            other_seq for other_seq in aln
            if other_seq.entry_id == seq.entry_id and
            other_seq.name == seq.name)
    for rename_seq in seqs_to_rename:
        aln.renameSeq(rename_seq, new_name)
    if rename_entry:
        self.renameProjectEntry(seq.entry_id, new_name, aln)
    else:
        # The names now differ from the entry title on purpose.
        self.unsynched_seqs.update(seqs_to_rename)
@util.skip_if('_renaming_project_entries')
def renameProjectEntry(self, eid, new_title, aln=None):
    """
    Rename the specified Project Table entry. If an alignment is specified,
    find other sequences related to this entry and alignment and request a
    rename for them as well.

    The alignment is stored in `self._name_synch_aln` and
    `self._request_immediate_rename` is set before the title is changed;
    `_checkProjectTableForRenames` reads and resets both.

    :param eid: Entry ID of the Project Table row to rename
    :param new_title: New title for the entry
    :type new_title: str
    :param aln: Alignment to restrict the follow-up sequence renames to,
        or None for no restriction
    """
    self._name_synch_aln = aln
    self._request_immediate_rename = True
    pt = maestro.project_table_get()
    row = pt[eid]
    row.title = new_title
    pt.update()
@QtCore.pyqtSlot(int, str, bool, bool)
def _setWorkspaceVisibility(self, eid, chain, visible, already_included):
"""
Show or hide the specified chain in the workspace. If showing a
chain that's not currently included in the workspace, then the
entry will be included and all other chains will be hidden.
:param eid: The entry to show or hide.
:type eid: int
:param chain: The chain to show or hide.
:type chain: str
:param visible: Whether the chain should be shown (True) or hidden
(False).
:type visible: bool
:param already_included: Whether the entry is already included in the
workspace or not.
:type already_included: bool
:raises ValueError: If a chain of a newly included entry has no
workspace sequence.
:raises RuntimeError: If asked to hide a chain of an entry that isn't
included in the workspace.
"""
if already_included:
# The entry is already in the workspace, so only atom display changes.
struc = maestro.workspace_get()
atom_nums = self._getWorkspaceAtoms(struc, eid, chain)
# Slots decorated with skip_if("_changing_maestro_visibility") are
# presumably silenced while inside this context manager -- confirm
# against its definition.
with self._changingMaestroVisibility():
if visible:
self._workspace_hub.displayAtomsAdd(atom_nums)
else:
self._workspace_hub.displayAtomsRemove(atom_nums)
elif visible:
# The entry has to be included in the workspace first.
proj = maestro.project_table_get()
with self._changingMaestroVisibility():
proj[eid].in_workspace = project.IN_WORKSPACE
# make sure we fetch the workspace structure after including the
# entry, not before
struc = maestro.workspace_get()
atom_nums_to_show = self._getWorkspaceAtoms(struc, eid, chain, True)
atom_nums_to_hide = self._getWorkspaceAtoms(struc, eid, chain,
False)
with self._changingMaestroVisibility():
self._workspace_hub.displayAtomsRemove(atom_nums_to_hide)
self._workspace_hub.displayAtomsAdd(atom_nums_to_show)
# Update inclusion for other chains in this entry
entry_data = self._entry_chain_map[eid]
for chain_name, chain_data in entry_data.items():
if chain_name != chain:
chain_data.clearVisRes()
chain_data.included = True
if chain_data.workspace_seq is None:
msg = f"{eid}{chain_name} has no workspace seq"
raise ValueError(msg)
self._workspace_aln.addSeq(chain_data.workspace_seq)
else:
# not already_included and not visible
raise RuntimeError("Trying to hide a chain that isn't in the "
"workspace.")
def _getWorkspaceAtoms(self, struc, eid, chain, want_chain=True):
    """
    Evaluate an ASL expression selecting protein or nucleic acid atoms of
    one entry that either

    - belong to the specified chain (`want_chain` is True), or
    - belong to anything other than the specified chain (`want_chain` is
      False)

    :param struc: The workspace structure.
    :type struc: `structure.Structure`
    :param eid: The entry id of the specified chain.
    :type eid: int
    :param chain: The specified chain.
    :type chain: str
    :param want_chain: Whether to return all atoms in the specified chain
        (True) or all atoms not in the specified chain (False)
    :type want_chain: bool
    :return: A list of atom numbers.
    :rtype: list
    """
    if want_chain:
        chain_prefix = ""
    else:
        chain_prefix = "not "
    asl_template = ('entry.id %s and %schain.name "%s" and (protein or '
                    'nucleic_acids)')
    return analyze.evaluate_asl(struc,
                                asl_template % (eid, chain_prefix, chain))
# @QtCore.pyqtSlot("QList<int>", "QList<int>")
@util.skip_if("_closing_project")
@util.skip_if("_changing_maestro_visibility")
def _inclusionChanged(self, included, excluded):
    """
    Bring the workspace alignment up to date after entries were included
    in or excluded from the workspace.

    :param included: A list of all entry ids that were just included.
    :type included: list[int]
    :param excluded: A list of entry ids that were just excluded.
    :type excluded: list[int]
    """
    # Residue selection of newly included entries is not synchronized right
    # away. It waits until the user changes residue selection in either the
    # workspace or the MSV, and _delayed_sync_eids records which entries
    # are still waiting for that.
    pending_sync = self._delayed_sync_eids
    pending_sync.update(included)
    pending_sync.difference_update(excluded)
    pending_sync.discard(SCRATCH_ENTRY_ID)
    # _residuesUpdated reads this set to know which entries to ignore.
    self._inclusion_changing = set(included + excluded)
    for eids, now_included in ((included, True), (excluded, False)):
        self._setEntryInclusion(eids, now_included)
    # TODO: allow inclusion changes to be undone from the MSV (MSV-2192)
    self.undo_stack.clear()
def _setEntryInclusion(self, eids, included):
"""
Update the workspace alignment when entry inclusion changes.
:param eids: A list of entry ids that were either included or excluded.
:type eids: list[int]
:param included: Whether the entries were included (True) or excluded
(False).
:type included: bool
"""
eids_to_init, seqs_to_add, seqs_to_remove = \
self._parseEntryInclusion(eids, included)
ws_aln = self._workspace_aln
ref_seq = ws_aln.getReferenceSeq()
if ref_seq is None:
ref_seq_eid = None
else:
ref_seq_eid = int(ref_seq.entry_id)
if not included and ref_seq_eid in eids:
ws_aln.clearAnchors()
if seqs_to_add:
ws_aln.addSeqs(seqs_to_add)
if seqs_to_remove:
if not self._gui_model.getWorkspacePage().split_chain_view:
# get the combined-chain sequences to remove (since
# seqs_to_remove currently contains split-chain sequences)
seqs_to_remove = [
seq for seq in ws_aln if int(seq.entry_id) in eids
]
with self._syncingInclusion():
ws_aln.removeSeqs(seqs_to_remove)
if eids_to_init:
self._initEidsInWorkspaceAln(sorted(eids_to_init))
def _parseEntryInclusion(self, eids, included):
"""
Figure out what changes need to be made in the workspace alignment when
entry inclusion changes. Note that this method does not make any
changes in the alignment. See `_setEntryInclusion` for that.
:param eids: A list of entry ids that were either included or excluded.
:type eids: list(int)
:param included: Whether the entries were included (True) or excluded
(False).
:type included: bool
:return: A tuple of:
- The entry ids of structures that need to be initialized for
inclusion in the workspace alignment.
- A list of split-chain sequences to add to the workspace alignment.
- A list of split-chain sequences to remove from the workspace
alignment.
:rtype: tuple(set(int), list(sequence.ProteinSequence),
list(sequence.ProteinSequence))
"""
eids_to_init = set()
seqs_to_add = list()
seqs_to_remove = list()
for cur_eid in eids:
if cur_eid <= 0:
# ignore scratch entries
pass
elif cur_eid in self._entry_chain_map:
for cname, chain_data in self._entry_chain_map[cur_eid].items():
chain_data.included = included
if chain_data.workspace_seq is None:
eids_to_init.add(cur_eid)
continue
ws_seq = chain_data.workspace_seq
if included:
seqs_to_add.append(ws_seq)
else:
seqs_to_remove.append(ws_seq)
elif included:
eids_to_init.add(cur_eid)
else: # trying to exclude an eid not found in self._entry_chain_map
# We don't have to do anything since the chain to exclude has
# already been removed. This can happen after Maestro undoes
# the importing of an entry into the workspace.
pass # Deliberately left here to record intention.
return eids_to_init, seqs_to_add, seqs_to_remove
def _getKeyFromWHResidue(self, whres):
    """
    Build a key for a workspace hub residue from its entry id, chain,
    residue number and insertion code. The key is used to map between
    residues in the MSV workspace alignment and residues in the Maestro
    workspace.

    :param whres: the residue to turn into a key
    :type whres: maestro_ui.WHResidue
    :returns: a unique key representing the residue
    :rtype: residue.ResidueKey
    """
    # TODO MSV-2379: Consider all items in WHResidue.d_hash
    entry_id = int(whres.getEntryID())
    chain = whres.getChain()
    resnum = whres.getResNum()
    inscode = whres.getInsCode()
    return residue.ResidueKey(entry_id, chain, resnum, inscode)
def _getKeyFromStructureResidue(self, structure_res):
    """
    Build a key for a structure residue from its entry id, chain, residue
    number and insertion code. The entry id is taken from the first atom
    yielded by `structure_res.atom`.

    :param structure_res: the residue to turn into a key
    :returns: a key representing the residue
    :rtype: residue.ResidueKey
    """
    atoms = iter(structure_res.atom)
    entry_id = next(atoms).entry_id
    chain_fields = (structure_res.chain, structure_res.resnum,
                    structure_res.inscode)
    return residue.ResidueKey(int(entry_id), *chain_fields)
# @QtCore.pyqtSlot("QSet<WHResidue>", "QSet<WHResidue>",
#                  "QHash<WHResidue,QSet<WHResidue> >")
@util.skip_if("_closing_project")
@util.skip_if("_changing_maestro_visibility")
@util.skip_if("_updating_seqres")
@util.skip_if("_updating_color")
def _residuesUpdated(self, removed, added, updated):
    """
    Bring sequences up to date after workspace residues were added,
    removed, or mutated.

    :param removed: Residues that were removed from the workspace structure.
    :type removed: set[maestro_ui.WHResidue]
    :param added: Residues that were added to the workspace structure.
    :type added: set[maestro_ui.WHResidue]
    :param updated: Residues that were modified in the workspace structure,
        given as a dictionary of {old residue: set of new residues}.
    :type updated: dict(maestro_ui.WHResidue, set(maestro_ui.WHResidue))
    """
    # An inclusion change also triggers a residuesChanged signal, so entry
    # ids that were just included or excluded are filtered out, as is the
    # scratch entry id (-1). The recorded ids are consumed here.
    ignored_eids = self._inclusion_changing | {SCRATCH_ENTRY_ID}
    self._inclusion_changing.clear()
    parsed = self._parseUpdatedResidues(added, removed, updated,
                                        ignored_eids)
    (new_by_chain, deleted_by_chain, mutated_by_chain, added_chains,
     chain_renames, modified_eids) = parsed
    self._createNewChains(added_chains)
    self._deleteRemovedResidues(deleted_by_chain)
    self._mutateResidues(mutated_by_chain)
    self._insertNewResidues(new_by_chain)
    self._deleteEmptyChains(deleted_by_chain)
    self._renameChains(chain_renames)
    self._updateSeqres(modified_eids)
    if modified_eids:
        self.undo_stack.clear()
def _parseUpdatedResidues(self, added, removed, updated, eids_to_ignore):
"""
Create lists of new, removed, and mutated residues by chain.
:param added: Residues that were added to the workspace structure.
:type added: list[maestro_ui.WHResidue]
:param removed: Residues that were removed from the workspace structure.
:type removed: list[maestro_ui.WHResidue]
:param updated: Residues that were modified in the workspace structure,
given as a dictionary of {old residue: set of new residues}.
:type updated: dict(maestro_ui.WHResidue, set(maestro_ui.WHResidue))
:param eids_to_ignore: A set of entry ids that we should exclude from
the return values. Used for entries that have been included or
excluded from the workspace or for scratch entries.
:type eids_to_ignore: set(int)
:return: A tuple of:
- New residues for existing sequences, reported as
{(entry id, chain): a set of NewResInfo objects}
- Deleted residues, reported as {(entry id, chain): a dictionary of
{(residue number, insertion code): residue name}}
- Mutated residues, reported as {(entry id, chain): a dictionary of
{(residue number, insertion code): new residue name}}
- New residues for new chains, reported as {(entry id, chain):
a set of NewResInfo objects}
- Chain renames, reported as a list of (entry id, old chain name,
new chain name) tuples
- Entry ids of all modified entries
:rtype: tuple(defaultdict, defaultdict, defaultdict, defaultdict, list,
set)
"""
modified_eids = set()
(added_from_updates, removed_from_updates, mutated_by_chain,
chain_renames) = self._parseModifiedResidues(updated, eids_to_ignore,
modified_eids)
added_res = self._whresSet(added, eids_to_ignore)
added_res.update(added_from_updates)
new_by_chain, added_chains = self._parseAddedResidues(
added_res, modified_eids)
removed_res = self._whresSet(removed, eids_to_ignore)
removed_res.update(removed_from_updates)
deleted_by_chain = self._parseRemovedResidues(removed_res,
modified_eids)
return (new_by_chain, deleted_by_chain, mutated_by_chain, added_chains,
chain_renames, modified_eids)
def _parseModifiedResidues(self, updated, eids_to_ignore, modified_eids):
"""
Parse the updated residues reported by the WorkspaceHub's
residuesUpdated signal.
:param updated: The updated residues.
:type updated: dict(maestro_ui.WHResidue, set(maestro_ui.WHResidue))
:param eids_to_ignore: A set of entry ids that we should exclude from
the return values.
:type eids_to_ignore: set(int)
:param modified_eids: A set of entry ids for modified residues. Will be
updated with the entry ids of any updated residues.
:type modified_eids: set(int)
:return: A tuple of
- Residues that should be handled as new residues.
- Residues that should be handled as deleted residues.
- Residues that have been mutated (The residue type changed, but
residue number, insertion code, etc remained the same). Given as
{(entry id, chain): a dictionary of
{(residue number, insertion code): new residue name}}
- Chains that have been renamed. Given as a list of
(entry id, old chain name, new chain name) tuples.
:rtype: tuple(set(WHResInfo), set(WHResInfo),
defaultdict(tuple(int, str), defaultdict(tuple(int, str), str)),
list(tuple(int, str, str)))
"""
res_to_remove = set()
res_to_add = set()
mutated_by_chain, chain_renamed_res = \
self._parseMutationsAndPotentialChainRenames(
updated, eids_to_ignore, modified_eids, res_to_remove,
res_to_add)
chain_renames = self._parseChainRenames(chain_renamed_res,
res_to_remove, res_to_add)
return (res_to_add, res_to_remove, mutated_by_chain, chain_renames)
def _parseMutationsAndPotentialChainRenames(self, updated, eids_to_ignore,
modified_eids, res_to_remove,
res_to_add):
"""
Parse the updated residues reported by the WorkspaceHub's
residuesUpdated signal for mutated residues and residues where only the
chain name has changed.
Updates that are neither a mutation nor a potential chain rename are
recorded in `res_to_remove` and `res_to_add` instead.
:param updated: The updated residues.
:type updated: dict(maestro_ui.WHResidue, set(maestro_ui.WHResidue))
:param eids_to_ignore: A set of entry ids that we should exclude from
the return values.
:type eids_to_ignore: set(int)
:param modified_eids: A set of entry ids for modified residues. Will be
updated with the entry ids of any updated residues.
:type modified_eids: set(int)
:param res_to_remove: A set of residues that should be handled as
deleted residues. Will be updated based on the contents of
`updated`.
:type res_to_remove: set(WHResInfo)
:param res_to_add: A set of residues that should be handled as
new residues. Will be updated based on the contents of `updated`.
:type res_to_add: set(WHResInfo)
:return: A tuple of:
- Residues that have been mutated (The residue type changed, but
residue number, insertion code, etc remained the same). Given as
{(entry id, chain): a dictionary of
{(residue number, insertion code): new residue name}}
- Residues where only the chain name has been changed and the new
chain name didn't previously exist in the entry. Given as nested
dictionaries of
chain_renamed_res[entry_id][old_chain_name][new_chain_name] =
list of (old_WHResInfo, new_WHResInfo) tuples
:rtype: tuple(
defaultdict(tuple(int, str), dict(tuple(int, str), str)),
defaultdict(int, defaultdict(str, defaultdict(
str, list[tuple(WHResInfo, WHResInfo)]))))
"""
mutated_by_chain = defaultdict(dict)
# Three levels of defaultdict: entry id -> old chain name -> new chain
# name -> list of (old, new) residue pairs.
chain_renamed_res = defaultdict(
partial(defaultdict, partial(defaultdict, list)))
for old_res, cur_updated in updated.items():
eid = old_res.getEntryID()
if eid in eids_to_ignore or not self._isSeqRes(old_res):
continue
modified_eids.add(eid)
# From here on, old_res and the members of cur_updated are WHResInfo
# objects rather than WHResidue objects.
old_res = WHResInfo.fromWHRes(old_res)
cur_updated = self._whresSet(cur_updated, eids_to_ignore)
if len(cur_updated) == 1:
new_res = next(iter(cur_updated))
eid_and_resnum_match = (old_res.eid, old_res.resnum,
old_res.inscode) == (new_res.eid,
new_res.resnum,
new_res.inscode)
resname_match = (old_res.resname == new_res.resname)
chain_match = (old_res.chain == new_res.chain)
if (eid_and_resnum_match and resname_match and
not chain_match and
new_res.chain not in self._entry_chain_map[eid]):
# is a potential chain rename
eid = old_res.eid
chain_renamed_res[eid][old_res.chain][new_res.chain].append(
(old_res, new_res))
continue
elif eid_and_resnum_match and chain_match and not resname_match:
# is a mutation
entry_key = old_res.entryKey()
chain_key = old_res.chainKey()
mutated_by_chain[entry_key][chain_key] = new_res.resname
continue
# is neither a mutation nor a potential chain rename
if old_res in cur_updated:
# only some of the atoms in the residue got updated, so the
# residue itself still exists
cur_updated.remove(old_res)
else:
res_to_remove.add(old_res)
res_to_add.update(cur_updated)
return mutated_by_chain, chain_renamed_res
def _parseChainRenames(self, chain_renamed_res, res_to_remove, res_to_add):
"""
Given residues that have had their chain name changed, find chains where
all structured residues have been moved to a single new chain (i.e.
the whole chain has been renamed).
:param chain_renamed_res: Residues where only the chain name has
changed and there's no existing sequence for the new chain. Given
as nested dictionaries of
chain_renamed_res[entry_id][old_chain_name][new_chain_name] =
list of (old_WHResInfo, new_WHResInfo) tuples
:type chain_renamed_res: dict(int, dict(str, dict(
str, list[tuple(WHResInfo, WHResInfo)]))))
:param res_to_remove: A set of residues that should be handled as
deleted residues. Will be updated based on the contents of
`updated`.
:type res_to_remove: set(WHResInfo)
:param res_to_add: A set of residues that should be handled as
new residues. Will be updated based on the contents of `updated`.
:type res_to_add: set(WHResInfo)
:return: Chains that have been renamed. Given as a list of
(entry id, old chain name, new chain name) tuples.
:rtype: list(tuple(int, str, str))
"""
chain_renames = []
for eid, renamed_chains in chain_renamed_res.items():
for old_chain_name, new_chains in renamed_chains.items():
if len(new_chains) == 1:
new_chain_name, new_chain = next(iter(new_chains.items()))
seq = \
self._entry_chain_map[eid][old_chain_name].workspace_seq
if len(new_chain) == seq.structuredResidueCount():
# All structured residues in the old chain were moved to
# the same new chain, so this is a chain rename
chain_renames.append(
(eid, old_chain_name, new_chain_name))
continue
# This isn't a chain rename, so we instead want to manually
# remove residues from the old chain and create a new chain with
# new residue objects
for new_chain_name, updated_res in new_chains.items():
for old_res, new_res in updated_res:
res_to_remove.add(old_res)
res_to_add.add(new_res)
return chain_renames
def _parseAddedResidues(self, added_res, modified_eids):
    """
    Split the added residues into those that belong to chains that are
    already tracked and those that belong to new chains.

    :param added_res: Residues that were added to the workspace structure.
    :type added_res: set(WHResInfo)
    :param modified_eids: A set of entry ids for modified residues. Will be
        updated with the entry ids of any added residues.
    :type modified_eids: set(int)
    :return: A tuple of:
        - New residues for existing sequences, reported as
          {(entry id, chain): a set of NewResInfo objects}
        - New residues for new chains, reported as {(entry id, chain):
          a set of NewResInfo objects}
    :rtype: tuple(defaultdict(tuple(int, str), set(NewResInfo)),
        defaultdict(tuple(int, str), set(NewResInfo)))
    """
    for_existing_chains = defaultdict(set)
    for_new_chains = defaultdict(set)
    for info in added_res:
        entry_key = info.entryKey()
        resnum_and_inscode = info.chainKey()
        modified_eids.add(info.eid)
        chain_is_known = info.chain in self._entry_chain_map[info.eid]
        if chain_is_known:
            for_existing_chains[entry_key].add(
                NewResInfo(*resnum_and_inscode, info.resname))
        else:
            # Residues of a new chain also record whether they're nucleic
            # acids.
            for_new_chains[entry_key].add(
                NewResInfo(*resnum_and_inscode, info.resname, info.is_na))
    return for_existing_chains, for_new_chains
def _parseRemovedResidues(self, removed_res, modified_eids):
"""
Convert a list of removed residues to a dictionary organized by entry
and chain information.
:param removed_res: Residues that were added to the workspace structure.
:type removed_res: set(WHResInfo)
:param modified_eids: A set of entry ids for modified residues. Will be
updated with the entry ids of any added residues.
:type modified_eids: set(int)
:return: Deleted residues, reported as {(entry id, chain): a dictionary
of {(residue number, insertion code): residue name}}
:rtype: defaultdict(tuple(int, str), dict(tuple(int, str), str))
"""
deleted_by_chain = defaultdict(dict)
for res_info in removed_res:
entry_key = res_info.entryKey()
chain_key = res_info.chainKey()
deleted_by_chain[entry_key][chain_key] = res_info.resname
modified_eids.add(res_info.eid)
return deleted_by_chain
def _whresSet(self, residues, eids_to_ignore):
    """
    Turn workspace hub residues into a set of `WHResInfo` objects that
    describe them, leaving out ignored entries and residues that don't
    belong in a sequence.

    Note that WHResidue objects are hashable, but equality is defined using
    identity, so subtracting two sets of WHResidues directly doesn't give
    the expected results. The sets returned by this method avoid that
    problem.

    :param residues: The list of residues to convert
    :type residues: list[maestro_ui.WHResidue]
    :param eids_to_ignore: A set of entry ids that we should exclude from
        the return values. Used for entries that have been included or
        excluded from the workspace or for scratch entries.
    :type eids_to_ignore: set(int)
    :return: A set of (entry id, chain, residue number, insertion code,
        residue name, and whether the res is protein)
    :rtype: set(WHResInfo)
    """
    return {
        WHResInfo.fromWHRes(whres)
        for whres in residues
        if whres.getEntryID() not in eids_to_ignore and
        self._isSeqRes(whres)
    }
def _createNewChains(self, added_chains):
    """
    Register every chain that was added to the workspace structure: create
    its chain data and its sequence, and add the sequence to the workspace
    alignment.

    :param added_chains: A dictionary of all residues to add, given as
        {(entry id, chain): a set of NewResInfo objects}
    :type added_chains: dict
    """
    for (eid, chain), new_residues in added_chains.items():
        chain_keys = {res.chainKey() for res in new_residues}
        visible_res = self._getWorkspaceVisRes()[eid][chain]
        chain_data = self._createChainData(eid, chain, chain_keys,
                                           visible_res, True)
        # The chain data has to be in the map before the sequence is
        # attached to it below.
        self._entry_chain_map[eid][chain] = chain_data
        new_seq = self._createSeqForNewChain(new_residues, eid, chain)
        project_table = maestro.project_table_get()
        self._addSeqsToChainData([new_seq], eid, project_table)
        chain_data.workspace_seq = new_seq
        self._workspace_aln.addSeq(new_seq)
def _createSeqForNewChain(self, res_info, eid, chain):
    """
    Build the sequence object for a chain that was just added to the
    workspace structure.

    :param res_info: The new residues to add to the sequence
    :type res_info: set(NewResInfo)
    :param eid: The entry id of the new chain
    :type eid: int
    :param chain: The chain name of the new chain
    :type chain: str
    :return: The newly created sequence
    :rtype: sequence.Sequence
    :note: This method orders new sequences based on residue number and
        insertion code, not connectivity. That is identical to the behavior
        of `seqio.StructureConverter._extractChains`. If that method ever
        changes, this method should be updated as well to preserve the
        consistency.
    """
    # The residue names decide whether this is a nucleic acid or a protein
    # chain.
    seq_cls = sequence.guess_seq_type({info.resname for info in res_info})
    elements = []
    for info in sorted(res_info):
        element = seq_cls.makeSeqElement(info.resname)
        element.resnum = info.resnum
        element.inscode = info.inscode
        elements.append(element)
    # Name, long_name, PDB ID, etc are copied from an arbitrary sequence of
    # the same entry.
    template_chain = next(iter(self._entry_chain_map[eid].values()))
    template_seq = template_chain.workspace_seq
    seq_kwargs = dict(name=template_seq.name,
                      chain=chain,
                      structure_chain=chain,
                      long_name=template_seq.long_name,
                      entry_id=eid,
                      entry_name=template_seq.entry_name,
                      pdb_id=template_seq.pdb_id,
                      origin=seq_cls.ORIGIN.Maestro)
    return seq_cls(elements, **seq_kwargs)
def _deleteRemovedResidues(self, deleted_by_chain):
"""
Delete all sequence residues that were removed from the workspace
structure.
Residues are matched by their chain key (residue number and insertion
code). A `seqio.SequenceWarning` is issued when the name of a matched
sequence residue differs from the name reported for the deleted
residue. Gaps and residues without a residue number are never matched.
:param deleted_by_chain: A dictionary of all residues to remove, given
as {(entry id, chain): a dictionary of {(residue number, insertion
code): residue name}}
:type deleted_by_chain: dict
"""
for (eid, chain), deleted_res in deleted_by_chain.items():
chain_data = self._entry_chain_map[eid][chain]
for seq in chain_data.seqs:
# Collect the matching residues first; removal happens afterwards.
seq_res_to_remove = []
for seq_res in seq:
if seq_res.is_gap or not seq_res.hasSetResNum():
continue
res_key = seq_res.getChainKey()
deleted_code = deleted_res.get(res_key)
if deleted_code is not None:
if seq_res.long_code != deleted_code:
res_info = "".join(map(str, res_key))
warnings.warn(
seqio.SequenceWarning(
f'Sequence residue {res_info} is '
f'{seq_res.long_code}, expected {deleted_code}'
))
seq_res_to_remove.append(seq_res)
# If the sequence belongs to an alignment, remove the residues
# through the alignment api.
seq_page_info = self._gui_model.getPageInfoForSequence(seq)
if seq_page_info is None:
seq.removeElements(seq_res_to_remove)
else:
aln = seq_page_info.aln
# When the page is not in split-chain view, the split-chain residues
# are translated to their combined-chain counterparts.
if not seq_page_info.split_chain_view:
seq_res_to_remove = list(
map(aln.combinedResForSplitRes, seq_res_to_remove))
with aln.modifyingStructure():
with self._notifyMaestroIfAnchoredRemoved(
aln, 'removed'):
aln.removeAnchors(seq_res_to_remove)
aln.removeElements(seq_res_to_remove)
@contextlib.contextmanager
def _notifyMaestroIfAnchoredRemoved(self, aln, why):
    """
    Context manager that shows a banner through the Maestro hub whenever
    the alignment emits `anchoredResiduesChanged` inside the managed block.

    The slot is disconnected when the block exits, including when the
    block raises.

    :param aln: The alignment whose anchor changes should be reported
    :param why: Word describing the residues that caused the change (e.g.
        'removed' or 'mutated'); inserted into the banner text
    :type why: str
    """
    # Slots with references to QObjects may cause problems with garbage
    # collection. To avoid this, the slot only holds a weakref proxy to
    # the maestro hub.
    maestro_hub = weakref.proxy(maestro_ui.MaestroHub.instance())

    def slot():
        maestro_hub.emitAddBanner(
            f'Anchors were removed in MSV to adjust for {why} residues.', '',
            '', '')

    aln.signals.anchoredResiduesChanged.connect(slot)
    try:
        yield
    finally:
        # Without the finally, an exception in the managed block would
        # leave the slot connected, and later unrelated anchor changes
        # would keep producing banners.
        aln.signals.anchoredResiduesChanged.disconnect(slot)
def _deleteEmptyChains(self, deleted_by_chain):
"""
If any chains are now empty, remove them from all alignments.
A chain counts as empty when its workspace sequence reports no
structured residues. Empty chains are also dropped from
`self._entry_chain_map`.
:param deleted_by_chain: A dictionary of all residues that have been
removed, given as {(entry id, chain): a dictionary of {(residue
number, insertion code): residue name}}. Note that this method only
pays attention to the keys of this dictionary, not the values.
:type deleted_by_chain: dict
"""
for (eid, chain) in deleted_by_chain.keys():
chain_data = self._entry_chain_map[eid][chain]
# Chains that still have structured residues are left alone.
if chain_data.workspace_seq.hasStructuredResidues():
continue
for seq in chain_data.seqs:
seq_page_info = self._gui_model.getPageInfoForSequence(seq)
if seq_page_info is not None:
if seq_page_info.split_chain_view:
seq_page_info.aln.removeSeq(seq)
elif not seq_page_info.seq.hasStructuredResidues():
# The entire combined-chain sequence is empty, so we
# remove it. This will also remove all the chains from
# the split-chain alignment.
seq_page_info.aln.removeSeq(seq_page_info.seq)
else:
# The chain is empty, but there are still residues in
# other chains of the combined-chain sequence.
if len(seq):
# First remove any remaining gaps. If there are
# downstream anchors, this will make sure that new
# gaps get added to make up for the ones we're
# removing.
aln = seq_page_info.aln
gaps_to_remove = list(
map(aln.combinedResForSplitRes, seq))
seq_page_info.aln.removeElements(gaps_to_remove)
# remove the chain from the split-chain alignment.
seq_page_info.split_aln.removeSeq(seq)
# Remove the chain from the combined-chain sequence.
# This is done in a non-undoable manner, but we can't
# undo this operation anyway because it involves
# Maestro. We don't have to worry about anchoring here
# since we know that the sequence is empty.
seq_page_info.seq.removeChain(seq)
# Stop tracking the chain.
chain_data.chainRemoved()
del self._entry_chain_map[eid][chain]
def _mutateResidues(self, mutated_by_chain):
"""
Mutate all sequence residues that were mutated in workspace structure
:param mutated_by_chain: A dictionary of residues to mutate, given as
{(entry id, chain): a dictionary of {(residue number, insertion
code): new residue name}}
:type mutated_by_chain: dict
"""
for (eid, chain), mutated_res in mutated_by_chain.items():
chain_data = self._entry_chain_map[eid][chain]
for seq in chain_data.seqs:
old_seq_res = set(
res for res in seq
if (not res.is_gap and res.getChainKey() in mutated_res))
seq_page_info = self._gui_model.getPageInfoForSequence(seq)
if seq_page_info is not None:
aln = seq_page_info.aln
if seq_page_info.split_chain_view:
to_unanchor = old_seq_res
else:
to_unanchor = set(
map(aln.combinedResForSplitRes, old_seq_res))
with self._notifyMaestroIfAnchoredRemoved(aln, 'mutated'):
aln.removeAnchors(to_unanchor)
for res in old_seq_res:
new_resname = mutated_res[res.getChainKey()]
self._mutateRes(seq, res.idx_in_seq, res, new_resname)
def _mutateRes(self, seq, index, cur_res, resname):
    """
    Mutate the specified sequence residue.

    A replacement element is created for `resname` and given the residue
    number and insertion code of `cur_res`. If the sequence is not on any
    alignment page the sequence is mutated directly; otherwise the mutation
    goes through the page's alignment.

    :param seq: The sequence containing the residue to mutate
    :type seq: sequence.ProteinSequence
    :param index: The index of the residue to mutate (relative to `seq`)
    :type index: int
    :param cur_res: The sequence residue object for the residue to mutate
    :type cur_res: residue.Residue
    :param resname: The residue name to mutate to
    :type resname: str
    """
    replacement = seq.makeSeqElement(resname)
    replacement.resnum = cur_res.resnum
    replacement.inscode = cur_res.inscode
    page_info = self._gui_model.getPageInfoForSequence(seq)
    if page_info is None:
        seq.mutate(index, index + 1, replacement)
        return
    aln = page_info.aln
    seq_idx = aln.index(page_info.seq)
    if not page_info.split_chain_view:
        # In combined-chain mode the alignment sequence spans several
        # chains, so shift the index by this chain's offset.
        index += page_info.chain_offset
    with aln.modifyingStructure():
        aln.mutateResidues(seq_idx, index, index + 1, [replacement])
def _insertNewResidues(self, new_by_chain):
    """
    Insert sequence residues (or convert structureless sequence residues to
    structured) for all new residues in the workspace structure.

    For every sequence linked to a chain, the sorted list of new residues
    is consumed from its end while the sequence is walked from its last
    element to its first. A new residue whose chain key matches an existing
    residue either marks that residue as structured or mutates it; new
    residues that sort after the current sequence residue are inserted
    immediately after it. Anything left once the whole sequence has been
    walked is inserted at index 0.

    :param new_by_chain: A dictionary of residues to insert, given as
        {(entry id, chain): a set of NewResInfo objects}
    :type new_by_chain: dict

    :note: This method assumes that sequences are ordered based on residue
        number and insertion code, not connectivity. That is currently the
        case for all sequences with structures due to the implementation of
        `seqio.StructureConverter._extractChains`. If that method ever
        changes, this method must be updated as well.
    """
    for (eid, chain), new_residues_orig in new_by_chain.items():
        new_residues_orig = sorted(new_residues_orig)
        chain_data = self._entry_chain_map[eid][chain]
        for seq in chain_data.seqs:
            # Each sequence consumes its own copy of the sorted list
            new_residues = new_residues_orig.copy()
            # iterate backwards through the sequence and search for the
            # first residue that matches or is before the last item on our
            # new_residues list. The (index, element) pairs are snapshotted
            # up front; insertions below only happen after the current
            # index, so the indexes still to be visited stay valid.
            for seq_i, seq_res in reversed(list(enumerate(seq))):
                if seq_res.is_gap:
                    continue
                if new_residues:
                    new_res_info = new_residues[-1].chainKey()
                else:
                    # we've inserted all of the new residues into this
                    # sequence
                    break
                seq_res_info = seq_res.getChainKey()
                if seq_res_info == new_res_info:
                    if seq_res.seqres_only:
                        # we're converting a structureless residue to
                        # structured
                        resname = new_residues[-1].resname
                        if resname == seq_res.long_code:
                            seq_res.seqres_only = False
                        else:
                            # the structured residue is of a different type,
                            # so handle it as a mutation
                            self._mutateRes(seq, seq_i, seq_res, resname)
                    # If we're not adding a structure for a structureless
                    # residue, then this residue is probably being reported
                    # because of MAE-41133 and we can ignore it.
                    new_residues.pop()
                elif seq_res_info < new_res_info:
                    # list.insert(0, elem) is O(N), so we use a deque here
                    # instead
                    res_info_to_add = deque()
                    while (new_residues and
                           seq_res_info < new_residues[-1].chainKey()):
                        # figure out if we need to insert more than one
                        # residue here.
                        res_info_to_add.appendleft(new_residues.pop())
                    self._addNewResToSeq(res_info_to_add, seq, seq_i + 1)
            else:
                # Only reached when the loop above was not left via `break`,
                # i.e. the whole sequence was walked.
                if new_residues:
                    # residues were added to the beginning of the structure
                    self._addNewResToSeq(new_residues, seq, 0)
def _addNewResToSeq(self, res_info_to_add, split_seq, index):
    """
    Create sequence residues for the given residue infos and insert them,
    in order, at the specified position.

    If the sequence is on an alignment page, anchors from the insertion
    point onwards are removed and the residues are added through the
    alignment; otherwise they are inserted directly into the sequence.

    :param res_info_to_add: The residue number, insertion code, and residue
        name for each residue to add.
    :type res_info_to_add: Iterable(NewResInfo)
    :param split_seq: The sequence to insert the residues into.
    :type split_seq: sequence.ProteinSequence
    :param index: The sequence index to insert the new residues at
        (relative to `split_seq`).
    :type index: int
    """

    def make_element(info):
        elem = split_seq.makeSeqElement(info.resname)
        elem.resnum = info.resnum
        elem.inscode = info.inscode
        return elem

    new_elems = [make_element(info) for info in res_info_to_add]
    page_info = self._gui_model.getPageInfoForSequence(split_seq)
    if page_info is None:
        # Not part of any alignment: edit the sequence itself
        split_seq.insertElements(index, new_elems)
        return
    if page_info.split_chain_view:
        target_seq = split_seq
    else:
        # Combined-chain view: address the residue within the combined
        # sequence instead of within the individual chain
        target_seq = page_info.seq
        index += target_seq.offsetForChain(split_seq)
    aln = page_info.aln
    with self._notifyMaestroIfAnchoredRemoved(aln, 'inserted'):
        aln.removeAnchors(target_seq[index:])
    aln.addElements(target_seq, index, new_elems)
def _renameChains(self, chain_renames):
    """
    Rename the specified chains.

    Every sequence linked to a renamed chain gets the new chain name, and
    the chain's data is re-keyed under the new name in the entry/chain map.
    Renames are applied one after another in the order given.

    :param chain_renames: A list of (entry id, old chain name,
        new chain name) tuples for chains to rename
    :type chain_renames: list[tuple(int, str, str)]
    """
    for entry_id, old_name, new_name in chain_renames:
        chains = self._entry_chain_map[entry_id]
        chain_data = chains[old_name]
        for linked_seq in chain_data.seqs:
            linked_seq.chain = new_name
        chain_data.chain = new_name
        # Re-key the chain data under its new name
        del chains[old_name]
        chains[new_name] = chain_data
def _updateSeqres(self, modified_eids):
    """
    Update SEQRES records for all specified entries.

    Entries that are not flagged as having SEQRES records are skipped. For
    the rest, the SEQRES data is rebuilt from the linked sequences when any
    structureless residue remains, and cleared otherwise. The updated
    structure is then written back to the project entry.

    :param modified_eids: The entry ids for the entries to update.
    :type modified_eids: set(int)
    """

    def representative(chain_data):
        # all of the sequences in a ChainData object are identical
        # except for gaps, so we only need to look at one of them
        return next(iter(chain_data.seqs))

    proj = maestro.project_table_get()
    for eid in modified_eids:
        entry_data = self._entry_chain_map[eid]
        if not entry_data.has_seqres:
            continue
        keep_seqres = any(
            res.seqres_only
            for chain_data in entry_data.values()
            for res in representative(chain_data)
            if not res.is_gap)
        if keep_seqres:
            new_seqres = {
                chain: [
                    res.long_code
                    for res in representative(chain_data)
                    if not res.is_gap
                ] for chain, chain_data in sorted(entry_data.items())
            }
        else:
            # There are no structureless residues, so the SEQRES records
            # are completely redundant with the structure itself. We clear
            # the SEQRES data so that we won't need to continue to keep
            # them in sync with the structure.
            new_seqres = None
            entry_data.has_seqres = False
        struc = proj[eid].getStructure()
        seqres.set_seqres(struc, new_seqres)
        with self._updatingSeqres():
            # There's no need to sync the workspace since we've only
            # changed unstructured residues
            proj[eid].setStructure(struc, sync_workspace=False)
def _getIncludedNonScratchEntryIDs(self):
    """
    Return the entry IDs that are included in the Workspace, converted to
    integers and restricted to positive values (i.e. non-scratch entries).

    :return: List of non-scratch entry IDs currently included in Workspace
    :rtype: list(int)
    """
    included = []
    for raw_eid in maestro.get_included_entry_ids():
        eid = int(raw_eid)
        if eid > 0:
            included.append(eid)
    return included
def _getEIDsForAtomIndexesList(self, atom_indexes_list):
    """
    Given a list of lists of Workspace atom indexes, return the set of
    positive entry IDs that those atoms belong to.

    Atoms whose entry id cannot be converted to an integer are ignored.

    :param atom_indexes_list: List of lists of atom indexes
    :type atom_indexes_list: list(list(int))
    :return: Set of entry IDs for the atom indexes.
    :rtype: set(int)
    """
    flat_indexes = itertools.chain(*atom_indexes_list)
    workspace_st = maestro.workspace_get()

    def entry_id_of(atom_idx):
        try:
            return int(workspace_st.atom[atom_idx].entry_id)
        except ValueError:
            return None

    parsed = (entry_id_of(atom_idx) for atom_idx in flat_indexes)
    return {eid for eid in parsed if eid is not None and eid > 0}
# @QtCore.pyqtSlot("QList<QList<int> >")
@util.skip_if("_changing_maestro_visibility")
def _ligandAtomsChanged(self, atom_indexes_list):
    """
    Notify linked sequences that ligand atoms were added, removed, or
    mutated in the workspace.

    :param atom_indexes_list: A list of list of indexes of atoms that were
        changed in workspace structures.
    :type atom_indexes_list: list
    """
    if atom_indexes_list:
        eids = self._getEIDsForAtomIndexesList(atom_indexes_list)
    else:
        # MSV-1554 - WorkspaceHub emits empty list when all ligand atoms
        # are deleted, so fall back to every included entry.
        eids = self._getIncludedNonScratchEntryIDs()
    self._ligandsChangedForEntryIDs(eids)
def _getEIDsForResiduesList(self, residues_list):
    """
    Given a list of lists of residues, return the set of positive entry
    IDs that those residues belong to.

    Residues whose entry id cannot be converted to an integer are ignored.

    :param residues_list: List of residues to get entry IDs of
    :type residues_list: list(list(schrodinger.structure._Residue))
    :return: Set of entry IDs
    :rtype: set(int)
    """

    def entry_id_of(res):
        try:
            return int(res.getEntryID())
        except ValueError:
            return None

    parsed = (entry_id_of(res) for res in itertools.chain(*residues_list))
    return {eid for eid in parsed if eid is not None and eid > 0}
# @QtCore.pyqtSlot("QList<QList<WHResidue> >")
@util.skip_if("_changing_maestro_visibility")
def _ligandResiduesChanged(self, residues_list):
    """
    Notify linked sequences that ligand residues were added, removed, or
    mutated in the workspace.

    :param residues_list: A list of list of residues
        (`maestro_ui.WHResidue`) that were changed in workspace structures.
    :type residues_list: list
    """
    if residues_list:
        eids = self._getEIDsForResiduesList(residues_list)
    else:
        # MSV-1544 - WorkspaceHub passes an empty list when all ligand
        # residues have been deleted, so fall back to every included entry.
        eids = self._getIncludedNonScratchEntryIDs()
    self._ligandsChangedForEntryIDs(eids)
def _ligandsChangedForEntryIDs(self, eids):
    """
    Call `onStructureChanged` on every sequence linked to any chain of the
    specified entries.

    :param eids: Entry IDs that have changed
    :type eids: iterable(int)
    """
    for entry_id in eids:
        chains = self._entry_chain_map[entry_id]
        for chain_data in chains.values():
            for linked_seq in chain_data.seqs:
                linked_seq.onStructureChanged()
# @QtCore.pyqtSlot("QSet<WHResidue>", "QSet<WHResidue>")
@util.skip_if("_closing_project")
@util.skip_if("_changing_maestro_visibility")
def _residueDisplayChanged(self, added, removed):
    """
    Update sequence visibility in response to any workspace residues being
    shown or hidden.

    :param added: A set of residues (`maestro_ui.WHResidue`) that
        were shown in the workspace.
    :type added: set
    :param removed: A set of residues (`maestro_ui.WHResidue`) that
        were hidden in the workspace.
    :type removed: set
    """
    # by_chain[entry id][chain name] = (set_of_added_residues,
    #                                   set_of_removed_residues)
    by_chain = defaultdict(lambda: defaultdict(lambda: (set(), set())))
    self._resByChain(by_chain, added, 0)
    self._resByChain(by_chain, removed, 1)
    for entry_id, chains in by_chain.items():
        for chain_name, (shown, hidden) in chains.items():
            try:
                chain_data = self._entry_chain_map[entry_id][chain_name]
            except KeyError:
                # It's possible that this entire chain was categorized as a
                # ligand (or some other type of residue that we don't care
                # about) by seqio.StructureConverter but not by
                # self._isSeqRes since StructureConverter is more thorough
                # about excluding things. If that's the case, there's
                # nothing to update and we can safely ignore information
                # about this chain.
                continue
            chain_data.updateVisRes(shown, hidden)
def _resByChain(self, by_chain, residues, i):
    """
    Organize residues by entry and chain, adding each residue's chain key
    to `by_chain` in place. Residues rejected by `_isSeqRes` and residues
    from entries with a non-positive entry id are skipped.

    :param by_chain: A dictionary of [entry id][chain name] = tuple of sets
    :type by_chain: defaultdict
    :param residues: The residues (`maestro_ui.WHResidue`) to organize.
    :type residues: Iterable
    :param i: The index of the set that residues should be added to.
    :type i: int
    """
    for whres in residues:
        if not self._isSeqRes(whres):
            continue
        entry_id = whres.getEntryID()
        if not entry_id > 0:
            # ignore scratch entries
            continue
        bucket = by_chain[entry_id][whres.getChain()][i]
        bucket.add(self._getKeyFromWHResidue(whres).chainKey())
# @QtCore.pyqtSlot("QSet<WHResidue>", "QSet<WHResidue>")
@util.skip_if("_syncing_selection")
@util.skip_if("_closing_project")
@util.skip_if("_changing_maestro_visibility")
def _residueSelectionChanged(self, selected, deselected):
    """
    Update MSV residue selection in response to any workspace residues being
    selected or deselected.

    :param selected: A set of residues (`maestro_ui.WHResidue`) that
        were selected in the workspace.
    :type selected: set
    :param deselected: A set of residues (`maestro_ui.WHResidue`) that
        were deselected in the workspace.
    :type deselected: set
    """
    to_key = self._getKeyFromWHResidue
    keys_to_select = list(map(to_key, selected))
    keys_to_deselect = list(map(to_key, deselected))
    with self._changingMaestroVisibility(), self._syncingSelection():
        # Catch up on entries whose selection has not been synced yet
        # before applying the incremental change.
        self._delayedSyncFromWorkspaceToMsv()
        self._setMSVResSelection(keys_to_select, True)
        self._setMSVResSelection(keys_to_deselect, False)
def _delayedSyncFromWorkspaceToMsv(self):
    """
    When new entries are included in the workspace, their residue selection
    is not automatically synchronized until selection is changed in either
    the workspace or in the MSV. This method synchronizes residue selection
    for those entries by replacing residue selection in the MSV with
    residues selected in the workspace. The set of entries awaiting a
    delayed sync is emptied afterwards.

    This method should only be called from inside a `_syncingSelection`
    block.
    """
    pending_eids = self._delayed_sync_eids
    if not pending_eids:
        return
    all_keys = [
        self._getKeyFromWHResidue(whres)
        for whres in self._workspace_hub.getSelAtomsToResSet()
    ]
    # Only the entries awaiting a delayed sync are of interest
    pending_keys = [key for key in all_keys if key.entry_id in pending_eids]
    self._setMsvResSelectionOnly(pending_keys, pending_eids)
    pending_eids.clear()
def _delayedSyncFromMsvToMsv(self, sel_model):
    """
    Residue selection for linked sequences are only kept in sync while the
    associated structure is included in the workspace. When the associated
    structures are reincluded in the workspace, residue selection is not
    resynchronized until selection is changed in either the workspace or in
    the MSV. This method synchronizes residue selection for those entries
    by replacing residue selection in the MSV with residues selected in the
    given alignment selection model. If there are multiple sequences linked
    to the same structure chain, then the union of all residues selected in
    those sequences will be used. In this scenario, note that selection in
    these sequences will be updated to reflect this, which means that
    selection in the active tab *can* change as a result of this method.

    Note that this method does not update residue selection in the
    workspace.

    This method should only be called from inside a `_syncingSelection`
    block.

    :param sel_model: The alignment residue selection model to read the
        current selection from.
    """
    pending_eids = self._delayed_sync_eids
    if not pending_eids:
        return
    keys = set()
    for res in sel_model.getSelection():
        if not res.hasStructure():
            continue
        entry_id = res.sequence.entry_id
        if entry_id is None or int(entry_id) not in pending_eids:
            continue
        keys.add(res.getKey())
    self._setMsvResSelectionOnly(keys, pending_eids)
    pending_eids.clear()
def mapResidues(self, residues):
    """
    Expand the given residues with every MSV residue linked to the same
    structure residue.

    Only residues that are real residues, belong to a sequence with a
    structure, and whose chain is currently flagged as included are
    expanded; all given residues are kept in the result regardless.

    See parent class for the full method documentation.

    :param residues: The residues to map. One-shot iterables are
        supported; the input is materialized once.
    :type residues: Iterable(residue.Residue)
    :return: An empty list if `residues` is empty; otherwise a set
        containing the given residues plus all residues linked to them.
    :rtype: list or set(residue.Residue)
    """
    if not residues:
        return []
    # Materialize up front so that the input is only iterated once
    mapped = set(residues)
    linked_keys = []
    for res in mapped:
        seq = res.sequence
        if not res.is_res or not seq.hasStructure():
            continue
        eid = int(seq.entry_id)
        chain = res.structure_chain
        if not self._entry_chain_map[eid][chain].included:
            continue
        linked_keys.append(residue.get_residue_key(res, eid, chain))
    for key in linked_keys:
        mapped.update(self._mapKeyToMSVResidues(key))
    return mapped
def _setMSVResSelection(self, selected_keys, select):
    """
    Set the selection of structured residues across all tabs. Takes a
    collection of residue keys and whether to select or deselect them.

    :param selected_keys: collection of residue keys (in the same format
        returned by `._getKeyFromWHResidue()`)
    :type selected_keys: iterable(residue.ResidueKey)
    :param select: whether to select or deselect the associated residues
    :type select: bool
    """
    linked_residues = []
    for key in selected_keys:
        linked_residues.extend(self._mapKeyToMSVResidues(key))
    self._setMSVResSelectionByRes(linked_residues, select)
def _setMSVResSelectionByRes(self,
                             selected_res,
                             select,
                             *,
                             standing_selection_override=False):
    """
    Set the selection of structured residues across all tabs. Takes a
    collection of residues and whether to select or deselect them.

    :param selected_res: collection of residues
    :type selected_res: iterable(residue.Residue)
    :param select: whether to select or deselect the associated residues
    :type select: bool
    :param standing_selection_override: If True and a tab is in the middle
        of a click-and-drag selection (or in the middle of a click that may
        turn into a click-and-drag selection), finish the current selection
        in that tab so we can update selection with these changes. (The
        click-and-drag can continue after this, but it will be considered a
        separate click-and-drag and will be part of a separate undo
        command.) If False, tabs in the middle of a click-and-drag
        selection will be skipped.
    :type standing_selection_override: bool
    """
    # Group the residues by the sequence they belong to so that each page
    # only receives residues from sequences it actually contains
    res_by_seq = defaultdict(set)
    for res in selected_res:
        res_by_seq[res.sequence].add(res)

    for page in self._gui_model.pages:
        split_aln = page.split_aln
        # Lazily evaluated: consumed by setSelectionState below
        page_residues = (res for seq, group in res_by_seq.items()
                         if seq in split_aln for res in group)
        aln = page.aln
        if standing_selection_override:
            # finishCurrentSelection is a no-op unless the selection model
            # is in the middle of a click-and-drag selection
            aln.res_selection_model.finishCurrentSelection()
        try:
            aln.res_selection_model.setSelectionState(page_residues,
                                                      select,
                                                      _undoable=False)
        except gui_alignment.StandingSelectionError:
            # This catches the case where we attempt to sync selection
            # to an alignment whose selection is currently being modified.
            # We don't need to sync the selection for this alignment since
            # it's the selection we're syncing from.
            continue
        # Force the selectionChanged signal to emit immediately so we
        # can block them from causing redundant syncs.
        aln.res_selection_model.forceSelectionUpdate()
def _mapKeyToMSVResidues(self, key):
    """
    Map a key to all the residues in MSV that are linked to it. Expects a
    key formatted by `._getKeyFromWHResidue()` or
    `._getKeyFromStructureResidue`.

    :param key: a unique key representing the structure residue
    :type key: residue.ResidueKey
    :return: The linked residues; empty for scratch entries and for chains
        without sequence data.
    :rtype: set(residue.Residue)
    """
    if key.entry_id < 0:
        # This residue is from a scratch entry, so we can ignore it
        return set()
    chains = self._entry_chain_map[key.entry_id]
    if key.chain not in chains:
        # This residue has no sequence data (e.g. ligand)
        return set()
    return chains[key.chain].mapRescodeToResidues(key.chainKey())
def _setMsvResSelectionOnly(self, to_select, eids):
    """
    For any sequences that are linked to a structure with the given entry
    ids, replace the residue selection: every structured residue of those
    entries that is not in `to_select` is deselected, then the residues in
    `to_select` are selected.

    :param to_select: Residue keys for residues to select. Should only
        contain residue keys with entry ids in `eids`.
    :type to_select: set(residue.ResidueKey)
    :param eids: The entry ids to replace the selection of.
    :type eids: Iterable(int)
    """
    # wanted[entry id][chain name] = chain keys that should stay selected
    wanted = defaultdict(lambda: defaultdict(set))
    for key in to_select:
        wanted[key.entry_id][key.chain].add(key.chainKey())

    residues_to_clear = []
    for entry_id in eids:
        for chain_name, chain_data in self._entry_chain_map[entry_id].items():
            keep = wanted[entry_id][chain_name]
            residues_to_clear += chain_data.getAllStructuredResiduesExcept(
                keep)
    self._setMSVResSelectionByRes(residues_to_clear,
                                  False,
                                  standing_selection_override=True)
    self._setMSVResSelection(to_select, True)
@QtCore.pyqtSlot(set, set)
@util.skip_if("_closing_project")
@util.skip_if("_syncing_selection")
@util.skip_if("_syncing_inclusion")
def _alignmentSelectionChanged(self, selected, deselected):
    """
    Update selection in the workspace when selection in the MSV changes.
    This should only be called by a `resSelectionChanged` signal on an
    `AlignmentSignals` object.

    :param selected: The residues who have been newly selected
    :type selected: iterable(schrodinger.protein.residue.Residue)
    :param deselected: The residues who have been newly deselected
    :type deselected: iterable(schrodinger.protein.residue.Residue)
    """
    # Only modify maestro selection if any of the changed residues
    # were actually in the workspace
    st_selected = self._convertMsvResiduesStResidues(selected)
    st_deselected = self._convertMsvResiduesStResidues(deselected)
    if not st_selected and not st_deselected:
        return

    # The signal sender exposes the alignment whose selection changed
    sel_model = self.sender().aln.res_selection_model
    to_key = self._getKeyFromStructureResidue
    keys_to_deselect = [to_key(res) for res in st_deselected]
    keys_to_select = [to_key(res) for res in st_selected]
    with self._syncingSelection():
        self._delayedSyncFromMsvToMsv(sel_model)
        self._setMSVResSelection(keys_to_deselect, False)
        # We overwrite the entire workspace selection every time any of the
        # selection changes to make sure that we remove selection from any
        # structures that don't have linked sequences.
        for elem in sel_model.getSelection():
            if elem.is_res and elem.hasStructure():
                keys_to_select.append(elem.getKey())
        self._setMSVResSelection(keys_to_select, True)
        self._syncSelectionToMaestro(sel_model.getSelection())
def _syncSelectionToMaestro(self, selection):
    """
    Given a selection, push the selection of any structured residue
    to Maestro. Note that this replaces any existing workspace selection;
    if no structured residue is selected, the workspace selection is
    cleared.

    :param selection: The residues that are currently selected.
    :type selection: iterable[residue.Residue]
    """
    structured = {res for res in selection if res.hasStructure()}
    if not structured:
        maestro.command('workspaceselectionclear')
        return
    asl = self.generateMultiEntryResidueASL(structured)
    maestro.command('workspaceselectionreplace ' + asl)
def delayedSyncFromMsvToWorkspace(self, aln):
    """
    Run the delayed MSV-to-MSV selection sync for the given alignment and
    then push the alignment's current residue selection to Maestro.

    See parent class for the full method documentation.

    :param aln: The alignment whose residue selection should be synced.
    """
    selection_model = aln.res_selection_model
    with self._syncingSelection():
        self._delayedSyncFromMsvToMsv(selection_model)
        current_selection = selection_model.getSelection()
        self._syncSelectionToMaestro(current_selection)
def _convertMsvResiduesStResidues(self, msv_residues):
    """
    Convert sequence residues to structure residues. Sequence residues
    without a structure, residues whose chain is not flagged as included,
    and residues for which no structure residue is found are ignored.

    :param msv_residues: Sequences residues to convert
    :type msv_residues: Iterable(residue.AbstractSequenceElement or
        residue.CombinedChainResidueWrapper)
    :return: Structure residues
    :rtype: list[schrodinger.structure._structure._Residue]
    """

    def is_convertible(res):
        if not res.hasStructure():
            return False
        key = res.getKey()
        return bool(self._entry_chain_map[key.entry_id][key.chain].included)

    candidates = (res.sequence.getStructureResForRes(res)
                  for res in msv_residues
                  if is_convertible(res))
    return [st_res for st_res in candidates if st_res is not None]
def onResidueMiddleClicked(self, res):
    """
    Fits Maestro workspace to the residue clicked with the
    middle button. Nothing happens unless the residue has a structure and
    its sequence is not excluded from the workspace.

    :param res: clicked residue
    :type res: protein.residue.Residue
    """
    if res is None or not res.hasStructure():
        return
    if res.sequence.visibility == Inclusion.Excluded:
        return
    residue_asl = f'chain. {res.chain} AND res.num {res.resnum}'
    maestro.command(f'fit ({residue_asl}) AND e.id {res.sequence.entry_id}')
def disassociateChains(self, entry_id, is_workspace=False, keep_chains=None):
    """
    Disassociates chains for an entry.

    The original workspace inclusion is restored after the Maestro command
    and the newly created entries are then included in the workspace.

    :param entry_id: The entry ID to split
    :type entry_id: int
    :param is_workspace: If True, the returned sequences are taken from
        the workspace alignment rather than from `getSeqsForEids`.
    :type is_workspace: bool
    :param keep_chains: If given, only sequences whose chain name is in
        this collection are kept (and only their entries are included).
    :type keep_chains: collection(str) or None
    :return: Disassociated sequences, now with unique entry ids
    :rtype: list(sequence.Sequence)
    """
    proj = maestro.project_table_get()
    previously_included = {row.entry_id for row in proj.included_rows}
    previously_known = {row.entry_id for row in proj.all_rows}
    created_eids = set()
    with self._changingMaestroVisibility():
        maestro.command("entrydisassociatebychain entry %i" % int(entry_id))
        for row in proj.all_rows:
            row_eid = row.entry_id
            if row_eid in previously_included:
                # Re-include original entries
                row.in_workspace = project.IN_WORKSPACE
            elif row_eid not in previously_known:
                # Temporarily exclude new entries
                row.in_workspace = project.NOT_IN_WORKSPACE
                created_eids.add(row_eid)

    # Get the new sequences
    split_seqs = self.getSeqsForEids(sorted(created_eids))
    if keep_chains is not None:
        split_seqs = [seq for seq in split_seqs if seq.chain in keep_chains]
        created_eids = {seq.entry_id for seq in split_seqs}
    for new_eid in created_eids:
        proj[new_eid].in_workspace = project.IN_WORKSPACE
    if not is_workspace:
        return split_seqs
    # Now that they have been included, get the corresponding sequences
    # from the workspace alignment
    return [
        seq for seq in self._workspace_aln if seq.entry_id in created_eids
    ]
def superimposeByAlignment(self, entry_residue_map):
    """
    Creates and runs a Maestro command to superimpose structures by aligned
    residues in the Multiple Sequence Viewer.

    Entries that are included in the workspace but absent from
    `entry_residue_map` are temporarily excluded and are always re-included
    before this method returns, even if a Maestro command raises. Entries
    in `entry_residue_map` that were not included are included and stay
    included. A failure of the superimpose command itself is reported in a
    message box instead of being raised.

    :param entry_residue_map: A mapping whose keys are the entry ids to
        superimpose; it is passed as-is to `generateEntryResidueASL` to
        build the ASL of residues to superimpose on.
    :type entry_residue_map: dict
    """
    # Cache list of included entries
    proj = maestro.project_table_get()
    included_eids = {row.entry_id for row in proj.included_rows}
    wanted_eids = set(entry_residue_map.keys())
    # Currently included entries that aren't being superimposed
    temp_exclude = included_eids - wanted_eids
    try:
        for eid in temp_exclude:
            proj[eid].in_workspace = project.NOT_IN_WORKSPACE
        for eid in wanted_eids - included_eids:
            proj[eid].in_workspace = project.IN_WORKSPACE
        asl = self.generateEntryResidueASL(entry_residue_map)
        if asl == "":
            maestro.command("workspaceselectionclear")
        else:
            asl = f"atom.ptype CA AND ({asl})"
            maestro.command("workspaceselectionreplace " + asl)
        try:
            maestro.command("superimposeset " + asl)
        except Exception as e:
            QtWidgets.QMessageBox.critical(
                None, "Structure Superposition from Sequence Alignment Failed",
                str(e))
    finally:
        # Re-include, regardless of whether anything above failed
        for eid in temp_exclude:
            proj[eid].in_workspace = project.IN_WORKSPACE
@util.skip_if("_updating_color")
def onWorkspaceChanged(self, changed):
    """
    Callback that is called whenever the maestro workspace changes. Emits
    `workspaceColorsChanged` when the change may have affected colors.
    See maestro_callback for details.

    :param changed: What kind of change occurred in the workspace
    :type changed: a WORKSPACE_CHANGED_* constant in maestro.py
    """
    color_relevant_changes = (
        maestro.WORKSPACE_CHANGED_COLOR,
        maestro.WORKSPACE_CHANGED_EVERYTHING,
    )
    if changed not in color_relevant_changes:
        return
    self.workspaceColorsChanged.emit()
def _get_ResidueKey(self, res):
    """
    Build the key for a structure residue, using the entry id of the
    residue's first atom.

    :param res: The residue to turn into a key
    :type res: schrodinger.structure._Residue
    :return: A unique key to identify the residue in the workspace
    :rtype: residue.ResidueKey
    """
    first_atom = res.atom[1]
    return residue.get_structure_residue_key(res, first_atom.entry_id)
def getWorkspaceColors(self):
    """
    Get the colors of each sequence residue in the workspace. For amino
    acid residues, the color of the alpha carbon is returned; for nucleotide
    residues, the color of the C1' on the sugar is returned.

    :return: The colors of each residue in the workspace. Each residue is
        represented by a tuple of (entry_id, chain, resnum, inscode) and
        each color is represented by a tuple of (r,g,b) values.
    :rtype: dict(residue.ResidueKey, tuple(int, int, int))
    """
    colors = {}
    for res in maestro.workspace_get().residue:
        representative_atom = res.getAlphaCarbon()
        if representative_atom is None:
            # For nucleotides, use the color of C1' instead of C-alpha
            representative_atom = res.getAtomByPdbName(" C1'")
        if representative_atom is None:
            # This residue is not an amino acid or a nucleotide, so we
            # don't need its color.
            continue
        res_key = self._get_ResidueKey(res)
        colors[res_key] = representative_atom.color.rgb
    return colors
def setWorkspaceColors(self, color_map, all_atoms=False):
    """
    Set the colors in the workspace for all the residues in the color map.
    If a residue is not in the color map, its color will not be changed.
    Within a recolored residue, either every atom or only the carbon atoms
    are recolored, depending on `all_atoms`.

    :param color_map: The new colors that residues should have. Each
        residue is represented by a 4-tuple of (entry_id, chain, resnum,
        inscode), and each color is represented by a tuple of (r,g,b) values.
    :type color_map: dict(residue.ResidueKey, tuple(int, int, int))
    :param all_atoms: Whether to color all atoms or just carbons
    :type all_atoms: bool
    """
    proj = maestro.project_table_get()
    # `None` keys carry no entry id and are skipped
    entry_ids = {key.entry_id for key in color_map if key is not None}
    for eid in entry_ids:
        struc = proj[eid].getStructure()
        for res in struc.residue:
            rgb = color_map.get(self._get_ResidueKey(res))
            if rgb is None:
                continue
            if all_atoms:
                atoms_to_color = res.atom
            else:
                atoms_to_color = (a for a in res.atom if a.element == 'C')
            for atom in atoms_to_color:
                atom.setColorRGB(*rgb)
        with self._updatingColor():
            proj[eid].setStructure(struc)
class StandaloneStructureModel(AbstractStructureModel):
    """
    A structure model for when the MSV is run directly from the command line.

    :note: When copying a sequence, this structure model currently strips all
        structural information from the copy. If we need the copy to retain
        structural information, we should make sure that setting the structure
        on one copied chain updates the structure on all other copied chains
        without affecting the structure from the original sequences.
    """

    def __init__(self):
        super().__init__()
        # Next fake entry id to assign; see `_readStructures`
        self._eid = 1

    def renameSeq(self, seq, new_name):
        """
        Rename the specified sequence

        :param seq: Sequence to be renamed
        :type seq: sequence.ProteinSequence
        :param new_name: New name for the sequence
        :type new_name: str
        """
        aln = self._gui_model.getAlignmentOfSequence(seq)
        aln.renameSeq(seq, new_name)

    def _readStructures(self, filename):
        # See AbstractStructureModel for method documentation
        strucs = list(structure.StructureReader(filename))
        seqs = []
        for cur_struc in strucs:
            # Since there's no project, we generate fake entry ids
            eid = self._eid
            self._eid += 1
            # Set eid on structure so eid-based ASLs work
            cur_struc.property['s_m_entry_id'] = str(eid)
            cur_seqs = self._convertStructure(cur_struc, eid)
            # We use weakrefs so that we don't prevent garbage collection
            # for sequences or structures. The list is the same for every
            # chain of this structure, so it is built once and shared.
            weak_seqs = list(map(weakref.ref, cur_seqs))
            for cur_seq in cur_seqs:
                # Each request for the structure hands out a fresh copy
                cur_seq._get_structure = partial(copy.copy, cur_struc)
                cur_seq._set_structure = partial(self._setStructure,
                                                 weak_seqs)
            seqs.extend(cur_seqs)
        for cur_seq in seqs:
            cur_seq.sequenceCopied.connect(self._sequenceCopied)
        return seqs

    def _setStructure(self, weak_seqs, struc):
        """
        Set the structure on all given sequences.

        :param weak_seqs: A list of weak references to sequences. If the
            referenced sequence has been deleted, it will be ignored.
        :type weak_seqs: list[weakref.ref]
        :param struc: The new structure to set.
        :type struc: structure.Structure
        """
        for cur_weak_seq in weak_seqs:
            seq = cur_weak_seq()
            if seq is not None:
                seq._get_structure = partial(copy.copy, struc)

    @QtCore.pyqtSlot(object, object)
    def _sequenceCopied(self, orig_seq, copy_seq):
        """
        When a sequence that we're monitoring is copied, strip all structural
        information from the copy. See the class docstring for additional
        information.

        :param orig_seq: The sequence being copied.
        :type orig_seq: schrodinger.protein.sequence.Sequence
        :param copy_seq: The newly created copy.
        :type copy_seq: schrodinger.protein.sequence.Sequence
        """
        copy_seq.entry_id = None
[docs]class PyMolStructureModel(AbstractStructureModel):
"""
A stub for a PyMol structure model.
"""