import os
from typing import Set
from typing import Tuple
from typing import Union
import numpy
import schrodinger
import schrodinger.application.desmond.automatic_analysis_generator as aag
import schrodinger.application.desmond.cms as cms
import schrodinger.structure as structure
import schrodinger.structutils.analyze as analyze
import schrodinger.structutils.build as build
import schrodinger.utils.sea as sea
from schrodinger.application.desmond import constants
from schrodinger.application.desmond import util
from schrodinger.application.desmond.replica_dE import replica_container
from schrodinger.application.desmond.replica_dE import replicas_monitor
from schrodinger.application.desmond.struc import get_reference_ct
from schrodinger.application.desmond.util import parse_res
from schrodinger.structutils import createfragments
from schrodinger.structutils import smiles as smiles_mod
from schrodinger.utils import subprocess
from . import config
from .util import get_traj_filename
try:
from schrodinger.application.scisol.packages.fep import \
protein_mutation_generator as pmg
except ImportError:
pmg = None
[docs]def get_cov_lig_info(cms_st):
"""
Find ligand residue ID for covalent ligand job. The inputs should always be
a complex system/complex leg.
:param cms_st: Desmond system structure
:type cms_st: `cms.Cms`
:rtype: tuple(`str`, `str`) or tuple(None, None)
:return: (chain, resnum) information of the covalent ligand
"""
custom_tag = cms_st.ref_ct.property.get(constants.COVALENT_LIGAND, None)
if custom_tag is not None:
return parse_res(custom_tag)[:2]
else:
from schrodinger.application.scisol.packages.fep.fepmae import \
find_covalent_residue_asl
cov_res_asl = find_covalent_residue_asl(cms_st.ref_ct)
cov_res_idx = analyze.evaluate_asl(cms_st.ref_ct, cov_res_asl)
if cov_res_idx:
cov_lig_atm = cms_st.ref_ct.atom[cov_res_idx[0]]
return (cov_lig_atm.chain, cov_lig_atm.resnum)
return (None, None)
[docs]class AlchemAsl(object):
[docs] def __init__(self, ref_asl, mut_asl, ref_solv_asl=None, mut_solv_asl=None):
self._ref_asl = ref_asl
self._mut_asl = mut_asl
self._ref_solv_asl = ref_solv_asl
self._mut_solv_asl = mut_solv_asl
@property
def ref_asl(self):
return self._ref_asl
@property
def mut_asl(self):
return self._mut_asl
@property
def ref_solv_asl(self):
return self._ref_solv_asl
@property
def mut_solv_asl(self):
return self._mut_solv_asl
[docs]def setup_alchem_properties(cms_st, alchem_asl_obj, perturbation_type,
leg_type):
"""
This method sets up all alchemical selections for different types of FEPs
and respected perturbation legs.
:type cms_st: `cms.Cms`
:type alchem_asl_obj: `AlchemAsl`
:param alchem_asl_obj: AlchemAsl object
:type perturbation_type: `str`
:param perturbation_type: FEP_TYPE as defined in constants.FEP_TYPES
:type leg_type: `str`
:param leg_type: either a 'solvent' or 'complex'
:rtype: (`SmallMoleculeReport`, `SmallMoleculeReport`),
(`str`, `str`)
:return: two tuples of pairs: SmallMoleculeReport and full protein ASL
strings
"""
full_protein_asl = (f"protein and not ({alchem_asl_obj.mut_asl})",
f"protein and not ({alchem_asl_obj.ref_asl})")
template_frag_asl = "protein and (chain.name %s and res.num %i-%i) and not (%s)"
ref_metal_asl = None
mut_metal_asl = None
if constants.FEP_TYPES.COVALENT_LIGAND == perturbation_type:
if leg_type == constants.FepLegTypes.COMPLEX:
chain_name, resid = get_cov_lig_info(cms_st)
templ_covlig_asl = f"(chain.name {chain_name} and res.num {resid}) and not (%s) and not " \
f"(atom.{constants.FEP_COVALENT_PROTEIN_BB_ATOM} 1 or res.ptype ACE NMA)"
ref_asl = templ_covlig_asl % alchem_asl_obj.mut_asl
mut_asl = templ_covlig_asl % alchem_asl_obj.ref_asl
# exclude covalent ligand from protein selection
tmpl = f" and not (chain.name {chain_name} and res.num {resid} and not " \
f"atom.{constants.FEP_COVALENT_PROTEIN_BB_ATOM} 1)"
full_protein_asl = (full_protein_asl[0] + tmpl,
full_protein_asl[1] + tmpl)
else:
ref_asl = alchem_asl_obj.ref_asl
mut_asl = alchem_asl_obj.mut_asl
full_protein_asl = (None, None)
elif constants.FEP_TYPES.PROTEIN_STABILITY == perturbation_type:
full_protein_asl = (leg_type == constants.FepLegTypes.COMPLEX) and \
full_protein_asl or (None, None)
chain_name, resid, ins_code = parse_prm_tag(cms_st)
if chain_name is None and resid is None:
return (None, None), full_protein_asl
ref_asl = _get_alchem_asl(chain_name, resid, ins_code,
alchem_asl_obj.mut_asl)
mut_asl = _get_alchem_asl(chain_name, resid, ins_code,
alchem_asl_obj.ref_asl)
elif constants.FEP_TYPES.SMALL_MOLECULE == perturbation_type:
ref_asl = alchem_asl_obj.ref_asl
mut_asl = alchem_asl_obj.mut_asl
full_protein_asl = full_protein_asl \
if leg_type == constants.FepLegTypes.COMPLEX else (None, None)
elif constants.FEP_TYPES.METALLOPROTEIN == perturbation_type:
template_metal_ligand_asl = 'ligand and ({})'
template_metal_protein_asl = 'protein and ({})'
template_metal_metal_asl = '((ions) or (metals) or (metalloids)) and ({})'
ref_asl = template_metal_ligand_asl.format(alchem_asl_obj.ref_asl)
mut_asl = template_metal_ligand_asl.format(alchem_asl_obj.mut_asl)
ref_prot_asl = template_metal_protein_asl.format(alchem_asl_obj.ref_asl)
mut_prot_asl = template_metal_protein_asl.format(alchem_asl_obj.mut_asl)
ref_metal_asl = template_metal_metal_asl.format(alchem_asl_obj.ref_asl)
mut_metal_asl = template_metal_metal_asl.format(alchem_asl_obj.mut_asl)
full_protein_asl = (ref_prot_asl, mut_prot_asl) \
if leg_type == constants.FepLegTypes.COMPLEX else (None, None)
elif constants.FEP_TYPES.PROTEIN_SELECTIVITY == perturbation_type:
chain_name, resid, ins_code = parse_prm_tag(cms_st)
if chain_name is None and resid is None:
return (None, None), full_protein_asl
ref_asl = _get_alchem_asl(chain_name, resid, ins_code,
alchem_asl_obj.mut_asl)
mut_asl = _get_alchem_asl(chain_name, resid, ins_code,
alchem_asl_obj.ref_asl)
elif constants.FEP_TYPES.LIGAND_SELECTIVITY == perturbation_type:
"""
In complex leg we want ligand-protein interactions. In solvent leg,
there is no ligand, so we're looking at fragment-protein interactions.
"""
if leg_type == constants.FepLegTypes.COMPLEX:
lig_aids = analyze.evaluate_asl(
cms_st.comp_ct[0],
'(not water and not (ions) and not (metals))')
ref_asl = mut_asl = analyze.generate_asl(cms_st.comp_ct[0],
lig_aids)
else:
chain_name, resid, ins_code = parse_prm_tag(cms_st)
if chain_name is None and resid is None:
return (None, None), full_protein_asl
ref_asl = _get_alchem_asl(chain_name, resid, ins_code,
alchem_asl_obj.mut_asl)
mut_asl = _get_alchem_asl(chain_name, resid, ins_code,
alchem_asl_obj.ref_asl)
elif perturbation_type == constants.FEP_TYPES.ABSOLUTE_BINDING:
# In ABFEP, even though the ref_ct is the dummy atom while the
# mut_ct is the ligand, lambda=0 is where the ligand interacts
# with the protein
ref_asl = f'({alchem_asl_obj.mut_asl}) and a.{constants.FEP_ABSOLUTE_BINDING_LIGAND} 1'
mut_asl = "res.ptype 'DU '"
# Define the protein asl, making sure there is only one copy of the
# protein in the analysis
full_protein_asl = (full_protein_asl[1], full_protein_asl[0]) \
if leg_type == constants.FepLegTypes.COMPLEX else (None, None)
alchem_asl_obj._ref_solv_asl, alchem_asl_obj._mut_solv_asl = \
alchem_asl_obj.mut_solv_asl, alchem_asl_obj.ref_solv_asl
elif perturbation_type == constants.FEP_TYPES.SOLUBILITY:
full_protein_asl = (None, None)
ref_asl = alchem_asl_obj.ref_asl
mut_asl = alchem_asl_obj.mut_asl
ref_st = cms_st.extract(cms_st.select_atom(ref_asl))
mut_st = cms_st.extract(cms_st.select_atom(mut_asl))
# get ASL/structures for alchemical solvent
ref_solvent_st = cms_st.extract(cms_st.select_atom(alchem_asl_obj.ref_solv_asl)) \
if alchem_asl_obj.ref_solv_asl else None
mut_solvent_st = cms_st.extract(cms_st.select_atom(alchem_asl_obj.mut_solv_asl)) \
if alchem_asl_obj.mut_solv_asl else None
ref = SmallMoleculeReport(ref_st, perturbation_type, leg_type, 1, ref_asl,
ref_solvent_st, alchem_asl_obj.ref_solv_asl,
ref_metal_asl)
mut = SmallMoleculeReport(mut_st, perturbation_type, leg_type, 2, mut_asl,
mut_solvent_st, alchem_asl_obj.mut_solv_asl,
mut_metal_asl)
alchem_info_list = (ref, mut)
return alchem_info_list, full_protein_asl
[docs]def parse_prm_tag(cms_model: cms.Cms) -> \
Union[Tuple[None, None, None], Tuple[str, int, str]]:
"""
Given a cms model, get the chain, resnum, and inscode of the mutated
residue.
Mutated sites are parsed from the s_bioluminate_Mutations property which
is a string in the format of A:33B(ALA->VAL) where A is the chain, 33 is the
residue number, B in the insertion code (optional), and the mutation is
from an alanine to a valine.
In the case of
1) a multisite+multistep mutation (e.g. WT -> A-ALA41ILE,A-ALA43GLY)
2) there is no s_bioluminate_Mutations property
we must skip ligand analysis, so return (None, None, None)
"""
mutated_sites = list(_get_cms_mutated_sites(cms_model))
if not mutated_sites:
# no mutation sites were found
return None, None, None
elif len(mutated_sites) > 1:
# sid analysis does not support multiple ligands, and hence must be
# skipped for multisite mutations
return None, None, None
chain, resnum, inscode = mutated_sites[0]
return chain, resnum, inscode
def _get_cms_mutated_sites(cms_model: cms.Cms) -> Set[Tuple[str, int, str]]:
"""
Given a cms, return a set of residues that were mutated between the
reference and mutant cts. If the BIOLUMINATE_MUTATION cannot be found (on
cms created before 20-2) return an empty set.
"""
reference_mutations = cms_model.ref_ct.property.get(
constants.BIOLUMINATE_MUTATION)
mutant_mutations = cms_model.mut_ct.property.get(
constants.BIOLUMINATE_MUTATION)
if reference_mutations is None or mutant_mutations is None:
return set()
return pmg.get_mutated_sites(reference_mutations, mutant_mutations)
def _get_alchem_asl(chain_name: str, resnum: int, inscode: str,
asl_to_exclude: str) -> str:
"""
Get asl containing residues on the given chain with the given residue
numbers (plus and minus 1) and insertion code while exclusing
`asl_to_exclude`
"""
asl = (f'protein and (chain.name "{chain_name}" '
f'and res.num {resnum-1}-{resnum+1}')
if inscode:
asl += f' and res.i "{inscode}"'
asl += f') and not ({asl_to_exclude})'
return asl
[docs]def write_recentered_traj(traj_fn: str, asl: str) -> Tuple[str, str]:
"""
Center the cms/trajectory to the provided asl string. We also need to
rename the CT_TYPE of 'solute' to 'solvent' to ensure proper centering.
Return the new cms and trj filenames.
"""
from schrodinger.application.desmond.packages import topo
from schrodinger.application.desmond.packages import traj
from schrodinger.application.desmond.packages import traj_util
from schrodinger.application.desmond.packages import pfx
msys_model, cms_model, tr = traj_util.read_cms_and_traj(traj_fn)
for ct in cms_model.comp_ct:
if ct.property[constants.CT_TYPE] == constants.CT_TYPE.VAL.SOLUTE:
ct.property[constants.CT_TYPE] = constants.CT_TYPE.VAL.SOLVENT
break
center_aids = cms_model.select_atom(asl)
if cms_model.need_msys:
gids = topo.aids2gids(cms_model, center_aids, True)
topo.center_cms(msys_model, gids, cms_model)
topo.center(msys_model, gids, tr)
else:
gids = cms_model.convert_to_gids(center_aids, True)
pfx.center_cms(gids, cms_model)
pfx.center(cms_model.glued_topology, gids, tr)
out_traj_fn = 'centered_solubility-out.cms'
out_traj_path = 'centered_solubility.xtc'
cms_model.fix_filenames(out_traj_fn, out_traj_path)
cms_model.write(out_traj_fn)
traj.write_traj(tr, out_traj_path)
return out_traj_fn, out_traj_path
[docs]class FEPReport(object):
[docs] def __init__(self,
basename,
energy_output,
task_type='lambda_hopping',
n_win=12,
perturbation_type=None):
self.basename = basename
self.replicas_monitor = None
self.nreplicas = n_win
self.task_type = task_type
self.prm_tag = None
self.prot_info = None
self.alchem_info_list = []
self.lambda0_result = None
self.lambda1_result = None
self.replica_energies = None
self.prot_info = None
self.perturbation_type = perturbation_type
self.full_protein_asl = [None, None]
self.alchem_info_list = [None, None]
self.simulation_details = FEPSimulationReport(basename, task_type,
perturbation_type)
self.cms_st = self.simulation_details.get_cms()
self.replica_energies = replica_container(basename,
energy_output,
task_type=task_type,
n_win=n_win)
if self.replica_energies:
self.nreplicas = len(self.replica_energies.replica_list)
if self.simulation_details.cfg and task_type in [
'lambda_hopping', 'afep'
]:
self.replicas_monitor = replicas_monitor(
basename, self.simulation_details.cfg, task_type)
self.setup_alchem_properties()
self.prot_info = ProteinReport(self.cms_st,
self.full_protein_asl[0],
mutation_tag=self.prm_tag)
self.lambda0_result = self.get_analysis(fep_lambda=0)
self.lambda1_result = self.get_analysis(fep_lambda=1)
[docs] def setup_alchem_properties(self):
self._determine_fep_leg()
self.alchem_asl = self._get_alchemical_asls()
self.alchem_info_list, self.full_protein_asl = \
setup_alchem_properties(self.cms_st, self.alchem_asl,
self.perturbation_type, self.leg_type)
self.simulation_details.perturbation_type = self.perturbation_type
def _determine_fep_leg(self):
"""Given a cms file we determine its component and the FEP leg"""
if self.perturbation_type == constants.FEP_TYPES.SOLUBILITY:
self.leg_type = constants.FepLegTypes.HYDRATION if self.cms_st.comp_ct[
0].mol_total == 1 else constants.FepLegTypes.SUBLIMATION
return
ref_ctid = self.cms_st.comp_ct.index(self.cms_st.ref_ct)
leg_type = constants.FepLegTypes.COMPLEX
if self.perturbation_type in [
constants.FEP_TYPES.SMALL_MOLECULE,
constants.FEP_TYPES.LIGAND_SELECTIVITY,
constants.FEP_TYPES.PROTEIN_SELECTIVITY
] and ref_ctid == 0:
leg_type = constants.FepLegTypes.SOLVENT
elif self.perturbation_type in [
constants.FEP_TYPES.PROTEIN_STABILITY,
constants.FEP_TYPES.COVALENT_LIGAND
]:
if self.cms_st.ref_ct.atom_total < constants.LIGAND_TOTAL_ATOMS_LIMIT:
leg_type = constants.FepLegTypes.SOLVENT
elif self.perturbation_type == constants.FEP_TYPES.METALLOPROTEIN:
if self.cms_st.ref_ct.atom_total < constants.LIGAND_TOTAL_ATOMS_LIMIT:
leg_type = constants.FepLegTypes.SOLVENT
elif self.perturbation_type == constants.FEP_TYPES.ABSOLUTE_BINDING:
leg_type = self.cms_st.ref_ct.property[
constants.ABSOLUTE_BINDING_LEGS]
self.leg_type = leg_type
def _get_alchemical_asls(self):
"""
For each lambda value (0 or 1), return the ligand asl and then the
alchemical solvent (can be either ion or water) asl.
"""
def _get_asl(_ct):
ctid = self.cms_st.comp_ct.index(_ct)
atom_list = self.cms_st.get_fullsys_ct_atom_index_range(ctid)
lig_struc = self.cms_st.extract(atom_list)
ions_or_waters = analyze.evaluate_asl(
lig_struc, f'atom.{constants.ALCHEMICAL_ION}')
# if no alchemical waters or ions
if not ions_or_waters:
return (analyze.generate_asl(self.cms_st, atom_list), None)
ions_or_waters_idx = [
lig_struc.atom[i].property[constants.ORIGINAL_INDEX]
for i in ions_or_waters
]
lig_idx = list(set(atom_list) - set(ions_or_waters_idx))
return (analyze.generate_asl(self.cms_st, lig_idx),
analyze.generate_asl(self.cms_st, ions_or_waters_idx))
if self.perturbation_type == constants.FEP_TYPES.SOLUBILITY:
alchem_prop = f'atom.{constants.FEP_ABSOLUTE_LIGAND} 1'
sel_atoms = analyze.evaluate_asl(self.cms_st, alchem_prop)
mol_number = set(
[self.cms_st.atom[i].molecule_number for i in sel_atoms])
# this property should be set only on a single molecule
if len(mol_number) != 1:
raise RuntimeError("More than one alchemical molecule set.")
mol_asl = f'mol.num {list(mol_number)[0]}'
return AlchemAsl(mol_asl, mol_asl, mol_asl, mol_asl)
ref_asl, ref_solvent_asl = _get_asl(self.cms_st.ref_ct)
mut_asl, mut_solvent_asl = _get_asl(self.cms_st.mut_ct)
return AlchemAsl(ref_asl, mut_asl, ref_solvent_asl, mut_solvent_asl)
[docs] def get_ark_results(self):
"""
Function organizes and returns ARK abject
"""
kw_list = sea.List()
m = sea.Map()
kw_list.append(self.simulation_details.export())
temp_m = self.prot_info and self.prot_info.export()
if temp_m:
kw_list.append(temp_m)
for lig in self.alchem_info_list:
if lig is not None:
kw_list.append(lig.export())
if self.replicas_monitor:
kw_list.append(self.replicas_monitor.export())
if self.replica_energies:
kw_list.append(self.replica_energies.export())
if self.lambda0_result:
res_m1 = sea.Map()
res_m1['ResultLambda0'] = self.lambda0_result
kw_list.append(res_m1)
if self.lambda1_result:
res_m2 = sea.Map()
res_m2['ResultLambda1'] = self.lambda1_result
kw_list.append(res_m2)
m['Keywords'] = kw_list
m['BaseFilename'] = self.basename
if self.prm_tag:
m["ProteinMutation"] = self.prm_tag
return m
[docs] def export(self, filename=None):
"""
Writes a file with SID results in them, so they can be read into SID gui
"""
m = self.get_ark_results()
if not filename:
filename = self.basename + '.sid'
with open(filename, 'w') as f:
f.write(self.ark_str(m))
[docs] def ark_str(self, str_in):
"""Sanitize ARK string, by removing the doubleqoutes"""
return str(str_in).strip("\"")
[docs] def launch_SID(self, traj_fn, st2_fn, eaf_fn):
"""
This method launches analyze_simulation.py, a backend for SID analysis
"""
from schrodinger.application.desmond.packages import topo
analyzeTrajExec = os.path.join(os.environ.get("SCHRODINGER"), "run")
# assuming the trajectory and cms file are in the same folder
traj_path = topo.find_traj_path_from_cms_path(traj_fn) or \
get_traj_filename(traj_fn.replace('-out.cms', '').replace('.gz', ''))
if traj_path is None:
print(
'Could not locate a trajectory directory for given CMS file: {}.'
.format(traj_fn))
return
# For solubility FEP, we must rename the 'solute' CT type and re-center
# trajectory on the molecule of interest. This is to prevent the glue to
# be applied to 'solute' block for reliable RMSD nad SASA calculations.
if self.perturbation_type == constants.FEP_TYPES.SOLUBILITY:
traj_fn, traj_path = write_recentered_traj(
traj_fn, self.alchem_info_list[1].asl)
cmd = [
analyzeTrajExec,
"analyze_simulation.py",
traj_fn,
traj_path,
eaf_fn,
st2_fn,
]
print(f"Post-process INFO: {' '.join(cmd)}")
result = subprocess.run(
cmd,
stdout=subprocess.PIPE,
stderr=subprocess.PIPE,
universal_newlines=True,
)
if result.returncode:
print(f'ERROR: SID job failed: {result.stdout}, {result.stderr}')
[docs] def get_analysis(self, fep_lambda):
"""
This method generates an analysis input file, submits the analysis, and
returns an ARK object with results.
:type fep_lambda: int
:rtype `ARK object`
"""
# Save the original pose as reference structure for later analysis
ref_st = get_reference_ct(self.cms_st.fsys_ct)
# Use the analysis reference coords if present
ref_st = get_reference_ct(
ref_st, prop_names=constants.ANALYSIS_REFERENCE_COORD_PROPERTIES)
self.ref_ct_fname = os.path.join(os.getcwd(), 'reference_structure.mae')
ref_st.write(self.ref_ct_fname)
# protein ligand interaction
want_rmsd = want_prmsf = want_ppi = bool(self.prot_info.prot_st)
if self.perturbation_type == constants.FEP_TYPES.SOLUBILITY:
want_ltorsion = want_lrmsf = want_lprops = True if fep_lambda == 1 else False
alchem_st = self.alchem_info_list[fep_lambda].st
alchem_asl = self.alchem_info_list[fep_lambda].asl
want_pli = False
alchem_metal_asl = ""
elif self.alchem_info_list[fep_lambda] is not None:
want_pli = bool(
self.prot_info.prot_st) and self.alchem_info_list[0].asl
want_ltorsion = (self.perturbation_type ==
constants.FEP_TYPES.LIGAND_SELECTIVITY)
want_lrmsf = want_lprops = True
alchem_st = self.alchem_info_list[fep_lambda].st
alchem_asl = self.alchem_info_list[fep_lambda].asl
alchem_metal_asl = self.alchem_info_list[fep_lambda].metal_asl
else:
# see parse_prm_tag for cases when alchem_info may be None
want_pli = want_ltorsion = want_lrmsf = want_lprops = False
alchem_st, alchem_asl, alchem_metal_asl = None, "", ""
if self.perturbation_type == constants.FEP_TYPES.ABSOLUTE_BINDING and fep_lambda == 1:
want_ltorsion = want_lrmsf = False
alchem_st, alchem_asl = None, ''
protein_fep = self.perturbation_type in [
constants.FEP_TYPES.LIGAND_SELECTIVITY,
constants.FEP_TYPES.PROTEIN_STABILITY,
constants.FEP_TYPES.COVALENT_LIGAND,
constants.FEP_TYPES.PROTEIN_SELECTIVITY
]
ark_kw = aag.getPLISKWList(
self.cms_st,
alchem_st,
alchem_asl,
self.ref_ct_fname,
protein_asl=self.full_protein_asl[fep_lambda],
want_rmsd=want_rmsd,
want_ppi=want_ppi,
want_prmsf=want_prmsf,
want_pli=want_pli,
want_lrmsf=want_lrmsf,
want_ltorsion=want_ltorsion,
protein_fep=protein_fep,
metal_asl=alchem_metal_asl,
want_lprops=want_lprops)
# get torsions:
if self.perturbation_type in [
constants.FEP_TYPES.SMALL_MOLECULE,
constants.FEP_TYPES.COVALENT_LIGAND,
constants.FEP_TYPES.METALLOPROTEIN
]:
lig1_st = self.alchem_info_list[0].st.copy()
lig2_st = self.alchem_info_list[1].st.copy()
if self.perturbation_type == constants.FEP_TYPES.COVALENT_LIGAND:
asl = f"not (atom.{constants.FEP_COVALENT_PROTEIN_BB_ATOM} 1 " \
"or res.ptype ACE NMA)"
lig1_st = lig1_st.extract(analyze.evaluate_asl(lig1_st, asl))
lig2_st = lig2_st.extract(analyze.evaluate_asl(lig2_st, asl))
tors_ark_kw = aag.getFEPTorsionKeywords(
lig1_st,
lig2_st,
self.alchem_info_list[0].asl,
self.alchem_info_list[1].asl,
fep_lambda=fep_lambda,
is_covalent=self.perturbation_type ==
constants.FEP_TYPES.COVALENT_LIGAND)
ark_kw += tors_ark_kw
elif self.perturbation_type == constants.FEP_TYPES.ABSOLUTE_BINDING and fep_lambda == 0:
ark_kw += sea.List([
t for t in aag.getTorsionKeywords(
self.alchem_info_list[0].st.copy(),
self.alchem_info_list[0].asl)
])
elif (self.perturbation_type == constants.FEP_TYPES.SOLUBILITY and
fep_lambda == 1 and
self.leg_type == constants.FepLegTypes.SUBLIMATION):
ark_kw.append(
aag.getSolubilityKeywords(
self.alchem_info_list[fep_lambda].asl))
replica = 0
if fep_lambda == 1:
replica = self.nreplicas - 1
suffix = self.basename + '_replica%s' % replica
directory = ''
# in case of regular FEP, where all the lambda windows/replicas
# are in a separate directory
if self.task_type == 'fep':
basename = self.basename.split('_lambda')[0]
directory = basename + '_lambda%i/' % (replica)
suffix = basename + '_lambda%i' % (replica)
temp_ark = sea.Map()
temp_ark["Keywords"] = ark_kw
cms_filename = directory + suffix + '-out.cms'
if self.task_type in ['lambda_hopping', 'afep']:
temp_ark["Trajectory"] = f'{self.basename}_replica{replica}-out.cms'
if self.task_type == 'fep':
temp_ark["Trajectory"] = f'{directory}{suffix}-out.cms'
if self.alchem_info_list[fep_lambda]:
temp_ark["LigandASL"] = self.alchem_info_list[fep_lambda].asl
input_fn = directory + suffix + '-in.st2'
with open(input_fn, 'w') as st2:
st2.write(str(temp_ark))
temp_ark["Trajectory"].val = util.gz_fname_if_exists(
temp_ark["Trajectory"].val)
cms_filename = util.gz_fname_if_exists(cms_filename)
# setup output file
output_fn = suffix + '-out.eaf'
self.launch_SID(cms_filename, input_fn, output_fn)
if os.path.isfile(output_fn):
with open(output_fn) as fh:
return sea.Map(fh.read())
return None
[docs]class FEPSimulationReport(object):
[docs] def __init__(self, basename, task_type, perturbation_type, cfg=None):
self.salt = []
self.salt_concentration = []
self.basename = basename
self.task_type = task_type
self.perturbation_type = perturbation_type
self.cms_st = self.read_cms(self.basename)
if cfg is None:
self.cfg = self.read_cfg(self.basename)
else:
self.cfg = cfg
self.ncpu, self.ngpu = self.get_cpu_gpu_info()
self.job_name = self.basename
self.job_type = self.get_job_type()
self.temperature = self.get_temperature()
self.ensemble = self.get_ensemble()
self.total_atoms = self.cms_st.atom_total
self.total_charge = self.cms_st.formal_charge
self.nwaters = self.get_nwaters()
self.entry_title = self.get_entry_title()
self.force_fields = self.get_ff()
self.sim_time = self.get_sim_time_ns()
self.process_salt_and_ions()
[docs] def export(self):
m = sea.Map()
m['FEPSimulation'] = sea.Map()
m['FEPSimulation']['NumberCPU'] = self.ncpu
m['FEPSimulation']['JobName'] = self.job_name
m['FEPSimulation']['EntryTitle'] = self.entry_title
m['FEPSimulation']['JobType'] = self.job_type
m['FEPSimulation']['TemperatureK'] = self.temperature
m['FEPSimulation']['Ensemble'] = self.ensemble
m['FEPSimulation']['TotalAtoms'] = self.total_atoms
m['FEPSimulation']['TotalCharge'] = self.total_charge
m['FEPSimulation']['NumberWaters'] = self.nwaters
m['FEPSimulation']['ForceFields'] = self.force_fields
m['FEPSimulation']['TotalSimulationNs'] = self.sim_time
m['FEPSimulation']['PerturbationType'] = self.perturbation_type
m['FEPSimulation']['ReleaseVersion'] = schrodinger.get_release_name()
if self.salt:
m['SaltInformation'] = sea.Map()
m['SaltInformation']['SaltMolecules'] = self.salt
m['SaltInformation']['SaltConcentration'] = self.salt_concentration
return m
[docs] def process_salt_and_ions(self):
salt = []
salt_concentration = []
IONS_FFIO_TYPES = [
constants.CT_TYPE.VAL.POSITIVE_SALT,
constants.CT_TYPE.VAL.NEGATIVE_SALT, constants.CT_TYPE.VAL.ION
]
for ct in self.cms_st.comp_ct:
if ct.property[constants.CT_TYPE] not in IONS_FFIO_TYPES:
continue
salt.extend([m.atom[1].element for m in ct.molecule])
if salt:
wat = self.nwaters * 55. # molarity of pure water.
for ion in set(salt):
concent_str = "%.1f mM" % ((salt.count(ion) / wat) * 1e6)
salt_concentration.append((ion, concent_str))
self.salt = salt
self.salt_concentration = salt_concentration
[docs] def get_cms(self):
return self.cms_st
[docs] def get_cpu_gpu_info(self):
ncpus = 1
ngpus = 0
if not self.cfg:
return ncpus, ngpus
cpus = self.cfg.ORIG_CFG.cpu.val
if isinstance(cpus, int):
ncpus = cpus
else:
ncpus = numpy.product(cpus)
return ncpus, ngpus
[docs] def get_sim_time_ns(self):
sim_time = 0.0
if not self.cfg:
return sim_time
if 'time' in list(self.cfg):
sim_time = self.cfg.time.val
elif 'mdsim' in list(self.cfg) and \
'last_time' in list(self.cfg.mdsim):
sim_time = self.cfg.mdsim.last_time.val
elif 'remd' in list(self.cfg) and \
'last_time' in list(self.cfg.remd):
sim_time = self.cfg.remd.last_time.val
return sim_time / 1000.0
[docs] def get_job_type(self):
if self.task_type == 'lambda_hopping':
if self.perturbation_type == constants.FEP_TYPES.SOLUBILITY:
return 'Solubility FEP'
else:
return 'FEP/REST'
elif self.task_type == 'fep':
return 'FEP'
else:
return 'Unknown'
[docs] def get_ensemble(self):
ensemble = 'Unknown*'
if not self.cfg:
return ensemble
cfg = self.cfg
if 'ensemble' in list(cfg.ORIG_CFG):
if cfg.ORIG_CFG.ensemble.has_tag('class_'):
ensemble = str(cfg.ORIG_CFG.ensemble.class_.val)
else:
ensemble = str(cfg.ORIG_CFG.ensemble.val)
if config.is_gcmc(cfg.ORIG_CFG):
# DESMOND-9447
ensemble = ensemble.replace('N', 'mu')
return ensemble
[docs] def get_temperature(self):
temp = 300.
if not self.cfg:
return temp
cfg = self.cfg
if 'temperature' in list(cfg.ORIG_CFG):
temp = cfg.ORIG_CFG.temperature.val
elif 'temperature' in list(cfg.integrator):
temp = cfg.integrator.temperature.T_ref.val
else:
temp = 300.
return float(temp)
[docs] def read_cms(self, basename):
if self.task_type == 'lambda_hopping':
cms_filename = basename + '_replica_0-in.cms'
elif self.task_type == 'afep':
cms_filename = os.path.join(basename,
basename + '_replica_0-in.cms')
elif self.task_type == 'fep':
directory = basename + '_lambda%i/' % 0
cms_filename = directory + basename + '_lambda0-in.cms'
else:
raise TypeError(f'Unknown task_types: {self.task_type}')
cms_filename = util.gz_fname_if_exists(cms_filename)
cms_st = cms.Cms(cms_filename)
for atom in cms_st.atom:
atom.property[constants.ORIGINAL_INDEX] = atom.index
return cms_st
[docs] def get_nwaters(self):
atom_list = self.cms_st.select_atom('water and not a.e H')
return len(atom_list)
[docs] def get_entry_title(self):
entry_title = 'Unknown'
if 's_m_title' in self.cms_st.property:
temp_title = self.cms_st.property['s_m_title']
temp_title = temp_title.replace('(full system)', '')
temp_title = temp_title.replace('-out', '').strip()
if len(temp_title):
entry_title = temp_title
return entry_title
[docs] def get_ff(self):
if 's_ffio_name' in self.cms_st.comp_ct[0].ffio.property:
ff = self.cms_st.comp_ct[0].ffio.property['s_ffio_name']
if ff.count('_REASSIGN'):
ff = ff.replace('_REASSIGN', '*')
else:
ff = 'OPLS_2005'
return ff
[docs] def read_cfg(self, basename):
cfg_filename = basename + '-out.cfg'
if self.task_type == 'fep':
cfg_filename = basename + '_lambda0/' + basename + '_lambda0-out.cfg'
if os.path.isfile(cfg_filename):
with open(cfg_filename) as fh:
return sea.Map(fh.read())
else:
return None
[docs]class ProteinReport(object):
[docs] def __init__(self, cms_st, prot_asl, mutation_tag=None):
self.cms_st = cms_st
self.prot_st = self.get_protein(prot_asl)
self.mutation_tag = mutation_tag
if self.prot_st:
self.natoms, self.nheavy = self.get_number_atoms()
self.nresidues, self.prot_chains = self.get_residues()
self.charge = self.prot_st.formal_charge
self.hot_atoms, self.hot_heavy_atoms = self.get_hot_atoms()
[docs] def export(self):
if not self.prot_st:
return None
m = sea.Map()
m['ProteinInfo'] = sea.Map()
m['ProteinInfo']['NumberAtoms'] = self.natoms
m['ProteinInfo']['NumberHeavyAtoms'] = self.nheavy
m['ProteinInfo']['NumberResidues'] = self.nresidues
m['ProteinInfo']['ChainsNames'] = list(self.prot_chains)
m['ProteinInfo']['ChainsResidues'] = list(self.prot_chains.values())
m['ProteinInfo']['FormalCharge'] = self.charge
m['ProteinInfo']['NumberHotAtoms'] = len(self.hot_atoms)
m['ProteinInfo']['HotHeavyAtoms'] = self.hot_heavy_atoms
if self.mutation_tag:
m['ProteinInfo']['MutationTag'] = self.mutation_tag
return m
[docs] def get_hot_atoms(self):
"""
Returns atoms in the hot region
"""
if not self.prot_st:
return [], []
# depending how you set up rest region, you may have different
# property name
hot_atoms = analyze.evaluate_asl(
self.prot_st,
f'atom.{constants.REST_HOTREGION} 1 or atom.i_user_hotregion 1 '
f'or atom.{constants.REST_PROPERTIES.COMPLEX_HOTREGION} 1 '
f'or atom.{constants.REST_PROPERTIES.SOLVENT_HOTREGION} 1')
hot_heavy_atoms = [
i for i in hot_atoms if self.prot_st.atom[i].element != 'H'
]
return hot_atoms, hot_heavy_atoms
[docs] def get_residues(self):
if not self.prot_st:
return 0, 0
prot_residues = 0
prot_chains = {}
for chain in self.prot_st.chain:
chain_name = chain.name.strip()
if not chain_name:
chain_name = "NoChainId"
rescount = len(chain.residue)
prot_residues += rescount
prot_chains[chain_name] = rescount
return prot_residues, prot_chains
[docs] def get_number_atoms(self):
if not self.prot_st:
return 0, 0
natoms = self.prot_st.atom_total
prot_st_noh = self.prot_st.copy()
build.delete_hydrogens(prot_st_noh)
natoms_noh = prot_st_noh.atom_total
return natoms, natoms_noh
[docs] def get_protein(self, asl):
if not asl:
return None
prot_idx = self.cms_st.select_atom(asl)
if len(prot_idx):
return self.cms_st.extract(prot_idx)
return None
[docs]class SmallMoleculeReport(object):
[docs] def __init__(self,
st,
perturbation_type,
leg_type,
ligand_number=0,
asl=None,
alchem_solvent_st=None,
alchem_solvent_asl=None,
metal_asl=None):
"""
:param perturbation_type: one of several perturbation types
:type perturbation_type: str
:param leg_type: solvent, complex or vacuum
:type leg_type: str
:param asl: Asl for the ligand
:type asl: str
:param alchem_solvent_asl: Asl for alchemical solvent, can be either
water or ions
:type alchem_solvent_asl: str
:param alchem_solvent_st: Ct of alchemical solvent, can be either
water or ions
:type alchem_solvent_st: Structure
:param metal_asl: Asl for the metals and ions
:type metal_asl: str
"""
self.st = st
self.ligand_number = ligand_number
self.asl = asl
self.metal_asl = metal_asl
self.perturbation_type = perturbation_type
self._leg_type = leg_type
self.smiles = self.get_smiles()
self.nRB = analyze.get_num_rotatable_bonds(self.st,
max_size=aag.MAX_RING_SIZE)
self.atomic_mass = self.st.total_weight
self.charge = self.st.formal_charge
self.natoms, self.nheavy_atoms = self.get_natoms()
self.mol_formula = self.get_mol_formula()
self.fragments = self.getLigandFragments()
self.resname = self.get_resname()
self.hot_atoms, self.hot_heavy_atoms = self.get_hot_atoms()
self._alchem_solvent_st = alchem_solvent_st
self._alchem_solvent_asl = alchem_solvent_asl
self.alchem_solv_type, self.natoms_alchem_solv = self.get_alchem_solv()
[docs] def export(self):
m = sea.Map()
k = 'PeptideInfo'
if self.perturbation_type in [
constants.FEP_TYPES.LIGAND_SELECTIVITY,
constants.FEP_TYPES.SMALL_MOLECULE,
constants.FEP_TYPES.METALLOPROTEIN,
constants.FEP_TYPES.ABSOLUTE_BINDING,
constants.FEP_TYPES.SOLUBILITY
]:
k = 'LigandInfo'
m[k] = sea.Map()
m[k]['LigandNumber'] = self.ligand_number
m[k]['NumberAtoms'] = self.natoms
m[k]['NumberHeavyAtoms'] = self.nheavy_atoms
m[k]['ASL'] = self.asl
m[k]['SMILES'] = self.smiles
m[k]['NumberRotatableBonds'] = self.nRB
m[k]['AtomicMass'] = self.atomic_mass
m[k]['Charge'] = self.charge
m[k]['MolecularFormula'] = self.mol_formula
m[k]['NumberFragments'] = len(self.fragments)
m[k]['PDBResidueName'] = self.resname
m[k]['NumberHotAtoms'] = len(self.hot_atoms)
m[k]['HotHeavyAtoms'] = self.hot_heavy_atoms
m[k]['PerturbationType'] = self.perturbation_type
m[k]['LegType'] = self._leg_type
if self.alchem_solv_type:
m[k]['AlchemicalSolvent'] = self.alchem_solv_type
m[k]['AlchemicalSolventASL'] = self._alchem_solvent_asl
m[k]['AlchemicalSolventAtoms'] = self.natoms_alchem_solv
return m
[docs] def get_alchem_solv(self):
"""
Return a alchemical solvent types and number of atoms of such type
"""
if not self._alchem_solvent_st:
return None, 0
solv_type = ' '.join(
[res.pdbres.strip() for res in self._alchem_solvent_st.residue])
return solv_type, len(self._alchem_solvent_st.atom)
[docs] def get_hot_atoms(self):
"""
Returns number of atoms in the hot region. Depending where the rest
region is set up, different property names are used.
"""
hot_atoms = analyze.evaluate_asl(
self.st,
f'atom.{constants.REST_HOTREGION} 1 or atom.i_user_hotregion 1 '
f'or atom.{constants.REST_PROPERTIES.COMPLEX_HOTREGION} 1 '
f'or atom.{constants.REST_PROPERTIES.SOLVENT_HOTREGION} 1')
hot_heavy_atoms = [
i for i in hot_atoms if self.st.atom[i].element != 'H'
]
return hot_atoms, hot_heavy_atoms
[docs] def getLigandFragments(self):
"""
Fragments the ligand in several fragments using the murcko
rules. returns the list of mappings
"""
frag_creator = createfragments.FragmentCreator(
atoms=2,
bonds=100,
carbon_hetero=True,
maxatoms=500, # arbirary choice, which is big enough for cyclic
# peptide and small enough to be considered "ligand"
removedup=False,
murcko=False,
recap=True,
complete=False,
add_h=False,
verbose=False)
fragments = frag_creator.fragmentStructureForEA(self.st)
if len(fragments):
return fragments
return [self.st]
[docs] def get_resname(self):
resname = []
for res in structure.get_residues_by_connectivity(self.st):
res_tag = res.pdbres.strip()
if res_tag != '' and res_tag not in ['ACE', 'NMA']:
resname.append(res_tag)
res = list(set(resname))
if len(res) == 1 and not len(res[0]):
ligand_resname = ['None']
else:
ligand_resname = str(res)
return ligand_resname[1:-1]
[docs] def get_natoms(self):
natoms = self.st.atom_total
nheavy_atoms = 0
for atom in self.st.atom:
if atom.atomic_number != 1:
nheavy_atoms += 1
return natoms, nheavy_atoms
[docs] def get_smiles(self):
smiles_gen = smiles_mod.SmilesGenerator()
smiles = smiles_gen.getSmiles(self.st)
return smiles
if __name__ == '__main__': # coverage: +SKIP
import sys
fsr = FEPReport(sys.argv[1].replace('.sid', ''),
None,
task_type=None,
n_win=None,
perturbation_type=constants.FEP_TYPES.LIGAND_SELECTIVITY)
fsr.export()
print('done')