"""
Module for downloading PDB files from the web.
The data is retrieved from the RCSB. Current download URLs are documented
at http://www.rcsb.org/pdb/static.do?p=download/http/index.html
Running this module is no different from using a web-browser to access
the site - it's just a different type of web client. Therefore this should
cause no problems for the maintainers of that site and be within the
terms and conditions of use.
Note that certain assumptions are made about the layout of the web site -
changes there in future may make this script stop working.
Copyright Schrodinger, LLC. All rights reserved.
"""
import gzip
import os
import shutil
import sys
import tempfile
import requests
import requests.packages.urllib3
from requests.adapters import HTTPAdapter
from requests.packages.urllib3.util import retry
from schrodinger.utils import fileutils
from schrodinger.utils import log
from schrodinger.utils import subprocess
# Silence urllib3 warnings for the whole process at import time. The download
# helper in this module calls session.get(..., verify=False); presumably this
# is here to suppress the resulting insecure-request warnings - confirm.
requests.packages.urllib3.disable_warnings()
# Source selectors accepted by get_pdb() and get_ent() (values 0, 1, 2):
AUTO, DATABASE, WEB = list(range(3))
# Main download URL. FASTA files are downloaded differently.
_RCSB_URL = 'https://files.rcsb.org/download/'
# EMDB download URL.
_EMDB_URL = 'https://ftp.rcsb.org/pub/emdb/structures'
# HTTP error codes for which request retry should happen
RETRY_HTTP_CODES = (500, 502, 503, 504)
# Module-wide logger; INFO level means the logger.debug() calls in this module
# are not emitted unless the level is lowered.
logger = log.get_output_logger("getpdb")
logger.setLevel(log.INFO)
def _download_file_from_url(url, dest_file):
    """
    Download file from given URL to the specified destination file. If the
    destination cannot be opened for writing, the file is written to the
    temporary directory instead (see `open_filename`).

    :param url: URL to download the file from.
    :type url: str

    :param dest_file: Path where to write the file to.
    :type dest_file: str

    :return: Path to the written file.
    :rtype: str

    :raises requests.HTTPError: if the server answers with an error status.
    """
    with requests_retry_session() as session:
        # NOTE(review): verify=False disables TLS certificate validation for
        # this download. Kept as-is because callers may rely on it; confirm
        # whether it is still required.
        #
        # The response is used as a context manager so that the streamed
        # connection is always released, even if writing the file fails.
        with session.get(url, stream=True, verify=False) as response:
            response.raise_for_status()
            with open_filename(dest_file, 'wb') as fh:
                # The output path may be in the temporary directory:
                dest_file = fh.name
                # Fetch by chunks of 8 KiB:
                for chunk in response.iter_content(8192):
                    fh.write(chunk)
    return dest_file
def download_file(filename):
    """
    Fetch `filename` from the RCSB download area and store it under the same
    name in the CWD (or in the temp dir when the CWD cannot be written to).

    :param filename: Name of the file to download from the RCSB web site.
    :type filename: str

    :return: Path to the written file.
    :rtype: str

    :raises requests.HTTPError: if error in connection to RCSB.
    """
    return _download_file_from_url(_RCSB_URL + filename, filename)
def _decompress_gz_file(compressed_file, dest_file):
    """
    Expand a text `*.gz` archive into `dest_file` and then remove the
    archive. Each line is decoded and written out as UTF-8 text. When the
    destination cannot be opened for writing, the output goes to a temporary
    directory instead and that path is returned.

    :param compressed_file: Path to the file to decompress (deleted on
        success).
    :type compressed_file: str

    :param dest_file: Path to the file to write.
    :type dest_file: str

    :return: Path to the written file, which may differ from dest_file.
    :rtype: str
    """
    with gzip.open(compressed_file, 'rb') as gz_fh, \
            open_filename(dest_file, 'w', encoding="utf-8") as text_fh:
        text_fh.writelines(raw_line.decode() for raw_line in gz_fh)
        # The file actually opened may not be dest_file:
        written_path = text_fh.name
    os.remove(compressed_file)
    return written_path
def _decompress_binary_gz_file(compressed_file, dest_file):
    """
    Expand a binary `*.gz` archive into `dest_file` (bytes are copied
    verbatim) and then remove the archive. When the destination cannot be
    opened for writing, the output goes to a temporary directory instead and
    that path is returned.

    :param compressed_file: Path to the file to decompress (deleted on
        success).
    :type compressed_file: str

    :param dest_file: Path to the file to write.
    :type dest_file: str

    :return: Path to the written file, which may differ from dest_file.
    :rtype: str
    """
    with gzip.open(compressed_file, 'rb') as gz_fh, \
            open_filename(dest_file, 'wb') as raw_fh:
        shutil.copyfileobj(gz_fh, raw_fh)
        # The file actually opened may not be dest_file:
        written_path = raw_fh.name
    os.remove(compressed_file)
    return written_path
def download_sf(pdb_code):
    """
    Download the structure factor ENT file for the given PDB ID, convert it
    to CNS format with `$SCHRODINGER/utilities/refconvert`, and return the
    name of the converted file.

    Not every pdb has structure factor files deposited, and not every
    structure factor file will convert perfectly.

    :param pdb_code: PDB ID to fetch structure factors for.
    :type pdb_code: str

    :return: Path to the converted file.
    :rtype: str

    :raises RuntimeError: if no structure factors are deposited (HTTP 404),
        if the downloaded data could not be saved, or if conversion fails.
    :raises requests.HTTPError: for download errors other than 404.
    :raises KeyError: if the SCHRODINGER environment variable is not set.
    """
    try:
        ent_file = download_ent(pdb_code)
    except (RuntimeError, requests.HTTPError) as err:
        if "404" in str(err):
            msg = "No Structure factors deposited for %s" % pdb_code
            # Keep the original error attached for debugging.
            raise RuntimeError(msg) from err
        raise

    refconvert = os.path.join(os.environ['SCHRODINGER'], 'utilities',
                              'refconvert')
    # NOTE(review): download_reflection_data() writes a '.cv' file for the
    # same conversion; confirm whether '.sv' is intended here.
    cv_file = os.path.splitext(ent_file)[0] + '.sv'
    cmd = [
        refconvert,
        '-icif',
        ent_file,
        '-ocns',
        cv_file,
    ]
    ret = subprocess.call(cmd)
    if ret != 0 or not os.path.isfile(cv_file):
        raise RuntimeError("Failed to convert ENT file: %s" % ent_file)

    # TODO: Remove the ent_file
    return cv_file
def download_fasta(pdb_code):
    """
    Attempts to download the fasta file for the given PDB ID.

    :type pdb_code: str
    :param pdb_code: PDB ID of the file to download

    :return: Path to the written file. This is `<pdb_code>.fasta` in the CWD,
        or a path in the temporary directory if the CWD is not writable.
    :rtype: str

    :raises requests.HTTPError: if the server answers with an error status.
    """
    # Currently, the RCSB website downloads from the http address, but the
    # https address also exists.
    url = f'https://www.rcsb.org/fasta/entry/{pdb_code}/download'
    # Return the path reported by the downloader: it differs from the
    # requested name when the file had to be written to the temp directory.
    return _download_file_from_url(url, f"{pdb_code}.fasta")
def download_em_map(emdb_code):
    """
    Attempts to download the EM map file for the given EMDB ID. The
    compressed map is downloaded, decompressed to `emd_<emdb_code>.map`, and
    the compressed file is removed.

    :type emdb_code: str
    :param emdb_code: EMDB ID of the map file to download

    :return: Path to the decompressed map file (in the temporary directory if
        the CWD is not writable).
    :rtype: str

    :raises requests.HTTPError: if the server answers with an error status.
    """
    compressed_name = f"emd_{emdb_code}.map.gz"
    url = f'{_EMDB_URL}/EMD-{emdb_code}/map/{compressed_name}'
    # Use the path the downloader actually wrote to; it is not necessarily
    # compressed_name when the CWD is not writable.
    compressed_em_file = _download_file_from_url(url, compressed_name)
    return _decompress_binary_gz_file(compressed_em_file,
                                      "emd_%s.map" % emdb_code)
def get_pdb(pdbid, source=AUTO, caps_asis=False):
    """
    Obtain the PDB file for `pdbid` from the local database, the web, or
    both, as selected by `source`. With AUTO (the default) the local database
    is consulted first and the web is used only if nothing is found locally.

    :param pdbid: PDB ID - string of 4 characters.
    :type pdbid: str

    :param source: One of: AUTO, DATABASE, WEB.
    :type source: int

    :type caps_asis: bool
    :param caps_asis: True if the capitalization of pdbid should be preserved,
        False (default) if it should be converted to lowercase. Only applies
        to the database lookup.

    :return: Path to the PDB file that was written (`*.pdb` or `*.cif`)
    :rtype: str

    :raises requests.HTTPError: if error in connection to RCSB
    :raises RuntimeError: for other error retrieving file
    :raises ValueError: if `source` is not one of the known constants
    """
    if source == WEB:
        return download_pdb(pdbid)
    if source != DATABASE and source != AUTO:
        raise ValueError("Invalid source")

    # Both remaining modes start with the local database.
    local_copy = retrieve_pdb(pdbid, caps_asis=caps_asis)
    if local_copy:
        return local_copy
    if source == DATABASE:
        raise RuntimeError(
            "PDB '%s' could not be retrieved from the database" % pdbid)
    # AUTO: nothing found locally, fall back to the web.
    return download_pdb(pdbid)
def _open_uncompressed_pdb(path):
    """
    Return a readable binary file object holding the uncompressed contents of
    `path`, or None if a `.Z` file could not be uncompressed.
    """
    if path.endswith('.gz'):
        return gzip.open(path, 'rb')

    # A compress .Z file - there is no nice way in Python to handle this, so
    # run the external gzip and capture its stdout in a temporary file.
    spool = tempfile.TemporaryFile()
    try:
        # stderr goes to its own scratch file so that diagnostics never end
        # up inside the PDB data.
        with tempfile.TemporaryFile() as err_fh:
            status = subprocess.call(['gzip', '-c', '-d', path],
                                     stdout=spool,
                                     stderr=err_fh)
    except BaseException:
        spool.close()
        raise
    if status != 0:
        spool.close()
        logger.debug('Failed to uncompress: {}'.format(path))
        return None
    spool.seek(0)
    return spool


def retrieve_pdb(pdbid, local_repos=None, verbose=False, caps_asis=False):
    """
    Attempt to retrieve the PDB from the local repository and write an
    uncompressed copy named `<pdbid>.pdb` to the CWD (or to the temporary
    directory if the CWD is not writable).

    First we look for current files ending in .gz or .Z, then obsolete
    files with the same endings. The file name we search for is:
    pdbXXXX.ent.Y where XXXX is the PDB code and Y is either gz or Z

    :type pdbid: str
    :param pdbid: the PDB code of the desired file

    :type local_repos: list of str
    :param local_repos: the paths to the parent directories of each local
        repository.

    :type verbose: bool
    :param verbose: passed through to `find_local_pdb`.

    :type caps_asis: bool
    :param caps_asis: True if the capitalization of pdbid should be preserved,
        False (default) if it should be converted to lowercase.

    :rtype: str
    :return: the name of the pdb file or None if a failure occurs (no local
        file found, or a `.Z` file could not be uncompressed)
    """
    local_pdb_file = find_local_pdb(pdbid,
                                    local_repos,
                                    verbose=verbose,
                                    caps_asis=caps_asis)
    if not local_pdb_file:
        return None

    source_fh = _open_uncompressed_pdb(local_pdb_file)
    if source_fh is None:
        return None

    base_name = pdbid if caps_asis else pdbid.lower()
    uncompressed_pdb_file = base_name + '.pdb'
    # "with" guarantees the source handle is closed even if writing fails.
    with source_fh:
        with open_filename(uncompressed_pdb_file, 'wb') as fh:
            # NOTE: If CWD is not writable, the file will be written to
            # temp dir.
            uncompressed_pdb_file = fh.name
            shutil.copyfileobj(source_fh, fh)
    return uncompressed_pdb_file
def find_local_repository(verbose=False):
    """
    Build the list of local PDB repository directories.

    The candidate locations come from environment variables, in this order
    of precedence:

    * SCHRODINGER_PDB
    * SCHRODINGER_THIRDPARTY/database/pdb
    * SCHRODINGER/thirdparty/database/pdb (the default)

    Only candidates that are existing directories are returned, each at most
    once.

    :type verbose: bool
    :param verbose: Accepted for compatibility; not used by this function.

    :rtype: list of str
    :return: the paths to the parent directories of each local repository.
        Returns an empty list if the local repository cannot be determined.
    """
    # (environment variable, suffix appended to its value)
    env_suffixes = (
        ('SCHRODINGER_PDB', ""),
        ('SCHRODINGER_THIRDPARTY', '/database/pdb'),
        ('SCHRODINGER', '/thirdparty/database/pdb'),
    )

    candidates = []
    for name, suffix in env_suffixes:
        if name not in os.environ:
            # Variable not defined
            continue
        value = os.environ[name]
        logger.debug('environment variable {} is set to {}'.format(
            name, value))
        if value:
            candidates.append(os.path.normpath(value + suffix))
        else:
            logger.debug(
                'environment variable {} is set but has no value'.format(name))

    if not candidates:
        logger.debug('Local database is not found')
        return candidates

    found = []
    for repo_dir in candidates:
        if not os.path.isdir(repo_dir) or repo_dir in found:
            logger.debug('Local database {} is not found'.format(repo_dir))
            continue
        logger.debug('Local database found: {}'.format(repo_dir))
        found.append(repo_dir)
    return found
def find_local_pdb(pdbid, local_repos=None, verbose=False, caps_asis=False):
    """
    Search the local repositories for the compressed file of a PDB entry.

    Within each repository the "divided" tree is searched first, then the
    "obsolete" tree, then the "local" tree; in each directory the `.gz` file
    is preferred over the `.Z` file. The file name searched for is
    pdbXXXX.ent.Y where XXXX is the PDB code and Y is either gz or Z.

    When `local_repos` is not given, the repositories are determined by
    `find_local_repository` from the SCHRODINGER_PDB, SCHRODINGER_THIRDPARTY
    and SCHRODINGER environment variables.

    :type pdbid: str
    :param pdbid: the PDB code of the desired file

    :type local_repos: list of str
    :param local_repos: the paths to the parent directories of each local
        repository.

    :type verbose: bool
    :param verbose: passed through to `find_local_repository`.

    :type caps_asis: bool
    :param caps_asis: True if the capitalization of pdbid should be preserved,
        False (default) if it should be converted to lowercase.

    :rtype: str
    :return: the path to an existing file with the desired PDB code, or None
        if no such file is found
    """
    if not local_repos:
        local_repos = find_local_repository(verbose=verbose)
        if not local_repos:
            return None

    # The PDB files are stored with a bit of a mangled name
    file_id = pdbid if caps_asis else pdbid.lower()
    filename = 'pdb' + file_id + '.ent'

    # The divided/obsolete trees have one more directory level, named after
    # the middle two characters of the PDB code.
    div_dir = file_id[1:3]
    sub_dirs = [
        'data/structures/divided/pdb/' + div_dir,
        'data/structures/obsolete/pdb/' + div_dir,
        'data/structures/local/pdb/',
    ]

    for repo in local_repos:
        for dir_name in sub_dirs:
            path = os.path.join(repo, dir_name)
            if os.path.exists(path):
                for extension in ('.gz', '.Z'):
                    name = os.path.join(path, filename + extension)
                    logger.debug('Looking for: {}'.format(name))
                    if os.path.exists(name):
                        logger.debug('Returning: {}'.format(name))
                        return name
    return None
def download_pdb(pdb_code, biological_unit=False, try_as_cif=True):
    """
    Download the PDB record from www.rcsb.org into the CWD. If the PDB is
    too large to be downloaded as `*.pdb` file, it will be saved as `*.cif`.

    :param pdb_code: Four character alphanumeric string for the PDB id.
    :type pdb_code: str

    :param biological_unit: If True, and the file needs to be downloaded,
        then download the file at the biological unit URL, otherwise use
        the typical record URL. Default is False, get the typical record.
        NOTE: This option is no longer used by PrepWizard, but still
        used by getpdb_utility.py ($SCHRODINGER/utilities/getpdb)
    :type biological_unit: bool

    :param try_as_cif: Whether to try downloading the file as CIF format if
        the structure is too large to be represented in PDB format.
    :type try_as_cif: bool

    :return: Path to the downloaded file.
    :rtype: str

    :raises requests.HTTPError: if error in connection to RCSB or pdb ID does
        not exist
    :raises RuntimeError: for other error retrieving file
    """
    logger.info("Downloading %s..." % pdb_code)

    if biological_unit:
        pdb_remote, pdb_local = pdb_code + '.pdb1.gz', pdb_code + '_bio1.pdb'
        cif_remote = pdb_code + '-assembly1.cif.gz'
        cif_local = pdb_code + '_bio1.cif'
    else:
        pdb_remote, pdb_local = pdb_code + '.pdb.gz', pdb_code + '.pdb'
        cif_remote, cif_local = pdb_code + '.cif.gz', pdb_code + '.cif'

    try:
        gz_file = download_file(pdb_remote)
        out_file = pdb_local
    except requests.exceptions.HTTPError as err:
        # Only a "not found" answer suggests that the structure may be too
        # large for PDB format. Any other HTTP error must propagate;
        # otherwise gz_file would be unbound below.
        if not try_as_cif or 'Not Found for url' not in str(err):
            raise
        gz_file = download_file(cif_remote)
        out_file = cif_local

    return _decompress_gz_file(gz_file, out_file)
def download_cif(pdb_code):
    """
    Fetch the `*.cif` file for a PDB code from the web and decompress it.

    :param pdb_code: Four character alphanumeric string for the PDB id.
    :type pdb_code: str

    :return: Path to the downloaded file.
    :rtype: str

    :raises requests.HTTPError: if error in connection to RCSB or pdb ID does
        not exist
    """
    archive_path = download_file(pdb_code + '.cif.gz')
    return _decompress_gz_file(archive_path, pdb_code + '.cif')
def requests_retry_session(max_retries=3,
                           backoff_factor=0.3,
                           status_forcelist=RETRY_HTTP_CODES,
                           session=None):
    """
    Return a session whose `http://` and `https://` requests are retried on
    failure. A new session is created unless one is passed in.

    :param max_retries: Number of retries allowed; used as the total, read
        and connect retry limits.
    :type max_retries: int

    :param backoff_factor: Backoff factor to apply between attempts after the
        second try. `urllib3` will sleep for:
        {backoff factor} * (2 ** ({number of total retries} - 1))
        seconds before making next attempt.
    :type backoff_factor: float

    :param status_forcelist: Http error status codes for which retry will
        happen
    :type status_forcelist: iterable of int

    :param session: An existing session to configure, if any
    :type session: requests.Session

    :return: The configured session object
    :rtype: requests.Session
    """
    if not session:
        session = requests.Session()
    retry_policy = retry.Retry(total=max_retries,
                               read=max_retries,
                               connect=max_retries,
                               backoff_factor=backoff_factor,
                               status_forcelist=status_forcelist)
    adapter = HTTPAdapter(max_retries=retry_policy)
    for scheme in ('http://', 'https://'):
        session.mount(scheme, adapter)
    return session
def retrieve_ent(pdbid):
    """
    Retrieves the ENT file for the specified PDB ID from the third-party
    database and writes an uncompressed copy to the CWD (or to the temporary
    directory if the CWD is not writable). The database file itself is left
    in place.

    The database root is $SCHRODINGER_THIRDPARTY if set, otherwise
    $SCHRODINGER/thirdparty.

    :param pdbid: PDB ID of the entry to retrieve.
    :type pdbid: str

    :return: Path to the written file.
    :rtype: str

    :raises RuntimeError: if the database location cannot be determined or
        the entry is not present in the database.
    """
    # Ev:96694
    thirdpartypath = os.environ.get('SCHRODINGER_THIRDPARTY')
    if thirdpartypath is None:
        schrodingerpath = os.environ.get('SCHRODINGER')
        if schrodingerpath is None:
            # Raise the documented error type so that callers which fall back
            # to the web on RuntimeError keep working.
            raise RuntimeError(
                "Neither SCHRODINGER_THIRDPARTY nor SCHRODINGER is set; "
                "cannot locate the PDB database")
        thirdpartypath = os.path.join(schrodingerpath, "thirdparty")

    ent_file = "pdb%s.ent" % pdbid
    compressed_ent_file = os.path.join(thirdpartypath, "database", "pdb",
                                       "structures", "all", "pdb",
                                       ent_file + ".gz")
    if not os.path.isfile(compressed_ent_file):
        raise RuntimeError("Template file is missing: %s" % compressed_ent_file)

    # Decompress here rather than via _decompress_gz_file(), which deletes
    # its input: the source is the shared database and must not be removed.
    with gzip.open(compressed_ent_file, 'rb') as in_fh, \
            open_filename(ent_file, 'w', encoding="utf-8") as out_fh:
        # The output path may be in the temporary directory:
        ent_file = out_fh.name
        for line in in_fh:
            out_fh.write(line.decode())
    return ent_file
def download_ent(pdbid):
    """
    Downloads the structure factor file (`<pdbid>-sf.cif.gz`) for the
    specified PDB ID from the RCSB web site, and saves it decompressed as
    `<pdbid>sf.ent` in the CWD (or in the temporary directory if the CWD is
    not writable). File path is returned.

    :param pdbid: PDB ID of the entry to download.
    :type pdbid: str

    :return: Path to the written file.
    :rtype: str

    :raises requests.HTTPError: if error in connection to RCSB
    :raises RuntimeError: if the downloaded data could not be decompressed
        or saved
    """
    # Ev:96694
    compressed_ent_file = download_file(f'{pdbid}-sf.cif.gz')
    try:
        return _decompress_gz_file(compressed_ent_file, "%ssf.ent" % pdbid)
    except Exception as err:  # Ev:71880
        msg = "Failed to save downloded data.\nERROR: %s" % err
        # Chain the cause so the original traceback is not lost.
        raise RuntimeError(msg) from err
def get_ent(pdbid, source=AUTO):
    """
    Obtain the ENT file for `pdbid` from the local database, the web, or
    both, as selected by `source`. With AUTO (the default) the database is
    tried first and the web is used if the database lookup raises
    RuntimeError.

    :param pdbid: PDB ID - string of 4 characters.
    :type pdbid: str

    :param source: One of: AUTO, DATABASE, WEB.
    :type source: int

    :return: Path to the ENT file.
    :rtype: str

    :raises requests.HTTPError: if error in connection to RCSB
    :raises RuntimeError: for other error retrieving file
    :raises ValueError: if `source` is not one of the known constants
    """
    # Ev:96694
    if source == DATABASE:
        return retrieve_ent(pdbid)
    if source == WEB:
        return download_ent(pdbid)
    if source != AUTO:
        raise ValueError("Invalid source")

    try:
        return retrieve_ent(pdbid)
    except RuntimeError:
        # Not available locally; get it from the web instead.
        return download_ent(pdbid)
def open_filename(filename, mode, encoding=None):
    """
    Open `filename` with the given mode and encoding. If that fails with an
    I/O error, open a file of the same name under the temporary directory
    instead. The path actually opened is available as the `name` attribute
    of the returned file object.
    """
    try:
        handle = open(filename, mode, encoding=encoding)
    except OSError:
        fallback_dir = fileutils.get_directory_path(fileutils.TEMP)
        fallback_path = os.path.join(fallback_dir, filename)
        handle = open(fallback_path, mode, encoding=encoding)
    return handle
def download_reflection_data(pdbid):
    """
    Obtain the reflection data (ENT file) for a PDB ID and convert it with
    `refconvert` to `<pdbid>.cv`. On success the ENT file is removed.

    :type pdbid: str
    :param pdbid: PDB ID

    :return: Name of the converted file.
    :rtype: str

    :raises FileNotFoundError: if the converted file was not produced; the
        ENT file is kept in that case.
    """
    ent_file = get_ent(pdbid)
    cv_file = "%s.cv" % pdbid
    subprocess.call(['refconvert', '-icif', ent_file, '-ocns', cv_file])

    if not os.path.isfile(cv_file):
        msg = """Downloaded reflection data to: %s. Failed to convert
to CV format using default refconvert options. For help, run:
$SCHRODINGER/utilities/refconvert -help.""" % ent_file
        raise FileNotFoundError(msg)

    # Ev:71921:
    os.remove(ent_file)
    return cv_file
if __name__ == '__main__':
    # Script use: download the biological unit for every PDB ID given on the
    # command line.
    for pdb_code in sys.argv[1:]:
        download_pdb(pdb_code, biological_unit=True)