diff --git a/.chloggen/splunkhec-exporter-multimetric-batch.yaml b/.chloggen/splunkhec-exporter-multimetric-batch.yaml
new file mode 100644
index 000000000000..6f7cea1e252e
--- /dev/null
+++ b/.chloggen/splunkhec-exporter-multimetric-batch.yaml
@@ -0,0 +1,20 @@
+# Use this changelog template to create an entry for release notes.
+# If your change doesn't affect end users, such as a test fix or a tooling change,
+# you should instead start your pull request title with [chore] or use the "Skip Changelog" label.
+
+# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix'
+change_type: bug_fix
+
+# The name of the component, or a single word describing the area of concern, (e.g. filelogreceiver)
+component: splunkhecexporter
+
+# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`).
+note: Apply multi-metric merge at the level of the whole batch rather than within events emitted for one metric.
+
+# Mandatory: One or more tracking issues related to the change. You can use the PR number here if no issue exists.
+issues: [23365]
+
+# (Optional) One or more lines of additional information to render under the primary note.
+# These lines will be padded with 2 spaces and then inserted directly into the document.
+# Use pipe (|) for multiline entries.
+subtext:
diff --git a/exporter/splunkhecexporter/client.go b/exporter/splunkhecexporter/client.go
index 99e8fb01208a..d14f725931e9 100644
--- a/exporter/splunkhecexporter/client.go
+++ b/exporter/splunkhecexporter/client.go
@@ -97,6 +97,9 @@ func (c *client) pushMetricsData(
 		}
 	}
 
+	if c.config.UseMultiMetricFormat {
+		return c.pushMultiMetricsDataInBatches(ctx, md, localHeaders)
+	}
 	return c.pushMetricsDataInBatches(ctx, md, localHeaders)
 }
 
@@ -255,15 +258,6 @@ func (c *client) fillMetricsBuffer(metrics pmetric.Metrics, buf buffer, is iterS
 			// Parsing metric record to Splunk event.
 			events := mapMetricToSplunkEvent(rm.Resource(), metric, c.config, c.logger)
 			tempBuf := bytes.NewBuffer(make([]byte, 0, c.config.MaxContentLengthMetrics))
-			if c.config.UseMultiMetricFormat {
-				merged, err := mergeEventsToMultiMetricFormat(events)
-				if err != nil {
-					permanentErrors = append(permanentErrors, consumererror.NewPermanent(fmt.Errorf(
-						"error merging events: %w", err)))
-				} else {
-					events = merged
-				}
-			}
 			for _, event := range events {
 				// JSON encoding event and writing to buffer.
 				b, err := marshalEvent(event, c.config.MaxEventSize, jsonStream)
@@ -298,6 +292,43 @@ func (c *client) fillMetricsBuffer(metrics pmetric.Metrics, buf buffer, is iterS
 	return iterState{done: true}, permanentErrors
 }
 
+func (c *client) fillMetricsBufferMultiMetrics(events []*splunk.Event, buf buffer, is iterState) (iterState, []error) {
+	var permanentErrors []error
+	jsonStream := jsonStreamPool.Get().(*jsoniter.Stream)
+	defer jsonStreamPool.Put(jsonStream)
+
+	for i := is.record; i < len(events); i++ {
+		event := events[i]
+		// JSON encoding event and writing to buffer.
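+		// marshalEvent enforces the configured MaxEventSize: events that fail to
+		// encode or exceed that limit are dropped as permanent errors, not retried.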
+		b, jsonErr := marshalEvent(event, c.config.MaxEventSize, jsonStream)
+		if jsonErr != nil {
+			permanentErrors = append(permanentErrors, consumererror.NewPermanent(fmt.Errorf("dropped metric event: %v, error: %w", event, jsonErr)))
+			continue
+		}
+		_, err := buf.Write(b)
+		if errors.Is(err, errOverCapacity) {
+			if !buf.Empty() {
+				return iterState{
+					record: i,
+					done:   false,
+				}, permanentErrors
+			}
+			permanentErrors = append(permanentErrors, consumererror.NewPermanent(
+				fmt.Errorf("dropped metric event: error: event size %d bytes larger than configured max"+
+					" content length %d bytes", len(b), c.config.MaxContentLengthMetrics)))
+			return iterState{
+				record: i + 1,
+				done:   i+1 == len(events),
+			}, permanentErrors
+		} else if err != nil {
+			permanentErrors = append(permanentErrors, consumererror.NewPermanent(fmt.Errorf(
+				"error writing the event: %w", err)))
+		}
+	}
+
+	return iterState{done: true}, permanentErrors
+}
+
 func (c *client) fillTracesBuffer(traces ptrace.Traces, buf buffer, is iterState) (iterState, []error) {
 	var permanentErrors []error
 	jsonStream := jsonStreamPool.Get().(*jsoniter.Stream)
@@ -345,6 +376,51 @@ func (c *client) fillTracesBuffer(traces ptrace.Traces, buf buffer, is iterState
 	return iterState{done: true}, permanentErrors
 }
 
+// pushMultiMetricsDataInBatches sends batches of Splunk multi-metric events in JSON format.
+// The batch content length is restricted to MaxContentLengthMetrics.
+// md metrics are parsed to Splunk events.
+func (c *client) pushMultiMetricsDataInBatches(ctx context.Context, md pmetric.Metrics, headers map[string]string) error {
+	buf := c.bufferPool.get()
+	defer c.bufferPool.put(buf)
+	is := iterState{}
+
+	var permanentErrors []error
+	var events []*splunk.Event
+	for i := 0; i < md.ResourceMetrics().Len(); i++ {
+		rm := md.ResourceMetrics().At(i)
+		for j := 0; j < rm.ScopeMetrics().Len(); j++ {
+			sm := rm.ScopeMetrics().At(j)
+			for k := 0; k < sm.Metrics().Len(); k++ {
+				metric := sm.Metrics().At(k)
+
+				// Parsing metric record to Splunk event.
+				events = append(events, mapMetricToSplunkEvent(rm.Resource(), metric, c.config, c.logger)...)
+			}
+		}
+	}
+
+	merged, err := mergeEventsToMultiMetricFormat(events)
+	if err != nil {
+		return consumererror.NewPermanent(fmt.Errorf("error merging events: %w", err))
+	}
+
+	for !is.done {
+		buf.Reset()
+
+		latestIterState, batchPermanentErrors := c.fillMetricsBufferMultiMetrics(merged, buf, is)
+		permanentErrors = append(permanentErrors, batchPermanentErrors...)
+		if !buf.Empty() {
+			if err := c.postEvents(ctx, buf, headers); err != nil {
+				return consumererror.NewMetrics(err, md)
+			}
+		}
+
+		is = latestIterState
+	}
+
+	return multierr.Combine(permanentErrors...)
+}
+
 // pushMetricsDataInBatches sends batches of Splunk events in JSON format.
 // The batch content length is restricted to MaxContentLengthMetrics.
 // md metrics are parsed to Splunk events.
@@ -363,6 +439,7 @@ func (c *client) pushMetricsDataInBatches(ctx context.Context, md pmetric.Metric
 				return consumererror.NewMetrics(err, subMetrics(md, is))
 			}
 		}
+		is = latestIterState
 	}
 }
diff --git a/exporter/splunkhecexporter/client_test.go b/exporter/splunkhecexporter/client_test.go
index 708cea958542..0f9ee2bde5a9 100644
--- a/exporter/splunkhecexporter/client_test.go
+++ b/exporter/splunkhecexporter/client_test.go
@@ -85,7 +85,7 @@ func createMetricsData(resourcesNum, dataPointsNum int) pmetric.Metrics {
 			tsUnix := time.Unix(int64(count), int64(count)*time.Millisecond.Nanoseconds())
 			ilm := rm.ScopeMetrics().AppendEmpty()
 			metric := ilm.Metrics().AppendEmpty()
-			metric.SetName("gauge_double_with_dims")
+			metric.SetName(fmt.Sprintf("gauge_double_with_dims_%d", j))
 			doublePt := metric.SetEmptyGauge().DataPoints().AppendEmpty()
 			doublePt.SetTimestamp(pcommon.NewTimestampFromTime(tsUnix))
 			doublePt.SetDoubleValue(doubleVal)
@@ -207,7 +207,7 @@ func (c *CapturingData) ServeHTTP(w http.ResponseWriter, r *http.Request) {
 	w.WriteHeader(c.statusCode)
 }
 
-func runMetricsExport(cfg *Config, metrics pmetric.Metrics, expectedBatchesNum int, t *testing.T) ([]receivedRequest, error) {
+func runMetricsExport(cfg *Config, metrics pmetric.Metrics, expectedBatchesNum int, useMultiMetricsFormat bool, t *testing.T) ([]receivedRequest, error) {
 	listener, err := net.Listen("tcp", "127.0.0.1:0")
 	if err != nil {
 		panic(err)
@@ -216,6 +216,7 @@ func runMetricsExport(cfg *Config, metrics pmetric.Metrics, expectedBatchesNum i
 	factory := NewFactory()
 	cfg.HTTPClientSettings.Endpoint = "http://" + listener.Addr().String() + "/services/collector"
 	cfg.Token = "1234-1234"
+	cfg.UseMultiMetricFormat = useMultiMetricsFormat
 
 	rr := make(chan receivedRequest)
 	capture := CapturingData{testing: t, receivedRequest: rr, statusCode: 200, checkCompression: !cfg.DisableCompression}
@@ -249,7 +250,7 @@ func runMetricsExport(cfg *Config, metrics pmetric.Metrics, expectedBatchesNum i
 			return requests, nil
 		}
 	case <-time.After(5 * time.Second):
-		if len(requests) == 0 {
+		if len(requests) == 0 && expectedBatchesNum != 0 {
 			err = errors.New("timeout")
 		}
 		return requests, err
@@ -955,7 +956,7 @@ func TestReceiveMetricEvent(t *testing.T) {
 	cfg := NewFactory().CreateDefaultConfig().(*Config)
 	cfg.DisableCompression = true
 
-	actual, err := runMetricsExport(cfg, metrics, 1, t)
+	actual, err := runMetricsExport(cfg, metrics, 1, false, t)
 
 	assert.Len(t, actual, 1)
 	assert.NoError(t, err)
@@ -993,7 +994,7 @@ func TestReceiveMetrics(t *testing.T) {
 	md := createMetricsData(1, 3)
 	cfg := NewFactory().CreateDefaultConfig().(*Config)
 	cfg.DisableCompression = true
-	actual, err := runMetricsExport(cfg, md, 1, t)
+	actual, err := runMetricsExport(cfg, md, 1, false, t)
 	assert.Len(t, actual, 1)
 	assert.NoError(t, err)
 	msg := string(actual[0].body)
@@ -1031,6 +1032,27 @@ func TestReceiveBatchedMetrics(t *testing.T) {
 				numBatches: 1,
 			},
 		},
+		{
+			name: "one metric event too large to fit in a batch",
+			metrics: func() pmetric.Metrics {
+				m := pmetric.NewMetrics()
+				metric := m.ResourceMetrics().AppendEmpty().ScopeMetrics().AppendEmpty().Metrics().AppendEmpty()
+				g := metric.SetEmptyGauge()
+				g.DataPoints().AppendEmpty().SetIntValue(32)
+				metric.SetName(repeatableString(256))
+				return m
+			}(),
+			conf: func() *Config {
+				cfg := NewFactory().CreateDefaultConfig().(*Config)
+				cfg.MaxContentLengthMetrics = 20
+				cfg.DisableCompression = true
+				return cfg
+			}(),
+			want: wantType{
+				batches:    [][]string{},
+				numBatches: 0,
+			},
+		},
 		{
 			name: "1 metric event per payload (configured max content length is same as event size)",
 			metrics: createMetricsData(1, 4),
@@ -1130,55 +1152,65 @@ func TestReceiveBatchedMetrics(t *testing.T) {
 	}
 
 	for _, test := range tests {
-		t.Run(test.name, func(t *testing.T) {
-			got, err := runMetricsExport(test.conf, test.metrics, test.want.numBatches, t)
+		testFn := func(multiMetric bool) func(*testing.T) {
+			return func(t *testing.T) {
+				got, err := runMetricsExport(test.conf, test.metrics, test.want.numBatches, multiMetric, t)
 
-			require.NoError(t, err)
-			require.Len(t, got, test.want.numBatches)
-
-			for i, batch := range test.want.batches {
-				require.NotZero(t, got[i])
-				if test.conf.MaxContentLengthMetrics != 0 {
-					require.True(t, int(test.conf.MaxContentLengthMetrics) > len(got[i].body))
-				}
-				if test.want.compressed {
-					validateCompressedContains(t, batch, got[i].body)
-				} else {
-					found := false
+				require.NoError(t, err)
+				require.Len(t, got, test.want.numBatches)
 
-					for _, expected := range batch {
-						if strings.Contains(string(got[i].body), expected) {
-							found = true
-							break
+				for i, batch := range test.want.batches {
+					require.NotZero(t, got[i])
+					if test.conf.MaxContentLengthMetrics != 0 {
+						require.True(t, int(test.conf.MaxContentLengthMetrics) > len(got[i].body))
+					}
+					if test.want.compressed {
+						validateCompressedContains(t, batch, got[i].body)
+					} else {
+						found := false
+
+						for _, expected := range batch {
+							if strings.Contains(string(got[i].body), expected) {
+								found = true
+								break
+							}
 						}
+						assert.True(t, found, "%s did not match any expected batch", string(got[i].body))
 					}
-					assert.True(t, found, "%s did not match any expected batch", string(got[i].body))
 				}
-			}
 
-			// ensure all events are sent out
-			for i := 1; i < test.metrics.MetricCount(); i++ {
-				eventFound := false
-				for _, batch := range got {
-					batchBody := batch.body
-					if test.want.compressed {
-						z, err := gzip.NewReader(bytes.NewReader(batchBody))
-						require.NoError(t, err)
-						batchBody, err = io.ReadAll(z)
-						z.Close()
-						require.NoError(t, err)
-					}
-					time := float64(i) + 0.001*float64(i)
-					if strings.Contains(string(batchBody), fmt.Sprintf(`"time":%g`, time)) {
-						if eventFound {
-							t.Errorf("metric event %d found in multiple batches", i)
+				if test.want.numBatches == 0 {
+					assert.Equal(t, 0, len(got))
+					return
+				}
+
+				// ensure all events are sent out
+				for i := 1; i < test.metrics.MetricCount(); i++ {
+					eventFound := false
+					for _, batch := range got {
+						batchBody := batch.body
+						if test.want.compressed {
+							z, err := gzip.NewReader(bytes.NewReader(batchBody))
+							require.NoError(t, err)
+							batchBody, err = io.ReadAll(z)
+							z.Close()
+							require.NoError(t, err)
+						}
+						time := float64(i) + 0.001*float64(i)
+						if strings.Contains(string(batchBody), fmt.Sprintf(`"time":%g`, time)) {
+							if eventFound {
+								t.Errorf("metric event %d found in multiple batches", i)
+							}
+							eventFound = true
 						}
-						eventFound = true
 					}
+					assert.Truef(t, eventFound, "metric event %d not found in any batch", i)
 				}
-				assert.Truef(t, eventFound, "metric event %d not found in any batch", i)
 			}
-		})
+		}
+		t.Run(test.name, testFn(false))
+		t.Run(test.name+"_MultiMetric", testFn(true))
+	}
 }
@@ -1198,6 +1230,23 @@ func Test_PushMetricsData_Histogram_NaN_Sum(t *testing.T) {
 	assert.NoError(t, permanentErrors)
 }
 
+func Test_PushMetricsData_Histogram_NaN_Sum_MultiMetric(t *testing.T) {
+	metrics := pmetric.NewMetrics()
+	rm := metrics.ResourceMetrics().AppendEmpty()
+	ilm := rm.ScopeMetrics().AppendEmpty()
+	histogram := ilm.Metrics().AppendEmpty()
+	histogram.SetName("histogram_with_empty_sum")
+	dp := histogram.SetEmptyHistogram().DataPoints().AppendEmpty()
+	dp.SetSum(math.NaN())
+	cfg := NewFactory().CreateDefaultConfig().(*Config)
+	cfg.UseMultiMetricFormat = true
+	c := newMetricsClient(exportertest.NewNopCreateSettings(), cfg)
+	c.hecWorker = &mockHecWorker{}
+
+	permanentErrors := c.pushMultiMetricsDataInBatches(context.Background(), metrics, map[string]string{})
+	assert.NoError(t, permanentErrors)
+}
+
 func Test_PushMetricsData_Summary_NaN_Sum(t *testing.T) {
 	metrics := pmetric.NewMetrics()
 	rm := metrics.ResourceMetrics().AppendEmpty()
@@ -1217,7 +1266,7 @@ func Test_PushMetricsData_Summary_NaN_Sum(t *testing.T) {
 func TestReceiveMetricsWithCompression(t *testing.T) {
 	cfg := NewFactory().CreateDefaultConfig().(*Config)
 	cfg.MaxContentLengthMetrics = 1800
-	request, err := runMetricsExport(cfg, createMetricsData(1, 100), 1, t)
+	request, err := runMetricsExport(cfg, createMetricsData(1, 100), 1, false, t)
 	assert.NoError(t, err)
 	assert.Equal(t, "gzip", request[0].headers.Get("Content-Encoding"))
 	assert.NotEqual(t, "", request)
@@ -1280,7 +1329,13 @@ func TestInvalidLogs(t *testing.T) {
 
 func TestInvalidMetrics(t *testing.T) {
 	cfg := NewFactory().CreateDefaultConfig().(*Config)
-	_, err := runMetricsExport(cfg, pmetric.NewMetrics(), 1, t)
+	_, err := runMetricsExport(cfg, pmetric.NewMetrics(), 1, false, t)
+	assert.Error(t, err)
+}
+
+func TestInvalidMetricsMultiMetric(t *testing.T) {
+	cfg := NewFactory().CreateDefaultConfig().(*Config)
+	_, err := runMetricsExport(cfg, pmetric.NewMetrics(), 1, true, t)
 	assert.Error(t, err)
 }
 
@@ -1609,6 +1664,148 @@ func benchPushLogData(b *testing.B, numResources int, numRecords int, bufSize ui
 	}
 }
 
+// 10 resources, 10 records, 1Kb max HEC batch: 17 HEC batches
+func Benchmark_pushMetricData_10_10_1024(b *testing.B) {
+	benchPushMetricData(b, 10, 10, 1024, false, false)
+}
+
+// 10 resources, 10 records, 8Kb max HEC batch: 2 HEC batches
+func Benchmark_pushMetricData_10_10_8K(b *testing.B) {
+	benchPushMetricData(b, 10, 10, 8*1024, false, false)
+}
+
+// 10 resources, 10 records, 2Mb max HEC batch: 1 HEC batch
+func Benchmark_pushMetricData_10_10_2M(b *testing.B) {
+	benchPushMetricData(b, 10, 10, 2*1024*1024, false, false)
+}
+
+// 10 resources, 200 records, 2Mb max HEC batch: 1 HEC batch
+func Benchmark_pushMetricData_10_200_2M(b *testing.B) {
+	benchPushMetricData(b, 10, 200, 2*1024*1024, false, false)
+}
+
+// 100 resources, 200 records, 2Mb max HEC batch: 2 HEC batches
+func Benchmark_pushMetricData_100_200_2M(b *testing.B) {
+	benchPushMetricData(b, 100, 200, 2*1024*1024, false, false)
+}
+
+// 100 resources, 200 records, 5Mb max HEC batch: 1 HEC batch
+func Benchmark_pushMetricData_100_200_5M(b *testing.B) {
+	benchPushMetricData(b, 100, 200, 5*1024*1024, false, false)
+}
+
+// 10 resources, 10 records, 1Kb max HEC batch: 2 HEC batches
+func Benchmark_pushMetricData_compressed_10_10_1024(b *testing.B) {
+	benchPushMetricData(b, 10, 10, 1024, true, false)
+}
+
+// 10 resources, 10 records, 8Kb max HEC batch: 1 HEC batch
+func Benchmark_pushMetricData_compressed_10_10_8K(b *testing.B) {
+	benchPushMetricData(b, 10, 10, 8*1024, true, false)
+}
+
+// 10 resources, 10 records, 2Mb max HEC batch: 1 HEC batch
+func Benchmark_pushMetricData_compressed_10_10_2M(b *testing.B) {
+	benchPushMetricData(b, 10, 10, 2*1024*1024, true, false)
+}
+
+// 10 resources, 200 records, 2Mb max HEC batch: 1 HEC batch
+func Benchmark_pushMetricData_compressed_10_200_2M(b *testing.B) {
+	benchPushMetricData(b, 10, 200, 2*1024*1024, true, false)
+}
+
+// 100 resources, 200 records, 2Mb max HEC batch: 1 HEC batch
+func Benchmark_pushMetricData_compressed_100_200_2M(b *testing.B) {
+	benchPushMetricData(b, 100, 200, 2*1024*1024, true, false)
+}
+
+// 100 resources, 200 records, 5Mb max HEC batch: 1 HEC batch
+func Benchmark_pushMetricData_compressed_100_200_5M(b *testing.B) {
+	benchPushMetricData(b, 100, 200, 5*1024*1024, true, false)
+}
+
+// 10 resources, 10 records, 1Kb max HEC batch: 17 HEC batches
+func Benchmark_pushMetricData_10_10_1024_MultiMetric(b *testing.B) {
+	benchPushMetricData(b, 10, 10, 1024, false, true)
+}
+
+// 10 resources, 10 records, 8Kb max HEC batch: 2 HEC batches
+func Benchmark_pushMetricData_10_10_8K_MultiMetric(b *testing.B) {
+	benchPushMetricData(b, 10, 10, 8*1024, false, true)
+}
+
+// 10 resources, 10 records, 2Mb max HEC batch: 1 HEC batch
+func Benchmark_pushMetricData_10_10_2M_MultiMetric(b *testing.B) {
+	benchPushMetricData(b, 10, 10, 2*1024*1024, false, true)
+}
+
+// 10 resources, 200 records, 2Mb max HEC batch: 1 HEC batch
+func Benchmark_pushMetricData_10_200_2M_MultiMetric(b *testing.B) {
+	benchPushMetricData(b, 10, 200, 2*1024*1024, false, true)
+}
+
+// 100 resources, 200 records, 2Mb max HEC batch: 2 HEC batches
+func Benchmark_pushMetricData_100_200_2M_MultiMetric(b *testing.B) {
+	benchPushMetricData(b, 100, 200, 2*1024*1024, false, true)
+}
+
+// 100 resources, 200 records, 5Mb max HEC batch: 1 HEC batch
+func Benchmark_pushMetricData_100_200_5M_MultiMetric(b *testing.B) {
+	benchPushMetricData(b, 100, 200, 5*1024*1024, false, true)
+}
+
+// 10 resources, 10 records, 1Kb max HEC batch: 2 HEC batches
+func Benchmark_pushMetricData_compressed_10_10_1024_MultiMetric(b *testing.B) {
+	benchPushMetricData(b, 10, 10, 1024, true, true)
+}
+
+// 10 resources, 10 records, 8Kb max HEC batch: 1 HEC batch
+func Benchmark_pushMetricData_compressed_10_10_8K_MultiMetric(b *testing.B) {
+	benchPushMetricData(b, 10, 10, 8*1024, true, true)
+}
+
+// 10 resources, 10 records, 2Mb max HEC batch: 1 HEC batch
+func Benchmark_pushMetricData_compressed_10_10_2M_MultiMetric(b *testing.B) {
+	benchPushMetricData(b, 10, 10, 2*1024*1024, true, true)
+}
+
+// 10 resources, 200 records, 2Mb max HEC batch: 1 HEC batch
+func Benchmark_pushMetricData_compressed_10_200_2M_MultiMetric(b *testing.B) {
+	benchPushMetricData(b, 10, 200, 2*1024*1024, true, true)
+}
+
+// 100 resources, 200 records, 2Mb max HEC batch: 1 HEC batch
+func Benchmark_pushMetricData_compressed_100_200_2M_MultiMetric(b *testing.B) {
+	benchPushMetricData(b, 100, 200, 2*1024*1024, true, true)
+}
+
+// 100 resources, 200 records, 5Mb max HEC batch: 1 HEC batch
+func Benchmark_pushMetricData_compressed_100_200_5M_MultiMetric(b *testing.B) {
+	benchPushMetricData(b, 100, 200, 5*1024*1024, true, true)
+}
+
+func benchPushMetricData(b *testing.B, numResources int, numRecords int, bufSize uint, compressionEnabled bool, useMultiMetricFormat bool) {
+	config := NewFactory().CreateDefaultConfig().(*Config)
+	config.MaxContentLengthMetrics = bufSize
+	config.DisableCompression = !compressionEnabled
+	config.UseMultiMetricFormat = useMultiMetricFormat
+	c := newMetricsClient(exportertest.NewNopCreateSettings(), config)
+	c.hecWorker = &mockHecWorker{}
+	exp, err := exporterhelper.NewMetricsExporter(context.Background(), exportertest.NewNopCreateSettings(), config,
+		c.pushMetricsData)
+	require.NoError(b, err)
+
+	metrics := createMetricsData(numResources, numRecords)
+
+	b.ReportAllocs()
+	b.ResetTimer()
+
+	for i := 0; i < b.N; i++ {
+		err := exp.ConsumeMetrics(context.Background(), metrics)
+		require.NoError(b, err)
+	}
+}
+
 func BenchmarkConsumeLogsRejected(b *testing.B) {
 	config := NewFactory().CreateDefaultConfig().(*Config)
 	config.DisableCompression = true
diff --git a/exporter/splunkhecexporter/metricdata_to_splunk.go b/exporter/splunkhecexporter/metricdata_to_splunk.go
index 09b557107dcf..01bf25e9b274 100644
--- a/exporter/splunkhecexporter/metricdata_to_splunk.go
+++ b/exporter/splunkhecexporter/metricdata_to_splunk.go
@@ -278,10 +278,11 @@ func mergeEventsToMultiMetricFormat(events []*splunk.Event) ([]*splunk.Event, er
 	hashes := map[uint32]*splunk.Event{}
 	hasher := fnv.New32a()
 	var merged []*splunk.Event
+	marshaler := jsoniter.ConfigCompatibleWithStandardLibrary
 
 	for _, e := range events {
 		cloned := copyEventWithoutValues(e)
-		marshaler := jsoniter.ConfigCompatibleWithStandardLibrary
+
 		data, err := marshaler.Marshal(cloned)
 		if err != nil {
 			return nil, err
diff --git a/exporter/splunkhecexporter/testdata/hec_metric_event.json b/exporter/splunkhecexporter/testdata/hec_metric_event.json
index 20858fa9fe33..d9b0a0652a6c 100644
--- a/exporter/splunkhecexporter/testdata/hec_metric_event.json
+++ b/exporter/splunkhecexporter/testdata/hec_metric_event.json
@@ -5,7 +5,7 @@
   "k/n1": "vn1",
   "k/r0": "vr0",
   "k/r1": "vr1",
-  "metric_name:gauge_double_with_dims": 1234.5678,
+  "metric_name:gauge_double_with_dims_0": 1234.5678,
   "metric_type": "Gauge",
   "k0": "v0",
   "k1": "v1",
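For context on the merge step this PR moves out of the per-metric loop: mergeEventsToMultiMetricFormat (see the metricdata_to_splunk.go hunk above) groups events by an FNV-1a hash of each event serialized without its metric values, so events sharing identical dimensions collapse into one multi-metric event. The sketch below illustrates that grouping technique only; the Event shape, copyEventWithoutValues, and mergeEvents here are simplified stand-ins, not the exporter's actual splunk.Event definitions.

package main

import (
	"encoding/json"
	"fmt"
	"hash/fnv"
	"strings"
)

// Event is a simplified stand-in for splunk.Event: Fields carries both
// dimensions (plain keys) and metric values ("metric_name:<name>" keys).
type Event struct {
	Time   float64        `json:"time"`
	Host   string         `json:"host"`
	Fields map[string]any `json:"fields"`
}

// copyEventWithoutValues clones an event minus its metric values, leaving
// only the dimensions that identify a merge bucket.
func copyEventWithoutValues(e *Event) *Event {
	c := &Event{Time: e.Time, Host: e.Host, Fields: map[string]any{}}
	for k, v := range e.Fields {
		if !strings.HasPrefix(k, "metric_name:") {
			c.Fields[k] = v
		}
	}
	return c
}

// mergeEvents folds events with identical dimensions into one multi-metric
// event, keyed by an FNV-1a hash of the serialized dimension-only clone.
// encoding/json sorts map keys, so the serialization is deterministic.
func mergeEvents(events []*Event) ([]*Event, error) {
	buckets := map[uint32]*Event{}
	hasher := fnv.New32a()
	var merged []*Event
	for _, e := range events {
		data, err := json.Marshal(copyEventWithoutValues(e))
		if err != nil {
			return nil, err
		}
		hasher.Reset()
		_, _ = hasher.Write(data)
		key := hasher.Sum32()
		target, ok := buckets[key]
		if !ok {
			target = copyEventWithoutValues(e)
			buckets[key] = target
			merged = append(merged, target)
		}
		for k, v := range e.Fields {
			if strings.HasPrefix(k, "metric_name:") {
				target.Fields[k] = v // fold this metric's value into the shared event
			}
		}
	}
	return merged, nil
}

func main() {
	events := []*Event{
		{Time: 1, Host: "h", Fields: map[string]any{"k0": "v0", "metric_name:cpu": 0.5}},
		{Time: 1, Host: "h", Fields: map[string]any{"k0": "v0", "metric_name:mem": 0.7}},
	}
	merged, _ := mergeEvents(events)
	fmt.Println(len(merged)) // 1: both values collapse into one multi-metric event
}

Running the merge once over the whole batch in pushMultiMetricsDataInBatches, rather than per metric inside fillMetricsBuffer, is what lets events from different metrics and resources share a bucket, which is the behavior the changelog entry describes.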