"""
Utility functions and classes
Copyright Schrodinger, LLC. All rights reserved.
"""
import fnmatch
import glob
import gzip
import hashlib
import os
import random
import re
import shutil
import sys
from pathlib import Path
from typing import Iterable
from typing import List
from typing import Optional
from typing import Tuple
from schrodinger.application.desmond import constants
from schrodinger.application.desmond.constants import FepLegTypes
from schrodinger.application.desmond.constants import SIMULATION_PROTOCOL
from schrodinger.application.desmond.constants import PROTOCOL_TO_POSTFIX
from schrodinger.job import util as jobutil
from schrodinger.structure import Structure
# pwd module is not available for Windows platform.
if (sys.platform == "win32"):
pass
else:
import pwd
class TestSuite(object):
    """
    A small and tight class designed for unit tests.

    With this utility, unit test code can be written much faster than with
    the 'unittest' module.
    """

    def __init__(self, title):
        """
        Sets the title of the suite to 'title' and all internal counters to 0.
        """
        self.__title = title
        self.__n_case = 0
        self.__n_pass = 0
        self.__n_fail = 0
        self.__suite = []

    def __del__(self):
        """
        Finishes testing untested cases and prints a simple summary.
        """
        if self.__suite:
            self.run()
        print("\ntesting summary: %d cases in total, %d passed, %d failed" %
              (self.__n_case, self.__n_pass, self.__n_fail))
        print("-------%s-------\n\n" % self.__title)

    def __lshift__(self, case):
        """
        Adds a case into the suite.

        Each case must be a tuple of at least two elements:

        - 'case[0]' must be one of the following:

          - a boolean value:

            - False - means the test is failed.
            - True  - means the test is passed.

          - or a callable object that can return a boolean value. The
            object will be called later on by this 'TestSuite' object
            (via calling to the 'run' method), and the returned value
            will be used to judge whether the test is passed on the same
            rule as above. If an exception is thrown out of the call to
            the object, the test will be considered failed.

        - 'case[1]' must be a string, representing the name of the case.

        If there are more than two elements, the remaining elements will be
        printed or called (if callable) if the test is failed. This helps
        diagnose the problem.
        """
        self.__suite.append(case)

    def run(self):
        """
        Performs testing of all test cases accumulated so far and prints
        information indicating whether a particular case passed or failed.

        The user can explicitly call this function as many times as desired.
        Already tested cases will not be tested again. This means that the
        user can do something like this::

            tsuite = TestSuite( 'util' )
            ...
            tsuite << case1
            tsuite.run()  # Testing of 'case1' will be performed
            tsuite.run()  # Allowed, but meaningless since 'case1' will not be tested again.

        and so this::

            tsuite << case1
            tsuite.run()
            tsuite << case1
            tsuite.run()  # Allowed. The 'tsuite' takes the 2nd 'case1' as a new case and performs the testing.

        The user usually does NOT need to call this function explicitly
        because when the 'TestSuite' object is about to be destructed, it
        will automatically check and test any untested cases.
        """
        if 0 == self.__n_case:
            print("-------%s-------" % self.__title)
        for case in self.__suite:
            print("testing case '%s'..." % case[1], end=' ')
            # The check lives in the first tuple element; the tuple itself is
            # never callable, so the element is what must be inspected.
            check = case[0]
            if callable(check):
                try:
                    result = check()
                except Exception:
                    # A check that raises counts as a failed case.
                    result = False
            else:
                result = check
            self.__n_case += 1
            if result:
                print("passed")
                self.__n_pass += 1
            else:
                print("failed")
                self.__n_fail += 1
                # Extra tuple elements are diagnostics shown only on failure.
                for item in case[2:]:
                    if callable(item):
                        item()
                    else:
                        print(item)
        # Forget the cases just run so that they are not tested twice.
        self.__suite = []
class Counter(object):
    """
    This class was originally designed for the convenience of gridding
    widgets. For example, instead of writing a code like::

        my_label1.grid( row = 1, sticky = W )
        my_label2.grid( row = 2, sticky = W )
        my_label3.grid( row = 3, sticky = W )

    we can avoid the hardcoding ('row = 1', etc., which is generally bad and
    hard to maintain) using this class. The improved code will look like::

        row_index = Counter()
        my_label1.grid( row = row_index.val, sticky = W )
        my_label2.grid( row = row_index.val, sticky = W )
        my_label3.grid( row = row_index.val, sticky = W )

    which is equivalent to the above code, but generally easier to write and
    modify. The trick is that the property 'val', when read, will return the
    current value of the internal counter and then increment the counter
    (not the returned value) by 1.

    If the user just wants to get the current value of the counter but not
    want to change it, s/he can do either one of the two:

    1. Use the `'va_'` property,
    2. Explicitly convert the object to 'int'.
    """

    def __init__(self, val=0):
        """
        Constructs the object. One can provide a value 'val' to initialize
        the internal variable. For example::

            row_index = Counter( 2 )

        will let the counter start from 2, instead of 0 (default value).
        """
        self.__val = val

    def __int__(self):
        """
        Supports conversion to an integer.
        """
        return self.__val

    def __cmp__(self, other):
        """
        Three-way comparison with integers or objects convertible to
        integers. Python 3 never calls this hook; it is kept only for code
        that invokes it explicitly. The rich comparison methods below provide
        the actual operator support.
        """
        return self.__val - int(other)

    @staticmethod
    def __to_int(other):
        """
        Returns `int(other)`, or `None` if `other` is not convertible.
        """
        try:
            return int(other)
        except (TypeError, ValueError):
            return None

    def __eq__(self, other):
        rhs = self.__to_int(other)
        return NotImplemented if rhs is None else self.__val == rhs

    def __lt__(self, other):
        rhs = self.__to_int(other)
        return NotImplemented if rhs is None else self.__val < rhs

    def __le__(self, other):
        rhs = self.__to_int(other)
        return NotImplemented if rhs is None else self.__val <= rhs

    def __gt__(self, other):
        rhs = self.__to_int(other)
        return NotImplemented if rhs is None else self.__val > rhs

    def __ge__(self, other):
        rhs = self.__to_int(other)
        return NotImplemented if rhs is None else self.__val >= rhs

    # Defining `__eq__` would otherwise make instances unhashable. Keep the
    # identity-based hash the class had before the comparisons were added.
    __hash__ = object.__hash__

    def reset(self, val=0):
        """
        Resets the counter to 'val'.
        """
        self.__val = val

    def __get_val(self):
        # Post-increment: hand out the current count, then advance it.
        current = self.__val
        self.__val += 1
        return current

    def __get_va_(self):
        return self.__val

    val = property(
        fget=__get_val,
        doc=
        "Readonly. When read, this returns the value of the current count and then increment the count by"
        " 1. The incrementation does not affect the returned value.")
    va_ = property(
        fget=__get_va_,
        doc=
        "Readonly. When read, this returns the value of the current count without changing the internal"
        " state whatsoever of the object.")
def _remove_path(fname: str) -> None:
    """
    Deletes `fname` if it is a regular file or a directory tree; does nothing
    if it does not exist.
    """
    if os.path.isfile(fname):
        os.remove(fname)
    elif os.path.isdir(fname):
        shutil.rmtree(fname)


def remove_file(basename: str,
                prefix: Optional[List[str]] = None,
                suffix: Optional[List[str]] = None):
    """
    Tries to delete files (or dirs) whose names are composed by the given
    `basename`, a list of prefixes (`prefix`), and a list of suffixes
    (`suffix`). No effects if a file (or dir) does not exist.

    :param basename: The core part of each name to delete.
    :param prefix: Strings to prepend to `basename`. `None` means that no
        prefix is used. An empty list means that nothing is deleted.
    :param suffix: Strings to append to `basename`. `None` means that no
        suffix is used. An empty list means that nothing is deleted.
    """
    # `None` stands for "no affix", i.e. a single empty string. An explicitly
    # empty list yields no names at all.
    prefixes = [""] if prefix is None else prefix
    suffixes = [""] if suffix is None else suffix
    for pre in prefixes:
        for suf in suffixes:
            _remove_path(pre + basename + suf)
def write_n_ct(fname, struc):
    """
    Writes the CTs in the list `struc` to the file `fname`, in list order.

    `None` entries are skipped. Nothing is written if `struc` is empty or
    holds only `None` entries.

    !!!DEPRECATED!!!
    Use `struc.write_structures` instead.

    :param fname: Name of the output file.
    :param struc: List of CT objects and/or `None`.
    """
    valid_cts = [ct for ct in struc if ct is not None]
    if not valid_cts:
        return
    first_ct, *other_cts = valid_cts
    # The first CT creates the file; the others are appended to it.
    first_ct.write(fname, format="maestro")
    for ct in other_cts:
        ct.append(fname, format="maestro")
def chdir(dir_name):
    """
    Makes `dir_name` the current working directory.

    The special name '..' is mapped to `os.pardir`, so that moving to the
    parent directory is done in a portable way.

    :param dir_name: The directory to change to.
    """
    target = os.pardir if dir_name == ".." else dir_name
    os.chdir(target)
def parent_dir(dir_name, up=1):
    """
    Returns the name of an ancestor directory of `dir_name`.

    :param up: This should be a non-negative integer value indicating how
        many levels to climb. The default value 1 gives the immediate parent,
        2 gives the parent of the immediate parent, and so on. With 0,
        `dir_name` is returned unchanged.
    """
    ancestor = dir_name
    for _ in range(up):
        # Each pass strips the last path component.
        ancestor = os.path.dirname(ancestor)
    return ancestor
def relpath(xpath, refpath=None):
    """
    Returns the path of 'xpath' relative to 'refpath'.

    Both paths may be relative or absolute; they are resolved with
    `os.path.realpath` before being compared. 'refpath' defaults to the
    current directory. An empty string is returned if both paths resolve to
    the same location.
    """
    if refpath is None:
        refpath = os.getcwd()

    def _components(path):
        # Non-empty components of the resolved path.
        return [part for part in os.path.realpath(path).split(os.path.sep) if part]

    target_parts = _components(xpath)
    ref_parts = _components(refpath)

    # Length of the leading run of components shared by both paths.
    n_common = 0
    for target_part, ref_part in zip(target_parts, ref_parts):
        if target_part != ref_part:
            break
        n_common += 1

    # Climb out of what is left of the reference, then descend into the rest
    # of the target.
    pieces = [".."] * (len(ref_parts) - n_common) + target_parts[n_common:]
    return os.path.join(*pieces) if pieces else ""
def symlink(src_fname, des_fname):
    """
    Creates a symbolic link on the current directory to a file as given by
    'src_fname'.

    This differs from 'os.symlink' in that this function creates a symbolic
    link using a relative path (relative to the current directory). Also, if
    there is already a file or a symbolic link with the same name as
    'des_fname', this function will try to delete it and then create the
    symbolic link. If the link cannot be created, the source file is copied
    to 'des_fname' instead. An exception will be raised if that attempt fails
    as well.

    :param src_fname: The name of the file to link to.
    :param des_fname: The name to use for the symbolic link.
    """
    src_fname = relpath(src_fname)
    try:
        # `isfile` is false for a dangling symlink, so check `islink` too.
        # Otherwise the stale link survives and the copy fallback below would
        # write through it to the old target.
        if os.path.isfile(des_fname) or os.path.islink(des_fname):
            os.remove(des_fname)
    except OSError:
        # Best effort: if the old entry cannot be removed, the attempts below
        # will report the real problem.
        pass
    try:
        # Whether or not this works is OS-dependent.
        os.symlink(src_fname, des_fname)
    except (OSError, NotImplementedError):
        # Just copy the darn file. The source is opened first so that a
        # missing source does not leave an empty destination behind.
        with open(src_fname, 'rb') as fsrc, open(des_fname, 'wb') as fdst:
            shutil.copyfileobj(fsrc, fdst, 8192)
def is_subdir(xpath, refpath=None):
    """
    Given two paths ('xpath' and 'refpath'), returns True if 'xpath' is a
    direct or indirect subdirectory of 'refpath'. Also returns True if
    'xpath' and 'refpath' are the same.

    Both 'xpath' and 'refpath' can be relative or absolute path, and
    'refpath' defaults to the current directory if it is not provided.
    """
    rp = relpath(xpath, refpath)
    # Compare the whole first component rather than the first two characters:
    # a child named e.g. '..cache' is still inside 'refpath'.
    first_component = rp.split(os.path.sep, 1)[0]
    return first_component != os.pardir
def random_string(
        n,
        char_pool="ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz1234567890"
):
    """
    Returns a random string with 'n' chars. The 'n' chars will be taken
    (with replacement) from a pool of chars as given by 'char_pool'. Every
    char of the pool, including the first one, can be drawn.

    :param n: Length of the string to return. Non-positive values give an
        empty string.
    :param char_pool: The chars to draw from.
    :raise ValueError: If 'n' is positive and 'char_pool' is empty.
    """
    if n > 0 and not char_pool:
        raise ValueError("char_pool must not be empty")
    # `random.choice` covers the whole pool. The earlier index arithmetic
    # started at 1 and so could never return `char_pool[0]`.
    return "".join(random.choice(char_pool) for _ in range(n))
def getlogin():
    """
    Returns the login name of the current user, or 'unidentified_user' if it
    cannot be determined.

    'os.getlogin' alone depends on the terminal in use and can raise when the
    terminal does not provide the login information, hence the chain of
    fallbacks:

    - On Windows, the 'USERNAME' environment variable is used.
    - Elsewhere, the password database entry of the current uid is tried
      first, then 'os.getlogin', and finally the 'USER' environment variable.
      (The password database was moved ahead of 'os.getlogin' on request of
      DESMOND-2899.)
    """
    if sys.platform == 'win32':
        login = os.environ.get('USERNAME')
    else:
        # Hope this is sufficient in term of portability and robustness, but
        # no guarantee.
        try:
            login = pwd.getpwuid(os.getuid())[0]
        except (KeyError, NameError):
            # KeyError: no entry for this uid. NameError: `pwd` was not
            # imported.
            try:
                login = os.getlogin()
            except OSError:
                login = os.environ.get("USER")
    return "unidentified_user" if login is None else login
def _strip_bracket(s):
"""
"""
br_open = None
br_close = None
has_dollar = False
i = 0
mask = [1] * len(s)
for c in s:
if (c == '['):
br_open = i
br_close = None
has_dollar = False
elif (c == ']' and br_open is not None):
br_close = i
if (has_dollar):
for j in range(br_open, br_close + 1):
mask[j] = 0
else:
mask[br_open] = 0
mask[br_close] = 0
br_open = None
elif (c == '$'):
has_dollar = True
i += 1
ret = ""
for c, m in zip(s, mask):
if (m):
ret += c
return ret
def expand_macro(s, macro_dict):
    """
    Replaces the macros in the string 's' using the values given by the
    macro dictionary 'macro_dict'. The expanded string will be returned.

    Macro conventions:

    - All macros should start with a single '$', followed by capital
      letters, e.g., "$JOBNAME", "$USERNAME".
    - Optional macros should be bracketed by '[]', e.g.,
      "myjob[_lambda$LAMBDANO]", where "$LAMBDANO" is an optional macro.
    - If optional macros are not expanded, the bracketed part of the
      string will be discarded.
    - Macro values should not contain the following chars: '[', ']', and
      '$'.

    :param s: The string containing macros.
    :param macro_dict: Maps each macro (e.g. "$JOBNAME") to its value. The
        values are converted with `str`.
    """
    # Expand longer macros first so that a macro which is a prefix of another
    # one (e.g. "$JOB" vs. "$JOBNAME") cannot clobber the longer macro.
    for macro in sorted(macro_dict, key=len, reverse=True):
        s = s.replace(macro, str(macro_dict[macro]))
    return _strip_bracket(s)
# DEPRECATED!!! Use `exit(msg)` instead.
def fatal(msg: str) -> None:
    """
    Terminates the program by raising `SystemExit` with `msg`.

    :raise SystemExit: Always.
    """
    sys.exit(msg)
# DEPRECATED!!! Use `verify_file_exists` instead.
def ensure_file_exists(fname: str):
    """
    Checks that the file `fname` exists and is not empty. A false `fname`
    (e.g. an empty string or `None`) is accepted silently.

    :raise SystemExit: If the file is not found or is empty.
    """
    if not fname:
        return
    verify_file_exists(fname, exit_on_error=True)
def verify_file_exists(fname_pattern: str, exit_on_error=False) -> str:
    """
    Checks that exactly one path matches the `glob` pattern `fname_pattern`
    and that its size is not zero, and returns that path.

    :param fname_pattern: File name pattern in `glob` syntax.
    :param exit_on_error: If true, failures raise `SystemExit` instead of
        `IOError`.
    :raise IOError: (or `SystemExit`, see `exit_on_error`) If no path or more
        than one path matches, or if the single match is empty.
    """
    matches = glob.glob(fname_pattern)
    if len(matches) == 1:
        only_match = matches[0]
        if os.path.getsize(only_match):
            return only_match
        message = f"ERROR: File is empty: {only_match}"
    elif matches:
        message = ("ERROR: Multiple files found matching pattern "
                   "`%s`: %s" % (fname_pattern, ", ".join(matches)))
    else:
        message = f"ERROR: No files found matching pattern: {fname_pattern}"
        # FIXME: When the file is not found, it's quite desirable to take a
        # look at the parent directory and know what files are present and
        # their sizes. This can be more complicated than listing the contents
        # of the dir because the path might be somehow mistaken along the way.
    error_type = SystemExit if exit_on_error else IOError
    raise error_type(message)
def verify_traj_exists(fname_pattern: str) -> str:
    """
    Checks that exactly one trajectory matches the pattern `fname_pattern`
    and returns its name.

    The lookup is delegated to `traj_util.find_trajectories`. A trajectory
    may be a regular file or a directory, depending on the trajectory format.
    `fname_pattern` follows the `glob` syntax. If the pattern doesn't contain
    a supported trajectory extension name, it will be treated as the pattern
    of the base name.

    :raise IOError: If no trajectory or more than one trajectory is found.
    """
    from schrodinger.application.desmond.packages import traj_util

    candidates = traj_util.find_trajectories(fname_pattern)
    n_found = len(candidates)
    if n_found == 1:
        return candidates[0]
    if n_found > 1:
        message = ("ERROR: Multiple trajectory files found matching pattern: "
                   "'%s'\n %s" % (fname_pattern, "\n ".join(candidates)))
    else:
        message = ("ERROR: No trajectory files found matching pattern: '%s'" %
                   fname_pattern)
    raise IOError(message)
def time_duration(start_time, end_time, scale=1):
    """
    Formats the time elapsed between two time points as 'xh y' z"', where
    'x', 'y', and 'z' are whole hours, minutes, and seconds, respectively.

    :param start_time: A time in seconds since the Epoch (i.e. a value as
        returned by the 'time.time()' function).
    :param end_time: A time in seconds since the Epoch (i.e. a value as
        returned by the 'time.time()' function).
    :param scale: Factor by which the elapsed time is multiplied before it is
        formatted.
    """
    total_seconds = (end_time - start_time) * scale
    hours = int(total_seconds / 3600.0)
    after_hours = total_seconds - hours * 3600.0
    minutes = int(after_hours / 60.0)
    seconds = int(after_hours - minutes * 60.0)
    return "%sh %s' %s\"" % (hours, minutes, seconds)
def get_product_info(product_name):
    """
    Returns a tuple with the following elements:

    - 0 - the exec dir
    - 1 - the lib dir
    - 2 - the version number
    - 3 - the platform

    All elements are strings.

    The exec dir is read from the environment variable
    '<PRODUCT_NAME>_EXEC' if it is set; otherwise it is looked up with
    `jobutil.hunt`. The other elements are derived from the exec dir, which
    is expected to look like '.../<product_name>-v<version>/bin/<platform>'.
    If the exec dir does not have that form, the other three elements are
    the exec dir itself.

    :param product_name: Name of the product; the case is ignored.
    :raise Exception: If the exec dir cannot be determined.
    """
    product_name = product_name.lower()
    exec_env_var = product_name.upper() + "_EXEC"
    # Escape the name so that it is always matched literally.
    exec_dir_re = re.compile("(.*" + re.escape(product_name) + "-v)" +
                             "([0-9.]*)/bin/(.*)")
    exec_dir = os.environ.get(exec_env_var)
    if exec_dir is None:
        # Gets the "exec" directory by "hunting".
        try:
            exec_dir = jobutil.hunt(product_name)
        except Exception as err:
            # Name the variable of the requested product, not a fixed one.
            raise Exception(f"Could not determine {exec_env_var}.") from err
    lib_dir = exec_dir_re.sub(r"\1\2/lib/\3", exec_dir)
    ver = exec_dir_re.sub(r"\2", exec_dir)
    plat = exec_dir_re.sub(r"\3", exec_dir)
    return (
        exec_dir,
        lib_dir,
        ver,
        plat,
    )
def html_embed_image(url):
    """
    Returns an HTML '<img>' tag that embeds the image at `url`.

    :param url: Location of the image. Each '&' in it is written as the
        '&amp;' entity so that the attribute value is valid HTML.
    """
    # Replacing '&' by itself would be a no-op; the ampersand has to become
    # its HTML entity.
    escaped_url = url.replace("&", "&amp;")
    return "<img hspace=20 vspace=20 src=\"%s\"/>" % (escaped_url,)
def unique(seq: Iterable):
    """
    Lazily yields the elements of `seq` in their original order, skipping
    every element that has already been yielded. For example::

        [1, 2, 3, 3, 4, 3, 4, 0] ==> [1, 2, 3, 4, 0]

    This function requires that all elements in `seq` are hashable.

    :return: A generator over the distinct elements.
    """
    seen = set()

    def _first_occurrences(items):
        for item in items:
            if item not in seen:
                seen.add(item)
                yield item

    # `iter` is called right away so that a non-iterable `seq` is rejected at
    # call time rather than when the first element is requested.
    return _first_occurrences(iter(seq))
# DEPRECATED!!! Use `traj_util.find_trajectories` instead.
def get_traj_filename(basename: str) -> Optional[str]:
    """
    Returns the name of the first existing trajectory file/folder for
    `basename`, or `None` if there is none.

    '<basename>_trj' is checked first, then '<basename>.xtc'.
    """
    # FIXME: this is an ugly solution to get existing trajectory file/folder.
    # NOTE: What if both of them (_trj and .xtc) exist?
    #       Ideally trajectory format should be passed from upstream.
    supported_formats = ('_trj', '.xtc')  # move elsewhere?
    candidates = (f"{basename}{fm}" for fm in supported_formats)
    return next((name for name in candidates if os.path.exists(name)), None)
def parse_res(res):
    """
    Splits a residue name of the form <chain>:<resnum><inscode> into the
    tuple (chain, resnum, inscode).

    - <chain> is the chain id; a '_' or no char at all stands for a space
    - <resnum> is the residue number (possibly negative)
    - <inscode> is the pdb insertion code (may be absent)

    Examples: A:12 :23A B:-1 _:12

    :return: (chain, resnum, inscode), where `resnum` is an `int` and the
        other two are strings.
    :raise ValueError: If `res` is not in the expected form.
    """
    match = re.search(r"(.{0,1}):(-{0,1}\d+)(.{0,1})", res)
    if match is None:
        raise ValueError(
            "Input should be in the form <chain>:<resnum><inscode>")
    chain, resnum_text, inscode = match.groups()
    if chain in ("_", ""):
        chain = " "
    return chain, int(resnum_text), inscode
def parse_edge_file(fname: str) -> List[Tuple[str, str]]:
    r"""
    Parses an .edge file.

    Rules:

    1. An edge is identified by its ID, which is a string of the two node IDs
       separated by '_' (':' and '-' are accepted as separators as well).
    2. Each node ID can be either a full ID or a short ID, i.e. a lowercase
       hex number of 7 to 40 digits.
    3. Each line in the .edge file should contain at most 1 edge ID.
    4. Lines containing only white spaces are allowed, and they will be
       ignored by the parser.
    5. A comment is a string that starts with '#' and ends with '\n',
       and it will be ignored by the parser.

    :param fname: Name of the .edge file.
    :return: A list of edges from the parsed edge file, each given as a tuple
        of its two node IDs.
    :raise SystemExit: If a line is not a valid edge ID.
    """
    edges = []
    with open(fname) as fh:
        for raw_line in fh:
            line = raw_line.strip()
            if not line or line.startswith("#"):
                continue
            # Drop a trailing comment, then unify the accepted separators.
            e = line.split('#')[0]
            e = e.replace("-", "_").replace(":", "_")
            node_ids = tuple(nid.strip() for nid in e.split("_"))
            # `fullmatch` so that the whole ID must be hex digits. A prefix
            # match would let IDs with trailing junk through.
            if not (len(node_ids) == 2 and all(
                    re.fullmatch('([a-f]|[0-9]){7,40}', s) for s in node_ids)):
                raise SystemExit(
                    "ERROR: Edge file %s is NOT in the right format.\n" \
                    "An example for the format of an edge-file:\n" \
                    " 36da5ad:397128e\n" \
                    " 33dd5ad:347118e\n" \
                    " 33fe5ad:3171f8e\n" \
                    "Each line specifies an edge with the two node's IDs. Each node ID is a hex" \
                    " number of at least 7 digits. The two IDs are separated by a '_' (or ':' or" \
                    " '-')." % fname)
            edges.append(node_ids)
    return edges
def parse_ligand_file(fname: str) -> List[str]:
    r"""
    Parse a ligand file with the following format:

    1. On each line, a ligand is identified by the hash id, i.e. a lowercase
       hex number of 7 to 40 digits.
    2. Each line in the .ligand file should contain at most 1 ligand ID.
    3. Lines containing only white spaces are allowed, and they will be
       ignored by the parser.
    4. A comment is a string that starts with '#' and ends with '\n', and
       it will be ignored by the parser.

    :param fname: Name of the .ligand file.
    :return: A list of structure hash ids.
    :raise ValueError: If a line is not a valid ligand ID.
    """
    ligands = []
    with open(fname) as fh:
        for raw_line in fh:
            line = raw_line.strip()
            if not line or line.startswith("#"):
                continue
            lig_id = line.split("#")[0].strip()
            # `fullmatch` so that the whole ID must be hex digits. A prefix
            # match would let IDs with trailing junk through.
            if not lig_id or not re.fullmatch('([a-f]|[0-9]){7,40}', lig_id):
                raise ValueError(
                    f"ERROR: Unable to parse .ligand file - {fname}\n"
                    f"Line '{line}' invalid.\n"
                    f"An example for the format of a ligand-file:\n"
                    f" 36da5ad # ligand1\n"
                    f" 347118e\n"
                    f"Each line specifies a ligand ID with an optional comment "
                    f"following a #\n")
            ligands.append(lig_id)
    return ligands
def write_ligand_file(fname: str, cts: List[Structure]) -> None:
    """
    Writes one line per structure to `fname`: the hash id of the structure's
    title (see `str2hexid`), followed by the title itself as a comment.

    :param fname: Path for output.
    :param cts: List of structures.
    """
    with open(fname, 'w') as out:
        out.writelines(
            f'{str2hexid(ct.title)} # {ct.title}\n' for ct in cts)
def str2hexid(s: str, full_id=False) -> str:
    """
    Returns a hex code that can serve as an ID of the string `s`.

    The code is the SHA-1 digest of the UTF-8 encoded string. Different
    strings are unlikely (though not impossible in principle) to get the same
    code.

    :param s: The string to derive the ID from.
    :param full_id: If true, the full 40-digit code is returned. By default
        only its first 7 digits are returned.
    """
    digest = hashlib.sha1(s.encode('utf-8')).hexdigest()
    if full_id:
        return digest
    return digest[:7]
def _key2flags(k: str) -> List[str]:
"""
Converts a key of a function's keyword argument into a command line flag.
If `k` does NOT have a leading dash, a dash will be prepended.
If `k` has underscores, two flags will be returned: The first has the
underscores, the second has the dashes converted from the underscores.
"""
if k.startswith("DASHFLAG_"):
return ['-' + k[9:].replace('_', '-')]
elif k.startswith("UNDERSCOREFLAG_"):
return ['-' + k[15:]]
flags = [k, k.replace("_", "-")] if '_' in k else [k]
return flags if k.startswith('-') else ['-' + e for e in flags]
def check_command(cmd: List, *args, **kwargs):
    """
    Check the command line arguments against `args` for positional arguments
    and `kwargs` for keyword arguments.

    For flags like -multiword-flag, the corresponding keyword in `kwargs` is
    `multiword_flag` . This function by default does NOT distinguish a
    -multiword-flag from a -multiword_flag. Both forms are considered valid
    flags. If you want to force a dash flag, prefix the keyword with
    `"DASHFLAG_"` , e.g., "DASHFLAG_multiword_flag"; to force a underscore flag,
    prefix the keyword with `"UNDERSCOREFLAG_"`

    For user's convenience, one can use any types of values in `args` and
    `kwargs`. The value will be converted to `str` before checking if it exists
    in `cmd`. For example, you can use this key-value pair in `kwargs`:
    `maxjob=1`, which is equivalent to `maxjob='1'`. The same holds for the
    elements of a list value.

    For keyword arguments that take multiple values, e.g., "-d stage_1-out.tgz
    -d stage_2-out.tgz", the values in `kwargs` should be specified in a list,
    e.g., `d=["stage_1-out.tgz", "stage_2-out.tgz"]`.

    For keyword arguments that take NO values, use `None` as the value in
    `kwargs`.

    :param cmd: Must satisfy the following requirements:
        1. All argument flags should be single dash flags.
        2. If an argument that takes a single value but specified multiple times
           in `cmd` the right-most specification is in effect.

    :raises AssertionError: If any arguments as specified by `args` and `kwargs`
        are NOT found in `cmd`.
    """
    for arg in args:
        assert str(arg) in cmd, str(arg)
    reversed_cmd = cmd[::-1]
    n_cmd = len(cmd)
    for k, v in kwargs.items():
        flags = _key2flags(k)
        if isinstance(v, list):
            # Multivalue argument. Note it's legitimate for `values` to have
            # duplicate elements, and so we cannot use `set` here. A flag at
            # the very end of `cmd` has no value and is skipped, so that the
            # mismatch surfaces as an AssertionError below.
            values = [
                str(cmd[i + 1])
                for i, flag in enumerate(cmd)
                if flag in flags and i + 1 < n_cmd
            ]
            # Ensures all elements in `v` exist in `values`.
            for e in v:
                # Compare as strings, as documented.
                e = str(e)
                assert e in values, f'{e} not in {values}'
                # If we have duplicate elements in `v`, we have the same number
                # of duplicates in `values`.
                values.remove(e)
            assert not values, \
                "command has %d more values for flag: %s" % \
                (len(values), flags[0])
        else:
            # For duplicate arguments the right-most one is significant.
            for flag in flags:
                if flag in cmd:
                    i = reversed_cmd.index(flag)
                    if v is not None:
                        # i == 0 means the flag is the last element of `cmd`
                        # and so has no value.
                        assert i > 0
                        assert reversed_cmd[i - 1] == str(
                            v), f'{reversed_cmd[i - 1]} != {str(v)}'
                    break
            else:
                assert False, f"command has no flags: ({flags})"
def _commandify_segment(raw_cmd_seg: List) -> List[str]:
"""
Processes a sublist of the raw command. Examples::
['-dew-asl', 'ligand'] ==> ['-dew-asl', 'ligand']
['-n', 200] ==> ['-n', '200']
['-fep-lambda', None] ==> []
['-fep-lambda', 1] ==> ['-fep-lambda', '1']
['-transpose-box', True] ==> ['-transpose-box']
['-transpose-box', False] ==> []
['-fep-lambda', None, ['-protein-fep', True]] ==> []
['-fep-lambda', 1, ['-protein-fep', True]] ==>
['-fep-lambda', '1', '-protein-fep']
# -start-interval expects either a pair of values or a single value.
['-start-interval', [[0], interval]] ==> ['-start-interval', 0, interval]
['-start-interval', [[None], interval]] ==> ['-start-interval', interval]
"""
cmd_seg = []
for e in raw_cmd_seg:
if e is None:
return []
if isinstance(e, list):
cmd_seg += _commandify_segment(e)
else:
cmd_seg.append(str(e))
if 2 == len(cmd_seg):
if cmd_seg[1] == str(False):
# This is a switch argument that should NOT be specified.
return []
if cmd_seg[1] == str(True):
# This is a switch argument that should be specified.
return cmd_seg[:1]
return cmd_seg
def commandify(raw_cmd: List) -> List[str]:
    """
    Converts a "raw command" into a `subprocess` command, i.e. a list of
    strings.

    Composing a list of strings directly is often inconvenient: numbers have
    to be converted into strings, and an argument whose value is `None`
    should usually be dropped rather than be passed as the string "None".
    This function takes care of that with the following grammar:

    1. A raw command is a list of arbitrary types of objects.
    2. Positional arguments are direct, string-convertible elements of the
       raw command. An element that is `None` is left out of the returned
       command.
    3. A keyword argument is given as a `list` in the raw command. The first
       element of the list is the flag, which can be of any
       string-convertible type. The remaining elements are the values. If any
       of the values is `None`, the keyword argument is left out of the
       returned command.
    4. A switch argument (which has no values following the flag) is given
       like a keyword argument with exactly one value, of the boolean type.
       If the value is `True`, the flag is added to the returned command;
       otherwise it is left out.
    """
    command = []
    for item in raw_cmd:
        if isinstance(item, list):
            command.extend(_commandify_segment(item))
        elif item is not None:
            command.append(str(item))
    return command
def use_custom_oplsdir(st):
    """
    Tells whether the given structure is marked to indicate that the custom
    OPLSDIR in Maestro preferences should be used.

    The structure counts as marked if either the
    `constants.USE_CUSTOM_OPLSDIR` property or the original
    "s_ffio_custom_opls_dir" property (kept for backwards compatibility) has
    a true value.

    :param st: structure whose properties are to be queried
    :type st: structure.Structure

    :return: whether to use the Maestro preference custom OPLSDIR
    :rtype: bool
    """
    properties = st.property
    # The legacy property is consulted only if the current one is unset or
    # false.
    return bool(
        properties.get(constants.USE_CUSTOM_OPLSDIR) or
        properties.get("s_ffio_custom_opls_dir"))
def gz_fname_if_exists(fname: str):
    """
    Returns `fname` with '.gz' appended if a path of that name exists, and
    `fname` itself otherwise.
    """
    gz_name = fname + '.gz'
    return gz_name if Path(gz_name).exists() else fname
def copy_and_compress_files(src_dir: str, dest_dir: str, compress_pattern=None):
    """
    Recursively copies the contents of `src_dir` into `dest_dir` (which may
    already exist), optionally compressing a subset of the files.

    :param src_dir: Directory to copy from.
    :param dest_dir: Directory to copy to.
    :param compress_pattern: Optional `fnmatch` pattern. Files whose source
        path matches it are gzip compressed instead of being copied as-is,
        and get a .gz extension appended to their name.
    """

    def _copy_or_compress(src, dest, **kwargs):
        wants_gzip = bool(compress_pattern) and fnmatch.fnmatch(
            src, compress_pattern)
        if not wants_gzip:
            shutil.copy2(src, dest, **kwargs)
            return
        compressed = gzip.compress(Path(src).read_bytes(), compresslevel=1)
        Path(str(dest) + '.gz').write_bytes(compressed)

    shutil.copytree(src_dir,
                    dest_dir,
                    dirs_exist_ok=True,
                    copy_function=_copy_or_compress)
def get_leg_name_from_jobname(jobname: str) -> str:
    """
    Extracts the leg name from `jobname`, whose '_'-separated parts are
    expected to end with the leg name.

    - If the second-last part is the sublimation or solvation leg type, the
      leg name is the last two parts, e.g. "sublimation_1".
    - If `jobname` ends with one of the fragment hydration leg types, the leg
      name is that leg type.
    - Otherwise the leg name is the last part. This includes a `jobname`
      without any '_', which is returned as a whole.
    """
    parts = jobname.split("_")
    # A job name without '_' has no second-last part to inspect.
    if len(parts) >= 2 and (parts[-2] == FepLegTypes.SUBLIMATION or
                            parts[-2] == FepLegTypes.SOLVATION):
        # legname will be in form "sublimation_<idx>", e.g. "sublimation_1"
        return "_".join(parts[-2:])
    elif jobname.endswith(FepLegTypes.FRAGMENT_HYDRATION):
        # legname is 'solvent_fragment_hydration'
        return FepLegTypes.FRAGMENT_HYDRATION
    elif jobname.endswith(FepLegTypes.RESTRAINED_FRAGMENT_HYDRATION):
        # legname is 'solvent_restrained_fragment_hydration'
        return FepLegTypes.RESTRAINED_FRAGMENT_HYDRATION
    return parts[-1]
def get_leg_type_from_jobname(jobname: str) -> str:
    """
    Returns the leg type that corresponds to the leg name found in `jobname`
    (see `get_leg_name_from_jobname`).

    The leg name is the leg type for most leg types. The exceptions are
    sublimation and solvation legs, whose names start with the leg type and
    may carry an index, e.g. "sublimation_1".
    """
    legname = get_leg_name_from_jobname(jobname)
    indexed_leg_types = (FepLegTypes.SUBLIMATION, FepLegTypes.SOLVATION)
    for leg_type in indexed_leg_types:
        if legname.startswith(leg_type):
            return leg_type
    return legname
def get_msj_filename(jobname: str,
                     leg: Optional[FepLegTypes] = None,
                     protocol: Optional[SIMULATION_PROTOCOL] = None,
                     extend: Optional[bool] = False) -> str:
    """
    Return the standardized .msj filename as a string.

    The name is assembled from `jobname`, the postfix registered for
    `protocol` in `PROTOCOL_TO_POSTFIX`, '_<leg>', and '.extend', in that
    order, followed by the '.msj' extension. Each of the optional pieces is
    left out if the corresponding argument is false.
    """
    pieces = [jobname]
    if protocol:
        pieces.append(PROTOCOL_TO_POSTFIX[protocol])
    if leg:
        pieces.append(f"_{leg}")
    if extend:
        pieces.append(".extend")
    pieces.append(".msj")
    return "".join(pieces)
def make_structure_dummy(ct: Structure):
    """
    Marks the structure as the dummy and adds a dummy atom to it. This is
    needed for FEP simulations which use the Graph format but don't have
    traditional lambda 0 and lambda 1 inputs (e.g. Absolute Binding,
    Solubility).

    The structure is modified in place: its title becomes "dummy", its
    `constants.DUMMY_LIGAND` property is set to 1, and an 'Na' atom with
    formal charge 0 and pdb residue name 'DU ' is added at the origin.

    :param ct: The structure to modify.
    :return: The atom that was added.
    """
    ct.title = "dummy"
    ct.property[constants.DUMMY_LIGAND] = 1
    dummy_atom = ct.addAtom('Na', 0.0, 0.0, 0.0)
    dummy_atom.formal_charge = 0
    dummy_atom.pdbres = 'DU '
    return dummy_atom
def predict_memory_utilization(fep_type: constants.FEP_TYPES, num_atoms: int,
                               num_windows: int) -> Tuple[int, int]:
    """
    Predict the cpu and gpu memory utilization in MB for an fep job.

    Each prediction is linear in `num_atoms * num_windows`, with the
    (slope, intercept) pairs taken from `constants`. Covalent ligand FEP and
    the selectivity FEP types use the COVALENT_* pairs; all other types use
    the general pairs.

    :param fep_type: The type of the fep job.
    :param num_atoms: Number of atoms.
    :param num_windows: Number of windows.
    :return: The tuple (cpu memory, gpu memory).
    """
    covalent_like_types = [constants.FEP_TYPES.COVALENT_LIGAND] + list(
        constants.SELECTIVITY_FEP_TYPES)
    if fep_type in covalent_like_types:
        cpu_fit = constants.COVALENT_CPU_SLOPE_INTERCEPT
        gpu_fit = constants.COVALENT_GPU_SLOPE_INTERCEPT
    else:
        cpu_fit = constants.CPU_SLOPE_INTERCEPT
        gpu_fit = constants.GPU_SLOPE_INTERCEPT
    # Element 0 of each pair is the slope, element 1 the intercept.
    cpu_mem = cpu_fit[0] * num_atoms * num_windows + cpu_fit[1]
    gpu_mem = gpu_fit[0] * num_atoms * num_windows + gpu_fit[1]
    return cpu_mem, gpu_mem