schrodinger.seam.transforms.grouping module

class schrodinger.seam.transforms.grouping.GroupIntoBins(bin_key_func: Callable[[V], float], bin_width: int)

Bases: PTransform

Bins (key, value) pairs into ((key, bin_index), [values]) tuples based on a given bin_width.

The “bin index” is calculated in two steps: 1) the value is passed to bin_key_func to get a bin value; 2) the bin value is divided by bin_width and rounded down to the nearest integer.
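The bin-index calculation described above can be sketched in plain Python. This is a minimal illustration under the stated definition, not the transform's actual implementation; the helper name bin_index is hypothetical.

```python
import math


def bin_index(value, bin_key_func, bin_width):
    """Hypothetical helper: floor of (bin value / bin width)."""
    bin_value = bin_key_func(value)
    return math.floor(bin_value / bin_width)


# Bin a few raw values with an identity key function and a width of 3.
indices = [bin_index(v, lambda x: x, 3) for v in (1, 2, 6, 5, 9)]
print(indices)  # [0, 0, 2, 1, 3]
```

Values 1 and 2 share bin 0 because both fall in the half-open interval [0, 3).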

Parameters:
  • bin_key_func – A function that takes a value and returns a value to bin by. If not specified, the value itself is used.

  • bin_width – The width of each bin.

Example usage:

>>> import apache_beam as beam
>>> from schrodinger.seam.transforms.grouping import GroupIntoBins
>>> with beam.Pipeline() as p:
...    input_data = [('a', 1), ('a', 2), ('a', 6), ('b', 5), ('b', 9)]
...    result = (
...        p
...        | beam.Create(input_data)
...        | GroupIntoBins(bin_key_func=lambda x: x, bin_width=3)
...        | beam.Map(print))
(('a', 0), [1, 2])
(('a', 2), [6])
(('b', 1), [5])
(('b', 3), [9])
__init__(bin_key_func: Callable[[V], float], bin_width: int)
expand(pcoll: PCollection[Tuple[K, V]]) PCollection[Tuple[Tuple[K, int], Iterable[V]]]
class schrodinger.seam.transforms.grouping.GroupIntoBinsByCount(max_bin_size: int, key_func: Callable[[V], Any])

Bases: PTransform

Groups (key, value) pairs into ((key, bin_index), [values]) based on count.

This transform:
  1) Adds a random UUID to each value’s sort key to break ties.
  2) Extracts sort keys from all values.
  3) Sorts the extracted keys to compute bin boundaries.
  4) Assigns each value to a bin based on these boundaries.

This approach avoids loading all values into memory for sorting - only the extracted keys need to be sorted, which is much more memory-efficient for large objects like Structure instances.

The first max_bin_size values (lowest by key_func) go into bin 0, the next max_bin_size into bin 1, etc. The random UUID ensures that max_bin_size is strictly enforced even when there are ties in key_func.
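The count-based assignment above can be sketched as a small in-memory analogue. This is only an illustration: the function name group_into_bins_by_count is hypothetical, binning is assumed to happen independently per key, and, unlike the transform, the sketch sorts the values themselves rather than only the extracted keys.

```python
import uuid
from collections import defaultdict


def group_into_bins_by_count(pairs, max_bin_size, key_func):
    """Hypothetical in-memory analogue of count-based binning."""
    by_key = defaultdict(list)
    for key, value in pairs:
        # Pair the sort key with a random UUID so ties are broken arbitrarily.
        sort_key = (key_func(value), uuid.uuid4().hex)
        by_key[key].append((sort_key, value))

    binned = {}
    for key, tagged in by_key.items():
        tagged.sort(key=lambda item: item[0])
        # Consecutive chunks of max_bin_size values become bins 0, 1, 2, ...
        for start in range(0, len(tagged), max_bin_size):
            chunk = tagged[start:start + max_bin_size]
            binned[(key, start // max_bin_size)] = [v for _, v in chunk]
    return binned


pairs = [
    ('grp', (1.0, 'D')), ('grp', (1.0, 'B')), ('grp', (1.0, 'A')),
    ('grp', (1.0, 'C')), ('grp', (2.0, 'E')), ('grp', (2.0, 'F')),
]
bins = group_into_bins_by_count(pairs, max_bin_size=3, key_func=lambda x: x[0])
```

Because the tie-break is random, which of the four energy=1.0 items lands in bin 1 differs from run to run, but every bin holds at most max_bin_size values.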

Parameters:
  • max_bin_size – Maximum number of values per bin.

  • key_func – A function that takes a value and returns a sortable key.

Example usage:

>>> import apache_beam as beam
>>> from schrodinger.seam.transforms.grouping import GroupIntoBinsByCount
>>> with beam.Pipeline() as p:
...     # Each value is (energy, title). There are 4 items with energy=1.0,
...     # so a UUID generator determines which items go in bin 0 and 1.
...     # The title is not used in the sort; it just allows for tracking objects.
...     input_data = [
...         ('grp', (1.0, 'D')),
...         ('grp', (1.0, 'B')),
...         ('grp', (1.0, 'A')),
...         ('grp', (1.0, 'C')),
...         ('grp', (2.0, 'E')),
...         ('grp', (2.0, 'F')),
...     ]
...     result = (
...         p
...         | beam.Create(input_data)
...         | GroupIntoBinsByCount(
...             max_bin_size=3,
...             key_func=lambda x: x[0])  # only use energy to sort
...         | beam.Map(print))
(('grp', 0), [(1.0, 'A'), (1.0, 'B'), (1.0, 'C')])
(('grp', 1), [(1.0, 'D'), (2.0, 'E'), (2.0, 'F')])
# One of 'A', 'B', 'C', and 'D' was randomly chosen to go into bin 1
__init__(max_bin_size: int, key_func: Callable[[V], Any])
expand(pcoll: PCollection[Tuple[K, V]]) PCollection[Tuple[Tuple[K, int], Iterable[V]]]
class schrodinger.seam.transforms.grouping.UnbinAndPartitionBoundaryValues(k: int, key_func: Optional[Callable[[V], float]] = None)

Bases: PTransform

Unbins and partitions the input into two pcollections of boundary and non-boundary values.

The boundary values are the first and last k values in each bin after sorting. The non-boundary values are the rest of the values in each bin.
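The split described above can be sketched for a single bin in plain Python. The helper name partition_boundary is hypothetical, and treating every value as a boundary value when a bin holds 2k or fewer values is an assumption of this sketch, not documented behavior.

```python
def partition_boundary(values, k, key_func=None):
    """Hypothetical helper: split one bin into (boundary, non_boundary)."""
    ordered = sorted(values, key=key_func)
    n = len(ordered)
    if n <= 2 * k:
        # Assumption: small bins consist entirely of boundary values.
        return ordered, []
    boundary = ordered[:k] + ordered[n - k:]
    non_boundary = ordered[k:n - k]
    return boundary, non_boundary


boundary, non_boundary = partition_boundary([3, 1, 2], k=1)
print(boundary, non_boundary)  # [1, 3] [2]
```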

Example usage:

>>> import apache_beam as beam
>>> from schrodinger.seam.transforms.grouping import UnbinAndPartitionBoundaryValues
>>> with beam.Pipeline() as p:
...     bins = [
...         ('a', [1, 2, 3])
...     ]
...     pcoll = p | beam.Create(bins)
...     split = pcoll | UnbinAndPartitionBoundaryValues(k=1)
...     _ = split.boundary_values | beam.Map(lambda x: print("Boundary", x))
...     _ = split.nonboundary_values | beam.Map(lambda x: print("NonBoundary", x))
Boundary ('a', 1)
NonBoundary ('a', 2)
Boundary ('a', 3)
BOUNDARY_TAG = 'boundary_values'
NON_BOUNDARY_TAG = 'nonboundary_values'
__init__(k: int, key_func: Optional[Callable[[V], float]] = None)
Parameters:
  • k – Used to define the first and last k elements as boundary values

  • key_func – Sorting key used to sort a list of values

expand(pcoll)
class schrodinger.seam.transforms.grouping.AssignNearestBinKey(bin_key_func: Callable[[V], float], bin_width: float)

Bases: PTransform

Assigns the nearest bin key to each element in the input PCollection.

The bin key is calculated by dividing the value by the bin width and rounding to the nearest integer.
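The nearest-bin calculation above can be sketched as follows. The helper name nearest_bin_key is hypothetical, and the use of Python's built-in round (which rounds halves to the nearest even integer) is an assumption about the implementation.

```python
def nearest_bin_key(value, bin_key_func, bin_width):
    """Hypothetical helper: round (bin value / bin width) to an integer."""
    return round(bin_key_func(value) / bin_width)


# Identity key function with a bin width of 10.
keys = [nearest_bin_key(v, lambda x: x, 10) for v in (7, 12, 22, 4, 25)]
print(keys)  # [1, 1, 2, 0, 2]
```

Under this assumption, 25 maps to bin key 2 rather than 3 because 2.5 rounds to the even neighbor.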

Parameters:
  • bin_key_func – A function that takes a value and returns a value to bin by. If not specified, the value itself is used.

  • bin_width – The width of each bin.

Example usage:

>>> import apache_beam as beam
>>> from schrodinger.seam.transforms.grouping import AssignNearestBinKey
>>> with beam.Pipeline() as p:
...     input_data = [('a', 7), ('a', 12), ('a', 22), ('b', 4), ('b', 25)]
...     result = (
...         p
...         | beam.Create(input_data)
...         | AssignNearestBinKey(bin_key_func=lambda x: x, bin_width=10)
...     )
...     _ = result | beam.Map(print)
(('a', 1), 7)
(('a', 1), 12)
(('a', 2), 22)
(('b', 0), 4)
(('b', 2), 25)
__init__(bin_key_func: Callable[[V], float], bin_width: float)
expand(pcoll: PCollection[Tuple[K, V]]) PCollection[Tuple[Tuple[K, int], V]]
class schrodinger.seam.transforms.grouping.UnbinAndPartitionBoundaryValuesByCount(k: int, key_func: Optional[Callable[[V], float]] = None)

Bases: PTransform

Unbins and partitions the input into boundary and non-boundary values, reading each bin’s index and then re-assigning boundary indices for count-based binning.

For a bin with index N:
  • The first k values (sorted by key_func) go to boundary index N.
  • The last k values go to boundary index N + 1.
  • The middle values are non-boundary.

This is designed for count-based binning where boundary indices must be explicitly assigned (unlike width-based binning where values can be used to determine boundary assignment). It effectively allows for cross-boundary comparison with count-based binning in a similar manner as the combination of UnbinAndPartitionBoundaryValues plus AssignNearestBinKey does for width-based binning.
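The index re-assignment for a single bin can be sketched in plain Python. The helper name partition_boundary_by_count is hypothetical, and the handling of bins with 2k or fewer values (everything after the first k goes to the high boundary) is an assumption of this sketch.

```python
def partition_boundary_by_count(bin_index, values, k, key_func=None):
    """Hypothetical helper: return (boundary, non_boundary) for one bin.

    Each returned item is (assigned_index, value).
    """
    ordered = sorted(values, key=key_func)
    n = len(ordered)
    high_start = max(k, n - k)  # avoid overlapping the low boundary
    low = [(bin_index, v) for v in ordered[:k]]
    middle = [(bin_index, v) for v in ordered[k:high_start]]
    # The last k values move to the next boundary index.
    high = [(bin_index + 1, v) for v in ordered[high_start:]]
    return low + high, middle


boundary, non_boundary = partition_boundary_by_count(0, [1, 2, 3, 4, 5], k=2)
print(boundary)      # [(0, 1), (0, 2), (1, 4), (1, 5)]
print(non_boundary)  # [(0, 3)]
```

With this scheme the last k values of bin N and the first k values of bin N + 1 share boundary index N + 1, so they can be compared together.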

Example usage:

>>> import apache_beam as beam
>>> from schrodinger.seam.transforms.grouping import UnbinAndPartitionBoundaryValuesByCount
>>> with beam.Pipeline() as p:
...     bins = [
...         (('a', 0), [1, 2, 3, 4, 5]),
...         (('a', 1), [6, 7, 8, 9, 10]),
...     ]
...     pcoll = p | beam.Create(bins)
...     split = pcoll | UnbinAndPartitionBoundaryValuesByCount(k=2)
...     _ = split.boundary_values | beam.Map(lambda x: print("Boundary", x))
...     _ = split.nonboundary_values | beam.Map(lambda x: print("NonBoundary", x))
Boundary (('a', 0), 1)
Boundary (('a', 0), 2)
NonBoundary (('a', 0), 3)
Boundary (('a', 1), 4)  # changed from 0 to 1 because it's at high boundary
Boundary (('a', 1), 5)  # changed from 0 to 1 because it's at high boundary
Boundary (('a', 1), 6)
Boundary (('a', 1), 7)
NonBoundary (('a', 1), 8)
Boundary (('a', 2), 9)  # changed from 1 to 2 because it's at high boundary
Boundary (('a', 2), 10) # changed from 1 to 2 because it's at high boundary
BOUNDARY_TAG = 'boundary_values'
NON_BOUNDARY_TAG = 'nonboundary_values'
__init__(k: int, key_func: Optional[Callable[[V], float]] = None)
Parameters:
  • k – Number of elements at each boundary (first k and last k)

  • key_func – Sorting key used to sort values within each bin

expand(pcoll)