From 4c61bbc1c55d996e7e986bfa73ffcae39cbea87b Mon Sep 17 00:00:00 2001 From: Haitao Li Date: Sat, 4 Sep 2021 22:09:19 +1000 Subject: [PATCH] Add gzip compression --- README.md | 2 +- fluent-bit-kinesis.go | 4 +++- kinesis/kinesis.go | 35 +++++++++++++++++++++++++++++++---- kinesis/kinesis_test.go | 23 ++++++++++++++++------- 4 files changed, 51 insertions(+), 13 deletions(-) diff --git a/README.md b/README.md index 84205a7..126579b 100644 --- a/README.md +++ b/README.md @@ -23,7 +23,7 @@ If you think you’ve found a potential security issue, please do not post it in * `experimental_concurrency`: Specify a limit of concurrent go routines for flushing records to kinesis. By default `experimental_concurrency` is set to 0 and records are flushed in Fluent Bit's single thread. This means that requests to Kinesis will block the execution of Fluent Bit. If this value is set to `4` for example then calls to Flush records from fluentbit will spawn concurrent go routines until the limit of `4` concurrent go routines are running. Once the `experimental_concurrency` limit is reached calls to Flush will return a retry code. The upper limit of the `experimental_concurrency` option is `10`. WARNING: Enabling `experimental_concurrency` can lead to data loss if the retry count is reached. Enabling concurrency will increase resource usage (memory and CPU). * `experimental_concurrency_retries`: Specify a limit to the number of retries concurrent goroutines will attempt. By default `4` retries will be attempted before records are dropped. * `aggregation`: Setting `aggregation` to `true` will enable KPL aggregation of records sent to Kinesis. This feature isn't compatible with the `partition_key` feature. See the KPL aggregation section below for more details. -* `compression`: Setting `compression` to `zlib` will enable zlib compression of each record. By default this feature is disabled and records are not compressed. +* `compression`: Specify an algorithm for compression of each record. Supported compression algorithms are `zlib` and `gzip`. By default this feature is disabled and records are not compressed. * `replace_dots`: Replace dot characters in key names with the value of this option. For example, if you add `replace_dots _` in your config then all occurrences of `.` will be replaced with an underscore. By default, dots will not be replaced. ### Permissions diff --git a/fluent-bit-kinesis.go b/fluent-bit-kinesis.go index 71f0304..08c95d8 100644 --- a/fluent-bit-kinesis.go +++ b/fluent-bit-kinesis.go @@ -152,10 +152,12 @@ func newKinesisOutput(ctx unsafe.Pointer, pluginID int) (*kinesis.OutputPlugin, var comp kinesis.CompressionType if strings.ToLower(compression) == string(kinesis.CompressionZlib) { comp = kinesis.CompressionZlib + } else if strings.ToLower(compression) == string(kinesis.CompressionGzip) { + comp = kinesis.CompressionGzip } else if strings.ToLower(compression) == string(kinesis.CompressionNone) || compression == "" { comp = kinesis.CompressionNone } else { - return nil, fmt.Errorf("[kinesis %d] Invalid 'compression' value (%s) specified, must be 'zlib', 'none', or undefined", pluginID, compression) + return nil, fmt.Errorf("[kinesis %d] Invalid 'compression' value (%s) specified, must be 'zlib', 'gzip', 'none', or undefined", pluginID, compression) } return kinesis.NewOutputPlugin(region, stream, dataKeys, partitionKey, roleARN, kinesisEndpoint, stsEndpoint, timeKey, timeKeyFmt, logKey, replaceDots, concurrencyInt, concurrencyRetriesInt, isAggregate, appendNL, comp, pluginID) diff --git a/kinesis/kinesis.go b/kinesis/kinesis.go index de19190..2fc49df 100644 --- a/kinesis/kinesis.go +++ b/kinesis/kinesis.go @@ -18,6 +18,7 @@ package kinesis import ( "bytes" + "compress/gzip" "compress/zlib" "fmt" "os" @@ -72,6 +73,8 @@ const ( CompressionNone CompressionType = "none" // CompressionZlib enables zlib compression CompressionZlib = "zlib" + // CompressionGzip enables gzip compression + CompressionGzip = "gzip" ) // OutputPlugin sends log records to kinesis @@ -460,11 +463,15 @@ func (outputPlugin *OutputPlugin) processRecord(record map[interface{}]interface data = append(data, []byte("\n")...) } - if outputPlugin.compression == CompressionZlib { + switch outputPlugin.compression { + case CompressionZlib: data, err = zlibCompress(data) - if err != nil { - return nil, err - } + case CompressionGzip: + data, err = gzipCompress(data) + default: + } + if err != nil { + return nil, err } if len(data)+partitionKeyLen > maximumRecordSize { @@ -610,6 +617,26 @@ func zlibCompress(data []byte) ([]byte, error) { return b.Bytes(), nil } +func gzipCompress(data []byte) ([]byte, error) { + var b bytes.Buffer + + if data == nil { + return nil, fmt.Errorf("No data to compress. 'nil' value passed as data") + } + + zw := gzip.NewWriter(&b) + _, err := zw.Write(data) + if err != nil { + return data, err + } + err = zw.Close() + if err != nil { + return data, err + } + + return b.Bytes(), nil +} + // stringOrByteArray returns the string value if the input is a string or byte array otherwise an empty string func stringOrByteArray(v interface{}) string { switch t := v.(type) { diff --git a/kinesis/kinesis_test.go b/kinesis/kinesis_test.go index 3e6e977..df754e0 100644 --- a/kinesis/kinesis_test.go +++ b/kinesis/kinesis_test.go @@ -187,19 +187,28 @@ func TestAddRecordWithConcurrency(t *testing.T) { assert.Equal(t, retCode, fluentbit.FLB_OK, "Expected FlushConcurrent return code to be FLB_OK") } -func TestZlibCompression(t *testing.T) { +var compressors = map[string]func([]byte) ([]byte, error){ + "zlib": zlibCompress, + "gzip": gzipCompress, +} + +func TestCompression(t *testing.T) { testData := []byte("Test Data: This is test data for compression. This data is needs to have with some repetitive values, so compression is effective.") - compressedBuf, err := zlibCompress(testData) - assert.Equal(t, err, nil, "Expected successful compression of data") - assert.Lessf(t, len(compressedBuf), len(testData), "Compressed data buffer should contain fewer bytes") + for z, f := range compressors { + compressedBuf, err := f(testData) + assert.Equalf(t, err, nil, "Expected successful %s compression of data", z) + assert.Lessf(t, len(compressedBuf), len(testData), "%s compressed data buffer should contain fewer bytes", z) + } } -func TestZlibCompressionEmpty(t *testing.T) { +func TestCompressionEmpty(t *testing.T) { - _, err := zlibCompress(nil) - assert.NotEqual(t, err, nil, "'nil' data should return an error") + for z, f := range compressors { + _, err := f(nil) + assert.NotEqualf(t, err, nil, "%s compressing 'nil' data should return an error", z) + } } func TestDotReplace(t *testing.T) {