diff --git a/lib/fluent/plugin/output.rb b/lib/fluent/plugin/output.rb index 65d25e7201..5b42bb6cf1 100644 --- a/lib/fluent/plugin/output.rb +++ b/lib/fluent/plugin/output.rb @@ -122,12 +122,17 @@ def format(tag, time, record) raise NotImplementedError, "BUG: output plugins MUST implement this method" end - def formatted_to_msgpack_binary + def formatted_to_msgpack_binary? # To indicate custom format method (#format) returns msgpack binary or not. # If #format returns msgpack binary, override this method to return true. false end + # Compatibility for existing plugins + def formatted_to_msgpack_binary + formatted_to_msgpack_binary? + end + def prefer_buffered_processing # override this method to return false only when all of these are true: # * plugin has both implementation for buffered and non-buffered methods diff --git a/test/plugin/test_output_as_buffered.rb b/test/plugin/test_output_as_buffered.rb index e8240706df..2f94d35a13 100644 --- a/test/plugin/test_output_as_buffered.rb +++ b/test/plugin/test_output_as_buffered.rb @@ -75,6 +75,31 @@ def try_write(chunk) end end class DummyCustomFormatBufferedOutput < DummyBareOutput + def initialize + super + @format_type_is_msgpack = nil + @prefer_delayed_commit = nil + @write = nil + @try_write = nil + end + def format(tag, time, record) + @format ? @format.call(tag, time, record) : [tag, time, record].to_json + end + def formatted_to_msgpack_binary? + @format_type_is_msgpack ? @format_type_is_msgpack.call : false + end + def prefer_delayed_commit + @prefer_delayed_commit ? @prefer_delayed_commit.call : false + end + def write(chunk) + @write ? @write.call(chunk) : nil + end + def try_write(chunk) + @try_write ? @try_write.call(chunk) : nil + end + end + # check for formatted_to_msgpack_binary compatibility + class DummyOldCustomFormatBufferedOutput < DummyBareOutput def initialize super @format_type_is_msgpack = nil @@ -163,6 +188,7 @@ def create_output(type=:full) when :full then FluentPluginOutputAsBufferedTest::DummyFullFeatureOutput.new when :old_buf then FluentPluginOutputAsBufferedTest::DummyOldBufferedOutput.new when :old_obj then FluentPluginOutputAsBufferedTest::DummyOldObjectBufferedOutput.new + when :old_custom then FluentPluginOutputAsBufferedTest::DummyOldCustomFormatBufferedOutput.new else raise ArgumentError, "unknown type: #{type}" end @@ -306,9 +332,11 @@ def waiting(seconds) assert_equal 0, events_from_chunk.size end - test 'plugin using custom format can iterate chunk in #write if #format returns msgpack' do + data('formatted_to_msgpack_binary?' => :custom, + 'formatted_to_msgpack_binary' => :old_custom) + test 'plugin using custom format can iterate chunk in #write if #format returns msgpack' do |out_type| events_from_chunk = [] - @i = create_output(:custom) + @i = create_output(out_type) @i.configure(config_element('ROOT','',{},[config_element('buffer','',@hash)])) @i.register(:prefer_delayed_commit){ false } @i.register(:format){ |tag, time, record| [tag,time,record].to_msgpack }