Ignore non buffer chunk files when resumed #1124

Merged
merged 4 commits on Jul 28, 2016
7 changes: 5 additions & 2 deletions lib/fluent/plugin/buf_file.rb
@@ -122,14 +122,17 @@ def resume
m = new_metadata() # this metadata will be overwritten by resuming .meta file content
# so it should not be added into @metadata_list for now
mode = Fluent::Plugin::Buffer::FileChunk.assume_chunk_state(path)
if mode == :unknown
log.debug "uknown state chunk found", path: path
next
end

chunk = Fluent::Plugin::Buffer::FileChunk.new(m, path, mode) # file chunk resumes contents of metadata
case chunk.state
when :staged
stage[chunk.metadata] = chunk
when :queued
queue << chunk
else
raise "BUG: unexpected chunk state '#{chunk.state}' for path '#{path}'"
end
end

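The failure mode this guards against: a buffer path such as `etest.*.log` can also match files that other plugins write into the same directory, as the new test below demonstrates with an out_file-style file. A rough sketch of such a shared directory (all names hypothetical):

# Hypothetical directory shared by buf_file and an out_file output;
# the buffer glob 'etest.*.log' matches both kinds of files.
Dir.glob('/my/buffer/etest.*.log')
# => ["/my/buffer/etest.201604171358.q2a8b6f27b.log", # buffer chunk -> resumed as :queued
#     "/my/buffer/etest.20160416.log"]                # plain log file -> :unknown, skipped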
17 changes: 17 additions & 0 deletions lib/fluent/plugin/buffer.rb
@@ -104,11 +104,13 @@ def start
@queued_num[chunk.metadata] += 1
@queue_size += chunk.bytesize
end
log.debug "buffer started", instance: self.object_id, stage_size: @stage_size, queue_size: @queue_size
end

def close
super
synchronize do
log.debug "closing buffer", instance: self.object_id
@dequeued.each_pair do |chunk_id, chunk|
chunk.close
end
@@ -156,6 +158,7 @@ def new_metadata(timekey: nil, tag: nil, variables: nil)
end

def add_metadata(metadata)
log.trace "adding metadata", instance: self.object_id, metadata: metadata
synchronize do
if i = @metadata_list.index(metadata)
@metadata_list[i]
@@ -178,6 +181,8 @@ def write(metadata_and_data, format: nil, size: nil, enqueue: false)
return if metadata_and_data.size < 1
raise BufferOverflowError, "buffer space has too many data" unless storable?

log.trace "writing events into buffer", instance: self.object_id, metadata_size: metadata_and_data.size

staged_bytesize = 0
operated_chunks = []
unstaged_chunks = {} # metadata => [chunk, chunk, ...]
@@ -293,6 +298,7 @@ def queued?(metadata=nil)
end

def enqueue_chunk(metadata)
log.debug "enqueueing chunk", instance: self.object_id, metadata: metadata
synchronize do
chunk = @stage.delete(metadata)
return nil unless chunk
@@ -314,6 +320,7 @@ def enqueue_chunk(metadata)
end

def enqueue_unstaged_chunk(chunk)
log.debug "enqueueing unstaged chunk", instance: self.object_id, metadata: chunk.metadata
synchronize do
chunk.synchronize do
metadata = chunk.metadata
@@ -326,6 +333,7 @@ def enqueue_unstaged_chunk(chunk)
end

def enqueue_all
log.debug "enqueueing all chunks in buffer", instance: self.object_id
synchronize do
if block_given?
@stage.keys.each do |metadata|
@@ -343,6 +351,7 @@ def enqueue_all

def dequeue_chunk
return nil if @queue.empty?
log.debug "dequeueing a chunk", instance: self.object_id
synchronize do
chunk = @queue.shift

@@ -351,15 +360,18 @@ def dequeue_chunk

@dequeued[chunk.unique_id] = chunk
@queued_num[chunk.metadata] -= 1 # BUG if nil, 0 or subzero
log.debug "chunk dequeued", instance: self.object_id, metadata: chunk.metadata
chunk
end
end

def takeback_chunk(chunk_id)
log.debug "taking back a chunk", instance: self.object_id, chunk_id: dump_unique_id_hex(chunk_id)
synchronize do
chunk = @dequeued.delete(chunk_id)
return false unless chunk # already purged by other thread
@queue.unshift(chunk)
log.debug "chunk taken back", instance: self.object_id, chunk_id: dump_unique_id_hex(chunk_id), metadata: chunk.metadata
@queued_num[chunk.metadata] += 1 # BUG if nil
end
true
@@ -371,22 +383,26 @@ def purge_chunk(chunk_id)
return nil unless chunk # purged by other threads

metadata = chunk.metadata
log.debug "purging a chunk", instance: self.object_id, chunk_id: dump_unique_id_hex(chunk_id), metadata: metadata
begin
bytesize = chunk.bytesize
chunk.purge
@queue_size -= bytesize
rescue => e
log.error "failed to purge buffer chunk", chunk_id: dump_unique_id_hex(chunk_id), error_class: e.class, error: e
log.error_backtrace
end

if metadata && !@stage[metadata] && (!@queued_num[metadata] || @queued_num[metadata] < 1)
@metadata_list.delete(metadata)
end
log.debug "chunk purged", instance: self.object_id, chunk_id: dump_unique_id_hex(chunk_id), metadata: metadata
end
nil
end

def clear_queue!
log.debug "clearing queue", instance: self.object_id
synchronize do
until @queue.empty?
begin
@@ -395,6 +411,7 @@ def clear_queue!
q.purge
rescue => e
log.error "unexpected error while clearing buffer queue", error_class: e.class, error: e
log.error_backtrace
end
end
@queue_size = 0
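Taken together, the messages added above give a debug-level trail for a chunk's whole lifecycle, each line tagged with the buffer instance. Roughly — the rendering below is illustrative, with ids and metadata abbreviated:

[debug]: buffer started instance=70123 stage_size=0 queue_size=0
[debug]: enqueueing chunk instance=70123 metadata=<m>
[debug]: dequeueing a chunk instance=70123
[debug]: chunk dequeued instance=70123 metadata=<m>
[debug]: purging a chunk instance=70123 chunk_id=2a8b... metadata=<m>
[debug]: chunk purged instance=70123 chunk_id=2a8b... metadata=<m>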
4 changes: 3 additions & 1 deletion lib/fluent/plugin/buffer/file_chunk.rb
@@ -149,7 +149,9 @@ def self.assume_chunk_state(path)
if /\.(b|q)([0-9a-f]+)\.[^\/]*\Z/n =~ path # //n switch means explicit 'ASCII-8BIT' pattern
$1 == 'b' ? :staged : :queued
else
:queued
# files which match the glob of the buffer file pattern
# but carry no chunk id, e.g. files created by out_file
:unknown
end
end

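A self-contained sketch of the classification rule above — the regex is the one from the diff; the sample paths are hypothetical:

# Staged chunks carry ".b<hex id>.", queued chunks ".q<hex id>.";
# anything else that matched the glob is now reported as :unknown.
def assume_chunk_state(path)
  if /\.(b|q)([0-9a-f]+)\.[^\/]*\Z/n =~ path # //n: explicit ASCII-8BIT pattern
    $1 == 'b' ? :staged : :queued
  else
    :unknown
  end
end

assume_chunk_state('/mydir/myfile.b00ff.log')    # => :staged
assume_chunk_state('/mydir/myfile.q00ff.log')    # => :queued
assume_chunk_state('/mydir/myfile.20160716.log') # => :unknown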
9 changes: 9 additions & 0 deletions lib/fluent/plugin/output.rb
@@ -695,6 +695,7 @@ def handle_stream_simple(tag, es, enqueue: false)
end

def commit_write(chunk_id, delayed: @delayed_commit, secondary: false)
log.trace "committing write operation to a chunk", chunk: dump_unique_id_hex(chunk_id), delayed: delayed
if delayed
@dequeued_chunks_mutex.synchronize do
@dequeued_chunks.delete_if{ |info| info.chunk_id == chunk_id }
@@ -770,6 +771,8 @@ def try_flush
chunk = @buffer.dequeue_chunk
return unless chunk

log.debug "trying flush for a chunk", chunk: dump_unique_id_hex(chunk.unique_id)

output = self
using_secondary = false
if @retry_mutex.synchronize{ @retry && @retry.secondary? }
@@ -783,6 +786,7 @@

begin
if output.delayed_commit
log.trace "executing delayed write and commit", chunk: dump_unique_id_hex(chunk.unique_id)
@counters_monitor.synchronize{ @write_count += 1 }
output.try_write(chunk)
@dequeued_chunks_mutex.synchronize do
@@ -791,9 +795,14 @@
end
else # output plugin without delayed purge
chunk_id = chunk.unique_id
dump_chunk_id = dump_unique_id_hex(chunk_id)
log.trace "adding write count", instance: self.object_id
@counters_monitor.synchronize{ @write_count += 1 }
log.trace "executing sync write", chunk: dump_chunk_id
output.write(chunk)
log.trace "write operation done, committing", chunk: dump_chunk_id
commit_write(chunk_id, secondary: using_secondary)
log.trace "done to commit a chunk", chunk: dump_chunk_id
end
rescue => e
log.debug "taking back chunk for errors.", plugin_id: plugin_id, chunk: dump_unique_id_hex(chunk.unique_id)
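For orientation, the two flush paths instrumented above differ in when commit_write runs; a simplified sketch following the names in this diff, with retry and error plumbing omitted:

if output.delayed_commit
  output.try_write(chunk) # plugin commits later itself, via commit_write(chunk_id, delayed: true)
else
  output.write(chunk)     # synchronous write; a failure here takes the chunk back
  commit_write(chunk.unique_id, secondary: using_secondary)
end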
68 changes: 68 additions & 0 deletions test/plugin/test_buf_file.rb
@@ -500,4 +500,72 @@ def write_metadata(path, chunk_id, metadata, size, ctime, mtime)
assert_equal Time.parse('2016-04-17 14:01:22 -0700'), queue[3].modified_at
end
end

sub_test_case 'there are some non-buffer chunk files, with a path without buffer chunk ids' do
setup do
@bufdir = File.expand_path('../../tmp/buffer_file', __FILE__)

@c1id = Fluent::UniqueId.generate
p1 = File.join(@bufdir, "etest.201604171358.q#{Fluent::UniqueId.hex(@c1id)}.log")
File.open(p1, 'wb') do |f|
f.write ["t1.test", event_time('2016-04-17 13:58:15 -0700').to_i, {"message" => "yay"}].to_json + "\n"
f.write ["t2.test", event_time('2016-04-17 13:58:17 -0700').to_i, {"message" => "yay"}].to_json + "\n"
f.write ["t3.test", event_time('2016-04-17 13:58:21 -0700').to_i, {"message" => "yay"}].to_json + "\n"
f.write ["t4.test", event_time('2016-04-17 13:58:22 -0700').to_i, {"message" => "yay"}].to_json + "\n"
end
FileUtils.touch(p1, mtime: Time.parse('2016-04-17 13:58:28 -0700'))

@not_chunk = File.join(@bufdir, 'etest.20160416.log')
File.open(@not_chunk, 'wb') do |f|
f.write ["t1.test", event_time('2016-04-16 23:58:15 -0700').to_i, {"message" => "yay"}].to_json + "\n"
f.write ["t2.test", event_time('2016-04-16 23:58:17 -0700').to_i, {"message" => "yay"}].to_json + "\n"
f.write ["t3.test", event_time('2016-04-16 23:58:21 -0700').to_i, {"message" => "yay"}].to_json + "\n"
f.write ["t4.test", event_time('2016-04-16 23:58:22 -0700').to_i, {"message" => "yay"}].to_json + "\n"
end
FileUtils.touch(@not_chunk, mtime: Time.parse('2016-04-17 00:00:00 -0700'))

@bufpath = File.join(@bufdir, 'etest.*.log')

Fluent::Test.setup
@d = FluentPluginFileBufferTest::DummyOutputPlugin.new
@p = Fluent::Plugin::FileBuffer.new
@p.owner = @d
@p.configure(config_element('buffer', '', {'path' => @bufpath}))
@p.start
end

teardown do
if @p
@p.stop unless @p.stopped?
@p.before_shutdown unless @p.before_shutdown?
@p.shutdown unless @p.shutdown?
@p.after_shutdown unless @p.after_shutdown?
@p.close unless @p.closed?
@p.terminate unless @p.terminated?
end
if @bufdir
Dir.glob(File.join(@bufdir, '*')).each do |path|
next if ['.', '..'].include?(File.basename(path))
File.delete(path)
end
end
end

test '#resume returns queued chunks for files without metadata, while ignoring non-chunk looking files' do
assert_equal 0, @p.stage.size
assert_equal 1, @p.queue.size

queue = @p.queue

m = metadata()

assert_equal @c1id, queue[0].unique_id
assert_equal m, queue[0].metadata
assert_equal 0, queue[0].size
assert_equal :queued, queue[0].state
assert_equal Time.parse('2016-04-17 13:58:28 -0700'), queue[0].modified_at

assert File.exist?(@not_chunk)
end
end
end
5 changes: 3 additions & 2 deletions test/plugin/test_buffer_file_chunk.rb
@@ -49,8 +49,9 @@ def hex_id(id)
data(
correct_staged: ['/mydir/mypath/myfile.b00ff.log', :staged],
correct_queued: ['/mydir/mypath/myfile.q00ff.log', :queued],
incorrect_staged: ['/mydir/mypath/myfile.b00ff.log/unknown', :queued],
incorrect_queued: ['/mydir/mypath/myfile.q00ff.log/unknown', :queued],
incorrect_staged: ['/mydir/mypath/myfile.b00ff.log/unknown', :unknown],
incorrect_queued: ['/mydir/mypath/myfile.q00ff.log/unknown', :unknown],
output_file: ['/mydir/mypath/myfile.20160716.log', :unknown],
)
test 'can .assume_chunk_state' do |data|
path, expected = data