diff --git a/lib/fluent/plugin/buf_file.rb b/lib/fluent/plugin/buf_file.rb
index d9f2bd2866..945b9682f0 100644
--- a/lib/fluent/plugin/buf_file.rb
+++ b/lib/fluent/plugin/buf_file.rb
@@ -122,14 +122,17 @@ def resume
           m = new_metadata() # this metadata will be overwritten by resuming .meta file content
                              # so it should not be added into @metadata_list for now
           mode = Fluent::Plugin::Buffer::FileChunk.assume_chunk_state(path)
+          if mode == :unknown
+            log.debug "unknown state chunk found", path: path
+            next
+          end
+
           chunk = Fluent::Plugin::Buffer::FileChunk.new(m, path, mode) # file chunk resumes contents of metadata
           case chunk.state
           when :staged
             stage[chunk.metadata] = chunk
           when :queued
             queue << chunk
-          else
-            raise "BUG: unexpected chunk state '#{chunk.state}' for path '#{path}'"
           end
         end
diff --git a/lib/fluent/plugin/buffer.rb b/lib/fluent/plugin/buffer.rb
index 2670bbbaaf..1eff6de52f 100644
--- a/lib/fluent/plugin/buffer.rb
+++ b/lib/fluent/plugin/buffer.rb
@@ -104,11 +104,13 @@ def start
           @queued_num[chunk.metadata] += 1
           @queue_size += chunk.bytesize
         end
+        log.debug "buffer started", instance: self.object_id, stage_size: @stage_size, queue_size: @queue_size
       end
 
       def close
         super
         synchronize do
+          log.debug "closing buffer", instance: self.object_id
           @dequeued.each_pair do |chunk_id, chunk|
             chunk.close
           end
@@ -156,6 +158,7 @@ def new_metadata(timekey: nil, tag: nil, variables: nil)
       end
 
       def add_metadata(metadata)
+        log.trace "adding metadata", instance: self.object_id, metadata: metadata
        synchronize do
          if i = @metadata_list.index(metadata)
            @metadata_list[i]
@@ -178,6 +181,8 @@ def write(metadata_and_data, format: nil, size: nil, enqueue: false)
         return if metadata_and_data.size < 1
         raise BufferOverflowError, "buffer space has too many data" unless storable?
 
+        log.trace "writing events into buffer", instance: self.object_id, metadata_size: metadata_and_data.size
+
         staged_bytesize = 0
         operated_chunks = []
         unstaged_chunks = {} # metadata => [chunk, chunk, ...]
@@ -293,6 +298,7 @@ def queued?(metadata=nil)
       end
 
       def enqueue_chunk(metadata)
+        log.debug "enqueueing chunk", instance: self.object_id, metadata: metadata
         synchronize do
           chunk = @stage.delete(metadata)
           return nil unless chunk
@@ -314,6 +320,7 @@ def enqueue_unstaged_chunk(chunk)
+        log.debug "enqueueing unstaged chunk", instance: self.object_id, metadata: chunk.metadata
         synchronize do
           chunk.synchronize do
             metadata = chunk.metadata
@@ -326,6 +333,7 @@ def enqueue_all
+        log.debug "enqueueing all chunks in buffer", instance: self.object_id
         synchronize do
           if block_given?
             @stage.keys.each do |metadata|
@@ -343,6 +351,7 @@ def dequeue_chunk
         return nil if @queue.empty?
+ log.debug "dequeueing a chunk", instance: self.object_id synchronize do chunk = @queue.shift @@ -351,15 +360,18 @@ def dequeue_chunk @dequeued[chunk.unique_id] = chunk @queued_num[chunk.metadata] -= 1 # BUG if nil, 0 or subzero + log.debug "chunk dequeued", instance: self.object_id, metadata: chunk.metadata chunk end end def takeback_chunk(chunk_id) + log.debug "taking back a chunk", instance: self.object_id, chunk_id: dump_unique_id_hex(chunk_id) synchronize do chunk = @dequeued.delete(chunk_id) return false unless chunk # already purged by other thread @queue.unshift(chunk) + log.debug "chunk taken back", instance: self.object_id, chunk_id: dump_unique_id_hex(chunk_id), metadata: chunk.metadata @queued_num[chunk.metadata] += 1 # BUG if nil end true @@ -371,22 +383,26 @@ def purge_chunk(chunk_id) return nil unless chunk # purged by other threads metadata = chunk.metadata + log.debug "purging a chunk", instance: self.object_id, chunk_id: dump_unique_id_hex(chunk_id), metadata: metadata begin bytesize = chunk.bytesize chunk.purge @queue_size -= bytesize rescue => e log.error "failed to purge buffer chunk", chunk_id: dump_unique_id_hex(chunk_id), error_class: e.class, error: e + log.error_backtrace end if metadata && !@stage[metadata] && (!@queued_num[metadata] || @queued_num[metadata] < 1) @metadata_list.delete(metadata) end + log.debug "chunk purged", instance: self.object_id, chunk_id: dump_unique_id_hex(chunk_id), metadata: metadata end nil end def clear_queue! + log.debug "clearing queue", instance: self.object_id synchronize do until @queue.empty? begin @@ -395,6 +411,7 @@ def clear_queue! q.purge rescue => e log.error "unexpected error while clearing buffer queue", error_class: e.class, error: e + log.error_backtrace end end @queue_size = 0 diff --git a/lib/fluent/plugin/buffer/file_chunk.rb b/lib/fluent/plugin/buffer/file_chunk.rb index 09d9419700..6fda86da16 100644 --- a/lib/fluent/plugin/buffer/file_chunk.rb +++ b/lib/fluent/plugin/buffer/file_chunk.rb @@ -149,7 +149,9 @@ def self.assume_chunk_state(path) if /\.(b|q)([0-9a-f]+)\.[^\/]*\Z/n =~ path # //n switch means explicit 'ASCII-8BIT' pattern $1 == 'b' ? :staged : :queued else - :queued + # files which matches to glob of buffer file pattern + # it includes files which are created by out_file + :unknown end end diff --git a/lib/fluent/plugin/output.rb b/lib/fluent/plugin/output.rb index 5a19204275..32bc6ef3d0 100644 --- a/lib/fluent/plugin/output.rb +++ b/lib/fluent/plugin/output.rb @@ -695,6 +695,7 @@ def handle_stream_simple(tag, es, enqueue: false) end def commit_write(chunk_id, delayed: @delayed_commit, secondary: false) + log.trace "committing write operation to a chunk", chunk: dump_unique_id_hex(chunk_id), delayed: delayed if delayed @dequeued_chunks_mutex.synchronize do @dequeued_chunks.delete_if{ |info| info.chunk_id == chunk_id } @@ -770,6 +771,8 @@ def try_flush chunk = @buffer.dequeue_chunk return unless chunk + log.debug "trying flush for a chunk", chunk: dump_unique_id_hex(chunk.unique_id) + output = self using_secondary = false if @retry_mutex.synchronize{ @retry && @retry.secondary? 
@@ -783,6 +786,7 @@ def try_flush
        begin
          if output.delayed_commit
+           log.trace "executing delayed write and commit", chunk: dump_unique_id_hex(chunk.unique_id)
            @counters_monitor.synchronize{ @write_count += 1 }
            output.try_write(chunk)
            @dequeued_chunks_mutex.synchronize do
@@ -791,9 +795,14 @@ def try_flush
            end
          else # output plugin without delayed purge
            chunk_id = chunk.unique_id
+           dump_chunk_id = dump_unique_id_hex(chunk_id)
+           log.trace "adding write count", instance: self.object_id
            @counters_monitor.synchronize{ @write_count += 1 }
+           log.trace "executing sync write", chunk: dump_chunk_id
            output.write(chunk)
+           log.trace "write operation done, committing", chunk: dump_chunk_id
            commit_write(chunk_id, secondary: using_secondary)
+           log.trace "done committing a chunk", chunk: dump_chunk_id
          end
        rescue => e
          log.debug "taking back chunk for errors.", plugin_id: plugin_id, chunk: dump_unique_id_hex(chunk.unique_id)
diff --git a/test/plugin/test_buf_file.rb b/test/plugin/test_buf_file.rb
index 21cd7ef68f..aeb3257941 100644
--- a/test/plugin/test_buf_file.rb
+++ b/test/plugin/test_buf_file.rb
@@ -500,4 +500,72 @@ def write_metadata(path, chunk_id, metadata, size, ctime, mtime)
       assert_equal Time.parse('2016-04-17 14:01:22 -0700'), queue[3].modified_at
     end
   end
+
+  sub_test_case 'there are some non-buffer chunk files, with a path without buffer chunk ids' do
+    setup do
+      @bufdir = File.expand_path('../../tmp/buffer_file', __FILE__)
+      FileUtils.mkdir_p @bufdir
+
+      @c1id = Fluent::UniqueId.generate
+      p1 = File.join(@bufdir, "etest.201604171358.q#{Fluent::UniqueId.hex(@c1id)}.log")
+      File.open(p1, 'wb') do |f|
+        f.write ["t1.test", event_time('2016-04-17 13:58:15 -0700').to_i, {"message" => "yay"}].to_json + "\n"
+        f.write ["t2.test", event_time('2016-04-17 13:58:17 -0700').to_i, {"message" => "yay"}].to_json + "\n"
+        f.write ["t3.test", event_time('2016-04-17 13:58:21 -0700').to_i, {"message" => "yay"}].to_json + "\n"
+        f.write ["t4.test", event_time('2016-04-17 13:58:22 -0700').to_i, {"message" => "yay"}].to_json + "\n"
+      end
+      FileUtils.touch(p1, mtime: Time.parse('2016-04-17 13:58:28 -0700'))
+
+      @not_chunk = File.join(@bufdir, 'etest.20160416.log')
+      File.open(@not_chunk, 'wb') do |f|
+        f.write ["t1.test", event_time('2016-04-16 23:58:15 -0700').to_i, {"message" => "yay"}].to_json + "\n"
+        f.write ["t2.test", event_time('2016-04-16 23:58:17 -0700').to_i, {"message" => "yay"}].to_json + "\n"
+        f.write ["t3.test", event_time('2016-04-16 23:58:21 -0700').to_i, {"message" => "yay"}].to_json + "\n"
+        f.write ["t4.test", event_time('2016-04-16 23:58:22 -0700').to_i, {"message" => "yay"}].to_json + "\n"
+      end
+      FileUtils.touch(@not_chunk, mtime: Time.parse('2016-04-17 00:00:00 -0700'))
+
+      @bufpath = File.join(@bufdir, 'etest.*.log')
+
+      Fluent::Test.setup
+      @d = FluentPluginFileBufferTest::DummyOutputPlugin.new
+      @p = Fluent::Plugin::FileBuffer.new
+      @p.owner = @d
+      @p.configure(config_element('buffer', '', {'path' => @bufpath}))
+      @p.start
+    end
+
+    teardown do
+      if @p
+        @p.stop unless @p.stopped?
+        @p.before_shutdown unless @p.before_shutdown?
+        @p.shutdown unless @p.shutdown?
+        @p.after_shutdown unless @p.after_shutdown?
+        @p.close unless @p.closed?
+        @p.terminate unless @p.terminated?
+      end
+      if @bufdir
+        Dir.glob(File.join(@bufdir, '*')).each do |path|
+          next if ['.', '..'].include?(File.basename(path))
+          File.delete(path)
+        end
+      end
+    end
+
+    test '#resume returns queued chunks for files without metadata, while ignoring non-chunk looking files' do
+      assert_equal 0, @p.stage.size
+      assert_equal 1, @p.queue.size
+
+      queue = @p.queue
+
+      m = metadata()
+
+      assert_equal @c1id, queue[0].unique_id
+      assert_equal m, queue[0].metadata
+      assert_equal 0, queue[0].size
+      assert_equal :queued, queue[0].state
+      assert_equal Time.parse('2016-04-17 13:58:28 -0700'), queue[0].modified_at
+
+      assert File.exist?(@not_chunk)
+    end
+  end
 end
diff --git a/test/plugin/test_buffer_file_chunk.rb b/test/plugin/test_buffer_file_chunk.rb
index 293060b8e9..13e72f8b68 100644
--- a/test/plugin/test_buffer_file_chunk.rb
+++ b/test/plugin/test_buffer_file_chunk.rb
@@ -49,8 +49,9 @@ def hex_id(id)
   data(
     correct_staged: ['/mydir/mypath/myfile.b00ff.log', :staged],
     correct_queued: ['/mydir/mypath/myfile.q00ff.log', :queued],
-    incorrect_staged: ['/mydir/mypath/myfile.b00ff.log/unknown', :queued],
-    incorrect_queued: ['/mydir/mypath/myfile.q00ff.log/unknown', :queued],
+    incorrect_staged: ['/mydir/mypath/myfile.b00ff.log/unknown', :unknown],
+    incorrect_queued: ['/mydir/mypath/myfile.q00ff.log/unknown', :unknown],
+    output_file: ['/mydir/mypath/myfile.20160716.log', :unknown],
   )
   test 'can .assume_chunk_state' do |data|
     path, expected = data
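
A note on the core behavior change: `FileChunk.assume_chunk_state` infers a chunk's state purely from its file name, and the patch makes it return `:unknown` (instead of defaulting to `:queued`) for names that match the buffer path glob but carry no `b`/`q` state marker, so that `resume` can log and skip them. Below is a minimal standalone sketch of that classification, assuming only the regexp shown in the patch; the `classify` helper is a hypothetical stand-in for the real method:

    # Sketch of the file-name classification behind FileChunk.assume_chunk_state.
    # `classify` is a hypothetical stand-in; the regexp is taken from the patch.
    def classify(path)
      if /\.(b|q)([0-9a-f]+)\.[^\/]*\Z/n =~ path # //n switch forces ASCII-8BIT matching
        $1 == 'b' ? :staged : :queued            # 'b' marks staged chunks, 'q' queued ones
      else
        :unknown                                 # e.g. out_file output sharing the buffer directory
      end
    end

    p classify('/buf/etest.201604171358.q2a8c6f0e1d.log') # => :queued  ('q' + hex unique id)
    p classify('/buf/etest.b2a8c6f0e1d.log')              # => :staged  ('b' + hex unique id)
    p classify('/buf/etest.20160416.log')                 # => :unknown (no state marker)

Previously the `else` branch answered `:queued`, so a plain out_file output file picked up by the buffer glob would be resumed and enqueued as if it were a real chunk; with `:unknown`, `resume` skips such files and leaves them untouched, which is exactly what the new test asserts via `File.exist?(@not_chunk)`.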