from past.builtins import cmp
import schrodinger.utils.log as log
from schrodinger.application.desmond import constants
logger = log.get_output_logger(name="fep_util")
logger.setLevel(log.DEBUG)
[docs]def check1_subst_code(source_ct, dest_ct):
"""
take input cts and check single ring-atom/attachment subst_code, return true if pass
"""
def find_marked_atoms(ct):
atoms_98 = []
atoms_99 = []
atoms_97 = []
for atom in ct.atom:
if atom.property[constants.FEP_SUBST] == 98:
atoms_98.append(atom)
if atom.property[constants.FEP_SUBST] == 97:
atoms_97.append(atom)
if atom.property[constants.FEP_SUBST] == 99:
atoms_99.append(atom)
return (atoms_98, atoms_97, atoms_99)
(source_98_atoms, source_97_atoms,
source_99_atoms) = find_marked_atoms(source_ct)
(dest_98_atoms, dest_97_atoms, dest_99_atoms) = find_marked_atoms(dest_ct)
# print source_98_atoms, source_97_atoms, source_99_atoms
if len(source_98_atoms) == 0 or len(dest_98_atoms) == 0:
logger.error("No anchor atoms found")
return False
if len(source_99_atoms) == 0 and len(dest_99_atoms) == 0 and len(
source_97_atoms) == 0 and len(dest_97_atoms) == 0:
logger.error("No bridge atoms found")
return False
if len(source_97_atoms) != len(dest_97_atoms):
logger.error(
"Number of linker/ring atoms changed not the same in product and reatant"
)
return False
if (len(source_99_atoms) > 1 or len(dest_99_atoms) > 1
) and (len(source_98_atoms) > 1 or len(dest_98_atoms)) > 1:
logger.error("Too many attachment point detected")
return False
if len(source_97_atoms) == 1 or len(dest_97_atoms) == 1:
if len(source_98_atoms) < 2 or len(dest_98_atoms) < 2:
logger.error("Not enough connection for ring/linker atom")
return False
for atoms_98 in source_98_atoms:
if atoms_98 not in source_97_atoms[0].bonded_atoms:
logger.error(
"anchor atom not bonded to ring/linker atom in original molecule"
)
return False
for atoms_98 in dest_98_atoms:
if atoms_98 not in dest_97_atoms[0].bonded_atoms:
logger.error(
"anchor atom not bonded to ring/linker atom in mutated molecule"
)
return False
return True
[docs]def check_subst_code(source_ct, dest_ct):
"""
Take input cts and check multiple ring-atom/attachment subst_code, return
true if pass
:return: (error_code, msg) where error_code:
- 0 no inconsistance detected
- 1 too complicated, need manual inspection
- -1 error
"""
def find_bridge_atoms(atom):
"""
input atom object
return (98_atoms, 97_atoms)
"""
bridge_atoms = []
ring_atoms = []
#find all bridge atoms attached to anchor
for a in atom.bonded_atoms:
if a.property[constants.FEP_SUBST] % 100 == 98:
bridge_atoms.append(atom)
elif a.property[constants.FEP_SUBST] % 100 == 97:
ring_atoms.append(atom)
return (bridge_atoms, ring_atoms)
def find_marked_atoms(ct, n=0):
atoms_98 = []
atoms_99 = []
atoms_97 = []
for atom in ct.atom:
if atom.property[constants.FEP_SUBST] == 98 + n * 100:
atoms_98.append(atom)
if atom.property[constants.FEP_SUBST] == 97 + n * 100:
atoms_97.append(atom)
if atom.property[constants.FEP_SUBST] == 99 + n * 100:
atoms_99.append(atom)
return (atoms_98, atoms_97, atoms_99)
for i in range(20):
(source_98_atoms, source_97_atoms,
source_99_atoms) = find_marked_atoms(source_ct, i)
(dest_98_atoms, dest_97_atoms,
dest_99_atoms) = find_marked_atoms(dest_ct, i)
# print source_98_atoms, source_97_atoms, source_99_atoms
if i == 0:
if len(source_98_atoms) == 0 or len(dest_98_atoms) == 0:
logger.error("No anchor atoms found")
return (-1, "No anchor atoms found, identic molecules?")
else:
if len(source_98_atoms) == 0 and len(dest_98_atoms) == 0:
logger.info("No more anchor atoms found %d" % (i * 100 + 98))
continue
# return (0, "pass")
if len(source_99_atoms) == 0 and len(dest_99_atoms) == 0 and len(
source_97_atoms) == 0 and len(dest_97_atoms) == 0:
if len(source_98_atoms) > 1 or len(dest_98_atoms) > 1:
logger.error("extra anchor atoms found source: %s dest: %s" % ([
(x.index, x.property[constants.FEP_SUBST])
for x in source_98_atoms
], [(x.index, x.property[constants.FEP_SUBST])
for x in dest_98_atoms]))
return (-1, "extra anchor atoms found source: %s dest: %s" % ([
(x.index, x.property[constants.FEP_SUBST])
for x in source_98_atoms
], [(x.index, x.property[constants.FEP_SUBST])
for x in dest_98_atoms]))
#check if there is a 97 atom connect to the anchor atom
source_anchor = source_98_atoms[0]
source_bridge_atoms = []
for atom in source_anchor.bonded_atoms:
if atom.property[constants.FEP_SUBST] % 100 == 97:
source_bridge_atoms.append(atom)
dest_anchor = dest_98_atoms[0]
dest_bridge_atoms = []
for atom in dest_anchor.bonded_atoms:
if atom.property[constants.FEP_SUBST] % 100 == 97:
dest_bridge_atoms.append(atom)
if len(source_bridge_atoms) == 0 and len(dest_bridge_atoms) == 0:
logger.error("No bridge atoms found")
return (-1, "No bridge atoms found")
if len(source_97_atoms) != len(dest_97_atoms):
logger.error(
"Number of linker/ring atoms changed not the same in product and reatant"
)
return (
-1,
"Number of linker/ring atoms changed not the same in product and reatant"
)
if (len(source_99_atoms) > 1 or len(dest_99_atoms) > 1
) and (len(source_98_atoms) > 1 or len(dest_98_atoms)) > 1:
logger.error("Too many attachment point detected")
return (-1, "Too many attachment point detected")
source_bridge_atoms = []
source_anchor = source_98_atoms[0]
#find all bridge atoms attached to anchor
for atom in source_anchor.bonded_atoms:
if atom.property[constants.FEP_SUBST] % 10 == 9:
source_bridge_atoms.append(atom)
if (len(source_bridge_atoms) > 1):
# logger.error("source: %s attachments connected to the same anchor atom %d" % ([x.index for x in source_bridge_atoms], source_anchor.index))
# return (-1, "source: %s attachments connected to the same anchor atom %d" % ([x.index for x in source_bridge_atoms], source_anchor.index))
anchor_code = source_anchor.property[constants.FEP_SUBST]
bridge_codes = [
a.property[constants.FEP_SUBST] for a in source_bridge_atoms
]
ref_bg_codes = [
99 - x * 10 for x in range(len(source_bridge_atoms))
]
ref_bg_codes.sort()
bridge_codes.sort()
if cmp(ref_bg_codes, bridge_codes) != 0:
return (-1,
"wrong codes on multiple changes on source atom %d" %
source_anchor.index)
dest_bridge_atoms = []
dest_anchor = dest_98_atoms[0]
#find all bridge atoms attached to anchor
for atom in dest_anchor.bonded_atoms:
if atom.property[constants.FEP_SUBST] % 10 == 9:
dest_bridge_atoms.append(atom)
if (len(dest_bridge_atoms) > 1):
# logger.error("dest: %s attachments connected to the same anchor atom %d" % ([(x.index, x.property[constants.FEP_SUBST]) for x in dest_bridge_atoms], dest_anchor.index))
# return (-1, "dest: %s attachments connected to the same anchor atom %d" % ([(x.index, x.property[constants.FEP_SUBST]) for x in dest_bridge_atoms], dest_anchor.index))
anchor_code = dest_anchor.property[constants.FEP_SUBST]
bridge_codes = [
a.property[constants.FEP_SUBST] for a in dest_bridge_atoms
]
ref_bg_codes = [99 - x * 10 for x in range(len(dest_bridge_atoms))]
ref_bg_codes.sort()
bridge_codes.sort()
if cmp(ref_bg_codes, bridge_codes) != 0:
return (-1, "wrong codes on multiple changes on dest atom %d" %
source_anchor.index)
if len(source_97_atoms) == 1 or len(dest_97_atoms) == 1:
if len(source_98_atoms) < 2 or len(dest_98_atoms) < 2:
# logger.error("Not enough connection for ring/linker atom")
# return (-1, "Not enough connection for ring/linker atom")
(source_bridge_atoms,
source_ring_atoms) = find_bridge_atoms(source_97_atoms[0])
if len(source_bridge_atoms) + len(source_ring_atoms) < 2:
#at least two bridge/linker/atom connected to ring/linker
return (
-1,
"not enough ring/linker atoms connected to source ring/linker atoms %d"
% (source_anchor.index))
(dest_bridge_atoms,
dest_ring_atoms) = find_bridge_atoms(dest_97_atoms[0])
if len(dest_bridge_atoms) + len(dest_ring_atoms) < 2:
return (
-1,
"not enough ring/linker atoms connected to dest ring/linker atoms %d"
% (dest_anchor.index))
for atoms_98 in source_98_atoms:
if atoms_98 not in source_97_atoms[0].bonded_atoms:
logger.error(
"anchor atom not bonded to ring/linker atom in original molecule"
)
return (
-1,
"source anchor atom not bonded to ring/linker atom in original molecule"
)
for atoms_98 in dest_98_atoms:
if atoms_98 not in dest_97_atoms[0].bonded_atoms:
logger.error(
"dest anchor atom not bonded to ring/linker atom in mutated molecule"
)
return (
-1,
"dest anchor atom not bonded to ring/linker atom in mutated molecule"
)
elif len(source_97_atoms) > 1 or len(dest_97_atoms) > 1:
for (source, dest) in zip(source_97_atoms, dest_97_atoms):
(source_bridge_atoms,
source_ring_atoms) = find_bridge_atoms(source)
if len(source_bridge_atoms) + len(source_ring_atoms) < 2:
#at least two bridge/linker/atom connected to ring/linker
return (
-1,
"not enough ring/linker atoms connected to source ring/linker atoms %d"
% (source.index))
(dest_bridge_atoms, dest_ring_atoms) = find_bridge_atoms(dest)
if len(dest_bridge_atoms) + len(dest_ring_atoms) < 2:
return (
-1,
"not enough ring/linker atoms connected to dest ring/linker atoms %d"
% (dest.index))
logger.info(
"multiple ring/linker atoms detected, need manual inspection")
logger.info("source: %s" %
[(x.index, x.property[constants.FEP_SUBST])
for x in source_97_atoms])
logger.info("dest: %s" % [(x.index, x.property[constants.FEP_SUBST])
for x in dest_97_atoms])
return (
1,
"multiple ring/linker atoms detected, need manual inspection\n source: %s\n dest: %s"
% ([(x.index, x.property[constants.FEP_SUBST])
for x in source_97_atoms
], [(x.index, x.property[constants.FEP_SUBST])
for x in dest_97_atoms]))
return (0, "pass")
[docs]def get_umbrella_mode_command_line(jobname,
subhost,
use_queue=1,
num_gpu=0,
num_cpu=1,
num_windows=12):
"""
Given number of processors, return command line argument
:param jobname: name of the job
:param use_queue: is the job submitted to a queuing system or not
:param subhost: host entry for computing slots
:param num_gpu: number of gpu for each window
:param num_cpu: number of cpu for each window
:param num_windows: number of replicas/windows
:return: list of string for command line argument
"""
ret_val = ["-JOBNAME", jobname, "-mode", "umbrella"]
max_job = num_gpu
num_slot = num_cpu
if num_gpu:
num_slot = num_gpu
else:
max_job = num_windows
num_slot = num_cpu * num_windows
if not use_queue and not num_gpu:
max_job = 1
num_slot = num_cpu
ret_val.append("-maxjob")
ret_val.append("%d" % max_job)
ret_val.append("-HOST")
ret_val.append(subhost + ":%d" % num_slot)
ret_val.append("-cpu")
ret_val.append("%d" % num_cpu)
return ret_val