"""
`parameters` models for homology modeling
"""
import collections
import copy
import itertools
import logging
import os
from typing import List
import inflect
import schrodinger
import schrodinger.utils.log as log
from schrodinger import structure
from schrodinger.application.msv.gui import picking
from schrodinger.application.msv.gui.homology_modeling import constants
from schrodinger.application.msv.gui.homology_modeling.constants import \
HeteromultimerMode
from schrodinger.application.msv.gui.homology_modeling.constants import Mode
from schrodinger.application.msv.gui.homology_modeling.constants import HomologyStatus
from schrodinger.application.msv.gui.picking import PickMode
from schrodinger.application.prime import wrapper as prime_wrapper
from schrodinger.infra import jobhub
from schrodinger.job import jobcontrol
from schrodinger.models import adapters
from schrodinger.models import json
from schrodinger.models import parameters
from schrodinger.protein import residue
from schrodinger.protein import sequence
from schrodinger.protein.annotation import make_ligand_name_atom
from schrodinger.Qt import QtCore
from schrodinger.structutils import analyze
from schrodinger.structutils.build import delete_zobs
from schrodinger.tasks import jobtasks
from schrodinger.tasks import queue
from schrodinger.tasks import tasks
from schrodinger.utils import fileutils
from schrodinger.utils import mmutil
logger = log.get_output_logger("msv2_homology_panel")
if schrodinger.in_dev_env():
logger.setLevel(logging.DEBUG)
maestro = schrodinger.get_maestro()
HOMOLOGY_INP_EXT = ".inp"
HOMOLOGY_OUT_SUFFIX = "-out.mae"
HOMOLOGY_PROP = "s_psp_Homology_Source"
VIEWNAME = "MSV2_homology_modeling"
ENTRY_TITLE_KEY = 'HOMOLOGY_ENTRY_TITLE'
PROTEIN_CHAIN_ASL = 'protein AND chain "{}"'
NON_PROTEIN_ASL = 'fillmol (NOT protein AND within 5.0 ({protein}))'
[docs]def get_seq_display_name(seq):
"""
Formats sequence name and chain (if set) for display
"""
name_parts = [seq.name]
chain = seq.chain.strip()
if chain:
name_parts.append(chain)
return "_".join(name_parts)
def _get_file_base(input_filename):
"""
The backend uses the extensionless basename of the first input file to name
other files
"""
return fileutils.splitext(os.path.basename(input_filename))[0]
[docs]class EmptyPrimeConfig(prime_wrapper.PrimeConfig):
"""
Remove the spec from PrimeConfig.
The spec contains keys that can only be passed once in the input file, but
`TEMPLATE_NAME` must be present in the input file for each template
structure. Therefore, we create a new EmptyPrimeConfig object for each
template and append them to the same input file.
"""
specs = []
[docs] def validateValues(self, *args, **kwargs):
"""
No-op to prevent traceback due to empty spec
"""
[docs]class Ligand(parameters.CompoundParam):
"""
:param name: Unique ligand name including chain and residue number
:param pdbres: Ligand residue name
:param chain: Ligand chain name
:param resnum: Ligand residue number
:param molnum: Ligand molecule number
:param asl: ASL representing the ligand
:param structure_name: Name of the structure (excluding chain)
:param cofactor: Whether the het is not a ligand as defined by the ligand
preferences
"""
name: str = None
pdbres: str = None
chain: str = None
resnum: int = None
molnum: int = None
asl: str = None
structure_name: str = None
cofactor: bool = False
[docs]class LigandMolecule(parameters.CompoundParam):
"""
Ligand for display (combines ligands with the same molnum)
"""
name: str = None
structure_name: str = None
source_ligands: list
@property
def cofactor(self):
return any(lig.cofactor for lig in self.source_ligands)
[docs]class LigandConstraint(parameters.CompoundParam):
res: residue.Residue = None
ligname: str = None
[docs]class TargetTemplatePair(parameters.CompoundParam):
target_seq: sequence.ProteinSequence = None
template_seq: sequence.ProteinSequence = None
identity: float
ligands: List[Ligand]
[docs] def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)
self._prime_template = None
self._has_waters = False
self.target_seqChanged.connect(self.updateIdentity)
self.template_seqChanged.connect(self._onTemplateSeqChanged)
self._onTemplateSeqChanged() # changed not emitted if set on init
@property
def is_valid(self):
"""
Whether both the target and template are defined
"""
return (self.target_seq is not None and
self.template_seq is not None and
self.template_seq.hasStructure())
@property
def has_waters(self):
"""
Whether the template structure has waters
:rtype: bool
"""
return self._has_waters
@property
def alignment_quality(self):
if self.identity is None:
return None
identity_percent = self.identity * 100
if identity_percent < constants.WEAK_CUTOFF:
quality = constants.AlignmentQuality.WEAK
elif identity_percent < constants.LOW_CUTOFF:
quality = constants.AlignmentQuality.LOW
else:
quality = constants.AlignmentQuality.ACCEPTABLE
return quality
[docs] def updateIdentity(self):
if self.is_valid:
identity = self.target_seq.getIdentity(self.template_seq,
consider_gaps=False)
else:
identity = None
self.identity = identity
[docs] def getPrimeTemplateName(self, suffix=None):
"""
Get the name that will be used to identify this prime template.
:param suffix: Optional suffix for alignment name (needed if more than
one target uses the same template)
:type suffix: int or NoneType
"""
template = self._prime_template
name_parts = [template.name, template.chain]
if suffix is not None and suffix > 0:
name_parts.append(str(suffix))
return "_".join(name_parts)
def _getPrimeAlignment(self, suffix=None):
target_seq = self.target_seq
template = self._prime_template
target_str, template_str = self._padSequences(str(target_seq),
template.sequence)
template.sequence = template_str
ali_name = self.getPrimeTemplateName(suffix=suffix)
ali = prime_wrapper.BasePrimeAlignment(ali_name, target_str)
ali.setTemplate(template)
return ali
[docs] def getSelectedLigandsForPair(self, all_selected_ligands):
"""
Get the names of the selected ligands belonging to this pair
:param all_selected_ligands: Selected ligands across all pairs
:type all_selected_ligands: collections.abc.iterable(Ligand)
:rtype: list[Ligand]
"""
return sorted(
(lig for lig in all_selected_ligands if lig in self.ligands),
key=lambda l: (l.chain, l.resnum))
def _onTemplateSeqChanged(self):
"""
Cache PrimeTemplate for new template seq and update ligand list
"""
self.updateIdentity()
if self.template_seq is None or not self.template_seq.hasStructure():
return
self._prime_template = self._extractTemplate()
self._has_waters = bool(self._prime_template.waters)
st = self._prime_template.st.copy()
delete_zobs(st)
all_ligands = self._getLigands(st)
if all_ligands:
# Prime ligands includes metals and other non-ligand hets
true_ligands = analyze.find_ligands(st)
true_ligand_pdbres = {lig.pdbres for lig in true_ligands}
ligand_names = set()
ligands = []
for lig in all_ligands:
ai = lig.atom_indexes[0]
name = make_ligand_name_atom(st, ai)
if name in ligand_names:
continue
ligand_names.add(name)
lig_atom = st.atom[ai]
ligand = Ligand(name=name,
pdbres=lig_atom.pdbres,
chain=lig_atom.chain,
resnum=lig_atom.resnum,
asl=lig.ligand_asl,
molnum=lig_atom.molecule_number_by_entry,
structure_name=self._prime_template.name,
cofactor=lig.pdbres not in true_ligand_pdbres)
ligands.append(ligand)
for ligand in self._getNucleicAcidLigands(st, known_hets=ligand_names):
ligand.structure_name = self._prime_template.name
ligands.append(ligand)
self.ligands = ligands
def _getLigands(self, st):
return analyze.find_ligands(st,
**self._prime_template.FIND_LIGANDS_KWARGS)
def _getNucleicAcidLigands(self, st, known_hets=None):
if known_hets is None:
known_hets = set()
atoms = analyze.evaluate_asl(st, "nucleic_acids")
for ai in atoms:
name = make_ligand_name_atom(st, ai)
if name in known_hets:
continue
known_hets.add(name)
lig_atom = st.atom[ai]
ligand = Ligand(
name=name,
pdbres=lig_atom.pdbres,
chain=lig_atom.chain,
resnum=lig_atom.resnum,
molnum=lig_atom.molecule_number_by_entry,
)
yield ligand
def _extractTemplate(self):
"""
:return: Prime wrapper template object
:rtype: prime_wrapper.PrimeTemplate
"""
template_seq = self.template_seq
seq_name = template_seq.name.strip() or "template"
seq_name = seq_name.replace(" ", "_")
template = prime_wrapper.PrimeTemplate(seq_name)
template.sequence = str(template_seq)
chain_id = template_seq.structure_chain
full_st = template_seq.getStructure()
protein_asl = PROTEIN_CHAIN_ASL.format(chain_id)
protein_atoms = analyze.evaluate_asl(full_st, protein_asl)
if not protein_atoms:
raise ValueError(f"No protein atoms with chain {chain_id} found.")
protein_st = full_st.extract(protein_atoms)
non_protein_atoms = analyze.evaluate_asl(
full_st, NON_PROTEIN_ASL.format(protein=protein_asl))
non_protein_atoms = set(non_protein_atoms).difference(protein_atoms)
if non_protein_atoms:
non_protein_st = full_st.extract(non_protein_atoms)
# Force protein atoms to be at the beginning (a single extract keeps
# original atom ordering)
protein_st.extend(non_protein_st)
template.st = protein_st
return template
def _padSequences(self, *sequence_strings):
max_len = max(map(len, sequence_strings))
return [mystr.ljust(max_len, "~") for mystr in sequence_strings]
[docs]class LigandDialogModel(parameters.CompoundParam):
ligands: List[LigandMolecule]
selected_ligands: List[LigandMolecule]
has_waters: bool = False
include_waters: bool = False
constrain_ligand: bool = False
ligand_constraints: List[LigandConstraint]
pick_ligand: bool = False
[docs] @classmethod
def getJsonBlacklist(cls):
"""
@overrides: tasks.AbstractCmdTask
"""
return [
cls.pick_ligand,
]
[docs] def getSelectedLigands(self):
return list(
itertools.chain.from_iterable(
ligmol.source_ligands for ligmol in self.selected_ligands))
[docs]class PrimeSettings(parameters.CompoundParam):
"""
Homology settings that are written to the prime input file
"""
prime_method: constants.PrimeMethod
build_deletions: bool = True
build_insertions: bool = True
build_transitions: bool = True
keep_rotamers: bool = True
max_insertion_size: int = 20
minimize: bool = True
num_output_struct: int = 1
side_opt: bool = True
template_residue_numbers: bool = False
[docs] def getSummaryText(self):
"""
Get a string summarizing the settings.
Prime method and the number of output structures will be shown; all
other options will be grouped under custom or default.
"""
method_kw = "prime_method"
num_kw = "num_output_struct"
primary_keywords = (method_kw, num_kw)
current_settings = self.toDict()
method_text = current_settings.pop(method_kw).value
num_models = current_settings.pop(num_kw)
model_text = inflect.engine().no("model", num_models)
# Only show "custom options" for secondary options
default_settings = type(self).defaultValue().toDict()
for kw in primary_keywords:
del default_settings[kw]
secondary_default = current_settings == default_settings
default_text = "defaults" if secondary_default else "custom options"
return f"{method_text}, {model_text}, {default_text}"
[docs] def initConcrete(self):
super().initConcrete()
self.prime_methodChanged.connect(self._onPrimeMethodChanged)
def _onPrimeMethodChanged(self, prime_method):
"""
Reset num_output_struct for energy-based modeling
"""
if prime_method is constants.PrimeMethod.ENERGY:
M = self.getAbstractParam()
self.num_output_struct = M.num_output_struct.defaultValue()
[docs]class HomologyModelingSettings(parameters.CompoundParam):
prime_settings: PrimeSettings
# Other settings
include_hets: bool = True
add_residue_labels: bool = False
ligand_dlg_model: LigandDialogModel
set_constraints: bool = False
proximity_constraints: List
pick_proximity: bool = False
[docs] def initConcrete(self):
super().initConcrete()
self.pick_proximityChanged.connect(self._onPickChanged)
self.set_constraintsChanged.connect(self._onSetConstraintsChanged)
[docs] def reset(self, *args):
if not args:
# by default, don't reset ligand dialog model
cls = type(self)
args = [
subparam for _, subparam in cls.getSubParams().items()
if not isinstance(subparam, LigandDialogModel)
]
super().reset(*args)
def _onPickChanged(self, pick):
"""
When enabling picking, also enable set constraints
"""
if pick:
self.set_constraints = True
def _onSetConstraintsChanged(self, set_constraints):
"""
When disabling set constraints, also disable picking
"""
if not set_constraints:
self.pick_proximity = False
[docs] @classmethod
def getJsonBlacklist(cls):
"""
@overrides: tasks.AbstractCmdTask
"""
return [
cls.pick_proximity,
]
[docs] @json.adapter(version=49009)
def adapter49009(cls, json_dict):
json_dict.pop("refine_loops")
return json_dict
[docs] @json.adapter(version=51105)
def adapter51105(cls, json_dict):
# commit 48aeafc1e7dfa4b9579cb34b5838c1e4d66555ea removed
# `has_constraints` and moved `set_constraints` and `pick_proximity`
# from PrimeSettings to HomologyModelingSettings
prime_settings = json_dict["prime_settings"]
prime_settings.pop("has_constraints", None)
for key in ("set_constraints", "pick_proximity"):
json_dict[key] = prime_settings.pop(key, False)
return json_dict
def _get_hm_job_config():
return jobtasks.job_config_factory(
default_incorp_mode=jobtasks.IncorporationMode.APPEND
if maestro else jobtasks.IncorporationMode.IGNORE,
viewname=VIEWNAME,
supports_subjobs=True)
[docs]class HMLauncherTask(jobtasks._AbstractJobMixin, tasks.SignalTask):
"""
Task to launch homology modeling jobtasks.
Inherits from `jobtasks._AbstractJobMixin` to get `jobtasks.is_jobtask` to
return True, but does not run a job directly.
"""
job_config = _get_hm_job_config()
# the subtask is a param, but we don't want it to be a subparam, so store
# it as a non-param attribute
subtask = parameters.NonParamAttribute()
def __eq__(self, other):
# Bypassing the AbstractTask implementation of `__eq__` because it
# shallow copies both tasks, which was throwing a traceback.
return super(object, self).__eq__(other)
@tasks.preprocessor
def _setUpSubtask(self):
# Copy settings from parent task to child task
self.subtask.specifyTaskDir(self.getTaskDir())
self.subtask.job_config.setValue(self.job_config)
# Hook up subtask signals
self.subtask.taskDone.connect(self.mainDone)
self.subtask.taskFailed.connect(self.mainDone)
# Preprocess subtask so it happens at the correct time
self.subtask.runPreprocessing()
[docs] def setUpMain(self):
self.subtask.start(skip_preprocessing=True)
[docs] def write(self, skip_preprocessing=False):
if not skip_preprocessing:
# Run launcher task preprocessing to set up and preprocess subtask
self.runPreprocessing()
# Delegate write to subtask
self.subtask.write(skip_preprocessing=True)
[docs] def stop(self):
# Delegate stop to subtask
self.subtask.stop()
[docs]class BaseHomologyTaskMixin(parameters.CompoundParamMixin):
"""
Mixin with shared methods for all homology modeling tasks
Subclasses must inherit from a Task class and define the appropriate task
methods (e.g. makeCmd/mainFunction, preprocessors).
"""
aln = parameters.NonParamAttribute()
input: HomologyModelingInput
output_file: tasks.TaskFile
job_id: str
ENTRY_TITLE_FMT = "Model of {target} based on {templates}"
[docs] def initializeValue(self):
super().initializeValue()
self.name = "homology"
[docs] def initConcrete(self):
super().initConcrete()
self.aln = None
[docs] def getOutputStructureFiles(self):
"""
Get the task's output structure file(s).
"""
return [self.getTaskFilename(self.output_file)]
def _getModelName(self):
tt_pairs = self.input.target_template_pairs
target = get_seq_display_name(tt_pairs[0].target_seq)
templates = [
get_seq_display_name(pair.template_seq) for pair in tt_pairs
]
templates = ", ".join(templates)
return self.ENTRY_TITLE_FMT.format(target=target, templates=templates)
[docs]class HomologyTaskMixin(BaseHomologyTaskMixin):
"""
Mixin with shared methods for simple homology modeling tasks.
Subclasses must inherit from a JobTask class and define the appropriate task
methods (e.g. makeCmd/mainFunction, preprocessors).
:cvar ALLOW_DUPLICATE_LIGANDS: Whether to allow duplicate ligands. Should
be False if all of the configs will be used to create one multimer.
:vartype ALLOW_DUPLICATE_LIGANDS: bool
"""
file_prefix = parameters.NonParamAttribute()
ALLOW_DUPLICATE_LIGANDS = False
[docs] def initConcrete(self):
super().initConcrete()
self.file_prefix = ""
def _getPrimeLogFilename(self, input_filename):
file_base = _get_file_base(input_filename)
log_file = file_base + ".log"
return self.getTaskFilename(log_file)
def _checkLogFile(self, input_filename):
"""
Parse the log file because the backend returns 0 when it fails
"""
log_file = self._getPrimeLogFilename(input_filename)
with open(log_file) as fh:
for line in fh:
if "Homology model building job complete" in line:
break
else:
raise tasks.TaskFailure(
"Homology model building did not complete")
##########################
# TASK METHODS
##########################
def _onJobStarted(self, job):
"""
@overrides: _AbstractJobMixin
"""
super()._onJobStarted(job)
if (maestro and
mmutil.feature_flag_is_enabled(mmutil.NEW_INCORPORATION_INFRA)):
jmgr = jobhub.get_job_manager()
jmgr.setJobCompletionOptions(job.JobId, VIEWNAME, job.Project)
def _getInputFilename(self, idx=None):
file_name = self.file_prefix + self.name
if idx:
file_name = f"{file_name}_{idx}"
return file_name + HOMOLOGY_INP_EXT
def _writeInput(self):
"""
Generate a separate input file for each target-template pair.
:return: File names (relative path) for all generated input files.
:rtype: list[str]
"""
input_files = []
prime_configs = self._getPrimeConfigs()
for i, cur_config in enumerate(prime_configs):
file_name = self._getInputFilename(i)
file_path = self.getTaskFilename(file_name)
with open(file_path, 'w') as fh:
cur_config.writeInputFile(fh, ignore_none=True, yesno=False)
input_files.append(file_name)
return input_files
def _getPrimeConfigs(self):
"""
Create a Prime config object for each target-template pair.
:return: All Prime config objects.
:rtype: list[prime_wrapper.PrimeConfig]
"""
tt_pairs = self.input.target_template_pairs
taskdir = self.getTaskDir()
all_configs = []
seen_ligs = set()
for template_number, pair in enumerate(tt_pairs, start=1):
prime_config = self._getPrimeConfigWithSettings()
self._addPairToConfig(prime_config, pair, taskdir, template_number)
if not self.ALLOW_DUPLICATE_LIGANDS:
# Remove ligands that were used in a previous config
to_remove = set()
for key, value in prime_config.items():
if "HETERO" in key:
if value in seen_ligs:
to_remove.add(key)
seen_ligs.add(value)
for key in to_remove:
del prime_config[key]
all_configs.append(prime_config)
return all_configs
def _getPrimeConfigWithSettings(self):
"""
Create a Prime config object with the current settings but no alignment
loaded.
:return: The Prime config object.
:rtype: prime_wrapper.PrimeConfig
"""
prime_config = prime_wrapper.PrimeConfig()
InputConfigAdapter.convert(self.input.settings.prime_settings,
prime_config)
return prime_config
def _addPairToConfig(self,
prime_config,
pair,
taskdir,
template_number,
suffix=None):
"""
Add a target-template pair to a Prime config object and write out the
target-template alignment in Prime format.
:param prime_config: The Prime config object to add the pair to.
:type prime_config: prime_wrapper.PrimeConfig
:param pair: The target-template pair to add.
:type pair: TargetTemplatePair
:param taskdir: The directory to write the Prime alignment to.
:type taskdir: str
:param template_number: The index of the target-template pair.
:type template_number: int
:param suffix: Suffix for prime alignment outfile name
:type suffix: str
"""
settings = self.input.settings
is_energy = (settings.prime_settings.prime_method is
constants.PrimeMethod.ENERGY)
selected_ligands, include_waters = self.input.getLigandAndWaterSettings(
)
prime_alignment = pair.writeInputForPrime(taskdir, suffix=suffix)
prime_config.setAlignment(prime_alignment, (prime_wrapper.NO_LIGANDS,),
include_waters,
template_number=template_number)
crosslinks = []
if settings.include_hets and selected_ligands:
ligands = pair.getSelectedLigandsForPair(selected_ligands)
template_name = prime_config['TEMPLATE_NAME']
if include_waters:
start_idx = sum(1 for key in prime_config if "HETERO" in key)
else:
start_idx = 0
lig_name_index_map = {}
for idx, lig in enumerate(ligands, start=start_idx):
lig_name_index_map[lig.name] = idx
het_name = f'{template_name}_HETERO_{idx}'
lig_id = f'LIG{lig.pdbres}{lig.chain}:{lig.resnum}'
prime_config[het_name] = lig_id
if is_energy and settings.ligand_dlg_model.constrain_ligand:
lig_constraints = settings.ligand_dlg_model.ligand_constraints
for constr in lig_constraints:
lig_idx = lig_name_index_map[constr.ligname]
# The backend takes ligand indexes starting at 500
lig_id = f"Z:{500 + lig_idx}"
res_idx = constr.res.gapless_idx_in_seq
res_id = f"A:{res_idx + 1}"
crosslinks.append(f"{lig_id} {res_id}")
if is_energy and settings.set_constraints:
for res1, res2 in settings.proximity_constraints:
ridx1 = res1.gapless_idx_in_seq + 1
ridx2 = res2.gapless_idx_in_seq + 1
crosslinks.append(f"A:{ridx1} A:{ridx2}")
if is_energy and crosslinks:
prime_config['CROSSLINK'] = " ".join(crosslinks)
return prime_alignment
def _check_unique_template_names(pairs):
"""
Return whether all pairs' templates are unique by name and chain
"""
names_and_chains = [
(pair.template_seq.name, pair.template_seq.chain) for pair in pairs
]
valid = len(set(names_and_chains)) == len(names_and_chains)
if valid:
return True
msg = "Templates must have unique names and chains. Non-unique items: "
non_unique = [
item for item, num in collections.Counter(names_and_chains).items()
if num > 1
]
msg += ", ".join(["_".join(parts) for parts in non_unique])
return False, msg
[docs]class HomologyModelingTask(HomologyTaskMixin, BasePrimeTask):
@tasks.preprocessor(order=tasks.BEFORE_TASKDIR)
def _checkUniqueTemplateNames(self):
return _check_unique_template_names(self.input.target_template_pairs)
@tasks.preprocessor(order=tasks.AFTER_TASKDIR)
def _prepareInputAndOutput(self):
"""
Generate the input files and expected output file names.
"""
input_files = self._writeInput()
self.input_files = input_files
def _getPrimeConfigWithSettings(self):
prime_config = super()._getPrimeConfigWithSettings()
prime_config[ENTRY_TITLE_KEY] = self._getModelName()
return prime_config
@tasks.postprocessor
def _checkFirstLogFile(self):
self._checkLogFile(self.input_files[0])
[docs]class ChimeraHomologyModelingTask(HomologyModelingTask):
ENTRY_TITLE_FMT = "Chimeric model of {target} based on {templates}"
def _writeInput(self):
"""
Generate the input file for chimera mode.
@overrides: HomologyTaskMixin
:return: File names (relative path) for all generated input files.
:rtype: list[str]
"""
prime_configs = self._getPrimeConfigs()
file_name = self._getInputFilename()
file_path = self.getTaskFilename(file_name)
with open(file_path, 'w') as fh:
for cur_config in prime_configs:
cur_config.writeInputFile(fh, ignore_none=True, yesno=False)
return [file_name]
def _getPrimeConfigs(self):
"""
Create Prime config objects for chimera mode. These config objects
should all be written to a single input file.
@overrides: HomologyTaskMixin
:return: All Prime config objects.
:rtype: list[prime_wrapper.PrimeConfig]
"""
main_prime_config = self._getPrimeConfigWithSettings()
composite_array, tt_pairs = self.input.getCompositeArray()
main_prime_config['COMPOSITE_ARRAY'] = composite_array
all_configs = []
all_configs.append(main_prime_config)
taskdir = self.getTaskDir()
for template_number, pair in enumerate(tt_pairs, start=1):
prime_config = EmptyPrimeConfig()
self._addPairToConfig(prime_config, pair, taskdir, template_number)
all_configs.append(prime_config)
return all_configs
[docs]class HeteromultimerHomologyModelingTask(BaseHomologyTaskMixin, BasePrimeTask):
@tasks.preprocessor(order=tasks.BEFORE_TASKDIR)
def _checkUniqueTemplateNames(self):
pairs = (pair for tab_input in self.input.input_list
for pair in tab_input.target_template_pairs)
return _check_unique_template_names(pairs)
@tasks.preprocessor(order=tasks.AFTER_TASKDIR)
def _prepareInputAndOutput(self):
taskdir = self.getTaskDir()
for idx, hm_input in enumerate(self.input.input_list):
TaskCls = get_task_class(hm_input.heteromultimer_mode)
subtask = TaskCls()
if idx > 0:
subtask.file_prefix = f"tab{idx}_"
subtask.name = self.name
subtask.input.setValue(hm_input)
subtask.specifyTaskDir(taskdir)
subtask.runPreprocessing()
self.input_files.extend(subtask.input_files)
[docs]class BatchHomologyModelingTask(HomologyTaskMixin, jobtasks.ComboJobTask):
job_config = _get_hm_job_config()
[docs] class Output(jobtasks.JobOutput):
pass
ALLOW_DUPLICATE_LIGANDS = True
ENTRY_TITLE_FMT = "Model of {target} based on {template}"
[docs] def initializeValue(self):
super().initializeValue()
self.name = "batch_homology"
[docs] def getOutputStructureFiles(self):
"""
@overrides: HomologyTaskMixin
"""
return [self.getTaskFilename(self.output.incorporation_file)]
@tasks.preprocessor(order=tasks.BEFORE_TASKDIR)
def _checkName(self):
# The name "homology" is used for the subjobs, so prevent trying to
# create homology.log for both parent and child task
if self.name == "homology":
return False, 'Batch task cannot be named "homology"'
@tasks.preprocessor(order=tasks.AFTER_TASKDIR)
def _prepareInputAndOutput(self):
"""
Generate the input files
"""
# Temporarily set name to homology to write subjob input files
old_name = self.name
self.name = "homology"
try:
# Note: `_writeInput` calls `_addPairToConfig` which
# adds the aln and pdb files to `self.input_files`
input_files = self._writeInput()
finally:
self.name = old_name
input_files = [self.getTaskFilename(fn) for fn in input_files]
# Store the prime input files separately
self.input.prime_input_files = input_files
[docs] def mainFunction(self):
subtasks = []
for in_fn in self.input.prime_input_files:
subtask = BasePrimeTask()
# Launch subtasks from the parent task jobdir (cwd)
# because that's where all the prime input files are
subtask.specifyTaskDir(None)
subtask.input_files = [in_fn]
subtasks.append(subtask)
queue.autoname_tasks(subtasks)
dj = queue.TaskDJ()
for task in subtasks:
dj.addTask(task)
dj.run()
queue.auto_register_files(subtasks,
auto_file_mode=queue.AutoFileMode.ALL)
# TODO after PANEL-19618, remove the next block
for task in subtasks:
if task.job_id:
job_obj = jobcontrol.Job(task.job_id)
jobcontrol.register_job_output(job_obj)
succeeded_tasks = []
for task in subtasks:
if task.status is task.DONE:
succeeded_tasks.append(task)
self.output.incorporation_file = "batch_homology_out.maegz"
with structure.StructureWriter(
self.output.incorporation_file) as writer:
for task in succeeded_tasks:
# Because the subtasks run in cwd, the relative output file
# path can be used directly
with structure.StructureReader(task.output_file) as reader:
writer.extend(reader)
num_done = len(succeeded_tasks)
num_started = len(subtasks)
if num_done < num_started:
raise tasks.TaskFailure(f"{num_done} of {num_started} homology "
"models successfully completed")
def _addPairToConfig(self, prime_config, pair, taskdir, template_number):
"""
@overrides: HomologyTaskMixin
"""
# many-one reuses the same template, so name and chain aren't
# sufficient to identify
suffix = template_number - 1
template_number = 1
prime_alignment = super()._addPairToConfig(prime_config,
pair,
taskdir,
template_number,
suffix=suffix)
# Add aln and pdb files to input files so they're copied to the job dir
self.input.misc_input_files.append(
self.getTaskFilename(prime_alignment.getAlnFilename()))
self.input.misc_input_files.append(
self.getTaskFilename(prime_alignment.template.file_name))
entry_title = self.ENTRY_TITLE_FMT.format(
target=get_seq_display_name(pair.target_seq),
template=get_seq_display_name(pair.template_seq))
prime_config[ENTRY_TITLE_KEY] = entry_title
@tasks.postprocessor
def _checkFirstLogFile(self):
self._checkLogFile(self.input.prime_input_files[0])
[docs]class ConsensusHomologyModelingTask(BaseHomologyTaskMixin, jobtasks.CmdJobTask):
input_file: tasks.TaskFile
job_config = _get_hm_job_config()
@tasks.preprocessor
def _writeInput(self):
if not self.input.target_template_pairs:
return
self.input_file = self.name + HOMOLOGY_INP_EXT
# Consensus output is maegz
self.output_file = _get_file_base(self.input_file) + "-out.maegz"
prime_config = prime_wrapper.ConsensusConfig()
first_ali_name = self.input.target_template_pairs[
0].getPrimeTemplateName()
file_name = self.getTaskFilename(first_ali_name + '.aln')
for idx, pair in enumerate(self.input.target_template_pairs):
do_append = bool(idx)
prime_alignment = pair.writeInputForConsensus(file_name=file_name,
append=do_append)
if idx == 0:
prime_config.setAlignment(prime_alignment)
model_name = self._getModelName()
# TODO PRIME-4731: consensus homology doesn't support whitespace
model_name = model_name.replace(" ", "_")
prime_config[ENTRY_TITLE_KEY] = model_name
with open(self.getTaskFilename(self.input_file), 'w') as fh:
prime_config.writeInputFile(fh, ignore_none=True, yesno=False)
[docs] def makeCmd(self):
"""
@overrides: tasks.AbstractCmdTask
"""
return ['consensus_homology', self.input_file]
[docs]def get_possible_templates(aln):
"""
Generator for sequences from the given alignment that can be templates
:rtype: collections.abc.Iterable[sequence.ProteinSequence]
"""
for seq in get_selected_nonref_seqs(aln):
if seq.hasStructure():
yield seq
[docs]def get_selected_nonref_seqs(aln):
"""
Generator for non-reference sequences from the given alignment that are
selected (or all non-reference seqs if there is no selection)
:rtype: collections.abc.Iterable[sequence.ProteinSequence]
"""
selected_seqs = aln.seq_selection_model.getSelection()
selected_seqs.discard(aln.getReferenceSeq())
for seq in itertools.islice(aln, 1, None):
if type(seq) != sequence.ProteinSequence:
# TODO MSV-1504 once NucleicAcidSequence doesn't inherit
# ProteinSequence, change to isinstance
# Skip nucleic acid sequences
return
if seq in selected_seqs or (
not selected_seqs and
(not seq.hasStructure() or
seq.getStructure().property.get(HOMOLOGY_PROP) is None)):
yield seq
[docs]def get_min_alignment_quality(pairs):
"""
:param pairs: Target template pairs
:type pairs: collections.abc.iterable[TargetTemplatePair]
:return: The minimum alignment quality of all alignment pairs or None
if no alignment pairs exist
:rtype: constants.AlignmentQuality or NoneType
"""
qualities = (pair.alignment_quality for pair in pairs)
return min((qual for qual in qualities if qual is not None), default=None)
[docs]def get_task_from_model(gui_model):
current_input = gui_model.current_page.homology_modeling_input
current_mode = current_input.mode
if current_mode is Mode.HETEROMULTIMER:
het_settings = gui_model.heteromultimer_settings
task = HeteromultimerHomologyModelingTask()
task.input.settings.setValue(het_settings.settings)
for page in het_settings.selected_pages:
# TODO MSV-3239 in standalone mode, deepcopying the sequences
# causes the template sequences to lose the associated structure
input_copy = copy.deepcopy(page.homology_modeling_input)
task.input.input_list.append(input_copy)
else:
task = get_task(current_input)
return task
[docs]def get_task(hm_input):
"""
Return a task for the specified input
"""
TaskCls = get_task_class(hm_input.mode)
task = TaskCls()
task.input.setValue(hm_input)
return task
[docs]def get_task_class(mode):
if mode in (Mode.ONE_ONE, Mode.HOMOMULTIMER, HeteromultimerMode.ONE_ONE):
TaskCls = HomologyModelingTask
elif mode in (Mode.CHIMERA, HeteromultimerMode.CHIMERA):
TaskCls = ChimeraHomologyModelingTask
elif mode is Mode.MANY_ONE:
TaskCls = BatchHomologyModelingTask
elif mode is Mode.CONSENSUS:
TaskCls = ConsensusHomologyModelingTask
else:
raise ValueError(
f"Task class has not been implemented for mode {str(mode)}")
return TaskCls