diff --git a/fluentd.gemspec b/fluentd.gemspec
index 3bae4edc57..d585392892 100644
--- a/fluentd.gemspec
+++ b/fluentd.gemspec
@@ -22,7 +22,8 @@ Gem::Specification.new do |gem|
gem.add_runtime_dependency("msgpack", [">= 0.7.0"])
gem.add_runtime_dependency("json", [">= 1.4.3"])
gem.add_runtime_dependency("yajl-ruby", ["~> 1.0"])
- gem.add_runtime_dependency("cool.io", [">= 1.4.1", "< 2.0.0"])
+ gem.add_runtime_dependency("cool.io", [">= 1.4.3", "< 2.0.0"])
+ gem.add_runtime_dependency("serverengine", [">= 1.6.3"])
gem.add_runtime_dependency("http_parser.rb", [">= 0.5.1", "< 0.7.0"])
gem.add_runtime_dependency("sigdump", ["~> 0.2.2"])
gem.add_runtime_dependency("tzinfo", [">= 1.0.0"])
diff --git a/lib/fluent/command/debug.rb b/lib/fluent/command/debug.rb
index 254d6b59f0..a5bc6f54ea 100644
--- a/lib/fluent/command/debug.rb
+++ b/lib/fluent/command/debug.rb
@@ -66,7 +66,10 @@
include Fluent::SystemConfig::Mixin
-$log = Fluent::Log.new(STDERR, Fluent::Log::LEVEL_TRACE)
+dl_opts = {}
+dl_opts[:log_level] = ServerEngine::DaemonLogger::TRACE
+logger = ServerEngine::DaemonLogger.new(STDERR, dl_opts)
+$log = Fluent::Log.new(logger)
Fluent::Engine.init(system_config)
DRb::DRbObject.class_eval do
diff --git a/lib/fluent/command/fluentd.rb b/lib/fluent/command/fluentd.rb
index 1be139838e..6f497607d2 100644
--- a/lib/fluent/command/fluentd.rb
+++ b/lib/fluent/command/fluentd.rb
@@ -260,4 +260,9 @@
exit 0 if early_exit
require 'fluent/supervisor'
-Fluent::Supervisor.new(opts).start
+if opts[:supervise]
+ Fluent::Supervisor.new(opts).run_supervisor
+else
+ Fluent::Supervisor.new(opts).run_worker
+end
+
diff --git a/lib/fluent/daemon.rb b/lib/fluent/daemon.rb
new file mode 100644
index 0000000000..055f4a2a35
--- /dev/null
+++ b/lib/fluent/daemon.rb
@@ -0,0 +1,15 @@
+#!/usr/bin/env ruby
+# -*- coding: utf-8 -*-
+
+here = File.dirname(__FILE__)
+$LOAD_PATH << File.expand_path(File.join(here, '..'))
+
+require 'serverengine'
+require 'fluent/supervisor'
+
+server_module = Fluent.const_get(ARGV[0])
+worker_module = Fluent.const_get(ARGV[1])
+# it doesn't call ARGV in block because when reloading config, params will be initialized and then it can't use previous config.
+config_path = ARGV[2]
+params = JSON.parse(ARGV[3])
+ServerEngine::Daemon.run_server(server_module, worker_module) { Fluent::Supervisor.load_config(config_path, params) }
diff --git a/lib/fluent/log.rb b/lib/fluent/log.rb
index e29e500c8e..5a22bec904 100644
--- a/lib/fluent/log.rb
+++ b/lib/fluent/log.rb
@@ -53,9 +53,34 @@ def self.str_to_level(log_level_str)
end
end
- def initialize(out=STDERR, level=LEVEL_TRACE, opts={})
- @out = out
- @level = level
+ def initialize(logger, opts={})
+ # overwrites logger.level= so that config reloading resets level of Fluentd::Log
+ orig_logger_level_setter = logger.class.public_instance_method(:level=).bind(logger)
+ me = self
+ # The original ruby logger sets the number as each log level like below.
+ # DEBUG = 0
+ # INFO = 1
+ # WARN = 2
+ # ERROR = 3
+ # FATAL = 4
+ # Serverengine use this original log number. In addition to this, serverengine sets -1 as TRACE level.
+ # TRACE = -1
+ #
+ # On the other hand, in fluentd side, it sets the number like below.
+ # TRACE = 0
+ # DEBUG = 1
+ # INFO = 2
+ # WARN = 3
+ # ERROR = 4
+ # FATAL = 5
+ #
+ # Then fluentd's level is set as serverengine's level + 1.
+ # So if serverengine's logger level is changed, fluentd's log level will be changed to that + 1.
+ logger.define_singleton_method(:level=) {|level| orig_logger_level_setter.call(level); me.level = self.level + 1 }
+
+ @logger = logger
+ @out = logger.instance_variable_get(:@logdev)
+ @level = logger.level + 1
@debug_mode = false
@self_event = false
@tag = 'fluent'
@@ -74,7 +99,10 @@ def initialize(out=STDERR, level=LEVEL_TRACE, opts={})
end
def dup
- clone = self.class.new(@out, @level, suppress_repeated_stacktrace: @suppress_repeated_stacktrace)
+ dl_opts = {}
+ dl_opts[:log_level] = @level - 1
+ logger = ServerEngine::DaemonLogger.new(@out, dl_opts)
+ clone = self.class.new(logger, suppress_repeated_stacktrace: @suppress_repeated_stacktrace)
clone.tag = @tag
clone.time_format = @time_format
# optional headers/attrs are not copied, because new PluginLogger should have another one of it
@@ -87,6 +115,18 @@ def dup
attr_accessor :time_format
attr_accessor :optional_header, :optional_attrs
+ def logdev=(logdev)
+ @out = logdev
+ @logger.instance_variable_set(:@logdev, logdev)
+ nil
+ end
+
+ def reopen!
+ # do noting in @logger.reopen! because it's already reopened in Supervisor.load_config
+ @logger.reopen! if @logger
+ nil
+ end
+
def enable_debug(b=true)
@debug_mode = b
self
@@ -238,7 +278,7 @@ def fatal_backtrace(backtrace=$!.backtrace)
end
def puts(msg)
- @out.puts(msg)
+ @logger << msg + "\n"
@out.flush
msg
rescue
diff --git a/lib/fluent/plugin/in_forward.rb b/lib/fluent/plugin/in_forward.rb
index 2fbd52c271..221d03d5a4 100644
--- a/lib/fluent/plugin/in_forward.rb
+++ b/lib/fluent/plugin/in_forward.rb
@@ -57,11 +57,16 @@ def configure(conf)
def start
@loop = Coolio::Loop.new
- @lsock = listen
+ socket_manager_path = ENV['SERVERENGINE_SOCKETMANAGER_PATH']
+ if Fluent.windows?
+ socket_manager_path = socket_manager_path.to_i
+ end
+ client = ServerEngine::SocketManager::Client.new(socket_manager_path)
+
+ @lsock = listen(client)
@loop.attach(@lsock)
- @usock = SocketUtil.create_udp_socket(@bind)
- @usock.bind(@bind, @port)
+ @usock = client.listen_udp(@bind, @port)
@usock.fcntl(Fcntl::F_SETFL, Fcntl::O_NONBLOCK)
@hbr = HeartbeatRequestHandler.new(@usock, method(:on_heartbeat_request))
@loop.attach(@hbr)
@@ -86,9 +91,10 @@ def shutdown
@lsock.close
end
- def listen
+ def listen(client)
log.info "listening fluent socket on #{@bind}:#{@port}"
- s = Coolio::TCPServer.new(@bind, @port, Handler, @linger_timeout, log, method(:on_message))
+ sock = client.listen_tcp(@bind, @port)
+ s = Coolio::TCPServer.new(sock, nil, Handler, @linger_timeout, log, method(:on_message))
s.listen(@backlog) unless @backlog.nil?
s
end
diff --git a/lib/fluent/plugin/in_http.rb b/lib/fluent/plugin/in_http.rb
index 5f50603587..dc33f11d4f 100644
--- a/lib/fluent/plugin/in_http.rb
+++ b/lib/fluent/plugin/in_http.rb
@@ -101,7 +101,13 @@ def on_timer
def start
log.debug "listening http on #{@bind}:#{@port}"
- lsock = TCPServer.new(@bind, @port)
+
+ socket_manager_path = ENV['SERVERENGINE_SOCKETMANAGER_PATH']
+ if Fluent.windows?
+ socket_manager_path = socket_manager_path.to_i
+ end
+ client = ServerEngine::SocketManager::Client.new(socket_manager_path)
+ lsock = client.listen_tcp(@bind, @port)
detach_multi_process do
super
diff --git a/lib/fluent/plugin/in_syslog.rb b/lib/fluent/plugin/in_syslog.rb
index 206c8fbe4a..24e1b96f59 100644
--- a/lib/fluent/plugin/in_syslog.rb
+++ b/lib/fluent/plugin/in_syslog.rb
@@ -182,13 +182,18 @@ def receive_data(data, addr)
def listen(callback)
log.info "listening syslog socket on #{@bind}:#{@port} with #{@protocol_type}"
+ socket_manager_path = ENV['SERVERENGINE_SOCKETMANAGER_PATH']
+ if Fluent.windows?
+ socket_manager_path = socket_manager_path.to_i
+ end
+ client = ServerEngine::SocketManager::Client.new(socket_manager_path)
if @protocol_type == :udp
- @usock = SocketUtil.create_udp_socket(@bind)
- @usock.bind(@bind, @port)
+ @usock = client.listen_udp(@bind, @port)
SocketUtil::UdpHandler.new(@usock, log, 2048, callback)
else
# syslog family add "\n" to each message and this seems only way to split messages in tcp stream
- Coolio::TCPServer.new(@bind, @port, SocketUtil::TcpHandler, log, "\n", callback)
+ lsock = client.listen_tcp(@bind, @port)
+ Coolio::TCPServer.new(lsock, nil, SocketUtil::TcpHandler, log, "\n", callback)
end
end
diff --git a/lib/fluent/plugin/in_tcp.rb b/lib/fluent/plugin/in_tcp.rb
index cfac648bf9..addfc604c4 100644
--- a/lib/fluent/plugin/in_tcp.rb
+++ b/lib/fluent/plugin/in_tcp.rb
@@ -28,7 +28,14 @@ class TcpInput < SocketUtil::BaseInput
def listen(callback)
log.info "listening tcp socket on #{@bind}:#{@port}"
- Coolio::TCPServer.new(@bind, @port, SocketUtil::TcpHandler, log, @delimiter, callback)
+
+ socket_manager_path = ENV['SERVERENGINE_SOCKETMANAGER_PATH']
+ if Fluent.windows?
+ socket_manager_path = socket_manager_path.to_i
+ end
+ client = ServerEngine::SocketManager::Client.new(socket_manager_path)
+ lsock = client.listen_tcp(@bind, @port)
+ Coolio::TCPServer.new(lsock, nil, SocketUtil::TcpHandler, log, @delimiter, callback)
end
end
end
diff --git a/lib/fluent/plugin/in_udp.rb b/lib/fluent/plugin/in_udp.rb
index 5fe9dd90e8..14f08fc574 100644
--- a/lib/fluent/plugin/in_udp.rb
+++ b/lib/fluent/plugin/in_udp.rb
@@ -25,8 +25,12 @@ class UdpInput < SocketUtil::BaseInput
def listen(callback)
log.info "listening udp socket on #{@bind}:#{@port}"
- @usock = SocketUtil.create_udp_socket(@bind)
- @usock.bind(@bind, @port)
+ socket_manager_path = ENV['SERVERENGINE_SOCKETMANAGER_PATH']
+ if Fluent.windows?
+ socket_manager_path = socket_manager_path.to_i
+ end
+ client = ServerEngine::SocketManager::Client.new(socket_manager_path)
+ @usock = client.listen_udp(@bind, @port)
SocketUtil::UdpHandler.new(@usock, log, @body_size_limit, callback)
end
end
diff --git a/lib/fluent/plugin/socket_util.rb b/lib/fluent/plugin/socket_util.rb
index 83f2e00355..eb61874412 100644
--- a/lib/fluent/plugin/socket_util.rb
+++ b/lib/fluent/plugin/socket_util.rb
@@ -125,7 +125,7 @@ def start
def shutdown
@loop.watchers.each { |w| w.detach }
- @loop.stop
+ @loop.stop if @loop.instance_variable_get("@running")
@handler.close
@thread.join
end
diff --git a/lib/fluent/supervisor.rb b/lib/fluent/supervisor.rb
index 11a7004618..37546c62e3 100644
--- a/lib/fluent/supervisor.rb
+++ b/lib/fluent/supervisor.rb
@@ -24,6 +24,8 @@
require 'fluent/plugin'
require 'fluent/rpc'
require 'fluent/system_config'
+require 'serverengine'
+require 'shellwords'
if Fluent.windows?
require 'windows/library'
@@ -35,21 +37,210 @@
end
module Fluent
- class Supervisor
- def self.get_etc_passwd(user)
- if user.to_i.to_s == user
- Etc.getpwuid(user.to_i)
- else
- Etc.getpwnam(user)
+ module ServerModule
+ def before_run
+ @start_time = Time.now
+
+ if config[:rpc_endpoint]
+ @rpc_endpoint = config[:rpc_endpoint]
+ @enable_get_dump = config[:enable_get_dump]
+ run_rpc_server
end
+ install_supervisor_signal_handlers
+
+ socket_manager_path = ServerEngine::SocketManager::Server.generate_path
+ ServerEngine::SocketManager::Server.open(socket_manager_path)
+ ENV['SERVERENGINE_SOCKETMANAGER_PATH'] = socket_manager_path.to_s
end
- def self.get_etc_group(group)
- if group.to_i.to_s == group
- Etc.getgrgid(group.to_i)
- else
- Etc.getgrnam(group)
+ def after_run
+ if Time.now - @start_time < 1
+ $log.warn "process died within 1 second. exit."
+ end
+
+ stop_rpc_server if @rpc_endpoint
+ end
+
+ def run_rpc_server
+ @rpc_server = RPC::Server.new(@rpc_endpoint, $log)
+
+ # built-in RPC for signals
+ @rpc_server.mount_proc('/api/processes.interruptWorkers') { |req, res|
+ $log.debug "fluentd RPC got /api/processes.interruptWorkers request"
+ Process.kill :INT, $$
+ nil
+ }
+ @rpc_server.mount_proc('/api/processes.killWorkers') { |req, res|
+ $log.debug "fluentd RPC got /api/processes.killWorkers request"
+ Process.kill :TERM, $$
+ nil
+ }
+ @rpc_server.mount_proc('/api/plugins.flushBuffers') { |req, res|
+ $log.debug "fluentd RPC got /api/plugins.flushBuffers request"
+ unless Fluent.windows?
+ Process.kill :USR1, $$
+ end
+ nil
+ }
+ @rpc_server.mount_proc('/api/config.reload') { |req, res|
+ $log.debug "fluentd RPC got /api/config.reload request"
+ if Fluent.windows?
+ # restart worker with auto restarting by killing
+ kill_worker
+ else
+ Process.kill :HUP, $$
+ end
+ nil
+ }
+ @rpc_server.mount_proc('/api/config.dump') { |req, res|
+ $log.debug "fluentd RPC got /api/config.dump request"
+ $log.info "dump in-memory config"
+ supervisor_dump_config_handler
+ nil
+ }
+
+ @rpc_server.mount_proc('/api/config.getDump') { |req, res|
+ $log.debug "fluentd RPC got /api/config.dump request"
+ $log.info "get dump in-memory config via HTTP"
+ res.body = supervisor_get_dump_config_handler
+ [nil, nil, res]
+ } if @enable_get_dump
+
+ @rpc_server.start
+ end
+
+ def stop_rpc_server
+ @rpc_server.shutdown
+ end
+
+ def install_supervisor_signal_handlers
+ trap :USR1 do
+ $log.debug "fluentd supervisor process get SIGUSR1"
+ supervisor_sigusr1_handler
+ end unless Fluent.windows?
+ end
+
+ def supervisor_sigusr1_handler
+ if log = config[:logger_initializer]
+ log.reopen!
+ end
+
+ if pid = config[:worker_pid]
+ Process.kill(:USR1, pid)
+ # don't rescue Erro::ESRSH here (invalid status)
+ end
+ end
+
+ def kill_worker
+ if pid = config[:worker_pid]
+ if Fluent.windows?
+ Process.kill :KILL, pid
+ else
+ Process.kill :INT, pid
+ end
+ end
+ end
+
+ def supervisor_dump_config_handler
+ $log.info config[:fluentd_conf].to_s
+ end
+
+ def supervisor_get_dump_config_handler
+ {conf: config[:fluentd_conf].to_s}
+ end
+ end
+
+ module WorkerModule
+ def spawn(process_manager)
+ main_cmd = config[:main_cmd]
+ @pm = process_manager.spawn(main_cmd)
+ end
+
+ def after_start
+ config[:worker_pid] = @pm.pid
+ end
+ end
+
+ class Supervisor
+ def self.load_config(path, params = {})
+
+ pre_loadtime = 0
+ pre_loadtime = params['pre_loadtime'].to_i if params['pre_loadtime']
+ pre_config_mtime = nil
+ pre_config_mtime = params['pre_config_mtime'] if params['pre_config_mtime']
+ config_mtime = File.mtime(path)
+
+ # reuse previous config if last load time is within 5 seconds and mtime of the config file is not changed
+ if Time.now - Time.at(pre_loadtime) < 5 and config_mtime == pre_config_mtime
+ return params['pre_conf']
end
+
+ config_fname = File.basename(path)
+ config_basedir = File.dirname(path)
+ config_data = File.read(path)
+ inline_config = params['inline_config']
+ if inline_config == '-'
+ config_data << "\n" << STDIN.read
+ elsif inline_config
+ config_data << "\n" << inline_config.gsub("\\n","\n")
+ end
+ fluentd_conf = Fluent::Config.parse(config_data, config_fname, config_basedir, params['use_v1_config'])
+ system_config = SystemConfig.create(fluentd_conf)
+
+ log_level = system_config.log_level || params['log_level']
+ suppress_repeated_stacktrace = system_config.suppress_repeated_stacktrace || params['suppress_repeated_stacktrace']
+ log_path = params['log_path']
+ chuser = params['chuser']
+ chgroup = params['chgroup']
+ rpc_endpoint = system_config.rpc_endpoint
+ enable_get_dump = system_config.enable_get_dump
+
+ log_opts = {suppress_repeated_stacktrace: suppress_repeated_stacktrace}
+ logger_initializer = Supervisor::LoggerInitializer.new(log_path, log_level, chuser, chgroup, log_opts)
+ # this #init sets initialized logger to $log
+ logger_initializer.init
+ logger = $log
+
+ daemonize = params.fetch('daemonize', false)
+ main_cmd = params['main_cmd']
+
+ se_config = {
+ worker_type: 'spawn',
+ workers: 1,
+ log_stdin: false,
+ log_stdout: false,
+ log_stderr: false,
+ enable_heartbeat: true,
+ auto_heartbeat: false,
+ logger: logger,
+ log: logger.out,
+ log_path: log_path,
+ log_level: log_level,
+ logger_initializer: logger_initializer,
+ chuser: chuser,
+ chgroup: chgroup,
+ suppress_repeated_stacktrace: suppress_repeated_stacktrace,
+ daemonize: daemonize,
+ rpc_endpoint: rpc_endpoint,
+ enable_get_dump: enable_get_dump,
+ windows_daemon_cmdline: [ServerEngine.ruby_bin_path,
+ File.join(File.dirname(__FILE__), 'daemon.rb'),
+ ServerModule.name,
+ WorkerModule.name,
+ path,
+ JSON.dump(params)],
+ fluentd_conf: fluentd_conf,
+ main_cmd: main_cmd,
+ }
+ pre_params = params.dup
+ params['pre_loadtime'] = Time.now.to_i
+ params['pre_config_mtime'] = config_mtime
+ params['pre_conf'] = se_config
+ # prevent pre_conf from being too big by reloading many times.
+ pre_params['pre_conf'] = nil
+ params['pre_conf'][:windows_daemon_cmdline][5] = JSON.dump(pre_params)
+
+ return se_config
end
class LoggerInitializer
@@ -65,15 +256,19 @@ def init
if @path && @path != "-"
@io = File.open(@path, "a")
if @chuser || @chgroup
- chuid = @chuser ? Supervisor.get_etc_passwd(@chuser).uid : nil
- chgid = @chgroup ? Supervisor.get_etc_group(@chgroup).gid : nil
+ chuid = @chuser ? ServerEngine::Daemon.get_etc_passwd(@chuser).uid : nil
+ chgid = @chgroup ? ServerEngine::Daemon.get_etc_group(@chgroup).gid : nil
File.chown(chuid, chgid, @path)
end
else
@io = STDOUT
end
- $log = Fluent::Log.new(@io, @level, @opts)
+ dl_opts = {}
+ # subtract 1 to match serverengine daemon logger side logging severity.
+ dl_opts[:log_level] = @level - 1
+ logger = ServerEngine::DaemonLogger.new(@io, dl_opts)
+ $log = Fluent::Log.new(logger, @opts)
$log.enable_color(false) if @path
$log.enable_debug if @level <= Fluent::Log::LEVEL_DEBUG
end
@@ -138,60 +333,20 @@ def initialize(opt)
@without_source = opt[:without_source]
@signame = opt[:signame]
- if Fluent.windows?
- ruby_path = "\0" * 256
- GetModuleFileName.call(0,ruby_path,256)
- ruby_path = ruby_path.rstrip.gsub(/\\/, '/')
- @rubybin_dir = ruby_path[0, ruby_path.rindex("/")]
- @winosvi = windows_version
- end
- log_opts = {suppress_repeated_stacktrace: opt[:suppress_repeated_stacktrace]}
+ @suppress_repeated_stacktrace = opt[:suppress_repeated_stacktrace]
+ log_opts = {suppress_repeated_stacktrace: @suppress_repeated_stacktrace}
@log = LoggerInitializer.new(@log_path, @log_level, @chuser, @chgroup, log_opts)
@finished = false
- @main_pid = nil
end
- def start
+ def run_supervisor
@log.init
show_plugin_config if @show_plugin_config
read_config
set_system_config
dry_run if @dry_run
- start_daemonize if @daemonize
- setup_rpc_server if @rpc_endpoint
- setup_rpc_get_dump if @enable_get_dump
-
- if @supervise
- install_supervisor_signal_handlers
- run_rpc_server if @rpc_endpoint
- until @finished
- supervise do
- change_privilege
- init_engine
- install_main_process_signal_handlers
- run_configure
- finish_daemonize if @daemonize
- run_engine
- exit 0
- end
- $log.error "fluentd main process died unexpectedly. restarting." unless @finished
- end
- else
- $log.info "starting fluentd-#{Fluent::VERSION} without supervision"
- run_rpc_server if @rpc_endpoint
- main_process do
- change_privilege
- init_engine
- install_main_process_signal_handlers
- install_main_process_winsigint_handler if Fluent.windows?
- run_configure
- finish_daemonize if @daemonize
- run_engine
- exit 0
- end
- end
- stop_rpc_server if @rpc_endpoint
+ supervise
end
def options
@@ -203,14 +358,33 @@ def options
}
end
+ def run_worker
+ @log.init
+ Process.setproctitle("worker:#{@process_name}") if @process_name
+
+ show_plugin_config if @show_plugin_config
+ read_config
+ set_system_config
+
+ install_main_process_signal_handlers
+
+ $log.info "starting fluentd-#{Fluent::VERSION} without supervision"
+
+ main_process do
+ change_privilege
+ init_engine
+ run_configure
+ run_engine
+ exit 0
+ end
+ end
+
private
def dry_run
$log.info "starting fluentd-#{Fluent::VERSION} as dry run mode"
-
change_privilege
init_engine
- install_main_process_signal_handlers
run_configure
exit 0
rescue => e
@@ -243,156 +417,73 @@ def show_plugin_config
exit 1
end
- def start_daemonize
- @wait_daemonize_pipe_r, @wait_daemonize_pipe_w = IO.pipe
-
- if fork
- # console process
- @wait_daemonize_pipe_w.close
- @wait_daemonize_pipe_w = nil
- wait_daemonize
- exit 0
- end
-
- # daemonize intermediate process
- @wait_daemonize_pipe_r.close
- @wait_daemonize_pipe_r = nil
-
- # in case the child process forked during run_configure
- @wait_daemonize_pipe_w.fcntl(Fcntl::F_SETFD, Fcntl::FD_CLOEXEC)
-
- Process.setsid
- exit!(0) if fork
- File.umask(0)
-
- # supervisor process
- @supervisor_pid = Process.pid
- end
+ def supervise
+ $log.info "starting fluentd-#{Fluent::VERSION}"
- def wait_daemonize
- supervisor_pid = @wait_daemonize_pipe_r.read
- if supervisor_pid.empty?
- # initialization failed
- exit! 1
+ if Fluent.windows?
+ fluentd_spawn_cmd = ServerEngine.ruby_bin_path + ' "' + $0.gsub('"', '""') + '" '
+ $fluentdargv.each{|a|
+ fluentd_spawn_cmd << ('"' + a.gsub('"', '""') + '" ')
+ }
+ else
+ fluentd_spawn_cmd = $0.shellescape + ' '
+ $fluentdargv.each{|a|
+ fluentd_spawn_cmd << (a.shellescape + " ")
+ }
end
- @wait_daemonize_pipe_r.close
- @wait_daemonize_pipe_r = nil
-
- # write pid file
- File.open(@daemonize, "w") {|f|
- f.write supervisor_pid
+ fluentd_spawn_cmd << ("--no-supervisor")
+ $log.info "spawn command to main: " + fluentd_spawn_cmd
+
+ params = {}
+ params['main_cmd'] = fluentd_spawn_cmd
+ params['daemonize'] = @daemonize
+ params['inline_config'] = @inline_config
+ params['log_path'] = @log_path
+ params['log_level'] = @log_level
+ params['chuser'] = @chuser
+ params['chgroup'] = @chgroup
+ params['use_v1_config'] = @use_v1_config
+ params['suppress_repeated_stacktrace'] = @suppress_repeated_stacktrace
+
+ se = ServerEngine.create(ServerModule, WorkerModule){
+ Fluent::Supervisor.load_config(@config_path, params)
}
+ se.run
end
- def finish_daemonize
- if @wait_daemonize_pipe_w
- STDIN.reopen("/dev/null")
- STDOUT.reopen("/dev/null", "w")
- STDERR.reopen("/dev/null", "w")
- @wait_daemonize_pipe_w.write @supervisor_pid.to_s
- @wait_daemonize_pipe_w.close
- @wait_daemonize_pipe_w = nil
+ def install_main_process_signal_handlers
+ # When user use Ctrl + C not SIGINT, SIGINT is sent to all process in same process group.
+ # Then serverengine can't handle signal, so need to handle it in this process.
+ trap :INT do
+ $log.debug "fluentd main process get SIGINT"
+ unless @finished
+ @finished = true
+ $log.debug "getting start to shutdown main process"
+ Fluent::Engine.stop
+ end
end
- end
-
- def setup_rpc_server
- @rpc_server = RPC::Server.new(@rpc_endpoint, $log)
-
- # built-in RPC for signals
- @rpc_server.mount_proc('/api/processes.interruptWorkers') { |req, res|
- $log.debug "fluentd RPC got /api/processes.interruptWorkers request"
- supervisor_sigint_handler
- nil
- }
- @rpc_server.mount_proc('/api/processes.killWorkers') { |req, res|
- $log.debug "fluentd RPC got /api/processes.killWorkers request"
- supervisor_sigterm_handler
- nil
- }
- @rpc_server.mount_proc('/api/plugins.flushBuffers') { |req, res|
- $log.debug "fluentd RPC got /api/plugins.flushBuffers request"
- supervisor_sigusr1_handler
- nil
- }
- @rpc_server.mount_proc('/api/config.reload') { |req, res|
- $log.debug "fluentd RPC got /api/config.reload request"
- $log.info "restarting"
- supervisor_sighup_handler
- nil
- }
- @rpc_server.mount_proc('/api/config.dump') { |req, res|
- $log.debug "fluentd RPC got /api/config.dump request"
- $log.info "dump in-memory config"
- supervisor_dump_config_handler
- nil
- }
- end
-
- def setup_rpc_get_dump
- @rpc_server.mount_proc('/api/config.getDump') { |req, res|
- $log.debug "fluentd RPC got /api/config.dump request"
- $log.info "get dump in-memory config via HTTP"
- res.body = supervisor_get_dump_config_handler
- [nil, nil, res]
- }
- end
-
- def run_rpc_server
- @rpc_server.start
- end
- def stop_rpc_server
- @rpc_server.shutdown
+ trap :USR1 do
+ flush_buffer
+ end unless Fluent.windows?
end
- def supervise(&block)
- start_time = Time.now
-
- Process.setproctitle("supervisor:#{@process_name}") if @process_name
- $log.info "starting fluentd-#{Fluent::VERSION}"
+ def flush_buffer
+ $log.debug "fluentd main process get SIGUSR1"
+ $log.info "force flushing buffered events"
+ @log.reopen!
- if !Fluent.windows?
- @main_pid = fork do
- main_process(&block)
- end
- else
- if @supervise
- fluentd_spawn_cmd = @rubybin_dir+"/ruby.exe '"+@rubybin_dir+"/fluentd' "
- $fluentdargv.each{|a|
- fluentd_spawn_cmd << (a + " ")
- }
- fluentd_spawn_cmd << ("--no-supervisor")
- $log.info "spawn command to main (windows) : " + fluentd_spawn_cmd
- @main_pid = Process.spawn(fluentd_spawn_cmd)
- else
- main_process(&block)
+ # Creating new thread due to mutex can't lock
+ # in main thread during trap context
+ Thread.new {
+ begin
+ Fluent::Engine.flush!
+ $log.debug "flushing thread: flushed"
+ rescue Exception => e
+ $log.warn "flushing thread error: #{e}"
end
- end
-
- if @daemonize && @wait_daemonize_pipe_w
- STDIN.reopen("/dev/null")
- STDOUT.reopen("/dev/null", "w")
- STDERR.reopen("/dev/null", "w")
- @wait_daemonize_pipe_w.close
- @wait_daemonize_pipe_w = nil
- end
-
- Process.waitpid(@main_pid)
- @main_pid = nil
- ecode = $?.to_i
-
- if Fluent.windows?
- @th_sv.kill if @th_sv.alive?
- @th_sv.join rescue nil
- end
-
- $log.info "process finished", code: ecode
-
- if !@finished && Time.now - start_time < 1
- $log.warn "process died within 1 second. exit."
- exit ecode
- end
+ }.run
end
def main_process(&block)
@@ -400,15 +491,14 @@ def main_process(&block)
begin
block.call
- if Fluent.windows?
- @th_ma.join
- end
-
rescue Fluent::ConfigError
$log.error "config error", file: @config_path, error: $!.to_s
$log.debug_backtrace
unless @log.stdout?
- console = Fluent::Log.new(STDOUT, @log_level).enable_debug
+ logger = ServerEngine::DaemonLogger.new(STDOUT)
+ log = Fluent::Log.new(logger)
+ log.level = @log_level
+ console = log.enable_debug
console.error "config error", file: @config_path, error: $!.to_s
console.debug_backtrace
end
@@ -417,7 +507,10 @@ def main_process(&block)
$log.error "unexpected error", error: $!.to_s
$log.error_backtrace
unless @log.stdout?
- console = Fluent::Log.new(STDOUT, @log_level).enable_debug
+ logger = ServerEngine::DaemonLogger.new(STDOUT)
+ log = Fluent::Log.new(logger)
+ log.level = @log_level
+ console = log.enable_debug
console.error "unexpected error", error: $!.to_s
console.error_backtrace
end
@@ -426,96 +519,6 @@ def main_process(&block)
exit! 1
end
- def install_supervisor_signal_handlers
- install_supervisor_winsigint_handler
-
- trap :INT do
- $log.debug "fluentd supervisor process get SIGINT"
- supervisor_sigint_handler
- end
-
- trap :TERM do
- $log.debug "fluentd supervisor process get SIGTERM"
- supervisor_sigterm_handler
- end
-
- trap :HUP do
- $log.debug "fluentd supervisor process get SIGHUP"
- $log.info "restarting"
- supervisor_sighup_handler
- end unless Fluent.windows?
-
- trap :USR1 do
- $log.debug "fluentd supervisor process get SIGUSR1"
- supervisor_sigusr1_handler
- end unless Fluent.windows?
- end
-
- def supervisor_sigint_handler
- @finished = true
- unless Fluent.windows?
- if pid = @main_pid
- # kill processes only still exists
- unless Process.waitpid(pid, Process::WNOHANG)
- begin
- Process.kill(:INT, pid)
- rescue Errno::ESRCH
- # ignore processes already died
- end
- end
- end
- else
- begin
- @evtend.set
- rescue
- # nothing to do.
- end
- end
- end
-
- def supervisor_sigterm_handler
- @finished = true
- if pid = @main_pid
- # kill processes only still exists
- unless Process.waitpid(pid, Process::WNOHANG)
- begin
- Process.kill(:TERM, pid)
- rescue Errno::ESRCH
- # ignore processes already died
- end
- end
- end
- end
-
- def supervisor_sighup_handler
- # Creating new thread due to mutex can't lock
- # in main thread during trap context
- Thread.new {
- read_config
- set_system_config
- if pid = @main_pid
- Process.kill(:TERM, pid)
- # don't resuce Erro::ESRSH here (invalid status)
- end
- }.run
- end
-
- def supervisor_sigusr1_handler
- @log.reopen!
- if pid = @main_pid
- Process.kill(:USR1, pid)
- # don't resuce Erro::ESRSH here (invalid status)
- end
- end
-
- def supervisor_dump_config_handler
- $log.info @conf.to_s
- end
-
- def supervisor_get_dump_config_handler
- {conf: @conf.to_s}
- end
-
def read_config
$log.info "reading config file", path: @config_path
@config_fname = File.basename(@config_path)
@@ -534,25 +537,8 @@ def set_system_config
@system_config.apply(self)
end
- def run_configure
- Fluent::Engine.run_configure(@conf)
- end
-
def change_privilege
- if @chgroup
- etc_group = Supervisor.get_etc_group(@chgroup)
- Process::GID.change_privilege(etc_group.gid)
- end
-
- if @chuser
- etc_pw = Supervisor.get_etc_passwd(@chuser)
- user_groups = [etc_pw.gid]
- Etc.setgrent
- Etc.group { |gr| user_groups << gr.gid if gr.mem.include?(etc_pw.name) } # emulate 'id -G'
-
- Process.groups = Process.groups | user_groups
- Process::UID.change_privilege(etc_pw.uid)
- end
+ ServerEngine::Daemon.change_privilege(@chuser, @chgroup)
end
def init_engine
@@ -570,102 +556,12 @@ def init_engine
}
end
- def install_main_process_signal_handlers
- # Strictly speaking, these signal handling is not thread safe.
- # But enough safe to limit twice call of Fluent::Engine.stop.
-
- trap :INT do
- $log.debug "fluentd main process get SIGINT"
- unless Fluent.windows?
- unless @finished
- @finished = true
- $log.debug "getting start to shutdown main process"
- Fluent::Engine.stop
- end
- else
- begin
- @evtend.set
- rescue
- # nothing to do.
- end
- end
- end
-
- trap :TERM do
- $log.debug "fluentd main process get SIGTERM"
- unless @finished
- @finished = true
- $log.debug "getting start to shutdown main process"
- Fluent::Engine.stop
- end
- end
-
- trap :HUP do
- # TODO
- $log.debug "fluentd main process get SIGHUP"
- end unless Fluent.windows?
-
- trap :USR1 do
- $log.debug "fluentd main process get SIGUSR1"
- $log.info "force flushing buffered events"
- @log.reopen!
-
- # Creating new thread due to mutex can't lock
- # in main thread during trap context
- Thread.new {
- begin
- Fluent::Engine.flush!
- $log.debug "flushing thread: flushed"
- rescue Exception => e
- $log.warn "flushing thread error: #{e}"
- end
- }.run
- end unless Fluent.windows?
+ def run_configure
+ Fluent::Engine.run_configure(@conf)
end
def run_engine
Fluent::Engine.run
end
-
- def install_supervisor_winsigint_handler
- @winintname = @signame || "fluentdwinsigint_#{Process.pid}"
- @th_sv = Thread.new do
- @evtend = Win32::Event.new(@winintname, true)
- until @evtend.signaled?
- sleep(1)
- end
- @evtend.close
-
- @finished = true
- if pid = @main_pid
- unless Process.waitpid(pid, Process::WNOHANG)
- sigx = (@winosvi >= 6.2) ? (:INT) : (:KILL)
- begin
- Process.kill(sigx, pid)
- rescue Errno::ESRCH
- # ignore processes already died
- end
- end
- end
- end
- end
-
- def install_main_process_winsigint_handler
- @winintname = @signame || "fluentdwinsigint_#{Process.ppid}"
- @th_ma = Thread.new do
- @evtend = Win32::Event.open(@winintname)
- until @evtend.signaled?
- sleep(1)
- end
- @evtend.close
-
- unless @finished
- @finished = true
- $log.debug "getting start to shutdown main process"
- Fluent::Engine.stop
- end
- end
- $log.debug "install_main_process_winsigint_handler***** installed main winsiginthandler"
- end
end
end
diff --git a/lib/fluent/test.rb b/lib/fluent/test.rb
index d34b4442f7..01f6ecbdd5 100644
--- a/lib/fluent/test.rb
+++ b/lib/fluent/test.rb
@@ -22,5 +22,10 @@
require 'fluent/test/filter_test'
require 'fluent/test/parser_test'
require 'fluent/test/formatter_test'
+require 'serverengine'
-$log ||= Fluent::Log.new(Fluent::Test::DummyLogDevice.new)
+dl_opts = {}
+dl_opts[:log_level] = ServerEngine::DaemonLogger::INFO
+logdev = Fluent::Test::DummyLogDevice.new
+logger = ServerEngine::DaemonLogger.new(logdev, dl_opts)
+$log ||= Fluent::Log.new(logger)
\ No newline at end of file
diff --git a/lib/fluent/test/base.rb b/lib/fluent/test/base.rb
index ae3cce27a8..57305538c9 100644
--- a/lib/fluent/test/base.rb
+++ b/lib/fluent/test/base.rb
@@ -17,6 +17,7 @@
require 'fluent/engine'
require 'fluent/system_config'
require 'fluent/config'
+require 'serverengine'
module Fluent
module Test
@@ -119,7 +120,11 @@ def close
class TestLogger < Fluent::PluginLogger
def initialize
@logdev = DummyLogDevice.new
- super(Fluent::Log.new(@logdev))
+ dl_opts = {}
+ dl_opts[:log_level] = ServerEngine::DaemonLogger::INFO
+ logger = ServerEngine::DaemonLogger.new(@logdev, dl_opts)
+ log = Fluent::Log.new(logger)
+ super(log)
end
def reset
diff --git a/test/helper.rb b/test/helper.rb
index 47b0f32aca..c132203a4d 100644
--- a/test/helper.rb
+++ b/test/helper.rb
@@ -47,6 +47,7 @@ def to_masked_element
require 'fluent/plugin_id'
require 'fluent/plugin_helper'
require 'fluent/time'
+require 'serverengine'
module Fluent
module Plugin
@@ -104,4 +105,8 @@ def ipv6_enabled?
end
end
-$log = Fluent::Log.new(Fluent::Test::DummyLogDevice.new, Fluent::Log::LEVEL_WARN)
+dl_opts = {}
+dl_opts[:log_level] = ServerEngine::DaemonLogger::WARN
+logdev = Fluent::Test::DummyLogDevice.new
+logger = ServerEngine::DaemonLogger.new(logdev, dl_opts)
+$log ||= Fluent::Log.new(logger)
diff --git a/test/plugin/test_in_forward.rb b/test/plugin/test_in_forward.rb
index fcf8c9a50d..3a940e856e 100644
--- a/test/plugin/test_in_forward.rb
+++ b/test/plugin/test_in_forward.rb
@@ -7,6 +7,18 @@
require 'fluent/plugin/in_forward'
class ForwardInputTest < Test::Unit::TestCase
+ class << self
+ def startup
+ socket_manager_path = ServerEngine::SocketManager::Server.generate_path
+ @server = ServerEngine::SocketManager::Server.open(socket_manager_path)
+ ENV['SERVERENGINE_SOCKETMANAGER_PATH'] = socket_manager_path.to_s
+ end
+
+ def shutdown
+ @server.close
+ end
+ end
+
def setup
Fluent::Test.setup
@responses = [] # for testing responses after sending data
diff --git a/test/plugin/test_in_http.rb b/test/plugin/test_in_http.rb
index b0ec859643..7e7265893f 100644
--- a/test/plugin/test_in_http.rb
+++ b/test/plugin/test_in_http.rb
@@ -4,6 +4,18 @@
require 'net/http'
class HttpInputTest < Test::Unit::TestCase
+ class << self
+ def startup
+ socket_manager_path = ServerEngine::SocketManager::Server.generate_path
+ @server = ServerEngine::SocketManager::Server.open(socket_manager_path)
+ ENV['SERVERENGINE_SOCKETMANAGER_PATH'] = socket_manager_path.to_s
+ end
+
+ def shutdown
+ @server.close
+ end
+ end
+
def setup
Fluent::Test.setup
end
diff --git a/test/plugin/test_in_syslog.rb b/test/plugin/test_in_syslog.rb
index 643add36f1..95f761622d 100755
--- a/test/plugin/test_in_syslog.rb
+++ b/test/plugin/test_in_syslog.rb
@@ -3,6 +3,18 @@
require 'fluent/plugin/in_syslog'
class SyslogInputTest < Test::Unit::TestCase
+ class << self
+ def startup
+ socket_manager_path = ServerEngine::SocketManager::Server.generate_path
+ @server = ServerEngine::SocketManager::Server.open(socket_manager_path)
+ ENV['SERVERENGINE_SOCKETMANAGER_PATH'] = socket_manager_path.to_s
+ end
+
+ def shutdown
+ @server.close
+ end
+ end
+
def setup
Fluent::Test.setup
require 'fluent/plugin/socket_util'
diff --git a/test/plugin/test_in_tcp.rb b/test/plugin/test_in_tcp.rb
index a5cbfb79e7..c6416a1672 100755
--- a/test/plugin/test_in_tcp.rb
+++ b/test/plugin/test_in_tcp.rb
@@ -3,6 +3,18 @@
require 'fluent/plugin/in_tcp'
class TcpInputTest < Test::Unit::TestCase
+ class << self
+ def startup
+ socket_manager_path = ServerEngine::SocketManager::Server.generate_path
+ @server = ServerEngine::SocketManager::Server.open(socket_manager_path)
+ ENV['SERVERENGINE_SOCKETMANAGER_PATH'] = socket_manager_path.to_s
+ end
+
+ def shutdown
+ @server.close
+ end
+ end
+
def setup
Fluent::Test.setup
end
diff --git a/test/plugin/test_in_udp.rb b/test/plugin/test_in_udp.rb
index 3d8f60db32..01c47c1b8a 100755
--- a/test/plugin/test_in_udp.rb
+++ b/test/plugin/test_in_udp.rb
@@ -3,6 +3,18 @@
require 'fluent/plugin/in_udp'
class UdpInputTest < Test::Unit::TestCase
+ class << self
+ def startup
+ socket_manager_path = ServerEngine::SocketManager::Server.generate_path
+ @server = ServerEngine::SocketManager::Server.open(socket_manager_path)
+ ENV['SERVERENGINE_SOCKETMANAGER_PATH'] = socket_manager_path.to_s
+ end
+
+ def shutdown
+ @server.close
+ end
+ end
+
def setup
Fluent::Test.setup
end
@@ -55,6 +67,7 @@ def test_time_format
tests.each {|test|
u.send(test['msg'], 0)
}
+ u.close
sleep 1
end
@@ -63,6 +76,7 @@ def test_time_format
assert_equal_event_time(tests[i]['expected'], emits[i][1])
}
}
+
end
{
@@ -89,6 +103,7 @@ def test_time_format
tests.each { |test|
u.send(test['msg'], 0)
}
+ u.close
sleep 1
end
diff --git a/test/test_config.rb b/test/test_config.rb
index e245519203..c9a93eeeb4 100644
--- a/test/test_config.rb
+++ b/test/test_config.rb
@@ -157,14 +157,23 @@ def write_config(path, data)
end
def test_inline
- prepare_config
+ prepare_config
opts = {
- config_path: "#{TMP_DIR}/config_test_1.conf",
- inline_config: ""
+ :config_path => "#{TMP_DIR}/config_test_1.conf",
+ :inline_config => ""
}
assert_nothing_raised do
Fluent::Supervisor.new(opts)
- end
+ end
+ create_warn_dummy_logger
+ end
+
+ def create_warn_dummy_logger
+ dl_opts = {}
+ dl_opts[:log_level] = ServerEngine::DaemonLogger::WARN
+ logdev = Fluent::Test::DummyLogDevice.new
+ logger = ServerEngine::DaemonLogger.new(logdev, dl_opts)
+ $log = Fluent::Log.new(logger)
end
end
diff --git a/test/test_log.rb b/test/test_log.rb
index 7218d61057..f368e6464b 100644
--- a/test/test_log.rb
+++ b/test/test_log.rb
@@ -26,7 +26,43 @@ def teardown
)
def test_output(data)
log_level, start = data
- log = Fluent::Log.new(@log_device, log_level)
+ logdev = @log_device
+ logger = ServerEngine::DaemonLogger.new(logdev)
+ log = Fluent::Log.new(logger)
+ log.level = log_level
+ log.trace "trace log"
+ log.debug "debug log"
+ log.info "info log"
+ log.warn "warn log"
+ log.error "error log"
+ log.fatal "fatal log"
+ expected = [
+ "#{@timestamp_str} [trace]: trace log\n",
+ "#{@timestamp_str} [debug]: debug log\n",
+ "#{@timestamp_str} [info]: info log\n",
+ "#{@timestamp_str} [warn]: warn log\n",
+ "#{@timestamp_str} [error]: error log\n",
+ "#{@timestamp_str} [fatal]: fatal log\n"
+ ][start..-1]
+ assert_equal(expected, log.out.logs)
+ end
+
+ data(
+ trace: [ServerEngine::DaemonLogger::TRACE, 0],
+ debug: [ServerEngine::DaemonLogger::DEBUG, 1],
+ info: [ServerEngine::DaemonLogger::INFO, 2],
+ warn: [ServerEngine::DaemonLogger::WARN, 3],
+ error: [ServerEngine::DaemonLogger::ERROR, 4],
+ fatal: [ServerEngine::DaemonLogger::FATAL, 5],
+ )
+ def test_output_with_serverengine_loglevel(data)
+ log_level, start = data
+
+ dl_opts = {}
+ dl_opts[:log_level] = log_level
+ logdev = @log_device
+ logger = ServerEngine::DaemonLogger.new(logdev, dl_opts)
+ log = Fluent::Log.new(logger)
log.trace "trace log"
log.debug "debug log"
log.info "info log"
@@ -54,7 +90,44 @@ def test_output(data)
)
def test_output_with_block(data)
log_level, start = data
- log = Fluent::Log.new(@log_device, log_level)
+
+ logdev = @log_device
+ logger = ServerEngine::DaemonLogger.new(logdev)
+ log = Fluent::Log.new(logger)
+ log.level = log_level
+ log.trace { "trace log" }
+ log.debug { "debug log" }
+ log.info { "info log" }
+ log.warn { "warn log" }
+ log.error { "error log" }
+ log.fatal { "fatal log" }
+ expected = [
+ "#{@timestamp_str} [trace]: trace log\n",
+ "#{@timestamp_str} [debug]: debug log\n",
+ "#{@timestamp_str} [info]: info log\n",
+ "#{@timestamp_str} [warn]: warn log\n",
+ "#{@timestamp_str} [error]: error log\n",
+ "#{@timestamp_str} [fatal]: fatal log\n"
+ ][start..-1]
+ assert_equal(expected, log.out.logs)
+ end
+
+ data(
+ trace: [ServerEngine::DaemonLogger::TRACE, 0],
+ debug: [ServerEngine::DaemonLogger::DEBUG, 1],
+ info: [ServerEngine::DaemonLogger::INFO, 2],
+ warn: [ServerEngine::DaemonLogger::WARN, 3],
+ error: [ServerEngine::DaemonLogger::ERROR, 4],
+ fatal: [ServerEngine::DaemonLogger::FATAL, 5],
+ )
+ def test_output_with_block_with_serverengine_loglevel(data)
+ log_level, start = data
+
+ dl_opts = {}
+ dl_opts[:log_level] = log_level
+ logdev = @log_device
+ logger = ServerEngine::DaemonLogger.new(logdev, dl_opts)
+ log = Fluent::Log.new(logger)
log.trace { "trace log" }
log.debug { "debug log" }
log.info { "info log" }
@@ -82,7 +155,42 @@ def test_output_with_block(data)
)
def test_execute_block(data)
log_level, expected = data
- log = Fluent::Log.new(@log_device, log_level)
+ logdev = @log_device
+ logger = ServerEngine::DaemonLogger.new(logdev)
+ log = Fluent::Log.new(logger)
+ log.level = log_level
+ block_called = {
+ trace: false,
+ debug: false,
+ info: false,
+ warn: false,
+ error: false,
+ fatal: false,
+ }
+ log.trace { block_called[:trace] = true }
+ log.debug { block_called[:debug] = true }
+ log.info { block_called[:info] = true }
+ log.warn { block_called[:warn] = true }
+ log.error { block_called[:error] = true }
+ log.fatal { block_called[:fatal] = true }
+ assert_equal(expected, block_called)
+ end
+
+ data(
+ trace: [ServerEngine::DaemonLogger::TRACE, { trace: true, debug: true, info: true, warn: true, error: true, fatal: true }],
+ debug: [ServerEngine::DaemonLogger::DEBUG, { trace: false, debug: true, info: true, warn: true, error: true, fatal: true }],
+ info: [ServerEngine::DaemonLogger::INFO, { trace: false, debug: false, info: true, warn: true, error: true, fatal: true }],
+ warn: [ServerEngine::DaemonLogger::WARN, { trace: false, debug: false, info: false, warn: true, error: true, fatal: true }],
+ error: [ServerEngine::DaemonLogger::ERROR, { trace: false, debug: false, info: false, warn: false, error: true, fatal: true }],
+ fatal: [ServerEngine::DaemonLogger::FATAL, { trace: false, debug: false, info: false, warn: false, error: false, fatal: true }],
+ )
+ def test_execute_block_with_serverengine_loglevel(data)
+ log_level, expected = data
+ dl_opts = {}
+ dl_opts[:log_level] = log_level
+ logdev = @log_device
+ logger = ServerEngine::DaemonLogger.new(logdev, dl_opts)
+ log = Fluent::Log.new(logger)
block_called = {
trace: false,
debug: false,
@@ -111,7 +219,55 @@ def test_execute_block(data)
def test_backtrace(data)
log_level, start = data
backtrace = ["line 1", "line 2", "line 3"]
- log = Fluent::Log.new(@log_device, log_level)
+ logdev = @log_device
+ logger = ServerEngine::DaemonLogger.new(logdev)
+ log = Fluent::Log.new(logger)
+ log.level = log_level
+ log.trace_backtrace(backtrace)
+ log.debug_backtrace(backtrace)
+ log.info_backtrace(backtrace)
+ log.warn_backtrace(backtrace)
+ log.error_backtrace(backtrace)
+ log.fatal_backtrace(backtrace)
+ expected = [
+ " #{@timestamp_str} [trace]: line 1\n",
+ " #{@timestamp_str} [trace]: line 2\n",
+ " #{@timestamp_str} [trace]: line 3\n",
+ " #{@timestamp_str} [debug]: line 1\n",
+ " #{@timestamp_str} [debug]: line 2\n",
+ " #{@timestamp_str} [debug]: line 3\n",
+ " #{@timestamp_str} [info]: line 1\n",
+ " #{@timestamp_str} [info]: line 2\n",
+ " #{@timestamp_str} [info]: line 3\n",
+ " #{@timestamp_str} [warn]: line 1\n",
+ " #{@timestamp_str} [warn]: line 2\n",
+ " #{@timestamp_str} [warn]: line 3\n",
+ " #{@timestamp_str} [error]: line 1\n",
+ " #{@timestamp_str} [error]: line 2\n",
+ " #{@timestamp_str} [error]: line 3\n",
+ " #{@timestamp_str} [fatal]: line 1\n",
+ " #{@timestamp_str} [fatal]: line 2\n",
+ " #{@timestamp_str} [fatal]: line 3\n"
+ ][start..-1]
+ assert_equal(expected, log.out.logs)
+ end
+
+ data(
+ trace: [ServerEngine::DaemonLogger::TRACE, 0],
+ debug: [ServerEngine::DaemonLogger::DEBUG, 3],
+ info: [ServerEngine::DaemonLogger::INFO, 6],
+ warn: [ServerEngine::DaemonLogger::WARN, 9],
+ error: [ServerEngine::DaemonLogger::ERROR, 12],
+ fatal: [ServerEngine::DaemonLogger::FATAL, 15],
+ )
+ def test_backtrace_with_serverengine_loglevel(data)
+ log_level, start = data
+ backtrace = ["line 1", "line 2", "line 3"]
+ dl_opts = {}
+ dl_opts[:log_level] = log_level
+ logdev = @log_device
+ logger = ServerEngine::DaemonLogger.new(logdev, dl_opts)
+ log = Fluent::Log.new(logger)
log.trace_backtrace(backtrace)
log.debug_backtrace(backtrace)
log.info_backtrace(backtrace)
@@ -145,7 +301,13 @@ def test_backtrace(data)
sub_test_case "suppress repeated backtrace" do
def test_same_log_level
backtrace = ["line 1", "line 2", "line 3"]
- log = Fluent::Log.new(@log_device, Fluent::Log::LEVEL_TRACE, suppress_repeated_stacktrace: true)
+ dl_opts = {}
+ dl_opts[:log_level] = ServerEngine::DaemonLogger::TRACE
+ logdev = @log_device
+ logger = ServerEngine::DaemonLogger.new(logdev, dl_opts)
+ opts = {}
+ opts[:suppress_repeated_stacktrace] = true
+ log = Fluent::Log.new(logger, opts)
log.trace_backtrace(backtrace)
log.trace_backtrace(backtrace)
log.trace_backtrace(backtrace + ["line 4"])
@@ -170,7 +332,13 @@ def test_same_log_level
def test_different_log_level
backtrace = ["line 1", "line 2", "line 3"]
- log = Fluent::Log.new(@log_device, Fluent::Log::LEVEL_TRACE, suppress_repeated_stacktrace: true)
+ dl_opts = {}
+ dl_opts[:log_level] = ServerEngine::DaemonLogger::TRACE
+ logdev = @log_device
+ logger = ServerEngine::DaemonLogger.new(logdev, dl_opts)
+ opts = {}
+ opts[:suppress_repeated_stacktrace] = true
+ log = Fluent::Log.new(logger, opts)
log.trace_backtrace(backtrace)
log.debug_backtrace(backtrace)
log.info_backtrace(backtrace)
@@ -192,7 +360,11 @@ def test_different_log_level
end
def test_dup
- log1 = Fluent::Log.new(@log_device, Fluent::Log::LEVEL_TRACE)
+ dl_opts = {}
+ dl_opts[:log_level] = ServerEngine::DaemonLogger::TRACE
+ logdev = @log_device
+ logger = ServerEngine::DaemonLogger.new(logdev, dl_opts)
+ log1 = Fluent::Log.new(logger)
log2 = log1.dup
log1.level = Fluent::Log::LEVEL_DEBUG
original_tag = log1.tag
@@ -204,13 +376,32 @@ def test_dup
end
def test_disable_events
- log = Fluent::Log.new(@log_device, Fluent::Log::LEVEL_TRACE)
+ dl_opts = {}
+ dl_opts[:log_level] = ServerEngine::DaemonLogger::TRACE
+ logdev = @log_device
+ logger = ServerEngine::DaemonLogger.new(logdev, dl_opts)
+ log = Fluent::Log.new(logger)
engine = log.instance_variable_get("@engine")
mock(engine).push_log_event(anything, anything, anything).once
log.trace "trace log"
log.disable_events(Thread.current)
log.trace "trace log"
end
+
+ def test_level_reload
+ dl_opts = {}
+ dl_opts[:log_level] = ServerEngine::DaemonLogger::TRACE
+ logdev = @log_device
+ logger = ServerEngine::DaemonLogger.new(logdev, dl_opts)
+ log = Fluent::Log.new(logger)
+ assert_equal(ServerEngine::DaemonLogger::TRACE, logger.level)
+ assert_equal(Fluent::Log::LEVEL_TRACE, log.level)
+ # change daemon logger side level
+ logger.level = ServerEngine::DaemonLogger::DEBUG
+ assert_equal(ServerEngine::DaemonLogger::DEBUG, logger.level)
+ # check fluentd log side level is also changed
+ assert_equal(Fluent::Log::LEVEL_DEBUG, log.level)
+ end
end
class PluginLoggerTest < Test::Unit::TestCase
@@ -219,7 +410,11 @@ def setup
@timestamp = Time.parse("2016-04-21 11:58:41 +0900")
@timestamp_str = @timestamp.strftime("%Y-%m-%d %H:%M:%S %z")
stub(Time).now { @timestamp }
- @logger = Fluent::Log.new(@log_device, Fluent::Log::LEVEL_TRACE)
+ dl_opts = {}
+ dl_opts[:log_level] = ServerEngine::DaemonLogger::TRACE
+ logdev = @log_device
+ logger = ServerEngine::DaemonLogger.new(logdev, dl_opts)
+ @logger = Fluent::Log.new(logger)
end
def teardown
diff --git a/test/test_supervisor.rb b/test/test_supervisor.rb
new file mode 100644
index 0000000000..e0f9bdb902
--- /dev/null
+++ b/test/test_supervisor.rb
@@ -0,0 +1,267 @@
+require_relative 'helper'
+require 'fluent/event_router'
+require 'fluent/system_config'
+require 'fluent/supervisor'
+require_relative 'test_plugin_classes'
+
+require 'net/http'
+require 'uri'
+
+class SupervisorTest < ::Test::Unit::TestCase
+ include Fluent
+ include FluentTest
+ include ServerModule
+ include WorkerModule
+
+ TMP_DIR = File.dirname(__FILE__) + "/tmp/config#{ENV['TEST_ENV_NUMBER']}"
+
+ def write_config(path, data)
+ FileUtils.mkdir_p(File.dirname(path))
+ File.open(path, "w") {|f| f.write data }
+ end
+
+ def test_initialize
+ opts = Fluent::Supervisor.default_options
+ sv = Fluent::Supervisor.new(opts)
+ opts.each { |k, v|
+ assert_equal v, sv.instance_variable_get("@#{k}")
+ }
+ end
+
+ def test_read_config
+ create_info_dummy_logger
+
+ tmp_dir = "#{TMP_DIR}/dir/test_read_config.conf"
+ conf_str = %[
+
+
+ @type stdout
+ @id stdout_output
+
+]
+ write_config tmp_dir, conf_str
+ opts = Fluent::Supervisor.default_options
+ sv = Fluent::Supervisor.new(opts)
+
+ use_v1_config = {}
+ use_v1_config['use_v1_config'] = true
+
+ sv.instance_variable_set(:@config_path, tmp_dir)
+ sv.instance_variable_set(:@use_v1_config, use_v1_config)
+ sv.send(:read_config)
+
+ conf = sv.instance_variable_get(:@conf)
+
+ elem = conf.elements.find { |e| e.name == 'source' }
+ assert_equal elem['@type'], "forward"
+ assert_equal elem['@id'], "forward_input"
+
+ elem = conf.elements.find { |e| e.name == 'match' }
+ assert_equal elem.arg, "debug.**"
+ assert_equal elem['@type'], "stdout"
+ assert_equal elem['@id'], "stdout_output"
+
+ $log.out.reset
+ end
+
+ def test_system_config
+ opts = Fluent::Supervisor.default_options
+ sv = Fluent::Supervisor.new(opts)
+ conf_data = <<-EOC
+
+ rpc_endpoint 127.0.0.1:24445
+ suppress_repeated_stacktrace true
+ suppress_config_dump true
+ without_source true
+ enable_get_dump true
+ process_name "process_name"
+ log_level info
+
+ EOC
+ conf = Fluent::Config.parse(conf_data, "(test)", "(test_dir)", true)
+ sv.instance_variable_set(:@conf, conf)
+ sv.send(:set_system_config)
+ sys_conf = sv.instance_variable_get(:@system_config)
+
+ assert_equal sys_conf.rpc_endpoint, '127.0.0.1:24445'
+ assert_equal sys_conf.suppress_repeated_stacktrace, true
+ assert_equal sys_conf.suppress_config_dump, true
+ assert_equal sys_conf.without_source, true
+ assert_equal sys_conf.enable_get_dump, true
+ assert_equal sys_conf.process_name, "process_name"
+ assert_equal sys_conf.log_level, 2
+ end
+
+ def test_main_process_signal_handlers
+ create_info_dummy_logger
+
+ unless Fluent.windows?
+ opts = Fluent::Supervisor.default_options
+ sv = Fluent::Supervisor.new(opts)
+ sv.send(:install_main_process_signal_handlers)
+
+ begin
+ Process.kill :USR1, $$
+ rescue
+ end
+
+ sleep 1
+
+ info_msg = '[info]: force flushing buffered events' + "\n"
+ assert{ $log.out.logs.first.end_with?(info_msg) }
+ end
+
+ $log.out.reset
+ end
+
+ def test_supervisor_signal_handler
+ create_debug_dummy_logger
+
+ unless Fluent.windows?
+
+ install_supervisor_signal_handlers
+ begin
+ Process.kill :USR1, $$
+ rescue
+ end
+
+ sleep 1
+
+ debug_msg = '[debug]: fluentd supervisor process get SIGUSR1' + "\n"
+ assert{ $log.out.logs.first.end_with?(debug_msg) }
+ end
+
+ $log.out.reset
+ end
+
+ def test_rpc_server
+ create_info_dummy_logger
+
+ unless Fluent.windows?
+ opts = Fluent::Supervisor.default_options
+ sv = Fluent::Supervisor.new(opts)
+ conf_data = <<-EOC
+
+ rpc_endpoint 0.0.0.0:24447
+
+ EOC
+ conf = Fluent::Config.parse(conf_data, "(test)", "(test_dir)", true)
+ sv.instance_variable_set(:@conf, conf)
+ sv.send(:set_system_config)
+ sys_conf = sv.instance_variable_get(:@system_config)
+ @rpc_endpoint = sys_conf.rpc_endpoint
+ @enable_get_dump = sys_conf.enable_get_dump
+
+ run_rpc_server
+
+ sv.send(:install_main_process_signal_handlers)
+ Net::HTTP.get URI.parse('http://0.0.0.0:24447/api/plugins.flushBuffers')
+ info_msg = '[info]: force flushing buffered events' + "\n"
+
+ stop_rpc_server
+
+ # In TravisCI with OSX(Xcode), it seems that can't use rpc server.
+ # This test will be passed in such environment.
+ pend unless $log.out.logs.first
+
+ assert{ $log.out.logs.first.end_with?(info_msg) }
+ end
+
+ $log.out.reset
+ end
+
+ def test_load_config
+ tmp_dir = "#{TMP_DIR}/dir/test_load_config.conf"
+ conf_info_str = %[
+
+ log_level info
+
+]
+ conf_debug_str = %[
+
+ log_level debug
+
+]
+ write_config tmp_dir, conf_info_str
+
+ params = {}
+ params['use_v1_config'] = true
+ params['log_path'] = 'log_path'
+ params['suppress_repeated_stacktrace'] = true
+ params['log_level'] = Fluent::Log::LEVEL_INFO
+ load_config_proc = Proc.new { Fluent::Supervisor.load_config(tmp_dir, params) }
+
+ # first call
+ se_config = load_config_proc.call
+ assert_equal se_config[:log_level], Fluent::Log::LEVEL_INFO
+ assert_equal se_config[:suppress_repeated_stacktrace], true
+ assert_equal se_config[:worker_type], 'spawn'
+ assert_equal se_config[:workers], 1
+ assert_equal se_config[:log_stdin], false
+ assert_equal se_config[:log_stdout], false
+ assert_equal se_config[:log_stderr], false
+ assert_equal se_config[:enable_heartbeat], true
+ assert_equal se_config[:auto_heartbeat], false
+
+ # second call immediately(reuse config)
+ se_config = load_config_proc.call
+ pre_config_mtime = se_config[:windows_daemon_cmdline][5]['pre_config_mtime']
+ pre_loadtime = se_config[:windows_daemon_cmdline][5]['pre_loadtime']
+ assert_nil pre_config_mtime
+ assert_nil pre_loadtime
+
+ sleep 5
+
+ # third call after 5 seconds(don't reuse config)
+ se_config = load_config_proc.call
+ pre_config_mtime = se_config[:windows_daemon_cmdline][5]['pre_config_mtime']
+ pre_loadtime = se_config[:windows_daemon_cmdline][5]['pre_loadtime']
+ assert_not_nil pre_config_mtime
+ assert_not_nil pre_loadtime
+
+ # forth call immediately(reuse config)
+ se_config = load_config_proc.call
+ # test that pre_config_mtime and pre_loadtime are not changed from previous one because reused pre_config
+ assert_equal se_config[:windows_daemon_cmdline][5]['pre_config_mtime'], pre_config_mtime
+ assert_equal se_config[:windows_daemon_cmdline][5]['pre_loadtime'], pre_loadtime
+
+ write_config tmp_dir, conf_debug_str
+
+ # fifth call after changed conf file(don't reuse config)
+ se_config = load_config_proc.call
+ assert_equal se_config[:log_level], Fluent::Log::LEVEL_DEBUG
+ end
+
+ def test_logger
+ opts = Fluent::Supervisor.default_options
+ sv = Fluent::Supervisor.new(opts)
+ log = sv.instance_variable_get(:@log)
+ log.init
+ logger = $log.instance_variable_get(:@logger)
+
+ assert_equal $log.level, Fluent::Log::LEVEL_INFO
+
+ # test that DamonLogger#level= overwrites Fluent.log#level
+ logger.level = 'debug'
+ assert_equal $log.level, Fluent::Log::LEVEL_DEBUG
+ end
+
+ def create_debug_dummy_logger
+ dl_opts = {}
+ dl_opts[:log_level] = ServerEngine::DaemonLogger::DEBUG
+ logdev = Fluent::Test::DummyLogDevice.new
+ logger = ServerEngine::DaemonLogger.new(logdev, dl_opts)
+ $log = Fluent::Log.new(logger)
+ end
+
+ def create_info_dummy_logger
+ dl_opts = {}
+ dl_opts[:log_level] = ServerEngine::DaemonLogger::INFO
+ logdev = Fluent::Test::DummyLogDevice.new
+ logger = ServerEngine::DaemonLogger.new(logdev, dl_opts)
+ $log = Fluent::Log.new(logger)
+ end
+end