Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Loki Logstash Plugin #1822

Merged
merged 30 commits into from
Jul 17, 2020
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
Show all changes
30 commits
Select commit Hold shift + click to select a range
3159e46
Logstash plugin
adityacs Mar 26, 2020
0994c44
include_labels
adityacs Apr 10, 2020
727bdcb
include_labels
adityacs Apr 10, 2020
88587c1
Removes binary.
cyriltovena Apr 17, 2020
9c9b072
Improve documentation and remove the push path.
cyriltovena Apr 17, 2020
d45c9db
Move to cmd.
cyriltovena Jul 13, 2020
a21605a
Add more precision for jruby.
cyriltovena Jul 13, 2020
87895e9
Update docs/clients/logstash/README.md
cyriltovena Jul 13, 2020
913dbbb
p
cyriltovena Jul 13, 2020
9b9a250
ignore
cyriltovena Jul 13, 2020
1a4de4a
remove ignore file/
cyriltovena Jul 13, 2020
659ca6b
More precision for installing jruby
cyriltovena Jul 13, 2020
c2b33e3
Rename without Grafana
cyriltovena Jul 13, 2020
cbef9a4
A lot of refactoring and testing.
cyriltovena Jul 14, 2020
d10f042
change delay logic
adityacs Jul 15, 2020
26875cb
Fully tested version.
cyriltovena Jul 15, 2020
5a1af98
Merge branch 'loki_logstash_plugin' of github.com:adityacs/loki into …
cyriltovena Jul 15, 2020
1366ef8
Forgot to save merge.
cyriltovena Jul 15, 2020
99f504d
working version.
cyriltovena Jul 15, 2020
4776178
Makefile + easier docker build.
cyriltovena Jul 16, 2020
425d375
adds ci to build logstash image.
cyriltovena Jul 16, 2020
0d2acee
Fix build for logstash.
cyriltovena Jul 16, 2020
37508d1
Adds example with helm charts.
cyriltovena Jul 16, 2020
a8cde47
Fix target to send 10 logs with logstash.
cyriltovena Jul 16, 2020
0a266ec
Improved documentation.
cyriltovena Jul 16, 2020
4ae8cc7
add missing helm add repo for external repo
cyriltovena Jul 16, 2020
c87581a
Review comment.
cyriltovena Jul 16, 2020
3e6a5cf
Fixes loki service in Promtail.
cyriltovena Jul 17, 2020
29ee215
Merge remote-tracking branch 'upstream/master' into loki_logstash_plugin
cyriltovena Jul 17, 2020
e9fb63a
Update loki-stack version
cyriltovena Jul 17, 2020
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 7 additions & 0 deletions cmd/logstash/.rakeTasks
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
<?xml version="1.0" encoding="UTF-8"?>
<Settings><!--This file was automatically generated by Ruby plugin.
You are allowed to:
1. Remove rake task
2. Add existing rake tasks
To add existing rake tasks automatically delete this file and reload the project.
--><RakeGroup description="" fullCmd="" taksId="rake" /></Settings>
15 changes: 9 additions & 6 deletions cmd/logstash/Gemfile
Original file line number Diff line number Diff line change
Expand Up @@ -2,10 +2,13 @@ source 'https://rubygems.org'

gemspec

logstash_path = ENV["LOGSTASH_PATH"] || "logstash-libs"
use_logstash_source = ENV["LOGSTASH_SOURCE"] && ENV["LOGSTASH_SOURCE"].to_s == "1"
# logstash_path = ENV["LOGSTASH_PATH"] || "./logstash"
# use_logstash_source = ENV["LOGSTASH_SOURCE"] && ENV["LOGSTASH_SOURCE"].to_s == "1"
#
# if Dir.exist?(logstash_path) && use_logstash_source
#
# end

if Dir.exist?(logstash_path) && use_logstash_source
gem 'logstash-core', :path => "#{logstash_path}/logstash-core"
gem 'logstash-core-plugin-api', :path => "#{logstash_path}/logstash-core-plugin-api"
end

gem 'logstash-core', :path => "./logstash/logstash-core"
gem 'logstash-core-plugin-api', :path => "./logstash/logstash-core-plugin-api"
15 changes: 12 additions & 3 deletions cmd/logstash/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -42,10 +42,11 @@ cd logstash
git checkout tags/v7.6.2
export LOGSTASH_PATH=`pwd`
export LOGSTASH_SOURCE="1"
export GEM_PATH=$LOGSTASH_PATH/vendor/bundle/
export GEM_HOME=$LOGSTASH_PATH/vendor/bundle/
export GEM_PATH=$LOGSTASH_PATH/vendor/bundle/jruby/2.5.0
export GEM_HOME=$LOGSTASH_PATH/vendor/bundle/jruby/2.5.0
./gradlew assemble
cd ..
ruby -S bundle install --path
ruby -S bundle install --path=$LOGSTASH_PATH/vendor/bundle/
ruby -S bundle exec rake vendor
```

Expand All @@ -57,6 +58,14 @@ ruby -S bundle exec rake vendor

`ruby -S bundle exec rspec`

Alternatively if you don't want to install JRuby. Enter inside logstash-loki container.

```bash
docker build -t logstash-loki ./
docker run -v `pwd`/spec:/home/logstash/spec -it --rm --entrypoint /bin/sh logstash-loki
bundle exec rspec
```

## Install plugin to local logstash

`bin/logstash-plugin install --no-verify --local logstash-output-loki-1.0.0.gem`
Expand Down
141 changes: 33 additions & 108 deletions cmd/logstash/lib/logstash/outputs/loki.rb
Original file line number Diff line number Diff line change
Expand Up @@ -39,12 +39,6 @@ class LogStash::Outputs::Loki < LogStash::Outputs::Base
## 'Interval in seconds to wait before pushing a batch of records to loki. Defaults to 1 second'
config :batch_wait, :validate => :number, :default => 1, :required => false

## 'Array of label names to include in all logstreams'
config :include_labels, :validate => :array, :default => [], :required => true

## 'Extra labels to add to all log streams'
config :external_labels, :validate => :hash, :default => {}, :required => false

## 'Log line field to pick from logstash. Defaults to "message"'
config :message_field, :validate => :string, :default => "message", :required => false

Expand All @@ -64,23 +58,16 @@ def register
raise LogStash::ConfigurationError, "url parameter must be valid HTTP, currently '#{@url}'"
end

if @include_labels.empty?
raise LogStash::ConfigurationError, "include_labels should contain atleast one label, currently '#{@include_labels}'"
end

if @min_delay > @max_delay
raise LogStash::ConfigurationError, "Min delay should be less than Max delay, currently 'Min delay is #{@min_delay} and Max delay is #{@max_delay}'"
end

@logger.info("Loki output plugin", :class => self.class.name)

# intialize channels
# initialize channels
@Channel = Concurrent::Channel
@entries = @Channel.new

# excluded message and timestamp from labels
@exclude_labels = ["message", "@timestamp"]

# create nil batch object.
@batch = nil

Expand Down Expand Up @@ -130,8 +117,8 @@ def ssl_opts(uri)
end

def run()
min_wait_checkfrequency = 1/1000 #1 millisecond
max_wait_checkfrequency = @batch_wait
min_wait_checkfrequency = 1/100 #1 millisecond
max_wait_checkfrequency = @batch_wait / 10
if max_wait_checkfrequency < min_wait_checkfrequency
max_wait_checkfrequency = min_wait_checkfrequency
end
Expand All @@ -140,27 +127,15 @@ def run()
loop do
Concurrent::Channel.select do |s|
s.take(@entries) { |e|
if @batch.nil?
@batch = Batch.new(e)
next
end

line = e.entry['line']
if @batch.size_bytes_after(line) > @batch_size
@logger.debug("Max batch_size is reached. Sending batch to loki")
send(@tenant_id, @batch)
@batch = Batch.new(e)
next
end
@batch.add(e)
if add_entry_to_batch(e)
@logger.debug("Max batch_size is reached. Sending batch to loki")
send(@tenant_id, @batch)
@batch = Batch.new(e)
end
}
s.take(@max_wait_check) {
# Send batch if max wait time has been reached
if [email protected]?
if @batch.age() < @batch_wait
next
end

if is_batch_expired
@logger.debug("Max batch_wait time is reached. Sending batch to loki")
send(@tenant_id, @batch)
@batch = nil
Expand All @@ -170,69 +145,42 @@ def run()
end
end

def add_entry_to_batch(e)
line = e.entry['line']
# we don't want to send empty lines.
return false if line.to_s.strip.empty?

if @batch.nil?
@batch = Batch.new(e)
return false
end

if @batch.size_bytes_after(line) > @batch_size
return true
end
@batch.add(e)
return false
end

def is_batch_expired
return [email protected]? && @batch.age() >= @batch_wait
end

## Receives logstash events
public
def receive(event)
labels = {}
event_hash = event.to_hash
lbls = handle_labels(event_hash, labels, "")

data_labels, entry_hash = build_entry(lbls, event)
@entries << Entry.new(data_labels, entry_hash)

@entries << Entry.new(event,@message_field)
end

def close
@logger.info("Closing loki output plugin. Flushing all pending batches")
send(@tenant_id, @batch) if [email protected]?
@entries.close
send(@tenant_id, @batch) if [email protected]?
@max_wait_check.close if !@max_wait_check.nil?
end

def build_entry(lbls, event)
labels = lbls.merge(@external_labels)
entry_hash = {
"ts" => event.get("@timestamp").to_i * (10**9),
"line" => event.get(@message_field).to_s
}
return labels, entry_hash
end

def handle_labels(event_hash, labels, parent_key)
event_hash.each{ |key,value|
if !@exclude_labels.include?(key)
if value.is_a?(Hash)
if parent_key != ""
handle_labels(value, labels, parent_key + "_" + key)
else
handle_labels(value, labels, key)
end
else
if parent_key != ""
labels[parent_key + "_" + key] = value.to_s
else
labels[key] = value.to_s
end
end
end
}
return extract_labels(labels)
end

def extract_labels(extracted_labels)
labels = {}
extracted_labels.each { |key, value|
if @include_labels.include?(key)
key = key.gsub("@", '')
labels[key] = value
end
}
return labels
end

def send(tenant_id, batch)
payload = build_payload(batch)
res = loki_http_request(tenant_id, payload, @min_delay, @max_delay, @retries)
res = loki_http_request(tenant_id, batch.to_json, @min_delay, @max_delay, @retries)

if res.is_a?(Net::HTTPSuccess)
@logger.debug("Successfully pushed data to loki")
Expand Down Expand Up @@ -282,27 +230,4 @@ def loki_http_request(tenant_id, payload, min_delay, max_delay, retries)
end
return res
end

def build_payload(batch)
payload = {}
payload['streams'] = []
batch.streams.each { |labels, stream|
stream_obj = get_stream_obj(stream)
payload['streams'].push(stream_obj)
}
return payload.to_json
end

def get_stream_obj(stream)
stream_obj = {}
stream_obj['stream'] = stream['labels']
stream_obj['values'] = []
values = []
stream['entries'].each { |entry|
values.push(entry['ts'].to_s)
values.push(entry['line'])
}
stream_obj['values'].push(values)
return stream_obj
end
end
106 changes: 61 additions & 45 deletions cmd/logstash/lib/logstash/outputs/loki/batch.rb
Original file line number Diff line number Diff line change
@@ -1,47 +1,63 @@
require 'time'

module LogStash
module Outputs
class Loki
class Batch
attr_reader :streams
def initialize(e)
@bytes = 0
@createdAt = Time.now
@streams = {}
add(e)
end

def size_bytes()
return @bytes
end

def add(e)
@bytes = @bytes + e.entry['line'].length

# Append the entry to an already existing stream (if any)
labels = e.labels.to_s
if @streams.has_key?(labels)
stream = @streams[labels]
stream['entries'] = stream['entries'] + e.entry
return
else
# Add the entry as a new stream
@streams[labels] = {
"labels" => e.labels,
"entries" => [e.entry],
}
end
end

def size_bytes_after(line)
return @bytes + line.length
end

def age()
return Time.now - @createdAt
end
end
end
end
end
module Loki
class Batch
attr_reader :streams
def initialize(e)
@bytes = 0
@createdAt = Time.now
@streams = {}
add(e)
end

def size_bytes
return @bytes
end

def add(e)
@bytes = @bytes + e.entry['line'].length

# Append the entry to an already existing stream (if any)
labels = e.labels.sort.to_h
labelkey = labels.to_s
if @streams.has_key?(labelkey)
stream = @streams[labelkey]
stream['entries'].append(e.entry)
return
else
# Add the entry as a new stream
@streams[labelkey] = {
"labels" => labels,
"entries" => [e.entry],
}
end
end

def size_bytes_after(line)
return @bytes + line.length
end

def age()
return Time.now - @createdAt
end

def to_json
streams = []
@streams.each { |_ , stream|
streams.append(build_stream(stream))
}
return {"streams"=>streams}.to_json
end

def build_stream(stream)
values = []
stream['entries'].each { |entry|
values.append([entry['ts'].to_s, entry['line']])
}
return {
'stream'=>stream['labels'],
'values' => values
}
end
end
end
Loading