"""
Job handling functions for multiple sequence viewer.
Copyright Schrodinger, LLC. All rights reserved.
"""
# Contributors: Deacon Sweeney, Quentin McDonald, Piotr Rotkiewicz
import errno
import glob
import os
import subprocess
import sys
import tempfile
import time
from contextlib import contextmanager
from schrodinger.application.msv.seqio import GetSequencesException
from schrodinger.application.msv.seqio import SeqDownloader
from schrodinger.utils import fileutils
from schrodinger.utils import preferences
from . import constants
from . import fileio
from . import maestro as maestro_helpers
from . import residue
from . import sequence_group
from .sequence import Sequence
[docs]def stderr_msg(header, msg):
"""
When no display is available, we can re-route dialog popups to stderr
"""
sys.stderr.write('%s: %s' % (header, msg))
try:
from .dialogs import error_dialog
from .dialogs import warning_dialog
except ImportError:
warning_dialog = stderr_msg
error_dialog = stderr_msg
try:
from schrodinger.maestro import maestro
except:
maestro = None
try:
from schrodinger.job import jobcontrol
except ImportError:
jobcontrol = None
try:
from schrodinger.infra import mm
except ImportError:
mm = None
try:
from schrodinger.Qt import QtCore
except:
QtCore = None
_JOB_IS_RUNNING = False
_url_opener = None
# Job status flags.
JOB_STATUS_OK = 0
JOB_STATUS_MISSING = -1
JOB_STATUS_FAILED = -2
JOB_STATUS_KILLED = -3
BLAST_SERVER_URL_BASE = "https://blast.ncbi.nlm.nih.gov"
BLAST_SERVER_URL_QUERY = BLAST_SERVER_URL_BASE + "/Blast.cgi?"
NUM_HITS_ARG = '-num_hits'
[docs]def getSchrodingerPath():
"""
Returns a path to Schrodinger directory.
:rtype: string
:return: Schrodinger path or None if the path is not found.
"""
if "SCHRODINGER" not in os.environ:
if sequence_group:
error_dialog("Cannot Run Job",
"SCHRODINGER environment variable not defined.")
return None
schrodinger = os.getenv("SCHRODINGER")
if not schrodinger or \
len(schrodinger) < 1:
if sequence_group:
error_dialog("Cannot Run Job",
"Wrong SCHRODINGER environment variable.")
return None
if schrodinger[-1] != os.sep:
schrodinger += os.sep
return schrodinger
[docs]def checkJobIsRunning():
"""
Checks if there is another job currently running.
"""
global _JOB_IS_RUNNING
if _JOB_IS_RUNNING:
warning_dialog("Job Is Running",
"Please cancel current job or wait until it finishes.")
return True
return False
[docs]def processPendingEvents():
"""
Processess pending events from Qt and Maestro.
"""
if QtCore and QtCore.QCoreApplication.instance():
QtCore.QCoreApplication.processEvents(QtCore.QEventLoop.AllEvents, 100)
if maestro:
maestro.process_pending_events()
[docs]def killJob(job_id):
"""
Kills a specified job using jobcontrol.
"""
schrodinger = getSchrodingerPath()
if not schrodinger or not job_id:
return False
exe = schrodinger + "jobcontrol"
command = [exe, "-kill", job_id]
if os.name == "nt":
command.insert(0, "sh")
try:
subprocess.Popen(command)
except:
pass
return True
[docs]def run_job_via_jobcontrol(command_list,
log_file_name,
dialog_title="Job Progress",
initial_text="",
progress_dialog=None,
quiet=False,
no_jobid=False,
monitor_job=True,
settings=""):
"""
Runs a job using Job class of jobcontrol module.
:note: Eventually, we will migrate all jobs to use this function.
:type command: str
:param command: Command to be executed.
:rtype: int
:return: 0 on successfull execution, error code otherwise
"""
global _JOB_IS_RUNNING
if not jobcontrol:
return JOB_STATUS_MISSING
if maestro:
project_table = maestro.project_table_get()
project_name = project_table.fullname
command_list.insert(1, "-PROJ")
command_list.insert(2, project_name)
_JOB_IS_RUNNING = True
if progress_dialog:
progress_dialog.setButtonText("Cancel")
progress_dialog.show()
progress_dialog.setText(initial_text + "Launching a job...")
progress_dialog.startJob()
elif not quiet:
print(initial_text + "Launching a job...\n")
if QtCore and QtCore.QCoreApplication.instance():
QtCore.QCoreApplication.flush()
if no_jobid:
# Don't launch job if no_jobid is set.
# Instead launch it directly.
command_list.insert(1, " -NOJOBID ")
_JOB_IS_RUNNING = False
return run_job(command_list,
use_windows_shell=False,
progress_dialog=progress_dialog,
no_jobid=True)
if settings:
command_list[1:1] = settings["command_line"]
try:
job = jobcontrol.launch_job(command_list)
except Exception as _err:
# Log always to stderr
stderr_msg("Error running job", str(_err))
if progress_dialog:
progress_dialog.endJob()
_JOB_IS_RUNNING = False
return JOB_STATUS_FAILED
if not job:
if progress_dialog:
progress_dialog.endJob()
_JOB_IS_RUNNING = False
return JOB_STATUS_FAILED
job_id_text = "Job launched. Job ID = %s\n\n" % str(job.job_id)
init_text = initial_text + job_id_text
if maestro and monitor_job:
maestro.job_started(job.job_id)
show_monitor_panel = maestro.get_command_option("prefer",
"showmonitorpanel")
if show_monitor_panel == 'True':
maestro.command("showpanel monitor")
if progress_dialog:
progress_dialog.setText(init_text)
progress_dialog.raise_()
progress_dialog.setButtonText("Cancel")
if QtCore and QtCore.QCoreApplication.instance():
QtCore.QCoreApplication.flush()
elif not quiet:
print(job_id_text)
killed = False
ticks = 0
text = init_text
log_file = None
while not job.isComplete():
# Process all Qt events. Use a 0.1 s timeout.
if QtCore and QtCore.QCoreApplication.instance():
processPendingEvents()
if progress_dialog and (progress_dialog.result() or not \
progress_dialog.isRunning()):
job.kill()
# Sleep for a moment..
time.sleep(1)
killed = True
break
# Sleep for 0.05 s.
time.sleep(0.05)
ticks += 1
if ticks > 10:
# Update every 1/2 second.
ticks = 0
job.readAgain()
if log_file_name:
if not log_file:
try:
log_file = open(log_file_name, "r")
except:
log_file = None
else:
while True:
line = log_file.readline()
if not line:
break
text += line
if not progress_dialog and not quiet:
print(line.rstrip())
if progress_dialog:
progress_dialog.setText(text)
if log_file and progress_dialog:
log_file.seek(0)
text = init_text
while True:
line = log_file.readline()
if not line:
break
text += line
log_file.close()
progress_dialog.setText(text)
if progress_dialog:
progress_dialog.endJob()
_JOB_IS_RUNNING = False
if killed:
return JOB_STATUS_KILLED
# Download job output files:
job.download()
return JOB_STATUS_OK
[docs]def run_job(command,
use_windows_shell=True,
quiet=False,
progress_dialog=None,
no_jobid=False):
"""
Executes the specified command and waits until it returns.
It processess pending Qt events, so the main program is not blocked.
:type command: list
:param command: Command list to be executed.
:rtype: bool
:return: True on successfull execution, False otherwise
"""
tmp_file = tempfile.TemporaryFile(mode='w+t')
tmp_file_d = tmp_file.fileno()
if os.name == "nt" and use_windows_shell:
command.insert(0, "sh.exe")
is_darwin = sys.platform == "darwin"
process = None
try:
process = subprocess.Popen(command,
stdout=tmp_file_d,
universal_newlines=True)
except:
pass
if not process:
if not quiet:
error_dialog("Cannot execute job.", "Job could not be executed.")
return JOB_STATUS_FAILED
result = JOB_STATUS_OK
text = ""
job_id = None
if progress_dialog:
progress_dialog.show()
progress_dialog.raise_()
progress_dialog.setButtonText("Cancel")
progress_dialog.setText("Launching a job...")
progress_dialog.startJob()
elif not quiet:
print("Launching a local job...\n")
while process.poll() is None:
# Process all Qt events. Use 0.1 s timeout.
if QtCore and QtCore.QCoreApplication.instance():
QtCore.QCoreApplication.processEvents(QtCore.QEventLoop.AllEvents,
100)
if maestro and not is_darwin:
maestro.process_pending_events()
# Sleep for 0.1 s.
time.sleep(0.1)
tmp_file.seek(0, os.SEEK_SET)
text = "Running a local job...\n"
while True:
line = tmp_file.readline()
if not line:
break
text += line
if not progress_dialog and not quiet:
print(line)
if not job_id and not no_jobid:
dummy, dummy, job_id = line.partition("JobId:")
if progress_dialog:
progress_dialog.setText(text)
if progress_dialog.result() or not progress_dialog.isRunning():
process.kill()
killJob(job_id)
result = JOB_STATUS_KILLED
break
if progress_dialog:
progress_dialog.endJob()
if result == JOB_STATUS_OK and process.returncode != 0:
if not quiet:
error_dialog("Error", "Cannot complete the job.")
result = JOB_STATUS_FAILED
if result == JOB_STATUS_OK:
tmp_file.seek(0, os.SEEK_SET)
text = ""
while True:
line = tmp_file.readline()
if not line:
break
text += line
if progress_dialog:
progress_dialog.appendText(text + "\nJob completed.")
tmp_file.close()
return result
[docs]def run_clustal(widget,
sequence_group,
ss_constraints=False,
merge_groups=False,
progress_dialog=None,
global_alignment=True,
quick_alignment=True,
ignore_selection=False,
viewer=None):
"""
Runs Clustal alignment job and incorporates the results.
:type sequence_group: `SequenceGroup`
:param sequence_group: target sequence group
"""
clustal_exec = None
inp_file_name = None
aln_file = None
dnd_file = None
maestro_path = os.getenv("MAESTRO_EXEC")
if maestro_path and len(maestro_path) > 0:
clustal_exec = maestro_path + os.sep + "clustalw2"
if os.name == "nt":
clustal_exec += ".exe"
if not clustal_exec or not os.path.isfile(clustal_exec):
clustal_exec = os.getenv("CLUSTALW2")
if not clustal_exec or not os.path.isfile(clustal_exec):
clustal_exec = None
if not clustal_exec:
schrodinger = os.getenv("SCHRODINGER")
if schrodinger:
pattern = schrodinger + os.sep + 'maestro-v*' + os.sep + 'bin' + \
os.sep + '*' + os.sep + "clustalw2" + "*"
results = glob.glob(pattern)
if results:
clustal_exec = results[0]
if not clustal_exec:
error_dialog(
"Cannot Find ClustalW2 Executable",
"Please define CLUSTALW2 environment variable"
" pointing to the executable file.")
return False
any_selected = sequence_group.hasSelectedSequences()
if ignore_selection:
any_selected = False
partial_alignment = False
if not global_alignment and sequence_group.hasSelectedResidues():
partial_alignment = True
sequence_group.expandSelection()
if viewer:
viewer.updateView()
# Count number of sequences that will be aligned.
n_selected = 0
if any_selected:
for seq in sequence_group.sequences:
if seq.type == constants.SEQ_AMINO_ACIDS and seq.selected:
n_selected += 1
else:
for seq in sequence_group.sequences:
if seq.type == constants.SEQ_AMINO_ACIDS:
n_selected += 1
# Not enough sequences to do the alignment.
if n_selected <= 1:
error_dialog("Error Running Job",
"Cannot align less than two sequences.")
return False
# Make sure the alignment is padded with gaps.
sequence_group.removeTerminalGaps()
sequence_group.padAlignment()
if partial_alignment:
# Here we will perform series of alignments for every
# discontinouous selected block.
max_length = sequence_group.findMaxLength()
start_col = -1
end_col = -1
for pos in range(max_length):
column_selected = sequence_group.isColumnSelected(pos)
if start_col < 0:
if column_selected:
start_col = pos
elif end_col < 0:
if not column_selected or (pos == max_length - 1):
end_col = pos
if start_col >= 0 and end_col >= 0:
with tempfile.NamedTemporaryFile(prefix="msv_",
suffix=".aln",
mode="w",
delete=False) as inp_file:
inp_file_name = inp_file.name
if inp_file is None:
return False
fileio.save_clustal_file(sequence_group,
None,
file=inp_file,
start=start_col,
end=end_col,
ss_constraints=ss_constraints,
ignore_selection=ignore_selection)
aln_file = inp_file_name[:]
aln_file, ext = os.path.splitext(aln_file)
aln_file += "_out.aln"
command = [
clustal_exec,
"-infile=%s" % inp_file_name,
"-outorder=input",
"-outfile=%s" % aln_file,
]
if quick_alignment:
command.append("-quicktree")
if run_job(command,
use_windows_shell=False,
quiet=True,
progress_dialog=progress_dialog) != JOB_STATUS_OK:
error_dialog("Error Running Job",
"Cannot execute multiple alignment job!")
return False
if not fileio.load_clustal_file(sequence_group,
aln_file,
replace=False,
start=start_col,
end=end_col):
error_dialog(
"Error Running Job",
"Cannot read multiple sequence alignment job results.")
return False
pos = end_col
start_col = -1
end_col = -1
else:
with tempfile.NamedTemporaryFile(prefix="msv_",
suffix=".aln",
mode="w",
delete=False) as inp_file:
inp_file_name = inp_file.name
if inp_file is None:
return False
fileio.save_clustal_file(sequence_group,
None,
file=inp_file,
ss_constraints=ss_constraints,
ignore_selection=ignore_selection)
aln_file = inp_file_name[:]
aln_file, ext = os.path.splitext(aln_file)
aln_file += "_out.aln"
command = [
clustal_exec,
"-infile=%s" % inp_file_name, "-outorder=input",
"-outfile=%s" % aln_file
]
if quick_alignment:
command.append("-quicktree")
if run_job(command,
use_windows_shell=False,
quiet=True,
progress_dialog=progress_dialog) != JOB_STATUS_OK:
error_dialog("Error Running Job",
"Cannot run multiple sequence alignment job.")
return False
if not fileio.load_clustal_file(sequence_group, aln_file,
replace=False):
error_dialog(
"Error Running Job",
"Cannot read multiple sequence alignment job results.")
return False
if not any_selected:
# Load NJ tree only if all sequences and residues were aligned.
dnd_file = inp_file_name[:]
dnd_file, ext = os.path.splitext(dnd_file)
dnd_file += ".dnd"
fileio.load_DND_tree(dnd_file, sequence_group)
sequence_group.unselectAll()
sequence_group.minimizeAlignment()
if viewer:
viewer.displayMessage(str(n_selected) + " sequences aligned.")
try:
if inp_file_name:
os.remove(inp_file_name)
if aln_file:
os.remove(aln_file)
if dnd_file:
os.remove(dnd_file)
except:
pass
return True
[docs]def run_pfam(sequence_group, progress_dialog=None, job_settings=""):
"""
Runs a Pfam job and incorporates the results.
:type sequence_group: `SequenceGroup`
:param sequence_group: target sequence group
"""
if checkJobIsRunning():
return False
schrodinger = getSchrodingerPath()
if not schrodinger:
return False
cwd = os.getcwd()
tmpdir = None
if job_settings and job_settings["temporary_dir"]:
# Create a temporary directory where the job will run and change
# current directory to temporary.
try:
tmpdir = tempfile.mkdtemp()
os.chdir(tmpdir)
except:
error_dialog("Cannot Run Job", "Cannot create temporary directory.")
return False
any_selected = sequence_group.hasSelectedSequences()
for seq in sequence_group.sequences:
if ((any_selected and seq.selected) or not any_selected) and \
seq.type == constants.SEQ_AMINO_ACIDS:
inp_file_name = "msv_pfam.fasta"
seq.selected = True
fileio.save_fasta_file(sequence_group,
inp_file_name,
target_sequence=seq)
base_file_name, ext = os.path.splitext(inp_file_name)
job_file_name = base_file_name + ".inp"
# Create the job file.
lines = [
"QUERY_FILE %s\n" % (inp_file_name), "FORMAT m2io\n"
]
job_file = open(job_file_name, 'w')
job_file.writelines(lines)
job_file.close()
# Run Pfam job.
job_command = schrodinger + os.sep + "pfam"
# Launch the job.
if run_job_via_jobcontrol([job_command, base_file_name],
base_file_name + ".log",
progress_dialog=progress_dialog,
settings=job_settings) < 0:
os.chdir(cwd)
return False
out_file_name = base_file_name + ".out"
# Incorporate the Pfam job.
try:
pfam_file = open(out_file_name, "r")
lines = pfam_file.readlines()
pfam_file.close()
except:
error_dialog(
"Cannot Run Job",
"Cannot run Pfam job. Did you install Prime thirdparty tools?"
)
os.chdir(cwd)
return False
pfam_string = ""
seq_idx = 0
field_idx = 0
level = 0
fields = []
field_dict = {}
for line in lines:
if "m_psp_seq" in line:
seq_idx += 1
continue
if seq_idx == 2:
if ":::" in line:
level += 1
continue
if level == 0:
fields.append(line.strip())
elif level == 1:
if field_idx < len(fields):
field_dict[fields[field_idx]] = line.strip()
field_idx += 1
elif level == 2:
codes = line.split()
code = codes[1].replace('\"', '')
if code == '':
code = ' '
pfam_string += str(code)
else:
break
if "s_psp_query_family_name" in field_dict:
pfam = Sequence()
pfam.name = "Pfam " + field_dict["s_psp_query_family_name"]
pfam.short_name = pfam.name
for pos in range(len(pfam_string)):
res = residue.Residue()
res.code = pfam_string[pos]
res.color = (255, 255, 255)
res.inverted = True
res.sequence = pfam
pfam.residues.append(res)
pfam.type = constants.SEQ_ANNOTATION
pfam.annotation_type = constants.ANNOTATION_PFAM
pfam.parent_sequence = seq
seq.children.append(pfam)
try:
os.chdir(cwd)
if job_settings:
keep_files = job_settings["keep_files"]
else:
keep_files = False
if tmpdir and not keep_files:
removeTmpJobDir(tmpdir)
except:
error_dialog("Cannot Remove Temporary Files",
"Cannot remove temporary job files.")
return False
return True
[docs]def removeTmpJobDir(tmpdir, keep=[]): # noqa: M511
"""
Empties the temporary job directory and removes it.
Be careful with this function.
:type keep: list of strings
:param keep: Optional list of files that should not be deleted.
"""
for dirpath, dirnames, filenames in os.walk(tmpdir):
# Remove individual files.
for name in filenames:
if name in keep:
continue
filename = os.path.join(dirpath, name)
os.remove(filename)
if keep:
continue
# Remove the empty directory.
# Make at least 3 attempts before reporting an error. Ev:99762
for attempts in range(0, 3):
try:
os.rmdir(dirpath)
except OSError as err:
# Check if its a permission denied error
if err.errno == errno.EACCES:
# Sleep for 1 second and retry
time.sleep(1)
continue
else:
# Not a permission denied error.
raise
else:
# No exception raised. Directory removal successful
break
else:
# No more attempts left. Rethrow the exception
raise
break
[docs]def run_ssp(sequence_group, progress_dialog=None, job_settings=""):
"""
Runs a secondary structure prediction job and incorporates the results.
:type sequence_group: `SequenceGroup`
:param sequence_group: target sequence group
"""
if checkJobIsRunning():
return False
schrodinger = getSchrodingerPath()
if not schrodinger:
return False
cwd = os.getcwd()
tmpdir = None
if job_settings and job_settings["temporary_dir"]:
# Create a temporary directory where the job will run and change
# current directory to temporary.
try:
tmpdir = tempfile.mkdtemp()
os.chdir(tmpdir)
except:
error_dialog("Cannot Run Job", "Cannot create temporary directory.")
return False
any_selected = sequence_group.hasSelectedSequences()
for seq in sequence_group.sequences:
if ((any_selected and seq.selected) or not any_selected) and \
seq.type == constants.SEQ_AMINO_ACIDS:
inp_file_name = "msv_ssp.fasta"
seq.selected = True
fileio.save_fasta_file(sequence_group,
inp_file_name,
target_sequence=seq,
skip_gaps=True)
base_file_name, ext = os.path.splitext(inp_file_name)
job_file_name = base_file_name + ".inp"
# Create the job file.
lines = [
"QUERY_FILE %s\n" % (inp_file_name), "METHOD all\n"
]
job_file = open(job_file_name, 'w')
job_file.writelines(lines)
job_file.close()
# Run SSP.
job_command = schrodinger + "ssp"
# Launch the job.
if run_job_via_jobcontrol([job_command, base_file_name],
base_file_name + ".log",
progress_dialog=progress_dialog,
settings=job_settings) < 0:
error_dialog(
"Job Failed",
"Secondary structure prediction job could not be completed."
)
os.chdir(cwd)
return False
out_file_name = base_file_name + ".out"
# Incorporate the SSP job.
try:
ssp_file = open(out_file_name, "r")
lines = ssp_file.readlines()
ssp_file.close()
except:
error_dialog("Cannot Incorporate Results",
"Cannot incorporate job results.")
os.chdir(cwd)
return False
read_sequence = False
read_prediction = False
prediction_name = None
ssp_string = ""
for line in lines:
if read_sequence:
if line.find(":::") >= 0:
read_sequence = False
read_prediction = False
ssp = Sequence()
ssp.name = prediction_name + " SSP"
ssp.short_name = prediction_name + " SSP"
for pos in range(len(ssp_string)):
res = residue.Residue()
res.code = ssp_string[pos]
res.sequence = ssp
if res.code == 'H':
res.color = (240, 96, 64)
res.inverted = True
elif res.code == 'E':
res.color = (128, 240, 240)
res.inverted = True
else:
res.code = '-'
res.color = (255, 255, 255)
res.inverted = True
ssp.residues.append(res)
ssp.type = constants.SEQ_ANNOTATION
ssp.annotation_type = constants.ANNOTATION_SSP
ssp.parent_sequence = seq
seq.children.append(ssp)
seq.propagateGapsToChildren(ssp)
prediction_name = None
ssp_string = ""
else:
code = line[10]
ssp_string += str(code)
if read_prediction:
if not prediction_name:
prediction_name = line[2:].replace('\"', '').replace(
'\n', '')
if line.find(":::") >= 0:
read_sequence = True
if line.find("PRED_NAME") >= 0:
read_prediction = True
try:
os.chdir(cwd)
if job_settings:
keep_files = job_settings["keep_files"]
else:
keep_files = False
if tmpdir and not keep_files:
removeTmpJobDir(tmpdir)
except:
error_dialog("Cannot Remove Temporary Files",
"Cannot remove temporary job files.")
return False
[docs]def incorporatePDB(sequence_group,
pdb_file_name,
chain_id=None,
maestro_include=False,
new_list=None,
viewer=None):
status = True
has_atoms = False
if maestro:
format = "pdb"
basename = os.path.basename(pdb_file_name)
name, ext = os.path.splitext(basename)
if ".gz" in pdb_file_name:
# Uncompress if necessary.
command = 'gzip -c -d "' + pdb_file_name + '" > ' + name
try:
os.system(command)
except:
status = False
pdb_file_name = name
if status:
new_pdb_file_name = pdb_file_name + ".chain.pdb"
try:
pdb_file = open(pdb_file_name, "r")
except:
status = False
if status:
try:
new_pdb_file = open(new_pdb_file_name, "w")
except:
pdb_file.close()
status = False
if status:
lines = pdb_file.readlines()
new_lines = []
ignore = False
for line in lines:
if line.startswith("MODEL"):
model_num = int(line[5:14])
if model_num > 1:
# Incorporate only the first model.
ignore = True
if ignore and line.startswith("ENDMDL"):
ignore = False
continue
if ignore:
continue
if (line.startswith("ATOM") or line.startswith("HETATM")):
if (chain_id is None or line[21] == chain_id):
new_lines.append(line)
has_atoms = True
else:
new_lines.append(line)
pdb_file.close()
new_pdb_file.writelines(new_lines)
new_pdb_file.close()
if status and has_atoms:
if viewer:
tmp_status = viewer.auto_synchronize
viewer.auto_synchronize = False
current_entries = maestro_helpers.maestroGetListOfEntryIDs()
wsreplace = maestro.get_command_option("entryimport", "wsreplace")
wsinclude = maestro.get_command_option("entryimport", "wsinclude")
if maestro_include:
maestro.command("entryimport wsreplace=false format=" + format +
" \"" + new_pdb_file_name + "\"")
else:
maestro.command("entryimport wsreplace=false" +
" wsinclude=none format=" + format + " \"" +
new_pdb_file_name + "\"")
# Revert to original Maestro command settings.
maestro.command("entryimport" + " wsreplace=" + wsreplace +
" wsinclude=" + wsinclude)
if viewer:
viewer.auto_synchronize = True
maestro_helpers.maestroIncorporateEntries(
sequence_group,
what="all",
ignore=current_entries,
include=maestro_include,
align_func=pdb_align_structure_to_sequence)
if viewer:
viewer.auto_synchronize = tmp_status
if new_list is not None:
for seq in sequence_group.sequences:
if seq.maestro_entry_id and seq.maestro_entry_id not \
in current_entries:
new_list.append(seq)
if new_pdb_file and new_pdb_file_name:
os.remove(new_pdb_file_name)
return status
return False
[docs]def fetch_pdb(sequence_group,
entry_id,
maestro_include=False,
maestro_incorporate=True,
progress_dialog=None,
progress=None,
remote_query_dialog=None,
viewer=None):
"""
Fetches a PDB file from from a thirdparty directory or from
online repository if thirdpary is not available.
:type sequence_group: `SequenceGroup`
:param sequence_group: target sequence group.
:type entry_id: str
:param entry_id: PDB entry ID (4- or 5-letter code)
:rtype: string
:return: Return status, can be "ok", "error", "cancelled" or "invalid"
"""
if not entry_id or len(entry_id) < 4:
error_dialog("Error", "Invalid PDB entry ID.")
return "invalid"
entry_id = entry_id.replace('_', "")
pdb_id = entry_id[:4].lower()
chain_id = None
if len(entry_id) == 5:
chain_id = entry_id[4].upper()
pdb_file_name = None
# Test $SCHRODINGER location
if "SCHRODINGER" in os.environ:
schrodinger = os.getenv("SCHRODINGER")
# Test "divided" location.
pdb_file_name = os.path.join(schrodinger, "thirdparty", "database",
"pdb", "data", "structures", "divided",
"pdb", pdb_id[1:3],
"pdb" + pdb_id[:4] + ".ent")
if not os.path.isfile(pdb_file_name):
# Test compressed file
pdb_file_name += ".gz"
if not os.path.isfile(pdb_file_name):
pdb_file_name = None
# Test $SCHRODINGER_PDB location
if pdb_file_name is None:
if "SCHRODINGER_PDB" in os.environ:
schrodinger_pdb = os.getenv("SCHRODINGER_PDB")
pdb_file_name = os.path.join(schrodinger_pdb, "data", "structures",
"divided", "pdb", pdb_id[1:3],
"pdb" + pdb_id[:4] + ".ent")
if not os.path.isfile(pdb_file_name):
# Test compressed file
pdb_file_name += ".gz"
if not os.path.isfile(pdb_file_name):
pdb_file_name = None
# Test $SCHRODINGER_THIRDPARTY location
if pdb_file_name is None:
if "SCHRODINGER_THIRDPARTY" in os.environ:
schrodinger_third = os.getenv("SCHRODINGER_THIRDPARTY")
pdb_file_name = os.path.join(schrodinger_third, "database", "pdb",
"data", "structures", "divided", "pdb",
pdb_id[1:3],
"pdb" + pdb_id[:4] + ".ent")
if not os.path.isfile(pdb_file_name):
# Test compressed file
pdb_file_name += ".gz"
if not os.path.isfile(pdb_file_name):
pdb_file_name = None
status = "ok"
if pdb_file_name:
if maestro:
if not incorporatePDB(sequence_group,
pdb_file_name,
maestro_include=maestro_include,
chain_id=chain_id,
viewer=viewer):
status = "error"
# Try to load the PDB file
elif not fileio.load_PDB_file(
sequence_group,
pdb_file_name,
requested_chain_id=chain_id,
given_pdb_id=pdb_id,
align_func=pdb_align_structure_to_sequence):
status = "error"
return status
if remote_query_dialog and remote_query_dialog.query() != True:
return "cancelled"
if progress_dialog:
progress_dialog.updateProgress(progress=progress)
progress_dialog.show()
progress_dialog.raise_()
progress_dialog.setButtonText("Cancel")
progress_dialog.setText("Fetching PDB structure " + pdb_id +
" from http://www.rcsb.org...\n")
progress_dialog.refresh()
tmp_dir = tempfile.mkdtemp() + "/"
with fileutils.chdir(tmp_dir):
pdb_file_name = SeqDownloader.downloadPDB(pdb_id, remote_ok=True)
status = "ok"
if progress_dialog:
progress_dialog.appendText("Incorporating PDB file...\n")
if maestro and maestro_incorporate:
if not incorporatePDB(sequence_group,
pdb_file_name,
chain_id=chain_id,
maestro_include=maestro_include,
viewer=viewer):
status = "error"
else:
if not fileio.load_PDB_file(
sequence_group,
pdb_file_name,
requested_chain_id=chain_id,
align_func=pdb_align_structure_to_sequence):
status = "error"
if progress_dialog:
progress_dialog.appendText("Done.")
progress_dialog.refresh()
if progress is None:
progress_dialog.endJob()
return status
# http://www.ncbi.nlm.nih.gov/sviewer/viewer.fcgi?tool=portal&db=protein&val=12
# 9295&dopt=fasta&sendto=on&log$=seqview
[docs]def fetch_entrez(sequence_group,
entry_id,
progress_dialog=None,
remote_query_dialog=None):
"""
Fetches a protein sequence entry from Entrez repository.
:type sequence_group: `SequenceGroup`
:param sequence_group: target sequence group.
:type entry_id: str
:param entry_id: Entrez entry ID
:rtype: bool
:return: True on success, False otherwise.
"""
if remote_query_dialog and \
remote_query_dialog.query() != True:
return False
if progress_dialog:
progress_dialog.show()
progress_dialog.raise_()
progress_dialog.setText("Fetching a protein sequence " + entry_id +
" from " + BLAST_SERVER_URL_BASE + "\n")
tmp_dir = tempfile.mkdtemp() + "/"
with fileutils.chdir(tmp_dir):
try:
entrez_file_path = SeqDownloader.downloadEntrezSeq(entry_id)
fileio.load_fasta_file(sequence_group, entrez_file_path)
except GetSequencesException:
pass
if progress_dialog:
progress_dialog.appendText("Done.")
progress_dialog.hide()
[docs]def blast_get_default_settings():
"""
Returns a list of default BLAST settings.
"""
settings = {}
settings["database"] = "nr"
settings["matrix"] = "BLOSUM62"
settings["filter"] = "false"
settings["ehits"] = False
settings["evalue"] = 0.01
settings["word_size"] = 3
settings["progname"] = "blastp"
settings["i_threshold"] = 0.05
settings["n_iter"] = 1
settings["gap_open"] = 11
settings["gap_extend"] = 1
settings["taxid"] = ""
settings["remotely"] = None
return settings
[docs]@contextmanager
def fix_blast_search(remotely):
"""
Temporarily changes preferences for "ALLOW_WEB_BLAST" to the user's
preference when running a BLAST search, then resets back to the original
preference when the search is over.
:param remotely: Whether the user has requested a remote search or None to
leave global preference unchanged
:type remotely: bool or NoneType
"""
if remotely is None:
yield
return
elif not isinstance(remotely, bool):
raise TypeError("Remotely must be a bool or None, got "
f"{type(remotely)} ({remotely})")
try:
pref_obj = preferences.Preferences(preferences.SHARED)
orig_blast_pref = pref_obj.get("ALLOW_WEB_BLAST", False)
pref_obj.set("ALLOW_WEB_BLAST", remotely)
yield
finally:
pref_obj.set("ALLOW_WEB_BLAST", orig_blast_pref)
[docs]def blast_run(sequence_group,
settings=None,
progress_dialog=None,
results_dialog=None,
quiet=False,
remote_query_dialog=None,
job_settings=""):
"""
Runs a Blast search. Appends the results to the sequence group.
:type sequence_group: `SequenceGroup`
:param sequence_group: target sequence group.
:type nodialog: bool
:param nodialog: Do not display settings dialog.
:rtype: string
:return: status, can be "ok", "failed", or "cancelled"
"""
if checkJobIsRunning():
return "jobrunning"
any_selected = sequence_group.hasSelectedSequences()
# Make sure we have sequences present
found = False
for seq in sequence_group.sequences:
if seq.type == constants.SEQ_AMINO_ACIDS and \
(not any_selected or seq.selected):
found = True
break
if not found:
return "nosequence"
database = settings["database"]
matrix = settings["matrix"]
blast_filter = settings["filter"]
ehits = settings["ehits"]
expect = settings["evalue"]
word_size = settings["word_size"]
progname = settings["progname"]
i_threshold = settings["i_threshold"]
n_iter = settings["n_iter"]
gap_open = settings["gap_open"]
gap_extend = settings["gap_extend"]
remotely = settings["remotely"]
if results_dialog:
results_dialog.remote_search = False
if remotely and remote_query_dialog and not remote_query_dialog.query():
return "cancelled"
else:
if results_dialog:
results_dialog.remote_search = True
schrodinger = getSchrodingerPath()
if not schrodinger:
error_dialog("Cannot Run Job", "Cannot run BLAST search job.")
return "failed"
if not mm:
error_dialog("Cannot Run Job", "Cannot run BLAST search job.")
return "failed"
cwd = os.getcwd()
tmpdir = None
if job_settings and job_settings["temporary_dir"]:
# Create a temporary directory where the job will run and change
# current directory to temporary.
try:
tmpdir = tempfile.mkdtemp()
os.chdir(tmpdir)
except:
error_dialog("Cannot Run Job", "Cannot create temporary directory.")
return False
for seq in sequence_group.sequences:
if seq.type == constants.SEQ_AMINO_ACIDS and \
(not any_selected or seq.selected):
inp_file = tempfile.NamedTemporaryFile(prefix="msv_",
suffix=".fasta",
mode="w",
delete=False)
# TODO: Add an option for custom job name, and use that as
# the base of this file:
inp_file_name = "blast.seq"
fileio.save_fasta_file(sequence_group,
inp_file_name,
target_sequence=seq)
base_file_name, ext = os.path.splitext(inp_file_name)
job_file_name = base_file_name + ".inp"
if database[:3] != "pdb":
subset = "all"
else:
subset = "pdb"
lines = [
"PROGNAME %s\n" % (progname),
"DATABASE %s\n" % (database),
"SHOW %s\n" % (subset),
"QUERY_FILE %s\n" % (inp_file_name),
"FILTER %s\n" % (blast_filter),
"MATRIX %s\n" % (matrix),
"EXPECT %f\n" % (expect),
"WORD_SIZE %d\n" % (word_size),
"GAP_OPEN_COST %d\n" % (gap_open),
"GAP_EXTEND_COST %d\n" % (gap_extend)
]
if progname == "blastpgp":
lines.append("E_VALUE_THRESHOLD %f\n" % (i_threshold))
lines.append("MAX_ROUNDS %d\n" % (n_iter))
if ehits:
lines.append("EXPAND_HITS true\n")
job_file = open(job_file_name, "w")
job_file.writelines(lines)
job_file.close()
# Run BLAST
job_command = schrodinger + os.sep + "blast"
status = "ok"
cmd = [job_command, base_file_name]
if NUM_HITS_ARG in settings:
cmd.extend([NUM_HITS_ARG, str(settings[NUM_HITS_ARG])])
with fix_blast_search(remotely):
status = run_job_via_jobcontrol(cmd,
base_file_name + ".log",
quiet=quiet,
progress_dialog=progress_dialog,
monitor_job=False,
settings=job_settings)
if status == JOB_STATUS_OK:
output_file_name = base_file_name + ".out"
if not os.path.isfile(output_file_name):
os.chdir(cwd)
if not quiet:
error_dialog(
"Job Failed",
"BLAST search did not produce any results.")
return "failed"
# Thanks, Quentin, for the following block.
mm.mmseqio_initialize(mm.error_handler)
mm.mmseq_initialize(mm.error_handler)
mm.mmalign_initialize(mm.error_handler)
try:
handle = mm.mmseqio_open_file(output_file_name,
mm.MMSEQIO_READ,
mm.MMSEQIO_ANY)
except:
handle = -1
status = "failed"
if handle >= 0:
seqs = []
scores = []
evalues = []
identities = []
positives = []
gaps = []
short_names = []
names = []
accession = []
lengths = []
query_seqs = []
if results_dialog:
results_dialog.setSequenceGroup(sequence_group)
cnt = 1
while True:
try:
align = mm.mmseqio_read_alignment(handle, cnt)
cnt += 1
query = mm.mmalign_get_query_sequence(align)
query_seqs.append(mm.mmseq_get_all_codes(query))
seq = mm.mmalign_get_aligned_sequence(align, 1)
seqs.append(mm.mmseq_get_all_codes(seq))
short_names.append(mm.mmseq_get_name(seq))
names.append(mm.mmseq_get_description(seq))
accession.append(mm.mmseq_get_accession_number(seq))
scores.append(mm.mmalign_get_score(align, 1))
evalues.append(mm.mmalign_get_evalue(align, 1))
identities.append(
mm.mmalign_get_percent_identity(align, 1))
positives.append(
mm.mmalign_get_percent_positive(align, 1))
gaps.append(mm.mmalign_get_percent_gaps(align, 1))
lengths.append(len(seqs[-1]))
mm.mmalign_delete(align)
except mm.MmException as e:
break
mm.mmseq_terminate()
mm.mmseqio_terminate()
mm.mmalign_terminate()
# Create a popup dialog to control the number of blast
# results to load.
start_count = len(seqs)
if start_count == 0:
os.chdir(cwd)
warning_dialog("No sequences found",
"BLAST search returned 0 results.")
status = "failed"
break
if results_dialog:
results_dialog.resultsTable.setSortingEnabled(False)
results_dialog.clearContents()
for index in range(len(seqs)):
query_sequence = Sequence()
query_sequence.appendResidues(query_seqs[index])
seq = Sequence()
description = names[index]
if len(description) > 100:
description = description[:100] + "..."
seq.name = accession[index] + "\n" + description
seq.short_name = short_names[index]
seq.makeShortName(name=seq.short_name)
seq.appendResidues(seqs[index])
id = 100.0 * seq.calcIdentity(query_sequence, True,
False)
sim = 100.0 * seq.calcSimilarity(
query_sequence, True, False)
hom = 100.0 * seq.calcHomology(query_sequence, True,
False)
if results_dialog:
results_dialog.addRow(
seq,
name=names[index],
pdb_id=short_names[index],
score=scores[index],
coverage=gaps[index],
evalue=evalues[index],
length=lengths[index],
identity=id,
similarity=sim,
homology=hom,
tooltip=make_blast_table_tooltip(
names[index], query_sequence, seq))
else:
sequence_group.sequences.append(seq)
if results_dialog:
results_dialog.resultsTable.setSortingEnabled(True)
results_dialog.resultsTable.sortItems(
1, QtCore.Qt.AscendingOrder)
status = "ok"
else:
status = "cancelled"
break
try:
os.chdir(cwd)
if job_settings:
keep_files = job_settings["keep_files"]
else:
keep_files = False
if tmpdir and not keep_files:
removeTmpJobDir(tmpdir)
except:
pass
return status
[docs]def run_align_structures(sequence_group,
progress_dialog=None,
viewer=None,
job_settings=""):
"""
This function aligns all or selected structures in current sequence
group using external "ska" program. The alignment is then incorporated
into MSV and structures are reoriented in Maestro workspace
according to the alignment.
"""
schrodinger = getSchrodingerPath()
if not schrodinger:
return False
any_selected = sequence_group.hasSelectedSequences(exclude_reference=True)
# Build a list of structures to align.
structure_list = []
for seq in sequence_group.sequences:
if seq.type == constants.SEQ_AMINO_ACIDS and \
(seq.selected or not any_selected):
if seq.has_structure:
if seq.from_maestro:
if not maestro_helpers.maestroExtractStructureData(seq):
continue
structure_list.append(seq)
if len(structure_list) == 0:
error_dialog("Cannot Align Structures", "No valid structures found.")
return False
if len(structure_list) == 1:
error_dialog(
"Cannot Align Structures",
"More than one structure is required for structure alignment.")
return False
cwd = os.getcwd()
tmpdir = None
if job_settings and job_settings["temporary_dir"]:
# Create a temporary directory where the job will run and change
# current directory to temporary.
try:
tmpdir = tempfile.mkdtemp()
os.chdir(tmpdir)
except:
error_dialog("Cannot Run Job", "Cannot create temporary directory.")
return False
input_file_lines = []
input_file_lines.append("MODE align\n")
input_file_lines.append("ORDER seed\n")
for index, structure in enumerate(structure_list):
chain_id = structure.chain_id
if chain_id == ' ':
chain_id = '_'
structure_name = "structure%d_%c" % (index + 1, chain_id)
if index == 0:
input_file_lines.append("QUERY_FILE " + structure_name + ".pdb " +
str(index + 1) + "\n")
else:
input_file_lines.append("TEMPLATE " + structure_name + ".pdb " +
str(index + 1) + "\n")
structure_file_name = structure_name + ".pdb"
structure_file = open(structure_file_name, "w")
for res in structure.residues:
structure_file.writelines(res.structure)
structure_file.close()
input_file_lines.append("GAP_OPEN 2\n")
input_file_lines.append("GAP_DEL 1\n")
input_file_lines.append("SSE_WINLEN 5\n")
input_file_lines.append("SSE_MINSIM 1\n")
input_file_lines.append("SSE_MINLEN 2\n")
input_file_lines.append("ALISTRUCS_OUTFILE structalign-out.pdb\n")
inp_file_prefix = "structalign"
inp_file_name = inp_file_prefix + ".inp"
inp_file = open(inp_file_name, 'w')
inp_file.writelines(input_file_lines)
inp_file.close()
# Run SKA.
job_command = schrodinger + "ska"
# Launch the job.
result = run_job_via_jobcontrol([job_command, inp_file_prefix],
inp_file_prefix + ".log",
dialog_title="Protein Structure Alignment",
progress_dialog=progress_dialog,
settings=job_settings)
if result == -1:
os.chdir(cwd)
error_dialog("Job Failed",
"Could not execute the structure alignment job.")
return False
try:
out_file = open(inp_file_prefix + ".out")
lines = out_file.readlines()
out_file.close()
except:
os.chdir(cwd)
error_dialog("Job Failed",
"Could not read structure alignment output file.")
return False
for structure in structure_list:
structure._tmp_sequence = ""
# Analyze SKA output file
sse = False
seq = False
for line in lines:
line = line.strip()
if "sse:" in line:
sse = True
elif sse:
sse = False
seq = True
if len(line) < 3:
seq = False
if seq:
# This is the actual alignment line
# Extract the template index
idx, sep, line = line.partition(" ")
if line and len(line) > 0:
line = line.strip()
idx = int(idx)
pos, sep, line = line.partition(" ")
if line and len(line) > 0:
line = line.strip()
# We have actual alignment line in "line" now
idx -= 1
if idx >= 0 and idx < len(structure_list):
structure_list[idx]._tmp_sequence += line
for structure in structure_list:
structure.replaceSequence(structure._tmp_sequence)
structure._tmp_sequence = None
if maestro and viewer:
maestro_helpers.maestroSuperposition(viewer, show_panel=False)
viewer.deselectAll()
maestro.command("fit all")
viewer.contents_changed = True
viewer.maestroProjectChanged()
try:
os.chdir(cwd)
if job_settings:
keep_files = job_settings["keep_files"]
else:
keep_files = False
if tmpdir and not keep_files:
removeTmpJobDir(tmpdir)
except:
error_dialog("Cannot Remove Temporary Files",
"Cannot remove temporary job files.")
return False
[docs]def pdb_align_structure_to_sequence(sequence, structure, ssa=None):
"""
Aligns PDB structure to SEQRES sequence.
"""
if sequence.gaplessText() == structure.gaplessText():
sequence.residues = structure.residues
for res in sequence.residues:
res.sequence = sequence
for child in sequence.children:
for res in child.residues:
res.parent_sequence = sequence
return
test_sequence = sequence.copyForUndo()
for index, res in enumerate(test_sequence.residues):
res.tmp_index = index
length = len(test_sequence.residues)
new_children = []
if structure.children:
for child in structure.children:
new_child = child.copyForUndo()
new_child.residues = [
residue.Residue(sequence=new_child) for index in range(length)
]
new_children.append(new_child)
tmp_group = sequence_group.SequenceGroup()
structure_selected = structure.selected
test_sequence.selected = False
structure.selected = False
test_sequence.type = constants.SEQ_AMINO_ACIDS
tmp_group.sequences.append(test_sequence)
structure.type = constants.SEQ_AMINO_ACIDS
tmp_group.sequences.append(structure)
run_clustal(None, tmp_group)
structure.selected = structure_selected
for index, res in enumerate(test_sequence.residues):
if not structure.residues[index].is_gap and \
not res.is_gap:
sequence.residues[res.tmp_index] = structure.residues[index]
sequence.residues[res.tmp_index].structureless = False
sequence.residues[res.tmp_index].sequence = sequence
sequence.selected = structure.selected
if structure.children:
for child_index, child in enumerate(new_children):
child.residues[res.tmp_index] = structure.children[
child_index].residues[index]
child.residues[res.tmp_index].sequence = child
child.residues[res.tmp_index].parent_sequence = sequence
# Assign residue numbers
first = last = None
previous = None
for res in sequence.residues:
if res.structureless:
if not first:
first = res
last = res
else:
if first and last:
if previous:
# Assign numbers between previous and current
num1 = previous.num
num2 = res.num
icode = ord('A')
for idx in range(sequence.residues.index(first),
sequence.residues.index(last) + 1):
if num1 < num2:
num1 += 1
sequence.residues[idx].num = num1
else:
sequence.residues[idx].icode = chr(icode)
icode += 1
sequence.residues[idx].num = num2
else:
num2 = res.num
for idx in range(sequence.residues.index(last),
sequence.residues.index(first) - 1, -1):
num2 -= 1
sequence.residues[idx].num = num2
first = last = None
previous = res
if first and last:
num1 = previous.num
for idx in range(sequence.residues.index(first),
sequence.residues.index(last) + 1):
num1 += 1
sequence.residues[idx].num = num1
structure.residues = sequence.residues
for index, child in enumerate(new_children):
structure.children[index].residues = child.residues
return