Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Support SQS message user attributes #23

Draft
wants to merge 9 commits into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
34 changes: 21 additions & 13 deletions docs/input-sqs.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -103,6 +103,7 @@ This plugin supports the following configuration options plus the <<plugins-{typ
| <<plugins-{type}s-{plugin}-session_token>> |<<string,string>>|No
| <<plugins-{type}s-{plugin}-threads>> |<<number,number>>|No
| <<plugins-{type}s-{plugin}-use_aws_bundled_ca>> |<<boolean,boolean>>|No
| <<plugins-{type}s-{plugin}-user_attributes_field>> |<<string,string>>|No
|=======================================================================

Also see <<plugins-{type}s-{plugin}-common-options>> for a list of options supported by all
Expand All @@ -111,7 +112,7 @@ input plugins.
&nbsp;

[id="plugins-{type}s-{plugin}-access_key_id"]
===== `access_key_id`
===== `access_key_id`

* Value type is <<string,string>>
* There is no default value for this setting.
Expand Down Expand Up @@ -147,7 +148,7 @@ the connection to SQS. See full list in https://docs.aws.amazon.com/sdk-for-ruby
}

[id="plugins-{type}s-{plugin}-aws_credentials_file"]
===== `aws_credentials_file`
===== `aws_credentials_file`

* Value type is <<string,string>>
* There is no default value for this setting.
Expand All @@ -174,39 +175,39 @@ This is useful when connecting to S3 compatible services, but beware that these
guaranteed to work correctly with the AWS SDK.

[id="plugins-{type}s-{plugin}-id_field"]
===== `id_field`
===== `id_field`

* Value type is <<string,string>>
* There is no default value for this setting.

Name of the event field in which to store the SQS message ID

[id="plugins-{type}s-{plugin}-md5_field"]
===== `md5_field`
===== `md5_field`

* Value type is <<string,string>>
* There is no default value for this setting.

Name of the event field in which to store the SQS message MD5 checksum

[id="plugins-{type}s-{plugin}-polling_frequency"]
===== `polling_frequency`
===== `polling_frequency`

* Value type is <<number,number>>
* Default value is `20`

Polling frequency, default is 20 seconds

[id="plugins-{type}s-{plugin}-proxy_uri"]
===== `proxy_uri`
===== `proxy_uri`

* Value type is <<string,string>>
* There is no default value for this setting.

URI to proxy server if required

[id="plugins-{type}s-{plugin}-queue"]
===== `queue`
===== `queue`

* This is a required setting.
* Value type is <<string,string>>
Expand All @@ -215,15 +216,15 @@ URI to proxy server if required
Name of the SQS Queue name to pull messages from. Note that this is just the name of the queue, not the URL or ARN.

[id="plugins-{type}s-{plugin}-queue_owner_aws_account_id"]
===== `queue_owner_aws_account_id`
===== `queue_owner_aws_account_id`

* Value type is <<string,string>>
* There is no default value for this setting.

ID of the AWS account owning the queue if you want to use a https://docs.aws.amazon.com/AWSSimpleQueueService/latest/SQSDeveloperGuide/sqs-basic-examples-of-sqs-policies.html#grant-two-permissions-to-one-account[cross-account queue] with embedded policy. Note that AWS SDK only support numerical account ID and not account aliases.

[id="plugins-{type}s-{plugin}-region"]
===== `region`
===== `region`

* Value type is <<string,string>>
* Default value is `"us-east-1"`
Expand All @@ -249,35 +250,42 @@ See the https://docs.aws.amazon.com/STS/latest/APIReference/API_AssumeRole.html[
Session name to use when assuming an IAM role.

[id="plugins-{type}s-{plugin}-secret_access_key"]
===== `secret_access_key`
===== `secret_access_key`

* Value type is <<string,string>>
* There is no default value for this setting.

The AWS Secret Access Key

[id="plugins-{type}s-{plugin}-sent_timestamp_field"]
===== `sent_timestamp_field`
===== `sent_timestamp_field`

* Value type is <<string,string>>
* There is no default value for this setting.

Name of the event field in which to store the SQS message Sent Timestamp

[id="plugins-{type}s-{plugin}-session_token"]
===== `session_token`
===== `session_token`

* Value type is <<string,string>>
* There is no default value for this setting.

The AWS Session token for temporary credential

[id="plugins-{type}s-{plugin}-threads"]
===== `threads`
===== `threads`

* Value type is <<number,number>>
* Default value is `1`

[id="plugins-{type}s-{plugin}-user_attributes_field"]
===== `user_attributes_field`

* Value type is <<string,string>>
* Default value is `"user_attributes"`

Name of the event field in which to store the SQS message User Attributes

[id="plugins-{type}s-{plugin}-use_aws_bundled_ca"]
===== `use_aws_bundled_ca`
Expand Down
26 changes: 24 additions & 2 deletions lib/logstash/inputs/sqs.rb
Original file line number Diff line number Diff line change
Expand Up @@ -90,6 +90,9 @@ class LogStash::Inputs::SQS < LogStash::Inputs::Threadable
# Name of the event field in which to store the SQS message Sent Timestamp
config :sent_timestamp_field, :validate => :string

# Name of the event field in which to store the SQS message User Attributes
config :user_attributes_field, :validate => :string, :default => "user_attributes"

# Polling frequency, default is 20 seconds
config :polling_frequency, :validate => :number, :default => DEFAULT_POLLING_FREQUENCY

Expand Down Expand Up @@ -122,18 +125,37 @@ def setup_queue
end

def polling_options
{
{
:max_number_of_messages => MAX_MESSAGES_TO_FETCH,
:attribute_names => SQS_ATTRIBUTES,
:wait_time_seconds => @polling_frequency
}
end

def decode_sqs_user_attributes(attributes)
@logger.debug("Decoding user attributes", :user_attributes => attributes)
decoded_attributes = {}
attributes.each do |name, definition|
case definition.data_type
when "String"
attribute_value = definition.string_value
when "Binary"
attribute_value = definition.binary_value
when "Number"
attribute_value = BigDecimal.new(definition.string_value)
else
raise 'Unsupported SQS Message attribute data type'
end
decoded_attributes[name] = attribute_value
end
return decoded_attributes
end

def add_sqs_data(event, message)
event.set(@id_field, message.message_id) if @id_field
event.set(@md5_field, message.md5_of_body) if @md5_field
event.set(@sent_timestamp_field, convert_epoch_to_timestamp(message.attributes[SENT_TIMESTAMP])) if @sent_timestamp_field
event
event.set(@user_attributes_field, decode_sqs_user_attributes(message.message_attributes)) if !message.message_attributes.nil?
end

def handle_message(message, output_queue)
Expand Down
2 changes: 1 addition & 1 deletion logstash-integration-aws.gemspec
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ Gem::Specification.new do |s|
s.test_files = s.files.grep(%r{^(test|spec|features)/})


s.add_runtime_dependency "logstash-core-plugin-api", ">= 2.1.12", "<= 2.99"
s.add_runtime_dependency "logstash-core-plugin-api"
s.add_runtime_dependency "concurrent-ruby"
s.add_runtime_dependency "logstash-codec-json"
s.add_runtime_dependency "logstash-codec-plain"
Expand Down
19 changes: 11 additions & 8 deletions spec/inputs/sqs_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@

let(:input) { LogStash::Inputs::SQS.new(config) }
let(:decoded_message) { { "bonjour" => "awesome" } }
let(:encoded_message) { double("sqs_message", :body => LogStash::Json::dump(decoded_message)) }
let(:encoded_message) { double("sqs_message", :body => LogStash::Json::dump(decoded_message), :message_attributes => {}) }

subject { input }

Expand Down Expand Up @@ -110,8 +110,8 @@

# We have to make sure we create a bunch of events
# so we actually really try to stop the plugin.
#
# rspec's `and_yield` allow you to define a fix amount of possible
#
# rspec's `and_yield` allow you to define a fix amount of possible
# yielded values and doesn't allow you to create infinite loop.
# And since we are actually creating thread we need to make sure
# we have enough work to keep the thread working until we kill it..
Expand All @@ -136,14 +136,16 @@ def poll(polling_options = {})
let(:md5_of_body) { "dr strange" }
let(:sent_timestamp) { LogStash::Timestamp.new }
let(:epoch_timestamp) { (sent_timestamp.utc.to_f * 1000).to_i }
let(:message_attributes) { }

let(:id_field) { "my_id_field" }
let(:md5_field) { "my_md5_field" }
let(:sent_timestamp_field) { "my_sent_timestamp_field" }
let(:user_attributes_field) { "my_user_attributes_field" }

let(:message) do
double("message", :message_id => message_id, :md5_of_body => md5_of_body, :attributes => { LogStash::Inputs::SQS::SENT_TIMESTAMP => epoch_timestamp } )
end
double("message", :message_id => message_id, :md5_of_body => md5_of_body, :attributes => { LogStash::Inputs::SQS::SENT_TIMESTAMP => epoch_timestamp }, :message_attributes => message_attributes )
end

subject { input.add_sqs_data(event, message) }

Expand All @@ -156,7 +158,8 @@ def poll(polling_options = {})
"queue" => queue_name,
"id_field" => id_field,
"md5_field" => md5_field,
"sent_timestamp_field" => sent_timestamp_field
"sent_timestamp_field" => sent_timestamp_field,
"user_attributes_field" => user_attributes_field
}
end

Expand Down Expand Up @@ -198,7 +201,7 @@ def poll(polling_options = {})
end

context "receiving messages" do
before do
before do
expect(subject).to receive(:poller).and_return(mock_sqs).at_least(:once)
end

Expand Down Expand Up @@ -235,7 +238,7 @@ def poll(polling_options = {})
it "retry to fetch messages" do
# change the poller implementation to raise SQS errors.
had_error = false

# actually using the child of `Object` to do an expectation of `#sleep`
expect(subject).to receive(:sleep).with(LogStash::Inputs::SQS::BACKOFF_SLEEP_TIME)
expect(mock_sqs).to receive(:poll).with(anything()).at_most(2) do
Expand Down