diff --git a/.gitignore b/.gitignore index 90f7941..a237f68 100644 --- a/.gitignore +++ b/.gitignore @@ -5,3 +5,4 @@ vendor terraform.tfvars terraform.tfstate* .terraform* +.idea/* diff --git a/CHANGELOG.md b/CHANGELOG.md index bd8ca87..c84e6d2 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,3 +1,6 @@ +# 2.1.0 + - added option to configure polling frequency and number of messages to fetch in one batch to be able to manage AWS SQS cost + # 2.0.0 - added support for Logstash 6.x. diff --git a/lib/logstash/inputs/s3sqs.rb b/lib/logstash/inputs/s3sqs.rb index 56d438b..3a4e9cb 100644 --- a/lib/logstash/inputs/s3sqs.rb +++ b/lib/logstash/inputs/s3sqs.rb @@ -83,6 +83,10 @@ class LogStash::Inputs::S3SQS < LogStash::Inputs::Threadable MAX_TIME_BEFORE_GIVING_UP = 60 EVENT_SOURCE = 'aws:s3' EVENT_TYPE = 'ObjectCreated' + DEFAULT_POLLING_FREQUENCY = 1 + MAX_MESSAGES_TO_FETCH = 1 + SENT_TIMESTAMP = "SentTimestamp" + SQS_ATTRIBUTES = [SENT_TIMESTAMP] config_name "s3sqs" @@ -91,6 +95,10 @@ class LogStash::Inputs::S3SQS < LogStash::Inputs::Threadable # Name of the SQS Queue to pull messages from. Note that this is just the name of the queue, not the URL or ARN. config :queue, :validate => :string, :required => true + config :max_messages, :validate => :number, :default => MAX_MESSAGES_TO_FETCH + + config :polling_frequency, :validate => :number, :default => DEFAULT_POLLING_FREQUENCY + attr_reader :poller attr_reader :s3 @@ -111,24 +119,21 @@ def setup_queue def polling_options { - # we will query 1 message at a time, so we can ensure correct error handling if we can't download a single file correctly - # (we will throw :skip_delete if download size isn't correct to process the event again later - # -> set a reasonable "Default Visibility Timeout" for your queue, so that there's enough time to process the log files) - :max_number_of_messages => 1, - # we will use the queue's setting, a good value is 10 seconds - # (to ensure fast logstash shutdown on the one hand and few api calls on the other hand) - :wait_time_seconds => nil, + :max_number_of_messages => @max_messages, + :attribute_names => SQS_ATTRIBUTES, + :wait_time_seconds => @polling_frequency, + :skip_delete => true } end def handle_message(message, queue) - hash = JSON.parse message.body + event = JSON.parse message.body # there may be test events sent from the s3 bucket which won't contain a Records array, # we will skip those events and remove them from queue - if hash['Records'] then + if event['Records'] then # typically there will be only 1 record per event, but since it is an array we will # treat it as if there could be more records - hash['Records'].each do |record| + event['Records'].each do |record| # in case there are any events with Records that aren't s3 object-created events and can't therefore be # processed by this plugin, we will skip them and remove them from queue if record['eventSource'] == EVENT_SOURCE and record['eventName'].start_with?(EVENT_TYPE) then @@ -140,7 +145,7 @@ def handle_message(message, queue) ) rescue => e @logger.warn("issuing :skip_delete on failed download", :bucket => record['s3']['bucket']['name'], :object => record['s3']['object']['key'], :error => e) - throw :skip_delete + return false end # verify downloaded content size if response.content_length == record['s3']['object']['size'] then @@ -168,32 +173,32 @@ def handle_message(message, queue) end rescue => e @logger.warn("issuing :skip_delete on failed plain text processing", :bucket => record['s3']['bucket']['name'], :object => record['s3']['object']['key'], :error => e) - throw :skip_delete + return false end # otherwise try again later else @logger.warn("issuing :skip_delete on wrong download content size", :bucket => record['s3']['bucket']['name'], :object => record['s3']['object']['key'], :download_size => response.content_length, :expected => record['s3']['object']['size']) - throw :skip_delete + return false end end end end + true end def run(queue) - # ensure we can stop logstash correctly - poller.before_request do |stats| - if stop? then - @logger.warn("issuing :stop_polling on stop?", :queue => @queue) - # this can take up to "Receive Message Wait Time" (of the sqs queue) seconds to be recognized - throw :stop_polling - end - end - # poll a message and process it run_with_backoff do - poller.poll(polling_options) do |message| - handle_message(message, queue) + poller.poll(polling_options) do |messages, stats| + break if stop? + messages.each do |message| + if handle_message(message, queue) + poller.delete_message(message) + end + end + @logger.debug("SQS Stats:", :request_count => stats.request_count, + :received_message_count => stats.received_message_count, + :last_message_received_at => stats.last_message_received_at) if @logger.debug? end end end diff --git a/logstash-input-s3sqs.gemspec b/logstash-input-s3sqs.gemspec index 7784265..39e8a91 100644 --- a/logstash-input-s3sqs.gemspec +++ b/logstash-input-s3sqs.gemspec @@ -1,6 +1,6 @@ Gem::Specification.new do |s| s.name = 'logstash-input-s3sqs' - s.version = '2.0.0' + s.version = '2.1.0' s.licenses = ['Apache-2.0'] s.summary = "Get logs from AWS s3 buckets as issued by an object-created event via sqs." s.description = "This gem is a logstash plugin required to be installed on top of the Logstash core pipeline using $LS_HOME/bin/plugin install gemname. This gem is not a stand-alone program"