Skip to content

Commit

Permalink
pass size to initilizer of MessagePackEventStream
Browse files Browse the repository at this point in the history
* pass `size` to initilizer of MessagePackEventStream if `size` exists in `option` at in_forward
* remove unnecessary condtion
  • Loading branch information
ganmacs committed Aug 4, 2016
1 parent d9952cf commit 3867b88
Show file tree
Hide file tree
Showing 2 changed files with 28 additions and 3 deletions.
7 changes: 4 additions & 3 deletions lib/fluent/plugin/in_forward.rb
Original file line number Diff line number Diff line change
Expand Up @@ -169,10 +169,11 @@ def on_message(msg, chunk_size, source)

if entries.class == String
# PackedForward
es = MessagePackEventStream.new(entries)
option = msg[2]
size = (option && option['size']) || 0
es = MessagePackEventStream.new(entries, nil, size.to_i)
es = check_and_skip_invalid_event(tag, es, source) if @skip_invalid_event
router.emit_stream(tag, es)
option = msg[2]

elsif entries.class == Array
# Forward
Expand Down Expand Up @@ -265,7 +266,7 @@ def on_read(data)
@y = Yajl::Parser.new
@y.on_parse_complete = lambda { |obj|
option = @on_message.call(obj, @chunk_counter, @source)
respond option if option
respond option
@chunk_counter = 0
}
else
Expand Down
24 changes: 24 additions & 0 deletions test/plugin/test_in_forward.rb
Original file line number Diff line number Diff line change
Expand Up @@ -242,6 +242,30 @@ def test_message_json
end
end

def test_set_size_to_option
d = create_driver

time = Time.parse("2011-01-02 13:14:15 UTC").to_i
events = [
["tag1", time, {"a"=>1}],
["tag1", time, {"a"=>2}]
]

entries = ''
events.each {|_tag, _time, record|
[_time, record].to_msgpack(entries)
}

chunk = ["tag1", entries, { 'size' => events.length }].to_msgpack

d.run do
Fluent::Engine.msgpack_factory.unpacker.feed_each(chunk) do |obj|
option = d.instance.send(:on_message, obj, chunk.size, "host: 127.0.0.1, addr: 127.0.0.1, port: 0000")
assert_equal option['size'], events.length
end
end
end

def test_send_large_chunk_warning
d = create_driver(CONFIG + %[
chunk_size_warn_limit 16M
Expand Down

0 comments on commit 3867b88

Please sign in to comment.