From a11d5365e5bac07ff8a490d5600140ee42b0c7e4 Mon Sep 17 00:00:00 2001 From: Masahiro Nakagawa Date: Thu, 28 Dec 2017 09:31:26 +0900 Subject: [PATCH 1/2] log: Use yield instead of &block --- lib/fluent/log.rb | 24 ++++++++++++------------ 1 file changed, 12 insertions(+), 12 deletions(-) diff --git a/lib/fluent/log.rb b/lib/fluent/log.rb index a83b99c46f..ef201e8722 100644 --- a/lib/fluent/log.rb +++ b/lib/fluent/log.rb @@ -261,9 +261,9 @@ def skipped_type?(type) end end - def on_trace(&block) + def on_trace return if @level > LEVEL_TRACE - block.call if block + yield end def trace(*args, &block) @@ -282,9 +282,9 @@ def trace_backtrace(backtrace=$!.backtrace, type: :default) dump_stacktrace(type, backtrace, LEVEL_TRACE) end - def on_debug(&block) + def on_debug return if @level > LEVEL_DEBUG - block.call if block + yield end def debug(*args, &block) @@ -302,9 +302,9 @@ def debug_backtrace(backtrace=$!.backtrace, type: :default) dump_stacktrace(type, backtrace, LEVEL_DEBUG) end - def on_info(&block) + def on_info return if @level > LEVEL_INFO - block.call if block + yield end def info(*args, &block) @@ -322,9 +322,9 @@ def info_backtrace(backtrace=$!.backtrace, type: :default) dump_stacktrace(type, backtrace, LEVEL_INFO) end - def on_warn(&block) + def on_warn return if @level > LEVEL_WARN - block.call if block + yield end def warn(*args, &block) @@ -342,9 +342,9 @@ def warn_backtrace(backtrace=$!.backtrace, type: :default) dump_stacktrace(type, backtrace, LEVEL_WARN) end - def on_error(&block) + def on_error return if @level > LEVEL_ERROR - block.call if block + yield end def error(*args, &block) @@ -362,9 +362,9 @@ def error_backtrace(backtrace=$!.backtrace, type: :default) dump_stacktrace(type, backtrace, LEVEL_ERROR) end - def on_fatal(&block) + def on_fatal return if @level > LEVEL_FATAL - block.call if block + yield end def fatal(*args, &block) From d6760e26b6b47dffa1b0a60ef9d76e0a9aa72f95 Mon Sep 17 00:00:00 2001 From: Masahiro Nakagawa Date: Thu, 28 Dec 2017 09:31:46 +0900 Subject: [PATCH 2/2] Use on_* API to avoid unnecessary object allocation --- lib/fluent/plugin/buffer.rb | 26 +++++++++++++++++--------- lib/fluent/plugin/output.rb | 5 +++-- 2 files changed, 20 insertions(+), 11 deletions(-) diff --git a/lib/fluent/plugin/buffer.rb b/lib/fluent/plugin/buffer.rb index 7bad8f9fbe..68bf772d3c 100644 --- a/lib/fluent/plugin/buffer.rb +++ b/lib/fluent/plugin/buffer.rb @@ -240,7 +240,8 @@ def new_metadata(timekey: nil, tag: nil, variables: nil) end def add_metadata(metadata) - log.trace "adding metadata", instance: self.object_id, metadata: metadata + log.on_trace { log.trace "adding metadata", instance: self.object_id, metadata: metadata } + synchronize do if i = @metadata_list.index(metadata) @metadata_list[i] @@ -263,7 +264,7 @@ def write(metadata_and_data, format: nil, size: nil, enqueue: false) return if metadata_and_data.size < 1 raise BufferOverflowError, "buffer space has too many data" unless storable? - log.trace "writing events into buffer", instance: self.object_id, metadata_size: metadata_and_data.size + log.on_trace { log.trace "writing events into buffer", instance: self.object_id, metadata_size: metadata_and_data.size } staged_bytesize = 0 operated_chunks = [] @@ -382,7 +383,8 @@ def queued?(metadata=nil) end def enqueue_chunk(metadata) - log.trace "enqueueing chunk", instance: self.object_id, metadata: metadata + log.on_trace { log.trace "enqueueing chunk", instance: self.object_id, metadata: metadata } + chunk = synchronize do @stage.delete(metadata) end @@ -406,7 +408,8 @@ def enqueue_chunk(metadata) end def enqueue_unstaged_chunk(chunk) - log.trace "enqueueing unstaged chunk", instance: self.object_id, metadata: chunk.metadata + log.on_trace { log.trace "enqueueing unstaged chunk", instance: self.object_id, metadata: chunk.metadata } + synchronize do chunk.synchronize do metadata = chunk.metadata @@ -419,7 +422,8 @@ def enqueue_unstaged_chunk(chunk) end def enqueue_all - log.trace "enqueueing all chunks in buffer", instance: self.object_id + log.on_trace { log.trace "enqueueing all chunks in buffer", instance: self.object_id } + if block_given? synchronize{ @stage.keys }.each do |metadata| # NOTE: The following line might cause data race depending on Ruby implementations except CRuby @@ -438,7 +442,8 @@ def enqueue_all def dequeue_chunk return nil if @queue.empty? - log.trace "dequeueing a chunk", instance: self.object_id + log.on_trace { log.trace "dequeueing a chunk", instance: self.object_id } + synchronize do chunk = @queue.shift @@ -453,7 +458,8 @@ def dequeue_chunk end def takeback_chunk(chunk_id) - log.trace "taking back a chunk", instance: self.object_id, chunk_id: dump_unique_id_hex(chunk_id) + log.on_trace { log.trace "taking back a chunk", instance: self.object_id, chunk_id: dump_unique_id_hex(chunk_id) } + synchronize do chunk = @dequeued.delete(chunk_id) return false unless chunk # already purged by other thread @@ -470,7 +476,8 @@ def purge_chunk(chunk_id) return nil unless chunk # purged by other threads metadata = chunk.metadata - log.trace "purging a chunk", instance: self.object_id, chunk_id: dump_unique_id_hex(chunk_id), metadata: metadata + log.on_trace { log.trace "purging a chunk", instance: self.object_id, chunk_id: dump_unique_id_hex(chunk_id), metadata: metadata } + begin bytesize = chunk.bytesize chunk.purge @@ -489,7 +496,8 @@ def purge_chunk(chunk_id) end def clear_queue! - log.trace "clearing queue", instance: self.object_id + log.on_trace { log.trace "clearing queue", instance: self.object_id } + synchronize do until @queue.empty? begin diff --git a/lib/fluent/plugin/output.rb b/lib/fluent/plugin/output.rb index b90c1d04c2..db49644948 100644 --- a/lib/fluent/plugin/output.rb +++ b/lib/fluent/plugin/output.rb @@ -971,7 +971,8 @@ def handle_stream_simple(tag, es, enqueue: false) end def commit_write(chunk_id, delayed: @delayed_commit, secondary: false) - log.trace "committing write operation to a chunk", chunk: dump_unique_id_hex(chunk_id), delayed: delayed + log.on_trace { log.trace "committing write operation to a chunk", chunk: dump_unique_id_hex(chunk_id), delayed: delayed } + if delayed @dequeued_chunks_mutex.synchronize do @dequeued_chunks.delete_if{ |info| info.chunk_id == chunk_id } @@ -1057,7 +1058,7 @@ def try_flush chunk = @buffer.dequeue_chunk return unless chunk - log.trace "trying flush for a chunk", chunk: dump_unique_id_hex(chunk.unique_id) + log.on_trace { log.trace "trying flush for a chunk", chunk: dump_unique_id_hex(chunk.unique_id) } output = self using_secondary = false