schrodinger.seam.testing.stu_tests.streamed_bundles_trial_deadline module

Test that streamed bundles correctly fall back to a distributed fleet when the trial worker’s deadline is exceeded.

When SCHRODINGER_USE_STREAMED_BUNDLES is enabled, the trial worker streams elements into a bundle. If the transform takes longer than the trial deadline, the bundle should be cancelled and the executor should fall back to running a distributed fleet — just as it does in buffered (non-streaming) mode.
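The fallback behaviour described above can be illustrated with a small, self-contained sketch. This is not Seam's executor. `run_streamed_trial`, its parameters, and the thread-plus-event mechanism are hypothetical stand-ins for the trial worker, the streamed bundle, and the distributed fleet. The property that matters is this: when the deadline expires, the partially streamed bundle is discarded and every input element goes to the fallback, so nothing is dropped or duplicated.

```python
import threading
from typing import Callable, Iterable, List, TypeVar

T = TypeVar("T")


def run_streamed_trial(
    elements: List[T],
    transform: Callable[[T], Iterable[T]],
    deadline_s: float,
    fallback: Callable[[List[T]], List[T]],
) -> List[T]:
    """Hypothetical sketch: stream a trial bundle, fall back on deadline."""
    bundle: List[T] = []           # outputs streamed by the trial worker
    cancelled = threading.Event()  # cooperative cancellation flag

    def trial_worker() -> None:
        for element in elements:
            if cancelled.is_set():
                return  # stop streaming once the bundle is cancelled
            bundle.extend(transform(element))

    worker = threading.Thread(target=trial_worker, daemon=True)
    worker.start()
    worker.join(timeout=deadline_s)

    if worker.is_alive():
        # Deadline exceeded: cancel the bundle and wait for the worker to
        # notice. Its partial output is discarded (no duplicates), and the
        # fallback receives *every* input element (nothing dropped).
        cancelled.set()
        worker.join()
        return fallback(elements)
    return bundle
```

A thread cannot be killed from outside in Python, so the sketch uses a cooperative flag that is checked between elements. A real executor would more likely cancel a remote worker, but the contract for the output is the same.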

This test creates a pipeline that batches 8 elements into a single batch (via BatchElements) and then processes the batch with a slow map that sleeps 50 seconds per element (400 seconds total per batch). This exceeds the trial worker deadline (~60s + 5min grace period = ~360s), forcing a fallback to distributed workers.
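The timing argument can be checked with a few lines of arithmetic. The constants come from the paragraph above. The function and constant names are hypothetical, and the sketch ignores scheduling overhead. Because BatchElements turns the 8 inputs into a single batch, the whole cost is paid inside one `process()` call on the trial worker.

```python
# Constants from the description above; the names are hypothetical.
ELEMENTS_PER_BATCH = 8
SLEEP_PER_ELEMENT_S = 50
TRIAL_DEADLINE_S = 60
GRACE_PERIOD_S = 5 * 60


def batch_duration_s(n_elements: int, sleep_per_element_s: float) -> float:
    """Approximate wall-clock time for one batch (sleeps only)."""
    return n_elements * sleep_per_element_s


def effective_deadline_s(deadline_s: float, grace_s: float) -> float:
    """Approximate time the trial worker gets before its bundle is cancelled."""
    return deadline_s + grace_s


def forces_fallback(n_elements: int, sleep_per_element_s: float,
                    deadline_s: float, grace_s: float) -> bool:
    """True if a single batch outlives the trial worker's deadline."""
    return (batch_duration_s(n_elements, sleep_per_element_s)
            > effective_deadline_s(deadline_s, grace_s))


print(batch_duration_s(ELEMENTS_PER_BATCH, SLEEP_PER_ELEMENT_S))  # 400
print(effective_deadline_s(TRIAL_DEADLINE_S, GRACE_PERIOD_S))     # 360
```

The same helper shows why the per-element sleep must be large. At 10 s per element, `forces_fallback(8, 10, 60, 300)` is False, because a batch finishes in about 80 s, well inside the trial deadline.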

The test verifies that all elements are present in the output — if the streamed bundle deadline silently drops elements, the assertion will fail.
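The completeness check amounts to a multiset comparison between the inputs and the pipeline output. Below is a minimal sketch using a hypothetical `diff_outputs` helper. It also reports unexpected extra copies, which would reveal a partially streamed bundle being kept alongside the fallback output. The description above only mentions missing elements, so that second check is an addition.

```python
from collections import Counter
from typing import Hashable, Iterable, List, Tuple


def diff_outputs(
    expected: Iterable[Hashable], actual: Iterable[Hashable]
) -> Tuple[List[Hashable], List[Hashable]]:
    """Return (missing, extra) elements, respecting multiplicity."""
    expected_counts = Counter(expected)
    actual_counts = Counter(actual)
    missing = expected_counts - actual_counts  # dropped by the pipeline
    extra = actual_counts - expected_counts    # e.g. duplicated outputs
    return list(missing.elements()), list(extra.elements())
```

Output order is ignored, which matches the fact that a distributed fallback gives no ordering guarantee.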

Usage:

SCHRODINGER_USE_STREAMED_BUNDLES=True \
    $SCHRODINGER/run python3 -m schrodinger.seam.testing.stu_tests.streamed_bundles_trial_deadline
class schrodinger.seam.testing.stu_tests.streamed_bundles_trial_deadline.SlowBatchProcessor(*unused_args, **unused_kwargs)

Bases: DoFn

Process a batch by sleeping 50s per element, then yielding each one.
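The following is a minimal re-creation of this DoFn, for illustration only. `SlowBatchProcessorSketch`, its `seconds_per_element` and `sleep` parameters, and the choice to interleave sleeping and yielding are assumptions, not the module's actual code. The sleep function is injectable so the class can be exercised without waiting. The `try`/`except` substitutes a bare stand-in base class when Apache Beam is not installed.

```python
import time
from typing import Callable, Iterator, List, TypeVar

try:
    from apache_beam import DoFn  # real base class when Beam is installed
except ImportError:
    class DoFn:  # minimal stand-in; only process() is exercised here
        pass

T = TypeVar("T")


class SlowBatchProcessorSketch(DoFn):
    """Hypothetical re-creation: sleep per element, yielding as it goes."""

    def __init__(self, seconds_per_element: float = 50.0,
                 sleep: Callable[[float], None] = time.sleep):
        super().__init__()
        self._seconds_per_element = seconds_per_element
        self._sleep = sleep  # injectable so tests need not actually wait

    def process(self, batch: List[T]) -> Iterator[T]:
        # Each element costs `seconds_per_element` before it is emitted,
        # so with streaming some outputs leave the worker before the
        # batch (and possibly the trial deadline) is finished.
        for element in batch:
            self._sleep(self._seconds_per_element)
            yield element
```

With Apache Beam's `BatchElements`, the sketch would be applied after batching, for example `pcoll | beam.BatchElements(min_batch_size=8, max_batch_size=8) | beam.ParDo(SlowBatchProcessorSketch())`.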

process(batch)

Method to use for processing elements.

This is invoked by DoFnRunner for each element of an input PCollection.

The following parameters can be used as default values on process arguments to indicate that a DoFn accepts the corresponding parameters. For example, a DoFn might accept the element and its timestamp with the following signature:

def process(element=DoFn.ElementParam, timestamp=DoFn.TimestampParam):
  ...

The full set of parameters is:

  • DoFn.ElementParam: element to be processed, should not be mutated.

  • DoFn.SideInputParam: a side input that may be used when processing.

  • DoFn.TimestampParam: timestamp of the input element.

  • DoFn.WindowParam: window the input element belongs to.

  • DoFn.TimerParam: a userstate.RuntimeTimer object defined by the spec of the parameter.

  • DoFn.StateParam: a userstate.RuntimeState object defined by the spec of the parameter.

  • DoFn.KeyParam: key associated with the element.

  • DoFn.RestrictionParam: an iobase.RestrictionTracker will be provided here to allow treatment as a Splittable DoFn. The restriction tracker will be derived from the restriction provider in the parameter.

  • DoFn.WatermarkEstimatorParam: a function that can be used to track output watermark of Splittable DoFn implementations.

  • DoFn.BundleContextParam: allows a shared context manager to be used per bundle.

  • DoFn.SetupContextParam: allows a shared context manager to be used per DoFn.

Args:

element: The element to be processed.
*args: Side inputs.
**kwargs: Other keyword arguments.

Returns:

An Iterable of output elements or None.

schrodinger.seam.testing.stu_tests.streamed_bundles_trial_deadline.main()