"""
Copy a PDB file out of the PDB directory, or, if not found, download it from
the rcsb repository: https://rcsb.org/.
Note: the location of the PDB directory can be specified via environment
variables; the order of precedence is:
* SCHRODINGER_PDB
* SCHRODINGER_THIRDPARTY
* SCHRODINGER/thirdparty (the default)
Copyright Schrodinger, LLC. All rights reserved.
"""
import gzip
import subprocess
import sys
import tempfile
import warnings
import requests
from schrodinger import structure
from schrodinger.protein import getpdb as pyget_pdb
from schrodinger.structutils import build
from schrodinger.utils import log
from schrodinger.utils.cmdline import SingleDashOptionParser
# Subdirectories of a local PDB mirror holding current and obsolete entries
# (matches the rcsb.org repository directory layout).
current_dir = 'data/structures/divided/pdb/'
obsolete_dir = 'data/structures/obsolete/pdb/'
# auto_write_lines are always written no matter what chain we are looking for
auto_write_lines = ['MODEL', 'ENDMDL', 'TITLE', 'REMARK']
# Module-level exit status: set to 1 by any failure, returned by main().
error_code = 0
# Reuse the logger of the schrodinger.protein.getpdb module.
logger = pyget_pdb.logger
def parse_arguments(args):
    """
    Parse the command line arguments for the getpdb utility.

    :type args: list(str)
    :param args: the argument list to parse (options followed by PDB codes)

    :return: the ``(options, args)`` pair from optparse; ``args`` holds the
        remaining positional arguments (the requested PDB codes, each
        optionally suffixed with ``:<chain ID>``)
    """
    usage = """
getpdb_utility.py
Copy a PDB file out of the PDB directory, or, if not found, download it from the
RCSB repository.
usage: $SCHRODINGER/utilities/getpdb [options] <PDB code>[<:Chain ID>] [<PDB code>[<:Chain ID>] ...]
<PDB code> is the PDB code for the desired file
<:Chain ID> is the Chain for which to extract the TITLE, ATOM, HETATM,
MODEL, ENDMDL and TER records. Use the word 'space' (without quotes) to
indicate chains whose name is the space character.
Examples:
getpdb 1xyz
retrieves the PDB file for the structure with PDB code 1xyz
getpdb 1xyz:A
retrieves the PDB file for the structure with PDB code 1xyz
and extracts TITLE, ATOM, HETATM, MODEL, ENDMDL and TER records
for Chain A
getpdb 1xyz:space
retrieves the PDB file for the structure with PDB code 1xyz
and extracts TITLE, ATOM, HETATM, MODEL, ENDMDL and TER records
for Chain " "
getpdb 1xyz 1e6o:L
retrieves the PDB file for the structure with PDB code 1xyz
Also retrieves the PDB file for the structure with PDB code 1e6o
and extracts TITLE, ATOM, HETATM, MODEL, ENDMDL and TER records
for Chain L
"""
    with warnings.catch_warnings():
        # SingleDashOptionParser is pending deprecation; this utility still
        # relies on single-dash long options, so silence just that warning
        # while the parser is constructed.
        warnings.filterwarnings("ignore",
                                category=PendingDeprecationWarning,
                                message="SingleDashOptionParser")
        parser = SingleDashOptionParser(usage=usage)
    parser.add_option(
        '-b',
        action='store_true',
        dest='biological_unit',
        default=False,
        metavar='<download biological units>',
        help='Download the complete biological unit for each PDB code.'
        ' This option forces -r, and will ignore Chain IDs.')
    parser.add_option(
        '-l',
        action='store_true',
        dest='local_only',
        default=False,
        metavar='<local directory only>',
        help='Only search the local directory for PDB files, do not' +
        ' retrieve from the web. (def. use local and web)')
    parser.add_option(
        '-r',
        action='store_true',
        dest='remote_only',
        default=False,
        metavar='<RCSB repository only>',
        help='Only retrieve PDB files from the RCSB respository, ' +
        'do not search local directories. (def. use local' + ' and web)')
    parser.add_option('-d',
                      action='store_true',
                      dest='debug',
                      default=False,
                      metavar='<debugging output>',
                      help='Print debugging output. (def. do not print)')
    # No explicit dest: optparse derives dest='verbose' from the option
    # string, which is what main() reads.
    parser.add_option(
        '-verbose',
        action='store_true',
        default=False,
        metavar='<verbose debugging output>',
        help='Print verbose debugging output. (def. do not print)')
    parser.add_option('-fasta',
                      action='store_true',
                      default=False,
                      help='Obsolete. Use the -format option instead')
    parser.add_option(
        '-format',
        default='pdb',
        help=
        ('Download file in specified format. Available options: pdb|cif|xml|sf|fasta. '
         'Single chain download is only available for pdb format.'))
    options, args = parser.parse_args(args)
    if not args:
        # No PDB codes were supplied: print the program name and the module
        # help text. NOTE(review): this does not exit; main() will simply
        # iterate over an empty list of codes and return 0.
        logger.error(sys.argv[0])
        logger.error(__doc__)
    if options.debug or options.verbose:
        logger.setLevel(log.DEBUG)
    return options, args
class WholePDB(object):
    """
    Class to take care of the file work for entire PDB files.

    Shares the openFile/write/closeFile/checkOK interface with the Chain
    class so Code objects can treat both uniformly.
    """

    def __init__(self, code):
        """
        Create a WholePDB object.

        :type code: str
        :param code: the PDB code for this file
        """
        # name is always None here; Code.sidechains()/wholePDB() rely on a
        # falsy name to recognize whole-file requests.
        self.name = None
        self.code = code
        self.file = None
        self.filename = code.lower() + '.pdb'

    def openFile(self, web):
        """
        Opens the PDB file for writing.
        If the source file is from the web, it is already exactly how we want
        it, so no need to do anything.

        :type web: bool
        :param web: True if the source file is from the web.
        """
        # The global declaration must come after the docstring; in the
        # original it came first, which demoted the docstring to a dead
        # string expression.
        global error_code
        if not web:
            try:
                self.file = open(self.filename, 'w')
            except IOError:
                logger.error('Could not open {}'.format(self.filename))
                error_code = 1

    def write(self, line):
        """
        If we are writing a file, add this line to it.

        :type line: str
        :param line: The line to add to the file
        """
        if self.file:
            self.file.write(line)

    def closeFile(self):
        """
        Close the file if it is open
        """
        if self.file:
            self.file.close()

    def checkOK(self):
        """
        This is just a stub routine for compatibility with the Chain class

        :rtype: bool
        :return: True
        """
        logger.info('saved data to file: {}'.format(self.filename))
        return True
class Chain(object):
    """
    Class to handle splitting a chain out of a PDB file.
    """

    def __init__(self, name, code, debug=False):
        """
        Create a Chain object.

        :type name: 1 character string
        :param name: the Chain ID to extract

        :type code: str
        :param code: The PDB code of the parent PDB file

        :type debug: bool
        :param debug: True if debugging output was requested
        """
        self.name = name
        self.code = code
        self.file = None
        # Number of lines written to the chain file so far
        self.lines = 0
        # Set to True once an ATOM (resp. HETATM) record matched this chain
        self.atoms = False
        self.hetatoms = False
        # Serial numbers of the HETATM records written, used to keep only
        # the CONECT records that refer to them
        self.hetatm_numbers = set()
        self.filename = ""
        self.debug = debug

    def openFile(self, web):
        """
        Creates the filename to contain this chain data and opens it.
        Add a remark to the top of the file to indicate it is only a partial
        PDB file.

        :type web: bool
        :param web: unused, kept for compatability with WholePDB class.
        """
        global error_code
        self.filename = self.code + '_' + self.name + '.pdb'
        try:
            self.file = open(self.filename, 'w')
        except IOError:
            # Log the file name (the original logged self.file, which is
            # still None here) and bail out - there is no open file to
            # write the REMARK header to.
            logger.error('Could not open {}'.format(self.filename))
            error_code = 1
            return
        remark = 'REMARK ' + self.code + ' Chain ' + self.name + '\n'
        self.file.write(remark)

    def write(self, line):
        """
        Write a line to the chain file if it applies to this chain.

        :type line: str
        :param line: the current line of the PDB file.
        """

        def write_me():
            self.file.write(line)
            self.lines = self.lines + 1

        if not self.file:
            return
        # Some lines are included in all chain files
        for starter in auto_write_lines:
            if line.startswith(starter):
                write_me()
                return
        # If column 22 has this chain ID, write the ATOM, HETATM and TER lines
        if line.startswith('ATOM') and line[21] == self.name:
            write_me()
            self.atoms = True
        elif line.startswith('HETATM') and (line[21] == self.name or
                                            line[21] == ' '):
            # HETATM lines require more care; as requested by Ramy and Tyler we
            # write out HETATM lines if they either match the requested chain
            # or have no chain ID assigned at all
            write_me()
            # Store the hetatm serial numbers to use later to write out only
            # the related conect records. The serial occupies PDB columns
            # 7-11, i.e. slice [6:11]; the previous [7:12] slice dropped the
            # leading digit of 5-digit serials.
            self.hetatm_numbers.add(line[6:11].strip())
            if line[21] == self.name:
                # Only count the lines that actually match this chain
                self.hetatoms = True
        elif line.startswith('TER') and line[21] == self.name:
            write_me()
        # SEQRES lines have the chain ID in column 12
        elif line.startswith('SEQRES') and line[11] == self.name:
            write_me()
        elif line.startswith('CONECT'):
            # CONECT serials are fixed 5-column fields starting at column 7;
            # whitespace-splitting fails when 5-digit serials run together,
            # so parse by column instead (up to column 61 to cover the older
            # extended CONECT format).
            het_nums = [
                line[index:index + 5].strip()
                for index in range(6, min(len(line), 61), 5)
            ]
            for num in het_nums:
                if num and num in self.hetatm_numbers:
                    write_me()
                    break

    def checkOK(self):
        """
        Check to see if everything went OK writing the file.

        :rtype: bool
        :return: True if everything checks out OK, false if not
        """
        if not self.lines:
            logger.error('ERROR: no matching records for chain {}'.format(
                self.name))
            return False
        elif not (self.atoms or self.hetatoms):
            # Parenthesized for clarity; the original relied on '+' binding
            # tighter than 'not', which computes the same thing.
            logger.error(
                'ERROR: found neither ATOM nor HETATM records for chain {}'.
                format(self.name))
            return False
        elif not self.atoms:
            logger.error(
                'WARNING: no matching ATOM records for chain {}'.format(
                    self.name))
        logger.debug('Number of lines for chain {} {}'.format(
            self.name, self.lines))
        logger.info('saved data to file: {}'.format(self.filename))
        return True

    def closeFile(self):
        """
        Add the END tag and close the chain file
        """
        if self.file:
            self.file.write('END')
            self.file.close()
class Code(object):
    """
    Class to handle all the various files for each PDB code.
    """

    def __init__(self, code, debug=False):
        """
        Create a Code object.

        :type code: str
        :param code: the 4-letter PDB code

        :type debug: bool
        :param debug: True if debugging output was requested
        """
        self.code = code
        # Set of Chain and/or WholePDB objects requested for this code
        self.chains = set()
        self.obsolete = False
        self.debug = debug

    def addChain(self, chain):
        """
        Add another Chain ID to be extracted from the parent file.

        :type chain: 1 character string or None
        :param chain: The Chain ID to be extracted. If chain is None, then the
            whole PDB file is desired.
        """
        if chain is None:
            self.chains.add(WholePDB(self.code))
        else:
            self.chains.add(Chain(chain, self.code, self.debug))

    def sidechains(self):
        """
        Check if any chains are to be extracted.

        :rtype: bool
        :return: True if any chains are to be extracted, False if not
        """
        # chain.name is None for WholePDB objects, so a truthy name means a
        # specific chain was requested.
        return any(chain.name for chain in self.chains)

    def wholePDB(self):
        """
        Returns True if the whole PDB file should be kept
        """
        # chain.name is None for WholePDB objects
        return any(not chain.name for chain in self.chains)

    def findLocally(self, local_dirs):
        """
        Check a series of local directories and filenames for the PDB files.
        First we look for current files ending in .gz or .Z, then obsolete
        files with the same endings. The file name we search for is:
        pdbXXXX.ent.Y where XXXX is the PDB code and Y is either gz or Z

        :type local_dirs: list(str)
        :param local_dirs: the local repository directories to search

        :return: the path to the found file, or a false value if not found
        """
        return pyget_pdb.find_local_pdb(self.code.lower(),
                                        local_repos=local_dirs,
                                        verbose=False)

    def handleObsolete(self):
        """
        Print a warning if we are using a file from the obsolete directory
        """
        logger.warning('Retrieving OBSOLETE entry for PDB-ID {}'.format(
            self.code))
        logger.warning(
            'WARNING: OBSOLETE ENTRIES SHOULD BE USED WITH EXTREME CAUTION')
        logger.warning(
            'SINCE THEY DO NOT REPRESENT THE MOST CURRENT VERSION OF A STRUCTURE'
        )
        self.obsolete = True

    def openFiles(self, web=False):
        """
        Prepare all the chain and whole PDB files for writing.

        :type web: bool
        :param web: True if the source file is from the web, False if not. In
            this case "from the web" means uncompressed and in the destination
            directory with the correct name.
        """
        for chain in self.chains:
            chain.openFile(web)

    def write(self, line):
        """
        Write line to each of the appropriate files

        :type line: str
        :param line: the current line from the source PDB file.
        """
        for chain in self.chains:
            chain.write(line)
        if line.startswith('OBSLTE'):
            # Grab the names of the structures that have made this file
            # obsolete and report them to the user. Replacement IDs are
            # 4-character codes in fixed 5-column fields starting at column
            # 32 (index 31).
            rids = []
            for index in range(31, 75, 5):
                rid = line[index:index + 4].strip()
                # Skip empty fields; the original isspace() test let empty
                # slices through ("".isspace() is False), so a short OBSLTE
                # line produced a warning full of empty IDs.
                if rid:
                    rids.append(rid)
            if rids:
                logger.warning(
                    'WARNING: THE ENTRY {} HAS BEEN REPLACED BY THE FOLLOWING STRUCTURE(S): {}'
                    .format(self.code, ' '.join(rids)))

    def checkOK(self):
        """
        Check to make sure all the files were written properly.

        :rtype: bool
        :return: True if all the files were written properly, False if there
            were any problems.
        """
        ok_list = [chain.checkOK() for chain in self.chains]
        return all(ok_list)

    def closeFiles(self):
        """
        Close all the files
        """
        for chain in self.chains:
            chain.closeFile()

    def merge_biologic_unit(self, pdb_file):
        """
        Merge all subunit structures of a downloaded biological-unit file
        into a single structure and rewrite the file in place.

        :type pdb_file: str
        :param pdb_file: path to the downloaded biological unit PDB file
        """
        with structure.PDBReader(pdb_file, all_occ=True) as st_reader:
            merged_st, _ = build.merge_subunit_sts(st_reader)
        if merged_st.title == 'XXXX':
            # The merged structure got the placeholder title; use the code
            merged_st.title = self.code.upper()
        merged_st.property['s_pdb_PDB_ID'] = self.code.upper()
        merged_st.write(pdb_file)
def invalid_code(acode):
    """
    Print a warning that a given code is invalid

    :type acode: str
    :param acode: the command-line token that failed PDB-code validation
    """
    global error_code
    message = 'WARNING: invalid PDB ID >|{}|< given on command line - skipping'
    logger.error(message.format(acode))
    error_code = 1
def from_maestro(*args):
    """
    Entry point used when this utility is invoked from Maestro.

    :param args: the PDB code/chain arguments; packed into a tuple and
        forwarded to main(). The return value of main() is discarded.
    """
    main(args)
def main(args):
    """
    Retrieve each requested PDB entry and write the requested file(s).

    :type args: list(str)
    :param args: command-line arguments: options followed by PDB codes,
        each optionally suffixed with ':<chain ID>'

    :rtype: int
    :return: 0 on success, 1 if any entry failed
    """
    options, args = parse_arguments(args)
    global error_code
    error_code = 0
    # Check the validity of the PDB codes
    # Each key in pdb_codes is a PDB code, and each value is a Code class
    # object that will handle writing the requested file.
    pdb_codes = {}
    # code_order just keeps track of the original order so we process them in
    # the same order the user asked for them.
    code_order = []
    for code in args:
        code = code.strip("'")  # Maestro input is wrapped with single quotes
        logger.debug('ID: >|{}|<'.format(code))
        tokens = code.split(':')
        # Check for valid PDB code: 4 characters beginning with a digit, and
        # characters 2-4 should be alphanumeric.
        if (len(tokens[0]) != 4 or not tokens[0][0].isdigit() or
                not tokens[0][1:].isalnum()):
            invalid_code(code)
            continue
        if len(tokens) == 1 or options.biological_unit:
            # User has requested the whole PDB file (-b ignores chain IDs)
            logger.debug('ID-Code: >|{}|<'.format(code))
            # Add this code to the list to process
            if code not in pdb_codes:
                pdb_codes[code] = Code(code, options.debug or options.verbose)
                code_order.append(code)
            pdb_codes[code].addChain(None)
        elif len(tokens) == 2:
            # PDB code and chain specified. If the chain name is literally
            # 'space', we replace 'space' with an actual space character.
            if tokens[1] == 'space':
                tokens[1] = ' '
            logger.debug('ID-Code: >|{}|<\tChain-ID: >|{}|<'.format(
                tokens[0], tokens[1]))
            # Add this code & chain to the list to process
            if tokens[0] not in pdb_codes:
                pdb_codes[tokens[0]] = Code(tokens[0], options.debug or
                                            options.verbose)
                code_order.append(tokens[0])
            pdb_codes[tokens[0]].addChain(tokens[1])
        else:
            # More than one ':' in the token - reject it
            invalid_code(code)
    if options.format not in ("pdb", "cif", "xml", "sf", "fasta"):
        logger.error(
            "Option -format can only take pdb, cif, xml, sf or fasta as value")
        return 1
    # For backwards-compatibility:
    if options.fasta:
        options.format = "fasta"
    if options.biological_unit:
        if options.format != "pdb":
            logger.error("Option -b can only be used with pdb format")
            return 1
        if options.local_only:
            logger.error("Options -b and -l cannot be used together")
            return 1
        # -b implies downloading from the RCSB (forces -r)
        options.remote_only = True
    if options.local_only and options.format != "pdb":
        logger.error('Local download supported only for "pdb" format')
        return 1
    # Find the local repository
    if not options.remote_only:
        local_dirs = pyget_pdb.find_local_repository(verbose=options.verbose)
        if not local_dirs and options.local_only:
            logger.error('Local repository not found')
            return 1
    # Now process each requested PDB file
    for index in code_order:
        code = pdb_codes[index]
        filename = False
        web = False
        line_count = 0
        # Handle non-PDB files differently: download each requested entry
        # and skip all of the pdb-specific local/chain handling below.
        if options.format != "pdb":
            for chain in code.chains:
                try:
                    if chain.name is not None:
                        raise RuntimeError(
                            f'{code.code}:{chain.name}: Downloading of specific '
                            'chains is only supported for pdb format')
                    # download_format() is defined elsewhere in this file
                    # (not visible in this chunk)
                    filename = download_format(code.code, options.format)
                    # Check for empty file
                    with open(filename, 'rb') as ftest:
                        if not ftest.readlines():
                            raise RuntimeError(
                                'Empty file returned from getpdb')
                except (RuntimeError, requests.HTTPError) as message:
                    # NOTE(review): download failures for non-pdb formats are
                    # logged but do not set error_code - confirm intended
                    logger.error(message)
                else:
                    # Downloaded successfully
                    logger.info('saved data to file: {}'.format(filename))
            continue
        # First look for the file locally if allowed
        if not options.remote_only:
            filename = code.findLocally(local_dirs)
            if not filename:
                logger.debug('Failed to get from local database - {}'.format(
                    code.code))
        # If not found yet, look on the web if allowed
        if not filename and not options.local_only:
            logger.debug('Downloading {} from rcsb repository.'.format(
                code.code))
            try:
                filename = pyget_pdb.download_pdb(code.code,
                                                  options.biological_unit,
                                                  try_as_cif=False)
                web = True
            except (RuntimeError, requests.HTTPError) as message:
                logger.error(message)
        # Check if we've found the file yet
        if not filename:
            logger.error('Could not obtain PDB file for {}'.format(code.code))
            error_code = 1
            continue
        if web:
            if code.sidechains():
                # file exists and we need to parse out a chain
                myfile = open(filename, 'rb')
            elif options.biological_unit:
                # Collapse the downloaded biological-unit subunits in place
                code.merge_biologic_unit(filename)
                logger.debug(
                    'Biological unit for PDB code {} completed successfully'.
                    format(code.code))
                logger.info('saved data to file: {}'.format(filename))
                continue
            else:
                # If we downloaded from the web, the whole file is already in
                # the directory. If we don't have to split any sidechains or
                # merge the biological unit, we are done with this PDB code.
                logger.debug('PDB code {} completed successfully'.format(
                    code.code))
                logger.info('saved data to file: {}.pdb'.format(code.code))
                continue
        else:
            if filename.endswith('.gz'):
                myfile = gzip.open(filename, 'rb')
            else:
                # A .Z file, for which there is no nice python way of handling
                command = ['gzip', '-c', '-d', filename]
                # Run the job and capture stdout:
                myfile = tempfile.TemporaryFile()
                subprocess.call(command, stdout=myfile, stderr=myfile)
                myfile.seek(0)
        # Write out the desired file line by line from the original file.
        # Files are opened in binary mode, so we need to explicitly handle
        # line separators to avoid duplication of '\r' on windows.
        code.openFiles(web=web)
        for line in myfile:
            line = line.decode('utf-8').rstrip('\r\n')
            code.write(f'{line}\n')
            line_count = line_count + 1
        myfile.close()
        # Some sanity checks
        if code.sidechains() and not web:
            logger.debug(
                'Number of lines before chain splitting {}'.format(line_count))
        did_ok = code.checkOK()
        if not did_ok:
            error_code = 1
        code.closeFiles()
    return error_code