Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

buffer: Add queued_chunks_limit_size to control the number of queued chunks #1916

Merged
merged 3 commits into from
Apr 3, 2018
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 9 additions & 0 deletions lib/fluent/plugin/buffer.rb
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,9 @@ class BufferChunkOverflowError < BufferError; end # A record size is larger than
# if chunk size (or records) is 95% or more after #write, then that chunk will be enqueued
config_param :chunk_full_threshold, :float, default: DEFAULT_CHUNK_FULL_THRESHOLD

desc 'The max number of queued chunks.'
# nil (the default) disables the limit entirely — #queue_full? is never
# truthy when this is unset, so enqueueing proceeds without a cap.
config_param :queued_chunks_limit_size, :integer, default: nil

desc 'Compress buffered data.'
config_param :compress, :enum, list: [:text, :gzip], default: :text

Expand Down Expand Up @@ -367,6 +370,10 @@ def write(metadata_and_data, format: nil, size: nil, enqueue: false)
end
end

# True when the number of enqueued chunks has reached the configured
# queued_chunks_limit_size. Falsy when no limit is configured (default nil).
def queue_full?
  limit = @queued_chunks_limit_size
  limit && synchronize { @queue.size } >= limit
end

# Total number of records held across all enqueued chunks.
def queued_records
  synchronize { @queue.sum(&:size) }
end
Expand Down Expand Up @@ -426,6 +433,7 @@ def enqueue_all

if block_given?
synchronize{ @stage.keys }.each do |metadata|
return if queue_full?
# NOTE: The following line might cause data race depending on Ruby implementations except CRuby
# cf. https://github.com/fluent/fluentd/pull/1721#discussion_r146170251
chunk = @stage[metadata]
Expand All @@ -435,6 +443,7 @@ def enqueue_all
end
else
synchronize{ @stage.keys }.each do |metadata|
return if queue_full?
enqueue_chunk(metadata)
end
end
Expand Down
62 changes: 62 additions & 0 deletions test/plugin/test_output_as_buffered_retries.rb
Original file line number Diff line number Diff line change
Expand Up @@ -412,6 +412,68 @@ def get_log_time(msg, logs)
assert{ @i.buffer.stage.size == 1 }
assert{ chunks.all?{|c| c.empty? } }
end

# Regression test for queued_chunks_limit_size: with a #write callback that
# always raises, flushed chunks fail and are retried, so they stay in the
# queue. The buffer must stop enqueueing staged chunks once the queue holds
# queued_chunks_limit_size (here 2) chunks, keeping new data staged instead.
test 'output plugin limits queued chunks via queued_chunks_limit_size' do
chunk_key = 'tag'
hash = {
'flush_interval' => 1,
'flush_thread_burst_interval' => 0.1,
'retry_randomize' => false,
'retry_max_times' => 7,
'queued_chunks_limit_size' => 2,
}
@i.configure(config_element('ROOT','',{},[config_element('buffer',chunk_key,hash)]))
@i.register(:prefer_buffered_processing) { true }
@i.register(:format) { |tag,time,record| [tag,time.to_i,record].to_json + "\n" }
# every flush fails on purpose, forcing chunks back into the retry queue
@i.register(:write) { |chunk| raise "yay, your #write must fail" }
@i.start
@i.after_start

@i.interrupt_flushes

now = Time.parse('2016-04-13 18:33:30 -0700')
Timecop.freeze(now)

@i.emit_events("test.tag.1", dummy_event_stream())

now = Time.parse('2016-04-13 18:33:31 -0700')
Timecop.freeze(now)

@i.emit_events("test.tag.2", dummy_event_stream())

@i.enqueue_thread_wait
@i.flush_thread_wakeup
# wait until at least one flush attempt has run and failed
waiting(4) { Thread.pass until @i.write_count > 0 && @i.num_errors > 0 }

assert { @i.buffer.queue.size > 0 }
assert { @i.buffer.queue.first.metadata.tag == 'test.tag.1' }

assert { @i.write_count > 0 }
assert { @i.num_errors > 0 }

prev_write_count = @i.write_count
prev_num_errors = @i.num_errors

# NOTE(review): 'chunks' is never referenced below — candidate for removal
chunks = @i.buffer.queue.dup

20.times do |i| # large times enough
now = @i.next_flush_time

Timecop.freeze(now)
@i.enqueue_thread_wait
@i.flush_thread_wakeup
waiting(4) { Thread.pass until @i.write_count > prev_write_count && @i.num_errors > prev_num_errors }

# new events keep arriving, but the queue must never exceed the limit of 2
@i.emit_events("test.tag.1", dummy_event_stream())
assert { @i.buffer.queue.size <= 2 }
assert { @i.buffer.stage.size == 1 } # all new data is stored into staged chunk

break if @i.buffer.queue.size == 0

prev_write_count = @i.write_count
prev_num_errors = @i.num_errors
end
end
end

sub_test_case 'bufferd output for retries with periodical retry' do
Expand Down