Source code for schrodinger.protein.tasks.pfam
import copy
import os
from schrodinger.application.msv import seqio
from schrodinger.models import parameters
from schrodinger.protein import alignment
from schrodinger.protein import sequence
from schrodinger.tasks import jobtasks
from schrodinger.tasks import tasks
class PfamTask(jobtasks.CmdJobTask):
    """
    Command job task that runs the ``pfam`` backend on one sequence.

    Two preprocessors populate the task directory: one writes a gapless copy
    of the input sequence to ``<name>.fasta`` and the other writes the job
    parameters to ``<name>.inp``. The postprocessor parses ``<name>.out``
    and stores the results on `output`.
    """
    # NOTE(review): `self.input.seq` is read in `_createInputFasta`, but no
    # `Input` declaration is visible in this class - confirm where the `seq`
    # input param is declared.
    DEFAULT_TASKDIR_SETTING = tasks.TEMP_TASKDIR
    backend_name = 'pfam'

    class Output(jobtasks.CmdJobTask.Output):
        # String of codes assembled from the backend output file
        pfam: str
        # Value of the `s_psp_query_family_name` field in the output file
        name: str

    @tasks.preprocessor(order=tasks.AFTER_TASKDIR)
    def _createInputFasta(self):
        """
        Write the input sequence, with all gaps removed, to ``<name>.fasta``
        in the task directory.
        """
        inp_file_name = self.getTaskFilename(self.name + '.fasta')
        # Strip gaps from a copy so the input sequence itself is not modified
        seq = copy.deepcopy(self.input.seq)
        seq.removeAllGaps()
        aln = alignment.ProteinAlignment([seq])
        seqio.FastaAlignmentWriter.write(aln, inp_file_name)

    @tasks.preprocessor(order=tasks.AFTER_TASKDIR)
    def _createJobParamsFile(self):
        """
        Write the job parameters file ``<name>.inp``, which names the query
        fasta file and requests m2io-format output.
        """
        job_file_name = self.getTaskFilename(self.name + '.inp')
        fasta_fname = self.getTaskFilename(self.name + '.fasta')
        contents = '\n'.join([
            f'QUERY_FILE "{fasta_fname}"',
            'FORMAT m2io'
        ])  # yapf: disable
        with open(job_file_name, 'w') as job_file:
            # `contents` is a single string, so use write(); writelines()
            # would iterate over it one character at a time.
            job_file.write(contents)

    def makeCmd(self):
        """
        Return the command to run: ``pfam`` followed by the task name.

        @overrides: tasks.AbstractCmdTask
        """
        return ['pfam', self.name]

    @tasks.postprocessor
    def _incorporateResults(self):
        """
        Parse ``<name>.out`` and store the pfam string and family name on
        `output`.

        :return: ``(False, message)`` if the output file does not exist;
            otherwise nothing is returned.
        """
        pfam_out_fname = self.getTaskFilename(self.name + '.out')
        if not os.path.isfile(pfam_out_fname):
            # The backend returns 0 even if there's no output
            return False, "No output produced"
        pfam, pfam_name = _extract_pfam_from_mmio_file(pfam_out_fname)
        self.output.pfam = pfam
        self.output.name = pfam_name
def _extract_pfam_from_mmio_file(mmio_fname):
    """
    Extract the pfam string and the query family name from a pfam output
    file.

    The file is scanned line by line instead of using the m2io utilities
    (this approach was carried over from MSV1). Only the text that follows
    the second line containing ``m_psp_seq`` is used. Lines containing
    ``:::`` split that text into sections:

    - section 0: field names, one per line
    - section 1: field values, matched to the field names by position; any
      further lines in this section are ignored
    - section 2: rows whose second whitespace-separated token is a
      (possibly quoted) code; the codes are concatenated in order

    Parsing stops once section 2 has ended.

    :param mmio_fname: Path of the file to read
    :type mmio_fname: str

    :return: The concatenated codes and the value of the
        ``s_psp_query_family_name`` field
    :rtype: tuple(str, str)

    :raises KeyError: If no ``s_psp_query_family_name`` field was found
    """
    seq_blocks_seen = 0
    section = 0
    field_names = []
    field_values = {}
    value_idx = 0
    codes = []
    with open(mmio_fname, "r") as pfam_file:
        for line in pfam_file:
            if "m_psp_seq" in line:
                seq_blocks_seen += 1
                continue
            if seq_blocks_seen != 2:
                # Only the second m_psp_seq block is of interest
                continue
            if ":::" in line:
                section += 1
                continue
            if section == 0:
                field_names.append(line.strip())
            elif section == 1:
                # Lines past the last declared field are ignored
                if value_idx < len(field_names):
                    field_values[field_names[value_idx]] = line.strip(' "\n')
                    value_idx += 1
            elif section == 2:
                tokens = line.split()
                if len(tokens) < 2:
                    # Blank or truncated row: nothing to extract
                    continue
                # A quoted blank (`" "`) splits into two lone quote tokens,
                # so an empty result here stands for a single space.
                code = tokens[1].replace('"', '')
                codes.append(code if code else ' ')
            else:
                # The code table has ended; the rest of the file is unused
                break
    return "".join(codes), field_values["s_psp_query_family_name"]