diff --git a/lib/fluent/output.rb b/lib/fluent/output.rb index e3d577315f..f8fe6c09ca 100644 --- a/lib/fluent/output.rb +++ b/lib/fluent/output.rb @@ -272,7 +272,7 @@ def format_stream(tag, es) #def write(chunk) #end - def enqueue_buffer + def enqueue_buffer(force = false) @buffer.keys.each {|key| @buffer.push(key) } @@ -383,7 +383,7 @@ def try_flush end def force_flush - enqueue_buffer + enqueue_buffer(true) submit_flush end @@ -551,8 +551,14 @@ def emit(tag, es, chain) } end - def enqueue_buffer - @enqueue_buffer_proc.call + def enqueue_buffer(force = false) + if force + @buffer.keys.each {|key| + @buffer.push(key) + } + else + @enqueue_buffer_proc.call + end end #def format(tag, event) diff --git a/test/test_output.rb b/test/test_output.rb index 3cc455ba75..a2370a7c23 100644 --- a/test/test_output.rb +++ b/test/test_output.rb @@ -1,6 +1,8 @@ require_relative 'helper' require 'fluent/test' require 'fluent/output' +require 'timecop' +require 'flexmock' module FluentOutputTest include Fluent @@ -160,4 +162,49 @@ def test_secondary assert_not_nil d.instance.instance_variable_get(:@secondary).router end end + + class TimeSlicedOutputTest < ::Test::Unit::TestCase + include FluentOutputTest + include FlexMock::TestCase + + def setup + Fluent::Test.setup + FileUtils.rm_rf(TMP_DIR) + FileUtils.mkdir_p(TMP_DIR) + end + + TMP_DIR = File.expand_path(File.dirname(__FILE__) + "/../tmp/time_sliced_output") + + CONFIG = %[] + + def create_driver(conf=CONFIG) + Fluent::Test::TimeSlicedOutputTestDriver.new(Fluent::TimeSlicedOutput).configure(conf, true) + end + + sub_test_case "test_force_flush" do + setup do + time = Time.parse("2011-01-02 13:14:15 UTC") + Timecop.freeze(time) + @es = OneEventStream.new(time.to_i, {"message" => "foo"}) + end + + teardown do + Timecop.return + end + + test "force_flush immediately flushes" do + d = create_driver(CONFIG + %[ + time_format %Y%m%d%H%M%S + buffer_path #{TMP_DIR}/foo + ]) + d.instance.start + # buffer should be popped (flushed) immediately + flexmock(d.instance.instance_variable_get(:@buffer)).should_receive(:pop).once + # force_flush + d.instance.emit('test', @es, NullOutputChain.instance) + d.instance.force_flush + 10.times { sleep 0.05 } + end + end + end end