From 78b5a7c50e2c168b5d87509b5aaa0731c708eed8 Mon Sep 17 00:00:00 2001
From: Dmitry Anoshin
Date: Tue, 30 May 2023 17:37:32 -0700
Subject: [PATCH] [exporter/splunk_hec] Apply compression if it's enabled in
 the config

Compression used to be applied only if the payload size was greater
than 1.5KB, which significantly complicated the logic and made it hard
to test. This change applies compression to the payload unconditionally
whenever it is enabled in the config. Benchmarking shows improved
throughput and CPU usage for large payloads, and an expected
degradation for small payloads, which is acceptable given that small
payloads are not the common case.
---
 .chloggen/splunk-hec-apply-compression.yaml |  25 ++++
 exporter/splunkhecexporter/buffer.go        | 136 ++++++++++----------
 exporter/splunkhecexporter/client.go        |   8 +-
 exporter/splunkhecexporter/client_test.go   | 126 ++++++++++--------
 4 files changed, 169 insertions(+), 126 deletions(-)
 create mode 100755 .chloggen/splunk-hec-apply-compression.yaml

diff --git a/.chloggen/splunk-hec-apply-compression.yaml b/.chloggen/splunk-hec-apply-compression.yaml
new file mode 100755
index 000000000000..8b4df43551dc
--- /dev/null
+++ b/.chloggen/splunk-hec-apply-compression.yaml
@@ -0,0 +1,25 @@
+# Use this changelog template to create an entry for release notes.
+# If your change doesn't affect end users, such as a test fix or a tooling change,
+# you should instead start your pull request title with [chore] or use the "Skip Changelog" label.
+
+# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix'
+change_type: enhancement
+
+# The name of the component, or a single word describing the area of concern, (e.g. filelogreceiver)
+component: exporter/splunk_hec
+
+# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`).
+note: Apply compression to the Splunk HEC payload unconditionally if it's enabled in the config.
+
+# Mandatory: One or more tracking issues related to the change. You can use the PR number here if no issue exists.
+issues: [22969, 22018]
+
+# (Optional) One or more lines of additional information to render under the primary note.
+# These lines will be padded with 2 spaces and then inserted directly into the document.
+# Use pipe (|) for multiline entries.
+subtext: |
+  Compression used to be applied only if the payload size was greater than 1.5KB, which significantly
+  complicated the logic and made it hard to test. This change applies compression to the payload
+  unconditionally whenever it is enabled in the config. Benchmarking shows improved throughput and CPU usage
+  for large payloads, and an expected degradation for small payloads, which is acceptable given that small payloads are not the common case.
+

diff --git a/exporter/splunkhecexporter/buffer.go b/exporter/splunkhecexporter/buffer.go
index 7a1c059f4db1..c4d82bc776a3 100644
--- a/exporter/splunkhecexporter/buffer.go
+++ b/exporter/splunkhecexporter/buffer.go
@@ -17,22 +17,24 @@ var (
 	errOverCapacity = errors.New("over capacity")
 )
 
-// Minimum number of bytes to compress. 1500 is the MTU of an ethernet frame.
-const minCompressionLen = 1500
-
 // bufferState encapsulates intermediate buffer state when pushing data
 type bufferState struct {
-	compressionAvailable bool
-	bufferMaxLen         uint
-	maxEventLength       uint
-	writer               io.Writer
-	buf                  *bytes.Buffer
-	jsonStream           *jsoniter.Stream
-	rawLength            int
+	maxEventLength uint
+	buf            buffer
+	jsonStream     *jsoniter.Stream
+	rawLength      int
+}
+
+type buffer interface {
+	io.Writer
+	io.Reader
+	io.Closer
+	Reset()
+	Len() int
 }
 
 func (b *bufferState) compressionEnabled() bool {
-	_, ok := b.writer.(*cancellableGzipWriter)
+	_, ok := b.buf.(*cancellableGzipWriter)
 	return ok
 }
 
@@ -42,9 +44,6 @@ func (b *bufferState) containsData() bool {
 
 func (b *bufferState) reset() {
 	b.buf.Reset()
-	if _, ok := b.writer.(*cancellableBytesWriter); !ok {
-		b.writer = &cancellableBytesWriter{innerWriter: b.buf, maxCapacity: b.bufferMaxLen}
-	}
 	b.rawLength = 0
 }
 
@@ -53,10 +52,7 @@ func (b *bufferState) Read(p []byte) (n int, err error) {
 }
 
 func (b *bufferState) Close() error {
-	if _, ok := b.writer.(*cancellableGzipWriter); ok {
-		return b.writer.(*cancellableGzipWriter).close()
-	}
-	return nil
+	return b.buf.Close()
 }
 
 // accept returns true if data is accepted by the buffer
@@ -64,53 +60,15 @@ func (b *bufferState) accept(data []byte) (bool, error) {
 	if len(data)+b.rawLength > int(b.maxEventLength) {
 		return false, nil
 	}
-	_, err := b.writer.Write(data)
-	overCapacity := errors.Is(err, errOverCapacity)
-	bufLen := b.buf.Len()
-	if overCapacity {
-		bufLen += len(data)
-	}
-	if b.compressionAvailable && !b.compressionEnabled() && bufLen > minCompressionLen {
-		// switch over to a zip buffer.
-		tmpBuf := bytes.NewBuffer(make([]byte, 0, b.bufferMaxLen+bufCapPadding))
-		writer := gzip.NewWriter(tmpBuf)
-		writer.Reset(tmpBuf)
-		zipWriter := &cancellableGzipWriter{
-			innerBuffer: tmpBuf,
-			innerWriter: writer,
-			// 8 bytes required for the zip footer.
-			maxCapacity: b.bufferMaxLen - 8,
-		}
-
-		if b.bufferMaxLen == 0 {
-			zipWriter.maxCapacity = 0
-		}
-
-		// we write the bytes buffer into the zip buffer. Any error from this is I/O, and should stop the process.
-		if _, err2 := zipWriter.Write(b.buf.Bytes()); err2 != nil {
-			return false, err2
-		}
-		b.writer = zipWriter
-		b.buf = tmpBuf
-		// if the byte writer was over capacity, try to write the new entry in the zip writer:
-		if overCapacity {
-			if _, err2 := zipWriter.Write(data); err2 != nil {
-				overCapacity2 := errors.Is(err2, errOverCapacity)
-				if overCapacity2 {
-					return false, nil
-				}
-				return false, err2
-			}
-
-		}
+	_, err := b.buf.Write(data)
+	if err == nil {
 		b.rawLength += len(data)
 		return true, nil
 	}
-	if overCapacity {
+	if errors.Is(err, errOverCapacity) {
 		return false, nil
 	}
-	b.rawLength += len(data)
-	return true, err
+	return false, err
 }
 
 type cancellableBytesWriter struct {
@@ -128,6 +86,22 @@ func (c *cancellableBytesWriter) Write(b []byte) (int, error) {
 	return c.innerWriter.Write(b)
 }
 
+func (c *cancellableBytesWriter) Read(p []byte) (int, error) {
+	return c.innerWriter.Read(p)
+}
+
+func (c *cancellableBytesWriter) Reset() {
+	c.innerWriter.Reset()
+}
+
+func (c *cancellableBytesWriter) Close() error {
+	return nil
+}
+
+func (c *cancellableBytesWriter) Len() int {
+	return c.innerWriter.Len()
+}
+
 type cancellableGzipWriter struct {
 	innerBuffer *bytes.Buffer
 	innerWriter *gzip.Writer
@@ -168,10 +142,24 @@ func (c *cancellableGzipWriter) Write(b []byte) (int, error) {
 	return c.innerWriter.Write(b)
 }
 
-func (c *cancellableGzipWriter) close() error {
+func (c *cancellableGzipWriter) Read(p []byte) (int, error) {
+	return c.innerBuffer.Read(p)
+}
+
+func (c *cancellableGzipWriter) Reset() {
+	c.innerBuffer.Reset()
+	c.innerWriter.Reset(c.innerBuffer)
+	c.len = 0
+}
+
+func (c *cancellableGzipWriter) Close() error {
 	return c.innerWriter.Close()
 }
 
+func (c *cancellableGzipWriter) Len() int {
+	return c.innerBuffer.Len()
+}
+
 // bufferStatePool is a pool of bufferState objects.
 type bufferStatePool struct {
 	pool *sync.Pool
@@ -189,18 +177,28 @@ func (p bufferStatePool) put(bf *bufferState) {
 
 const initBufferCap = 512
 
-func newBufferStatePool(bufCap uint, compressionAvailable bool, maxEventLength uint) bufferStatePool {
+func newBufferStatePool(bufCap uint, compressionEnabled bool, maxEventLength uint) bufferStatePool {
 	return bufferStatePool{
 		&sync.Pool{
 			New: func() interface{} {
-				buf := bytes.NewBuffer(make([]byte, 0, initBufferCap))
+				innerBuffer := bytes.NewBuffer(make([]byte, 0, initBufferCap))
+				var buf buffer
+				if compressionEnabled {
+					buf = &cancellableGzipWriter{
+						innerBuffer: innerBuffer,
+						innerWriter: gzip.NewWriter(innerBuffer),
+						maxCapacity: bufCap,
+					}
+				} else {
+					buf = &cancellableBytesWriter{
+						innerWriter: innerBuffer,
+						maxCapacity: bufCap,
+					}
+				}
 				return &bufferState{
-					compressionAvailable: compressionAvailable,
-					writer:               &cancellableBytesWriter{innerWriter: buf, maxCapacity: bufCap},
-					buf:                  buf,
-					jsonStream:           jsoniter.NewStream(jsoniter.ConfigDefault, nil, initBufferCap),
-					bufferMaxLen:         bufCap,
-					maxEventLength:       maxEventLength,
+					buf:            buf,
+					jsonStream:     jsoniter.NewStream(jsoniter.ConfigDefault, nil, initBufferCap),
+					maxEventLength: maxEventLength,
 				}
 			},
 		},
diff --git a/exporter/splunkhecexporter/client.go b/exporter/splunkhecexporter/client.go
index c57500b25524..2b2baf4d65f3 100644
--- a/exporter/splunkhecexporter/client.go
+++ b/exporter/splunkhecexporter/client.go
@@ -219,7 +219,7 @@ func (c *client) fillLogsBuffer(logs plog.Logs, bs *bufferState, is iterState) (
 			}
 			permanentErrors = append(permanentErrors, consumererror.NewPermanent(
 				fmt.Errorf("dropped log event: error: event size %d bytes larger than configured max"+
-					" content length %d bytes", len(b), bs.bufferMaxLen)))
+					" content length %d bytes", len(b), c.config.MaxContentLengthLogs)))
 			return iterState{i, j, k + 1, false}, permanentErrors
 		}
 	}
@@ -277,7 +277,7 @@ func (c *client) fillMetricsBuffer(metrics pmetric.Metrics, bs *bufferState, is
 			}
 			permanentErrors = append(permanentErrors, consumererror.NewPermanent(
 				fmt.Errorf("dropped metric event: error: event size %d bytes larger than configured max"+
-					" content length %d bytes", len(b), bs.bufferMaxLen)))
+					" content length %d bytes", len(b), c.config.MaxContentLengthMetrics)))
 			return iterState{i, j, k + 1, false}, permanentErrors
 		}
 	}
@@ -322,7 +322,7 @@ func (c *client) fillTracesBuffer(traces ptrace.Traces, bs *bufferState, is iter
 			}
 			permanentErrors = append(permanentErrors, consumererror.NewPermanent(
 				fmt.Errorf("dropped span event: error: event size %d bytes larger than configured max"+
-					" content length %d bytes", len(b), bs.bufferMaxLen)))
+					" content length %d bytes", len(b), c.config.MaxContentLengthTraces)))
 			return iterState{i, j, k + 1, false}, permanentErrors
 		}
 	}
@@ -381,7 +381,7 @@ func (c *client) pushTracesDataInBatches(ctx context.Context, td ptrace.Traces,
 }
 
 func (c *client) postEvents(ctx context.Context, bufState *bufferState, headers map[string]string) error {
-	if err := bufState.Close(); err != nil {
+	if err := bufState.buf.Close(); err != nil {
 		return err
 	}
 	return c.hecWorker.send(ctx, bufState, headers)
diff --git a/exporter/splunkhecexporter/client_test.go b/exporter/splunkhecexporter/client_test.go
index b538b58aaf46..cbde34bf5206 100644
--- a/exporter/splunkhecexporter/client_test.go
+++ b/exporter/splunkhecexporter/client_test.go
@@ -194,10 +194,8 @@ type CapturingData struct {
 func (c *CapturingData) ServeHTTP(w http.ResponseWriter, r *http.Request) {
 	body, err := io.ReadAll(r.Body)
 
-	if c.checkCompression {
-		if len(body) > minCompressionLen && r.Header.Get("Content-Encoding") != "gzip" {
-			c.testing.Fatal("No compression")
-		}
+	if c.checkCompression && r.Header.Get("Content-Encoding") != "gzip" {
+		c.testing.Fatal("No compression")
 	}
 
 	if err != nil {
@@ -377,13 +375,8 @@ func TestReceiveTracesBatches(t *testing.T) {
 	type wantType struct {
 		batches    [][]string
 		numBatches int
-		compressed bool
 	}
 
-	// The test cases depend on the constant minCompressionLen = 1500.
-	// If the constant changed, the test cases with want.compressed=true must be updated.
-	require.Equal(t, minCompressionLen, 1500)
-
 	tests := []struct {
 		name   string
 		conf   *Config
@@ -396,6 +389,7 @@ func TestReceiveTracesBatches(t *testing.T) {
 			conf: func() *Config {
 				cfg := NewFactory().CreateDefaultConfig().(*Config)
 				cfg.MaxContentLengthTraces = 0
+				cfg.DisableCompression = true
 				return cfg
 			}(),
 			want: wantType{
@@ -414,6 +408,7 @@ func TestReceiveTracesBatches(t *testing.T) {
 			conf: func() *Config {
 				cfg := NewFactory().CreateDefaultConfig().(*Config)
 				cfg.MaxContentLengthTraces = 320
+				cfg.DisableCompression = true
 				return cfg
 			}(),
 			want: wantType{
@@ -432,6 +427,7 @@ func TestReceiveTracesBatches(t *testing.T) {
 			conf: func() *Config {
 				cfg := NewFactory().CreateDefaultConfig().(*Config)
 				cfg.MaxContentLengthTraces = 640
+				cfg.DisableCompression = true
 				return cfg
 			}(),
 			want: wantType{
@@ -443,7 +439,7 @@ func TestReceiveTracesBatches(t *testing.T) {
 			},
 		},
 		{
-			name:   "1 compressed batch of 2037 bytes, make sure the event size is more than minCompressionLen=1500 to trigger compression",
+			name:   "1 compressed batch of 2037 bytes",
 			traces: createTraceData(1, 10),
 			conf: func() *Config {
 				return NewFactory().CreateDefaultConfig().(*Config)
@@ -453,7 +449,6 @@ func TestReceiveTracesBatches(t *testing.T) {
 				batches: [][]string{
 					{`"start_time":1`, `"start_time":2`, `"start_time":3`, `"start_time":4`, `"start_time":7`, `"start_time":8`, `"start_time":9`},
 				},
 				numBatches: 1,
-				compressed: true,
 			},
 		},
 		{
@@ -461,18 +456,17 @@ func TestReceiveTracesBatches(t *testing.T) {
 			traces: createTraceData(1, 100),
 			conf: func() *Config {
 				cfg := NewFactory().CreateDefaultConfig().(*Config)
-				cfg.MaxContentLengthTraces = minCompressionLen + 500
+				cfg.MaxContentLengthTraces = 2000
 				return cfg
 			}(),
 			want: wantType{
 				// just test that the test has 2 batches, don't test its contents.
 				batches:    [][]string{{""}, {""}},
 				numBatches: 2,
-				compressed: true,
 			},
 		},
 		{
-			name:   "100 events, make sure that we produce only one compressed batch when MaxContentLengthTraces is 0",
+			name:   "100 events",
 			traces: createTraceData(1, 100),
 			conf: func() *Config {
 				cfg := NewFactory().CreateDefaultConfig().(*Config)
@@ -484,7 +478,6 @@ func TestReceiveTracesBatches(t *testing.T) {
 					{`"start_time":1`, `"start_time":2`, `"start_time":3`, `"start_time":4`, `"start_time":7`, `"start_time":8`, `"start_time":9`, `"start_time":20`, `"start_time":40`, `"start_time":85`, `"start_time":98`, `"start_time":99`},
 				},
 				numBatches: 1,
-				compressed: true,
 			},
 		},
 		{
@@ -514,12 +507,12 @@ func TestReceiveTracesBatches(t *testing.T) {
 				if test.conf.MaxContentLengthTraces != 0 {
 					require.True(t, int(test.conf.MaxContentLengthTraces) > len(got[i].body))
 				}
-				if test.want.compressed {
-					validateCompressedContains(t, batch, got[i].body)
-				} else {
+				if test.conf.DisableCompression {
 					for _, expected := range batch {
 						assert.Contains(t, string(got[i].body), expected)
 					}
+				} else {
+					validateCompressedContains(t, batch, got[i].body)
 				}
 			}
 
@@ -528,7 +521,7 @@ func TestReceiveTracesBatches(t *testing.T) {
 			eventFound := false
 			for _, batch := range got {
 				batchBody := batch.body
-				if test.want.compressed {
+				if !test.conf.DisableCompression {
 					z, err := gzip.NewReader(bytes.NewReader(batchBody))
 					require.NoError(t, err)
 					batchBody, err = io.ReadAll(z)
@@ -553,15 +546,10 @@ func TestReceiveLogs(t *testing.T) {
 	type wantType struct {
 		batches    [][]string
 		numBatches int
-		compressed bool
 		wantErr    string
 		wantDrops  int // expected number of dropped events
 	}
 
-	// The test cases depend on the constant minCompressionLen = 1500.
-	// If the constant changed, the test cases with want.compressed=true must be updated.
-	require.Equal(t, minCompressionLen, 1500)
-
 	tests := []struct {
 		name string
 		conf *Config
@@ -574,6 +562,7 @@ func TestReceiveLogs(t *testing.T) {
 			conf: func() *Config {
 				cfg := NewFactory().CreateDefaultConfig().(*Config)
 				cfg.MaxContentLengthLogs = 0
+				cfg.DisableCompression = true
 				return cfg
 			}(),
 			want: wantType{
@@ -592,6 +581,7 @@ func TestReceiveLogs(t *testing.T) {
 			conf: func() *Config {
 				cfg := NewFactory().CreateDefaultConfig().(*Config)
 				cfg.MaxContentLengthLogs = 300
+				cfg.DisableCompression = true
 				return cfg
 			}(),
 			want: wantType{
@@ -605,7 +595,7 @@ func TestReceiveLogs(t *testing.T) {
 			},
 		},
 		{
-			name: "1 log event long enough to trigger compression",
+			name: "1 long log event",
 			logs: func() plog.Logs {
 				l := createLogData(1, 1, 1)
 				l.ResourceLogs().At(0).ScopeLogs().At(0).LogRecords().At(0).Body().SetStr(strings.Repeat("a", 1800))
@@ -621,7 +611,6 @@ func TestReceiveLogs(t *testing.T) {
 					{`"otel.log.name":"0_0_0"`},
 				},
 				numBatches: 1,
-				compressed: true,
 			},
 		},
 		{
@@ -630,6 +619,7 @@ func TestReceiveLogs(t *testing.T) {
 			conf: func() *Config {
 				cfg := NewFactory().CreateDefaultConfig().(*Config)
 				cfg.MaxContentLengthLogs = 448
+				cfg.DisableCompression = true
 				return cfg
 			}(),
 			want: wantType{
@@ -641,7 +631,7 @@ func TestReceiveLogs(t *testing.T) {
 			},
 		},
 		{
-			name: "1 compressed batch of 2037 bytes, make sure the event size is more than minCompressionLen=1500 to trigger compression",
+			name: "1 compressed batch of 2037 bytes",
 			logs: createLogData(1, 1, 10),
 			conf: func() *Config {
 				return NewFactory().CreateDefaultConfig().(*Config)
@@ -651,7 +641,6 @@ func TestReceiveLogs(t *testing.T) {
 					{`"otel.log.name":"0_0_0"`, `"otel.log.name":"0_0_1"`, `"otel.log.name":"0_0_5"`, `"otel.log.name":"0_0_6"`, `"otel.log.name":"0_0_7"`, `"otel.log.name":"0_0_8"`, `"otel.log.name":"0_0_9"`},
 				},
 				numBatches: 1,
-				compressed: true,
 			},
 		},
 		{
@@ -659,7 +648,7 @@ func TestReceiveLogs(t *testing.T) {
 			logs: createLogData(1, 1, 150),
 			conf: func() *Config {
 				cfg := NewFactory().CreateDefaultConfig().(*Config)
-				cfg.MaxContentLengthLogs = minCompressionLen + 150
+				cfg.MaxContentLengthLogs = 1650
 				return cfg
 			}(),
 			want: wantType{
@@ -668,7 +657,6 @@ func TestReceiveLogs(t *testing.T) {
 					{`"otel.log.name":"0_0_110"`, `"otel.log.name":"0_0_149"`},
 				},
 				numBatches: 2,
-				compressed: true,
 			},
 		},
 		{
@@ -684,14 +672,13 @@ func TestReceiveLogs(t *testing.T) {
 					{`"otel.log.name":"0_0_0"`, `"otel.log.name":"0_0_90"`, `"otel.log.name":"0_0_110"`, `"otel.log.name":"0_0_149"`},
 				},
 				numBatches: 1,
-				compressed: true,
 			},
 		},
 		{
-			name: "one event with 1340 bytes, then one triggering compression (going over 1500 bytes) and bypassing the max length, moving to a separate batch",
+			name: "one event with 1500 bytes and another one bypassing the max length, moving to a separate batch",
 			logs: func() plog.Logs {
 				firstLog := createLogData(1, 1, 2)
-				firstLog.ResourceLogs().At(0).ScopeLogs().At(0).LogRecords().At(0).Body().SetStr(repeatableString(1340))
+				firstLog.ResourceLogs().At(0).ScopeLogs().At(0).LogRecords().At(0).Body().SetStr(repeatableString(1500))
 				firstLog.ResourceLogs().At(0).ScopeLogs().At(0).LogRecords().At(1).Body().SetStr(repeatableString(2800000))
 				return firstLog
 			}(),
@@ -705,7 +692,6 @@ func TestReceiveLogs(t *testing.T) {
 					{`"otel.log.name":"0_0_0"`}, {`"otel.log.name":"0_0_1"`},
 				},
 				numBatches: 2,
-				compressed: true,
 			},
 		},
 		{
@@ -723,7 +709,6 @@ func TestReceiveLogs(t *testing.T) {
 			want: wantType{
 				batches:    [][]string{},
 				numBatches: 0,
-				compressed: true,
 				wantErr:    "timeout", // our server will time out waiting for the data.
 			},
 		},
@@ -806,12 +791,12 @@ func TestReceiveLogs(t *testing.T) {
 				if test.conf.MaxContentLengthLogs != 0 {
 					require.True(t, int(test.conf.MaxContentLengthLogs) > len(got[i].body))
 				}
-				if test.want.compressed {
-					validateCompressedContains(t, wantBatch, got[i].body)
-				} else {
+				if test.conf.DisableCompression {
 					for _, expected := range wantBatch {
 						assert.Contains(t, string(got[i].body), expected)
 					}
+				} else {
+					validateCompressedContains(t, wantBatch, got[i].body)
 				}
 			}
 
@@ -828,7 +813,7 @@ func TestReceiveLogs(t *testing.T) {
 			eventFound := false
 			for _, batch := range got {
 				batchBody := batch.body
-				if test.want.compressed {
+				if !test.conf.DisableCompression {
 					z, err := gzip.NewReader(bytes.NewReader(batchBody))
 					require.NoError(t, err)
 					batchBody, err = io.ReadAll(z)
@@ -867,6 +852,7 @@ func TestReceiveRaw(t *testing.T) {
 			conf: func() *Config {
 				conf := createDefaultConfig().(*Config)
 				conf.ExportRaw = true
+				conf.DisableCompression = true
 				return conf
 			}(),
 			text: "mylog\n",
@@ -881,6 +867,7 @@ func TestReceiveRaw(t *testing.T) {
 			conf: func() *Config {
 				conf := createDefaultConfig().(*Config)
 				conf.ExportRaw = true
+				conf.DisableCompression = true
 				return conf
 			}(),
 			text: "bXlieXRlcw==\n",
@@ -895,6 +882,7 @@ func TestReceiveRaw(t *testing.T) {
 			conf: func() *Config {
 				conf := createDefaultConfig().(*Config)
 				conf.ExportRaw = true
+				conf.DisableCompression = true
 				return conf
 			}(),
 			text: "64.345\n",
@@ -905,6 +893,7 @@ func TestReceiveRaw(t *testing.T) {
 			conf: func() *Config {
 				conf := createDefaultConfig().(*Config)
 				conf.ExportRaw = true
+				conf.DisableCompression = true
 				return conf
 			}(),
 			text: "mylog\nmylog\nmylog\nmylog\nmylog\n",
@@ -919,6 +908,7 @@ func TestReceiveRaw(t *testing.T) {
 			conf: func() *Config {
 				conf := createDefaultConfig().(*Config)
 				conf.ExportRaw = true
+				conf.DisableCompression = true
 				return conf
 			}(),
 			text: "[1,\"foo\",true]\n",
@@ -933,6 +923,7 @@ func TestReceiveRaw(t *testing.T) {
 			conf: func() *Config {
 				conf := createDefaultConfig().(*Config)
 				conf.ExportRaw = true
+				conf.DisableCompression = true
 				return conf
 			}(),
 			text: "{\"foo\":\"bar\"}\n",
@@ -1019,10 +1010,6 @@ func TestReceiveBatchedMetrics(t *testing.T) {
 		compressed bool
 	}
 
-	// The test cases depend on the constant minCompressionLen = 1500.
-	// If the constant changed, the test cases with want.compressed=true must be updated.
-	require.Equal(t, minCompressionLen, 1500)
-
 	tests := []struct {
 		name    string
 		conf    *Config
@@ -1035,6 +1022,7 @@ func TestReceiveBatchedMetrics(t *testing.T) {
 			conf: func() *Config {
 				cfg := NewFactory().CreateDefaultConfig().(*Config)
 				cfg.MaxContentLengthMetrics = 0
+				cfg.DisableCompression = true
 				return cfg
 			}(),
 			want: wantType{
@@ -1050,6 +1038,7 @@ func TestReceiveBatchedMetrics(t *testing.T) {
 			conf: func() *Config {
 				cfg := NewFactory().CreateDefaultConfig().(*Config)
 				cfg.MaxContentLengthMetrics = 300
+				cfg.DisableCompression = true
 				return cfg
 			}(),
 			want: wantType{
@@ -1068,6 +1057,7 @@ func TestReceiveBatchedMetrics(t *testing.T) {
 			conf: func() *Config {
 				cfg := NewFactory().CreateDefaultConfig().(*Config)
 				cfg.MaxContentLengthMetrics = 448
+				cfg.DisableCompression = true
 				return cfg
 			}(),
 			want: wantType{
@@ -1079,7 +1069,7 @@ func TestReceiveBatchedMetrics(t *testing.T) {
 			},
 		},
 		{
-			name:    "1 compressed batch of 2037 bytes, make sure the event size is more than minCompressionLen=1500 to trigger compression",
+			name:    "1 compressed batch of 2037 bytes",
 			metrics: createMetricsData(1, 10),
 			conf: func() *Config {
 				return NewFactory().CreateDefaultConfig().(*Config)
@@ -1097,7 +1087,7 @@ func TestReceiveBatchedMetrics(t *testing.T) {
 			metrics: createMetricsData(1, 100),
 			conf: func() *Config {
 				cfg := NewFactory().CreateDefaultConfig().(*Config)
-				cfg.MaxContentLengthMetrics = minCompressionLen + 150
+				cfg.MaxContentLengthMetrics = 1650
 				return cfg
 			}(),
 			want: wantType{
@@ -1533,38 +1523,68 @@ func Test_pushLogData_ShouldAddHeadersForProfilingData(t *testing.T) {
 
 // 10 resources, 10 records, 1Kb max HEC batch: 17 HEC batches
 func Benchmark_pushLogData_10_10_1024(b *testing.B) {
-	benchPushLogData(b, 10, 10, 1024)
+	benchPushLogData(b, 10, 10, 1024, false)
 }
 
 // 10 resources, 10 records, 8Kb max HEC batch: 2 HEC batches
 func Benchmark_pushLogData_10_10_8K(b *testing.B) {
-	benchPushLogData(b, 10, 10, 8*1024)
+	benchPushLogData(b, 10, 10, 8*1024, false)
 }
 
 // 10 resources, 10 records, 1Mb max HEC batch: 1 HEC batch
 func Benchmark_pushLogData_10_10_2M(b *testing.B) {
-	benchPushLogData(b, 10, 10, 2*1024*1024)
+	benchPushLogData(b, 10, 10, 2*1024*1024, false)
}
 
 // 10 resources, 200 records, 2Mb max HEC batch: 1 HEC batch
 func Benchmark_pushLogData_10_200_2M(b *testing.B) {
-	benchPushLogData(b, 10, 200, 2*1024*1024)
+	benchPushLogData(b, 10, 200, 2*1024*1024, false)
 }
 
 // 100 resources, 200 records, 2Mb max HEC batch: 2 HEC batches
 func Benchmark_pushLogData_100_200_2M(b *testing.B) {
-	benchPushLogData(b, 100, 200, 2*1024*1024)
+	benchPushLogData(b, 100, 200, 2*1024*1024, false)
 }
 
 // 100 resources, 200 records, 5Mb max HEC batch: 1 HEC batches
 func Benchmark_pushLogData_100_200_5M(b *testing.B) {
-	benchPushLogData(b, 100, 200, 5*1024*1024)
+	benchPushLogData(b, 100, 200, 5*1024*1024, false)
+}
+
+// 10 resources, 10 records, 1Kb max HEC batch: 2 HEC batches
+func Benchmark_pushLogData_compressed_10_10_1024(b *testing.B) {
+	benchPushLogData(b, 10, 10, 1024, true)
+}
+
+// 10 resources, 10 records, 8Kb max HEC batch: 1 HEC batch
+func Benchmark_pushLogData_compressed_10_10_8K(b *testing.B) {
+	benchPushLogData(b, 10, 10, 8*1024, true)
 }
 
-func benchPushLogData(b *testing.B, numResources int, numRecords int, bufSize uint) {
+// 10 resources, 10 records, 2Mb max HEC batch: 1 HEC batch
+func Benchmark_pushLogData_compressed_10_10_2M(b *testing.B) {
+	benchPushLogData(b, 10, 10, 2*1024*1024, true)
+}
+
+// 10 resources, 200 records, 2Mb max HEC batch: 1 HEC batch
+func Benchmark_pushLogData_compressed_10_200_2M(b *testing.B) {
+	benchPushLogData(b, 10, 200, 2*1024*1024, true)
+}
+
+// 100 resources, 200 records, 2Mb max HEC batch: 1 HEC batch
+func Benchmark_pushLogData_compressed_100_200_2M(b *testing.B) {
+	benchPushLogData(b, 100, 200, 2*1024*1024, true)
+}
+
+// 100 resources, 200 records, 5Mb max HEC batch: 1 HEC batch
+func Benchmark_pushLogData_compressed_100_200_5M(b *testing.B) {
+	benchPushLogData(b, 100, 200, 5*1024*1024, true)
+}
+
+func benchPushLogData(b *testing.B, numResources int, numRecords int, bufSize uint, compressionEnabled bool) {
 	config := NewFactory().CreateDefaultConfig().(*Config)
 	config.MaxContentLengthLogs = bufSize
-	config.DisableCompression = true
+	config.DisableCompression = !compressionEnabled
 	c := newLogsClient(exportertest.NewNopCreateSettings(), config)
 	c.hecWorker = &mockHecWorker{}
 	exp, err := exporterhelper.NewLogsExporter(context.Background(), exportertest.NewNopCreateSettings(), config,
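The sketch below is not part of the patch. It is a minimal, self-contained Go illustration of the approach the change takes: when compression is enabled, the payload buffer is wrapped in a gzip writer from the start, instead of switching writers only once the payload crosses a 1.5KB threshold. The names cappedGzipBuffer, newCappedGzipBuffer, and errOverCap are hypothetical and do not appear in the exporter, and the capacity check is simplified here (it caps uncompressed bytes, while the exporter's cancellableGzipWriter also accounts for compressed size).

// Standalone sketch (hypothetical names); illustrates unconditional gzip wrapping.
package main

import (
	"bytes"
	"compress/gzip"
	"errors"
	"fmt"
	"io"
)

var errOverCap = errors.New("over capacity")

// cappedGzipBuffer gzip-compresses every write and rejects a write once the
// running uncompressed length would exceed maxLen (0 means unlimited).
type cappedGzipBuffer struct {
	buf    bytes.Buffer
	zw     *gzip.Writer
	len    int
	maxLen int
}

func newCappedGzipBuffer(maxLen int) *cappedGzipBuffer {
	c := &cappedGzipBuffer{maxLen: maxLen}
	c.zw = gzip.NewWriter(&c.buf) // compression is active from the first write
	return c
}

func (c *cappedGzipBuffer) Write(p []byte) (int, error) {
	if c.maxLen > 0 && c.len+len(p) > c.maxLen {
		return 0, errOverCap
	}
	n, err := c.zw.Write(p)
	c.len += n
	return n, err
}

// Close flushes the gzip footer; it must be called before the payload is read.
func (c *cappedGzipBuffer) Close() error { return c.zw.Close() }

func main() {
	b := newCappedGzipBuffer(1024)
	// Small payloads are compressed too -- there is no 1.5KB threshold anymore.
	if _, err := b.Write([]byte(`{"event":"hello"}`)); err != nil {
		panic(err)
	}
	if err := b.Close(); err != nil {
		panic(err)
	}
	compressed := b.buf.Len()

	// Round-trip to show the body is valid gzip regardless of its size.
	zr, err := gzip.NewReader(&b.buf)
	if err != nil {
		panic(err)
	}
	plain, err := io.ReadAll(zr)
	if err != nil {
		panic(err)
	}
	fmt.Printf("compressed=%dB decompressed=%q\n", compressed, plain)
}

With the size threshold gone, a request body is either always gzip (compression enabled) or never (DisableCompression set), which is why the tests above can branch on test.conf.DisableCompression instead of a per-case want.compressed flag.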