"""
 Copyright Schrodinger, LLC. All rights reserved.
 This is a common module for parallel job execution.
 It provides functionality to configure and run parallel jobs.
 Currently this script is used by epik_driver, bmin_driver, and ligprep_driver.
"""
# Maintainer: Hiral Oza
import datetime
import gzip
import os
import pickle
import re
import sys
import tarfile
from shutil import copy as shutil_copy
from shutil import rmtree as shutil_rmtree
from schrodinger import structure
from schrodinger.job import jobcontrol
from schrodinger.job import queue as jobqueue
from schrodinger.utils import deprecation
from schrodinger.utils import fileutils
############################## Global variables ###############################
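# Enum-style constants naming the option groups that add_para_job_options()
# knows how to add to a parser.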
(NJOBS, JOBCTS, FIRSTCT, LASTCT, RUN_JOBS, STRICT_END, MAX_RETRIES, OUTPUT_ORG,
 NO_EXECUTION, RESTART, NC) = range(11)
###############################################################################
###############################################################################
######################### Common functions ####################################
###############################################################################
def add_para_job_options(parser, options=None):
    """
    Adds common para job control options to a SingleDashOptionParser instance.
    :type parser: SingleDashOptionParser
    :param parser: Instance of SingleDashOptionParser
    :type options: List
    :param options: List of module enums that indicate what options to add to
                    the parser.
    """
    if not options:
        options = [
            NJOBS, JOBCTS, FIRSTCT, LASTCT, RUN_JOBS, STRICT_END, MAX_RETRIES,
            OUTPUT_ORG, NO_EXECUTION, RESTART, NC
        ]
    if NJOBS in options:
        #Number of subjobs to prepare
        parser.add_option("-NJOBS",
                          "-nprocs",
                          action="store",
                          type="int",
                          dest="njobs",
                          default=1)
    if JOBCTS in options:
        #Max number of cts
        parser.add_option('-JOBCTS',
                          action="store",
                          type="int",
                          dest="jobcts",
                          default=10000)
    if FIRSTCT in options:
        #First structure to process
        parser.add_option('-first', action="store", type="int", dest="firstct")
    if LASTCT in options:
        #Last structure to process
        parser.add_option('-last', action="store", type="int", dest="lastct")
    if RUN_JOBS in options:
        #Run specified subjob
        parser.add_option('-j', action="store", dest="run_jobs")
    if STRICT_END in options:
        # Parent job dies if any subjob dies
        parser.add_option("-STRICT_END",
                          action="store_true",
                          dest="strict_end",
                          default=False)
    if MAX_RETRIES in options:
        #Number of allowed retries per subjob
        parser.add_option('-MAX_RETRIES',
                          action="store",
                          type="int",
                          default=2,
                          dest="max_retries")
    if OUTPUT_ORG in options:
        # Output organization
        parser.add_option("-OUTPUT_ORG", action="store", dest="output_org")
    if NO_EXECUTION in options:
        #No execution
        parser.add_option("-nx",
                          action="store_true",
                          dest="no_execution",
                          default=False)
    if RESTART in options:
        #Restart failed parent job
        parser.add_option("-RESTART",
                          action="store_true",
                          dest="restart",
                          default=False)
    if NC in options:
        # Cleanup subdirectories for each subjob
        parser.add_option("-nc",
                          "-NC",
                          action="store_true",
                          dest="nocleanup",
                          default=False) 
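# Illustrative sketch of how a driver might wire these helpers together. The
# SingleDashOptionParser import path below is an assumption, not something
# this module confirms:
#
#     from schrodinger.utils import cmdline
#     parser = cmdline.SingleDashOptionParser()
#     add_para_job_options(parser, options=[NJOBS, JOBCTS, MAX_RETRIES])
#     opts, args = parser.parse_args(sys.argv[1:])
#     validate_options(opts)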
def validate_options(options):
    """
    Validate the job control options
    :type options: Instance
    :param options: object containing values of para job options
    """
    if hasattr(options, 'njobs') and options.njobs < 1:
        print("Error: -NJOBS/-nprocs option (%i) requests less than 1 job." %
              options.njobs)
        sys.exit(1)
    if hasattr(options, 'jobcts') and options.jobcts < 1:
        print("Error: -JOBCTS option (%i) requests less than 1 ct structure "
              "per job." % options.jobcts)
        sys.exit(1)
    if hasattr(options, 'firstct') and options.firstct:
        if options.firstct < 1:
            print("Error: Specified first structure to process (%i) is less "
                  "than 1." % options.firstct)
            sys.exit(1)
        if options.debug:
            print("First structure to process set to                 :",
                  options.firstct)
    if hasattr(options, 'lastct') and options.lastct:
        if options.lastct < 1:
            print("Error: Specified last structure to process (%i) is less "
                  "than 1." % options.lastct)
            sys.exit(1)
        if options.debug:
            print("Last structure to process set to                  :",
                  options.lastct)
    if hasattr(options, 'firstct') and hasattr(options, 'lastct') and \
            options.firstct and options.lastct and \
            options.lastct < options.firstct:
        print("Error: Specified last structure to process (%i) is less than "
              "that specified for the first structure (%i)." %
              (options.lastct, options.firstct))
        sys.exit(1)
    if hasattr(options, 'output_org') and \
            options.output_org and options.output_org != "BY_SUBJOB":
        print("Error: If -OUTPUT_ORG is specified, it must be set to BY_SUBJOB")
        sys.exit(1)
def print_job_info(argv, driver_script, version):
    """
    Check command line arguments passed and environment variables based on the
    driver.
    :type argv: list
    :param argv: Command line arguments
    :type driver_script: String
    :param driver_script: Driver script name
    :type version: String
    :param version: Driver script version
    :return: The job control backend instance, or None if running without
        job control
    """
    if len(argv) < 2:
        print("%s received no arguments.\n" % driver_script)
        sys.exit(1)
    print("%s. Copyright (c) Schrodinger, LLC.\n" % driver_script)
    backend_instance = jobcontrol.get_backend()
    if backend_instance:
        backend_job = backend_instance.getJob()
        print("Job id: %s" % backend_job.job_id)
        local = (backend_job.JobDir == backend_job.Dir)
        print('Launch directory:', backend_job.Dir)
        print('Job directory:', backend_job.JobDir)
    else:
        local = True
        print("Job directory: " + os.getcwd())
    print('Running locally:', local)
    print("======================================")
    print("Running", driver_script)
    print(_timestamp())
    print("Version:", version)
    print("Command:", ' '.join(sys.argv))
    print(".....................................\n")
    return backend_instance 
def _subjob_input_id(text):
    """
    Sort key for subjob input file names: extracts the numeric subjob index.
    """
    m = re.match(r'^.*_subjob_(\d+)(\..*)?$', text)
    assert m is not None
    return int(m.group(1))
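# For example (illustrative), _subjob_input_id("out_subjob_12.mae") returns 12,
# so sorting with key=_subjob_input_id orders subjob files numerically instead
# of lexicographically (where "out_subjob_10" would sort before "out_subjob_2").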
def _append_st_file(st_file, st_writer, raw_append=False):
    """
    Append structures from st_file to st_writer.
    :type st_file: String
    :param st_file: Structure file name
    :type st_writer: file or StructureWriter
    :param st_writer: Open binary file handle (for raw appends) or a
        structure writer that receives the structures
    :type raw_append: Boolean
    :param raw_append: Indicates to append file using raw I/O
    """
    try:
        if raw_append:
            with open(st_file, 'rb') as in_fh:
                st_writer.write(in_fh.read())
        else:
            for st in structure.StructureReader(st_file):
                st_writer.append(st)
    except Exception:
        print("Error: appending structure output file.")
def _print_subjob_logfile(log_file="", start_msg=None, end_msg=None):
    """
    Prints given log file with start and end messages.
    :type log_file: String
    :param log_file: Log file name. Print the content of log_file, if specified
    :type start_msg: String
    :param start_msg: Message to place at the start of the log file
    :type end_msg: String
    :param end_msg: Message to place at the end of the log file
    """
    if start_msg:
        print(start_msg)
    if log_file:
        with open(log_file) as in_fh:
            for line in in_fh:
                print(line, end='')  # lines already include their newline
    if end_msg:
        print(end_msg)
def _timestamp():
    now = datetime.datetime.now()
    return now.strftime("%a, %d %b %Y %H:%M:%S.%f")
def _merge_subjobs(backend_instance, subjob_input_mae_files, output_file,
                   output_org, data):
    """
    Merge subjobs structure and log files.
    :type backend_instance: _Backend
    :param backend_instance: Instance of the _Backend
    :type subjob_input_mae_files: List
    :param subjob_input_mae_files: List of subjob files with parent directory
    :type output_file: String
    :param output_file: Job output file name
    :type output_org: String
    :param output_org: Produce more than one output structure file
    :type data: Dictionary
    :param data: Post-execution callback data; 'output_format', if present,
        names the output structure format
    """
    msg1 = "###########################################\n" \
           "###########################################\n" \
           " Organizing Output Files:                  \n"
    start_msg = "...........................................\n" \
                " Collecting Results from Job: %s           \n" \
                "   -------------------------------------   \n"
    msg2 = "End of Log files for subjobs.              \n\n" \
           "-------------------------------------------\n\n" \
           "All structure file output placed in one output " \
           "structure file: %s.\n" \
           "Number of structures placed in %s: %d.\n" \
           "###########################################\n" \
           "%s\nJob Complete"
    end_msg = "\n   -------------------------------------   \n" \
              "...........................................\n"
    base, ext = fileutils.splitext(output_file)
    st_writer = open(output_file, "wb")
    raw_append = True
    tar = tarfile.open(base + '.tar.gz', "w:gz")
    tmp1_msg = ""  # ensure defined even if there are no subjob files
    for count, file_path in enumerate(subjob_input_mae_files):
        subjobdir = os.path.dirname(file_path)
        tar.add(subjobdir)
        # Collect structures
        st_file = os.path.join(subjobdir, subjobdir + ext)
        if os.path.isfile(st_file):
            if backend_instance and output_org:
                # FIXME: we tried backend.addOutputfile() but it didn't work,
                # so now copying subjob output files individually from
                # job_dir to launch_dir
                try:
                    shutil_copy(
                        os.path.join(backend_instance.getJob().JobDir, st_file),
                        backend_instance.getJob().Dir)
                except Exception:
                    pass
            _append_st_file(st_file, st_writer, raw_append)
        # Collect log
        tmp1_msg = start_msg % subjobdir
        if count == 0:
            tmp1_msg = msg1 + tmp1_msg
        tmp2_msg = end_msg
        log_file = os.path.join(subjobdir, subjobdir + ".log")
        if os.path.isfile(log_file):
            _print_subjob_logfile(log_file,
                                  start_msg=tmp1_msg,
                                  end_msg=tmp2_msg)
            tmp1_msg = ""
    st_writer.close()
    tar.close()
    frmt = None
    if 'output_format' in data:
        frmt = data['output_format']
        if frmt == 'mae':
            frmt = 'maestro'
    tmp2_msg = end_msg + msg2 % (output_file, output_file,
                                 structure.count_structures(output_file),
                                 _timestamp())
    _print_subjob_logfile(start_msg=tmp1_msg, end_msg=tmp2_msg)
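# Note: _merge_subjobs() leaves two artifacts in the job directory: the merged
# output structure file (a raw byte-level concatenation of the subjob outputs)
# and a <base>.tar.gz archive holding every subjob directory.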
def _get_jobs_to_run(run_jobs_str, njobs):
    """
    Return the list of jobs to run.
    :type run_jobs_str: String
    :param run_jobs_str: String of ':' or ',' separated numbers
    :type njobs: Number
    :param njobs: Total number of jobs requested
    """
    run_jobs = []

    def validate_or_add_num(num_str, range_check=True):
        try:
            num = int(num_str)
        except ValueError:
            print("Error: Wrong job number '%s' specified." % num_str)
            sys.exit(1)
        if range_check:
            if num > 0:
                run_jobs.append(num)
            else:
                print("Warning: The specified job number %i is less than 1. "
                      "Skipping this job." % num)
        return num
    if run_jobs_str:
        for item in run_jobs_str.strip().split(','):
            item = item.strip()
            tmp_list = item.split(':')
            if not tmp_list or len(tmp_list) > 2:
                print("Warning: can not identify the job '%s'. Skipping." %
                      item)
                continue
            if len(tmp_list) == 2:
                start = validate_or_add_num(tmp_list[0].strip(),
                                            range_check=False)
                end = validate_or_add_num(tmp_list[1].strip(),
                                          range_check=False)
                for i in range(start, end + 1):
                    validate_or_add_num(i)
            else:
                validate_or_add_num(item)
        # Remove duplicates if any
        run_jobs = list(set(run_jobs))
        if not run_jobs:
            print("Error: Specified job numbers '%s' are not valid." % \
                  run_jobs_str)
            sys.exit(1)
        else:
            # Sort list
            run_jobs.sort()
            tmp_list = []
            for job_num in run_jobs:
                if job_num > njobs:
                    print("Warning: The specified job number %i is greater " \
                          "than the number of jobs (%i). Skipping this job." \
                          % (job_num, njobs))
                else:
                    tmp_list += [job_num]
            run_jobs = tmp_list
            print("Only running subjobs: %s" % \
                  ','.join(["%s" % j for j in run_jobs]))
    return run_jobs
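# For example (illustrative), _get_jobs_to_run("1,3:5", njobs=10) returns
# [1, 3, 4, 5]: comma-separated entries name individual subjobs, "start:end"
# entries expand to inclusive ranges, and the result is deduplicated, sorted
# and capped at njobs.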
def _generate_subjobs(job_input_file,
                      job_output_file,
                      options,
                      driver_script,
                      split_input_file=True):
    """
    Create the subjob directories and fill in their input data.
    :type job_input_file: String
    :param job_input_file: Job input file
    :type job_output_file: String
    :param job_output_file: Job output file
    :type options: Instance
    :param options: object containing values of para job options
    :type driver_script: String
    :param driver_script: Driver script name (currently unused here)
    :type split_input_file: Boolean
    :param split_input_file: Whether to split the input structures across the
        subjob input files
    :return: Dictionary mapping each subjob input file path to the 1-based
        index (as a string) of its first structure in the original input
    """
    in_file_ext = fileutils.splitext(job_input_file)[1]
    in_file_base = fileutils.get_basename(job_input_file)
    out_file_base, out_file_ext = fileutils.splitext(job_output_file)
    subjob_input_dat = {}
    if split_input_file:
        nct_in = structure.count_structures(job_input_file)
        if options.firstct:
            print("First structure to process set to                   :", \
                  options.firstct)
        else:
            options.firstct = 1
        if options.lastct:
            if options.lastct < nct_in:
                print("Number of structures present in input structure file:", \
                      nct_in)
                print("Number of last structure to be processed            :", \
                      options.lastct)
            elif options.lastct > nct_in:
                print("Number of structures present in input structure file:", \
                      nct_in)
                print("Number of last structure to be processed            :", \
                      options.lastct)
                print("Requested number of last structure to process exceeds " \
                      "number present.")
                print("Reseting number of last structure to process to number " \
                      "present (%i)." % nct_in)
                options.lastct = nct_in
        else:
            options.lastct = nct_in
        nct_use = options.lastct - options.firstct + 1
        print("Number of structures to process                     :", nct_use)
        if nct_use < options.njobs:
            print("More jobs requested (%i) than number of structures to " \
                  "process (%i)." % (options.njobs, nct_use))
            options.njobs = nct_use
            print("Number of jobs set to %i." % options.njobs)
        else:
            # Ceiling division: the number of jobs needed so that each
            # processes at most JOBCTS structures
            tmp_njobs = (nct_use + options.jobcts - 1) // options.jobcts
            if tmp_njobs > options.njobs:
                print("Number of jobs requested (%i) requires that more than\n" \
                      " JOBCTS (%i) structures be processed by each subjob." \
                      % (options.njobs, options.jobcts))
                print("Number of jobs increased to: %i" % tmp_njobs)
                options.njobs = tmp_njobs
        run_jobs = _get_jobs_to_run(options.run_jobs, options.njobs)
        cts_per_job = nct_use // options.njobs
        remainder = nct_use % options.njobs
        st_writer = None
        format = fileutils.get_structure_file_format(job_input_file)
        adj = 0
        ict = 1
        pattern = b""
        header = b""
        if (in_file_ext[-2:] == 'gz'):
            input_fh = gzip.open(job_input_file, 'rb')
        else:
            input_fh = open(job_input_file, 'rb')
        if format == structure.MAESTRO:
            pattern = rb"^\s*f_m_ct\s*{"
            #header = '''{\n  s_m_m2io_version\n  :::\n  2.0.0\n}\n\n'''
            input_dat = []
            header_indx = 0
            for line in input_fh:
                input_dat.append(line)
                header_indx += 1
                if (header_indx == 6):
                    break
            header = b''.join(input_dat[:6])
            st_reader = structure.StructureReader(job_input_file,
                                                  index=options.firstct)
        elif format == structure.SD:
            pattern = b"$$$$"
            header = b""
            # Renumbering since delimiter is at end of ct
            ict = 0
            adj = 1
            st_reader = structure.StructureReader(job_input_file,
                                                  index=options.firstct)
        elif format == structure.SMILES:
            st_reader = structure.SmilesReader(job_input_file,
                                               index=options.firstct)
        elif format == structure.SMILESCSV:
            st_reader = structure.SmilesCsvReader(job_input_file,
                                                  index=options.firstct)
            for line in input_fh:
                if line[:6] == b"SMILES":
                    header = line
                    break
            if (header == b""):
                input_fh.seek(0)
        else:
            print("WARNING: Unsupported format ")
            st_reader = structure.StructureReader(job_input_file,
                                                  index=options.firstct)
        input_mae_file = None
        ijob = 0
        indx = 1
        cts_for_job = cts_per_job
        if remainder > 0:
            cts_for_job += 1
        cts_in_job = 0
        ct_delim = 0
        fw = None
        for line in input_fh:
            # For Maestro files the ct delimiter comes at the start of each
            # block, so detect it here; SD delimiters come at the end and are
            # handled further below.
            if format == structure.MAESTRO:
                ct_delim = 0
                if re.match(pattern, line):
                    ct_delim = 1
            # Open up the subjob output file if required and write header into it
            if ((ct_delim != 0 and ict > 1 and cts_in_job == cts_for_job and ijob <= options.njobs) \
                 or (ijob == 0)):
                # Close the previous output file
                if (ijob != 0):
                    fw.close()
                    fw = None
                if (ijob == options.njobs):
                    break
                ijob += 1
                # Extra cts have been added, now back to cts_per_job
                if (ijob > remainder):
                    cts_for_job = cts_per_job
                subjobname = out_file_base + "_subjob_" + str(ijob)
                # Create the subjob directory and the subjob input file name
                if not os.path.isdir(subjobname):
                    os.mkdir(subjobname)
                input_mae_file = os.path.join(
                    subjobname,
                    in_file_base + "_subjob_" + str(ijob) + in_file_ext)
                # Record the subjob input file and its starting structure
                # index in the returned dictionary.
                subjob_input_dat[input_mae_file] = str(ict + adj)
                # If this job is not to be run, drop it from the returned
                # dictionary
                if run_jobs and ijob not in run_jobs:
                    subjob_input_dat.pop(input_mae_file)
                if (in_file_ext[-2:] == 'gz'):
                    fw = gzip.open(input_mae_file, 'wb')
                else:
                    fw = open(input_mae_file, 'wb')
                fw.write(header)
                cts_in_job = 0
            if format == structure.SD:
                ct_delim = 0
                if pattern in line:
                    ct_delim = 2
                    fw.write(line)
                    line = b""
            elif format == structure.SMILESCSV:
                ct_delim = 0
                # Note: indexing bytes yields ints, so compare 1-byte slices
                if (len(line) > 1 and line[:6] != b"SMILES" and
                        line[:1] != b"#" and line[:1] != b" "):
                    ct_delim = 3
                else:
                    line = b""
            elif format == structure.SMILES:
                ct_delim = 0
                if (len(line) > 1 and line[:1] != b"#" and line[:1] != b" "):
                    ct_delim = 4
            if (ct_delim != 0):
                ict += 1
                cts_in_job += 1
            fw.write(line)
        input_fh.close()
        if fw is not None:
            fw.close()
    else:
        input_mae_file = None
        ijob = 1
        while ijob <= options.njobs:
            subjobname = out_file_base + "_subjob_" + str(ijob)
            if not os.path.isdir(subjobname):
                os.mkdir(subjobname)
            input_mae_file = os.path.join(
                subjobname, in_file_base + "_subjob_" + str(ijob) + in_file_ext)
            # Use a string index for consistency with the split-input branch
            subjob_input_dat[input_mae_file] = "1"
            ijob += 1
    return subjob_input_dat
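# Illustrative sketch of the mapping _generate_subjobs() returns, e.g. for
# input "in.mae", output "out.mae" and two subjobs of 50 structures each
# (file names and counts here are hypothetical):
#
#     {
#         "out_subjob_1/in_subjob_1.mae": "1",
#         "out_subjob_2/in_subjob_2.mae": "51",
#     }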
def _run_subjobs(jdj, restart_file):
    """
    Run the queued subjobs, dumping JobDJ state to the restart file on every
    status change so that a failed run can be resumed with -RESTART.
    """
    jdj.run(status_change_callback=lambda _: jdj.dump(restart_file))
    if jdj.isComplete():
        # Remove restart file if present
        fileutils.force_remove(restart_file)
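# The restart file dumped above is what launch_subjobs() reloads via
# pickle.load() when -RESTART is given, letting a failed run resume rather
# than start from scratch.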
def launch_subjobs(
        options,
        driver_script,
        backend_instance,
        backend_args,
        job_input_file,
        job_output_file,
        cmd_append="",
        merge=True,
        prepare_subjob_callback=None,
        callback_data={},  # noqa: M511
        split_input_file=True,
        post_execution_processing_callback=None,
        post_execution_processing_callback_data={}):  # noqa: M511
    """
    Launch subjobs.
    :type options: Instance
    :param options: object containing values of para job options
    :type driver_script: String
    :param driver_script: Driver script name
    :type backend_instance: _Backend
    :param backend_instance: Instance of the _Backend
    :type backend_args: List
    :param backend_args: List of arguments
    :type job_input_file: String
    :param job_input_file: Job input file
    :type job_output_file: String
    :param job_output_file: Job output file
    :type cmd_append: String
    :param cmd_append: Command to be appended to subjob command
    :type merge: Boolean
    :param merge: Whether to join subjob outputs or not
    :type prepare_subjob_callback: Function
    :param prepare_subjob_callback: Function to be called to prepare subjob data
    :type callback_data: Dictionary
    :param callback_data: A dictionary to be passed to the subjob callback
    :type split_input_file: Boolean
    :param split_input_file: Whether to split the input structures across the
        subjob input files
    :type post_execution_processing_callback: Function
    :param post_execution_processing_callback: Function called once all
        subjobs complete, with the subjob input files, the main log file name
        and the data dictionary below
    :type post_execution_processing_callback_data: Dictionary
    :param post_execution_processing_callback_data: A dictionary passed to
        the post-execution callback and to the output merging step
    """
    jobname = fileutils.get_basename(job_output_file)
    restart_file = jobname + '.restart'
    # Job restart functionality
    if options.restart:
        if os.path.isfile(restart_file):
            # Restart files are pickles, so open them in binary mode
            with open(restart_file, 'rb') as fh:
                jdj = pickle.load(fh)
            print("Total subjobs   :", len(jdj.all_jobs))
            print("Finished subjobs:", len(jdj.done_jobs))
            print("Failed subjobs  :", len(jdj.failed_jobs))
            print("Running failed subjobs...")
            _run_subjobs(jdj, restart_file)
            if jdj.isComplete():
                if merge:
                    # Prepare subjob input files to merge
                    in_file_base, in_file_ext = fileutils.splitext(
                        job_input_file)
                    out_file_base, out_file_ext = fileutils.splitext(
                        job_output_file)
                    subjob_input_mae_files = []
                    all_jobs = jdj.all_jobs
                    all_jobs.sort()
                    for i, job in enumerate(all_jobs):
                        job_obj = job.getJob()
                        subjobname = out_file_base + "_subjob_" + str(i + 1)
                        infile = os.path.join(
                            subjobname,
                            in_file_base + "_subjob_" + str(i + 1) + in_file_ext)
                        if os.path.basename(infile) in job_obj.InputFiles:
                            subjob_input_mae_files += [infile]
                    if subjob_input_mae_files:
                        subjob_input_mae_files.sort(key=_subjob_input_id)
                        _merge_subjobs(backend_instance, subjob_input_mae_files,
                                       job_output_file, options.output_org,
                                       post_execution_processing_callback_data)
            return
        else:
            print("Warning: Restart file '%s', couldn't find." % restart_file)
            print("Running without restart file.")
    if split_input_file:
        if options.output_org:
            if options.debug:
                print("Output organization set to", options.output_org)
            if options.output_org == "BY_SUBJOB":
                print("Separate output structure files will be created for "
                      "each subjob.")
        else:
            print("All output structures will be stored in one file called "
                  "'%s'" % job_output_file)
    # Generate subjobs dirs and data
    subjob_input_dat = _generate_subjobs(job_input_file, job_output_file,
                                         options, driver_script,
                                         split_input_file)
    subjob_input_mae_files = list(subjob_input_dat)
    subjob_input_mae_files.sort(key=_subjob_input_id)
    print("Generated %d jobs" % len(subjob_input_mae_files))
    if hasattr(options, 'no_execution') and options.no_execution:
        print("------------------------------------------------------------")
        print("-nx specified only creating subdirectories and input " \
              
"structure files.")
        print("    jobs will not be run.")
        print("------------------------------------------------------------")
        return
    print("Preparing subjobs...")
    backend = ""
    if driver_script == "epik_driver":
        backend = "epik"
    elif driver_script == "bmin_driver":
        backend = "bmin"
    elif driver_script == "ligprep_driver":
        msg = 'Does not work with modern ligprep'
        raise deprecation.DeprecationError(msg)
    else:
        raise RuntimeError(f'Unrecognized driver {driver_script}')
    subjob_cmd = [os.path.join(os.environ['SCHRODINGER'], backend)]
    subjob_cmd += backend_args
    max_failures = jobqueue.NOLIMIT
    if options.strict_end:
        max_failures = 0
    out_file_base, out_file_ext = fileutils.splitext(job_output_file)
    hosts = jobcontrol.get_backend_host_list()
    if not hosts:
        hosts = [('localhost', 1)]
    jdj = jobqueue.JobDJ(hosts=hosts,
                         max_failures=max_failures,
                         max_retries=options.max_retries,
                         verbosity="verbose")
    for i, file_path in enumerate(subjob_input_mae_files):
        subjobdir = os.path.dirname(file_path)
        print(" subjobinfo subjobname", subjobdir)
        in_file = os.path.basename(file_path)
        if split_input_file:
            print(" subjobinfo input     ", in_file)
        out_file = subjobdir + out_file_ext
        print(" subjobinfo output    ", out_file)
        if split_input_file:
            print(" subjobinfo log       ", subjobdir + '.log')
        if prepare_subjob_callback:
            prepare_subjob_callback(subjobdir, in_file, out_file, callback_data)
        cmd = list(subjob_cmd)
        if driver_script == "epik_driver":
            cmd.extend(['-imae', in_file, '-omae', out_file])
        elif driver_script == "bmin_driver":
            cmd.extend([subjobdir])
        elif driver_script == "ligprep_driver":
            tmp_cmd = cmd_append.strip().split(' ')
            tmp_cmd[1] = in_file
            tmp_cmd[3] = out_file
            cmd.extend(tmp_cmd)
            cmd.extend(['-indx', subjob_input_dat[file_path]])
        if options.debug:
            print("subjob command", ' '.join(cmd))
        jdj.addJob(cmd, command_dir=subjobdir)
    print("Running subjobs...")
    _run_subjobs(jdj, restart_file)
    if jdj.isComplete():
        if merge:
            _merge_subjobs(backend_instance, subjob_input_mae_files,
                           job_output_file, options.output_org,
                           post_execution_processing_callback_data)
        if post_execution_processing_callback:
            base, ext = fileutils.splitext(job_output_file)
            main_log = base + '.log'
            post_execution_processing_callback(
                subjob_input_mae_files, main_log,
                post_execution_processing_callback_data)
    # EV# 109618: Remove subjob dirs only if the job ran locally (i.e. using
    # -LOCAL or -NOJOBID); in all other cases job control takes care of
    # deleting the subjob dirs.
    if backend_instance:
        backend_job = backend_instance.getJob()
        local = (backend_job.JobDir == backend_job.Dir)
    else:
        local = True
    if hasattr(options, 'nocleanup') and not options.nocleanup and local:
        print("-------------------------------------------")
        print("Removing subjob directories")
        for file_path in subjob_input_mae_files:
            shutil_rmtree(os.path.dirname(file_path))
        print("-------------------------------------------") 
###############################################################################
#EOF