schrodinger.seam.testing.stu_tests.streamed_bundles_trial_deadline module¶
Test that streamed bundles correctly fall back to a distributed fleet when the trial worker’s deadline is exceeded.
When SCHRODINGER_USE_STREAMED_BUNDLES is enabled, the trial worker streams elements into a bundle. If the transform takes longer than the trial deadline, the bundle should be cancelled and the executor should fall back to running a distributed fleet — just as it does in buffered (non-streaming) mode.
This test creates a pipeline that batches 8 elements into a single batch (via BatchElements) and then processes the batch with a slow map that sleeps 50 seconds per element (400 seconds total per batch). This exceeds the trial worker deadline (~60s + 5min grace period = ~360s), forcing a fallback to distributed workers.
The test verifies that all elements are present in the output — if the streamed bundle deadline silently drops elements, the assertion will fail.
Usage:
SCHRODINGER_USE_STREAMED_BUNDLES=True \
$SCHRODINGER/run python3 -m schrodinger.seam.testing.stu_tests.streamed_bundles_trial_deadline
- class schrodinger.seam.testing.stu_tests.streamed_bundles_trial_deadline.SlowBatchProcessor(*unused_args, **unused_kwargs)¶
Bases:
DoFnProcess a batch by sleeping 10s per element, then yielding each one.
- process(batch)¶
Method to use for processing elements.
This is invoked by
DoFnRunnerfor each element of a inputPCollection.The following parameters can be used as default values on
processarguments to indicate that a DoFn accepts the corresponding parameters. For example, a DoFn might accept the element and its timestamp with the following signature:def process(element=DoFn.ElementParam, timestamp=DoFn.TimestampParam): ...
The full set of parameters is:
DoFn.ElementParam: element to be processed, should not be mutated.DoFn.SideInputParam: a side input that may be used when processing.DoFn.TimestampParam: timestamp of the input element.DoFn.WindowParam:Windowthe input element belongs to.DoFn.TimerParam: auserstate.RuntimeTimerobject defined by the spec of the parameter.DoFn.StateParam: auserstate.RuntimeStateobject defined by the spec of the parameter.DoFn.KeyParam: key associated with the element.DoFn.RestrictionParam: aniobase.RestrictionTrackerwill be provided here to allow treatment as a SplittableDoFn. The restriction tracker will be derived from the restriction provider in the parameter.DoFn.WatermarkEstimatorParam: a function that can be used to track output watermark of SplittableDoFnimplementations.DoFn.BundleContextParam: allows a shared context manager to be used per bundleDoFn.SetupContextParam: allows a shared context manager to be used per DoFn
- schrodinger.seam.testing.stu_tests.streamed_bundles_trial_deadline.main()¶