Skip to content

Commit

Permalink
Add ServerEngine
Browse files Browse the repository at this point in the history
  • Loading branch information
naritta committed May 10, 2016
1 parent 4f5c48d commit c1170cc
Show file tree
Hide file tree
Showing 6 changed files with 369 additions and 412 deletions.
3 changes: 2 additions & 1 deletion fluentd.gemspec
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,8 @@ Gem::Specification.new do |gem|
gem.add_runtime_dependency("msgpack", [">= 0.7.0"])
gem.add_runtime_dependency("json", [">= 1.4.3"])
gem.add_runtime_dependency("yajl-ruby", ["~> 1.0"])
gem.add_runtime_dependency("cool.io", [">= 1.4.1", "< 2.0.0"])
gem.add_runtime_dependency("cool.io", [">= 1.4.3", "< 2.0.0"])
gem.add_runtime_dependency("serverengine", [">= 1.6.3"])
gem.add_runtime_dependency("http_parser.rb", [">= 0.5.1", "< 0.7.0"])
gem.add_runtime_dependency("sigdump", ["~> 0.2.2"])
gem.add_runtime_dependency("tzinfo", [">= 1.0.0"])
Expand Down
7 changes: 6 additions & 1 deletion lib/fluent/command/fluentd.rb
Original file line number Diff line number Diff line change
Expand Up @@ -260,4 +260,9 @@
exit 0 if early_exit

require 'fluent/supervisor'
Fluent::Supervisor.new(opts).start
if opts[:supervise]
Fluent::Supervisor.new(opts).run_supervisor
else
Fluent::Supervisor.new(opts).run_worker
end

15 changes: 15 additions & 0 deletions lib/fluent/daemon.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
#!/usr/bin/env ruby
# -*- coding: utf-8 -*-

here = File.dirname(__FILE__)
$LOAD_PATH << File.expand_path(File.join(here, '..'))

require 'serverengine'
require 'fluent/supervisor'

server_module = Fluent.const_get(ARGV[0])
worker_module = Fluent.const_get(ARGV[1])
# it doesn't call ARGV in block because when reloading config, params will be initialized and then it can't use previous config.
config_path = ARGV[2]
params = JSON.parse(ARGV[3])
ServerEngine::Daemon.run_server(server_module, worker_module) { Fluent::Supervisor.load_config(config_path, params) }
50 changes: 45 additions & 5 deletions lib/fluent/log.rb
Original file line number Diff line number Diff line change
Expand Up @@ -53,9 +53,34 @@ def self.str_to_level(log_level_str)
end
end

def initialize(out=STDERR, level=LEVEL_TRACE, opts={})
@out = out
@level = level
def initialize(logger, opts={})
# overwrites logger.level= so that config reloading resets level of Fluentd::Log
orig_logger_level_setter = logger.class.public_instance_method(:level=).bind(logger)
me = self
# The original ruby logger sets the number as each log level like below.
# DEBUG = 0
# INFO = 1
# WARN = 2
# ERROR = 3
# FATAL = 4
# Serverengine use this original log number. In addition to this, serverengine sets -1 as TRACE level.
# TRACE = -1
#
# On the other hand, in fluentd side, it sets the number like below.
# TRACE = 0
# DEBUG = 1
# INFO = 2
# WARN = 3
# ERROR = 4
# FATAL = 5
#
# Then fluentd's level is set as serverengine's level + 1.
# So if serverengine's logger level is changed, fluentd's log level will be changed to that + 1.
logger.define_singleton_method(:level=) {|level| orig_logger_level_setter.call(level); me.level = self.level + 1 }

@logger = logger
@out = logger.instance_variable_get(:@logdev)
@level = logger.level + 1
@debug_mode = false
@self_event = false
@tag = 'fluent'
Expand All @@ -74,7 +99,10 @@ def initialize(out=STDERR, level=LEVEL_TRACE, opts={})
end

def dup
clone = self.class.new(@out, @level, suppress_repeated_stacktrace: @suppress_repeated_stacktrace)
dl_opts = {}
dl_opts[:log_level] = @level - 1
logger = ServerEngine::DaemonLogger.new(@out, dl_opts)
clone = self.class.new(logger, suppress_repeated_stacktrace: @suppress_repeated_stacktrace)
clone.tag = @tag
clone.time_format = @time_format
# optional headers/attrs are not copied, because new PluginLogger should have another one of it
Expand All @@ -87,6 +115,18 @@ def dup
attr_accessor :time_format
attr_accessor :optional_header, :optional_attrs

def logdev=(logdev)
@out = logdev
@logger.instance_variable_set(:@logdev, logdev)
nil
end

def reopen!
# do noting in @logger.reopen! because it's already reopened in Supervisor.load_config
@logger.reopen! if @logger
nil
end

def enable_debug(b=true)
@debug_mode = b
self
Expand Down Expand Up @@ -238,7 +278,7 @@ def fatal_backtrace(backtrace=$!.backtrace)
end

def puts(msg)
@out.puts(msg)
@logger << msg + "\n"
@out.flush
msg
rescue
Expand Down
2 changes: 1 addition & 1 deletion lib/fluent/plugin/socket_util.rb
Original file line number Diff line number Diff line change
Expand Up @@ -125,7 +125,7 @@ def start

def shutdown
@loop.watchers.each { |w| w.detach }
@loop.stop
@loop.stop if @loop.instance_variable_get("@running")
@handler.close
@thread.join
end
Expand Down
Loading

0 comments on commit c1170cc

Please sign in to comment.