"""
Classes and functions to deal with ML features.
Copyright Schrodinger, LLC. All rights reserved."""
import argparse
import itertools
import math
import matplotlib
import operator
import os
import random
import shutil
from collections import defaultdict
from collections import namedtuple
import numpy
from matminer.featurizers.site import CrystalNNFingerprint
from matminer.featurizers.structure import SiteStatsFingerprint
from schrodinger import structure
from schrodinger import surface
from schrodinger.application.jaguar.input import JaguarInput
from schrodinger.application.jaguar.output import JaguarOutput
from schrodinger.application.matsci import amorphous
from schrodinger.application.matsci import clusterstruct as cstruct
from schrodinger.application.matsci import jaguar_restart
from schrodinger.application.matsci import jaguarworkflows
from schrodinger.application.matsci import jobutils
from schrodinger.application.matsci import reaction_workflow_utils as rxnwfu
from schrodinger.application.matsci import textlogger
from schrodinger.application.matsci.elementalprops import ElementalProperties
from schrodinger.application.matsci import \
    genetic_optimization as go
from schrodinger.application.matsci.mlearn.base import BaseFeaturizer
from schrodinger.application.matsci.nano import xtal
from schrodinger.application.matsci import rxn_channel
from schrodinger.infra import structure as infrastructure
from schrodinger.job import queue
from schrodinger.structutils import analyze
from schrodinger.structutils import transform
from schrodinger.structutils.measure import measure_bond_angle
from schrodinger.structutils.measure import measure_distance
from schrodinger.utils import fileutils
from schrodinger.utils import subprocess
X_VEC = numpy.array(transform.X_AXIS)
Z_VEC = numpy.array(transform.Z_AXIS)
RESERVED_JAGUAR_KEYWORDS = {'ldips': 5, 'icfit': 1, 'fukui': 1}
MomentData = namedtuple('MomentData', ['flag', 'components', 'header', 'units'])
# yapf: disable
MOMENTS_DATA = [
    MomentData(flag='-dipolecomp', components=('X', 'Y', 'Z'), header='dipole',
        units='Debye'),
    MomentData(flag='-quadrupole', components=('XX', 'YY', 'ZZ', 'XY', 'XZ', 'YZ'),
        header='quadrupole', units='Debye * Ang.'),
    MomentData(flag='-trlessquad', components=('XX-YY', '2ZZ-XX-YY', 'XY', 'XZ', 'YZ'),
        header='traceless_quadrupole', units='Debye * Ang.'),
    MomentData(flag='-octupole', components=('XXX', 'YYY', 'ZZZ', 'XYY', 'XXY',
        'XXZ', 'XZZ', 'YZZ', 'YYZ', 'XYZ'), header='octupole', units='Debye * Ang.^2'),
    MomentData(flag='-trlessoct', components=('XXX', 'YYY', 'ZZZ', 'XYZ', 'XYY-XZZ',
        'XXY-YZZ', 'XXZ-YYZ'), header='traceless_octupole', units='Debye * Ang.^2'),
    MomentData(flag='-hexadecapole', components=('XXXX', 'YYYY', 'ZZZZ', 'XXXY',
        'XXXZ', 'YYYX', 'YYYZ', 'ZZZX', 'ZZZY', 'XXYY', 'XXZZ', 'YYZZ', 'XXYZ',
        'YYXZ', 'ZZXY'), header='hexadecapole', units='Debye * Ang.^3')
]
# yapf: enable
DescriptorUtility = namedtuple('DescriptorUtilitity', [
    'job_class', 'script_name', 'in_mae_flag', 'out_mae_flag', 'other_flags',
    'property_families', 'descriptor_header'
])
JAGUAR_PROP_FAMILY = '_j_'
SKIPPING_MSG = 'Skipping the descriptors for all structures.'
INPUT_OUTPUT_ERROR = ('The number of structures in the input and output '
                      'files for {0} is not the same. ') + SKIPPING_MSG
# the moldescriptors utility necessarily runs under JobControl even when
# using -NOJOBID therefore use JobDJ as opposed to subprocess.check_call
# and specify all job class types
LIGFILTER_DU = DescriptorUtility(job_class=jobutils.RobustSubmissionJob,
                                 script_name='ligfilter',
                                 in_mae_flag=None,
                                 out_mae_flag='-o',
                                 other_flags=['-add_descriptors'],
                                 property_families={'ligfilter'},
                                 descriptor_header='ligfilter')
CANVAS_DU = DescriptorUtility(
    job_class=jobutils.RobustSubmissionJob,
    script_name='canvasMolDescriptors',
    in_mae_flag='-imae',
    out_mae_flag='-omae',
    other_flags=['-All', '-JOB', 'canvas_mol_descriptors'],
    property_families={'canvas'},
    descriptor_header='canvas')
# the following also supports a qikprop option
MOLDESCRIPTORS_DU = DescriptorUtility(job_class=jobutils.RobustSubmissionJob,
                                      script_name='moldescriptors',
                                      in_mae_flag=None,
                                      out_mae_flag='-omae',
                                      other_flags=['-topo'],
                                      property_families={'desc', 'mopac', 'qp'},
                                      descriptor_header='moldescriptors')
[docs]def get_distance_cell(struct, cutoff):
    """
    Create an infrastructure Distance Cell. Struct MUST have the Chorus box
    properties.
    :type struct: `schrodinger.structure.Structure`
    :param struct: Input structure
    :type cutoff: float
    :param cutoff: The cutoff for finding nearest neighbor atoms
    :rtype: `schrodinger.structure.Structure`, ,
        `schrodinger.infra.structure.DistanceCell`,
        `schrodinger.infra.structure.PBC`
    :return: Supercell, an infrastructure Distance Cell that accounts for the
        PBC, and the pbc used to create it.
    :raise: ValueError if struct is missing PBCs
    """
    xtal_kwargs = {
        'bonding': xtal.ParserWrapper.CHOICE_OFF,
        'bond_orders': xtal.ParserWrapper.CHOICE_OFF,
        'translate': xtal.ParserWrapper.CHOICE_OFF,
    }
    # Get params from chorus
    params = xtal.get_params_from_chorus(xtal.get_chorus_properties(struct))
    min_lattice = min(params[:3])
    if cutoff > min_lattice:
        # round up to the closest integer
        extents = [int(numpy.ceil(cutoff / min_lattice))] * 3
        supercell = xtal.get_cell(struct.copy(),
                                  extents=extents,
                                  xtal_kwargs=xtal_kwargs)
    else:
        supercell = struct.copy()
    pbc = cstruct.create_pbc(supercell)
    cell = infrastructure.DistanceCell(supercell, cutoff, pbc)
    return supercell, cell, pbc 
[docs]def elemental_generator(struct, element, is_equal=True):
    comparison = operator.eq if is_equal else operator.ne
    return (x for x in struct.atom if comparison(x.element, element)) 
[docs]def get_anion(struct):
    """
    Get the most electronegative element in the structure (anion).
    :type struct: `schrodinger.structure.Structure`
    :param struct: Input structure
    :rtype: str, float, int
    :return: Element, it's electronegativity, number of anions in the cell
    """
    eneg = -numpy.inf
    anion = None
    elements_dict = defaultdict(int)
    for atom in struct.atom:
        element_eneg = ElementalProperties(atom.element).eneg
        elements_dict[atom.element] += 1
        if element_eneg > eneg:
            anion = atom.element
            eneg = element_eneg
    return anion, eneg, elements_dict[atom.element] 
[docs]class LatticeFeatures(BaseFeaturizer):
    """
    Class to generate lattice-based features.
    """
    # These features are defined in the SI of 10.1039/c6ee02697d
    # Sendek et al, Energy Environ. Sci., 2017,10, 306-320
    FEATURES = {'avgAtomicVol': 'Average atomic volume', # S 1.1
        'avgNeighborCount': 'Average neighbor count', # S 1.5
        'stdNeighborCount': 'Standard deviation of neighbor count', # S 1.2
        'avgSublatticeNeighborCount': 'Average sublattice neighbor count', # S 1.8
        'avgNeighborIon': 'Average neighbor ionicity', # S 1.4
        'stdNeighborIon': 'Standard deviation of neighbor ionicity', # S 1.3
        'avgSublatticeNeighborIon': 'Average sublattice neighbor ionicity', # S 1.7
        'volPerAnion': 'Volume per anion', # S 1.11
        'avgSublatticeEneg': 'Average sublattice electronegativity', # S 1.14
        'packingFraction': 'Crystal packing fraction', # S 1.16
        'avgElementNeighborCount': 'Average cation count', # S 1.6
        'avgAnionAnionShortDistance': 'Average anion anion shortest distance', # S 1.10
        'avgElementAnionShortDistance': 'Average cation anion shortest distance', # S 1.12
        'avgShortDistance': 'Average cation cation shortest distance', # S 1.13
        'anionFrameCoordination': 'Anion frame coordination', # S 1.9
        'sublatticePackingFraction': 'Sublattice packing fraction', # S.15
        'pathWidth': 'Average straight-line path width', # S 1.17
        'pathWidthEneg': 'Average straight-line path electronegativity', # S 1.18
        'ratioIonicity': 'Ratio of average cation to sublattice electronegativity', # S 1.19
        'ratioCount': 'Ratio of average cation to sublattice count', # S 1.20
        } # yapf: disable
[docs]    def __init__(self, features, element='Li', cutoff=4.0):
        """
        Initialize the object.
        """
        # Note that all the passed kwargs must be assigned to self.var !!
        # This is a requirement of sklearn.
        self.element = element
        self.cutoff = cutoff
        self.features = list(features)
        # Ensure that all of the features are known
        assert not set(self.features).difference(self.FEATURES) 
[docs]    def runFeature(self, feature):
        """
        Get result from a feature.
        :type feature: str
        :param: feature: One of the features listed in FEATURES.
        :rtype: int or float
        :return: Feature value
        """
        assert feature in self.FEATURES
        return getattr(self, feature)() 
[docs]    def avgAtomicVol(self):
        """
        Get average atomic volume.
        :type struct: `schrodinger.structure.Structure`
        :param struct: Structure to be used for feature calculation
        :rtype: float
        :return: Average atomic volume (A^3)
        """
        vecs = xtal.get_vectors_from_chorus(self.struct)
        vol = xtal.get_volume_from_vecs(vecs)
        return vol / float(self.struct.atom_total) 
[docs]    def avgNeighborCount(self):
        """
        Get average neighbor count.
        :rtype: float
        :return: Average neighbor count
        """
        natoms_type = 0
        neighbors = 0
        for iatom in elemental_generator(self.struct, self.element):
            natoms_type += 1
            # -1 since query_atoms matches also contains atom itself
            neighbors += len(self.sdcell.query_atoms(*iatom.xyz)) - 1
        if natoms_type == 0:
            return 0.
        return neighbors / float(natoms_type) 
[docs]    def stdNeighborCount(self):
        """
        Get standard deviation of neighbor count.
        :rtype: float
        :return: Average neighbor count
        """
        neighbors = []
        for iatom in elemental_generator(self.struct, self.element):
            # -1 since query_atoms matches also contains atom itself
            neighbors.append(len(self.sdcell.query_atoms(*iatom.xyz)) - 1)
        if len(neighbors) in [0, 1]:
            return 0.
        return numpy.std(neighbors, ddof=1) 
[docs]    def avgSublatticeEneg(self):
        """
        Get average sublattice electronegativity.
        :rtype: float
        :return: Average sublattice electronegativity
        """
        enegs = []
        for iatom in elemental_generator(self.struct,
                                         self.element,
                                         is_equal=False):
            enegs.append(ElementalProperties(iatom.element).eneg)
        if not enegs:
            return 0.
        return sum(enegs) / float(len(enegs)) 
[docs]    def avgSublatticeNeighborCount(self):
        """
        Get average sublattice neighbor count.
        :rtype: float
        :return: Average sublattice neighbor count
        """
        natoms_type = 0
        neighbors = 0
        for iatom in elemental_generator(self.scell,
                                         self.element,
                                         is_equal=False):
            natoms_type += 1
            # -1 since query_atoms matches also contains atom itself
            neighbors += len(self.sdcell.query_atoms(*iatom.xyz)) - 1
        if natoms_type == 0:
            return 0.
        return neighbors / float(natoms_type) 
[docs]    def avgNeighborIon(self):
        """
        Get average neighbor ionicity.
        :rtype: float
        :return: Average neighbor ionicity
        """
        element_eneg = ElementalProperties(self.element).eneg
        nbonds = 0
        avg_eneg = 0.
        for iatom in elemental_generator(self.scell, self.element):
            for jatom in self.sdcell.query_atoms(*iatom.xyz):
                if iatom.index == jatom.getIndex():
                    continue
                nbonds += 1
                eneg = ElementalProperties(
                    self.scell.atom[jatom.getIndex()].element).eneg
                avg_eneg += abs(element_eneg - eneg)
        if nbonds == 0:
            return 0.
        return avg_eneg / float(nbonds) 
[docs]    def stdNeighborIon(self):
        """
        Get standard deviation of neighbor ionicity.
        :rtype: float
        :return: Average neighbor ionicity
        """
        element_eneg = ElementalProperties(self.element).eneg
        enegs = []
        for iatom in elemental_generator(self.scell, self.element):
            for jatom in self.sdcell.query_atoms(*iatom.xyz):
                if iatom.index == jatom.getIndex():
                    continue
                eneg = ElementalProperties(
                    self.scell.atom[jatom.getIndex()].element).eneg
                enegs.append(abs(element_eneg - eneg))
        if len(enegs) in [0, 1]:
            return 0.
        return numpy.std(enegs, ddof=1) 
[docs]    def avgSublatticeNeighborIon(self):
        """
        Get average sublattice neighbor ionicity.
        :rtype: float
        :return: Average sublattice neighbor count
        """
        nbonds = 0
        avg_eneg = 0.
        for iatom in elemental_generator(self.scell,
                                         self.element,
                                         is_equal=False):
            element_eneg = ElementalProperties(iatom.element).eneg
            for jatom in self.sdcell.query_atoms(*iatom.xyz):
                if iatom.index == jatom.getIndex():
                    continue
                nbonds += 1
                eneg = ElementalProperties(
                    self.scell.atom[jatom.getIndex()].element).eneg
                avg_eneg += abs(element_eneg - eneg)
        if nbonds == 0:
            return 0.
        return avg_eneg / float(nbonds) 
[docs]    def volPerAnion(self):
        """
        Get volume per anion.
        :rtype: float
        :return: Volume per anion
        """
        anion, eneg, nanions = get_anion(self.struct)
        vecs = xtal.get_vectors_from_chorus(self.struct)
        vol = xtal.get_volume_from_vecs(vecs)
        return vol / float(nanions) 
[docs]    def packingFraction(self, skip_element=None):
        """
        Get packing fraction of the crystal.
        :type skip_element: str
        :param skip_element: Element to skip
        :rtype: float
        :return: Packing fraction
        """
        total_radius = 0.
        for atom in self.struct.atom:
            if atom.element == skip_element:
                continue
            eff_radius = self.effectiveRadius(atom)
            if eff_radius == 0:
                return 0.
            total_radius += eff_radius**3
        vecs = xtal.get_vectors_from_chorus(self.struct)
        vol = xtal.get_volume_from_vecs(vecs)
        return 4 * numpy.pi * total_radius / (3 * vol) 
[docs]    def effectiveRadius(self, atom):
        """
        Get atom effective radius.
        :type atom: schrodinger.structure._StructureAtom
        :param atom: Atom
        :rtype: float
        :return: Effective radius
        """
        properties = ElementalProperties(atom.element)
        eneg = properties.eneg
        try:
            cov_radius = properties.cradius
            ion_radius = properties.iradius
        except (ValueError, KeyError) as err:
            return 0.
        enegs = []
        for jatom in self.sdcell.query_atoms(*atom.xyz):
            if atom.index == jatom.getIndex():
                continue
            jelem = self.scell.atom[jatom.getIndex()].element
            jeneg = ElementalProperties(jelem).eneg
            enegs.append(abs(eneg - jeneg))
        if not enegs:
            return 0.
        bond_ionicity = numpy.average(enegs)
        return cov_radius if bond_ionicity < 2 else ion_radius 
[docs]    def sublatticePackingFraction(self):
        """
        Get packing fraction of the sublattice crystal.
        :rtype: float
        :return: Packing fraction
        """
        return self.packingFraction(skip_element=self.element) 
[docs]    def avgElementNeighborCount(self):
        """
        Get average element neighbor count.
        :rtype: float
        :return: Average number of bonds per element
        """
        natoms_type = 0
        neighbors = 0
        for iatom in elemental_generator(self.struct, self.element):
            natoms_type += 1
            matches = self.sdcell.query_atoms(*iatom.xyz)
            # -1 since query_atoms matches also contains atom itself
            neighbors += len([
                x for x in matches
                if self.scell.atom[x.getIndex()].element == self.element
            ]) - 1
        if natoms_type == 0:
            return 0.
        return neighbors / float(natoms_type) 
[docs]    def avgAnionAnionShortDistance(self):
        """
        Get average anion anion shortest distance.
        :rtype: float
        :return: Average anion anion shortest distance
        """
        anion, eneg, nanions = get_anion(self.struct)
        distances = []
        for iatom in elemental_generator(self.struct, anion):
            min_dist = numpy.inf
            for jatom in elemental_generator(self.scell, anion):
                if iatom.index == jatom.index:
                    continue
                distance = self.spbc.getDistance(self.struct, iatom, self.scell,
                                                 jatom)
                min_dist = min(min_dist, distance)
            if min_dist == numpy.inf:
                continue
            distances.append(min_dist)
        if not distances:
            return 0.
        return numpy.average(distances) 
[docs]    def avgElementAnionShortDistance(self):
        """
        Get average element anion shortest distance.
        :rtype: float
        :return: Average element anion shortest distance
        """
        anion, eneg, nanions = get_anion(self.struct)
        distances = []
        for iatom in elemental_generator(self.struct, self.element):
            min_dist = numpy.inf
            for jatom in elemental_generator(self.scell, anion):
                if iatom.index == jatom.index:
                    continue
                distance = self.spbc.getDistance(self.struct, iatom, self.scell,
                                                 jatom)
                min_dist = min(min_dist, distance)
            if min_dist == numpy.inf:
                continue
            distances.append(min_dist)
        if not distances:
            return 0.
        return numpy.average(distances) 
[docs]    def avgShortDistance(self):
        """
        Get average element element shortest distance.
        :rtype: float
        :return: Average element element shortest distance
        """
        distances = []
        for iatom in elemental_generator(self.struct, self.element):
            min_dist = numpy.inf
            for jatom in elemental_generator(self.scell, self.element):
                if iatom.index == jatom.index:
                    continue
                distance = self.spbc.getDistance(self.struct, iatom, self.scell,
                                                 jatom)
                min_dist = min(min_dist, distance)
            if min_dist == numpy.inf:
                continue
            distances.append(min_dist)
        if not distances:
            return 0.
        return numpy.average(distances) 
[docs]    def anionFrameCoordination(self):
        """
        Get anion framework coordination.
        :rtype: float
        :return: Anion framework coordination
        """
        anion, eneg, nanions = get_anion(self.struct)
        afc = 0.
        nanions = 0
        for iatom in elemental_generator(self.struct, anion):
            min_dist = numpy.inf
            for jatom in elemental_generator(self.scell, anion):
                if iatom.index == jatom.index:
                    continue
                distance = self.spbc.getDistance(self.struct, iatom, self.scell,
                                                 jatom)
                min_dist = min(min_dist, distance)
            if min_dist == numpy.inf:
                continue
            cutoff = min_dist + 1.0
            supercell, cell, pbc = get_distance_cell(self.struct, cutoff)
            # -1 since query_atoms matches also contains atom itself
            afc += len([
                x for x in cell.query_atoms(*iatom.xyz)
                if supercell.atom[x.getIndex()].element == anion
            ]) - 1
            nanions += 1
        if nanions == 0:
            return 0.
        return afc / float(nanions) 
[docs]    def pathWidth(self, eval_eneg=False):
        """
        Evaluate average straight line path width. See the reference in the
        constructor for more info.
        :type eval_eneg: bool
        :param eval_eneg: If True, return average over electronegativity,
            instead of distance
        :rtype: float
        :return: Average path or electronegativity
        """
        path_widths = []
        path_enegs = []
        for iatom in elemental_generator(self.scell, self.element):
            closest_distance = numpy.inf
            closest_atom = None
            for jatom in elemental_generator(self.scell, self.element):
                if jatom.index == iatom.index:
                    continue
                distance = measure_distance(iatom, jatom)
                if distance < closest_distance:
                    closest_distance = distance
                    closest_atom = jatom
            if not closest_atom:
                continue
            path_width = numpy.inf
            path_atom = None
            for atom in self.scell.atom:
                if atom in [iatom, closest_atom]:
                    continue
                # The goal is to get side2 of a triangle: iatom, atom and the
                # normal to the iatom - closest_atom line
                #            atom
                #           /  |
                #          /   | side2
                #         /    |
                #       iatom <--- closest_distance  ---> closest_atom
                angle1 = measure_bond_angle(closest_atom, iatom, atom)
                if not 0. < angle1 < 90.:
                    continue
                angle2 = measure_bond_angle(iatom, closest_atom, atom)
                if not 0. < angle2 < 90.:
                    continue
                hypotenuse = measure_distance(iatom, atom)
                side2 = abs(numpy.cos(numpy.radians(angle1))) * hypotenuse
                eff_radius = self.effectiveRadius(atom)
                if eff_radius == 0.:
                    return 0.
                side2 = side2 - eff_radius
                if 0 < side2 < path_width:
                    path_width = side2
                    path_atom = atom
            if not path_atom:
                continue
            path_widths.append(path_width)
            path_enegs.append(ElementalProperties(path_atom.element).eneg)
        if not path_widths:
            return 0.
        return (numpy.average(path_enegs)
                if eval_eneg else numpy.average(path_widths)) 
[docs]    def pathWidthEneg(self):
        """
        Evaluate average straight line path electronegativity.
        :rtype: float
        :return: Average electronegativity along the path
        """
        return self.pathWidth(eval_eneg=True) 
[docs]    def ratioIonicity(self):
        """
        Get ratio ionicity.
        :rtype: float
        :return: Ratio ionicity
        """
        sublattice = self.avgSublatticeNeighborIon()
        if sublattice == 0.:
            return 0.
        return self.avgNeighborIon() / sublattice 
[docs]    def ratioCount(self):
        """
        Get ratio neighbor count.
        :rtype: float
        :return: Ratio neighbor count
        """
        sublattice = self.avgSublatticeNeighborCount()
        if sublattice == 0.:
            return 0.
        return self.avgNeighborCount() / sublattice  
[docs]class Ligand(object):
    """
    Manage a ligand.
    """
[docs]    def __init__(self, st, metal_atom, new_to_old, coordination_idxs):
        """
        Create an instance.
        :type st: `schrodinger.structure.Structure`
        :param st: the structure
        :type metal_atom: `schrodinger.structure._StructureAtom`
        :param metal_atom: the metal atom
        :type new_to_old: dict
        :param new_to_old: the map of new indices (extracted ligand)
            to old indices (original structure)
        :type coordination_idxs: list
        :param coordination_idxs: contains groups of indicies (new indices)
            of coordinating atoms
        """
        self.st = st
        self.metal_atom = metal_atom
        self.new_to_old = new_to_old
        self.coordination_idxs = coordination_idxs 
[docs]    def getVec(self, point):
        """
        Return a vector pointing from the metal atom
        to the given point.
        :type point: `numpy.array`
        :param point: the point in Ang.
        :rtype: `numpy.array`
        :return: the vector in Ang.
        """
        return point - numpy.array(self.metal_atom.xyz) 
[docs]    def getCentroid(self, st, idxs):
        """
        Return the centroid vector of the given coordination atom indices.
        :type st: `schrodinger.structure.Structure`
        :param st: the structure
        :type idxs: list
        :param idxs: the coordination indices
        :rtype: `numpy.array`
        :return: the centroid vector in Ang.
        """
        return transform.get_centroid(st, atom_list=idxs)[:3] 
[docs]    def getCoordinationVec(self, st, idxs):
        """
        Return a coordination vector pointing from the metal atom
        to the centroid of the given coordination atom indices.
        :type st: `schrodinger.structure.Structure`
        :param st: the structure
        :type idxs: list
        :param idxs: the coordination indices
        :rtype: `numpy.array`
        :return: the coordination vector in Ang.
        """
        return self.getVec(self.getCentroid(st, idxs)) 
[docs]    def getStoichiometry(self):
        """
        Return the stoichiometry.
        :rtype: str
        :return: the stoichiometry
        """
        return analyze.generate_molecular_formula(self.st) 
[docs]    def getDenticity(self):
        """
        Return the denticity.
        :rtype: int
        :return: the denticity
        """
        return len(self.coordination_idxs) 
[docs]    def getHapticity(self):
        """
        Return the hapticity.
        :rtype: int
        :return: the hapticity
        """
        return max(
            len(coordination_idxs)
            for coordination_idxs in self.coordination_idxs) 
[docs]    def getHapticCharacter(self):
        """
        Return the haptic character.
        :rtype: int
        :return: the haptic character
        """
        characters = []
        for coordination_idxs in self.coordination_idxs:
            bond_lengths = []
            for coordination_idx in coordination_idxs:
                coordination_atom = self.st.atom[coordination_idx]
                bond_length = transform.get_vector_magnitude(
                    self.getVec(numpy.array(coordination_atom.xyz)))
                bond_lengths.append(bond_length)
            min_bond_length = min(bond_lengths)
            character = sum(1 for bond_length in bond_lengths
                            if abs(bond_length - min_bond_length) <= 0.05)
            characters.append(character)
        return max(characters) 
[docs]    def getBiteAngle(self):
        """
        Return the bite angle in degrees.
        :rtype: float or None
        :return: the bite angle in degrees
        """
        # follows from https://en.wikipedia.org/wiki/Bite_angle
        if self.getDenticity() == 1:
            return
        bite_angles = []
        for pair in itertools.combinations(self.coordination_idxs, 2):
            vec_idxs, vec_jdxs = map(
                lambda x: self.getCoordinationVec(self.st, x), pair)
            bite_angle = transform.get_angle_between_vectors(vec_idxs, vec_jdxs)
            bite_angles.append(bite_angle)
        return math.degrees(numpy.mean(bite_angles)) 
[docs]    def getAtomConeAngle(self, atom):
        """
        Return the cone angle for the given atom in degrees.
        :type atom: `schrodinger.structure._StructureAtom`
        :param atom: the atom
        :rtype: float
        :return: the cone angle for the given atom in degrees
        """
        vec = numpy.array(atom.xyz)
        proj_z = numpy.dot(vec, Z_VEC) * Z_VEC
        proj_xy = vec - proj_z
        if transform.get_vector_magnitude(proj_xy) < 1E-3:
            scaled_proj_xy = atom.radius * X_VEC
        else:
            scaled_proj_xy = proj_xy + atom.radius * transform.get_normalized_vector(
                proj_xy)
        scaled_vec = proj_z + scaled_proj_xy
        return 2 * math.degrees(
            transform.get_angle_between_vectors(scaled_vec, Z_VEC)) 
[docs]    def getConeAngle(self):
        """
        Return the cone angle in degrees.
        :rtype: float
        :return: the cone angle in degrees
        """
        # follows from https://en.wikipedia.org/wiki/Ligand_cone_angle, however
        # the convention for reporting cone angles is not to use 3D solid angles
        # in unitless steradians but rather simply 2D angles in degrees (twice
        # the angle to the cone's central axis)
        # the following cone angles in units of degrees are between maximally
        # subtending ligand branch atoms and the z-axis
        cone_angles = []
        # loop over groups of coordinating atoms that are bonded to one another,
        # for example where ligand hapticity > 1
        for coordination_idxs in self.coordination_idxs:
            # rotate the ligand so that the metal-to-centroid vector is along
            # the z-axis and translate so that the metal atom would be at the
            # origin
            coordination_vec = self.getCoordinationVec(self.st,
                                                       coordination_idxs)
            rot_matrix = transform.get_alignment_matrix(coordination_vec, Z_VEC)
            st = self.st.copy()
            transform.transform_structure(st, rot_matrix)
            dx, dy, dz = map(lambda x: -1 * x,
                             self.getCentroid(st, coordination_idxs))
            dz += transform.get_vector_magnitude(coordination_vec)
            transform.translate_structure(st, x=dx, y=dy, z=dz)
            # loop over coordinating atoms in the current coordinating group
            for coordination_idx in coordination_idxs:
                coordination_atom = st.atom[coordination_idx]
                # if this entire ligand is a single atom then make an arbitrary
                # vector and obtain the single angle
                if not coordination_atom.bond_total:
                    cone_angles.append(self.getAtomConeAngle(coordination_atom))
                # otherwise loop over ligand branches eminating from the current
                # coordinating atom and collect angles
                for branch_start_atom in coordination_atom.bonded_atoms:
                    if branch_start_atom.index in coordination_idxs:
                        continue
                    branch_idxs = rxn_channel.indicies_from_bonds_deep(
                        st,
                        branch_start_atom.index,
                        exclude_atom_indicies=[coordination_idx])
                    # FIXME - it is not clear from the documentation how to handle
                    # a branch containing sub-branches that both do and do not
                    # coordinate (see the following classification)
                    # loop over other groups of coordinating atoms, if the current
                    # branch coordinates somewhere in addition to the coordination
                    # from which it eminates then we have a multidentate ligand
                    # which needs special angle handling
                    branch_coordinates = False
                    for other_coordination_idxs in self.coordination_idxs:
                        if other_coordination_idxs == coordination_idxs:
                            continue
                        if set(other_coordination_idxs).issubset(
                                set(branch_idxs)):
                            branch_coordinates = True
                            pair = [coordination_idxs, other_coordination_idxs]
                            vec_idxs, vec_jdxs = map(
                                lambda x: self.getCoordinationVec(self.st, x),
                                pair)
                            bite_angle = math.degrees(
                                transform.get_angle_between_vectors(
                                    vec_idxs, vec_jdxs))
                            cone_angles.append(bite_angle)
                    # if the branch has no other coordination then loop over
                    # branch atoms, find the atom that maximally subtends, and
                    # collect the angle
                    if not branch_coordinates:
                        max_cone_angle = 0
                        for branch_idx in branch_idxs:
                            branch_atom = st.atom[branch_idx]
                            cone_angle = self.getAtomConeAngle(branch_atom)
                            if cone_angle > max_cone_angle:
                                max_cone_angle = cone_angle
                        cone_angles.append(max_cone_angle)
        return numpy.mean(cone_angles) 
[docs]    def getBondLength(self):
        """
        Return the bond length in Ang.
        :rtype: float
        :return: the bond length in Ang.
        """
        return numpy.mean([
            transform.get_vector_magnitude(self.getCoordinationVec(self.st, x))
            for x in self.coordination_idxs
        ]) 
[docs]    def getDescriptors(self):
        """
        Return descriptors.
        :rtype: dict
        :return: (label, data) pairs
        """
        adict = {}
        adict['stoichiometry'] = self.getStoichiometry()
        adict['denticity'] = self.getDenticity()
        adict['hapticity'] = self.getHapticity()
        adict['haptic_character'] = self.getHapticCharacter()
        bite_angle = self.getBiteAngle()
        if bite_angle is not None:
            adict['bite_angle/deg.'] = bite_angle
        adict['cone_angle/deg.'] = self.getConeAngle()
        adict['bond_length/Ang.'] = self.getBondLength()
        return adict  
[docs]class Complex(object):
    """
    Manage a complex.
    """
    BURIED_VOLUME_VDW_SCALE = 1.17
    CONTOURS_DIR = 'contours'
[docs]    def __init__(self, st, logger=None, nonmetallic_centers=()):
        """
        Create an instance.
        :type st: `schrodinger.structure.Structure`
        :param st: the structure
        :type logger: logging.Logger or None
        :param logger: output logger or None if there isn't one
        :param tuple nonmetallic_centers: Tuple of nonmetallic elements to also
            consider when looking for center atom
        """
        self.st = st
        self.logger = logger
        self.nonmetallic_centers = nonmetallic_centers
        self.metal_atom = None
        self.ligands = [] 
[docs]    def setLigands(self):
        """
        Set the ligands.
        """
        self.setMetalAtom()
        if not self.metal_atom:
            return []
        all_coordination_idxs = [
            atom.index for atom in self.metal_atom.bonded_atoms
        ]
        st = self.st.copy()
        for idx in all_coordination_idxs:
            st.deleteBond(self.metal_atom.index, idx)
        self.ligands = []
        for molecule in st.molecule:
            all_idxs = molecule.getAtomIndices()
            ligand = molecule.extractStructure()
            new_to_old = dict(zip(range(1, ligand.atom_total + 1), all_idxs))
            old_to_new = dict([(v, k) for k, v in new_to_old.items()])
            coordination_idxs = [
                old_to_new[idx]
                for idx in all_idxs
                if idx in all_coordination_idxs
            ]
            if coordination_idxs:
                coordination_idxs = analyze.group_by_connectivity(
                    ligand, coordination_idxs)
                self.ligands.append(
                    Ligand(ligand, self.metal_atom, new_to_old,
                           coordination_idxs)) 
[docs]    def getBondAngle(self):
        """
        Return the bond angle in degrees.
        :rtype: float
        :return: the bond angle in degrees
        """
        bond_vectors = []
        for ligand in self.ligands:
            for coordination_idxs in ligand.coordination_idxs:
                bond_vector = ligand.getCoordinationVec(ligand.st,
                                                        coordination_idxs)
                bond_vectors.append(bond_vector)
        bond_angles = [
            transform.get_angle_between_vectors(vec_i, vec_j)
            for vec_i, vec_j in itertools.combinations(bond_vectors, 2)
        ]
        return math.degrees(numpy.mean(bond_angles)) 
[docs]    def getVDWSurfaceArea(self):
        """
        Return the VDW surface area in Angstrom^2.
        :rtype: float
        :return: the VDW surface area in Angstrom^2
        """
        name = 'surface'
        surf = surface.Surface.newMolecularSurface(
            self.st,
            name,
            resolution=0.5,
            mol_surf_type=surface.MolSurfType.vdw)
        return surf.surface_area 
[docs]    def getVDWVolume(self, vdw_scale=1, buffer_len=2):
        """
        Return the VDW volume in Angstrom^3.
        :type vdw_scale: float
        :param vdw_scale: the VDW scale
        :type buffer_len: float
        :param buffer_len: a shape buffer lengths in Angstrom
        :rtype: float
        :return: the VDW volume in Angstrom^3
        """
        return amorphous.Scaffold.approximateVolume(self.st,
                                                    vdw_scale=vdw_scale,
                                                    seed=123,
                                                    buffer_len=buffer_len) 
[docs]    def getStructureContainingLargestLigands(self):
        """
        Return a structure containing the largest ligand or multiple copies
        thereof if it is symmetric.
        :rtype: `schrodinger.structure.Structure`
        :return: the structure containing the largest ligand(s)
        """
        max_n_atoms = max(ligand.st.atom_total for ligand in self.ligands)
        idxs = [self.metal_atom.index]
        for ligand in self.ligands:
            if ligand.st.atom_total < max_n_atoms:
                idxs.extend(
                    ligand.new_to_old[atom.index] for atom in ligand.st.atom)
        st = self.st.copy()
        st.deleteAtoms(idxs)
        return st 
[docs]    def getBuriedVDWVolumePct(self, vdw_scale=BURIED_VOLUME_VDW_SCALE):
        """
        Return the buried VDW volume percent.
        :type vdw_scale: float
        :param vdw_scale: the VDW scale
        :rtype: float
        :return: the buried VDW volume percent
        """
        # the algorithm for buried VDW volume percent is defined
        # in ACS Catal. 2015, 6815 and Eur. J. Inorg. Chem. 2009, 1759,
        # however here we use a slightly different metal positioning
        # and a Monte Carlo algorithm
        st = self.getStructureContainingLargestLigands()
        volume, ratio = amorphous.Scaffold.approximateVolume(
            st,
            vdw_scale=vdw_scale,
            seed=123,
            sphere_radius=3.5,
            sphere_origin=numpy.array(self.metal_atom.xyz),
            buffer_len=0,
            return_ratio=True)
        return ratio 
[docs]    def getFreeVolumeVector(self):
        """
        Return a unit vector pointing from the metal atom of the complex
        in the direction of free volume.
        :rtype: numpy.array
        :return: the free volume unit vector
        """
        max_n_atoms = max(ligand.st.atom_total for ligand in self.ligands)
        bond_vectors = []
        for ligand in self.ligands:
            if ligand.st.atom_total < max_n_atoms:
                continue
            for coordination_idxs in ligand.coordination_idxs:
                bond_vector = ligand.getCoordinationVec(ligand.st,
                                                        coordination_idxs)
                bond_vectors.append(bond_vector)
        return -1 * transform.get_normalized_vector(sum(bond_vectors)) 
[docs]    def getRotatedComplex(self):
        """
        Return a copy of the complex that is rotated so that the free volume
        vector points along the positive z-axis.
        :rtype: `structure.Structure`
        :return: A rotated copy of the input structure
        """
        struct = self.st.copy()
        transform.translate_atom_to_origin(struct,
                                           struct.atom[self.metal_atom.index])
        vec = self.getFreeVolumeVector()
        # see MATSCI-11215 where symmetric TiCl4 results in a vector of
        # zero length, in this case do not rotate
        if transform.get_vector_magnitude(vec) > 0:
            rot_matrix = transform.get_alignment_matrix(vec, Z_VEC)
            transform.transform_structure(struct, rot_matrix)
        return struct 
[docs]    def exportBuriedVolumeContour(self,
                                  sphere_radius=3.5,
                                  vdw_scale=BURIED_VOLUME_VDW_SCALE,
                                  num_bins=30,
                                  seed=1234):
        """
        Export the buried volume contour for the complex
        :param float sphere_radius: The radius for the sphere to sample points in
        :param float vdw_scale: The VdW scale factor to apply to VdW radii when
            checking to see if a point is "inside" an atom
        :param int num_bins: The number of bins in x and y direction to put the
            points in
        :param int seed: Seed for random number generation
        :rtype: str, str
        :return: The paths to contour png and csv files
        """
        if seed is not None:
            my_random = random.Random()
            my_random.seed(seed)
        else:
            my_random = random
        struct = self.getRotatedComplex()
        max_rad = amorphous.get_maximum_vdw_radius(struct)
        max_rad *= vdw_scale
        d_cell, _ = cstruct.create_distance_cell(struct, max_rad)
        bins = defaultdict(dict)
        bin_edges = numpy.linspace(-sphere_radius,
                                   sphere_radius,
                                   num=num_bins + 1,
                                   endpoint=True)
        num_tries = max(3000 * struct.atom_total, 200000)
        for _ in range(num_tries):
            vec = [my_random.gauss(0, 1) for _ in range(3)]
            coords = (sphere_radius * my_random.uniform(0, 1)**
                      (1. / 3.)) * transform.get_normalized_vector(vec)
            neighbor_atoms = [
                struct.atom[match.getIndex()]
                for match in d_cell.query_atoms(*coords)
            ]
            for atom in neighbor_atoms:
                dist = transform.get_vector_magnitude(atom.xyz - coords)
                if dist <= atom.radius * vdw_scale:
                    # idx is 1 based. 0 means less than first bin edge
                    x_idx = numpy.digitize(coords[0], bin_edges)
                    y_idx = numpy.digitize(coords[1], bin_edges)
                    # Store the point with the highest Z value in each bin
                    if (y_idx not in bins[x_idx] or
                            bins[x_idx][y_idx][2] < coords[2]):
                        bins[x_idx][y_idx] = coords
                    break
        contour_points = []
        for inner_dict in bins.values():
            for point in inner_dict.values():
                contour_points.append(point)
        contour_points = numpy.array(contour_points)
        self.plotContour(contour_points)
        os.makedirs(self.CONTOURS_DIR, exist_ok=True)
        title = getattr(self.st, 'unique_title', None) or self.st.title
        png_path = os.path.join(self.CONTOURS_DIR, f'{title}.png')
        self.canvas.print_figure(png_path)
        jobutils.add_outfile_to_backend(png_path)
        csv_path = os.path.join(self.CONTOURS_DIR, f'{title}.csv')
        numpy.savetxt(csv_path, contour_points, header='X, Y, Z', delimiter=',')
        jobutils.add_outfile_to_backend(csv_path)
        return png_path, csv_path 
[docs]    def plotContour(self, points):
        """
        Plot a contour for the passed points. matplotlib uses triangulation to
        create a grid for the contour.
        :param `numpy.array` points: The x, y, z values of points
        """
        from matplotlib import figure
        from matplotlib.backends.backend_agg import FigureCanvasAgg
        fig = figure.Figure(figsize=(10, 7.5), dpi=200)
        self.canvas = FigureCanvasAgg(fig)
        axes = fig.add_subplot(111)
        fig.subplots_adjust(left=0.05, right=0.95)
        cmap = matplotlib.cm.get_cmap('jet')
        contour = axes.tricontourf(points[:, 0],
                                   points[:, 1],
                                   points[:, 2],
                                   15,
                                   cmap=cmap)
        fig.colorbar(contour)
        self.canvas.draw() 
[docs]    def getVectorizedDescriptors(self, jaguar_out_file):
        """
        Return vectorized descriptors which are instance specific
        descriptors that are vectorized, for example depending on
        the molecule's geometric orientation, atom indexing, etc.
        :type jaguar_out_file: str or None
        :param jaguar_out_file: the name of a Jaguar `*.out` file from
            which descriptors will be extracted or None if there isn't
            one
        :rtype: dict
        :return: (label, data) pairs
        """
        # these properties are instance dependent and should be reduced
        # to a single hash (potentially non-physical), that can be used to
        # quantify the differences between different molecules, prior to
        # use in building machine learning models
        adict = {}
        if jaguar_out_file is None:
            return adict
        base_cmd = ['jaguar', 'results', jaguar_out_file]
        for moment_data in MOMENTS_DATA:
            cmd = base_cmd + [moment_data.flag]
            try:
                output = subprocess.check_output(cmd)
            except subprocess.CalledProcessError as err:
                if self.logger:
                    self.logger.warning(f'The following command fails: {cmd}.')
                    self.logger.warning(str(err))
                    self.logger.info('')
                continue
            output = output.strip()
            # if you query non-existing data then you get 'N/A'
            if not output or output == 'N/A':
                continue
            try:
                output = list(map(float, output.split()))
            except ValueError:
                continue
            for component, value in zip(moment_data.components, output):
                key = moment_data.header + '_' + component + '/' + moment_data.units
                adict[key] = value
        return adict 
[docs]    def getDescriptors(self, no_organometallic=False):
        """
        Return descriptors.
        :param bool no_organometallic: Whether organometallic descriptors should
            be skipped
        :rtype: dict
        :return: (label, data) pairs
        """
        adict = {}
        adict['vdw_surface_area/Ang.^2'] = self.getVDWSurfaceArea()
        adict['vdw_volume/Ang.^3'] = self.getVDWVolume()
        if no_organometallic:
            return adict
        self.setLigands()
        if self.ligands:
            for idx, ligand in enumerate(self.ligands, 1):
                base_label = f'ligand_{idx}_'
                for label, datum in ligand.getDescriptors().items():
                    adict[base_label + label] = datum
            adict['bond_angle/deg.'] = self.getBondAngle()
            adict['buried_volume_%'] = self.getBuriedVDWVolumePct()
            png_path, csv_path = self.exportBuriedVolumeContour()
            adict['buried_volume_contour'] = png_path
            adict['buried_volume_csv'] = csv_path
        return adict  
[docs]def get_unique_titles(sts):
    """
    Return a list of unique titles for the given structures.
    :type sts: list
    :param sts: contains `schrodinger.structure.Structure`
    :rtype: list
    :return: the unique titles
    """
    cleaner = jobutils.StringCleaner(separator='.')
    titles = []
    for st in sts:
        title = st.title.strip() or 'structure'
        titles.append(cleaner.cleanAndUniquify(title))
    return titles 
[docs]class ComplexFeatures(BaseFeaturizer):
    """
    Class to generate features for metal complexes.
    """
[docs]    def __init__(self,
                 jaguar=False,
                 jaguar_keywords=rxnwfu.DEFAULT_JAGUAR_KEYWORDS,
                 tpp=rxnwfu.DEFAULT_TPP,
                 ligfilter=False,
                 no_organometallic=False,
                 nonmetallic_centers=(),
                 canvas=False,
                 moldescriptors=False,
                 include_vectorized=False,
                 save_files=False,
                 logger=None):
        """
        Create an instance.
        :type jaguar: bool
        :param jaguar: specify whether to calculate Jaguar features
        :type jaguar_keywords: OrderedDict
        :param jaguar_keywords: if Jaguar jobs must be run to calculate
            the Jaguar features then specify the Jaguar keywords here
        :type tpp: int
        :param tpp: the number of threads for any Jaguar jobs
        :type ligfilter: bool
        :param ligfilter: specify whether to calculate Ligfilter features
        :type no_organometallic: bool
        :param no_organometallic: Whether organometallic descriptors should be
            skipped
        :type canvas: bool
        :param canvas: specify whether to calculate Canvas features
        :type moldescriptors: bool or list
        :param moldescriptors: specify whether to calculate Molecular Descriptors
            features. If it's a list, it contains command line arguments for
            moldescriptors
        :type include_vectorized: bool
        :param include_vectorized: whether to include instance
            specific features that are vectorized, for example depending on
            the molecule's geometric orientation, atom indexing, etc.
        :param bool save_files: Whether to save subjob files or not
        :type logger: logging.Logger or None
        :param logger: output logger or None if there isn't one
        """
        self.jaguar = jaguar
        self.jaguar_keywords = jaguar_keywords
        self.tpp = tpp
        self.ligfilter = ligfilter
        self.no_organometallic = no_organometallic
        self.nonmetallic_centers = nonmetallic_centers
        self.canvas = canvas
        self.moldescriptors = moldescriptors
        self.include_vectorized = include_vectorized
        self.save_files = save_files
        self.logger = logger 
[docs]    def runJaguar(self):
        """
        Run Jaguar on the given structures.
        :rtype: list
        :return: contains Jaguar `*.out` file names
        """
        options = argparse.Namespace(TPP=self.tpp)
        job_dj = jobutils.create_queue(options=options, host=jobutils.AUTOHOST)
        max_atoms = jaguarworkflows.get_jaguar_max_atoms()
        titles = []
        to_skip = set()
        for st in self.structs:
            title = st.unique_title
            titles.append(title)
            if st.atom_total > max_atoms:
                to_skip.add(title)
                continue
            # handle restarts
            if not jaguar_restart.needs_restart(title):
                continue
            _st = st.copy()
            kwargs = {'structure': _st}
            ji = JaguarInput(**kwargs)
            kwargs = self.jaguar_keywords.copy()
            kwargs.update(RESERVED_JAGUAR_KEYWORDS)
            ji.setValues(kwargs)
            ji.setStructure(_st)
            if ji.sectionDefined('valid'):
                ji.deleteSection('valid')
            in_file = title + '.in'
            ji.saveAs(in_file)
            cmd = ['jaguar', 'run', '-TPP', str(self.tpp), in_file]
            job_dj.addJob(jobutils.RobustSubmissionJob(cmd))
        if job_dj.total_added:
            self.log('Running Jaguar jobs...', pad=True)
            job_dj.run()
        out_files = []
        for title in titles:
            if title in to_skip:
                out_files.append(None)
                continue
            if self.save_files:
                jaguarworkflows.add_jaguar_files_to_jc_backend(title)
            out_file = title + '.out'
            if not os.path.exists(out_file):
                out_files.append(None)
                continue
            jag_out = JaguarOutput(out_file)
            if jag_out.status != JaguarOutput.OK or jag_out.fatal_error:
                out_files.append(None)
                continue
            out_files.append(out_file)
        return out_files 
[docs]    def getFeatures(self, structs, jaguar_out_files=None):
        """
        Return features dictionary for the given structures
        :type structs: list(`schrodinger.structure.Structure`)
        :param structs: list of structures to be featurized
        :type jaguar_out_files: list or None
        :param jaguar_out_files: if Jaguar features should be calculated
            using existing Jaguar `*.out` files then specify the files here
            using the same ordering as used for any given structures
        """
        self.structs = structs
        self.jaguar_out_files = jaguar_out_files
        self.struct_file = None
        # Set titles on structures
        titles = get_unique_titles(structs)
        for struct, unique_title in zip(structs, titles):
            struct.unique_title = unique_title
        if self.jaguar:
            self.verifyJaguarOutfiles()
        # An empty dict should be returned if no descriptors were calculated
        # for a structure. The order is the same as that of input structures
        all_descriptors = {title: {} for title in titles}
        descriptors_dicts = [
            self.getComplexDescriptors(),
            self.getJaguarDescriptors(),
            self.getUtilityDescriptors()
        ]
        # Merge all dictionaries
        for title in titles:
            for descriptors in descriptors_dicts:
                if title in descriptors:
                    all_descriptors[title].update(descriptors[title])
        return all_descriptors 
[docs]    def verifyJaguarOutfiles(self):
        """
        Run jaguar and get the out-files if the out-files have not been provided
        """
        if self.jaguar_out_files:
            assert len(self.jaguar_out_files) == len(self.structs)
            return
        max_atoms = jaguarworkflows.get_jaguar_max_atoms()
        to_skip = [st for st in self.structs if st.atom_total > max_atoms]
        all_skipped = len(to_skip) == len(self.structs)
        if to_skip:
            self.log('Jaguar descriptors for input structures with more '
                     f'than {max_atoms} atoms will be skipped.')
        if all_skipped:
            self.log('Skipping all Jaguar descriptors.')
        if not all_skipped:
            jaguar_out_files = self.runJaguar()
            all_failed = all(out is None for out in jaguar_out_files)
            if all_failed:
                self.log(
                    'All Jaguar jobs failed. Skipping all Jaguar descriptors.')
        else:
            jaguar_out_files = [None] * len(self.structs)
        self.jaguar_out_files = jaguar_out_files 
[docs]    def getComplexDescriptors(self):
        """
        Create a `Complex` object for each structure and get their descriptors
        :rtype: dict
        :return: The descriptors from `Complex` for each structure
        """
        descriptors = {}
        # Use Agg rather than QTAgg for contours to prevent requiring graphics
        # libraries in case running on a remote compute host
        old_backend = matplotlib.get_backend()
        matplotlib.use('agg', warn=False, force=True)
        for idx, struct in enumerate(self.structs):
            acomplex = Complex(struct,
                               logger=self.logger,
                               nonmetallic_centers=self.nonmetallic_centers)
            st_dict = acomplex.getDescriptors(
                no_organometallic=self.no_organometallic)
            if self.include_vectorized:
                out_file = self.jaguar_out_files[idx]
                st_dict.update(acomplex.getVectorizedDescriptors(out_file))
            descriptors[struct.unique_title] = st_dict
        matplotlib.use(old_backend, warn=False)
        return descriptors 
[docs]    def getJaguarDescriptors(self):
        """
        Return Jaguar descriptors for all structures. Sets Jaguar atom
        descriptors on structures.
        :rtype: dict
        :return: The jaguar descriptors for all structures. Keys are structure
            titles and values are dictionaries containing descriptors
        """
        if not self.jaguar or all(out is None for out in self.jaguar_out_files):
            return {}
        out_indexes = []
        for idx, out_file in enumerate(self.jaguar_out_files):
            if out_file is not None:
                out_indexes.append(idx)
        job_name = 'qm_descriptors'
        job_dj = jobutils.create_queue(host=jobutils.AUTOHOST)
        cmd = [
            'jaguar', 'run', 'qm_descriptors.py', '-formats', 'mae', '-jobname',
            job_name, '-outs'
        ] + [self.jaguar_out_files[idx] for idx in out_indexes]
        job_dj.addJob(jobutils.RobustSubmissionJob(cmd))
        self.log('Running job for Jaguar descriptors...', pad=True)
        job_dj.run()
        out_file = 'all_calcs.props.mae'
        if self.save_files:
            jobutils.add_outfile_to_backend(job_name + '.log')
            jobutils.add_outfile_to_backend(out_file)
        if not os.path.exists(out_file):
            self.log("Could not find Jaguar descriptor results. " +
                     SKIPPING_MSG)
            return {}
        out_sts = list(structure.StructureReader(out_file))
        if len(out_sts) != len(out_indexes):
            self.log(INPUT_OUTPUT_ERROR.format('Jaguar descriptors'))
            return {}
        descriptors = {}
        for idx, in_st in enumerate(self.structs):
            if idx not in out_indexes:
                continue
            out_st = out_sts.pop(0)
            # Structure descriptors
            descriptors[in_st.unique_title] = {
                key: val
                for key, val in out_st.property.items()
                if key[1:4] == JAGUAR_PROP_FAMILY
            }
            if len(in_st.atom) != len(out_st.atom):
                self.log('Different number of atoms in input and output of '
                         f'Jaguar descriptors for structure {idx + 1} ' +
                         f'({in_st.title}). Atom descriptors will be skipped.')
                continue
            # Atom descriptors are set directly on the input structures
            for in_atom, out_atom in zip(in_st.atom, out_st.atom):
                in_atom.property.update({
                    key: val
                    for key, val in out_atom.property.items()
                    if key[1:4] == JAGUAR_PROP_FAMILY
                })
        return descriptors 
[docs]    def getUtilityDescriptors(self):
        """
        Get the requested utility descriptors for all structures
        :rtype: dict
        :return: The descriptor utility descriptors for all structures. Keys are
             structure titles and values are dictionaries containing descriptors
        """
        jobs_dict = {}
        if self.ligfilter:
            jobs_dict[LIGFILTER_DU.descriptor_header] = \
                
self.getDescriptorUtilityJob(LIGFILTER_DU)
        if self.canvas:
            jobs_dict[CANVAS_DU.descriptor_header] = \
                
self.getDescriptorUtilityJob(CANVAS_DU)
        mol_desc_job = self.getMolecularDescriptorsJob()
        if mol_desc_job is not None:
            jobs_dict[MOLDESCRIPTORS_DU.descriptor_header] = mol_desc_job
        if not jobs_dict:
            return {}
        job_dj = jobutils.create_queue(host=jobutils.AUTOHOST)
        for job in jobs_dict.values():
            job_dj.addJob(job)
        self.log('Running descriptor utilities: ' + ', '.join(jobs_dict.keys()),
                 pad=True)
        job_dj.run()
        return self.processUtilityDescriptorOutputs(jobs_dict) 
[docs]    def getDescriptorUtilityJob(self, descriptor_utility):
        """
        Get the job to run to generate the descriptors using the passed
        descriptor_utility for all structures
        :param `DescriptorUtility` descriptor_utility: The descriptor utility
            to run to get the descriptors
        :rtype: `jobutils.RobustSubmissionJob`
        :return: The job to run to generate the descriptors
        """
        if self.struct_file is None:
            self.struct_file = 'structures-in.mae'
            with structure.StructureWriter(self.struct_file) as writer:
                writer.extend(self.structs)
        in_file = descriptor_utility.descriptor_header + '-in.mae'
        shutil.copy(self.struct_file, in_file)
        out_file = descriptor_utility.descriptor_header + '-out.mae'
        cmd = [descriptor_utility.script_name]
        cmd += descriptor_utility.other_flags
        if descriptor_utility.in_mae_flag:
            cmd += [descriptor_utility.in_mae_flag, in_file]
        else:
            cmd += [in_file]
        cmd += [descriptor_utility.out_mae_flag, out_file]
        return descriptor_utility.job_class(cmd) 
[docs]    def processUtilityDescriptorOutputs(self, jobs_dict):
        """
        Read the descriptors for all descriptor utilities that were run, and
        return them
        :type jobs_dict: dict
        :param jobs_dict: Dictionary with `DescriptorUtility` as keys and jobs
            as values
        :rtype: dict
        :return: The descriptor utility descriptors for all structures. Keys are
             structure titles and values are dictionaries containing descriptors
        """
        descriptors = defaultdict(dict)
        for descriptor_utility in [LIGFILTER_DU, CANVAS_DU, MOLDESCRIPTORS_DU]:
            du_name = descriptor_utility.descriptor_header
            if du_name not in jobs_dict:
                continue
            job = jobs_dict[du_name]
            out_file = du_name + '-out.mae'
            if self.save_files:
                if descriptor_utility.job_class == queue.SubprocessJob:
                    jobutils.add_outfile_to_backend(out_file)
                else:
                    jobutils.add_subjob_files_to_backend(job)
            if not os.path.exists(out_file):
                self.log(f'Could not find descriptor results for {du_name}. ' +
                         SKIPPING_MSG)
                continue
            if structure.count_structures(out_file) != len(self.structs):
                self.log(INPUT_OUTPUT_ERROR.format(du_name))
                continue
            with structure.StructureReader(out_file) as reader:
                for in_st, out_st in zip(self.structs, reader):
                    st_dict = {}
                    for key, value in out_st.property.items():
                        parts = key.split('_')
                        if len(parts) < 3:
                            continue  # Incorrect property name
                        if parts[1] in descriptor_utility.property_families:
                            st_dict[key] = value
                    descriptors[in_st.unique_title].update(st_dict)
        return descriptors 
[docs]    def getMolecularDescriptorsJob(self):
        """
        Get the job to run to generate molecular descriptors for all structures
        :rtype: `jobutils.RobustSubmissionJob`
        :return: The job to run to generate the descriptors
        """
        if not self.moldescriptors:
            return None
        if self.moldescriptors is True:
            other_flags = list(MOLDESCRIPTORS_DU.other_flags)
        else:
            # moldescriptors is a list of command line flags
            other_flags = list(self.moldescriptors)
        if self.save_files:
            other_flags += ['-savefiles']
        other_flags += ['-mopac_keywords', 'super']  # MATSCI-9017
        other_flags += ['-skipfailed']  # MATSCI-9580
        utility = MOLDESCRIPTORS_DU._replace(other_flags=other_flags)
        return self.getDescriptorUtilityJob(utility) 
[docs]    @staticmethod
    def writeFingerprintFiles(structs):
        """
        Write fingerprint files for the given structures.
        :type structs: list(`schrodinger.structure.Structure`)
        :param structs: list of structures to be fingerprinted
        :rtype: list
        :return: the fingerprint file names
        """
        sts, props = [], []
        obj = go.CanvasKPLS(sts, props)
        titles = get_unique_titles(structs)
        name = 'dummy'
        precision = obj.SINGLE_PRECISION
        atom_types = None
        fp_files = []
        for title, st in zip(titles, structs):
            for fp_type in obj.ALLOWED_FP_TYPES:
                mae_file = title + f'.{fp_type}{go.IN_MAE_EXT}'
                st.write(mae_file)
                obj.fp_options_dict = {name: (precision, fp_type, atom_types)}
                try:
                    fp_file = obj.makeFingerPrintInfile(mae_file, name)
                except RuntimeError:
                    continue
                fp_files.append(fp_file)
                fileutils.force_remove(mae_file)
        return fp_files 
[docs]    def log(self, msg, **kwargs):
        """
        Add a message to the log file
        :param str msg: The message to log
        Additional keyword arguments are passed to the textlogger.log_msg function
        """
        textlogger.log_msg(msg, logger=self.logger, **kwargs)  
[docs]class CrystalNNFeatures:
    """
    Calculates CrystalNN structure fingerprints as implemented in pymatgen
    """
    OPS_PRESET = 'ops'
    CN_PRESET = 'cn'
[docs]    def __init__(self, preset=OPS_PRESET):
        """
        Create a structure featurizer
        :param str preset: One of `OPS_PRESET` or `CN_PRESET` class constants
        """
        site_featurizer = CrystalNNFingerprint.from_preset(
            preset, distance_cutoffs=None, x_diff_weight=0)
        self.st_featurizer = SiteStatsFingerprint(site_featurizer,
                                                  stats=('mean', 'maximum')) 
[docs]    def featurize(self, struct):
        """
        Get CrystalNN fingerprints for the passed structure
        :param `structure.Structure` The structure to get features for
        :rtype: list
        :return: List of CrystalNN fingerprints for the structure
        """
        if not xtal.has_pbc(struct):
            raise ValueError(
                f'"{struct.title}" has no periodic boundary conditions.')
        pmg_struct = xtal.get_pymatgen_structure(struct)
        return self.st_featurizer.featurize(pmg_struct)