Add support for selectively choosing deduplication keys.
Keito Kira committed Sep 25, 2023
1 parent 32e1273 commit cadd4a7
Showing 8 changed files with 117 additions and 20 deletions.
2 changes: 2 additions & 0 deletions CHANGELOG.md
@@ -1,6 +1,8 @@
Unreleased Changes
------------------

* Feature - Add support for selectively choosing deduplication keys.

3.8.0 (2023-06-02)
------------------

30 changes: 30 additions & 0 deletions README.md
@@ -429,6 +429,36 @@ When using FIFO queues, jobs will NOT be processed concurrently by the poller
to ensure the correct ordering. Additionally, all jobs on a FIFO queue will be queued
synchronously, even if you have configured the `amazon_sqs_async` adapter.

#### Message Deduplication ID

FIFO queues support [Message deduplication ID](https://docs.aws.amazon.com/AWSSimpleQueueService/latest/SQSDeveloperGuide/using-messagededuplicationid-property.html), which is the token used for deduplication of sent messages.
If a message with a particular message deduplication ID is sent successfully, any messages sent with the same message deduplication ID are accepted successfully but aren't delivered during the 5-minute deduplication interval.
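
As a rough illustration of the interval described above, the following sketch models the deduplication window in memory. `DedupWindowSketch` and its methods are hypothetical names, not part of SQS or this gem, and the exact boundary behavior of the real service is an assumption here.

```ruby
# Hypothetical in-memory model of a FIFO queue's deduplication window.
class DedupWindowSketch
  WINDOW_SECONDS = 5 * 60 # the 5-minute deduplication interval

  attr_reader :delivered

  def initialize
    @seen = {}      # deduplication ID => time it was first accepted
    @delivered = [] # bodies that would actually reach a consumer
  end

  # Always "accepts" the message (returns true), but only delivers it when
  # no message with the same ID was delivered inside the window.
  def send_message(body, dedup_id, now)
    first_seen = @seen[dedup_id]
    duplicate = first_seen && (now - first_seen) < WINDOW_SECONDS
    unless duplicate
      @seen[dedup_id] = now
      @delivered << body
    end
    true
  end
end

queue = DedupWindowSketch.new
t0 = Time.at(0)
queue.send_message('first', 'abc', t0)
queue.send_message('duplicate', 'abc', t0 + 60) # inside the window: not delivered
queue.send_message('later', 'abc', t0 + 301)    # window has passed: delivered
p queue.delivered
```

Every call reports success, yet only the first and third bodies end up in `delivered`, which mirrors the "accepted but not delivered" wording.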

##### Customize Deduplication Keys

If necessary, the deduplication keys used to create the message deduplication ID can be customized:

```ruby
Aws::Rails::SqsActiveJob.configure do |config|
  config.deduplication_keys = [:job_class, :queue_name, :arguments]
end
# Or to set deduplication keys for a single job:
class YourJob < ApplicationJob
  include Aws::Rails::SqsActiveJob
  deduplicate_with :job_class, :queue_name, :arguments
  #...
end
```

By default, the following keys are used for deduplication:

```
job_class, provider_job_id, queue_name, priority, arguments, executions, exception_executions, locale, timezone, enqueued_at
```

Note that `job_id` is NOT included in deduplication keys because it is unique for each initialization of the job, and the run-once behavior must be guaranteed for ActiveJob retries.
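
To see why the choice of keys matters, here is a minimal sketch, assuming the ID is a SHA-256 digest over the selected fields of the serialized job (as the adapter change in this commit does). `dedup_id_sketch` is a hypothetical helper, `JSON.generate` stands in for `Aws::Json.dump`, and the job hashes are made-up sample data.

```ruby
require 'digest'
require 'json'

# Hypothetical helper, not part of the gem: digest only the chosen fields.
# JSON.generate is an assumed stand-in for Aws::Json.dump.
def dedup_id_sketch(body, keys)
  selected = body.select { |key, _value| keys.include?(key) }
  Digest::SHA256.hexdigest(JSON.generate(selected))
end

# Two enqueues of the same work: job_id and enqueued_at differ.
first  = { 'job_class' => 'YourJob', 'job_id' => 'id-1', 'queue_name' => 'default',
           'arguments' => [42], 'enqueued_at' => '2023-09-25T00:00:00Z' }
second = first.merge('job_id' => 'id-2', 'enqueued_at' => '2023-09-25T00:00:30Z')

narrow_keys = %w[job_class queue_name arguments]
wide_keys   = narrow_keys + %w[enqueued_at]

same_with_narrow = dedup_id_sketch(first, narrow_keys) == dedup_id_sketch(second, narrow_keys)
same_with_wide   = dedup_id_sketch(first, wide_keys) == dedup_id_sketch(second, wide_keys)

puts "narrow keys collide: #{same_with_narrow}"
puts "wide keys collide:   #{same_with_wide}"
```

With the narrow key set the two messages share an ID, so the second would be treated as a duplicate. Once `enqueued_at` is included, the IDs differ and both messages would be delivered.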

#### Message Group IDs

FIFO queues require a message group id to be provided for the job. It is determined by:
24 changes: 20 additions & 4 deletions lib/active_job/queue_adapters/amazon_sqs_adapter.rb
@@ -27,11 +27,8 @@ def _enqueue(job, body = nil, send_message_opts = {})
send_message_opts[:message_attributes] = message_attributes(job)

if Aws::Rails::SqsActiveJob.fifo?(queue_url)
- # job_id is unique per initialization of job
- # Remove it from message dup id to ensure run-once behavior
- # with ActiveJob retries
send_message_opts[:message_deduplication_id] =
- Digest::SHA256.hexdigest(Aws::Json.dump(body.except('job_id')))
+ Digest::SHA256.hexdigest(Aws::Json.dump(deduplication_body(job, body)))

message_group_id = job.message_group_id if job.respond_to?(:message_group_id)
message_group_id ||= Aws::Rails::SqsActiveJob.config.message_group_id
@@ -54,6 +51,25 @@ def message_attributes(job)
}
}
end

def deduplication_body(job, body)
  dedup_keys = job.deduplication_keys if job.respond_to?(:deduplication_keys)
  dedup_keys ||= Aws::Rails::SqsActiveJob.config.deduplication_keys.map(&:to_s)

  ignored_dedup_key = 'job_id'

  if dedup_keys.include?(ignored_dedup_key)
    dedup_keys.delete(ignored_dedup_key)
    Rails.logger.warn <<~WARNING
      job_id cannot be used as a key for deduplication.
      It is ignored to ensure run-once behavior of ActiveJob retries.
    WARNING
  end

  dedup_keys = body.except(ignored_dedup_key).keys if dedup_keys.blank?

  body.slice(*dedup_keys)
end
end

# create an alias to allow `:amazon` to be used as the adapter name
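
The key-resolution order in `deduplication_body` (per-job keys first, then the global configuration, then every field except `job_id`) can be sketched in plain Ruby without Rails. `select_dedup_fields` and its keyword arguments are hypothetical names for illustration. Unlike the method in the diff, this sketch uses non-mutating array subtraction and omits the warning log.

```ruby
# Hypothetical stand-in for the adapter's key-selection logic.
# job_keys:    keys declared on the job class, or nil if none were declared
# config_keys: keys from the global configuration (may be symbols)
def select_dedup_fields(body, job_keys: nil, config_keys: [])
  keys = (job_keys || config_keys).map(&:to_s)
  keys -= ['job_id']                           # job_id is never part of the digest input
  keys = body.keys - ['job_id'] if keys.empty? # no keys chosen: use all other fields
  body.slice(*keys)
end

body = {
  'job_class' => 'YourJob', 'job_id' => 'abc',
  'queue_name' => 'default', 'arguments' => [1]
}

p select_dedup_fields(body, job_keys: %w[job_class job_id])
p select_dedup_fields(body, config_keys: [:queue_name])
p select_dedup_fields(body)
```

The first call drops `job_id` even though it was requested, the second falls back to the configured keys, and the third returns every field except `job_id`.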
1 change: 1 addition & 0 deletions lib/aws-sdk-rails.rb
@@ -5,6 +5,7 @@
require_relative 'aws/rails/railtie'
require_relative 'aws/rails/notifications'
require_relative 'aws/rails/sqs_active_job/configuration'
require_relative 'aws/rails/sqs_active_job/deduplication'
require_relative 'aws/rails/sqs_active_job/executor'
require_relative 'aws/rails/sqs_active_job/job_runner'
require_relative 'aws/rails/sqs_active_job/lambda_handler'
19 changes: 13 additions & 6 deletions lib/aws/rails/sqs_active_job/configuration.rb
@@ -29,13 +29,15 @@ class Configuration
shutdown_timeout: 15,
queues: {},
logger: ::Rails.logger,
- message_group_id: 'SqsActiveJobGroup'
+ message_group_id: 'SqsActiveJobGroup',
+ deduplication_keys: []
}

# @api private
attr_accessor :queues, :max_messages, :visibility_timeout,
:shutdown_timeout, :client, :logger,
- :async_queue_error_handler, :message_group_id
+ :async_queue_error_handler, :message_group_id,
+ :deduplication_keys

# Don't use this method directly: Configuration is a singleton class, use
# +Aws::Rails::SqsActiveJob.config+ to access the singleton config.
@@ -67,7 +69,7 @@ class Configuration
# for the poller.
#
# @option options [String] :config_file
- # Override file to load configuration from. If not specified will
+ # Override file to load configuration from. If not specified will
# attempt to load from config/aws_sqs_active_job.yml.
#
# @option options [String] :message_group_id (SqsActiveJobGroup)
@@ -81,13 +83,18 @@ class Configuration
# +active_job.queue_adapter = :amazon_sqs_async+. Called with:
# [error, job, job_options]
#
- # @option options [SQS::Client] :client SQS Client to use. A default
+ # @option options [SQS::Client] :client SQS Client to use. A default
# client will be created if none is provided.
#
# @option options [Array] :deduplication_keys ([])
# Keys for deduplication of FIFO queues.
# The type of keys stored in the array should be String or Symbol.

def initialize(options = {})
options[:config_file] ||= config_file if config_file.exist?
options = DEFAULTS
- .merge(file_options(options))
- .merge(options)
+ .merge(file_options(options))
+ .merge(options)
set_attributes(options)
end

19 changes: 19 additions & 0 deletions lib/aws/rails/sqs_active_job/deduplication.rb
@@ -0,0 +1,19 @@
# frozen_string_literal: true

module Aws
  module Rails
    module SqsActiveJob
      extend ActiveSupport::Concern

      included do
        class_attribute :deduplication_keys
      end

      module ClassMethods
        def deduplicate_with(*keys)
          self.deduplication_keys = keys.map(&:to_s)
        end
      end
    end
  end
end
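
For readers unfamiliar with `ActiveSupport::Concern` and `class_attribute`, the following is a rough plain-Ruby analogue of what the new file provides: a class-level `deduplicate_with` macro, inheritance of the keys by subclasses, and an instance-level reader. All names ending in `Sketch` are hypothetical, and this is only an approximation of `class_attribute` semantics, not the gem's implementation.

```ruby
# Hypothetical plain-Ruby analogue of the concern (no ActiveSupport).
module DeduplicationSketch
  def self.included(base)
    base.extend(ClassMethods)
  end

  module ClassMethods
    # Store the keys as strings on the class that declares them.
    def deduplicate_with(*keys)
      @deduplication_keys = keys.map(&:to_s)
    end

    # Fall back to the superclass so subclasses inherit the keys.
    def deduplication_keys
      return @deduplication_keys if defined?(@deduplication_keys)

      superclass.deduplication_keys if superclass.respond_to?(:deduplication_keys)
    end
  end

  # Instance-level reader, since the adapter asks the job instance for its keys.
  def deduplication_keys
    self.class.deduplication_keys
  end
end

class BaseJobSketch
  include DeduplicationSketch
  deduplicate_with :job_class, :queue_name
end

class ChildJobSketch < BaseJobSketch; end

class OverridingJobSketch < BaseJobSketch
  deduplicate_with :arguments
end

p ChildJobSketch.new.deduplication_keys
p OverridingJobSketch.new.deduplication_keys
p BaseJobSketch.new.deduplication_keys
```

A subclass that declares nothing inherits its parent's keys, while one that calls `deduplicate_with` again overrides them without affecting the parent.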
37 changes: 27 additions & 10 deletions test/active_job/queue_adapters/amazon_sqs_adapter_test.rb
@@ -29,20 +29,37 @@ module QueueAdapters
end

it 'adds message_deduplication_id and default message_group_id if job does not override it' do
- expect(client).to receive(:send_message)
-   .with(
-     {
-       queue_url: 'https://queue-url.fifo',
-       message_body: instance_of(String),
-       message_attributes: instance_of(Hash),
-       message_group_id: Aws::Rails::SqsActiveJob.config.message_group_id,
-       message_deduplication_id: instance_of(String)
-     }
-   )
+ expect(client).to receive(:send_message).with(
+   {
+     queue_url: 'https://queue-url.fifo',
+     message_body: instance_of(String),
+     message_attributes: instance_of(Hash),
+     message_group_id: Aws::Rails::SqsActiveJob.config.message_group_id,
+     message_deduplication_id: instance_of(String)
+   }
+ )

TestJob.perform_later('test')
sleep(0.2)
end

describe 'when job has deduplication keys defined' do
it 'adds customized message_deduplication_id' do
expect(client).to receive(:send_message).with(
{
queue_url: 'https://queue-url.fifo',
message_body: instance_of(String),
message_attributes: instance_of(Hash),
message_group_id: Aws::Rails::SqsActiveJob.config.message_group_id,
message_deduplication_id: instance_of(String)
}
)

TestJobWithDeduplicationKeys.perform_later('test')
sleep(0.2)
end
end

describe 'when job has #message_group_id defined' do
it 'adds message_deduplication_id and default message_group_id if job does not return a value' do
expect(client).to receive(:send_message).with(
5 changes: 5 additions & 0 deletions test/aws/rails/sqs_active_job/test_job.rb
@@ -9,3 +9,8 @@ def perform(a1, a2)
class TestJobWithMessageGroupID < TestJob
def message_group_id; end
end

class TestJobWithDeduplicationKeys < TestJob
  include Aws::Rails::SqsActiveJob
  deduplicate_with :job_class, :queue_name
end
