# Copyright Schrodinger, LLC. All rights reserved.
# Contributors: Mike Beachy, Mark A. Watson
import errno
import glob
import os
import re
import shutil
import tempfile
import zipfile
REMOVE, ZIP, SAVE = "remove", "zip", "save"
[docs]def open_file(basename, suffix):
"""
Open a file with a unique name based on basename and suffix in the form
"basename-<index>.suffix", where "<index>" is an integer that is
incremented until the file can be created. Return the opened file
object.
"""
ix = 0
filename = "%s.%s" % (basename, suffix)
while True:
if not os.path.exists(filename):
try:
fdescriptor = os.open(filename,
os.O_EXCL | os.O_CREAT | os.O_WRONLY)
file_ = os.fdopen(fdescriptor, "w")
break
except OSError as e:
if e.errno != errno.EEXIST:
raise
ix += 1
filename = "%s-%d.%s" % (basename, ix, suffix)
return filename, file_
[docs]def make_scratch_dir(tmpdir, basename):
"""
Make a separate directory for the job to enable easy cleanup.
:type basename: str
:param basename:
A name that will be used to generate a scratch dir name. The name used
will either be basename or basename.1, basename.2, etc.
:type tmpdir: str
:param tmpdir:
A directory that will hold the scratch directory.
:returns:
The name of the directory that was created.
"""
scr_dir_base = os.path.join(tmpdir, basename)
ix = 0
scr_dir = scr_dir_base
while True:
if not os.path.exists(scr_dir):
try:
os.mkdir(scr_dir)
break
except OSError as e:
if e.errno != errno.EEXIST:
raise
ix += 1
scr_dir = "%s.%d" % (scr_dir_base, ix)
return os.path.abspath(scr_dir)
[docs]def run_cleanup(results, start_dir, scr_dir, jobname, save_output_file,
scratch_cleanup):
"""
Cleanup from a subdirectory run.
This will completely nuke the scratch dir and cd to the start_dir.
:type results: MopacResults
:param results:
A MopacResults object, in which output filename and zipped_output
filenames are stored if available.
:type start_dir: str
:param start_dir:
The launch directory for the job.
:type scr_dir: str
:param scr_dir:
The scratch directory for the job as an absolute path.
:type jobname: str
:param jobname:
The base job name.
:type save_output_file: bool
:param save_output_file:
If True, copy the output file from the scratch dir back to the
starting directory.
:type scratch_cleanup: enum
:param scratch_cleanup:
If REMOVE, simply remove the scratch dir at the end of the job; if
ZIP, create a zip file of the scratch directory contents; if SAVE,
do no cleanup.
"""
os.chdir(scr_dir)
if save_output_file and os.path.isfile(jobname + ".out"):
shutil.copy(jobname + ".out", start_dir)
if results:
results.set_output_file(jobname + ".out")
# Always copy .vis files up to the start_dir. They are registered
# as outputfiles by the semi_emp.py driver
L = glob.glob('*.vis')
for visfile in L:
shutil.copy(visfile, start_dir)
if scratch_cleanup == ZIP:
filename, file_ = open_file(os.path.join(start_dir, jobname), "zip")
print("Creating zip file '%s'." % filename)
try:
zfile = zipfile.ZipFile(file_, "w", zipfile.ZIP_DEFLATED)
except RuntimeError:
# The zipfile docs say that ZIP_DEFLATED isn't always available.
# If it isn't, just bundle up the files without compression.
zfile = zipfile.ZipFile(file_, "w")
for f in os.listdir(os.path.curdir):
zfile.write(f, os.path.join(jobname, f))
zfile.close()
os.chdir(start_dir)
if scratch_cleanup != SAVE:
shutil.rmtree(scr_dir)
[docs]def is_mopac_file(filename):
"""
Determine if the file provided is a MOPAC input file or not. Returns True
or False.
"""
if filename.endswith(".mop") or filename.endswith(".dat"):
return True
else:
return False
[docs]def cleanup_external(inputfile, start_dir):
"""
If the input file has a relative path specification for an EXTERNAL
file, rewrite it to the local dir, then copy the original file to the
current directory.
"""
filename = None
newfd, newname = tempfile.mkstemp(dir=os.path.curdir)
newfile = os.fdopen(newfd, 'w')
# Pattern to search for EXTERNAL keyword specification.
_external_re = re.compile(r"EXTERNAL *= *(\S+)", re.I)
for ix, line in enumerate(open(inputfile)):
match = _external_re.search(line)
if match:
external_file = match.group(1)
if not os.path.isabs(external_file):
filename = os.path.basename(external_file)
line = _external_re.sub("EXTERNAL=%s" % (filename,), line)
shutil.copy(os.path.join(start_dir, external_file),
os.path.curdir)
newfile.write(line)
newfile.close()
if filename:
os.remove(inputfile)
os.rename(newname, inputfile)
[docs]def convert_sparse_dict_to_list(sdict):
"""
:type sdict: dict
:param sdict: dictionary keys must be integers.
:return list with non-key elements as None.
e.g. { 2:'a', 4:'b', 5:'c'} returns
[None, 'a', None, 'b', 'c']
"""
# unittested
if not sdict:
return []
lst = [None for i in range(max(list(sdict)))]
for idx, val in sdict.items():
try:
lst[idx - 1] = val
except TypeError:
raise TypeError('sdict expects integer keys!')
except:
raise RuntimeError('problem in convert_sparse_dict_to_list')
return lst