"""
Classes and functions for finding MECPs.
Copyright Schrodinger, LLC. All rights reserved."""
import argparse
import math
import os
import random
from collections import OrderedDict
from past.utils import old_div
import numpy
import scipy.constants
import scipy.optimize
from schrodinger import structure
from schrodinger.application.jaguar.input import JaguarInput
from schrodinger.application.jaguar.output import JaguarOutput
from schrodinger.application.jaguar.output import restart_name
from schrodinger.application.matsci import jobutils
from schrodinger.application.matsci import jaguarworkflows
from schrodinger.application.matsci import msutils
from schrodinger.application.matsci import textlogger
from schrodinger.job import jobcontrol
from schrodinger.job import queue
from schrodinger.structutils import transform
from schrodinger.utils import cmdline
DEFAULT_CHARGE_1 = 0
DEFAULT_CHARGE_2 = 0
DEFAULT_MULTIP_1 = 3
DEFAULT_MULTIP_2 = 1
DEFAULT_STATE_1 = 0
DEFAULT_STATE_2 = 1
DEFAULT_DATA = [(DEFAULT_CHARGE_1, DEFAULT_MULTIP_1, DEFAULT_STATE_1), \
(DEFAULT_CHARGE_2, DEFAULT_MULTIP_2, DEFAULT_STATE_2)]
DEFAULT_SCF_GS = False
DEFAULT_GS_KWARGS = OrderedDict([('iuhf', 2), ('dftname', 'B3LYP'),
('basis', 'MIDIX'), ('isymm', 0),
('maxit', 100), ('nofail', 0), ('nops', 1),
('iacc', 1)])
DEFAULT_ES_KWARGS = OrderedDict([('itddft', 1), ('itda', 1)])
DEFAULT_KWARGS = OrderedDict(DEFAULT_GS_KWARGS)
DEFAULT_KWARGS.update(DEFAULT_ES_KWARGS)
RESERVED_JAGUAR_KEYS = ['multip', 'molchg', 'igeopt', 'itddft', \
'nroot', 'target', 'rsinglet', 'rtriplet']
DEFAULT_JAGUAR_OPTIONS = \
['%s=%s' % pair for pair in DEFAULT_KWARGS.items()]
DEFAULT_ITERATIONS = 50
DEFAULT_EPS = 0.0001
IDENTITY_HESS = 'identity'
DIAG_FWD_FIN_DIFF_GRAD_HESS = 'diag_fwd_fin_diff_grad'
HESS_CHOICES = [IDENTITY_HESS, DIAG_FWD_FIN_DIFF_GRAD_HESS]
DEFAULT_HESS = IDENTITY_HESS
PROJECTION = 'projection'
UPDATING = 'updating'
METHOD_CHOICES = [PROJECTION, UPDATING]
DEFAULT_METHOD = PROJECTION
DEFAULT_PERP_FACTOR = 100.0
DEFAULT_PARA_FACTOR = 1.0
DELTA_ENERGY = 'delta_energy'
MAX_FORCE = 'max_force'
RMS_FORCE = 'rms_force'
MAX_DISPLACEMENT = 'max_displacement'
RMS_DISPLACEMENT = 'rms_displacement'
DEFAULT_DELTA_ENERGY = -1.0E-6
DEFAULT_MAX_FORCE = 1.0E-4
DEFAULT_RMS_FORCE = 1.0E-4
DEFAULT_MAX_DISPLACEMENT = 1.0E-5
DEFAULT_RMS_DISPLACEMENT = 1.0E-5
DEFAULT_CONVERGENCE_PAIRS = [(DELTA_ENERGY, DEFAULT_DELTA_ENERGY),
(MAX_FORCE, DEFAULT_MAX_FORCE),
(RMS_FORCE, DEFAULT_RMS_FORCE),
(MAX_DISPLACEMENT, DEFAULT_MAX_DISPLACEMENT),
(RMS_DISPLACEMENT, DEFAULT_RMS_DISPLACEMENT)]
DEFAULT_CONVERGENCE_DICT = OrderedDict(DEFAULT_CONVERGENCE_PAIRS)
DEFAULT_CONVERGENCE = \
msutils.keyword_dict_to_string(DEFAULT_CONVERGENCE_DICT).split()
INPUT = 'input'
OPT_1 = 'opt_1'
OPT_2 = 'opt_2'
GUESS_GEOM_CHOICES = [INPUT, OPT_1, OPT_2]
DEFAULT_GUESS_GEOM = INPUT
DEFAULT_ALL_GEOMETRIES = False
DEFAULT_ALL_RESULTS = False
DEFAULT_PROPERTIES = False
DEFAULT_BASE_NAME = 'mecp'
DEFAULT_VERBOSE = False
LOCAL_HOST = 'localhost'
DEFAULT_HOST = LOCAL_HOST
DEFAULT_TPP = 1
DEFAULT_NPROC = 2
DEFAULT_NRESOURCES = DEFAULT_TPP * DEFAULT_NPROC
JOB_DJ_RETRIES = 0
MAE_EXT = '.mae'
IN_EXT = '.in'
OUT_EXT = '.out'
ALL_FILE_SUFFIX = '_out_all' + MAE_EXT
OUT_FILE_SUFFIX = '_out' + MAE_EXT
STATE_OPTIMIZATIONS = 'state_optimizations'
MSG_WIDTH = 100
DEFAULT_GEOMETRY_HEADER = 'Geometry / Ang.'
DEFAULT_DISPLACEMENTS_HEADER = 'Displacements / Ang.'
DEFAULT_ENERGY_HEADER = 'Energy / Hartree'
DEFAULT_FORCES_HEADER = 'Forces / Hartree/Ang.'
TITLE_KEY = 's_m_title'
MECP_ENERGY_1_KEY = 'r_matsci_E_MECP(State_1)/Hartree'
MECP_ENERGY_2_KEY = 'r_matsci_E_MECP(State_2)/Hartree'
BARRIER_KEY = 'r_matsci_%s_barrier(State_%s)/kcal/mol'
BARRIER_1_KEY = BARRIER_KEY % ('Forward', '1')
BARRIER_2_KEY = BARRIER_KEY % ('Reverse', '2')
DEFAULT_C1 = 1.0E-4
DEFAULT_C2 = 0.9
DEFAULT_MAX_STEP_SIZE = 50.0
DEFAULT_MIN_STEP_SIZE = 1.0E-8
DEFAULT_XTOL = 1.0E-14
# the following functions are not used in the MECP calculation
# but rather included as debug functions, to run with debug functions
# change DEBUG from None to a tuple of functions to call for states 1
# and 2, for example DEBUG = (rosen, rosen) or (quadratic_1, quadratic_2),
# the mocking is done at the deepest level to allow deep debugging, a typical
# input deck for a debug run looks like:
#
# $SCHRODINGER/run mecp.py -iterations 500 -verbose
# -input_file structure.mae
#
[docs]def rosen(x_vec):
"""
The Rosenbrock function. The target
solution should be all ones and the function
value here should be zero.
:type x_vec: numpy.array
:param x_vec: the solution vector (N X 1)
:rtype: float, numpy.array
:return: the function value and Jacobian (N X 1)
"""
return scipy.optimize.rosen(x_vec), scipy.optimize.rosen_der(x_vec)
[docs]def quadratic_1(x_vec):
"""
The x**2 + y**2 + 2*x function. The target solution
is (x = -1, y = 0) with f = -1. The minimum crossing
point with quadratic_2 is (x = -1/2, y = -1/2).
:type x_vec: numpy.array
:param x_vec: the solution vector (N X 1)
:rtype: float, numpy.array
:return: the function value and Jacobian (N X 1)
"""
x = x_vec[0]
y = x_vec[1]
energy = x**2 + y**2 + 2 * x
gradient = numpy.zeros(len(x_vec))
gradient[0] = 2 * x + 2
gradient[1] = 2 * y
return energy, gradient
[docs]def quadratic_2(x_vec):
"""
The x**2 + y**2 + 2*y function. The target solution
is (x = 0, y = -1) with f = -1. The minimum crossing
point with quadratic_1 is (x = -1/2, y = -1/2).
:type x_vec: numpy.array
:param x_vec: the solution vector (N X 1)
:rtype: float, numpy.array
:return: the function value and Jacobian (N X 1)
"""
x = x_vec[0]
y = x_vec[1]
energy = x**2 + y**2 + 2 * y
gradient = numpy.zeros(len(x_vec))
gradient[0] = 2 * x
gradient[1] = 2 * y + 2
return energy, gradient
[docs]def quadratic_3(x_vec):
"""
The x**2 + y**2 + 6*x function. The target solution
is (x = -3, y = 0) with f = -9. The minimum crossing
point with quadratic_2 is (x = -3/10, y = -9/10).
:type x_vec: numpy.array
:param x_vec: the solution vector (N X 1)
:rtype: float, numpy.array
:return: the function value and Jacobian (N X 1)
"""
x = x_vec[0]
y = x_vec[1]
energy = x**2 + y**2 + 6 * x
gradient = numpy.zeros(len(x_vec))
gradient[0] = 2 * x + 6
gradient[1] = 2 * y
return energy, gradient
[docs]def get_guess(nvar=33, amp=5, seed=123):
"""
Return a guess solution.
:type nvar: int
:param nvar: the number of variables
:type amp: int
:param amp: the amplitude of the set of
random numbers used in generating the guess
solution
:type seed: int
:param seed: seed for random
:rtype: numpy.array
:return: the guess geometry (N X 1)
"""
random.seed(seed)
return numpy.array([amp * random.random() for i in range(nvar)])
[docs]def set_e_and_g(astructure, jobs):
"""
Set the energies and gradients for a debug run.
:type astructure: `schrodinger.structure.Structure`
:param astructure: the structure containing the solution as coordinates
:type jobs: list
:param jobs: contains instances of JaguarJob
"""
x_n = matrix_to_vector(astructure.getXYZ())
for job, func in zip(jobs, DEBUG):
energy, gradient = func(x_n)
forces = -1 * vector_to_matrix(gradient)
job.debug_energy = energy
job.debug_forces = forces
job.jag_restart_input_file = None
DEBUG = None
[docs]class ParserWrapper(object):
"""
Manages the argparse module to parse user command line
arguments.
"""
[docs] def __init__(self, scriptname, description):
"""
Create a ParserWrapper instance and process it.
:type scriptname: str
:param scriptname: name of this script
:type description: str
:param description: description of this script
"""
name = '$SCHRODINGER/run ' + scriptname
self.parserobj = argparse.ArgumentParser(
prog=name,
description=description,
add_help=False,
formatter_class=argparse.ArgumentDefaultsHelpFormatter)
[docs] def loadIt(self):
"""
Load ParserWrapper with options.
"""
charge_1_help = """Specify the net molecular charge of one of
the two relevant states (refer to it as the first state
although the order doesn't matter)."""
self.parserobj.add_argument('-charge_1',
type=int,
default=DEFAULT_CHARGE_1,
help=charge_1_help)
charge_2_help = """Specify the net molecular charge of the
other relevant state (refer to it as the second state
although the order doesn't matter)."""
self.parserobj.add_argument('-charge_2',
type=int,
default=DEFAULT_CHARGE_2,
help=charge_2_help)
multip_1_help = """Specify the multiplicity of the first
state."""
self.parserobj.add_argument('-multip_1',
type=int,
default=DEFAULT_MULTIP_1,
help=multip_1_help)
multip_2_help = """Specify the multiplicity of the second
state."""
self.parserobj.add_argument('-multip_2',
type=int,
default=DEFAULT_MULTIP_2,
help=multip_2_help)
state_1_help = """Specify the first state using 0 for the ground
state and <N> for the N-th excited state. The indexing used
for the excited states is the same as that used with the
'nroot' and 'target' &gen section keywords for excited state
calculations with Jaguar."""
self.parserobj.add_argument('-state_1',
type=int,
default=DEFAULT_STATE_1,
help=state_1_help)
state_2_help = """Specify the second state using 0 for the ground
state and <N> for the N-th excited state. The indexing used
for the excited states is the same as that used with the
'nroot' and 'target' &gen section keywords for excited state
calculations with Jaguar."""
self.parserobj.add_argument('-state_2',
type=int,
default=DEFAULT_STATE_2,
help=state_2_help)
scf_gs_help = """Specify that the ground state of states with
non-singlet multiplicities be determined using an SCF rather than
as an excited state using a ground state singlet reference
state. Note that this option is currently automatically
enabled for all multiplicities except triplets."""
self.parserobj.add_argument('-scf_gs',
action='store_true',
help=scf_gs_help)
jaguar_options_help = """Specify Jaguar &gen section key-value pairs.
Each key-value pair should be separated from the next by whitespace
and each should be represented in terms of a '<key>=<value>'
pair. If neither of the given states are excited states then
the excited state options will be removed from the default Jaguar
options given here. This option has a default value of: %s."""
metavar = ('<key_1>=<value_1>', '<key_2>=<value_2>')
self.parserobj.add_argument('-jaguar_options',
nargs='*',
type=str,
metavar=metavar,
default=DEFAULT_JAGUAR_OPTIONS,
help=jaguar_options_help %
' '.join(DEFAULT_JAGUAR_OPTIONS))
iterations_help = """Specify the maximum number of MECP geometry
optimization iterations."""
self.parserobj.add_argument('-iterations',
type=int,
default=DEFAULT_ITERATIONS,
help=iterations_help)
eps_help = """Specify the step size, in units of Angstrom, to use
in any finite difference approximations."""
self.parserobj.add_argument('-eps',
type=float,
default=DEFAULT_EPS,
help=eps_help)
init_hess_help = """Specify what to use for the initial guess Hessian
used in the BFGS optimization. The choice %s is the identity matrix.
The choice %s uses a diagonal forward finite difference of the
MECP gradient. Finite difference approximations use the option
-eps to specify the step size."""
self.parserobj.add_argument(
'-init_hess',
type=str,
choices=HESS_CHOICES,
default=DEFAULT_HESS,
help=init_hess_help % (IDENTITY_HESS, DIAG_FWD_FIN_DIFF_GRAD_HESS))
method_help = """Specify the method to use for determining the MECP.
The choice %s is the gradient projection method where the MECP
gradient includes a term for the gradient difference vector plus
a term in which the gradient difference vector is projected out
of the gradient averaged over the two states. The choice %s is
the updating branching plane method and is analogous to choice %s
except that a vector that is both orthogonal to the gradient
difference vector and on the branching plane is additionally
projected out of the gradient averaged over the two states."""
self.parserobj.add_argument('-method',
type=str,
choices=METHOD_CHOICES,
default=DEFAULT_METHOD,
help=method_help %
(PROJECTION, UPDATING, PROJECTION))
perp_factor_help = """Specify the prefactor, in units of inverse
Hartree, of the energy term for which the gradient is perpendicular
to the crossing seam."""
self.parserobj.add_argument('-perp_factor',
type=float,
default=DEFAULT_PERP_FACTOR,
help=perp_factor_help)
para_factor_help = """Specify the unitless prefactor of the energy
term for which the gradient is parallel to the crossing seam. Note
that setting this value to 0 reduces the MECP search to that of
an arbitrary crossing point rather than a minimum energy crossing
point. The arbitrary point may or may not be the minimum energy
point as it will depend on the initial geometry."""
self.parserobj.add_argument('-para_factor',
type=float,
default=DEFAULT_PARA_FACTOR,
help=para_factor_help)
convergence_help = """Specify various convergence criteria for the
MECP geometry optimization. Options include (1) %s, which specifies
a target energy decrease between the k and k-1 iterations (default of
%s Hartree), (2) %s, which specifies a target maximum magnitude
element of the Cartesian forces matrix (default of %s Hartree/Ang.),
(3) %s, which specifies a target RMS Cartesian force (default of %s
Hartree/Ang.), (4) %s, which specifies a target maximum magnitude element
of the Cartesian displacement matrix between the k and k-1 iterations
(default of %s Ang.), and (5) %s, which specifies a target RMS Cartesian
displacement (default of %s Ang.). Input should be whitespace
separated <key>=<value> pairs. The MECP geometry optimization will
terminate once any one of these conditions are met."""
self.parserobj.add_argument(
'-convergence',
nargs='*',
type=str,
default=DEFAULT_CONVERGENCE,
help=convergence_help %
(DELTA_ENERGY, DEFAULT_DELTA_ENERGY, MAX_FORCE, DEFAULT_MAX_FORCE,
RMS_FORCE, DEFAULT_RMS_FORCE, MAX_DISPLACEMENT,
DEFAULT_MAX_DISPLACEMENT, RMS_DISPLACEMENT,
DEFAULT_RMS_DISPLACEMENT))
guess_geom_help = """Specify the initial geometry used in the
MECP geometry optimization. The choices are %s, which is
the input geometry, %s, which will do a geometry optimization
for the first state and use its optimized geometry, and %s,
which will do a geometry optimization for the second state
and use its optimized geometry."""
self.parserobj.add_argument('-guess_geom',
type=str,
choices=GUESS_GEOM_CHOICES,
default=DEFAULT_GUESS_GEOM,
help=guess_geom_help %
(INPUT, OPT_1, OPT_2))
all_geometries_help = """Use this option to report all geometries
in the output from this script, i.e. the geometries
from all steps of the MECP geometry optimization will be reported
in the output file."""
self.parserobj.add_argument('-all_geometries',
action='store_true',
help=all_geometries_help)
all_results_help = """Use this option to copy all subdirectories
containing results from intermediate Jaguar force, etc.
calculations back to the launch host."""
self.parserobj.add_argument('-all_results',
action='store_true',
help=all_results_help)
properties_help = """Use this option to report properties on
the final MECP structure. These properties include (1) and (2)
the energy differences between the energies at the MECP less
the energies at the optimized geometries of the two states."""
self.parserobj.add_argument('-properties',
action='store_true',
help=properties_help)
base_name_help = """Specify a base name to use for naming job-related
files, etc."""
self.parserobj.add_argument('-base_name',
type=str,
default=DEFAULT_BASE_NAME,
help=base_name_help)
input_file_help = """Specify a Maestro file containing a single
input structure."""
self.parserobj.add_argument('-input_file',
type=str,
required=True,
help=input_file_help)
self.parserobj.add_argument('-verbose',
action='store_true',
help='Turn on verbose printing.')
self.parserobj.add_argument('-help',
'-h',
action='help',
default=argparse.SUPPRESS,
help='Show this help message and exit.')
jc_options = [cmdline.HOST, cmdline.NOJOBID, cmdline.SAVE, \
cmdline.WAIT]
cmdline.add_jobcontrol_options(self.parserobj, options=jc_options)
tpp_help = """Specify the number of threads to use for parallelizing
any intermediate Jaguar calculations."""
self.parserobj.add_argument('-TPP',
type=int,
default=DEFAULT_TPP,
dest='tpp',
help=tpp_help)
nresources_help = """Specify the total number of resources for Jaguar
calculations, i.e. the number of threads (see option -TPP) * the
number of processors. Here the number of processors means the number
of simultaneous Jaguar calculations each of which uses -TPP
threads. For MECP calculations this value can only be 1 or 2."""
self.parserobj.add_argument('-NPROC',
type=int,
default=DEFAULT_NRESOURCES,
dest='nresources',
help=nresources_help)
[docs] def parseArgs(self, args):
"""
Parse the command line arguments.
:type args: tuple
:param args: command line arguments
"""
self.options = self.parserobj.parse_args(args)
[docs]def get_final_state(state, scf_gs, multiplicity):
"""
Return the final state.
:type state: int
:param state: the electronic state, 0 is the ground state
and <N> is the N-th excited state
:type scf_gs: bool
:param scf_gs: specify whether ground states should be
determined using an SCF
:type multiplicity: int
:param multiplicity: molecular multiplicity
:rtype: int or None
:return: the final state or None if there isn't one
"""
if not scf_gs and multiplicity != 1:
final_state = state + 1
elif state:
final_state = state
else:
final_state = None
return final_state
[docs]class JaguarJob(object):
"""
Manages a Jaguar job.
"""
KEYS = ['idx', 'base_name', 'charge', 'multiplicity', 'state', 'scf_gs', \
'kwargs', 'mae_input_file', 'jag_input_file', 'jag_output_file', \
'jag_restart_input_file', 'in_template', 'job']
[docs] def __init__(self,
idx=None,
base_name=None,
charge=None,
multiplicity=None,
state=None,
scf_gs=None,
kwargs=None,
mae_input_file=None,
jag_input_file=None,
jag_output_file=None,
jag_restart_input_file=None,
in_template=None,
job=None):
"""
Create an instance.
:type idx: int
:param idx: the index
:type base_name: str
:param base_name: the base name
:type charge: int
:param charge: net molecular charge
:type multiplicity: int
:param multiplicity: molecular multiplicity
:type state: int
:param state: the electronic state, 0 is the ground state
and <N> is the N-th excited state
:type scf_gs: bool
:param scf_gs: specify whether ground states should be
determined using an SCF
:type kwargs: dict
:param kwargs: dictionary of Jaguar &gen section key-value
pairs
:type mae_input_file: str
:param mae_input_file: Maestro input file
:type jag_input_file: str
:param jag_input_file: Jaguar input file
:type jag_output_file: str
:param jag_output_file: Jaguar output file
:type jag_restart_input_file: str
:param jag_restart_input_file: Jaguar restart input file
:type in_template: str
:param in_template: a Jaguar input file to use as template
:type job: queue.JobControlJob
:param job: job object
"""
self.idx = idx
self.base_name = base_name
self.charge = charge
self.multiplicity = multiplicity
self.state = state
self.scf_gs = scf_gs
self.kwargs = kwargs
self.mae_input_file = mae_input_file
self.jag_input_file = jag_input_file
self.jag_output_file = jag_output_file
self.jag_restart_input_file = jag_restart_input_file
self.in_template = in_template
self.job = job
self.results = None
self.debug_energy = None
self.debug_forces = None
[docs] def getFinalTotalEnergy(self):
"""
Return the final total energy accounting for any excitation energies.
:rtype: float
:return: the total energy in hartree
"""
if DEBUG is not None:
return self.debug_energy
# although this might be an 'rtriplet=1' calculation .excitation_energies
# may be used (note that there is also a .triplet_excitation_energies
# attr)
final_state = get_final_state(self.state, self.scf_gs,
self.multiplicity)
if final_state is not None:
excitation_energy = \
self.results.last_results.excitation_energies[final_state - 1]
excitation_energy = ev_to_hartree(excitation_energy)
else:
excitation_energy = 0.0
return self.results.last_results.energy + excitation_energy
[docs] def getFinalForcesHartreePerAngstrom(self):
"""
Return the final forces in units of hartree/angstrom.
:rtype: numpy.array
:return: the forces in hartree/angstrom in natoms X 3
"""
# currently Jaguar excited state optimizations do not follow
# the excited state wavefunction in a qualitative sense but
# rather always use the excited state by <target> number (see
# &gen keywords) and so we follow the same convention here
# which for our test set appears to be OK
if DEBUG is not None:
forces = self.debug_forces
else:
forces = self.results.last_results.forces
return reciprocal_bohr_to_angstrom(forces)
[docs] def getFinalStructure(self):
"""
Return the final structure.
:type astructure: `schrodinger.structure.Structure`
:param astructure: the final structure
"""
return self.results.getStructure()
[docs]class JaguarError(Exception):
pass
[docs]class JaguarJobs(object):
"""
Manages Jaguar jobs.
"""
# chosen so that with the typical use of target=1, nroot=5
EXTRA_NROOT = 4
[docs] def __init__(self,
data=DEFAULT_DATA,
scf_gs=DEFAULT_SCF_GS,
kwargs=DEFAULT_KWARGS,
base_name=DEFAULT_BASE_NAME,
host=DEFAULT_HOST,
nproc=DEFAULT_NPROC,
tpp=DEFAULT_TPP,
logger=None):
"""
Create an instance.
:type data: list
:param data: contains (charge, multiplicity, state) tuples
for all jobs, for electronic state 0 is the ground state
and <N> is the N-th excited state
:type scf_gs: bool
:param scf_gs: specify whether ground states should be
determined using an SCF
:type kwargs: dict
:param kwargs: dictionary of Jaguar &gen section key-value
pairs
:type base_name: str
:param base_name: a base name used to name the jobs and their
related files
:type host: str
:param host: the host on which to run the jobs
:type nproc: int
:param nproc: the number of processors to use for running the
jobs, i.e. the number of simultaneous jobs
:type tpp: int
:param tpp: the number of threads to use for the jobs, i.e.
-TPP (threads-per-process)
:type logger: logging.Logger or None
:param logger: output logger or None if there isn't one
"""
self.data = data
self.scf_gs = scf_gs
self.kwargs = OrderedDict(kwargs)
self.base_name = base_name
self.host = host
self.nproc = nproc
self.tpp = tpp
self.logger = logger
self.astructure = None
self.in_templates = None
self.launch_dir = None
self.jobs = []
[docs] def clearJobs(self):
"""
Clear the list of jobs attribute.
"""
self.jobs = []
[docs] def setStructure(self, astructure):
"""
Set the structure to use for the jobs.
:type astructure: `schrodinger.structure.Structure`
:param astructure: the structure to use for the jobs
"""
self.astructure = astructure
[docs] def setInTemplates(self, in_templates=None):
"""
Set the input templates to use for the jobs.
:type in_templates: list or None
:param in_templates: a list of Jaguar input files to use as
templates for creating the jobs, if used then the length of
this list should be equivalent to that of the data attribute,
the first ZMAT section is overwritten with the input astructure
argument, use None if there aren't any templates
:raise: JaguarError if in_templates data is the wrong size
"""
if in_templates:
if len(in_templates) != len(self.data):
msg = ('The number of input templates and the the number '
'of data given in the constructor must be equivalent.')
raise JaguarError(msg)
else:
self.in_templates = in_templates
[docs] def setUpLaunchDir(self, launch_dir=None):
"""
Set up the launch directory for the jobs.
:type launch_dir: str or None
:param launch_dir: the directory from which to launch the jobs
or None if there isn't one (in which case the CWD will be used)
"""
if launch_dir is None:
self.launch_dir = os.curdir
else:
self.launch_dir = launch_dir
if not os.path.exists(self.launch_dir):
os.mkdir(self.launch_dir)
[docs] def getBaseName(self, idx):
"""
Return a base name for the given job.
:type idx: int
:param idx: the job index
:rtype: str
:return: base name for the job
"""
return self.base_name + '_' + str(idx)
[docs] def getKwargs(self, charge, multiplicity, state):
"""
Return the kwargs for the given job.
:type charge: int
:param charge: net molecular charge
:type multiplicity: int
:param multiplicity: molecular multiplicity
:type state: int
:param state: the electronic state, 0 is the ground state
and <N> is the N-th excited state
:rtype: dict
:return: Jaguar kwargs for the job
"""
kwargs = OrderedDict(self.kwargs)
kwargs['molchg'] = charge
if self.scf_gs:
amult = multiplicity
else:
amult = 1
kwargs['multip'] = amult
final_state = get_final_state(state, self.scf_gs, multiplicity)
if final_state is not None:
kwargs['nroot'] = final_state + self.EXTRA_NROOT
kwargs['target'] = final_state
else:
kwargs.pop('itddft', None)
kwargs.pop('itda', None)
if not self.scf_gs:
if multiplicity == 3:
kwargs['rtriplet'] = 1
return kwargs
[docs] def getFileNames(self, base_name):
"""
Return a list of files names for the given job.
:type base_name: str
:param base_name: base name for the job
:rtype: list
:return: file names for the job
"""
afunc = lambda x: os.path.join(self.launch_dir, x)
exts = [MAE_EXT, IN_EXT, OUT_EXT]
files = [afunc(base_name + ext) for ext in exts]
restart_input_file = afunc(restart_name(base_name) + IN_EXT)
files.append(restart_input_file)
return files
[docs] def getInTemplate(self, idx):
"""
Return an input template file.
:type idx: int
:param idx: the job index
:rtype: str
:return: the input template file
"""
if self.in_templates:
in_template = self.in_templates[idx - 1]
else:
in_template = None
return in_template
[docs] def getJob(self, base_name, jag_input_file):
"""
Return a queue.JobControlJob for the given job.
:type base_name: str
:param base_name: base name for the job
:type jag_input_file: str
:param jag_input_file: Jaguar input file for the job
:rtype: queue.JobControlJob
:return: the job object
"""
apath, afile = os.path.split(jag_input_file)
cmd = ['jaguar', 'run']
if self.tpp > 1:
cmd.extend(['-TPP', str(self.tpp)])
return queue.JobControlJob(cmd + [afile],
command_dir=self.launch_dir,
name=base_name)
[docs] def runJobs(self):
"""
Run the jobs.
"""
dj = queue.JobDJ(max_retries=JOB_DJ_RETRIES,
max_failures=queue.NOLIMIT,
verbosity='normal',
hosts=[(self.host, self.nproc)])
for job in self.jobs:
dj.addJob(job.job)
dj.run()
[docs] def processOutput(self):
"""
Process the output from the jobs.
:raise: JaguarError if there were problems with the job
"""
for job in self.jobs:
if not os.path.exists(job.jag_output_file):
msg = ('Jaguar output file %s can not be found.')
raise JaguarError(msg % job.jag_output_file)
jo = JaguarOutput(job.jag_output_file)
if jo.status != JaguarOutput.OK or jo.fatal_error:
msg = ('The Jaguar job for %s died with error: %s.')
if jo.fatal_error:
err = jo.fatal_error.strip().replace('\n', '')
else:
err = str(jo.status)
raise JaguarError(msg % (job.jag_input_file, err))
job.results = jo
[docs] def runIt(self, astructure, in_templates=None, launch_dir=None):
"""
Main function to run the jobs.
:type astructure: `schrodinger.structure.Structure`
:param astructure: the structure to use for the jobs
:type in_templates: list or None
:param in_templates: a list of Jaguar input files to use as
templates for creating the jobs, if used then the length of
this list should be equivalent to that of the data attribute,
the first ZMAT section is overwritten with the input astructure
argument, use None if there aren't any templates
:type launch_dir: str or None
:param launch_dir: the directory from which to launch the jobs
or None if there isn't one (in which case the CWD will be used)
:rtype: list
:return: contains JaguarJob objects for all jobs
"""
self.clearJobs()
self.setStructure(astructure)
self.setInTemplates(in_templates=in_templates)
self.setUpLaunchDir(launch_dir=launch_dir)
for idx, data in enumerate(self.data, 1):
base_name = self.getBaseName(idx)
jaguar_kwargs = self.getKwargs(*data)
file_names = self.getFileNames(base_name)
in_template = self.getInTemplate(idx)
jcjob = self.getJob(base_name, file_names[1])
args = [idx, base_name] + list(data) + \
[self.scf_gs, jaguar_kwargs] + file_names + \
[in_template, jcjob]
kwargs = OrderedDict(list(zip(JaguarJob.KEYS, args)))
job = JaguarJob(**kwargs)
self.setUpInputFiles(job)
self.jobs.append(job)
if DEBUG is not None:
set_e_and_g(self.astructure, self.jobs)
else:
self.runJobs()
self.processOutput()
return self.jobs
[docs]class TerminateException(Exception):
pass
[docs]class IterationError(Exception):
pass
[docs]class MECPStep(object):
"""
Manage an MECP step, which can be an iteration or a line
search iteration.
"""
ALPHA_THRESH = 1.0e-10
[docs] def __init__(self, iteration, call_number, x_initial, method, perp_factor,
para_factor, verbose, logger, bfgs_obj):
"""
Create an instance.
:type iteration: int
:param iteration: the iteration
:type call_number: int
:param call_number: the call number
:type x_initial: numpy.array
:param x_initial: the initial geometry vector (N X 1)
:type method: str
:param method: the method to use to determine the MECP
:type perp_factor: float
:param perp_factor: prefactor for the energy term whose gradient
lies perpendicular to the crossing seam
:type para_factor: float
:param para_factor: prefactor for the energy term whose gradient
lies parallel to the crossing seam
:type verbose: bool
:param verbose: specifies verbose logging
:type logger: logging.Logger
:param logger: output logger
:type bfgs_obj: BFGS
:param bfgs_obj: a BFGS object that manages the optimization
"""
self.iteration = iteration
self.call_number = call_number
self.x_initial = numpy.copy(x_initial)
self.method = method
self.perp_factor = perp_factor
self.para_factor = para_factor
self.verbose = verbose
self.logger = logger
self.bfgs_obj = bfgs_obj
self.jobs = None
self.energy_1 = None
self.energy_2 = None
self.perp_energy = None
self.para_energy = None
self.energy = None
self.delta_energy = None
self.weighted_energy = None
self.forces_1 = None
self.max_force_1 = None
self.rms_force_1 = None
self.len_force_1 = None
self.forces_2 = None
self.max_force_2 = None
self.rms_force_2 = None
self.len_force_2 = None
self.angle = None
self.len_delta_forces = None
self.unit_delta_forces = None
self.unit_norm_delta_forces = None
self.perp_forces = None
self.max_perp_force = None
self.rms_perp_force = None
self.len_perp_force = None
self.para_forces = None
self.max_para_force = None
self.rms_para_force = None
self.len_para_force = None
self.forces = None
self.max_force = None
self.rms_force = None
self.len_force = None
self.weighted_forces = None
self.weighted_gradients = None
self.displacements = None
self.max_displacement = None
self.rms_displacement = None
self.len_displacement = None
self.in_templates = None
self.summary_line = None
self.x_final = None
[docs] def logInitialGeometry(self):
"""
Log the initial geometry.
"""
if self.verbose and self.logger:
log_geometry(self.x_initial,
header=DEFAULT_GEOMETRY_HEADER,
logger=self.logger)
self.logger.info('')
[docs] def logHessianEigVals(self):
"""
Log the Hessian eigenvalues.
"""
if self.verbose and self.logger:
header = 'MECP Hessian eigenvalues / Hartree/(Ang.**2)'
self.logger.info(header)
self.logger.info('-' * len(header))
for eigval in self.bfgs_obj.hess_eigvals:
self.logger.info('%.8f' % eigval)
self.logger.info('')
[docs] def logStepInfo(self):
"""
Log the step info.
"""
if self.verbose and self.logger and self.bfgs_obj.step_size is not None:
alen = transform.get_vector_magnitude(self.bfgs_obj.step_k)
self.logger.info('Step direction length / Ang.: %.8f' % alen)
self.logger.info('(Gradient, Step) angle / deg.: %.2f' %
self.bfgs_obj.angle)
self.logger.info('Step size: %.8f' % self.bfgs_obj.step_size)
self.logger.info('')
[docs] def handleFinalGeometry(self, x_final):
"""
Handle the final geometry.
:type x_final: numpy.array
:param x_final: the final geometry vector (N X 1)
"""
self.x_final = numpy.copy(x_final)
self.displacements = self.x_final - self.x_initial
self.max_displacement, self.rms_displacement, self.len_displacement = \
get_max_and_rms_and_len(self.displacements)
if self.verbose and self.logger:
log_geometry(self.x_final,
header='Updated geometry / Ang.',
logger=self.logger)
self.logger.info('')
log_displacements(self.displacements,
self.max_displacement,
self.rms_displacement,
self.len_displacement,
logger=self.logger)
self.logger.info('')
[docs] def setInTemplates(self):
"""
Set the Jaguar input templates.
"""
self.in_templates = [x.jag_restart_input_file for x in self.jobs]
[docs] def getUnitNormalToDeltaForces(self, prev_mecp_step=None):
"""
Get the unit vector that is normal to the delta forces vector.
:type prev_mecp_step: MECPStep or None
:param prev_mecp_step: MECPStep from the previous iteration or
line search iteration or None if there isn't one
:rtype: numpy.array or None
:return: the unit vector perpendicular to the delta forces vector or
None if there isn't one
"""
if prev_mecp_step and self.method == UPDATING:
if prev_mecp_step.unit_norm_delta_forces is None:
self.unit_norm_delta_forces = \
numpy.copy(prev_mecp_step.unit_delta_forces)
else:
numerator1 = numpy.dot(prev_mecp_step.unit_norm_delta_forces,
self.unit_delta_forces)
numerator2 = numpy.dot(prev_mecp_step.unit_delta_forces,
self.unit_delta_forces)
denominator = math.sqrt(numerator1**2 + numerator2**2)
self.unit_norm_delta_forces = \
numerator1 * prev_mecp_step.unit_delta_forces - \
numerator2 * prev_mecp_step.unit_norm_delta_forces
self.unit_norm_delta_forces /= denominator
return self.unit_norm_delta_forces
[docs] def setEnergiesAndForces(self, prev_mecp_step=None):
"""
Set the energies and forces for the two states as well
as well as the energy and forces to use for the MECP
optimization.
:type prev_mecp_step: MECPStep or None
:param prev_mecp_step: MECPStep from the previous iteration or
line search iteration or None if there isn't one
"""
# the following interesting Lagrangian can also be used to
# find the MECP (it doesn't require a gradient projection and
# thus the gradient does go to zero) however it often requires
# finding the k parameter by trial-and-error and so in testing
# it was ruled out in favor of the projection method,
#
# L = f_perp*(E2 - E1)**4 + f_para*(E2 - E2,min)(1 - exp(-k(E2 - E1)**2)),
#
# where f_perp = 100, f_para = 1, E2,min is the energy minimum
# for state 2, and where the exp term is the inverse unit impulse
# function, i.e. k is infinite (in practice ranging from 10**2 to
# 10**8, etc.), for the correct choice of k the exp term modulates
# the min term to function as a full minimization except for when
# E2 - E1 is small enough for which it will be zero, other
# possibilities might include third party engineering-type
# solvers like https://www.geogebra.org/m/TEERJczX or CAD though
# it is unclear whether those can be used for cases lacking
# explicit functions and which therefore must solved as
# minimization problems
self.energy_1, self.energy_2 = \
[x.getFinalTotalEnergy() for x in self.jobs]
self.forces_1, self.forces_2 = \
[x.getFinalForcesHartreePerAngstrom() for x in self.jobs]
forces_1 = matrix_to_vector(self.forces_1)
forces_2 = matrix_to_vector(self.forces_2)
self.angle = get_angle_in_degrees(forces_1, forces_2)
# this alpha prevents projecting on to the zero vector in
# case the energies are equivalent
e_diff = self.energy_2 - self.energy_1
if e_diff < 0:
alpha = -1.0 * self.ALPHA_THRESH
else:
alpha = self.ALPHA_THRESH
self.perp_energy = (e_diff + alpha)**2
self.para_energy = old_div((self.energy_1 + self.energy_2), 2)
self.energy = self.perp_energy + self.para_energy
delta_forces = forces_2 - forces_1
self.len_delta_forces = transform.get_vector_magnitude(delta_forces)
self.unit_delta_forces = transform.get_normalized_vector(delta_forces)
avg_forces = old_div((forces_1 + forces_2), 2)
self.perp_forces = 2 * (e_diff + alpha) * delta_forces
self.para_forces = avg_forces
unit_perp_forces = transform.get_normalized_vector(self.perp_forces)
projection = numpy.dot(self.para_forces,
unit_perp_forces) * unit_perp_forces
self.para_forces -= projection
unit_norm_delta_forces = self.getUnitNormalToDeltaForces(prev_mecp_step)
if unit_norm_delta_forces is not None:
projection = numpy.dot(avg_forces, unit_norm_delta_forces)
self.para_forces -= projection * unit_norm_delta_forces
self.forces = self.perp_forces + self.para_forces
self.weighted_energy = self.perp_factor * self.perp_energy + \
self.para_factor * self.para_energy
self.weighted_forces = self.perp_factor * self.perp_forces + \
self.para_factor * self.para_forces
self.weighted_gradients = -1 * self.weighted_forces
self.perp_forces = vector_to_matrix(self.perp_forces)
self.para_forces = vector_to_matrix(self.para_forces)
self.forces = vector_to_matrix(self.forces)
[docs] def setEnergiesAndForcesData(self, prev_mecp_step=None):
"""
Set energies and forces data.
:type prev_mecp_step: MECPStep or None
:param prev_mecp_step: MECPStep from the previous iteration or
line search iteration or None if there isn't one
"""
if prev_mecp_step:
self.delta_energy = self.energy - prev_mecp_step.energy
self.max_force_1, self.rms_force_1, self.len_force_1 = \
get_max_and_rms_and_len(self.forces_1)
self.max_force_2, self.rms_force_2, self.len_force_2 = \
get_max_and_rms_and_len(self.forces_2)
self.max_perp_force, self.rms_perp_force, self.len_perp_force = \
get_max_and_rms_and_len(self.perp_forces)
self.max_para_force, self.rms_para_force, self.len_para_force = \
get_max_and_rms_and_len(self.para_forces)
self.max_force, self.rms_force, self.len_force = \
get_max_and_rms_and_len(self.forces)
[docs] def setJobData(self, jobs, prev_mecp_step=None):
"""
Set job data.
:type jobs: list
:param jobs: contains the JaguarJob instances for the two jobs
:type prev_mecp_step: MECPStep or None
:param prev_mecp_step: MECPStep from the previous iteration or
line search iteration or None if there isn't one
"""
self.jobs = jobs
self.setInTemplates()
self.setEnergiesAndForces(prev_mecp_step)
self.setEnergiesAndForcesData(prev_mecp_step)
[docs] def logEnergyAndForces(self,
header,
energy,
forces,
max_force,
rms_force,
len_force,
energy_header=DEFAULT_ENERGY_HEADER,
forces_header=DEFAULT_FORCES_HEADER):
"""
Log energy and forces.
:type header: str
:param header: a header
:type energy: float or None
:param energy: the energy or None if there isn't one
:type forces: numpy.array
:param forces: the forces (natoms X 3)
:type max_force: float
:param max_force: the magnitude of the largest forces element
:type rms_force: float
:param rms_force: the RMS of forces
:type len_force: float
:param len_force: the length of forces
:type energy_header: str
:param energy_header: an energy header
:type forces_header: str
:param forces_header: a forces header
"""
if self.logger:
self.logger.info(header)
self.logger.info('=' * len(header))
log_energy_and_forces(energy,
matrix_to_vector(forces),
max_force,
rms_force,
len_force,
energy_header=energy_header,
forces_header=forces_header,
logger=self.logger)
self.logger.info('')
[docs] def logSummary(self):
"""
Log a summary.
"""
all_entries = [self.energy_1, self.energy_2, self.energy, \
self.delta_energy, self.max_force, self.rms_force, \
self.max_displacement, self.rms_displacement]
if self.iteration == self.call_number == 1 or self.verbose:
self.logSummaryHeader()
entries = [str(self.iteration), str(self.call_number)]
for entry in all_entries:
if entry is None:
entries.append(str(entry))
else:
entries.append('%.8f' % entry)
self.summary_line = self.getFormattedSummaryLine(entries)
if self.logger:
self.logger.info(self.summary_line)
if self.verbose:
self.logger.info('')
[docs] def logJobData(self):
"""
Log job data.
"""
if self.verbose and self.logger:
self.logEnergyAndForces('State 1', self.energy_1, self.forces_1,
self.max_force_1, self.rms_force_1,
self.len_force_1)
self.logEnergyAndForces('State 2', self.energy_2, self.forces_2,
self.max_force_2, self.rms_force_2,
self.len_force_2)
self.logger.info('Delta forces length / Hartree/Ang.: %.8f' %
self.len_delta_forces)
self.logger.info('(Force 1, Force 2) angle / deg.: %.2f' %
self.angle)
self.logger.info('')
self.logEnergyAndForces('MECP perp', None, self.perp_forces,
self.max_perp_force, self.rms_perp_force,
self.len_perp_force)
self.logEnergyAndForces('MECP para', None, self.para_forces,
self.max_para_force, self.rms_para_force,
self.len_para_force)
self.logEnergyAndForces('MECP', self.energy, self.forces,
self.max_force, self.rms_force,
self.len_force)
[docs] def terminate(self, convergence_dict):
"""
Terminate the MECP optimization.
:type convergence_dict: dict
:param convergence_dict: contains various convergence
thresholds
:raise: TerminateException if it is time to terminate
"""
datas = [self.delta_energy, self.max_force, self.rms_force, \
self.max_displacement, self.rms_displacement]
keys = [DELTA_ENERGY, MAX_FORCE, RMS_FORCE, MAX_DISPLACEMENT, \
RMS_DISPLACEMENT]
msg = ('The optimization has converged with a %s of ' '%.8f %s %.8f.')
for data, key in zip(datas, keys):
if data is not None:
value = convergence_dict[key]
if key == DELTA_ENERGY:
condition = data <= 0 and data >= value
comparator = '>='
else:
condition = data <= value
comparator = '<='
if condition:
raise TerminateException(msg %
(key, data, comparator, value))
[docs]class MinimizerError(Exception):
pass
[docs]class BFGS(object):
"""
Manage a BFGS optimization.
"""
# the methods line_search_wolfe1 and minimize were
# based on those from the scipy.optimize.optimize module
# which has the following notice:
#
# ******NOTICE***************
# optimize.py module by Travis E. Oliphant
#
# You may copy and use this module as you see fit with no
# guarantee implied provided you keep this notice in all copies.
# *****END NOTICE************
#
# see http://home.agh.edu.pl/~pba/pdfdoc/Numerical_Optimization.pdf
#
LARGE_RHO_K = 1000.0
IMAG_TOL = 0.000001
ANGLE_TOL = 0.01
NEG_HESS_TOL = -1.0e-2
[docs] def __init__(self,
c1=DEFAULT_C1,
c2=DEFAULT_C2,
amax=DEFAULT_MAX_STEP_SIZE,
amin=DEFAULT_MIN_STEP_SIZE,
xtol=DEFAULT_XTOL,
max_force=DEFAULT_MAX_FORCE,
max_iterations=DEFAULT_ITERATIONS,
eps=DEFAULT_EPS,
init_hess=DEFAULT_HESS,
verbose=DEFAULT_VERBOSE,
logger=None):
"""
Create an instance.
:type c1: float
:param c1: parameter for Armijo condition rule
:type c2: float
:param c2: parameter for curvature condition rule
:type amax: float
:param amax: maximum allowable step size
:type amin: float
:param amin: minimum allowable step size
:type xtol: float
:param xtol: nonnegative relative tolerance for an acceptable step,
the search exits with a warning if the relative difference between
sty and stx is less than xtol where sty and stx define an interval
:type max_force: float
:param max_force: maximum allowable force element
:type max_iterations: int
:param max_iterations: the maximum number of iterations
:type eps: float
:param eps: step size in Angstrom for any finite difference
approximations
:type init_hess: str
:param init_hess: the type of initial Hessian to use
:type verbose: bool
:param verbose: specifies verbose logging
:type logger: logging.Logger or None
:param logger: output logger or None if there isn't one
"""
self.c1 = c1
self.c2 = c2
self.amax = amax
self.amin = amin
self.xtol = xtol
self.max_force = max_force
self.max_iterations = max_iterations
self.eps = eps
self.init_hess = init_hess
self.verbose = verbose
self.logger = logger
self.iteration = 1
self.inv_hess = None
self.hess = None
self.hess_eigvals = None
self.hess_eigvecs = None
self.step_k = None
self.angle = None
self.step_size = None
self.resetFiniteDiffCall()
[docs] def line_search_wolfe1(self,
f,
fprime,
xk,
pk,
gfk=None,
old_fval=None,
old_old_fval=None,
args=(),
c1=1e-4,
c2=0.9,
amax=50,
amin=1e-8,
xtol=1e-14):
"""
As `scalar_search_wolfe1` but do a line search to direction `pk`
Parameters
----------
f : callable
Function `f(x)`
fprime : callable
Gradient of `f`
xk : array_like
Current point
pk : array_like
Search direction
gfk : array_like, optional
Gradient of `f` at point `xk`
old_fval : float, optional
Value of `f` at point `xk`
old_old_fval : float, optional
Value of `f` at point preceding `xk`
The rest of the parameters are the same as for `scalar_search_wolfe1`.
Returns
-------
stp, f_count, g_count, fval, old_fval
As in `line_search_wolfe1`
gval : array
Gradient of `f` at the final point
"""
if gfk is None:
gfk = fprime(xk)
if isinstance(fprime, tuple):
eps = fprime[1]
fprime = fprime[0]
newargs = (f, eps) + args
gradient = False
else:
newargs = args
gradient = True
gval = [gfk]
gc = [0]
fc = [0]
def phi(s):
fc[0] += 1
return f(xk + s * pk, *args)
def derphi(s):
gval[0] = fprime(xk + s * pk, *newargs)
if gradient:
gc[0] += 1
else:
fc[0] += len(xk) + 1
return numpy.dot(gval[0], pk)
derphi0 = numpy.dot(gfk, pk)
stp, fval, old_fval = self.scalar_search_wolfe1(phi,
derphi,
old_fval,
old_old_fval,
derphi0,
c1=c1,
c2=c2,
amax=amax,
amin=amin,
xtol=xtol)
return stp, fc[0], gc[0], fval, old_fval, gval[0]
[docs] def scalar_search_wolfe1(self,
phi,
derphi,
phi0=None,
old_phi0=None,
derphi0=None,
c1=1e-4,
c2=0.9,
amax=50,
amin=1e-8,
xtol=1e-14):
"""
Scalar function search for alpha that satisfies strong Wolfe conditions
alpha > 0 is assumed to be a descent direction.
Parameters
----------
phi : callable phi(alpha)
Function at point `alpha`
derphi : callable dphi(alpha)
Derivative `d phi(alpha)/ds`. Returns a scalar.
phi0 : float, optional
Value of `f` at 0
old_phi0 : float, optional
Value of `f` at the previous point
derphi0 : float, optional
Value `derphi` at 0
amax : float, optional
Maximum step size
c1, c2 : float, optional
Wolfe parameters
Returns
-------
alpha : float
Step size, or None if no suitable step was found
phi : float
Value of `phi` at the new point `alpha`
phi0 : float
Value of `phi` at `alpha=0`
Notes
-----
Uses routine DCSRCH from MINPACK.
"""
if phi0 is None:
phi0 = phi(0.)
if derphi0 is None:
derphi0 = derphi(0.)
if old_phi0 is not None and derphi0 != 0:
alpha1 = min(1.0, 1.01 * 2 * (phi0 - old_phi0) / derphi0)
if alpha1 < 0:
alpha1 = 1.0
else:
alpha1 = 1.0
phi1 = phi0
derphi1 = derphi0
isave = numpy.zeros((2,), numpy.intc)
dsave = numpy.zeros((13,), float)
task = b'START'
# see http://ftp.mcs.anl.gov/pub/MINPACK-2/csrch/dcsrch.f
maxiter = 30
for i in range(maxiter):
stp, phi1, derphi1, task = scipy.optimize.minpack2.dcsrch(
alpha1, phi1, derphi1, c1, c2, xtol, task, amin, amax, isave,
dsave)
if task[:2] == b'FG':
alpha1 = stp
phi1 = phi(stp)
derphi1 = derphi(stp)
else:
break
else:
# maxiter reached, the line search did not converge
stp = None
if task[:5] == b'ERROR' or task[:4] == b'WARN':
stp = None # failed
return stp, phi1, phi0
[docs] def resetFiniteDiffCall(self):
"""
Reset the finite difference call.
"""
self.finite_diff_call = 0
[docs] def getInitialInvHessian(self, fun, jac, fun_0, jac_0, x_0):
"""
Return an initial guess for the inverse Hessian.
:type fun: function
:param fun: function to minimize
:type jac: function
:param jac: the Jacobian of the function being minimized
:type fun_0: float
:param fun_0: function value at initial solution
:type jac_0: float
:param jac_0: the Jacobian value at initial solution
:type x_0: numpy.array
:param x_0: initial solution
:rtype: numpy.array
:return: the initial guess inverse Hessian (N/3 X 3)
"""
nvar = len(x_0)
if self.init_hess == IDENTITY_HESS:
return numpy.eye(nvar)
elif self.init_hess == DIAG_FWD_FIN_DIFF_GRAD_HESS:
if self.logger:
msg = ('Building an initial guess inverse Hessian using '
'diagonal forward finite differences of the gradient.')
self.logger.info(msg)
self.logger.info('')
diags = []
for idx in range(nvar):
x_k = numpy.copy(x_0)
x_k[idx] += self.eps
self.finite_diff_call = idx + 1
fun_k = fun(x_k)
jac_k = jac(x_k)
diags.append(old_div(self.eps, (jac_k[idx] - jac_0[idx])))
self.resetFiniteDiffCall()
return numpy.diag(numpy.array(diags))
[docs] def minimize(self, fun, x_0, jac=None, **kwargs):
"""
Minimization of a function using the BFGS algorithm.
:type fun: function
:param fun: function to minimize
:type x_0: numpy.array
:param x_0: initial solution
:type jac: function
:param jac: the Jacobian of the function being minimized
:rtype: scipy.optimize.optimize.OptimizeResult
:return: optimization parameters
"""
x_k = numpy.copy(x_0)
old_fun_k = None
fun_k = fun(x_k)
jac_k = jac(x_k)
idty = numpy.eye(len(x_0))
self.inv_hess = self.getInitialInvHessian(fun, jac, fun_k, jac_k, x_k)
max_jac_k, rms_jac_k, len_jac_k = get_max_and_rms_and_len(jac_k)
while max_jac_k > self.max_force and self.iteration <= self.max_iterations:
self.hess = numpy.linalg.inv(self.inv_hess)
hess_eigvals, self.hess_eigvecs = numpy.linalg.eig(self.hess)
hess_eigvals_imag = numpy.imag(hess_eigvals)
if any(hess_eigvals_imag):
for imag in hess_eigvals_imag:
if abs(imag) > self.IMAG_TOL:
msg = (
'Hessian is sufficiently non-symmetric and has an '
'eigenvalue whose magnitude of the imaginary part is '
'%s which is larger than the acceptable threshold of %s.'
)
raise MinimizerError(msg % (abs(imag), self.IMAG_TOL))
self.hess_eigvals = numpy.real(hess_eigvals)
for eigval in self.hess_eigvals:
if eigval < self.NEG_HESS_TOL:
msg = (
'Hessian eigenvalues have become sufficiently negative.'
)
raise MinimizerError(msg)
self.step_k = -numpy.dot(self.inv_hess, jac_k)
self.angle = get_angle_in_degrees(self.step_k, -1 * jac_k)
if abs(self.angle - 90.) <= self.ANGLE_TOL:
msg = ('The gradient and step direction vectors are becoming '
'orthogonal.')
raise MinimizerError(msg)
# originally a line_search_wolfe12 was called here which wrapped
# a line_search_wolfe1 call followed by a line_search_wolfe2 call
# (called if the first didn't converge) but I don't quite see
# the point for this in this application
ret = self.line_search_wolfe1(fun,
jac,
x_k,
self.step_k,
gfk=jac_k,
old_fval=fun_k,
old_old_fval=old_fun_k,
c1=self.c1,
c2=self.c2,
amax=self.amax,
amin=self.amin,
xtol=self.xtol)
if ret[0] is None:
msg = (
'Desired error not necessarily achieved due to precision loss.'
)
raise MinimizerError(msg)
else:
self.step_size, n_fun_k, n_jac_k, fun_k, old_fun_k, new_jac_k = ret
if not numpy.isfinite(fun_k):
msg = ('Function value became infinite.')
raise MinimizerError(msg)
self.iteration += 1
s_k = self.step_size * self.step_k
x_k = x_k + s_k
if new_jac_k is None:
new_jac_k = jac(x_k)
y_k = new_jac_k - jac_k
jac_k = new_jac_k
max_jac_k, rms_jac_k, len_jac_k = get_max_and_rms_and_len(jac_k)
if max_jac_k <= self.max_force:
break
msg = 'Divide-by-zero encountered: rho_k set to %s.'
try:
rho_k = old_div(1.0, numpy.dot(y_k, s_k))
except ZeroDivisionError:
rho_k = self.LARGE_RHO_K
if self.logger:
self.logger.info(msg % rho_k)
if numpy.isinf(rho_k):
rho_k = self.LARGE_RHO_K
if self.logger:
self.logger.info(msg % rho_k)
A1 = idty - rho_k * s_k[:, numpy.newaxis] * y_k[numpy.newaxis, :]
A2 = idty - rho_k * y_k[:, numpy.newaxis] * s_k[numpy.newaxis, :]
self.inv_hess = numpy.dot(A1, numpy.dot(self.inv_hess, A2)) + \
rho_k * s_k[:, numpy.newaxis] * s_k[numpy.newaxis, :]
good_status = 0
if self.iteration > self.max_iterations:
msg = 'Maximum number of iterations has been exceeded.'
else:
msg = 'Optimization terminated successfully.'
return scipy.optimize.optimize.OptimizeResult(
fun=fun_k,
jac=jac_k,
hess_inv=self.inv_hess,
status=good_status,
success=(good_status == 0),
message=msg,
x=x_k,
nit=self.iteration)
[docs]class EnergyAndGradients(object):
"""
Manage energy and gradient calls.
"""
[docs] def __init__(self,
astructure,
data=DEFAULT_DATA,
scf_gs=DEFAULT_SCF_GS,
kwargs=DEFAULT_KWARGS,
base_name=DEFAULT_BASE_NAME,
all_results=DEFAULT_ALL_RESULTS,
convergence_dict=DEFAULT_CONVERGENCE_DICT,
max_iterations=DEFAULT_ITERATIONS,
method=DEFAULT_METHOD,
perp_factor=DEFAULT_PERP_FACTOR,
para_factor=DEFAULT_PARA_FACTOR,
all_geometries=DEFAULT_ALL_GEOMETRIES,
host=DEFAULT_HOST,
nproc=DEFAULT_NPROC,
tpp=DEFAULT_TPP,
verbose=DEFAULT_VERBOSE,
logger=None,
bfgs_obj=None):
"""
Create an instance.
:type astructure: `schrodinger.structure.Structure`
:param astructure: the structure for which the energy
and gradients are needed
:type data: list
:param data: contains (charge, multiplicity, state) tuples
for the jobs for the two MECP states, for electronic state 0
is the ground state and <N> is the N-th excited state
:type scf_gs: bool
:param scf_gs: specify whether ground states should be
determined using an SCF
:type kwargs: dict
:param kwargs: dictionary of Jaguar &gen section key-value
pairs
:type base_name: str
:param base_name: a base name used to name the jobs and their
related files
:type all_results: bool
:param all_results: use this option to copy all subdirectories
containing results from intermediate Jaguar force, etc. calculations
back to the launch host
:type convergence_dict: dict
:param convergence_dict: contains various convergence
thresholds
:type max_iterations: int
:param max_iterations: the maximum number of MECP geometry optimization
iterations
:type method: str
:param method: the method to use to determine the MECP
:type perp_factor: float
:param perp_factor: prefactor for the energy term whose gradient
lies perpendicular to the crossing seam
:type para_factor: float
:param para_factor: prefactor for the energy term whose gradient
lies parallel to the crossing seam
:type all_geometries: bool
:param all_geometries: use this option to report all geometries, i.e.
the geometries from all MECP geometries iterations will be reported in
the output
:type host: str
:param host: the host on which to run the jobs
:type nproc: int
:param nproc: the number of processors to use for running the
jobs, i.e. the number of simultaneous jobs
:type tpp: int
:param tpp: the number of threads to use for the jobs, i.e.
-TPP (threads-per-process)
:type verbose: bool
:param verbose: specifies verbose logging
:type logger: logging.Logger or None
:param logger: output logger or None if there isn't one
:type bfgs_obj: BFGS
:param bfgs_obj: a BFGS object that manages the optimization
"""
self.astructure = astructure.copy()
self.data = data
self.scf_gs = scf_gs
self.kwargs = OrderedDict(kwargs)
self.base_name = base_name
self.all_results = all_results
self.convergence_dict = convergence_dict
self.max_iterations = max_iterations
self.method = method
self.perp_factor = perp_factor
self.para_factor = para_factor
self.all_geometries = all_geometries
self.host = host
self.nproc = nproc
self.tpp = tpp
self.verbose = verbose
self.logger = logger
self.bfgs_obj = bfgs_obj
self.jobbe = jobcontrol.get_backend()
self.mae_out_file = self.base_name + ALL_FILE_SUFFIX
self.mae_out_writer = None
self.jaguar_jobs = None
self.iteration_launch_dir = None
self.prev_mecp_step = None
self.summary_lines = []
self.call_number = None
self.setUpMaeWriter()
self.resetCallNumber()
self.setUpJaguarJobs()
[docs] def setUpMaeWriter(self):
"""
Set up the Maestro output writer.
"""
if self.all_geometries:
self.mae_out_writer = structure.MaestroWriter(self.mae_out_file)
if self.jobbe:
self.jobbe.addLogFile(self.mae_out_file)
[docs] def tearDownMaeWriter(self, exception):
"""
Tear down the Maestro output writer and then raise
an exception.
:type exception: Exception
:param exception: the exception to raise after tear down
"""
if self.mae_out_writer:
self.mae_out_writer.close()
raise exception
[docs] def resetCallNumber(self):
"""
Reset the call number.
"""
self.call_number = 1
[docs] def setUpJaguarJobs(self):
"""
Set up for the Jaguar jobs.
"""
self.kwargs['igeopt'] = -1
self.jaguar_jobs = JaguarJobs(data=self.data,
scf_gs=self.scf_gs,
kwargs=self.kwargs,
base_name=self.base_name,
host=self.host,
nproc=self.nproc,
tpp=self.tpp,
logger=self.logger)
[docs] def logHeader(self):
"""
Log a header.
"""
if self.verbose and self.logger:
header = 'Iteration %s, call number %s' % (self.bfgs_obj.iteration,
self.call_number)
self.logger.info('+' * len(header))
self.logger.info(header)
self.logger.info('+' * len(header))
self.logger.info('')
[docs] def newIteration(self):
"""
Return True if this is a new iteration.
:rtype: bool
:return: True if this is a new iteration, False otherwise
"""
if not self.prev_mecp_step:
return True
else:
return self.bfgs_obj.iteration > self.prev_mecp_step.iteration
[docs] def setUpIterationLaunchDir(self):
"""
Set up the launch directory for this MECP iteration.
:rtype: str
:return: the launch directory for this MECP iteration
"""
self.iteration_launch_dir = self.base_name + '_' + \
str(self.bfgs_obj.iteration)
if not os.path.exists(self.iteration_launch_dir):
os.mkdir(self.iteration_launch_dir)
[docs] def getLaunchSubDir(self):
"""
Return the name of the launch subdirectory for the current job.
:rtype: str
:return: the launch subdirectory for the current job
"""
adir = os.path.join(self.iteration_launch_dir,
self.base_name + '_' + str(self.call_number))
return adir
[docs] def runJobs(self, launch_dir):
"""
Run the jobs.
:type launch_dir: str
:param launch_dir: the launch directory for the jobs
:rtype: list
:return: contains the JaguarJob instances for the two jobs
"""
if self.prev_mecp_step:
in_templates = self.prev_mecp_step.in_templates
else:
in_templates = None
try:
jobs = self.jaguar_jobs.runIt(self.astructure,
in_templates=in_templates,
launch_dir=launch_dir)
except JaguarError as err:
if self.jobbe:
self.jobbe.copyOutputFile(launch_dir)
self.tearDownMaeWriter(err)
if self.all_results and self.jobbe:
self.jobbe.copyOutputFile(launch_dir)
return jobs
[docs] def logSummary(self):
"""
Log a summary.
"""
if self.logger:
header = 'Summary'
self.logger.info(header)
self.logger.info('-' * len(header))
self.prev_mecp_step.logSummaryHeader()
for aline in self.summary_lines:
self.logger.info(aline)
self.logger.info('')
[docs] def handleStructure(self, x_to_use, step=None):
"""
Handle setting and writing the structure.
:type x_to_use: numpy.array
:param x_to_use: the geometry vector to use (N X 1)
:type step: MECPStep or None
:param step: the step from which to obtain the energy
to use or None if there is no energy
"""
if step is not None:
iteration = step.iteration
call_number = step.call_number
else:
iteration = self.bfgs_obj.iteration
call_number = self.call_number
self.astructure.setXYZ(vector_to_matrix(x_to_use))
properties = {TITLE_KEY: self.base_name + '_' + \
str(iteration) + '_' + str(call_number)}
if step is not None:
energies = {
MECP_ENERGY_1_KEY: step.energy_1,
MECP_ENERGY_2_KEY: step.energy_2
}
properties.update(energies)
else:
self.astructure.property.pop(MECP_ENERGY_1_KEY)
self.astructure.property.pop(MECP_ENERGY_2_KEY)
set_properties_and_write_structure(self.astructure, properties,
self.mae_out_writer)
[docs] def finishPrevStep(self, next_x):
"""
Finish the previous step.
:type next_x: numpy.array
:param next_x: the next geometry vector (N X 1)
"""
if self.newIteration() or \
self.bfgs_obj.iteration == self.call_number - 1 == 1:
self.prev_mecp_step.logHessianEigVals()
self.prev_mecp_step.logStepInfo()
self.prev_mecp_step.handleFinalGeometry(next_x)
self.prev_mecp_step.logSummary()
self.summary_lines.append(self.prev_mecp_step.summary_line)
self.handleStructure(self.prev_mecp_step.x_initial, self.prev_mecp_step)
try:
self.prev_mecp_step.terminate(self.convergence_dict)
except TerminateException as term_excp:
if not self.verbose and self.logger:
self.logger.info('')
self.tearDownMaeWriter(term_excp)
[docs] def handleFiniteDifferences(self, next_x, finite_diff_call):
"""
Handle finite difference calls.
:type next_x: numpy.array
:param next_x: the next geometry vector (N X 1)
:type finite_diff_call: int
:param finite_diff_call: the index of a finite difference
call
:rtype: float, numpy.array
:return: the energy and gradients (N X 1)
"""
self.astructure.setXYZ(vector_to_matrix(next_x))
launch_dir = self.base_name + '_finite_diff_' + str(finite_diff_call)
if not os.path.exists(launch_dir):
os.mkdir(launch_dir)
jobs = self.runJobs(launch_dir)
mecp_step = MECPStep(self.bfgs_obj.iteration, self.call_number, next_x,
self.method, self.perp_factor, self.para_factor,
self.verbose, self.logger, self.bfgs_obj)
mecp_step.setJobData(jobs, self.prev_mecp_step)
return mecp_step.weighted_energy, mecp_step.weighted_gradients
[docs] def getEnergyAndGradients(self, next_x):
"""
Return the energy and gradients that drive the MECP
geometry optimization.
:type next_x: numpy.array
:param next_x: the next geometry vector (N X 1)
:rtype: float, numpy.array
:return: the energy and gradients (N X 1)
"""
if self.bfgs_obj.finite_diff_call:
return self.handleFiniteDifferences(next_x,
self.bfgs_obj.finite_diff_call)
if self.prev_mecp_step and \
numpy.all(next_x == self.prev_mecp_step.x_initial):
return self.prev_mecp_step.weighted_energy, self.prev_mecp_step.weighted_gradients
if self.prev_mecp_step:
self.finishPrevStep(next_x)
if self.newIteration():
self.resetCallNumber()
if self.bfgs_obj.iteration > self.max_iterations:
self.handleStructure(next_x)
msg = ('This optimization has reached the '
'given maximum number of iterations, i.e. %s.')
self.tearDownMaeWriter(IterationError(msg %
self.max_iterations))
else:
self.setUpIterationLaunchDir()
self.astructure.setXYZ(vector_to_matrix(next_x))
self.logHeader()
mecp_step = MECPStep(self.bfgs_obj.iteration, self.call_number, next_x,
self.method, self.perp_factor, self.para_factor,
self.verbose, self.logger, self.bfgs_obj)
mecp_step.logInitialGeometry()
launch_subdir = self.getLaunchSubDir()
jobs = self.runJobs(launch_subdir)
mecp_step.setJobData(jobs, self.prev_mecp_step)
mecp_step.logJobData()
self.prev_mecp_step = mecp_step
self.call_number += 1
return mecp_step.weighted_energy, mecp_step.weighted_gradients
[docs]class MECP(object):
"""
Manages finding MECPs.
"""
[docs] def __init__(self,
astructure,
charge_1=DEFAULT_CHARGE_1,
charge_2=DEFAULT_CHARGE_2,
multip_1=DEFAULT_MULTIP_1,
multip_2=DEFAULT_MULTIP_2,
state_1=DEFAULT_STATE_1,
state_2=DEFAULT_STATE_2,
scf_gs=DEFAULT_SCF_GS,
jaguar_kwargs=DEFAULT_KWARGS,
iterations=DEFAULT_ITERATIONS,
eps=DEFAULT_EPS,
init_hess=DEFAULT_HESS,
method=DEFAULT_METHOD,
perp_factor=DEFAULT_PERP_FACTOR,
para_factor=DEFAULT_PARA_FACTOR,
convergence_dict=DEFAULT_CONVERGENCE_DICT,
guess_geom=DEFAULT_GUESS_GEOM,
all_geometries=DEFAULT_ALL_GEOMETRIES,
all_results=DEFAULT_ALL_RESULTS,
properties=DEFAULT_PROPERTIES,
base_name=DEFAULT_BASE_NAME,
tpp=DEFAULT_TPP,
nresources=DEFAULT_NRESOURCES,
verbose=DEFAULT_VERBOSE,
logger=None):
"""
Create an instance.
:type astructure: `schrodinger.structure.Structure`
:param astructure: the structure for which the MECP
is needed
:type charge_1: int
:param charge_1: net molecular charge of the first state
:type charge_2: int
:param charge_2: net molecular charge of the second state
:type multip_1: int
:param multip_1: multiplicity of the first state
:type multip_2: int
:param multip_2: multiplicity of the second state
:type state_1: int
:param state_1: the first state, 0 for ground state, N for
the N-th excited state
:type state_2: int
:param state_2: the second state, 0 for ground state, N for
the N-th excited state
:type scf_gs: bool
:param scf_gs: specify whether ground states should be
determined using an SCF
:type jaguar_kwargs: dict
:param jaguar_kwargs: dictionary of Jaguar &gen section key/value
pairs
:type iterations: int
:param iterations: the maximum number of MECP geometry optimization
iterations
:type eps: float
:param eps: step size in Angstrom for any finite difference
approximations
:type init_hess: str
:param init_hess: the type of initial Hessian to use
:type method: str
:param method: the method to use to determine the MECP
:type perp_factor: float
:param perp_factor: prefactor for the energy term whose gradient
lies perpendicular to the crossing seam
:type para_factor: float
:param para_factor: prefactor for the energy term whose gradient
lies parallel to the crossing seam
:type convergence_dict: dict
:param convergence_dict: contains various convergence
thresholds
:type guess_geom: str
:param guess_geom: the initial guess geometry to use for the MECP
geometry optimization, choices are GUESS_GEOM_CHOICES
:type all_geometries: bool
:param all_geometries: use this option to report all geometries, i.e.
the geometries from all MECP geometries iterations will be reported in
the output
:type all_results: bool
:param all_results: use this option to copy all subdirectories
containing results from intermediate Jaguar force, etc. calculations
back to the launch host
:type properties: bool
:param properties: whether to calculate properties of the MECP or not
:type base_name: str
:param base_name: a base name used to name the job and its related
files
:type tpp: int
:param tpp: the number of threads to use for any intermediate Jaguar
calculations, i.e. -TPP (threads-per-process)
:type nresources: int
:param nresources: the number of resources to use for Jaguar
calculations, i.e. the number of threads * the number of processors
:type verbose: bool
:param verbose: specifies verbose logging
:type logger: logging.Logger or None
:param logger: output logger or None if there isn't one
"""
self.astructure = astructure
self.charge_1 = charge_1
self.charge_2 = charge_2
self.multip_1 = multip_1
self.multip_2 = multip_2
self.state_1 = state_1
self.state_2 = state_2
self.scf_gs = scf_gs
self.jaguar_kwargs = OrderedDict(jaguar_kwargs)
self.iterations = iterations
self.eps = eps
self.init_hess = init_hess
self.method = method
self.perp_factor = perp_factor
self.para_factor = para_factor
self.convergence_dict = DEFAULT_CONVERGENCE_DICT.copy()
self.convergence_dict.update(convergence_dict)
self.guess_geom = guess_geom
self.all_geometries = all_geometries
self.all_results = all_results
self.properties = properties
self.base_name = base_name
self.tpp = tpp
self.nresources = nresources
self.verbose = verbose
self.logger = logger
self.data = [(self.charge_1, self.multip_1, self.state_1), \
(self.charge_2, self.multip_2, self.state_2)]
self.jobbe = jobcontrol.get_backend()
self.nproc = None
self.mae_out_file = self.base_name + OUT_FILE_SUFFIX
self.mae_out_writer = None
self.mae_out_all_file = None
self.opt_1_data = {}
self.opt_2_data = {}
self.mecp_data = {}
[docs] def setSCFGS(self):
"""
Set the SCF ground state.
"""
if not self.scf_gs:
self.scf_gs = not (self.multip_1 == 3 or self.multip_2 == 3)
[docs] def setJaguarKwargs(self):
"""
Set the Jaguar kwargs.
"""
for key in RESERVED_JAGUAR_KEYS:
self.jaguar_kwargs.pop(key, None)
defaults = OrderedDict(DEFAULT_KWARGS)
if 'dftname' not in self.jaguar_kwargs:
defaults.pop('dftname')
defaults.update(self.jaguar_kwargs)
self.jaguar_kwargs = defaults
if not self.state_1 and not self.state_2 and self.scf_gs:
self.jaguar_kwargs.pop('itddft')
self.jaguar_kwargs.pop('itda')
[docs] def setParallelization(self):
"""
Set the parallelization options.
"""
self.nproc = old_div(self.nresources, self.tpp)
[docs] def setUpMaeWriter(self):
"""
Set up the Maestro output writer.
"""
self.mae_out_writer = structure.MaestroWriter(self.mae_out_file)
[docs] def tearDownMaeWriter(self, exception=None):
"""
Tear down the Maestro output writer and then raise
an exception if there is one.
:type exception: Exception or None
:param exception: the exception to raise after tear down
or None if there isn't one
"""
self.mae_out_writer.close()
if exception is not None:
raise exception
[docs] def logParams(self):
"""
Log the parameters.
"""
header = 'Parameters'
charge_1 = textlogger.get_param_string('Charge 1', self.charge_1,
MSG_WIDTH)
multip_1 = \
textlogger.get_param_string('Multiplicity 1', self.multip_1, MSG_WIDTH)
state_1 = textlogger.get_param_string('State 1', self.state_1,
MSG_WIDTH)
charge_2 = textlogger.get_param_string('Charge 2', self.charge_2,
MSG_WIDTH)
multip_2 = \
textlogger.get_param_string('Multiplicity 2', self.multip_2, MSG_WIDTH)
state_2 = textlogger.get_param_string('State 2', self.state_2,
MSG_WIDTH)
scf_gs = textlogger.get_param_string('SCF ground state', self.scf_gs,
MSG_WIDTH)
iterations = \
textlogger.get_param_string('Iterations', self.iterations, MSG_WIDTH)
eps = textlogger.get_param_string(
'Step size for finite differences / Ang.', self.eps, MSG_WIDTH)
init_hess = \
textlogger.get_param_string('Initial Hessian', self.init_hess, MSG_WIDTH)
method = \
textlogger.get_param_string('Method', self.method, MSG_WIDTH)
perp_factor = textlogger.get_param_string('Perpendicular factor',
self.perp_factor, MSG_WIDTH)
para_factor = textlogger.get_param_string('Parallel factor',
self.para_factor, MSG_WIDTH)
delta_energy = textlogger.get_param_string(
'Convergence: Delta energy / Hartree',
self.convergence_dict[DELTA_ENERGY], MSG_WIDTH)
max_force = textlogger.get_param_string(
'Convergence: Max force / Hartree/Ang.',
self.convergence_dict[MAX_FORCE], MSG_WIDTH)
rms_force = textlogger.get_param_string(
'Convergence: RMS force / Hartree/Ang.',
self.convergence_dict[RMS_FORCE], MSG_WIDTH)
max_displacement = textlogger.get_param_string(
'Convergence: Max displacement / Ang.',
self.convergence_dict[MAX_DISPLACEMENT], MSG_WIDTH)
rms_displacement = textlogger.get_param_string(
'Convergence: RMS displacement / Ang.',
self.convergence_dict[RMS_DISPLACEMENT], MSG_WIDTH)
guess_geom = \
textlogger.get_param_string('Guess geometry', self.guess_geom, MSG_WIDTH)
all_geometries = textlogger.get_param_string('All geometries',
self.all_geometries,
MSG_WIDTH)
all_results = textlogger.get_param_string('All results',
self.all_results, MSG_WIDTH)
properties = textlogger.get_param_string('Properties', self.properties,
MSG_WIDTH)
base_name = textlogger.get_param_string('Base name', self.base_name,
MSG_WIDTH)
verbose = textlogger.get_param_string('Verbose logging', self.verbose,
MSG_WIDTH)
nproc = textlogger.get_param_string('Number of processors', self.nproc,
MSG_WIDTH)
tpp = textlogger.get_param_string('Threads-per-process (TPP)', self.tpp,
MSG_WIDTH)
self.logger.info(header)
self.logger.info('-' * len(header))
self.logger.info(charge_1)
self.logger.info(multip_1)
self.logger.info(state_1)
self.logger.info(charge_2)
self.logger.info(multip_2)
self.logger.info(state_2)
self.logger.info(scf_gs)
self.logger.info(iterations)
self.logger.info(eps)
self.logger.info(init_hess)
self.logger.info(method)
self.logger.info(perp_factor)
self.logger.info(para_factor)
self.logger.info(delta_energy)
self.logger.info(max_force)
self.logger.info(rms_force)
self.logger.info(max_displacement)
self.logger.info(rms_displacement)
self.logger.info(guess_geom)
self.logger.info(all_geometries)
self.logger.info(all_results)
self.logger.info(properties)
self.logger.info(base_name)
self.logger.info(verbose)
self.logger.info(nproc)
self.logger.info(tpp)
self.logger.info('')
header = 'Jaguar options'
self.logger.info(header)
self.logger.info('-' * len(header))
for pair in self.jaguar_kwargs.items():
self.logger.info('%s=%s' % pair)
self.logger.info('')
[docs] def doStateOptimizations(self):
"""
Do the geometry optimizations for the individual
states and record their data.
"""
data = []
if self.properties:
data.extend(self.data)
elif self.guess_geom == OPT_1:
data.append(self.data[0])
elif self.guess_geom == OPT_2:
data.append(self.data[1])
if not data:
return
if self.logger:
msg = ('Performing geometry optimizations of requested state(s).')
self.logger.info(msg)
self.logger.info('')
kwargs = OrderedDict(self.jaguar_kwargs)
kwargs['igeopt'] = 1
jaguar_jobs = JaguarJobs(data=data,
scf_gs=self.scf_gs,
kwargs=kwargs,
base_name=self.base_name,
host=DEFAULT_HOST,
nproc=self.nproc,
tpp=self.tpp,
logger=self.logger)
launch_dir = self.base_name + '_' + STATE_OPTIMIZATIONS
try:
jobs = jaguar_jobs.runIt(self.astructure,
in_templates=None,
launch_dir=launch_dir)
except JaguarError as err:
if self.jobbe:
self.jobbe.copyOutputFile(launch_dir)
self.tearDownMaeWriter(err)
if self.all_results and self.jobbe:
self.jobbe.copyOutputFile(launch_dir)
dicts = [{'structure': job.getFinalStructure(), \
'energy': job.getFinalTotalEnergy()} for job in jobs]
if self.properties:
self.opt_1_data, self.opt_2_data = dicts
set_properties_and_write_structure(
self.opt_1_data['structure'],
{TITLE_KEY: self.base_name + '_' + OPT_1}, self.mae_out_writer)
set_properties_and_write_structure(
self.opt_2_data['structure'],
{TITLE_KEY: self.base_name + '_' + OPT_2}, self.mae_out_writer)
if self.logger:
msg = ('State %s minimum Energy / Hartree: %.8f')
self.logger.info(msg % ('1', self.opt_1_data['energy']))
self.logger.info(msg % ('2', self.opt_2_data['energy']))
self.logger.info('')
elif self.guess_geom == OPT_1:
self.opt_1_data = dicts[0]
elif self.guess_geom == OPT_2:
self.opt_2_data = dicts[0]
[docs] def getGuessGeometry(self):
"""
Return the guess geometry.
:rtype: numpy.array
:return: the guess geometry (N X 1)
"""
msg = ('Using %s geometry as initial guess geometry.')
if self.guess_geom == INPUT:
x_0 = self.astructure.getXYZ()
msg = msg % INPUT
elif self.guess_geom == OPT_1:
x_0 = self.opt_1_data['structure'].getXYZ()
msg = msg % OPT_1
elif self.guess_geom == OPT_2:
x_0 = self.opt_2_data['structure'].getXYZ()
msg = msg % OPT_2
if self.logger:
self.logger.info(msg)
self.logger.info('')
return matrix_to_vector(x_0)
[docs] def postProcessOptimization(self, energy_and_gradients):
"""
Post process the optimization.
:type energy_and_gradients: EnergyAndGradients
:param energy_and_gradients: the instance passed to the optimizer
"""
get_energy = lambda x: energy_and_gradients.astructure.property.get(x)
get_barrier = lambda x, y: hartree_to_kcal_per_mol(x - y)
self.mecp_data['structure'] = energy_and_gradients.astructure
properties = {TITLE_KEY: self.base_name + '_' + DEFAULT_BASE_NAME}
if energy_and_gradients.prev_mecp_step:
if self.verbose and self.logger:
energy_and_gradients.logSummary()
energy_1, energy_2 = list(
map(get_energy, [MECP_ENERGY_1_KEY, MECP_ENERGY_2_KEY]))
if energy_1 is not None and energy_2 is not None:
self.mecp_data['energy_1'] = energy_1
self.mecp_data['energy_2'] = energy_2
energies = {
MECP_ENERGY_1_KEY: energy_1,
MECP_ENERGY_2_KEY: energy_2
}
properties.update(energies)
if self.properties:
barriers = {BARRIER_1_KEY: get_barrier(energy_1, self.opt_1_data['energy']), \
BARRIER_2_KEY: get_barrier(energy_2, self.opt_2_data['energy'])}
properties.update(barriers)
if self.logger:
msg = (
'%s barrier (E_MECP - E_min) (State %s) / kcal/mol: %.3f'
)
self.logger.info(
msg % ('Forward', '1', barriers[BARRIER_1_KEY]))
self.logger.info(
msg % ('Reverse', '2', barriers[BARRIER_2_KEY]))
self.logger.info('')
set_properties_and_write_structure(self.mecp_data['structure'],
properties, self.mae_out_writer)
[docs] def getBFGSObject(self, x_0):
"""
Return a BFGS object.
:type x_0: numpy.array
:param x_0: the guess geometry (N X 1)
:rtype: BFGS
:return: the object to manage the BFGS optimization
"""
kwargs = {}
kwargs['c1'] = DEFAULT_C1
kwargs['c2'] = DEFAULT_C2
kwargs['amax'] = DEFAULT_MAX_STEP_SIZE
kwargs['amin'] = DEFAULT_MIN_STEP_SIZE
kwargs['xtol'] = DEFAULT_XTOL
# set the max_force and max_iterations larger than that
# requested to ensure that we handle the termination
# from EnergyAndGradients.getEnergyAndGradients rather
# than from BFGS.minimize
kwargs['max_force'] = self.convergence_dict[MAX_FORCE] * 1E-10
kwargs['max_iterations'] = self.iterations + 1E10
kwargs['eps'] = self.eps
kwargs['init_hess'] = self.init_hess
kwargs['verbose'] = self.verbose
kwargs['logger'] = self.logger
return BFGS(**kwargs)
[docs] def getEnergyAndGradientsObject(self, bfgs_obj):
"""
Return a EnergyAndGradients object.
:type bfgs_obj: BFGS
:param bfgs_obj: the object to manage the BFGS optimization
:rtype: EnergyAndGradients
:return: the object to manage the energy and gradient calls
for the BFGS optimization
"""
kwargs = {}
kwargs['data'] = self.data
kwargs['scf_gs'] = self.scf_gs
kwargs['kwargs'] = self.jaguar_kwargs
kwargs['base_name'] = self.base_name
kwargs['all_results'] = self.all_results
kwargs['convergence_dict'] = self.convergence_dict
kwargs['max_iterations'] = self.iterations
kwargs['method'] = self.method
kwargs['perp_factor'] = self.perp_factor
kwargs['para_factor'] = self.para_factor
kwargs['all_geometries'] = self.all_geometries
kwargs['host'] = DEFAULT_HOST
kwargs['nproc'] = self.nproc
kwargs['tpp'] = self.tpp
kwargs['verbose'] = self.verbose
kwargs['logger'] = self.logger
kwargs['bfgs_obj'] = bfgs_obj
return EnergyAndGradients(self.astructure, **kwargs)
[docs] def getOptKwargs(self, bfgs_obj):
"""
Return the kwargs for the optimization.
:type bfgs_obj: BFGS
:param bfgs_obj: the object to manage the BFGS optimization
:rtype: dict
:return: the parameters for the optimization
"""
kwargs = {}
kwargs['method'] = bfgs_obj.minimize
kwargs['jac'] = True
return kwargs
[docs] def optimize(self):
"""
Optimize the geometry to the MECP.
"""
if DEBUG is None:
x_0 = self.getGuessGeometry()
else:
nvar = 3 * self.astructure.atom_total
x_0 = get_guess(nvar=nvar)
bfgs_obj = self.getBFGSObject(x_0)
energy_and_gradients = self.getEnergyAndGradientsObject(bfgs_obj)
kwargs = self.getOptKwargs(bfgs_obj)
if self.logger:
self.logger.info('Starting optimization')
self.logger.info('')
try:
optimize_result = \
scipy.optimize.minimize(energy_and_gradients.getEnergyAndGradients,
x_0, **kwargs)
except TerminateException as term_excp:
exception = None
if self.logger:
self.logger.info(str(term_excp))
self.logger.info('')
except (IterationError, JaguarError, MinimizerError) as err:
exception = err
self.postProcessOptimization(energy_and_gradients)
if exception:
if energy_and_gradients.mae_out_writer:
energy_and_gradients.mae_out_writer.close()
self.tearDownMaeWriter(exception)
if self.logger:
self.logger.info('Finished optimization')
self.logger.info('')
self.mae_out_all_file = energy_and_gradients.mae_out_file
[docs] def doIncorporation(self):
"""
Incorporate output Maestro files.
"""
zip_out_name = self.base_name + '_all.zip'
out_files = [self.mae_out_file]
if self.all_geometries:
out_files.append(self.mae_out_all_file)
jobutils.zip_and_set_incorporation(zip_out_name, out_files)
[docs] def runIt(self):
"""
Main function to find the MECP.
"""
self.setSCFGS()
self.setJaguarKwargs()
self.checkInput()
self.setParallelization()
if self.logger:
self.logParams()
self.setUpMaeWriter()
if self.jobbe:
self.jobbe.addLogFile(self.mae_out_file)
self.doStateOptimizations()
self.optimize()
self.doIncorporation()
self.tearDownMaeWriter()
[docs]def vector_to_matrix(vector):
"""
Convert vector to matrix.
:type vector: numpy.array
:param vector: N X 1 array
:rtype: numpy.array
:return: N/3 X 3 array
"""
natoms = old_div(vector.size, 3)
return numpy.reshape(vector, (natoms, 3))
[docs]def matrix_to_vector(matrix):
"""
Convert matrix to vector.
:type matrix: numpy.array
:param matrix: N/3 X 3 array
:rtype: numpy.array
:return: N X 1 array
"""
return matrix.flatten()
[docs]def get_max_and_rms_and_len(matrix):
"""
Return the max, RMS, and length of the given matrix.
:type matrix: numpy.array
:param matrix: the matrix
:rtype: float, float, float
:return: the max, RMS, and length of the given matrix
"""
amax = max([abs(numpy.amin(matrix)), abs(numpy.amax(matrix))])
arms = numpy.sqrt(numpy.mean(numpy.square(matrix)))
alen = transform.get_vector_magnitude(matrix_to_vector(matrix))
return amax, arms, alen
[docs]def log_vector(vector, header=None, log_sums=False, logger=None):
"""
Log or print the given vector in a natoms X 3 format.
:type vector: numpy.array
:param vector: the vector to log (N X 1)
:type header: str or None
:param header: a header or None if there isn't one
:type log_sums: bool
:param log_sums: whether to log the sums of x, y, and z
over atoms
:type logger: logging.Logger or None
:param logger: output logger or None if there isn't one
"""
spaces = 3 * ' '
idx_width = 5
xyz_width = 15
afunc = lambda i, x, y, z: spaces.join([
i.ljust(idx_width),
x.rjust(xyz_width),
y.rjust(xyz_width),
z.rjust(xyz_width)
])
lines = []
if header is not None:
lines.extend([header, '-' * len(header)])
first_row = afunc('atom', 'x', 'y', 'z')
lines.extend([first_row, '-' * len(first_row)])
sums = numpy.zeros(3)
for idx, row in enumerate(vector_to_matrix(vector), 1):
sums += row
xyz = tuple(['%.8f' % coord for coord in row])
lines.append(afunc(str(idx), *xyz))
if log_sums:
sums = tuple(['%.8f' % coord for coord in sums])
lines.append(afunc('total', *sums))
if logger:
for line in lines:
logger.info(line)
else:
for line in lines:
print(line)
[docs]def log_geometry(geometry, header=DEFAULT_GEOMETRY_HEADER, logger=None):
"""
Log or print the geometry in a natoms X 3 format.
:type geometry: numpy.array
:param geometry: the geometry to log (N X 1)
:type header: str
:param header: a header
:type logger: logging.Logger or None
:param logger: output logger or None if there isn't one
"""
log_vector(geometry, header=header, logger=logger)
[docs]def log_displacements(displacements,
max_displacement,
rms_displacement,
len_displacement,
header=DEFAULT_DISPLACEMENTS_HEADER,
logger=None):
"""
Log or print the displacements in a natoms X 3 format.
:type displacements: numpy.array
:param displacements: the displacements to log (N X 1)
:type max_displacement: float
:param max_displacement: the magnitude of the largest
displacements element
:type rms_displacement: float
:param rms_displacement: the RMS of displacements
:type len_displacement: float
:param len_displacement: the length of displacements
:type header: str
:param header: a header
:type logger: logging.Logger or None
:param logger: output logger or None if there isn't one
"""
log_vector(displacements, header=header, log_sums=True, logger=logger)
lines = ['Max: %.8f' % max_displacement]
lines.append('RMS: %.8f' % rms_displacement)
lines.append('Length: %.8f' % len_displacement)
if logger:
for line in lines:
logger.info(line)
else:
for line in lines:
print(line)
[docs]def log_energy_and_forces(energy,
forces,
max_force,
rms_force,
len_force,
energy_header=DEFAULT_ENERGY_HEADER,
forces_header=DEFAULT_FORCES_HEADER,
logger=None):
"""
Log or print the energy and forces in a natoms X 3 format.
:type energy: float or None
:param energy: the energy or None if there isn't one
:type forces: numpy.array
:param forces: the forces to log (N X 1)
:type max_force: float
:param max_force: the magnitude of the largest forces element
:type rms_force: float
:param rms_force: the RMS of forces
:type len_force: float
:param len_force: the length of forces
:type energy_header: str
:param energy_header: an energy header
:type forces_header: str
:param forces_header: a forces header
:type logger: logging.Logger or None
:param logger: output logger or None if there isn't one
"""
if energy is not None:
energy_line = energy_header + ': %.8f' % energy
if logger:
logger.info(energy_line)
else:
print(energy_line)
log_vector(forces, header=forces_header, log_sums=True, logger=logger)
lines = ['Max: %.8f' % max_force]
lines.append('RMS: %.8f' % rms_force)
lines.append('Length: %.8f' % len_force)
if logger:
for line in lines:
logger.info(line)
else:
for line in lines:
print(line)
[docs]def reciprocal_bohr_to_angstrom(array):
"""
Return the given array converted from units
of reciprocal Bohr to reciprocal Angstrom.
:type array: numpy.array
:param array: the array to convert
:rtype: numpy.array
:return: the converted array
"""
factor = old_div(scipy.constants.angstrom, \
scipy.constants.physical_constants['Bohr radius'][0])
return factor * array
[docs]def ev_to_hartree(energy):
"""
Return the given energy converted from eV to Hartree.
:rtype: float
:return: energy in Hartree
"""
key = 'electron volt-hartree relationship'
return scipy.constants.physical_constants[key][0] * energy
[docs]def hartree_to_kcal_per_mol(in_hartree):
"""
Return the given energy converted from Hartree to kcal/mol.
:type in_hartree: float
:param in_hartree: energy in Hartree
:rtype: float
:return: energy in kcal/mol
"""
key = 'hartree-joule relationship'
in_joule = in_hartree * scipy.constants.physical_constants[key][0]
in_kcal_per_mol = old_div(in_joule, scipy.constants.calorie)
return in_kcal_per_mol * scipy.constants.N_A / scipy.constants.kilo
[docs]def set_properties_and_write_structure(astructure, properties, writer):
"""
Set the given properties on the structure and write it using
the given writer.
:type astructure: `schrodinger.structure.Structure`
:param astructure: the structure to write
:type properties: dict
:param properties: key/value pairs of properties to write
:type writer: structure.MaestroWriter
:param writer: the writer
"""
for key, value in properties.items():
astructure.property[key] = value
if writer:
writer.append(astructure)
[docs]def get_angle_in_degrees(vec_1, vec_2):
"""
Return the angle in units of degrees between
the given two vectors.
:type vec_1: numpy.array
:param vec_1: the first vector
:type vec_2: numpy.array
:param vec_2: the second vector
:rtype: float
:return: the angle in degrees
"""
angle = transform.get_angle_between_vectors(vec_1, vec_2)
return numpy.rad2deg(angle)