"""
Driver script for running MOPAC7.1 and MOPAC_MAIN backend execs.
"""
# Contributors: Mark A. Watson
import glob
import os
import re
import shutil
import socket
import sys
import tempfile
import time
from pathlib import Path
from schrodinger.application.jaguar import file_logger
from schrodinger.application.matsci.jobutils import RobustSubmissionJob
from schrodinger.application.mopac import mopac_parser
from schrodinger.application.mopac import utils
from schrodinger.application.mopac.mopac_launchers import MOPAC71
from schrodinger.application.mopac.mopac_launchers import get_mopac_launcher
from schrodinger.infra import mm
from schrodinger.job import jobcontrol
from schrodinger.job import queue
from schrodinger.structure import StructureReader
from schrodinger.structure import StructureWriter
from schrodinger.utils import fileutils
from schrodinger.utils import subprocess
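# Matches "EXTERNAL=<filename>" keyword assignments in MOPAC input (case-insensitive).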
_external_re = re.compile(r"EXTERNAL\s*=\s*(\S+)", re.I)
def _transfer_subjob_files(jobbe, job_id):
"""
Copy files held in a job record from the working dir to
the launch dir associated with a jobcontrol backend.
This function can handle jobs launched in subdirectories.
:type jobbe: schrodinger.job.jobcontrol._Backend
:param jobbe: The Backend object for the current job.
:type job_id: jobcontrol.Job.JobId
:param job_id: jobcontrol job id
"""
subdir = file_logger.relative_path()
job = jobcontrol.Job(job_id)
for ifile in job.getInputFiles():
jobbe.copyOutputFile(os.path.join(subdir, ifile))
for ofile in job.getOutputFiles():
jobbe.copyOutputFile(os.path.join(subdir, ofile))
for lfile in job.LogFiles:
jobbe.copyOutputFile(os.path.join(subdir, lfile))
stoutfile = job.StructureOutputFile
if stoutfile:
jobbe.copyOutputFile(os.path.join(subdir, stoutfile))
def _distribute(cmds, keep_subjobs):
"""
Create a JobDJ, populate it with cmdline invocations, and launch it.
    :type cmds: list(list(str))
    :param cmds: command lines to launch, each given as a list of argument strings
    :type keep_subjobs: bool
    :param keep_subjobs: if True, re-register completed subjob files with the parent job
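    Example (a minimal sketch; the .mae file names are hypothetical)::

        cmds = [['run', 'semi_emp.py', 'mol1.mae'],
                ['run', 'semi_emp.py', 'mol2.mae']]
        _distribute(cmds, keep_subjobs=False)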
"""
def job_status_change_callback(job):
# JobDJ callback
if job.state in [queue.JobState.DONE, queue.JobState.FAILED]:
# Copy files registered with this job to the launch dir
# associated with this driver job.
if keep_subjobs:
jobobj = job.getJob()
if jobobj:
_transfer_subjob_files(jobbe, jobobj.JobId)
# Initialize JobDJ
jobdj = queue.JobDJ(max_failures=queue.NOLIMIT, verbosity='normal')
# Populate JobDJ
print("Processing the following subjobs:")
for i, job in enumerate(cmds):
print('(' + str(i + 1) + ') ' + ' '.join(job))
jobdj.addJob(RobustSubmissionJob(job))
print("Please check subjob files for error messages.")
# Execute JobDJ
jobbe = jobcontrol.get_backend()
if jobdj.all_jobs:
# Launch the subjobs in the JobDJ and transfer
# completed subjob files on-the-fly.
jobdj.run(status_change_callback=job_status_change_callback)
def _run_structures(cts, parsed_args, name):
"""
Launch a single MOPAC job locally for one or many structures.
The same MOPAC method and keywords are applied to each structure.
Creates an output .mae file in the same directory.
:type cts: list
:param cts: list of Structure instances
:type parsed_args: Argparse Namespace instance
:param parsed_args: parsed cmdline arguments
:type name: str
:param name: unique name for output
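    Example (a minimal sketch; the input file name is hypothetical and
    ``parsed_args`` comes from mopac_parser.parse_args)::

        cts = list(StructureReader('ethanol.mae'))
        _run_structures(cts, parsed_args, 'ethanol')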
"""
mopac_launcher = get_mopac_launcher(parsed_args.mopac_version,
parsed_args.energy_only)
mopfile = name + '.mop'
outmae = name + '_out.mae'
# Create a single .mop file
mopac_launcher.write_mop_file(cts, mopfile, parsed_args.method,
parsed_args.geopt, parsed_args.keywords,
parsed_args.plotMO, parsed_args.gridres,
parsed_args.gridext)
# Run .mop file
ok = _run_single_mop_file(mopac_launcher, mopfile, structures=cts)
# Create output .mae file
with StructureWriter(outmae, overwrite=True) as writer:
for i, ct in enumerate(cts):
if ok[i]:
writer.append(ct)
elif parsed_args.return_all_structs:
print(
f'Job {name + "_" + str(i+1)} failed. Including in output anyway.'
)
writer.append(ct)
else:
msg = f'Job {name + "_" + str(i+1)} failed.'
msg += f'\nPlease use the {mopac_parser.RETURN_ALL_STRUCTS_FLAG} command line flag'
msg += ' to include failed structures in output .mae.'
print(msg)
def _run_mopac71_in_loop(cts, parsed_args, name):
"""
Process a multiple-structure .mae file locally with MOPAC7.1.
One output .mae file will be generated in the current directory.
Unfortunately, the f2py MOPAC7.1 interface is not stable when called
    multiple times from the same process. Therefore, we create a
new .mae file for each structure and invoke the backend on each file
in a loop. Note this is done locally in serial. Parallelization
can be achieved by supplying multiple input files.
:type cts: list
:param cts: list of Structure instances
:type parsed_args: Argparse Namespace instance
:param parsed_args: parsed cmdline arguments
:type name: str
:param name: unique name
"""
jobbe = jobcontrol.get_backend()
outmaes = []
print("Running %d MOPAC7.1 subjobs..." % len(cts))
print("Please check subjob files for error messages.")
for i, ct in enumerate(cts, 1):
# Launch MOPAC7.1 locally in a new subprocess for each structure
subname = name + '_' + str(i)
if not os.path.exists(subname):
os.mkdir(subname)
# Create a new subdirectory for each subjob to ensure that the current
# working directory only ends up with one output .mae file for this
# single input .mae file.
with fileutils.chdir(subname):
submae = subname + '.mae'
ct.write(submae)
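            # Re-invoke this driver on the single-structure .mae file;
            # -NOJOBID runs the subjob without job control (it is launched
            # as a plain local subprocess below).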
cmd = ['run', 'semi_emp.py', '-NOJOBID', submae] + parsed_args.flags
with open(subname + '.log', 'w') as fh:
subprocess.call(cmd, stdout=fh, stderr=subprocess.STDOUT)
if os.path.exists(subname + '_out.mae'):
outmaes.append(os.path.abspath(subname + '_out.mae'))
# Register files with jobcontrol
if jobbe and parsed_args.keep_subjobs:
jobbe.addOutputFile(subname)
# Create output .mae file
with StructureWriter(name + '_out.mae') as writer:
for outmae in outmaes:
writer.extend(StructureReader(outmae))
# Copy auxiliary files to main directory (currently only .vis files)
visfiles = glob.glob('*/*.vis')
for visfile in visfiles:
shutil.copy(visfile, os.getcwd())
if jobbe:
jobbe.addOutputFile(os.path.basename(visfile))
def _run_single_mae_file(parsed_args, maefile):
"""
Process a single or multiple-structure .mae file locally.
One output .mae file will be generated in the current directory.
For a multi-structure <maefile>, it is much faster to process all the
structures together inside a single MOPAC input file instead of calling
the backend in a loop. Unfortunately, the f2py MOPAC7.1 interface cannot
handle multiple structures, so it must be called in a loop. However,
for the special case where only energies are requested, we provide
a mechanism whereby MOPAC7.1 can be launched with a multi-structure
input file and only the energies are parsed from the output.
:type parsed_args: Argparse Namespace instance
:param parsed_args: parsed cmdline arguments
:type maefile: str
:param maefile: name of .mae file in CWD
"""
jobbe = jobcontrol.get_backend()
    name, _ = fileutils.splitext(maefile)
    cts = list(StructureReader(maefile))
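    # As described above, MOPAC7.1 can only process one structure per
    # invocation unless only energies are requested, so multi-structure
    # inputs fall back to the per-structure loop.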
if parsed_args.mopac_version == MOPAC71:
if len(cts) == 1 or parsed_args.energy_only:
_run_structures(cts, parsed_args, name)
else:
_run_mopac71_in_loop(cts, parsed_args, name)
else:
_run_structures(cts, parsed_args, name)
def _gather_output_files(jobname):
"""
    Gather subjob output .mae files into a combined output .mae file for
    incorporation into the GUI, along with auxiliary files (.vis files,
    .smap file, etc.).
:type jobname: str
:param jobname: jobname used for output file names
"""
outmae = jobname + '_out.mae'
# Make summary output .mae file, if multiple files present
p = Path('.')
outmaes = sorted(p.glob('*_out.mae'))
    if len(outmaes) == 1 and Path(outmae) in outmaes:
# Don't need to make summary out .mae file
pass
elif outmaes:
# Need to make summary output .mae file and handle the possibility
# that the jobname clashes with an existing out .mae from a subjob.
with tempfile.NamedTemporaryFile() as fh:
tmpmae = fh.name + '.mae'
with StructureWriter(tmpmae) as writer:
for submae in outmaes:
writer.extend(StructureReader(str(submae)))
shutil.copy(tmpmae, outmae)
else:
print("\nERROR: No subjob output .mae files found!")
msg = f'Please use the {mopac_parser.RETURN_ALL_STRUCTS_FLAG} command line flag'
msg += ' to include failed structures in output .mae.'
msg += '\nCheck subjob files for additional error messages.'
print(msg)
# Create .smap file for all the auxiliary files (currently only .vis
# files) in the current directory. (Note .smap files cannot accept paths
# to sub-directories.)
smapfile = jobname + '_out.smap'
file_logger.make_smapfile(outmae, smapfile)
# Register files with jobcontrol and for GUI incorporation
jobbe = jobcontrol.get_backend()
for visfile in glob.glob('*.vis'):
if jobbe:
jobbe.addOutputFile(visfile)
if jobbe:
jobbe.addOutputFile(outmae)
jobbe.setStructureOutputFile(outmae)
jobbe.addOutputFile(smapfile)
def _run_single_mop_file(mopac_launcher, mopfile, structures=()):
"""
Execute single MOPAC input file on the local machine.
Optional structure.Structure objects can be provided to collect
output data.
:type mopac_launcher: MopacLauncher instance
:param mopac_launcher: API class for executing MOPAC backend binary.
:type mopfile: str
:param mopfile: path to .mop MOPAC input file
:type structures: list of structure.Structure instances
:param structures: optional structures to populate with results
:rtype: list
:return: MOPAC job status for each structure
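    Example (a minimal sketch; the .mop file name is hypothetical)::

        launcher = get_mopac_launcher(MOPAC71)
        ok = _run_single_mop_file(launcher, 'water.mop')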
"""
# Launch MOPAC executable
name, _ = fileutils.splitext(mopfile)
ok = mopac_launcher.run(name, structures)
# Register auxiliary files with jobcontrol (currently only .vis files)
jobbe = jobcontrol.get_backend()
for visfile in glob.glob('*.vis'):
if jobbe:
jobbe.addOutputFile(visfile)
if not all(ok):
        outfile = re.sub(r'\.mop$', '.out', mopfile)
        print(f"\nMOPAC job {mopfile} failed.")
print(f"Please check {outfile} for additional information.\n")
return ok
def _run_mae_files(parsed_args):
"""
Execute MOPAC on list of .mae input files. If multiple files are
provided, they will be run in parallel through a JobDJ.
One output .mae file will be generated per input .mae file.
:type parsed_args: Argparse Namespace instance
:param parsed_args: parsed cmdline arguments
"""
maefiles = parsed_args.infiles
if len(maefiles) == 1:
# Run single .mae file locally
_run_single_mae_file(parsed_args, maefiles[0])
else:
# Run multiple .mae files distributed by a JobDJ
cmds = []
for maefile in maefiles:
cmd = ['run', 'semi_emp.py', maefile] + parsed_args.flags
cmds.append(cmd)
_distribute(cmds, parsed_args.keep_subjobs)
# Gather all output files into a single summary output .mae
_gather_output_files(parsed_args.jobname)
def _run_mop_files(mopac_version, mopfiles):
"""
Execute MOPAC on list of .mop input files. If multiple files are
provided, they will be run in parallel through a JobDJ.
:type mopac_version: str (e.g. results_main.MOPAC_MAIN)
:param mopac_version: mopac version to execute in backend.
:type mopfiles: list of str
:param mopfiles: paths to .mop input files
    :rtype: int
    :return: 0 if the MOPAC job completed successfully, 1 otherwise
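    Example (a minimal sketch; the .mop file names are hypothetical)::

        status = _run_mop_files(MOPAC71, ['mol1.mop', 'mol2.mop'])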
"""
ok = True
if len(mopfiles) == 1:
mopac_launcher = get_mopac_launcher(mopac_version)
        # Collapse the per-structure status list into a single success flag
        ok = all(_run_single_mop_file(mopac_launcher, mopfiles[0]))
else:
# Run multiple .mop files distributed by a JobDJ
cmds = []
for mopfile in mopfiles:
cmd = ['run', 'semi_emp.py', mopfile, mopac_version]
cmds.append(cmd)
        # No keep_subjobs flag is available for .mop inputs, so default to False.
        _distribute(cmds, keep_subjobs=False)
return 0 if ok else 1
def _setup_parallel_environment():
"""
Set up parallel environment defaults
"""
# Currently, we do not support running MOPAC in parallel. Therefore,
# this function is very simple. Parallelism is only implemented
# by running simultaneous serial MOPAC jobs.
# Users should run multi-threaded jobs by specifying -TPP in the
# commandline args and not OMP_NUM_THREADS, which we set to 1 here
    # to avoid unpredictable behaviour when it is undefined. (MOPAC-119)
os.environ['OMP_NUM_THREADS'] = '1'
def _print_header():
"""
Print some useful information about the job
"""
jobbe = jobcontrol.get_backend()
print('{0:<12}{1}'.format("Release:", mm.mmfile_get_release_name()))
print('{0:<12}{1}'.format("Exec:", os.getenv('MMSHARE_EXEC')))
print('{0:<12}{1}'.format("Host:", socket.gethostname()))
if jobbe and jobbe.job_id:
print('{0:<12}{1}'.format("JobId:", jobbe.job_id))
print('{0:<12}{1}'.format("JobDir:", os.getcwd()))
print('{0:<12}{1}'.format("Date:", time.strftime("%c")))
print(f"\nContents of {os.getcwd()}:")
for root, dirs, files in os.walk(os.getcwd()):
for f in files:
print(os.path.join(root, f))
print()
sys.stdout.flush()
def main(args):
"""
Run a MOPAC workflow on the local machine.
:type args: list
:param args: cmdline arguments
:rtype: int
:return: status of execution
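    Example (a minimal sketch; the input file name is hypothetical and any
    additional flags are handled by mopac_parser.parse_args)::

        status = main(['benzene.mae'])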
"""
t0 = time.time()
parsed_args = mopac_parser.parse_args(args)
_setup_parallel_environment()
_print_header()
status = 0
if all(utils.is_mopac_file(x) for x in parsed_args.infiles):
status = _run_mop_files(parsed_args.mopac_version, parsed_args.infiles)
else:
# Assume they are all .mae files
_run_mae_files(parsed_args)
print("\nAll jobs complete.")
print("Finished: %s" % time.strftime("%c"))
print(f"{time.time() - t0:.2f} secs to run {os.path.basename(__file__)}\n")
return status