schrodinger.seam.transforms.core module

schrodinger.seam.transforms.core.yields_elements(finish_bundle_method: Callable)

A decorator that wraps a DoFn’s finish_bundle method to allow it to yield elements.

Example usage:

>>> import apache_beam as beam
>>> from schrodinger.seam.transforms import core
>>>
>>> class Batcher(beam.DoFn):
...
...     def __init__(self):
...         self._batch = []
...
...     def start_bundle(self):
...         self._batch.clear()
...
...     def process(self, element):
...         self._batch.append(element)
...
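...     @core.yields_elements
...     def finish_bundle(self):
...         # Emit the batch, then start a fresh list instead of clearing
...         # the list object that was just handed downstream.
...         if self._batch:
...             yield self._batch
...         self._batch = []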
...
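In Beam, anything emitted from finish_bundle must be a WindowedValue rather than a bare element, because there is no input element from which to inherit a timestamp and window. That is presumably the boilerplate this decorator removes. The plain-Python sketch below shows one way such a wrapper could work. It is not the library's implementation. WindowedValue, GLOBAL_WINDOW, MIN_TIMESTAMP and yields_elements_sketch are hypothetical stand-ins (Beam's real classes live in apache_beam.utils.windowed_value and apache_beam.transforms.window). Placing every element in the global window at the minimum timestamp is also an assumption.

```python
import functools
from typing import Any, Callable, Iterator, NamedTuple

# Hypothetical stand-ins for Beam's global window and minimum timestamp.
GLOBAL_WINDOW = "GlobalWindow"
MIN_TIMESTAMP = float("-inf")


class WindowedValue(NamedTuple):
    """Hypothetical stand-in for apache_beam's WindowedValue."""
    value: Any
    timestamp: float
    windows: tuple


def yields_elements_sketch(finish_bundle_method: Callable) -> Callable:
    """Wrap a generator finish_bundle so each plain element gets windowed."""

    @functools.wraps(finish_bundle_method)
    def wrapper(self) -> Iterator[WindowedValue]:
        for element in finish_bundle_method(self):
            # Assumption: emit into the global window at the minimum timestamp.
            yield WindowedValue(element, MIN_TIMESTAMP, (GLOBAL_WINDOW,))

    return wrapper


class Batcher:
    """Plain-Python analogue of the Batcher DoFn in the example above."""

    def __init__(self):
        self._batch = []

    def process(self, element):
        self._batch.append(element)

    @yields_elements_sketch
    def finish_bundle(self):
        if self._batch:
            yield self._batch
        self._batch = []


batcher = Batcher()
for item in [1, 2, 3]:
    batcher.process(item)
windowed_batches = list(batcher.finish_bundle())
```

With this design the DoFn author writes an ordinary generator, and the wrapper alone decides how each emitted element is windowed.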
class schrodinger.seam.transforms.core.GroupIntoBatches(batch_size: int)

Bases: PTransform

PTransform that groups the input by key and then batches the values of each key into batches of the desired size.

Behaves identically to Beam’s GroupIntoBatches but does not use Beam timers, which the SeamRunner does not support.
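The group-then-batch semantics can be illustrated in plain Python over an in-memory list of (key, value) pairs. The following is a hypothetical sketch: group_into_batches is not part of this module, and it ignores distribution, windowing and ordering guarantees. It only shows which batches come out for a given batch_size.

```python
from collections import defaultdict
from typing import Dict, Hashable, Iterable, List, Tuple, TypeVar

K = TypeVar("K", bound=Hashable)
V = TypeVar("V")


def group_into_batches(
    pairs: Iterable[Tuple[K, V]], batch_size: int
) -> List[Tuple[K, List[V]]]:
    """Group values by key, then split each key's values into batches."""
    if batch_size < 1:
        raise ValueError("batch_size must be a positive integer")

    # Step 1: the equivalent of GroupByKey.
    grouped: Dict[K, List[V]] = defaultdict(list)
    for key, value in pairs:
        grouped[key].append(value)

    # Step 2: emit (key, batch) pairs of at most batch_size values each;
    # only the last batch for a key may be smaller.
    batches: List[Tuple[K, List[V]]] = []
    for key, values in grouped.items():
        for start in range(0, len(values), batch_size):
            batches.append((key, values[start:start + batch_size]))
    return batches
```

Batches never mix keys, so a key with few values still yields its own, possibly short, batch.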

__init__(batch_size: int)
classmethod with_hot_keys(max_values_per_shard: int)

PTransform that groups the input by key and then batches the values of each key into batches of the desired size.

In addition, it lowers memory usage by adding extra data to the keys, so that the intermediate GroupByKey transform does not have to hold all the values of keys with many values (“hot keys”) in memory before batching.

Parameters:

max_values_per_shard – the expected maximum number of values per shard. Shards are not guaranteed to stay below max_values_per_shard and may exceed it by a small percentage.

Returns:

a GroupIntoBatches PTransform with hot-key handling
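One common way to relieve hot keys is key salting: append a shard index to each key so that a hot key's values are spread across several smaller groups. The sketch below assumes that approach, and the documentation does not confirm it. shard_hot_keys and group_by_sharded_key are hypothetical helpers. The two-pass count with round-robin shard assignment is a simplification chosen so the output is deterministic. Round-robin happens to keep every shard at or below max_values_per_shard, whereas the documented transform only bounds shard sizes approximately.

```python
import math
from collections import Counter, defaultdict
from typing import Dict, Hashable, List, Tuple, TypeVar

K = TypeVar("K", bound=Hashable)
V = TypeVar("V")


def shard_hot_keys(
    pairs: List[Tuple[K, V]], max_values_per_shard: int
) -> List[Tuple[Tuple[K, int], V]]:
    """Hypothetical key salting: rewrite each key as (key, shard_index)."""
    # First pass: count values per key to decide how many shards each key needs.
    counts = Counter(key for key, _ in pairs)
    seen: Counter = Counter()
    sharded = []
    for key, value in pairs:
        num_shards = math.ceil(counts[key] / max_values_per_shard)
        # Round-robin assignment spreads a hot key's values evenly over its shards.
        shard = seen[key] % num_shards
        seen[key] += 1
        sharded.append(((key, shard), value))
    return sharded


def group_by_sharded_key(
    sharded: List[Tuple[Tuple[K, int], V]]
) -> Dict[Tuple[K, int], List[V]]:
    """The GroupByKey step now only buffers one shard's values per group."""
    groups: Dict[Tuple[K, int], List[V]] = defaultdict(list)
    for sharded_key, value in sharded:
        groups[sharded_key].append(value)
    return dict(groups)
```

Before batching, the shard index would be stripped again so that the emitted batches are keyed by the original key.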

expand(pcoll)
class schrodinger.seam.transforms.core.TeeIf(condition: bool, *consumers: Union[PTransform[PCollection[T], Any], Callable[[PCollection[T]], Any]])

Bases: PTransform

A PTransform that applies beam.Tee() with the given consumers if the condition is True, and does nothing if the condition is False. In both cases the input PCollection is passed through unchanged.

Example usage:

>>> import apache_beam as beam
>>> from schrodinger import structure
>>> from schrodinger.seam.io import chemio
>>> from schrodinger.seam.transforms import core
>>>
>>> sts = [structure.create_new_structure(num_atoms=i) for i in range(1, 11)]
>>> debug_flag = True
>>>
>>> with beam.Pipeline() as p:
...     st_count = (p
...                 | beam.Create(sts)
...                 | core.TeeIf(debug_flag, chemio.WriteStructuresToFile("output_file.maegz"))
...                 | beam.combiners.Count.Globally()
...                )
>>> # st_count contains a single element, 10; output_file.maegz is also written because debug_flag is True
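The conditional behaviour can be summarized in plain Python, with a list standing in for the PCollection and ordinary callables standing in for the consumers. tee_if is a hypothetical helper for illustration only. In the real transform, beam.Tee applies the consumers as side branches of the pipeline and returns its input.

```python
from typing import Callable, List, TypeVar

T = TypeVar("T")


def tee_if(condition: bool, data: List[T], *consumers: Callable[[List[T]], object]) -> List[T]:
    """Hypothetical analogue of TeeIf: feed consumers only when condition is True."""
    if condition:
        for consume in consumers:
            # Each consumer sees the same data; its return value is discarded.
            consume(data)
    # The input is returned unchanged either way, so downstream steps are unaffected.
    return data


written: List[int] = []
counted = len(tee_if(True, list(range(10)), written.extend))
```

Because the data flows through untouched, toggling the flag changes only the side effects, never the main result.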
__init__(condition: bool, *consumers: Union[PTransform[PCollection[T], Any], Callable[[PCollection[T]], Any]])
expand(pcoll)