import collections
import contextlib
import csv
import itertools
import os
import re
import warnings
from functools import partial
import requests
from Bio.Seq import Seq as BioSeq
from Bio.SeqRecord import SeqRecord as BioSeqRecord
from requests.exceptions import HTTPError
# We don't currently support SSL verification
from requests.packages.urllib3.exceptions import InsecureRequestWarning
from schrodinger import structure
from schrodinger.infra import mm
from schrodinger.protein import alignment
from schrodinger.protein import getpdb_utility
from schrodinger.protein import residue
from schrodinger.protein import seqres
from schrodinger.protein import sequence
from schrodinger.protein.annotation import ProteinSequenceAnnotations as PSAnno
from schrodinger.structutils import analyze
from schrodinger.structutils.interactions.ssbond import get_disulfide_bonds
from schrodinger.utils import csv_unicode
from schrodinger.utils import fileutils
requests.packages.urllib3.disable_warnings(InsecureRequestWarning)
FASTA_COMMENT_CHARS = {'>', ';'}
PERMISSIVE_CHAIN_RE = re.compile(
r"""
^(?P<name>\S+) # Non-whitespace name
[-.:_] # Separator
(?P<id>[A-Za-z])$ # Single letter chain
""", re.VERBOSE)
UNIPROT_RE = re.compile( # https://www.uniprot.org/help/accession_numbers
r"^[OPQ][0-9][A-Z0-9]{3}[0-9]$|[A-NR-Z][0-9]([A-Z][A-Z0-9]{2}[0-9]){1,2}$",
re.IGNORECASE)
SWISS_PROT_RE = re.compile( # https://www.uniprot.org/help/entry_name
r"[A-Z0-9]{1,5}_[A-Z0-9]{2,5}$", re.IGNORECASE)
HAS_DIGIT_RE = re.compile(r"\d")
# This dict is keyed by a string to be used in header strings.
EXPORT_ANNOTATIONS = {'|SSA': PSAnno.ANNOTATION_TYPES.secondary_structure}
NAME_FLAG = "NAME:"
CHAIN_FLAG = "CHAIN:"
NAME_STR = "%s{name}|%s{chain}" % (NAME_FLAG, CHAIN_FLAG)
SIM_STR = "|ID:{id:.2F}|SIM:{sim:.2f}|HOM:{hom:.2f}"
NCBI_FLAGS = {
"gb", "emb", "dbj", "pir", "prf", "pdb", "pat", "bbs", "gnl", "ref", "lcl",
'sp', 'Swiss-Prot'
}
FetchIDs = collections.namedtuple("FetchIDs", "pdb entrez uniprot")
INVALID_SSA_VAL = object()
SSA_MAP = {
'C': structure.SS_NONE,
'H': structure.SS_HELIX,
'E': structure.SS_STRAND,
'-': None,
}
SSA_MAP_RV = {v: k for k, v in SSA_MAP.items()}
SEQD_REQUIRED_COLUMNS = ('ResID', 'Chain', 'ResName')
[docs]class SequenceWarning(UserWarning):
"""
Custom warning for problems loading sequences
"""
[docs]class catch_sequence_warnings(contextlib.ExitStack):
"""
Filter SequenceWarnings and store them on the instance
"""
[docs] def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)
self._caught_warnings = []
self.message = ""
def __enter__(self):
self._caught_warnings = self.enter_context(
warnings.catch_warnings(record=True))
return self
def __exit__(self, exc_type, exc_value, traceback):
super().__exit__(exc_type, exc_value, traceback)
if exc_type is None:
msg_parts = []
for cur_warning in self._caught_warnings:
if isinstance(cur_warning.message, SequenceWarning):
msg_parts.append(str(cur_warning.message))
else:
warnings.showwarning(cur_warning.message,
cur_warning.category,
cur_warning.filename,
cur_warning.lineno)
self.message = "\n".join(msg_parts)
def _partition_by_predicate(arr, pred):
"""
Utility function to groups a list into lists. Each sublist begins with an
element that matches the supplied predicate; or if none exist, the sublist
will only contain elements that don't match the predicate.
For example, partitioning by even values would map [1, 2, 3, 4, 5, 6] to
[1], [2, 3], [4, 5], [6].
Note that [1] is yielded despite 1 not being an even number, because there
was no even number before 1.
Note also that many file reading functions below would benefit from using
this function.
:param arr: A list to split into sublists
:type arr: list
:param pred: A function that takes a list item and returns True if the list
item meets a criteria and False otherwise
:type pred: function
"""
sublist = []
for item in arr:
if pred(item) and sublist:
yield sublist
sublist = []
sublist.append(item)
if sublist:
yield sublist
################################################################################
# Downloaders
################################################################################
[docs]class GetSequencesException(IOError):
"""
Custom Exception for problems retrieving sequences.
"""
PdbParts = collections.namedtuple("PdbParts", ("pdbcode", "pdbchain"))
FastaParts = collections.namedtuple("FastaParts",
("name", "long_name", "chain", "anno_type"))
[docs]def make_maestro_pdb_id(pdb_id):
"""
Convert a PDB ID to ":"-separated PDB code and PDB chain
(e.g. 4hhb if chain is blank or 4hhb:A)
:param pdb_id: PDB ID with optional chain, e.g. 4hhb, 4hhbA, 4hhb:A, 4hhb_A
:type pdb_id: str
:return: PDB ID with ":" between PDB code and PDB chain
:rtype: str
"""
pdbparts = parse_pdb_id(pdb_id)
pdb_part_list = [pdbparts.pdbcode.lower()]
if pdbparts.pdbchain != "":
pdb_part_list.append(pdbparts.pdbchain)
return ":".join(pdb_part_list)
[docs]def parse_pdb_id(pdb_id, permissive=False):
"""
Parse a PDB ID into a (pdb code, pdb chain) Named tuple.
:param pdb_id: PDB ID with optional chain, e.g. 4hhb, 4hhbA, 4hhb:A, 4hhb_A
:type pdb_id: str
:param permissive: Whether to use permissive parsing.
In strict mode, PDB ID must be 4 characters starting with a digit
and single-letter chain is optional.
In permissive mode, PDB ID can contain any non-whitespace characters
but chain separator and single-letter chain are required.
:type permissive: bool
:return: Named tuple of (pdbcode, pdbchain)
:type: PdbParts
:raises GetSequencesException: if pdb_id can't be parsed
"""
try:
return _parse_pdb_id(pdb_id)
except GetSequencesException:
if not permissive:
raise
else:
match = PERMISSIVE_CHAIN_RE.match(pdb_id)
if match:
groupdict = match.groupdict()
return PdbParts(groupdict["name"], groupdict["id"])
raise
def _parse_pdb_id(pdb_id):
"""
Parse a PDB ID into a (pdb code, pdb chain) Named tuple.
:param pdb_id: PDB ID with optional chain, e.g. 4hhb, 4hhbA, 4hhb:A, 4hhb_A
PDB code must be 4 characters starting with a digit. Chain must be blank
or a single letter.
:type pdb_id: str
:return: Named tuple of (pdbcode, pdbchain)
:type: PdbParts
:raises GetSequencesException: if pdb_id can't be parsed
"""
pdb_id = pdb_id.strip()
if not 4 <= len(pdb_id) <= 6:
raise GetSequencesException(f"Wrong length PDB code: {pdb_id}")
if not pdb_id[0].isdigit():
raise GetSequencesException(
f"First character of a PDB ID must be a digit: {pdb_id}")
if not re.match(r"^[0-9a-zA-Z]{3}$", pdb_id[1:4]):
raise GetSequencesException(
"Last 3 characters of a PDB ID must be digits or ASCII letters: "
f"{pdb_id}")
pdbparts = PdbParts(pdbcode=pdb_id[:4], pdbchain="")
# A PDB code has length 4
if len(pdb_id) == 4:
return pdbparts
else:
# If there are 5 or 6 characters, the last should be the chain ID
pdbparts = pdbparts._replace(pdbchain=pdb_id[-1])
if not pdbparts.pdbchain.isalnum():
raise GetSequencesException("Invalid PDB chain: %s" % pdb_id)
# With length 6, there should be a non-alnum separator
elif len(pdb_id) == 6 and pdb_id[4].isalnum():
raise GetSequencesException("Invalid PDB code: %s" % pdb_id)
return pdbparts
[docs]def get_valid_pdb_id_map_for_seqs(seqs, structureless_only=True):
"""
For a list of sequences return a map of valid PDB IDs to sequences.
:param seqs: List of sequences to get the map for
:type seqs: list(sequence.Sequence)
:param structureless_only: Whether to only return structureless seqs
:type structureless_only: bool
:return: Map of valid PDB IDs to their source sequence
:rtype: dict(str: sequence.Sequence)
"""
valid_pdb_id_map = {}
for seq in seqs:
if structureless_only and seq.hasStructure():
continue
for candidate_id in [seq.pdb_id, seq.name, seq.long_name]:
if candidate_id is None:
continue
try:
pdbparts = parse_pdb_id(candidate_id, True)
except GetSequencesException:
continue
candidate_id = pdbparts.pdbcode + pdbparts.pdbchain
if len(candidate_id) == 4 and seq.chain:
candidate_id += seq.chain
valid_pdb_id_map[candidate_id] = seq
break
return valid_pdb_id_map
[docs]def valid_pdb_id(pdb_id: str) -> bool:
"""
:return: Whether the ID appears to be a valid PDB ID
"""
try:
_parse_pdb_id(pdb_id)
except GetSequencesException:
return False
return True
[docs]def valid_entrez_id(entrez_id: str) -> bool:
"""
Entrez ID may be:
1) NCBI Accession number: 9 or 12 characters starting with any letter,
followed by `"P_"`, ending with 6 or 9 numbers and an optional
number following a period (ex. NP_123456, XP_123456789.1)
2) NCBI GenInfo identifier: A single 9-digit number (ex. 123456789).
:return: Whether the ID appears to be a valid Entrez ID
"""
entrez_id = entrez_id.strip()
ch_id = ''
if '.' in entrez_id:
parts = entrez_id.split('.')
if len(parts) != 2:
# Too many '.' chars
return False
entrez_id, ch_id = parts
if not ch_id.isdigit():
return False
id_len = len(entrez_id)
if id_len not in [9, 12]:
return False
if id_len == 9 and entrez_id.isdigit():
if ch_id:
return False
return True
elif not entrez_id[0].isalpha():
return False
elif not entrez_id[1:3].upper() == "P_":
return False
elif not entrez_id[3:].isdigit():
return False
return True
[docs]def valid_uniprot_id(uniprot_id: str) -> bool:
"""
UniProt ID must be 6 characters or 10 characters
starting with a letter
:return: Whether the ID appears to be a valid UniProt ID
"""
uniprot_id = uniprot_id.strip()
return bool(UNIPROT_RE.match(uniprot_id))
[docs]def valid_swiss_prot_name(swiss_prot_name: str) -> bool:
"""
Swiss-Prot entry name must be of the form X_Y, where X and Y are at most
5 alphanumeric characters and the underscore serves as a separator.
We also require Y to be a minimum of 2 characters to avoid confusion with a
PDB ID.
:return: Whether the name appears to be a valid Swiss-Prot entry name
"""
swiss_prot_name = swiss_prot_name.strip()
return bool(SWISS_PROT_RE.match(swiss_prot_name))
[docs]def process_fetch_ids(ids, *, dialog_parent, allow_pdb=True):
"""
Convenience method to parse a list or comma-separated strings into valid
sequence and/or structure identifiers. If any IDs can't be identified,
prompt the user to continue.
:param ids: Database ID or IDs (comma-separated str or list)
:type ids: str or list
:param dialog_parent: Parent to show dialog box
:type dialog_parent: QtWidgets.QWidget
:param allow_pdb: Whether to allow structure identifiers. If False, they
will be treated as unidentified.
:type allow_pdb: bool
:return: Namedtuple of IDs identified as PDB, entrez, uniprot; or None if
there are unidentified IDs and the user cancels.
:rtype: FetchIDs or NoneType
"""
error_ids = []
if isinstance(ids, str):
ids = ids.split(",")
ids = [word.strip() for word in ids]
pdb_like_ids = []
entrez_like_ids = []
uniprot_like_ids = []
for db_id in ids:
if valid_entrez_id(db_id):
entrez_like_ids.append(db_id)
elif (valid_uniprot_id(db_id) or valid_swiss_prot_name(db_id)):
uniprot_like_ids.append(db_id)
elif allow_pdb and valid_pdb_id(db_id):
pdb_like_ids.append(db_id)
else:
error_ids.append(db_id)
if error_ids:
from schrodinger.ui.qt import messagebox
ok = messagebox.show_question(
dialog_parent, "Some of the entered codes are invalid: "
f"{', '.join(error_ids)}. "
"Only the valid codes will be fetched. Continue?",
title="Fetch")
if not ok:
return None
return FetchIDs(pdb_like_ids, entrez_like_ids, uniprot_like_ids)
[docs]def maestro_get_pdb(maestro_pdb_id, pdb_dir=None, remote_ok=False):
"""
Download a PDB file. If specified, the chain will be split out into a
separate file.
:param maestro_pdb_id: 4-letter PDB code or code:chain (e.g. 4hhb or 4hhb:A)
:type maestro_pdb_id: str
:param pdb_dir: directory to check for existing files and destination to
download new files
:type pdb_dir: str
:param remote_ok: whether it's okay to make a remote query.
:type remote_ok: bool
:return: downloaded PDB path
:rtype: str
:raises GetSequencesException: if pdb file can't be downloaded
"""
file_pdb_id = maestro_pdb_id.replace(":", "_")
if pdb_dir is None:
pdb_dir = os.getcwd()
pdb_path = os.path.join(pdb_dir, "%s.pdb" % file_pdb_id)
if not os.path.isfile(pdb_path):
with fileutils.chdir(pdb_dir):
args = [maestro_pdb_id]
if not remote_ok:
args.append('-l')
error_code = getpdb_utility.main(args)
if error_code:
# Remove downloaded file with error
fileutils.force_remove(pdb_path)
raise GetSequencesException("An error occurred downloading %s" %
file_pdb_id)
if not os.path.isfile(pdb_path):
raise GetSequencesException("File not downloaded: %s" % pdb_path)
return pdb_path
[docs]class SeqDownloader(object):
ENTREZ_FORMAT_STR = (
"https://eutils.ncbi.nlm.nih.gov/entrez/eutils/efetch.fcgi?" +
"db=protein&rettype=fasta&id={ID}")
UNIPROT_FORMAT_STR = "https://www.uniprot.org/uniprot/{ID}.{EXT}"
[docs] @classmethod
def downloadPDB(cls, pdb_id, pdb_dir=None, remote_ok=False):
"""
Parse PDB ID string and download PDB file.
:param pdb_id: PDB ID with optional chain (e.g. 4hhb, 4hhbA, 4hhb:A)
:type pdb_id: str
:param pdb_dir: directory to check for existing files and destination to
download new files
:type pdb_dir: str
:param remote_ok: whether it's okay to make a remote query.
:type remote_ok: bool
:return: Full path to downloaded PDB path
:type: str
:raises GetSequencesException: if pdb file can't be downloaded
"""
maestro_pdb_id = make_maestro_pdb_id(pdb_id)
return maestro_get_pdb(maestro_pdb_id,
pdb_dir=pdb_dir,
remote_ok=remote_ok)
[docs] @classmethod
def downloadEntrezSeq(cls, sequence_id, remote_ok):
"""
Download a sequence from Entrez database.
:param sequence_id: Sequence ID in Entrez format.
:type sequence_id: str
:param remote_ok: whether it's okay to make a remote query.
:type remote_ok: bool
:return: Full path to downloaded fasta file
:rtype: str
"""
fasta_path = cls._downloadSeqFromUrl(cls.ENTREZ_FORMAT_STR, sequence_id,
remote_ok)
return fasta_path
[docs] @classmethod
def downloadUniprotSeq(cls, sequence_id, remote_ok, *, use_xml=False):
"""
Download a sequence from Uniprot database.
:param sequence_id: Sequence ID in Uniprot format.
:type sequence_id: str
:param remote_ok: whether it's okay to make a remote query.
:type remote_ok: bool
:param use_xml: whether to get the xml file with the full UniProt
annotation information (e.g. domains). Setting this to True with
download the xml file *instead* of the FASTA file.
:type use_xml: bool
:return: Full path to downloaded fasta or xml file
:rtype: str
"""
ext = "xml" if use_xml else "fasta"
path = cls._downloadSeqFromUrl(cls.UNIPROT_FORMAT_STR,
sequence_id,
remote_ok,
ext=ext)
return path
@staticmethod
def _downloadSeqFromUrl(url_format_str,
sequence_id,
remote_ok,
*,
ext="fasta"):
"""
Downloads a sequence using a url_format_str and a seq ID
:param url_format_str: Server url format string
:type url_format_str: str
:param sequence_id: The sequence ID based on what DB we're req'ing from
:type sequence_id: str
:param remote_ok: whether it's okay to make a remote query.
:type remote_ok: bool
:param ext: File extension
:type ext: str
:return: The full path of the saved file or None if the file
couldn't be downloaded
:rtype: str or NoneType
:raises GetSequencesException: if a bad response is received from the
db hit or the response times out.
"""
file_name = f"{sequence_id}.{ext}"
# If file already exists, just load it, otherwise query the database
full_path = os.path.abspath(file_name)
if not os.path.isfile(full_path):
if not remote_ok:
return None
url = url_format_str.format(ID=sequence_id, EXT=ext)
try:
resp = requests.get(url,
timeout=30.0,
stream=True,
verify=False)
resp.raise_for_status()
except HTTPError:
raise GetSequencesException('Bad response. Status code: %d' %
resp.status_code)
except requests.exceptions.Timeout:
raise GetSequencesException('Request took too long to respond.')
except requests.exceptions.RequestException as exc:
raise GetSequencesException from exc
else:
if resp.text == '':
raise GetSequencesException('Empty response returned')
with open(full_path, "wb") as out_file:
for chunk in resp.iter_content(chunk_size=1024):
if chunk: # Filter out empty keep-alive chunks
out_file.write(chunk)
return full_path
################################################################################
# Readers
################################################################################
[docs]def read_sequences(filename):
"""
Read sequences from the filename. Format is detected from the file extension
Note that this function is only used for non-structure filetypes. For
structure filetypes, see the StructureConverter class.
:param filename: Path to sequence file
:type filename: str
:rtype: list
:return: A list of sequences in the file
"""
if (fileutils.get_sequence_file_format(filename) is
fileutils.SeqFormat.fasta):
return list(FastaAlignmentReader.read(filename))
elif filename.endswith(".aln"):
return list(ClustalAlignmentReader.read(filename))
elif filename.endswith(".seqd"):
return SeqDReader.read(filename)
else:
with MMSequenceConverter() as conv:
return conv.readSequences(filename)
[docs]def from_biopython(biopy_seq):
"""
Convert a Biopython sequence to a ProteinSequence
:param seq: A Biopython sequence to convert to a ProteinSequence
:type seq: Bio.SeqRecord.SeqRecord
:return: The converted sequence
:rtype: schrodinger.protein.sequence.ProteinSequence
"""
return sequence.make_sequence(biopy_seq.seq, name=biopy_seq.name)
[docs]class StructureConverter(object):
"""
Reads a structure and converts it to a list of sequences.
Note that this class produces sequences that are ordered based on residue
number and insertion code, not connectivity. If that ever changes,
`structure_model.MaestroStructureModel._extractChains` must also be updated.
"""
[docs] def __init__(self, ct, eid=None):
"""
:param ct: A structure to convert to sequences.
:type ct: schrodinger.structure.Structure
:param eid: The entry id to assign to the created sequences. If not
given, the entry id from the structure will be used.
:type eid: str
"""
self.ct = ct
self.eid = eid
[docs] @classmethod
def convert(cls, ct, eid=None):
"""
Convert the provided structure into a list of sequences.
:param ct: A structure to convert to sequences.
:type ct: schrodinger.structure.Structure
:param eid: The entry id to assign to the created sequences. If not
given, the entry id from the structure will be used.
:type eid: str
:return: A list of sequences, one per chain.
:rtype: list[sequence.Sequence]
"""
converter = cls(ct, eid)
return converter.makeSequences()
[docs] def makeSequences(self):
"""
Note that disulfide bonds might be between chains, so need to be
calculated at the ct level
:return: A list of sequences, one per chain.
:rtype: list[sequence.Sequence]
"""
chains = self._extractChains()
# Prepare disulfide bond maps
ssbond_atoms = get_disulfide_bonds(self.ct)
chain_res_ssatom_map = collections.defaultdict(dict)
for pair in ssbond_atoms:
for atom in pair:
chain_res_ssatom_map[atom.chain][atom.getResidue()] = atom
seqs, ssatom_res_map = self._convertChainsToSeqs(
chains, ssatom_map=chain_res_ssatom_map)
# Store inter- and intra-sequence disulfide bonds in residues
for atom1, atom2 in ssbond_atoms:
res1 = ssatom_res_map[atom1]
res2 = ssatom_res_map[atom2]
# Res will be None if the atom's res was skipped (e.g. non-protein)
if (res1 is not None and res2 is not None and
res1.disulfide_bond is None and
res2.disulfide_bond is None):
residue.add_disulfide_bond(res1, res2)
return seqs
def _extractChains(self):
"""
Extract information from a structure and returns a list of chains
as a lists of residues. This method mimics the behavior of
`structure._ChainIterator` but caches information during the atom
traversal to improve performance. See MSV-1317.
:return: A list of lists (one for each chain)
:rtype: list(list(structure._Residue))
"""
ct = self.ct
residue_atom_set = set(
analyze.evaluate_asl(ct, "protein OR nucleic_acids"))
# Atom indices that we assigned to a residue already
seen_atom_indices = set()
chain_res = collections.defaultdict(
list) # map of chains to their residues
for at in ct.atom:
ai = at.index
if ai in seen_atom_indices:
continue
if ai not in residue_atom_set:
continue
res = at.getResidue()
resatom_indices = set(res.getAtomIndices())
seen_atom_indices.update(resatom_indices)
chain_res[at.chain].append(res)
# if the ordering of residues is changed here,
# structure_model.MaestroStructureModel._extractChains and
# _createSeqForNewChain must also be updated
for chain in chain_res.values():
chain.sort(key=lambda r: (r.resnum, r.inscode))
return chain_res.values()
def _isAlphaCarbon(self, atom):
"""
:param atom: An atom index to check.
:type atom: int
:return: Whether the atom is an alpha carbon
:rtype: bool
"""
return mm.mmct_atom_get_pdbname(self.ct, atom) == ' CA '
def _convertChainsToSeqs(self, chains, ssatom_map):
"""
Takes a list of lists containing `structure._Residue` and converts
them into `sequence.Sequence`
:param chains: A list of lists, each representing a chain
:type chains: list(list(structure._Residue))
:param ssatom_map: Mapping of chain name to structure residue to
structure atom involved in disulfide bond.
:type ssatom_map: dict(str, dict(structure._Residue, structure._Atom))
:return: A tuple with a list of sequences and a map between structure
atoms in disulfide bonds and sequence elements.
:rtype: tuple(list(sequence.Sequence),
dict(structure._Atom, residue.Residue))
"""
seqs = []
ssatom_res_map = {}
props = self.ct.property
pdb_id = props.get("s_pdb_PDB_ID")
entry_name = props.get("s_m_entry_name")
name, long_name = _get_name_and_description(props)
protein_atoms = set(analyze.evaluate_asl(self.ct, '(protein)'))
seqres_by_chain = seqres.get_seqres(self.ct)
eid = self.eid or self.ct.atom[1].entry_id
for sresidues in chains:
chain_name = sresidues[0].chain
sres_ssatom_map = ssatom_map.get(chain_name, {})
SeqClass = self._detectSequenceType(sresidues)
make_res = SeqClass.makeSeqElement
# TODO MSV-1504: Change this condition to
# if issubclass(SeqClass,sequence.ProteinSequence)
if SeqClass == sequence.ProteinSequence:
kept_sresidues = []
for res in sresidues:
if (res.atom[1].index in protein_atoms or
res.pdbres.upper().strip()
in residue.CAPPING_GROUP_ALPHABET):
kept_sresidues.append(res)
else:
# Explicitly store None to indicate that residue was
# filtered out
ssatom = sres_ssatom_map.get(res)
if ssatom is not None:
ssatom_res_map[ssatom] = None
sresidues = kept_sresidues
# Add in any residues that appear in SEQRES records but aren't in
# the structure
if seqres_by_chain is not None and chain_name in seqres_by_chain:
seqres_res = seqres_by_chain[chain_name]
def get_short_code(ele_str, default=None):
ele_str = ele_str.strip().upper()
ele_type = SeqClass.ALPHABET.get(ele_str)
if ele_type is None:
return default
# Compare by short code to allow for protonation differences
return ele_type.short_code
# Use different invalid symbols for unknown amino acids
unk1, unk2 = "%", "&"
seqres_str = "".join(
get_short_code(ele, unk1) for ele in seqres_res)
pdbres_str = "".join(
get_short_code(res.pdbres, unk2) for res in sresidues)
if pdbres_str in seqres_str:
# Prior to 20-3, Prime populated the i_pdb_seqres_index
# property with meaningless numbers for modeled loops. (See
# PRIME-4376.) To avoid trying to parse these values, we
# skip adding missing loops if the structured sequence is a
# subset of the SEQRES.
sresidues = self._addStructurelessTerminalOnly(
sresidues, seqres_res, make_res)
else:
sresidues = self._addStructureless(sresidues, seqres_res,
make_res)
get_sres_key = partial(residue.get_structure_residue_key,
entry_id=eid)
residue_map = {}
residues = []
non_unique = []
for sres in sresidues:
new_res = self.convertStructResidue(sres, make_res)
residues.append(new_res)
if eid is not None and not isinstance(sres, residue.Residue):
res_key = get_sres_key(sres)
if res_key in residue_map:
non_unique.append(res_key)
residue_map[res_key] = new_res
ssatom = sres_ssatom_map.get(sres)
if ssatom is not None:
ssatom_res_map[ssatom] = new_res
if non_unique:
rks = (f"{rk.resnum}{rk.inscode.strip()}" for rk in non_unique)
msg = (f"Found non-unique residues {', '.join(rks)} "
f"in chain {chain_name}")
warnings.warn(SequenceWarning(msg))
seq = SeqClass(residues,
name=name,
chain=chain_name,
structure_chain=chain_name,
long_name=long_name,
entry_id=eid,
entry_name=entry_name,
pdb_id=pdb_id,
origin=SeqClass.ORIGIN.Maestro)
if eid is not None:
seq.setResidueMap(residue_map)
seqs.append(seq)
return sorted(seqs, key=lambda s: s.chain), ssatom_res_map
def _detectSequenceType(self, struct_residues):
"""
Takes a list of structure residues and returns the sequence type for it.
:param struct_residues: A list of residues
:type struct_residues: list(structure._Residue)
:return: A sequence class
:rtype: type
"""
res_names = {res.pdbres.upper().strip() for res in struct_residues}
return sequence.guess_seq_type(res_names)
[docs] @classmethod
def convertStructResidue(cls, struct_res, make_res):
"""
Convert a `structure._Residue` into a `residue.Residue`.
:param struct_res: A structure residue to convert. If this is a
`residue.Residue` object, it will be returned unchanged.
:type struct_res: structure._Residue or residue.Residue
:param make_res: A method to convert a string into a
`residue.Residue`
:type make_res: callable
:return: A newly created residue
:rtype: residue.Residue
"""
if isinstance(struct_res, residue.Residue):
return struct_res
res = make_res(struct_res.pdbres)
alpha_carbon = struct_res.getAlphaCarbon()
if alpha_carbon is None:
b_factor = struct_res.temperature_factor
else:
b_factor = alpha_carbon.temperature_factor
if b_factor is None:
b_factor is struct_res.temperature_factor
if b_factor is None:
res.b_factor = None
else:
res.b_factor = b_factor if b_factor > 0.0 else None
for item in [
'secondary_structure', 'resnum', 'inscode', 'molecule_number'
]:
setattr(res, item, getattr(struct_res, item))
return res
def _addStructureless(self, residues, seqres, make_res):
"""
Combine the structured and structureless residues.
:param residues: A list of structured residues.
:type residues: list[schrodinger.structure._Residue]
:param seqres: A list of structureless residue names. This must be a
`OneIndexedList` since the i_pdb_seqres_index property values are
one-indexed.
:type seqres: OneIndexedList[str]
:param make_res: A method to convert a string into a `residue.Residue`
:type make_res: callable
:return: A list of structured residues (as
`schrodinger.structure._Residue` objects) and structureless residues
(as `residue.Residue` objects).
:rtype: list
"""
make_newres = partial(self._seqresToResidues, make_res=make_res)
if not residues:
# If no structure residues were passed in, we just return
# a list of structureless residues.
return make_newres(seqres)
all_res = seqres.copy()
no_seqres_res = collections.defaultdict(list)
seqres_index_found = False
prev_seqres_index = 0
# add structured residues to all_res if they have a seqres_index
# property
for cur_res in residues:
cur_seqres_index = cur_res.atom[1].property.get(
'i_pdb_seqres_index')
if cur_seqres_index is None:
# This structured residue doesn't appear in the SEQRES list, so
# it will get added to the sequence below
no_seqres_res[prev_seqres_index].append(cur_res)
elif cur_seqres_index > len(all_res):
# The seqres_index value is too large, so we'll put this residue
# at the end of the sequence
no_seqres_res[len(all_res)].append(cur_res)
elif not isinstance(all_res[cur_seqres_index], str):
# We've already seen a structured residue with this seqres
# index, so we stick this one after it
no_seqres_res[cur_seqres_index].append(cur_res)
else:
# The value in all_res is a string, which means that this is the
# first time we've seen this seqres index. Replace the string
# residue in all_res with cur_res.
seqres_index_found = True
all_res[cur_seqres_index] = cur_res
prev_seqres_index = cur_seqres_index
if not seqres_index_found:
# None of the structured residues had valid seqres indices (e.g.
# all-DNA chain), so we don't attempt to add structureless residues
return residues
# add in structured residues that didn't have a valid seqres_index
for seqres_index, res_to_insert in sorted(no_seqres_res.items(),
reverse=True):
# insert res_to_insert after seqres_index instead of before
insert_index = seqres_index + 1
all_res[insert_index:insert_index] = res_to_insert
# convert any remaining strings to residue objects
res_before_seqres = None
seqres_to_convert = []
fully_converted = []
for cur_res in all_res:
if isinstance(cur_res, str):
seqres_to_convert.append(cur_res)
continue
elif seqres_to_convert:
converted = make_newres(seqres_to_convert,
start_res=res_before_seqres,
end_res=cur_res)
fully_converted.extend(converted)
seqres_to_convert = []
res_before_seqres = cur_res
fully_converted.append(cur_res)
if seqres_to_convert:
converted = make_newres(seqres_to_convert,
start_res=res_before_seqres)
fully_converted.extend(converted)
return fully_converted
def _addStructurelessTerminalOnly(self, residues, seqres, make_res):
"""
Add any structureless residues that appear at the N- and C-termini, but
ignore all other structureless residues. This method should be used for
structures generated by Prime prior to 20-3 to work around PRIME-4376.
(The i_pdb_seqres_index for modeled loops was populated with meaningless
numbers.)
See `_addStructureless` for argument and return value documentation.
"""
seqres_indices = [
res.atom[1].property.get('i_pdb_seqres_index') for res in residues
]
seqres_indices = [idx for idx in seqres_indices if idx is not None]
if not seqres_indices:
return residues
make_newres = partial(self._seqresToResidues, make_res=make_res)
converted = residues.copy()
min_seqres = min(seqres_indices)
if min_seqres > 1:
newres = make_newres(seqres[:min_seqres], end_res=residues[0])
converted[0:0] = newres
max_seqres = max(seqres_indices)
if max_seqres < len(seqres):
newres = make_newres(seqres[max_seqres + 1:],
start_res=residues[-1])
converted.extend(newres)
return converted
def _seqresToResidues(self,
seqres,
*,
make_res,
start_res=None,
end_res=None):
"""
Convert the list of residue names to a list of `residue.Residue` objects
using the given previous and next residue numbers.
:param seqres: A list of residue names.
:type seqres: list[str]
:param make_res: A method to convert a string into a
`residue.Residue`
:type make_res: callable
:param start_res: Previous residue. Pass None if the residues are
N-terminal
:type start_res: residue.Residue or NoneType
:param end_res: Next residue. Pass None if the residues are C-terminal
:type end_res: residue.Residue or NoneType
:return: The newly created residues
:rtype: list[residue.Residue]
"""
new_residues = []
for resname in seqres:
res = make_res(resname)
res.seqres_only = True
new_residues.append(res)
sequence.assign_residue_numbers(new_residues, start_res, end_res)
return new_residues
def _get_name_and_description(props):
"""
Get the appropriate short name and description from the structure property
dictionary
:param props: Structure property dictionary
:return: A short name and description
:rtype: tuple(str, str)
"""
maestro_title = props.get("s_m_title")
pdb_id = props.get("s_pdb_PDB_ID")
entry_name = props.get("s_m_entry_name", '')
pdb_description = props.get("s_pdb_PDB_TITLE", '')
name = maestro_title or entry_name
if not name:
name = pdb_id or pdb_description
else:
try:
pdbparts = parse_pdb_id(name.replace("_chain", "_"))
except GetSequencesException:
pass
else:
name = pdbparts.pdbcode
long_name_parts = [pdb_description]
if entry_name:
long_name_parts.insert(0, entry_name)
elif maestro_title:
long_name_parts.insert(0, maestro_title)
long_name = " | ".join(long_name_parts)
return name, long_name
[docs]class MMSequenceConverter(object):
"""
Converts sequence between mmseq and MSV sequence formats.
:note: This is supposed to be used with 'with' context manager.
"""
def __enter__(self):
"""
Initializes mmseq and mmseqio.
"""
mm.mmseq_initialize(mm.error_handler)
mm.mmseqio_initialize(mm.error_handler)
return self
def __exit__(self, exc_type, exc_value, traceback):
"""
Terminates mmseq and mmseqio.
"""
mm.mmseq_terminate()
mm.mmseqio_terminate()
@classmethod
def _makeSequence(cls, handle):
"""
Converts a mmseq sequence object specified by handle to
`schrodinger.protein.sequence.Sequence` object.
:type handle: mmseq handle
:param handle: Handle of mmseq object.
:rtype `schrodinger.protein.sequence.Sequence`
:return: Sequence in MSV format.
"""
name = mm.mmseq_get_name(handle)
match = re.match("(.*)_[A-Za-z0-9]+$", name)
if match is not None:
# strip out chain name from the sequence name
name = match.group(1)
codes = mm.mmseq_get_all_codes(handle)
seq = sequence.make_sequence(codes, name=name)
seq.chain = mm.mmseq_get_chainstr(handle)
return seq
@classmethod
def _makeMMSEQ(cls, seq):
"""
Converts a `schrodinger.protein.sequence.ProteinSequence` to mmseq
object.
:note: The returned handle needs to be deleted by calling
mm.mmseq_delete.
:type seq: `schrodinger.protein.sequence.ProteinSequence`
:param seq: Sequence to be converted.
:rtype: int
:return: mmseq sequence handle.
"""
seq_handle = mm.mmseq_new()
for index, res in enumerate(seq):
if res.is_res:
mm.mmseq_insert_code(seq_handle, index, res.short_code,
res.long_code)
else:
mm.mmseq_insert_code(seq_handle, index, '~', ' ')
mm.mmseq_res_set_is_gap(seq_handle, index + 1, True)
mm.mmseq_res_set_in_use(seq_handle, index + 1, True)
mm.mmseq_set_name(seq_handle, seq.name)
mm.mmseq_set_chain(seq_handle, seq.chain)
return seq_handle
[docs] @classmethod
def readSequences(cls, file_name, file_format=mm.MMSEQIO_ANY):
"""
Reads all sequences from file specified by file_name.
:type file_name: str
:param file_name: Name of input file.
:type file_format: int
:param file_format: Format of the input file. By default, the format
is MMSEQIO_ANY meaning file type is automatically recognized.
:rtype: List of `schrodinger.protein.sequence.Sequence`.
:return: List of sequences read from the file.
:raise GetSequencesException: If the file could not be read.
"""
if file_format == mm.MMSEQIO_ANY and file_name.endswith(".msf"):
# SHARED-6945: mmseqio does not auto-detect MSF
file_format = mm.MMSEQIO_MSF
try:
f_handle = mm.mmseqio_open_file(file_name, mm.MMSEQIO_READ,
file_format)
sequences = []
num_seqs = mm.mmseqio_get_num_sequences(f_handle)
for seq_index in range(num_seqs):
seq_handle = mm.mmseqio_read_sequence(f_handle, seq_index + 1)
sequences.append(cls._makeSequence(seq_handle))
mm.mmseqio_close_file(f_handle)
except mm.MmException as err:
msg = "Error reading %s\n\n%s" % (file_name, str(err))
raise GetSequencesException(msg)
return sequences
[docs] @classmethod
def writeSequences(cls,
sequences,
file_name,
file_format=mm.MMSEQIO_NATIVE):
"""
Writes sequences to a file specified by file_name.
:raises mmcheck.MmException: If the file could not be open for writing.
:type sequences: list of `schrodinger.protein.sequence.ProteinSequence`
:param seqences: List of sequences to be written to file.
:type file_name: str
:param file_name: Name of input file.
:type file_format: int
:param file_format: Format of the input file. By default, the format
is MMSEQIO_NATIVE.
"""
f_handle = mm.mmseqio_open_file(file_name, mm.MMSEQIO_WRITE,
file_format)
for seq in sequences:
seq_handle = cls._makeMMSEQ(seq)
mm.mmseqio_write_sequence(f_handle, seq_handle)
mm.mmseq_delete(seq_handle)
mm.mmseqio_close_file(f_handle)
[docs]class BaseProteinAlignmentReader(object):
"""
Base class for reading protein sequence alignments from files.
"""
@staticmethod
def _makeSequencesFromElements(sequences):
"""
Helper function that creates ProteinAlignment from an ordered dictionary
of sequence names and elements
:param sequences: Ordered dictionary of sequence elements
:type sequences: collections.OrderedDict
:return: A list of sequences
:rtype: list
"""
return [
sequence.make_sequence(elements, name=name)
for name, elements in sequences.items()
]
[docs] @classmethod
def read(cls, file_name, AlnCls=alignment.ProteinAlignment):
"""
Returns alignment read from file
:note: The alignment can be empty if no sequence was present in the
input file.
:param file_name: Source file name
:type file_name: str
:param AlnCls: The type of the Alignment to return
:type AlnCls: type
:return: An alignment of the specified type
:raises IOError: If file cannot be read
"""
raise NotImplementedError(
"Subclasses should implement this classmethod")
[docs]class ClustalAlignmentReader(BaseProteinAlignmentReader):
"""
Class for reading Clustal `*.aln` files.
"""
[docs] @classmethod
def read(cls, file_name, AlnCls=alignment.ProteinAlignment):
"""
:param file_name: Source file name
:type file_name: str
:param AlnCls: The type of the Alignment to return
:type AlnCls: type
:return: An alignment of the specified type
"""
sequences = collections.OrderedDict()
with open(file_name, "r") as f:
next(f) # Skip first line
for line in f:
if not line or line[0].isspace():
continue
chunks = line.split()
if len(chunks) < 2:
continue
seq_name = chunks[0]
seq_elements = chunks[1]
if seq_name in sequences:
sequences[seq_name] += seq_elements
else:
sequences[seq_name] = seq_elements
seqs = cls._makeSequencesFromElements(sequences)
aln = AlnCls(seqs)
return aln
[docs]class SeqDReader:
class _RawProteinSequence:
"""
A helper class to store raw information about a sequence.
After adding raw information, a ProteinSequence can be generated
with the `toProteinSequence` method.
"""
def __init__(self, seq_name, chain_name):
self.chain_name = chain_name
self.resnums = []
self.rescodes = []
self.ssa_list = []
self.descriptors = collections.defaultdict(list)
self.seq_name = seq_name
def addResnum(self, resnum):
self.resnums.append(int(resnum))
def addRescode(self, rescode):
self.rescodes.append(rescode)
def addSSA(self, ssa):
self.ssa_list.append(ssa)
def addDescriptor(self, descriptor_name, descriptor_value):
self.descriptors[descriptor_name].append(descriptor_value)
def toProteinSequence(self):
descriptors = self.descriptors
for k, v in descriptors.items():
def cast_values_to_float(values):
return list(map(float, values))
try:
descriptors[k] = cast_values_to_float(v)
except ValueError:
# If we can't cast the values to float, we just leave
# them as strings.
pass
seq = sequence.ProteinSequence(self.rescodes,
name=self.seq_name,
resnums=self.resnums,
chain=self.chain_name)
for descriptor_name, descriptor_values in descriptors.items():
for res, desc_value in zip(seq, descriptor_values):
res.updateDescriptors({descriptor_name: desc_value})
for idx, ssa in enumerate(self.ssa_list):
ssa_val = SSA_MAP.get(ssa)
seq[idx].secondary_structure = ssa_val
return seq
[docs] @classmethod
def read(cls, file_name):
with open(file_name) as infile:
header = infile.readline().strip()
seq_name = ''
if ',' not in header:
seq_name = header
header = infile.readline().strip()
fieldnames = [field.strip() for field in header.split(',')]
for col in SEQD_REQUIRED_COLUMNS:
if col not in fieldnames:
err_msg = f'SeqD file format requires a "{col}" column.'
raise IOError(err_msg)
get_ssa = 'SSA' in fieldnames
csv_reader = csv.DictReader(infile, fieldnames=fieldnames)
raw_seqs = {} # chain name to list of residue names
for line in csv_reader:
if line['Chain'] not in raw_seqs:
raw_seqs[line['Chain']] = cls._RawProteinSequence(
seq_name, line['Chain'])
raw_seq = raw_seqs[line['Chain']]
raw_seq.addRescode(line['ResName'])
raw_seq.addResnum(line['ResID'])
if get_ssa:
raw_seq.addSSA(line['SSA'])
for column_name, row_value in line.items():
if column_name in ('ResID', 'Chain', 'ResName', 'SSA'):
continue
raw_seq.addDescriptor(column_name, row_value)
seqs = []
for raw_seq in raw_seqs.values():
seqs.append(raw_seq.toProteinSequence())
return seqs
[docs]class FastaAlignmentReader:
@staticmethod
def _parseCodes(codes):
"""
Parses string of single-letter amino acid codes, optionally
interleaved with residue numbers. Converts the codes to uppercase
and filters out all characters except for A-Z and gap codes.
:type codes: str
:param codes: String to parse.
:rtype: Tuple of (str, list of ints)
:return: Parsed sequence elements string and list of residue numbers.
"""
codes = codes.upper()
if not HAS_DIGIT_RE.search(codes):
# If there are no digits in the input, don't waste time trying to
# parse out non-existant residue numbers
return (codes, ())
resnum = 1
nums = ""
elements = ""
gaps = ['.', '-', '~']
numbers = []
for ch in codes:
if '0' <= ch <= '9':
nums += ch
elif 'A' <= ch <= 'Z' or ch in gaps:
if nums:
resnum = int(nums)
nums = ""
elements += ch
if ch not in gaps:
numbers.append(resnum)
resnum += 1
return (elements, numbers)
[docs] @classmethod
def parseSSA(cls, seq):
"""
Parse a SSA sequence into a list of SSA values that can be assigned
to residues' `secondary_structure` property
:param seq: the "sequence" from the FASTA file which encodes the SSA
values
:type seq: str
:return: a list of the SSA values. The SSA values come from
schrodinger.structure. Returns None if any of the elements was
invalid
:type: list(int) or NoneType
"""
seq = seq.replace('\n', '')
anno_vals = []
for ele in seq:
val = SSA_MAP.get(ele, INVALID_SSA_VAL)
if val is INVALID_SSA_VAL:
return None
anno_vals.append(val)
return anno_vals
[docs] @classmethod
def read(cls, file_name, AlnClass=alignment.ProteinAlignment):
"""
Loads a sequence file in FASTA format, creates sequences and appends
them to alignment. Splits sequence name from the FASTA header.
:param file_name: name of input FASTA file
:type file_name: str
:param AlnClass: The class of the alignment object to return
:type AlnClass: type
:return: Read alignment.
:rtype: AlnClass
"""
with open(file_name) as f:
lines = [line for line in f if line]
return cls.readFromText(lines, AlnClass)
[docs] @classmethod
def readFromText(cls, lines, AlnClass=alignment.ProteinAlignment):
"""
Read sequences from FASTA-formatted text, creates sequences and appends
them to alignment. Splits sequence name from the FASTA header.
:param lines: list of strings representing FASTA file
:type lines: list of str
:param AlnClass: The class of the alignment object to return
:type AlnClass: type
:return: The alignment
:rtype: AlnClass
"""
seq_list = []
fasta_parts = None
for group in _partition_by_predicate(
lines, lambda x: x[0] in FASTA_COMMENT_CHARS):
cur_header = group[0]
seq_rows = group[1:]
# Store first comment row to use as long_name
if fasta_parts is None or cur_header[0] == ">":
header = cur_header
fasta_parts = parse_fasta_header(header)
if len(seq_rows) == 0:
continue
seq = "".join(seq_rows)
# Apply annotation values to the previous sequence which was parsed
# but only if the ssa's and sequence's name and chain name match
if fasta_parts.anno_type and len(seq_list):
prev_seq = seq_list[-1]
if (prev_seq.chain == fasta_parts.chain and
prev_seq.name == fasta_parts.name and
prev_seq.long_name == fasta_parts.long_name and
fasta_parts.anno_type is
PSAnno.ANNOTATION_TYPES.secondary_structure):
anno_values = cls.parseSSA(seq)
if anno_values:
for idx, val in enumerate(anno_values):
prev_seq[idx].secondary_structure = val
continue
elements, numbers = cls._parseCodes(seq)
seq = sequence.make_sequence(elements,
name=fasta_parts.name,
long_name=fasta_parts.long_name,
chain=fasta_parts.chain,
resnums=numbers)
seq_list.append(seq)
# Reset fasta_parts after sequences have been processed
fasta_parts = None
return AlnClass(sequences=seq_list)
[docs] @classmethod
def readFromStringList(cls, strings, AlnClass=alignment.ProteinAlignment):
"""
Return an alignment object created from an iterable of sequence strings
:param strings: Sequences as iterable of strings (1D codes)
:type strings: Iterable of strings
:param AlnClass: The class of the alignment object to return
:type AlnClass: type
:return: The alignment
:rtype: AlnClass
"""
fastalines = ((">{0}".format(i), ln) for i, ln in enumerate(strings))
return cls.readFromText(itertools.chain(*fastalines), AlnClass)
################################################################################
# Writers
################################################################################
[docs]def to_biopython(seq):
"""
Converts a sequence to a Biopython sequence
:param seq: A sequence to convert to a Biopython sequence
:type seq: schrodinger.protein.sequence.ProteinSequence
:return: The sequence converted to a Biopython SeqRecord
:rtype: Bio.SeqRecord.SeqRecord
"""
seq_str = str(seq).replace(seq.gap_char, '')
return BioSeqRecord(BioSeq(seq_str), name=seq.name)
[docs]class BaseProteinAlignmentWriter(object):
"""
Class for writing protein alignments to files.
"""
@staticmethod
def _makeSeqName(seq, aln, index, make_unique, use_long_name=True):
"""
Returns a name for the sequence. If make_unique is True, guarantees that
no other seq in the aln will have this name
"""
def get_name(the_seq):
if use_long_name and the_seq.long_name:
return the_seq.long_name
return the_seq.name
name = get_name(seq)
if not make_unique:
return name
uniques = {(get_name(seq_), seq_.chain) for seq_ in aln}
use_chains = len(uniques) == len(aln)
uniquifier = str(index + 1)
if use_chains and seq.chain:
uniquifier = seq.chain
return f"{name}_{uniquifier}"
[docs] @classmethod
def write(cls, aln, file_name, **kwargs):
"""
Writes aln to a file.
:type aln: `BaseAlignment`
:param aln: Alignment to be written to a file.
:type file_name: str
:param file_name: Destination file name.
:Note: Subclasses may take additional `**kwargs` as write options
"""
raise NotImplementedError(
"Subclasses should implement this classmethod")
[docs]class FastaAlignmentWriter(BaseProteinAlignmentWriter):
"""
Class for writing FASTA .fasta files.
Format is described here:
U{Fasta format wikipedia<https://en.wikipedia.org/wiki/FASTA_format>}
"""
HEADER_START = '>'
HEADER_END = ''
[docs] @classmethod
def toString(cls, aln, use_unique_names=True, maxl=50):
return cls.toStringAndNames(aln,
use_unique_names=use_unique_names,
maxl=maxl)[0]
[docs] @classmethod
def toStringAndNames(cls,
aln,
use_unique_names=True,
maxl=50,
export_annotations=False,
sim_ref_seq=None):
"""
Converts aln to FASTA string
:param aln: Structured sequences
:type aln: `ProteinAlignment`
:param use_unique_names: If True, write unique name for each sequence.
:type use_unique_names: bool
:param maxl: Maximum length of a line
:type maxl: int
:param export_annotations: Whether annotations should be exported along
with sequence information. If True, annotations listed in
`EXPORT_ANNOTATIONS` will be exported.
:type export_annotations: bool
:param sim_ref_seq: Reference sequence to calculate similarities for
the sequences to be exported. If None, similarity will not be exported.
:type sim_ref_seq: `sequence.Sequence` or None
:return: FASTA string
:rtype: string
"""
outlines = []
names = []
for index, seq in enumerate(aln):
# Format title
name = cls._makeSeqName(seq,
aln,
index,
use_unique_names,
use_long_name=True)
names.append(name)
h_start = cls.HEADER_START
h_end = cls.HEADER_END
chain = seq.chain if seq.chain is not None else ""
header_name = NAME_STR.format(name=name, chain=chain)
base_header = h_start + header_name
if sim_ref_seq:
ident = seq.getIdentity(sim_ref_seq) * 100
sim = seq.getSimilarity(sim_ref_seq) * 100
hom = seq.getConservation(sim_ref_seq) * 100
base_header += SIM_STR.format(id=ident, sim=sim, hom=hom)
header = base_header + h_end
# Format sequence (wrapped at maxl)
s = str(seq)
s = s.replace(seq.gap_char, "-") # FASTA uses "-" to denote gaps
cls._appendSeqOrAnnotationLine(header, s, outlines, maxl)
if export_annotations:
for header_ext, anno in EXPORT_ANNOTATIONS.items():
anno_header = base_header + header_ext + h_end
anno_vals = getattr(seq.annotations, anno.name)
if anno == PSAnno.ANNOTATION_TYPES.secondary_structure:
anno_vals = [SSA_MAP_RV.get(v, '-') for v in anno_vals]
anno_str = ''.join(anno_vals)
cls._appendSeqOrAnnotationLine(anno_header, anno_str,
outlines, maxl)
outstr = '\n'.join(outlines)
return outstr, names
@staticmethod
def _appendSeqOrAnnotationLine(header, row, outlines, maxl):
"""
Append a header and sequence or annotation info to the list of rows.
:param header: Header for this row
:type header: str
:param row: Sequence or annotation row
:type row: str
:param outlines: List of output lines to append row to
:type outlines: list
:param maxl: Max line length
:type maxl: int
"""
outlines.append(header)
s = row
outlines.extend(
[s[i * maxl:(i + 1) * maxl] for i in range(len(s) // maxl + 1)])
[docs] @classmethod
def toStringList(cls, aln):
"""
Convert ProteinAlignment object to list of sequence strings
:param aln: Alignment data
:type aln: `ProteinAlignment`
:rtype: list of str
:return: A list of sequence strings representing the alignment
"""
fastastr = cls.toString(aln, maxl=10000).split('\n')
return [s for s in fastastr if '>' not in s]
[docs] @classmethod
def write(cls,
aln,
file_name,
use_unique_names=True,
maxl=50,
export_annotations=False,
sim_ref_seq=None,
**kwargs):
"""
Write aln to FASTA file
:raises IOError: If output file cannot be written.
:param aln: Structured sequences
:type aln: `ProteinAlignment`
:param use_unique_names: If True, write unique name for each sequence.
:type use_unique_names: bool
:param maxl: Maximum length of a line
:type maxl: int
:type file_name: str
:param file_name: Destination file name.
:param export_annotations: Whether annotations should be exported along
with sequence information. If True, annotations listed in
`EXPORT_ANNOTATIONS` will be exported.
:type export_annotations: bool
:param sim_ref_seq: Reference sequence to calculate similarities for
the sequences to be exported. If None, similarity will not be exported.
:type sim_ref_seq: `sequence.Sequence` or None
:return: output names of each sequence
:rtype: list of str
"""
with open(file_name, 'w') as f:
fasta_str, names = cls.toStringAndNames(
aln,
use_unique_names=use_unique_names,
maxl=maxl,
export_annotations=export_annotations,
sim_ref_seq=sim_ref_seq)
f.write(fasta_str)
return names
[docs]class ClustalAlignmentWriter(BaseProteinAlignmentWriter):
"""
Class for writing Clustal `*.aln` files.
The format is described here:
`http://meme-suite.org/doc/clustalw-format.html`
"""
[docs] @classmethod
def write(cls, aln, file_name, use_unique_names=True, **kwargs):
"""
Writes aln to a Clustal alignment file.
Note: `**kwargs` are ignored, to preserve signature of BaseProteinAlignmentWriter
:raises IOError: If output file cannot be written.
:type aln: `BaseAlignment`
:param aln: Alignment to be written to a file.
:type file_name: str
:param file_name: Destination file name.
:param use_unique_names: If True, write unique name for each sequence.
:type use_unique_names: bool
:rtype: dict
:return: A mapping of names written to the clustal file and sequences
"""
# prepare list of sequences, replace '~' with '.'
name_seq_mapping = {}
sequences = collections.OrderedDict()
for index, seq in enumerate(aln):
name = cls._makeSeqName(seq,
aln,
index,
use_unique_names,
use_long_name=False)
name = name.replace(' ', '_')
name_seq_mapping[name] = seq
# clustal uses "-" to denote gaps
sequences[name] = list(str(seq).replace(seq.gap_char, '-'))
# remove terminal gaps
while len(sequences[name]) and sequences[name][-1] == '-':
sequences[name].pop()
# Pad sequences with gaps so they're rectangular
max_len = max(len(seq) for seq in sequences.values())
for name, elements in sequences.items():
elements += ['-'] * (max_len - len(elements))
sequences[name] = ''.join(elements)
name_length = max(map(len, list(sequences))) + 1
lines = []
# header
lines.append("CLUSTAL W (2.0) multiple sequence alignment\n\n")
# write 60 characters per line
line_length = 60
pos = 0
sequences_left = True
while sequences_left:
sequences_left = False
for name, elements in sequences.items():
line = name.ljust(name_length)
if pos < len(elements):
line += elements[pos:pos + line_length]
if pos + line_length < len(elements):
sequences_left = True
lines.append(line + '\n')
lines.append('\n')
pos += line_length
with open(file_name, "w") as f:
f.writelines(lines)
return name_seq_mapping
[docs]class CSVAlignmentWriter(BaseProteinAlignmentWriter):
[docs] @classmethod
def write(cls, aln, file_name, export_descriptors=False, **kwargs):
descriptor_names = []
descriptor_values = []
if export_descriptors:
descriptors = aln.getSeqsDescriptors()
descriptor_names = [desc.display_name for desc in descriptors]
with csv_unicode.writer_open(file_name) as fh:
csv_writer = csv.writer(fh)
headers = ['ID', 'Name', 'Chain', 'Sequence'] + descriptor_names
csv_writer.writerow(headers)
for idx, seq in enumerate(aln, start=1):
seq_name = seq.name
chain = seq.chain
seq_str = str(seq)
if export_descriptors:
descriptor_values = [
seq.getProperty(desc) if seq.getProperty(desc) else ''
for desc in descriptors
]
row = [idx, seq_name, chain, seq_str] + descriptor_values
csv_writer.writerow(row)
[docs]class SeqDAlignmentWriter(BaseProteinAlignmentWriter):
"""
Class to write sequence and descriptors to seqd file. Each sequence is
exported to a seqd file with name "<seq_name>_<chain_name>.seqd"
"""
[docs] @classmethod
def write(cls,
aln,
file_name,
export_descriptors=True,
export_annotations=False,
**kwargs):
for seq in aln:
seq_name = seq.name
headers = list(SEQD_REQUIRED_COLUMNS)
anno_to_add = False
if export_annotations:
for name, anno in EXPORT_ANNOTATIONS.items():
anno_header = name.strip("|")
anno_to_add = anno_header not in headers
if anno_to_add:
headers.append(anno_header)
anno_vals = getattr(seq.annotations, anno.name)
if anno == PSAnno.ANNOTATION_TYPES.secondary_structure:
anno_vals = [
SSA_MAP_RV.get(v, '-') for v in anno_vals
]
if export_descriptors:
headers += seq[0].getDescriptorKeys()
with csv_unicode.writer_open(file_name) as fh:
csv_writer = csv.writer(fh)
for row in ([seq_name], headers):
csv_writer.writerow(row)
for i, res in enumerate(seq):
row = [res.resnum, res.chain, res.short_code]
if anno_to_add:
row.append(anno_vals[i])
if export_descriptors:
descriptors = [
res.getDescriptorValue(desc)
for desc in res.getDescriptorKeys()
]
row += descriptors
assert len(row) == len(headers)
csv_writer.writerow(row)
[docs]def parse_long_name(long_name, permissive=True):
"""
Attempt to parse a long_name into a short name and a chain.
Example: 1FSK:A --> 1FSK, A
2BJM.H VH CDR_LENGTH: 5 17 11 --> 2BJM, H
sp|accession|entry name --> accession, ""
:param long_name: The long name to attempt to parse
:type long_name: str
:param permissive: Whether to use permissive parsing. See `parse_pdb_id`
for documentation.
:type permissive: bool
:return: A short name and a chain id
:rtype: PdbParts
"""
pdbparts = PdbParts(pdbcode="", pdbchain="")
# Next try NCBI formats
pipe_parts = long_name.split("|", 1)
if len(pipe_parts) == 2 and pipe_parts[0] in NCBI_FLAGS:
# Get first whitespace-separated word
ncbi_start = long_name.strip().split(None, 1)[0]
ncbi_parts = ncbi_start.split("|")
chain = ""
if ncbi_parts[0] == "pdb" and len(ncbi_parts) >= 3:
name_parts = ncbi_parts[1:2]
chain = ncbi_parts[2]
else:
name_parts = (part for part in ncbi_parts[1:] if part)
return PdbParts(pdbcode="|".join(name_parts), pdbchain=chain)
# Split by separator and/or whitespace
# Only split by "|" if there is no whitespace
if len(long_name.strip().split(None, 1)) == 1:
long_name = long_name.replace("|", " ")
separators = [","]
for sep in separators:
long_name = long_name.replace(sep, " ")
parts = long_name.split(None, 1)
if not parts:
return pdbparts
# Use first "word" as name
split_name = parts[0]
try:
pdbparts = parse_pdb_id(split_name, permissive=permissive)
except GetSequencesException:
# If parsing fails, don't extract a chain ID
return PdbParts(pdbcode=split_name, pdbchain=pdbparts.pdbchain)
else:
return pdbparts
[docs]def reorder_fasta_alignment(aln, orig_names):
"""
Reorder a FASTA alignment to match the order of names written to FASTA.
Intended for use after alignment methods that reorder the output.
Example usage::
orig_names = seqio.FastaAlignmentWriter.write(orig_aln, input_filename)
# run alignment method
aln = seqio.FastaAlignmentReader.read(out_filename)
reorder_fasta_alignment(aln, orig_names)
:param aln: Alignment to reorder. Will be modified in place.
:type aln: alignment.BaseAlignment
:param orig_names: Original order of sequence names as written to FASTA.
:type orig_names: list[str]
:raises ValueError: If the alignments have different lengths or mismatched
names
"""
if len(orig_names) != len(aln):
raise ValueError("The length of the alignment and names do not match "
f"({len(aln)}, {len(orig_names)})")
new_long_names = [seq.long_name for seq in aln]
reorder_indices = []
for name in orig_names:
if name in new_long_names:
idx = new_long_names.index(name)
else:
raise ValueError(
f"Could not find a sequence corresponding to {name}")
reorder_indices.append(idx)
aln.reorderSequences(reorder_indices)