Skip to content
This repository has been archived by the owner on Apr 5, 2023. It is now read-only.

Commit

Permalink
Merge pull request #38 from pipefy/ms-main/bugfix/handle-message-attr…
Browse files Browse the repository at this point in the history
…ibute-from-payload

Proper handling of message according to Raw Message Delivery setting on AWS SNS/SQS
  • Loading branch information
mhshiba authored Aug 12, 2022
2 parents 2742a3e + 84ad309 commit 19912fd
Show file tree
Hide file tree
Showing 5 changed files with 125 additions and 42 deletions.
2 changes: 1 addition & 1 deletion docker-compose.yml
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ services:

localstack:
container_name: localstack
image: localstack/localstack-light:0.14.3
image: localstack/localstack-light:1.0.4
environment:
AWS_DEFAULT_REGION: us-east-1
LOCALSTACK_SERVICES: sns,sqs
Expand Down
2 changes: 1 addition & 1 deletion lib/pipefy_message/consumer.rb
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,7 @@ def process_message
}, context, correlation_id, event_id))

retry_count = metadata["ApproximateReceiveCount"].to_i - 1
obj.perform(payload["Message"],
obj.perform(payload,
{ retry_count: retry_count, context: context, correlation_id: correlation_id })

elapsed_time_ms = Process.clock_gettime(Process::CLOCK_MONOTONIC, :millisecond) - start
Expand Down
2 changes: 1 addition & 1 deletion lib/pipefy_message/providers/aws_client/sns_broker.rb
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ def initialize(_opts = {})

@topic_arn_prefix = ENV.fetch("AWS_SNS_ARN_PREFIX", "arn:aws:sns:us-east-1:000000000000:")
@is_staging = ENV["ASYNC_APP_ENV"] == "staging"
rescue StandardError
rescue StandardError => e
raise PipefyMessage::Providers::Errors::ResourceError, e.message
end

Expand Down
56 changes: 32 additions & 24 deletions lib/pipefy_message/providers/aws_client/sqs_broker.rb
Original file line number Diff line number Diff line change
Expand Up @@ -43,10 +43,7 @@ def poller

@poller.poll({ wait_time_seconds: @config[:wait_time_seconds],
message_attribute_names: ["All"], attribute_names: ["All"] }) do |received_message|
payload = JSON.parse(received_message.body)
original_metadata = received_message.message_attributes.merge(received_message.attributes)

metadata = transform_metadata(original_metadata)
metadata, payload = extract_metadata_and_payload(received_message)
context = metadata[:context]
correlation_id = metadata[:correlationId]
event_id = metadata[:eventId]
Expand All @@ -60,7 +57,6 @@ def poller
message_text: "Message received by SQS poller"
}, context, correlation_id, event_id))
)

yield(payload, metadata)
rescue StandardError => e
# error in the routine, skip delete to try the message again later with 30sec of delay
Expand Down Expand Up @@ -110,28 +106,40 @@ def handle_queue_protocol(queue_url)

##
# Extracts metadata value according to its type
def extract_metadata_value(metadata, key)
return nil unless metadata.key? key

case metadata[key].data_type
when "String"
metadata[key].string_value
when "Binary"
metadata[key].binary_value
end
def extract_metadata_value(metadata, key, body_message_attribute_field)
value_from_metadata = if !metadata.empty? && metadata.key?(key)
case metadata[key].data_type
when "String"
metadata[key].string_value
when "Binary"
metadata[key].binary_value
end
end

value_from_metadata.nil? ? body_message_attribute_field.dig(key, "Value") : value_from_metadata
end

##
# Transform metadata to a simple hash
def transform_metadata(metadata)
context = extract_metadata_value(metadata, "context")
correlation_id = extract_metadata_value(metadata, "correlationId")
event_id = extract_metadata_value(metadata, "eventId")
{
context: context,
correlationId: correlation_id,
eventId: event_id
}
# Transform metadata and payload to a simple hash
# Also handle differences if `Enable raw message delivery` SQS setting is on/off
def extract_metadata_and_payload(received_message)
original_metadata = received_message.message_attributes.merge(received_message.attributes)
body_as_json = JSON.parse(received_message.body)

body_message_attribute = body_as_json["MessageAttributes"] || {}
context = extract_metadata_value(original_metadata, "context", body_message_attribute)
correlation_id = extract_metadata_value(original_metadata, "correlationId", body_message_attribute)
event_id = extract_metadata_value(original_metadata, "eventId", body_message_attribute)
payload = body_as_json["Message"] || received_message.body

[
{
context: context,
correlationId: correlation_id,
eventId: event_id
},
payload
]
end
end
end
Expand Down
105 changes: 90 additions & 15 deletions spec/providers/aws/sqs_broker_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -57,29 +57,104 @@
end

describe "#poller" do
it "should consume a message" do
mocked_message = { message_id: "44c44782-fee1-6784-d614-43b73c0bda8d",
receipt_handle: "2312dasdas1231221312321adsads",
body: "{\"Message\": {\"foo\": \"bar\"}}" }

let(:eventId) { SecureRandom.hex }
let(:correlationId) { SecureRandom.hex }
let(:context) { SecureRandom.hex }
let(:expected_message_result) { { "default" => { "foo" => "bar" } }.to_json }
let(:expected_metadata_result) do
{
context: context,
correlationId: correlationId,
eventId: eventId
}
end
let(:mocked_element) { Aws::SQS::Types::Message.new(mocked_message) }
let(:mocked_list) do
mocked_list = Aws::Xml::DefaultList.new
mocked_list.append(mocked_element)
mocked_list
end
let(:mocked_poller) do
mocked_poller = Aws::SQS::QueuePoller.new("http://localhost:4566/000000000000/my_queue",
{ skip_delete: true })
mocked_poller.before_request { |stats| throw :stop_polling if stats.received_message_count > 0 }

mocked_element = Aws::SQS::Types::Message.new(mocked_message)
mocked_list = Aws::Xml::DefaultList.new
mocked_list.append(mocked_element)
mocked_poller.client.stub_responses(:receive_message, messages: mocked_list)

mocked_poller
end
let(:worker) do
worker = described_class.new
worker.instance_variable_set(:@poller, mocked_poller)
worker
end

context "Raw Message Delivery disabled" do
let(:body_json) do
# rubocop:disable Layout/HeredocIndentation
<<~EOS_BODY
{
\"Type\" : \"Notification\",
\"MessageId\" : \"6c7057f5-0d43-54ad-a502-0c9a4b6cdcf1\",
\"TopicArn\" : \"arn:aws:sns:us-east-1:038527119583:core-card-field-value-updated-topic\",
\"Message\" : \"{\\\"default\\\":{\\\"foo\\\":\\\"bar\\\"}}\",
\"Timestamp\" : \"2022-08-11T18:01:19.875Z\",
\"SignatureVersion\" : \"1\",
\"Signature\" : \"GrOeiHuqVV9eB+RAWZ2XYe2ko/KXxnxVhQ/sW8zV3ybgO0UD6BI32XL/mw4r562msXpG0BZc7dFbJG6XPVcQ7YZWnVKU7c34nS9NyTimMTz5Df/raKCdVkxigMhbS45CPMC//u7Sz9fDb/MXTrInnuSVPY14/QwEwXqyV45M+lTzLoBJSM05UX0eo1MOQxRQ8IYgPay5z6BSSHq4B6/59U88PMv4VJLNaWIb8dTiO1ixK9Nz7Xk/dqqC/bI6A+VLUNhVSitDfkDaPPoSG5qFnBPRzpcQhznANkjecW6MSWtCf0R8BuSqAYNxoCzDcC5xOf3zJOccfUTwvxz5f5jwfg==\",
\"SigningCertURL\" : \"https://sns.us-east-1.amazonaws.com/SimpleNotificationService-56e67fcb41f6fec09b0196692625d385.pem\",
\"UnsubscribeURL\" : \"https://sns.us-east-1.amazonaws.com/?Action=Unsubscribe&SubscriptionArn=arn:aws:sns:us-east-1:038527119583:core-card-field-value-updated-topic:995474b4-3a57-4c94-abdf-3ba50244723d\",
\"MessageAttributes\" : {
\"eventId\" : {\"Type\":\"String\",\"Value\":\"#{eventId}\"},
\"context\" : {\"Type\":\"String\",\"Value\":\"#{context}\"},
\"correlationId\" : {\"Type\":\"String\",\"Value\":\"#{correlationId}\"}
}
}
EOS_BODY
# rubocop:enable Layout/HeredocIndentation
end
let(:mocked_message) do
{
message_id: "44c44782-fee1-6784-d614-43b73c0bda8d",
receipt_handle: "2312dasdas1231221312321adsads",
body: body_json
}
end

it "should consume a message " do
worker.poller do |message, metadata|
expect(message).to eq expected_message_result
expect(metadata).to eq expected_metadata_result
end
end
end

context "Raw Message Delivery enabled" do
let(:mocked_message) do
{
message_id: "44c44782-fee1-6784-d614-43b73c0bda8d",
receipt_handle: "2312dasdas1231221312321adsads",
body: "{\"default\":{\"foo\":\"bar\"}}",
message_attributes: {
"context" => {
data_type: "String",
string_value: context
},
"correlationId" => {
data_type: "String",
string_value: correlationId
},
"eventId" => {
data_type: "String",
string_value: eventId
}
}
}
end

result = nil
expected_result = { "Message" => { "foo" => "bar" } }
worker.poller do |message|
result = message
it "should consume a message " do
worker.poller do |message, metadata|
expect(message).to eq expected_message_result
expect(metadata).to eq expected_metadata_result
end
end
expect(result).to eq expected_result
end
end

Expand Down

0 comments on commit 19912fd

Please sign in to comment.