schrodinger.seam.transforms.outcomes module
Outcome handling for Seam transforms.
This module provides OutcomeParDo, a drop-in replacement for beam.ParDo that automatically routes outcomes to the appropriate output collections.
    [inputs] → OutcomeParDo → [main]     (successful outputs)
                            → [dropped]  (intentional drops, errors, no-output inputs)

    dropped = PCollection[DroppedRecord]
- class schrodinger.seam.transforms.outcomes.DroppedRecord(input: Any, drop_reason: str, failure_type: str | None = None, traceback: str | None = None, transform_type: str | None = None, transform_label: str | None = None)
Bases: object

A dropped element with context.
Used for both intentional drops (e.g., filter criteria not met) and errors (e.g., exceptions during processing).
Usage in DoFn:
    def process(self, element):
        if element.mw > 500:
            yield DroppedRecord(
                input=element,
                drop_reason=f"MW {element.mw} exceeds 500"
            )
            return
        yield element
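Because process is an ordinary generator method, its routing decision can be checked without building a pipeline. The sketch below is a pure-Python approximation, assuming nothing beyond standard Python: DroppedRecord is a hypothetical local stand-in carrying only the two required fields, and Mol is a hypothetical element type exposing mw. Neither is the schrodinger class.

```python
from dataclasses import dataclass
from typing import Any, Iterator


# Hypothetical stand-in for schrodinger's DroppedRecord; only the two
# required fields are mirrored here.
@dataclass
class DroppedRecord:
    input: Any
    drop_reason: str


# Hypothetical toy element exposing an ``mw`` attribute, like the example.
@dataclass
class Mol:
    name: str
    mw: float


def process(element: Mol) -> Iterator[Any]:
    """Same logic as the DoFn.process example, written as a free function."""
    if element.mw > 500:
        yield DroppedRecord(
            input=element,
            drop_reason=f"MW {element.mw} exceeds 500",
        )
        return  # stop so the element is not also yielded as a success
    yield element


heavy = list(process(Mol("heavy", 612.0)))
light = list(process(Mol("light", 180.2)))
print(heavy)
print(light)
```

The bare return after yielding the DroppedRecord is what keeps an over-weight element from also being emitted as a successful output.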
- input: Any
- drop_reason: str
- failure_type: str | None = None
- traceback: str | None = None
- transform_type: str | None = None
- transform_label: str | None = None
- __init__(input: Any, drop_reason: str, failure_type: str | None = None, traceback: str | None = None, transform_type: str | None = None, transform_label: str | None = None) → None
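The constructor takes two required fields and four optional ones. The sketch below mirrors that signature as a local dataclass (an assumption; the real class may be implemented differently) and shows one plausible way to fill the optional fields from a caught exception. The helper record_from_exception, the label "ParseDoFn", and the mapping of exception class name to failure_type and formatted traceback text to traceback are all hypothetical, not documented behavior.

```python
from __future__ import annotations

import traceback as tb
from dataclasses import dataclass
from typing import Any


# Hypothetical local mirror of the documented DroppedRecord signature.
@dataclass
class DroppedRecord:
    input: Any
    drop_reason: str
    failure_type: str | None = None
    traceback: str | None = None
    transform_type: str | None = None
    transform_label: str | None = None


def record_from_exception(
    element: Any, exc: BaseException, transform_type: str
) -> DroppedRecord:
    """Hypothetical helper: package a caught exception as a DroppedRecord.

    Assumed mapping: message -> drop_reason, class name -> failure_type,
    formatted traceback -> traceback.
    """
    return DroppedRecord(
        input=element,
        drop_reason=str(exc),
        failure_type=type(exc).__name__,
        traceback="".join(tb.format_exception(type(exc), exc, exc.__traceback__)),
        transform_type=transform_type,
    )


raw = "not-a-number"
try:
    int(raw)
except ValueError as exc:
    rec = record_from_exception(raw, exc, transform_type="ParseDoFn")

print(rec.failure_type)  # ValueError
```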
- class schrodinger.seam.transforms.outcomes.ErrorHandling(*, exc_class: type[Exception] | tuple[type[Exception], ...] = Exception)
Bases: BaseModel

Configuration for error handling in OutcomeParDo.
When provided to OutcomeParDo, exceptions matching exc_class will be caught and routed to the dropped output. Without error_handling, exceptions propagate and crash the pipeline.
- exc_class: type[Exception] | tuple[type[Exception], ...]
- model_config: ClassVar[ConfigDict] = {'arbitrary_types_allowed': True, 'frozen': True}
Configuration for the model; should be a dictionary conforming to pydantic.config.ConfigDict.
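Since exc_class accepts either a single exception type or a tuple of types, it has the same shape as the target of a Python except clause. The sketch below uses a frozen dataclass as a hypothetical stand-in for the frozen pydantic model, plus a hypothetical helper call_with_handling, to illustrate the documented contract: without ErrorHandling exceptions propagate, and with it matching exceptions are diverted to the dropped side. Treating "matching" as isinstance-style matching, and letting non-matching exceptions propagate, are assumptions.

```python
from __future__ import annotations

from dataclasses import dataclass
from typing import Any, Callable


# Hypothetical stand-in for the pydantic ErrorHandling model: immutable,
# with a single field defaulting to Exception.
@dataclass(frozen=True)
class ErrorHandling:
    exc_class: type[Exception] | tuple[type[Exception], ...] = Exception


def call_with_handling(
    fn: Callable[[Any], Any],
    arg: Any,
    error_handling: ErrorHandling | None,
) -> tuple[str, Any]:
    """Hypothetical helper: return ("ok", value) or ("dropped", exc)."""
    if error_handling is None:
        # No error handling configured: any exception propagates.
        return ("ok", fn(arg))
    try:
        return ("ok", fn(arg))
    except error_handling.exc_class as exc:
        # exc_class may be one type or a tuple of types, like an except clause.
        # Assumption: exceptions that do not match are not caught.
        return ("dropped", exc)


only_value_errors = ErrorHandling(exc_class=ValueError)
print(call_with_handling(int, "12", only_value_errors))  # ('ok', 12)
print(call_with_handling(int, "x", only_value_errors)[0])  # dropped
```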
- class schrodinger.seam.transforms.outcomes.OutcomePCollections(main: PCollection, dropped: apache_beam.pvalue.PCollection | None = None)
Bases: object

Result of a transform with main and dropped outputs.
Usage:
    result = inputs | OutcomeParDo(MyDoFn())

    # Chain to next transform (uses .main implicitly)
    result | NextTransform()

    # Access outputs explicitly
    good = result.main
    bad = result.dropped
- __init__(main: PCollection, dropped: apache_beam.pvalue.PCollection | None = None)
- property main: PCollection
The successful outputs.
- property dropped: PCollection
The dropped elements (PCollection[DroppedRecord]).
Includes intentional drops, caught errors, and inputs that produced no output.
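The usage above states that piping an OutcomePCollections into another transform uses .main implicitly, but not how. One plausible mechanism is forwarding the | operator to main. The toy below illustrates that idea with hypothetical, list-backed ToyPCollection and ToyOutcomePCollections classes and no Beam dependency; a "transform" here is just a callable over a list, and the library may use a different hook entirely.

```python
from dataclasses import dataclass
from typing import Callable, List


@dataclass
class ToyPCollection:
    """Hypothetical list-backed stand-in for a Beam PCollection."""

    items: List

    def __or__(self, transform: Callable[[List], List]) -> "ToyPCollection":
        # "Applying" a transform just calls it on the whole list.
        return ToyPCollection(transform(self.items))


@dataclass
class ToyOutcomePCollections:
    """Hypothetical result holder whose | operator forwards to .main."""

    main: ToyPCollection
    dropped: ToyPCollection

    def __or__(self, transform: Callable[[List], List]) -> ToyPCollection:
        # `result | transform` is treated as `result.main | transform`.
        return self.main | transform


result = ToyOutcomePCollections(
    main=ToyPCollection([1, 2, 3]),
    dropped=ToyPCollection(["bad-input"]),
)
doubled = result | (lambda xs: [x * 2 for x in xs])
print(doubled.items)  # [2, 4, 6]
print(result.dropped.items)  # ['bad-input']
```

Forwarding only the operator keeps dropped reachable by attribute while letting the common case (continue with the successes) read like plain Beam chaining.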
- class schrodinger.seam.transforms.outcomes.OutcomeParDo(dofn: DoFn, error_handling: ErrorHandling | None = None, transform_type: str = '')
Bases: PTransform

Drop-in replacement for beam.ParDo with automatic outcome routing.
Produces two outputs:
- main: successfully processed elements
- dropped: elements that were dropped (intentional drop, error, or no output)
DoFns can yield DroppedRecord directly to indicate intentional rejection:
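    from typing import Iterable

    import apache_beam as beam
    from schrodinger.seam.transforms.outcomes import DroppedRecord
    from schrodinger.structure import Structure

    class MyDoFn(beam.DoFn):
        def process(self, element: Structure) -> Iterable[Structure | DroppedRecord]:
            if element.mw > 500:
                yield DroppedRecord(
                    input=element,
                    drop_reason=f"MW {element.mw} exceeds 500"
                )
                return
            yield element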
OutcomeParDo automatically creates DroppedRecords for:
- exceptions matching error_handling.exc_class (only when error_handling is configured)
- inputs that produce no output
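The routing rules above (yielded DroppedRecords, caught exceptions, and inputs with no output go to dropped; everything else goes to main) can be simulated in pure Python. This is a sketch, not the library's implementation: the function route_outcomes, its exc_class=None convention for "no error_handling", the "no output" reason string, and the trimmed DroppedRecord stand-in are all hypothetical, and the real OutcomeParDo runs inside Beam and may differ in details.

```python
from __future__ import annotations

from dataclasses import dataclass
from typing import Any, Callable, Iterable, Iterator


@dataclass
class DroppedRecord:
    """Hypothetical stand-in carrying a subset of the documented fields."""

    input: Any
    drop_reason: str
    failure_type: str | None = None


def route_outcomes(
    process: Callable[[Any], Iterable[Any]],
    inputs: Iterable[Any],
    exc_class: type[Exception] | tuple[type[Exception], ...] | None = None,
) -> tuple[list, list]:
    """Simulate the documented routing; exc_class=None means no error handling."""
    main: list = []
    dropped: list = []
    for element in inputs:
        try:
            # Simplification: drain eagerly, so outputs yielded before an
            # exception are discarded (real behavior there is undocumented).
            outputs = list(process(element))
        except Exception as exc:
            if exc_class is None or not isinstance(exc, exc_class):
                raise  # unhandled: propagate, as without error_handling
            dropped.append(DroppedRecord(element, str(exc), type(exc).__name__))
            continue
        if not outputs:
            # Input produced nothing; the "no output" reason is made up here.
            dropped.append(DroppedRecord(element, "no output"))
            continue
        for out in outputs:
            # Explicit DroppedRecords go to dropped, everything else to main.
            (dropped if isinstance(out, DroppedRecord) else main).append(out)
    return main, dropped


def process(x: int) -> Iterator[Any]:
    if x < 0:
        yield DroppedRecord(x, "negative")  # intentional drop
        return
    if x == 0:
        return  # yields nothing
    yield 10 // (x - 1)  # x == 1 raises ZeroDivisionError


main, dropped = route_outcomes(process, [-1, 0, 1, 3], exc_class=ZeroDivisionError)
print(main)  # [5]
print([d.input for d in dropped])  # [-1, 0, 1]
```

Calling route_outcomes(process, [1]) without exc_class re-raises the ZeroDivisionError, which corresponds to running OutcomeParDo without error_handling.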
Usage:
    from schrodinger.seam.transforms.outcomes import ErrorHandling, OutcomeParDo

    # Errors propagate (crash pipeline)
    result = inputs | OutcomeParDo(MyDoFn())

    # Catch all exceptions -> route to dropped
    result = inputs | OutcomeParDo(MyDoFn(), error_handling=ErrorHandling())

    # Catch only specific exceptions
    result = inputs | OutcomeParDo(
        MyDoFn(), error_handling=ErrorHandling(exc_class=ValueError)
    )
- __init__(dofn: DoFn, error_handling: ErrorHandling | None = None, transform_type: str = '')
- expand(pcoll: PCollection) → OutcomePCollections