'''
Thin LigPrep wrapper and related subroutines (for PHASE-2070).
'''
import json
from voluptuous import Invalid
from voluptuous import Schema
from schrodinger.infra import mm
from schrodinger.utils import fileutils
from schrodinger.utils import log
logger = log.get_output_logger('shape.shape_ligprep')

# The LigPrep modules are imported on a best-effort basis: when they cannot
# be imported, an error is logged and this module still loads, but
# `arguments`, `driver`, `ligprepworkflow` and `split_structures` may be
# left undefined.
try:
    from schrodinger.application.macromodel.packages.ligprep3 import arguments
    from schrodinger.application.macromodel.packages.ligprep3 import driver
    from schrodinger.application.macromodel.packages.ligprep3 import ligprepworkflow
except ImportError:
    logger.error("LigPrep is not available")
else:
    # Re-exported under a module-level name.
    split_structures = driver.split_structures
# -----------------------------------------------------------------------------
# Maps a structure file format (`fileutils` constant) to the ligprep command
# line flag that selects this input format. Both SMILES flavours use '-ismi'.
LIGPREP_INPUT_OPTION = {
    fileutils.SD: '-isd',
    fileutils.MAESTRO: '-imae',
    fileutils.SMILES: '-ismi',
    fileutils.SMILESCSV: '-ismi',
}
# -----------------------------------------------------------------------------
# =============================================================================
# Business logic
# =============================================================================
class LigPrep(object):
    '''
    Wraps a `ligprepworkflow.LigPrepWorkflow` instance that is configured
    to read structures from a single input file.
    '''

    def __init__(self, input_file, options='[]'):
        '''
        :param input_file: Input file name.
        :type input_file: str

        :param options: Ligprep command line options (in JSON format).
        :type options: str

        :raises RuntimeError: If the format of `input_file` is not among
            the keys of `LIGPREP_INPUT_OPTION`.
        '''

        # The format detection helper is provided by `fileutils` (the same
        # module that defines the format constants used as keys of
        # `LIGPREP_INPUT_OPTION`); the unqualified name is not defined here.
        input_format = fileutils.get_structure_file_format(input_file)
        if input_format not in LIGPREP_INPUT_OPTION:
            raise RuntimeError("ligprep does not support '%s' input format" %
                               input_format)

        # User-supplied options come first, followed by the input/output
        # options that this wrapper controls.
        argv = json.loads(options)
        argv += [
            LIGPREP_INPUT_OPTION[input_format], input_file,
            # NOTE(review): 'void.mae' looks like a placeholder output name
            # (results are obtained via `prepareStructure`) -- confirm.
            '-omae', 'void.mae',
            # '-px' is followed by the value returned by
            # `mm.mmpipeline_initialize()`.
            '-px', mm.mmpipeline_initialize()
        ]

        self._lp = ligprepworkflow.LigPrepWorkflow(logger, argv)

    def close(self):
        '''
        Closes the underlying LigPrep workflow.
        '''
        self._lp.close()

    def prepareStructure(self, ct):
        '''
        Applies ligprep to `ct`.

        :return: (outcomes, failures, summary) tuple
        :rtype: (list(structure.Structure), list(structure.Structure), dict)
        '''
        return self._lp.run(ct)

    def yieldStructures(self):
        '''
        Reads consecutive entries from the input file, applies LigPrep to
        each of them and yields the outcomes.

        Entries that cannot be read are reported as warnings and skipped.
        Entries for which LigPrep produces no outcomes are skipped as well.

        :return: Generator that yields, for every successfully processed
            entry, the non-empty `outcomes` value returned by
            `prepareStructure` for that entry.
        '''

        with self._lp.makeInputReader() as reader:
            # Explicit `next()` calls (rather than a `for` loop) make it
            # possible to keep reading after an entry fails to load.
            while True:
                try:
                    st = next(reader)
                except StopIteration:
                    break
                except Exception as e:
                    logger.warning('entry #%d: %s', reader.getOrigPos(), e)
                    continue

                outcomes, _, _ = self.prepareStructure(st)
                if outcomes:
                    yield outcomes
# =============================================================================
# Command line
# =============================================================================
def add_ligprep_arguments(parser):
    '''
    Adds the `-ligprep` switch and the `-ligprep_options` option to `parser`.

    :param parser: Command line argument parser.
    :type parser: `argparse.ArgumentParser`
    '''

    ligprep_help = (
        'Process input structures using LigPrep. Required for SMILES and '
        '2D input or if structures lack Hydrogens. Recommended for 3D input '
        'structures not in a low-energy ionization/tautomeric state. Not '
        'compatible with -distinct, -connect, -stereo, or -title.')

    # `%(default)s` is expanded by argparse when the help is formatted.
    options_help = (
        'Options to pass to the ligprep work flow (JSON-formatted list of '
        "strings. Default: '%(default)s').")
    default_options = '["-epik", "-pht", "1.0", "-s", "16"]'

    # on/off switch
    parser.add_argument('-ligprep', action='store_true', help=ligprep_help)

    # custom options
    parser.add_argument('-ligprep_options',
                        metavar='<text>',
                        default=default_options,
                        help=options_help)
# -----------------------------------------------------------------------------
def validate_ligprep_arguments(args):
    '''
    Checks the ligprep-related command line options in `args`.

    The value of `-ligprep_options` is only examined when `-ligprep`
    was requested.

    :param args: Populated `argparse.Namespace` instance.
    :type args: `argparse.Namespace`

    :return: Tuple of validation success, and error message.
    :rtype: bool, str
    '''

    # Nothing to validate unless LigPrep processing was asked for.
    if not args.ligprep:
        return True, ''

    valid, message = validate_ligprep_options(args.ligprep_options)
    if valid:
        return True, ''
    return False, message
# -----------------------------------------------------------------------------
def validate_ligprep_options(text):
    '''
    Checks that `text` holds usable ligprep command line options.

    The text must be a JSON list of strings, must not include any of the
    input format options ('-imae', '-isd', '-ismi', '-icsv'), and must be
    accepted by the ligprep argument parser.

    :param text: Ligprep options as list of strings in JSON format.
    :type text: str

    :return: Tuple of validation success, and error message.
    :rtype: bool, str
    '''

    # 1. well-formed JSON
    try:
        decoded = json.loads(text)
    except json.JSONDecodeError as err:
        return False, f'malformed JSON: {err}'

    # 2. list of strings
    try:
        tokens = Schema([str])(decoded)
    except Invalid as err:
        return False, f'unexpected type: {err}'

    # 3. no input format options
    reserved = ('-imae', '-isd', '-ismi', '-icsv')
    offender = next((flag for flag in reserved if flag in tokens), None)
    if offender is not None:
        return False, f'must not include {offender}'

    # 4. acceptable to the ligprep parser, whose `error` hook is replaced
    # so that the problem is reported via an exception that is caught below
    def fail(msg):
        raise RuntimeError(msg)

    lp_parser = arguments.make_parser()
    lp_parser.error = fail

    try:
        lp_parser.parse_args(arguments.preprocess_args(tokens))
    except RuntimeError as err:
        return False, str(err)

    return True, ''
# =============================================================================