From 39e0ef5783c26d0da1175a036c74301fd70dfe7c Mon Sep 17 00:00:00 2001 From: Jordan Sissel Date: Tue, 10 May 2016 10:45:11 -0700 Subject: [PATCH] Allow Logstash to write its logs in JSON format This is made available by a --log-in-json flag. Default is false. When false, the old behavior [1] is used. When true, JSON logs are emitted. [1] The old behavior is realy two things. First, using Object#inspect to serialize. Second, to color the output if the IO is a tty. For #1569 This is a manual backport of PR #4820 into the 2.x branch. Fixes #5277 --- lib/bootstrap/rspec.rb | 1 + logstash-core/lib/logstash/agent.rb | 20 ++++++++++-- logstash-core/lib/logstash/inputs/base.rb | 2 +- logstash-core/lib/logstash/logging/json.rb | 21 +++++++++++++ .../lib/logstash/output_delegator.rb | 6 ++-- logstash-core/lib/logstash/pipeline.rb | 2 +- logstash-core/lib/logstash/plugin.rb | 2 +- logstash-core/locales/en.yml | 4 +++ .../spec/logstash/output_delegator_spec.rb | 1 + logstash-core/spec/logstash/plugin_spec.rb | 4 +-- logstash-core/spec/logstash/runner_spec.rb | 27 ++++++++++++++++ .../spec/logstash/shutdown_watcher_spec.rb | 1 + spec/spec_helper.rb | 31 +++++++++++++++++++ 13 files changed, 112 insertions(+), 10 deletions(-) create mode 100644 logstash-core/lib/logstash/logging/json.rb diff --git a/lib/bootstrap/rspec.rb b/lib/bootstrap/rspec.rb index f32057c7f9c..4c95f3bfc76 100755 --- a/lib/bootstrap/rspec.rb +++ b/lib/bootstrap/rspec.rb @@ -7,6 +7,7 @@ require "rspec/core" require "rspec" +require 'ci/reporter/rake/rspec_loader' status = RSpec::Core::Runner.run(ARGV.empty? ? ["spec"] : ARGV).to_i exit status if status != 0 diff --git a/logstash-core/lib/logstash/agent.rb b/logstash-core/lib/logstash/agent.rb index 04a3e8b709e..7995492b7b5 100644 --- a/logstash-core/lib/logstash/agent.rb +++ b/logstash-core/lib/logstash/agent.rb @@ -91,6 +91,10 @@ class LogStash::Agent < Clamp::Command I18n.t("logstash.agent.flag.allow-env"), :attribute_name => :allow_env, :default => false + option ["--[no-]log-in-json"], :flag, + I18n.t("logstash.agent.flag.log-in-json"), + :default => false + def initialize(*params) super(*params) @logger = Cabin::Channel.get(LogStash) @@ -142,6 +146,7 @@ def execute require "logstash/pipeline" require "cabin" # gem 'cabin' require "logstash/plugin" + require "logstash/logging/json" LogStash::ShutdownWatcher.unsafe_shutdown = unsafe_shutdown? LogStash::ShutdownWatcher.logger = @logger @@ -294,11 +299,22 @@ def configure_logging(path) puts "Sending logstash logs to #{path}." @logger.unsubscribe(@logger_subscription) if @logger_subscription - @logger_subscription = @logger.subscribe(@log_fd) + if log_in_json? + @logger_subscription = @logger.subscribe(LogStash::Logging::JSON.new(@log_fd)) + @logger.subscribe(LogStash::Logging::JSON.new(STDOUT), :level => :fatal) + else + @logger_subscription = @logger.subscribe(@log_fd) + @logger.subscribe(STDOUT, :level => :fatal) + end else - @logger.subscribe(STDOUT) + if log_in_json? + @logger.subscribe(LogStash::Logging::JSON.new(STDOUT)) + else + @logger.subscribe(STDOUT) + end end + # TODO(sissel): redirect stdout/stderr to the log as well # http://jira.codehaus.org/browse/JRUBY-7003 end # def configure_logging diff --git a/logstash-core/lib/logstash/inputs/base.rb b/logstash-core/lib/logstash/inputs/base.rb index 414cd714784..1cf29abd472 100644 --- a/logstash-core/lib/logstash/inputs/base.rb +++ b/logstash-core/lib/logstash/inputs/base.rb @@ -78,7 +78,7 @@ def stop public def do_stop - @logger.debug("stopping", :plugin => self) + @logger.debug("stopping", :plugin => self.class.name) @stop_called.make_true stop end diff --git a/logstash-core/lib/logstash/logging/json.rb b/logstash-core/lib/logstash/logging/json.rb new file mode 100644 index 00000000000..1637fa11ce4 --- /dev/null +++ b/logstash-core/lib/logstash/logging/json.rb @@ -0,0 +1,21 @@ +# encoding: utf-8 +require "logstash/namespace" +require "logstash/logging" +require "logstash/json" + +module LogStash; class Logging; class JSON + def initialize(io) + raise ArgumentError, "Expected IO, got #{io.class.name}" unless io.is_a?(IO) + + @io = io + @lock = Mutex.new + end + + def <<(obj) + serialized = LogStash::Json.dump(obj) + @lock.synchronize do + @io.puts(serialized) + @io.flush + end + end +end; end; end diff --git a/logstash-core/lib/logstash/output_delegator.rb b/logstash-core/lib/logstash/output_delegator.rb index 7a232c6107f..dcac7ae8a40 100644 --- a/logstash-core/lib/logstash/output_delegator.rb +++ b/logstash-core/lib/logstash/output_delegator.rb @@ -74,7 +74,7 @@ def register @workers << @klass.new(@config) @workers.first.register # Needed in case register calls `workers_not_supported` - @logger.debug("Will start workers for output", :worker_count => target_worker_count, :class => @klass) + @logger.debug("Will start workers for output", :worker_count => target_worker_count, :class => @klass.name) # Threadsafe versions don't need additional workers setup_additional_workers!(target_worker_count) unless @threadsafe @@ -134,7 +134,7 @@ def worker_multi_receive(events) end def do_close - @logger.debug("closing output delegator", :klass => @klass) + @logger.debug("closing output delegator", :klass => @klass.name) if @threadsafe @workers.each(&:do_close) @@ -169,4 +169,4 @@ def worker_count private # Needed for testing, so private attr_reader :threadsafe_worker, :worker_queue -end end \ No newline at end of file +end end diff --git a/logstash-core/lib/logstash/pipeline.rb b/logstash-core/lib/logstash/pipeline.rb index 3af7a6a3790..cdda242581d 100644 --- a/logstash-core/lib/logstash/pipeline.rb +++ b/logstash-core/lib/logstash/pipeline.rb @@ -392,7 +392,7 @@ def shutdown(&before_stop) def shutdown_workers # Each worker thread will receive this exactly once! @worker_threads.each do |t| - @logger.debug("Pushing shutdown", :thread => t) + @logger.debug("Pushing shutdown", :thread => t.inspect) @input_queue.push(LogStash::SHUTDOWN) end diff --git a/logstash-core/lib/logstash/plugin.rb b/logstash-core/lib/logstash/plugin.rb index 1f9d471b087..73431b1b811 100644 --- a/logstash-core/lib/logstash/plugin.rb +++ b/logstash-core/lib/logstash/plugin.rb @@ -32,7 +32,7 @@ def initialize(params=nil) # main task terminates public def do_close - @logger.debug("closing", :plugin => self) + @logger.debug("closing", :plugin => self.class.name) close end diff --git a/logstash-core/locales/en.yml b/logstash-core/locales/en.yml index b8f4f12b5b0..c627cf7dcbf 100644 --- a/logstash-core/locales/en.yml +++ b/logstash-core/locales/en.yml @@ -226,3 +226,7 @@ en: Print the compiled config ruby code out as a debug log (you must also have --debug enabled). WARNING: This will include any 'password' options passed to plugin configs as plaintext, and may result in plaintext passwords appearing in your logs! + log-in-json: |+ + Specify that Logstash should write its own logs in JSON form - one + event per line. If false, Logstash will log using Ruby's + Object#inspect (not easy to machine-parse) diff --git a/logstash-core/spec/logstash/output_delegator_spec.rb b/logstash-core/spec/logstash/output_delegator_spec.rb index 78fa4c4f952..84c79926ff3 100644 --- a/logstash-core/spec/logstash/output_delegator_spec.rb +++ b/logstash-core/spec/logstash/output_delegator_spec.rb @@ -16,6 +16,7 @@ allow(out_klass).to receive(:new).with(any_args).and_return(out_inst) allow(out_klass).to receive(:threadsafe?).and_return(false) allow(out_klass).to receive(:workers_not_supported?).and_return(false) + allow(out_klass).to receive(:name).and_return("example") allow(out_inst).to receive(:register) allow(out_inst).to receive(:multi_receive) allow(logger).to receive(:debug).with(any_args) diff --git a/logstash-core/spec/logstash/plugin_spec.rb b/logstash-core/spec/logstash/plugin_spec.rb index 16d1485e180..b720fe49d27 100644 --- a/logstash-core/spec/logstash/plugin_spec.rb +++ b/logstash-core/spec/logstash/plugin_spec.rb @@ -4,12 +4,12 @@ describe LogStash::Plugin do it "should fail lookup on inexisting type" do - expect_any_instance_of(Cabin::Channel).to receive(:debug).once + #expect_any_instance_of(Cabin::Channel).to receive(:debug).once expect { LogStash::Plugin.lookup("badbadtype", "badname") }.to raise_error(LogStash::PluginLoadingError) end it "should fail lookup on inexisting name" do - expect_any_instance_of(Cabin::Channel).to receive(:debug).once + #expect_any_instance_of(Cabin::Channel).to receive(:debug).once expect { LogStash::Plugin.lookup("filter", "badname") }.to raise_error(LogStash::PluginLoadingError) end diff --git a/logstash-core/spec/logstash/runner_spec.rb b/logstash-core/spec/logstash/runner_spec.rb index 00f078bedc0..07fff9a5a96 100644 --- a/logstash-core/spec/logstash/runner_spec.rb +++ b/logstash-core/spec/logstash/runner_spec.rb @@ -3,6 +3,7 @@ require "logstash/runner" require "stud/task" require "stud/trap" +require "stud/temporary" class NullRunner def run(args); end @@ -14,6 +15,7 @@ def run(args); end before :each do allow(Cabin::Channel).to receive(:get).with(LogStash).and_return(channel) + allow(channel).to receive(:subscribe).with(any_args).and_call_original end context "argument parsing" do @@ -54,4 +56,29 @@ def run(args); end end end end + + context "--log-in-json" do + let(:logfile) { Stud::Temporary.file } + let(:args) { [ "agent", "--log-in-json", "-l", logfile.path, "-e", "some-invalid-config" ] } + + after do + logfile.close + File.unlink(logfile.path) + end + + before do + expect(channel).to receive(:subscribe).with(kind_of(LogStash::Logging::JSON)).and_call_original + subject.run(args).wait + + # Log file should have stuff in it. + expect(logfile.stat.size).to be > 0 + end + + it "should log in valid json. One object per line." do + logfile.each_line do |line| + expect(line).not_to be_empty + expect { LogStash::Json.load(line) }.not_to raise_error + end + end + end end diff --git a/logstash-core/spec/logstash/shutdown_watcher_spec.rb b/logstash-core/spec/logstash/shutdown_watcher_spec.rb index 118e126ea5d..fc799f3f69a 100644 --- a/logstash-core/spec/logstash/shutdown_watcher_spec.rb +++ b/logstash-core/spec/logstash/shutdown_watcher_spec.rb @@ -20,6 +20,7 @@ allow(pipeline).to receive(:thread).and_return(Thread.current) allow(reporter).to receive(:snapshot).and_return(reporter_snapshot) allow(reporter_snapshot).to receive(:o_simple_hash).and_return({}) + allow(reporter_snapshot).to receive(:to_json_data).and_return("reporter-double") allow(subject).to receive(:pipeline_report_snapshot).and_wrap_original do |m, *args| report_count += 1 diff --git a/spec/spec_helper.rb b/spec/spec_helper.rb index 5428fd8fd90..5d9cce28a0e 100644 --- a/spec/spec_helper.rb +++ b/spec/spec_helper.rb @@ -5,7 +5,38 @@ CoverageHelper.eager_load if ENV['COVERAGE'] require "logstash/devutils/rspec/spec_helper" +require "logstash/logging/json" + +class JSONIOThingy < IO + def initialize; end + def flush; end + + def puts(payload) + # Ensure that all log payloads are valid json. + LogStash::Json.load(payload) + end +end + +RSpec.configure do |c| + c.before do + # Force Cabin to always have a JSON subscriber. The main purpose of this + # is to catch crashes in json serialization for our logs. JSONIOThingy + # exists to validate taht what LogStash::Logging::JSON emits is always + # valid JSON. + jsonvalidator = JSONIOThingy.new + allow(Cabin::Channel).to receive(:new).and_wrap_original do |m, *args| + logger = m.call(*args) + logger.level = :debug + logger.subscribe(LogStash::Logging::JSON.new(jsonvalidator)) + + logger + end + end + +end def installed_plugins Gem::Specification.find_all.select { |spec| spec.metadata["logstash_plugin"] }.map { |plugin| plugin.name } end + +