"""
Classes for working with sequences containing alignment information (gaps) and
collections thereof.
Copyright Schrodinger, LLC. All rights reserved.
"""
import collections
import contextlib
import copy
import enum
import itertools
import types
from bisect import bisect
from schrodinger.application.msv import utils as msv_utils
from schrodinger.models import json
from schrodinger.protein import annotation
from schrodinger.protein import properties
from schrodinger.protein import residue
from schrodinger.protein import sequence
from schrodinger.Qt import QtCore
ResidueSimilarity = enum.Enum('ResidueSimilarity',
['Identical', 'Similar', 'Dissimilar', 'NA'])
[docs]class AnchoredResidueError(RuntimeError):
"""
Exception to indicate that an action would break anchors.
When possible, the specific anchors that block the action are stored on the
exception instance.
:ivar blocking_anchors: Anchors that block the action or `ALL_ANCHORS` if
all anchors block the action.
:vartype blocking_anchors: list[residue.Residue] or object
"""
ALL_ANCHORS = object()
[docs] def __init__(self, message=None, blocking_anchors=ALL_ANCHORS):
"""
:param message: Exception message
:type message: str
:param blocking_anchors: Anchored residues that block the action
:type blocking_anchors: list[residue.Residue]
"""
super().__init__(message)
self.blocking_anchors = blocking_anchors
[docs]class StructuredResidueError(RuntimeError):
pass
[docs]class AlignmentSignals(QtCore.QObject):
"""
A collection of signals that can be emitted by an alignment
:ivar domainsChanged: TODO
:vartype domainsChanged: `QtCore.pyqtSignal`
:ivar sequencesAboutToBeInserted: A signal emitted before sequences are
inserted into the alignment. Emitted with: (The index of the first
sequence to be inserted, The index of the last sequence to be inserted)
:vartype sequencesAboutToBeInserted: `QtCore.pyqtSignal`
:ivar sequencesInserted: A signal emitted after sequences are
inserted into the alignment. Emitted with: (The index of the first
sequence inserted, The index of the last sequence inserted)
:vartype sequencesInserted: `QtCore.pyqtSignal`
:ivar sequencesAboutToBeRemoved: A signal emitted before sequences are
removed from the alignment. Emitted with: (The index of the first
sequence to be removed, The index of the last sequence to be removed)
:vartype sequencesAboutToBeRemoved: `QtCore.pyqtSignal`
:ivar sequencesRemoved: A signal emitted after sequences are
removed from the alignment. Emitted with: (The index of the first
sequence removed, The index of the last sequence removed)
:vartype sequencesRemoved: `QtCore.pyqtSignal`
:ivar sequenceResiduesChanged: A signal emitted after the contents of a
sequence have changed. Note that this signal may also be emitted in
response to a sequence changing length, as positions in the alignment
may switch from blank to occupied or vice versa.
:vartype sequenceResiduesChanged: `QtCore.pyqtSignal`
:ivar sequencesAboutToBeReordered: Signal emitted before reordering
sequences
:type: sequencesAboutToBeReordered: `QtCore.pyqtSignals`
:ivar sequencesReordered: Signal emitted after sequences have been reordered
:type: sequencesReordered: `QtCore.pyqtSignals`
:ivar sequenceNameChanged: A signal emitted after a sequence has changed
names. Emitted with: (The modified sequence)
:vartype sequenceNameChanged: `QtCore.pyqtSignal`
:ivar annotationTitleChanged: A signal emitted after a sequence's annotation
has changed titles. Emitted with: (The sequence whose annotation title
has been modified)
:vartype annotationTitleChanged: `QtCore.pyqtSignal`
:ivar alignmentNumColumnsAboutToChange: A signal emitted before the
alignment changes length. Emitted with: (The current length of the
alignment, The new length of the alignment)
:vartype alignmentNumColumnsAboutToChange: `QtCore.pyqtSignal`
:ivar alignmentNumColumnsChanged: A signal emitted after the alignment
changes length. Emitted with: (The old length of the alignment, The
current length of the alignment)
:vartype alignmentNumColumnsChanged: `QtCore.pyqtSignal`
:ivar residuesAboutToBeRemoved: A signal emitted before residues are to be
removed. Emitted with a list of the residues to be removed.
:vartype residuesAboutToBeRemoved: `QtCore.pyqtSignal`
:ivar residuesRemoved: A signal emitted after residues are removed. This
signal is not emitted with any parameters, but the residues that were
removed were listed with the corresponding residuesAboutToBeRemoved
signal.
:vartype residuesRemoved: `QtCore.pyqtSignal`
:ivar residuesAdded: A signal emitted with added residues. Note that this
signal will be only be emitted once even if residues are added to
multiple sequences. In addition, each individual sequence will emit a
lengthChanged signal.
:vartype residuesAdded: `QtCore.pyqtSignal`
:ivar sequenceVisibilityChanged: A signal emitted when visibility of a
sequence changes. Emitted with: (the sequence whose visibility is
changing, the index of the sequence)
:vartype sequenceVisibilityChanged: `QtCore.pyqtSignal`
:ivar sequenceStructureChanged: A signal emitted when structure of a
sequence changes. Emitted with: (the sequence whose visibility is
changing, the index of the sequence)
:vartype sequenceStructureChanged: `QtCore.pyqtSignal`
:ivar alignmentAboutToBeCleared: A signal emitted just before all sequences
are removed from the alignment.
:vartype alignmentAboutToBeCleared: `QtCore.pyqtSignal`
:ivar alignmentCleared: A signal emitted just after all sequences have been
removed from the alignment.
:vartype alignmentCleared: `QtCore.pyqtSignal`
:ivar anchoredResiduesChanged: A signal emitted when one or more residues
are anchored or unanchored.
:vartype alignmentCleared: `QtCore.pyqtSignal`
:ivar alnSetsChanged: A signal emitted when the alignment set for one or
more sequences changes.
:vartype alnSetsChanged: QtCore.pyqtSignal
"""
domainsChanged = QtCore.pyqtSignal()
invalidatedDomains = QtCore.pyqtSignal()
descriptorsCleared = QtCore.pyqtSignal()
sequencesAboutToBeInserted = QtCore.pyqtSignal(int, int)
sequencesInserted = QtCore.pyqtSignal(int, int)
sequencesAboutToBeRemoved = QtCore.pyqtSignal(int, int)
sequencesRemoved = QtCore.pyqtSignal(int, int)
sequenceResiduesChanged = QtCore.pyqtSignal()
sequencesAboutToBeReordered = QtCore.pyqtSignal()
sequencesReordered = QtCore.pyqtSignal(list)
sequenceNameChanged = QtCore.pyqtSignal(sequence.Sequence)
annotationTitleChanged = QtCore.pyqtSignal(sequence.Sequence)
alignmentNumColumnsAboutToChange = QtCore.pyqtSignal(int, int)
alignmentNumColumnsChanged = QtCore.pyqtSignal(int, int)
residuesAboutToBeRemoved = QtCore.pyqtSignal(list)
residuesRemoved = QtCore.pyqtSignal()
residuesAdded = QtCore.pyqtSignal(list)
sequenceVisibilityChanged = QtCore.pyqtSignal(sequence.Sequence, int)
sequenceStructureChanged = QtCore.pyqtSignal(sequence.Sequence, int)
alignmentAboutToBeCleared = QtCore.pyqtSignal()
alignmentCleared = QtCore.pyqtSignal()
anchoredResiduesChanged = QtCore.pyqtSignal()
alnSetChanged = QtCore.pyqtSignal()
secondaryStructureChanged = QtCore.pyqtSignal()
predictionsChanged = QtCore.pyqtSignal()
pfamChanged = QtCore.pyqtSignal()
kinaseFeaturesChanged = QtCore.pyqtSignal()
kinaseConservationChanged = QtCore.pyqtSignal()
@property
def aln(self):
"""
Return the alignment that this signals object is reporting for.
:rtype: BaseAlignment
"""
return self.parent()
[docs] @QtCore.pyqtSlot()
def emitSeqResChanged(self):
# We use self.sender() to retrieve the sequence instead of using a
# functools.partial as a slot due to a memory leak (MSV-1538)
self.sequenceResiduesChanged.emit()
[docs] @QtCore.pyqtSlot()
def emitSeqNameChanged(self):
# We use self.sender() to retrieve the sequence instead of using a
# functools.partial as a slot due to a memory leak (MSV-1538)
self.sequenceNameChanged.emit(self.sender())
[docs] @QtCore.pyqtSlot()
def emitAnnTitleChanged(self):
# We use self.sender() to retrieve the sequence instead of using a
# functools.partial as a slot due to a memory leak (MSV-1538)
self.annotationTitleChanged.emit(self.sender())
[docs] def allSignals(self):
"""
Iterate over all signals in this object in alphabetical order.
:rtype: Iter(QtCore.pyqtBoundSignal)
"""
for signal, signal_name in self.allSignalsAndNames():
yield signal
[docs] def allSignalsAndNames(self):
"""
Iterate over all signals in this object and their names in alphabetical
order.
:rtype: Iter(tuple(QtCore.pyqtBoundSignal, str))
"""
for attr_name in sorted(dir(self)):
attr = getattr(self, attr_name)
if isinstance(attr, QtCore.pyqtBoundSignal):
yield attr, attr_name
[docs]class BaseAlignment(QtCore.QObject):
"""
Abstract base class for classes which handle alignment of various sequences
and corresponding annotations.
This is a pure domain object intended to make it easy to work with aligned
collections of sequences.
Some methods are decorated with @msv_utils.const in order to make it easy to
write a wrapper for this class that supports undo/redo operations.
:cvar _ALN_ANNOTATION_CLASS: The class for alignment annotations. This
value should be overridden in subclasses.
:vartype _ALN_ANNOTATION_CLASS: type
:cvar _SEQ_ANNOTATION_CLASS: The class for sequence annotations. This
value should be overridden in subclasses.
:vartype _SEQ_ANNOTATION_CLASS: type
"""
_ALN_ANNOTATION_CLASS = None
_SEQ_ANNOTATION_CLASS = None
[docs] def __init__(self, sequences=None):
"""
:param sequences: An optional iterable of sequences
:type sequences: list
"""
super().__init__()
# Count for number of times we've entered a suspendAnchors context.
self._suspend_anchors_count = 0
self._modifying_structure = False
self.signals = AlignmentSignals(self)
self._sequences = []
self._anchored_residues = collections.defaultdict(set)
self._name_to_set = {}
self._seq_to_set = {}
self._set_id_counter = 0
self._num_columns = 0
self._seq_positions_cache = {}
self._col_same_res_cache = None
self._res_similarity_cache = None
self._ref_type_cache = None
self._seqs_matching_ref_type_cache = None
self._RO_anchored_residues = None # read-only set of anchored residues
# read-only set of anchored residue plus the corresponding reference
# residues
self._RO_anchored_residues_with_ref = None
if self._ALN_ANNOTATION_CLASS is None:
self._annotations = None
self._global_annotations = []
else:
self._annotations = self._ALN_ANNOTATION_CLASS(self)
self._global_annotations = \
self._ALN_ANNOTATION_CLASS.ANNOTATION_TYPES
if self._SEQ_ANNOTATION_CLASS is None:
self._seq_annotations = []
else:
self._seq_annotations = self._SEQ_ANNOTATION_CLASS.ANNOTATION_TYPES
self.signals.sequencesAboutToBeRemoved.connect(
self._clearAnchorsFromSeqs)
self.signals.anchoredResiduesChanged.connect(self.clearAllCaching)
cache_invalidating_signals = [
self.signals.sequencesRemoved, self.signals.anchoredResiduesChanged,
self.signals.sequencesReordered,
self.signals.alignmentNumColumnsChanged,
self.signals.alignmentNumColumnsAboutToChange,
self.signals.sequenceResiduesChanged,
self.signals.sequencesInserted, self.signals.residuesRemoved,
self.signals.residuesAdded
]
for signal in cache_invalidating_signals:
signal.connect(self._resetCaches)
for signal in (self.signals.sequencesRemoved,
self.signals.sequencesInserted,
self.signals.sequencesReordered):
signal.connect(self._resetSeqTypeCache)
if sequences is not None:
if any(isinstance(seq, str) for seq in sequences):
sequences = [
sequence.ProteinSequence(seq_str) if isinstance(
seq_str, str) else seq_str for seq_str in sequences
]
self.addSeqs(sequences)
############################################################################
# Magic methods
############################################################################
[docs] @msv_utils.const
def __len__(self):
"""
Returns the number of sequences in the alignment
"""
return len(self._sequences)
@msv_utils.const
def __iter__(self):
"""
Returns an iterable of the sequences held in the alignment
"""
return iter(self._sequences)
[docs] @msv_utils.const
def __contains__(self, seq):
"""
Returns whether the sequence is present in the alignment
"""
return seq in self._sequences
@msv_utils.const
def __getitem__(self, index):
"""
Returns the sequence at the index in the alignment
"""
try:
return self._sequences[index]
except IndexError:
msg = ("index (%d) out of range (alignment length is only %d)" %
(index, len(self)))
raise IndexError(msg)
# TODO MSV-1907: Move magic methods into one contiguous block.
@msv_utils.const
def _getTextFormattedAlignment(self):
"""
Returns a formatted list of strings with detailed alignment information
"""
element_pad = 3
v_header_pad = 15
# pad all the strings in the list by a given amount
def pad_list(lst, amount):
return [i.ljust(amount) for i in lst]
seq_data = collections.OrderedDict()
ruler = [str(i) for i in range(self._num_columns)]
ruler = pad_list(ruler, element_pad)
seq_data["Resnum:"] = ruler
for num, seq in enumerate(self, 1):
name = seq.name
if not name:
name = "Seq" + str(num)
name += ":"
elements = []
for res in seq:
if res.is_gap:
res_str = seq.gap_char
else:
res_str = str(res)
if not res.hasStructure() and res.is_res:
res_str = residue.box_letter(res_str)
elements.append(res_str)
elements = pad_list(elements, element_pad)
seq_data[name] = elements
to_format = []
# break every stride elements and start a new line. This keeps lines
# about 75 characters long
stride = 20
for i in range(0, self._num_columns, stride):
if i != 0:
to_format.append("\n...cont...\n")
for title, rows in seq_data.items():
if len(rows) < self._num_columns:
# prevent index errors
padding_needed = self._num_columns - len(rows)
rows += ["".ljust(element_pad)] * padding_needed
line = title.ljust(v_header_pad) + "".join(rows[i:i + stride])
to_format.append(line)
return to_format
@msv_utils.const
def _getAlignmentDescription(self):
"""
Returns a formatted description of the alignment
"""
seq_lengths = ", ".join([str(len(seq)) for seq in self])
class_name = self.__class__.__name__
num_seqs = str(len(self))
info = "{class_name} containing {num_seqs} sequences"
if len(self) > 0:
info += " (of respective lengths: {lengths})"
info = info.format(class_name=class_name,
num_seqs=num_seqs,
lengths=seq_lengths)
return info
@msv_utils.const
def __str__(self):
"""
Returns a str representation of the alignment
This is fairly detailed since it is very useful for debugging
"""
to_format = self._getTextFormattedAlignment()
info = self._getAlignmentDescription()
return info + "\n\n" + "\n".join(to_format)
def __repr__(self):
"""
:rtype: str
:return: A str representation of the alignment
"""
return self._getRepr(type(self).__qualname__)
def _getRepr(self, classname):
aln_repr = [
classname, "([", ", ".join([repr(seq) for seq in self]), "])"
]
return "".join(aln_repr)
def __deepcopy__(self, memo):
"""
We should be able to copy an alignment, getting back an alignment with
all the essential data in the original alignment and sharing no
references with it
"""
sequences = copy.deepcopy(self._sequences, memo)
Cls = self.__class__
aln = Cls(sequences)
self._copyAnchoringTo(aln)
self._copyAlnSetsTo(aln)
return aln
@msv_utils.const
def _copyAnchoringTo(self, aln):
"""
Copy all anchored residues from this alignment to the given alignment.
:param aln: The alignment to copy anchoring to.
:type aln: BaseAlignment
"""
anchored_residues = self.getAnchoredResidues()
new_anchored_res = self._mapResidues(anchored_residues, aln)
aln.anchorResidues(new_anchored_res)
@msv_utils.const
def _copyAlnSetsTo(self, aln):
"""
Copy all alignment sets from this alignment to the given alignment.
:param aln: The alignment to copy alignment sets to.
:type aln: BaseAlignment
"""
for aln_set in self._name_to_set.values():
copied_seqs = [aln[self.index(seq)] for seq in aln_set]
aln._addSeqsToAlnSetNoReordering(copied_seqs, aln_set.name)
# make sure that the set ids match so that the colors are the same
aln._name_to_set[aln_set.name].set_id = aln_set.set_id
aln._set_id_counter = self._set_id_counter
@msv_utils.const
def _mapResidues(self, og_residues, new_alignment):
"""
Given residues and a new alignment, get the residues in the same
position in the new alignment.
"""
return [
new_alignment[self.index(res.sequence)][res.idx_in_seq]
for res in og_residues
]
############################################################################
# Annotations
############################################################################
@property
def annotations(self):
return self._annotations
@property
def global_annotations(self):
"""
Returns the alignment-level annotations available for the alignment
"""
return self._global_annotations
@property
def seq_annotations(self):
"""
Returns the sequence-level annotations available for sequences held in
the alignment
"""
return self._seq_annotations
[docs] @msv_utils.const
def getGlobalAnnotationData(self, index, annotation):
"""
Returns column-level annotation data at an index in the alignment
:param index: The index in the alignment
:type index: int
:param annotation: An enum representing the requested annotation, if any
:type annotation: `enum.Enum`
"""
annotation_name = annotation.name
data_array = getattr(self.annotations, annotation_name)
try:
return data_array[index]
except IndexError:
return
############################################################################
# Sequences and signals
############################################################################
@property
def num_columns(self):
return self._num_columns
[docs] @msv_utils.const
def getWorkspaceCounts(self):
"""
Summarize the visibility status of the alignment's sequences
:returns: Counts of each type of visibility
:rtype: collections.Counter
"""
return collections.Counter(seq.visibility for seq in self)
@QtCore.pyqtSlot()
@msv_utils.const
def _sequenceVisibilityChanged(self):
"""
Notify any listeners that a request to change visibility of
an entire sequence has been made.
"""
# We use self.sender() to retrieve the sequence instead of using a
# functools.partial as a slot due to a memory leak (MSV-1538)
seq = self.sender()
self.signals.sequenceVisibilityChanged.emit(seq, self.index(seq))
@QtCore.pyqtSlot()
@msv_utils.const
def _sequenceStructureChanged(self):
"""
Notify any listeners that the structure corresponding
to a sequence has changed.
"""
# We use self.sender() to retrieve the sequence instead of using a
# functools.partial as a slot due to a memory leak (MSV-1538)
seq = self.sender()
self.signals.sequenceStructureChanged.emit(seq, self.index(seq))
[docs] @msv_utils.const
def index(self, seq):
"""
Returns the index of the specified sequence.
:param seq: The requested sequence
:type seq: `sequence.Sequence`
:rtype: int
:return: The index of the requested sequence
"""
return self._seq_positions[seq]
[docs] def reorderSequences(self, seq_indices):
"""
Reorder the sequences in the alignment using the specified list of
indices.
In the undoable version of this class, the private function is needed to
perform the operation in an undoable operation.
:param seq_indices: A list with the new indices for sequences
:type: list of int
:raises ValueError: In the event that the list of indices does not match
the length of the alignment
"""
if len(seq_indices) != len(self):
msg = ("The number of elements in seq_indices should match the "
"number of sequences in the alignment")
raise ValueError(msg)
self.signals.sequencesAboutToBeReordered.emit()
new_sequences = [self._sequences[i] for i in seq_indices]
self._sequences = new_sequences
self.signals.sequencesReordered.emit(seq_indices)
[docs] def sortByProperty(self, seq_prop, reverse=False):
"""
Sort the sequences by a sequence property. Sequences that do not have
the sequence property defined will be grouped at the end of the
alignment (regardless of `reverse`)
:type seq_prop: properties.SequenceProperty
"""
sort_indices = self._getSortByPropertyIndices(seq_prop, reverse=reverse)
self.reorderSequences(sort_indices)
def _getSortByPropertyIndices(self, seq_prop, reverse=False):
def sort_key(seq):
val = seq.getProperty(seq_prop)
if val is None:
val = float('inf')
if reverse:
val = -float('inf')
return val
return self._getSortIndices(key=sort_key, reverse=reverse)
[docs] def sort(self, *, key, reverse=False):
"""
Sort the alignment by the specified criteria.
NOTE: Query sequence is not included in the sort.
:param key: A function that takes a sequence and returns a value to sort
by for each sequence. (required keyword-only argument)
:type key: function
:param reverse: Whether to sort in reverse (descending) order.
:type reverse: bool
"""
sort_indices = self._getSortIndices(key, reverse=reverse)
self.reorderSequences(sort_indices)
return sort_indices
def _getSortIndices(self, key, reverse=False):
'''
Helper method that gets the sequence indices for a sort.
:param key: A function that returns a key to sort by.
:type key: function
:param reverse: Whether to sort in reverse (descending) order.
:type order: bool
:raises ValueError: If given something other than a SortBy enum.
'''
# Extract key values and zip with indices
if len(self) == 0:
return []
else:
# Skip index 0 since that's the query sequence
key_values = [(key(self[i]), i) for i in range(1, len(self))]
key_values.sort(key=lambda x: x[0], reverse=reverse)
sort_indices = [0] + [kv[1] for kv in key_values]
return sort_indices
# TODO MSV-1910: Consider renaming length to numColumns?
@msv_utils.const
def _recalculateNumColumns(self, omit, extra_length=None):
"""
Determine what the length of the alignment would be if we removed
sequences.
:param omit: The sequences to omit
:type omit: Iterable[sequence.Sequence]
:param extra_length: An additional sequence length to include in the
calculation
:type extra_length: int
:return: The calculated alignment length if the length is going to
change. None otherwise.
:rtype: int or NoneType
"""
omitted = [seq for seq in self._sequences if seq not in omit]
if omitted:
new_length = max(map(len, omitted))
else:
new_length = 0
if extra_length is not None and extra_length > new_length:
new_length = extra_length
if new_length != self._num_columns:
return new_length
def _monitorSequence(self, seq):
"""
Monitor changes in the specified sequence.
:param seq: The sequence to monitor
:type seq: `sequence.Sequence`
"""
for signal, slot in self._getSeqSignalsAndSlots(seq):
signal.connect(slot)
def _stopMonitoringSequence(self, seq):
"""
Stop monitoring for changes in the specified sequence.
:param seq: The sequence to stop monitoring
:type seq: `sequence.Sequence`
"""
for signal, slot in self._getSeqSignalsAndSlots(seq):
signal.disconnect(slot)
def _getSeqSignalsAndSlots(self, seq):
"""
:return: pairs of (sequence signal, alignment slot)
:rtype: tuple(tuple())
"""
signals = self.signals
return ((seq.residuesChanged, signals.emitSeqResChanged),
(seq.annotations.domainsChanged, signals.domainsChanged),
(seq.annotations.invalidatedDomains, signals.invalidatedDomains),
(seq.descriptorsCleared, signals.descriptorsCleared),
(seq.nameChanged, signals.emitSeqNameChanged),
(seq.annotationTitleChanged, signals.emitAnnTitleChanged),
(seq.lengthAboutToChange, self._sequenceLengthAboutToChange),
(seq.lengthChanged, self._sequenceLengthChanged),
(seq.visibilityChanged, self._sequenceVisibilityChanged),
(seq.structureChanged, self._sequenceStructureChanged),
(seq.secondaryStructureChanged, signals.secondaryStructureChanged),
(seq.predictionsChanged, signals.predictionsChanged),
(seq.kinaseFeaturesChanged, signals.kinaseFeaturesChanged),
(seq.kinaseConservationChanged, signals.kinaseConservationChanged),
(seq.pfamChanged, signals.pfamChanged)) # yapf: disable
@QtCore.pyqtSlot(int, int)
def _sequenceLengthAboutToChange(self, old_seq_length, new_seq_length):
"""
Respond to a sequence `lengthAboutToChange` signal by emitting
`signals.alignmentNumColumnsAboutToChange` if necessary.
:param old_seq_length: The current length of the sequence
:type old_seq_length: int
:param new_seq_length: The new length of the sequence
:type new_seq_length: int
"""
# We use self.sender() to retrieve the sequence instead of using a
# functools.partial as a slot due to a memory leak (MSV-1538)
seq = self.sender()
new_num_columns = self._checkAlignmentNumColumns(
seq, old_seq_length, new_seq_length)
if new_num_columns is not None:
self.signals.alignmentNumColumnsAboutToChange.emit(
self._num_columns, new_num_columns)
@QtCore.pyqtSlot(int, int)
def _sequenceLengthChanged(self, old_seq_length, new_seq_length):
"""
Respond to a sequence `lengthChanged` signal by emitting
`signals.alignmentNumColumnsChanged` and `signals.sequenceResiduesChanged`
as necessary.
:param old_seq_length: The old length of the sequence
:type old_seq_length: int
:param new_seq_length: The current length of the sequence
:type new_seq_length: int
"""
# We use self.sender() to retrieve the sequence instead of using a
# functools.partial as a slot due to a memory leak (MSV-1538)
seq = self.sender()
new_num_columns = self._checkAlignmentNumColumns(
seq, old_seq_length, new_seq_length)
if new_num_columns is not None:
old_num_columns = self._num_columns
self._num_columns = new_num_columns
self.signals.alignmentNumColumnsChanged.emit(
old_num_columns, new_num_columns)
if old_seq_length < new_seq_length:
if old_seq_length < old_num_columns:
self.signals.sequenceResiduesChanged.emit()
else:
if new_seq_length < new_num_columns:
self.signals.sequenceResiduesChanged.emit()
else:
self.signals.sequenceResiduesChanged.emit()
# TODO MSV_1907: Move closer to _recalculateNumColumns
def _checkAlignmentNumColumns(self, seq, old_seq_length, new_seq_length):
"""
Determine what the length of the alignment will be if we changed the
length of the specified sequence
:param seq: The sequence that will change length
:type seq: `sequence.Sequence`
:param old_seq_length: The old length of the sequence
:type old_seq_length: int
:param new_seq_length: The new length of the sequence
:type new_seq_length: int
:return: If the alignment will change length, returns the new alignment
length. Otherwise, returns None.
:rtype: int or NoneType
"""
if new_seq_length > self._num_columns:
return new_seq_length
elif old_seq_length == self._num_columns:
new_num_columns = self._recalculateNumColumns([seq], new_seq_length)
return new_num_columns
@msv_utils.const
def _assertCanAddSeqs(self, index):
if index is not None and index > len(self):
raise ValueError("Cannot insert sequences at position past end")
[docs] def addSeq(self, seq, index=None):
"""
:param seq: The sequence to add
:type seq: `sequence.Sequence`
:param start: The index at which to insert; if None, seq is appended
:type start: int
"""
self.addSeqs([seq], index)
[docs] def addSeqs(self, sequences, start=None):
"""
Add multiple sequences to the alignment
:param sequences: Sequences to add
:type sequences: list of `sequence.Sequence`
:param start: The index at which to insert; if None, seqs are appended
:type start: int
"""
if not sequences:
return
self._assertCanAddSeqs(start)
# first, change the alignment length if necessary
old_num_columns = self._num_columns
seq_length = max(map(len, sequences))
if seq_length > old_num_columns:
self.signals.alignmentNumColumnsAboutToChange.emit(
old_num_columns, seq_length)
self._num_columns = seq_length
self.signals.alignmentNumColumnsChanged.emit(
old_num_columns, seq_length)
# then insert the sequences
if start is None:
start = len(self._sequences)
last_new_index = start + len(sequences) - 1
self.signals.sequencesAboutToBeInserted.emit(start, last_new_index)
for index, seq in enumerate(sequences, start=start):
self._sequences.insert(index, seq)
self._monitorSequence(seq)
self.signals.sequencesInserted.emit(start, last_new_index)
[docs] def removeSeq(self, seq):
"""
Remove a sequence from the alignment
:param seq: The sequence to remove
:type seq: `sequence.Sequence`
"""
self.removeSeqs([seq])
[docs] def removeSeqs(self, seqs):
"""
Remove multiple sequences from the alignment
"""
seqs = set(seqs)
if not seqs:
return
if self.getReferenceSeq() in seqs:
self._assertCanRemoveRef()
blocks, num_found = self._findSeqBlocks(seqs)
if num_found != len(seqs):
raise ValueError("Not all sequences present in alignment.")
self._additionalSeqRemovalSteps(seqs)
self._removeSeqsFromAlnSetNoReordering(seqs)
self._changeNumColumnsForRemovedSeqs(seqs)
self._removeSeqBlocks(blocks)
def _findSeqBlocks(self, seqs):
"""
Divide up the given sequences into blocks of contiguous sequences based
on their positions in the alignment.
:param seqs: The sequences to divide into blocks.
:type seqs: set
:return: A tuple of:
- A list of (start index, end index) for blocks containing the
specified sequences. This list is sorted in ascending order.
- The number of sequences that were found in the alignment. If this
is less than `len(seqs)`, then some of the input sequences are not
present in the alignment.
:rtype: tuple(list(tuple(int, int)), int)
"""
blocks = []
num_found = 0
block_start = None
for i, seq in enumerate(self):
if seq in seqs:
num_found += 1
if block_start is None:
block_start = i
elif block_start is not None:
blocks.append((block_start, i - 1))
block_start = None
if block_start is not None:
blocks.append((block_start, len(self) - 1))
return blocks, num_found
def _additionalSeqRemovalSteps(self, seqs):
"""
The is a hook for subclasses to carry out any additional sequence
removal operations. It will be called after we have confirmed that
`seqs` are safe to remove from the alignment but before the sequences
are actually removed.
:param seqs: The sequences that will be removed from the alignment.
:type seqs: set(sequence.Sequence)
"""
pass
def _changeNumColumnsForRemovedSeqs(self, seqs_to_remove):
"""
Update the number of columns in the alignment in preparation for
removing the specified sequences.
:param seqs_to_remove: The sequences that will be removed from the
alignment.
:type seqs_to_remove: set(sequence.Sequence)
"""
new_num_columns = self._recalculateNumColumns(seqs_to_remove)
if new_num_columns is None:
return
self.signals.alignmentNumColumnsAboutToChange.emit(
self._num_columns, new_num_columns)
old_num_columns = self._num_columns
self._num_columns = new_num_columns
self.signals.alignmentNumColumnsChanged.emit(old_num_columns,
new_num_columns)
def _removeSeqBlocks(self, blocks):
"""
Remove sequences at the specified indices in the alignment.
:param blocks: A list of (start index, end index) for blocks to remove.
This list must be sorted in ascending order.
:type blocks: list[tuple(int, int)]
"""
for start, end in reversed(blocks):
self.signals.sequencesAboutToBeRemoved.emit(start, end)
for seq in self._sequences[start:end + 1]:
self._stopMonitoringSequence(seq)
del self._sequences[start:end + 1]
self.signals.sequencesRemoved.emit(start, end)
@msv_utils.const
def _assertCanRemoveRef(self):
"""
If any anchored residues are present, raise an exception stating that we
can't remove the reference sequence.
"""
if self.getAnchoredResidues():
raise AnchoredResidueError("Can't remove reference sequence while "
"there are anchored residues")
@QtCore.pyqtSlot()
def _resetCaches(self):
"""
Utility function to simultaneously reset caches.
"""
self._seq_positions_cache.clear()
self._col_same_res_cache = None
self._res_similarity_cache = None
self._RO_anchored_residues = None
self._RO_anchored_residues_with_ref = None
self.annotations.clearAllCaching()
@QtCore.pyqtSlot()
def _resetSeqTypeCache(self):
self._ref_type_cache = None
self._seqs_matching_ref_type_cache = None
[docs] def clear(self):
"""
Clears the entire alignment of sequences
"""
if len(self) == 0:
return
self.signals.alignmentAboutToBeCleared.emit()
old_num_columns = self._num_columns
self.signals.alignmentNumColumnsAboutToChange.emit(old_num_columns, 0)
old_num_sequences = len(self._sequences) - 1
self.signals.sequencesAboutToBeRemoved.emit(0, old_num_sequences)
for seq in self._sequences:
self._stopMonitoringSequence(seq)
self._sequences = []
self.signals.sequencesRemoved.emit(0, old_num_sequences)
if old_num_columns > 0:
self._num_columns = 0
self.signals.alignmentNumColumnsChanged.emit(old_num_columns, 0)
self.signals.alignmentCleared.emit()
@property
def _seq_positions(self):
if not self._seq_positions_cache:
self._updateSequencePositionCache()
return self._seq_positions_cache
def _updateSequencePositionCache(self):
"""
Update the cache of sequences to their index within the sequence.
The sequence-index cache is stored as a mapping between sequences
and their indices.
"""
self._seq_positions_cache = {seq: i for i, seq in enumerate(self)}
############################################################################
# Operations on Sequences
############################################################################
@msv_utils.const
def _getReferenceSeqReordering(self, seq):
"""
Returns an ordering that will make the specified sequence the reference
sequence
:param seq: Sequence to set as reference sequence
:type seq: `sequence`
:rtype: list of int
:return: A list of indices
"""
seq_index = self.index(seq)
new_seq_ordering = self._getAlnSetSeqIdxs(seq)
to_skip = set(new_seq_ordering)
new_seq_ordering.extend(
[seq_i for seq_i in range(len(self)) if seq_i not in to_skip])
return new_seq_ordering
[docs] def setReferenceSeq(self, seq):
"""
Set the specified sequence as the reference sequence.
:param seq: Sequence to set as reference sequence
:type seq: `sequence`
"""
self._assertCanSetReferenceSeq()
new_seq_ordering = self._getReferenceSeqReordering(seq)
self.reorderSequences(new_seq_ordering)
@msv_utils.const
def _assertCanSetReferenceSeq(self):
"""
If any anchored residues are present, raise an exception stating that we
can't set a new reference sequence.
"""
if self.getAnchoredResidues():
raise AnchoredResidueError("Can't set reference sequence while "
"there are anchored residues")
[docs] @msv_utils.const
def getReferenceSeq(self):
"""
Returns the sequence that has been set as reference sequence or None if
there is no reference sequence.
:return: The reference sequence or None
:rtype: `Sequence` or None
"""
if not self._sequences:
return None
return self[0]
[docs] @msv_utils.const
def isReferenceSeq(self, seq):
"""
Return whether or not a sequence is the reference sequence.
:param seq: Sequence to check
:type seq: `Sequence`
:return: True if the sequence is the reference sequence, False
otherwise.
:rtype: bool
"""
return self._sequences and self._sequences[0] == seq
[docs] @msv_utils.const
def getResidueIndices(self, residues, sort=True):
"""
Returns the indices (in the alignment) of the specified residues
:param residues: The list of residues and gaps to get indices for.
:type residues: list[residue.AbstractSequenceElement]
:param sort: Whether the returned list should be sorted.
:type sort: bool
:rtype: A list of (sequence index, residue index) tuples
:return: list[tuple(int, int)]
"""
indices = []
by_seqs = collections.defaultdict(list)
for res in residues:
seq = res.sequence
if seq is None:
raise ValueError(
f"Residue {res} {id(res)} does not belong to a sequence")
by_seqs[seq].append(res)
for seq, residues_in_seq in by_seqs.items():
seq_i = self._seq_positions[seq]
res_indices = seq.indices(residues_in_seq)
indices.extend((seq_i, res_i) for res_i in res_indices)
if sort:
indices.sort()
return indices
@msv_utils.const
def _assertCanRemove(self, elements):
"""
Check whether the specified elements can be removed
:param elements: An iterable of elements.
:type elements: iterable(residue.AbstractSequenceElement)
:raises AnchoredResidueError: if the elements cannot be removed
:raises StructuredResidueError: In the event that this method attempts
to remove structured residues
"""
elements = set(elements)
anchored_res = set(self.getAnchoredResidues())
if elements.intersection(anchored_res):
err_msg = ("Can't remove an anchored residue. Unanchor the residue "
"and try again")
raise AnchoredResidueError(err_msg)
ref_seq = self.getReferenceSeq()
anchored_ref_res = {ref_seq[res.idx_in_seq] for res in anchored_res}
if elements.intersection(anchored_ref_res):
err_msg = ("Can't remove a reference residue while another residue "
"is anchored to it.")
raise AnchoredResidueError(err_msg)
if (not self._modifying_structure and
residue.any_structured_residues(elements)):
err_msg = ("Can't remove a structured residue.")
raise StructuredResidueError(err_msg)
[docs] def removeElements(self, elements):
"""
Removes the specified elements from the alignment.
:param elements: An iterable of elements.
:type elements: iterable(residue.AbstractSequenceElement)
:raises AnchoredResidueError: if the elements cannot be removed
:raises StructuredResidueError: In the event that this method attempts
to mutate structured residues
"""
elements = list(elements) # in case it's exhaustible
self._assertCanRemove(elements)
eles_to_remove = collections.defaultdict(list)
for ele in elements:
eles_to_remove[ele.sequence].append(ele)
self.signals.residuesAboutToBeRemoved.emit(elements)
for seq, eles in eles_to_remove.items():
gap_idxs = self._getAnchorConservingGapIdxs(seq, eles)
seq.removeElements(eles)
seq.addGapsByIndices(gap_idxs)
# We wait until after all the changes to emit the signal
# Note that the signals.residuesRemoved signal is emitted once even if
# elements are removed from multiple sequences.
self.signals.residuesRemoved.emit()
@msv_utils.const
def _getAnchorConservingGapIdxs(self, seq, eles):
"""
Given a sequence and a list of elements to remove, return a list
of indices to add gaps that ensure all residue anchors are maintained.
:param seq: The sequence to remove elements from
:type seq: sequence.Sequence
:param eles: A list of elements that will be removed from `seq`
:type eles: list(residue.Residue)
:return: A sorted list of gap indices that will maintain the alignment
of residue anchors. These indices should be added immediately after
`eles` are removed from `seq`
:rtype: list(int)
"""
if seq == self.getReferenceSeq():
seq_anchors = self.getAnchoredResidues()
else:
seq_anchors = self._anchored_residues.get(seq, [])
seq_anchors = sorted(seq_anchors, key=lambda ele: ele.idx_in_seq)
seq_anchor_idxs = [ele.idx_in_seq for ele in seq_anchors]
# Count up the number of removed elements before each anchor.
# We add a `None` to catch any removed elements that come after any
# anchors.
seq_anchors.append(None)
counts = collections.Counter(
seq_anchors[bisect(seq_anchor_idxs, ele.idx_in_seq)]
for ele in eles)
# Add gaps before any anchors equal to the number of elements
# removed before them.
gap_idxs = []
for anchor, count in counts.items():
if anchor is None:
continue
anchor_idx = anchor.idx_in_seq
gap_idxs.extend(range(anchor_idx - count, anchor_idx))
return gap_idxs
@msv_utils.const
def _assertCanMutateResidues(self, seq_i, start, end, elements):
seq = self[seq_i]
removed = seq[start:end]
self._assertCanRemove(removed)
if len(elements) > len(removed):
first_extra_idx = start + len(removed)
self._assertCanInsert(seq, [first_extra_idx])
[docs] def mutateResidues(self, seq_i, start, end, elements):
"""
Mutate a sequence.
:param seq_i: Index of seq to mutate
:type seq_i: int
:param start: Start index of seq region to mutate
:type start: int
:param end: End index of seq region to mutate
:type end: int
:param elements: Elements to mutate to
:type elements: iterable(str) or iterable(ElementClass)
:raises AnchoredResidueError: if the mutation violates anchoring
:raises StructuredResidueError: In the event that this method attempts
to mutate structured residues
"""
self._assertCanMutateResidues(seq_i, start, end, elements)
seq = self._sequences[seq_i]
length_preserving_end = start + len(elements)
self.signals.residuesAboutToBeRemoved.emit(seq[start:end])
if end <= length_preserving_end:
# When more residues are added than removed, we don't need to worry
# about anchors
seq.mutate(start, end, elements)
self.signals.residuesRemoved.emit()
else:
# A same-length mutate is safe for anchors
seq.mutate(start, length_preserving_end, elements)
to_remove = seq[length_preserving_end:end]
self.removeElements(to_remove)
[docs] def replaceResiduesWithGaps(self, residues):
"""
Replaces the specified residues with gaps
:param residues: A list of residues to replace with gaps
:type residues: list
"""
for seq_i, res_i in self.getResidueIndices(residues):
self.mutateResidues(seq_i, res_i, res_i + 1, [residue.Gap()])
[docs] def addElements(self, seq, res_i, elements):
"""
Adds the specified elements (residues and/or gaps) to the alignment.
:param seq: A sequence in the alignment
:type seq: sequence.Sequence
:param res_i: Index to insert the elements
:type res_i: int
:param elements: elements to insert
:type elements: iterable(str or residue.AbstractSequenceElement)
"""
self._assertCanInsert(seq, [res_i])
elements = list(elements) # In case it's exhaustable
seq.insertElements(res_i, elements)
self.signals.residuesAdded.emit(elements)
[docs] @msv_utils.const
def getResiduesWithStructure(self):
"""
Returns a list of all residues with structure
"""
def has_structure(res):
return res.is_res and not res.seqres_only
seqs = (seq for seq in self if seq.hasStructure())
return list(res for res in itertools.chain(*seqs) if has_structure(res))
[docs] @msv_utils.const
@contextlib.contextmanager
def modifyingStructure(self):
self._modifying_structure = True
yield
self._modifying_structure = False
############################################################################
# Residue Anchoring
############################################################################
[docs] @contextlib.contextmanager
def suspendAnchors(self):
"""
While inside this context, all anchors will be ignored. Upon exit, the
anchors will be restored and an exception will be raised if any of the
anchors are not aligned to the same reference residues they were
aligned to at the start.
"""
self._suspendAnchors()
yield
self._unsuspendAnchors()
def _suspendAnchors(self):
self._suspend_anchors_count += 1
if self._suspend_anchors_count > 1:
# If this is a nested anchor suspension, return early
return
self._saved_anchored_residues = self._anchored_residues
self._anchor_pairs = self._getAnchorPairs()
self._anchored_residues = types.MappingProxyType({})
self._RO_anchored_residues = None
self._RO_anchored_residues_with_ref = None
# We use a read-only dict so that any attempted changes to anchoring
# will throw an error.
@msv_utils.const
def _getAnchorPairs(self):
"""
Get all anchored residues paired with the corresponding reference
seqence residue.
:return: A list of (reference residue, anchored residue) tuples.
:rtype: list(tuple(residue.Residue, residue.Residue))
"""
ref_seq = self.getReferenceSeq()
return [(ref_seq[res.idx_in_seq], res)
for res in self.getAnchoredResidues()]
def _unsuspendAnchors(self):
self._suspend_anchors_count -= 1
if self._suspend_anchors_count > 0:
# If we haven't exited as many times as we've entered,
# return early
return
self._anchored_residues = self._saved_anchored_residues
del self._saved_anchored_residues
self._RO_anchored_residues = None
self._RO_anchored_residues_with_ref = None
for ref_res, res in self._anchor_pairs:
if ref_res.idx_in_seq != res.idx_in_seq:
# Discard anchors because at least one is invalid
self._anchored_residues.clear()
raise AnchoredResidueError(
'Residue anchoring broken during suspension'
f': {ref_res.idx_in_seq}, {res.idx_in_seq}')
del self._anchor_pairs
@msv_utils.const
def _assertCanAnchor(self, residues):
ref_seq = self.getReferenceSeq()
if any(res.idx_in_seq >= len(ref_seq) for res in residues):
raise ValueError("Can't anchor residues that extend beyond the "
"length of the reference sequence.")
if any(ref_seq[res.idx_in_seq].is_gap for res in residues):
raise ValueError("Can't anchor to a reference gap")
[docs] def anchorResidues(self, residues):
"""
Anchor the specified residues. If passed reference residues, all
residues aligned to the reference residues will be anchored.
Anchored residues are constrained to stay aligned to the reference
residue with the same column index at the time of anchoring. If
elements are removed from the alignment, gaps are added before anchors
to maintain alignment. If any other modifications are made to the
alignment that would break an anchor, an exception is raised. However,
calling code can temporarily take responsibility for maintaining the
anchors within the `suspendAnchors` context.
:param residues: Residues to anchor.
:type residues: list(residue.Residue)
"""
self._assertCanAnchor(residues)
requested_residues = set(residues)
ref_seq = self.getReferenceSeq()
should_emit_signal = False
residues_to_anchor = []
for res in requested_residues:
if res in ref_seq:
# If the reference residue is the only residue in the column
# in `requested_residues`, add all of the column residues to
# `residues_to_anchor`.
ref_idx = res.idx_in_seq
col_elems = self.getColumn(ref_idx, omit_gaps=False)[1:]
if not any(elem in requested_residues for elem in col_elems):
def is_gap(elem):
is_terminal_gap = elem is None
is_regular_gap = not is_terminal_gap and elem.is_gap
return is_terminal_gap or is_regular_gap
residues_to_anchor.extend(
elem for elem in col_elems if not is_gap(elem))
elif res.is_res:
residues_to_anchor.append(res)
for res in residues_to_anchor:
if res not in self._anchored_residues[res.sequence]:
self._anchored_residues[res.sequence].add(res)
should_emit_signal = True
if should_emit_signal:
self.signals.anchoredResiduesChanged.emit()
def _anchorResidueIndices(self, indices):
"""
Anchor the specified residues.
:param indices: A list of (sequence index, residue index) tuples for all
residues to anchor.
:type indices: list[tuple(int, int)]
"""
anchor_residues = [
self[seq_idx][res_idx] for seq_idx, res_idx in indices
]
self.anchorResidues(anchor_residues)
[docs] @msv_utils.const
def getAnchoredResidues(self):
"""
:return: A frozenset of residues that are currently anchored.
:rtype: frozenset(residue.Residue)
"""
# Return a read-only set of all anchored residues.
if self._RO_anchored_residues is None:
all_anchored_residues = itertools.chain(
*self._anchored_residues.values())
self._RO_anchored_residues = frozenset(all_anchored_residues)
return self._RO_anchored_residues
@msv_utils.const
def _getAnchoredResidueIndices(self):
"""
Return [sequence index, residue index] for all anchored residues.
:rtype: list[list(int, int)]
"""
residues = self.getAnchoredResidues()
return list(map(list, self.getResidueIndices(residues)))
[docs] @msv_utils.const
def getAnchoredResiduesWithRef(self):
"""
:return: A frozenset of residues that are currently anchored with the
corresponding reference sequence residues
:rtype: frozenset(residue.Residue)
"""
if len(self) == 0:
return set()
if self._RO_anchored_residues_with_ref is None:
self._RO_anchored_residues_with_ref = \
frozenset(itertools.chain.from_iterable(self._getAnchorPairs()))
return self._RO_anchored_residues_with_ref
[docs] def clearAnchors(self):
self._clearAnchorsFromSeqs(0, len(self))
@QtCore.pyqtSlot(int, int)
def _clearAnchorsFromSeqs(self, start_seq_idx, end_seq_idx):
"""
Clear all anchors from self[start_seq_idx:end_seq_idx+1]
"""
should_emit_signal = False
for seq in self._sequences[start_seq_idx:end_seq_idx + 1]:
if seq in self._anchored_residues:
residues = self._anchored_residues.pop(seq)
if residues:
should_emit_signal = True
if should_emit_signal:
self.signals.anchoredResiduesChanged.emit()
[docs] def removeAnchors(self, residues):
"""
Unanchor residues. If passed reference residues, all residues anchored
to those reference residues will be unanchored. Any given unanchored
residues will be ignored.
:param residues: The residues to unanchor.
:type residues: iterable(residue.Residue)
"""
residues = list(residues)
residues_to_unanchor = []
ref_seq = self.getReferenceSeq()
for res in residues:
if res in ref_seq:
ref_idx = res.idx_in_seq
col_elems = self.getColumn(ref_idx, omit_gaps=True)
residues_to_unanchor.extend(col_elems[1:]) # omit ref res
else:
residues_to_unanchor.append(res)
should_emit_signal = False
if residues_to_unanchor:
for res in residues_to_unanchor:
try:
self._anchored_residues[res.sequence].remove(res)
should_emit_signal = True
except KeyError:
pass
if should_emit_signal:
self.signals.anchoredResiduesChanged.emit()
@msv_utils.const
def _assertCanInsert(self, seq, insert_indices):
"""
Raise an exception if can't insert due to anchor
:param seq: Sequence where insertion will occur
:type seq: sequence.Sequence
:param insert_indices: Indices of insertion (must be non-empty).
:type insert_indices: iterable(int)
"""
if seq == self.getReferenceSeq():
seq_anchors = self.getAnchoredResidues()
else:
seq_anchors = self._anchored_residues.get(seq, [])
if seq_anchors:
first_idx = min(insert_indices)
anchor_ok = all(res.idx_in_seq < first_idx for res in seq_anchors)
if not anchor_ok:
err_msg = ("Adding elements before an anchored residue will "
"break the anchoring.")
raise AnchoredResidueError(err_msg)
############################################################################
# Subalignments
############################################################################
[docs] @msv_utils.const
def getSubalignment(self, start, end):
"""
Return another alignment containing the elements within the specified
start and end indices
:param start: The index at which the subalignment should start
:type start: int
:param end: The index at which the subalignment should end (exclusive)
:type end: int
:return: An alignment corresponding to the start and end points
specified
:rtype: BaseAligment
"""
AlignmentClass = self.__class__
seqs = [seq.getSubsequence(start, end) for seq in self._sequences]
aln = AlignmentClass(seqs)
return aln
[docs] @msv_utils.const
def getDiscontinuousSubalignment(self, indices):
"""
Given a list of indices, return a new alignment of sequences made up of
the residues at those specified indices within this alignment.
:param indices: List of (seq index, residue index) tuples
:type indices: list of (int, int)
:return: A new subalignment
:rtype: BaseAlignment
"""
indices.sort()
res_range = [idx[1] for idx in indices]
min_i = min(res_range)
max_i = max(res_range)
AlignmentClass = self.__class__
seqs = []
for seq_idx, group in itertools.groupby(indices, lambda idx: idx[0]):
seq = self._sequences[seq_idx]
SeqClass = seq.__class__
res_idxs = [idx[1] for idx in group]
res_list = []
for ridx in range(min_i, max_i + 1):
res = seq[ridx] if ridx in res_idxs else None
res_list.append(res)
seqs.append(
SeqClass(res_list,
name=seq.name,
long_name=seq.long_name,
chain=seq.chain))
aln = AlignmentClass(seqs)
return aln
@msv_utils.const
def _assertCanRemoveSubalignment(self, start, end):
removed = []
for seq in self:
removed.extend(seq[start:end])
self._assertCanRemove(removed)
[docs] def removeSubalignment(self, start, end):
"""
Remove a block of the subalignment from the start to end points.
:param start: The start index of the columns to remove
:type start: int
:param end: The end index of the columns to remove (exclusive)
:type end: int
"""
self._assertCanRemoveSubalignment(start, end)
removed = []
for seq in self:
removed.extend(seq[start:end])
with self.suspendAnchors():
self.removeElements(removed)
@property
def is_rectangular(self):
return all(len(seq) == self.num_columns for seq in self)
def _assertRectangular(self, other_aln):
"""
:raises ValueError: if either this or the other alignment are not
rectangular
"""
if not other_aln.is_rectangular:
msg = "The other alignment must be rectangular"
raise ValueError(msg)
if not self.is_rectangular:
msg = "This alignment must be rectangular to modify a subalignment"
raise ValueError(msg)
[docs] def insertSubalignment(self, aln, start):
"""
Insert an alignment into the current alignment at the specified index
:param aln: The alignment to insert
:type aln: `BaseAlignment`
:param start: The index at which to insert the alignment
:type start: int
:raises ValueError: if either alignment is not rectangular
"""
self._insertSubalignment(aln, start, require_rectangular=True)
@msv_utils.const
def _insertSubalignment(self, aln, start, require_rectangular=True):
if len(aln) != len(self):
msg = ("Attempting to insert alignment with %d sequences into an "
"alignment with %d sequences", (len(aln), len(self)))
raise ValueError(msg)
if require_rectangular:
self._assertRectangular(aln)
with self.suspendAnchors():
for seq, insert_seq in zip(self, aln):
self.addElements(seq, start, insert_seq)
[docs] def replaceSubalignment(self, aln, start, end):
"""
Replace a subsection of the alignment indicated by start and end indices
with the specified alignment
:param aln: The alignment to insert
:type aln: `BaseAlignment`
:param start: The starting index of the subsection to replace.
:type start: int
:param end: The ending index of the subsection to replace.
:type end: int
:raises ValueError: if either alignment is not rectangular
"""
self._assertRectangular(aln)
self.removeSubalignment(start, end)
self.insertSubalignment(aln, start)
############################################################################
# Gaps
############################################################################
[docs] @msv_utils.const
def getGaps(self):
"""
Returns a list of list of gaps.
:return: list(list(residue.Gap))
:rtype: list
"""
return [seq.getGaps() for seq in self]
[docs] @msv_utils.const
def getTerminalGaps(self):
"""
Returns the terminal gaps in all the sequences
:rtype: list
:return: list(list(residue.Gap))
"""
return [seq.getTerminalGaps() for seq in self]
[docs] def removeAllGaps(self):
"""
Removes all the gaps of the sequences in the alignment.
"""
elems_to_remove = [elem for seq in self for elem in seq if elem.is_gap]
if not elems_to_remove:
return
if self.getAnchoredResidues():
raise AnchoredResidueError(
"Can't removeAllGaps while there is an "
"anchored residue. Clear all anchors and try "
"again.")
self.signals.residuesAboutToBeRemoved.emit(elems_to_remove)
for seq in self:
seq.removeAllGaps()
self.signals.residuesRemoved.emit()
[docs] def removeTerminalGaps(self):
"""
Removes the gaps from the ends of every sequence in the alignment
"""
for seq in self:
seq.removeTerminalGaps()
@msv_utils.const
def _validateGapBeforeIndices(self, gap_indices):
"""
Check whether inserting the gaps would break anchors. Intended for gap
indices before the insertion, as passed to `addGapsBeforeIndices`.
See `_validateGapIndices` for additional documentation.
"""
self._validateGapIndices(
[sequence.offset_indices(indices) for indices in gap_indices])
@msv_utils.const
def _validateGapIndices(self, gap_indices):
"""
Check whether inserting the gaps would break anchors. Intended for gap
indices after the insertion, as passed to `addGapsByIndices`.
:param gap_indices: A list of lists of gap indices
:type gap_indices: list[list[int]]
:raises ValueError: If `gap_indices` is the wrong length or contains
indices that will be more than one column past the end of a
sequence.
:raises AnchoredResidueError: If any gap index is before an anchored col
"""
n_seqs = len(self)
if len(gap_indices) != n_seqs:
msg = ("The number of gap_indices passed to addGaps "
f"({len(gap_indices)}) should match the number of sequences "
f"in the alignment ({n_seqs})")
raise ValueError(msg)
for cur_gap_indices, seq in zip(gap_indices, self):
seq.validateGapIndices(cur_gap_indices)
seq_idxs_with_gaps = collections.defaultdict(list)
for seq_idx, gaps in enumerate(gap_indices):
for col_idx in gaps:
seq_idxs_with_gaps[col_idx].append(seq_idx)
broken_anchors = set()
n_gaps_per_seq = [0] * n_seqs
unbroken_seq_idxs = set(range(1, n_seqs))
for col_idx, seq_idxs in sorted(seq_idxs_with_gaps.items()):
prev_n_gaps_per_seq = n_gaps_per_seq.copy()
# Update running total of gaps added per seq
for seq_idx in seq_idxs:
n_gaps_per_seq[seq_idx] += 1
n_ref_gaps = n_gaps_per_seq[0]
for seq_idx in sorted(unbroken_seq_idxs):
n_nonref_gaps = n_gaps_per_seq[seq_idx]
# To keep anchors aligned, equal numbers of gaps must be added
# to the ref and non-ref seq.
insertion_allowed = bool(n_nonref_gaps == n_ref_gaps)
if insertion_allowed:
continue
# If the insertion is uneven, all downstream anchors for the
# current non-ref seq will be broken.
unbroken_seq_idxs.remove(seq_idx)
# Account for non-anchor-breaking upstream gap insertions
gap_offset = prev_n_gaps_per_seq[seq_idx]
msg = ("Previous gap offsets must be equal, otherwise this "
"seq was broken in a previous column and shouldn't have "
"been checked again")
assert gap_offset == prev_n_gaps_per_seq[0], msg
offset_col_idx = col_idx - gap_offset
downstream_anchors = self._getDownstreamAnchors(
seq_idx, offset_col_idx)
broken_anchors.update(downstream_anchors)
if broken_anchors:
err_msg = ("Adding elements before an anchored residue will "
"break the anchoring.")
raise AnchoredResidueError(err_msg, broken_anchors)
@msv_utils.const
def _getDownstreamAnchors(self, seq_idx, col_idx):
"""
Get anchors at or downstream of the given column.
:param seq_idx: Index of sequence to get anchors for
:type seq_idx: int
:param col_idx: Index of column to get anchors for
:type col_idx: int
:return: Anchors downstream of `col_idx` in the sequence
:rtype: list[residue.Residue]
"""
seq = self[seq_idx]
anchored_res = self._anchored_residues.get(seq, set())
return [res for res in anchored_res if res.idx_in_seq >= col_idx]
[docs] def addGapsByIndices(self, gap_indices):
"""
Adds gaps to the alignment
:note: the length of the gap_indices list must match the number
of sequences in the alignment.
:param gap_indices: A list of lists of gap indices, one for each
sequence in the alignment. Note that these indices are based on
residue/gap numbering *after* the insertion. To insert gaps using
indices based on numbering before the insertion, see
`addGapsBeforeIndices`.
:type gap_indices: list[list[int]]
:raises ValueError: if `gap_indices` is the wrong length
:raises AnchoredResidueError: if any gap index is before an anchored col
"""
self._validateGapIndices(gap_indices)
for seq, indices in zip(self._sequences, gap_indices):
seq.addGapsByIndices(indices)
[docs] def addGapsBeforeIndices(self, gap_indices):
"""
Add one gap to the alignment before each of the specified residue
positions.
:note: the length of the gap_indices list must match the number
of sequences in the alignment.
:param gap_indices: A list of lists of indices to insert gaps before, one
for each sequence in the alignment. Note that these indices are
based on residue/gap numbering *before* the insertion. To insert
gaps using indices based on numbering after the insertion, see
`addGapsByIndices`.
"""
self._validateGapBeforeIndices(gap_indices)
for seq, cur_indices in zip(self._sequences, gap_indices):
seq.addGapsBeforeIndices(cur_indices)
[docs] def padAlignment(self):
"""
Insert gaps into an alignment so that it forms a rectangular block
"""
pad_gaps = []
length = self.num_columns
for seq in self:
seq_len = len(seq)
gap_locations = list(range(seq_len, length))
pad_gaps.append(gap_locations)
self.addGapsByIndices(pad_gaps)
[docs] @msv_utils.const
def getGapOnlyColumns(self):
"""
For each sequence, return a list of the indices in that sequence for
which the entire alignment contains gaps. (Indices will be omitted for
a sequence if the sequence is shorter than the index.)
:return: List of list of indices
:rtype: list[list[int]]
"""
def all_gaps(col):
# Empty column means all gaps
return len(col) == 0
gap_indices = [
i for (i, col) in enumerate(self.columns(omit_gaps=True))
if all_gaps(col)
]
truncated_indices = []
for seq in self:
# filter out gap indices beyond the length of the seq
seq_gaps = [i for i in gap_indices if i < len(seq)]
truncated_indices.append(seq_gaps)
return truncated_indices
[docs] def minimizeAlignment(self):
"""
Minimizes the alignment, i.e. removes all gaps from the gap-only
columns.
"""
gap_only_columns = self.getGapOnlyColumns()
for seq, indices, in zip(self, gap_only_columns):
# Make sure that we're not requesting the removal of a gap that
# doesn't exist. This can happen when minimizing an alignment
# in which the sequences are different lengths.
gaps = [seq[i] for i in indices if i < len(seq)]
seq.removeElements(gaps)
[docs] @msv_utils.const
def getAlignmentMinimizedWithSpaces(self):
"""
This method returns a new alignment and removes gap only columns
however it leaves one gap column between blocks
:returns: the new, minimized alignment
:rtype: BaseAlignment
"""
new_aln = copy.deepcopy(self)
gap_only_cols = new_aln.getGapOnlyColumns()
# Process each sequence separately, because sequences may have
# unequal number of residues
for seq, gaps in zip(new_aln, gap_only_cols):
if len(gaps):
gap_groups = get_contiguous_groups(gaps)
to_remove = []
# If the alignment has a leading gap-only column, remove it
if gap_groups[0][0] == 0:
to_remove.append(0)
for group in gap_groups:
to_remove.extend(group[1:])
if len(to_remove):
seq.removeElements([seq[i] for i in to_remove])
return new_aln
############################################################################
# Columns
############################################################################
[docs] @msv_utils.const
def getColumn(self, index, omit_gaps=False):
"""
Returns single alignment column at index position. Optionally,
filters out gaps if omit_gaps is True.
:param index: The index in the alignment
:type index: int
:param omit_gaps: Whether to omit the gaps
:type omit_gaps: bool
:return: Single alignment column at index position. Returns None
to represent terminal gaps.
:rtype: tuple(residue.Residue or residue.Gap or None)
"""
column = []
for seq in self:
elem = None
if index < len(seq):
elem = seq[index]
if not omit_gaps or (elem is not None and elem.is_res):
column.append(elem)
return tuple(column)
[docs] @msv_utils.const
def columns(self, omit_gaps=False, *, match_type=False):
"""
A generator over all columns.
:param omit_gaps: Whether to omit gaps
:type omit_gaps: bool
:param match_type: Whether to match reference sequence type
:type match_type: bool
"""
if omit_gaps:
def keep_func(elem):
return elem is not None and elem.is_res
else:
def keep_func(elem):
return elem is not None
if match_type:
seqs = list(self.getSeqsMatchingRefType())
# Make sure we yield a tuple for every column
seqs.append(itertools.repeat(None, self.num_columns))
else:
seqs = self
for col in itertools.zip_longest(*seqs):
yield tuple(elem for elem in col if keep_func(elem))
[docs] @msv_utils.const
def seqMatchesRefType(self, seq):
if self._ref_type_cache is None:
# TODO MSV-1504 once nucleotide sequence doesn't inherit protein
# sequence, use isinstance instead of type check
self._ref_type_cache = type(self.getReferenceSeq())
return type(seq) == self._ref_type_cache
[docs] @msv_utils.const
def getSeqsMatchingRefType(self):
if self._seqs_matching_ref_type_cache is None:
self._seqs_matching_ref_type_cache = tuple(
seq for seq in self if self.seqMatchesRefType(seq))
return self._seqs_matching_ref_type_cache
def _updateColumnSameResCache(self):
"""
Update the cache of whether each column has all the same residue.
False if any residue in the column is unknown or the column is mixed
type (protein/nucleic acids).
"""
self._col_same_res_cache = [False] * self.num_columns
for index, column in enumerate(self.columns(omit_gaps=True)):
if self._getColumnHasSameRes(column):
self._col_same_res_cache[index] = True
def _getColumnHasSameRes(self, column):
col_set = set()
# To make sure we aren't mixing protein and nucleic acid residues
seen_types = set()
for res in column:
if res.type in {residue.UNKNOWN, residue.UNKNOWN_NA}:
return False
seen_types.add(type(res.type))
if len(seen_types) > 1:
return False
if res.type.nonstandard:
col_set.add(res.long_code)
else:
col_set.add(res.short_code)
if len(col_set) > 1:
return False
return len(col_set) == 1
def _updateResidueSimilarityCache(self):
"""
Update the cache of residue similarities
"""
self._res_similarity_cache = {}
ref_seq = self.getReferenceSeq()
if ref_seq is None:
return
for seq in self:
for ref_res, res in itertools.zip_longest(ref_seq, seq):
if not res:
continue
elif not ref_res or ref_res.is_gap or res.is_gap:
sim = ResidueSimilarity.NA
elif res.getIdentityStrict(ref_res):
sim = ResidueSimilarity.Identical
elif res.getBinarySimilarity(ref_res):
sim = ResidueSimilarity.Similar
else:
sim = ResidueSimilarity.Dissimilar
self._res_similarity_cache[res] = sim
[docs] @msv_utils.const
def columnHasAllSameResidues(self, index):
"""
Return whether or not the column at a specified index has all the
same residues (excluding gaps).
Note that if any unknown residues are present, the column will not
be considered to be of all the same residue type.
:param index: Index to check for uniformity
:type index: int
:return: True if the column is of uniform identity, False otherwise.
:rtype: bool
"""
if self._col_same_res_cache is None:
self._updateColumnSameResCache()
return self._col_same_res_cache[index]
[docs] @msv_utils.const
def getResidueSimilarity(self, res):
"""
Return the similarity score of a residue to the current reference
residue at the residues position in the alignment.
:param res: Residue to get the similarity score for
:type res: `residue.Residue`
:return: Similarity score for this residue
:rtype: float or None
"""
if self._res_similarity_cache is None:
self._updateResidueSimilarityCache()
return self._res_similarity_cache[res]
[docs] @msv_utils.const
def elementsToContiguousColumns(self,
elements,
invert=False,
additional_breaks=None,
last_col=None):
"""
Get elements marking contiguous columns containing any of the passed
elements
:param elements: Elements to convert to columns
:type elements: iterable(AbstractSequenceElement)
:param invert: Whether to invert logic (i.e. return columns not
containing the passed elements)
:type invert: bool
:param additional_breaks: If given, contiguous columns will be broken at
the specified indices. I.e., no contiguous set of columns will
contain both column i and column i-1.
:type additional_breaks: list[int] or None
:param last_col: If given, the last column to consider when constructing
contiguous columns. It not given, all columns will be considered.
:type last_col: int or None
:return: [start, end] elements of contiguous columns. Will be from the
ref sequence unless the ref sequence is shorter than `num_columns`
:rtype: iterable(tuple(AbstractSequenceElement,
AbstractSequenceElement))
"""
elements = set(elements)
if additional_breaks is None:
additional_breaks = set()
else:
additional_breaks = set(additional_breaks)
start = None
end = None
columns = itertools.islice(self.columns(), last_col)
for i, column_elems in enumerate(columns):
if i in additional_breaks and start is not None:
yield start, end
start = None
end = None
if invert:
match = all(ele not in elements for ele in column_elems)
else:
match = any(ele in elements for ele in column_elems)
if match:
column_residues = (ele for ele in column_elems if ele.is_res)
cur_ele = next(column_residues, None)
if cur_ele is None:
raise RuntimeError("Call aln.minimizeAlignment first")
if start is None:
start = cur_ele
end = cur_ele
elif start is not None:
yield start, end
start = None
end = None
if start is not None:
yield start, end
############################################################################
# Utilities
############################################################################
[docs] @QtCore.pyqtSlot()
@msv_utils.const
def clearAllCaching(self):
self._resetCaches()
self.annotations.clearAllCaching()
for seq in self._sequences:
seq.clearAllCaching()
@msv_utils.const
def _getMoveSequenceAfterIndices(self, dest_seq, seqs_to_move):
"""
Figure out the rearrangements required to move sequences in the
alignment. Note that the reference sequence cannot be moved by this
method. If present in `seqs_to_move`, it will be ignored.
:param dest_seq: The sequence to place `seqs_to_move` after.
:type dest_seq: sequence.Sequence
:param seqs_to_move: A list of all sequences to move.
:type seqs_to_move: list[sequence.Sequence]
:return: A list of new indices to accomplish the specified move.
:rtype: list[int]
"""
if not seqs_to_move:
raise ValueError("No sequences to move")
dest_index = self.index(dest_seq) + 1
indices_to_move = sorted(map(self.index, seqs_to_move))
if indices_to_move[0] == 0:
# don't allow the reference sequence to be dragged
del indices_to_move[0]
if not indices_to_move:
# Raise an exception if the reference sequence was the *only*
# sequence specified. (Otherwise, we just ignore it.)
raise ValueError("Cannot move the reference sequence")
dest_index -= sum(1 for index in indices_to_move if index < dest_index)
new_indices = [i for i in range(len(self)) if i not in indices_to_move]
new_indices[dest_index:dest_index] = indices_to_move
return new_indices
@msv_utils.const
def _getAddSeqsToAlnSetOrdering(self, seqs_to_add, set_name):
"""
Figure out how to rearrange the alignment to move the specified
sequences into the given alignment set.
:param seqs_to_add: The sequences to add to the alignment set. Must not
be empty and must not contains any sequences that are already in the
alignment set being added to.
:type seqs_to_add: Iterable[sequence.Sequence]
:param set_name: The name of the alignment set to add to
:type set_name: str
:return: A list of new indices to accomplish the desired rearrangement.
:rtype: list[int]
"""
seqs_to_add = set(seqs_to_add)
ref_seq = self.getReferenceSeq()
if set_name in self._name_to_set:
aln_set = self._name_to_set[set_name]
if ref_seq in seqs_to_add:
# We're adding the reference sequence to the set, so move the
# set to the top of the alignment
new_indices = self._alnSetOrderingRefSeq(
ref_seq, seqs_to_add, aln_set)
else:
last_seq_in_set = max(aln_set, key=self.index)
new_indices = self._getMoveSequenceAfterIndices(
last_seq_in_set, seqs_to_add)
else:
# this is a new set, so move all sequences to the location of the
# first added sequence
first_seq_to_add = min(seqs_to_add, key=self.index)
prev_set = self._seq_to_set.get(first_seq_to_add)
if prev_set is None:
seqs_to_add -= {first_seq_to_add}
if seqs_to_add:
new_indices = self._getMoveSequenceAfterIndices(
first_seq_to_add, seqs_to_add)
else:
# we don't need to do any rearranging
new_indices = list(range(len(self)))
elif first_seq_to_add is ref_seq:
# we're changing the set of the reference sequence, so move all
# of the sequences to add to the top of the alignment
new_indices = self._alnSetOrderingRefSeq(
ref_seq, seqs_to_add, [])
else:
# the first added sequence is already in a set, so we need to
# move it to the end of the set it's already in
new_set_indices = set(map(self.index, seqs_to_add))
last_index_from_prev_set = max(map(self.index, prev_set))
new_indices = list(i for i in range(last_index_from_prev_set +
1)
if i not in new_set_indices)
new_indices.extend(sorted(new_set_indices))
already_placed = set(new_indices)
new_indices.extend(
i for i in range(len(self)) if i not in already_placed)
return new_indices
def _alnSetOrderingRefSeq(self, ref_seq, seqs_to_add, aln_set):
"""
Figure out how to rearrange the alignment to move the specified
sequences, which contain the reference sequence, into a new or existing
alignment set.
:param ref_seq: The reference sequence
:type ref_seq: sequence.Sequence
:param seqs_to_add: The sequences to add to the alignment set. Must
contain the reference sequence and must not contains any sequences
that are already in the alignment set being added to.
:type seqs_to_add: Iterable[sequence.Sequence]
:param aln_set: An iterable of sequences already in the alignment set,
if any.
:type aln_set: Iterable(sequence.Sequence)
"""
new_indices = [0]
indices_in_set = sorted(map(self.index, aln_set))
new_indices.extend(indices_in_set)
indices_to_add = sorted(map(self.index, seqs_to_add - {ref_seq}))
new_indices.extend(indices_to_add)
already_placed = set(new_indices)
new_indices.extend(
i for i in range(len(self)) if i not in already_placed)
return new_indices
[docs] def addSeqsToAlnSet(self, seqs, set_name):
"""
Add all given sequences to the specified alignment set (i.e. a named
group of sequences that are always kept together in the alignment).
Sequences already in the set will be ignored. All other sequences will
be moved to the end of the set. (Except for the reference sequence: The
specified set will be moved to the top of the alignment if the reference
sequence is added.)
:param seqs: The sequences to add to the set.
:type seqs: Iterable[sequence.Sequence]
:param set_name: The name of the set to add the sequences to. If no set
of this name exists, one will be created.
:type set_name: str
"""
seqs = set(seqs)
if set_name in self._name_to_set:
seqs.difference_update(self._name_to_set[set_name])
if not seqs:
return
new_indices = self._getAddSeqsToAlnSetOrdering(seqs, set_name)
self.reorderSequences(new_indices)
self._addSeqsToAlnSetNoReordering(seqs, set_name)
def _addSeqsToAlnSetNoReordering(self, seqs, set_name):
"""
Add all given sequences to the specified alignment set (i.e. a named
group of sequences that are always kept together in the alignment).
Sequences will not be moved in the alingment.
:param seqs: The sequences to add to the set.
:type seqs: Iterable[sequence.Sequence]
:param set_name: The name of the set to add the sequences to. If no set
of this name exists, one will be created.
:type set_name: str
"""
alignment_set = self._name_to_set.get(set_name)
if alignment_set is None:
alignment_set = AlignmentSet(set_name, self._set_id_counter)
self._set_id_counter += 1
self._name_to_set[set_name] = alignment_set
for cur_seq in seqs:
old_set = self._seq_to_set.get(cur_seq)
if old_set is alignment_set:
# The sequence is already part of the requested set, so there's
# nothing to do
continue
elif old_set is not None:
self._removeSeqsFromAlnSetNoReordering([cur_seq])
alignment_set.add(cur_seq)
self._seq_to_set[cur_seq] = alignment_set
self.signals.alnSetChanged.emit()
[docs] def removeSeqsFromAlnSet(self, seqs):
"""
Remove all given sequences from any alignment sets they're part of.
Sequences not in a set will be ignored. All other sequences will be
moved to the end of the set that they were in.
:param seqs: The sequences to remove from alignment sets.
:type seqs: Iterable[sequence.Sequence]
"""
# ignore any sequences that aren't in a set
seqs = set(seqs).intersection(self._seq_to_set.keys())
if not seqs:
# nothing to remove
return
reordering = self._getRemoveFromAlnSetOrdering(seqs)
self.reorderSequences(reordering)
self._removeSeqsFromAlnSetNoReordering(seqs)
@msv_utils.const
def _getRemoveFromAlnSetOrdering(self, seqs_to_remove):
"""
Figure out how to rearrange the alignment to move the specified
sequences out of their current alignment set.
:param seqs_to_remove: The sequences to remove from alignment sets.
Must not contain any sequences that are not in an alignment set.
:type seqs_to_remove: Iterable[sequence.Sequence]
:return: A list of new indices to accomplish the desired rearrangement.
:rtype: list[int]
"""
seqs_to_remove = set(seqs_to_remove)
# never move the reference sequence
ordering = [0]
cur_to_remove = []
prev_set = None
for i, cur_seq in enumerate(self[1:], start=1):
cur_set = self.alnSetForSeq(cur_seq)
if prev_set is not None and cur_set is not prev_set:
# We've reached the end of a set, so put all sequences that we
# removed from the set here.
ordering.extend(cur_to_remove)
cur_to_remove.clear()
if cur_seq in seqs_to_remove:
cur_to_remove.append(i)
else:
ordering.append(i)
prev_set = cur_set
ordering.extend(cur_to_remove)
return ordering
def _removeSeqsFromAlnSetNoReordering(self, seqs):
"""
Remove all given sequences from any alignment sets they're part of.
Sequences will not be moved in the alignment.
:param seqs: The sequences to remove from alignment sets.
:type seqs: Iterable[sequence.Sequence]
"""
should_emit_signal = False
for cur_seq in seqs:
alignment_set = self._seq_to_set.get(cur_seq)
if alignment_set is not None:
should_emit_signal = True
del self._seq_to_set[cur_seq]
alignment_set.remove(cur_seq)
if not alignment_set:
# remove the set if it's empty
del self._name_to_set[alignment_set.name]
if should_emit_signal:
self.signals.alnSetChanged.emit()
[docs] def renameAlnSet(self, old_name, new_name):
"""
Rename the specified alignment set.
:param old_name: The old name of the alignment set.
:type old_name: str
:param new_name: The new name of the alignment set.
:type new_name: str
"""
if new_name in self._name_to_set:
raise ValueError(f"Set {new_name} already present.")
try:
alignment_set = self._name_to_set[old_name]
except KeyError:
raise ValueError(f"No set {old_name}")
del self._name_to_set[old_name]
alignment_set.name = new_name
self._name_to_set[new_name] = alignment_set
self.signals.alnSetChanged.emit()
[docs] @msv_utils.const
def alnSetForSeq(self, seq):
"""
Return the alignment set that contains the given sequence.
:param seq: The sequence to retrieve the alignment set for.
:type seq: sequence.Sequence
:return: The requested set. The calling scope must not modify the
returned value. Will return None if `seq` is not part of any set.
:rtype: AlignmentSet or None
"""
return self._seq_to_set.get(seq)
[docs] @msv_utils.const
def hasAlnSets(self):
"""
Does this alignment contain any alignment sets?
:rtype: bool
"""
return bool(self._seq_to_set)
[docs] @msv_utils.const
def alnSetNames(self):
"""
Return all alignment set names.
:rtype: set(str)
"""
return set(self._name_to_set.keys())
[docs] @msv_utils.const
def alnSets(self):
"""
Iterate through all alignment sets.
:return: An iterator through all alignment sets. The calling scope must
not modify any of the sets.
:rtype: dict_keys
"""
return self._name_to_set.values()
[docs] @msv_utils.const
def getAlnSet(self, set_name):
"""
Return the requested set.
:param set_name: The name of the set to retrieve.
:type set_name: str
:return: The requested set. The calling scope must not modify the
returned value.
:rtype: AlignmentSet
:raise ValueError: If no set with the given name was found.
"""
try:
return self._name_to_set[set_name]
except KeyError:
raise ValueError(f"No set {set_name}")
def _getAlnSetSeqIdxs(self, seq):
"""
:return: the indexes of all seqs in `seq`'s aln set, with the idx of `seq` first.
:rtype: list[int]
"""
aln_set = self.alnSetForSeq(seq)
seq_idx = self.index(seq)
indexes = [seq_idx]
if aln_set is not None:
set_idxs = {self.index(s) for s in aln_set}
set_idxs.remove(seq_idx)
indexes.extend(sorted(set_idxs))
return indexes
[docs] def gatherAlnSets(self):
if not self.hasAlnSets():
return
new_seq_ordering = self._getGatherAlnSetsReordering()
self.reorderSequences(new_seq_ordering)
@msv_utils.const
def _getGatherAlnSetsReordering(self):
new_seq_ordering = []
seen_idxs = set()
for idx, seq in enumerate(self):
if idx in seen_idxs:
continue
aln_set_seq_idxs = self._getAlnSetSeqIdxs(seq)
new_seq_ordering.extend(aln_set_seq_idxs)
seen_idxs.update(aln_set_seq_idxs)
return new_seq_ordering
[docs] @msv_utils.const
def getFrequencies(self, normalize=True):
"""
Returns the frequencies of each residue in each column.
Residues are sorted by decreasing frequency.
Gapped positions are not counted when calculating frequencies.
:param normalize: Whether to normalize the values; i.e. divide by the
number of non-gaps in the column
:type normalize: bool
:return: frequencies of each residue in each alignment column
:rtype: tuple(tuple(residue.Residue, float or int)))
"""
for column in self.columns(omit_gaps=True, match_type=True):
if not column:
yield tuple()
continue
# Initialize a list of residue strings to pass to Counter and a
# dict to associate the residue strings with residue objects.
# Standard amino acids are represented using 1 letter code and
# nonstandard amino acids are represented using 3 letter code.
res_str_list = list()
res_str_to_res_dict = dict()
for res in column:
add_to_dict = True
if res.type.nonstandard:
res_str = res.long_code
else:
res_str = res.short_code
prev_res = res_str_to_res_dict.get(res_str)
if (prev_res is not None and
prev_res.type in residue.STD_AMINO_ACIDS):
# Don't overwrite a standard amino acid
add_to_dict = False
res_str_list.append(res_str)
if add_to_dict:
res_str_to_res_dict[res_str] = res
freq_dict = collections.Counter(res_str_list)
# Create a new most_common list containing residue objects
most_common_res = []
for res_str, freq in freq_dict.most_common():
res = res_str_to_res_dict[res_str]
if normalize:
freq = freq / len(column)
most_common_res.append((res, freq))
yield tuple(most_common_res)
[docs] @msv_utils.const
def getResidueSeqProps(self, value_types=None):
"""
Get a list of all sequence properties that any residue has. If
'value_types' is defined, get only the specific property types listed.
:param value_types: list of specific properties types- str, int or
float etc as structure.PROP_STRING,structure.PROP_INTEGER etc
:type value_types: List
:return: All the sequence properties
:rtype: list[properties.SequenceProperty]
"""
seq_props = []
prop_names = set()
desc_names = set()
# Get all atom structure property names
for st in self.all_structures:
if len(st.atom):
prop_names.update(st.getAtomPropertyNames())
# Get all residue descriptor names
for seq in self:
for res in seq:
if res.is_gap:
continue
desc_names.update(res.getDescriptorKeys())
for prop_name in prop_names:
if value_types is not None and prop_name[0] not in value_types:
continue
seq_props.append(
properties.SequenceProperty(
property_name=prop_name,
property_source=properties.PropertySource.Residue,
property_type=properties.PropertyType.StructureProperty,
))
for desc_name in desc_names:
seq_props.append(
properties.SequenceProperty(
property_name=desc_name,
property_source=properties.PropertySource.Residue,
property_type=properties.PropertyType.Descriptor,
))
return seq_props
[docs] @msv_utils.const
def getSeqsDescriptors(self):
"""
Return a list of all the calculated descriptors of the sequences in
the alignment.
:return: All the sequence descriptors
:rtype: list[properties.SequenceProperty]
"""
seq_descriptors = []
names = set()
for seq in self:
for prop_source, prop_value in seq.descriptors.items():
for name, value in prop_value.items():
if name in names:
continue
seq_descriptors.append(
properties.SequenceProperty(
visible=False,
property_name=name,
property_source=prop_source,
property_type=properties.PropertyType.Descriptor))
names.add(name)
return seq_descriptors
@property
def all_structures(self):
"""
Return an iterator over all sequence structures in the alignment. This
does not repeat structures that belong to multiple sequences.
"""
entry_ids = set()
for seq in self:
# Dont return the same structure twice
eid = seq.entry_id
if seq.entry_id in entry_ids:
continue
entry_ids.add(eid)
st = seq.getStructure()
if st is None:
continue
yield st
[docs]class AlignmentSet(set):
"""
A named group of sequences that are always kept together in the alignment.
"""
[docs] def __init__(self, name, set_id):
"""
:param name: The name of the alignment set.
:type name: str
:param set_id: A unique integer ID for the alignment set. Used to
determine the color of the icon and text.
:type set_id: int
"""
super().__init__()
self.name = name
self.set_id = set_id
class _ProteinAlignment(BaseAlignment):
"""
A base class for split-chain and combined-chain protein alignments.
"""
_ALN_ANNOTATION_CLASS = annotation.ProteinAlignmentAnnotations
_SEQ_ANNOTATION_CLASS = annotation.ProteinSequenceAnnotations
@property
def disulfide_bonds(self):
return tuple(set().union(*(set(seq.disulfide_bonds) for seq in self)))
@property
def pred_disulfide_bonds(self):
return tuple(
set().union(*(set(seq.pred_disulfide_bonds) for seq in self)))
@msv_utils.const
def _getInvalidatedBonds(self, seqs):
"""
Return any inter-sequence bonds between a sequence in `seqs` and a
sequence not in `seqs`. In other words, remove any inter-sequence bonds
that will no longer be valid after we remove `seqs` from the alignment.
:param seqs: The sequences to remove inter-sequence bonds from
:type seqs: set[sequence.Sequence]
:return: A tuple of:
- invalidated known disulfide bonds
- invalidated predicted disulfide bonds
:rtype: tuple(list[residue.DisulfideBond], list[residue.DisulfideBond])
"""
raise NotImplementedError
def _restoreInvalidatedBonds(self, invalidated_known_bonds,
invalidated_pred_bonds):
"""
Restore the invalidated bonds returned by `_getInvalidatedBonds`
:param invalidated_known_bonds: Known disulfide bonds to restore
:type invalidated_known_bonds: list[residue.DisulfideBond]
:param invalidated_pred_bonds: Predicted disulfide bonds to restore
:type invalidated_pred_bonds: list[residue.DisulfideBond]
"""
raise NotImplementedError
[docs]class ProteinAlignment(json.JsonableClassMixin, _ProteinAlignment):
[docs] @msv_utils.const
def toJsonImplementation(self):
anchor_res_indices = self._getAnchoredResidueIndices()
interseq_bonds = [
bond for bond in self.disulfide_bonds if bond.is_inter_sequence
]
bond_idxs = [[[self.index(res1.sequence), res1.idx_in_seq],
[self.index(res2.sequence), res2.idx_in_seq]]
for res1, res2 in interseq_bonds]
aln_sets = []
for aln_set in self._name_to_set.values():
seq_indices = [self.index(seq) for seq in aln_set]
aln_sets.append([aln_set.name, aln_set.set_id, seq_indices])
return {
'sequences': self._sequences,
'anchored_residues': anchor_res_indices,
'inter_sequence_bond_idxs': bond_idxs,
'aln_sets': aln_sets,
}
[docs] @classmethod
def fromJsonImplementation(cls, json_obj):
seqs = [
sequence.ProteinSequence.fromJson(json_seq)
for json_seq in json_obj['sequences']
]
aln = cls(seqs)
aln._anchorResidueIndices(json_obj['anchored_residues'])
for bond in json_obj['inter_sequence_bond_idxs']:
seq_idx1, res_idx1 = bond[0]
seq_idx2, res_idx2 = bond[1]
aln.addDisulfideBond(aln[seq_idx1][res_idx1],
aln[seq_idx2][res_idx2])
for set_name, set_id, seq_idxs in json_obj['aln_sets']:
seqs = [aln[idx] for idx in seq_idxs]
aln._addSeqsToAlnSetNoReordering(seqs, set_name)
aln._name_to_set[set_name].set_id = set_id
return aln
[docs] @json.adapter(version=48002)
def adapter48002(cls, json_dict):
json_dict['aln_sets'] = []
return json_dict
[docs] def addDisulfideBond(self, res1, res2, known=True):
"""
Add a disulfide bond if both residues' sequences are in the alignment
:param res1: A residue to link with a disulfide bond
:type res1: residue.Residue
:param res2: Another residue to link with a disulfide bond
:type res2: residue.Residue
:param known: Whether the bond is known or predicted
:type known: bool
:raise ValueError: if either sequence is not in the alignment
"""
seqs = {r.sequence for r in (res1, res2)}
if any(seq not in self for seq in seqs):
raise ValueError(("Cannot add a disulfide bond to residues from "
"sequence(s) that are not in the alignment"))
residue.add_disulfide_bond(res1, res2, known=known)
[docs] def removeDisulfideBond(self, bond):
"""
Disconnect a disulfide bond. The bond may be either known or predicted.
:param bond: The bond to disconnect
:type bond: residue.DisulfideBond
:raise ValueError: if either sequence is not in the alignment
"""
seqs = {r.sequence for r in bond}
if any(seq not in self for seq in seqs):
raise ValueError(
("Cannot remove a disulfide bond from residues from "
"sequence(s) that are not in the alignment"))
residue.remove_disulfide_bond(bond)
def __deepcopy__(self, memo):
aln = super().__deepcopy__(memo)
self._copyInterchainDisulfidesTo(aln)
return aln
@msv_utils.const
def _copyInterchainDisulfidesTo(self, aln):
"""
Copy all interchain disulfide bonds (known and predicted) from this
alignment to the given alignment.
:param aln: The alignment to copy disulfides to.
:type aln: ProteinAlignment
"""
mapped_bonds = [
self._mapResidues(bond, aln)
for bond in self.disulfide_bonds
if bond.is_inter_sequence
]
for res1, res2 in mapped_bonds:
aln.addDisulfideBond(res1, res2)
mapped_pred_bonds = [
self._mapResidues(bond, aln)
for bond in self.pred_disulfide_bonds
if bond.is_inter_sequence
]
for res1, res2 in mapped_pred_bonds:
aln.addDisulfideBond(res1, res2, known=False)
def _additionalSeqRemovalSteps(self, seqs):
# remove invalidated disulfide bonds
invalidated_known_bonds, invalidated_pred_bonds = \
self._getInvalidatedBonds(seqs)
for bond in invalidated_known_bonds + invalidated_pred_bonds:
self.removeDisulfideBond(bond)
@msv_utils.const
def _getInvalidatedBonds(self, seqs):
# See parent class for method documentation
invalidated_known_bonds = self._getInvalidatedBondsFromBondList(
seqs, self.disulfide_bonds)
invalidated_pred_bonds = self._getInvalidatedBondsFromBondList(
seqs, self.pred_disulfide_bonds)
return invalidated_known_bonds, invalidated_pred_bonds
def _getInvalidatedBondsFromBondList(self, seqs, bonds_to_check):
invalidated_bonds = []
for bond in bonds_to_check:
res1, res2 = bond.res_pair
if bool(res1.sequence in seqs) is not bool(res2.sequence in seqs):
invalidated_bonds.append(bond)
return invalidated_bonds
def _restoreInvalidatedBonds(self, invalidated_known_bonds,
invalidated_pred_bonds):
# See parent class for method documentation
for res1, res2 in invalidated_known_bonds:
self.addDisulfideBond(res1, res2)
for res1, res2 in invalidated_pred_bonds:
self.addDisulfideBond(res1, res2, known=False)
############################################################################
# IO Convenience Methods
############################################################################
[docs] @classmethod
def fromStructure(cls, ct, eid=None):
"""
:param ct: The structure to convert
:type ct: schrodinger.structure.Structure
:param eid: The entry id to assign to the created sequences. If not
given, the entry id from the structure, if any, will be used.
:type eid: str
:rtype: cls
:return: An alignment containing the sequences in the structure
"""
from schrodinger.application.msv import seqio
converter = seqio.StructureConverter(ct, eid)
sequences = converter.makeSequences()
return cls(sequences)
[docs] @classmethod
def fromClustalFile(cls, file_name):
"""
Returns alignment read from file in Clustal .aln format preserving
order of sequences.
:type file_name: str
:param file_name: Source file name.
:raises IOError: If output file cannot be read.
:return: An alignment
:note: The alignment can be empty if no sequence was present in the
input file.
"""
from schrodinger.application.msv import seqio
return seqio.ClustalAlignmentReader.read(file_name, cls)
[docs] @msv_utils.const
def toClustalFile(self, file_name, use_unique_names=True):
"""
Writes aln to a Clustal alignment file.
:raises IOError: If output file cannot be written.
:type file_name: str
:param file_name: Destination file name.
:param use_unique_names: If True, write unique name for each sequence.
:type use_unique_names: bool
"""
from schrodinger.application.msv import seqio
return seqio.ClustalAlignmentWriter.write(self, file_name,
use_unique_names)
[docs] @classmethod
def fromFastaFile(cls, file_name):
"""
Returns alignment read from file in Clustal .aln format preserving
order of sequences.
:raises IOError: If the input file cannot be read.
:param file_name: name of input FASTA file
:type file_name: str
:return: Read alignment. The alignment can be empty if no sequence was
present in the input file.
:rtype: ProteinAlignment
"""
from schrodinger.application.msv import seqio
return seqio.FastaAlignmentReader.read(file_name, cls)
[docs] @classmethod
def fromFastaString(cls, lines):
"""
Read sequences from FASTA-formatted text, creates sequences and appends
them to alignment. Splits sequence name from the FASTA header.
:param lines: list of strings representing FASTA file
:type lines: list of str
:return: The alignment
:rtype: ProteinAlignment
"""
from schrodinger.application.msv import seqio
return seqio.FastaAlignmentReader.readFromText(lines, cls)
[docs] @classmethod
def fromFastaStringList(cls, strings):
"""
Return an alignment object created from an iterable of sequence strings
:param strings: Sequences as iterable of strings (1D codes)
:type strings: Iterable of strings
:return: The alignment
:rtype: ProteinAlignment
"""
from schrodinger.application.msv import seqio
return seqio.FastaAlignmentReader.readFromStringList(strings, cls)
[docs] @msv_utils.const
def toFastaString(self, use_unique_names=True, maxl=50):
"""
Convert ProteinAlignment object to list of sequence strings
:param aln: Alignment data
:type aln: `ProteinAlignment`
"""
from schrodinger.application.msv import seqio
return seqio.FastaAlignmentWriter.toString(self, use_unique_names, maxl)
[docs] @msv_utils.const
def toFastaStringList(self):
"""
Convert self to list of fasta sequence strings
:rtype: list
:return: list of str
"""
from schrodinger.application.msv import seqio
return seqio.FastaAlignmentWriter.toStringList(self)
[docs] @msv_utils.const
def toFastaFile(self, file_name, use_unique_names=True, maxl=50):
"""
Write self to specified FASTA file
:raises IOError: If output file cannot be written.
"""
from schrodinger.application.msv import seqio
seqio.FastaAlignmentWriter.write(self, file_name, use_unique_names,
maxl)
############################################################################
# Utilities
############################################################################
[docs] @msv_utils.const
def findPattern(self, pattern):
"""
Finds a specified PROSITE pattern in all sequences.
:param pattern: PROSITE pattern to search in sequences. See
`protein.sequence.find_generalized_pattern` for documentation.
:type pattern: str
:returns: List of matching residues
:rtype: list of `protein.residue.Residue`
"""
# Encode sequences for pattern search
seq_dicts = [
seq.encodeForPatternSearch(with_ss=True,
with_flex=True,
with_asa=True) for seq in self
]
# Perform pattern search
# Insert dashes into uppercase letters (e.g. "CMY" -> "C-M-Y")
if pattern.isalpha() and pattern == pattern.upper():
pattern = "-".join(pattern)
pattern_matches = sequence.find_generalized_pattern(
seq_dicts, str(pattern))
if not pattern_matches:
return []
# Select specified residues
selected_residues = []
for seq_index, match_list in enumerate(pattern_matches):
for start, end in match_list:
matched_seq = self[seq_index]
res_indices = [
i for (i, res) in enumerate(matched_seq) if res.is_res
]
# select residues independent of gaps
for res_index in res_indices[start:end]:
selected_residues.append(matched_seq[res_index])
return selected_residues
[docs]class NucleicAcidAlignment(BaseAlignment):
# TODO (MSV-1504): Create proper nucleic acid domain objects
pass
class _SeqsFromTheSameProtein:
"""
Information about sequences from the same protein. Used in
`CombinedChainProteinAlignment._combineSequences`.
:ivar seqs: A list of list of sequences. No list of sequences contains
multiple sequences with the same chain name.
:vartype seqs: list(list(sequence.Sequence))
:ivar counts: A dictionary of {chain name: number of sequences with that
chain name}
:vartype: Counter
"""
def __init__(self):
self.seqs = []
self.counts = collections.Counter()
[docs]class CombinedChainProteinAlignment(_ProteinAlignment):
"""
An alignment containing combined-chain sequences
(`sequence.CombinedChainProteinSequence` objects).
"""
[docs] def __init__(self, sequences=None, *, chains_to_combine=None):
"""
:param sequences: A list of split-chain or combined-chain sequences to
add to the alignment. If not given, an empty alignment will be
created.
:type sequences: list[sequence.ProteinSequence] or
list[sequence.CombinedChainProteinSequence]
:param chains_to_combine: Information about which split-chain sequences
in `split_undoable_aln` should be included in which combined-chain
sequence. Should be a list of lists of indices. Each index refers
to the split-chain sequence at that position of
`split_undoable_aln`, and split-chain sequences that are listed
together will be combined into the same combined-chain sequence.
Each split-chain sequence from `split_undoable_aln` must be
referenced exactly once.
:type chains_to_combine: list[list[int]]
"""
self._split_seq_cache = None
if chains_to_combine is None:
super().__init__(sequences)
else:
super().__init__()
self._combineAndAddSplitSeqs(sequences, chains_to_combine)
[docs] def addSeqs(self, seqs, start=None):
"""
Add multiple sequences to the alignment. Note that either single-chain
sequences or combined-chain sequences may be added (but not both at the
same time).
:param sequences: Sequences to add
:type sequences: list[sequence.ProteinSequence] or
list[sequence.CombinedChainProteinSequence]
:param start: The index at which to insert; if None, seqs are appended.
Must be None if adding single-chain sequences.
:type start: int
"""
if not seqs:
return []
elif self._isCombinedChainSeq(seqs[0]):
super().addSeqs(seqs, start)
self._split_seq_cache = None
return seqs
elif start is not None:
raise RuntimeError("Cannot specify start when adding split-chain "
"sequences to a combined-chain alignment.")
else:
combined_seqs = self._addSplitSeqs(seqs)
self._split_seq_cache = None
return combined_seqs
@msv_utils.const
def _isCombinedChainSeq(self, seq):
"""
Figure out if the given sequence is a single-chain or a combined-chain
sequence.
:param seq: The sequence to check.
:type seq: sequence.ProteinSequence or
sequence.CombinedChainProteinSequence
:return: True if the sequence is combined-chain. False if it is
single-chain.
:rtype: bool
"""
return isinstance(seq, sequence.CombinedChainProteinSequence)
def _addSplitSeqs(self, split_seqs):
"""
Add single-chain sequences to the alignment. If any of the sequences to
add represent new chains of an existing combined-chain sequence, they
will be added to the existing sequence.
:param split_seqs: The sequences to add
:type split_seqs: list[sequence.ProteinSequence]
:return: A list of all combined-chain sequences (both new and modified)
that contain the added sequences.
:rtype: list[sequence.CombinedChainProteinSequence]
"""
modified_combined_seqs, unmatched_split_seqs = \
self._addNewChainsToExistingSeqs(split_seqs)
if unmatched_split_seqs:
new_combined_seqs = self._combineSequences(unmatched_split_seqs)
super().addSeqs(new_combined_seqs)
return modified_combined_seqs + new_combined_seqs
else:
return modified_combined_seqs
def _addNewChainsToExistingSeqs(self, split_seqs):
"""
Add new single-chain sequences to existing combined-chain sequences.
:param split_seqs: The sequences to add
:type split_seqs: list[sequence.ProteinSequence]
:return: A tuple of:
- The combined-chain sequences that had new chains added
- The single-chain sequence that were not added to any existing
combined-chain sequences.
:rtype: tuple(list[sequence.CombinedChainProteinSequence],
list[sequence.ProteinSequence])
"""
if len(self) == 0:
return [], split_seqs
existing_seqs = {}
for seq in self:
key = (seq.entry_id, seq.name)
existing_seqs.setdefault(key, []).append(seq)
modified_combined_seqs = set()
unmatched_split_seqs = []
for seq in split_seqs:
matching_prots = existing_seqs.get((seq.entry_id, seq.name))
if not matching_prots:
unmatched_split_seqs.append(seq)
continue
for cur_existing_seq in matching_prots:
if not cur_existing_seq.hasChain(seq.chain):
modified_combined_seqs.add(cur_existing_seq)
cur_existing_seq.addChain(seq)
break
else:
unmatched_split_seqs.append(seq)
return list(modified_combined_seqs), unmatched_split_seqs
def _combineSequences(self, aln):
"""
Create combined sequences for all of the given split-chain sequences.
:param aln: The split-chain sequences to combine
:type aln: Iterable(sequence.ProteinSequence)
:return: The combined sequences. Note that this list preserves ordering
from `aln`.
:rtype: list(sequence.CombinedChainProteinSequence)
"""
grouped_seqs = []
seq_data = collections.defaultdict(_SeqsFromTheSameProtein)
for seq in aln:
cur_seq_data = seq_data[seq.entry_id, seq.name]
seq_num = cur_seq_data.counts[seq.chain]
cur_seq_data.counts[seq.chain] += 1
if seq_num == len(cur_seq_data.seqs):
cur_grouped_seqs = []
cur_seq_data.seqs.append(cur_grouped_seqs)
grouped_seqs.append(cur_grouped_seqs)
else:
cur_grouped_seqs = cur_seq_data.seqs[seq_num]
cur_grouped_seqs.append(seq)
return [
sequence.CombinedChainProteinSequence(seqs) for seqs in grouped_seqs
]
def _combineAndAddSplitSeqs(self, chains, indices_to_combine):
"""
Create new combined-chain sequences from the given split-chain sequences
and add the new combined-chain sequences to the alignmment.
:param chains: The split-chain sequences to combine and add to the
aligment.
:type chains: list[sequence.ProteinSequence]
:param indices_to_combine: Information about which chains should be
included in which combined-chain sequence. Should be a list of
lists of indices. Each index refers to the chain at that position
of `chains`, and chains that are listed together will be combined
into the same combined-chain sequence.
:type indices_to_combine: list[list[int]]
"""
combined_seqs = []
for cur_indices in indices_to_combine:
cur_chains = [chains[i] for i in cur_indices]
# make sure that we're only grouping together chains that are from
# the same protein
for chain_to_check in cur_chains[1:]:
assert chain_to_check.name == cur_chains[0].name
assert chain_to_check.entry_id == cur_chains[0].entry_id
seq = sequence.CombinedChainProteinSequence(cur_chains)
combined_seqs.append(seq)
self.addSeqs(combined_seqs)
[docs] def removeSeqs(self, seqs):
"""
Remove multiple sequences from the alignment. Note that either single-
chain sequences or combined-chain sequences may be added (but not both
at the same time).
:param sequences: Sequences to remove
:type sequences: Iterable[sequence.Sequence] or
Iterable[sequence.CombinedChainProteinSequence]
"""
seqs = list(seqs)
if not seqs:
return
elif self._isCombinedChainSeq(seqs[0]):
super().removeSeqs(seqs)
else:
self._removeSplitSeqs(seqs)
self._split_seq_cache = None
def _removeSplitSeqs(self, split_seqs):
"""
Remove single-chain sequences from the alignment. If all chains of a
combined-chain sequence are removed, the combined-chain sequence will be
removed from the alignment.
:param split_seqs: The sequences to remove
:type split_seqs: list[sequence.ProteinSequence]
"""
split_seqs = set(split_seqs)
combined_seqs_to_remove = []
chains_to_remove = []
# Figure out what chains go with which combined-chain sequence
for combined_seq in self:
cur_chains = split_seqs.intersection(combined_seq.chains)
if not cur_chains:
continue
if len(cur_chains) == len(combined_seq.chains):
combined_seqs_to_remove.append(combined_seq)
else:
chains_to_remove.append((combined_seq, cur_chains))
split_seqs -= cur_chains
if split_seqs:
raise ValueError("Cannot remove chains that are not in the "
"alignment")
# actually remove the chains and sequences
super().removeSeqs(combined_seqs_to_remove)
for combined_seq, chains in chains_to_remove:
combined_seq.removeChains(chains)
[docs] @msv_utils.const
def combinedSeqForSplitSeq(self, split_seq):
"""
Get the combined-chain sequence that contains the given split-chain
sequence.
:param split_seq: The split-chain sequence
:type split_seq: sequence.Sequence
:return: The combined-chain sequence
:rtype: sequence.CombinedChainProteinSequence
"""
if self._split_seq_cache is None:
self._split_seq_cache = {
cur_split_seq: combined_seq for combined_seq in self
for cur_split_seq in combined_seq.chains
}
return self._split_seq_cache[split_seq]
[docs] @msv_utils.const
def combinedResForSplitRes(self, split_res):
"""
Get the combined-chain residue for the given split-chain residue.
:param res: The split-chain residue
:type res: residue.AbstractSequenceElement
:return: The combined-chain residue
:rtype: residue.CombinedChainResidueWrapper
"""
combined_seq = self.combinedSeqForSplitSeq(split_res.sequence)
return residue.CombinedChainResidueWrapper(split_res, combined_seq)
@msv_utils.const
def _assertCanMutateResidues(self, seq_i, start, end, elements):
super()._assertCanMutateResidues(seq_i, start, end, elements)
# Make sure that the specified indices don't span a chain break
self[seq_i].assertCanMutateResidues(start, end)
[docs] @msv_utils.const
def getInterChainAnchors(self):
"""
Return all residues that are anchored to a different chain of the
reference sequence (e.g. a residue in the second chain anchored to a
reference residue from the first chain).
:return: The anchored residues.
:rtype: set[residue.Residue]
"""
inter_chain_anchors = set()
ref_seq = self.getReferenceSeq()
for ref_res, anchored_res in self._getAnchorPairs():
ref_chain_index = ref_seq.chains.index(ref_res.split_sequence)
anc_seq_chains = anchored_res.sequence.chains
anc_chain_index = anc_seq_chains.index(anchored_res.split_sequence)
if ref_chain_index != anc_chain_index:
inter_chain_anchors.add(anchored_res)
return inter_chain_anchors
[docs] def alignChainStarts(self):
"""
Align chain starting positions (e.g. make sure that the start of the
N-th chain occurs in the same column for all sequences). This method
will add gaps at the starts and/or ends of chains to preserve anchoring.
:return: A tuple of:
- A list of chain starting indices. This will not include the
starting index of the first chain, which is always 0.
- The starting index of the first chain for which there's no
corresponding reference chain (e.g. the starting index for the
third chain if there are only two chains in the reference
sequence). This will be None if there are no chains without a
corresponding reference chain.
:rtype: tuple(list[int], int or None)
"""
gaps, chain_starts, end_of_ref = self._gapsToAddToAlignChainStarts()
self._addGapsToChainStartsAndEnds(gaps)
return chain_starts, end_of_ref
@msv_utils.const
def _gapsToAddToAlignChainStarts(self):
"""
Figure out how many gaps need to be added to each chain start and end in
order to align the chain starting positions (e.g. make sure that the
start of the N-th chain occurs in the same column for all sequences).
:return: A tuple of:
- The numbers of gaps to add
- A list of chain starting indices. This will not include the
starting index of the first chain, which is always 0.
- The starting index of the first chain for which there's no
corresponding reference chain (e.g. the starting index for the
third chain if there are only two chains in the reference
sequence). This will be None if there are no chains without a
corresponding reference chain.
:rtype: tuple(GapRegionsForSeqs, list[int], int or None)
"""
if len(self) == 0:
return [], [], None
anchor_offsets = self._getAnchorOffsets()
gaps_to_add, chain_lengths = self._gapsToAddFromAnchorOffsets(
anchor_offsets)
chain_starts = list(itertools.accumulate(chain_lengths))[:-1]
num_ref_chains = len(self.getReferenceSeq().chains)
if num_ref_chains - 1 == len(chain_starts):
# There are as many chains in the reference sequence as there are in
# the whole alignment
end_of_ref = None
else:
end_of_ref = chain_starts[num_ref_chains - 1]
return gaps_to_add, chain_starts, end_of_ref
def _getAnchorOffsets(self):
"""
Determine how many residues we would need to offset the start of each
chain relative to the equivalent chain in the reference sequence in
order to preserve anchored residues.
:return: Offset values given as
`anchor_offsets[chain_index][sequence_index] = offset`. Note that
the chain index comes first, not the sequence index. If a given
chain doesn't contain any anchored residues, the offset will be
zero.
:rtype: list[list[int]]
:raise AnchoredResidueError: If any inter-chain anchors are present.
See `getInterChainAnchors` for additional information.
"""
num_chains = max(len(seq.chains) for seq in self)
anchor_offsets = [[0] * len(self) for _ in range(num_chains)]
ref_seq = self.getReferenceSeq()
for ref_res, anchored_res in self._getAnchorPairs():
ref_chain_index = ref_seq.chains.index(ref_res.split_sequence)
ref_res_index = ref_res.split_res.idx_in_seq
anc_seq = anchored_res.sequence
anc_seq_index = self.index(anc_seq)
anc_chain_index = anc_seq.chains.index(anchored_res.split_sequence)
anc_res_index = anchored_res.split_res.idx_in_seq
if ref_chain_index != anc_chain_index:
raise AnchoredResidueError()
cur_offset = ref_res_index - anc_res_index
anchor_offsets[anc_chain_index][anc_seq_index] = cur_offset
return anchor_offsets
def _gapsToAddFromAnchorOffsets(self, anchor_offsets):
"""
Using the anchor offsets returned by `_getAnchorOffsets`, figure out how
many gaps need to be added to each chain start and end in order to align
the chain starting positions while preserving anchored residues.
:param anchor_offsets: How many residues we need to offset the start of
each chain relative to the equivalent chain in the reference
sequence in order to preserve anchored residues.
:type anchor_offsets: list[list[int]]
:return: A tuple of:
- The number of gaps to add to the start and end of each chain
- The length that each chain will be after the gaps have been added.
:rtype: tuple(GapRegionsForSeqs, list[int])
"""
gaps_to_add = [[None] * len(seq.chains) for seq in self]
chain_lengths = []
for chain_i, (chain_anchor_offsets, chains) in enumerate(
zip(anchor_offsets,
itertools.zip_longest(*(seq.chains for seq in self)))):
# figure out how many gaps we're going to have to add before the
# start of this chain in the reference sequence
ref_start_gaps = -min(
(offset for offset in chain_anchor_offsets if offset < 0),
default=0)
# figure out how many columns this chain is going to take up
max_chain_length = max(ref_start_gaps + cur_anchor_offset +
len(cur_chain)
for cur_chain, cur_anchor_offset in zip(
chains, chain_anchor_offsets)
if cur_chain is not None)
chain_lengths.append(max_chain_length)
for seq_i, (cur_seq, cur_chain, cur_anchor_offset) in enumerate(
zip(self, chains, chain_anchor_offsets)):
if cur_chain is None:
# this sequence doesn't have a chain in this position
continue
start_gaps = ref_start_gaps + cur_anchor_offset
if chain_i < len(cur_seq.chains) - 1:
end_gaps = max_chain_length - len(cur_chain) - start_gaps
else:
# there's no need to add gaps to the end of the sequence
end_gaps = 0
gaps_to_add[seq_i][chain_i] = sequence.GapRegion(
from_start=start_gaps, from_end=end_gaps)
return gaps_to_add, chain_lengths
def _addGapsToChainStartsAndEnds(self, gaps: sequence.GapRegionsForSeqs):
"""
Add the specified numbers of gaps to the starts and ends of each chain
in each sequence.
:param gaps: The numbers of gaps to add
"""
assert len(gaps) == len(self)
with self.suspendAnchors():
for seq, cur_gaps in zip(self, gaps):
seq.addGapsToChainStartsAndEnds(cur_gaps)
def _removeGapsFromChainStartsAndEnds(self,
gaps: sequence.GapRegionsForSeqs):
"""
Remove the specified numbers of gaps from the starts and ends of each
chain in each sequence.
:param gaps: The numbers of gaps to remove
"""
assert len(gaps) == len(self)
with self.suspendAnchors():
for seq, cur_gaps in zip(self, gaps):
seq.removeGapsFromChainStartsAndEnds(cur_gaps)
[docs] def adjustChainStarts(self, adjust_by):
"""
Move each chain break position to the right by the specified number of
gaps. Note that chain breaks can only be moved along gaps, not
residues.
:param num_gaps: The number of gaps to move each chain break by, given
as `adjust_by[sequence_index][chain_break_index] = adjustment`.
Note that no adjustment is given for the start of the first chain or
the end of the last chain.
:type num_gaps: list[list[int]]
:raise AssertionError: If some of the sequence elements to be removed
aren't actually gaps.
"""
gaps_to_remove, gaps_to_add = self._adjustChainStartsToGaps(adjust_by)
with self.suspendAnchors():
self._removeGapsFromChainStartsAndEnds(gaps_to_remove)
self._addGapsToChainStartsAndEnds(gaps_to_add)
@msv_utils.const
def _adjustChainStartsToGaps(self, adjust_by):
"""
Convert the value passed to `adjustChainStarts` into values that can be
passed to `_removeGapsFromChainStartsAndEnds` and
`_addGapsToChainStartsAndEnds`.
:param num_gaps: The number of gaps to move each chain break by, given
as `adjust_by[sequence_index][chain_break_index] = adjustment`.
Note that no adjustment is given for the start of the first chain or
the end of the last chain.
:type num_gaps: list[list[int]]
:return: A tuple of gaps to remove and gaps to add.
:rtype: tuple(list[list[tuple(int, int)]], list[list[tuple(int, int)]])
"""
gaps_to_remove = []
gaps_to_add = []
for seq_adjust_by in adjust_by:
seq_gaps_to_remove = [sequence.GapRegion(0, 0)]
seq_gaps_to_add = []
for chain_adjust_by in seq_adjust_by:
seq_gaps_to_add.append(sequence.GapRegion(0, chain_adjust_by))
seq_gaps_to_remove.append(sequence.GapRegion(
chain_adjust_by, 0))
seq_gaps_to_add.append(sequence.GapRegion(0, 0))
gaps_to_remove.append(seq_gaps_to_remove)
gaps_to_add.append(seq_gaps_to_add)
return gaps_to_remove, gaps_to_add
def _addElementByChain(self, seq, index, chain, element):
"""
Add the given element to the specified chain of a sequence.
:param seq: The sequence to insert into.
:type seq: sequence.CombinedChainProteinSequence
:param index: The index to insert the element at. Note that this is the
index in `seq`, not `chain`.
:type index: int
:param chain: The chain to insert into
:type chain: sequence.ProteinSequence
:param element: The element to insert
:type element: str or residue.AbstractSequenceElement
"""
self._assertCanInsert(seq, [index])
seq.insertElementByChain(index, chain, element)
self.signals.residuesAdded.emit([element])
@msv_utils.const
def _getInvalidatedBonds(self, seqs):
# See parent class for method documentation
# This method only cares about inter-sequence disulfides, which can't
# happen in a combined-chain alignment. Inter-chain disulfides are
# always between chains in the same entry, so they'll always be within a
# single sequence in a combined-chain alignment.
return [], []
def _restoreInvalidatedBonds(self, invalidated_known_bonds,
invalidated_pred_bonds):
# See parent class for method documentation
# This method isn't needed for combined-chain alignments because
# _getInvalidatedBonds always returns empty lists
pass
[docs]def get_contiguous_groups(nums):
"""
Group numbers in a given list by contiguity. Each group that is returned
will be a list of numbers where every value is an int that only differs
from its neighbors by one.
e.g. [1, 2, 4] -> [[1, 2], [4]]
[1, 2, 4, 5, 10] -> [[1, 2], [4, 5], [10]]
:param nums: A list of numbers to group
:type nums: list(int)
:return: A list of groups of numbers, where the numbers in each group are
contiguous
:rtype: list(list(int))
"""
assert len(nums) > 0
nums.sort()
groups = []
start = nums[0]
while start <= nums[-1]:
group = []
for num in range(start, nums[-1] + 1):
start = num + 1
if num not in nums:
break
group.append(num)
if len(group):
groups.append(group)
return groups