Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add gzip compression #162

Merged
merged 2 commits into from
Sep 14, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ If you think you’ve found a potential security issue, please do not post it in
* `experimental_concurrency`: Specify a limit of concurrent go routines for flushing records to kinesis. By default `experimental_concurrency` is set to 0 and records are flushed in Fluent Bit's single thread. This means that requests to Kinesis will block the execution of Fluent Bit. If this value is set to `4` for example then calls to Flush records from fluentbit will spawn concurrent go routines until the limit of `4` concurrent go routines are running. Once the `experimental_concurrency` limit is reached calls to Flush will return a retry code. The upper limit of the `experimental_concurrency` option is `10`. WARNING: Enabling `experimental_concurrency` can lead to data loss if the retry count is reached. Enabling concurrency will increase resource usage (memory and CPU).
* `experimental_concurrency_retries`: Specify a limit to the number of retries concurrent goroutines will attempt. By default `4` retries will be attempted before records are dropped.
* `aggregation`: Setting `aggregation` to `true` will enable KPL aggregation of records sent to Kinesis. This feature isn't compatible with the `partition_key` feature. See the KPL aggregation section below for more details.
* `compression`: Setting `compression` to `zlib` will enable zlib compression of each record. By default this feature is disabled and records are not compressed.
* `compression`: Specify an algorithm for compression of each record. Supported compression algorithms are `zlib` and `gzip`. By default this feature is disabled and records are not compressed.
* `replace_dots`: Replace dot characters in key names with the value of this option. For example, if you add `replace_dots _` in your config then all occurrences of `.` will be replaced with an underscore. By default, dots will not be replaced.

### Permissions
Expand Down
4 changes: 3 additions & 1 deletion fluent-bit-kinesis.go
Original file line number Diff line number Diff line change
Expand Up @@ -152,10 +152,12 @@ func newKinesisOutput(ctx unsafe.Pointer, pluginID int) (*kinesis.OutputPlugin,
var comp kinesis.CompressionType
if strings.ToLower(compression) == string(kinesis.CompressionZlib) {
comp = kinesis.CompressionZlib
} else if strings.ToLower(compression) == string(kinesis.CompressionGzip) {
comp = kinesis.CompressionGzip
} else if strings.ToLower(compression) == string(kinesis.CompressionNone) || compression == "" {
comp = kinesis.CompressionNone
} else {
return nil, fmt.Errorf("[kinesis %d] Invalid 'compression' value (%s) specified, must be 'zlib', 'none', or undefined", pluginID, compression)
return nil, fmt.Errorf("[kinesis %d] Invalid 'compression' value (%s) specified, must be 'zlib', 'gzip', 'none', or undefined", pluginID, compression)
}

return kinesis.NewOutputPlugin(region, stream, dataKeys, partitionKey, roleARN, kinesisEndpoint, stsEndpoint, timeKey, timeKeyFmt, logKey, replaceDots, concurrencyInt, concurrencyRetriesInt, isAggregate, appendNL, comp, pluginID)
Expand Down
35 changes: 31 additions & 4 deletions kinesis/kinesis.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ package kinesis

import (
"bytes"
"compress/gzip"
"compress/zlib"
"fmt"
"os"
Expand Down Expand Up @@ -72,6 +73,8 @@ const (
CompressionNone CompressionType = "none"
// CompressionZlib enables zlib compression
CompressionZlib = "zlib"
// CompressionGzip enables gzip compression
CompressionGzip = "gzip"
)

// OutputPlugin sends log records to kinesis
Expand Down Expand Up @@ -460,11 +463,15 @@ func (outputPlugin *OutputPlugin) processRecord(record map[interface{}]interface
data = append(data, []byte("\n")...)
}

if outputPlugin.compression == CompressionZlib {
switch outputPlugin.compression {
case CompressionZlib:
data, err = zlibCompress(data)
if err != nil {
return nil, err
}
case CompressionGzip:
data, err = gzipCompress(data)
default:
}
if err != nil {
return nil, err
}

if len(data)+partitionKeyLen > maximumRecordSize {
Expand Down Expand Up @@ -610,6 +617,26 @@ func zlibCompress(data []byte) ([]byte, error) {
return b.Bytes(), nil
}

func gzipCompress(data []byte) ([]byte, error) {
var b bytes.Buffer

if data == nil {
return nil, fmt.Errorf("No data to compress. 'nil' value passed as data")
}
hossain-rayhan marked this conversation as resolved.
Show resolved Hide resolved

zw := gzip.NewWriter(&b)
_, err := zw.Write(data)
if err != nil {
return data, err
}
err = zw.Close()
if err != nil {
return data, err
}

return b.Bytes(), nil
}

// stringOrByteArray returns the string value if the input is a string or byte array otherwise an empty string
func stringOrByteArray(v interface{}) string {
switch t := v.(type) {
Expand Down
23 changes: 16 additions & 7 deletions kinesis/kinesis_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -187,19 +187,28 @@ func TestAddRecordWithConcurrency(t *testing.T) {
assert.Equal(t, retCode, fluentbit.FLB_OK, "Expected FlushConcurrent return code to be FLB_OK")
}

func TestZlibCompression(t *testing.T) {
var compressors = map[string]func([]byte) ([]byte, error){
"zlib": zlibCompress,
"gzip": gzipCompress,
}

func TestCompression(t *testing.T) {

testData := []byte("Test Data: This is test data for compression. This data is needs to have with some repetitive values, so compression is effective.")

compressedBuf, err := zlibCompress(testData)
assert.Equal(t, err, nil, "Expected successful compression of data")
assert.Lessf(t, len(compressedBuf), len(testData), "Compressed data buffer should contain fewer bytes")
for z, f := range compressors {
compressedBuf, err := f(testData)
assert.Equalf(t, err, nil, "Expected successful %s compression of data", z)
assert.Lessf(t, len(compressedBuf), len(testData), "%s compressed data buffer should contain fewer bytes", z)
}
}

func TestZlibCompressionEmpty(t *testing.T) {
func TestCompressionEmpty(t *testing.T) {

_, err := zlibCompress(nil)
assert.NotEqual(t, err, nil, "'nil' data should return an error")
for z, f := range compressors {
_, err := f(nil)
assert.NotEqualf(t, err, nil, "%s compressing 'nil' data should return an error", z)
}
}

func TestDotReplace(t *testing.T) {
Expand Down