# Source code for schrodinger.pipeline.pipeutils
"""
Shared functions for Pipeline stages.
Copyright Schrodinger, LLC. All rights reserved.
"""
# Contributors: Matvey Adzhigirey
import os
import sys
from past.utils import old_div
from schrodinger import structure
from schrodinger.utils import fileutils
def countRoots(ligfiles, unique_field="s_m_title"):
    """
    Counts the number of compounds in the supplied files.  Compounds are
    identified by the 'unique_field' property, and all structures that share
    the same 'unique_field' value are considered variants of the compound.

    A "." is written to stdout after every 1000 structures read.

    :param ligfiles: Paths of the ligand files to read.
    :type ligfiles: iterable of str

    :param unique_field: Name of the property that identifies a compound.
            "title" and "s_m_title" both mean the structure title.
    :type unique_field: str

    :return: Total number of structures (i.e., variants) and total number
            of compounds.
    :rtype: tuple(int, int)

    :raises RuntimeError: If a reader cannot be created for a ligand file,
            or if a structure is missing the 'unique_field' property.
    """
    unique_field_is_title = unique_field in ("title", "s_m_title")

    # Only the number of distinct roots is reported, so a set is enough.
    ligand_roots = set()
    st_num = 0
    for ligfile in ligfiles:
        # "except Exception" (rather than a bare except) lets
        # KeyboardInterrupt and SystemExit propagate unchanged.
        try:
            sts = structure.StructureReader(ligfile)
        except Exception:
            raise RuntimeError("Could not read file:" + ligfile)

        for st in sts:
            st_num += 1
            if st_num % 1000 == 0:
                # Progress indicator: one dot per 1000 structures.
                sys.stdout.write(".")
                sys.stdout.flush()

            if unique_field_is_title:
                try:
                    root = st.title
                except Exception:
                    raise RuntimeError("A ligand in file " + ligfile +
                                       " is missing a title!")
            else:
                try:
                    root = st.property[unique_field]
                except Exception:
                    raise RuntimeError("No field " + unique_field +
                                       " in file " + ligfile + " ligand " +
                                       str(st.title) + "!")

            ligand_roots.add(root)

    return (st_num, len(ligand_roots))
class BackwardsReader(object):
    """
    Read a file line by line, backwards.
    Takes in a file path, returns an iterator class.

    The file is read as bytes in blocks of BLKSIZE, starting from the end,
    and every line is decoded separately.  Reading bytes keeps the block
    offsets exact: in text mode seek() takes byte positions while read()
    counts characters, so multi-byte characters or "\\r\\n" line endings
    would make consecutive blocks overlap.

    Lines are yielded without their line terminator, and blank lines are
    skipped.  Splitting is done on the newline byte, so the encoding must be
    ASCII-compatible (e.g. UTF-8 or Latin-1).

    The reader can be used as a context manager, or closed explicitly with
    close(); as a last resort the file is closed when the reader is
    garbage collected.
    """
    BLKSIZE = 4096

    def __init__(self, filename, encoding=None):
        """
        :param filename: Path of the file to read.
        :type filename: str

        :param encoding: Encoding used to decode the lines.  Defaults to the
                locale's preferred encoding.  Bytes that cannot be decoded
                are replaced instead of raising an error.
        :type encoding: str or None
        """
        import locale

        # Set before open() so that close() and __del__ stay safe when
        # open() raises.
        self.fh = None
        self.buf = b""  # Most recently read raw block
        self._encoding = encoding or locale.getpreferredencoding(False)
        self.fh = open(filename, "rb")  # Ev:134557
        self.fh.seek(0, os.SEEK_END)
        self.file_size = self.fh.tell()

    def _iter_raw_lines(self):
        """
        Yield the raw (bytes) lines of the file, last line first, without
        the trailing newline byte.
        """
        remaining = self.file_size  # Bytes in front of the blocks read so far
        carry = None  # Start of the earliest line seen; may still be partial
        while remaining > 0:
            toread = min(remaining, self.BLKSIZE)
            remaining -= toread
            self.fh.seek(remaining)
            self.buf = self.fh.read(toread)
            lines = self.buf.split(b"\n")
            if carry is not None:
                # The last piece of this block and the first piece of the
                # following block belong to the same line.  When this block
                # ends with a newline the last piece is empty, so the carried
                # line is passed through unchanged.
                lines[-1] += carry
            carry = lines[0]
            for idx in range(len(lines) - 1, 0, -1):
                yield lines[idx]
        if carry is not None:
            yield carry

    def __iter__(self):
        for raw in self._iter_raw_lines():
            # Text mode used to translate "\r\n" to "\n"; do the same here.
            if raw.endswith(b"\r"):
                raw = raw[:-1]
            if raw:
                yield raw.decode(self._encoding, "replace")

    def close(self):
        """
        Close the underlying file.  Safe to call more than once.
        """
        fh = getattr(self, "fh", None)
        if fh is not None:
            fh.close()

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc_value, traceback):
        self.close()

    def __del__(self):
        self.close()
def get_last_20_lines(logfile):
    """
    Given a log file, returns a string of last 20 lines of it.

    The result starts with a header naming the file and is framed by two
    separator lines.  If the file does not exist, "NO LOG FILE" is reported
    in place of the lines.

    :param logfile: Path of the log file.
    :type logfile: str

    :return: The formatted message.
    :rtype: str
    """
    border = "******************************************************************************\n"
    parts = ["           Last 20 lines of %s:\n" % logfile, border]
    if os.path.exists(logfile):
        reader = BackwardsReader(logfile)
        tail = []
        try:
            # The reader yields the last line first; stop after 20 lines.
            for line in reader:
                tail.append(line)
                if len(tail) == 20:
                    break
        finally:
            # Release the file handle right away instead of waiting for the
            # reader to be garbage collected.
            reader.fh.close()
        # Restore the original (top to bottom) order.
        tail.reverse()
        parts.extend(line + '\n' for line in tail)
    else:
        parts.append("  NO LOG FILE\n")
    parts.append(border)
    return "".join(parts)
class DotPrinter(object):
    """
    Class for printing a progress period or percentage every N number of iterations.

    Example:

    dp = DotPrinter(total_sts)
    for st in sr:
        dp.dot()
    """

    def __init__(self, total_sts=None, every=1000):
        """
        :param total_sts: Total number of structures, if known.  Percentages
                are only printed when this is given (and non-zero).
        :type total_sts: int or None

        :param every: Print a dot every N structures.
        :type every: int
        """
        self._total_sts = total_sts
        self._every = every  # Print dot every N structures
        self.prev_percent = 0  # Last percentage that was printed
        self._num_dots = 0  # Dots printed since the last percentage
        self._curr_st = 0  # Number of dot() calls so far

    def dot(self):
        """
        Register one more processed structure.

        Every 'every' calls a "." is written to stdout.  If the total number
        of structures is known, the percentage done is written as well once
        at least 10 dots were printed since the previous percentage and the
        percentage has increased, or when the last structure is reached.

        :return: Number of structures registered so far.
        :rtype: int
        """
        self._curr_st += 1
        if self._curr_st % self._every == 0:
            sys.stdout.write(".")
            if self._total_sts:  # If total number of structures is known
                self._num_dots += 1
                # Floor division keeps the arithmetic exact for large counts.
                new_percent = int(self._curr_st * 100 // self._total_sts)
                enough_progress = (new_percent > self.prev_percent and
                                   self._num_dots >= 10)
                # NOTE(review): this check is only reached on a dot boundary,
                # so the final percentage is not printed when the total is
                # not a multiple of 'every' - confirm whether that is
                # intended.
                at_end = self._curr_st == self._total_sts
                if enough_progress or at_end:
                    sys.stdout.write(str(new_percent) + "%")
                    self.prev_percent = new_percent
                    self._num_dots = 0
            sys.stdout.flush()
        return self._curr_st
# Ev:96648 & Ev: 104439
def read_unique_field(st, uniquefield):
    """
    Returns the value of the specified property for the specified st
    (converted to string). If the property does not exist, attempts to
    read the same property of different type (string/int/float).
    If neither is available, re-raises the missing property exception.

    :param st: Structure whose 'property' mapping is read.

    :param uniquefield: Name of the property.  Its first character is taken
            to be the type prefix that gets swapped for "s", "i" and "r".
    :type uniquefield: str

    :return: The property value converted to a string.
    :rtype: str

    :raises Exception: The exception raised by the lookup of 'uniquefield'
            itself, if none of the variants could be read.
    """
    try:
        return str(st.property[uniquefield])
    except Exception as err:
        # Kept so that the caller sees the error for the name it asked for,
        # not for one of the fallback names.
        orig_exception = err

    suffix = uniquefield[1:]
    for type_prefix in ("s", "i", "r"):
        candidate = type_prefix + suffix
        if candidate == uniquefield:
            continue  # Already tried above
        try:
            return str(st.property[candidate])
        except Exception:
            # This variant is not available either; try the next type.
            continue

    raise orig_exception
def get_reader(filename, astext=False, sd_for_unknown=True, support_smi=True):
    """
    Return a reader object for the given structure file, chosen by the
    file's format.

    :param filename: Path of the structure file.
    :type filename: str

    :param astext: Returns a MaestroTextReader instance if specified file is
            a Maestro file and astext is True.
    :type astext: bool

    :param sd_for_unknown: Whether to open files with unknown extensions as SD.
    :type sd_for_unknown: bool

    :param support_smi: Whether to support SMILES and SMILESCSV formats.
    :type support_smi: bool

    :return: A MaestroTextReader, SmilesReader, SmilesCsvReader or
            StructureReader for the file.
    """
    file_format = fileutils.get_structure_file_format(filename)
    if sd_for_unknown and not file_format:
        # Unrecognized files are treated as SD.
        # NOTE(review): this value is only compared against below; it is not
        # passed on to StructureReader - confirm that StructureReader handles
        # unknown extensions as intended.
        file_format = fileutils.SD

    if astext and file_format == fileutils.MAESTRO:
        return structure.MaestroTextReader(filename)

    if support_smi:
        if file_format == fileutils.SMILES:
            return structure.SmilesReader(filename)
        if file_format == fileutils.SMILESCSV:
            return structure.SmilesCsvReader(filename)

    # Every other format goes through the generic reader.
    return structure.StructureReader(filename)
# EOF