schrodinger.application.transforms.combiners module

class schrodinger.application.transforms.combiners.CombinePerKeyWithSideInputs(combine_fn, *args, **kwargs)

Bases: PTransform

An implementation of CombinePerKey that does mapper-side pre-combining and allows for keyword side inputs.

This transform should only be used when keyword side inputs are needed, as it’s not as efficient as the native CombinePerKey transform.

__init__(combine_fn, *args, **kwargs)
expand(pcoll)
class schrodinger.application.transforms.combiners.PartialGroupByKeyCombiningValues(combine_fn, args, kwargs)

Bases: DoFn

Aggregates values into a per-key-window cache.

As bundles are in-memory-sized, we don’t bother flushing until the very end.

__init__(combine_fn, args, kwargs)
setup()

Called to prepare an instance for processing bundles of elements.

This is a good place to initialize transient in-memory resources, such as network connections. The resources can then be disposed in DoFn.teardown.

start_bundle()

Called before a bundle of elements is processed on a worker.

Elements to be processed are split into bundles and distributed to workers. Before a worker calls process() on the first element of its bundle, it calls this method.

process(element, window=WindowParam, **side_input_kwargs)

Method to use for processing elements.

This is invoked by DoFnRunner for each element of a input PCollection.

The following parameters can be used as default values on process arguments to indicate that a DoFn accepts the corresponding parameters. For example, a DoFn might accept the element and its timestamp with the following signature:

def process(element=DoFn.ElementParam, timestamp=DoFn.TimestampParam):
  ...

The full set of parameters is:

  • DoFn.ElementParam: element to be processed, should not be mutated.

  • DoFn.SideInputParam: a side input that may be used when processing.

  • DoFn.TimestampParam: timestamp of the input element.

  • DoFn.WindowParam: Window the input element belongs to.

  • DoFn.TimerParam: a userstate.RuntimeTimer object defined by the spec of the parameter.

  • DoFn.StateParam: a userstate.RuntimeState object defined by the spec of the parameter.

  • DoFn.KeyParam: key associated with the element.

  • DoFn.RestrictionParam: an iobase.RestrictionTracker will be provided here to allow treatment as a Splittable DoFn. The restriction tracker will be derived from the restriction provider in the parameter.

  • DoFn.WatermarkEstimatorParam: a function that can be used to track output watermark of Splittable DoFn implementations.

  • DoFn.BundleContextParam: allows a shared context manager to be used per bundle

  • DoFn.SetupContextParam: allows a shared context manager to be used per DoFn

Args:

element: The element to be processed *args: side inputs **kwargs: other keyword arguments.

Returns:

An Iterable of output elements or None.

finish_bundle()

Called after a bundle of elements is processed on a worker.

teardown()

Called to use to clean up this instance before it is discarded.

A runner will do its best to call this method on any given instance to prevent leaks of transient resources, however, there may be situations where this is impossible (e.g. process crash, hardware failure, etc.) or unnecessary (e.g. the pipeline is shutting down and the process is about to be killed anyway, so all transient resources will be released automatically by the OS). In these cases, the call may not happen. It will also not be retried, because in such situations the DoFn instance no longer exists, so there’s no instance to retry it on.

Thus, all work that depends on input elements, and all externally important side effects, must be performed in DoFn.process or DoFn.finish_bundle.

class schrodinger.application.transforms.combiners.FinishCombine(combine_fn, args, kwargs)

Bases: DoFn

Merges partially combined results.

__init__(combine_fn, args, kwargs)
setup()

Called to prepare an instance for processing bundles of elements.

This is a good place to initialize transient in-memory resources, such as network connections. The resources can then be disposed in DoFn.teardown.

process(element, window=WindowParam, **side_input_kwargs)

Method to use for processing elements.

This is invoked by DoFnRunner for each element of a input PCollection.

The following parameters can be used as default values on process arguments to indicate that a DoFn accepts the corresponding parameters. For example, a DoFn might accept the element and its timestamp with the following signature:

def process(element=DoFn.ElementParam, timestamp=DoFn.TimestampParam):
  ...

The full set of parameters is:

  • DoFn.ElementParam: element to be processed, should not be mutated.

  • DoFn.SideInputParam: a side input that may be used when processing.

  • DoFn.TimestampParam: timestamp of the input element.

  • DoFn.WindowParam: Window the input element belongs to.

  • DoFn.TimerParam: a userstate.RuntimeTimer object defined by the spec of the parameter.

  • DoFn.StateParam: a userstate.RuntimeState object defined by the spec of the parameter.

  • DoFn.KeyParam: key associated with the element.

  • DoFn.RestrictionParam: an iobase.RestrictionTracker will be provided here to allow treatment as a Splittable DoFn. The restriction tracker will be derived from the restriction provider in the parameter.

  • DoFn.WatermarkEstimatorParam: a function that can be used to track output watermark of Splittable DoFn implementations.

  • DoFn.BundleContextParam: allows a shared context manager to be used per bundle

  • DoFn.SetupContextParam: allows a shared context manager to be used per DoFn

Args:

element: The element to be processed *args: side inputs **kwargs: other keyword arguments.

Returns:

An Iterable of output elements or None.

teardown()

Called to use to clean up this instance before it is discarded.

A runner will do its best to call this method on any given instance to prevent leaks of transient resources, however, there may be situations where this is impossible (e.g. process crash, hardware failure, etc.) or unnecessary (e.g. the pipeline is shutting down and the process is about to be killed anyway, so all transient resources will be released automatically by the OS). In these cases, the call may not happen. It will also not be retried, because in such situations the DoFn instance no longer exists, so there’s no instance to retry it on.

Thus, all work that depends on input elements, and all externally important side effects, must be performed in DoFn.process or DoFn.finish_bundle.