Skip to content

Commit

Permalink
out_forward: heartbeat_type none
Browse files Browse the repository at this point in the history
  • Loading branch information
sonots committed Jul 7, 2015
1 parent e13cc27 commit 782061c
Show file tree
Hide file tree
Showing 2 changed files with 62 additions and 17 deletions.
60 changes: 43 additions & 17 deletions lib/fluent/plugin/out_forward.rb
Original file line number Diff line number Diff line change
Expand Up @@ -43,8 +43,10 @@ def initialize
:tcp
when 'udp'
:udp
when 'none'
:none
else
raise ConfigError, "forward output heartbeat type should be 'tcp' or 'udp'"
raise ConfigError, "forward output heartbeat type should be 'tcp', 'udp', or 'none'"
end
end
config_param :heartbeat_interval, :time, :default => 1
Expand Down Expand Up @@ -111,7 +113,12 @@ def configure(conf)

node_conf = NodeConfig.new(name, host, port, weight, standby, failure,
@phi_threshold, recover_sample_size, @expire_dns_cache, @phi_failure_detector)
@nodes << Node.new(log, node_conf)

if @heartbeat_type == :none
@nodes << NoneHeartbeatNode.new(log, node_conf)
else
@nodes << Node.new(log, node_conf)
end
log.info "adding forwarding server '#{name}'", :host=>host, :port=>port, :weight=>weight, :plugin_id=>plugin_id
}
end
Expand All @@ -123,32 +130,36 @@ def start
rebuild_weight_array
@rr = 0

@loop = Coolio::Loop.new
unless @heartbeat_type == :none
@loop = Coolio::Loop.new

if @heartbeat_type == :udp
# assuming all hosts use udp
@usock = SocketUtil.create_udp_socket(@nodes.first.host)
@usock.fcntl(Fcntl::F_SETFL, Fcntl::O_NONBLOCK)
@hb = HeartbeatHandler.new(@usock, method(:on_heartbeat))
@loop.attach(@hb)
end
if @heartbeat_type == :udp
# assuming all hosts use udp
@usock = SocketUtil.create_udp_socket(@nodes.first.host)
@usock.fcntl(Fcntl::F_SETFL, Fcntl::O_NONBLOCK)
@hb = HeartbeatHandler.new(@usock, method(:on_heartbeat))
@loop.attach(@hb)
end

@timer = HeartbeatRequestTimer.new(@heartbeat_interval, method(:on_timer))
@loop.attach(@timer)
@timer = HeartbeatRequestTimer.new(@heartbeat_interval, method(:on_timer))
@loop.attach(@timer)

@thread = Thread.new(&method(:run))
@thread = Thread.new(&method(:run))
end
end

def shutdown
@finished = true
@loop.watchers.each {|w| w.detach }
@loop.stop
@thread.join
if @loop
@loop.watchers.each {|w| w.detach }
@loop.stop
end
@thread.join if @thread
@usock.close if @usock
end

def run
@loop.run
@loop.run if @loop
rescue
log.error "unexpected error", :error=>$!.to_s
log.error_backtrace
Expand Down Expand Up @@ -514,6 +525,21 @@ def to_msgpack(out = '')
end
end

# Override Node to disable heartbeat
class NoneHeartbeatNode < Node
def available?
true
end

def tick
false
end

def heartbeat(detect=true)
true
end
end

class FailureDetector
PHI_FACTOR = 1.0 / Math.log(10.0)
SAMPLE_SIZE = 1000
Expand Down
19 changes: 19 additions & 0 deletions test/plugin/test_out_forward.rb
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,11 @@ def test_configure_tcp_heartbeat
assert_equal :tcp, d.instance.heartbeat_type
end

def test_configure_none_heartbeat
d = create_driver(CONFIG + "\nheartbeat_type none")
assert_equal :none, d.instance.heartbeat_type
end

def test_phi_failure_detector
d = create_driver(CONFIG + %[phi_failure_detector false \n phi_threshold 0])
node = d.instance.nodes.first
Expand Down Expand Up @@ -327,6 +332,20 @@ def shutdown
}.configure(conf).inject_router()
end

def test_heartbeat_type_none
d = create_driver(CONFIG + "\nheartbeat_type none")
node = d.instance.nodes.first
assert_equal Fluent::ForwardOutput::NoneHeartbeatNode, node.class

d.instance.start
assert_nil d.instance.instance_variable_get(:@loop) # no HeartbeatHandler, or HeartbeatRequestTimer
assert_nil d.instance.instance_variable_get(:@thread) # no HeartbeatHandler, or HeartbeatRequestTimer

stub(node.failure).phi { raise 'Should not be called' }
node.tick
assert_equal node.available, true
end

class DummyEngineDriver < Fluent::Test::TestDriver
def initialize(klass, &block)
super(klass, &block)
Expand Down

0 comments on commit 782061c

Please sign in to comment.