import itertools
import re
from collections import defaultdict
from collections import namedtuple
from operator import itemgetter
from past.utils import old_div
from typing import Dict
from typing import List
from functools import cached_property
import numpy
# NOTE: It is very important that these imports do not
# load any modules that load the QT libraries.
# This was causing segfaults in the fep_analysis stage (DESMOND-8985).
import schrodinger.structure as structure
from schrodinger.application.desmond import constants
from schrodinger.application.desmond import smarts
from schrodinger.application.desmond.fep_edge_data_classifier import CONVERGENCE
from schrodinger.application.desmond.fep_edge_data_classifier import LIGAND_RMSD
from schrodinger.application.desmond.fep_edge_data_classifier import \
    REST_EXCHANGE
from schrodinger.application.desmond.fep_edge_data_classifier import Rating
from schrodinger.application.desmond.fep_edge_data_classifier import rate
from schrodinger.application.desmond.measurement import Measurement
from schrodinger.structure import Structure
from schrodinger.structutils import smiles as smiles_mod
from schrodinger.structutils.analyze import evaluate_asl
from schrodinger.structutils.analyze import generate_molecular_formula
from schrodinger.application.desmond.solubility_utils import median_sublimation_free_energy
SOLVENT_NAMES = {
    constants.FEP_TYPES.SMALL_MOLECULE: 'Solvent',
    constants.FEP_TYPES.LIGAND_SELECTIVITY: 'Apo',
    constants.FEP_TYPES.PROTEIN_STABILITY: 'Frag. Solvent',
    constants.FEP_TYPES.PROTEIN_SELECTIVITY: 'ProteinA',
    constants.FEP_TYPES.COVALENT_LIGAND: 'Frag. Solvent',
    constants.FEP_TYPES.METALLOPROTEIN: 'Solvent',
    constants.FEP_TYPES.ABSOLUTE_BINDING: 'Solvent',
    constants.FEP_TYPES.SOLUBILITY: 'Hydration'
}
COMPLEX_NAMES = {
    constants.FEP_TYPES.SMALL_MOLECULE: 'Complex',
    constants.FEP_TYPES.LIGAND_SELECTIVITY: 'Holo',
    constants.FEP_TYPES.PROTEIN_STABILITY: 'Protein',
    constants.FEP_TYPES.PROTEIN_SELECTIVITY: 'ProteinA-ProteinB Complex',
    constants.FEP_TYPES.COVALENT_LIGAND: 'Complex',
    constants.FEP_TYPES.METALLOPROTEIN: 'Complex',
    constants.FEP_TYPES.ABSOLUTE_BINDING: 'Complex',
    constants.FEP_TYPES.SOLUBILITY: 'Sublimation'
}
ORI_MOL = "i_m_original_molecule"
OLD_TO_NEW_FEP_TYPES = {
    'prm_stability': constants.FEP_TYPES.PROTEIN_STABILITY,
    'prm_ligand_selectivity': constants.FEP_TYPES.LIGAND_SELECTIVITY,
    'prm_affinity': constants.FEP_TYPES.PROTEIN_SELECTIVITY,
    'small_molecule': constants.FEP_TYPES.SMALL_MOLECULE,
    'covalent_ligand': constants.FEP_TYPES.COVALENT_LIGAND,
    'metalloprotein': constants.FEP_TYPES.METALLOPROTEIN
}
[docs]def water_mol(respdb: str) -> structure.Structure:
    """
    Return a water molecule CT.
    """
    hoh = structure.create_new_structure()
    hoh.addAtom('O', 0., 0., 0., atom_type=16)
    hoh.addAtom('H', -0.7722, 0.4299, 0.4680, atom_type=42)
    hoh.addAtom('H', 0.6347, -0.3724, 0.6771, atom_type=42)
    hoh.addBond(1, 2, 1)
    hoh.addBond(1, 3, 1)
    for res in hoh.residue:
        res.pdbres = respdb.ljust(4)
        res.resnum = 1
    return hoh 
[docs]def tag_protein(proteinA, proteinB):
    """
    This is function was adopted from:
        `scisol.leadoptmap.protein_fep_mapper.tag_protein`
    This proceeds in several steps for mutation of protein A -> protein B
    Loop over all other atoms in the two full length proteins and assign
    1:1 atom mapping. For the mutated residue, it might put some wrong
    numbers in, but it is fine as the correct value will later overwrite
    them.
    :param proteinA: protein A
    :type proteinA: structure
    :param proteinB: protein B
    :type proteinB: structure
    """
    pos_dict = {}
    pdb_dict = {}
    for atom in proteinA.atom:
        pos_dict.setdefault((atom.x, atom.y, atom.z), []).append(atom)
        pdb_dict.setdefault(
            (atom.chain, atom.resnum, atom.inscode, atom.pdbname),
            []).append(atom)
    for atomB in proteinB.atom:
        atomAlist = pos_dict.get((atomB.x, atomB.y, atomB.z), [])
        if len(atomAlist) != 1:
            atomAlist = pdb_dict.get(
                (atomB.chain, atomB.resnum, atomB.inscode, atomB.pdbname), [])
        if len(atomAlist) != 1:
            continue
        atomA = atomAlist[0]
        atomB.property[constants.FEP_MAPPING] = atomA.index
    wt, mut = [], []
    for atom in proteinB.atom:
        if constants.FEP_MAPPING in atom.property:
            wt.append(atom.property[constants.FEP_MAPPING])
            mut.append(atom.index)
    return [wt, mut] 
[docs]def get_fep_simulation_details(
        *, complex_sid: Dict[str, object],
        solvent_sid: Dict[str, object]) -> Dict[str, object]:
    for sim_detail in (complex_sid, solvent_sid):
        if 'ReleaseVersion' not in sim_detail:
            sim_detail['ReleaseVersion'] = 'Unknown'
    return {'solvent': solvent_sid, 'complex': complex_sid} 
def _calculate_rmsf(atoms, rmsf, st):
    """
    Calculate the RMSF per residue.
    :type   atoms: List[int]
    :param  atoms: List of atom indicies.
    :type   rmsf: List[float]
    :param  rmsf: List of RMSF values.
    :type   st:   `schrodinger.structure.Structure` object
    :param  st:   Input structure, must have constants.ORIGINAL_INDEX atom property.
    :rtype: List[float]
    :return: The RMSF for each residue.
    """
    # some receptor files may contain cofactors and waters, so lets just
    # look at the backbone atoms
    atom_list = [
        a.index
        for a in st.atom
        if a.property[constants.ORIGINAL_INDEX] in atoms
    ]
    extracted_st = st.extract(atom_list)
    nres = 0
    res2atom = {}
    for ires, res in enumerate(structure.get_residues_unsorted(extracted_st)):
        nres += 1
        res2atom[ires] = [
            a.property[constants.ORIGINAL_INDEX] for a in res.atom
        ]
    res2rmsf = []
    for ires in range(nres):
        res_sum = sum(rmsf[atoms.index(iatom)] for iatom in res2atom[ires])
        res2rmsf.append(res_sum / len(res2atom[ires]))
    return res2rmsf
def _get_rb_potential(energy, conformation):
    """
    return the list of energy values for rotatable bonds by linear interpolation
    of values give and -180..180 degrees with 10 degree increments
    :param energy: the rotatable bond energy values from -180 to 180 degrees in
        steps of 10 degrees or None
    :type energy: list(float) or NoneType
    :param conformation: list of rotatable bond angles to get the energy values
        for.
    :type conformation: list(float)
    :return: the list of rotatable bond potential energy values at the angles
        given in `conformation`
    :rtype: list(float)
    """
    if energy is None:
        return [0.0] * len(conformation)
    angles = list(range(-180, 181, 10))
    pot = []
    for conf in conformation:
        i_start = len(angles) - 2
        for i, start in enumerate(angles[0:-1]):
            end = angles[i + 1]
            if start <= conf and end > conf:
                i_start = i
                break
        # interpolate potential at 'conf' using simple line (y=mx+b)
        # equation
        e1 = energy[i_start]
        e2 = energy[i_start + 1]
        ang_diff = angles[i_start + 1] - angles[i_start]
        x = (conf - angles[i_start]) / ang_diff
        b = e1
        m = e2 - e1
        potential = (m * x) + b
        pot.append(potential)
    return pot
[docs]class ResData(namedtuple("ResData", ["molnum", "chain", "name", "number"])):
    """
    A class to store the molecule number, chain, name, and number of a residue
    """
    @staticmethod
    def _get_common_name(res):
        substitute_dict = {
            'HIE': 'HIS',
            'HID': 'HIS',
            'HIP': 'HIS',
            'CYX': 'CYS',
            'LYN': 'LYS',
            'ARN': 'ARG',
            'ASH': 'ASP',
            'GLH': 'GLU'
        }
        if res in substitute_dict:
            return substitute_dict[res]
        return res
[docs]    def fullName(self):
        """
        Return a string of the residue data formatted as chain:resname_resnum
        :return: The formatted residue data
        :rtype: str
        """
        return "%s:%s_%d" % (self[1], self._get_common_name(self[2]), self[3]) 
 
[docs]class FEPTorsions:
    """
    :cvar NULL_Y_LIM: The default y min and max for plots which don't contain torsion
        data
    :vartype: tuple(int, int)
    """
    POT_NTICKS = 3
    NULL_Y_LIM = (-1, 1)
    Y_AXIS_SCALE = 1.05
[docs]    def __init__(self,
                 ark_sol=None,
                 ark_cpx=None,
                 sol_idx_offset=0,
                 cpx_idx_offset=0):
        self._ark_sol = ark_sol
        self._ark_cpx = ark_cpx
        self._bins = [-180 + x * 10 for x in range(37)]
        self._sol_idx_offset = sol_idx_offset
        self._cpx_idx_offset = cpx_idx_offset
        self._pot = None 
    @property
    def sol_result(self):
        return self._ark_sol and self._ark_sol.Result.val
    @property
    def cpx_result(self):
        return self._ark_cpx and self._ark_cpx.Result.val
    @property
    def max_potential_energy(self):
        pot_max = max(self.potential_energy)
        return pot_max * self.Y_AXIS_SCALE
    @property
    def potential_energy(self):
        """
        Returns potential energy that's offset to zero
        """
        if self._pot:
            return self._pot
        if ((self._ark_cpx and 'RBPotential' not in self._ark_cpx) and
            (self._ark_sol and 'RBPotential' not in self._ark_sol)):
            pot = [0.0] * 37
        elif 'RBPotential' in self._ark_cpx:
            pot = self._ark_cpx.RBPotential.val
        else:
            pot = self._ark_sol.RBPotential.val
        pot_min = min(pot)
        self._pot = [x - pot_min for x in pot]
        return self._pot
    @property
    def starting_conformation(self):
        """
        :return: the starting conformation's torsion value, with a preference
            for the value in the complex. None if not found.
        :rtype: float or NoneType
        """
        for ark in (self._ark_cpx, self._ark_sol):
            if ark and 'StartingConformation' in ark:
                return ark.StartingConformation.val
        return None
    @property
    def strain(self):
        """
        :return: the value (float) and standard deviation (float) tuples for
            torsion strain in the complex and solvent, i.e.,
            ((cpx_val, cpx_std_dev), (sol_val, sol_st_dev)).
            If the complex or solution didn't have torsion values None will be
            returned for the tuple in question
        :rtype: tuple
        """
        def mean_std(angles):
            if not angles:
                return None
            values = _get_rb_potential(self.potential_energy, angles)
            return (numpy.mean(values), numpy.std(values))
        return tuple(
            mean_std(res) for res in (self.cpx_result, self.sol_result))
    @property
    def _sol_atoms(self):
        if not self._ark_sol:
            return self._cpx_atoms
        aids = [
            self._ark_sol.a1.val, self._ark_sol.a2.val, self._ark_sol.a3.val,
            self._ark_sol.a4.val
        ]
        return [a - self._sol_idx_offset for a in aids]
    @property
    def _cpx_atoms(self):
        aids = [
            self._ark_cpx.a1.val, self._ark_cpx.a2.val, self._ark_cpx.a3.val,
            self._ark_cpx.a4.val
        ]
        return [a - self._cpx_idx_offset for a in aids]
    def __repr__(self):
        """Print Torsion atoms used in the torsion scan."""
        (a1, a2, a3, a4) = self._sol_atoms
        return "FEPTorsion_%i-%i-%i-%i" % (a1, a2, a3, a4)
    def _prepare_plot(self, ax, ax2, hist_tick_pos):
        ax.set_xlim(-200, 200)
        ax.set_yticklabels([])
        ax2.set_xlim(-200, 200)
        ax2.set_xticks([-180, -90, 0, 90, 180])
        if hist_tick_pos is not None:
            ax2.xaxis.set_ticks_position(hist_tick_pos)
            ax2.get_xaxis().set_visible(True)
        else:
            ax2.get_xaxis().set_visible(False)
        ax.get_yaxis().set_visible(False)
        ax.get_xaxis().set_visible(False)
    def _add_strain_info(self, axis, for_print):
        """
        Add the strain text to the axis, if available
        :param axis: the axis to add the strain to
        :type axis: matplotlib.axes.Axes
        :param for_print: whether the info is for print purposes
        :type for_print: bool
        """
        cpx, sol = self.strain
        if cpx is None and sol is None:
            return
        strain_x, value_x = -205, 5
        txt_y = self.max_potential_energy * -0.02
        kwargs = {'verticalalignment': 'top', 'fontsize': 6 if for_print else 8}
        axis.text(strain_x, txt_y, 'strain', **kwargs)
        if cpx is not None:
            axis.text(value_x, txt_y, f'cpx: {_strain_text(cpx)}', **kwargs)
        if sol is not None:
            kwargs['horizontalalignment'] = 'right'
            axis.text(-value_x, txt_y, f'sol: {_strain_text(sol)}', **kwargs)
[docs]    def plot(self,
             ax,
             color='gray',
             for_print=False,
             pot_tick_pos='right',
             hist_tick_pos='Bottom'):
        ax.clear()
        ax2 = ax.twinx()
        self._prepare_plot(ax, ax2, hist_tick_pos)
        if self.sol_result:
            ax.hist(self.sol_result,
                    self._bins,
                    density=1,
                    histtype='bar',
                    edgecolor=color,
                    facecolor='white',
                    range=(-180, 180),
                    alpha=0.9,
                    hatch='\\\\')
        ax.hist(self.cpx_result,
                self._bins,
                density=1,
                histtype='bar',
                edgecolor=color,
                facecolor=color,
                range=(-180, 180),
                alpha=0.75,
                hatch='//')
        if pot_tick_pos is not None:
            ax2.yaxis.set_ticks_position(pot_tick_pos)
        else:
            ax2.yaxis.set_visible(False)
        ax2.set_ylim(0, self.max_potential_energy)
        ax2.plot(self._bins, self.potential_energy, '-', color='#0C2C84')
        if self.starting_conformation is not None:
            ax2.vlines(self.starting_conformation,
                       *ax2.get_ylim(),
                       color='#A0A0A0',
                       linewidth=2)
        self._add_strain_info(ax2, for_print)
        if len(set(self.potential_energy)) == 1:
            ax2.set_ylim(*self.NULL_Y_LIM)
            ax2.set_yticklabels(["", "", 0.0, "", ""])
        else:
            tick_vals, tick_labels = get_ticks(0, self.max_potential_energy,
                                               self.POT_NTICKS)
            ax2.set_yticks(tick_vals)
            ax2.set_yticklabels(tick_labels)
        for tick in ax2.get_yticklabels():
            tick.set_color('#0C2C84')
        if for_print:
            # NOTE: This imports QT libraries, so needs to be delayed to
            # prevent conflicts with multisim (DESMOND-8985).
            from schrodinger.application.desmond.report_helper import \
                
change_plot_colors
            for tick in ax.get_xticklabels():
                tick.set_fontsize(7)
            change_plot_colors(ax2, spines=True)
            change_plot_colors(ax)
            for tick in ax2.get_yticklabels():
                tick.set_fontsize(6)
                tick.set_color('#0C2C84')
        return ax2  
[docs]class FEPTorsionsContainer:
    """
    Class that stores torsions for a single ligand. These torsions are
    from both solvent and complex legs of the simulations, corresponding
    to a single lambda value.
    """
[docs]    def __init__(self,
                 sol_torsions_sea_list,
                 cpx_torsions_sea_list,
                 sol_idx_offset=0,
                 cpx_idx_offset=0,
                 perturbation_type=constants.FEP_TYPES.SMALL_MOLECULE):
        self._sol_sea_list = sol_torsions_sea_list
        self._cpx_sea_list = cpx_torsions_sea_list
        self._sol_idx_offset = sol_idx_offset
        self._cpx_idx_offset = cpx_idx_offset
        self._perturbation_type = perturbation_type
        self._matched_tors = []
        self._unmatched_tors = []
        self._check_tors()
        self._process_tors() 
    def _check_tors(self):
        if self._perturbation_type == constants.FEP_TYPES.LIGAND_SELECTIVITY:
            self._sol_sea_list = [(None, None)] * len(self._cpx_sea_list)
            return
        if len(self._sol_sea_list) != len(self._cpx_sea_list):
            print("ERROR: inconsistent number of torsions in the ligand's sol"
                  " and cpx sims")
    def _process_tors(self):
        for sol, cpx in zip(self._sol_sea_list, self._cpx_sea_list):
            ft = FEPTorsions(sol[1],
                             cpx[1],
                             sol_idx_offset=self._sol_idx_offset,
                             cpx_idx_offset=self._cpx_idx_offset)
            if sol[0] is not None and cpx[0] is not None:
                self._matched_tors.append(ft)
            else:
                self._unmatched_tors.append(ft)
    @property
    def matched_tors(self):
        return self._matched_tors
    @property
    def unmatched_tors(self):
        return self._unmatched_tors
    @property
    def all_tors(self):
        return self._matched_tors + self._unmatched_tors
    @property
    def matched_total(self):
        return len(self._matched_tors)
    @property
    def unmatched_total(self):
        return len(self._unmatched_tors)
    @property
    def tors_total(self):
        return len(self._unmatched_tors) + len(self._matched_tors) 
[docs]class FEPEdgeData:
    """
    FEPEdgeData contains all the data related to an FEP perturbation.  This
    includes both solvent and complex legs of the simulations as well as
    analysis results produced by SID.
    """
    _SSE_CUTOFF = 0.7
    _pl_inter_names = ['H-bonds', 'Hydrophobic', 'Ionic', 'Water bridges']
    _pl_type = {
        'hb': [0, 1, 9, 10],
        'hphb': [2, 3, 4],
        'ion': [5, 6, 11, 12],
        'wb': [7, 8]
    }
    _pl_code = {'hb': 0, 'hphb': 1, 'ion': 2, 'wb': 3}
    _pl_detail_inter_type = {
        0: [0, '#762A83', 'Backbone donor'],
        1: [0, '#AF8DC3', 'Backbone acceptor'],
        9: [0, '#D9F0D3', 'Side-chain donor'],
        10: [0, '#7FBF7B', 'Side-chain acceptor'],
        2: [1, '#FB9A99', 'Other'],
        3: [1, '#33A02C', 'Pi-Pi stacking'],
        4: [1, '#B2DF8A', 'Pi-cation'],
        5: [2, '#2C7BB6', 'Side chains'],
        6: [2, '#A6BDDB', 'Backbone'],
        11: [2, '#99D8C9', 'Side chain metal-mediated'],
        12: [2, '#E5F5F9', 'Backbone metal-mediated'],
        7: [3, '#D01C8B', 'Donor'],
        8: [3, '#4DAC26', 'Acceptor']
    }
    _pl_contact_types = [
        'hbonds', 'hydrophobic', 'pi_pi', 'pi_cat', 'polar', 'water_br', 'metal'
    ]
[docs]    def __init__(self,
                 complex_sea,
                 solvent_sea,
                 pv_st=None,
                 atom_mapping=None,
                 perturbation_type=constants.FEP_TYPES.SMALL_MOLECULE,
                 ddg_corrections_map=None):
        """
        :type   complex_sea: `sea`
        :param  complex_sea: SEA object with results pertaining to the complex
                             leg of the FEP calculation
        :type   solvent_sea: `sea`
        :param  solvent_sea: SEA object with results pertaining to the solvent
                             leg of the FEP calculation
        :type         pv_st: `schrodinger.Structure`
        :param        pv_st: PoseViewer file must contain 3 structures..
                             [receptor, lig1, lig2]; otherwise it's None
        :type  atom_mapping: tuple(int, int)
        :param atom_mapping: mapping of ligand2 to ligand1 atoms
        :type  ddg_corrections_map: `Dict[str, Measurement]` or None
        """
        self._initSIDVariables()
        self._complex_sea = complex_sea
        self._solvent_sea = solvent_sea
        self._atom_mapping = atom_mapping
        self._ddg_corrections_map = ddg_corrections_map
        self.perturbation_type = OLD_TO_NEW_FEP_TYPES.get(
            perturbation_type, perturbation_type)
        self._pv_st = None
        if (pv_st and len(pv_st) == 3 and perturbation_type in [
                constants.FEP_TYPES.SMALL_MOLECULE,
                constants.FEP_TYPES.ABSOLUTE_BINDING
        ]):
            pv_st[0] = self._init_protein_st(pv_st[0])
            self._pv_st = pv_st
        if self._complex_sea:
            self._parse_cpx_sea(self._complex_sea)
            self._parse_cpx_sid()
        if self._solvent_sea:
            self._parse_sol_sea(self._solvent_sea)
            self._parse_sol_sid()
        self._cpx_sid_protein_residues = None
        self._cpx_sid_pl_results = None
        self._cpx_sid_lp_results = None
        self._receptor_residue_sequence_list = None
        self._ligand1_fep_tors = None
        self._ligand2_fep_tors = None
        self._fullsystem_ct = None
        self._receptor_st = None
        self._ligand1_st = None
        self._ligand2_st = None
        self._pl_contact_data0 = None
        self._pl_contact_data1 = None 
    def _initSIDVariables(self):
        self._sol_sid_lambda0_rmsd = {}
        self._sol_sid_lambda1_rmsd = {}
        self._sol_sid_lambda0_rmsf = {}
        self._sol_sid_lambda1_rmsf = {}
        self._ark_sol_sid_lambda0_torsion = []
        self._ark_sol_sid_lambda1_torsion = []
        self._ligand1_sol_sid_rgyr = []
        self._ligand2_sol_sid_rgyr = []
        self._ligand1_sol_sid_psa = []
        self._ligand2_sol_sid_psa = []
        self._ligand1_sol_sid_sasa = []
        self._ligand2_sol_sid_sasa = []
        self._ligand1_sol_sid_molsa = []
        self._ligand2_sol_sid_molsa = []
        self._ligand1_sol_sid_lighb = []
        self._ligand2_sol_sid_lighb = []
        self._cpx_sid_lambda0_rmsd = {}
        self._cpx_sid_lambda1_rmsd = {}
        self._cpx_sid_lambda0_rmsf = {}
        self._cpx_sid_lambda1_rmsf = {}
        self._cpx_sid_lambda0_sse = []
        self._cpx_sid_lambda1_sse = []
        self._cpx_sid_lambda0_ppi = {}
        self._cpx_sid_lambda1_ppi = {}
        self._ark_cpx_sid_lambda0_torsion = []
        self._ark_cpx_sid_lambda1_torsion = []
        self._ark_cpx_sid_lambda0_pli = None
        self._ark_cpx_sid_lambda1_pli = None
        self._ligand1_cpx_sid_rgyr = []
        self._ligand2_cpx_sid_rgyr = []
        self._ligand1_cpx_sid_psa = []
        self._ligand2_cpx_sid_psa = []
        self._ligand1_cpx_sid_sasa = []
        self._ligand2_cpx_sid_sasa = []
        self._ligand1_cpx_sid_molsa = []
        self._ligand2_cpx_sid_molsa = []
        self._ligand1_cpx_sid_lighb = []
        self._ligand2_cpx_sid_lighb = []
[docs]    def rate(self, name: str) -> Rating:
        """
        Return rating for the FEP edge data with the given `name`. The valid
        names can be found in fep_edge_data_classifiers.py.
        """
        if name == LIGAND_RMSD:
            return rate(LIGAND_RMSD, (self.ligand1_cpx_sid_rmsd(stats=False),
                                      self.ligand2_cpx_sid_rmsd(stats=False)))
        elif name == CONVERGENCE:
            return rate(CONVERGENCE,
                        (self.cpx_start_time_ns, self.cpx_end_time_ns,
                         self.cpx_delta_g_forward))
        elif name == REST_EXCHANGE:
            return rate(REST_EXCHANGE, self.cpx_rest_exchanges)
        else:
            return Rating.NA 
    @property
    def fep_simulation_details(self):
        sim_details = get_fep_simulation_details(
            complex_sid=self._ark_cpx_fep_simulation.val,
            solvent_sid=self._ark_sol_fep_simulation.val)
        sim_details['complex']['LambdaWindows'] = self.cpx_total_replicas
        sim_details['solvent']['LambdaWindows'] = self.sol_total_replicas
        return sim_details
    def _init_protein_st(self, prot_st, zob_waters=True):
        """
        This method cleans up the pv file by:
        1) removes non-ZOB waters
        2) adds property of original indices
        """
        water_smarts = ["[H]O[H]"]
        zob_water_smarts = ["[H]O([H])_[*]", "[H]O[H]_[*]"]
        if zob_waters:
            water_atoms = smarts.evaluate_net_smarts(prot_st, water_smarts,
                                                     zob_water_smarts)
        else:
            water_atoms = smarts.evaluate_net_smarts(prot_st, water_smarts,
                                                     water_smarts)
        nwater = len(water_atoms)
        if nwater and nwater != prot_st.atom_total:
            prot_st.deleteAtoms(water_atoms)
        for atom in prot_st.atom:
            atom.property[constants.ORIGINAL_INDEX] = atom.index
            atom.property[ORI_MOL] = atom.molecule_number
        return prot_st
[docs]    @staticmethod
    def get_smiles(st):
        """
        rtype:  str
        return: Generate SMILES from a given ligand structure.
        """
        smiles_gen = smiles_mod.SmilesGenerator()
        return smiles_gen.getSmiles(st) 
    @property
    def atom_mapping(self):
        return self._atom_mapping
    def _getLigandFragments(self, sid_pli, offset_by_receptor_natoms):
        if sid_pli is None:
            return None
        offset = self.receptor_total_atom if offset_by_receptor_natoms else 0
        dlf = sid_pli['DictLigandFragment'].val
        return self._ligand_fragments(dlf, offset)
[docs]    def ligand1_fragments(self, offset_by_receptor_natoms=True):
        return self._getLigandFragments(self._ark_cpx_sid_lambda0_pli,
                                        offset_by_receptor_natoms) 
[docs]    def ligand2_fragments(self, offset_by_receptor_natoms=True):
        return self._getLigandFragments(self._ark_cpx_sid_lambda1_pli,
                                        offset_by_receptor_natoms) 
    def _ligand_fragments(self, ark_obj, offset=0):
        """
        Return the dictionary of atom fragments for ligand. The atom
        numbers are in the context of a protein-ligand complex, so we need to
        offset the atom values by the atom number in the protein/receptor.
        """
        new_dict = {}
        for k, v in ark_obj:
            new_dict[k] = [x - offset for x in v]
        return new_dict
    def _parse_sol_sid(self):
        sid_list = self._ark_sol_sid_list
        if len(sid_list) < 2:
            return
        ret_dict = parse_sid(sid_list[0])
        self._sol_sid_lambda0_rmsd = self._parse_sid_rms(ret_dict['RMSD'])
        self._sol_sid_lambda0_rmsf = self._parse_sid_rms(ret_dict['RMSF'])
        self._ark_sol_sid_lambda0_torsion = ret_dict['Torsion']
        self._ligand1_sol_sid_rgyr = ret_dict['Rad_Gyration']['Result'].val
        self._ligand1_sol_sid_psa = ret_dict['Polar_Surface_Area']['Result'].val
        self._ligand1_sol_sid_sasa = ret_dict['SA_Surface_Area']['Result'].val
        self._ligand1_sol_sid_molsa = ret_dict['Molecular_Surface_Area'][
            'Result'].val
        self._ligand1_sol_sid_lighb = ret_dict['LigandHBonds']['Result'].val
        ret_dict = parse_sid(sid_list[1])
        self._sol_sid_lambda1_rmsd = self._parse_sid_rms(ret_dict['RMSD'])
        self._sol_sid_lambda1_rmsf = self._parse_sid_rms(ret_dict['RMSF'])
        self._ark_sol_sid_lambda1_torsion = ret_dict['Torsion']
        self._ligand2_sol_sid_rgyr = ret_dict['Rad_Gyration']['Result'].val
        self._ligand2_sol_sid_psa = ret_dict['Polar_Surface_Area']['Result'].val
        self._ligand2_sol_sid_sasa = ret_dict['SA_Surface_Area']['Result'].val
        self._ligand2_sol_sid_molsa = ret_dict['Molecular_Surface_Area'][
            'Result'].val
        self._ligand2_sol_sid_lighb = ret_dict['LigandHBonds']['Result'].val
[docs]    def ligand1_sol_sid_rb_strain(self, stats=True):
        val = self._get_ligand_strain(self._ark_sol_sid_lambda0_torsion)
        return self._lig_props(val, stats=stats) 
[docs]    def ligand2_sol_sid_rb_strain(self, stats=True):
        val = self._get_ligand_strain(self._ark_sol_sid_lambda1_torsion)
        return self._lig_props(val, stats=stats) 
[docs]    def ligand1_cpx_sid_rb_strain(self, stats=True):
        val = self._get_ligand_strain(self._ark_cpx_sid_lambda0_torsion)
        return self._lig_props(val, stats=stats) 
[docs]    def ligand2_cpx_sid_rb_strain(self, stats=True):
        val = self._get_ligand_strain(self._ark_cpx_sid_lambda1_torsion)
        return self._lig_props(val, stats=stats) 
    def _get_ligand_strain(self, ark_torsion):
        if not ark_torsion:
            return [0.]
        strain = []
        for torsion in ark_torsion:
            tval = torsion["Result"].val
            try:
                p = torsion["RBPotential"].val
            except KeyError:
                p = None
            strain.append(_get_rb_potential(p, tval))
        # add potentials of all torsions
        return numpy.array(strain).sum(axis=0)
    def _get_ligand_torsions(self, sol_idx_offsets, cpx_idx_offsets):
        """
        :type sol_idx_offsets: (int, int)
        :param sol_idx_offsets: Offset for the solvent ligand atom idx
                                in lambda0 and lambda1 respectively.
        :type cpx_idx_offsets: (int, int)
        :param cpx_idx_offsets: Offset for the complex ligand atom idx
                                in lambda0 and lambda1 respectively.
        :rtype: (`FEPTorsionsContainer` object, `FEPTorsionsContainer` object)
        :return: `FEPTorsionsContainer` objects for lambda0 and lambda1.
        """
        def sort_tors(ark_obj):
            """
            This method sorts and organizes the torsions in specific way
            """
            ret = []
            for tor in ark_obj:
                fep_pair = None
                if 'FEP_pair' in tor:
                    fep_pair = tor['FEP_pair'].val
                ret.append((fep_pair, tor))
            return ret
        if not self._ligand1_fep_tors:
            self._ligand1_fep_tors = FEPTorsionsContainer(
                sort_tors(self._ark_sol_sid_lambda0_torsion),
                sort_tors(self._ark_cpx_sid_lambda0_torsion),
                sol_idx_offset=sol_idx_offsets[0],
                cpx_idx_offset=cpx_idx_offsets[0],
                perturbation_type=self.perturbation_type)
        if not self._ligand2_fep_tors:
            self._ligand2_fep_tors = FEPTorsionsContainer(
                sort_tors(self._ark_sol_sid_lambda1_torsion),
                sort_tors(self._ark_cpx_sid_lambda1_torsion),
                cpx_idx_offset=cpx_idx_offsets[1],
                sol_idx_offset=sol_idx_offsets[1],
                perturbation_type=self.perturbation_type)
        return self._ligand1_fep_tors, self._ligand2_fep_tors
    @property
    def ligand_torsions(self):
        """
        :rtype: (`FEPTorsionsContainer` object, `FEPTorsionsContainer` object)
        :return: `FEPTorsionsContainer` objects for lambda0 and lambda1.
        """
        sol_idx_offsets = (0, self._pv_st[1].atom_total +
                           self.ligand1_alchemical_atom_total)
        cpx_idx_offsets = (self._pv_st[0].atom_total,
                           self._pv_st[0].atom_total +
                           self._pv_st[1].atom_total +
                           self.ligand1_alchemical_atom_total)
        return self._get_ligand_torsions(sol_idx_offsets, cpx_idx_offsets)
    @staticmethod
    def _lig_props(vals, idx_start=0, stats=True):
        if not stats:
            return vals
        if not len(vals):
            return 0., 0.
        return numpy.mean(vals[idx_start:]), numpy.std(vals[idx_start:])
[docs]    def ligand1_cpx_sid_waters(self, stats=True):
        ligwat = self._ark_cpx_sid_lambda0_pli['LigWatResult'].val
        val = []
        for frame in ligwat:
            val.append(len(frame))
        return self._lig_props(val, stats=stats) 
[docs]    def ligand2_cpx_sid_waters(self, stats=True):
        ligwat = self._ark_cpx_sid_lambda1_pli['LigWatResult'].val
        val = []
        for frame in ligwat:
            val.append(len(frame))
        return self._lig_props(val, stats=stats) 
[docs]    def ligand1_sol_sid_lighb(self, stats=True):
        lhb = self._ligand1_sol_sid_lighb
        val = []
        for frame in lhb:
            val.append(len(frame))
        return self._lig_props(val, stats=stats) 
[docs]    def ligand2_sol_sid_lighb(self, stats=True):
        lhb = self._ligand2_sol_sid_lighb
        val = []
        for frame in lhb:
            val.append(len(frame))
        return self._lig_props(val, stats=stats) 
[docs]    def ligand1_cpx_sid_lighb(self, stats=True):
        lhb = self._ligand1_cpx_sid_lighb
        val = []
        for frame in lhb:
            val.append(len(frame))
        return self._lig_props(val, stats=stats) 
[docs]    def ligand2_cpx_sid_lighb(self, stats=True):
        lhb = self._ligand2_cpx_sid_lighb
        val = []
        for frame in lhb:
            val.append(len(frame))
        return self._lig_props(val, stats=stats) 
[docs]    def ligand1_sol_sid_sasa(self, stats=True):
        val = self._ligand1_sol_sid_sasa
        return self._lig_props(val, stats=stats) 
[docs]    def ligand2_sol_sid_sasa(self, stats=True):
        val = self._ligand2_sol_sid_sasa
        return self._lig_props(val, stats=stats) 
[docs]    def ligand1_cpx_sid_sasa(self, stats=True):
        val = self._ligand1_cpx_sid_sasa
        return self._lig_props(val, stats=stats) 
[docs]    def ligand2_cpx_sid_sasa(self, stats=True):
        val = self._ligand2_cpx_sid_sasa
        return self._lig_props(val, stats=stats) 
[docs]    def ligand1_sol_sid_molsa(self, stats=True):
        val = self._ligand1_sol_sid_molsa
        return self._lig_props(val, stats=stats) 
[docs]    def ligand2_sol_sid_molsa(self, stats=True):
        val = self._ligand2_sol_sid_molsa
        return self._lig_props(val, stats=stats) 
[docs]    def ligand1_cpx_sid_molsa(self, stats=True):
        val = self._ligand1_cpx_sid_molsa
        return self._lig_props(val, stats=stats) 
[docs]    def ligand2_cpx_sid_molsa(self, stats=True):
        val = self._ligand2_cpx_sid_molsa
        return self._lig_props(val, stats=stats) 
[docs]    def ligand1_sol_sid_psa(self, stats=True):
        val = self._ligand1_sol_sid_psa
        return self._lig_props(val, stats=stats) 
[docs]    def ligand2_sol_sid_psa(self, stats=True):
        val = self._ligand2_sol_sid_psa
        return self._lig_props(val, stats=stats) 
[docs]    def ligand1_cpx_sid_psa(self, stats=True):
        val = self._ligand1_cpx_sid_psa
        return self._lig_props(val, stats=stats) 
[docs]    def ligand2_cpx_sid_psa(self, stats=True):
        val = self._ligand2_cpx_sid_psa
        return self._lig_props(val, stats=stats) 
[docs]    def ligand1_sol_sid_rgyr(self, stats=True):
        val = self._ligand1_sol_sid_rgyr
        return self._lig_props(val, stats=stats) 
[docs]    def ligand2_sol_sid_rgyr(self, stats=True):
        val = self._ligand2_sol_sid_rgyr
        return self._lig_props(val, stats=stats) 
[docs]    def ligand1_cpx_sid_rgyr(self, stats=True):
        val = self._ligand1_cpx_sid_rgyr
        return self._lig_props(val, stats=stats) 
[docs]    def ligand2_cpx_sid_rgyr(self, stats=True):
        val = self._ligand2_cpx_sid_rgyr
        return self._lig_props(val, stats=stats) 
    def _getRMSD(self, rmsd_data_map, stats):
        """
        :param rmsd_data_map: a SID data map containing RMSD data
        :type rmsd_data_map: schrodinger.utils.sea.Map
        :param stats: whether to return standard deviation in addition to the
            RMSD
        :type stats: true
        :return: the ligand RMSD, if available, as well as the standard
            deviation, if requested
        :rtype: tuple(float, float) or float or None
        """
        ligand_data = rmsd_data_map.get('Ligand')
        if not ligand_data:
            return None
        return self._lig_props(ligand_data['Result'].val,
                               stats=stats,
                               idx_start=1)
[docs]    def ligand1_sol_sid_rmsd(self, stats=True):
        return self._getRMSD(self._sol_sid_lambda0_rmsd, stats) 
[docs]    def ligand2_sol_sid_rmsd(self, stats=True):
        return self._getRMSD(self._sol_sid_lambda1_rmsd, stats) 
[docs]    def ligand1_cpx_sid_rmsd(self, stats=True):
        return self._getRMSD(self._cpx_sid_lambda0_rmsd, stats) 
[docs]    def ligand2_cpx_sid_rmsd(self, stats=True):
        return self._getRMSD(self._cpx_sid_lambda1_rmsd, stats) 
[docs]    def get_ligand1_atom_dict(self):
        return self._get_ligand_atom_dict(self.ligand1_st) 
[docs]    def get_ligand2_atom_dict(self):
        return self._get_ligand_atom_dict(self.ligand2_st) 
    def _process_sse_data(self, data):
        """
        Process Secondary Structure elements.  The input data is a list
        (for each frame).  This list needs to be parsed (by removing '.') and
        casting it as array
        :type  data:    a list containing raw SSE information from sid/eaf files
        :param data:    list
        :return: processed SSE data in numpy array
        :rtype:  numpy.array
        """
        new_list = []
        for frame in data:
            new_list.append([int(res) for res in frame.split(".")])
        return numpy.array(new_list)
    def _receptor_sid_sse_lambda0(self):
        return self._process_sse_data(self._cpx_sid_lambda0_sse)
    def _receptor_sid_sse_lambda1(self):
        return self._process_sse_data(self._cpx_sid_lambda1_sse)
    def _calculate_sse_limits(self, sse_by_res, tol=0):
        """
        This function takes a vector of residues that are in secondary
        structure elements (SSE), and returns their limits in the list.
        input is the following::
               sse_by_res:  [FFFTTTTTTTTFTTTTTTFFFFFFTTTTTTTFFFF]
            residue index:   1        10        20        30
            tol=0, output:  [(4,10), (12,17), (24,30)]
            tol=1, output:  [(4,17), (24,30)]
        :type  sse_by_res: a list of bools if the residue is a SSE
        :param sse_by_res: `bool`
        :type  tol: int
        :param tol: tolerance level to smoothing out the SSE data. Default is
                    zero, so no tolerance.  If tol>0 is used; then if a residue
                    is inbetween two SSE residues, then those residues will be
                    reported as being part of the SSE.
        :rtype:  `tuple`
        :return: a list of tuples where the SSE starts and begins in terms of
                 residue indices (ie: 51-63, means that a sse spans from residue
                 index 51 to 63).
        """
        s = "".join(str(int(x)) for x in sse_by_res)
        iterator = re.finditer(r'[^0]+', s)
        limits = [m.span() for m in iterator]
        if tol == 0 or len(limits) < 2:
            return limits
        else:
            return self._smooth_sse_limits(tol, limits)
    def _smooth_sse_limits(self, tolerance, limits):
        """
        Here we're trying to to bring some tolerance to the cutoff. if the
        number of residues <= tolerance then we merge the limits of adjacent
        spans.  This is done so that the plots look cleaner.
        :type  tolerance:   int
        :param tolerance:   tolerance for somoothing out the SSE limits
        :type  limits:      `tuple`
        :param limits:      raw limits, without smoothing ie: [(61,70), (82-94)]
        :rtype:  `tuple`
        :return: list of processed tuples
        """
        if not limits:
            return []
        new_limits = []
        working_limit = limits[0]
        for cur_limit in limits[1:]:
            if cur_limit[0] - working_limit[1] < tolerance:
                working_limit = (working_limit[0], cur_limit[1])
            else:
                new_limits.append(working_limit)
                working_limit = cur_limit
        new_limits.append(working_limit)
        return new_limits
    def _sse_limits(self, sse_data):
        """
        Get limits for helix and strands data
        :rtype: (`helix_limits`, `strand_limits`)
        :return: return helix and srand residue limits
        """
        nframes = sse_data.shape[0]
        sse_helix = (old_div((sse_data == 1).sum(axis=0), float(nframes)))
        sse_helix = (sse_helix >= self._SSE_CUTOFF)
        limits_helix = self._calculate_sse_limits(sse_helix, tol=1)
        sse_strand = (old_div((sse_data == 2).sum(axis=0), float(nframes)))
        sse_strand = (sse_strand >= self._SSE_CUTOFF)
        limits_strand = self._calculate_sse_limits(sse_strand, tol=1)
        return limits_helix, limits_strand
    @property
    def sse_limits_lambda0(self):
        data = self._receptor_sid_sse_lambda0()
        helix, strand = self._sse_limits(data)
        return helix, strand
    @property
    def sse_limits_lambda1(self):
        data = self._receptor_sid_sse_lambda1()
        helix, strand = self._sse_limits(data)
        return helix, strand
    @property
    def receptor_sid_rmsd_ligand_lambda0(self):
        """
        ligand1 RMSD wrt the protein
        """
        return self._cpx_sid_lambda0_rmsd['Ligand_wrt_protein']['Result'].val
    @property
    def receptor_sid_rmsd_ligand_lambda1(self):
        """
        ligand2 RMSD wrt the protein
        """
        return self._cpx_sid_lambda1_rmsd['Ligand_wrt_protein']['Result'].val
    def _get_receptor_backbone_atoms(self):
        return self._cpx_sid_lambda0_rmsf['Backbone']['AtomIndices'].val
[docs]    @staticmethod
    def protein_residue(res):
        """
        Get data about the specified residue
        :param res: The residue object to get data from
        :type res: `schrodinger.structure._Residue`
        :return: A namedtuple containing the molecule number, chain, residue
            name, and residue number
        :rtype: `ResData`
        """
        chain = res.chain.strip()
        # get molecule number before extracting the protein
        molnum = res.atom[1].property[ORI_MOL]
        resname = res.pdbres.strip()
        if not len(chain) or chain == ' ':
            chain = '_'
        resnum = res.resnum
        return ResData(molnum, chain, resname, resnum) 
    @property
    def receptor_residue_sequence_list(self):
        """
        Return a list of residue objects (ResData) in amino-to-carboxy order.
        :return: a list of residue objects, ordered N->C (amino to carboxy
            tails).
        :rtype: `ResData`
        """
        if self._receptor_residue_sequence_list:
            return self._receptor_residue_sequence_list
        self._receptor_residue_sequence_list = []
        for r in structure.get_residues_by_connectivity(self.receptor_st):
            self._receptor_residue_sequence_list.append(self.protein_residue(r))
        return self._receptor_residue_sequence_list
    @property
    def receptor_residue_sequence_tags(self):
        """
        A residue tag looks like this: A:THR_124 (Chain:resname_resnum)
        if chain is not defined, use '_' (underscore)
        :return: a list of residue tags
        :rtype: `residue_tag`
        """
        seq = self.receptor_residue_sequence_list
        return [s.fullName() for s in seq]
    def _assemble_protein_b_factor(self):
        """
        Look up all backbone atoms and calculate b factors by groping them by
        residues
        """
        protein_seq = self.receptor_residue_sequence_list
        st = self.fullsystem_ct
        if self.perturbation_type == constants.FEP_TYPES.PROTEIN_SELECTIVITY:
            protein_seq = self.wt_residue_sequence_list
            st = self.wt_st
        sums = defaultdict(float)
        count = defaultdict(int)
        b_atoms = self._get_receptor_backbone_atoms()
        for a in b_atoms:
            if 'r_m_pdb_tfactor' in st.atom[a].property:
                bfactor = st.atom[a].property['r_m_pdb_tfactor']
                if bfactor == 0:
                    continue
                label = self.protein_residue(st.atom[a].getResidue())
                i = protein_seq.index(label)
                sums[i] += bfactor
                count[i] += 1
        return sums, count
    @property
    def receptor_b_factor(self) -> List[float]:
        """
        Return B factors grouped by residues using PDB tfactor values stored in
        the structure. If the PDB tfactor values are not present, return zeros.
        """
        # FIXME: Is it safe to use lambda0 as receptor?
        #        This seems to be equivalent to the original logic.
        data = self._cpx_sid_lambda0_rmsf['Backbone']
        if 'BFactorResult' in data:
            return data['BFactorResult'].val
        sums, count = self._assemble_protein_b_factor()
        seq_list = self.receptor_residue_sequence_list
        if self.perturbation_type == constants.FEP_TYPES.PROTEIN_SELECTIVITY:
            seq_list = self.wt_residue_sequence_list
        new_data = numpy.zeros(len(seq_list))
        for rl in range(0, len(seq_list)):
            if rl in sums:
                new_data[rl] = sums[rl] / count[rl]
        return new_data.tolist()
    @property
    def receptor_sid_rmsd_backbone_lambda0(self):
        return self._cpx_sid_lambda0_rmsd['Backbone']['Result'].val
    @property
    def receptor_sid_rmsd_backbone_lambda1(self):
        return self._cpx_sid_lambda1_rmsd['Backbone']['Result'].val
    @cached_property
    def receptor_sid_rmsf_backbone_lambda0(self):
        data = self._cpx_sid_lambda0_rmsf['Backbone']
        if 'ProteinResidues' in data:
            return data['Result'].val
        return self._protein_sid_rmsf_backbone(data['Result'].val)
    @cached_property
    def receptor_sid_rmsf_backbone_lambda1(self):
        data = self._cpx_sid_lambda1_rmsf['Backbone']
        if 'ProteinResidues' in data:
            return data['Result'].val
        return self._protein_sid_rmsf_backbone(data['Result'].val)
    def _protein_sid_rmsf_backbone(self, backbone_rmsf):
        atom_list = self._get_receptor_backbone_atoms()
        # some receptor files may contain cofactors and waters, so lets just
        # look at the backbone atoms
        st = self.receptor_st.extract(atom_list, True)
        nres = 0
        res2atom = {}
        for ires, res in enumerate(structure.get_residues_by_connectivity(st)):
            nres += 1
            res2atom[ires] = [
                atom.property[constants.ORIGINAL_INDEX] for atom in res.atom
            ]
        rmsf = []
        for ires in range(nres):
            res_sum = 0.0
            for iatom in res2atom[ires]:
                res_sum += backbone_rmsf[atom_list.index(iatom)]
            rmsf.append(res_sum / len(res2atom[ires]))
        return rmsf
    def _parse_cpx_sid(self):
        sid_list = self._ark_cpx_sid_list
        if len(sid_list) < 2:
            return
        ret_dict = parse_sid(sid_list[0])
        self._cpx_sid_lambda0_rmsd = self._parse_sid_rms(ret_dict['RMSD'])
        self._cpx_sid_lambda0_rmsf = self._parse_sid_rms(ret_dict['RMSF'])
        self._cpx_sid_lambda0_sse = ret_dict['SSE']['Result'].val
        self._cpx_sid_lambda0_ppi = ret_dict['ProtProtInter'] and ret_dict[
            'ProtProtInter'].val
        self._ark_cpx_sid_lambda0_torsion = ret_dict['Torsion']
        self._ark_cpx_sid_lambda0_pli = ret_dict['ProtLigInter']
        self._ligand1_cpx_sid_rgyr = ret_dict['Rad_Gyration']['Result'].val
        self._ligand1_cpx_sid_psa = ret_dict['Polar_Surface_Area']['Result'].val
        self._ligand1_cpx_sid_sasa = ret_dict['SA_Surface_Area']['Result'].val
        self._ligand1_cpx_sid_molsa = ret_dict['Molecular_Surface_Area'][
            'Result'].val
        self._ligand1_cpx_sid_lighb = ret_dict['LigandHBonds']['Result'].val
        ret_dict = parse_sid(sid_list[1])
        self._cpx_sid_lambda1_rmsf = self._parse_sid_rms(ret_dict['RMSF'])
        self._cpx_sid_lambda1_rmsd = self._parse_sid_rms(ret_dict['RMSD'])
        self._cpx_sid_lambda1_sse = ret_dict['SSE']['Result'].val
        self._cpx_sid_lambda1_ppi = ret_dict['ProtProtInter'] and ret_dict[
            'ProtProtInter'].val
        self._ark_cpx_sid_lambda1_torsion = ret_dict['Torsion']
        self._ark_cpx_sid_lambda1_pli = ret_dict['ProtLigInter']
        self._ligand2_cpx_sid_rgyr = ret_dict['Rad_Gyration']['Result'].val
        self._ligand2_cpx_sid_psa = ret_dict['Polar_Surface_Area']['Result'].val
        self._ligand2_cpx_sid_sasa = ret_dict['SA_Surface_Area']['Result'].val
        self._ligand2_cpx_sid_molsa = ret_dict['Molecular_Surface_Area'][
            'Result'].val
        self._ligand2_cpx_sid_lighb = ret_dict['LigandHBonds']['Result'].val
    @property
    def cpx_sid_lp_results(self):
        if not self._cpx_sid_lp_results:
            self._cpx_sid_lp_stats(self.ligand1_st, self.ligand2_st)
        return self._cpx_sid_lp_results
    @property
    def pl_contact_data0(self):
        """
        A dictionary containing PL interactions for lambda=0
        """
        if not self._pl_contact_data0:
            self._pl_contact_data0 = self._pl_contact_data(
                self._ark_cpx_sid_lambda0_pli)
        return self._pl_contact_data0
    @property
    def pl_contact_data1(self):
        """
        A dictionary containing PL interactions for lambda=1
        """
        if not self._pl_contact_data1:
            self._pl_contact_data1 = self._pl_contact_data(
                self._ark_cpx_sid_lambda1_pli)
        return self._pl_contact_data1
    @staticmethod
    def _pl_interaction_similarity_matrix(contact_data, ordered_residues):
        """
        This method returns a similarity matrix that's calculated based on
        interactions observed for each frame.
        :param contact_data: A dict containing different interaction types
        :type  contact_data: dict
        :param ordered_residues: A list of residues ordered from N->C
        :type  ordered_residues: list(str)
        :rtype: `numpy.array`
        :return: A similarity matrix of pairwise protein-ligand interactions
                 for each frame.
        """
        def process_contact(iframe, res):
            col_idx = res.rfind(':')
            res_name = res if col_idx == 1 else res[:col_idx]
            resid = ordered_residues.index(res_name)
            pl_contacts[iframe, resid] = True
        if contact_data['hbonds'] is None:
            raise RuntimeError("contact_data has no frames")
        nframes = len(contact_data['hbonds'])
        pl_contacts = numpy.zeros((nframes, len(ordered_residues)), dtype=bool)
        for contact_type, contacts in contact_data.items():
            if contact_type in ["ligand_water", "metal"]:
                continue
            for iframe, frame_contacts in enumerate(contacts):
                for contact in frame_contacts:
                    process_contact(iframe, contact[1])
        sim_matrix = numpy.ones((nframes, nframes))
        for (iframe, jframe) in itertools.combinations(list(range(nframes)), 2):
            iresult = pl_contacts[iframe, :]
            jresult = pl_contacts[jframe, :]
            # Compute similarity between the frame i and j vectors using
            # Tanimoto similarity.
            sim = old_div(sum(iresult & jresult), float(sum(iresult | jresult))) \
                    
if sum(iresult | jresult) != 0 else 0.0
            sim_matrix[iframe, jframe] = sim_matrix[jframe, iframe] = sim
        return sim_matrix
    @property
    def pl_interaction_similarity_matrix0(self):
        """
        Protein-ligand interactions similarity matrix for lambda=0 sys
        for all available frames.
        """
        return self._pl_interaction_similarity_matrix(
            self.pl_contact_data0, self.cpx_sid_protein_residues)
    @property
    def pl_interaction_similarity_matrix1(self):
        """
        Protein-ligand interactions similarity matrix for lambda=1 sys
        for all available frames.
        """
        return self._pl_interaction_similarity_matrix(
            self.pl_contact_data1, self.cpx_sid_protein_residues)
    def _getLidStatMap(self, contact_data, sid_map, lig_frags, lighb, lig_st):
        if not sid_map:
            return {}
        fullsystem_ct = self.fullsystem_ct
        lid_stat_map = {}
        lid_stat_map['hbond'] = self._get_lp_hbond_stats(
            contact_data['hbonds'], sid_map['ligand_atoms'],
            sid_map['prot_hbond'], fullsystem_ct, lig_st)
        lid_stat_map['l_hbond'] = self._get_lig_intra_hbond_stats(
            lighb, sid_map['ligand_atoms'])
        lid_stat_map['p_metal'], lid_stat_map['l_metal'] = \
            
self._get_metal_stats(contact_data['metal'],
                                  sid_map['ligand_atoms'],
                                  sid_map['prot_ionic'], fullsystem_ct)
        lid_stat_map['hphob'] = self._get_hydrophobic_stats(
            contact_data['hydrophobic'], sid_map['prot_hydrophobic'],
            lig_frags(True), fullsystem_ct)
        lid_stat_map['pipi'] = self._get_pipi_stats(contact_data['pi_pi'],
                                                    lig_frags(True),
                                                    sid_map['ligand_rings'],
                                                    sid_map['prot_pi'],
                                                    fullsystem_ct)
        lid_stat_map['picat'] = self._get_picat_stats(contact_data['pi_cat'],
                                                      sid_map['ligand_rings'],
                                                      sid_map['prot_ionic'],
                                                      sid_map['prot_pi'],
                                                      sid_map['ligand_atoms'],
                                                      fullsystem_ct)
        lid_stat_map['ionic'] = self._get_ionic_stats(contact_data['polar'],
                                                      sid_map['ligand_atoms'],
                                                      sid_map['prot_ionic'],
                                                      fullsystem_ct)
        lid_stat_map['wat_br'] = self._get_water_bridge_stats(
            contact_data['water_br'], sid_map['ligand_atoms'],
            sid_map['prot_water_br'], fullsystem_ct)
        lid_stat_map['lig_wat'] = self._get_ligand_water_stats(
            contact_data['ligand_water'], lig_frags(False))
        return lid_stat_map
    def _cpx_sid_lp_stats(self, lig1_st, lig2_st):
        lid_stat_map0 = self._getLidStatMap(self.pl_contact_data0,
                                            self._cpx_sid_pli_lambda0_dict,
                                            self.ligand1_fragments,
                                            self._ligand1_cpx_sid_lighb,
                                            lig1_st)
        lid_stat_map1 = self._getLidStatMap(self.pl_contact_data1,
                                            self._cpx_sid_pli_lambda1_dict,
                                            self.ligand2_fragments,
                                            self._ligand2_cpx_sid_lighb,
                                            lig2_st)
        self._cpx_sid_lp_results = [lid_stat_map0, lid_stat_map1]
    def _get_ligand_water_stats(self, ligwat_data, lig_frags):
        nframes = len(ligwat_data)
        all_lw = []
        results = []
        for frame in ligwat_data:
            for lw in frame:
                all_lw.append((lw[1]))
        unique = set(all_lw)
        for a in unique:
            count = float(all_lw.count(a))
            results.append((old_div(count, nframes), lig_frags[a]))
        return results
    def _get_ionic_stats(self, polar_data, lig_dict, polar_prot_dict, pv_st):
        all_polar = []
        results = []
        nframes = len(polar_data)
        for frame in polar_data:
            for p in frame:
                if p:
                    all_polar.append(tuple(p[1:-1]))
        unique = set(all_polar)
        all_polar_filter = self._filter_degen_res_atoms(all_polar)
        unique_filter = set(all_polar_filter)
        for raw, filt in zip(unique, unique_filter):
            count = float(all_polar_filter.count(filt))
            aid = polar_prot_dict[raw[0]]
            xyz = pv_st.atom[aid].xyz
            results.append(
                (old_div(count, nframes), raw[0], aid, xyz, lig_dict[raw[2]]))
        return results
    def _get_picat_stats(self, picat_data, ligand_ring_dict, polar_prot_dict,
                         pi_prot_dict, lig_dict, pv_st):
        all_picat = []
        results = []
        nframes = len(picat_data)
        for frame in picat_data:
            for pc in frame:
                if len(pc) == 6:
                    all_picat.append(tuple((pc[1], pc[3])))
                if len(pc) == 5:
                    all_picat.append(tuple((pc[1], pc[2])))
        unique = set(all_picat)
        for a in unique:
            count = float(all_picat.count(a))
            if a[0] in polar_prot_dict:
                aid = polar_prot_dict[a[0]]
                ligand = ligand_ring_dict[a[1]]
            else:
                aid = pi_prot_dict[a[0]]
                ligand = lig_dict[a[1]]
            xyz = pv_st.atom[aid].xyz
            results.append((old_div(count, nframes), a[0], xyz, ligand))
        return results
    def _get_pipi_stats(self, pipi_data, lig_frags, ligand_ring_dict,
                        pi_prot_dict, pv_st):
        all_pi = []
        # contains face2face/edge2face information
        all_pi2 = []
        results = []
        nframes = len(pipi_data)
        for frame in pipi_data:
            for pi in frame:
                all_pi.append(tuple((pi[1], pi[3])))
                all_pi2.append(tuple((pi[1], pi[3], pi[6])))
        unique = set(all_pi)
        for a in unique:
            count = float(all_pi.count(a))
            f2f_count = float(all_pi2.count((a[0], a[1], 'f2f')))
            f2f_fraction = old_div(f2f_count, count)
            aid = pi_prot_dict[a[0]]
            xyz = pv_st.atom[aid].xyz
            results.append((old_div(count, nframes), a[0], xyz,
                            ligand_ring_dict[a[1]], f2f_fraction))
        return results
    def _get_hydrophobic_stats(self, hphob_data, hphob_prot_dict, lig_frags,
                               pv_st):
        """
        This function parses non-specific hydrophobic interaction and returns
        a list with statistics.
        """
        all_hp = []
        results = []
        nframes = len(hphob_data)
        for frame in hphob_data:
            for hp in frame:
                all_hp.append(tuple((hp[1], hp[2])))
        unique = set(all_hp)
        for a in unique:
            count = float(all_hp.count(a))
            aid = hphob_prot_dict[a[0]]
            xyz = pv_st.atom[aid].xyz
            results.append((old_div(count, nframes), a[0], xyz, a[1]))
        return results
    def _get_metal_stats(self, metal_data, lig_dict, polar_prot_dict, pv_st):
        """
        This function parses protein-metal and metal-ligand interactions and
        returns two lists, with statistics for each type.
        """
        def get_metal_aid(label):
            return int(label.split(':')[1])
        def get_stats(unique_interaction, all_metal, dictionary):
            results = []
            for a in unique_interaction:
                count = all_metal.count(a)
                aid = dictionary[a[1]]
                xyz = pv_st.atom[aid].xyz
                metal_aid = get_metal_aid(a[0])
                # these coordinates will be used for LID generation,
                # which will figure out better metal placement.
                metal_xyz = [0, 0, 0]
                results.append((count / nframes, a[1], aid, xyz, metal_aid,
                                a[0], metal_xyz))
            return results
        # protein-metal results
        pm_metal = []
        # ligand-metal results
        lm_metal = []
        nframes = len(metal_data)
        for frame in metal_data:
            for m in frame:
                if m[2] == 'ligand':
                    lm_metal.append((m[1], m[3]))
                else:
                    pm_metal.append((m[1], m[3]))
        lm_unique = list(set(lm_metal))
        pm_unique = list(set(pm_metal))
        lm_results = get_stats(lm_unique, lm_metal, lig_dict)
        pm_results = get_stats(pm_unique, pm_metal, polar_prot_dict)
        return pm_results, lm_results
    def _get_water_bridge_stats(self, wb_data, lig_dict, wb_prot_dict, pv_st):
        nframes = len(wb_data)
        all_wb = []
        for frame in wb_data:
            for w in frame:
                all_wb.append(tuple((w[1], w[3])))
        all_wb_filter = self._filter_degen_res_atoms(all_wb)
        unique = set(all_wb)
        # get rid of redundant interactions by collapsing them into single type
        unique_filter = set(all_wb_filter)
        res_dict = {}
        for raw in unique:
            filt = self._filter_degen_res_atoms([raw])[0]
            res_dict[filt[0]] = raw[0]
        results = []
        for filt in unique_filter:
            count = float(all_wb_filter.count(filt))
            aid = wb_prot_dict[res_dict[filt[0]]]
            xyz = pv_st.atom[aid].xyz
            results.append((old_div(count,
                                    nframes), filt[0], xyz, lig_dict[filt[1]]))
        return results
    def _get_lig_intra_hbond_stats(self, hb_data, lig_dict):
        """
        This function parses internal hydrogen bonds within a ligand and
        returns the statistics.
        """
        intra_hbond = []
        results = []
        nframes = len(hb_data)
        for frame in hb_data:
            for hbond in frame:
                intra_hbond.append(tuple(hbond))
        unique = set(intra_hbond)
        for a in unique:
            count = float(intra_hbond.count(a))
            aid_accept = a[0]
            aid_donor = a[1]
            results.append((old_div(count, nframes), aid_donor, aid_accept))
        return results
    def _get_lp_hbond_stats(self, hb_data, lig_dict, prot_dict, pv_st, lig_st):
        """
        Get the statistics on Hydrogen bonds during the simulation.  The
        function also filters out equivalent atoms on the protein & ligand
        and sums their contribution to yield one value.
        """
        dict_lig_noh_atom, dict_lig_atom = self._get_ligand_atom_dict(lig_st)
        all_hbonds = []
        nframes = len(hb_data)
        for frame in hb_data:
            for hbond in frame:
                all_hbonds.append(tuple(hbond[1:]))
        unique = set(all_hbonds)
        unique_filter = self._filter_degen_res_atoms(unique)
        h_dict = {}
        acceptor_types = {
            'acceptor_backbone': 'a-b',
            'acceptor_sidechain': 'a-s'
        }
        for raw, filt in zip(unique, unique_filter):
            lig_aid = lig_dict[raw[2]]
            ori_lig_aid = lig_aid
            # check if ligand's atom is a donor (protein is acceptor), if so,
            # then instead of using the aid of the hydrogen, get the heavy
            # atom that it is bonded to
            if raw[1] in acceptor_types.values():
                bond = pv_st.atom[lig_aid].bond[1]
                new_atm = bond.atom2.property[constants.ORIGINAL_INDEX]
                lig_aid = dict_lig_noh_atom[new_atm]
            count = old_div(all_hbonds.count(raw), float(nframes))
            aid = prot_dict[raw[0]]
            xyz = pv_st.atom[aid].xyz
            key = (filt[0], raw[1], lig_aid)
            if key in h_dict:
                count += h_dict[key][0]
            hbond = (count, filt[0], aid, xyz, filt[2], ori_lig_aid)
            h_dict[key] = hbond
        return list(h_dict.values())
    @staticmethod
    def _filter_degen_res_atoms(hbonds):
        """
        This function takes a list of hydrogen bonds and looks at the
        Protein Residue tag (ie: 'A:LYS_33:1HZ'), and determines if there
        are atoms there are equivalent hydrogen bond donors/acceptors in
        that residue.  If so, then the  atomname is replaced by a more
        a more generic name.
        We need this for LID, in the scenarios when equivalent residue
        donors/acceptors interact with the same ligand atom.  Multiple
        equivalent interactions are created in LID.
        """
        def parse_res_name_with_atoms(s):
            k = s.split(':')
            chain = k[0]
            atom_name = k[2]
            t = k[1].split('_')
            res_name = t[0]
            res_num = t[1]
            return (chain, res_name, res_num, atom_name)
        modify_res = ['LYS', 'ARG', 'ASP', 'ASN', 'GLU', 'GLN', 'HIS']
        LYS_pdb_atm = ['1HZ', '2HZ', '3HZ', 'HZ1', 'HZ2', 'HZ3']
        ARG_pdb_atm = [
            'HE', '1HH1', '2HH1', '1HH2', '2HH2', 'HH11', 'HH12', 'HH21', 'HH22'
        ]
        ARG_pdb_heavy = ['NH1', 'NH2']
        ASP_pdb_atm = ['OD1', 'OD2']
        ASN_pdb_atm = ['1HD2', '2HD2', 'HD21', 'HD22']
        GLU_pdb_atm = ['OE1', 'OE2']
        GLN_pdb_atm = ['1HE2', '2HE2', 'HE21', 'HE22']
        HIS_pdb_atm = ['ND1', 'NE2', 'HD1', 'HE2']
        replace_dict = {
            'LYS': 'HX',
            'ARG': 'HHX',
            'GLN': 'HEX',
            'GLU': 'OEX',
            'ASP': 'ODX',
            'ASN': 'HDX',
            'HIS': 'NX',
            'ARG_heavy': 'NX'
        }
        updated_list = []
        for h in hbonds:
            res_tag = h[0]
            c, resname, resnum, atom_name = parse_res_name_with_atoms(res_tag)
            if resname in modify_res:
                if resname in ['HIS', 'HIE', 'HIP'
                              ] and atom_name in HIS_pdb_atm:
                    if atom_name[0] == 'H':
                        atom_name = 'HX'
                    else:
                        atom_name = replace_dict['HIS']
                elif resname == 'GLN' and atom_name in GLN_pdb_atm:
                    atom_name = replace_dict['GLN']
                elif resname in ['GLU', 'GLH'] and atom_name in GLU_pdb_atm:
                    atom_name = replace_dict['GLU']
                elif resname == 'ASN' and atom_name in ASN_pdb_atm:
                    atom_name = replace_dict['ASN']
                elif resname in ['ASP', 'ASH'] and atom_name in ASP_pdb_atm:
                    atom_name = replace_dict['ASP']
                elif resname in ['ARG', 'ARN']:
                    if atom_name in ARG_pdb_atm:
                        atom_name = replace_dict['ARG']
                    elif atom_name in ARG_pdb_heavy:
                        atom_name = replace_dict['ARG_heavy']
                elif resname in ['LYS', 'LYN'] and atom_name in LYS_pdb_atm:
                    atom_name = replace_dict['LYS']
                h = list(h)
                h[0] = '%s:%s_%s:%s' % (c, resname, resnum, atom_name)
                h = tuple(h)
            updated_list.append(h)
        return updated_list
    @staticmethod
    def _get_ligand_atom_dict(ligst):
        dictNoh = {}
        dictAA = {}
        i = 1
        for a in ligst.atom:
            oi = a.property[constants.ORIGINAL_INDEX]
            dictAA[oi] = a.index
            if a.element != 'H':
                dictNoh[oi] = i
                i += 1
        return dictNoh, dictAA
    @staticmethod
    def _add_alchem_mols(ct: Structure, alchemical_mols: str) -> Structure:
        """
        Add alchemical molecules (e.g: "SPC SPC") to the ct
        """
        ct = ct.copy()
        for alchem_mol in alchemical_mols.split():
            if alchem_mol in ['SPC', 'T3P', 'T4P', 'T4PE', 'T5P', 'T4PD']:
                ct = ct.merge(water_mol(alchem_mol))
            else:
                ct.addAtom(alchem_mol.capitalize(), 0., 0., 0.)
        return ct
    @property
    def fullsystem_ct(self):
        if self._fullsystem_ct:
            return self._fullsystem_ct
        fs = structure.create_new_structure()
        # iterate non-alchemical CTs
        # check scisol's fep_gui.sid_report.py::get_edge_data(...)
        # FIXME: detect if a Structure is alchemical or not?
        for st in self._pv_st[:-2]:
            fs = fs.merge(st, copy_props=True)
        # add alchemical solvents to ref/mut CTs
        for st, alchem_mol in zip(
                self._pv_st[-2:],
            [self.ligand1_alchemical_mols, self.ligand2_alchemical_mols]):
            st = self._add_alchem_mols(st, alchem_mol) if alchem_mol else st
            fs = fs.merge(st, copy_props=True)
        for atom in fs.atom:
            atom.property[constants.ORIGINAL_INDEX] = atom.index
        self._fullsystem_ct = fs
        return self._fullsystem_ct
    @property
    def cpx_sid_pl_results(self):
        if not self._cpx_sid_pl_results:
            self._cpx_sid_pl_contacts()
        return self._cpx_sid_pl_results
    def _cpx_sid_pl_contacts(self):
        """
        This method calculates protein-residue contacts for both lambda0 and
        lambda1 simulations, collates the residue information and then sets
        `self._cpx_sid_pl_results` with the results dictionaries.
        """
        contact_data0 = self.pl_contact_data0
        contact_data1 = self.pl_contact_data1
        nkeys = len(self.cpx_sid_protein_residues)
        ntypes = len(self._pl_detail_inter_type)
        pl_cont_data0 = numpy.zeros(
            (nkeys, self.cpx_sid_number_of_frames, ntypes), dtype=numpy.int)
        pl_cont_data1 = numpy.zeros(
            (nkeys, self.cpx_sid_number_of_frames, ntypes), dtype=numpy.int)
        for i, t in enumerate(self._pl_contact_types):
            self._parse_pl_contact_data(t, contact_data0[t], pl_cont_data0)
            self._parse_pl_contact_data(t, contact_data1[t], pl_cont_data1)
        pl_results0 = {}
        pl_results1 = {}
        for ires, res in enumerate(self.cpx_sid_protein_residues):
            pl_results0[res] = self._get_pli_residue_profile(
                pl_cont_data0[ires, ::, ::])
            pl_results1[res] = self._get_pli_residue_profile(
                pl_cont_data1[ires, ::, ::])
        self._cpx_sid_pl_results = [pl_results0, pl_results1]
    def _get_pli_residue_profile(self, res_data):
        results = {}
        for type in list(self._pl_code):
            cont = numpy.sum(res_data, axis=0)
            ncontacts = 0
            for isubtype in self._pl_type[type]:
                ncontacts += cont[isubtype]
            results[type] = ncontacts
        return results
    def _parse_pl_contact_data(self, inter_type, data, all_cont_data):
        if inter_type in ['hydrophobic', 'pi_pi', 'pi_cat']:
            self._parse_pl_hydrophobic_data(inter_type, data, all_cont_data)
        elif inter_type == 'metal':
            self._parse_pl_metal_data(data, all_cont_data)
        elif inter_type == 'hbonds':
            self._parse_pl_hbond_data(data, all_cont_data)
        elif inter_type == 'polar':
            self._parse_pl_polar_data(data, all_cont_data)
        elif inter_type == 'water_br':
            self._parse_pl_waterbr_data(data, all_cont_data)
    def __get_resname(self, tag):
        temp = tag.split(':')
        return '%s:%s' % (temp[0], temp[1])
    def _parse_pl_hydrophobic_data(self, inter_type, data, all_data):
        subtype_dict = {'hydrophobic': 2, 'pi_pi': 3, 'pi_cat': 4}
        i_type = subtype_dict[inter_type]
        for frame in data:
            for v in frame:
                frame_no = v[0]
                res_name = self.__get_resname(v[1])
                k = self.cpx_sid_protein_residues.index(res_name)
                all_data[k, frame_no, i_type] += 1
    def _parse_pl_metal_data(self, data, all_data):
        subtype_dict = {'prot-bb': 12, 'prot-side': 11}
        for frame in data:
            for v in frame:
                if v[2] == 'ligand':
                    continue
                frame_no = v[0]
                res_name = self.__get_resname(v[3])
                k = self.cpx_sid_protein_residues.index(res_name)
                i_type = subtype_dict[v[2]]
                all_data[k, frame_no, i_type] += 1
    def _parse_pl_hbond_data(self, data, all_data):
        subtype_dict = {'d-b': 0, 'a-b': 1, 'd-s': 9, 'a-s': 10, 'd': 0, 'a': 1}
        self._parse_pl_general_data(subtype_dict, data, all_data)
    def _parse_pl_polar_data(self, data, all_data):
        subtype_dict = {'s': 5, 'b': 6}
        self._parse_pl_general_data(subtype_dict, data, all_data)
    def _parse_pl_waterbr_data(self, data, all_data):
        subtype_dict = {'d': 7, 'a': 8}
        self._parse_pl_general_data(subtype_dict, data, all_data)
    def _parse_pl_general_data(self, subtype_dict, data, all_data):
        for frame in data:
            for v in frame:
                frame_no = v[0]
                res_name = self.__get_resname(v[1])
                k = self.cpx_sid_protein_residues.index(res_name)
                i_type = subtype_dict[v[2]]
                all_data[k, frame_no, i_type] += 1
    @staticmethod
    def _parse_residue_tags(keys):
        """
        Given all protein-ligand contacts; just return the protein residue tag
        that are in contact with the ligand.
        :rtype: `str`
        :return: tags all residues in contact with the ligand
        """
        # get only unique residues tags
        s_keys = sorted(set(keys), key=itemgetter(1))
        pl_cont_keys = sorted(s_keys, key=itemgetter(2))
        return [x[0] for x in pl_cont_keys]
    @property
    def cpx_sid_protein_residues(self):
        """
        A list of protein residues that interact with both ligand1 and ligand2
        throughout the simulation
        :rtype: `str`
        :return: a list of protein tags that interact with both ligand1 and
            ligand2
        """
        if not self._cpx_sid_protein_residues:
            sid_dict0 = self._cpx_sid_pli_lambda0_dict
            keys0 = self._set_cpx_sid_protein_residues(sid_dict0)
            sid_dict1 = self._cpx_sid_pli_lambda1_dict
            keys1 = self._set_cpx_sid_protein_residues(sid_dict1)
            self._cpx_sid_protein_residues = self._parse_residue_tags(keys0 +
                                                                      keys1)
        return self._cpx_sid_protein_residues
    @staticmethod
    def _set_cpx_sid_protein_residues(interact_dict):
        def parse_tag(res_tag):
            tag = res_tag.split(":")
            chain = tag[0]
            rtype, rnum = tag[1].split('_')
            rnum = int(rnum)
            label = '%s:%s' % (tag[0], tag[1])
            return (label, rnum, chain)
        res_keys = []
        for t in [
                'prot_hbond', 'prot_hydrophobic', 'prot_ionic', 'prot_pi',
                'prot_water_br'
        ]:
            for cont in list(interact_dict[t]):
                res_keys.append(parse_tag(cont))
        return res_keys
    @property
    def receptor_residues_interaction_ligand1(self):
        """
        A list of preotein residues that interact just with ligand1
        :rtype:  list
        :return: list of protein residue tags
        """
        sid_dict = self._cpx_sid_pli_lambda0_dict
        keys = self._set_cpx_sid_protein_residues(sid_dict)
        return self._parse_residue_tags(keys)
    @property
    def receptor_residues_interaction_ligand2(self):
        """
        A list of preotein residues that interact just with ligand2
        :rtype:  list
        :return: list of protein residue tags
        """
        sid_dict = self._cpx_sid_pli_lambda1_dict
        keys = self._set_cpx_sid_protein_residues(sid_dict)
        return self._parse_residue_tags(keys)
    @staticmethod
    def _pl_contact_data(ark_block):
        if ark_block is None:
            return {}
        results_dict = {}
        results_dict['hbonds'] = ark_block['HBondResult'].val
        results_dict['hydrophobic'] = ark_block['HydrophobicResult'].val
        results_dict['ligand_water'] = ark_block['LigWatResult'].val
        results_dict['metal'] = ark_block['MetalResult'].val
        results_dict['pi_cat'] = ark_block['PiCatResult'].val
        results_dict['pi_pi'] = ark_block['PiPiResult'].val
        results_dict['polar'] = ark_block['PolarResult'].val
        results_dict['water_br'] = ark_block['WaterBridgeResult'].val
        return results_dict
    @property
    def _cpx_sid_pli_lambda0_dict(self):
        return self._cpx_sid_pli_dict(self._ark_cpx_sid_lambda0_pli)
    @property
    def _cpx_sid_pli_lambda1_dict(self):
        return self._cpx_sid_pli_dict(self._ark_cpx_sid_lambda1_pli)
    @staticmethod
    def _cpx_sid_pli_dict(ark_block):
        if ark_block is None:
            return {}
        def set2dict(list):
            new_dict = {}
            for l in list:
                new_dict[l[0]] = l[1]
            return new_dict
        ret_dict = {}
        ret_dict['ligand_atoms'] = set2dict(ark_block['DictLigandAtoms'].val)
        ret_dict['ligand_rings'] = set2dict(ark_block['DictLigandRing'].val)
        ret_dict['prot_hbond'] = set2dict(ark_block['DictProteinHbond'].val)
        ret_dict['prot_hydrophobic'] = set2dict(
            ark_block['DictProteinHydrophobic'].val)
        ret_dict['prot_ionic'] = set2dict(ark_block['DictProteinIonic'].val)
        ret_dict['prot_pi'] = set2dict(ark_block['DictProteinPi'].val)
        ret_dict['prot_water_br'] = set2dict(
            ark_block['DictProteinWaterBridge'].val)
        return ret_dict
    @property
    def cpx_sid_trajectory_interval_ns(self):
        return _get_interval_ns(self._ark_cpx_sid_list)
    @property
    def sol_sid_trajectory_interval_ns(self):
        return _get_interval_ns(self._ark_sol_sid_list)
    @property
    def cpx_sid_snapshot_times_ps(self):
        interval_ns = self.cpx_sid_trajectory_interval_ns
        return [x * interval_ns for x in range(self.cpx_sid_number_of_frames)]
    @property
    def sol_sid_snapshot_times_ps(self):
        interval_ns = self.sol_sid_trajectory_interval_ns
        return [x * interval_ns for x in range(self.sol_sid_number_of_frames)]
    @property
    def cpx_sid_number_of_frames(self):
        return _get_frame_count(self._ark_cpx_sid_list)
    @property
    def sol_sid_number_of_frames(self):
        return _get_frame_count(self._ark_sol_sid_list)
    def _parse_sid_rms(self, sea_obj):
        """
        :type sea_obj: `list` of sea objects
        """
        results = {}
        for sea in sea_obj:
            if 'SelectionType' in sea:
                seltype = sea['SelectionType'].val
            elif sea['Tab'].val == 'l_rmsf_tab':
                seltype = 'Ligand'
            if seltype == 'Ligand' and 'FitBy' in sea:
                results[seltype + '_wrt_protein'] = sea
            results[seltype] = sea
        return results
    @property
    def leg1_name(self):
        return SOLVENT_NAMES[self.perturbation_type]
    @property
    def leg2_name(self):
        return COMPLEX_NAMES[self.perturbation_type]
    @property
    def sol_timestep_list(self):
        return numpy.arange(self.sol_start_time_ns, self.sol_end_time_ns,
                            self.sol_timestep_interval)
    @property
    def cpx_timestep_list(self):
        return numpy.arange(self.cpx_start_time_ns, self.cpx_end_time_ns,
                            self.cpx_timestep_interval)
    @property
    def sol_timestep_interval(self):
        return float(self.sol_end_time_ns - self.sol_start_time_ns) / len(
            self.sol_delta_g_forward)
    @property
    def cpx_timestep_interval(self):
        return float(self.cpx_end_time_ns - self.cpx_start_time_ns) / len(
            self.cpx_delta_g_forward)
    @property
    def sol_delta_g_sliding_err(self):
        return self._get_values(self._ark_sol_pair_energetics, 'dGSliding',
                                'dG_error')
    @property
    def cpx_delta_g_sliding_err(self):
        return self._get_values(self._ark_cpx_pair_energetics, 'dGSliding',
                                'dG_error')
    @property
    def sol_delta_g_sliding(self):
        return self._get_values(self._ark_sol_pair_energetics, 'dGSliding',
                                'dG')
    @property
    def cpx_delta_g_sliding(self):
        return self._get_values(self._ark_cpx_pair_energetics, 'dGSliding',
                                'dG')
    @property
    def sol_delta_g_reverse_err(self):
        return self._get_values(self._ark_sol_pair_energetics, 'dGReverse',
                                'dG_error')
    @property
    def cpx_delta_g_reverse_err(self):
        return self._get_values(self._ark_cpx_pair_energetics, 'dGReverse',
                                'dG_error')
    @property
    def sol_delta_g_reverse(self):
        return self._get_values(self._ark_sol_pair_energetics, 'dGReverse',
                                'dG')
    @property
    def cpx_delta_g_reverse(self):
        return self._get_values(self._ark_cpx_pair_energetics, 'dGReverse',
                                'dG')
    @property
    def sol_delta_g_forward_err(self):
        return self._get_values(self._ark_sol_pair_energetics, 'dGForward',
                                'dG_error')
    @property
    def cpx_delta_g_forward_err(self):
        return self._get_values(self._ark_cpx_pair_energetics, 'dGForward',
                                'dG_error')
    @property
    def sol_delta_g_forward_dg(self):
        return self._ark_sol_pair_energetics['dGForward']['Bennett']['dG'].val
    @property
    def cpx_delta_g_forward_dg(self):
        return self._ark_cpx_pair_energetics['dGForward']['Bennett']['dG'].val
    @property
    def sol_delta_g_forward_bootstrap_std(self):
        return self._ark_sol_pair_energetics['dGForward']['Bennett'][
            'Bootstrap_std'].val
    @property
    def cpx_delta_g_forward_bootstrap_std(self):
        return self._ark_cpx_pair_energetics['dGForward']['Bennett'][
            'Bootstrap_std'].val
    @property
    def sol_delta_g_forward_analytical_std(self):
        return self._ark_sol_pair_energetics['dGForward']['Bennett'][
            'Analytical_std'].val
    @property
    def cpx_delta_g_forward_analytical_std(self):
        return self._ark_cpx_pair_energetics['dGForward']['Bennett'][
            'Analytical_std'].val
    @property
    def sol_delta_g_forward_df_per_replica(self):
        return self._ark_sol_pair_energetics['dGForward']['Bennett'][
            'dF_Per_Replica'].val
    @property
    def cpx_delta_g_forward_df_per_replica(self):
        return self._ark_cpx_pair_energetics['dGForward']['Bennett'][
            'dF_Per_Replica'].val
    @property
    def sol_delta_g_forward(self):
        return self._get_values(self._ark_sol_pair_energetics, 'dGForward',
                                'dG')
    @property
    def cpx_delta_g_forward(self):
        return self._get_values(self._ark_cpx_pair_energetics, 'dGForward',
                                'dG')
    @property
    def sol_end_time_ns(self):
        return self._get_values(self._ark_sol_pair_energetics, 'dGForward',
                                'EndTime_ns')
    @property
    def cpx_end_time_ns(self):
        return self._get_values(self._ark_cpx_pair_energetics, 'dGForward',
                                'EndTime_ns')
    @property
    def sol_start_time_ns(self):
        return self._get_values(self._ark_sol_pair_energetics, 'dGForward',
                                'StartTime_ns')
    @property
    def cpx_start_time_ns(self):
        return self._get_values(self._ark_cpx_pair_energetics, 'dGForward',
                                'StartTime_ns')
    @staticmethod
    def _get_values(ark_obj, key1, key2):
        return ark_obj[key1][key2].val
    @property
    def receptor_charge(self):
        return self._ark_cpx_protein_info['FormalCharge'].val
    @property
    def receptor_total_heavy(self):
        return self._ark_cpx_protein_info['NumberHeavyAtoms'].val
    @property
    def receptor_total_atom(self):
        return self._ark_cpx_protein_info['NumberAtoms'].val
    @property
    def receptor_title(self):
        name = self.receptor_st.title.strip()
        return name or 'N/A'
    @property
    def receptor_total_residues_in_chains(self):
        res_numb = str(self._ark_cpx_protein_info['ChainsResidues'].val).strip()
        return res_numb[1:-1]
    @property
    def receptor_chain_names(self):
        chain_names = str(self._ark_cpx_protein_info['ChainsNames'].val).strip()
        return chain_names[1:-1]
    @property
    def receptor_total_residues(self):
        return self._ark_cpx_protein_info['NumberResidues'].val
    @property
    def ligand1_total_fragments(self):
        return self._ark_sol_ligand_list[0]['NumberFragments'].val
    @property
    def ligand2_total_fragments(self):
        return self._ark_sol_ligand_list[1]['NumberFragments'].val
    @property
    def ligand1_mol_formula(self):
        return self._ark_sol_ligand_list[0]['MolecularFormula'].val
    @property
    def ligand2_mol_formula(self):
        return self._ark_sol_ligand_list[1]['MolecularFormula'].val
    @property
    def ligand1_charge(self):
        return self._ark_sol_ligand_list[0]['Charge'].val
    @property
    def ligand2_charge(self):
        return self._ark_sol_ligand_list[1]['Charge'].val
    @property
    def ligand1_alchemical_mols(self):
        if 'AlchemicalSolvent' not in self._ark_cpx_ligand_list[0]:
            return None
        return self._ark_cpx_ligand_list[0]['AlchemicalSolvent'].val
    @property
    def ligand2_alchemical_mols(self):
        if 'AlchemicalSolvent' not in self._ark_cpx_ligand_list[1]:
            return None
        return self._ark_cpx_ligand_list[1]['AlchemicalSolvent'].val
    @property
    def ligand1_alchemical_atom_total(self):
        if 'AlchemicalSolventAtoms' not in self._ark_cpx_ligand_list[0]:
            return 0
        return self._ark_cpx_ligand_list[0]['AlchemicalSolventAtoms'].val
    @property
    def ligand2_alchemical_atom_total(self):
        if 'AlchemicalSolventAtoms' not in self._ark_cpx_ligand_list[1]:
            return 0
        return self._ark_cpx_ligand_list[1]['AlchemicalSolventAtoms'].val
    @property
    def ligand1_atomic_mass(self):
        return self._ark_sol_ligand_list[0]['AtomicMass'].val
    @property
    def ligand2_atomic_mass(self):
        return self._ark_sol_ligand_list[1]['AtomicMass'].val
    @property
    def ligand1_rot_bonds(self):
        return self._ark_sol_ligand_list[0]['NumberRotatableBonds'].val
    @property
    def ligand2_rot_bonds(self):
        return self._ark_sol_ligand_list[1]['NumberRotatableBonds'].val
    @property
    def ligand1_total_hot(self):
        return self._ark_sol_ligand_list[0]['NumberHotAtoms'].val
    @property
    def ligand2_total_hot(self):
        return self._ark_sol_ligand_list[1]['NumberHotAtoms'].val
    @property
    def ligand1_total_heavy(self):
        return self._ark_sol_ligand_list[0]['NumberHeavyAtoms'].val
    @property
    def ligand2_total_heavy(self):
        return self._ark_sol_ligand_list[1]['NumberHeavyAtoms'].val
    @property
    def ligand1_total_atoms(self):
        return self._ark_sol_ligand_list[0]['NumberAtoms'].val
    @property
    def ligand2_total_atoms(self):
        return self._ark_sol_ligand_list[1]['NumberAtoms'].val
    @property
    def ligand1_cpx_asl(self):
        return self._ark_cpx_ligand_list[0]['ASL'].val
    @property
    def ligand2_cpx_asl(self):
        return self._ark_cpx_ligand_list[1]['ASL'].val
    @property
    def ligand1_sol_asl(self):
        return self._ark_sol_ligand_list[0]['ASL'].val
    @property
    def ligand2_sol_asl(self):
        return self._ark_sol_ligand_list[1]['ASL'].val
    @property
    def ligand1_pdb_name(self):
        return self._ark_sol_ligand_list[0]['PDBResidueName'].val
    @property
    def ligand2_pdb_name(self):
        return self._ark_sol_ligand_list[1]['PDBResidueName'].val
    @property
    def ligand1_smiles(self):
        return self._ark_sol_ligand_list[0]['SMILES'].val
    @property
    def ligand2_smiles(self):
        return self._ark_sol_ligand_list[1]['SMILES'].val
[docs]    @staticmethod
    def ligand_name(st):
        return st.title.strip() if st else 'None' 
    @property
    def ligand1_name(self):
        name = self.ligand_name(self.ligand1_st)
        if not name:
            return self.ligand2_mol_formula
        return name
    @property
    def ligand2_name(self):
        name = self.ligand_name(self.ligand2_st)
        if not name:
            return self.ligand2_mol_formula
        return name
    @property
    def short_hash(self):
        jn = self._ark_sol_fep_simulation['JobName'].val
        jn = jn.split('_')[-4:-2]
        return '_'.join(jn)
    @property
    def ligand1_hash(self):
        jn = self._ark_sol_fep_simulation['JobName'].val
        hash_ = jn.split('_')[-4:-2]
        return hash_[0]
    @property
    def ligand2_hash(self):
        jn = self._ark_sol_fep_simulation['JobName'].val
        hash_ = jn.split('_')[-4:-2]
        return hash_[1]
    @property
    def jobname(self):
        jn = self._ark_sol_fep_simulation['JobName'].val
        jn = jn.split('_')[0:-4]
        return '_'.join(jn)
    @property
    def delta_delta_g(self):
        sol_dg = Measurement(self.sol_delta_g[0], self.sol_delta_g[1])
        cpx_dg = Measurement(self.cpx_delta_g[0], self.cpx_delta_g[1])
        return cpx_dg.val - sol_dg.val + self.ddg_corrections_sum.val
    @property
    def sol_delta_g(self):
        """
        :return: dG and its standard deviation
        :rtype: float, float
        """
        return self._delta_g(self._ark_sol_pair_energetics)
    @property
    def ddg_corrections_map(self):
        return self._ddg_corrections_map or {}
    @property
    def ddg_corrections_sum(self):
        return sum(self.ddg_corrections_map.values()) or Measurement(0, 0)
    @property
    def cpx_delta_g(self):
        """
        :return: dG and its standard deviation
        :rtype: float, float
        """
        return self._delta_g(self._ark_cpx_pair_energetics)
    @staticmethod
    def _delta_g(ark_pair_energetics):
        if 'dGForward' not in list(ark_pair_energetics) or \
           
'Bennett' not in list(ark_pair_energetics['dGForward']):
            return None, None
        ark_obj = ark_pair_energetics['dGForward']['Bennett']
        dg = ark_obj['dG'].val
        dg_bootstrap_std = ark_obj['Bootstrap_std'].val
        return dg, dg_bootstrap_std
    @property
    def sol_total_replicas(self):
        return len(self._ark_sol_rest_exchanges.val)
    @property
    def cpx_total_replicas(self):
        return len(self._ark_cpx_rest_exchanges.val)
    @property
    def sol_total_waters(self):
        return self._ark_sol_fep_simulation['NumberWaters'].val
    @property
    def cpx_total_waters(self):
        return self._ark_cpx_fep_simulation['NumberWaters'].val
    @property
    def sol_total_atoms(self):
        return self._ark_sol_fep_simulation['TotalAtoms'].val
    @property
    def cpx_total_atoms(self):
        return self._ark_cpx_fep_simulation['TotalAtoms'].val
    @property
    def sol_sim_time(self):
        """
        Values returned in Ns (nanoseconds)
        """
        return self._ark_sol_fep_simulation['TotalSimulationNs'].val
    @property
    def cpx_sim_time(self):
        """
        Values returned in Ns (nanoseconds)
        """
        return self._ark_cpx_fep_simulation['TotalSimulationNs'].val
    @property
    def sol_temperature(self):
        return self._ark_sol_fep_simulation['TemperatureK'].val
    @property
    def cpx_temperature(self):
        return self._ark_cpx_fep_simulation['TemperatureK'].val
    @property
    def sol_ff(self):
        return self._ark_sol_fep_simulation['ForceFields'].val
    @property
    def cpx_ff(self):
        return self._ark_cpx_fep_simulation['ForceFields'].val
    @property
    def sol_ensemble(self):
        return self._ark_sol_fep_simulation['Ensemble'].val
    @property
    def cpx_ensemble(self):
        return self._ark_cpx_fep_simulation['Ensemble'].val
    @property
    def sol_charge(self):
        return self._ark_sol_fep_simulation['FormalCharge'].val
    @property
    def cpx_charge(self):
        return self._ark_cpx_fep_simulation['FormalCharge'].val
    @property
    def sol_salt_molecules(self):
        if not self._ark_sol_salt_info:
            return 0
        return self._ark_sol_salt_info['SaltMolecules'].val
    @property
    def cpx_salt_molecules(self):
        if not self._ark_cpx_salt_info:
            return 0
        return self._ark_cpx_salt_info['SaltMolecules'].val
    @property
    def sol_salt_info(self):
        if not self._ark_sol_salt_info:
            return None
        return self._ark_sol_salt_info['SaltConcentration'].val
    @property
    def cpx_salt_info(self):
        if not self._ark_cpx_salt_info:
            return None
        return self._ark_cpx_salt_info['SaltConcentration'].val
    @property
    def receptor_st(self):
        """
        Returns receptor structure
        """
        if not self._pv_st:
            return None
        if self._receptor_st:
            return self._receptor_st
        if self.perturbation_type == constants.FEP_TYPES.LIGAND_SELECTIVITY:
            self._receptor_st = self._pv_st[1]
        else:
            self._receptor_st = self._pv_st[0]
        return self._receptor_st
    @property
    def ligand1_st(self):
        """
        Returns ligand_1 structure
        """
        if self._ligand1_st:
            return self._ligand1_st
        if self._pv_st:
            offset = self._pv_st[0].atom_total
            for atom in self._pv_st[1].atom:
                atom.property[constants.ORIGINAL_INDEX] = offset + atom.index
            self._ligand1_st = self._pv_st[1]
            return self._ligand1_st
        return None
    @property
    def ligand2_st(self):
        """
        Returns ligand_2 structure
        """
        if self._ligand2_st:
            return self._ligand2_st
        if self._pv_st:
            offset = self._pv_st[0].atom_total + self._pv_st[1].atom_total + \
                
self.ligand1_alchemical_atom_total
            for atom in self._pv_st[2].atom:
                atom.property[constants.ORIGINAL_INDEX] = offset + atom.index
            self._ligand2_st = self._pv_st[2]
            return self._ligand2_st
        return None
    @staticmethod
    def _write_error(msg):
        print(str(msg))
    @property
    def sol_rest_exchanges(self):
        return self._rest_exchanges(self._ark_sol_rest_exchanges)
    @property
    def cpx_rest_exchanges(self):
        return self._rest_exchanges(self._ark_cpx_rest_exchanges)
    @staticmethod
    def _rest_exchanges(ark_obj):
        density_list = list(range(len(ark_obj)))
        for replica in ark_obj:
            irep = replica['Number'].val
            density_list[irep] = replica['HistoryDistribution'].val
        return density_list
    def _parse_cpx_sea(self, cpx_sea):
        """
        Since the ARK-formatted sid file contains a lot of data, in a list
        format, we need to parse it.
        """
        if not cpx_sea:
            return
        ret_dict = self._parse_sea(cpx_sea)
        self._ark_cpx_fep_simulation = ret_dict['fep_simulation']
        self._ark_cpx_salt_info = ret_dict.get('salt_information')
        self._ark_cpx_protein_info = ret_dict['protein_info']
        self._ark_cpx_pair_energetics = ret_dict['fep_pair_energetics_list']
        self._ark_cpx_ligand_list = ret_dict['ligand_list']
        self._ark_cpx_rest_exchanges = ret_dict['rest_exchanges']
        self._ark_cpx_sid_list = ret_dict.get('sid_ligand_list', [])
    def _parse_sol_sea(self, sol_sea):
        """
        Since the ARK-formatted sid file contains a lot of data, in a list
        format, we need to parse it.
        """
        if not sol_sea:
            return
        ret_dict = self._parse_sea(sol_sea)
        self._ark_sol_fep_simulation = ret_dict['fep_simulation']
        self._ark_sol_salt_info = ret_dict.get('salt_information')
        self._ark_sol_pair_energetics = ret_dict['fep_pair_energetics_list']
        self._ark_sol_ligand_list = ret_dict['ligand_list']
        self._ark_sol_rest_exchanges = ret_dict['rest_exchanges']
        self._ark_sol_sid_list = ret_dict.get('sid_ligand_list', [])
    def _parse_sea(self, sea_obj):
        try:
            ret_dict = parse_sea(sea_obj)
        except ValueError as err:
            self._write_error(err)
            raise
        return ret_dict 
[docs]class PRMEdgeData(FEPEdgeData):
    """
    PRMEdgeData is an object that stores Protein Residue Mutation related data.
    And inherits from FEPEdgeData.
    """
[docs]    def __init__(self,
                 complex_sea,
                 solvent_sea,
                 pv_st=None,
                 atom_mapping=None,
                 perturbation_type=constants.FEP_TYPES.SMALL_MOLECULE,
                 frag_atom_mapping=None):
        """
        :type   complex_sea: `sea`
        :param  complex_sea: SEA object with results pertaining to the complex
                             leg of the FEP calculation
        :type   solvent_sea: `sea`
        :param  solvent_sea: SEA object with results pertaining to the solvent
                             leg of the FEP calculation
        :type         pv_st: `schrodinger.Structure`
        :param        pv_st: PoseViewer file must contain 2 or 3 structures.
                             [receptor, lig1, lig2]; otherwise it's None
        :type  atom_mapping: `int`, `int`
        :param atom_mapping: mapping of ligand2 to ligand1 atoms
        :type  frag_atom_mapping: `int`, `int`
        :param frag_atom_mapping: mapping of frag1 and frag2 atoms
        """
        perturbation_type = OLD_TO_NEW_FEP_TYPES.get(perturbation_type,
                                                     perturbation_type)
        FEPEdgeData.__init__(self, complex_sea, solvent_sea, pv_st,
                             atom_mapping, perturbation_type)
        self._frag_atom_mapping = frag_atom_mapping
        self._pv_st = None
        self._wt_st = None
        self._mut_st = None
        self._wt_frag_st = None
        self._mut_frag_st = None
        self._solvent_fullsystem_ct = None
        self._wt_residue_sequence_list = None
        self._mut_residue_sequence_list = None
        ncts = 2 if perturbation_type == constants.FEP_TYPES.PROTEIN_STABILITY else 3
        if pv_st and len(pv_st) == ncts:
            self._pv_st = [
                self._init_protein_st(ct, zob_waters=True) for ct in pv_st
            ]
        if self._complex_sea:
            self._parse_cpx_sea(self._complex_sea)
            self._parse_cpx_sid()
        if self._solvent_sea:
            self._parse_sol_sea(self._solvent_sea)
            self._parse_sol_sid()
        if self.perturbation_type in [constants.FEP_TYPES.LIGAND_SELECTIVITY]:
            st = self._pv_st[0].copy()
            self.ligand_st = st.extract(
                evaluate_asl(st, "not water and not (metals) and not (ions)"))
            atom_list = [a.index for a in self.ligand_st.atom]
            self._atom_mapping = (atom_list, atom_list)
            self._calc_lig_props() 
    def _calc_lig_props(self):
        self.ligand_charge = self.ligand_st.formal_charge
        self.ligand_mass = self.ligand_st.total_weight
        self.ligand_name = list(self.ligand_st.residue)[0].pdbres.strip()
        self.ligand_nheavy = len(
            [a for a in self.ligand_st.atom if a.atomic_number != 1])
        self.ligand_torsions
        self.ligand_rb_total = self._ligand1_fep_tors.tors_total
    @property
    def wt_st(self):
        """
        Returns full structure of a WT protein
        """
        if self._wt_st:
            return self._wt_st
        offset = 0
        if self.perturbation_type != constants.FEP_TYPES.PROTEIN_STABILITY:
            offset = self._pv_st[0].atom_total
        self._wt_st = self._pv_st[-2].copy()
        for atom in self._wt_st.atom:
            atom.property[constants.ORIGINAL_INDEX] = offset + atom.index
        if self.perturbation_type == constants.FEP_TYPES.PROTEIN_SELECTIVITY:
            self._wt_st = self.receptor_st.merge(self._wt_st, copy_props=True)
        return self._wt_st
    @property
    def mut_st(self):
        """
        Returns full structure of a mutated protein
        """
        if self._mut_st:
            return self._mut_st
        offset = self._pv_st[0].atom_total + self.ligand1_alchemical_atom_total
        if self.perturbation_type in [
                constants.FEP_TYPES.PROTEIN_SELECTIVITY,
                constants.FEP_TYPES.LIGAND_SELECTIVITY
        ]:
            offset += self._pv_st[1].atom_total
        self._mut_st = self._pv_st[-1].copy()
        for atom in self._mut_st.atom:
            atom.property[constants.ORIGINAL_INDEX] = offset + atom.index
        if self.perturbation_type == constants.FEP_TYPES.PROTEIN_SELECTIVITY:
            self._mut_st = self.receptor_st.merge(self._mut_st, copy_props=True)
        return self._mut_st
    @property
    def ligand1_st(self):
        return self.wt_frag_st
    @property
    def ligand2_st(self):
        return self.mut_frag_st
    @property
    def receptor_st(self):
        if self.perturbation_type != constants.FEP_TYPES.PROTEIN_SELECTIVITY:
            return self.wt_st
        if self._receptor_st:
            return self._receptor_st
        self._receptor_st = self._pv_st[0].copy()
        for atom in self._receptor_st.atom:
            atom.property[constants.ORIGINAL_INDEX] = atom.index
        return self._receptor_st
    @property
    def wt_frag_st(self):
        if self._wt_frag_st:
            return self._wt_frag_st
        if self.perturbation_type in [
                constants.FEP_TYPES.PROTEIN_SELECTIVITY,
                constants.FEP_TYPES.PROTEIN_STABILITY
        ]:
            atomlist = evaluate_asl(self.fullsystem_ct, self.ligand1_cpx_asl)
            self._wt_frag_st = self.fullsystem_ct.extract(atomlist, True)
        else:
            atomlist = evaluate_asl(self.solvent_fullsystem_ct,
                                    self.ligand1_sol_asl)
            self._wt_frag_st = self.solvent_fullsystem_ct.extract(
                atomlist, True)
        return self._wt_frag_st
    @property
    def mut_frag_st(self):
        if self._mut_frag_st:
            return self._mut_frag_st
        if self.perturbation_type in [
                constants.FEP_TYPES.PROTEIN_SELECTIVITY,
                constants.FEP_TYPES.PROTEIN_STABILITY
        ]:
            atomlist = evaluate_asl(self.fullsystem_ct, self.ligand2_cpx_asl)
            self._mut_frag_st = self.fullsystem_ct.extract(atomlist, True)
        else:
            atomlist = evaluate_asl(self.solvent_fullsystem_ct,
                                    self.ligand2_sol_asl)
            self._mut_frag_st = self.solvent_fullsystem_ct.extract(
                atomlist, True)
        return self._mut_frag_st
    @property
    def solvent_fullsystem_ct(self):
        if self.perturbation_type == constants.FEP_TYPES.PROTEIN_STABILITY:
            # For PRM stability jobs we don't use the solvent fullsystem CT
            # If ever need to be implemented, we must also include peptide
            # fratments as an input.
            return None
        if self._solvent_fullsystem_ct:
            return self._solvent_fullsystem_ct
        fs = structure.create_new_structure()
        for st, alchem_mol in zip(
                self._pv_st[-2:],
            [self.ligand1_alchemical_mols, self.ligand2_alchemical_mols]):
            st = self._add_alchem_mols(st, alchem_mol) if alchem_mol else st
            fs = fs.merge(st, copy_props=True)
        for atom in fs.atom:
            atom.property[constants.ORIGINAL_INDEX] = atom.index
        self._solvent_fullsystem_ct = fs
        return self._solvent_fullsystem_ct
    @property
    def ligand_st_mol_formula(self):
        return generate_molecular_formula(self.ligand_st)
    @property
    def prm_name(self):
        from_res = list(self.wt_frag_st.residue)[1]
        to_res = list(self.mut_frag_st.residue)[1]
        return "%s:%s-%i-%s" % (from_res.chain, from_res.pdbres.strip(),
                                from_res.resnum, to_res.pdbres.strip())
    @staticmethod
    def _protein_sid_rmsf_backbone(backbone_atoms, backbone_rmsf, st):
        # some receptor files may contain cofactors and waters, so lets just
        # look at the backbone atoms
        atom_list = [
            a.index
            for a in st.atom
            if a.property[constants.ORIGINAL_INDEX] in backbone_atoms
        ]
        bb_st = st.extract(atom_list, True)
        nres = 0
        res2atom = {}
        for ires, res in enumerate(
                structure.get_residues_by_connectivity(bb_st)):
            nres += 1
            res2atom[ires] = [
                a.property[constants.ORIGINAL_INDEX] for a in res.atom
            ]
        rmsf = []
        for ires in range(nres):
            res_sum = 0.0
            for iatom in res2atom[ires]:
                res_sum += backbone_rmsf[backbone_atoms.index(iatom)]
            rmsf.append(old_div(res_sum, len(res2atom[ires])))
        return rmsf
    @cached_property
    def receptor_sid_rmsf_backbone_lambda0(self):
        # There are two types of data formats. The new one contains per-residue
        # RMSF. The old one contains atom-wise RMSF and per-residue RMSF is
        # calculated from self._protein_sid_rmsf_backbone().
        data = self._cpx_sid_lambda0_rmsf['Backbone']
        if 'ProteinResidues' in data:
            return data['Result'].val
        rmsf_vals = data['Result'].val
        backbone_atoms = data['AtomIndices'].val
        return self._protein_sid_rmsf_backbone(backbone_atoms, rmsf_vals,
                                               self.wt_st)
    @cached_property
    def receptor_sid_rmsf_backbone_lambda1(self):
        data = self._cpx_sid_lambda1_rmsf['Backbone']
        if 'ProteinResidues' in data:
            return data['Result'].val
        rmsf_vals = data['Result'].val
        backbone_atoms = data['AtomIndices'].val
        return self._protein_sid_rmsf_backbone(backbone_atoms, rmsf_vals,
                                               self.mut_st)
    @property
    def ligand_torsions(self):
        """
        :rtype: (`FEPTorsionsContainer` object, `FEPTorsionsContainer` object)
        :return: `FEPTorsionsContainer` objects for lambda0 and lambda1.
        """
        sol_idx_offsets = (0, 0)
        cpx_idx_offsets = (0, 0)
        return self._get_ligand_torsions(sol_idx_offsets, cpx_idx_offsets)
[docs]    @staticmethod
    def peptide_name(st):
        name = "-".join([res.pdbres.strip() for res in st.residue])
        resnum1 = list(st.residue)[0].resnum
        resnum3 = list(st.residue)[-1].resnum
        return "%i-%s-%i" % (resnum1, name, resnum3) 
    @property
    def wt_name(self):
        return self.peptide_name(self.wt_frag_st)
    @property
    def mut_name(self):
        return self.peptide_name(self.mut_frag_st)
    @property
    def wt_residue_sequence_list(self):
        """
        Return a list of residue objects (ResData) in amino-to-carboxy order.
        :return: a list of residue objects, ordered N->C (amino to carboxy
            tails).
        :rtype: `ResData`
        """
        if self._wt_residue_sequence_list:
            return self._wt_residue_sequence_list
        self._wt_residue_sequence_list = []
        st = self.wt_st.extract(evaluate_asl(self.wt_st, 'protein'), True)
        for r in structure.get_residues_by_connectivity(st):
            self._wt_residue_sequence_list.append(self.protein_residue(r))
        return self._wt_residue_sequence_list
    @property
    def wt_residue_sequence_tags(self):
        """
        A residue tag looks like this: A:THR_124 (Chain:resname_resnum)
        if chain is not defined, use '_' (underscore)
        :return: a list of residue tags
        :rtype: `residue_tag`
        """
        return [s.fullName() for s in self.wt_residue_sequence_list]
    @property
    def mut_residue_sequence_list(self):
        """
        Return a list of residue objects (ResData) in amino-to-carboxy order.
        :return: a list of residue objects, ordered N->C (amino to carboxy
            tails).
        :rtype: `ResData`
        """
        if self._mut_residue_sequence_list:
            return self._mut_residue_sequence_list
        self._mut_residue_sequence_list = []
        st = self.mut_st.extract(evaluate_asl(self.mut_st, 'protein'), True)
        for r in structure.get_residues_by_connectivity(st):
            self._mut_residue_sequence_list.append(self.protein_residue(r))
        return self._mut_residue_sequence_list
    @property
    def mut_residue_sequence_tags(self):
        """
        A residue tag looks like this: A:THR_124 (Chain:resname_resnum)
        if chain is not defined, use '_' (underscore)
        :return: a list of residue tags
        :rtype: `residue_tag`
        """
        return [s.fullName() for s in self.mut_residue_sequence_list]
    @property
    def cpx_sid_lp_results(self):
        if not self._cpx_sid_lp_results:
            if self.perturbation_type == constants.FEP_TYPES.LIGAND_SELECTIVITY:
                self._cpx_sid_lp_stats(self.ligand_st, self.ligand_st)
            else:
                self._cpx_sid_lp_stats(self.ligand1_st, self.ligand2_st)
        return self._cpx_sid_lp_results
    @property
    def atom_mapping(self):
        if self.perturbation_type in [
                constants.FEP_TYPES.PROTEIN_STABILITY,
                constants.FEP_TYPES.PROTEIN_SELECTIVITY
        ]:
            return self._frag_atom_mapping
        return self._atom_mapping 
[docs]class CovalentFEPEdgeData(FEPEdgeData):
    """
    This object stores covalent FEP related data.
    """
[docs]    def __init__(self,
                 complex_sea,
                 solvent_sea,
                 pv_st=None,
                 sol_st=None,
                 atom_mapping=None,
                 perturbation_type=constants.FEP_TYPES.SMALL_MOLECULE,
                 frag_atom_mapping=None):
        """
        :type   complex_sea: `sea`
        :param  complex_sea: SEA object with results pertaining to the complex
                             leg of the FEP calculation
        :type   solvent_sea: `sea`
        :param  solvent_sea: SEA object with results pertaining to the solvent
                             leg of the FEP calculation
        :type         pv_st: `schrodinger.Structure`
        :param        pv_st: PoseViewer file must contain 2 structures.
                             [complex1, complex2]; otherwise it's None
        :type        sol_st: solvent fragments tuple
        :param       sol_st: (`schrodinger.Structure`, `schrodinger.Structure`)
        :type  atom_mapping: `int`, `int`
        :param atom_mapping: mapping of ligand2 to ligand1 atoms
        :type  frag_atom_mapping: `int`, `int`
        :param frag_atom_mapping: mapping of frag1 and frag2 atoms
        """
        perturbation_type = OLD_TO_NEW_FEP_TYPES.get(perturbation_type,
                                                     perturbation_type)
        FEPEdgeData.__init__(self, complex_sea, solvent_sea, pv_st,
                             frag_atom_mapping, perturbation_type)
        self._full_atom_mapping = atom_mapping
        self._atom_mapping_cpx = None
        self._pv_st = None
        self._wt_st = None
        self._mut_st = None
        self._ligand1_st = sol_st[0]
        self._ligand2_st = sol_st[1]
        self._ligand1_cpx_st = None
        self._ligand2_cpx_st = None
        self._solvent_fullsystem_ct = None
        self._wt_residue_sequence_list = None
        self._mut_residue_sequence_list = None
        ncts = 2
        if pv_st and len(pv_st) == ncts:
            self._pv_st = [
                self._init_protein_st(ct, zob_waters=True) for ct in pv_st
            ]
        if self._complex_sea:
            self._parse_cpx_sea(self._complex_sea)
            self._parse_cpx_sid()
        if self._solvent_sea:
            self._parse_sol_sea(self._solvent_sea)
            self._parse_sol_sid() 
    @property
    def wt_st(self):
        """
        Returns full structure of a WT protein
        """
        if self._wt_st:
            return self._wt_st
        self._wt_st = self._pv_st[0].copy()
        for atom in self._wt_st.atom:
            atom.property[constants.ORIGINAL_INDEX] = atom.index
        return self._wt_st
    @property
    def mut_st(self):
        """
        Returns full structure of a mutated protein
        """
        if self._mut_st:
            return self._mut_st
        self._mut_st = self._pv_st[1].copy()
        offset = self.wt_st.atom_total
        for atom in self._mut_st.atom:
            atom.property[constants.ORIGINAL_INDEX] = offset + atom.index + \
                
self.ligand1_alchemical_atom_total
        return self._mut_st
    @property
    def receptor_st(self):
        return self.wt_st
    @property
    def ligand1_cpx_st(self):
        if self._ligand1_cpx_st:
            return self._ligand1_cpx_st
        atomlist = evaluate_asl(self.fullsystem_ct, self.ligand1_cpx_asl)
        self._ligand1_cpx_st = self.fullsystem_ct.extract(atomlist, True)
        self._ligand1_cpx_st.title = self.ligand1_st.title
        return self._ligand1_cpx_st
    @property
    def ligand2_cpx_st(self):
        if self._ligand2_cpx_st:
            return self._ligand2_cpx_st
        atomlist = evaluate_asl(self.fullsystem_ct, self.ligand2_cpx_asl)
        self._ligand2_cpx_st = self.fullsystem_ct.extract(atomlist, True)
        self._ligand2_cpx_st.title = self.ligand2_st.title
        return self._ligand2_cpx_st
    @property
    def ligand1_st(self):
        return self._ligand1_st
    @property
    def ligand2_st(self):
        return self._ligand2_st
    @property
    def solvent_fullsystem_ct(self):
        if self._solvent_fullsystem_ct:
            return self._solvent_fullsystem_ct
        fs = structure.create_new_structure()
        for frag, alchem_mols in zip(
            [self.ligand1_st, self.ligand2_st],
            [self.ligand1_alchemical_mols, self.ligand2_alchemical_mols]):
            frag = self._add_alchem_mols(frag, alchem_mols)
            fs = fs.merge(frag, copy_props=True)
        for atom in fs.atom:
            atom.property[constants.ORIGINAL_INDEX] = atom.index
        self._solvent_fullsystem_ct = fs
        return self._solvent_fullsystem_ct
    @staticmethod
    def _protein_sid_rmsf_backbone(backbone_atoms, backbone_rmsf, st):
        """
        Calculate the protein backbond RMSF per residue.
        :type   backbone_atoms: List[int]
        :param  backbone_atoms: List of backbone atom indicies.
        :type   backbone_rmsf: List[float]
        :param  backbone_rmsf: List of backbone RMSF values.
        :type   st:            `schrodinger.structure.Structure` object
        :param  st:            Input structure.
        :rtype: List[float]
        :return: The RMSF for each residue.
        """
        return _calculate_rmsf(backbone_atoms, backbone_rmsf, st)
    @cached_property
    def receptor_sid_rmsf_backbone_lambda0(self):
        data = self._cpx_sid_lambda0_rmsf['Backbone']
        if 'ProteinResidues' in data:
            return data['Result'].val
        backbone_atoms = data['AtomIndices'].val
        rmsf_vals = data['Result'].val
        return self._protein_sid_rmsf_backbone(backbone_atoms, rmsf_vals,
                                               self.wt_st)
    @cached_property
    def receptor_sid_rmsf_backbone_lambda1(self):
        data = self._cpx_sid_lambda1_rmsf['Backbone']
        if 'ProteinResidues' in data:
            return data['Result'].val
        backbone_atoms = data['AtomIndices'].val
        rmsf_vals = data['Result'].val
        return self._protein_sid_rmsf_backbone(backbone_atoms, rmsf_vals,
                                               self.mut_st)
    @property
    def ligand_torsions(self):
        """
        :rtype: (`FEPTorsionsContainer` object, `FEPTorsionsContainer` object)
        :return: `FEPTorsionsContainer` objects for lambda0 and lambda1.
        """
        sol_idx_offsets = (0, self.ligand1_st.atom_total +
                           self.ligand1_alchemical_atom_total)
        cpx_idx_offsets = (0, self._pv_st[0].atom_total +
                           self.ligand1_alchemical_atom_total)
        return self._get_ligand_torsions(sol_idx_offsets, cpx_idx_offsets)
    @property
    def receptor_title(self):
        return "Cov. Receptor (%s:%d)" % (
            self.ligand1_cpx_st.atom[1].chain,
            self.ligand1_cpx_st.atom[1].resnum,
        )
    @property
    def cpx_sid_lp_results(self):
        if not self._cpx_sid_lp_results:
            self._cpx_sid_lp_stats(self.ligand1_cpx_st, self.ligand2_cpx_st)
        return self._cpx_sid_lp_results
[docs]    def get_ligand1_atom_dict(self):
        return self._get_ligand_atom_dict(self.ligand1_cpx_st) 
[docs]    def get_ligand2_atom_dict(self):
        return self._get_ligand_atom_dict(self.ligand2_cpx_st) 
    @property
    def atom_mapping(self):
        """
        Return atom mapping for both ligand fragment that is used in the SID
        calculation. Ligands used in the solvent leg are different than those
        used in the complex leg. The difference is:
            - Complex leg -- we use the side chain of the attached residue plus
              the ligand.
            - Solvent leg -- we use the peptide plus the ligand.
        :return: atom mapping for ligand1 and ligand2. The values should
        :rtype: (`list`, `list`)
        """
        if self._atom_mapping_cpx:
            return self._atom_mapping_cpx
        lig1_map = {
            atom.property[constants.FEP_ATOM_INDEX]: atom.index
            for atom in self.ligand1_cpx_st.atom
        }
        lig2_map = {
            atom.property[constants.FEP_ATOM_INDEX]: atom.index
            for atom in self.ligand2_cpx_st.atom
        }
        frag_map1, frag_map2 = [], []
        for a1, a2 in zip(*self._full_atom_mapping):
            if a1 in lig1_map and a2 in lig2_map:
                frag_map1.append(lig1_map[a1])
                frag_map2.append(lig2_map[a2])
        self._atom_mapping_cpx = (frag_map1, frag_map2)
        return self._atom_mapping_cpx 
[docs]class AbFEPEdgeData(FEPEdgeData):
    """
    This object stores Absolute Binding FEP related data.
    """
[docs]    def __init__(self,
                 complex_sea,
                 solvent_sea,
                 pv_st=None,
                 atom_mapping=None,
                 perturbation_type=constants.FEP_TYPES.ABSOLUTE_BINDING,
                 ddg_corrections_map=None):
        """
        :type   complex_sea: `sea`
        :param  complex_sea: SEA object with results pertaining to the complex
                             leg of the FEP calculation
        :type   solvent_sea: `sea`
        :param  solvent_sea: SEA object with results pertaining to the solvent
                             leg of the FEP calculation
        :type         pv_st: `schrodinger.Structure`
        :param        pv_st: PoseViewer file must contain 3 structures,
                             [receptor, lig1, lig2]
        :type  atom_mapping: Dict[int, int]
        :param atom_mapping: mapping of ligand2 to ligand1 atoms
        See `FEPEdgeData` for definitions of other parameters.
        """
        super().__init__(complex_sea, solvent_sea, pv_st, atom_mapping,
                         perturbation_type, ddg_corrections_map)
        if pv_st and len(pv_st) == 3:
            pv_st[0] = self._init_protein_st(pv_st[0])
            self._pv_st = pv_st
        # offsets used to mock the cms full-system CT atom order
        self.ligand_offset = self._pv_st[0].atom_total * 2 + self._pv_st[
            2].atom_total
        self.dummy_offset = self._pv_st[2].atom_total 
    @property
    def fullsystem_ct(self):
        """
        :rtype: `schrodinger.structure.Structure`
        :return: The full system structure.
        """
        if self._fullsystem_ct:
            return self._fullsystem_ct
        fs = structure.create_new_structure()
        # In AbFEP simulations, the ct is in the order of
        # Protein, Ligand2 (Dummy), Protein, Ligand1
        for ct in [self._pv_st[0], self._pv_st[2],
                   self._pv_st[0], self._pv_st[1]]: # yapf: disable
            fs = fs if ct is None else fs.merge(ct, copy_props=True)
        for atom in fs.atom:
            atom.property[constants.ORIGINAL_INDEX] = atom.index
        self._fullsystem_ct = fs
        return self._fullsystem_ct
    @property
    def receptor_st(self):
        """
        :rtype: `schrodinger.structure.Structure`
        :return: The receptor structure.
        """
        if self._receptor_st:
            return self._receptor_st
        receptor_st = self._pv_st[0].copy()
        offset = self._pv_st[0].atom_total + self._pv_st[2].atom_total
        for atom in receptor_st.atom:
            atom.property[constants.ORIGINAL_INDEX] = offset + atom.index
        self._receptor_st = receptor_st
        return self._receptor_st
    @property
    def ligand1_st(self):
        """
        :rtype: `schrodinger.structure.Structure`
        :return: The ligand1 structure.
        """
        if self._ligand1_st:
            return self._ligand1_st
        self._ligand1_st = self._pv_st[1].copy()
        offset = self.ligand_offset
        for atom in self._ligand1_st.atom:
            atom.property[constants.ORIGINAL_INDEX] = offset + atom.index
        return self._ligand1_st
    @property
    def ligand2_st(self):
        """
        :rtype: `schrodinger.structure.Structure`
        :return: The ligand2 structure.
        """
        if self._ligand2_st:
            return self._ligand2_st
        self._ligand2_st = self._pv_st[2].copy()
        offset = self.dummy_offset
        for atom in self._ligand2_st.atom:
            atom.property[constants.ORIGINAL_INDEX] = offset + atom.index
        return self._ligand2_st
    @property
    def ligand_torsions(self):
        """
        :rtype: (`FEPTorsionsContainer` object, `FEPTorsionsContainer` object)
        :return: `FEPTorsionsContainer` objects for lambda0 and lambda1.
        """
        sol_idx_offsets = (self._pv_st[2].atom_total, 0)
        cpx_idx_offsets = (self.ligand_offset, self.dummy_offset)
        return self._get_ligand_torsions(sol_idx_offsets, cpx_idx_offsets)
    @property
    def ligand1_hash(self):
        jn = self._ark_sol_fep_simulation['JobName'].val
        hash_ = jn.split('_')[-4:-2]
        return hash_[1]
    def _cpx_sid_pl_contacts(self):
        """
        This method calculates protein-residue contacts for both lambda0 and
        lambda1 simulations, collates the residue information and then sets
        `self._cpx_sid_pl_results` with the results dictionaries.
        """
        contact_data0 = self.pl_contact_data0
        contact_data1 = {}
        nkeys = len(self.cpx_sid_protein_residues)
        ntypes = len(self._pl_detail_inter_type)
        pl_cont_data0 = numpy.zeros(
            (nkeys, self.cpx_sid_number_of_frames, ntypes), dtype=numpy.int)
        pl_cont_data1 = numpy.zeros(
            (nkeys, self.cpx_sid_number_of_frames, ntypes), dtype=numpy.int)
        for i, t in enumerate(self._pl_contact_types):
            self._parse_pl_contact_data(t, contact_data0[t], pl_cont_data0)
        pl_results0, pl_results1 = {}, {}
        for ires, res in enumerate(self.cpx_sid_protein_residues):
            pl_results0[res] = self._get_pli_residue_profile(
                pl_cont_data0[ires, ::, ::])
            pl_results1[res] = self._get_pli_residue_profile(
                pl_cont_data1[ires, ::, ::])
        self._cpx_sid_pl_results = [pl_results0, pl_results1]
    def _cpx_sid_lp_stats(self, lig1_st, lig2_st):
        lid_stat_map0 = self._getLidStatMap(self.pl_contact_data0,
                                            self._cpx_sid_pli_lambda0_dict,
                                            self.ligand1_fragments,
                                            self._ligand1_cpx_sid_lighb,
                                            lig1_st)
        self._cpx_sid_lp_results = [lid_stat_map0, {}]
    @property
    def delta_delta_g(self):
        # calculation of ddG is reversed due to the lambda-scheduling is
        # reversed from what it is in RE-FEP workflow.
        return -super().delta_delta_g
    @property
    def cpx_sid_protein_residues(self):
        """
        A list of protein residues that interact with both ligand1 throughout
        the simulation
        :rtype: `str`
        :return: a list of protein tags that interact with ligand1
        """
        if not self._cpx_sid_protein_residues:
            sid_dict0 = self._cpx_sid_pli_lambda0_dict
            keys0 = self._set_cpx_sid_protein_residues(sid_dict0)
            self._cpx_sid_protein_residues = self._parse_residue_tags(keys0)
        return self._cpx_sid_protein_residues 
[docs]def shorten_name(name: str) -> str:
    return name[:4] 
[docs]class SolubilityFEPEdgeData:
[docs]    def __init__(self, st, hydration_sea, sublimation_sea_list):
        self.st = st
        if hydration_sea:
            self._parse_hyd_sea(hydration_sea)
            self._parse_hyd_sid()
        if sublimation_sea_list:
            self._parse_sub_sea(sublimation_sea_list)
            self._parse_sub_sid()
        self.perturbation_type = constants.FEP_TYPES.SOLUBILITY
        self.numb_sub_legs = len(sublimation_sea_list)
        self._mol_torsions = [] 
    def _parse_hyd_sea(self, hyd_sea):
        """
        Since the ARK-formatted sid file contains a lot of data, in a list
        format, we need to parse it.
        """
        if not hyd_sea:
            return
        ret_dict = self._parse_sea(hyd_sea)
        self._ark_hyd_fep_simulation = ret_dict['fep_simulation']
        self._ark_hyd_pair_energetics = ret_dict['fep_pair_energetics_list']
        self._ark_hyd_rest_exchanges = ret_dict['rest_exchanges']
        # node[0] is a dummy compound
        self._ark_hyd_sid = ret_dict['sid_ligand_list'][1]
        self._ark_hyd_mol = ret_dict['ligand_list'][1]
    def _parse_sub_sea(self, sub_sea_list):
        """
        Since the ARK-formatted sid file contains a lot of data, in a list
        format, we need to parse it.
        """
        self._ark_sub_fep_simulation = []
        self._ark_sub_pair_energetics = []
        self._ark_sub_rest_exchanges = []
        self._ark_sub_sid = []
        self._ark_sub_mol = []
        for sub_sea in sub_sea_list:
            if not sub_sea:
                continue
            ret_dict = self._parse_sea(sub_sea)
            self._ark_sub_fep_simulation.append(ret_dict['fep_simulation'])
            self._ark_sub_pair_energetics.append(
                ret_dict['fep_pair_energetics_list'])
            self._ark_sub_rest_exchanges.append(ret_dict['rest_exchanges'])
            # node[0] is a dummy compound
            self._ark_sub_sid.append(ret_dict['sid_ligand_list'][1])
            self._ark_sub_mol.append(ret_dict['ligand_list'][1])
    def _parse_sub_sid(self):
        self._ark_sub_torsion = []
        self._sub_sid_rgyr = []
        self._sub_sid_psa = []
        self._sub_sid_sasa = []
        self._sub_sid_molsa = []
        self._sub_sid_hb = []
        self._sub_sid_rmsd = []
        self._ark_sub_interactions = []
        for ark_sub in self._ark_sub_sid:
            ret_dict = parse_sid(ark_sub)
            self._ark_sub_torsion.append(ret_dict['Torsion'])
            self._sub_sid_rgyr.append(ret_dict['Rad_Gyration']['Result'].val)
            self._sub_sid_psa.append(
                ret_dict['Polar_Surface_Area']['Result'].val)
            self._sub_sid_sasa.append(ret_dict['SA_Surface_Area']['Result'].val)
            self._sub_sid_molsa.append(
                ret_dict['Molecular_Surface_Area']['Result'].val)
            self._sub_sid_hb.append(ret_dict['LigandHBonds']['Result'].val)
            self._sub_sid_rmsd.append(ret_dict['RMSD'][0]['Result'].val)
            self._ark_sub_interactions.append(
                ret_dict['AmorphousCrystalInter'].val)
    def _parse_hyd_sid(self):
        ret_dict = parse_sid(self._ark_hyd_sid)
        self._ark_hyd_torsion = ret_dict['Torsion']
        self._hyd_sid_rgyr = ret_dict['Rad_Gyration']['Result'].val
        self._hyd_sid_psa = ret_dict['Polar_Surface_Area']['Result'].val
        self._hyd_sid_sasa = ret_dict['SA_Surface_Area']['Result'].val
        self._hyd_sid_molsa = ret_dict['Molecular_Surface_Area']['Result'].val
        self._hyd_sid_hb = ret_dict['LigandHBonds']['Result'].val
        self._hyd_sid_rmsd = ret_dict['RMSD'][0]['Result'].val
    @staticmethod
    def _timeseries_props(vals, stats=True):
        if not stats:
            return vals
        if not len(vals):
            return 0., 0.
        return numpy.mean(vals), numpy.std(vals)
[docs]    def hyd_sid_rmsd(self, stats=True):
        val = self._hyd_sid_rmsd
        return self._timeseries_props(val, stats=stats) 
[docs]    def sub_sid_rmsd(self, stats=True):
        return [
            self._timeseries_props(val, stats=stats)
            for val in self._sub_sid_rmsd
        ] 
[docs]    def hyd_sid_sasa(self, stats=True):
        val = self._hyd_sid_sasa
        return self._timeseries_props(val, stats=stats) 
[docs]    def sub_sid_sasa(self, stats=True):
        vals = self._sub_sid_sasa
        return [self._timeseries_props(v, stats=stats) for v in vals] 
[docs]    def hyd_sid_psa(self, stats=True):
        val = self._hyd_sid_psa
        return self._timeseries_props(val, stats=stats) 
[docs]    def sub_sid_psa(self, stats=True):
        vals = self._sub_sid_psa
        return [self._timeseries_props(v, stats=stats) for v in vals] 
[docs]    def hyd_sid_molsa(self, stats=True):
        val = self._hyd_sid_molsa
        return self._timeseries_props(val, stats=stats) 
[docs]    def sub_sid_molsa(self, stats=True):
        vals = self._sub_sid_molsa
        return [self._timeseries_props(v, stats=stats) for v in vals] 
[docs]    def hyd_sid_rgyr(self, stats=True):
        val = self._hyd_sid_rgyr
        return self._timeseries_props(val, stats=stats) 
[docs]    def sub_sid_rgyr(self, stats=True):
        vals = self._sub_sid_rgyr
        return [self._timeseries_props(v, stats=stats) for v in vals] 
[docs]    def sub_interactions(self, stats=True):
        # Include intramolecular hydrogen bonds together with interactions
        for inter, ihb in zip(self._ark_sub_interactions, self._sub_sid_hb):
            inter['IntraHBondResult'] = [len(hb) for hb in ihb]
        if stats:
            ret = []
            for leg in self._ark_sub_interactions:
                ret.append({
                    k: self._timeseries_props(leg[k], stats=True)
                    for k in leg
                    if k.endswith('Result')
                })
            return ret
        return self._ark_sub_interactions 
    @property
    def mol_st(self):
        return self.st
    @property
    def _atom_mapping(self):
        return [[], []]
    @property
    def mol_torsions(self):
        """
        :rtype: List(`FEPTorsionsContainer`)
        :return: each element in the list will contain (hyd, subX) tuplet.
        """
        if not self._mol_torsions:
            hyd = [(None, t) for t in self._ark_hyd_torsion]
            for sub in self._ark_sub_torsion:
                sub = [(None, t) for t in sub]
                self._mol_torsions.append(
                    FEPTorsionsContainer(
                        hyd, sub, perturbation_type=self.perturbation_type))
        return self._mol_torsions
    @property
    def sub_sid_trajectory_interval_ns(self):
        return _get_interval_ns(self._ark_sub_sid)
    @property
    def hyd_sid_trajectory_interval_ns(self):
        return _get_interval_ns([self._ark_hyd_sid])
    @property
    def sub_sid_snapshot_times_ns(self):
        interval_ns = self.sub_sid_trajectory_interval_ns
        return [x * interval_ns for x in range(self.sub_sid_number_of_frames)]
    @property
    def hyd_sid_snapshot_times_ns(self):
        interval_ns = self.hyd_sid_trajectory_interval_ns
        return [x * interval_ns for x in range(self.hyd_sid_number_of_frames)]
    @property
    def sub_sid_number_of_frames(self):
        return _get_frame_count(self._ark_sub_sid)
    @property
    def hyd_sid_number_of_frames(self):
        return _get_frame_count([self._ark_hyd_sid])
    @property
    def leg1_name(self):
        return SOLVENT_NAMES[self.perturbation_type]
    @property
    def leg2_name(self):
        return COMPLEX_NAMES[self.perturbation_type]
    @property
    def leg2_names(self):
        return [f'{self.leg2_name}-{i}' for i in range(self.numb_sub_legs)]
    @property
    def leg2_names_short(self):
        return [
            f'{shorten_name(self.leg2_name)}{i}'
            for i in range(self.numb_sub_legs)
        ]
    @property
    def hyd_delta_g_sliding_err(self):
        return self._get_values(self._ark_hyd_pair_energetics, 'dGSliding',
                                'dG_error')
    @property
    def sub_delta_g_sliding_err(self):
        return [
            self._get_values(v, 'dGSliding', 'dG_error')
            for v in self._ark_sub_pair_energetics
        ]
    @property
    def hyd_delta_g_sliding(self):
        return self._get_values(self._ark_hyd_pair_energetics, 'dGSliding',
                                'dG')
    @property
    def sub_delta_g_sliding(self):
        return [
            self._get_values(v, 'dGSliding', 'dG')
            for v in self._ark_sub_pair_energetics
        ]
    @property
    def hyd_delta_g_reverse_err(self):
        return self._get_values(self._ark_hyd_pair_energetics, 'dGReverse',
                                'dG_error')
    @property
    def sub_delta_g_reverse_err(self):
        return [
            self._get_values(v, 'dGReverse', 'dG_error')
            for v in self._ark_sub_pair_energetics
        ]
    @property
    def hyd_delta_g_reverse(self):
        return self._get_values(self._ark_hyd_pair_energetics, 'dGReverse',
                                'dG')
    @property
    def sub_delta_g_reverse(self):
        return [
            self._get_values(v, 'dGReverse', 'dG')
            for v in self._ark_sub_pair_energetics
        ]
    @property
    def hyd_delta_g_forward_err(self):
        return self._get_values(self._ark_hyd_pair_energetics, 'dGForward',
                                'dG_error')
    @property
    def sub_delta_g_forward_err(self):
        return [
            self._get_values(v, 'dGForward', 'dG_error')
            for v in self._ark_sub_pair_energetics
        ]
    @property
    def hyd_delta_g_forward(self):
        return self._get_values(self._ark_hyd_pair_energetics, 'dGForward',
                                'dG')
    @property
    def sub_delta_g_forward(self):
        return [
            self._get_values(v, 'dGForward', 'dG')
            for v in self._ark_sub_pair_energetics
        ]
    @property
    def hyd_delta_g_forward_bootstrap_std(self):
        return self._ark_hyd_pair_energetics['dGForward']['Bennett'][
            'Bootstrap_std'].val
    @property
    def sub_delta_g_forward_bootstrap_std(self):
        return [
            v['dGForward']['Bennett']['Bootstrap_std'].val
            for v in self._ark_sub_pair_energetics
        ]
    @property
    def hyd_delta_g_forward_analytical_std(self):
        return self._ark_hyd_pair_energetics['dGForward']['Bennett'][
            'Analytical_std'].val
    @property
    def sub_delta_g_forward_analytical_std(self):
        return [
            v['dGForward']['Bennett']['Analytical_std'].val
            for v in self._ark_sub_pair_energetics
        ]
    @property
    def hyd_delta_g_forward_df_per_replica(self):
        return self._ark_hyd_pair_energetics['dGForward']['Bennett'][
            'dF_Per_Replica'].val
    @property
    def sub_delta_g_forward_df_per_replica(self):
        return [
            v['dGForward']['Bennett']['dF_Per_Replica'].val
            for v in self._ark_sub_pair_energetics
        ]
    @property
    def hyd_end_time_ns(self):
        return self._get_values(self._ark_hyd_pair_energetics, 'dGForward',
                                'EndTime_ns')
    @property
    def sub_end_time_ns(self):
        return self._get_values(self._ark_sub_pair_energetics[0], 'dGForward',
                                'EndTime_ns')
    @property
    def hyd_start_time_ns(self):
        return self._get_values(self._ark_hyd_pair_energetics, 'dGForward',
                                'StartTime_ns')
    @property
    def sub_start_time_ns(self):
        return self._get_values(self._ark_sub_pair_energetics[0], 'dGForward',
                                'StartTime_ns')
    @property
    def hyd_mol_number(self):
        return self._ark_hyd_mol['ASL'].val.split()[1]
    @property
    def sub_mol_number(self):
        return [v['ASL'].val.split()[1] for v in self._ark_sub_mol]
    @property
    def mol_total_rot_bonds(self):
        return self._ark_hyd_mol['NumberRotatableBonds'].val
    @property
    def mol_total_fragments(self):
        return self._ark_hyd_mol['NumberFragments'].val
    @property
    def mol_formula(self):
        return self._ark_hyd_mol['MolecularFormula'].val
    @property
    def mol_charge(self):
        return self._ark_hyd_mol['Charge'].val
    @property
    def mol_atomic_mass(self):
        return self._ark_hyd_mol['AtomicMass'].val
    @property
    def mol_total_heavy(self):
        return self._ark_hyd_mol['NumberHeavyAtoms'].val
    @property
    def mol_total_atoms(self):
        return self._ark_hyd_mol['NumberAtoms'].val
    @property
    def mol_pdb_name(self):
        return self._ark_hyd_mol['PDBResidueName'].val
    @property
    def mol_smiles(self):
        return self._ark_hyd_mol['SMILES'].val
    @property
    def mol_name(self):
        return self.st.title
    @property
    def short_hash(self):
        jn = self._ark_hyd_fep_simulation['JobName'].val
        return jn.split('_')[-2]
    @property
    def mol_hash(self):
        return self.short_hash
    @property
    def jobname(self):
        jn = self._ark_hyd_fep_simulation['JobName'].val
        jn = jn.split('_')[:-3]
        return '_'.join(jn)
    @property
    def solubility_dg(self):
        return self.hyd_delta_g - self.sub_median_delta_g
    @property
    def sub_median_delta_g(self):
        """
        Return the median of solubility legs dG
        """
        sub_dgs = [Measurement(*v) for v in self.sub_delta_g]
        return median_sublimation_free_energy(sub_dgs)
    @property
    def hyd_delta_g(self):
        """
        :return: dG and its standard deviation
        :rtype: float, float
        """
        return Measurement(*self._delta_g(self._ark_hyd_pair_energetics))
    @property
    def sub_delta_g(self):
        """
        :return: dG and its standard deviation
        :rtype: float, float
        """
        return [self._delta_g(v) for v in self._ark_sub_pair_energetics]
    @staticmethod
    def _delta_g(ark_pair_energetics):
        if 'dGForward' not in list(ark_pair_energetics) or \
           
'Bennett' not in list(ark_pair_energetics['dGForward']):
            return None, None
        ark_obj = ark_pair_energetics['dGForward']['Bennett']
        dg = ark_obj['dG'].val
        dg_bootstrap_std = ark_obj['Bootstrap_std'].val
        return dg, dg_bootstrap_std
    @property
    def hyd_total_replicas(self):
        return len(self._ark_hyd_rest_exchanges.val)
    @property
    def sub_total_replicas(self):
        return len(self._ark_sub_rest_exchanges[0].val)
    @property
    def hyd_total_waters(self):
        return self._ark_hyd_fep_simulation['NumberWaters'].val
    @property
    def sub_total_waters(self):
        return self._ark_sub_fep_simulation[0]['NumberWaters'].val
    @property
    def hyd_total_atoms(self):
        return self._ark_hyd_fep_simulation['TotalAtoms'].val
    @property
    def sub_total_atoms(self):
        return self._ark_sub_fep_simulation[0]['TotalAtoms'].val
    @property
    def hyd_sim_time(self):
        """
        Values returned in Ns (nanoseconds)
        """
        return self._ark_hyd_fep_simulation['TotalSimulationNs'].val
    @property
    def sub_sim_time(self):
        """
        Values returned in Ns (nanoseconds)
        """
        return self._ark_sub_fep_simulation[0]['TotalSimulationNs'].val
    @property
    def hyd_temperature(self):
        return self._ark_hyd_fep_simulation['TemperatureK'].val
    @property
    def sub_temperature(self):
        return self._ark_sub_fep_simulation[0]['TemperatureK'].val
    @property
    def hyd_ff(self):
        return self._ark_hyd_fep_simulation['ForceFields'].val
    @property
    def sub_ff(self):
        return self._ark_sub_fep_simulation[0]['ForceFields'].val
    @property
    def hyd_ensemble(self):
        return self._ark_hyd_fep_simulation['Ensemble'].val
    @property
    def sub_ensemble(self):
        return self._ark_sub_fep_simulation[0]['Ensemble'].val
    @property
    def hyd_charge(self):
        return self._ark_hyd_fep_simulation['TotalCharge'].val
    @property
    def sub_charge(self):
        return self._ark_sub_fep_simulation[0]['TotalCharge'].val
    def _parse_sea(self, sea_obj):
        try:
            ret_dict = parse_sea(sea_obj)
        except ValueError as err:
            self._write_error(err)
            raise
        return ret_dict
    @staticmethod
    def _write_error(msg):
        print(str(msg))
    @staticmethod
    def _get_values(ark_obj, key1, key2):
        return ark_obj[key1][key2].val
    @property
    def hyd_rest_exchanges(self):
        return self._rest_exchanges(self._ark_hyd_rest_exchanges)
    @property
    def sub_rest_exchanges(self):
        return [self._rest_exchanges(v) for v in self._ark_sub_rest_exchanges]
    @staticmethod
    def _rest_exchanges(ark_obj):
        density_list = list(range(len(ark_obj)))
        for replica in ark_obj:
            irep = replica['Number'].val
            density_list[irep] = replica['HistoryDistribution'].val
        return density_list 
[docs]def parse_sid(obj_sea):
    ret_dict = {
        'RMSD': [],
        'RMSF': [],
        'SSE': None,
        'ProtLigInter': None,
        'ProtProtInter': None,
        'Torsion': [],
        'Rad_Gyration': None,
        'LigandHBonds': None,
        'Polar_Surface_Area': None,
        'SA_Surface_Area': None,
        'Molecular_Surface_Area': None,
        'NumberOfFrames': None,
        'TrajectoryInterval': None,
        'AmorphousCrystalInter': None
    }
    for m in obj_sea['Keywords']:
        if 'ProtLigInter' in m:
            ret_dict['ProtLigInter'] = m['ProtLigInter']
        if 'ProtProtInter' in m:
            ret_dict['ProtProtInter'] = m['ProtProtInter']
        if 'Torsion' in m:
            ret_dict['Torsion'].append(m['Torsion'])
        if 'SecondaryStructure' in m:
            ret_dict['SSE'] = m['SecondaryStructure']
        if 'Rad_Gyration' in m:
            ret_dict['Rad_Gyration'] = m['Rad_Gyration']
        if 'Polar_Surface_Area' in m:
            ret_dict['Polar_Surface_Area'] = m['Polar_Surface_Area']
        if 'SA_Surface_Area' in m:
            ret_dict['SA_Surface_Area'] = m['SA_Surface_Area']
        if 'Molecular_Surface_Area' in m:
            ret_dict['Molecular_Surface_Area'] = m['Molecular_Surface_Area']
        if 'LigandHBonds' in m:
            ret_dict['LigandHBonds'] = m['LigandHBonds']
        if 'RMSD' in m:
            ret_dict['RMSD'].append(m['RMSD'])
        if 'RMSF' in m:
            ret_dict['RMSF'].append(m['RMSF'])
        if 'AmorphousCrystalInter' in m:
            ret_dict['AmorphousCrystalInter'] = m['AmorphousCrystalInter']
    return ret_dict 
[docs]def parse_sea(sea_obj):
    """
    Given an ark object, parse the data
    """
    if 'Keywords' not in sea_obj:
        raise ValueError('No keywords present in FEP sid file.')
    kw_list = sea_obj['Keywords']
    ret_dict = {
        'ligand_list': [],
        'protein_info': None,
        'fep_simulation': None,
        'fep_pair_energetics': None,
        'rest_exchanges': None,
        'sid_ligand_list': []
    }
    for m in kw_list:
        if 'FEPSimulation' in m:
            ret_dict['fep_simulation'] = m['FEPSimulation']
        if 'SaltInformation' in m:
            ret_dict['salt_information'] = m['SaltInformation']
        if 'ProteinInfo' in m:
            ret_dict['protein_info'] = m['ProteinInfo']
        if 'LigandInfo' in m:
            ret_dict['ligand_list'].append(m['LigandInfo'])
        if 'PeptideInfo' in m:
            ret_dict['ligand_list'].append(m['PeptideInfo'])
        if 'BinCenter' in m:
            ret_dict['fep_pair_energetics_list'] = m
        if 'Replica' in m:
            ret_dict['rest_exchanges'] = m['Replica']
        if 'ResultLambda0' in m:
            ret_dict['sid_ligand_list'].append(m['ResultLambda0'])
        if 'ResultLambda1' in m:
            ret_dict['sid_ligand_list'].append(m['ResultLambda1'])
    return ret_dict 
def _get_interval_ns(sid_list):
    """
    :param sid_list: a list of SID data maps for a FEP map edge (one for each
        of the two ligands)
    :type sid_list: list(schrodinger.utils.sea.Map)
    :return: the MD simulation interval for the edge, if available
    :rtype: float or None
    """
    if len(sid_list) < 1:
        return None
    interval_ps = sid_list[0]['TrajectoryInterval_ps']
    return interval_ps.val / 1000.0
def _get_frame_count(sid_list):
    """
    :param sid_list: a list of SID data maps for a FEP map edge (one for each
        of the two ligands)
    :type sid_list: list(schrodinger.utils.sea.Map)
    :return: the number of frames for the edge MD simulation, if available
    :rtype: int or None
    """
    if len(sid_list) < 1:
        return None
    frame_count = sid_list[0]['TrajectoryNumFrames']
    return frame_count.val
def _strain_text(value):
    """
    Create the strain text: value +/- std_dev
    :param value: the values to use, i.e.,  (value, std_dev)
    :type value: tuple(float, float)
    :return: the text for use in matplotlib
    :rtype: str
    """
    return f'{value[0]:.1f}$\\pm${value[1]:.1f}'
[docs]def get_ticks(min_val, max_val, num_ticks):
    """
    Return tick values and labels for an axis with the given min and max
    :param min_val: The minimum value on the axis
    :type min_val: float
    :param max_val: The maximum value on the axis
    :type max_val: float
    :param num_ticks: How many ticks to use on the axis
    :type num_ticks: int
    :return: tick values and tick labels
    :rtype: list[float], list[str]
    """
    tick_vals = list(numpy.linspace(min_val, max_val, num_ticks))
    labels = ["%.2f" % x for x in tick_vals]
    return tick_vals, labels