schrodinger.seam.examples.haystack module

A pedagogical example workflow to demonstrate how logging and works with seam.

The workflow creates a collection of strings, most of which are “haystalk” and a few of which are “needle”. The workflow then processes each item in the collection, sleeping for a fixed amount of time for each “haystalk”/”needle” and aggregating them into lists.

To get a sense of how logging works, run this example and read the README found in the generated “seam/logs” directory.

Basic usage (est. walltime: 1m)

$SCHRODINGER/run haystack

Parallelized usage with jobserver (est. walltime: 5m)

$SCHRODINGER/run haystack –per-stalk-sleep-time 0.001 -HOST localhost:8

class schrodinger.seam.examples.haystack.Bale(sleep_per_item: float, error_on_needle: bool)

Bases: schrodinger.seam.examples.haystack.Bale

A DoFn that aggregates elements into lists and sleeps for a fixed amount of time per element.

__init__(sleep_per_item: float, error_on_needle: bool)

Called before a bundle of elements is processed on a worker.

Elements to be processed are split into bundles and distributed to workers. Before a worker calls process() on the first element of its bundle, it calls this method.

process(element: str)

Method to use for processing elements.

This is invoked by DoFnRunner for each element of a input PCollection.

The following parameters can be used as default values on process arguments to indicate that a DoFn accepts the corresponding parameters. For example, a DoFn might accept the element and its timestamp with the following signature:

def process(element=DoFn.ElementParam, timestamp=DoFn.TimestampParam):

The full set of parameters is:

  • DoFn.ElementParam: element to be processed, should not be mutated.

  • DoFn.SideInputParam: a side input that may be used when processing.

  • DoFn.TimestampParam: timestamp of the input element.

  • DoFn.WindowParam: Window the input element belongs to.

  • DoFn.TimerParam: a userstate.RuntimeTimer object defined by the spec of the parameter.

  • DoFn.StateParam: a userstate.RuntimeState object defined by the spec of the parameter.

  • DoFn.KeyParam: key associated with the element.

  • DoFn.RestrictionParam: an iobase.RestrictionTracker will be provided here to allow treatment as a Splittable DoFn. The restriction tracker will be derived from the restriction provider in the parameter.

  • DoFn.WatermarkEstimatorParam: a function that can be used to track output watermark of Splittable DoFn implementations.


element: The element to be processed *args: side inputs **kwargs: other keyword arguments.


An Iterable of output elements or None.


Called after a bundle of elements is processed on a worker.


alias of apache_beam.transforms.core._BundleFinalizerParam

DoFnProcessParams = [ElementParam, SideInputParam, TimestampParam, WindowParam, <class 'apache_beam.transforms.core._WatermarkEstimatorParam'>, PaneInfoParam, <class 'apache_beam.transforms.core._BundleFinalizerParam'>, KeyParam, <class 'apache_beam.transforms.core._StateDoFnParam'>, <class 'apache_beam.transforms.core._TimerDoFnParam'>]
DynamicTimerTagParam = DynamicTimerTagParam
ElementParam = ElementParam
KeyParam = KeyParam
PaneInfoParam = PaneInfoParam

alias of apache_beam.transforms.core._RestrictionDoFnParam

SideInputParam = SideInputParam

alias of apache_beam.transforms.core._StateDoFnParam


alias of apache_beam.transforms.core._TimerDoFnParam

TimestampParam = TimestampParam

alias of apache_beam.transforms.core._WatermarkEstimatorParam

WindowParam = WindowParam
display_data() dict

Returns the display data associated to a pipeline component.

It should be reimplemented in pipeline components that wish to have static display data.


Dict[str, Any]: A dictionary containing key:value pairs. The value might be an integer, float or string value; a DisplayDataItem for values that have more data (e.g. short value, label, url); or a HasDisplayData instance that has more display data that should be picked up. For example:

  'key1': 'string_value',
  'key2': 1234,
  'key3': 3.14159265,
  'key4': DisplayDataItem('', url=''),
  'key5': subComponent
static from_callable(fn)
classmethod from_runner_api(fn_proto: Type[RunnerApiFnT], context: beam_runner_api_pb2.FunctionSpec) RunnerApiFnT

Converts from an FunctionSpec to a Fn object.

Prefer registering a urn with its parameter type and constructor.

get_input_batch_type(input_element_type) Optional[Union[apache_beam.typehints.typehints.TypeConstraint, type]]

Determine the batch type expected as input to process_batch.

The default implementation of get_input_batch_type simply observes the input typehint for the first parameter of process_batch. A Batched DoFn may override this method if a dynamic approach is required.

input_element_type: The element type of the input PCollection this

DoFn is being applied to.


None if this DoFn cannot accept batches, else a Beam typehint or a native Python typehint.

get_output_batch_type(input_element_type) Optional[Union[apache_beam.typehints.typehints.TypeConstraint, type]]

Determine the batch type produced by this DoFn’s process_batch implementation and/or its process implementation with @yields_batch.

The default implementation of this method observes the return type annotations on process_batch and/or process. A Batched DoFn may override this method if a dynamic approach is required.

input_element_type: The element type of the input PCollection this

DoFn is being applied to.


None if this DoFn will never yield batches, else a Beam typehint or a native Python typehint.


Gets and/or initializes type hints for this object.

If type hints have not been set, attempts to initialize type hints in this order: - Using self.default_type_hints(). - Using self.__class__ type hints.

process_batch(batch, *args, **kwargs)
classmethod register_pickle_urn(pickle_urn)

Registers and implements the given urn via pickling.

classmethod register_urn(urn, parameter_type, fn=None)

Registers a urn with a constructor.

For example, if ‘beam:fn:foo’ had parameter type FooPayload, one could write RunnerApiFn.register_urn('bean:fn:foo', FooPayload, foo_from_proto) where foo_from_proto took as arguments a FooPayload and a PipelineContext. This function can also be used as a decorator rather than passing the callable in as the final parameter.

A corresponding to_runner_api_parameter method would be expected that returns the tuple (‘beam:fn:foo’, FooPayload)


Called to prepare an instance for processing bundles of elements.

This is a good place to initialize transient in-memory resources, such as network connections. The resources can then be disposed in DoFn.teardown.


Called to use to clean up this instance before it is discarded.

A runner will do its best to call this method on any given instance to prevent leaks of transient resources, however, there may be situations where this is impossible (e.g. process crash, hardware failure, etc.) or unnecessary (e.g. the pipeline is shutting down and the process is about to be killed anyway, so all transient resources will be released automatically by the OS). In these cases, the call may not happen. It will also not be retried, because in such situations the DoFn instance no longer exists, so there’s no instance to retry it on.

Thus, all work that depends on input elements, and all externally important side effects, must be performed in DoFn.process or DoFn.finish_bundle.

to_runner_api(context: PipelineContext) beam_runner_api_pb2.FunctionSpec

Returns an FunctionSpec encoding this Fn.

Prefer overriding self.to_runner_api_parameter.


Returns the urn and payload for this Fn.

The returned urn(s) should be registered with register_urn.

static unbounded_per_element()

A decorator on process fn specifying that the fn performs an unbounded amount of work per input element.

with_input_types(*arg_hints: apache_beam.typehints.decorators.WithTypeHintsT, **kwarg_hints: Any) apache_beam.typehints.decorators.WithTypeHintsT
with_output_types(*arg_hints: apache_beam.typehints.decorators.WithTypeHintsT, **kwarg_hints: Any) apache_beam.typehints.decorators.WithTypeHintsT
static yields_batches(fn)

A decorator to apply to process indicating it yields batches.

By default process is assumed to both consume and produce individual elements at a time. This decorator indicates that process produces “batches”, which are collections of multiple logical Beam elements.

static yields_elements(fn)

A decorator to apply to process_batch indicating it yields elements.

By default process_batch is assumed to both consume and produce “batches”, which are collections of multiple logical Beam elements. This decorator indicates that process_batch produces individual elements at a time. process_batch is always expected to consume batches.