Skip to content

Commit

Permalink
add plugin helper to load v0.12 style buffer configuration from v0.14…
Browse files Browse the repository at this point in the history
… style plugin
  • Loading branch information
tagomoris committed May 20, 2016
1 parent 83e90c4 commit 452642f
Show file tree
Hide file tree
Showing 3 changed files with 276 additions and 46 deletions.
54 changes: 8 additions & 46 deletions lib/fluent/compat/output.rb
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,8 @@
require 'fluent/compat/output_chain'
require 'fluent/timezone'

require 'fluent/plugin_helper/compat_parameters'

require 'time'

module Fluent
Expand Down Expand Up @@ -208,21 +210,7 @@ def support_in_v12_style?(feature)

config_param :flush_at_shutdown, :bool, default: true

PARAMS_MAP = {
"buffer_type" => "@type",
"buffer_path" => "path",
"num_threads" => "flush_threads",
"flush_interval" => "flush_interval",
"try_flush_interval" => "flush_thread_interval",
"queued_chunk_flush_interval" => "flush_burst_interval",
"disable_retry_limit" => "retry_forever",
"retry_limit" => "retry_max_times",
"max_retry_wait" => "retry_max_interval",
"buffer_chunk_limit" => "chunk_bytes_limit",
"buffer_queue_limit" => "queue_length_limit",
"buffer_queue_full_action" => "overflow_action",
"flush_at_shutdown" => "flush_at_shutdown",
}
PARAMS_MAP = Fluent::PluginHelper::CompatParameters::PARAMS_MAP

def configure(conf)
bufconf = CompatOutputUtils.buffer_section(conf)
Expand All @@ -233,6 +221,7 @@ def configure(conf)
"retry_type" => "exponential_backoff",
}
PARAMS_MAP.each do |older, newer|
next unless newer
buf_params[newer] = conf[older] if conf.has_key?(older)
end

Expand Down Expand Up @@ -344,21 +333,7 @@ def support_in_v12_style?(feature)

config_set_default :time_as_integer, true

PARAMS_MAP = {
"buffer_type" => "@type",
"buffer_path" => "path",
"num_threads" => "flush_threads",
"flush_interval" => "flush_interval",
"try_flush_interval" => "flush_thread_interval",
"queued_chunk_flush_interval" => "flush_burst_interval",
"disable_retry_limit" => "retry_forever",
"retry_limit" => "retry_max_times",
"max_retry_wait" => "retry_max_interval",
"buffer_chunk_limit" => "chunk_bytes_limit",
"buffer_queue_limit" => "queue_length_limit",
"buffer_queue_full_action" => "overflow_action",
"flush_at_shutdown" => "flush_at_shutdown",
}
PARAMS_MAP = Fluent::PluginHelper::CompatParameters::PARAMS_MAP

def configure(conf)
bufconf = CompatOutputUtils.buffer_section(conf)
Expand All @@ -369,6 +344,7 @@ def configure(conf)
"retry_type" => "exponential_backoff",
}
PARAMS_MAP.each do |older, newer|
next unless newer
buf_params[newer] = conf[older] if conf.has_key?(older)
end

Expand Down Expand Up @@ -471,22 +447,7 @@ def support_in_v12_style?(feature)
config_set_default :@type, 'file'
end

PARAMS_MAP = {
"buffer_type" => "@type",
"buffer_path" => "path",
"num_threads" => "flush_threads",
"flush_interval" => "flush_interval",
"try_flush_interval" => "flush_thread_interval",
"queued_chunk_flush_interval" => "flush_burst_interval",
"disable_retry_limit" => "retry_forever",
"retry_limit" => "retry_max_times",
"max_retry_wait" => "retry_max_interval",
"buffer_chunk_limit" => "chunk_bytes_limit",
"buffer_queue_limit" => "queue_length_limit",
"buffer_queue_full_action" => "overflow_action",
"flush_at_shutdown" => "flush_at_shutdown",
"time_slice_wait" => "timekey_wait",
}
PARAMS_MAP = Fluent::PluginHelper::CompatParameters::PARAMS_MAP.merge(Fluent::PluginHelper::CompatParameters::TIME_SLICED_PARAMS)

def initialize
super
Expand All @@ -509,6 +470,7 @@ def configure(conf)
"retry_type" => "exponential_backoff",
}
PARAMS_MAP.each do |older, newer|
next unless newer
buf_params[newer] = conf[older] if conf.has_key?(older)
end
unless buf_params.has_key?("@type")
Expand Down
93 changes: 93 additions & 0 deletions lib/fluent/plugin_helper/compat_parameters.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,93 @@
#
# Fluentd
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#

require 'fluent/config/element'

module Fluent
module PluginHelper
module CompatParameters
# This plugin helper is to bring old-fashioned buffer/other
# configuration parameters to v0.14 plugin API configurations.
# This helper is mainly to convert plugins from v0.12 API
# to v0.14 API safely, without breaking user deployment.

PARAMS_MAP = {
"buffer_type" => "@type",
"buffer_path" => "path",
"num_threads" => "flush_threads",
"flush_interval" => "flush_interval",
"try_flush_interval" => "flush_thread_interval",
"queued_chunk_flush_interval" => "flush_burst_interval",
"disable_retry_limit" => "retry_forever",
"retry_limit" => "retry_max_times",
"max_retry_wait" => "retry_max_interval",
"buffer_chunk_limit" => "chunk_bytes_limit",
"buffer_queue_limit" => "queue_length_limit",
"buffer_queue_full_action" => "overflow_action",
"flush_at_shutdown" => "flush_at_shutdown",
}

TIME_SLICED_PARAMS = {
"time_slice_format" => nil,
"time_slice_wait" => "timekey_wait",
}

def compat_parameters_default_chunk_key
# '', 'time' or 'tag'
raise NotImplementedError, "return one of '', 'time' or 'tag'"
end

def configure(conf)
if conf.elements('buffer').empty? && PARAMS_MAP.keys.any?{|k| conf.has_key?(k) } || TIME_SLICED_PARAMS.keys.any?{|k| conf.has_key?(k) }
# TODO: warn obsolete parameters if these are deprecated
attr = {}
PARAMS_MAP.each do |compat, current|
next unless current
attr[current] = conf[compat] if conf.has_key?(compat)
end
TIME_SLICED_PARAMS.each do |compat, current|
next unless current
attr[current] = conf[compat] if conf.has_key?(compat)
end

chunk_key = nil

if conf.has_key?('time_slice_format')
chunk_key = 'time'
attr['timekey_range'] = case conf['time_slice_format']
when /\%S/ then 1
when /\%M/ then 60
when /\%H/ then 3600
when /\%d/ then 86400
else
raise Fluent::ConfigError, "time_slice_format only with %Y or %m is too long"
end
else
chunk_key = compat_parameters_default_chunk_key
if chunk_key == 'time'
attr['timekey_range'] = 86400 # TimeSliceOutput.time_slice_format default value is '%Y%m%d'
end
end

e = Fluent::Config::Element.new('buffer', chunk_key, attr, [])
conf.elements << e
end

super
end
end
end
end
175 changes: 175 additions & 0 deletions test/plugin_helper/test_compat_parameters.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,175 @@
require_relative '../helper'
require 'fluent/plugin_helper/compat_parameters'
require 'fluent/plugin/base'

class CompatParameterTest < Test::Unit::TestCase
setup do
Fluent::Test.setup
@i = nil
end

teardown do
if @i
@i.stop unless @i.stopped?
@i.before_shutdown unless @i.before_shutdown?
@i.shutdown unless @i.shutdown?
@i.after_shutdown unless @i.after_shutdown?
@i.close unless @i.closed?
@i.terminate unless @i.terminated?
end
end

class Dummy0 < Fluent::Plugin::Output
helpers :compat_parameters
def compat_parameters_default_chunk_key
''
end
def write(chunk)
# dummy
end
end
class Dummy1 < Fluent::Plugin::Output
helpers :compat_parameters
def compat_parameters_default_chunk_key
'time'
end
def write(chunk)
# dummy
end
end
class Dummy2 < Fluent::Plugin::Output
helpers :compat_parameters
# for test to assume default key time by 'time_slice_format'
def write(chunk)
# dummy
end
end
class Dummy3 < Fluent::Plugin::Output
helpers :compat_parameters
def compat_parameters_default_chunk_key
'tag'
end
def write(chunk)
# dummy
end
end

sub_test_case 'plugins which does not have default chunk key' do
setup do
@p = Dummy0
end

test 'plugin helper converts parameters into plugin configuration parameters' do
hash = {
'num_threads' => 8,
'flush_interval' => '10s',
'buffer_chunk_limit' => '8m',
'buffer_queue_limit' => '1024',
'flush_at_shutdown' => 'yes',
}
conf = config_element('ROOT', '', hash)
@i = @p.new
@i.configure(conf)

assert_equal 'memory', @i.buffer_config[:@type]
assert_equal [], @i.buffer_config.chunk_keys
assert_equal 8, @i.buffer_config.flush_threads
assert_equal 10, @i.buffer_config.flush_interval
assert @i.buffer_config.flush_at_shutdown

assert_equal 8*1024*1024, @i.buffer.chunk_bytes_limit
assert_equal 1024, @i.buffer.queue_length_limit
end
end

sub_test_case 'plugins which has default chunk key: time' do
setup do
@p = Dummy1
end

test 'plugin helper converts parameters into plugin configuration parameters' do
hash = {
'buffer_type' => 'file',
'buffer_path' => '/tmp/mybuffer',
'disable_retry_limit' => 'yes',
'max_retry_wait' => '1h',
'buffer_queue_full_action' => 'block',
}
conf = config_element('ROOT', '', hash)
@i = @p.new
@i.configure(conf)

assert_equal 'file', @i.buffer_config[:@type]
assert_equal 24*60*60, @i.buffer_config.timekey_range
assert @i.buffer_config.retry_forever
assert_equal 60*60, @i.buffer_config.retry_max_interval
assert_equal :block, @i.buffer_config.overflow_action

assert !@i.chunk_key_tag
assert_equal [], @i.chunk_keys

assert_equal '/tmp/mybuffer/buffer.*.log', @i.buffer.path
end
end

sub_test_case 'plugins which does not have default chunk key' do
setup do
@p = Dummy2
end

test 'plugin helper converts parameters into plugin configuration parameters' do
hash = {
'buffer_type' => 'file',
'buffer_path' => '/tmp/mybuffer',
'time_slice_format' => '%Y%m%d%H',
'time_slice_wait' => '10',
'retry_limit' => '1024',
'buffer_queue_full_action' => 'drop_oldest_chunk',
}
conf = config_element('ROOT', '', hash)
@i = @p.new
@i.configure(conf)

assert_equal 'file', @i.buffer_config[:@type]
assert_equal 60*60, @i.buffer_config.timekey_range
assert_equal 10, @i.buffer_config.timekey_wait
assert_equal 1024, @i.buffer_config.retry_max_times
assert_equal :drop_oldest_chunk, @i.buffer_config.overflow_action

assert @i.chunk_key_time
assert !@i.chunk_key_tag
assert_equal [], @i.chunk_keys

assert_equal '/tmp/mybuffer/buffer.*.log', @i.buffer.path
end
end

sub_test_case 'plugins which has default chunk key: tag' do
setup do
@p = Dummy3
end

test 'plugin helper converts parameters into plugin configuration parameters' do
hash = {
'buffer_type' => 'memory',
'num_threads' => '10',
'flush_interval' => '10s',
'try_flush_interval' => '0.1',
'queued_chunk_flush_interval' => '0.5',
}
conf = config_element('ROOT', '', hash)
@i = @p.new
@i.configure(conf)

assert_equal 'memory', @i.buffer_config[:@type]
assert_equal 10, @i.buffer_config.flush_threads
assert_equal 10, @i.buffer_config.flush_interval
assert_equal 0.1, @i.buffer_config.flush_thread_interval
assert_equal 0.5, @i.buffer_config.flush_burst_interval

assert !@i.chunk_key_time
assert @i.chunk_key_tag
assert_equal [], @i.chunk_keys
end
end
end

0 comments on commit 452642f

Please sign in to comment.