Skip to content

Commit

Permalink
Merge pull request #972 from fluent/introduce-v14-test-drivers
Browse files Browse the repository at this point in the history
Introduce v14 test drivers
  • Loading branch information
tagomoris committed May 24, 2016
2 parents 5109677 + d8cae43 commit 9c4a64b
Show file tree
Hide file tree
Showing 26 changed files with 1,119 additions and 354 deletions.
32 changes: 23 additions & 9 deletions lib/fluent/config.rb
Original file line number Diff line number Diff line change
Expand Up @@ -20,18 +20,32 @@

module Fluent
module Config
def self.parse(str, fname, basepath = Dir.pwd, v1_config = false)
if fname =~ /\.rb$/
def self.parse(str, fname, basepath = Dir.pwd, v1_config = nil, syntax: :v1)
parser = if fname =~ /\.rb$/ || syntax == :ruby
:ruby
elsif v1_config.nil?
case syntax
when :v1 then :v1
when :v0 then :v0
else
raise ArgumentError, "Unknown Fluentd configuration syntax: '#{syntax}'"
end
elsif v1_config then :v1
else :v0
end
case parser
when :v1
require 'fluent/config/v1_parser'
V1Parser.parse(str, fname, basepath, Kernel.binding)
when :v0
# TODO: show deprecated message in v1
require 'fluent/config/parser'
Parser.parse(str, fname, basepath)
when :ruby
require 'fluent/config/dsl'
Config::DSL::Parser.parse(str, File.join(basepath, fname))
else
if v1_config
require 'fluent/config/v1_parser'
V1Parser.parse(str, fname, basepath, Kernel.binding)
else
require 'fluent/config/parser'
Parser.parse(str, fname, basepath)
end
raise "[BUG] unknown configuration parser specification:'#{parser}'"
end
end

Expand Down
155 changes: 80 additions & 75 deletions lib/fluent/plugin/exec_util.rb
Original file line number Diff line number Diff line change
Expand Up @@ -22,106 +22,111 @@
require 'fluent/parser'

module Fluent
module ExecUtil
SUPPORTED_FORMAT = {
'tsv' => :tsv,
'json' => :json,
'msgpack' => :msgpack,
}

class Parser
def initialize(on_message)
@on_message = on_message
module Plugin
module ExecUtil
SUPPORTED_FORMAT = {
'tsv' => :tsv,
'json' => :json,
'msgpack' => :msgpack,
}

class Parser
def initialize(on_message)
@on_message = on_message
end
end
end

class TextParserWrapperParser < Parser
def initialize(conf, on_message)
@parser = Plugin.new_parser(conf['format'])
@parser.configure(conf)
super(on_message)
end
class TextParserWrapperParser < Parser
def initialize(conf, on_message)
@parser = Plugin.new_parser(conf['format'])
@parser.configure(conf)
super(on_message)
end

def call(io)
io.each_line(&method(:each_line))
end
def call(io)
io.each_line(&method(:each_line))
end

def each_line(line)
line.chomp!
@parser.parse(line) { |time, record|
@on_message.call(record, time)
}
def each_line(line)
line.chomp!
@parser.parse(line) { |time, record|
@on_message.call(record, time)
}
end
end
end

class TSVParser < Parser
def initialize(keys, on_message)
@keys = keys
super(on_message)
end
class TSVParser < Parser
def initialize(keys, on_message)
@keys = keys
super(on_message)
end

def call(io)
io.each_line(&method(:each_line))
end
def call(io)
io.each_line(&method(:each_line))
end

def each_line(line)
line.chomp!
vals = line.split("\t")
def each_line(line)
line.chomp!
vals = line.split("\t")

record = Hash[@keys.zip(vals)]
record = Hash[@keys.zip(vals)]

@on_message.call(record)
@on_message.call(record)
end
end
end

class JSONParser < Parser
def call(io)
y = Yajl::Parser.new
y.on_parse_complete = @on_message
y.parse(io)
class JSONParser < Parser
def call(io)
y = Yajl::Parser.new
y.on_parse_complete = @on_message
y.parse(io)
end
end
end

class MessagePackParser < Parser
def call(io)
@u = Fluent::Engine.msgpack_factory.unpacker(io)
begin
@u.each(&@on_message)
rescue EOFError
class MessagePackParser < Parser
def call(io)
@u = Fluent::Engine.msgpack_factory.unpacker(io)
begin
@u.each(&@on_message)
rescue EOFError
end
end
end
end

class Formatter
end

class TSVFormatter < Formatter
def initialize(in_keys)
@in_keys = in_keys
super()
class Formatter
end

def call(record, out)
last = @in_keys.length-1
for i in 0..last
key = @in_keys[i]
out << record[key].to_s
out << "\t" if i != last
class TSVFormatter < Formatter
def initialize(in_keys)
@in_keys = in_keys
super()
end

def call(record, out)
last = @in_keys.length-1
for i in 0..last
key = @in_keys[i]
out << record[key].to_s
out << "\t" if i != last
end
out << "\n"
end
out << "\n"
end
end

class JSONFormatter < Formatter
def call(record, out)
out << Yajl.dump(record) << "\n"
class JSONFormatter < Formatter
def call(record, out)
out << Yajl.dump(record) << "\n"
end
end
end

class MessagePackFormatter < Formatter
def call(record, out)
record.to_msgpack(out)
class MessagePackFormatter < Formatter
def call(record, out)
record.to_msgpack(out)
end
end
end
end

# obsolete
ExecUtil = Fluent::Plugin::ExecUtil
end
61 changes: 33 additions & 28 deletions lib/fluent/plugin/file_util.rb
Original file line number Diff line number Diff line change
Expand Up @@ -15,38 +15,43 @@
#

module Fluent
module FileUtil
# Check file is writable if file exists
# Check directory is writable if file does not exist
#
# @param [String] path File path
# @return [Boolean] file is writable or not
def writable?(path)
return false if File.directory?(path)
return File.writable?(path) if File.exist?(path)
module Plugin
module FileUtil
# Check file is writable if file exists
# Check directory is writable if file does not exist
#
# @param [String] path File path
# @return [Boolean] file is writable or not
def writable?(path)
return false if File.directory?(path)
return File.writable?(path) if File.exist?(path)

dirname = File.dirname(path)
return false if !File.directory?(dirname)
File.writable?(dirname)
end
module_function :writable?
dirname = File.dirname(path)
return false if !File.directory?(dirname)
File.writable?(dirname)
end
module_function :writable?

# Check file is writable in conjunction wtih mkdir_p(dirname(path))
#
# @param [String] path File path
# @return [Boolean] file writable or not
def writable_p?(path)
return false if File.directory?(path)
return File.writable?(path) if File.exist?(path)
# Check file is writable in conjunction wtih mkdir_p(dirname(path))
#
# @param [String] path File path
# @return [Boolean] file writable or not
def writable_p?(path)
return false if File.directory?(path)
return File.writable?(path) if File.exist?(path)

dirname = File.dirname(path)
until File.exist?(dirname)
dirname = File.dirname(dirname)
end
dirname = File.dirname(path)
until File.exist?(dirname)
dirname = File.dirname(dirname)
end

return false if !File.directory?(dirname)
File.writable?(dirname)
return false if !File.directory?(dirname)
File.writable?(dirname)
end
module_function :writable_p?
end
module_function :writable_p?
end

# obsolete
FileUtil = Fluent::Plugin::FileUtil
end
35 changes: 19 additions & 16 deletions lib/fluent/plugin/in_dummy.rb
Original file line number Diff line number Diff line change
Expand Up @@ -16,13 +16,15 @@

require 'json'

require 'fluent/input'
require 'fluent/plugin/input'
require 'fluent/config/error'

module Fluent
module Fluent::Plugin
class DummyInput < Input
Fluent::Plugin.register_input('dummy', self)

helpers :thread, :storage

BIN_NUM = 10

desc "The value is the tag assigned to the generated events."
Expand All @@ -48,37 +50,39 @@ class DummyInput < Input
dummy
end

def configure(conf)
def initialize
super
@storage = nil
end

@increment_value = 0
def configure(conf)
super
@dummy_index = 0
end

def start
super
@running = true
@thread = Thread.new(&method(:run))
end

def shutdown
@running = false
@thread.join
super
@storage = storage_create(type: 'local')
if @auto_increment_key && !@storage.get(:auto_increment_value)
@storage.put(:auto_increment_value, -1)
end

thread_create(:dummy_input, &method(:run))
end

def run
batch_num = (@rate / BIN_NUM).to_i
residual_num = (@rate % BIN_NUM)
while @running
while thread_current_running?
current_time = Time.now.to_i
BIN_NUM.times do
break unless (@running && Time.now.to_i <= current_time)
break unless (thread_current_running? && Time.now.to_i <= current_time)
wait(0.1) { emit(batch_num) }
end
emit(residual_num)
# wait for next second
while @running && Time.now.to_i <= current_time
while thread_current_running? && Time.now.to_i <= current_time
sleep 0.01
end
end
Expand All @@ -97,8 +101,7 @@ def generate
@dummy_index += 1
if @auto_increment_key
d = d.dup
d[@auto_increment_key] = @increment_value
@increment_value += 1
d[@auto_increment_key] = @storage.update(:auto_increment_value){|v| v + 1 }
end
d
end
Expand Down
Loading

0 comments on commit 9c4a64b

Please sign in to comment.