"""
Desmond workup techniques, all based on actual numerical results. Intended for
import and use as a test workup.
Running directly allows execution of any of the workup methods
Methods for comparing the following Desmond-style information:
- `Metadynamics FES files<desmond_compare_FES>`
- `FEP energies from FEP 'result' files<desmond_FEP_energies>`
- Surface area of membrane systems
- .ene files:
- `similarity to a reference ene file<desmond_compare_ene>`
- log files: `Calculation speed (ns/day)<desmond_speed>`
- `st2 files<desmond_st_stats>`: comparison of median, mean, and standard
deviation to reference values
- `deltaE.txt files<desmond_compare_deltaE>`: compare two deltaE.txt files
Classes and helper functions have 'private' names, as only the workup methods
are intended to be imported and used as test workups.
$Revision 0.1 $
@TODO: improve speed test to print avg ns/day.
@TODO: improve speed test to be dependent on cpu^0.8
@copyright: (c) Schrodinger, Inc. All rights reserved.
"""
import gzip
import os
import re
import sys
import tarfile
from collections import Counter
from collections import defaultdict
from functools import partial
from past.utils import old_div
from typing import TYPE_CHECKING
from typing import Dict
from typing import List
from typing import Optional
from typing import Tuple
import numpy
import pytest
from more_itertools import pairwise
from schrodinger import structure
from schrodinger.application.desmond import cmj
from schrodinger.application.desmond import constants
from schrodinger.application.desmond import stage
from schrodinger.utils import sea
from . import standard_workups
if TYPE_CHECKING:
from schrodinger.application.scisol.packages.fep import graph
# Module version tag (kept for parity with other workup modules).
_version = "$Revision 0.1 $"
# Matches replica-pair energy lines of the form
# "[T<i>, T<j>] : [<forward energy>, <reverse energy>]" in deltaE.txt files.
REPLICA = re.compile(r"\[T(\d+), *T(\d+)\] *: *\[(.+), *(.+)\]")
# *****************************************************************************
# METADYNAMICS
def _FES(filename):
"""Returns a numpy array based on a FES format file."""
fes1 = []
with open(filename) as f:
for line in f:
line = line.strip()
if "#" in line:
continue
elif line:
fes1.append([float(x) for x in line.split()])
return numpy.array(fes1)
def desmond_compare_FES(file1, file2, tolerance=0.1):
    """
    Compare metadynamics FES files by RMSD over all grid points.

    :param file1: Path to the FES file under test.
    :param file2: Path to the reference FES file.
    :param tolerance: Maximum allowed RMSD between the two surfaces
        (default 0.1).
    :raises AssertionError: if the grids have different shapes or the RMSD
        exceeds ``tolerance``.
    :return: True on success.
    """
    fes1 = _FES(file1)
    fes2 = _FES(file2)
    try:
        diff = fes2 - fes1
    except ValueError:
        # numpy raises ValueError when the arrays cannot be broadcast
        # together, i.e. the surfaces were computed on different grids.
        raise AssertionError("FES shapes were different!")
    # Plain float division replaces the py2-era old_div; operands are floats.
    rmsd = numpy.sqrt(numpy.sum(diff * diff) / len(fes1.flat))
    if rmsd > tolerance:
        raise AssertionError("FES were different, RMSD=%.2f" % rmsd)
    return True

#THIS CODE WILL GENERATE A FES:
#from schrodinger.application.desmond.meta import MetaDynamicsAnalysis
#meta_data = MetaDynamicsAnalysis(inp_fname=opts.input)
#FES = meta_data.computeFES(outputname)
#
#alternately, just add an analysis stage to a metadynamics job!
# *****************************************************************************
# FEP
def desmond_FEP_energies(filename, reference, tolerance=0.5):
    """
    Compares the deltaF in an FEP result file to a standard.

    The fourth whitespace-separated token of the first line containing
    'deltaF' is taken as the value.
    """
    with open(filename) as result:
        for row in result:
            if "deltaF" not in row:
                continue
            delta_f = float(row.split()[3])
            if abs(delta_f - reference) > tolerance:
                raise AssertionError(
                    " Different deltaF: {} vs {} ".format(delta_f, reference))
            return True
    raise AssertionError(" deltaF not found in file %s" % filename)
# *****************************************************************************
# Membrane Systems
def desmond_check_surface_area(filename,
                               reference,
                               tolerance=0.5,
                               side1="r_chorus_box_ax",
                               side2="r_chorus_box_bx"):
    """
    Compares surface area to a standard with optional tolerance and optional
    choice of box sides. Default sides are a and b.
    example usage: desmond_check_surface_area('filename', 7.5, tolerance=0.1)

    :param filename: Maestro file whose first structure carries the chorus
        box properties.
    :param reference: Expected surface area.
    :param tolerance: Maximum allowed *relative* deviation (default 0.5).
    :param side1: Property name for the first box side.
    :param side2: Property name for the second box side.
    :raises AssertionError: if the relative deviation exceeds ``tolerance``.
    :return: True on success.
    """
    struct = next(structure.MaestroReader(filename))
    # Use fresh names; the original rebound the side1/side2 parameters.
    len_a = struct.property[side1]
    len_b = struct.property[side2]
    area = len_a * len_b
    # Plain float division replaces the py2-era old_div.
    if abs(area - reference) / reference > tolerance:
        raise AssertionError(f" Areas were different: {area} vs {reference}")
    return True
def desmond_tgz_file_list(tgz_fname, *fnames_to_check):
    """
    Check that the members of a tar/tgz archive are exactly the given names.

    :param tgz_fname: Path to the archive.
    :param fnames_to_check: Expected member names (order-insensitive).
    :raises AssertionError: if the sorted member lists differ.
    :return: True when the archive contents match exactly.
    """
    # tarfile is already imported at module level; the local re-import was
    # redundant and has been removed.
    with tarfile.open(tgz_fname) as tf:
        member_names = sorted(m.name for m in tf.getmembers())
    expected = sorted(fnames_to_check)
    if member_names == expected:
        return True
    # Bug fix: the original passed two lists to str.join, raising TypeError
    # instead of reporting the mismatch.
    raise AssertionError(
        f"archive contents {member_names} != expected {expected}")
# *****************************************************************************
# .ene files
class _EneFile:
    """
    Class to find and contain the information in a Desmond .ene file. Also
    does comparisons based on:
    - compare initial and final energies to reference
    - compare temperature profiles (useful for SAmd)
    - number of time points and their separation
    """

    def __init__(self, filename=None, tol=10.0):
        # Column name -> list of float values, one entry per time point.
        self.eneData = {}
        # Column name -> unit string parsed from the header, e.g. 'kcal/mol'.
        self.eneUnits = {}
        # Interpreted by compare_quantity as decimal places: max diff 0.1**tol.
        self.tol = tol
        if filename:
            self.load_system(filename)
        else:
            self._filename = None

    def filename(self, filename=None):
        """
        Gets and sets the filename. Prevents having a loaded system which
        does not match the file name.
        """
        if filename:
            self.load_system(filename)
        else:
            return self._filename

    def load_system(self, filename):
        """Loads the information from file filename into the EneFile object."""
        self._filename = filename
        name_rows = []  #list describes the property name in each row
        with open(self._filename) as ene_file:
            for line in ene_file:
                if "#" == line[0]:
                    # Header line, e.g. "# 0:time (ps) 1:E (kcal/mol) ...";
                    # only the header whose first token starts with '0'
                    # declares the columns.
                    array = line[1:].split()
                    if '0' == array[0][0]:
                        for elem in array:
                            if ":" in elem:
                                # "<index>:<name>" starts a new column.
                                name = elem.split(':')[-1]
                                name_rows.append(name)
                                self.eneData[name] = []
                            elif "(" == elem[0] and ")" == elem[-1]:
                                #Add the name of the unit based on the most
                                #recent quantity name found
                                self.eneUnits[name_rows[-1]] = elem[1:-1]
                            else:
                                raise Exception(
                                    "unknown row type in row keys list")
                elif name_rows:
                    # Data line: one float per declared column, in order.
                    for i, datum in enumerate(line.split()):
                        self.eneData[name_rows[i]].append(float(datum))

    def quantity_average(self, quantity):
        """
        Finds the average of a quantity.
        """
        return numpy.average(self.eneData[quantity])

    def compare_units(self, other):
        """
        Compares two EneFile objects based on whether the quantities are
        measured in the same units in both files.

        :raises AssertionError: if any quantity's units differ; a quantity
            missing from one file compares as the empty string.
        """
        for value in set(list(self.eneUnits) + list(other.eneUnits)):
            if self.eneUnits.get(value, "") != other.eneUnits.get(value, ""):
                msg = (" Units did not match for %s (%s vs %s)" %
                       (value, self.eneUnits.get(
                           value, ""), other.eneUnits.get(value, "")))
                raise AssertionError(msg)
        return True

    def compare_quantity(self, other, quantity):
        """
        Compares two EneFile objects based on the initial and final and average value of a
        named quantity.

        :raises AssertionError: if initial, final, or average values differ
            by more than 0.1**self.tol.
        """
        # self.tol is a number of decimal places, not an absolute tolerance.
        max_diff = 0.1**self.tol
        if abs(self.eneData[quantity][0] -
               other.eneData[quantity][0]) > max_diff:
            msg = " Initial values for {} differ: {} vs {}".format(
                quantity, self.eneData[quantity][0], other.eneData[quantity][0])
            raise AssertionError(msg)
        if abs(self.eneData[quantity][-1] -
               other.eneData[quantity][-1]) > max_diff:
            msg = " Final values for {} differ: {} vs {}".format(
                quantity, self.eneData[quantity][-1],
                other.eneData[quantity][-1])
            raise AssertionError(msg)
        if abs(
                self.quantity_average(quantity) -
                other.quantity_average(quantity)) > max_diff:
            msg = " Average values for {} differ: {} vs {}".format(
                quantity, self.quantity_average(quantity),
                other.quantity_average(quantity))
            raise AssertionError(msg)
        return True

    def compare_times(self, other):
        """
        Compares two EneFile objects based on the number of points in each
        simulation as well as the final time point.
        """
        r_value = (len(self.eneData['time']) == len(other.eneData['time']) and
                   self.eneData['time'][-1] == other.eneData['time'][-1])
        if not r_value:
            raise AssertionError(" Time profiles did not match")
        return True

    def compare_temperature_profile(self, other):
        """
        Compares two EneFile objects based on the temperatures at each
        timepoint.
        """
        temps_self = numpy.array(self.eneData["T"])
        temps_other = numpy.array(other.eneData["T"])
        diffs = numpy.abs(temps_self - temps_other)
        # 40 K is a loose bound intended to catch gross divergence only
        # (e.g. a runaway thermostat), not ordinary fluctuations.
        if max(diffs) > 40:
            msg = " The maximum difference in Temperature at any given time"\
                  " point was large: {}".format(max(diffs))
            raise AssertionError(msg)
        return True
def desmond_compare_ene(file1, file2, tolerance, *quantities):
    """
    Compares two Desmond ene files.
    Checks that any quantities requested from the ene file match
    within some tolerance number of decimal points.
    usage desmond_compare_ene('file1.ene', 'file2.ene', 2, 'P', 'E_c')
    """
    candidate = _EneFile(file1, tolerance)
    reference = _EneFile(file2, tolerance)
    # compare_quantity returns True or raises, so all() either returns True
    # or the AssertionError propagates.
    return all(
        candidate.compare_quantity(reference, quantity)
        for quantity in quantities)
def desmond_compare_eneseqs(file1,
                            file2,
                            *,
                            columns=None,
                            min_length=10,
                            atol=1e-2,
                            rtol=1e-5,
                            mean_atol=1e-4,
                            mean_rtol=1e-5):
    """
    Compare two columns between two Desmond ene files at matching time points.

    :param columns: Column names to compare. Use all by default.
    :type columns: sequence of str
    :param min_length: Require at least that many matching time points.
    :type min_length: int
    :param atol: Absolute tolerance (see `numpy.isclose`).
    :type atol: float
    :param rtol: Relative tolerance (see `numpy.isclose`).
    :type rtol: float
    :param mean_atol: Absolute tolerance for mean values (see `numpy.isclose`).
    :type mean_atol: float
    :param mean_rtol: Relative tolerance for mean values (see `numpy.isclose`).
    :type mean_rtol: float
    """
    TIME_COLUMN = 'time'
    ene1 = _EneFile(file1)
    ene2 = _EneFile(file2)
    if columns is None:
        assert ene1.eneData.keys() == ene2.eneData.keys(), \
            " Column names do not match"
        columns = ene1.eneData.keys() - {TIME_COLUMN}
    else:
        assert set(columns) <= ene1.eneData.keys(), \
            " Some of the columns are missing in file1"
        assert set(columns) <= ene2.eneData.keys(), \
            " Some of the columns are missing in file2"
    # Bug fix: the 'file2' entry previously re-checked ene1's time values,
    # leaving file2's monotonicity unverified.
    for (label, sequence) in (('file1', ene1.eneData[TIME_COLUMN]),
                              ('file2', ene2.eneData[TIME_COLUMN])):
        assert all(a < b for a, b in pairwise(sequence)), \
            f" Non-monotonic time values in {label}"
    # Align the two series on their common time points.
    common_times, idx1, idx2 = numpy.intersect1d(ene1.eneData[TIME_COLUMN],
                                                 ene2.eneData[TIME_COLUMN],
                                                 assume_unique=True,
                                                 return_indices=True)
    num_common = len(idx1)
    assert min_length > 0
    assert num_common >= min_length, \
        f" Too few matching time points: {num_common} < {min_length}"
    for c in sorted(columns):
        x1 = numpy.asarray(ene1.eneData[c])[idx1]
        x2 = numpy.asarray(ene2.eneData[c])[idx2]
        close = numpy.isclose(x1, x2, atol=atol, rtol=rtol)
        if not close.all():
            # Print per-timepoint diagnostics before failing.
            not_close = [arr[~close] for arr in (common_times, x1, x2)]
            print(f"Divergent values of '{c}' at:")
            for t_, x1_, x2_ in zip(*not_close):
                print(f"t={t_}: {x1_} vs {x2_}")
            assert close.all(), \
                f" Divergent values of '{c}', see stdout for details"
        mean1, mean2 = x1.mean(), x2.mean()
        assert numpy.isclose(mean1, mean2, atol=mean_atol, rtol=mean_rtol), \
            f" Divergent values of mean('{c}'): {mean1} vs {mean2}"
# *****************************************************************************
# log files
def desmond_speed(filename, reference_rate, tolerance=0.05):
    """
    Compares the average rate per step in ns/day against a standard.
    example usage: desmond_speed('file1.log', 7.5, tolerance=0.1)

    :param reference_rate: Value to be compared against in ns/day
    :param tolerance: Relative tolerance. Default is 5%.
    :raises AssertionError: if the measured rate deviates from
        ``reference_rate`` by more than ``tolerance`` (relative), or no
        rate line is found in the log.
    :return: True when the rate matches.
    """
    with open(filename) as logfile:
        for line in logfile:
            if "Total rate per step" not in line:
                continue
            # The rate is the second-to-last token, e.g. "... 7.5 ns/day".
            rate = float(line.split()[-2])
            # Plain float division replaces the py2-era old_div.
            if abs(rate - reference_rate) / reference_rate > tolerance:
                raise AssertionError(" Rates: new: %.1f, reference: %.1f" %
                                     (rate, reference_rate))
            return True
    raise AssertionError(" Total rate per step not found.")
def desmond_stage_time(log_file, stage_num, max_time):
    """
    :param log_file: Name of log file to be analyzed.
    :type log_file: str
    :param stage_num: Stage number
    :type stage_num: int
    :param max_time: Maximum acceptable time for the stage in seconds
    :type max_time: float
    :return: `True` if the stage completed successfully within `max_time`
        seconds, or `False` if not (including when no duration line exists).
    """
    search_str = 'Stage %d duration:' % stage_num
    try:
        with open(log_file) as file_data:
            # Keep the text after the marker of the *last* matching line.
            # Bug fix: the original used line.strip(search_str), which
            # strips any characters from the marker's character *set* and
            # could eat leading digits of the duration (e.g. for stage 10).
            stage_duration = [
                line.partition(search_str)[2]
                for line in file_data
                if search_str in line
            ][-1]
    except IndexError:
        return False
    with open(log_file) as fh:
        multisim_complete = fh.read().find(
            "\nStage %d completed successfully." % stage_num) != -1
    # Duration is formatted "<h>h <m>m <s>s"; drop the trailing unit letters.
    h, m, s = [e[:-1] for e in stage_duration.split()]
    duration = float(h) * 3600 + float(m) * 60 + float(s)
    return (duration < max_time) and multisim_complete
# *****************************************************************************
# st2 files.
#compare averages and ranges.
def desmond_st_stats(filename, *options):
    """
    Compares the mean, median and/or standard deviation of a desmond st2 file to
    reference values.
    usage: desmond_st_stats('file.st2', 'mean=1.2', 'median=2.3', 'stddev=1.2', 'tol=0.2')

    :raises AssertionError: if any requested statistic deviates from its
        reference by more than the tolerance, or the file cannot be parsed.
    """
    with open(filename) as fh:
        cfg = sea.Map(fh.read())
    meanRef, medianRef, stddevRef = None, None, None
    tolerance = 0.4
    for option in options:
        key, val = option.split("=")
        key = key.lower()
        val = float(val)
        if key == "mean":
            meanRef = val
        elif key == "median":
            medianRef = val
        elif key == "stddev":
            stddevRef = val
        elif key.startswith("tol"):
            tolerance = val
    errors = []
    try:
        values = []
        for st in cfg.Keywords:
            for key in st:
                values = [x.val for x in st[key]["Result"]]
                if key in ("Angle", "Torsion"):
                    # Angular quantities are re-centered so statistics are
                    # not distorted by the 0/360 degree wrap-around.
                    values = _normalize_angular_values(values)
        # Bug fix: compare against 'is not None' so a reference value of
        # 0.0 is honoured (the original truthiness test skipped it).
        if meanRef is not None:
            _compare_stats(values, meanRef, tolerance, numpy.mean, "Mean",
                           errors)
        if medianRef is not None:
            _compare_stats(values, medianRef, tolerance, numpy.median,
                           "Median", errors)
        if stddevRef is not None:
            _compare_stats(values, stddevRef, tolerance, numpy.std,
                           "Standard deviation", errors)
    except Exception as e:
        # Preserve the original contract of surfacing parse/compute errors
        # as AssertionError, but keep the chained traceback.
        raise AssertionError(str(e)) from e
    if errors:
        raise AssertionError('\n'.join(errors))
def _compare_stats(values, reference, tolerance, function, name, errors):
value = function(values)
if abs(reference - value) > tolerance:
msg = f" {name} differs: {value} vs. {reference}"
errors.append(msg)
def _normalize_angular_values(array):
"""
Centers an array of angles in the unit of degree to minimize its standard
deviation.
:param array : Array to be (possibly) modified.
:type array : list
:return : Values, some of which (possibly) offset (by 360 deg) to minimize std dev.
:rtype : list
"""
good_array = array
stddev = numpy.std(good_array)
for shift in numpy.arange(0, 360, 2.7):
tmp = list(((elem + shift) % 360 - shift) for elem in array)
if numpy.std(tmp) < stddev:
good_array = tmp
stddev = numpy.std(good_array)
good_array -= (numpy.mean(good_array) // 360) * 360
return good_array.tolist()
def read_dE_file(fname):
    """
    Read a dE file and return a dict from time to energy_dict;
    energy_dict is a dict from (lambda_win, lambda_win) to energy value
    (kept as the original string so comparisons are exact).
    """
    dic, energy = {}, {}
    with open(fname) as fh:
        for line in fh:
            line = line.strip()
            if line.startswith('Time'):
                time = line[6:]
                dic = energy[time] = {}
            elif line.startswith('['):
                # startswith also skips blank lines, which previously
                # raised IndexError on line[0].
                match = REPLICA.match(line)
                if match:
                    r0, r1, ef, er = match.group(1, 2, 3, 4)
                    r0 = int(r0)
                    r1 = int(r1)
                    # Forward and reverse energies, keyed both ways.
                    dic[(r0, r1)] = ef
                    dic[(r1, r0)] = er
    return energy
def desmond_compare_deltaE(file1, file2):
    """
    Compares two Desmond deltaE.txt files.
    Checks time and energy. No tolerance (exact sameness).
    usage: desmond_compare_deltaE('file1', 'file2')

    :return: True if both files contain identical times and energies.
    """
    # Dict equality compares the time keys and every
    # (replica, replica) -> energy entry recursively, so a plain comparison
    # suffices. (A dead py2-era manual comparison that followed the return
    # inside a string literal has been removed.)
    return read_dE_file(file1) == read_dE_file(file2)
def desmond_compare_energy_group(file1, file2, tolerance=0):
    """
    Compares two Desmond enegrp.dat files.
    Checks all values upto tolerance value.
    usage: desmond_compare_energy_group('file1', 'file2', tolerance=0)
    """

    def values_match(a, b):
        # Numeric comparison within the absolute tolerance.
        return abs(float(a) - float(b)) <= tolerance

    def parse_time_line(text):
        # "k1=v1 k2=v2 ..." -> {'k1': 'v1', ...}
        return dict(token.split('=') for token in text.split())

    def time_lines_match(a, b):
        left = parse_time_line(a)
        right = parse_time_line(b)
        if len(left) != len(right):
            return False
        for key, value in left.items():
            if key not in right:
                return False
            if not values_match(value, right[key]):
                return False
        return True

    def value_lines_match(a, b):
        # "<name> (<unit>) <value>": names must match exactly, values
        # within tolerance.
        name_a = a[:a.index('(')].strip()
        name_b = b[:b.index('(')].strip()
        val_a = a[a.index(')') + 1:].strip()
        val_b = b[b.index(')') + 1:].strip()
        return name_a == name_b and values_match(val_a, val_b)

    with open(file1) as fh1, open(file2) as fh2:
        for i, (line1, line2) in enumerate(zip(fh1, fh2)):
            matcher = value_lines_match if '(' in line1 else time_lines_match
            if not matcher(line1, line2):
                msg = f'line {i} {line1} differ from {line2}'
                raise AssertionError(msg)
    return True
def desmond_test_mean_num_waters(tarname, limits):
    """
    Check that the mean number of waters in the simulation is within the given
    limits.

    :param tarname: the input tar file name
    :type tarname: str
    :param limits: a 2-tuple containing the min and max values
    :type limits: tuple
    """
    # Verbose format: "Total solvent in simulation box: <N> Average: <x>"
    verbose_marker = 'Total solvent in simulation box: '
    # Quiet format: "GCMC moves accepted: ... Total solvent molecules: <N>"
    quiet_marker = 'GCMC moves accepted: '
    water_counts = []
    with tarfile.open(tarname) as tarball:
        log_name = next(
            (n for n in tarball.getnames() if n.endswith('.log')), None)
        if log_name is None:
            raise ValueError("No logfile found")
        with tarball.extractfile(log_name) as member:
            for raw in member:
                # extractfile yields bytes; decode before matching.
                text = raw.decode('utf-8')
                if text.startswith(verbose_marker):
                    # Running average at the end of a GCMC plugin iteration.
                    water_counts.append(float(text.split()[-1]))
                elif text.startswith(quiet_marker):
                    # Instantaneous count (not an average).
                    water_counts.append(int(text.split()[-1]))
    low, high = limits
    return low < numpy.mean(water_counts) < high
def desmond_check_fep_main_msj(main_msj_fname: str,
                               fep_type: constants.FEP_TYPES):
    """
    Basic checks for the main msj.

    Verifies that the mapper, launcher and cleanup stages are present (plus
    the mutation generator for protein FEP), that the mapper arguments match
    the FEP type, and that the launcher dispatches the expected per-leg
    sub-msj files.

    :param main_msj_fname: The input main msj file name.
    :param fep_type: Type of fep used to run the job.
    :raises AssertionError: on the first failed check.
    :return: True when all checks pass.
    """
    from schrodinger.application.scisol.packages.fep.graph import Legtype
    legs = (Legtype.COMPLEX.value, Legtype.SOLVENT.value)
    # The job name (msj basename without extension) prefixes every sub-msj.
    job_name = os.path.splitext(os.path.basename(main_msj_fname))[0]
    msj_as_sea = cmj.msj2sea(main_msj_fname)
    found_mapper = False
    found_launcher = False
    found_fep_cleanup = False
    found_protein_generator = False
    # Expected mapper stage name for each supported FEP type.
    mapper_name = {
        constants.FEP_TYPES.SMALL_MOLECULE: stage.FepMapper.NAME,
        constants.FEP_TYPES.METALLOPROTEIN: stage.FepMapper.NAME,
        constants.FEP_TYPES.COVALENT_LIGAND: stage.CovalentFepMapper.NAME,
        constants.FEP_TYPES.PROTEIN_STABILITY: stage.ProteinFepMapper.NAME,
        constants.FEP_TYPES.PROTEIN_SELECTIVITY: stage.ProteinFepMapper.NAME,
        constants.FEP_TYPES.LIGAND_SELECTIVITY: stage.ProteinFepMapper.NAME,
    }
    has_fragment_linking = fep_type == constants.FEP_TYPES.SMALL_MOLECULE
    for s in msj_as_sea.stage:
        msg_base = f"{main_msj_fname}: {s.__NAME__}"
        if s.__NAME__ == mapper_name[fep_type]:
            found_mapper = True
            # Check mapper args
            if fep_type == constants.FEP_TYPES.SMALL_MOLECULE:
                # Small-molecule mapper needs no extra arguments.
                pass
            elif fep_type == constants.FEP_TYPES.COVALENT_LIGAND:
                assert len(s.graph_file.val), \
                    f"{msg_base}.graph_file is not set."
            elif fep_type == constants.FEP_TYPES.METALLOPROTEIN:
                assert len(s.mp.val), \
                    f"{msg_base}.mp is not set."
                assert sum(['i_fep_' in v for v in s.mp.val]), \
                    f"{msg_base}.mp is not set correctly, missing 'i_fep_' in name."
            elif fep_type in constants.PROTEIN_FEP_TYPES:
                assert s.fep_type.val == fep_type, \
                    f"{msg_base}.fep_type is not set correctly. Found {s.fep_type.val}, expected {fep_type}."
                if fep_type in [
                        constants.FEP_TYPES.PROTEIN_SELECTIVITY,
                        constants.FEP_TYPES.LIGAND_SELECTIVITY
                ]:
                    assert s.asl.val, \
                        f"{msg_base}.asl is empty for selectivity calculation."
            else:
                assert False, f"Unknown fep_type: {fep_type}"
        elif s.__NAME__ == stage.FepLauncher.NAME:
            found_launcher = True
            for ileg, leg in enumerate(legs):
                # Sub-msj file expected for each simulation protocol.
                expected_msj = {
                    constants.SIMULATION_PROTOCOL.CHARGED: f'{job_name}_chg_{leg}.msj',
                    constants.SIMULATION_PROTOCOL.FORMALCHARGED: f'{job_name}_chg_{leg}.msj',
                    constants.SIMULATION_PROTOCOL.COREHOPPING: f'{job_name}_corehopping_{leg}.msj',
                    constants.SIMULATION_PROTOCOL.MACROCYCLE_COREHOPPING: f'{job_name}_corehopping_{leg}.msj',
                    constants.SIMULATION_PROTOCOL.DEFAULT: f'{job_name}_{leg}.msj',
                }
                if has_fragment_linking:
                    expected_msj[
                        constants.SIMULATION_PROTOCOL.
                        FRAGMENT_LINKING] = f'{job_name}_fragment_linking_{leg}.msj'
                for name, msj_fname in expected_msj.items():
                    assert name in s.dispatch.val, f"Missing {name} dispatcher."
                    assert msj_fname in s.dispatch[name].val[
                        ileg], f"Missing {msj_fname} in {name} dispatcher."
                assert s.dispatch.val.keys() == expected_msj.keys(), \
                    f"Additional dispatchers found: " \
                    f"{set(s.dispatch.val.keys()) - set(expected_msj.keys())}, " \
                    "please update desmond_workups and the stu tests."
            if has_fragment_linking:
                # NOTE(review): 'name' here is left over from the loop above
                # (the last-iterated dispatcher). Presumably the
                # fragment-linking dispatcher is intended -- confirm this is
                # not meant to index a specific dispatcher key.
                for ileg, leg in enumerate(constants.FRAGMENT_LINKING_JOBS.VAL):
                    msj_fname = f'{job_name}_fragment_linking_{leg}.msj'
                    assert msj_fname in s.dispatch[name].val[
                        ileg], f"Missing {msj_fname} in {name} dispatcher."
        elif s.__NAME__ == stage.ProteinMutationGenerator.NAME:
            assert fep_type in constants.PROTEIN_FEP_TYPES, f"{msg_base}: not a protein fep calculation!"
            assert s.mutation_file.val, f"{msg_base}.mutation_file is empty."
            found_protein_generator = True
        found_fep_cleanup = found_fep_cleanup or s.__NAME__ == stage.FepMapperCleanup.NAME
    assert found_mapper, "Mapper stage not found"
    assert found_launcher, "Launcher stage not found"
    assert found_fep_cleanup, "Cleanup stage not found"
    if fep_type in constants.PROTEIN_FEP_TYPES:
        assert found_protein_generator, "Protein mutation generator stage not found"
    return True
# Expected lambda-schedule string ("<schedule>:<number of windows>") for each
# simulation protocol; used by _check_fep_in_cfg when validating in.cfg.
_EXPECTED_LAMBDAS = {
    constants.SIMULATION_PROTOCOL.CHARGED: 'charge:24',
    constants.SIMULATION_PROTOCOL.FORMALCHARGED: 'charge:24',
    constants.SIMULATION_PROTOCOL.COREHOPPING: 'flexible:16',
    constants.SIMULATION_PROTOCOL.MACROCYCLE_COREHOPPING: 'flexible:16',
    constants.SIMULATION_PROTOCOL.DEFAULT: 'default:12',
    constants.SIMULATION_PROTOCOL.FRAGMENT_LINKING: 'flexible:16',
    constants.FRAGMENT_LINKING_JOBS.VAL.FRAGMENT_HYDRATION: 'default:24'
}
def _check_edge_sid(edge: "graph.Edge", exp_num_lambdas_complex: int,
                    exp_num_lambdas_solvent: int):
    """
    Check the edge's sid report to check for the appropriate number of
    replicas for both the complex and solvent legs

    Related cases: DESMOND-8836, DESMOND-8811, DESMOND-8805,
    DESMOND-8674, DESMOND-8243, DESMOND-8224, DESMOND-8208.

    :param edge: graph.Edge object to check
    :param exp_num_lambdas_complex: Expected replica count, complex leg.
    :param exp_num_lambdas_solvent: Expected replica count, solvent leg.
    :raises AssertionError: if a sid is missing, the perturbation type is
        wrong, or a replica count does not match.
    """
    from schrodinger.application.scisol.packages.fep.graph import Legtype
    fep_type = edge.graph.fep_type
    legs = (Legtype.COMPLEX.value, Legtype.SOLVENT.value)
    sids = [edge.complex_sid, edge.solvent_sid]
    expected_num_replicas = [exp_num_lambdas_complex, exp_num_lambdas_solvent]
    for leg, sid, exp_num in zip(legs, sids, expected_num_replicas):
        assert sid and len(
            sid) > 0, f"Edge {edge.short_id} is missing {leg}_sid."
        sid_obj = sea.Map(sid)
        checked_fep_simulation = False
        checked_num_replicas = False
        for k in sid_obj['Keywords']:
            if 'FEPSimulation' in k:
                assert k['FEPSimulation']['PerturbationType'].val == fep_type, \
                    f'Could not find "{fep_type}" in {leg} sid perturbation_type for {edge.short_id}.'
                checked_fep_simulation = True
            if 'Replica' in k:
                # DESMOND-8811
                assert len(k['Replica'].val) == exp_num, \
                    f"Job ran with wrong number of lambdas. Found {len(k['Replica'].val)}, " \
                    f"expected {exp_num}."
                checked_num_replicas = True
        # Bug fix: these two messages were missing the f-prefix, so the
        # {edge.short_id} placeholder was printed literally.
        assert checked_fep_simulation, \
            f"Did not find FEPSimulation in {edge.short_id} sid."
        assert checked_num_replicas, \
            f"Did not find Replica in {edge.short_id} sid."
def _check_edge_parched(edge: "graph.Edge", min_frames=21):
    """
    Check the parched trajectories and representative structures
    for a given edge.

    Related cases: DESMOND-8839, DESMOND-8819, DESMOND-8812, DESMOND-8596,
    DESMOND-8499, DESMOND-8279, DESMOND-8198.

    :param edge: graph.Edge object to check
    :param min_frames: Minimum number of frames in the parched trajectories.
        Default is 21 for the number of frames in a
        standard 5 ns FEP job.
    :raises AssertionError: if a trajectory or representative structure is
        missing, too short, or has the wrong number of waters.
    """
    from schrodinger.application.desmond.packages import traj_util
    from schrodinger.application.scisol.packages.fep.graph import Legtype
    legs = (Legtype.COMPLEX, Legtype.SOLVENT)
    assert edge.graph.fmpdb is not None, "graph has no fmpdb file."
    # Absolute binding FEP only has a physical end state at lambda 0.
    if edge.graph.fep_type == constants.FEP_TYPES.ABSOLUTE_BINDING:
        lambdas = [0]
    else:
        lambdas = [0, 1]
    edge.graph.fmpdb.open('.')
    # The redundant 'except AssertionError: raise' clauses were removed;
    # the finally blocks alone preserve the cleanup behavior.
    try:
        fmpdb_fname = edge.graph.fmpdb.filename
        for leg in legs:
            trajs = edge.trajectories(leg)
            assert trajs, f"{fmpdb_fname} is missing the trajectory for {edge.short_id} {leg.value}."
            for ilambda in lambdas:
                lambda_msys, lambda_cms, lambda_traj = traj_util.read_cms_and_traj(
                    trajs[ilambda][0])
                try:
                    assert len(lambda_traj) >= min_frames, \
                        f"Trajectory for lambda {ilambda}, edge {edge.short_id}, leg {leg.value} has {len(lambda_traj)} frames, expected at least {min_frames} frames."
                    # Parched trajectories must retain exactly 200 waters
                    # (excluding alchemical ions and alchemical waters).
                    for (ct, _) in traj_util.extract_atoms(
                            lambda_cms,
                            f'water and not (atom.{constants.ALCHEMICAL_ION} or atom.{constants.CT_TYPE} "{constants.CT_TYPE.VAL.ALCHEMICAL_WATER}")',
                            lambda_traj):
                        assert len(ct.molecule) == 200, \
                            f"Trajectory for lambda {ilambda}, edge {edge.short_id}, leg {leg.value} "\
                            f" has wrong number of waters. Found {len(ct.molecule)}, expected 200."
                finally:
                    if lambda_traj:
                        # Close the reader to close the filehandle,
                        # otherwise fmpdb.close() can fail.
                        # DESMOND-8868
                        lambda_traj[0].source().close()
        # Check representative structure (complex leg only).
        leg = Legtype.COMPLEX
        repr_fnames = edge.representative_strucs(leg)
        assert repr_fnames is not None, f"{fmpdb_fname} is missing the representative struc for {edge.short_id} {leg.value}."
        for ilambda in lambdas:
            lambda_repr_ct = structure.Structure.read(repr_fnames[ilambda])
            assert lambda_repr_ct.atom_total > 0, \
                f"Representative structure for lambda {ilambda}, edge {edge.short_id}, leg {leg.value}, has no atoms."
    finally:
        edge.graph.fmpdb.close()
def _check_graph_environment(g: "graph.Graph", membrane=False):
    """
    Check the graph environment structures.

    :param g: The graph object to check.
    :param membrane: Set to True if this is a membrane fep run.
        Default is False.
    """
    from schrodinger.application.scisol.packages.fep import fepmae
    # FEP types whose graph carries a receptor environment ct; covalent and
    # protein-stability FEP carry the protein on the nodes instead.
    types_with_receptor = (
        constants.FEP_TYPES.SMALL_MOLECULE,
        constants.FEP_TYPES.METALLOPROTEIN,
        constants.FEP_TYPES.PROTEIN_SELECTIVITY,
        constants.FEP_TYPES.LIGAND_SELECTIVITY,
        constants.FEP_TYPES.ABSOLUTE_BINDING,
    )
    types_without_environment = (
        constants.FEP_TYPES.COVALENT_LIGAND,
        constants.FEP_TYPES.PROTEIN_STABILITY,
    )
    if g.fep_type in types_with_receptor:
        environments = ["receptor"]
    elif g.fep_type in types_without_environment:
        environments = []
    else:
        raise ValueError(f'unknown fep_type: {g.fep_type}')
    if membrane:
        environments.extend(["membrane", "solvent"])
    for environment in environments:
        env_ct = getattr(g, f'{environment}_struc')
        assert env_ct is not None, f"graph {environment}_struc is missing."
        filtered = fepmae.filter_receptors_and_ligands([env_ct])
        results_dict = {
            'receptor': filtered[0],
            'solvent': filtered[1],
            'membrane': filtered[2],
            'ligands': filtered[3]
        }
        # For ligand selectivity, the ligand is stored in the receptor_struc
        if environment == "receptor" and g.fep_type == constants.FEP_TYPES.LIGAND_SELECTIVITY:
            assert len(results_dict['ligands']) == 1, \
                f"{g.fep_type} should have one ligand as the receptor. Found {len(results_dict['ligands'])} ligand(s). {results_dict}"
            results_dict['receptor'] = results_dict['ligands'][0]
            results_dict['ligands'] = []
        for kind, found in results_dict.items():
            if kind == environment:
                found_ct = results_dict[environment]
                found_title = found_ct.title if hasattr(found_ct,
                                                        'title') else found_ct
                env_title = env_ct.title if hasattr(env_ct, 'title') else env_ct
                assert env_ct is found, \
                    f"graph {environment}_struc is not correct. Found {found_title}, expected {env_title}."
            else:
                assert not found, \
                    f"graph {environment}_struc is missing or has wrong type."
    found_env_count = len([ct for ct in g.environment_struc if ct is not None])
    exp_env_count = len(environments)
    assert found_env_count == exp_env_count, \
        f'graph environment_struc is not correct. Found {found_env_count}, expected {exp_env_count} ct(s).'
def _check_graph_nodes(g: "graph.Graph", node_ids: List[str]):
    """
    Check the node structures for a graph.

    :param g: The graph object to check.
    :param node_ids: List of expected node ids.
    """
    assert g.number_of_nodes() > 0, "No nodes found in graph"
    found_ids = {node.short_id for node in g.nodes_iter()}
    expected_ids = set(node_ids)
    extra_node_ids = found_ids - expected_ids
    missing_node_ids = expected_ids - found_ids
    assert len(extra_node_ids) == 0, \
        f'Extra nodes found in graph: {extra_node_ids}'
    assert len(missing_node_ids) == 0, \
        f'Missing nodes in graph: {missing_node_ids}'
    ligand_node_types = (
        constants.FEP_TYPES.SMALL_MOLECULE,
        constants.FEP_TYPES.METALLOPROTEIN,
        constants.FEP_TYPES.PROTEIN_SELECTIVITY,
        constants.FEP_TYPES.LIGAND_SELECTIVITY,
        constants.FEP_TYPES.ABSOLUTE_BINDING,
    )
    protein_node_types = (
        constants.FEP_TYPES.COVALENT_LIGAND,
        constants.FEP_TYPES.PROTEIN_STABILITY,
    )
    if g.fep_type in ligand_node_types:
        # These node types carry only the main structure.
        for node in g.nodes_iter():
            assert node.struc is not None, \
                f"graph node {node.short_id} has empty struc."
            assert node.protein_struc is None, \
                f"graph node {node.short_id} has non-empty protein_struc."
    elif g.fep_type in protein_node_types:
        # These node types carry the protein alongside the main structure.
        for node in g.nodes_iter():
            assert node.struc is not None, \
                f"graph node {node.short_id} has empty struc."
            assert node.protein_struc is not None, \
                f"graph node {node.short_id} has empty protein_struc."
def _check_fep_backend_log(log_contents: str):
"""
Check the Desmond backend log for expected contents.
:param log_contents: Desmond backend log file contents.
"""
assert 'GPU Desmond' in log_contents, \
'GPU Desmond is not detected.'
assert 'FEP_GPGPU' in log_contents or 'FEP_GPGPU' in log_contents, \
'Desmond is missing FEP_GPGPU license checkout in backend log file.'
def _check_fep_cms(cms_fname: str,
                   cts: List[structure.Structure],
                   leg: str,
                   fep_type: constants.FEP_TYPES,
                   membrane=False):
    """
    Check input and output cms files for missing components.

    Related cases: DESMOND-7328, DESMOND-8602, DESMOND-8894.

    :param cms_fname: Filename of the cms.
    :param cts: List of structures from the cms.
    :param leg: Name of the leg to check.
    :param fep_type: Type of fep used to run the job.
    :param membrane: Set to True if this is a membrane fep run.
        Default is False.
    :raises AssertionError: if the per-type ct counts do not match the
        expectation for this fep type and leg.
    """
    from schrodinger.application.scisol.packages.fep.graph import Legtype
    # Count how many cts of each ffio type the cms actually contains.
    ffio_types = Counter()
    for ct in cts:
        ffio_types[ct.property[constants.CT_TYPE]] += 1
    if fep_type in [constants.FEP_TYPES.SMALL_MOLECULE]:
        # Complex leg: 3 solute cts (receptor + two ligand end states);
        # solvent leg: 2 solute cts (the two ligand end states).
        expected_ffio_types = {
            constants.CT_TYPE.VAL.FULL_SYSTEM: 1,
            constants.CT_TYPE.VAL.SOLUTE: 3 if leg == Legtype.COMPLEX.value else
            2,
            constants.CT_TYPE.VAL.SOLVENT: 1,
        }
        if leg in constants.FRAGMENT_LINKING_HYDRATION_JOBS:
            # Hydration legs solvate a single fragment.
            expected_ffio_types[constants.CT_TYPE.VAL.SOLUTE] = 1
        if membrane and leg == Legtype.COMPLEX.value:
            expected_ffio_types[constants.CT_TYPE.VAL.MEMBRANE] = 1
    elif fep_type in [
            constants.FEP_TYPES.PROTEIN_SELECTIVITY,
            constants.FEP_TYPES.LIGAND_SELECTIVITY
    ]:
        expected_ffio_types = {
            constants.CT_TYPE.VAL.FULL_SYSTEM: 1,
            constants.CT_TYPE.VAL.SOLUTE: 3 if leg == Legtype.COMPLEX.value else
            2,
            constants.CT_TYPE.VAL.SOLVENT: 1,
        }
        if membrane:
            expected_ffio_types[constants.CT_TYPE.VAL.MEMBRANE] = 1
    elif fep_type in [
            constants.FEP_TYPES.METALLOPROTEIN,
            constants.FEP_TYPES.COVALENT_LIGAND,
            constants.FEP_TYPES.PROTEIN_STABILITY
    ]:
        expected_ffio_types = {
            constants.CT_TYPE.VAL.FULL_SYSTEM: 1,
            constants.CT_TYPE.VAL.SOLUTE: 2,
            constants.CT_TYPE.VAL.SOLVENT: 1,
        }
        if membrane and leg == Legtype.COMPLEX.value:
            expected_ffio_types[constants.CT_TYPE.VAL.MEMBRANE] = 1
    else:
        assert False, f"Unknown fep_type: {fep_type}"
    for ffio_type, count in expected_ffio_types.items():
        # Counter returns 0 for missing keys, so absent ct types fail too.
        assert ffio_types[ffio_type] == count, \
            f'cms file {cms_fname} does not have {count} {ffio_type} ct(s), found {ffio_types[ffio_type]} ct(s). ' \
            f'{ffio_types}'
def _check_fep_in_cfg(cfg_contents: str,
edge: "graph.Edge",
leg: str,
expected_lambdas=_EXPECTED_LAMBDAS):
"""
Check the in.cfg file given an graph edge.
:param cfg_contents: in.cfg file contents
:param edge: Corresponding edge from the graph.
:param leg: Name of the leg to check.
:param expected_lambdas: Expected number of lambdas for each supported FEP type.
If not given, use the default for each type.
"""
cfg = sea.Map(cfg_contents)
expected_lambda = expected_lambdas[edge.simulation_protocol]
if edge.is_fragment_linking and leg in constants.FRAGMENT_LINKING_HYDRATION_JOBS:
expected_lambda = expected_lambdas[
constants.FRAGMENT_LINKING_JOBS.VAL.FRAGMENT_HYDRATION]
assert cfg.fep['lambda'].val == expected_lambda, \
f'Wrong lambda schedule in in.cfg. " \
"Found {cfg.fep["lambda"].val}, expected: {expected_lambda}'
# Make sure macrocycle core-hopping has correct soft bond alpha
if edge.simulation_protocol == constants.SIMULATION_PROTOCOL.MACROCYCLE_COREHOPPING:
assert pytest.approx(
cfg.backend['soft_bond_alpha'].val
) == 0.5, "Macrocycle missing soft bond alpha in in.cfg."
def _check_deltaE(deltaE_contents: str, threshold=300):
"""
Check the deltaE file contents to make sure all of the energy
Related cases: DESMOND-9425, DESMOND-9429.
:param deltaE_contents: Contents of the deltaE.txt file.
:param threshold: Maximum absolute value for the energy in kcal/mol.
"""
vals = []
for line in deltaE_contents.split('\n'):
line = line.strip()
match = REPLICA.match(line)
if match:
ef = float(match.group(3))
er = float(match.group(4))
vals.extend([ef, er])
vals = numpy.array(vals)
msg = "deltaE values larger than threshold"
assert numpy.abs(
vals.min()) < threshold, f"{msg}: |{vals.min()}| > {threshold}"
assert numpy.abs(
vals.max()) < threshold, f"{msg}: |{vals.max()}| > {threshold}"
def _check_fep_launcher(launcher_path: str,
g: "graph.Graph",
membrane=False,
deltaE_threshold=None,
expected_lambdas=_EXPECTED_LAMBDAS):
"""
Check the output files generated by the FepLauncher stage.
Related cases: DESMOND-7831, DESMOND-6582.
:param launcher_path: Path to the output files of the fep launcher stage.
:param g: Graph object to use for checking the output files.
:param membrane: Set to True if this is a membrane fep run.
Default is False.
:param deltaE_threshold: If not None, maximum absolute value for the deltaE energy in
kcal/mol.
:param expected_lambdas: Expected number of lambdas for each supported FEP type.
If not given, use the default for each type.
"""
from schrodinger.application.scisol.packages.fep.graph import Legtype
job_name = '_'.join(os.path.basename(launcher_path).split('_')[:-1])
for e in g.edges_iter():
from_id, to_id = e.short_id
fep_type = g.fep_type
legs = [Legtype.COMPLEX.value, Legtype.SOLVENT.value]
if e.is_fragment_linking:
legs += list(constants.FRAGMENT_LINKING_HYDRATION_JOBS)
for leg in legs:
subjob_name = f'{job_name}_{from_id}_{to_id}_{leg}'
# Check that the job completed
multisim_log_fname = f'{os.path.join(launcher_path, subjob_name)}_multisim.log'
standard_workups.parse_log_file(multisim_log_fname,
'Multisim completed')
with open(multisim_log_fname) as f:
for line in f:
m = re.match(r'.*?stage (\d+) - lambda_hopping.*?', line)
if m:
lambda_hopping_idx = int(m.group(1))
break
# Check the lambda hopping out.tgz
lambda_hopping_tgz = os.path.join(
launcher_path, f'{subjob_name}_{lambda_hopping_idx}-out.tgz')
# /some/path/<JOBNAME>_8-out.tgz -> <JOBNAME>_8
lambda_hopping_stage = os.path.basename(
lambda_hopping_tgz.replace('-out.tgz', ''))
with tarfile.open(lambda_hopping_tgz) as tar:
tar_path = os.path.join(f'{subjob_name}_fep1',
lambda_hopping_stage)
if leg in constants.FRAGMENT_LINKING_HYDRATION_JOBS:
tar_path = lambda_hopping_stage
basename = os.path.join(tar_path, lambda_hopping_stage)
log_contents = tar.extractfile(basename +
'.log').read().decode('latin-1')
_check_fep_backend_log(log_contents)
# Check the -in.cfg file
# DESMOND-8811
cfg_contents = tar.extractfile(
f'{basename}-in.cfg').read().decode('latin-1')
_check_fep_in_cfg(cfg_contents, e, leg, expected_lambdas)
# Check the deltaE.txt file
# DESMOND-9445
if deltaE_threshold is not None:
deltaE = tar.extractfile(
f'{tar_path}/deltaE.txt').read().decode('latin-1')
_check_deltaE(deltaE, deltaE_threshold)
# Check cms files
def is_cms(fname) -> bool:
return fname.endswith('.cms') or fname.endswith(
'.cmsgz') or fname.endswith('.cms.gz')
assert len([m.name for m in tar.getmembers() if is_cms(m.name)]), \
f'No cms files found in {lambda_hopping_tgz}.'
for m in tar.getmembers():
if is_cms(m.name):
cms_fname = m.name
cms_contents_fobj = tar.extractfile(cms_fname)
if m.name.endswith('gz'):
cms_contents_fobj = gzip.GzipFile(
mode='r', fileobj=cms_contents_fobj)
replica_cts = list(
structure.StructureReader.fromString(
cms_contents_fobj.read().decode('latin-1')))
_check_fep_cms(cms_fname,
replica_cts,
leg,
fep_type,
membrane=membrane)
def _check_edge_values(edge: "graph.Edge", values: Dict[str, object]):
"""
Check values associated with the graph edge.
:param edge: Edge to check
:param values: Dictionary with the keys 'simulation_protocol',
'ddg', 'ddg_err'. The values in the edge are compared
with the corresponding values here.
"""
# Check the simulation protocol
if values.get('simulation_protocol'):
assert edge.simulation_protocol == values['simulation_protocol'], \
f"Edge {edge.short_id} has incorrect simulation protocol. " \
f"Found {edge.simulation_protocol}, expected {values['simulation_protocol']}"
# Check free energy values.
if values.get('ddg'):
assert pytest.approx(edge.bennett_ddg.val, abs=0.5) == values['ddg'], \
f'{edge.short_id} bennett_ddg.val different. ' \
f'Found {edge.bennett_ddg.val}, expected {values["ddg"]}.'
assert pytest.approx(edge.bennett_ddg.unc, abs=0.5) == values['ddg_err'], \
f'{edge.short_id} bennett_ddg.unc different. ' \
f'Found {edge.bennett_ddg.unc}, expected {values["ddg_err"]}.'
assert edge.bennett_ddg is not None, f'{edge.short_id} bennett_ddg is None'
assert edge.ccc_ddg is not None, f'{edge.short_id} ccc_ddg is None'
[docs]def desmond_check_fep_results(
fmp_fname: str,
launcher_path: str,
fep_type: constants.FEP_TYPES,
expected: Dict[Tuple[str, str], Dict[str, object]],
membrane: bool = False,
min_frames: int = 21,
skip_parched: bool = False,
skip_sid: bool = False,
edges_to_skip_parched_check: Optional[List[Tuple[str, str]]] = None,
deltaE_threshold: Optional[float] = None,
expected_lambdas=_EXPECTED_LAMBDAS):
"""
Check the fep results given the fep type and an `expected`
a dictionary mapping expected edge short ids to their
corresponding attributes.
:param fmp_fname: Name of the output fmp to check.
:param launcher_path: Path containing the output of the FepLauncher stage.
:param fep_type: Type of fep used to run the job.
:param expected: Dictionary mapping edge short ids to
a dictionary of attributes to check.
The attributes are the simulation_protocol,
with the values `constants.SIMULATION_PROTOCOL`,
'ddg', 'ddg_err' with the values corresponding to
the free energy. If the attributes are not present,
the corresponding checks are skipped.
:param membrane: Set to True if this is a membrane fep run.
Default is False.
:param min_frames: Minimum number of frames in the parched trajectories.
Default is 21 for the number of frames in a
standard 5 ns FEP job.
:param skip_parched: If True, skip checking the parched trajectories.
Default is False.
:param skip_sid: If True, skip checking for sid analysis. Default is False.
:param edges_to_skip_parched_check: A list of short edge ids for which to
skip checking for parched trajectory
when skip_parched is False. If
skip_parched is True then all edges
will be skipped.
:param deltaE_threshold: If not None, maximum absolute value for the deltaE energy in
kcal/mol.
:param expected_lambdas: Expected number of lambdas for each supported FEP type.
If not given, use the default for each type.
"""
from schrodinger.application.scisol.packages.fep.graph import Graph
g = Graph.deserialize(fmp_fname)
id2edge = {e.short_id: e for e in g.edges_iter()}
edge_ids = set(expected.keys())
node_ids = {a[0] for a in edge_ids}.union({a[1] for a in edge_ids})
missing_in_graph = edge_ids - set(id2edge.keys())
additional_in_graph = set(id2edge.keys()) - edge_ids
assert len(missing_in_graph) == 0, \
f"Missing edges in graph: {missing_in_graph}"
assert len(additional_in_graph) == 0, \
f"Additional edges in graph: {additional_in_graph}"
# Check edges.
for edge_id, values in expected.items():
e = id2edge.get(edge_id)
# Check that edge is in the graph.
assert e is not None, f"{edge_id} not in {fmp_fname}."
_check_edge_values(e, values)
if not skip_sid:
expected_num_replicas = int(
expected_lambdas[e.simulation_protocol].split(':')[-1])
_check_edge_sid(e, expected_num_replicas, expected_num_replicas)
edges_to_skip_parched_check = edges_to_skip_parched_check or []
if not skip_parched and edge_id not in edges_to_skip_parched_check:
_check_edge_parched(e, min_frames)
_check_graph_environment(g, membrane)
_check_graph_nodes(g, node_ids)
_check_fep_launcher(launcher_path, g, membrane, deltaE_threshold,
expected_lambdas)
return True
[docs]def desmond_check_ab_fep_results(
fmp_fname: str,
expected_values: Dict[Tuple[str, str], Dict[str, object]],
min_frames: int,
expected_lambdas_complex: int,
expected_lambdas_solvent: int,
membrane: bool = False,
):
from schrodinger.application.scisol.packages.fep.graph import Graph
g = Graph.deserialize(fmp_fname)
id2edge = {e.short_id: e for e in g.edges_iter()}
edge_ids = set(expected_values.keys())
node_ids = {a[0] for a in edge_ids}.union({a[1] for a in edge_ids})
missing_in_graph = edge_ids - set(id2edge.keys())
additional_in_graph = set(id2edge.keys()) - edge_ids
assert len(missing_in_graph) == 0, \
f"Missing edges in graph: {missing_in_graph}"
assert len(additional_in_graph) == 0, \
f"Additional edges in graph: {additional_in_graph}"
# Check edges.
for edge_id, values in expected_values.items():
e = id2edge.get(edge_id)
# Check that edge is in the graph.
assert e is not None, f"{edge_id} not in {fmp_fname}."
_check_edge_values(e, values)
_check_edge_sid(e, expected_lambdas_complex, expected_lambdas_solvent)
_check_edge_parched(e, min_frames)
_check_graph_environment(g, membrane)
_check_graph_nodes(g, node_ids)
return True
[docs]def desmond_check_memory_usage(logfile: str,
cpu_limits: List[float],
gpu_limits: Optional[List[float]] = None):
"""
Get statistics on memory usage printed to desmond logfile and compare
to specified inputs.
:param logfile: The logfile containing the memory usages
:param cpu_limits: The maximum acceptable value for the mean and maximum CPU
memory usage in kB
:param gpu_limits: The maximum acceptable value for the mean and maximum GPU
memory usage in kB [optional]
"""
types_to_limits = {'CPU': cpu_limits}
if gpu_limits:
types_to_limits['GPU'] = gpu_limits
memory_usages = defaultdict(list)
with open(logfile) as f:
for line in f.readlines():
match = re.search(r'using (\d+) kB of (.*?) memory', line)
if match is not None:
usage = match.group(1)
memory_usages[match.group(2)].append(int(usage))
def check_limits(usages, limits, memtype):
if not usages:
raise ValueError(
f"No {memtype} memory usage statements found in log "
f"file")
usages_arr = numpy.array(usages)
mean_usage = usages_arr.mean()
max_usage = usages_arr.max()
mean_limit, max_limit = limits
print(f"{memtype} usage mean: {mean_usage}, max {max_usage}")
return mean_usage < mean_limit and max_usage < max_limit
return all([
check_limits(memory_usages[memtype], limits, memtype)
for memtype, limits in types_to_limits.items()
])
[docs]def custom_charge_ct_count(input_mae_fname: str) -> int:
"""
Return the count of cts that have a custom charge block.
"""
count = 0
with open(input_mae_fname, 'r') as f:
for line in f.readlines():
if 'mmffld_custom_prop' in line:
count += 1
return count
if __name__ == "__main__":
def extract_summary(string):
summary = ""
for line in string.split("\n"):
line = line.strip()
if line:
summary += " " + line
elif summary:
return summary.strip()
if '-h' in sys.argv and len(sys.argv) > 2:
for arg in sys.argv[2:]:
if arg in dir():
print(arg + ":")
print(eval(arg).__doc__)
elif len(sys.argv) > 2 and sys.argv[1] in dir():
if eval(sys.argv[1])(sys.argv[2:]):
print(" Success!")
else:
print(" Failure.")
else:
print("usage: %s <workup> <args>\n" % os.path.split(__file__)[-1])
print(extract_summary(__doc__), "\n")
print("available workups:")
for method in dir():
if method.startswith("desmond"):
method = eval(method)
description = method.__doc__
summary = extract_summary(description)
print(f" {method.__name__}: {summary}")