diff --git a/lib/fluent/plugin/buffer.rb b/lib/fluent/plugin/buffer.rb index c151c6c741..6de12cf099 100644 --- a/lib/fluent/plugin/buffer.rb +++ b/lib/fluent/plugin/buffer.rb @@ -365,6 +365,7 @@ def write_once(metadata, data, bulk: false, &block) adding_bytesize = nil chunk = synchronize { @stage[metadata] ||= generate_chunk(metadata) } + enqueue_list = [] chunk.synchronize do # retry this method if chunk is already queued (between getting chunk and entering critical section) @@ -402,7 +403,7 @@ def write_once(metadata, data, bulk: false, &block) elsif bulk # this metadata might be enqueued already by other threads # but #enqueue_chunk does nothing in such case - enqueue_chunk(metadata) + enqueue_list << metadata raise ShouldRetry end end @@ -412,6 +413,9 @@ def write_once(metadata, data, bulk: false, &block) write_step_by_step(metadata, data, data.size / 3, &block) end rescue ShouldRetry + enqueue_list.each do |metadata| + enqueue_chunk(metadata) + end retry end