diff --git a/CHANGELOG.md b/CHANGELOG.md index 888c911..1e9c639 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,4 +1,8 @@ -## 3.2.2 +## 3.2.3 + - No longer use 'trace' log level as it breaks rspec + - Fix race conditions in timeout enforcer + +## 3.2.3 - Move one log message from info to debug to avoid noise ## 3.2.1 diff --git a/lib/logstash/filters/grok/timeout_enforcer.rb b/lib/logstash/filters/grok/timeout_enforcer.rb index 3919149..1e05231 100644 --- a/lib/logstash/filters/grok/timeout_enforcer.rb +++ b/lib/logstash/filters/grok/timeout_enforcer.rb @@ -14,24 +14,21 @@ def initialize(logger, timeout_nanos) def grok_till_timeout(event, grok, field, value) begin - thread = Thread.current + thread = java.lang.Thread.currentThread() start_thread_groking(thread) yield - rescue ::LogStash::Filters::Grok::TimeoutException => e - # These fields aren't present at the time the exception was raised - # so we add them here. - # We could store this metadata in the @threads_to_start_time hash - # but that'd come at a perf cost and this works just as well. - e.grok = grok - e.field = field - e.value = value - raise e + rescue InterruptedRegexpError => e + raise ::LogStash::Filters::Grok::TimeoutException.new(grok, field, value) ensure stop_thread_groking(thread) + # Clear any interrupts from any previous invocations that were not caught by Joni + thread.interrupted end end def start_thread_groking(thread) + # Clear any interrupts from any previous invocations that were not caught by Joni + thread.interrupted @timer_mutex.synchronize do @threads_to_start_time[thread] = java.lang.System.nanoTime() end @@ -50,7 +47,10 @@ def cancel_timed_out! elapsed = java.lang.System.nanoTime - start_time if elapsed > @timeout_nanos elapsed_millis = elapsed / 1000 - thread.raise(::LogStash::Filters::Grok::TimeoutException.new(elapsed_millis)) + thread.interrupt() + # Ensure that we never attempt to cancel this thread twice in the event + # of weird races + stop_thread_groking(thread) end end end diff --git a/lib/logstash/filters/grok/timeout_exception.rb b/lib/logstash/filters/grok/timeout_exception.rb index 6b7c78c..803d108 100644 --- a/lib/logstash/filters/grok/timeout_exception.rb +++ b/lib/logstash/filters/grok/timeout_exception.rb @@ -1,8 +1,7 @@ class LogStash::Filters::Grok::TimeoutException < Exception - attr_accessor :elapsed_millis, :grok, :field, :value + attr_reader :grok, :field, :value - def initialize(elapsed_millis, grok=nil, field=nil, value=nil) - @elapsed_millis = elapsed_millis + def initialize(grok=nil, field=nil, value=nil) @field = field @value = value @grok = grok @@ -20,4 +19,3 @@ def trunc_value end end end - diff --git a/logstash-filter-grok.gemspec b/logstash-filter-grok.gemspec index 3bb6c05..7bdb2b8 100644 --- a/logstash-filter-grok.gemspec +++ b/logstash-filter-grok.gemspec @@ -1,7 +1,7 @@ Gem::Specification.new do |s| s.name = 'logstash-filter-grok' - s.version = '3.2.2' + s.version = '3.2.3' s.licenses = ['Apache License (2.0)'] s.summary = "Parse arbitrary text and structure it." s.description = "This gem is a Logstash plugin required to be installed on top of the Logstash core pipeline using $LS_HOME/bin/logstash-plugin install gemname. This gem is not a stand-alone program" @@ -27,4 +27,3 @@ Gem::Specification.new do |s| s.add_development_dependency 'logstash-devutils' end - diff --git a/spec/filters/grok_spec.rb b/spec/filters/grok_spec.rb index aada2db..d2e9d60 100644 --- a/spec/filters/grok_spec.rb +++ b/spec/filters/grok_spec.rb @@ -412,14 +412,14 @@ def pattern_path(path) filter { grok { match => { - message => "(.*a){30}" + message => "(.*a){40}" } timeout_millis => 100 } } CONFIG - sample "aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa" do + sample "aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa" do expect(subject.get("tags")).to include("_groktimeout") expect(subject.get("tags")).not_to include("_grokparsefailure") end