schrodinger.seam.io.jobfilesystem module

class schrodinger.seam.io.jobfilesystem.JobFileSystem(pipeline_options)

Bases: FileSystem

An abstraction over files shared between a driver job and worker jobs.

All files written to this filesystem will be made available to all SeamRunner workers.

Example usage:

>>> import apache_beam as beam
>>> from apache_beam.io.filesystems import FileSystems
>>> jobfs_path = 'jobfs://test_file.txt'
>>> with FileSystems.create(jobfs_path) as f:
...     _ = f.write(b'This is a test file.')
>>> with beam.Pipeline(runner=SeamRunner()) as p:
...     (p
...         | 'Read' >> beam.io.ReadFromText(jobfs_path)
...         | beam.LogElements())

Note

Currently, the jobfs:// filesystem only supports writing from the driver job. Writing _to_ the jobfs filesystem from within a transform results in nothing being written.

Warning

The jobfs:// filesystem copies _all_ jobfs:// files to _every_ worker, regardless of whether the transforms running on that worker use them. Copying many large files can therefore make worker startup considerably slower.
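To gauge how much data each worker will have to copy at startup, a driver script could sum the sizes of the locally stored jobfs files. The sketch below assumes a file map shaped like the one returned by getJobFSFileMap() (jobfs paths mapped to local paths); the helper name total_jobfs_bytes is hypothetical, and it also handles the case where a map entry points at a directory, which is an assumption rather than documented behavior.

```python
import os


def total_jobfs_bytes(jobfs_file_map: dict[str, str]) -> int:
    """Return the combined size, in bytes, of all locally stored jobfs files.

    Hypothetical helper: jobfs_file_map is assumed to map jobfs paths
    (e.g. 'jobfs://test_file.txt') to local paths, as described for
    JobFileSystem.getJobFSFileMap().
    """
    total = 0
    for local_path in jobfs_file_map.values():
        if os.path.isdir(local_path):
            # Walk directories so that nested files are counted too.
            for root, _dirs, files in os.walk(local_path):
                total += sum(os.path.getsize(os.path.join(root, name)) for name in files)
        else:
            total += os.path.getsize(local_path)
    return total
```

For example, calling total_jobfs_bytes(JobFileSystem.getJobFSFileMap()) on the driver would give an estimate of the data copied to every worker.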

JOBFS_PREFIX = 'jobfs://'
classmethod scheme()

URI scheme for the FileSystem

open(path, mime_type='application/octet-stream', compression_type='auto') → BinaryIO

Open a file for reading.

Note

The mime_type and compression_type arguments are not implemented and will be ignored.

create(path, mime_type='application/octet-stream', compression_type='auto') → BinaryIO

Returns a write channel for the given file path.

Args:

path: string path of the file object to be written to the system
mime_type: MIME type to specify the type of content in the file object
compression_type: Type of compression to be used for this object

Returns: file handle with a close function for the user to use

clear()

Clear the jobfs file map and root directory.

getLocalPath(path: str) → str

Return the local file path for a jobfs path.

classmethod getJobFSFileMap() → dict[str, str]

Get the jobfs file map.

This method should be used with caution, as it exposes the internal state of the JobFileSystem. It is intended for use with infrastructure code that needs to know where jobfs files are stored.

Maps jobfs paths (e.g. jobfs://test_file.txt) to where they’re stored locally (e.g. _jobfs/test_file.txt).
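The mapping described above can be pictured as a plain dictionary lookup. The sketch below illustrates that idea only; it is not JobFileSystem's actual getLocalPath() implementation, and resolve_local_path and the example map are hypothetical.

```python
JOBFS_PREFIX = 'jobfs://'


def resolve_local_path(jobfs_file_map: dict[str, str], path: str) -> str:
    """Look up where a jobfs file is stored locally (hypothetical helper)."""
    if not path.startswith(JOBFS_PREFIX):
        raise ValueError(f'not a jobfs path: {path!r}')
    try:
        return jobfs_file_map[path]
    except KeyError:
        # Report jobfs paths that are not in the map as missing files.
        raise FileNotFoundError(path) from None


# Example map in the shape described for getJobFSFileMap().
file_map = {'jobfs://test_file.txt': '_jobfs/test_file.txt'}
print(resolve_local_path(file_map, 'jobfs://test_file.txt'))  # _jobfs/test_file.txt
```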

classmethod setJobFSFileMap(jobfs_file_map: dict[str, str])

Set the jobfs file map.

This method should be used with caution, as it exposes the internal state of the JobFileSystem. It is intended for use with infrastructure code.

copy(source_file_names, destination_file_names)

Recursively copy the file tree from the source to the destination.

Args:

source_file_names: list of source file objects that need to be copied
destination_file_names: list of destinations for the new objects

Raises:

BeamIOError: if any of the copy operations fail
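The list-based contract above (pairwise copies, with failures reported together) can be sketched against the local filesystem. This is an illustration under assumptions, not JobFileSystem's code: copy_all and CopyError are hypothetical, CopyError stands in for BeamIOError, and shutil.copytree handles the recursive directory case.

```python
import os
import shutil


class CopyError(Exception):
    """Hypothetical stand-in for BeamIOError, carrying per-pair failures."""

    def __init__(self, msg: str, exception_details: dict):
        super().__init__(msg)
        self.exception_details = exception_details


def copy_all(source_file_names: list[str], destination_file_names: list[str]) -> None:
    if len(source_file_names) != len(destination_file_names):
        raise ValueError('source and destination lists must be the same length')
    failures = {}
    for src, dst in zip(source_file_names, destination_file_names):
        try:
            if os.path.isdir(src):
                # Copy the whole directory tree.
                shutil.copytree(src, dst)
            else:
                shutil.copy2(src, dst)
        except OSError as exc:
            # Record the failure and continue with the remaining pairs.
            failures[(src, dst)] = exc
    if failures:
        raise CopyError('copy operation failed', failures)
```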

delete(path)

Deletes files or directories at the provided paths. Directories will be deleted recursively.

Args:

path: list of paths that give the file objects to be deleted

Raises:

BeamIOError: if any of the delete operations fail

exists(path: str) → bool

Check if the provided path exists on the FileSystem.

Args:

path: string path that needs to be checked.

Returns: boolean flag indicating if path exists

has_dirs() → bool

Whether this FileSystem supports directories.

join(basepath: str, *paths: str) → str

Join two or more pathname components for the filesystem.

Args:

basepath: string path of the first component of the path
paths: path components to be added

Returns: full path after combining all the passed components
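One way to join components while keeping the jobfs:// prefix intact is to join only the part after the prefix using POSIX rules and then reattach the prefix. This is an assumption about reasonable behavior, not JobFileSystem's confirmed implementation; join_jobfs is a hypothetical name.

```python
import posixpath

JOBFS_PREFIX = 'jobfs://'


def join_jobfs(basepath: str, *paths: str) -> str:
    """Join path components, keeping the jobfs:// prefix on the result (hypothetical)."""
    if not basepath.startswith(JOBFS_PREFIX):
        raise ValueError(f'not a jobfs path: {basepath!r}')
    # Join the part after the prefix with '/' separators, then restore the prefix.
    rest = basepath[len(JOBFS_PREFIX):]
    return JOBFS_PREFIX + posixpath.join(rest, *paths)
```

Using posixpath rather than os.path keeps the separator as '/' on every platform, including Windows.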

last_updated(path)

Get the time the file was last updated on the FileSystem, in seconds since the UNIX epoch.

Args:

path: string path of file.

Returns: float UNIX Epoch time

Raises:

BeamIOError: if path doesn’t exist.

metadata(path)

Fetch metadata of a file on the FileSystem.

This operation returns metadata as stored in the underlying FileSystem. It should not need to read file data to obtain this value. For web-based file systems, this method should also make as few requests as possible.

Args:

path: string path of a file.

Returns:

FileMetadata.

Raises:

BeamIOError: if path isn’t a file or doesn’t exist.
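In Apache Beam, FileMetadata pairs a path with its size in bytes and, in recent Beam versions, a last-updated timestamp, which ties metadata() to size() and last_updated(). The sketch below gathers those three fields for a local file with a single os.stat call, so no file data is read. LocalMetadata and local_metadata are hypothetical names, not Beam classes, and built-in OSError subclasses stand in for BeamIOError.

```python
import os
import stat
from typing import NamedTuple


class LocalMetadata(NamedTuple):
    """Hypothetical stand-in for Beam's FileMetadata."""
    path: str
    size_in_bytes: int
    last_updated_in_seconds: float


def local_metadata(path: str) -> LocalMetadata:
    # A single stat() call yields size and modification time without reading data.
    st = os.stat(path)  # raises FileNotFoundError if the path does not exist
    if not stat.S_ISREG(st.st_mode):
        raise OSError(f'not a regular file: {path}')
    return LocalMetadata(path, st.st_size, st.st_mtime)
```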

mkdirs(path)

Recursively create directories for the provided path.

Args:

path: string path of the directory structure that should be created

Raises:

IOError: if leaf directory already exists.
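In Python 3, IOError is an alias of OSError, so the documented behavior matches what os.makedirs does with exist_ok left at its default of False: it raises FileExistsError, a subclass of OSError, when the leaf directory already exists. A minimal local sketch follows; mkdirs_local is a hypothetical name and does not show how JobFileSystem resolves jobfs paths.

```python
import os


def mkdirs_local(path: str) -> None:
    """Create path and any missing parents; fail if the leaf already exists."""
    # exist_ok=False (the default) raises FileExistsError for an existing leaf.
    os.makedirs(path, exist_ok=False)
```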

rename(src, dest)

Rename the files in the source list to the corresponding paths in the destination list. Source and destination lists should be of the same size.

Args:

src: list of file paths that need to be moved
dest: list of destination paths for the files

Raises:

BeamIOError: if any of the rename operations fail
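The same-size requirement lends itself to validating the pairing before any file is touched. The sketch below does that for local files and then collects per-pair failures; rename_all and RenameError are hypothetical, with RenameError standing in for BeamIOError.

```python
import os


class RenameError(Exception):
    """Hypothetical stand-in for BeamIOError, carrying per-pair failures."""

    def __init__(self, msg: str, exception_details: dict):
        super().__init__(msg)
        self.exception_details = exception_details


def rename_all(src: list[str], dest: list[str]) -> None:
    # Check the pairing up front so a mismatch renames nothing.
    if len(src) != len(dest):
        raise ValueError(f'got {len(src)} sources but {len(dest)} destinations')
    failures = {}
    for old, new in zip(src, dest):
        try:
            os.rename(old, new)
        except OSError as exc:
            failures[(old, new)] = exc
    if failures:
        raise RenameError('rename operation failed', failures)
```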

size(path: str) → int

Get size in bytes of a file on the FileSystem.

Args:

path: string filepath of file.

Returns: int size of file according to the FileSystem.

Raises:

BeamIOError: if path doesn’t exist.

split(path: str)

Splits the given path into two parts.

Splits the path into a pair (head, tail) such that tail contains the last component of the path and head contains everything up to that.

For file systems other than the local file system, head should include the prefix.

Args:

path: path as a string

Returns:

a pair of path components as strings.
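The prefix rule above can be illustrated for jobfs paths by splitting only the part after jobfs:// and reattaching the prefix to head. This sketch assumes '/' separators inside jobfs paths; split_jobfs is hypothetical and may differ from JobFileSystem.split().

```python
import posixpath

JOBFS_PREFIX = 'jobfs://'


def split_jobfs(path: str) -> tuple[str, str]:
    """Split a jobfs path into (head, tail), keeping the prefix in head (hypothetical)."""
    if not path.startswith(JOBFS_PREFIX):
        raise ValueError(f'not a jobfs path: {path!r}')
    head, tail = posixpath.split(path[len(JOBFS_PREFIX):])
    return JOBFS_PREFIX + head, tail
```

A top-level file such as jobfs://file.txt splits into ('jobfs://', 'file.txt'), so head always remains a valid jobfs path.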