"""
Copyright Schrodinger, LLC. All rights reserved.
This is a common module for parallel job execution.
It provides functionality to configure and run parallel jobs.
Currently this module is used by epik_driver, bmin_driver, and ligprep_driver.
"""
# Maintainer: Hiral Oza
import datetime
import gzip
import os
import pickle
import re
import sys
import tarfile
from shutil import copy as shutil_copy
from shutil import rmtree as shutil_rmtree
from schrodinger import structure
from schrodinger.job import jobcontrol
from schrodinger.job import queue as jobqueue
from schrodinger.utils import deprecation
from schrodinger.utils import fileutils
############################## Global variables ###############################
(NJOBS, JOBCTS, FIRSTCT, LASTCT, RUN_JOBS, STRICT_END, MAX_RETRIES, OUTPUT_ORG,
NO_EXECUTION, RESTART, NC) = list(range(0, 11))
###############################################################################
###############################################################################
######################### Common functions ####################################
###############################################################################
def add_para_job_options(parser, options=None):
"""
Adds common para job control options to a SingleDashOptionParser instance.
:type parser: SingleDashOptionParser
:param parser: Instance of SingleDashOptionParser
:type options: List
:param options: List of module enums that indicate what options to add to
the parser.
"""
if not options:
options = [
NJOBS, JOBCTS, FIRSTCT, LASTCT, RUN_JOBS, STRICT_END, MAX_RETRIES,
OUTPUT_ORG, NO_EXECUTION, RESTART, NC
]
if NJOBS in options:
#Number of subjobs to prepare
parser.add_option("-NJOBS",
"-nprocs",
action="store",
type="int",
dest="njobs",
default=1)
if JOBCTS in options:
#Max number of cts
parser.add_option('-JOBCTS',
action="store",
type="int",
dest="jobcts",
default=10000)
if FIRSTCT in options:
#First structure to process
parser.add_option('-first', action="store", type="int", dest="firstct")
if LASTCT in options:
#Last structure to process
parser.add_option('-last', action="store", type="int", dest="lastct")
if RUN_JOBS in options:
#Run specified subjob
parser.add_option('-j', action="store", dest="run_jobs")
if STRICT_END in options:
#Job die if any subjob dies
parser.add_option("-STRICT_END",
action="store_true",
dest="strict_end",
default=False)
if MAX_RETRIES in options:
#Number of allowed retries per subjob
parser.add_option('-MAX_RETRIES',
action="store",
type="int",
default=2,
dest="max_retries")
if OUTPUT_ORG in options:
#Organization output
parser.add_option("-OUTPUT_ORG", action="store", dest="output_org")
if NO_EXECUTION in options:
#No execution
parser.add_option("-nx",
action="store_true",
dest="no_execution",
default=False)
if RESTART in options:
#Restart failed parent job
parser.add_option("-RESTART",
action="store_true",
dest="restart",
default=False)
if NC in options:
# Cleanup subdirectories for each subjob
parser.add_option("-nc",
"-NC",
action="store_true",
dest="nocleanup",
default=False)
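# Usage sketch (illustrative; assumes SingleDashOptionParser lives in
# schrodinger.utils.cmdline):
#
#   from schrodinger.utils.cmdline import SingleDashOptionParser
#   parser = SingleDashOptionParser()
#   add_para_job_options(parser, options=[NJOBS, JOBCTS, MAX_RETRIES])
#   opts, args = parser.parse_args(['-NJOBS', '4', '-JOBCTS', '500'])
#   # opts.njobs == 4, opts.jobcts == 500, opts.max_retries == 2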
def validate_options(options):
"""
Validate the job control options
:type options: Instance
:param options: object containing values of para job options
"""
if hasattr(options, 'njobs') and options.njobs < 1:
print("Error: -NJOBS/-nprocs option (%i) requests less than 1 job." % \
options.njobs)
sys.exit(1)
if hasattr(options, 'jobcts') and options.jobcts < 1:
print("Error: -JOBCTS option (%i) requests less than 1 ct structure " \
"per job." % options.jobcts)
sys.exit(1)
if hasattr(options, 'firstct') and options.firstct:
if options.firstct < 1:
print("Error: Specified first structure to process (%i) is less " \
"than 1." % options.firstct)
sys.exit(1)
if options.debug:
print("First structure to process set to :", \
options.firstct)
if hasattr(options, 'lastct') and options.lastct:
if options.lastct < 1:
print("Error: Specified last structure to process (%i) is less " \
"than 1." % options.lastct)
sys.exit(1)
if options.debug:
print("Last structure to process set to :", \
options.lastct)
if hasattr(options, 'firstct') and \
hasattr(options, 'lastct') and \
options.firstct and options.lastct and options.lastct < options.firstct:
print("Error: Specified last structure to process (%i) is less than " \
"that specified for the first structure (%i)." % (options.lastct,
options.firstct))
sys.exit(1)
if hasattr(options, 'output_org') and\
options.output_org and options.output_org != "BY_SUBJOB":
print("Error: If -OUTPUT_ORG is specified, it must be set to BY_SUBJOB")
sys.exit(1)
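# Continuing the sketch above: a driver validates the parsed options before
# doing any work; validate_options(opts) calls sys.exit(1) with an error
# message if, e.g., opts.njobs < 1 or -last is smaller than -first.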
def print_job_info(argv, driver_script, version):
    """
    Print job information (job id, directories, driver version and command
    line) and return the job control backend instance, if any. Exits if no
    command line arguments were passed.
    :type argv: List
    :param argv: Command line arguments
    :type driver_script: String
    :param driver_script: Driver script name
    :type version: String
    :param version: Driver script version
    :return: The job control backend instance, or None when running without
        job control
    """
if len(argv) < 2:
print("%s received no arguments.\n" % driver_script)
sys.exit(1)
print("%s. Copyright (c) Schrodinger, LLC.\n" % driver_script)
backend_instance = jobcontrol.get_backend()
if backend_instance:
backend_job = backend_instance.getJob()
print("Job id: %s" % backend_job.job_id)
local = (backend_job.JobDir == backend_job.Dir)
print('Launch directory:', backend_job.Dir)
print('Job directory:', backend_job.JobDir)
else:
local = True
print("Job directory: " + os.getcwd())
print('Running locally:', local)
print("======================================")
print("Running", driver_script)
print(_timestamp())
print("Version:", version)
print("Command:", ' '.join(sys.argv))
print(".....................................\n")
return backend_instance
def _subjob_input_id(text):
"""
    Sort key for subjob input file names: returns the numeric subjob index.
"""
m = re.match(r'^.*_subjob_(\d+)(\..*)?$', text)
assert m is not None
return int(m.group(1))
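# Example: used as a sort key so subjob file names order numerically rather
# than lexically:
#
#   files = ['out_subjob_10/in_subjob_10.mae', 'out_subjob_2/in_subjob_2.mae']
#   sorted(files, key=_subjob_input_id)
#   # -> ['out_subjob_2/in_subjob_2.mae', 'out_subjob_10/in_subjob_10.mae']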
def _append_st_file(st_file, st_writer, raw_append=False):
    """
    Append a structure file to an open output writer.
    :type st_file: String
    :param st_file: Structure file name
    :type st_writer: file or StructureWriter
    :param st_writer: Open output handle; a binary file object when
        raw_append is True, otherwise a StructureWriter
    :type raw_append: Boolean
    :param raw_append: Indicates to append file using raw I/O
    """
    try:
        if raw_append:
            with open(st_file, 'rb') as in_fh:
                st_writer.write(in_fh.read())
        else:
            for st in structure.StructureReader(st_file):
                st_writer.append(st)
    except Exception as err:
        print("Error: appending structure output file '%s': %s" %
              (st_file, err))
def _print_subjob_logfile(log_file="", start_msg=None, end_msg=None):
"""
Prints given log file with start and end messages.
:type log_file: String
:param log_file: Log file name. Print the content of log_file, if specified
:type start_msg: String
:param start_msg: Message to place at the start of the log file
:type end_msg: String
:param end_msg: Message to place at the end of the log file
"""
if start_msg:
print(start_msg)
    if log_file:
        with open(log_file) as in_fh:
            for line in in_fh:
                # Lines keep their trailing newline; suppress print's own
                print(line, end='')
if end_msg:
print(end_msg)
def _timestamp():
now = datetime.datetime.now()
return now.strftime("%a, %d %b %Y %H:%M:%S.%f")
def _merge_subjobs(backend_instance, subjob_input_mae_files, output_file,
output_org, data):
"""
Merge subjobs structure and log files.
:type backend_instance: _Backend
:param backend_instance: Instance of the _Backend
:type subjob_input_mae_files: List
:param subjob_input_mae_files: List of subjob files with parent directory
:type output_file: String
:param output_file: Job output file name
    :type output_org: String
    :param output_org: If set to "BY_SUBJOB", also copy each subjob output
        structure file back to the launch directory
    :type data: Dictionary
    :param data: Post-execution callback data; may contain an 'output_format'
        entry
    """
msg1 = "###########################################\n" \
"###########################################\n" \
" Organizing Output Files: \n"
start_msg = "...........................................\n" \
" Collecting Results from Job: %s \n" \
" ------------------------------------- \n"
msg2 = "End of Log files for subjobs. \n\n" \
"-------------------------------------------\n\n" \
"All structure file output placed in one output " \
"structure file: %s.\n" \
"Number of structures placed in %s: %d.\n"\
"###########################################\n" \
"%s\nJob Complete"
end_msg = "\n ------------------------------------- \n" \
"...........................................\n"
base, ext = fileutils.splitext(output_file)
st_writer = open(output_file, "wb")
raw_append = True
tar = tarfile.open(base + '.tar.gz', "w:gz")
    # Initialize so tmp1_msg is defined even if the subjob list is empty
    tmp1_msg = msg1
for count, file_path in enumerate(subjob_input_mae_files):
subjobdir = os.path.dirname(file_path)
tar.add(subjobdir)
# Collect structures
st_file = os.path.join(subjobdir, subjobdir + ext)
if os.path.isfile(st_file):
if backend_instance and output_org:
                # FIXME: backend.addOutputFile() did not work here, so copy
                # each subjob output file from job_dir to launch_dir instead
                try:
                    shutil_copy(
                        os.path.join(backend_instance.getJob().JobDir,
                                     st_file),
                        backend_instance.getJob().Dir)
                except OSError:
                    pass
_append_st_file(st_file, st_writer, raw_append)
# Collect log
tmp1_msg = start_msg % subjobdir
if count == 0:
tmp1_msg = msg1 + tmp1_msg
tmp2_msg = end_msg
log_file = os.path.join(subjobdir, subjobdir + ".log")
if os.path.isfile(log_file):
_print_subjob_logfile(log_file,
start_msg=tmp1_msg,
end_msg=tmp2_msg)
tmp1_msg = ""
st_writer.close()
tar.close()
tmp2_msg = end_msg + msg2 % (output_file, output_file,
structure.count_structures(output_file),
_timestamp())
_print_subjob_logfile(start_msg=tmp1_msg, end_msg=tmp2_msg)
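# Net effect (file names illustrative): for output_file 'job.maegz' this
# leaves 'job.maegz' with all subjob structures concatenated, plus
# 'job.tar.gz' archiving the individual subjob directories.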
def _get_jobs_to_run(run_jobs_str, njobs):
"""
Return the list of jobs to run.
:type run_jobs_str: String
:param run_jobs_str: String of ':' or ',' separated numbers
:type njobs: Number
:param njobs: Total number of jobs requested
"""
run_jobs = []
    def validate_or_add_num(num_str, range_check=True):
        try:
            num = int(num_str)
        except ValueError:
            print("Error: Wrong job number '%s' specified." % num_str)
            sys.exit(1)
        if range_check:
            if num > 0:
                run_jobs.append(num)
            else:
                print("Warning: The specified job number %i is less than 1. " \
                      "Skipping this job." % num)
        return num
if run_jobs_str:
for item in run_jobs_str.strip().split(','):
item = item.strip()
tmp_list = item.split(':')
if not tmp_list or len(tmp_list) > 2:
print("Warning: can not identify the job '%s'. Skipping." %
item)
continue
if len(tmp_list) == 2:
start = validate_or_add_num(tmp_list[0].strip(),
range_check=False)
end = validate_or_add_num(tmp_list[1].strip(),
range_check=False)
for i in range(start, end + 1):
validate_or_add_num(i)
else:
validate_or_add_num(item)
# Remove duplicates if any
run_jobs = list(set(run_jobs))
if not run_jobs:
print("Error: Specified job numbers '%s' are not valid." % \
run_jobs_str)
sys.exit(1)
else:
# Sort list
run_jobs.sort()
tmp_list = []
for job_num in run_jobs:
if job_num > njobs:
print("Warning: The specified job number %i is greater " \
"than the number of jobs (%i). Skipping this job." \
% (job_num, njobs))
else:
tmp_list += [job_num]
run_jobs = tmp_list
print("Only running subjobs: %s" % \
','.join(["%s" % j for j in run_jobs]))
return run_jobs
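# Examples: '-j' accepts comma-separated job numbers and colon-separated
# ranges; duplicates are removed and the result is sorted:
#
#   _get_jobs_to_run('1,4:6,4', njobs=10)   # -> [1, 4, 5, 6]
#   _get_jobs_to_run('', njobs=10)          # -> [] (meaning: run all jobs)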
def _generate_subjobs(job_input_file,
job_output_file,
options,
driver_script,
split_input_file=True):
"""
Creates the subjob dirs and fill in data
:type job_input_file: String
:param job_input_file: Job input file
:type job_output_file: String
:param job_output_file: Job output file
:type options: Instance
:param options: object containing values of para job options
"""
    in_file_ext = fileutils.splitext(job_input_file)[1]
    in_file_base = fileutils.get_basename(job_input_file)
out_file_base, out_file_ext = fileutils.splitext(job_output_file)
subjob_input_dat = {}
if split_input_file:
nct_in = structure.count_structures(job_input_file)
if options.firstct:
print("First structure to process set to :", \
options.firstct)
else:
options.firstct = 1
if options.lastct:
if options.lastct < nct_in:
print("Number of structures present in input structure file:", \
nct_in)
print("Number of last structure to be processed :", \
options.lastct)
elif options.lastct > nct_in:
print("Number of structures present in input structure file:", \
nct_in)
print("Number of last structure to be processed :", \
options.lastct)
print("Requested number of last structure to process exceeds " \
"number present.")
print("Reseting number of last structure to process to number " \
"present (%i)." % nct_in)
options.lastct = nct_in
else:
options.lastct = nct_in
nct_use = options.lastct - options.firstct + 1
print("Number of structures to process :", nct_use)
if nct_use < options.njobs:
print("More jobs requested (%i) than number of structures to " \
"process (%i)." % (options.njobs, nct_use))
options.njobs = nct_use
print("Number of jobs set to %i." % options.njobs)
else:
            # Ceiling division: each subjob handles at most JOBCTS structures
            tmp_njobs = (nct_use + options.jobcts - 1) // options.jobcts
if tmp_njobs > options.njobs:
print("Number of jobs requested (%i) requires that more than\n" \
" JOBCTS (%i) structures be processed by each subjob." \
% (options.njobs, options.jobcts))
print("Number of jobs increased to: %i" % tmp_njobs)
options.njobs = tmp_njobs
run_jobs = _get_jobs_to_run(options.run_jobs, options.njobs)
        cts_per_job = nct_use // options.njobs
remainder = nct_use % options.njobs
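        # Worked example: 10 structures over 3 subjobs gives cts_per_job = 3
        # and remainder = 1, so subjob 1 gets 4 structures and subjobs 2-3
        # get 3 each (the first `remainder` subjobs take one extra ct)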
st_writer = None
format = fileutils.get_structure_file_format(job_input_file)
adj = 0
ict = 1
pattern = b""
header = b""
if (in_file_ext[-2:] == 'gz'):
input_fh = gzip.open(job_input_file, 'rb')
else:
input_fh = open(job_input_file, 'rb')
if format == structure.MAESTRO:
pattern = rb"^\s*f_m_ct\s*{"
#header = '''{\n s_m_m2io_version\n :::\n 2.0.0\n}\n\n'''
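            # The m2io header block is assumed to span the first six lines of
            # the input file; those lines are copied verbatim into every
            # subjob input file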
input_dat = []
header_indx = 0
for line in input_fh:
input_dat.append(line)
header_indx += 1
if (header_indx == 6):
break
header = b''.join(input_dat[:6])
st_reader = structure.StructureReader(job_input_file,
index=options.firstct)
elif format == structure.SD:
pattern = b"$$$$"
header = b""
# Renumbering since delimiter is at end of ct
ict = 0
adj = 1
st_reader = structure.StructureReader(job_input_file,
index=options.firstct)
elif format == structure.SMILES:
st_reader = structure.SmilesReader(job_input_file,
index=options.firstct)
elif format == structure.SMILESCSV:
st_reader = structure.SmilesCsvReader(job_input_file,
index=options.firstct)
for line in input_fh:
if line[:6] == b"SMILES":
header = line
break
if (header == b""):
input_fh.seek(0)
else:
print("WARNING: Unsupported format ")
st_reader = structure.StructureReader(job_input_file,
index=options.firstct)
input_mae_file = None
ijob = 0
indx = 1
cts_for_job = cts_per_job
if remainder > 0:
cts_for_job += 1
cts_in_job = 0
ct_delim = 0
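        # ct_delim flags a structure boundary on the current line; its value
        # encodes which format matched (1=Maestro, 2=SD, 3=SMILES CSV,
        # 4=SMILES), though only zero/nonzero is tested below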
fw = None
for line in input_fh:
            # For Maestro files the ct delimiter is at the start of each ct,
            # so detect it here; SD delimiters (at the end of each ct) are
            # handled further below
if format == structure.MAESTRO:
ct_delim = 0
if re.match(pattern, line):
ct_delim = 1
# Open up the subjob output file if required and write header into it
if ((ct_delim != 0 and ict > 1 and cts_in_job == cts_for_job and ijob <= options.njobs) \
or (ijob == 0)):
# Close the previous output file
if (ijob != 0):
fw.close()
fw = None
if (ijob == options.njobs):
break
ijob += 1
# Extra cts have been added, now back to cts_per_job
if (ijob > remainder):
cts_for_job = cts_per_job
subjobname = out_file_base + "_subjob_" + str(ijob)
                # Create the subjob directory and the subjob input file name
if not os.path.isdir(subjobname):
os.mkdir(subjobname)
input_mae_file = os.path.join(subjobname, in_file_base + "_subjob_" + \
str(ijob) + in_file_ext)
                # Record the subjob input file and its starting structure
                # index in the returned dictionary
                subjob_input_dat[input_mae_file] = str(ict + adj)
                # If this subjob is not to be run, drop it from the returned
                # dict
                if run_jobs and ijob not in run_jobs:
                    subjob_input_dat.pop(input_mae_file)
if (in_file_ext[-2:] == 'gz'):
fw = gzip.open(input_mae_file, 'wb')
else:
fw = open(input_mae_file, 'wb')
fw.write(header)
cts_in_job = 0
if format == structure.SD:
ct_delim = 0
if pattern in line:
ct_delim = 2
fw.write(line)
line = b""
elif format == structure.SMILESCSV:
ct_delim = 0
if (len(line) > 1 and line[:6] != b"SMILES" and
line[0] != b"#" and line[0] != b" "):
ct_delim = 3
else:
line = b""
elif format == structure.SMILES:
ct_delim = 0
if (len(line) > 1 and line[0] != b"#" and line[0] != b" "):
ct_delim = 4
if (ct_delim != 0):
ict += 1
cts_in_job += 1
fw.write(line)
input_fh.close()
if fw is not None:
fw.close()
else:
input_mae_file = None
ijob = 1
while ijob <= options.njobs:
subjobname = out_file_base + "_subjob_" + str(ijob)
if not os.path.isdir(subjobname):
os.mkdir(subjobname)
input_mae_file = os.path.join(subjobname, in_file_base + "_subjob_" + \
str(ijob) + in_file_ext)
            # Store as a string to match the split_input_file branch and the
            # -indx argument built in launch_subjobs
            subjob_input_dat[input_mae_file] = "1"
ijob += 1
return subjob_input_dat
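# Illustrative shape of the returned mapping (hypothetical names and indices)
# for job_output_file 'job.maegz' split across two subjobs:
#
#   {'job_subjob_1/in_subjob_1.maegz': '1',
#    'job_subjob_2/in_subjob_2.maegz': '6'}
#
# i.e. subjob 2's input begins at structure 6 of the original input file.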
def _run_subjobs(jdj, restart_file):
    """
    Run the queued subjobs, dumping the JobDJ state to the restart file on
    every status change so that an interrupted job can be resumed with
    -RESTART.
    """
    jdj.run(status_change_callback=lambda _: jdj.dump(restart_file))
if jdj.isComplete():
# Remove restart file if present
fileutils.force_remove(restart_file)
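# Restart sketch: the dump/load pair means an interrupted driver can be rerun
# with -RESTART; launch_subjobs then unpickles the JobDJ from
# '<jobname>.restart' and only the unfinished subjobs are executed again.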
def launch_subjobs(
options,
driver_script,
backend_instance,
backend_args,
job_input_file,
job_output_file,
cmd_append="",
merge=True,
prepare_subjob_callback=None,
callback_data={}, # noqa: M511
split_input_file=True,
post_execution_processing_callback=None,
post_execution_processing_callback_data={}): # noqa: M511
"""
Launch subjobs.
:type options: Instance
:param options: object containing values of para job options
:type driver_script: String
:param driver_script: Driver script name
:type backend_instance: _Backend
:param backend_instance: Instance of the _Backend
:type backend_args: List
:param backend_args: List of arguments
:type job_input_file: String
:param job_input_file: Job input file
:type job_output_file: String
:param job_output_file: Job output file
:type cmd_append: String
:param cmd_append: Command to be appended to subjob command
    :type merge: Boolean
    :param merge: Whether to join subjob outputs or not
    :type prepare_subjob_callback: Function
    :param prepare_subjob_callback: Function to be called to prepare subjob data
    :type callback_data: Dictionary
    :param callback_data: A dictionary to be passed to subjob callback
    :type split_input_file: Boolean
    :param split_input_file: Whether to split the input structures across the
        subjob input files
    :type post_execution_processing_callback: Function
    :param post_execution_processing_callback: Function called after all
        subjobs complete, with the list of subjob input files, the main log
        file name and the callback data as arguments
    :type post_execution_processing_callback_data: Dictionary
    :param post_execution_processing_callback_data: A dictionary passed to the
        post-execution callback and to the output merging step
    """
jobname = fileutils.get_basename(job_output_file)
restart_file = jobname + '.restart'
# Job restart functionality
if options.restart:
        if os.path.isfile(restart_file):
            # Pickled JobDJ state must be read in binary mode
            with open(restart_file, 'rb') as fh:
                jdj = pickle.load(fh)
print("Total subjobs :", len(jdj.all_jobs))
print("Finished subjobs:", len(jdj.done_jobs))
print("Failed subjobs :", len(jdj.failed_jobs))
print("Running failed subjobs...")
_run_subjobs(jdj, restart_file)
if jdj.isComplete():
if merge:
# Prepare subjob input files to merge
in_file_base, in_file_ext = fileutils.splitext(
job_input_file)
out_file_base, out_file_ext = fileutils.splitext(
job_output_file)
subjob_input_mae_files = []
all_jobs = jdj.all_jobs
all_jobs.sort()
for i, job in enumerate(all_jobs):
job_obj = job.getJob()
subjobname = out_file_base + "_subjob_" + str(i + 1)
infile = os.path.join(subjobname, in_file_base + \
"_subjob_" + str(i + 1) + in_file_ext)
if os.path.basename(infile) in job_obj.InputFiles:
subjob_input_mae_files += [infile]
if subjob_input_mae_files:
subjob_input_mae_files.sort(key=_subjob_input_id)
_merge_subjobs(backend_instance, subjob_input_mae_files,
job_output_file, options.output_org,
post_execution_processing_callback_data)
return
else:
print("Warning: Restart file '%s', couldn't find." % restart_file)
print("Running without restart file.")
if split_input_file:
if options.output_org:
if options.debug:
print("Output organization set to", options.output_org)
if options.output_org == "BY_SUBJOB":
print("Separate output structure files will be created for " \
"each subjob.")
else:
print("All output structures will be stored in one file called '%s'" % \
job_output_file)
# Generate subjobs dirs and data
subjob_input_dat = _generate_subjobs(job_input_file, job_output_file,
options, driver_script,
split_input_file)
subjob_input_mae_files = list(subjob_input_dat)
subjob_input_mae_files.sort(key=_subjob_input_id)
print("Generated %d jobs" % len(subjob_input_mae_files))
if hasattr(options, 'no_execution') and options.no_execution:
print("------------------------------------------------------------")
print("-nx specified only creating subdirectories and input " \
"structure files.")
print(" jobs will not be run.")
print("------------------------------------------------------------")
return
print("Preparing subjobs...")
backend = ""
if driver_script == "epik_driver":
backend = "epik"
elif driver_script == "bmin_driver":
backend = "bmin"
elif driver_script == "ligprep_driver":
msg = 'Does not work with modern ligprep'
raise deprecation.DeprecationError(msg)
else:
raise RuntimeError(f'Unrecognized driver {driver_script}')
subjob_cmd = [os.path.join(os.environ['SCHRODINGER'], backend)]
subjob_cmd += backend_args
max_failures = jobqueue.NOLIMIT
if options.strict_end:
max_failures = 0
out_file_base, out_file_ext = fileutils.splitext(job_output_file)
hosts = jobcontrol.get_backend_host_list()
if not hosts:
hosts = [('localhost', 1)]
jdj = jobqueue.JobDJ(hosts=hosts,
max_failures=max_failures,
max_retries=options.max_retries,
verbosity="verbose")
for i, file_path in enumerate(subjob_input_mae_files):
subjobdir = os.path.dirname(file_path)
print(" subjobinfo subjobname", subjobdir)
in_file = os.path.basename(file_path)
if split_input_file:
print(" subjobinfo input ", in_file)
out_file = subjobdir + out_file_ext
print(" subjobinfo output ", out_file)
if split_input_file:
print(" subjobinfo log ", subjobdir + '.log')
if prepare_subjob_callback:
prepare_subjob_callback(subjobdir, in_file, out_file, callback_data)
cmd = list(subjob_cmd)
if driver_script == "epik_driver":
cmd.extend(['-imae', in_file, '-omae', out_file])
elif driver_script == "bmin_driver":
cmd.extend([subjobdir])
elif driver_script == "ligprep_driver":
tmp_cmd = cmd_append.strip().split(' ')
tmp_cmd[1] = in_file
tmp_cmd[3] = out_file
cmd.extend(tmp_cmd)
cmd.extend(['-indx', subjob_input_dat[file_path]])
if options.debug:
print("subjob command", ' '.join(cmd))
jdj.addJob(cmd, command_dir=subjobdir)
print("Running subjobs...")
_run_subjobs(jdj, restart_file)
if jdj.isComplete():
if merge:
_merge_subjobs(backend_instance, subjob_input_mae_files,
job_output_file, options.output_org,
post_execution_processing_callback_data)
if post_execution_processing_callback:
base, ext = fileutils.splitext(job_output_file)
main_log = base + '.log'
post_execution_processing_callback(
subjob_input_mae_files, main_log,
post_execution_processing_callback_data)
# EV# 109618: Remove subjob dirs if job ran locally (i.e. using -LOCAL or
# -NOJOBID) and in all other cases jobcontrol would take care of deleting
# subjob dirs.
if backend_instance:
backend_job = backend_instance.getJob()
local = (backend_job.JobDir == backend_job.Dir)
else:
local = True
    if hasattr(options, 'nocleanup') and not options.nocleanup and local:
print("-------------------------------------------")
print("Removing subjob directories")
for file_path in subjob_input_mae_files:
shutil_rmtree(os.path.dirname(file_path))
print("-------------------------------------------")
###############################################################################
#EOF