# Source code for schrodinger.application.matsci.qb_sdk.job
# Copyright (c) 2021, Qu & Co
# All rights reserved.
# Redistribution and use in source and binary forms, with or without
# modification, are permitted provided that the following conditions are met:
# 1. Redistributions of source code must retain the above copyright notice, this
#    list of conditions and the following disclaimer.
# 2. Redistributions in binary form must reproduce the above copyright notice,
#    this list of conditions and the following disclaimer in the documentation
#    and/or other materials provided with the distribution.
# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
# AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
# IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
# DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE
# FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
# DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR
# SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER
# CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY,
# OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
# built-in
import pprint
import time
from typing import Optional
# local
from .client import QubecClient, get_client
from .parameters import QUBEC_SDK_TERMINATE_ON_FAIL, SEPARATOR, QubecSdkError
from .tools import get_logger
# Default waiting budget used by QubecJob.wait_completion, in seconds (one day).
DEFAULT_JOB_TIMEOUT = 24 * 60 * 60
# Pause between two consecutive progress polls, in seconds.
JOB_POLLING_TIME = 30
# Module-level logger shared by everything in this file.
logger = get_logger(__name__)
class QubecJob:
    """
    Class representing a running or completed job on the QUBEC platform

    Attributes:
        job_id (str): The unique ID assigned by QUBEC to the job
        experiment (str): A string representing the desired quantum algorithm to run
        backend_type (str): A string representing the type of quantum backend to use
        backend (dict): A dictionary with relevant backend information
        problem (dict): dictionary with problem definition
        parameters (dict): Any additional parameters which has been sent to the QUBEC
            job submission engine
        status (str): The status of the job
        result (dict): A dictionary with partial or full results of the job
        error (str): A string with an error message if any problem occurred during job submission
    """

    def __init__(
        self,
        job_id: str,
        experiment: Optional[str] = None,
        backend_type: Optional[str] = None,
        backend: Optional[dict] = None,
        problem: Optional[dict] = None,
        **parameters,
    ) -> None:
        """
        Initialize an instance of the QubecJob class representing a running or completed
        job on the QUBEC platform

        Args:
            job_id (str): The unique ID assigned by QUBEC to the job
            experiment (str): A string representing the desired quantum algorithm to run
            backend_type (str): A string representing the type of quantum backend to use
            backend (dict): A dictionary with relevant backend information
            problem (dict): dictionary with problem definition
            **parameters (dict): Any additional parameters which has been sent to the QUBEC
                job submission engine
        """
        self.job_id: str = job_id
        self.experiment: Optional[str] = experiment
        self.backend_type: Optional[str] = backend_type
        # A fresh dict per instance when nothing is supplied, so that
        # instances never share mutable state.
        self.backend: dict = backend if backend is not None else {}
        self.problem: dict = problem if problem is not None else {}
        self.parameters: dict = parameters
        self.status: str = "initialized"
        self.result: dict = {}
        self.error: Optional[str] = None

    def wait_completion(self, timeout=DEFAULT_JOB_TIMEOUT) -> None:
        """
        Wait until the job has completed by periodically querying the
        results in the backend. The session is refreshed before every poll.

        The method returns as soon as the job reaches one of the statuses
        "completed", "failed" or "cancelled", or as soon as a progress
        request fails (in that case the error text is stored in
        ``self.error`` and no exception is raised).

        Args:
            timeout (int): Waiting budget in seconds. Only the pauses between
                polls (JOB_POLLING_TIME each) are counted against it, not the
                time spent in the requests themselves.

        Raises:
            QubecSdkError: if the timeout is exceeded, or if the job ends as
                "failed" or "cancelled" while QUBEC_SDK_TERMINATE_ON_FAIL is
                truthy.
        """
        client: QubecClient = get_client()
        elapsed = 0
        logger.info(SEPARATOR)
        while True:
            if elapsed > timeout:
                logger.warning(
                    f"Job {self.job_id} has timed out without completing.")
                # One last query so the error can report the latest state.
                data = client.progress_job(self.job_id)["data"]
                result = data.get("result", {})
                status = data.get("status", "unknown")
                raise QubecSdkError(
                    "The job has reached the timeout without terminating. " +
                    f"Final status: {status}. Partial results: {result}")

            client.refresh_session()
            # Only the request itself is guarded: a failing query is handled
            # best-effort, whereas the deliberate raise for failed/cancelled
            # jobs further down must be able to reach the caller.
            try:
                content = client.progress_job(self.job_id)
            except QubecSdkError as e:
                self.error = str(e)
                return

            data = content["data"]
            self.status = data["status"]
            logger.info(
                f"Checking progress of job {self.job_id}. Current status: {self.status}"
            )
            # detailed job information
            details = data.get("details", "No information available")
            logger.info(
                f"Job progress information\n{pprint.pformat(details, indent=4, width=50)}\n"
            )

            # termination statuses
            if self.status in ("completed", "failed", "cancelled"):
                # NOTE(review): short pause retained as-is; its purpose is
                # not evident from this file.
                time.sleep(1)
                self._record_terminal_state(data)
                logger.info(SEPARATOR)
                # Return right away instead of sleeping one more full
                # polling interval after the job is already over.
                return

            logger.info(SEPARATOR)
            time.sleep(JOB_POLLING_TIME)
            elapsed += JOB_POLLING_TIME

    def _record_terminal_state(self, data: dict) -> None:
        """
        Store the outcome of a job whose status is terminal.

        Args:
            data (dict): The "data" section of a progress response

        Raises:
            QubecSdkError: if the job is "failed" or "cancelled" and
                QUBEC_SDK_TERMINATE_ON_FAIL is truthy
        """
        if self.status == "completed":
            self.result = data.get("result", {})
            return

        if self.status == "failed":
            self.error = data.get("error_msg", "")
            msg = f"Job {self.job_id} has failed.\nDetailed error message: {self.error}"
        else:
            msg = f"The job {self.job_id} has been cancelled by the user."
        logger.warning(msg)
        if QUBEC_SDK_TERMINATE_ON_FAIL:
            # Keep the full message on the instance as well, so the job
            # object describes the failure even if the caller catches the
            # exception.
            self.error = msg
            raise QubecSdkError(msg)

    def cancel(self) -> dict:
        """
        Send a remote cancellation request of the job

        On success the local status is set to "cancelled". A failing request
        is not propagated: it is logged as a warning and an empty dictionary
        is returned.

        Returns:
            A dictionary with the cancellation API response, or an empty
            dictionary if the request failed
        """
        client: QubecClient = get_client()
        logger.debug(SEPARATOR)
        try:
            content = client.cancel_job(self.job_id)
        except QubecSdkError as e:
            logger.warning(str(e))
            return {}
        logger.info(f"Successfully cancelled job {self.job_id}")
        self.status = "cancelled"
        return content

    def get_result(self, use_cached: bool = False) -> dict:
        """
        Get the available results of the current job

        Unless ``use_cached`` is set, the backend is queried and both
        ``self.result`` and ``self.status`` are updated from the response.

        Args:
            use_cached (bool): If true, do not perform any request and used cached result values

        Returns:
            A dictionary with the job results. An empty dictionary is returned
            when the request fails or the response lacks the expected keys.
        """
        if use_cached:
            return self.result
        try:
            client: QubecClient = get_client()
            client.refresh_session()
            data = client.progress_job(self.job_id)["data"]
            # Read both values before assigning so that a missing key cannot
            # leave the instance half-updated.
            result = data["result"]
            status = data["status"]
        except (KeyError, QubecSdkError):
            logger.warning(f"Cannot retrieve results of job {self.job_id}")
            return {}
        self.result = result
        self.status = status
        return self.result

    def get_error(self, use_cached: bool = False) -> Optional[str]:
        """
        Get the error message of the current job, if any

        Unless ``use_cached`` is set, the backend is queried and
        ``self.error`` is updated from the "error_msg" entry of the response
        (None when the entry is absent).

        Args:
            use_cached (bool): If true, do not perform any request and used cached error value

        Returns:
            A string with the error message, or None if there is no error
            message or it could not be retrieved
        """
        if use_cached:
            return self.error
        try:
            client: QubecClient = get_client()
            client.refresh_session()
            content = client.progress_job(self.job_id)
            self.error = content["data"].get("error_msg")
            return self.error
        except (KeyError, QubecSdkError):
            logger.warning(f"Cannot retrieve error for job {self.job_id}")
            return None

    def get_status(self, use_cached: bool = False) -> Optional[str]:
        """
        Get the current status of the job

        Args:
            use_cached (bool): If true, do not perform any request and used cached result values

        Returns:
            A string with the job status, or None if it could not be retrieved
        """
        # The cached path must not touch the client at all.
        if use_cached:
            return self.status
        try:
            client: QubecClient = get_client()
            client.refresh_session()
            content = client.progress_job(self.job_id)
            self.status = content["data"]["status"]
            return self.status
        except (KeyError, QubecSdkError):
            logger.warning(f"Cannot retrieve status of job {self.job_id}")
            return None

    def __str__(self) -> str:
        """
        Build a human-readable summary with the job status and results

        The results are fetched through get_result(), so this performs a
        request; if that yields nothing, the cached results are shown.
        """
        # Fetch first: get_result() also refreshes self.status, which must
        # happen before the status is formatted below.
        result = self.get_result() or self.result
        header = f"\n{SEPARATOR}\nFinal job status: {self.status}\nResults\n"
        return header + pprint.pformat(result)

    @staticmethod
    def from_id(job_id: str) -> "QubecJob":
        """
        Create a job handle for an existing job ID

        No request is performed here: the returned object starts with the
        default local state (status "initialized", empty result). Call
        get_status(), get_result() or wait_completion() to synchronise it
        with the backend.

        Args:
            job_id (str): The unique ID assigned by QUBEC to the job

        Returns:
            A QubecJob instance bound to ``job_id``
        """
        # TODO: populate experiment/backend/problem from the backend once an
        # API for retrieving them is available.
        return QubecJob(job_id)