Skip to content
This repository has been archived by the owner on Apr 5, 2023. It is now read-only.

Commit

Permalink
Merge pull request #35 from mhshiba/ms-main/feature/convert-metadata
Browse files Browse the repository at this point in the history
✨ Convert metadata before sending it to consumer
  • Loading branch information
mhshiba authored Aug 9, 2022
2 parents ff7a429 + d8a41af commit 2742a3e
Show file tree
Hide file tree
Showing 2 changed files with 34 additions and 7 deletions.
6 changes: 3 additions & 3 deletions lib/pipefy_message/consumer.rb
Original file line number Diff line number Diff line change
Expand Up @@ -73,9 +73,9 @@ def process_message
build_consumer_instance.poller do |payload, metadata|
start = Process.clock_gettime(Process::CLOCK_MONOTONIC, :millisecond)

context = metadata["context"]
correlation_id = metadata["correlationId"]
event_id = metadata["eventId"]
context = metadata[:context]
correlation_id = metadata[:correlationId]
event_id = metadata[:eventId]

logger.info(log_context({
message_text: "Message received by poller to be processed by consumer",
Expand Down
35 changes: 31 additions & 4 deletions lib/pipefy_message/providers/aws_client/sqs_broker.rb
Original file line number Diff line number Diff line change
Expand Up @@ -44,11 +44,12 @@ def poller
@poller.poll({ wait_time_seconds: @config[:wait_time_seconds],
message_attribute_names: ["All"], attribute_names: ["All"] }) do |received_message|
payload = JSON.parse(received_message.body)
metadata = received_message.message_attributes.merge(received_message.attributes)
original_metadata = received_message.message_attributes.merge(received_message.attributes)

context = metadata["context"]
correlation_id = metadata["correlationId"]
event_id = metadata["eventId"]
metadata = transform_metadata(original_metadata)
context = metadata[:context]
correlation_id = metadata[:correlationId]
event_id = metadata[:eventId]
# We're extracting those again in the consumer
# process_message module. I considered whether these
# should perhaps be `yield`ed instead, but I guess
Expand Down Expand Up @@ -106,6 +107,32 @@ def handle_queue_name(queue_name)
def handle_queue_protocol(queue_url)
ENV["ASYNC_APP_ENV"] == "development" ? queue_url : queue_url.sub(%r{^http://}, "https://")
end

##
# Extracts metadata value according to its type
def extract_metadata_value(metadata, key)
return nil unless metadata.key? key

case metadata[key].data_type
when "String"
metadata[key].string_value
when "Binary"
metadata[key].binary_value
end
end

##
# Transform metadata to a simple hash
def transform_metadata(metadata)
context = extract_metadata_value(metadata, "context")
correlation_id = extract_metadata_value(metadata, "correlationId")
event_id = extract_metadata_value(metadata, "eventId")
{
context: context,
correlationId: correlation_id,
eventId: event_id
}
end
end
end
end
Expand Down

0 comments on commit 2742a3e

Please sign in to comment.