Source code for schrodinger.ui.sequencealignment.predictors

# Copyright Schrodinger, LLC. All rights reserved.

import copy
import glob
import os
import subprocess

from . import constants
from .jobs import blast_get_default_settings
from .jobs import blast_run
from .sequence import Sequence
from .sequence_group import SequenceGroup

predictors_initialized = False


[docs]def init_predictors(): global predictors_initialized if predictors_initialized: return os_cpu = os.getenv("OS_CPU") if not os_cpu: mmshare_exec = os.getenv("MMSHARE_EXEC") if mmshare_exec: first, os_cpu = os.path.split(mmshare_exec) schrodinger = os.getenv("SCHRODINGER") if not os_cpu or not schrodinger: return False # Get Prime library path pattern = os.path.join(schrodinger, 'psp-v*', 'lib', os_cpu) path_list = glob.glob(pattern) if not path_list: return False if os.name == 'nt' or os_cpu.startswith("Darwin"): lib_path = os.getenv("PATH") lib_path += ':' + path_list[0] os.environ["PATH"] = lib_path else: lib_path = os.getenv("LD_LIBRARY_PATH") lib_path += ':' + path_list[0] os.environ["LD_LIBRARY_PATH"] = lib_path predictors_initialized = True return True
[docs]def get_predictor_path(predictor_name): os_cpu = os.getenv("OS_CPU") if not os_cpu: mmshare_exec = os.getenv("MMSHARE_EXEC") if mmshare_exec: first, os_cpu = os.path.split(mmshare_exec) schrodinger = os.getenv("SCHRODINGER") if not os_cpu or not schrodinger: return None pattern = os.path.join(schrodinger, 'psp-v*', 'bin', os_cpu, predictor_name) if os.name == "nt": pattern += ".exe" path_list = glob.glob(pattern) if not path_list: return None return path_list[0]
[docs]def get_predictor_data_path(predictor_name): os_cpu = os.getenv("OS_CPU") if not os_cpu: mmshare_exec = os.getenv("MMSHARE_EXEC") if mmshare_exec: first, os_cpu = os.path.split(mmshare_exec) schrodinger = os.getenv("SCHRODINGER") if not os_cpu or not schrodinger: return None pattern = os.path.join(schrodinger, 'psp-v*', 'data', 'predictors', predictor_name) path_list = glob.glob(pattern) if not path_list: return None return path_list[0]
[docs]def make_model_def_file(model_data_dir, model_file_name, predictor): model_list = [] for model in glob.glob(os.path.join(model_data_dir, "*.model")): model_list.append(model + " \n") if not model_list: return -1 try: model_file = open(model_file_name, "w") except: return -2 if predictor == "sspro": model_file.write(str(len(model_list)) + " 3\n") elif predictor == "accpro": model_file.write(str(len(model_list)) + " 20\n") elif predictor == "dispro": model_file.write(str(len(model_list)) + " 2\n") elif predictor == "dompro": model_file.write(str(len(model_list)) + " 2\n") elif predictor == "dipro": model_file.write(str(len(model_list)) + " 0.5\n") elif predictor == "betapro": model_file.write(str(len(model_list)) + "\n") model_list.sort() try: model_file.writelines(model_list) model_file.close() except: return -3 return 0
[docs]def generate_flat_alignment(sequence, alignment_file_name, progress_dialog=None, remote_query_dialog=None): """ Runs a BLAST job on nr database in order to generate a flat alignment file required by the predictors. In case of failure it just writes the input sequence to the alignment file. """ group = SequenceGroup() group.sequences.append(sequence) group.reference = sequence settings = blast_get_default_settings() # We can use PDB, but it should be a user-controllable setting # Will implement for 2012 settings["database"] = "nr" result = blast_run(group, settings, progress_dialog=progress_dialog, quiet=True, remote_query_dialog=remote_query_dialog) if result != "ok": settings["remotely"] = True result = blast_run(group, settings, progress_dialog=progress_dialog, remote_query_dialog=remote_query_dialog) if result == "ok": group.minimizeAlignment(query_only=True) sequences = [] for sequence in group.sequences: if sequence.isValidProtein(): sequences.append(sequence.text()) if not sequences: return False try: alignment_file = open(alignment_file_name, "w") except: return False alignment_file.write(str(len(sequences)) + "\n") for seq in sequences: seq = seq.replace('~', '.') alignment_file.write(seq + "\n") alignment_file.close() return True else: return False
[docs]def write_input_file(predictor, alignment_name, model_name, sequence, pred_ss=None, pred_acc=None): """ Writes an input file for a specific predictor. """ input_file_name = predictor + ".inp" try: input_file = open(input_file_name, "w") except: return None if predictor == "sspro": input_file.write("1 20 3\n") input_file.write(alignment_name + "\n") input_file.write(sequence + "\n") fake_ssa = "".join('H' for c in sequence) input_file.write(fake_ssa + "\n") elif predictor == "accpro": input_file.write("1 20 3\n") input_file.write(alignment_name + "\n") input_file.write(sequence + "\n") elif predictor == "dispro": input_file.write("1 25 2\n") input_file.write(alignment_name + " " + str(len(sequence)) + "\n") input_file.write(sequence + "\n") if pred_ss and len(pred_ss) == len(sequence): input_file.write(pred_ss + "\n") else: fake_ssa = "".join('H' for c in sequence) input_file.write(fake_ssa + "\n") if pred_acc and len(pred_acc) == len(sequence): input_file.write(pred_acc + "\n") else: fake_acc = "".join('e' for c in sequence) input_file.write(fake_acc + "\n") fake_pred = "".join('N' for c in sequence) input_file.write(fake_pred + "\n") elif predictor == "dompro": input_file.write("1 25 3\n") input_file.write(alignment_name + " " + str(len(sequence)) + "\n") input_file.write(sequence + "\n") if pred_ss and len(pred_ss) == len(sequence): input_file.write(pred_ss + "\n") else: fake_ssa = "".join('H' for c in sequence) input_file.write(fake_ssa + "\n") if pred_acc and len(pred_acc) == len(sequence): input_file.write(pred_acc + "\n") else: fake_acc = "".join('e' for c in sequence) input_file.write(fake_acc + "\n") input_file.write("1\n") fake_pred = "".join('N' for c in sequence) input_file.write(fake_pred + "\n") elif predictor == "dipro": input_file.write("1\n") input_file.write(alignment_name + "\n" + str(len(sequence)) + "\n") input_file.write(sequence + "\n") if pred_ss and len(pred_ss) == len(sequence): input_file.write(pred_ss + "\n") else: fake_ssa = "".join('H' for c in sequence) input_file.write(fake_ssa + "\n") if pred_acc and len(pred_acc) == len(sequence): pred_acc = pred_acc.replace('b', '-') input_file.write(pred_acc + "\n") else: fake_acc = "".join('e' for c in sequence) input_file.write(fake_acc + "\n") elif predictor == "betapro": input_file.write("1 20 3\n") input_file.write(alignment_name + "\n") input_file.write(str(len(sequence)) + "\n") input_file.write(sequence + "\n") if pred_ss and len(pred_ss) == len(sequence): input_file.write(pred_ss + "\n") else: fake_ssa = "".join('H' for c in sequence) input_file.write(fake_ssa + "\n") input_file.close() return input_file_name
[docs]def has_predictor(predictor): predictor_exe = predictor # Proper SSpro executable is "sspro4", actually. if predictor == "sspro": predictor_exe += '4' # Get predictor executable and data paths. exe_path = get_predictor_path(predictor_exe) data_path = get_predictor_data_path(predictor) if exe_path and data_path: return True return False
[docs]def run(sequence, predictor_list, progress_dialog=None, remote_query_dialog=None): if not sequence: return "failed" init_predictors() sorted_predictor_list = [] sequence_string = sequence.gaplessText() if not sequence_string: return "failed" c_count = sequence_string.count('C') if len(predictor_list) == 1 and \ predictor_list[0] == "dipro" and c_count < 2: return "cancelled" # We have to run these two predictors anyway, these are # required by the others, unless we are calling sspro # or accpro, which can be run on theirs own. if len(predictor_list) > 1 or "accpro" not in predictor_list: sorted_predictor_list.append("sspro") if len(predictor_list) > 1 or "sspro" not in predictor_list: sorted_predictor_list.append("accpro") if "dompro" in predictor_list: sorted_predictor_list.append("dompro") if "dipro" in predictor_list: sorted_predictor_list.append("dipro") if "dispro" in predictor_list: sorted_predictor_list.append("dispro") if "betapro" in predictor_list: sorted_predictor_list.append("betapro") results = {} alignment_file_name = "predictor_alignment" pred_ss = pred_acc = None has_alignment = False # List of temporary job files tmp_files = [] for predictor in sorted_predictor_list: # Ignore S-S prediction if not enough cysteines present if predictor == "dipro" and c_count < 2: continue predictor_exe = predictor # Proper SSpro executable is "sspro4", actually. if predictor == "sspro": predictor_exe += '4' # Get predictor executable and data paths. exe_path = get_predictor_path(predictor_exe) data_path = get_predictor_data_path(predictor) if not exe_path or not data_path: continue # Prepare a model file. model_file_name = predictor + "_model.def" result = make_model_def_file(data_path, model_file_name, predictor) if result != 0: continue tmp_files.append(model_file_name) # Write input file input_file_name = write_input_file(predictor, alignment_file_name, model_file_name, sequence_string, pred_ss=pred_ss, pred_acc=pred_acc) if not input_file_name: continue tmp_files.append(input_file_name) cmd = [] if predictor == "betapro": cmd += [ exe_path, model_file_name, input_file_name, alignment_file_name ] elif predictor == "dipro": cmd += [ exe_path, model_file_name, input_file_name, alignment_file_name, '2' ] else: cmd += [exe_path, model_file_name, input_file_name, "." + os.sep] if predictor == "sspro": cmd.append("0") elif predictor == "accpro": cmd += ["2", "5"] elif predictor in ("dispro", "dompro"): pass if not has_alignment: sequence_copy = copy.deepcopy(sequence) has_alignment = generate_flat_alignment( sequence_copy, alignment_file_name, progress_dialog=progress_dialog, remote_query_dialog=remote_query_dialog) if has_alignment: tmp_files.append(alignment_file_name) if not has_alignment: return "cancelled" try: with open(predictor + ".out", 'w') as f: subprocess.call(cmd, stdout=f) except: continue try: prediction_file = open(predictor + ".out") except: continue result = prediction_file.readlines() prediction_file.close() tmp_files.append(predictor + ".out") if result: results[predictor] = result else: continue if predictor == "sspro": pred_ss = results[predictor][1].strip() elif predictor == "accpro": pred_acc = results[predictor][0].strip() final_sequences = [] if "sspro" in predictor_list and "sspro" in results: pred = results["sspro"][1].strip() seq = Sequence() seq.type = constants.SEQ_ANNOTATION seq.annotation_type = constants.ANNOTATION_SSP seq.appendResidues(pred) seq.name = "Secondary Structure Prediction" seq.short_name = "SSP" for res in seq.residues: if res.code == 'H': res.color = (240, 96, 64) res.inverted = True elif res.code == 'E': res.color = (128, 240, 240) res.inverted = True else: res.code = '-' res.color = (255, 255, 255) res.inverted = True final_sequences.append(seq) if "accpro" in predictor_list and "accpro" in results: pred = results["accpro"][0].strip() seq = Sequence() seq.type = constants.SEQ_ANNOTATION seq.annotation_type = constants.ANNOTATION_ACC seq.appendResidues(pred) seq.name = "Solvent Accessibility Prediction" seq.short_name = "ACC" for res in seq.residues: if res.code == 'B': res.code = 'b' res.color = (64, 127, 191) res.inverted = True elif res.code == 'E': res.code = 'e' res.color = (255, 255, 64) res.inverted = True final_sequences.append(seq) if "dompro" in results: pred = results["dompro"][1].strip() seq = Sequence() seq.type = constants.SEQ_ANNOTATION seq.annotation_type = constants.ANNOTATION_DOM seq.appendResidues(pred) seq.name = "Domain Arrangement Prediction" seq.short_name = "DOM" final_sequences.append(seq) for res in seq.residues: if res.code == 'N': res.color = (127, 127, 127) res.inverted = True else: res.color = (255, 0, 0) res.inverted = True res.code = ' ' if "dispro" in results: pred = results["dispro"][1].strip() values = results["dispro"][2].strip().split(' ') seq = Sequence() seq.type = constants.SEQ_ANNOTATION seq.annotation_type = constants.ANNOTATION_DIS seq.appendResidues(pred) seq.name = "Disordered Regions Prediction" seq.short_name = "DIS" for index, res in enumerate(seq.residues): val = float(values[index]) res.value = val if res.code == 'N': res.color = (191, 191, 191) res.inverted = True else: if val > 0.9: res.color = (255, 0, 0) else: res.color = (255, 127, 0) res.inverted = True res.code = ' ' final_sequences.append(seq) if "dipro" in results: read_bonds = False bond_list = [] for line in results["dipro"]: if line.startswith("Bond_Index"): read_bonds = True continue if read_bonds: bonds = line.split() if len(bonds) >= 3: bond_list.append((int(bonds[1]) - 1, int(bonds[2]) - 1)) if bond_list: ss_str = "".join('A' for c in pred_ss) seq = Sequence() seq.type = constants.SEQ_ANNOTATION seq.annotation_type = constants.ANNOTATION_CCB seq.appendResidues(ss_str) seq.name = "Disulfide Bridges Prediction" seq.short_name = "SSBPRED" seq.bond_list = bond_list seq.height = int(0.5 * len(bond_list) + 0.5) for index, res in enumerate(seq.residues): res.code = ' ' final_sequences.append(seq) if "betapro" in results and pred_ss: beta_p = [] n_beta = 0 length = sequence.gaplessLength() for index in range(length): if pred_ss[index] == 'E': beta_p.append(n_beta) n_beta += 1 else: beta_p.append(-1) if n_beta > 0: tmp_map = [] lines = results["betapro"] for line in lines: values = line.split() for val in values: tmp_map.append(float(val)) beta_map = [] length = sequence.gaplessLength() for y in range(length): for x in range(length): if beta_p[y] >= 0 and beta_p[x] >= 0: beta_map.append(1.0 - tmp_map[beta_p[y] * n_beta + beta_p[x]]) else: beta_map.append(1.0) sequence.beta_map = beta_map try: for file in tmp_files: os.remove(file) except: pass return final_sequences