Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account-related emails.

Already on GitHub? Sign in to your account

Implemented polling frequency and max message behavior #5

Open
wants to merge 1 commit into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -5,3 +5,4 @@ vendor
terraform.tfvars
terraform.tfstate*
.terraform*
.idea/*
3 changes: 3 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,3 +1,6 @@
# 2.1.0
- added option to configure the polling frequency and the number of messages to fetch in one batch, to help manage AWS SQS cost

# 2.0.0
- added support for Logstash 6.x.

Expand Down
53 changes: 29 additions & 24 deletions lib/logstash/inputs/s3sqs.rb
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,10 @@ class LogStash::Inputs::S3SQS < LogStash::Inputs::Threadable
MAX_TIME_BEFORE_GIVING_UP = 60
EVENT_SOURCE = 'aws:s3'
EVENT_TYPE = 'ObjectCreated'
DEFAULT_POLLING_FREQUENCY = 1
MAX_MESSAGES_TO_FETCH = 1
SENT_TIMESTAMP = "SentTimestamp"
SQS_ATTRIBUTES = [SENT_TIMESTAMP]

config_name "s3sqs"

Expand All @@ -91,6 +95,10 @@ class LogStash::Inputs::S3SQS < LogStash::Inputs::Threadable
# Name of the SQS Queue to pull messages from. Note that this is just the name of the queue, not the URL or ARN.
config :queue, :validate => :string, :required => true

config :max_messages, :validate => :number, :default => MAX_MESSAGES_TO_FETCH

config :polling_frequency, :validate => :number, :default => DEFAULT_POLLING_FREQUENCY

attr_reader :poller
attr_reader :s3

Expand All @@ -111,24 +119,21 @@ def setup_queue

def polling_options
{
# we will query 1 message at a time, so we can ensure correct error handling if we can't download a single file correctly
# (we will throw :skip_delete if download size isn't correct to process the event again later
# -> set a reasonable "Default Visibility Timeout" for your queue, so that there's enough time to process the log files)
:max_number_of_messages => 1,
# we will use the queue's setting, a good value is 10 seconds
# (to ensure fast logstash shutdown on the one hand and few api calls on the other hand)
:wait_time_seconds => nil,
:max_number_of_messages => @max_messages,
:attribute_names => SQS_ATTRIBUTES,
:wait_time_seconds => @polling_frequency,
:skip_delete => true
}
end

def handle_message(message, queue)
hash = JSON.parse message.body
event = JSON.parse message.body
# there may be test events sent from the s3 bucket which won't contain a Records array,
# we will skip those events and remove them from queue
if hash['Records'] then
if event['Records'] then
# typically there will be only 1 record per event, but since it is an array we will
# treat it as if there could be more records
hash['Records'].each do |record|
event['Records'].each do |record|
# in case there are any events with Records that aren't s3 object-created events and can't therefore be
# processed by this plugin, we will skip them and remove them from queue
if record['eventSource'] == EVENT_SOURCE and record['eventName'].start_with?(EVENT_TYPE) then
Expand All @@ -140,7 +145,7 @@ def handle_message(message, queue)
)
rescue => e
@logger.warn("issuing :skip_delete on failed download", :bucket => record['s3']['bucket']['name'], :object => record['s3']['object']['key'], :error => e)
throw :skip_delete
return false
end
# verify downloaded content size
if response.content_length == record['s3']['object']['size'] then
Expand Down Expand Up @@ -168,32 +173,32 @@ def handle_message(message, queue)
end
rescue => e
@logger.warn("issuing :skip_delete on failed plain text processing", :bucket => record['s3']['bucket']['name'], :object => record['s3']['object']['key'], :error => e)
throw :skip_delete
return false
end
# otherwise try again later
else
@logger.warn("issuing :skip_delete on wrong download content size", :bucket => record['s3']['bucket']['name'], :object => record['s3']['object']['key'],
:download_size => response.content_length, :expected => record['s3']['object']['size'])
throw :skip_delete
return false
end
end
end
end
true
end

def run(queue)
# ensure we can stop logstash correctly
poller.before_request do |stats|
if stop? then
@logger.warn("issuing :stop_polling on stop?", :queue => @queue)
# this can take up to "Receive Message Wait Time" (of the sqs queue) seconds to be recognized
throw :stop_polling
end
end
# poll a message and process it
run_with_backoff do
poller.poll(polling_options) do |message|
handle_message(message, queue)
poller.poll(polling_options) do |messages, stats|
break if stop?
messages.each do |message|
if handle_message(message, queue)
poller.delete_message(message)
end
end
@logger.debug("SQS Stats:", :request_count => stats.request_count,
:received_message_count => stats.received_message_count,
:last_message_received_at => stats.last_message_received_at) if @logger.debug?
end
end
end
Expand Down
2 changes: 1 addition & 1 deletion logstash-input-s3sqs.gemspec
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
Gem::Specification.new do |s|
s.name = 'logstash-input-s3sqs'
s.version = '2.0.0'
s.version = '2.1.0'
s.licenses = ['Apache-2.0']
s.summary = "Get logs from AWS s3 buckets as issued by an object-created event via sqs."
s.description = "This gem is a logstash plugin required to be installed on top of the Logstash core pipeline using $LS_HOME/bin/plugin install gemname. This gem is not a stand-alone program"
Expand Down