diff --git a/lib/fluent/plugin/out_forward.rb b/lib/fluent/plugin/out_forward.rb index 569f3e7049..b777320a6f 100644 --- a/lib/fluent/plugin/out_forward.rb +++ b/lib/fluent/plugin/out_forward.rb @@ -43,8 +43,10 @@ def initialize :tcp when 'udp' :udp + when 'none' + :none else - raise ConfigError, "forward output heartbeat type should be 'tcp' or 'udp'" + raise ConfigError, "forward output heartbeat type should be 'tcp', 'udp', or 'none'" end end config_param :heartbeat_interval, :time, :default => 1 @@ -111,7 +113,12 @@ def configure(conf) node_conf = NodeConfig.new(name, host, port, weight, standby, failure, @phi_threshold, recover_sample_size, @expire_dns_cache, @phi_failure_detector) - @nodes << Node.new(log, node_conf) + + if @heartbeat_type == :none + @nodes << NoneHeartbeatNode.new(log, node_conf) + else + @nodes << Node.new(log, node_conf) + end log.info "adding forwarding server '#{name}'", :host=>host, :port=>port, :weight=>weight, :plugin_id=>plugin_id } end @@ -123,32 +130,36 @@ def start rebuild_weight_array @rr = 0 - @loop = Coolio::Loop.new + unless @heartbeat_type == :none + @loop = Coolio::Loop.new - if @heartbeat_type == :udp - # assuming all hosts use udp - @usock = SocketUtil.create_udp_socket(@nodes.first.host) - @usock.fcntl(Fcntl::F_SETFL, Fcntl::O_NONBLOCK) - @hb = HeartbeatHandler.new(@usock, method(:on_heartbeat)) - @loop.attach(@hb) - end + if @heartbeat_type == :udp + # assuming all hosts use udp + @usock = SocketUtil.create_udp_socket(@nodes.first.host) + @usock.fcntl(Fcntl::F_SETFL, Fcntl::O_NONBLOCK) + @hb = HeartbeatHandler.new(@usock, method(:on_heartbeat)) + @loop.attach(@hb) + end - @timer = HeartbeatRequestTimer.new(@heartbeat_interval, method(:on_timer)) - @loop.attach(@timer) + @timer = HeartbeatRequestTimer.new(@heartbeat_interval, method(:on_timer)) + @loop.attach(@timer) - @thread = Thread.new(&method(:run)) + @thread = Thread.new(&method(:run)) + end end def shutdown @finished = true - @loop.watchers.each {|w| w.detach } - @loop.stop - @thread.join + if @loop + @loop.watchers.each {|w| w.detach } + @loop.stop + end + @thread.join if @thread @usock.close if @usock end def run - @loop.run + @loop.run if @loop rescue log.error "unexpected error", :error=>$!.to_s log.error_backtrace @@ -514,6 +525,21 @@ def to_msgpack(out = '') end end + # Override Node to disable heartbeat + class NoneHeartbeatNode < Node + def available? + true + end + + def tick + false + end + + def heartbeat(detect=true) + true + end + end + class FailureDetector PHI_FACTOR = 1.0 / Math.log(10.0) SAMPLE_SIZE = 1000 diff --git a/test/plugin/test_out_forward.rb b/test/plugin/test_out_forward.rb index a657362135..3bce14607d 100644 --- a/test/plugin/test_out_forward.rb +++ b/test/plugin/test_out_forward.rb @@ -59,6 +59,11 @@ def test_configure_tcp_heartbeat assert_equal :tcp, d.instance.heartbeat_type end + def test_configure_none_heartbeat + d = create_driver(CONFIG + "\nheartbeat_type none") + assert_equal :none, d.instance.heartbeat_type + end + def test_phi_failure_detector d = create_driver(CONFIG + %[phi_failure_detector false \n phi_threshold 0]) node = d.instance.nodes.first @@ -327,6 +332,20 @@ def shutdown }.configure(conf).inject_router() end + def test_heartbeat_type_none + d = create_driver(CONFIG + "\nheartbeat_type none") + node = d.instance.nodes.first + assert_equal Fluent::ForwardOutput::NoneHeartbeatNode, node.class + + d.instance.start + assert_nil d.instance.instance_variable_get(:@loop) # no HeartbeatHandler, or HeartbeatRequestTimer + assert_nil d.instance.instance_variable_get(:@thread) # no HeartbeatHandler, or HeartbeatRequestTimer + + stub(node.failure).phi { raise 'Should not be called' } + node.tick + assert_equal node.available, true + end + class DummyEngineDriver < Fluent::Test::TestDriver def initialize(klass, &block) super(klass, &block)