"""
Utilities for working with VOTCA
Copyright Schrodinger, LLC. All rights reserved.
"""
import os
import pathlib
import sqlite3
from collections import OrderedDict
from collections import namedtuple
from schrodinger.application.desmond import cms
from schrodinger.application.matsci import clusterstruct
from schrodinger.application.matsci import jobutils
from schrodinger.application.matsci.nano import xtal
from schrodinger.job import jobcontrol
from schrodinger.structutils import analyze
from schrodinger.structutils import transform
# Default value for the version column of the schrodinger table
DB_VERSION = 0.92
# Name endings
TOPO_ENDING = '_topo.xml'
PDB_ENDING = '.pdb'
MAP_ENDING = '_map.xml'
SQL_ENDING = '.sql'
SEGNAME_ENDING = '_seg'
FRAGNAME_ENDING = '_frag'
TOPOLOGY = 'topology'
MOLECULES = 'molecules'
PARTIAL_RESULTS = 'partial'
# Charge carrier types
HOLE = 'hole'
ELECTRON = 'electron'
CHARGE_TYPES = (HOLE, ELECTRON)
# Capitalized forms of the charge carrier names
CAP_HOLE = 'Hole'
CAP_ELECTRON = 'Electron'
# One-letter ending associated with each charge carrier type
H_ENDING = 'h'
E_ENDING = 'e'
CHARGE_ENDINGS = {HOLE: H_ENDING, ELECTRON: E_ENDING}
# Axis names, and a lookup from axis name to its 0-based index
XAX = 'X'
YAX = 'Y'
ZAX = 'Z'
ALL_AXES = [XAX, YAX, ZAX]
AXIS_INDEX = {b: a for a, b in enumerate(ALL_AXES)}
# Make AXIS_INDEX work whether the axis name or index is passed in. That way we
# don't have to preprocess to ensure the axis name is being used as the key.
AXIS_INDEX.update({a: a for a in range(3)})
# Property type identifiers returned by is_votca_prop
MOBILITY_TYPE = 'mobility'
VELOCITY_TYPE = 'velocity'
FIELD_TYPE = 'field'
DATABASE_TYPE = 'database'
# VOTCA SQL column names
# Coupling integral squared
JEFF = 'Jeff2'
# Hopping rate
RATE = 'rate'
# Charge occupation fraction
OCCUPATION = 'occP'
# Default value of the occupation columns in the segments table
DEFAULT_OCCUPANCY = -1
# SQL data types
INT = 'INT'
REAL = 'REAL'
TEXT = 'TEXT'
# Column values
SQL_NOJOB = 'no_job'
SQL_HEAVY = 'heavy'
SQL_ALL = 'all'
SQL_DEPRECATED_NONE = 'none' # Retained for backwards compatibility
SQL_NOFILE = 'nofile'
SQL_NONAME = 'noname'
SQL_NOT_USED = 'NOT_USED'
# Properties
# Common starts of the real-valued (r_) and string-valued (s_) property names
VOTCA_PROP_START = 'r_matsci_KMC_F'
VOTCA_SPROP_START = 's_matsci_KMC_F'
# The property names below are str.format templates. {field} is the field
# index, {charge} the charge carrier name and {axis} the axis name.
VELOCITY_PROP = '%s{field}_{charge}_Velocity_{axis}_(m/s)' % VOTCA_PROP_START
MOBILITY_PROP = ('%s{field}_{charge}_Mobility_{axis}_(cm^2/Vs)' %
VOTCA_PROP_START)
# The trailing 'F' of VOTCA_PROP_START plus 'ield' spells out 'Field'
FIELD_PROP = '%sield_{field}_{axis}_(V/m)' % VOTCA_PROP_START
SQL_FILE = '%s{field}_{charge}_Database' % VOTCA_SPROP_START
PARAM_SQL_FILE = 's_matsci_KMC_Hopping_Params_Database'
VOTCA_JOB_ID = 's_matsci_KMC_Job_ID'
# Description of one SQL column: its SQL data type and its default value. The
# Table class treats a default of None as "a value must be supplied" unless
# the table allows NULL values.
ColumnData = namedtuple('ColumnData', ['type', 'default'])
# Per-molecule information gathered by Table.getMoleculeInfo
MoleculeData = namedtuple('MoleculeData',
['index', 'name', 'mtype', 'posx', 'posy', 'posz'])
def is_votca_prop(prop):
    """
    Classify a property name as one of the known VOTCA property types

    :param str prop: The property name to classify

    :rtype: str or None
    :return: The module-level constant for the type of VOTCA property
        (MOBILITY_TYPE, VELOCITY_TYPE, FIELD_TYPE or DATABASE_TYPE), or None
        if the name is not recognized as a VOTCA property.
    """
    if prop.startswith(VOTCA_PROP_START):
        # Real-valued properties are told apart by a marker in the name. The
        # markers are checked in this order and the first hit wins.
        markers = (
            ('_Mobility_', MOBILITY_TYPE),
            ('_Velocity_', VELOCITY_TYPE),
            ('_Field_', FIELD_TYPE),
        )
        for marker, prop_type in markers:
            if marker in prop:
                return prop_type
        return None
    if prop.startswith(VOTCA_SPROP_START) and prop.endswith('_Database'):
        return DATABASE_TYPE
    return None
def parse_mobility_or_velocity_prop(prop):
    """
    Extract the field index, charge and axis from a VOTCA mobility or
    velocity property name

    :param str prop: The property name to parse

    :rtype: (int, str, str) or None
    :return: The field index, the charge name and the axis name, in that
        order. None is returned if the property is not a mobility or velocity
        property.
    """
    prop_type = is_votca_prop(prop)
    if prop_type != MOBILITY_TYPE and prop_type != VELOCITY_TYPE:
        return None
    # With the common start removed the name reads
    # {field}_{charge}_{Mobility or Velocity}_{axis}_{units}
    pieces = prop.replace(VOTCA_PROP_START, "").split('_')
    return int(pieces[0]), pieces[1], pieces[3]
def parse_field_prop(prop):
    """
    Extract the field index and axis from a VOTCA field property name

    :param str prop: The property name to parse

    :rtype: (int, str) or None
    :return: The field index and the axis name, in that order. None is
        returned if the property is not a field property.
    """
    if is_votca_prop(prop) == FIELD_TYPE:
        # The full name is split on underscores, so the field index and axis
        # are the fifth and sixth pieces
        pieces = prop.split('_')
        return int(pieces[4]), pieces[5]
    return None
def parse_database_prop(prop):
    """
    Extract the field index and charge from a VOTCA database property name

    :param str prop: The property name to parse

    :rtype: (int, str) or None
    :return: The field index and the charge name, in that order. None is
        returned if the property is not a database property.
    """
    if is_votca_prop(prop) == DATABASE_TYPE:
        # With the common start removed the name reads
        # {field}_{charge}_Database
        pieces = prop.replace(VOTCA_SPROP_START, "").split('_')
        return int(pieces[0]), pieces[1]
    return None
class AxisData(object):
    """ Container for a value that has separate X, Y and Z components """

    def __init__(self):
        """ Create an AxisData object with all three components set to 0 """
        self.components = [0, 0, 0]

    def setComponent(self, axis, value):
        """
        Store the value for a single axis

        :type axis: str or int
        :param axis: Either the capital name of an axis (X, Y, Z) or the
            numerical index of that axis

        :param value: The value to store for that axis
        """
        # AXIS_INDEX accepts both axis names and axis indexes as keys
        index = AXIS_INDEX[axis]
        self.components[index] = value
class SQLCursor(object):
    """
    Context manager for reading or modifying an SQL database. Ensures that
    changes are committed and the cursor/connection are closed when finished.::

        cmd = "black SQL magic"
        with SQLCursor(path_to_sql_file) as cursor:
            cursor.execute(cmd)
    """

    def __init__(self, path):
        """
        Create an SQLCursor instance

        :type path: str or pathlib.Path
        :param path: The path to the SQL file
        """
        self.path = str(path)
        self.connection = None
        self.cursor = None

    def __enter__(self):
        """
        Open the database and create a cursor for it

        :rtype: sqlite3.Cursor
        :return: A cursor whose rows are returned as sqlite3.Row objects
        """
        self.connection = sqlite3.connect(self.path)
        self.cursor = self.connection.cursor()
        self.cursor.row_factory = sqlite3.Row
        return self.cursor

    def __exit__(self, *args):
        """
        Commit any changes and close the cursor and connection. Changes are
        committed whether or not the with block raised an exception.
        """
        try:
            self.connection.commit()
        finally:
            # Release the cursor and connection even if the commit fails so
            # the database file is not left open
            try:
                self.cursor.close()
            finally:
                self.connection.close()
class Table(object):
    """ Base table class for VOTCA SQL tables """
    TABLE_NAME = ""
    # Keys are column names, values are ColumnData objects. Subclasses
    # override this with their own columns.
    COLUMNS = OrderedDict()
    CREATION_COMMAND = 'CREATE TABLE {name} ({columns});'
    ADDROW_COMMAND = 'INSERT INTO {name} ({cols}) VALUES ({ph})'
    # If False, columns with a default of None require a value
    NULL_ALLOWED = False
    # Column names
    SQL_ID = '_id'
    SQID = 'id'
    FRAME = 'frame'
    TOP = 'top'
    NAME = 'name'
    TYPE = 'type'
    MOL = 'mol'
    SEG = 'seg'
    POSX = 'posX'
    POSY = 'posY'
    POSZ = 'posZ'

    def __init__(self, filename):
        """
        Create a Table instance

        :type filename: str
        :param filename: The path to the SQL file
        """
        self.filename = filename
        self.cursor = None

    def setCursor(self, cursor):
        """
        Set the SQL cursor this table should use

        :type cursor: sqlite3.Cursor
        :param cursor: The cursor to use for database read/writes
        """
        self.cursor = cursor

    def create(self):
        """
        Create this table in the database

        :raise `SQLCreationError`: If the cursor is not defined
        """
        if not self.cursor:
            raise SQLCreationError(
                f'Table {self.TABLE_NAME} cannot be created without a cursor.')
        # This means that a new _id will automatically be created for each new
        # row and the value of this will increase by one each time
        colinfo = [f'{self.SQL_ID} INTEGER PRIMARY KEY AUTOINCREMENT']
        for name, data in self.COLUMNS.items():
            if not self.NULL_ALLOWED and data.default is None:
                # Columns with no default will require a value
                value = 'NOT NULL'
            else:
                value = 'DEFAULT %s' % data.default
            colinfo.append('%s %s %s' % (name, data.type, value))
        columns = ', '.join(colinfo)
        cmd = self.CREATION_COMMAND.format(name=self.TABLE_NAME,
                                           columns=columns)
        self.cursor.execute(cmd)

    def _addRow(self, props):
        """
        Add a row to this table

        :type props: dict
        :param props: Non-default row values. Keys are column names, values are
            the value for that column.

        :raise `SQLCreationError`: If the cursor is not defined
        :raise KeyError: If a key of props is not a column of this table
        :raise ValueError: If NULL values are not allowed and a column without
            a default is not given a value
        """
        if not self.cursor:
            raise SQLCreationError(
                f'Table {self.TABLE_NAME} cannot add a row without a cursor.')
        # Create a dict with default values
        newline_data = OrderedDict()
        for name, data in self.COLUMNS.items():
            newline_data[name] = data.default
        # Update with caller-supplied values
        for name, value in props.items():
            if name not in newline_data:
                raise KeyError('%s is not a value column name for %s' %
                               (name, self.TABLE_NAME))
            newline_data[name] = value
        if not self.NULL_ALLOWED:
            # Ensure that all required values are supplied
            for name, value in newline_data.items():
                if value is None:
                    raise ValueError('A value for %s must be supplied for %s' %
                                     (name, self.TABLE_NAME))
        colnames = ', '.join(newline_data.keys())
        # The values are bound to placeholders rather than written into the
        # command string
        placeholders = ', '.join(['?'] * len(newline_data))
        cmd = self.ADDROW_COMMAND.format(name=self.TABLE_NAME,
                                         cols=colnames,
                                         ph=placeholders)
        self.cursor.execute(cmd, tuple(newline_data.values()))

    def getMoleculeInfo(self, molecule, centroid=True):
        """
        Get common database information for a molecule object

        :type molecule: `schrodinger.structure._StructureMolecule`
        :param molecule: The molecule object to get information for

        :type centroid: bool
        :param centroid: Include information about the molecule's centroid. If
            False, all centroid information will be 0.

        :rtype: `MoleculeData`
        :return: A MoleculeData object containing the information
        """
        index = molecule.number
        if centroid:
            center = transform.get_centroid(molecule.structure,
                                            molecule.getAtomIndices())
        else:
            center = [0, 0, 0]
        name = f'{self.getSegmentType(molecule)}_{molecule.number}'
        mtype = molecule.atom[1].pdbres
        # Positions are stored in nanometers
        return MoleculeData(index=index,
                            name=name,
                            mtype=mtype,
                            posx=center[0] / 10.,
                            posy=center[1] / 10.,
                            posz=center[2] / 10.)

    @staticmethod
    def getSegmentType(molecule):
        """
        Get the segment type for this molecule

        Segment names will be type_X, where X is the molecule number

        :param `structure._Molecule` molecule: The molecule object

        :rtype: str
        :return: The segment type for this molecule, which is the stripped
            pdbres name of the first atom in the molecule
        """
        return molecule.atom[1].pdbres.strip()

    @staticmethod
    def getAllSegmentTypes(struct):
        """
        Get all the segment types for this structure

        :param `structure.Structure` struct: The structure object

        :rtype: set
        :return: Each item of the set is the name of a segment type
        """
        return {Table.getSegmentType(x) for x in struct.molecule}
class FramesTable(Table):
    """ The frames table, which holds the periodic box of a frame """
    TABLE_NAME = 'frames'
    # Column names
    TIME = 'time'
    STEP = 'step'
    BOX11 = 'box11'
    BOX12 = 'box12'
    BOX13 = 'box13'
    BOX21 = 'box21'
    BOX22 = 'box22'
    BOX23 = 'box23'
    BOX31 = 'box31'
    BOX32 = 'box32'
    BOX33 = 'box33'
    CANRIGID = 'canRigid'
    # yapf: disable
    COLUMNS = OrderedDict([
        (Table.SQID, ColumnData(type=INT, default=0)),
        (TIME, ColumnData(type=REAL, default=0)),
        (STEP, ColumnData(type=INT, default=0)),
        (BOX11, ColumnData(type=REAL, default=None)),
        (BOX12, ColumnData(type=REAL, default=None)),
        (BOX13, ColumnData(type=REAL, default=None)),
        (BOX21, ColumnData(type=REAL, default=None)),
        (BOX22, ColumnData(type=REAL, default=None)),
        (BOX23, ColumnData(type=REAL, default=None)),
        (BOX31, ColumnData(type=REAL, default=None)),
        (BOX32, ColumnData(type=REAL, default=None)),
        (BOX33, ColumnData(type=REAL, default=None)),
        (CANRIGID, ColumnData(type=INT, default=0)),
    ])
    # yapf: enable

    def addRow(self, struct):
        """
        Add a frame row to the table

        The only non-default data stored for a frame is the PBC box

        :type struct: `schrodinger.structure.Structure`
        :param struct: The structure with the PBC information

        :raise `SQLCreationError`: If the structure is missing a PBC property
        """
        try:
            chorus = xtal.get_chorus_properties(struct)
        except KeyError as err:
            raise SQLCreationError(
                'The given structure is missing a required PBC property:\n ' +
                str(err))
        box_columns = (self.BOX11, self.BOX12, self.BOX13, self.BOX21,
                       self.BOX22, self.BOX23, self.BOX31, self.BOX32,
                       self.BOX33)
        # Box size is stored in nanometers
        props = {
            column: value / 10. for column, value in zip(box_columns, chorus)
        }
        self._addRow(props)
class MoleculesTable(Table):
    """ The molecules table, with one row per molecule """
    TABLE_NAME = 'molecules'
    # yapf: disable
    COLUMNS = OrderedDict([
        (Table.FRAME, ColumnData(type=INT, default=0)),
        (Table.TOP, ColumnData(type=INT, default=0)),
        (Table.SQID, ColumnData(type=INT, default=None)),
        (Table.NAME, ColumnData(type=TEXT, default=None)),
        (Table.TYPE, ColumnData(type=TEXT, default=None)),
    ])
    # yapf: enable

    def addRow(self, molecule):
        """
        Add the row for one molecule

        :type molecule: `schrodinger.structure._StructureMolecule`
        :param molecule: The molecule object to add a row for
        """
        # The centroid is not stored in this table, so don't compute it
        info = self.getMoleculeInfo(molecule, centroid=False)
        self._addRow({
            self.SQID: info.index,
            self.NAME: info.name,
            self.TYPE: info.mtype,
        })
class SegmentsTable(Table):
    """ The segments table, with one row per molecule """
    TABLE_NAME = 'segments'
    # Column names
    UNCNNE = 'UnCnNe'
    UNCNNH = 'UnCnNh'
    UCNCCE = 'UcNcCe'
    UCNCCH = 'UcNcCh'
    UCCNNE = 'UcCnNe'
    UCCNNH = 'UcCnNh'
    EANION = 'eAnion'
    ENEUTRAL = 'eNeutral'
    ECATION = 'eCation'
    HAS_E = 'has_e'
    HAS_H = 'has_h'
    OCCPE = 'occPe'
    OCCPH = 'occPh'
    # yapf: disable
    COLUMNS = OrderedDict([
        (Table.FRAME, ColumnData(type=INT, default=0)),
        (Table.TOP, ColumnData(type=INT, default=0)),
        (Table.SQID, ColumnData(type=INT, default=None)),
        (Table.NAME, ColumnData(type=TEXT, default=None)),
        (Table.TYPE, ColumnData(type=TEXT, default=None)),
        (Table.MOL, ColumnData(type=INT, default=None)),
        (Table.POSX, ColumnData(type=REAL, default=None)),
        (Table.POSY, ColumnData(type=REAL, default=None)),
        (Table.POSZ, ColumnData(type=REAL, default=None)),
        (UNCNNE, ColumnData(type=REAL, default=0)),
        (UNCNNH, ColumnData(type=REAL, default=0)),
        (UCNCCE, ColumnData(type=REAL, default=0)),
        (UCNCCH, ColumnData(type=REAL, default=0)),
        (UCCNNE, ColumnData(type=REAL, default=0)),
        (UCCNNH, ColumnData(type=REAL, default=0)),
        (EANION, ColumnData(type=REAL, default=0)),
        (ENEUTRAL, ColumnData(type=REAL, default=0)),
        (ECATION, ColumnData(type=REAL, default=0)),
        (HAS_E, ColumnData(type=INT, default=0)),
        (HAS_H, ColumnData(type=INT, default=0)),
        (OCCPE, ColumnData(type=REAL, default=DEFAULT_OCCUPANCY)),
        (OCCPH, ColumnData(type=REAL, default=DEFAULT_OCCUPANCY)),
    ])
    # The columns grouped by the charge carrier type in their name
    SITE_ENERGY_PROPS = {ELECTRON: [UCCNNE, UNCNNE, UCNCCE],
                         HOLE: [UCCNNH, UNCNNH, UCNCCH]}
    # yapf: enable

    def addRow(self, molecule, stypes):
        """
        Add the row for one molecule

        :type molecule: `schrodinger.structure._StructureMolecule`
        :param molecule: The molecule object to add a row for

        :type stypes: dict
        :param stypes: Keys are segment names (atom pdbres names), values are
            the index of that segment
        """
        info = self.getMoleculeInfo(molecule)
        # The molecule number is used for both the row id and the mol column
        self._addRow({
            self.SQID: info.index,
            self.NAME: info.name,
            self.TYPE: stypes[self.getSegmentType(molecule)],
            self.MOL: info.index,
            self.POSX: info.posx,
            self.POSY: info.posy,
            self.POSZ: info.posz,
        })
class SegmentTypesTable(Table):
    """ The segmentTypes table, with one row per segment type """
    TABLE_NAME = 'segmentTypes'
    # Column names
    BASIS = 'basis'
    ORBFILE = 'orbfile'
    TORBNRS = 'torbnrs'
    COORDFILE = 'coordfile'
    CANRIGID = 'canRigid'
    # yapf: disable
    COLUMNS = OrderedDict([
        (Table.FRAME, ColumnData(type=INT, default=0)),
        (Table.TOP, ColumnData(type=INT, default=0)),
        (Table.SQID, ColumnData(type=INT, default=None)),
        (Table.NAME, ColumnData(type=TEXT, default=None)),
        (BASIS, ColumnData(type=TEXT, default=SQL_NONAME)),
        (ORBFILE, ColumnData(type=TEXT, default=SQL_NOFILE)),
        (TORBNRS, ColumnData(type=TEXT, default=SQL_NOT_USED)),
        (COORDFILE, ColumnData(type=TEXT, default=SQL_NOFILE)),
        (CANRIGID, ColumnData(type=INT, default=0)),
    ])
    # yapf: enable

    def addRow(self, stype, index):
        """
        Add the row for one segment type

        :type stype: str
        :param stype: The segment type name (should be an atom.pdbres name)

        :type index: int
        :param index: The segment type index
        """
        self._addRow({self.SQID: index, self.NAME: stype})
class FragmentsTable(Table):
    """ The fragments table, with one row per molecule """
    TABLE_NAME = 'fragments'
    # Column names
    SYMMETRY = 'symmetry'
    LEG1 = 'leg1'
    LEG2 = 'leg2'
    LEG3 = 'leg3'
    # yapf: disable
    COLUMNS = OrderedDict([
        (Table.FRAME, ColumnData(type=INT, default=0)),
        (Table.TOP, ColumnData(type=INT, default=0)),
        (Table.SQID, ColumnData(type=INT, default=None)),
        (Table.NAME, ColumnData(type=TEXT, default=None)),
        (Table.TYPE, ColumnData(type=TEXT, default=None)),
        (Table.MOL, ColumnData(type=INT, default=None)),
        (Table.SEG, ColumnData(type=INT, default=None)),
        (Table.POSX, ColumnData(type=REAL, default=None)),
        (Table.POSY, ColumnData(type=REAL, default=None)),
        (Table.POSZ, ColumnData(type=REAL, default=None)),
        (SYMMETRY, ColumnData(type=INT, default=-1)),
        (LEG1, ColumnData(type=INT, default=1)),
        (LEG2, ColumnData(type=INT, default=2)),
        (LEG3, ColumnData(type=INT, default=3)),
    ])
    # yapf: enable

    def addRow(self, molecule):
        """
        Add the row for one molecule

        :type molecule: `schrodinger.structure._StructureMolecule`
        :param molecule: The molecule object to add a row for
        """
        info = self.getMoleculeInfo(molecule)
        # The molecule number serves as the fragment id, the molecule id and
        # the segment id
        self._addRow({
            self.SQID: info.index,
            self.NAME: info.name,
            self.TYPE: info.mtype,
            self.MOL: info.index,
            self.SEG: info.index,
            self.POSX: info.posx,
            self.POSY: info.posy,
            self.POSZ: info.posz,
        })
class AtomsTable(Table):
    """ The atoms table, with one row per atom """
    TABLE_NAME = 'atoms'
    # Column names
    FRAG = 'frag'
    RESNR = 'resnr'
    RESNAME = 'resname'
    WEIGHT = 'weight'
    ELEMENT = 'element'
    QMID = 'qmid'
    QMPOSX = 'qmPosX'
    QMPOSY = 'qmPosY'
    QMPOSZ = 'qmPosZ'
    # yapf: disable
    COLUMNS = OrderedDict([
        (Table.FRAME, ColumnData(type=INT, default=0)),
        (Table.TOP, ColumnData(type=INT, default=0)),
        (Table.SQID, ColumnData(type=INT, default=None)),
        (Table.NAME, ColumnData(type=TEXT, default=None)),
        (Table.TYPE, ColumnData(type=INT, default=None)),
        (Table.MOL, ColumnData(type=INT, default=None)),
        (Table.SEG, ColumnData(type=INT, default=None)),
        (FRAG, ColumnData(type=INT, default=None)),
        (RESNR, ColumnData(type=INT, default=1)),
        (RESNAME, ColumnData(type=TEXT, default=None)),
        (Table.POSX, ColumnData(type=REAL, default=None)),
        (Table.POSY, ColumnData(type=REAL, default=None)),
        (Table.POSZ, ColumnData(type=REAL, default=None)),
        (WEIGHT, ColumnData(type=REAL, default=None)),
        (ELEMENT, ColumnData(type=TEXT, default=None)),
        (QMID, ColumnData(type=INT, default=0)),
        (QMPOSX, ColumnData(type=REAL, default=0.0)),
        (QMPOSY, ColumnData(type=REAL, default=0.0)),
        (QMPOSZ, ColumnData(type=REAL, default=0.0)),
    ])
    # yapf: enable

    def addRow(self, atom):
        """
        Add the row for one atom

        :type atom: `structure.Structure._StructureAtom`
        :param atom: The atom to add a row for
        """
        molnum = atom.molecule_number
        self._addRow({
            self.SQID: atom.index,
            # The PDB atom name is used for both the name and the type
            self.NAME: atom.pdbname,
            self.TYPE: atom.pdbname,
            # The molecule number is used for the molecule, segment and
            # fragment ids
            self.MOL: molnum,
            self.SEG: molnum,
            self.FRAG: molnum,
            self.RESNAME: atom.pdbres,
            # Atom position is stored in nanometers
            self.POSX: atom.x / 10,
            self.POSY: atom.y / 10,
            self.POSZ: atom.z / 10,
            self.WEIGHT: atom.atomic_weight,
            self.ELEMENT: atom.element,
        })
class PairsTable(Table):
    """ The pairs table, with one row per dimer """
    TABLE_NAME = 'pairs'
    # The molecule numbers of the two segments involved in a dimer
    SEG1 = 'seg1'
    SEG2 = 'seg2'
    # Delta X, Y and Z distance between two molecules in a dimer
    DRX = 'drx'
    DRY = 'dry'
    DRZ = 'drz'
    LOE = 'lOe'
    LOH = 'lOh'
    HAS_E = 'has_e'
    HAS_H = 'has_h'
    RATE12E = 'rate12e'
    RATE21E = 'rate21e'
    RATE12H = 'rate12h'
    RATE21H = 'rate21h'
    JEFF2E = 'Jeff2e'
    JEFF2H = 'Jeff2h'
    # yapf: disable
    COLUMNS = OrderedDict(
        [(Table.FRAME, ColumnData(type=INT, default=0)),
         (Table.TOP, ColumnData(type=INT, default=0)),
         (Table.SQID, ColumnData(type=INT, default=None)),
         (SEG1, ColumnData(type=INT, default=None)),
         (SEG2, ColumnData(type=INT, default=None)),
         (DRX, ColumnData(type=REAL, default=None)),
         (DRY, ColumnData(type=REAL, default=None)),
         (DRZ, ColumnData(type=REAL, default=None)),
         (LOE, ColumnData(type=REAL, default=0)),
         (LOH, ColumnData(type=REAL, default=0)),
         (HAS_E, ColumnData(type=INT, default=0)),
         (HAS_H, ColumnData(type=INT, default=0)),
         (RATE12E, ColumnData(type=REAL, default=0)),
         (RATE21E, ColumnData(type=REAL, default=0)),
         (RATE12H, ColumnData(type=REAL, default=0)),
         (RATE21H, ColumnData(type=REAL, default=0)),
         (JEFF2E, ColumnData(type=REAL, default=0)),
         (JEFF2H, ColumnData(type=REAL, default=0)),
         (Table.TYPE, ColumnData(type=INT, default=0))])
    # yapf: enable

    def addRow(self, index, dimer):
        """
        Add a row to the table

        :type index: int
        :param index: The index of this pair

        :type dimer: `schrodinger.application.matsci.clusterstruct.Dimer`
        :param dimer: The Dimer object for this row
        """
        # The lower molecule number is always stored as the first segment
        mol1, mol2 = sorted(dimer.molnumbers)
        atom_a = dimer.neighbor_info.home_atom
        atom_b = dimer.neighbor_info.neighbor_atom
        coords = atom_a.xyz + atom_b.xyz
        # Must make sure the PBC is accounted for
        dx, dy, dz = dimer.pbc.getShortestVector(*coords)
        props = {}
        # Pair ID
        props[self.SQID] = index
        # Molecules involved
        props[self.SEG1] = mol1
        props[self.SEG2] = mol2
        # Delta coordinates for the closest approach between the molecules,
        # values are in NM
        props[self.DRX] = dx / 10.
        props[self.DRY] = dy / 10.
        props[self.DRZ] = dz / 10.
        self._addRow(props)

    @classmethod
    def setRowProperty(cls, mols, prop, value, cursor):
        """
        Set the value of a property in the row for the given pair of molecules

        :param iterable mols: The two mol numbers involved in this pair
        :param str prop: The name of property column to set
        :param any value: The value to set for the property. It is written
            directly into the SQL command.
        :param `sqlite3.Cursor` cursor: The cursor to use

        :raise RuntimeError: If value has spaces
        """
        try:
            # The value is placed directly in the command text, so values
            # with spaces are rejected. Note that an empty string is contained
            # in every string, so the check must be for an actual space.
            if " " in value:
                raise RuntimeError('Values with spaces are not allowed')
        except TypeError:
            # Non-iterable values such as numbers cannot contain spaces
            pass
        # Rows are stored with the lower molecule number first
        mol1, mol2 = sorted(mols)
        cursor.execute(f'UPDATE {cls.TABLE_NAME} SET {prop} = {value} '
                       f'WHERE {cls.SEG1} = {mol1} AND {cls.SEG2} = {mol2}')
class SuperExchangeTable(Table):
    """
    The superExchange table

    Note: unused. The table is created but no rows are added to it by this
    class.
    """
    TABLE_NAME = 'superExchange'
    # yapf: disable
    COLUMNS = OrderedDict([
        (Table.FRAME, ColumnData(type=INT, default=0)),
        (Table.TOP, ColumnData(type=INT, default=0)),
        (Table.TYPE, ColumnData(type=TEXT, default=None)),
    ])
    # yapf: enable
class SchrodingerTable(Table):
    """
    The schrodinger table. This contains Schrodinger-specific information and is
    not used by VOTCA
    """
    TABLE_NAME = 'schrodinger'
    # Most values in this table are filled in after the row is created
    NULL_ALLOWED = True
    # Column names
    MOLFORM = 'mol_formula'
    VOLUME = 'pbc_volume_Ang3'
    JOBID = 'jobid'
    STRUCTURE_PATH = 'structure_path'
    STRUCTURE_FILE = 'structure_file'
    PAIR_DISTANCE = 'pair_distance_Ang'
    PAIR_TYPE = 'pair_type'
    VERSION = 'version'
    # Note - stopped using JUMPFILE in 20-1
    JUMPFILE = 'jumpfile'
    JUMPSUMMARY = 'jumpsummary'
    RUNTIME = 'runtime'
    SEED = 'seed'
    FIELDX = 'fieldX'
    FIELDY = 'fieldY'
    FIELDZ = 'fieldZ'
    TEMPERATURE = 'temperature'
    RATEFILE = 'ratefile'
    CARRIERTYPE = 'carriertype'
    SITE_KEYWORDS = 'site_keywords'
    FIX_SITE_KEYWORDS = 'site_fix_keywords'
    # yapf: disable
    COLUMNS = OrderedDict([
        (MOLFORM, ColumnData(type=TEXT, default=None)),
        (VOLUME, ColumnData(type=REAL, default=0.0)),
        (JOBID, ColumnData(type=TEXT, default=None)),
        (STRUCTURE_PATH, ColumnData(type=TEXT, default=None)),
        (STRUCTURE_FILE, ColumnData(type=TEXT, default=None)),
        (PAIR_DISTANCE, ColumnData(type=REAL, default=0.0)),
        (PAIR_TYPE, ColumnData(type=REAL, default=None)),
        (VERSION, ColumnData(type=REAL, default=DB_VERSION)),
        (JUMPSUMMARY, ColumnData(type=TEXT, default=None)),
        (RUNTIME, ColumnData(type=REAL, default=None)),
        (SEED, ColumnData(type=INT, default=None)),
        (FIELDX, ColumnData(type=REAL, default=None)),
        (FIELDY, ColumnData(type=REAL, default=None)),
        (FIELDZ, ColumnData(type=REAL, default=None)),
        (TEMPERATURE, ColumnData(type=REAL, default=None)),
        (RATEFILE, ColumnData(type=TEXT, default=None)),
        (CARRIERTYPE, ColumnData(type=TEXT, default=None)),
        (SITE_KEYWORDS, ColumnData(type=TEXT, default=None)),
        (FIX_SITE_KEYWORDS, ColumnData(type=TEXT, default=None)),
    ])
    # yapf: enable

    def addRow(self, struct):
        """
        Add the row for the structure

        Only the molecular formula and box volume are filled in here, all
        other columns get their default value.

        :type struct: `schrodinger.structure.Structure`
        :param struct: The structure for the database
        """
        formula = analyze.generate_molecular_formula(struct)
        # Box volume, which is recorded as 0 if the box cannot be obtained
        try:
            box = cms.get_box(struct)
        except KeyError:
            box = None
        volume = 0.0 if box is None else cms.get_boxvolume(box)
        self._addRow({self.MOLFORM: formula, self.VOLUME: volume})
class DatabaseManager(object):
    """
    Creates the tables of a VOTCA SQL database and fills them with the data
    from a structure
    """

    class Cursor(SQLCursor):
        """
        Context manager that opens a cursor and hands it to every table of the
        manager for the duration of the with block.

        Note that when adding many rows it saves a huge amount of
        time to create the cursor once and then close it when finished rather
        than create/close a cursor for each row.
        """

        def __init__(self, manager):
            """
            Create a Cursor instance

            :type manager: `DatabaseManager`
            :param manager: The manager whose tables will use the cursor
            """
            self.manager = manager
            super().__init__(manager.path)

        def __enter__(self):
            """
            Open the cursor and give it to all of the manager's tables

            :rtype: sqlite3.Cursor
            :return: The open cursor
            """
            opened = super().__enter__()
            for table in self.manager.tables.values():
                table.setCursor(opened)
            return self.cursor

        def __exit__(self, *args):
            """
            Take the cursor away from the tables, then commit and close
            """
            for table in self.manager.tables.values():
                table.setCursor(None)
            super().__exit__()

    TABLE_CLASSES = (FramesTable, PairsTable, MoleculesTable, SegmentsTable,
                     FragmentsTable, AtomsTable, SegmentTypesTable,
                     SchrodingerTable, SuperExchangeTable)

    def __init__(self, struct, filename):
        """
        Create a DatabaseManager instance

        :type struct: `schrodinger.structure.Structure`
        :param struct: The structure to find dimers in

        :type filename: str
        :param filename: The name of the SQL file to create
        """
        self.struct = struct
        self.path = filename
        # One table object per table class, keyed by table name
        self.tables = {}
        for table_class in self.TABLE_CLASSES:
            self.tables[table_class.TABLE_NAME] = table_class(self.path)
        # Keys are segment type names, values are 1-based type indexes.
        # Filled in by fillSegmentTypes.
        self.segment_types = {}

    def initializeDatabase(self):
        """
        Create all the tables and fill all but the pairs table with initial data
        """
        with self.Cursor(self):
            for table in self.tables.values():
                table.create()
            # fillSegmentTypes must run before fillSegments
            fillers = (self.fillAtoms, self.fillFragments, self.fillFrames,
                       self.fillMolecules, self.fillSegmentTypes,
                       self.fillSegments, self.fillSchrodinger)
            for filler in fillers:
                filler()

    def fillAtoms(self):
        """
        Fill the atoms table. Atoms that have a blank PDB name are given a
        name made from their element and their number within the molecule.
        """
        # Atoms must have a name
        for mol in self.struct.molecule:
            for atom in mol.atom:
                if atom.pdbname.strip():
                    continue
                atom.pdbname = atom.element + str(atom.number_by_molecule)
        atoms_table = self.tables[AtomsTable.TABLE_NAME]
        for atom in self.struct.atom:
            atoms_table.addRow(atom)

    def fillFragments(self):
        """
        Fill the fragments table with one row per molecule
        """
        fragments_table = self.tables[FragmentsTable.TABLE_NAME]
        for mol in self.struct.molecule:
            fragments_table.addRow(mol)

    def fillFrames(self):
        """
        Fill the frames table with the single row for the structure
        """
        self.tables[FramesTable.TABLE_NAME].addRow(self.struct)

    def fillMolecules(self):
        """
        Fill the molecules table with one row per molecule
        """
        molecules_table = self.tables[MoleculesTable.TABLE_NAME]
        for mol in self.struct.molecule:
            molecules_table.addRow(mol)

    def fillSegmentTypes(self):
        """
        Fill the segmentTypes table with one row per unique segment type.
        Types are indexed from 1 in the order they are first found.
        """
        for mol in self.struct.molecule:
            # Only types that have not been seen yet get a new index
            self.segment_types.setdefault(Table.getSegmentType(mol),
                                          len(self.segment_types) + 1)
        types_table = self.tables[SegmentTypesTable.TABLE_NAME]
        for stype, index in self.segment_types.items():
            types_table.addRow(stype, index)

    def fillSegments(self):
        """
        Fill the segments table with one row per molecule

        :raise RuntimeError: If fillSegmentTypes has not been called yet
        """
        if not self.segment_types:
            raise RuntimeError('fillSegmentTypes must be called before fill '
                               'Segments')
        segments_table = self.tables[SegmentsTable.TABLE_NAME]
        for mol in self.struct.molecule:
            segments_table.addRow(mol, self.segment_types)

    def fillSchrodinger(self):
        """
        Fill the schrodinger table with the single row for the structure
        """
        self.tables[SchrodingerTable.TABLE_NAME].addRow(self.struct)

    def fillPairs(self, dist, pair_type=SQL_HEAVY):
        """
        Find all dimers in the given structure based on the normal Schrodinger
        dimer finding algorithm. Add all found dimers to the given VOTCA SQL
        file.

        :type dist: float
        :param dist: The distance threshold for defining dimers

        :type pair_type: str
        :param pair_type: Either SQL_HEAVY (heavy atom distances only) or
            SQL_ALL (all atoms are considered when determining pair distance)

        :rtype: int
        :return: The number of dimers found

        :raise ValueError: If pair_type is not SQL_HEAVY or SQL_ALL
        """
        if pair_type not in (SQL_HEAVY, SQL_ALL):
            raise ValueError('pair_type must be SQL_HEAVY or SQL_ALL')
        dimers = clusterstruct.get_dimers_in_structure(
            self.struct, distance=dist, heavy_only=(pair_type == SQL_HEAVY))
        # Fill the database, pairs are indexed from 1
        pairs_table = self.tables[PairsTable.TABLE_NAME]
        with self.Cursor(self):
            index = 1
            for dimer in dimers:
                pairs_table.addRow(index, dimer)
                index += 1
        # Record how the pairs were found
        set_schrodinger_db_value(self.path, SchrodingerTable.PAIR_TYPE,
                                 pair_type)
        set_schrodinger_db_value(self.path, SchrodingerTable.PAIR_DISTANCE,
                                 dist)
        return len(dimers)
def sql_command(cursor, cmd):
    """
    Execute the given command on the cursor. The cursor is left open and
    nothing is committed to the database.

    :type cursor: sqlite3.Cursor
    :param cursor: The cursor used

    :type cmd: str
    :param cmd: The SQL command to perform

    :rtype: bool
    :return: True if the command executed, False if the command raised a no such
        table error

    :raise sqlite3.OperationalError: in unknown circumstances
    """
    try:
        cursor.execute(cmd)
    except sqlite3.OperationalError as err:
        if not is_no_table_error(err):
            # Not a condition we know how to handle, let the caller see it
            raise
        # The database simply doesn't have the requested table
        return False
    else:
        return True
def table_rows(db_path, table, orderby=None):
    """
    Generator for all the rows in a specific table of the database

    :type db_path: str or pathlib.Path
    :param db_path: The path to the database

    :type table: str
    :param table: The name of the table to get the rows for

    :type orderby: str or None
    :param orderby: If given, the rows are ordered by this SQL ORDER BY term
        (for instance a column name)

    :rtype: sqlite3.Row
    :return: Yields each row in the table. Nothing is yielded if the table
        does not exist.
    """
    cmd = f'SELECT * FROM {table}'
    if orderby:
        cmd += f' ORDER BY {orderby}'
    with SQLCursor(db_path) as cursor:
        if not sql_command(cursor, cmd):
            return
        rows = cursor.fetchall()
    # All rows are already in memory, so yield them only after the connection
    # has been closed. That way a caller that stops iterating early does not
    # leave the database open.
    yield from rows
def delete_all_rows(db_path, table):
    """
    Remove every row from a table. A table that does not exist is silently
    ignored.

    :type db_path: str or pathlib.Path
    :param db_path: The path to the database

    :type table: str
    :param table: The name of the table to delete the rows from
    """
    cmd = f'DELETE FROM {table}'
    with SQLCursor(db_path) as cursor:
        sql_command(cursor, cmd)
def is_no_table_error(exc):
    """
    Check whether an exception reports that the requested table is missing

    :type exc: Exception
    :param exc: The Exception to check

    :rtype: bool
    :return: Whether this exception is for a missing table
    """
    message = str(exc)
    return message.find('no such table') != -1
def add_schrodinger_column(db_path, name):
    """
    Add a column to the schrodinger table. This may be needed if the SQL file
    was created with an older version that didn't include this column

    :type db_path: str or `pathlib.Path`
    :param db_path: The path to the SQL file

    :type name: str
    :param name: The name of the column, must be a key in
        SchrodingerTable.COLUMNS

    :raise KeyError: If name is not a key in SchrodingerTable.COLUMNS
    """
    column = SchrodingerTable.COLUMNS[name]
    table_name = SchrodingerTable.TABLE_NAME
    # ALTER TABLE schrodinger ADD COLUMN bob REAL DEFAULT '0.0'
    cmd = (f"ALTER TABLE {table_name} ADD COLUMN "
           f"'{name}' {column.type} DEFAULT '{column.default}'")
    with SQLCursor(db_path) as cursor:
        cursor.execute(cmd)
def set_schrodinger_db_value(db_path, name, value):
    """
    Set the value of the given column in the Schrodinger table.

    :type db_path: str or `pathlib.Path`
    :param db_path: The path to the SQL file

    :type name: str
    :param name: The name of the column, must be a key in
        SchrodingerTable.COLUMNS

    :param value: The value to put into the database. The type of the parameter
        should be consistent with the expected type for that column.

    :raise sqlite3.OperationalError: If the update fails for any reason other
        than the column not existing yet
    """
    # UPDATE schrodinger SET bob=?
    # A column name cannot be bound as a parameter, but the value can. Binding
    # it keeps values that contain quotes (directory paths, for instance) from
    # breaking the command. The value is bound as text, which stores the same
    # data as writing it into the command as a quoted literal.
    cmd = f"UPDATE {SchrodingerTable.TABLE_NAME} SET {name}=?"
    params = (str(value),)
    with SQLCursor(db_path) as cursor:
        try:
            cursor.execute(cmd, params)
        except sqlite3.OperationalError as msg:
            if 'no such column' not in str(msg):
                # Unknown case, let's see the error
                raise
            # An old version of the database that pre-dates this column. Add
            # the column.
            add_schrodinger_column(db_path, name)
            cursor.execute(cmd, params)
def store_schrodinger_job_props(db_path, mae_name, struct):
    """
    Record the job id, the structure directory and the structure file name in
    the schrodinger table of the database

    When running under job control the job id is also stored as a property on
    the structure, and the directory is the original launch directory of the
    job. Otherwise the directory is the current working directory.

    :type db_path: str or `pathlib.Path`
    :param db_path: The path to the SQL file

    :type mae_name: str
    :param mae_name: The name of the Maestro file that will hold the structure

    :type struct: `schrodinger.structure.Structure`
    :param struct: The structure to add corresponding job info props to
    """
    columns = SchrodingerTable
    backend = jobcontrol.get_backend()
    if not backend:
        # Not running under job control
        set_schrodinger_db_value(db_path, columns.JOBID, SQL_NOJOB)
        structure_dir = os.getcwd()
    else:
        job = backend.getJob()
        set_schrodinger_db_value(db_path, columns.JOBID, job.JobId)
        struct.property[VOTCA_JOB_ID] = job.JobId
        structure_dir = job.OrigLaunchDir
    set_schrodinger_db_value(db_path, columns.STRUCTURE_PATH, structure_dir)
    set_schrodinger_db_value(db_path, columns.STRUCTURE_FILE, mae_name)
def get_schrodinger_db_value(db_path, name):
    """
    Read the value of one column from the Schrodinger table in the database

    :type db_path: str or `pathlib.Path`
    :param db_path: The path to the SQL file

    :type name: str
    :param name: The name of the column to get the data from

    :rtype: variable or None
    :return: The value for the given column in the Schrodinger table, or None if
        no such table exists or no such column exists
    """
    # Much like the Highlander, there should be only one Schrodinger row, so
    # the first row that has the column supplies the value
    for row in table_rows(db_path, SchrodingerTable.TABLE_NAME):
        try:
            found = row[name]
        except IndexError:
            # This row has no information for the requested name
            continue
        # Backwards compatibility for 'none' values in Schrodinger table
        # MATSCI-11011
        return None if found == SQL_DEPRECATED_NONE else found
    return None
def get_db_structure_path(db_path, existence_check=True):
    """
    Get the path to the structure that created this database

    The directory recorded in the database is tried first, followed by the
    directory that holds the database itself.

    :type db_path: str or `pathlib.Path`
    :param db_path: The path to the SQL file

    :type existence_check: bool
    :param existence_check: If True, return None if the path in the database
        does not point to an existing file. If False, return the path regardless
        of whether the file exists.

    :rtype: pathlib.Path or None
    :return: The Path to the structure file, or None if no path is found in the
        database or existence_check=True and the file does not exist
    """
    path = get_schrodinger_db_value(db_path, SchrodingerTable.STRUCTURE_PATH)
    fname = get_schrodinger_db_value(db_path, SchrodingerTable.STRUCTURE_FILE)
    if not fname:
        # The database does not record a structure file name, so there is
        # nothing to look for in any directory
        return None

    def check_path(path, fname):
        """
        Check to see if the expected structure file exists in the path directory

        :type path: str or `pathlib.Path`
        :param path: The path to the directory maybe holding the structure file

        :type fname: str
        :param fname: The name of the desired structure file

        :rtype: pathlib.Path or None
        :return: An existing path or None if not exists
        """
        full_path = pathlib.Path(path) / fname
        if not existence_check or full_path.exists():
            return full_path
        return None

    if path:
        valid_path = check_path(path, fname)
        if valid_path:
            return valid_path
    if db_path:
        # Fall back to the directory that contains the database
        valid_path = check_path(os.path.dirname(db_path), fname)
        if valid_path:
            return valid_path
    return None
def add_pairs_to_database(struct, path, dist, pair_type=SQL_HEAVY):
    """
    Locate the dimers in a structure and store them in a VOTCA SQL file

    The work is delegated to `DatabaseManager.fillPairs`.

    :type struct: `schrodinger.structure.Structure`
    :param struct: The structure with the pairs

    :type path: str
    :param path: The path to the SQL database

    :type dist: float
    :param dist: The distance threshold for defining dimers

    :type pair_type: str
    :param pair_type: Either SQL_HEAVY (heavy atom distances only) or
        SQL_ALL (all atoms are considered when determining pair distance)

    :rtype: int
    :return: The number of dimers found
    """
    return DatabaseManager(struct, path).fillPairs(dist, pair_type=pair_type)
def get_pairs_from_database(db_path):
    """
    Build Dimer objects from the rows of the pairs table

    :type db_path: str or `pathlib.Path`
    :param db_path: The path to the SQL file

    :rtype: list
    :return: Each item of the list is a
        `schrodinger.application.matsci.clusterstruct.Dimer` object. The list
        is empty if the pairs table has not been populated. Note that the
        Dimer objects will not have set the home_atom or neighbor_atom
        properties of the neighbor_info property.
    """
    table = PairsTable
    pairs = []
    for record in table_rows(db_path, table.TABLE_NAME):
        first_seg = record[table.SEG1]
        second_seg = record[table.SEG2]
        xval = record[table.DRX]
        yval = record[table.DRY]
        zval = record[table.DRZ]
        # Only the squared distance can be rebuilt from the stored x, y, z
        # components, so the atom fields are left unset
        neighbor = clusterstruct.Neighbor(
            home_atom=None,
            neighbor_atom=None,
            dsq=xval * xval + yval * yval + zval * zval)
        pairs.append(clusterstruct.Dimer(0, first_seg, second_seg, neighbor))
    return pairs
def has_pair_data(db_path):
    """
    Report whether pair data has been stored in the database

    :type db_path: str or pathlib.Path
    :param db_path: The path to the database

    :rtype: bool or str
    :return: False if the Schrodinger table holds no pair type value.
        Otherwise the stored pair type, which is the distance type used to
        find pairs - either SQL_HEAVY or SQL_ALL
    """
    pair_type = get_schrodinger_db_value(db_path, SchrodingerTable.PAIR_TYPE)
    return False if pair_type is None else pair_type
def get_pair_info(db_path):
    """
    Fetch the settings that were used to find the pairs stored in the database

    :type db_path: str or pathlib.Path
    :param db_path: The path to the database

    :rtype: (str, float) or None
    :return: The type of distance used to find pairs (SQL_HEAVY or SQL_ALL)
        and the distance cutoff for pairs. None is returned if no pair data
        exists.
    """
    pair_type = has_pair_data(db_path)
    if pair_type:
        cutoff = get_schrodinger_db_value(db_path,
                                          SchrodingerTable.PAIR_DISTANCE)
        return pair_type, cutoff
    return None
def find_missing_coupling_data(path, charge):
    """
    Find any pair couplings that are 0

    :param str path: The path to the database file

    :param str charge: Either `HOLE` or `ELECTRON`

    :rtype: list
    :return: Each item is a tuple with the molecule numbers of the two
        molecules involved in the missing coupling term.
    """
    # The coupling column name is the base name plus the charge-specific suffix
    coupling_column = JEFF + CHARGE_ENDINGS[charge]
    records = table_rows(path,
                         PairsTable.TABLE_NAME,
                         orderby=PairsTable.SQID)
    return [(record[PairsTable.SEG1], record[PairsTable.SEG2])
            for record in records
            if record[coupling_column] == 0.0]
def find_missing_site_energies(path, charge):
    """
    Find every segment with at least one site energy property equal to 0

    :param str path: The path to the database file

    :param str charge: Either `HOLE` or `ELECTRON`

    :rtype: list
    :return: Each item is the integer SQID (which translates to molecule
        number) of any segment with missing site energy information
    """
    zero_energy_ids = []
    energy_columns = SegmentsTable.SITE_ENERGY_PROPS[charge]
    segments = table_rows(path,
                          SegmentsTable.TABLE_NAME,
                          orderby=SegmentsTable.SQID)
    for segment in segments:
        for column in energy_columns:
            if segment[column] == 0.0:
                # One zero-valued property is enough to flag the segment
                zero_energy_ids.append(segment[SegmentsTable.SQID])
                break
    return zero_energy_ids
def copy_sql_data(source, destination, table, columns):
    """
    Copy the given columns of a table from the source database to the
    destination database

    Rows are matched between the two databases by their SQID value.

    :type source: str or pathlib.path
    :param source: the path to the source database

    :type destination: str or pathlib.path
    :param destination: the path to the destination database

    :type table: str
    :param table: The name of the table to copy from

    :type columns: list
    :param columns: A list of column names to copy. Nothing is written if the
        list is empty.

    :raise IndexError: If the two databases do not have the same number of rows
    """
    # Read the source rows a single time and reuse them for both the row count
    # check and the copy itself
    source_rows = list(table_rows(source, table))
    source_num = len(source_rows)
    dest_num = sum(1 for _ in table_rows(destination, table))
    if source_num != dest_num:
        raise IndexError('Cannot copy data because the source database has '
                         f'{source_num} rows but the destination database has '
                         f'{dest_num} rows.')

    columns = list(columns)
    if not columns:
        # An UPDATE statement with an empty SET clause is not valid SQL
        return

    # Table and column names cannot be bound as parameters so they are still
    # formatted into the statement, but the values are passed as bound
    # parameters. That keeps text, null and quote-containing values intact
    # instead of pasting their string form into the SQL.
    # NOTE(review): assumes SQLCursor yields a DB-API (sqlite3) cursor whose
    # execute method accepts a parameter sequence - confirm
    assignments = ', '.join(f'{name} = ?' for name in columns)
    cmd = f'UPDATE {table} SET {assignments} WHERE {Table.SQID} = ?'
    with SQLCursor(destination) as cursor:
        for row in source_rows:
            values = [row[name] for name in columns]
            values.append(row[Table.SQID])
            cursor.execute(cmd, values)
class SQLCreationError(Exception):
    """
    The exception used to signal that the database could not be created
    """
def generate_votca_database(struct, backend=None):
    """
    Create and initialize a VOTCA SQL database for the given structure

    The file is named after the job when a backend is supplied and after the
    cleaned structure title otherwise. When a backend is supplied the file is
    also registered as a job output file.

    :type struct: `schrodinger.structure.Structure`
    :param struct: The structure to create a database for

    :type backend: `schrodinger.job.jobcontrol._Backend`
    :param backend: The backend if one exists

    :rtype: str
    :return: The name of the sql file that was created

    :raise SQLCreationError: If the sql file can't be created
    """
    stem = (backend.getJob().Name if backend else jobutils.clean_string(
        struct.title, default='votca_input'))
    sql_file = stem + '.sql'
    DatabaseManager(struct, sql_file).initializeDatabase()
    if backend:
        backend.addOutputFile(sql_file)
    return sql_file