Skip to content

Commit

Permalink
in_tail: Skip setup failed watcher to avoid resource leak and log blo…
Browse files Browse the repository at this point in the history
…at. Backport fluent#1742
  • Loading branch information
Yuki Ito committed Nov 16, 2017
1 parent 33f7a9d commit 5d60b50
Showing 1 changed file with 34 additions and 5 deletions.
39 changes: 34 additions & 5 deletions lib/fluent/plugin/in_tail.rb
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,16 @@ module Fluent
class NewTailInput < Input
Plugin.register_input('tail', self)

class WatcherSetupError < StandardError
def initialize(msg)
@message = msg
end

def to_s
@message
end
end

def initialize
super
@paths = []
Expand Down Expand Up @@ -65,6 +75,8 @@ def initialize
config_param :skip_refresh_on_startup, :bool, default: false
desc 'Ignore repeated permission error logs'
config_param :ignore_repeated_permission_error, :bool, default: false
# This option is for Cool.io's loop wait timeout to avoid loop stuck at shutdown. Almost users don't need to change this value.
config_param :blocking_timeout, :time, default: 0.5

attr_reader :paths

Expand Down Expand Up @@ -207,6 +219,9 @@ def setup_watcher(path, pe)
tw = TailWatcher.new(path, @rotate_wait, pe, log, @read_from_head, @enable_watch_timer, @read_lines_limit, method(:update_watcher), line_buffer_timer_flusher, &method(:receive_lines))
tw.attach(@loop)
tw
rescue => e
tw.close if tw
raise e
end

def start_watchers(paths)
Expand All @@ -223,7 +238,13 @@ def start_watchers(paths)
end
end

@tails[path] = setup_watcher(path, pe)
begin
tw = setup_watcher(path, pe)
rescue WatcherSetupError => e
log.warn "Skip #{path} because unexpected setup error happens: #{e}"
next
end
@tails[path] = tw
}
end

Expand Down Expand Up @@ -292,7 +313,7 @@ def flush_buffer(tw)
end

def run
@loop.run
@loop.run(@blocking_timeout)
rescue
log.error "unexpected error", error: $!.to_s
log.error_backtrace
Expand Down Expand Up @@ -436,8 +457,8 @@ def attach(loop)
end

def detach
@timer_trigger.detach if @enable_watch_timer && @timer_trigger.attached?
@stat_trigger.detach if @stat_trigger.attached?
@timer_trigger.detach if @enable_watch_timer && @timer_trigger && @timer_trigger.attached?
@stat_trigger.detach if @stat_trigger && @stat_trigger.attached?
end

def close(close_io = true)
Expand Down Expand Up @@ -485,7 +506,12 @@ def on_rotate(io)
pos = @read_from_head ? 0 : fsize
@pe.update(inode, pos)
end
io.seek(pos)

begin
io.seek(pos)
rescue RangeError
raise WatcherSetupError, "seek error with #{@path}: file_position = #{pos.to_s(16)}"
end

@io_handler = IOHandler.new(io, @pe, @log, @read_lines_limit, &method(:wrap_receive_lines))
else
Expand Down Expand Up @@ -690,6 +716,9 @@ def on_notify
@fsize = fsize
end

rescue WatcherSetupError => e
io.close if io
raise e
rescue
@log.error $!.to_s
@log.error_backtrace
Expand Down

0 comments on commit 5d60b50

Please sign in to comment.