schrodinger.seam.io.jobfilesystem module

class schrodinger.seam.io.jobfilesystem.JobFileSystem(pipeline_options)

Bases: FileSystem

An abstraction over files shared between a driver job and worker jobs.

All files written to this filesystem will be made available to all SeamRunner workers.

Example usage:

>>> import apache_beam as beam
>>> from apache_beam.io.filesystems import FileSystems
>>> jobfs_path = 'jobfs://test_file.txt'
>>> with FileSystems.create(jobfs_path) as f:
...     f.write(b'This is a test file.')
>>> with beam.Pipeline(runner=SeamRunner()) as p:
...     (p
...         | 'Read' >> beam.io.ReadFromText(jobfs_path)
...         | beam.LogElements())

Note

Currently, the jobfs:// filesystem only supports writes from the driver job. Writing _to_ the jobfs filesystem from within a transform results in nothing being written.

Warning

The jobfs:// filesystem copies _all_ jobfs:// files to _every_ worker, regardless of whether the transforms that worker executes use them. Copying many large files may result in long worker startup times.

JOBFS_PREFIX = 'jobfs://'
classmethod scheme()

URI scheme for the FileSystem
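Beam's `FileSystems` chooses a filesystem implementation by matching the scheme at the front of a path against each registered filesystem's `scheme()`. The sketch below illustrates that dispatch with the standard library only. `get_scheme` and `handled_by_jobfs` are hypothetical helpers, and the assumption that `scheme()` returns `'jobfs'` is inferred from `JOBFS_PREFIX`, not documented here.

```python
import re

# Hypothetical scheme parser: "scheme://rest" -> "scheme", plain paths -> None.
_SCHEME_RE = re.compile(r"^([a-zA-Z][-a-zA-Z0-9+.]*)://")

JOBFS_PREFIX = "jobfs://"
# Assumption: JobFileSystem.scheme() returns the prefix without "://".
JOBFS_SCHEME = JOBFS_PREFIX[: -len("://")]


def get_scheme(path):
    """Return the URI scheme of path, or None for a plain local path."""
    match = _SCHEME_RE.match(path)
    return match.group(1) if match else None


def handled_by_jobfs(path):
    """True if the path's scheme matches the (assumed) jobfs scheme."""
    return get_scheme(path) == JOBFS_SCHEME
```

With this sketch, `handled_by_jobfs('jobfs://test_file.txt')` is true, while a plain path such as `/tmp/test_file.txt` has no scheme and would fall through to another filesystem.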

open(path, mime_type='application/octet-stream', compression_type='auto') BinaryIO

Open a file for reading.

Note

mime_type and compression_type are not implemented and will be ignored.
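Because `mime_type` and `compression_type` are ignored, opening a jobfs file amounts to resolving it to a local path and opening that path for binary reading. The following is a minimal stdlib sketch, not the real implementation: `open_jobfs` and its `root` parameter are hypothetical, and the `_jobfs` default is borrowed from the `getJobFSFileMap` example.

```python
import os

JOBFS_PREFIX = "jobfs://"


def open_jobfs(path, root="_jobfs",
               mime_type="application/octet-stream", compression_type="auto"):
    """Hypothetical sketch: open a jobfs file for binary reading.

    mime_type and compression_type are accepted but ignored, mirroring the
    note on JobFileSystem.open.
    """
    if not path.startswith(JOBFS_PREFIX):
        raise ValueError(f"not a jobfs path: {path!r}")
    # Map "jobfs://a/b.txt" to "<root>/a/b.txt" (assumed layout).
    local_path = os.path.join(root, path[len(JOBFS_PREFIX):])
    return open(local_path, "rb")
```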

create(path, mime_type='application/octet-stream', compression_type='auto') BinaryIO

Returns a write channel for the given file path.

Args:

  • path – string path of the file object to be written to the system

  • mime_type – MIME type to specify the type of content in the file object

  • compression_type – type of compression to be used for this object

Returns: a writable file handle, which the caller is responsible for closing
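Writing through `create` is the counterpart of `open`: resolve the jobfs path to a local path, make sure its parent directory exists, and hand back a binary write handle. This is a hedged stdlib sketch under the same assumptions as the other examples here (a hypothetical `create_jobfs` helper and a hypothetical `root` directory defaulting to `_jobfs`); it does not reproduce the real method.

```python
import os

JOBFS_PREFIX = "jobfs://"


def create_jobfs(path, root="_jobfs"):
    """Hypothetical sketch: return a binary write handle for a jobfs path.

    Parent directories under root are created as needed. The caller must
    close the handle (or use it in a with-block) so the data is flushed.
    """
    if not path.startswith(JOBFS_PREFIX):
        raise ValueError(f"not a jobfs path: {path!r}")
    local_path = os.path.join(root, path[len(JOBFS_PREFIX):])
    parent = os.path.dirname(local_path)
    if parent:
        os.makedirs(parent, exist_ok=True)
    return open(local_path, "wb")
```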

clear()

Clear the jobfs file map and root directory.

getLocalPath(path: str, create=False) str

Return the local file path for a jobfs path.
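One plausible way to resolve a jobfs path is to consult the jobfs file map first and otherwise place the file under a local root directory. The sketch below assumes exactly that; `get_local_path`, its `file_map` and `root` parameters are hypothetical, and because the behavior of the real method's `create` flag is not documented here, the sketch omits it.

```python
import os

JOBFS_PREFIX = "jobfs://"


def get_local_path(path, file_map, root="_jobfs"):
    """Hypothetical sketch: resolve a jobfs path to a local path.

    Paths already recorded in file_map (jobfs path -> local path) use the
    recorded location; any other path is placed under root.
    """
    if not path.startswith(JOBFS_PREFIX):
        raise ValueError(f"not a jobfs path: {path!r}")
    if path in file_map:
        return file_map[path]
    return os.path.join(root, path[len(JOBFS_PREFIX):])
```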

classmethod getJobFSFileMap() dict[str, str]

Get the jobfs file map.

This method should be used with caution, as it exposes the internal state of the JobFileSystem. It is intended for use with infrastructure code that needs to know where jobfs files are stored.

Maps jobfs paths (e.g. jobfs://test_file.txt) to where they’re stored locally (e.g. _jobfs/test_file.txt).

classmethod setJobFSFileMap(jobfs_file_map: dict[str, str])

Set the jobfs file map.

This method should be used with caution, as it exposes the internal state of the JobFileSystem. It is intended for use with infrastructure code.
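Infrastructure code typically needs to ship this map from the driver to a worker. The sketch below shows one way to do that with a JSON round trip. `JobFSState` is a hypothetical stand-in for the class-level state that `getJobFSFileMap` and `setJobFSFileMap` expose, and returning a copy from the getter is a design choice of this sketch, not documented behavior.

```python
import json


class JobFSState:
    """Hypothetical stand-in for JobFileSystem's class-level file map."""

    _file_map: dict[str, str] = {}

    @classmethod
    def getJobFSFileMap(cls) -> dict[str, str]:
        # Return a copy so callers cannot mutate the stored map by accident.
        return dict(cls._file_map)

    @classmethod
    def setJobFSFileMap(cls, jobfs_file_map: dict[str, str]) -> None:
        cls._file_map = dict(jobfs_file_map)


# Driver side: record where a file lives and serialize the map.
JobFSState.setJobFSFileMap({"jobfs://test_file.txt": "_jobfs/test_file.txt"})
payload = json.dumps(JobFSState.getJobFSFileMap())

# Worker side: restore the map from the serialized payload.
JobFSState.setJobFSFileMap(json.loads(payload))
```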

copy(source_file_names: list[str], destination_file_names: list[str])

Recursively copy files from source to destination paths.

Parameters:
  • source_file_names – list of source jobfs paths to copy

  • destination_file_names – list of destination jobfs paths

Raises:

BeamIOError – if any of the copy operations fail
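Beam filesystems usually attempt every operation in a batch and then raise a single `BeamIOError` whose `exception_details` maps each failed item to its exception. The sketch below applies that pattern to a recursive local copy. `copy_jobfs` and `root` are hypothetical, and a plain `OSError` carrying an `exception_details` attribute stands in for `BeamIOError`.

```python
import os
import shutil

JOBFS_PREFIX = "jobfs://"


def copy_jobfs(source_file_names, destination_file_names, root="_jobfs"):
    """Hypothetical sketch: copy each source to its destination.

    Directories are copied recursively. Every pair is attempted; failures are
    collected and raised together at the end.
    """
    if len(source_file_names) != len(destination_file_names):
        raise ValueError("source and destination lists differ in length")
    errors = {}
    for src, dst in zip(source_file_names, destination_file_names):
        try:
            if not (src.startswith(JOBFS_PREFIX) and dst.startswith(JOBFS_PREFIX)):
                raise ValueError(f"not a jobfs path pair: {src!r} -> {dst!r}")
            src_local = os.path.join(root, src[len(JOBFS_PREFIX):])
            dst_local = os.path.join(root, dst[len(JOBFS_PREFIX):])
            if os.path.isdir(src_local):
                shutil.copytree(src_local, dst_local)
            else:
                os.makedirs(os.path.dirname(dst_local) or ".", exist_ok=True)
                shutil.copy2(src_local, dst_local)
        except Exception as exc:  # record the failure and keep going
            errors[(src, dst)] = exc
    if errors:
        err = OSError(f"copy failed for {len(errors)} pair(s)")
        err.exception_details = errors  # same attribute name BeamIOError uses
        raise err
```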

delete(paths: list[str])

Delete files at the provided jobfs paths.

Parameters:

paths – list of jobfs paths to delete

Raises:

BeamIOError – if any of the delete operations fail
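Deletion follows the same attempt-everything, report-once pattern. In this hedged sketch, `delete_jobfs` and `root` are hypothetical, directories are removed recursively (an assumption; the documentation above only mentions files), and an `OSError` with an `exception_details` attribute stands in for `BeamIOError`.

```python
import os
import shutil

JOBFS_PREFIX = "jobfs://"


def delete_jobfs(paths, root="_jobfs"):
    """Hypothetical sketch: delete files or directory trees at jobfs paths."""
    errors = {}
    for path in paths:
        try:
            if not path.startswith(JOBFS_PREFIX):
                raise ValueError(f"not a jobfs path: {path!r}")
            local_path = os.path.join(root, path[len(JOBFS_PREFIX):])
            if os.path.isdir(local_path):
                shutil.rmtree(local_path)
            else:
                os.remove(local_path)  # FileNotFoundError if missing
        except Exception as exc:  # record the failure and keep going
            errors[path] = exc
    if errors:
        err = OSError(f"delete failed for {len(errors)} path(s)")
        err.exception_details = errors  # same attribute name BeamIOError uses
        raise err
```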

exists(path: str) bool

Check if the provided path exists on the FileSystem.

has_dirs() bool

Whether this FileSystem supports directories.

join(basepath: str, *paths: str) str

Join two or more paths together.
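Joining jobfs paths has to keep the `jobfs://` prefix intact while joining the remaining components with `/`. This is a minimal sketch of one way to do that, not the real method: `join_jobfs` is hypothetical, and stripping leading slashes from later components (so they cannot reset the path to an absolute one) is a choice made for this example.

```python
import posixpath

JOBFS_PREFIX = "jobfs://"


def join_jobfs(basepath, *paths):
    """Hypothetical sketch: join components onto a jobfs path with "/".

    Leading slashes on later components are stripped so they extend the
    path instead of replacing it, as posixpath.join would otherwise do.
    """
    if not basepath.startswith(JOBFS_PREFIX):
        raise ValueError(f"not a jobfs path: {basepath!r}")
    rest = basepath[len(JOBFS_PREFIX):]
    return JOBFS_PREFIX + posixpath.join(rest, *(p.lstrip("/") for p in paths))
```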

last_updated(path: str) float

Get the time the file at path was last updated, in seconds since the UNIX epoch.

Raises: BeamIOError: if path doesn’t exist.
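For files stored on local disk, the last-updated time is naturally the file's modification time. This sketch reads it with `os.path.getmtime`, assuming (as the other examples here do) that a jobfs path maps to a file under a hypothetical `root` directory. `last_updated_jobfs` is hypothetical, and the `FileNotFoundError` it raises stands in for `BeamIOError`.

```python
import os

JOBFS_PREFIX = "jobfs://"


def last_updated_jobfs(path, root="_jobfs"):
    """Hypothetical sketch: modification time of a jobfs file as epoch seconds.

    Raises FileNotFoundError (standing in for BeamIOError) if the file is missing.
    """
    if not path.startswith(JOBFS_PREFIX):
        raise ValueError(f"not a jobfs path: {path!r}")
    local_path = os.path.join(root, path[len(JOBFS_PREFIX):])
    return os.path.getmtime(local_path)
```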

metadata(path: str) FileMetadata

Get metadata for a file in the jobfs filesystem.

Raises: BeamIOError: if path doesn’t exist.
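Beam's `FileMetadata` carries a path, a size in bytes, and a last-updated timestamp. The sketch below fills those three fields from a single `os.stat` call while reporting the jobfs path rather than the local one. `FileMetadataSketch` is a stand-in that only mirrors the field names of Beam's class, and `metadata_jobfs` and `root` are hypothetical.

```python
import os
from typing import NamedTuple

JOBFS_PREFIX = "jobfs://"


class FileMetadataSketch(NamedTuple):
    """Stand-in mirroring the fields of Beam's FileMetadata."""
    path: str
    size_in_bytes: int
    last_updated_in_seconds: float


def metadata_jobfs(path, root="_jobfs"):
    """Hypothetical sketch: metadata for a jobfs file, keyed by its jobfs path."""
    if not path.startswith(JOBFS_PREFIX):
        raise ValueError(f"not a jobfs path: {path!r}")
    local_path = os.path.join(root, path[len(JOBFS_PREFIX):])
    st = os.stat(local_path)  # FileNotFoundError stands in for BeamIOError
    return FileMetadataSketch(path, st.st_size, st.st_mtime)
```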

mkdirs(path)

Recursively create directories for the provided path.

Args:

path: string path of the directory structure that should be created

Raises:

IOError: if leaf directory already exists.
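The documented contract (create intermediate directories, fail if the leaf already exists) matches `os.makedirs` with its default `exist_ok=False`, which raises `FileExistsError`, a subclass of `OSError` (of which `IOError` is an alias in Python 3). In this sketch, `mkdirs_jobfs` and its `root` directory are hypothetical.

```python
import os

JOBFS_PREFIX = "jobfs://"


def mkdirs_jobfs(path, root="_jobfs"):
    """Hypothetical sketch: recursively create the directories for a jobfs path.

    Intermediate directories are created as needed; FileExistsError (an
    IOError/OSError subclass) is raised if the leaf directory already exists.
    """
    if not path.startswith(JOBFS_PREFIX):
        raise ValueError(f"not a jobfs path: {path!r}")
    os.makedirs(os.path.join(root, path[len(JOBFS_PREFIX):]))
```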

rename(source_file_names: list[str], destination_file_names: list[str])

Rename/move files from source to destination paths.

Parameters:
  • source_file_names – list of source jobfs paths to rename

  • destination_file_names – list of destination jobfs paths

Raises:

BeamIOError – if any of the rename operations fail
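Renaming can be sketched with `shutil.move`, which handles both files and directories, using the same collect-then-raise error handling as a Beam batch operation. `rename_jobfs` and `root` are hypothetical, creating missing destination directories is an assumption of this sketch, and an `OSError` carrying `exception_details` stands in for `BeamIOError`.

```python
import os
import shutil

JOBFS_PREFIX = "jobfs://"


def rename_jobfs(source_file_names, destination_file_names, root="_jobfs"):
    """Hypothetical sketch: move each source jobfs path to its destination."""
    if len(source_file_names) != len(destination_file_names):
        raise ValueError("source and destination lists differ in length")
    errors = {}
    for src, dst in zip(source_file_names, destination_file_names):
        try:
            if not (src.startswith(JOBFS_PREFIX) and dst.startswith(JOBFS_PREFIX)):
                raise ValueError(f"not a jobfs path pair: {src!r} -> {dst!r}")
            src_local = os.path.join(root, src[len(JOBFS_PREFIX):])
            dst_local = os.path.join(root, dst[len(JOBFS_PREFIX):])
            os.makedirs(os.path.dirname(dst_local) or ".", exist_ok=True)
            shutil.move(src_local, dst_local)
        except Exception as exc:  # record the failure and keep going
            errors[(src, dst)] = exc
    if errors:
        err = OSError(f"rename failed for {len(errors)} pair(s)")
        err.exception_details = errors  # same attribute name BeamIOError uses
        raise err
```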

size(path: str) int

Get size in bytes of a file on the FileSystem.

Parameters:

path – jobfs path of the file

Returns:

size of file in bytes

Raises:

BeamIOError – if path doesn’t exist or has invalid prefix
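Both documented failure cases, a missing file and a path without the `jobfs://` prefix, can be checked before and around a single `os.path.getsize` call. This is a sketch under stated assumptions: `size_jobfs` and `root` are hypothetical, and `OSError` stands in for `BeamIOError` in both cases.

```python
import os

JOBFS_PREFIX = "jobfs://"


def size_jobfs(path, root="_jobfs"):
    """Hypothetical sketch: size in bytes of a jobfs file.

    OSError stands in for BeamIOError for both documented failure cases.
    """
    if not path.startswith(JOBFS_PREFIX):
        raise OSError(f"invalid prefix, expected {JOBFS_PREFIX!r}: {path!r}")
    local_path = os.path.join(root, path[len(JOBFS_PREFIX):])
    try:
        return os.path.getsize(local_path)
    except FileNotFoundError as exc:
        raise OSError(f"path does not exist: {path!r}") from exc
```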

split(path: str) tuple[str, str]

Split the path into (head, tail) where tail is the last component.

The head will include the jobfs:// prefix.

Parameters:

path – jobfs path to split

Returns:

tuple of (head, tail) path components

Raises:

BeamIOError – if path has invalid prefix
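The documented behavior can be reproduced by stripping the prefix, splitting the remainder with `posixpath.split`, and re-attaching the prefix to the head. In this minimal sketch, `split_jobfs` is hypothetical, and `ValueError` stands in for `BeamIOError` on an invalid prefix.

```python
import posixpath

JOBFS_PREFIX = "jobfs://"


def split_jobfs(path):
    """Hypothetical sketch: split a jobfs path into (head, tail).

    tail is the last component; head keeps the jobfs:// prefix.
    """
    if not path.startswith(JOBFS_PREFIX):
        raise ValueError(f"not a jobfs path: {path!r}")
    head, tail = posixpath.split(path[len(JOBFS_PREFIX):])
    return JOBFS_PREFIX + head, tail
```

For example, `split_jobfs('jobfs://outputs/run1/a.txt')` yields `('jobfs://outputs/run1', 'a.txt')`, and a top-level file yields a head of just `'jobfs://'`.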

schrodinger.seam.io.jobfilesystem.register_jobfs_output_files_on_exit(func)

Decorator that moves the contents of the entire jobfs outputs directory to the current working directory and registers them as job outputs.
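The general shape of such a decorator is: run the wrapped function, then move every entry of an outputs directory into a destination directory and register each one. The sketch below is hypothetical throughout. `register_outputs_on_exit`, `outputs_dir`, `register_output` and `dest_dir` are illustrative names (the real decorator takes only the function and relies on job-control internals), and running the move in a `finally` block, so it also happens when the function raises, is an assumption.

```python
import functools
import os
import shutil


def register_outputs_on_exit(outputs_dir, register_output, dest_dir="."):
    """Hypothetical decorator factory illustrating the move-and-register idea."""
    def decorator(func):
        @functools.wraps(func)
        def wrapper(*args, **kwargs):
            try:
                return func(*args, **kwargs)
            finally:
                # Move each top-level entry of outputs_dir into dest_dir and
                # report its name to the registration callback.
                if os.path.isdir(outputs_dir):
                    for name in sorted(os.listdir(outputs_dir)):
                        shutil.move(os.path.join(outputs_dir, name),
                                    os.path.join(dest_dir, name))
                        register_output(name)
        return wrapper
    return decorator
```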