"""
Core job control for python.
There are currently four major sections of this module - "Job database,"
"Job launching," "Job backend," and "Job hosts." The job database section
deals with getting info about existing Jobs, the job launching section
deals with starting up a subjob, and the job backend section provides
utilities for a python script running as a job.
Copyright Schrodinger, LLC. All rights reserved.
"""
import base64
import collections
import contextlib
import enum
import functools
import inspect
import logging
import os
import re
import shlex
import subprocess
import sys
import tempfile
import time
import warnings
from datetime import datetime
from typing import Dict
from typing import List
from typing import Optional
from typing import Tuple
from typing import Union
from urllib.parse import urlparse
from urllib.request import url2pathname
import backoff
import more_itertools
import schrodinger.infra.mm as mm
import schrodinger.infra.mmjob as mmjob
import schrodinger.utils.fileutils as fileutils
import schrodinger.utils.log
import schrodinger.utils.mmutil as mmutil
from schrodinger import get_maestro
from schrodinger import get_mmshare_version
from schrodinger import get_release_name
from schrodinger import gpgpu
from schrodinger.job import jobcontrol
from schrodinger.Qt import QtCore
from schrodinger.ui.qt.appframework2 import application
from schrodinger.utils import qt_utils
from schrodinger.utils import subprocess as subprocess_utils
from . import resource
_version = "$Revision: 1.105 $"

# Path of the toplevel launcher script; "${SCHRODINGER}" is expanded later
# (see fix_cmd/os.path.expandvars), not at import time.
LOCAL_RUN = os.path.join('${SCHRODINGER}', 'run')

logger = logging.getLogger("schrodinger.jobcontrol")
schrodinger.utils.log.default_logging_config()

# If JobControl debugging is on, also turn on Python debugging for this module.
if os.environ.get('SCHRODINGER_JOB_DEBUG'):
    logger.setLevel(logging.DEBUG)

# Truthy when JC_PROFILING is set in the environment; enables timestamp().
profiling = os.getenv("JC_PROFILING", 0)

mmjob.mmjob_initialize(mm.MMERR_DEFAULT_HANDLER, "")

TOPLEVEL_HOST_ARGS_ENV = 'TOPLEVEL_HOST_ARGS'
class DisplayStatus(enum.Enum):
    """
    User-facing states of a job as reported by job control.

    The enum values are the human-readable strings shown in UIs; see
    Job.DisplayStatus, which constructs members from those strings.
    """
    WAITING = "Waiting"
    RUNNING = "Running"
    CANCELED = "Canceled"
    STOPPED = "Stopped"
    FAILED = "Failed"
    COMPLETED = "Completed"
def timestamp(msg):
    """
    Print a profiling line with the current wall-clock time and *msg*.

    No-op unless the JC_PROFILING environment variable was set at import
    time (module-level ``profiling`` flag).
    """
    if not profiling:
        return
    now = datetime.now().strftime("%H:%M:%S.%f")
    print("@@", now, "-", msg)
##
# A regular expression to grab a JobId, e.g. out of a launch command's
# stdout.
#
jobid_re = re.compile(r"^JobId:\s+(\S+)", re.IGNORECASE | re.MULTILINE)

##
# The timestamp format used for the Job database in the LaunchTime and
# StartTime fields.
#
timestamp_format = "%Y-%m-%d-%H:%M:%S"

##
# The order of host-entry fields, in which they will be stored
# in the _lines attribute of a Host object
entry_fields = [
    "name", "base", "host", "nodelist", "user", "queue", "qargs", "schrodinger",
    "proxyhost", "proxyport", "proxyexec", "tmpdir", "shareddir", "env",
    "processors", "processors_per_node", "parallel", "ngpu", "gpgpu",
    "cuda_cores", "maestrocontrols", "walltime", "memory", "accountcodes"
]
# Map each host-entry field name to its index in entry_fields, used to sort
# fields into the canonical order above.
field_sortkey = {k: v for (v, k) in enumerate(entry_fields)}

# Default Schrodinger installation root, by platform.
if sys.platform == "darwin":
    INSTALL_ROOT = "/opt/schrodinger"
elif sys.platform == "win32":
    INSTALL_ROOT = "C:\\Program Files\\Schrodinger"
else:
    INSTALL_ROOT = "/opt/schrodinger"

HOSTS_FILE = "schrodinger.hosts"

# Characters that don't require escaping on command-line of any OS:
SAFE_COMMAND_CHARS = set(
    'ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz0123456789-_.')

# Programs which are stoppable by sending 'halt' message to the backend.
STOPPABLE_PROGRAMS = {
    "Desmond", "WaterMap", "multisim", "Quantum ESPRESSO executable",
    "Periodic DFT"
}

# A regular expression to check if the argument of the command list
# is a shell redirection operator.
shell_redirect_re = re.compile(r'^\d*(<{1,2}|>{1,2})&?\d*$')

# Module-level QCoreApplication handle, created lazily by _launch_qapp().
qapp = None
class JobcontrolException(Exception):
    """Base class for errors raised by this job control module."""
class JobLaunchFailure(JobcontrolException, RuntimeError):
    """Raised when a job cannot be started under job control."""
class MissingHostsFileException(JobcontrolException):
    """Raised when the schrodinger.hosts file cannot be found."""
class UnreadableHostsFileException(JobcontrolException):
    """Raised when the schrodinger.hosts file exists but cannot be read."""
##
# Job database stuff
#
class Job:
    """
    A Job instance is always a snapshot of the job state at a specific
    point in time. It is only updated when the `readAgain` method is
    explicitly invoked.
    """

    def __init__(self, job_id: str, cpp_job: mmjob.Job = None):
        """
        Initialize a read-only Job object.

        :param job_id: Unique identifier for a job
        :param cpp_job: provide a c++ job object in memory, used for
            constructing objects in wrapper objects from c++ APIs, rather than
            direct construction.
        """
        # mmjob needs a QCoreApplication to run async calls under the hood.
        _launch_qapp()
        # Lowercased property name -> canonical spelling; consulted by
        # __getattr__ and get() for case-insensitive attribute access.
        self._case_insensitive_properties = self._get_case_insensitive_lookup()
        if cpp_job:
            self._cpp_job = cpp_job
            self.job_id = self.JobId
        else:
            self.job_id = job_id
            self.readAgain()

    def _get_case_insensitive_lookup(self) -> Dict[str, str]:
        """
        Construct a map of lowercase properties to valid attribute members
        """
        d = {}
        # Only public data descriptors (the @property attributes) take part
        # in case-insensitive lookup.
        for name, _ in inspect.getmembers(Job, inspect.isdatadescriptor):
            if name.startswith("_"):
                continue
            d[name.lower()] = name
        return d

    def readAgain(self):
        """
        Reread the database. Calling this routine is necessary to get fresh
        values.
        """
        backend = get_backend()
        if backend and backend.job_id == self.job_id:
            # Running inside this job's own backend: reuse the backend's
            # job object rather than constructing a new one.
            self._cpp_job = mmjob.get_backend_job()
            return
        self._cpp_job = mmjob.Job(self.job_id)

    def isComplete(self) -> bool:
        """
        Returns True if the job is complete.

        Note that this does not necessarily mean the output files have been
        downloaded.
        """
        return self._cpp_job.isComplete()

    def isQueued(self) -> bool:
        """
        Returns True if the job runs on a HPC queueing system.
        """
        return self._cpp_job.isQueueJob()

    def succeeded(self) -> bool:
        """
        Returns False if the job was killed, died or fizzled. Returns True
        if ExitStatus is finished.

        :raises RuntimeError: if the job isn't completed, so use isComplete()
            before calling.
        """
        # Check ExitStatus before isComplete. There is a race condition in
        # legacy jobcontrol where ExitStatus may be "finished" before
        # isComplete is true, but the ExitStatus property will block if this
        # is the case.
        if self.ExitStatus == "finished":
            return True
        if not self.isComplete():
            raise RuntimeError(
                f"Job '{self.job_id}' is not complete. Current status is: {self.Status}"
            )
        return False

    def wait_before_kill(self):
        # Experiments show that attempting to kill a (non-job-server) job too
        # quickly fails, so wait at least 5 seconds.
        seconds_since_start = time.time() - time.mktime(
            time.strptime(self.LaunchTime, timestamp_format))
        if seconds_since_start < 5:
            time.sleep(5 - seconds_since_start)

    def stop(self):
        """
        Kill the job while collecting output files.
        """
        self._cpp_job.stopJob()

    def kill(self):
        """
        Kill the job if it is running. This cancels a running job and does not
        return output files.
        """
        self._cpp_job.killJob()

    def cancel(self):
        """
        Cancel a running job and do not return output files.

        This method will eventually deprecate job.kill
        """
        self._cpp_job.killJob()

    def kill_for_smart_distribution(self):
        """
        Kill the job for smart distribution if it is running.
        """
        if self.isComplete():
            return
        if not mmjob.mmjob_is_job_server_job(self.job_id):
            # Legacy jobcontrol jobs cannot be killed immediately after
            # launch; see wait_before_kill.
            self.wait_before_kill()
        self._cpp_job.killJobForSmartDistribution()

    def _wait(self, max_interval: int):
        """
        Wait for the job to complete.

        :param max_interval: maximum interval to sleep between checking for
            completion.
        """
        if mmjob.mmjob_is_job_server_job(self.job_id):
            self.download()
            return
        # The python implementation differs from the job server C++
        # implementation in that it reads the job record again if it fails.
        interval = 2
        while 1:
            self.readAgain()
            if self.isComplete():
                return
            else:
                time.sleep(interval)
                # exponential fallback, capped at max_interval
                if interval * 2 > max_interval:
                    interval = max_interval
                else:
                    interval = interval * 2

    def wait(self, max_interval: int = 60, throw_on_failure: bool = False):
        """
        Wait for the job to complete; sleeping up to 'max_interval' seconds
        between each database check. (Interval increase gradually from 2 sec up
        to the maximum.)

        NOTE: Do not use if your program is running in Maestro, as this
        will make Maestro unresponsive while the job is running.

        :param throw_on_failure: whether to raise an exception if not succeeded
        :type throw_on_failure: bool

        :raises RuntimeError: if the job did not succeed. The error message
            will contain the last 20 lines of the job's logfile (if available).
        """
        self._wait(max_interval)
        if throw_on_failure:
            if not self.succeeded():
                msg = f"Job '{self.job_id}' did not succeed."
                if not self.LogFiles or not os.path.exists(self.LogFiles[0]):
                    msg += "\nNo log file available."
                else:
                    logfile = self.LogFiles[0]
                    num_lines = 20
                    with open(logfile) as fh:
                        last_lines = "".join(more_itertools.tail(num_lines, fh))
                    msg += (f"\nLast {num_lines} lines of {logfile}:\n"
                            f"{schrodinger.utils.log.SINGLE_LINE}\n"
                            f"{last_lines}")
                raise RuntimeError(msg)

    def download(self):
        """
        Download the output of the job into job's launch directory.
        No-op in legacy jobcontrol.
        """
        if not mmjob.mmjob_is_job_server_job(self.job_id):
            return
        mmjob.mmjob_wait_and_download(self.job_id)
        self.readAgain()

    def __repr__(self):
        """
        Returns the formal string representation of the Job object.
        """
        return "Job(\"%s\")" % self.job_id

    def get(self, attr, default=None):
        """
        This function will always raise an error, but is provided to guide
        users to a new syntax.
        """
        if attr.lower() not in self._case_insensitive_properties:
            raise TypeError(f"{attr} is not an attribute of Job")
        canonical_name = self._case_insensitive_properties[attr.lower()]
        raise TypeError(
            f'Update syntax: Job.{canonical_name} replaces Job.get("{attr}") '
            'to reflect that these attributes are always available')

    def __getattr__(self, name):
        # Canonical spellings resolve normally; anything else that matches a
        # property case-insensitively is redirected with a SyntaxWarning.
        if name in self._case_insensitive_properties.values():
            return super().__getattribute__(name)
        if name.lower() not in self._case_insensitive_properties:
            return super().__getattribute__(name)
        canonical_name = self._case_insensitive_properties[name.lower()]
        warnings.warn(
            f"Job.{canonical_name} is the appropriate way to call Job.{name}",
            SyntaxWarning,
            stacklevel=2)
        return getattr(self, canonical_name)

    def summary(self) -> str:
        """
        Return a string summarizing all current Job attributes.
        """
        return self._cpp_job.getSummary()

    def getDuration(self) -> Optional[int]:
        """
        Returns the wallclock running time of the job if it is complete. This
        does not include time in submission status. Returns time in seconds.

        If the job is not complete, returns None.
        """
        if not self.StartTime or not self.StopTime:
            return None
        start = time.mktime(time.strptime(self.StartTime, timestamp_format))
        stop = time.mktime(time.strptime(self.StopTime, timestamp_format))
        return int(stop - start)

    def isDownloaded(self):
        """
        Check if output files were downloaded.

        For legacy job control, identical to `isComplete()`.

        :return: Whether the job files were downloaded.
        :rtype: bool
        """
        is_complete = self.isComplete()
        if not mmjob.mmjob_is_job_server_job(self.job_id):
            # With JOB_SERVER off, job gets downloaded when its completed.
            return is_complete
        return is_complete and not self._cpp_job.hasDownloadableFiles()

    @property
    def BatchId(self) -> Optional[str]:
        """
        Return the batch id, if running on an HPC queueing system. Otherwise
        return None.
        """
        batchid = self._cpp_job.getBatchId()
        if not batchid:
            return None
        return batchid

    @property
    def Dir(self) -> str:
        """
        Return the absolute path of the launch directory.
        """
        return self._cpp_job.getLaunchDir()

    @property
    def ExitCode(self) -> Union[int, str]:
        """
        Returns the exit code of a process. If the job is still running, or it
        was killed without collecting the exit code, return a string
        indicating unknown status.
        """
        try:
            return self._cpp_job.getExitCode()
        except RuntimeError:
            return "Exit code unknown"

    @property
    def Host(self) -> str:
        """
        Return the hostname of the host which launched this job.
        """
        return self._cpp_job.getString("Host")

    @property
    def HostEntry(self) -> str:
        """
        Return the name of the host entry this job was launched to.
        """
        return self._cpp_job.getHostEntry()

    @property
    def LaunchTime(self) -> str:
        """
        Return a string timestamp for the time that the job was launched.
        This will be before the job starts running, as soon as it is
        registered with jobcontrol as a job to be run.
        """
        return self._cpp_job.getLaunchTime()

    @property
    def JobId(self) -> str:
        """
        Return an identifier for a job.
        """
        return self._cpp_job.getJobId()

    @property
    def Name(self) -> str:
        """
        Returns a string representing -JOBNAME that was specified on launch.
        This may be an empty string.
        """
        return self._cpp_job.getJobName()

    @property
    def ParentJobId(self) -> Optional[str]:
        """
        Return the jobid of a parent job. If the job does not have a parent,
        return None.
        """
        parent_jobid = self._cpp_job.getParentJobId()
        if not parent_jobid:
            return None
        return parent_jobid

    @property
    def Processors(self) -> int:
        """
        For a batch job, returns the number of queue slots attached to this
        job. For a local job, return the number of CPU cores allowed to be
        used.
        """
        return self._cpp_job.getProcessorCount()

    @property
    def Program(self) -> str:
        """
        Return descriptive text for the name of the program running this job,
        e.g. Jaguar. This field is optional and may return an empty string.
        """
        return self._cpp_job.getProgram()

    @property
    def Version(self) -> str:
        """
        Return the build number.
        """
        # Last three digits of the mmshare version encode the build number.
        mmshare_version = str(get_mmshare_version())
        return "{release} build {version}".format(release=get_release_name(),
                                                  version=mmshare_version[-3:])

    @property
    def Project(self) -> str:
        """
        Return the job's project name field. This will be an empty string if no
        project is set.
        """
        return self._cpp_job.getProject()

    @property
    def QueueHost(self) -> str:
        """
        Return the hostname of the submission node of a HPC queueing system. If
        not an HPC host, this will be an empty string.
        """
        return self._cpp_job.getQueueHost()

    @property
    def StructureOutputFile(self) -> str:
        """
        Return the name of the file returned by the job that will get
        incorporated into a project of Maestro. Returns an empty string if
        no file is specified.
        """
        return self._cpp_job.getStructureOutputFile()

    @property
    def DisplayStatus(self) -> Optional[DisplayStatus]:
        """
        Return a user-focused status that indicates the current state of the
        job.

        Returns None in the case of non JOB_SERVER jobs.
        """
        # The opposite of "user-focused" in this case is
        # implementation-dependent.
        if mmutil.feature_flag_is_enabled(mmutil.JOB_SERVER):
            return DisplayStatus(self._cpp_job.getDisplayStatus())
        else:
            return None

    @property
    def StatusChangeReason(self) -> str:
        """
        Returns a human-readable reason that a job entered its current state,
        such as "job canceled by the user." If the reason was not recorded or
        is not particularly interesting (e.g. normal transition from waiting to
        running) it may be the empty string.
        """
        if mmutil.feature_flag_is_enabled(mmutil.JOB_SERVER):
            return self._cpp_job.getStatusChangeReason()
        else:
            return ""

    @property
    def Status(self) -> str:
        """
        Get the Status of the job. This is used by legacy jobcontrol API,
        but is superseded by DisplayStatus for JOB_SERVER jobs.
        """
        return self._cpp_job.getString("Status")

    @property
    def StartTime(self) -> str:
        """
        Return a string for the starting time of the job. Returns an empty
        string if the job is not yet started, for example, enqueued in an HPC
        environment.
        """
        return self._cpp_job.getStartTime()

    @property
    def StopTime(self) -> str:
        """
        Return a string for the completion time of the job. Returns an empty
        string if the job is not yet completed.
        """
        return self._cpp_job.getStopTime()

    @property
    def StatusTime(self) -> str:
        """
        Return a string for the time when the job was last updated.
        """
        return self._cpp_job.getStatusTime()

    @property
    def Viewname(self) -> str:
        """
        Return a representation of name used to filter jobs in maestro.
        May be empty.
        """
        return self._cpp_job.getViewName()

    def _wait_for_complete(self, interval: int = 1, max_time: int = 30):
        """
        Wait for a job to transition from finished to completed. This is
        only meaningful with legacy jobcontrol.

        :param interval: seconds to wait between reread of job record
        :param max_time: maximum time to wait for job to be completed
        """

        @backoff.on_predicate(backoff.constant,
                              lambda job: not job.isComplete(),
                              interval=interval,
                              max_time=max_time)
        def _reread():
            self.readAgain()
            return self

        _reread()

    @property
    def ExitStatus(self) -> str:
        """
        Get the ExitStatus of the job. This is a string representation of a
        job. Consider using DisplayStatus instead.

        :raises: RuntimeError if the job is not yet complete.
        """

        def get_status():
            return self._cpp_job.getString("ExitStatus")

        exitstatus = get_status()
        if exitstatus:
            if self.isComplete():
                return exitstatus
            elif not mmjob.mmjob_is_job_server_job(
                    self.job_id) and exitstatus == "finished":
                # There sometimes seems to be a wait between "finished" and
                # "completed", especially in jobdj. Do a 30s retry to wait for
                # the specific finished->completed status change. This comes
                # from the fact that a message can happen before job record
                # is updated.
                self._wait_for_complete()
                exitstatus = get_status()
                if self.isComplete():
                    return exitstatus
        raise RuntimeError(
            f"ExitStatus is not valid until the job {self.JobId} has "
            f"completed; it is currently '{self.Status}:{exitstatus}'. "
            f"Full job record: {self.summary()}")

    @property
    def JobDir(self) -> str:
        """
        Return the directory where the job is run. This will be an empty string
        if the job has not yet started.
        """
        return self._cpp_job.getJobDir()

    @property
    def JobHost(self) -> str:
        """
        Return the hostname where the job is run. This will be an empty string
        if the job has not yet started.
        """
        return self._cpp_job.getJobHost()

    @property
    def JobSchrodinger(self) -> str:
        """
        Return the directory of Schrodinger installation where the job is
        running.
        """
        job_schrodinger = self._cpp_job.getString("JobSchrodinger")
        if job_schrodinger:
            return job_schrodinger
        # Fall back to deriving $SCHRODINGER from the mmshare executable
        # path (three directory levels up).
        jobMMshareExec = self._cpp_job.getString("JobMMshareExec")
        return os.path.dirname(os.path.dirname(os.path.dirname(jobMMshareExec)))

    @property
    def Envs(self) -> List[str]:
        """
        Return a list of environment variables that are set by job, in
        addition to a default environment on a machine. The format is
        ["SPECIAL_VAR=0", "SPECIAL_VAR2=yes"]
        """
        return list(self._cpp_job.getEnvironment())

    @property
    def Errors(self) -> List[str]:
        """
        Return possible error messages associated with a job. This will only
        return values in legacy jobcontrol.
        """
        return list(self._cpp_job.getErrors())

    @property
    def LogFiles(self) -> List[str]:
        """
        Get list of log files associated with a job. May be an empty list.
        """
        return list(self._cpp_job.getFiles(mmjob.JobFilesType_LOG_FILES))

    @property
    def SubJobs(self) -> List[str]:
        """
        Return list of subjob job ids.
        """
        return list(self._cpp_job.getSubjobIds())

    @property
    def Commandline(self) -> str:
        """
        Return the command used to launch the job.

        Note that this may not be accurate when the job is called directly from
        a jobspec. In that case it will instead return the commandline of
        the parent process.
        """
        return self._cpp_job.getString("Commandline")

    @property
    def User(self) -> str:
        """
        Return the username of user who launched the job.
        """
        return self._cpp_job.getUser()

    @property
    def InputFiles(self) -> List[str]:
        """
        Return list of files that will be transferred to the job directory
        on launch.
        """
        return list(self._cpp_job.getFiles(mmjob.JobFilesType_INPUT_FILES))

    @property
    def JobDB(self) -> str:
        """
        Path to the Job Database in legacy jobcontrol. This is an empty str
        for JOB_SERVER jobs.
        """
        return self._cpp_job.getString("JobDB")

    @property
    def OrigLaunchDir(self) -> str:
        """
        Return the launch directory of the oldest ancestor of this job.
        """
        return self._cpp_job.getOriginalLaunchDir()

    @property
    def OrigLaunchHost(self) -> str:
        """
        Return the hostname of the oldest ancestor of this job.
        """
        return self._cpp_job.getOriginalLaunchHost()

    def getOutputFiles(self) -> List[str]:
        # Thin alias for the OutputFiles property.
        return self.OutputFiles

    @property
    def OutputFiles(self) -> List[str]:
        """
        Return a list of output filenames which will be copied back, if
        existing, at the end of a job.

        Note that this list can grow while the backend is running, since output
        files can be registered by the backend.
        """
        return list(self._cpp_job.getFiles(mmjob.JobFilesType_OUTPUT_FILES))

    def getProgressAsPercentage(self) -> float:
        """
        Get the value of backend job progress in terms of percentage (values
        from 0.0 - 100.0)

        Return 0.0 when a job is not yet in running state.
        """
        return self._cpp_job.getProgress().percentage_completed

    def getProgressAsSteps(self) -> Tuple[int, int]:
        """
        Get the value of backend job progress in terms of steps and totalsteps.

        Return (0,1) when a job is not yet in 'running' state.
        """
        progress = self._cpp_job.getProgress()
        return (progress.completed_steps, progress.total_steps)

    def getProgressAsString(self) -> str:
        """
        Get the value of backend job progress in terms of descriptive text.

        Return "The job has not yet started." when a job is not yet in
        running state.
        """
        return self._cpp_job.getProgress().description

    def purgeRecord(self):
        """
        Purge the job record for the job from the database.
        """
        return self._cpp_job.removeRecord()
def _launch_qapp():
    """
    Ensure a QCoreApplication exists so mmjob can run async calls under
    the hood.

    If we are calling from maestro or another PyQt GUI, an application
    instance should already exist; for that reason this must never be
    called at import time.
    """
    global qapp
    if qapp:
        return
    existing = QtCore.QCoreApplication.instance()
    qapp = existing if existing else QtCore.QCoreApplication([])
#
# Job launching stuff
#
def _get_viewname_from_launch_cmd(cmd: List[str]) -> str:
    """
    Return the argument following the first "-VIEWNAME" flag in cmd.

    Returns an empty viewname if not found (including when "-VIEWNAME" is
    the final argument).
    """
    for flag, value in zip(cmd, cmd[1:]):
        if flag == "-VIEWNAME":
            return value
    return ""
@application.require_application(create=True, use_qtcore_app=True)
def launch_job(cmd: List[str],
               print_output: bool = False,
               expandvars: bool = True,
               launch_dir: Optional[str] = None,
               timeout: Optional[int] = None,
               env: Optional[Dict[str, str]] = None,
               show_failure_dialog: bool = True,
               _debug_delay=None) -> jobcontrol.Job:
    """
    Run a process under job control and return a Job object. For a process to
    be under job control, it must print a valid JobId: line to stdout. If
    such a line isn't printed, a RuntimeError will be raised.

    The cmd argument should be a list of command arguments (including the
    executable) as expected by the subprocess module.

    If the executable is present in $SCHRODINGER or $SCHRODINGER/utilities,
    an absolute path does not need to be specified.

    NOTE: UI events will be processed while the job is launching.

    :param print_output: Determines if the output from job launch is printed
        to the terminal or not. Output will be logged (to stderr by default) if
        Python or JobControl debugging is turned on or if there is a launch
        failure, even if 'print_output' is False.
    :param expandvars: If True, any environment variables of the form `$var`
        or `${var`} will be expanded with their values by the
        `os.path.expandvars` function.
    :param launch_dir: Launch the job from the specified directory. If
        unspecified use current working directory.
    :param timeout: Timeout (in seconds) to be applied while waiting for the
        job control launch process to start or finish. The launch process will
        be terminated after this time. If None, the launch process will run
        with a default timeout of 300s.
    :param env: This dictionary will replace the environment for the launch
        process. If env is None, use the current environment for the launch
        process.
    :param show_failure_dialog: If True, show failure dialog if we detect we
        are using a graphical application and the job launch fails.

    :raise RuntimeError: If there is a problem launching the job (e.g., no
        JobId gets printed). If running within Maestro, an error dialog
        will first be shown to the user.
    :raise FileNotFoundError: If launch_dir doesn't exist.
    """
    from schrodinger.infra import jobhub

    if timeout is not None and timeout < 1:
        # JobCommand only deals with timeouts of integer seconds
        raise RuntimeError(
            f"Timeout {timeout} will be truncated to the latest second and "
            "will fail if set to a number less than 1 sec")

    cmd = fix_cmd(cmd, expandvars)
    timestamp('execute perl jlaunch.pl')

    event_loop = QtCore.QEventLoop()
    job = None
    err_message = None

    job_cmd = jobhub.JobCommand(cmd, _get_viewname_from_launch_cmd(cmd),
                                launch_dir)
    if env:
        qprocess_env = QtCore.QProcessEnvironment()
        for key, value in env.items():
            qprocess_env.insert(key, value)
        job_cmd.setEnvironment(qprocess_env)

    if timeout is not None:
        job_cmd.setTimeout(int(timeout))
    elif "_SCHRODINGER_JOB_LAUNCHING" in os.environ:
        # NOTE: If this environment variable is set, it means a parent process
        # has already launched the current process with a defined timeout.
        # Assume this function is being called as a result of launchapi/launcher
        # so don't set our own timeout; instead, rely on the timeout defined
        # from the parent process.
        job_cmd.setTimeout(-1)  # unlimited

    @qt_utils.exit_event_loop_on_exception
    def job_started(launched_job: jobcontrol.Job, *, event_loop=None):
        nonlocal job
        job = launched_job
        if _debug_delay is None:
            event_loop.quit()
        else:
            QtCore.QTimer.singleShot(_debug_delay * 1000, event_loop.quit)

    @qt_utils.exit_event_loop_on_exception
    def job_launch_failed(launch_err: str, *, event_loop=None):
        # Fix: the original declared "nonlocal err_message" twice; a single
        # declaration is sufficient.
        nonlocal err_message
        err_message = launch_err
        event_loop.quit()

    job_launcher = jobhub.JobLauncher(job_cmd)
    job_launcher.jobStarted.connect(
        functools.partial(job_started, event_loop=event_loop))
    job_launcher.jobLaunchFailed.connect(
        functools.partial(job_launch_failed, event_loop=event_loop))
    job_launcher.launch()
    # Confirm that neither signal was emitted yet
    assert job is None
    assert err_message is None
    event_loop.exec()

    # Exceptions raised inside the signal handlers are captured by the
    # exit_event_loop_on_exception decorator; re-raise them here.
    exception = qt_utils.get_last_exception()
    if exception:
        raise exception

    if not err_message and not job:
        err_message = "Job launch failed"
    if err_message:
        if show_failure_dialog and get_maestro():
            # Import QtWidgets only if running within Maestro:
            # Note similarities with utils.JobLaunchFailureDialog
            from schrodinger.Qt import QtWidgets
            dialog = QtWidgets.QMessageBox()
            dialog.setIcon(QtWidgets.QMessageBox.Warning)
            dialog.setText(f"Job launch failed: {cmd}")
            dialog.setDetailedText(err_message)
            dialog.exec_()
        raise jobcontrol.JobLaunchFailure(err_message)
    if print_output:
        print(f"JobId: {job.JobId}")
    return job
def prepend_schrodinger_run(cmd: List[str]) -> List[str]:
    """
    Check if a command executes a Python script and prepend
    $SCHRODINGER/run to the command if it does not already begin
    with it.

    :param cmd: Command to prepend $SCHRODINGER/run to.
    :return: The (possibly extended) command list; the input list is not
        modified.
    :raises ValueError: if cmd is empty.
    """
    # Idiom: truthiness check for emptiness, endswith with a tuple of
    # suffixes instead of two chained calls.
    if not cmd:
        raise ValueError('Empty command list specified')
    if cmd[0].endswith(('.py', '.pyc')):
        return [LOCAL_RUN] + cmd
    return cmd
def fix_cmd(cmd: List[str], expandvars: bool = True) -> List[str]:
    """
    A function to clean up the command passed to launch_job.

    :param cmd: A list of strings for command line launching.
    :param expandvars: If True, any environment variables of the form `$var` or
        `${var`} will be expanded with their values by the `os.path.expandvars`
        function.
    :return: The command to be launched
    """
    cmd = prepend_schrodinger_run(cmd)
    # This function exists as a separate entity from launch_job mostly so it
    # can be tested.
    try:
        cmd + []
    except TypeError:
        raise TypeError("String commands are not accepted. "
                        "Please use lists of arguments.")
    # cmd is a list
    logger.debug("launch_job command: %s", " ".join(cmd))
    if expandvars:
        cmd = [os.path.expandvars(token) for token in cmd]
        logger.debug(" expandvars? command: %s", " ".join(cmd))
    fixed = _fix_program_path(cmd[0])
    fixed.extend(cmd[1:])
    logger.debug(" modified? command: %s", " ".join(fixed))
    return fixed
def _fix_program_path(prog: str) -> List[str]:
    """
    If the program executable doesn't exist as a file and isn't an absolute
    path, prepend the SCHRODINGER directory if it exists there.

    Return a list of command line arguments so this function can be used
    from launch_job when 'cmd' is a list.
    """
    # Shortcut the search if the program is python; this prevents an extra
    # layer of $SCHRODINGER/utilities/python for python scripts launched
    # from python. This is an issue because of toplevel.py.
    if prog == "python":
        suffix = ".exe" if sys.platform == "win32" else ""
        return ["python" + suffix]
    return [subprocess_utils.abs_schrodinger_path(prog)]
def list2jmonitorcmdline(cmdlist: List[str]) -> str:
    """
    Turn a command in list form to a single string that can be executed
    by jmonitor.

    :param cmdlist: the command and its arguments
    :return: a single command-line string with each argument quoted as needed
    """
    cmd = []
    for arg in cmdlist:
        if not mmutil.feature_flag_is_enabled(mmutil.JOB_SERVER):
            # Legacy jobcontrol: escape with Windows-style list2cmdline rules.
            arg = subprocess.list2cmdline((arg,))
            # Quote all args since they end up in a string. Make an exception
            # for strings that contain only "safe" characters or shell redirect.
            if not (arg.startswith('"') and arg.endswith('"')) \
                    and not shell_redirect_re.match(arg):
                if not set(arg).issubset(SAFE_COMMAND_CHARS):
                    # If argument contains "unsafe" characters, quote it:
                    arg = '\"' + arg + '\"'
        else:
            # JOB_SERVER: POSIX-style quoting, leaving shell redirections
            # (e.g. "2>&1") untouched.
            if not shell_redirect_re.match(arg):
                arg = shlex.quote(arg)
        cmd.append(arg)
    return ' '.join(cmd)
def _get_file_path(url: str) -> str:
    """
    From a job spec url, return a filename.

    Only file:// URIs are supported; UNC hosts are re-attached with a
    leading double backslash before conversion to a local path.
    """
    if not url.startswith("file://"):
        raise RuntimeError(
            "File URI is %s but job_control_launch doesn't know how "
            "to deal with non-file URIs at this time." % url)
    parts = urlparse(url)
    host = parts.netloc
    if host:
        # for UNC paths
        host = r"\\" + host
    return url2pathname(host + parts.path)
def _get_job_spec_launch_command(job_spec,
                                 launch_parameters,
                                 write_output=False):
    """
    Get the launch command for a job based on its specification.

    :param job_spec: Data defining the job.
    :type job_spec: schrodinger.job.launchapi.JobSpecification
    :param launch_parameters: launch parameters for the job
    :type launch_parameters: schrodinger.job.launchparams.LaunchParameters
    :param write_output: If true, construct a launch command that can be run
        from any SCHRODINGER. Used by appframework2 to write a shell script
        that can run the job later.
    :type write_output: bool
    :return: Command to execute as a list of strings.
    """
    # Set the launch_parameters jobname in the job_spec so it can sub out
    # any <JOBNAME> variables when calling getCommand().
    lp_jobname = launch_parameters.getJobname()
    if lp_jobname:
        job_spec.setJobname(lp_jobname)
    job_spec.validate()
    cmd = job_spec.getCommand()
    stdout_file = job_spec.getStdout()
    if not stdout_file:
        stdout_file = job_spec.getDefaultStdout()
    # Collapse the command list into a single jmonitor-compatible string
    # and attach shell redirection for stdout/stderr to it.
    cmd = list2jmonitorcmdline(cmd)
    cmd += f' > "{stdout_file}" '
    stderr_file = job_spec.getStderr()
    if not stderr_file:
        stderr_file = job_spec.getDefaultStderr()
    cmd = f" {cmd} "
    if stdout_file == stderr_file:
        # Both streams share one file: merge stderr into stdout.
        cmd += "2>&1 "
    else:
        cmd += f"2> {stderr_file}"
    if write_output:
        # Portable form: resolved through the user's $SCHRODINGER later.
        launch_command = ["$SCHRODINGER/run", "jlaunch.pl"]
    else:
        launch_command = [
            "perl",
            os.path.join(os.environ['MMSHARE_EXEC'], "jlaunch.pl")
        ]
    # Use the -usetmpdir option so that missing input files in the spec will
    # cause the job to fail. The task command is base64-encoded so quoting
    # inside it survives the trip through jlaunch.
    launch_command.extend([
        "-usetmpdir", "-cmd",
        "BASE64 " + base64.b64encode(cmd.encode('ascii')).decode('ascii')
    ])
    if job_spec.jobUsesTPP():
        tpp = launch_parameters.getTPP()
        if tpp:
            launch_parameters.setNumberOfProcessorsOneNode(tpp)
    launch_command.extend(launch_parameters.convertToJLaunchOptions())
    if not lp_jobname and job_spec.getJobname():
        launch_command.extend(["-name", job_spec.getJobname()])
    if job_spec.getProgramName():
        launch_command.extend(["-prog", job_spec.getProgramName()])
    if mmutil.feature_flag_is_enabled(
            mmutil.JOB_SERVER) and job_spec.isStoppable():
        launch_command.extend(["-stoppable"])
    # In general, the number of queue slots is dictated by the number
    # of threads needed. (i.e. each job requests N queue slots. For some
    # jobs that mix parallelism, this is N * M jobs. In special case where
    # the driver jobs launches all the jobs, we reserve by number of processors
    # (only) as the number of queue slots
    nprocs = launch_parameters.getNumberOfQueueSlots()
    if job_spec.driverReservesCores():
        if nprocs != 1:
            raise RuntimeError(
                f"Setting number of slots ({nprocs}) and processors consumed "
                "by driver are incompatible")
        nprocs = launch_parameters.getNumberOfSubjobs()
        if not nprocs:
            raise RuntimeError(
                "For the driver to reserve cores, please set the number of "
                "subjobs explicitly (:X) as -HOST host:X")
    if nprocs:
        launch_command.extend(["-NPROC", str(nprocs)])
    licenses = job_spec.getLicenses()
    if licenses:
        for license_name, token_count in licenses:
            launch_command.extend(["-lic", f"{license_name}:{token_count}"])
    input_file_args = input_file_arguments(job_spec, launch_parameters,
                                           write_output)
    cmdline_file_args = file_arguments_for_launch_command(input_file_args)
    launch_command.extend(cmdline_file_args)
    # Non-streamed outputs are copied back at job end; streamed (-log)
    # files are updated continuously; -structout is what maestro
    # incorporates.
    for file_ in job_spec.getOutputFiles(stream=False, incorporate=False):
        launch_command.extend(["-out", file_])
    for file_ in job_spec.getOutputFiles(stream=True, incorporate=False):
        launch_command.extend(["-log", file_])
    for file_ in job_spec.getOutputFiles(incorporate=True):
        launch_command.extend(['-structout', file_])
    logger.debug("launch_from_job_spec launch command %s" % launch_command)
    logger.debug("launch_from_job_spec job spec %s" %
                 job_spec.asJSON(indent=4, sort_keys=True))
    logger.debug("launch_from_job_spec task command %s" % cmd)
    return launch_command
def file_arguments_for_launch_command(file_args):
    """
    Given a set of "raw" file arguments, return the set of those
    to be used on an actual command line.

    If the given set is too long, the arguments will be written to an
    argfile. (It is the responsibility of the caller to remove that
    file after use.)
    """
    # Command-line length limits are normally about 8192 or higher;
    # keep the inline form comfortably below that.
    if total_file_arguments_length(file_args) >= 5000:
        return ['-argfile', write_argfile(file_args)]
    flattened = []
    for option, value in file_args:
        flattened += [option, value]
    return flattened
def total_file_arguments_length(args):
    """
    Determine the total length of the given set of file arguments
    (which is a list of 2-tuples) as they would be represented on the
    command line.

    :param args: list of (option, value) string tuples
    :return: combined length of all options and values plus two
        separator characters per pair
    """
    # Each pair contributes len(option) + len(value) plus two spaces.
    # Use distinct names for the unpacked pair instead of shadowing the
    # builtin "tuple", and a generator instead of nested list comps.
    return sum(len(option) + len(value) for option, value in args) + \
        2 * len(args)
def write_argfile(file_args):
    """
    Write a set of file arguments to a temporary "argfile" (one
    option-value pair per line) and return the name of that file.
    (The caller is responsible for removing it.)

    :param file_args: A list of (option, value) tuples
    """
    handle, argfile_path = tempfile.mkstemp(text=True)
    with os.fdopen(handle, 'w') as argfile:
        # Quote each value so filenames containing spaces survive parsing.
        argfile.writelines('{} "{}"\n'.format(option, value)
                           for option, value in file_args)
    return argfile_path
def launch_from_job_spec(job_spec, launch_parameters, display_commandline=None):
    """
    Launch a job based on its specification.

    :param job_spec: Data defining the job.
    :type job_spec: schrodinger.job.launchapi.JobSpecification
    :param launch_parameters: Data defining how the job is run
    :type launch_parameters: schrodinger.job.launchparams.LaunchParameters
    :param display_commandline: commandline attribute of resulting job. Most
        cases will require this value to be specified, optional value to
        make it easier to refactor out in the future.
    :type display_commandline: str
    :return: A schrodinger.job.jobcontrol.Job object.
    """
    command = _get_job_spec_launch_command(job_spec, launch_parameters)
    if mmutil.feature_flag_is_enabled(mmutil.JOB_SERVER):
        # Hand the Go launch process the user temp dir up front so it can
        # locate the localhost jobserver directory; computing this inside
        # the Go launch process is slow.
        command += ["-user-tempdir", mmjob.get_job_server_parent_path()]
    environment = dict(os.environ)
    if display_commandline:
        environment["SCHRODINGER_COMMANDLINE"] = display_commandline
    return launch_job(command,
                      launch_dir=launch_parameters.getLaunchDirectory(),
                      env=environment)
#
# Job control back end stuff
#

# Cached _Backend instance; populated the first time get_backend() is
# called while actually running under job control.
_backend_singleton = None


def get_backend() -> Optional["_Backend"]:
    """
    A convenience function to see if we're running under job control. If so,
    return a _Backend object. Otherwise, return None.
    """
    global _backend_singleton
    if _backend_singleton is None:
        candidate = _Backend()
        # Only cache (and report) a backend that has a real job id.
        if candidate.job_id:
            _backend_singleton = candidate
    return _backend_singleton
def get_runtime_path(pathname: str) -> str:
    """
    Return the runtime path for the input file 'pathname'.

    If the pathname is of a type that job control will not copy to the job
    directory or no runtime file can be found, the original path name is
    returned.

    :param pathname: original (launch-side) file path
    :raises TypeError: if pathname is empty or None
    """
    if not pathname:
        raise TypeError("Pathname has to be non empty string")
    try:
        return mmjob.mmjobbe_path_runtime(pathname)
    except mm.MmException as e:
        # MMJOBBE_ERROR just means no runtime mapping exists; fall back
        # to the path we were given. Anything else is a real failure.
        if e.rc != mmjob.MMJOBBE_ERROR:
            raise
        return pathname
def under_job_control() -> bool:
    """
    Returns True if this process is running under job control; False
    otherwise.
    """
    has_jobcontrol = mmjob.mmjobbe_has_jobcontrol()
    return bool(has_jobcontrol)
class _Backend:
"""
An interface to mmjobbe. Because this class does nothing useful if
not already running under jobcontrol, all methods should be no-ops
if there is no job_id attribute.
"""
def __init__(self):
"""
Get the job_id and jobdir pathname of the current Job Control
process (only meaningful if running under Job Control).
Using the get_backend() function should be preferred to
initializing a _Backend object explicitly.
"""
self.job_id = None
self.job_dir = None
# Store mmjobbe_terminate as an attribute so it will be available
# even if the mm module is garbage collected before the _Backend
# instance is.
self.mmjobbe_terminate = mmjob.mmjobbe_terminate
if not under_job_control():
return
try:
mmjob.mmjobbe_initialize(mm.error_handler)
self.job_id = mmjob.mmjobbe_jobid()
self.job_dir = mmjob.mmjobbe_job_directory()
logger.debug("_Backend jobid is %s" % self.job_id)
except mm.MmException as e:
if e.rc != mmjob.MMJOBBE_NO_JOBID:
logger.warn("_Backend.__init__ exception: %s", e)
raise
except Exception as e:
logger.warn("_Backend.__init__ exception: %s", e)
raise
def getJob(self) -> Job:
"""
Retrieve a read-only instance of a job record (as an instance of the
Job class).
Changes made to the job record after this method is called (e.g. via
setStructureOutputFile) will not be visible in the record returned.
It should be called again to get any updates.
"""
return Job(self.job_id)
def __getstate__(self):
raise Exception("_Backend objects cannot be pickled.")
def __del__(self):
"""
Clean up library initializations.
"""
if self.job_id:
self.mmjobbe_terminate()
self.job_id = None
def setStructureOutputFile(self, filename: str):
"""
Set the path for a file that will be the output file which will be
marked for incorporation by maestro.
"""
if self.job_id:
mmjob.mmjobbe_set_structure_output_file(filename)
def addOutputFile(self, filename: str):
"""
Add an output file or directory for job control to copy back to the
launch host. Job control will silently skip the file or directory if it
does not exist.
"""
if self.job_id:
mmjob.mmjobbe_add_outputfile(filename)
def addRequiredOutputFile(self, filename: str):
"""
Add a required output file for job control to copy back to the launch
host. If this file does not exist at the end of the job, then job
control will mark the job as "died".
"""
if self.job_id:
mmjob.mmjobbe_add_required_outputfile(filename)
def addLogFile(self, filename: str):
"""
Add a log file for this job. A log file is continuously updated as the
log file appends. Log files can only be append-only text data.
"""
if self.job_id:
mmjob.mmjobbe_add_logfile(filename)
def addMonitorFile(self, filename: str):
"""
This function has no effect.
"""
if self.job_id:
mmjob.mmjobbe_add_monitorfile(filename)
def setStructureMonitorFile(self, filename: str):
"""
This function has no effect.
"""
if self.job_id:
mmjob.mmjobbe_set_structure_monitor_file(filename)
def copyOutputFile(self, path: str):
"""
Copy a completed output file or directory back to the launch directory.
:param path: The path to the file or directory that should be copied
"""
if self.job_id:
mmjob.mmjobbe_copy_outputfile(path)
def archiveFiles(self, filenames: List[str], archive_file: str):
"""
Archive the given files into a given archive file
and add the latter as an output file.
"""
if self.job_id:
mmjob.mmjobbe_archive_files(filenames, archive_file)
def sendMessageToParent(self, message: str):
"""
Send message <message> (string) to the parent of this job.
If this job does not have a parent, nothing will be done.
Normally it should not be necessary to use this method.
"""
if self.job_id:
mmjob.mmjobbe_send_parent(message)
def addMessageName(self, msgname: str):
"""
Add message type to be queued for the backend to read using
nextMessage(). Only messages whose first word is in this list
will be returned by nextMessage().
"""
if self.job_id:
mmjob.mmjobbe_add_message_name(msgname)
def nextMessage(self) -> Optional[str]:
"""
Return next unread message from the queue if there is one or None
if there are no messages. The types of messages to return must
first be specified via addMessageName().
"""
if self.job_id:
try:
return mmjob.mmjobbe_next_message()
except mm.MmException as e:
if e.rc == mmjob.MMJOBBE_EMPTY_LIST:
return None # No message
else:
raise # Any other exception
return None
def setJobProgress(self,
steps: int = 0,
totalsteps: int = 0,
description: str = ""):
"""
Update the progress of a job.
:param steps: number of steps completed
:param totalsteps: total number of steps
:param description: text description of progress
"""
mmjob.mmjobbe_set_jobprogress(steps, totalsteps, description)
def deleteSubJob(self, jobid: str):
"""
Tell jmonitor of the backend to delete the subjob field from
the parent job record.
"""
if self.job_id:
mmjob.mmjobbe_delete_subjob(jobid)
def haltRequested(self) -> bool:
"""
Check if a halt has been requested for the current job.
"""
if self.job_id and mmjob.mmjobbe_halt_requested():
return True
else:
return False
#
# Job hosts stuff
#
class Host:
    """
    A class to encapsulate host info from the schrodinger.hosts file.

    Use the module level functions get_host or get_hosts to create Host
    instances.

    :ivar name: Label for the Host.
    :ivar user: Username by which to run jobs.
    :ivar processors: Number of processors for the host/cluster.
    :ivar tmpdir: Temporary/scratch directory to use for jobs. List
    :ivar schrodinger: $SCHRODINGER installation to use for jobs.
    :ivar env: Variables to set in the job environment. List.
    :ivar gpgpu: GPGPU entries. List.
    :ivar queue: Queue entries only. Queue type (e.g., SGE, PBS).
    :ivar qargs: Queue entries only. Optional arguments passed to the queue
        submission command.
    """

    def __init__(self, name: str):
        """
        Create a named Host object. The various host attributes must be set
        after object instantiation.

        Only host-entry fields can be public attributes of a Host object.
        Attributes introduced to capture other information about the entry
        must be private (named with a leading underscore.)

        :param name: name of the host entry.
        """
        self.name = name
        self.server_address = None
        self._host = None
        self.user = None
        self.processors = 1
        self.tmpdir = []
        self.schrodinger = None
        self.env = []
        self.gpgpu = []
        self.queue = None
        self.qargs = None
        self._is_queue = False
        self._lines = []

    def to_hostentry(self) -> str:
        """
        Return a string representation of the Host object suitable for
        including in a hosts file.
        """
        lines = []
        for key in entry_fields:
            if hasattr(self, key):
                value = getattr(self, key)
                if value and value != "0":
                    # Use isinstance rather than an exact type() check for
                    # the list-valued fields.
                    if isinstance(value, list):
                        if key == "gpgpu":
                            # gpgpu entries are (index, description) pairs.
                            for item in value:
                                lines.append("{}: {}, {}".format(
                                    key, item[0], item[1]))
                        else:
                            for item in value:
                                lines.append("{}: {}".format(key, str(item)))
                    else:
                        lines.append("{}: {}".format(key, str(value)))
        return "\n".join(lines)

    def getHost(self) -> str:
        """
        Return the name of the host, which defaults to 'name' if a
        separate 'host' attribute wasn't specified.
        """
        if self._host:
            return self._host
        else:
            return self.name

    def setHost(self, host: str):
        """
        Store host as _host to allow us to use a property for the 'host'
        attr.
        """
        self._host = host

    host = property(getHost, setHost)

    def isQueue(self) -> bool:
        """
        Check to see whether the host represents a batch queue. Returns True
        if the host is a HPC queueing system.
        """
        return self._is_queue

    def __str__(self):
        """
        Return the informal string representation of the Host -- a
        comma-separated list of "Key: value" attribute pairs.
        """
        return ", ".join(self._hostLines())

    def _hostLines(self):
        """
        Return an array of lines that if joined with newlines would create
        a schrodinger.hosts entry.
        """
        return self._lines

    def __repr__(self):
        """
        Return the formal string representation of the Host.
        """
        return "Host(%s)" % self.name

    def matchesRequirement(
            self,
            resource_requirement: "resource.ComputeRequirement") -> bool:
        """
        Return True if this host can meet the resource requirements provided.
        All hosts will meet CPU resource requirements.
        """
        if resource_requirement.compute_type == resource.ComputeType.GPU:
            if self.gpgpu:
                return True
            elif self.name == "localhost":
                # No gpgpu entry for localhost: probe the actual hardware.
                return gpgpu.is_any_gpu_available()
            return False
        # resource type CPU, all hosts allow cpu jobs at this time
        return True
def get_hostfile() -> str:
    """
    Return the name of the schrodinger.hosts file last used by get_hosts().

    The file is found using the standard search path ($SCHRODINGER_HOSTS,
    local dir, $HOME/.schrodinger, $SCHRODINGER).
    """
    hostfile_path = mmjob.mmjob_hostfile_path()
    return hostfile_path
def hostfile_is_empty(host_filepath: str) -> bool:
    """
    Return if the given host_filepath host is empty, meaning it contains
    only the localhost entry. If the host_filepath str is empty or invalid,
    then this function will raise an invalid path exception - OSError.

    :param host_filepath: schrodinger.hosts file to use.
    :type host_filepath: str
    """
    if not os.path.isfile(host_filepath):
        raise OSError("Host file not found in path: " + host_filepath)
    try:
        with _temp_hostfile(host_filepath) as parsed_hosts:
            host_entries = parsed_hosts
    except (mm.MmException, MissingHostsFileException):
        # Unparseable file: treat it as holding no entries.
        host_entries = 0
    if not host_entries:
        return True
    # A file containing nothing but localhost also counts as empty.
    return len(host_entries) == 1 and host_entries[0].name == 'localhost'
def get_installed_hostfiles(root_dir="") -> List[str]:
    """
    Return the pathname for the schrodinger.hosts file installed in the most
    recent previous installation directory we can find.

    If a root pathname is passed in, previous installations are searched for
    there. Otherwise, we look in the standard install locations.
    """
    root_dir = root_dir or INSTALL_ROOT
    if not os.path.isdir(root_dir):
        return []
    # Installation dirs look like "...2023-4" (quarterly) or "...2023".
    quarterly_pattern = re.compile(r"[a-z](20\d\d)-(\d)", re.IGNORECASE)
    yearly_pattern = re.compile(r"[a-z](20\d\d)", re.IGNORECASE)
    found = []
    for entry in os.listdir(root_dir):
        candidate = os.path.join(root_dir, entry, HOSTS_FILE)
        if not os.path.exists(candidate):
            continue
        match = quarterly_pattern.search(entry)
        if match:
            found.append((match.group(1), match.group(2), candidate))
            continue
        match = yearly_pattern.search(entry)
        if match:
            # Yearly releases sort before any quarterly of the same year.
            found.append((match.group(1), "0", candidate))
    # Sort newest (year, quarter) first and keep just the paths.
    found.sort(reverse=True)
    return [item[-1] for item in found]
def get_hosts() -> List[Host]:
    """
    Return a list of all Hosts in the schrodinger.hosts file. After
    this is called, get_hostfile() will return the pathname for the
    schrodinger.hosts file that was used.

    Raises UnreadableHostsFileException or MissingHostsFileException on error.
    """
    # PANEL-3412
    try:
        mmjob.mmjob_hosts_reload()
    except mm.MmException as e:
        if e.rc == mmjob.MMJOB_HOSTS_MISSING:
            raise MissingHostsFileException(
                "Hosts file could not be loaded. (It could be missing.)")
    # NOTE: If the schrodinger.hosts file is present but is empty, the host
    # list will contain one host: localhost.
    return [
        _get_host(mmjob.mmjob_host_name(index))
        for index in range(1, _get_num_hosts() + 1)
    ]
@contextlib.contextmanager
def _temp_hostfile(fname: str):
    """
    Temporarily point $SCHRODINGER_HOSTS at `fname`, yield the hosts parsed
    from it, and restore the previous value (and host list) afterwards.
    """
    try:
        saved_hostfile = os.environ.get("SCHRODINGER_HOSTS", None)
        os.environ["SCHRODINGER_HOSTS"] = fname
        # The only way to check for validity is to actually set it and see
        # what happens
        yield get_hosts()
    finally:
        # now we restore the original value
        if saved_hostfile is None:
            del os.environ["SCHRODINGER_HOSTS"]
        else:
            os.environ["SCHRODINGER_HOSTS"] = saved_hostfile
        # clear the bad host after resetting the environmental variable
        get_hosts()
def hostfile_is_valid(fname: str) -> Tuple[bool, str]:
    """
    :param fname: The full path of the host file to validate
    :return: a (bool, str) tuple indicating whether the host file is valid
    """
    valid = False
    message = ""
    try:
        with _temp_hostfile(fname):
            # Reaching this point means the file parsed successfully; an
            # error raised while restoring the environment does not undo
            # that verdict.
            valid = True
    except (mm.MmException, MissingHostsFileException) as e:
        message = str(e)
    return (valid, message)
def get_host(name: str) -> "Host":
    """
    Return a Host object for the named host. If the host is not
    found, we return a Host object with the provided name and details
    that match localhost. This matches behavior that jobcontrol uses.

    Raises UnreadableHostsFileException or MissingHostsFileException on error.

    :param name: name of the host entry to look up
    :return: the corresponding Host object
    """
    # The return annotation previously claimed ``int``; the function has
    # always returned a Host instance (see _get_host).
    return _get_host(name)
def _get_num_hosts() -> int:
    """
    Returns the number of hosts in the schrodinger.hosts file. Will raise
    UnreadableHostsFileException or MissingHostsFileException on error.
    """
    try:
        host_count = mmjob.mmjob_hosts_length()
    except mm.MmException as err:
        if err.rc != mmjob.MMJOB_INCLUDE_MISSING:
            raise
        raise UnreadableHostsFileException(
            "The Schrodinger hosts file is invalid.")
    if not host_count:
        raise MissingHostsFileException(
            "Hosts file could not be loaded. (It could be missing.)")
    return host_count
def _get_host(name) -> Host:
    """
    Return a Host object for 'name', with all attributes read in from
    the corresponding entry in the schrodinger.hosts file. Requires that
    mmjob be initialized.

    Raises UnreadableHostsFileException or MissingHostsFileException on error.

    :param name: name of the host entry to read
    :return: populated Host instance; unknown names fall back to the
        localhost entry (with the requested name preserved as 'host')
    """
    original_host_name = None
    h = Host(name)
    try:
        num_keys = mmjob.mmjob_host_keys_length(name)
    except mm.MmException as e:
        _get_num_hosts()  # To verify validity of the hosts file. Will raise
        # a more useful exception.
        if e.rc == mmjob.MMJOB_NO_SUCH_HOST:
            # Unknown host: read the localhost entry instead, but remember
            # the requested name so it can be restored as 'host' below.
            original_host_name = name
            name = "localhost"
            num_keys = mmjob.mmjob_host_keys_length(name)
        else:
            raise
    setattr(h, "server_address", mmjob.mmjob_host_server(name))
    h._lines = ["name: %s" % h.name]
    # Go through each schrodinger.hosts entry for this host:
    for i in range(1, num_keys + 1):
        # Lowercase the names for consistency, since mmjob_host_get_int etc.
        # don't care about the case and our style guide prescribes
        # lowercase attribute names.
        attr = mmjob.mmjob_host_keys_item(name, i).lower()
        if mmjob.mmjob_host_is_list(name, attr) and attr != "gpgpu":
            # Generic list-valued field: append every item.
            if not hasattr(h, attr):
                setattr(h, attr, [])
            for j in range(1, mmjob.mmjob_host_list_length(name, attr) + 1):
                value = mmjob.mmjob_host_list_item(name, attr, j)
                getattr(h, attr).append(value)
                h._lines.append(f"{attr}: {value}")
        elif attr == "gpgpu":
            # gpgpu entries are parsed into (index, description) tuples.
            if not hasattr(h, attr):
                setattr(h, attr, [])
            for j in range(1, mmjob.mmjob_host_gpgpu_length(name) + 1):
                value = mmjob.mmjob_host_list_item(name, attr, j)
                (index, description) = get_gpgpu_params(value)
                getattr(h, attr).append((index, description))
                h._lines.append(f"{attr}: {index} {description}")
        elif attr == 'processors':
            value = mmjob.mmjob_host_get_int(name, attr)
            setattr(h, attr, value)
            # Only record non-default (>1) processor counts in the lines.
            if value > 1:
                h._lines.append("processors: %s" % h.processors)
        elif attr == 'processors_per_node':
            value = mmjob.mmjob_host_get_str(name, attr)
            # convert the string that mmjob gives us to an integer
            setattr(h, attr, int(value))
            if h.processors_per_node > 1:
                h._lines.append("processors_per_node: %s" %
                                h.processors_per_node)
        else:
            # Plain scalar field.
            value = mmjob.mmjob_host_get_str(name, attr)
            setattr(h, attr, value)
            h._lines.append(f"{attr}: {value}")
    # sort lines in the predefined order of fields
    h._lines.sort(key=lambda line: field_sortkey.get(line.split(':')[0], 999))
    if mmjob.mmjob_host_is_queue(name):
        h._is_queue = True
    else:
        h._is_queue = False
    if original_host_name:
        h._host = original_host_name
    return h
def get_gpgpu_params(gpgpu_str: str) -> Tuple[str, str]:
    """
    Convert a gpgpu string (ex. "0,V100") to a tuple (index, description).
    Raise an exception if the string is invalid.

    :param gpgpu_str: gpgpu line from schrodinger.hosts (ex. "0,V100")
    :rtype: tuple(str, str)
    :raises: ValueError if the input is invalid
    """
    # Split only on the first run of commas/spaces; the description may
    # itself contain spaces. maxsplit must be passed by keyword -- the
    # positional form is deprecated as of Python 3.13.
    fields = re.split("[, ]+", gpgpu_str, maxsplit=1)
    if len(fields) < 2:
        raise ValueError("ERROR: Invalid gpgpu value: '" + gpgpu_str + "'")
    (index, description) = fields
    return (index, description)
def host_str_to_list(hosts_str: str) -> List[Tuple[str, int]]:
    """
    Convert a hosts string (Ex: "galina:1 monica:4") to a list of tuples.
    First value of each tuple is the host, second value is # of cpus.
    """
    # Implemented because of # EV 55179
    parsed_hosts = []
    # Whitespace and commas both separate host entries (Ev:56146).
    for whitespace_token in hosts_str.split():
        for entry in whitespace_token.split(','):
            if not entry:  # skip empty strings
                continue
            pieces = entry.split(':')
            if len(pieces) == 1:
                # No ":N" suffix -- cpu count unspecified.
                cpu_count = None
            else:
                try:
                    cpu_count = int(pieces[1])
                except ValueError:  # cpu count is not an integer
                    raise ValueError('ERROR: Could not parse host string: '
                                     f"{hosts_str}")
            parsed_hosts.append((pieces[0], cpu_count))
    return parsed_hosts
def host_list_to_str(host_list: List[Tuple[str, int]]) -> str:
    """
    Converts a hosts list [('host1',1), ('host2', 10)] to a string.
    Output example: "host1:1,host2:10"
    """
    entries = []
    for hostname, cpu_count in host_list:
        if cpu_count is None:
            # No cpu count specified; means unlimited CPUs.
            entries.append(hostname)
        else:
            entries.append("%s:%i" % (hostname, cpu_count))
    # Return comma-separated host list string.
    return ",".join(entries)
def get_command_line_host_list() -> Optional[List[Tuple[str, int]]]:
    """
    Return a list of (host, ncpu) tuples corresponding to the host list that
    is specified on the command line.

    This function is meant to be called by scripts that are running under a
    toplevel job control script but are not running under jlaunch.

    The host list is determined from the following sources:

    1. SCHRODINGER_NODELIST
    2. JOBHOST (if only a single host is specified)
    3. "localhost" (if no host is specified)

    If no SCHRODINGER_NODELIST is present in the environment, None is
    returned.
    """
    if 'SCHRODINGER_NODELIST' not in os.environ:
        return None
    nodelist = os.environ['SCHRODINGER_NODELIST']
    # Ev:63971 SCHRODINGER_NODELIST is "" for single host entries
    if not nodelist:  # Only one host specified via -HOST:
        nodelist = os.environ['JOBHOST']
    if not nodelist:  # Nothing specified at all: localhost
        nodelist = 'localhost'
    return host_str_to_list(nodelist)
# Cache for get_backend_host_list(); populated on the first successful
# read of SCHRODINGER_NODEFILE.
_backend_hosts = []


def get_backend_host_list() -> Optional[List[Tuple[str, int]]]:
    """
    Return a list of (host, ncpu) tuples corresponding to the host list as
    determined from the SCHRODINGER_NODEFILE.

    This function is meant to be called from scripts that are running under
    jlaunch (i.e. backend scripts).

    Returns None if SCHRODINGER_NODEFILE is not present in the environment.
    """
    global _backend_hosts
    # PYTHON-2012: when something uses the contents of SCHRODINGER_NODEFILE,
    # its value is not supposed to be passed on to subjobs. This is
    # accomplished by unsetting it here.
    #
    # I'm somewhat worried that people are calling this function more than
    # once in the same process, however, so am caching the results in the
    # global variable so that its behavior is consistent within a process at
    # least.
    #
    # NOTE(review): a present-but-empty nodefile leaves the cache falsy, so
    # a second call would return None instead of [] -- presumably the
    # nodefile always has at least one entry; confirm before relying on it.
    if _backend_hosts:
        return _backend_hosts
    if 'SCHRODINGER_NODEFILE' in os.environ:
        # Accumulate cpu counts per host, preserving first-seen order.
        _hosts = collections.OrderedDict()
        nodefilename = os.environ['SCHRODINGER_NODEFILE']
        with open(nodefilename) as fh:
            for line in fh:
                host = line.strip()
                if host:
                    # Each line is "hostname" or "hostname:ncpu".
                    hostpieces = host.split(":")
                    if len(hostpieces) == 2:
                        ncpu = int(hostpieces[1])
                    else:
                        ncpu = 1
                    if hostpieces[0] in _hosts:
                        _hosts[hostpieces[0]] += ncpu
                    else:
                        _hosts[hostpieces[0]] = ncpu
        del os.environ['SCHRODINGER_NODEFILE']
        _backend_hosts = list(_hosts.items())
        # Return a copy of the global variable to ensure the return value
        # remains consistent.
        return _backend_hosts[:]
    else:
        return None
def calculate_njobs(host_list: Union[str, List[Tuple[str, int]]] = None) -> int:
    """
    Derive the number of jobs from the specified host list.

    This function is useful to determine number of subjobs if user didn't
    specify the '-NJOBS' option.

    :param host_list: String of hosts along with optional number of subjobs
        -HOST my_cluster:20 or list of tuples of hosts, typically
        one element [(my_cluster, 20)]

    If host list is not specified then it uses get_command_line_host_list()
    to determine njobs, else uses the user provided host list.
    """
    if not host_list:
        host_list = get_command_line_host_list()
    elif isinstance(host_list, str):
        host_list = host_str_to_list(host_list)
    total = 0
    for _hostname, cpu_count in host_list:
        # Entries with no explicit count contribute one job each (even
        # for queued hosts).
        total += 1 if cpu_count is None else cpu_count
    return total
def is_valid_hostname(hostname: str) -> bool:
    """
    Checks if the hostname is valid.

    :param hostname: host name
    """
    valid = mmjob.mmjob_is_valid_hostname(hostname)
    return valid
def get_jobname(filename: Optional[str] = None) -> Optional[str]:
    """
    Figure out the jobname from the first available source: 1) the
    SCHRODINGER_JOBNAME environment variable (comes from -JOBNAME during
    startup); 2) the job control backend; 3) the basename of a given
    filename.

    :param filename: if provided, and the jobname can't otherwise be
        determined (e.g., running outside job control with no -FILENAME
        argument), construct a jobname from its basename.
    :return: jobname (may be None if filename was not provided)
    """
    jobname = os.environ.get('SCHRODINGER_JOBNAME')
    if jobname:
        return jobname
    backend = get_backend()
    if backend is not None:
        return backend.getJob().Name
    if not filename:
        return None
    return fileutils.get_jobname(filename)
def register_job_output(job: Job):
    """
    Registers the output and log files associated with the given job to the
    backend if running under jobcontrol.

    :param job: job from which to extract output/log files
    """
    backend = get_backend()
    if backend is None:
        # Not under job control; nothing to register.
        return
    for filename in job.OutputFiles + job.LogFiles:
        backend.addOutputFile(filename)