"""
PathFinder helper functions for reading and writing files using RDKit Mol
objects.
"""
import collections
import copy
import csv
import gzip
import heapq
import io
from contextlib import ExitStack
import json
import os
import psutil
import shutil
import sys
import tempfile
import zipfile
import more_itertools
from rdkit import Chem
from schrodinger import structure
from schrodinger.structutils import smiles as smiles_mod
from schrodinger.thirdparty import rdkit_adapter
from schrodinger.utils import fileutils
from schrodinger.utils import log
from schrodinger.utils.fileutils import open_maybe_compressed
# Output logger shared by all helpers in this module.
logger = log.get_output_logger('pathfinder')
# Number of file handles held back from the process soft limit when deciding
# how many files can be merged at once. Empirically, using max - 3 raises
# errors while max - 4 seems to run fine.
MAX_FILE_HANDLE_PADDING = 4
# Arbitrary number of file handles allowed for Windows; also the starting
# value used before the actual limit is queried on other platforms.
DEFAULT_MAX_FILE_HANDLES = 512
# Filename extension for PathFinder reactant files
PFX = '.pfx'
# Names of the members stored inside a .pfx zip archive: the JSON metadata
# and the CSV file holding the structures.
METADATA = 'metadata.json'
STRUCTURES = 'structures.csv'
class MolWriter(structure.StructureWriter):
    """
    StructureWriter subclass whose `append` accepts RDKit Mol objects. Each
    Mol is converted to a Structure and, if requested, given 3D coordinates
    before being written.
    """

    def __init__(self,
                 filename,
                 generate_coordinates=True,
                 require_stereo=False):
        """
        :param filename: output file, passed on to StructureWriter
        :type filename: str

        :param generate_coordinates: generate a 3D conformation for each
            molecule before writing it
        :type generate_coordinates: bool

        :param require_stereo: forwarded to `generate3dConformation` when
            coordinates are generated
        :type require_stereo: bool
        """
        super().__init__(filename)
        self.require_stereo = require_stereo
        self.generate_coordinates = generate_coordinates

    def append(self, mol):
        """
        Convert `mol` to a Structure and write it.

        :param mol: molecule to write
        :type mol: rdkit.Chem.rdchem.Mol
        """
        converted = rdkit_adapter.from_rdkit(mol)
        if self.generate_coordinates:
            converted.generate3dConformation(
                require_stereo=self.require_stereo)
        super().append(converted)
class StructureReaderAdapter:
    """
    Adapter around a source of Structure objects. Iterating over it yields
    the corresponding RDKit Mol objects; used as a context manager, it
    closes the wrapped reader on exit.
    """

    def __init__(self, reader, implicitH=True):
        """
        :param reader: source of structures to convert
        :type reader: iterable of Structure

        :param implicitH: use implicit hydrogens
        :type implicitH: bool
        """
        self.implicitH = implicitH
        self.reader = reader

    def __enter__(self):
        return self

    def __exit__(self, type, value, tb):
        # `reader` may be a plain iterable (e.g. a list of Structure) with
        # no close() method; that case is silently tolerated.
        try:
            self.reader.close()
        except AttributeError:
            pass

    def __iter__(self):
        # Structures that fail conversion are logged as warnings and
        # skipped rather than aborting the iteration.
        for structure_obj in self.reader:
            try:
                yield rdkit_adapter.to_rdkit(structure_obj,
                                             implicitH=self.implicitH,
                                             include_coordinates=False)
            except (ValueError, RuntimeError) as err:
                logger.warning(err)
class BaseCsvMolReader:
    """
    Parent class for CsvMolReader and CsvMolIterator.

    Opens the file (or adopts a file-like object), reads the header line and
    provides `_parseLine` to turn a single CSV line into a Mol.
    """

    def __init__(self, file):
        """
        :param file: CSV filename (file may be compressed) or file-like object.
        """
        if hasattr(file, 'read'):
            self.fh = file
        else:
            self.fh = open_maybe_compressed(file, 'rt')
        header = self.fh.readline()
        self.fieldnames = next(csv.reader([header]))
        # SMILES and NAME get special treatment in _parseLine; unnamed
        # columns cannot be stored as properties.
        skipped = {'SMILES', 'NAME', ''}
        self.propnames = [f for f in self.fieldnames if f not in skipped]

    def __enter__(self):
        return self

    def __exit__(self, type, value, tb):
        self.close()

    def close(self):
        """
        Close the underlying file handle.
        """
        self.fh.close()

    def _parseLine(self, line):
        """
        Convert one CSV line into a Mol.

        Every non-empty column other than SMILES/NAME is stored as a
        molecule property; the molecule name is taken from the NAME column
        or, failing that, from s_m_title.

        :param line: a single line of the CSV file
        :type line: str

        :return: the molecule, or None if the line is blank or the SMILES
            cannot be parsed
        :rtype: rdkit.Chem.rdchem.Mol or None

        :raise KeyError: if the row has no value for the SMILES column
        """
        vals = next(csv.reader([line]))
        if not vals:
            # Blank line (e.g. trailing newline at the end of the file):
            # treat it like an unparseable record instead of failing.
            return None
        row = dict(zip(self.fieldnames, vals))
        mol = Chem.MolFromSmiles(row['SMILES'])
        if mol is None:
            return None
        for prop in self.propnames:
            # zip() truncates rows that are shorter than the header, so a
            # trailing column may legitimately be missing from `row`.
            val = row.get(prop)
            if val:
                mol.SetProp(prop, val)
        for prop in ['NAME', 's_m_title']:
            if prop in row:
                mol.SetProp('_Name', row[prop])
                break
        return mol
class CsvMolReader(BaseCsvMolReader):
    """
    Random-access reader for SMILES CSV files that returns Mol objects.

    Comparable to RDKit's SmilesMolSupplier with delimiter=',', but parsing
    is done with the csv module rather than by naively splitting on commas,
    so field values may contain commas as long as they are quoted following
    the CSV convention. Multi-line records are still not supported, for
    efficiency reasons.

    Gzip-compressed files (identified by the filename ending in "gz") are
    supported as well.

    The reader behaves like a list: on instantiation the whole file is read
    and kept in memory. For a CSV file having only SMILES and an ID, this
    takes about 100 MB per million entries.
    """

    def __init__(self, file):
        """
        :param file: CSV filename (file may be compressed) or file-like object.
        """
        super().__init__(file)
        # The header has already been consumed by the parent class; keep
        # the remaining raw lines and release the file handle right away.
        with self.fh:
            self.lines = self.fh.readlines()

    def __len__(self):
        """
        Return the number of lines read after the header.
        """
        return len(self.lines)

    def __getitem__(self, index):
        raw_line = self.lines[index]
        return self._parseLine(raw_line)
class CsvMolIterator(BaseCsvMolReader):
    """
    Streaming reader for SMILES CSV files that returns Mol objects.

    In contrast to CsvMolReader there is no random access; lines are pulled
    from the file one at a time, so memory use is minimal.
    """

    def __iter__(self):
        return self

    def __next__(self):
        # StopIteration from the file handle ends the iteration.
        raw_line = next(self.fh)
        return self._parseLine(raw_line)
class CsvMolWriter:
    """
    Write a CSV file given Mol objects, using a StructureWriter-like API. The
    first two columns are the SMILES and title, and the rest are the properties
    of the molecule.

    - We don't use structure.SmilesCsvWriter because it is too slow due to all
      the conversions (the overall job takes 4 times as long, so the bottleneck
      clearly becomes the writing of the output file!).

    - We don't use Chem.SmilesWriter because even though it can use comma as a
      delimiter, it doesn't write proper CSV files because it doesn't know how
      to escape the delimiter.

    Also, gzip-compressed files (identified by the filename ending in "gz") are
    supported.
    """

    def __init__(self, filename, properties=None, cxsmiles=False):
        """
        :param filename: file to write
        :type filename: str or file-like object

        :param properties: optional, list of names of properties to write to
            output file. If None, all the properties are written. (CAVEAT: if
            `filename` is a file object rather than an actual filename, only the
            properties present in the first molecule are written.)
        :type properties: list of str or None

        :param cxsmiles: when writing SMILES, use CXSMILES extensions
        :type cxsmiles: bool
        """
        # Only meaningful when writing to a named file; see the else branch.
        self.filename = None
        self.suffix = None
        if hasattr(filename, 'write'):
            self.fh = filename
            # Unionizing properties requires reopening/merging files by
            # name, which is impossible with a bare file object.
            self.unionize_props = False
        else:
            self.fh = open_maybe_compressed(filename, 'wt', newline='')
            self.unionize_props = properties is None
            self.filename = filename
            _, self.suffix = fileutils.splitext(self.filename)
        self._writer = None
        self.written_count = 0
        self.properties = properties
        self.cxsmiles = cxsmiles
        # Temporary files holding molecules whose property set differs from
        # the one used for the header of the previous file.
        self.tmpfiles = []

    def append(self, mol):
        """
        Write a molecule to the file. The first time this is called, the header
        row is written based on mol's properties or the properties passed to
        __init__, if any.

        When properties are being unionized and `mol` has a different
        property list than the current header, writing continues in a new
        temporary file that is merged back in `close`.

        :param mol: molecule
        :type mol: rdkit.Chem.rdchem.Mol
        """
        props = self._getProps(mol)
        props_list = list(props)
        if (self._writer and self.unionize_props and
                props_list != self.properties):
            self.properties = props_list
            self._openTmp()
        if self._writer is None:
            if self.properties is None:
                self.properties = list(props)
            self._initWriter(self.properties)
        props['SMILES'] = self.toSmiles(mol)
        # Molecules without a title (e.g. read from a CSV file lacking a
        # NAME column) are written with an empty name instead of failing.
        props['NAME'] = mol.GetProp('_Name') if mol.HasProp('_Name') else ''
        self._writer.writerow(props)
        self.written_count += 1

    def toSmiles(self, mol):
        """
        Return the SMILES (or CXSMILES, if enabled) for `mol`.

        :param mol: molecule
        :type mol: rdkit.Chem.rdchem.Mol

        :rtype: str
        """
        if self.cxsmiles:
            # Remove atom properties added by reaction because they are fairly
            # useless in a CXSmiles and take a lot of space.
            new_mol = remove_react_atom_props(mol)
            return Chem.MolToCXSmiles(new_mol)
        else:
            return Chem.MolToSmiles(mol)

    def _getProps(self, mol):
        """
        Return a dictionary of molecule properties after some munging. Property
        names are renamed to follow the Schrodinger convention, and float values
        are rounded for cosmetic reasons.

        :param mol: molecule
        :type mol: rdkit.Chem.rdchem.Mol

        :return: molecule properties
        :rtype: dict
        """
        raw_props = rdkit_adapter.translate_rdkit_props_dict(
            mol.GetPropsAsDict())
        # We reduce float precision because RDKit produces values such as
        # 320.41100000000006 where we would rather see 320.411.
        props = {}
        for name, val in raw_props.items():
            if name.startswith('r_'):
                props[name] = round(val, 6)
            else:
                props[name] = val
        return props

    def _initWriter(self, props):
        """
        Initialize the underlying CSV writer, using 'props' to write the header
        row. "SMILES" and "NAME" are always added as the first two columns;
        the remaining columns are sorted by name.

        :param props: property names
        :type props: iterable of str
        """
        fields = ['SMILES', 'NAME'] + sorted(props)
        self._writer = csv.DictWriter(self.fh, fields, extrasaction='ignore')
        self._writer.writeheader()

    def _openTmp(self):
        """
        Close the current file and continue writing into a fresh temporary
        file in the current directory. The CSV writer is reset so that the
        next `append` writes a new header.
        """
        logger.debug(f'Extended properties: {self.properties}')
        # The NamedTemporaryFile is only used to reserve a unique name with
        # the right suffix; the file is reopened below.
        with tempfile.NamedTemporaryFile(dir='.',
                                         suffix=self.suffix,
                                         delete=False) as fh:
            tmpname = fh.name
        self.fh.close()
        self._writer = None
        self.fh = open_maybe_compressed(tmpname, 'wt', newline='')
        self.tmpfiles.append(tmpname)

    def __enter__(self):
        return self

    def __exit__(self, type, value, tb):
        self.close()

    def close(self):
        """
        Close the output file and, if temporary files were created, merge
        them into the final output file. Safe to call more than once.
        """
        self.fh.close()
        if self.tmpfiles:
            self._mergeTmpfiles()

    def _mergeTmpfiles(self):
        """
        Merge the original output file and all temporary files into
        `self.filename`, writing the union of their columns, then delete the
        intermediate files.
        """
        with tempfile.NamedTemporaryFile(dir='.',
                                         suffix=self.suffix,
                                         delete=False) as fh:
            tmpname = fh.name
        # Move the first part out of the way so the merged result can take
        # over the requested filename.
        fileutils.force_rename(self.filename, tmpname)
        self.tmpfiles.insert(0, tmpname)
        logger.debug(f'Merging tmpfiles: {self.tmpfiles}')
        merge_handler = CsvMergeHandler(self.tmpfiles,
                                        self.filename,
                                        dedup_field='SMILES')
        merge_files_in_memory(self.tmpfiles,
                              self.filename,
                              merge_handler,
                              dedup=False)
        fileutils.force_remove(*self.tmpfiles)
        # Forget the removed files so that a second close() (for example an
        # explicit close() inside a `with` block) does not try to merge
        # them again.
        self.tmpfiles = []
class BasePfxMolReader:
    """
    Parent class for PfxMolReader and PfxMolIterator.

    Subclasses must define the class attribute `csv_mol_reader_class`, which
    is instantiated with a text handle on the structures member of the
    archive.
    """

    def __init__(self, filename):
        """
        :param filename: name of the .pfx (zip) file to read
        :type filename: str
        """
        self.zipfh = zipfile.ZipFile(filename, 'r')
        try:
            fh = io.TextIOWrapper(self.zipfh.open(STRUCTURES))
            self.csv_mol_reader = self.csv_mol_reader_class(fh)
        except Exception:
            # Don't leak the archive handle if the structures member is
            # missing or cannot be read.
            self.zipfh.close()
            raise

    def __enter__(self):
        return self

    def __exit__(self, type, value, tb):
        self.close()

    def close(self):
        """
        Close the CSV reader and the enclosing zip archive.
        """
        try:
            self.csv_mol_reader.close()
        finally:
            self.zipfh.close()
class PfxMolReader(BasePfxMolReader):
    """
    Random-access reader for PFX (PathFinder reactants) files, which are
    really zip archives containing a CSV file and a metadata JSON file.

    As with CsvMolReader, entries can be accessed by index like a list. The
    file is read in full upon instantiation and kept in memory; for a file
    having only SMILES and an ID, this takes about 100 MB per million
    entries.
    """
    csv_mol_reader_class = CsvMolReader

    def __len__(self):
        """
        Return the length reported by the underlying CSV reader.
        """
        inner = self.csv_mol_reader
        return len(inner)

    def __getitem__(self, index):
        inner = self.csv_mol_reader
        return inner[index]
class PfxMolIterator(BasePfxMolReader):
    """
    Streaming reader for PFX (PathFinder reactants) files, which are really
    zip archives containing a CSV file and a metadata JSON file.

    In contrast to PfxMolReader there is no random access; only one line is
    held in memory at a time, so memory use is minimal.
    """
    csv_mol_reader_class = CsvMolIterator

    def __iter__(self):
        return self

    def __next__(self):
        # Delegates to the wrapped CsvMolIterator, including StopIteration.
        inner = self.csv_mol_reader
        return next(inner)
class PfxMolWriter:
    """
    Writer for PFX (PathFinder reactants) files. These are really zip archives
    containing a CSV file and a metadata JSON file.
    """

    def __init__(self, filename, properties=None):
        """
        :param filename: file to write
        :type filename: str

        :param properties: optional, list of names of properties to write to
            output file. If None, all the properties present on the first
            structure will be written (the assumption is that all molecules
            will have the same properties, or at least that the first
            molecule has all the properties that we care about).
        :type properties: list of str or None
        """
        self._closed = False
        self.zipfh = zipfile.ZipFile(filename,
                                     'w',
                                     compression=zipfile.ZIP_DEFLATED)
        fh = io.TextIOWrapper(self.zipfh.open(STRUCTURES, 'w'), newline='')
        self.csv_mol_writer = CsvMolWriter(fh, properties)

    def append(self, mol):
        """
        Write a molecule to the file.

        :param mol: molecule
        :type mol: rdkit.Chem.rdchem.Mol
        """
        self.csv_mol_writer.append(mol)

    @property
    def written_count(self):
        """
        Number of molecules written so far, as tracked by the CSV writer.
        """
        return self.csv_mol_writer.written_count

    def _writeMetadata(self):
        """
        Store the number of written molecules in the metadata member of the
        archive.
        """
        metadata = {'size': self.written_count}
        with io.TextIOWrapper(self.zipfh.open(METADATA, 'w')) as fh:
            json.dump(metadata, fh)
            fh.write('\n')

    def __enter__(self):
        return self

    def __exit__(self, type, value, tb):
        self.close()

    def close(self):
        """
        Finish the structures member, write the metadata and close the
        archive. Calling this more than once is a no-op after the first call.
        """
        if self._closed:
            # A second close (e.g. explicit close() inside a `with` block)
            # would otherwise try to write metadata into a closed archive.
            return
        self._closed = True
        try:
            # The structures member must be closed before another member
            # can be opened for writing in the same archive.
            self.csv_mol_writer.close()
            self._writeMetadata()
        finally:
            self.zipfh.close()
class RdkitMolWriter:
    """
    Write Mol objects to a file using the RDKit file-writing classes, but
    with a StructureWriter-like API. Supports SMILES and SDF.
    """

    def __init__(self, filename, v3000=False):
        """
        :param filename: filename to write
        :type filename: str

        :param v3000: when writing SD, force the use of the V3000 format
        :type v3000: bool

        :raise ValueError: if the file type is neither SD nor SMILES
        """
        self.rdkit_writer = None
        self.fh = None
        # Decide on the format before touching the filesystem, so that an
        # unsupported filename neither truncates an existing file nor leaves
        # an open handle behind.
        is_sd = fileutils.is_sd_file(filename)
        if not is_sd and not fileutils.is_smiles_file(filename):
            raise ValueError("Unsupported output file type.")
        if fileutils.is_gzipped_structure_file(filename):
            # gzip.open defaults to binary mode; the RDKit writers produce
            # text, so the stream must be opened in text mode explicitly.
            self.fh = gzip.open(filename, 'wt')
        else:
            self.fh = open(filename, 'w')
        try:
            if is_sd:
                self.rdkit_writer = Chem.SDWriter(self.fh)
                self.rdkit_writer.SetForceV3000(v3000)
            else:
                self.rdkit_writer = Chem.SmilesWriter(self.fh,
                                                      includeHeader=False,
                                                      isomericSmiles=True)
        except Exception:
            self.fh.close()
            raise

    def __enter__(self):
        return self

    def __exit__(self, type, value, tb):
        self.close()

    @property
    def written_count(self):
        """
        Number of molecules written so far, as reported by the RDKit writer.
        """
        return self.rdkit_writer.NumMols()

    def append(self, mol):
        """
        Write a molecule to the file.

        :param mol: molecule
        :type mol: rdkit.Chem.rdchem.Mol
        """
        self.rdkit_writer.write(mol)

    def close(self):
        """
        Close the RDKit writer and then the underlying file handle.
        """
        try:
            self.rdkit_writer.close()
        finally:
            # Release the file handle even if the writer fails to close.
            self.fh.close()
class NoneSkipper:
    """
    Wrapper around a mol supplier. Iterating over it yields only the
    entries that are not `None`; it also acts as a context manager that
    closes the supplier on exit when the supplier can be closed.
    """

    def __init__(self, supplier):
        """
        :param supplier: supplier of molecules
        :type supplier: iterable of Mol
        """
        self.supplier = supplier

    def __enter__(self):
        return self

    def __exit__(self, type, value, tb):
        # Suppliers without a close() method (e.g. a plain list) are fine.
        try:
            self.supplier.close()
        except AttributeError:
            pass

    def __iter__(self):
        for candidate in self.supplier:
            if candidate is None:
                continue
            yield candidate

    def __len__(self):
        """
        Return the length of the wrapped supplier, `None` entries included.
        Only works if the supplier itself supports len().
        """
        wrapped = self.supplier
        return len(wrapped)
def get_mol_writer(filename,
                   generate_coordinates=True,
                   require_stereo=False,
                   v3000=False,
                   cxsmiles=False):
    """
    Pick a StructureWriter-like object for `filename`: MolWriter for Maestro
    files, CsvMolWriter for CSV files (plain or gzipped), and the
    RDKit-based RdkitMolWriter for everything else.

    :param filename: filename to write
    :type filename: str

    :param generate_coordinates: generate 3D coordinates (only passed to the
        Maestro writer)
    :type generate_coordinates: bool

    :param require_stereo: when generating coordinates, fail when there's
        unspecified stereochemistry, instead of producing an arbitrary isomer
    :type require_stereo: bool

    :param v3000: when writing SD, force the use of the V3000 format
    :type v3000: bool

    :param cxsmiles: when writing SMILES, use CXSMILES extensions (only
        passed to the CSV writer)
    :type cxsmiles: bool

    :return: writer object with `append` and `close` methods
    """
    if fileutils.is_maestro_file(filename):
        return MolWriter(filename,
                         generate_coordinates=generate_coordinates,
                         require_stereo=require_stereo)
    if fileutils.is_csv_file(filename) or is_csvgz(filename):
        return CsvMolWriter(filename, cxsmiles=cxsmiles)
    return RdkitMolWriter(filename, v3000=v3000)
def get_mol_reader(filename, skip_bad=True, implicitH=True, random_access=True):
    """
    Return a Mol reader given a filename or a SMILES string.

    If `filename` is an existing file, the reader depends on its format:
    StructureReader wrapped in a StructureReaderAdapter for Maestro, RDKit's
    SmilesMolSupplier for SMILES, CsvMolReader/CsvMolIterator for CSV,
    PfxMolReader/PfxMolIterator for PFX and RDKit's SDMolSupplier for SD.
    Otherwise `filename` is parsed as a SMILES string and a reader holding
    that single molecule is returned.

    :param filename: filename or SMILES string
    :type filename: str

    :param skip_bad: if True, the supplier is wrapped in a NoneSkipper so bad
        structures are skipped implicitly instead of being yielded as None
        (does not apply to Maestro files)
    :type skip_bad: bool

    :param implicitH: use implicit hydrogens (only has an effect when reading
        Maestro files)
    :type implicitH: bool

    :param random_access: if False, the reader object can only be used as
        an iterator, and the file is not read in memory all at once. (Only
        applies to CSV and PFX.)
    :type random_access: bool

    :rtype: Generator or Sequence of Mol

    :raise ValueError: if the file format is not supported
    :raise IOError: if `filename` is neither a file nor a valid SMILES
    """
    if not os.path.isfile(filename):
        mol = Chem.MolFromSmiles(filename)
        if mol is None:
            raise IOError(f"'{filename}' must be either a valid filename "
                          "or a valid SMILES")
        # We know mol is not None, but NoneSkipper also turns the list into
        # a context manager, which some callers expect.
        return NoneSkipper([mol])

    # PFX and gzipped CSV are recognized by name before falling back to the
    # generic structure file format detection.
    if is_pfx(filename):
        fmt = PFX
    elif is_csvgz(filename):
        fmt = fileutils.SMILESCSV
    else:
        fmt = fileutils.get_structure_file_format(filename)
    logger.debug("Opening %s", filename)

    if fmt == fileutils.MAESTRO:
        st_reader = structure.StructureReader(filename)
        return StructureReaderAdapter(st_reader, implicitH)

    if fmt == fileutils.SMILES:
        supplier = Chem.SmilesMolSupplier(filename,
                                          delimiter=' ',
                                          titleLine=False,
                                          nameColumn=1)
    elif fmt == fileutils.SMILESCSV:
        csv_cls = CsvMolReader if random_access else CsvMolIterator
        supplier = csv_cls(filename)
    elif fmt == PFX:
        pfx_cls = PfxMolReader if random_access else PfxMolIterator
        supplier = pfx_cls(filename)
    elif fmt == fileutils.SD:
        supplier = Chem.SDMolSupplier(filename)
    else:
        raise ValueError(f"Unsupported file format: {fmt}")

    return NoneSkipper(supplier) if skip_bad else supplier
def get_mol(target, implicitH=True):
    """
    Read a Mol from a file or a SMILES string. Only the first entry of the
    reader is returned; for SMILES/CSV/SD/PFX input it may be None if that
    entry could not be parsed, because bad structures are not skipped.

    :param target: filename or SMILES
    :type target: str

    :param implicitH: use implicit hydrogens (only has an effect when reading
        Maestro files)
    :type implicitH: bool

    :rtype: rdkit.Chem.Mol

    :raise StopIteration: if the reader yields no entries
    """
    reader = get_mol_reader(target, skip_bad=False, implicitH=implicitH)
    try:
        return next(iter(reader))
    finally:
        # Release the underlying file once the first molecule is taken. Not
        # every reader returned by get_mol_reader is a context manager, so
        # look the method up instead of using a `with` statement.
        release = getattr(reader, '__exit__', None)
        if release is not None:
            release(None, None, None)
def combine_output_files(outfiles,
                         out,
                         dedup=True,
                         sort=False,
                         union_csv_columns=False):
    """
    Write the final output file from the subjob output files. Files that do
    not exist are reported with a warning and left out.

    :param outfiles: subjob output filenames
    :type outfiles: list of str

    :param out: output filename
    :type out: str

    :param dedup: skip duplicate products
    :type dedup: bool

    :param sort: sort output (implies the subjob output is sorted)
    :type sort: bool

    :param union_csv_columns: if csv, union infile columns.
    :type union_csv_columns: bool
    """
    logger.info("Combining subjob output files into %s." % out)
    if dedup:
        logger.info(
            "Duplicate products will be removed, possibly resulting "
            "in a smaller number of products than originally requested.")

    # partition() returns the items failing the predicate first.
    absent, present = more_itertools.partition(os.path.isfile, outfiles)
    missing = list(absent)
    existing = list(present)
    if missing:
        logger.warning('Missing output files:')
        for fname in missing:
            logger.warning(f'    {fname}')

    # ENUM-409: If we can simply concatenate the files, just do that
    needs_merge = sort or dedup or union_csv_columns
    if not needs_merge:
        if fileutils.is_csv_file(out) or is_csvgz(out):
            # Use csv-specific concatenation to deal with headers
            cat_csv_files(existing, out)
        else:
            # Use fileutils.cat directly if not dealing with a csv
            fileutils.cat(existing, out)
        return

    format_handler = get_format_handler(existing,
                                        out,
                                        union_csv_columns=union_csv_columns)
    # if the results are sorted, it's more efficient to merge as streams
    if sort:
        n = merge_files_as_streams(existing, out, format_handler, dedup)
    else:
        n = merge_files_in_memory(existing, out, format_handler, dedup)
    logger.info(f'Wrote {n} structures.')
def merge_files_as_streams(infiles, outfile, file_handler, dedup):
    '''
    Copies structures from `infiles` into `outfile`. Rejects
    duplicates using 'file_handler.getCompareKey.' Assumes infiles are
    sorted.

    The inputs are merged in batches small enough to respect the limit on
    open file handles. Every batch but the last is written to an
    intermediate file next to `outfile`, which is queued for a later batch
    and deleted once it has been consumed.

    :param infiles: names of the structure files to be joined
    :type infiles: iterable over str

    :param outfile: output file name
    :type outfile: str

    :param file_handler: object to handle open, read and write operations
                         for the file.
    :type file_handler: instance of subclass of BaseMergeHandler

    :param dedup: flag to indicate if duplicate products should be removed from
                  merged output file
    :type dedup: bool

    :return: number of structures written
    :rtype: int
    '''
    # default file handle limit, also the windows limit
    max_files = DEFAULT_MAX_FILE_HANDLES
    # if not on Windows, we can use the actual file handle limit
    if sys.platform != 'win32':
        from resource import RLIM_INFINITY, RLIMIT_NOFILE, getrlimit
        soft_limit, _ = getrlimit(RLIMIT_NOFILE)
        # An unlimited soft limit is not a usable count; keep the default.
        if soft_limit != RLIM_INFINITY:
            proc = psutil.Process()
            max_files = soft_limit - len(
                proc.open_files()) - MAX_FILE_HANDLE_PADDING
    # A batch must hold at least two inputs, otherwise every pass would put
    # one file back on the queue for each file taken off and never finish.
    max_files = max(max_files, 2)

    # Intermediate files keep the base name (and thus the extension) of the
    # output file, but must be created in the same directory.
    out_dir, out_base = os.path.split(outfile)
    merge_count = 0
    st_written = 0
    file_queue = collections.deque(infiles)
    pending_tmp = set()
    # Sentinel that never equals a real key, so that a first product with an
    # empty key is not mistaken for a duplicate.
    no_key = object()
    try:
        while file_queue:
            tmp_file = os.path.join(out_dir,
                                    f"merge_tmp_{merge_count}_" + out_base)
            merge_count += 1
            batch_files = []
            batch_iters = []
            with ExitStack() as stack:
                while file_queue and len(batch_iters) < max_files:
                    fname = file_queue.popleft()
                    batch_files.append(fname)
                    batch_iters.append(
                        stack.enter_context(
                            file_handler.getProductReader(fname)))
                # if this is the last batch, write to the final output file.
                is_final = not file_queue
                target = outfile if is_final else tmp_file
                with file_handler.getProductAppender(target) as writer:
                    # ENUM-410: merge the sorted files as streams with
                    # heapq.merge, which forms a sorted heap without pulling
                    # all items into memory at once.
                    merged = heapq.merge(*batch_iters,
                                         key=file_handler.getCompareKey)
                    last_key = no_key
                    for prod in merged:
                        cur_key = file_handler.getCompareKey(prod)
                        if dedup and cur_key == last_key:
                            continue
                        writer.append(prod)
                        last_key = cur_key
                        # Track final output file size
                        if is_final:
                            st_written += 1
            # The readers are closed now; intermediate files consumed by
            # this batch are no longer needed.
            for fname in batch_files:
                if fname in pending_tmp:
                    pending_tmp.discard(fname)
                    _remove_quietly(fname)
            # Add intermediate merge files to the queue
            if not is_final:
                file_queue.append(target)
                pending_tmp.add(target)
    finally:
        # On failure, don't leave unconsumed intermediate files behind.
        for fname in pending_tmp:
            _remove_quietly(fname)
    logger.info(f"{st_written} structures written.")
    return st_written


def _remove_quietly(fname):
    """
    Delete `fname`, ignoring the error if it cannot be removed.
    """
    try:
        os.remove(fname)
    except OSError:
        pass
def merge_files_in_memory(infiles, outfile, filetype_handler, dedup):
    '''
    Copy the products of every file in `infiles` into `outfile`, in order.
    With `dedup` set, a product is dropped when its compare key (as returned
    by filetype_handler.getCompareKey) has already been written; all keys
    written so far are kept in memory.

    :param infiles: names of the structure files to be joined
    :type infiles: iterable over str

    :param outfile: output file name
    :type outfile: str

    :param filetype_handler: object providing getProductReader,
        getProductAppender and getCompareKey for the file type
    :type filetype_handler: instance of subclass of BaseMergeHandler

    :param dedup: whether to skip products whose key was already written
    :type dedup: bool

    :return: number of structures written
    :rtype: int
    '''
    written_keys = set()
    count = 0
    with filetype_handler.getProductAppender(outfile) as writer:
        for source in infiles:
            with filetype_handler.getProductReader(source) as reader:
                for product in reader:
                    key = filetype_handler.getCompareKey(product)
                    if dedup and key in written_keys:
                        continue
                    writer.append(product)
                    count += 1
                    if dedup:
                        written_keys.add(key)
    return count
class BaseMergeHandler:
    """
    Interface for the filetype handlers used when merging and deduplicating
    subjob output. Every method raises NotImplementedError and must be
    provided by the concrete handler.
    """

    def getProductReader(self, file):
        """
        Open `file` for reading and return an object that iterates over all
        the products it contains.

        :param file: file name
        :type file: str

        :return: iterable context manager over filetype-specific product format
        :rtype: iterable
        """
        raise NotImplementedError()

    def getProductAppender(self, file):
        """
        Open `file` for writing and return an object that writes a product
        each time its "append" method is called.

        :param file: file name
        :type file: str

        :return: a file handle with context management that supports the
                 append() call used in merge_files_in_memory and
                 merge_files_as_streams.
        :rtype: file-like object
        """
        raise NotImplementedError()

    def getCompareKey(self, product):
        """
        Return the comparison key (SMILES string) computed for a product
        formatted according to the filetype.

        :param product: filetype-specific product
        :type product: filetype-specific product (type varies)
        """
        raise NotImplementedError()
class CsvMergeHandler(BaseMergeHandler):
    """
    Class to bundle csv read/write operations.

    Two modes are supported. When field names are collected (union of
    columns requested or a dedup field given), rows are read and written as
    dicts with csv.DictReader/csv.DictWriter. Otherwise rows are plain lists
    and all input files must share the same header line.
    """

    def __init__(self, infiles, outfile, union_columns=True, dedup_field=None):
        """
        :param infiles: list of output files to join column, if necessary.
        :type infiles: list(str)

        :param outfile: output file
        :type outfile: str

        :param union_columns: flag to write out the union of infile csv
                              columns (if infile columns differ)
        :type union_columns: bool

        :param dedup_field: csv column to use to check for duplicates during
                            deduplication
        :type dedup_field: str
        """
        self.header = None
        self.dedup_field = dedup_field
        self.fieldnames = None
        # Always defined, so the inconsistent-header message can be built
        # even when no input files were given up front.
        self.first_file = None
        # Get fieldnames and use a csv DictReader/DictWriter if joining
        # columns, or deduplicating by user-specified column name
        if union_columns or dedup_field is not None:
            self.fieldnames = get_fieldnames(infiles)
        if len(infiles) > 0:
            self.first_file = infiles[0]

    def getProductReader(self, file):
        """
        Open a csv file, skip the first (header) line if necessary, and return
        a context-managing iterable over all remaining lines.

        In list mode the header of the first file read is remembered and
        every later file must have the same one.

        :param file: file name
        :type file: str

        :return: iterable context manager over csv lines
        :rtype: _CsvReadWrapper (iter(str) or iter(dict))

        :raise ValueError: if the header differs from the remembered one
        """
        file_handle = open_maybe_compressed(file, 'rt')
        try:
            if self.fieldnames is not None:
                # DictReader consumes the header line by itself.
                reader = csv.DictReader(file_handle)
            else:
                reader = csv.reader(file_handle)
                # try to process the file header, if any
                file_header = next(reader, None)
                if file_header is not None:
                    if self.header is None:
                        self.header = file_header
                    if file_header != self.header:
                        msg = "Inconsistent header: {} != {}".format(
                            file, self.first_file)
                        raise ValueError(msg)
        except Exception:
            # Don't leak the handle when the header is inconsistent or
            # cannot be read at all.
            file_handle.close()
            raise
        return CsvMergeHandler._CsvReadWrapper(file_handle, reader)

    def getProductAppender(self, file):
        """
        Open a csv file, write the first (header) line, and return
        a line writer that supports the getProductAppender.append calls.

        :param file: file name
        :type file: str

        :return: a file handle that supports the append() call used in
                 merge_files_in_memory and merge_files_as_streams.
        :rtype: file-like object
        """
        return CsvMergeHandler._CsvProductAppender(file, self.getHeader,
                                                   self.fieldnames)

    def getHeader(self):
        """
        Returns the header for ProductAppenders to reference.

        :return: Header row of the input csv files, or None if no header has
            been read yet.
        :rtype: list of str or None
        """
        return self.header

    def getCompareKey(self, prod):
        """
        Compute SMILES from a given csv-formatted product.

        :param prod: product in question
        :type prod: dict or list

        :return: SMILES string
        :rtype: str

        :raise ValueError: if `prod` is a dict and no dedup field was given
        """
        if self.dedup_field is not None:
            return prod[self.dedup_field]
        if isinstance(prod, dict):
            # if dedup_field was not provided, we don't know which field
            # (the dict key) is the compare key.
            raise ValueError("Comparison column not specified, cannot compute "
                             "compare key.")
        return prod[0]

    class _CsvReadWrapper:
        """
        Class to combine a csv.reader or csv.DictReader with a context
        manager.
        """

        def __init__(self, file_handle, csv_reader):
            self.file_handle = file_handle
            self.csv_reader = csv_reader

        def __iter__(self):
            return iter(self.csv_reader)

        def __next__(self):
            return next(self.csv_reader)

        def __enter__(self):
            return self

        def __exit__(self, type, value, tb):
            self.file_handle.close()

    class _CsvProductAppender:
        """
        Class to wrap a file handle with a csv-writing object, and
        redirect append() calls to its write function.
        """

        def __init__(self, file, header_function, fieldnames=None):
            """
            :param file: name of the file to write
            :type file: str

            :param header_function: callable returning the header row, or
                None while the header is not known yet
            :type header_function: callable

            :param fieldnames: column names; if given, rows are written as
                dicts with a csv.DictWriter
            :type fieldnames: list of str or None
            """
            self.header_function = header_function
            self.file_handle = open_maybe_compressed(file, 'wt', newline='')
            self.wrote_header = False
            if fieldnames is None:
                self.csv_writer = csv.writer(self.file_handle)
                # The header may not be known yet if no input file has been
                # opened; in that case it is written lazily.
                header = self.header_function()
                if header is not None:
                    self.csv_writer.writerow(header)
                    self.wrote_header = True
            else:
                self.csv_writer = csv.DictWriter(self.file_handle, fieldnames)
                self.csv_writer.writeheader()
                self.wrote_header = True

        def __enter__(self):
            return self

        def __exit__(self, type, value, tb):
            try:
                # If the inputs contained a header but no rows, append() was
                # never called; still emit the header so the output is a
                # valid (empty) table, as in the DictWriter mode.
                if type is None and not self.wrote_header:
                    header = self.header_function()
                    if header is not None:
                        self.csv_writer.writerow(header)
                        self.wrote_header = True
            finally:
                self.file_handle.__exit__(type, value, tb)

        def append(self, line):
            """
            Write the header if necessary, then append the line.

            :param line: row to be written
            :type line: list or dict
            """
            if not self.wrote_header:
                # header should be defined by now
                header = self.header_function()
                assert header is not None
                # append header if necessary
                self.csv_writer.writerow(header)
                self.wrote_header = True
            self.csv_writer.writerow(line)
class StructureMergeHandler(BaseMergeHandler):
    """
    Merge handler for structure files: bundles the structure.Structure
    reading, writing and SMILES-key operations.
    """

    def __init__(self):
        # One generator instance is reused for all compare keys.
        self.smiles_generator = smiles_mod.SmilesGenerator()

    def getProductReader(self, file):
        """
        Open `file` with a structure reader.

        :param file: structure file name
        :type file: str

        :return: structure reader for file
        :rtype: structure.StructureReader
        """
        st_reader = structure.StructureReader(file)
        return st_reader

    def getProductAppender(self, file):
        """
        Open `file` with a structure writer.

        :param file: structure file name
        :type file: str

        :return: structure writer for file
        :rtype: structure.StructureWriter
        """
        st_writer = structure.StructureWriter(file)
        return st_writer

    def getCompareKey(self, prod):
        """
        Generate the SMILES of a Schrodinger structure, used as the key when
        comparing it against other structures.

        :param prod: product in question
        :type prod: structure.Structure

        :return: SMILES string
        :rtype: str
        """
        generator = self.smiles_generator
        return generator.getSmiles(prod)
class SmiMergeHandler(BaseMergeHandler):
    """
    Helper class to bundle SMILES (.smi) IO operations. Products are the
    raw text lines of the file.
    """

    def getProductReader(self, file):
        """
        Create and return a SMILES line reader

        :param file: SMILES file name
        :type file: str

        :return: SMILES line reader for file
        :rtype: file-like object (__enter__, __exit__, __iter__)
        """
        return open(file, 'r')

    def getProductAppender(self, file):
        """
        Create and return a SMILES line writer

        :param file: SMILES file name
        :type file: str

        :return: SMILES line writer for file
        :rtype: _SmilesAppender
        """
        return SmiMergeHandler._SmilesAppender(file)

    def getCompareKey(self, prod):
        """
        Compute smiles from a given SMILES line for comparison to other
        SMILES lines.

        :param prod: product in question
        :type prod: str

        :return: SMILES string (first whitespace-separated token), or an
            empty string for a blank line
        :rtype: str
        """
        tokens = prod.split()
        # A blank line has no tokens; give it an empty key instead of
        # aborting the whole merge with an IndexError.
        return tokens[0] if tokens else ''

    class _SmilesAppender:
        """
        Wrapper class to redirect append() calls to
        the standard file write() call.
        """

        def __init__(self, file):
            self.handle = open(file, 'w')

        def append(self, product):
            """
            Write one SMILES line, making sure it is newline-terminated.

            :param product: line to write
            :type product: str
            """
            self.handle.write(product)
            # The last line of an input file may lack a trailing newline;
            # without this, the next record would be glued onto it.
            if not product.endswith('\n'):
                self.handle.write('\n')

        def __enter__(self):
            return self

        def __exit__(self, type, value, tb):
            self.handle.__exit__(type, value, tb)
def get_fieldnames(filenames):
    """
    Collect the union of the header field names of the given CSV files,
    preserving first-seen order: all fields of the first file, followed by
    the fields of the second file not seen before, and so on. Empty files
    contribute no fields.

    :param filenames: list of CSV files
    :type filenames: [str]

    :return: list of field names
    :rtype: [str]
    """
    # A dict is used as an insertion-ordered set of names.
    ordered_names = {}
    for path in filenames:
        with open_maybe_compressed(path, 'rt') as handle:
            header_row = next(csv.reader(handle), [])
        for name in header_row:
            ordered_names.setdefault(name, None)
    return list(ordered_names)
def is_csvgz(filename):
    """
    Tell whether `filename` ends in '.csv.gz' or '.csvgz', ignoring case.

    :param filename: file name to check
    :type filename: str

    :rtype: bool
    """
    return filename.lower().endswith(('.csv.gz', '.csvgz'))
def is_pfx(filename):
    """
    Tell whether `filename` ends in the PFX extension, ignoring case.

    :param filename: file name to check
    :type filename: str

    :rtype: bool
    """
    lowered = filename.lower()
    return lowered.endswith(PFX)
def get_pfx_size(filename):
    """
    Read the metadata member of a .pfx archive and return its 'size' entry.

    :param filename: name of the .pfx file
    :type filename: str

    :return: value stored under 'size' in the metadata
    """
    with zipfile.ZipFile(filename) as archive:
        raw_metadata = archive.read(METADATA)
    return json.loads(raw_metadata)['size']
def remove_react_atom_props(mol):
    """
    Make a copy of `mol` and clear, on every atom of the copy, the
    'react_atom_idx' and 'old_mapno' properties added by the RDKit reaction
    module.

    :param mol: input molecule; not modified
    :type mol: rdkit.Chem.Mol

    :return: modified molecule
    :rtype: rdkit.Chem.Mol
    """
    stripped = copy.copy(mol)
    for atom in stripped.GetAtoms():
        atom.ClearProp('react_atom_idx')
        atom.ClearProp('old_mapno')
    return stripped
def cat_csv_files(source_filenames, dest_filename):
    """
    Quick and dirty csv concatenation strategy. Assumes all csv files
    have the same columns and does not deduplicate.

    The header of the first non-empty file is written once; the header line
    of every other file is checked against it and dropped. Empty source
    files are skipped. Both sources and destination may be compressed.

    :param source_filenames: input files
    :type source_filenames: iterable of str

    :param dest_filename: destination file
    :type dest_filename: str

    :raise ValueError: if a source file has a different header line
    """
    # The destination goes through open_maybe_compressed as well: sources
    # are decompressed on reading, so writing a compressed-named output with
    # plain open() would produce an uncompressed file with a misleading name.
    with open_maybe_compressed(dest_filename, 'wb') as fho:
        header = None
        for fname in source_filenames:
            with open_maybe_compressed(fname, 'rb') as fh:
                # consume first line, assumed to be header
                file_header = next(fh, None)
                if file_header is None:
                    # Completely empty file: nothing to copy.
                    continue
                if header is None:
                    header = file_header
                    fho.write(header)
                if file_header != header:
                    raise ValueError(
                        f"Inconsistent header for {fname}: {header} != {file_header}"
                    )
                shutil.copyfileobj(fh, fho)
def copy_csv_file(input_file, output_file):
    """
    Copy the contents of one .csv file into another. Either file may be
    compressed or uncompressed, since both are opened with
    open_maybe_compressed.

    :param input_file: input file name
    :type input_file: str

    :param output_file: output file name
    :type output_file: str
    """
    with open_maybe_compressed(input_file, 'rb') as source, \
            open_maybe_compressed(output_file, 'wb') as target:
        shutil.copyfileobj(source, target)