diff --git a/lib/fluent/compat/output.rb b/lib/fluent/compat/output.rb index 397c0ffb4b..6ec1516a4a 100644 --- a/lib/fluent/compat/output.rb +++ b/lib/fluent/compat/output.rb @@ -21,6 +21,8 @@ require 'fluent/compat/output_chain' require 'fluent/timezone' +require 'fluent/plugin_helper/compat_parameters' + require 'time' module Fluent @@ -208,21 +210,7 @@ def support_in_v12_style?(feature) config_param :flush_at_shutdown, :bool, default: true - PARAMS_MAP = { - "buffer_type" => "@type", - "buffer_path" => "path", - "num_threads" => "flush_threads", - "flush_interval" => "flush_interval", - "try_flush_interval" => "flush_thread_interval", - "queued_chunk_flush_interval" => "flush_burst_interval", - "disable_retry_limit" => "retry_forever", - "retry_limit" => "retry_max_times", - "max_retry_wait" => "retry_max_interval", - "buffer_chunk_limit" => "chunk_bytes_limit", - "buffer_queue_limit" => "queue_length_limit", - "buffer_queue_full_action" => "overflow_action", - "flush_at_shutdown" => "flush_at_shutdown", - } + PARAMS_MAP = Fluent::PluginHelper::CompatParameters::PARAMS_MAP def configure(conf) bufconf = CompatOutputUtils.buffer_section(conf) @@ -233,6 +221,7 @@ def configure(conf) "retry_type" => "exponential_backoff", } PARAMS_MAP.each do |older, newer| + next unless newer buf_params[newer] = conf[older] if conf.has_key?(older) end @@ -344,21 +333,7 @@ def support_in_v12_style?(feature) config_set_default :time_as_integer, true - PARAMS_MAP = { - "buffer_type" => "@type", - "buffer_path" => "path", - "num_threads" => "flush_threads", - "flush_interval" => "flush_interval", - "try_flush_interval" => "flush_thread_interval", - "queued_chunk_flush_interval" => "flush_burst_interval", - "disable_retry_limit" => "retry_forever", - "retry_limit" => "retry_max_times", - "max_retry_wait" => "retry_max_interval", - "buffer_chunk_limit" => "chunk_bytes_limit", - "buffer_queue_limit" => "queue_length_limit", - "buffer_queue_full_action" => "overflow_action", - "flush_at_shutdown" => "flush_at_shutdown", - } + PARAMS_MAP = Fluent::PluginHelper::CompatParameters::PARAMS_MAP def configure(conf) bufconf = CompatOutputUtils.buffer_section(conf) @@ -369,6 +344,7 @@ def configure(conf) "retry_type" => "exponential_backoff", } PARAMS_MAP.each do |older, newer| + next unless newer buf_params[newer] = conf[older] if conf.has_key?(older) end @@ -471,22 +447,7 @@ def support_in_v12_style?(feature) config_set_default :@type, 'file' end - PARAMS_MAP = { - "buffer_type" => "@type", - "buffer_path" => "path", - "num_threads" => "flush_threads", - "flush_interval" => "flush_interval", - "try_flush_interval" => "flush_thread_interval", - "queued_chunk_flush_interval" => "flush_burst_interval", - "disable_retry_limit" => "retry_forever", - "retry_limit" => "retry_max_times", - "max_retry_wait" => "retry_max_interval", - "buffer_chunk_limit" => "chunk_bytes_limit", - "buffer_queue_limit" => "queue_length_limit", - "buffer_queue_full_action" => "overflow_action", - "flush_at_shutdown" => "flush_at_shutdown", - "time_slice_wait" => "timekey_wait", - } + PARAMS_MAP = Fluent::PluginHelper::CompatParameters::PARAMS_MAP.merge(Fluent::PluginHelper::CompatParameters::TIME_SLICED_PARAMS) def initialize super @@ -509,6 +470,7 @@ def configure(conf) "retry_type" => "exponential_backoff", } PARAMS_MAP.each do |older, newer| + next unless newer buf_params[newer] = conf[older] if conf.has_key?(older) end unless buf_params.has_key?("@type") diff --git a/lib/fluent/plugin_helper/compat_parameters.rb b/lib/fluent/plugin_helper/compat_parameters.rb new file mode 100644 index 0000000000..1061f290f0 --- /dev/null +++ b/lib/fluent/plugin_helper/compat_parameters.rb @@ -0,0 +1,93 @@ +# +# Fluentd +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +require 'fluent/config/element' + +module Fluent + module PluginHelper + module CompatParameters + # This plugin helper is to bring old-fashioned buffer/other + # configuration parameters to v0.14 plugin API configurations. + # This helper is mainly to convert plugins from v0.12 API + # to v0.14 API safely, without breaking user deployment. + + PARAMS_MAP = { + "buffer_type" => "@type", + "buffer_path" => "path", + "num_threads" => "flush_threads", + "flush_interval" => "flush_interval", + "try_flush_interval" => "flush_thread_interval", + "queued_chunk_flush_interval" => "flush_burst_interval", + "disable_retry_limit" => "retry_forever", + "retry_limit" => "retry_max_times", + "max_retry_wait" => "retry_max_interval", + "buffer_chunk_limit" => "chunk_bytes_limit", + "buffer_queue_limit" => "queue_length_limit", + "buffer_queue_full_action" => "overflow_action", + "flush_at_shutdown" => "flush_at_shutdown", + } + + TIME_SLICED_PARAMS = { + "time_slice_format" => nil, + "time_slice_wait" => "timekey_wait", + } + + def compat_parameters_default_chunk_key + # '', 'time' or 'tag' + raise NotImplementedError, "return one of '', 'time' or 'tag'" + end + + def configure(conf) + if conf.elements('buffer').empty? && PARAMS_MAP.keys.any?{|k| conf.has_key?(k) } || TIME_SLICED_PARAMS.keys.any?{|k| conf.has_key?(k) } + # TODO: warn obsolete parameters if these are deprecated + attr = {} + PARAMS_MAP.each do |compat, current| + next unless current + attr[current] = conf[compat] if conf.has_key?(compat) + end + TIME_SLICED_PARAMS.each do |compat, current| + next unless current + attr[current] = conf[compat] if conf.has_key?(compat) + end + + chunk_key = nil + + if conf.has_key?('time_slice_format') + chunk_key = 'time' + attr['timekey_range'] = case conf['time_slice_format'] + when /\%S/ then 1 + when /\%M/ then 60 + when /\%H/ then 3600 + when /\%d/ then 86400 + else + raise Fluent::ConfigError, "time_slice_format only with %Y or %m is too long" + end + else + chunk_key = compat_parameters_default_chunk_key + if chunk_key == 'time' + attr['timekey_range'] = 86400 # TimeSliceOutput.time_slice_format default value is '%Y%m%d' + end + end + + e = Fluent::Config::Element.new('buffer', chunk_key, attr, []) + conf.elements << e + end + + super + end + end + end +end diff --git a/test/plugin_helper/test_compat_parameters.rb b/test/plugin_helper/test_compat_parameters.rb new file mode 100644 index 0000000000..d058baf0b8 --- /dev/null +++ b/test/plugin_helper/test_compat_parameters.rb @@ -0,0 +1,175 @@ +require_relative '../helper' +require 'fluent/plugin_helper/compat_parameters' +require 'fluent/plugin/base' + +class CompatParameterTest < Test::Unit::TestCase + setup do + Fluent::Test.setup + @i = nil + end + + teardown do + if @i + @i.stop unless @i.stopped? + @i.before_shutdown unless @i.before_shutdown? + @i.shutdown unless @i.shutdown? + @i.after_shutdown unless @i.after_shutdown? + @i.close unless @i.closed? + @i.terminate unless @i.terminated? + end + end + + class Dummy0 < Fluent::Plugin::Output + helpers :compat_parameters + def compat_parameters_default_chunk_key + '' + end + def write(chunk) + # dummy + end + end + class Dummy1 < Fluent::Plugin::Output + helpers :compat_parameters + def compat_parameters_default_chunk_key + 'time' + end + def write(chunk) + # dummy + end + end + class Dummy2 < Fluent::Plugin::Output + helpers :compat_parameters + # for test to assume default key time by 'time_slice_format' + def write(chunk) + # dummy + end + end + class Dummy3 < Fluent::Plugin::Output + helpers :compat_parameters + def compat_parameters_default_chunk_key + 'tag' + end + def write(chunk) + # dummy + end + end + + sub_test_case 'plugins which does not have default chunk key' do + setup do + @p = Dummy0 + end + + test 'plugin helper converts parameters into plugin configuration parameters' do + hash = { + 'num_threads' => 8, + 'flush_interval' => '10s', + 'buffer_chunk_limit' => '8m', + 'buffer_queue_limit' => '1024', + 'flush_at_shutdown' => 'yes', + } + conf = config_element('ROOT', '', hash) + @i = @p.new + @i.configure(conf) + + assert_equal 'memory', @i.buffer_config[:@type] + assert_equal [], @i.buffer_config.chunk_keys + assert_equal 8, @i.buffer_config.flush_threads + assert_equal 10, @i.buffer_config.flush_interval + assert @i.buffer_config.flush_at_shutdown + + assert_equal 8*1024*1024, @i.buffer.chunk_bytes_limit + assert_equal 1024, @i.buffer.queue_length_limit + end + end + + sub_test_case 'plugins which has default chunk key: time' do + setup do + @p = Dummy1 + end + + test 'plugin helper converts parameters into plugin configuration parameters' do + hash = { + 'buffer_type' => 'file', + 'buffer_path' => '/tmp/mybuffer', + 'disable_retry_limit' => 'yes', + 'max_retry_wait' => '1h', + 'buffer_queue_full_action' => 'block', + } + conf = config_element('ROOT', '', hash) + @i = @p.new + @i.configure(conf) + + assert_equal 'file', @i.buffer_config[:@type] + assert_equal 24*60*60, @i.buffer_config.timekey_range + assert @i.buffer_config.retry_forever + assert_equal 60*60, @i.buffer_config.retry_max_interval + assert_equal :block, @i.buffer_config.overflow_action + + assert !@i.chunk_key_tag + assert_equal [], @i.chunk_keys + + assert_equal '/tmp/mybuffer/buffer.*.log', @i.buffer.path + end + end + + sub_test_case 'plugins which does not have default chunk key' do + setup do + @p = Dummy2 + end + + test 'plugin helper converts parameters into plugin configuration parameters' do + hash = { + 'buffer_type' => 'file', + 'buffer_path' => '/tmp/mybuffer', + 'time_slice_format' => '%Y%m%d%H', + 'time_slice_wait' => '10', + 'retry_limit' => '1024', + 'buffer_queue_full_action' => 'drop_oldest_chunk', + } + conf = config_element('ROOT', '', hash) + @i = @p.new + @i.configure(conf) + + assert_equal 'file', @i.buffer_config[:@type] + assert_equal 60*60, @i.buffer_config.timekey_range + assert_equal 10, @i.buffer_config.timekey_wait + assert_equal 1024, @i.buffer_config.retry_max_times + assert_equal :drop_oldest_chunk, @i.buffer_config.overflow_action + + assert @i.chunk_key_time + assert !@i.chunk_key_tag + assert_equal [], @i.chunk_keys + + assert_equal '/tmp/mybuffer/buffer.*.log', @i.buffer.path + end + end + + sub_test_case 'plugins which has default chunk key: tag' do + setup do + @p = Dummy3 + end + + test 'plugin helper converts parameters into plugin configuration parameters' do + hash = { + 'buffer_type' => 'memory', + 'num_threads' => '10', + 'flush_interval' => '10s', + 'try_flush_interval' => '0.1', + 'queued_chunk_flush_interval' => '0.5', + } + conf = config_element('ROOT', '', hash) + @i = @p.new + @i.configure(conf) + + assert_equal 'memory', @i.buffer_config[:@type] + assert_equal 10, @i.buffer_config.flush_threads + assert_equal 10, @i.buffer_config.flush_interval + assert_equal 0.1, @i.buffer_config.flush_thread_interval + assert_equal 0.5, @i.buffer_config.flush_burst_interval + + assert !@i.chunk_key_time + assert @i.chunk_key_tag + assert_equal [], @i.chunk_keys + end + end +end