"""
Classes and functions for the genetic optimization module.
Copyright Schrodinger, LLC. All rights reserved."""
# Contributor: Thomas F. Hughes
import ast
import glob
import logging
import math
import ntpath
import os
import re
import shutil
import string
import sys
import tarfile
import traceback
from collections import namedtuple
from functools import reduce
import numpy
from pyevolve import Consts
from pyevolve import GenomeBase
from pyevolve import GSimpleGA
from pyevolve import Scaling
from pyevolve import Selectors
from pyevolve import Util
from scipy import constants
import schrodinger.structure as structure
from schrodinger.application.jaguar.input import JaguarInput
from schrodinger.application.jaguar.output import JaguarOutput
from schrodinger.application.macromodel.input import ConfSearch
from schrodinger.application.matsci import parserutils
from schrodinger.application.matsci import jobutils
from schrodinger.application.matsci import optoelectronics
from schrodinger.application.matsci import msprops
from schrodinger.application.matsci import rxn_path
from schrodinger.application.matsci import textlogger
from schrodinger.application.matsci import rxn_channel
from schrodinger.infra import mm
from schrodinger.job import queue
from schrodinger.job.util import hunt
from schrodinger.structure import workflow_action_menu as wam
from schrodinger.structutils import analyze
from schrodinger.structutils import build
from schrodinger.structutils import color
from schrodinger.structutils import transform
from schrodinger.utils import fileutils
from schrodinger.utils import imputils
from schrodinger.utils import subprocess
# MATSCI-5160 - in contrast to the random module
# the methods in the numpy.random module are consistent
# between Python 2 and 3
my_random = numpy.random.RandomState()
PYEVOLVE_LOG_EXT = '-pyevolve.log'
IN_MAE_EXT = '-in.mae'
OUT_MAE_EXT = '-out.mae'
BOND_CROSSOVER = 'bond'
CROSSOVER_CHOICES = [BOND_CROSSOVER]
DEFAULT_CROSSOVERS = [BOND_CROSSOVER]
ELEMENTAL_MUTATOR = 'elemental'
FRAGMENT_MUTATOR = 'fragment'
ISOELECTRONIC_MUTATOR = 'isoelectronic'
MUTATOR_CHOICES = [ELEMENTAL_MUTATOR, FRAGMENT_MUTATOR, ISOELECTRONIC_MUTATOR]
DEFAULT_MUTATORS = [FRAGMENT_MUTATOR, ISOELECTRONIC_MUTATOR]
GENERATIONS = 10
POPULATION = 8
CROSSOVER_RATE = 90.0
MUTATION_RATE = 90.0
RANK_SELECTION = 'rank'
ROULETTE_WHEEL_SELECTION = 'roulette_wheel'
TOURNAMENT_SELECTION = 'tournament'
TOURNAMENT_SELECTION_WITH_ROULETTE = 'tournament_with_roulette'
UNIFORM_SELECTION = 'uniform'
SELECTION_DICT = {
RANK_SELECTION: Selectors.GRankSelector,
ROULETTE_WHEEL_SELECTION: Selectors.GRouletteWheel,
TOURNAMENT_SELECTION: Selectors.GTournamentSelectorAlternative,
TOURNAMENT_SELECTION_WITH_ROULETTE: Selectors.GTournamentSelector,
UNIFORM_SELECTION: Selectors.GUniformSelector
}
SELECTION_CHOICES = [RANK_SELECTION, ROULETTE_WHEEL_SELECTION, \
TOURNAMENT_SELECTION, TOURNAMENT_SELECTION_WITH_ROULETTE, UNIFORM_SELECTION]
DEFAULT_SELECTION = ROULETTE_WHEEL_SELECTION
TOURNAMENT_SIZE = 2
UNPRODUCTIVE_TERM = 'unproductive'
FIRST_PROPERTY_TERM = 'first_property'
ALL_PROPERTIES_TERM = 'all_properties'
MAX_GENERATIONS_TERM = 'max_generations'
TERM_CHOICES = [UNPRODUCTIVE_TERM, FIRST_PROPERTY_TERM, ALL_PROPERTIES_TERM, \
MAX_GENERATIONS_TERM]
DEFAULT_TERMS = [UNPRODUCTIVE_TERM, ALL_PROPERTIES_TERM]
NUM_UNPRODUCTIVE = 6
LINEAR_SCALING = 'linear'
POWER_LAW_SCALING = 'power_law'
SIGMA_TRUNCATION_SCALING = 'sigma_truncation'
BOLTZMANN_SCALING = 'boltzmann'
SCALING_DICT = {
LINEAR_SCALING: Scaling.LinearScaling,
POWER_LAW_SCALING: Scaling.PowerLawScaling,
SIGMA_TRUNCATION_SCALING: Scaling.SigmaTruncScaling,
BOLTZMANN_SCALING: Scaling.BoltzmannScaling
}
SCALING_CHOICES = [
LINEAR_SCALING, POWER_LAW_SCALING, SIGMA_TRUNCATION_SCALING,
BOLTZMANN_SCALING
]
DEFAULT_SCALING = SIGMA_TRUNCATION_SCALING
ALLOWS_NEGATIVE_SCORES = [SIGMA_TRUNCATION_SCALING, BOLTZMANN_SCALING]
ELITISM = 1
RANDOM_SEED = None
RANDOM_INT_BOUND = 1000000
NO_MINIMIZE = False
INDIVIDUAL_KEY = 'i_matsci_individual_index'
GENERATION_KEY = 'i_matsci_generation'
STRUCTURE_SCORE_KEY = 'r_matsci_structure_score' # this is a type of raw score
RAW_SCORE_KEY = 'r_matsci_raw_score'
FIT_SCORE_KEY = 'r_matsci_fit_score'
SKIP_KEY = 'b_matsci_skipped'
FAILURE_KEY = 'b_matsci_failed'
DEFAULT_EVAL_KWARGS = {}
ORGANIC = 'organic'
N_HETEROCYCLES = 'N-heterocycles'
O_HETEROCYCLES = 'O-heterocycles'
S_HETEROCYCLES = 'S-heterocycles'
MIXED_HETEROCYCLES = 'Mixed-heterocycles'
COMBIGLIDE_DEFAULT = 'combiglide_default'
OPTOELECTRONICS = 'optoelectronics'
ALL = 'all'
MMSHARE_MAIN_DATA = hunt('mmshare', dir='data')
FRAGMENT_LIBS = {
ORGANIC: os.path.join(MMSHARE_MAIN_DATA, 'res', 'organic.bld'),
N_HETEROCYCLES: os.path.join(MMSHARE_MAIN_DATA, 'res',
'N-heterocycles.bld'),
O_HETEROCYCLES: os.path.join(MMSHARE_MAIN_DATA, 'res',
'O-heterocycles.bld'),
S_HETEROCYCLES: os.path.join(MMSHARE_MAIN_DATA, 'res',
'S-heterocycles.bld'),
MIXED_HETEROCYCLES: os.path.join(MMSHARE_MAIN_DATA, 'res',
'Mixed-heterocycles.bld'),
COMBIGLIDE_DEFAULT: os.path.join(MMSHARE_MAIN_DATA, 'cg',
'interactive_palettes',
'default_palette.mae'),
OPTOELECTRONICS: os.path.join(MMSHARE_MAIN_DATA, 'genetic_optimization',
'optoelectronics.mae')
}
FRAGMENT_LIBS_DEFAULT = [OPTOELECTRONICS]
ENTRY_NAME_KEY = 's_m_entry_name'
GROW_NAME_KEY = 's_m_grow_name'
PDB_ATOM_NAME_KEY = 's_m_pdb_atom_name'
PDB_RES_NAME_KEY = 's_m_pdb_residue_name'
CROSSOVER_PARENTS_KEY = 's_matsci_crossover_parents'
CROSSOVER_APPLIED_KEY = 's_matsci_crossover_applied'
MUTATION_PARENT_KEY = 's_matsci_mutation_parent'
MUTATION_APPLIED_KEY = 's_matsci_mutation_applied'
EVALUATOR_JOBS_DIR = 'evaluator_jobs'
GENER_SUBDIR = 'generation_'
NUM_DECIMAL = '%.2f'
FIELD_WIDTH = 10
# this is used to handle a ZeroDivisionError
INFINITE_SCORE = 1000000000
# allow setting a bad score for both failed and skipped jobs
BAD_SCORE = -10000000
RXN_KEY = rxn_channel.Products.RXN_REPRESENTATION_KEY
SMALL_CHILD_FREQ = 0.75
SIMILAR_STDEV_CHILDREN_FREQ = 0.75
FILE_BASE_NAME = 'genopt'
NO_OPEN_SHELL = False
TERM_THRESH = 0.0001
# penalize data
NONE = 'none'
ATYPICAL = 'atypical'
ATYPICAL_PATTERNS = dict([
('three-atom or longer chains containing N, O, P, and S', \
'[$([#7,#8,#15,#16][#7,#8,#15,#16][#7,#8,#15,#16])]'),
('two or more different halogens', 'F.Cl_F.Br_F.I_Cl.Br_Cl.I_Br.I')
])
ATYP_MW_TARGET = 1100
ATYP_MW_WEIGHT = 1.0
ATYP_NAT_TARGET = 200
ATYP_NAT_WEIGHT = 1.0
ATYP_NEL_TARGET = 5
ATYP_NEL_WEIGHT = 15
ATYPICAL_PROPS = [
'name=natoms target=%d comparator=lt weight=%.1f' %
(ATYP_NAT_TARGET, ATYP_NAT_WEIGHT),
'name=molecular_weight target=%d comparator=lt weight=%.1f' %
(ATYP_MW_TARGET, ATYP_MW_WEIGHT),
'name=nelements target=%d comparator=lt weight=%.1f' %
(ATYP_NEL_TARGET, ATYP_NEL_WEIGHT),
'name=smarts patterns=%s minimax=min weight=15.0' % \
'_'.join(ATYPICAL_PATTERNS.values())
]
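# for reference, the first entry above expands to
#     'name=natoms target=200 comparator=lt weight=1.0'
# i.e. the whitespace delimited '<key>=<value>' sub-option syntax consumed
# by the Property class defined later in this module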
DIHYDROGEN = 'dihydrogen'
DIHYDROGEN_PATTERNS = dict([('dihydrogen molecule', '[H][H]')])
DIHYDROGEN_PROPS = ['name=smarts patterns=%s minimax=min weight=100.0' % \
'_'.join(list(DIHYDROGEN_PATTERNS.values()))]
PENALIZE_PROTOCOLS = dict([(ATYPICAL, (ATYPICAL_PATTERNS, ATYPICAL_PROPS)),
(DIHYDROGEN, (DIHYDROGEN_PATTERNS, DIHYDROGEN_PROPS))
])
PENALIZE_CHOICES = [NONE, ATYPICAL, DIHYDROGEN]
PENALIZE_DEFAULT = [ATYPICAL, DIHYDROGEN]
# base evaluations of structures with structure scores below this threshold
# will be skipped
STRUCTURE_SCORE_THRESHOLD = -50.0
# list of file extensions, files created by the base evaluator with these
# extensions will be copied back to the launch host as log files
EXTS_TO_RETURN = ['.spm']
STOICH_BASE_EXT_SEP = '.'
CONFORMATIONAL_SEARCH = False
CONF_SEARCH_FAILED_KEY = 'b_matsci_Conf_Search_Failed'
# see MATSCI-1988 and MMOD-1809, zero is reserved
CONF_SEARCH_SEED_RANGE = [1, 78593]
GENER_LOG_FILE_SEP = '-'
REMAINDER = 'remainder'
PREVIOUS = 'previous'
FRAGMENT = 'fragment'
FREEZER_CHOICES = [REMAINDER, PREVIOUS, FRAGMENT]
FREEZERS_DEFAULT = [REMAINDER, PREVIOUS]
FROM_FREEZER_KEY = 's_matsci_From_Freezer'
REMAINDER_FILE_EXT = '-remainder.mae'
NO_CHILD = 'no_child'
BAD_STRUCTURE = 'bad_structure'
INOCULATE_CHOICES = [NONE, NO_CHILD, BAD_STRUCTURE]
INOCULATE_DEFAULT = [NO_CHILD, BAD_STRUCTURE]
INOCULATE_KEY = 's_matsci_Inoculate'
TRIES_FROM_LIBS = 3
TRIES_FRAGMENT_MUTATOR = 3
EVALUATOR_RELATIVE_DIR = [os.curdir] + 3 * [os.pardir]
CM3_PER_BOHR3 = (100 * constants.physical_constants['Bohr radius'][0])**3
VACUUM_PERMITTIVITY_AU = 1. / (4 * constants.pi) # atomic units
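# for example, with a Bohr radius of ~5.292e-11 m the conversion factor
# CM3_PER_BOHR3 is ~1.48e-25 cm^3/bohr^3, and in Hartree atomic units the
# vacuum permittivity is 1/(4*pi) ~ 0.0796; both are used below when
# converting a mass density in g/cm^3 and a polarizability in bohr^3 into
# a refractive index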
PropertyInfo = namedtuple(
'PropertyInfo',
['key', 'units', 'is_positive', 'class_evaluator', 'class_kwargs'])
EV = 'eV'
NM = 'nm'
NM_X_INTENSITY = 'nm*Intensity'
ANGSTROM = 'Ang.'
NAME_PATTERN = re.compile(r'(name=){1}(\w+)')
OXIDATION_PROP = optoelectronics.OXIDATION
REDUCTION_PROP = optoelectronics.REDUCTION
HOLE_PROP = optoelectronics.HOLE_REORG
ELECTRON_PROP = optoelectronics.ELECTRON_REORG
TRIPLET_BASE_PROP = optoelectronics.TRIPLET
TRIPLET_PROP = TRIPLET_BASE_PROP
TRIPLET_RMSD_PROP = TRIPLET_BASE_PROP + '_rmsd'
TRIPLET_REORG_PROP = optoelectronics.TRIPLET_REORG
SPECTRUM_BASE_PROP = optoelectronics.WINDOW
SPECTRUM_LMAX_PROP = SPECTRUM_BASE_PROP + '_lmax'
SPECTRUM_RAREA_PROP = SPECTRUM_BASE_PROP + '_rarea'
SPECTRUM_GAREA_PROP = SPECTRUM_BASE_PROP + '_garea'
SPECTRUM_BAREA_PROP = SPECTRUM_BASE_PROP + '_barea'
TADF_BASE_PROP = optoelectronics.TADF # TADF is thermally activated delayed fluorescence
TADF_T1_PROP = TADF_BASE_PROP
TADF_T2_PROP = TADF_BASE_PROP.replace('t1', 't2')
TADF_T3_PROP = TADF_BASE_PROP.replace('t1', 't3')
FL_EMISSION_BASE_PROP = optoelectronics.FL_EMISSION
FL_EMISSION_EMAX_PROP = FL_EMISSION_BASE_PROP + '_emax'
FL_EMISSION_RAREA_PROP = FL_EMISSION_BASE_PROP + '_rarea'
FL_EMISSION_GAREA_PROP = FL_EMISSION_BASE_PROP + '_garea'
FL_EMISSION_BAREA_PROP = FL_EMISSION_BASE_PROP + '_barea'
FL_EMISSION_STOKES_SHIFT_PROP = FL_EMISSION_BASE_PROP + '_stokes_shift'
def _monomer_evaluate_smarts_canvas(st, pattern):
"""
Wrapper for doing Canvas SMARTS evaluation of polymer monomers.
:type st: schrodinger.structure.Structure
:param st: the structure
:type pattern: str
:param pattern: the pattern
:rtype: list
:return: contains lists of matching indices
"""
# it is observed that if the given structure is a polymer monomer
# containing head and tail atoms marked with msprops.ROLE_PROP and the
# given pattern matches the tail atom, then a hydrogen bonded to the
# tail atom is also picked up as a match even though it is not a real
# match; whether this is a bug or a feature it is undesirable for this
# application, so fix it by passing a copy of the structure without
# the properties
stp = st.copy()
rxn_channel.remove_grow_properties(stp)
return analyze.evaluate_smarts_canvas(stp, pattern)
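# illustrative usage (st here is a hypothetical monomer structure): count
# matches of an N/O/P/S pattern without picking up spurious hits on
# hydrogens bonded to the marked tail atom
#     matches = _monomer_evaluate_smarts_canvas(st, '[#7,#8,#15,#16]')
#     n_matches = len(matches)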
class ClassEvaluator:
"""
Manage a class evaluator.
"""
SEPARATOR = '_'
HOST_STR = 'host_str'
def __init__(self, structs, properties):
"""
Create an instance.
:type structs: list of schrodinger.structure.Structure
:param structs: contains input structures
:type properties: list
:param properties: contains Property
"""
self.structs = structs
self.properties = properties
self.job_dj = self.getQueue(self.properties)
def getBaseName(self, struct, aproperty):
"""
Get the base name.
:type struct: schrodinger.structure.Structure
:param struct: the structure
:type aproperty: Property
:param aproperty: the property
:rtype: str
:return: the base name
"""
return struct.title + self.SEPARATOR + aproperty.name
def runIt(self):
"""
Run it.
:raise RuntimeError: for any issue
"""
# needs to be reimplemented, raise a RuntimeError for any issue
def setQueue(self, job_dj):
"""
Set a JobDJ to run jobs.
:type job_dj: `schrodinger.job.queue.JobDJ`
:param job_dj: the queue
"""
self.job_dj = job_dj
def getQueue(self, properties):
"""
Return a JobDJ to run jobs.
:type properties: list
:param properties: contains Property
:rtype: JobDJ
:return: the queue
"""
job_dj = getattr(self, 'job_dj', None)
if job_dj:
return job_dj
# the -HOST string has been saved in each of the properties' class
# kwargs because the -HOST string of the main GO job will have been
# removed from the command line by the time the tasks infrastructure
# starts the parallel tasks; these properties always have the same
# -HOST string so just use the first property; the number of
# simultaneous subjobs, aka the number of processors, in the -HOST
# string is used for both (1) the number of parallel tasks (which is
# the number of molecules that are simultaneously scored) and (2) the
# number of simultaneous subjobs of the JobDJ available to each task
# or molecule, which is used to run the subjobs needed to compute the
# molecule's score; the host in the -HOST string sets only the host
# used for the JobDJ in (2) since tasks always run on localhost
if not properties:
return None
host_str = properties[0].class_kwargs[self.HOST_STR]
return jobutils.create_queue(host=host_str)
@staticmethod
def getHostStr(host_str=None):
"""
Return the host string.
:type host_str: str
:param host_str: the host string, for example 'localhost:4'
:rtype: str
:return: the host string
"""
return host_str or f'{queue.LOCALHOST_ENTRY_NAME}:1'
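# note that when host_str is not given the default above is a
# single-processor localhost string, e.g. 'localhost:1' assuming
# queue.LOCALHOST_ENTRY_NAME resolves to 'localhost'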
class StructureEvaluator(ClassEvaluator):
"""
Manage structure evaluation.
"""
SMARTS_PATTERN_SEPARATOR = '_'
SMARTS_PROP = 'smarts'
MOL_WEIGHT_PROP = 'molecular_weight'
NATOMS_PROP = 'natoms'
NELEMENTS_PROP = 'nelements'
PROPERTIES = {SMARTS_PROP, MOL_WEIGHT_PROP, NATOMS_PROP, NELEMENTS_PROP}
SMARTS_KEY = 'i_matsci_SMARTS_property_%s'
MOL_WEIGHT_KEY = 'r_m_Molecular_weight'
NATOMS_KEY = 'i_m_Number_of_atoms'
NELEMENTS_KEY = 'i_m_Number_of_elements'
NO_UNITS = 'None'
MOL_WEIGHT_UNITS = 'g/mol'
PATTERNS = 'patterns'
def __init__(self, structs, properties):
"""
See parent class for documentation.
"""
super(StructureEvaluator, self).__init__(structs, properties)
self.structure_properties = [
aproperty for aproperty in self.properties
if aproperty.name in self.PROPERTIES
]
@staticmethod
def getInfo(key, units, patterns=None, host_str=None):
"""
Return a PropertyInfo.
:type key: str
:param key: the property key
:type units: str
:param units: the property units
:type patterns: list
:param patterns: the SMARTS patterns
:type host_str: str
:param host_str: the host string, for example 'localhost:4'
:rtype: PropertyInfo
:return: the property information
"""
host_str = ClassEvaluator.getHostStr(host_str=host_str)
obj = StructureEvaluator
class_kwargs = {}
if patterns:
class_kwargs[obj.PATTERNS] = patterns
class_kwargs[obj.HOST_STR] = host_str
return PropertyInfo(key=key,
units=units,
is_positive=True,
class_evaluator=obj,
class_kwargs=class_kwargs)
def runIt(self):
"""
Run it.
"""
for struct in self.structs:
for aproperty in self.structure_properties:
if aproperty.name == StructureEvaluator.SMARTS_PROP:
value = sum(
len(_monomer_evaluate_smarts_canvas(struct, pattern))
for pattern in aproperty.class_kwargs[self.PATTERNS])
elif aproperty.name == StructureEvaluator.MOL_WEIGHT_PROP:
value = struct.total_weight
elif aproperty.name == StructureEvaluator.NATOMS_PROP:
value = struct.atom_total
elif aproperty.name == StructureEvaluator.NELEMENTS_PROP:
value = len(get_element_histogram(struct))
struct.property[aproperty.key] = value
class CanvasKPLS(ChemInformatics):
"""
Manage Canvas KPLS jobs.
"""
EXT = 'kpls.tar.gz'
FP_TEXT_FILE = 'fpInfo.txt'
MODEL_OPTION = 'kpls_model'
KEY = 'r_matsci_KPLS_%s/%s'
PATH = os.path.join(MMSHARE_MAIN_DATA, 'genetic_optimization',
'canvas_kpls_models')
FP_EXT = '.fp'
VALUE_PATTERN = re.compile(r'\s*1\s+unknown.*\s+(-?\d+\.?\d*)$')
ALLOWED_FP_TYPES = [
'linear', 'maccs', 'radial', 'molprint2D', 'torsion', 'pairwise',
'triplet', 'quartet', 'dendritic'
]
# precision is either 32- or 64-bit
SINGLE_PRECISION = 32
DOUBLE_PRECISION = 64
BIT_EXT = '-bit'
LABEL = 'canvasKPLS'
CHECK_MSG = (' Model files must correspond to a single property type '
'and as input take fingerprint files featuring one of '
f'the following fingerprint types, {ALLOWED_FP_TYPES}, '
'as well as either no atom type or one of the 12 known atom '
'types (integers in [1, 12]). See '
'$SCHRODINGER/utilities/canvasFPGen -h for more details.')
def makeFingerPrintInfile(self, mae_infile, name, job_dj=None):
"""
Make fingerprint infile or add the job to do so to the given
queue.
:type mae_infile: str
:param mae_infile: the Maestro input file name
:type name: str
:param name: the property name
:type job_dj: None or `schrodinger.job.queue.JobDJ`
:param job_dj: if given then add the fingerprint job to this
queue and return
:raise RuntimeError: if canvasFPGen fails
:rtype: str
:return: the Canvas fingerprint input file name
"""
def finalizer(job):
if not os.path.isfile(job.fp_infile):
raise RuntimeError('canvasFPGen failed')
precision, fptype, atomtypes = self.fp_options_dict[name]
basename = mae_infile.split(IN_MAE_EXT)[0]
fp_infile = basename + self.FP_EXT
job_name = f'{basename}_canvasFPGen'
cmd = ['canvasFPGen']
if precision == self.DOUBLE_PRECISION:
cmd += ['-xp']
cmd += ['-fptype', fptype]
if atomtypes is not None:
cmd += ['-atomtype', str(atomtypes)]
cmd += ['-imae', mae_infile]
cmd += ['-o', fp_infile]
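# as an illustration (hypothetical file names), a double precision
# 'radial' fingerprint with atom typing scheme 2 corresponds roughly to:
#     canvasFPGen -xp -fptype radial -atomtype 2 -imae x-in.mae -o x.fp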
if job_dj:
# the -JOB flag runs the canvas job under job control, which is
# not wanted for the subprocess branch below
cmd += ['-JOB', job_name]
jc_job = jobutils.RobustSubmissionJob(cmd, name=job_name)
jc_job.fp_infile = fp_infile
jc_job.addFinalizer(finalizer)
job_dj.addJob(jc_job)
else:
try:
output = subprocess.check_output(cmd)
except subprocess.CalledProcessError as err:
raise RuntimeError('canvasFPGen failed: ' + str(err))
if not os.path.isfile(fp_infile):
raise RuntimeError('canvasFPGen failed to produce output file')
return fp_infile
def getPropertyValue(self, property_outfile, prop_key=None):
"""
See parent class.
"""
super().getPropertyValue(property_outfile, prop_key=prop_key)
with open(property_outfile, 'r') as afile:
for line in afile:
match = re.match(self.VALUE_PATTERN, line)
if match:
value = float(match.groups()[0])
break
else:
value = None
if value is None:
msg = ('The property value can not be found in the '
f'{self.LABEL} property output file.')
raise RuntimeError(msg)
return value
def makeFpOptionsDict(self):
"""
Make the fingerprint options dictionary.
"""
self.fp_options_dict = {}
for aproperty in self.cheminfo_properties:
model_infile = self.getModelFile(aproperty)
precision, fptype, atomtypes = CanvasKPLS.getFpOptions(model_infile)
self.fp_options_dict[aproperty.name] = (precision, fptype,
atomtypes)
def getCmd(self, mae_infile, model_infile, prop_name, xtra_infile,
property_outfile, job_name):
"""
See parent class.
"""
cmd = ['canvasKPLS']
cmd += ['-infp', xtra_infile]
cmd += ['-out', property_outfile]
cmd += ['-test', '-unknown', '-imod', model_infile]
cmd += ['-JOB', job_name]
return cmd
def setUp(self):
"""
See parent class.
"""
self.makeFpOptionsDict()
@staticmethod
def getFpOptions(model_file):
"""
Return fingerprint options obtained from the given
Canvas KPLS model file.
:type model_file: str
:param model_file: the name of the Canvas KPLS model file
:rtype: int, str, int or None
:return: contains (1) precision, (2) fingerprint
type, and (3) atom type if present
:raise RuntimeError: if there is anything wrong with the
Canvas KPLS model file
"""
# get contents of fingerprint information file
if not tarfile.is_tarfile(model_file):
msg = ('The Canvas KPLS model file, %s, is not '
'a valid tar archive file.' % model_file)
raise RuntimeError(msg)
try:
with tarfile.open(name=model_file) as tar:
fh = tar.extractfile(CanvasKPLS.FP_TEXT_FILE)
# see MATSCI-5226 where Python 3 bytes decoding is needed
lines = [
line.decode('utf-8').strip('\n') for line in fh.readlines()
]
fh.close()
except KeyError:
msg = ('The Canvas KPLS model file, %s, does not '
'contain the fingerprint information file, %s.' %
(model_file, CanvasKPLS.FP_TEXT_FILE))
raise RuntimeError(msg)
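# the parsing below assumes an fpInfo.txt layout along these lines
# (the values shown are hypothetical):
#     /path/to/model
#     type=radial
#     code=2
#     64-bit
#     Tg
# i.e. a path line, '<key>=<value>' lines, a '<precision>-bit' line and
# a comma separated property list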
# build fingerprint information dict
fp_info_dict = {'path': lines[0]}
for line in lines[1:]:
if line.count('=') == 1:
key, value = line.split('=')
try:
value = int(value)
except ValueError:
pass
fp_info_dict[key] = value
elif line.endswith(CanvasKPLS.BIT_EXT):
fp_info_dict['precision'] = int(
line.split(CanvasKPLS.BIT_EXT)[0])
else:
fp_info_dict['properties'] = [
prop.strip() for prop in line.split(',')
]
# get data
fptype = fp_info_dict.get('type')
atomtypes = fp_info_dict.get('code')
precision = fp_info_dict.get('precision')
props = fp_info_dict.get('properties', [])
# check data
msg = None
error_msg = ('The Canvas KPLS model file, %s, can not be used '
'for genetic optimization because something is wrong '
'with the %s file. ' %
(model_file, CanvasKPLS.FP_TEXT_FILE))
if fptype not in CanvasKPLS.ALLOWED_FP_TYPES:
msg = error_msg + ('The fingerprint type, %s, is not one '
'of the following supported types: %s.' %
(fptype, CanvasKPLS.ALLOWED_FP_TYPES))
elif atomtypes is not None and not (1 <= atomtypes <= 12):
msg = error_msg + ('The atom type, %s, is not one of '
'the 12 standard types, i.e. in [1, 12].' %
atomtypes)
elif precision is None or precision not in [
CanvasKPLS.SINGLE_PRECISION, CanvasKPLS.DOUBLE_PRECISION
]:
msg = error_msg + ('The precision, %s, is neither %s nor %s.' %
(precision, CanvasKPLS.SINGLE_PRECISION,
CanvasKPLS.DOUBLE_PRECISION))
elif len(props) != 1:
msg = error_msg + ('More than a single property has been '
'specified: %s.' % props)
if msg:
raise RuntimeError(msg)
return precision, fptype, atomtypes
class AutoQSAR(ChemInformatics):
"""
Manage AutoQSAR jobs.
"""
EXT = 'qzip'
MODEL_OPTION = 'auto_qsar_model'
KEY = 'r_matsci_Auto_QSAR_%s/%s'
PROP_BASE_NAME = 'r_autoqsar_Pred_{prop_name}'
IN_EXT = '.inp'
LABEL = 'autoqsar'
def writeAutoQSARInFile(self, mae_infile, model_infile, base_name,
prop_name):
"""
Write the AutoQSAR input file.
:type mae_infile: str
:param mae_infile: the input Maestro file name
:type model_infile: str
:param model_infile: the input AutoQSAR model file name
:type base_name: str
:param base_name: the base name to use for the AutoQSAR
input file
:type prop_name: str
:param prop_name: the property name
:rtype: str
:return: the name of the AutoQSAR input file
"""
file_name = f'{base_name}{self.IN_EXT}'
with open(file_name, 'w') as afile:
afile.write(f'LIGANDS_FILE {mae_infile}\n')
afile.write(f'MODEL_FILE {model_infile}\n')
afile.write('MODEL_SET -best,\n')
afile.write(f'OUT_PRED_PROP_BASE {prop_name}\n')
return file_name
def getCmd(self, mae_infile, model_infile, prop_name, xtra_infile,
property_outfile, job_name):
"""
See parent class.
"""
cmd = ['autoqsar', model_infile, '-test']
cmd += ['-i', mae_infile]
cmd += ['-pred', property_outfile]
cmd += ['-best', '-unknown']
cmd += ['-prop', prop_name]
cmd += ['-JOB', job_name]
return cmd
class DeepChem(ChemInformatics):
"""
Manage DeepChem jobs.
"""
EXT = 'qzip'
MODEL_OPTION = 'deepchem_model'
KEY = 'r_matsci_DeepChem_%s/%s'
PROP_BASE_NAME = 'r_m_{prop_name}_score'
LABEL = 'deepchem'
def getCmd(self, mae_infile, model_infile, prop_name, xtra_infile,
property_outfile, job_name):
"""
See parent class.
"""
cmd = ['ligand_ml_driver.py', model_infile, '-predict']
cmd += ['-i', mae_infile]
cmd += ['-pred', property_outfile]
cmd += ['-prop', prop_name]
cmd += ['-JOB', job_name]
return cmd
class Jaguar(ClassEvaluator):
"""
Manage Jaguar jobs.
"""
JAGUAR_OPTIONS = 'jaguar_options'
TPP = 'tpp'
JAGUAR_OUTPUT_ATTR = 'jaguar_output_attr'
IN_EXT = '.in'
OUT_EXT = '.out'
def __init__(self, structs, properties):
"""
See parent class for documentation.
"""
super(Jaguar, self).__init__(structs, properties)
self.jaguar_properties = [
aproperty for aproperty in self.properties
if aproperty.class_kwargs.get(self.JAGUAR_OPTIONS)
]
def loadQueue(self, job_dj):
"""
Load the JobDJ with jobs.
:type job_dj: JobDJ
:param job_dj: the queue
"""
base_cmd = ['jaguar', 'run']
for struct in self.structs:
kwargs = {'structure': struct}
for aproperty in self.jaguar_properties:
ji = JaguarInput(**kwargs)
ji.setValues(aproperty.class_kwargs[self.JAGUAR_OPTIONS])
ji.setStructure(struct, set_molchg_multip=False)
if ji.sectionDefined('valid'):
ji.deleteSection('valid')
base_name = self.getBaseName(struct, aproperty)
jag_in_file = base_name + self.IN_EXT
ji.saveAs(jag_in_file)
cmd = list(base_cmd)
tpp = aproperty.class_kwargs.get(self.TPP, 1)
if tpp > 1:
cmd.extend([parserutils.FLAG_TPP, str(tpp)])
cmd.append(jag_in_file)
jc_job = jobutils.RobustSubmissionJob(cmd, name=base_name)
job_dj.addJob(jc_job)
def postProcess(self, silent=False):
"""
Post process the jobs and set the final results.
:type silent: bool
:param silent: if True, don't raise.
:raise RuntimeError: if there is a problem
"""
for struct in self.structs:
for aproperty in self.jaguar_properties:
jag_out_file = self.getBaseName(struct,
aproperty) + Jaguar.OUT_EXT
if not os.path.exists(jag_out_file):
if silent:
continue
msg = ('Jaguar job output file can not be found.')
raise RuntimeError(msg)
jo = JaguarOutput(jag_out_file)
if jo.status != JaguarOutput.OK or jo.fatal_error:
if silent:
continue
if jo.fatal_error:
err = jo.fatal_error.strip().replace('\n', '')
else:
err = str(jo.status)
msg = (f'Jaguar job died with the error: {err}.')
raise RuntimeError(msg)
attr = aproperty.class_kwargs.get(self.JAGUAR_OUTPUT_ATTR)
if attr:
struct.property[aproperty.key] = getattr(jo, attr)
def runIt(self, silent=False):
"""
Run it.
:type silent: bool
:param silent: if True, don't raise.
:raise RuntimeError: if there is a problem
"""
self.loadQueue(self.job_dj)
self.job_dj.run()
if not silent and (not self.job_dj.isComplete() or
self.job_dj.total_failed):
msg = ('Jaguar job has failed.')
raise RuntimeError(msg)
self.postProcess(silent=silent)
class GlassTransitionTemperature(CanvasKPLS):
"""
Manage glass transition temperature jobs.
"""
UNITS = 'C'
KEY = 'r_matsci_KPLS_Tg/{units}'.format(units=UNITS)
PROP = 'kpls_tg'
FILE = 'Tg250.kpls.tar.gz'
@staticmethod
def getInfo(host_str=None):
"""
Return a PropertyInfo.
:type host_str: str
:param host_str: the host string, for example 'localhost:4'
:rtype: PropertyInfo
:return: the property information
"""
host_str = ClassEvaluator.getHostStr(host_str=host_str)
obj = GlassTransitionTemperature
afile = os.path.join(obj.PATH, obj.FILE)
class_kwargs = {}
class_kwargs[obj.MODEL_OPTION] = afile
class_kwargs[obj.HOST_STR] = host_str
return PropertyInfo(key=obj.KEY,
units=obj.UNITS,
is_positive=False,
class_evaluator=obj,
class_kwargs=class_kwargs)
class RefractiveIndex(CanvasKPLS, Jaguar):
"""
Manage refractive index jobs.
"""
UNITS = 'none'
KEY = 'r_matsci_Refractive_Index_298K/{units}'.format(units=UNITS)
PROP = 'refractive_index'
FILE = '01a_kpls5_amorphous_density_HT.kpls.tar.gz'
MASS_DENSITY_KEY = 'r_matsci_Mass_Density/g/cm^3'
ISOTROPIC_POLARIZABILITY_KEY = 'r_matsci_Isotropic_Polarizability/bohr^3'
def __init__(self, structs, properties):
"""
See parent classes for documentation.
"""
CanvasKPLS.__init__(self, structs, properties)
Jaguar.__init__(self, structs, properties)
@staticmethod
def getInfo(jaguar_options=None, tpp=1, host_str=None):
"""
Return a PropertyInfo.
:type jaguar_options: dict
:param jaguar_options: contains Jaguar options
:type tpp: int
:param tpp: the threads per process
:type host_str: str
:param host_str: the host string, for example 'localhost:4'
:rtype: PropertyInfo
:return: the property information
"""
host_str = ClassEvaluator.getHostStr(host_str=host_str)
if not jaguar_options:
jaguar_options = {}
options = jaguar_options.copy()
options['ipolar'] = -1
obj = RefractiveIndex
afile = os.path.join(obj.PATH, obj.FILE)
class_kwargs = {}
class_kwargs[obj.MODEL_OPTION] = afile
class_kwargs[obj.JAGUAR_OPTIONS] = options
class_kwargs[obj.TPP] = tpp
class_kwargs[obj.JAGUAR_OUTPUT_ATTR] = 'polar_alpha'
class_kwargs[obj.HOST_STR] = host_str
return PropertyInfo(key=obj.KEY,
units=obj.UNITS,
is_positive=False,
class_evaluator=obj,
class_kwargs=class_kwargs)
@classmethod
def setRefractiveIndexProperty(cls, struct, mass_density, polarizability):
"""
Set the refractive index property on the given structure.
:type struct: schrodinger.structure.Structure
:param struct: the structure
:type mass_density: float
:param mass_density: the mass density in g/cm^3
:type polarizability: float
:param polarizability: the isotropically averaged polarizability
in atomic units of bohr^3
:raise ValueError: if the total_weight is abnormally small or the
polarizability is abnormally large
"""
# the equations used to compute the refractive index are taken from
# https://chemrxiv.org/articles/Combining_First-Principles_and_ \
# Data_Modeling_for_the_Accurate_Prediction_of_the_Refractive_ \
# Index_of_Organic_Polymers/5446564
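# sketch of the relation implemented below (all quantities in atomic
# units): with x = polarizability * number_density / (3 * eps0) and
# eps0 = 1/(4*pi), the refractive index is n = sqrt((1 + 2*x) / (1 - x))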
number_density = mass_density * constants.N_A / struct.total_weight
number_density *= CM3_PER_BOHR3 # particles/bohr^3
ratio = polarizability * number_density / (3. * VACUUM_PERMITTIVITY_AU)
refractive_index = math.sqrt((1 + 2 * ratio) / (1 - ratio))
struct.property[cls.KEY] = refractive_index
struct.property[cls.MASS_DENSITY_KEY] = mass_density
struct.property[cls.ISOTROPIC_POLARIZABILITY_KEY] = polarizability
def runIt(self):
"""
Run it.
"""
# the Canvas KPLS and Jaguar jobs each store their result (the mass
# density and the polarizability, respectively) under the refractive
# index structure property key, so collect each intermediate value in
# turn and then assemble the final refractive index from them
CanvasKPLS.runIt(self)
mass_densities = [struct.property[self.KEY] for struct in self.structs]
Jaguar.runIt(self)
polarizabilities = [
struct.property[self.KEY] for struct in self.structs
]
data = zip(self.structs, mass_densities, polarizabilities)
for struct, mass_density, polarizability in data:
self.setRefractiveIndexProperty(struct, mass_density,
polarizability)
class CustomClassEvaluator(ClassEvaluator):
"""
Manage a custom class evaluator.
"""
UNITS = 'unknown'
KEY = 'r_matsci_{name}/{units}'
CUSTOM_CLASS_FILE_OPTION = 'custom_class_file'
# define some dictionaries; since RUN_DICT is the only one that must
# have all keys defined, do not use a namespace object
UNITS_DICT = {'stub': UNITS}
RUN_DICT = {'stub': 'runStub'}
# files specified here should be pathless and located in the same
# directory as the python file defining the subclass of this class
EXTRA_INPUT_FILES_DICT = {'stub': []}
def __init__(self, structs, properties):
"""
See parent class for documentation.
"""
super().__init__(structs, properties)
self.custom_properties = [
aproperty for aproperty in self.properties
if aproperty.class_kwargs.get(self.CUSTOM_CLASS_FILE_OPTION)
]
# the ClassEvaluator API is general and takes multiple structures,
# however when used in genetic optimization only a single structure
# is passed because property evaluation of multiple structures is
# done in parallel and handled elsewhere; for clarity this custom
# class offers the single structure as a new attribute
self.st = self.structs[0]
@staticmethod
def getInfo(name, units, custom_class_file, tpp=1, host_str=None):
"""
Return a PropertyInfo.
:type name: str
:param name: the property name
:type units: str
:param units: the property units
:type custom_class_file: str
:param custom_class_file: the Python file containing the custom class
:type tpp: int
:param tpp: the threads per process
:type host_str: str
:param host_str: the host string, for example 'localhost:4'
:rtype: PropertyInfo
:return: the property information
"""
host_str = ClassEvaluator.getHostStr(host_str=host_str)
CustomClassEvaluator.checkModule(custom_class_file, name)
custom_module = CustomClassEvaluator.getModule(custom_class_file)
custom_class = custom_module.CustomClassEvaluator
if units is None:
units = custom_class.UNITS_DICT.get(name,
CustomClassEvaluator.UNITS)
key = CustomClassEvaluator.KEY.format(name=name, units=units)
class_kwargs = {}
class_kwargs[
CustomClassEvaluator.CUSTOM_CLASS_FILE_OPTION] = custom_class_file
class_kwargs[Jaguar.TPP] = tpp
class_kwargs[CustomClassEvaluator.HOST_STR] = host_str
return PropertyInfo(key=key,
units=units,
is_positive=False,
class_evaluator=custom_class,
class_kwargs=class_kwargs)
@staticmethod
def getModule(path):
"""
Return the module from the given path.
:type path: str
:param path: the path to the module python file
:raise RuntimeError: if there is a problem
:rtype: object
:return: the module
"""
if not os.path.exists(path):
raise RuntimeError(f'The python file {path} can not be found.')
# note that imputils.import_module_from_file caches the import,
# likely multiple exception types so catch the bare exception
try:
custom_module = imputils.import_module_from_file(path)
except Exception as err:
raise RuntimeError(f'The python file {path} can not be '
f'imported: {str(err)}')
return custom_module
@staticmethod
def checkDict(path, dict_name):
"""
Check dictionary.
:type path: str
:param path: the path to the module python file
:type dict_name: str
:param dict_name: the name of the dictionary to check
:raise RuntimeError: if there is a problem
"""
custom_module = CustomClassEvaluator.getModule(path)
custom_class = custom_module.CustomClassEvaluator
adict = getattr(custom_class, dict_name)
if not isinstance(adict, dict):
raise RuntimeError(
f'The {dict_name} for the CustomClassEvaluator class in the '
f'python file {path} must be a python dictionary.')
@staticmethod
def checkModule(path, name):
"""
Check the module.
:type path: str
:param path: the path to the module python file
:type name: str or None
:param name: if given the name of the property using this module
will be checked against the module
:raise RuntimeError: if there is a problem
"""
# check class
custom_module = CustomClassEvaluator.getModule(path)
custom_class = getattr(custom_module, 'CustomClassEvaluator', None)
if not custom_class:
raise RuntimeError(f'The python file {path} is missing the '
'CustomClassEvaluator class.')
if CustomClassEvaluator not in custom_class.__bases__:
raise RuntimeError(
'The CustomClassEvaluator class in the '
f'python file {path} must be a subclass of go.CustomClassEvaluator.'
)
# check units dict
CustomClassEvaluator.checkDict(path, 'UNITS_DICT')
# check run dict
CustomClassEvaluator.checkDict(path, 'RUN_DICT')
run_dict = custom_class.RUN_DICT
run_dict.pop('stub', None)
if not run_dict:
raise RuntimeError(
'The CustomClassEvaluator class in the '
f'python file {path} must define the RUN_DICT dictionary.')
if name and name not in run_dict:
raise RuntimeError(
'The RUN_DICT for the CustomClassEvaluator '
f'class in the python file {path} must map the property name '
f'{name} to a run method that calculates this property.')
for value in run_dict.values():
method = getattr(custom_class, value, None)
if not method:
raise RuntimeError(
'The CustomClassEvaluator class in the '
f'python file {path} must define the {value} method.')
if type(method).__name__ != 'function':
raise RuntimeError(
f'The {value} method defined for the CustomClassEvaluator '
f'class in the python file {path} must be a python function.'
)
# check extra input files dict
CustomClassEvaluator.checkDict(path, 'EXTRA_INPUT_FILES_DICT')
if name:
files = CustomClassEvaluator.getExtraInputFiles(path, name)
for afile in files:
if not os.path.exists(afile):
raise RuntimeError(
f'The extra input file {afile} can not be '
'found.')
def runStub(self, st, job_dj, tpp):
"""
Run stub.
:type st: schrodinger.structure.Structure
:param st: the structure
:type job_dj: JobDJ
:param job_dj: the queue
:type tpp: int
:param tpp: the threads per process
:raise RuntimeError: if there is a problem
:rtype: float
:return: the property value
"""
raise RuntimeError('Stub function should not be used.')
def runIt(self):
"""
Run it.
:raise RuntimeError: if there is a problem
"""
for aproperty in self.custom_properties:
run_method = self.RUN_DICT.get(aproperty.name, None)
if run_method:
run_method = getattr(self, run_method, None)
if not run_method:
raise RuntimeError(
'Use the RUN_DICT constant of this class to '
'define the name of the run method of this class for property '
f'{aproperty.name}, for example by defining a key string '
f'"{aproperty.name}" and a value string '
f'"run{aproperty.name.capitalize()}". Additionally method '
'by this name must be defined that runs the property '
'evaluation and returns the property.')
tpp = aproperty.class_kwargs.get(Jaguar.TPP, 1)
value = run_method(self.st, self.job_dj, tpp)
self.st.property[aproperty.key] = value
# yapf: disable
PROPERTIES = (
StructureEvaluator.SMARTS_PROP,
StructureEvaluator.MOL_WEIGHT_PROP,
StructureEvaluator.NATOMS_PROP,
StructureEvaluator.NELEMENTS_PROP,
OXIDATION_PROP,
REDUCTION_PROP,
HOLE_PROP,
ELECTRON_PROP,
TRIPLET_PROP,
TRIPLET_RMSD_PROP,
TRIPLET_REORG_PROP,
TADF_T1_PROP,
TADF_T2_PROP,
TADF_T3_PROP,
SPECTRUM_LMAX_PROP,
SPECTRUM_RAREA_PROP,
SPECTRUM_GAREA_PROP,
SPECTRUM_BAREA_PROP,
FL_EMISSION_EMAX_PROP,
FL_EMISSION_RAREA_PROP,
FL_EMISSION_GAREA_PROP,
FL_EMISSION_BAREA_PROP,
FL_EMISSION_STOKES_SHIFT_PROP,
GlassTransitionTemperature.PROP,
RefractiveIndex.PROP
)
# yapf: enable
def get_script_property_info_dict(host_str=None):
"""
Return a (name, PropertyInfo) dict for script based properties.
:type host_str: str
:param host_str: the host string, for example 'localhost:4'
:rtype: dict
:return: contains (name, PropertyInfo)
"""
host_str = ClassEvaluator.getHostStr(host_str=host_str)
class_kwargs = {ClassEvaluator.HOST_STR: host_str}
get_info = lambda x: PropertyInfo(key=x[0],
units=x[1],
is_positive=x[2],
class_evaluator=None,
class_kwargs=class_kwargs)
# yapf: disable
info_dict = dict([
(OXIDATION_PROP, get_info([msprops.OE_OXIDATION, EV, False])),
(REDUCTION_PROP, get_info([msprops.OE_REDUCTION, EV, False])),
(HOLE_PROP, get_info([msprops.OE_HOLE_REORG, EV, True])),
(ELECTRON_PROP, get_info([msprops.OE_ELECTRON_REORG, EV, True])),
(TRIPLET_PROP, get_info([msprops.OE_TRIPLET_ENERGY, EV, False])),
(TRIPLET_RMSD_PROP, get_info([msprops.OE_T1S0_RMSD, ANGSTROM, True])),
(TRIPLET_REORG_PROP, get_info([msprops.OE_TRIPLET_REORG, EV, True])),
(TADF_T1_PROP, get_info([msprops.OE_TADF_GAP, EV, False])),
(TADF_T2_PROP, get_info([msprops.OE_TADF_GAP2, EV, False])),
(TADF_T3_PROP, get_info([msprops.OE_TADF_GAP3, EV, False])),
(SPECTRUM_LMAX_PROP, get_info([msprops.OE_LMAX, NM, True])),
(SPECTRUM_RAREA_PROP, get_info([msprops.OE_RED_AREA, NM_X_INTENSITY, True])),
(SPECTRUM_GAREA_PROP, get_info([msprops.OE_GREEN_AREA, NM_X_INTENSITY, True])),
(SPECTRUM_BAREA_PROP, get_info([msprops.OE_BLUE_AREA, NM_X_INTENSITY, True])),
(FL_EMISSION_EMAX_PROP, get_info([msprops.OE_EMAX, NM, True])),
(FL_EMISSION_RAREA_PROP, get_info([msprops.OE_EMS_RED_AREA, NM_X_INTENSITY, True])),
(FL_EMISSION_GAREA_PROP, get_info([msprops.OE_EMS_GREEN_AREA, NM_X_INTENSITY, True])),
(FL_EMISSION_BAREA_PROP, get_info([msprops.OE_EMS_BLUE_AREA, NM_X_INTENSITY, True])),
(FL_EMISSION_STOKES_SHIFT_PROP, get_info([msprops.OE_STOKES_SHIFT, NM, False]))
])
# yapf: enable
return info_dict
def get_property_info(name,
jaguar_options=None,
tpp=None,
patterns=None,
host_str=None):
"""
Return a PropertyInfo for the given name and properties.
:type name: str
:param name: the property name
:type jaguar_options: dict
:param jaguar_options: contains Jaguar options
:type tpp: int
:param tpp: the threads per process
:type patterns: list
:param patterns: the SMARTS patterns
:type host_str: str
:param host_str: the host string, for example 'localhost:4'
:rtype: PropertyInfo or None
:return: the PropertyInfo
"""
se = StructureEvaluator
gtt = GlassTransitionTemperature
ri = RefractiveIndex
if name == se.SMARTS_PROP:
return se.getInfo(se.SMARTS_KEY,
se.NO_UNITS,
patterns=patterns,
host_str=host_str)
if name == se.MOL_WEIGHT_PROP:
return se.getInfo(se.MOL_WEIGHT_KEY,
se.MOL_WEIGHT_UNITS,
host_str=host_str)
if name == se.NATOMS_PROP:
return se.getInfo(se.NATOMS_KEY, se.NO_UNITS, host_str=host_str)
if name == se.NELEMENTS_PROP:
return se.getInfo(se.NELEMENTS_KEY, se.NO_UNITS, host_str=host_str)
if name == gtt.PROP:
return gtt.getInfo(host_str=host_str)
if name == ri.PROP:
return ri.getInfo(jaguar_options=jaguar_options,
tpp=tpp,
host_str=host_str)
return get_script_property_info_dict(host_str=host_str).get(name)
def get_random_csearch_seed(this_random=None):
"""
Return a random csearch seed.
:type this_random: numpy.random.RandomState or None
:param this_random: random state, if None use the module constant
:rtype: int
:return: the seed
"""
this_random = this_random or my_random
lb, ub = CONF_SEARCH_SEED_RANGE
return this_random.randint(lb, high=ub + 1)
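# illustrative usage with a reproducible state (the seed value 42 is
# arbitrary):
#     rng = numpy.random.RandomState(42)
#     csearch_seed = get_random_csearch_seed(this_random=rng)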
class PropertySyntaxError(Exception):
pass
class UnknownPropertySuboptionError(Exception):
pass
class Property:
"""
Manage a property to be used in a genetic optimization.
"""
MAX = 'max'
MIN = 'min'
EQUALS = 'eq'
GREATER_THAN = 'gt'
LESS_THAN = 'lt'
SUB_OPTIONS = [
'index', 'key', 'name', 'units', 'minimax', 'target', 'comparator',
'error', 'weight', 'positive', 'patterns', 'summarize',
CanvasKPLS.MODEL_OPTION, CustomClassEvaluator.CUSTOM_CLASS_FILE_OPTION,
'class_kwargs'
]
def __init__(self,
index=1,
key=None,
name=None,
units=None,
minimax=None,
target=None,
comparator=None,
error=None,
weight=1.0,
positive=None,
summarize=None,
class_kwargs=None):
"""
Create an instance.
:type index: int
:param index: a numeric index used to refer to this Property instance,
a default of 1 is used
:type key: str
:param key: the schrodinger.structure.Structure property key to be
optimized
:type name: str
:param name: specify a name for the property, this name will be,
for example used in any `*log` files, etc.
:type units: str
:param units: enter the units that the property is in, for
example eV, nm, etc.
:type minimax: str
:param minimax: to minimize or maximize this property then set this
option to the class constants MIN or MAX
:type target: float
:param target: if instead of maximizing or minimizing the
property, the genetic optimization is supposed to handle a specific
value then enter that value using this option.
:type comparator: str
:param comparator: specify here how the target value and
computed values are to be compared, i.e. either the class constants
EQUALS for =, GREATER_THAN for >, or LESS_THAN for <.
:type error: float
:param error: if equality to a target value has been specified
then this option allows the user to control the error bounds of
the target value, if not specified then a default of 10% of the
specified target value will be used.
:type weight: float
:param weight: specify the weight to use for this property, if
the genetic optimization is to be run on several properties then
the weight allows the user to bias the solution. This option can
also be used to control a situation where more than a single
property is desired and where those properties are quantified using
different physical units such that the numbers might be orders of
magnitude apart from one another, for example comparing eV and nm.
A default of 1.0 is used.
:type positive: bool
:param positive: True if this property can only take on positive
values, for example as in the area of a surface, False otherwise,
for example as in temperature in Celsius. The default is False.
:type summarize: bool
:param summarize: if True then print a summary of this property, False
otherwise
:type class_kwargs: dict or None
:param class_kwargs: contains kwargs for class based evaluation of
this property
"""
self.setAttributes(index, key, name, units, minimax, target, comparator,
error, weight, positive, summarize, class_kwargs)
def setAttributes(self,
index=1,
key=None,
name=None,
units=None,
minimax=None,
target=None,
comparator=None,
error=None,
weight=1.0,
positive=None,
summarize=None,
class_kwargs=None):
"""
Set some attributes for this class.
:type index: int
:param index: a numeric index used to refer to this Property instance,
a default of 1 is used
:type key: str
:param key: the schrodinger.structure.Structure property key to be
optimized
:type name: str
:param name: specify a name for the property, this name will be,
for example used in any `*log` files, etc.
:type units: str
:param units: enter the units that the property is in, for
example eV, nm, etc.
:type minimax: str
:param minimax: to minimize or maximize this property then set this
option to the class constants MIN or MAX
:type target: float
:param target: if instead of maximizing or minimizing the
property, the genetic optimization is supposed to handle a specific
value then enter that value using this option.
:type comparator: str
:param comparator: specify here how the target value and
computed values are to be compared, i.e. either the class constants
EQUALS for =, GREATER_THAN for >, or LESS_THAN for <.
:type error: float
:param error: if equality to a target value has been specified
then this option allows the user to control the error bounds of
the target value, if not specified then a default of 10% of the
specified target value will be used.
:type weight: float
:param weight: specify the weight to use for this property, if
the genetic optimization is to be run on several properties then
the weight allows the user to bias the solution. This option can
also be used to control a situation where more than a single
property is desired and where those properties are quantified using
different physical units such that the numbers might be orders of
magnitude apart from one another, for example comparing eV and nm.
A default of 1.0 is used.
:type positive: bool
:param positive: True if this property can only take on positive
values, for example as in the area of a surface, False otherwise,
for example as in temperature in Celsius. The default is False.
:type summarize: bool
:param summarize: if True then print a summary of this property, False
otherwise
:type class_kwargs: dict or None
:param class_kwargs: contains kwargs for class based evaluation of
this property
"""
# make sure that we leave with a bool but handle 'False' case
def handle_boolean(prop):
try:
prop = ast.literal_eval(prop)
except ValueError:
pass
return bool(prop)
# set some attrs, handle units
self.index = index
self.key = key
self.name = name
if units == 'None':
self.units = None
else:
self.units = units
self.minimax = minimax
self.target = target
self.comparator = comparator
self.weight = weight
self.positive = handle_boolean(positive)
# if the user has specified equality to a specific target value but
# has not specified any error bounds then use +/-10% of the target
# value as the error bounds
if self.target is not None and self.comparator == self.EQUALS and error is None:
self.error = 0.10 * self.target
else:
self.error = error
# by default summarize is off for structure properties and on
# for other properties
if summarize is None:
if self.name in StructureEvaluator.PROPERTIES:
summarize = False
else:
summarize = True
self.summarize = handle_boolean(summarize)
self.class_kwargs = class_kwargs or {}
def setClassKwargs(self, class_kwargs):
"""
Set the class kwargs.
:type class_kwargs: dict or None
:param class_kwargs: contains kwargs for class based evaluation of
this property
"""
self.class_kwargs = class_kwargs
def parsePropertyString(self, property_string):
"""
Parse the attributes of this class from a string representation of
the property specifications. For example, 'index=1
key=r_matsci_Reduction_Potential_(eV) name=reduction units=eV
target=1.28 comparator=eq error=0.05 weight=0.5' or 'index=2
key=r_matsci_Oxidation_Potential_(eV) name=oxidation units=eV
minimax=max weight=2.5'
:type property_string: str
:param property_string: the string representation of the property
specifications
:raise PropertySyntaxError: if there is something wrong
with the property syntax
:raise UnknownPropertySuboptionError: if an unknown property suboption
is found
"""
BAD_SYNTAX_MSG = (
'A property must be specified using whitespace '
'delimited <key>=<value> pairs, where <key> and <value> are '
'void of whitespace. The following property has incorrect syntax: %s.'
)
NO_VALID_SUBOPTION_MSG = ('You have specified the following invalid '
'property sub-option: %s.')
# build kwargs
kwargs = {}
for option in property_string.split():
try:
key, value = option.split('=')
except ValueError:
raise PropertySyntaxError(BAD_SYNTAX_MSG % property_string)
if key not in self.SUB_OPTIONS:
raise UnknownPropertySuboptionError(NO_VALID_SUBOPTION_MSG %
key)
# see MATSCI-4321 where numeric names weren't allowed,
# i.e. 'name=4'
if key != 'name':
try:
value = float(value)
except (ValueError, TypeError):
pass
kwargs[key] = value
# coming out of the previous loop all numeric values are floats but some
# values are supposed to be integers so change them back
kwargs['index'] = int(kwargs['index'])
self.setAttributes(**kwargs)
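# illustrative usage, reusing the second docstring example above:
#     prop = Property()
#     prop.parsePropertyString(
#         'index=2 key=r_matsci_Oxidation_Potential_(eV) name=oxidation '
#         'units=eV minimax=max weight=2.5')
#     prop.checkProperty()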
def checkProperty(self):
"""
Check this property instance.
"""
NO_VALID_INDEX_MSG = (
'A valid integer index is needed to create a '
'Property instance. Please specify such an index using '
'\'index=<index>\'.')
NO_VALID_KEY_MSG = (
'A valid string Maestro structure property key is '
'needed to create a Property instance. Please specify such a key '
'using \'key=<key>\'.')
NO_VALID_NAME_MSG = (
'A valid string property name is needed to create a '
'Property instance. Please specify such a name using '
'\'name=<name>\'.')
INVALID_UNITS_MSG = ('If units are desired then they must be given as a '
'string. Please specify such units using \'units=<units>\'.')
NO_VALID_CONDITION_MSG = (
'A valid minimax or target value condition '
'is needed to create a Property instance. Please specify such '
'options using \'minimax=<minimax>\' or \'target=<target>\'.')
INVALID_MINIMAX_MSG = (
'If a minimax condition is desired then it must '
'be specified as either %s or %s. If not specified then a target '
'condition must be specified using \'target=<target>\'.' %
(self.MIN, self.MAX))
MULTI_CONDITION_MSG = (
'A property can only be specified using either a '
'minimax or target value condition. Please specify a single condition '
'using either \'minimax=<minimax>\' or \'target=<target>\'.')
INVALID_TARGET_MSG = (
'If specified a target value must be a number. If '
'not specified then a minimax condition must be specified using '
'\'minimax=<minimax>\'.')
INVALID_COMPARATOR_MSG = ('If a comparator is necessary then it must be '
'either %s, %s, or %s. Please specify such a comparator using '
'\'comparator=<comparator>\'.' % (self.EQUALS, self.GREATER_THAN, \
self.LESS_THAN))
REQUIRED_COMPARATOR_MSG = (
'When specifying a target value condition a '
'valid comparator from the following list is required: %s, %s, or %s. '
'Please specify such a comparator using \'comparator=<comparator>\'.'
% (self.EQUALS, self.GREATER_THAN, self.LESS_THAN))
INVALID_ERROR_MSG = (
'If you need to specify an error bound then it must '
'be a number.')
NO_VALID_WEIGHT_MSG = (
'A valid numeric weight >0 is needed to create a '
'Property instance.')
INVALID_SMARTS_MSG = ('The following SMARTS pattern is invalid, %s.')
# we must have a valid index
if type(self.index) != int:
raise ValueError(NO_VALID_INDEX_MSG)
# we must have a valid key
if type(self.key) != str:
raise ValueError(NO_VALID_KEY_MSG)
# we must have a valid name
if type(self.name) != str:
raise ValueError(NO_VALID_NAME_MSG)
# we must have a valid units
if self.units is not None:
if type(self.units) != str:
raise ValueError(INVALID_UNITS_MSG)
# we must have either a minimax or target condition
if self.minimax is None and self.target is None:
raise ValueError(NO_VALID_CONDITION_MSG)
# minimax condition must be valid
if self.minimax not in [None, self.MIN, self.MAX]:
raise ValueError(INVALID_MINIMAX_MSG)
# we must have a single condition to meet
if self.minimax is not None and self.target is not None:
raise ValueError(MULTI_CONDITION_MSG)
# turn off comparator and error if minimax is set and target
# is not
if self.minimax is not None and self.target is None:
self.comparator = None
self.error = None
# we must have a valid target
if self.target is not None:
try:
self.target = float(self.target)
except ValueError:
raise ValueError(INVALID_TARGET_MSG)
# comparator must be valid
if self.comparator not in [None, self.EQUALS, self.GREATER_THAN, \
self.LESS_THAN]:
raise ValueError(INVALID_COMPARATOR_MSG)
# if target has been specified then comparator must also be
if self.target is not None and self.comparator is None:
raise ValueError(REQUIRED_COMPARATOR_MSG)
# we must have a valid error
if self.error is not None:
try:
self.error = float(self.error)
except ValueError:
raise ValueError(INVALID_ERROR_MSG)
# if greater-than or less-than a target value has been specified
# then turn off the error
if self.target is not None and self.comparator in \
[self.GREATER_THAN, self.LESS_THAN]:
self.error = None
# we must have a valid weight
try:
self.weight = float(self.weight)
assert self.weight > 0
except (ValueError, TypeError, AssertionError):
raise ValueError(NO_VALID_WEIGHT_MSG)
# we must have valid SMARTS
if self.class_kwargs:
patterns = self.class_kwargs.get(StructureEvaluator.PATTERNS, [])
for pattern in patterns:
valid, msg = analyze.validate_smarts_canvas(pattern)
if not valid:
raise ValueError(INVALID_SMARTS_MSG % pattern)
def __repr__(self):
"""
Define a representation of the class.
"""
# yapf: disable
kwargs = dict([
('index', str(self.index)),
('key', str(self.key)),
('name', str(self.name)),
('units', str(self.units)),
('minimax', str(self.minimax)),
('target', '%.2f' % self.target if self.target is not None else str(None)),
('comparator', str(self.comparator)),
('error', '%.2f' % self.error if self.error is not None else str(None)),
('weight', '%.2f' % self.weight),
('positive', str(self.positive)),
('summarize', str(self.summarize)),
('class_kwargs', str(self.class_kwargs))
])
# yapf: enable
return ' '.join([key + '=' + value for key, value in kwargs.items()])
def __eq__(self, other_property):
"""
Define an equality of the class.
"""
if self.name == other_property.name == StructureEvaluator.SMARTS_PROP:
patterns = self.class_kwargs.get(StructureEvaluator.PATTERNS, [])
other_patterns = other_property.class_kwargs.get(
StructureEvaluator.PATTERNS, [])
return patterns == other_patterns
else:
return self.name == other_property.name
def isScriptProperty(self):
"""
Return True if this property is a script property, False otherwise.
:rtype: bool
:return: return True if this property is a script property,
False otherwise
"""
return (len(self.class_kwargs) == 1 and
self.name not in StructureEvaluator.PROPERTIES)
@staticmethod
def getPropertyStrings(property_lists):
"""
Return property strings from the given property lists.
:type property_lists: list
:param property_lists: contains lists of property specifications
:rtype: list
:return: contains string representations of the property
specifications
"""
property_strings = []
for property_list in property_lists:
property_string = ' '.join(property_list)
property_string = re.sub(r'\s=', '=', property_string)
property_string = re.sub(r'=\s', '=', property_string)
property_strings.append(property_string)
return property_strings
@staticmethod
def getRelPath(file_name):
"""
Return the relative path to the given file name.
:type file_name: str
:param file_name: the file name
:rtype: str
:return: the relative path to the file
"""
# some input files are only located in the top level job
# directory but are used by all subjobs which are located in their own
# subdirectories, this function returns the path to the shared input
# file relative to the subdirectories
# MATSCI-5053: ntpath covers both Linux and Windows path
# When Linux remote host receives abs path from Windows localhost,
# os.path.basename('C:\\Users\\test3.kpls.tar.gz') gives
# 'C:\\Users\\test3.kpls.tar.gz' instead of 'test3.kpls.tar.gz'
path = EVALUATOR_RELATIVE_DIR + [ntpath.basename(file_name)]
return os.path.join(*path)
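# illustrative result on a POSIX host, using the Windows path from the
# comment above:
#     Property.getRelPath('C:\\Users\\test3.kpls.tar.gz')
#     -> './../../../test3.kpls.tar.gz'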
@staticmethod
def getKwargs(property_string, option_substrings, add_relative_paths=None):
"""
Return kwargs of the given property options from the given property
string.
:type property_string: str
:param property_string: the string representation of the property
specifications, containing options as '<option_substring>=<value>'
:type option_substrings: list or str
:param option_substrings: contains the option substrings for the
needed values, a single occurrence or list of occurrences may be passed
:type add_relative_paths: list
:param add_relative_paths: contains options for which relative paths
should be added, such relative paths might be needed for correctly
parallelizing the evaluation stage of the genetic optimization as
they will be needed to copy otherwise shared files into local
subdirectories
:rtype: dict, str, or None
:return: the extracted dictionary of kwargs or single kwarg depending
on the input option_substrings or None if nothing is found
"""
if add_relative_paths is None:
add_relative_paths = []
if isinstance(option_substrings, list):
f_option_substrings = list(option_substrings)
else:
f_option_substrings = [option_substrings]
kwargs = {}
for option_substring in f_option_substrings:
splitter = option_substring.rstrip('=') + '='
if splitter not in property_string:
continue
value = property_string.split(splitter)[1]
if not value:
continue
value = value.split()[0]
if option_substring in add_relative_paths:
value = Property.getRelPath(value)
kwargs[splitter[:-1]] = value
if not kwargs:
return
elif isinstance(option_substrings, list):
return kwargs
else:
return list(kwargs.values())[0]
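# A minimal usage sketch (the property name and option values are hypothetical,
# chosen only to mirror the '<option_substring>=<value>' format documented
# above):
#
#     prop_str = 'gap target=2.0 units=eV'
#     Property.getKwargs(prop_str, 'units')              # -> 'eV'
#     Property.getKwargs(prop_str, ['target', 'units'])  # -> {'target': '2.0',
#                                                        #     'units': 'eV'}
#     Property.getKwargs(prop_str, 'missing')            # -> None
#
# When an option is listed in add_relative_paths its value is additionally
# rewritten with getRelPath so that subjobs running in their own
# subdirectories can still locate the shared file.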
@staticmethod
def rmKwargs(property_string, option_substrings):
"""
Return a copy of the given property string with all of the
given property option substrings removed.
:type property_string: str
:param property_string: the string representation of the property
specifications, containing options as '<option_substring>=<value>'
:type option_substrings: list
:param option_substrings: contains the option substrings to be
removed
:rtype: str
:return: the string representation of the property
specifications less the options substrings that were to be removed
"""
kwargs = Property.getKwargs(property_string, option_substrings)
if kwargs is not None:
for pair in kwargs.items():
token = ' ' + '='.join(pair)
if token not in property_string:
token = token[1:]
property_string = property_string.replace(token, '')
return property_string
@staticmethod
def addKwargs(property_string, kwargs):
"""
Add the given options to the given property string.
:type property_string: str
:param property_string: the string representation of the property
specifications, containing options as '<option_substring>=<value>'
:type kwargs: dict
:param kwargs: key-value option pairs to add to the property string
:rtype: str
:return: the string representation of the property
specifications containing the new options
"""
f_property_string = property_string
for pair in list(kwargs.items()):
f_property_string += ' ' + '='.join(pair)
return f_property_string
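# Round-trip sketch of addKwargs and rmKwargs (hypothetical values):
#
#     prop_str = Property.addKwargs('gap target=2.0', {'units': 'eV'})
#     # -> 'gap target=2.0 units=eV'
#     Property.rmKwargs(prop_str, ['units'])
#     # -> 'gap target=2.0'
#
# Note that rmKwargs expects a list of option substrings because it iterates
# over the dictionary returned by getKwargs.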
@staticmethod
def getCustomInfo(property_string, name, tpp=1, host_str=None):
"""
Return a PropertyInfo.
:type property_string: str
:param property_string: the string representation of the property
specifications, containing options as '<option_substring>=<value>'
:type name: str
:param name: the property name
:type tpp: int
:param tpp: the threads per process
:type host_str: str
:param host_str: the host string, for example 'localhost:4'
:raise RuntimeError: if there is a problem
:rtype: PropertyInfo
:return: the property information
"""
units = Property.getKwargs(property_string, 'units')
pairs = [(CanvasKPLS, 'MODEL_OPTION'), (AutoQSAR, 'MODEL_OPTION'),
(DeepChem, 'MODEL_OPTION'),
(CustomClassEvaluator, 'CUSTOM_CLASS_FILE_OPTION')]
infos = []
for cls, option_attr in pairs:
option = getattr(cls, option_attr)
if option_attr == 'MODEL_OPTION':
add_relative_paths = [option]
else:
add_relative_paths = None
afile = Property.getKwargs(property_string,
option,
add_relative_paths=add_relative_paths)
if not afile:
continue
infos.append(
cls.getInfo(name, units, afile, tpp=tpp, host_str=host_str))
if len(infos) > 1:
options_str = ','.join(
getattr(cls, option_attr) for cls, option_attr in pairs)
raise RuntimeError('The following sub-options can not be used '
f'simultaneously: {options_str}.')
elif not infos:
raise RuntimeError(f'The property name {name} is unknown.')
return infos[0]
def set_title_to_stoichiometry(astructure,
toappend=None,
separation=STOICH_BASE_EXT_SEP):
"""
Set the structure title to be the stoichiometry of the structure.
:type astructure: schrodinger.structure.Structure
:param astructure: the structure
:type toappend: str
:param toappend: a string to append to the stoichiometry
:type separation: str
:param separation: used to separate the stoichiometry and the
toappend str
"""
formula = analyze.generate_molecular_formula(astructure)
if toappend:
formula += separation + toappend
astructure.title = formula
astructure.property[ENTRY_NAME_KEY] = astructure.title
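# For example (hypothetical), an ethanol structure would be retitled to its
# molecular formula, e.g. 'C2H6O', and with toappend given the title becomes
# the formula followed by the separation string (STOICH_BASE_EXT_SEP by
# default) and the appended text; the same string is also stored under
# ENTRY_NAME_KEY so the Maestro entry name tracks the title.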
class StructureGenome(GenomeBase.GenomeBase):
"""
Manage a genome. The genome, aka chromosome, is the solution to the
problem trying to be solved via genetic optimization. It is referred
to as being composed of genes that are manipulated by the crossover
and mutation operators. In our genetic optimization module this genome
is basically just a schrodinger.structure.Structure object.
"""
def __init__(self):
"""
Create an instance.
"""
GenomeBase.GenomeBase.__init__(self)
# use attributes of genome for parameters that are unique
# to this genome and use the genome.setParams() method for
# parameters that are shared by all genomes in the population
self.astructure = None
self.basename_ext = None
self.seed = None
self.skip = None
self.failure = None
self.infile = None
self.outfile = None
def copy(self, genome):
"""
Copy the current genome to the provided genome.
:type genome: StructureGenome
:param genome: a new genome instance to which to copy the
current genome
"""
GenomeBase.GenomeBase.copy(self, genome)
if self.astructure:
genome.astructure = self.astructure.copy()
# use a unique random extension for the job sub-directory of
# this genome to avoid a threading race condition with regard
# to creating such sub-directories when doing the evaluation
# step in the genetic optimization
chars = list(
my_random.choice(list(string.ascii_letters + string.digits),
size=6,
replace=False))
genome.basename_ext = ''.join(chars)
genome.seed = my_random.randint(RANDOM_INT_BOUND + 1)
genome.skip = None
genome.failure = None
genome.infile = None
genome.outfile = None
def clone(self):
"""
Clone the current genome.
:rtype: StructureGenome
:return: genome
"""
genome = StructureGenome()
self.copy(genome)
return genome
def updateStructureProperties(self, index, generation):
"""
Update some structure properties.
:type index: int
:param index: the index of this individual
:type generation: int
:param generation: this generation
"""
self.astructure.property[FIT_SCORE_KEY] = self.getFitnessScore()
self.astructure.property[RAW_SCORE_KEY] = self.getRawScore()
self.astructure.property[GENERATION_KEY] = generation
self.astructure.property[INDIVIDUAL_KEY] = index
self.astructure.property[SKIP_KEY] = bool(self.skip)
self.astructure.property[FAILURE_KEY] = bool(self.failure)
def resetParentProperties(self):
"""
Reset the crossover and mutation parent structure properties.
"""
self.astructure.property[CROSSOVER_PARENTS_KEY] = ''
self.astructure.property[MUTATION_PARENT_KEY] = ''
self.astructure.property[CROSSOVER_APPLIED_KEY] = ''
self.astructure.property[MUTATION_APPLIED_KEY] = ''
self.astructure.property[SKIP_KEY] = False
self.astructure.property[FAILURE_KEY] = False
self.astructure.property.pop(CONF_SEARCH_FAILED_KEY, None)
self.astructure.property.pop(FROM_FREEZER_KEY, None)
self.astructure.property.pop(INOCULATE_KEY, None)
def removeProperties(self):
"""
Remove some structure properties.
"""
props_to_remove = self.getParam('props_to_remove')
if props_to_remove:
for prop_to_remove in props_to_remove:
self.astructure.property.pop(prop_to_remove, None)
self.astructure.property.pop(STRUCTURE_SCORE_KEY, None)
# MATSCI-2893 - remove EZ stereochemistry properties
mm.mmct_ct_clear_stereo(self.astructure)
def optimizeGeometry(self):
"""
Optimize the geometry of this genome's structure using OPLS.
"""
if not self.getParam('no_minimize'):
rxn_channel.minimize_geometry(self.astructure)
def addPreviousFreezerFile(self, freezer_file):
"""
Add the given file to the list of previous freezer files.
:type freezer_file: str
:param freezer_file: the name of the file to be added
"""
# the dictionary accessed with an individual's setParams method
# is shared by all individuals in the population and so it would
# seem that you only need to set it for one individual and it would
# be set for all, this is the behavior when run in serial,
# however, the other individuals appear to not be updated
# in this way when running in parallel, so in general we will set it
# for all individuals (done in the loop that calls this method)
freezer_files = self.getParam('freezer_files')
if freezer_file not in freezer_files[PREVIOUS]:
freezer_files[PREVIOUS].append(freezer_file)
self.setParams(freezer_files=freezer_files)
def evaluate(self, **args):
"""
Evaluate the score of this individual.
:type args: dict
:param args: dictionary of genetic optimization parameters
created and used by pyevolve
"""
# this is to properly catch exceptions generated by
# child processes
try:
GenomeBase.GenomeBase.evaluate(self, **args)
except Exception as msg:
traceback.print_exc()
raise msg
# initializers
def from_initial_population(genome, **args):
"""
Draw a unique genome from the initial population.
:type genome: StructureGenome
:param genome: a genome
:type args: dict
:param args: dictionary of genetic optimization parameters
created and used by pyevolve
"""
initial_population = genome.getParam('initial_population')
genome.astructure = initial_population.pop()
set_title_to_stoichiometry(genome.astructure, toappend=genome.basename_ext)
genome.resetParentProperties()
genome.removeProperties()
genome.setParams(initial_population=initial_population)
def get_num_simple_bonds(astructure):
"""
Return the number of simple bonds in the provided structure.
The definition of a simple bond follows from that used in the
reaction channel module and is an acyclic single order bond
that may involve a hydrogen atom.
:type astructure: schrodinger.structure.Structure
:param astructure: the structure for which to get the number of
simple bonds
:rtype: int
:return: the number of simple bonds
"""
num_simple_bonds = 0
for bond in astructure.bond:
if not any(rxn_channel.CheckInput().checkIfNotSimpleBond(
astructure, bond.atom1.index, bond.atom2.index, False)):
num_simple_bonds += 1
return num_simple_bonds
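# Illustration based on the definition above: for ethane all seven bonds (the
# C-C bond plus six C-H bonds) are acyclic single-order bonds, so this function
# would return 7, whereas for benzene only the six C-H bonds count because the
# ring bonds are cyclic.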
def combine_two_structures(astructure, bstructure, offset=10.0):
"""
Combine two structure objects into a single structure object using
somewhat arbitrary placement.
:type astructure: schrodinger.structure.Structure
:param astructure: the first of the structures to be combined
:type bstructure: schrodinger.structure.Structure
:param bstructure: the second of the structures to be combined
:type offset: float
:param offset: the final distance between the structures will be
the sum of the molecular VDW radii plus this offset in Angstrom
:rtype: schrodinger.structure.Structure
:return: the combined structure object
"""
THRESH = 0.1
# get a target distance used to separate the two structures
aatom1, aatom2, adistance = rxn_path.max_pair_vdw_distance(astructure)
batom1, batom2, bdistance = rxn_path.max_pair_vdw_distance(bstructure)
target_distance = adistance / 2 + bdistance / 2 + offset
# get some parameters
acentroid = transform.get_centroid(astructure)
bcentroid = transform.get_centroid(bstructure)
atobvector = bcentroid - acentroid
current_distance = transform.get_vector_magnitude(atobvector)
# if the two structures are on top of each other then use the
# x-axis as the vector along which to separate them
if current_distance > THRESH:
atobunitvector = transform.get_normalized_vector(atobvector)
else:
atobunitvector = numpy.append(transform.X_AXIS, 0.0)
# translate the second structure
delta = target_distance - current_distance
delta_vector = delta * atobunitvector
delta_vector = numpy.delete(delta_vector, 3)
delta_x = numpy.dot(delta_vector, transform.X_AXIS)
delta_y = numpy.dot(delta_vector, transform.Y_AXIS)
delta_z = numpy.dot(delta_vector, transform.Z_AXIS)
transform.translate_structure(bstructure, delta_x, delta_y, delta_z)
# combine the structures
cstructure = astructure.copy()
cstructure.extend(bstructure)
return cstructure
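# A minimal usage sketch (file names are hypothetical):
#
#     st_a = structure.Structure.read('mol_a.mae')
#     st_b = structure.Structure.read('mol_b.mae')
#     combo = combine_two_structures(st_a, st_b, offset=10.0)
#     assert combo.atom_total == st_a.atom_total + st_b.atom_total
#
# The second structure is translated along the centroid-to-centroid vector (or
# the x-axis if the centroids coincide) so that the centroid separation equals
# half of each structure's maximum pair VDW distance plus the offset.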
# crossovers
def bond_crossover(genome, **args):
"""
Perform a crossover operation by swapping molecular fragments at
two randomly chosen bonds, i.e. a double displacement reaction
channel.
:type genome: StructureGenome
:param genome: a genome
:type args: dict
:param args: dictionary of genetic optimization parameters
created and used by pyevolve
:rtype: tuple
:return: tuple containing the sister and brother StructureGenome
"""
# get, clone, and reset mom and dad genomes into brother and
# sister genomes
gmom = args['mom']
gdad = args['dad']
gsister = gmom.clone()
gbrother = gdad.clone()
gsister.resetStats()
gbrother.resetStats()
gsister.removeProperties()
gbrother.removeProperties()
# get parents string
gmom_title = gmom.astructure.title
gdad_title = gdad.astructure.title
parents = gmom_title + ' + ' + gdad_title
parents_basename_ext = gmom.basename_ext + '.' + gdad.basename_ext
# get some parameters
no_minimization = gsister.getParam('no_minimize')
freezer_files = gsister.getParam('freezer_files')
inoculate = gsister.getParam('inoculate')
# prep for reaction channel by combining the structures
combo = combine_two_structures(gsister.astructure, gbrother.astructure)
# first try to find a single unique random double displacement channel that
# does not involve hydrogen, if this is not possible (for example consider
# A-A + A-A or A-A + A-B, i.e. ethane + ethane or ethane + methanol) then
# try to find a single unique random double displacement channel that may
# involve hydrogen, note that here it is possible that the dihydrogen
# molecule may be produced as a child (for example consider the case where
# both bonds involve hydrogen, i.e. A-H + B-H), if even with enabling
# hydrogens we still can't get a unique channel, for example with
# parents like H-H + H-H or A-H + H-H, then for this worst case scenario
# we either just return brother and sister the same as mom and dad, with
# the understanding that a different genetic operator will make them
# amenable to bond crossover (each structure in the initial population is
guaranteed to be a candidate for at least one of the specified genetic
# operators) or we draw children from the freezer depending on inoculate
no_reactive_h = True
channel_defs = rxn_channel.ChannelDefinitions()
random_seed = my_random.randint(RANDOM_INT_BOUND + 1)
channel_defs.addRandomChannels(combo,
num_random_channels=1,
random_types=['double_displacement'],
no_reactive_h=no_reactive_h,
random_seed=random_seed,
unique=True,
no_reactive_polymer_monomer_backbone=False,
no_reactive_polymer_monomer_sidechain=False)
if not channel_defs.definitions:
no_reactive_h = False
channel_defs = rxn_channel.ChannelDefinitions()
random_seed = my_random.randint(RANDOM_INT_BOUND + 1)
channel_defs.addRandomChannels(
combo,
num_random_channels=1,
random_types=['double_displacement'],
no_reactive_h=no_reactive_h,
random_seed=random_seed,
unique=True,
no_reactive_polymer_monomer_backbone=False,
no_reactive_polymer_monomer_sidechain=False)
if not channel_defs.definitions:
if NO_CHILD in inoculate:
gsister.astructure = get_freezer_structure(
freezer_files,
inoculate=NO_CHILD,
crossover_applied=BOND_CROSSOVER,
basename_ext=gsister.basename_ext)
gbrother.astructure = get_freezer_structure(
freezer_files,
inoculate=NO_CHILD,
crossover_applied=BOND_CROSSOVER,
basename_ext=gbrother.basename_ext)
return (gsister, gbrother)
# run the reaction channel module
mae_out_file = 'crossover' + '.' + parents_basename_ext + '.mae'
channels = rxn_channel.Channels(combo,
channel_defs,
mae_out_file=mae_out_file,
isolate_products=True,
no_minimization=no_minimization,
no_reactive_h=no_reactive_h,
unique=True)
channels.orchestrate()
# in the general case the 5 returned structures are (1) reactant, combination of
# two structures i.e. A-B + C-D, (2) A-C, (3) B-D, (4) A-D, and (5) B-C however
# we may also have only a single sub-channel, i.e. 3 structures
structures = [
astructure for astructure in structure.StructureReader(mae_out_file)
]
structures = structures[1:]
fileutils.force_remove(mae_out_file)
# handle the cases of A-A + B-B (1 unique product),
# A-B + C-C (2 unique products), and A-B + C-D
# (4 unique products), note that it is also possible to get
# 3 unique products here for example when the A-C and B-D
# or A-D and B-C products have the same SMILES
if len(structures) == 1:
mol_one = structures[0]
if NO_CHILD in inoculate:
mol_two = get_freezer_structure(freezer_files,
inoculate=NO_CHILD,
crossover_applied=BOND_CROSSOVER,
basename_ext=gbrother.basename_ext)
else:
mol_two = mol_one.copy()
elif len(structures) == 2:
mol_one, mol_two = structures
elif len(structures) == 3:
# of the three we want to pick the two that came from the same reaction
# sub-channel
first_rxn = structures[0].property[RXN_KEY]
second_rxn = structures[1].property[RXN_KEY]
if first_rxn == second_rxn:
mol_one = structures[0]
mol_two = structures[1]
else:
mol_one = structures[1]
mol_two = structures[2]
elif len(structures) == 4:
mol_ac, mol_bd, mol_ad, mol_bc = structures
# if both reactive bonds were in side chains then prevent using
# the channel that features a product that is not a monomer, otherwise
# choose between the A-C + B-D and A-D + B-C products by minimizing
# the standard deviation in the size, do this with the specified
# probability using a biased coin flip
acbd_are_monomers = rxn_channel.are_monomers([mol_ac, mol_bd])
adbc_are_monomers = rxn_channel.are_monomers([mol_ad, mol_bc])
if acbd_are_monomers and not adbc_are_monomers:
mol_one, mol_two = mol_ac, mol_bd
elif not acbd_are_monomers and adbc_are_monomers:
mol_one, mol_two = mol_ad, mol_bc
else:
first_rxn_std = numpy.std(
numpy.array([mol_ac.atom_total, mol_bd.atom_total]))
second_rxn_std = numpy.std(
numpy.array([mol_ad.atom_total, mol_bc.atom_total]))
if first_rxn_std <= second_rxn_std:
small_stdev = mol_ac, mol_bd
large_stdev = mol_ad, mol_bc
else:
small_stdev = mol_ad, mol_bc
large_stdev = mol_ac, mol_bd
if Util.randomFlipCoin(SIMILAR_STDEV_CHILDREN_FREQ):
mol_one, mol_two = small_stdev
else:
mol_one, mol_two = large_stdev
# the following can happen for example if a polymer monomer
# reacts with itself by just swapping equivalent side chains,
# in this case one channel produces products that are equivalent
# to the reactants and thus are deduped while the other channel
# produces one product that functions as a monomer and one product
# that is not a monomer
mol_one_is_monomer = rxn_channel.are_monomers([mol_one])
mol_two_is_monomer = rxn_channel.are_monomers([mol_two])
if (mol_one_is_monomer and not mol_two_is_monomer) or \
(not mol_one_is_monomer and mol_two_is_monomer):
if NO_CHILD in inoculate:
mol_one = get_freezer_structure(freezer_files,
inoculate=NO_CHILD,
crossover_applied=BOND_CROSSOVER,
basename_ext=gsister.basename_ext)
mol_two = get_freezer_structure(freezer_files,
inoculate=NO_CHILD,
crossover_applied=BOND_CROSSOVER,
basename_ext=gbrother.basename_ext)
else:
mol_one = gsister.astructure.copy()
mol_two = gbrother.astructure.copy()
gsister.astructure = mol_one
gbrother.astructure = mol_two
# update properties
set_title_to_stoichiometry(gsister.astructure,
toappend=gsister.basename_ext)
set_title_to_stoichiometry(gbrother.astructure,
toappend=gbrother.basename_ext)
# clean up
gsister.astructure.property.pop(RXN_KEY, None)
gbrother.astructure.property.pop(RXN_KEY, None)
# set parents string
gsister.astructure.property[CROSSOVER_APPLIED_KEY] = BOND_CROSSOVER
if not gsister.astructure.property.get(FROM_FREEZER_KEY):
gsister.astructure.property[CROSSOVER_PARENTS_KEY] = parents
gbrother.astructure.property[CROSSOVER_APPLIED_KEY] = BOND_CROSSOVER
if not gbrother.astructure.property.get(FROM_FREEZER_KEY):
gbrother.astructure.property[CROSSOVER_PARENTS_KEY] = parents
return (gsister, gbrother)
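# Worked illustration of the four-product branch above (the parents and the
# randomly chosen reactive bonds are hypothetical): with mom = methanol
# reacting at its C-O bond (A = CH3, B = OH) and dad = ethylamine reacting at
# its C-N bond (C = CH3CH2, D = NH2), the two sub-channels give
# A-C + B-D = propane + hydroxylamine (11 and 5 atoms, std = 3.0) and
# A-D + B-C = methylamine + ethanol (7 and 9 atoms, std = 1.0), so the second
# pair is the small standard deviation pair and is chosen with probability
# SIMILAR_STDEV_CHILDREN_FREQ by the biased coin flip.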
def get_element_mutator_dict(astructure):
"""
Return a dictionary where the keys contain the indices of the
mutatable atoms and the values contain those elements that the
keyed atom may be mutated to.
:type astructure: schrodinger.structure.Structure
:param astructure: the structure to be mutated
:rtype: dict
:return: keys are atom indices of those atoms that are mutatable
and values are those elements that the atom can be mutated to
"""
# group hydrogen with the halogens, the following is not meant to
# be exhaustive, keys are SMARTS patterns where the mutatable atom
# is first and values are lists containing the elements that the
# mutatable atom may be mutated to, currently the key-value pairs
# may be in any order
ALL_PATTERNS_DICT = {
'[H]': ['F', 'Cl', 'Br', 'I'],
'[C]': ['Si'],
'[N]': ['P'],
'[O]': ['S'],
'[F]': ['H', 'Cl', 'Br', 'I'],
'[Si]': ['C'],
'[p-0X2]': ['N'],
'[P-0X2]': ['N'],
'[P-0X3]': ['N'],
'[S-0X1]': ['O'],
'[S-0X2]': ['O'],
'[Cl]': ['H', 'F', 'Br', 'I'],
'[Br]': ['H', 'F', 'Cl', 'I'],
'[I]': ['H', 'F', 'Cl', 'Br'],
'[Ti]': ['Zr', 'Hf'],
'[Zr]': ['Ti', 'Hf'],
'[Hf]': ['Ti', 'Zr'],
'[V]': ['Nb', 'Ta'],
'[Nb]': ['V', 'Ta'],
'[Ta]': ['V', 'Nb'],
'[Cr]': ['Mo', 'W'],
'[Mo]': ['Cr', 'W'],
'[W]': ['Cr', 'Mo'],
'[Mn]': ['Tc', 'Re'],
'[Tc]': ['Mn', 'Re'],
'[Re]': ['Mn', 'Tc'],
'[Fe]': ['Ru', 'Os'],
'[Ru]': ['Fe', 'Os'],
'[Os]': ['Fe', 'Ru'],
'[Co]': ['Rh', 'Ir'],
'[Rh]': ['Co', 'Ir'],
'[Ir]': ['Co', 'Rh'],
'[Ni]': ['Pd', 'Pt'],
'[Pd]': ['Ni', 'Pt'],
'[Pt]': ['Ni', 'Pd'],
'[Cu]': ['Ag', 'Au'],
'[Ag]': ['Cu', 'Au'],
'[Au]': ['Cu', 'Ag'],
'[Zn]': ['Cd', 'Hg'],
'[Cd]': ['Zn', 'Hg'],
'[Hg]': ['Zn', 'Cd']
}
mutatable_dict = {}
for pattern, mutate_to in ALL_PATTERNS_DICT.items():
matches = _monomer_evaluate_smarts_canvas(astructure, pattern)
for match in matches:
if not rxn_channel.get_grow_idxs(astructure, idxs=[match[0]]):
mutatable_dict[match[0]] = mutate_to
return mutatable_dict
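# Usage sketch (the atom indices shown are hypothetical): the returned
# dictionary maps mutatable atom indices to their allowed replacement
# elements, e.g.
#
#     get_element_mutator_dict(st)
#     # -> {3: ['Si'], 7: ['F', 'Cl', 'Br', 'I']}
#
# meaning atom 3 (a carbon matching '[C]') may become Si and atom 7 (a
# hydrogen matching '[H]') may become any of the listed halogens; atoms
# carrying polymer grow indices are excluded.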
def get_isoelectronic_mutator_indicies(astructure):
"""
Return a list of atom indices that can be mutated by the isoelectronic
mutator.
:type astructure: schrodinger.structure.Structure
:param astructure: the structure to be mutated
:rtype: list
:return: mutatable indices
"""
def find_heavy_atom(astructure, match):
for i in match:
atom = astructure.atom[i]
if atom.atomic_number > 1:
return atom
# isoelectronic SMARTS include aromatics
NINE_ELECTRONS_SINGLE = [
'[#1][C-0X4]([#1])[#1]', '[#1][N-0X3][#1]', '[#1][O-0X2]', '[F-0X1]'
]
EIGHT_ELECTRONS_SINGLE = ['[#1][C-0X4][#1]', '[#1][N-0X3]', '[O-0X2]']
EIGHT_ELECTRONS_DOUBLE = ['[#1][C-0X3][#1]', '[#1][N-0X2]', '[O-0X1]']
SEVEN_ELECTRONS_SINGLE = ['[#1][C-0X4]', '[N-0X3]']
SEVEN_ELECTRONS_DOUBLE = [
'[#1][c-0X3]', '[#1][C-0X3]', '[n-0X2]', '[N-0X2]'
]
SEVEN_ELECTRONS_TRIPLE = ['[#1][C-0X2]', '[N-0X1]']
ALL_PATTERNS = NINE_ELECTRONS_SINGLE + EIGHT_ELECTRONS_SINGLE + \
EIGHT_ELECTRONS_DOUBLE + SEVEN_ELECTRONS_SINGLE + SEVEN_ELECTRONS_DOUBLE + \
SEVEN_ELECTRONS_TRIPLE
# collect mutatable indicies
mutatable_indicies = set()
for pattern in ALL_PATTERNS:
matches = _monomer_evaluate_smarts_canvas(astructure, pattern)
for match in matches:
heavy_atom = find_heavy_atom(astructure, match)
idxs = [heavy_atom.index
] + [atom.index for atom in heavy_atom.bonded_atoms]
if not rxn_channel.get_grow_idxs(astructure, idxs=idxs):
mutatable_indicies.add(heavy_atom.index)
mutatable_indicies = list(mutatable_indicies)
mutatable_indicies.sort()
return mutatable_indicies
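# Illustration (ignoring the grow-index filter): for ethanol, CH3-CH2-OH, the
# methyl carbon matches '[#1][C-0X4]([#1])[#1]', the methylene carbon matches
# '[#1][C-0X4][#1]' and the hydroxyl oxygen matches '[#1][O-0X2]', so the
# sorted indices of those three heavy atoms would be returned.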
def get_child_like_parent(parent_st, children_sts, definition):
"""
Return the child structure that is most like the provided parent.
:type parent_st: schrodinger.structure.Structure
:param parent_st: the parent structure
:type children_sts: list of schrodinger.structure.Structure
:param children_sts: the children structures
:type definition: two-element list
:param definition: each sublist contains two atom indices
describing the reactive bonds in parent and fragment structures
which created the children
:rtype: schrodinger.structure.Structure
:return: the sought child structure
"""
# from the chosen channel determine how many atoms are on either side
# of the reactive bond (inclusive) in the parent structure
bond_parent, bond_fragment = definition
aside_indicies = rxn_channel.indicies_from_bonds_deep(
parent_st, bond_parent[0], [bond_parent[1]])
aside_natoms = len(aside_indicies)
bside_natoms = parent_st.atom_total - aside_natoms
# set up the following data structure to easily handle the
# categorization of product structures given that the number
# of product structures may be less than four due to the
# unique option passed to reaction channel, here is the
# list of possible a1_b1_a2_b2 for the following reaction
# nomenclature, i.e. parent(A-B) + fragment(C-D) --> (1)
# A-C (aka a1) + B-D (aka b1) or (2) A-D (aka a2) + B-C
# (aka b2):
#
# [[ a1, None], [None, None]] A-A + C-C would skip second channel
# and eliminate b1 via SMILES
# [[ a1, b1], [None, None]] A-B + C-C would skip second channel
# [[ a1, None], [ a2, b2]] A-B + C-D and eliminate b1 via SMILES
# [[ a1, b1], [ a2, None]] A-B + C-D and eliminate b2 via SMILES
# [[ a1, b1], [ a2, b2]] A-B + C-D
#
NO_RXN = 2 * [None]
a1_b1_a2_b2 = [list(NO_RXN), list(NO_RXN)]
for astructure in children_sts:
pieces = astructure.title.split('.')
structure_idx = int(pieces.pop()) - 1
reaction_idx = int(pieces.pop()) - 1
a1_b1_a2_b2[reaction_idx][structure_idx] = astructure
# Mutation is meant to create a child structure by introducing
# a small amount of new genetic information into the parent
# structure and so the choice of child will be biased toward
# that containing the larger portion of the parent, if there is
# only one child like this the proceed in the obvious way, otherwise
# prefer smaller children unless they differ from the parent
# by a single atom (which really makes them medium sized children)
# in which case flip a biased coin to choose between the smallest
# child (that most like the parent and least like the fragment)
# and largest child (that most like the parent and most like the
# fragment), note that this chunk of code was added as a
# preventative measure against increasingly larger children
# handle one (in the if) or two (in the else) reactions
natoms_func = lambda astructure: astructure.atom_total
if NO_RXN in a1_b1_a2_b2:
# in this case there is only one child that contains the
# A-part of the parent and possibly one child that contains
# the B-part of the parent, the fragment component for these
# two children is identical so just pick the largest child
a1, b1 = a1_b1_a2_b2[0]
if not b1:
child = a1
elif not a1:
child = b1
else:
child = sorted([a1, b1], key=natoms_func).pop()
else:
# in this case we are guaranteed to have two children that
# contain the A-part of the parent and either one or two
# children that contain the B-part of the parent, for those
# that contain A and those that contain B determine the smaller
# and the larger, the inequality sets whether we will want to
# pick from A or from B in order to obtain a child that resembles
# the parent more than the fragment
#
# for a polymer monomer reacting at the side chain (A'-B), where the
# prime indicates that part with grow indices, with a non-monomer (C-D)
# fragment we have:
#
# A'-B + C-D --> (1) A'-C + B-D and (2) A'-D + B-C
#
# but the child must be a monomer and so only A'-C and A'-D are
# possible meaning that b1 and b2 can only ever be None
def get_small_big(pair):
sts = [st for st in pair if st]
if not sts:
return
elif len(sts) == 1:
return (sts[0], None)
elif len(sts) == 2:
return tuple(sorted(sts, key=natoms_func))
a1_a2, b1_b2 = map(get_small_big, zip(*a1_b1_a2_b2))
if a1_a2 and not b1_b2:
small, big = a1_a2
natoms = aside_natoms
elif not a1_a2 and b1_b2:
small, big = b1_b2
natoms = bside_natoms
elif a1_a2 and b1_b2:
if aside_natoms >= bside_natoms:
small, big = a1_a2
natoms = aside_natoms
else:
small, big = b1_b2
natoms = bside_natoms
# pick the smaller if (1) there is only a single option, (2)
# the smaller is different from the parent by more than a single
# atom (this avoids always picking the child that for example
# contains a single hydrogen from the fragment), or (3) if the
# smaller child is different from the parent by only a single
# atom then roll a biased coin to decide between picking that
# smaller child or just pick the larger child
if not big or small.atom_total != natoms + 1 or \
Util.randomFlipCoin(SMALL_CHILD_FREQ):
child = small
else:
child = big
return child
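# Title bookkeeping sketch: as implied by the parsing above, each child title
# ends in '<reaction index>.<structure index>' (both 1-based), so a child
# titled, say, 'mutate.xyz.2.1' yields reaction_idx = 1 and structure_idx = 0
# and is stored at a1_b1_a2_b2[1][0], i.e. it is the a2 product of the second
# sub-channel.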
# mutators
def elemental_mutator(genome, **args):
"""
Perform a random elemental mutation to an element in the same column
(also known as a group) of the periodic table. Note that hydrogen and
the halogens are considered to belong to the same column.
:type genome: StructureGenome
:param genome: a genome
:type args: dict
:param args: dictionary of genetic optimization parameters
created and used by pyevolve
:rtype: int
:return: the number of mutations applied, appears to never
be used in PyEvolve
"""
# get the mutation probability and test whether to perform a
# single mutation at the requested mutation rate
mutprob = args['pmut']
if not Util.randomFlipCoin(mutprob):
nmutations = 0
return nmutations
# reset the genome
genome.removeProperties()
# get some parameters
freezer_files = genome.getParam('freezer_files')
inoculate = genome.getParam('inoculate')
# get mutatable indicies and return if this structure is not
# mutatable
mutatable_indicies = get_element_mutator_dict(genome.astructure)
if not mutatable_indicies:
nmutations = 0
if NO_CHILD in inoculate:
genome.astructure = get_freezer_structure(
freezer_files,
inoculate=NO_CHILD,
mutation_applied=ELEMENTAL_MUTATOR,
basename_ext=genome.basename_ext)
nmutations = 1
return nmutations
# get parent string
parent = genome.astructure.title
# perform the single mutation
index = my_random.choice(list(mutatable_indicies))
elements = mutatable_indicies[index]
genome.astructure.atom[index].element = my_random.choice(elements)
# finish up, remove_basename_ext covers the case where the mutation is performed
# directly following a crossover, perform geometry optimization
set_title_to_stoichiometry(genome.astructure, toappend=genome.basename_ext)
color.apply_color_scheme(genome.astructure, 'element')
genome.astructure.property[MUTATION_PARENT_KEY] = remove_basename_ext(
parent)
genome.astructure.property[MUTATION_APPLIED_KEY] = ELEMENTAL_MUTATOR
genome.optimizeGeometry()
nmutations = 1
return nmutations
def fragment_mutator(genome, **args):
"""
Randomly mutate the genome by replacing the molecular fragment on one
side of a bond with a similar fragment from a library.
:type genome: StructureGenome
:param genome: a genome
:type args: dict
:param args: dictionary of genetic optimization parameters
created and used by pyevolve
:rtype: int
:return: the number of mutations applied, appears to never
be used in PyEvolve
"""
# get the mutation probability and test whether to perform a
# single mutation at the requested mutation rate
mutprob = args['pmut']
if not Util.randomFlipCoin(mutprob):
nmutations = 0
return nmutations
# reset the genome
genome.removeProperties()
# get some parameters
freezer_files = genome.getParam('freezer_files')
inoculate = genome.getParam('inoculate')
# get the number of mutatable bonds, i.e. the number of simple bonds,
# and return if this structure is not mutatable
num_simple_bonds = get_num_simple_bonds(genome.astructure)
if not num_simple_bonds:
nmutations = 0
if NO_CHILD in inoculate:
genome.astructure = get_freezer_structure(
freezer_files,
inoculate=NO_CHILD,
mutation_applied=FRAGMENT_MUTATOR,
basename_ext=genome.basename_ext)
nmutations = 1
return nmutations
# get parent string
parent = genome.astructure.title
parent_basename_ext = genome.basename_ext
# build the fragment dict
fragment_dict = {FRAGMENT: genome.getParam('fragment_libs')}
is_monomer = rxn_channel.are_monomers([genome.astructure])
# perform the single mutation
found_fragment = False
num_unproductive = 0
while not found_fragment:
if num_unproductive == TRIES_FRAGMENT_MUTATOR:
nmutations = 0
if NO_CHILD in inoculate:
genome.astructure = get_freezer_structure(
freezer_files,
inoculate=NO_CHILD,
mutation_applied=FRAGMENT_MUTATOR,
basename_ext=genome.basename_ext)
nmutations = 1
return nmutations
# get a random fragment
fragment_st = get_random_structure(fragment_dict)
if not fragment_st:
num_unproductive += 1
continue
fragment_st.property.pop(FROM_FREEZER_KEY, None)
rxn_channel.remove_grow_properties(fragment_st)
# arbitrarily combine the two structure objects into a single structure
# object to prep for reaction channel
combo = combine_two_structures(genome.astructure, fragment_st)
# find a single unique random double displacement channel, trying random
# fragments up to TRIES_FRAGMENT_MUTATOR times before giving up and simply
# returning the number of mutations applied, having to do more than a
# single pass is very rare because here, as opposed to bond crossover,
# no_reactive_h is False, so it only happens for A-A + A-A or A-A + A-B
# scenarios, e.g. if the genome is Cl-Cl and the fragment is Cl-Cl
# (ethane + ethane or ethane + methanol is now fine because of the
# reactive H)
channel_defs = rxn_channel.ChannelDefinitions()
random_seed = my_random.randint(RANDOM_INT_BOUND + 1)
channel_defs.addRandomChannels(
combo,
num_random_channels=1,
random_types=['double_displacement'],
no_reactive_h=False,
random_seed=random_seed,
unique=True,
bin_rxn_types=True,
no_reactive_polymer_monomer_backbone=True,
no_reactive_polymer_monomer_sidechain=False)
if not channel_defs.definitions:
num_unproductive += 1
else:
found_fragment = True
# run the reaction channel module
mae_out_file = 'mutate' + '.' + parent_basename_ext + '.mae'
channels = rxn_channel.Channels(
combo,
channel_defs,
mae_out_file=mae_out_file,
isolate_products=True,
no_minimization=genome.getParam('no_minimize'),
no_reactive_h=False,
unique=True)
channels.orchestrate()
# in the general case the 5 returned structures are (1) reactant, combination of
# two structures i.e. A-B + C-D, (2) A-C, (3) B-D, (4) A-D, and (5) B-C however
# we may also have only a single sub-channel, i.e. 3 structures
structures = [
astructure for astructure in structure.StructureReader(mae_out_file)
]
structures = structures[1:]
fileutils.force_remove(mae_out_file)
# require a monomer
if is_monomer:
structures = [st for st in structures if rxn_channel.are_monomers([st])]
# with a certain probability get that child structure which is most like
# the parent
genome.astructure = get_child_like_parent(
genome.astructure, structures, channel_defs.definitions[0].listdef)
# clean up
genome.astructure.property.pop(RXN_KEY, None)
# finish up, remove_basename_ext covers the case where the mutation is performed
# directly following a crossover
set_title_to_stoichiometry(genome.astructure, toappend=genome.basename_ext)
genome.astructure.property[MUTATION_PARENT_KEY] = remove_basename_ext(
parent)
genome.astructure.property[MUTATION_APPLIED_KEY] = FRAGMENT_MUTATOR
nmutations = 1
return nmutations
def isoelectronic_mutator(genome, **args):
"""
Perform a random isoelectronic mutation within the following series:
CH3X, NH2X, OHX, and FX; CH2XY, NHXY, and OXY; and CHXYZ and NXYZ,
where X, Y, and Z are bonds to non-hydrogen atoms.
:type genome: StructureGenome
:param genome: a genome
:type args: dict
:param args: dictionary of genetic optimization parameters
created and used by pyevolve
:rtype: int
:return: the number of mutations applied, appears to never
be used in PyEvolve
"""
# get the mutation probability and test whether to perform a
# single mutation at the requested mutation rate
mutprob = args['pmut']
if not Util.randomFlipCoin(mutprob):
nmutations = 0
return nmutations
# reset the genome
genome.removeProperties()
# get some parameters
freezer_files = genome.getParam('freezer_files')
inoculate = genome.getParam('inoculate')
# get mutatable indicies and return if this structure is not
# mutatable
mutatable_indicies = get_isoelectronic_mutator_indicies(genome.astructure)
if not mutatable_indicies:
nmutations = 0
if NO_CHILD in inoculate:
genome.astructure = get_freezer_structure(
freezer_files,
inoculate=NO_CHILD,
mutation_applied=ISOELECTRONIC_MUTATOR,
basename_ext=genome.basename_ext)
nmutations = 1
return nmutations
# get parent string
parent = genome.astructure.title
# perform the single mutation
index = my_random.choice(mutatable_indicies)
target_heavy = genome.astructure.atom[index]
target_hydrogens = [bonded_atom.index for bonded_atom in \
target_heavy.bonded_atoms if bonded_atom.atomic_number == 1]
to_elements = ['C', 'N', 'O', 'F']
for tmpidx in range(9 - target_heavy.atomic_number - len(target_hydrogens)):
to_elements.pop()
to_elements.remove(target_heavy.element)
to_element = my_random.choice(to_elements)
renumber_map = genome.astructure.deleteAtoms(target_hydrogens,
renumber_map=True)
new_index = renumber_map[index]
target_heavy = genome.astructure.atom[new_index]
target_heavy.element = to_element
build.add_hydrogens(genome.astructure, atom_list=[target_heavy])
# finish up, remove_basename_ext covers the case where the mutation is performed
# directly following a crossover, perform geometry optimization
set_title_to_stoichiometry(genome.astructure, toappend=genome.basename_ext)
color.apply_color_scheme(genome.astructure, 'element')
genome.astructure.property[MUTATION_PARENT_KEY] = remove_basename_ext(
parent)
genome.astructure.property[MUTATION_APPLIED_KEY] = ISOELECTRONIC_MUTATOR
genome.optimizeGeometry()
nmutations = 1
return nmutations
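# Worked example of the element bookkeeping above: for a CH2 group the carbon
# (atomic number 6) carries two hydrogens, so range(9 - 6 - 2) pops one
# element leaving ['C', 'N', 'O'], and removing the current element leaves
# ['N', 'O']; the two hydrogens are deleted, the carbon becomes N or O, and
# build.add_hydrogens then restores the number of hydrogens appropriate for
# the new element, keeping the group isoelectronic (CH2 -> NH or O).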
def get_loggable_float(afloat,
num_decimal=NUM_DECIMAL,
field_width=FIELD_WIDTH):
"""
Return a float as a string with the specified format.
:type afloat: float
:param afloat: a float to convert to a string
:type num_decimal: str
:param num_decimal: the format of the string representation
:type field_width: int
:param field_width: the field width of the final string
:rtype: str
:return: the float as a string
"""
afloat = num_decimal % afloat
afloat = afloat.rjust(field_width)
return afloat
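# Example (assuming, hypothetically, that NUM_DECIMAL is '%.2f' and
# FIELD_WIDTH is 10, both defined elsewhere in this module):
#
#     get_loggable_float(3.14159)  # -> '      3.14'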
# callbacks
def uniquify_titles_callback(ga_obj):
"""
Callback to uniquify titles of the individuals.
:type ga_obj: GSimpleGA.GSimpleGA
:param ga_obj: the entire current state of the genetic
optimization
"""
generation = ga_obj.getCurrentGeneration()
population = ga_obj.getPopulation()
for index, indiv in enumerate(population, 1):
ext = '_%s_%s' % (str(generation), str(index))
# titles coming into here look like either (1) <stoichiometry>.<basename_ext>
# if the structure passed through the operators or (2) <stoichiometry>_%s_%s
# if it did not (either because of elitism or the rates of operators), the order
# of splitting is irrelevant because there will never be both patterns
title = indiv.astructure.title.split('_')[0]
title = remove_basename_ext(title) + ext
indiv.astructure.title = title
indiv.astructure.property[ENTRY_NAME_KEY] = title
if indiv.skip:
indiv.skip.updateTitle(title)
if indiv.failure:
indiv.failure.updateTitle(title)
def prepare_next_generation_dirs_callback(ga_obj):
"""
Callback to update the generation property of the
genomes and to create a subdirectory to hold the next
series of evaluations.
:type ga_obj: GSimpleGA.GSimpleGA
:param ga_obj: the entire current state of the genetic
optimization
"""
# the dictionary accessed with an individual's setParams method
# is shared by all individuals in the population and so it would
# seem that you only need to set it for one individual and it would
# be set for all, this is the behavior when run in serial,
# however, the other individuals appear to not be updated
# in this way when running in parallel, so in general we will set it
# for all individuals
next_generation = ga_obj.getCurrentGeneration() + 1
for indiv in ga_obj.getPopulation():
indiv.setParams(generation=next_generation)
os.mkdir(os.path.join(EVALUATOR_JOBS_DIR, GENER_SUBDIR + \
str(next_generation)))
def manage_skips_callback(ga_obj):
"""
Callback to manage skips in the evaluation.
:type ga_obj: GSimpleGA.GSimpleGA
:param ga_obj: the entire current state of the genetic
optimization
"""
skips = ga_obj.getParam('skips')
generation = ga_obj.getCurrentGeneration()
skips[generation] = []
population = ga_obj.getPopulation()
for indiv in population:
skip = indiv.skip
if skip:
skips[generation].append(skip)
ga_obj.setParams(skips=skips)
def manage_failures_callback(ga_obj):
"""
Callback to manage failures in the evaluation.
:type ga_obj: GSimpleGA.GSimpleGA
:param ga_obj: the entire current state of the genetic
optimization
"""
jobbe = ga_obj.getParam('jobbe')
failures = ga_obj.getParam('failures')
generation = ga_obj.getCurrentGeneration()
failures[generation] = []
# copy subjob directories of failed jobs back to the launch host
population = ga_obj.getPopulation()
for indiv in population:
failure = indiv.failure
if failure:
failures[generation].append(failure)
if jobbe:
basename = indiv.infile.split(IN_MAE_EXT)[0]
apath = os.path.join(EVALUATOR_JOBS_DIR, GENER_SUBDIR + \
str(generation), basename)
jobbe.copyOutputFile(apath)
ga_obj.setParams(failures=failures)
if len(failures[generation]) == len(population):
logger = ga_obj.getParam('logger')
if generation:
logger.info('')
print_bad_jobs(failures, logger, bad_type='fail')
EXIT_MSG = ('All subjobs from generation %s have failed. '
'This indicates that something is wrong with the '
'provided input or job control settings. The subjob '
'directories of failed jobs are automatically copied '
'back to the launch host and should help isolate the '
'problem. This script will now exit.')
logger.error(EXIT_MSG % generation)
sys.exit(EXIT_MSG % generation)
def logging_summary_callback(ga_obj):
"""
Callback to log progress.
:type ga_obj: GSimpleGA.GSimpleGA
:param ga_obj: the entire current state of the genetic
optimization
"""
logger = ga_obj.getParam('logger')
if not logger:
return
def get_minmaxavgstdbest(key):
scores = []
for indiv in pop:
# if this individual has been marked as a failure then
# do not summarize any structure-based or physics-based
# properties, regardless of which caused the failure
if not indiv.failure:
score = indiv.astructure.property.get(key)
# the next two lines are needed to allow summarizing
# structure-based properties of skipped individuals,
# which are never simultaneously marked as failures,
# but do not have data for physics-based properties
if score is None:
continue
scores.append(score)
if not scores:
return None
min_score = min(scores)
max_score = max(scores)
avg_score = numpy.mean(numpy.array(scores))
std_score = numpy.std(numpy.array(scores))
best_score = pop.bestRaw().astructure.property[key]
return [min_score, max_score, avg_score, std_score, best_score]
def print_header_one():
HEADER = 'Summary'
logger.info(HEADER)
EMPTY_TAG = '-' * FIELD_WIDTH
all_loggables = [EMPTY_TAG] * 4
for name_and_units in prop_statistics_dict:
loggable = name_and_units.center(FIELD_WIDTH * 5, '-')
loggable = '|' + loggable[1:]
all_loggables.append(loggable)
EMPTY_TAG = '|' + EMPTY_TAG[1:]
all_loggables.append(EMPTY_TAG)
logger.info('%s' * len(all_loggables) % tuple(all_loggables))
def print_header_two():
GEN_TAG = 'Gen.'.ljust(FIELD_WIDTH)
PERCENT_TAG = '% Comp.'.rjust(FIELD_WIDTH)
NUM_SKIP_TAG = 'Skipped'.rjust(FIELD_WIDTH)
NUM_FAIL_TAG = 'Failed'.rjust(FIELD_WIDTH)
MIN_TAG = 'Min'.rjust(FIELD_WIDTH)
MAX_TAG = 'Max'.rjust(FIELD_WIDTH)
AVG_TAG = 'Avg'.rjust(FIELD_WIDTH)
STD_TAG = 'Std'.rjust(FIELD_WIDTH)
BEST_TAG = 'Best'.rjust(FIELD_WIDTH)
TIME_TAG = 'Time/s'.rjust(FIELD_WIDTH)
all_loggables = [GEN_TAG, PERCENT_TAG, NUM_SKIP_TAG, NUM_FAIL_TAG]
all_loggables.extend([MIN_TAG, MAX_TAG, AVG_TAG, STD_TAG, \
BEST_TAG] * len(prop_statistics_dict))
all_loggables.append(TIME_TAG)
HEADER = '%s' * len(all_loggables) % tuple(all_loggables)
logger.info(HEADER)
logger.info('-' * len(HEADER))
# collect by optimizable property a dictionary containing
# the min, max, avg, and std of that property over the
# current generation, also collect the value of that property
# for the current best individual (best defined in the context
# of all properties considered)
pop = ga_obj.getPopulation()
prop_statistics_dict = {}
for aproperty in ga_obj.getParam('properties'):
if aproperty.summarize:
results = get_minmaxavgstdbest(aproperty.key)
if results:
results = list(map(get_loggable_float, results))
else:
results = ['N/A'.rjust(FIELD_WIDTH)] * 5
aname = aproperty.name
if aname == StructureEvaluator.SMARTS_PROP:
aname += '_%s' % aproperty.index
if aproperty.units:
name_and_units = aname + '/' + aproperty.units
else:
name_and_units = aname
prop_statistics_dict[name_and_units] = results
# if it is the first generation then log a header including
# boundaries to separate the descriptors for the different
# sought properties and name the columns
generation = ga_obj.getCurrentGeneration()
if generation == 0:
print_header_one()
print_header_two()
# get string representations of some quantities to log
percent_done = 100 * float(generation) / float(ga_obj.getGenerations())
percent_done = NUM_DECIMAL % percent_done
percent_done = str(percent_done + '%').rjust(FIELD_WIDTH)
atime = get_loggable_float(ga_obj.printTimeElapsed())
skips = ga_obj.getParam('skips')
num_skips = len(skips[generation])
num_skips = str(num_skips).rjust(FIELD_WIDTH)
failures = ga_obj.getParam('failures')
num_failures = len(failures[generation])
num_failures = str(num_failures).rjust(FIELD_WIDTH)
generation = str(generation).ljust(FIELD_WIDTH)
# for every generation log the important data
all_loggables = [generation, percent_done, num_skips, num_failures]
for minmaxavgstdbest in prop_statistics_dict.values():
all_loggables.extend(minmaxavgstdbest)
all_loggables.append(atime)
logger.info('%s' * len(all_loggables) % tuple(all_loggables))
def molecule_history_callback(ga_obj):
"""
Callback to append all structures from all generations to
individual log files.
:type ga_obj: GSimpleGA.GSimpleGA
:param ga_obj: the entire current state of the genetic
optimization
"""
jobbe = ga_obj.getParam('jobbe')
subjob_data_exists = os.path.exists(EVALUATOR_JOBS_DIR)
generation = ga_obj.getCurrentGeneration()
generation_log_file_name = \
get_generation_log_file_name(ga_obj.getParam('file_base_name'), generation)
population = ga_obj.getPopulation()
molecule_history_writer = structure.MaestroWriter(generation_log_file_name)
for index, individual in enumerate(population, 1):
if PREVIOUS in individual.getParam('freezer_files'):
individual.addPreviousFreezerFile(generation_log_file_name)
individual.updateStructureProperties(index, generation)
molecule_history_writer.append(individual.astructure)
individual.resetParentProperties()
if jobbe and subjob_data_exists and not individual.skip and \
not individual.failure:
basename = individual.infile.split(IN_MAE_EXT)[0]
apath = os.path.join(EVALUATOR_JOBS_DIR, GENER_SUBDIR + \
str(generation), basename)
for content in os.listdir(apath):
root, ext = fileutils.splitext(content)
if os.path.isfile(os.path.join(apath, content)) and \
ext in EXTS_TO_RETURN:
shutil.copy(os.path.join(apath, content), os.getcwd())
jobbe.copyOutputFile(content)
molecule_history_writer.close()
if jobbe:
jobbe.copyOutputFile(generation_log_file_name)
# terminators
def first_property(ga_obj):
"""
Terminate when the first property has been matched.
:type ga_obj: GSimpleGA.GSimpleGA
:param ga_obj: the entire current state of the genetic
optimization
:rtype: bool
:return: True to terminate, False otherwise
"""
logger = ga_obj.getParam('logger')
# loop over the current values for the sought properties and for the first
# one that meets the specified criterion exit
for aproperty in ga_obj.getParam('properties'):
key, target, error, name, comparator, units = aproperty.key, aproperty.target, \
aproperty.error, aproperty.name, aproperty.comparator, aproperty.units
if units is None:
units = ''
current = ga_obj.bestIndividual().astructure.property.get(key)
if current is None:
continue
TERM_MSG = ''
if comparator == Property.EQUALS:
if target - error <= current <= target + error:
TERM_MSG = ('The first property has been matched and so evolution '
'will terminate, i.e. the current value of %s of %.2f%s is within %.2f%s '
'of the target value of %.2f%s.') % (name, current, units, error, \
units, target, units)
elif comparator == Property.GREATER_THAN:
if current > target:
TERM_MSG = (
'The first property has been matched and so evolution '
'will terminate, i.e. the current value of %s of %.2f%s is greater than '
'the target value of %.2f%s.') % (name, current, units,
target, units)
elif comparator == Property.LESS_THAN:
if current < target:
TERM_MSG = (
'The first property has been matched and so evolution '
'will terminate, i.e. the current value of %s of %.2f%s is less than '
'the target value of %.2f%s.') % (name, current, units,
target, units)
if TERM_MSG:
logger.info('')
logger.info(TERM_MSG)
logger.info('')
ga_obj.setParams(terminated=True)
return True
# if we didn't meet the first criterion and did specify the unproductive
# criterion as a second then run it now
if ga_obj.getParam('num_unproductive'):
return unproductive(ga_obj)
def all_properties(ga_obj):
"""
Terminate when all properties have been matched.
:type ga_obj: GSimpleGA.GSimpleGA
:param ga_obj: the entire current state of the genetic
optimization
:rtype: bool
:return: True to terminate, False otherwise
"""
# if we have only a single property then call first_property
properties = ga_obj.getParam('properties')
if len(properties) == 1:
return first_property(ga_obj)
attrs = ('key', 'minimax', 'target', 'error', 'comparator')
for prop in properties:
values = map(lambda x: getattr(prop, x), attrs)
key, minimax, target, error, comparator = values
current = ga_obj.bestIndividual().astructure.property.get(key)
if current is None:
break
if minimax in [Property.MIN, Property.MAX]:
break
if comparator == Property.EQUALS and not (target - error <= current <=
target + error):
break
if comparator == Property.GREATER_THAN and current <= target:
break
if comparator == Property.LESS_THAN and current >= target:
break
else:
logger = ga_obj.getParam('logger')
logger.info('')
logger.info('All properties have been matched and so evolution '
'will terminate.')
logger.info('')
ga_obj.setParams(terminated=True)
return True
if ga_obj.getParam('num_unproductive'):
return unproductive(ga_obj)
def unproductive(ga_obj):
"""
Terminate if the maximum number of unproductive generations has
been reached.
:type ga_obj: GSimpleGA.GSimpleGA
:param ga_obj: the entire current state of the genetic
optimization
:rtype: bool
:return: True to terminate, False otherwise
"""
logger = ga_obj.getParam('logger')
num_unproductive = ga_obj.getParam('num_unproductive')
TERM_MSG = (
'The maximum number of %s unproductive generations has been reached '
'and so evolution will terminate.')
current_best = ga_obj.bestIndividual().getRawScore()
# if generation zero then store the current best and start a counter for the
# number of unproductive generations, if not generation zero then if the
# current and previous are different then reset the counter, otherwise
# increment the counter until it reaches the exit condition
if ga_obj.getCurrentGeneration() == 0:
ga_obj.setParams(prev_gener_best=current_best, unproductive_so_far=0)
else:
previous_best = ga_obj.getParam('prev_gener_best')
if abs(current_best - previous_best) < TERM_THRESH:
unproductive_so_far = ga_obj.getParam('unproductive_so_far') + 1
ga_obj.setParams(unproductive_so_far=unproductive_so_far)
if unproductive_so_far == num_unproductive:
logger.info('')
logger.info(TERM_MSG % num_unproductive)
logger.info('')
ga_obj.setParams(terminated=True)
return True
else:
ga_obj.setParams(prev_gener_best=current_best,
unproductive_so_far=0)
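# Timeline sketch (hypothetical scores, num_unproductive = 6): if the best raw
# score at generation 0 is 0.80 and later generations stay within TERM_THRESH
# of that value, unproductive_so_far climbs 1, 2, ..., 6 and the run
# terminates after the sixth flat generation; any generation whose best score
# changes by more than TERM_THRESH resets the counter to 0 and stores the new
# best as prev_gener_best.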
class Skip:
"""
Manage a skip.
"""
def __init__(self, genome, msg):
"""
Create an instance.
:type genome: StructureGenome
:param genome: a genome
:type msg: str
:param msg: a message
"""
self.title = genome.astructure.title
self.msg = msg
def updateTitle(self, title):
"""
Update the title of this job.
:type title: str
:param title: a title to use for reference
"""
self.title = title
class Failure(Skip):
"""
Manage a failure.
"""
def print_bad_jobs(all_bad_jobs, logger, bad_type='skip'):
"""
Log bad jobs, i.e. skips and failures.
:type all_bad_jobs: dict
:param all_bad_jobs: a collection of bad subjobs,
keys are genetic optimization generation and values
are a list of Skip or Failure objects for bad subjobs
:type logger: logging.Logger
:param logger: output logger
:type bad_type: str
:param bad_type: specifies either 'skip' or 'fail' type
"""
FIELD_WIDTH = 15
if bad_type == 'skip':
HEADER = 'Skipped'
elif bad_type == 'fail':
HEADER = 'Failures'
logger.info(HEADER)
logger.info('-' * len(HEADER))
index_tag = 'index'.ljust(FIELD_WIDTH)
generation_tag = 'generation'.ljust(FIELD_WIDTH)
title_tag = 'title'.ljust(FIELD_WIDTH + 10)
bad_tag = 'msg'.ljust(FIELD_WIDTH + 50)
HEADER = '%s %s %s %s' % (index_tag, generation_tag, title_tag, bad_tag)
logger.info(HEADER)
logger.info('-' * len(HEADER))
index = 1
for generation, bad_jobs in all_bad_jobs.items():
generation_str = str(generation).ljust(FIELD_WIDTH)
for bad_job in bad_jobs:
index_str = str(index).ljust(FIELD_WIDTH)
title_str = bad_job.title.ljust(FIELD_WIDTH + 10)
bad_job_str = bad_job.msg.ljust(FIELD_WIDTH + 50)
logger.info('%s %s %s %s' % (index_str, generation_str, \
title_str, bad_job_str))
index += 1
logger.info('')
class GeneticOptimization:
"""
Manage the genetic optimization.
"""
MSGWIDTH = 80
def __init__(self,
initial_population,
properties,
structure_score_threshold=STRUCTURE_SCORE_THRESHOLD,
eval_kwargs=DEFAULT_EVAL_KWARGS,
crossovers=None,
mutators=None,
fragment_libs=FRAGMENT_LIBS_DEFAULT,
script_evaluator=None,
generations=GENERATIONS,
population=POPULATION,
crossover_rate=CROSSOVER_RATE,
mutation_rate=MUTATION_RATE,
selection=DEFAULT_SELECTION,
tournament_size=TOURNAMENT_SIZE,
terminators=DEFAULT_TERMS,
num_unproductive=NUM_UNPRODUCTIVE,
scaling=DEFAULT_SCALING,
elitism=ELITISM,
random_seed=RANDOM_SEED,
no_minimize=NO_MINIMIZE,
file_base_name=FILE_BASE_NAME,
no_open_shell=NO_OPEN_SHELL,
props_to_remove=None,
jobbe=None,
conformational_search=CONFORMATIONAL_SEARCH,
freezers=FREEZERS_DEFAULT,
inoculate=INOCULATE_DEFAULT,
class_evaluators=None,
logger=None):
"""
Create an instance.
:type initial_population: list
:param initial_population: the initial population of
schrodinger.structure.Structure
:type properties: list of Property
:param properties: the properties to be optimized, including structural
properties as well as more physical calculable observables
:type structure_score_threshold: float
:param structure_score_threshold: if structure-based properties are being sought
and the base evaluator will be used, then subjobs on structures with structure
scores below this value will not be launched but rather such structures will
be treated as skips
:type eval_kwargs: dict
:param eval_kwargs: a dictionary that will be available in all evaluator
functions
:type crossovers: list
:param crossovers: contains two-element tuples each of which holds a
crossover operator to be used in the optimization along with a weight
:type mutators: list
:param mutators: contains two-element tuples each of which holds a
mutation operator to be used in the optimization along with a weight
:type fragment_libs: list
:param fragment_libs: strings specifying fragment libraries to
be used, can be either module constants from FRAGMENT_LIBS.keys() (or ALL
if all of those are desired) or the names of Maestro files (including
the file extensions) containing fragments collected by the user
:type script_evaluator: method
:param script_evaluator: the evaluator function to be called to score individuals
during the optimization, takes a StructureGenome and returns a
JobDJ
:type generations: int
:param generations: the number of generations for which to run the optimization
:type population: int
:param population: the population size to use in the optimization, can be
less-than-or-equal-to the length of initial_population
:type crossover_rate: float
:param crossover_rate: the rate of crossover as a percentage
:type mutation_rate: float
:param mutation_rate: the rate of mutation as a percentage
:type selection: str
:param selection: the selection protocol used to select individuals
to the gene pool for the upcoming generation
:type tournament_size: int
:param tournament_size: the size of tournament to use if using tournament
based selection, unused if a tournament based selection is not being used
:type terminators: list
:param terminators: list of strings that specify the termination protocols
to be used to terminate the optimization, typically more than one is specified
only if the unproductive protocol is being used
:type num_unproductive: int
:param num_unproductive: if the unproductive protocol is being used to terminate
the optimization then this integer specifies how many unproductive cycles are
allowed before terminating, unused if a different termination protocol is used
:type scaling: str
:param scaling: specifies the scaling protocol to use, scaling scales the raw
scores of the individuals to produce fitness scores to ease selection in cases
where raw scores are nearly equal
:type elitism: int
:param elitism: specify the number of elite individuals guaranteed to be added
to the gene pool for the upcoming generation, zero disables elitism
:type random_seed: None or int
:param random_seed: the random seed, if None then system time will be used
:type no_minimize: bool
:param no_minimize: specify that the offspring structures generated by the
crossover and mutation operators not be geometry optimized prior to selection
:type file_base_name: str
:param file_base_name: base name to use for output and generation log files
:type no_open_shell: bool
:param no_open_shell: if True then do not allow the processing of open-shell
molecules, otherwise allow them
:type props_to_remove: list
:param props_to_remove: a list of structure property keys to be removed prior
to the evaluation stage
:type jobbe: schrodinger.job.jobcontrol._Backend
:param jobbe: the jobcontrol backend of the driver job
:type conformational_search: bool or str
:param conformational_search: specifies whether a MacroModel conformational
search will be performed prior to evaluation; when a string, it specifies
a simplified MacroModel input file containing extra options
:type freezers: list
:param freezers: a collection of freezers containing structures that
are used to swap out individuals from the population
:type inoculate: list
:param inoculate: the list of circumstances under which to use the structure
freezers
:type class_evaluators: dict
:param class_evaluators: keys are the evaluator classes to be called to score
individuals during the optimization (each must inherit from ClassEvaluator);
values are lists of Property to be passed to the class evaluator
:type logger: logging.Logger
:param logger: output logger
"""
self.initial_population = initial_population
self.properties = properties
self.structure_score_threshold = structure_score_threshold
self.eval_kwargs = eval_kwargs
if not crossovers:
self.crossovers = [(bond_crossover, 1)]
else:
self.crossovers = crossovers
if not mutators:
self.mutators = \
[(fragment_mutator, 0.5), (isoelectronic_mutator, 0.5)]
else:
self.mutators = mutators
self.fragment_libs = list(map(os.path.basename, fragment_libs))
if not script_evaluator:
self.script_evaluator = optoelectronics_evaluator
else:
self.script_evaluator = script_evaluator
self.generations = generations
self.population = population
self.crossover_rate = crossover_rate
self.mutation_rate = mutation_rate
self.selection = selection
self.tournament_size = tournament_size
self.terminators = terminators
self.num_unproductive = num_unproductive
self.scaling = scaling
self.elitism = elitism
self.random_seed = random_seed
self.no_minimize = no_minimize
self.file_base_name = file_base_name
self.no_open_shell = no_open_shell
self.props_to_remove = props_to_remove
self.jobbe = jobbe
if type(conformational_search) is bool:
self.conformational_search = conformational_search
else:
self.conformational_search = os.path.basename(conformational_search)
self.freezers = freezers
self.inoculate = inoculate
if class_evaluators is None:
class_evaluators = {}
self.class_evaluators = class_evaluators
self.logger = logger
self.pyevolve_random_seed = None
self.initializers = [from_initial_population]
self.skips = {}
self.failures = {}
self.ga_obj = None
self.crossover_names = None
self.mutator_names = None
def setRootLoggerForPyEvolve(self):
"""
Set up the root logger for PyEvolve.
"""
# can't seem to get pyevolve.logEnable() (in __init__.py) to work so do it by
# hand; PyEvolve code will use the root logger (as well as a few print
# statements), and the root logger comes in with a NullHandler so first remove
# that handler
if isinstance(logging.root.handlers[0], logging.FileHandler):
logging.root.handlers[0].close()
logging.root.removeHandler(logging.root.handlers[0])
# just use their original format
aformat = '%(asctime)s [%(module)s:%(funcName)s:%(lineno)d] %(levelname)s %(message)s'
# build root logger to use FileHandler
handler = logging.FileHandler(self.file_base_name + PYEVOLVE_LOG_EXT,
mode='w')
formatter = logging.Formatter(aformat)
handler.setFormatter(formatter)
logging.root.addHandler(handler)
logging.root.setLevel(logging.DEBUG)
# log a header
HEADER = 'PyEvolve log'
logging.info(HEADER)
logging.info('-' * len(HEADER))
logging.info('')
def setOperatorNames(self):
"""
Set the operator names.
"""
self.crossover_names = []
for atuple in self.crossovers:
crossover, weight = atuple
self.crossover_names.append(crossover.__name__)
self.mutator_names = []
for atuple in self.mutators:
mutator, weight = atuple
self.mutator_names.append(mutator.__name__)
def printProperties(self):
"""
Log the set of sought properties and their details.
"""
HEADER = 'The following properties have been defined:'
prop_strs = []
for property_obj in self.properties:
prop_strs.append(repr(property_obj))
if prop_strs:
self.logger.info(HEADER)
self.logger.info('-' * len(HEADER))
for prop_str in prop_strs:
self.logger.info(prop_str)
self.logger.info('')
def printParams(self):
"""
Log the parameters.
"""
HEADER = 'Genetic Optimization Parameters'
num_candidate_sts = textlogger.get_param_string(
'Num. candidate structures given', len(self.initial_population),
self.MSGWIDTH)
population = textlogger.get_param_string('Population', self.population,
self.MSGWIDTH)
structure_score_threshold = textlogger.get_param_string(
'Structure score threshold',
get_loggable_float(self.structure_score_threshold).strip(),
self.MSGWIDTH)
generations = textlogger.get_param_string('Generations',
self.generations,
self.MSGWIDTH)
crossovers = textlogger.get_param_string(
'Crossovers', ', '.join(self.crossover_names), self.MSGWIDTH)
crossover_rate = textlogger.get_param_string('Crossover rate',
self.crossover_rate,
self.MSGWIDTH)
mutators = textlogger.get_param_string('Mutators',
', '.join(self.mutator_names),
self.MSGWIDTH)
mutation_rate = textlogger.get_param_string('Mutation rate',
self.mutation_rate,
self.MSGWIDTH)
fragment_libs = textlogger.get_param_string(
'Fragment libraries', ', '.join(self.fragment_libs), self.MSGWIDTH)
elitism = textlogger.get_param_string('Elitism', self.elitism,
self.MSGWIDTH)
no_minimize = textlogger.get_param_string('No minimization',
self.no_minimize,
self.MSGWIDTH)
selection = textlogger.get_param_string('Selection', self.selection,
self.MSGWIDTH)
tournament_size = textlogger.get_param_string('Tournament size',
self.tournament_size,
self.MSGWIDTH)
scaling = textlogger.get_param_string('Scaling', self.scaling,
self.MSGWIDTH)
terminators = textlogger.get_param_string('Terminators',
', '.join(self.terminators),
self.MSGWIDTH)
num_unproductive = textlogger.get_param_string('Num. unproductive',
self.num_unproductive,
self.MSGWIDTH)
evaluators = []
script_props = [
aproperty for aproperty in self.properties
if aproperty.isScriptProperty()
]
if script_props:
evaluators.append(self.script_evaluator.__name__)
if self.class_evaluators:
evaluators.extend(
[aclass.__name__ for aclass in self.class_evaluators])
evaluators = ', '.join(evaluators)
evaluators = textlogger.get_param_string('Evaluators', evaluators,
self.MSGWIDTH)
n_procs = jobutils.get_procs()
n_molecules = textlogger.get_param_string('# simultaneous molecules',
n_procs, self.MSGWIDTH)
n_sim_subjobs = textlogger.get_param_string(
'# simultaneous subjobs per molecule', n_procs, self.MSGWIDTH)
tpp = self.eval_kwargs.get(parserutils.FLAG_TPP)
if tpp is None:
tpp = 1
tpp = textlogger.get_param_string('# threads per relevant subjob', tpp,
self.MSGWIDTH)
base_name = textlogger.get_param_string('File base name',
self.file_base_name,
self.MSGWIDTH)
no_open_shells = textlogger.get_param_string('No open shells',
self.no_open_shell,
self.MSGWIDTH)
conformational_search = textlogger.get_param_string('Conformational search', \
self.conformational_search, self.MSGWIDTH)
freezers = textlogger.get_param_string('Structure freezers', \
', '.join(self.freezers), self.MSGWIDTH)
inoculate = textlogger.get_param_string('Inoculate, i.e. draw from freezer, if', \
', '.join(self.inoculate), self.MSGWIDTH)
self.logger.info(HEADER)
self.logger.info('-' * len(HEADER))
self.logger.info(num_candidate_sts)
self.logger.info(population)
self.logger.info(structure_score_threshold)
self.logger.info(generations)
self.logger.info(crossovers)
self.logger.info(crossover_rate)
self.logger.info(mutators)
self.logger.info(mutation_rate)
self.logger.info(fragment_libs)
self.logger.info(elitism)
self.logger.info(no_minimize)
self.logger.info(selection)
self.logger.info(tournament_size)
self.logger.info(scaling)
self.logger.info(terminators)
self.logger.info(num_unproductive)
self.logger.info(evaluators)
self.logger.info(n_molecules)
self.logger.info(n_sim_subjobs)
self.logger.info(tpp)
self.logger.info(base_name)
self.logger.info(no_open_shells)
self.logger.info(conformational_search)
self.logger.info(freezers)
self.logger.info(inoculate)
self.logger.info('')
def initializeGenome(self):
"""
Initialize a genome.
:rtype: StructureGenome
:return: a genome
"""
genome = StructureGenome()
for initializator in self.initializers:
genome.initializator.add(initializator)
for atuple in self.crossovers:
crossover, weight = atuple
genome.crossover.add(crossover, weight)
for atuple in self.mutators:
mutator, weight = atuple
genome.mutator.add(mutator, weight)
if len(self.crossovers) > 1:
genome.crossover.setRandomApply(True)
if len(self.mutators) > 1:
genome.mutator.setRandomApply(True)
genome.evaluator.set(base_evaluator)
# set up freezer files
freezer_files = {}
if REMAINDER in self.freezers:
nremainder = len(self.initial_population) - self.population
my_random.shuffle(self.initial_population)
remainder_file = self.file_base_name + REMAINDER_FILE_EXT
awriter = structure.MaestroWriter(remainder_file)
for astructure in self.initial_population[-1 * nremainder:]:
awriter.append(astructure)
awriter.close()
freezer_files[REMAINDER] = [remainder_file]
self.initial_population = self.initial_population[:-1 * nremainder]
if FRAGMENT in self.freezers:
freezer_files[FRAGMENT] = self.fragment_libs
if PREVIOUS in self.freezers:
freezer_files[PREVIOUS] = []
# use genome.setParams for parameters shared by all of the genomes in
# a population, use an attribute for a property of that specific individual
genome.setParams(
initial_population=self.initial_population,
script_evaluator=self.script_evaluator,
eval_kwargs=self.eval_kwargs,
generation=0,
properties=self.properties,
no_minimize=self.no_minimize,
fragment_libs=self.fragment_libs,
no_open_shell=self.no_open_shell,
props_to_remove=self.props_to_remove,
freezer_files=freezer_files,
conformational_search=self.conformational_search,
structure_score_threshold=self.structure_score_threshold,
inoculate=self.inoculate,
class_evaluators=self.class_evaluators)
return genome
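# A minimal sketch of the setParams-vs-attribute convention noted above
# (illustrative only): values stored with genome.setParams(...) are shared by
# every genome cloned from this template and read back with getParam, e.g.
#     genome.setParams(generation=0)
#     genome.getParam('generation')        # -> 0, on any cloned genome
# whereas per-individual data is stored as a plain attribute, e.g.
#     genome.astructure = some_structure   # some_structure is hypothetical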
def initializeGA(self, genome):
"""
Initialize the genetic optimization.
:type genome: StructureGenome
:param genome: a genome
"""
self.ga_obj = GSimpleGA.GSimpleGA(genome,
seed=self.pyevolve_random_seed,
interactiveMode=False)
self.ga_obj.setParams(skips=self.skips,
failures=self.failures,
logger=self.logger,
properties=self.properties,
num_unproductive=self.num_unproductive,
terminated=False,
file_base_name=self.file_base_name,
jobbe=self.jobbe)
self.ga_obj.setGenerations(self.generations)
self.ga_obj.setPopulationSize(self.population)
self.ga_obj.setCrossoverRate(float(self.crossover_rate) / 100)
self.ga_obj.setMutationRate(float(self.mutation_rate) / 100)
# set selection protocol
self.ga_obj.selector.set(SELECTION_DICT[self.selection])
if MAX_GENERATIONS_TERM in self.terminators:
pass
elif ALL_PROPERTIES_TERM in self.terminators:
self.ga_obj.terminationCriteria.set(all_properties)
elif FIRST_PROPERTY_TERM in self.terminators:
self.ga_obj.terminationCriteria.set(first_property)
elif UNPRODUCTIVE_TERM in self.terminators:
self.ga_obj.terminationCriteria.set(unproductive)
self.ga_obj.setElitism(bool(self.elitism))
if self.elitism:
self.ga_obj.setElitismReplacement(self.elitism)
self.ga_obj.setSortType(Consts.sortType['scaled'])
self.ga_obj.setMinimax(Consts.minimaxType['maximize'])
# set full copy to True because in general the structure
# object of our genome will change inside the evaluator function
n_sim_subjobs = jobutils.get_procs()
self.ga_obj.setMultiProcessing(flag=True,
full_copy=True,
max_processes=n_sim_subjobs)
# set up callbacks
self.ga_obj.stepCallback.set(uniquify_titles_callback)
self.ga_obj.stepCallback.add(prepare_next_generation_dirs_callback)
self.ga_obj.stepCallback.add(manage_skips_callback)
self.ga_obj.stepCallback.add(manage_failures_callback)
self.ga_obj.stepCallback.add(logging_summary_callback)
self.ga_obj.stepCallback.add(molecule_history_callback)
# get initial population object
pop = self.ga_obj.getPopulation()
# set tournament size parameter on initial population
pop.setParams(tournamentPool=self.tournament_size)
# set scaling protocol on initial population
pop.scaleMethod.set(SCALING_DICT[self.scaling])
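# Minimal sketch of the rate handling above (module defaults assumed): the
# rates are supplied as percentages while PyEvolve expects fractions, so with
# CROSSOVER_RATE = MUTATION_RATE = 90.0 the calls reduce to
#     ga_obj.setCrossoverRate(90.0 / 100)   # i.e. 0.9
#     ga_obj.setMutationRate(90.0 / 100)    # i.e. 0.9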
def setMonomerGrowAtoms(self):
"""
Set the monomer grow atoms using the mark monomer module convention
rather than the polymer builder module convention.
"""
# see MATSCI-6080: for the density used in refractive index calculations,
# monomers marked using the mark monomer module convention are needed, so
# just make this the standard for monomer recognition in this module.
# Currently the treatment of polymer monomers in this module does not
# distinguish between head and tail atoms but rather simply considers both
# as general grow atoms, so just arbitrarily update using the head property
for st in self.initial_population:
for idx in rxn_channel.get_grow_idxs(st):
atom = st.atom[idx]
if rxn_channel.is_polymer_builder_grow_atom(atom) and \
not rxn_channel.is_mark_monomer_grow_atom(atom):
atom.property[msprops.ROLE_PROP] = msprops.HEAD
def runIt(self):
"""
Run the components of the genetic optimization.
"""
self.setRootLoggerForPyEvolve()
self.setOperatorNames()
self.checkInputParams()
self.setMonomerGrowAtoms()
self.random_seed = rxn_channel.init_random_seed(self.random_seed,
self.logger)
my_random.seed(self.random_seed)
self.pyevolve_random_seed = my_random.randint(RANDOM_INT_BOUND + 1)
if self.logger:
self.printProperties()
self.printParams()
genome = self.initializeGenome()
self.initializeGA(genome)
os.mkdir(EVALUATOR_JOBS_DIR)
os.mkdir(os.path.join(EVALUATOR_JOBS_DIR, GENER_SUBDIR + '0'))
self.ga_obj.evolve(freq_stats=0)
# if the optimization was not terminated in the strict sense
# but has simply run all requested generations then call
# some callbacks by hand
if not self.ga_obj.getParam('terminated'):
uniquify_titles_callback(self.ga_obj)
manage_skips_callback(self.ga_obj)
manage_failures_callback(self.ga_obj)
logging_summary_callback(self.ga_obj)
molecule_history_callback(self.ga_obj)
if self.logger:
self.logger.info('')
# log the skips
self.skips = self.ga_obj.getParam('skips')
total_skips = sum([len(skips) for skips in self.skips.values()])
if total_skips and self.logger:
print_bad_jobs(self.skips, self.logger, bad_type='skip')
# log the failures
self.failures = self.ga_obj.getParam('failures')
total_failures = sum(
[len(failures) for failures in self.failures.values()])
if total_failures and self.logger:
print_bad_jobs(self.failures, self.logger, bad_type='fail')
# create final output file
generation_log_files = []
for index in range(self.ga_obj.getCurrentGeneration() + 1):
afile = get_generation_log_file_name(self.file_base_name, index)
generation_log_files.append(afile)
output_file_name = get_output_file_name(self.file_base_name)
with wam.WorkflowMenuMaestroWriter(
output_file_name,
wam.WorkflowType.MS_GENETIC_OPTIMIZATION) as output_writer:
for astructure in structure.MultiFileStructureReader(
generation_log_files):
jobutils.set_source_path(astructure)
output_writer.append(astructure)
# log that this genetic optimization is complete
if self.logger:
self.logger.info('This genetic optimization is now complete.')
self.logger.info('')
def get_output_file_name(basename):
"""
Get the output file name from the basename.
:type basename: str
:param basename: base name to use
:rtype: str
:return: output_file_name, name of output file
"""
return basename + OUT_MAE_EXT
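# Illustrative example: with OUT_MAE_EXT equal to '-out.mae',
#     get_output_file_name('genetic_opt')   # -> 'genetic_opt-out.mae'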
def get_generation_log_file_name(basename, generation):
"""
Get the generation log file name.
:type basename: str
:param basename: base name to use
:type generation: int
:param generation: the generation
:rtype: str
:return: generation_log_file_name, name of generation log file
"""
return basename + GENER_LOG_FILE_SEP + str(generation) + '.mae'
# evaluators
def get_structure_score(astructure,
properties,
conformational_search,
seed=None,
this_random=None):
"""
Return the structure score for the provided structure.
:type astructure: schrodinger.structure.Structure
:param astructure: the structure to score
:type properties: list of Property
:param properties: the properties used in scoring
:type conformational_search: bool or str
:param conformational_search: specifies whether a MacroModel conformational
search will be performed prior to evaluation; when a string, it specifies
a simplified MacroModel input file containing extra options
:type seed: int or None
:param seed: random seed used in conformational search or None
if conformational search is not being done
:type this_random: numpy.random.RandomState or None
:param this_random: random state, if None use the module constant
:rtype: float
:return: the structure score
"""
# the properties will always have the same HOST_STR so just use
# the first property
host_str = properties[0].class_kwargs[ClassEvaluator.HOST_STR]
this_random = this_random or my_random
score = astructure.property.get(STRUCTURE_SCORE_KEY)
if score is not None:
return score
# it is ok to pass on the exception as the input file will be there
# and we are simply marking any failed jobs with a structure property
if conformational_search:
if conformational_search is True:
macromodel_options_file = None
else:
macromodel_options_file = conformational_search
try:
get_low_energy_conformers(
astructure,
macromodel_options_file=macromodel_options_file,
overwrite=True,
seed=seed,
this_random=this_random,
host_str=host_str)
except ValueError as msg:
pass
obj = StructureEvaluator([astructure], properties)
obj.runIt()
score = 0
for aproperty in properties:
if aproperty.name not in StructureEvaluator.PROPERTIES:
continue
indiv_score = astructure.property[aproperty.key]
if aproperty.minimax == Property.MAX:
score += aproperty.weight * indiv_score
elif aproperty.minimax == Property.MIN:
score += aproperty.weight * -1 * indiv_score
else:
if aproperty.comparator == Property.EQUALS:
try:
score_tmp = aproperty.weight / abs(indiv_score -
aproperty.target)
except ZeroDivisionError:
score_tmp = INFINITE_SCORE
score += score_tmp
elif aproperty.comparator == Property.GREATER_THAN:
if indiv_score <= aproperty.target:
score += aproperty.weight * (indiv_score - aproperty.target)
elif aproperty.comparator == Property.LESS_THAN:
if indiv_score >= aproperty.target:
score += aproperty.weight * (aproperty.target - indiv_score)
astructure.property[STRUCTURE_SCORE_KEY] = score
return score
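# Worked example of the scoring above (illustrative numbers only): for an
# individual property value of 3.5 and a weight of 2.0,
#     Property.MAX                           -> contribution  2.0 * 3.5 =  7.0
#     Property.MIN                           -> contribution -2.0 * 3.5 = -7.0
#     Property.EQUALS with target 4.0        -> contribution  2.0 / |3.5 - 4.0| = 4.0
#     Property.GREATER_THAN with target 4.0  -> 3.5 <= 4.0, penalty 2.0 * (3.5 - 4.0) = -1.0
#     Property.LESS_THAN with target 3.0     -> 3.5 >= 3.0, penalty 2.0 * (3.0 - 3.5) = -1.0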
def structure_evaluator(genome, this_random=None):
"""
This is the structure evaluator.
:type genome: StructureGenome
:param genome: a genome
:type this_random: numpy.random.RandomState or None
:param this_random: random state, if None use the module constant
:rtype: float
:return: the score for this individual
"""
this_random = this_random or my_random
conf_seed = get_random_csearch_seed(this_random=this_random)
freezer_seed = this_random.randint(RANDOM_INT_BOUND + 1)
freezer_files = genome.getParam('freezer_files')
crossover_applied = genome.astructure.property[CROSSOVER_APPLIED_KEY]
mutation_applied = genome.astructure.property[MUTATION_APPLIED_KEY]
properties = genome.getParam('properties')
structure_score_threshold = genome.getParam('structure_score_threshold')
# do the conformational search in the evaluator rather than at the end of
# the operators so that it can be done in parallel over individuals, but
# only if this genome came from an operation; i.e. if this genome came
# directly from the previous generation and skipped the operation, either
# because it was elite or because of the rates of the operators, then do
# not perform the conformational search, for reasons of consistency (for
# example, an elite individual could change its status due to the
# stochastic nature of the search); include the conformational search step
# in this structure evaluator, as well as in the base evaluator, in case we
# implement a structure-based property that depends on conformation
conformational_search = genome.getParam('conformational_search')
# structure titles coming into here look like either (1)
# <stoichiometry>.<basename_ext> if the structure passed through
# the operators or (2) <stoichiometry>_%d_%d if it did not
# (either because of elitism or the rates of operators)
if genome.astructure.title.count('_'):
conformational_search = False
basename = genome.astructure.title.split('_')[0] + STOICH_BASE_EXT_SEP + \
genome.basename_ext
genome.astructure.title = basename
score = get_structure_score(genome.astructure,
properties,
conformational_search,
seed=conf_seed,
this_random=this_random)
if score < structure_score_threshold and \
BAD_STRUCTURE in genome.getParam('inoculate'):
astructure = get_freezer_structure(
freezer_files,
structure_score_threshold=structure_score_threshold,
properties=properties,
conformational_search=genome.getParam('conformational_search'),
inoculate=BAD_STRUCTURE,
crossover_applied=crossover_applied,
mutation_applied=mutation_applied,
basename_ext=genome.basename_ext,
seed=freezer_seed,
this_random=this_random)
if astructure:
if astructure.property[STRUCTURE_SCORE_KEY] > score:
score = astructure.property[STRUCTURE_SCORE_KEY]
genome.astructure = astructure
return score
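# Illustrative title examples for the convention described above (hypothetical
# stoichiometries, assuming the '.' separator is STOICH_BASE_EXT_SEP):
#     'C10H8_2_3'   -> did not pass through an operator (elitism or operator
#                      rates); the conformational search is skipped and the
#                      title is reset to 'C10H8' + '.' + genome.basename_ext
#     'C10H8.frag1' -> passed through an operator; the conformational search,
#                      if requested, is performed before scoring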
def base_evaluator(genome):
"""
This is the base evaluator used to wrap all other evaluators.
:type genome: StructureGenome
:param genome: a genome
:rtype: float
:return: the score for this individual
"""
# the properties will always have the same HOST_STR so just use
# the first property
host_str = genome.getParam('properties')[0].class_kwargs[
ClassEvaluator.HOST_STR]
this_random = numpy.random.RandomState()
this_random.seed(genome.seed)
class_evaluators = genome.getParam('class_evaluators')
if class_evaluators.get(StructureEvaluator, None):
structure_score = structure_evaluator(genome, this_random=this_random)
else:
structure_score = None
structure_score_threshold = genome.getParam('structure_score_threshold')
conf_seed = get_random_csearch_seed(this_random=this_random)
# structure titles coming into here look like either (1)
# <stoichiometry>.<basename_ext> if the structure passed through
# the operators or (2) <stoichiometry>_%d_%d if it did not
# (either because of elitism or the rates of operators)
if genome.astructure.title.count('_'):
operated_on = False
basename = genome.astructure.title.split('_')[0] + STOICH_BASE_EXT_SEP + \
genome.basename_ext
genome.astructure.title = basename
else:
operated_on = True
basename = genome.astructure.title
# do the conformational search in the evaluator rather than at the end of
# the operators so that it can be done in parallel over individuals; if the
# structure_evaluator was not first called then get the lowest energy
# conformer now, but only if this genome came from an operation, i.e. if
# this genome came directly from the previous generation and skipped the
# operation, either because it was elite or because of the rates of the
# operators, then do not perform the conformational search, for reasons of
# consistency (for example, an elite individual could change its status due
# to the stochastic nature of the search); it is ok to pass on the
# exception as the input file will be there and we are simply marking any
# failed jobs with a structure property
conformational_search = genome.getParam('conformational_search')
if operated_on and conformational_search and structure_score is None:
if conformational_search is True:
macromodel_options_file = None
else:
macromodel_options_file = conformational_search
try:
get_low_energy_conformers(
genome.astructure,
macromodel_options_file=macromodel_options_file,
overwrite=True,
seed=conf_seed,
this_random=this_random,
host_str=host_str)
except ValueError as msg:
pass
# make subdirectory to hold this evaluation
current_cwd = os.getcwd()
generation = genome.getParam('generation')
generation_dir = os.path.join(current_cwd, EVALUATOR_JOBS_DIR,
GENER_SUBDIR + str(generation))
tmp_dir = os.path.join(generation_dir, basename)
os.mkdir(tmp_dir)
# put an input Maestro file for this individual in the subdirectory
infile = basename + IN_MAE_EXT
genome.infile = infile
genome.astructure.write(os.path.join(tmp_dir, infile))
# get and call the actual evaluator which returns the
# JobDJ
script_evaluator = genome.getParam('script_evaluator')
script_jobdj = script_evaluator(genome)
# handle skipped jobs
if not (structure_score is None or
structure_score >= structure_score_threshold):
SKIP_MSG = (
'Structure score is %.2f which is less than the %.2f threshold and '
'is therefore being skipped.')
with open(os.path.join(tmp_dir, basename + '-skipped.txt'),
'w') as skip_file:
skip_file.write(SKIP_MSG %
(structure_score, structure_score_threshold))
genome.skip = Skip(
genome, SKIP_MSG % (structure_score, structure_score_threshold))
return BAD_SCORE
# run the script based properties
script_props = [
aproperty for aproperty in genome.getParam('properties')
if aproperty.isScriptProperty()
]
if script_props:
with fileutils.chdir(tmp_dir):
script_jobdj.run()
outfile = os.path.join(tmp_dir, genome.outfile)
if not script_jobdj.isComplete() or script_jobdj.total_failed:
FAIL_MSG = ('Subjob has a bad exit status.')
elif not os.path.exists(outfile):
FAIL_MSG = ('Subjob output summary Maestro file can\'t be found.')
else:
FAIL_MSG = None
if FAIL_MSG:
genome.failure = Failure(genome, FAIL_MSG)
return BAD_SCORE
else:
outfile = os.path.join(tmp_dir, genome.infile)
opt_out_st = structure.Structure.read(outfile)
# run the class based properties
class_props = []
for class_evaluator, properties in class_evaluators.items():
if class_evaluator == StructureEvaluator:
continue
class_props.extend(properties)
obj = class_evaluator([opt_out_st], properties)
try:
with fileutils.chdir(tmp_dir):
obj.runIt()
except RuntimeError as err:
genome.failure = Failure(genome, str(err))
return BAD_SCORE
# process a successful job by tabulating the total score, or return a failure;
# only consider properties that are not structure properties, as those are
# handled in structure_evaluator because they do not require an external
# calculation and are therefore much easier to manage
properties = script_props + class_props
score = 0
if structure_score:
score += structure_score
for aproperty in properties:
key, minimax, target, comparator, weight = aproperty.key, \
aproperty.minimax, aproperty.target, aproperty.comparator, \
aproperty.weight
indiv_score = opt_out_st.property.get(key)
if indiv_score is None:
FAIL_MSG = ('Subjob output summary Maestro file has no key: %s.')
genome.failure = Failure(genome, FAIL_MSG % key)
return BAD_SCORE
else:
# use simple linear and hyperbolic functions
if minimax == Property.MAX:
score += weight * indiv_score
elif minimax == Property.MIN:
score += weight * -1 * indiv_score
else:
if comparator == Property.EQUALS:
try:
score_tmp = weight / abs(indiv_score - target)
except ZeroDivisionError:
score_tmp = INFINITE_SCORE
score += score_tmp
elif comparator == Property.GREATER_THAN:
if indiv_score <= target:
score += weight * (indiv_score - target)
elif comparator == Property.LESS_THAN:
if indiv_score >= target:
score += weight * (target - indiv_score)
# this is how the genome is changed during evaluation
genome.astructure = opt_out_st
return score
def optoelectronics_evaluator(genome):
"""
Run an optoelectronics job.
:type genome: StructureGenome
:param genome: a genome
:rtype: JobDJ
:return: the JobDJ object for this individual,
it is run in the base evaluator
"""
# the properties will always have the same HOST_STR so just use
# the first property
host_str = genome.getParam('properties')[0].class_kwargs[
ClassEvaluator.HOST_STR]
# handle kwargs and input and output files
eval_kwargs = genome.getParam('eval_kwargs')
infile = genome.infile
basename = infile.split(IN_MAE_EXT)[0]
outfile = basename + OUT_MAE_EXT
genome.outfile = outfile
# build args containing user-specified kwargs plus files; for files shared
# by all sub-jobs just use a relative path (always three sub-dirs deep) to
# the top-level directory
args = []
for key, value in eval_kwargs.items():
args.append(key)
if key == '-settings':
path = EVALUATOR_RELATIVE_DIR + [value]
args.append(os.path.join(*path))
else:
args.extend(value.split(' '))
args.extend([infile, outfile])
# build the actual JobDJ object for this individual; the actual evaluation
# is run on the same node as the driver and from this individual's
# subdirectory
PATH_ARGS = [
'..', '..', 'python', 'scripts', 'optoelectronics_gui_dir',
'optoelectronics_driver.py'
]
SCRIPTPATH = hunt('mmshare', dir='exec')
SCRIPTPATH = os.path.join(SCRIPTPATH, *PATH_ARGS)
cmd = [SCRIPTPATH] + args
job_dj = jobutils.create_queue(host=host_str)
jc_job = jobutils.RobustSubmissionJob(cmd, name=basename)
job_dj.addJob(jc_job)
return job_dj
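# Illustrative sketch of the command assembly above (hypothetical kwargs and
# file names): with eval_kwargs = {'-settings': 'extra_opts.in', '-opt': '1'}
# and infile 'C10H8.frag1-in.mae', args becomes
#     ['-settings', os.path.join(*(EVALUATOR_RELATIVE_DIR + ['extra_opts.in'])),
#      '-opt', '1', 'C10H8.frag1-in.mae', 'C10H8.frag1-out.mae']
# and the optoelectronics driver path is prepended to form cmd.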
def structure_is_open_shell(astructure, ignore_charge=True):
"""
Return True if the provided structure is open shell,
i.e. has an odd number of electrons.
:type astructure: schrodinger.structure.Structure
:param astructure: the structure in question
:type ignore_charge: bool
:param ignore_charge: if True then ignore any
structure.formal_charge settings
:rtype: bool
:return: True if the provided structure is open shell,
False otherwise
"""
nelectrons = sum([aatom.atomic_number for aatom in astructure.atom])
if not ignore_charge:
nelectrons -= astructure.formal_charge
return bool(nelectrons % 2)
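# Worked example: a methyl radical (CH3) has 6 + 3 * 1 = 9 electrons when the
# formal charge is ignored, so structure_is_open_shell(ch3_st) returns True,
# while methane (CH4, 10 electrons) returns False; ch3_st is a hypothetical
# schrodinger.structure.Structure.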
def get_element_histogram(astructure):
"""
Return a dictionary where keys are elements and values
are the numbers of atoms of a given element.
:type astructure: schrodinger.structure.Structure
:param astructure: the structure in question
:rtype: dict
:return: dictionary with element histogram, keys are elements
(strs) and values are numbers (ints)
"""
pattern = re.compile(r'([A-Z][a-z]?)(\d*)')
formula = analyze.generate_molecular_formula(astructure)
matches = re.findall(pattern, formula)
elements, numbers = list(zip(*matches))
numbers = list(numbers)
while '' in numbers:
numbers[numbers.index('')] = '1'
numbers = tuple([int(number) for number in numbers])
return dict(zip(elements, numbers))
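# Illustrative example (assuming the generated formula is the string 'C6H5Cl'):
#     get_element_histogram(chlorobenzene_st)   # -> {'C': 6, 'H': 5, 'Cl': 1}
# element symbols without an explicit count default to 1; chlorobenzene_st is
# a hypothetical Structure.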
def remove_basename_ext(stoich_ext):
"""
Remove the basename extension from the given string and return
the remainder which is the stoichiometry. Do this instead of
having to recompute the stoichiometry which can be expensive.
:type stoich_ext: str
:param stoich_ext: contains the stoichiometry and basename extension
:rtype: str
:return: stoichiometry
"""
return stoich_ext.split(STOICH_BASE_EXT_SEP)[0]
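# Illustrative example (assuming STOICH_BASE_EXT_SEP is '.'):
#     remove_basename_ext('C10H8.frag1')   # -> 'C10H8'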
def get_random_structure(structure_libs,
tries_from_libs=TRIES_FROM_LIBS,
structure_score_threshold=None,
properties=None,
conformational_search=CONFORMATIONAL_SEARCH,
seed=None,
this_random=None):
"""
From the given dictionary of libraries return a random structure.
:type structure_libs: dict
:param structure_libs: keys are strings specifying the types of libraries
to be used and can be module constants from FREEZER_CHOICES.keys(), values
are lists of libraries by type and can be either module constants from
FRAGMENT_LIBS.keys(), ALL, or the names of Maestro files (including the
file extensions)
:type tries_from_libs: int
:param tries_from_libs: the number of times to try before giving up
:type structure_score_threshold: float or None
:param structure_score_threshold: specifies that a structure with a
structure score greater than or equal to this threshold is sought; the best
of the considered structures will be returned and will contain several
structure properties related to the scoring
:type properties: list of Property or None
:param properties: the properties used in structure scoring
:type conformational_search: bool or str
:param conformational_search: specifies whether a MacroModel conformational
search will be performed prior to evaluation; when a string, it specifies
a simplified MacroModel input file containing extra options
:type seed: int or None
:param seed: if not None specifies that random should be reseeded with
the given value
:type this_random: numpy.random.RandomState or None
:param this_random: random state, if None use the module constant
:rtype: schrodinger.structure.Structure or None
:return: the random structure or None if one couldn't be found
"""
this_random = this_random or my_random
if seed is not None:
this_random.seed(seed)
structure_libs_copy = structure_libs.copy()
if FRAGMENT in structure_libs_copy:
fragments = structure_libs_copy[FRAGMENT]
if ALL in fragments:
fragments = set(fragments)
fragments.update(set(FRAGMENT_LIBS))
fragments.remove(ALL)
fragments = list(fragments)
fragments.sort()
structure_libs_copy[FRAGMENT] = fragments
# if we are on the zeroth generation then we have no previous
# structures and if that was the only freezer requested then
# we will not be able to find a structure
if PREVIOUS in structure_libs_copy:
if not structure_libs_copy[PREVIOUS]:
del structure_libs_copy[PREVIOUS]
if not structure_libs_copy:
return
num_unproductive = 0
found = False
structures_and_scores = []
while not found:
if num_unproductive == tries_from_libs:
if structures_and_scores:
best_structure, best_score = \
sorted(structures_and_scores, key=lambda x: x[1]).pop()
else:
best_structure = None
return best_structure
# the fragment library has not been checked so do that on-the-fly,
# we are ignoring genome.getParam('no_open_shell') here because that
# is not exposed and always True
structure_lib_type = this_random.choice(list(structure_libs_copy))
if structure_lib_type == FRAGMENT:
closed_shell = simple_bonds = True
else:
closed_shell = simple_bonds = False
structure_lib = this_random.choice(
structure_libs_copy[structure_lib_type])
# if the library type is fragment then the structure_lib can be
# a key from FRAGMENT_LIBS.keys() rather than an explicit file name
structure_file = FRAGMENT_LIBS.get(structure_lib)
if not structure_file:
structure_file = structure_lib
num_structures = structure.count_structures(structure_file)
index = this_random.randint(1, high=num_structures + 1)
astructure = structure.Structure.read(structure_file, index=index)
# see MATSCI-7256 where a fragment library may contain molecules that
# feature dummy atoms used as attachment site markers
for atom in astructure.atom:
if atom.atomic_number == -2 and atom.bond_total == 1:
atom.atomic_number = 1
if closed_shell:
if structure_is_open_shell(astructure, ignore_charge=True):
num_unproductive += 1
continue
if simple_bonds:
if not get_num_simple_bonds(astructure):
num_unproductive += 1
continue
# set some properties, remove grow names and PDB atom and
# residue names that may be present in database structures
# (this is just to prevent the auto-labeling when importing
# structures into Maestro)
astructure.property[FROM_FREEZER_KEY] = structure_lib_type
astructure.property[CROSSOVER_PARENTS_KEY] = ''
astructure.property[MUTATION_PARENT_KEY] = ''
astructure.property[CROSSOVER_APPLIED_KEY] = ''
astructure.property[MUTATION_APPLIED_KEY] = ''
if structure_lib_type == FRAGMENT:
for aatom in astructure.atom:
aatom.property[GROW_NAME_KEY] = ''
aatom.property[PDB_ATOM_NAME_KEY] = ''
aatom.property[PDB_RES_NAME_KEY] = ''
if structure_score_threshold is not None:
set_title_to_stoichiometry(astructure)
conf_seed = get_random_csearch_seed(this_random=this_random)
structure_score = get_structure_score(astructure,
properties,
conformational_search,
seed=conf_seed,
this_random=this_random)
structures_and_scores.append((astructure, structure_score))
if structure_score < structure_score_threshold:
num_unproductive += 1
continue
found = True
return astructure
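# Minimal usage sketch (hypothetical library names): structure_libs maps
# freezer types to lists of libraries, e.g.
#     structure_libs = {FRAGMENT: ['organic_fragments'],
#                       PREVIOUS: ['previous_generation.mae']}
#     st = get_random_structure(structure_libs, tries_from_libs=5)
# where 'organic_fragments' stands in for a key from FRAGMENT_LIBS and the
# Maestro file name is illustrative only.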
def get_freezer_structure(structure_libs,
tries_from_libs=TRIES_FROM_LIBS,
structure_score_threshold=None,
properties=None,
conformational_search=CONFORMATIONAL_SEARCH,
inoculate=NO_CHILD,
crossover_applied=None,
mutation_applied=None,
basename_ext=None,
seed=None,
this_random=None):
"""
Return a random structure from the freezer and update that structure's
properties.
:type structure_libs: dict
:param structure_libs: keys are strings specifying the types of libraries
to be used and can be module constants from FREEZER_CHOICES.keys(), values
are lists of libraries by type and can be either module constants from
FRAGMENT_LIBS.keys(), ALL, or the names of Maestro files (including the
file extensions)
:type tries_from_libs: int
:param tries_from_libs: the number of times to try before giving up
:type structure_score_threshold: float or None
:param structure_score_threshold: specifies that a structure with a
structure score greater than or equal to this threshold is sought; the best
of the considered structures will be returned and will contain several
structure properties related to the scoring
:type properties: list of Property or None
:param properties: the properties used in structure scoring
:type conformational_search: bool or str
:param conformational_search: specifies whether a MacroModel conformational
search will be performed prior to evaluation; when a string, it specifies
a simplified MacroModel input file containing extra options
:type inoculate: str
:param inoculate: specify the reason for drawing from the freezer, which
is an inoculate option from INOCULATE_CHOICES
:type crossover_applied: str or None
:param crossover_applied: specify the intended crossover operator or None
if there isn't to be one
:type mutation_applied: str or None
:param mutation_applied: specify the intended mutation operator or None
if there isn't to be one
:type basename_ext: str or None
:param basename_ext: specify an extension to append to the stoichiometry
which is used to set the title of the returned structure
:type seed: int or None
:param seed: if not None specifies that random should be reseeded with
the given value
:type this_random: numpy.random.RandomState or None
:param this_random: random state, if None use the module constant
:rtype: schrodinger.structure.Structure or None
:return: the random structure or None if one couldn't be found
"""
this_random = this_random or my_random
astructure = get_random_structure(
structure_libs,
tries_from_libs=tries_from_libs,
structure_score_threshold=structure_score_threshold,
properties=properties,
conformational_search=conformational_search,
seed=seed,
this_random=this_random)
if not astructure:
return
astructure.property[INOCULATE_KEY] = inoculate
if crossover_applied:
astructure.property[CROSSOVER_APPLIED_KEY] = crossover_applied
if mutation_applied:
astructure.property[MUTATION_APPLIED_KEY] = mutation_applied
if basename_ext is None:
basename_ext = ''
set_title_to_stoichiometry(astructure, toappend=basename_ext)
return astructure