"""
Contains `TestJCJob`, `TestSPJob`, `TestQueue`, and `Runner` classes.
`TestJCJob` is a test utility-specific subclass of queue.JobControlJob.
`TestSPJob` is a test utility-specific subclass of queue.SubprocessJob.  These
classes allow the test utility to track some information that we care about as
a job executes (for instance job duration and test_id).  `TestQueue` is a
subclass of queue.JobDJ, and allows further control on reporting.
`Runner` controls all job running parameters.  It is also responsible for
actually running the jobs and requesting their workups.  The meat is in
`Runner.__call__`.
"""
import contextlib
import datetime
import functools
import logging
import os
import platform
import shlex
import sys
import textwrap
import time
import traceback
from typing import Callable
from typing import TYPE_CHECKING
from typing import Dict
from typing import List
from typing import Optional
from typing import Tuple
from typing import Union
import psutil
from schrodinger.job import jobcontrol
from schrodinger.job import queue
from schrodinger.utils import log
from schrodinger.utils import subprocess
from schrodinger.utils.env import EnvironmentContext
from . import common
from . import client
from . import constants
from . import process_management
from . import sysinfo
from . import testscripts
from . import workup
from .joberrors import run_postmortem
if TYPE_CHECKING:
    from . import base_executable  # circular import
logger = common.logger
# Globals
DOWNLOAD_FAILED = 'file download failed'
SCHRODINGER_LICENSE_CHECKOUTS = 'SCHRODINGER_LICENSE_CHECKOUTS'
SCHRODINGER_STU_TEST_ID = 'SCHRODINGER_STU_TEST_ID'
STDOUT_FILENAME = 'stdout.txt'
[docs]class JobDJError(RuntimeError):
    pass 
# ****************************************************************************
[docs]class TestJob:
    """
    Interface for the TestJCJob and TestSPJob classes
    """
    @property
    def duration(self) -> Optional[int]:
        return self._duration
    @property
    def exit_status(self) -> str:
        """
        Returns an exit status from the list of available Jobcontrol exit
        statuses.
        - For TestJCJobs this comes from job.ExitStatus, which will return
          a Jobcontrol exit status (from Job.pm in MMSHARE_EXEC) if a job
          object (i.e. schrodinger.job.jobcontrol.job) is found. If a job
          object is not found, it could be because the job is not started or
          it failed to launch. These special cases are handled by setting
          the initial exit status to "Job not started" and resetting it to
          "Falied to launch" if the job fails to launch (see
          TestJCJob.docommand).
        - For TestSPJobs a we limit the available exit statuses to "finished",
          "died", and "killed" because there is no job object (i.e.
          schrodinger.job.jobcontrol.job) associated with TestSPJobs. Like
          TestJCJobs the initial exit status is set to "Job not started" and is
          reset it to "Failed to launch" if the job fails to launch (see
          TestSPJob.docommand).
        """
        raise NotImplementedError("Needs to be defined in job specific class")
    @property
    def test_id(self) -> int:
        return self._test_id
    @property
    def canceled_by_timeout(self) -> bool:
        """
        Return True only if the job was cancelled by JobDJ, this mostly due
        to timeout.
        """
        # this variable comes from BaseJob class
        return self.abort_job
[docs]    @contextlib.contextmanager
    def setupTestEnvironment(self, host: str = 'localhost'):
        """
        Set environment variables used during testing.
        SCHRODINGER_LICENSE_CHECKOUTS to a file in the directory: SHARED-2727
        SCHRODINGER_STU_TEST_ID to current test. (SHARED-3352)
        Remove TOPLEVEL_HOST_ARGS for Jaguar. (SHARED-4089)
        """
        # Only set SCHRODINGER_LICENSE_CHECKOUTS if the job runs locally.  For
        # remote jobs, this record of licenses is not guaranteed to be
        # consistent. See SHARED-2727.
        # Also skip license testing if the test is tagged with the skip license
        # testing tag.
        if use_license_checking(self._test):
            # Already in the test execution directory.
            license_check_file = os.path.abspath(constants.LICENSE_CHECK_FILE)
            if os.path.exists(license_check_file):
                os.remove(license_check_file)
        else:
            license_check_file = None
        with EnvironmentContext(SCHRODINGER_STU_TEST_ID, str(self.test_id)):
            with EnvironmentContext(SCHRODINGER_LICENSE_CHECKOUTS,
                                    license_check_file):
                yield 
[docs]    def infoStatus(self, status: str):
        self._jobdj.printStatus(self, status) 
[docs]    def debugStatus(self, status: str):
        status = self._jobdj.formatStatus(self, status)
        queue.logger.debug(status) 
[docs]    def warnStatus(self, status: str):
        status = self._jobdj.formatStatus(self, status)
        queue.logger.warning(status)  
# ****************************************************************************
[docs]class TestJCJob(queue.JobControlJob, TestJob):
    """
    Like a normal JobControlJob, but::
        - Ignore failures to launch.
        - Be aware of scriptID.
        - Easily access job duration and exit status.
    """
[docs]    def __init__(self,
                 command: List[str],
                 command_dir: Optional[str],
                 test_id: Union[int, str] = None,
                 test: testscripts.TestScript = None,
                 timeout: Optional[int] = None,
                 runs_locally: bool = False,
                 **kwargs):
        """
        Overridden to add the test_id.
        :param command: Command to be run.
        :param command_dir: Directory to run in.
        :param test_id: Unique identifier of script.
        :param test: The representation of all test data for the job that is
                being run.
        :param timeout: Duration in seconds after which to kill the job. None
                is never.
        :param runs_locally: Should this job be launched on localhost (never
                remote hosts)?
        """
        self._test_id = test_id
        """The test number in the database (or the directory name in validate
        mode."""
        if not test_id:
            self._test_id = os.path.basename(command_dir)
        # The STU test object
        self._test = test
        # Set initial value to be pulled by exit_status if a job has not been
        # launched
        self._exit_status = constants.JOB_NOT_STARTED
        # Job dies after this period.
        self.timeout = timeout
        self._runs_locally = runs_locally
        super().__init__(command=command,
                         command_dir=command_dir,
                         timeout=timeout,
                         **kwargs)
        self.name = kwargs.get('name', 'STU#%s' % self._test_id)
        self.exceptions_caught = 0 
[docs]    def doCommand(self, host: str = 'localhost', *args, **kwargs):
        """
        Overridden to ignore errors.  Executes the command described by
        self._command.  The parent class has two required arguments, but has
        the call signature `doCommand(*args, **kwargs)`, hence its usage here.
        """
        self.debugStatus('prelaunch')
        try:
            with self.setupTestEnvironment():
                with add_jobcontrol_handler(STDOUT_FILENAME):
                    super().doCommand(host, *args, **kwargs)
        except Exception as err:
            self.state = queue.JobState.FAILED
            queue.logger.exception(" Jobcontrol error message: %s" % err)
            with open(STDOUT_FILENAME, 'a') as stdout:
                stdout.write(f'{err}\n')
        else:
            with open(STDOUT_FILENAME, 'a') as stdout:
                if self.launch_error:
                    stdout.write(f'{self.launch_error}\n')
        self._exit_status = None
        self.host = host 
[docs]    def runsLocally(self) -> bool:
        "Force the test to be run on the host on which the JobDJ is running."
        return self._runs_locally 
    @property
    def duration(self) -> Optional[int]:
        """
        Duration of the job, according to the job record. Implemented as a
        property to provide consistent interface with `TestSPJob`. Also gives
        duration for RUNNING jobs, so as to be consistent with `TestSPJob`.
        """
        job = self.getJob()
        # Still running
        if job and job.StartTime and not job.StopTime:
            current = time.time()
            start = time.mktime(
                time.strptime(job.StartTime, jobcontrol.timestamp_format))
            return current - start
        # Not started or complete: returns None or the duration value.
        return self.getDuration()
    @property
    def exit_status(self) -> str:
        """
        Exit status of the job, according to the job record. Implemented as a
        property to provide consistent interface with `TestSPJob`.
        """
        # Not simply returning self._getState() because we want a consistent
        # exit status across jobcontrol and non jobcontrol jobs.
        job = self.getJob()
        if job and not self._exit_status:
            try:
                return job.ExitStatus
            except RuntimeError as err:
                if 'stranded' in str(err):
                    return 'stranded'
                if 'exited' in str(err):
                    return 'exited'
                raise
        else:
            return self._exit_status
    @exit_status.setter
    def exit_status(self, value: str):
        self._exit_status = value 
# ***************************************************************************
[docs]class TestSPJob(queue.SubprocessJob, TestJob):
    """
    Like a normal SubprocessJob job, but:
    - Ignore failures to launch.
    - Kill subjobs when killing this job.
    - Be aware of scriptID.
    - Access job duration and status.
    """
[docs]    def __init__(self,
                 command: List[str],
                 command_dir: Optional[str] = None,
                 test_id: Union[int, str] = None,
                 test: testscripts.TestScript = None,
                 timeout: Optional[int] = None,
                 **kwargs):
        """
        Overridden to add the test_id.
        :param command: Command to be run.
        :param command_dir: Directory to run in.
        :param test_id: Unique identifier of script.
        :param test: The representation of all test data for the job that is
                being run.
        :param timeout: Duration in seconds after which to kill the job. None
                is never.
        """
        self._test_id = test_id
        """The test number in the database (or the directory name in validate
        mode."""
        if not test_id:
            self._test_id = os.path.basename(command_dir)
        # The STU test object
        self._test = test
        # Set initial value to be pulled by exit_status if a job has not been
        # launched
        self._exit_status = constants.JOB_NOT_STARTED
        super().__init__(command=command,
                         command_dir=command_dir,
                         timeout=timeout,
                         **kwargs)
        self.name = kwargs.get('name', 'STU#%s' % self._test_id) 
[docs]    def preCommand(self, *args, **kwargs):
        """
        Overridden to open standard files for recording standard error and
        standard out.  Also marks the start time of the job.
        """
        super().preCommand(*args, **kwargs)
        # Do this AFTER super, so that the files are opened in the correct
        # directory
        self._stdout = open(STDOUT_FILENAME, 'w')
        self._stderr = subprocess.STDOUT
        self.addFinalizer(TestSPJob.cleanUp)
        self._start = time.time() 
[docs]    def doCommand(self, *args, **kwargs):
        """
        Overridden to ignore errors.  Executes the command described by
        self._command.
        """
        self.debugStatus('prelaunch')
        try:
            with self.setupTestEnvironment():
                super().doCommand(*args, **kwargs)
        except Exception as err:
            self.state = queue.JobState.FAILED
            self._exit_status = constants.FAILED_TO_LAUNCH
            queue.logger.exception(" Subprocess error message: %s" % err) 
[docs]    def update(self):
        """
        Overridden to make errors non fatal.
        """
        try:
            super().update()
        except Exception as err:
            self.state = queue.JobState.FAILED
            queue.logger.exception(" Subprocess error message: %s" % err) 
[docs]    def cleanUp(self):
        """
        Close standard files for recording standard error and standard out.
        """
        # Force data to be moved from OS buffer to disk. Required on Windows.
        if not self._stdout.closed:
            self._stdout.flush()
            os.fsync(self._stdout.fileno())
            self._stdout.close() 
[docs]    def getStatusStrings(self) -> Tuple[str, str, str]:
        """
        Return a tuple of status strings for printing by `JobDJ`.
        :return: (status, jobid, host)
        """
        if self.hasExited():
            status_string = self.exit_status
        else:
            status_string = "unknown"
        jobid = '  [none]'
        host = platform.node()
        return status_string, jobid, host 
    @property
    def exit_status(self) -> str:
        """
        Exit status of the job, according to subprocess. Implemented as a
        property to provide consistent interface with `TestJCJob`.
        """
        if self.state == queue.JobState.DONE:
            if self.abort_job:
                return 'killed'
            return queue.FINISHED
        elif self.state in (queue.JobState.FAILED,
                            queue.JobState.FAILED_RETRYABLE):
            if self._exit_status == constants.FAILED_TO_LAUNCH:
                return self._exit_status
            else:
                return 'died'
        else:
            return self._exit_status
[docs]    def kill(self):
        """First, kill children, then myself"""
        logger.warning(f" Killing {self.test_id}")
        try:
            test_process = psutil.Process(self._subprocess.pid)
            logger.warning("status of {self.test_id}: "
                           " ".join(test_process.cmdline()))
        except (psutil.NoSuchProcess, psutil.AccessDenied):
            # Don't worry about processes that have already ended.
            # Windows: NoSuchProcess, Mac AccessDenied
            self.cleanUp()
            if self.state == queue.JobState.ACTIVE:
                self.state = queue.JobState.DONE
            return
        status = process_management.format_process_status(test_process)
        logger.warning(status)
        self._stdout.write("process status at kill point:\n" + status)
        self._stdout.flush()
        messages = process_management.kill_launched_jobcontrol_jobs(
            self.test_id)
        if messages:
            logger.warning(messages)
            self._stdout.write(messages)
            self._stdout.flush()
        process_management.kill_process_children(test_process)
        super().kill()
        try:
            if test_process.is_running():
                test_process.kill()
        except (psutil.NoSuchProcess, psutil.AccessDenied):
            # Windows: Don't worry about processes that have already ended.
            pass
        self.cleanUp()  
[docs]class TestQueue(queue.JobDJ):
    """
    Like a normal JobDJ, but:
    - Print the script ID at status points
    - Run workups
    """
[docs]    def __init__(self,
                 hosts: Optional[List[Tuple[str, int]]] = None,
                 verbosity: str = 'quiet',
                 timeout: Optional[int] = None):
        update_delay = min(5, timeout or 5)
        super().__init__(hosts=hosts,
                         verbosity=verbosity,
                         max_retries=0,
                         max_failures=queue.NOLIMIT,
                         update_delay=update_delay,
                         job_class=TestJCJob)
        self.disableSmartDistribution()
        self.timeout = timeout 
[docs]    def printStatus(self,
                    job: Optional[TestJob] = None,
                    action: Optional[str] = None):
        if job is None:
            self.printHeader()
            return
        status = self.formatStatus(job, action)
        if action in ('launched', 'started'):
            queue.logger.warning(status)
        else:
            queue.logger.info(status) 
    def _setLoggerVerbosity(self):
        if self._verbosity == "quiet":
            queue.logger.setLevel(log.WARNING)
        elif self._verbosity == "normal":
            queue.logger.setLevel(log.INFO)
        elif self._verbosity == "verbose":
            queue.logger.setLevel(log.DEBUG)
        elif self._verbosity == "silent":
            queue.logger.setLevel(log.CRITICAL)
        else:
            self._verbosity = "quiet"
            queue.logger.setLevel(log.WARNING)
    def _start(self, **kwargs):
        try:
            self._original_verbosity = self._verbosity
            self._verbosity = "quiet"
            super()._start(**kwargs)
        finally:
            self._verbosity = self._original_verbosity
            self._setLoggerVerbosity()
[docs]    def addJob(self,
               job: TestJob,
               add_connected: bool = True,
               timeout: Optional[int] = None,
               **kwargs):
        if timeout is None:
            timeout = self.timeout
        if isinstance(job, TestJob):
            job.timeout = self.timeout
        return super().addJob(job, add_connected, timeout=timeout, **kwargs) 
    def _jobFailed(self, job: TestJob):
        super()._jobFailed(job)
        try:
            job.cleanUp()
        except AttributeError:
            pass 
# ****************************************************************************
[docs]class Runner:
    """
    Runner controls all job running parameters within the backend test utility
    code.  It is also responsible for actually running the jobs and requesting
    their workups.  The meat is in `Runner.addScript` and `Runner.__call__`.
    """
    # ************************************************************************
[docs]    def __init__(self, ui: "base_executable.TestUtility"):
        """
        Initialize Runner Class
        :param ui: Contains information about the user interface (i.e. the
                command line arguments)
        """
        self.additionalArgs = ui.additionalArgs
        self.verbosity = ui.verbosity
        self.ncpu = ui.ncpu
        self.postmortem = getattr(ui, 'postmortem', False)
        self.timeout = getattr(ui, 'timeout', None)
        self.reporter = getattr(ui, 'reporter', None)
        # keys off test_id, values are testscripts.TestScript
        self.tests = {}
        self.job_runner = TestQueue(verbosity=ui.verbosity,
                                    timeout=self.timeout)
        self.job_runner.disableSmartDistribution()
        self.xvfb_cmd = None 
    # ************************************************************************
[docs]    def addScript(self, test: testscripts.TestScript):
        """
        Add test to be executed by self.__call__.  Adds the test
        information to self.job_runner.
        :param test: Test to be executed.
        """
        test_id = test.id
        try:
            # Should replace this with the attribute of the test, but be
            # careful with validate mode.
            test_id = int(test_id)
        except ValueError:
            test_id = test_id or test.directory
        # This protects against tests that do not have an automate command
        cmd = test.command
        if not cmd:
            logger.warning('Test "%s" has no command, skipping' % test_id)
            return
        if not test.directory and not test_id:
            raise TypeError("I don't know where to run this job (no test "
                            "directory or test id")
        self.tests[test_id] = test
        jobdir = test.directory or str(test_id)
        jobdir = os.path.join(os.getcwd(), jobdir)
        # Add additional args to the cmd
        cmd += ' ' + self.additionalArgs
        # use shlex for shell like parsing
        # makes for standard space/quote protection
        cmd_list = []  # initial list for prepped commands to be added to
        # make tests that require display to run with xvfb on Linux
        if "require:display" in test.tags and sys.platform.startswith(
                "linux") and 'DISPLAY' not in os.environ:
            # Determine the appropriate xvfb invocation (see get_xvfb_cmd for
            # an explanation) and cache it on first use because the version of
            # xvfb is unlikely to change between commands.
            if not self.xvfb_cmd:
                self.xvfb_cmd = get_xvfb_cmd()
            cmd_list = self.xvfb_cmd + cmd_list
        try:
            for i_cmd in shlex.split(cmd):
                # Replace keywords in cmd with run specific values
                i_cmd = i_cmd.replace("${NCPU}", str(self.ncpu))
                i_cmd = i_cmd.replace("${HOST}", sysinfo.REMOTE.host)
                i_cmd = i_cmd.replace("${CWD}", jobdir)
                i_cmd = i_cmd.replace("${SHARED}",
                                      os.path.join(os.getcwd(), "shared"))
                i_cmd = i_cmd.replace('$SCHRODINGER', os.getenv('SCHRODINGER'))
                i_cmd = i_cmd.replace('${SCHRODINGER}',
                                      os.getenv('SCHRODINGER'))
                # Add element of cmd to list which will be passed to JC or SP
                cmd_list.append(i_cmd)
        except Exception:
            logger.exception(
                f"WARNING: Failed to parse command string for {test_id}")
        # amend the command line for jobs that need DRIVERHOST to be specified
        # (this will only be done when a remote host is specified)
        if "require:driverhost" in test.tags:
            cmd_list.append("-DRIVERHOST")
            cmd_list.append(sysinfo.REMOTE.host)
        # Need ${SLASH} keyword to workaround QA-618
        cmd_list = [c.replace("${SLASH}", "/") for c in cmd_list]
        if test.useJC():
            runs_locally = not test.runsRemotely()
            jobdict = dict(command_dir=jobdir,
                           test_id=test_id,
                           test=test,
                           runs_locally=runs_locally)
            if "${NCPU}" not in cmd:
                jobdict['procs'] = self.ncpu
            job = TestJCJob(cmd_list, **jobdict)
        else:
            job = TestSPJob(cmd_list,
                            command_dir=jobdir,
                            test_id=test_id,
                            test=test)
        self.job_runner.addJob(job)
        return job 
    # ************************************************************************
    def __call__(self) -> bool:
        """
        Execute and run workups on all tests added to instance of Runner
        object.  Outcomes stored in the TestScript objects in self.tests.
        This needs to be separate from the extraction step so that we can
        validate new tests.
        """
        if not self.job_runner.total_added:
            return True
        logger.debug("Executing tests.")
        self.problems = []
        workups = workup.discover_workups()
        try:
            self.job_runner.run(status_change_callback=functools.partial(
                self.jobStatusChange, workups))
        except Exception as err:
            msg = 'ERROR: Failure in executing jobs, Killing jobs: %s' % err
            logger.exception(msg)
            self.job_runner.killJobs()
            raise JobDJError(msg)
        if self.problems:
            msg = ('ERROR: Critical problems during test execution:\n  %s' %
                   '\n  '.join(self.problems))
            raise JobDJError(msg)
        return True
[docs]    def jobStatusChange(self, workups: Dict[str, Callable], job: TestJob):
        if job.state in {
                queue.JobState.FAILED, queue.JobState.FAILED_RETRYABLE
        }:
            if job.launch_error:
                job._exit_status = constants.FAILED_TO_LAUNCH
                with open(os.path.join(job.getCommandDir(), STDOUT_FILENAME),
                          'a') as stdout:
                    stdout.write(f'{job.launch_error}\n')
        elif job.state == queue.JobState.ACTIVE:
            job._exit_status = None
        if not job.hasExited() or self.tests[job.test_id].outcome is not None:
            return
        test = self.tests[job.test_id]
        _update_test_script_on_job_completion(test, job)
        self.problems.extend(
            run_workup(test, job, workups, self.postmortem, self.reporter))  
def _update_test_script_on_job_completion(test: testscripts.TestScript,
                                          job: TestJob):
    """
    After running a job, the test object needs to be updated to include
    additional information.
    """
    # Timing is -1 to distinguish between jobs that round to zero time, and
    # this value is serialized to DB as an integer
    if job.duration is None:
        test.timing = -1
    else:
        test.timing = job.duration
    test.exit_status = job.exit_status
[docs]def run_workup(test: testscripts.TestScript, job: TestJob,
               workups: Dict[str, Callable], generate_postmortem: bool,
               reporter: Optional[client.ResultReporter]) -> List[str]:
    """
    Run workup code and report results.
    If an upload failed or an unexpected error occurred (a workup failure is
    considered expected, since it can be reporter upon), then return a list
    of str error messages.
    """
    problems = []
    # Run workup and record results
    try:
        job.debugStatus('start workup')
        test.runWorkup(job, registered_workups=workups)
        if test.outcome:
            job.infoStatus('workup succeeded')
        else:
            job.warnStatus('workup failed')
    # This should be protected in workup_outcome, instead.
    except Exception as err:
        msg = 'Failure in workup: %s' % err
        logger.exception(msg)
        job.infoStatus('FAILURE EXECUTING WORKUP')
        test.outcome = False
        if not test.workup_messages:
            test.workup_messages = msg
    finally:
        jc_exit_status = constants.JC_outcome_codes.get(test.exit_status, False)
        # If the exit status was unexpected, run a postmortem.
        # Also, all KNIME tests with a failing workup should
        # include a postmortem.
        if generate_postmortem and not test.outcome:
            if not jc_exit_status or test.product == 'KNIME':
                try:
                    run_postmortem(job, test.product)
                except Exception:
                    job.warnStatus("failed to run postmortem")
                    msg = f"Reporting failed for test {job.test_id}"
                    logger.exception(msg)
                    problems.append(f"{msg}: {traceback.format_exc()}")
        if reporter:
            try:
                reporter.report(test)
                job.debugStatus('reported result')
            except Exception as err:
                job.warnStatus('failed to report result')
                msg = 'Reporting failed for test %s' % job.test_id
                logger.exception(msg)
                print(msg + ': ' + str(err))
                problems.append(msg + ': ' + str(err))
    return problems 
[docs]def get_xvfb_cmd() -> List[str]:
    """
    xvfb-run needs the -a (auto server number) option, except on CentOS 7 (and
    possibly other OSes) where that option is superceded by the -d (auto
    display) option. As a bonus, the long option --auto-display is shown in the
    help on CentOS 7 but does not actually work. Furthermore, xvfb-run does not
    have a --version flag which could be used to reason about supported options.
    To determine what flags are supported, try to run a no-op command with
    `xvfb-run -d`, and if that fails, use -a.
    """
    xvfb_cmd = ['xvfb-run', '-d']
    try:
        subprocess.run(
            xvfb_cmd + [':'],
            check=True,
            stdout=subprocess.DEVNULL,
            stderr=subprocess.DEVNULL,
        )
    except subprocess.CalledProcessError:
        xvfb_cmd = ['xvfb-run', '-a']
    return xvfb_cmd 
[docs]def use_license_checking(test: testscripts.TestScript) -> bool:
    """
    Return True if license checking should be enabled for this test run.
    """
    # in production test is not None, but is sometimes None in unittest
    if not test:
        return True
    elif constants.SKIP_LICENSE_TAG in test.tags:
        return False
    elif "require:specific_host" in test.tags:
        return False
    elif sysinfo.REMOTE.name != sysinfo.LOCAL.name:
        return False
    return True 
[docs]@contextlib.contextmanager
def add_jobcontrol_handler(filename: str):
    """
    Within a context, modify jobcontrol logger to log messages to
    a particular file.
    :param filename: name of file to log jobcontrol launch messages to
    """
    handler = logging.FileHandler(filename)
    original_level = jobcontrol.logger.getEffectiveLevel()
    jobcontrol.logger.setLevel(logging.INFO)
    jobcontrol.logger.addHandler(handler)
    try:
        yield
    finally:
        jobcontrol.logger.removeHandler(handler)
        jobcontrol.logger.setLevel(original_level)
        handler.close()