Skip to content

Commit

Permalink
Merge pull request #1625 from mururu/capture-more-log-events
Browse files Browse the repository at this point in the history
Make sure log events generated at shutdown phase of plugins (except
  • Loading branch information
repeatedly authored Jul 12, 2017
2 parents 004bae1 + 8071cfd commit 8d773b7
Show file tree
Hide file tree
Showing 2 changed files with 58 additions and 7 deletions.
52 changes: 50 additions & 2 deletions lib/fluent/agent.rb
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,11 @@ def initialize(opts = {})
@started_outputs = []
@started_filters = []

@outputs_for_log_event = []
@filters_for_log_event = []
@started_outputs_for_log_event = []
@started_filters_for_log_event = []

@log = Engine.log
@event_router = EventRouter.new(NoMatchMatch.new(log), self)
@error_collector = nil
Expand Down Expand Up @@ -70,16 +75,47 @@ def start
@outputs.each { |o|
o.start
@started_outputs << o
@started_outputs_for_log_event << o if @outputs_for_log_event.include?(o)
}

@filters.each { |f|
f.start
@started_filters << f
@started_filters_for_log_event << f if @filters_for_log_event.include?(f)
}
end

def shutdown
@started_filters.map { |f|
(@started_filters - @started_filters_for_log_event).map { |f|
Thread.new do
begin
log.info "shutting down filter#{@context.nil? ? '' : " in #{@context}"}", type: Plugin.lookup_name_from_class(f.class), plugin_id: f.plugin_id
f.shutdown
rescue => e
log.warn "unexpected error while shutting down filter plugins", plugin: f.class, plugin_id: f.plugin_id, error_class: e.class, error: e
log.warn_backtrace
end
end
}.each { |t| t.join }

# Output plugin as filter emits records at shutdown so emit problem still exist.
# This problem will be resolved after actual filter mechanizm.
(@started_outputs - @started_outputs_for_log_event).map { |o|
Thread.new do
begin
log.info "shutting down output#{@context.nil? ? '' : " in #{@context}"}", type: Plugin.lookup_name_from_class(o.class), plugin_id: o.plugin_id
o.shutdown
rescue => e
log.warn "unexpected error while shutting down output plugins", plugin: o.class, plugin_id: o.plugin_id, error_class: e.class, error: e
log.warn_backtrace
end
end
}.each { |t| t.join }

## execute callback from Engine to flush log event queue before shutting down corresponding filters and outputs
yield if block_given?

@started_filters_for_log_event.map { |f|
Thread.new do
begin
log.info "shutting down filter#{@context.nil? ? '' : " in #{@context}"}", type: Plugin.lookup_name_from_class(f.class), plugin_id: f.plugin_id
Expand All @@ -93,7 +129,7 @@ def shutdown

# Output plugin as filter emits records at shutdown so emit problem still exist.
# This problem will be resolved after actual filter mechanizm.
@started_outputs.map { |o|
@started_outputs_for_log_event.map { |o|
Thread.new do
begin
log.info "shutting down output#{@context.nil? ? '' : " in #{@context}"}", type: Plugin.lookup_name_from_class(o.class), plugin_id: o.plugin_id
Expand Down Expand Up @@ -134,6 +170,10 @@ def add_match(type, pattern, conf)
@outputs << output
@event_router.add_rule(pattern, output)

if match_event_log_tag?(pattern)
@outputs_for_log_event << output
end

output
end

Expand All @@ -146,9 +186,17 @@ def add_filter(type, pattern, conf)
@filters << filter
@event_router.add_rule(pattern, filter)

if match_event_log_tag?(pattern)
@filters_for_log_event << filter
end

filter
end

def match_event_log_tag?(pattern)
EventRouter::Rule.new(pattern, nil).match?($log.tag)
end

# For handling invalid record
def emit_error_event(tag, time, record, error)
end
Expand Down
13 changes: 8 additions & 5 deletions lib/fluent/engine.rb
Original file line number Diff line number Diff line change
Expand Up @@ -210,10 +210,13 @@ def run
ensure
$log.info "shutting down fluentd"
if @log_emit_thread
@log_event_loop_stop = true
@log_emit_thread.join
shutdown do
@log_event_loop_stop = true
@log_emit_thread.join
end
else
shutdown
end
shutdown
end
end

Expand All @@ -237,8 +240,8 @@ def start
@root_agent.start
end

def shutdown
@root_agent.shutdown
def shutdown(&block)
@root_agent.shutdown(&block)
end
end

Expand Down

0 comments on commit 8d773b7

Please sign in to comment.