Skip to content

Commit

Permalink
[chore][exporter/elasticsearch] Add benchmark for logs consumer (#33035)
Browse files Browse the repository at this point in the history
**Description:** Add a benchmark for the Elasticsearch exporter's `LogsConsumer`.

**Link to tracking Issue:** [#32504](#32504)

**Testing:** Run the new benchmark with `cd exporter/elasticsearchexporter/integrationtest && go test
-bench=BenchmarkLogsExporter -run=^$ ./...`

**Documentation:** N/A
  • Loading branch information
lahsivjar authored May 16, 2024
1 parent 1863a59 commit 34a5a27
Show file tree
Hide file tree
Showing 2 changed files with 90 additions and 8 deletions.
18 changes: 10 additions & 8 deletions exporter/elasticsearchexporter/integrationtest/datareceiver.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ type esDataReceiver struct {
endpoint string
}

func newElasticsearchDataReceiver(t testing.TB) testbed.DataReceiver {
func newElasticsearchDataReceiver(t testing.TB) *esDataReceiver {
return &esDataReceiver{
DataReceiverBase: testbed.DataReceiverBase{},
endpoint: fmt.Sprintf("http://%s:%d", testbed.DefaultHost, testutil.GetAvailablePort(t)),
Expand Down Expand Up @@ -113,6 +113,11 @@ type mockESReceiver struct {
}

func newMockESReceiver(params receiver.CreateSettings, cfg *config, next consumer.Logs) (receiver.Logs, error) {
emptyLogs := plog.NewLogs()
emptyLogs.ResourceLogs().AppendEmpty().
ScopeLogs().AppendEmpty().
LogRecords().AppendEmpty()

r := mux.NewRouter()
r.Use(func(next http.Handler) http.Handler {
return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
Expand All @@ -129,13 +134,10 @@ func newMockESReceiver(params receiver.CreateSettings, cfg *config, next consume
for k, item := range itemMap {
// Ideally bulk request should be converted to log record
// however, since we only assert count for now there is no
// need to do the actual translation.
logs := plog.NewLogs()
logs.ResourceLogs().AppendEmpty().
ScopeLogs().AppendEmpty().
LogRecords().AppendEmpty()

if err := next.ConsumeLogs(context.Background(), logs); err != nil {
// need to do the actual translation. We use a pre-initialized
// empty plog.Logs to reduce allocation impact on tests and
// benchmarks due to this.
if err := next.ConsumeLogs(context.Background(), emptyLogs); err != nil {
response.HasErrors = true
item.Status = http.StatusTooManyRequests
item.Error.Type = "simulated_es_error"
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,80 @@
// Copyright The OpenTelemetry Authors
// SPDX-License-Identifier: Apache-2.0

package integrationtest

import (
"context"
"sync/atomic"
"testing"
"time"

"github.com/stretchr/testify/require"
"go.opentelemetry.io/collector/consumer"
"go.opentelemetry.io/collector/exporter/exportertest"
"go.opentelemetry.io/collector/pdata/plog"

"github.com/open-telemetry/opentelemetry-collector-contrib/exporter/elasticsearchexporter"
"github.com/open-telemetry/opentelemetry-collector-contrib/testbed/testbed"
)

// BenchmarkLogsExporter benchmarks the Elasticsearch logs exporter
// end-to-end for a range of batch sizes.
func BenchmarkLogsExporter(b *testing.B) {
	cases := []struct {
		name      string
		batchSize int
	}{
		{name: "small_batch", batchSize: 10},
		{name: "medium_batch", batchSize: 100},
		{name: "large_batch", batchSize: 1000},
		{name: "xlarge_batch", batchSize: 10000},
	}
	for _, bc := range cases {
		b.Run(bc.name, func(b *testing.B) {
			benchmarkLogs(b, bc.batchSize)
		})
	}
}

// benchmarkLogs runs one end-to-end benchmark of the Elasticsearch logs
// exporter: batches of batchSize log records are generated by the testbed
// provider, pushed through the exporter, and counted by a mock Elasticsearch
// backend. The generated and observed counts must match at the end.
func benchmarkLogs(b *testing.B, batchSize int) {
	// Counters used to verify that no log records are lost in flight.
	var sent, received atomic.Uint64

	backend := newElasticsearchDataReceiver(b)
	exporterFactory := elasticsearchexporter.NewFactory()

	cfg := exporterFactory.CreateDefaultConfig().(*elasticsearchexporter.Config)
	cfg.Endpoints = []string{backend.endpoint}
	cfg.Flush.Interval = 10 * time.Millisecond
	cfg.NumWorkers = 1

	exporter, err := exporterFactory.CreateLogsExporter(
		context.Background(),
		exportertest.NewNopCreateSettings(),
		cfg,
	)
	require.NoError(b, err)

	dataProvider := testbed.NewPerfTestDataProvider(testbed.LoadOptions{ItemsPerBatch: batchSize})
	dataProvider.SetLoadGeneratorCounters(&sent)

	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()

	// Consumer wired into the mock backend; it only counts received records.
	countingConsumer, err := consumer.NewLogs(func(_ context.Context, ld plog.Logs) error {
		received.Add(uint64(ld.LogRecordCount()))
		return nil
	})
	require.NoError(b, err)

	require.NoError(b, backend.Start(nil, nil, countingConsumer))
	defer func() { require.NoError(b, backend.Stop()) }()

	b.ReportAllocs()
	b.ResetTimer()
	for i := 0; i < b.N; i++ {
		// Keep test-data generation out of the timed region.
		b.StopTimer()
		logs, _ := dataProvider.GenerateLogs()
		b.StartTimer()
		require.NoError(b, exporter.ConsumeLogs(ctx, logs))
	}
	// Shutdown flushes anything still buffered before the counters are compared.
	require.NoError(b, exporter.Shutdown(ctx))
	require.Equal(b, sent.Load(), received.Load(), "failed to send all logs to backend")
}

0 comments on commit 34a5a27

Please sign in to comment.