schrodinger.seam.io.jobfilesystem module
- class schrodinger.seam.io.jobfilesystem.JobFileSystem(pipeline_options)
Bases: FileSystem
An abstraction over files shared between a driver job and worker jobs.
All files written to this filesystem by the driver job will be made available to all SeamRunner workers.
Example usage:
>>> import apache_beam as beam
>>> from apache_beam.io.filesystems import FileSystems
>>> jobfs_path = 'jobfs://test_file.txt'
>>> with FileSystems.create(jobfs_path) as f:
...     f.write(b'This is a test file.')
>>> with beam.Pipeline(runner=SeamRunner()) as p:
...     (p
...      | 'Read' >> beam.io.ReadFromText(jobfs_path)
...      | beam.LogElements())
Note
Currently the jobfs:// filesystem only supports writing from the driver job. Writing _to_ the jobfs filesystem from a transform will result in nothing being written.
Warning
The jobfs:// filesystem copies _all_ jobfs:// files to _every_ worker, regardless of whether they are used by the transforms that worker executes. Copying many large files may lead to long worker startup times.
- JOBFS_PREFIX = 'jobfs://'
- classmethod scheme()
Return the URI scheme for this FileSystem.
- open(path, mime_type='application/octet-stream', compression_type='auto') → BinaryIO
Open a file for reading.
Note: mime_type and compression_type are not implemented and will be ignored.
- create(path, mime_type='application/octet-stream', compression_type='auto') → BinaryIO
Return a write channel for the given file path.
- Parameters:
path – string path of the file object to be written to the system
mime_type – MIME type to specify the type of content in the file object
compression_type – type of compression to be used for this object
- Returns:
file handle with a close function for the user to use
- clear()
Clear the jobfs file map and root directory.
- getLocalPath(path: str, create=False) → str
Return the local file path for a jobfs path.
- classmethod getJobFSFileMap() → dict[str, str]
Get the jobfs file map.
This method should be used with caution, as it exposes the internal state of the JobFileSystem. It is intended for use with infrastructure code that needs to know where jobfs files are stored.
Maps jobfs paths (e.g. jobfs://test_file.txt) to where they’re stored locally (e.g. _jobfs/test_file.txt).
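To make the shape of the file map concrete, here is a minimal sketch that builds such a map. It assumes, purely for illustration, that a local path is formed by stripping the jobfs:// prefix and joining the remainder onto a `_jobfs` root (the root comes from the example above); `to_local_path` and `build_file_map` are hypothetical helpers, not part of JobFileSystem, and the real storage layout may differ.

```python
import posixpath

JOBFS_PREFIX = 'jobfs://'


def to_local_path(jobfs_path: str, root: str = '_jobfs') -> str:
    """Hypothetical mapping from a jobfs path to a local path under `root`."""
    if not jobfs_path.startswith(JOBFS_PREFIX):
        raise ValueError(f'not a jobfs path: {jobfs_path!r}')
    # Assumed layout: everything after the prefix becomes a path under root.
    relative = jobfs_path[len(JOBFS_PREFIX):]
    return posixpath.join(root, relative)


def build_file_map(jobfs_paths, root: str = '_jobfs') -> dict:
    """Build a {jobfs path: local path} dict shaped like getJobFSFileMap()."""
    return {path: to_local_path(path, root) for path in jobfs_paths}


file_map = build_file_map(['jobfs://test_file.txt', 'jobfs://data/input.csv'])
print(file_map)
# {'jobfs://test_file.txt': '_jobfs/test_file.txt', 'jobfs://data/input.csv': '_jobfs/data/input.csv'}
```

Infrastructure code that ships files to workers would need exactly this kind of key/value pairing, which is why the map is exposed at all.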
- classmethod setJobFSFileMap(jobfs_file_map: dict[str, str])
Set the jobfs file map.
This method should be used with caution, as it exposes the internal state of the JobFileSystem. It is intended for use with infrastructure code.
- copy(source_file_names: list[str], destination_file_names: list[str])
Recursively copy files from source to destination paths.
- Parameters:
source_file_names – list of source jobfs paths to copy
destination_file_names – list of destination jobfs paths
- Raises:
BeamIOError – if any of the copy operations fail
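The two lists are consumed pairwise, and failures are reported together rather than stopping at the first one. The sketch below illustrates that contract on an in-memory dict standing in for the filesystem. `copy_all` and `CopyError` are hypothetical (CopyError stands in for BeamIOError and collects failures keyed by `(source, destination)`), and the sketch does not model directories or recursive copying.

```python
from typing import Dict, List, Tuple


class CopyError(IOError):
    """Hypothetical stand-in for BeamIOError that keeps per-pair failures."""

    def __init__(self, msg: str, exception_details: Dict[Tuple[str, str], Exception]):
        super().__init__(msg)
        self.exception_details = exception_details


def copy_all(store: Dict[str, bytes], sources: List[str], destinations: List[str]) -> None:
    """Copy store[src] to store[dst] for each (src, dst) pair, in order."""
    if len(sources) != len(destinations):
        raise CopyError('source and destination lists differ in length', {})
    failures: Dict[Tuple[str, str], Exception] = {}
    for src, dst in zip(sources, destinations):
        try:
            store[dst] = store[src]
        except KeyError as exc:
            # Record the failure and keep going with the remaining pairs.
            failures[(src, dst)] = exc
    if failures:
        raise CopyError(f'{len(failures)} copy operation(s) failed', failures)


store = {'jobfs://a.txt': b'A'}
copy_all(store, ['jobfs://a.txt'], ['jobfs://a_copy.txt'])

failed_pairs = []
try:
    copy_all(store,
             ['jobfs://a.txt', 'jobfs://missing.txt'],
             ['jobfs://x.txt', 'jobfs://y.txt'])
except CopyError as err:
    failed_pairs = list(err.exception_details)
print(failed_pairs)   # [('jobfs://missing.txt', 'jobfs://y.txt')]
print(sorted(store))  # ['jobfs://a.txt', 'jobfs://a_copy.txt', 'jobfs://x.txt']
```

Note that the successful pair (`x.txt`) is still copied even though the call raises; in this sketch the operation as a whole is not atomic.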
- delete(paths: list[str])
Delete files at the provided jobfs paths.
- Parameters:
paths – list of jobfs paths to delete
- Raises:
BeamIOError – if any of the delete operations fail
- exists(path: str) → bool
Check if the provided path exists on the FileSystem.
- has_dirs() → bool
Whether this FileSystem supports directories.
- join(basepath: str, *paths: str) → str
Join two or more paths together.
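A join for a prefixed scheme has to keep the jobfs:// prefix intact while joining the remaining components. A minimal sketch, assuming POSIX-style separators after the prefix; `jobfs_join` is a hypothetical helper, not the method's actual implementation.

```python
import posixpath

JOBFS_PREFIX = 'jobfs://'


def jobfs_join(basepath: str, *paths: str) -> str:
    """Join path components onto a jobfs base path, keeping the prefix."""
    if not basepath.startswith(JOBFS_PREFIX):
        raise ValueError(f'not a jobfs path: {basepath!r}')
    # Join only the part after the scheme so the '//' of the prefix is
    # never treated as part of the path.
    relative = basepath[len(JOBFS_PREFIX):]
    return JOBFS_PREFIX + posixpath.join(relative, *paths)


print(jobfs_join('jobfs://results', 'run1', 'out.txt'))  # jobfs://results/run1/out.txt
print(jobfs_join('jobfs://', 'test_file.txt'))            # jobfs://test_file.txt
```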
- last_updated(path: str) → float
Return the time the file at path was last updated, in seconds since the UNIX epoch.
- Raises:
BeamIOError – if path doesn’t exist
- metadata(path: str) → FileMetadata
Get metadata for a file in the jobfs filesystem.
- Raises:
BeamIOError – if path doesn’t exist
- mkdirs(path)
Recursively create directories for the provided path.
- Parameters:
path – string path of the directory structure that should be created
- Raises:
IOError – if leaf directory already exists
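The "create parents, but fail if the leaf exists" behaviour matches the standard library's `os.makedirs` with `exist_ok=False`. The sketch below shows those semantics on an ordinary local path; a jobfs implementation would presumably first translate the jobfs path to a local one (that step is an assumption and is not shown). `mkdirs_strict` is a hypothetical helper.

```python
import os
import tempfile


def mkdirs_strict(local_path: str) -> None:
    """Create local_path and any missing parents; fail if the leaf exists."""
    # With exist_ok=False, os.makedirs raises FileExistsError, a subclass of
    # OSError (IOError is an alias of OSError in Python 3), when the leaf exists.
    os.makedirs(local_path, exist_ok=False)


with tempfile.TemporaryDirectory() as tmp:
    target = os.path.join(tmp, 'a', 'b', 'c')
    mkdirs_strict(target)            # creates a, a/b and a/b/c in one call
    created = os.path.isdir(target)
    try:
        mkdirs_strict(target)        # leaf already exists
        second_call_error = None
    except IOError as exc:
        second_call_error = type(exc).__name__
print(created, second_call_error)  # True FileExistsError
```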
- rename(source_file_names: list[str], destination_file_names: list[str])
Rename/move files from source to destination paths.
- Parameters:
source_file_names – list of source jobfs paths to rename
destination_file_names – list of destination jobfs paths
- Raises:
BeamIOError – if any of the rename operations fail
- size(path: str) → int
Get size in bytes of a file on the FileSystem.
- Parameters:
path – jobfs path of the file
- Returns:
size of file in bytes
- Raises:
BeamIOError – if path doesn’t exist or has invalid prefix
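The two documented failure modes (an invalid prefix, and a path that doesn't exist) suggest a check-then-stat flow. A minimal sketch, assuming sizes are read from local files located through a jobfs-to-local file map like the one returned by getJobFSFileMap(); `jobfs_size` and `JobFSError` (a stand-in for BeamIOError) are hypothetical.

```python
import os
import tempfile

JOBFS_PREFIX = 'jobfs://'


class JobFSError(IOError):
    """Hypothetical stand-in for BeamIOError."""


def jobfs_size(path: str, file_map: dict) -> int:
    """Return the size in bytes of the local file backing a jobfs path."""
    if not path.startswith(JOBFS_PREFIX):
        raise JobFSError(f'invalid prefix: {path!r}')
    local_path = file_map.get(path)
    if local_path is None or not os.path.isfile(local_path):
        raise JobFSError(f'path does not exist: {path!r}')
    return os.path.getsize(local_path)


with tempfile.TemporaryDirectory() as tmp:
    local = os.path.join(tmp, 'test_file.txt')
    with open(local, 'wb') as f:
        f.write(b'This is a test file.')
    file_map = {'jobfs://test_file.txt': local}
    size = jobfs_size('jobfs://test_file.txt', file_map)
print(size)  # 20
```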
- split(path: str) → tuple[str, str]
Split the path into (head, tail) where tail is the last component.
The head will include the jobfs:// prefix.
- Parameters:
path – jobfs path to split
- Returns:
tuple of (head, tail) path components
- Raises:
BeamIOError – if path has invalid prefix
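One way to satisfy "the head keeps the jobfs:// prefix" is to split only the part after the prefix and then re-attach it. A minimal sketch with a hypothetical `jobfs_split` helper; it raises ValueError where the documented method raises BeamIOError, and the result for a top-level file (`('jobfs://', name)`) is an assumption.

```python
import posixpath
from typing import Tuple

JOBFS_PREFIX = 'jobfs://'


def jobfs_split(path: str) -> Tuple[str, str]:
    """Split a jobfs path into (head, tail); head keeps the jobfs:// prefix."""
    if not path.startswith(JOBFS_PREFIX):
        # The documented method raises BeamIOError here; ValueError is a stand-in.
        raise ValueError(f'invalid prefix: {path!r}')
    head, tail = posixpath.split(path[len(JOBFS_PREFIX):])
    return JOBFS_PREFIX + head, tail


print(jobfs_split('jobfs://results/run1/out.txt'))  # ('jobfs://results/run1', 'out.txt')
print(jobfs_split('jobfs://test_file.txt'))         # ('jobfs://', 'test_file.txt')
```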
- schrodinger.seam.io.jobfilesystem.register_jobfs_output_files_on_exit(func)
Decorator that moves the contents of the entire jobfs outputs directory to the current working directory and registers them as job output.
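The decorator pattern described above (run the function, then move outputs and register them) can be sketched with `try`/`finally`, so the move happens whether the function returns or raises. This is a hypothetical illustration, not the module's implementation: `move_outputs_on_exit` takes an explicit destination instead of the current working directory, and `register` is a placeholder callback standing in for the real job-output registration, whose API is not described here. Running the move on exceptions is also an assumption about what "on exit" means.

```python
import functools
import os
import shutil
import tempfile
from typing import Callable


def move_outputs_on_exit(outputs_dir: str, dest_dir: str,
                         register: Callable[[str], None]):
    """Hypothetical decorator factory: after the wrapped function exits,
    move every entry of outputs_dir into dest_dir and register it."""
    def decorator(func):
        @functools.wraps(func)
        def wrapper(*args, **kwargs):
            try:
                return func(*args, **kwargs)
            finally:
                # Runs on normal return and when func raises.
                if os.path.isdir(outputs_dir):
                    for name in sorted(os.listdir(outputs_dir)):
                        shutil.move(os.path.join(outputs_dir, name),
                                    os.path.join(dest_dir, name))
                        register(name)
        return wrapper
    return decorator


registered = []
with tempfile.TemporaryDirectory() as tmp:
    outputs = os.path.join(tmp, 'outputs')
    cwd = os.path.join(tmp, 'cwd')
    os.makedirs(outputs)
    os.makedirs(cwd)

    @move_outputs_on_exit(outputs, cwd, registered.append)
    def job():
        with open(os.path.join(outputs, 'result.txt'), 'w') as f:
            f.write('done')
        return 42

    result = job()
    moved = sorted(os.listdir(cwd))
    left_behind = os.listdir(outputs)
print(result, moved, registered, left_behind)  # 42 ['result.txt'] ['result.txt'] []
```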