Merge pull request #1124 from fluent/ignore-non-buffer-chunk-files
Ignore non-buffer chunk files when resumed
tagomoris authored Jul 28, 2016
2 parents 82eaf4a + 3e13086 commit 57660ee
Showing 6 changed files with 105 additions and 5 deletions.
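In short: when a file buffer resumes, the buffer directory can contain files that merely match the buffer path glob without being chunk files (for example, files written by out_file into the same directory), and those must be skipped rather than loaded as chunks. A minimal sketch of the resume-time guard, assuming the FileChunk API shown in the buf_file.rb diff below; `paths` stands in for the glob results and the surrounding loop is simplified, so this is not the full implementation:

paths.each do |path|
  mode = Fluent::Plugin::Buffer::FileChunk.assume_chunk_state(path)
  if mode == :unknown
    # e.g. a file created by out_file that happens to match the buffer glob
    log.debug "unknown state chunk found", path: path
    next # skip it; the file is left untouched on disk
  end
  # only genuine chunk files reach this point, resumed as :staged or :queued
end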
7 changes: 5 additions & 2 deletions lib/fluent/plugin/buf_file.rb
@@ -122,14 +122,17 @@ def resume
m = new_metadata() # this metadata will be overwritten by resuming .meta file content
# so it should not be added into @metadata_list for now
mode = Fluent::Plugin::Buffer::FileChunk.assume_chunk_state(path)
if mode == :unknown
log.debug "uknown state chunk found", path: path
next
end

chunk = Fluent::Plugin::Buffer::FileChunk.new(m, path, mode) # file chunk resumes contents of metadata
case chunk.state
when :staged
stage[chunk.metadata] = chunk
when :queued
queue << chunk
else
raise "BUG: unexpected chunk state '#{chunk.state}' for path '#{path}'"
end
end

17 changes: 17 additions & 0 deletions lib/fluent/plugin/buffer.rb
@@ -104,11 +104,13 @@ def start
@queued_num[chunk.metadata] += 1
@queue_size += chunk.bytesize
end
log.debug "buffer started", instance: self.object_id, stage_size: @stage_size, queue_size: @queue_size
end

def close
super
synchronize do
log.debug "closing buffer", instance: self.object_id
@dequeued.each_pair do |chunk_id, chunk|
chunk.close
end
@@ -156,6 +158,7 @@ def new_metadata(timekey: nil, tag: nil, variables: nil)
end

def add_metadata(metadata)
log.trace "adding metadata", instance: self.object_id, metadata: metadata
synchronize do
if i = @metadata_list.index(metadata)
@metadata_list[i]
@@ -178,6 +181,8 @@ def write(metadata_and_data, format: nil, size: nil, enqueue: false)
return if metadata_and_data.size < 1
raise BufferOverflowError, "buffer space has too much data" unless storable?

log.trace "writing events into buffer", instance: self.object_id, metadata_size: metadata_and_data.size

staged_bytesize = 0
operated_chunks = []
unstaged_chunks = {} # metadata => [chunk, chunk, ...]
@@ -293,6 +298,7 @@ def queued?(metadata=nil)
end

def enqueue_chunk(metadata)
log.debug "enqueueing chunk", instance: self.object_id, metadata: metadata
synchronize do
chunk = @stage.delete(metadata)
return nil unless chunk
@@ -314,6 +320,7 @@ def enqueue_chunk(metadata)
end

def enqueue_unstaged_chunk(chunk)
log.debug "enqueueing unstaged chunk", instance: self.object_id, metadata: chunk.metadata
synchronize do
chunk.synchronize do
metadata = chunk.metadata
@@ -326,6 +333,7 @@ def enqueue_unstaged_chunk(chunk)
end

def enqueue_all
log.debug "enqueueing all chunks in buffer", instance: self.object_id
synchronize do
if block_given?
@stage.keys.each do |metadata|
@@ -343,6 +351,7 @@ def enqueue_all

def dequeue_chunk
return nil if @queue.empty?
log.debug "dequeueing a chunk", instance: self.object_id
synchronize do
chunk = @queue.shift

@@ -351,15 +360,18 @@ def dequeue_chunk

@dequeued[chunk.unique_id] = chunk
@queued_num[chunk.metadata] -= 1 # BUG if nil, 0 or negative
log.debug "chunk dequeued", instance: self.object_id, metadata: chunk.metadata
chunk
end
end

def takeback_chunk(chunk_id)
log.debug "taking back a chunk", instance: self.object_id, chunk_id: dump_unique_id_hex(chunk_id)
synchronize do
chunk = @dequeued.delete(chunk_id)
return false unless chunk # already purged by other thread
@queue.unshift(chunk)
log.debug "chunk taken back", instance: self.object_id, chunk_id: dump_unique_id_hex(chunk_id), metadata: chunk.metadata
@queued_num[chunk.metadata] += 1 # BUG if nil
end
true
@@ -371,22 +383,26 @@ def purge_chunk(chunk_id)
return nil unless chunk # purged by other threads

metadata = chunk.metadata
log.debug "purging a chunk", instance: self.object_id, chunk_id: dump_unique_id_hex(chunk_id), metadata: metadata
begin
bytesize = chunk.bytesize
chunk.purge
@queue_size -= bytesize
rescue => e
log.error "failed to purge buffer chunk", chunk_id: dump_unique_id_hex(chunk_id), error_class: e.class, error: e
log.error_backtrace
end

if metadata && !@stage[metadata] && (!@queued_num[metadata] || @queued_num[metadata] < 1)
@metadata_list.delete(metadata)
end
log.debug "chunk purged", instance: self.object_id, chunk_id: dump_unique_id_hex(chunk_id), metadata: metadata
end
nil
end

def clear_queue!
log.debug "clearing queue", instance: self.object_id
synchronize do
until @queue.empty?
begin
@@ -395,6 +411,7 @@ def clear_queue!
q.purge
rescue => e
log.error "unexpected error while clearing buffer queue", error_class: e.class, error: e
log.error_backtrace
end
end
@queue_size = 0
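A usage note on the instrumentation added above, assuming the stock fluentd command line (not part of this diff): the new messages are emitted at debug and trace severity, so they stay silent at the default info log level; -v raises verbosity to debug and -vv to trace, which is what surfaces the log.trace calls.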
4 changes: 3 additions & 1 deletion lib/fluent/plugin/buffer/file_chunk.rb
@@ -149,7 +149,9 @@ def self.assume_chunk_state(path)
if /\.(b|q)([0-9a-f]+)\.[^\/]*\Z/n =~ path # //n switch means explicit 'ASCII-8BIT' pattern
$1 == 'b' ? :staged : :queued
else
:queued
# files which match the glob of the buffer file pattern,
# including files created by out_file
:unknown
end
end

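To make the new classification concrete, the calls below reuse the paths from the updated tests at the end of this commit (illustrative only, not part of the diff):

Fluent::Plugin::Buffer::FileChunk.assume_chunk_state('/mydir/mypath/myfile.b00ff.log')         # => :staged  ('b' marker + hex chunk id)
Fluent::Plugin::Buffer::FileChunk.assume_chunk_state('/mydir/mypath/myfile.q00ff.log')         # => :queued  ('q' marker + hex chunk id)
Fluent::Plugin::Buffer::FileChunk.assume_chunk_state('/mydir/mypath/myfile.20160716.log')      # => :unknown (no marker; e.g. an out_file output file)
Fluent::Plugin::Buffer::FileChunk.assume_chunk_state('/mydir/mypath/myfile.b00ff.log/unknown') # => :unknown (marker not in the last path component)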
9 changes: 9 additions & 0 deletions lib/fluent/plugin/output.rb
@@ -695,6 +695,7 @@ def handle_stream_simple(tag, es, enqueue: false)
end

def commit_write(chunk_id, delayed: @delayed_commit, secondary: false)
log.trace "committing write operation to a chunk", chunk: dump_unique_id_hex(chunk_id), delayed: delayed
if delayed
@dequeued_chunks_mutex.synchronize do
@dequeued_chunks.delete_if{ |info| info.chunk_id == chunk_id }
@@ -770,6 +771,8 @@ def try_flush
chunk = @buffer.dequeue_chunk
return unless chunk

log.debug "trying flush for a chunk", chunk: dump_unique_id_hex(chunk.unique_id)

output = self
using_secondary = false
if @retry_mutex.synchronize{ @retry && @retry.secondary? }
@@ -783,6 +786,7 @@

begin
if output.delayed_commit
log.trace "executing delayed write and commit", chunk: dump_unique_id_hex(chunk.unique_id)
@counters_monitor.synchronize{ @write_count += 1 }
output.try_write(chunk)
@dequeued_chunks_mutex.synchronize do
@@ -791,9 +795,14 @@
end
else # output plugin without delayed purge
chunk_id = chunk.unique_id
dump_chunk_id = dump_unique_id_hex(chunk_id)
log.trace "adding write count", instance: self.object_id
@counters_monitor.synchronize{ @write_count += 1 }
log.trace "executing sync write", chunk: dump_chunk_id
output.write(chunk)
log.trace "write operation done, committing", chunk: dump_chunk_id
commit_write(chunk_id, secondary: using_secondary)
log.trace "done to commit a chunk", chunk: dump_chunk_id
end
rescue => e
log.debug "taking back chunk for errors.", plugin_id: plugin_id, chunk: dump_unique_id_hex(chunk.unique_id)
68 changes: 68 additions & 0 deletions test/plugin/test_buf_file.rb
@@ -500,4 +500,72 @@ def write_metadata(path, chunk_id, metadata, size, ctime, mtime)
assert_equal Time.parse('2016-04-17 14:01:22 -0700'), queue[3].modified_at
end
end

sub_test_case 'there are some non-buffer chunk files, with a path without buffer chunk ids' do
setup do
@bufdir = File.expand_path('../../tmp/buffer_file', __FILE__)

@c1id = Fluent::UniqueId.generate
p1 = File.join(@bufdir, "etest.201604171358.q#{Fluent::UniqueId.hex(@c1id)}.log")
File.open(p1, 'wb') do |f|
f.write ["t1.test", event_time('2016-04-17 13:58:15 -0700').to_i, {"message" => "yay"}].to_json + "\n"
f.write ["t2.test", event_time('2016-04-17 13:58:17 -0700').to_i, {"message" => "yay"}].to_json + "\n"
f.write ["t3.test", event_time('2016-04-17 13:58:21 -0700').to_i, {"message" => "yay"}].to_json + "\n"
f.write ["t4.test", event_time('2016-04-17 13:58:22 -0700').to_i, {"message" => "yay"}].to_json + "\n"
end
FileUtils.touch(p1, mtime: Time.parse('2016-04-17 13:58:28 -0700'))

@not_chunk = File.join(@bufdir, 'etest.20160416.log')
File.open(@not_chunk, 'wb') do |f|
f.write ["t1.test", event_time('2016-04-16 23:58:15 -0700').to_i, {"message" => "yay"}].to_json + "\n"
f.write ["t2.test", event_time('2016-04-16 23:58:17 -0700').to_i, {"message" => "yay"}].to_json + "\n"
f.write ["t3.test", event_time('2016-04-16 23:58:21 -0700').to_i, {"message" => "yay"}].to_json + "\n"
f.write ["t4.test", event_time('2016-04-16 23:58:22 -0700').to_i, {"message" => "yay"}].to_json + "\n"
end
FileUtils.touch(@not_chunk, mtime: Time.parse('2016-04-17 00:00:00 -0700'))

@bufpath = File.join(@bufdir, 'etest.*.log')

Fluent::Test.setup
@d = FluentPluginFileBufferTest::DummyOutputPlugin.new
@p = Fluent::Plugin::FileBuffer.new
@p.owner = @d
@p.configure(config_element('buffer', '', {'path' => @bufpath}))
@p.start
end

teardown do
if @p
@p.stop unless @p.stopped?
@p.before_shutdown unless @p.before_shutdown?
@p.shutdown unless @p.shutdown?
@p.after_shutdown unless @p.after_shutdown?
@p.close unless @p.closed?
@p.terminate unless @p.terminated?
end
if @bufdir
Dir.glob(File.join(@bufdir, '*')).each do |path|
next if ['.', '..'].include?(File.basename(path))
File.delete(path)
end
end
end

test '#resume returns queued chunks for files without metadata, while ignoring non-chunk-looking files' do
assert_equal 0, @p.stage.size
assert_equal 1, @p.queue.size

queue = @p.queue

m = metadata()

assert_equal @c1id, queue[0].unique_id
assert_equal m, queue[0].metadata
assert_equal 0, queue[0].size
assert_equal :queued, queue[0].state
assert_equal Time.parse('2016-04-17 13:58:28 -0700'), queue[0].modified_at

assert File.exist?(@not_chunk)
end
end
end
5 changes: 3 additions & 2 deletions test/plugin/test_buffer_file_chunk.rb
@@ -49,8 +49,9 @@ def hex_id(id)
data(
correct_staged: ['/mydir/mypath/myfile.b00ff.log', :staged],
correct_queued: ['/mydir/mypath/myfile.q00ff.log', :queued],
incorrect_staged: ['/mydir/mypath/myfile.b00ff.log/unknown', :queued],
incorrect_queued: ['/mydir/mypath/myfile.q00ff.log/unknown', :queued],
incorrect_staged: ['/mydir/mypath/myfile.b00ff.log/unknown', :unknown],
incorrect_queued: ['/mydir/mypath/myfile.q00ff.log/unknown', :unknown],
output_file: ['/mydir/mypath/myfile.20160716.log', :unknown],
)
test 'can .assume_chunk_state' do |data|
path, expected = data
