Skip to content

Commit

Permalink
Merge pull request #2105 from fluent/fix-in_tail-resource-leak
Browse files Browse the repository at this point in the history
in_tail: Fix rotation related resource leak. fix #1941
  • Loading branch information
repeatedly authored Aug 20, 2018
2 parents 9e99c8d + deecfbe commit 1d7e09b
Showing 1 changed file with 29 additions and 8 deletions.
37 changes: 29 additions & 8 deletions lib/fluent/plugin/in_tail.rb
Original file line number Diff line number Diff line change
Expand Up @@ -266,13 +266,16 @@ def setup_watcher(path, pe)
line_buffer_timer_flusher = (@multiline_mode && @multiline_flush_interval) ? TailWatcher::LineBufferTimerFlusher.new(log, @multiline_flush_interval, &method(:flush_buffer)) : nil
tw = TailWatcher.new(path, @rotate_wait, pe, log, @read_from_head, @enable_watch_timer, @enable_stat_watcher, @read_lines_limit, method(:update_watcher), line_buffer_timer_flusher, @from_encoding, @encoding, open_on_every_update, &method(:receive_lines))
tw.attach do |watcher|
watcher.timer_trigger = timer_execute(:in_tail_timer_trigger, 1, &watcher.method(:on_notify)) if watcher.enable_watch_timer
event_loop_attach(watcher.stat_trigger) if watcher.enable_stat_watcher
event_loop_attach(watcher.timer_trigger) if watcher.timer_trigger
event_loop_attach(watcher.stat_trigger) if watcher.stat_trigger
end
tw
rescue => e
if tw
tw.detach
tw.detach { |watcher|
event_loop_detach(watcher.timer_trigger) if watcher.timer_trigger
event_loop_detach(watcher.stat_trigger) if watcher.stat_trigger
}
tw.close
end
raise e
Expand Down Expand Up @@ -343,7 +346,10 @@ def update_watcher(path, pe)
# so adding close_io argument to avoid this problem.
# At shutdown, IOHandler's io will be released automatically after detached the event loop
def detach_watcher(tw, close_io = true)
tw.detach
tw.detach { |watcher|
event_loop_detach(watcher.timer_trigger) if watcher.timer_trigger
event_loop_detach(watcher.stat_trigger) if watcher.stat_trigger
}
tw.close if close_io
flush_buffer(tw)
if tw.unwatched && @pf
Expand All @@ -352,6 +358,8 @@ def detach_watcher(tw, close_io = true)
end

def detach_watcher_after_rotate_wait(tw)
# Call event_loop_attach/event_loop_detach is high-cost for short-live object.
# If this has a problem with large number of files, use @_event_loop directly instead of timer_execute.
timer_execute(:in_tail_close_watcher, @rotate_wait, repeat: false) do
detach_watcher(tw)
end
Expand Down Expand Up @@ -479,7 +487,7 @@ def initialize(path, rotate_wait, pe, log, read_from_head, enable_watch_timer, e
@update_watcher = update_watcher

@stat_trigger = @enable_stat_watcher ? StatWatcher.new(self, &method(:on_notify)) : nil
@timer_trigger = nil
@timer_trigger = @enable_watch_timer ? TimerTrigger.new(1, log, &method(:on_notify)) : nil

@rotate_handler = RotateHandler.new(self, &method(:on_rotate))
@io_handler = nil
Expand Down Expand Up @@ -513,8 +521,7 @@ def attach
end

def detach
@timer_trigger.detach if @enable_watch_timer && @timer_trigger && @timer_trigger.attached?
@stat_trigger.detach if @enable_stat_watcher && @stat_trigger && @stat_trigger.attached?
yield self
@io_handler.on_notify if @io_handler
end

Expand Down Expand Up @@ -613,6 +620,21 @@ def swap_state(pe)
pe # This pe will be updated in on_rotate after TailWatcher is initialized
end

class TimerTrigger < Coolio::TimerWatcher
def initialize(interval, log, &callback)
@callback = callback
@log = log
super(interval, true)
end

def on_timer
@callback.call
rescue => e
@log.error e.to_s
@log.error_backtrace
end
end

class StatWatcher < Coolio::StatWatcher
def initialize(watcher, &callback)
@watcher = watcher
Expand All @@ -629,7 +651,6 @@ def on_change(prev, cur)
end
end


class FIFO
def initialize(from_encoding, encoding)
@from_encoding = from_encoding
Expand Down

0 comments on commit 1d7e09b

Please sign in to comment.