This repository has been archived by the owner on Apr 5, 2023. It is now read-only.

Add support for correlation and event IDs (#33)
* feat: Add cid to publisher publish method

* feat: Add cid support to consumer

* refact: Fix failing specs

* refact: Change cid to correlation_id for readability

* chore: Ignore length-related linter warnings due to logs

* feat: Add unique, broker-agnostic event_id to messages

* fix: Remove stray exception in sample file 😅

* refact: Move event_id assignment to abstract Publisher layer

* feat: Add context to logs; use method to log context, cid and event_id
ferreira-mev authored Aug 2, 2022
1 parent 6a33150 commit 89b680a
Showing 9 changed files with 169 additions and 66 deletions.
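
Taken together, these changes let callers attach a context and a correlation ID to published messages, while a unique event ID is generated automatically in the abstract Publisher layer. A minimal usage sketch (the topic name and context value are hypothetical, and the gem's top-level require is assumed to be "pipefy_message"):

require "securerandom"
require "pipefy_message"

publisher = PipefyMessage::Publisher.new   # defaults to the "aws" broker

# context and correlation_id are optional; when omitted they default to
# "NO_CONTEXT_PROVIDED" and "NO_CID_PROVIDED". An event_id (a fresh UUID)
# is always assigned before the message reaches the broker.
publisher.publish(
  { foo: "bar" },            # payload
  "pipefy-local-topic",      # topic name
  "orders-service",          # context (hypothetical value)
  SecureRandom.uuid          # correlation_id propagated from the caller
)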
7 changes: 7 additions & 0 deletions .rubocop.yml
@@ -19,9 +19,16 @@ Metrics/BlockLength:
Lint/MissingSuper:
Enabled: false

# Essentially, logs are making methods and blocks huge, not the
# logic itself. We thus decided to disable some linter checks
# related to number of lines.

Metrics/MethodLength:
Enabled: false

Metrics/BlockLength:
Enabled: false

Metrics/AbcSize:
Enabled: false

34 changes: 23 additions & 11 deletions lib/pipefy_message/consumer.rb
@@ -72,24 +72,36 @@ def process_message

build_consumer_instance.poller do |payload, metadata|
start = Process.clock_gettime(Process::CLOCK_MONOTONIC, :millisecond)
logger.info({
message_text: "Message received by poller to be processed by consumer",
received_message: payload,
metadata: metadata
})

context = metadata["context"]
correlation_id = metadata["correlationId"]
event_id = metadata["eventId"]

logger.info(log_context({
message_text: "Message received by poller to be processed by consumer",
received_message: payload,
metadata: metadata
}, context, correlation_id, event_id))

retry_count = metadata["ApproximateReceiveCount"].to_i - 1
obj.perform(payload["Message"], { retry_count: retry_count })

elapsed_time_ms = Process.clock_gettime(Process::CLOCK_MONOTONIC, :millisecond) - start
logger.info({
duration_ms: elapsed_time_ms,
message_text: "Message received by consumer poller, processed " \
"in #{elapsed_time_ms} milliseconds"
})
logger.info(log_context({
duration_ms: elapsed_time_ms,
message_text: "Message received by consumer poller, processed " \
"in #{elapsed_time_ms} milliseconds"
}, context, correlation_id, event_id))
end
rescue PipefyMessage::Providers::Errors::ResourceError => e
logger.error("Failed to process message, details #{e.inspect}")
context = "NO_CONTEXT_RETRIEVED" unless defined? context
correlation_id = "NO_CID_RETRIEVED" unless defined? correlation_id
event_id = "NO_EVENT_ID_RETRIEVED" unless defined? event_id
# this shows up in multiple places; OK or DRY up?

logger.error(log_context({
message_text: "Failed to process message; details: #{e.inspect}"
}, context, correlation_id, event_id))
raise e
end
end
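
For reference, ApproximateReceiveCount is a standard SQS attribute delivered as a string, and the first delivery already counts as 1, which is why the consumer subtracts one to obtain the retry count. A small sketch with a hypothetical value:

metadata = { "ApproximateReceiveCount" => "3" }             # third delivery of this message
retry_count = metadata["ApproximateReceiveCount"].to_i - 1  # => 2 retries so far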
12 changes: 12 additions & 0 deletions lib/pipefy_message/logging.rb
@@ -47,5 +47,17 @@ def self.logger_setup
def self.included(base)
base.extend(self)
end

##
# Adds context, correlation and event identifiers to logs. Should be
# called with the hash containing the remaining log fields as its first
# argument, e.g.: logger.info(log_context({ ... }, context, correlation_id, event_id)).
def log_context(log_hash, context, correlation_id, event_id)
{
context: context,
correlation_id: correlation_id,
event_id: event_id
}.merge(log_hash)
end
end
end
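
A quick illustration of what the new helper produces (all values hypothetical). Because the identifier hash is the receiver of merge, keys in log_hash take precedence should a name ever collide:

log_context({ message_text: "hello", duration_ms: 12 },
            "orders-service", "cid-123", "evt-456")
# => { context: "orders-service", correlation_id: "cid-123",
#      event_id: "evt-456", message_text: "hello", duration_ms: 12 }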
56 changes: 37 additions & 19 deletions lib/pipefy_message/providers/aws_client/sns_broker.rb
@@ -28,43 +28,61 @@ def initialize(_opts = {})
##
# Publishes a message with the given payload to the SNS topic
# with topic_name.
def publish(payload, topic_name, context = nil)
context = "NO_CONTEXT_PROVIDED" if context.nil?
def publish(payload, topic_name, context, correlation_id, event_id)
message = prepare_payload(payload)
topic_arn = @topic_arn_prefix + (@is_staging ? "#{topic_name}-staging" : topic_name)
topic = @sns.topic(topic_arn)

logger.info(
{ topic_arn: topic_arn,
payload: payload,
message_text: "Attempting to publish a json message to topic #{topic_arn}" }
)
logger.info(log_context(
{
topic_arn: topic_arn,
payload: payload,
message_text: "Attempting to publish a json message to topic #{topic_arn}"
},
context, correlation_id, event_id
))

result = topic.publish({ message: message.to_json, message_structure: " json ",
result = topic.publish({ message: message.to_json,
message_structure: " json ",
message_attributes: {
"correlationId" => {
data_type: "String",
string_value: SecureRandom.uuid.to_s
string_value: correlation_id
},
"eventId" => {
data_type: "String",
string_value: event_id
},
"context" => {
data_type: "String",
string_value: context
}
} })

logger.info(
{ topic_arn: topic_arn,
id: result.message_id,
message_text: "Message published with ID #{result.message_id}" }
)
logger.info(log_context(
{
topic_arn: topic_arn,
message_id: result.message_id,
message_text: "Message published"
},
context, correlation_id, event_id
))

result
rescue StandardError => e
logger.error(
{ topic_arn: topic_arn,
message_text: "Failed to publish message",
error_details: e.inspect }
)
context = "NO_CONTEXT_RETRIEVED" unless defined? context
correlation_id = "NO_CID_RETRIEVED" unless defined? correlation_id
event_id = "NO_EVENT_ID_RETRIEVED" unless defined? event_id
# this shows up in multiple places; OK or DRY up?

logger.error(log_context(
{
topic_arn: topic_arn,
message_text: "Failed to publish message",
error_details: e.inspect
},
context, correlation_id, event_id
))
end

private
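
As a side note on the surrounding code, the target topic ARN is built from the configured prefix and the topic name, with a "-staging" suffix appended when ASYNC_APP_ENV is set to staging. A sketch with a hypothetical prefix:

topic_arn_prefix = "arn:aws:sns:us-east-1:000000000000:"   # hypothetical @topic_arn_prefix
topic_name = "pipefy-local-topic"
is_staging = ENV["ASYNC_APP_ENV"] == "staging"

topic_arn = topic_arn_prefix + (is_staging ? "#{topic_name}-staging" : topic_name)
# => "arn:aws:sns:us-east-1:000000000000:pipefy-local-topic-staging" when staging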
45 changes: 38 additions & 7 deletions lib/pipefy_message/providers/aws_client/sqs_broker.rb
@@ -17,13 +17,18 @@ def initialize(opts = {})
AwsClient.aws_setup

@sqs = Aws::SQS::Client.new
logger.debug({ message_text: "SQS client created" })

@is_staging = ENV["ASYNC_APP_ENV"] == "staging"

@queue_url = handle_queue_protocol(@sqs.get_queue_url({ queue_name: handle_queue_name(@config[:queue_name]) })
.queue_url)

@poller = Aws::SQS::QueuePoller.new(@queue_url, { client: @sqs })

logger.debug({
queue_url: @queue_url,
message_text: "SQS client created"
})
rescue StandardError => e
raise PipefyMessage::Providers::Errors::ResourceError, e.message
end
@@ -32,18 +37,44 @@ def initialize(opts = {})
# Initiates SQS queue polling, with wait_time_seconds as given
# in the initial configuration.
def poller
logger.info(build_log_hash("Initiating SQS polling on queue #{@queue_url}"))
logger.info(merge_log_hash({
message_text: "Initiating SQS polling on queue #{@queue_url}"
}))

@poller.poll({ wait_time_seconds: @config[:wait_time_seconds],
message_attribute_names: ["All"], attribute_names: ["All"] }) do |received_message|
logger.debug(build_log_hash("Message received by SQS poller on queue #{@queue_url}"))

payload = JSON.parse(received_message.body)
metadata = received_message.message_attributes.merge(received_message.attributes)

context = metadata["context"]
correlation_id = metadata["correlationId"]
event_id = metadata["eventId"]
# We're extracting those again in the consumer
# process_message module. I considered whether these
# should perhaps be `yield`ed instead, but I guess
# this is not the bad kind of repetition.

logger.debug(
merge_log_hash(log_context({
message_text: "Message received by SQS poller"
}, context, correlation_id, event_id))
)

yield(payload, metadata)
rescue StandardError => e
# Error in the processing routine; skip the delete so the message is retried later, after roughly 30 seconds of delay
logger.error("Failed to process message, details #{e.inspect}")

context = "NO_CONTEXT_RETRIEVED" unless defined? context
correlation_id = "NO_CID_RETRIEVED" unless defined? correlation_id
event_id = "NO_EVENT_ID_RETRIEVED" unless defined? event_id
# this shows up in multiple places; OK or DRY up?

logger.error(
merge_log_hash(log_context({
queue_url: @queue_url,
message_text: "Failed to consume message; details: #{e.inspect}"
}, context, correlation_id, event_id))
)

throw e if e.instance_of?(NameError)

Expand All @@ -62,8 +93,8 @@ def default_options

##
# Adds the queue name to logs, if not already present.
def build_log_hash(arg)
{ queue_name: @config[:queue_name], message_text: arg }
def merge_log_hash(log_hash)
{ queue_name: @config[:queue_name] }.merge(log_hash)
end

##
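
Combining merge_log_hash with log_context yields a single structured entry carrying the queue name, the tracing identifiers, and the message text. A sketch with hypothetical values:

merge_log_hash(log_context({ message_text: "Message received by SQS poller" },
                           "orders-service", "cid-123", "evt-456"))
# => { queue_name: "my-queue",   # from @config[:queue_name]
#      context: "orders-service", correlation_id: "cid-123",
#      event_id: "evt-456",
#      message_text: "Message received by SQS poller" }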
8 changes: 6 additions & 2 deletions lib/pipefy_message/publisher.rb
@@ -14,8 +14,12 @@ def initialize(broker = "aws", broker_opts = {})
@publisher_instance = build_publisher_instance
end

def publish(message, topic, context = nil)
@publisher_instance.publish(message, topic, context)
def publish(message, topic, context = nil, correlation_id = nil)
context = "NO_CONTEXT_PROVIDED" if context.nil?
correlation_id = "NO_CID_PROVIDED" if correlation_id.nil?
event_id = SecureRandom.uuid.to_s

@publisher_instance.publish(message, topic, context, correlation_id, event_id)
end

private
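
When context and correlation_id are omitted, the Publisher fills in the placeholder strings and still generates an event_id, so the broker's publish method always receives all five arguments. A sketch of the resulting call (the UUID shown is illustrative):

publisher = PipefyMessage::Publisher.new
publisher.publish({ foo: "bar" }, "pipefy-local-topic")
# The underlying broker instance is then invoked as:
#   publish({ foo: "bar" }, "pipefy-local-topic",
#           "NO_CONTEXT_PROVIDED", "NO_CID_PROVIDED",
#           "15075c9d-7337-4f70-be02-2732aff2c2f7")   # event_id: a fresh SecureRandom.uuid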
1 change: 0 additions & 1 deletion lib/samples/my_awesome_consumer.rb
@@ -11,6 +11,5 @@ class MyAwesomeConsumer
def perform(message, metadata)
puts "Received message #{message} from broker - retry #{metadata[:retry_count]}"
## Fill with our logic here
raise StandardError
end
end
39 changes: 24 additions & 15 deletions spec/providers/aws/sns_broker_spec.rb
@@ -26,38 +26,47 @@
describe "#publish" do
include_context "AWS stub"

it "should return a message ID and a sequence number" do
publisher = described_class.new

payload = { foo: "bar" }
topic_name = "pipefy-local-topic"
let(:payload) { { foo: "bar" } }
let(:topic_name) { "pipefy-local-topic" }
let(:context) { "NO_CONTEXT_PROVIDED" }
let(:cid) { "NO_CID_PROVIDED" }
let(:event_id) { "15075c9d-7337-4f70-be02-2732aff2c2f7" }

let(:call_publish) do
subject.publish(
payload,
topic_name,
context,
cid,
event_id
)
end

result = publisher.publish(payload, topic_name)
it "should return a message ID and a sequence number" do
result = call_publish

# AWS default stub values:
expect(result.message_id).to eq "messageId"
expect(result.sequence_number).to eq "String"
end

it "should receive payload with expected message attributes" do
sns_broker = described_class.new
sns_client_mock = spy(Aws::SNS::Resource.new)
sns_topic_mock = spy(Aws::SNS::Topic.new(arn: "foo", client: Aws::SNS::Client.new({})))
allow(sns_client_mock).to receive(:topic).and_return(sns_topic_mock)

sns_broker.instance_variable_set(:@sns, sns_client_mock)
allow(SecureRandom).to receive(:uuid).and_return("15075c9d-7337-4f70-be02-2732aff2c2f7")
payload = { foo: "bar" }
topic_name = "pipefy-local-topic"
subject.instance_variable_set(:@sns, sns_client_mock)

sns_broker.publish(payload, topic_name)
call_publish

expected_payload = { message: "{\"default\":{\"foo\":\"bar\"}}",
message_attributes: { "context" => { data_type: "String",
string_value: "NO_CONTEXT_PROVIDED" },
"correlationId" =>
string_value: context },
"correlationId" => { data_type: "String",
string_value: cid },
"eventId" =>
{ data_type: "String",
string_value: "15075c9d-7337-4f70-be02-2732aff2c2f7" } },
string_value: event_id } },
message_structure: " json " }

expect(sns_topic_mock).to have_received(:publish).with(expected_payload)
33 changes: 22 additions & 11 deletions spec/publisher_spec.rb
@@ -3,10 +3,25 @@
require_relative "../lib/pipefy_message/providers/aws_client/sns_broker"

class TestBroker
def publish(message, topic, context = nil); end
def publish(message, topic, context, correlation_id, event_id); end
end

RSpec.describe PipefyMessage::Publisher do
let(:payload) { { foo: "bar" } }
let(:topic_name) { "pipefy-local-topic" }
let(:context) { "NO_CONTEXT_PROVIDED" }
let(:cid) { "NO_CID_PROVIDED" }
let(:event_id) { "15075c9d-7337-4f70-be02-2732aff2c2f7" }

let(:call_publish) do
subject.publish(
payload,
topic_name,
context,
cid
)
end

it "forwards the message to be published by an instance" do
test_broker = instance_double("TestBroker")
allow(test_broker).to receive(:publish)
@@ -15,13 +30,11 @@ def publish(message, topic, context = nil); end
.to receive(:build_publisher_instance)
.and_return(test_broker)

publisher = described_class.new
allow(SecureRandom).to receive(:uuid).and_return("15075c9d-7337-4f70-be02-2732aff2c2f7")

payload = { foo: "bar" }
topic_name = "pipefy-local-topic"
publisher.publish(payload, topic_name)
call_publish

expect(test_broker).to have_received(:publish).with(payload, topic_name, nil)
expect(test_broker).to have_received(:publish).with(payload, topic_name, context, cid, event_id)
end

context "when I try to publish a message to SNS broker" do
@@ -54,13 +67,11 @@ def publish(message, topic, context = nil); end
.to receive(:build_publisher_instance)
.and_return(mocked_publisher_impl)

publisher = described_class.new
allow(SecureRandom).to receive(:uuid).and_return("15075c9d-7337-4f70-be02-2732aff2c2f7")

payload = { foo: "bar" }
topic_name = "pipefy-local-topic"
result = publisher.publish(payload, topic_name)
result = call_publish
expect(result).to eq mocked_return
expect(mocked_publisher_impl).to have_received(:publish).with(payload, topic_name, nil)
expect(mocked_publisher_impl).to have_received(:publish).with(payload, topic_name, context, cid, event_id)
end
end
end
