Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add ServerEngine #880

Merged
merged 5 commits into from
May 10, 2016
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion fluentd.gemspec
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,8 @@ Gem::Specification.new do |gem|
gem.add_runtime_dependency("msgpack", [">= 0.7.0"])
gem.add_runtime_dependency("json", [">= 1.4.3"])
gem.add_runtime_dependency("yajl-ruby", ["~> 1.0"])
gem.add_runtime_dependency("cool.io", [">= 1.4.1", "< 2.0.0"])
gem.add_runtime_dependency("cool.io", [">= 1.4.3", "< 2.0.0"])
gem.add_runtime_dependency("serverengine", [">= 1.6.3"])
gem.add_runtime_dependency("http_parser.rb", [">= 0.5.1", "< 0.7.0"])
gem.add_runtime_dependency("sigdump", ["~> 0.2.2"])
gem.add_runtime_dependency("tzinfo", [">= 1.0.0"])
Expand Down
5 changes: 4 additions & 1 deletion lib/fluent/command/debug.rb
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,10 @@

include Fluent::SystemConfig::Mixin

$log = Fluent::Log.new(STDERR, Fluent::Log::LEVEL_TRACE)
dl_opts = {}
dl_opts[:log_level] = ServerEngine::DaemonLogger::TRACE
logger = ServerEngine::DaemonLogger.new(STDERR, dl_opts)
$log = Fluent::Log.new(logger)
Fluent::Engine.init(system_config)

DRb::DRbObject.class_eval do
Expand Down
7 changes: 6 additions & 1 deletion lib/fluent/command/fluentd.rb
Original file line number Diff line number Diff line change
Expand Up @@ -260,4 +260,9 @@
exit 0 if early_exit

require 'fluent/supervisor'
Fluent::Supervisor.new(opts).start
if opts[:supervise]
Fluent::Supervisor.new(opts).run_supervisor
else
Fluent::Supervisor.new(opts).run_worker
end

15 changes: 15 additions & 0 deletions lib/fluent/daemon.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
#!/usr/bin/env ruby
# -*- coding: utf-8 -*-

here = File.dirname(__FILE__)
$LOAD_PATH << File.expand_path(File.join(here, '..'))

require 'serverengine'
require 'fluent/supervisor'

server_module = Fluent.const_get(ARGV[0])
worker_module = Fluent.const_get(ARGV[1])
# it doesn't call ARGV in block because when reloading config, params will be initialized and then it can't use previous config.
config_path = ARGV[2]
params = JSON.parse(ARGV[3])
ServerEngine::Daemon.run_server(server_module, worker_module) { Fluent::Supervisor.load_config(config_path, params) }
50 changes: 45 additions & 5 deletions lib/fluent/log.rb
Original file line number Diff line number Diff line change
Expand Up @@ -53,9 +53,34 @@ def self.str_to_level(log_level_str)
end
end

def initialize(out=STDERR, level=LEVEL_TRACE, opts={})
@out = out
@level = level
def initialize(logger, opts={})
# overwrites logger.level= so that config reloading resets level of Fluentd::Log
orig_logger_level_setter = logger.class.public_instance_method(:level=).bind(logger)
me = self
# The original ruby logger sets the number as each log level like below.
# DEBUG = 0
# INFO = 1
# WARN = 2
# ERROR = 3
# FATAL = 4
# Serverengine use this original log number. In addition to this, serverengine sets -1 as TRACE level.
# TRACE = -1
#
# On the other hand, in fluentd side, it sets the number like below.
# TRACE = 0
# DEBUG = 1
# INFO = 2
# WARN = 3
# ERROR = 4
# FATAL = 5
#
# Then fluentd's level is set as serverengine's level + 1.
# So if serverengine's logger level is changed, fluentd's log level will be changed to that + 1.
logger.define_singleton_method(:level=) {|level| orig_logger_level_setter.call(level); me.level = self.level + 1 }

@logger = logger
@out = logger.instance_variable_get(:@logdev)
@level = logger.level + 1
@debug_mode = false
@self_event = false
@tag = 'fluent'
Expand All @@ -74,7 +99,10 @@ def initialize(out=STDERR, level=LEVEL_TRACE, opts={})
end

def dup
clone = self.class.new(@out, @level, suppress_repeated_stacktrace: @suppress_repeated_stacktrace)
dl_opts = {}
dl_opts[:log_level] = @level - 1
logger = ServerEngine::DaemonLogger.new(@out, dl_opts)
clone = self.class.new(logger, suppress_repeated_stacktrace: @suppress_repeated_stacktrace)
clone.tag = @tag
clone.time_format = @time_format
# optional headers/attrs are not copied, because new PluginLogger should have another one of it
Expand All @@ -87,6 +115,18 @@ def dup
attr_accessor :time_format
attr_accessor :optional_header, :optional_attrs

def logdev=(logdev)
@out = logdev
@logger.instance_variable_set(:@logdev, logdev)
nil
end

def reopen!
# do noting in @logger.reopen! because it's already reopened in Supervisor.load_config
@logger.reopen! if @logger
nil
end

def enable_debug(b=true)
@debug_mode = b
self
Expand Down Expand Up @@ -238,7 +278,7 @@ def fatal_backtrace(backtrace=$!.backtrace)
end

def puts(msg)
@out.puts(msg)
@logger << msg + "\n"
@out.flush
msg
rescue
Expand Down
16 changes: 11 additions & 5 deletions lib/fluent/plugin/in_forward.rb
Original file line number Diff line number Diff line change
Expand Up @@ -57,11 +57,16 @@ def configure(conf)
def start
@loop = Coolio::Loop.new

@lsock = listen
socket_manager_path = ENV['SERVERENGINE_SOCKETMANAGER_PATH']
if Fluent.windows?
socket_manager_path = socket_manager_path.to_i
end
client = ServerEngine::SocketManager::Client.new(socket_manager_path)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It's better to have a utility method for these several lines (it looks to appear very often), but I have a plan of a new plugin helper for tcp/udp/tls sockets. So it's my task following this pull-request.


@lsock = listen(client)
@loop.attach(@lsock)

@usock = SocketUtil.create_udp_socket(@bind)
@usock.bind(@bind, @port)
@usock = client.listen_udp(@bind, @port)
@usock.fcntl(Fcntl::F_SETFL, Fcntl::O_NONBLOCK)
@hbr = HeartbeatRequestHandler.new(@usock, method(:on_heartbeat_request))
@loop.attach(@hbr)
Expand All @@ -86,9 +91,10 @@ def shutdown
@lsock.close
end

def listen
def listen(client)
log.info "listening fluent socket on #{@bind}:#{@port}"
s = Coolio::TCPServer.new(@bind, @port, Handler, @linger_timeout, log, method(:on_message))
sock = client.listen_tcp(@bind, @port)
s = Coolio::TCPServer.new(sock, nil, Handler, @linger_timeout, log, method(:on_message))
s.listen(@backlog) unless @backlog.nil?
s
end
Expand Down
8 changes: 7 additions & 1 deletion lib/fluent/plugin/in_http.rb
Original file line number Diff line number Diff line change
Expand Up @@ -101,7 +101,13 @@ def on_timer

def start
log.debug "listening http on #{@bind}:#{@port}"
lsock = TCPServer.new(@bind, @port)

socket_manager_path = ENV['SERVERENGINE_SOCKETMANAGER_PATH']
if Fluent.windows?
socket_manager_path = socket_manager_path.to_i
end
client = ServerEngine::SocketManager::Client.new(socket_manager_path)
lsock = client.listen_tcp(@bind, @port)

detach_multi_process do
super
Expand Down
11 changes: 8 additions & 3 deletions lib/fluent/plugin/in_syslog.rb
Original file line number Diff line number Diff line change
Expand Up @@ -182,13 +182,18 @@ def receive_data(data, addr)

def listen(callback)
log.info "listening syslog socket on #{@bind}:#{@port} with #{@protocol_type}"
socket_manager_path = ENV['SERVERENGINE_SOCKETMANAGER_PATH']
if Fluent.windows?
socket_manager_path = socket_manager_path.to_i
end
client = ServerEngine::SocketManager::Client.new(socket_manager_path)
if @protocol_type == :udp
@usock = SocketUtil.create_udp_socket(@bind)
@usock.bind(@bind, @port)
@usock = client.listen_udp(@bind, @port)
SocketUtil::UdpHandler.new(@usock, log, 2048, callback)
else
# syslog family add "\n" to each message and this seems only way to split messages in tcp stream
Coolio::TCPServer.new(@bind, @port, SocketUtil::TcpHandler, log, "\n", callback)
lsock = client.listen_tcp(@bind, @port)
Coolio::TCPServer.new(lsock, nil, SocketUtil::TcpHandler, log, "\n", callback)
end
end

Expand Down
9 changes: 8 additions & 1 deletion lib/fluent/plugin/in_tcp.rb
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,14 @@ class TcpInput < SocketUtil::BaseInput

def listen(callback)
log.info "listening tcp socket on #{@bind}:#{@port}"
Coolio::TCPServer.new(@bind, @port, SocketUtil::TcpHandler, log, @delimiter, callback)

socket_manager_path = ENV['SERVERENGINE_SOCKETMANAGER_PATH']
if Fluent.windows?
socket_manager_path = socket_manager_path.to_i
end
client = ServerEngine::SocketManager::Client.new(socket_manager_path)
lsock = client.listen_tcp(@bind, @port)
Coolio::TCPServer.new(lsock, nil, SocketUtil::TcpHandler, log, @delimiter, callback)
end
end
end
8 changes: 6 additions & 2 deletions lib/fluent/plugin/in_udp.rb
Original file line number Diff line number Diff line change
Expand Up @@ -25,8 +25,12 @@ class UdpInput < SocketUtil::BaseInput

def listen(callback)
log.info "listening udp socket on #{@bind}:#{@port}"
@usock = SocketUtil.create_udp_socket(@bind)
@usock.bind(@bind, @port)
socket_manager_path = ENV['SERVERENGINE_SOCKETMANAGER_PATH']
if Fluent.windows?
socket_manager_path = socket_manager_path.to_i
end
client = ServerEngine::SocketManager::Client.new(socket_manager_path)
@usock = client.listen_udp(@bind, @port)
SocketUtil::UdpHandler.new(@usock, log, @body_size_limit, callback)
end
end
Expand Down
2 changes: 1 addition & 1 deletion lib/fluent/plugin/socket_util.rb
Original file line number Diff line number Diff line change
Expand Up @@ -125,7 +125,7 @@ def start

def shutdown
@loop.watchers.each { |w| w.detach }
@loop.stop
@loop.stop if @loop.instance_variable_get("@running")
@handler.close
@thread.join
end
Expand Down
Loading