"""
A dialog for obtaining a job directory of a previously submitted job
Copyright Schrodinger, LLC. All rights reserved.
"""
import os.path
import time
from schrodinger.application.matsci import jobutils
from schrodinger.application.matsci import mswidgets
from schrodinger.infra import jobhub
from schrodinger.Qt import QtCore
from schrodinger.Qt import QtGui
from schrodinger.Qt import QtWidgets
from schrodinger.Qt.QtCore import Qt
from schrodinger.ui.qt import filedialog
from schrodinger.ui.qt import swidgets
from schrodinger.ui.qt import utils as qtutils
from schrodinger.utils import fileutils
[docs]def start_job_manager():
"""
Start the job manager so that it begins an asynchronous update of the job
list in the background. It is best to do this as soon as possible in a new
process because obtaining the job list may take some time after the first
access of the jobhub job manager.
In Maestro, this call doesn't have any affect because Maestro starts the job
manager at startup. But this call does have an effect for panels run from
the command line.
This call cannot be done on import of this module because a QApplication
needs to be started first, which often isn't done at the point this module
is imported.
"""
JobManager.getManager()
[docs]class JobManager(QtCore.QObject):
""" A class for managing job information and alerts when new jobs start """
_singleton = None
TIME_FORMAT = '%a %b %d %I:%M:%S %p'
UPDATED_TEXT = 'Last updated: {utime}'
NEW_DATA_TEXT = '<font color=green>New jobs available</font>'
UPDATING_TEXT = 'Updating job information...'
new_data_available = QtCore.pyqtSignal()
[docs] def __init__(self):
""" Create a JobManager instance """
super().__init__()
self.jobhub_manager = jobhub.get_job_manager()
# Check to see if the job manager knows about jobs yet
# status_text has two states:
# UPDATING_TEXT: If no job information is available yet
# UPDATED_TEXT: The last time new job information was updated
jobs = self.getAllJobs()
if jobs:
self.update_time = self.getTime()
self.status_text = self.UPDATED_TEXT.format(utime=self.update_time)
else:
self.update_time = None
self.status_text = self.UPDATING_TEXT
self.prev_num_jobs = len(jobs)
[docs] def connectToJobHub(self):
""" Connect to the job hub manager's periodic callback """
# This connection is done in a method rather than directly so that it is
# easy to mock in a session fixture. We must prevent actually connecting
# to this signal in unit tests otherwise the number of tests that use
# this combined with the short periodicity of the callback causes
# problems with Windows builds
#
# The session fixture (curretly in
# schrodinger.test.pytest.sessionfixture) mocks this method out
# automatically for the entire test framework so that # unit tests don't
# have to do it individually for every panel that uses this class.
self.jobhub_manager.jobsChanged.connect(self.updateStatus)
[docs] @staticmethod
def getManager(*args, **kwargs):
"""
Get JobManager singleton
Calling this method the first time starts the jobhub manager loading job
information
"""
if not JobManager._singleton:
JobManager._singleton = JobManager(*args, **kwargs)
return JobManager._singleton
[docs] def getTime(self):
"""
Get the current time in user-format
:rtype: str
:return: The current time
"""
return time.strftime(self.TIME_FORMAT)
[docs] def updateStatus(self, jobids):
"""
Callback for the jobhub jobsChanged signal. This gets called
periodically whether any job changed or not.
:param list jobids: The job ids for jobs changing state
"""
if not jobids:
# This function is called as a periodic callback (despite the name
# of the calling signal, it is *not* called only when jobs change.
# Do nothing if no jobids have changed.
return
# Check if there are more jobs available than before
self.update_time = self.getTime()
self.status_text = self.UPDATED_TEXT.format(utime=self.update_time)
num_jobs = len(self.getAllJobs())
if num_jobs > self.prev_num_jobs:
self.new_data_available.emit()
self.prev_num_jobs = num_jobs
[docs] def getAllJobs(self):
"""
Get all jobs in the database
:rtype: view
:return: Each item in the view is a job object for a job in the database
"""
# Note that getJobs can return an empty list of jobs if called early in
# a session before the jobhub manager has updated the jobs list. After
# discussions with the jobcontrol team (Python GChat room 12/7/20) it
# looks like this is just something we'll have to live with.
return self.jobhub_manager.getJobs(jobhub.JobOption.ALL_JOBS).values()
[docs] def getRunningJobIDs(self):
"""
Get all running jobs
:rtype: view
:return: Each item in the view is a job ID for a running job
"""
return self.jobhub_manager.getJobs(jobhub.JobOption.ACTIVE_JOBS).keys()
[docs]class NewJobDirFrame(swidgets.SFrame):
"""
A collection of widgets that reads in the JobDB and puts jobs in a table
that can be selected. It also allows the user to specify a job directory
manually.
"""
[docs] def __init__(self,
master,
layout=None,
dclick_callback=None,
show_subjobs=False):
"""
Create a NewJobDirFrame instance
:type master: QWidget
:param master: The master panel for this frame, should have a warning
method
:type layout: QBoxLayout
:param layout: The layout to place this frame into
:type dclick_callback: callable
:param dclick_callback: The function to call when the user double-clicks
on a table cell. The function is called with 2 arguments,
the first is the cell row, the second is the cell column.
:param bool show_subjobs: If False, only show top-level jobs in the
table. Note that if programs are specified (see setAllowedPrograms),
subjobs of that program type will be shown regardless of this
setting.
"""
self.show_subjobs = show_subjobs
self.manager = JobManager.getManager()
self.manager.new_data_available.connect(self.newDataAlert)
self.master = master
self.programs = set()
swidgets.SFrame.__init__(self, layout=layout)
layout = self.mylayout
self.table_rb = swidgets.SRadioButton('Load from job database',
command=self.loadToggled,
layout=layout,
checked=True)
# Status update
ulayout = swidgets.SHBoxLayout(layout=layout)
self.update_button = swidgets.SPushButton(
'Refresh', layout=ulayout, command=self.loadJobsIntoTable)
self.update_label = swidgets.SLabel(self.manager.status_text,
layout=ulayout)
ulayout.addStretch()
# Job database table
tlayout = swidgets.SHBoxLayout(layout=layout, indent=True)
self.table = QtWidgets.QTableWidget()
table_headers = [
'Job Name', 'Launched', 'Status', 'Program', 'Directory'
]
self.table.setColumnCount(len(table_headers))
self.table.setHorizontalHeaderLabels(table_headers)
self.table.setSelectionBehavior(self.table.SelectRows)
self.table.setSelectionMode(self.table.SingleSelection)
self.table.setSortingEnabled(True)
if dclick_callback:
self.table.cellDoubleClicked.connect(dclick_callback)
self.loadJobsIntoTable()
tlayout.addWidget(self.table)
# Manual directory browsing
self.browse_rb = swidgets.SRadioButton('Manually locate job directory',
command=self.loadToggled,
layout=layout)
blayout = swidgets.SHBoxLayout(layout=layout, indent=True)
self.browse_le = swidgets.SLabeledEdit('Path:',
stretch=False,
layout=blayout)
self.browse_button = swidgets.SPushButton('Browse...',
command=self.browseDirectory,
layout=blayout)
self.loadToggled()
[docs] def newDataAlert(self):
""" Set the text label to alert user there are new jobs available """
self.update_label.setText(self.manager.NEW_DATA_TEXT)
[docs] def reset(self, load_jobs=True):
"""
Reset the entire frame
:type load_jobs: bool
:param load_jobs: Whether the job database should be loaded back into
the table after reset
"""
self.programs = set()
self.resetTable()
self.table_rb.reset()
self.browse_rb.reset()
self.browse_le.reset()
self.loadToggled()
if load_jobs:
self.loadJobsIntoTable()
[docs] def resetTable(self):
"""
Remove all rows from the table
"""
self.table.setRowCount(0)
[docs] def setAllowedPrograms(self, programs):
"""
Set the programs whose jobs are allowed to show up in the table
:type programs: set
:param programs: The strings that show up in the job.Program field for
programs whose jobs should show in the dialog. Use None to show all
active jobs.
"""
self.programs = programs
[docs] def readJobsFromDatabase(self):
"""
Read the jobs from the JobDB database
:rtype: list, list
:return: Two lists. The first contains the completed jobs, the second
currently running jobs. All list items are
`schrodinger.job.jobcontrol.Job` objects and the lists are sorted so
that the newest jobs appear first
"""
# Read in all the jobs in the job database and find the jobs of
# interest - parse them into running and completed
running = []
completed = []
all_jobs = self.manager.getAllJobs()
all_running_job_ids = set(self.manager.getRunningJobIDs())
self.update_label.setText(self.manager.status_text)
if self.programs:
jobs = []
for job in all_jobs:
try:
if job.Program in self.programs:
jobs.append(job)
except AttributeError:
# Some jobs may not have a program set (MATSCI-1713)
pass
else:
if self.show_subjobs:
jobs = all_jobs
else:
jobs = [x for x in all_jobs if not x.ParentJobId]
for ajob in jobs:
try:
exists = os.path.exists(ajob.Dir)
except AttributeError:
# It is possible for jobs to not have a Dir attribute if they
# are not fully fleshed out yet
exists = False
if not exists:
continue
if ajob.JobId in all_running_job_ids:
running.append(ajob)
else:
completed.append(ajob)
# Sort all the jobs by reversed Launch Time
running.sort(reverse=True, key=lambda x: x.LaunchTime)
completed.sort(reverse=True, key=lambda x: x.LaunchTime)
return running, completed
[docs] @qtutils.wait_cursor
def loadJobsIntoTable(self):
"""
Load all the desired jobs from the job database into the table
"""
# Turn off sorting because adding data to a table with sorting on will
# mess up the rows
self.table.setSortingEnabled(False)
self.resetTable()
# Standard item for the table - used to clone new items
base_item = QtWidgets.QTableWidgetItem()
base_item.setFlags(Qt.ItemIsSelectable | Qt.ItemIsEnabled)
def set_table_text(row, column, text, jobid=None):
"""
Put new text in the table in row, column
:type row: int
:param row: The row of the table cell
:type column: int
:param column: the column of the table cell
:type text: str
:param text: The text to insert into row, column
:type jobid: str
:param jobid: The job id of the job for this row
"""
item = QtWidgets.QTableWidgetItem(base_item)
item.setText(text)
if jobid:
item.setData(Qt.UserRole, jobid)
self.table.setItem(row, column, item)
running, completed = self.readJobsFromDatabase()
# Put all the jobs in the table, running first, then completed
for row, ajob in enumerate(running + completed):
self.table.insertRow(row)
set_table_text(row, 0, ajob.Name, jobid=ajob.job_id)
set_table_text(row, 1, ajob.LaunchTime)
set_table_text(row, 2, ajob.Status)
try:
set_table_text(row, 3, ajob.Program)
except AttributeError:
set_table_text(row, 3, "")
set_table_text(row, 4, ajob.Dir)
header = self.table.horizontalHeader()
header.resizeSections(header.ResizeToContents)
self.table.setSortingEnabled(True)
[docs] def loadToggled(self):
"""
Whether to load from the job table or a manual directory has been
toggled - react to that
"""
browsing = self.browse_rb.isChecked()
if browsing:
sranges = self.table.selectedRanges()
for srange in sranges:
self.table.setRangeSelected(srange, False)
self.table.setEnabled(not browsing)
self.browse_le.setEnabled(browsing)
self.browse_button.setEnabled(browsing)
[docs] def getCurrentJobDir(self):
"""
Get the currently selected job directory and the associated job if
applicable
:rtype: (str, `schrodinger.job.jobcontrol.Job` or (str, None) or (None,
None)
:return: The path to the selected job directory and if possible, the
associated Job object. If the user specified a job directory
manually, the Job object will be None. If no job directory has been
specified, the return value is (None, None)
"""
if self.browse_rb.isChecked():
# Manual directory
path = str(self.browse_le.text())
job = None
if not path:
raise RuntimeError('A directory must be specified manually.')
else:
# Read a directory from the job database
path = ""
for item in self.table.selectedItems():
if item.column() == 0:
jobid = str(item.data(Qt.UserRole))
job = self.manager.jobhub_manager.getJob(jobid)
path = job.Dir
break
if not path:
raise RuntimeError('Please select a job in the table to load.')
return path, job
[docs] def browseDirectory(self):
"""
Allow the user to browse to a new directory via file dialog
"""
job_dir = filedialog.get_existing_directory(id='jobdirdlg')
if not job_dir:
return
self.browse_le.setText(job_dir)
[docs] def setWaitCursor(self, app_wide=True):
"""
Set the cursor to the wait cursor. This will be an hourglass, clock or
similar. Call restoreCursor() to return to the default cursor.
If 'app_wide' is True then it will apply to the entire application
(including Maestro if running there). If it's False then it will apply
only to this panel.
Added for the wait_cursor decorator
"""
if app_wide:
QtWidgets.QApplication.instance().setOverrideCursor(
QtGui.QCursor(QtCore.Qt.WaitCursor))
else:
self.setCursor(QtGui.QCursor(QtCore.Qt.WaitCursor))
[docs] def restoreCursor(self, app_wide=True):
"""
Restore the application level cursor to the default. If 'app_wide' is
True then if will be restored for the entire application, if it's
False, it will be just for this panel.
Added for the wait_cursor decorator
"""
if app_wide:
QtWidgets.QApplication.instance().restoreOverrideCursor()
else:
self.setCursor(QtGui.QCursor(QtCore.Qt.ArrowCursor))
[docs]class NewJobDirDialog(swidgets.SDialog):
"""
SDialog to allow the user to read in information for a new job. Should be
created with the user_accept_function parameter set to a function that
accepts (path, job=job) api, where path is the path to the job directory and
job is a Job object if available.
"""
[docs] def __init__(self, *args, show_subjobs=False, **kwargs):
"""
Create a NewJobDirDialog instance
:param bool show_subjobs: If False, only show top-level jobs in the
table. Note that if programs are specified (see setAllowedPrograms),
subjobs of that program type will be shown regardless of this
setting.
All other arguments are passed to the parent class
"""
self.show_subjobs = show_subjobs
super().__init__(*args, **kwargs)
def __getattr__(self, attribute):
"""
Pass on any requests for unknown attributes to the job loader. If the
attribute is still unknown, raise an AttributeError for this class
"""
try:
return getattr(self.jdir_loader, attribute)
except AttributeError:
raise AttributeError('%s has no attribute %s' %
(self.__class__.__name__, attribute))
[docs] def layOut(self):
"""
Lay out the widgets for the dialog
We use a wait cursor because searching the job database may take a few
seconds (or more)
"""
layout = self.mylayout
self.jdir_loader = NewJobDirFrame(self,
layout,
dclick_callback=self.accept,
show_subjobs=self.show_subjobs)
size = QtCore.QSize(800, 400)
self.resize(size)
[docs] def accept(self):
"""
The user has pressed the Accept button. Call the user_accept_function
with the chosen job information.
"""
try:
path, job = self.jdir_loader.getCurrentJobDir()
except RuntimeError as msg:
self.warning(str(msg))
return
if not path:
return
self.user_accept_function(path, job=job)
return swidgets.SDialog.accept(self)
[docs]class InteractiveFileDownloader(jobutils.FileDownloader):
"""
Check for files on the server and download them while allowing the user to
cancel the processes via a procgress dialog.
See parent class for additional information.
"""
DLG_TITLE = 'Contacting Job Server'
[docs] def __init__(self, head, *args, **kwargs):
"""
Create an InteractiveFileDownloader instance
:param `QtWidgets.QWidget` head: The parent widget for dialogs posted by
this class
"""
self.head = head
super().__init__(*args, **kwargs)
self.dialog = None
[docs] def listAvailableFiles(self, *args, **kwargs):
"""
Find all files available on the server for the given job. Posts a
dialog during the server query to allow the user to cancel the process.
If the user cancels the process, and empty list is returned.
See the parent method for additional information
"""
kwargs['wait'] = False
super().listAvailableFiles(*args, **kwargs)
# Post a dialog to allow the user to cancel
text = 'Getting available file information from server'
dialog = mswidgets.ProcessBusyDialog(text,
self.process,
self.head,
title=self.DLG_TITLE)
code = dialog.activate()
# Check for cancellation/errors
if code == dialog.KILLED:
return self.available_filenames
elif code != dialog.SUCCESS:
self.raiseProcessError()
return self.parseAvailableFiles()
[docs] def downloadFilesToDir(self, job, filenames, dirname):
"""
Download a series of files from the server. The dialog will remain
open during the entire duration but will update text to inform the
user which file is currently downloading.
Use of this function versus looping over downloadJobFileToTemp avoids
the dialog flickering and constantly taking focus.
:param `jobconrol.Job` job: The job the files belong to
:param list filenames: The list of file names to download
:param str dirname: The directory to download the files to
:rtype: list or None
:return: filenames is returned if everything completes successfully,
otherwise None is returned.
:raise `jobutils.FileDownloadError`: If an error occurs while
downloading
"""
total = len(filenames)
for index, name in enumerate(filenames, 1):
text = f'Downloading file {index} of {total} from server...'
temp_path = os.path.join(dirname, name)
persistent = index != total
full_path = self.downloadJobFileToTemp(text,
job,
name,
persistent=persistent,
temp_path=temp_path)
if full_path is None:
# User killed
self.cleanFiles()
return None
return filenames
[docs] def downloadJobFileToTemp(self, text, *args, persistent=False, **kwargs):
"""
Download the requested file from the server. Posts a dialog during the
download to allow the user to cancel the process.
If the user cancels the process, None is returned.
:param bool persistent: If False, the dialog is closed when the download
completes. If True, the dialog remains open for additional use -
see downloadManyFilesToDirectory for instance. It is up to the
caller to handle closing the dialog by eventually calling
self.dialog.accept or calling this function with persistent=False.
See the parent method for additional information
"""
kwargs['wait'] = False
super().downloadJobFileToTemp(*args, **kwargs)
# Post a dialog to allow the user to cancel
if self.dialog:
dialog = self.dialog
dialog.setNewData(text, self.process)
else:
dialog = mswidgets.ProcessBusyDialog(text,
self.process,
self.head,
title=self.DLG_TITLE)
code = dialog.activate(persistent=persistent)
success = code == dialog.SUCCESS
# Keep a handle to the dialog if we will be re-using it
if persistent and success:
self.dialog = dialog
else:
if persistent:
dialog.accept()
self.dialog = None
# Check for cancellation/errors
if not success:
fileutils.force_remove(self.out_filename)
self.out_filename = None
if code != dialog.KILLED:
self.raiseProcessError()
return self.out_filename
[docs]class DownloadingMonitorMixin:
"""
A mixin class for panels that monitor running jobs and have to download
files from running job server jobs in order to function
"""
DOWNLOAD_FILE_ENDINGS = []
ALWAYS_REDOWNLOAD = []
[docs] def __init__(self, *args, **kwargs):
""" Create a Downloading Monitor Mixin object """
super().__init__(*args, **kwargs)
# Start the job manager so it gets a list of jobs asap
start_job_manager()
self.downloader = InteractiveFileDownloader(self)
self.downloaded_files = set()
self.current_path = None
[docs] def downloadJobFiles(self, job):
"""
Download available files from a running Job Server job
:param `jobcontrol.Job` job: The job to download files for
:rtype: str or None
:return: The path to the directory where files were downloaded,
or None if the process failed or was killed by the user
:raise `jobutils.FileDownloadError` if a process fails
"""
# Make a directory to hold the files if this is the first download for
# this job
if self.current_path is None:
with fileutils.tempfilename() as dirname:
# Get a temp directory name
pass
os.makedirs(dirname)
self.downloader.temp_directories.append(dirname)
else:
dirname = self.current_path
if not self.DOWNLOAD_FILE_ENDINGS:
# Nothing will ever qualify as downloadable
return dirname
# Find the list of all files that can be downloaded
available_path_data = self.downloader.listAvailableFiles(job)
if not available_path_data:
# User may have killed the job after some files downloaded
self.downloader.cleanFiles()
return None
# Filter the list down to just those we need for the viewer and haven't
# downloaded already
to_download = []
for pdata in available_path_data:
for ending in self.DOWNLOAD_FILE_ENDINGS:
if pdata.match(ending):
if (any(pdata.match(x) for x in self.ALWAYS_REDOWNLOAD) or
pdata.name not in self.downloaded_files):
to_download.append(pdata.name)
downloaded = self.downloader.downloadFilesToDir(job, to_download,
dirname)
if downloaded is not None:
self.downloaded_files.update(to_download)
return dirname
else:
return None
[docs] def downloadJobFilesIfNecessary(self, job, path):
"""
Download required files if this is a running Job Server job
:param `jobcontrol.Job` job: The job to check and download files for
:param str path: The current path to the job directory
:rtype: str
:return: The path to the directory the files were downloaded into
"""
if not job or not jobutils.is_downloadable_job_server_job(job):
return path
try:
path = self.downloadJobFiles(job)
except jobutils.FileDownloadError as err:
self.error(f'Unable to download files: {str(err)}')
return
return path
[docs] def myResetMethod(self):
""" Reset data """
self.downloader.cleanFiles()
self.downloaded_files = set()
self.current_path = None