Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Make sure log events generated at shutdown phase of plugins (except #1625

Merged
merged 1 commit into from
Jul 12, 2017
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
52 changes: 50 additions & 2 deletions lib/fluent/agent.rb
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,11 @@ def initialize(opts = {})
@started_outputs = []
@started_filters = []

@outputs_for_log_event = []
@filters_for_log_event = []
@started_outputs_for_log_event = []
@started_filters_for_log_event = []

@log = Engine.log
@event_router = EventRouter.new(NoMatchMatch.new(log), self)
@error_collector = nil
Expand Down Expand Up @@ -70,16 +75,47 @@ def start
@outputs.each { |o|
o.start
@started_outputs << o
@started_outputs_for_log_event << o if @outputs_for_log_event.include?(o)
}

@filters.each { |f|
f.start
@started_filters << f
@started_filters_for_log_event << f if @filters_for_log_event.include?(f)
}
end

def shutdown
@started_filters.map { |f|
(@started_filters - @started_filters_for_log_event).map { |f|
Thread.new do
begin
log.info "shutting down filter#{@context.nil? ? '' : " in #{@context}"}", type: Plugin.lookup_name_from_class(f.class), plugin_id: f.plugin_id
f.shutdown
rescue => e
log.warn "unexpected error while shutting down filter plugins", plugin: f.class, plugin_id: f.plugin_id, error_class: e.class, error: e
log.warn_backtrace
end
end
}.each { |t| t.join }

# Output plugin as filter emits records at shutdown so emit problem still exist.
# This problem will be resolved after actual filter mechanizm.
(@started_outputs - @started_outputs_for_log_event).map { |o|
Thread.new do
begin
log.info "shutting down output#{@context.nil? ? '' : " in #{@context}"}", type: Plugin.lookup_name_from_class(o.class), plugin_id: o.plugin_id
o.shutdown
rescue => e
log.warn "unexpected error while shutting down output plugins", plugin: o.class, plugin_id: o.plugin_id, error_class: e.class, error: e
log.warn_backtrace
end
end
}.each { |t| t.join }

## execute callback from Engine to flush log event queue before shutting down corresponding filters and outputs
yield if block_given?

@started_filters_for_log_event.map { |f|
Thread.new do
begin
log.info "shutting down filter#{@context.nil? ? '' : " in #{@context}"}", type: Plugin.lookup_name_from_class(f.class), plugin_id: f.plugin_id
Expand All @@ -93,7 +129,7 @@ def shutdown

# Output plugin as filter emits records at shutdown so emit problem still exist.
# This problem will be resolved after actual filter mechanizm.
@started_outputs.map { |o|
@started_outputs_for_log_event.map { |o|
Thread.new do
begin
log.info "shutting down output#{@context.nil? ? '' : " in #{@context}"}", type: Plugin.lookup_name_from_class(o.class), plugin_id: o.plugin_id
Expand Down Expand Up @@ -134,6 +170,10 @@ def add_match(type, pattern, conf)
@outputs << output
@event_router.add_rule(pattern, output)

if match_event_log_tag?(pattern)
@outputs_for_log_event << output
end

output
end

Expand All @@ -146,9 +186,17 @@ def add_filter(type, pattern, conf)
@filters << filter
@event_router.add_rule(pattern, filter)

if match_event_log_tag?(pattern)
@filters_for_log_event << filter
end

filter
end

def match_event_log_tag?(pattern)
EventRouter::Rule.new(pattern, nil).match?($log.tag)
end

# For handling invalid record
def emit_error_event(tag, time, record, error)
end
Expand Down
13 changes: 8 additions & 5 deletions lib/fluent/engine.rb
Original file line number Diff line number Diff line change
Expand Up @@ -210,10 +210,13 @@ def run
ensure
$log.info "shutting down fluentd"
if @log_emit_thread
@log_event_loop_stop = true
@log_emit_thread.join
shutdown do
@log_event_loop_stop = true
@log_emit_thread.join
end
else
shutdown
end
shutdown
end
end

Expand All @@ -237,8 +240,8 @@ def start
@root_agent.start
end

def shutdown
@root_agent.shutdown
def shutdown(&block)
@root_agent.shutdown(&block)
end
end

Expand Down