Source code for schrodinger.application.phase.packages.oned_task_utils

"""
Performs task-based work for the 1D similarity driver.

Copyright Schrodinger LLC, All Rights Reserved.
"""

import base64
import csv
import gzip
import heapq
import io
import math
import os

from schrodinger import adapter
from schrodinger import structure
from schrodinger.application.phase.packages import phase_utils
from schrodinger.infra import canvas
from schrodinger.infra import phase
from schrodinger.job import jobcontrol
from schrodinger.utils import fileutils

ONED_SCREEN = 'oned_screen'
TASK_CREATE = 'create'
TASK_DESCRIBE = 'describe'
TASK_EXPORT = 'export'
TASK_MERGE = 'merge'
TASK_RUN = 'run'
TASK_SPLIT = 'split'
LEGAL_STRUCTURE_FILE_FORMATS = [
    fileutils.SMILES, fileutils.SMILESCSV, fileutils.MAESTRO, fileutils.SD
]
LEGAL_STRUCTURE_FILE_TYPES = ['SMILES', 'SMILES-CSV', 'Maestro', 'SD']
SPLIT_STRUCTURE_FILE_FORMAT_EXT = {
    fileutils.SMILES: '.smi.gz',
    fileutils.SMILESCSV: '.csv.gz',
    fileutils.MAESTRO: '.maegz',
    fileutils.SD: '.sdfgz'
}
ONED_VERSION = '1.0'
ONED_PHARM = 'pharm'
ONED_ELEMENT = 'element'
ONED_MMOD = 'mmod'
ONED_TREATMENTS = (ONED_PHARM, ONED_ELEMENT, ONED_MMOD)
ONED_TREATMENTS_DICT = {
    ONED_PHARM: phase.ONED_TREATMENT_PHARM,
    ONED_ELEMENT: phase.ONED_TREATMENT_ELEMENT,
    ONED_MMOD: phase.ONED_TREATMENT_MMOD
}
ONED_CREATE_PROGRESS_INTERVAL = 10000
ONED_RUN_PROGRESS_INTERVAL = 100000
ONED_ROW_COUNT_BYTES = 16
ONED_HITS_SUFFIX = '-hits.csv.gz'
ONED_MAX_HITS = 1000
ONED_MAX_ROWS = 1000000
ONED_MAX_QUERIES = 25
ONED_REP_PROPERTY = 's_phase_1D_Rep'
ONED_SIM_PROPERTY = 'r_phase_1D_Sim'
ONED_LEADING_PROPERTIES = ['SMILES', 'NAME', ONED_REP_PROPERTY]


[docs]class BasicStatsAccumulator: """ Accumulates basic statistics for a set of property values. """
[docs] def __init__(self, property_name): """ Constructor taking the name of the property. """ self.property_name = property_name self.count = 0 self.min = 0.0 self.max = 0.0 self.mean = 0.0 self._mean_x2 = 0.0
[docs] def addValue(self, value): """ Adds a numeric value to the series, updating the statistics. """ self.count += 1 if self.count == 1: self.min = value self.max = value self.mean = value self._mean_x2 = value * value else: self.min = min(self.min, value) self.max = max(self.max, value) wold = (self.count - 1) / self.count wnew = 1.0 / self.count self.mean = wold * self.mean + wnew * value self._mean_x2 = wold * self._mean_x2 + wnew * value * value
@property def std_dev(self): """ Returns the standard deviation. """ return math.sqrt(self._mean_x2 - self.mean * self.mean)
[docs]def base64_decode_fd(s): """ Decodes feature definitions from a Base64 string. Feature definitions will be empty if string is empty. :param s: Base64-encoded feature definition string :type s: str :return: Feature definitions :rtype: list(phase.PhpFeatureDefinition) """ if not s: return [] return phase.feature_definitions_from_string( base64.b64decode(s.encode('ascii')).decode('utf-8'))
[docs]def base64_encode_fd(fd): """ Encodes feature definitions to a Base64 string. String will be empty if fd is empty or None. :param fd: Feature definitions :type fd: list(phase.PhpFeatureDefinition) :return: Base64-encoded feature definitions string :rtype: str """ if not fd: return '' s = phase.feature_definitions_to_string(fd) return base64.b64encode(s.encode('utf-8')).decode('ascii')
[docs]def combine_oned_hits(hits_files_in, hits_file_out, query_row=None, sort=True, max_hits=ONED_MAX_HITS, max_rows=ONED_MAX_ROWS): """ Combines a set of 1D hits files with or without sorting and capping. :param hits_files_in: List of compressed CSV hits files to combine :type hits_files_in: list(str) :param hits_file_out: Output compressed CSV hits file :type hits_file_out: str :param query_row: If supplied, this row is written before any hits :type query_row: list(str) :param sort: Whether to write a sorted hits file :type sort: bool :param max_hits: Cap on the number of sorted hits to output :type max_hits: int :param max_rows: Maximum number of sorted rows to hold in memory :type max_rows: int :return: Number of hits written :rtype: int """ if len(hits_files_in) == 0: return 0 if not sort: return merge_oned_hits_files(hits_files_in, hits_file_out, query_row) remove_unsorted_hits = False if len(hits_files_in) == 1: # No need to merge before sorting. unsorted_hits_file = hits_files_in[0] else: unsorted_hits_file = hits_file_out + '.tmp' merge_oned_hits_files(hits_files_in, unsorted_hits_file) remove_unsorted_hits = True # Get master properties, which may or may not have been merged. with gzip.open(unsorted_hits_file, 'rt') as fh: master_props = next(csv.reader(fh)) # Sort hits. with gzip.open(hits_file_out, 'wt', newline='') as fh_out: writer = csv.writer(fh_out) writer.writerow(master_props) if query_row: writer.writerow(query_row) # Make one or more passes through unsorted_hits_file, dumping sorted # hits from a heap at the end of each pass. max_heap = min(max_hits, max_rows) hits_written = set() while len(hits_written) < max_hits: heap = [] num_hits_remaining = max_hits - len(hits_written) max_heap = min(max_heap, num_hits_remaining) with gzip.open(unsorted_hits_file, 'rt') as fh_in: reader = csv.reader(fh_in) next(reader) for i, row in enumerate(reader): if i in hits_written: continue sim = float(row[-1]) # We use -i here and when sorting to preserve original # order in the event of a tie on sim. item = (sim, -i, row) if len(heap) == max_heap: heapq.heappushpop(heap, item) else: heapq.heappush(heap, item) for item in sorted(heap, reverse=True): writer.writerow(item[2]) hits_written.add(-item[1]) if len(heap) == 0: break if remove_unsorted_hits: fileutils.force_remove(unsorted_hits_file) return len(hits_written)
[docs]def create_oned_data_file(structure_file, oned_data_file, treatment=phase.ONED_TREATMENT_PHARM, fd=None, props=None, logger=None, progress_interval=ONED_CREATE_PROGRESS_INTERVAL): """ Creates a 1D data file from the structures in a SMILES, SMILES-CSV, Maestro or SD file. :param structure_file: Input file of structures :type structure_file: str :param oned_data_file: Destination 1D data file (.1dbin) :type oned_data_file: str :param treatment: Structure treatment for 1D representations :type treatment: phase.OneDTreatment :param fd: Overrides default feature definitions. Relevant only when treatment is phase.ONED_TREATMENT_PHARM. :type fd: list(phase.PhpFeatureDefinition) or NoneType :param props: m2io-style properties to include in the 1D data file, other than SMILES and title. Not used when a SMILES file is supplied. :type props: list(str) or NoneType :param logger: Logger for info level progress messages :type logger: logging.Logger or NoneType :param progress_interval: Interval between progress messages :type progress_interval: int :return: Number of rows written to the 1D data file :rtype: int """ generator = phase.OneDGenerator(treatment) fd_oned = [] if treatment == phase.ONED_TREATMENT_PHARM: if fd: fd_oned = fd else: fd_oned = phase.read_feature_definitions( phase.getDefaultFdFilePath()) generator.setFd(fd_oned) encoder = phase.OneDEncoder() oned_attr = [ONED_VERSION, str(treatment), base64_encode_fd(fd_oned)] properties = ONED_LEADING_PROPERTIES.copy() additional_props = [] if props: additional_props = list(props) properties += additional_props row_count = 0 with gzip.open(oned_data_file, 'wt', newline='') as fh_out: # Delimit with tabs to avoid quoting issues. writer = csv.writer(fh_out, delimiter='\t') writer.writerow(oned_attr) writer.writerow(properties) with get_structure_file_reader(structure_file) as reader: reading_smiles = True if isinstance(reader, structure.StructureReader): reading_smiles = False for st in reader: title = st.title try: if reading_smiles: smiles = st.smiles oned_rep = generator.generate( adapter.to_structure(smiles)) else: oned_rep = generator.generate(st) smiles = adapter.to_smiles(st) encoder.fromOneDRep(oned_rep) base64_rep = encoder.toBase64() except phase.PhpException as exc: # The most common failure is a fragmented structure, # which prevents creation of the topological distance # matrix that the 1D coordinates attempt to reproduce. if logger: logger.info(f'{exc} - skipping "{title}"') continue prop_values = [smiles, title, base64_rep] for prop in additional_props: prop_values.append(st.property.get(prop, '')) writer.writerow(prop_values) row_count += 1 if logger and row_count % progress_interval == 0: logger.info(f'{row_count} structures processed') write_oned_data_file_row_count(oned_data_file, row_count) if logger: logger.info(f'Total number of structures processed: {row_count}') return row_count
[docs]def describe_oned_data_file(oned_data_file, stats=False): """ Returns a string containing a description of the supplied 1D data file. :param oned_data_file: Name of the 1D data file (.1dbin) :type oned_data_file: str :param stats: Whether to report basic statistics for any numeric properties in the 1D data file :type stats: bool :return: String containing the description :rtype: str """ version, treatment, fd = get_oned_data_file_attributes(oned_data_file) descr = [f'Version: {version}'] descr.append(f'Structure treatment: {ONED_TREATMENTS[treatment]}') if treatment == phase.ONED_TREATMENT_PHARM: fd_default = phase.read_feature_definitions( phase.getDefaultFdFilePath()) fd_descr = 'default' if fd == fd_default else 'custom' descr.append(f'Feature definitions: {fd_descr}') row_count = get_oned_data_file_row_count(oned_data_file) descr.append(f'Total number of rows: {row_count}') props = get_oned_data_file_properties(oned_data_file) numeric_props = [] numeric_pos = [] is_int_prop = [] if props: descr.append('Additional properties:') for pos, prop in enumerate(props, 3): descr.append(f' {prop}') if stats and prop[0:2] in ('i_', 'r_'): numeric_props.append(prop) numeric_pos.append(pos) is_int_prop.append(True if prop[0:2] == 'i_' else False) if numeric_props: accumulators = [BasicStatsAccumulator(prop) for prop in numeric_props] for row in get_oned_data_file_rows(oned_data_file): for i, pos in enumerate(numeric_pos): if row[pos]: value = int(row[pos]) if is_int_prop[i] else float(row[pos]) accumulators[i].addValue(value) descr.append('Numeric property statistics:') for accumulator in accumulators: descr.append(f' {accumulator.property_name}') descr.append(f' Count: {accumulator.count}') descr.append(f' Min: {accumulator.min}') descr.append(f' Max: {accumulator.max}') descr.append(' Mean: %.6f' % accumulator.mean) descr.append(' Std. Dev: %.6f' % accumulator.std_dev) return '\n'.join(descr)
[docs]def export_oned_data_file(oned_data_file, output_file, subset=None): """ Exports rows from a 1D data file to a compressed CSV file. A subset of rows may be specified as a string of comma-separated row ranges, (e.g., '1:100,200:300') or via a text file with a property name on the first line (e.g., 's_m_title' or 's_sd_Vendor_ID') and the values of that property on subsequent lines. :param oned_data_file: Name of the 1D data file (.1dbin) :type oned_data_file: str :param output_file: Output compressed CSV file :type output_file: str :param subset: A comma-separated list of row ranges or text file name :type subset: str :return: Number of rows exported :rtype: int """ row_count = 0 with gzip.open(output_file, 'wt', newline='') as fh_out: writer = csv.writer(fh_out) writer.writerow(get_master_oned_properties([oned_data_file])) if not subset: for row in get_oned_data_file_rows(oned_data_file): writer.writerow(row) row_count += 1 elif os.path.isfile(subset): pos, values = get_values_to_match(oned_data_file, subset) for row in get_oned_data_file_rows(oned_data_file): if row[pos] in values: writer.writerow(row) row_count += 1 else: bitset = get_rows_to_export(subset) start = 0 stop = bitset.size() for i, row in enumerate( get_oned_data_file_rows(oned_data_file, start, stop)): if bitset.get(i): writer.writerow(row) row_count += 1 return row_count
[docs]def get_hits_file_names(jobname, nqueries): """ Returns the names of the hits files the job will produce based on the number of query structures. :param jobname: Job name :type jobname: str :param nqueries: Number of query structures :type nqueries: int :return: Hits file names :rtype: list(str) """ suffix = ONED_HITS_SUFFIX if nqueries == 1: return [jobname + suffix] else: return [f'{jobname}_{i}{suffix}' for i in range(1, nqueries + 1)]
[docs]def get_jobname(args): """ Assigns the job name from SCHRODINGER_JOBNAME or from the base name of the appropriate input file. :param args: argparser.Namespace with command line arguments :type args: argparser.Namespace :return: Job name :rtype: str """ task = args.task if task in (TASK_CREATE, TASK_MERGE): file_task = fileutils.get_basename(args.dest) + '_' + task elif task in (TASK_DESCRIBE, TASK_EXPORT, TASK_SPLIT): file_task = fileutils.get_basename(args.source) + '_' + task else: # TASK_RUN. file_task = fileutils.get_basename(args.query) return jobcontrol.get_jobname(file_task)
[docs]def get_master_oned_properties(filenames): """ Returns the union of all properties in the provided 1D data files or compressed CSV hits files. The first three properties are always SMILES NAME and ONED_REP_PROPERTY. If CSV hits files are supplied, the last property will always be ONED_SIM_PROPERTY. Any additional properties will appear after the first three properties. If multiple files are supplied, those additional properties will be sorted alphabetically. :param filenames: List of files to consider :type filenames: list(str) :return: Union of additional properties :rtype: list(str) """ if not filenames: return [] oned_fmt = is_oned_data_file(filenames[0]) extra_props = set() trailing = [] if oned_fmt else [ONED_SIM_PROPERTY] for filename in filenames: if oned_fmt: props = get_oned_data_file_properties(filename) else: with gzip.open(filename, 'rt') as fh: props = next(csv.reader(fh))[3:-1] if len(filenames) == 1: # Preserve original order of properties. return ONED_LEADING_PROPERTIES + props + trailing for prop in props: extra_props.add(prop) return ONED_LEADING_PROPERTIES + sorted(extra_props) + trailing
[docs]def get_oned_data_file_attributes(oned_data_file): """ Returns the version, structure treatment and feature definitions of the supplied 1D data file. :param oned_data_file: Name of the 1D data file (.1dbin) :type oned_data_file: str :return: tuple of version, structure treatment and feature definitions :rtype: str, phase.OneDTreatment, list(phase.PhpFeatureDefinition) """ with gzip.open(oned_data_file, 'rt') as fh: tokens = next(csv.reader(fh, delimiter='\t')) return tokens[0], int(tokens[1]), base64_decode_fd(tokens[2])
[docs]def get_oned_data_file_names(source, prefer_cwd=False): """ Returns the name of the 1D data file(s) specified in source, which may be the name of a 1D data file or a list file containing the names of one or more 1D data files. :param source: 1D data file source specification :type source: str :param prefer_cwd: If source is a list file, setting this to True forces the use of 1D data files in the CWD if they exist, even if they also exist at the locations specified in the list file. This addresses the situation where the list file contains absolute paths that exist on the job host, but the corresponding files have been copied to the job directory. In that case, we want to be accessing only the files in the job directory. :type prefer_cwd: bool :return: 1D data file names :rtype: list(str) """ source_format = phase.get_phase_file_format(source) if source_format == phase.PhpFileFormat_PHP_FORMAT_LIST: return phase_utils.get_file_names_from_list_file(source, prefer_cwd) else: return [source]
[docs]def get_oned_data_file_properties(oned_data_file): """ Returns a list of the names of any additional properties stored in the supplied 1D data file. These are properties other than SMILES, NAME and the ONED_REP_PROPERTY. The list will be empty if no additonal properties are stored. :param oned_data_file: Name of the 1D data file (.1dbin) :type oned_data_file: str :return: list of additional property names :rtype: list(str) """ with gzip.open(oned_data_file, 'rt') as fh: reader = csv.reader(fh, delimiter='\t') next(reader) return next(reader)[3:]
[docs]def get_oned_data_file_distribution(oned_data_files, nsub): """ Given a list of 1D data files to screen and the number of subjobs over which the screen is to be distributed, this function determines how to divide the 1D data files over the subjobs. A list with nsub elements is returned, where a given element holds one or more 1D data file names and the (start, stop) row limits to screen in that file. For example, if 1D data files file1.1dbin and file2.1dbin are supplied, with 1200 and 1800 rows, respectively, and nsub is 3, this function would return the following: [[['file1.1dbin', (0, 1000)]], # subjob 1 [['file1.1dbin', (1000, 1200)], ['file2.1dbin', (0, 800)]], # subjob 2 [['file2.1dbin', (800, 1800)]]] # subjob 3 :param oned_data_files: List of 1D data files :type oned_data_files: list(str) :param nsub: Number of subjobs :type nsub: int :return: List of lists of file name, (start, stop) limits :rtype: list(list(str, (int, int))) """ if not oned_data_files: return [] file_row_counts = [get_oned_data_file_row_count(x) for x in oned_data_files] total_rows = sum(file_row_counts) nsub = min(nsub, total_rows) subjob_row_counts = phase.partitionValues(total_rows, nsub) distributions = [] file_pos = 0 start = 0 for subjob_row_count in subjob_row_counts: distribution = [] current_row_count = 0 while current_row_count < subjob_row_count: if start >= file_row_counts[file_pos]: file_pos += 1 start = 0 remaining_row_count = subjob_row_count - current_row_count stop = min(start + remaining_row_count, file_row_counts[file_pos]) distribution.append([oned_data_files[file_pos], (start, stop)]) current_row_count += stop - start start = stop distributions.append(distribution) return distributions
[docs]def get_oned_data_file_splits(oned_data_file, prefix_out, nfiles): """ Returns a list of output file names and (start, stop) limits for physically splitting a 1D data file into a number of smaller, equal-sized files. A given element of the returned list will be of the form: <prefix_out>_<n>.1dbin, (start, stop) where <prefix_out>_<n>.1dbin is the nth output file to create and start, stop are the corresponding row limits in oned_data_file, with stop being non-inclusive. :param oned_data_file: The name of the 1D data file to be split :type oned_data_file: str :param prefix_out: Prefix for all output 1D data files :type prefix_out: str :param nfiles: The number of output 1D data files to create :type nfiles: int :return: List of file names and (start, stop) limits :rtype: list(str, (int, int)) """ total_rows = get_oned_data_file_row_count(oned_data_file) file_sizes = phase.partitionValues(total_rows, nfiles) file_ext = phase.PHASE_1DBIN_FILE_EXT file_splits = [] start = 0 for i, file_size in enumerate(file_sizes, 1): file_name = f'{prefix_out}_{i}{file_ext}' stop = start + file_size file_splits.append((file_name, (start, stop))) start = stop return file_splits
[docs]def get_oned_data_file_row_count(oned_data_file): """ Returns the number of rows in the supplied 1D data file. :param oned_data_file: Name of the 1D data file (.1dbin) :type oned_data_file: str :return: Number of rows in the file :rtype: int """ with open(oned_data_file, 'rb') as reader: reader.seek(-ONED_ROW_COUNT_BYTES, io.SEEK_END) return int.from_bytes(reader.read(), byteorder='big')
[docs]def get_oned_data_file_rows(oned_data_file, start=0, stop=None): """ Generator that yields rows from the supplied 1D data file. Each row is a list of strings, where the first 3 elements are SMILES, name and 1D encoding, and any subsequent elements hold the values of additional properties stored in the 1D data file. :param oned_data_file: Name of the 1D data file (.1dbin) :type oned_data_file: str :param start: 0-based starting row position :type start: int :param stop: Upper limit on the rows to read. For example, if start=5 and stop=10, rows 5, 6, 7, 8 and 9 will be read. Reading proceeds until the end of the file by default. :type stop: int :yield: The next row in the file :ytype: list(str) """ nskip = start + 2 finish = stop if stop else get_oned_data_file_row_count(oned_data_file) with gzip.open(oned_data_file, mode='rt') as fh: reader = csv.reader(fh, delimiter='\t') try: for i in range(nskip): next(reader) for i in range(start, finish): yield next(reader) except gzip.BadGzipFile: # Will get here if we reach the row count footer pass
[docs]def get_oned_query(st_query, oned_data_file): """ Returns a 1D representation for the provided query structure that's created according to the attributes in the supplied 1D data file. :param st_query: The query structure :type st_query: structure.Structure :param oned_data_file: Name of the input 1D data file (.1dbin) :type oned_data_file: str :return: 1D representation of the query structure :rtype: phase.OneDRep """ _, treatment, fd = get_oned_data_file_attributes(oned_data_file) generator = phase.OneDGenerator(treatment) if treatment == phase.ONED_TREATMENT_PHARM: generator.setFd(fd) return generator.generate(st_query)
[docs]def get_oned_query_row(st_query, oned_query_base64, master_properties): """ Returns a row for the query structure that can be written to the top of a hits file. :param st_query: The query structure :type st_query: structure.Structure :param oned_query_base64: Base64-encoded 1D representation of the query structure :type oned_query_base64: str :param master_properties: The full list of properties being written to the hits file :type master_properties: list(str) :return: Query structure row :rtype: list(str) """ query_row = len(master_properties) * [''] query_row[0] = adapter.to_smiles(st_query) query_row[1] = st_query.title query_row[2] = oned_query_base64 query_row[-1] = 1.0 for i, prop in enumerate(master_properties[3:-1], start=3): query_row[i] = st_query.property.get(prop, '') return query_row
[docs]def get_oned_query_rows(st_queries, oned_data_files): """ Given a list of query structures and the 1D data files that were screened, this function returns a row for each query that can be supplied to combine_oned_hits to ensure that the query appears at the top of its associated hits file. :param st_queries: Query structures :type st_queries: list(structure.Structure) :param oned_data_files: Names of 1D data files that were screened :type oned_data_files: list(str) :return: A row for each query :rtype: list(list(str)) """ if not oned_data_files: return [] query_rows = [] encoder = phase.OneDEncoder() master_props = get_master_oned_properties(oned_data_files) master_props.append(ONED_SIM_PROPERTY) for st in st_queries: oned_query = get_oned_query(st, oned_data_files[0]) encoder.fromOneDRep(get_oned_query(st, oned_data_files[0])) query_rows.append( get_oned_query_row(st, encoder.toBase64(), master_props)) return query_rows
[docs]def get_oned_query_structures(query_file): """ Reads query structures from the provided SMILES, SMILESCSV, Maestro or SD file. Coordinates are not set in the case of SMILES or SMILESCSV. :param query_file: Structure file containing queries :type query_file: str :return: Query structures :rtype: list(structure.Structure) """ queries = [] with get_structure_file_reader(query_file) as reader: reading_smiles = True if isinstance(reader, structure.StructureReader): reading_smiles = False for st in reader: if reading_smiles: st_smiles = adapter.to_structure(st.smiles) st_smiles.property.update(st.property) queries.append(st_smiles) else: queries.append(st) return queries
[docs]def get_property_positions(master_properties, file_properties): """ Returns the postion of each master property in a potentially smaller list of properties from a particular file. If a master property is not found in file_properties, the position of that property will be len(file_properties). Thus if file_pos contains the positions returned by this function, and file_row contains the property values for some row in that file, the following code can be used to construct a master row of property values that contains '' for each missing value: file_row.append('') master_row = [file_row[pos] for pos in file_pos] :param master_properties: Master list of properties :type master_properties: list(str) :param file_properties: The list of properties from the file :param file_properties: list(str) :return: Positions of master_properties within file_properties :rtype: list of int """ positions = len(master_properties) * [len(file_properties)] for i, prop in enumerate(master_properties): try: positions[i] = file_properties.index(prop) except ValueError: pass return positions
[docs]def get_rows_to_export(row_ranges): """ Constructs a canvas.ChmBitset from a comma-separated list of row ranges (e.g., '1:100,200:300'). Input row numbers are 1-based and upper limits are inclusive. The returned bitset will have a logical size equal to the highest row number supplied, and the on positions will be 0-based. Note that the maximum logical size for a ChmBitset is 4,294,967,263, so this function assumes that users will not attempt to create individual 1D data files that would exceed the ChmBitset limit. :param oned_data_file: The name of the 1D data file (.1dbin) :type oned_data_file: str :param row_ranges: Comma-separated list of 1-based row ranges :type row_ranges: str :return: Bitset with 0-based rows as the on positions :rtype: canvas.ChmBitset :raise: ValueError if an illegal string of row ranges is supplied """ try: range_filter = canvas.ChmRangeFilter(row_ranges) return range_filter.toBitset(range_filter.getLast()) except canvas.ChmException as err: raise ValueError(str(err))
[docs]def get_split_file_names(prefix, n): """ Returns the names of the 1D data files that will be created in the 'split' task. :param prefix: Prefix of output 1D data files :type prefix: str :param n: Number of files to create :type n: int :return: 1D data file names :rtype: list(str) """ ext = phase.PHASE_1DBIN_FILE_EXT return [f'{prefix}_{i}{ext}' for i in range(1, n + 1)]
[docs]def get_structure_file_reader(structure_file): """ Returns the appropriate reader for the supplied structure file, which is expected to be SMILES, SMILESCSV, MAESTRO or SD. :param structure_file: Input file of structures :type structure_file: str :return: Structure file reader :rtype: structure.SmilesReader, structure.SmilesCsvReader or structure.StructureReader :raise: ValueError if the file format is illegal """ in_format = fileutils.get_structure_file_format(structure_file) if in_format not in LEGAL_STRUCTURE_FILE_FORMATS: raise ValueError(f'Illegal structure file format: "{structure_file}"') if in_format == fileutils.SMILES: return structure.SmilesReader(structure_file) elif in_format == fileutils.SMILESCSV: return structure.SmilesCsvReader(structure_file) else: return structure.StructureReader(structure_file)
[docs]def get_values_to_match(oned_data_file, filename): """ Given a 1D data file and a text file containing a property name followed by property values to match, this function returns the 0-based position of the specified property in the 1D data file and a set of the values to match. :param oned_data_file: The name of the 1D data file (.1dbin) :type oned_data_file: str :param filename: The name of the text file with the property name and the values to match :type filename: str :return: 0-based property position, followed by values to match :rtype: int, set(str) :raise: ValueError if the property is not found in oned_data_file """ with open(filename, 'rt') as reader: prop = reader.readline().rstrip('\n') prop_lower = prop.lower() if prop_lower == 'smiles': pos = 0 elif prop_lower in ('name', 'title', 's_m_title'): pos = 1 else: try: oned_props = get_oned_data_file_properties(oned_data_file) pos = oned_props.index(prop) + 3 except ValueError: msg = f'Property "{prop}" not found in {oned_data_file}' raise ValueError(msg) return pos, set(line.strip() for line in reader)
[docs]def is_oned_data_file(filename): """ Returns True if the supplied file name corresponds to a 1D data file. :param filename: The name of the file :type filename: str :return: Whether the name corresponds to a 1D data file :rtype: bool """ file_format = phase.get_phase_file_format(filename) return file_format == phase.PhpFileFormat_PHP_FORMAT_1DBIN
[docs]def merge_oned_data_files(oned_data_files_in, oned_data_file_out, remove=False): """ Merges a list of 1D data files, creating an output file with a master set of properties. :param oned_data_files_in: List of 1D data files to merge :type oned_data_files_in: list(str) :param oned_data_file_out: Output 1D data file :type oned_data_file_out: str :param remove: If True input 1D files will be removed after merge :type remove: bool :return: Total number of rows merged :rtype: int """ if not oned_data_files_in: return master_props = get_master_oned_properties(oned_data_files_in) version, treatment, fd = get_oned_data_file_attributes( oned_data_files_in[0]) oned_attr = [version, str(treatment), base64_encode_fd(fd)] with gzip.open(oned_data_file_out, 'wt', newline='') as fh_out: writer = csv.writer(fh_out, delimiter='\t') writer.writerow(oned_attr) writer.writerow(master_props) row_count = 0 for oned_data_file_in in oned_data_files_in: file_props = get_master_oned_properties([oned_data_file_in]) file_pos = get_property_positions(master_props, file_props) for row in get_oned_data_file_rows(oned_data_file_in): row.append('') # For missing properties writer.writerow([row[pos] for pos in file_pos]) row_count += 1 write_oned_data_file_row_count(oned_data_file_out, row_count) if remove: fileutils.force_remove(*oned_data_files_in) return row_count
[docs]def merge_oned_hits_files(hits_files_in, hits_file_out, query_row=None): """ Merges a list of compressed CSV hits files, creating an output file with a master set of properties. :param hits_files_in: List of hits files to merge :type hits_files_in: list(str) :param hits_file_out: Output hits file :type hits_file_out: str :param query_row: If supplied, this row is written before any hits. It should be obtained by calling get_oned_query_rows. :type query_row: list(str) :return: Number of merged hits written :rtype: int """ hit_count = 0 master_props = get_master_oned_properties(hits_files_in) with gzip.open(hits_file_out, 'wt', newline='') as fh_out: writer = csv.writer(fh_out) writer.writerow(master_props) if query_row: writer.writerow(query_row) for hits_file_in in hits_files_in: with gzip.open(hits_file_in, 'rt') as fh_in: reader = csv.reader(fh_in) file_pos = get_property_positions(master_props, next(reader)) for row in reader: row.append('') # For missing properties writer.writerow([row[pos] for pos in file_pos]) hit_count += 1 return hit_count
[docs]def run_oned_screen(st_query, oned_data_file, hits_file, start=0, stop=None, write_query_row=False, sort=True, max_hits=ONED_MAX_HITS, max_rows=ONED_MAX_ROWS, min_sim=0.0, logger=None, progress_interval=ONED_RUN_PROGRESS_INTERVAL): """ Performs a 1D similarity screen with a single structure query and writes hits to a compressed CSV file. :param st_query: The query structure :type st_query: structure.Structure :param oned_data_file: Name of the input 1D data file (.1dbin) :type oned_data_file: str :param hits_file: Name of the output compressed CSV file (.csv.gz) :type hits_file: str :param start: 0-based starting row in oned_data_file :type start: int :param stop: Upper limit on the rows to screen. For example, if start=5 and stop=10, rows 5, 6, 7, 8 and 9 will be screened. Screening proceeds until the end of the file by default. :type stop: int :param write_query_row: Whether to write the 1D query as the first row :type write_query_row: bool :param sort: Whether to write a sorted hits file :type sort: bool :param max_hits: Cap on the number of sorted hits to write :type max_hits: int :param min_sim: Write only hits whose similarity to the query are greater than or equal to this value :type min_sim: float :param logger: Logger for info level progress messages :type logger: logging.Logger or NoneType :param progress_interval: Interval between progress messages :type progress_interval: int :return: Number of hits written :rtype: int """ master_props = get_master_oned_properties([oned_data_file]) master_props.append(ONED_SIM_PROPERTY) oned_query = get_oned_query(st_query, oned_data_file) oned_sim_calc = phase.OneDSimCalc(oned_query) encoder = phase.OneDEncoder() query_row = None if write_query_row: encoder.fromOneDRep(oned_query) query_row = get_oned_query_row(st_query, encoder.toBase64(), master_props) hits_file_out = hits_file if sort: # Use PhpSortCapper to reduce the number of unsorted hits written # to disk. hit_capper = phase.PhpSortCapperDoubleDecreasing(max_hits) hits_file_out += '.tmp' hit_count = 0 with gzip.open(hits_file_out, 'wt', newline='') as fh: writer = csv.writer(fh) writer.writerow(master_props) if write_query_row and not sort: writer.writerow(query_row) struct_count = 0 for row in get_oned_data_file_rows(oned_data_file, start, stop): encoder.fromBase64(row[2]) sim = oned_sim_calc.computeSim(encoder.toOneDRep()) struct_count += 1 if sim >= min_sim: write_hit = not sort or hit_capper.insert(sim) if write_hit: writer.writerow(row + [str(sim)]) hit_count += 1 if logger and struct_count % progress_interval == 0: logger.info(f'{struct_count} structures screened') if logger: logger.info(f'Total number of structures screened: {struct_count}') if sort: if logger: logger.info('Sorting hits by decreasing similarity to query') hit_count = combine_oned_hits([hits_file_out], hits_file, query_row, sort, max_hits, max_rows) fileutils.force_remove(hits_file_out) if logger: logger.info(f'Number of hits written: {hit_count}') return hit_count
[docs]def split_structure_file(structure_file, prefix_out, nfiles): """ Splits a structure file into a number of smaller, equal-sized files named <prefix_out>_1.<ext>, <prefix_out>_2.<ext>, etc., where <ext> will be 'smi.gz', 'csv.gz', 'maegz' or 'sdfgz', depending on the type of file supplied. :param structure_file: The name of the structure file to be split :type oned_data_file: str :param prefix_out: Prefix for all output structure files :type prefix_out: str :param nfiles: The number of output structure files to create :type nfiles: int :return: The names of the files created :rtype: list(str) :raise: ValueError if the file format is illegal """ in_format = fileutils.get_structure_file_format(structure_file) if in_format in (fileutils.MAESTRO, fileutils.SD): splitter = phase.PhpStructureSplitter([structure_file], nfiles, prefix_out) return list(splitter.getFileNamesOut()) elif in_format in (fileutils.SMILES, fileutils.SMILESCSV): ext = SPLIT_STRUCTURE_FILE_FORMAT_EXT[in_format] write_header = True if in_format == fileutils.SMILESCSV else False nstruct = structure.count_structures(structure_file) row_counts = phase.partitionValues(nstruct, min(nfiles, nstruct)) outfiles = [] with fileutils.open_maybe_compressed(structure_file, 'rt') as reader: header = reader.readline() if write_header else None for i, row_count in enumerate(row_counts, 1): outfile = f'{prefix_out}_{i}{ext}' outfiles.append(outfile) with gzip.open(outfile, 'wt', newline='') as writer: if write_header: writer.write(header) for row in range(row_count): writer.write(reader.readline()) return outfiles else: raise ValueError(f'Illegal structure file format: "{structure_file}"')
[docs]def split_oned_data_file(oned_data_file, prefix_out, nfiles): """ Splits a 1D data file into a number of smaller, equal-sized files named <prefix_out>_1.1dbin, <prefix_out>_2.1dbin, etc. :param oned_data_file: The name of the 1D data file to be split :type oned_data_file: str :param prefix_out: Prefix for all output 1D data files :type prefix_out: str :param nfiles: The number of output 1D data files to create :type nfiles: int :return: The names of the files created :rtype: list(str) """ file_splits = get_oned_data_file_splits(oned_data_file, prefix_out, nfiles) with gzip.open(oned_data_file, 'rt') as fh_in: reader = csv.reader(fh_in, delimiter='\t') oned_attr = next(reader) properties = next(reader) try: for i, file_split in enumerate(file_splits): outfile = file_split[0] start, stop = file_split[1] with gzip.open(outfile, 'wt', newline='') as fh_out: writer = csv.writer(fh_out, delimiter='\t') writer.writerow(oned_attr) writer.writerow(properties) for j in range(start, stop): writer.writerow(next(reader)) write_oned_data_file_row_count(outfile, stop - start) except gzip.BadGzipFile: pass return [file_split[0] for file_split in file_splits]
[docs]def write_oned_data_file_row_count(oned_data_file, row_count): """ Appends the total number of rows to a 1D data file. :param oned_data_file: Name of the 1D data file (.1dbin) :type oned_data_file: str :param row_count: Total number of rows in file :type row_count: int """ with open(oned_data_file, 'ab') as writer: writer.write( row_count.to_bytes(ONED_ROW_COUNT_BYTES, byteorder='big', signed=False))