From 34a5a2735921e0c0b348cd246df9569c3b8abf1e Mon Sep 17 00:00:00 2001
From: Vishal Raj
Date: Thu, 16 May 2024 12:18:09 +0100
Subject: [PATCH] [chore][exporter/elasticsearch] Add benchmark for logs consumer (#33035)

**Description:** Add a benchmark for the Elasticsearch exporter's `LogsConsumer`.

**Link to tracking Issue:** [32504](https://github.com/open-telemetry/opentelemetry-collector-contrib/issues/32504)

**Testing:** `cd exporter/elasticsearchexporter/integrationtest && go test -bench=BenchmarkLogsExporter -run=^$ ./...`

**Documentation:** N/A
---
 .../integrationtest/datareceiver.go           | 18 +++--
 .../integrationtest/exporter_bench_test.go    | 80 +++++++++++++++++++
 2 files changed, 90 insertions(+), 8 deletions(-)
 create mode 100644 exporter/elasticsearchexporter/integrationtest/exporter_bench_test.go

diff --git a/exporter/elasticsearchexporter/integrationtest/datareceiver.go b/exporter/elasticsearchexporter/integrationtest/datareceiver.go
index c35267948f3f..c5dcda0c3cac 100644
--- a/exporter/elasticsearchexporter/integrationtest/datareceiver.go
+++ b/exporter/elasticsearchexporter/integrationtest/datareceiver.go
@@ -33,7 +33,7 @@ type esDataReceiver struct {
 	endpoint string
 }
 
-func newElasticsearchDataReceiver(t testing.TB) testbed.DataReceiver {
+func newElasticsearchDataReceiver(t testing.TB) *esDataReceiver {
 	return &esDataReceiver{
 		DataReceiverBase: testbed.DataReceiverBase{},
 		endpoint:         fmt.Sprintf("http://%s:%d", testbed.DefaultHost, testutil.GetAvailablePort(t)),
@@ -113,6 +113,11 @@ type mockESReceiver struct {
 }
 
 func newMockESReceiver(params receiver.CreateSettings, cfg *config, next consumer.Logs) (receiver.Logs, error) {
+	emptyLogs := plog.NewLogs()
+	emptyLogs.ResourceLogs().AppendEmpty().
+		ScopeLogs().AppendEmpty().
+		LogRecords().AppendEmpty()
+
 	r := mux.NewRouter()
 	r.Use(func(next http.Handler) http.Handler {
 		return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
@@ -129,13 +134,10 @@ func newMockESReceiver(params receiver.CreateSettings, cfg *config, next consume
 			for k, item := range itemMap {
 				// Ideally bulk request should be converted to log record
 				// however, since we only assert count for now there is no
-				// need to do the actual translation.
-				logs := plog.NewLogs()
-				logs.ResourceLogs().AppendEmpty().
-					ScopeLogs().AppendEmpty().
-					LogRecords().AppendEmpty()
-
-				if err := next.ConsumeLogs(context.Background(), logs); err != nil {
+				// need to do the actual translation. We use a pre-initialized
+				// empty plog.Logs to reduce allocation impact on tests and
+				// benchmarks due to this.
+				if err := next.ConsumeLogs(context.Background(), emptyLogs); err != nil {
					response.HasErrors = true
					item.Status = http.StatusTooManyRequests
					item.Error.Type = "simulated_es_error"
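A note on the `datareceiver.go` hunk above: the mock Elasticsearch receiver now builds its placeholder `plog.Logs` once in `newMockESReceiver` and hands the same value to the next consumer for every decoded bulk item, instead of allocating a fresh one per item. That keeps receiver-side allocations from skewing the numbers produced by the benchmark added below. The sketch that follows only illustrates that trade-off and is not part of the patch; the package and benchmark names are invented, but the `plog` calls mirror the ones in the diff.

```go
package scratch // hypothetical package; save as e.g. placeholder_bench_test.go

import (
	"testing"

	"go.opentelemetry.io/collector/pdata/plog"
)

// newPlaceholder builds a plog.Logs carrying a single empty log record,
// which is what the mock receiver forwards for every bulk item.
func newPlaceholder() plog.Logs {
	ld := plog.NewLogs()
	ld.ResourceLogs().AppendEmpty().
		ScopeLogs().AppendEmpty().
		LogRecords().AppendEmpty()
	return ld
}

// BenchmarkPerItemPlaceholder mimics the old behaviour: a fresh placeholder
// is allocated on every iteration (the ConsumeLogs call itself is elided).
func BenchmarkPerItemPlaceholder(b *testing.B) {
	b.ReportAllocs()
	for i := 0; i < b.N; i++ {
		ld := newPlaceholder()
		_ = ld.LogRecordCount()
	}
}

// BenchmarkReusedPlaceholder mimics the new behaviour: one placeholder is
// built up front and reused, so the loop body allocates nothing.
func BenchmarkReusedPlaceholder(b *testing.B) {
	ld := newPlaceholder()
	b.ReportAllocs()
	for i := 0; i < b.N; i++ {
		_ = ld.LogRecordCount()
	}
}
```

Running the pair with `-benchmem` should show the reused variant reporting zero allocations per operation, which is the effect the receiver change is after.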
diff --git a/exporter/elasticsearchexporter/integrationtest/exporter_bench_test.go b/exporter/elasticsearchexporter/integrationtest/exporter_bench_test.go
new file mode 100644
index 000000000000..7350c44add69
--- /dev/null
+++ b/exporter/elasticsearchexporter/integrationtest/exporter_bench_test.go
@@ -0,0 +1,80 @@
+// Copyright The OpenTelemetry Authors
+// SPDX-License-Identifier: Apache-2.0
+
+package integrationtest
+
+import (
+	"context"
+	"sync/atomic"
+	"testing"
+	"time"
+
+	"github.com/stretchr/testify/require"
+	"go.opentelemetry.io/collector/consumer"
+	"go.opentelemetry.io/collector/exporter/exportertest"
+	"go.opentelemetry.io/collector/pdata/plog"
+
+	"github.com/open-telemetry/opentelemetry-collector-contrib/exporter/elasticsearchexporter"
+	"github.com/open-telemetry/opentelemetry-collector-contrib/testbed/testbed"
+)
+
+func BenchmarkLogsExporter(b *testing.B) {
+	for _, tc := range []struct {
+		name      string
+		batchSize int
+	}{
+		{name: "small_batch", batchSize: 10},
+		{name: "medium_batch", batchSize: 100},
+		{name: "large_batch", batchSize: 1000},
+		{name: "xlarge_batch", batchSize: 10000},
+	} {
+		b.Run(tc.name, func(b *testing.B) {
+			benchmarkLogs(b, tc.batchSize)
+		})
+	}
+}
+
+func benchmarkLogs(b *testing.B, batchSize int) {
+	var generatedCount, observedCount atomic.Uint64
+
+	receiver := newElasticsearchDataReceiver(b)
+	factory := elasticsearchexporter.NewFactory()
+
+	cfg := factory.CreateDefaultConfig().(*elasticsearchexporter.Config)
+	cfg.Endpoints = []string{receiver.endpoint}
+	cfg.Flush.Interval = 10 * time.Millisecond
+	cfg.NumWorkers = 1
+
+	exporter, err := factory.CreateLogsExporter(
+		context.Background(),
+		exportertest.NewNopCreateSettings(),
+		cfg,
+	)
+	require.NoError(b, err)
+
+	provider := testbed.NewPerfTestDataProvider(testbed.LoadOptions{ItemsPerBatch: batchSize})
+	provider.SetLoadGeneratorCounters(&generatedCount)
+
+	ctx, cancel := context.WithCancel(context.Background())
+	defer cancel()
+
+	logsConsumer, err := consumer.NewLogs(func(_ context.Context, ld plog.Logs) error {
+		observedCount.Add(uint64(ld.LogRecordCount()))
+		return nil
+	})
+	require.NoError(b, err)
+
+	require.NoError(b, receiver.Start(nil, nil, logsConsumer))
+	defer func() { require.NoError(b, receiver.Stop()) }()
+
+	b.ReportAllocs()
+	b.ResetTimer()
+	for i := 0; i < b.N; i++ {
+		b.StopTimer()
+		logs, _ := provider.GenerateLogs()
+		b.StartTimer()
+		require.NoError(b, exporter.ConsumeLogs(ctx, logs))
+	}
+	require.NoError(b, exporter.Shutdown(ctx))
+	require.Equal(b, generatedCount.Load(), observedCount.Load(), "failed to send all logs to backend")
+}
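As a usage note beyond the **Testing** command in the description: the standard Go benchmarking flags work with this benchmark unchanged if reviewers want allocation figures or a before/after comparison. The invocation below is only a suggestion and is not part of the patch; `benchstat` comes from `golang.org/x/perf` and is optional.

```sh
cd exporter/elasticsearchexporter/integrationtest

# Include B/op and allocs/op, and repeat each case so run-to-run variance
# can be judged.
go test -bench=BenchmarkLogsExporter -run=^$ -benchmem -count=6 ./... | tee new.txt

# Optional comparison against a baseline captured on main:
#   go install golang.org/x/perf/cmd/benchstat@latest
#   benchstat old.txt new.txt
```

Because payload generation happens between `b.StopTimer()` and `b.StartTimer()`, the measured loop times only the `ConsumeLogs` calls, and the final `require.Equal` after `Shutdown` asserts that every generated record reached the mock receiver.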