"""
Copyright Schrodinger, LLC. All rights reserved.
"""
import contextlib
import json
import os
import os.path
import pathlib
import random
import re
import shutil
import sys
import tarfile
import tempfile
import zipfile
from collections import OrderedDict
from collections import defaultdict
import psutil
import schrodinger
from schrodinger import gpgpu
from schrodinger import structure
from schrodinger.application.desmond import cms
from schrodinger.application.desmond import license as dlicense
from schrodinger.application.matsci import msutils
from schrodinger.application.matsci import parserutils
from schrodinger.application.matsci import msprops
from schrodinger.application.matsci import textlogger
from schrodinger.infra import jobhub
from schrodinger.infra import mmjob
from schrodinger.job import jobcontrol
from schrodinger.job import launchapi
from schrodinger.job import launcher
from schrodinger.job import queue
from schrodinger.Qt import QtCore
from schrodinger.structure import workflow_action_menu as wam
from schrodinger.test.stu.common import zip_directory
from schrodinger.utils import cmdline
from schrodinger.utils import fileutils
from schrodinger.utils import license
from schrodinger.utils import subprocess
AMCELL_NO_SYSTEM_OUT = '-amcell.maegz'
DRIVER = 'driver'
ARGS = 'args'
RESTART_PARAMETERS_FILENAME = 'parameters.cmd'
RESTART_PROGRAM_NAME = 'RestartWorkflow'
RESTART_DEFAULT_JOBNAME = 'restart_workflow'
CLEAN_AND_UNIQUIFY_MAX_LEN = 100
DOLLARS = re.compile(r'([^\\])\$')
SOURCE_PATH_PROP = msprops.SOURCE_PATH_PROP
TGZ_FORMAT = tarfile.PAX_FORMAT
FROM_SUBJOB = 'from_subjob'
LOG_TAG = 'log_tag'
AUTOHOST = 'autohost'
CHECKED_OUT_MATSCI_MAIN = None
WAM_TYPES = wam.WorkflowType
def get_logging_tag(job, tag=LOG_TAG):
"""
Get the logging tag of a job.
:param job: a job object
:type job: `schrodinger.job.queue.JobControlJob`
:param tag: the job attribute containing the info.
:type tag: str
:return: the logging tag
:rtype: str
"""
return getattr(job, tag, "")
def add_outfile_to_backend(file_fn,
backend=None,
set_structure_output=False,
stream=False,
wait=True):
"""
Add output file or directory to the backend.
:type file_fn: str
:param file_fn: File or directory name
:type backend: `schrodinger.job.jobcontrol._Backend` or None
:param backend: Backend handle. If None, a backend will be checked for. If
no backend is found, nothing will be done.
:type set_structure_output: bool
:param set_structure_output: If True, set this structure as output
:type stream: bool
:param stream: If True, stream the file to the submission host
:type wait: bool
:param wait: If True, wait until the job has finished before the output
file or directory is copied back to the launch host; otherwise, copy the
file or directory back as soon as this function is called. Only relevant
if stream is False.
"""
if not backend:
backend = jobcontrol.get_backend()
if not backend:
return
if stream:
backend.addLogFile(file_fn)
elif not wait:
backend.copyOutputFile(file_fn)
else:
backend.addOutputFile(file_fn)
if set_structure_output:
backend.setStructureOutputFile(file_fn)
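# A minimal usage sketch (hypothetical file names): register a log file for
# streaming and a structure file for incorporation. Outside job control
# add_outfile_to_backend is a no-op, so this is safe to call anywhere.
def _example_register_outputs(jobname):
    add_outfile_to_backend(jobname + '.log', stream=True)
    add_outfile_to_backend(jobname + '-out.maegz', set_structure_output=True)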
def log_structures_found(qjob,
structure_files,
log,
jobstates_for_logging=None):
"""
Log structures info for a job.
:type qjob: `schrodinger.job.queue.JobControlJob`
:param qjob: The subjob to find structures
:type structure_files: dict
:param structure_files: Keys are subjobs, values are sets of structure
file names
:type log: callable
:param log: function(msg) writes msg to log file
:type jobstates_for_logging: None or container of job states
:param jobstates_for_logging: Log info for subjobs in these states
"""
if jobstates_for_logging is None:
jobstates_for_logging = {queue.JobState.DONE, queue.JobState.FAILED}
if qjob.state not in jobstates_for_logging:
return
sfiles = structure_files[qjob]
# When logging information about a job object, if the object has an attribute
# named the module constant LOG_TAG, the value of that attribute will precede
# the rest of the log message. This enables log file lines to be attributed
# to specific jobs.
msg = get_logging_tag(qjob)
if sfiles:
msg += f"Found completed structures: {', '.join(sfiles)}"
else:
jobname = qjob.name if qjob.name else ' '.join(qjob._command)
msg += f'No output structure found for {jobname}'
log(msg)
def run_jobdj_and_add_files(jobdj,
log,
expect_exts=None,
exclude_exts=None,
jobdj_dir=os.curdir):
"""
Run the subjobs currently queued up in the jobdj, adding their files to the
current backend for copying back to the job directory and locating the
expected structure output file for each job.
:type jobdj: `schrodinger.job.queue.JobDJ`
:param jobdj: The JobDJ with queued up jobs to run
:type log: callable
:param log: function(msg) writes msg to log file
:type expect_exts: None or list of str
:param expect_exts: The expected extensions of the output files
:type exclude_exts: None or list of str
:param exclude_exts: The output files found with the excluded extensions are
not copied back to the original job directory or documented into logger
:type jobdj_dir: str
:param jobdj_dir: jobdj_dir is the relative path from where the backend is
created to where the jobdj is created. Using /scr/user/jobname/subdir1/subdir2
as an example, normally backend and jobdj are created in /scr/user/jobname/,
and thus jobdj_dir by default is os.curdir. If the jobdj is created inside
/scr/user/jobname/subdir1/, the jobdj_dir is subdir1 as backend is created
in /scr/user/jobname/. In this example, the backend is in /scr/user/jobname/,
jobdj is in /scr/user/jobname/subdir1, and the subjob can run in
/scr/user/jobname/subdir1/subdir2. (subjob.getCommandDir() gives subdir2)
:rtype: list
:return: A list of structure output file names, sorted alphabetically
"""
# Run all jobs and make sure we grab their output
backend = jobcontrol.get_backend()
structure_files = defaultdict(set)
completed_jobs = set()
if expect_exts is None:
expect_exts = ['.cms', '.mae', '.zip']
if exclude_exts is None:
exclude_exts = []
def _finalize(qjob, force=False):
"""
Process status changes for a JobDJ job. Log relevant information for
completed jobs.
:param qjob: JobDJ's snapshot of job state at status change
:type qjob: schrodinger.job.queue.BaseJob
:param bool force: Whether to consider jobs complete even if
job.isComplete() returns False
"""
if not isinstance(qjob, queue.JobControlJob):
return
# The status change could be to RUNNING
if not qjob.hasExited():
return
# We attempt to finalize each job as it is retryable failed, failed
# or done so that its files are copied back even if the subjob fails
# or the parent job gets killed before all subjobs finish
if finalize_subjob(qjob,
backend,
structure_files,
expect_exts,
log,
exclude_exts=exclude_exts,
jobdj_dir=jobdj_dir) or force:
completed_jobs.add(qjob)
# Log information about results found and not found for completed
# Done and Failed subjobs
log_structures_found(qjob, structure_files, log)
# status_change_callback is called every time a job status changes. For
# example, RUNNING->DONE.
jobdj.run(status_change_callback=_finalize)
for qjob in jobdj.all_jobs:
if qjob in completed_jobs:
continue
# If multiple jobs complete simultaneously, some of them may be missed by
# the status_change_callback. Jobs may also be returned before isComplete
# returns True. This pass catches all of these cases.
_finalize(qjob, force=True)
# Sorting ensures that structures for similar systems appear near each
# other in the PT after incorporation
output_st_files = []
for subjob_outfiles in structure_files.values():
output_st_files += [
os.path.basename(cmd_dir_filename)
for cmd_dir_filename in subjob_outfiles
]
output_st_files.sort()
return output_st_files
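# A hedged sketch of the typical driver pattern built on this function:
# queue up one RobustSubmissionJob per input (create_queue and
# RobustSubmissionJob are defined later in this module) and collect the
# structure outputs. The subjob script name here is hypothetical.
def _example_driver_loop(input_files, log):
    jobdj = create_queue()
    for input_file in input_files:
        cmd = ['run', 'example_subjob_driver.py', input_file]
        jobdj.addJob(RobustSubmissionJob(cmd))
    # Blocks until all subjobs finish, then returns sorted structure files
    return run_jobdj_and_add_files(jobdj, log, expect_exts=['.maegz'])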
def finalize_subjob(subjob,
backend,
structure_files,
structure_extensions,
log,
exclude_exts=None,
jobdj_dir=os.curdir):
"""
Mark subjob output and log files for copying back to the job directory,
and find the structure output file if there is one
:type subjob: `schrodinger.job.queue.JobControlJob` or None
:param subjob: The subjob to mark files from
:type backend: `schrodinger.job.jobcontrol._Backend` or None
:param backend: The current backend or None if there isn't one
:type structure_files: dict
:param structure_files: If an output structure file is found, it will
be added to this dict. Keys are subjobs, values are sets of structure
file names
:type structure_extensions: list of str
:param structure_extensions: The expected extension of the structure files
:type log: function
:param log: function(msg) writes msg to log file
:type exclude_exts: None or list of str
:param exclude_exts: The output files found with the excluded extensions are
not copied back to the original job directory or documented into logger
:type jobdj_dir: str
:param jobdj_dir: jobdj_dir is the relative path from where the backend is
created to where the jobdj is created. Using /scr/user/jobname/subdir1/subdir2
as an example, normally backend and jobdj are created in /scr/user/jobname/,
and thus jobdj_dir by default is os.curdir. If the jobdj is created inside
/scr/user/jobname/subdir1/, the jobdj_dir is subdir1 as backend is created
in /scr/user/jobname/. In this example, the backend is in /scr/user/jobname/,
jobdj is in /scr/user/jobname/subdir1, and the subjob can run in
/scr/user/jobname/subdir1/subdir2. (subjob.getCommandDir() gives subdir2)
:rtype: bool
:return: True if the job has completed, False if not
"""
ajob = subjob.getJob()
if not ajob:
# Job has not been submitted yet, or is just in that process
return
if exclude_exts is None:
exclude_exts = []
outfiles = ajob.OutputFiles
if hasattr(subjob, 'outfiles'):
# Permittivity workflow may set outfiles outside the subjob and this
# combines the standard ajob.OutputFiles with the customized job.outfiles
outfiles += subjob.outfiles
sub_dir = subjob.getCommandDir()
path = sub_dir if jobdj_dir == os.curdir else os.path.join(
jobdj_dir, sub_dir)
add_subjob_files_to_backend(ajob,
path=path,
backend=backend,
exclude_exts=exclude_exts)
for filename in outfiles:
if sub_dir:
filename = os.path.join(sub_dir, filename)
if not os.path.exists(filename):
continue
if any([filename.endswith(x) for x in exclude_exts]):
continue
extension = fileutils.splitext(filename)[1]
for structure_extension in structure_extensions:
if extension.startswith(structure_extension):
structure_files[subjob].add(filename)
# Completed means finished running. It does not mean success or failure.
# When logging information about a job object, if the object has an attribute
# named the module constant LOG_TAG, the value of that attribute will precede
# the rest of the log message. This enables log file lines to be attributed
# to specific jobs.
if ajob.isComplete():
msg = get_logging_tag(subjob)
msg += f'Job {ajob.Name} completed'
log(msg)
return True
else:
return False
def add_subjob_files_to_backend(subjob,
path=None,
backend=None,
exclude_exts=None,
also_input=False):
"""
Add all the output and log files from a subjob to the backend of this job so
that they get copied back to the original job directory.
:note: subjob log files are added as output files instead of log files. They
will not be streamed back to the original job directory but instead
copied back at the end of this job like a normal output file.
:type subjob: `schrodinger.job.jobcontrol.Job` or
`schrodinger.job.queue.JobControlJob`
:param subjob: The subjob to add files from.
:type path: str
:param path: The path to the subjob directory from where the backend is created
if it was not run in the same directory as this job. Use `FROM_SUBJOB` to
get the subjob directory from a JobControlJob object - this will be ignored
if subjob is a Job object.
:type backend: `schrodinger.job.jobcontrol._Backend`
:param backend: The backend if one exists
:type exclude_exts: None or list of str
:param exclude_exts: The output files found with the excluded extensions are
not copied back to the original job directory or documented into logger
:param bool also_input: Also add the job input files to the backend
"""
if not backend:
backend = jobcontrol.get_backend()
if not backend:
return
if isinstance(subjob, queue.JobControlJob):
if path == FROM_SUBJOB:
path = subjob.getCommandDir()
subjob = subjob.getJob()
if not subjob:
return
if exclude_exts is None:
exclude_exts = []
# Subjob log files may have already been registered with this backend as
# a log (streaming) file so that the subjob log gets streamed back
# into the original job directory. It is an error to register a file
# with the same backend as both a streaming and output file. We'll check
# against the list of streaming log files to avoid this error.
this_job_logfiles = set(backend.getJob().LogFiles)
subjob_files = subjob.OutputFiles + subjob.LogFiles
if also_input:
# Note that InputFiles are stored with absolute paths, which breaks
# everything. Just use the base file name. Also, ignore job control
# inputs that start with '.'
infiles = [os.path.basename(x) for x in subjob.InputFiles]
subjob_files += [x for x in infiles if not x.startswith('.')]
for filename in subjob_files:
if os.path.isabs(filename):
# Jobcontrol adds files with absolute paths to the OutputFiles list.
# We don't want those files and they cause a traceback with
# addOutputFile and old job control (JOBCON-7659)
continue
if any([filename.endswith(x) for x in exclude_exts]):
continue
if path:
filename = os.path.join(path, filename)
# Trying to register a log file as an out file both raises a traceback
# and prints messages to the log file, so we do a pre-check rather than
# a try/except to avoid the log file messages
if filename not in this_job_logfiles:
backend.addOutputFile(filename)
def determine_source_path(backend=None, job=None):
"""
Determine the original job directory. This is obtained from the job control
Job object for this process. If no Job object is found, the current
directory is used.
:type backend: `schrodinger.job.jobcontrol._Backend`
:param backend: The job control backend. Will be used to obtain the job
object if no job is supplied. If neither backend nor job is supplied,
the backend will be obtained from job control (if one exists).
:type job: `schrodinger.job.jobcontrol.Job`
:param job: The job control job for this process. If not supplied, will be
obtained from the backend (if one exists).
:rtype: str
:return: The directory that is the source path. Will be either the
OrigLaunchDir property of the job or the current directory if not
running under job control.
"""
if not job:
if not backend:
backend = jobcontrol.get_backend()
if backend:
job = backend.getJob()
else:
# Not running under job control, so running in the local directory
return os.getcwd()
try:
sourcedir = job.OrigLaunchDir
except AttributeError:
# We don't know that this could ever happen, but just being safe
sourcedir = ""
return sourcedir
def set_source_path(struct, backend=None, job=None, path=None):
"""
Set the source path property to the original job directory. This is obtained
from the job control Job object for this process. If no Job object is found,
the current directory is used.
:type struct: `schrodinger.structure.Structure`
:param struct: The structure to set the property on. Note that property
setting works properly on both Structure and Cms objects.
:type backend: `schrodinger.job.jobcontrol._Backend`
:param backend: The job control backend. Will be used to obtain the job
object if no job is supplied. If neither backend nor job is supplied,
the backend will be obtained from job control (if one exists).
:type job: `schrodinger.job.jobcontrol.Job`
:param job: The job control job for this process. If not supplied, will be
obtained from the backend (if one exists).
:param str path: Manually set this path to the source directory, overriding
all other options
:rtype: str
:return: The directory set as the source path. Will be either the
OrigLaunchDir property of the job or the current directory if not
running under job control.
"""
if path:
sourcedir = path
else:
sourcedir = determine_source_path(backend=backend, job=job)
# Note - deleting the property first is a workaround for SHARED-6890 just in
# case we are working in Maestro
if isinstance(struct, cms.Cms):
struct.remove_cts_property(SOURCE_PATH_PROP)
struct.set_cts_property(SOURCE_PATH_PROP, sourcedir)
else:
struct.property.pop(SOURCE_PATH_PROP, None)
struct.property[SOURCE_PATH_PROP] = sourcedir
return sourcedir
def get_source_path(source, existence_check=True):
"""
Get the source path to the original job directory
:type source: `schrodinger.structure.Structure` or
`schrodinger.project.ProjectRow`
:param source: Either the ProjectRow or the structure to obtain the
source information from. If a structure, can be either a Structure or a Cms
object.
:type existence_check: bool
:param existence_check: If True (default), a blank string will be returned
if the source path does not exist. If False, the path is returned regardless
of whether it exists or not.
:rtype: str
:return: The original job directory or a blank string if none is found
"""
try:
path = source.property.get(SOURCE_PATH_PROP, "")
except (AttributeError, TypeError):
# This is neither a Structure nor a ProjectRow
raise TypeError('source must be a Structure or ProjectRow')
if path and (not existence_check or os.path.exists(path)):
# Add on any job subdirectory for this structure
subdir = source.property.get(msprops.SUBDIRECTORY_PROP)
if subdir:
subpath = os.path.join(path, subdir)
if (not existence_check or os.path.exists(subpath)):
path = subpath
return path
return ""
def get_file_path(struct, prop):
"""
Get the path of the file defined by source path and property name
:param struct: The structure whose property is checked
:type struct: `schrodinger.structure.Structure`
:param prop: property pointing to a file
:type prop: str
:return: path of the file if found
:rtype: str or None
"""
filename = struct.property.get(prop)
if not filename:
return
source_path = get_source_path(struct)
filepath = os.path.join(source_path, filename)
if not os.path.isfile(filepath):
return
return filepath
def prepare_job_spec_builder(argv,
program_name,
default_jobname,
input_fn=None,
set_driver_reserves_cores=False,
schrodinger_product=None):
"""
Prepare generic job specification builder.
If set_driver_reserves_cores is set to True, the script (driver) is
expected to use all the cores (cpus), similar to umbrella mode in
multisim. See the stress-strain driver for an example. For all other
cases (such as opto/hopping/amorphous) keep set_driver_reserves_cores
False.
:type argv: list
:param argv: The list of command line arguments, including the script name
at [0], similar to that returned by sys.argv
:type program_name: str
:param program_name: Program name
:type default_jobname: str
:param default_jobname: Default job name
:type input_fn: str
:param input_fn: Input filename
:type set_driver_reserves_cores: bool
:param set_driver_reserves_cores: If True, enable
launchapi.setDriverReservesCores
:type schrodinger_product: str
:param schrodinger_product: A product directory to search for the
script/executable. This should be the name of a directory under
SCHRODINGER without the trailing version (i.e. the "-v*" part).
:rtype: `launchapi.JobSpecificationArgsBuilder`
:return: Job specification builder object
"""
job_builder = launchapi.JobSpecificationArgsBuilder(
argv,
use_jobname_log=True,
schrodinger_product=schrodinger_product,
program_name=program_name,
default_jobname=default_jobname)
if input_fn and os.path.isfile(input_fn):
job_builder.setInputFile(input_fn)
if set_driver_reserves_cores:
job_builder.setDriverReservesCores(True)
return job_builder
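# A hedged sketch of the conventional driver hook that wraps this helper;
# the program and job names below are hypothetical.
def _example_get_job_spec_from_args(argv):
    job_builder = prepare_job_spec_builder(argv, 'Example Workflow',
                                           'example_workflow')
    return job_builder.getJobSpec()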
def add_folder_to_job_builder(job_builder, folder_path):
"""
Add folder (trajectory folder) to job directory.
:type job_builder: `launchapi.JobSpecificationArgsBuilder`
:param job_builder: Job specification builder object.
:type folder_path: str
:param folder_path: Full path to the folder that needs to be copied.
"""
file_path = fileutils.get_files_from_folder(folder_path)
for (abs_pathname, runtime_path) in file_path:
job_builder.setInputFile(abs_pathname, runtime_path=runtime_path)
def add_desmond_license_to_job_builder(job_builder,
license_host=None,
toplevel_required=True):
"""
Add desmond GPU license to job builder.
:param launchapi.JobSpecificationArgsBuilder job_builder: Job specification
builder object
:type license_host: str or None
:param license_host: Host for which license should be generated. If None,
use host from the top level args
:param bool toplevel_required: If true, it's required that the current
process was executed through toplevel.py, which causes
TOPLEVEL_HOST_ARGS to be set
:rtype: str or None
:return: None on success, error message on error
"""
if license_host:
args = [cmdline.FLAG_HOST, license_host]
else:
toplevel_args = os.getenv(jobcontrol.TOPLEVEL_HOST_ARGS_ENV)
if not toplevel_args and not toplevel_required:
return
if not toplevel_args:
return ('ERROR: Must specify %s when using umbrella mode.' %
cmdline.FLAG_HOST)
if (toplevel_args.find(cmdline.FLAG_SUBHOST) >= 0):
return ('ERROR: Umbrella mode does not support the %s option.' %
cmdline.FLAG_SUBHOST)
host = msutils.get_val_from_cmdline(toplevel_args, cmdline.FLAG_HOST)
if not host:
return 'ERROR: Could not find %s argument.' % cmdline.FLAG_HOST
args = toplevel_args.split()
cmd = dlicense.add_md_lic(args)
lic = msutils.get_val_from_cmdline(cmd, cmdline.FLAG_LIC)
if not lic:
return 'ERROR: Could not assign a license.'
token, count = lic.split(':')
job_builder.addLicense(license.LICENSE_BY_NAME[token], count)
def add_desmond_license_to_job_spec(job_spec, license_host):
"""
Add desmond GPU license based on the license host to the job spec.
:param launchapi.JobSpecification job_spec: Job specification
:param str license_host: Host for which license should be generated
:rtype: str or None
:return: None on success, error message on error
"""
args = [cmdline.FLAG_HOST, license_host]
cmd = dlicense.add_md_lic(args)
lic = msutils.get_val_from_cmdline(cmd, cmdline.FLAG_LIC)
if not lic:
return 'ERROR: Could not assign a license.'
token, count = lic.split(':')
job_spec.addLicense(license.LICENSE_BY_NAME[token], count)
def parse_restart_parameters_file(path):
"""
Parse parameters file.
Format of the file:
1st line is driver's filename
2nd line is original arguments passed to the driver
:type path: str
:param path: Path to the file with original arguments
:rtype: dict
:return: Dictionary with parsed values
"""
params = {}
with open(path, 'r') as pfile:
params[DRIVER] = pfile.readline().strip()
params[ARGS] = pfile.readline().strip().split()
return params
def write_restart_parameters_file(driver, args, outpath):
"""
Write out original arguments to parameters file and add the file as an
output file to any existing jobcontrol backend.
:type driver: str
:param driver: Driver's filename
:type args: list
:param args: Original arguments passed to driver
:type outpath: str
:param outpath: Path to the parameters file to write original arguments
"""
args_str = ' '.join(args)
# Remove all occurrences of -HOST/-host in args
args_str = re.sub(r'(?i)-HOST [^\s]+', '', args_str).strip()
# Remove all occurrences of -TPP in args
args_str = re.sub(r'(?i)%s [^\s]+' % '-TPP', '', args_str).strip()
# For now, remove all occurrences of FLAG_USEZIPDATA in args
args_str = re.sub(r'(?i)%s [^\s]+' % parserutils.FLAG_USEZIPDATA, '',
args_str).strip()
with open(outpath, 'w') as outfile:
outfile.write('%s\n' % driver)
outfile.write(args_str + '\n')
backend = jobcontrol.get_backend()
if backend:
backend.addOutputFile(outpath)
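# A minimal round-trip sketch for the restart machinery (the driver name is
# hypothetical): write the parameters file at launch, parse it on restart.
def _example_restart_round_trip(args):
    write_restart_parameters_file('example_driver.py', args,
                                  RESTART_PARAMETERS_FILENAME)
    params = parse_restart_parameters_file(RESTART_PARAMETERS_FILENAME)
    return params[DRIVER], params[ARGS]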
def get_restart_id_filename(jobname):
"""
For the given restart jobname, return the name of the file containing the
job id.
:rtype: str
:return: The name of the restart jobid file
"""
return jobname + '.jobid'
def archive_job_data(path, files_path):
"""
Create a gzipped tar archive in the current directory from the provided list
of file paths. All the error handling is on the caller function.
:type path: str
:param path: Path to the new archive to be created
:type files_path: list
:param files_path: List of paths of the files to be archived
"""
with tarfile.open(name=path, mode='w:gz', format=TGZ_FORMAT) as tar:
for file_path in set(files_path):
tar.add(file_path)
def create_restart_launcher(script, prog, input_name, output_name, zip_name,
options, args):
"""
Create a launcher.Launcher that relaunches the given script with the
restart options and the zipped restart data as input.
"""
(viewname, disp, proj, host, jobname) = get_restart_options(options)
scriptlauncher = launcher.Launcher(script=script,
jobname=jobname,
viewname=viewname,
disp=disp,
proj=proj,
prog=prog,
copyscript=False,
runtoplevel=True)
args = ['-use_zip_data', zip_name] + args
args = ['-HOST', host] + args
scriptlauncher.addScriptArgs(args)
scriptlauncher.addInputFile(input_name)
scriptlauncher.addInputFile(zip_name)
scriptlauncher.addOutputFile(output_name)
scriptlauncher.setStructureOutputFile(output_name)
# No need to update the parameters file from the restart
return scriptlauncher
def create_restart_jobcmd(driver_path, zip_fn, restart_options, args,
default_jobname):
"""
Generate command-line list for the job restart.
:type driver_path: str
:param driver_path: Path to the driver
:type zip_fn: str
:param zip_fn: Filename of the archive with all restart files
:type restart_options: `argparse.Namespace`
:param restart_options: The object holding all the option values
:type args: list
:param args: List of the arguments passed to the original run
:type default_jobname: str
:param default_jobname: Default job name
:rtype: list
:return: List of parameters ready for job submission
"""
(viewname, disp, proj, host, jobname) = get_restart_options(restart_options)
# Prepending
args[:0] = [parserutils.FLAG_USEZIPDATA, zip_fn]
if host:
args[:0] = ['-HOST', host]
if proj:
args[:0] = ['-PROJ', proj]
if disp:
args[:0] = ['-DISP', disp]
if viewname:
args[:0] = ['-VIEWNAME', viewname]
if jobname:
args[:0] = ['-JOBNAME', jobname]
else:
args[:0] = ['-JOBNAME', default_jobname]
args = ['$SCHRODINGER/run', driver_path] + args
return args
def write_idfile(jobobj):
"""
Store the job id in a file as a signal to the GUI that a new job has been
launched.
:type jobobj: `schrodinger.job.jobcontrol.Job`
:param jobobj: The launched job whose jobid will be written to the file
"""
filename = get_restart_id_filename(jobobj.name)
with open(filename, 'w') as idfile:
    idfile.write(jobobj.jobid)
def get_string_from_flag(flag):
"""
Return the string from the flag
:type flag: str
:param flag: The flag for the desired option
:rtype: str
:return: The string from the flag (the flag minus the leading dash)
"""
if flag.startswith('-'):
flag = flag[1:]
return flag
def get_option(options, flag):
"""
Return the option value associated with flag
:type options: `argparse.Namespace`
:param options: The object holding all the option values
:type flag: str
:param flag: The flag for the desired option.
:rtype: any
:return: The value associated with flag, or None if flag (minus any leading
dashes) is not found as a property on options
"""
flag = get_string_from_flag(flag)
try:
return getattr(options, flag)
except AttributeError:
return None
def set_option(options, flag, value):
"""
Set the option value associated with flag
:type options: `argparse.Namespace`
:param options: The object holding all the option values
:type flag: str
:param flag: The flag for the desired option. If the string starts with a
'-', the '-' is removed.
:type value: any
:param value: The value to set the option.flag value to
"""
flag = get_string_from_flag(flag)
setattr(options, flag, value)
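# A small sketch showing that get_option/set_option accept flags with or
# without the leading dash; the 'seed' attribute here is hypothetical.
def _example_option_access(options):
    if get_option(options, '-seed') is None:
        set_option(options, '-seed', 1234)
    return get_option(options, 'seed')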
def get_restart_options(options):
"""
Get the command line options from the -restart_x flags
:rtype: tuple
:return: tuple of strings (viewname, incorporation, project, host:cpu,
jobname)
"""
restart_viewname = get_option(options, parserutils.FLAG_RESTART_VIEWNAME)
restart_disp = get_option(options, parserutils.FLAG_RESTART_DISP)
restart_proj = get_option(options, parserutils.FLAG_RESTART_PROJ)
restart_host = get_option(options, parserutils.FLAG_RESTART_HOST)
restart_jobname = get_option(options, parserutils.FLAG_RESTART_JOBNAME)
return (restart_viewname, restart_disp, restart_proj, restart_host,
restart_jobname)
def seed_random_number_generator(options, log=None):
"""
Seed the random number generator based on the command line options. If there
is no seed in the command line options, a random value is used.
:type options: `argparse.Namespace`
:param options: The command line options from argparse. Note that passing in
None for options will have the same effect as if the seed flag does not
exist on options (i.e. a random value will be generated).
:type log: function
:param log: A function to log the seed value. Should take a single str
argument.
:rtype: int
:return: The seed used for the generator
"""
seed = get_option(options, parserutils.FLAG_RANDOM_SEED)
if seed is None:
seed = random.randint(parserutils.RANDOM_SEED_MIN,
parserutils.RANDOM_SEED_MAX)
random.seed(seed)
if log:
log(f'Random number generator seeded with {seed}')
return seed
def check_license(panel=None,
token=license.MATERIALSCIENCE_MAIN,
name="",
as_validator=False,
fall_back_tokens=None):
"""
Check if a valid token exists. If called from Maestro, also check out and
hold a MATERIALSCIENCE_MAIN token.
:type panel: schrodinger.ui.qt.appframework.AppFramework
:param panel: panel to use to put up an error dialog if no license
:type token: `schrodinger.utils.license` constant
:param token: A token type from the schrodinger.utils.license module, such
as MATERIALSCIENCE_MAIN or MATERIALSCIENCE_GA
:type name: str
:param name: The user-identifiable name for the token - used for error
messages. If not provided, the string used in the license module for this
token (if one exists) will be used.
:type as_validator: bool
:param as_validator: If True, this function will work as an AF2 validation
method. Instead of posting a dialog or printing a message for a failed
license check, it will return (False, error_message).
:type fall_back_tokens: list or None
:param fall_back_tokens: if present specifies that if the intended license
check from the given token or name fails then attempt to get a valid
license by running through this ordered list of fall back tokens, each
of which is a `schrodinger.utils.license` constant for a non-MATSCI
product
:rtype: bool or (bool, str)
:return: True if valid license exists. If no valid license exists, False
will be returned by default, but (False, msg) will be returned if
as_validator=True. Note that (False, msg) evaluates to True, so the
calling routine must not treat the return value as a boolean when
as_validator=True.
"""
global CHECKED_OUT_MATSCI_MAIN
fall_back_tokens = fall_back_tokens or []
# below is code that assumes that all calls of this function made from
# GUIs are to check out and hold on to a MATSCI main license, this is to
# prevent more than the allotted number of users from simultaneously
# running Maestro sessions using MATSCI products, in the case of using
# valid fall back tokens we do not want to block users due to exceeding
# the allotted number of users and so we use that code below, however
# we need to then ensure that the fall back tokens do NOT include those
# for any MATSCI products because otherwise that would prevent holding
# on to the license and thus allow unlimited users from GUIs
assert not any([license.is_matsci(token=x) for x in fall_back_tokens])
if not name:
try:
name = license.LICENSE_NAMES[token]
except KeyError:
pass
msg = ''
# Check out and hold a MATERIALSCIENCE_MAIN license if calling from Maestro
if schrodinger.get_maestro():
if not CHECKED_OUT_MATSCI_MAIN or not CHECKED_OUT_MATSCI_MAIN.isValid():
# Tampering with licensing is a violation of the license agreement
CHECKED_OUT_MATSCI_MAIN = license.License(
license.MATERIALSCIENCE_MAIN)
if not CHECKED_OUT_MATSCI_MAIN.isValid() and not fall_back_tokens:
msg = ('There are no remaining MATERIALSCIENCE_MAIN license '
'tokens. Materials Science features cannot be used in '
'Maestro.')
# Tampering with licensing is a violation of the license agreement
if not msg:
for token in [token] + fall_back_tokens:
if license.is_license_available(token):
break
else:
msg = (f'No {name} license token is available. '
'No calculation can be run.')
if msg:
if as_validator:
return (False, msg)
elif panel:
panel.error(msg)
else:
print(msg)
return False
return True
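# A hedged sketch of using check_license as an AF2-style validator: with
# as_validator=True a failure comes back as a truthy (False, msg) tuple, so
# the result must be compared against True rather than used as a boolean.
def _example_validate_license():
    result = check_license(as_validator=True)
    if result is not True:
        _valid, msg = result
        return msg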
def check_licenses(*tokens, as_validator=False):
"""
Check if valid tokens exist. If called from Maestro, also check out and
hold a MATERIALSCIENCE_MAIN token.
:type tokens: list[schrodinger.utils.license]
:param tokens: List of license tokens
:type as_validator: bool
:param as_validator: If True, this function will work as an AF2 validation
method. Instead of posting a dialog or printing a message for a failed
license check, it will return (False, error_message).
:rtype: bool or (bool, str)
:return: True if valid license exists. If no valid license exists, False
will be returned by default, but (False, msg) will be returned if
as_validator=True. Note that (False, msg) evaluates to True, so the
calling routine must not treat the return value as a boolean when
as_validator=True.
"""
for token in tokens:
ret = check_license(token=token, as_validator=as_validator)
if ret is not True:
return ret
return True
def create_run_dir(panel, jobname):
"""
Create a subdirectory to run a job in, asking the user and removing existing
directories if needed.
:type panel: schrodinger.ui.qt.appframework.AppFramework
:param panel: panel to use to put up an error dialog if no license
:type jobname: str
:param jobname: The name of the job. The directory will be jobname + _dir
:rtype: str or None
:return: The path to the directory or None if an existing directory was
found and the user elected not to remove it
"""
outdir = os.path.join(os.getcwd(), jobname + '_dir')
if os.path.exists(outdir):
if not panel.question('The job directory, %s, already exists.\nWould'
' you like to delete its contents and '
'continue?' % os.path.basename(outdir)):
return None
def force_remove(func, path, excinfo):
# Try to remove any difficult to rm file
if func in (os.rmdir, os.remove):
fileutils.force_remove(path)
else:
raise
shutil.rmtree(outdir, onerror=force_remove)
os.mkdir(outdir)
return outdir
def string_to_value(string):
"""
Change a text string from a file to a value. Converts string values of
special Python tokens such as True, False or None to the Python tokens.
Converts numbers to int or float if possible.
:type string: str
:param string: The string to convert
:return: string converted to, in order of preference: [True|False|None],
int, float, or input type
"""
literals = {'True': True, 'False': False, 'None': None}
if string in literals:
# Special words
value = literals[string]
else:
# Try to convert to a number if possible
try:
value = int(string)
except ValueError:
try:
value = float(string)
except ValueError:
value = string
return value
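# A few concrete checks of the conversion order documented above: Python
# literal first, then int, then float, then the original string.
def _example_string_to_value():
    assert string_to_value('True') is True
    assert string_to_value('3') == 3
    assert string_to_value('3.5') == 3.5
    assert string_to_value('spam') == 'spam'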
@contextlib.contextmanager
def working_directory(path):
"""
A context manager which changes the working directory to the given
path, and then changes it back to its previous value on exit.
"""
prev_cwd = os.getcwd()
os.chdir(path)
try:
yield
finally:
os.chdir(prev_cwd)
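# A short sketch of the context manager above: run a setup step inside a
# scratch subdirectory (hypothetical name) and return to the original
# directory even if an exception is raised.
def _example_in_subdir():
    os.makedirs('scratch_subdir', exist_ok=True)
    with working_directory('scratch_subdir'):
        with open('input.txt', 'w') as example_file:
            example_file.write('example input\n')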
class StringCleaner(object):
"""
Manages the cleaning of strings.
"""
def __init__(self, extra_replacement_pairs=None, separator='-'):
"""
Populate an instance with some defaults. The replacement
dictionary needs to be set such that the most specific
replacements occur last. This is because the replacements
should be done in a certain order, for example ('C:\\', '')
should be done before (':', '') and ('\\', ''), and because
people tend to append to an iterable rather than prepend we
will traverse the iterable backwards.
:type extra_replacement_pairs: list of tuples
:param extra_replacement_pairs: each tuple in this list contains
a single replacement pair, i.e. a single substring to be replaced
and a single substring to replace it.
:type separator: str
:param separator: in the case of non-unique strings this is the
string that separates the non-unique part from the number of times
used which is the unique part.
"""
def pair(from_str, to_str=''):
return (from_str, to_str)
BASE_PAIRS = [
pair('\\'),
pair('/'),
pair(r'\\'),
pair('?'),
pair('%'),
pair('*'),
pair(':'),
pair('|'),
pair('"'),
pair('>'),
pair('<'),
pair('('),
pair(')'),
pair('+'),
pair(',')
]
COMBIGLD_PAIRS = [
pair(' ', '_'),
pair(';'),
pair(']'),
pair('['),
pair('][', '-'),
pair('[]')
]
ALL_PAIRS = BASE_PAIRS + COMBIGLD_PAIRS
if extra_replacement_pairs:
ALL_PAIRS += extra_replacement_pairs
self.replacement_dict = OrderedDict(ALL_PAIRS)
self.separator = separator
self.prev_names = {}
def cleanAndUniquify(self,
input_str,
clear_prev=False,
max_len=CLEAN_AND_UNIQUIFY_MAX_LEN):
"""
Shorten if necessary, replace certain characters in an input string
and then uniquify the string by comparing with a dictionary of
previous names and number of times used.
:type input_str: str
:param input_str: the input string we want cleaned and uniquified
:type clear_prev: bool
:param clear_prev: specify if the dictionary of previous names
should first be cleared
:type max_len: int
:param max_len: maximum length of the input_str allowed, otherwise
it will be shortened to the max_len value
:rtype: str
:return: the input string now cleaned and uniquified
"""
# Shorten string if necessary
output_str = input_str[:max_len]
for from_substr in list(self.replacement_dict)[::-1]:
to_substr = self.replacement_dict[from_substr]
output_str = output_str.replace(from_substr, to_substr)
if clear_prev:
self.prev_names.clear()
if not self.prev_names.get(output_str):
self.prev_names[output_str] = 1
else:
self.prev_names[output_str] += 1
output_str += self.separator + str(self.prev_names[output_str])
return output_str
def clean_string(string, default='title'):
"""
Cleans the given string by removing special characters to make it
acceptable for a file name. If the string is blank, it will be replaced by
the value of default.
:type string: str
:param string: The string to clean.
:type default: str
:param default: The name to use if string is blank
:rtype: str
:return: A string usable as a filename
"""
cleaner = StringCleaner()
unclean = string or default
return cleaner.cleanAndUniquify(unclean)
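# A small sketch of the uniquification behavior: a single StringCleaner
# appends a counter per repeated name, whereas clean_string above builds a
# fresh cleaner on each call and so always yields a "-1" suffix.
def _example_string_cleaner():
    cleaner = StringCleaner()
    first = cleaner.cleanAndUniquify('a:b*c')  # 'abc-1'
    second = cleaner.cleanAndUniquify('a:b*c')  # 'abc-2'
    return first, second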
def zip_and_set_incorporation(zipname, filelist):
"""
Zip up all the requested files and set the resulting archive as the job
control backend structure output file (if running under job control).
:type zipname: str
:param zipname: The name of the archive to create
:type filelist: list
:param filelist: Each item in filelist is the name of a file to add to file
zipname
"""
with zipfile.ZipFile(zipname, 'w') as zipper:
    for filename in filelist:
        zipper.write(filename)
backend = jobcontrol.get_backend()
if backend:
backend.addOutputFile(zipname)
backend.setStructureOutputFile(zipname)
class CellRunInfo(object):
"""
Holds the information for the run for a single cell
"""
def __init__(self,
structs,
basename,
replicate,
multiple_cells,
component=None,
repeat_unit_info=None):
"""
Create a CellRunInfo object
:type structs: list
:param structs: The list of structures to include in the cell
:type basename: str
:param basename: The generic basename for job files. This will be
modified based on the value of replicate, component and multiple_cells
to form the base name for files for this specific cell.
:type replicate: int
:param replicate: Which replicate this is for - 1-based
:type multiple_cells: bool
:param multiple_cells: Whether there will be multiple replicates of this
cell
:type component: int or None
:param component: The structure number this cell is for, or None if this
is a mixed structure cell
:type repeat_unit_info: tuple or None
:param repeat_unit_info: A (sequence, tag) pair. The first item is the
sequence of monomer one-letter codes that gives the repeat unit
sequence. The second item is a tag to be added to the polymer name for
that sequence (used for enumerated sequences).
"""
self.structs = structs
self.basename = basename
self.replicate = replicate
self.cru = None
if component:
self.basename = self.basename + '_component%d' % component
else:
self.basename = self.basename + '_all_components'
if repeat_unit_info and repeat_unit_info[1]:
self.cru = repeat_unit_info[0]
self.basename = self.basename + '_%s' % repeat_unit_info[1]
if multiple_cells:
self.basename = self.basename + '_%d' % replicate
self.is_cg = all(
msutils.is_coarse_grain(x, by_atom=True) for x in structs)
class MultijobDriver(object):
"""
Resubmit the driver as subjobs
"""
COMPOSITION_FLAG = '-composition'
COMPOSITION_FLAG_SHORT = '-c'
def __init__(self,
runlist,
options,
args,
log,
remote_script,
default_jobname,
basename=None):
"""
Create multiple cells in parallel running a subjob for each cell. Zip up the
resulting cms files into a jobname.zip file and set it as the structure
output file to be incorporated.
:type runlist: list of `CellRunInfo`
:param runlist: Each item of runlist will generate a subjob and a single
cell
:type options: `argparse.Namespace`
:param options: The command line options
:type args: iterable
:param args: The command line arguments as passed in by sys
:type log: function
:param log: function(msg) writes msg to log file
:type remote_script: string
:param remote_script: the dir and name for driver to resubmit
:type default_jobname: string
:param default_jobname: Default job name
:type basename: str or None
:param basename: The generic basename defined from inputfile.
"""
if hasattr(options, 'output_basename'):
basename = options.output_basename
elif not basename:
basename = default_jobname
self.multijob_driver(runlist, options, args, basename, log,
remote_script)
def remove_flag(self, args, flag, and_value=False):
"""
Remove a flag from the command line flags
:type args: list
:param args: The list of command line arguments
:type flag: str
:param flag: The flag to remove
:type and_value: bool
:param and_value: Also remove the value associated with the flag - it is
assumed that this is the following list item
"""
try:
index = args.index(flag)
except ValueError:
# Flag was not used, so we don't need to do anything
return
if and_value:
del args[index:index + 2]
else:
del args[index]
def replace_value(self, args, old_value, new_value):
"""
Replace the list item with value=old_value with the new value
:type args: list
:param args: The list of command line arguments
:type old_value: str
:param old_value: The value to replace
:type new_value: str
:param new_value: The value to replace old_value with
"""
try:
index = args.index(old_value)
except ValueError:
return
args[index] = new_value
def multijob_driver(self, runlist, options, args, basename, log,
remote_script):
"""
Create multiple cells in parallel running a subjob for each cell. Zip
up the resulting cms files into a jobname.zip file and set it as the
structure output file to be incorporated.
:type runlist: list of `CellRunInfo`
:param runlist: Each item of runlist will generate a subjob and a single
cell
:type options: `argparse.Namespace`
:param options: The command line options
:type args: iterable
:param args: The command line arguments as passed in by sys
:type basename: str
:param basename: The base name for the zipped .zip or .maegz output file.
:type log: function
:param log: function(msg) writes msg to log file
:type remote_script: string
:param remote_script: the dir and name for driver to resubmit
"""
log('Setting up job queue')
sub_args = list(args)
jobdj = queue.JobDJ(verbosity='normal',
max_failures=queue.NOLIMIT,
max_retries=3)
for host, procs in jobdj._hosts.items():
log('Host: %s, processors:%d' % (host, procs))
# Remove flags we don't want for subjobs
for flag in ['-homogeneous', '-no_disordered_cell']:
self.remove_flag(sub_args, flag)
self.remove_flag(sub_args, '-ncells', and_value=True)
# homogeneous cells should not have composition flags as they only have
# one component
homo_args = sub_args[:]
self.remove_flag(homo_args, self.COMPOSITION_FLAG, and_value=True)
# User might have used the short form (-c)
self.remove_flag(homo_args, self.COMPOSITION_FLAG_SHORT, and_value=True)
# Create a subjob for each cell to create
for runinfo in runlist:
if len(runinfo.structs) == 1:
cmdargs = homo_args[:]
else:
cmdargs = sub_args[:]
if options.seed:
seed = options.seed * runinfo.replicate
self.replace_value(cmdargs, str(options.seed), str(seed))
if runinfo.cru:
self.replace_value(cmdargs, options.repeat_unit, runinfo.cru)
# Set up the input and output file names
if hasattr(options, 'output_basename'):
self.replace_value(cmdargs, options.output_basename,
runinfo.basename)
if hasattr(options, 'title'):
self.replace_value(cmdargs, options.title, runinfo.basename)
subinput = runinfo.basename + '.maegz'
fileutils.force_remove(subinput)
for struct in runinfo.structs:
struct.append(subinput)
self.replace_value(cmdargs, options.input_file, subinput)
cmd = ['run', remote_script] + cmdargs
rjob = RobustSubmissionJob(cmd)
jobdj.addJob(rjob)
log('Number of jobs to run: %d' % jobdj.total_added)
# Run all jobs and make sure we grab their output
expect_exts = ['.maegz' if options.no_system else '.cms']
st_files = run_jobdj_and_add_files(jobdj, log, expect_exts=expect_exts)
backend = jobcontrol.get_backend()
if options.no_system:
# Read all the output structures and compile them into one file
outname = basename + AMCELL_NO_SYSTEM_OUT
writer = structure.StructureWriter(outname)
struct_titles = {}
for struct in structure.MultiFileStructureReader(st_files):
# same component of different structures needs different titles
# in PT
if struct.title not in struct_titles:
struct_titles[struct.title] = 0
else:
struct_titles[struct.title] += 1
struct.title = struct.title + '_' + str(
struct_titles[struct.title])
writer.append(struct)
writer.close()
if backend:
backend.addOutputFile(outname)
backend.setStructureOutputFile(outname)
else:
# Desmond CMS files need to be incorporated into a zip file.
# Zip up all the cms files and incorporate them all
if backend:
jobname = backend.getJob().Name
else:
jobname = basename
outname = jobname + '.zip'
zip_and_set_incorporation(outname, st_files)
log('Output compiled in %s' % outname)
def get_jobname(default_jobname):
"""
Return the jobname from the backend, the command line (-JOBNAME flag or environment variable), or default_jobname
:type default_jobname: str
:param default_jobname: default_jobname of the current module
:rtype: string
:return: Jobname
"""
assert default_jobname
return jobcontrol.get_jobname(default_jobname) or default_jobname
def get_procs():
"""
Get number of processors from backend or command-line arguments.
:rtype: int
:return: Number of processors
"""
backend = jobcontrol.get_backend()
if backend:
    host_list = jobcontrol.get_backend_host_list()
else:
    host_list = jobcontrol.get_command_line_host_list()
if host_list:
    return host_list[0][1] or 1
return 1
def memory_usage_psutil():
"""
Return the memory usage of the current process in MB
:rtype: float
:return: memory usage in MB
"""
process = psutil.Process(os.getpid())
mem = process.memory_info().rss / float(2**20)
return mem
def get_size_of(an_object, size_unit='megabytes'):
"""
Return the size of an object in size_unit. The object can be any type of object.
All built-in objects will return correct results, but this does not have to
hold true for third-party extensions as it is implementation specific.
:param an_object: the object to measure
:type an_object: any type of python object
:param size_unit: one of 'bytes', 'kilobytes', 'megabytes' or 'gigabytes'
:type size_unit: str
:return: the size of an object in size_unit
:rtype: float
"""
unit_to_power = {'bytes': 0, 'kilobytes': 1, 'megabytes': 2, 'gigabytes': 3}
return sys.getsizeof(an_object) / 1024**unit_to_power[size_unit]
def get_jobhosts(ncpu=None):
"""
Return the job hosts from backend or command line.
:param ncpu: number of processors
:type ncpu: int or None
:rtype: None or list of tuple
:return: the hosts or None
"""
if jobcontrol.get_backend():
hosts = jobcontrol.get_backend_host_list()
else:
hosts = jobcontrol.get_command_line_host_list()
if hosts and ncpu:
    # Fill in the cpu count for any host entry that does not specify one
    hosts = [(host, cpus or ncpu) for host, cpus in hosts]
# If no hosts have been specified, use (localhost, None)
return hosts if hosts else [(queue.LOCALHOST_ENTRY_NAME, ncpu)]
def get_jobhost():
"""
Return the first job hosts from backend or command line.
:rtype: list or None
:return: the first host or None
"""
hosts = get_jobhosts()
if not hosts:
return
host = hosts[0]
if host[1] is not None:
return list(host)
return [host[0], 1]
def get_jobhost_name():
"""
Return the job host name from backend or command line.
:rtype: str or None
:return: the host name
"""
hosts = get_jobhost()
return hosts[0] if hosts else None
def get_backend_hosts_str():
"""
Get backend host(s) as passed to the -HOST flag. This can be useful when
a subjob needs to resubmit its own subjobs to the original queue via subhost
and the current job may be running on localhost due to smart distribution
being enabled in the parent job.
For the hierarchy of submission:
driver.py -> opto_driver/pdft_driver -> backend subjobs
opto_driver.py will get 'localhost' as HOST because of the smart
distribution, its subjobs still need to go to the queue. In order to do this,
opto_driver.py can be submitted with '-SUBHOST get_queue_host()'.
We want to keep smart distribution in the driver.py jobdj, so that
opto_driver is not taking another slot in the queue.
:rtype: str or None
:return: Backend hosts or None if the host is to be determined by job control
"""
# Must be running under JC
if not jobcontrol.get_backend():
return
hosts = jobcontrol.get_backend_host_list()
if not hosts:
return
hosts_str = jobcontrol.host_list_to_str(hosts)
# Return None if string is empty
return hosts_str or None
def is_hostname_known(hostname):
"""
Check whether hostname is defined in the host file.
:type hostname: str
:param hostname: the hostname to check against
:rtype: bool
:return: True if the hostname is defined in the host file
"""
hosts_list = jobcontrol.get_hosts()
host_names = {h.name for h in hosts_list}
return hostname in host_names
def is_jobhost_gpu_available():
"""
Check whether the gpu is available on the host. First check SUBHOST, if defined.
Then, check HOST, if defined. At last, check localhost.
:rtype: bool
:return: True if a gpu is available on the host
"""
hostname = get_jobhost_name()
if hostname in [None, queue.LOCALHOST_ENTRY_NAME]:
# The calculation runs on local
return gpgpu.is_any_gpu_available()
# This checks the gpu availability for SUBHOST, if defined.
# This checks the gpu availability for HOST, if SUBHOST not defined
# and HOST defined.
host = jobcontrol.get_host(hostname)
# Define the GPU availability based on gpgpu (SUPPORT-128375)
return bool(host.gpgpu)
def add_zipfile_to_backend(adir):
"""
Add a zip file of the given directory to the job backend.
:type adir: str
:param adir: the directory
"""
zip_file = f'{adir}.zip'
zip_directory(adir, fileobj=zip_file)
add_outfile_to_backend(zip_file)
def get_backend_first_host():
"""
Get backend first host.
:rtype: str, int
:return: host, number of processors
"""
assert jobcontrol.get_backend()
hostlist = jobcontrol.get_backend_host_list()
if hostlist:
return hostlist[0]
else:
return (queue.LOCALHOST_ENTRY_NAME, 1)
def write_cms_with_wam(cms_model, filename, wam_type):
"""
Write the cms model to a file with the provided WAM property
:param `cms.Cms` cms_model: The cms model to write to file
:param str filename: The cms path
:param int wam_type: One of the enums defined in workflow_action_menu.h
"""
with wam.WorkflowMenuMaestroWriter(filename, wam_type) as writer:
writer.append(cms_model._raw_fsys_ct)
for ct in cms_model.comp_ct:
ct.append(filename, "CMS")
def add_wam_to_cms(filename, wam_type):
"""
Rewrite the cms with the WAM type added
:param str filename: The cms path
:param int wam_type: One of the enums defined in workflow_action_menu.h
"""
cms_model = cms.Cms(filename)
write_cms_with_wam(cms_model, filename, wam_type)
def write_mae_with_wam(structs, filename, wam_type=None):
"""
Write the structures to a Maestro file with the provided WAM property
:param list(structure.Structure) structs: The structures to write to file
:param str filename: The mae path
:param int wam_type: One of the enums defined in workflow_action_menu.h
:raise ValueError: If the file path is not for a Maestro file
"""
if fileutils.get_structure_file_format(filename) != structure.MAESTRO:
raise ValueError(f"{filename} is not a Maestro file path.")
with wam.WorkflowMenuMaestroWriter(filename, wam_type) as writer:
for struct in structs:
writer.append(struct)
def set_structure_wam(struct, wam_type):
"""
Sets the WAM property for the passed structure
:param `structure.Structure` struct: The structure to set the WAM for
:param int wam_type: One of the enums defined in workflow_action_menu.h
"""
if isinstance(struct, cms.Cms):
struct = struct._raw_fsys_ct
wam.set_workflow_action_menu(struct, wam_type)
def remove_structure_wam(struct):
"""
Remove the structure WAM property from the passed structure
:param structure.Structure struct: The structure to remove property from
"""
if isinstance(struct, cms.Cms):
struct.remove_cts_property(msprops.STRUCTURE_WAM_PROP)
else:
struct.property.pop(msprops.STRUCTURE_WAM_PROP, None)
def get_smart_distribution_from_environ(default=True):
"""
Get smart distribution of the queue value based on the environment variable.
:param bool default: Default value if env variable is not set. Must be
aligned with the default value of the `queue.JobDJ.smart_distribution`
:rtype: bool
:return: Smart distribution value
"""
enable_sd = os.environ.get('SCHRODINGER_MATSCI_REUSE_DRIVER_HOST')
if enable_sd is None:
return default
try:
return msutils.setting_to_bool(str(enable_sd))
except ValueError: # Invalid string
return default
def set_smart_distribution_from_environ(jobq, log=None):
"""
Set smart distribution of the queue to on/off based on the environment
variable.
:type jobq: `schrodinger.job.queue.JobDJ`
:param jobq: The JobDJ object
:type log: function or None
:param log: A function to log smart distribution status. Should take a
single str argument.
"""
state = get_smart_distribution_from_environ(default=None)
if state is not None:
jobq.setSmartDistribution(state)
if log:
log('Run some subjobs locally: %s' % jobq.smart_distribution)
class RobustSubmissionJob(queue.JobControlJob):
"""
A JobControlJob object that will retry submission multiple times for
fizzled jobs (if the queue settings allow) but will not attempt to retry
a job that died.
"""
def retryFailure(self, max_retries=0):
"""
Determine if the job should be retried or not. This overrides the
parent method so as not to retry jobs that have a status of "died", as
that indicates that Jaguar failed, which it almost certainly will again.
:type max_retries: int
:param max_retries: The queue's max_retries parameter
:rtype: bool
:return: True if the job should be retried, False if not
"""
job = self.getJob()
if job and job.isComplete() and job.ExitStatus == "died":
return False # don't retry the job (MATSCI-1020)
return queue.JobControlJob.retryFailure(self, max_retries=max_retries)
def create_queue(options=None, host=None, **kwargs):
"""
Create a JobDJ job with some default values and the given keyword arguments
Current defaults:
- verbosity: normal
- max_failures: NOLIMIT
- max_retries: 3
:type options: argparse Namespace object
:param options: the hostlist will be formed from the options.host property
if not supplied by the host argument
:type host: str
:param host: The host string to use to create the queue
All other keyword arguments will be passed on to the JobDJ object
:rtype: `schrodinger.job.queue.JobDJ`
:return: The JobDJ object
"""
if host != AUTOHOST:
if not host:
if hasattr(options, 'host') and options.host:
host = options.host
else:
host = queue.LOCALHOST_ENTRY_NAME
hostlist = jobcontrol.host_str_to_list(host)
else:
# This will cause JobDJ to figure out the host and processors from the
# environment
hostlist = None
kwargs.setdefault('verbosity', 'normal')
kwargs.setdefault('max_failures', queue.NOLIMIT)
kwargs.setdefault('max_retries', 3)
jobq = queue.JobDJ(hosts=hostlist, **kwargs)
# JobDJ uses smart distribution by default, which runs a subjob on the same
# host as the driver but not in the queue. This avoids leaving the driver's
# queue slot idle. We can't do this if subjobs will occupy more than one
# thread.
tppval = getattr(options, 'TPP', None) or 0
if tppval > 1:
jobq.disableSmartDistribution()
return jobq
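# A hedged sketch combining create_queue with the environment-driven smart
# distribution override defined earlier in this module.
def _example_create_queue(options, log):
    jobq = create_queue(options=options)
    set_smart_distribution_from_environ(jobq, log=log)
    return jobq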
def get_all_subjobs(job):
"""
Get all subjobs (and recursively all subjobs of subjobs) of the given job
:param `jobcontrol.Job` job: The job to get subjobs of
:rtype: list
:return: A list of all subjobs stemming from the current job. Each item
is a `jobcontrol.Job` object
"""
# Note - it is possible for some subjobs to be missed if they are running
# on a remote machine under legacy job control. See the discussion in the RB
# for MATSCI-10086 for additional details.
subjobs = []
for sid in job.SubJobs:
sjob = get_job_from_hub(sid)
subjobs.append(sjob)
subjobs.extend(get_all_subjobs(sjob))
return subjobs
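# Illustrative sketch (not part of the original module): walking the full
# subjob tree of the current backend job, if a backend exists.
def _example_walk_subjobs():
    """Hypothetical helper printing the subjob tree of the backend job."""
    backend = jobcontrol.get_backend()
    if not backend:
        return
    for sjob in get_all_subjobs(backend.getJob()):
        print(sjob.JobId)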
[docs]def is_job_server_job(job):
"""
Check if the job is/was run under Job Server
:param `jobcontrol.Job` job: The job to check
:rtype: bool
:return: Whether the job was run under job server
"""
return mmjob.mmjob_is_job_server_job(job.JobID)
[docs]def is_downloadable_job_server_job(job):
"""
Check if the job is/was run under Job Server and has not yet downloaded
its files
:param `jobcontrol.Job` job: The job to check
:rtype: bool
:return: Whether the job was run under job server and has not downloaded
files
"""
return is_job_server_job(job) and not job.isDownloaded()
[docs]def get_job_from_hub(jobid):
"""
Get a job object for the given job id
:param str jobid: The job id
:rtype: `jobcontrol.Job`
:return: The job object for this job id
:raise: `jobhub.StdException` If there is no job found for jobid
"""
job_manager = jobhub.get_job_manager()
return job_manager.getJob(jobid)
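# Illustrative sketch (not part of the original module): combining the
# helpers above to see whether a job's files still live on the server.
def _example_files_still_on_server(jobid):
    """Hypothetical helper; jobid is a job id string."""
    try:
        job = get_job_from_hub(jobid)
    except jobhub.StdException:
        # No job known for this id
        return False
    return is_downloadable_job_server_job(job)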
[docs]class JSFilePathData:
"""
Manage info for a file that belongs to a currently running job server job
The object has attributes for the associated job and file name.
Attribute requests not found on this class are passed on to the
pathlib.Path object stored in the path attribute.
"""
[docs] def __init__(self, path, job):
"""
Create a JSFilePathData object
:param str path: The name of the file
:param `jobcontrol.Job` job: The job this file is associated with
"""
# Note - it would be far more clear just to add a job attribute to the
# pathlib.Path object and avoid this class altogether, but Path objects
# do not allow adding arbitrary attributes.
self.path = pathlib.Path(path)
self.job = job
def __repr__(self):
return self.job.JobId + ':' + str(self.path)
def __getattr__(self, attribute):
"""
Pass on any unknown attribute requests to the path attribute
:param str attribute: The requested attribute
:raise AttributeError: If the attribute is not found on this object
or on the path object
"""
try:
return getattr(self.path, attribute)
except AttributeError:
raise AttributeError(f'{self.__class__.__name__} has no attribute'
f' {attribute}')
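# Illustrative sketch (not part of the original module): attribute requests
# fall through to the wrapped pathlib.Path object.
def _example_jsfilepathdata(job):
    """Hypothetical helper; job is a jobcontrol.Job object."""
    data = JSFilePathData('subdir/output.log', job)
    print(data.name)    # 'output.log', resolved by pathlib.Path
    print(data.suffix)  # '.log'
    print(repr(data))   # '<job id>:subdir/output.log' (POSIX separators)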
[docs]class FileDownloadError(Exception):
""" Raised by FileDownloader for any error """
[docs]class FileDownloader:
"""
Manage retrieving file information from a running Job Server job and
downloading associated files
"""
[docs] def __init__(self):
""" Create a FileDownloader instance """
# The system process communicating with Job Server
self.process = None
# An error message describing any error that occurred
self.error = None
# The path to the most recent stdout file created by this class
self.out_filename = None
# The path to any stdout files created by this class
self.out_filenames = []
# The files available for download from the server. Each item is a
# JSFilePathData object
self.available_filenames = []
# Any temp directories that should be removed by this class when temp
# files are removed. Must be added manually by calling code
self.temp_directories = []
# If this class is used from Maestro or a stand-alone GUI, try to
# clean up any temp files it creates when the process shuts down.
app = QtCore.QCoreApplication.instance()
if app:
app.aboutToQuit.connect(self.cleanFiles)
[docs] @staticmethod
def interpretJobServerError(msg, checkhost=True):
"""
Form a more user-friendly error message for some known Job Server errors
:param str msg: The Job Server error message
:param bool checkhost: Whether to try to extract the host name from
the message
:rtype: str
:return: A modified error message, or the original message if no
modification is found
"""
def _get_host_name_in_line(line):
if not all(x in line for x in '[@:]'):
return None, None
# Expects a line with format: [job server @ localhost:41789]...
chunk = line.split('@ ')[1]
host = chunk.split(':')[0]
rest = chunk.split(']')[1]
return host, rest
# Job server error messages have one line per server, with each line
# giving the error for that server. We reduce the amount of technical
# jargon in the messages to just a short line per server.
new_msg = ""
lookup = 'lookup'
for line in msg.split('\n'):
if checkhost:
host, rest = _get_host_name_in_line(line)
if not host:
new_msg += line + '\n'
continue
new_msg += host + ': '
else:
rest = line
if 'connection error' in rest and lookup in rest:
# Error messages such as:
# [job server @ boltsub3.schrodinger.com:8030] rpc error: code =
# Unavailable desc = connection error: desc = "transport: Error
# while dialing dial tcp: lookup boltsub3.schrodinger.com on
# 127.0.0.53:53: no such host"
new_msg += 'Unable to contact host\n'
elif 'desc =' in rest:
# Error messages such as:
# [job server @ boltsub3.schrodinger.com:8030] rpc error: code =
# Unknown desc = 'bob' is not a valid job server JobId
tokens = rest.split('=')
new_msg += tokens[-1].strip() + '\n'
else:
new_msg += rest + '\n'
new_msg = new_msg.rstrip('\n')
return new_msg
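    # Illustrative sketch (not part of the original class): a raw Job
    # Server message such as the one below collapses to one short line
    # per server; the host and port are made up.
    #
    #     raw = ('[job server @ boltsub3.schrodinger.com:8030] rpc error: '
    #            'code = Unavailable desc = connection error: desc = '
    #            '"transport: Error while dialing dial tcp: lookup '
    #            'boltsub3.schrodinger.com on 127.0.0.53:53: no such host"')
    #     FileDownloader.interpretJobServerError(raw)
    #     # -> 'boltsub3.schrodinger.com: Unable to contact host'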
[docs] def listAvailableFiles(self, job, subjobs=True, wait=False):
"""
Get file names available on the server for the given job
:param `jobcontrol.Job` job: The job to get file names for
:param bool subjobs: Also get file names for subjobs of the given job
:param bool wait: If True, wait until the process finishes and return
the file info. If False, start a process to retrieve the file names
and return. The calling function will need to call the
parseAvailableFiles method when the process completes. Note that
using wait=True can freeze a GUI for a long time until the
network process times out. See the
jobdirdlg.InteractiveFileDownloader class.
:rtype: list or None
:return: If wait is False, None is returned and the process attribute
holds the running process. If wait is True, a list of `JSFilePathData`
objects is returned, one for each file found on the server.
:raise `FileDownloadError`: if an error occurs while retrieving names
"""
self.available_filenames = []
self.process = None
# Gather a list of all jobs to get file names for
all_jobs = [job]
if subjobs:
all_jobs.extend(get_all_subjobs(job))
# Form the command to get the file names written to a json file
ids = {x.JobId: x for x in all_jobs}
cmd = ['jsc', 'list-files', '-json']
cmd.extend(ids.keys())
tdir = fileutils.get_directory_path(fileutils.TEMP)
temp_file = tempfile.NamedTemporaryFile(suffix='.json',
dir=tdir,
delete=False)
self.out_filename = temp_file.name
self.out_filenames.append(self.out_filename)
# Start the process
self.process = subprocess.Popen(cmd,
stdout=temp_file,
stderr=subprocess.PIPE,
text=True)
if not wait:
return
# Wait for the process to finish and check for errors
code = self.process.wait()
if code:
self.raiseProcessError()
# Parse the output data
return self.parseAvailableFiles()
[docs] def raiseProcessError(self):
"""
Raise an exception with the stderr text from the current process
:raise `FileDownloadError`: Raised with stderr output from the process
"""
msg = self.process.stderr.read()
raise FileDownloadError(self.interpretJobServerError(msg))
[docs] def parseAvailableFiles(self):
"""
Parse the available file data from a process started by
listAvailableFiles
:rtype: list
:return: Each item of the list is a `JSFilePathData` object for a
file found on the server
:raise RuntimeError: If no process has been started
:raise `FileDownloadError`: If unable to parse the data
"""
if not self.process:
raise RuntimeError('There is no process output to parse')
with open(self.out_filename, 'r') as jfile:
data = json.load(jfile)
errors = {}
# Data is a list of dictionaries. Each dictionary contains the data
# for one job. The keys in that dictionary are created in a Go
# program and not available as constants anywhere accessible to Python
for jobdata in data:
jobid = jobdata['jobId']
jerrors = jobdata['errors']
# Either errors is None, downloadableFiles is None, or both are.
# There is no case where errors is populated if downloadableFiles
# were found.
if jerrors:
msg = ""
# There will be one error per certified server
for jerror in jerrors:
# Form the server host name - the JC team says that
# serverAddress will always be present for errors.
msg += jerror['serverAddress'].split(':')[0] + ': '
# Make a friendly error message for this server
msg += self.interpretJobServerError(jerror['userMessage'],
checkhost=False)
msg += '\n'
errors[jobid] = msg
else:
# No errors occurred
try:
this_job = get_job_from_hub(jobid)
except jobhub.StdException:
# Unclear if this can actually happen, here for safety
errors[jobid] = f'Unable to convert {jobid} to a job'
continue
for name in jobdata['downloadableFiles']:
self.available_filenames.append(
JSFilePathData(name, this_job))
if errors and not self.available_filenames:
# In theory every subjob could fail with a completely different error,
# but it is far more likely that many subjobs share the same error, so
# show a single example rather than inundate the user with messages
error = next(iter(errors.values()))
numerr = len(errors)
jobp = 'job' if numerr == 1 else 'jobs'
msg = ('Errors occurred obtaining the list of files for '
       f'{numerr} {jobp}. Example error is:\n{error}')
raise FileDownloadError(msg)
return self.available_filenames
[docs] def downloadJobFileToTemp(self, job, filename, temp_path=None, wait=False):
"""
Tail the given file on the server to a local file. This does not
count as "downloading" the file in Job Server terms (i.e. the file
will still be downloaded to the job directory at the end of the job
if requested by Job Server settings).
It is up to the calling code to remove the temporary file created by
this method.
:param `jobcontrol.Job` job: The job the file is associated with
:param str filename: The name of the file
:param str temp_path: The path to the file to write. If not given,
a temp path will be created.
:param bool wait: If wait is True, wait for the file to download
and return the name of the temp file. If wait is False,
start the download process and return None while the process
runs. Note that using wait=True can freeze a GUI for a long time
until the network process times out. See the
jobdirdlg.InteractiveFileDownloader class
:rtype: str or None
:return: If wait is False, the process attribute is the running
process and None is returned. If wait is True, the full path of the
file created is returned.
:raise `FileDownloadError`: if an error occurs while downloading
"""
self.process = None
# Determine the file path
if temp_path:
temp_file = open(temp_path, 'w')
else:
ext = fileutils.get_file_extension(filename)
tdir = fileutils.get_directory_path(fileutils.TEMP)
temp_file = tempfile.NamedTemporaryFile(suffix=ext,
dir=tdir,
delete=False)
self.out_filename = temp_file.name
self.out_filenames.append(self.out_filename)
# Create the command and start the process
cmd = ['jsc', 'tail-file', '--name', filename, job.JobID]
self.process = subprocess.Popen(cmd,
stdout=temp_file,
stderr=subprocess.PIPE,
text=True)
if not wait:
return
# Check that the process completed successfully
code = self.process.wait()
if code:
self.raiseProcessError()
return self.out_filename
[docs] def cleanFiles(self):
""" Remove any files created by this downloader """
for afile in self.out_filenames:
fileutils.force_remove(afile)
self.out_filenames = []
self.out_filename = None
for adir in self.temp_directories:
fileutils.force_rmtree(adir, ignore_errors=True)
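# Illustrative sketch (not part of the original module): a blocking
# FileDownloader round trip. The file name 'driver.log' and the job
# argument are hypothetical; wait=True should be avoided on a GUI thread.
def _example_download_round_trip(job):
    """Hypothetical helper listing and tailing files for a running job."""
    downloader = FileDownloader()
    try:
        # List everything the server can provide for this job tree
        for fdata in downloader.listAvailableFiles(job, wait=True):
            print(fdata.job.JobId, fdata.path)
        # Tail one file into a temp file owned by the downloader
        temp_log = downloader.downloadJobFileToTemp(job, 'driver.log',
                                                    wait=True)
        with open(temp_log) as log_file:
            print(log_file.read())
    except FileDownloadError as err:
        print(err)
    finally:
        downloader.cleanFiles()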
[docs]def register_driver_log(job_builder, default_job_name):
"""
Register the driver log file upfront in the given job builder
specification.
:type job_builder: `launchapi.JobSpecificationArgsBuilder`
:param job_builder: job specification builder object
:type default_job_name: str
:param default_job_name: the default job name to fall back on
"""
job_name = get_jobname(default_job_name)
log_file_name = f'{job_name}{textlogger.DRIVER_LOG_EXT}'
job_builder.setOutputFile(log_file_name, stream=True)
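# Illustrative sketch (not part of the original module): registering the
# streamed driver log while building a job specification; the job name
# 'my_workflow' is hypothetical.
def _example_build_job_spec():
    """Hypothetical helper creating a job spec with a streamed log."""
    job_builder = launchapi.JobSpecificationArgsBuilder(sys.argv)
    register_driver_log(job_builder, 'my_workflow')
    return job_builder.getJobSpec()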