import contextlib
import enum
import itertools
from abc import ABCMeta
from abc import abstractmethod
from collections import defaultdict
from collections import namedtuple
import numpy as np
from Bio import pairwise2
from scipy.spatial import distance
from schrodinger import in_dev_env
from schrodinger import structure
from schrodinger.infra import mmerr
from schrodinger.protein import constants
from schrodinger.protein import residue
from schrodinger.protein import sequence
from schrodinger.protein.tasks import clustal
from schrodinger.protein.tasks import sta
from schrodinger.protein.tasks import structure_alignment
from schrodinger.structutils import analyze
from schrodinger.utils import scollections
ASLResult = namedtuple("ASLResult", ["ref_ok", "other_ok", "other_skips"])
[docs]class CantAlignException(Exception):
Exception raised when an aligner cannot start e.g. due to not enough seqs
[docs]class AbstractAligner(object, metaclass=ABCMeta):
Base class of objects that can perform an alignment
def _extractGapLocations(self, seq_str):
Convenience method to extract gap locations from a string of characters
Useful for interacting with backends that return results as gapped
sequences of strings
return [
ind for (ind, char) in enumerate(seq_str)
if char in constants.GAP_CHARS
[docs] @abstractmethod
def run(self, aln):
Aligns the sequences in an alignment using the parameters supplied on
Subclasses need to override this default implementation.
:param aln: The alignment to align
:type aln: `schrodinger.protein.alignment.BaseAlignment`
def __call__(self, aln, *args, **kwargs):
Convenience method to make the class instance itself a callable
""", *args, **kwargs)
[docs]class RescodeAligner(AbstractAligner):
Aligns sequences by rescode
def _fitSequenceToAllRescodes(self, seq, codes):
current_residue_index = 0
gaps = []
for index, code in enumerate(codes):
res = seq[current_residue_index]
except IndexError:
res = None
if res is not None and res.is_res and res.rescode == code:
current_residue_index += 1
return gaps
[docs] def run(self, aln):
all_codes = set()
for seq in aln:
sorted_codes = sorted(list(all_codes))
aln_gaps = []
for seq in aln:
gaps = self._fitSequenceToAllRescodes(seq, sorted_codes)
class _TB(enum.IntEnum):
An enum to keep track of the different pairwise matrices and the traceback
Match, RefGap, OtherGap = range(3)
[docs]class AbstractPairwiseAligner(AbstractAligner):
Abstract class for pairwise alignment where gaps can be merged into the
entire alignment to preserve relative alignment of all non-reference
sequences to the reference sequence.
Subclasses must implement `_getPairwiseGaps` to align one sequence to the
ref seq. Subclasses may override `_run` to customize aligning (e.g.
validation or setup of additional data needed by `_getPairwiseGaps`)
[docs] def __init__(self, preserve_reference_gaps=False):
:param preserve_reference_gaps: Whether to preserve the gaps in the
reference sequence.
:type preserve_reference_gaps: bool
self._preserve_reference_gaps = preserve_reference_gaps
self._ref_seq = None
self._other_seq = None
self._aln = None
self._gaps_orig = []
[docs] def run(self, aln, seqs_to_align=None, **kwargs):
`kwargs` are additional arguments that will be passed to `_run`.
:param aln: The alignment containing sequences to align.
:type aln: alignment.Alignment
:param seqs_to_align: The sequences in `aln` to align against the
reference sequence of `aln`. If `None`, defaults to the first
non-reference sequence in `aln` (ie `aln[1]`)
:type seqs_to_align: list(sequence.Sequence)
:raises CantAlignException: If `seqs_to_align` contains a sequence not
found in `aln`.
if len(aln) < 2:
raise ValueError("Alignment must have at least two sequences to "
f"align, got {len(aln)}.")
self._aln = aln
if seqs_to_align is None:
seqs_to_align = aln[1:2]
if any(seq not in aln for seq in seqs_to_align):
raise CantAlignException(
'Sequence(s) in seqs_to_align not found in the alignment')
self._ref_seq = aln.getReferenceSeq()
self._run(aln, seqs_to_align, **kwargs)
# Ensure that the alignment is rectangular after aligning
def _getPairwiseGaps(self, other_seq):
Subclasses must reimplement this method.
:return: Gaps to insert into ref seq and `other_seq` to align them
:rtype: tuple(list(int), list(int))
raise NotImplementedError
def _run(self, aln, seqs_to_align, **kwargs):
preserve_reference_gaps = self._preserve_reference_gaps
for seq in seqs_to_align:
ref_gaps, other_gaps = self._getPairwiseGaps(seq)
self._mergeGaps(ref_gaps, other_gaps, preserve_reference_gaps)
# When multiple sequences are selected for pairwise alignment,
# after the first non-reference sequence is aligned to reference
# sequence, the gaps in the reference sequence will be locked for
# further alignments.
preserve_reference_gaps = True
def _setUpSequences(self, other_seq):
Sanitizes sequence and initialize matrices in preparation for
the alignment.
:param other_seq: Sequence to be aligned the reference sequence.
:type other_seq: sequence.Sequence
# Keep a copy to merge original gaps into
self._gaps_orig = [g.idx_in_seq for g in self._ref_seq.getGaps()]
self._other_seq = other_seq
# Sanitize sequences before aligning
gaps = self._ref_seq.getGaps() + other_seq.getGaps()
def _getMappedGaps(from_gaps, to_gaps):
Utility function which calculates the offsets of gaps applied to
previously-aligned sequences.
:param from_gaps: Gap indices to apply to the new sequence.
:type from_gaps: list(int)
:param to_gaps: Existing gap indices of the targeted sequence
:type to_gaps: list(int)
:return: The indices of `from_gaps` offset to account for `to_gaps`
:rtype: list(int)
merged_gaps = list(from_gaps) # make a copy
for i in range(len(merged_gaps)):
for old_gap in to_gaps:
if old_gap + i > merged_gaps[i]:
merged_gaps[i] += 1 # move the old gap forward
return merged_gaps
def _mergeGaps(self, ref_gaps, other_gaps, preserve_reference_gaps):
Merge gaps into the original alignment.
:param ref_gaps: Gaps to insert into `self._ref_seq`
:param other_gaps: Gaps to insert into `self._other_seq`
:param preserve_reference_gaps: Whether to preserve the gaps in the reference
sequence.This is done to maintain the relative alignment between
the reference sequence and all the other sequences.
:type preserve_reference_gaps: bool
ref_len = len(self._ref_seq) + len(ref_gaps)
other_len = len(self._other_seq) + len(other_gaps)
if other_len > ref_len:
ref_gaps.extend(range(ref_len, other_len))
elif ref_len > other_len:
other_gaps.extend(range(other_len, ref_len))
# Add original gaps displacement for reference sequence gaps
ref_to_orig = self._getMappedGaps(ref_gaps, self._gaps_orig)
orig_to_ref = self._getMappedGaps(self._gaps_orig, ref_gaps)
other_idx = self._aln.index(self._other_seq)
ref_idx = self._aln.index(self._ref_seq)
gap_indices = [[] for s in self._aln]
gap_indices[ref_idx] = self._gaps_orig
# If we add the new other_gaps first, then we can get incorrect
# behavior from terminal gaps.
gap_indices[other_idx] = other_gaps
gap_indices = [[] for s in self._aln]
gap_indices[ref_idx] = ref_to_orig
gap_indices[other_idx] = orig_to_ref
if preserve_reference_gaps:
gap_indices = []
for seq_idx, seq in enumerate(self._aln):
cur_gap_indices = []
if seq_idx in (ref_idx, other_idx):
for gap_num, gap_idx in enumerate(ref_to_orig):
if gap_idx > len(seq) + gap_num:
# don't add gaps that are past the end of the sequence
[docs]class AbstractNWPairwiseAligner(AbstractPairwiseAligner):
Abstract class for the Needleman-Wunsch global alignment algorithm for
pairwise sequence alignment with affine gap penalties.
:cvar CONSTRAINT_SCORE: Reward amount for keeping constrained residues
:ctype CONSTRAINT_SCORE: float
:cvar RES_MATCH_BONUS: Reward amount for aligning matching residues. Used
by default if a substitution matrix is not specified.
:ctype RES_MATCH_BONUS: float
:cvar RES_MISMATCH_PENALTY: Penalty for aligning differing residues. Used by
default if a subtitution matrix is not specified
[docs] def __init__(self,
:param preserve_reference_gaps: Whether to preserve the gaps in the
reference sequence
:type preserve_reference_gaps: bool
:param gap_open_penalty: Penalty for opening a gap. Should be >=0.
:type gap_open_penalty: float
:param gap_extend_penalty: Penalty for extending a gap. Should be >=0.
:type gap_extend_penalty: float
:param sub_matrix: Scoring matrix to be used for the alignment.
If no matrix is specified, this method uses residue identity measure.
:type sub_matrix: 2D float array or dict mapping (char, char) to float
:param direct_scores: Use scoring matrix directly as (NxM) where N, M are
lengths of both sequences rather than default 20x20 substitution matrix.
:type direct_scores: bool
:param ss_constraints: Whether to constrain the alignment so
no gaps appear in middle of a secondary structure.
:type ss_constraints: bool
:param penalize_end_gaps: Whether to penalize start/end gaps
:type penalize_end_gaps: bool
if gap_open_penalty < 0 or gap_extend_penalty < 0:
raise CantAlignException('Penalty should be nonnegative')
if gap_extend_penalty > gap_open_penalty:
raise CantAlignException(
'Gap open penalty should be larger than gap extend penalty')
# Store settings
self._gap_open_penalty = gap_open_penalty
self._gap_extend_penalty = gap_extend_penalty
self._sub_matrix = sub_matrix
self._sim_matrix = None
self._direct_scores = direct_scores
self._ss_constraints = ss_constraints
self._penalize_end_gaps = penalize_end_gaps
# Set up instance attributes
self._anchored_pairs = defaultdict(list)
self._constrained_pairs = {}
self._constraints = []
def _run(self, aln, seqs_to_align, *, constraints=None):
Set up constraints and suspend anchors before running the alignment
:param constraints: Optional list of (ref_res, res) pairwise residue
constraints. Note that these constraints will be heavily favored
but are not guaranteed. Some constraints are impossible to respect
simultaneously [e.g. residues at indexes (1,1) and (0,2)]. The
first residue must belong to the reference sequence of the `aln`.
:type constraints: list of (residue.Residue, residue.Residue)
if not self._preserve_reference_gaps and len(
raise CantAlignException(
"Can't align an alignment with anchored residues without"
"preserving gaps in the reference sequence.")
if constraints is None:
constraints = []
for res1, res2 in constraints:
err_msg = ('Constraints must be a tuple of a reference residue '
'and a non-reference residue, in that order')
if res1.sequence != self._ref_seq:
raise ValueError(err_msg)
if res2.sequence not in aln:
raise ValueError(err_msg)
if res2.sequence == self._ref_seq:
raise ValueError(err_msg)
self._constraints = constraints
self._initializeAnchorPairs(aln, seqs_to_align)
with aln.suspendAnchors():
super()._run(aln, seqs_to_align, constraints=constraints)
def _initializeAnchorPairs(self, aln, seqs_to_align):
Initialize a dictionary mapping sequences to a list of anchor pairs.
Anchor pairs are defined as tuples of (reference residue, anchored
_anchored_residues = defaultdict(list)
for res in aln.getAnchoredResidues():
self._anchored_pairs = dict()
for seq in seqs_to_align:
anchors = _anchored_residues[seq]
anchor_pairs = [(self._ref_seq[a.idx_in_seq], a) for a in anchors]
self._anchored_pairs[seq] = anchor_pairs
def _getMatrixValue(self, code1, code2):
Returns the appropriate similarity score as defined by a similarity
:param code1: short code of first sequence residue
:type code1: char
:param code2: short code of second sequence residue
:type code2: char
:return: value for a residue-residue pair based on a scoring matrix
:rtype: float
if self._sub_matrix is not None:
value = self._sub_matrix[code1.upper(), code2.upper()]
value = self.RES_MATCH_BONUS if code1 == code2 else -self.RES_MISMATCH_PENALTY
return value
[docs]class SchrodingerPairwiseAligner(AbstractNWPairwiseAligner):
Implementation of the Needleman-Wunsch global alignment algorithm for
pairwise sequence alignment with affine gap penalties.
1) ability to merge new sequence with existing alignment,
2) ability to penalize gaps in secondary structure elements,
3) ability to use custom substitution matrix generated from a family of
proteins or provided by the user.
Any residues with variant residue types will have their short codes
uppercased. This means they will be treated identically to their
standard variant. If a nonstandard residue type has a lowercase short
code that doesn't match its standard variant, or if we need special
treatment for variant residues, _getMatrixValue will have to be
[docs] def __init__(self, **kwargs):
self._sim_matrix = None
self._scoring_matrix = None
self._trace_matrix = None
def _setUpSequences(self, seq):
# Initialize matrices
ref_len = len(self._ref_seq)
other_len = len(self._other_seq)
self._sim_matrix = np.zeros((ref_len, other_len))
mat_size = (ref_len + 1, other_len + 1, 3)
self._scoring_matrix = np.zeros(mat_size)
self._scoring_matrix[0, :, _TB.Match] = -float('inf')
self._scoring_matrix[:, 0, _TB.Match] = -float('inf')
self._scoring_matrix[:, 0, _TB.RefGap] = -float('inf')
self._scoring_matrix[0, 1:, _TB.RefGap] = list(range(other_len))
self._scoring_matrix[0, 1:, _TB.RefGap] *= -self._gap_extend_penalty
self._scoring_matrix[0, 1:, _TB.RefGap] -= self._gap_open_penalty
self._scoring_matrix[0, :, _TB.OtherGap] = -float('inf')
self._scoring_matrix[1:, 0, _TB.OtherGap] = list(range(ref_len))
self._scoring_matrix[1:, 0, _TB.OtherGap] *= -self._gap_extend_penalty
self._scoring_matrix[1:, 0, _TB.OtherGap] -= self._gap_open_penalty
self._scoring_matrix[0, 0, :] = 0
self._trace_matrix = np.full(mat_size, -1, np.int8)
[docs] def getAlignmentScore(self):
Get the score of the alignment. Found by taking the highest value
in the scoring matrix.
:return: Score of the pairwise alignment.
:rtype: float
score = np.max(self._scoring_matrix[-1, -1, :])
# Offset the score for the constraint bonuses used to keep
# constraints together.
score -= ((len(self._constraints) +
len(self._anchored_pairs[self._other_seq])) *
return score
def _getPairwiseGaps(self, other_seq):
Computes gaps to align a sequence against the reference sequence.
The alignment is calculated using three matrices: R, M, and O.
All have dimensions (n+1,m+1), where n is the number of residues
in the reference sequence and m is the number of residues in ther
"other" sequence. The values of the matrices represent::
R[i,j] - The score of the optimal alignment for the sequences
ref_seq[:i] and other_seq[:j] with other_seq[j-1] aligned to a gap.
M[i,j] - The score of the optimal alignment for the sequences
ref_seq[:i] and other_seq[:j] with ref_seq[i-1] aligned to other_seq[j].
O[i,j] - The score of the optimal alignment for the sequences
ref_seq[:i] and other_seq[:j] with ref_seq[i-1] aligned to a gap.
The scores are calculated as::
R[i,j] = max( R[i-1,j] - gap_extend_penalty,
M[i-1,j] - gap_open_penalty,
O[i-1,j] - gap_open_penalty)
M[i,j] = max( R[i-1,j-1] + score(ref_seq[i-1], other_seq[j-1]),
M[i-1,j-1] + score(ref_seq[i-1], other_seq[j-1]),
O[i-1,j-1] + score(ref_seq[i-1], other_seq[j-1]))
O[i,j] = max( R[i-1,j] - gap_open_penalty,
M[i-1,j] - gap_open_penalty,
O[i-1,j] - gap_extend_penalty)
`score()` is a function that typically gives a positive score if
two residues are the same or similar, and negative if they're different.
The function is defined based on the given substitution matrix and
defaults to +1 for a match and -1 for a mismatch.
:param other_seq: Sequence to align against the reference sequence
:type other_seq: sequence.Sequence
:raises CantAlignException: if `other_seq` is the reference sequence
if other_seq == self._ref_seq:
raise CantAlignException("Can't align sequence against itself")
return self._traceBack()
def _buildSimilarityMatrix(self):
Calculate scores for all residue pairs in the sequence pair.
sim_matrix = self._sim_matrix
if self._sub_matrix is not None:
if self._direct_scores:
sim_matrix = self._sub_matrix
codes1 = str(self._ref_seq).upper()
codes2 = str(self._other_seq).upper()
for (i, code1), (j, code2) in itertools.product(
enumerate(codes1), enumerate(codes2)):
sim_matrix[i, j] = self._getMatrixValue(code1, code2)
same_idxs = (
[i, j]
for (i, j) in itertools.product(range(len(self._ref_seq)),
if str(self._ref_seq[i]).upper() == str(
# Create row, column index lists for numpy array selection
for i, j in same_idxs:
sim_matrix -= self.RES_MISMATCH_PENALTY
# Prevent pairwise constraints from separating
anchor_pairs = self._anchored_pairs[self._other_seq]
constraints = self._constraints + anchor_pairs
for (res1, res2) in constraints:
if res2.sequence != self._other_seq:
res2.idx_in_seq] += self.CONSTRAINT_SCORE
def _buildScoringMatrix(self):
Iteratively builds the score matrix. See the docstring for `_getPairwiseGaps()`
for information on how it's calculated.
gap_open = self._gap_open_penalty
gap_extend = self._gap_extend_penalty
R = self._scoring_matrix[:, :, _TB.OtherGap]
M = self._scoring_matrix[:, :, _TB.Match]
O = self._scoring_matrix[:, :, _TB.RefGap] # noqa: E741
Rtb = self._trace_matrix[:, :, _TB.OtherGap]
Mtb = self._trace_matrix[:, :, _TB.Match]
Otb = self._trace_matrix[:, :, _TB.RefGap]
sim_matrix = self._sim_matrix
for (i, j) in itertools.product(range(1, len(self._ref_seq) + 1),
range(1, len(self._other_seq) + 1)): # yapf: disable
candidates = np.full((3,), -np.inf)
candidates[_TB.OtherGap] = R[i - 1, j] - gap_extend
candidates[_TB.Match] = M[i - 1, j] - gap_open
candidates[_TB.RefGap] = O[i - 1, j] - gap_open
R[i, j] = np.max(candidates)
Rtb[i, j] = np.argmax(candidates)
# Add an extra 0 to account for new subalignments starting at
# this position (this is how we differentiate between global alignment)
candidates = np.zeros((3,))
candidates[_TB.OtherGap] = R[i - 1, j - 1] + sim_matrix[i - 1, j - 1] # yapf: disable
candidates[_TB.Match] = M[i - 1, j - 1] + sim_matrix[i - 1, j - 1]
candidates[_TB.RefGap] = O[i - 1, j - 1] + sim_matrix[i - 1, j - 1]
candidates[_TB.Match] += self._ssConstraintBonus(i, j)
M[i, j] = np.max(candidates)
Mtb[i, j] = np.argmax(candidates)
candidates = np.full((3,), -np.inf)
candidates[_TB.OtherGap] = R[i, j - 1] - gap_open
candidates[_TB.Match] = M[i, j - 1] - gap_open
candidates[_TB.RefGap] = O[i, j - 1] - gap_extend
O[i, j] = np.max(candidates)
Otb[i, j] = np.argmax(candidates)
def _ssConstraintBonus(self, i, j):
Determine whether the current residues in either sequence should get a
score bonus for the matching matrix due to being part of a secondary
:param i: The one-based index of the ref_seq to consider.
:type i: int
:param j: The one-based index of the other_seq to consider.
:type j: int
:return: 0 if we're not constraining secondary structures or neither
residue is a part of a secondary structure. Otherwise, self.CONSTRAINT_SCORE
:rtype: float
if self._ss_constraints:
helix_or_strand = {structure.SS_HELIX, structure.SS_STRAND}
res1 = self._ref_seq[i - 1]
res2 = self._other_seq[j - 1]
if (res1.secondary_structure in helix_or_strand or
res2.secondary_structure in helix_or_strand):
return 0.0
def _traceBack(self):
Traces back through highest alignment path and finds optimal gap positions.
i, j = len(self._ref_seq), len(self._other_seq)
k = np.argmax(self._scoring_matrix[-1, -1, :])
ref_gaps = []
other_gaps = []
while i != 0 and j != 0:
tb = self._trace_matrix[i, j, k]
if k == _TB.OtherGap:
i -= 1
elif k == _TB.Match:
i -= 1
j -= 1
elif k == _TB.RefGap:
j -= 1
raise RuntimeError('The value for k is undefined.')
k = tb
# Add starting gaps
if abs(j - i) > 0:
if j > i:
ref_gaps.extend([0] * (j - i))
other_gaps.extend([0] * (i - j))
# Reverse gaps and convert from absolute indices to relative indices.
# Absolute gap indices don't account for gaps that come before it
# while relative gaps do.
ref_gaps = [i + pos for (pos, i) in enumerate(reversed(ref_gaps))]
other_gaps = [i + pos for (pos, i) in enumerate(reversed(other_gaps))]
return ref_gaps, other_gaps
[docs]class BiopythonPairwiseAligner(AbstractNWPairwiseAligner):
Pairwise alignment using Biopython.
Any residues with variant residue types will have their short codes
uppercased. This means they will be treated identically to their
standard variant. If a nonstandard residue type has a lowercase short
code that doesn't match its standard variant, or if we need special
treatment for variant residues, _getMatrixValue will have to be
[docs] def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)
if self._sub_matrix is None:
# If no sub matrix is defined, we just use an identity substitution
# matrix using all short codes.
short_codes = [restype.short_code for restype in ALL_RES_TYPES]
self._sub_matrix = {}
for c1 in short_codes:
for c2 in short_codes:
if c1 == c2:
value = self.RES_MATCH_BONUS
self._sub_matrix[c1, c2] = value
[docs] def getAlignmentScore(self):
Get the score of the alignment. Found by taking the highest value
in the scoring matrix.
:return: Score of the pairwise alignment.
:rtype: float
score = self._score
# Offset the score for the constraint bonuses used to keep
# constraints together.
score -= ((len(self._constraints) +
len(self._anchored_pairs[self._other_seq])) *
return score
def _getMatrixValue(self, res1, res2):
Returns the score for aligning `res1` and `res2`. These can either
be characters or `residue.Residue`s. If they're `residue.Residue`
objects, then we check if they're matching anchor residues and return a
large score if they are. Otherwise, we just use their short-codes by
calling `str(res).upper()`.
This is called /A LOT/ by Biopython's aligner, so if any changes
need to be made, make sure that performance is still reasonable.
:param res1: A residue to calculate a score for
:type res1: char or residue.Residue
:param res2: A residue to calculate a score for
:type res2: char or residue.Residue
:return: alignment score
:rtype: float
return self._sub_matrix[res1, res2]
except KeyError:
if self._constrained_pairs[res1] == res2:
except KeyError:
return self._sub_matrix[str(res1).upper(), str(res2).upper()]
def _transformSequenceForBiopython(self, seq):
Takes a `sequence.Sequence` and returns a list or string to pass
to Biopython's aligner. Subclasses may want to override this.
For the BiopythonPairwiseAligner, residues are converted into their
short codes unless they're anchor residues, in which case the
actual `residue.Residue` object is used.
This representation allows for fast computation in the usual case where
`_getMatrixValue` is called with two non-anchor residues. Calling
`_getMatrixValue` with an anchor residue will result in a
key error since the the substitution matrix only has string keys. By
using EAFP, we get to save on an attribute and instance check each
time `_getMatrixValue` is called with non-anchor residues, which results
in a large performance gain in the majority of cases.
:param seq: a sequence to encode for Biopython
:type seq: sequence.Sequence
:return: a list of characters and residues
:rtype: list(char or residue.Residue)
return [
str(res).upper() if res not in self._constrained_pairs else res
for res in seq
def _getPairwiseGaps(self, other_seq):
Computes gaps to align a sequence against the reference sequence.
:param other_seq: Sequence to align against the reference sequence
:type other_seq: sequence.Sequence
:raises CantAlignException: if `other_seq` is the reference sequence
if other_seq == self._ref_seq:
raise CantAlignException("Can't align sequence against itself")
ref_gaps = []
other_gaps = []
if len(self._ref_seq) != 0 and len(other_seq) != 0:
self._constrained_pairs = {}
anchor_pairs = self._anchored_pairs[self._other_seq]
constraints = self._constraints + anchor_pairs
for ref_res, other_res in constraints:
self._constrained_pairs[ref_res] = other_res
self._constrained_pairs[other_res] = ref_res
ref_seq = self._transformSequenceForBiopython(self._ref_seq)
other_seq = self._transformSequenceForBiopython(other_seq)
alignments = pairwise2.align.globalcs(
aln_ref_seq_str, aln_other_seq_str, self._score, *_ = alignments[0]
ref_gaps = self._extractGapLocations(aln_ref_seq_str)
other_gaps = self._extractGapLocations(aln_other_seq_str)
elif len(self._ref_seq) == 0 == len(other_seq):
self._score = 0
elif len(self._ref_seq) == 0:
self._score = -self._gap_open_penalty - (
len(other_seq) - 1) * self._gap_extend_penalty
ref_gaps = list(range(len(other_seq)))
elif len(other_seq) == 0:
self._score = -self._gap_open_penalty - (
len(self._ref_seq) - 1) * self._gap_extend_penalty
other_gaps = list(range(len(ref_gaps)))
return ref_gaps, other_gaps
[docs]class PrimeSTAAligner(AbstractAligner):
Sequence alignment using $SCHRODINGER/sta
[docs] def __init__(self, protein_family=None):
:param protein_family: 'GPCR' for specialized alignment or None for
default templates.
:type protein_family: str or NoneType
if protein_family not in (None, 'GPCR'):
raise ValueError(f"Invalid protein_family {protein_family}")
self._protein_family = protein_family
[docs] def run(self, aln, structured_seq=None, constraints=None):
:param aln: The alignment containing sequences to align.
:type aln: alignment.Alignment
:param structured_seq: Structured sequence to use as reference. If
None, the first non-reference seq will be aligned.
:type structured_seq: sequence.ProteinSequence or NoneType
:param constraints: Pairs of (reference_seq, structured_seq) residues to
:type constraints: list(tuple(residue.Residue, residue.Residue)) or
# TODO MSV-2728 allow aligning more than one query sequence
if structured_seq is None:
sta_ref_idx = 1
structured_seq = aln[sta_ref_idx]
sta_ref_idx = aln.index(structured_seq)
if not structured_seq.hasStructure():
raise CantAlignException(
f'Sequence {sta_ref_idx + 1} ({structured_seq.fullname}) must '
'have structure in order to align with STA')
with _StructureOnlyAlignment(aln):
query_seq = aln.getReferenceSeq()
query_idx = aln.index(query_seq)
task = sta.STATask()
task.input.ref_seq = structured_seq
task.input.query_seq = query_seq
task.input.protein_family = self._protein_family
if constraints is not None:
task.input.constraints = constraints
task.wait() # TODO PANEL-18317
if task.status is not task.DONE:
if in_dev_env():
print("Exception type:", type(task.failure_info.exception))
raise RuntimeError("STA failed")
ref_gaps, query_gaps = task.getGaps()
# Apply the gaps to the sequences:
gap_indices = [[] for s in aln]
gap_indices[sta_ref_idx] = ref_gaps
gap_indices[query_idx] = query_gaps
StructurelessGapAligner().run(aln, [query_seq, structured_seq])
[docs]class ClustalAligner(AbstractAligner):
Aligns sequences using the Clustal alignment algorithm.
[docs] def run(self, aln):
Aligns the sequences in an alignment
:param aln: The alignment to align
:type aln: `schrodinger.protein.alignment.BaseAlignment`
job = clustal.ClustalJob(aln)
new_aln =
if new_aln is not None:
new_gaps = new_aln.getGaps()
new_gap_indices = [
[g.idx_in_seq for g in seq_gaps] for seq_gaps in new_gaps
[docs]class SuperpositionAligner(BiopythonPairwiseAligner):
Align structured sequences based on their superposition.
[docs] def __init__(self, gap_open_penalty=None, gap_extend_penalty=None):
kwargs = dict(sub_matrix=None,
if gap_open_penalty is not None:
kwargs['gap_open_penalty'] = gap_open_penalty
if gap_extend_penalty is not None:
kwargs['gap_extend_penalty'] = gap_extend_penalty
self._ref_ca_positions = None
def _getMatrixValue(self, res1, res2):
Returns the score for aligning `res1` and `res2`. We reimplement this
so we can just look up the score in our own custom sub matrix
which has the direct scores between every pair of residues based
on distance.
This will also return a large bonus for aligning anchored residues.
:param res1: A residue to calculate a score for
:type res1: residue.Residue
:param res2: A residue to calculate a score for
:type res2: residue.Residue
:return: alignment score
:rtype: float
if self._constrained_pairs[res1] == res2:
except KeyError:
return self._sub_matrix[res1, res2]
def _transformSequenceForBiopython(self, seq):
Encode the sequence for Biopython by just passing a list of the
residues contained by the sequence.
:param seq: a sequence to encode for Biopython
:type seq: sequence.Sequence
:return: a list of characters and residues
:rtype: list(char or residue.Residue)
return list(seq)
def _run(self, aln, seqs_to_align, **kwargs):
Check for structured sequences and precompute ref CA positions before
ref_seq = aln.getReferenceSeq()
if not ref_seq.hasStructure() or not all(seq.hasStructure()
for seq in seqs_to_align):
raise CantAlignException(
'Sequence(s) must have structures in order to '
'align by superposition')
if len(seqs_to_align) < 1:
raise CantAlignException(
"At least two structures must be present to run an alignment.")
self._ref_ca_positions = self._getCAPositionsFromSeq(self._ref_seq)
super()._run(aln, seqs_to_align, **kwargs)
def _getPairwiseGaps(self, other_seq):
Align using a custom sub-matrix based on the CA positions of `other_seq`
other_ca_positions = self._getCAPositionsFromSeq(other_seq)
self._sub_matrix = self._getMatrixFromSuperposition(
self._ref_ca_positions, other_ca_positions)
return super()._getPairwiseGaps(other_seq)
def _getMatrixFromSuperposition(self, ca_positions_1, ca_positions_2):
Create scoring matrix based on residue distance.
:param ca_positions_1: An iterable of CA positions with None for
structureless residues
:type ca_positions_1: list(tuple(float, float, float) or None)
:param ca_positions_2: Another iterable of CA positions
:type ca_positions_2: list(tuple(float, float, float) or None)
:return: A scoring matrix
:rtype: list(list(float))
score_matrix = {}
# Build score matrix
for (res1, ca_pos_1), (res2, ca_pos_2) in itertools.product(
zip(self._ref_seq, ca_positions_1),
zip(self._other_seq, ca_positions_2)):
if ca_pos_1 is None or ca_pos_2 is None:
dist2 = 999 # dummy "far" distance for missing coords
dist2 = distance.sqeuclidean(ca_pos_1, ca_pos_2)
# Convert close dist to high score and far dist to low score
score = 20.0 / (1.0 + dist2 / 25.0)
score_matrix[res1, res2] = score
return score_matrix
def _getCAPositionsFromSeq(cls, seq):
Get XYZ coordinates representing the sequence.
:param seq: Sequence
:type seq: sequence.ProteinSequence
:return: A list of alpha carbon XYZ coordinates. If the residue does
not have an alpha carbon, the first atom will be used. If the
residue has no atoms, its coordinates will be None.
:rtype: list(tuple(float, float, float) or None)
struct_residues = (seq.getStructureResForRes(res) for res in seq)
return [
if struct_res is not None else struct_res
for struct_res in struct_residues
def _getCAPosition(struct_res):
:type struct_res: schrodinger.structure._Residue
atom = struct_res.getAlphaCarbon()
if atom is None:
atom = next(iter(struct_res.atom), None)
if atom is not None:
pos = (atom.x, atom.y, atom.z)
return pos
return None
[docs]class AbstractStructureAligner(AbstractAligner):
Subclasses must reimplement `run`:
- Call `_setUpSeqs` to set up instance attributes for the current alignment
- Call `_setASLs` to validate and store ASLs
- Call `_getUniqueEidSeqs` to get the sequences to align
- Call `_runStructureAlignment` to call the backend
Result = namedtuple("Result", ["ref_seq", "other_seq", "psd", "rmsd"])
[docs] def __init__(self, keywords=None, **kwargs):
:param keywords: Keywords to pass to the ska backend
:type keywords: dict
if keywords is None:
keywords = dict()
self.keywords = keywords
self._aln = None
self._ref_asl = None
self._other_asl = None
self._unique_eid_seqs = None
self._eid_name_dict = None
self._align_results = None
[docs] def getResultSeqs(self):
if self._align_results is None:
return []
seq_pairs = []
ref_seq = self._aln.getReferenceSeq()
ref_seq_name =
ref_eid = ref_seq.entry_id
name_eid_dict = {name: eid for eid, name in self._eid_name_dict.items()}
for name, result in self._align_results.items():
ref_seq = self._makeSequence(result.ref_string, result.ref_sse) = ref_seq_name
ref_seq.entry_id = ref_eid
other_seq = self._makeSequence(result.other_string,
result.other_sse) = structure_alignment.get_orig_seq_name(name)
other_seq.entry_id = name_eid_dict[name]
self.Result(ref_seq, other_seq, result.psd, result.rmsd))
return seq_pairs
def _setUpSeqs(self, aln, seqs_to_align):
self._aln = aln
self._unique_eid_seqs = self._getUniqueEidSeqs(aln.getReferenceSeq(),
def _setASLs(self, ref_asl, other_asl):
self._raiseForInvalidASLs(ref_asl, other_asl)
self._ref_asl = ref_asl
self._other_asl = other_asl
def _raiseForInvalidASLs(self, ref_asl, other_asl):
:param ref_asl: Reference ASL
:type ref_asl: str or NoneType
:param other_asl: Non-reference ASL
:type other_asl: str or NoneType
:raises CantAlignException: If either ASL is invalid
messages = []
if not self._isValidASL(ref_asl):
messages.append(f"Reference ASL is invalid: {ref_asl}")
if not self._isValidASL(other_asl):
messages.append(f"Non-reference ASL is invalid: {other_asl}")
if messages:
raise CantAlignException("\n".join(messages))
def _isValidASL(asl):
:rtype: bool
if asl is None:
return True
with mmerr.Capture():
return analyze.validate_asl(asl)
def _makeSequence(seq_str, sse_str):
seq = sequence.ProteinSequence(seq_str)
for res, sse in zip(seq, sse_str):
if res.is_gap:
res.secondary_structure = constants.SSA_MAP[sse]
return seq
def _validateSeqsToAlign(seqs_to_align):
if any(seq.entry_id is None for seq in seqs_to_align):
msg = ("One or more sequence(s) has an undefined entry_id. This "
"aligner can only be used with sequences imported by "
raise CantAlignException(msg)
def _getUniqueEidSeqs(ref_seq, seqs_to_align):
Get structured protein sequences with unique entry IDs
:param ref_seq: Reference sequence
:param seqs_to_align: Sequences to align
:return: iterable[sequence.ProteinSequence]
unique_eid_seqs = {ref_seq.entry_id: ref_seq}
for seq in seqs_to_align:
# TODO MSV-1504 use isinstance once NucleicAcidSequence does not
# inherit ProteinSequence
if (type(seq) not in (sequence.ProteinSequence,
sequence.CombinedChainProteinSequence) or
not seq.hasStructure()):
entry_id = seq.entry_id
if entry_id in unique_eid_seqs:
unique_eid_seqs[entry_id] = seq
return unique_eid_seqs.values()
def _runStructureAlignment(self, seqs):
:param seqs: Sequences with different structures (i.e. different
names_and_seqs = list(structure_alignment.get_unique_seq_names(seqs))
self._eid_name_dict = {
seq.entry_id: name for name, seq in names_and_seqs
names_and_strucs = [
(name, seq.getStructure()) for name, seq in names_and_seqs
result = structure_alignment.runStructureAlignment(
for (_, seq), (_, st) in zip(names_and_seqs, names_and_strucs):
self._align_results = result
[docs]class StructureAligner(AbstractStructureAligner):
Run structure alignment using the specified sequences to create chain ASLs
[docs] def run(self, aln, seqs_to_align, **kwargs):
self._setUpSeqs(aln, seqs_to_align)
ref_seq = aln.getReferenceSeq()
ref_eid = ref_seq.entry_id
ref_chains = {ref_seq.structure_chain}
other_asl_parts = defaultdict(set)
for seq in seqs_to_align:
eid = seq.entry_id
chain = seq.structure_chain
if eid == ref_eid:
ref_asl = self._makeASL(ref_eid, ref_chains)
other_asl = " or ".join(
self._makeASL(eid, chains)
for eid, chains in other_asl_parts.items())
self._setASLs(ref_asl, other_asl)
def _makeASL(eid, chains):
Create an ASL for the given entry ID and one or more chains
:param eid: Entry ID for the ASL
:param chains: Entry chains for the ASL
:return: The resulting ASL
:rtype: str
:raise ValueError: If the ASL is invalid
asl = f'( {eid} and {",".join(chains)})'
return asl
[docs]class CustomASLStructureAligner(AbstractStructureAligner):
Run structure alignment using specified ASLs
SENTINEL = object()
[docs] def __init__(self, keywords=None, ref_asl=None, other_asl=None):
self._setASLs(ref_asl, other_asl)
self._asl_result = None
[docs] def evaluateASLs(self, aln, seqs_to_align):
Determine whether the ASLs match any atoms in the sequences' structures
:param aln: Alignment
:param seqs_to_align: Sequences to align
:rtype: ASLResult
if aln is not self.SENTINEL and seqs_to_align is not self.SENTINEL:
# Internal call passes SENTINEL to indicate it already set up seqs
self._setUpSeqs(aln, seqs_to_align)
ref_seq = self._aln.getReferenceSeq()
if self._ref_asl is None:
ref_ok = True
ref_atoms = analyze.evaluate_asl(ref_seq.getStructure(),
ref_ok = bool(ref_atoms)
other_skips = []
if self._other_asl is None:
other_ok = True
other_ok = False
for seq in self._unique_eid_seqs:
if seq == ref_seq:
seq_atoms = analyze.evaluate_asl(seq.getStructure(),
if seq_atoms:
other_ok = True
result = ASLResult(ref_ok=ref_ok,
self._asl_result = result
return result
[docs] def run(self, aln, seqs_to_align, **kwargs):
self._setUpSeqs(aln, seqs_to_align)
if self._asl_result is None:
# Don't evaluate ASLs again if evaluateASLs was called before
# calling run
result = self.evaluateASLs(self.SENTINEL, self.SENTINEL)
if not (result.ref_ok and result.other_ok):
# Reset result to force evaluation if run is called again
self._asl_result = None
raise CantAlignException("No atoms matched the ASL")
seqs = [
seq for seq in self._unique_eid_seqs
if seq not in self._asl_result.other_skips
# Reset result to force evaluation if run is called again
self._asl_result = None
class _StructureOnlyAlignment(contextlib.AbstractContextManager):
Context manager to temporarily remove all structureless residues from
the alignment
:cvar LAST_RES_REF: Sentinel object to represent residues at the end of a
:vartype LAST_RES_REF: object
LAST_RES_REF = object()
structureless_block = namedtuple("structureless_block",
["next_residue", "subsequence"])
def __init__(self, aln):
self.aln = aln
def __enter__(self):
Remove and cache structureless residues
res_to_remove = []
self.all_res_to_add = scollections.DefaultIdDict(list)
for seq in self.aln:
if not seq.hasStructure():
for structureless_block in self._getStructurelessBlocks(seq):
structureless_subseq = structureless_block.subsequence
# Store the residues to remove
# Store the removed residues and a reference to re-add
return self.aln
def __exit__(self, exc_type, exc, exc_tb):
Restore structureless residues
for seq, res_to_add in self.all_res_to_add.items():
for structureless_block in res_to_add:
next_res = structureless_block.next_residue
residues = structureless_block.subsequence
if next_res == self.LAST_RES_REF:
# Blocks at the end of the sequence have no next res
res_i = len(seq)
res_i = next_res.idx_in_seq
self.aln.addElements(seq, res_i, residues)
def _getStructurelessBlocks(self, seq):
Find continuous blocks of structureless residues and the next
structured res, if any.
:return: Iterable of (next structured res, structureless residues)
:rtype: iterable(namedtuple(Residue, list(Residue)))
structureless_subseq = []
for res in seq:
if not res.hasStructure() and res.is_res:
elif structureless_subseq:
yield self.structureless_block(res, structureless_subseq)
structureless_subseq = []
if structureless_subseq:
# Blocks at the end of the sequence have no next res
yield self.structureless_block(self.LAST_RES_REF,
[docs]class MaxIdentityAligner(BiopythonPairwiseAligner):
Pairwise aligner that maximizes the number of matching residues between
two sequences. There are no penalties for mismatches or gaps.
class _IdentityDict(dict):
Dictionary that takes 2-tuples and returns 1 if the elements of the
tuple are the same and 0 otherwise.
def __getitem__(self, key):
assert isinstance(key, tuple)
assert len(key) == 2
if key[0] == key[1]:
return 1
return 0
[docs] def __init__(self):
[docs] def run(self, aln):
if len(aln) != 2:
raise ValueError(f'{self.__class__.__name__} can only align pairs '
'of sequences.')
[docs]class StructurelessGapAligner(AbstractAligner):
Align all structureless residues with gaps
For example, given the following alignment (where circled letters are
structureless residues):
Resnum: 0 1 2 3 4 5
Seq1: Ⓐ Ⓡ Ⓒ A D E
Seq2: Ⓒ Ⓐ Ⓝ A D A
The result will be:
Resnum: 0 1 2 3 4 5 6 7 8
Seq1: ~ ~ ~ Ⓐ Ⓡ Ⓒ A D E
Seq2: Ⓒ Ⓐ Ⓝ ~ ~ ~ A D A
[docs] def run(self, aln, seqs_to_align=None):
if seqs_to_align is None:
seqs_to_align = aln
n_seqs = len(aln)
col_idx = 0
while col_idx <= aln.num_columns:
column = aln.getColumn(col_idx)
structureless_seq_idx = None
for seq_idx in reversed(range(n_seqs)):
seq = aln[seq_idx]
if not seq.hasStructure() or seq not in seqs_to_align:
res = column[seq_idx]
if res is not None and res.is_res and not res.hasStructure():
structureless_seq_idx = seq_idx
if structureless_seq_idx is not None:
# Add a gap for this structureless res to all other seqs
new_gaps = [[] for _ in aln]
for s_idx, seq in enumerate(aln):
if s_idx != structureless_seq_idx and col_idx < len(seq):
col_idx += 1