Skip to content

Commit

Permalink
Merge pull request #880 from naritta/add-serverengine
Browse files Browse the repository at this point in the history
Add ServerEngine
  • Loading branch information
tagomoris committed May 10, 2016
2 parents 0d05606 + 87ef5f3 commit a5511a1
Show file tree
Hide file tree
Showing 23 changed files with 978 additions and 441 deletions.
3 changes: 2 additions & 1 deletion fluentd.gemspec
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,8 @@ Gem::Specification.new do |gem|
gem.add_runtime_dependency("msgpack", [">= 0.7.0"])
gem.add_runtime_dependency("json", [">= 1.4.3"])
gem.add_runtime_dependency("yajl-ruby", ["~> 1.0"])
gem.add_runtime_dependency("cool.io", [">= 1.4.1", "< 2.0.0"])
gem.add_runtime_dependency("cool.io", [">= 1.4.3", "< 2.0.0"])
gem.add_runtime_dependency("serverengine", [">= 1.6.3"])
gem.add_runtime_dependency("http_parser.rb", [">= 0.5.1", "< 0.7.0"])
gem.add_runtime_dependency("sigdump", ["~> 0.2.2"])
gem.add_runtime_dependency("tzinfo", [">= 1.0.0"])
Expand Down
5 changes: 4 additions & 1 deletion lib/fluent/command/debug.rb
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,10 @@

include Fluent::SystemConfig::Mixin

$log = Fluent::Log.new(STDERR, Fluent::Log::LEVEL_TRACE)
dl_opts = {}
dl_opts[:log_level] = ServerEngine::DaemonLogger::TRACE
logger = ServerEngine::DaemonLogger.new(STDERR, dl_opts)
$log = Fluent::Log.new(logger)
Fluent::Engine.init(system_config)

DRb::DRbObject.class_eval do
Expand Down
7 changes: 6 additions & 1 deletion lib/fluent/command/fluentd.rb
Original file line number Diff line number Diff line change
Expand Up @@ -260,4 +260,9 @@
exit 0 if early_exit

require 'fluent/supervisor'
Fluent::Supervisor.new(opts).start
if opts[:supervise]
Fluent::Supervisor.new(opts).run_supervisor
else
Fluent::Supervisor.new(opts).run_worker
end

15 changes: 15 additions & 0 deletions lib/fluent/daemon.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
#!/usr/bin/env ruby
# -*- coding: utf-8 -*-

here = File.dirname(__FILE__)
$LOAD_PATH << File.expand_path(File.join(here, '..'))

require 'serverengine'
require 'fluent/supervisor'

server_module = Fluent.const_get(ARGV[0])
worker_module = Fluent.const_get(ARGV[1])
# it doesn't call ARGV in block because when reloading config, params will be initialized and then it can't use previous config.
config_path = ARGV[2]
params = JSON.parse(ARGV[3])
ServerEngine::Daemon.run_server(server_module, worker_module) { Fluent::Supervisor.load_config(config_path, params) }
50 changes: 45 additions & 5 deletions lib/fluent/log.rb
Original file line number Diff line number Diff line change
Expand Up @@ -53,9 +53,34 @@ def self.str_to_level(log_level_str)
end
end

def initialize(out=STDERR, level=LEVEL_TRACE, opts={})
@out = out
@level = level
def initialize(logger, opts={})
# overwrites logger.level= so that config reloading resets level of Fluentd::Log
orig_logger_level_setter = logger.class.public_instance_method(:level=).bind(logger)
me = self
# The original ruby logger sets the number as each log level like below.
# DEBUG = 0
# INFO = 1
# WARN = 2
# ERROR = 3
# FATAL = 4
# Serverengine use this original log number. In addition to this, serverengine sets -1 as TRACE level.
# TRACE = -1
#
# On the other hand, in fluentd side, it sets the number like below.
# TRACE = 0
# DEBUG = 1
# INFO = 2
# WARN = 3
# ERROR = 4
# FATAL = 5
#
# Then fluentd's level is set as serverengine's level + 1.
# So if serverengine's logger level is changed, fluentd's log level will be changed to that + 1.
logger.define_singleton_method(:level=) {|level| orig_logger_level_setter.call(level); me.level = self.level + 1 }

@logger = logger
@out = logger.instance_variable_get(:@logdev)
@level = logger.level + 1
@debug_mode = false
@self_event = false
@tag = 'fluent'
Expand All @@ -74,7 +99,10 @@ def initialize(out=STDERR, level=LEVEL_TRACE, opts={})
end

def dup
clone = self.class.new(@out, @level, suppress_repeated_stacktrace: @suppress_repeated_stacktrace)
dl_opts = {}
dl_opts[:log_level] = @level - 1
logger = ServerEngine::DaemonLogger.new(@out, dl_opts)
clone = self.class.new(logger, suppress_repeated_stacktrace: @suppress_repeated_stacktrace)
clone.tag = @tag
clone.time_format = @time_format
# optional headers/attrs are not copied, because new PluginLogger should have another one of it
Expand All @@ -87,6 +115,18 @@ def dup
attr_accessor :time_format
attr_accessor :optional_header, :optional_attrs

def logdev=(logdev)
@out = logdev
@logger.instance_variable_set(:@logdev, logdev)
nil
end

def reopen!
# do noting in @logger.reopen! because it's already reopened in Supervisor.load_config
@logger.reopen! if @logger
nil
end

def enable_debug(b=true)
@debug_mode = b
self
Expand Down Expand Up @@ -238,7 +278,7 @@ def fatal_backtrace(backtrace=$!.backtrace)
end

def puts(msg)
@out.puts(msg)
@logger << msg + "\n"
@out.flush
msg
rescue
Expand Down
16 changes: 11 additions & 5 deletions lib/fluent/plugin/in_forward.rb
Original file line number Diff line number Diff line change
Expand Up @@ -57,11 +57,16 @@ def configure(conf)
def start
@loop = Coolio::Loop.new

@lsock = listen
socket_manager_path = ENV['SERVERENGINE_SOCKETMANAGER_PATH']
if Fluent.windows?
socket_manager_path = socket_manager_path.to_i
end
client = ServerEngine::SocketManager::Client.new(socket_manager_path)

@lsock = listen(client)
@loop.attach(@lsock)

@usock = SocketUtil.create_udp_socket(@bind)
@usock.bind(@bind, @port)
@usock = client.listen_udp(@bind, @port)
@usock.fcntl(Fcntl::F_SETFL, Fcntl::O_NONBLOCK)
@hbr = HeartbeatRequestHandler.new(@usock, method(:on_heartbeat_request))
@loop.attach(@hbr)
Expand All @@ -86,9 +91,10 @@ def shutdown
@lsock.close
end

def listen
def listen(client)
log.info "listening fluent socket on #{@bind}:#{@port}"
s = Coolio::TCPServer.new(@bind, @port, Handler, @linger_timeout, log, method(:on_message))
sock = client.listen_tcp(@bind, @port)
s = Coolio::TCPServer.new(sock, nil, Handler, @linger_timeout, log, method(:on_message))
s.listen(@backlog) unless @backlog.nil?
s
end
Expand Down
8 changes: 7 additions & 1 deletion lib/fluent/plugin/in_http.rb
Original file line number Diff line number Diff line change
Expand Up @@ -101,7 +101,13 @@ def on_timer

def start
log.debug "listening http on #{@bind}:#{@port}"
lsock = TCPServer.new(@bind, @port)

socket_manager_path = ENV['SERVERENGINE_SOCKETMANAGER_PATH']
if Fluent.windows?
socket_manager_path = socket_manager_path.to_i
end
client = ServerEngine::SocketManager::Client.new(socket_manager_path)
lsock = client.listen_tcp(@bind, @port)

detach_multi_process do
super
Expand Down
11 changes: 8 additions & 3 deletions lib/fluent/plugin/in_syslog.rb
Original file line number Diff line number Diff line change
Expand Up @@ -182,13 +182,18 @@ def receive_data(data, addr)

def listen(callback)
log.info "listening syslog socket on #{@bind}:#{@port} with #{@protocol_type}"
socket_manager_path = ENV['SERVERENGINE_SOCKETMANAGER_PATH']
if Fluent.windows?
socket_manager_path = socket_manager_path.to_i
end
client = ServerEngine::SocketManager::Client.new(socket_manager_path)
if @protocol_type == :udp
@usock = SocketUtil.create_udp_socket(@bind)
@usock.bind(@bind, @port)
@usock = client.listen_udp(@bind, @port)
SocketUtil::UdpHandler.new(@usock, log, 2048, callback)
else
# syslog family add "\n" to each message and this seems only way to split messages in tcp stream
Coolio::TCPServer.new(@bind, @port, SocketUtil::TcpHandler, log, "\n", callback)
lsock = client.listen_tcp(@bind, @port)
Coolio::TCPServer.new(lsock, nil, SocketUtil::TcpHandler, log, "\n", callback)
end
end

Expand Down
9 changes: 8 additions & 1 deletion lib/fluent/plugin/in_tcp.rb
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,14 @@ class TcpInput < SocketUtil::BaseInput

def listen(callback)
log.info "listening tcp socket on #{@bind}:#{@port}"
Coolio::TCPServer.new(@bind, @port, SocketUtil::TcpHandler, log, @delimiter, callback)

socket_manager_path = ENV['SERVERENGINE_SOCKETMANAGER_PATH']
if Fluent.windows?
socket_manager_path = socket_manager_path.to_i
end
client = ServerEngine::SocketManager::Client.new(socket_manager_path)
lsock = client.listen_tcp(@bind, @port)
Coolio::TCPServer.new(lsock, nil, SocketUtil::TcpHandler, log, @delimiter, callback)
end
end
end
8 changes: 6 additions & 2 deletions lib/fluent/plugin/in_udp.rb
Original file line number Diff line number Diff line change
Expand Up @@ -25,8 +25,12 @@ class UdpInput < SocketUtil::BaseInput

def listen(callback)
log.info "listening udp socket on #{@bind}:#{@port}"
@usock = SocketUtil.create_udp_socket(@bind)
@usock.bind(@bind, @port)
socket_manager_path = ENV['SERVERENGINE_SOCKETMANAGER_PATH']
if Fluent.windows?
socket_manager_path = socket_manager_path.to_i
end
client = ServerEngine::SocketManager::Client.new(socket_manager_path)
@usock = client.listen_udp(@bind, @port)
SocketUtil::UdpHandler.new(@usock, log, @body_size_limit, callback)
end
end
Expand Down
2 changes: 1 addition & 1 deletion lib/fluent/plugin/socket_util.rb
Original file line number Diff line number Diff line change
Expand Up @@ -125,7 +125,7 @@ def start

def shutdown
@loop.watchers.each { |w| w.detach }
@loop.stop
@loop.stop if @loop.instance_variable_get("@running")
@handler.close
@thread.join
end
Expand Down
Loading

0 comments on commit a5511a1

Please sign in to comment.