# Source code for schrodinger.pipeline.pipeutils
"""
Shared functions for Pipeline stages.
Copyright Schrodinger, LLC. All rights reserved.
"""
# Contributors: Matvey Adzhigirey
import os
import sys
from past.utils import old_div
from schrodinger import structure
from schrodinger.utils import fileutils
def countRoots(ligfiles, unique_field="s_m_title"):
    """
    Count the structures and the distinct compounds in the supplied files.

    Compounds are identified by the 'unique_field' property, and all
    structures that share the same 'unique_field' value are considered
    variants of the compound. A "." is written to stdout after every 1000
    structures read, as a progress indicator.

    :param ligfiles: Paths of the structure files to scan.
    :type ligfiles: iterable of str

    :param unique_field: Name of the property that identifies a compound.
        Both "title" and "s_m_title" select the structure title.
    :type unique_field: str

    :return: Tuple of the total number of structures (i.e., variants) and
        the total number of compounds.
    :rtype: (int, int)

    :raises RuntimeError: If creating a reader for a ligand file fails, or
        if a structure is missing the 'unique_field' property.
    """
    use_title = unique_field in ("title", "s_m_title")

    # Only the number of distinct roots is reported, so a set is enough.
    roots = set()
    st_num = 0
    for ligfile in ligfiles:
        # "except Exception" (rather than a bare except) so that
        # KeyboardInterrupt/SystemExit are not turned into RuntimeError.
        try:
            sts = structure.StructureReader(ligfile)
        except Exception:
            raise RuntimeError("Could not read file:" + ligfile)

        for st in sts:
            st_num += 1
            if st_num % 1000 == 0:
                sys.stdout.write(".")
                sys.stdout.flush()

            if use_title:
                try:
                    root = st.title
                except Exception:
                    raise RuntimeError("A ligand in file " + ligfile +
                                       " is missing a title!")
            else:
                try:
                    root = st.property[unique_field]
                except Exception:
                    raise RuntimeError("No field " + unique_field +
                                       " in file " + ligfile + " ligand " +
                                       str(st.title) + "!")
            roots.add(root)

    return (st_num, len(roots))
class BackwardsReader(object):
    """
    Read a file line by line, backwards.

    Takes in a file path; iterating over the instance yields the non-empty
    lines of the file (without their line terminators), last line first.

    The file is read as bytes in blocks of BLKSIZE and only complete lines
    are decoded. Seeking by byte offset in a text-mode file is not reliable
    (multi-byte characters and newline translation make character counts
    differ from byte counts), which is why binary mode is used. Lines are
    split on the "\\n" byte, so the encoding must be ASCII-compatible
    (e.g. ASCII, Latin-1, UTF-8).

    The instance can also be used as a context manager, which closes the
    file on exit.
    """

    BLKSIZE = 4096

    def __init__(self, filename, encoding=None):
        """
        :param filename: Path of the file to read.
        :type filename: str

        :param encoding: Encoding used to decode the lines. When None, the
            locale's preferred encoding is used. Undecodable bytes are
            replaced rather than raising.
        :type encoding: str or None
        """
        # Defined before open() so that close()/__del__ are safe even when
        # opening the file fails.
        self.fh = None
        self.buf = b""
        if encoding is None:
            import locale
            encoding = locale.getpreferredencoding(False)
        self.encoding = encoding
        self.fh = open(filename, "rb")  # Ev:134557
        self.fh.seek(0, os.SEEK_END)
        self.file_size = self.fh.tell()

    def _decode(self, raw):
        """Decode one raw line, dropping the "\\r" of a CRLF terminator."""
        if raw.endswith(b"\r"):
            raw = raw[:-1]
        return raw.decode(self.encoding, "replace")

    def __iter__(self):
        # Number of bytes, counted from the start of the file, that have
        # not been read yet. Blocks are consumed from the end of the file.
        remaining = self.file_size
        # Bytes of the line that starts in a block that is not read yet.
        partial = None
        while remaining > 0:
            toread = min(remaining, self.BLKSIZE)
            remaining -= toread
            self.fh.seek(remaining)
            self.buf = self.fh.read(toread)

            chunk = self.buf
            if partial is not None:
                # Re-attach the beginning of the previously read (later)
                # block; it either completes this block's last line or is
                # a line of its own when this block ends with a newline.
                chunk += partial
            pieces = chunk.split(b"\n")
            # The first piece may continue into the preceding block.
            partial = pieces[0]
            for piece in reversed(pieces[1:]):
                text = self._decode(piece)
                if text:
                    yield text

        if partial is not None:
            text = self._decode(partial)
            if text:
                yield text

    def close(self):
        """Close the underlying file. Safe to call more than once."""
        fh = getattr(self, "fh", None)
        if fh is not None:
            fh.close()

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc_value, traceback):
        self.close()
        return False

    def __del__(self):
        self.close()
def get_last_20_lines(logfile):
    """
    Build a printable report with the tail of a log file.

    The report consists of a header naming the file, a separator line, up
    to 20 lines taken from the end of the file (in file order, as yielded
    by BackwardsReader), and a closing separator line. If the path does not
    exist, a " NO LOG FILE" line is reported instead of the file contents.

    :param logfile: Path of the log file.
    :type logfile: str

    :return: The formatted report.
    :rtype: str
    """
    rule = "******************************************************************************\n"
    parts = [" Last 20 lines of %s:\n" % logfile, rule]

    if not os.path.exists(logfile):
        parts.append(" NO LOG FILE\n")
    else:
        # Collected newest-first; restored to file order when emitted.
        newest_first = []
        for text in BackwardsReader(logfile):
            newest_first.append(text)
            if len(newest_first) == 20:
                break
        parts.extend(text + '\n' for text in reversed(newest_first))

    parts.append(rule)
    return "".join(parts)
class DotPrinter:
    """
    Progress indicator that writes a period to stdout every N calls and,
    when the total number of items is known, a percentage from time to time.

    Example::

        dp = DotPrinter(total_sts)
        for st in sr:
            dp.dot()
    """

    def __init__(self, total_sts=None, every=1000):
        """
        :param total_sts: Total number of structures expected, if known.
            Percentages are printed only when this is set.
        :type total_sts: int or None

        :param every: Print a dot every this many calls to dot().
        :type every: int
        """
        self._curr_st = 0
        self._num_dots = 0  # Dots printed since the last percentage
        self.prev_percent = 0
        self._every = every
        self._total_sts = total_sts

    def dot(self):
        """
        Register one processed structure and print progress when due.

        :return: The number of structures registered so far.
        :rtype: int
        """
        self._curr_st += 1
        count = self._curr_st
        if count % self._every:
            # Not an every-Nth call: nothing to print.
            return count

        out = sys.stdout
        out.write(".")
        total = self._total_sts
        if total:
            self._num_dots += 1
            percent = int(count * 100 / total)
            # A percentage follows at least 10 dots, provided progress has
            # advanced, or is printed when the last structure is reached.
            # NOTE(review): this point is only reached on every-Nth calls,
            # so "100%" appears only when the total is a multiple of
            # `every` -- confirm that is intended.
            advanced = percent > self.prev_percent and self._num_dots >= 10
            finished = count == total
            if advanced or finished:
                out.write(str(percent) + "%")
                self.prev_percent = percent
                self._num_dots = 0
        out.flush()
        return count
# Ev:96648 & Ev: 104439
def read_unique_field(st, uniquefield):
    """
    Return the value of the specified property of the specified structure,
    converted to a string.

    If the property does not exist, the same property name is tried with
    each of the other type prefixes, in the order string ("s"), int ("i")
    and float ("r"); the first one found is returned. If none is available,
    the exception raised by the original lookup is re-raised.

    :param st: Structure to read from; must expose a `property` mapping.

    :param uniquefield: Name of the property, including its one-character
        type prefix (e.g. "s_m_title").
    :type uniquefield: str

    :return: The property value converted with str().
    :rtype: str

    :raises Exception: Whatever the lookup of 'uniquefield' itself raised,
        when no variant of the property could be read.
    """
    try:
        return str(st.property[uniquefield])
    except Exception as err:
        # Kept so the caller sees the failure for the name it asked for.
        orig_exception = err

    # Everything after the one-character type prefix.
    suffix = uniquefield[1:]
    for type_prefix in ("s", "i", "r"):
        candidate = type_prefix + suffix
        if candidate == uniquefield:
            # Already tried above.
            continue
        # "except Exception" (not a bare except) so that KeyboardInterrupt
        # and SystemExit are never silently swallowed here.
        try:
            return str(st.property[candidate])
        except Exception:
            continue

    raise orig_exception
def get_reader(filename, astext=False, sd_for_unknown=True, support_smi=True):
    """
    Create the reader object appropriate for the type of the given file.

    :param filename: Path of the structure file to open.
    :type filename: str

    :param astext: Returns a MaestroTextReader instance if specified file is
        a Maestro file and astext is True.
    :type astext: bool

    :param sd_for_unknown: Whether to treat files whose format is not
        recognized as SD.
    :type sd_for_unknown: bool

    :param support_smi: Whether to support SMILES and SMILESCSV formats.
        When False, such files are passed to StructureReader.
    :type support_smi: bool

    :return: A MaestroTextReader, SmilesReader, SmilesCsvReader or
        StructureReader instance for the file.
    """
    file_format = fileutils.get_structure_file_format(filename)
    if not file_format and sd_for_unknown:
        file_format = fileutils.SD

    if astext and file_format == fileutils.MAESTRO:
        return structure.MaestroTextReader(filename)

    if support_smi:
        if file_format == fileutils.SMILES:
            return structure.SmilesReader(filename)
        if file_format == fileutils.SMILESCSV:
            return structure.SmilesCsvReader(filename)

    # NOTE(review): the SD fallback chosen above is not passed on here;
    # StructureReader receives only the file name -- confirm that it opens
    # files with unknown extensions as SD on its own.
    return structure.StructureReader(filename)
# EOF