Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix shutdown deadlock issue v14 #1010

Merged
merged 3 commits into from
May 31, 2016
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 7 additions & 3 deletions lib/fluent/plugin/buffer.rb
Original file line number Diff line number Diff line change
Expand Up @@ -219,19 +219,19 @@ def write(metadata_and_data, bulk: false, enqueue: false)
errors << e
end
end
operated_chunks.clear if errors.empty?

@stage_size += staged_bytesize

if errors.size > 0
log.warn "error occurs in committing chunks: only first one raised", errors: errors.map(&:class)
raise errors.first
end
rescue
ensure
operated_chunks.each do |chunk|
chunk.rollback rescue nil # nothing possible to do for #rollback failure
chunk.mon_exit rescue nil # this may raise ThreadError for chunks already committed
end
raise
end
end

Expand Down Expand Up @@ -365,6 +365,7 @@ def write_once(metadata, data, bulk: false, &block)
adding_bytesize = nil

chunk = synchronize { @stage[metadata] ||= generate_chunk(metadata) }
enqueue_list = []

chunk.synchronize do
# retry this method if chunk is already queued (between getting chunk and entering critical section)
Expand Down Expand Up @@ -402,7 +403,7 @@ def write_once(metadata, data, bulk: false, &block)
elsif bulk
# this metadata might be enqueued already by other threads
# but #enqueue_chunk does nothing in such case
enqueue_chunk(metadata)
enqueue_list << metadata
raise ShouldRetry
end
end
Expand All @@ -412,6 +413,9 @@ def write_once(metadata, data, bulk: false, &block)
write_step_by_step(metadata, data, data.size / 3, &block)
end
rescue ShouldRetry
enqueue_list.each do |metadata|
enqueue_chunk(metadata)
end
retry
end

Expand Down
3 changes: 2 additions & 1 deletion lib/fluent/plugin_helper/thread.rb
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,7 @@ def thread_create(title)
raise
ensure
unless thread_exit
log.warn "thread doesn't exit correctly (killed or other reason)", plugin: self.class, title: title, error: $!
log.warn "thread doesn't exit correctly (killed or other reason)", plugin: self.class, title: title, thread: ::Thread.current, error: $!
end
@_threads_mutex.synchronize do
@_threads.delete(::Thread.current.object_id)
Expand Down Expand Up @@ -132,6 +132,7 @@ def terminate
super
@_threads_mutex.synchronize{ @_threads.keys }.each do |obj_id|
thread = @_threads[obj_id]
log.warn "killing existing thead", thread: thread
thread.kill if thread
end
@_threads_mutex.synchronize{ @_threads.keys }.each do |obj_id|
Expand Down