diff --git a/CHANGELOG.md b/CHANGELOG.md index 3d04693..ee407dd 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,5 +1,9 @@ +## 3.0.10 + - Deps: unpin rufus scheduler [#20](https://github.com/logstash-plugins/logstash-output-cloudwatch/pull/20) + - Fix: an old undefined method error which would surface with load (as queue fills up) + ## 3.0.9 -- Fix: dropped usage of SHUTDOWN event deprecated since Logstash 5.0 [#18](https://github.com/logstash-plugins/logstash-output-cloudwatch/pull/18) + - Fix: dropped usage of SHUTDOWN event deprecated since Logstash 5.0 [#18](https://github.com/logstash-plugins/logstash-output-cloudwatch/pull/18) ## 3.0.8 - Docs: Set the default_codec doc attribute. diff --git a/lib/logstash/outputs/cloudwatch.rb b/lib/logstash/outputs/cloudwatch.rb index f59532d..307b740 100644 --- a/lib/logstash/outputs/cloudwatch.rb +++ b/lib/logstash/outputs/cloudwatch.rb @@ -3,6 +3,8 @@ require "logstash/namespace" require "logstash/plugin_mixins/aws_config" +require "rufus/scheduler" + # This output lets you aggregate and send metric data to AWS CloudWatch # # ==== Summary: @@ -154,28 +156,34 @@ class LogStash::Outputs::CloudWatch < LogStash::Outputs::Base # `add_field => [ "CW_dimensions", "prod" ]` config :field_dimensions, :validate => :string, :default => "CW_dimensions" + attr_reader :event_queue + public def register require "thread" - require "rufus/scheduler" require "aws-sdk" @cw = Aws::CloudWatch::Client.new(aws_options_hash) @event_queue = SizedQueue.new(@queue_size) @scheduler = Rufus::Scheduler.new - @job = @scheduler.every @timeframe do + @job = @scheduler.schedule_every @timeframe do @logger.debug("Scheduler Activated") publish(aggregate({})) end end # def register + # Rufus::Scheduler >= 3.4 moved the Time impl into a gem EoTime = ::EtOrbi::EoTime` + # Rufus::Scheduler 3.1 - 3.3 using it's own Time impl `Rufus::Scheduler::ZoTime` + RufusTimeImpl = defined?(Rufus::Scheduler::EoTime) ? Rufus::Scheduler::EoTime : + (defined?(Rufus::Scheduler::ZoTime) ? Rufus::Scheduler::ZoTime : ::Time) + public def receive(event) return unless (event.get(@field_metricname) || @metricname) if (@event_queue.length >= @event_queue.max) - @job.trigger + @job.trigger RufusTimeImpl.now @logger.warn("Posted to AWS CloudWatch ahead of schedule. If you see this often, consider increasing the cloudwatch queue_size option.") end diff --git a/logstash-output-cloudwatch.gemspec b/logstash-output-cloudwatch.gemspec index 15c3e9a..e61c535 100644 --- a/logstash-output-cloudwatch.gemspec +++ b/logstash-output-cloudwatch.gemspec @@ -1,7 +1,7 @@ Gem::Specification.new do |s| s.name = 'logstash-output-cloudwatch' - s.version = '3.0.9' + s.version = '3.0.10' s.licenses = ['Apache License (2.0)'] s.summary = "Aggregates and sends metric data to AWS CloudWatch" s.description = "This gem is a Logstash plugin required to be installed on top of the Logstash core pipeline using $LS_HOME/bin/logstash-plugin install gemname. This gem is not a stand-alone program" @@ -22,7 +22,7 @@ Gem::Specification.new do |s| # Gem dependencies s.add_runtime_dependency "logstash-core-plugin-api", ">= 1.60", "<= 2.99" s.add_runtime_dependency 'logstash-mixin-aws', '>= 1.0.0' - s.add_runtime_dependency 'rufus-scheduler', [ '~> 3.0.9' ] + s.add_runtime_dependency 'rufus-scheduler', '>= 3.0.9' s.add_development_dependency 'logstash-devutils' end diff --git a/spec/outputs/cloudwatch_spec.rb b/spec/outputs/cloudwatch_spec.rb index b98420c..96ab79b 100644 --- a/spec/outputs/cloudwatch_spec.rb +++ b/spec/outputs/cloudwatch_spec.rb @@ -1,18 +1,38 @@ require "logstash/devutils/rspec/spec_helper" -require "logstash/plugin" -require "logstash/json" +require "logstash/outputs/cloudwatch" describe "outputs/cloudwatch" do + let(:config) { { 'metricname' => 'foo' } } - output = LogStash::Plugin.lookup("output", "cloudwatch").new + subject(:plugin) { LogStash::Outputs::CloudWatch.new(config) } it "should register" do - expect {output.register}.to_not raise_error + expect { plugin.register }.to_not raise_error end it "should respond correctly to a receive call" do + plugin.register event = LogStash::Event.new - expect { output.receive(event) }.to_not raise_error + expect { plugin.receive(event) }.to_not raise_error + end + + context 'with queue_size' do + + let(:queue_size) { 100 } + + let(:config) { super().merge('queue_size' => queue_size) } + + it "triggers job ahead of time" do + plugin.register + event_queue = plugin.event_queue + allow( event_queue ).to receive(:length).and_return queue_size # emulate full queue + expect( plugin ).to receive(:publish) + + event = LogStash::Event.new + plugin.receive(event) + sleep 1.0 # allow scheduler to kick in + end + end end