# Source code for schrodinger.pipeline.pipeio
"""
Core Pipeline I/O classes (`Structures`, `Grid`, `Text`, and `PhaseDB`).
Copyright Schrodinger, LLC. All rights reserved.
"""
import os
import sys
from schrodinger import structure
# Constants for built-in IO types. Each PipeIO subclass in this module
# stores the matching value in its `type` attribute.
STRUCTURES = "structures"  # stored by Structures
GRID = "grid"  # stored by Grid
TEXT = "text"  # stored by Text
PHASEDB = "phasedb"  # stored by PhaseDB
class PipeIO:
    """
    Parent class for all Pipeline I/O classes. Subclasses hold data that is
    passed between stages during execution.

    The base implementations below are placeholders: apart from
    `getOutputPaths`, they do nothing and return None.
    """

    def getFiles(self):
        """
        Return a list of files representing this object.

        This method must be implemented in subclasses; the base
        implementation returns None.
        """

    def check(self):
        """
        Make sure that the object is valid. If it's not valid (e.g. file
        does not exist), raise a RuntimeError.

        This method must be implemented in subclasses; the base
        implementation performs no validation.
        """

    def isFilled(self):
        """
        Check whether the object is used or empty.

        This method must be implemented in subclasses; the base
        implementation returns None.
        """

    def getCount(self):
        """
        Return the number of items in this object.

        This method may be overridden in subclasses. The base
        implementation returns None.
        """
        return None

    def getOutputPaths(self):
        """
        Return the paths to use when copying user output.

        Subclasses may override for special behavior; by default this is
        whatever `getFiles` returns.
        """
        return self.getFiles()
class Structures(PipeIO):
    """
    A class to hold the names of structure files.

    The number of structures is cached in `_count`; None means that the
    count is not known (yet).
    """

    def __init__(self, ligs=None, count=None):
        """
        Initialize the object with a list of ligand files.
        The list can be replaced later with `setData`.

        :type ligs: list
        :param ligs: A list of ligand structure file names. Defaults to an
                empty list.
        :type count: int
        :param count: The number of structures in the files, if known.
        :raise RuntimeError:
                Raised if `ligs` is a single string or contains ".".
        """
        self.setData([] if ligs is None else ligs)
        self.type = STRUCTURES
        # setData() resets the count, so it is assigned afterwards.
        self._count = count

    def check(self):
        """
        Make sure all files in the list exist. Raise a RuntimeError if "."
        is in the list, and exit (sys.exit(1)) if any file can't be found.
        """
        if not self._structures:
            return
        if "." in self._structures:
            raise RuntimeError(
                "Error: pipeio.Structures: input needs to be a LIST of files!"
            )
        for f in self._structures:
            if os.path.exists(f):
                continue
            print("ERROR: stages.pipeio.Structures: File does not exist:", f)
            # isabs(), not abspath(): abspath() returns a non-empty (truthy)
            # string for every path, which made this hint print for
            # relative paths as well.
            if os.path.isabs(f):
                print(
                    "File is specified as absolute path; should be local path."
                )
            sys.exit(1)

    def __str__(self):
        """
        Return a string representation of the object.
        """
        if self._count:
            header = "[structures(%i)]:" % self._count
        else:
            header = "[structures]:"
        if not self._structures:
            return header + "\n      * Empty *"
        return header + "".join("\n      " + lig for lig in self._structures)

    def setData(self, ligfiles, count=None):
        """
        Replace the list of ligand files with `ligfiles`.

        :type ligfiles: list
        :param ligfiles: The replacement list of structure file names.
        :type count: int
        :param count: The number of structures in the files. A false value
                (None or 0) is stored as None, i.e. "unknown".
        :raise RuntimeError:
                Raised if `ligfiles` is a single string instead of a list,
                or if "." is in the list.
        """
        # A bare string would otherwise be iterated character by character;
        # the "." test alone only catches file names that contain a dot.
        if isinstance(ligfiles, str) or '.' in ligfiles:
            raise RuntimeError(
                "Structures.setData(): input must be a list of files.")
        # Copy, so later changes to the caller's list don't leak in.
        self._structures = list(ligfiles)
        self._count = count if count else None

    def getFiles(self):
        """
        Return the list of ligand file names after checking that all of them
        exist via the `check` method.
        """
        self.check()
        return self._structures

    def isFilled(self):
        """
        Return True if at least one structure file is stored.
        """
        return len(self._structures) > 0

    def getCount(self, count_if_needed=False):
        """
        Return the cached number of structures, or None if it is unknown.

        :type count_if_needed: bool
        :param count_if_needed: If True and the count is unknown, count the
                structures in the files first (see `count`).
        """
        if self._count is None and count_if_needed:
            self.count()
        return self._count

    def count(self):
        """
        Return the number of structures in the set.

        The cached value is returned when available; otherwise the
        per-file results of `structure.count_structures` are summed and
        cached.
        """
        if self._count is None:
            self._count = sum(
                structure.count_structures(filename)
                for filename in self._structures)
        return self._count
class Grid(PipeIO):
    """
    A class to hold a set of grid files (compressed or uncompressed).
    """

    # Suffixes appended to the grid base name to find the component files
    # of an uncompressed grid (see `getFiles`).
    _COMPONENT_SUFFIXES = (
        "_coul2.fld", ".csc", ".grd", "_greedy.save", ".gsc", ".save",
        ".site", "_vdw.fld"
    )

    def __init__(self, gridfile=None):
        """
        :type gridfile: str
        :param gridfile: The name of the grid file (for example,
                `<gridjobname>.grd` or `<gridjobname>.zip`). The value
                can be changed later with `setData`.
        :raise RuntimeError:
                Raised if the extension is neither `.grd` nor `.zip`.
        """
        self.setData(gridfile)
        self.type = GRID

    def check(self):
        """
        Check that the grid file exists. Nothing is checked if no grid
        file is set.

        :raise RuntimeError:
                Raised if the file is missing.
        """
        if not self._gridfile:
            return
        # Applies to both the .zip and the representative .grd file.
        if not os.path.isfile(self._gridfile):
            msg = "The following file is missing: %s" % self._gridfile
            raise RuntimeError(msg)

    def getPath(self):
        """
        Return the grid file name.
        """
        return self._gridfile

    def __str__(self):
        """
        Return a string representation of the object.
        """
        return "[grid]:" + "\n    " + str(self._gridfile)

    def setData(self, gridfile):
        """
        Replace the grid file name.

        :type gridfile: str
        :param gridfile:
                The replacement grid file name, or None to clear it.
        :raise RuntimeError:
                Raised if the extension is neither `.grd` nor `.zip`.
        """
        if gridfile is None:
            self._gridfile = None
            return
        extension = os.path.splitext(gridfile)[1]
        if extension not in ('.grd', '.zip'):
            msg = "VSW.Grid: Invalid grid path: %s" % gridfile
            raise RuntimeError(msg)
        self._gridfile = gridfile

    def getFiles(self):
        """
        Return a list of grid file names, expanded from the representative
        file name, after checking for their existence.

        For compressed grids, the `.zip` file is the only item returned,
        but for uncompressed grids, all the standard grid component file
        names that exist on disk are returned. An empty list is returned
        if no grid file is set.
        """
        if not self._gridfile:
            return []
        self.check()
        gridbase, extension = os.path.splitext(self._gridfile)
        if extension == '.zip':
            return [self._gridfile]
        # Uncompressed grid: keep only the components that are present.
        candidates = (gridbase + suffix for suffix in self._COMPONENT_SUFFIXES)
        return [filename for filename in candidates if os.path.isfile(filename)]

    def isFilled(self):
        """
        Return True if a grid file name is set.
        """
        return self._gridfile is not None
class Text(PipeIO):
    """
    A class to hold the names of one or more text (or log) files.
    """

    def __init__(self, files=None):
        """
        :type files: list
        :param files:
                A list of text or log file names. Defaults to an empty
                list. The file paths can be later replaced with setData().
        :raise RuntimeError:
                Raised if `files` is a single string or contains ".".
        """
        self.setData([] if files is None else files)
        self.type = TEXT

    def check(self):
        """
        Make sure all files in the list exist.

        :raise RuntimeError:
                Raised if "." is in the list or if any file can't be found.
        """
        if not self._files:
            return
        if '.' in self._files:
            raise RuntimeError(
                "Error: pipeio.Text: input needs to be a LIST of files!")
        for f in self._files:
            if not os.path.exists(f):
                raise RuntimeError(
                    "Error: pipeio.Text: file does not exist: " + f)

    def __str__(self):
        """
        Return a string representation of the object.
        """
        return "[text files]:" + "".join(
            "\n      " + filename for filename in self._files)

    def setData(self, textfiles):
        """
        Replace the list of file names with `textfiles`.

        The input is validated before anything is stored, so a rejected
        call leaves previously stored file names untouched.

        :type textfiles: list
        :param textfiles: The replacement list of file names.
        :raise RuntimeError:
                Raised if `textfiles` is a single string instead of a list,
                or if "." is in the list.
        """
        # A bare string would otherwise be iterated character by character;
        # the "." test alone only catches file names that contain a dot.
        if isinstance(textfiles, str) or '.' in textfiles:
            raise RuntimeError("Text.setData(): input must be a list of files.")
        # Copy, so later changes to the caller's list don't leak in.
        self._files = list(textfiles)

    def getFiles(self):
        """
        Return the list of text file names after checking that all of them
        exist.
        """
        self.check()
        return self._files

    def isFilled(self):
        """
        Return True if at least one file name is stored.
        """
        return len(self._files) > 0
class PhaseDB(PipeIO):
    """
    A class to hold the absolute pathname of a Phase database.

    Two formats are recognized: the new format is a directory whose path
    ends with ".phdb"; the old format is a base name for which the file
    `<path>_phasedb` exists.
    """

    def __init__(self, path=None, remote=False):
        """
        :type path: str
        :param path: The path to the Phase database. New-format databases
                end with ".phdb"; for the old format this is the base name
                without "_phasedb".
        :type remote: bool
        :param remote:
                A value to control whether the database should be checked for
                existence only on the remote host, since the localhost may not
                have access to that directory.
        """
        self.setData(path)
        self.type = PHASEDB
        self._remote = remote

    def check(self):
        """
        Make sure the database exists. Nothing is checked if no path is
        set or if the database is flagged as remote.

        :raise RuntimeError:
                Raised if the database can't be found.
        """
        if not self._path or self._remote:
            return
        if self._path.endswith(".phdb"):
            # New format: the database is a directory.
            found = os.path.isdir(self._path)
        else:
            # Old format: look for the "<path>_phasedb" file.
            found = os.path.isfile(self._path + '_phasedb')
        if not found:
            raise RuntimeError(
                'Error: pipeio.PhaseDB: Database does not exist: "%s"' %
                self._path)

    def getPath(self):
        """
        Returns the path (absolute base name).

        For the old format, does NOT include "_phasedb", for new format DOES
        include the ".phdb".
        """
        return self._path

    def __str__(self):
        """
        Return a string representation of the object.
        """
        return "[phasedb]:" + '\n      %s' % self._path

    def setData(self, path):
        """
        Replace the stored database path.

        :type path: str
        :param path: The replacement database path, or None to clear it.
        """
        self._path = path

    def getFiles(self):
        """
        Return the list of files, which is always empty for a Phase
        database.
        """
        # New format (".phdb"): it's a directory, not a file; listing it
        # would make the job record try to add it.
        # Old format: Ev:95999 - Avoid copying phasedb to launch directory
        # at the end of job since user has selected a preferred location
        # for it.
        return []

    def isFilled(self):
        """
        Return True if a database path is set.
        """
        return self._path is not None

    def getOutputPaths(self):
        """
        Return the paths to register when copying user output: the database
        path for the new (".phdb") format, otherwise an empty list.
        """
        if self._path and self._path.endswith(".phdb"):
            # When copying user output, return the path so it's registered
            # with jobcontrol
            return [self._path]
        return []