diff --git a/lib/fluent/plugin/buf_file.rb b/lib/fluent/plugin/buf_file.rb index 4725f5c9fe..4172909509 100644 --- a/lib/fluent/plugin/buf_file.rb +++ b/lib/fluent/plugin/buf_file.rb @@ -195,8 +195,20 @@ def generate_chunk(metadata) end def handle_broken_files(path, mode, e) - log.error "found broken chunk file during resume. Deleted corresponding files:", :path => path, :mode => mode, :err_msg => e.message - # After support 'backup_dir' feature, these files are moved to backup_dir instead of unlink. + log.error "found broken chunk file during resume.", :path => path, :mode => mode, :err_msg => e.message + unique_id = Fluent::Plugin::Buffer::FileChunk.unique_id_from_path(path) + backup(unique_id) { |f| + File.open(path, 'rb') { |chunk| + chunk.set_encoding(Encoding::ASCII_8BIT) + chunk.sync = true + chunk.binmode + IO.copy_stream(chunk, f) + } + } + rescue => error + log.error "backup failed. Delete corresponding files.", :err_msg => error.message + ensure + log.warn "disable_chunk_backup is true. #{dump_unique_id_hex(unique_id)} chunk is thrown away." if @disable_chunk_backup File.unlink(path, path + '.meta') rescue nil end diff --git a/lib/fluent/plugin/buf_file_single.rb b/lib/fluent/plugin/buf_file_single.rb index 225447063e..f11c5df7e7 100644 --- a/lib/fluent/plugin/buf_file_single.rb +++ b/lib/fluent/plugin/buf_file_single.rb @@ -207,8 +207,20 @@ def generate_chunk(metadata) end def handle_broken_files(path, mode, e) - log.error "found broken chunk file during resume. Delete corresponding files:", path: path, mode: mode, err_msg: e.message - # After support 'backup_dir' feature, these files are moved to backup_dir instead of unlink. + log.error "found broken chunk file during resume.", :path => path, :mode => mode, :err_msg => e.message + unique_id, _ = Fluent::Plugin::Buffer::FileSingleChunk.unique_id_and_key_from_path(path) + backup(unique_id) { |f| + File.open(path, 'rb') { |chunk| + chunk.set_encoding(Encoding::ASCII_8BIT) + chunk.sync = true + chunk.binmode + IO.copy_stream(chunk, f) + } + } + rescue => error + log.error "backup failed. Delete corresponding files.", :err_msg => error.message + ensure + log.warn "disable_chunk_backup is true. #{dump_unique_id_hex(unique_id)} chunk is thrown away." if @disable_chunk_backup File.unlink(path) rescue nil end diff --git a/lib/fluent/plugin/buffer.rb b/lib/fluent/plugin/buffer.rb index 8315694e64..d04ae08296 100644 --- a/lib/fluent/plugin/buffer.rb +++ b/lib/fluent/plugin/buffer.rb @@ -66,6 +66,9 @@ class BufferChunkOverflowError < BufferError; end # A record size is larger than desc 'Compress buffered data.' config_param :compress, :enum, list: [:text, :gzip], default: :text + desc 'If true, chunks are thrown away when unrecoverable error happens' + config_param :disable_chunk_backup, :bool, default: false + Metadata = Struct.new(:timekey, :tag, :variables, :seq) do def initialize(timekey, tag, variables) super(timekey, tag, variables, 0) @@ -903,6 +906,24 @@ def statistics { 'buffer' => stats } end + def backup(chunk_unique_id) + unique_id = dump_unique_id_hex(chunk_unique_id) + + if @disable_chunk_backup + log.warn "disable_chunk_backup is true. #{unique_id} chunk is not backed up." + return + end + + safe_owner_id = owner.plugin_id.gsub(/[ "\/\\:;|*<>?]/, '_') + backup_base_dir = system_config.root_dir || DEFAULT_BACKUP_DIR + backup_file = File.join(backup_base_dir, 'backup', "worker#{fluentd_worker_id}", safe_owner_id, "#{unique_id}.log") + backup_dir = File.dirname(backup_file) + + log.warn "bad chunk is moved to #{backup_file}" + FileUtils.mkdir_p(backup_dir, mode: system_config.dir_permission || Fluent::DEFAULT_DIR_PERMISSION) unless Dir.exist?(backup_dir) + File.open(backup_file, 'ab', system_config.file_permission || Fluent::DEFAULT_FILE_PERMISSION) { |f| yield f } + end + private def optimistic_queued?(metadata = nil) diff --git a/lib/fluent/plugin/buffer/file_chunk.rb b/lib/fluent/plugin/buffer/file_chunk.rb index e08987cf55..06bbf017c2 100644 --- a/lib/fluent/plugin/buffer/file_chunk.rb +++ b/lib/fluent/plugin/buffer/file_chunk.rb @@ -204,7 +204,7 @@ def self.generate_queued_chunk_path(path, unique_id) end end - # used only for queued v0.12 buffer path + # used only for queued v0.12 buffer path or broken files def self.unique_id_from_path(path) if /\.(b|q)([0-9a-f]+)\.[^\/]*\Z/n =~ path # //n switch means explicit 'ASCII-8BIT' pattern return $2.scan(/../).map{|x| x.to_i(16) }.pack('C*') diff --git a/lib/fluent/plugin/output.rb b/lib/fluent/plugin/output.rb index 9087994bc9..5dd5255652 100644 --- a/lib/fluent/plugin/output.rb +++ b/lib/fluent/plugin/output.rb @@ -99,7 +99,6 @@ class Output < Base config_param :retry_max_interval, :time, default: nil, desc: 'The maximum interval seconds for exponential backoff between retries while failing.' config_param :retry_randomize, :bool, default: true, desc: 'If true, output plugin will retry after randomized interval not to do burst retries.' - config_param :disable_chunk_backup, :bool, default: false, desc: 'If true, chunks are thrown away when unrecoverable error happens' end config_section :secondary, param_name: :secondary_config, required: false, multi: false, final: true do @@ -378,6 +377,7 @@ def configure(conf) buffer_conf = conf.elements(name: 'buffer').first || Fluent::Config::Element.new('buffer', '', {}, []) @buffer = Plugin.new_buffer(buffer_type, parent: self) @buffer.configure(buffer_conf) + keep_buffer_config_compat @buffer.enable_update_timekeys if @chunk_key_time @flush_at_shutdown = @buffer_config.flush_at_shutdown @@ -435,6 +435,12 @@ def configure(conf) self end + def keep_buffer_config_compat + # Need this to call `@buffer_config.disable_chunk_backup` just as before, + # since some plugins may use this option in this way. + @buffer_config[:disable_chunk_backup] = @buffer.disable_chunk_backup + end + def start super @@ -1240,18 +1246,10 @@ def try_flush end def backup_chunk(chunk, using_secondary, delayed_commit) - if @buffer_config.disable_chunk_backup + if @buffer.disable_chunk_backup log.warn "disable_chunk_backup is true. #{dump_unique_id_hex(chunk.unique_id)} chunk is thrown away" else - unique_id = dump_unique_id_hex(chunk.unique_id) - safe_plugin_id = plugin_id.gsub(/[ "\/\\:;|*<>?]/, '_') - backup_base_dir = system_config.root_dir || DEFAULT_BACKUP_DIR - backup_file = File.join(backup_base_dir, 'backup', "worker#{fluentd_worker_id}", safe_plugin_id, "#{unique_id}.log") - backup_dir = File.dirname(backup_file) - - log.warn "bad chunk is moved to #{backup_file}" - FileUtils.mkdir_p(backup_dir, mode: system_config.dir_permission || Fluent::DEFAULT_DIR_PERMISSION) unless Dir.exist?(backup_dir) - File.open(backup_file, 'ab', system_config.file_permission || Fluent::DEFAULT_FILE_PERMISSION) { |f| + @buffer.backup(chunk.unique_id) { |f| chunk.write_to(f) } end diff --git a/test/command/test_plugin_config_formatter.rb b/test/command/test_plugin_config_formatter.rb index 28e311401d..a72ea29340 100644 --- a/test/command/test_plugin_config_formatter.rb +++ b/test/command/test_plugin_config_formatter.rb @@ -188,7 +188,6 @@ class SimpleServiceDiscovery < ::Fluent::Plugin::ServiceDiscovery retry_exponential_backoff_base: float: (2) retry_max_interval: time: (nil) retry_randomize: bool: (true) - disable_chunk_backup: bool: (false) : optional, single @type: string: (nil) : optional, single diff --git a/test/plugin/test_buf_file.rb b/test/plugin/test_buf_file.rb index 6a89bd9867..8a8c6744a4 100644 --- a/test/plugin/test_buf_file.rb +++ b/test/plugin/test_buf_file.rb @@ -1151,15 +1151,13 @@ def write_metadata(path, chunk_id, metadata, size, ctime, mtime) sub_test_case 'there are existing broken file chunks' do setup do + @id_output = 'backup_test' @bufdir = File.expand_path('../../tmp/broken_buffer_file', __FILE__) - FileUtils.mkdir_p @bufdir unless File.exist?(@bufdir) + FileUtils.rm_rf @bufdir rescue nil + FileUtils.mkdir_p @bufdir @bufpath = File.join(@bufdir, 'broken_test.*.log') Fluent::Test.setup - @d = FluentPluginFileBufferTest::DummyOutputPlugin.new - @p = Fluent::Plugin::FileBuffer.new - @p.owner = @d - @p.configure(config_element('buffer', '', {'path' => @bufpath})) end teardown do @@ -1171,12 +1169,12 @@ def write_metadata(path, chunk_id, metadata, size, ctime, mtime) @p.close unless @p.closed? @p.terminate unless @p.terminated? end - if @bufdir - Dir.glob(File.join(@bufdir, '*')).each do |path| - next if ['.', '..'].include?(File.basename(path)) - File.delete(path) - end - end + end + + def setup_plugins(buf_conf) + @d = FluentPluginFileBufferTest::DummyOutputPlugin.new + @d.configure(config_element('ROOT', '', {'@id' => @id_output}, [config_element('buffer', '', buf_conf)])) + @p = @d.buffer end def create_first_chunk(mode) @@ -1232,44 +1230,85 @@ def compare_log(plugin, msg) assert { logs.any? { |log| log.include?(msg) } } end - test '#resume ignores staged empty chunk' do - _, p1 = create_first_chunk('b') + test '#resume backups staged empty chunk' do + setup_plugins({'path' => @bufpath}) + c1id, p1 = create_first_chunk('b') File.open(p1, 'wb') { |f| } # create staged empty chunk file c2id, _ = create_second_chunk('b') - @p.start + Fluent::SystemConfig.overwrite_system_config('root_dir' => @bufdir) do + @p.start + end + compare_staged_chunk(@p.stage, c2id, '2016-04-17 14:01:00 -0700', 3, :staged) compare_log(@p, 'staged file chunk is empty') + assert { not File.exist?(p1) } + assert { File.exist?("#{@bufdir}/backup/worker0/#{@id_output}/#{@d.dump_unique_id_hex(c1id)}.log") } end - test '#resume ignores staged broken metadata' do + test '#resume backups staged broken metadata' do + setup_plugins({'path' => @bufpath}) c1id, _ = create_first_chunk('b') - _, p2 = create_second_chunk('b') + c2id, p2 = create_second_chunk('b') File.open(p2 + '.meta', 'wb') { |f| f.write("\0" * 70) } # create staged broken meta file - @p.start + Fluent::SystemConfig.overwrite_system_config('root_dir' => @bufdir) do + @p.start + end + compare_staged_chunk(@p.stage, c1id, '2016-04-17 14:00:00 -0700', 4, :staged) compare_log(@p, 'staged meta file is broken') + assert { not File.exist?(p2) } + assert { File.exist?("#{@bufdir}/backup/worker0/#{@id_output}/#{@d.dump_unique_id_hex(c2id)}.log") } end - test '#resume ignores enqueued empty chunk' do - _, p1 = create_first_chunk('q') + test '#resume backups enqueued empty chunk' do + setup_plugins({'path' => @bufpath}) + c1id, p1 = create_first_chunk('q') File.open(p1, 'wb') { |f| } # create enqueued empty chunk file c2id, _ = create_second_chunk('q') - @p.start + Fluent::SystemConfig.overwrite_system_config('root_dir' => @bufdir) do + @p.start + end + compare_queued_chunk(@p.queue, c2id, 3, :queued) compare_log(@p, 'enqueued file chunk is empty') + assert { not File.exist?(p1) } + assert { File.exist?("#{@bufdir}/backup/worker0/#{@id_output}/#{@d.dump_unique_id_hex(c1id)}.log") } end - test '#resume ignores enqueued broken metadata' do + test '#resume backups enqueued broken metadata' do + setup_plugins({'path' => @bufpath}) c1id, _ = create_first_chunk('q') - _, p2 = create_second_chunk('q') + c2id, p2 = create_second_chunk('q') File.open(p2 + '.meta', 'wb') { |f| f.write("\0" * 70) } # create enqueued broken meta file - @p.start + Fluent::SystemConfig.overwrite_system_config('root_dir' => @bufdir) do + @p.start + end + compare_queued_chunk(@p.queue, c1id, 4, :queued) compare_log(@p, 'enqueued meta file is broken') + assert { not File.exist?(p2) } + assert { File.exist?("#{@bufdir}/backup/worker0/#{@id_output}/#{@d.dump_unique_id_hex(c2id)}.log") } + end + + test '#resume throws away broken chunk with disable_chunk_backup' do + setup_plugins({'path' => @bufpath, 'disable_chunk_backup' => true}) + c1id, _ = create_first_chunk('b') + c2id, p2 = create_second_chunk('b') + File.open(p2 + '.meta', 'wb') { |f| f.write("\0" * 70) } # create staged broken meta file + + Fluent::SystemConfig.overwrite_system_config('root_dir' => @bufdir) do + @p.start + end + + compare_staged_chunk(@p.stage, c1id, '2016-04-17 14:00:00 -0700', 4, :staged) + compare_log(@p, 'staged meta file is broken') + compare_log(@p, 'disable_chunk_backup is true') + assert { not File.exist?(p2) } + assert { not File.exist?("#{@bufdir}/backup/worker0/#{@id_output}/#{@d.dump_unique_id_hex(c2id)}.log") } end end end diff --git a/test/plugin/test_buf_file_single.rb b/test/plugin/test_buf_file_single.rb index 9a14a2f4c7..85e8bc57d5 100644 --- a/test/plugin/test_buf_file_single.rb +++ b/test/plugin/test_buf_file_single.rb @@ -830,4 +830,69 @@ def create_driver(conf = TAG_CONF, klass = FluentPluginFileSingleBufferTest::Dum assert_equal :queued, queue[0].state end end + + sub_test_case 'there are existing broken file chunks' do + setup do + FileUtils.rm_rf(@bufdir) rescue nil + FileUtils.mkdir_p(@bufdir) + end + + teardown do + return unless @p + + @p.stop unless @p.stopped? + @p.before_shutdown unless @p.before_shutdown? + @p.shutdown unless @p.shutdown? + @p.after_shutdown unless @p.after_shutdown? + @p.close unless @p.closed? + @p.terminate unless @p.terminated? + end + + test '#resume backups empty chunk' do + id_output = 'backup_test' + @d = create_driver(%[ + @id #{id_output} + + @type file_single + path #{PATH} + + ]) + @p = @d.instance.buffer + + c1id = Fluent::UniqueId.generate + p1 = File.join(@bufdir, "fsb.foo.b#{Fluent::UniqueId.hex(c1id)}.buf") + File.open(p1, 'wb') { |f| } # create empty chunk file + + Fluent::SystemConfig.overwrite_system_config('root_dir' => @bufdir) do + @p.start + end + + assert { not File.exist?(p1) } + assert { File.exist?("#{@bufdir}/backup/worker0/#{id_output}/#{@d.instance.dump_unique_id_hex(c1id)}.log") } + end + + test '#resume throws away broken chunk with disable_chunk_backup' do + id_output = 'backup_test' + @d = create_driver(%[ + @id #{id_output} + + @type file_single + path #{PATH} + disable_chunk_backup true + + ]) + @p = @d.instance.buffer + + c1id = Fluent::UniqueId.generate + p1 = File.join(@bufdir, "fsb.foo.b#{Fluent::UniqueId.hex(c1id)}.buf") + File.open(p1, 'wb') { |f| } # create empty chunk file + + Fluent::SystemConfig.overwrite_system_config('root_dir' => @bufdir) do + @p.start + end + + assert { not File.exist?(p1) } + assert { not File.exist?("#{@bufdir}/backup/worker0/#{id_output}/#{@d.instance.dump_unique_id_hex(c1id)}.log") } + end + end end