Skip to content

Commit

Permalink
Merge pull request #533 from fluent/fix_force_flush_time_sliced_output
Browse files Browse the repository at this point in the history
Fix TimeSlicedOutput does not flush with SIGUSR1
  • Loading branch information
repeatedly authored and sonots committed Jan 15, 2015
1 parent 0c56f7b commit 49e15d2
Show file tree
Hide file tree
Showing 2 changed files with 57 additions and 4 deletions.
14 changes: 10 additions & 4 deletions lib/fluent/output.rb
Original file line number Diff line number Diff line change
Expand Up @@ -268,7 +268,7 @@ def format_stream(tag, es)
#def write(chunk)
#end

def enqueue_buffer
def enqueue_buffer(force = false)
@buffer.keys.each {|key|
@buffer.push(key)
}
Expand Down Expand Up @@ -379,7 +379,7 @@ def try_flush
end

def force_flush
enqueue_buffer
enqueue_buffer(true)
submit_flush
end

Expand Down Expand Up @@ -547,8 +547,14 @@ def emit(tag, es, chain)
}
end

def enqueue_buffer
@enqueue_buffer_proc.call
def enqueue_buffer(force = false)
if force
@buffer.keys.each {|key|
@buffer.push(key)
}
else
@enqueue_buffer_proc.call
end
end

#def format(tag, event)
Expand Down
47 changes: 47 additions & 0 deletions test/test_output.rb
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
require_relative 'helper'
require 'fluent/test'
require 'fluent/output'
require 'timecop'
require 'flexmock'

module FluentOutputTest
include Fluent
Expand Down Expand Up @@ -159,4 +161,49 @@ def test_router_emit
assert_true d.instance.router.respond_to?(:emit)
end
end

class TimeSlicedOutputTest < ::Test::Unit::TestCase
include FluentOutputTest
include FlexMock::TestCase

def setup
Fluent::Test.setup
FileUtils.rm_rf(TMP_DIR)
FileUtils.mkdir_p(TMP_DIR)
end

TMP_DIR = File.expand_path(File.dirname(__FILE__) + "/../tmp/time_sliced_output")

CONFIG = %[]

def create_driver(conf=CONFIG)
Fluent::Test::TimeSlicedOutputTestDriver.new(Fluent::TimeSlicedOutput).configure(conf, true)
end

sub_test_case "test_force_flush" do
setup do
time = Time.parse("2011-01-02 13:14:15 UTC")
Timecop.freeze(time)
@es = OneEventStream.new(time.to_i, {"message" => "foo"})
end

teardown do
Timecop.return
end

test "force_flush immediately flushes" do
d = create_driver(CONFIG + %[
time_format %Y%m%d%H%M%S
buffer_path #{TMP_DIR}/foo
])
d.instance.start
# buffer should be popped (flushed) immediately
flexmock(d.instance.instance_variable_get(:@buffer)).should_receive(:pop).once
# force_flush
d.instance.emit('test', @es, NullOutputChain.instance)
d.instance.force_flush
10.times { sleep 0.05 }
end
end
end
end

0 comments on commit 49e15d2

Please sign in to comment.