Skip to content

Commit

Permalink
Merge pull request #1396 from fluent/fix-to-exit-during-buffer-queue-…
Browse files Browse the repository at this point in the history
…is-blocking

fix to break infinite-loop when shutdown goes on
  • Loading branch information
tagomoris authored Jan 5, 2017
2 parents c121222 + d3a77e6 commit 9068e0f
Show file tree
Hide file tree
Showing 3 changed files with 30 additions and 1 deletion.
17 changes: 17 additions & 0 deletions example/in_dummy_blocks.conf
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
<source>
@type dummy
tag dummy
rate 100
dummy {"message":"yaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaay"}
</source>

<match dummy>
@type null
never_flush true
<buffer>
@type memory
overflow_action block
chunk_limit_size 1k
total_limit_size 2k
</buffer>
</match>
6 changes: 6 additions & 0 deletions lib/fluent/plugin/out_null.rb
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,9 @@ class NullOutput < Output
# This plugin is for tests of non-buffered/buffered plugins
Fluent::Plugin.register_output('null', self)

desc "The parameter for testing to simulate output plugin which never succeed to flush."
config_param :never_flush, :bool, default: false

config_section :buffer do
config_set_default :chunk_keys, ['tag']
config_set_default :flush_at_shutdown, true
Expand All @@ -44,16 +47,19 @@ def initialize
end

def process(tag, es)
raise "failed to flush" if @never_flush
# Do nothing
end

def write(chunk)
raise "failed to flush" if @never_flush
if @feed_proc
@feed_proc.call(chunk)
end
end

def try_write(chunk)
raise "failed to flush" if @never_flush
if @feed_proc
@feed_proc.call(chunk)
end
Expand Down
8 changes: 7 additions & 1 deletion lib/fluent/plugin/output.rb
Original file line number Diff line number Diff line change
Expand Up @@ -782,13 +782,19 @@ def write_guard(&block)
begin
block.call
rescue Fluent::Plugin::Buffer::BufferOverflowError
log.warn "failed to write data into buffer by buffer overflow"
log.warn "failed to write data into buffer by buffer overflow", action: @buffer_config.overflow_action
case @buffer_config.overflow_action
when :throw_exception
raise
when :block
log.debug "buffer.write is now blocking"
until @buffer.storable?
if self.stopped?
log.error "breaking block behavior to shutdown Fluentd"
# to break infinite loop to exit Fluentd process
raise
end
log.trace "sleeping until buffer can store more data"
sleep 1
end
log.debug "retrying buffer.write after blocked operation"
Expand Down

0 comments on commit 9068e0f

Please sign in to comment.