Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Adds 'additional_settings' option to configure the underlying AWS client #61

Merged
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,3 +1,6 @@
## 3.2.0
andsel marked this conversation as resolved.
Show resolved Hide resolved
- Feature: Add `additional_settings` option to fine-grain configuration of AWS client [#61](https://github.com/logstash-plugins/logstash-input-sqs/pull/61)

## 3.2.0
- Feature: Add `queue_owner_aws_account_id` parameter for cross-account queues [#60](https://github.com/logstash-plugins/logstash-input-sqs/pull/60)

Expand Down
23 changes: 23 additions & 0 deletions docs/index.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,7 @@ This plugin supports the following configuration options plus the <<plugins-{typ
|=======================================================================
|Setting |Input type|Required
| <<plugins-{type}s-{plugin}-access_key_id>> |<<string,string>>|No
| <<plugins-{type}s-{plugin}-additional_settings>> |<<hash,hash>>|No
| <<plugins-{type}s-{plugin}-aws_credentials_file>> |<<string,string>>|No
| <<plugins-{type}s-{plugin}-endpoint>> |<<string,string>>|No
| <<plugins-{type}s-{plugin}-id_field>> |<<string,string>>|No
Expand Down Expand Up @@ -121,6 +122,28 @@ This plugin uses the AWS SDK and supports several ways to get credentials, which
4. Environment variables `AMAZON_ACCESS_KEY_ID` and `AMAZON_SECRET_ACCESS_KEY`
5. IAM Instance Profile (available when running inside EC2)

[id="plugins-{type}s-{plugin}-additional_settings"]
===== `additional_settings`

* Value type is <<hash,hash>>
* Default value is `{}`

Key-value pairs of settings and corresponding values used to parametrize
the connection to SQS. See full list in https://docs.aws.amazon.com/sdk-for-ruby/v2/api/Aws/SQS/Client.html[the AWS SDK documentation]. Example:

[source,ruby]
input {
sqs {
access_key_id => "1234"
secret_access_key => "secret"
queue => "logstash-test-queue"
additional_settings => {
force_path_style => true
follow_redirects => false
}
}
}

[id="plugins-{type}s-{plugin}-aws_credentials_file"]
===== `aws_credentials_file`

Expand Down
24 changes: 23 additions & 1 deletion lib/logstash/inputs/sqs.rb
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,8 @@ class LogStash::Inputs::SQS < LogStash::Inputs::Threadable

default :codec, "json"

config :additional_settings, :validate => :hash, :default => {}

# Name of the SQS Queue name to pull messages from. Note that this is just the name of the queue, not the URL or ARN.
config :queue, :validate => :string, :required => true

Expand Down Expand Up @@ -116,7 +118,8 @@ def queue_url(aws_sqs_client)
end

def setup_queue
aws_sqs_client = Aws::SQS::Client.new(aws_options_hash)
options = symbolized_settings.merge(aws_options_hash || {})
aws_sqs_client = Aws::SQS::Client.new(options)
@poller = Aws::SQS::QueuePoller.new(queue_url(aws_sqs_client), :client => aws_sqs_client)
rescue Aws::SQS::Errors::ServiceError, Seahorse::Client::NetworkingError => e
@logger.error("Cannot establish connection to Amazon SQS", exception_details(e))
Expand Down Expand Up @@ -195,4 +198,23 @@ def exception_details(e, sleep_time = nil)
details
end

def symbolized_settings
@symbolized_settings ||= symbolize_keys_and_cast_true_false(@additional_settings)
end

def symbolize_keys_and_cast_true_false(hash)
case hash
when Hash
symbolized = {}
hash.each { |key, value| symbolized[key.to_sym] = symbolize_keys_and_cast_true_false(value) }
symbolized
when 'true'
true
when 'false'
false
else
hash
end
end

end # class LogStash::Inputs::SQS
4 changes: 2 additions & 2 deletions logstash-input-sqs.gemspec
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
Gem::Specification.new do |s|
s.name = 'logstash-input-sqs'
s.version = '3.2.0'
s.licenses = ['Apache License (2.0)']
s.version = '3.3.0'
s.licenses = ['Apache-2.0']
s.summary = "Pulls events from an Amazon Web Services Simple Queue Service queue"
s.description = "This gem is a Logstash plugin required to be installed on top of the Logstash core pipeline using $LS_HOME/bin/logstash-plugin install gemname. This gem is not a stand-alone program"
s.authors = ["Elastic"]
Expand Down
36 changes: 36 additions & 0 deletions spec/inputs/sqs_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,42 @@
end
end

describe "additional_settings" do
context "supported settings" do
let(:config) {
{
"additional_settings" => { "force_path_style" => 'true', "ssl_verify_peer" => 'false', "profile" => 'logstash' },
"queue" => queue_name
}
}

it 'should instantiate Aws::SQS clients with force_path_style set' do
expect(Aws::SQS::Client).to receive(:new).and_return(mock_sqs)
# mock a remote call to retrieve the queue URL
expect(mock_sqs).to receive(:get_queue_url).with({ :queue_name => queue_name }).and_return({:queue_url => queue_url })
expect(subject).to receive(:symbolized_settings) do |arg|
expect(arg).to include({:force_path_style => true, :ssl_verify_peer => false, :profile => 'logstash'})
end.and_call_original

expect { subject.register }.not_to raise_error
end
end

context "unsupported settings" do
let(:config) {
{
"additional_settings" => { "stub_responses" => 'true', "invalid_option" => "invalid" },
"queue" => queue_name
}
}

it 'must fail with ArgumentError' do
expect {subject.register}.to raise_error(ArgumentError, /invalid_option/)
end
end

end

context "when interrupting the plugin" do
before do
expect(Aws::SQS::Client).to receive(:new).and_return(mock_sqs)
Expand Down