Source code for schrodinger.protein.tasks.pfam
import copy
import os
from schrodinger.application.msv import seqio
from schrodinger.models import parameters
from schrodinger.protein import alignment
from schrodinger.protein import sequence
from schrodinger.tasks import jobtasks
from schrodinger.tasks import tasks
[docs]class PfamTask(jobtasks.CmdJobTask):
    DEFAULT_TASKDIR_SETTING = tasks.TEMP_TASKDIR
    backend_name = 'pfam'
[docs]    class Output(jobtasks.CmdJobTask.Output):
        pfam: str
        name: str 
    @tasks.preprocessor(order=tasks.AFTER_TASKDIR)
    def _createInputFasta(self):
        inp_file_name = self.getTaskFilename(self.name + '.fasta')
        seq = copy.deepcopy(self.input.seq)
        seq.removeAllGaps()
        aln = alignment.ProteinAlignment([seq])
        seqio.FastaAlignmentWriter.write(aln, inp_file_name)
    @tasks.preprocessor(order=tasks.AFTER_TASKDIR)
    def _createJobParamsFile(self):
        job_file_name = self.getTaskFilename(self.name + '.inp')
        with open(job_file_name, 'w') as job_file:
            fasta_fname = self.getTaskFilename(self.name + '.fasta')
            lines = '\n'.join([
                f'QUERY_FILE        "{fasta_fname}"',
                'FORMAT    m2io'
            ]) # yapf: disable
            job_file.writelines(lines)
[docs]    def makeCmd(self):
        """
        @overrides: tasks.AbstractCmdTask
        """
        return ['pfam', self.name] 
    @tasks.postprocessor
    def _incorporateResults(self):
        pfam_out_fname = self.getTaskFilename(self.name + '.out')
        if not os.path.isfile(pfam_out_fname):
            # The backend returns 0 even if there's no output
            return False, "No output produced"
        pfam, pfam_name = _extract_pfam_from_mmio_file(pfam_out_fname)
        self.output.pfam = pfam
        self.output.name = pfam_name 
def _extract_pfam_from_mmio_file(mmio_fname):
    """
    ### Below is copied from MSV1. It's kind of gross but using the
    ### m2io utilities doesn't make things all that much better.
    """
    with open(mmio_fname, "r") as pfam_file:
        lines = pfam_file.readlines()
    pfam_string = ""
    seq_idx = 0
    field_idx = 0
    level = 0
    fields = []
    field_dict = {}
    for line in lines:
        if "m_psp_seq" in line:
            seq_idx += 1
            continue
        if seq_idx == 2:
            if ":::" in line:
                level += 1
                continue
            if level == 0:
                fields.append(line.strip())
            elif level == 1:
                if field_idx < len(fields):
                    field_dict[fields[field_idx]] = line.strip(' "\n')
                    field_idx += 1
            elif level == 2:
                codes = line.split()
                code = codes[1].replace('\"', '')
                if code == '':
                    code = ' '
                pfam_string += str(code)
            else:
                break
    return pfam_string, field_dict["s_psp_query_family_name"]