From e3650a7e2fb18b9bbe6b076cbfea74a199b98b56 Mon Sep 17 00:00:00 2001
From: Sam Pointer
Date: Tue, 28 Mar 2017 16:01:42 +0100
Subject: [PATCH] configurable sub-second precision with no time key

For input plugins that do not provide a time key as part of the record
but do provide a `time` to the router, allow the degree of sub-second
time precision to be configurable.

Some sources (such as AWS CloudWatch) may provide an accurate time
source but do not include a time portion for an otherwise free-form
record: there is nothing to parse. In this case the casting of a
`DateTime` to a `String` loses any sub-second precision.
---
 README.md                              |  9 ++++-
 lib/fluent/plugin/out_elasticsearch.rb |  3 +-
 test/plugin/test_out_elasticsearch.rb  | 49 +++++++++++++++++++++-----
 3 files changed, 50 insertions(+), 11 deletions(-)
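Annotation, not part of the patch: a minimal standalone sketch of the
precision loss described above. The epoch seconds and microseconds are
illustrative; `to_datetime` and `iso8601` come from Ruby's stdlib
`date` library:

    require 'date'

    dt = Time.at(1490713302, 123456).utc.to_datetime
    dt.to_s        # => "2017-03-28T15:01:42+00:00"      cast to String: sub-seconds lost
    dt.iso8601(3)  # => "2017-03-28T15:01:42.123+00:00"  precision 3 keeps milliseconds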
diff --git a/README.md b/README.md
index f36a7ffc..d64775ec 100644
--- a/README.md
+++ b/README.md
@@ -21,6 +21,7 @@ Note: For Amazon Elasticsearch Service please consider using [fluent-plugin-aws-
   + [logstash_prefix](#logstash_prefix)
   + [logstash_dateformat](#logstash_dateformat)
   + [time_key_format](#time_key_format)
+  + [time_precision](#time_precision)
   + [time_key](#time_key)
   + [time_key_exclude_timestamp](#time_key_exclude_timestamp)
   + [utc_index](#utc_index)
@@ -140,6 +141,12 @@ For example to parse ISO8601 times with sub-second precision:
 time_key_format %Y-%m-%dT%H:%M:%S.%N%z
 ```
 
+### time_precision
+
+Should the record not include a `time_key`, this setting defines the degree of sub-second time precision to preserve from the `time` portion of the routed event.
+
+For example, should your input plugin not include a `time_key` in the record but be able to pass a `time` to the router when emitting the event (AWS CloudWatch events are an example of this), then this setting allows you to preserve the sub-second time resolution of those events. This is the case for [fluent-plugin-cloudwatch-ingest](https://github.com/sampointer/fluent-plugin-cloudwatch-ingest).
+
 ### time_key
 
 By default, when inserting records in [Logstash](https://www.elastic.co/products/logstash) format, `@timestamp` is dynamically created with the time at log ingestion. If you'd like to use a custom time, include an `@timestamp` with your record.
@@ -360,7 +367,7 @@
 
 remove_keys a_parent, a_routing # a_parent and a_routing fields won't be sent to elasticsearch
 
 ### remove_keys_on_update
 
-Remove keys on update will not update the configured keys in elasticsearch when a record is being updated.
+Remove keys on update will not update the configured keys in elasticsearch when a record is being updated. This setting only has any effect if the write operation is update or upsert.
 If the write setting is upsert then these keys are only removed if the record is being
diff --git a/lib/fluent/plugin/out_elasticsearch.rb b/lib/fluent/plugin/out_elasticsearch.rb
index 0be0e352..8db0dea8 100644
--- a/lib/fluent/plugin/out_elasticsearch.rb
+++ b/lib/fluent/plugin/out_elasticsearch.rb
@@ -27,6 +27,7 @@ class ConnectionFailure < StandardError; end
   config_param :target_index_key, :string, :default => nil
   config_param :target_type_key, :string, :default => nil
   config_param :time_key_format, :string, :default => nil
+  config_param :time_precision, :integer, :default => 0
   config_param :logstash_format, :bool, :default => false
   config_param :logstash_prefix, :string, :default => "logstash"
   config_param :logstash_dateformat, :string, :default => "%Y.%m.%d"
@@ -299,7 +300,7 @@ def write_objects(tag, chunk)
           record[TIMESTAMP_FIELD] = rts unless @time_key_exclude_timestamp
         else
           dt = Time.at(time).to_datetime
-          record[TIMESTAMP_FIELD] = dt.to_s
+          record[TIMESTAMP_FIELD] = dt.iso8601(@time_precision)
         end
         dt = dt.new_offset(0) if @utc_index
         target_index = "#{@logstash_prefix}-#{dt.strftime(@logstash_dateformat)}"
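Annotation, not part of the patch: with the new `config_param` above, a
sketch of how the setting might be enabled in a fluentd match block. The
match pattern, host, and port are placeholders:

    <match cloudwatch.**>
      @type elasticsearch
      host localhost
      port 9200
      logstash_format true
      time_precision 3   # keep milliseconds in @timestamp
    </match>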
diff --git a/test/plugin/test_out_elasticsearch.rb b/test/plugin/test_out_elasticsearch.rb
index 8dff6415..712a25cc 100644
--- a/test/plugin/test_out_elasticsearch.rb
+++ b/test/plugin/test_out_elasticsearch.rb
@@ -120,8 +120,8 @@ def test_template_create
       to_return(:status => 200, :body => "", :headers => {})
 
     driver('test', config)
-  end
-  
+  end
+
   def test_template_create_invalid_filename
     config = %{
 
@@ -177,14 +177,14 @@ def test_templates_create
       to_return(:status => 200, :body => "", :headers => {})
     stub_request(:put, "https://john:doe@logs.google.com:777/es//_template/logstash3").
       to_return(:status => 200, :body => "", :headers => {})
-    
+
     driver('test', config)
-    
+
     assert_requested( :put, "https://john:doe@logs.google.com:777/es//_template/logstash1", times: 1)
     assert_requested( :put, "https://john:doe@logs.google.com:777/es//_template/logstash2", times: 1)
     assert_not_requested(:put, "https://john:doe@logs.google.com:777/es//_template/logstash3") #exists
   end
-  
+
   def test_templates_not_used
     cwd = File.dirname(__FILE__)
     template_file = File.join(cwd, 'test_template.json')
@@ -199,7 +199,7 @@ def test_templates_not_used
       template_name logstash
       template_file #{template_file}
       templates {"logstash1":"#{template_file}", "logstash2":"#{template_file}" }
-    }  
+    }
     # connection start
     stub_request(:head, "https://john:doe@logs.google.com:777/es//").
       to_return(:status => 200, :body => "", :headers => {})
@@ -254,7 +254,7 @@ def test_templates_can_be_partially_created_if_error_occurs
     assert_raise(RuntimeError) {
       driver('test', config)
     }
-    
+
     assert_requested(:put, "https://john:doe@logs.google.com:777/es//_template/logstash1", times: 1)
     assert_not_requested(:put, "https://john:doe@logs.google.com:777/es//_template/logstash2")
   end
@@ -770,6 +770,37 @@ def test_uses_custom_time_key_format_obscure_format
     assert_equal(index_cmds[1]['@timestamp'], ts)
   end
 
+  def test_uses_no_subsecond_precision_by_default
+    driver.configure("logstash_format true\n")
+    stub_elastic_ping
+    stub_elastic
+    begin
+      time = Fluent::EventTime.new(Time.now.to_i, 000000000)
+    rescue
+      time = Fluent::Engine.now
+    end
+    driver.emit(sample_record, time)
+    driver.run
+    assert(index_cmds[1].has_key? '@timestamp')
+    assert_equal(index_cmds[1]['@timestamp'], Time.at(time).iso8601)
+  end
+
+  def test_uses_subsecond_precision_when_configured
+    driver.configure("logstash_format true
+                      time_precision 3\n")
+    stub_elastic_ping
+    stub_elastic
+    begin
+      time = Fluent::EventTime.new(Time.now.to_i, 000000000)
+    rescue
+      time = Fluent::Engine.now
+    end
+    driver.emit(sample_record, time)
+    driver.run
+    assert(index_cmds[1].has_key? '@timestamp')
+    assert_equal(index_cmds[1]['@timestamp'], Time.at(time).iso8601(3))
+  end
+
   def test_doesnt_add_tag_key_by_default
     stub_elastic_ping
     stub_elastic
@@ -931,7 +962,7 @@ def test_reconnect_on_error_enabled
 
     stub_request(:post, "http://localhost:9200/_bulk").with do |req|
       raise ZeroDivisionError, "any not host_unreachable_exceptions exception"
     end
-    
+
     driver.configure("reconnect_on_error true\n")
     driver.emit(sample_record)
@@ -955,7 +986,7 @@ def test_reconnect_on_error_disabled
 
     stub_request(:post, "http://localhost:9200/_bulk").with do |req|
       raise ZeroDivisionError, "any not host_unreachable_exceptions exception"
     end
-    
+
     driver.configure("reconnect_on_error false\n")
     driver.emit(sample_record)
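Annotation, not part of the patch: the begin/rescue in the new tests
exists because `Fluent::EventTime`, which carries nanosecond resolution,
is only available on fluentd >= 0.14. The nanosecond value below is
illustrative:

    # Build an event time portably across fluentd versions.
    begin
      time = Fluent::EventTime.new(Time.now.to_i, 123_456_789)  # fluentd >= 0.14: seconds + nanoseconds
    rescue
      time = Fluent::Engine.now                                 # older fluentd: integer seconds
    end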