"""
Fixtures that are encoded into pytest and run as part of each test. These are
never explicitly called and only used by test machinery.
"""
import os
import sys
import threading
import traceback
from unittest import mock
import _pytest.monkeypatch
import pytest
from schrodinger import get_maestro
from schrodinger.infra import jobhub
from schrodinger.infra import mm
from schrodinger.Qt import QtCore
from schrodinger.tasks import jobtasks
from schrodinger.tasks import tasks
from schrodinger.utils.sysinfo import is_display_present
from schrodinger.utils import mmutil
from schrodinger.utils import qapplication
from schrodinger.utils import qt_utils
from . import reporter
from .startup import SchrodingerIniOptions
ORIGINAL_DIR = os.getcwd()
_QAPP = None
[docs]@pytest.fixture(autouse=True)
def patch_os_environ():
"""
Some tests modify os.environ. We don't want to allow that to cause test-test
interactions
"""
with mock.patch.dict(os.environ):
yield
[docs]@pytest.fixture(autouse=True)
def ensure_maestro_idempotent():
orig_setting = get_maestro()
yield
if orig_setting != get_maestro():
raise RuntimeError("This test modified the behavior of get_maestro() "
"function without restoring it, original setting "
f"was {orig_setting}")
[docs]@pytest.fixture(autouse=True)
def ensure_JOB_SERVER_idempotent():
orig_setting = mmutil.feature_flag_is_enabled(mmutil.JOB_SERVER)
yield
if orig_setting != mmutil.feature_flag_is_enabled(mmutil.JOB_SERVER):
raise RuntimeError("This test modified the value of the JOB_SERVER "
"featureflag without restoring it, original setting "
f"was {orig_setting}")
[docs]@pytest.fixture(autouse=True)
def ensure_LAST_EXCEPTION_cleared():
yield
assert qt_utils.LAST_EXCEPTION is None
[docs]@pytest.fixture(autouse=True)
def disable_multiprocessing(monkeypatch):
"""
Disable multiprocessing.Pool in all tests. This was causing memory issues
in certain cases. Any test that tries to access multiprocessing.Pool will
get an AttributeError when run.
"""
def raise_multiprocessing_error(*args, **kwargs):
msg = 'multiprocessing.Pool is disabled for all python tests. See '\
'SHARED-5429 for full details.'
raise AttributeError(msg)
monkeypatch.setattr('multiprocessing.Pool', raise_multiprocessing_error)
[docs]@pytest.fixture(autouse=True)
def assert_mmffld_terminated(request):
"""
For any future instantiations of different force fields (that is, with
different custom default or custom parameters), mmffld is required to be
fully terminated. The fixture here confirms that the global mmffld ref
count has been brought back down to 0 at the end of each test.
"""
try:
yield
finally:
if mm.mmffld_refcount():
# terminate so other tests have clean slate
while mm.mmffld_refcount():
mm.mmffld_terminate()
assert False, "mmffld not properly terminated after test"
[docs]@pytest.fixture(scope='session', autouse=True)
def disable_show():
"""
Change show() to a no-op for common top-level windows to reduce number of
widgets that appear while running tests.
"""
if not is_display_present():
yield
return
monkeypatch = _pytest.monkeypatch.MonkeyPatch()
for patch_path in (
'schrodinger.ui.qt.appframework2.baseapp.BasePanel.show',
'schrodinger.ui.qt.basewidgets.Panel.show',
'schrodinger.Qt.QtWidgets.QDockWidget.show',
'schrodinger.Qt.QtWidgets.QDialog.show',
'schrodinger.Qt.QtWidgets.QMainWindow.show',
):
monkeypatch.setattr(patch_path, lambda _: None)
yield
monkeypatch.undo()
[docs]@pytest.fixture(scope='session', autouse=True)
def disable_qmessagebox():
"""
Change QMessageBox to raise RuntimeError so they won't hang the
test harness opaquely.
"""
if not is_display_present():
yield
return
monkeypatch = _pytest.monkeypatch.MonkeyPatch()
def raise_qmessagebox_error(parent, title, msg, *args, **kwargs):
raise RuntimeError(f'QMessageBox raised {title}:{msg}')
def raise_messagebox_error(obj):
title = obj._title
if sys.platform.startswith("darwin"):
msg = obj.informativeText()
else:
msg = obj.text()
raise RuntimeError(f'MessageBox raised {title}:{msg}')
def raise_filedialog_error(obj):
raise RuntimeError('FileDialog shown')
def raise_exec_error(obj):
class_name = type(obj).__name__
raise RuntimeError(f"{class_name} exec'd: {obj.text()}")
def raise_dialog_exec_error(obj):
class_name = type(obj).__name__
raise RuntimeError(f"{class_name} exec'd")
monkeypatch.setattr('schrodinger.Qt.QtWidgets.QMessageBox.critical',
raise_qmessagebox_error)
monkeypatch.setattr('schrodinger.Qt.QtWidgets.QMessageBox.information',
raise_qmessagebox_error)
monkeypatch.setattr('schrodinger.Qt.QtWidgets.QMessageBox.question',
raise_qmessagebox_error)
monkeypatch.setattr('schrodinger.Qt.QtWidgets.QMessageBox.warning',
raise_qmessagebox_error)
monkeypatch.setattr('schrodinger.Qt.QtWidgets.QMessageBox.exec',
raise_exec_error)
monkeypatch.setattr('schrodinger.Qt.QtWidgets.QDialog.exec',
raise_dialog_exec_error)
monkeypatch.setattr(
'schrodinger.ui.qt.messagebox.MessageBox._pytest_abort_hook',
raise_messagebox_error)
monkeypatch.setattr(
'schrodinger.ui.qt.filedialog.FileDialog._pytest_abort_hook',
raise_filedialog_error)
yield
monkeypatch.undo()
[docs]@pytest.fixture(scope='session', autouse=True)
def prevent_jobdirdlg_jobhub_connection():
"""
Prevent jobdirdlg from connecting to the jobhub callback. Connecting to this
callback in multiple unit tests causes issues on Windows builds due to the
short periodicity of the callback.
"""
if not is_display_present():
# Avoid performing the mock when there are no graphics libraries. In
# that case, the module won't ever (and can't) be imported anyway.
yield
return
with mock.patch('schrodinger.application.matsci.jobdirdlg.JobManager.'
'connectToJobHub'):
yield
[docs]@pytest.fixture(scope='session', autouse=True)
def use_mmpref_sandbox(tmpdir_factory):
"""
Sets preferences (backed by QSettings) to a unique file. Allows
parallel xdist tests to not stomp on each other.
"""
pref_dir = tmpdir_factory.mktemp("pref")
os.environ["SCHRODINGER_MMPREF_SANDBOX"] = str(pref_dir)
# Force af2 to always reread this value
if is_display_present():
import schrodinger.ui.qt.appframework2.settings
schrodinger.ui.qt.appframework2.settings._preference_handler = None
def _get_unittest_task_wait_time():
"""
Get the default timeout time for tests
We define this in a method so we can mock and test it.
"""
return 60 * 15 # 15 minutes
[docs]@pytest.fixture(scope='session', autouse=True)
def task_wait_with_default_timeout():
"""
Set a default timeout when waiting on tasks in tests. If the default
timeout is hit, then raise an error.
"""
original_wait = tasks._wait
def mock_wait(task, timeout=None):
raise_on_timeout = timeout is None
if timeout is None:
timeout = _get_unittest_task_wait_time()
task_completed = original_wait(task, timeout)
if not task_completed and jobtasks.is_jobtask(task):
task.stop()
if raise_on_timeout and not task_completed:
raise tasks._TaskTestTimeout(
"Waited-for task did not complete in unit test.")
return task_completed
with mock.patch.object(tasks, '_wait', mock_wait):
yield
[docs]@pytest.fixture(scope='session', autouse=True)
def task_cleanup_tmpdirs():
"""
Clean up all tempdirs created by tasks.
"""
yield
for tmpdir in tasks.AbstractTask._all_task_tempdirs:
tmpdir.cleanup()
[docs]@pytest.fixture(autouse=True)
def fail_on_running_tasks():
"""
Global fixture that checks to make sure there are no tasks still running
after each unit test completes. If there are, this fixture will attempt to
kill/stop the tasks before raising an exception.
For tests that need to mock task.run, see the fixture `mock_task_helper`.
:raises RuntimeError: If any tasks are running at the end of the unittest
"""
started_tasks = []
real_start = tasks.AbstractTask.start
def store_task_reference(self, *args, **kwargs):
ret = real_start(self, *args, **kwargs)
if self.status is self.RUNNING:
started_tasks.append(self)
return ret
with mock.patch("schrodinger.tasks.tasks.AbstractTask.start",
store_task_reference):
yield
leaked_tasks = []
for task in started_tasks:
try:
if task.status is task.RUNNING:
leaked_tasks.append(task)
if jobtasks.is_jobtask(task):
task.stop()
else:
task.kill()
except Exception:
# We let any exception go because the top priority is attempting to
# kill/stop all tasks
pass
if leaked_tasks:
raise RuntimeError(
f"Leaked {len(leaked_tasks)} tasks: {leaked_tasks[0]}...")
[docs]@pytest.fixture(scope='session', autouse=True)
def disallow_mock_in_swig(request):
if request.config.getini(SchrodingerIniOptions.DISALLOW_MOCK_IN_SWIG.value):
from schrodinger.infra import mmcheck
with mock.patch.object(mmcheck, 'should_check_for_mock', True):
yield
else:
yield
[docs]@pytest.fixture(scope='session', autouse=True)
def delete_job_manager(request):
try:
yield
finally:
if not sys.platform.startswith("darwin"):
jobhub.delete_job_manager()
[docs]@pytest.fixture
def force_no_run_in_dir(pytestconfig):
"""
Change back to the original working directory. The cwd is modified by
the stupid run-in-dir option to our pytest for mmshare code (to be removed
in PYTHON-3408), but this fixture this overrides that.
"""
if pytestconfig.getvalue('run_in_dir'):
test_dir = os.getcwd()
os.chdir(ORIGINAL_DIR)
yield
os.chdir(test_dir)
else:
yield
[docs]def check_current_thread_is_main_thread():
"""
Raise an exception if the current thread is not the main thread
"""
main_thread = threading.main_thread().ident
current_thread = threading.get_ident()
if main_thread != current_thread:
raise RuntimeError(f"{main_thread=} is not {current_thread=} "
"which will result in crashes on macOS")
[docs]def start_qapp(config):
"""Start a qapp in this process."""
# This must be started before any test is run.
# If it is not, the qapp will only be triggered once py.test gets to the
# first test that needs it. If this happens to be after the jobcontrol
# tests, then a QCoreApplication(non-GUI) will exist, and all GUI tests
# will crash
#
# It must also be run outside the fixture mechanism, because fixtures are
# run after unittest.TestCase.setUpClass
global _QAPP
_QAPP = QtCore.QCoreApplication.instance()
if not _QAPP:
if sys.platform == 'darwin':
check_current_thread_is_main_thread()
use_qtcore_app = config.option.no_display or not is_display_present()
_QAPP = qapplication.get_application(use_qtcore_app=use_qtcore_app)
def mock_exit():
raise RuntimeError('Must not exec/quit application in unit tests')
_QAPP.exit = mock_exit
_QAPP.quit = mock_exit
_QAPP.exec = mock_exit
# A repo where none of the tests will use a Python job manager can override
# `pytest_configure` to set `config.ignore_job_manager = True`
if not getattr(config, 'ignore_job_manager', False):
# job manager requires a qapplication to be created,
# we want to be first in and create quick(er) polling
# for a test
jobhub.setup_job_manager(200) # millisecond polling
return _QAPP
[docs]def runtest_setup(item):
"""
Execute each test in the directory where the test file lives.
Capture exceptions that occur outside of the main thread (for instance
in event loops).
"""
monkeypatch = _pytest.monkeypatch.MonkeyPatch()
monkeypatch.setattr(QtCore, 'QTimer', Timer)
setattr(item, "_monkeypatch", monkeypatch)
if item.config.getvalue('run_in_dir'):
test_directory = os.path.dirname(str(item.fspath))
os.chdir(test_directory)
# It is not necessary to switch back to the original directory,
# because each test changes directory before execution.
# Reset any leaky LAST_EXCEPTION status between test items, so all tests
# do not fail. If this is line is needed, as in previously
# ensure_LAST_EXCEPTION_cleared failed, it means future test results might
# be contaminated.
qt_utils.LAST_EXCEPTION = None
# Start a qapp before any other fixtures or setup. In particular, this
# can't be a fixture because fixtures are run after
# unittest.TestCase.setUpClass
start_qapp(item.config)
# Capture event loop exceptions
setattr(item, 'exceptions_found', [])
def exception_recorder(typ, value, tb):
# print traceback in case we are running without buffered output, so
# we see this exception
traceback.print_exception(typ, value, tb)
item.exceptions_found.append((typ, value, tb))
setattr(item, 'original_excepthook', sys.excepthook)
sys.excepthook = exception_recorder
ALL_TIMERS = []
[docs]class Timer(QtCore.QTimer):
"""
Subclass of QTimer that stores all timers so they can be stopped at the end
of each test
"""
[docs] @classmethod
def singleShot(cls, msec, slot):
timer = cls()
timer._started_from_singleShot_classmethod = True
timer.setSingleShot(True)
if msec < 2000:
# Reproduce native behavior
timer.setTimerType(QtCore.Qt.TimerType.PreciseTimer)
timer.setInterval(msec)
timer.timeout.connect(slot)
timer.start()
[docs] @QtCore.pyqtSlot(int)
@QtCore.pyqtSlot()
def start(self, *args, **kwargs):
ALL_TIMERS.append(self)
super().start(*args, **kwargs)
[docs]def stop_all_timers():
"""
Stop all running `Timer` objects
"""
while ALL_TIMERS:
timer = ALL_TIMERS.pop()
try:
if not timer.isActive():
continue
timer.stop()
if getattr(timer, '_started_from_singleShot_classmethod', False):
singleshot = 'QTimer.singleShot '
else:
singleshot = ''
print(f'Stopped {singleshot}timer of interval {timer.interval()}',
file=sys.stderr)
except Exception:
traceback.print_exc()
[docs]def runtest_teardown(item, nextitem):
stop_all_timers()
if _QAPP:
_QAPP.processEvents()
stop_all_timers()
reporter.log_exceptions(item)
monkeypatch = getattr(item, '_monkeypatch', None)
if monkeypatch is not None:
monkeypatch.undo()