diff --git a/.rubocop.yml b/.rubocop.yml index c69fa14..b1aa165 100644 --- a/.rubocop.yml +++ b/.rubocop.yml @@ -19,9 +19,16 @@ Metrics/BlockLength: Lint/MissingSuper: Enabled: false +# Essentially, logs are making methods and blocks huge, not the +# logic itself. We thus decided to disable some linter checks +# related to number of lines. + Metrics/MethodLength: Enabled: false +Metrics/BlockLength: + Enabled: false + Metrics/AbcSize: Enabled: false diff --git a/lib/pipefy_message/consumer.rb b/lib/pipefy_message/consumer.rb index 95dd088..dcc3b2c 100644 --- a/lib/pipefy_message/consumer.rb +++ b/lib/pipefy_message/consumer.rb @@ -72,24 +72,36 @@ def process_message build_consumer_instance.poller do |payload, metadata| start = Process.clock_gettime(Process::CLOCK_MONOTONIC, :millisecond) - logger.info({ - message_text: "Message received by poller to be processed by consumer", - received_message: payload, - metadata: metadata - }) + + context = metadata["context"] + correlation_id = metadata["correlationId"] + event_id = metadata["eventId"] + + logger.info(log_context({ + message_text: "Message received by poller to be processed by consumer", + received_message: payload, + metadata: metadata + }, context, correlation_id, event_id)) retry_count = metadata["ApproximateReceiveCount"].to_i - 1 obj.perform(payload["Message"], { retry_count: retry_count }) elapsed_time_ms = Process.clock_gettime(Process::CLOCK_MONOTONIC, :millisecond) - start - logger.info({ - duration_ms: elapsed_time_ms, - message_text: "Message received by consumer poller, processed " \ - "in #{elapsed_time_ms} milliseconds" - }) + logger.info(log_context({ + duration_ms: elapsed_time_ms, + message_text: "Message received by consumer poller, processed " \ + "in #{elapsed_time_ms} milliseconds" + }, context, correlation_id, event_id)) end rescue PipefyMessage::Providers::Errors::ResourceError => e - logger.error("Failed to process message, details #{e.inspect}") + context = "NO_CONTEXT_RETRIEVED" unless defined? context + correlation_id = "NO_CID_RETRIEVED" unless defined? correlation_id + event_id = "NO_EVENT_ID_RETRIEVED" unless defined? event_id + # this shows up in multiple places; OK or DRY up? + + logger.error(log_context({ + message_text: "Failed to process message; details: #{e.inspect}" + }, context, correlation_id, event_id)) raise e end end diff --git a/lib/pipefy_message/logging.rb b/lib/pipefy_message/logging.rb index 207b884..cb9c27e 100644 --- a/lib/pipefy_message/logging.rb +++ b/lib/pipefy_message/logging.rb @@ -47,5 +47,17 @@ def self.logger_setup def self.included(base) base.extend(self) end + + ## + # Adds context, correlation and event identifiers to logs. Should be + # called with the hash containing the remainder of the logs as + # argument, eg: logger.info(log_context({ ... }, context, correlation_id, event_id)). + def log_context(log_hash, context, correlation_id, event_id) + { + context: context, + correlation_id: correlation_id, + event_id: event_id + }.merge(log_hash) + end end end diff --git a/lib/pipefy_message/providers/aws_client/sns_broker.rb b/lib/pipefy_message/providers/aws_client/sns_broker.rb index f848a48..6ac3874 100644 --- a/lib/pipefy_message/providers/aws_client/sns_broker.rb +++ b/lib/pipefy_message/providers/aws_client/sns_broker.rb @@ -28,23 +28,30 @@ def initialize(_opts = {}) ## # Publishes a message with the given payload to the SNS topic # with topic_name. - def publish(payload, topic_name, context = nil) - context = "NO_CONTEXT_PROVIDED" if context.nil? + def publish(payload, topic_name, context, correlation_id, event_id) message = prepare_payload(payload) topic_arn = @topic_arn_prefix + (@is_staging ? "#{topic_name}-staging" : topic_name) topic = @sns.topic(topic_arn) - logger.info( - { topic_arn: topic_arn, - payload: payload, - message_text: "Attempting to publish a json message to topic #{topic_arn}" } - ) + logger.info(log_context( + { + topic_arn: topic_arn, + payload: payload, + message_text: "Attempting to publish a json message to topic #{topic_arn}" + }, + context, correlation_id, event_id + )) - result = topic.publish({ message: message.to_json, message_structure: " json ", + result = topic.publish({ message: message.to_json, + message_structure: " json ", message_attributes: { "correlationId" => { data_type: "String", - string_value: SecureRandom.uuid.to_s + string_value: correlation_id + }, + "eventId" => { + data_type: "String", + string_value: event_id }, "context" => { data_type: "String", @@ -52,19 +59,30 @@ def publish(payload, topic_name, context = nil) } } }) - logger.info( - { topic_arn: topic_arn, - id: result.message_id, - message_text: "Message published with ID #{result.message_id}" } - ) + logger.info(log_context( + { + topic_arn: topic_arn, + message_id: result.message_id, + message_text: "Message published" + }, + context, correlation_id, event_id + )) result rescue StandardError => e - logger.error( - { topic_arn: topic_arn, - message_text: "Failed to publish message", - error_details: e.inspect } - ) + context = "NO_CONTEXT_RETRIEVED" unless defined? context + correlation_id = "NO_CID_RETRIEVED" unless defined? correlation_id + event_id = "NO_EVENT_ID_RETRIEVED" unless defined? event_id + # this shows up in multiple places; OK or DRY up? + + logger.error(log_context( + { + topic_arn: topic_arn, + message_text: "Failed to publish message", + error_details: e.inspect + }, + context, correlation_id, event_id + )) end private diff --git a/lib/pipefy_message/providers/aws_client/sqs_broker.rb b/lib/pipefy_message/providers/aws_client/sqs_broker.rb index eb5cfaa..ee7f081 100644 --- a/lib/pipefy_message/providers/aws_client/sqs_broker.rb +++ b/lib/pipefy_message/providers/aws_client/sqs_broker.rb @@ -17,13 +17,18 @@ def initialize(opts = {}) AwsClient.aws_setup @sqs = Aws::SQS::Client.new - logger.debug({ message_text: "SQS client created" }) + @is_staging = ENV["ASYNC_APP_ENV"] == "staging" @queue_url = handle_queue_protocol(@sqs.get_queue_url({ queue_name: handle_queue_name(@config[:queue_name]) }) .queue_url) @poller = Aws::SQS::QueuePoller.new(@queue_url, { client: @sqs }) + + logger.debug({ + queue_url: @queue_url, + message_text: "SQS client created" + }) rescue StandardError => e raise PipefyMessage::Providers::Errors::ResourceError, e.message end @@ -32,18 +37,44 @@ def initialize(opts = {}) # Initiates SQS queue polling, with wait_time_seconds as given # in the initial configuration. def poller - logger.info(build_log_hash("Initiating SQS polling on queue #{@queue_url}")) + logger.info(merge_log_hash({ + message_text: "Initiating SQS polling on queue #{@queue_url}" + })) @poller.poll({ wait_time_seconds: @config[:wait_time_seconds], message_attribute_names: ["All"], attribute_names: ["All"] }) do |received_message| - logger.debug(build_log_hash("Message received by SQS poller on queue #{@queue_url}")) - payload = JSON.parse(received_message.body) metadata = received_message.message_attributes.merge(received_message.attributes) + + context = metadata["context"] + correlation_id = metadata["correlationId"] + event_id = metadata["eventId"] + # We're extracting those again in the consumer + # process_message module. I considered whether these + # should perhaps be `yield`ed instead, but I guess + # this is not the bad kind of repetition. + + logger.debug( + merge_log_hash(log_context({ + message_text: "Message received by SQS poller" + }, context, correlation_id, event_id)) + ) + yield(payload, metadata) rescue StandardError => e # error in the routine, skip delete to try the message again later with 30sec of delay - logger.error("Failed to process message, details #{e.inspect}") + + context = "NO_CONTEXT_RETRIEVED" unless defined? context + correlation_id = "NO_CID_RETRIEVED" unless defined? correlation_id + event_id = "NO_EVENT_ID_RETRIEVED" unless defined? event_id + # this shows up in multiple places; OK or DRY up? + + logger.error( + merge_log_hash(log_context({ + queue_url: @queue_url, + message_text: "Failed to consume message; details: #{e.inspect}" + }, context, correlation_id, event_id)) + ) throw e if e.instance_of?(NameError) @@ -62,8 +93,8 @@ def default_options ## # Adds the queue name to logs, if not already present. - def build_log_hash(arg) - { queue_name: @config[:queue_name], message_text: arg } + def merge_log_hash(log_hash) + { queue_name: @config[:queue_name] }.merge(log_hash) end ## diff --git a/lib/pipefy_message/publisher.rb b/lib/pipefy_message/publisher.rb index ab470d7..42d62d2 100644 --- a/lib/pipefy_message/publisher.rb +++ b/lib/pipefy_message/publisher.rb @@ -14,8 +14,12 @@ def initialize(broker = "aws", broker_opts = {}) @publisher_instance = build_publisher_instance end - def publish(message, topic, context = nil) - @publisher_instance.publish(message, topic, context) + def publish(message, topic, context = nil, correlation_id = nil) + context = "NO_CONTEXT_PROVIDED" if context.nil? + correlation_id = "NO_CID_PROVIDED" if correlation_id.nil? + event_id = SecureRandom.uuid.to_s + + @publisher_instance.publish(message, topic, context, correlation_id, event_id) end private diff --git a/lib/samples/my_awesome_consumer.rb b/lib/samples/my_awesome_consumer.rb index 1679856..edb86c8 100644 --- a/lib/samples/my_awesome_consumer.rb +++ b/lib/samples/my_awesome_consumer.rb @@ -11,6 +11,5 @@ class MyAwesomeConsumer def perform(message, metadata) puts "Received message #{message} from broker - retry #{metadata[:retry_count]}" ## Fill with our logic here - raise StandardError end end diff --git a/spec/providers/aws/sns_broker_spec.rb b/spec/providers/aws/sns_broker_spec.rb index fe1f2a9..d73151c 100644 --- a/spec/providers/aws/sns_broker_spec.rb +++ b/spec/providers/aws/sns_broker_spec.rb @@ -26,13 +26,24 @@ describe "#publish" do include_context "AWS stub" - it "should return a message ID and a sequence number" do - publisher = described_class.new - - payload = { foo: "bar" } - topic_name = "pipefy-local-topic" + let(:payload) { { foo: "bar" } } + let(:topic_name) { "pipefy-local-topic" } + let(:context) { "NO_CONTEXT_PROVIDED" } + let(:cid) { "NO_CID_PROVIDED" } + let(:event_id) { "15075c9d-7337-4f70-be02-2732aff2c2f7" } + + let(:call_publish) do + subject.publish( + payload, + topic_name, + context, + cid, + event_id + ) + end - result = publisher.publish(payload, topic_name) + it "should return a message ID and a sequence number" do + result = call_publish # AWS default stub values: expect(result.message_id).to eq "messageId" @@ -40,24 +51,22 @@ end it "should receive payload with expected message attributes" do - sns_broker = described_class.new sns_client_mock = spy(Aws::SNS::Resource.new) sns_topic_mock = spy(Aws::SNS::Topic.new(arn: "foo", client: Aws::SNS::Client.new({}))) allow(sns_client_mock).to receive(:topic).and_return(sns_topic_mock) - sns_broker.instance_variable_set(:@sns, sns_client_mock) - allow(SecureRandom).to receive(:uuid).and_return("15075c9d-7337-4f70-be02-2732aff2c2f7") - payload = { foo: "bar" } - topic_name = "pipefy-local-topic" + subject.instance_variable_set(:@sns, sns_client_mock) - sns_broker.publish(payload, topic_name) + call_publish expected_payload = { message: "{\"default\":{\"foo\":\"bar\"}}", message_attributes: { "context" => { data_type: "String", - string_value: "NO_CONTEXT_PROVIDED" }, - "correlationId" => + string_value: context }, + "correlationId" => { data_type: "String", + string_value: cid }, + "eventId" => { data_type: "String", - string_value: "15075c9d-7337-4f70-be02-2732aff2c2f7" } }, + string_value: event_id } }, message_structure: " json " } expect(sns_topic_mock).to have_received(:publish).with(expected_payload) diff --git a/spec/publisher_spec.rb b/spec/publisher_spec.rb index 680772e..c535bf7 100644 --- a/spec/publisher_spec.rb +++ b/spec/publisher_spec.rb @@ -3,10 +3,25 @@ require_relative "../lib/pipefy_message/providers/aws_client/sns_broker" class TestBroker - def publish(message, topic, context = nil); end + def publish(message, topic, context, correlation_id, event_id); end end RSpec.describe PipefyMessage::Publisher do + let(:payload) { { foo: "bar" } } + let(:topic_name) { "pipefy-local-topic" } + let(:context) { "NO_CONTEXT_PROVIDED" } + let(:cid) { "NO_CID_PROVIDED" } + let(:event_id) { "15075c9d-7337-4f70-be02-2732aff2c2f7" } + + let(:call_publish) do + subject.publish( + payload, + topic_name, + context, + cid + ) + end + it "forwards the message to be published by an instance" do test_broker = instance_double("TestBroker") allow(test_broker).to receive(:publish) @@ -15,13 +30,11 @@ def publish(message, topic, context = nil); end .to receive(:build_publisher_instance) .and_return(test_broker) - publisher = described_class.new + allow(SecureRandom).to receive(:uuid).and_return("15075c9d-7337-4f70-be02-2732aff2c2f7") - payload = { foo: "bar" } - topic_name = "pipefy-local-topic" - publisher.publish(payload, topic_name) + call_publish - expect(test_broker).to have_received(:publish).with(payload, topic_name, nil) + expect(test_broker).to have_received(:publish).with(payload, topic_name, context, cid, event_id) end context "when I try to publish a message to SNS broker" do @@ -54,13 +67,11 @@ def publish(message, topic, context = nil); end .to receive(:build_publisher_instance) .and_return(mocked_publisher_impl) - publisher = described_class.new + allow(SecureRandom).to receive(:uuid).and_return("15075c9d-7337-4f70-be02-2732aff2c2f7") - payload = { foo: "bar" } - topic_name = "pipefy-local-topic" - result = publisher.publish(payload, topic_name) + result = call_publish expect(result).to eq mocked_return - expect(mocked_publisher_impl).to have_received(:publish).with(payload, topic_name, nil) + expect(mocked_publisher_impl).to have_received(:publish).with(payload, topic_name, context, cid, event_id) end end end