"""
Implementation of screening a large library with an active learning scheme.
Active learning scheme:
1. Select N ligands from the library
2. Dock the selected portion of the library.
3. Train a ligand_ml model with the scores.
4. Evaluate the whole library with the generated ligand_ml model.
5. Pick N from the top M best ligands predicted by the ligand_ml model.
6. Dock the ligands picked in step 5 and repeat from step 3 until num_iter
iterations are reached.
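
A minimal sketch of one cycle (illustrative only; the helper names are
hypothetical, not functions of this module)::

    selected = select_ligands(library, n)              # steps 1 and 5
    for _ in range(num_iter):
        scores = dock(selected)                        # steps 2 and 6
        model = train_ligand_ml(scores)                # step 3
        predictions = evaluate(model, library)         # step 4
        selected = pick_top(predictions, top_m, n)     # step 5
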
Copyright Schrodinger Inc, All Rights Reserved.
"""
import argparse
import csv
import itertools
import math
import os
import pickle
import sys
from collections import OrderedDict
from rdkit import Chem
from schrodinger.active_learning import al_utils
from schrodinger.active_learning.al_utils import CSV_FORMAT
from schrodinger.active_learning.al_utils import EVAL_TASK
from schrodinger.active_learning.al_utils import MINIMAL_LIGAND_ML_TRAIN
from schrodinger.active_learning.al_utils import PILOT_TASK
from schrodinger.active_learning.al_utils import SCREEN_TASK
from schrodinger.active_learning.al_utils import SMILES_FORMAT
from schrodinger.active_learning.al_utils import TRUTH_COL
from schrodinger.active_learning.al_utils import get_file_ext
from schrodinger.active_learning.al_utils import positive_int
from schrodinger.job import jobcontrol
from schrodinger.utils import fileutils
from schrodinger.utils import log
try:
import resource
except ModuleNotFoundError:
resource = None
logger = log.get_output_logger(__file__)
LOCALHOST = "localhost"
PRED_Y = "Pred"
JOBDIR_PREFIX = "iter"
MINIMAL_TOTAL_LIGAND = 100
TASK_LIST = [SCREEN_TASK, EVAL_TASK, PILOT_TASK]
# Map translating Python types into ConfigObj types.
TYPEMAP = {
bool: 'boolean',
int: 'integer',
float: 'float',
str: 'string',
positive_int: 'integer'
}
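# Note: positive_int maps to a plain ConfigObj 'integer'; the min=1
# restriction is added separately in Option.toConfigObj().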
FINISHALL = 'FinishAll'
class Option:
"""
A class to represent "options" which may be translated into argparse
command-line arguments or an InputConfig spec for parsing input files.
This is used to support the behavior of the legacy SiteMap driver, where
every option could be specified in an input file or on the command line,
with the latter taking precedence.
"""
    def __init__(self,
*names,
dest=None,
help=None,
type=str,
metavar=None,
default=None,
action=None,
nargs=None,
choices=None,
required=False):
"""
The arguments all have the same meaning as for
argparse.ArgumentParser.add_argument(), except `min` and `max` which
are only used by ConfigObj and limit the range of allowed values for
numeric types.
"""
self.names = names
self.dest = dest
self.help = help
self.metavar = metavar
self.type = type
self.action = action
self.nargs = nargs
self.choices = choices
self.default = default
self.required = required
    def toArgparse(self, parser):
        """
        Add an option to an argument parser.
        :param parser: argument parser
        :type parser: argparse.ArgumentParser
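
        Example (illustrative)::

            parser = argparse.ArgumentParser()
            Option("-num_iter", type=int, default=3).toArgparse(parser)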
"""
if self.action is None:
parser.add_argument(*self.names,
dest=self.dest,
help=self.help,
type=self.type,
nargs=self.nargs,
choices=self.choices,
metavar=self.metavar,
required=self.required,
action=self.action,
default=self.default)
else:
parser.add_argument(*self.names,
dest=self.dest,
help=self.help,
required=self.required,
action=self.action)
    def toConfigObj(self):
"""
Return a ConfigObj validator spec for self.
:return: validation spec
:rtype: str
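
        Example (derived from TYPEMAP and the naming rule below)::

            >>> Option("-num_iter", type=int, default=3).toConfigObj()
            'NUM_ITER = integer(default=3)'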
"""
typename = TYPEMAP[self.type]
restrictions = ''
default = self.default
if self.type == str and default is not None:
default = f"'{self.default}'"
if self.type == positive_int:
restrictions += 'min=1'
if self.action == 'append':
default = 'list()'
typename += '_list'
comment = ""
if self.help:
comment = f" # {self.help}"
name = self.names[0].upper().replace('-', '')
if self.action == "store_false":
typename = "boolean"
default = True
name = self.dest.upper()
if self.action == "store_true":
typename = "boolean"
default = False
name = self.dest.upper()
if self.choices and self.type == str:
typename = 'option'
restrictions += ', '.join(f'"{s}"' for s in self.choices)
restrictions += ', ' # because default always follows
line = "{} = {}({}default={}){}".format(name, typename, restrictions,
default, comment)
return line
    def toSubparser(self, subparsers):
        """
        Create a subparser for a given task.
        :param subparsers: subparsers action created by
            argparse.ArgumentParser.add_subparsers()
        :return: argument parser
        :rtype: argparse.ArgumentParser
        """
subparse = subparsers.add_parser(*self.names, help=self.help)
return subparse
TASK_OPTIONS = [
Option(SCREEN_TASK,
help='Screen library with active learning.\
For help message type screen -h'),
Option(EVAL_TASK,
help='Evaluate library with generated ML model.\
For help message type evaluate -h'),
Option(PILOT_TASK,
help='Assess active learning workflow on the library with pilot \
mode. For help message type pilot -h')
]
SHARED_OPTIONS = [
# Hidden option: Sample ligands with largest uncertainty in the defined ratio
Option("-uncertainty_sample_ratio",
type=float,
default=0.1,
help=argparse.SUPPRESS),
Option("-random_seed",
type=int,
metavar="<random_seed_number>",
help="Random seed number for shuffling all the ligands \
and seeding ligand_ml training."),
Option("-overwrite_args",
action="store_true",
dest="overwrite_args",
help="Overwrite previous arguments, Default is False."),
Option("-force_restart",
action="store_true",
dest="force_restart",
help="Force the workflow to restart when some restarting files "
"are missing.")
]
SHARED_FILE_OPTIONS = [
Option("-infile_list_file",
metavar="<infile_path_list>",
help="A file that contains a list of infile paths."),
Option("-block_size",
metavar="<num_lig_per_block>",
type=int,
default=100000,
help="Number of ligands in each sub input ligands file. Default is "
"100,000."),
Option("-smi_index",
metavar="<smiles_column_index>",
type=int,
default=1,
help="1-based column index of ligand's SMILES. Default is 1."),
Option("-name_index",
metavar="<title_column_index>",
type=int,
default=2,
help="1-based column index of ligand's title. Default is 2."),
Option("-no_header",
action="store_false",
dest="with_header",
help="Whether the input file(s) has header in the first line."),
Option("-result_prefix",
metavar="<output_file_prefix>",
help="prefix of the .csv result files. Default is -jobname."),
Option("-remote_input_ligands",
action="store_true",
dest="remote_input_ligands",
help="Whether input ligand files are located at remote. Absolute "
"paths of input ligand files are required if this flag is "
"provided."),
Option("-restart_file",
type=str,
metavar="<restart_pkl_file>",
help=".pkl file for restarting or continuing the active learning "
"workflow."),
Option("-score_file", help=argparse.SUPPRESS),
Option("-local_args_file", help=argparse.SUPPRESS)
]
SHARED_JOB_OPTIONS = [
Option("-jobname",
metavar="<jobname>",
help="Job name of the active learning workflow run."),
Option("-stop_after",
metavar="<stop_workflow_after_stage>",
type=str,
help="Terminate the workflow after the specified stage finished. "
f"Specify '{FINISHALL}' to run all the remaining stages."),
Option("-max_ml_eval_cpu",
metavar="<maximum_ml_evaluation_cpu>",
type=positive_int,
help="Allowed maximum number of CPU for machine learning evaluation "
"subjobs.")
]
SCREEN_OPTIONS = [
Option("-selection_rule",
metavar="<training_ligands_selection_protocol>",
type=str,
default="diversity",
choices=["random", "diversity", "most_uncertain"],
help="Rule of selecting training set ligands. Supported selection "
"rule: [random, diversity, most_uncertain], Default is diversity."),
Option("-num_iter",
metavar="<num_of_iteration>",
type=int,
default=3,
help="Number of active learning iteration. Default is 3."),
Option("-train_time",
metavar="<hours>",
type=float,
default=4.0,
help="Floating point time limit for training deep learning models.\
Default is 4 hours."),
Option("-train_host",
metavar="<ligand_ml_training_host>",
help="ligand_ml training host name. Default is the same as -HOST."),
Option("-num_train_core",
type=int,
default=1,
metavar="<ligand_ml_training_core>",
help="Number of cpu or gpu for ligand_ml training. Default is 1."),
Option("-chosen_models",
metavar="<ligand_ml_chosen_models>",
type=str,
help="Type of sub-models to consider in ligand_ml. chosen_models "
"should be contained in a string and separated by space."
"Default is all available models in ligand_ml."),
Option("-pilot_score_file",
type=str,
metavar="<score_csv_file>",
help="Pilot ligands score .csv file generated by pilot mode."),
Option("-score_failed_ratio", type=float, default=1, help=argparse.SUPPRESS)
]
EVAL_OPTIONS = [
Option("-model",
metavar="<ml_model.qzip>",
help="Generated ligand_ml .qzip model.")
]
PILOT_OPTIONS = [
Option("-selection_rule",
metavar="<training_ligands_selection_protocol>",
type=str,
default="diversity",
choices=["random", "diversity", "most_uncertain"],
help="Rule of selecting training set ligands. Supported selection "
"rule: [random, diversity, most_uncertain] Default is diversity."),
Option("-train_time",
metavar="<hours>",
type=float,
default=4,
help="Floating point time limit for training pilot deep learning "
"models. Default is 4 hours."),
Option("-chosen_models",
metavar="<ligand_ml_chosen_models>",
type=str,
help="Type of sub-models to consider in ligand_ml. chosen_models "
"should be listed in a string and separated by space."
"Default is all available models in ligand_ml."),
Option("-train_host",
metavar="<ligand_ml_training_host>",
help="ligand_ml training host name. Default is the same as -HOST."),
Option("-num_train_core",
type=int,
default=1,
metavar="<ligand_ml_training_core>",
help="Number of cpu or gpu for ligand_ml training. Default is 1."),
]
SHARED_MUTUALLY_EXCLUSIVE_OPTIONS = [
[
Option(
"-keep",
metavar="<num_returned_ligand>",
type=int,
default=10_000_000,
help="Number of best ligands to be returned. Default is 10,000,000."
),
Option("-keep_fraction",
metavar="<fraction_of_returned_ligand>",
type=float,
help="Fraction of the ligands to be returned."),
],
[
Option(
"-num_rescore_ligand",
metavar="<num_rescore_ligand>",
type=int,
default=1_000_000,
help="Number of the best ligands to do rescore with Glide. Default "
"is 1,000,000."),
Option("-rescore_ligand_fraction",
metavar="<fraction_of_rescore_ligand>",
type=float,
help="Fraction of the ligands to be rescored."),
]
]
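# Each inner list above holds options intended to be mutually exclusive. A
# minimal registration sketch (illustrative; the actual wiring lives in the
# driver that consumes these tables):
#     for pair in SHARED_MUTUALLY_EXCLUSIVE_OPTIONS:
#         group = parser.add_mutually_exclusive_group()
#         for option in pair:
#             option.toArgparse(group)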
def get_workflow_node_names(task, num_iter, use_known_score, run_rescore_ligand,
                            al_node_supplier):
    """
    Return a list of stages needed to complete the workflow based on the
    task type, the number of iterations, whether the score is known, and
    whether to run the rescore stage.
:param task: workflow task type
:type task: str in [SCREEN_TASK, PILOT_TASK or EVAL_TASK]
:param num_iter: number of iterations
:type num_iter: int
:param use_known_score: Use known scores in score_file to obtain the score.
:type use_known_score: bool
:param run_rescore_ligand: run rescore stage for ligand.
:type run_rescore_ligand: bool
:param al_node_supplier: Supplier of active learning nodes
:type al_node_supplier: ActiveLearningNodeSupplier
:return: list of names of stages needed to complete the workflow
:rtype: list(str)
"""
candidate_node_names = []
if task == PILOT_TASK:
current_iter = 0
node_class = al_node_supplier.pilot_score_node
candidate_node_names.append(node_class.getName(current_iter))
if task in [SCREEN_TASK, PILOT_TASK]:
node_class_list = ActiveLearningJob.getNodeClasses(
use_known_score, al_node_supplier)
for current_iter in range(1, num_iter + 1):
for node_class in node_class_list:
candidate_node_names.append(node_class.getName(current_iter))
if task == EVAL_TASK:
current_iter = 0
node_class = al_node_supplier.ligand_ml_eval_node
candidate_node_names.append(node_class.getName(current_iter))
if run_rescore_ligand:
node_class = al_node_supplier.rescore_node
candidate_node_names.append(node_class.getName(current_iter))
return candidate_node_names
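# Illustrative: for a screen task with num_iter=2, no known scores and no
# rescore stage, this returns the prepare/score/train/eval node names for
# iteration 1 followed by the same four for iteration 2; the exact strings
# come from each node class's getName().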
def validate_stop_after(stop_after_node, task, num_iter, use_known_score,
                        run_rescore_ligand, restart_file, al_node_supplier):
    """
    Check whether the node name the user specified in -stop_after is valid.
    :param stop_after_node: name of the node after which the workflow will
        exit once that node has finished.
    :type stop_after_node: str in [ActiveLearningNode name] or 'FinishAll'
:param task: workflow task type
:type task: str in [SCREEN_TASK, PILOT_TASK or EVAL_TASK]
:param num_iter: number of iterations
:type num_iter: int
    :param use_known_score: Use known scores in score_file to obtain
        the score.
    :type use_known_score: bool
    :param run_rescore_ligand: run rescore stage for ligands.
    :type run_rescore_ligand: bool
    :param restart_file: filename of the AL restart .pkl file, or None
    :type restart_file: str or None
:param al_node_supplier: Supplier of active learning nodes
:type al_node_supplier: ActiveLearningNodeSupplier
:return: error message if validation failed; None if it passed
:rtype: str or None
"""
    finished_nodes = ActiveLearningJob.LoadPreviousNodes(restart_file) \
        if restart_file else {}
    candidate_node_names = [
        node_name for node_name in get_workflow_node_names(
            task, num_iter, use_known_score, run_rescore_ligand,
            al_node_supplier) if node_name not in finished_nodes
    ] + [FINISHALL]
if candidate_node_names == [FINISHALL]:
error_msg = "Error: All stages have finished. No more stage to run."
return error_msg
if stop_after_node not in candidate_node_names:
error_msg = f"Error: Stop after stage {stop_after_node} is not in " \
f"the stages to run. Please choose from" \
f" {candidate_node_names}."
return error_msg
    stages_to_run = candidate_node_names[
        :candidate_node_names.index(stop_after_node) + 1]
    stages_to_run = [
        node_name for node_name in stages_to_run if node_name != FINISHALL
    ]
left_stages = [
node_name for node_name in candidate_node_names
if node_name not in stages_to_run and node_name != FINISHALL
]
logger.info(f"Stages in current job: {stages_to_run}.\n"
f"Stages not in current job: {left_stages}.")
def count_ligands(file_list, with_header=True):
"""
Count the number of ligands in all the files by counting the total
number of lines. We assume each line contains a SMILES string.
:param file_list: list of input file paths.
:type file_list: list(str)
    :param with_header: Whether the input files have a header.
:type with_header: bool
:return: Number of ligands in all the input files.
:rtype: int
"""
total_line = 0
for file in file_list:
total_line += fileutils.count_lines(file)
if with_header:
total_line -= 1
return total_line
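# Illustrative usage, assuming two CSV blocks that each begin with a header:
#     total = count_ligands(["block_1.csv", "block_2.csv"], with_header=True)
#     # total == (total lines in both files) - 2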
class ActiveLearningJob:
    def __init__(self, args, al_node_supplier):
        """
        Initialize the ActiveLearningJob from the command-line arguments.
        :param args: argument namespace with command line options
        :type args: argparse.Namespace
        :param al_node_supplier: Supplier of active learning nodes
        :type al_node_supplier: ActiveLearningNodeSupplier
        """
self.args = args
self.al_node_supplier = al_node_supplier
self.jobname = self.args.jobname
self.subjob_prefix = self.jobname + "_sub"
self.sub_infile_list = []
self.total_ligand_num = None
self.sub_csv_header = None
self.backend = None
self.known_title_to_score = None
self.nodes_to_run = None
self.finished_nodes = None
self.pilot_score_file = None
self.optional_restart_files = []
self.restart_dict = {}
self.restart_file = "{}_restart.pkl".format(self.jobname)
self.discard_cutoff = 10
self.ascending = True
    @staticmethod
    def LoadPreviousNodes(restart_file):
        """
        Load nodes that were finished in a previous job.
        :param restart_file: filename of the AL .pkl restart file
        :type restart_file: str
        :return: Nodes that were finished in the previous job.
:rtype: OrderedDict that maps node name to node instance.
"""
finished_nodes = OrderedDict()
if restart_file:
with open(restart_file, 'rb') as f:
finished_nodes = pickle.load(f)["finished_nodes"]
return finished_nodes
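    # The restart .pkl stores a dict with keys "remote_args", "local_args",
    # "finished_nodes", "restart_files" and "optional_restart_files" (see
    # runNodes() below); only "finished_nodes" is read here.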
    @staticmethod
def getNodeClasses(use_known_score, al_node_supplier):
"""
Return a list of node classes to run based on the job type.
:param use_known_score: Use known scores in score_file to obtain
the score.
:type use_known_score: bool
:param al_node_supplier: Supplier of active learning nodes
:type al_node_supplier: ActiveLearningNodeSupplier
:return: a list of ActiveLearningNode subclass
:rtype: list
"""
if use_known_score:
score_node = al_node_supplier.known_score_provider_node
else:
score_node = al_node_supplier.calculate_score_node
node_class_list = [
al_node_supplier.prepare_smi_node, score_node,
al_node_supplier.ligand_ml_train_node,
al_node_supplier.ligand_ml_eval_node
]
return node_class_list
    def LoadOptionalRestartFiles(self):
        """
        Load the optional restart files that may be needed to restart the
        running node.
        :return: list of filenames
        :rtype: list(str)
        """
optional_restart_files = []
if self.args.restart_file:
with open(self.args.restart_file, 'rb') as f:
optional_restart_files = pickle.load(f).get(
"optional_restart_files")
return optional_restart_files
@property
def scored_csv_file_list(self):
"""
Get all the .csv files that contain scored ligands from ScoreProviderNode.
        :return: list of .csv files containing scored ligands.
:rtype: list(str)
"""
csv_list = []
if self.args.task == SCREEN_TASK and self.args.pilot_score_file:
csv_list.append(self.pilot_score_file)
for node_name, node in self.finished_nodes.items():
if isinstance(node, self.al_node_supplier.score_provider_node):
if not os.path.isfile(node.score_csv_file):
logger.warning(f"Warning: Score file for ML training "
f"{node.score_csv_file} does not exist.")
continue
csv_list.append(node.score_csv_file)
return csv_list
@property
def restart_files(self):
"""
Get all the necessary files for restarting the workflow from finished nodes.
:return: a set of files for restarting.
:rtype: set(str)
"""
restart_files = set()
for node in self.finished_nodes.values():
for node_restart in node.restart_files:
restart_files.add(node_restart)
return restart_files
    def getPilotScoreFile(self):
        """
        Reorder the columns in the pilot ligand score file for use as
        machine learning model training input.
        :return: name of the reordered .csv file.
        :rtype: str
"""
import pandas as pd
if self.args.task != SCREEN_TASK or self.args.pilot_score_file is None:
return None
pilot_score_file_reordered = os.path.splitext(
self.args.pilot_score_file)[0] + "_reordered.csv"
score_df = pd.read_csv(self.args.pilot_score_file)
        # In PilotScoreNode the pilot score file follows the column order:
        # title, score, SMILES
score_df.columns = ["Title", TRUTH_COL, "SMILES"]
score_df.to_csv(pilot_score_file_reordered,
columns=["SMILES", TRUTH_COL, "Title"],
index=False)
return pilot_score_file_reordered
    def checkOSFileLimit(self):
"""
Check the system file descriptors limit.
"""
if resource is None:
return
padding = 100
soft_limit, _ = resource.getrlimit(resource.RLIMIT_NOFILE)
target_limit = math.ceil(self.total_ligand_num / self.args.block_size)
if target_limit + padding > soft_limit:
logger.error(
f"Error: Current job settings will result in {target_limit} "
f"open files. We estimate that more than {soft_limit-padding} "
f"open files will likely lead to job failure. Please increase "
f"-block_size so that fewer files are opened by the job or "
f"increase the OS file descriptor limit")
sys.exit(1)
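    # Illustrative arithmetic: 50,000,000 ligands with -block_size 100000
    # gives target_limit = 500; with the padding of 100, the check above
    # requires a soft RLIMIT_NOFILE of at least 600.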
    def getNodesToRun(self):
        """
        Return nodes to run and finished nodes for the current active
        learning job.
        :return: (nodes to run, finished nodes)
        :rtype: (OrderedDict, OrderedDict)
        """
finished_nodes = ActiveLearningJob.LoadPreviousNodes(
self.args.restart_file)
nodes_to_run = OrderedDict()
if self.args.task == PILOT_TASK:
iter_num = 0
job_dir = "{}_{}_pilot".format(self.jobname, JOBDIR_PREFIX)
fileutils.mkdir_p(job_dir)
pilot_score_node = self.al_node_supplier.pilot_score_node(
self.args, iter_num, self.jobname, job_dir)
if pilot_score_node.node_name not in finished_nodes:
nodes_to_run[pilot_score_node.node_name] = pilot_score_node
if self.args.task in [SCREEN_TASK, PILOT_TASK]:
node_class_list = ActiveLearningJob.getNodeClasses(
bool(self.args.score_file), self.al_node_supplier)
for iter_num in range(1, self.args.num_iter + 1):
current_iter = iter_num
job_dir = "{}_{}_{}".format(self.jobname, JOBDIR_PREFIX,
iter_num)
fileutils.mkdir_p(job_dir)
for node_class in node_class_list:
node = node_class(self.args, iter_num, self.jobname,
job_dir)
if node.node_name not in finished_nodes:
nodes_to_run[node.node_name] = node
if self.args.task == EVAL_TASK:
current_iter = 0
node = self.al_node_supplier.ligand_ml_eval_node(
self.args, current_iter, self.jobname, ".")
if node.node_name not in finished_nodes:
nodes_to_run[node.node_name] = node
if self.args.num_rescore_ligand:
node = self.al_node_supplier.rescore_node(self.args, current_iter,
self.jobname, ".")
if node.node_name not in finished_nodes:
nodes_to_run[node.node_name] = node
return nodes_to_run, finished_nodes
    def getRestartNode(self):
        """
        Get the node for restarting the workflow.
        :return: last finished node that is not a rescore node
        :rtype: ActiveLearningNode
        """
for node in list(self.finished_nodes.values())[::-1]:
if not isinstance(node, self.al_node_supplier.rescore_node):
return node
    def getLocalArgs(self):
        """
        Return the arguments as parsed on the local machine. Under job
        control they are loaded from the pickled local_args_file.
        :return: arguments on local machine.
        :rtype: argparse.Namespace
        """
if jobcontrol.get_backend():
with open(self.args.local_args_file, "rb") as f:
local_args = pickle.load(f)
else:
local_args = self.args
return local_args
    def runNodes(self):
"""
Run all the ActiveLearningNode instances in self.nodes_to_run.
"""
local_args = self.getLocalArgs()
self.restart_dict = {
"remote_args": self.args,
"local_args": local_args,
"finished_nodes": self.finished_nodes,
"restart_files": self.restart_files,
"optional_restart_files": self.optional_restart_files
}
with open(self.restart_file, 'wb') as f:
pickle.dump(self.restart_dict, f, 0)
if self.backend:
self.backend.addOutputFile(self.restart_file)
inputs = dict(self.getInitialInputs())
try:
for node_name, node in self.nodes_to_run.items():
logger.info("Running node {}".format(node_name))
inputs.update({"active_learning_job": self})
node.runNode(**inputs)
inputs = dict(node.input_for_next_node)
self.finished_nodes[node.node_name] = node
self.optional_restart_files = node.optional_restart_files
with open(self.restart_file, 'wb') as f:
self.restart_dict["restart_files"] = self.restart_files
self.restart_dict["optional_restart_files"] = \
self.optional_restart_files
pickle.dump(self.restart_dict, f, 0)
al_utils.add_output_file(self.restart_file)
if self.args.stop_after is not None and node_name \
== self.args.stop_after:
logger.info(f"Last requested stage"
f" {self.args.stop_after} finished. ")
break
finally:
combined_logfile = self.jobname + "_subjobs.log"
subjob_logfile_list = [
node.log_file
for node in self.nodes_to_run.values()
if node.log_file and os.path.isfile(node.log_file)
]
al_utils.concatenate_logs(combined_logfile, subjob_logfile_list,
logger)
al_utils.add_output_file(combined_logfile)
def read_paths_listed_in_file(old_paths, paths_list_file):
    """
    Add the paths specified in the paths_list_file to old_paths. Blank lines
    and lines starting with "#" are skipped.
    :param old_paths: None or list of original paths
    :type old_paths: list
    :param paths_list_file: path of the file that contains paths to be added
    :type paths_list_file: str
    :return: deduplicated list of paths
:rtype: list(str)
"""
screen = old_paths.copy() if old_paths else []
if paths_list_file:
with open(paths_list_file, 'r') as fh:
screen += [
x.strip() for x in fh if not x.startswith("#") and x.strip()
]
return list(set(screen))
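# Illustrative usage with a hypothetical paths file:
#     infiles = read_paths_listed_in_file(["a.smi"], "more_paths.txt")
# Note: duplicates are dropped via set(), so the returned order is arbitrary.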
def restart_args_handler(args):
"""
Load the previous arguments stored in args.restart_file.
:param args: argument namespace with command line options
:type args: argparse.Namespace
:return: updated argument namespace, argument namespace of previous job or
None
:rtype: argument namespace, argument namespace or None
"""
if args.restart:
restart_file = args.jobname + "_restart.pkl"
if not args.restart_file:
if not os.path.isfile(restart_file):
logger.error(f"Error: Can not find restart .pkl file"
f" {restart_file}.")
sys.exit(1)
args.restart_file = restart_file
if args.restart_file:
with open(args.restart_file, 'rb') as f:
restart_dict = pickle.load(f)
restart_input_files = restart_dict["restart_files"]
optional_restart_files = restart_dict.get("optional_restart_files",
[])
args.restart_input_files = restart_input_files
args.optional_restart_files = optional_restart_files
if not args.overwrite_args:
if jobcontrol.get_backend():
prev_args = restart_dict["remote_args"]
else:
prev_args = restart_dict["local_args"]
prev_args.restart_file = args.restart_file
prev_args.restart_input_files = restart_input_files
prev_args.optional_restart_files = optional_restart_files
# Always overwrite args.stop_after and args.train_host if they
# were specified in the restart run.
if args.stop_after is not None:
prev_args.stop_after = args.stop_after
if args.task in [SCREEN_TASK, PILOT_TASK] and args.train_host\
is not None:
prev_args.train_host = args.train_host
for key, value in args.__dict__.items():
if not hasattr(prev_args, key):
setattr(prev_args, key, value)
logger.info("Restoring previous workflow...")
logger.info("Previous arguments:")
for key, value in prev_args.__dict__.items():
logger.info("argument: {:<30} value: {}".format(
key, value))
return args, prev_args
return args, None
def common_parse_args(args):
    """
    Parse command-line arguments and fill in derived defaults.
:param args: argument namespace with command line options
:type args: argparse.Namespace
:return: argument namespace with command line options
:rtype: argparse.Namespace
"""
# Append files listed in infile_list_file to infile_list
if args.infile_list_file:
args.infile_list = read_paths_listed_in_file(args.infile,
args.infile_list_file)
else:
args.infile_list = args.infile
# Set default result file prefix
if not args.result_prefix:
args.result_prefix = args.jobname
if args.keep_fraction is not None:
args.keep = None
if args.rescore_ligand_fraction is not None:
args.num_rescore_ligand = None
host_list = jobcontrol.get_backend_host_list()
if not host_list:
host_list = [(LOCALHOST, 1)]
if args.task in [SCREEN_TASK, PILOT_TASK]:
if not args.train_host:
args.train_host = host_list[0][0]
if args.task == EVAL_TASK:
args.outfile = "{}_pred.csv".format(args.result_prefix)
if args.task == PILOT_TASK:
args.num_iter = 1
args.keep = args.pilot_size
args.num_rescore_ligand = 0
args.score_file = f"{args.jobname}_pilot_score.csv"
args.pilot_ligands_csv = f"{args.jobname}_pilot_ligands.csv"
args.mix_score_failed = False # To be changed in specific application
args.score_failed_ratio = 1 # To be changed in specific application
return args
def common_validate_args(args):
    """
    Validate command-line arguments.
:param args: argument namespace with command line options
:type args: argparse.Namespace
:return: (validation outcome, error message)
:rtype: (bool, str)
"""
if not args.infile_list:
msg = "Error: input ligands file is not specified"
return False, msg
msg = validate_input_files(args.infile_list, args.remote_input_ligands)
if msg:
return False, msg
if not args.remote_input_ligands:
msg = validate_input_smiles(args.infile_list, args.smi_index,
args.name_index, args.with_header)
if msg:
return False, msg
if args.keep is not None and args.keep < 1:
msg = "Error: keep: {} is smaller than 1".format(args.keep)
return False, msg
if args.keep_fraction is not None:
if not (0.0 < args.keep_fraction <= 1.0):
msg = "Error: keep_fraction: {} is <= 0 or > 1".format(
args.keep_fraction)
return False, msg
if args.num_rescore_ligand is not None and args.num_rescore_ligand < 0:
msg = "Error: number of rescore ligands: {} is smaller than " \
"0".format(args.num_rescore_ligand)
return False, msg
if args.rescore_ligand_fraction is not None:
if args.rescore_ligand_fraction > 1 or args.rescore_ligand_fraction < 0:
msg = "Error: rescore_ligand_fraction: {} is < 0 or > 1".format(
args.rescore_ligand_fraction)
return False, msg
if args.task in [SCREEN_TASK, PILOT_TASK]:
if args.train_time <= 0:
msg = "Error: train time: {} is smaller than 0".format(
args.train_time)
return False, msg
if args.train_size < MINIMAL_LIGAND_ML_TRAIN:
msg = "Error: train size ({}) is smaller than the minimal " \
"training size requirement of ligand_ml ({}) ".format(
args.train_size, MINIMAL_LIGAND_ML_TRAIN)
return False, msg
if args.num_iter < 1:
msg = "Error: iteration: {} is smaller than 1".format(args.num_iter)
return False, msg
if args.task == EVAL_TASK:
if args.model is None:
msg = "ML model file is not specified."
return False, msg
if not os.path.isfile(args.model):
msg = f"ML model file {args.model} does not exist."
return False, msg
return True, None
def common_get_job_spec_from_args(args, jsb):
    """
    Register the input and output files implied by the arguments with the
    job specification builder.
    :param args: argument namespace with command line options
    :type args: argparse.Namespace
    :param jsb: job specification builder
    """
if args.input_file:
al_utils.add_input_file(jsb, args.input_file)
if args.restart_file:
al_utils.add_input_file(jsb, args.restart_file)
for file in args.restart_input_files:
if os.path.isfile(file):
al_utils.add_input_file(jsb, file)
else:
if args.force_restart:
logger.warning(f"Warning: restarting file: {file} does "
f"not exist.")
else:
logger.error(f"Error: restarting file: {file} does not "
f"exist.")
sys.exit(1)
for file in args.optional_restart_files:
if os.path.isfile(file):
al_utils.add_input_file(jsb, file)
if args.infile_list_file:
al_utils.add_input_file(jsb, args.infile_list_file)
if not args.remote_input_ligands:
al_utils.add_input_file(jsb, *args.infile_list)
if args.task == SCREEN_TASK and args.score_file:
al_utils.add_input_file(jsb, args.score_file)
if args.task == SCREEN_TASK and args.pilot_score_file:
al_utils.add_input_file(jsb, args.pilot_score_file)
# This is only used for restarting the pilot job.
if args.task == PILOT_TASK and os.path.isfile(args.score_file):
al_utils.add_input_file(jsb, args.score_file)
if args.task == EVAL_TASK:
al_utils.add_input_file(jsb, args.model)
jsb.setOutputFile(args.outfile)