Skip to content

Commit

Permalink
Fix race conditions in TimeoutEnforcer by properly interrupting
Browse files Browse the repository at this point in the history
Previously we could see errors as in logstash-plugins#95 due to some very esoteric
race conditions where Thread#raise would raise outside of the rescue
context. This patch changes the mechanism to be setting Thread.interrupt
which is more robust.
  • Loading branch information
andrewvc committed Oct 13, 2016
1 parent 8a2e855 commit 7ede04b
Show file tree
Hide file tree
Showing 5 changed files with 21 additions and 20 deletions.
6 changes: 5 additions & 1 deletion CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,4 +1,8 @@
## 3.2.2
## 3.2.3
- No longer use 'trace' log level as it breaks rspec
- Fix race conditions in timeout enforcer

## 3.2.3
- Move one log message from info to debug to avoid noise

## 3.2.1
Expand Down
22 changes: 11 additions & 11 deletions lib/logstash/filters/grok/timeout_enforcer.rb
Original file line number Diff line number Diff line change
Expand Up @@ -14,24 +14,21 @@ def initialize(logger, timeout_nanos)

def grok_till_timeout(event, grok, field, value)
begin
thread = Thread.current
thread = java.lang.Thread.currentThread()
start_thread_groking(thread)
yield
rescue ::LogStash::Filters::Grok::TimeoutException => e
# These fields aren't present at the time the exception was raised
# so we add them here.
# We could store this metadata in the @threads_to_start_time hash
# but that'd come at a perf cost and this works just as well.
e.grok = grok
e.field = field
e.value = value
raise e
rescue InterruptedRegexpError => e
raise ::LogStash::Filters::Grok::TimeoutException.new(grok, field, value)
ensure
stop_thread_groking(thread)
# Clear any interrupts from any previous invocations that were not caught by Joni
thread.interrupted
end
end

def start_thread_groking(thread)
# Clear any interrupts from any previous invocations that were not caught by Joni
thread.interrupted
@timer_mutex.synchronize do
@threads_to_start_time[thread] = java.lang.System.nanoTime()
end
Expand All @@ -50,7 +47,10 @@ def cancel_timed_out!
elapsed = java.lang.System.nanoTime - start_time
if elapsed > @timeout_nanos
elapsed_millis = elapsed / 1000
thread.raise(::LogStash::Filters::Grok::TimeoutException.new(elapsed_millis))
thread.interrupt()
# Ensure that we never attempt to cancel this thread twice in the event
# of weird races
stop_thread_groking(thread)
end
end
end
Expand Down
6 changes: 2 additions & 4 deletions lib/logstash/filters/grok/timeout_exception.rb
Original file line number Diff line number Diff line change
@@ -1,8 +1,7 @@
class LogStash::Filters::Grok::TimeoutException < Exception
attr_accessor :elapsed_millis, :grok, :field, :value
attr_reader :grok, :field, :value

def initialize(elapsed_millis, grok=nil, field=nil, value=nil)
@elapsed_millis = elapsed_millis
def initialize(grok=nil, field=nil, value=nil)
@field = field
@value = value
@grok = grok
Expand All @@ -20,4 +19,3 @@ def trunc_value
end
end
end

3 changes: 1 addition & 2 deletions logstash-filter-grok.gemspec
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
Gem::Specification.new do |s|

s.name = 'logstash-filter-grok'
s.version = '3.2.2'
s.version = '3.2.3'
s.licenses = ['Apache License (2.0)']
s.summary = "Parse arbitrary text and structure it."
s.description = "This gem is a Logstash plugin required to be installed on top of the Logstash core pipeline using $LS_HOME/bin/logstash-plugin install gemname. This gem is not a stand-alone program"
Expand All @@ -27,4 +27,3 @@ Gem::Specification.new do |s|

s.add_development_dependency 'logstash-devutils'
end

4 changes: 2 additions & 2 deletions spec/filters/grok_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -412,14 +412,14 @@ def pattern_path(path)
filter {
grok {
match => {
message => "(.*a){30}"
message => "(.*a){40}"
}
timeout_millis => 100
}
}
CONFIG

sample "aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa" do
sample "aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa" do
expect(subject.get("tags")).to include("_groktimeout")
expect(subject.get("tags")).not_to include("_grokparsefailure")
end
Expand Down

0 comments on commit 7ede04b

Please sign in to comment.