Skip to content

Commit

Permalink
Add a test that in_forward creates CompressedMessagePackEventStream
Browse files Browse the repository at this point in the history
  • Loading branch information
ganmacs committed Sep 13, 2016
1 parent 80addd2 commit 60381fa
Show file tree
Hide file tree
Showing 3 changed files with 42 additions and 2 deletions.
6 changes: 5 additions & 1 deletion lib/fluent/compat/output.rb
Original file line number Diff line number Diff line change
Expand Up @@ -488,7 +488,11 @@ def configure(conf)
end

def format_stream(tag, es) # for BufferedOutputTestDriver
es.to_msgpack_stream(time_int: @time_as_integer)
if @compress == :gzip
es.to_compressed_msgpack_stream(time_int: @time_as_integer)
else
es.to_msgpack_stream(time_int: @time_as_integer)
end
end

def write(chunk)
Expand Down
4 changes: 3 additions & 1 deletion lib/fluent/test/input_test.rb
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ class InputTestDriver < TestDriver
def initialize(klass, &block)
super(klass, &block)
@emit_streams = []
@event_streams = []
@expects = nil
# for checking only the number of emitted records during run
@expected_emits_length = nil
Expand All @@ -42,7 +43,7 @@ def expected_emits

attr_accessor :expected_emits_length
attr_accessor :run_timeout
attr_reader :emit_streams
attr_reader :emit_streams, :event_streams

def emits
all = []
Expand Down Expand Up @@ -165,6 +166,7 @@ def run(num_waits = 10, &block)

private
def emit_stream(tag, es)
@event_streams << es
@emit_streams << [tag, es.to_a]
end
end
Expand Down
34 changes: 34 additions & 0 deletions test/plugin/test_out_forward.rb
Original file line number Diff line number Diff line change
Expand Up @@ -237,6 +237,40 @@ def test_send_without_time_as_integer
assert_empty d.instance.exceptions
end

def test_send_comprssed_message_pack_stream_if_compress_is_gzip
target_input_driver = create_target_input_driver

d = create_driver(CONFIG + %[
flush_interval 1s
compress gzip
])

time = event_time('2011-01-02 13:14:15 UTC')

records = [
{"a" => 1},
{"a" => 2}
]
d.register_run_post_condition do
d.instance.responses.length == 1
end

target_input_driver.run do
d.run do
records.each do |record|
d.emit record, time
end
end
end

event_streams = target_input_driver.event_streams
assert_true event_streams[0].is_a?(Fluent::CompressedMessagePackEventStream)

emits = target_input_driver.emits
assert_equal ['test', time, records[0]], emits[0]
assert_equal ['test', time, records[1]], emits[1]
end

def test_send_to_a_node_supporting_responses
target_input_driver = create_target_input_driver(response_stub: true)

Expand Down

0 comments on commit 60381fa

Please sign in to comment.