#!/usr/bin/env desres-exec
#{
# exec desres-cleanenv -eUSER -eVIPARR_FFDIR -eVIPARR_PDIR \
# -m Python/2.5.1-09/bin \
# -m destro/0.4.7.1-D/lib-python \
# -m simplejson/1.7.4-1+Release \
# -- python $0 "$@"
#}
# viparr - force field parameter assignment program
# (Violet "Vi" Parr is the girl in The Incredibles who generates the force fields.)
import glob
import optparse
import os
import platform
import sys
from schrodinger.application.desmond.packages.destro import Maeff
from .. import get_ffdir
from .. import get_pdir
from . import viparr_merge
from . import viparr_residue
from .viparr_plugins_util import *  # noqa: F403
from .viparr_util import *  # noqa: F403
#import templates_check
if platform.system() == "Linux":
# Currently this causes a crash on Windows.
# The default precision appears to be 8 anyway...
from schrodinger.application.desmond.packages.destro import \
setglobalprecision
setglobalprecision(8)


class Struct(object):
    pass


class Atom(object):
    pass


def templates_convert(templates):
'''Convert templates structure to internal data structure.
'''
class Atom():
pass
for (name, res) in templates.items():
atoms = dict()
# atom structure is:
# [aname, atomic number, charge, [atypes]]
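        # e.g. a (hypothetical) entry ["CA", 6, -0.27, ["CT1"]] becomes an
        # Atom with an=6, charge=-0.27, atypes=["CT1"], keyed by name "CA"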
for a in res['atoms']:
atom = Atom()
atom.an = a[1]
atom.charge = a[2]
atom.atypes = a[3]
atoms[a[0]] = atom
templates[name]['atoms'] = atoms


def compress_residue_list(residue_list):
'''Compress FF representation if possible
'''
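    # If every residue uses the same template and the first residue's atom
    # indices are contiguous (e.g. a ct that is just a box of identical water
    # molecules), one residue entry is enough to describe the whole list.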
    if len(residue_list) == 0:
        return
    if residue_list[0] is None:
        return
    first = residue_list[0].name
    for res in residue_list:
        if res is None:
            break
        if res.name != first:
            break
else:
# natural termination of loop means all residues are the same
# now check the first residue to make sure all atom indices are contiguous
# if so, then this residue list should only contain one residue
if max(residue_list[0].nlist) + 1 == len(residue_list[0].nlist):
del residue_list[1:]
print('Compressing FF representation...')


def get_atom_list(atoms, neighbors, templates, residue_list):
'''Return a list of atoms in the ct structure. Each atom has the
following fields: pdbname, pdbres, resnum, atomic number, charge, atomtypes.
Also check that all residue and atom names are in the templates
data structure.
'''
class Atom(object):
pass
num_atoms = len(atoms)
# above is correct unless we have compressed the residue list
    if len(residue_list) == 1:
        if residue_list[0] is not None:
            num_atoms = len(residue_list[0].nlist)
atom_list = [None] * num_atoms
for ires, res in enumerate(residue_list):
        if res is not None:
            # create marker to tell whether atoms are polarizable
            # Yuck... This needs to go... (it's used in bondfields)
            # and is specific to Drude polarization
polist = set([
p[4]
for p in templates[res.name].get('pseudos', [])
if p[3].startswith("drude_")
])
for i, n in enumerate(res.nlist): # atom numbers are 0-based
a = Atom()
a.type = 'atom'
a.pdbname = res.alist[i]
a.pdbres = res.name
a.tmplname = res.name
a.resnum = ires + 1 # 1-based
a.neighbors = neighbors[n]
a.index = n
obj = templates[res.name]['atoms'][a.pdbname]
a.an = obj.an
a.charge = obj.charge
a.atypes = obj.atypes
                a.polnotpol = a.pdbname in polist
atom_list[n] = a
return atom_list


def get_neighbor_list(atom_list):
'''Return list of neighbor lists; one neighbor list for each atom, in order.
Each neighborlist is guaranteed to be sorted. Entries are 1-based.
'''
neighbor_list = list()
for a in atom_list:
        if a is None:
neighbor_list.append([])
else:
neighbor_list.append(a.neighbors)
return neighbor_list


def get_bond_list(neighbor_list):
'''Return list of all bonds (i,j) such that i<j.
List will be sorted meaningfully. Entries are 1-based.
'''
bond_list = list()
bond_set = set()
num_atoms = len(neighbor_list)
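    # An empty neighbor list means the atom belongs to a residue that this
    # force field did not match (see get_atom_list/get_neighbor_list), so a
    # bond reaching it would span two force fields, which is unsupported.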
for i in range(num_atoms):
for j in neighbor_list[i]:
if neighbor_list[j - 1] == []:
raise AssertionError(
'A bond (%d,%d) straddles your force fields' % (i + 1, j))
if i + 1 < j:
bond = (i + 1, j)
if bond not in bond_set:
bond_list.append(bond)
bond_set.add(bond)
return bond_list


def get_angle_list(neighbor_list):
'''Return list of all angles (i,j,k) such that i<k.
List will be sorted meaningfully. Entries are 1-based.
'''
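    # Each angle is built around a central atom: every unordered pair of its
    # neighbors contributes one angle, with the central atom in the middle.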
angle_list = list()
num_atoms = len(neighbor_list)
for i in range(num_atoms):
        b = neighbor_list[i]  # assumes each neighbor list is sorted ascending
        nb = len(b)
        index = i + 1
        if nb > 6:
            raise AssertionError('Atom %d has more than six bonds: %s' %
                                 (index, b))
for at0 in range(nb):
for at2 in range(at0 + 1, nb):
angle_list.append((b[at0], index, b[at2]))
angle_list.sort()
return angle_list


def get_proper_list(neighbor_list, bond_list):
'''Return list of all propers (i,j,k,l) such that i<l.
List will be sorted meaningfully. Entries are 1-based.
'''
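    # Each bond (j,k) is the central bond of a dihedral; the outer atoms are
    # taken from the remaining neighbors of j and of k, skipping the case
    # where both ends coincide (a three-membered ring).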
proper_list = list()
for b in bond_list: # contains only one of (i,j) and (j,i)
# loop over possible (1,4)'s
for n0 in neighbor_list[b[0] - 1]:
if n0 == b[1]:
continue
for n1 in neighbor_list[b[1] - 1]:
if n1 == b[0]:
continue
if n1 == n0: # triangle
continue
if n0 < n1:
proper_list.append((n0, b[0], b[1], n1))
else:
proper_list.append((n1, b[1], b[0], n0))
proper_list.sort()
return proper_list


def get_bonded_term_list(fieldname, templates, residue_list, sorted):
'''Return a list of bonded items. Each bonded item is a list, namely:
[i,j] when fieldname = 'bonds' or 'exclusions'. if sorted, i<j
[i,j,k] when fieldname = 'angles'. if sorted, i<k
[i,j,k,l] when fieldname = 'propers' or 'impropers'. if sorted, i<l
The returned list will be sorted in a meaningful way.
'''
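    # Each template entry for these fields is a sequence of atom names, e.g. a
    # (hypothetical) improper ["C", "CA", "N", "O"]; the names are mapped to
    # 0-based indices via res.atoms and emitted 1-based.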
term_list = list()
    for res in residue_list:
        if res is None:
            continue  # ignore residue if templates does not know about it
        name = res.name
        terms = templates[name].get(fieldname, None)
        if terms is None:
            continue
continue
for dih in terms:
nk = len(dih)
d = list()
for atom in dih:
                a = res.atoms.get(atom)
                if a is None:
raise AssertionError(
'In residue <"%s",%d> (%s), did not find atom %s' %
(res.chain, res.resnum_orig, res.name_orig, atom))
d.append(a + 1)
if len(d) == nk:
if d[0] > d[-1] and sorted:
d.reverse() # use sorted order i<j, i<k, i<l
term_list.append(tuple(d))
else:
print('Warning: some bonded terms were not found', nk)
term_list.sort()
return term_list


def merge(list0, list1):
'''Merge list1 into list0, as long as there are no entries in common.
'''
    if list1:
for p in list1:
if p in list0:
print(p, list1, list0)
raise AssertionError('Cannot merge lists; already in list')
else:
list0.append(p)
list0.sort()


def get_pseudo_terms(iffld, pseudo_residue_list, nres, nstart):
'''Return drude and virtual particle list (numbering starting at nstart) and list of 'bonds'
between drudes, virtual sites and particles. The pseudo_list can be appended to the atom list.
'''
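    # Pseudo particles are numbered after the real atoms, so their 1-based
    # numbering starts at nstart; a.index is kept 0-based to match the atom
    # list convention.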
if nres != 1 and nres != len(pseudo_residue_list):
raise AssertionError('Mismatched residue and pseudo residue lists')
pseudo_term_list = list()
pseudo_bond_list = list()
index = nstart - 1
for pres in pseudo_residue_list[:nres]:
for a in pres:
index += 1
if (iffld != a.ffldidx):
pseudo_term_list.append(None)
continue
if hasattr(a, 'index') and a.index != index - 1:
raise AssertionError('Bad Pseudo Indexes')
a.index = index - 1
pseudo_term_list.append(a)
# pseudo 'bonds'
pseudo_bond_list.append((a.partner_index, index))
if hasattr(a, 'partner_index2'):
pseudo_bond_list.append((a.partner_index2, index))
return pseudo_term_list, pseudo_bond_list


def update_pseudo_residue_list(iffld, pseudo_residue_list, atoms, templates,
                               residue_list):
'''pseudo_list is a list of all residues in ct block, and each residue is
a list of pseudo particles.
'''
    for ires, res in enumerate(residue_list):
        if res is None:
            continue  # ignore residues this force field did not match
# ignore if pseudo_list already has an entry for this residue
if len(pseudo_residue_list[ires]) != 0:
print('Warning: pseudo_list already has entry for residue')
continue
li = templates[res.name].get('pseudos', [])
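        # Apparent layout of a template 'pseudos' entry, inferred from the
        # fields read below: [name, charge, atypes, field, primary_site, ...],
        # where field is a virtual_* or drude_* keyword and the trailing
        # entries are the site-construction terms.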
for line in li:
            nsite = line[4]  # "primary site" of the pseudo
site = res.atoms[nsite] # clone first atom in site definition
a = Atom()
a.ffldidx = iffld
a.type = 'pseudo'
a.pdbname = line[0]
a.chain = atoms[site].chain
a.resnum = atoms[site].resnum
a.pdbres = atoms[site].pdbres
a.tmplname = res.name
a.x = atoms[site].x
a.y = atoms[site].y
a.z = atoms[site].z
a.charge = line[1]
a.atypes = line[2]
a.field = line[3]
a.terms = line[4:]
a.partner_atom = nsite
a.partner_index = res.atoms[nsite] + 1
if a.field in ['virtual_midpoint']:
a.partner_index2 = res.atoms[line[5]] + 1
if hasattr(atoms[site], 'grp_thermostat'):
a.grp_thermostat = atoms[site].grp_thermostat
if hasattr(atoms[site], 'grp_energy'):
a.grp_energy = atoms[site].grp_energy
if (not (a.field.startswith("virtual_") or
a.field.startswith("drude_"))):
raise AssertionError('Unrecognized pseudo type: %s' %
(str(a.field)))
pseudo_residue_list[ires].append(a)


def construct_lists(iffld, atoms, neighbors, rules, templates, residue_list,
                    pseudo_residue_list):
'''Return lists that describe the structure.
'''
compress_residue_list(residue_list)
atom_list = get_atom_list(atoms, neighbors, templates, residue_list)
neighbor_list = get_neighbor_list(atom_list)
bond_list = get_bond_list(neighbor_list)
angle_list = get_angle_list(neighbor_list)
proper_list = get_proper_list(neighbor_list, bond_list)
    # Keep input order for impropers (the order may be significant depending on how the coordinate is calculated)
improper_list = get_bonded_term_list('impropers', templates, residue_list,
False)
extra_angle_list = get_bonded_term_list('angles', templates, residue_list,
True)
extra_proper_list = get_bonded_term_list('propers', templates, residue_list,
True)
extra_excl_list = get_bonded_term_list('exclusions', templates,
residue_list, True)
# drude and virtual lists
pseudo_list, pseudo_bond_list = get_pseudo_terms(iffld, pseudo_residue_list,
len(residue_list),
len(atom_list) + 1)
atom_list.extend(pseudo_list)
merge(angle_list, extra_angle_list)
merge(proper_list, extra_proper_list)
# Setup for simultaneous generation of pairs and exclusions.
# default arguments in rules.get() are for backwards compatibility
excl_rule = rules.get('exclusions', 4)
if excl_rule < 1 or excl_rule > 4:
raise AssertionError(
"Invalid exclusion rule: %d. 'exclusions' must be between 1 and 4 inclusive"
% (excl_rule))
es_scale_list = rules.get('es_scale', [0.0] * (excl_rule - 1))
lj_scale_list = rules.get('lj_scale', [0.0] * (excl_rule - 1))
if (len(es_scale_list) != excl_rule - 1 or
len(lj_scale_list) != excl_rule - 1):
raise AssertionError(
"Invalid 'es_scale' (%d) or 'lj_scale' (%d) list lengths... Should be %d"
% (len(es_scale_list), len(lj_scale_list), excl_rule - 1))
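    # For example, a (hypothetical) AMBER-style force field with excl_rule = 4
    # might use es_scale = [0.0, 0.0, 1.0 / 1.2] and lj_scale = [0.0, 0.0, 0.5]:
    # 1-2 and 1-3 interactions fully excluded, 1-4 interactions scaled down.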
    # Keep track of (1,x) pairs separately while generating exclusions.
    # Pair terms are only generated up to and including the requested exclusion depth:
    # 1-2 pairs: particles separated by one bond and not in the extra exclusion list
    # 1-3 pairs: particles separated by two bonds, not 1-2 pairs, and not in the extra exclusion list
    # 1-4 pairs: particles separated by three bonds, not 1-2 or 1-3 pairs, and not in the extra exclusion list
pair_list = []
excl_list = set(extra_excl_list)
if excl_rule >= 2:
tmp = set([(a[0], a[1]) for a in bond_list])
tmp.difference_update(excl_list)
pair_list.append(tmp)
excl_list.update(tmp)
if excl_rule >= 3:
tmp = set([(a[0], a[2]) for a in angle_list])
tmp.difference_update(excl_list)
pair_list.append(tmp)
excl_list.update(tmp)
if excl_rule >= 4:
tmp = set([(a[0], a[3]) for a in proper_list])
tmp.difference_update(excl_list)
pair_list.append(tmp)
excl_list.update(tmp)
excl_list = list(excl_list)
excl_list.sort()
for i, p in enumerate(pair_list):
p = list(p)
p.sort()
pair_list[i] = p
lists = Struct()
lists.atom_list = atom_list
lists.neighbor_list = neighbor_list
lists.residue_list = residue_list
lists.bond_list = bond_list
lists.angle_list = angle_list
lists.proper_list = proper_list
lists.improper_list = improper_list
lists.excl_list = excl_list
lists.pair_list = pair_list
lists.es_scale_list = es_scale_list
lists.lj_scale_list = lj_scale_list
lists.pseudo_list = pseudo_list
lists.pseudo_bond_list = pseudo_bond_list
# UNDONE: excl and pairs and cmap are lists of tuples, unlike the other lists
return lists


def merge_blocks(first, second):
'''Merge second block into first; print warning message if element in
second block is already in the first block.
Assumes blocks themselves hold unique entries.
First block is the merged one.
'''
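    # Each block entry appears to be of the form [key_data, funct, params...];
    # two entries are treated as duplicates when both the key_data sequence
    # (typically atom indices or types) and the funct value match.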
firstset = set([tuple([tuple(f[0]), f[1]]) for f in first])
for e in second:
key = tuple([tuple(e[0]), e[1]])
if key in firstset:
print('Ignoring subsequent match:', e)
else:
first.append(e)


def merge_blocks2(first, second):
'''Merge first block into second; print warning message if element in
first block will override element in second block.
Assumes blocks themselves hold unique entries.
Second block is the merged one. [NOT THE SAME BEHAVIOR AS ABOVE.]
'''
seconddict = dict()
for i, e in enumerate(second):
t = tuple([tuple(e[0]), e[1]])
seconddict[t] = i
for f in first:
key = tuple([tuple(f[0]), f[1]])
if key in seconddict:
ind = seconddict[key]
print('Ignoring subsequent match:', second[ind])
second[ind] = f
else:
second.append(f)


def add_water_constraints(ff_handle):
"""
Add an ffio_constraints block to the ff_handle for a water ct by reading
the ffio_c1 values from the angles and bonds, and append
'_constrained' to the functional forms of the angles and bonds.
:param ff_handle: the handle of the ffio_block
:type ff_handle: destro.Destro
"""
# this will be a list of ffio_c1 values for [angle, bond1, bond2]
ffio_c1s = []
for block in [ff_handle.ffio_angles, ff_handle.ffio_bonds]:
for row in block:
funct = row.ffio_funct
if not funct.endswith('_constrained'):
row.ffio_funct = funct + '_constrained'
ffio_c1s.append(row.ffio_c1)
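    # One constraint row covers the whole water: atoms 1-3, the 'HOH'
    # functional form, and the collected equilibrium values (angle first, then
    # the two bonds, matching the iteration order above).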
block = [[[1, 2, 3], 'HOH', ffio_c1s]]
del ff_handle.ffio_constraints
ffio_add_block(ff_handle, 'ffio_constraints', block)
################################################################################


def main(vargs=[]):  # noqa: M511
usage = '''
%prog [options] mae_file maeff_outfile
Description:
viparr is a force field parameter assignment program.
-f and -d are used to specify force fields; the order of force fields
is important: earlier ones take precedence over later ones.
Simple example:
%prog -f charmm27 -f spc mae_file maeff_outfile
More complicated example:
%prog -d me/myfiles/myff -f charmm27 -f spc mae_file maeff_outfile
'''
    def parser_add_directory(option, opt_str, value, parser, ffdir):
        if ffdir is None:
            raise AssertionError('VIPARR_FFDIR environment variable undefined; '
                                 'no built-in force fields can be specified')
        parser.values.forcefield.append(os.path.join(ffdir, value))
def parser_merge(option, opt_str, value, parser):
if len(parser.values.forcefield) == 0:
raise AssertionError('Specify -f or -d before -m')
print('Merging <%s> and <%s>' % (parser.values.forcefield[-1], value))
destdir = viparr_merge.merge(parser.values.forcefield[-1], value)
print('Using temporary directory <%s>' % destdir)
parser.values.forcefield[-1] = destdir
ffdir = get_ffdir()
pdir = get_pdir()
if not ffdir:
usage += 'VIPARR_FF_PATH is empty: No built-in force fields'
else:
usage += 'Available built-in force fields:\n '
temp = os.listdir(ffdir)
temp.sort()
usage += '\n '.join(temp)
opt = optparse.OptionParser(usage=usage, version=viparr_version)
opt.set_defaults(ctnum=None)
opt.add_option('-c',
type='int',
dest='ctnum',
action="append",
help='''run viparr for selected CT blocks only, e.g., for the
first and third CT blocks, use "-c 1 -c 3"''')
opt.add_option('-f',
type='string',
dest='ffname',
action='callback',
callback=parser_add_directory,
callback_args=(ffdir,),
help='''built-in force field name; several can be listed,
each preceded by its own -f''')
opt.add_option('-d',
action='append',
dest='forcefield',
default=[],
help='''user-provided force field directory; several can
be listed, each preceded by its own -d''')
opt.add_option('-m',
type='string',
dest='merge_dir',
action='callback',
callback=parser_merge,
help='''path to user-defined force field directory that
is to be merged with previously specified force field;
several can be listed, each preceded by its own -m''')
opt.add_option('-n',
dest='ffio_name',
default='Generated by viparr',
help='force field name to put into output file')
opt.add_option('-p',
dest='plugindir',
default=pdir,
help='override for plugins directory (advanced usage)')
opt.add_option('-x',
action="store_true",
dest='no_header',
default=False,
help='do not print header info (for testing purposes)')
opt.add_option('-v',
action="store_true",
dest='verbose',
default=False,
help='verbose output')
opt.add_option(
'-w',
action="store_true",
dest='is_water',
default=False,
        help='use this option when running viparr on a water ct; this '
        'will recreate any constraints block that is removed')
    if not vargs:
opts, args = opt.parse_args()
else:
opts, args = opt.parse_args(vargs)
print('viparr', viparr_version)
# check number of arguments first
if len(args) != 2:
opt.error('incorrect number of arguments: %d %s' %
(len(args), str(args)))
mae_file = args[0]
maeff_outfile = args[1]
# check that there is at least one forcefield
if len(opts.forcefield) < 1:
raise AssertionError('No force field specified')
# check that each directory exists
print('All FF directories, in order:\n', '\n'.join(opts.forcefield))
for dir in opts.forcefield:
if not os.path.isdir(dir):
raise AssertionError('Force field dir <%s> is not a directory' %
dir)
print('Plug-in directory:', opts.plugindir)
    if opts.plugindir is None:
        raise AssertionError('No plug-in directory specified with -p or with '
                             'VIPARR_PDIR environment variable')
else:
if not os.path.isdir(opts.plugindir):
raise AssertionError('Plug-in dir <%s> is not a directory' %
opts.plugindir)
# open maestro file
print('Reading maestro file <%s> ...' % mae_file)
if mae_file.endswith('.mae.gz') or mae_file.endswith('.maegz'):
        import gzip
        with gzip.open(mae_file, 'rb') as f:
            mae_buffer = f.read()
        M = Maeff(mae_buffer)
else:
with open(mae_file) as f:
M = Maeff(f)
print('Done reading maestro file')
num_structures = len(M)
# create structure list, which is 0-based list of ct numbers to process
    if opts.ctnum is None:
structure_list = list(range(num_structures))
else:
structure_list = []
for e in opts.ctnum:
if e > num_structures or e < 1:
raise AssertionError('Bad value <%d> for selected CTNUM' % e)
structure_list.append(e - 1)
    # skip structure if it is a "full_system"
    # (iterate over a copy, since entries may be removed from structure_list)
    for s in list(structure_list):
        if 'ffio_ct_type' in M[s + 1]:
            if M[s + 1].ffio_ct_type == 'full_system':
                print('Skipping full_system structure <%d>' % (s + 1))
                structure_list.remove(s)
if len(structure_list) == 0:
print('No CT blocks to process...exiting')
sys.exit(1)
# process the atoms and bonds in each ct
# this should be the only place that use CtBonds and CtAtoms
# ct_atoms and ct_bonds may have entries that are None
ct_atoms = [None] * num_structures
ct_bonds = [None] * num_structures
for s in structure_list:
print('Analyzing ct block %d' % (s + 1))
ct = M[s + 1]
ct_bonds[s] = CtBonds(ct)
ct_atoms[s] = CtAtoms(ct)
# residue list for each structure for each force field
all_residue_lists = [[[] for i in range(num_structures)] \
for j in range(len(opts.forcefield))]
######################################################################
# Loop for constructing the residue lists
# We do this as a pre-processing step so that we can check that all residues
# were matched by some template
for idir, dir in enumerate(opts.forcefield):
templates = {}
tnames = [t for t in glob.glob(os.path.join(dir, 'templates*'))]
for tmpl in tnames:
viparr_merge.update_templates_strict(templates, tmpl)
#templates_check.templates_check(templates)
viparr_residue.templates_reorder_atoms(templates)
# loop through all ct blocks
for s in structure_list:
print('******Constructing residue list for <%s> Structure %d' %
(dir, s + 1))
residue_list = viparr_residue.get_residue_list(
ct_atoms[s], ct_bonds[s], templates)
# save this residue list
all_residue_lists[idir][s] = residue_list
######################################################################
# Check that all residues were matched by some force field
# and print diagnostic messages.
# Also do the essential step of overwriting unmatched residues with 'None'
derror = dict()
dwarn = dict()
dmatch = dict()
num_messages = 5 # number of messages per residue name
num_errors = 0
for s in structure_list:
for i in range(len(all_residue_lists[0][s])):
matches = []
for idir, dir in enumerate(opts.forcefield):
res = all_residue_lists[idir][s][i]
if res.match:
matches.append(dir)
if res.name_orig != res.name:
pair = (res.name_orig, res.name)
dmatch[pair] = dmatch.setdefault(pair, 0) + 1
if dmatch[pair] <= num_messages:
                            print('Note: Struct %d, Res <"%s",%d> (%s) matched (%s)' %
                                  (s + 1, res.chain, res.resnum_orig,
                                   res.name_orig, res.name))
if dmatch[pair] == num_messages:
print(
'Ignoring further messages for (%s) matching (%s)' %
pair)
if len(matches) != 1:
res = all_residue_lists[idir][s][i]
which = (s + 1, res.chain, res.resnum_orig, res.name_orig)
if len(matches) == 0:
num_errors = num_errors + 1
derror[res.name_orig] = derror.setdefault(res.name_orig,
0) + 1
if derror[res.name_orig] <= num_messages:
formula = viparr_residue.residue_formula(
[ct_atoms[s][ii].atomicnum for ii in res.nlist])
print(
'Error: Struct %d, Res <"%s",%d> (%s) not matched by any force field template'
% which, 'with formula', formula)
if derror[res.name_orig] == num_messages:
print('Ignoring further errors for (%s)' %
res.name_orig)
elif len(matches) > 1:
dwarn[res.name_orig] = dwarn.setdefault(res.name_orig,
0) + 1
if dwarn[res.name_orig] <= num_messages:
print(
'Warning: Struct %d, Res <"%s",%d> (%s) matched by multiple force fields:'
% which, matches)
if dwarn[res.name_orig] == num_messages:
print('Ignoring further warnings for (%s)' %
res.name_orig)
for idir, dir in enumerate(opts.forcefield):
            if not all_residue_lists[idir][s][i].match:
all_residue_lists[idir][s][i] = None # overwrite
if num_errors > 0:
sys.exit(1)
###################################################################################
# Get the combining rules and function type and check that they are consistent
    # also check the force field versions for incompatibility
comb_rule = set()
vdw_func = set()
for dir in opts.forcefield:
rules = parse_json(os.path.join(dir, 'rules'))
minver = rules.get('min_viparr_version', viparr_version)
if (list(map(int, minver.split('.'))) > list(
map(int, viparr_version.split('.')))):
raise AssertionError(
'Forcefield %s needs viparr version >= %s. Current version = %s'
% (dir, minver, viparr_version))
x = rules.get('vdw_comb_rule', '').upper()
if x == '':
x = 'WATER'
comb_rule.add(x)
x = rules.get('vdw_func', 'LJ12_6_sig_epsilon')
vdw_func.add(x)
if len(vdw_func) != 1:
print(vdw_func)
raise AssertionError('Inconsistent vdW func rules among force fields')
vdw_func = vdw_func.pop()
if comb_rule == set(['WATER', 'ARITHMETIC/GEOMETRIC']) or \
comb_rule == set(['WATER', 'GEOMETRIC']):
comb_rule.remove('WATER')
if comb_rule == set(['WATER']):
comb_rule = set(['ARITHMETIC/GEOMETRIC'])
if len(comb_rule) != 1:
print(comb_rule)
raise AssertionError('Inconsistent vdW mixing rules among force fields')
comb_rule = comb_rule.pop()
###################################################################################
# Get the exclusion rules and check that they are consistent
# This is NOT necessary! We already fail if a bond straddles 2 forcefields.
# This prevents us from needing to worry about mixing 2 different exclusion rules
# as exclusions/pair are based on topology...
#
#exclusion_rule = []
#for dir in opts.forcefield:
# rules = parse_json(os.path.join(dir, 'rules'))
# x = rules.get('exclusions', 4)
# if( x not in exclusion_rule):
# exclusion_rule.append(x)
#if len(exclusion_rule) != 1:
# print exclusion_rule
# raise AssertionError, 'Inconsistent exclusion rules among force fields'
#exclusion_rule = exclusion_rule[0]
######################################################################
# Main loop algorithm:
# Loop over the structures in the ct
# Loop over the force fields (some duplicate work here if multiple structures)
# Loop over the plug-ins
# Apply plug-in
for s in structure_list:
print('******Main loop: Structure', s + 1)
strules = []
sttemplates = []
# length is num_residues in structure
pseudo_residue_list = [[] for i in range(len(all_residue_lists[0][s]))]
# We need all the pseudos in the structure now so we can get the site indexing correct
for idir, dir in enumerate(opts.forcefield):
print('Pre-processing force field <%s> for rules and pseudos' % dir)
strules.append(parse_json(os.path.join(dir, 'rules')))
templates = {}
tnames = [t for t in glob.glob(os.path.join(dir, 'templates*'))]
for tmpl in tnames:
viparr_merge.update_templates_strict(templates, tmpl)
templates_convert(templates)
# update pseudo_list
update_pseudo_residue_list(
idir, pseudo_residue_list, ct_atoms[s], templates,
all_residue_lists[idir][s]) # before compression
sttemplates.append(templates)
tables = Struct() # the tables describing the chemical structure
        blocks = dict()  # dictionary of blocks that will be written to the mae file
        info = list()  # list of (source, text) reference entries for the force fields
info.append(('Plugins: <%s>' % opts.plugindir, ''))
for idir, dir in enumerate(opts.forcefield):
print('Processing force field <%s>' % dir)
tables.rules = strules[idir]
tables.templates = sttemplates[idir]
# construct lists that describe the chemical system for this force field
lists = construct_lists(idir, ct_atoms[s], ct_bonds[s],
tables.rules, tables.templates,
all_residue_lists[idir][s],
pseudo_residue_list)
# so rules can be passed to plugins...
# Viparr should take the position that rules are for internal use only.
            # This doesn't stop viparr from passing certain of these values to plugins, however (see construct_lists).
#
# lists.rules = tables.rules
lists.comb_rule = comb_rule
lists.vdw_func = vdw_func
# so that ff directory is passed to plugins...
lists.dir = dir
# pass templates
lists.templates = tables.templates
# pass processed blocks
lists.blocks = {}
#info.append('FF dir: <%s>' % lists.dir) # force field directory
#info.extend(lists.rules.get('info', ['No info']))
ffd = 'FF: <%s>' % lists.dir
data = tables.rules.get('info', ['No info'])
for d in data:
info.append((ffd, d))
# loop over plug-ins
pluglist = [p[0] for p in tables.rules['plugins']]
if ('atoms' in pluglist and pluglist[-1] != 'atoms'):
raise UserWarning(
"atoms plugin should be last plugin applied. (Check rules file)"
)
for plugin_setup in tables.rules['plugins']:
plugin = plugin_setup[0]
plugin_cfg = plugin_setup[1]
if opts.verbose:
print('Applying plugin <%s>' % plugin)
context = {}
ns = dict(context=context)
# currently only read plugins from plugindir (or current dir if no plugindir)
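                # Each plugin file is exec'd with a 'context' dict in its
                # namespace and is expected to register a callable under
                # context['callback']; that callable receives (lists,
                # plugin_cfg) and returns a dict of ffio blocks to merge.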
fname = os.path.join(opts.plugindir, plugin + '.py')
with open(fname, 'rb') as f:
exec(compile(f.read(), fname, 'exec'),
ns) # define and 'declare' the plugin
temp = context['callback'](lists, plugin_cfg)
# for each component of temp, append to the corresponding local ff block
for name in list(temp):
                    if lists.blocks.get(name) is not None:
merge_blocks2(lists.blocks[name], temp[name])
lists.blocks[name] = temp[name]
# for each component of lists.blocks, append to the corresponding global ff block
for name in list(lists.blocks):
                if blocks.get(name) is not None:
merge_blocks2(blocks[name], lists.blocks[name])
blocks[name] = lists.blocks[name]
print('Building mae data structure...')
# put the terms in the maestro file
        # delete any existing ffio block - could be very important!
        del M[s + 1].ffio_ff
ff_handle = M[s + 1].__new_block__('ffio_ff')
# add header
if not opts.no_header:
ffio_add_header(ff_handle, opts.ffio_name, info)
# add vdw func type
ff_handle.ffio_vdw_func = vdw_func
# add combining rule
ff_handle.ffio_comb_rule = comb_rule
# add pseudo block
ffio_add_pseudo_block(ff_handle, pseudo_residue_list)
# add each block automatically in sorted order
keys = list(blocks)
keys.sort()
for k in keys:
# skip cmap if no torsion_torsion
if k[:4] == 'cmap' and len(blocks.get('torsion_torsion', [])) == 0:
continue
if k == 'atoms':
ffio_add_atoms_block(ff_handle, blocks.get(k))
else:
try:
ffio_add_block(ff_handle, 'ffio_' + k, blocks.get(k))
except: # noqa: E722
print("Failed while adding 'ffio_%s':" % (k))
for f in blocks.get(k):
print(f)
if opts.is_water:
add_water_constraints(ff_handle)
# write the file
print('Writing maestro file <%s>' % maeff_outfile)
if maeff_outfile.endswith('.mae.gz') or maeff_outfile.endswith('.maegz'):
        # Unfortunately, the print and str implementations differ: if we just
        # used the statement below, linefeeds would be missing from the actual
        # output.
        #   import gzip
        #   gzip.open(maeff_outfile, 'w').write(str(M))
        # The solution here is to write to a temporary file and read it back
        # before compressing it.
        from tempfile import mkstemp
        import gzip
        handle, tmpfile = mkstemp()
        os.close(handle)  # close the descriptor from mkstemp; reopen by name below
        with open(tmpfile, 'w') as f:
            f.write(str(M))
        with open(tmpfile) as f, gzip.open(maeff_outfile, 'wt') as gz:
            gz.write(f.read())
        os.remove(tmpfile)
else:
with open(maeff_outfile, 'w') as f:
f.write(str(M))
if __name__ == '__main__':
main()