Skip to content

Commit

Permalink
Merge pull request #1302 from fluent/show-errors-on-console-under-plu…
Browse files Browse the repository at this point in the history
…gin-development

Show errors on console under plugin development
  • Loading branch information
tagomoris authored Nov 2, 2016
2 parents d7df62b + 34dc637 commit 54d3509
Show file tree
Hide file tree
Showing 10 changed files with 94 additions and 61 deletions.
15 changes: 0 additions & 15 deletions lib/fluent/plugin/out_stdout.rb
Original file line number Diff line number Diff line change
Expand Up @@ -41,18 +41,8 @@ def prefer_buffered_processing
false
end

def prefer_delayed_commit
@delayed
end

attr_accessor :delayed
attr_accessor :formatter

def initialize
super
@delayed = false
end

def configure(conf)
compat_parameters_convert(conf, :inject, :formatter)

Expand All @@ -76,10 +66,5 @@ def format(tag, time, record)
def write(chunk)
chunk.write_to($log)
end

def try_write(chunk)
chunk.write_to($log)
commit_write(chunk.unique_id)
end
end
end
28 changes: 17 additions & 11 deletions lib/fluent/plugin/output.rb
Original file line number Diff line number Diff line change
Expand Up @@ -150,18 +150,16 @@ def expired?

# for tests
attr_reader :buffer, :retry, :secondary, :chunk_keys, :chunk_key_time, :chunk_key_tag
attr_accessor :output_enqueue_thread_waiting, :in_tests
attr_accessor :output_enqueue_thread_waiting, :dequeued_chunks, :dequeued_chunks_mutex

# output_enqueue_thread_waiting: for test of output.rb itself
# in_tests: for tests of plugins with test drivers

def initialize
super
@counters_monitor = Monitor.new
@buffering = false
@delayed_commit = false
@as_secondary = false
@in_tests = false
@primary_instance = nil

# TODO: well organized counters
Expand All @@ -188,6 +186,7 @@ def initialize
@secondary = nil
@retry = nil
@dequeued_chunks = nil
@dequeued_chunks_mutex = nil
@output_enqueue_thread = nil
@output_flush_threads = nil

Expand Down Expand Up @@ -399,10 +398,8 @@ def start
end
@output_flush_thread_current_position = 0

unless @in_tests
if @flush_mode == :interval || @chunk_key_time
@output_enqueue_thread = thread_create(:enqueue_thread, &method(:enqueue_thread_run))
end
if !@under_plugin_development && (@flush_mode == :interval || @chunk_key_time)
@output_enqueue_thread = thread_create(:enqueue_thread, &method(:enqueue_thread_run))
end
end
@secondary.start if @secondary
Expand Down Expand Up @@ -981,11 +978,11 @@ def try_flush
if output.delayed_commit
log.trace "executing delayed write and commit", chunk: dump_unique_id_hex(chunk.unique_id)
@counters_monitor.synchronize{ @write_count += 1 }
output.try_write(chunk)
@dequeued_chunks_mutex.synchronize do
# delayed_commit_timeout for secondary is configured in <buffer> of primary (<secondary> don't get <buffer>)
@dequeued_chunks << DequeuedChunkInfo.new(chunk.unique_id, Time.now, self.delayed_commit_timeout)
end
output.try_write(chunk)
else # output plugin without delayed purge
chunk_id = chunk.unique_id
dump_chunk_id = dump_unique_id_hex(chunk_id)
Expand All @@ -994,11 +991,16 @@ def try_flush
log.trace "executing sync write", chunk: dump_chunk_id
output.write(chunk)
log.trace "write operation done, committing", chunk: dump_chunk_id
commit_write(chunk_id, secondary: using_secondary)
commit_write(chunk_id, delayed: false, secondary: using_secondary)
log.trace "done to commit a chunk", chunk: dump_chunk_id
end
rescue => e
log.debug "taking back chunk for errors.", plugin_id: plugin_id, chunk: dump_unique_id_hex(chunk.unique_id)
if output.delayed_commit
@dequeued_chunks_mutex.synchronize do
@dequeued_chunks.delete_if{|d| d.chunk_id == chunk.unique_id }
end
end
@buffer.takeback_chunk(chunk.unique_id)

if @under_plugin_development
Expand Down Expand Up @@ -1059,7 +1061,11 @@ def submit_flush_once
@output_flush_thread_current_position = (@output_flush_thread_current_position + 1) % @buffer_config.flush_thread_count
state = @output_flush_threads[@output_flush_thread_current_position]
state.next_time = 0
state.thread.run
if state.thread && state.thread.status # "run"/"sleep"/"aborting" or false(successfully stop) or nil(killed by exception)
state.thread.run
else
log.warn "thread is already dead"
end
end

def force_flush
Expand Down Expand Up @@ -1204,7 +1210,7 @@ def flush_thread_run(state)
rescue => e
# normal errors are rescued by output plugins in #try_flush
# so this rescue section is for critical & unrecoverable errors
log.error "error on output thread", plugin_id: plugin_id, error_class: e.class, error: e
log.error "error on output thread", plugin_id: plugin_id, error: e
log.error_backtrace
raise
end
Expand Down
6 changes: 6 additions & 0 deletions lib/fluent/plugin_helper/thread.rb
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,12 @@ def thread_create(title)
yield
thread_exit = true
rescue Exception => e
if @under_plugin_development
STDERR.puts "\nError raised in thread from #thread_create, #{e.class}:#{e.message}"
e.backtrace.each do |msg|
STDERR.puts [" ", msg].join
end
end
log.warn "thread exited by unexpected error", plugin: self.class, title: title, error: e
thread_exit = true
raise
Expand Down
1 change: 1 addition & 0 deletions lib/fluent/test/driver/base_owner.rb
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ def initialize(klass, opts: {}, &block)
@instance.system_config_override(opts)
end
@instance.log = TestLogger.new
@instance.log.under_plugin_development = true
@logs = @instance.log.out.logs

@event_streams = nil
Expand Down
22 changes: 19 additions & 3 deletions lib/fluent/test/driver/output.rb
Original file line number Diff line number Diff line change
Expand Up @@ -29,14 +29,15 @@ class Output < BaseOwner
def initialize(klass, opts: {}, &block)
super
raise ArgumentError, "plugin is not an instance of Fluent::Plugin::Output" unless @instance.is_a? Fluent::Plugin::Output
@instance.in_tests = true
@flush_buffer_at_cleanup = nil
@wait_flush_completion = nil
@format_hook = nil
@format_results = []
end

def run(flush: true, **kwargs, &block)
def run(flush: true, wait_flush_completion: true, **kwargs, &block)
@flush_buffer_at_cleanup = flush
@wait_flush_completion = wait_flush_completion
super(**kwargs, &block)
end

Expand All @@ -55,7 +56,22 @@ def formatted

def flush
@instance.force_flush
Timeout.timeout(10){ sleep 0.1 until !@instance.buffer || @instance.buffer.queue.size == 0 }
wait_flush_completion if @wait_flush_completion
end

def wait_flush_completion
buffer_queue = ->(){ @instance.buffer && @instance.buffer.queue.size > 0 }
dequeued_chunks = ->(){
@instance.dequeued_chunks_mutex &&
@instance.dequeued_chunks &&
@instance.dequeued_chunks_mutex.synchronize{ @instance.dequeued_chunks.size > 0 }
}

Timeout.timeout(10) do
while buffer_queue.call || dequeued_chunks.call
sleep 0.1
end
end
end

def instance_hook_after_started
Expand Down
46 changes: 46 additions & 0 deletions lib/fluent/test/log.rb
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ class DummyLogDevice
def initialize
@logs = []
@flush_logs = true
@use_stderr = false
end

def reset
Expand All @@ -40,7 +41,17 @@ def puts(*args)
args.each{ |arg| write(arg + "\n") }
end

def dump_stderr(&block)
@use_stderr = true
block.call
ensure
@use_stderr = false
end

def write(message)
if @use_stderr
STDERR.write message
end
@logs.push message
end

Expand All @@ -54,15 +65,50 @@ def close
end

class TestLogger < Fluent::PluginLogger
attr_accessor :under_plugin_development

def initialize
@logdev = DummyLogDevice.new
@under_plugin_development = false
dl_opts = {}
dl_opts[:log_level] = ServerEngine::DaemonLogger::INFO
logger = ServerEngine::DaemonLogger.new(@logdev, dl_opts)
log = Fluent::Log.new(logger)
super(log)
end

def error(*args, &block)
if @under_plugin_development
@logdev.dump_stderr{ super }
else
super
end
end

def error_backtrace(backtrace=$!.backtrace)
if @under_plugin_development
@logdev.dump_stderr{ super }
else
super
end
end

def fatal(*args, &block)
if @under_plugin_development
@logdev.dump_stderr{ super }
else
super
end
end

def fatal_backtrace(backtrace=$!.backtrace)
if @under_plugin_development
@logdev.dump_stderr{ super }
else
super
end
end

def reset
@logdev.reset
end
Expand Down
2 changes: 1 addition & 1 deletion test/plugin/test_in_dummy.rb
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ def create_driver(conf)
sub_test_case 'configure' do
test 'required parameters' do
assert_raise_message("'tag' parameter is required") do
create_driver('')
Fluent::Plugin::DummyInput.new.configure(config_element('ROOT',''))
end
end

Expand Down
2 changes: 1 addition & 1 deletion test/plugin/test_out_null.rb
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,7 @@ def create_driver(conf = "")
d.instance.delayed = true

t = event_time("2016-05-23 00:22:13 -0800")
d.run(default_tag: 'test', flush: true, shutdown: false) do
d.run(default_tag: 'test', flush: true, wait_flush_completion: false, shutdown: false) do
d.feed(t, {"message" => "null null null"})
d.feed(t, {"message" => "null null"})
d.feed(t, {"message" => "null"})
Expand Down
4 changes: 3 additions & 1 deletion test/plugin/test_out_secondary_file.rb
Original file line number Diff line number Diff line change
Expand Up @@ -97,7 +97,9 @@ def create_driver(conf = CONFIG, primary = create_primary)

test 'should be passed directory' do
assert_raise Fluent::ConfigError do
create_driver %[]
i = Fluent::Plugin::SecondaryFileOutput.new
i.acts_as_secondary(create_primary)
i.configure(config_element())
end

assert_nothing_raised do
Expand Down
29 changes: 0 additions & 29 deletions test/plugin/test_out_stdout.rb
Original file line number Diff line number Diff line change
Expand Up @@ -139,21 +139,6 @@ def create_driver(conf = CONFIG)
end
assert_equal "#{Time.at(time).localtime.strftime(TIME_FORMAT)} test: {\"test\":\"test\"}\n", out
end

data('oj' => 'oj', 'yajl' => 'yajl')
test '#try_write(asynchronous)' do |data|
d = create_driver(config_element("ROOT", "", {"output_type" => "json", "json_parser" => data}, [config_element("buffer")]))
time = event_time()
d.instance.delayed = true

out = capture_log do
d.run(default_tag: 'test', flush: true, shutdown: false) do
d.feed(time, {'test' => 'test'})
end
end

assert_equal "#{Time.at(time).localtime.strftime(TIME_FORMAT)} test: {\"test\":\"test\"}\n", out
end
end

sub_test_case 'emit hash' do
Expand All @@ -169,20 +154,6 @@ def create_driver(conf = CONFIG)

assert_equal "#{Time.at(time).localtime.strftime(TIME_FORMAT)} test: {\"test\"=>\"test\"}\n", out
end

test '#try_write(asynchronous)' do
d = create_driver(config_element("ROOT", "", {"output_type" => "hash"}, [config_element("buffer")]))
time = event_time()
d.instance.delayed = true

out = capture_log do
d.run(default_tag: 'test', flush: true, shutdown: false) do
d.feed(time, {'test' => 'test'})
end
end

assert_equal "#{Time.at(time).localtime.strftime(TIME_FORMAT)} test: {\"test\"=>\"test\"}\n", out
end
end
end

Expand Down

0 comments on commit 54d3509

Please sign in to comment.