Skip to content

Commit

Permalink
Add tail_path option to in_tail plugin
Browse files Browse the repository at this point in the history
  • Loading branch information
sonots committed May 16, 2016
1 parent aeaa5df commit 0ce19ec
Show file tree
Hide file tree
Showing 2 changed files with 126 additions and 5 deletions.
12 changes: 8 additions & 4 deletions lib/fluent/plugin/in_tail.rb
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,8 @@ def initialize
raise ConfigError, e.message
end
end
desc 'Add the log path being tailed to records. Specify the field name to be used.'
config_param :path_key, :string, default: nil

attr_reader :paths

Expand Down Expand Up @@ -252,6 +254,7 @@ def flush_buffer(tw)
else
@tag
end
record[@path_key] ||= tw.path unless @path_key.nil?
router.emit(tag, time, record)
else
log.warn "got incomplete line at shutdown from #{tw.path}: #{lb.inspect}"
Expand Down Expand Up @@ -289,12 +292,13 @@ def receive_lines(lines, tail_watcher)
return true
end

def convert_line_to_event(line, es)
def convert_line_to_event(line, es, tail_watcher)
begin
line.chomp! # remove \n
line.force_encoding(@encoding) if @encoding
@parser.parse(line) { |time, record|
if time && record
record[@path_key] ||= tail_watcher.path unless @path_key.nil?
es.add(time, record)
else
log.warn "pattern not match: #{line.inspect}"
Expand All @@ -309,7 +313,7 @@ def convert_line_to_event(line, es)
def parse_singleline(lines, tail_watcher)
es = MultiEventStream.new
lines.each { |line|
convert_line_to_event(line, es)
convert_line_to_event(line, es, tail_watcher)
}
es
end
Expand All @@ -322,7 +326,7 @@ def parse_multilines(lines, tail_watcher)
lines.each { |line|
if @parser.firstline?(line)
if lb
convert_line_to_event(lb, es)
convert_line_to_event(lb, es, tail_watcher)
end
lb = line
else
Expand All @@ -339,7 +343,7 @@ def parse_multilines(lines, tail_watcher)
lb << line
@parser.parse(lb) { |time, record|
if time && record
convert_line_to_event(lb, es)
convert_line_to_event(lb, es, tail_watcher)
lb = ''
end
}
Expand Down
119 changes: 118 additions & 1 deletion test/plugin/test_in_tail.rb
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
require 'fluent/plugin/in_tail'
require 'fluent/system_config'
require 'net/http'
require 'flexmock'
require 'flexmock/test_unit'

class TailInputTest < Test::Unit::TestCase
include FlexMock::TestCase
Expand Down Expand Up @@ -809,4 +809,121 @@ def execute_test(error_class, error_message)
d.emits
end
end

sub_test_case "tail_path" do
def test_tail_path_with_singleline
File.open("#{TMP_DIR}/tail.txt", "wb") {|f|
f.puts "test1"
f.puts "test2"
}

d = create_driver(%[path_key path] + SINGLE_LINE_CONFIG)

d.run do
sleep 1

File.open("#{TMP_DIR}/tail.txt", "ab") {|f|
f.puts "test3"
f.puts "test4"
}
sleep 1
end

emits = d.emits
assert_equal(true, emits.length > 0)
emits.each do |emit|
assert_equal("#{TMP_DIR}/tail.txt", emit[2]["path"])
end
end

def test_tail_path_with_multiline_with_firstline
File.open("#{TMP_DIR}/tail.txt", "wb") { |f| }

d = create_driver %[
path_key path
format multiline
format1 /^s (?<message1>[^\\n]+)(\\nf (?<message2>[^\\n]+))?(\\nf (?<message3>.*))?/
format_firstline /^[s]/
]
d.run do
File.open("#{TMP_DIR}/tail.txt", "ab") { |f|
f.puts "f test1"
f.puts "s test2"
f.puts "f test3"
f.puts "f test4"
f.puts "s test5"
f.puts "s test6"
f.puts "f test7"
f.puts "s test8"
}
sleep 1
end

emits = d.emits
assert(emits.length == 4)
emits.each do |emit|
assert_equal("#{TMP_DIR}/tail.txt", emit[2]["path"])
end
end

def test_tail_path_with_multiline_without_firstline
File.open("#{TMP_DIR}/tail.txt", "wb") { |f| }

d = create_driver %[
path_key path
format multiline
format1 /(?<var1>foo \\d)\\n/
format2 /(?<var2>bar \\d)\\n/
format3 /(?<var3>baz \\d)/
]
d.run do
File.open("#{TMP_DIR}/tail.txt", "ab") { |f|
f.puts "foo 1"
f.puts "bar 1"
f.puts "baz 1"
}
sleep 1
end

emits = d.emits
assert(emits.length > 0)
emits.each do |emit|
assert_equal("#{TMP_DIR}/tail.txt", emit[2]["path"])
end
end

def test_tail_path_with_multiline_with_multiple_paths
files = ["#{TMP_DIR}/tail1.txt", "#{TMP_DIR}/tail2.txt"]
files.each { |file| File.open(file, "wb") { |f| } }

d = create_driver(%[
path #{files[0]},#{files[1]}
path_key path
tag t1
format multiline
format1 /^[s|f] (?<message>.*)/
format_firstline /^[s]/
], false)
d.run do
files.each do |file|
File.open(file, 'ab') { |f|
f.puts "f #{file} line should be ignored"
f.puts "s test1"
f.puts "f test2"
f.puts "f test3"
f.puts "s test4"
}
end
sleep 1
end

emits = d.emits
assert(emits.length == 4)
assert_equal("#{TMP_DIR}/tail1.txt", emits[0][2]["path"])
assert_equal("#{TMP_DIR}/tail2.txt", emits[1][2]["path"])
# "test4" events are here because these events are flushed at shutdown phase
assert_equal("#{TMP_DIR}/tail1.txt", emits[2][2]["path"])
assert_equal("#{TMP_DIR}/tail2.txt", emits[3][2]["path"])
end
end
end

0 comments on commit 0ce19ec

Please sign in to comment.