import os
from typing import Optional
from schrodinger.infra import mmjob
from schrodinger.job import jobcontrol
from schrodinger.Qt import QtCore
from . import mmshare_testfile
# Job record fields that carry a single value.  _get_cpp_job writes each of
# these as one "<Field>: <value>" line and rejects any keyword that is in
# neither this set nor job_record_lists.
job_record_scalars = {
    "JobId",
    "ParentJobId",
    "Name",
    "Program",
    "Processors",
    "OrigLaunchHost",
    "OrigLaunchDir",
    "Host",
    "User",
    "Dir",
    "HostEntry",
    "Commandline",
    "QueueHost",
    "BatchId",
    "JobHost",
    "JobSchrodinger",
    "JobDir",
    "JobProgress",
    "Status",
    "ExitStatus",
    "ExitCode",
    "Timezone",
    "LaunchTime",
    "StartTime",
    "StatusTime",
    "StopTime",
    "StructureOutputFile",
    "Debug",
    "Project",
    "Disposition",
    "ViewName",
    "TmpLaunchDir",
    "DisplayStatus",
    "StatusChangeReason",
    "DownloadableFiles",
}
# Job record fields that carry a sequence of values.  _get_cpp_job writes one
# "<Field>[<n>]: <item>" line per item, with n starting at 1.
job_record_lists = {
    "InputFiles",
    "OutputFiles",
    "LogFiles",
    # "Licenses" is deliberately left out: it doesn't appear to be used in
    # jobcontrol.Job.
    "Envs",
    "SubJobs",
    "Errors",
}
def create_fake_job(JobId: str, **kwargs) -> jobcontrol.Job:
    """
    Return a jobcontrol.Job created from a job record built out of the given
    keyword arguments.

    :param JobId: Job id; written into the record as the ``JobId`` field and
        also passed to the jobcontrol.Job constructor.
    :param kwargs: Further job record fields, forwarded unchanged to
        `_get_cpp_job` (names must be in `job_record_scalars` or
        `job_record_lists`).
    :return: A jobcontrol.Job wrapping the generated mmjob.Job.
    :raises ValueError: Propagated from `_get_cpp_job` when a keyword is not a
        valid job record field.
    """
    cpp_job = _get_cpp_job(JobId=JobId, **kwargs)
    return jobcontrol.Job(JobId, cpp_job)
def _get_cpp_job(**kwargs) -> mmjob.Job:
    """
    Return an mmjob.Job created from a job record with the given kwargs dict.

    Scalar fields become ``<Field>: <value>`` lines and list fields become one
    ``<Field>[<n>]: <item>`` line per item. The values in the kwargs dict will
    NOT be escaped.

    :param kwargs: Job record fields keyed by field name. Scalar fields whose
        value is None or an empty string, and list fields whose value is
        empty or None, are omitted from the record.
    :return: The mmjob.Job parsed from the generated record text.
    :raises ValueError: If a keyword is in neither `job_record_scalars` nor
        `job_record_lists`.
    """
    job_record_lines = []
    for field, value in kwargs.items():
        if field in job_record_scalars:
            # Unset scalars are left out of the record entirely.
            if value is None or value == "":
                continue
            job_record_lines.append(f"{field}: {value}")
        elif field in job_record_lists:
            # Nothing to write for an empty (or unset) list field.
            if not value:
                continue
            # List items are 1-indexed in the job record.
            job_record_lines.extend(
                f"{field}[{index}]: {item}"
                for index, item in enumerate(value, start=1))
        else:
            raise ValueError(
                f"Unexpected keyword argument: {field} is not a valid job record field."
            )
    job_record_text = '\n'.join(job_record_lines)
    return mmjob.job_from_job_record_text(job_record_text)
def get_legacy_completed_job() -> jobcontrol.Job:
    """
    Return a completed job with an ExitStatus of finished.

    The job is synthesized with `create_fake_job` (no file is read) and uses
    the legacy-style job id ``isabel-0-42f280f5``.
    """
    legacy_job_id = "isabel-0-42f280f5"
    return create_fake_job(
        legacy_job_id,
        Status="completed",
        ExitStatus="finished",
    )
def get_legacy_died_job() -> jobcontrol.Job:
    """
    Return a completed job with a legacy jobcontrol JobId and an ExitStatus of
    "died".

    The job is loaded with `get_job` from the mmshare test file
    ``job_test_files/isabel-0-42f280f5``.
    """
    record_path = mmshare_testfile("job_test_files/isabel-0-42f280f5")
    return get_job(record_path)
def get_legacy_running_job() -> jobcontrol.Job:
    """
    Return a running job with a legacy job id.

    The job is loaded with `get_job` from the mmshare test file
    ``job_test_files/isabel-0-42f280f6``.
    """
    record_path = mmshare_testfile("job_test_files/isabel-0-42f280f6")
    return get_job(record_path)
def get_job(filename: str) -> jobcontrol.Job:
    """
    Return a job object from a text file.

    :param filename: Path to a file containing job record text. The file's
        base name is used as the job id of the returned job.
    :return: A jobcontrol.Job wrapping the mmjob.Job parsed from the file.
    """
    with open(filename) as fh:
        cpp_job = mmjob.job_from_job_record_text(fh.read())
    # No need to patch jobcontrol.Job.readAgain here, it won't be called
    # because we're already initializing with a cpp_job.
    job = jobcontrol.Job(job_id=os.path.basename(filename), cpp_job=cpp_job)
    return job
class FakeJobDownloader(QtCore.QObject):
    """
    Drop-in replacement for the JobDownloader class. Calling `download` emits
    `downloadFinished` with an empty string, indicating a successful
    download.
    """

    # Emitted by download() with the contents of self.output.
    downloadFinished = QtCore.pyqtSignal(str)

    def __init__(self, jobid):
        """
        :param jobid: Not used by the fake; presumably kept so the
            constructor can be called the same way as JobDownloader's.
        """
        super().__init__()
        # Text sent with downloadFinished; the empty string means success.
        self.output = ''

    def download(self):
        """
        Emit `downloadFinished` with the current value of `self.output`.
        """
        self.downloadFinished.emit(self.output)
class FakeFailedJobDownloader(FakeJobDownloader):
    """
    Like FakeJobDownloader, but emits its signal with a non-empty string,
    indicating a download failure.
    """

    # Text emitted in place of the empty "success" output.
    FAKE_OUTPUT = 'fake download failed'

    def __init__(self, jobid):
        """
        :param jobid: Passed through to FakeJobDownloader.
        """
        super().__init__(jobid)
        # Replace the empty output set by the base class with the failure
        # text.
        self.output = FakeFailedJobDownloader.FAKE_OUTPUT
class FakeJobLauncher(QtCore.QObject):
    """
    Drop-in replacement for the JobLauncher class. Calling `launch` emits
    `jobStarted` with a pre-built job, indicating a successful launch.

    The signal is scheduled with a zero-delay single-shot timer rather than
    emitted from inside `launch` itself. Nothing in this class emits
    `jobLaunchFailed`.
    """

    # hack to provide class var to show last value: the job_cmd given to the
    # most recently constructed launcher.
    last_job_cmd = None

    jobStarted = QtCore.pyqtSignal(jobcontrol.Job)
    jobLaunchFailed = QtCore.pyqtSignal(str)

    def __init__(self, job_cmd, job: Optional[jobcontrol.Job] = None):
        """
        :param job_cmd: Launch command; only recorded in the class attribute
            `last_job_cmd`.
        :param job: The job to emit from `launch`. Defaults to the job
            returned by `get_legacy_died_job`.
        """
        # set cls variable
        FakeJobLauncher.last_job_cmd = job_cmd
        super().__init__()
        # The job to emit from launch.
        if job is None:
            job = get_legacy_died_job()
        self._job = job

    def launch(self):
        """
        Schedule the `jobStarted` emission with a zero-delay single-shot
        timer.
        """
        QtCore.QTimer.singleShot(0, self._launchFinished)

    def _launchFinished(self):
        """
        Timer callback: emit `jobStarted` with the stored job.
        """
        self.jobStarted.emit(self._job)