Source code for schrodinger.application.desmond.packages.viparr1.viparr.viparr

#!/usr/bin/env desres-exec
#{
# exec desres-cleanenv -eUSER -eVIPARR_FFDIR -eVIPARR_PDIR \
# -m Python/2.5.1-09/bin \
# -m destro/0.4.7.1-D/lib-python \
# -m simplejson/1.7.4-1+Release \
# -- python $0 "$@"
#}

# viparr - force field parameter assignment program
# (Violet "Vi" Parr is the girl in The Incredibles who generates the force fields.)

import glob
import optparse
import platform
import six

from schrodinger.application.desmond.packages.destro import Maeff

from .. import get_ffdir
from .. import get_pdir
from . import viparr_merge
from . import viparr_residue
from .viparr_plugins_util import *  # noqa F403
from .viparr_util import *  # noqa F403

#import templates_check

if platform.system() == "Linux":
    # Currently this causes a crash on Windows.
    # The default precision appears to be 8 anyway...
    from schrodinger.application.desmond.packages.destro import \
        setglobalprecision
    setglobalprecision(8)


[docs]class Struct(object): pass
[docs]class Atom(object): pass
[docs]def templates_convert(templates): '''Convert templates structure to internal data structure. ''' class Atom(): pass for (name, res) in templates.items(): atoms = dict() # atom structure is: # [aname, atomic number, charge, [atypes]] for a in res['atoms']: atom = Atom() atom.an = a[1] atom.charge = a[2] atom.atypes = a[3] atoms[a[0]] = atom templates[name]['atoms'] = atoms
[docs]def compress_residue_list(residue_list): '''Compress FF representation if possible ''' if len(residue_list) == 0: return if residue_list[0] == None: return first = residue_list[0].name for res in residue_list: if res == None: break if res.name != first: break else: # natural termination of loop means all residues are the same # now check the first residue to make sure all atom indices are contiguous # if so, then this residue list should only contain one residue if max(residue_list[0].nlist) + 1 == len(residue_list[0].nlist): del residue_list[1:] print('Compressing FF representation...')
[docs]def get_atom_list(atoms, neighbors, templates, residue_list): '''Return a list of atoms in the ct structure. Each atom has the following fields: pdbname, pdbres, resnum, atomic number, charge, atomtypes. Also check that all residue and atom names are in the templates data structure. ''' class Atom(object): pass num_atoms = len(atoms) # above is correct unless we have compressed the residue list if len(residue_list) == 1: if residue_list[0] != None: num_atoms = len(residue_list[0].nlist) atom_list = [None] * num_atoms for ires, res in enumerate(residue_list): if res != None: # create marker to tell if atoms are polarizable # Yuck... This needs to go...(its used in bondfields) # and is specific to drude polarization polist = set([ p[4] for p in templates[res.name].get('pseudos', []) if p[3].startswith("drude_") ]) for i, n in enumerate(res.nlist): # atom numbers are 0-based a = Atom() a.type = 'atom' a.pdbname = res.alist[i] a.pdbres = res.name a.tmplname = res.name a.resnum = ires + 1 # 1-based a.neighbors = neighbors[n] a.index = n obj = templates[res.name]['atoms'][a.pdbname] a.an = obj.an a.charge = obj.charge a.atypes = obj.atypes if a.pdbname in polist: a.polnotpol = True else: a.polnotpol = False atom_list[n] = a return atom_list
[docs]def get_neighbor_list(atom_list): '''Return list of neighbor lists; one neighbor list for each atom, in order. Each neighborlist is guaranteed to be sorted. Entries are 1-based. ''' neighbor_list = list() for a in atom_list: if a == None: neighbor_list.append([]) else: neighbor_list.append(a.neighbors) return neighbor_list
[docs]def get_bond_list(neighbor_list): '''Return list of all bonds (i,j) such that i<j. List will be sorted meaningfully. Entries are 1-based. ''' bond_list = list() bond_set = set() num_atoms = len(neighbor_list) for i in range(num_atoms): for j in neighbor_list[i]: if neighbor_list[j - 1] == []: raise AssertionError( 'A bond (%d,%d) straddles your force fields' % (i + 1, j)) if i + 1 < j: bond = (i + 1, j) if bond not in bond_set: bond_list.append(bond) bond_set.add(bond) return bond_list
[docs]def get_angle_list(neighbor_list): '''Return list of all angles (i,j,k) such that i<k. List will be sorted meaningfully. Entries are 1-based. ''' angle_list = list() num_atoms = len(neighbor_list) for i in range(num_atoms): b = neighbor_list[i] # assumes b[0]<b[1]<b[2]<b[3] nb = len(b) index = i + 1 if nb > 6: six.reraise(AssertionError, 'Atom %d has more than six bonds:' % index, b) for at0 in range(nb): for at2 in range(at0 + 1, nb): angle_list.append((b[at0], index, b[at2])) angle_list.sort() return angle_list
[docs]def get_proper_list(neighbor_list, bond_list): '''Return list of all propers (i,j,k,l) such that i<l. List will be sorted meaningfully. Entries are 1-based. ''' proper_list = list() for b in bond_list: # contains only one of (i,j) and (j,i) # loop over possible (1,4)'s for n0 in neighbor_list[b[0] - 1]: if n0 == b[1]: continue for n1 in neighbor_list[b[1] - 1]: if n1 == b[0]: continue if n1 == n0: # triangle continue if n0 < n1: proper_list.append((n0, b[0], b[1], n1)) else: proper_list.append((n1, b[1], b[0], n0)) proper_list.sort() return proper_list
[docs]def get_bonded_term_list(fieldname, templates, residue_list, sorted): '''Return a list of bonded items. Each bonded item is a list, namely: [i,j] when fieldname = 'bonds' or 'exclusions'. if sorted, i<j [i,j,k] when fieldname = 'angles'. if sorted, i<k [i,j,k,l] when fieldname = 'propers' or 'impropers'. if sorted, i<l The returned list will be sorted in a meaningful way. ''' term_list = list() for res in residue_list: if res == None: continue # ignore residue if templates does not know about it name = res.name terms = templates[name].get(fieldname, None) if terms == None: continue for dih in terms: nk = len(dih) d = list() for atom in dih: a = res.atoms.get(atom) if a == None: raise AssertionError( 'In residue <"%s",%d> (%s), did not find atom %s' % (res.chain, res.resnum_orig, res.name_orig, atom)) d.append(a + 1) if len(d) == nk: if d[0] > d[-1] and sorted: d.reverse() # use sorted order i<j, i<k, i<l term_list.append(tuple(d)) else: print('Warning: some bonded terms were not found', nk) term_list.sort() return term_list
[docs]def merge(list0, list1): '''Merge list1 into list0, as long as there are no entries in common. ''' if list1 != None and list1 != []: for p in list1: if p in list0: print(p, list1, list0) raise AssertionError('Cannot merge lists; already in list') else: list0.append(p) list0.sort()
[docs]def get_pseudo_terms(iffld, pseudo_residue_list, nres, nstart): '''Return drude and virtual particle list (numbering starting at nstart) and list of 'bonds' between drudes, virtual sites and particles. The pseudo_list can be appended to the atom list. ''' if nres != 1 and nres != len(pseudo_residue_list): raise AssertionError('Mismatched residue and pseudo residue lists') pseudo_term_list = list() pseudo_bond_list = list() index = nstart - 1 for pres in pseudo_residue_list[:nres]: for a in pres: index += 1 if (iffld != a.ffldidx): pseudo_term_list.append(None) continue if hasattr(a, 'index') and a.index != index - 1: raise AssertionError('Bad Pseudo Indexes') a.index = index - 1 pseudo_term_list.append(a) # pseudo 'bonds' pseudo_bond_list.append((a.partner_index, index)) if hasattr(a, 'partner_index2'): pseudo_bond_list.append((a.partner_index2, index)) return pseudo_term_list, pseudo_bond_list
[docs]def update_pseudo_residue_list(iffld, pseudo_residue_list, atoms, templates, residue_list): '''pseudo_list is a list of all residues in ct block, and each residue is a list of pseudo particles. ''' for ires, res in enumerate(residue_list): if res == None: continue # ignore if do not recognize # ignore if pseudo_list already has an entry for this residue if len(pseudo_residue_list[ires]) != 0: print('Warning: pseudo_list already has entry for residue') continue li = templates[res.name].get('pseudos', []) for line in li: nsite = line[4] #"primary site" of pseudo site = res.atoms[nsite] # clone first atom in site definition a = Atom() a.ffldidx = iffld a.type = 'pseudo' a.pdbname = line[0] a.chain = atoms[site].chain a.resnum = atoms[site].resnum a.pdbres = atoms[site].pdbres a.tmplname = res.name a.x = atoms[site].x a.y = atoms[site].y a.z = atoms[site].z a.charge = line[1] a.atypes = line[2] a.field = line[3] a.terms = line[4:] a.partner_atom = nsite a.partner_index = res.atoms[nsite] + 1 if a.field in ['virtual_midpoint']: a.partner_index2 = res.atoms[line[5]] + 1 if hasattr(atoms[site], 'grp_thermostat'): a.grp_thermostat = atoms[site].grp_thermostat if hasattr(atoms[site], 'grp_energy'): a.grp_energy = atoms[site].grp_energy if (not (a.field.startswith("virtual_") or a.field.startswith("drude_"))): raise AssertionError('Unrecognized pseudo type: %s' % (str(a.field))) pseudo_residue_list[ires].append(a)
[docs]def construct_lists(iffld, atoms, neighbors, rules, templates, residue_list, pseudo_residue_list): '''Return lists that describe the structure. ''' compress_residue_list(residue_list) atom_list = get_atom_list(atoms, neighbors, templates, residue_list) neighbor_list = get_neighbor_list(atom_list) bond_list = get_bond_list(neighbor_list) angle_list = get_angle_list(neighbor_list) proper_list = get_proper_list(neighbor_list, bond_list) # Keep input order for impropers (the order may be signigicant depending on how the coordinate is calculated) improper_list = get_bonded_term_list('impropers', templates, residue_list, False) extra_angle_list = get_bonded_term_list('angles', templates, residue_list, True) extra_proper_list = get_bonded_term_list('propers', templates, residue_list, True) extra_excl_list = get_bonded_term_list('exclusions', templates, residue_list, True) # drude and virtual lists pseudo_list, pseudo_bond_list = get_pseudo_terms(iffld, pseudo_residue_list, len(residue_list), len(atom_list) + 1) atom_list.extend(pseudo_list) merge(angle_list, extra_angle_list) merge(proper_list, extra_proper_list) # Setup for simultaneous generation of pairs and exclusions. # default arguments in rules.get() are for backwards compatibility excl_rule = rules.get('exclusions', 4) if excl_rule < 1 or excl_rule > 4: raise AssertionError( "Invalid exclusion rule: %d. 'exclusions' must be between 1 and 4 inclusive" % (excl_rule)) es_scale_list = rules.get('es_scale', [0.0] * (excl_rule - 1)) lj_scale_list = rules.get('lj_scale', [0.0] * (excl_rule - 1)) if (len(es_scale_list) != excl_rule - 1 or len(lj_scale_list) != excl_rule - 1): raise AssertionError( "Invalid 'es_scale' (%d) or 'lj_scale' (%d) list lengths... Should be %d" % (len(es_scale_list), len(lj_scale_list), excl_rule - 1)) # Keep track of (1,x) pairs separately while generating exclusions. # Pair terms are only genertated up to and including the requested exclusion depth # 1-2 pairs: particles separated by one bond and not in the extra exclusion list # 1-3 pairs: particles separated by two bonds and not 1-{2 } pairs and not in the extra exclusion list # 1-4 pairs: particles separated by three bonds and not 1-{2,3} pairs and not in the extra exclusion list pair_list = [] excl_list = set(extra_excl_list) if excl_rule >= 2: tmp = set([(a[0], a[1]) for a in bond_list]) tmp.difference_update(excl_list) pair_list.append(tmp) excl_list.update(tmp) if excl_rule >= 3: tmp = set([(a[0], a[2]) for a in angle_list]) tmp.difference_update(excl_list) pair_list.append(tmp) excl_list.update(tmp) if excl_rule >= 4: tmp = set([(a[0], a[3]) for a in proper_list]) tmp.difference_update(excl_list) pair_list.append(tmp) excl_list.update(tmp) excl_list = list(excl_list) excl_list.sort() for i, p in enumerate(pair_list): p = list(p) p.sort() pair_list[i] = p lists = Struct() lists.atom_list = atom_list lists.neighbor_list = neighbor_list lists.residue_list = residue_list lists.bond_list = bond_list lists.angle_list = angle_list lists.proper_list = proper_list lists.improper_list = improper_list lists.excl_list = excl_list lists.pair_list = pair_list lists.es_scale_list = es_scale_list lists.lj_scale_list = lj_scale_list lists.pseudo_list = pseudo_list lists.pseudo_bond_list = pseudo_bond_list # UNDONE: excl and pairs and cmap are lists of tuples, unlike the other lists return lists
[docs]def merge_blocks(first, second): '''Merge second block into first; print warning message if element in second block is already in the first block. Assumes blocks themselves hold unique entries. First block is the merged one. ''' firstset = set([tuple([tuple(f[0]), f[1]]) for f in first]) for e in second: key = tuple([tuple(e[0]), e[1]]) if key in firstset: print('Ignoring subsequent match:', e) else: first.append(e)
[docs]def merge_blocks2(first, second): '''Merge first block into second; print warning message if element in first block will override element in second block. Assumes blocks themselves hold unique entries. Second block is the merged one. [NOT THE SAME BEHAVIOR AS ABOVE.] ''' seconddict = dict() for i, e in enumerate(second): t = tuple([tuple(e[0]), e[1]]) seconddict[t] = i for f in first: key = tuple([tuple(f[0]), f[1]]) if key in seconddict: ind = seconddict[key] print('Ignoring subsequent match:', second[ind]) second[ind] = f else: second.append(f)
[docs]def add_water_constraints(ff_handle): """ Add an ffio_constraints block to the ff_handle for a water ct by reading the ffio_c1 values from the angles and bonds, and append '_constrained' to the functional forms of the angles and bonds. :param ff_handle: the handle of the ffio_block :type ff_handle: destro.Destro """ # this will be a list of ffio_c1 values for [angle, bond1, bond2] ffio_c1s = [] for block in [ff_handle.ffio_angles, ff_handle.ffio_bonds]: for row in block: funct = row.ffio_funct if not funct.endswith('_constrained'): row.ffio_funct = funct + '_constrained' ffio_c1s.append(row.ffio_c1) block = [[[1, 2, 3], 'HOH', ffio_c1s]] del ff_handle.ffio_constraints ffio_add_block(ff_handle, 'ffio_constraints', block)
################################################################################
[docs]def main(vargs=[]): # noqa: M511 usage = ''' %prog [options] mae_file maeff_outfile Description: viparr is a force field parameter assignment program. -f and -d are used to specify force fields; the order of force fields is important: earlier ones take precedence over later ones. Simple example: %prog -f charmm27 -f spc mae_file maeff_outfile More complicated example: %prog -d me/myfiles/myff -f charmm27 -f spc mae_file maeff_outfile ''' def parser_add_directory(option, opt_str, value, parser, ffdir): if ffdir == None: raise AssertionError('VIPARR_FFDIR environment variable undefined; '+ \ 'no built-in force fields can be specified') parser.values.forcefield.append(os.path.join(ffdir, value)) def parser_merge(option, opt_str, value, parser): if len(parser.values.forcefield) == 0: raise AssertionError('Specify -f or -d before -m') print('Merging <%s> and <%s>' % (parser.values.forcefield[-1], value)) destdir = viparr_merge.merge(parser.values.forcefield[-1], value) print('Using temporary directory <%s>' % destdir) parser.values.forcefield[-1] = destdir ffdir = get_ffdir() pdir = get_pdir() if not ffdir: usage += 'VIPARR_FF_PATH is empty: No built-in force fields' else: usage += 'Available built-in force fields:\n ' temp = os.listdir(ffdir) temp.sort() usage += '\n '.join(temp) opt = optparse.OptionParser(usage=usage, version=viparr_version) opt.set_defaults(ctnum=None) opt.add_option('-c', type='int', dest='ctnum', action="append", help='''run viparr for selected CT blocks only, e.g., for the first and third CT blocks, use "-c 1 -c 3"''') opt.add_option('-f', type='string', dest='ffname', action='callback', callback=parser_add_directory, callback_args=(ffdir,), help='''built-in force field name; several can be listed, each preceded by its own -f''') opt.add_option('-d', action='append', dest='forcefield', default=[], help='''user-provided force field directory; several can be listed, each preceded by its own -d''') opt.add_option('-m', type='string', dest='merge_dir', action='callback', callback=parser_merge, help='''path to user-defined force field directory that is to be merged with previously specified force field; several can be listed, each preceded by its own -m''') opt.add_option('-n', dest='ffio_name', default='Generated by viparr', help='force field name to put into output file') opt.add_option('-p', dest='plugindir', default=pdir, help='override for plugins directory (advanced usage)') opt.add_option('-x', action="store_true", dest='no_header', default=False, help='do not print header info (for testing purposes)') opt.add_option('-v', action="store_true", dest='verbose', default=False, help='verbose output') opt.add_option( '-w', action="store_true", dest='is_water', default=False, help='use this option when running viparr on a water ct, this ' 'will recreate any constraints block that is removed') if (vargs == []): opts, args = opt.parse_args() else: opts, args = opt.parse_args(vargs) print('viparr', viparr_version) # check number of arguments first if len(args) != 2: opt.error('incorrect number of arguments: %d %s' % (len(args), str(args))) mae_file = args[0] maeff_outfile = args[1] # check that there is at least one forcefield if len(opts.forcefield) < 1: raise AssertionError('No force field specified') # check that each directory exists print('All FF directories, in order:\n', '\n'.join(opts.forcefield)) for dir in opts.forcefield: if not os.path.isdir(dir): raise AssertionError('Force field dir <%s> is not a directory' % dir) print('Plug-in directory:', opts.plugindir) if opts.plugindir == None: raise AssertionError('No plug-in directory specified with -p or with '+ \ 'VIPARR_PDIR environment variable') else: if not os.path.isdir(opts.plugindir): raise AssertionError('Plug-in dir <%s> is not a directory' % opts.plugindir) # open maestro file print('Reading maestro file <%s> ...' % mae_file) if mae_file.endswith('.mae.gz') or mae_file.endswith('.maegz'): import gzip mae_buffer = gzip.open(mae_file, 'rb').read() M = Maeff(mae_buffer) else: with open(mae_file) as f: M = Maeff(f) print('Done reading maestro file') num_structures = len(M) # create structure list, which is 0-based list of ct numbers to process if opts.ctnum == None: structure_list = list(range(num_structures)) else: structure_list = [] for e in opts.ctnum: if e > num_structures or e < 1: raise AssertionError('Bad value <%d> for selected CTNUM' % e) structure_list.append(e - 1) # skip structure if it is a "full_system" for s in structure_list: if M[s + 1].__contains__('ffio_ct_type'): if M[s + 1].ffio_ct_type == 'full_system': print('Skipping full_system structure <%d>' % (s + 1)) structure_list.remove(s) if len(structure_list) == 0: print('No CT blocks to process...exiting') sys.exit(1) # process the atoms and bonds in each ct # this should be the only place that use CtBonds and CtAtoms # ct_atoms and ct_bonds may have entries that are None ct_atoms = [None] * num_structures ct_bonds = [None] * num_structures for s in structure_list: print('Analyzing ct block %d' % (s + 1)) ct = M[s + 1] ct_bonds[s] = CtBonds(ct) ct_atoms[s] = CtAtoms(ct) # residue list for each structure for each force field all_residue_lists = [[[] for i in range(num_structures)] \ for j in range(len(opts.forcefield))] ###################################################################### # Loop for constructing the residue lists # We do this as a pre-processing step so that we can check that all residues # were matched by some template for idir, dir in enumerate(opts.forcefield): templates = {} tnames = [t for t in glob.glob(os.path.join(dir, 'templates*'))] for tmpl in tnames: viparr_merge.update_templates_strict(templates, tmpl) #templates_check.templates_check(templates) viparr_residue.templates_reorder_atoms(templates) # loop through all ct blocks for s in structure_list: print('******Constructing residue list for <%s> Structure %d' % (dir, s + 1)) residue_list = viparr_residue.get_residue_list( ct_atoms[s], ct_bonds[s], templates) # save this residue list all_residue_lists[idir][s] = residue_list ###################################################################### # Check that all residues were matched by some force field # and print diagnostic messages. # Also do the essential step of overwriting unmatched residues with 'None' derror = dict() dwarn = dict() dmatch = dict() num_messages = 5 # number of messages per residue name num_errors = 0 for s in structure_list: for i in range(len(all_residue_lists[0][s])): matches = [] for idir, dir in enumerate(opts.forcefield): res = all_residue_lists[idir][s][i] if res.match: matches.append(dir) if res.name_orig != res.name: pair = (res.name_orig, res.name) dmatch[pair] = dmatch.setdefault(pair, 0) + 1 if dmatch[pair] <= num_messages: print('Note: Struct %d, Res <"%s",%d> (%s) matched (%s)' % \ (s +1, res.chain, res.resnum_orig, res.name_orig, res.name)) if dmatch[pair] == num_messages: print( 'Ignoring further messages for (%s) matching (%s)' % pair) if len(matches) != 1: res = all_residue_lists[idir][s][i] which = (s + 1, res.chain, res.resnum_orig, res.name_orig) if len(matches) == 0: num_errors = num_errors + 1 derror[res.name_orig] = derror.setdefault(res.name_orig, 0) + 1 if derror[res.name_orig] <= num_messages: formula = viparr_residue.residue_formula( [ct_atoms[s][ii].atomicnum for ii in res.nlist]) print( 'Error: Struct %d, Res <"%s",%d> (%s) not matched by any force field template' % which, 'with formula', formula) if derror[res.name_orig] == num_messages: print('Ignoring further errors for (%s)' % res.name_orig) elif len(matches) > 1: dwarn[res.name_orig] = dwarn.setdefault(res.name_orig, 0) + 1 if dwarn[res.name_orig] <= num_messages: print( 'Warning: Struct %d, Res <"%s",%d> (%s) matched by multiple force fields:' % which, matches) if dwarn[res.name_orig] == num_messages: print('Ignoring further warnings for (%s)' % res.name_orig) for idir, dir in enumerate(opts.forcefield): if all_residue_lists[idir][s][i].match == False: all_residue_lists[idir][s][i] = None # overwrite if num_errors > 0: sys.exit(1) ################################################################################### # Get the combining rules and function type and check that they are consistent # also check the forcefield versions for incompatability comb_rule = set() vdw_func = set() for dir in opts.forcefield: rules = parse_json(os.path.join(dir, 'rules')) minver = rules.get('min_viparr_version', viparr_version) if (list(map(int, minver.split('.'))) > list( map(int, viparr_version.split('.')))): raise AssertionError( 'Forcefield %s needs viparr version >= %s. Current version = %s' % (dir, minver, viparr_version)) x = rules.get('vdw_comb_rule', '').upper() if x == '': x = 'WATER' comb_rule.add(x) x = rules.get('vdw_func', 'LJ12_6_sig_epsilon') vdw_func.add(x) if len(vdw_func) != 1: print(vdw_func) raise AssertionError('Inconsistent vdW func rules among force fields') vdw_func = vdw_func.pop() if comb_rule == set(['WATER', 'ARITHMETIC/GEOMETRIC']) or \ comb_rule == set(['WATER', 'GEOMETRIC']): comb_rule.remove('WATER') if comb_rule == set(['WATER']): comb_rule = set(['ARITHMETIC/GEOMETRIC']) if len(comb_rule) != 1: print(comb_rule) raise AssertionError('Inconsistent vdW mixing rules among force fields') comb_rule = comb_rule.pop() ################################################################################### # Get the exclusion rules and check that they are consistent # This is NOT necessary! We already fail if a bond straddles 2 forcefields. # This prevents us from needing to worry about mixing 2 different exclusion rules # as exclusions/pair are based on topology... # #exclusion_rule = [] #for dir in opts.forcefield: # rules = parse_json(os.path.join(dir, 'rules')) # x = rules.get('exclusions', 4) # if( x not in exclusion_rule): # exclusion_rule.append(x) #if len(exclusion_rule) != 1: # print exclusion_rule # raise AssertionError, 'Inconsistent exclusion rules among force fields' #exclusion_rule = exclusion_rule[0] ###################################################################### # Main loop algorithm: # Loop over the structures in the ct # Loop over the force fields (some duplicate work here if multiple structures) # Loop over the plug-ins # Apply plug-in for s in structure_list: print('******Main loop: Structure', s + 1) strules = [] sttemplates = [] # length is num_residues in structure pseudo_residue_list = [[] for i in range(len(all_residue_lists[0][s]))] # We need all the pseudos in the structure now so we can get the site indexing correct for idir, dir in enumerate(opts.forcefield): print('Pre-processing force field <%s> for rules and pseudos' % dir) strules.append(parse_json(os.path.join(dir, 'rules'))) templates = {} tnames = [t for t in glob.glob(os.path.join(dir, 'templates*'))] for tmpl in tnames: viparr_merge.update_templates_strict(templates, tmpl) templates_convert(templates) # update pseudo_list update_pseudo_residue_list( idir, pseudo_residue_list, ct_atoms[s], templates, all_residue_lists[idir][s]) # before compression sttemplates.append(templates) tables = Struct() # the tables describing the chemical structure blocks = dict( ) # dictionary of blocks that will be written to the mae file info = list( ) # list of strings which are references for the force fields info.append(('Plugins: <%s>' % opts.plugindir, '')) for idir, dir in enumerate(opts.forcefield): print('Processing force field <%s>' % dir) tables.rules = strules[idir] tables.templates = sttemplates[idir] # construct lists that describe the chemical system for this force field lists = construct_lists(idir, ct_atoms[s], ct_bonds[s], tables.rules, tables.templates, all_residue_lists[idir][s], pseudo_residue_list) # so rules can be passed to plugins... # Viparr should take the position that rules are for internal use only. # This dosent stop viparr from passing certain of these values to plugins however (see construct_lists) # # lists.rules = tables.rules lists.comb_rule = comb_rule lists.vdw_func = vdw_func # so that ff directory is passed to plugins... lists.dir = dir # pass templates lists.templates = tables.templates # pass processed blocks lists.blocks = {} #info.append('FF dir: <%s>' % lists.dir) # force field directory #info.extend(lists.rules.get('info', ['No info'])) ffd = 'FF: <%s>' % lists.dir data = tables.rules.get('info', ['No info']) for d in data: info.append((ffd, d)) # loop over plug-ins pluglist = [p[0] for p in tables.rules['plugins']] if ('atoms' in pluglist and pluglist[-1] != 'atoms'): raise UserWarning( "atoms plugin should be last plugin applied. (Check rules file)" ) for plugin_setup in tables.rules['plugins']: plugin = plugin_setup[0] plugin_cfg = plugin_setup[1] if opts.verbose: print('Applying plugin <%s>' % plugin) context = {} ns = dict(context=context) # currently only read plugins from plugindir (or current dir if no plugindir) fname = os.path.join(opts.plugindir, plugin + '.py') with open(fname, 'rb') as f: exec(compile(f.read(), fname, 'exec'), ns) # define and 'declare' the plugin temp = context['callback'](lists, plugin_cfg) # for each component of temp, append to the corresponding local ff block for name in list(temp): if lists.blocks.get(name) != None: merge_blocks2(lists.blocks[name], temp[name]) lists.blocks[name] = temp[name] # for each component of lists.blocks, append to the corresponding global ff block for name in list(lists.blocks): if blocks.get(name) != None: merge_blocks2(blocks[name], lists.blocks[name]) blocks[name] = lists.blocks[name] print('Building mae data structure...') # put the terms in the maestro file del M[ s + 1].ffio_ff # delete any existing ffio block - could be very important! ff_handle = M[s + 1].__new_block__('ffio_ff') # add header if not opts.no_header: ffio_add_header(ff_handle, opts.ffio_name, info) # add vdw func type ff_handle.ffio_vdw_func = vdw_func # add combining rule ff_handle.ffio_comb_rule = comb_rule # add pseudo block ffio_add_pseudo_block(ff_handle, pseudo_residue_list) # add each block automatically in sorted order keys = list(blocks) keys.sort() for k in keys: # skip cmap if no torsion_torsion if k[:4] == 'cmap' and len(blocks.get('torsion_torsion', [])) == 0: continue if k == 'atoms': ffio_add_atoms_block(ff_handle, blocks.get(k)) else: try: ffio_add_block(ff_handle, 'ffio_' + k, blocks.get(k)) except: # noqa: E722 print("Failed while adding 'ffio_%s':" % (k)) for f in blocks.get(k): print(f) if opts.is_water: add_water_constraints(ff_handle) # write the file print('Writing maestro file <%s>' % maeff_outfile) if maeff_outfile.endswith('.mae.gz') or maeff_outfile.endswith('.maegz'): # unfortunately, the implementation of print and str functions are different # if we just use below statement, we will miss linefeed in the actual output. # import gzip # gzip.open(maeff_outfile, 'w').write(str(M)) # The solution here is to write to a temporary file and read it back before # compressing it. from tempfile import mkstemp handle, tmpfile = mkstemp() with open(tmpfile, 'w') as f: f.write(str(M)) import gzip with open(tmpfile) as f: gzip.open(maeff_outfile, 'w').write(f.read()) os.remove(tmpfile) else: with open(maeff_outfile, 'w') as f: f.write(str(M))
if __name__ == '__main__': main()