"""
Performs task-based work for the 1D similarity driver.
Copyright Schrodinger LLC, All Rights Reserved.
"""
import base64
import csv
import gzip
import heapq
import io
import math
import os
from schrodinger import adapter
from schrodinger import structure
from schrodinger.application.phase.packages import phase_utils
from schrodinger.infra import canvas
from schrodinger.infra import phase
from schrodinger.job import jobcontrol
from schrodinger.utils import fileutils
ONED_SCREEN = 'oned_screen'
TASK_CREATE = 'create'
TASK_DESCRIBE = 'describe'
TASK_EXPORT = 'export'
TASK_MERGE = 'merge'
TASK_RUN = 'run'
TASK_SPLIT = 'split'
LEGAL_STRUCTURE_FILE_FORMATS = [
fileutils.SMILES, fileutils.SMILESCSV, fileutils.MAESTRO, fileutils.SD
]
LEGAL_STRUCTURE_FILE_TYPES = ['SMILES', 'SMILES-CSV', 'Maestro', 'SD']
SPLIT_STRUCTURE_FILE_FORMAT_EXT = {
fileutils.SMILES: '.smi.gz',
fileutils.SMILESCSV: '.csv.gz',
fileutils.MAESTRO: '.maegz',
fileutils.SD: '.sdfgz'
}
ONED_VERSION = '1.0'
ONED_PHARM = 'pharm'
ONED_ELEMENT = 'element'
ONED_MMOD = 'mmod'
ONED_TREATMENTS = (ONED_PHARM, ONED_ELEMENT, ONED_MMOD)
ONED_TREATMENTS_DICT = {
ONED_PHARM: phase.ONED_TREATMENT_PHARM,
ONED_ELEMENT: phase.ONED_TREATMENT_ELEMENT,
ONED_MMOD: phase.ONED_TREATMENT_MMOD
}
ONED_CREATE_PROGRESS_INTERVAL = 10000
ONED_RUN_PROGRESS_INTERVAL = 100000
ONED_ROW_COUNT_BYTES = 16
ONED_HITS_SUFFIX = '-hits.csv.gz'
ONED_MAX_HITS = 1000
ONED_MAX_ROWS = 1000000
ONED_MAX_QUERIES = 25
ONED_REP_PROPERTY = 's_phase_1D_Rep'
ONED_SIM_PROPERTY = 'r_phase_1D_Sim'
ONED_LEADING_PROPERTIES = ['SMILES', 'NAME', ONED_REP_PROPERTY]


class BasicStatsAccumulator:
"""
Accumulates basic statistics for a set of property values.
"""

    def __init__(self, property_name):
"""
Constructor taking the name of the property.
"""
self.property_name = property_name
self.count = 0
self.min = 0.0
self.max = 0.0
self.mean = 0.0
self._mean_x2 = 0.0

    def addValue(self, value):
"""
Adds a numeric value to the series, updating the statistics.
"""
self.count += 1
if self.count == 1:
self.min = value
self.max = value
self.mean = value
self._mean_x2 = value * value
else:
self.min = min(self.min, value)
self.max = max(self.max, value)
wold = (self.count - 1) / self.count
wnew = 1.0 / self.count
self.mean = wold * self.mean + wnew * value
self._mean_x2 = wold * self._mean_x2 + wnew * value * value

    @property
    def std_dev(self):
        """
        Returns the population standard deviation. The variance is clamped
        at zero to guard against small negative values caused by
        floating-point round-off.
        """
        return math.sqrt(max(0.0, self._mean_x2 - self.mean * self.mean))
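

# Illustrative sketch (not part of the original module): accumulate running
# statistics without holding the values in memory. For the values 1-4 this
# yields count=4, min=1.0, max=4.0, mean=2.5 and a population standard
# deviation of ~1.118. The property name is hypothetical.
def _example_basic_stats():
    acc = BasicStatsAccumulator('r_sd_Activity')
    for value in (1.0, 2.0, 3.0, 4.0):
        acc.addValue(value)
    return acc.count, acc.min, acc.max, acc.mean, acc.std_dev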


def base64_decode_fd(s):
"""
Decodes feature definitions from a Base64 string. Feature definitions
will be empty if string is empty.
:param s: Base64-encoded feature definition string
:type s: str
:return: Feature definitions
:rtype: list(phase.PhpFeatureDefinition)
"""
if not s:
return []
return phase.feature_definitions_from_string(
base64.b64decode(s.encode('ascii')).decode('utf-8'))


def base64_encode_fd(fd):
"""
Encodes feature definitions to a Base64 string. String will be empty if
fd is empty or None.
:param fd: Feature definitions
:type fd: list(phase.PhpFeatureDefinition)
:return: Base64-encoded feature definitions string
:rtype: str
"""
if not fd:
return ''
s = phase.feature_definitions_to_string(fd)
return base64.b64encode(s.encode('utf-8')).decode('ascii')
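

# Illustrative sketch: round-trip the default Phase feature definitions
# through Base64 using the two helpers above. Assumes the default feature
# definition file is available, as in create_oned_data_file() below, and
# that feature definition lists support equality, as relied upon in
# describe_oned_data_file().
def _example_fd_round_trip():
    fd = phase.read_feature_definitions(phase.getDefaultFdFilePath())
    encoded = base64_encode_fd(fd)
    return base64_decode_fd(encoded) == fd  # Expected: True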


def combine_oned_hits(hits_files_in,
hits_file_out,
query_row=None,
sort=True,
max_hits=ONED_MAX_HITS,
max_rows=ONED_MAX_ROWS):
"""
Combines a set of 1D hits files with or without sorting and capping.
:param hits_files_in: List of compressed CSV hits files to combine
:type hits_files_in: list(str)
:param hits_file_out: Output compressed CSV hits file
:type hits_file_out: str
:param query_row: If supplied, this row is written before any hits
:type query_row: list(str)
:param sort: Whether to write a sorted hits file
:type sort: bool
:param max_hits: Cap on the number of sorted hits to output
:type max_hits: int
:param max_rows: Maximum number of sorted rows to hold in memory
:type max_rows: int
:return: Number of hits written
:rtype: int
"""
if len(hits_files_in) == 0:
return 0
if not sort:
return merge_oned_hits_files(hits_files_in, hits_file_out, query_row)
remove_unsorted_hits = False
if len(hits_files_in) == 1:
# No need to merge before sorting.
unsorted_hits_file = hits_files_in[0]
else:
unsorted_hits_file = hits_file_out + '.tmp'
merge_oned_hits_files(hits_files_in, unsorted_hits_file)
remove_unsorted_hits = True
# Get master properties, which may or may not have been merged.
with gzip.open(unsorted_hits_file, 'rt') as fh:
master_props = next(csv.reader(fh))
# Sort hits.
with gzip.open(hits_file_out, 'wt', newline='') as fh_out:
writer = csv.writer(fh_out)
writer.writerow(master_props)
if query_row:
writer.writerow(query_row)
# Make one or more passes through unsorted_hits_file, dumping sorted
# hits from a heap at the end of each pass.
max_heap = min(max_hits, max_rows)
hits_written = set()
while len(hits_written) < max_hits:
heap = []
num_hits_remaining = max_hits - len(hits_written)
max_heap = min(max_heap, num_hits_remaining)
with gzip.open(unsorted_hits_file, 'rt') as fh_in:
reader = csv.reader(fh_in)
next(reader)
for i, row in enumerate(reader):
if i in hits_written:
continue
sim = float(row[-1])
# We use -i here and when sorting to preserve original
# order in the event of a tie on sim.
item = (sim, -i, row)
if len(heap) == max_heap:
heapq.heappushpop(heap, item)
else:
heapq.heappush(heap, item)
for item in sorted(heap, reverse=True):
writer.writerow(item[2])
hits_written.add(-item[1])
if len(heap) == 0:
break
if remove_unsorted_hits:
fileutils.force_remove(unsorted_hits_file)
return len(hits_written)
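

# combine_oned_hits() holds at most min(max_hits, max_rows) rows in a
# min-heap per pass, using -i as a tie-breaker so earlier rows win on equal
# similarity. Minimal sketch of that bounded-heap top-k pattern on plain
# similarity values (illustrative, not part of the original module):
def _example_top_k(sims, k):
    heap = []
    for i, sim in enumerate(sims):
        item = (sim, -i)
        if len(heap) == k:
            heapq.heappushpop(heap, item)  # Evict the lowest similarity.
        else:
            heapq.heappush(heap, item)
    # Highest similarity first; ties fall back to the original file order.
    return [(sim, -neg_i) for sim, neg_i in sorted(heap, reverse=True)]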


def create_oned_data_file(structure_file,
oned_data_file,
treatment=phase.ONED_TREATMENT_PHARM,
fd=None,
props=None,
logger=None,
progress_interval=ONED_CREATE_PROGRESS_INTERVAL):
"""
Creates a 1D data file from the structures in a SMILES, SMILES-CSV,
Maestro or SD file.
:param structure_file: Input file of structures
:type structure_file: str
:param oned_data_file: Destination 1D data file (.1dbin)
:type oned_data_file: str
:param treatment: Structure treatment for 1D representations
:type treatment: phase.OneDTreatment
:param fd: Overrides default feature definitions. Relevant only when
treatment is phase.ONED_TREATMENT_PHARM.
:type fd: list(phase.PhpFeatureDefinition) or NoneType
:param props: m2io-style properties to include in the 1D data file,
other than SMILES and title. Not used when a SMILES file is
supplied.
:type props: list(str) or NoneType
:param logger: Logger for info level progress messages
:type logger: logging.Logger or NoneType
:param progress_interval: Interval between progress messages
:type progress_interval: int
:return: Number of rows written to the 1D data file
:rtype: int
"""
generator = phase.OneDGenerator(treatment)
fd_oned = []
if treatment == phase.ONED_TREATMENT_PHARM:
if fd:
fd_oned = fd
else:
fd_oned = phase.read_feature_definitions(
phase.getDefaultFdFilePath())
generator.setFd(fd_oned)
encoder = phase.OneDEncoder()
oned_attr = [ONED_VERSION, str(treatment), base64_encode_fd(fd_oned)]
properties = ONED_LEADING_PROPERTIES.copy()
additional_props = []
if props:
additional_props = list(props)
properties += additional_props
row_count = 0
with gzip.open(oned_data_file, 'wt', newline='') as fh_out:
# Delimit with tabs to avoid quoting issues.
writer = csv.writer(fh_out, delimiter='\t')
writer.writerow(oned_attr)
writer.writerow(properties)
with get_structure_file_reader(structure_file) as reader:
            reading_smiles = not isinstance(reader, structure.StructureReader)
for st in reader:
title = st.title
try:
if reading_smiles:
smiles = st.smiles
oned_rep = generator.generate(
adapter.to_structure(smiles))
else:
oned_rep = generator.generate(st)
smiles = adapter.to_smiles(st)
encoder.fromOneDRep(oned_rep)
base64_rep = encoder.toBase64()
except phase.PhpException as exc:
# The most common failure is a fragmented structure,
# which prevents creation of the topological distance
# matrix that the 1D coordinates attempt to reproduce.
if logger:
logger.info(f'{exc} - skipping "{title}"')
continue
prop_values = [smiles, title, base64_rep]
for prop in additional_props:
prop_values.append(st.property.get(prop, ''))
writer.writerow(prop_values)
row_count += 1
if logger and row_count % progress_interval == 0:
logger.info(f'{row_count} structures processed')
write_oned_data_file_row_count(oned_data_file, row_count)
if logger:
logger.info(f'Total number of structures processed: {row_count}')
return row_count
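

# Illustrative sketch: build a 1D data file from a hypothetical Maestro
# file, carrying one extra m2io-style property (name assumed) into the
# 1D data file.
def _example_create():
    return create_oned_data_file('ligands.maegz', 'ligands.1dbin',
                                 props=['r_sd_Activity'])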


def describe_oned_data_file(oned_data_file, stats=False):
"""
Returns a string containing a description of the supplied 1D data file.
:param oned_data_file: Name of the 1D data file (.1dbin)
:type oned_data_file: str
:param stats: Whether to report basic statistics for any numeric
properties in the 1D data file
:type stats: bool
:return: String containing the description
:rtype: str
"""
version, treatment, fd = get_oned_data_file_attributes(oned_data_file)
descr = [f'Version: {version}']
descr.append(f'Structure treatment: {ONED_TREATMENTS[treatment]}')
if treatment == phase.ONED_TREATMENT_PHARM:
fd_default = phase.read_feature_definitions(
phase.getDefaultFdFilePath())
fd_descr = 'default' if fd == fd_default else 'custom'
descr.append(f'Feature definitions: {fd_descr}')
row_count = get_oned_data_file_row_count(oned_data_file)
descr.append(f'Total number of rows: {row_count}')
props = get_oned_data_file_properties(oned_data_file)
numeric_props = []
numeric_pos = []
is_int_prop = []
if props:
descr.append('Additional properties:')
for pos, prop in enumerate(props, 3):
descr.append(f' {prop}')
if stats and prop[0:2] in ('i_', 'r_'):
numeric_props.append(prop)
numeric_pos.append(pos)
                is_int_prop.append(prop[0:2] == 'i_')
if numeric_props:
accumulators = [BasicStatsAccumulator(prop) for prop in numeric_props]
for row in get_oned_data_file_rows(oned_data_file):
for i, pos in enumerate(numeric_pos):
if row[pos]:
value = int(row[pos]) if is_int_prop[i] else float(row[pos])
accumulators[i].addValue(value)
descr.append('Numeric property statistics:')
for accumulator in accumulators:
descr.append(f' {accumulator.property_name}')
descr.append(f' Count: {accumulator.count}')
descr.append(f' Min: {accumulator.min}')
descr.append(f' Max: {accumulator.max}')
descr.append(' Mean: %.6f' % accumulator.mean)
descr.append(' Std. Dev: %.6f' % accumulator.std_dev)
return '\n'.join(descr)


def export_oned_data_file(oned_data_file, output_file, subset=None):
"""
Exports rows from a 1D data file to a compressed CSV file. A subset
    of rows may be specified as a string of comma-separated row ranges
(e.g., '1:100,200:300') or via a text file with a property name on
the first line (e.g., 's_m_title' or 's_sd_Vendor_ID') and the values
of that property on subsequent lines.
:param oned_data_file: Name of the 1D data file (.1dbin)
:type oned_data_file: str
:param output_file: Output compressed CSV file
:type output_file: str
:param subset: A comma-separated list of row ranges or text file name
:type subset: str
:return: Number of rows exported
:rtype: int
"""
row_count = 0
with gzip.open(output_file, 'wt', newline='') as fh_out:
writer = csv.writer(fh_out)
writer.writerow(get_master_oned_properties([oned_data_file]))
if not subset:
for row in get_oned_data_file_rows(oned_data_file):
writer.writerow(row)
row_count += 1
elif os.path.isfile(subset):
pos, values = get_values_to_match(oned_data_file, subset)
for row in get_oned_data_file_rows(oned_data_file):
if row[pos] in values:
writer.writerow(row)
row_count += 1
else:
bitset = get_rows_to_export(subset)
start = 0
stop = bitset.size()
for i, row in enumerate(
get_oned_data_file_rows(oned_data_file, start, stop)):
if bitset.get(i):
writer.writerow(row)
row_count += 1
return row_count
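

# Illustrative sketch of the two subset forms accepted above. The file
# names are hypothetical.
def _example_export():
    # 1-based, inclusive row ranges:
    n1 = export_oned_data_file('database.1dbin', 'rows.csv.gz',
                               subset='1:100,200:300')
    # A text file whose first line names a property (e.g. s_m_title) and
    # whose remaining lines hold values of that property to match:
    n2 = export_oned_data_file('database.1dbin', 'named.csv.gz',
                               subset='subset.txt')
    return n1, n2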


def get_hits_file_names(jobname, nqueries):
"""
Returns the names of the hits files the job will produce based on the
number of query structures.
:param jobname: Job name
:type jobname: str
:param nqueries: Number of query structures
:type nqueries: int
:return: Hits file names
:rtype: list(str)
"""
suffix = ONED_HITS_SUFFIX
if nqueries == 1:
return [jobname + suffix]
else:
return [f'{jobname}_{i}{suffix}' for i in range(1, nqueries + 1)]


def get_jobname(args):
"""
Assigns the job name from SCHRODINGER_JOBNAME or from the base name of
the appropriate input file.
    :param args: argparse.Namespace with command line arguments
    :type args: argparse.Namespace
:return: Job name
:rtype: str
"""
task = args.task
if task in (TASK_CREATE, TASK_MERGE):
file_task = fileutils.get_basename(args.dest) + '_' + task
elif task in (TASK_DESCRIBE, TASK_EXPORT, TASK_SPLIT):
file_task = fileutils.get_basename(args.source) + '_' + task
else:
# TASK_RUN.
file_task = fileutils.get_basename(args.query)
return jobcontrol.get_jobname(file_task)


def get_master_oned_properties(filenames):
"""
Returns the union of all properties in the provided 1D data files or
    compressed CSV hits files. The first three properties are always SMILES,
NAME and ONED_REP_PROPERTY. If CSV hits files are supplied, the last
property will always be ONED_SIM_PROPERTY. Any additional properties
will appear after the first three properties. If multiple files are
supplied, those additional properties will be sorted alphabetically.
:param filenames: List of files to consider
:type filenames: list(str)
    :return: Master list of properties
:rtype: list(str)
"""
if not filenames:
return []
oned_fmt = is_oned_data_file(filenames[0])
extra_props = set()
trailing = [] if oned_fmt else [ONED_SIM_PROPERTY]
for filename in filenames:
if oned_fmt:
props = get_oned_data_file_properties(filename)
else:
with gzip.open(filename, 'rt') as fh:
props = next(csv.reader(fh))[3:-1]
if len(filenames) == 1:
# Preserve original order of properties.
return ONED_LEADING_PROPERTIES + props + trailing
        extra_props.update(props)
return ONED_LEADING_PROPERTIES + sorted(extra_props) + trailing


def get_oned_data_file_attributes(oned_data_file):
"""
Returns the version, structure treatment and feature definitions of the
supplied 1D data file.
:param oned_data_file: Name of the 1D data file (.1dbin)
:type oned_data_file: str
:return: tuple of version, structure treatment and feature definitions
:rtype: str, phase.OneDTreatment, list(phase.PhpFeatureDefinition)
"""
with gzip.open(oned_data_file, 'rt') as fh:
tokens = next(csv.reader(fh, delimiter='\t'))
return tokens[0], int(tokens[1]), base64_decode_fd(tokens[2])


def get_oned_data_file_names(source, prefer_cwd=False):
"""
Returns the name of the 1D data file(s) specified in source, which may
be the name of a 1D data file or a list file containing the names of
one or more 1D data files.
:param source: 1D data file source specification
:type source: str
:param prefer_cwd: If source is a list file, setting this to True forces
the use of 1D data files in the CWD if they exist, even if they
also exist at the locations specified in the list file. This
addresses the situation where the list file contains absolute
paths that exist on the job host, but the corresponding files
have been copied to the job directory. In that case, we want to
be accessing only the files in the job directory.
:type prefer_cwd: bool
:return: 1D data file names
:rtype: list(str)
"""
source_format = phase.get_phase_file_format(source)
if source_format == phase.PhpFileFormat_PHP_FORMAT_LIST:
return phase_utils.get_file_names_from_list_file(source, prefer_cwd)
else:
return [source]


def get_oned_data_file_properties(oned_data_file):
"""
Returns a list of the names of any additional properties stored in the
supplied 1D data file. These are properties other than SMILES, NAME and
    the ONED_REP_PROPERTY. The list will be empty if no additional properties
are stored.
:param oned_data_file: Name of the 1D data file (.1dbin)
:type oned_data_file: str
:return: list of additional property names
:rtype: list(str)
"""
with gzip.open(oned_data_file, 'rt') as fh:
reader = csv.reader(fh, delimiter='\t')
next(reader)
return next(reader)[3:]


def get_oned_data_file_distribution(oned_data_files, nsub):
"""
Given a list of 1D data files to screen and the number of subjobs over
which the screen is to be distributed, this function determines how to
divide the 1D data files over the subjobs. A list with nsub elements is
returned, where a given element holds one or more 1D data file names
and the (start, stop) row limits to screen in that file. For example,
if 1D data files file1.1dbin and file2.1dbin are supplied, with 1200
and 1800 rows, respectively, and nsub is 3, this function would return
the following:
[[['file1.1dbin', (0, 1000)]], # subjob 1
[['file1.1dbin', (1000, 1200)], ['file2.1dbin', (0, 800)]], # subjob 2
[['file2.1dbin', (800, 1800)]]] # subjob 3
:param oned_data_files: List of 1D data files
:type oned_data_files: list(str)
:param nsub: Number of subjobs
:type nsub: int
:return: List of lists of file name, (start, stop) limits
:rtype: list(list(str, (int, int)))
"""
if not oned_data_files:
return []
file_row_counts = [get_oned_data_file_row_count(x) for x in oned_data_files]
total_rows = sum(file_row_counts)
nsub = min(nsub, total_rows)
subjob_row_counts = phase.partitionValues(total_rows, nsub)
distributions = []
file_pos = 0
start = 0
for subjob_row_count in subjob_row_counts:
distribution = []
current_row_count = 0
while current_row_count < subjob_row_count:
if start >= file_row_counts[file_pos]:
file_pos += 1
start = 0
remaining_row_count = subjob_row_count - current_row_count
stop = min(start + remaining_row_count, file_row_counts[file_pos])
distribution.append([oned_data_files[file_pos], (start, stop)])
current_row_count += stop - start
start = stop
distributions.append(distribution)
return distributions
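

# Illustrative sketch: reproduce the distribution described in the
# docstring above, assuming hypothetical files file1.1dbin and file2.1dbin
# with 1200 and 1800 rows, respectively.
def _example_distribution():
    dist = get_oned_data_file_distribution(['file1.1dbin', 'file2.1dbin'], 3)
    # dist[0] -> [['file1.1dbin', (0, 1000)]]
    # dist[1] -> [['file1.1dbin', (1000, 1200)], ['file2.1dbin', (0, 800)]]
    # dist[2] -> [['file2.1dbin', (800, 1800)]]
    return dist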


def get_oned_data_file_splits(oned_data_file, prefix_out, nfiles):
"""
Returns a list of output file names and (start, stop) limits for
physically splitting a 1D data file into a number of smaller,
equal-sized files. A given element of the returned list will be
of the form:
<prefix_out>_<n>.1dbin, (start, stop)
where <prefix_out>_<n>.1dbin is the nth output file to create and
start, stop are the corresponding row limits in oned_data_file,
with stop being non-inclusive.
:param oned_data_file: The name of the 1D data file to be split
:type oned_data_file: str
:param prefix_out: Prefix for all output 1D data files
:type prefix_out: str
:param nfiles: The number of output 1D data files to create
:type nfiles: int
:return: List of file names and (start, stop) limits
:rtype: list(str, (int, int))
"""
total_rows = get_oned_data_file_row_count(oned_data_file)
file_sizes = phase.partitionValues(total_rows, nfiles)
file_ext = phase.PHASE_1DBIN_FILE_EXT
file_splits = []
start = 0
for i, file_size in enumerate(file_sizes, 1):
file_name = f'{prefix_out}_{i}{file_ext}'
stop = start + file_size
file_splits.append((file_name, (start, stop)))
start = stop
return file_splits


def get_oned_data_file_row_count(oned_data_file):
"""
Returns the number of rows in the supplied 1D data file.
:param oned_data_file: Name of the 1D data file (.1dbin)
:type oned_data_file: str
:return: Number of rows in the file
:rtype: int
"""
with open(oned_data_file, 'rb') as reader:
reader.seek(-ONED_ROW_COUNT_BYTES, io.SEEK_END)
return int.from_bytes(reader.read(), byteorder='big')


def get_oned_data_file_rows(oned_data_file, start=0, stop=None):
"""
Generator that yields rows from the supplied 1D data file. Each row
is a list of strings, where the first 3 elements are SMILES, name and
1D encoding, and any subsequent elements hold the values of additional
properties stored in the 1D data file.
:param oned_data_file: Name of the 1D data file (.1dbin)
:type oned_data_file: str
:param start: 0-based starting row position
:type start: int
:param stop: Upper limit on the rows to read. For example, if start=5
and stop=10, rows 5, 6, 7, 8 and 9 will be read. Reading
proceeds until the end of the file by default.
:type stop: int
:yield: The next row in the file
:ytype: list(str)
"""
nskip = start + 2
    finish = stop
    if finish is None:
        finish = get_oned_data_file_row_count(oned_data_file)
with gzip.open(oned_data_file, mode='rt') as fh:
reader = csv.reader(fh, delimiter='\t')
try:
for i in range(nskip):
next(reader)
for i in range(start, finish):
yield next(reader)
except gzip.BadGzipFile:
# Will get here if we reach the row count footer
pass
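

# Illustrative sketch: stream rows 5-9 of a hypothetical 1D data file.
# Reading stops cleanly if the binary row-count footer is reached, since
# the footer is not part of the gzip stream.
def _example_read_rows():
    # Each row is [SMILES, name, 1D encoding, *extra property values].
    return [row[1] for row in
            get_oned_data_file_rows('database.1dbin', start=5, stop=10)]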


def get_oned_query(st_query, oned_data_file):
"""
Returns a 1D representation for the provided query structure that's
created according to the attributes in the supplied 1D data file.
:param st_query: The query structure
:type st_query: structure.Structure
:param oned_data_file: Name of the input 1D data file (.1dbin)
:type oned_data_file: str
:return: 1D representation of the query structure
:rtype: phase.OneDRep
"""
_, treatment, fd = get_oned_data_file_attributes(oned_data_file)
generator = phase.OneDGenerator(treatment)
if treatment == phase.ONED_TREATMENT_PHARM:
generator.setFd(fd)
return generator.generate(st_query)


def get_oned_query_row(st_query, oned_query_base64, master_properties):
"""
Returns a row for the query structure that can be written to the top
of a hits file.
:param st_query: The query structure
:type st_query: structure.Structure
:param oned_query_base64: Base64-encoded 1D representation of the query
structure
:type oned_query_base64: str
:param master_properties: The full list of properties being written
to the hits file
:type master_properties: list(str)
:return: Query structure row
:rtype: list(str)
"""
query_row = len(master_properties) * ['']
query_row[0] = adapter.to_smiles(st_query)
query_row[1] = st_query.title
query_row[2] = oned_query_base64
    query_row[-1] = '1.0'
for i, prop in enumerate(master_properties[3:-1], start=3):
query_row[i] = st_query.property.get(prop, '')
return query_row


def get_oned_query_rows(st_queries, oned_data_files):
"""
Given a list of query structures and the 1D data files that were
screened, this function returns a row for each query that can be
supplied to combine_oned_hits to ensure that the query appears at
the top of its associated hits file.
:param st_queries: Query structures
:type st_queries: list(structure.Structure)
:param oned_data_files: Names of 1D data files that were screened
:type oned_data_files: list(str)
:return: A row for each query
:rtype: list(list(str))
"""
if not oned_data_files:
return []
query_rows = []
encoder = phase.OneDEncoder()
master_props = get_master_oned_properties(oned_data_files)
master_props.append(ONED_SIM_PROPERTY)
for st in st_queries:
        oned_query = get_oned_query(st, oned_data_files[0])
        encoder.fromOneDRep(oned_query)
query_rows.append(
get_oned_query_row(st, encoder.toBase64(), master_props))
return query_rows


def get_oned_query_structures(query_file):
"""
Reads query structures from the provided SMILES, SMILESCSV, Maestro or
SD file. Coordinates are not set in the case of SMILES or SMILESCSV.
:param query_file: Structure file containing queries
:type query_file: str
:return: Query structures
:rtype: list(structure.Structure)
"""
queries = []
with get_structure_file_reader(query_file) as reader:
        reading_smiles = not isinstance(reader, structure.StructureReader)
for st in reader:
if reading_smiles:
st_smiles = adapter.to_structure(st.smiles)
st_smiles.property.update(st.property)
queries.append(st_smiles)
else:
queries.append(st)
return queries


def get_property_positions(master_properties, file_properties):
"""
    Returns the position of each master property in a potentially smaller
list of properties from a particular file. If a master property is
not found in file_properties, the position of that property will be
len(file_properties). Thus if file_pos contains the positions returned
by this function, and file_row contains the property values for some
row in that file, the following code can be used to construct a master
row of property values that contains '' for each missing value:
file_row.append('')
master_row = [file_row[pos] for pos in file_pos]
:param master_properties: Master list of properties
:type master_properties: list(str)
:param file_properties: The list of properties from the file
    :type file_properties: list(str)
    :return: Positions of master_properties within file_properties
    :rtype: list(int)
"""
positions = len(master_properties) * [len(file_properties)]
for i, prop in enumerate(master_properties):
try:
positions[i] = file_properties.index(prop)
except ValueError:
pass
return positions
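

# Example: construct a master row with '' for each missing value, following
# the recipe in the docstring above (pure Python, runnable as-is).
def _example_property_positions():
    master_props = ['SMILES', 'NAME', 'r_sd_IC50']
    file_props = ['SMILES', 'NAME']
    file_pos = get_property_positions(master_props, file_props)  # [0, 1, 2]
    file_row = ['CCO', 'ethanol']
    file_row.append('')  # Sentinel for properties missing from this file.
    return [file_row[pos] for pos in file_pos]  # ['CCO', 'ethanol', '']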


def get_rows_to_export(row_ranges):
"""
Constructs a canvas.ChmBitset from a comma-separated list of row ranges
(e.g., '1:100,200:300'). Input row numbers are 1-based and upper limits
are inclusive. The returned bitset will have a logical size equal to the
highest row number supplied, and the on positions will be 0-based. Note
that the maximum logical size for a ChmBitset is 4,294,967,263, so this
function assumes that users will not attempt to create individual 1D data
files that would exceed the ChmBitset limit.
:param row_ranges: Comma-separated list of 1-based row ranges
:type row_ranges: str
:return: Bitset with 0-based rows as the on positions
:rtype: canvas.ChmBitset
:raise: ValueError if an illegal string of row ranges is supplied
"""
try:
range_filter = canvas.ChmRangeFilter(row_ranges)
return range_filter.toBitset(range_filter.getLast())
except canvas.ChmException as err:
raise ValueError(str(err))


def get_split_file_names(prefix, n):
"""
Returns the names of the 1D data files that will be created in the
'split' task.
:param prefix: Prefix of output 1D data files
:type prefix: str
:param n: Number of files to create
:type n: int
:return: 1D data file names
:rtype: list(str)
"""
ext = phase.PHASE_1DBIN_FILE_EXT
return [f'{prefix}_{i}{ext}' for i in range(1, n + 1)]


def get_structure_file_reader(structure_file):
"""
Returns the appropriate reader for the supplied structure file,
which is expected to be SMILES, SMILESCSV, MAESTRO or SD.
:param structure_file: Input file of structures
:type structure_file: str
:return: Structure file reader
:rtype: structure.SmilesReader, structure.SmilesCsvReader or
structure.StructureReader
:raise: ValueError if the file format is illegal
"""
in_format = fileutils.get_structure_file_format(structure_file)
if in_format not in LEGAL_STRUCTURE_FILE_FORMATS:
raise ValueError(f'Illegal structure file format: "{structure_file}"')
if in_format == fileutils.SMILES:
return structure.SmilesReader(structure_file)
elif in_format == fileutils.SMILESCSV:
return structure.SmilesCsvReader(structure_file)
else:
return structure.StructureReader(structure_file)


def get_values_to_match(oned_data_file, filename):
"""
Given a 1D data file and a text file containing a property name
followed by property values to match, this function returns the
0-based position of the specified property in the 1D data file
and a set of the values to match.
:param oned_data_file: The name of the 1D data file (.1dbin)
:type oned_data_file: str
:param filename: The name of the text file with the property name
and the values to match
:type filename: str
:return: 0-based property position, followed by values to match
:rtype: int, set(str)
:raise: ValueError if the property is not found in oned_data_file
"""
with open(filename, 'rt') as reader:
prop = reader.readline().rstrip('\n')
prop_lower = prop.lower()
if prop_lower == 'smiles':
pos = 0
elif prop_lower in ('name', 'title', 's_m_title'):
pos = 1
else:
try:
oned_props = get_oned_data_file_properties(oned_data_file)
pos = oned_props.index(prop) + 3
except ValueError:
msg = f'Property "{prop}" not found in {oned_data_file}'
raise ValueError(msg)
return pos, set(line.strip() for line in reader)
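

# Illustrative sketch: write a subset file that matches on titles, then
# locate the property position and values. Since titles map to column 1,
# the 1D data file (hypothetical here) is not opened in this case.
def _example_values_to_match():
    with open('subset.txt', 'wt') as fh:
        fh.write('s_m_title\nmol_1\nmol_2\n')
    pos, values = get_values_to_match('database.1dbin', 'subset.txt')
    return pos, values  # -> 1, {'mol_1', 'mol_2'}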


def is_oned_data_file(filename):
"""
Returns True if the supplied file name corresponds to a 1D data file.
:param filename: The name of the file
:type filename: str
:return: Whether the name corresponds to a 1D data file
:rtype: bool
"""
file_format = phase.get_phase_file_format(filename)
return file_format == phase.PhpFileFormat_PHP_FORMAT_1DBIN


def merge_oned_data_files(oned_data_files_in, oned_data_file_out, remove=False):
"""
Merges a list of 1D data files, creating an output file with a master
set of properties.
:param oned_data_files_in: List of 1D data files to merge
:type oned_data_files_in: list(str)
:param oned_data_file_out: Output 1D data file
:type oned_data_file_out: str
:param remove: If True input 1D files will be removed after merge
:type remove: bool
:return: Total number of rows merged
:rtype: int
"""
    if not oned_data_files_in:
        return 0
master_props = get_master_oned_properties(oned_data_files_in)
version, treatment, fd = get_oned_data_file_attributes(
oned_data_files_in[0])
oned_attr = [version, str(treatment), base64_encode_fd(fd)]
with gzip.open(oned_data_file_out, 'wt', newline='') as fh_out:
writer = csv.writer(fh_out, delimiter='\t')
writer.writerow(oned_attr)
writer.writerow(master_props)
row_count = 0
for oned_data_file_in in oned_data_files_in:
file_props = get_master_oned_properties([oned_data_file_in])
file_pos = get_property_positions(master_props, file_props)
for row in get_oned_data_file_rows(oned_data_file_in):
row.append('') # For missing properties
writer.writerow([row[pos] for pos in file_pos])
row_count += 1
write_oned_data_file_row_count(oned_data_file_out, row_count)
if remove:
fileutils.force_remove(*oned_data_files_in)
return row_count
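

# Illustrative sketch: merge two hypothetical 1D data files into one with
# the master set of properties, removing the inputs afterwards.
def _example_merge():
    return merge_oned_data_files(['part_1.1dbin', 'part_2.1dbin'],
                                 'merged.1dbin', remove=True)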


def merge_oned_hits_files(hits_files_in, hits_file_out, query_row=None):
"""
Merges a list of compressed CSV hits files, creating an output file
with a master set of properties.
:param hits_files_in: List of hits files to merge
:type hits_files_in: list(str)
:param hits_file_out: Output hits file
:type hits_file_out: str
:param query_row: If supplied, this row is written before any hits.
It should be obtained by calling get_oned_query_rows.
:type query_row: list(str)
:return: Number of merged hits written
:rtype: int
"""
hit_count = 0
master_props = get_master_oned_properties(hits_files_in)
with gzip.open(hits_file_out, 'wt', newline='') as fh_out:
writer = csv.writer(fh_out)
writer.writerow(master_props)
if query_row:
writer.writerow(query_row)
for hits_file_in in hits_files_in:
with gzip.open(hits_file_in, 'rt') as fh_in:
reader = csv.reader(fh_in)
file_pos = get_property_positions(master_props, next(reader))
for row in reader:
row.append('') # For missing properties
writer.writerow([row[pos] for pos in file_pos])
hit_count += 1
return hit_count


def run_oned_screen(st_query,
oned_data_file,
hits_file,
start=0,
stop=None,
write_query_row=False,
sort=True,
max_hits=ONED_MAX_HITS,
max_rows=ONED_MAX_ROWS,
min_sim=0.0,
logger=None,
progress_interval=ONED_RUN_PROGRESS_INTERVAL):
"""
Performs a 1D similarity screen with a single structure query and
writes hits to a compressed CSV file.
:param st_query: The query structure
:type st_query: structure.Structure
:param oned_data_file: Name of the input 1D data file (.1dbin)
:type oned_data_file: str
:param hits_file: Name of the output compressed CSV file (.csv.gz)
:type hits_file: str
:param start: 0-based starting row in oned_data_file
:type start: int
:param stop: Upper limit on the rows to screen. For example, if start=5
and stop=10, rows 5, 6, 7, 8 and 9 will be screened. Screening
proceeds until the end of the file by default.
:type stop: int
:param write_query_row: Whether to write the 1D query as the first row
:type write_query_row: bool
:param sort: Whether to write a sorted hits file
:type sort: bool
:param max_hits: Cap on the number of sorted hits to write
:type max_hits: int
    :param max_rows: Maximum number of sorted rows to hold in memory
    :type max_rows: int
    :param min_sim: Write only hits whose similarity to the query is
        greater than or equal to this value
    :type min_sim: float
:param logger: Logger for info level progress messages
:type logger: logging.Logger or NoneType
:param progress_interval: Interval between progress messages
:type progress_interval: int
:return: Number of hits written
:rtype: int
"""
master_props = get_master_oned_properties([oned_data_file])
master_props.append(ONED_SIM_PROPERTY)
oned_query = get_oned_query(st_query, oned_data_file)
oned_sim_calc = phase.OneDSimCalc(oned_query)
encoder = phase.OneDEncoder()
query_row = None
if write_query_row:
encoder.fromOneDRep(oned_query)
query_row = get_oned_query_row(st_query, encoder.toBase64(),
master_props)
hits_file_out = hits_file
if sort:
# Use PhpSortCapper to reduce the number of unsorted hits written
# to disk.
hit_capper = phase.PhpSortCapperDoubleDecreasing(max_hits)
hits_file_out += '.tmp'
hit_count = 0
with gzip.open(hits_file_out, 'wt', newline='') as fh:
writer = csv.writer(fh)
writer.writerow(master_props)
if write_query_row and not sort:
writer.writerow(query_row)
struct_count = 0
for row in get_oned_data_file_rows(oned_data_file, start, stop):
encoder.fromBase64(row[2])
sim = oned_sim_calc.computeSim(encoder.toOneDRep())
struct_count += 1
if sim >= min_sim:
write_hit = not sort or hit_capper.insert(sim)
if write_hit:
writer.writerow(row + [str(sim)])
hit_count += 1
if logger and struct_count % progress_interval == 0:
logger.info(f'{struct_count} structures screened')
if logger:
logger.info(f'Total number of structures screened: {struct_count}')
if sort:
if logger:
logger.info('Sorting hits by decreasing similarity to query')
hit_count = combine_oned_hits([hits_file_out], hits_file, query_row,
sort, max_hits, max_rows)
fileutils.force_remove(hits_file_out)
if logger:
logger.info(f'Number of hits written: {hit_count}')
return hit_count
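

# Illustrative sketch: screen a 1D data file against the first query in a
# hypothetical query file, keeping the top 100 hits sorted by decreasing
# similarity, with the query written as the first row of the hits file.
def _example_screen():
    st_query = get_oned_query_structures('query.smi')[0]
    return run_oned_screen(st_query, 'database.1dbin', 'hits.csv.gz',
                           write_query_row=True, max_hits=100)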


def split_structure_file(structure_file, prefix_out, nfiles):
"""
Splits a structure file into a number of smaller, equal-sized files
named <prefix_out>_1.<ext>, <prefix_out>_2.<ext>, etc., where <ext>
will be 'smi.gz', 'csv.gz', 'maegz' or 'sdfgz', depending on the type
of file supplied.
:param structure_file: The name of the structure file to be split
    :type structure_file: str
:param prefix_out: Prefix for all output structure files
:type prefix_out: str
:param nfiles: The number of output structure files to create
:type nfiles: int
:return: The names of the files created
:rtype: list(str)
:raise: ValueError if the file format is illegal
"""
in_format = fileutils.get_structure_file_format(structure_file)
if in_format in (fileutils.MAESTRO, fileutils.SD):
splitter = phase.PhpStructureSplitter([structure_file], nfiles,
prefix_out)
return list(splitter.getFileNamesOut())
elif in_format in (fileutils.SMILES, fileutils.SMILESCSV):
ext = SPLIT_STRUCTURE_FILE_FORMAT_EXT[in_format]
        write_header = in_format == fileutils.SMILESCSV
nstruct = structure.count_structures(structure_file)
row_counts = phase.partitionValues(nstruct, min(nfiles, nstruct))
outfiles = []
with fileutils.open_maybe_compressed(structure_file, 'rt') as reader:
header = reader.readline() if write_header else None
for i, row_count in enumerate(row_counts, 1):
outfile = f'{prefix_out}_{i}{ext}'
outfiles.append(outfile)
with gzip.open(outfile, 'wt', newline='') as writer:
if write_header:
writer.write(header)
for row in range(row_count):
writer.write(reader.readline())
return outfiles
else:
raise ValueError(f'Illegal structure file format: "{structure_file}"')


def split_oned_data_file(oned_data_file, prefix_out, nfiles):
"""
Splits a 1D data file into a number of smaller, equal-sized files
named <prefix_out>_1.1dbin, <prefix_out>_2.1dbin, etc.
:param oned_data_file: The name of the 1D data file to be split
:type oned_data_file: str
:param prefix_out: Prefix for all output 1D data files
:type prefix_out: str
:param nfiles: The number of output 1D data files to create
:type nfiles: int
:return: The names of the files created
:rtype: list(str)
"""
file_splits = get_oned_data_file_splits(oned_data_file, prefix_out, nfiles)
with gzip.open(oned_data_file, 'rt') as fh_in:
reader = csv.reader(fh_in, delimiter='\t')
oned_attr = next(reader)
properties = next(reader)
try:
for i, file_split in enumerate(file_splits):
outfile = file_split[0]
start, stop = file_split[1]
with gzip.open(outfile, 'wt', newline='') as fh_out:
writer = csv.writer(fh_out, delimiter='\t')
writer.writerow(oned_attr)
writer.writerow(properties)
for j in range(start, stop):
writer.writerow(next(reader))
write_oned_data_file_row_count(outfile, stop - start)
except gzip.BadGzipFile:
pass
return [file_split[0] for file_split in file_splits]


def write_oned_data_file_row_count(oned_data_file, row_count):
"""
Appends the total number of rows to a 1D data file.
:param oned_data_file: Name of the 1D data file (.1dbin)
:type oned_data_file: str
:param row_count: Total number of rows in file
:type row_count: int
"""
with open(oned_data_file, 'ab') as writer:
writer.write(
row_count.to_bytes(ONED_ROW_COUNT_BYTES,
byteorder='big',
signed=False))
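

# The row count is stored as a fixed-width, big-endian binary footer
# appended after the gzip stream; get_oned_data_file_row_count() seeks to
# the final ONED_ROW_COUNT_BYTES bytes to read it back. Minimal round-trip
# sketch on a hypothetical file containing only the two header rows:
def _example_row_count_footer():
    with gzip.open('tiny.1dbin', 'wt', newline='') as fh:
        csv.writer(fh, delimiter='\t').writerows([['attr'], ['props']])
    write_oned_data_file_row_count('tiny.1dbin', 0)
    return get_oned_data_file_row_count('tiny.1dbin')  # -> 0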