Skip to content

Commit

Permalink
Allow Logstash to write its logs in JSON format
Browse files Browse the repository at this point in the history
This is made available by a --log-in-json flag. Default is false.
When false, the old behavior [1] is used.

When true, JSON logs are emitted.

[1] The old behavior is realy two things. First, using Object#inspect to
serialize. Second, to color the output if the IO is a tty.

For #1569

This is a manual backport of PR #4820 into the 2.x branch.

Fixes #5277
  • Loading branch information
jordansissel committed May 10, 2016
1 parent 9b712e5 commit 39e0ef5
Show file tree
Hide file tree
Showing 13 changed files with 112 additions and 10 deletions.
1 change: 1 addition & 0 deletions lib/bootstrap/rspec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@

require "rspec/core"
require "rspec"
require 'ci/reporter/rake/rspec_loader'

status = RSpec::Core::Runner.run(ARGV.empty? ? ["spec"] : ARGV).to_i
exit status if status != 0
20 changes: 18 additions & 2 deletions logstash-core/lib/logstash/agent.rb
Original file line number Diff line number Diff line change
Expand Up @@ -91,6 +91,10 @@ class LogStash::Agent < Clamp::Command
I18n.t("logstash.agent.flag.allow-env"),
:attribute_name => :allow_env, :default => false

option ["--[no-]log-in-json"], :flag,
I18n.t("logstash.agent.flag.log-in-json"),
:default => false

def initialize(*params)
super(*params)
@logger = Cabin::Channel.get(LogStash)
Expand Down Expand Up @@ -142,6 +146,7 @@ def execute
require "logstash/pipeline"
require "cabin" # gem 'cabin'
require "logstash/plugin"
require "logstash/logging/json"

LogStash::ShutdownWatcher.unsafe_shutdown = unsafe_shutdown?
LogStash::ShutdownWatcher.logger = @logger
Expand Down Expand Up @@ -294,11 +299,22 @@ def configure_logging(path)

puts "Sending logstash logs to #{path}."
@logger.unsubscribe(@logger_subscription) if @logger_subscription
@logger_subscription = @logger.subscribe(@log_fd)
if log_in_json?
@logger_subscription = @logger.subscribe(LogStash::Logging::JSON.new(@log_fd))
@logger.subscribe(LogStash::Logging::JSON.new(STDOUT), :level => :fatal)
else
@logger_subscription = @logger.subscribe(@log_fd)
@logger.subscribe(STDOUT, :level => :fatal)
end
else
@logger.subscribe(STDOUT)
if log_in_json?
@logger.subscribe(LogStash::Logging::JSON.new(STDOUT))
else
@logger.subscribe(STDOUT)
end
end


# TODO(sissel): redirect stdout/stderr to the log as well
# http://jira.codehaus.org/browse/JRUBY-7003
end # def configure_logging
Expand Down
2 changes: 1 addition & 1 deletion logstash-core/lib/logstash/inputs/base.rb
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,7 @@ def stop

public
def do_stop
@logger.debug("stopping", :plugin => self)
@logger.debug("stopping", :plugin => self.class.name)
@stop_called.make_true
stop
end
Expand Down
21 changes: 21 additions & 0 deletions logstash-core/lib/logstash/logging/json.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
# encoding: utf-8
require "logstash/namespace"
require "logstash/logging"
require "logstash/json"

module LogStash; class Logging; class JSON
def initialize(io)
raise ArgumentError, "Expected IO, got #{io.class.name}" unless io.is_a?(IO)

@io = io
@lock = Mutex.new
end

def <<(obj)
serialized = LogStash::Json.dump(obj)
@lock.synchronize do
@io.puts(serialized)
@io.flush
end
end
end; end; end
6 changes: 3 additions & 3 deletions logstash-core/lib/logstash/output_delegator.rb
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,7 @@ def register
@workers << @klass.new(@config)
@workers.first.register # Needed in case register calls `workers_not_supported`

@logger.debug("Will start workers for output", :worker_count => target_worker_count, :class => @klass)
@logger.debug("Will start workers for output", :worker_count => target_worker_count, :class => @klass.name)

# Threadsafe versions don't need additional workers
setup_additional_workers!(target_worker_count) unless @threadsafe
Expand Down Expand Up @@ -134,7 +134,7 @@ def worker_multi_receive(events)
end

def do_close
@logger.debug("closing output delegator", :klass => @klass)
@logger.debug("closing output delegator", :klass => @klass.name)

if @threadsafe
@workers.each(&:do_close)
Expand Down Expand Up @@ -169,4 +169,4 @@ def worker_count
private
# Needed for testing, so private
attr_reader :threadsafe_worker, :worker_queue
end end
end end
2 changes: 1 addition & 1 deletion logstash-core/lib/logstash/pipeline.rb
Original file line number Diff line number Diff line change
Expand Up @@ -392,7 +392,7 @@ def shutdown(&before_stop)
def shutdown_workers
# Each worker thread will receive this exactly once!
@worker_threads.each do |t|
@logger.debug("Pushing shutdown", :thread => t)
@logger.debug("Pushing shutdown", :thread => t.inspect)
@input_queue.push(LogStash::SHUTDOWN)
end

Expand Down
2 changes: 1 addition & 1 deletion logstash-core/lib/logstash/plugin.rb
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ def initialize(params=nil)
# main task terminates
public
def do_close
@logger.debug("closing", :plugin => self)
@logger.debug("closing", :plugin => self.class.name)
close
end

Expand Down
4 changes: 4 additions & 0 deletions logstash-core/locales/en.yml
Original file line number Diff line number Diff line change
Expand Up @@ -226,3 +226,7 @@ en:
Print the compiled config ruby code out as a debug log (you must also have --debug enabled).
WARNING: This will include any 'password' options passed to plugin configs as plaintext, and may result
in plaintext passwords appearing in your logs!
log-in-json: |+
Specify that Logstash should write its own logs in JSON form - one
event per line. If false, Logstash will log using Ruby's
Object#inspect (not easy to machine-parse)
1 change: 1 addition & 0 deletions logstash-core/spec/logstash/output_delegator_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
allow(out_klass).to receive(:new).with(any_args).and_return(out_inst)
allow(out_klass).to receive(:threadsafe?).and_return(false)
allow(out_klass).to receive(:workers_not_supported?).and_return(false)
allow(out_klass).to receive(:name).and_return("example")
allow(out_inst).to receive(:register)
allow(out_inst).to receive(:multi_receive)
allow(logger).to receive(:debug).with(any_args)
Expand Down
4 changes: 2 additions & 2 deletions logstash-core/spec/logstash/plugin_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -4,12 +4,12 @@

describe LogStash::Plugin do
it "should fail lookup on inexisting type" do
expect_any_instance_of(Cabin::Channel).to receive(:debug).once
#expect_any_instance_of(Cabin::Channel).to receive(:debug).once
expect { LogStash::Plugin.lookup("badbadtype", "badname") }.to raise_error(LogStash::PluginLoadingError)
end

it "should fail lookup on inexisting name" do
expect_any_instance_of(Cabin::Channel).to receive(:debug).once
#expect_any_instance_of(Cabin::Channel).to receive(:debug).once
expect { LogStash::Plugin.lookup("filter", "badname") }.to raise_error(LogStash::PluginLoadingError)
end

Expand Down
27 changes: 27 additions & 0 deletions logstash-core/spec/logstash/runner_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
require "logstash/runner"
require "stud/task"
require "stud/trap"
require "stud/temporary"

class NullRunner
def run(args); end
Expand All @@ -14,6 +15,7 @@ def run(args); end

before :each do
allow(Cabin::Channel).to receive(:get).with(LogStash).and_return(channel)
allow(channel).to receive(:subscribe).with(any_args).and_call_original
end

context "argument parsing" do
Expand Down Expand Up @@ -54,4 +56,29 @@ def run(args); end
end
end
end

context "--log-in-json" do
let(:logfile) { Stud::Temporary.file }
let(:args) { [ "agent", "--log-in-json", "-l", logfile.path, "-e", "some-invalid-config" ] }

after do
logfile.close
File.unlink(logfile.path)
end

before do
expect(channel).to receive(:subscribe).with(kind_of(LogStash::Logging::JSON)).and_call_original
subject.run(args).wait

# Log file should have stuff in it.
expect(logfile.stat.size).to be > 0
end

it "should log in valid json. One object per line." do
logfile.each_line do |line|
expect(line).not_to be_empty
expect { LogStash::Json.load(line) }.not_to raise_error
end
end
end
end
1 change: 1 addition & 0 deletions logstash-core/spec/logstash/shutdown_watcher_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
allow(pipeline).to receive(:thread).and_return(Thread.current)
allow(reporter).to receive(:snapshot).and_return(reporter_snapshot)
allow(reporter_snapshot).to receive(:o_simple_hash).and_return({})
allow(reporter_snapshot).to receive(:to_json_data).and_return("reporter-double")

allow(subject).to receive(:pipeline_report_snapshot).and_wrap_original do |m, *args|
report_count += 1
Expand Down
31 changes: 31 additions & 0 deletions spec/spec_helper.rb
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,38 @@
CoverageHelper.eager_load if ENV['COVERAGE']

require "logstash/devutils/rspec/spec_helper"
require "logstash/logging/json"

class JSONIOThingy < IO
def initialize; end
def flush; end

def puts(payload)
# Ensure that all log payloads are valid json.
LogStash::Json.load(payload)
end
end

RSpec.configure do |c|
c.before do
# Force Cabin to always have a JSON subscriber. The main purpose of this
# is to catch crashes in json serialization for our logs. JSONIOThingy
# exists to validate taht what LogStash::Logging::JSON emits is always
# valid JSON.
jsonvalidator = JSONIOThingy.new
allow(Cabin::Channel).to receive(:new).and_wrap_original do |m, *args|
logger = m.call(*args)
logger.level = :debug
logger.subscribe(LogStash::Logging::JSON.new(jsonvalidator))

logger
end
end

end

def installed_plugins
Gem::Specification.find_all.select { |spec| spec.metadata["logstash_plugin"] }.map { |plugin| plugin.name }
end


0 comments on commit 39e0ef5

Please sign in to comment.