Source code for schrodinger.protein.tasks.kinase
import csv
import os
from schrodinger import structure
from schrodinger.models import parameters
from schrodinger.protein import residue
from schrodinger.protein import sequence
from schrodinger.protein.annotation import KinaseConservation
from schrodinger.protein.annotation import KinaseFeatureLabel
from schrodinger.structutils import analyze
from schrodinger.tasks import jobtasks
from schrodinger.tasks import tasks
from schrodinger.utils import csv_unicode
try:
    from schrodinger.application.prime.packages import kinase_annotation
except ImportError:
    kinase_annotation = None
[docs]class KinaseFeatureTask(tasks.ComboSubprocessTask):
    DEFAULT_TASKDIR_SETTING = tasks.TEMP_TASKDIR
[docs]    class Output(parameters.CompoundParam):
        annotation: list 
    BACKEND_SINGLE_LETTER_CODE_TO_KINASE_FEATURE = {
        'G': KinaseFeatureLabel.GLYCINE_RICH_LOOP,
        'A': KinaseFeatureLabel.ALPHA_C,
        'K': KinaseFeatureLabel.GATE_KEEPER,
        'H': KinaseFeatureLabel.HINGE,
        'L': KinaseFeatureLabel.LINKER,
        'R': KinaseFeatureLabel.HRD,
        'C': KinaseFeatureLabel.CATALYTIC_LOOP,
        'D': KinaseFeatureLabel.DFG,
        'T': KinaseFeatureLabel.ACTIVATION_LOOP,
        '~': KinaseFeatureLabel.NO_FEATURE
    }
[docs]    def mainFunction(self):
        seq = self.input.seq
        seq_str = str(seq).replace(seq.gap_char, "")
        seq_str = seq_str.upper()
        try:
            ret = kinase_annotation.KinaseAnnotation.process_annotation(seq_str)
        except RuntimeError:
            # Backend raises RuntimeError for non-kinase sequences
            return
        _gapless_sequence, backend_annotation, _annotation_head = ret
        self.output.annotation = backend_annotation 
[docs]    def getKinaseFeatures(self):
        backend_annotation = self.output.annotation
        if not backend_annotation:
            return {}
        # Create the annotation mapping with all gaps included from the
        # original sequence.
        annotation = [
            self.BACKEND_SINGLE_LETTER_CODE_TO_KINASE_FEATURE[code]
            for code in backend_annotation
        ]
        gap_idxs = [ii for ii, res in enumerate(self.input.seq) if res.is_gap]
        for idx in gap_idxs:
            annotation.insert(idx, KinaseFeatureLabel.NO_FEATURE)
        kinase_feature_map = {}
        for res, feature in zip(self.input.seq, annotation):
            if res.is_gap:
                continue
            kinase_feature_map[res] = feature
        return kinase_feature_map  
[docs]class KinaseConservationTask(jobtasks.CmdJobTask):
    """
    Task to run kinase binding site conservation analysis on the specified
    sequence and ligand.
    """
    DEFAULT_TASKDIR_SETTING = tasks.TEMP_TASKDIR
    _receptor_fname = "receptor.maegz"
    out_fname = "receptor_conservation.csv"
    @tasks.preprocessor(order=tasks.BEFORE_TASKDIR)
    def _checkInputs(self):
        if self.input.seq is None:
            return False, "Receptor sequence must be set"
        if not self.input.seq.hasStructure() or not self.input.seq.entry_id:
            return False, "Receptor sequence must have structure"
        if not self.input.ligand_asl:
            return False, "Ligand ASL must be set"
        st = self.input.seq.getStructure()
        if not analyze.evaluate_asl(st, self.input.ligand_asl):
            return False, f"Ligand {self.input.ligand_asl} was not found"
    @tasks.preprocessor(order=tasks.AFTER_TASKDIR)
    def _writeInputFile(self):
        receptor_st = self.input.seq.getStructure()
        receptor_st.write(self.getTaskFilename(self._receptor_fname))
[docs]    def makeCmd(self):
        return [
            '$SCHRODINGER/run',
            '-FROM', 'psp',
            'kinase_conservation_analysis.py',
            self._receptor_fname,
            '-receptor_chain', self.input.seq.structure_chain,
            '-ligand_asl', self.input.ligand_asl
        ]  # yapf: disable 
    def _getOutputFile(self):
        return self.getTaskFilename(self.out_fname)
    @tasks.postprocessor
    def _checkOutputFile(self):
        outfile = self._getOutputFile()
        if not os.path.isfile(outfile):
            return False, f"Output file not found: {outfile}"
[docs]    def parseOutput(self):
        """
        See parse_kinase_conservation for documentation.
        """
        outfile = self._getOutputFile()
        seq = self.input.seq
        return parse_kinase_conservation(outfile, seq)  
[docs]def parse_kinase_conservation(outfile: str, seq: sequence.ProteinSequence):
    """
    Read the output file and associate the output with the correct residues.
    :rtype: dict[residue.Residue, annotation.KinaseConservation]
    """
    _sres_id_key = 'ResidueID_in_ct(ChainID:ResidueNumberInscode)'
    _conservation_key = 'Conservation_category'
    st = seq.getStructure()
    results = dict()
    with csv_unicode.reader_open(outfile) as fh:
        for _ in range(4):
            # The first 4 lines are decorative
            next(fh)
        reader = csv.DictReader(fh)
        for row in reader:
            sres_id = row[_sres_id_key]
            try:
                sres = st.findResidue(sres_id)
            except ValueError:
                continue
            res_key = residue.get_structure_residue_key(sres, seq.entry_id)
            res = seq.getResByKey(res_key)
            if res is None:
                continue
            conservation = row[_conservation_key]
            conservation = KinaseConservation(conservation)
            results[res] = conservation
    return results 
[docs]class KinaseFeatureAndConservationTask(KinaseFeatureTask):
    """
    Task to compute both kinase features and kinase conservation
    """
    @tasks.preprocessor(order=tasks.BEFORE_TASKDIR)
    def _checkInputs(self):
        if self.input.seq is None:
            return False, "Receptor sequence must be set"
        if not self.input.seq.hasStructure() or not self.input.seq.entry_id:
            return False, "Receptor sequence must have structure"
        if not self.input.ligand_asl:
            return False, "Ligand ASL must be set"
        st = self.input.seq.getStructure()
        if not analyze.evaluate_asl(st, self.input.ligand_asl):
            return False, f"Ligand {self.input.ligand_asl} was not found"
        # seq.getStructure() and entry ID don't survive json so store
        self.input.st = st
        self.input.entry_id = self.input.seq.entry_id
[docs]    def mainFunction(self):
        """
        Compute kinase features if necessary, then compute kinase conservation
        if the sequence is a kinase
        """
        seq = self.input.seq
        if seq.is_kinase_annotated:
            if not seq.isKinaseChain():
                return
        else:
            # Compute kinase features
            super().mainFunction()
            if not self.output.annotation:
                # Not a kinase
                return
        cons_task = KinaseConservationTask()
        input_dict = self.input.toDict()
        st = input_dict.pop("st")
        seq._get_structure = lambda: st
        entry_id = input_dict.pop("entry_id")
        seq.entry_id = entry_id
        assert seq.hasStructure()
        assert seq.entry_id
        cons_task.input.setValue(input_dict)
        assert cons_task.input.seq.hasStructure()
        cons_task.specifyTaskDir(self.getTaskDir())
        cons_task.start()
        cons_task.wait()  # OK: subprocess
        if cons_task.status is cons_task.FAILED:
            self.failure_info = cons_task.failure_info 
[docs]    def parseConservationOutput(self):
        """
        See parse_kinase_conservation for documentation.
        """
        seq = self.input.seq
        outfile = self.getTaskFilename(KinaseConservationTask.out_fname)
        return parse_kinase_conservation(outfile, seq)