import contextlib
import itertools
import os
import pprint
import shlex
import subprocess
from collections import namedtuple
import jinja2
from schrodinger.application.vss import csvsmiles
from schrodinger.application.vss import database
from schrodinger.application.vss import pilot
from schrodinger.job import jobcontrol
from schrodinger.job import queue
from schrodinger.utils import fileutils
from schrodinger.utils import log
JobAndOutcomes = namedtuple('JobAndOutcomes', ['job', 'outcomes'])
JobAndOutcomes.__doc__ += ': job handle & output filenames'
JobAndOutcomes.job.__doc__ = 'job handle'
JobAndOutcomes.outcomes.__doc__ = 'notable output filenames'
logger = log.get_output_logger('vss')
def get_jobname(ctrl, default='vs'):
    '''Return the job name from job control, from the run specification, or `default`.'''
    return jobcontrol.get_jobname() or ctrl.jobname or default
def make_job(cmd):
    '''Log `cmd` and wrap it in a `JobControlJob` run from the current directory.'''
    logger.info('job: %s', ' '.join(map(shlex.quote, cmd)))
    return queue.JobControlJob(command=cmd, command_dir=os.getcwd())
def make_dise_job(dise_ctrl, *, infile, n_total):
    '''
    :param dise_ctrl: DiSE parameters.
    :type dise_ctrl: `schrodinger.application.vss.control.DiseControl`
    :param infile: Ordered compounds.
    :type infile: str
    :param n_total: Total number of ordered compounds to be DiSE-ed.
    :type n_total: int
    :return: DiSE job and output file name.
    :rtype: (`schrodinger.job.queue.JobControlJob`, str)
    '''
jobname = fileutils.get_jobname(infile)
root, ext = fileutils.splitext(infile)
outfile = f'{jobname}-dise_cpds{ext}'
cmd = [
'run', 'dise_select.py', infile,
'-JOBNAME', jobname,
'-sim_threshold', str(dise_ctrl.similarity),
'-seed_cpds', str(int(dise_ctrl.seed * n_total + 0.5)),
'-num_cpds', str(n_total)
] # yapf: disable
return (make_job(cmd), outfile)
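# Worked example (illustrative only): with dise_ctrl.seed == 0.2 and
# n_total == 500, '-seed_cpds' becomes str(int(0.2 * 500 + 0.5)) == '100',
# i.e. the seed fraction of the requested compounds rounded to the nearest
# integer, while '-num_cpds' stays at the full 500.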
def make_glide_jobs(cfg, ctrl):
    '''
    Make a Glide docking job for actives/decoys only (not "databases").
:param cfg: VSS runner configuration.
:type cfg: `schrodinger.application.vss.config.Config`
:param ctrl: Run specification.
:type ctrl: `schrodinger.application.vss.control.RunnerControl`
:return: Glide job and output file names or None.
:rtype: `JobAndOutcomes` or NoneType
'''
sources = [s for s in [ctrl.actives, ctrl.decoys] if s]
if not ctrl.glide or not sources:
return None
jobname = f'{get_jobname(ctrl)}_glide'
ligands = f'{jobname}_ligands.csv'
pilot.extract_smiles_and_cids(sources, ligands)
glide_inp = f'{jobname}.inp'
with open(glide_inp, 'w') as fp:
params = {'gridfile': ctrl.glide.grid, 'ligandfile': ligands}
fp.write(jinja2.Template(cfg.glide_inp).render(params))
cmd = ['glide', glide_inp]
if cfg.glide_host:
cmd += ['-HOST', cfg.glide_host]
if cfg.glide_driver_host:
cmd += ['-DRIVERHOST', cfg.glide_driver_host]
return JobAndOutcomes(make_job(cmd), [f'{jobname}_pv.maegz'])
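# The Glide input file is rendered from the Jinja2 template held in
# `cfg.glide_inp`.  A minimal sketch of such a template (an assumption for
# illustration; the actual template is defined by the runner configuration)
# could look like:
#
#     GRIDFILE   {{ gridfile }}
#     LIGANDFILE {{ ligandfile }}
#     PRECISION  SP
#
# i.e. the grid comes from `ctrl.glide.grid` and the ligands are the CSV
# written by `pilot.extract_smiles_and_cids` above.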
def make_glide_al_jobs(cfg, ctrl):
'''
:param cfg: VSS runner configuration.
:type cfg: `schrodinger.application.vss.config.Config`
:param ctrl: Run specification.
:type ctrl: `schrodinger.application.vss.control.RunnerControl`
:return: Glide AL job and output file names or None.
:rtype: `JobAndOutcomes` or NoneType
'''
if not ctrl.glide_al:
return None
smiles = list(cfg.get_smiles_csv(ctrl.databases))
if not smiles:
return None
jobname = f'{get_jobname(ctrl)}_glide_al'
cpds_list_file = f'{jobname}_cpds'
with open(cpds_list_file, 'w') as fp:
fp.write('\n'.join(smiles))
cmd = [
'run', '-FROM', 'glide', 'glide_active_learning.py',
'screen',
'-grid', jobcontrol.get_runtime_path(ctrl.glide_al.grid),
'-infile_list_file', cpds_list_file,
'-smi_index', '1', '-name_index', '2',
'-train_time', str(ctrl.glide_al.training_time),
'-train_size', str(cfg.glide_al_train_size),
'-num_iter', str(cfg.glide_al_num_iter),
'-keep', str(ctrl.glide_al.n_report),
'-jobname', jobname
] # yapf: disable
if cfg.ligprep_args is not None:
cmd += ['-ligprep_args', ' '.join(cfg.ligprep_args)]
if cfg.glide_al_njobs:
cmd += ['-num_train_core', str(cfg.glide_al_njobs)]
if cfg.glide_al_glide_subjob_size:
cmd += ['-glide_subjob_size', str(cfg.glide_al_glide_subjob_size)]
if cfg.glide_al_eval_host:
cmd += ['-HOST', cfg.glide_al_eval_host]
if cfg.glide_al_driver_host:
cmd += ['-DRIVERHOST', cfg.glide_al_driver_host]
if cfg.glide_al_train_host:
cmd += ['-train_host', cfg.glide_al_train_host]
if cfg.glide_al_num_rescore_ligand:
cmd += ['-num_rescore_ligand', str(cfg.glide_al_num_rescore_ligand)]
if cfg.glide_al_block_size:
cmd += ['-block_size', str(cfg.glide_al_block_size)]
glide_al_outfile = f'{jobname}_pred_iter_{cfg.glide_al_num_iter}.csv'
glide_al_job = make_job(cmd)
if ctrl.dise:
dise_job, dise_outfile = make_dise_job(ctrl.dise,
infile=glide_al_outfile,
n_total=ctrl.glide_al.n_report)
dise_job.addPrereq(glide_al_job)
return JobAndOutcomes(dise_job, [dise_outfile])
else:
return JobAndOutcomes(glide_al_job, [glide_al_outfile])
def get_ligprep_command(cfg, infile, outfile, jobname=None):
    '''
    Build a LigPrep command line converting the SMILES `infile` into the
    Maestro `outfile`, appending any extra arguments from the configuration.
    '''
cmd = ['ligprep', '-ismi', infile, '-omae', outfile, '-NJOBS', '1']
if cfg.ligprep_args is not None:
cmd += cfg.ligprep_args
if jobname:
cmd += ['-JOBNAME', jobname]
return cmd
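# Example (illustrative, assuming cfg.ligprep_args is None):
#
#     get_ligprep_command(cfg, 'query.smi', 'query.maegz', jobname='vs_query')
#
# yields
#
#     ['ligprep', '-ismi', 'query.smi', '-omae', 'query.maegz',
#      '-NJOBS', '1', '-JOBNAME', 'vs_query']
#
# with any flags from cfg.ligprep_args spliced in before '-JOBNAME'.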
def get_shape_query(cfg, ctrl):
'''
:param cfg: VSS runner configuration.
:type cfg: `schrodinger.application.vss.config.Config`
:param ctrl: Run specification.
:type ctrl: `schrodinger.application.vss.control.RunnerControl`
:return: Shape query filename (maestro) and job handle.
:rtype: (str, `schrodinger.job.queue.JobControlJob` or NoneType)
'''
if not ctrl.shape:
return (None, None)
jobname = f'{get_jobname(ctrl)}_shape_query'
if ctrl.shape.query:
query = jobcontrol.get_runtime_path(ctrl.shape.query)
query_format = fileutils.get_structure_file_format(query)
if query_format in (fileutils.MAESTRO, fileutils.SD):
return (query, None)
elif query_format in (fileutils.SMILES, fileutils.SMILESCSV):
query_lp = f'{jobname}.maegz'
lp_cmd = get_ligprep_command(cfg, query, query_lp, jobname=jobname)
query_job = make_job(lp_cmd)
return (query_lp, query_job)
else:
raise ValueError(f'unsupported shape query format: {query_format}')
else:
actives = jobcontrol.get_runtime_path(ctrl.actives.filename)
actives_mae = f'{jobname}_actives.maegz'
lp_cmd = get_ligprep_command(cfg,
actives,
actives_mae,
jobname=f'{jobname}_actives')
lp_job = make_job(lp_cmd)
query_cmd = [
'run', 'shape_pick_diverse.py', '-shape_type',
ctrl.shape.shape_type, actives_mae, '-JOBNAME', f'{jobname}_pick'
]
if cfg.shape_host:
query_cmd += ['-HOST', cfg.shape_host]
query_job = make_job(query_cmd)
query_job.addPrereq(lp_job)
query = f'{jobname}_pick-out.maegz'
return (query, query_job)
def make_shape_jobs(cfg, ctrl):
'''
:param cfg: VSS runner configuration.
:type cfg: `schrodinger.application.vss.config.Config`
:param ctrl: Run specification.
:type ctrl: `schrodinger.application.vss.control.RunnerControl`
:return: Shape job and output file names or None.
:rtype: `JobAndOutcomes` or NoneType
'''
if not ctrl.shape:
return None
shape_data, shape_data_is_local = cfg.get_shape_data(
ctrl.databases, shape_type=ctrl.shape.shape_type)
if not shape_data:
return None
jobname = f'{get_jobname(ctrl)}_shape'
shape_data_treatment = 'copy' if shape_data_is_local else 'remote'
query, query_job = get_shape_query(cfg, ctrl)
outfiles = [query] if query_job else []
shape_data_list_file = f'{jobname}_shape_data_list'
with open(shape_data_list_file, 'w') as fp:
fp.write('\n'.join(shape_data))
cmd = [
'shape_screen_gpu', 'run',
'-shape', query,
'-screen_list', shape_data_list_file,
'-shape_data_treatment', shape_data_treatment,
'-keep', str(ctrl.shape.n_report),
'-JOBNAME', jobname
] # yapf: disable
if cfg.shape_host:
cmd += ['-HOST', cfg.shape_host]
if cfg.shape_driver_host:
cmd += ['-DRIVERHOST', cfg.shape_driver_host]
shape_job = make_job(cmd)
if query_job:
shape_job.addPrereq(query_job)
ctrl.shape.query_is_synthetic = True
shape_outfile = f'{jobname}-out.maegz'
if ctrl.dise:
dise_job, dise_outfile = make_dise_job(ctrl.dise,
infile=shape_outfile,
n_total=ctrl.shape.n_report)
dise_job.addPrereq(shape_job)
return JobAndOutcomes(dise_job, outfiles + [dise_outfile])
else:
return JobAndOutcomes(shape_job, outfiles + [shape_outfile])
def make_ligand_ml_training_set(cfg, ctrl):
    '''
    Concatenate actives and decoys into a single file (the "training set"
    to be used for Ligand ML).
:param cfg: VSS runner configuration.
:type cfg: `schrodinger.application.vss.config.Config`
:param ctrl: Run specification.
:type ctrl: `schrodinger.application.vss.control.RunnerControl`
:return: CSV SMILES file with "truth" column.
:rtype: `csvsmiles.CsvSmilesFile`
'''
outcome = csvsmiles.CsvSmilesFile(filename=get_new_filename(
f'{get_jobname(ctrl)}_training_set', '.csv'),
cid_col=database.CID_COL,
truth_col=database.TRUTH_COL,
smiles_col=database.SMILES_COL)
with contextlib.ExitStack() as stack:
writer = outcome.get_dict_writer(stack,
fieldnames=[
database.SMILES_COL,
database.CID_COL,
database.TRUTH_COL
])
writer.writeheader()
for (src, t) in zip([ctrl.actives, ctrl.decoys], [1, 0]):
for row in src.get_dict_reader(stack):
writer.writerow({
database.SMILES_COL: row[src.smiles_col],
database.CID_COL: row[src.cid_col],
database.TRUTH_COL: t
})
return outcome
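# The resulting training-set CSV (sketch; assumes the column constants in
# `database` resolve to names such as SMILES, CID and truth) looks like:
#
#     SMILES,CID,truth
#     CCO...,active_001,1
#     CCN...,decoy_001,0
#
# where actives carry truth == 1 and decoys truth == 0, as assigned by the
# zip([ctrl.actives, ctrl.decoys], [1, 0]) loop above.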
def make_ligand_ml_jobs(cfg, ctrl):
'''
:param cfg: VSS runner configuration.
:type cfg: `schrodinger.application.vss.config.Config`
:param ctrl: Run specification.
:type ctrl: `schrodinger.application.vss.control.RunnerControl`
:return: Ligand ML job and output file names or None.
:rtype: `JobAndOutcomes` or NoneType
'''
if not ctrl.ligand_ml:
return None
    smiles_files = list(cfg.get_smiles_csv(ctrl.databases))
    if not smiles_files:
        return None
jobname = f'{get_jobname(ctrl)}_ligand_ml'
num_subjobs = cfg.ligand_ml_njobs or 1
model = f'{jobname}.qzip'
base_cmd = [
'run', 'ligand_ml_driver.py', model
] # yapf: disable
training_set = make_ligand_ml_training_set(cfg, ctrl)
build_cmd = base_cmd + [
'-build', '-i',
training_set.filename,
'-time', str(ctrl.ligand_ml.training_time),
'-y', training_set.truth_col,
'-smiles_col', training_set.smiles_col,
'-classification',
'-NUM_SUBJOBS', str(num_subjobs)
] # yapf: disable
if cfg.ligand_ml_train_host:
build_cmd += ['-HOST', cfg.ligand_ml_train_host]
if cfg.ligand_ml_driver_host:
build_cmd += ['-DRIVERHOST', cfg.ligand_ml_driver_host]
build_job = make_job(build_cmd)
# placeholder for now:
# need to either merge all databases (not ideal),
# or run multiple predictions and merge (needs another job-controllable
# script in current paradigm)
outfile = f'{jobname}_pred.csv'
cpds_list_file = f'{jobname}_cpds'
with open(cpds_list_file, 'w') as fp:
fp.write('\n'.join(smiles_files))
predict_cmd = [
'run', '-FROM', 'glide', 'glide_active_learning.py', 'evaluate',
'-model', model, '-infile_list_file', cpds_list_file, '-name_index',
'2', '-smi_index', '1', '-num_rescore_ligand', '0'
]
if cfg.ligand_ml_eval_host:
predict_cmd += ['-HOST', cfg.ligand_ml_eval_host]
if cfg.ligand_ml_driver_host:
predict_cmd += ['-DRIVERHOST', cfg.ligand_ml_driver_host]
if cfg.glide_al_block_size:
predict_cmd += ['-block_size', str(cfg.glide_al_block_size)]
predict_job = make_job(predict_cmd)
predict_job.addPrereq(build_job)
# skip DiSE for now: need to "glide_sort" the prediction outcomes
# by ranking_score = score - 1.96*std
# (another job-controllable script? or modify ligand_ml_driver.py?)
return JobAndOutcomes(predict_job, [outfile])
def get_new_filename(root, ext):
'''
Returns either `root + ext` (if no such file exists) or a non-existing
variation of it.
'''
fn = root + ext
n = 0
while os.path.exists(fn):
n = n + 1
fn = f'{root}_{n}{ext}'
return fn
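# Example: get_new_filename('decoys', '.csv') returns 'decoys.csv' if that
# file does not exist yet; if it does, the first free variant among
# 'decoys_1.csv', 'decoys_2.csv', ... is returned instead.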
def ensure_decoys(cfg, ctrl):
'''
Sample decoys from a pool if not provided via control.
:param cfg: VSS runner configuration.
:type cfg: `schrodinger.application.vss.config.Config`
:param ctrl: Run specification.
:type ctrl: `schrodinger.application.vss.control.RunnerControl`
'''
logger.info('actives:\n%s', pprint.pformat(ctrl.actives.to_dict()))
valid, msg = ctrl.actives.validate()
if not valid:
raise RuntimeError(msg)
if ctrl.decoys:
valid, msg = ctrl.decoys.validate()
if not valid:
raise RuntimeError(msg)
ctrl.synthetic_decoys = False
else:
logger.info('sampling decoys ...')
        # column names match those produced by property_matched_decoys.py
        ctrl.decoys = csvsmiles.CsvSmilesFile(
            filename=os.path.abspath(get_new_filename('decoys', '.csv.gz')),
            smiles_col='SMILES',
            cid_col='CID')
        subprocess.run([
            os.path.join(os.getenv('SCHRODINGER'), 'run'),
            'property_matched_decoys.py', 'sample',
            '-smiles_col', ctrl.actives.smiles_col,
            '-n', '100',
            '-outfile', ctrl.decoys.filename,
            ctrl.actives.filename,
            '-pool', cfg.decoys_pool
        ], check=True)
backend = jobcontrol.get_backend()
if backend:
backend.addOutputFile(ctrl.decoys.filename)
ctrl.synthetic_decoys = True
logger.info('decoys:\n%s', pprint.pformat(ctrl.decoys.to_dict()))
def run_production(cfg, ctrl):
'''
Executes "virtual screening" tasks.
:param cfg: VSS runner configuration.
:type cfg: `schrodinger.application.vss.config.Config`
:param ctrl: Run specification.
:type ctrl: `schrodinger.application.vss.control.RunnerControl`
'''
jobname = get_jobname(ctrl)
logger.info('jobname: %s', jobname)
for dbname in ctrl.databases:
logger.info('database: %s', dbname)
if ctrl.glide or ctrl.glide_al or ctrl.ligand_ml:
ensure_decoys(cfg, ctrl)
tasks = {
'glide': make_glide_jobs(cfg, ctrl),
'glide_al': make_glide_al_jobs(cfg, ctrl),
'shape': make_shape_jobs(cfg, ctrl),
'ligand_ml': make_ligand_ml_jobs(cfg, ctrl),
}
jobdj = queue.JobDJ(verbosity='verbose')
for t in tasks.values():
if t:
jobdj.addJob(t.job)
if jobdj.total_added:
jobdj.run()
backend = jobcontrol.get_backend()
for fn in itertools.chain.from_iterable(
t.outcomes for t in tasks.values() if t):
logger.info('outcome: %s', fn)
if backend:
backend.addOutputFile(fn)
def run_pilot(cfg, ctrl):
'''
Executes "virtual screening" pilot.
:param cfg: VSS runner configuration.
:type cfg: `schrodinger.application.vss.config.Config`
:param ctrl: Run specification.
:type ctrl: `schrodinger.application.vss.control.RunnerControl`
'''
jobname = get_jobname(ctrl, 'vs-pilot')
logger.info('running "pilot"')
logger.info('jobname: %s', jobname)
ensure_decoys(cfg, ctrl)
logger.info('making database ...')
dbname = get_new_filename(f'{jobname}_db', '')
shape_type = ctrl.shape.shape_type if ctrl.shape else None
pilot.make_database(dbname, [ctrl.actives, ctrl.decoys], shape_type)
ctrl.jobname = jobname
ctrl.databases = [dbname]
ctrl.dise = None
ctrl.glide_al = None
ctrl.ligand_ml = None
cfg.databases = [database.Database(dbname)]
run_production(cfg, ctrl)
# TODO: enrichments => what about "n_report"?
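# Typical entry point (sketch under assumptions; the actual driver wiring is
# outside this module): a job-control driver would build the configuration and
# run specification and then call one of the two flows, e.g.
#
#     cfg = config.Config(...)             # hypothetical construction
#     ctrl = control.RunnerControl(...)    # hypothetical construction
#     if pilot_requested:                  # hypothetical flag
#         run_pilot(cfg, ctrl)             # builds a small actives/decoys DB,
#                                          # then delegates to run_production()
#     else:
#         run_production(cfg, ctrl)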