schrodinger.seam.transforms.grouping module

class schrodinger.seam.transforms.grouping.GroupIntoBins(bin_key_func: Callable[[V], float], bin_width: int)

Bases: PTransform

Bins (key, value) pairs into ((key, bin_index), [values]) tuples based on a given bin_width.

The “bin index” is calculated in two steps: 1) the value is passed to bin_key_func to get a bin value; 2) the bin value is divided by bin_width and rounded down to the nearest integer.
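
The two-step calculation can be sketched in plain Python, without Beam. This is only an illustration of the arithmetic described here, not the transform's actual implementation; the helper names bin_index and group_into_bins are hypothetical.

```python
import math
from collections import defaultdict


def bin_index(value, bin_key_func, bin_width):
    """Hypothetical helper: floor(bin_key_func(value) / bin_width)."""
    return math.floor(bin_key_func(value) / bin_width)


def group_into_bins(pairs, bin_key_func, bin_width):
    """Hypothetical helper: group (key, value) pairs by (key, bin_index)."""
    bins = defaultdict(list)
    for key, value in pairs:
        bins[(key, bin_index(value, bin_key_func, bin_width))].append(value)
    return dict(bins)


# Same input as the example usage for this class.
input_data = [('a', 1), ('a', 2), ('a', 6), ('b', 5), ('b', 9)]
grouped = group_into_bins(input_data, bin_key_func=lambda x: x, bin_width=3)
for bin_key, values in grouped.items():
    print(bin_key, values)
```

With bin_width=3, the values 1 and 2 both floor to index 0, while 6 floors to index 2, so they land in separate bins under the same key.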

Parameters:
  • bin_key_func – A function that takes a value and returns a value to bin by. If not specified, the value itself is used.

  • bin_width – The width of each bin.

Example usage:

>>> import apache_beam as beam
>>> from schrodinger.seam.transforms.grouping import GroupIntoBins
>>> with beam.Pipeline() as p:
...    input_data = [('a', 1), ('a', 2), ('a', 6), ('b', 5), ('b', 9)]
...    result = (
...        p
...        | beam.Create(input_data)
...        | GroupIntoBins(bin_key_func=lambda x: x, bin_width=3)
...        | beam.Map(print))
(('a', 0), [1, 2])
(('a', 2), [6])
(('b', 1), [5])
(('b', 3), [9])

__init__(bin_key_func: Callable[[V], float], bin_width: int)

expand(pcoll: PCollection[Tuple[K, V]]) → PCollection[Tuple[Tuple[K, int], Iterable[V]]]

class schrodinger.seam.transforms.grouping.UnbinAndPartitionBoundaryValues(k: int, key_func: Optional[Callable[[V], float]] = None)

Bases: PTransform

Unbins the input and partitions it into two PCollections: boundary values and non-boundary values.

The boundary values are the first k and last k values in each bin after sorting (using key_func as the sort key, if given). The non-boundary values are the remaining values in each bin.
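
The split for a single bin can be sketched in plain Python. The function name partition_boundary_values is hypothetical, and the handling of k <= 0 and of bins with at most 2k values (everything treated as boundary) is an assumption, since this page does not specify those cases.

```python
def partition_boundary_values(values, k, key_func=None):
    """Hypothetical helper: return (boundary, non_boundary) for one bin."""
    ordered = sorted(values, key=key_func)
    if k <= 0:
        # Assumption: no boundary values are requested.
        return [], ordered
    if len(ordered) <= 2 * k:
        # Assumption: a bin too small to have an interior is all boundary.
        return ordered, []
    # First k and last k sorted values are boundary; the rest are interior.
    return ordered[:k] + ordered[-k:], ordered[k:-k]


boundary, non_boundary = partition_boundary_values([1, 2, 3], k=1)
print("Boundary", boundary)
print("NonBoundary", non_boundary)
```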

Example usage:

>>> import apache_beam as beam
>>> from schrodinger.seam.transforms.grouping import UnbinAndPartitionBoundaryValues
>>> with beam.Pipeline() as p:
...     bins = [
...         ('a', [1, 2, 3])
...     ]
...     pcoll = p | beam.Create(bins)
...     split = pcoll | UnbinAndPartitionBoundaryValues(k=1)
...     _ = split.boundary_values | beam.Map(lambda x: print("Boundary", x))
...     _ = split.nonboundary_values | beam.Map(lambda x: print("NonBoundary", x))
Boundary ('a', 1)
NonBoundary ('a', 2)
Boundary ('a', 3)

BOUNDARY_TAG = 'boundary_values'

NON_BOUNDARY_TAG = 'nonboundary_values'

__init__(k: int, key_func: Optional[Callable[[V], float]] = None)

Parameters:
  • k – The number of elements at each end of a sorted bin that are treated as boundary values.

  • key_func – Optional key function used to sort the values in each bin.

expand(pcoll)
class schrodinger.seam.transforms.grouping.AssignNearestBinKey(bin_key_func: Callable[[V], float], bin_width: float)

Bases: PTransform

Assigns the nearest bin key to each element in the input PCollection.

The bin key is calculated by passing the value to bin_key_func, dividing the result by bin_width, and rounding to the nearest integer. For example, with a bin width of 10, the value 7 is assigned to bin 1.
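
The outputs in the example usage for this class (for instance, 7 maps to bin 1 when bin_width=10) are consistent with rounding to the nearest integer. A minimal plain-Python sketch under that assumption follows; nearest_bin_key is a hypothetical name, and the use of Python's built-in round (which rounds ties to the even integer) is an assumption about tie handling, not a documented behavior of the transform.

```python
def nearest_bin_key(value, bin_key_func, bin_width):
    """Hypothetical helper: round(bin_key_func(value) / bin_width)."""
    # Assumption: ties are resolved the way Python's round() resolves them.
    return round(bin_key_func(value) / bin_width)


# Same input as the example usage for this class.
input_data = [('a', 7), ('a', 12), ('a', 22), ('b', 4), ('b', 25)]
assigned = [((key, nearest_bin_key(value, lambda x: x, 10)), value)
            for key, value in input_data]
for item in assigned:
    print(item)
```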

Parameters:
  • bin_key_func – A function that takes a value and returns a value to bin by. If not specified, the value itself is used.

  • bin_width – The width of each bin.

Example usage:

>>> import apache_beam as beam
>>> from schrodinger.seam.transforms.grouping import AssignNearestBinKey
>>> with beam.Pipeline() as p:
...     input_data = [('a', 7), ('a', 12), ('a', 22), ('b', 4), ('b', 25)]
...     result = (
...         p
...         | beam.Create(input_data)
...         | AssignNearestBinKey(bin_key_func=lambda x: x, bin_width=10)
...     )
...     _ = result | beam.Map(print)
(('a', 1), 7)
(('a', 1), 12)
(('a', 2), 22)
(('b', 0), 4)
(('b', 2), 25)

__init__(bin_key_func: Callable[[V], float], bin_width: float)

expand(pcoll: PCollection[Tuple[K, V]]) → PCollection[Tuple[Tuple[K, int], V]]