Skip to content

Commit

Permalink
Merge pull request #1207 from fluent/symmetric-time-parse-and-format
Browse files Browse the repository at this point in the history
Symmetric time parse and format
  • Loading branch information
tagomoris authored Sep 6, 2016
2 parents 68d27dd + 79643c8 commit d8a40cd
Show file tree
Hide file tree
Showing 21 changed files with 712 additions and 157 deletions.
4 changes: 2 additions & 2 deletions lib/fluent/plugin/formatter.rb
Original file line number Diff line number Diff line change
Expand Up @@ -16,13 +16,13 @@

require 'fluent/plugin/base'
require 'fluent/plugin/owned_by_mixin'

require 'fluent/mixin' # for TimeFormatter
require 'fluent/time'

module Fluent
module Plugin
class Formatter < Base
include OwnedByMixin
include TimeMixin::Formatter

configured_in :format

Expand Down
5 changes: 1 addition & 4 deletions lib/fluent/plugin/formatter_out_file.rb
Original file line number Diff line number Diff line change
Expand Up @@ -33,9 +33,6 @@ class OutFileFormatter < Formatter
end
end
config_param :time_type, :enum, list: [:float, :unixtime, :string], default: :string
config_param :time_format, :string, default: nil
config_param :localtime, :bool, default: true # if localtime is false and timezone is nil, then utc
config_param :timezone, :string, default: nil

def configure(conf)
# TODO: make a utility method in TimeFormatter to handle these conversion
Expand Down Expand Up @@ -63,7 +60,7 @@ def configure(conf)
when :float then ->(time){ time.to_r.to_f }
when :unixtime then ->(time){ time.to_i }
else
Fluent::TimeFormatter.new(@time_format, @localtime, @timezone)
time_formatter_create
end
end

Expand Down
50 changes: 3 additions & 47 deletions lib/fluent/plugin/parser.rb
Original file line number Diff line number Diff line change
Expand Up @@ -21,12 +21,11 @@
require 'fluent/time'
require 'fluent/plugin/string_util'

require 'strptime'

module Fluent
module Plugin
class Parser < Base
include OwnedByMixin
include TimeMixin::Parser

class ParserError < StandardError; end

Expand All @@ -52,57 +51,14 @@ def call(*a, &b)
parse(*a, &b)
end

class TimeParser
def initialize(time_format)
@cache1_key = nil
@cache1_time = nil
@cache2_key = nil
@cache2_time = nil
@parser =
if time_format
begin
strptime = Strptime.new(time_format)
Proc.new { |value| Fluent::EventTime.from_time(strptime.exec(value)) }
rescue
Proc.new { |value| Fluent::EventTime.from_time(Time.strptime(value, time_format)) }
end
else
Proc.new { |value| Fluent::EventTime.parse(value) }
end
end

# TODO: new cache mechanism using format string
def parse(value)
unless value.is_a?(String)
raise ParserError, "value must be string: #{value}"
end

if @cache1_key == value
return @cache1_time
elsif @cache2_key == value
return @cache2_time
else
begin
time = @parser.call(value)
rescue => e
raise ParserError, "invalid time format: value = #{value}, error_class = #{e.class.name}, error = #{e.message}"
end
@cache1_key = @cache2_key
@cache1_time = @cache2_time
@cache2_key = value
@cache2_time = time
return time
end
end
end
TimeParser = Fluent::TimeParser
end

class ValuesParser < Parser
include Fluent::TypeConverter

config_param :keys, :array, default: []
config_param :time_key, :string, default: nil
config_param :time_format, :string, default: nil
config_param :null_value_pattern, :string, default: nil
config_param :null_empty_string, :bool, default: false

Expand All @@ -117,7 +73,7 @@ def configure(conf)
raise ConfigError, "time_format parameter is ignored because time_key parameter is not set. at #{conf.inspect}"
end

@time_parser = TimeParser.new(@time_format)
@time_parser = time_parser_create

if @null_value_pattern
@null_value_pattern = Regexp.new(@null_value_pattern)
Expand Down
2 changes: 1 addition & 1 deletion lib/fluent/plugin/parser_apache2.rb
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ class Apache2Parser < Parser

def initialize
super
@time_parser = TimeParser.new(TIME_FORMAT)
@time_parser = time_parser_create(format: TIME_FORMAT)
@mutex = Mutex.new
end

Expand Down
5 changes: 2 additions & 3 deletions lib/fluent/plugin/parser_json.rb
Original file line number Diff line number Diff line change
Expand Up @@ -25,14 +25,13 @@ class JSONParser < Parser
Plugin.register_parser('json', self)

config_param :time_key, :string, default: 'time'
config_param :time_format, :string, default: nil
config_param :json_parser, :string, default: 'oj'

def configure(conf)
super

unless @time_format.nil?
@time_parser = TimeParser.new(@time_format)
if @time_format
@time_parser = time_parser_create
@mutex = Mutex.new
end

Expand Down
3 changes: 1 addition & 2 deletions lib/fluent/plugin/parser_regexp.rb
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,6 @@ class RegexpParser < Parser
config_param :ignorecase, :bool, default: false
config_param :multiline, :bool, default: false
config_param :time_key, :string, default: 'time'
config_param :time_format, :string, default: nil

def initialize
super
Expand All @@ -18,7 +17,7 @@ def initialize

def configure(conf)
super
@time_parser = TimeParser.new(@time_format)
@time_parser = time_parser_create
unless @expression.empty?
if @expression[0] == "/" && @expression[-1] == "/"
regexp_option = 0
Expand Down
4 changes: 2 additions & 2 deletions lib/fluent/plugin/parser_syslog.rb
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ class SyslogParser < Parser
# From in_syslog default pattern
REGEXP_WITH_PRI = /^\<(?<pri>[0-9]+)\>(?<time>[^ ]* {1,2}[^ ]* [^ ]*) (?<host>[^ ]*) (?<ident>[a-zA-Z0-9_\/\.\-]*)(?:\[(?<pid>[0-9]+)\])?(?:[^\:]*\:)? *(?<message>.*)$/

config_param :time_format, :string, default: "%b %d %H:%M:%S"
config_set_default :time_format, "%b %d %H:%M:%S"
config_param :with_priority, :bool, default: false

def initialize
Expand All @@ -40,7 +40,7 @@ def configure(conf)
super

@regexp = @with_priority ? REGEXP_WITH_PRI : REGEXP
@time_parser = TimeParser.new(@time_format)
@time_parser = time_parser_create
end

def patterns
Expand Down
1 change: 1 addition & 0 deletions lib/fluent/plugin_helper.rb
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
require 'fluent/plugin_helper/parser'
require 'fluent/plugin_helper/formatter'
require 'fluent/plugin_helper/inject'
require 'fluent/plugin_helper/extract'
require 'fluent/plugin_helper/retry_state'
require 'fluent/plugin_helper/compat_parameters'

Expand Down
90 changes: 90 additions & 0 deletions lib/fluent/plugin_helper/extract.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,90 @@
#
# Fluentd
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#

require 'fluent/event'
require 'fluent/time'
require 'fluent/configurable'

module Fluent
module PluginHelper
module Extract
def extract_tag_from_record(record)
return nil unless @_extract_enabled

if @_extract_tag_key && record.has_key?(@_extract_tag_key)
return record[@_extract_tag_key].to_s
end

nil
end

def extract_time_from_record(record)
return nil unless @_extract_enabled

if @_extract_time_key && record.has_key?(@_extract_time_key)
return @_extract_time_parser.call(record[@_extract_time_key])
end

nil
end

module ExtractParams
include Fluent::Configurable
config_section :extract, required: false, multi: false, param_name: :extract_config do
config_param :tag_key, :string, default: nil
config_param :time_key, :string, default: nil
config_param :time_type, :enum, list: [:float, :unixtime, :string], default: :float

Fluent::TimeMixin::TIME_PARAMETERS.each do |name, type, opts|
config_param name, type, opts
end
end
end

def self.included(mod)
mod.include ExtractParams
end

def initialize
super
@_extract_enabled = false
@_extract_tag_key = nil
@_extract_time_key = nil
@_extract_time_parser = nil
end

def configure(conf)
super

if @extract_config
@_extract_tag_key = @extract_config.tag_key
@_extract_time_key = @extract_config.time_key
if @_extract_time_key
@_extract_time_parser = case @extract_config.time_type
when :float then ->(v){ Fluent::EventTime.new(v.to_i, ((v.to_f - v.to_i) * 1_000_000_000).to_i) }
when :unixtime then ->(v){ Fluent::EventTime.new(v.to_i, 0) }
else
localtime = @extract_config.localtime && !@extract_config.utc
Fluent::TimeParser.new(@extract_config.time_format, localtime, @extract_config.timezone)
end
end

@_extract_enabled = @_extract_tag_key || @_extract_time_key
end
end
end
end
end
20 changes: 8 additions & 12 deletions lib/fluent/plugin_helper/inject.rb
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,9 @@
#

require 'fluent/event'
require 'time'
require 'fluent/time'
require 'fluent/configurable'
require 'socket'

module Fluent
module PluginHelper
Expand Down Expand Up @@ -67,10 +68,10 @@ module InjectParams
config_param :tag_key, :string, default: nil
config_param :time_key, :string, default: nil
config_param :time_type, :enum, list: [:float, :unixtime, :string], default: :float
config_param :time_format, :string, default: nil
config_param :localtime, :bool, default: true # if localtime is false and timezone is nil, then utc
config_param :utc, :bool, default: false # placeholder to turn localtime to false
config_param :timezone, :string, default: nil

Fluent::TimeMixin::TIME_PARAMETERS.each do |name, type, opts|
config_param name, type, opts
end
end
end

Expand All @@ -89,12 +90,6 @@ def initialize
end

def configure(conf)
conf.elements('inject').each do |e|
if e.has_key?('utc') && Fluent::Config.bool_value(e['utc'])
e['localtime'] = 'false'
end
end

super

if @inject_config
Expand All @@ -113,7 +108,8 @@ def configure(conf)
when :float then ->(time){ time.to_r.truncate(+6).to_f } # microsecond floating point value
when :unixtime then ->(time){ time.to_i }
else
Fluent::TimeFormatter.new(@inject_config.time_format, @inject_config.localtime, @inject_config.timezone)
localtime = @inject_config.localtime && !@inject_config.utc
Fluent::TimeFormatter.new(@inject_config.time_format, localtime, @inject_config.timezone)
end
end

Expand Down
7 changes: 0 additions & 7 deletions lib/fluent/test/base.rb
Original file line number Diff line number Diff line change
Expand Up @@ -76,10 +76,3 @@ def run(num_waits = 10, &block)
end
end
end

Test::Unit::Assertions.module_eval do
def assert_equal_event_time(a, b)
assert_equal(a.sec, b.sec)
assert_equal(a.nsec, b.nsec)
end
end
18 changes: 18 additions & 0 deletions lib/fluent/test/helpers.rb
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,17 @@
module Fluent
module Test
module Helpers
# See "Example Custom Assertion: http://test-unit.github.io/test-unit/en/Test/Unit/Assertions.html
def assert_equal_event_time(expected, actual, message = nil)
message = build_message(message, <<EOT, expected, actual)
<?> expected but was
<?>.
EOT
assert_block(message) do
expected.is_a?(Fluent::EventTime) && actual.is_a?(Fluent::EventTime) && expected.sec == actual.sec && expected.nsec == actual.nsec
end
end

def config_element(name = 'test', argument = '', params = {}, elements = [])
Fluent::Config::Element.new(name, argument, params, elements)
end
Expand All @@ -37,6 +48,13 @@ def event_time(str=nil, format: nil)
end
end

def with_timezone(tz)
oldtz, ENV['TZ'] = ENV['TZ'], tz
yield
ensure
ENV['TZ'] = oldtz
end

def time2str(time, localtime: false, format: nil)
if format
if localtime
Expand Down
Loading

0 comments on commit d8a40cd

Please sign in to comment.