From 88b372e17592e17386541054b74861338ea5e374 Mon Sep 17 00:00:00 2001 From: TAGOMORI Satoshi Date: Fri, 22 Apr 2016 16:38:58 +0900 Subject: [PATCH 1/5] Event streams should return # of records * to enable limits by # of records for buffer chunks * to be able to visualize data flow in fluentd --- lib/fluent/event.rb | 28 +++++++++++++++++++++++++--- 1 file changed, 25 insertions(+), 3 deletions(-) diff --git a/lib/fluent/event.rb b/lib/fluent/event.rb index f1797a3d1b..09e226e992 100644 --- a/lib/fluent/event.rb +++ b/lib/fluent/event.rb @@ -21,6 +21,10 @@ class EventStream include Enumerable include MessagePackFactory::Mixin + def records + raise NotImplementedError, "DO NOT USE THIS CLASS directly." + end + def repeatable? false end @@ -29,7 +33,8 @@ def each(&block) raise NotImplementedError, "DO NOT USE THIS CLASS directly." end - def to_msgpack_stream + def to_msgpack_stream(time_int: false) + return to_msgpack_stream_forced_integer if time_int out = msgpack_packer each {|time,record| out.write([time,record]) @@ -57,6 +62,10 @@ def dup OneEventStream.new(@time, @record.dup) end + def records + 1 + end + def repeatable? true end @@ -81,6 +90,10 @@ def dup ArrayEventStream.new(entries) end + def records + @entries.size + end + def repeatable? true end @@ -102,7 +115,7 @@ def each(&block) # # Use this class as below, in loop of data-enumeration: # 1. initialize blank stream: - # streams[tag] ||= MultiEventStream + # streams[tag] ||= MultiEventStream.new # 2. add events # stream[tag].add(time, record) class MultiEventStream < EventStream @@ -119,6 +132,10 @@ def dup es end + def records + @time_array.size + end + def add(time, record) @time_array << time @record_array << record @@ -144,8 +161,13 @@ def each(&block) class MessagePackEventStream < EventStream # Keep cached_unpacker argument for existence plugins - def initialize(data, cached_unpacker = nil) + def initialize(data, records = 0, cached_unpacker = nil) @data = data + @records = records + end + + def records + @records end def repeatable? From 6596c9a7af1832f6d4bc94026bd567d29861b056 Mon Sep 17 00:00:00 2001 From: TAGOMORI Satoshi Date: Fri, 22 Apr 2016 16:40:56 +0900 Subject: [PATCH 2/5] add API to append(concat) whole event stream to chunks at once * it improves performance * it's needed to implement output plugins like ObjectBufferedOutput of v0.12 --- lib/fluent/plugin/buffer.rb | 53 ++++++- lib/fluent/plugin/buffer/chunk.rb | 5 + lib/fluent/plugin/buffer/file_chunk.rb | 10 ++ lib/fluent/plugin/buffer/memory_chunk.rb | 12 +- test/plugin/test_buffer.rb | 184 ++++++++++++++++++++++- test/plugin/test_buffer_file_chunk.rb | 74 +++++++++ test/plugin/test_buffer_memory_chunk.rb | 72 +++++++++ 7 files changed, 397 insertions(+), 13 deletions(-) diff --git a/lib/fluent/plugin/buffer.rb b/lib/fluent/plugin/buffer.rb index 38fb4a1db0..b7f9a43b9d 100644 --- a/lib/fluent/plugin/buffer.rb +++ b/lib/fluent/plugin/buffer.rb @@ -180,7 +180,7 @@ def emit(metadata, data, force: false) chunk.synchronize do begin chunk.append(data) - if !size_over?(chunk) || force + if !chunk_size_over?(chunk) || force chunk.commit stored = true @stage_size += (chunk.size - original_size) @@ -198,6 +198,49 @@ def emit(metadata, data, force: false) emit_step_by_step(metadata, data) end + def emit_bulk(metadata, bulk, records) + return if bulk.nil? || bulk.empty? + raise BufferOverflowError unless storable? 
+
+      stored = false
+      synchronize do # critical section for buffer (stage/queue)
+        until stored
+          chunk = @stage[metadata]
+          unless chunk
+            chunk = @stage[metadata] = generate_chunk(metadata)
+          end
+
+          chunk.synchronize do # critical section for chunk (chunk append/commit/rollback)
+            begin
+              empty_chunk = chunk.empty?
+              chunk.concat(bulk, records)
+
+              if chunk_size_over?(chunk)
+                if empty_chunk
+                  log.warn "chunk bytes limit exceeded for a bulk event stream: #{bulk.bytesize} bytes"
+                else
+                  chunk.rollback
+                  enqueue_chunk(metadata)
+                  next
+                end
+              end
+
+              chunk.commit
+              stored = true
+              @stage_size += bulk.bytesize
+              if chunk_size_full?(chunk)
+                enqueue_chunk(metadata)
+              end
+            rescue
+              chunk.rollback
+              raise
+            end
+          end
+        end
+      end
+      nil
+    end
+
     def queued_records
       synchronize { @queue.reduce(0){|r, chunk| r + chunk.records } }
     end
@@ -310,10 +353,14 @@ def clear_queue!
       end
     end
 
-    def size_over?(chunk)
+    def chunk_size_over?(chunk)
       chunk.size > @chunk_bytes_limit || (@chunk_records_limit && chunk.records > @chunk_records_limit)
     end
 
+    def chunk_size_full?(chunk)
+      chunk.size >= @chunk_bytes_limit || (@chunk_records_limit && chunk.records >= @chunk_records_limit)
+    end
+
     def emit_step_by_step(metadata, data)
       attempt_records = data.size / 3
@@ -336,7 +383,7 @@ def emit_step_by_step(metadata, data)
           attempt = data.slice(0, attempt_records)
           chunk.append(attempt)
 
-          if size_over?(chunk)
+          if chunk_size_over?(chunk)
             chunk.rollback
 
             if attempt_records <= MINIMUM_APPEND_ATTEMPT_RECORDS
diff --git a/lib/fluent/plugin/buffer/chunk.rb b/lib/fluent/plugin/buffer/chunk.rb
index e094d22075..5780e376ff 100644
--- a/lib/fluent/plugin/buffer/chunk.rb
+++ b/lib/fluent/plugin/buffer/chunk.rb
@@ -64,6 +64,11 @@ def append(data)
       raise NotImplementedError, "Implement this method in child class"
     end
 
+    # for event streams which are already packed or compressed (and should not be unpacked/decompressed)
+    def concat(bulk, records)
+      raise NotImplementedError, "Implement this method in child class"
+    end
+
     def commit
       raise NotImplementedError, "Implement this method in child class"
     end
diff --git a/lib/fluent/plugin/buffer/file_chunk.rb b/lib/fluent/plugin/buffer/file_chunk.rb
index 6f4615543c..f9f4e78789 100644
--- a/lib/fluent/plugin/buffer/file_chunk.rb
+++ b/lib/fluent/plugin/buffer/file_chunk.rb
@@ -74,6 +74,16 @@ def append(data)
       true
     end
 
+    def concat(bulk, records)
+      raise "BUG: appending to non-staged chunk, now '#{@state}'" unless @state == :staged
+
+      bulk.force_encoding(Encoding::ASCII_8BIT)
+      @chunk.write bulk
+      @adding_bytes += bulk.bytesize
+      @adding_records += records
+      true
+    end
+
     def commit
       write_metadata # this should be at first: of course, this operation may fail
diff --git a/lib/fluent/plugin/buffer/memory_chunk.rb b/lib/fluent/plugin/buffer/memory_chunk.rb
index 7368b927c2..a8589d2b6a 100644
--- a/lib/fluent/plugin/buffer/memory_chunk.rb
+++ b/lib/fluent/plugin/buffer/memory_chunk.rb
@@ -22,20 +22,28 @@ class Buffer
     class MemoryChunk < Chunk
       def initialize(metadata)
         super
-        @chunk = ''.force_encoding('ASCII-8BIT')
+        @chunk = ''.force_encoding(Encoding::ASCII_8BIT)
         @chunk_bytes = 0
         @adding_bytes = 0
         @adding_records = 0
       end
 
       def append(data)
-        adding = data.join.force_encoding('ASCII-8BIT')
+        adding = data.join.force_encoding(Encoding::ASCII_8BIT)
         @chunk << adding
         @adding_bytes += adding.bytesize
         @adding_records += data.size
         true
       end
 
+      def concat(bulk, records)
+        bulk.force_encoding(Encoding::ASCII_8BIT)
+        @chunk << bulk
+        @adding_bytes += bulk.bytesize
+        @adding_records += records
+        true
+      end
+
       def commit
         @records += @adding_records
        @chunk_bytes += @adding_bytes
diff --git a/test/plugin/test_buffer.rb b/test/plugin/test_buffer.rb
index c5930a02be..ae6c813ca0 100644
--- a/test/plugin/test_buffer.rb
+++ b/test/plugin/test_buffer.rb
@@ -26,6 +26,10 @@ def append(data)
       @append_count += 1
       super
     end
+    def concat(data, records)
+      @append_count += 1
+      super
+    end
     def rollback
       super
       @rollbacked = true
@@ -610,6 +614,141 @@ def create_chunk(metadata, data)
       assert target_chunk.rollbacked
       assert_equal row * 8, target_chunk.read
     end
+
+    test '#emit_bulk returns immediately if the argument data is nil or an empty string' do
+      assert_equal [@dm0,@dm1,@dm1], @p.queue.map(&:metadata)
+      assert_equal [@dm2,@dm3], @p.stage.keys
+
+      m = @p.metadata(timekey: Time.parse('2016-04-11 16:40:00 +0000').to_i)
+
+      @p.emit_bulk(m, '', 0)
+
+      assert_equal [@dm0,@dm1,@dm1], @p.queue.map(&:metadata)
+      assert_equal [@dm2,@dm3], @p.stage.keys
+    end
+
+    test '#emit_bulk raises BufferOverflowError if buffer is not storable' do
+      @p.stage_size = 256 * 1024 * 1024
+      @p.queue_size = 256 * 1024 * 1024
+
+      m = @p.metadata(timekey: Time.parse('2016-04-11 16:40:00 +0000').to_i)
+
+      assert_raise Fluent::Plugin::Buffer::BufferOverflowError do
+        @p.emit_bulk(m, "x" * 256, 1)
+      end
+    end
+
+    test '#emit_bulk stores data into an existing chunk with the specified metadata' do
+      assert_equal [@dm0,@dm1,@dm1], @p.queue.map(&:metadata)
+      assert_equal [@dm2,@dm3], @p.stage.keys
+
+      dm3data = @p.stage[@dm3].read.dup
+      prev_stage_size = @p.stage_size
+
+      assert_equal 1, @p.stage[@dm3].append_count
+
+      @p.emit_bulk(@dm3, ("x"*256 + "y"*256 + "z"*256), 3)
+
+      assert_equal 2, @p.stage[@dm3].append_count
+      assert_equal (dm3data + ("x" * 256) + ("y" * 256) + ("z" * 256)), @p.stage[@dm3].read
+      assert_equal (prev_stage_size + 768), @p.stage_size
+
+      assert_equal [@dm0,@dm1,@dm1], @p.queue.map(&:metadata)
+      assert_equal [@dm2,@dm3], @p.stage.keys
+    end
+
+    test '#emit_bulk creates a new chunk and stores data into it if there is no chunk for the specified metadata' do
+      assert_equal 8 * 1024 * 1024, @p.chunk_bytes_limit
+
+      assert_equal [@dm0,@dm1,@dm1], @p.queue.map(&:metadata)
+      assert_equal [@dm2,@dm3], @p.stage.keys
+
+      m = @p.metadata(timekey: Time.parse('2016-04-11 16:40:00 +0000').to_i)
+
+      row = "x" * 1024 * 1024
+      row_half = "x" * 1024 * 512
+      @p.emit_bulk(m, row*7 + row_half, 8)
+
+      assert_equal [@dm0,@dm1,@dm1], @p.queue.map(&:metadata)
+      assert_equal [@dm2,@dm3,m], @p.stage.keys
+      assert_equal 1, @p.stage[m].append_count
+    end
+
+    test '#emit_bulk tries to enqueue and store data into a new chunk if the existing chunk does not have space for the bulk' do
+      assert_equal 8 * 1024 * 1024, @p.chunk_bytes_limit
+
+      assert_equal [@dm0,@dm1,@dm1], @p.queue.map(&:metadata)
+      assert_equal [@dm2,@dm3], @p.stage.keys
+
+      m = @p.metadata(timekey: Time.parse('2016-04-11 16:40:00 +0000').to_i)
+
+      row = "x" * 1024 * 1024
+      row_half = "x" * 1024 * 512
+      @p.emit_bulk(m, row*7 + row_half, 8)
+
+      assert_equal [@dm0,@dm1,@dm1], @p.queue.map(&:metadata)
+      assert_equal [@dm2,@dm3,m], @p.stage.keys
+      assert_equal 1, @p.stage[m].append_count
+
+      @p.emit_bulk(m, row, 1)
+
+      assert_equal [@dm0,@dm1,@dm1,m], @p.queue.map(&:metadata)
+      assert_equal [@dm2,@dm3,m], @p.stage.keys
+      assert_equal 1, @p.stage[m].append_count
+      assert_equal 1024*1024, @p.stage[m].size
+      assert_equal 2, @p.queue.last.append_count # 1 -> emit (2) -> rollback&enqueue
+      assert @p.queue.last.rollbacked
+    end
+
+    test '#emit_bulk enqueues chunk if it is already full after adding bulk data' do
+      assert_equal 8 * 1024 * 1024, @p.chunk_bytes_limit
+
+      assert_equal [@dm0,@dm1,@dm1], @p.queue.map(&:metadata)
+      assert_equal [@dm2,@dm3], @p.stage.keys
+
+      m = @p.metadata(timekey: Time.parse('2016-04-11 16:40:00 +0000').to_i)
+
+      row = "x" * 1024 * 1024
+      @p.emit_bulk(m, row * 8, 8)
+
+      assert_equal [@dm0,@dm1,@dm1,m], @p.queue.map(&:metadata)
+      assert_equal [@dm2,@dm3], @p.stage.keys
+      assert_equal 1, @p.queue.last.append_count
+    end
+
+    test '#emit_bulk rolls back if commit raises an error' do
+      assert_equal [@dm0,@dm1,@dm1], @p.queue.map(&:metadata)
+      assert_equal [@dm2,@dm3], @p.stage.keys
+
+      m = @p.metadata(timekey: Time.parse('2016-04-11 16:40:00 +0000').to_i)
+
+      row = "x" * 1024
+      row_half = "x" * 512
+      @p.emit_bulk(m, row * 7 + row_half, 8)
+
+      assert_equal [@dm0,@dm1,@dm1], @p.queue.map(&:metadata)
+      assert_equal [@dm2,@dm3,m], @p.stage.keys
+
+      target_chunk = @p.stage[m]
+
+      assert_equal 1, target_chunk.append_count
+      assert !target_chunk.rollbacked
+
+      (class << target_chunk; self; end).module_eval do
+        define_method(:commit){ raise "yay" }
+      end
+
+      assert_raise "yay" do
+        @p.emit_bulk(m, row, 1)
+      end
+
+      assert_equal [@dm0,@dm1,@dm1], @p.queue.map(&:metadata)
+      assert_equal [@dm2,@dm3,m], @p.stage.keys
+
+      assert_equal 2, target_chunk.append_count
+      assert target_chunk.rollbacked
+      assert_equal row * 7 + row_half, target_chunk.read
+    end
   end
 
   sub_test_case 'with configuration for test with lower limits' do
@@ -658,17 +797,30 @@ def create_chunk(metadata, data)
       assert !@p.storable?
     end
 
-    test '#size_over? returns false if chunk size is bigger than limit' do
+    test '#chunk_size_over? returns true if chunk size is bigger than limit' do
       m = create_metadata(Time.parse('2016-04-11 16:40:00 +0000').to_i)
 
       c1 = create_chunk(m, ["a" * 128] * 8)
-      assert !@p.size_over?(c1)
+      assert !@p.chunk_size_over?(c1)
 
       c2 = create_chunk(m, ["a" * 128] * 9)
-      assert @p.size_over?(c2)
+      assert @p.chunk_size_over?(c2)
 
       c3 = create_chunk(m, ["a" * 128] * 8 + ["a"])
-      assert @p.size_over?(c3)
+      assert @p.chunk_size_over?(c3)
+    end
+
+    test '#chunk_size_full? returns true if chunk size reaches the limit' do
+      m = create_metadata(Time.parse('2016-04-11 16:40:00 +0000').to_i)
+
+      c1 = create_chunk(m, ["a" * 128] * 7)
+      assert !@p.chunk_size_full?(c1)
+
+      c2 = create_chunk(m, ["a" * 128] * 8)
+      assert @p.chunk_size_full?(c2)
+
+      c3 = create_chunk(m, ["a" * 128] * 7 + ["a" * 127])
+      assert !@p.chunk_size_full?(c3)
     end
 
     test '#emit raises BufferChunkOverflowError if incoming data is bigger than chunk bytes limit' do
@@ -708,20 +860,36 @@ def create_chunk(metadata, data)
       @p.start
     end
 
-    test '#size_over? returns false if too many records exists in a chunk even if its bytes is less than limit' do
+    test '#chunk_size_over? returns true if too many records exist in a chunk even if its bytes are less than the limit' do
       assert_equal 6, @p.chunk_records_limit
 
       m = create_metadata(Time.parse('2016-04-11 16:40:00 +0000').to_i)
 
       c1 = create_chunk(m, ["a" * 128] * 6)
       assert_equal 6, c1.records
-      assert !@p.size_over?(c1)
+      assert !@p.chunk_size_over?(c1)
 
       c2 = create_chunk(m, ["a" * 128] * 7)
-      assert @p.size_over?(c2)
+      assert @p.chunk_size_over?(c2)
 
       c3 = create_chunk(m, ["a" * 128] * 6 + ["a"])
-      assert @p.size_over?(c3)
+      assert @p.chunk_size_over?(c3)
+    end
+
+    test '#chunk_size_full? returns true if enough records exist in a chunk even if its bytes are less than the limit' do
+      assert_equal 6, @p.chunk_records_limit
+
+      m = create_metadata(Time.parse('2016-04-11 16:40:00 +0000').to_i)
+
+      c1 = create_chunk(m, ["a" * 128] * 5)
+      assert_equal 5, c1.records
+      assert !@p.chunk_size_full?(c1)
+
+      c2 = create_chunk(m, ["a" * 128] * 6)
+      assert @p.chunk_size_full?(c2)
+
+      c3 = create_chunk(m, ["a" * 128] * 5 + ["a"])
+      assert @p.chunk_size_full?(c3)
     end
   end
 
diff --git a/test/plugin/test_buffer_file_chunk.rb b/test/plugin/test_buffer_file_chunk.rb
index cbb9e30d04..6911600bf6 100644
--- a/test/plugin/test_buffer_file_chunk.rb
+++ b/test/plugin/test_buffer_file_chunk.rb
@@ -157,6 +157,37 @@ def gen_chunk_path(prefix, unique_id)
       assert_equal d4, JSON.parse(ds[3])
     end
 
+    test 'can #concat, #commit and #read it' do
+      assert @c.empty?
+
+      d1 = {"f1" => 'v1', "f2" => 'v2', "f3" => 'v3'}
+      d2 = {"f1" => 'vv1', "f2" => 'vv2', "f3" => 'vv3'}
+      data = [d1.to_json + "\n", d2.to_json + "\n"].join
+      @c.concat(data, 2)
+      @c.commit
+
+      content = @c.read
+      ds = content.split("\n").select{|d| !d.empty? }
+
+      assert_equal 2, ds.size
+      assert_equal d1, JSON.parse(ds[0])
+      assert_equal d2, JSON.parse(ds[1])
+
+      d3 = {"f1" => 'x', "f2" => 'y', "f3" => 'z'}
+      d4 = {"f1" => 'a', "f2" => 'b', "f3" => 'c'}
+      @c.concat([d3.to_json + "\n", d4.to_json + "\n"].join, 2)
+      @c.commit
+
+      content = @c.read
+      ds = content.split("\n").select{|d| !d.empty? }
+
+      assert_equal 4, ds.size
+      assert_equal d1, JSON.parse(ds[0])
+      assert_equal d2, JSON.parse(ds[1])
+      assert_equal d3, JSON.parse(ds[2])
+      assert_equal d4, JSON.parse(ds[3])
+    end
+
     test 'has its contents in binary (ascii-8bit)' do
       data1 = "aaa bbb ccc".force_encoding('utf-8')
       @c.append([data1])
@@ -241,6 +272,49 @@ def gen_chunk_path(prefix, unique_id)
       assert_equal (d1.to_json + "\n" + d2.to_json + "\n"), File.open(@c.path, 'rb'){|f| f.read }
     end
 
+    test 'can #rollback to revert non-committed data from #concat' do
+      assert @c.empty?
+
+      d1 = {"f1" => 'v1', "f2" => 'v2', "f3" => 'v3'}
+      d2 = {"f1" => 'vv1', "f2" => 'vv2', "f3" => 'vv3'}
+      data = [d1.to_json + "\n", d2.to_json + "\n"].join
+      @c.concat(data, 2)
+
+      assert_equal (d1.to_json + "\n" + d2.to_json + "\n").size, @c.size
+      assert_equal 2, @c.records
+
+      @c.rollback
+
+      assert @c.empty?
+ + assert_equal '', File.open(@c.path, 'rb'){|f| f.read } + + d1 = {"f1" => 'v1', "f2" => 'v2', "f3" => 'v3'} + d2 = {"f1" => 'vv1', "f2" => 'vv2', "f3" => 'vv3'} + data = [d1.to_json + "\n", d2.to_json + "\n"] + @c.append(data) + @c.commit + + assert_equal (d1.to_json + "\n" + d2.to_json + "\n").size, @c.size + assert_equal 2, @c.records + + first_size = @c.size + + d3 = {"f1" => 'x', "f2" => 'y', "f3" => 'z'} + d4 = {"f1" => 'a', "f2" => 'b', "f3" => 'c'} + @c.concat([d3.to_json + "\n", d4.to_json + "\n"].join, 2) + + assert_equal first_size + (d3.to_json + "\n" + d4.to_json + "\n").size, @c.size + assert_equal 4, @c.records + + @c.rollback + + assert_equal first_size, @c.size + assert_equal 2, @c.records + + assert_equal (d1.to_json + "\n" + d2.to_json + "\n"), File.open(@c.path, 'rb'){|f| f.read } + end + test 'can store its data by #close' do d1 = {"f1" => 'v1', "f2" => 'v2', "f3" => 'v3'} d2 = {"f1" => 'vv1', "f2" => 'vv2', "f3" => 'vv3'} diff --git a/test/plugin/test_buffer_memory_chunk.rb b/test/plugin/test_buffer_memory_chunk.rb index 62b38ddfd6..19caa06bcf 100644 --- a/test/plugin/test_buffer_memory_chunk.rb +++ b/test/plugin/test_buffer_memory_chunk.rb @@ -47,6 +47,37 @@ class BufferMemoryChunkTest < Test::Unit::TestCase assert_equal d4, JSON.parse(ds[3]) end + test 'can #concat, #commit and #read it' do + assert @c.empty? + + d1 = {"f1" => 'v1', "f2" => 'v2', "f3" => 'v3'} + d2 = {"f1" => 'vv1', "f2" => 'vv2', "f3" => 'vv3'} + data = [d1.to_json + "\n", d2.to_json + "\n"].join + @c.concat(data, 2) + @c.commit + + content = @c.read + ds = content.split("\n").select{|d| !d.empty? } + + assert_equal 2, ds.size + assert_equal d1, JSON.parse(ds[0]) + assert_equal d2, JSON.parse(ds[1]) + + d3 = {"f1" => 'x', "f2" => 'y', "f3" => 'z'} + d4 = {"f1" => 'a', "f2" => 'b', "f3" => 'c'} + @c.concat([d3.to_json + "\n", d4.to_json + "\n"].join, 2) + @c.commit + + content = @c.read + ds = content.split("\n").select{|d| !d.empty? } + + assert_equal 4, ds.size + assert_equal d1, JSON.parse(ds[0]) + assert_equal d2, JSON.parse(ds[1]) + assert_equal d3, JSON.parse(ds[2]) + assert_equal d4, JSON.parse(ds[3]) + end + test 'has its contents in binary (ascii-8bit)' do data1 = "aaa bbb ccc".force_encoding('utf-8') @c.append([data1]) @@ -129,6 +160,47 @@ class BufferMemoryChunkTest < Test::Unit::TestCase assert_equal 2, @c.records end + test 'can #rollback to revert non-committed data from #concat' do + assert @c.empty? + + d1 = {"f1" => 'v1', "f2" => 'v2', "f3" => 'v3'} + d2 = {"f1" => 'vv1', "f2" => 'vv2', "f3" => 'vv3'} + data = [d1.to_json + "\n", d2.to_json + "\n"].join + @c.concat(data, 2) + + assert_equal (d1.to_json + "\n" + d2.to_json + "\n").size, @c.size + assert_equal 2, @c.records + + @c.rollback + + assert @c.empty? + + assert @c.empty? 
+
+      d1 = {"f1" => 'v1', "f2" => 'v2', "f3" => 'v3'}
+      d2 = {"f1" => 'vv1', "f2" => 'vv2', "f3" => 'vv3'}
+      data = [d1.to_json + "\n", d2.to_json + "\n"]
+      @c.append(data)
+      @c.commit
+
+      assert_equal (d1.to_json + "\n" + d2.to_json + "\n").size, @c.size
+      assert_equal 2, @c.records
+
+      first_size = @c.size
+
+      d3 = {"f1" => 'x', "f2" => 'y', "f3" => 'z'}
+      d4 = {"f1" => 'a', "f2" => 'b', "f3" => 'c'}
+      @c.concat([d3.to_json + "\n", d4.to_json + "\n"].join, 2)
+
+      assert_equal first_size + (d3.to_json + "\n" + d4.to_json + "\n").size, @c.size
+      assert_equal 4, @c.records
+
+      @c.rollback
+
+      assert_equal first_size, @c.size
+      assert_equal 2, @c.records
+    end
+
     test 'does nothing for #close' do
       d1 = {"f1" => 'v1', "f2" => 'v2', "f3" => 'v3'}
       d2 = {"f1" => 'vv1', "f2" => 'vv2', "f3" => 'vv3'}

From 8e56525f228449a3c8a63d9d01148540c5e12fc8 Mon Sep 17 00:00:00 2001
From: TAGOMORI Satoshi
Date: Fri, 22 Apr 2016 17:28:39 +0900
Subject: [PATCH 3/5] add a variation of output plugin, without #format method
 implementation

* This type of output provides the standard binary representation of buffer
  chunk payloads
* It is actually equal to the results of EventStream#to_msgpack_stream
* We can provide tools to read and rescue buffer chunks of such outputs in
  failure situations
* This output style is actually the same as ObjectBufferedOutput of v0.12
---
 lib/fluent/plugin/output.rb            |  69 ++++-
 test/plugin/test_output.rb             |  41 ++-
 test/plugin/test_output_as_standard.rb | 346 +++++++++++++++++++++++++
 3 files changed, 439 insertions(+), 17 deletions(-)
 create mode 100644 test/plugin/test_output_as_standard.rb

diff --git a/lib/fluent/plugin/output.rb b/lib/fluent/plugin/output.rb
index fb8f22fab0..a74c4a9b05 100644
--- a/lib/fluent/plugin/output.rb
+++ b/lib/fluent/plugin/output.rb
@@ -37,6 +37,8 @@ class Output < Base
     CHUNK_KEY_PATTERN = /^[-_.@a-zA-Z0-9]+$/
     CHUNK_KEY_PLACEHOLDER_PATTERN = /\$\{[-_.@a-zA-Z0-9]+\}/
 
+    config_param :time_as_integer, :bool, default: false
+
     # `<buffer>` and `<secondary>` sections are available only when '#format' and '#write' are implemented
     config_section :buffer, param_name: :buffer_config, init: true, required: false, multi: false, final: true do
       config_argument :chunk_keys, :array, value_type: :string, default: []
@@ -98,10 +100,6 @@ def process(tag, es)
       raise NotImplementedError, "BUG: output plugins MUST implement this method"
     end
 
-    def format(tag, time, record)
-      raise NotImplementedError, "BUG: output plugins MUST implement this method"
-    end
-
     def write(chunk)
       raise NotImplementedError, "BUG: output plugins MUST implement this method"
     end
@@ -110,7 +108,10 @@ def try_write(chunk)
       raise NotImplementedError, "BUG: output plugins MUST implement this method"
     end
 
-    # TODO: add a way to do rollback_chunk + mark_as_failure, and rollback_chunk_automatically + mark_as_failure (by conf)
+    def format(tag, time, record)
+      # a standard msgpack event stream chunk is used if this method is not implemented in the plugin subclass
+      raise NotImplementedError, "BUG: output plugins MUST implement this method"
+    end
 
     def prefer_buffered_processing
       # override this method to return false only when all of these are true:
@@ -215,6 +216,9 @@ def configure(conf)
         @output_time_formatter_cache = {}
       end
 
+      # no chunk keys or only tags (chunking can be done without iterating event stream)
+      @simple_chunking = !@chunk_key_time && @chunk_keys.empty?
+
       @flush_mode = @buffer_config.flush_mode
       if @flush_mode == :default
         @flush_mode = (@chunk_key_time ?
:none : :fast) @@ -284,6 +288,7 @@ def start define_method(:emit, m) end + @custom_format = implement?(:custom_format) @delayed_commit = if implement?(:buffered) && implement?(:delayed_commit) prefer_delayed_commit else @@ -398,6 +403,9 @@ def support_in_v12_style?(feature) when :synchronous then false when :buffered then false when :delayed_commit then false + when :custom_format then false + else + raise ArgumentError, "unknown feature: #{feature}" end end @@ -405,8 +413,9 @@ def implement?(feature) methods_of_plugin = self.class.instance_methods(false) case feature when :synchronous then methods_of_plugin.include?(:process) || support_in_v12_style?(:synchronous) - when :buffered then methods_of_plugin.include?(:format) && methods_of_plugin.include?(:write) || support_in_v12_style?(:buffered) - when :delayed_commit then methods_of_plugin.include?(:format) && methods_of_plugin.include?(:try_write) + when :buffered then methods_of_plugin.include?(:write) || support_in_v12_style?(:buffered) + when :delayed_commit then methods_of_plugin.include?(:try_write) + when :custom_format then methods_of_plugin.include?(:format) || support_in_v12_style?(:custom_format) else raise ArgumentError, "Unknown feature for output plugin: #{feature}" end @@ -460,8 +469,7 @@ def emit_sync(tag, es) @counters_monitor.synchronize{ @emit_count += 1 } begin process(tag, es) - # TODO: how to count records of es? add API to event streams? - # @counters_monitor.synchronize{ @emit_records += records } + @counters_monitor.synchronize{ @emit_records += es.records } rescue @counters_monitor.synchronize{ @num_errors += 1 } raise @@ -471,7 +479,7 @@ def emit_sync(tag, es) def emit_buffered(tag, es) @counters_monitor.synchronize{ @emit_count += 1 } begin - metalist = handle_stream(tag, es) + metalist = execute_chunking(tag, es) if @flush_mode == :immediate metalist.each do |meta| @buffer.enqueue_chunk(meta) @@ -518,7 +526,17 @@ def metadata(tag, time, record) end end - def handle_stream(tag, es) + def execute_chunking(tag, es) + if @simple_chunking + handle_stream_simple(tag, es) + elsif @custom_format + handle_stream_with_custom_format(tag, es) + else + handle_stream_with_standard_format(tag, es) + end + end + + def handle_stream_with_custom_format(tag, es) meta_and_data = {} records = 0 es.each do |time, record| @@ -534,6 +552,35 @@ def handle_stream(tag, es) meta_and_data.keys end + def handle_stream_with_standard_format(tag, es) + meta_and_data = {} + records = 0 + es.each do |time, record| + meta = metadata(tag, time, record) + meta_and_data[meta] ||= MultiEventStream.new + meta_and_data[meta].add(time, record) + records += 1 + end + meta_and_data.each_pair do |meta, es| + @buffer.emit_bulk(meta, es.to_msgpack_stream(time_int: @time_as_integer), es.records) + end + @counters_monitor.synchronize{ @emit_records += records } + meta_and_data.keys + end + + def handle_stream_simple(tag, es) + meta = metadata((@chunk_key_tag ? 
tag : nil), nil, nil) + es_records = es.records + es_bulk = if @custom_format + es.map{|time,record| format(tag, time, record) }.join + else + es.to_msgpack_stream(time_int: @time_as_integer) + end + @buffer.emit_bulk(meta, es_bulk, es_records) + @counters_monitor.synchronize{ @emit_records += es_records } + [meta] + end + def commit_write(chunk_id, delayed: @delayed_commit, secondary: false) if delayed @dequeued_chunks_mutex.synchronize do diff --git a/test/plugin/test_output.rb b/test/plugin/test_output.rb index eb558d900b..fdca255cb0 100644 --- a/test/plugin/test_output.rb +++ b/test/plugin/test_output.rb @@ -25,6 +25,11 @@ def write(chunk) @write ? @write.call(chunk) : nil end end + class DummyAsyncStandardOutput < DummyBareOutput + def write(chunk) + @write ? @write.call(chunk) : nil + end + end class DummyDelayedOutput < DummyBareOutput def format(tag, time, record) @format ? @format.call(tag, time, record) : [tag, time, record].to_json @@ -33,6 +38,11 @@ def try_write(chunk) @try_write ? @try_write.call(chunk) : nil end end + class DummyDelayedStandardOutput < DummyBareOutput + def try_write(chunk) + @try_write ? @try_write.call(chunk) : nil + end + end class DummyFullFeatureOutput < DummyBareOutput def prefer_buffered_processing @prefer_buffered_processing ? @prefer_buffered_processing.call : false @@ -61,7 +71,9 @@ def create_output(type=:full) when :bare then FluentPluginOutputTest::DummyBareOutput.new when :sync then FluentPluginOutputTest::DummySyncOutput.new when :buffered then FluentPluginOutputTest::DummyAsyncOutput.new + when :standard then FluentPluginOutputTest::DummyAsyncStandardOutput.new when :delayed then FluentPluginOutputTest::DummyDelayedOutput.new + when :sdelayed then FluentPluginOutputTest::DummyDelayedStandardOutput.new when :full then FluentPluginOutputTest::DummyFullFeatureOutput.new else raise ArgumentError, "unknown type: #{type}" @@ -91,26 +103,43 @@ def waiting(seconds) assert !i1.implement?(:synchronous) assert !i1.implement?(:buffered) assert !i1.implement?(:delayed_commit) + assert !i1.implement?(:custom_format) i2 = FluentPluginOutputTest::DummySyncOutput.new assert i2.implement?(:synchronous) assert !i2.implement?(:buffered) assert !i2.implement?(:delayed_commit) + assert !i2.implement?(:custom_format) i3 = FluentPluginOutputTest::DummyAsyncOutput.new assert !i3.implement?(:synchronous) assert i3.implement?(:buffered) assert !i3.implement?(:delayed_commit) + assert i3.implement?(:custom_format) - i4 = FluentPluginOutputTest::DummyDelayedOutput.new + i4 = FluentPluginOutputTest::DummyAsyncStandardOutput.new assert !i4.implement?(:synchronous) - assert !i4.implement?(:buffered) - assert i4.implement?(:delayed_commit) + assert i4.implement?(:buffered) + assert !i4.implement?(:delayed_commit) + assert !i4.implement?(:custom_format) - i5 = FluentPluginOutputTest::DummyFullFeatureOutput.new - assert i5.implement?(:synchronous) - assert i5.implement?(:buffered) + i5 = FluentPluginOutputTest::DummyDelayedOutput.new + assert !i5.implement?(:synchronous) + assert !i5.implement?(:buffered) assert i5.implement?(:delayed_commit) + assert i5.implement?(:custom_format) + + i6 = FluentPluginOutputTest::DummyDelayedStandardOutput.new + assert !i6.implement?(:synchronous) + assert !i6.implement?(:buffered) + assert i6.implement?(:delayed_commit) + assert !i6.implement?(:custom_format) + + i6 = FluentPluginOutputTest::DummyFullFeatureOutput.new + assert i6.implement?(:synchronous) + assert i6.implement?(:buffered) + assert i6.implement?(:delayed_commit) + assert 
i6.implement?(:custom_format) end test 'plugin lifecycle for configure/start/stop/before_shutdown/shutdown/after_shutdown/close/terminate' do diff --git a/test/plugin/test_output_as_standard.rb b/test/plugin/test_output_as_standard.rb new file mode 100644 index 0000000000..41829ecb64 --- /dev/null +++ b/test/plugin/test_output_as_standard.rb @@ -0,0 +1,346 @@ +require_relative '../helper' +require 'fluent/plugin/output' +require 'fluent/plugin/buffer' +require 'fluent/msgpack_factory' +require 'fluent/event' + +require 'json' +require 'time' +require 'timeout' + +require 'flexmock/test_unit' + +module FluentPluginStandardBufferedOutputTest + class DummyBareOutput < Fluent::Plugin::Output + def register(name, &block) + instance_variable_set("@#{name}", block) + end + end + class DummyAsyncOutput < DummyBareOutput + def format(tag, time, record) + @format ? @format.call(tag, time, record) : [tag, time, record].to_json + end + def write(chunk) + @write ? @write.call(chunk) : nil + end + end + class DummyAsyncStandardOutput < DummyBareOutput + def write(chunk) + @write ? @write.call(chunk) : nil + end + end +end + +class OutputTest < Test::Unit::TestCase + def create_output(type=:full) + case type + when :bare then FluentPluginStandardBufferedOutputTest::DummyBareOutput.new + when :buffered then FluentPluginStandardBufferedOutputTest::DummyAsyncOutput.new + when :standard then FluentPluginStandardBufferedOutputTest::DummyAsyncStandardOutput.new + else + raise ArgumentError, "unknown type: #{type}" + end + end + def create_metadata(timekey: nil, tag: nil, variables: nil) + Fluent::Plugin::Buffer::Metadata.new(timekey, tag, variables) + end + def waiting(seconds) + begin + Timeout.timeout(seconds) do + yield + end + rescue Timeout::Error + STDERR.print *(@i.log.out.logs) + raise + end + end + def test_event_stream + es = Fluent::MultiEventStream.new + es.add(event_time('2016-04-21 17:19:00 -0700'), {"key" => "my value", "name" => "moris1", "message" => "hello!"}) + es.add(event_time('2016-04-21 17:19:13 -0700'), {"key" => "my value", "name" => "moris2", "message" => "hello!"}) + es.add(event_time('2016-04-21 17:19:25 -0700'), {"key" => "my value", "name" => "moris1", "message" => "hello!"}) + es.add(event_time('2016-04-21 17:20:01 -0700'), {"key" => "my value", "name" => "moris1", "message" => "hello!"}) + es.add(event_time('2016-04-21 17:20:13 -0700'), {"key" => "my value", "name" => "moris1", "message" => "hello!"}) + es.add(event_time('2016-04-21 17:21:32 -0700'), {"key" => "my value", "name" => "moris1", "message" => "hello!"}) + es + end + + teardown do + if @i + @i.stop unless @i.stopped? + @i.before_shutdown unless @i.before_shutdown? + @i.shutdown unless @i.shutdown? + @i.after_shutdown unless @i.after_shutdown? + @i.close unless @i.closed? + @i.terminate unless @i.terminated? 
+ end + end + + sub_test_case 'standard buffered without any chunk keys' do + test '#execute_chunking calls @buffer.emit_bulk just once with predefined msgpack format' do + @i = create_output(:standard) + @i.configure(config_element()) + @i.start + + m = create_metadata() + es = test_event_stream + + buffer_mock = flexmock(@i.buffer) + buffer_mock.should_receive(:emit_bulk).once.with(m, es.to_msgpack_stream, es.records) + + @i.execute_chunking("mytag.test", es) + end + + test '#execute_chunking calls @buffer.emit_bulk just once with predefined msgpack format, but time will be int if time_as_integer specified' do + @i = create_output(:standard) + @i.configure(config_element('ROOT','',{"time_as_integer"=>"true"})) + @i.start + + m = create_metadata() + es = test_event_stream + + buffer_mock = flexmock(@i.buffer) + buffer_mock.should_receive(:emit_bulk).once.with(m, es.to_msgpack_stream(time_int: true), es.records) + + @i.execute_chunking("mytag.test", es) + end + end + + sub_test_case 'standard buffered with tag chunk key' do + test '#execute_chunking calls @buffer.emit_bulk just once with predefined msgpack format' do + @i = create_output(:standard) + @i.configure(config_element('ROOT','',{},[config_element('buffer','tag')])) + @i.start + + m = create_metadata(tag: "mytag.test") + es = test_event_stream + + buffer_mock = flexmock(@i.buffer) + buffer_mock.should_receive(:emit_bulk).once.with(m, es.to_msgpack_stream, es.records) + + @i.execute_chunking("mytag.test", es) + end + + test '#execute_chunking calls @buffer.emit_bulk just once with predefined msgpack format, but time will be int if time_as_integer specified' do + @i = create_output(:standard) + @i.configure(config_element('ROOT','',{"time_as_integer"=>"true"},[config_element('buffer','tag')])) + @i.start + + m = create_metadata(tag: "mytag.test") + es = test_event_stream + + buffer_mock = flexmock(@i.buffer) + buffer_mock.should_receive(:emit_bulk).once.with(m, es.to_msgpack_stream(time_int: true), es.records) + + @i.execute_chunking("mytag.test", es) + end + end + + sub_test_case 'standard buffered with time chunk key' do + test '#execute_chunking calls @buffer.emit_bulk in times of # of time ranges with predefined msgpack format' do + @i = create_output(:standard) + @i.configure(config_element('ROOT','',{},[config_element('buffer','time',{"timekey_range" => "60"})])) + @i.start + + m1 = create_metadata(timekey: Time.parse('2016-04-21 17:19:00 -0700').to_i) + m2 = create_metadata(timekey: Time.parse('2016-04-21 17:20:00 -0700').to_i) + m3 = create_metadata(timekey: Time.parse('2016-04-21 17:21:00 -0700').to_i) + + es1 = Fluent::MultiEventStream.new + es1.add(event_time('2016-04-21 17:19:00 -0700'), {"key" => "my value", "name" => "moris1", "message" => "hello!"}) + es1.add(event_time('2016-04-21 17:19:13 -0700'), {"key" => "my value", "name" => "moris2", "message" => "hello!"}) + es1.add(event_time('2016-04-21 17:19:25 -0700'), {"key" => "my value", "name" => "moris1", "message" => "hello!"}) + + es2 = Fluent::MultiEventStream.new + es2.add(event_time('2016-04-21 17:20:01 -0700'), {"key" => "my value", "name" => "moris1", "message" => "hello!"}) + es2.add(event_time('2016-04-21 17:20:13 -0700'), {"key" => "my value", "name" => "moris1", "message" => "hello!"}) + + es3 = Fluent::MultiEventStream.new + es3.add(event_time('2016-04-21 17:21:32 -0700'), {"key" => "my value", "name" => "moris1", "message" => "hello!"}) + + buffer_mock = flexmock(@i.buffer) + buffer_mock.should_receive(:emit_bulk).with(m1, es1.to_msgpack_stream, 3).once + 
buffer_mock.should_receive(:emit_bulk).with(m2, es2.to_msgpack_stream, 2).once + buffer_mock.should_receive(:emit_bulk).with(m3, es3.to_msgpack_stream, 1).once + + es = test_event_stream + @i.execute_chunking("mytag.test", es) + end + + test '#execute_chunking calls @buffer.emit_bulk in times of # of time ranges with predefined msgpack format, but time will be int if time_as_integer specified' do + @i = create_output(:standard) + @i.configure(config_element('ROOT','',{"time_as_integer" => "true"},[config_element('buffer','time',{"timekey_range" => "60"})])) + @i.start + + m1 = create_metadata(timekey: Time.parse('2016-04-21 17:19:00 -0700').to_i) + m2 = create_metadata(timekey: Time.parse('2016-04-21 17:20:00 -0700').to_i) + m3 = create_metadata(timekey: Time.parse('2016-04-21 17:21:00 -0700').to_i) + + es1 = Fluent::MultiEventStream.new + es1.add(event_time('2016-04-21 17:19:00 -0700'), {"key" => "my value", "name" => "moris1", "message" => "hello!"}) + es1.add(event_time('2016-04-21 17:19:13 -0700'), {"key" => "my value", "name" => "moris2", "message" => "hello!"}) + es1.add(event_time('2016-04-21 17:19:25 -0700'), {"key" => "my value", "name" => "moris1", "message" => "hello!"}) + + es2 = Fluent::MultiEventStream.new + es2.add(event_time('2016-04-21 17:20:01 -0700'), {"key" => "my value", "name" => "moris1", "message" => "hello!"}) + es2.add(event_time('2016-04-21 17:20:13 -0700'), {"key" => "my value", "name" => "moris1", "message" => "hello!"}) + + es3 = Fluent::MultiEventStream.new + es3.add(event_time('2016-04-21 17:21:32 -0700'), {"key" => "my value", "name" => "moris1", "message" => "hello!"}) + + buffer_mock = flexmock(@i.buffer) + buffer_mock.should_receive(:emit_bulk).with(m1, es1.to_msgpack_stream(time_int: true), 3).once + buffer_mock.should_receive(:emit_bulk).with(m2, es2.to_msgpack_stream(time_int: true), 2).once + buffer_mock.should_receive(:emit_bulk).with(m3, es3.to_msgpack_stream(time_int: true), 1).once + + es = test_event_stream + @i.execute_chunking("mytag.test", es) + end + end + + sub_test_case 'standard buffered with variable chunk keys' do + test '#execute_chunking calls @buffer.emit_bulk in times of # of variable variations with predefined msgpack format' do + @i = create_output(:standard) + @i.configure(config_element('ROOT','',{},[config_element('buffer','key,name')])) + @i.start + + m1 = create_metadata(variables: {key: "my value", name: "moris1"}) + es1 = Fluent::MultiEventStream.new + es1.add(event_time('2016-04-21 17:19:00 -0700'), {"key" => "my value", "name" => "moris1", "message" => "hello!"}) + es1.add(event_time('2016-04-21 17:19:25 -0700'), {"key" => "my value", "name" => "moris1", "message" => "hello!"}) + es1.add(event_time('2016-04-21 17:20:01 -0700'), {"key" => "my value", "name" => "moris1", "message" => "hello!"}) + es1.add(event_time('2016-04-21 17:20:13 -0700'), {"key" => "my value", "name" => "moris1", "message" => "hello!"}) + es1.add(event_time('2016-04-21 17:21:32 -0700'), {"key" => "my value", "name" => "moris1", "message" => "hello!"}) + + m2 = create_metadata(variables: {key: "my value", name: "moris2"}) + es2 = Fluent::MultiEventStream.new + es2.add(event_time('2016-04-21 17:19:13 -0700'), {"key" => "my value", "name" => "moris2", "message" => "hello!"}) + + buffer_mock = flexmock(@i.buffer) + buffer_mock.should_receive(:emit_bulk).with(m1, es1.to_msgpack_stream, 5).once + buffer_mock.should_receive(:emit_bulk).with(m2, es2.to_msgpack_stream, 1).once + + es = test_event_stream + @i.execute_chunking("mytag.test", es) + end + + test 
'#execute_chunking calls @buffer.emit_bulk in times of # of variable variations with predefined msgpack format, but time will be int if time_as_integer specified' do + @i = create_output(:standard) + @i.configure(config_element('ROOT','',{"time_as_integer" => "true"},[config_element('buffer','key,name')])) + @i.start + + m1 = create_metadata(variables: {key: "my value", name: "moris1"}) + es1 = Fluent::MultiEventStream.new + es1.add(event_time('2016-04-21 17:19:00 -0700'), {"key" => "my value", "name" => "moris1", "message" => "hello!"}) + es1.add(event_time('2016-04-21 17:19:25 -0700'), {"key" => "my value", "name" => "moris1", "message" => "hello!"}) + es1.add(event_time('2016-04-21 17:20:01 -0700'), {"key" => "my value", "name" => "moris1", "message" => "hello!"}) + es1.add(event_time('2016-04-21 17:20:13 -0700'), {"key" => "my value", "name" => "moris1", "message" => "hello!"}) + es1.add(event_time('2016-04-21 17:21:32 -0700'), {"key" => "my value", "name" => "moris1", "message" => "hello!"}) + + m2 = create_metadata(variables: {key: "my value", name: "moris2"}) + es2 = Fluent::MultiEventStream.new + es2.add(event_time('2016-04-21 17:19:13 -0700'), {"key" => "my value", "name" => "moris2", "message" => "hello!"}) + + buffer_mock = flexmock(@i.buffer) + buffer_mock.should_receive(:emit_bulk).with(m1, es1.to_msgpack_stream(time_int: true), 5).once + buffer_mock.should_receive(:emit_bulk).with(m2, es2.to_msgpack_stream(time_int: true), 1).once + + es = test_event_stream + @i.execute_chunking("mytag.test", es) + end + end + + sub_test_case 'custom format buffered without any chunk keys' do + test '#execute_chunking calls @buffer.emit_bulk just once with customized format' do + @i = create_output(:buffered) + @i.register(:format){|tag, time, record| [time, record].to_json } + @i.configure(config_element()) + @i.start + + m = create_metadata() + es = test_event_stream + + buffer_mock = flexmock(@i.buffer) + buffer_mock.should_receive(:emit_bulk).once.with(m, es.map{|t,r| [t,r].to_json }.join, es.records) + + @i.execute_chunking("mytag.test", es) + end + end + + sub_test_case 'custom format buffered with tag chunk key' do + test '#execute_chunking calls @buffer.emit_bulk just once with customized format' do + @i = create_output(:buffered) + @i.register(:format){|tag, time, record| [time, record].to_json } + @i.configure(config_element('ROOT','',{},[config_element('buffer','tag')])) + @i.start + + m = create_metadata(tag: "mytag.test") + es = test_event_stream + + buffer_mock = flexmock(@i.buffer) + buffer_mock.should_receive(:emit_bulk).once.with(m, es.map{|t,r| [t,r].to_json }.join, es.records) + + @i.execute_chunking("mytag.test", es) + end + end + sub_test_case 'custom format buffered with time chunk key' do + test '#execute_chunking calls @buffer.emit in times of # of time ranges with customized format' do + @i = create_output(:buffered) + @i.register(:format){|tag, time, record| [time, record].to_json } + @i.configure(config_element('ROOT','',{},[config_element('buffer','time',{"timekey_range" => "60"})])) + @i.start + + m1 = create_metadata(timekey: Time.parse('2016-04-21 17:19:00 -0700').to_i) + m2 = create_metadata(timekey: Time.parse('2016-04-21 17:20:00 -0700').to_i) + m3 = create_metadata(timekey: Time.parse('2016-04-21 17:21:00 -0700').to_i) + + es1 = Fluent::MultiEventStream.new + es1.add(event_time('2016-04-21 17:19:00 -0700'), {"key" => "my value", "name" => "moris1", "message" => "hello!"}) + es1.add(event_time('2016-04-21 17:19:13 -0700'), {"key" => "my value", "name" => 
"moris2", "message" => "hello!"}) + es1.add(event_time('2016-04-21 17:19:25 -0700'), {"key" => "my value", "name" => "moris1", "message" => "hello!"}) + + es2 = Fluent::MultiEventStream.new + es2.add(event_time('2016-04-21 17:20:01 -0700'), {"key" => "my value", "name" => "moris1", "message" => "hello!"}) + es2.add(event_time('2016-04-21 17:20:13 -0700'), {"key" => "my value", "name" => "moris1", "message" => "hello!"}) + + es3 = Fluent::MultiEventStream.new + es3.add(event_time('2016-04-21 17:21:32 -0700'), {"key" => "my value", "name" => "moris1", "message" => "hello!"}) + + buffer_mock = flexmock(@i.buffer) + buffer_mock.should_receive(:emit).with(m1, es1.map{|t,r| [t,r].to_json }).once + buffer_mock.should_receive(:emit).with(m2, es2.map{|t,r| [t,r].to_json }).once + buffer_mock.should_receive(:emit).with(m3, es3.map{|t,r| [t,r].to_json }).once + + es = test_event_stream + @i.execute_chunking("mytag.test", es) + end + end + + sub_test_case 'custom format buffered with variable chunk keys' do + test '#execute_chunking calls @buffer.emit in times of # of variable variations with customized format' do + @i = create_output(:buffered) + @i.register(:format){|tag, time, record| [time, record].to_json } + @i.configure(config_element('ROOT','',{},[config_element('buffer','key,name')])) + @i.start + + m1 = create_metadata(variables: {key: "my value", name: "moris1"}) + es1 = Fluent::MultiEventStream.new + es1.add(event_time('2016-04-21 17:19:00 -0700'), {"key" => "my value", "name" => "moris1", "message" => "hello!"}) + es1.add(event_time('2016-04-21 17:19:25 -0700'), {"key" => "my value", "name" => "moris1", "message" => "hello!"}) + es1.add(event_time('2016-04-21 17:20:01 -0700'), {"key" => "my value", "name" => "moris1", "message" => "hello!"}) + es1.add(event_time('2016-04-21 17:20:13 -0700'), {"key" => "my value", "name" => "moris1", "message" => "hello!"}) + es1.add(event_time('2016-04-21 17:21:32 -0700'), {"key" => "my value", "name" => "moris1", "message" => "hello!"}) + + m2 = create_metadata(variables: {key: "my value", name: "moris2"}) + es2 = Fluent::MultiEventStream.new + es2.add(event_time('2016-04-21 17:19:13 -0700'), {"key" => "my value", "name" => "moris2", "message" => "hello!"}) + + buffer_mock = flexmock(@i.buffer) + buffer_mock.should_receive(:emit).with(m1, es1.map{|t,r| [t,r].to_json }).once + buffer_mock.should_receive(:emit).with(m2, es2.map{|t,r| [t,r].to_json }).once + + es = test_event_stream + @i.execute_chunking("mytag.test", es) + end + end +end From c814a867639c73068198ab861f564c5e113eafe9 Mon Sep 17 00:00:00 2001 From: TAGOMORI Satoshi Date: Fri, 22 Apr 2016 19:16:04 +0900 Subject: [PATCH 4/5] add mixin to iterate events from chunks as event_stream --- lib/fluent/event.rb | 16 +++++- lib/fluent/plugin/buffer/chunk.rb | 14 +---- lib/fluent/plugin/buffer/file_chunk.rb | 2 + test/plugin/test_output.rb | 19 ++++--- test/plugin/test_output_as_buffered.rb | 55 ++++++++++--------- .../plugin/test_output_as_buffered_retries.rb | 5 +- .../test_output_as_buffered_secondary.rb | 5 +- test/plugin/test_output_as_standard.rb | 2 +- test/test_event.rb | 10 +++- 9 files changed, 71 insertions(+), 57 deletions(-) diff --git a/lib/fluent/event.rb b/lib/fluent/event.rb index 09e226e992..9a12ef194c 100644 --- a/lib/fluent/event.rb +++ b/lib/fluent/event.rb @@ -51,7 +51,6 @@ def to_msgpack_stream_forced_integer end end - class OneEventStream < EventStream def initialize(time, record) @time = time @@ -175,7 +174,6 @@ def repeatable? 
end
 
       def each(&block)
-        # TODO format check
         msgpack_unpacker.feed_each(@data, &block)
         nil
       end
@@ -184,5 +182,17 @@ def to_msgpack_stream
       @data
     end
   end
-end
 
+  module ChunkMessagePackEventStreamer
+    include MessagePackFactory::Mixin
+    # chunk.extend(ChunkMessagePackEventStreamer)
+    #  => chunk.each{|time, record| ... }
+    def each(&block)
+      open do |io|
+        msgpack_unpacker(io).each(&block)
+      end
+      nil
+    end
+    alias :msgpack_each :each
+  end
+end
diff --git a/lib/fluent/plugin/buffer/chunk.rb b/lib/fluent/plugin/buffer/chunk.rb
index 5780e376ff..55583e62eb 100644
--- a/lib/fluent/plugin/buffer/chunk.rb
+++ b/lib/fluent/plugin/buffer/chunk.rb
@@ -14,9 +14,9 @@
 # limitations under the License.
 #
 
-require 'fluent/msgpack_factory'
 require 'fluent/plugin/buffer'
 require 'fluent/unique_id'
+require 'fluent/event'
 require 'fileutils'
 require 'monitor'
@@ -26,8 +26,8 @@ module Plugin
     class Buffer # fluent/plugin/buffer is alread loaded
       class Chunk
         include MonitorMixin
-        include MessagePackFactory::Mixin
         include UniqueId::Mixin
+        include ChunkMessagePackEventStreamer
 
       # Chunks has 2 part:
       # * metadata: contains metadata which should be restored after resume (if possible)
@@ -113,16 +113,6 @@ def write_to(io)
         FileUtils.copy_stream(i, io)
       end
     end
-
-      def msgpack_each(&block)
-        open do |io|
-          u = msgpack_factory.unpacker(io)
-          begin
-            u.each(&block)
-          rescue EOFError
-          end
-        end
-      end
     end
   end
 end
diff --git a/lib/fluent/plugin/buffer/file_chunk.rb b/lib/fluent/plugin/buffer/file_chunk.rb
index f9f4e78789..02b089705c 100644
--- a/lib/fluent/plugin/buffer/file_chunk.rb
+++ b/lib/fluent/plugin/buffer/file_chunk.rb
@@ -16,6 +16,7 @@
 
 require 'fluent/plugin/buffer/chunk'
 require 'fluent/unique_id'
+require 'fluent/msgpack_factory'
 
 module Fluent
   module Plugin
@@ -33,6 +34,7 @@ class FileChunk < Chunk
       # path_suffix: path suffix string, like '.log' (or any other user specified)
 
       include SystemConfig::Mixin
+      include MessagePackFactory::Mixin
 
       FILE_PERMISSION = 0644
diff --git a/test/plugin/test_output.rb b/test/plugin/test_output.rb
index fdca255cb0..b9fc81a19b 100644
--- a/test/plugin/test_output.rb
+++ b/test/plugin/test_output.rb
@@ -1,6 +1,7 @@
 require_relative '../helper'
 require 'fluent/plugin/output'
 require 'fluent/plugin/buffer'
+require 'fluent/event'
 
 require 'json'
 require 'time'
@@ -288,7 +289,7 @@ def waiting(seconds)
       i.start
 
       t = event_time()
-      i.emit('tag', [ [t, {"key" => "value1"}], [t, {"key" => "value2"}] ])
+      i.emit('tag', Fluent::ArrayEventStream.new([ [t, {"key" => "value1"}], [t, {"key" => "value2"}] ]))
 
       assert process_called
 
@@ -303,7 +304,7 @@ def waiting(seconds)
       i.start
 
       t = event_time()
-      i.emit('tag', [ [t, {"key" => "value1"}], [t, {"key" => "value2"}] ])
+      i.emit('tag', Fluent::ArrayEventStream.new([ [t, {"key" => "value1"}], [t, {"key" => "value2"}] ]))
 
       assert_equal 2, format_called_times
 
@@ -325,7 +326,7 @@ def waiting(seconds)
       assert !i.prefer_buffered_processing
 
       t = event_time()
-      i.emit('tag', [ [t, {"key" => "value1"}], [t, {"key" => "value2"}] ])
+      i.emit('tag', Fluent::ArrayEventStream.new([ [t, {"key" => "value1"}], [t, {"key" => "value2"}] ]))
 
       waiting(4){ Thread.pass until process_called }
 
@@ -350,7 +351,7 @@ def waiting(seconds)
       assert i.prefer_buffered_processing
 
       t = event_time()
-      i.emit('tag', [ [t, {"key" => "value1"}], [t, {"key" => "value2"}] ])
+      i.emit('tag', Fluent::ArrayEventStream.new([ [t, {"key" => "value1"}], [t, {"key" => "value2"}] ]))
 
       assert !process_called
       assert_equal 2, format_called_times
@@ -367,7 +368,7 @@ def waiting(seconds)
       i.start
 
       t = event_time()
-      i.emit('tag', [
[t, {"key" => "value1"}], [t, {"key" => "value2"}] ]) + i.emit('tag', Fluent::ArrayEventStream.new([ [t, {"key" => "value1"}], [t, {"key" => "value2"}] ])) i.force_flush waiting(4){ Thread.pass until write_called } @@ -386,7 +387,7 @@ def waiting(seconds) i.start t = event_time() - i.emit('tag', [ [t, {"key" => "value1"}], [t, {"key" => "value2"}] ]) + i.emit('tag', Fluent::ArrayEventStream.new([ [t, {"key" => "value1"}], [t, {"key" => "value2"}] ])) i.force_flush waiting(4){ Thread.pass until try_write_called } @@ -410,7 +411,7 @@ def waiting(seconds) assert !i.prefer_delayed_commit t = event_time() - i.emit('tag', [ [t, {"key" => "value1"}], [t, {"key" => "value2"}] ]) + i.emit('tag', Fluent::ArrayEventStream.new([ [t, {"key" => "value1"}], [t, {"key" => "value2"}] ])) i.force_flush waiting(4){ Thread.pass until write_called || try_write_called } @@ -435,7 +436,7 @@ def waiting(seconds) assert i.prefer_delayed_commit t = event_time() - i.emit('tag', [ [t, {"key" => "value1"}], [t, {"key" => "value2"}] ]) + i.emit('tag', Fluent::ArrayEventStream.new([ [t, {"key" => "value1"}], [t, {"key" => "value2"}] ])) i.force_flush waiting(4){ Thread.pass until write_called || try_write_called } @@ -471,7 +472,7 @@ def waiting(seconds) @i.start t = event_time() - es = [ [t, {"key" => "value1"}], [t, {"key" => "value2"}] ] + es = Fluent::ArrayEventStream.new([ [t, {"key" => "value1"}], [t, {"key" => "value2"}] ]) 5.times do @i.emit('tag', es) end diff --git a/test/plugin/test_output_as_buffered.rb b/test/plugin/test_output_as_buffered.rb index 7ae05e5805..97eaa53515 100644 --- a/test/plugin/test_output_as_buffered.rb +++ b/test/plugin/test_output_as_buffered.rb @@ -1,6 +1,7 @@ require_relative '../helper' require 'fluent/plugin/output' require 'fluent/plugin/buffer' +require 'fluent/event' require 'json' require 'time' @@ -125,7 +126,7 @@ def waiting(seconds) @i.register(:format){|tag, time, record| ary << [tag, time, record]; '' } t = event_time() - es = [ [t, {"key" => "value1"}], [t, {"key" => "value2"}] ] + es = Fluent::ArrayEventStream.new([ [t, {"key" => "value1"}], [t, {"key" => "value2"}] ]) 5.times do @i.emit('tag.test', es) @@ -151,14 +152,14 @@ def waiting(seconds) event_size = [tag, t, r].to_json.size # 195 (1024 / event_size).times do |i| - @i.emit("test.tag", [ [t, r] ]) + @i.emit("test.tag", Fluent::ArrayEventStream.new([ [t, r] ])) end assert{ @i.buffer.queue.size == 0 && ary.size == 0 } staged_chunk = @i.buffer.stage[@i.buffer.stage.keys.first] assert{ staged_chunk.records != 0 } - @i.emit("test.tag", [ [t, r] ]) + @i.emit("test.tag", Fluent::ArrayEventStream.new([ [t, r] ])) assert{ @i.buffer.queue.size > 0 || @i.buffer.dequeued.size > 0 || ary.size > 0 } @@ -184,7 +185,7 @@ def waiting(seconds) event_size = [tag, t, r].to_json.size # 195 (1024 / event_size).times do |i| - @i.emit("test.tag", [ [t, r] ]) + @i.emit("test.tag", Fluent::ArrayEventStream.new([ [t, r] ])) end assert{ @i.buffer.queue.size == 0 && ary.size == 0 } @@ -225,7 +226,7 @@ def waiting(seconds) @i.register(:format){|tag, time, record| ary << [tag, time, record]; '' } t = event_time() - es = [ [t, {"key" => "value1"}], [t, {"key" => "value2"}] ] + es = Fluent::ArrayEventStream.new([ [t, {"key" => "value1"}], [t, {"key" => "value2"}] ]) 5.times do @i.emit('tag.test', es) @@ -255,8 +256,8 @@ def waiting(seconds) 3.times do |i| rand_records = rand(1..5) - es = [ [t, r] ] * rand_records - assert_equal rand_records, es.size + es = Fluent::ArrayEventStream.new([ [t, r] ] * rand_records) + assert_equal rand_records, 
es.records @i.interrupt_flushes @@ -292,7 +293,7 @@ def waiting(seconds) event_size = [tag, t, r].to_json.size # 195 (1024 / event_size).times do |i| - @i.emit("test.tag", [ [t, r] ]) + @i.emit("test.tag", Fluent::ArrayEventStream.new([ [t, r] ])) end assert{ @i.buffer.queue.size == 0 && ary.size == 0 } @@ -332,7 +333,7 @@ def waiting(seconds) @i.register(:format){|tag, time, record| ary << [tag, time, record]; '' } t = event_time() - es = [ [t, {"key" => "value1"}], [t, {"key" => "value2"}] ] + es = Fluent::ArrayEventStream.new([ [t, {"key" => "value1"}], [t, {"key" => "value2"}] ]) 5.times do @i.emit('tag.test', es) @@ -362,8 +363,8 @@ def waiting(seconds) 3.times do |i| rand_records = rand(1..5) - es = [ [t, r] ] * rand_records - assert_equal rand_records, es.size + es = Fluent::ArrayEventStream.new([ [t, r] ] * rand_records) + assert_equal rand_records, es.records @i.emit("test.tag", es) assert{ @i.buffer.stage.size == 0 && (@i.buffer.queue.size == 1 || @i.buffer.dequeued.size == 1 || ary.size > 0) } @@ -387,7 +388,7 @@ def waiting(seconds) (0...10).each do |i| r["key#{i}"] = "value #{i}" end - @i.emit("test.tag", [ [t, r] ]) + @i.emit("test.tag", Fluent::ArrayEventStream.new([ [t, r] ])) @i.stop @i.before_shutdown @@ -437,7 +438,7 @@ def waiting(seconds) @i.register(:format){|tag, time, record| ary << [tag, time, record]; '' } t = event_time() - es = [ [t, {"key" => "value1"}], [t, {"key" => "value2"}] ] + es = Fluent::ArrayEventStream.new([ [t, {"key" => "value1"}], [t, {"key" => "value2"}] ]) 5.times do @i.emit('tag.test', es) @@ -490,7 +491,7 @@ def waiting(seconds) @i.interrupt_flushes events.shuffle.each do |tag, time, record| - @i.emit(tag, [ [time, record] ]) + @i.emit(tag, Fluent::ArrayEventStream.new([ [time, record] ])) end assert{ @i.buffer.stage.size == 3 } assert{ @i.write_count == 0 } @@ -569,7 +570,7 @@ def waiting(seconds) @i.interrupt_flushes events.shuffle.each do |tag, time, record| - @i.emit(tag, [ [time, record] ]) + @i.emit(tag, Fluent::ArrayEventStream.new([ [time, record] ])) end assert{ @i.buffer.stage.size == 3 } assert{ @i.write_count == 0 } @@ -645,7 +646,7 @@ def waiting(seconds) @i.register(:format){|tag, time, record| ary << [tag, time, record]; '' } t = event_time() - es = [ [t, {"key" => "value1"}], [t, {"key" => "value2"}] ] + es = Fluent::ArrayEventStream.new([ [t, {"key" => "value1"}], [t, {"key" => "value2"}] ]) 5.times do @i.emit('tag.test', es) @@ -699,7 +700,7 @@ def waiting(seconds) @i.interrupt_flushes events.shuffle.each do |tag, time, record| - @i.emit(tag, [ [time, record] ]) + @i.emit(tag, Fluent::ArrayEventStream.new([ [time, record] ])) end assert{ @i.buffer.stage.size == 2 } # test.tag.1 x1, test.tag.2 x1 @@ -790,7 +791,7 @@ def waiting(seconds) @i.interrupt_flushes events.shuffle.each do |tag, time, record| - @i.emit(tag, [ [time, record] ]) + @i.emit(tag, Fluent::ArrayEventStream.new([ [time, record] ])) end assert{ @i.buffer.stage.size == 2 } # test.tag.1 x1, test.tag.2 x1 @@ -861,10 +862,10 @@ def waiting(seconds) @i.register(:format){|tag, time, record| ary << [tag, time, record]; '' } t = event_time() - es = [ + es = Fluent::ArrayEventStream.new([ [t, {"key" => "value1", "name" => "moris", "service" => "a"}], [t, {"key" => "value2", "name" => "moris", "service" => "b"}], - ] + ]) 5.times do @i.emit('tag.test', es) @@ -909,7 +910,7 @@ def waiting(seconds) @i.interrupt_flushes events.shuffle.each do |tag, time, record| - @i.emit(tag, [ [time, record] ]) + @i.emit(tag, Fluent::ArrayEventStream.new([ [time, record] ])) end assert{ 
@i.buffer.stage.size == 3 } @@ -999,7 +1000,7 @@ def waiting(seconds) @i.interrupt_flushes events.shuffle.each do |tag, time, record| - @i.emit(tag, [ [time, record] ]) + @i.emit(tag, Fluent::ArrayEventStream.new([ [time, record] ])) end assert{ @i.buffer.stage.size == 3 } @@ -1096,10 +1097,10 @@ def waiting(seconds) @i.register(:format){|tag, time, record| ary << [tag, time, record]; '' } t = event_time() - es = [ + es = Fluent::ArrayEventStream.new([ [t, {"key" => "value1", "name" => "moris", "service" => "a"}], [t, {"key" => "value2", "name" => "moris", "service" => "b"}], - ] + ]) 5.times do @i.emit('tag.test', es) @@ -1152,7 +1153,7 @@ def waiting(seconds) @i.interrupt_flushes events.shuffle.each do |tag, time, record| - @i.emit(tag, [ [time, record] ]) + @i.emit(tag, Fluent::ArrayEventStream.new([ [time, record] ])) end assert{ @i.buffer.stage.size == 2 } @@ -1271,7 +1272,7 @@ def waiting(seconds) @i.interrupt_flushes events.shuffle.each do |tag, time, record| - @i.emit(tag, [ [time, record] ]) + @i.emit(tag, Fluent::ArrayEventStream.new([ [time, record] ])) end assert{ @i.buffer.stage.size == 2 } @@ -1431,7 +1432,7 @@ def waiting(seconds) @i.interrupt_flushes events.shuffle.each do |tag, time, record| - @i.emit(tag, [ [time, record] ]) + @i.emit(tag, Fluent::ArrayEventStream.new([ [time, record] ])) end assert{ @i.buffer.stage.size == 2 } diff --git a/test/plugin/test_output_as_buffered_retries.rb b/test/plugin/test_output_as_buffered_retries.rb index ffdcfc6d8b..118ab24443 100644 --- a/test/plugin/test_output_as_buffered_retries.rb +++ b/test/plugin/test_output_as_buffered_retries.rb @@ -1,6 +1,7 @@ require_relative '../helper' require 'fluent/plugin/output' require 'fluent/plugin/buffer' +require 'fluent/event' require 'json' require 'time' @@ -71,11 +72,11 @@ def waiting(seconds) end end def dummy_event_stream - [ + Fluent::ArrayEventStream.new([ [ event_time('2016-04-13 18:33:00'), {"name" => "moris", "age" => 36, "message" => "data1"} ], [ event_time('2016-04-13 18:33:13'), {"name" => "moris", "age" => 36, "message" => "data2"} ], [ event_time('2016-04-13 18:33:32'), {"name" => "moris", "age" => 36, "message" => "data3"} ], - ] + ]) end def get_log_time(msg, logs) log_time = nil diff --git a/test/plugin/test_output_as_buffered_secondary.rb b/test/plugin/test_output_as_buffered_secondary.rb index e5af89f574..1aca5eaf5a 100644 --- a/test/plugin/test_output_as_buffered_secondary.rb +++ b/test/plugin/test_output_as_buffered_secondary.rb @@ -1,6 +1,7 @@ require_relative '../helper' require 'fluent/plugin/output' require 'fluent/plugin/buffer' +require 'fluent/event' require 'json' require 'time' @@ -71,11 +72,11 @@ def waiting(seconds) end end def dummy_event_stream - [ + Fluent::ArrayEventStream.new([ [ event_time('2016-04-13 18:33:00'), {"name" => "moris", "age" => 36, "message" => "data1"} ], [ event_time('2016-04-13 18:33:13'), {"name" => "moris", "age" => 36, "message" => "data2"} ], [ event_time('2016-04-13 18:33:32'), {"name" => "moris", "age" => 36, "message" => "data3"} ], - ] + ]) end teardown do diff --git a/test/plugin/test_output_as_standard.rb b/test/plugin/test_output_as_standard.rb index 41829ecb64..53b004154c 100644 --- a/test/plugin/test_output_as_standard.rb +++ b/test/plugin/test_output_as_standard.rb @@ -31,7 +31,7 @@ def write(chunk) end end -class OutputTest < Test::Unit::TestCase +class StandardBufferedOutputTest < Test::Unit::TestCase def create_output(type=:full) case type when :bare then FluentPluginStandardBufferedOutputTest::DummyBareOutput.new diff 
--git a/test/test_event.rb b/test/test_event.rb
index 488db46196..1d8d6332bd 100644
--- a/test/test_event.rb
+++ b/test/test_event.rb
@@ -7,7 +7,7 @@ class OneEventStreamTest < ::Test::Unit::TestCase
   include Fluent
 
   def setup
-    @time = Engine.now
+    @time = event_time()
     @record = {'k' => 'v', 'n' => 1}
     @es = OneEventStream.new(@time, @record)
   end
@@ -36,6 +36,14 @@ def setup
       assert_equal @record, record
     }
   end
+
+  test 'to_msgpack_stream with time_int argument' do
+    stream = @es.to_msgpack_stream(time_int: true)
+    Fluent::Engine.msgpack_factory.unpacker.feed_each(stream) { |time, record|
+      assert_equal @time.to_i, time
+      assert_equal @record, record
+    }
+  end
 end
 
 class ArrayEventStreamTest < ::Test::Unit::TestCase

From 2845bad07c14223ed93396a52890e6ca257c944b Mon Sep 17 00:00:00 2001
From: TAGOMORI Satoshi
Date: Fri, 22 Apr 2016 20:44:53 +0900
Subject: [PATCH 5/5] add a method to avoid iterating/re-packing chunk content

---
 lib/fluent/event.rb | 4 ++++
 1 file changed, 4 insertions(+)

diff --git a/lib/fluent/event.rb b/lib/fluent/event.rb
index 9a12ef194c..1a3c25bc19 100644
--- a/lib/fluent/event.rb
+++ b/lib/fluent/event.rb
@@ -194,5 +194,9 @@ def each(&block)
       nil
     end
     alias :msgpack_each :each
+
+    def to_msgpack_stream
+      read
+    end
   end
 end
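
How the pieces of this series fit together, as a minimal standalone sketch (not part of the patches; it assumes fluentd from this branch is on the load path, and the sample records are illustrative only):

    require 'fluent/event'

    # PATCH 1: every event stream can report its record count.
    es = Fluent::MultiEventStream.new
    es.add(Time.now.to_i, {"message" => "hello"})
    es.add(Time.now.to_i, {"message" => "world"})
    es.records # => 2

    # PATCH 1: serialize the whole stream once; time_int: true packs times as integers.
    bulk = es.to_msgpack_stream(time_int: true)

    # PATCH 2: the buffer can take the pre-serialized payload plus its record count,
    # appending it to a chunk without unpacking:
    #   @buffer.emit_bulk(metadata, bulk, es.records)   # Buffer API
    #   chunk.concat(bulk, records)                     # Chunk API used underneath
    # PATCH 3 uses exactly this path for plugins that implement #write but not #format.

    # PATCH 1: the receiving side can wrap the payload again without re-packing.
    es2 = Fluent::MessagePackEventStream.new(bulk, es.records)
    es2.records # => 2
    es2.each {|time, record| p [time, record] }

With PATCH 4, a buffer chunk itself becomes iterable the same way (chunk.each / chunk.msgpack_each via ChunkMessagePackEventStreamer), and PATCH 5 adds chunk.to_msgpack_stream, which simply returns the chunk's raw payload with no iteration at all.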