"""
This module provides functionality for splitting a large data set into
smaller chunks for scalable diversity selection via DiversitySelector.
Copyright Schrodinger LLC, All Rights Reserved.
"""
import itertools
import math
from operator import itemgetter
import numpy
from schrodinger.application.combinatorial_diversity import diversity_selector
from schrodinger.infra import canvas
def compute_factor_scores(xcols, evectors):
    """
    Given N columns of autoscaled X variables and the N eigenvectors obtained
    from PCA of those X variables, this function computes the score on each
    eigenvector for each row of X values.

    :param evectors: Eigenvectors from a PCA analysis. The jth eigenvector
        is stored in evectors[:, j].
    :type evectors: numpy.ndarray

    :param xcols: Columns of autoscaled X variables. The jth column is stored
        in xcols[j].
    :type xcols: numpy.ndarray

    :return: N PCA scores for each row in xcols. The shape of the returned
        vector is (xcols.shape[1], xcols.shape[0]), i.e., the shape of
        the transpose of xcols.
    :rtype: numpy.ndarray
    """
    # Project the X columns onto the eigenvectors, then flip the result so
    # that each row of the output holds the scores for one row of X values.
    scores_by_factor = numpy.matmul(evectors, xcols)
    return scores_by_factor.T
def compute_sim_to_probes(fp_file, probe_rows):
    """
    Given a 32-bit fingerprint file and the 0-based row numbers for N
    diverse probe structures, this function computes columns of autoscaled
    Tanimoto similarities between the probes and all fingerprints in the
    file.

    :param fp_file: Input file of 32-bit fingerprints.
    :type fp_file: str

    :param probe_rows: List of 0-based fingerprint row numbers for N
        diverse probe structures.
    :type probe_rows: list(int)

    :return: N columns of autoscaled similarities.
    :rtype: numpy.ndarray
    """
    reader = canvas.ChmFPIn32(fp_file)
    # Fetch the probe fingerprints. setPos expects 1-based positions.
    probe_fps = []
    for probe_row in probe_rows:
        reader.setPos(probe_row + 1)
        probe_fps.append(reader.next())
    num_probes = len(probe_fps)
    sim_cols = numpy.zeros((num_probes, reader.getRowCount()), numpy.double)
    # Compute raw Tanimoto similarities of every fingerprint to each probe.
    reader.rewind()
    row = 0
    while reader.hasNext():
        fp = reader.next()
        for col in range(num_probes):
            sim_cols[col][row] = fp.simTanimoto(probe_fps[col])
        row += 1
    # Autoscale each similarity column to zero mean and unit standard
    # deviation.
    sim_cols -= sim_cols.mean(axis=1, keepdims=True)
    sim_cols /= sim_cols.std(axis=1, keepdims=True)
    return sim_cols
def create_sim_cormat(sim_cols):
    """
    Given N columns of autoscaled similarities, this function creates an
    an NxN matrix of Pearson correlations among those columns.

    :param sim_cols: N columns of autoscaled similarities.
    :type sim_cols: numpy.ndarray

    :return: NxN correlation matrix.
    :rtype: numpy.ndarray
    """
    ncols, nrows = sim_cols.shape
    # Start from the identity, which supplies the unit diagonal.
    cormat = numpy.identity(ncols)
    # Because the columns are autoscaled, the Pearson correlation reduces
    # to the mean of the elementwise product. Fill both triangles at once.
    for i, j in itertools.combinations(range(ncols), 2):
        corr = numpy.dot(sim_cols[i], sim_cols[j]) / nrows
        cormat[i][j] = corr
        cormat[j][i] = corr
    return cormat
def diagonalize_symmat(symmat):
    """
    Diagonalizes a real, symmetric matrix and returns the eigenvalues
    and eigenvectors sorted by decreasing eigenvalue.

    :param symmat: Real, symmetric matrix. Not modified.
    :type symmat: numpy.ndarray

    :return: Reverse-sorted eigenvalues, followed by eigenvectors. The
        jth eigenvector is stored in the column slice [:, j] of the
        returned numpy.ndarray.
    :rtype: tuple(numpy.float64), numpy.ndarray
    """
    # numpy.linalg.eigh is the routine intended for real, symmetric
    # matrices: unlike the general-purpose eig, it guarantees real
    # eigenvalues/eigenvectors (no imaginary residue to strip) and returns
    # the eigenvalues already sorted in ascending order.
    evalues, evectors = numpy.linalg.eigh(symmat)
    # Reverse to obtain decreasing eigenvalue order. Return the
    # eigenvalues as a tuple for backward compatibility with callers.
    sorted_evalues = tuple(evalues[::-1])
    sorted_evectors = evectors[:, ::-1]
    # There's no guarantee that different platforms/machines will yield
    # eigenvectors with consistent algebraic signs. To standardize results,
    # ensure that the largest-magnitude element of each eigenvector is
    # positive.
    for j in range(symmat.shape[0]):
        esign = math.copysign(1, max(sorted_evectors[:, j], key=math.fabs))
        sorted_evectors[:, j] *= esign
    return sorted_evalues, sorted_evectors
def get_all_orthant_strings(ndim):
    """
    Yields all possible orthant strings for the given number of dimensions.
    For example, if ndim = 2, this function would yield the 2-dimensional
    orthant strings '++', '+-', '-+', '--'. These correspond to the usual
    4 quadrants in xy space.

    :param ndim: Number of dimensions.
    :type ndim: int

    :yield: All possible orthant strings of length ndim.
    :ytype: str
    """
    # itertools.product enumerates sign tuples in the required order;
    # joining each tuple yields the corresponding orthant string.
    yield from map(''.join, itertools.product('+-', repeat=ndim))
def get_orthant_strings(scores, ndim):
    """
    Given PCA factor scores over the full set of eigenvectors and a desired
    number of dimensions in that factor space, this function yields strings
    containing '+' and '-' characters which indicate the orthant in which
    each row of scores resides. A value of '+' is assigned if score >= 0 and
    a value of '-' is assigned if score is < 0.

    For example, if a given row consists of the following scores on 8
    factors:
    [1.3289, -0.2439, -2.1774, 0.8391, 1.4632, -0.6268, 1.2238, -1.7802]
    and ndim = 4, the orthant string would be '+--+'.

    :param scores: PCA factor scores (see compute_factor_scores).
    :type scores: numpy.ndarray

    :param ndim: Number of factors to consider. This determines the
        number of characters in each orthant string.
    :type ndim: int

    :yield: Orthant string for each row in scores.
    :ytype: str
    """
    for row in scores:
        # Only the first ndim factor scores contribute a sign character.
        yield ''.join('-' if row[j] < 0.0 else '+' for j in range(ndim))
def partition_scores(scores, min_pop):
    """
    Given PCA factor scores over the full set of eigenvectors and a minimum
    required population, this function partitions the scores into distinct
    orthant pairs of nearly equal population, where the smallest population
    is guaranteed to be at least min_pop. This is achieved by making a series
    of calls to get_orthant_strings with progressively larger values of ndim,
    grouping the scores by orthant string, sorting by population size and
    then combining the highest and lowest populations, the 2nd highest and
    2nd lowest populations, etc. These combined populations decrease as ndim
    is increased, and the largest value of ndim which allows min_pop to be
    satisfied is used.

    For example:
    1. Suppose ndim=4 is the largest dimension that satisfies min_pop
    2. Suppose a given row of scores yields the orthant string '-+-+'
    3. Suppose orthant '-+-+' is combined with orthant '+--+'
    4. That row of scores would be assigned to orthant pair '+--+|-+-+'

    :param scores: PCA factor scores (see compute_factor_scores).
    :type scores: numpy.ndarray

    :param min_pop: Minimum required population of any orthant pair.
    :type min_pop: int

    :return: Dictionary of orthant pair --> list of 0-based row numbers.
    :rtype: dict{str: list(int)}
    """
    nrows, ncols = scores.shape
    if nrows <= min_pop:
        # Too few rows to subdivide. Use all rows in a single pair.
        return {'+|-': list(range(nrows))}
    ndim_max = 1
    # The fallback must have the same shape as the pairs built below: a
    # list of [orthant1, orthant2] pairs. A flat ['+', '-'] would raise
    # IndexError in the final loop, which unpacks two orthant strings from
    # each pair.
    orthant_pairs_max = [['+', '-']]
    for ndim in range(1, ncols + 1):
        # Accumulate counts by orthant string.
        counts = dict.fromkeys(get_all_orthant_strings(ndim), 0)
        for orthant_string in get_orthant_strings(scores, ndim):
            counts[orthant_string] += 1
        sorted_counts = sorted((n, orthant) for orthant, n in counts.items())
        # Combine largest with smallest, 2nd largest with 2nd smallest, etc.
        nhalf = len(sorted_counts) // 2
        min_pop_failure = False
        orthant_pairs = []
        for j1 in range(nhalf):
            j2 = -j1 - 1
            combined_count = sorted_counts[j1][0] + sorted_counts[j2][0]
            if combined_count < min_pop:
                # This ndim over-fragments the data; keep the previous one.
                min_pop_failure = True
                break
            orthant_pairs.append(
                sorted([sorted_counts[j1][1], sorted_counts[j2][1]]))
        if min_pop_failure:
            break
        ndim_max = ndim
        orthant_pairs_max = orthant_pairs
    # Assign final dictionary of orthant pairs and row numbers at ndim_max.
    members = {orthant: [] for orthant in get_all_orthant_strings(ndim_max)}
    for i, orthant_string in enumerate(get_orthant_strings(scores, ndim_max)):
        members[orthant_string].append(i)
    orthant_pair_dict = {}
    for orth1, orth2 in orthant_pairs_max:
        pair_string = f'{orth1}|{orth2}'
        orthant_pair_dict[pair_string] = sorted(members[orth1] + members[orth2])
    return orthant_pair_dict
def select_probes(fp_file, num_probes, rand_seed):
    """
    Selects the requested number of diverse probe structures from the
    provided 32-bit fingerprint file and returns the corresponding 0-based
    fingerprint row numbers.

    :param fp_file: Input file of 32-bit fingerprints and SMILES.
    :type fp_file: str

    :param num_probes: Number of diverse probe structures.
    :type num_probes: int

    :param rand_seed: Random seed for underlying diversity algorithm.
    :type rand_seed: int

    :return: List of 0-based row numbers for diverse probe structures.
    :rtype: list(int)
    """
    # Delegate the diversity selection to DiversitySelector and report the
    # rows it picked.
    div_selector = diversity_selector.DiversitySelector(
        fp_file, rand_seed=rand_seed)
    div_selector.select(num_probes)
    return div_selector.subset_rows