Source code for schrodinger.application.matsci.qb_sdk.job
# Copyright (c) 2021, Qu & Co
# All rights reserved.
# Redistribution and use in source and binary forms, with or without
# modification, are permitted provided that the following conditions are met:
# 1. Redistributions of source code must retain the above copyright notice, this
# list of conditions and the following disclaimer.
# 2. Redistributions in binary form must reproduce the above copyright notice,
# this list of conditions and the following disclaimer in the documentation
# and/or other materials provided with the distribution.
# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
# AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
# IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
# DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE
# FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
# DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR
# SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER
# CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY,
# OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
# built-in
import pprint
import time
from typing import Optional
# local
from .client import QubecClient, get_client
from .parameters import QUBEC_SDK_TERMINATE_ON_FAIL, SEPARATOR, QubecSdkError
from .tools import get_logger
DEFAULT_JOB_TIMEOUT = 86400
JOB_POLLING_TIME = 30
logger = get_logger(__name__)
[docs]class QubecJob:
"""
Class representing a running or completed job on the QUBEC platform
Attributes:
job_id (str): The unique ID assigned by QUBEC to the job
experiment (str): A string representing the desired quantum algorithm to run
backend_type (str): A string representing the type of quantum backend to use
backend (dict): A dictionary with relevant backend information
problem (dict): dictionary with problem definition
parameters (dict): Any additional parameters which has been sent to the QUBEC
job submission engine
status (str): The status of the job
result (dict): A dictionary with partial or full results of the job
error (str): A string with an error message if any problem occurred during job submission
"""
[docs] def __init__(
self,
job_id: str,
experiment: str = None,
backend_type: str = None,
backend: dict = None,
problem: dict = None,
**parameters,
) -> None:
"""
Initialize an instance of the QubecJob class representing a running or completed
job on the QUBEC platform
Args:
job_id (str): The unique ID assigned by QUBEC to the job
experiment (str): A string representing the desired quantum algorithm to run
backend_type (str): A string representing the type of quantum backend to use
backend (dict): A dictionary with relevant backend information
problem (dict): dictionary with problem definition
**parameters (dict): Any additional parameters which has been sent to the QUBEC
job submission engine
"""
self.job_id: str = job_id
self.experiment: str = experiment
self.backend_type: str = backend_type
self.backend: dict = backend if backend is not None else {}
self.problem: dict = problem if problem is not None else {}
self.parameters: dict = parameters
self.status: str = "initialized"
self.result: dict = {}
self.error: Optional[str] = None
[docs] def wait_completion(self, timeout=DEFAULT_JOB_TIMEOUT) -> None:
"""
Wait until the job has completed by periodically querying the
results in the backend. If the session has expired, the token
will be refreshed
Args:
timeout (int): A timeout after which stopping to listen for job progresses
Raises:
QubecSdkError if any error occured while querying for QUBEC job
progress in the backend
"""
client: QubecClient = get_client()
elapsed = 0
completed = False
logger.info(SEPARATOR)
while not completed:
if elapsed > timeout:
logger.warning(
f"Job {self.job_id} has timed out without completing.")
content = client.progress_job(self.job_id)
result = content["data"]["result"] if "result" in content[
"data"] else {}
status = content["data"]["status"] if "status" in content[
"data"] else "unknown"
raise QubecSdkError(
"The job has reached the timeout without terminating. " +
f"Final status: {status}. Partial results: {result}")
client.refresh_session()
try:
content = client.progress_job(self.job_id)
self.status = content["data"]["status"]
logger.info(
f"Checking progress of job {self.job_id}. Current status: {self.status}"
)
# detailed job information
details = content["data"].get("details",
"No information available")
logger.info(
f"Job progress information\n{pprint.pformat(details, indent=4, width=50)}\n"
)
# termination statuses
if self.status == "completed":
time.sleep(1)
self.result = content["data"].get("result", {})
completed = True
elif self.status == "failed":
time.sleep(1)
self.error = content["data"].get("error_msg", "")
msg = f"Job {self.job_id} has failed.\nDetailed error message: {self.error}"
logger.warning(msg)
if QUBEC_SDK_TERMINATE_ON_FAIL:
raise QubecSdkError(msg)
completed = True
elif self.status == "cancelled":
time.sleep(1)
msg = f"The job {self.job_id} has been cancelled by the user."
logger.warning(msg)
if QUBEC_SDK_TERMINATE_ON_FAIL:
raise QubecSdkError(msg)
completed = True
logger.info(SEPARATOR)
except QubecSdkError as e:
self.error = str(e)
break
time.sleep(JOB_POLLING_TIME)
elapsed += JOB_POLLING_TIME
[docs] def cancel(self) -> dict:
"""
Send a remote cancellation request of the job
Returns:
A dictionary with the cancellation API response
Raises:
QubecSdkError if the cancellation request failed
"""
client: QubecClient = get_client()
logger.debug(SEPARATOR)
try:
content = client.cancel_job(self.job_id)
logger.info(f"Successfully cancelled job {self.job_id}")
self.status = "cancelled"
return content
except QubecSdkError as e:
logger.warning(str(e))
return {}
[docs] def get_result(self, use_cached: bool = False) -> dict:
"""
Get the available results of the current job
Args:
use_cached (bool): If true, do not perform any request and used cached result values
Returns:
A dictionary with the job results
"""
if use_cached:
return self.result
else:
try:
client: QubecClient = get_client()
client.refresh_session()
content = client.progress_job(self.job_id)
self.result = content["data"]["result"]
self.status = content["data"]["status"]
return self.result
except (KeyError, QubecSdkError):
logger.warning(f"Cannot retrieve results of job {self.job_id}")
return {}
[docs] def get_error(self, use_cached: bool = False) -> Optional[str]:
if use_cached:
return self.error
else:
try:
client: QubecClient = get_client()
client.refresh_session()
content = client.progress_job(self.job_id)
self.error = (content["data"]["error_msg"]
if "error_msg" in content["data"] else None)
return self.error
except (KeyError, QubecSdkError):
logger.warning(f"Cannot retrieve error for job {self.job_id}")
return None
[docs] def get_status(self, use_cached: bool = False) -> Optional[str]:
"""
Get the current status of the job
Args:
use_cached (bool): If true, do not perform any request and used cached result values
Returns:
A string with the job status
"""
client: QubecClient = get_client()
client.refresh_session()
if use_cached:
return self.status
else:
try:
content = client.progress_job(self.job_id)
self.status = content["data"]["status"]
return self.status
except QubecSdkError:
logger.warning(f"Cannot retrieve results of job {self.job_id}")
return None
def __str__(self):
msg = f"""
{SEPARATOR}
Final job status: {self.status}
Results
"""
msg += pprint.pformat(self.get_result())
return msg
# TODO
[docs] @staticmethod
def from_id(job_id: str) -> "QubecJob":
pass