Skip to content

Commit

Permalink
in_monitor_agent emits plugins info to use exisiting plugins. fix #667
Browse files Browse the repository at this point in the history
  • Loading branch information
repeatedly committed Sep 4, 2015
1 parent c529038 commit 3b1494f
Showing 1 changed file with 56 additions and 0 deletions.
56 changes: 56 additions & 0 deletions lib/fluent/plugin/in_monitor_agent.rb
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,8 @@ def initialize

config_param :bind, :string, :default => '0.0.0.0'
config_param :port, :integer, :default => 24220
config_param :tag, :string, :default => nil
config_param :emit_interval, :time, :default => 60

class MonitorServlet < WEBrick::HTTPServlet::AbstractServlet
def initialize(server, agent)
Expand Down Expand Up @@ -199,6 +201,36 @@ def process(req, res)
end
end

class TimerWatcher < Coolio::TimerWatcher
def initialize(interval, log, &callback)
@callback = callback
@log = log

# Avoid long shutdown time
@num_call = 0
if interval >= 10
min_interval = 10
@call_interval = interval / 10
else
min_interval = interval
@call_interval = 0
end

super(min_interval, true)
end

def on_timer
@num_call += 1
if @num_call >= @call_interval
@num_call = 0
@callback.call
end
rescue => e
@log.error e.to_s
@log.error_backtrace
end
end

def start
log.debug "listening monitoring http server on http://#{@bind}:#{@port}/api/plugins"
@srv = WEBrick::HTTPServer.new({
Expand All @@ -214,6 +246,23 @@ def start
@thread = Thread.new {
@srv.start
}
if @tag
log.debug "tag parameter is specified. Emit plugins info to '#{@tag}'"

@loop = Coolio::Loop.new
timer = TimerWatcher.new(@emit_interval, log) {
router.emit(@tag, Engine.now, {'plugins'.freeze => plugins_info_all})
}
@loop.attach(timer)
@thread_for_emit = Thread.new(&method(:run))
end
end

def run
@loop.run
rescue => e
log.error "unexpected error", :error => e.to_s
log.error_backtrace
end

def shutdown
Expand All @@ -225,6 +274,13 @@ def shutdown
@thread.join
@thread = nil
end
if @tag
@loop.watchers.each { |w| w.detach }
@loop.stop
@loop = nil
@thread_for_emit.join
@thread_for_emit = nil
end
end

MONITOR_INFO = {
Expand Down

0 comments on commit 3b1494f

Please sign in to comment.