This repository has been archived by the owner on Apr 5, 2023. It is now read-only.

Merge pull request #39 from pipefy/ef-main/improvement/improve-error-logs

Improvement: Make error logs more granular and detailed
mhshiba authored Aug 17, 2022
2 parents d9d8e72 + fbc1458 commit be54b8e
Showing 7 changed files with 129 additions and 96 deletions.
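
For a sense of what "more granular" means in practice, here is an illustrative error entry in the JSON shape produced by the formatter in lib/pipefy_message/logging.rb below. The field names match the ones added in these diffs; the timestamp, queue name, message body and error text are made up, not taken from the commit:

{"time":"2022-08-17 10:15:00 -0300","level":"ERROR","program_name":"","context":"async_processing","data":{"queue_name":"example-queue","queue_url":"https://sqs.us-east-1.amazonaws.com/000000000000/example-queue","received_message":"not-json","log_text":"Consuming received_message failed with #<JSON::ParserError: unexpected token at 'not-json'>"}}
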
21 changes: 16 additions & 5 deletions lib/pipefy_message/consumer.rb
@@ -80,12 +80,23 @@ def process_message
logger.info(log_context({
log_text: "Message received by poller to be processed by consumer",
received_message: payload,
metadata: metadata
received_metadata: metadata
}, context, correlation_id, event_id))

retry_count = metadata["ApproximateReceiveCount"].to_i - 1
obj.perform(payload,
{ retry_count: retry_count, context: context, correlation_id: correlation_id })

begin
obj.perform(payload,
{ retry_count: retry_count, context: context, correlation_id: correlation_id })
rescue StandardError => e
logger.error(log_context({
received_message: payload,
received_metadata: metadata,
log_text: "Consumer #{obj.class.name}.perform method failed to process "\
"received_message with #{e.inspect}"
}, context, correlation_id, event_id))
raise e
end

elapsed_time_ms = Process.clock_gettime(Process::CLOCK_MONOTONIC, :millisecond) - start
logger.info(log_context({
@@ -94,14 +105,14 @@ def process_message
"in #{elapsed_time_ms} milliseconds"
}, context, correlation_id, event_id))
end
rescue PipefyMessage::Providers::Errors::ResourceError => e
rescue StandardError => e
context = "NO_CONTEXT_RETRIEVED" unless defined? context
correlation_id = "NO_CID_RETRIEVED" unless defined? correlation_id
event_id = "NO_EVENT_ID_RETRIEVED" unless defined? event_id
# this shows up in multiple places; OK or DRY up?

logger.error(log_context({
log_text: "Failed to process message; details: #{e.inspect}"
log_text: "Queue polling failed with #{e.inspect}"
}, context, correlation_id, event_id))
raise e
end
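
A standalone sketch (hypothetical worker class and values, not gem code) of the two-level handling this file now has: the inner rescue logs the failure while the payload and metadata are still in scope and re-raises, and the outer StandardError rescue then reports the polling round as failed:

class ExampleWorker
  def perform(_payload, _opts)
    raise ArgumentError, "boom" # simulate a failing consumer
  end
end

obj      = ExampleWorker.new
payload  = { "foo" => "bar" }
metadata = { "ApproximateReceiveCount" => "1" }

begin
  begin
    obj.perform(payload, { retry_count: metadata["ApproximateReceiveCount"].to_i - 1,
                           context: "EXAMPLE", correlation_id: "abc-123" })
  rescue StandardError => e
    # Granular log: which consumer failed, on which message and metadata.
    puts "#{obj.class.name}.perform failed to process #{payload} (#{metadata}) with #{e.inspect}"
    raise e
  end
rescue StandardError => e
  # Coarse outer log, as in the rescue StandardError branch above.
  puts "Queue polling failed with #{e.inspect}"
end
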
3 changes: 2 additions & 1 deletion lib/pipefy_message/logging.rb
@@ -32,11 +32,12 @@ def self.logger_setup
logger.level = LOG_LEVELS.index(ENV.fetch("ASYNC_LOG_LEVEL", "INFO")) || Logger::ERROR

logger.formatter = proc do |severity, datetime, progname, msg|
msg_hash = msg.is_a?(Hash) ? msg : { log_text: msg }
{ time: datetime.to_s,
level: severity.to_s,
program_name: progname.to_s,
context: "async_processing",
data: msg }.to_json + $INPUT_RECORD_SEPARATOR
data: msg_hash }.to_json + $INPUT_RECORD_SEPARATOR
end
end
end
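
A runnable sketch of what the msg_hash change does, using a plain Logger rather than the gem's logger_setup (purely illustrative): string messages are wrapped in { log_text: ... } so every entry keeps the same JSON shape, while hash messages pass through unchanged.

require "json"
require "logger"
require "English" # provides $INPUT_RECORD_SEPARATOR

logger = Logger.new($stdout)
logger.formatter = proc do |severity, datetime, progname, msg|
  msg_hash = msg.is_a?(Hash) ? msg : { log_text: msg }
  { time: datetime.to_s,
    level: severity.to_s,
    program_name: progname.to_s,
    context: "async_processing",
    data: msg_hash }.to_json + $INPUT_RECORD_SEPARATOR
end

logger.info("plain string message")                           # data => {"log_text":"plain string message"}
logger.error({ queue_name: "example-queue", log_text: "oops" }) # data => the hash, unchanged
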
39 changes: 32 additions & 7 deletions lib/pipefy_message/providers/aws_client/sns_broker.rb
@@ -22,7 +22,13 @@ def initialize(_opts = {})
@topic_arn_prefix = ENV.fetch("AWS_SNS_ARN_PREFIX", "arn:aws:sns:us-east-1:000000000000:")
@is_staging = ENV["ASYNC_APP_ENV"] == "staging"
rescue StandardError => e
raise PipefyMessage::Providers::Errors::ResourceError, e.message
msg = "Failed to initialize AWS SNS broker with #{e.inspect}"

logger.error({
log_text: msg
})

raise PipefyMessage::Providers::Errors::ResourceError, msg
end

##
@@ -31,13 +37,27 @@ def initialize(_opts = {})
def publish(payload, topic_name, context, correlation_id, event_id)
message = prepare_payload(payload)
topic_arn = @topic_arn_prefix + (@is_staging ? "#{topic_name}-staging" : topic_name)
topic = @sns.topic(topic_arn)

begin
topic = @sns.topic(topic_arn)
rescue StandardError => e
msg = "Resolving AWS SNS topic #{topic_name} failed with #{e.inspect}"

logger.error({
topic_name: topic_name,
topic_arn: topic_arn,
log_text: msg
})

raise PipefyMessage::Providers::Errors::ResourceError, msg
end

logger.info(log_context(
{
topic_name: topic_name,
topic_arn: topic_arn,
payload: payload,
log_text: "Attempting to publish a json message to topic #{topic_arn}"
log_text: "Attempting to publish message to SNS topic #{topic_name}"
},
context, correlation_id, event_id
))
@@ -61,9 +81,11 @@ def publish(payload, topic_name, context, correlation_id, event_id)

logger.info(log_context(
{
topic_name: topic_name,
topic_arn: topic_arn,
message_id: result.message_id,
log_text: "Message published"
aws_message_id: result.message_id,
payload: payload,
log_text: "Message published to SNS topic #{topic_name}"
},
context, correlation_id, event_id
))
@@ -77,12 +99,15 @@ def publish(payload, topic_name, context, correlation_id, event_id)

logger.error(log_context(
{
topic_name: topic_name,
topic_arn: topic_arn,
log_text: "Failed to publish message",
error_details: e.inspect
payload: payload,
log_text: "AWS SNS broker failed to publish to topic #{topic_name} with #{e.inspect}"
},
context, correlation_id, event_id
))

raise e
end

private
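
Caller-side view of the new failure mode (the broker class name is inferred from the file path, the topic and arguments are illustrative, and configured AWS credentials are assumed): if resolving the topic fails, publish logs topic_name, topic_arn and the original error, then raises a ResourceError the caller can rescue.

broker = PipefyMessage::Providers::AwsClient::SnsBroker.new

begin
  broker.publish({ "foo" => "bar" }, "example-topic",
                 "EXAMPLE_CONTEXT", "example-correlation-id", "example-event-id")
rescue PipefyMessage::Providers::Errors::ResourceError => e
  # topic_name, topic_arn and the original error were already logged by the broker.
  warn "publish aborted: #{e.message}"
end
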
67 changes: 38 additions & 29 deletions lib/pipefy_message/providers/aws_client/sqs_broker.rb
@@ -17,29 +17,35 @@ def initialize(opts = {})
AwsClient.aws_setup

@sqs = Aws::SQS::Client.new

@is_staging = ENV["ASYNC_APP_ENV"] == "staging"

@queue_url = handle_queue_protocol(@sqs.get_queue_url({ queue_name: handle_queue_name(@config[:queue_name]) })
.queue_url)

@poller = Aws::SQS::QueuePoller.new(@queue_url, { client: @sqs })

logger.debug({
queue_url: @queue_url,
log_text: "SQS client created"
logger.debug(log_queue_info({ log_text: "SQS client created" }))
rescue Aws::SQS::Errors::QueueDoesNotExist, Aws::SQS::Errors::NonExistentQueue
logger.error({
queue_name: @config[:queue_name],
log_text: "Failed to initialize AWS SQS broker: the specified queue "\
"(#{@config[:queue_name]}) does not exist"
})

raise PipefyMessage::Providers::Errors::QueueDoesNotExist,
"The specified AWS SQS queue #{@config[:queue_name]} does not exist"
rescue StandardError => e
raise PipefyMessage::Providers::Errors::ResourceError, e.message
msg = "Failed to initialize AWS SQS broker with #{e.inspect}"
logger.error({
queue_name: @config[:queue_name],
log_text: msg
})
raise PipefyMessage::Providers::Errors::ResourceError, msg
end

##
# Initiates SQS queue polling, with wait_time_seconds as given
# in the initial configuration.
def poller
logger.info(merge_log_hash({
log_text: "Initiating SQS polling on queue #{@queue_url}"
}))
logger.info(log_queue_info({ log_text: "Initiating SQS polling on queue #{@queue_url}" }))

@poller.poll({ wait_time_seconds: @config[:wait_time_seconds],
message_attribute_names: ["All"], attribute_names: ["All"] }) do |received_message|
@@ -48,31 +54,33 @@ def poller
correlation_id = metadata[:correlationId]
event_id = metadata[:eventId]
# We're extracting those again in the consumer
# process_message module. I considered whether these
# process_message method. I considered whether these
# should perhaps be `yield`ed instead, but I guess
# this is not the bad kind of repetition.

logger.debug(
merge_log_hash(log_context({
log_text: "Message received by SQS poller"
}, context, correlation_id, event_id))
log_queue_info(log_context({ received_message: payload,
received_metadata: metadata,
log_text: "Message received by SQS poller" },
context, correlation_id, event_id))
)
yield(payload, metadata)
rescue StandardError => e
# error in the routine, skip delete to try the message again later with 30sec of delay

context = "NO_CONTEXT_RETRIEVED" unless defined? context
correlation_id = "NO_CID_RETRIEVED" unless defined? correlation_id
event_id = "NO_EVENT_ID_RETRIEVED" unless defined? event_id
# this shows up in multiple places; OK or DRY up?

logger.error(
merge_log_hash(log_context({
queue_url: @queue_url,
log_text: "Failed to consume message; details: #{e.inspect}"
}, context, correlation_id, event_id))
)
if defined? received_message
# This would probably only be the case if a malformed and
# thus unparseable message is received (eg: in case of
# breaking changes in SQS)
logger.error(log_queue_info({
received_message: received_message,
log_text: "Consuming received_message failed with #{e.inspect}"
}))
else
logger.error(log_queue_info({
log_text: "SQS polling failed with #{e.inspect}"
}))
end

# error in the routine, skip delete to try the message again later with 30sec of delay
throw e if e.instance_of?(NameError)

throw :skip_delete
@@ -90,8 +98,9 @@ def default_options

##
# Adds the queue name to logs, if not already present.
def merge_log_hash(log_hash)
{ queue_name: @config[:queue_name] }.merge(log_hash)
def log_queue_info(log_hash)
{ queue_name: @config[:queue_name],
queue_url: @queue_url }.merge(log_hash)
end

##
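
The throw :skip_delete above relies on documented Aws::SQS::QueuePoller behaviour: a message is deleted after the poll block returns unless :skip_delete is thrown, so skipping the delete leaves it in the queue to be redelivered once its visibility timeout expires. A minimal sketch outside the gem (the queue URL is a placeholder and AWS credentials are assumed):

require "json"
require "aws-sdk-sqs"

queue_url = "https://sqs.us-east-1.amazonaws.com/000000000000/example-queue"
poller = Aws::SQS::QueuePoller.new(queue_url)

poller.poll(wait_time_seconds: 10) do |msg|
  puts JSON.parse(msg.body)
rescue StandardError => e
  warn "processing failed with #{e.inspect}"
  throw :skip_delete # keep the message; SQS redelivers it after the visibility timeout
end
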
53 changes: 11 additions & 42 deletions lib/pipefy_message/providers/errors.rb
@@ -8,61 +8,30 @@ module Providers
# as common error messages.
module Errors
##
# Enables automatic error logging when prepended to a custom error
# class.
#
# In order for this module to work, the prepending class must
# inherit from the Ruby Exception class. Note that this condition
# is satisfied by any custom error class that inherits from
# StandardError and its children, such as RuntimeError, given that
# StandardError is itself a child of Exception.
#
# The reason to use prepend rather than include is to make this
# module come below the error class in the inheritance chain.
# This ensures that, when an error is raised and its constructor
# is called, this module's initialize method gets called first,
# then calls the original constructor with super, and calls the
# logger once initialization is done. This effectively "wraps"
# the error initialize method in the logging constructor below.
module LoggingError
prepend PipefyMessage::Logging
# Abstraction for service and networking errors.
class ResourceError < RuntimeError
end

def initialize(msg = nil)
##
# Abstraction for errors caused by nonexisting queues, such as
# Aws::SQS::Errors::QueueDoesNotExist.
class QueueDoesNotExist < ResourceError
def initialize(msg = "The specified queue does not exist")
super
logger.error({
error_class: self.class,
error_message: message,
stack_trace: full_message
})

# message and full_message are methods provided by the
# Ruby Exception class.
# The hash keys used above were an attempt to provide more
# descriptive names than those of the original methods (:P),
# but this has still led to some confusion, so, to be more
# explicit: if e is an instance of Exception (or its
# children), e.message returns the error message for e and
# e.full_message provides the full stack trace. This is
# what's being included in the logs above. Logging inside
# the constructor ensures information is logged as soon
# as the error is raised.
# For details, please refer to the official documentation for
# the Exception class.
end
end

##
# Abstraction for service and networking errors.
class ResourceError < RuntimeError
prepend PipefyMessage::Providers::Errors::LoggingError
# Abstraction for provider authorization errors, such as
# Aws::SNS::Errors::AuthorizationError.
class AuthorizationError < RuntimeError
end

##
# To be raised when an invalid value is passed as an option
# to a method call (eg: if a queueing service client is
# initialized with an invalid queue identifier).
class InvalidOption < ArgumentError
prepend PipefyMessage::Providers::Errors::LoggingError
end

# Error messages:
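
With the LoggingError prepend gone, these are plain exception classes and logging now happens at the raise sites shown in the broker diffs. Since QueueDoesNotExist inherits from ResourceError, callers that care about the distinction should rescue the subclass first. A hedged sketch (the broker class name and options hash are inferred from the file paths, and AWS/SQS connectivity is assumed):

begin
  broker = PipefyMessage::Providers::AwsClient::SqsBroker.new({ queue_name: "missing-queue" })
rescue PipefyMessage::Providers::Errors::QueueDoesNotExist => e
  warn e.message # e.g. "The specified AWS SQS queue missing-queue does not exist"
rescue PipefyMessage::Providers::Errors::ResourceError => e
  warn "broker setup failed: #{e.message}"
end
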
20 changes: 19 additions & 1 deletion lib/pipefy_message/publisher.rb
@@ -11,7 +11,17 @@ class Publisher
def initialize(broker = "aws", broker_opts = {})
@broker = broker
@broker_opts = broker_opts
@publisher_instance = build_publisher_instance
begin
@publisher_instance = build_publisher_instance
rescue StandardError => e
logger.error({
broker: broker,
broker_opts: broker_opts,
log_text: "Failed to initialize #{broker} broker with #{e.inspect}"
})

raise e
end
end

def publish(message, topic, context = nil, correlation_id = nil)
@@ -20,6 +30,14 @@ def publish(message, topic, context = nil, correlation_id = nil)
event_id = SecureRandom.uuid.to_s

@publisher_instance.publish(message, topic, context, correlation_id, event_id)
rescue StandardError => e
logger.error(log_context({
topic_name: topic,
message: message,
log_text: "Failed to publish message to topic #{topic} with #{e.inspect}"
}, context, correlation_id, event_id))

raise e
end

private
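
End to end, the publisher now logs both setup and publish failures before re-raising, so callers only need a coarse rescue. A usage sketch (topic name and payload are illustrative, and configured AWS credentials are assumed):

publisher = PipefyMessage::Publisher.new # defaults to the "aws" broker

begin
  publisher.publish({ "foo" => "bar" }, "example-topic")
rescue StandardError => e
  # The topic, message and e.inspect have already been logged by the publisher.
  warn "publish failed: #{e.message}"
end
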