schrodinger.stepper.cloud.aws_sqs module
AWS SQS API library.
Amazon Simple Queue Service (SQS) is used here primarily as a publisher & subscriber model.
The available methods are modeled on GCP's PubSub service: the outline and terminology are similar to pubsub.py, and the actions available to users match the invocation from stepper.py.
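Terminology:
+-------------------------+--------------------------+
| PubSub                  | SQS                      |
+-------------------------+--------------------------+
| Topic                   | Queue                    |
+-------------------------+--------------------------+
| Subscriber              | N/A (general API call)   |
+-------------------------+--------------------------+
| Push                    | Send Messages            |
+-------------------------+--------------------------+
| Pull                    | Receive Messages         |
+-------------------------+--------------------------+
| Acknowledge             | Delete Message           |
+-------------------------+--------------------------+
| Acknowledgment Deadline | Visibility Timeout       |
+-------------------------+--------------------------+
| Message -> 1 entry      | Message -> 1 parcel =    |
|                         | _MSG_PARCEL_SIZE entries |
+-------------------------+--------------------------+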
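For readers coming from the boto3 side, the SQS column above corresponds roughly to the following boto3 SQS client methods. This mapping is an editorial aid and an assumption about which requests aws_sqs issues; it is not taken from the module's source, and the names PUBSUB_TO_BOTO3_SQS and boto3_methods_for are hypothetical.

```python
from typing import Dict, Tuple

# Editorial mapping (assumption): PubSub term -> boto3 SQS client method(s)
# that perform the corresponding SQS operation.
PUBSUB_TO_BOTO3_SQS: Dict[str, Tuple[str, ...]] = {
    "Topic": ("create_queue", "delete_queue", "get_queue_url"),
    "Push": ("send_message", "send_message_batch"),
    "Pull": ("receive_message",),
    "Acknowledge": ("delete_message", "delete_message_batch"),
    "Acknowledgment Deadline": ("change_message_visibility",
                                "change_message_visibility_batch"),
}


def boto3_methods_for(term: str) -> Tuple[str, ...]:
    """Return the boto3 SQS client methods associated with a PubSub term (empty if none)."""
    return PUBSUB_TO_BOTO3_SQS.get(term, ())
```

"Subscriber" deliberately has no entry, mirroring the "N/A (general API call)" row: in SQS any client with access to the queue can receive from it.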
#===============================================================================
# Actions:
#===============================================================================
The following high-level methods are available:
1. List all topics available in the current user's account.
2. Clear all topics in the current user's account.
3. Create a new topic (this also creates a dead-letter topic, and deletes and recreates both if they already exist).
4. Upload a file to a topic by distributing its contents into messages.
5. Download messages from a topic into a file.
6. Process messages from an input topic by running a step task, and upload the results to an output topic.
#===============================================================================
# AWS Services
#===============================================================================
For AWS credential configuration, see the docstring of schrodinger.stepper.cloud.aws_client.get_client().
#===============================================================================
# Message Parcelling
#===============================================================================
Multiple entries / inputs are parcelled into a single message in an SQS topic / queue. The number of entries per parcel is determined by the _MSG_PARCEL_SIZE constant. Message parcelling is performed to avoid hitting the SQS limit of 120,000 in-flight messages. A message is considered in-flight when an SQS queue has sent it to a consumer (subscriber) that has not yet acknowledged it, and the message's acknowledgement deadline (visibility timeout) has not yet expired.
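A minimal sketch of the parcelling idea, assuming entries are joined with newlines into a single message body. The module's actual encoding and the value of _MSG_PARCEL_SIZE are not documented here; PARCEL_SIZE = 100 and the helper names are hypothetical. Packing P entries per message means N entries need only ceil(N / P) messages, which is what keeps the in-flight count far below the limit.

```python
import math
from typing import Iterable, Iterator, List

PARCEL_SIZE = 100  # hypothetical stand-in for _MSG_PARCEL_SIZE


def parcel_entries(entries: Iterable[str], parcel_size: int = PARCEL_SIZE) -> Iterator[str]:
    """Yield message bodies, each holding up to parcel_size newline-joined entries.

    Assumes no entry itself contains a newline.
    """
    parcel: List[str] = []
    for entry in entries:
        parcel.append(entry)
        if len(parcel) == parcel_size:
            yield "\n".join(parcel)
            parcel = []
    if parcel:  # the final parcel may be smaller than parcel_size
        yield "\n".join(parcel)


def messages_needed(num_entries: int, parcel_size: int = PARCEL_SIZE) -> int:
    """Number of messages needed to carry num_entries entries."""
    return math.ceil(num_entries / parcel_size)
```

For example, 1,000,000 entries with a parcel size of 100 need only 10,000 messages. Because the generator accepts any iterable, the lines of an open file (with trailing newlines stripped) can be fed to it directly, which is roughly the behaviour that upload describes.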
There are two workflow patterns when processing these messages inside aws_sqs.pull_and_process:
- batch_size < _MSG_PARCEL_SIZE: When the requested batch_size is smaller than the default message parcel size, a single message's body is broken down into smaller parcels of size batch_size and republished to the input topic. Note that some messages might contain parcels that are smaller than the requested batch_size, and thus the workers must be designed to handle this case too.
- batch_size >= _MSG_PARCEL_SIZE: When the requested batch_size is greater than or equal to the default message parcel size, one or more messages are consumed and processed. Note that if the requested batch_size is not a multiple of the default message parcel size, the number of entries processed by the worker can be greater than batch_size. Thus, workers must be designed to handle inputs larger than the requested batch_size.
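The arithmetic behind the two patterns can be sketched with two hypothetical helpers (not part of aws_sqs). Both assume that every parcel pulled from the input topic is full, holding exactly parcel_size entries; shorter trailing parcels only reduce the counts.

```python
import math
from typing import List, Tuple


def split_sizes(parcel_size: int, batch_size: int) -> List[int]:
    """batch_size < parcel_size: sizes of the smaller parcels one full message is split into."""
    full, remainder = divmod(parcel_size, batch_size)
    sizes = [batch_size] * full
    if remainder:
        sizes.append(remainder)  # the trailing parcel is smaller than batch_size
    return sizes


def consume_plan(batch_size: int, parcel_size: int) -> Tuple[int, int]:
    """batch_size >= parcel_size: (messages to pull, entries the worker receives)."""
    messages = math.ceil(batch_size / parcel_size)
    # Whole messages are consumed, so the entry count rounds up to full parcels.
    return messages, messages * parcel_size
```

With a parcel size of 100, batch_size=30 splits one message into parcels of 30, 30, 30 and 10 entries, while batch_size=250 pulls three messages and hands the worker up to 300 entries.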
Outputs:
All step outputs are also parcelled into parcels of _MSG_PARCEL_SIZE entries and uploaded to the requested output topic. Note that some messages may contain parcels smaller than _MSG_PARCEL_SIZE, and thus the consumer of the output topic must be able to handle this discrepancy.
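A consumer of the output topic can sidestep the size discrepancy by flattening message bodies back into entries and regrouping them itself. The sketch assumes newline-joined parcels; iter_entries and rebatch are hypothetical names.

```python
from itertools import islice
from typing import Iterable, Iterator, List


def iter_entries(message_bodies: Iterable[str]) -> Iterator[str]:
    """Flatten parcelled message bodies into entries, whatever each parcel's size."""
    for body in message_bodies:
        # Assumes entries are newline-joined; empty strings are skipped, so an
        # empty body contributes nothing.
        for entry in body.split("\n"):
            if entry:
                yield entry


def rebatch(entries: Iterable[str], size: int) -> Iterator[List[str]]:
    """Regroup a stream of entries into lists of `size` entries (the last may be shorter)."""
    it = iter(entries)
    while True:
        batch = list(islice(it, size))
        if not batch:
            return
        yield batch
```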
- schrodinger.stepper.cloud.aws_sqs.print(*args)
Overrides the built-in print to add timestamps to debugging statements.
- schrodinger.stepper.cloud.aws_sqs.create_topic(topic_name)
Create a new topic together with a dead-letter topic that tracks messages received more than twice by subscribers. Note that if the topics already exist, both are deleted and recreated, so any messages in them are permanently lost.
- Parameters
topic_name (str) – name of the new topic, also used as the prefix of the dead-letter topic's name.
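In SQS, a dead-letter arrangement is expressed through the RedrivePolicy queue attribute. The sketch below builds the Attributes dict that boto3's create_queue accepts. The helper name, the use of maxReceiveCount=2 to express "received more than twice", and the 300-second visibility timeout are assumptions, not values read from aws_sqs.

```python
import json
from typing import Dict


def main_queue_attributes(dead_letter_arn: str, max_receive_count: int = 2,
                          visibility_timeout: int = 300) -> Dict[str, str]:
    """Build create_queue Attributes that route repeatedly received messages to a dead-letter queue."""
    return {
        # SQS expects every attribute value to be a string.
        "VisibilityTimeout": str(visibility_timeout),
        # Once a message's receive count exceeds max_receive_count, SQS moves
        # it to the queue identified by deadLetterTargetArn.
        "RedrivePolicy": json.dumps({
            "deadLetterTargetArn": dead_letter_arn,
            "maxReceiveCount": str(max_receive_count),
        }),
    }
```

The dead-letter queue has to exist first, since its ARN (available via get_queue_attributes with AttributeNames=["QueueArn"]) goes into the policy; the resulting dict is then passed as the Attributes argument of create_queue.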
- schrodinger.stepper.cloud.aws_sqs.create_queue(queue_name, queue_attributes)
Send an AWS API request to create a new queue with the requested queue attributes.
- Parameters
queue_name (str) – the name of the queue to create
queue_attributes (dict) – attributes (settings) for the new queue
- Returns
the AWS response
- Return type
dict
- schrodinger.stepper.cloud.aws_sqs.clear_topics(prefix)
Remove all topics in the current AWS account whose names start with the given prefix. Note that this action is irreversible.
- Parameters
prefix (str) – prefix of the topic names to clear.
- schrodinger.stepper.cloud.aws_sqs.delete_topic(topic_name)
Delete a topic by name in the current AWS account. Note that this action is irreversible.
- Parameters
topic_name (str) – topic to delete
- schrodinger.stepper.cloud.aws_sqs.delete_topic_by_url(topic_url)
Delete a topic by URL in the current AWS account. Note that this action is irreversible.
- Parameters
topic_url (str) – URL of topic to delete
- schrodinger.stepper.cloud.aws_sqs.list_topic_urls(prefix='')
- Parameters
prefix (str) – topic prefix to filter results by.
- Returns
URLs of all available topics whose names start with prefix.
- Return type
list[str]
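The listing and clearing functions above can be sketched with boto3's list_queues and delete_queue. This is an illustration under assumptions (hypothetical helper names, client being a boto3 SQS client), not the module's code. SQS only returns a NextToken when MaxResults is set, so the sketch sets it and follows pagination; it also lists everything before deleting so that deletions cannot disturb paging.

```python
from typing import Any, Dict, List


def list_queue_urls(client: Any, prefix: str = "") -> List[str]:
    """Collect every queue URL whose queue name starts with prefix, following pagination."""
    urls: List[str] = []
    # MaxResults must be set for SQS to return a NextToken.
    kwargs: Dict[str, Any] = {"MaxResults": 1000}
    if prefix:
        kwargs["QueueNamePrefix"] = prefix
    while True:
        response = client.list_queues(**kwargs)
        # "QueueUrls" is absent from the response when nothing matches.
        urls.extend(response.get("QueueUrls", []))
        token = response.get("NextToken")
        if not token:
            return urls
        kwargs["NextToken"] = token


def clear_queues(client: Any, prefix: str) -> List[str]:
    """Irreversibly delete every queue whose name starts with prefix; return the deleted URLs."""
    urls = list_queue_urls(client, prefix)  # list first, then delete
    for url in urls:
        client.delete_queue(QueueUrl=url)
    return urls
```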
- schrodinger.stepper.cloud.aws_sqs.get_num_available_messages(topic_name)
Note that this method needs at least 60 seconds after the last message was sent to properly account for all available messages, and even then the count is not exact.
- Parameters
topic_name (str) – name of the topic to count available messages in
- Returns
the approximate number of available messages
- Return type
int
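SQS exposes this count as the ApproximateNumberOfMessages queue attribute, which is eventually consistent; that is why the number lags behind recent sends and is never exact. A sketch using boto3's get_queue_attributes, which also reads the in-flight count (ApproximateNumberOfMessagesNotVisible) that the parcelling section is concerned with. The function name is hypothetical, and aws_sqs may obtain the number differently.

```python
from typing import Any, Tuple


def approx_message_counts(client: Any, queue_url: str) -> Tuple[int, int]:
    """Return (available, in_flight) approximate message counts for a queue.

    client is assumed to be a boto3 SQS client (or any object with the same
    get_queue_attributes method).
    """
    response = client.get_queue_attributes(
        QueueUrl=queue_url,
        AttributeNames=[
            "ApproximateNumberOfMessages",            # visible, ready to receive
            "ApproximateNumberOfMessagesNotVisible",  # in flight (received, not deleted)
        ],
    )
    attrs = response["Attributes"]
    # SQS returns attribute values as strings.
    return (int(attrs["ApproximateNumberOfMessages"]),
            int(attrs["ApproximateNumberOfMessagesNotVisible"]))
```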
- schrodinger.stepper.cloud.aws_sqs.get_topic_url(topic_name)
- Parameters
topic_name (str) – name of the topic whose URL to retrieve
- Returns
URL of the topic
- Return type
str
- schrodinger.stepper.cloud.aws_sqs.get_topic_attributes(topic_name)
- Parameters
topic_name (str) – name of topic to retrieve attributes of
- Returns
a dictionary of all attributes / settings for this topic
- Return type
dict
- schrodinger.stepper.cloud.aws_sqs.publish(topic_name, msg)
- Parameters
topic_name (str) – name of topic to send messages to
msg (str) – message to publish
- Returns
whether the publish call to AWS executed successfully
- Return type
bool
- schrodinger.stepper.cloud.aws_sqs.publish_in_batches(topic_name, msgs)
Publish messages to AWS in batches of _AWS_MAX_MESSAGES or _PUBLISH_MAX_MESSAGES, whichever is lower.
- Parameters
topic_name (str) – name of topic to send messages to
msgs (iterable[str]) – iterable of messages to publish
- Returns
(number of messages published, list of messages that failed to publish)
- Return type
(int, list[str])
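A sketch of the batching rule, assuming a boto3 SQS client. SQS's SendMessageBatch accepts at most 10 entries per request (plus a total payload size limit, which this sketch does not check), so AWS_MAX_MESSAGES = 10 stands in for _AWS_MAX_MESSAGES; publish_max plays the role of _PUBLISH_MAX_MESSAGES, whose real value is not given here. The function name is hypothetical.

```python
from itertools import islice
from typing import Any, Iterable, List, Tuple

AWS_MAX_MESSAGES = 10  # SQS limit on entries per SendMessageBatch request


def publish_in_batches_sketch(client: Any, queue_url: str, msgs: Iterable[str],
                              publish_max: int = AWS_MAX_MESSAGES) -> Tuple[int, List[str]]:
    """Send msgs in batches of min(AWS_MAX_MESSAGES, publish_max).

    Returns (number of messages published, bodies that failed to publish).
    """
    batch_size = min(AWS_MAX_MESSAGES, publish_max)
    it = iter(msgs)
    published = 0
    failed: List[str] = []
    while True:
        batch = list(islice(it, batch_size))
        if not batch:
            return published, failed
        # Entry Ids only need to be unique within a single request.
        entries = [{"Id": str(i), "MessageBody": body} for i, body in enumerate(batch)]
        response = client.send_message_batch(QueueUrl=queue_url, Entries=entries)
        published += len(response.get("Successful", []))
        # Map each failed Id back to the original message body.
        failed.extend(batch[int(f["Id"])] for f in response.get("Failed", []))
```

Because SQS reports partial failures per entry rather than failing the whole request, the failed bodies are collected and returned instead of raising.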
- schrodinger.stepper.cloud.aws_sqs.pull(topic_name, min_messages=1, max_messages=10, receive_message_timeout=300)
Pull messages from the requested topic. Guaranteed to return at least min_messages messages unless receive_message_timeout expires.
- Parameters
topic_name (str) – name of topic to pull messages from
min_messages (int) – minimum number of messages to retrieve
max_messages (int) – maximum number of messages to retrieve
receive_message_timeout (int) – time in seconds to wait for messages before returning.
- Returns
list of AWS message dicts
- Return type
list[dict]
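One way to honour the min_messages / receive_message_timeout contract with boto3's receive_message, sketched under the assumption that 1 <= min_messages <= max_messages. SQS caps each call at 10 messages and 20 seconds of long polling, and a single call may return fewer messages than are queued, so the loop keeps polling until it has enough messages or the overall deadline passes. The function name is hypothetical.

```python
import time
from typing import Any, Dict, List

AWS_MAX_RECEIVE = 10  # SQS returns at most 10 messages per ReceiveMessage call
AWS_MAX_WAIT = 20     # SQS long-polling limit per call, in seconds


def pull_sketch(client: Any, queue_url: str, min_messages: int = 1,
                max_messages: int = 10, receive_message_timeout: float = 300) -> List[Dict]:
    """Receive between min_messages and max_messages messages, or fewer on timeout."""
    if not 1 <= min_messages <= max_messages:
        raise ValueError("expected 1 <= min_messages <= max_messages")
    deadline = time.monotonic() + receive_message_timeout
    messages: List[Dict] = []
    while len(messages) < min_messages:
        remaining = deadline - time.monotonic()
        if remaining <= 0:
            break  # timed out: return whatever has been received so far
        response = client.receive_message(
            QueueUrl=queue_url,
            # Never ask for more than max_messages in total.
            MaxNumberOfMessages=min(AWS_MAX_RECEIVE, max_messages - len(messages)),
            # Long-poll, but not past the overall deadline.
            WaitTimeSeconds=min(AWS_MAX_WAIT, int(remaining)),
        )
        # "Messages" is absent when nothing was received.
        messages.extend(response.get("Messages", []))
    return messages
```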
- schrodinger.stepper.cloud.aws_sqs.acknowledge(messages, topic_name)
Acknowledge (delete) messages in the requested topic. Note that messages are sent in batches of _AWS_MAX_MESSAGES per AWS request.
- Parameters
messages (iterable[dict]) – AWS message dicts to acknowledge
topic_name (str) – name of topic to acknowledge messages from
- Returns
list of message dicts that AWS failed to acknowledge
- Return type
list[dict]
- schrodinger.stepper.cloud.aws_sqs.refresh_ack_timeout(messages, topic_name, timeout=300)
Refresh the acknowledgement timeout (visibility timeout) of messages. This keeps the locked messages hidden in the topic so that no other subscriber receives them until the timeout expires. Note that this method resets the timeout to timeout seconds from the time of the call, rather than adding to the message's existing timeout.
- Parameters
messages (iterable[dict]) – AWS message dicts whose timeouts to refresh
topic_name (str) – name of topic to refresh timeouts
timeout (int) – timeout in seconds
- Returns
list of message dicts whose timeouts AWS failed to refresh
- Return type
list[dict]
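Both acknowledge and refresh_ack_timeout follow the same batching pattern, which can be sketched against boto3's delete_message_batch and change_message_visibility_batch. The helper names are hypothetical, and AWS_MAX_MESSAGES = 10 is the SQS per-request entry limit, assumed to correspond to _AWS_MAX_MESSAGES. Each message dict must carry the ReceiptHandle returned by receive_message. SQS applies a new visibility timeout from the moment of the call, which is why refreshing resets rather than extends it.

```python
from typing import Any, Callable, Dict, Iterable, List

AWS_MAX_MESSAGES = 10  # SQS limit on entries per batch request

Message = Dict[str, Any]


def _in_batches(messages: Iterable[Message],
                make_entry: Callable[[str, Message], Dict[str, Any]],
                send: Callable[[List[Dict[str, Any]]], Dict[str, Any]]) -> List[Message]:
    """Send messages in batches of AWS_MAX_MESSAGES; return those SQS reported as failed."""
    pending = list(messages)
    failed: List[Message] = []
    for start in range(0, len(pending), AWS_MAX_MESSAGES):
        batch = pending[start:start + AWS_MAX_MESSAGES]
        # Entry Ids only need to be unique within one request.
        entries = [make_entry(str(i), msg) for i, msg in enumerate(batch)]
        response = send(entries)
        failed.extend(batch[int(f["Id"])] for f in response.get("Failed", []))
    return failed


def acknowledge_sketch(client: Any, queue_url: str, messages: Iterable[Message]) -> List[Message]:
    """Delete (acknowledge) messages; return those SQS failed to delete."""
    return _in_batches(
        messages,
        lambda id_, m: {"Id": id_, "ReceiptHandle": m["ReceiptHandle"]},
        lambda entries: client.delete_message_batch(QueueUrl=queue_url, Entries=entries),
    )


def refresh_ack_timeout_sketch(client: Any, queue_url: str, messages: Iterable[Message],
                               timeout: int = 300) -> List[Message]:
    """Reset visibility to `timeout` seconds from now; return messages SQS failed to update."""
    return _in_batches(
        messages,
        lambda id_, m: {"Id": id_, "ReceiptHandle": m["ReceiptHandle"],
                        "VisibilityTimeout": timeout},
        lambda entries: client.change_message_visibility_batch(QueueUrl=queue_url,
                                                               Entries=entries),
    )
```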
- schrodinger.stepper.cloud.aws_sqs.upload(fname, topic_name)
Upload a file to the requested topic, encoding its contents into messages that each contain up to _MSG_PARCEL_SIZE lines. The messages are published in batches of _AWS_MAX_MESSAGES or _PUBLISH_MAX_MESSAGES, whichever is lower.
- Parameters
fname (str) – path to file to upload
topic_name (str) – name of topic to upload to
- Returns
number of messages published
- Return type
int
- schrodinger.stepper.cloud.aws_sqs.multi_file_upload(files, topic_name)
Upload multiple files as messages to topic_name.
- Parameters
files (List[str]) – List of file paths to upload
topic_name (str) – name of topic to upload to
- Returns
number of messages published
- Return type
int
- schrodinger.stepper.cloud.aws_sqs.download(topic_name, fname)
Download all messages in the given topic into a file. The messages are pulled in batches of _PULL_MAX_MESSAGES, and _RECEIVE_TIMEOUT determines how long to wait before assuming the topic is exhausted. Each message is written out as a new line.
- Parameters
topic_name (str) – name of topic to pull messages from
fname (str) – path to file to write messages out to
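The download loop can be sketched independently of AWS by passing in the pull step. pull_batch is a hypothetical callable assumed to block for up to the receive timeout and return a (possibly empty) list of message dicts with a "Body" key, as receive_message does; an empty result is treated as the topic being exhausted. Whether downloaded messages are also acknowledged is not described above, so the sketch leaves that out.

```python
from typing import Callable, Dict, List


def download_sketch(pull_batch: Callable[[], List[Dict[str, str]]], fname: str) -> int:
    """Write every pulled message body to fname, one message per line; return the count."""
    count = 0
    with open(fname, "w") as fh:
        while True:
            msgs = pull_batch()  # assumed to wait up to the receive timeout
            if not msgs:
                # Nothing arrived within the timeout: treat the topic as exhausted.
                return count
            for msg in msgs:
                fh.write(msg["Body"] + "\n")
                count += 1
```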
- schrodinger.stepper.cloud.aws_sqs.pull_and_process(input_topic, output_topic, settings_file, step_path, workflow_id, batch_size)
Pull a batch of batch_size entries from input_topic, run the step task as a subprocess, and upload the results to output_topic.
- Parameters
input_topic (str) – input topic name
output_topic (str) – output topic name
settings_file (str) – path to the stepper settings YAML file
step_path (str) – complete path to step to execute in a subprocess call
workflow_id (str) – step workflow ID
batch_size (int) – number of data values / entries to process in a batch
- Returns
(messages pulled / processed, messages uploaded)
- Return type
(int, int)
- schrodinger.stepper.cloud.aws_sqs.print_statistics(msgs_processed, msgs_uploaded, total_msgs_processed, total_msgs_uploaded, input_count, output_count, total_input_count, total_output_count)
- schrodinger.stepper.cloud.aws_sqs.print_total_statistics(total_msgs_processed, total_msgs_uploaded, total_input_count, total_output_count)
- schrodinger.stepper.cloud.aws_sqs.batch_and_republish(msg, input_topic, batch_size)
Break a single message body into parcels of batch_size entries and republish them as new messages to input_topic.
- Parameters
msg (dict) – single AWS message dict to republish in smaller chunks.
input_topic (str) – input topic name
batch_size (int) – number of entries to repackage in a single message.
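A sketch of the split-and-republish step, assuming newline-joined entries in the message "Body". publish is a hypothetical callable standing in for aws_sqs.publish bound to input_topic; whether the original message is acknowledged afterwards is not stated above, so that is left to the caller.

```python
from typing import Callable, Dict, List


def split_and_republish(msg: Dict[str, str], batch_size: int,
                        publish: Callable[[str], bool]) -> List[str]:
    """Split msg's body into parcels of batch_size entries and publish each one."""
    if batch_size < 1:
        raise ValueError("batch_size must be positive")
    entries = msg["Body"].split("\n")
    # Consecutive slices; the final parcel may hold fewer than batch_size entries.
    bodies = ["\n".join(entries[i:i + batch_size])
              for i in range(0, len(entries), batch_size)]
    for body in bodies:
        publish(body)
    return bodies
```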
- schrodinger.stepper.cloud.aws_sqs.process_msgs(msgs, input_topic, output_topic, settings_file, step_path, workflow_id)
Process the requested msgs by writing them to disk as inputs for step_path. Only the acknowledgement deadline of msgs is refreshed; the msgs themselves remain unacknowledged. Returns the output file generated by the step.
- Parameters
msgs – AWS message dicts to process
input_topic (str) – input topic name
output_topic (str) – output topic name
settings_file (str) – path to the stepper settings YAML file
step_path (str) – complete path to step to execute in a subprocess call
workflow_id (str) – step workflow ID
- Returns
the output file name from the step workflow.
- Return type
str
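process_msgs keeps its messages locked by refreshing their acknowledgement deadline while the step runs. A common way to do that is a heartbeat thread, sketched below with the standard library only. run_with_heartbeat and the refresh callable (for example a closure over refresh_ack_timeout) are hypothetical, and the interval should stay comfortably below the visibility timeout, for example half of it.

```python
import subprocess
import threading
from typing import Callable, List


def run_with_heartbeat(cmd: List[str], refresh: Callable[[], object], interval: float) -> int:
    """Run cmd, calling refresh() every `interval` seconds until it exits; return its exit code."""
    stop = threading.Event()

    def beat() -> None:
        # Event.wait returns False on timeout (time to refresh) and True once stop is set.
        while not stop.wait(interval):
            refresh()

    thread = threading.Thread(target=beat, daemon=True)
    thread.start()
    try:
        return subprocess.run(cmd, check=False).returncode
    finally:
        # Stop refreshing as soon as the step finishes, whether or not it succeeded.
        stop.set()
        thread.join()
```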