Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Deduplicate message batches in SQS Poller #2918

Merged
merged 11 commits into from
Sep 29, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions gems/aws-sdk-sqs/CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
Unreleased Changes
------------------

* Feature - Update QueuePoller to remove duplicate messages from a batch before yielding. Fixes the bug reported in #2913.

1.63.0 (2023-09-27)
------------------

Expand Down
69 changes: 34 additions & 35 deletions gems/aws-sdk-sqs/lib/aws-sdk-sqs/queue_poller.rb
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@

module Aws
module SQS

# A utility class for long polling messages in a loop. **Messages are
# automatically deleted from the queue at the end of the given block.**
#
Expand Down Expand Up @@ -203,7 +202,6 @@ module SQS
# ```
#
class QueuePoller

# @param [String] queue_url
# @option options [Client] :client
# @option (see #poll)
Expand Down Expand Up @@ -333,7 +331,7 @@ def poll(options = {}, &block)
loop do
messages = get_messages(config, stats)
if messages.empty?
check_idle_timeout(config, stats, messages)
check_idle_timeout(config, stats)
else
process_messages(config, stats, messages, &block)
end
Expand All @@ -348,21 +346,21 @@ def poll(options = {}, &block)
# `#receipt_handle`.
# @param [Integer] seconds
def change_message_visibility_timeout(message, seconds)
@client.change_message_visibility({
@client.change_message_visibility(
queue_url: @queue_url,
receipt_handle: message.receipt_handle,
visibility_timeout: seconds,
})
visibility_timeout: seconds
)
end

# @note This method should be called from inside a {#poll} block.
# @param [#receipt_handle] message An object that responds to
# `#receipt_handle`.
def delete_message(message)
@client.delete_message({
@client.delete_message(
queue_url: @queue_url,
receipt_handle: message.receipt_handle,
})
receipt_handle: message.receipt_handle
)
end

# @note This method should be called from inside a {#poll} block.
Expand All @@ -372,16 +370,16 @@ def delete_message(message)
def delete_messages(messages)
@client.delete_message_batch(
queue_url: @queue_url,
entries: messages.map { |msg|
entries: messages.map do |msg|
{ id: msg.message_id, receipt_handle: msg.receipt_handle }
}
end
)
end

private

def get_messages(config, stats)
config.before_request.call(stats) if config.before_request
config.before_request&.call(stats)
messages = send_request(config).messages
stats.request_count += 1
messages
Expand All @@ -392,17 +390,22 @@ def send_request(config)
@client.receive_message(params)
end

def check_idle_timeout(config, stats, messages)
if config.idle_timeout
since = stats.last_message_received_at || stats.polling_started_at
idle_time = Time.now - since
throw :stop_polling if idle_time > config.idle_timeout
end
def check_idle_timeout(config, stats)
return unless config.idle_timeout

since = stats.last_message_received_at || stats.polling_started_at
idle_time = Time.now - since
throw :stop_polling if idle_time > config.idle_timeout
end

def process_messages(config, stats, messages, &block)
stats.received_message_count += messages.count
stats.last_message_received_at = Time.now

# Duplicated messages share a message_id but carry different receipt
# handles. Keep only the last occurrence of each message_id (original
# order preserved) so the batch delete uses the most recent receipt handle.
messages = messages.reverse.uniq(&:message_id).reverse!
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nice!

catch(:skip_delete) do
yield_messages(config, messages, stats, &block)
delete_messages(messages) unless config.skip_delete
Expand All @@ -421,7 +424,6 @@ def yield_messages(config, messages, stats, &block)

# Statistics tracked client-side by the {QueuePoller}.
class PollerStats

def initialize
@request_count = 0
@received_message_count = 0
Expand All @@ -444,27 +446,25 @@ def initialize

# @return [Time,nil]
attr_accessor :polling_stopped_at

end

# A read-only set of configuration used by the QueuePoller.
class PollerConfig

# @api private
CONFIG_OPTIONS = Set.new([
:idle_timeout,
:skip_delete,
:before_request,
])
CONFIG_OPTIONS = Set.new %i[
idle_timeout
skip_delete
before_request
]

# @api private
PARAM_OPTIONS = Set.new([
:wait_time_seconds,
:max_number_of_messages,
:visibility_timeout,
:attribute_names,
:message_attribute_names,
])
PARAM_OPTIONS = Set.new %i[
wait_time_seconds
max_number_of_messages
visibility_timeout
attribute_names
message_attribute_names
]

# @return [Integer,nil]
attr_reader :idle_timeout
Expand All @@ -487,7 +487,7 @@ def initialize(options)
max_number_of_messages: 1,
visibility_timeout: nil,
attribute_names: ['All'],
message_attribute_names: ['All'],
message_attribute_names: ['All']
}
options.each do |opt_name, value|
if CONFIG_OPTIONS.include?(opt_name)
Expand Down Expand Up @@ -522,7 +522,6 @@ def to_h
PARAM_OPTIONS.each { |key| hash[key] = @request_params[key] }
hash
end

end
end
end
Expand Down
Loading