buffer: backup broken file chunk #4025

Merged Feb 16, 2023 · 6 commits · Changes from all commits
16 changes: 14 additions & 2 deletions lib/fluent/plugin/buf_file.rb
@@ -195,8 +195,20 @@ def generate_chunk(metadata)
       end

       def handle_broken_files(path, mode, e)
-        log.error "found broken chunk file during resume. Deleted corresponding files:", :path => path, :mode => mode, :err_msg => e.message
-        # After support 'backup_dir' feature, these files are moved to backup_dir instead of unlink.
+        log.error "found broken chunk file during resume.", :path => path, :mode => mode, :err_msg => e.message
+        unique_id = Fluent::Plugin::Buffer::FileChunk.unique_id_from_path(path)
+        backup(unique_id) { |f|
+          File.open(path, 'rb') { |chunk|
+            chunk.set_encoding(Encoding::ASCII_8BIT)
+            chunk.sync = true
+            chunk.binmode
+            IO.copy_stream(chunk, f)
+          }
+        }
+      rescue => error
+        log.error "backup failed. Delete corresponding files.", :err_msg => error.message
+      ensure
+        log.warn "disable_chunk_backup is true. #{dump_unique_id_hex(unique_id)} chunk is thrown away." if @disable_chunk_backup
         File.unlink(path, path + '.meta') rescue nil
       end

16 changes: 14 additions & 2 deletions lib/fluent/plugin/buf_file_single.rb
@@ -207,8 +207,20 @@ def generate_chunk(metadata)
       end

       def handle_broken_files(path, mode, e)
-        log.error "found broken chunk file during resume. Delete corresponding files:", path: path, mode: mode, err_msg: e.message
-        # After support 'backup_dir' feature, these files are moved to backup_dir instead of unlink.
+        log.error "found broken chunk file during resume.", :path => path, :mode => mode, :err_msg => e.message
+        unique_id, _ = Fluent::Plugin::Buffer::FileSingleChunk.unique_id_and_key_from_path(path)
+        backup(unique_id) { |f|
+          File.open(path, 'rb') { |chunk|
+            chunk.set_encoding(Encoding::ASCII_8BIT)
+            chunk.sync = true
+            chunk.binmode
+            IO.copy_stream(chunk, f)
+          }
+        }
+      rescue => error
+        log.error "backup failed. Delete corresponding files.", :err_msg => error.message
+      ensure
+        log.warn "disable_chunk_backup is true. #{dump_unique_id_hex(unique_id)} chunk is thrown away." if @disable_chunk_backup
        File.unlink(path) rescue nil
       end

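In both handle_broken_files implementations above, rescue and ensure attach to the method body itself (no explicit begin), so the File.unlink in the ensure clause runs whether the backup succeeds or raises. A minimal standalone sketch of that Ruby pattern (not fluentd code; names and messages are illustrative):

    def handle_broken_file(path)
      raise IOError, "disk full" if path.end_with?(".bad")  # stands in for a failing backup
    rescue => e
      puts "backup failed: #{e.message}"  # runs only when the body raises
    ensure
      puts "unlink #{path}"               # always runs, success or failure
    end

    handle_broken_file("chunk.ok")   # prints: unlink chunk.ok
    handle_broken_file("chunk.bad")  # prints: backup failed: disk full, then: unlink chunk.bad

Because the rescue swallows the error, resume continues with the remaining chunks instead of aborting startup.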
21 changes: 21 additions & 0 deletions lib/fluent/plugin/buffer.rb
@@ -66,6 +66,9 @@ class BufferChunkOverflowError < BufferError; end # A record size is larger than
       desc 'Compress buffered data.'
       config_param :compress, :enum, list: [:text, :gzip], default: :text

+      desc 'If true, chunks are thrown away when unrecoverable error happens'
+      config_param :disable_chunk_backup, :bool, default: false
+
       Metadata = Struct.new(:timekey, :tag, :variables, :seq) do
         def initialize(timekey, tag, variables)
           super(timekey, tag, variables, 0)
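With the parameter declared here, disable_chunk_backup is configured inside the <buffer> section rather than on the output itself. A minimal sketch of the user-facing configuration (the match pattern, output plugin, and path are illustrative; the parameter name and its buffer-section placement come from the tests in this PR):

    <match app.**>
      @type file                     # any buffered output plugin
      <buffer>
        @type file
        path /var/log/fluent/buffer
        disable_chunk_backup true    # broken chunks are discarded, not backed up
      </buffer>
    </match>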
@@ -903,6 +906,24 @@ def statistics
         { 'buffer' => stats }
       end

+      def backup(chunk_unique_id)
+        unique_id = dump_unique_id_hex(chunk_unique_id)
+
+        if @disable_chunk_backup
+          log.warn "disable_chunk_backup is true. #{unique_id} chunk is not backed up."
+          return
+        end
+
+        safe_owner_id = owner.plugin_id.gsub(/[ "\/\\:;|*<>?]/, '_')
+        backup_base_dir = system_config.root_dir || DEFAULT_BACKUP_DIR
+        backup_file = File.join(backup_base_dir, 'backup', "worker#{fluentd_worker_id}", safe_owner_id, "#{unique_id}.log")
+        backup_dir = File.dirname(backup_file)
+
+        log.warn "bad chunk is moved to #{backup_file}"
+        FileUtils.mkdir_p(backup_dir, mode: system_config.dir_permission || Fluent::DEFAULT_DIR_PERMISSION) unless Dir.exist?(backup_dir)
+        File.open(backup_file, 'ab', system_config.file_permission || Fluent::DEFAULT_FILE_PERMISSION) { |f| yield f }
+      end
+
       private

       def optimistic_queued?(metadata = nil)
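The new Buffer#backup centralizes the backup-destination logic that previously lived in Output#backup_chunk: a caller passes the chunk's raw unique id plus a block that writes the chunk body into the opened file. A hedged sketch of the call shape from inside a buffer implementation (source_io is a hypothetical placeholder for wherever the chunk bytes come from):

    # f is opened binary/append at:
    #   <root_dir or DEFAULT_BACKUP_DIR>/backup/worker<N>/<plugin_id>/<unique_id_hex>.log
    backup(chunk_unique_id) do |f|
      IO.copy_stream(source_io, f)  # source_io: hypothetical IO holding the chunk body
    end
    # With disable_chunk_backup true, backup logs a warning and returns
    # before yielding, so the block never runs.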
2 changes: 1 addition & 1 deletion lib/fluent/plugin/buffer/file_chunk.rb
@@ -204,7 +204,7 @@ def self.generate_queued_chunk_path(path, unique_id)
           end
         end

-        # used only for queued v0.12 buffer path
+        # used only for queued v0.12 buffer path or broken files
         def self.unique_id_from_path(path)
           if /\.(b|q)([0-9a-f]+)\.[^\/]*\Z/n =~ path # //n switch means explicit 'ASCII-8BIT' pattern
             return $2.scan(/../).map{|x| x.to_i(16) }.pack('C*')
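unique_id_from_path accepts both staged (b) and enqueued (q) chunk paths and converts the embedded hex id back into its raw 16-byte form, which buf_file.rb above then hands to backup. A standalone illustration of the parsing (the path is made up):

    path = "/var/log/fluent/buffer.q58eec11d08ca8143b40e4d303510e0bb.log"  # hypothetical
    if /\.(b|q)([0-9a-f]+)\.[^\/]*\Z/n =~ path
      unique_id = $2.scan(/../).map { |x| x.to_i(16) }.pack('C*')  # 16 raw bytes
      puts unique_id.unpack1('H*')  # => "58eec11d08ca8143b40e4d303510e0bb"
    end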
20 changes: 9 additions & 11 deletions lib/fluent/plugin/output.rb
@@ -99,7 +99,6 @@ class Output < Base
         config_param :retry_max_interval, :time, default: nil, desc: 'The maximum interval seconds for exponential backoff between retries while failing.'

         config_param :retry_randomize, :bool, default: true, desc: 'If true, output plugin will retry after randomized interval not to do burst retries.'
-        config_param :disable_chunk_backup, :bool, default: false, desc: 'If true, chunks are thrown away when unrecoverable error happens'
       end

       config_section :secondary, param_name: :secondary_config, required: false, multi: false, final: true do
@@ -378,6 +377,7 @@ def configure(conf)
         buffer_conf = conf.elements(name: 'buffer').first || Fluent::Config::Element.new('buffer', '', {}, [])
         @buffer = Plugin.new_buffer(buffer_type, parent: self)
         @buffer.configure(buffer_conf)
+        keep_buffer_config_compat
         @buffer.enable_update_timekeys if @chunk_key_time

         @flush_at_shutdown = @buffer_config.flush_at_shutdown
@@ -435,6 +435,12 @@ def configure(conf)
         self
       end

+      def keep_buffer_config_compat
+        # Need this to call `@buffer_config.disable_chunk_backup` just as before,
+        # since some plugins may use this option in this way.
+        @buffer_config[:disable_chunk_backup] = @buffer.disable_chunk_backup
+      end
+
       def start
         super

@@ -1240,18 +1246,10 @@ def try_flush
       end

       def backup_chunk(chunk, using_secondary, delayed_commit)
-        if @buffer_config.disable_chunk_backup
+        if @buffer.disable_chunk_backup
           log.warn "disable_chunk_backup is true. #{dump_unique_id_hex(chunk.unique_id)} chunk is thrown away"
         else
-          unique_id = dump_unique_id_hex(chunk.unique_id)
-          safe_plugin_id = plugin_id.gsub(/[ "\/\\:;|*<>?]/, '_')
-          backup_base_dir = system_config.root_dir || DEFAULT_BACKUP_DIR
-          backup_file = File.join(backup_base_dir, 'backup', "worker#{fluentd_worker_id}", safe_plugin_id, "#{unique_id}.log")
-          backup_dir = File.dirname(backup_file)
-
-          log.warn "bad chunk is moved to #{backup_file}"
-          FileUtils.mkdir_p(backup_dir, mode: system_config.dir_permission || Fluent::DEFAULT_DIR_PERMISSION) unless Dir.exist?(backup_dir)
-          File.open(backup_file, 'ab', system_config.file_permission || Fluent::DEFAULT_FILE_PERMISSION) { |f|
+          @buffer.backup(chunk.unique_id) { |f|
             chunk.write_to(f)
           }
         end
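keep_buffer_config_compat exists because, as its inline comment notes, third-party plugins may still read the flag off the output's buffer config section. A hedged sketch of the two access paths that stay in sync after configure:

    # Inside an output plugin, after #configure has run (sketch):
    @buffer_config.disable_chunk_backup  # legacy access path, kept working
    @buffer.disable_chunk_backup         # canonical location after this change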
1 change: 0 additions & 1 deletion test/command/test_plugin_config_formatter.rb
@@ -188,7 +188,6 @@ class SimpleServiceDiscovery < ::Fluent::Plugin::ServiceDiscovery
       retry_exponential_backoff_base: float: (2)
       retry_max_interval: time: (nil)
       retry_randomize: bool: (true)
-      disable_chunk_backup: bool: (false)
     <secondary>: optional, single
       @type: string: (nil)
     <buffer>: optional, single
85 changes: 62 additions & 23 deletions test/plugin/test_buf_file.rb
@@ -1151,15 +1151,13 @@ def write_metadata(path, chunk_id, metadata, size, ctime, mtime)

   sub_test_case 'there are existing broken file chunks' do
     setup do
+      @id_output = 'backup_test'
       @bufdir = File.expand_path('../../tmp/broken_buffer_file', __FILE__)
-      FileUtils.mkdir_p @bufdir unless File.exist?(@bufdir)
+      FileUtils.rm_rf @bufdir rescue nil
+      FileUtils.mkdir_p @bufdir
       @bufpath = File.join(@bufdir, 'broken_test.*.log')

       Fluent::Test.setup
-      @d = FluentPluginFileBufferTest::DummyOutputPlugin.new
-      @p = Fluent::Plugin::FileBuffer.new
-      @p.owner = @d
-      @p.configure(config_element('buffer', '', {'path' => @bufpath}))
     end

     teardown do
@@ -1171,12 +1169,12 @@ def write_metadata(path, chunk_id, metadata, size, ctime, mtime)
         @p.close unless @p.closed?
         @p.terminate unless @p.terminated?
       end
-      if @bufdir
-        Dir.glob(File.join(@bufdir, '*')).each do |path|
-          next if ['.', '..'].include?(File.basename(path))
-          File.delete(path)
-        end
-      end
     end

+    def setup_plugins(buf_conf)
+      @d = FluentPluginFileBufferTest::DummyOutputPlugin.new
+      @d.configure(config_element('ROOT', '', {'@id' => @id_output}, [config_element('buffer', '', buf_conf)]))
+      @p = @d.buffer
+    end
+
     def create_first_chunk(mode)
@@ -1232,44 +1230,85 @@ def compare_log(plugin, msg)
       assert { logs.any? { |log| log.include?(msg) } }
     end

-    test '#resume ignores staged empty chunk' do
-      _, p1 = create_first_chunk('b')
+    test '#resume backups staged empty chunk' do
+      setup_plugins({'path' => @bufpath})
+      c1id, p1 = create_first_chunk('b')
       File.open(p1, 'wb') { |f| } # create staged empty chunk file
       c2id, _ = create_second_chunk('b')

-      @p.start
+      Fluent::SystemConfig.overwrite_system_config('root_dir' => @bufdir) do
+        @p.start
+      end
+
       compare_staged_chunk(@p.stage, c2id, '2016-04-17 14:01:00 -0700', 3, :staged)
       compare_log(@p, 'staged file chunk is empty')
+      assert { not File.exist?(p1) }
+      assert { File.exist?("#{@bufdir}/backup/worker0/#{@id_output}/#{@d.dump_unique_id_hex(c1id)}.log") }
     end

-    test '#resume ignores staged broken metadata' do
+    test '#resume backups staged broken metadata' do
+      setup_plugins({'path' => @bufpath})
       c1id, _ = create_first_chunk('b')
-      _, p2 = create_second_chunk('b')
+      c2id, p2 = create_second_chunk('b')
       File.open(p2 + '.meta', 'wb') { |f| f.write("\0" * 70) } # create staged broken meta file

-      @p.start
+      Fluent::SystemConfig.overwrite_system_config('root_dir' => @bufdir) do
+        @p.start
+      end
+
       compare_staged_chunk(@p.stage, c1id, '2016-04-17 14:00:00 -0700', 4, :staged)
       compare_log(@p, 'staged meta file is broken')
+      assert { not File.exist?(p2) }
+      assert { File.exist?("#{@bufdir}/backup/worker0/#{@id_output}/#{@d.dump_unique_id_hex(c2id)}.log") }
     end

-    test '#resume ignores enqueued empty chunk' do
-      _, p1 = create_first_chunk('q')
+    test '#resume backups enqueued empty chunk' do
+      setup_plugins({'path' => @bufpath})
+      c1id, p1 = create_first_chunk('q')
       File.open(p1, 'wb') { |f| } # create enqueued empty chunk file
       c2id, _ = create_second_chunk('q')

-      @p.start
+      Fluent::SystemConfig.overwrite_system_config('root_dir' => @bufdir) do
+        @p.start
+      end
+
       compare_queued_chunk(@p.queue, c2id, 3, :queued)
       compare_log(@p, 'enqueued file chunk is empty')
+      assert { not File.exist?(p1) }
+      assert { File.exist?("#{@bufdir}/backup/worker0/#{@id_output}/#{@d.dump_unique_id_hex(c1id)}.log") }
     end

-    test '#resume ignores enqueued broken metadata' do
+    test '#resume backups enqueued broken metadata' do
+      setup_plugins({'path' => @bufpath})
       c1id, _ = create_first_chunk('q')
-      _, p2 = create_second_chunk('q')
+      c2id, p2 = create_second_chunk('q')
       File.open(p2 + '.meta', 'wb') { |f| f.write("\0" * 70) } # create enqueued broken meta file

-      @p.start
+      Fluent::SystemConfig.overwrite_system_config('root_dir' => @bufdir) do
+        @p.start
+      end
+
       compare_queued_chunk(@p.queue, c1id, 4, :queued)
       compare_log(@p, 'enqueued meta file is broken')
+      assert { not File.exist?(p2) }
+      assert { File.exist?("#{@bufdir}/backup/worker0/#{@id_output}/#{@d.dump_unique_id_hex(c2id)}.log") }
     end
+
+    test '#resume throws away broken chunk with disable_chunk_backup' do
+      setup_plugins({'path' => @bufpath, 'disable_chunk_backup' => true})
+      c1id, _ = create_first_chunk('b')
+      c2id, p2 = create_second_chunk('b')
+      File.open(p2 + '.meta', 'wb') { |f| f.write("\0" * 70) } # create staged broken meta file
+
+      Fluent::SystemConfig.overwrite_system_config('root_dir' => @bufdir) do
+        @p.start
+      end
+
+      compare_staged_chunk(@p.stage, c1id, '2016-04-17 14:00:00 -0700', 4, :staged)
+      compare_log(@p, 'staged meta file is broken')
+      compare_log(@p, 'disable_chunk_backup is true')
+      assert { not File.exist?(p2) }
+      assert { not File.exist?("#{@bufdir}/backup/worker0/#{@id_output}/#{@d.dump_unique_id_hex(c2id)}.log") }
+    end
   end
 end
65 changes: 65 additions & 0 deletions test/plugin/test_buf_file_single.rb
@@ -830,4 +830,69 @@ def create_driver(conf = TAG_CONF, klass = FluentPluginFileSingleBufferTest::Dum
       assert_equal :queued, queue[0].state
     end
   end
+
+  sub_test_case 'there are existing broken file chunks' do
+    setup do
+      FileUtils.rm_rf(@bufdir) rescue nil
+      FileUtils.mkdir_p(@bufdir)
+    end
+
+    teardown do
+      return unless @p
+
+      @p.stop unless @p.stopped?
+      @p.before_shutdown unless @p.before_shutdown?
+      @p.shutdown unless @p.shutdown?
+      @p.after_shutdown unless @p.after_shutdown?
+      @p.close unless @p.closed?
+      @p.terminate unless @p.terminated?
+    end
+
+    test '#resume backups empty chunk' do
+      id_output = 'backup_test'
+      @d = create_driver(%[
+        @id #{id_output}
+        <buffer tag>
+          @type file_single
+          path #{PATH}
+        </buffer>
+      ])
+      @p = @d.instance.buffer
+
+      c1id = Fluent::UniqueId.generate
+      p1 = File.join(@bufdir, "fsb.foo.b#{Fluent::UniqueId.hex(c1id)}.buf")
+      File.open(p1, 'wb') { |f| } # create empty chunk file
+
+      Fluent::SystemConfig.overwrite_system_config('root_dir' => @bufdir) do
+        @p.start
+      end
+
+      assert { not File.exist?(p1) }
+      assert { File.exist?("#{@bufdir}/backup/worker0/#{id_output}/#{@d.instance.dump_unique_id_hex(c1id)}.log") }
+    end
+
+    test '#resume throws away broken chunk with disable_chunk_backup' do
+      id_output = 'backup_test'
+      @d = create_driver(%[
+        @id #{id_output}
+        <buffer tag>
+          @type file_single
+          path #{PATH}
+          disable_chunk_backup true
+        </buffer>
+      ])
+      @p = @d.instance.buffer
+
+      c1id = Fluent::UniqueId.generate
+      p1 = File.join(@bufdir, "fsb.foo.b#{Fluent::UniqueId.hex(c1id)}.buf")
+      File.open(p1, 'wb') { |f| } # create empty chunk file
+
+      Fluent::SystemConfig.overwrite_system_config('root_dir' => @bufdir) do
+        @p.start
+      end
+
+      assert { not File.exist?(p1) }
+      assert { not File.exist?("#{@bufdir}/backup/worker0/#{id_output}/#{@d.instance.dump_unique_id_hex(c1id)}.log") }
+    end
+  end
 end