"""
Contains `TestJCJob`, `TestSPJob`, `TestQueue`, and `Runner` classes.
`TestJCJob` is a test utility-specific subclass of queue.JobControlJob.
`TestSPJob` is a test utility-specific subclass of queue.SubprocessJob. These
classes allow the test utility to track some information that we care about as
a job executes (for instance job duration and test_id). `TestQueue` is a
subclass of queue.JobDJ, and allows further control over reporting.
`Runner` controls all job running parameters. It is also responsible for
actually running the jobs and requesting their workups. The meat is in
`Runner.__call__`.
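
A minimal usage sketch (here ``ui`` stands for the
base_executable.TestUtility instance and ``tests`` for an iterable of
testscripts.TestScript objects)::

    runner = Runner(ui)
    for test in tests:
        runner.addScript(test)
    runner()  # runs every job and its workup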
"""
import contextlib
import datetime
import functools
import logging
import os
import platform
import shlex
import sys
import textwrap
import time
import traceback
from typing import Callable
from typing import TYPE_CHECKING
from typing import Dict
from typing import List
from typing import Optional
from typing import Tuple
from typing import Union
import psutil
from schrodinger.job import jobcontrol
from schrodinger.job import queue
from schrodinger.utils import log
from schrodinger.utils import subprocess
from schrodinger.utils.env import EnvironmentContext
from . import common
from . import client
from . import constants
from . import process_management
from . import sysinfo
from . import testscripts
from . import workup
from .joberrors import run_postmortem
if TYPE_CHECKING:
from . import base_executable # circular import
logger = common.logger
# Globals
DOWNLOAD_FAILED = 'file download failed'
SCHRODINGER_LICENSE_CHECKOUTS = 'SCHRODINGER_LICENSE_CHECKOUTS'
SCHRODINGER_STU_TEST_ID = 'SCHRODINGER_STU_TEST_ID'
STDOUT_FILENAME = 'stdout.txt'
class JobDJError(RuntimeError):
pass
# ****************************************************************************
class TestJob:
"""
Interface for the TestJCJob and TestSPJob classes
"""
@property
def duration(self) -> Optional[int]:
return self._duration
@property
def exit_status(self) -> str:
"""
Returns an exit status from the list of available Jobcontrol exit
statuses.
        - For TestJCJobs this comes from job.ExitStatus, which will return
          a Jobcontrol exit status (from Job.pm in MMSHARE_EXEC) if a job
          object (i.e. schrodinger.job.jobcontrol.job) is found. If a job
          object is not found, it could be because the job has not started
          or it failed to launch. These special cases are handled by setting
          the initial exit status to "Job not started" and resetting it to
          "Failed to launch" if the job fails to launch (see
          TestJCJob.doCommand).
        - For TestSPJobs we limit the available exit statuses to "finished",
          "died", and "killed" because there is no job object (i.e.
          schrodinger.job.jobcontrol.job) associated with TestSPJobs. Like
          TestJCJobs, the initial exit status is set to "Job not started"
          and is reset to "Failed to launch" if the job fails to launch
          (see TestSPJob.doCommand).
"""
raise NotImplementedError("Needs to be defined in job specific class")
@property
def test_id(self) -> int:
return self._test_id
@property
def canceled_by_timeout(self) -> bool:
"""
        Return True only if the job was cancelled by JobDJ; this is mostly
        due to a timeout.
"""
# this variable comes from BaseJob class
return self.abort_job
    @contextlib.contextmanager
def setupTestEnvironment(self, host: str = 'localhost'):
"""
        Set environment variables used during testing:
        SCHRODINGER_LICENSE_CHECKOUTS to a file in the test directory
        (SHARED-2727).
        SCHRODINGER_STU_TEST_ID to the current test id (SHARED-3352).
        Remove TOPLEVEL_HOST_ARGS for Jaguar (SHARED-4089).
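
        Illustrative use (mirroring how doCommand applies it)::

            with self.setupTestEnvironment():
                super().doCommand(host, *args, **kwargs)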
"""
# Only set SCHRODINGER_LICENSE_CHECKOUTS if the job runs locally. For
# remote jobs, this record of licenses is not guaranteed to be
# consistent. See SHARED-2727.
# Also skip license testing if the test is tagged with the skip license
# testing tag.
if use_license_checking(self._test):
# Already in the test execution directory.
license_check_file = os.path.abspath(constants.LICENSE_CHECK_FILE)
if os.path.exists(license_check_file):
os.remove(license_check_file)
else:
license_check_file = None
with EnvironmentContext(SCHRODINGER_STU_TEST_ID, str(self.test_id)):
with EnvironmentContext(SCHRODINGER_LICENSE_CHECKOUTS,
license_check_file):
yield
    def infoStatus(self, status: str):
self._jobdj.printStatus(self, status)
    def debugStatus(self, status: str):
status = self._jobdj.formatStatus(self, status)
queue.logger.debug(status)
    def warnStatus(self, status: str):
status = self._jobdj.formatStatus(self, status)
queue.logger.warning(status)
# ****************************************************************************
class TestJCJob(queue.JobControlJob, TestJob):
"""
    Like a normal JobControlJob, but:
- Ignore failures to launch.
- Be aware of scriptID.
- Easily access job duration and exit status.
"""
    def __init__(self,
command: List[str],
command_dir: Optional[str],
test_id: Union[int, str] = None,
test: testscripts.TestScript = None,
timeout: Optional[int] = None,
runs_locally: bool = False,
**kwargs):
"""
Overridden to add the test_id.
:param command: Command to be run.
:param command_dir: Directory to run in.
:param test_id: Unique identifier of script.
:param test: The representation of all test data for the job that is
being run.
:param timeout: Duration in seconds after which to kill the job. None
is never.
:param runs_locally: Should this job be launched on localhost (never
remote hosts)?
"""
self._test_id = test_id
"""The test number in the database (or the directory name in validate
mode."""
if not test_id:
self._test_id = os.path.basename(command_dir)
# The STU test object
self._test = test
# Set initial value to be pulled by exit_status if a job has not been
# launched
self._exit_status = constants.JOB_NOT_STARTED
# Job dies after this period.
self.timeout = timeout
self._runs_locally = runs_locally
super().__init__(command=command,
command_dir=command_dir,
timeout=timeout,
**kwargs)
self.name = kwargs.get('name', 'STU#%s' % self._test_id)
self.exceptions_caught = 0
    def doCommand(self, host: str = 'localhost', *args, **kwargs):
"""
Overridden to ignore errors. Executes the command described by
self._command. The parent class has two required arguments, but has
the call signature `doCommand(*args, **kwargs)`, hence its usage here.
"""
self.debugStatus('prelaunch')
try:
with self.setupTestEnvironment():
with add_jobcontrol_handler(STDOUT_FILENAME):
super().doCommand(host, *args, **kwargs)
except Exception as err:
self.state = queue.JobState.FAILED
queue.logger.exception(" Jobcontrol error message: %s" % err)
with open(STDOUT_FILENAME, 'a') as stdout:
stdout.write(f'{err}\n')
else:
with open(STDOUT_FILENAME, 'a') as stdout:
if self.launch_error:
stdout.write(f'{self.launch_error}\n')
self._exit_status = None
self.host = host
    def runsLocally(self) -> bool:
"Force the test to be run on the host on which the JobDJ is running."
return self._runs_locally
@property
def duration(self) -> Optional[int]:
"""
Duration of the job, according to the job record. Implemented as a
property to provide consistent interface with `TestSPJob`. Also gives
duration for RUNNING jobs, so as to be consistent with `TestSPJob`.
"""
job = self.getJob()
# Still running
if job and job.StartTime and not job.StopTime:
current = time.time()
start = time.mktime(
time.strptime(job.StartTime, jobcontrol.timestamp_format))
return current - start
# Not started or complete: returns None or the duration value.
return self.getDuration()
@property
def exit_status(self) -> str:
"""
Exit status of the job, according to the job record. Implemented as a
property to provide consistent interface with `TestSPJob`.
"""
# Not simply returning self._getState() because we want a consistent
# exit status across jobcontrol and non jobcontrol jobs.
job = self.getJob()
if job and not self._exit_status:
try:
return job.ExitStatus
except RuntimeError as err:
if 'stranded' in str(err):
return 'stranded'
if 'exited' in str(err):
return 'exited'
raise
else:
return self._exit_status
@exit_status.setter
def exit_status(self, value: str):
self._exit_status = value
# ***************************************************************************
class TestSPJob(queue.SubprocessJob, TestJob):
"""
    Like a normal SubprocessJob, but:
- Ignore failures to launch.
- Kill subjobs when killing this job.
- Be aware of scriptID.
- Access job duration and status.
"""
    def __init__(self,
command: List[str],
command_dir: Optional[str] = None,
test_id: Union[int, str] = None,
test: testscripts.TestScript = None,
timeout: Optional[int] = None,
**kwargs):
"""
Overridden to add the test_id.
:param command: Command to be run.
:param command_dir: Directory to run in.
:param test_id: Unique identifier of script.
:param test: The representation of all test data for the job that is
being run.
:param timeout: Duration in seconds after which to kill the job. None
is never.
"""
self._test_id = test_id
"""The test number in the database (or the directory name in validate
mode."""
if not test_id:
self._test_id = os.path.basename(command_dir)
# The STU test object
self._test = test
# Set initial value to be pulled by exit_status if a job has not been
# launched
self._exit_status = constants.JOB_NOT_STARTED
super().__init__(command=command,
command_dir=command_dir,
timeout=timeout,
**kwargs)
self.name = kwargs.get('name', 'STU#%s' % self._test_id)
    def preCommand(self, *args, **kwargs):
"""
Overridden to open standard files for recording standard error and
standard out. Also marks the start time of the job.
"""
super().preCommand(*args, **kwargs)
# Do this AFTER super, so that the files are opened in the correct
# directory
self._stdout = open(STDOUT_FILENAME, 'w')
self._stderr = subprocess.STDOUT
self.addFinalizer(TestSPJob.cleanUp)
self._start = time.time()
    def doCommand(self, *args, **kwargs):
"""
Overridden to ignore errors. Executes the command described by
self._command.
"""
self.debugStatus('prelaunch')
try:
with self.setupTestEnvironment():
super().doCommand(*args, **kwargs)
except Exception as err:
self.state = queue.JobState.FAILED
self._exit_status = constants.FAILED_TO_LAUNCH
queue.logger.exception(" Subprocess error message: %s" % err)
    def update(self):
"""
        Overridden to make errors non-fatal.
"""
try:
super().update()
except Exception as err:
self.state = queue.JobState.FAILED
queue.logger.exception(" Subprocess error message: %s" % err)
    def cleanUp(self):
"""
Close standard files for recording standard error and standard out.
"""
# Force data to be moved from OS buffer to disk. Required on Windows.
if not self._stdout.closed:
self._stdout.flush()
os.fsync(self._stdout.fileno())
self._stdout.close()
    def getStatusStrings(self) -> Tuple[str, str, str]:
"""
Return a tuple of status strings for printing by `JobDJ`.
:return: (status, jobid, host)
"""
if self.hasExited():
status_string = self.exit_status
else:
status_string = "unknown"
jobid = ' [none]'
host = platform.node()
return status_string, jobid, host
@property
def exit_status(self) -> str:
"""
Exit status of the job, according to subprocess. Implemented as a
property to provide consistent interface with `TestJCJob`.
"""
if self.state == queue.JobState.DONE:
if self.abort_job:
return 'killed'
return queue.FINISHED
elif self.state in (queue.JobState.FAILED,
queue.JobState.FAILED_RETRYABLE):
if self._exit_status == constants.FAILED_TO_LAUNCH:
return self._exit_status
else:
return 'died'
else:
return self._exit_status
    def kill(self):
"""First, kill children, then myself"""
logger.warning(f" Killing {self.test_id}")
try:
test_process = psutil.Process(self._subprocess.pid)
logger.warning("status of {self.test_id}: "
" ".join(test_process.cmdline()))
except (psutil.NoSuchProcess, psutil.AccessDenied):
# Don't worry about processes that have already ended.
# Windows: NoSuchProcess, Mac AccessDenied
self.cleanUp()
if self.state == queue.JobState.ACTIVE:
self.state = queue.JobState.DONE
return
status = process_management.format_process_status(test_process)
logger.warning(status)
self._stdout.write("process status at kill point:\n" + status)
self._stdout.flush()
messages = process_management.kill_launched_jobcontrol_jobs(
self.test_id)
if messages:
logger.warning(messages)
self._stdout.write(messages)
self._stdout.flush()
process_management.kill_process_children(test_process)
super().kill()
try:
if test_process.is_running():
test_process.kill()
except (psutil.NoSuchProcess, psutil.AccessDenied):
# Windows: Don't worry about processes that have already ended.
pass
self.cleanUp()
class TestQueue(queue.JobDJ):
"""
Like a normal JobDJ, but:
- Print the script ID at status points
- Run workups
"""
    def __init__(self,
hosts: Optional[List[Tuple[str, int]]] = None,
verbosity: str = 'quiet',
timeout: Optional[int] = None):
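        # Poll every 5 seconds, or faster when a shorter timeout is set, so
        # that timed-out jobs are detected promptly.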
update_delay = min(5, timeout or 5)
super().__init__(hosts=hosts,
verbosity=verbosity,
max_retries=0,
max_failures=queue.NOLIMIT,
update_delay=update_delay,
job_class=TestJCJob)
self.disableSmartDistribution()
self.timeout = timeout
    def printStatus(self,
job: Optional[TestJob] = None,
action: Optional[str] = None):
if job is None:
self.printHeader()
return
status = self.formatStatus(job, action)
if action in ('launched', 'started'):
queue.logger.warning(status)
else:
queue.logger.info(status)
def _setLoggerVerbosity(self):
if self._verbosity == "quiet":
queue.logger.setLevel(log.WARNING)
elif self._verbosity == "normal":
queue.logger.setLevel(log.INFO)
elif self._verbosity == "verbose":
queue.logger.setLevel(log.DEBUG)
elif self._verbosity == "silent":
queue.logger.setLevel(log.CRITICAL)
else:
self._verbosity = "quiet"
queue.logger.setLevel(log.WARNING)
def _start(self, **kwargs):
try:
self._original_verbosity = self._verbosity
self._verbosity = "quiet"
super()._start(**kwargs)
finally:
self._verbosity = self._original_verbosity
self._setLoggerVerbosity()
    def addJob(self,
job: TestJob,
add_connected: bool = True,
timeout: Optional[int] = None,
**kwargs):
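        """
        Overridden to apply the queue-level timeout to jobs that do not
        specify their own.
        """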
if timeout is None:
timeout = self.timeout
if isinstance(job, TestJob):
            job.timeout = timeout
return super().addJob(job, add_connected, timeout=timeout, **kwargs)
def _jobFailed(self, job: TestJob):
super()._jobFailed(job)
try:
job.cleanUp()
except AttributeError:
pass
# ****************************************************************************
class Runner:
"""
Runner controls all job running parameters within the backend test utility
code. It is also responsible for actually running the jobs and requesting
their workups. The meat is in `Runner.addScript` and `Runner.__call__`.
"""
# ************************************************************************
    def __init__(self, ui: "base_executable.TestUtility"):
"""
Initialize Runner Class
:param ui: Contains information about the user interface (i.e. the
command line arguments)
"""
self.additionalArgs = ui.additionalArgs
self.verbosity = ui.verbosity
self.ncpu = ui.ncpu
self.postmortem = getattr(ui, 'postmortem', False)
self.timeout = getattr(ui, 'timeout', None)
self.reporter = getattr(ui, 'reporter', None)
        # keyed by test_id; values are testscripts.TestScript
self.tests = {}
self.job_runner = TestQueue(verbosity=ui.verbosity,
timeout=self.timeout)
self.job_runner.disableSmartDistribution()
self.xvfb_cmd = None
# ************************************************************************
    def addScript(self, test: testscripts.TestScript):
"""
Add test to be executed by self.__call__. Adds the test
information to self.job_runner.
:param test: Test to be executed.
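
        Keywords in the command are replaced with run-specific values:
        ``${NCPU}``, ``${HOST}``, ``${CWD}``, ``${SHARED}``,
        ``${SCHRODINGER}`` (or ``$SCHRODINGER``), and ``${SLASH}``
        (see QA-618). A hypothetical test command::

            $SCHRODINGER/run my_script.py -HOST ${HOST} -NCPU ${NCPU}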
"""
test_id = test.id
try:
# Should replace this with the attribute of the test, but be
# careful with validate mode.
test_id = int(test_id)
except ValueError:
test_id = test_id or test.directory
# This protects against tests that do not have an automate command
cmd = test.command
if not cmd:
logger.warning('Test "%s" has no command, skipping' % test_id)
return
if not test.directory and not test_id:
raise TypeError("I don't know where to run this job (no test "
"directory or test id")
self.tests[test_id] = test
jobdir = test.directory or str(test_id)
jobdir = os.path.join(os.getcwd(), jobdir)
# Add additional args to the cmd
cmd += ' ' + self.additionalArgs
# use shlex for shell like parsing
# makes for standard space/quote protection
cmd_list = [] # initial list for prepped commands to be added to
# make tests that require display to run with xvfb on Linux
if "require:display" in test.tags and sys.platform.startswith(
"linux") and 'DISPLAY' not in os.environ:
# Determine the appropriate xvfb invocation (see get_xvfb_cmd for
# an explanation) and cache it on first use because the version of
# xvfb is unlikely to change between commands.
if not self.xvfb_cmd:
self.xvfb_cmd = get_xvfb_cmd()
cmd_list = self.xvfb_cmd + cmd_list
try:
for i_cmd in shlex.split(cmd):
# Replace keywords in cmd with run specific values
i_cmd = i_cmd.replace("${NCPU}", str(self.ncpu))
i_cmd = i_cmd.replace("${HOST}", sysinfo.REMOTE.host)
i_cmd = i_cmd.replace("${CWD}", jobdir)
i_cmd = i_cmd.replace("${SHARED}",
os.path.join(os.getcwd(), "shared"))
i_cmd = i_cmd.replace('$SCHRODINGER', os.getenv('SCHRODINGER'))
i_cmd = i_cmd.replace('${SCHRODINGER}',
os.getenv('SCHRODINGER'))
# Add element of cmd to list which will be passed to JC or SP
cmd_list.append(i_cmd)
except Exception:
logger.exception(
f"WARNING: Failed to parse command string for {test_id}")
# amend the command line for jobs that need DRIVERHOST to be specified
# (this will only be done when a remote host is specified)
if "require:driverhost" in test.tags:
cmd_list.append("-DRIVERHOST")
cmd_list.append(sysinfo.REMOTE.host)
# Need ${SLASH} keyword to workaround QA-618
cmd_list = [c.replace("${SLASH}", "/") for c in cmd_list]
if test.useJC():
runs_locally = not test.runsRemotely()
jobdict = dict(command_dir=jobdir,
test_id=test_id,
test=test,
runs_locally=runs_locally)
if "${NCPU}" not in cmd:
jobdict['procs'] = self.ncpu
job = TestJCJob(cmd_list, **jobdict)
else:
job = TestSPJob(cmd_list,
command_dir=jobdir,
test_id=test_id,
test=test)
self.job_runner.addJob(job)
return job
# ************************************************************************
def __call__(self) -> bool:
"""
        Execute all tests added to this Runner instance and run their
        workups. Outcomes are stored in the TestScript objects in
        self.tests.
This needs to be separate from the extraction step so that we can
validate new tests.
"""
if not self.job_runner.total_added:
return True
logger.debug("Executing tests.")
self.problems = []
workups = workup.discover_workups()
try:
self.job_runner.run(status_change_callback=functools.partial(
self.jobStatusChange, workups))
except Exception as err:
            msg = 'ERROR: Failure executing jobs; killing jobs: %s' % err
logger.exception(msg)
self.job_runner.killJobs()
raise JobDJError(msg)
if self.problems:
msg = ('ERROR: Critical problems during test execution:\n %s' %
'\n '.join(self.problems))
raise JobDJError(msg)
return True
    def jobStatusChange(self, workups: Dict[str, Callable], job: TestJob):
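        """
        Callback invoked on each job status change: record launch failures
        and, once a job has exited, update its test record and run the
        workup.
        """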
if job.state in {
queue.JobState.FAILED, queue.JobState.FAILED_RETRYABLE
}:
if job.launch_error:
job._exit_status = constants.FAILED_TO_LAUNCH
with open(os.path.join(job.getCommandDir(), STDOUT_FILENAME),
'a') as stdout:
stdout.write(f'{job.launch_error}\n')
elif job.state == queue.JobState.ACTIVE:
job._exit_status = None
if not job.hasExited() or self.tests[job.test_id].outcome is not None:
return
test = self.tests[job.test_id]
_update_test_script_on_job_completion(test, job)
self.problems.extend(
run_workup(test, job, workups, self.postmortem, self.reporter))
def _update_test_script_on_job_completion(test: testscripts.TestScript,
job: TestJob):
"""
After running a job, the test object needs to be updated to include
additional information.
"""
    # Timing is -1 for jobs with no recorded duration, to distinguish them
    # from jobs whose runtime rounds to zero; the value is serialized to the
    # DB as an integer.
if job.duration is None:
test.timing = -1
else:
test.timing = job.duration
test.exit_status = job.exit_status
def run_workup(test: testscripts.TestScript, job: TestJob,
workups: Dict[str, Callable], generate_postmortem: bool,
reporter: Optional[client.ResultReporter]) -> List[str]:
"""
Run workup code and report results.
    If an upload failed or an unexpected error occurred (a workup failure is
    considered expected, since it can be reported upon), return a list of
    str error messages.
"""
problems = []
# Run workup and record results
try:
job.debugStatus('start workup')
test.runWorkup(job, registered_workups=workups)
if test.outcome:
job.infoStatus('workup succeeded')
else:
job.warnStatus('workup failed')
# This should be protected in workup_outcome, instead.
except Exception as err:
msg = 'Failure in workup: %s' % err
logger.exception(msg)
job.infoStatus('FAILURE EXECUTING WORKUP')
test.outcome = False
if not test.workup_messages:
test.workup_messages = msg
finally:
jc_exit_status = constants.JC_outcome_codes.get(test.exit_status, False)
# If the exit status was unexpected, run a postmortem.
# Also, all KNIME tests with a failing workup should
# include a postmortem.
if generate_postmortem and not test.outcome:
if not jc_exit_status or test.product == 'KNIME':
try:
# Set SCHRODINGER_STU_TEST_ID so that this gets reported
# from errors within postmortem filestore connections.
with EnvironmentContext(SCHRODINGER_STU_TEST_ID,
str(job.test_id)):
run_postmortem(job, test.product)
except Exception:
job.warnStatus("failed to run postmortem")
msg = f"Reporting failed for test {job.test_id}"
logger.exception(msg)
problems.append(f"{msg}: {traceback.format_exc()}")
if reporter:
try:
reporter.report(test)
job.debugStatus('reported result')
except Exception as err:
job.warnStatus('failed to report result')
msg = 'Reporting failed for test %s' % job.test_id
logger.exception(msg)
print(msg + ': ' + str(err))
problems.append(msg + ': ' + str(err))
return problems
def get_xvfb_cmd() -> List[str]:
"""
xvfb-run needs the -a (auto server number) option, except on CentOS 7 (and
    possibly other OSes) where that option is superseded by the -d (auto
display) option. As a bonus, the long option --auto-display is shown in the
help on CentOS 7 but does not actually work. Furthermore, xvfb-run does not
have a --version flag which could be used to reason about supported options.
To determine what flags are supported, try to run a no-op command with
`xvfb-run -d`, and if that fails, use -a.
"""
xvfb_cmd = ['xvfb-run', '-d']
try:
subprocess.run(
xvfb_cmd + [':'],
check=True,
stdout=subprocess.DEVNULL,
stderr=subprocess.DEVNULL,
)
except subprocess.CalledProcessError:
xvfb_cmd = ['xvfb-run', '-a']
return xvfb_cmd
def use_license_checking(test: testscripts.TestScript) -> bool:
"""
Return True if license checking should be enabled for this test run.
"""
    # In production, test is not None; it is sometimes None in unit tests.
if not test:
return True
elif constants.SKIP_LICENSE_TAG in test.tags:
return False
elif "require:specific_host" in test.tags:
return False
elif sysinfo.REMOTE.name != sysinfo.LOCAL.name:
return False
return True
@contextlib.contextmanager
def add_jobcontrol_handler(filename: str):
"""
Within a context, modify jobcontrol logger to log messages to
a particular file.
:param filename: name of file to log jobcontrol launch messages to
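
    Example (as used in TestJCJob.doCommand)::

        with add_jobcontrol_handler(STDOUT_FILENAME):
            job.doCommand()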
"""
handler = logging.FileHandler(filename)
original_level = jobcontrol.logger.getEffectiveLevel()
jobcontrol.logger.setLevel(logging.INFO)
jobcontrol.logger.addHandler(handler)
try:
yield
finally:
jobcontrol.logger.removeHandler(handler)
jobcontrol.logger.setLevel(original_level)
handler.close()