"""
Classes for working with sequences containing alignment information (gaps) and
collections thereof.
Copyright Schrodinger, LLC. All rights reserved.
"""
import collections
import contextlib
import copy
import enum
import itertools
import types
from bisect import bisect
from schrodinger.application.msv import utils as msv_utils
from schrodinger.models import json
from schrodinger.protein import annotation
from schrodinger.protein import properties
from schrodinger.protein import residue
from schrodinger.protein import sequence
from schrodinger.Qt import QtCore
ResidueSimilarity = enum.Enum('ResidueSimilarity',
                              ['Identical', 'Similar', 'Dissimilar', 'NA'])
[docs]class AnchoredResidueError(RuntimeError):
    """
    Exception to indicate that an action would break anchors.
    When possible, the specific anchors that block the action are stored on the
    exception instance.
    :ivar blocking_anchors: Anchors that block the action or `ALL_ANCHORS` if
        all anchors block the action.
    :vartype blocking_anchors: list[residue.Residue] or object
    """
    ALL_ANCHORS = object()
[docs]    def __init__(self, message=None, blocking_anchors=ALL_ANCHORS):
        """
        :param message: Exception message
        :type message: str
        :param blocking_anchors: Anchored residues that block the action
        :type blocking_anchors: list[residue.Residue]
        """
        super().__init__(message)
        self.blocking_anchors = blocking_anchors  
[docs]class StructuredResidueError(RuntimeError):
    pass 
[docs]class AlignmentSignals(QtCore.QObject):
    """
    A collection of signals that can be emitted by an alignment
    :ivar domainsChanged: TODO
    :vartype domainsChanged: `QtCore.pyqtSignal`
    :ivar sequencesAboutToBeInserted: A signal emitted before sequences are
        inserted into the alignment.  Emitted with: (The index of the first
        sequence to be inserted, The index of the last sequence to be inserted)
    :vartype sequencesAboutToBeInserted: `QtCore.pyqtSignal`
    :ivar sequencesInserted: A signal emitted after sequences are
        inserted into the alignment.  Emitted with: (The index of the first
        sequence inserted, The index of the last sequence inserted)
    :vartype sequencesInserted: `QtCore.pyqtSignal`
    :ivar sequencesAboutToBeRemoved: A signal emitted before sequences are
        removed from the alignment.  Emitted with: (The index of the first
        sequence to be removed, The index of the last sequence to be removed)
    :vartype sequencesAboutToBeRemoved: `QtCore.pyqtSignal`
    :ivar sequencesRemoved: A signal emitted after sequences are
        removed from the alignment.  Emitted with: (The index of the first
        sequence removed, The index of the last sequence removed)
    :vartype sequencesRemoved: `QtCore.pyqtSignal`
    :ivar sequenceResiduesChanged: A signal emitted after the contents of a
        sequence have changed.  Note that this signal may also be emitted in
        response to a sequence changing length, as positions in the alignment
        may switch from blank to occupied or vice versa.
    :vartype sequenceResiduesChanged: `QtCore.pyqtSignal`
    :ivar sequencesAboutToBeReordered: Signal emitted before reordering
        sequences
    :type: sequencesAboutToBeReordered: `QtCore.pyqtSignals`
    :ivar sequencesReordered: Signal emitted after sequences have been reordered
    :type: sequencesReordered: `QtCore.pyqtSignals`
    :ivar sequenceNameChanged: A signal emitted after a sequence has changed
        names.  Emitted with: (The modified sequence)
    :vartype sequenceNameChanged: `QtCore.pyqtSignal`
    :ivar annotationTitleChanged: A signal emitted after a sequence's annotation
        has changed titles. Emitted with: (The sequence whose annotation title
        has been modified)
    :vartype annotationTitleChanged: `QtCore.pyqtSignal`
    :ivar alignmentNumColumnsAboutToChange: A signal emitted before the
        alignment changes length.  Emitted with: (The current length of the
        alignment, The new length of the alignment)
    :vartype alignmentNumColumnsAboutToChange: `QtCore.pyqtSignal`
    :ivar alignmentNumColumnsChanged: A signal emitted after the alignment
        changes length. Emitted with: (The old length of the alignment, The
        current length of the alignment)
    :vartype alignmentNumColumnsChanged: `QtCore.pyqtSignal`
    :ivar residuesAboutToBeRemoved: A signal emitted before residues are to be
        removed. Emitted with a list of the residues to be removed.
    :vartype residuesAboutToBeRemoved: `QtCore.pyqtSignal`
    :ivar residuesRemoved: A signal emitted after residues are removed.  This
        signal is not emitted with any parameters, but the residues that were
        removed were listed with the corresponding residuesAboutToBeRemoved
        signal.
    :vartype residuesRemoved: `QtCore.pyqtSignal`
    :ivar residuesAdded: A signal emitted with added residues. Note that this
        signal will be only be emitted once even if residues are added to
        multiple sequences. In addition, each individual sequence will emit a
        lengthChanged signal.
    :vartype residuesAdded: `QtCore.pyqtSignal`
    :ivar sequenceVisibilityChanged: A signal emitted when visibility of a
        sequence changes.  Emitted with: (the sequence whose visibility is
        changing, the index of the sequence)
    :vartype sequenceVisibilityChanged: `QtCore.pyqtSignal`
    :ivar sequenceStructureChanged: A signal emitted when structure of a
        sequence changes.  Emitted with: (the sequence whose visibility is
        changing, the index of the sequence)
    :vartype sequenceStructureChanged: `QtCore.pyqtSignal`
    :ivar alignmentAboutToBeCleared: A signal emitted just before all sequences
        are removed from the alignment.
    :vartype alignmentAboutToBeCleared: `QtCore.pyqtSignal`
    :ivar alignmentCleared: A signal emitted just after all sequences have been
        removed from the alignment.
    :vartype alignmentCleared: `QtCore.pyqtSignal`
    :ivar anchoredResiduesChanged: A signal emitted when one or more residues
        are anchored or unanchored.
    :vartype alignmentCleared: `QtCore.pyqtSignal`
    :ivar alnSetsChanged: A signal emitted when the alignment set for one or
        more sequences changes.
    :vartype alnSetsChanged: QtCore.pyqtSignal
    """
    domainsChanged = QtCore.pyqtSignal()
    invalidatedDomains = QtCore.pyqtSignal()
    descriptorsCleared = QtCore.pyqtSignal()
    sequencesAboutToBeInserted = QtCore.pyqtSignal(int, int)
    sequencesInserted = QtCore.pyqtSignal(int, int)
    sequencesAboutToBeRemoved = QtCore.pyqtSignal(int, int)
    sequencesRemoved = QtCore.pyqtSignal(int, int)
    sequenceResiduesChanged = QtCore.pyqtSignal()
    sequencesAboutToBeReordered = QtCore.pyqtSignal()
    sequencesReordered = QtCore.pyqtSignal(list)
    sequenceNameChanged = QtCore.pyqtSignal(sequence.Sequence)
    annotationTitleChanged = QtCore.pyqtSignal(sequence.Sequence)
    alignmentNumColumnsAboutToChange = QtCore.pyqtSignal(int, int)
    alignmentNumColumnsChanged = QtCore.pyqtSignal(int, int)
    residuesAboutToBeRemoved = QtCore.pyqtSignal(list)
    residuesRemoved = QtCore.pyqtSignal()
    residuesAdded = QtCore.pyqtSignal(list)
    sequenceVisibilityChanged = QtCore.pyqtSignal(sequence.Sequence, int)
    sequenceStructureChanged = QtCore.pyqtSignal(sequence.Sequence, int)
    alignmentAboutToBeCleared = QtCore.pyqtSignal()
    alignmentCleared = QtCore.pyqtSignal()
    anchoredResiduesChanged = QtCore.pyqtSignal()
    alnSetChanged = QtCore.pyqtSignal()
    secondaryStructureChanged = QtCore.pyqtSignal()
    predictionsChanged = QtCore.pyqtSignal()
    pfamChanged = QtCore.pyqtSignal()
    kinaseFeaturesChanged = QtCore.pyqtSignal()
    kinaseConservationChanged = QtCore.pyqtSignal()
    @property
    def aln(self):
        """
        Return the alignment that this signals object is reporting for.
        :rtype: BaseAlignment
        """
        return self.parent()
[docs]    @QtCore.pyqtSlot()
    def emitSeqResChanged(self):
        # We use self.sender() to retrieve the sequence instead of using a
        # functools.partial as a slot due to a memory leak (MSV-1538)
        self.sequenceResiduesChanged.emit() 
[docs]    @QtCore.pyqtSlot()
    def emitSeqNameChanged(self):
        # We use self.sender() to retrieve the sequence instead of using a
        # functools.partial as a slot due to a memory leak (MSV-1538)
        self.sequenceNameChanged.emit(self.sender()) 
[docs]    @QtCore.pyqtSlot()
    def emitAnnTitleChanged(self):
        # We use self.sender() to retrieve the sequence instead of using a
        # functools.partial as a slot due to a memory leak (MSV-1538)
        self.annotationTitleChanged.emit(self.sender()) 
[docs]    def allSignals(self):
        """
        Iterate over all signals in this object in alphabetical order.
        :rtype: Iter(QtCore.pyqtBoundSignal)
        """
        for signal, signal_name in self.allSignalsAndNames():
            yield signal 
[docs]    def allSignalsAndNames(self):
        """
        Iterate over all signals in this object and their names in alphabetical
        order.
        :rtype: Iter(tuple(QtCore.pyqtBoundSignal, str))
        """
        for attr_name in sorted(dir(self)):
            attr = getattr(self, attr_name)
            if isinstance(attr, QtCore.pyqtBoundSignal):
                yield attr, attr_name  
[docs]class BaseAlignment(QtCore.QObject):
    """
    Abstract base class for classes which handle alignment of various sequences
    and corresponding annotations.
    This is a pure domain object intended to make it easy to work with aligned
    collections of sequences.
    Some methods are decorated with @msv_utils.const in order to make it easy to
    write a wrapper for this class that supports undo/redo operations.
    :cvar _ALN_ANNOTATION_CLASS: The class for alignment annotations.  This
        value should be overridden in subclasses.
    :vartype _ALN_ANNOTATION_CLASS: type
    :cvar _SEQ_ANNOTATION_CLASS: The class for sequence annotations.  This
        value should be overridden in subclasses.
    :vartype _SEQ_ANNOTATION_CLASS: type
    """
    _ALN_ANNOTATION_CLASS = None
    _SEQ_ANNOTATION_CLASS = None
[docs]    def __init__(self, sequences=None):
        """
        :param sequences: An optional iterable of sequences
        :type sequences: list
        """
        super().__init__()
        # Count for number of times we've entered a suspendAnchors context.
        self._suspend_anchors_count = 0
        self._modifying_structure = False
        self.signals = AlignmentSignals(self)
        self._sequences = []
        self._anchored_residues = collections.defaultdict(set)
        self._name_to_set = {}
        self._seq_to_set = {}
        self._set_id_counter = 0
        self._num_columns = 0
        self._seq_positions_cache = {}
        self._col_same_res_cache = None
        self._res_similarity_cache = None
        self._ref_type_cache = None
        self._seqs_matching_ref_type_cache = None
        self._RO_anchored_residues = None  # read-only set of anchored residues
        # read-only set of anchored residue plus the corresponding reference
        # residues
        self._RO_anchored_residues_with_ref = None
        if self._ALN_ANNOTATION_CLASS is None:
            self._annotations = None
            self._global_annotations = []
        else:
            self._annotations = self._ALN_ANNOTATION_CLASS(self)
            self._global_annotations = \
                
self._ALN_ANNOTATION_CLASS.ANNOTATION_TYPES
        if self._SEQ_ANNOTATION_CLASS is None:
            self._seq_annotations = []
        else:
            self._seq_annotations = self._SEQ_ANNOTATION_CLASS.ANNOTATION_TYPES
        self.signals.sequencesAboutToBeRemoved.connect(
            self._clearAnchorsFromSeqs)
        self.signals.anchoredResiduesChanged.connect(self.clearAllCaching)
        cache_invalidating_signals = [
            self.signals.sequencesRemoved, self.signals.anchoredResiduesChanged,
            self.signals.sequencesReordered,
            self.signals.alignmentNumColumnsChanged,
            self.signals.alignmentNumColumnsAboutToChange,
            self.signals.sequenceResiduesChanged,
            self.signals.sequencesInserted, self.signals.residuesRemoved,
            self.signals.residuesAdded
        ]
        for signal in cache_invalidating_signals:
            signal.connect(self._resetCaches)
        for signal in (self.signals.sequencesRemoved,
                       self.signals.sequencesInserted,
                       self.signals.sequencesReordered):
            signal.connect(self._resetSeqTypeCache)
        if sequences is not None:
            if any(isinstance(seq, str) for seq in sequences):
                sequences = [
                    sequence.ProteinSequence(seq_str) if isinstance(
                        seq_str, str) else seq_str for seq_str in sequences
                ]
            self.addSeqs(sequences) 
    ############################################################################
    # Magic methods
    ############################################################################
[docs]    @msv_utils.const
    def __len__(self):
        """
        Returns the number of sequences in the alignment
        """
        return len(self._sequences) 
    @msv_utils.const
    def __iter__(self):
        """
        Returns an iterable of the sequences held in the alignment
        """
        return iter(self._sequences)
[docs]    @msv_utils.const
    def __contains__(self, seq):
        """
        Returns whether the sequence is present in the alignment
        """
        return seq in self._sequences 
    @msv_utils.const
    def __getitem__(self, index):
        """
        Returns the sequence at the index in the alignment
        """
        try:
            return self._sequences[index]
        except IndexError:
            msg = ("index (%d) out of range (alignment length is only %d)" %
                   (index, len(self)))
            raise IndexError(msg)
    # TODO MSV-1907: Move magic methods into one contiguous block.
    @msv_utils.const
    def _getTextFormattedAlignment(self):
        """
        Returns a formatted list of strings with detailed alignment information
        """
        element_pad = 3
        v_header_pad = 15
        # pad all the strings in the list by a given amount
        def pad_list(l, amount):
            return [i.ljust(amount) for i in l]
        seq_data = collections.OrderedDict()
        ruler = [str(i) for i in range(self._num_columns)]
        ruler = pad_list(ruler, element_pad)
        seq_data["Resnum:"] = ruler
        for num, seq in enumerate(self, 1):
            name = seq.name
            if not name:
                name = "Seq" + str(num)
            name += ":"
            elements = []
            for res in seq:
                if res.is_gap:
                    res_str = seq.gap_char
                else:
                    res_str = str(res)
                if not res.hasStructure() and res.is_res:
                    res_str = residue.box_letter(res_str)
                elements.append(res_str)
            elements = pad_list(elements, element_pad)
            seq_data[name] = elements
        to_format = []
        # break every stride elements and start a new line. This keeps lines
        # about 75 characters long
        stride = 20
        for i in range(0, self._num_columns, stride):
            if i != 0:
                to_format.append("\n...cont...\n")
            for title, rows in seq_data.items():
                if len(rows) < self._num_columns:
                    # prevent index errors
                    padding_needed = self._num_columns - len(rows)
                    rows += ["".ljust(element_pad)] * padding_needed
                line = title.ljust(v_header_pad) + "".join(rows[i:i + stride])
                to_format.append(line)
        return to_format
    @msv_utils.const
    def _getAlignmentDescription(self):
        """
        Returns a formatted description of the alignment
        """
        seq_lengths = ", ".join([str(len(seq)) for seq in self])
        class_name = self.__class__.__name__
        num_seqs = str(len(self))
        info = "{class_name} containing {num_seqs} sequences"
        if len(self) > 0:
            info += " (of respective lengths: {lengths})"
        info = info.format(class_name=class_name,
                           num_seqs=num_seqs,
                           lengths=seq_lengths)
        return info
    @msv_utils.const
    def __str__(self):
        """
        Returns a str representation of the alignment
        This is fairly detailed since it is very useful for debugging
        """
        to_format = self._getTextFormattedAlignment()
        info = self._getAlignmentDescription()
        return info + "\n\n" + "\n".join(to_format)
    def __repr__(self):
        """
        :rtype: str
        :return: A str representation of the alignment
        """
        return self._getRepr(type(self).__qualname__)
    def _getRepr(self, classname):
        aln_repr = [
            classname, "([", ", ".join([repr(seq) for seq in self]), "])"
        ]
        return "".join(aln_repr)
    def __deepcopy__(self, memo):
        """
        We should be able to copy an alignment, getting back an alignment with
        all the essential data in the original alignment and sharing no
        references with it
        """
        sequences = copy.deepcopy(self._sequences, memo)
        Cls = self.__class__
        aln = Cls(sequences)
        self._copyAnchoringTo(aln)
        self._copyAlnSetsTo(aln)
        return aln
    @msv_utils.const
    def _copyAnchoringTo(self, aln):
        """
        Copy all anchored residues from this alignment to the given alignment.
        :param aln: The alignment to copy anchoring to.
        :type aln: BaseAlignment
        """
        anchored_residues = self.getAnchoredResidues()
        new_anchored_res = self._mapResidues(anchored_residues, aln)
        aln.anchorResidues(new_anchored_res)
    @msv_utils.const
    def _copyAlnSetsTo(self, aln):
        """
        Copy all alignment sets from this alignment to the given alignment.
        :param aln: The alignment to copy alignment sets to.
        :type aln: BaseAlignment
        """
        for aln_set in self._name_to_set.values():
            copied_seqs = [aln[self.index(seq)] for seq in aln_set]
            aln._addSeqsToAlnSetNoReordering(copied_seqs, aln_set.name)
            # make sure that the set ids match so that the colors are the same
            aln._name_to_set[aln_set.name].set_id = aln_set.set_id
        aln._set_id_counter = self._set_id_counter
    @msv_utils.const
    def _mapResidues(self, og_residues, new_alignment):
        """
        Given residues and a new alignment, get the residues in the same
        position in the new alignment.
        """
        return [
            new_alignment[self.index(res.sequence)][res.idx_in_seq]
            for res in og_residues
        ]
    ############################################################################
    # Annotations
    ############################################################################
    @property
    def annotations(self):
        return self._annotations
    @property
    def global_annotations(self):
        """
        Returns the alignment-level annotations available for the alignment
        """
        return self._global_annotations
    @property
    def seq_annotations(self):
        """
        Returns the sequence-level annotations available for sequences held in
        the alignment
        """
        return self._seq_annotations
[docs]    @msv_utils.const
    def getGlobalAnnotationData(self, index, annotation):
        """
        Returns column-level annotation data at an index in the alignment
        :param index: The index in the alignment
        :type index: int
        :param annotation: An enum representing the requested annotation, if any
        :type annotation: `enum.Enum`
        """
        annotation_name = annotation.name
        data_array = getattr(self.annotations, annotation_name)
        try:
            return data_array[index]
        except IndexError:
            return 
    ############################################################################
    # Sequences and signals
    ############################################################################
    @property
    def num_columns(self):
        return self._num_columns
[docs]    @msv_utils.const
    def getWorkspaceCounts(self):
        """
        Summarize the visibility status of the alignment's sequences
        :returns: Counts of each type of visibility
        :rtype: collections.Counter
        """
        return collections.Counter(seq.visibility for seq in self) 
    @QtCore.pyqtSlot()
    @msv_utils.const
    def _sequenceVisibilityChanged(self):
        """
        Notify any listeners that a request to change visibility of
        an entire sequence has been made.
        """
        # We use self.sender() to retrieve the sequence instead of using a
        # functools.partial as a slot due to a memory leak (MSV-1538)
        seq = self.sender()
        self.signals.sequenceVisibilityChanged.emit(seq, self.index(seq))
    @QtCore.pyqtSlot()
    @msv_utils.const
    def _sequenceStructureChanged(self):
        """
        Notify any listeners that the structure corresponding
        to a sequence has changed.
        """
        # We use self.sender() to retrieve the sequence instead of using a
        # functools.partial as a slot due to a memory leak (MSV-1538)
        seq = self.sender()
        self.signals.sequenceStructureChanged.emit(seq, self.index(seq))
[docs]    @msv_utils.const
    def index(self, seq):
        """
        Returns the index of the specified sequence.
        :param seq: The requested sequence
        :type seq:  `sequence.Sequence`
        :rtype: int
        :return: The index of the requested sequence
        """
        return self._seq_positions[seq] 
[docs]    def reorderSequences(self, seq_indices):
        """
        Reorder the sequences in the alignment using the specified list of
        indices.
        In the undoable version of this class, the private function is needed to
        perform the operation in an undoable operation.
        :param seq_indices: A list with the new indices for sequences
        :type: list of int
        :raises ValueError: In the event that the list of indices does not match
            the length of the alignment
        """
        if len(seq_indices) != len(self):
            msg = ("The number of elements in seq_indices should match the "
                   "number of sequences in the alignment")
            raise ValueError(msg)
        self.signals.sequencesAboutToBeReordered.emit()
        new_sequences = [self._sequences[i] for i in seq_indices]
        self._sequences = new_sequences
        self.signals.sequencesReordered.emit(seq_indices) 
[docs]    def sortByProperty(self, seq_prop, reverse=False):
        """
        Sort the sequences by a sequence property. Sequences that do not have
        the sequence property defined will be grouped at the end of the
        alignment (regardless of `reverse`)
        :type seq_prop: properties.SequenceProperty
        """
        sort_indices = self._getSortByPropertyIndices(seq_prop, reverse=reverse)
        self.reorderSequences(sort_indices) 
    def _getSortByPropertyIndices(self, seq_prop, reverse=False):
        def sort_key(seq):
            val = seq.getProperty(seq_prop)
            if val is None:
                val = float('inf')
                if reverse:
                    val = -float('inf')
            return val
        return self._getSortIndices(key=sort_key, reverse=reverse)
[docs]    def sort(self, *, key, reverse=False):
        """
        Sort the alignment by the specified criteria.
        NOTE: Query sequence is not included in the sort.
        :param key: A function that takes a sequence and returns a value to sort
                    by for each sequence. (required keyword-only argument)
        :type key: function
        :param reverse: Whether to sort in reverse (descending) order.
        :type reverse: bool
        """
        sort_indices = self._getSortIndices(key, reverse=reverse)
        self.reorderSequences(sort_indices)
        return sort_indices 
    def _getSortIndices(self, key, reverse=False):
        '''
        Helper method that gets the sequence indices for a sort.
        :param key: A function that returns a key to sort by.
        :type key: function
        :param reverse: Whether to sort in reverse (descending) order.
        :type order: bool
        :raises ValueError: If given something other than a SortBy enum.
        '''
        # Extract key values and zip with indices
        if len(self) == 0:
            return []
        else:
            # Skip index 0 since that's the query sequence
            key_values = [(key(self[i]), i) for i in range(1, len(self))]
            key_values.sort(key=lambda x: x[0], reverse=reverse)
            sort_indices = [0] + [kv[1] for kv in key_values]
            return sort_indices
    # TODO MSV-1910: Consider renaming length to numColumns?
    @msv_utils.const
    def _recalculateNumColumns(self, omit, extra_length=None):
        """
        Determine what the length of the alignment would be if we removed
        sequences.
        :param omit: The sequences to omit
        :type omit: Iterable[sequence.Sequence]
        :param extra_length: An additional sequence length to include in the
            calculation
        :type extra_length: int
        :return: The calculated alignment length if the length is going to
            change.  None otherwise.
        :rtype: int or NoneType
        """
        omitted = [seq for seq in self._sequences if seq not in omit]
        if omitted:
            new_length = max(map(len, omitted))
        else:
            new_length = 0
        if extra_length is not None and extra_length > new_length:
            new_length = extra_length
        if new_length != self._num_columns:
            return new_length
    def _monitorSequence(self, seq):
        """
        Monitor changes in the specified sequence.
        :param seq: The sequence to monitor
        :type seq: `sequence.Sequence`
        """
        for signal, slot in self._getSeqSignalsAndSlots(seq):
            signal.connect(slot)
    def _stopMonitoringSequence(self, seq):
        """
        Stop monitoring for changes in the specified sequence.
        :param seq: The sequence to stop monitoring
        :type seq: `sequence.Sequence`
        """
        for signal, slot in self._getSeqSignalsAndSlots(seq):
            signal.disconnect(slot)
    def _getSeqSignalsAndSlots(self, seq):
        """
        :return: pairs of (sequence signal, alignment slot)
        :rtype: tuple(tuple())
        """
        signals = self.signals
        return ((seq.residuesChanged, signals.emitSeqResChanged),
                (seq.annotations.domainsChanged, signals.domainsChanged),
                (seq.annotations.invalidatedDomains, signals.invalidatedDomains),
                (seq.descriptorsCleared, signals.descriptorsCleared),
                (seq.nameChanged, signals.emitSeqNameChanged),
                (seq.annotationTitleChanged, signals.emitAnnTitleChanged),
                (seq.lengthAboutToChange, self._sequenceLengthAboutToChange),
                (seq.lengthChanged, self._sequenceLengthChanged),
                (seq.visibilityChanged, self._sequenceVisibilityChanged),
                (seq.structureChanged, self._sequenceStructureChanged),
                (seq.secondaryStructureChanged, signals.secondaryStructureChanged),
                (seq.predictionsChanged, signals.predictionsChanged),
                (seq.kinaseFeaturesChanged, signals.kinaseFeaturesChanged),
                (seq.kinaseConservationChanged, signals.kinaseConservationChanged),
                (seq.pfamChanged, signals.pfamChanged)) # yapf: disable
    @QtCore.pyqtSlot(int, int)
    def _sequenceLengthAboutToChange(self, old_seq_length, new_seq_length):
        """
        Respond to a sequence `lengthAboutToChange` signal by emitting
        `signals.alignmentNumColumnsAboutToChange` if necessary.
        :param old_seq_length: The current length of the sequence
        :type old_seq_length: int
        :param new_seq_length: The new length of the sequence
        :type new_seq_length: int
        """
        # We use self.sender() to retrieve the sequence instead of using a
        # functools.partial as a slot due to a memory leak (MSV-1538)
        seq = self.sender()
        new_num_columns = self._checkAlignmentNumColumns(
            seq, old_seq_length, new_seq_length)
        if new_num_columns is not None:
            self.signals.alignmentNumColumnsAboutToChange.emit(
                self._num_columns, new_num_columns)
    @QtCore.pyqtSlot(int, int)
    def _sequenceLengthChanged(self, old_seq_length, new_seq_length):
        """
        Respond to a sequence `lengthChanged` signal by emitting
        `signals.alignmentNumColumnsChanged` and `signals.sequenceResiduesChanged`
        as necessary.
        :param old_seq_length: The old length of the sequence
        :type old_seq_length: int
        :param new_seq_length: The current length of the sequence
        :type new_seq_length: int
        """
        # We use self.sender() to retrieve the sequence instead of using a
        # functools.partial as a slot due to a memory leak (MSV-1538)
        seq = self.sender()
        new_num_columns = self._checkAlignmentNumColumns(
            seq, old_seq_length, new_seq_length)
        if new_num_columns is not None:
            old_num_columns = self._num_columns
            self._num_columns = new_num_columns
            self.signals.alignmentNumColumnsChanged.emit(
                old_num_columns, new_num_columns)
            if old_seq_length < new_seq_length:
                if old_seq_length < old_num_columns:
                    self.signals.sequenceResiduesChanged.emit()
            else:
                if new_seq_length < new_num_columns:
                    self.signals.sequenceResiduesChanged.emit()
        else:
            self.signals.sequenceResiduesChanged.emit()
    # TODO MSV_1907: Move closer to _recalculateNumColumns
    def _checkAlignmentNumColumns(self, seq, old_seq_length, new_seq_length):
        """
        Determine what the length of the alignment will be if we changed the
        length of the specified sequence
        :param seq: The sequence that will change length
        :type seq: `sequence.Sequence`
        :param old_seq_length: The old length of the sequence
        :type old_seq_length: int
        :param new_seq_length: The new length of the sequence
        :type new_seq_length: int
        :return: If the alignment will change length, returns the new alignment
            length.  Otherwise, returns None.
        :rtype: int or NoneType
        """
        if new_seq_length > self._num_columns:
            return new_seq_length
        elif old_seq_length == self._num_columns:
            new_num_columns = self._recalculateNumColumns([seq], new_seq_length)
            return new_num_columns
    @msv_utils.const
    def _assertCanAddSeqs(self, index):
        if index is not None and index > len(self):
            raise ValueError("Cannot insert sequences at position past end")
[docs]    def addSeq(self, seq, index=None):
        """
        :param seq: The sequence to add
        :type seq: `sequence.Sequence`
        :param start: The index at which to insert; if None, seq is appended
        :type start: int
        """
        self.addSeqs([seq], index) 
[docs]    def addSeqs(self, sequences, start=None):
        """
        Add multiple sequences to the alignment
        :param sequences: Sequences to add
        :type sequences: list of `sequence.Sequence`
        :param start: The index at which to insert; if None, seqs are appended
        :type start: int
        """
        if not sequences:
            return
        self._assertCanAddSeqs(start)
        # first, change the alignment length if necessary
        old_num_columns = self._num_columns
        seq_length = max(map(len, sequences))
        if seq_length > old_num_columns:
            self.signals.alignmentNumColumnsAboutToChange.emit(
                old_num_columns, seq_length)
            self._num_columns = seq_length
            self.signals.alignmentNumColumnsChanged.emit(
                old_num_columns, seq_length)
        # then insert the sequences
        if start is None:
            start = len(self._sequences)
        last_new_index = start + len(sequences) - 1
        self.signals.sequencesAboutToBeInserted.emit(start, last_new_index)
        for index, seq in enumerate(sequences, start=start):
            self._sequences.insert(index, seq)
            self._monitorSequence(seq)
        self.signals.sequencesInserted.emit(start, last_new_index) 
[docs]    def removeSeq(self, seq):
        """
        Remove a sequence from the alignment
        :param seq: The sequence to remove
        :type seq: `sequence.Sequence`
        """
        self.removeSeqs([seq]) 
[docs]    def removeSeqs(self, seqs):
        """
        Remove multiple sequences from the alignment
        """
        seqs = set(seqs)
        if not seqs:
            return
        if self.getReferenceSeq() in seqs:
            self._assertCanRemoveRef()
        blocks, num_found = self._findSeqBlocks(seqs)
        if num_found != len(seqs):
            raise ValueError("Not all sequences present in alignment.")
        self._additionalSeqRemovalSteps(seqs)
        self._removeSeqsFromAlnSetNoReordering(seqs)
        self._changeNumColumnsForRemovedSeqs(seqs)
        self._removeSeqBlocks(blocks) 
    def _findSeqBlocks(self, seqs):
        """
        Divide up the given sequences into blocks of contiguous sequences based
        on their positions in the alignment.
        :param seqs: The sequences to divide into blocks.
        :type seqs: set
        :return: A tuple of:
            - A list of (start index, end index) for blocks containing the
              specified sequences.  This list is sorted in ascending order.
            - The number of sequences that were found in the alignment.  If this
              is less than `len(seqs)`, then some of the input sequences are not
              present in the alignment.
        :rtype: tuple(list(tuple(int, int)), int)
        """
        blocks = []
        num_found = 0
        block_start = None
        for i, seq in enumerate(self):
            if seq in seqs:
                num_found += 1
                if block_start is None:
                    block_start = i
            elif block_start is not None:
                blocks.append((block_start, i - 1))
                block_start = None
        if block_start is not None:
            blocks.append((block_start, len(self) - 1))
        return blocks, num_found
    def _additionalSeqRemovalSteps(self, seqs):
        """
        The is a hook for subclasses to carry out any additional sequence
        removal operations.  It will be called after we have confirmed that
        `seqs` are safe to remove from the alignment but before the sequences
        are actually removed.
        :param seqs: The sequences that will be removed from the alignment.
        :type seqs: set(sequence.Sequence)
        """
        pass
    def _changeNumColumnsForRemovedSeqs(self, seqs_to_remove):
        """
        Update the number of columns in the alignment in preparation for
        removing the specified sequences.
        :param seqs_to_remove: The sequences that will be removed from the
            alignment.
        :type seqs_to_remove: set(sequence.Sequence)
        """
        new_num_columns = self._recalculateNumColumns(seqs_to_remove)
        if new_num_columns is None:
            return
        self.signals.alignmentNumColumnsAboutToChange.emit(
            self._num_columns, new_num_columns)
        old_num_columns = self._num_columns
        self._num_columns = new_num_columns
        self.signals.alignmentNumColumnsChanged.emit(old_num_columns,
                                                     new_num_columns)
    def _removeSeqBlocks(self, blocks):
        """
        Remove sequences at the specified indices in the alignment.
        :param blocks: A list of (start index, end index) for blocks to remove.
            This list must be sorted in ascending order.
        :type blocks: list[tuple(int, int)]
        """
        for start, end in reversed(blocks):
            self.signals.sequencesAboutToBeRemoved.emit(start, end)
            for seq in self._sequences[start:end + 1]:
                self._stopMonitoringSequence(seq)
            del self._sequences[start:end + 1]
            self.signals.sequencesRemoved.emit(start, end)
    @msv_utils.const
    def _assertCanRemoveRef(self):
        """
        If any anchored residues are present, raise an exception stating that we
        can't remove the reference sequence.
        """
        if self.getAnchoredResidues():
            raise AnchoredResidueError("Can't remove reference sequence while "
                                       "there are anchored residues")
    @QtCore.pyqtSlot()
    def _resetCaches(self):
        """
        Utility function to simultaneously reset caches.
        """
        self._seq_positions_cache.clear()
        self._col_same_res_cache = None
        self._res_similarity_cache = None
        self._RO_anchored_residues = None
        self._RO_anchored_residues_with_ref = None
        self.annotations.clearAllCaching()
    @QtCore.pyqtSlot()
    def _resetSeqTypeCache(self):
        self._ref_type_cache = None
        self._seqs_matching_ref_type_cache = None
[docs]    def clear(self):
        """
        Clears the entire alignment of sequences
        """
        if len(self) == 0:
            return
        self.signals.alignmentAboutToBeCleared.emit()
        old_num_columns = self._num_columns
        self.signals.alignmentNumColumnsAboutToChange.emit(old_num_columns, 0)
        old_num_sequences = len(self._sequences) - 1
        self.signals.sequencesAboutToBeRemoved.emit(0, old_num_sequences)
        for seq in self._sequences:
            self._stopMonitoringSequence(seq)
        self._sequences = []
        self.signals.sequencesRemoved.emit(0, old_num_sequences)
        if old_num_columns > 0:
            self._num_columns = 0
            self.signals.alignmentNumColumnsChanged.emit(old_num_columns, 0)
        self.signals.alignmentCleared.emit() 
    @property
    def _seq_positions(self):
        if not self._seq_positions_cache:
            self._updateSequencePositionCache()
        return self._seq_positions_cache
    def _updateSequencePositionCache(self):
        """
        Update the cache of sequences to their index within the sequence.
        The sequence-index cache is stored as a mapping between sequences
        and their indices.
        """
        self._seq_positions_cache = {seq: i for i, seq in enumerate(self)}
    ############################################################################
    # Operations on Sequences
    ############################################################################
    @msv_utils.const
    def _getReferenceSeqReordering(self, seq):
        """
        Returns an ordering that will make the specified sequence the reference
        sequence
        :param seq: Sequence to set as reference sequence
        :type seq: `sequence`
        :rtype: list of int
        :return: A list of indices
        """
        seq_index = self.index(seq)
        new_seq_ordering = self._getAlnSetSeqIdxs(seq)
        to_skip = set(new_seq_ordering)
        new_seq_ordering.extend(
            [seq_i for seq_i in range(len(self)) if seq_i not in to_skip])
        return new_seq_ordering
[docs]    def setReferenceSeq(self, seq):
        """
        Set the specified sequence as the reference sequence.
        :param seq: Sequence to set as reference sequence
        :type seq: `sequence`
        """
        self._assertCanSetReferenceSeq()
        new_seq_ordering = self._getReferenceSeqReordering(seq)
        self.reorderSequences(new_seq_ordering) 
    @msv_utils.const
    def _assertCanSetReferenceSeq(self):
        """
        If any anchored residues are present, raise an exception stating that we
        can't set a new reference sequence.
        """
        if self.getAnchoredResidues():
            raise AnchoredResidueError("Can't set reference sequence while "
                                       "there are anchored residues")
[docs]    @msv_utils.const
    def getReferenceSeq(self):
        """
        Returns the sequence that has been set as reference sequence or None if
        there is no reference sequence.
        :return: The reference sequence or None
        :rtype: `Sequence` or None
        """
        if not self._sequences:
            return None
        return self[0] 
[docs]    @msv_utils.const
    def isReferenceSeq(self, seq):
        """
        Return whether or not a sequence is the reference sequence.
        :param seq: Sequence to check
        :type seq: `Sequence`
        :return: True if the sequence is the reference sequence, False
                 otherwise.
        :rtype: bool
        """
        return self._sequences and self._sequences[0] == seq 
[docs]    @msv_utils.const
    def getResidueIndices(self, residues, sort=True):
        """
        Returns the indices (in the alignment) of the specified residues
        :param residues: The list of residues and gaps to get indices for.
        :type residues: list[residue.AbstractSequenceElement]
        :param sort: Whether the returned list should be sorted.
        :type sort: bool
        :rtype: A list of (sequence index, residue index) tuples
        :return: list[tuple(int, int)]
        """
        indices = []
        by_seqs = collections.defaultdict(list)
        for res in residues:
            seq = res.sequence
            if seq is None:
                raise ValueError(
                    f"Residue {res} {id(res)} does not belong to a sequence")
            by_seqs[seq].append(res)
        for seq, residues_in_seq in by_seqs.items():
            seq_i = self._seq_positions[seq]
            res_indices = seq.indices(residues_in_seq)
            indices.extend((seq_i, res_i) for res_i in res_indices)
        if sort:
            indices.sort()
        return indices 
    @msv_utils.const
    def _assertCanRemove(self, elements):
        """
        Check whether the specified elements can be removed
        :param elements: An iterable of elements.
        :type elements: iterable(residue.AbstractSequenceElement)
        :raises AnchoredResidueError: if the elements cannot be removed
        :raises StructuredResidueError: In the event that this method attempts
            to remove structured residues
        """
        elements = set(elements)
        anchored_res = set(self.getAnchoredResidues())
        if elements.intersection(anchored_res):
            err_msg = ("Can't remove an anchored residue. Unanchor the residue "
                       "and try again")
            raise AnchoredResidueError(err_msg)
        ref_seq = self.getReferenceSeq()
        anchored_ref_res = {ref_seq[res.idx_in_seq] for res in anchored_res}
        if elements.intersection(anchored_ref_res):
            err_msg = ("Can't remove a reference residue while another residue "
                       "is anchored to it.")
            raise AnchoredResidueError(err_msg)
        if (not self._modifying_structure and
                residue.any_structured_residues(elements)):
            err_msg = ("Can't remove a structured residue.")
            raise StructuredResidueError(err_msg)
[docs]    def removeElements(self, elements):
        """
        Removes the specified elements from the alignment.
        :param elements: An iterable of elements.
        :type elements: iterable(residue.AbstractSequenceElement)
        :raises AnchoredResidueError: if the elements cannot be removed
        :raises StructuredResidueError: In the event that this method attempts
            to mutate structured residues
        """
        elements = list(elements)  # in case it's exhaustible
        self._assertCanRemove(elements)
        eles_to_remove = collections.defaultdict(list)
        for ele in elements:
            eles_to_remove[ele.sequence].append(ele)
        self.signals.residuesAboutToBeRemoved.emit(elements)
        for seq, eles in eles_to_remove.items():
            gap_idxs = self._getAnchorConservingGapIdxs(seq, eles)
            seq.removeElements(eles)
            seq.addGapsByIndices(gap_idxs)
        # We wait until after all the changes to emit the signal
        # Note that the signals.residuesRemoved signal is emitted once even if
        # elements are removed from multiple sequences.
        self.signals.residuesRemoved.emit() 
    @msv_utils.const
    def _getAnchorConservingGapIdxs(self, seq, eles):
        """
        Given a sequence and a list of elements to remove, return a list
        of indices to add gaps that ensure all residue anchors are maintained.
        :param seq: The sequence to remove elements from
        :type  seq: sequence.Sequence
        :param eles: A list of elements that will be removed from `seq`
        :type  eles: list(residue.Residue)
        :return: A sorted list of gap indices that will maintain the alignment
            of residue anchors. These indices should be added immediately after
            `eles` are removed from `seq`
        :rtype: list(int)
        """
        if seq == self.getReferenceSeq():
            seq_anchors = self.getAnchoredResidues()
        else:
            seq_anchors = self._anchored_residues.get(seq, [])
        seq_anchors = sorted(seq_anchors, key=lambda ele: ele.idx_in_seq)
        seq_anchor_idxs = [ele.idx_in_seq for ele in seq_anchors]
        # Count up the number of removed elements before each anchor.
        # We add a `None` to catch any removed elements that come after any
        # anchors.
        seq_anchors.append(None)
        counts = collections.Counter(
            seq_anchors[bisect(seq_anchor_idxs, ele.idx_in_seq)]
            for ele in eles)
        # Add gaps before any anchors equal to the number of elements
        # removed before them.
        gap_idxs = []
        for anchor, count in counts.items():
            if anchor is None:
                continue
            anchor_idx = anchor.idx_in_seq
            gap_idxs.extend(range(anchor_idx - count, anchor_idx))
        return gap_idxs
    @msv_utils.const
    def _assertCanMutateResidues(self, seq_i, start, end, elements):
        seq = self[seq_i]
        removed = seq[start:end]
        self._assertCanRemove(removed)
        if len(elements) > len(removed):
            first_extra_idx = start + len(removed)
            self._assertCanInsert(seq, [first_extra_idx])
[docs]    def mutateResidues(self, seq_i, start, end, elements):
        """
        Mutate a sequence.
        :param seq_i: Index of seq to mutate
        :type  seq_i: int
        :param start: Start index of seq region to mutate
        :type  start: int
        :param end: End index of seq region to mutate
        :type  end: int
        :param elements: Elements to mutate to
        :type  elements: iterable(str) or iterable(ElementClass)
        :raises AnchoredResidueError: if the mutation violates anchoring
        :raises StructuredResidueError: In the event that this method attempts
            to mutate structured residues
        """
        self._assertCanMutateResidues(seq_i, start, end, elements)
        seq = self._sequences[seq_i]
        length_preserving_end = start + len(elements)
        self.signals.residuesAboutToBeRemoved.emit(seq[start:end])
        if end <= length_preserving_end:
            # When more residues are added than removed, we don't need to worry
            # about anchors
            seq.mutate(start, end, elements)
            self.signals.residuesRemoved.emit()
        else:
            # A same-length mutate is safe for anchors
            seq.mutate(start, length_preserving_end, elements)
            to_remove = seq[length_preserving_end:end]
            self.removeElements(to_remove) 
[docs]    def replaceResiduesWithGaps(self, residues):
        """
        Replaces the specified residues with gaps
        :param residues: A list of residues to replace with gaps
        :type residues: list
        """
        for seq_i, res_i in self.getResidueIndices(residues):
            self.mutateResidues(seq_i, res_i, res_i + 1, [residue.Gap()]) 
[docs]    def addElements(self, seq, res_i, elements):
        """
        Adds the specified elements (residues and/or gaps) to the alignment.
        :param seq: A sequence in the alignment
        :type seq: sequence.Sequence
        :param res_i: Index to insert the elements
        :type res_i: int
        :param elements: elements to insert
        :type elements: iterable(str or residue.AbstractSequenceElement)
        """
        self._assertCanInsert(seq, [res_i])
        elements = list(elements)  # In case it's exhaustable
        seq.insertElements(res_i, elements)
        self.signals.residuesAdded.emit(elements) 
[docs]    @msv_utils.const
    def getResiduesWithStructure(self):
        """
        Returns a list of all residues with structure
        """
        def has_structure(res):
            return res.is_res and not res.seqres_only
        seqs = (seq for seq in self if seq.hasStructure())
        return list(res for res in itertools.chain(*seqs) if has_structure(res)) 
[docs]    @msv_utils.const
    @contextlib.contextmanager
    def modifyingStructure(self):
        self._modifying_structure = True
        yield
        self._modifying_structure = False 
    ############################################################################
    # Residue Anchoring
    ############################################################################
[docs]    @contextlib.contextmanager
    def suspendAnchors(self):
        """
        While inside this context, all anchors will be ignored.  Upon exit, the
        anchors will be restored and an exception will be raised if any of the
        anchors are not aligned to the same reference residues they were
        aligned to at the start.
        """
        self._suspendAnchors()
        yield
        self._unsuspendAnchors() 
    def _suspendAnchors(self):
        self._suspend_anchors_count += 1
        if self._suspend_anchors_count > 1:
            # If this is a nested anchor suspension, return early
            return
        self._saved_anchored_residues = self._anchored_residues
        self._anchor_pairs = self._getAnchorPairs()
        self._anchored_residues = types.MappingProxyType({})
        self._RO_anchored_residues = None
        self._RO_anchored_residues_with_ref = None
        # We use a read-only dict so that any attempted changes to anchoring
        # will throw an error.
    @msv_utils.const
    def _getAnchorPairs(self):
        """
        Get all anchored residues paired with the corresponding reference
        seqence residue.
        :return: A list of (reference residue, anchored residue) tuples.
        :rtype: list(tuple(residue.Residue, residue.Residue))
        """
        ref_seq = self.getReferenceSeq()
        return [(ref_seq[res.idx_in_seq], res)
                for res in self.getAnchoredResidues()]
    def _unsuspendAnchors(self):
        self._suspend_anchors_count -= 1
        if self._suspend_anchors_count > 0:
            # If we haven't exited as many times as we've entered,
            # return early
            return
        self._anchored_residues = self._saved_anchored_residues
        del self._saved_anchored_residues
        self._RO_anchored_residues = None
        self._RO_anchored_residues_with_ref = None
        for ref_res, res in self._anchor_pairs:
            if ref_res.idx_in_seq != res.idx_in_seq:
                # Discard anchors because at least one is invalid
                self._anchored_residues.clear()
                raise AnchoredResidueError(
                    'Residue anchoring broken during suspension'
                    f': {ref_res.idx_in_seq}, {res.idx_in_seq}')
        del self._anchor_pairs
    @msv_utils.const
    def _assertCanAnchor(self, residues):
        ref_seq = self.getReferenceSeq()
        if any(res.idx_in_seq >= len(ref_seq) for res in residues):
            raise ValueError("Can't anchor residues that extend beyond the "
                             "length of the reference sequence.")
        if any(ref_seq[res.idx_in_seq].is_gap for res in residues):
            raise ValueError("Can't anchor to a reference gap")
[docs]    def anchorResidues(self, residues):
        """
        Anchor the specified residues. If passed reference residues, all
        residues aligned to the reference residues will be anchored.
        Anchored residues are constrained to stay aligned to the reference
        residue with the same column index at the time of anchoring. If
        elements are removed from the alignment, gaps are added before anchors
        to maintain alignment. If any other modifications are made to the
        alignment that would break an anchor, an exception is raised. However,
        calling code can temporarily take responsibility for maintaining the
        anchors within the `suspendAnchors` context.
        :param residues: Residues to anchor.
        :type  residues: list(residue.Residue)
        """
        self._assertCanAnchor(residues)
        requested_residues = set(residues)
        ref_seq = self.getReferenceSeq()
        should_emit_signal = False
        residues_to_anchor = []
        for res in requested_residues:
            if res in ref_seq:
                # If the reference residue is the only residue in the column
                # in `requested_residues`, add all of the column residues to
                # `residues_to_anchor`.
                ref_idx = res.idx_in_seq
                col_elems = self.getColumn(ref_idx, omit_gaps=False)[1:]
                if not any(elem in requested_residues for elem in col_elems):
                    def is_gap(elem):
                        is_terminal_gap = elem is None
                        is_regular_gap = not is_terminal_gap and elem.is_gap
                        return is_terminal_gap or is_regular_gap
                    residues_to_anchor.extend(
                        elem for elem in col_elems if not is_gap(elem))
            elif res.is_res:
                residues_to_anchor.append(res)
        for res in residues_to_anchor:
            if res not in self._anchored_residues[res.sequence]:
                self._anchored_residues[res.sequence].add(res)
                should_emit_signal = True
        if should_emit_signal:
            self.signals.anchoredResiduesChanged.emit() 
    def _anchorResidueIndices(self, indices):
        """
        Anchor the specified residues.
        :param indices: A list of (sequence index, residue index) tuples for all
            residues to anchor.
        :type indices: list[tuple(int, int)]
        """
        anchor_residues = [
            self[seq_idx][res_idx] for seq_idx, res_idx in indices
        ]
        self.anchorResidues(anchor_residues)
[docs]    @msv_utils.const
    def getAnchoredResidues(self):
        """
        :return: A frozenset of residues that are currently anchored.
        :rtype: frozenset(residue.Residue)
        """
        # Return a read-only set of all anchored residues.
        if self._RO_anchored_residues is None:
            all_anchored_residues = itertools.chain(
                *self._anchored_residues.values())
            self._RO_anchored_residues = frozenset(all_anchored_residues)
        return self._RO_anchored_residues 
    @msv_utils.const
    def _getAnchoredResidueIndices(self):
        """
        Return [sequence index, residue index] for all anchored residues.
        :rtype: list[list(int, int)]
        """
        residues = self.getAnchoredResidues()
        return list(map(list, self.getResidueIndices(residues)))
[docs]    @msv_utils.const
    def getAnchoredResiduesWithRef(self):
        """
        :return: A frozenset of residues that are currently anchored with the
            corresponding reference sequence residues
        :rtype: frozenset(residue.Residue)
        """
        if len(self) == 0:
            return set()
        if self._RO_anchored_residues_with_ref is None:
            self._RO_anchored_residues_with_ref = \
                
frozenset(itertools.chain.from_iterable(self._getAnchorPairs()))
        return self._RO_anchored_residues_with_ref 
[docs]    def clearAnchors(self):
        self._clearAnchorsFromSeqs(0, len(self)) 
    @QtCore.pyqtSlot(int, int)
    def _clearAnchorsFromSeqs(self, start_seq_idx, end_seq_idx):
        """
        Clear all anchors from self[start_seq_idx:end_seq_idx+1]
        """
        should_emit_signal = False
        for seq in self._sequences[start_seq_idx:end_seq_idx + 1]:
            if seq in self._anchored_residues:
                residues = self._anchored_residues.pop(seq)
                if residues:
                    should_emit_signal = True
        if should_emit_signal:
            self.signals.anchoredResiduesChanged.emit()
[docs]    def removeAnchors(self, residues):
        """
        Unanchor residues. If passed reference residues, all residues anchored
        to those reference residues will be unanchored.  Any given unanchored
        residues will be ignored.
        :param residues: The residues to unanchor.
        :type  residues: iterable(residue.Residue)
        """
        residues = list(residues)
        residues_to_unanchor = []
        ref_seq = self.getReferenceSeq()
        for res in residues:
            if res in ref_seq:
                ref_idx = res.idx_in_seq
                col_elems = self.getColumn(ref_idx, omit_gaps=True)
                residues_to_unanchor.extend(col_elems[1:])  # omit ref res
            else:
                residues_to_unanchor.append(res)
        should_emit_signal = False
        if residues_to_unanchor:
            for res in residues_to_unanchor:
                try:
                    self._anchored_residues[res.sequence].remove(res)
                    should_emit_signal = True
                except KeyError:
                    pass
            if should_emit_signal:
                self.signals.anchoredResiduesChanged.emit() 
    @msv_utils.const
    def _assertCanInsert(self, seq, insert_indices):
        """
        Raise an exception if can't insert due to anchor
        :param seq: Sequence where insertion will occur
        :type seq: sequence.Sequence
        :param insert_indices: Indices of insertion (must be non-empty).
        :type insert_indices: iterable(int)
        """
        if seq == self.getReferenceSeq():
            seq_anchors = self.getAnchoredResidues()
        else:
            seq_anchors = self._anchored_residues.get(seq, [])
        if seq_anchors:
            first_idx = min(insert_indices)
            anchor_ok = all(res.idx_in_seq < first_idx for res in seq_anchors)
            if not anchor_ok:
                err_msg = ("Adding elements before an anchored residue will "
                           "break the anchoring.")
                raise AnchoredResidueError(err_msg)
    ############################################################################
    # Subalignments
    ############################################################################
[docs]    @msv_utils.const
    def getSubalignment(self, start, end):
        """
        Return another alignment containing the elements within the specified
        start and end indices
        :param start: The index at which the subalignment should start
        :type start: int
        :param end: The index at which the subalignment should end (exclusive)
        :type end: int
        :return: An alignment corresponding to the start and end points
            specified
        :rtype: BaseAligment
        """
        AlignmentClass = self.__class__
        seqs = [seq.getSubsequence(start, end) for seq in self._sequences]
        aln = AlignmentClass(seqs)
        return aln 
[docs]    @msv_utils.const
    def getDiscontinuousSubalignment(self, indices):
        """
        Given a list of indices, return a new alignment of sequences made up of
        the residues at those specified indices within this alignment.
        :param indices: List of (seq index, residue index) tuples
        :type indices: list of (int, int)
        :return: A new subalignment
        :rtype: BaseAlignment
        """
        indices.sort()
        res_range = [idx[1] for idx in indices]
        min_i = min(res_range)
        max_i = max(res_range)
        AlignmentClass = self.__class__
        seqs = []
        for seq_idx, group in itertools.groupby(indices, lambda idx: idx[0]):
            seq = self._sequences[seq_idx]
            SeqClass = seq.__class__
            res_idxs = [idx[1] for idx in group]
            res_list = []
            for ridx in range(min_i, max_i + 1):
                res = seq[ridx] if ridx in res_idxs else None
                res_list.append(res)
            seqs.append(
                SeqClass(res_list,
                         name=seq.name,
                         long_name=seq.long_name,
                         chain=seq.chain))
        aln = AlignmentClass(seqs)
        return aln 
    @msv_utils.const
    def _assertCanRemoveSubalignment(self, start, end):
        removed = []
        for seq in self:
            removed.extend(seq[start:end])
        self._assertCanRemove(removed)
[docs]    def removeSubalignment(self, start, end):
        """
        Remove a block of the subalignment from the start to end points.
        :param start: The start index of the columns to remove
        :type start: int
        :param end: The end index of the columns to remove (exclusive)
        :type end: int
        """
        self._assertCanRemoveSubalignment(start, end)
        removed = []
        for seq in self:
            removed.extend(seq[start:end])
        with self.suspendAnchors():
            self.removeElements(removed) 
    @property
    def is_rectangular(self):
        return all(len(seq) == self.num_columns for seq in self)
    def _assertRectangular(self, other_aln):
        """
        :raises ValueError: if either this or the other alignment are not
            rectangular
        """
        if not other_aln.is_rectangular:
            msg = "The other alignment must be rectangular"
            raise ValueError(msg)
        if not self.is_rectangular:
            msg = "This alignment must be rectangular to modify a subalignment"
            raise ValueError(msg)
[docs]    def insertSubalignment(self, aln, start):
        """
        Insert an alignment into the current alignment at the specified index
        :param aln: The alignment to insert
        :type aln: `BaseAlignment`
        :param start: The index at which to insert the alignment
        :type start: int
        :raises ValueError: if either alignment is not rectangular
        """
        self._insertSubalignment(aln, start, require_rectangular=True) 
    @msv_utils.const
    def _insertSubalignment(self, aln, start, require_rectangular=True):
        if len(aln) != len(self):
            msg = ("Attempting to insert alignment with %d sequences into an "
                   "alignment with %d sequences", (len(aln), len(self)))
            raise ValueError(msg)
        if require_rectangular:
            self._assertRectangular(aln)
        with self.suspendAnchors():
            for seq, insert_seq in zip(self, aln):
                self.addElements(seq, start, insert_seq)
[docs]    def replaceSubalignment(self, aln, start, end):
        """
        Replace a subsection of the alignment indicated by start and end indices
        with the specified alignment
        :param aln: The alignment to insert
        :type aln: `BaseAlignment`
        :param start: The starting index of the subsection to replace.
        :type start: int
        :param end: The ending index of the subsection to replace.
        :type end: int
        :raises ValueError: if either alignment is not rectangular
        """
        self._assertRectangular(aln)
        self.removeSubalignment(start, end)
        self.insertSubalignment(aln, start) 
    ############################################################################
    # Gaps
    ############################################################################
[docs]    @msv_utils.const
    def getGaps(self):
        """
        Returns a list of list of gaps.
        :return: list(list(residue.Gap))
        :rtype: list
        """
        return [seq.getGaps() for seq in self] 
[docs]    @msv_utils.const
    def getTerminalGaps(self):
        """
        Returns the terminal gaps in all the sequences
        :rtype: list
        :return: list(list(residue.Gap))
        """
        return [seq.getTerminalGaps() for seq in self] 
[docs]    def removeAllGaps(self):
        """
        Removes all the gaps of the sequences in the alignment.
        """
        elems_to_remove = [elem for seq in self for elem in seq if elem.is_gap]
        if not elems_to_remove:
            return
        if self.getAnchoredResidues():
            raise AnchoredResidueError(
                "Can't removeAllGaps while there is an "
                "anchored residue. Clear all anchors and try "
                "again.")
        self.signals.residuesAboutToBeRemoved.emit(elems_to_remove)
        for seq in self:
            seq.removeAllGaps()
        self.signals.residuesRemoved.emit() 
[docs]    def removeTerminalGaps(self):
        """
        Removes the gaps from the ends of every sequence in the alignment
        """
        for seq in self:
            seq.removeTerminalGaps() 
    @msv_utils.const
    def _validateGapBeforeIndices(self, gap_indices):
        """
        Check whether inserting the gaps would break anchors. Intended for gap
        indices before the insertion, as passed to `addGapsBeforeIndices`.
        See `_validateGapIndices` for additional documentation.
        """
        self._validateGapIndices(
            [sequence.offset_indices(indices) for indices in gap_indices])
    @msv_utils.const
    def _validateGapIndices(self, gap_indices):
        """
        Check whether inserting the gaps would  break anchors. Intended for gap
        indices after the insertion, as passed to `addGapsByIndices`.
        :param gap_indices: A list of lists of gap indices
        :type gap_indices: list[list[int]]
        :raises ValueError: If `gap_indices` is the wrong length or contains
            indices that will be more than one column past the end of a
            sequence.
        :raises AnchoredResidueError: If any gap index is before an anchored col
        """
        n_seqs = len(self)
        if len(gap_indices) != n_seqs:
            msg = ("The number of gap_indices passed to addGaps "
                   f"({len(gap_indices)}) should match the number of sequences "
                   f"in the alignment ({n_seqs})")
            raise ValueError(msg)
        for cur_gap_indices, seq in zip(gap_indices, self):
            seq.validateGapIndices(cur_gap_indices)
        seq_idxs_with_gaps = collections.defaultdict(list)
        for seq_idx, gaps in enumerate(gap_indices):
            for col_idx in gaps:
                seq_idxs_with_gaps[col_idx].append(seq_idx)
        broken_anchors = set()
        n_gaps_per_seq = [0] * n_seqs
        unbroken_seq_idxs = set(range(1, n_seqs))
        for col_idx, seq_idxs in sorted(seq_idxs_with_gaps.items()):
            prev_n_gaps_per_seq = n_gaps_per_seq.copy()
            # Update running total of gaps added per seq
            for seq_idx in seq_idxs:
                n_gaps_per_seq[seq_idx] += 1
            n_ref_gaps = n_gaps_per_seq[0]
            for seq_idx in sorted(unbroken_seq_idxs):
                n_nonref_gaps = n_gaps_per_seq[seq_idx]
                # To keep anchors aligned, equal numbers of gaps must be added
                # to the ref and non-ref seq.
                insertion_allowed = bool(n_nonref_gaps == n_ref_gaps)
                if insertion_allowed:
                    continue
                # If the insertion is uneven, all downstream anchors for the
                # current non-ref seq will be broken.
                unbroken_seq_idxs.remove(seq_idx)
                # Account for non-anchor-breaking upstream gap insertions
                gap_offset = prev_n_gaps_per_seq[seq_idx]
                msg = ("Previous gap offsets must be equal, otherwise this "
                       "seq was broken in a previous column and shouldn't have "
                       "been checked again")
                assert gap_offset == prev_n_gaps_per_seq[0], msg
                offset_col_idx = col_idx - gap_offset
                downstream_anchors = self._getDownstreamAnchors(
                    seq_idx, offset_col_idx)
                broken_anchors.update(downstream_anchors)
        if broken_anchors:
            err_msg = ("Adding elements before an anchored residue will "
                       "break the anchoring.")
            raise AnchoredResidueError(err_msg, broken_anchors)
    @msv_utils.const
    def _getDownstreamAnchors(self, seq_idx, col_idx):
        """
        Get anchors at or downstream of the given column.
        :param seq_idx: Index of sequence to get anchors for
        :type seq_idx: int
        :param col_idx: Index of column to get anchors for
        :type col_idx: int
        :return: Anchors downstream of `col_idx` in the sequence
        :rtype: list[residue.Residue]
        """
        seq = self[seq_idx]
        anchored_res = self._anchored_residues.get(seq, set())
        return [res for res in anchored_res if res.idx_in_seq >= col_idx]
[docs]    def addGapsByIndices(self, gap_indices):
        """
        Adds gaps to the alignment
        :note: the length of the gap_indices list must match the number
            of sequences in the alignment.
        :param gap_indices: A list of lists of gap indices, one for each
            sequence in the alignment.  Note that these indices are based on
            residue/gap numbering *after* the insertion.  To insert gaps using
            indices based on numbering before the insertion, see
            `addGapsBeforeIndices`.
        :type gap_indices: list[list[int]]
        :raises ValueError: if `gap_indices` is the wrong length
        :raises AnchoredResidueError: if any gap index is before an anchored col
        """
        self._validateGapIndices(gap_indices)
        for seq, indices in zip(self._sequences, gap_indices):
            seq.addGapsByIndices(indices) 
[docs]    def addGapsBeforeIndices(self, gap_indices):
        """
        Add one gap to the alignment before each of the specified residue
        positions.
        :note: the length of the gap_indices list must match the number
            of sequences in the alignment.
        :param gap_indices: A list of lists of indices to insert gaps before, one
            for each sequence in the alignment.  Note that these indices are
            based on residue/gap numbering *before* the insertion.  To insert
            gaps using indices based on numbering after the insertion, see
            `addGapsByIndices`.
        """
        self._validateGapBeforeIndices(gap_indices)
        for seq, cur_indices in zip(self._sequences, gap_indices):
            seq.addGapsBeforeIndices(cur_indices) 
[docs]    def padAlignment(self):
        """
        Insert gaps into an alignment so that it forms a rectangular block
        """
        pad_gaps = []
        length = self.num_columns
        for seq in self:
            seq_len = len(seq)
            gap_locations = list(range(seq_len, length))
            pad_gaps.append(gap_locations)
        self.addGapsByIndices(pad_gaps) 
[docs]    @msv_utils.const
    def getGapOnlyColumns(self):
        """
        For each sequence, return a list of the indices in that sequence for
        which the entire alignment contains gaps.  (Indices will be omitted for
        a sequence if the sequence is shorter than the index.)
        :return: List of list of indices
        :rtype: list[list[int]]
        """
        def all_gaps(col):
            # Empty column means all gaps
            return len(col) == 0
        gap_indices = [
            i for (i, col) in enumerate(self.columns(omit_gaps=True))
            if all_gaps(col)
        ]
        truncated_indices = []
        for seq in self:
            # filter out gap indices beyond the length of the seq
            seq_gaps = [i for i in gap_indices if i < len(seq)]
            truncated_indices.append(seq_gaps)
        return truncated_indices 
[docs]    def minimizeAlignment(self):
        """
        Minimizes the alignment, i.e. removes all gaps from the gap-only
        columns.
        """
        gap_only_columns = self.getGapOnlyColumns()
        for seq, indices, in zip(self, gap_only_columns):
            # Make sure that we're not requesting the removal of a gap that
            # doesn't exist. This can happen when minimizing an alignment
            # in which the sequences are different lengths.
            gaps = [seq[i] for i in indices if i < len(seq)]
            seq.removeElements(gaps) 
[docs]    @msv_utils.const
    def getAlignmentMinimizedWithSpaces(self):
        """
        This method returns a new alignment and removes gap only columns
        however it leaves one gap column between blocks
        :returns: the new, minimized alignment
        :rtype: BaseAlignment
        """
        new_aln = copy.deepcopy(self)
        gap_only_cols = new_aln.getGapOnlyColumns()
        # Process each sequence separately, because sequences may have
        # unequal number of residues
        for seq, gaps in zip(new_aln, gap_only_cols):
            if len(gaps):
                gap_groups = get_contiguous_groups(gaps)
                to_remove = []
                # If the alignment has a leading gap-only column, remove it
                if gap_groups[0][0] == 0:
                    to_remove.append(0)
                for group in gap_groups:
                    to_remove.extend(group[1:])
                if len(to_remove):
                    seq.removeElements([seq[i] for i in to_remove])
        return new_aln 
    ############################################################################
    # Columns
    ############################################################################
[docs]    @msv_utils.const
    def getColumn(self, index, omit_gaps=False):
        """
        Returns single alignment column at index position. Optionally,
        filters out gaps if omit_gaps is True.
        :param index: The index in the alignment
        :type index: int
        :param omit_gaps: Whether to omit the gaps
        :type omit_gaps: bool
        :return: Single alignment column at index position. Returns None
            to represent terminal gaps.
        :rtype: tuple(residue.Residue or residue.Gap or None)
        """
        column = []
        for seq in self:
            elem = None
            if index < len(seq):
                elem = seq[index]
            if not omit_gaps or (elem is not None and elem.is_res):
                column.append(elem)
        return tuple(column) 
[docs]    @msv_utils.const
    def columns(self, omit_gaps=False, *, match_type=False):
        """
        A generator over all columns.
        :param omit_gaps: Whether to omit gaps
        :type omit_gaps: bool
        :param match_type: Whether to match reference sequence type
        :type match_type: bool
        """
        if omit_gaps:
            def keep_func(elem):
                return elem is not None and elem.is_res
        else:
            def keep_func(elem):
                return elem is not None
        if match_type:
            seqs = list(self.getSeqsMatchingRefType())
            # Make sure we yield a tuple for every column
            seqs.append(itertools.repeat(None, self.num_columns))
        else:
            seqs = self
        for col in itertools.zip_longest(*seqs):
            yield tuple(elem for elem in col if keep_func(elem)) 
[docs]    @msv_utils.const
    def seqMatchesRefType(self, seq):
        if self._ref_type_cache is None:
            # TODO MSV-1504 once nucleotide sequence doesn't inherit protein
            # sequence, use isinstance instead of type check
            self._ref_type_cache = type(self.getReferenceSeq())
        return type(seq) == self._ref_type_cache 
[docs]    @msv_utils.const
    def getSeqsMatchingRefType(self):
        if self._seqs_matching_ref_type_cache is None:
            self._seqs_matching_ref_type_cache = tuple(
                seq for seq in self if self.seqMatchesRefType(seq))
        return self._seqs_matching_ref_type_cache 
    def _updateColumnSameResCache(self):
        """
        Update the cache of whether each column has all the same residue.
        False if any residue in the column is unknown or the column is mixed
        type (protein/nucleic acids).
        """
        self._col_same_res_cache = [False] * self.num_columns
        for index, column in enumerate(self.columns(omit_gaps=True)):
            if self._getColumnHasSameRes(column):
                self._col_same_res_cache[index] = True
    def _getColumnHasSameRes(self, column):
        col_set = set()
        # To make sure we aren't mixing protein and nucleic acid residues
        seen_types = set()
        for res in column:
            if res.type in {residue.UNKNOWN, residue.UNKNOWN_NA}:
                return False
            seen_types.add(type(res.type))
            if len(seen_types) > 1:
                return False
            if res.type.nonstandard:
                col_set.add(res.long_code)
            else:
                col_set.add(res.short_code)
            if len(col_set) > 1:
                return False
        return len(col_set) == 1
    def _updateResidueSimilarityCache(self):
        """
        Update the cache of residue similarities
        """
        self._res_similarity_cache = {}
        ref_seq = self.getReferenceSeq()
        if ref_seq is None:
            return
        for seq in self:
            for ref_res, res in itertools.zip_longest(ref_seq, seq):
                if not res:
                    continue
                elif not ref_res or ref_res.is_gap or res.is_gap:
                    sim = ResidueSimilarity.NA
                elif res.getIdentityStrict(ref_res):
                    sim = ResidueSimilarity.Identical
                elif res.getBinarySimilarity(ref_res):
                    sim = ResidueSimilarity.Similar
                else:
                    sim = ResidueSimilarity.Dissimilar
                self._res_similarity_cache[res] = sim
[docs]    @msv_utils.const
    def columnHasAllSameResidues(self, index):
        """
        Return whether or not the column at a specified index has all the
        same residues (excluding gaps).
        Note that if any unknown residues are present, the column will not
        be considered to be of all the same residue type.
        :param index: Index to check for uniformity
        :type index: int
        :return: True if the column is of uniform identity, False otherwise.
        :rtype: bool
        """
        if self._col_same_res_cache is None:
            self._updateColumnSameResCache()
        return self._col_same_res_cache[index] 
[docs]    @msv_utils.const
    def getResidueSimilarity(self, res):
        """
        Return the similarity score of a residue to the current reference
        residue at the residues position in the alignment.
        :param res: Residue to get the similarity score for
        :type res: `residue.Residue`
        :return: Similarity score for this residue
        :rtype: float or None
        """
        if self._res_similarity_cache is None:
            self._updateResidueSimilarityCache()
        return self._res_similarity_cache[res] 
[docs]    @msv_utils.const
    def elementsToContiguousColumns(self,
                                    elements,
                                    invert=False,
                                    additional_breaks=None,
                                    last_col=None):
        """
        Get elements marking contiguous columns containing any of the passed
        elements
        :param elements: Elements to convert to columns
        :type elements: iterable(AbstractSequenceElement)
        :param invert: Whether to invert logic (i.e. return columns not
            containing the passed elements)
        :type invert: bool
        :param additional_breaks: If given, contiguous columns will be broken at
            the specified indices.  I.e., no contiguous set of columns will
            contain both column i and column i-1.
        :type additional_breaks: list[int] or None
        :param last_col: If given, the last column to consider when constructing
            contiguous columns.  It not given, all columns will be considered.
        :type last_col: int or None
        :return: [start, end] elements of contiguous columns. Will be from the
            ref sequence unless the ref sequence is shorter than `num_columns`
        :rtype: iterable(tuple(AbstractSequenceElement,
                               AbstractSequenceElement))
        """
        elements = set(elements)
        if additional_breaks is None:
            additional_breaks = set()
        else:
            additional_breaks = set(additional_breaks)
        start = None
        end = None
        columns = itertools.islice(self.columns(), last_col)
        for i, column_elems in enumerate(columns):
            if i in additional_breaks and start is not None:
                yield start, end
                start = None
                end = None
            if invert:
                match = all(ele not in elements for ele in column_elems)
            else:
                match = any(ele in elements for ele in column_elems)
            if match:
                column_residues = (ele for ele in column_elems if ele.is_res)
                cur_ele = next(column_residues, None)
                if cur_ele is None:
                    raise RuntimeError("Call aln.minimizeAlignment first")
                if start is None:
                    start = cur_ele
                end = cur_ele
            elif start is not None:
                yield start, end
                start = None
                end = None
        if start is not None:
            yield start, end 
    ############################################################################
    # Utilities
    ############################################################################
[docs]    @QtCore.pyqtSlot()
    @msv_utils.const
    def clearAllCaching(self):
        self._resetCaches()
        self.annotations.clearAllCaching()
        for seq in self._sequences:
            seq.clearAllCaching() 
    @msv_utils.const
    def _getMoveSequenceAfterIndices(self, dest_seq, seqs_to_move):
        """
        Figure out the rearrangements required to move sequences in the
        alignment.  Note that the reference sequence cannot be moved by this
        method.  If present in `seqs_to_move`, it will be ignored.
        :param dest_seq: The sequence to place `seqs_to_move` after.
        :type dest_seq: sequence.Sequence
        :param seqs_to_move: A list of all sequences to move.
        :type seqs_to_move: list[sequence.Sequence]
        :return: A list of new indices to accomplish the specified move.
        :rtype: list[int]
        """
        if not seqs_to_move:
            raise ValueError("No sequences to move")
        dest_index = self.index(dest_seq) + 1
        indices_to_move = sorted(map(self.index, seqs_to_move))
        if indices_to_move[0] == 0:
            # don't allow the reference sequence to be dragged
            del indices_to_move[0]
        if not indices_to_move:
            # Raise an exception if the reference sequence was the *only*
            # sequence specified.  (Otherwise, we just ignore it.)
            raise ValueError("Cannot move the reference sequence")
        dest_index -= sum(1 for index in indices_to_move if index < dest_index)
        new_indices = [i for i in range(len(self)) if i not in indices_to_move]
        new_indices[dest_index:dest_index] = indices_to_move
        return new_indices
    @msv_utils.const
    def _getAddSeqsToAlnSetOrdering(self, seqs_to_add, set_name):
        """
        Figure out how to rearrange the alignment to move the specified
        sequences into the given alignment set.
        :param seqs_to_add: The sequences to add to the alignment set.  Must not
            be empty and must not contains any sequences that are already in the
            alignment set being added to.
        :type seqs_to_add: Iterable[sequence.Sequence]
        :param set_name: The name of the alignment set to add to
        :type set_name: str
        :return: A list of new indices to accomplish the desired rearrangement.
        :rtype: list[int]
        """
        seqs_to_add = set(seqs_to_add)
        ref_seq = self.getReferenceSeq()
        if set_name in self._name_to_set:
            aln_set = self._name_to_set[set_name]
            if ref_seq in seqs_to_add:
                # We're adding the reference sequence to the set, so move the
                # set to the top of the alignment
                new_indices = self._alnSetOrderingRefSeq(
                    ref_seq, seqs_to_add, aln_set)
            else:
                last_seq_in_set = max(aln_set, key=self.index)
                new_indices = self._getMoveSequenceAfterIndices(
                    last_seq_in_set, seqs_to_add)
        else:
            # this is a new set, so move all sequences to the location of the
            # first added sequence
            first_seq_to_add = min(seqs_to_add, key=self.index)
            prev_set = self._seq_to_set.get(first_seq_to_add)
            if prev_set is None:
                seqs_to_add -= {first_seq_to_add}
                if seqs_to_add:
                    new_indices = self._getMoveSequenceAfterIndices(
                        first_seq_to_add, seqs_to_add)
                else:
                    # we don't need to do any rearranging
                    new_indices = list(range(len(self)))
            elif first_seq_to_add is ref_seq:
                # we're changing the set of the reference sequence, so move all
                # of the sequences to add to the top of the alignment
                new_indices = self._alnSetOrderingRefSeq(
                    ref_seq, seqs_to_add, [])
            else:
                # the first added sequence is already in a set, so we need to
                # move it to the end of the set it's already in
                new_set_indices = set(map(self.index, seqs_to_add))
                last_index_from_prev_set = max(map(self.index, prev_set))
                new_indices = list(i for i in range(last_index_from_prev_set +
                                                    1)
                                   if i not in new_set_indices)
                new_indices.extend(sorted(new_set_indices))
                already_placed = set(new_indices)
                new_indices.extend(
                    i for i in range(len(self)) if i not in already_placed)
        return new_indices
    def _alnSetOrderingRefSeq(self, ref_seq, seqs_to_add, aln_set):
        """
        Figure out how to rearrange the alignment to move the specified
        sequences, which contain the reference sequence, into a new or existing
        alignment set.
        :param ref_seq: The reference sequence
        :type ref_seq: sequence.Sequence
        :param seqs_to_add: The sequences to add to the alignment set.  Must
            contain the reference sequence and must not contains any sequences
            that are already in the alignment set being added to.
        :type seqs_to_add: Iterable[sequence.Sequence]
        :param aln_set: An iterable of sequences already in the alignment set,
            if any.
        :type aln_set: Iterable(sequence.Sequence)
        """
        new_indices = [0]
        indices_in_set = sorted(map(self.index, aln_set))
        new_indices.extend(indices_in_set)
        indices_to_add = sorted(map(self.index, seqs_to_add - {ref_seq}))
        new_indices.extend(indices_to_add)
        already_placed = set(new_indices)
        new_indices.extend(
            i for i in range(len(self)) if i not in already_placed)
        return new_indices
[docs]    def addSeqsToAlnSet(self, seqs, set_name):
        """
        Add all given sequences to the specified alignment set (i.e. a named
        group of sequences that are always kept together in the alignment).
        Sequences already in the set will be ignored.  All other sequences will
        be moved to the end of the set.  (Except for the reference sequence: The
        specified set will be moved to the top of the alignment if the reference
        sequence is added.)
        :param seqs: The sequences to add to the set.
        :type seqs: Iterable[sequence.Sequence]
        :param set_name: The name of the set to add the sequences to.  If no set
            of this name exists, one will be created.
        :type set_name: str
        """
        seqs = set(seqs)
        if set_name in self._name_to_set:
            seqs.difference_update(self._name_to_set[set_name])
        if not seqs:
            return
        new_indices = self._getAddSeqsToAlnSetOrdering(seqs, set_name)
        self.reorderSequences(new_indices)
        self._addSeqsToAlnSetNoReordering(seqs, set_name) 
    def _addSeqsToAlnSetNoReordering(self, seqs, set_name):
        """
        Add all given sequences to the specified alignment set (i.e. a named
        group of sequences that are always kept together in the alignment).
        Sequences will not be moved in the alingment.
        :param seqs: The sequences to add to the set.
        :type seqs: Iterable[sequence.Sequence]
        :param set_name: The name of the set to add the sequences to.  If no set
            of this name exists, one will be created.
        :type set_name: str
        """
        alignment_set = self._name_to_set.get(set_name)
        if alignment_set is None:
            alignment_set = AlignmentSet(set_name, self._set_id_counter)
            self._set_id_counter += 1
            self._name_to_set[set_name] = alignment_set
        for cur_seq in seqs:
            old_set = self._seq_to_set.get(cur_seq)
            if old_set is alignment_set:
                # The sequence is already part of the requested set, so there's
                # nothing to do
                continue
            elif old_set is not None:
                self._removeSeqsFromAlnSetNoReordering([cur_seq])
            alignment_set.add(cur_seq)
            self._seq_to_set[cur_seq] = alignment_set
        self.signals.alnSetChanged.emit()
[docs]    def removeSeqsFromAlnSet(self, seqs):
        """
        Remove all given sequences from any alignment sets they're part of.
        Sequences not in a set will be ignored.  All other sequences will be
        moved to the end of the set that they were in.
        :param seqs: The sequences to remove from alignment sets.
        :type seqs: Iterable[sequence.Sequence]
        """
        # ignore any sequences that aren't in a set
        seqs = set(seqs).intersection(self._seq_to_set.keys())
        if not seqs:
            # nothing to remove
            return
        reordering = self._getRemoveFromAlnSetOrdering(seqs)
        self.reorderSequences(reordering)
        self._removeSeqsFromAlnSetNoReordering(seqs) 
    @msv_utils.const
    def _getRemoveFromAlnSetOrdering(self, seqs_to_remove):
        """
        Figure out how to rearrange the alignment to move the specified
        sequences out of their current alignment set.
        :param seqs_to_remove: The sequences to remove from alignment sets.
            Must not contain any sequences that are not in an alignment set.
        :type seqs_to_remove: Iterable[sequence.Sequence]
        :return: A list of new indices to accomplish the desired rearrangement.
        :rtype: list[int]
        """
        seqs_to_remove = set(seqs_to_remove)
        # never move the reference sequence
        ordering = [0]
        cur_to_remove = []
        prev_set = None
        for i, cur_seq in enumerate(self[1:], start=1):
            cur_set = self.alnSetForSeq(cur_seq)
            if prev_set is not None and cur_set is not prev_set:
                # We've reached the end of a set, so put all sequences that we
                # removed from the set here.
                ordering.extend(cur_to_remove)
                cur_to_remove.clear()
            if cur_seq in seqs_to_remove:
                cur_to_remove.append(i)
            else:
                ordering.append(i)
            prev_set = cur_set
        ordering.extend(cur_to_remove)
        return ordering
    def _removeSeqsFromAlnSetNoReordering(self, seqs):
        """
        Remove all given sequences from any alignment sets they're part of.
        Sequences will not be moved in the alignment.
        :param seqs: The sequences to remove from alignment sets.
        :type seqs: Iterable[sequence.Sequence]
        """
        should_emit_signal = False
        for cur_seq in seqs:
            alignment_set = self._seq_to_set.get(cur_seq)
            if alignment_set is not None:
                should_emit_signal = True
                del self._seq_to_set[cur_seq]
                alignment_set.remove(cur_seq)
                if not alignment_set:
                    # remove the set if it's empty
                    del self._name_to_set[alignment_set.name]
        if should_emit_signal:
            self.signals.alnSetChanged.emit()
[docs]    def renameAlnSet(self, old_name, new_name):
        """
        Rename the specified alignment set.
        :param old_name: The old name of the alignment set.
        :type old_name: str
        :param new_name: The new name of the alignment set.
        :type new_name: str
        """
        if new_name in self._name_to_set:
            raise ValueError(f"Set {new_name} already present.")
        try:
            alignment_set = self._name_to_set[old_name]
        except KeyError:
            raise ValueError(f"No set {old_name}")
        del self._name_to_set[old_name]
        alignment_set.name = new_name
        self._name_to_set[new_name] = alignment_set
        self.signals.alnSetChanged.emit() 
[docs]    @msv_utils.const
    def alnSetForSeq(self, seq):
        """
        Return the alignment set that contains the given sequence.
        :param seq: The sequence to retrieve the alignment set for.
        :type seq: sequence.Sequence
        :return: The requested set.  The calling scope must not modify the
            returned value.  Will return None if `seq` is not  part of any set.
        :rtype: AlignmentSet or None
        """
        return self._seq_to_set.get(seq) 
[docs]    @msv_utils.const
    def hasAlnSets(self):
        """
        Does this alignment contain any alignment sets?
        :rtype: bool
        """
        return bool(self._seq_to_set) 
[docs]    @msv_utils.const
    def alnSetNames(self):
        """
        Return all alignment set names.
        :rtype: set(str)
        """
        return set(self._name_to_set.keys()) 
[docs]    @msv_utils.const
    def alnSets(self):
        """
        Iterate through all alignment sets.
        :return: An iterator through all alignment sets.  The calling scope must
            not modify any of the sets.
        :rtype: dict_keys
        """
        return self._name_to_set.values() 
[docs]    @msv_utils.const
    def getAlnSet(self, set_name):
        """
        Return the requested set.
        :param set_name: The name of the set to retrieve.
        :type set_name: str
        :return: The requested set.  The calling scope must not modify the
            returned value.
        :rtype: AlignmentSet
        :raise ValueError: If no set with the given name was found.
        """
        try:
            return self._name_to_set[set_name]
        except KeyError:
            raise ValueError(f"No set {set_name}") 
    def _getAlnSetSeqIdxs(self, seq):
        """
        :return: the indexes of all seqs in `seq`'s aln set, with the idx of `seq` first.
        :rtype: list[int]
        """
        aln_set = self.alnSetForSeq(seq)
        seq_idx = self.index(seq)
        indexes = [seq_idx]
        if aln_set is not None:
            set_idxs = {self.index(s) for s in aln_set}
            set_idxs.remove(seq_idx)
            indexes.extend(sorted(set_idxs))
        return indexes
[docs]    def gatherAlnSets(self):
        if not self.hasAlnSets():
            return
        new_seq_ordering = self._getGatherAlnSetsReordering()
        self.reorderSequences(new_seq_ordering) 
    @msv_utils.const
    def _getGatherAlnSetsReordering(self):
        new_seq_ordering = []
        seen_idxs = set()
        for idx, seq in enumerate(self):
            if idx in seen_idxs:
                continue
            aln_set_seq_idxs = self._getAlnSetSeqIdxs(seq)
            new_seq_ordering.extend(aln_set_seq_idxs)
            seen_idxs.update(aln_set_seq_idxs)
        return new_seq_ordering
[docs]    @msv_utils.const
    def getFrequencies(self, normalize=True):
        """
        Returns the frequencies of each residue in each column.
        Residues are sorted by decreasing frequency.
        Gapped positions are not counted when calculating frequencies.
        :param normalize: Whether to normalize the values; i.e. divide by the
            number of non-gaps in the column
        :type normalize: bool
        :return: frequencies of each residue in each alignment column
        :rtype: tuple(tuple(residue.Residue, float or int)))
        """
        for column in self.columns(omit_gaps=True, match_type=True):
            if not column:
                yield tuple()
                continue
            # Initialize a list of residue strings to pass to Counter and a
            # dict to associate the residue strings with residue objects.
            # Standard amino acids are represented using 1 letter code and
            # nonstandard amino acids are represented using 3 letter code.
            res_str_list = list()
            res_str_to_res_dict = dict()
            for res in column:
                add_to_dict = True
                if res.type.nonstandard:
                    res_str = res.long_code
                else:
                    res_str = res.short_code
                    prev_res = res_str_to_res_dict.get(res_str)
                    if (prev_res is not None and
                            prev_res.type in residue.STD_AMINO_ACIDS):
                        # Don't overwrite a standard amino acid
                        add_to_dict = False
                res_str_list.append(res_str)
                if add_to_dict:
                    res_str_to_res_dict[res_str] = res
            freq_dict = collections.Counter(res_str_list)
            # Create a new most_common list containing residue objects
            most_common_res = []
            for res_str, freq in freq_dict.most_common():
                res = res_str_to_res_dict[res_str]
                if normalize:
                    freq = freq / len(column)
                most_common_res.append((res, freq))
            yield tuple(most_common_res) 
[docs]    @msv_utils.const
    def getResidueSeqProps(self, value_types=None):
        """
        Get a list of all sequence properties that any residue has. If
        'value_types' is defined, get only the specific property types listed.
        :param value_types:  list of specific properties types-  str, int or
            float etc  as structure.PROP_STRING,structure.PROP_INTEGER etc
        :type value_types: List
        :return: All the sequence properties
        :rtype: list[properties.SequenceProperty]
        """
        seq_props = []
        prop_names = set()
        desc_names = set()
        # Get all atom structure property names
        for st in self.all_structures:
            if len(st.atom):
                prop_names.update(st.getAtomPropertyNames())
        # Get all residue descriptor names
        for seq in self:
            for res in seq:
                if res.is_gap:
                    continue
                desc_names.update(res.getDescriptorKeys())
        for prop_name in prop_names:
            if value_types is not None and prop_name[0] not in value_types:
                continue
            seq_props.append(
                properties.SequenceProperty(
                    property_name=prop_name,
                    property_source=properties.PropertySource.Residue,
                    property_type=properties.PropertyType.StructureProperty,
                ))
        for desc_name in desc_names:
            seq_props.append(
                properties.SequenceProperty(
                    property_name=desc_name,
                    property_source=properties.PropertySource.Residue,
                    property_type=properties.PropertyType.Descriptor,
                ))
        return seq_props 
[docs]    @msv_utils.const
    def getSeqsDescriptors(self):
        """
        Return a list of all the calculated descriptors of the sequences in
        the alignment.
        :return: All the sequence descriptors
        :rtype: list[properties.SequenceProperty]
        """
        seq_descriptors = []
        names = set()
        for seq in self:
            for prop_source, prop_value in seq.descriptors.items():
                for name, value in prop_value.items():
                    if name in names:
                        continue
                    seq_descriptors.append(
                        properties.SequenceProperty(
                            visible=False,
                            property_name=name,
                            property_source=prop_source,
                            property_type=properties.PropertyType.Descriptor))
                    names.add(name)
        return seq_descriptors 
    @property
    def all_structures(self):
        """
        Return an iterator over all sequence structures in the alignment. This
        does not repeat structures that belong to multiple sequences.
        """
        entry_ids = set()
        for seq in self:
            # Dont return the same structure twice
            eid = seq.entry_id
            if seq.entry_id in entry_ids:
                continue
            entry_ids.add(eid)
            st = seq.getStructure()
            if st is None:
                continue
            yield st 
[docs]class AlignmentSet(set):
    """
    A named group of sequences that are always kept together in the alignment.
    """
[docs]    def __init__(self, name, set_id):
        """
        :param name: The name of the alignment set.
        :type name: str
        :param set_id: A unique integer ID for the alignment set.  Used to
            determine the color of the icon and text.
        :type set_id: int
        """
        super().__init__()
        self.name = name
        self.set_id = set_id  
class _ProteinAlignment(BaseAlignment):
    """
    A base class for split-chain and combined-chain protein alignments.
    """
    _ALN_ANNOTATION_CLASS = annotation.ProteinAlignmentAnnotations
    _SEQ_ANNOTATION_CLASS = annotation.ProteinSequenceAnnotations
    @property
    def disulfide_bonds(self):
        return tuple(set().union(*(set(seq.disulfide_bonds) for seq in self)))
    @property
    def pred_disulfide_bonds(self):
        return tuple(
            set().union(*(set(seq.pred_disulfide_bonds) for seq in self)))
    @msv_utils.const
    def _getInvalidatedBonds(self, seqs):
        """
        Return any inter-sequence bonds between a sequence in `seqs` and a
        sequence not in `seqs`.  In other words, remove any inter-sequence bonds
        that will no longer be valid after we remove `seqs` from the alignment.
        :param seqs: The sequences to remove inter-sequence bonds from
        :type seqs: set[sequence.Sequence]
        :return: A tuple of:
            - invalidated known disulfide bonds
            - invalidated predicted disulfide bonds
        :rtype: tuple(list[residue.DisulfideBond], list[residue.DisulfideBond])
        """
        raise NotImplementedError
    def _restoreInvalidatedBonds(self, invalidated_known_bonds,
                                 invalidated_pred_bonds):
        """
        Restore the invalidated bonds returned by `_getInvalidatedBonds`
        :param invalidated_known_bonds: Known disulfide bonds to restore
        :type invalidated_known_bonds: list[residue.DisulfideBond]
        :param invalidated_pred_bonds: Predicted disulfide bonds to restore
        :type invalidated_pred_bonds: list[residue.DisulfideBond]
        """
        raise NotImplementedError
[docs]class ProteinAlignment(json.JsonableClassMixin, _ProteinAlignment):
[docs]    @msv_utils.const
    def toJsonImplementation(self):
        anchor_res_indices = self._getAnchoredResidueIndices()
        interseq_bonds = [
            bond for bond in self.disulfide_bonds if bond.is_inter_sequence
        ]
        bond_idxs = [[[self.index(res1.sequence), res1.idx_in_seq],
                      [self.index(res2.sequence), res2.idx_in_seq]]
                     for res1, res2 in interseq_bonds]
        aln_sets = []
        for aln_set in self._name_to_set.values():
            seq_indices = [self.index(seq) for seq in aln_set]
            aln_sets.append([aln_set.name, aln_set.set_id, seq_indices])
        return {
            'sequences': self._sequences,
            'anchored_residues': anchor_res_indices,
            'inter_sequence_bond_idxs': bond_idxs,
            'aln_sets': aln_sets,
        } 
[docs]    @classmethod
    def fromJsonImplementation(cls, json_obj):
        seqs = [
            sequence.ProteinSequence.fromJson(json_seq)
            for json_seq in json_obj['sequences']
        ]
        aln = cls(seqs)
        aln._anchorResidueIndices(json_obj['anchored_residues'])
        for bond in json_obj['inter_sequence_bond_idxs']:
            seq_idx1, res_idx1 = bond[0]
            seq_idx2, res_idx2 = bond[1]
            aln.addDisulfideBond(aln[seq_idx1][res_idx1],
                                 aln[seq_idx2][res_idx2])
        for set_name, set_id, seq_idxs in json_obj['aln_sets']:
            seqs = [aln[idx] for idx in seq_idxs]
            aln._addSeqsToAlnSetNoReordering(seqs, set_name)
            aln._name_to_set[set_name].set_id = set_id
        return aln 
[docs]    @json.adapter(version=48002)
    def adapter48002(cls, json_dict):
        json_dict['aln_sets'] = []
        return json_dict 
[docs]    def addDisulfideBond(self, res1, res2, known=True):
        """
        Add a disulfide bond if both residues' sequences are in the alignment
        :param res1: A residue to link with a disulfide bond
        :type  res1: residue.Residue
        :param res2: Another residue to link with a disulfide bond
        :type  res2: residue.Residue
        :param known: Whether the bond is known or predicted
        :type known: bool
        :raise ValueError: if either sequence is not in the alignment
        """
        seqs = {r.sequence for r in (res1, res2)}
        if any(seq not in self for seq in seqs):
            raise ValueError(("Cannot add a disulfide bond to residues from "
                              "sequence(s) that are not in the alignment"))
        residue.add_disulfide_bond(res1, res2, known=known) 
[docs]    def removeDisulfideBond(self, bond):
        """
        Disconnect a disulfide bond. The bond may be either known or predicted.
        :param bond: The bond to disconnect
        :type  bond: residue.DisulfideBond
        :raise ValueError: if either sequence is not in the alignment
        """
        seqs = {r.sequence for r in bond}
        if any(seq not in self for seq in seqs):
            raise ValueError(
                ("Cannot remove a disulfide bond from residues from "
                 "sequence(s) that are not in the alignment"))
        residue.remove_disulfide_bond(bond) 
    def __deepcopy__(self, memo):
        aln = super().__deepcopy__(memo)
        self._copyInterchainDisulfidesTo(aln)
        return aln
    @msv_utils.const
    def _copyInterchainDisulfidesTo(self, aln):
        """
        Copy all interchain disulfide bonds (known and predicted) from this
        alignment to the given alignment.
        :param aln: The alignment to copy disulfides to.
        :type aln: ProteinAlignment
        """
        mapped_bonds = [
            self._mapResidues(bond, aln)
            for bond in self.disulfide_bonds
            if bond.is_inter_sequence
        ]
        for res1, res2 in mapped_bonds:
            aln.addDisulfideBond(res1, res2)
        mapped_pred_bonds = [
            self._mapResidues(bond, aln)
            for bond in self.pred_disulfide_bonds
            if bond.is_inter_sequence
        ]
        for res1, res2 in mapped_pred_bonds:
            aln.addDisulfideBond(res1, res2, known=False)
    def _additionalSeqRemovalSteps(self, seqs):
        # remove invalidated disulfide bonds
        invalidated_known_bonds, invalidated_pred_bonds = \
            
self._getInvalidatedBonds(seqs)
        for bond in invalidated_known_bonds + invalidated_pred_bonds:
            self.removeDisulfideBond(bond)
    @msv_utils.const
    def _getInvalidatedBonds(self, seqs):
        # See parent class for method documentation
        invalidated_known_bonds = self._getInvalidatedBondsFromBondList(
            seqs, self.disulfide_bonds)
        invalidated_pred_bonds = self._getInvalidatedBondsFromBondList(
            seqs, self.pred_disulfide_bonds)
        return invalidated_known_bonds, invalidated_pred_bonds
    def _getInvalidatedBondsFromBondList(self, seqs, bonds_to_check):
        invalidated_bonds = []
        for bond in bonds_to_check:
            res1, res2 = bond.res_pair
            if bool(res1.sequence in seqs) is not bool(res2.sequence in seqs):
                invalidated_bonds.append(bond)
        return invalidated_bonds
    def _restoreInvalidatedBonds(self, invalidated_known_bonds,
                                 invalidated_pred_bonds):
        # See parent class for method documentation
        for res1, res2 in invalidated_known_bonds:
            self.addDisulfideBond(res1, res2)
        for res1, res2 in invalidated_pred_bonds:
            self.addDisulfideBond(res1, res2, known=False)
    ############################################################################
    # IO Convenience Methods
    ############################################################################
[docs]    @classmethod
    def fromStructure(cls, ct, eid=None):
        """
        :param ct: The structure to convert
        :type ct: schrodinger.structure.Structure
        :param eid: The entry id to assign to the created sequences.  If not
            given, the entry id from the structure, if any, will be used.
        :type eid: str
        :rtype: cls
        :return: An alignment containing the sequences in the structure
        """
        from schrodinger.application.msv import seqio
        converter = seqio.StructureConverter(ct, eid)
        sequences = converter.makeSequences()
        return cls(sequences) 
[docs]    @classmethod
    def fromClustalFile(cls, file_name):
        """
        Returns alignment read from file in Clustal .aln format preserving
        order of sequences.
        :type file_name: str
        :param file_name: Source file name.
        :raises IOError: If output file cannot be read.
        :return: An alignment
        :note: The alignment can be empty if no sequence was present in the
               input file.
        """
        from schrodinger.application.msv import seqio
        return seqio.ClustalAlignmentReader.read(file_name, cls) 
[docs]    @msv_utils.const
    def toClustalFile(self, file_name, use_unique_names=True):
        """
        Writes aln to a Clustal alignment file.
        :raises IOError: If output file cannot be written.
        :type file_name: str
        :param file_name: Destination file name.
        :param use_unique_names: If True, write unique name for each sequence.
        :type use_unique_names: bool
        """
        from schrodinger.application.msv import seqio
        return seqio.ClustalAlignmentWriter.write(self, file_name,
                                                  use_unique_names) 
[docs]    @classmethod
    def fromFastaFile(cls, file_name):
        """
        Returns alignment read from file in Clustal .aln format preserving
        order of sequences.
        :raises IOError: If the input file cannot be read.
        :param file_name: name of input FASTA file
        :type file_name: str
        :return: Read alignment. The alignment can be empty if no sequence was
                present in the input file.
        :rtype: ProteinAlignment
        """
        from schrodinger.application.msv import seqio
        return seqio.FastaAlignmentReader.read(file_name, cls) 
[docs]    @classmethod
    def fromFastaString(cls, lines):
        """
        Read sequences from FASTA-formatted text, creates sequences and appends
        them to alignment. Splits sequence name from the FASTA header.
        :param lines: list of strings representing FASTA file
        :type lines: list of str
        :return: The alignment
        :rtype: ProteinAlignment
        """
        from schrodinger.application.msv import seqio
        return seqio.FastaAlignmentReader.readFromText(lines, cls) 
[docs]    @classmethod
    def fromFastaStringList(cls, strings):
        """
        Return an alignment object created from an iterable of sequence strings
        :param strings: Sequences as iterable of strings (1D codes)
        :type  strings: Iterable of strings
        :return: The alignment
        :rtype: ProteinAlignment
        """
        from schrodinger.application.msv import seqio
        return seqio.FastaAlignmentReader.readFromStringList(strings, cls) 
[docs]    @msv_utils.const
    def toFastaString(self, use_unique_names=True, maxl=50):
        """
        Convert ProteinAlignment object to list of sequence strings
        :param aln: Alignment data
        :type  aln: `ProteinAlignment`
        """
        from schrodinger.application.msv import seqio
        return seqio.FastaAlignmentWriter.toString(self, use_unique_names, maxl) 
[docs]    @msv_utils.const
    def toFastaStringList(self):
        """
        Convert self to list of fasta sequence strings
        :rtype: list
        :return: list of str
        """
        from schrodinger.application.msv import seqio
        return seqio.FastaAlignmentWriter.toStringList(self) 
[docs]    @msv_utils.const
    def toFastaFile(self, file_name, use_unique_names=True, maxl=50):
        """
        Write self to specified FASTA file
        :raises IOError: If output file cannot be written.
        """
        from schrodinger.application.msv import seqio
        seqio.FastaAlignmentWriter.write(self, file_name, use_unique_names,
                                         maxl) 
    ############################################################################
    # Utilities
    ############################################################################
[docs]    @msv_utils.const
    def findPattern(self, pattern):
        """
        Finds a specified PROSITE pattern in all sequences.
        :param pattern: PROSITE pattern to search in sequences. See
            `protein.sequence.find_generalized_pattern` for documentation.
        :type pattern: str
        :returns: List of matching residues
        :rtype: list of `protein.residue.Residue`
        """
        # Encode sequences for pattern search
        seq_dicts = [
            seq.encodeForPatternSearch(with_ss=True,
                                       with_flex=True,
                                       with_asa=True) for seq in self
        ]
        # Perform pattern search
        # Insert dashes into uppercase letters (e.g. "CMY" -> "C-M-Y")
        if pattern.isalpha() and pattern == pattern.upper():
            pattern = "-".join(pattern)
        pattern_matches = sequence.find_generalized_pattern(
            seq_dicts, str(pattern))
        if not pattern_matches:
            return []
        # Select specified residues
        selected_residues = []
        for seq_index, match_list in enumerate(pattern_matches):
            for start, end in match_list:
                matched_seq = self[seq_index]
                res_indices = [
                    i for (i, res) in enumerate(matched_seq) if res.is_res
                ]
                # select residues independent of gaps
                for res_index in res_indices[start:end]:
                    selected_residues.append(matched_seq[res_index])
        return selected_residues  
[docs]class NucleicAcidAlignment(BaseAlignment):
    # TODO (MSV-1504): Create proper nucleic acid domain objects
    pass 
class _SeqsFromTheSameProtein:
    """
    Information about sequences from the same protein.  Used in
    `CombinedChainProteinAlignment._combineSequences`.
    :ivar seqs: A list of list of sequences.  No list of sequences contains
        multiple sequences with the same chain name.
    :vartype seqs: list(list(sequence.Sequence))
    :ivar counts: A dictionary of {chain name: number of sequences with that
        chain name}
    :vartype: Counter
    """
    def __init__(self):
        self.seqs = []
        self.counts = collections.Counter()
[docs]class CombinedChainProteinAlignment(_ProteinAlignment):
    """
    An alignment containing combined-chain sequences
    (`sequence.CombinedChainProteinSequence` objects).
    """
[docs]    def __init__(self, sequences=None, *, chains_to_combine=None):
        """
        :param sequences: A list of split-chain or combined-chain sequences to
            add to the alignment.  If not given, an empty alignment will be
            created.
        :type sequences: list[sequence.ProteinSequence] or
            list[sequence.CombinedChainProteinSequence]
        :param chains_to_combine: Information about which split-chain sequences
            in `split_undoable_aln` should be included in which combined-chain
            sequence.  Should be a list of lists of indices.  Each index refers
            to the split-chain sequence at that position of
            `split_undoable_aln`, and split-chain sequences that are listed
            together will be combined into the same combined-chain sequence.
            Each split-chain sequence from `split_undoable_aln` must be
            referenced exactly once.
        :type chains_to_combine: list[list[int]]
        """
        self._split_seq_cache = None
        if chains_to_combine is None:
            super().__init__(sequences)
        else:
            super().__init__()
            self._combineAndAddSplitSeqs(sequences, chains_to_combine) 
[docs]    def addSeqs(self, seqs, start=None):
        """
        Add multiple sequences to the alignment.  Note that either single-chain
        sequences or combined-chain sequences may be added (but not both at the
        same time).
        :param sequences: Sequences to add
        :type sequences: list[sequence.ProteinSequence] or
            list[sequence.CombinedChainProteinSequence]
        :param start: The index at which to insert; if None, seqs are appended.
            Must be None if adding single-chain sequences.
        :type start: int
        """
        if not seqs:
            return []
        elif self._isCombinedChainSeq(seqs[0]):
            super().addSeqs(seqs, start)
            self._split_seq_cache = None
            return seqs
        elif start is not None:
            raise RuntimeError("Cannot specify start when adding split-chain "
                               "sequences to a combined-chain alignment.")
        else:
            combined_seqs = self._addSplitSeqs(seqs)
            self._split_seq_cache = None
            return combined_seqs 
    @msv_utils.const
    def _isCombinedChainSeq(self, seq):
        """
        Figure out if the given sequence is a single-chain or a combined-chain
        sequence.
        :param seq: The sequence to check.
        :type seq: sequence.ProteinSequence or
            sequence.CombinedChainProteinSequence
        :return: True if the sequence is combined-chain.  False if it is
            single-chain.
        :rtype: bool
        """
        return isinstance(seq, sequence.CombinedChainProteinSequence)
    def _addSplitSeqs(self, split_seqs):
        """
        Add single-chain sequences to the alignment.  If any of the sequences to
        add represent new chains of an existing combined-chain sequence, they
        will be added to the existing sequence.
        :param split_seqs: The sequences to add
        :type split_seqs: list[sequence.ProteinSequence]
        :return: A list of all combined-chain sequences (both new and modified)
            that contain the added sequences.
        :rtype: list[sequence.CombinedChainProteinSequence]
        """
        modified_combined_seqs, unmatched_split_seqs = \
            
self._addNewChainsToExistingSeqs(split_seqs)
        if unmatched_split_seqs:
            new_combined_seqs = self._combineSequences(unmatched_split_seqs)
            super().addSeqs(new_combined_seqs)
            return modified_combined_seqs + new_combined_seqs
        else:
            return modified_combined_seqs
    def _addNewChainsToExistingSeqs(self, split_seqs):
        """
        Add new single-chain sequences to existing combined-chain sequences.
        :param split_seqs: The sequences to add
        :type split_seqs: list[sequence.ProteinSequence]
        :return: A tuple of:
            - The combined-chain sequences that had new chains added
            - The single-chain sequence that were not added to any existing
              combined-chain sequences.
        :rtype: tuple(list[sequence.CombinedChainProteinSequence],
                      list[sequence.ProteinSequence])
        """
        if len(self) == 0:
            return [], split_seqs
        existing_seqs = {}
        for seq in self:
            key = (seq.entry_id, seq.name)
            existing_seqs.setdefault(key, []).append(seq)
        modified_combined_seqs = set()
        unmatched_split_seqs = []
        for seq in split_seqs:
            matching_prots = existing_seqs.get((seq.entry_id, seq.name))
            if not matching_prots:
                unmatched_split_seqs.append(seq)
                continue
            for cur_existing_seq in matching_prots:
                if not cur_existing_seq.hasChain(seq.chain):
                    modified_combined_seqs.add(cur_existing_seq)
                    cur_existing_seq.addChain(seq)
                    break
            else:
                unmatched_split_seqs.append(seq)
        return list(modified_combined_seqs), unmatched_split_seqs
    def _combineSequences(self, aln):
        """
        Create combined sequences for all of the given split-chain sequences.
        :param aln: The split-chain sequences to combine
        :type aln: Iterable(sequence.ProteinSequence)
        :return: The combined sequences.  Note that this list preserves ordering
            from `aln`.
        :rtype: list(sequence.CombinedChainProteinSequence)
        """
        grouped_seqs = []
        seq_data = collections.defaultdict(_SeqsFromTheSameProtein)
        for seq in aln:
            cur_seq_data = seq_data[seq.entry_id, seq.name]
            seq_num = cur_seq_data.counts[seq.chain]
            cur_seq_data.counts[seq.chain] += 1
            if seq_num == len(cur_seq_data.seqs):
                cur_grouped_seqs = []
                cur_seq_data.seqs.append(cur_grouped_seqs)
                grouped_seqs.append(cur_grouped_seqs)
            else:
                cur_grouped_seqs = cur_seq_data.seqs[seq_num]
            cur_grouped_seqs.append(seq)
        return [
            sequence.CombinedChainProteinSequence(seqs) for seqs in grouped_seqs
        ]
    def _combineAndAddSplitSeqs(self, chains, indices_to_combine):
        """
        Create new combined-chain sequences from the given split-chain sequences
        and add the new combined-chain sequences to the alignmment.
        :param chains: The split-chain sequences to combine and add to the
            aligment.
        :type chains: list[sequence.ProteinSequence]
        :param indices_to_combine: Information about which chains should be
            included in which combined-chain sequence.  Should be a list of
            lists of indices.  Each index refers to the chain at that position
            of `chains`, and chains that are listed together will be combined
            into the same combined-chain sequence.
        :type indices_to_combine: list[list[int]]
        """
        combined_seqs = []
        for cur_indices in indices_to_combine:
            cur_chains = [chains[i] for i in cur_indices]
            # make sure that we're only grouping together chains that are from
            # the same protein
            for chain_to_check in cur_chains[1:]:
                assert chain_to_check.name == cur_chains[0].name
                assert chain_to_check.entry_id == cur_chains[0].entry_id
            seq = sequence.CombinedChainProteinSequence(cur_chains)
            combined_seqs.append(seq)
        self.addSeqs(combined_seqs)
[docs]    def removeSeqs(self, seqs):
        """
        Remove multiple sequences from the alignment.  Note that either single-
        chain sequences or combined-chain sequences may be added (but not both
        at the same time).
        :param sequences: Sequences to remove
        :type sequences: Iterable[sequence.Sequence] or
            Iterable[sequence.CombinedChainProteinSequence]
        """
        seqs = list(seqs)
        if not seqs:
            return
        elif self._isCombinedChainSeq(seqs[0]):
            super().removeSeqs(seqs)
        else:
            self._removeSplitSeqs(seqs)
        self._split_seq_cache = None 
    def _removeSplitSeqs(self, split_seqs):
        """
        Remove single-chain sequences from the alignment.  If all chains of a
        combined-chain sequence are removed, the combined-chain sequence will be
        removed from the alignment.
        :param split_seqs: The sequences to remove
        :type split_seqs: list[sequence.ProteinSequence]
        """
        split_seqs = set(split_seqs)
        combined_seqs_to_remove = []
        chains_to_remove = []
        # Figure out what chains go with which combined-chain sequence
        for combined_seq in self:
            cur_chains = split_seqs.intersection(combined_seq.chains)
            if not cur_chains:
                continue
            if len(cur_chains) == len(combined_seq.chains):
                combined_seqs_to_remove.append(combined_seq)
            else:
                chains_to_remove.append((combined_seq, cur_chains))
            split_seqs -= cur_chains
        if split_seqs:
            raise ValueError("Cannot remove chains that are not in the "
                             "alignment")
        # actually remove the chains and sequences
        super().removeSeqs(combined_seqs_to_remove)
        for combined_seq, chains in chains_to_remove:
            combined_seq.removeChains(chains)
[docs]    @msv_utils.const
    def combinedSeqForSplitSeq(self, split_seq):
        """
        Get the combined-chain sequence that contains the given split-chain
        sequence.
        :param split_seq: The split-chain sequence
        :type split_seq: sequence.Sequence
        :return: The combined-chain sequence
        :rtype: sequence.CombinedChainProteinSequence
        """
        if self._split_seq_cache is None:
            self._split_seq_cache = {
                cur_split_seq: combined_seq for combined_seq in self
                for cur_split_seq in combined_seq.chains
            }
        return self._split_seq_cache[split_seq] 
[docs]    @msv_utils.const
    def combinedResForSplitRes(self, split_res):
        """
        Get the combined-chain residue for the given split-chain residue.
        :param res: The split-chain residue
        :type res: residue.AbstractSequenceElement
        :return: The combined-chain residue
        :rtype: residue.CombinedChainResidueWrapper
        """
        combined_seq = self.combinedSeqForSplitSeq(split_res.sequence)
        return residue.CombinedChainResidueWrapper(split_res, combined_seq) 
    @msv_utils.const
    def _assertCanMutateResidues(self, seq_i, start, end, elements):
        super()._assertCanMutateResidues(seq_i, start, end, elements)
        # Make sure that the specified indices don't span a chain break
        self[seq_i].assertCanMutateResidues(start, end)
[docs]    @msv_utils.const
    def getInterChainAnchors(self):
        """
        Return all residues that are anchored to a different chain of the
        reference sequence (e.g. a residue in the second chain anchored to a
        reference residue from the first chain).
        :return: The anchored residues.
        :rtype: set[residue.Residue]
        """
        inter_chain_anchors = set()
        ref_seq = self.getReferenceSeq()
        for ref_res, anchored_res in self._getAnchorPairs():
            ref_chain_index = ref_seq.chains.index(ref_res.split_sequence)
            anc_seq_chains = anchored_res.sequence.chains
            anc_chain_index = anc_seq_chains.index(anchored_res.split_sequence)
            if ref_chain_index != anc_chain_index:
                inter_chain_anchors.add(anchored_res)
        return inter_chain_anchors 
[docs]    def alignChainStarts(self):
        """
        Align chain starting positions (e.g. make sure that the start of the
        N-th chain occurs in the same column for all sequences).  This method
        will add gaps at the starts and/or ends of chains to preserve anchoring.
        :return: A tuple of:
            - A list of chain starting indices.  This will not include the
              starting index of the first chain, which is always 0.
            - The starting index of the first chain for which there's no
              corresponding reference chain (e.g. the starting index for the
              third chain if there are only two chains in the reference
              sequence).  This will be None if there are no chains without a
              corresponding reference chain.
        :rtype: tuple(list[int], int or None)
        """
        gaps, chain_starts, end_of_ref = self._gapsToAddToAlignChainStarts()
        self._addGapsToChainStartsAndEnds(gaps)
        return chain_starts, end_of_ref 
    @msv_utils.const
    def _gapsToAddToAlignChainStarts(self):
        """
        Figure out how many gaps need to be added to each chain start and end in
        order to align the chain starting positions (e.g. make sure that the
        start of the N-th chain occurs in the same column for all sequences).
        :return: A tuple of:
            - The numbers of gaps to add
            - A list of chain starting indices.  This will not include the
              starting index of the first chain, which is always 0.
            - The starting index of the first chain for which there's no
              corresponding reference chain (e.g. the starting index for the
              third chain if there are only two chains in the reference
              sequence).  This will be None if there are no chains without a
              corresponding reference chain.
        :rtype: tuple(GapRegionsForSeqs, list[int], int or None)
        """
        if len(self) == 0:
            return [], [], None
        anchor_offsets = self._getAnchorOffsets()
        gaps_to_add, chain_lengths = self._gapsToAddFromAnchorOffsets(
            anchor_offsets)
        chain_starts = list(itertools.accumulate(chain_lengths))[:-1]
        num_ref_chains = len(self.getReferenceSeq().chains)
        if num_ref_chains - 1 == len(chain_starts):
            # There are as many chains in the reference sequence as there are in
            # the whole alignment
            end_of_ref = None
        else:
            end_of_ref = chain_starts[num_ref_chains - 1]
        return gaps_to_add, chain_starts, end_of_ref
    def _getAnchorOffsets(self):
        """
        Determine how many residues we would need to offset the start of each
        chain relative to the equivalent chain in the reference sequence in
        order to preserve anchored residues.
        :return: Offset values given as
            `anchor_offsets[chain_index][sequence_index] = offset`.  Note that
            the chain index comes first, not the sequence index.  If a given
            chain doesn't contain any anchored residues, the offset will be
            zero.
        :rtype: list[list[int]]
        :raise AnchoredResidueError: If any inter-chain anchors are present.
            See `getInterChainAnchors` for additional information.
        """
        num_chains = max(len(seq.chains) for seq in self)
        anchor_offsets = [[0] * len(self) for _ in range(num_chains)]
        ref_seq = self.getReferenceSeq()
        for ref_res, anchored_res in self._getAnchorPairs():
            ref_chain_index = ref_seq.chains.index(ref_res.split_sequence)
            ref_res_index = ref_res.split_res.idx_in_seq
            anc_seq = anchored_res.sequence
            anc_seq_index = self.index(anc_seq)
            anc_chain_index = anc_seq.chains.index(anchored_res.split_sequence)
            anc_res_index = anchored_res.split_res.idx_in_seq
            if ref_chain_index != anc_chain_index:
                raise AnchoredResidueError()
            cur_offset = ref_res_index - anc_res_index
            anchor_offsets[anc_chain_index][anc_seq_index] = cur_offset
        return anchor_offsets
    def _gapsToAddFromAnchorOffsets(self, anchor_offsets):
        """
        Using the anchor offsets returned by `_getAnchorOffsets`, figure out how
        many gaps need to be added to each chain start and end in order to align
        the chain starting positions while preserving anchored residues.
        :param anchor_offsets: How many residues we need to offset the start of
            each chain relative to the equivalent chain in the reference
            sequence in order to preserve anchored residues.
        :type anchor_offsets: list[list[int]]
        :return: A tuple of:
            - The number of gaps to add to the start and end of each chain
            - The length that each chain will be after the gaps have been added.
        :rtype: tuple(GapRegionsForSeqs, list[int])
        """
        gaps_to_add = [[None] * len(seq.chains) for seq in self]
        chain_lengths = []
        for chain_i, (chain_anchor_offsets, chains) in enumerate(
                zip(anchor_offsets,
                    itertools.zip_longest(*(seq.chains for seq in self)))):
            # figure out how many gaps we're going to have to add before the
            # start of this chain in the reference sequence
            ref_start_gaps = -min(
                (offset for offset in chain_anchor_offsets if offset < 0),
                default=0)
            # figure out how many columns this chain is going to take up
            max_chain_length = max(ref_start_gaps + cur_anchor_offset +
                                   len(cur_chain)
                                   for cur_chain, cur_anchor_offset in zip(
                                       chains, chain_anchor_offsets)
                                   if cur_chain is not None)
            chain_lengths.append(max_chain_length)
            for seq_i, (cur_seq, cur_chain, cur_anchor_offset) in enumerate(
                    zip(self, chains, chain_anchor_offsets)):
                if cur_chain is None:
                    # this sequence doesn't have a chain in this position
                    continue
                start_gaps = ref_start_gaps + cur_anchor_offset
                if chain_i < len(cur_seq.chains) - 1:
                    end_gaps = max_chain_length - len(cur_chain) - start_gaps
                else:
                    # there's no need to add gaps to the end of the sequence
                    end_gaps = 0
                gaps_to_add[seq_i][chain_i] = sequence.GapRegion(
                    from_start=start_gaps, from_end=end_gaps)
        return gaps_to_add, chain_lengths
    def _addGapsToChainStartsAndEnds(self, gaps: sequence.GapRegionsForSeqs):
        """
        Add the specified numbers of gaps to the starts and ends of each chain
        in each sequence.
        :param gaps: The numbers of gaps to add
        """
        assert len(gaps) == len(self)
        with self.suspendAnchors():
            for seq, cur_gaps in zip(self, gaps):
                seq.addGapsToChainStartsAndEnds(cur_gaps)
    def _removeGapsFromChainStartsAndEnds(self,
                                          gaps: sequence.GapRegionsForSeqs):
        """
        Remove the specified numbers of gaps from the starts and ends of each
        chain in each sequence.
        :param gaps: The numbers of gaps to remove
        """
        assert len(gaps) == len(self)
        with self.suspendAnchors():
            for seq, cur_gaps in zip(self, gaps):
                seq.removeGapsFromChainStartsAndEnds(cur_gaps)
[docs]    def adjustChainStarts(self, adjust_by):
        """
        Move each chain break position to the right by the specified number of
        gaps.  Note that chain breaks can only be moved along gaps, not
        residues.
        :param num_gaps: The number of gaps to move each chain break by, given
            as `adjust_by[sequence_index][chain_break_index] = adjustment`.
            Note that no adjustment is given for the start of the first chain or
            the end of the last chain.
        :type num_gaps: list[list[int]]
        :raise AssertionError: If some of the sequence elements to be removed
            aren't actually gaps.
        """
        gaps_to_remove, gaps_to_add = self._adjustChainStartsToGaps(adjust_by)
        with self.suspendAnchors():
            self._removeGapsFromChainStartsAndEnds(gaps_to_remove)
            self._addGapsToChainStartsAndEnds(gaps_to_add) 
    @msv_utils.const
    def _adjustChainStartsToGaps(self, adjust_by):
        """
        Convert the value passed to `adjustChainStarts` into values that can be
        passed to `_removeGapsFromChainStartsAndEnds` and
        `_addGapsToChainStartsAndEnds`.
        :param num_gaps: The number of gaps to move each chain break by, given
            as `adjust_by[sequence_index][chain_break_index] = adjustment`.
            Note that no adjustment is given for the start of the first chain or
            the end of the last chain.
        :type num_gaps: list[list[int]]
        :return: A tuple of gaps to remove and gaps to add.
        :rtype: tuple(list[list[tuple(int, int)]], list[list[tuple(int, int)]])
        """
        gaps_to_remove = []
        gaps_to_add = []
        for seq_adjust_by in adjust_by:
            seq_gaps_to_remove = [sequence.GapRegion(0, 0)]
            seq_gaps_to_add = []
            for chain_adjust_by in seq_adjust_by:
                seq_gaps_to_add.append(sequence.GapRegion(0, chain_adjust_by))
                seq_gaps_to_remove.append(sequence.GapRegion(
                    chain_adjust_by, 0))
            seq_gaps_to_add.append(sequence.GapRegion(0, 0))
            gaps_to_remove.append(seq_gaps_to_remove)
            gaps_to_add.append(seq_gaps_to_add)
        return gaps_to_remove, gaps_to_add
    def _addElementByChain(self, seq, index, chain, element):
        """
        Add the given element to the specified chain of a sequence.
        :param seq: The sequence to insert into.
        :type seq: sequence.CombinedChainProteinSequence
        :param index: The index to insert the element at.  Note that this is the
            index in `seq`, not `chain`.
        :type index: int
        :param chain: The chain to insert into
        :type chain: sequence.ProteinSequence
        :param element: The element to insert
        :type element: str or residue.AbstractSequenceElement
        """
        self._assertCanInsert(seq, [index])
        seq.insertElementByChain(index, chain, element)
        self.signals.residuesAdded.emit([element])
    @msv_utils.const
    def _getInvalidatedBonds(self, seqs):
        # See parent class for method documentation
        # This method only cares about inter-sequence disulfides, which can't
        # happen in a combined-chain alignment.  Inter-chain disulfides are
        # always between chains in the same entry, so they'll always be within a
        # single sequence in a combined-chain alignment.
        return [], []
    def _restoreInvalidatedBonds(self, invalidated_known_bonds,
                                 invalidated_pred_bonds):
        # See parent class for method documentation
        # This method isn't needed for combined-chain alignments because
        # _getInvalidatedBonds always returns empty lists
        pass 
[docs]def get_contiguous_groups(nums):
    """
    Group numbers in a given list by contiguity. Each group that is returned
    will be a list of numbers where every value is an int that only differs
    from its neighbors by one.
    e.g. [1, 2, 4] -> [[1, 2], [4]]
         [1, 2, 4, 5, 10] -> [[1, 2], [4, 5], [10]]
    :param nums: A list of numbers to group
    :type nums: list(int)
    :return: A list of groups of numbers, where the numbers in each group are
        contiguous
    :rtype: list(list(int))
    """
    assert len(nums) > 0
    nums.sort()
    groups = []
    start = nums[0]
    while start <= nums[-1]:
        group = []
        for num in range(start, nums[-1] + 1):
            start = num + 1
            if num not in nums:
                break
            group.append(num)
        if len(group):
            groups.append(group)
    return groups