Improve on logging #1809

Merged (2 commits, Jan 5, 2018)

24 changes: 12 additions & 12 deletions lib/fluent/log.rb
@@ -261,9 +261,9 @@ def skipped_type?(type)
       end
     end
 
-    def on_trace(&block)
+    def on_trace
       return if @level > LEVEL_TRACE
-      block.call if block
+      yield
     end
 
     def trace(*args, &block)
@@ -282,9 +282,9 @@ def trace_backtrace(backtrace=$!.backtrace, type: :default)
       dump_stacktrace(type, backtrace, LEVEL_TRACE)
     end
 
-    def on_debug(&block)
+    def on_debug
       return if @level > LEVEL_DEBUG
-      block.call if block
+      yield
     end
 
     def debug(*args, &block)
@@ -302,9 +302,9 @@ def debug_backtrace(backtrace=$!.backtrace, type: :default)
       dump_stacktrace(type, backtrace, LEVEL_DEBUG)
     end
 
-    def on_info(&block)
+    def on_info
       return if @level > LEVEL_INFO
-      block.call if block
+      yield
     end
 
     def info(*args, &block)
@@ -322,9 +322,9 @@ def info_backtrace(backtrace=$!.backtrace, type: :default)
       dump_stacktrace(type, backtrace, LEVEL_INFO)
     end
 
-    def on_warn(&block)
+    def on_warn
       return if @level > LEVEL_WARN
-      block.call if block
+      yield
     end
 
     def warn(*args, &block)
@@ -342,9 +342,9 @@ def warn_backtrace(backtrace=$!.backtrace, type: :default)
       dump_stacktrace(type, backtrace, LEVEL_WARN)
     end
 
-    def on_error(&block)
+    def on_error
       return if @level > LEVEL_ERROR
-      block.call if block
+      yield
     end
 
     def error(*args, &block)
@@ -362,9 +362,9 @@ def error_backtrace(backtrace=$!.backtrace, type: :default)
       dump_stacktrace(type, backtrace, LEVEL_ERROR)
     end
 
-    def on_fatal(&block)
+    def on_fatal
       return if @level > LEVEL_FATAL
-      block.call if block
+      yield
     end
 
     def fatal(*args, &block)
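
Note on the log.rb change: switching `on_trace` and its siblings from an explicit `&block` parameter to a bare `yield` keeps the early return on level while avoiding the Proc object Ruby materializes when a block is captured as an argument. The sketch below is a minimal stand-in for the real `Fluent::Log` class, not its actual implementation; the class name, levels, and example block are illustrative only.

# A minimal sketch of the pattern above, not the real Fluent::Log.
class TinyLog
  LEVEL_TRACE = 0
  LEVEL_DEBUG = 1

  def initialize(level)
    @level = level   # numeric severity threshold, as in Fluent::Log
  end

  # `yield` hands control to the caller's block without capturing it as a Proc.
  def on_trace
    return if @level > LEVEL_TRACE
    yield
  end
end

log = TinyLog.new(TinyLog::LEVEL_DEBUG)

# The block (and its string interpolation) is never evaluated, because the
# level check returns first.
log.on_trace { puts "trace payload: #{(1..10_000).to_a.inspect}" }
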
26 changes: 17 additions & 9 deletions lib/fluent/plugin/buffer.rb
@@ -240,7 +240,8 @@ def new_metadata(timekey: nil, tag: nil, variables: nil)
       end
 
       def add_metadata(metadata)
-        log.trace "adding metadata", instance: self.object_id, metadata: metadata
+        log.on_trace { log.trace "adding metadata", instance: self.object_id, metadata: metadata }
+
         synchronize do
           if i = @metadata_list.index(metadata)
             @metadata_list[i]
@@ -263,7 +264,7 @@ def write(metadata_and_data, format: nil, size: nil, enqueue: false)
         return if metadata_and_data.size < 1
         raise BufferOverflowError, "buffer space has too many data" unless storable?
 
-        log.trace "writing events into buffer", instance: self.object_id, metadata_size: metadata_and_data.size
+        log.on_trace { log.trace "writing events into buffer", instance: self.object_id, metadata_size: metadata_and_data.size }
 
         staged_bytesize = 0
         operated_chunks = []
@@ -382,7 +383,8 @@ def queued?(metadata=nil)
       end
 
       def enqueue_chunk(metadata)
-        log.trace "enqueueing chunk", instance: self.object_id, metadata: metadata
+        log.on_trace { log.trace "enqueueing chunk", instance: self.object_id, metadata: metadata }
+
         chunk = synchronize do
           @stage.delete(metadata)
         end
@@ -406,7 +408,8 @@ def enqueue_chunk(metadata)
       end
 
       def enqueue_unstaged_chunk(chunk)
-        log.trace "enqueueing unstaged chunk", instance: self.object_id, metadata: chunk.metadata
+        log.on_trace { log.trace "enqueueing unstaged chunk", instance: self.object_id, metadata: chunk.metadata }
+
         synchronize do
           chunk.synchronize do
             metadata = chunk.metadata
@@ -419,7 +422,8 @@ def enqueue_unstaged_chunk(chunk)
       end
 
       def enqueue_all
-        log.trace "enqueueing all chunks in buffer", instance: self.object_id
+        log.on_trace { log.trace "enqueueing all chunks in buffer", instance: self.object_id }
+
         if block_given?
           synchronize{ @stage.keys }.each do |metadata|
             # NOTE: The following line might cause data race depending on Ruby implementations except CRuby
@@ -438,7 +442,8 @@ def enqueue_all
 
       def dequeue_chunk
         return nil if @queue.empty?
-        log.trace "dequeueing a chunk", instance: self.object_id
+        log.on_trace { log.trace "dequeueing a chunk", instance: self.object_id }
+
         synchronize do
           chunk = @queue.shift
 
@@ -453,7 +458,8 @@ def dequeue_chunk
       end
 
      def takeback_chunk(chunk_id)
-        log.trace "taking back a chunk", instance: self.object_id, chunk_id: dump_unique_id_hex(chunk_id)
+        log.on_trace { log.trace "taking back a chunk", instance: self.object_id, chunk_id: dump_unique_id_hex(chunk_id) }
+
         synchronize do
           chunk = @dequeued.delete(chunk_id)
           return false unless chunk # already purged by other thread
@@ -470,7 +476,8 @@ def purge_chunk(chunk_id)
         return nil unless chunk # purged by other threads
 
         metadata = chunk.metadata
-        log.trace "purging a chunk", instance: self.object_id, chunk_id: dump_unique_id_hex(chunk_id), metadata: metadata
+        log.on_trace { log.trace "purging a chunk", instance: self.object_id, chunk_id: dump_unique_id_hex(chunk_id), metadata: metadata }
+
         begin
           bytesize = chunk.bytesize
           chunk.purge
@@ -489,7 +496,8 @@ def purge_chunk(chunk_id)
       end
 
       def clear_queue!
-        log.trace "clearing queue", instance: self.object_id
+        log.on_trace { log.trace "clearing queue", instance: self.object_id }
+
         synchronize do
           until @queue.empty?
             begin
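
Note on the buffer.rb changes: a plain `log.trace "msg", key: value` call evaluates its arguments (the keyword hash, `object_id`, `dump_unique_id_hex`) before the logger can check the level, so these hot paths paid that cost even with trace disabled. Wrapping the call in `log.on_trace { ... }` defers all of that work into the block. The following is a self-contained sketch of the difference; `StubLog` and `expensive_metadata` are assumptions for illustration, not part of this PR.

# Self-contained sketch; StubLog and expensive_metadata are made up to show why
# the buffer wraps log.trace calls in log.on_trace on hot paths.
class StubLog
  LEVEL_TRACE = 0
  LEVEL_INFO  = 2

  def initialize(level)
    @level = level
  end

  def trace(*args)
    return if @level > LEVEL_TRACE
    puts args.inspect   # a real logger would format and emit here
  end

  def on_trace
    return if @level > LEVEL_TRACE
    yield
  end
end

def expensive_metadata
  puts "building metadata..."   # visible side effect to show when it runs
  { instance: object_id }
end

log = StubLog.new(StubLog::LEVEL_INFO)   # trace suppressed, as in most deployments

log.trace "adding metadata", metadata: expensive_metadata
# prints "building metadata..." even though nothing is logged

log.on_trace { log.trace "adding metadata", metadata: expensive_metadata }
# prints nothing: the block, and therefore expensive_metadata, never runs
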
5 changes: 3 additions & 2 deletions lib/fluent/plugin/output.rb
@@ -971,7 +971,8 @@ def handle_stream_simple(tag, es, enqueue: false)
       end
 
       def commit_write(chunk_id, delayed: @delayed_commit, secondary: false)
-        log.trace "committing write operation to a chunk", chunk: dump_unique_id_hex(chunk_id), delayed: delayed
+        log.on_trace { log.trace "committing write operation to a chunk", chunk: dump_unique_id_hex(chunk_id), delayed: delayed }
+
         if delayed
           @dequeued_chunks_mutex.synchronize do
             @dequeued_chunks.delete_if{ |info| info.chunk_id == chunk_id }
@@ -1057,7 +1058,7 @@ def try_flush
         chunk = @buffer.dequeue_chunk
         return unless chunk
 
-        log.trace "trying flush for a chunk", chunk: dump_unique_id_hex(chunk.unique_id)
+        log.on_trace { log.trace "trying flush for a chunk", chunk: dump_unique_id_hex(chunk.unique_id) }
 
         output = self
         using_secondary = false
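
The output.rb hunks apply the same guard to `commit_write` and `try_flush`, which run once per chunk on the flush path. A rough micro-benchmark sketch for checking the saving locally is below; the `Guarded` stub, iteration count, and payload are assumptions for illustration and not part of the PR.

# Illustrative micro-benchmark only; numbers will vary by machine and Ruby version.
require 'benchmark'

class Guarded
  LEVEL_TRACE = 0
  LEVEL_INFO  = 2

  def initialize(level)
    @level = level
  end

  def trace(*args)
    return if @level > LEVEL_TRACE
    # a real logger would emit args here
  end

  def on_trace
    return if @level > LEVEL_TRACE
    yield
  end
end

log = Guarded.new(Guarded::LEVEL_INFO)            # trace disabled
chunk_ids = Array.new(100) { |i| format("%016x", i) }

Benchmark.bm(9) do |x|
  x.report("eager")   { 100_000.times { log.trace "trying flush", chunks: chunk_ids.join(",") } }
  x.report("guarded") { 100_000.times { log.on_trace { log.trace "trying flush", chunks: chunk_ids.join(",") } } }
end
# "eager" still pays for the join and the argument hash on every iteration;
# "guarded" returns before the block body is built.
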