import csv
import heapq
import itertools
import os
import pickle
import random
import sys
import more_itertools
from schrodinger.active_learning import al_utils
from schrodinger.active_learning.al_utils import EVAL_TASK
from schrodinger.active_learning.al_utils import MINIMAL_LIGAND_ML_TRAIN
from schrodinger.active_learning.al_utils import PILOT_TASK
from schrodinger.active_learning.al_utils import TRUTH_COL
from schrodinger.job import jobcontrol
from schrodinger.job import queue
from schrodinger.utils import fileutils
from schrodinger.utils import log
logger = log.get_output_logger(__file__)
UNCERTAINTY = "uncertainty"
SCORE = "score"
def estimate_time_cost(num_ligands,
num_iter,
train_size,
train_time,
num_score_license,
num_autoqsar_license,
available_cpu=None,
score_per_ligand_cost=20,
autoqsar_per_ligand_cost=0.02,
num_rescore_ligand=0,
multiplier=1.0,
application=''):
"""
    Roughly estimate the time cost of an active learning job based on the
    inputs and the number of available licenses.
:param num_ligands: total number of ligands in the library.
:type num_ligands: int
:param num_iter: number of active learning iterations.
:type num_iter: int
:param train_size: Ligand_ML training size per iteration.
:type train_size: int
:param train_time: Ligand_ML training time per iteration in hours.
:type train_time: float
:param num_score_license: total number of the application licenses
:type num_score_license: int
:param num_autoqsar_license: total number of AutoQSAR licenses
:type num_autoqsar_license: int
:param available_cpu: number of available CPU
:type available_cpu: int
    :param score_per_ligand_cost: estimated scoring time cost per ligand in
        seconds.
    :type score_per_ligand_cost: float
    :param autoqsar_per_ligand_cost: estimated Ligand_ML prediction time cost
        per ligand in seconds.
    :type autoqsar_per_ligand_cost: float
    :param num_rescore_ligand: number of ligands to be rescored.
    :type num_rescore_ligand: int
    :param multiplier: estimated expansion number per ligand.
    :type multiplier: float
:param application: name of the application that provides score
:type application: str
    :return: estimated time cost in hours
:rtype: float
"""
if num_score_license < 1:
raise ValueError(f"No available {application} license.")
if num_autoqsar_license < 1:
raise ValueError("No available DeepChem/AutoQSAR license.")
estimate_train_capacity_per_hour = 10_000
estimate_max_train_hour = 24.0
if available_cpu is not None:
num_score_job = min(available_cpu, num_score_license)
num_autoqsar_job = min(available_cpu, num_autoqsar_license)
else:
num_score_job = num_score_license
num_autoqsar_job = num_autoqsar_license
def time_per_iteration(current_iter_num):
current_train_size = train_size * current_iter_num
minimal_train_time = min(
estimate_max_train_hour,
current_train_size / estimate_train_capacity_per_hour)
training_time = max(minimal_train_time, train_time)
score_time = (multiplier * train_size * score_per_ligand_cost / 60 /
60 / num_score_job)
eval_time = (num_ligands * autoqsar_per_ligand_cost / 60 / 60 /
num_autoqsar_job)
return training_time + score_time + eval_time
rescore_time = (multiplier * num_rescore_ligand * score_per_ligand_cost /
60 / 60 / num_score_job)
total_time_cost = sum(
time_per_iteration(current_iter_num)
for current_iter_num in range(1, num_iter + 1)) + rescore_time
return total_time_cost
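# Illustrative call to estimate_time_cost (all numbers below are made up):
#
#     hours = estimate_time_cost(num_ligands=1_000_000, num_iter=3,
#                                train_size=10_000, train_time=4.0,
#                                num_score_license=100,
#                                num_autoqsar_license=8,
#                                application="Glide")
#
# Each iteration contributes the training time (never less than the size-based
# minimum, capped at 24 hours), the scoring time for the newly selected
# training ligands, and the ML evaluation time over the whole library; any
# rescoring time is added once at the end.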
def get_jobdj(host_list=None):
"""
Return JobDJ with specified host list
:param host_list: A list of (<hostname>, <maximum_concurrent_subjobs>)
:type host_list: [(str, int)] or None
:return: JobDJ with specific settings.
:rtype: queue.JobDJ object
"""
return queue.JobDJ(hosts=host_list,
default_max_retries=1,
verbosity="normal",
max_failures=queue.NOLIMIT)
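# Typical usage, mirroring how the nodes below drive JobDJ: queue commands with
# addJob() and block until they finish with run(). The worker script name here
# is only a placeholder.
#
#     jobdj = get_jobdj(host_list=[("localhost", 4)])
#     jobdj.addJob(["run", "some_worker.py"], command_dir=".")
#     jobdj.run()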
def get_top_ligands_from_csv_list(csv_list, output_csv, num_ligands):
"""
Get the top ligands from a list of .csv files. Write the selected
ligands to output csv file.
:param csv_list: list of .csv files containing the ligands.
:type csv_list: list(str)
:param output_csv: name of output .csv file.
:type output_csv: str
:param num_ligands: number of ligands to select.
:type num_ligands: int
"""
count = 0
with open(output_csv, "wb") as f_out:
for ligands_csv_index, ligands_csv in enumerate(csv_list):
with open(ligands_csv, "rb") as f_in:
for line_index, line in enumerate(f_in):
if line_index == 0:
if ligands_csv_index == 0:
f_out.write(line)
continue
if count < num_ligands:
f_out.write(line)
else:
break
count += 1
            if count >= num_ligands:
                break
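# Note: get_top_ligands_from_csv_list assumes the input .csv files already list
# ligands in the desired order; only the header of the first file is copied,
# and data lines are written verbatim until num_ligands rows have been kept.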
class ActiveLearningNode:
    def __init__(self, iter_num=1, job_name="active_learning", job_dir="."):
"""
Initialize node for active learning workflow.
:param iter_num: current active learning iteration number.
:type iter_num: int
:param job_name: active learning job name.
:type job_name: str
:param job_dir: directory of where the jobs in the node will run.
:type job_dir: str
"""
self.iter_num = iter_num
self.job_dir = job_dir
self.input_for_next_node = None
self.node_name = self.getName(self.iter_num)
self.job_name = job_name + "_" + self.node_name
self.restart_files = []
self.optional_restart_files = []
self.log_file = None
self.time_cost = (0, 0, 0) # (hours, minutes, seconds)
    @classmethod
def getName(cls, iter_num):
return f"{cls.__name__}_iter_{iter_num}"
    def addOptionalRestartFiles(self, active_learning_job):
"""
Add node's optional restart file(s) to driver's restart dict. Dump
the restart dict to the restart .pkl file.
:param active_learning_job: current AL driver
:type active_learning_job: ActiveLearningJob instance
"""
al_utils.add_output_file(*self.optional_restart_files)
with open(active_learning_job.restart_file, 'wb') as f:
active_learning_job.restart_dict["optional_restart_files"] = \
self.optional_restart_files
pickle.dump(active_learning_job.restart_dict, f, 0)
al_utils.add_output_file(active_learning_job.restart_file)
class PrepareSmilesNode(ActiveLearningNode):
    def __init__(self, args, iter_num, job_name, job_dir):
"""
Initialize node for selecting ligands (SMILES) to be scored by ScoreProviderNode.
"""
super().__init__(iter_num, job_name, job_dir)
self.batch_size = args.train_size
self.random_seed = args.random_seed
self.csv_list = None
if args.selection_rule == "random":
self.select = self.randomSelect
elif args.selection_rule == "diversity":
self.select = self.diversitySelect
elif args.selection_rule == "most_uncertain":
            # We do random selection for the first iteration since we do not
            # have uncertainty values yet.
if iter_num == 1:
self.select = self.randomSelect
else:
self.select = self.uncertaintySelect
else:
raise ValueError(f"Unknown selection rule"
f" {args.selection_rule}")
    @staticmethod
def readScoredLigands(scored_csv_file_list):
"""
Read the ligands that were already scored by ScoreProviderNode.
:param scored_csv_file_list: list of ligand_ml training .csv files.
:type scored_csv_file_list: list(str)
:return: set of titles of the scored ligands.
:rtype: set(str)
"""
scored_ligands = set()
for csv_file in scored_csv_file_list:
reader = al_utils.my_csv_reader(csv_file)
# In writeScoreCsv we put title in the third column.
scored_ligands.update({x[2] for x in reader})
return scored_ligands
    def checkOutcome(self, smi_file):
"""
Validate the generated SMILES file.
:param smi_file: name of SMILES file to be validated.
:type smi_file: str
"""
if not os.path.isfile(smi_file):
logger.error(f"Error: Failed to generate SMILES file {smi_file} "
f"for running iteration {self.iter_num} scoring job")
sys.exit(1)
num_ligands = fileutils.count_lines(smi_file)
if num_ligands == 0:
logger.error(f"Error: Failed to select any ligands "
f"iteration {self.iter_num} training.")
sys.exit(1)
if self.iter_num == 1 and num_ligands < MINIMAL_LIGAND_ML_TRAIN:
logger.error(f"Error: Only selected {num_ligands} ligands "
f"for training. ligand_ml requires at least "
f"{MINIMAL_LIGAND_ML_TRAIN} training ligands.")
sys.exit(1)
    @al_utils.node_run_timer
def runNode(self,
csv_list,
active_learning_job,
smi_file_name=None,
**kwargs):
"""
Select ligands to be scored.
:param csv_list: list of csv files that contain candidate ligands.
:type csv_list: list(str)
:param active_learning_job: current active learning job.
:type active_learning_job: ActiveLearningJob instance.
:param smi_file_name: SMILES file name that contains selected ligands.
:type smi_file_name: str
"""
self.csv_list = csv_list
scored_csv_file_list = active_learning_job.scored_csv_file_list
if smi_file_name is None:
smi_file_name = os.path.join(self.job_dir, self.job_name + ".smi")
        # Select from the whole library in the first round.
if self.iter_num == 1:
self.select(smi_file_name,
scored_csv_file_list,
sample_size=self.batch_size,
sort=False)
        # Select from the top ligands predicted by the ML model generated
        # in the previous iteration.
else:
y_index = None
if self.select == self.uncertaintySelect:
with open(self.csv_list[0], "r", newline='') as f:
reader = csv.reader(f)
header = next(reader)
y_index = header.index(UNCERTAINTY)
self.select(smi_file_name,
scored_csv_file_list,
sample_size=self.batch_size,
y_index=y_index,
sort=True)
self.checkOutcome(smi_file_name)
al_utils.add_output_file(smi_file_name)
self.restart_files.append(smi_file_name)
self.input_for_next_node = {"smi_file_name": smi_file_name}
    def uncertaintySelect(self,
smi_file_name,
scored_csv_file_list,
sample_size,
y_index=None,
**kwargs):
"""
        Select the ligands with the largest uncertainty from the sorted
        ligand_ml .csv output, skipping ligands that were already scored.
:param smi_file_name: SMILES file name that contains selected ligands.
:type smi_file_name: str
        :param scored_csv_file_list: list of ligand_ml training .csv files.
:type scored_csv_file_list: list(str)
:param sample_size: number of ligands to be sampled.
:type sample_size: int
:param y_index: column index of values to be sorted.
:type y_index: int
"""
# In the random_split(), we always put SMILES and name as first column
# and second column in the sub .csv files.
smi_index = 0
name_index = 1
scored_ligands = self.readScoredLigands(scored_csv_file_list)
reader_list = [
al_utils.my_csv_reader(filename) for filename in self.csv_list
]
smi_writer = open(smi_file_name, "w")
selected_smi = 0
# Select ligands based on y_index
merged = heapq.merge(*reader_list,
key=lambda x: float(x[y_index]),
reverse=True)
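        # heapq.merge only yields a globally sorted stream if every input .csv
        # is itself already sorted by the same key (here: uncertainty,
        # descending); it never loads the full files into memory.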
for result in merged:
if result[name_index] not in scored_ligands:
smi_writer.write("{} {}\n".format(result[smi_index],
result[name_index]))
selected_smi += 1
if selected_smi >= sample_size:
break
smi_writer.close()
    def randomSelect(self,
smi_file_name,
scored_csv_file_list,
sample_size,
sort=True,
**kwargs):
"""
Select sample_size random ligands from input csv file(s).
:param smi_file_name: SMILES file name that contains selected ligands.
:type smi_file_name: str
        :param scored_csv_file_list: list of ligand_ml training .csv files.
:type scored_csv_file_list: list(str)
:param sample_size: number of ligands to be sampled.
:type sample_size: int
:param sort: Whether the csv files were sorted or initial inputs.
:type sort: bool
"""
smi_index = 0
name_index = 1
scored_ligands = self.readScoredLigands(scored_csv_file_list)
reader_list = [
al_utils.my_csv_reader(filename) for filename in self.csv_list
]
smi_writer = open(smi_file_name, "w")
# Ligands were sorted, so we need to sample randomly.
if sort:
rand = random.Random(self.random_seed)
selected_ligands = []
current_count = 0
# reservoir sampling
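            # Invariant: after n unscored ligands have been seen, each of them
            # has an equal sample_size / n chance of being kept.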
for reader in reader_list:
for result in reader:
if result[name_index] in scored_ligands:
continue
current_count += 1
if current_count <= sample_size:
selected_ligands.append(
(result[smi_index], result[name_index]))
else:
                        # Standard reservoir sampling: replace a kept ligand
                        # with probability sample_size / current_count.
                        x = rand.randint(0, current_count - 1)
                        if x < sample_size:
                            selected_ligands[x] = (result[smi_index],
                                                   result[name_index])
for smi, name in selected_ligands:
smi_writer.write("{} {}\n".format(smi, name))
# We only need to use roundrobin to select random ligands since we
# already randomized the ligands when creating the sub csv files.
else:
rr = more_itertools.roundrobin(*reader_list)
for result in itertools.islice(rr, sample_size):
smi_writer.write("{} {}\n".format(result[smi_index],
result[name_index]))
smi_writer.close()
    def diversitySelect(self,
smi_file_name,
scored_csv_file_list,
sample_size,
sort=True,
**kwargs):
"""
Use combinatorial_diversity to select diverse ligands from input csv or
sorted ligand_ml .csv output.
:param smi_file_name: SMILES file name that contains selected ligands.
:type smi_file_name: str
        :param scored_csv_file_list: list of ligand_ml training .csv files.
:type scored_csv_file_list: list(str)
:param sample_size: number of ligands to be sampled.
:type sample_size: int
        :param sort: Whether the csv files were sorted or initial inputs.
:type sort: bool
"""
tmp_smi_file_name = os.path.join(
self.job_dir, self.job_name + "_random_for_diversity.smi")
max_diversity_sample_size = 10_000_000
max_diversity_selection_cpu = 100
self.randomSelect(tmp_smi_file_name, scored_csv_file_list,
max_diversity_sample_size, sort)
if sample_size >= fileutils.count_lines(tmp_smi_file_name):
fileutils.force_rename(tmp_smi_file_name, smi_file_name)
return
host, ncpu = al_utils.get_host_ncpu()
ncpu = min(max_diversity_selection_cpu, ncpu)
logger.info(f"Diversity selection will run with {ncpu} CPU(s)")
cmd = [
"utilities/combinatorial_diversity",
os.path.basename(tmp_smi_file_name),
str(sample_size)
]
cmd += ["-out", os.path.basename(smi_file_name), "-no3d", "-nosplit"]
cmd += ["-JOBNAME", self.job_name, "-HOST", host + ":" + str(ncpu)]
cmd += ["-DRIVERHOST", "localhost"]
self.log_file = os.path.join(self.job_dir, self.job_name + ".log")
diversity_job = jobcontrol.launch_job(cmd, launch_dir=self.job_dir)
diversity_job.wait()
class ScoreProviderNode(ActiveLearningNode):
    def __init__(self, iter_num, job_name, job_dir):
"""
Initialize node for obtaining the score of each ligand (SMILES).
"""
super().__init__(iter_num, job_name, job_dir)
self.smi_file_name = None
self.score_csv_file = None
self.known_title_to_score = None
    def checkOutcome(self, score_csv_file):
"""
Validate the .csv score file.
:param score_csv_file: name of generated .csv score file.
:type score_csv_file: str
"""
if not os.path.isfile(score_csv_file):
logger.error(f"Error: Failed to generate the .csv score file for "
f"iteration {self.iter_num} training ligands.")
sys.exit(1)
num_ligands = fileutils.count_lines(score_csv_file) - 1
if num_ligands <= 0:
logger.error(f"Error: Failed to generate any score for "
f"iteration {self.iter_num} training ligands.")
sys.exit(1)
if self.iter_num == 1 and num_ligands < MINIMAL_LIGAND_ML_TRAIN:
logger.error(f"Error: Only {num_ligands} ligands produced "
f"score. ligand_ml requires at least "
f"{MINIMAL_LIGAND_ML_TRAIN} training ligands.")
sys.exit(1)
    def writeScoreCsv(self, title_to_score, output_csv):
"""
        Write scores to the .csv file that ligand_ml needs for training.
        :param title_to_score: dict that maps ligand title to score
        :type title_to_score: defaultdict(lambda : BAD_SCORE)
        :param output_csv: ligand_ml training .csv file.
        :type output_csv: str
"""
with open(self.smi_file_name, "r") as f:
smi_lines = f.readlines()
with open(output_csv, "w", newline='') as csv_f:
csv_writer = csv.writer(csv_f)
csv_writer.writerow(["SMILES", TRUTH_COL, "Title"])
for smi_line in smi_lines:
smi_title = al_utils.split_smi_line(smi_line)
if not smi_title:
continue
smi, title = smi_title
csv_writer.writerow([smi, title_to_score[title], title])
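    # The resulting training .csv has three columns: SMILES, the TRUTH_COL
    # score column, and Title, e.g. (illustrative row):
    #     CCO,-7.25,lig_001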
class KnownScoreProviderNode(ScoreProviderNode):
"""
Class for obtaining the scores from external .csv file.
    This class is only used for the purpose of testing the performance of the
    active learning workflow.
"""
    def __init__(self, args, iter_num, job_name, job_dir):
super().__init__(iter_num, job_name, job_dir)
    @al_utils.node_run_timer
def runNode(self, smi_file_name, active_learning_job, score_csv_file=None):
"""
Read scores from active_learning_job.known_title_to_score.
:param smi_file_name: SMILES file that contains the ligands to be scored.
:type smi_file_name: str
:param active_learning_job: current active learning job.
:type active_learning_job: ActiveLearningJob instance.
:param score_csv_file: ligand_ml training .csv file.
:type score_csv_file: str
"""
self.smi_file_name = smi_file_name
known_title_to_score = active_learning_job.known_title_to_score
if score_csv_file is None:
score_csv_file = os.path.join(
self.job_dir, "score_iter_{}_result.csv".format(self.iter_num))
self.writeScoreCsv(known_title_to_score, score_csv_file)
self.checkOutcome(score_csv_file)
al_utils.add_output_file(score_csv_file)
self.restart_files.append(score_csv_file)
self.input_for_next_node = {}
self.score_csv_file = score_csv_file
class LigandMLTrainNode(ActiveLearningNode):
"""
Class for ligand_ml model generation.
"""
    def __init__(self, args, iter_num, job_name, job_dir):
super().__init__(iter_num, job_name, job_dir)
self.combined_train_csv_file = os.path.join(
job_dir, "ligand_ml_combined_training.csv")
self.train_csv_file_list = None
self.train_host = args.train_host
self.num_train_core = args.num_train_core
self.train_time = args.train_time
self.random_seed = args.random_seed
self.chosen_models = args.chosen_models
self.max_iter = args.num_iter
self.mix_score_failed = args.mix_score_failed
self.score_failed_ratio = args.score_failed_ratio
    def checkOutcome(self, model_file):
"""
        Check whether the ligand_ml model exists.
:param model_file: name of ligand_ml .qzip model file
:type model_file: str
"""
if not os.path.isfile(model_file):
logger.error(f"Error: Failed to generate machine learning model "
f"for iteration {self.iter_num}")
sys.exit(1)
    def createTrainingCsvFile(self, discard_cutoff, ascending=True):
"""
        Generate the training .csv file for ligand_ml model generation.
        :param discard_cutoff: score cutoff for excluding ligands from the ML
            training set.
        :type discard_cutoff: float
        :param ascending: lower value means better ligand if ascending is True
        :type ascending: bool
"""
scored_smi_title_to_score = {}
failed_smi_title_to_score = {}
for score_file in self.train_csv_file_list:
with open(score_file, "r", newline='') as f_in:
score_reader = csv.reader(f_in)
header = next(score_reader)
for smi, score, title in score_reader:
try:
score = float(score)
except ValueError:
continue
score_decorated = score if ascending else -score
discard_cutoff_decorated = discard_cutoff if ascending \
else -discard_cutoff
if score_decorated < discard_cutoff_decorated:
scored_smi_title_to_score[(smi, title)] = score
else:
failed_smi_title_to_score[(smi, title)] = score
train_smi_title_to_score = scored_smi_title_to_score.copy()
# Add failed scored ligands such as undockable ligands to training set.
if self.mix_score_failed:
if self.iter_num == self.max_iter or \
(not failed_smi_title_to_score):
pass
else:
num_scored = len(scored_smi_title_to_score)
num_selected_score_failed = min(
int(num_scored * self.score_failed_ratio),
len(failed_smi_title_to_score))
                failed_selected = random.sample(
                    list(failed_smi_title_to_score), num_selected_score_failed)
failed_selected_dict = {
x: discard_cutoff for x in failed_selected
}
train_smi_title_to_score.update(failed_selected_dict)
if len(train_smi_title_to_score) < MINIMAL_LIGAND_ML_TRAIN:
logger.error(f"Error: Only selected "
f"{len(train_smi_title_to_score)} "
f"ligands for training. ligand_ml requires at least "
f"{MINIMAL_LIGAND_ML_TRAIN} training ligands.")
sys.exit(1)
with open(self.combined_train_csv_file, "w", newline='') as f_out:
writer = csv.writer(f_out)
writer.writerow(header)
for (smi, title), score in train_smi_title_to_score.items():
writer.writerow([smi, score, title])
    @al_utils.node_run_timer
def runNode(self, active_learning_job):
"""
Perform ligand_ml training with all the scored ligands.
:param active_learning_job: current active learning job.
:type active_learning_job: ActiveLearningJob instance.
"""
from schrodinger.active_learning.al_report import make_train_report
self.train_csv_file_list = active_learning_job.scored_csv_file_list
self.createTrainingCsvFile(active_learning_job.discard_cutoff,
active_learning_job.ascending)
al_utils.add_output_file(self.combined_train_csv_file)
csv_file_abspath = os.path.abspath(self.combined_train_csv_file)
model_file = os.path.join(self.job_dir, self.job_name + ".qzip")
sub_model_list = []
subjob_log_list = []
jobdj = get_jobdj()
for model_index in range(0, self.num_train_core):
sub_ligand_ml_model = f"{self.job_name}_sub_model_" \
f"{model_index}.tar.gz"
input_basename = fileutils.get_jobname(csv_file_abspath)
model_basename = fileutils.get_jobname(sub_ligand_ml_model)
subjob_name = f"{model_basename}_{input_basename}_build"
subjob_log = os.path.join(self.job_dir, subjob_name + ".log")
subjob_log_list.append(subjob_log)
cmd = ["run", "al_ligand_ml_worker.py", "build"]
cmd += [
"-model", sub_ligand_ml_model, "-input_csv", csv_file_abspath,
"-task_type", "regression", "-target_col", TRUTH_COL,
"-train_time",
str(self.train_time)
]
if self.chosen_models is not None:
cmd += ["-chosen_models", self.chosen_models]
if self.random_seed is not None:
cmd += ["-seed", str(self.random_seed)]
cmd += ["-HOST", f"{self.train_host}:1"]
sub_model_list.append(sub_ligand_ml_model)
al_utils.add_output_file(subjob_log)
jobdj.addJob(cmd, command_dir=self.job_dir)
self.log_file = os.path.join(self.job_dir, self.job_name + "_build.log")
jobdj.run()
al_utils.merge_ligand_ml_models(sub_model_list, model_file,
self.job_dir)
self.checkOutcome(model_file)
self.restart_files.append(model_file)
fileutils.force_remove(*[
os.path.join(self.job_dir, sub_model)
for sub_model in sub_model_list
])
try:
report_pdf = os.path.join(
self.job_dir, f"ML_model_iter_"
f"{self.iter_num}_testset_metric.pdf")
make_train_report(model_file, report_pdf, self.iter_num)
al_utils.add_output_file(report_pdf)
except RuntimeError:
logger.warning(f"Failed to generate metric file for iteration "
f"{self.iter_num} machine learning model.")
al_utils.add_output_file(model_file)
al_utils.add_output_file(*subjob_log_list)
self.input_for_next_node = {"model_file": model_file}
class LigandMLEvalNode(ActiveLearningNode):
"""
Class for performing ligand_ml prediction with generated model.
"""
    def __init__(self, args, iter_num, job_name, job_dir):
super().__init__(iter_num, job_name, job_dir)
self.model_file = None
self.eval_csv_list = None
self.uncertainty_sample_ratio = args.uncertainty_sample_ratio
self.result_prefix = args.result_prefix
self.keep = args.keep
self.task = args.task
self.sub_csv_header = None
self.max_num_subjobs = args.max_ml_eval_cpu
    def getBestResults(self, file_list, outfile, ascending=True):
"""
        Get the best ligands predicted by ligand_ml (lowest score if ascending
        is True, highest otherwise).
:param file_list: list of ligand_ml .csv output files.
Each file is sorted by ligand_ml prediction score.
:type file_list: list(str)
:param outfile: .csv file that contains best ligands.
:type outfile: str
:param ascending: lower value means better ligand if ascending is True
:type ascending: bool
"""
with open(file_list[0], "r", newline='') as f:
reader = csv.reader(f)
header = next(reader)
y_index = header.index(SCORE)
# Keep the last two columns (score, uncertainty) of ligand_ml .csv output file.
result_header = self.sub_csv_header + header[-2:]
reader_list = [
al_utils.my_csv_reader(filename) for filename in file_list
]
with open(outfile, "w", newline='') as result_f:
result_writer = csv.writer(result_f)
result_writer.writerow(result_header)
merged = heapq.merge(*reader_list,
key=lambda x: float(x[y_index]),
reverse=not ascending)
for result in itertools.islice(merged, self.keep):
result_writer.writerow(result)
    def checkOutcome(self, pred_csv_list, uncertain_csv_list):
"""
Check the existence of ligand_ml prediction files.
        :param pred_csv_list: list of ligand_ml prediction csv file(s)
        :type pred_csv_list: list(str)
        :param uncertain_csv_list: list of ligand_ml prediction with
            uncertainty csv file(s).
        :type uncertain_csv_list: list(str)
"""
if not (pred_csv_list and uncertain_csv_list):
logger.error("Error: Failed to generate machine learning "
"prediction for any ligand")
sys.exit(1)
for csv_file in pred_csv_list + uncertain_csv_list:
if not os.path.isfile(csv_file):
logger.error(f"Error: Prediction result file: {csv_file} "
f"does not exist.")
    @al_utils.node_run_timer
def runNode(self, model_file, active_learning_job):
"""
Use the trained model to evaluate all the ligands.
:param model_file: ligand_ml .qzip model file.
        :type model_file: str
:param active_learning_job: current active learning job.
:type active_learning_job: ActiveLearningJob instance.
"""
self.model_file = model_file
self.eval_csv_list = active_learning_job.sub_infile_list
if self.task == PILOT_TASK:
self.eval_csv_list, _ = al_utils.random_split(
[active_learning_job.args.pilot_ligands_csv],
active_learning_job.args.pilot_size,
prefix="pilot_eval_split",
block_size=active_learning_job.args.block_size,
name_index=1,
smi_index=0)
self.sub_csv_header = active_learning_job.sub_csv_header
pred_csv_list = []
pred_csv_uncertain_sorted_list = []
output_csv = "{}_pred_iter_{}.csv".format(self.result_prefix,
self.iter_num)
if self.task == EVAL_TASK:
output_csv = "{}_pred.csv".format(self.result_prefix)
host, ncpu = al_utils.get_host_ncpu()
max_subjobs = min(ncpu, self.max_num_subjobs) if \
self.max_num_subjobs else ncpu
jobdj = get_jobdj(host_list=[(host, max_subjobs)])
logger.info(f"ML evaluation jobs will run with {max_subjobs} CPU("
f"s)")
tar_model_file = \
al_utils.convert_ligand_ml_model_format(self.model_file)
with open(self.eval_csv_list[0], 'r', newline='') as f_in:
reader = csv.reader(f_in)
header = next(reader)
for eval_csv in self.eval_csv_list:
eval_csv_basename = fileutils.get_jobname(eval_csv)
pred_csv = "{}_iter_{}_pred.csv".format(eval_csv_basename,
self.iter_num)
sorted_by_uncertain = "{}_iter_{}_pred_uncertain_sorted.csv".format(
eval_csv_basename, self.iter_num)
cmd = [
"run", "al_ligand_ml_worker.py", "evaluate", "-model",
os.path.basename(tar_model_file), "-smiles_col", header[0]
]
cmd += [
"-input_csv", eval_csv, "-output_csv", pred_csv,
"-sorted_by_uncertainy_csv", sorted_by_uncertain,
"-uncertainty_sample_ratio",
str(self.uncertainty_sample_ratio)
]
if not active_learning_job.ascending:
cmd += ["-descending"]
jobdj.addJob(cmd, command_dir=self.job_dir)
pred_csv_list.append(os.path.join(self.job_dir, pred_csv))
pred_csv_uncertain_sorted_list.append(
os.path.join(self.job_dir, sorted_by_uncertain))
jobdj.run()
pred_csv_finished_list = []
pred_csv_uncertain_sorted_finished_list = []
for pred_csv, pred_csv_uncertain_sorted in zip(
pred_csv_list, pred_csv_uncertain_sorted_list):
if os.path.isfile(pred_csv):
pred_csv_finished_list.append(pred_csv)
else:
logger.warning(f"Cannot locate Ligand_ML prediction file "
f"{pred_csv}")
if os.path.isfile(pred_csv_uncertain_sorted):
pred_csv_uncertain_sorted_finished_list.append(
pred_csv_uncertain_sorted)
else:
logger.warning(f"Cannot locate Ligand_ML prediction file "
f"{pred_csv_uncertain_sorted}")
self.checkOutcome(pred_csv_finished_list,
pred_csv_uncertain_sorted_finished_list)
self.getBestResults(pred_csv_finished_list, output_csv,
active_learning_job.ascending)
if self.task == EVAL_TASK:
al_utils.add_output_file(output_csv)
self.restart_files.append(output_csv)
self.input_for_next_node = {"ligands_csv": output_csv}
else:
al_utils.add_output_file(output_csv,
*pred_csv_uncertain_sorted_finished_list)
self.restart_files.append(output_csv)
self.restart_files += pred_csv_uncertain_sorted_finished_list
self.input_for_next_node = {
"ligands_csv": output_csv,
"csv_list": pred_csv_uncertain_sorted_finished_list
}
try:
fileutils.force_remove(*pred_csv_finished_list)
except OSError:
pass
class ActiveLearningNodeSupplier:
    def __init__(self,
calculate_score_node,
pilot_score_node,
rescore_node,
score_provider_node=ScoreProviderNode,
prepare_smi_node=PrepareSmilesNode,
known_score_provider_node=KnownScoreProviderNode,
ligand_ml_train_node=LigandMLTrainNode,
ligand_ml_eval_node=LigandMLEvalNode):
self.calculate_score_node = calculate_score_node
self.pilot_score_node = pilot_score_node
self.rescore_node = rescore_node
self.score_provider_node = score_provider_node
self.prepare_smi_node = prepare_smi_node
self.known_score_provider_node = known_score_provider_node
self.ligand_ml_train_node = ligand_ml_train_node
self.ligand_ml_eval_node = ligand_ml_eval_node
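# Sketch of how a driver could assemble a node supplier; the three score-node
# classes named here are placeholders for application-specific
# ScoreProviderNode subclasses supplied by the caller:
#
#     node_supplier = ActiveLearningNodeSupplier(
#         calculate_score_node=MyDockingScoreNode,
#         pilot_score_node=MyPilotScoreNode,
#         rescore_node=MyRescoreNode)
#
# All other node types fall back to the defaults defined in this module.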