schrodinger.seam.transforms.outcomes module

Outcome handling for Seam transforms.

This module provides OutcomeParDo, a drop-in replacement for beam.ParDo that automatically routes outcomes to the appropriate output collections.

[inputs] → OutcomeParDo → [main]    (successful outputs)
                        → [dropped] (intentional drops + errors)

dropped = PCollection[DroppedRecord]

class schrodinger.seam.transforms.outcomes.DroppedRecord(input: Any, drop_reason: str, failure_type: str | None = None, traceback: str | None = None, transform_type: str | None = None, transform_label: str | None = None)

Bases: object

A dropped element with context.

Used for both intentional drops (e.g., filter criteria not met) and errors (e.g., exceptions during processing).

Usage in DoFn:

def process(self, element):
    if element.mw > 500:
        yield DroppedRecord(
            input=element,
            drop_reason=f"MW {element.mw} exceeds 500"
        )
        return
    yield element

input: Any
drop_reason: str
failure_type: str | None = None
traceback: str | None = None
transform_type: str | None = None
transform_label: str | None = None
__init__(input: Any, drop_reason: str, failure_type: str | None = None, traceback: str | None = None, transform_type: str | None = None, transform_label: str | None = None) → None

class schrodinger.seam.transforms.outcomes.ErrorHandling(*, exc_class: type[Exception] | tuple[type[Exception], ...] = <class 'Exception'>)

Bases: BaseModel

Configuration for error handling in OutcomeParDo.

When provided to OutcomeParDo, exceptions matching exc_class will be caught and routed to the dropped output. Without error_handling, exceptions propagate and crash the pipeline.
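The matching rule can be illustrated in plain Python. This is a minimal sketch that assumes exc_class is matched with isinstance semantics, the same as an except clause; the source does not state the mechanism, and the helper is_caught is hypothetical.

```python
def is_caught(exc: BaseException, exc_class) -> bool:
    """Hypothetical helper: assumes isinstance-style matching, as in `except exc_class:`."""
    return isinstance(exc, exc_class)


# The default exc_class=Exception matches any ordinary exception.
print(is_caught(ValueError("bad value"), Exception))  # True

# A single class matches itself and its subclasses only.
print(is_caught(KeyError("missing"), ValueError))  # False

# A tuple matches when any member matches.
print(is_caught(KeyError("missing"), (ValueError, KeyError)))  # True
```

Under this assumption, an exception that does not match exc_class would propagate as if no error handling were configured.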

exc_class: type[Exception] | tuple[type[Exception], ...]
model_config: ClassVar[ConfigDict] = {'arbitrary_types_allowed': True, 'frozen': True}

Configuration for the model; should be a dictionary conforming to pydantic.config.ConfigDict.

class schrodinger.seam.transforms.outcomes.OutcomePCollections(main: PCollection, dropped: apache_beam.pvalue.PCollection | None = None)

Bases: object

Result of a transform with main and dropped outputs.

Usage:

result = inputs | OutcomeParDo(MyDoFn())

# Chain to next transform (uses .main implicitly)
result | NextTransform()

# Access outputs explicitly
good = result.main
bad = result.dropped

__init__(main: PCollection, dropped: apache_beam.pvalue.PCollection | None = None)
property main: PCollection

The successful outputs.
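Chaining the result object is documented as using .main implicitly. One way such behavior could be modeled is by delegating the pipe operator to main. The sketch below uses plain lists instead of PCollections; the class Outcomes and the function double_all are hypothetical stand-ins, not the Seam implementation.

```python
class Outcomes:
    """Hypothetical stand-in for OutcomePCollections, holding lists instead of PCollections."""

    def __init__(self, main, dropped=None):
        self.main = main
        self.dropped = dropped

    def __or__(self, transform):
        # Assumption: piping the result object is equivalent to piping .main.
        return transform(self.main)


def double_all(values):
    """Hypothetical downstream step standing in for NextTransform()."""
    return [2 * v for v in values]


result = Outcomes(main=[1, 2, 3], dropped=["rejected"])
chained = result | double_all          # implicit use of .main
explicit = double_all(result.main)     # explicit use of .main
print(chained == explicit)  # True
```

In this model the dropped output is never forwarded by chaining, so it has to be read from result.dropped explicitly.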

property dropped: PCollection

The dropped elements (PCollection[DroppedRecord]).

Includes intentional drops, caught errors, and inputs that produced no output.
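Because one collection carries every kind of drop, downstream code may want to separate them. The sketch below assumes failure_type is None unless the record came from an exception; the field's default suggests this, but the source does not state it. Record is a hypothetical stand-in with two of the documented DroppedRecord fields, and the sample values are illustrative only.

```python
from collections import namedtuple

# Hypothetical stand-in carrying two of the documented DroppedRecord fields.
Record = namedtuple("Record", ["drop_reason", "failure_type"])

dropped = [
    Record("MW 612.3 exceeds 500", None),
    Record("invalid literal", "ValueError"),
    Record("MW 540.0 exceeds 500", None),
]

# Assumption: failure_type is None unless the record came from an exception.
errors = [r for r in dropped if r.failure_type is not None]
intentional = [r for r in dropped if r.failure_type is None]

print(len(errors), len(intentional))  # 1 2
```

In a pipeline, the same predicate could be applied to result.dropped with beam.Filter.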

class schrodinger.seam.transforms.outcomes.OutcomeParDo(dofn: DoFn, error_handling: ErrorHandling | None = None, transform_type: str = '')

Bases: PTransform

Drop-in replacement for beam.ParDo with automatic outcome routing.

Produces two outputs:

- main: Successfully processed elements
- dropped: Elements that were dropped (intentional, error, or no output)

DoFns can yield DroppedRecord directly to indicate intentional rejection:

class MyDoFn(beam.DoFn):
    def process(self, element: Structure) -> Iterable[Structure | DroppedRecord]:
        if element.mw > 500:
            yield DroppedRecord(
                input=element,
                drop_reason=f"MW {element.mw} exceeds 500"
            )
            return
        yield element

OutcomeParDo automatically creates DroppedRecords for:

- Exceptions (when error_handling is configured)
- Inputs that produce no output
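The routing rules above can be modeled in plain Python without Beam. This is a sketch under stated assumptions, not the Seam implementation: the DroppedRecord stand-in mirrors only some documented fields, the function route is hypothetical, and the "no output" reason string, the failure_type value, and the handling of outputs yielded before an exception are all guesses.

```python
import traceback as tb
from dataclasses import dataclass
from typing import Any, Optional


@dataclass
class DroppedRecord:
    """Stand-in mirroring a subset of the documented fields; not the Seam class."""
    input: Any
    drop_reason: str
    failure_type: Optional[str] = None
    traceback: Optional[str] = None


def route(process, elements, exc_class=None):
    """Hypothetical model of the documented routing; returns (main, dropped)."""
    main, dropped = [], []
    for element in elements:
        produced_any = False
        try:
            for out in process(element):
                produced_any = True
                # Yielded DroppedRecords are intentional drops; everything else is main.
                (dropped if isinstance(out, DroppedRecord) else main).append(out)
        except Exception as exc:
            if exc_class is None or not isinstance(exc, exc_class):
                raise  # no matching error handling: the exception propagates
            # Assumption: outputs yielded before the exception are kept in this model.
            dropped.append(
                DroppedRecord(element, str(exc), type(exc).__name__, tb.format_exc())
            )
            continue
        if not produced_any:
            dropped.append(DroppedRecord(element, "no output"))
    return main, dropped


def process(x):
    if x < 0:
        raise ValueError("negative")
    if x == 0:
        return  # produces no output
    if x > 500:
        yield DroppedRecord(x, f"value {x} exceeds 500")
        return
    yield x


main, dropped = route(process, [1, 0, 600, -1], exc_class=ValueError)
print(main)                              # [1]
print([d.drop_reason for d in dropped])  # ['no output', 'value 600 exceeds 500', 'negative']
```

Calling route(process, [-1]) without exc_class re-raises the ValueError, which corresponds to the documented behavior when error_handling is not configured.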

Usage:

# Errors propagate (crash pipeline)
result = inputs | OutcomeParDo(MyDoFn())

# Catch all exceptions -> route to dropped
result = inputs | OutcomeParDo(MyDoFn(), error_handling=ErrorHandling())

# Catch only specific exceptions
result = inputs | OutcomeParDo(
    MyDoFn(),
    error_handling=ErrorHandling(exc_class=ValueError)
)

__init__(dofn: DoFn, error_handling: ErrorHandling | None = None, transform_type: str = '')
expand(pcoll: PCollection) → OutcomePCollections