Add new multiplex option to logstash-output-sqs plugin #38

Open: wants to merge 1 commit into base: main
66 changes: 57 additions & 9 deletions lib/logstash/outputs/sqs.rb
@@ -45,18 +45,41 @@
#
# ==== Batch Publishing
# This output publishes messages to SQS in batches in order to optimize event
# throughput and increase performance. This is done using the
# throughput, increase performance, and decrease costs. There are two different
# ways that this can be done.
#
# The default is to use SQS's native batching facilities, specifically the
# [`SendMessageBatch`](http://docs.aws.amazon.com/AWSSimpleQueueService/latest/APIReference/API_SendMessageBatch.html)
# API. When publishing messages to SQS in batches, the following service limits
# must be respected (see
# [Limits in Amazon SQS](http://docs.aws.amazon.com/AWSSimpleQueueService/latest/SQSDeveloperGuide/limits-messages.html)):
#
# * The maximum number of messages per API call is 10.
# * The maximum allowed individual message size is 256KiB.
# * The maximum total payload size (i.e. the sum of the sizes of all
# individual messages within a batch) is also 256KiB.
#
# This plugin will dynamically adjust the size of the batch published to SQS in
# order to ensure that the total payload size does not exceed 256KiB.
# An alternate method of batching is to multiplex multiple Logstash messages
# inside each SQS message.
#
# The advantage of this is that you can potentially transmit many times more
# Logstash messages per SQS API call. If your Logstash messages are small
# (average <10KiB), then using SQS's native batching is inefficient, since you
# can't send more than 10 messages per batch. E.g., if you're sending
# line-oriented log data, your messages may average less than 1KiB per message,
# or less than 10KiB total per SQS API call. With multiplexing, you can fit up
# to 256KiB per SQS API call, which would net you a best-case 25x savings on
# your SQS bill.
#
# Multiplexing is currently only available with the "json" codec; attempting
# to use it with other codecs will fall back to SQS SendMessageBatch batching.
# Any Logstash input plugin using the standard "json" codec will properly
# unpack multiplexed message batches; if you are using other software to
# consume from the queue, it may need to be modified.
#
# Whichever method of batching is used, this plugin will dynamically adjust the
# size of the batch published to SQS in order to ensure that the total payload
# size does not exceed the hard SQS limit of 256KiB.
#
# WARNING: This output cannot currently handle messages larger than 256KiB. Any
# single message exceeding this size will be dropped.
@@ -78,6 +101,9 @@ class LogStash::Outputs::SQS < LogStash::Outputs::Base
# http://docs.aws.amazon.com/AWSSimpleQueueService/latest/SQSDeveloperGuide/limits-messages.html.
config :message_max_size, :validate => :bytes, :default => '256KiB'

# Batch messages via multiplexing
config :multiplex, :validate => :boolean, :default => false

# The name of the target SQS queue. Note that this is just the name of the
# queue, not the URL or ARN.
config :queue, :validate => :string, :required => true
@@ -90,11 +116,20 @@ class LogStash::Outputs::SQS < LogStash::Outputs::Base
def register
@sqs = Aws::SQS::Client.new(aws_options_hash)

if @batch_events > 10
raise LogStash::ConfigurationError, 'The maximum batch size is 10 events'
if @multiplex && @codec.config_name != "json"
@logger.warn('Multiplex batching is only supported for the json codec; falling back to SQS batching.')
@multiplex = false
end

if @multiplex
@logger.warn("Multiplex batching is enabled. Reading from this queue with input codecs other than 'json' may have unexpected results.")
end

if !@multiplex && @batch_events > 10
raise LogStash::ConfigurationError, 'The maximum batch size is 10 events when multiplexing is not in use'
elsif @batch_events < 1
raise LogStash::ConfigurationError, 'The batch size must be greater than 0'
end
end

begin
params = { queue_name: @queue }
@@ -133,7 +168,10 @@ def multi_receive_encoded_batch(encoded_events)
next
end

if entries.size >= @batch_events or (bytes + encoded.bytesize) > @message_max_size
# Size computation: in the multiplexing case, we create a JSON array,
# so we need two extra bytes for the enclosing "[]" and one extra byte
# per message for the separator ",".
if entries.size >= @batch_events or (bytes + encoded.bytesize + 2 + entries.size) > @message_max_size
send_message_batch(entries)

bytes = 0
@@ -163,7 +201,17 @@ def multi_receive_encoded_single(encoded_events)

private
def send_message_batch(entries)
@logger.debug("Publishing #{entries.size} messages to SQS", :queue_url => @queue_url, :entries => entries)
@sqs.send_message_batch(:queue_url => @queue_url, :entries => entries)
if !@multiplex || entries.size < 2
@logger.debug("Publishing #{entries.size} messages to SQS", :queue_url => @queue_url, :entries => entries)
@sqs.send_message_batch(:queue_url => @queue_url, :entries => entries)
else
msgs = []
entries.each do |msg|
msgs.push(msg[:message_body])
end
multiplexed_msg_body = "[" + msgs.join(",") + "]"
@logger.debug("Publishing #{entries.size} messages to SQS, multiplexed into a single SQS message", :queue_url => @queue_url, :message_body => multiplexed_msg_body)
@sqs.send_message(:queue_url => @queue_url, :message_body => multiplexed_msg_body)
end
end
end
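
For reviewers who want to check the size arithmetic in isolation, here is a minimal standalone sketch of the multiplexing idea, with no AWS client involved. The names `multiplex_bodies` and `demultiplex`, and the `max_events` and `max_bytes` keyword arguments, are hypothetical and are not part of the plugin. Unlike the diff above, the sketch always wraps events in a JSON array (even a lone event), so it skips any event that cannot fit alongside the two bracket bytes.

```ruby
require "json"

# Assumed limit for this sketch, mirroring the 256KiB figure in the docs above.
MAX_BYTES = 256 * 1024

# Packs already-encoded JSON strings into JSON-array bodies. Each body holds
# at most max_events events and is at most max_bytes bytes long.
def multiplex_bodies(encoded_events, max_events: 100, max_bytes: MAX_BYTES)
  bodies = []
  batch = []
  bytes = 0 # sum of the byte sizes of the events currently in batch

  # Emits the current batch as one JSON array and resets the accumulators.
  flush = lambda do
    bodies << "[" + batch.join(",") + "]" unless batch.empty?
    batch = []
    bytes = 0
  end

  encoded_events.each do |encoded|
    # An event that cannot fit even alone (with its brackets) is skipped.
    next if encoded.bytesize + 2 > max_bytes

    # Prospective size: existing events + new event + 2 bytes for "[]"
    # + one "," for each event already in the batch.
    if batch.size >= max_events || bytes + encoded.bytesize + 2 + batch.size > max_bytes
      flush.call
    end

    batch << encoded
    bytes += encoded.bytesize
  end

  flush.call
  bodies
end

# Consumer side: a multiplexed body parses to an array of events; a plain
# (non-multiplexed) body is wrapped so callers always get an array.
def demultiplex(body)
  parsed = JSON.parse(body)
  parsed.is_a?(Array) ? parsed : [parsed]
end
```

A consumer that is not Logstash would need something like `demultiplex` to treat a top-level JSON array as several events rather than one.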