Skip to content

Commit

Permalink
Merge pull request #1662 from fluent/support-record-accessor-in-chunk…
Browse files Browse the repository at this point in the history
…-keys

output: Support record_accessor in buffer's chunk keys
  • Loading branch information
repeatedly authored Aug 16, 2017
2 parents 1836ca3 + 60d6a7a commit c7e94e8
Show file tree
Hide file tree
Showing 2 changed files with 50 additions and 6 deletions.
28 changes: 23 additions & 5 deletions lib/fluent/plugin/output.rb
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
#

require 'fluent/plugin/base'
require 'fluent/plugin_helper/record_accessor'
require 'fluent/log'
require 'fluent/plugin_id'
require 'fluent/plugin_helper'
Expand All @@ -36,7 +37,7 @@ class Output < Base
helpers_internal :thread, :retry_state

CHUNK_KEY_PATTERN = /^[-_.@a-zA-Z0-9]+$/
CHUNK_KEY_PLACEHOLDER_PATTERN = /\$\{[-_.@a-zA-Z0-9]+\}/
CHUNK_KEY_PLACEHOLDER_PATTERN = /\$\{[-_.@$a-zA-Z0-9]+\}/
CHUNK_TAG_PLACEHOLDER_PATTERN = /\$\{(tag(?:\[\d+\])?)\}/

CHUNKING_FIELD_WARN_NUM = 4
Expand Down Expand Up @@ -161,7 +162,7 @@ def expired?
attr_reader :num_errors, :emit_count, :emit_records, :write_count, :rollback_count

# for tests
attr_reader :buffer, :retry, :secondary, :chunk_keys, :chunk_key_time, :chunk_key_tag
attr_reader :buffer, :retry, :secondary, :chunk_keys, :chunk_key_accessors, :chunk_key_time, :chunk_key_tag
attr_accessor :output_enqueue_thread_waiting, :dequeued_chunks, :dequeued_chunks_mutex
# output_enqueue_thread_waiting: for test of output.rb itself
attr_accessor :retry_for_error_chunk # if true, error flush will be retried even if under_plugin_development is true
Expand Down Expand Up @@ -203,7 +204,7 @@ def initialize
@output_flush_threads = nil

@simple_chunking = nil
@chunk_keys = @chunk_key_time = @chunk_key_tag = nil
@chunk_keys = @chunk_key_accessors = @chunk_key_time = @chunk_key_tag = nil
@flush_mode = nil
@timekey_zone = nil

Expand Down Expand Up @@ -276,8 +277,25 @@ def configure(conf)
@chunk_keys = @buffer_config.chunk_keys.dup
@chunk_key_time = !!@chunk_keys.delete('time')
@chunk_key_tag = !!@chunk_keys.delete('tag')
if @chunk_keys.any?{ |key| key !~ CHUNK_KEY_PATTERN }
if @chunk_keys.any? { |key|
begin
k = Fluent::PluginHelper::RecordAccessor::Accessor.parse_parameter(key)
if k.is_a?(String)
k !~ CHUNK_KEY_PATTERN
else
if key.start_with?('$[')
raise Fluent::ConfigError, "in chunk_keys: bracket notation is not allowed"
else
false
end
end
rescue => e
raise Fluent::ConfigError, "in chunk_keys: #{e.message}"
end
}
raise Fluent::ConfigError, "chunk_keys specification includes invalid char"
else
@chunk_key_accessors = Hash[@chunk_keys.map { |key| [key.to_sym, Fluent::PluginHelper::RecordAccessor::Accessor.new(key)] }]
end

if @chunk_key_time
Expand Down Expand Up @@ -778,7 +796,7 @@ def metadata(tag, time, record)
else
nil
end
pairs = Hash[@chunk_keys.map{|k| [k.to_sym, record[k]]}]
pairs = Hash[@chunk_key_accessors.map { |k, a| [k, a.call(record)] }]
@buffer.metadata(timekey: timekey, tag: (@chunk_key_tag ? tag : nil), variables: pairs)
end
end
Expand Down
28 changes: 27 additions & 1 deletion test/plugin/test_output.rb
Original file line number Diff line number Diff line change
Expand Up @@ -267,6 +267,18 @@ def waiting(seconds)
assert_equal "/mypath/%Y/%m/%d/%H-%M/${tag}/${tag[1]}/${tag[2]}/value1/value2/tail", @i.extract_placeholders(tmpl, m)
end

test '#extract_placeholders can extract nested variables if variables are configured with dot notation' do
@i.configure(config_element('ROOT', '', {}, [config_element('buffer', 'key,$.nest.key', {})]))
assert !@i.chunk_key_time
assert !@i.chunk_key_tag
assert_equal ['key','$.nest.key'], @i.chunk_keys
tmpl = "/mypath/%Y/%m/%d/%H-%M/${tag}/${tag[1]}/${tag[2]}/${key}/${$.nest.key}/tail"
t = event_time('2016-04-11 20:30:00 +0900')
v = {:key => "value1", :"$.nest.key" => "value2"}
m = create_metadata(timekey: t, tag: 'fluentd.test.output', variables: v)
assert_equal "/mypath/%Y/%m/%d/%H-%M/${tag}/${tag[1]}/${tag[2]}/value1/value2/tail", @i.extract_placeholders(tmpl, m)
end

test '#extract_placeholders can extract all chunk keys if configured' do
@i.configure(config_element('ROOT', '', {}, [config_element('buffer', 'time,tag,key1,key2', {'timekey' => 60*30, 'timekey_zone' => "+0900"})]))
assert @i.chunk_key_time
Expand Down Expand Up @@ -493,11 +505,21 @@ def waiting(seconds)
assert_equal ['.hidden', '0001', '@timestamp', 'a_key', 'my-domain'], @i.get_placeholders_keys("http://${my-domain}/${.hidden}/${0001}/${a_key}?timestamp=${@timestamp}")
end

data('include space' => 'ke y',
'bracket notation' => "$['key']",
'invalid notation' => "$.ke y")
test 'configure checks invalid chunk keys' do |chunk_keys|
i = create_output(:buffered)
assert_raise Fluent::ConfigError do
i.configure(config_element('ROOT' , '', {}, [config_element('buffer', chunk_keys)]))
end
end

test '#metadata returns object which contains tag/timekey/variables from records as specified in configuration' do
tag = 'test.output'
time = event_time('2016-04-12 15:31:23 -0700')
timekey = event_time('2016-04-12 15:00:00 -0700')
record = {"key1" => "value1", "num1" => 1, "message" => "my message"}
record = {"key1" => "value1", "num1" => 1, "message" => "my message", "nest" => {"key" => "nested value"}}

i1 = create_output(:buffered)
i1.configure(config_element('ROOT','',{},[config_element('buffer', '')]))
Expand Down Expand Up @@ -530,6 +552,10 @@ def waiting(seconds)
i8 = create_output(:buffered)
i8.configure(config_element('ROOT','',{},[config_element('buffer', 'time,tag,key1', {"timekey" => 3600, "timekey_zone" => "-0700"})]))
assert_equal create_metadata(timekey: timekey, tag: tag, variables: {key1: "value1"}), i8.metadata(tag, time, record)

i9 = create_output(:buffered)
i9.configure(config_element('ROOT','',{},[config_element('buffer', 'key1,$.nest.key', {})]))
assert_equal create_metadata(variables: {:key1 => "value1", :"$.nest.key" => 'nested value'}), i9.metadata(tag, time, record)
end

test '#emit calls #process via #emit_sync for non-buffered output' do
Expand Down

0 comments on commit c7e94e8

Please sign in to comment.