From 39a977f154bda66c5cf5ad86d694e46085f9cf76 Mon Sep 17 00:00:00 2001 From: Eduarda Ferreira Date: Mon, 15 Aug 2022 17:35:16 -0300 Subject: [PATCH 1/9] feat: Treat consumer perform failure separate from polling failure --- lib/pipefy_message/consumer.rb | 21 ++++++++++++++++----- 1 file changed, 16 insertions(+), 5 deletions(-) diff --git a/lib/pipefy_message/consumer.rb b/lib/pipefy_message/consumer.rb index 3a752e7..d198e71 100644 --- a/lib/pipefy_message/consumer.rb +++ b/lib/pipefy_message/consumer.rb @@ -80,12 +80,23 @@ def process_message logger.info(log_context({ log_text: "Message received by poller to be processed by consumer", received_message: payload, - metadata: metadata + received_metadata: metadata }, context, correlation_id, event_id)) retry_count = metadata["ApproximateReceiveCount"].to_i - 1 - obj.perform(payload, - { retry_count: retry_count, context: context, correlation_id: correlation_id }) + + begin + obj.perform(payload, + { retry_count: retry_count, context: context, correlation_id: correlation_id }) + rescue StandardError => e + logger.error(log_context({ + received_message: payload, + received_metadata: metadata, + log_text: "Consumer #{obj.name}.perform method failed to process "\ + "received_message with #{e.inspect}" + }, context, correlation_id, event_id)) + raise e + end elapsed_time_ms = Process.clock_gettime(Process::CLOCK_MONOTONIC, :millisecond) - start logger.info(log_context({ @@ -94,14 +105,14 @@ def process_message "in #{elapsed_time_ms} milliseconds" }, context, correlation_id, event_id)) end - rescue PipefyMessage::Providers::Errors::ResourceError => e + rescue StandardError => e context = "NO_CONTEXT_RETRIEVED" unless defined? context correlation_id = "NO_CID_RETRIEVED" unless defined? correlation_id event_id = "NO_EVENT_ID_RETRIEVED" unless defined? event_id # this shows up in multiple places; OK or DRY up? logger.error(log_context({ - log_text: "Failed to process message; details: #{e.inspect}" + log_text: "Queue polling failed with #{e.inspect}" }, context, correlation_id, event_id)) raise e end From ebe64ee8f54a5d89422eb376862f5116fd2a3d05 Mon Sep 17 00:00:00 2001 From: Eduarda Ferreira Date: Mon, 15 Aug 2022 17:37:28 -0300 Subject: [PATCH 2/9] feat: Distinguish errors when resolving and publishing and log message payload --- .../providers/aws_client/sns_broker.rb | 39 +++++++++++++++---- 1 file changed, 32 insertions(+), 7 deletions(-) diff --git a/lib/pipefy_message/providers/aws_client/sns_broker.rb b/lib/pipefy_message/providers/aws_client/sns_broker.rb index d1fad9c..bc145d7 100644 --- a/lib/pipefy_message/providers/aws_client/sns_broker.rb +++ b/lib/pipefy_message/providers/aws_client/sns_broker.rb @@ -22,7 +22,13 @@ def initialize(_opts = {}) @topic_arn_prefix = ENV.fetch("AWS_SNS_ARN_PREFIX", "arn:aws:sns:us-east-1:000000000000:") @is_staging = ENV["ASYNC_APP_ENV"] == "staging" rescue StandardError => e - raise PipefyMessage::Providers::Errors::ResourceError, e.message + msg = "Failed to initialize AWS SNS broker with #{e.inspect}" + + logger.error({ + log_text: msg + }) + + raise PipefyMessage::Providers::Errors::ResourceError, msg end ## @@ -31,13 +37,27 @@ def initialize(_opts = {}) def publish(payload, topic_name, context, correlation_id, event_id) message = prepare_payload(payload) topic_arn = @topic_arn_prefix + (@is_staging ? 
"#{topic_name}-staging" : topic_name) - topic = @sns.topic(topic_arn) + + begin + topic = @sns.topic(topic_arn) + rescue StandardError => e + msg = "Resolving AWS SNS topic #{topic_name} failed with #{e.inspect}" + + logger.error({ + topic_name: topic_name, + topic_arn: topic_arn, + log_text: msg + }) + + raise PipefyMessage::Providers::Errors::ResourceError, msg + end logger.info(log_context( { + topic_name: topic_name, topic_arn: topic_arn, payload: payload, - log_text: "Attempting to publish a json message to topic #{topic_arn}" + log_text: "Attempting to publish message to SNS topic #{topic_name}" }, context, correlation_id, event_id )) @@ -61,9 +81,11 @@ def publish(payload, topic_name, context, correlation_id, event_id) logger.info(log_context( { + topic_name: topic_name, topic_arn: topic_arn, - message_id: result.message_id, - log_text: "Message published" + aws_message_id: result.message_id, + payload: payload, + log_text: "Message published to SNS topic #{topic_name}" }, context, correlation_id, event_id )) @@ -77,12 +99,15 @@ def publish(payload, topic_name, context, correlation_id, event_id) logger.error(log_context( { + topic_name: topic_name, topic_arn: topic_arn, - log_text: "Failed to publish message", - error_details: e.inspect + payload: payload, + log_text: "AWS SNS broker failed to publish to topic #{topic_name} with #{e.inspect}" }, context, correlation_id, event_id )) + + raise e end private From 95a4f03683c858a0f7d4d1c7a5de8a75e6285d57 Mon Sep 17 00:00:00 2001 From: Eduarda Ferreira Date: Mon, 15 Aug 2022 17:39:16 -0300 Subject: [PATCH 3/9] feat/refact: Create QueueDoesNotExist error and remove separate logging on exception init (That is a particularly common error atm) --- lib/pipefy_message/providers/errors.rb | 53 ++++++-------------------- 1 file changed, 11 insertions(+), 42 deletions(-) diff --git a/lib/pipefy_message/providers/errors.rb b/lib/pipefy_message/providers/errors.rb index c5f7649..7c6ef43 100644 --- a/lib/pipefy_message/providers/errors.rb +++ b/lib/pipefy_message/providers/errors.rb @@ -8,53 +8,23 @@ module Providers # as common error messages. module Errors ## - # Enables automatic error logging when prepended to a custom error - # class. - # - # In order for this module to work, the prepending class must - # inherit from the Ruby Exception class. Note that this condition - # is satisfied by any custom error class that inherits from - # StandardError and its children, such as RuntimeError, given that - # StandardError is itself a child of Exception. - # - # The reason to use prepend rather than include is to make this - # module come below the error class in the inheritance chain. - # This ensures that, when an error is raised and its constructor - # is called, this module's initialize method gets called first, - # then calls the original constructor with super, and calls the - # logger once initialization is done. This effectively "wraps" - # the error initialize method in the logging constructor below. - module LoggingError - prepend PipefyMessage::Logging + # Abstraction for service and networking errors. + class ResourceError < RuntimeError + end - def initialize(msg = nil) + ## + # Abstraction for errors caused by nonexisting queues, such as + # Aws::SQS::Errors::QueueDoesNotExist. 
+ class QueueDoesNotExist < ResourceError + def initialize(msg = "The specified queue does not exist") super - logger.error({ - error_class: self.class, - error_message: message, - stack_trace: full_message - }) - - # message and full_message are methods provided by the - # Ruby Exception class. - # The hash keys used above were an attempt to provide more - # descriptive names than those of the original methods (:P), - # but this has still led to some confusion, so, to be more - # explicit: if e is an instance of Exception (or its - # children), e.message returns the error message for e and - # e.full_message provides the full stack trace. This is - # what's being included in the logs above. Logging inside - # the constructor ensures information is logged as soon - # as the error is raised. - # For details, please refer to the official documentation for - # the Exception class. end end ## - # Abstraction for service and networking errors. - class ResourceError < RuntimeError - prepend PipefyMessage::Providers::Errors::LoggingError + # Abstraction for provider authorization errors, such as + # Aws::SNS::Errors::AuthorizationError. + class AuthorizationError < RuntimeError end ## @@ -62,7 +32,6 @@ class ResourceError < RuntimeError # to a method call (eg: if a queueing service client is # initialized with an invalid queue identifier). class InvalidOption < ArgumentError - prepend PipefyMessage::Providers::Errors::LoggingError end # Error messages: From d34b9f0824cf795a64c8278a41a3819ab6b667ee Mon Sep 17 00:00:00 2001 From: Eduarda Ferreira Date: Mon, 15 Aug 2022 17:41:02 -0300 Subject: [PATCH 4/9] feat: Log failures when initializing and publishing --- .../providers/aws_client/sqs_broker.rb | 66 ++++++++++++------- lib/pipefy_message/publisher.rb | 20 +++++- 2 files changed, 62 insertions(+), 24 deletions(-) diff --git a/lib/pipefy_message/providers/aws_client/sqs_broker.rb b/lib/pipefy_message/providers/aws_client/sqs_broker.rb index e5a9640..9e97d5c 100644 --- a/lib/pipefy_message/providers/aws_client/sqs_broker.rb +++ b/lib/pipefy_message/providers/aws_client/sqs_broker.rb @@ -25,19 +25,34 @@ def initialize(opts = {}) @poller = Aws::SQS::QueuePoller.new(@queue_url, { client: @sqs }) - logger.debug({ - queue_url: @queue_url, - log_text: "SQS client created" + logger.debug(log_queue_info({ + log_text: "SQS client created" + })) + rescue Aws::SQS::Errors::QueueDoesNotExist, Aws::SQS::Errors::NonExistentQueue + logger.error({ + queue_name: @config[:queue_name], + log_text: "Failed to initialize AWS SQS broker: the specified queue "\ + "(#{@config[:queue_name]}) does not exist" }) + + raise PipefyMessage::Providers::Errors::QueueDoesNotExist, + "The specified AWS SQS queue #{@config[:queue_name]} does not exist" rescue StandardError => e - raise PipefyMessage::Providers::Errors::ResourceError, e.message + msg = "Failed to initialize AWS SQS broker with #{e.inspect}" + + logger.error({ + queue_name: @config[:queue_name], + log_text: msg + }) + + raise PipefyMessage::Providers::Errors::ResourceError, msg end ## # Initiates SQS queue polling, with wait_time_seconds as given # in the initial configuration. def poller - logger.info(merge_log_hash({ + logger.info(log_queue_info({ log_text: "Initiating SQS polling on queue #{@queue_url}" })) @@ -48,30 +63,34 @@ def poller correlation_id = metadata[:correlationId] event_id = metadata[:eventId] # We're extracting those again in the consumer - # process_message module. I considered whether these + # process_message method. 
I considered whether these # should perhaps be `yield`ed instead, but I guess # this is not the bad kind of repetition. logger.debug( - merge_log_hash(log_context({ - log_text: "Message received by SQS poller" - }, context, correlation_id, event_id)) + log_queue_info(log_context({ received_message: payload, + received_metadata: metadata, + log_text: "Message received by SQS poller" }, + context, correlation_id, event_id)) ) yield(payload, metadata) rescue StandardError => e - # error in the routine, skip delete to try the message again later with 30sec of delay - - context = "NO_CONTEXT_RETRIEVED" unless defined? context - correlation_id = "NO_CID_RETRIEVED" unless defined? correlation_id - event_id = "NO_EVENT_ID_RETRIEVED" unless defined? event_id - # this shows up in multiple places; OK or DRY up? + if defined? received_message + # This would probably only be the case if a malformed and + # thus unparseable message is received (eg: in case of + # breaking changes in SQS) + + logger.error(log_queue_info({ + received_message: received_message, + log_text: "Consuming received_message failed with #{e.inspect}" + })) + else + logger.error(log_queue_info({ + log_text: "SQS polling failed with #{e.inspect}" + })) + end - logger.error( - merge_log_hash(log_context({ - queue_url: @queue_url, - log_text: "Failed to consume message; details: #{e.inspect}" - }, context, correlation_id, event_id)) - ) + # error in the routine, skip delete to try the message again later with 30sec of delay throw e if e.instance_of?(NameError) @@ -90,8 +109,9 @@ def default_options ## # Adds the queue name to logs, if not already present. - def merge_log_hash(log_hash) - { queue_name: @config[:queue_name] }.merge(log_hash) + def log_queue_info(log_hash) + { queue_name: @config[:queue_name], + queue_url: @queue_url }.merge(log_hash) end ## diff --git a/lib/pipefy_message/publisher.rb b/lib/pipefy_message/publisher.rb index 42d62d2..6b29b74 100644 --- a/lib/pipefy_message/publisher.rb +++ b/lib/pipefy_message/publisher.rb @@ -11,7 +11,17 @@ class Publisher def initialize(broker = "aws", broker_opts = {}) @broker = broker @broker_opts = broker_opts - @publisher_instance = build_publisher_instance + begin + @publisher_instance = build_publisher_instance + rescue StandardError => e + logger.error({ + broker: broker, + broker_opts: broker_opts, + log_text: "Failed to initialize #{broker} broker with #{e.inspect}" + }) + + raise e + end end def publish(message, topic, context = nil, correlation_id = nil) @@ -20,6 +30,14 @@ def publish(message, topic, context = nil, correlation_id = nil) event_id = SecureRandom.uuid.to_s @publisher_instance.publish(message, topic, context, correlation_id, event_id) + rescue StandardError => e + logger.error(log_context({ + topic_name: topic, + message: message, + log_text: "Failed to publish message to topic #{topic} with #{e.inspect}" + }, context, correlation_id, event_id)) + + raise e end private From 889324e6d1e6192abfcbfa61f395df14b0797ff0 Mon Sep 17 00:00:00 2001 From: Eduarda Ferreira Date: Mon, 15 Aug 2022 17:41:16 -0300 Subject: [PATCH 5/9] fix: Fix failing specs --- spec/providers/aws/sqs_broker_spec.rb | 22 +++++++++++----------- 1 file changed, 11 insertions(+), 11 deletions(-) diff --git a/spec/providers/aws/sqs_broker_spec.rb b/spec/providers/aws/sqs_broker_spec.rb index dad9ed5..9391fd9 100644 --- a/spec/providers/aws/sqs_broker_spec.rb +++ b/spec/providers/aws/sqs_broker_spec.rb @@ -8,8 +8,9 @@ RSpec.describe PipefyMessage::Providers::AwsClient::SqsBroker do include_context 
"AWS stub" + let(:test_queue) { "test-queue" } + describe "#initialize" do - let(:test_queue) { "test-queue" } let(:sqs_opts) do { wait_time_seconds: 10, # default (not set) @@ -42,15 +43,15 @@ end it "should handle queue name by environment " do - [{ env: "staging", expected_queue_name: "test_queue-staging" }, - { env: "dev", expected_queue_name: "test_queue" }, - { env: "prod", expected_queue_name: "test_queue" }].each do |obj| + [{ env: "staging", expected_queue_name: "#{test_queue}-staging" }, + { env: "dev", expected_queue_name: test_queue }, + { env: "prod", expected_queue_name: test_queue }].each do |obj| ENV["ASYNC_APP_ENV"] = obj[:env] mock_sqs_client = instance_double("Aws::SQS::Client") allow(mock_sqs_client).to receive(:get_queue_url).and_return(Aws::SQS::Types::GetQueueUrlResult.new(queue_url: "http://fake/url")) allow(Aws::SQS::Client).to receive(:new).and_return(mock_sqs_client) - described_class.new(queue_name: "test_queue") + described_class.new(queue_name: test_queue) expect(mock_sqs_client).to have_received(:get_queue_url).with({ queue_name: obj[:expected_queue_name] }) end @@ -162,7 +163,7 @@ it "should raise NonExistentQueue" do allow_any_instance_of(Aws::SQS::Client) .to receive(:get_queue_url) - .with({ queue_name: "pipefy-local-queue" }) + .with({ queue_name: test_queue }) .and_raise( Aws::SQS::Errors::NonExistentQueue.new( double(Aws::SQS::Client), @@ -171,14 +172,13 @@ ) expect do - described_class.new - end.to raise_error(PipefyMessage::Providers::Errors::ResourceError, - /The specified queue my_queue does not exist for this wsdl version/) + described_class.new({ queue_name: test_queue }) + end.to raise_error(PipefyMessage::Providers::Errors::QueueDoesNotExist) end it "should raise NetworkingError" do allow_any_instance_of(Aws::SQS::Client) .to receive(:get_queue_url) - .with({ queue_name: "pipefy-local-queue" }) + .with({ queue_name: test_queue }) .and_raise( Seahorse::Client::NetworkingError.new( Errno::ECONNREFUSED.new(""), @@ -187,7 +187,7 @@ ) expect do - described_class.new("my_queue") + described_class.new({ queue_name: test_queue }) end.to raise_error(PipefyMessage::Providers::Errors::ResourceError) end end From 845a781447baa49e3b07d5945a0c5bde37769f9e Mon Sep 17 00:00:00 2001 From: Marcelo Shiba Date: Tue, 16 Aug 2022 17:00:58 -0300 Subject: [PATCH 6/9] :egg: Override Rails::logger by gem logger This is needed to assure that all logs are logged as json --- bin/pipefymessage | 6 ++++++ lib/pipefy_message/logging.rb | 3 ++- 2 files changed, 8 insertions(+), 1 deletion(-) diff --git a/bin/pipefymessage b/bin/pipefymessage index 1ae323c..b69879f 100755 --- a/bin/pipefymessage +++ b/bin/pipefymessage @@ -4,6 +4,7 @@ require "rubygems" require "singleton" require "thor" +require "pipefy_message" module PipefyMessage # CLI @@ -24,6 +25,8 @@ module PipefyMessage # Runner class Runner include Singleton + include PipefyMessage::Logging + def initialize_rails # Adapted from: https://github.com/mperham/sidekiq/blob/master/lib/sidekiq/cli.rb @@ -35,6 +38,9 @@ module PipefyMessage require File.expand_path("config/application.rb") require File.expand_path("config/environment.rb") end + + # Override rails logger by gem logger + ::Rails.logger = logger end def write_pid(options) diff --git a/lib/pipefy_message/logging.rb b/lib/pipefy_message/logging.rb index 594013b..5f4993f 100644 --- a/lib/pipefy_message/logging.rb +++ b/lib/pipefy_message/logging.rb @@ -32,11 +32,12 @@ def self.logger_setup logger.level = LOG_LEVELS.index(ENV.fetch("ASYNC_LOG_LEVEL", "INFO")) || 
Logger::ERROR logger.formatter = proc do |severity, datetime, progname, msg| + msg_hash = msg.is_a?(Hash) ? msg : { log_text: msg } { time: datetime.to_s, level: severity.to_s, program_name: progname.to_s, context: "async_processing", - data: msg }.to_json + $INPUT_RECORD_SEPARATOR + data: msg_hash }.to_json + $INPUT_RECORD_SEPARATOR end end end From 8a4cd5e689140e4941d09ed52bee741840dd8405 Mon Sep 17 00:00:00 2001 From: Marcelo Shiba Date: Tue, 16 Aug 2022 17:07:24 -0300 Subject: [PATCH 7/9] :art: Remove empty lines --- .../providers/aws_client/sqs_broker.rb | 15 ++------------- 1 file changed, 2 insertions(+), 13 deletions(-) diff --git a/lib/pipefy_message/providers/aws_client/sqs_broker.rb b/lib/pipefy_message/providers/aws_client/sqs_broker.rb index 9e97d5c..400afdb 100644 --- a/lib/pipefy_message/providers/aws_client/sqs_broker.rb +++ b/lib/pipefy_message/providers/aws_client/sqs_broker.rb @@ -17,17 +17,12 @@ def initialize(opts = {}) AwsClient.aws_setup @sqs = Aws::SQS::Client.new - @is_staging = ENV["ASYNC_APP_ENV"] == "staging" - @queue_url = handle_queue_protocol(@sqs.get_queue_url({ queue_name: handle_queue_name(@config[:queue_name]) }) .queue_url) - @poller = Aws::SQS::QueuePoller.new(@queue_url, { client: @sqs }) - logger.debug(log_queue_info({ - log_text: "SQS client created" - })) + logger.debug(log_queue_info({ log_text: "SQS client created" })) rescue Aws::SQS::Errors::QueueDoesNotExist, Aws::SQS::Errors::NonExistentQueue logger.error({ queue_name: @config[:queue_name], @@ -39,12 +34,10 @@ def initialize(opts = {}) "The specified AWS SQS queue #{@config[:queue_name]} does not exist" rescue StandardError => e msg = "Failed to initialize AWS SQS broker with #{e.inspect}" - logger.error({ queue_name: @config[:queue_name], log_text: msg }) - raise PipefyMessage::Providers::Errors::ResourceError, msg end @@ -52,9 +45,7 @@ def initialize(opts = {}) # Initiates SQS queue polling, with wait_time_seconds as given # in the initial configuration. 
def poller - logger.info(log_queue_info({ - log_text: "Initiating SQS polling on queue #{@queue_url}" - })) + logger.info(log_queue_info({ log_text: "Initiating SQS polling on queue #{@queue_url}" })) @poller.poll({ wait_time_seconds: @config[:wait_time_seconds], message_attribute_names: ["All"], attribute_names: ["All"] }) do |received_message| @@ -79,7 +70,6 @@ def poller # This would probably only be the case if a malformed and # thus unparseable message is received (eg: in case of # breaking changes in SQS) - logger.error(log_queue_info({ received_message: received_message, log_text: "Consuming received_message failed with #{e.inspect}" @@ -91,7 +81,6 @@ def poller end # error in the routine, skip delete to try the message again later with 30sec of delay - throw e if e.instance_of?(NameError) throw :skip_delete From d20428919b698307f8a0107e21e2d1eedbad8151 Mon Sep 17 00:00:00 2001 From: Marcelo Shiba Date: Wed, 17 Aug 2022 11:06:21 -0300 Subject: [PATCH 8/9] :bug: I thought it was supposed to be obj.class.name --- lib/pipefy_message/consumer.rb | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/lib/pipefy_message/consumer.rb b/lib/pipefy_message/consumer.rb index d198e71..bf7f7b1 100644 --- a/lib/pipefy_message/consumer.rb +++ b/lib/pipefy_message/consumer.rb @@ -92,7 +92,7 @@ def process_message logger.error(log_context({ received_message: payload, received_metadata: metadata, - log_text: "Consumer #{obj.name}.perform method failed to process "\ + log_text: "Consumer #{obj.class.name}.perform method failed to process "\ "received_message with #{e.inspect}" }, context, correlation_id, event_id)) raise e From fbc1458ad13c5a57ff849a64bb53ad0e2d0d2e42 Mon Sep 17 00:00:00 2001 From: Marcelo Shiba Date: Wed, 17 Aug 2022 15:09:17 -0300 Subject: [PATCH 9/9] :rewind: Revert changes made in pipefymessage.rb --- bin/pipefymessage | 6 ------ 1 file changed, 6 deletions(-) diff --git a/bin/pipefymessage b/bin/pipefymessage index b69879f..1ae323c 100755 --- a/bin/pipefymessage +++ b/bin/pipefymessage @@ -4,7 +4,6 @@ require "rubygems" require "singleton" require "thor" -require "pipefy_message" module PipefyMessage # CLI @@ -25,8 +24,6 @@ module PipefyMessage # Runner class Runner include Singleton - include PipefyMessage::Logging - def initialize_rails # Adapted from: https://github.com/mperham/sidekiq/blob/master/lib/sidekiq/cli.rb @@ -38,9 +35,6 @@ module PipefyMessage require File.expand_path("config/application.rb") require File.expand_path("config/environment.rb") end - - # Override rails logger by gem logger - ::Rails.logger = logger end def write_pid(options)
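
A minimal caller-side sketch of the behavior these patches introduce, assuming an application publishes through PipefyMessage::Publisher as exercised in the specs above; the topic name and payload are placeholders, not values from this series, and the rescue branch is illustrative only:

    require "pipefy_message"

    begin
      publisher = PipefyMessage::Publisher.new # defaults to the "aws" broker
      publisher.publish({ "event" => "example" }, "example-topic")
    rescue PipefyMessage::Providers::Errors::ResourceError => e
      # Raised when the SNS broker cannot be initialized or the topic cannot be
      # resolved; the gem has already logged the failure with its JSON logger,
      # so the caller only decides whether to retry, re-enqueue, or give up.
      warn("publish failed: #{e.message}")
    end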