Skip to content

Commit

Permalink
Merge pull request #1490 from fluent/backport-add-xxx-key-to-in_forward
Browse files Browse the repository at this point in the history
in_forward: backport source / address key features
  • Loading branch information
repeatedly authored Mar 3, 2017
2 parents 79a2cbe + 2249683 commit 9fee2fb
Show file tree
Hide file tree
Showing 2 changed files with 136 additions and 47 deletions.
57 changes: 47 additions & 10 deletions lib/fluent/plugin/in_forward.rb
Original file line number Diff line number Diff line change
Expand Up @@ -47,11 +47,24 @@ def initialize
config_param :chunk_size_limit, :size, default: nil
desc 'Skip an event if incoming event is invalid.'
config_param :skip_invalid_event, :bool, default: false
desc 'Try to resolve hostname from IP addresses or not.'
config_param :resolve_hostname, :bool, default: nil
desc "The field name of the client's source address."
config_param :source_address_key, :string, default: nil
desc "The field name of the client's hostname."
config_param :source_hostname_key, :string, default: nil

def configure(conf)
super

if @source_hostname_key
if @resolve_hostname.nil?
@resolve_hostname = true
elsif !@resolve_hostname # user specifies "false" in configure
raise Fluent::ConfigError, "resolve_hostname must be true with source_hostname_key"
end
end
@enable_field_injection = @source_address_key || @source_hostname_key
end

def start
Expand Down Expand Up @@ -88,7 +101,7 @@ def shutdown

def listen
log.info "listening fluent socket on #{@bind}:#{@port}"
s = Coolio::TCPServer.new(@bind, @port, Handler, @linger_timeout, log, method(:on_message))
s = Coolio::TCPServer.new(@bind, @port, Handler, @linger_timeout, log, @resolve_hostname, method(:on_message))
s.listen(@backlog) unless @backlog.nil?
s
end
Expand Down Expand Up @@ -161,7 +174,7 @@ def on_message(msg, chunk_size, peeraddr)
# PackedForward
es = MessagePackEventStream.new(entries)
es = check_and_skip_invalid_event(tag, es, peeraddr) if @skip_invalid_event
es = add_source_host(es, peeraddr[2]) if @source_hostname_key
es = add_source_host(es, peeraddr) if @enable_field_injection
router.emit_stream(tag, es)
option = msg[2]

Expand All @@ -180,7 +193,7 @@ def on_message(msg, chunk_size, peeraddr)
}
es
end
es = add_source_host(es, peeraddr[2]) if @source_hostname_key
es = add_source_host(es, peeraddr) if @enable_field_injection
router.emit_stream(tag, es)
option = msg[2]

Expand All @@ -194,7 +207,10 @@ def on_message(msg, chunk_size, peeraddr)
end
return if record.nil?
time = Engine.now if time == 0
record[@source_hostname_key] = peeraddr[2] if @source_hostname_key
if @enable_field_injection
record[@source_hostname_key] = peeraddr[2] if @source_hostname_key
record[@source_address_key] = peeraddr[3] if @source_address_key
end
router.emit(tag, time, record)
option = msg[3]
end
Expand All @@ -219,12 +235,31 @@ def check_and_skip_invalid_event(tag, es, peeraddr)
new_es
end

def add_source_host(es, host)
def add_source_host(es, peeraddr)
new_es = MultiEventStream.new
es.each { |time, record|
record[@source_hostname_key] = host
new_es.add(time, record)
}
if @source_address_key && @source_hostname_key
address = peeraddr[3]
hostname = peeraddr[2]
es.each { |time, record|
record[@source_address_key] = address
record[@source_hostname_key] = hostname
new_es.add(time, record)
}
elsif @source_address_key
address = peeraddr[3]
es.each { |time, record|
record[@source_address_key] = address
new_es.add(time, record)
}
elsif @source_hostname_key
hostname = peeraddr[2]
es.each { |time, record|
record[@source_hostname_key] = hostname
new_es.add(time, record)
}
else
raise "BUG: don't call this method in this case"
end
new_es
end

Expand All @@ -236,11 +271,13 @@ def source_message(peeraddr)
class Handler < Coolio::Socket
PEERADDR_FAILED = ["?", "?", "name resolusion failed", "?"]

def initialize(io, linger_timeout, log, on_message)
def initialize(io, linger_timeout, log, resolve_hostname, on_message)
super(io)

@peeraddr = nil
if io.is_a?(TCPSocket) # for unix domain socket support in the future
io.do_not_reverse_lookup = !resolve_hostname unless resolve_hostname.nil?

@peeraddr = (io.peeraddr rescue PEERADDR_FAILED)
opt = [1, linger_timeout].pack('I!I!') # { int l_onoff; int l_linger; }
io.setsockopt(Socket::SOL_SOCKET, Socket::SO_LINGER, opt)
Expand Down
126 changes: 89 additions & 37 deletions test/plugin/test_in_forward.rb
Original file line number Diff line number Diff line change
Expand Up @@ -498,52 +498,104 @@ def send_data(data, try_to_receive_response=false, response_timeout=1)
@responses << res if try_to_receive_response
end

# TODO: Use sub_test_case. Currently Errno::EADDRINUSE happens inside sub_test_case
test 'message protocol with source_hostname_key' do
execute_test { |events|
events.each { |tag, time, record|
send_data [tag, time, record].to_msgpack
LOCALHOST_HOSTNAME_GETTER = ->(){sock = UDPSocket.new(::Socket::AF_INET); sock.do_not_reverse_lookup = false; sock.connect("127.0.0.1", 2048); sock.peeraddr[2] }
LOCALHOST_HOSTNAME = LOCALHOST_HOSTNAME_GETTER.call
DUMMY_SOCK = Struct.new(:remote_host, :remote_addr, :remote_port).new(LOCALHOST_HOSTNAME, "127.0.0.1", 0)

sub_test_case 'source_hostname_key and source_address_key features' do
test 'resolve_hostname must be true with source_hostname_key' do
assert_raise(Fluent::ConfigError) {
create_driver(CONFIG + <<EOS)
resolve_hostname false
source_hostname_key hostname
EOS
}
}
end
end
data(
both: [:hostname, :address],
hostname: [:hostname],
address: [:address],
)
test 'message protocol' do |keys|
execute_test(*keys) { |events|
events.each { |tag, time, record|
send_data [tag, time, record].to_msgpack
}
}
end

test 'forward protocol with source_hostname_key' do
execute_test { |events|
entries = []
events.each {|tag,time,record|
entries << [time, record]
data(
both: [:hostname, :address],
hostname: [:hostname],
address: [:address],
)
test 'forward protocol' do |keys|
execute_test(*keys) { |events|
entries = []
events.each {|tag,time,record|
entries << [time, record]
}
send_data ['tag1', entries].to_msgpack
}
send_data ['tag1', entries].to_msgpack
}
end
end

test 'packed forward protocol with source_hostname_key' do
execute_test { |events|
entries = ''
events.each { |tag, time, record|
Fluent::Engine.msgpack_factory.packer(entries).write([time, record]).flush
data(
both: [:hostname, :address],
hostname: [:hostname],
address: [:address],
)
test 'packed forward protocol' do |keys|
execute_test(*keys) { |events|
entries = ''
events.each { |tag, time, record|
Fluent::Engine.msgpack_factory.packer(entries).write([time, record]).flush
}
send_data Fluent::Engine.msgpack_factory.packer.write(["tag1", entries]).to_s
}
send_data Fluent::Engine.msgpack_factory.packer.write(["tag1", entries]).to_s
}
end
end

def execute_test(&block)
d = create_driver(CONFIG + 'source_hostname_key source')
def execute_test(*keys, &block)
conf = CONFIG.dup
if keys.include?(:hostname)
conf << <<EOL
source_hostname_key source_hostname
EOL
end
if keys.include?(:address)
conf << <<EOL
source_address_key source_address
EOL
end
d = create_driver(conf)

time = Time.parse("2011-01-02 13:14:15 UTC").to_i
events = [
["tag1", time, {"a"=>1}],
["tag1", time, {"a"=>2}]
]
d.expected_emits_length = events.length
time = Time.parse("2011-01-02 13:14:15 UTC").to_i
events = [
["tag1", time, {"a"=>1}],
["tag1", time, {"a"=>2}]
]
d.expected_emits_length = events.length

d.run do
block.call(events)
end
d.run do
block.call(events)
end

d.emits.each { |tag, _time, record|
assert_true record.has_key?('source')
}
d.emits.each { |tag, _time, record|
if keys.include?(:hostname)
assert_true record.has_key?('source_hostname')
assert_equal DUMMY_SOCK.remote_host, record['source_hostname']
unless keys.include?(:address)
assert_false record.has_key?('source_address')
end
end
if keys.include?(:address)
assert_true record.has_key?('source_address')
assert_equal DUMMY_SOCK.remote_addr, record['source_address']
unless keys.include?(:hostname)
assert_false record.has_key?('source_hostname')
end
end
}
end
end

# TODO heartbeat
Expand Down

0 comments on commit 9fee2fb

Please sign in to comment.