import os
import random
import string
from datetime import datetime
from typing import Callable
from typing import List
from typing import Optional
import schrodinger
from schrodinger.infra import jobhub
from schrodinger.Qt import QtCore
from schrodinger.utils import mmutil
from . import jobcontrol
from .download import download_job
# Temporary, delete when PANEL-19692 is complete
# When True, _AbstractJobHandler._onJobDone prints a timestamped trace line
# every time it is invoked.
DEBUG = False
class _AbstractJobHandler(QtCore.QObject):
    """
    Base class for job handlers.

    A handler owns one job command, connects itself to the job manager's
    signals, and re-emits the events that concern its own job. Subclasses
    provide the actual launch mechanism by implementing `_launchJob`.

    :ivar jobCompleted: Signal emitted when the job is completed and downloaded.
        Under JOB_SERVER, a job can be complete but not downloaded, but this
        signal will only be emitted when a job has finished downloading.
    :ivar jobDownloadFailed: Signal emitted with the job and an error string
        when the handler's own download attempt raises or reports an error.
    :ivar jobProgressChanged: Signal emitted with the job, the current step,
        the total number of steps and a progress message whenever the job
        manager reports progress for this handler's job.
    """
    jobCompleted = QtCore.pyqtSignal(jobcontrol.Job)
    jobDownloadFailed = QtCore.pyqtSignal(jobcontrol.Job, str)
    jobProgressChanged = QtCore.pyqtSignal(jobcontrol.Job, int, int, str)

    def __init__(self,
                 cmd: List[str],
                 viewname: Optional[str] = None,
                 launch_dir: Optional[str] = None):
        """
        :param cmd: Job command as a list of strings. The list is copied; the
            caller's list is never modified.
        :param viewname: Viewname for the job. A random 32 character string
            is generated when omitted.
        :param launch_dir: Directory to launch the job from. Defaults to the
            current working directory at construction time.
        """
        super().__init__()
        if viewname is None:
            # If no viewname is provided, we just give the job a random string
            # for a viewname
            viewname = "".join(
                random.choices(string.ascii_uppercase + string.digits, k=32))
        self._launch_dir = launch_dir if launch_dir else os.getcwd()
        self.viewname = viewname
        self.job = None
        # Initialise the flag before any job manager signal is connected so a
        # slot can never observe the attribute missing.
        self._completed = False
        self._setupWaitLoop()
        self._setupJobCmd(cmd)
        for signal, slot in self._getJobManagerSignalsAndSlots():
            signal.connect(slot)

    def _setupJobCmd(self, cmd_list: List[str]):
        """
        Construct a job command adding a viewname and project to a job command
        if they are not already present.

        The result is stored in `self._cmd_list`.

        :param cmd_list: Job command supplied by the caller. It is left
            untouched; the handler works on its own copy.
        """
        # Work on a copy: extending the list in place would leak this
        # handler's -VIEWNAME/-PROJ into the caller's list, and from there
        # into any other handler later built from the same list.
        cmd_list = list(cmd_list)
        if "-VIEWNAME" not in cmd_list:
            cmd_list += ["-VIEWNAME", self.viewname]
        # Calling schrodinger.get_maestro() directly to reduce the risk of
        # a unittest leaking a maestro mock
        maestro = schrodinger.get_maestro()
        if maestro and "-PROJ" not in cmd_list:
            pt = maestro.project_table_get()
            prj_name = pt.project_name
            cmd_list += ["-PROJ", prj_name]
        self._cmd_list = cmd_list

    def _setupWaitLoop(self):
        """
        Create the local event loop used by `wait` and arrange for it to quit
        when `jobCompleted` is emitted.
        """
        self._wait_loop = QtCore.QEventLoop()
        self.jobCompleted.connect(self._wait_loop.quit)

    def _getJobManagerSignalsAndSlots(self):
        """
        :return: List of (job manager signal, handler slot) pairs that are
            connected on construction and disconnected on completion.
        """
        jmgr = jobhub.get_job_manager()
        done_signal, done_slot = self._getDoneSignalAndSlot()
        return [
            (jmgr.jobProgressChanged, self._onJobProgressChanged),
            (done_signal, done_slot),
        ]

    def _getDoneSignalAndSlot(self):
        """
        Pick the job manager signal that indicates the job is finished and the
        slot that should handle it, based on the JOB_SERVER feature flag and
        on whether auto-download is active.

        :return: Tuple of (signal, slot)
        """
        jmgr = jobhub.get_job_manager()
        if not mmutil.feature_flag_is_enabled(mmutil.JOB_SERVER):
            # with JOB_SERVER off, the job is downloaded when it's completed so
            # no special handling is needed
            done_signal = jmgr.jobCompleted
            done_slot = self._onJobDone
        elif is_auto_download_active():
            # jobDownloaded is emitted when maestro downloads the job
            done_signal = jmgr.jobDownloaded
            done_slot = self._onJobDone
        else:
            done_signal = jmgr.jobCompleted
            # Without auto-download, need special handling for completed job
            done_slot = self._onJobFinishedRunning
        return done_signal, done_slot

    # ==========================================================================
    # _AbstractJobHandler API
    # ==========================================================================

    def launchJob(self, *, _debug_delay=None) -> jobcontrol.Job:
        """
        Launch the job.

        :param _debug_delay: Debugging aid that is forwarded unchanged to the
            subclass's `_launchJob`.
        :return: Whatever the subclass's `_launchJob` returns.
        :raises RuntimeError: if this handler has already launched a job.
        """
        if self.job is not None:
            raise RuntimeError("Job has already been launched")
        return self._launchJob(_debug_delay=_debug_delay)

    def _launchJob(self, *, _debug_delay=None):
        """
        Launch mechanism; must be implemented by subclasses.
        """
        raise NotImplementedError

    def wait(self):
        """
        Run a local event loop until `jobCompleted` is emitted. Returns
        immediately if the job already reports itself as complete.

        :raises RuntimeError: if no job has been launched yet.
        """
        if self.job is None:
            raise RuntimeError("Can't wait until the job has been launched")
        if self.job.isComplete():
            return
        self._wait_loop.exec()

    # ==========================================================================
    # _AbstractJobHandler slots
    # ==========================================================================

    def _onJobFinishedRunning(self, job: jobcontrol.Job):
        """
        Download the job's results and then mark the job as done. Only called
        outside of maestro.

        Called when job finishes running. Under JOB_SERVER, a job completes
        when the (remote) calculations finish but before the results have been
        downloaded.

        A failed download is reported through `jobDownloadFailed`; the job is
        still passed on to `_onJobDone` afterwards, so `jobCompleted` follows
        in either case.
        """
        if self.job is None or self._completed:
            return
        if job.JobId == self.job.JobId:
            try:
                download_error = download_job(job.JobId)
            except Exception as exc:
                self.jobDownloadFailed.emit(job, str(exc))
            else:
                if download_error:
                    self.jobDownloadFailed.emit(job, download_error)
            finally:
                self._onJobDone(job)

    def _onJobDone(self, job: jobcontrol.Job):
        """
        Called after job is both finished running and downloaded

        Jobs other than this handler's job are ignored. For the handler's own
        job the stored job object is replaced by `job` before completion is
        signalled.
        """
        if DEBUG:
            print(f"{datetime.now()} _onJobDone called")
        if self.job is None or self._completed:
            return
        if job.JobId == self.job.JobId:
            self.job = job
            self._completeJob()

    def _onJobProgressChanged(self, job: jobcontrol.Job, current_step: int,
                              total_steps: int, progress_msg: str):
        """
        Re-emit the job manager's progress report if it concerns this
        handler's job and the job has not been marked completed yet.
        """
        if self._completed:
            return
        if not self.job or job.JobId != self.job.job_id:
            return
        self.job = job
        self.jobProgressChanged.emit(self.job, current_step, total_steps,
                                     progress_msg)

    # ==========================================================================
    # _AbstractJobHandler implementation methods
    # ==========================================================================

    def _completeJob(self):
        """
        Mark the current job as completed (i.e. downloaded).

        Emits `jobCompleted` and then disconnects from the job manager. Does
        nothing if the job was already marked completed.
        """
        if self._completed:
            return
        self._completed = True
        self.jobCompleted.emit(self.job)
        for signal, slot in self._getJobManagerSignalsAndSlots():
            signal.disconnect(slot)

    def _checkForEarlyCompletion(self):
        """
        Handle a job that has already completed by the time the handler
        learns its job ID, by invoking the "done" slot directly.
        """
        # Preemptive job ID
        try:
            job = jobhub.get_cached_job(self.job.job_id)
        except jobhub.StdException:
            # The job manager did not recognize the job ID. This means the
            # `jobCompleted` signal was not emitted.
            return
        if job.isComplete():
            _, done_slot = self._getDoneSignalAndSlot()
            done_slot(job)

    def __del__(self):
        # __init__ may have failed before the wait loop was created
        if hasattr(self, "_wait_loop"):
            self._wait_loop.quit()
class JobHandler(_AbstractJobHandler):
    """
    Job handler that launches a job and lets the caller wait for it.

    Construct it with the list of strings you would otherwise pass to
    JobViewFilter.launchJob, connect `my_jobhandler.jobCompleted` to whatever
    has to run once the job is finished, and call `launchJob`. The inherited
    `wait` method pauses the current event until the job is finished; other
    ui events keep being processed while waiting.
    """

    # ==========================================================================
    # JobHandler API
    # ==========================================================================

    def _launchJob(self, *, _debug_delay=None) -> jobcontrol.Job:
        """
        Launch the job. An event loop is executed while job is being launched.

        :return: Job object for the started job.
        :rtype: jobcontrol.Job

        :raises JobLaunchFailure: if the job failed to start. NOTE: unlike
            jobcontrol.launch_job(), no dialog is shown on failure, so calling
            code is responsible for informing the user of the failure.
        """
        launch_options = {
            "launch_dir": self._launch_dir,
            "print_output": True,
            "_debug_delay": _debug_delay,
        }
        self.job = jobcontrol.launch_job(self._cmd_list, **launch_options)
        # The job may already be finished; in that case the check below runs
        # the completion slot, which can replace self.job with a fresher
        # object. Hence self.job is returned rather than the launch result.
        self._checkForEarlyCompletion()
        return self.job
class AsyncJobHandler(_AbstractJobHandler):
    """
    A jobhandler that launches jobs asynchronously (i.e. launchJob doesn't
    wait for the job to actually start before returning).

    :ivar jobStarted: Signal emitted with the job once the launcher reports
        that it has started.
    :ivar jobLaunchFailed: Signal emitted with a `jobcontrol.JobLaunchFailure`
        when the launcher reports a non-empty launch error.
    """
    jobStarted = QtCore.pyqtSignal(jobcontrol.Job)
    jobLaunchFailed = QtCore.pyqtSignal(Exception)

    def __init__(self, *args, **kwargs):
        """
        See _AbstractJobHandler for arguments.
        """
        super().__init__(*args, **kwargs)
        # Last launch error reported by the launcher; None until one arrives
        self.err_message = None

    # ==========================================================================
    # AsyncJobHandler slots
    # ==========================================================================

    def _onJobStarted(self, launched_job: jobcontrol.Job):
        """
        Record the started job, handle the case where it has already
        completed, and announce the start through `jobStarted`.

        :raises jobcontrol.JobLaunchFailure: if `launched_job` is falsy.
        """
        if not launched_job:
            raise jobcontrol.JobLaunchFailure(
                'Launch failed (event loop killed)')
        self.job = launched_job
        self.err_message = None
        self._checkForEarlyCompletion()
        # Emit the object handed in by the launcher; self.job may have been
        # replaced by the early completion check above.
        self.jobStarted.emit(launched_job)

    def _onJobLaunchFailed(self, launch_err):
        """
        Announce a launch failure through `jobLaunchFailed` (only when the
        error is non-empty) and store the error in `err_message`.
        """
        if launch_err:
            failure = jobcontrol.JobLaunchFailure(launch_err)
            self.jobLaunchFailed.emit(failure)
        self.err_message = launch_err

    # ==========================================================================
    # AsyncJobHandler implementation methods
    # ==========================================================================

    def _launchJob(self, *, _debug_delay=None):
        """
        Launch a job asynchronously. Returns before command is started.

        :param _debug_delay: When given, the handling of the launcher's
            `jobStarted` signal is postponed by this many seconds.
        """
        unused_viewname = ""  # this parameter is unused MAE-45178
        launcher = jobhub.JobLauncher(
            jobhub.JobCommand(self._cmd_list, unused_viewname,
                              self._launch_dir))

        if _debug_delay is None:
            started_slot = self._onJobStarted
        else:

            def started_slot(job):
                # Defer the real slot through a single-shot timer
                QtCore.QTimer.singleShot(_debug_delay * 1000,
                                         lambda: self._onJobStarted(job))

        launcher.jobStarted.connect(started_slot)
        launcher.jobLaunchFailed.connect(self._onJobLaunchFailed)
        # Keep the launcher referenced from the handler
        self.job_launcher = launcher
        launcher.launch()
        # Confirm that neither signal was emitted yet
        assert self.job is None
        assert self.err_message is None
def job_incorporated(job_id: str, first_entry_id: int, last_entry_id: int):
    """
    Dispatch a "job incorporated" notification to the callback registered for
    the job's viewname.

    This function is called after successful incorporation of the job from
    maestro, and only if the job output is incorporated through maestro job
    incorporation. If individual panels have their own incorporation handler
    registered via maestro.job_incorporation_function_add(), and the panel is
    currently open and has handled the incorporation, this function will not
    be called by Maestro.

    Nothing happens when the job record cannot be found, when the job has no
    viewname, or when no callback is registered for that viewname.

    :param job_id: The id of the incorporated job
    :param first_entry_id: The id of the first entry imported in the project
        from the output structure file associated with the given job.
    :param last_entry_id: The id of the last entry imported in the project
        from the output structure file associated with the given job.
    """
    # Preemptive job ID
    try:
        cached_job = jobhub.get_cached_job(job_id)
    except jobhub.StdException:
        # Job record is missing
        return

    viewname = cached_job.Viewname
    if not viewname:
        return

    from schrodinger.maestro import maestro_job_callbacks
    registry = maestro_job_callbacks.maestro_job_incorporated_callbacks
    callback = registry.get(viewname)
    if callback is None:
        return
    callback(job_id, first_entry_id, last_entry_id)
def connect_job_manager_signals():
    """
    Called by maestro to connect job manager signals to Python slots.

    Does nothing when maestro is not available.
    """
    if not schrodinger.get_maestro():
        return

    # Set up custom job completion handlers
    # Only active with feature flags JOB_SERVER and NEW_INCORPORATION_INFRA
    def dispatch_to_custom_handler(jobdata):
        """
        Call the custom completion handler registered for the handler id
        carried by `jobdata`, if there is one.
        """
        handler_id = jobdata.getCustomHandlerId()
        if not handler_id:
            return
        from schrodinger.maestro import maestro_job_callbacks
        registry = maestro_job_callbacks.custom_job_completion_handlers
        handler = registry.get(handler_id)
        # TODO PANEL-19836: determine how we want to report a missing handler
        if handler is not None:
            handler(jobdata)

    job_manager = jobhub.get_job_manager()
    job_manager.readyForPythonIncorporation.connect(dispatch_to_custom_handler)
# ===============================================================================
# Convenience functions
# ===============================================================================
# launch_job_with_callback() appends each handler it creates to this list and
# removes it again in its completion callback.
# NOTE(review): presumably this keeps the handler referenced while its job
# runs - confirm.
_active_handlers = []
def launch_job_with_callback(
        cmd: List[str],
        callback: Callable[[jobcontrol.Job], None],
        launch_dir: Optional[str] = None) -> jobcontrol.Job:
    """
    Launch the given job, and call the specified callback when the job
    completes (either successfully or with a failure).

    The handler created for the job is kept in the module-level
    `_active_handlers` list until the job completes. If the launch itself
    fails, the handler is removed from that list before the exception is
    re-raised, so failed launches do not accumulate handlers.

    :param cmd: Command list
    :param callback: Function to call when the job completes (either
        successfully or with a failure), with Job object as the only
        parameter.
    :param launch_dir: Directory to launch job under
    :return: Job object for the launched job
    :raises RuntimeError: if the job fails to start.
    """
    jhandler = JobHandler(cmd, launch_dir=launch_dir)
    _active_handlers.append(jhandler)

    def release_handler():
        # Safe to call more than once
        if jhandler in _active_handlers:
            _active_handlers.remove(jhandler)

    def job_completed(job: jobcontrol.Job):
        release_handler()
        callback(jhandler.job)

    jhandler.jobCompleted.connect(job_completed)
    try:
        jhandler.launchJob()
    except Exception:
        # The job never started, so jobCompleted will never fire for it;
        # drop the handler here instead of leaving it in the list forever.
        release_handler()
        raise
    return jhandler.job
def is_auto_download_active():
    """
    Report whether job auto-downloading is enabled, which requires both a
    running maestro and the JOB_SERVER feature flag.

    :return: Whether job auto-downloading is enabled
    """
    if not schrodinger.get_maestro():
        return False
    return bool(mmutil.feature_flag_is_enabled(mmutil.JOB_SERVER))