-
Notifications
You must be signed in to change notification settings - Fork 40
/
sqs.rb
203 lines (174 loc) · 6.76 KB
/
sqs.rb
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
# encoding: utf-8
#
require "logstash/inputs/threadable"
require "logstash/namespace"
require "logstash/timestamp"
require "logstash/plugin_mixins/aws_config"
require "logstash/errors"
require 'logstash/inputs/sqs/patch'
# Forcibly load all modules marked to be lazily loaded.
#
# It is recommended that this is called prior to launching threads. See
# https://aws.amazon.com/blogs/developer/threading-with-the-aws-sdk-for-ruby/.
Aws.eager_autoload!
# Pull events from an Amazon Web Services Simple Queue Service (SQS) queue.
#
# SQS is a simple, scalable queue system that is part of the
# Amazon Web Services suite of tools.
#
# Although SQS is similar to other queuing systems like AMQP, it
# uses a custom API and requires that you have an AWS account.
# See http://aws.amazon.com/sqs/ for more details on how SQS works,
# what the pricing schedule looks like and how to setup a queue.
#
# To use this plugin, you *must*:
#
# * Have an AWS account
# * Setup an SQS queue
# * Create an identity that has access to consume messages from the queue.
#
# The "consumer" identity must have the following permissions on the queue:
#
# * `sqs:ChangeMessageVisibility`
# * `sqs:ChangeMessageVisibilityBatch`
# * `sqs:DeleteMessage`
# * `sqs:DeleteMessageBatch`
# * `sqs:GetQueueAttributes`
# * `sqs:GetQueueUrl`
# * `sqs:ListQueues`
# * `sqs:ReceiveMessage`
#
# Typically, you should setup an IAM policy, create a user and apply the IAM policy to the user.
# A sample policy is as follows:
# [source,json]
# {
# "Statement": [
# {
#       "Action": [
#         "sqs:ChangeMessageVisibility",
#         "sqs:ChangeMessageVisibilityBatch",
#         "sqs:DeleteMessage",
#         "sqs:DeleteMessageBatch",
#         "sqs:GetQueueAttributes",
#         "sqs:GetQueueUrl",
#         "sqs:ListQueues",
#         "sqs:ReceiveMessage"
#       ],
# ],
# "Effect": "Allow",
# "Resource": [
# "arn:aws:sqs:us-east-1:123456789012:Logstash"
# ]
# }
# ]
# }
#
# See http://aws.amazon.com/iam/ for more details on setting up AWS identities.
#
class LogStash::Inputs::SQS < LogStash::Inputs::Threadable
  include LogStash::PluginMixins::AwsConfig::V2

  # Ceiling (seconds) past which the exponential backoff stops growing;
  # retries continue forever at the capped sleep, the plugin never gives up.
  MAX_TIME_BEFORE_GIVING_UP = 60
  MAX_MESSAGES_TO_FETCH = 10 # Between 1-10 in the AWS-SDK doc
  # SQS message attribute carrying the epoch-milliseconds send time.
  SENT_TIMESTAMP = "SentTimestamp"
  # Attributes requested on every ReceiveMessage call.
  SQS_ATTRIBUTES = [SENT_TIMESTAMP]
  # Initial backoff sleep (seconds) and per-retry multiplier.
  BACKOFF_SLEEP_TIME = 1
  BACKOFF_FACTOR = 2
  DEFAULT_POLLING_FREQUENCY = 20

  config_name "sqs"

  default :codec, "json"

  config :additional_settings, :validate => :hash, :default => {}

  # Name of the SQS Queue name to pull messages from. Note that this is just the name of the queue, not the URL or ARN.
  config :queue, :validate => :string, :required => true

  # Account ID of the AWS account which owns the queue.
  config :queue_owner_aws_account_id, :validate => :string, :required => false

  # Name of the event field in which to store the SQS message ID
  config :id_field, :validate => :string

  # Name of the event field in which to store the SQS message MD5 checksum
  config :md5_field, :validate => :string

  # Name of the event field in which to store the SQS message Sent Timestamp
  config :sent_timestamp_field, :validate => :string

  # Polling frequency, default is 20 seconds
  config :polling_frequency, :validate => :number, :default => DEFAULT_POLLING_FREQUENCY

  attr_reader :poller

  # Logstash lifecycle hook: loads the AWS SDK and builds the queue poller.
  # Raises LogStash::ConfigurationError (via setup_queue) if the queue is
  # unreachable with the configured credentials.
  def register
    require "aws-sdk"
    @logger.info("Registering SQS input", :queue => @queue, :queue_owner_aws_account_id => @queue_owner_aws_account_id)
    setup_queue
  end

  # Resolves the configured queue name to its full queue URL, optionally
  # scoped to another account when `queue_owner_aws_account_id` is set.
  #
  # @param aws_sqs_client [Aws::SQS::Client]
  # @return [String] the queue URL
  def queue_url(aws_sqs_client)
    if @queue_owner_aws_account_id
      return aws_sqs_client.get_queue_url({:queue_name => @queue, :queue_owner_aws_account_id => @queue_owner_aws_account_id})[:queue_url]
    else
      return aws_sqs_client.get_queue_url(:queue_name => @queue)[:queue_url]
    end
  end

  # Builds the SQS client and QueuePoller stored in @poller.
  # The before_request hook throws :stop_polling — the QueuePoller's documented
  # shutdown signal — once Logstash asks the plugin to stop.
  def setup_queue
    aws_sqs_client = Aws::SQS::Client.new(aws_options_hash || {})
    poller = Aws::SQS::QueuePoller.new(queue_url(aws_sqs_client), :client => aws_sqs_client)
    poller.before_request { |stats| throw :stop_polling if stop? }
    @poller = poller
  rescue Aws::SQS::Errors::ServiceError, Seahorse::Client::NetworkingError => e
    @logger.error("Cannot establish connection to Amazon SQS", exception_details(e))
    raise LogStash::ConfigurationError, "Verify the SQS queue name and your credentials"
  end

  # Options passed to every poll: batch size, requested attributes, and the
  # long-poll wait time (driven by `polling_frequency`).
  def polling_options
    {
      :max_number_of_messages => MAX_MESSAGES_TO_FETCH,
      :attribute_names => SQS_ATTRIBUTES,
      :wait_time_seconds => @polling_frequency
    }
  end

  # Copies SQS message metadata onto the event, but only for the fields the
  # user explicitly configured. Returns the (mutated) event.
  def add_sqs_data(event, message)
    event.set(@id_field, message.message_id) if @id_field
    event.set(@md5_field, message.md5_of_body) if @md5_field
    event.set(@sent_timestamp_field, convert_epoch_to_timestamp(message.attributes[SENT_TIMESTAMP])) if @sent_timestamp_field
    event
  end

  # Decodes one SQS message body through the codec (JSON by default) and
  # pushes each resulting event — enriched and decorated — onto the pipeline.
  def handle_message(message, output_queue)
    @codec.decode(message.body) do |event|
      add_sqs_data(event, message)
      decorate(event)
      output_queue << event
    end
  end

  # Main input loop: polls SQS until shutdown, converting every received
  # message into pipeline events. Transient AWS errors are retried with
  # exponential backoff via run_with_backoff.
  def run(output_queue)
    @logger.debug("Polling SQS queue", :polling_options => polling_options)

    run_with_backoff do
      poller.poll(polling_options) do |messages, stats|
        break if stop?
        messages.each {|message| handle_message(message, output_queue) }
        @logger.debug("SQS Stats:", :request_count => stats.request_count,
                      :received_message_count => stats.received_message_count,
                      :last_message_received_at => stats.last_message_received_at) if @logger.debug?
      end
    end
  end

  private

  # Runs an AWS request inside a Ruby block with an exponential backoff in case
  # we experience a ServiceError.
  #
  # @param [Block] block Ruby code block to execute.
  def run_with_backoff(&block)
    sleep_time = BACKOFF_SLEEP_TIME
    begin
      block.call
    rescue Aws::SQS::Errors::ServiceError, Seahorse::Client::NetworkingError => e
      @logger.warn("SQS error ... retrying with exponential backoff", exception_details(e, sleep_time))
      sleep_time = backoff_sleep(sleep_time)
      retry # unbounded retry: the capped backoff means we poll forever
    end
  end

  # Sleeps for sleep_time seconds, then returns the next backoff interval:
  # doubled, until it exceeds MAX_TIME_BEFORE_GIVING_UP, after which it is
  # returned unchanged (growth stops but sleeping continues).
  def backoff_sleep(sleep_time)
    sleep(sleep_time)
    sleep_time > MAX_TIME_BEFORE_GIVING_UP ? sleep_time : sleep_time * BACKOFF_FACTOR
  end

  # Converts an epoch-milliseconds string (SQS SentTimestamp) to a
  # LogStash::Timestamp. NOTE(review): integer division truncates
  # sub-second precision — presumably acceptable here, but confirm before
  # relying on millisecond accuracy downstream.
  def convert_epoch_to_timestamp(time)
    LogStash::Timestamp.at(time.to_i / 1000)
  end

  # Builds a structured hash describing an exception for logging: queue name,
  # exception class/message, optional AWS error code, underlying cause for
  # networking errors, the pending backoff sleep, and (at debug level) the
  # backtrace.
  def exception_details(e, sleep_time = nil)
    details = { :queue => @queue, :exception => e.class, :message => e.message }
    details[:code] = e.code if e.is_a?(Aws::SQS::Errors::ServiceError) && e.code
    details[:cause] = e.original_error if e.respond_to?(:original_error) && e.original_error # Seahorse::Client::NetworkingError
    details[:sleep_time] = sleep_time if sleep_time
    details[:backtrace] = e.backtrace if @logger.debug?
    details
  end
end # class LogStash::Inputs::SQS