Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

in_monitor_agent emits plugins info to use exisiting plugins. fix #667 #670

Merged
merged 1 commit into from
Oct 27, 2015
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
64 changes: 63 additions & 1 deletion lib/fluent/plugin/in_monitor_agent.rb
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,8 @@ def initialize

config_param :bind, :string, :default => '0.0.0.0'
config_param :port, :integer, :default => 24220
config_param :tag, :string, :default => nil
config_param :emit_interval, :time, :default => 60

class MonitorServlet < WEBrick::HTTPServlet::AbstractServlet
def initialize(server, agent)
Expand Down Expand Up @@ -199,6 +201,36 @@ def process(req, res)
end
end

class TimerWatcher < Coolio::TimerWatcher
def initialize(interval, log, &callback)
@callback = callback
@log = log

# Avoid long shutdown time
@num_call = 0
if interval >= 10
min_interval = 10
@call_interval = interval / 10
else
min_interval = interval
@call_interval = 0
end

super(min_interval, true)
end

def on_timer
@num_call += 1
if @num_call >= @call_interval
@num_call = 0
@callback.call
end
rescue => e
@log.error e.to_s
@log.error_backtrace
end
end

def start
log.debug "listening monitoring http server on http://#{@bind}:#{@port}/api/plugins"
@srv = WEBrick::HTTPServer.new({
Expand All @@ -214,6 +246,29 @@ def start
@thread = Thread.new {
@srv.start
}
if @tag
log.debug "tag parameter is specified. Emit plugins info to '#{@tag}'"

@loop = Coolio::Loop.new
opts = {:with_config => false}
timer = TimerWatcher.new(@emit_interval, log) {
es = MultiEventStream.new
now = Engine.now
plugins_info_all(opts).each { |record|
es.add(now, record)
}
router.emit_stream(@tag, es)
}
@loop.attach(timer)
@thread_for_emit = Thread.new(&method(:run))
end
end

def run
@loop.run
rescue => e
log.error "unexpected error", :error => e.to_s
log.error_backtrace
end

def shutdown
Expand All @@ -225,6 +280,13 @@ def shutdown
@thread.join
@thread = nil
end
if @tag
@loop.watchers.each { |w| w.detach }
@loop.stop
@loop = nil
@thread_for_emit.join
@thread_for_emit = nil
end
end

MONITOR_INFO = {
Expand Down Expand Up @@ -320,7 +382,7 @@ def get_monitor_info(pe, opts={})
obj['plugin_id'] = pe.plugin_id
obj['plugin_category'] = plugin_category(pe)
obj['type'] = pe.config['@type'] || pe.config['type']
obj['config'] = pe.config
obj['config'] = pe.config if !opts.has_key?(:with_config) || opts[:with_config]

# run MONITOR_INFO in plugins' instance context and store the info to obj
MONITOR_INFO.each_pair {|key,code|
Expand Down