diff --git a/lib/fluent/agent.rb b/lib/fluent/agent.rb index 7bef4d97c3..4a2b62ed61 100644 --- a/lib/fluent/agent.rb +++ b/lib/fluent/agent.rb @@ -38,6 +38,11 @@ def initialize(opts = {}) @started_outputs = [] @started_filters = [] + @outputs_for_log_event = [] + @filters_for_log_event = [] + @started_outputs_for_log_event = [] + @started_filters_for_log_event = [] + @log = Engine.log @event_router = EventRouter.new(NoMatchMatch.new(log), self) @error_collector = nil @@ -70,16 +75,47 @@ def start @outputs.each { |o| o.start @started_outputs << o + @started_outputs_for_log_event << o if @outputs_for_log_event.include?(o) } @filters.each { |f| f.start @started_filters << f + @started_filters_for_log_event << f if @filters_for_log_event.include?(f) } end def shutdown - @started_filters.map { |f| + (@started_filters - @started_filters_for_log_event).map { |f| + Thread.new do + begin + log.info "shutting down filter#{@context.nil? ? '' : " in #{@context}"}", type: Plugin.lookup_name_from_class(f.class), plugin_id: f.plugin_id + f.shutdown + rescue => e + log.warn "unexpected error while shutting down filter plugins", plugin: f.class, plugin_id: f.plugin_id, error_class: e.class, error: e + log.warn_backtrace + end + end + }.each { |t| t.join } + + # Output plugin as filter emits records at shutdown so emit problem still exist. + # This problem will be resolved after actual filter mechanizm. + (@started_outputs - @started_outputs_for_log_event).map { |o| + Thread.new do + begin + log.info "shutting down output#{@context.nil? ? '' : " in #{@context}"}", type: Plugin.lookup_name_from_class(o.class), plugin_id: o.plugin_id + o.shutdown + rescue => e + log.warn "unexpected error while shutting down output plugins", plugin: o.class, plugin_id: o.plugin_id, error_class: e.class, error: e + log.warn_backtrace + end + end + }.each { |t| t.join } + + ## execute callback from Engine to flush log event queue before shutting down corresponding filters and outputs + yield if block_given? + + @started_filters_for_log_event.map { |f| Thread.new do begin log.info "shutting down filter#{@context.nil? ? '' : " in #{@context}"}", type: Plugin.lookup_name_from_class(f.class), plugin_id: f.plugin_id @@ -93,7 +129,7 @@ def shutdown # Output plugin as filter emits records at shutdown so emit problem still exist. # This problem will be resolved after actual filter mechanizm. - @started_outputs.map { |o| + @started_outputs_for_log_event.map { |o| Thread.new do begin log.info "shutting down output#{@context.nil? ? '' : " in #{@context}"}", type: Plugin.lookup_name_from_class(o.class), plugin_id: o.plugin_id @@ -134,6 +170,10 @@ def add_match(type, pattern, conf) @outputs << output @event_router.add_rule(pattern, output) + if match_event_log_tag?(pattern) + @outputs_for_log_event << output + end + output end @@ -146,9 +186,17 @@ def add_filter(type, pattern, conf) @filters << filter @event_router.add_rule(pattern, filter) + if match_event_log_tag?(pattern) + @filters_for_log_event << filter + end + filter end + def match_event_log_tag?(pattern) + EventRouter::Rule.new(pattern, nil).match?($log.tag) + end + # For handling invalid record def emit_error_event(tag, time, record, error) end diff --git a/lib/fluent/engine.rb b/lib/fluent/engine.rb index 0de59ba4d6..e20f031bd2 100644 --- a/lib/fluent/engine.rb +++ b/lib/fluent/engine.rb @@ -210,10 +210,13 @@ def run ensure $log.info "shutting down fluentd" if @log_emit_thread - @log_event_loop_stop = true - @log_emit_thread.join + shutdown do + @log_event_loop_stop = true + @log_emit_thread.join + end + else + shutdown end - shutdown end end @@ -237,8 +240,8 @@ def start @root_agent.start end - def shutdown - @root_agent.shutdown + def shutdown(&block) + @root_agent.shutdown(&block) end end