From 40cd5f8f16241bf5c5b519d328f988add06e5652 Mon Sep 17 00:00:00 2001
From: Vishal Raj
Date: Thu, 16 May 2024 12:35:17 +0100
Subject: [PATCH 1/8] [chore][exporter/elasticsearch] Add benchmark for ConsumeTraces

- Updates the mock Elasticsearch data receiver to handle traces.
- Adds an extra metric to report events per second.
---
 .../integrationtest/datareceiver.go           | 137 +++++++++++----
 .../integrationtest/exporter_bench_test.go    | 161 +++++++++++++-----
 .../integrationtest/exporter_test.go          |   9 +-
 .../integrationtest/validator.go              |  36 ++++
 4 files changed, 258 insertions(+), 85 deletions(-)
 create mode 100644 exporter/elasticsearchexporter/integrationtest/validator.go

diff --git a/exporter/elasticsearchexporter/integrationtest/datareceiver.go b/exporter/elasticsearchexporter/integrationtest/datareceiver.go
index c5dcda0c3cac..85ba6fa9e84b 100644
--- a/exporter/elasticsearchexporter/integrationtest/datareceiver.go
+++ b/exporter/elasticsearchexporter/integrationtest/datareceiver.go
@@ -11,22 +11,36 @@ import (
 	"net/http"
 	"net/url"
 	"testing"
-	"time"
 
 	"github.com/elastic/go-docappender/v2/docappendertest"
 	"github.com/gorilla/mux"
 	"go.opentelemetry.io/collector/component"
 	"go.opentelemetry.io/collector/component/componenttest"
+	"go.opentelemetry.io/collector/config/confighttp"
 	"go.opentelemetry.io/collector/consumer"
 	"go.opentelemetry.io/collector/pdata/plog"
+	"go.opentelemetry.io/collector/pdata/ptrace"
 	"go.opentelemetry.io/collector/receiver"
 	"go.opentelemetry.io/collector/receiver/receivertest"
 	"go.uber.org/zap"
 
 	"github.com/open-telemetry/opentelemetry-collector-contrib/internal/common/testutil"
+	"github.com/open-telemetry/opentelemetry-collector-contrib/internal/sharedcomponent"
 	"github.com/open-telemetry/opentelemetry-collector-contrib/testbed/testbed"
 )
 
+const (
+	// TestLogsIndex is used by the mock ES data receiver to identify log events.
+	// The exporter's LogsIndex configuration must be set to TestLogsIndex for
+	// the data receiver to work properly.
+	TestLogsIndex = "logs-test-idx"
+
+	// TestTracesIndex is used by the mock ES data receiver to identify trace
+	// events. The exporter's TracesIndex configuration must be set to
+	// TestTracesIndex for the data receiver to work properly.
+	TestTracesIndex = "traces-test-idx"
+)
+
 type esDataReceiver struct {
 	testbed.DataReceiverBase
 	receiver receiver.Logs
@@ -40,22 +54,30 @@ func newElasticsearchDataReceiver(t testing.TB) *esDataReceiver {
 	}
 }
 
-func (es *esDataReceiver) Start(_ consumer.Traces, _ consumer.Metrics, lc consumer.Logs) error {
+func (es *esDataReceiver) Start(tc consumer.Traces, _ consumer.Metrics, lc consumer.Logs) error {
 	factory := receiver.NewFactory(
 		component.MustNewType("mockelasticsearch"),
 		createDefaultConfig,
 		receiver.WithLogs(createLogsReceiver, component.StabilityLevelDevelopment),
+		receiver.WithTraces(createTracesReceiver, component.StabilityLevelDevelopment),
 	)
+	esURL, err := url.Parse(es.endpoint)
+	if err != nil {
+		return fmt.Errorf("invalid ES URL specified %s: %w", es.endpoint, err)
+	}
 	cfg := factory.CreateDefaultConfig().(*config)
-	cfg.ESEndpoint = es.endpoint
+	cfg.ServerConfig.Endpoint = esURL.Host
 
-	var err error
 	set := receivertest.NewNopCreateSettings()
 	// Use an actual logger to log errors.
set.Logger = zap.Must(zap.NewDevelopment()) es.receiver, err = factory.CreateLogsReceiver(context.Background(), set, cfg, lc) if err != nil { - return err + return fmt.Errorf("failed to create logs receiver: %w", err) + } + es.receiver, err = factory.CreateTracesReceiver(context.Background(), set, cfg, tc) + if err != nil { + return fmt.Errorf("failed to create traces receiver: %w", err) } return es.receiver.Start(context.Background(), componenttest.NewNopHost()) } @@ -72,6 +94,8 @@ func (es *esDataReceiver) GenConfigYAMLStr() string { cfgFormat := ` elasticsearch: endpoints: [%s] + logs_index: %s + traces_index: %s flush: interval: 1s sending_queue: @@ -80,7 +104,7 @@ func (es *esDataReceiver) GenConfigYAMLStr() string { enabled: true max_requests: 10000 ` - return fmt.Sprintf(cfgFormat, es.endpoint) + return fmt.Sprintf(cfgFormat, es.endpoint, TestLogsIndex, TestTracesIndex) } func (es *esDataReceiver) ProtocolName() string { @@ -88,12 +112,14 @@ func (es *esDataReceiver) ProtocolName() string { } type config struct { - ESEndpoint string + confighttp.ServerConfig } func createDefaultConfig() component.Config { return &config{ - ESEndpoint: "127.0.0.1:9200", + ServerConfig: confighttp.ServerConfig{ + Endpoint: "127.0.0.1:9200", + }, } } @@ -103,20 +129,62 @@ func createLogsReceiver( rawCfg component.Config, next consumer.Logs, ) (receiver.Logs, error) { - cfg := rawCfg.(*config) - return newMockESReceiver(params, cfg, next) + receiver := receivers.GetOrAdd(rawCfg, func() component.Component { + return newMockESReceiver(params, rawCfg.(*config)) + }) + receiver.Unwrap().(*mockESReceiver).logsConsumer = next + return receiver, nil +} + +func createTracesReceiver( + _ context.Context, + params receiver.CreateSettings, + rawCfg component.Config, + next consumer.Traces, +) (receiver.Traces, error) { + receiver := receivers.GetOrAdd(rawCfg, func() component.Component { + return newMockESReceiver(params, rawCfg.(*config)) + }) + receiver.Unwrap().(*mockESReceiver).tracesConsumer = next + return receiver, nil } type mockESReceiver struct { - server *http.Server params receiver.CreateSettings + config *config + + tracesConsumer consumer.Traces + logsConsumer consumer.Logs + + server *http.Server } -func newMockESReceiver(params receiver.CreateSettings, cfg *config, next consumer.Logs) (receiver.Logs, error) { +func newMockESReceiver(params receiver.CreateSettings, cfg *config) receiver.Logs { + return &mockESReceiver{ + params: params, + config: cfg, + } +} + +func (es *mockESReceiver) Start(ctx context.Context, host component.Host) error { + if es.server != nil { + return nil + } + + ln, err := es.config.ToListener(ctx) + if err != nil { + return fmt.Errorf("failed to bind to address %s: %w", es.config.Endpoint, err) + } + + // Ideally bulk request items should be converted to the corresponding event record + // however, since we only assert count for now there is no need to do the actual + // translation. Instead we use a pre-initialized empty logs and traces model to + // reduce allocation impact on tests and benchmarks. emptyLogs := plog.NewLogs() - emptyLogs.ResourceLogs().AppendEmpty(). - ScopeLogs().AppendEmpty(). 
- LogRecords().AppendEmpty() + emptyLogs.ResourceLogs().AppendEmpty().ScopeLogs().AppendEmpty().LogRecords().AppendEmpty() + emptyTrace := ptrace.NewTraces() + emptyTrace.ResourceSpans().AppendEmpty().ScopeSpans().AppendEmpty().Spans().AppendEmpty() + emptyTrace.SpanCount() r := mux.NewRouter() r.Use(func(next http.Handler) http.Handler { @@ -132,12 +200,14 @@ func newMockESReceiver(params receiver.CreateSettings, cfg *config, next consume _, response := docappendertest.DecodeBulkRequest(r) for _, itemMap := range response.Items { for k, item := range itemMap { - // Ideally bulk request should be converted to log record - // however, since we only assert count for now there is no - // need to do the actual translation. We use a pre-initialized - // empty plog.Logs to reduce allocation impact on tests and - // benchmarks due to this. - if err := next.ConsumeLogs(context.Background(), emptyLogs); err != nil { + var err error + switch item.Index { + case TestLogsIndex: + err = es.logsConsumer.ConsumeLogs(context.Background(), emptyLogs) + case TestTracesIndex: + err = es.tracesConsumer.ConsumeTraces(context.Background(), emptyTrace) + } + if err != nil { response.HasErrors = true item.Status = http.StatusTooManyRequests item.Error.Type = "simulated_es_error" @@ -151,29 +221,26 @@ func newMockESReceiver(params receiver.CreateSettings, cfg *config, next consume } }) - esURL, err := url.Parse(cfg.ESEndpoint) + es.server, err = es.config.ToServer(ctx, host, es.params.TelemetrySettings, r) if err != nil { - return nil, fmt.Errorf("failed to parse Elasticsearch endpoint: %w", err) + return fmt.Errorf("failed to create mock ES server: %w", err) } - return &mockESReceiver{ - server: &http.Server{ - Addr: esURL.Host, - Handler: r, - ReadHeaderTimeout: 20 * time.Second, - }, - params: params, - }, nil -} -func (es *mockESReceiver) Start(_ context.Context, _ component.Host) error { go func() { - if err := es.server.ListenAndServe(); err != nil && !errors.Is(err, http.ErrServerClosed) { - es.params.Logger.Error("failed while running mock ES receiver", zap.Error(err)) + if err := es.server.Serve(ln); err != nil && !errors.Is(err, http.ErrServerClosed) { + es.params.ReportStatus(component.NewFatalErrorEvent(err)) } }() return nil } func (es *mockESReceiver) Shutdown(ctx context.Context) error { + if es.server == nil { + return nil + } return es.server.Shutdown(ctx) } + +// mockESReceiver serves both, traces and logs. Shared component allows for a single +// instance of mockESReceiver to serve all supported event types. 
+var receivers = sharedcomponent.NewSharedComponents() diff --git a/exporter/elasticsearchexporter/integrationtest/exporter_bench_test.go b/exporter/elasticsearchexporter/integrationtest/exporter_bench_test.go index 7350c44add69..26943a538340 100644 --- a/exporter/elasticsearchexporter/integrationtest/exporter_bench_test.go +++ b/exporter/elasticsearchexporter/integrationtest/exporter_bench_test.go @@ -5,76 +5,153 @@ package integrationtest import ( "context" + "fmt" "sync/atomic" "testing" "time" + "github.com/open-telemetry/opentelemetry-collector-contrib/exporter/elasticsearchexporter" + "github.com/open-telemetry/opentelemetry-collector-contrib/testbed/testbed" "github.com/stretchr/testify/require" "go.opentelemetry.io/collector/consumer" + "go.opentelemetry.io/collector/exporter" "go.opentelemetry.io/collector/exporter/exportertest" "go.opentelemetry.io/collector/pdata/plog" - - "github.com/open-telemetry/opentelemetry-collector-contrib/exporter/elasticsearchexporter" - "github.com/open-telemetry/opentelemetry-collector-contrib/testbed/testbed" + "go.opentelemetry.io/collector/pdata/pmetric" + "go.opentelemetry.io/collector/pdata/ptrace" ) -func BenchmarkLogsExporter(b *testing.B) { - for _, tc := range []struct { - name string - batchSize int - }{ - {name: "small_batch", batchSize: 10}, - {name: "medium_batch", batchSize: 100}, - {name: "large_batch", batchSize: 1000}, - {name: "xlarge_batch", batchSize: 10000}, - } { - b.Run(tc.name, func(b *testing.B) { - benchmarkLogs(b, tc.batchSize) - }) +func BenchmarkExporter(b *testing.B) { + for _, eventType := range []string{"logs", "traces"} { + for _, tc := range []struct { + name string + batchSize int + }{ + {name: "small_batch", batchSize: 10}, + {name: "medium_batch", batchSize: 100}, + {name: "large_batch", batchSize: 1000}, + {name: "xlarge_batch", batchSize: 10000}, + } { + b.Run(fmt.Sprintf("%s/%s", eventType, tc.name), func(b *testing.B) { + switch eventType { + case "logs": + benchmarkLogs(b, tc.batchSize) + case "traces": + benchmarkTraces(b, tc.batchSize) + } + }) + } } } func benchmarkLogs(b *testing.B, batchSize int) { - var generatedCount, observedCount atomic.Uint64 - - receiver := newElasticsearchDataReceiver(b) - factory := elasticsearchexporter.NewFactory() - - cfg := factory.CreateDefaultConfig().(*elasticsearchexporter.Config) - cfg.Endpoints = []string{receiver.endpoint} - cfg.Flush.Interval = 10 * time.Millisecond - cfg.NumWorkers = 1 + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() - exporter, err := factory.CreateLogsExporter( - context.Background(), - exportertest.NewNopCreateSettings(), - cfg, + runnerCfg := prepareBenchmark(b, batchSize) + exporter, err := runnerCfg.factory.CreateLogsExporter( + ctx, exportertest.NewNopCreateSettings(), runnerCfg.esCfg, ) require.NoError(b, err) - provider := testbed.NewPerfTestDataProvider(testbed.LoadOptions{ItemsPerBatch: batchSize}) - provider.SetLoadGeneratorCounters(&generatedCount) + b.ReportAllocs() + b.ResetTimer() + b.StopTimer() + for i := 0; i < b.N; i++ { + logs, _ := runnerCfg.provider.GenerateLogs() + b.StartTimer() + exporter.ConsumeLogs(ctx, logs) + b.StopTimer() + } + b.ReportMetric( + float64(runnerCfg.observedCount.Load())/b.Elapsed().Seconds(), + "events/s", + ) + require.NoError(b, exporter.Shutdown(ctx)) + require.Equal(b, + runnerCfg.generatedCount.Load(), + runnerCfg.observedCount.Load(), + "failed to send all logs to backend", + ) +} +func benchmarkTraces(b *testing.B, batchSize int) { ctx, cancel := 
context.WithCancel(context.Background()) defer cancel() - logsConsumer, err := consumer.NewLogs(func(_ context.Context, ld plog.Logs) error { - observedCount.Add(uint64(ld.LogRecordCount())) - return nil - }) + runnerCfg := prepareBenchmark(b, batchSize) + exporter, err := runnerCfg.factory.CreateTracesExporter( + ctx, exportertest.NewNopCreateSettings(), runnerCfg.esCfg, + ) require.NoError(b, err) - require.NoError(b, receiver.Start(nil, nil, logsConsumer)) - defer func() { require.NoError(b, receiver.Stop()) }() - b.ReportAllocs() b.ResetTimer() + b.StopTimer() for i := 0; i < b.N; i++ { - b.StopTimer() - logs, _ := provider.GenerateLogs() + traces, _ := runnerCfg.provider.GenerateTraces() b.StartTimer() - require.NoError(b, exporter.ConsumeLogs(ctx, logs)) + exporter.ConsumeTraces(ctx, traces) + b.StopTimer() } + b.ReportMetric( + float64(runnerCfg.observedCount.Load())/b.Elapsed().Seconds(), + "events/s", + ) require.NoError(b, exporter.Shutdown(ctx)) - require.Equal(b, generatedCount.Load(), observedCount.Load(), "failed to send all logs to backend") + require.Equal(b, + runnerCfg.generatedCount.Load(), + runnerCfg.observedCount.Load(), + "failed to send all traces to backend", + ) +} + +type benchRunnerCfg struct { + factory exporter.Factory + provider testbed.DataProvider + esCfg *elasticsearchexporter.Config + + generatedCount atomic.Uint64 + observedCount atomic.Uint64 +} + +func prepareBenchmark( + b *testing.B, + batchSize int, +) *benchRunnerCfg { + b.Helper() + + cfg := &benchRunnerCfg{} + receiver := newElasticsearchDataReceiver(b) + cfg.provider = testbed.NewPerfTestDataProvider(testbed.LoadOptions{ItemsPerBatch: batchSize}) + cfg.provider.SetLoadGeneratorCounters(&cfg.generatedCount) + + cfg.factory = elasticsearchexporter.NewFactory() + cfg.esCfg = cfg.factory.CreateDefaultConfig().(*elasticsearchexporter.Config) + cfg.esCfg.Endpoints = []string{receiver.endpoint} + cfg.esCfg.LogsIndex = TestLogsIndex + cfg.esCfg.TracesIndex = TestTracesIndex + cfg.esCfg.Flush.Interval = 10 * time.Millisecond + cfg.esCfg.NumWorkers = 1 + + tc, err := consumer.NewTraces(func(_ context.Context, td ptrace.Traces) error { + cfg.observedCount.Add(uint64(td.SpanCount())) + return nil + }) + require.NoError(b, err) + mc, err := consumer.NewMetrics(func(_ context.Context, md pmetric.Metrics) error { + cfg.observedCount.Add(uint64(md.MetricCount())) + return nil + }) + require.NoError(b, err) + lc, err := consumer.NewLogs(func(_ context.Context, ld plog.Logs) error { + cfg.observedCount.Add(uint64(ld.LogRecordCount())) + return nil + }) + require.NoError(b, err) + + require.NoError(b, receiver.Start(tc, mc, lc)) + b.Cleanup(func() { require.NoError(b, receiver.Stop()) }) + + return cfg } diff --git a/exporter/elasticsearchexporter/integrationtest/exporter_test.go b/exporter/elasticsearchexporter/integrationtest/exporter_test.go index ed52e460d692..068c4a458ebc 100644 --- a/exporter/elasticsearchexporter/integrationtest/exporter_test.go +++ b/exporter/elasticsearchexporter/integrationtest/exporter_test.go @@ -17,13 +17,6 @@ import ( ) func TestExporter(t *testing.T) { - // NOTE: The data receiver/mock backend will receive and process traces - // as log document. This does not reduce the effectiveness of testing as - // the assertions can still be made without considering for data types - // in the mock backend. 
Adding support for traces in the mock data - // receiver is possible, however, distinguishing between traces and - // logs is not straightforward after the document has been encoded and - // doesn't add any practical benefits to the test. for _, eventType := range []string{"logs", "traces"} { for _, tc := range []struct { name string @@ -85,7 +78,7 @@ func runner(t *testing.T, eventType string, restartCollector, mockESFailure bool sender, receiver, collector, - testbed.NewCorrectTestValidator(sender.ProtocolName(), receiver.ProtocolName(), provider), + NewCountValidator(t, provider), &testbed.CorrectnessResults{}, testbed.WithDecisionFunc(func() error { if esFailing.Load() { diff --git a/exporter/elasticsearchexporter/integrationtest/validator.go b/exporter/elasticsearchexporter/integrationtest/validator.go new file mode 100644 index 000000000000..759b80af6311 --- /dev/null +++ b/exporter/elasticsearchexporter/integrationtest/validator.go @@ -0,0 +1,36 @@ +// Copyright The OpenTelemetry Authors +// SPDX-License-Identifier: Apache-2.0 + +package integrationtest + +import ( + "testing" + + "github.com/open-telemetry/opentelemetry-collector-contrib/testbed/testbed" + "github.com/stretchr/testify/assert" +) + +// CountValidator provides a testbed validator that only asserts for counts. +type CountValidator struct { + t testing.TB + dataProvider testbed.DataProvider +} + +// NewCountValidator creates a new instance of the CountValidator. +func NewCountValidator(t testing.TB, provider testbed.DataProvider) *CountValidator { + return &CountValidator{ + t: t, + dataProvider: provider, + } +} + +func (v *CountValidator) Validate(tc *testbed.TestCase) { + itemsSent := int64(tc.LoadGenerator.DataItemsSent()) - int64(tc.LoadGenerator.PermanentErrors()) + assert.Equal(v.t, + itemsSent, + int64(tc.MockBackend.DataItemsReceived()), + "Received and sent counters do not match.", + ) +} + +func (v *CountValidator) RecordResults(tc *testbed.TestCase) {} From bab6219ccfff385d0705f7473ca18254ba493d24 Mon Sep 17 00:00:00 2001 From: Vishal Raj Date: Thu, 16 May 2024 13:09:38 +0100 Subject: [PATCH 2/8] Fix lint checks --- .../integrationtest/datareceiver.go | 12 ++++++------ .../integrationtest/exporter_bench_test.go | 9 +++++---- .../integrationtest/exporter_test.go | 2 +- .../integrationtest/go.mod | 4 ++-- .../integrationtest/validator.go | 19 ++++++++++--------- 5 files changed, 24 insertions(+), 22 deletions(-) diff --git a/exporter/elasticsearchexporter/integrationtest/datareceiver.go b/exporter/elasticsearchexporter/integrationtest/datareceiver.go index 85ba6fa9e84b..21f716dc57c1 100644 --- a/exporter/elasticsearchexporter/integrationtest/datareceiver.go +++ b/exporter/elasticsearchexporter/integrationtest/datareceiver.go @@ -200,23 +200,23 @@ func (es *mockESReceiver) Start(ctx context.Context, host component.Host) error _, response := docappendertest.DecodeBulkRequest(r) for _, itemMap := range response.Items { for k, item := range itemMap { - var err error + var consumeErr error switch item.Index { case TestLogsIndex: - err = es.logsConsumer.ConsumeLogs(context.Background(), emptyLogs) + consumeErr = es.logsConsumer.ConsumeLogs(context.Background(), emptyLogs) case TestTracesIndex: - err = es.tracesConsumer.ConsumeTraces(context.Background(), emptyTrace) + consumeErr = es.tracesConsumer.ConsumeTraces(context.Background(), emptyTrace) } - if err != nil { + if consumeErr != nil { response.HasErrors = true item.Status = http.StatusTooManyRequests item.Error.Type = "simulated_es_error" - item.Error.Reason = 
err.Error() + item.Error.Reason = consumeErr.Error() } itemMap[k] = item } } - if err := json.NewEncoder(w).Encode(response); err != nil { + if jsonErr := json.NewEncoder(w).Encode(response); jsonErr != nil { w.WriteHeader(http.StatusInternalServerError) } }) diff --git a/exporter/elasticsearchexporter/integrationtest/exporter_bench_test.go b/exporter/elasticsearchexporter/integrationtest/exporter_bench_test.go index 26943a538340..1d4e0b4fb46c 100644 --- a/exporter/elasticsearchexporter/integrationtest/exporter_bench_test.go +++ b/exporter/elasticsearchexporter/integrationtest/exporter_bench_test.go @@ -10,8 +10,6 @@ import ( "testing" "time" - "github.com/open-telemetry/opentelemetry-collector-contrib/exporter/elasticsearchexporter" - "github.com/open-telemetry/opentelemetry-collector-contrib/testbed/testbed" "github.com/stretchr/testify/require" "go.opentelemetry.io/collector/consumer" "go.opentelemetry.io/collector/exporter" @@ -19,6 +17,9 @@ import ( "go.opentelemetry.io/collector/pdata/plog" "go.opentelemetry.io/collector/pdata/pmetric" "go.opentelemetry.io/collector/pdata/ptrace" + + "github.com/open-telemetry/opentelemetry-collector-contrib/exporter/elasticsearchexporter" + "github.com/open-telemetry/opentelemetry-collector-contrib/testbed/testbed" ) func BenchmarkExporter(b *testing.B) { @@ -60,7 +61,7 @@ func benchmarkLogs(b *testing.B, batchSize int) { for i := 0; i < b.N; i++ { logs, _ := runnerCfg.provider.GenerateLogs() b.StartTimer() - exporter.ConsumeLogs(ctx, logs) + require.NoError(b, exporter.ConsumeLogs(ctx, logs)) b.StopTimer() } b.ReportMetric( @@ -91,7 +92,7 @@ func benchmarkTraces(b *testing.B, batchSize int) { for i := 0; i < b.N; i++ { traces, _ := runnerCfg.provider.GenerateTraces() b.StartTimer() - exporter.ConsumeTraces(ctx, traces) + require.NoError(b, exporter.ConsumeTraces(ctx, traces)) b.StopTimer() } b.ReportMetric( diff --git a/exporter/elasticsearchexporter/integrationtest/exporter_test.go b/exporter/elasticsearchexporter/integrationtest/exporter_test.go index 068c4a458ebc..9c135a5734b5 100644 --- a/exporter/elasticsearchexporter/integrationtest/exporter_test.go +++ b/exporter/elasticsearchexporter/integrationtest/exporter_test.go @@ -78,7 +78,7 @@ func runner(t *testing.T, eventType string, restartCollector, mockESFailure bool sender, receiver, collector, - NewCountValidator(t, provider), + newCountValidator(t, provider), &testbed.CorrectnessResults{}, testbed.WithDecisionFunc(func() error { if esFailing.Load() { diff --git a/exporter/elasticsearchexporter/integrationtest/go.mod b/exporter/elasticsearchexporter/integrationtest/go.mod index ec2ec4afb9df..e202e45ca4ba 100644 --- a/exporter/elasticsearchexporter/integrationtest/go.mod +++ b/exporter/elasticsearchexporter/integrationtest/go.mod @@ -8,10 +8,12 @@ require ( github.com/open-telemetry/opentelemetry-collector-contrib/exporter/elasticsearchexporter v0.100.0 github.com/open-telemetry/opentelemetry-collector-contrib/extension/storage/filestorage v0.100.0 github.com/open-telemetry/opentelemetry-collector-contrib/internal/common v0.100.0 + github.com/open-telemetry/opentelemetry-collector-contrib/internal/sharedcomponent v0.100.0 github.com/open-telemetry/opentelemetry-collector-contrib/testbed v0.100.0 github.com/shirou/gopsutil/v3 v3.24.4 github.com/stretchr/testify v1.9.0 go.opentelemetry.io/collector/component v0.100.1-0.20240509190532-c555005fcc80 + go.opentelemetry.io/collector/config/confighttp v0.100.1-0.20240509190532-c555005fcc80 go.opentelemetry.io/collector/confmap 
v0.100.1-0.20240509190532-c555005fcc80 go.opentelemetry.io/collector/confmap/provider/fileprovider v0.100.1-0.20240509190532-c555005fcc80 go.opentelemetry.io/collector/consumer v0.100.1-0.20240509190532-c555005fcc80 @@ -84,7 +86,6 @@ require ( github.com/open-telemetry/opentelemetry-collector-contrib/exporter/syslogexporter v0.100.0 // indirect github.com/open-telemetry/opentelemetry-collector-contrib/exporter/zipkinexporter v0.100.0 // indirect github.com/open-telemetry/opentelemetry-collector-contrib/internal/coreinternal v0.100.0 // indirect - github.com/open-telemetry/opentelemetry-collector-contrib/internal/sharedcomponent v0.100.0 // indirect github.com/open-telemetry/opentelemetry-collector-contrib/pkg/golden v0.100.0 // indirect github.com/open-telemetry/opentelemetry-collector-contrib/pkg/ottl v0.100.0 // indirect github.com/open-telemetry/opentelemetry-collector-contrib/pkg/pdatautil v0.100.0 // indirect @@ -126,7 +127,6 @@ require ( go.opentelemetry.io/collector/config/configauth v0.100.1-0.20240509190532-c555005fcc80 // indirect go.opentelemetry.io/collector/config/configcompression v1.7.1-0.20240509190532-c555005fcc80 // indirect go.opentelemetry.io/collector/config/configgrpc v0.100.1-0.20240509190532-c555005fcc80 // indirect - go.opentelemetry.io/collector/config/confighttp v0.100.1-0.20240509190532-c555005fcc80 // indirect go.opentelemetry.io/collector/config/confignet v0.100.1-0.20240509190532-c555005fcc80 // indirect go.opentelemetry.io/collector/config/configopaque v1.7.1-0.20240509190532-c555005fcc80 // indirect go.opentelemetry.io/collector/config/configretry v0.100.1-0.20240509190532-c555005fcc80 // indirect diff --git a/exporter/elasticsearchexporter/integrationtest/validator.go b/exporter/elasticsearchexporter/integrationtest/validator.go index 759b80af6311..cb9b411aecd9 100644 --- a/exporter/elasticsearchexporter/integrationtest/validator.go +++ b/exporter/elasticsearchexporter/integrationtest/validator.go @@ -1,30 +1,31 @@ // Copyright The OpenTelemetry Authors // SPDX-License-Identifier: Apache-2.0 -package integrationtest +package integrationtest // import "github.com/open-telemetry/opentelemetry-collector-contrib/exporter/elasticsearchexporter/integrationtest" import ( "testing" - "github.com/open-telemetry/opentelemetry-collector-contrib/testbed/testbed" "github.com/stretchr/testify/assert" + + "github.com/open-telemetry/opentelemetry-collector-contrib/testbed/testbed" ) -// CountValidator provides a testbed validator that only asserts for counts. -type CountValidator struct { +// countValidator provides a testbed validator that only asserts for counts. +type countValidator struct { t testing.TB dataProvider testbed.DataProvider } -// NewCountValidator creates a new instance of the CountValidator. -func NewCountValidator(t testing.TB, provider testbed.DataProvider) *CountValidator { - return &CountValidator{ +// newCountValidator creates a new instance of the CountValidator. 
+func newCountValidator(t testing.TB, provider testbed.DataProvider) *countValidator { + return &countValidator{ t: t, dataProvider: provider, } } -func (v *CountValidator) Validate(tc *testbed.TestCase) { +func (v *countValidator) Validate(tc *testbed.TestCase) { itemsSent := int64(tc.LoadGenerator.DataItemsSent()) - int64(tc.LoadGenerator.PermanentErrors()) assert.Equal(v.t, itemsSent, @@ -33,4 +34,4 @@ func (v *CountValidator) Validate(tc *testbed.TestCase) { ) } -func (v *CountValidator) RecordResults(tc *testbed.TestCase) {} +func (v *countValidator) RecordResults(_ *testbed.TestCase) {} From e0dadf53b82986b1637867a425c2e2e7d0538a7e Mon Sep 17 00:00:00 2001 From: Vishal Raj Date: Thu, 16 May 2024 13:49:08 +0100 Subject: [PATCH 3/8] Add comment/assertion for mock es receivers --- .../integrationtest/datareceiver.go | 12 ++++++++++-- 1 file changed, 10 insertions(+), 2 deletions(-) diff --git a/exporter/elasticsearchexporter/integrationtest/datareceiver.go b/exporter/elasticsearchexporter/integrationtest/datareceiver.go index 21f716dc57c1..4c78e4a5cc94 100644 --- a/exporter/elasticsearchexporter/integrationtest/datareceiver.go +++ b/exporter/elasticsearchexporter/integrationtest/datareceiver.go @@ -14,6 +14,7 @@ import ( "github.com/elastic/go-docappender/v2/docappendertest" "github.com/gorilla/mux" + "github.com/stretchr/testify/require" "go.opentelemetry.io/collector/component" "go.opentelemetry.io/collector/component/componenttest" "go.opentelemetry.io/collector/config/confighttp" @@ -45,12 +46,14 @@ type esDataReceiver struct { testbed.DataReceiverBase receiver receiver.Logs endpoint string + t testing.TB } func newElasticsearchDataReceiver(t testing.TB) *esDataReceiver { return &esDataReceiver{ DataReceiverBase: testbed.DataReceiverBase{}, endpoint: fmt.Sprintf("http://%s:%d", testbed.DefaultHost, testutil.GetAvailablePort(t)), + t: t, } } @@ -71,14 +74,19 @@ func (es *esDataReceiver) Start(tc consumer.Traces, _ consumer.Metrics, lc consu set := receivertest.NewNopCreateSettings() // Use an actual logger to log errors. 
set.Logger = zap.Must(zap.NewDevelopment()) - es.receiver, err = factory.CreateLogsReceiver(context.Background(), set, cfg, lc) + logsReceiver, err := factory.CreateLogsReceiver(context.Background(), set, cfg, lc) if err != nil { return fmt.Errorf("failed to create logs receiver: %w", err) } - es.receiver, err = factory.CreateTracesReceiver(context.Background(), set, cfg, tc) + tracesReceiver, err := factory.CreateTracesReceiver(context.Background(), set, cfg, tc) if err != nil { return fmt.Errorf("failed to create traces receiver: %w", err) } + + // Since we use SharedComponent both receivers should be same + require.Same(es.t, logsReceiver, tracesReceiver) + es.receiver = logsReceiver + return es.receiver.Start(context.Background(), componenttest.NewNopHost()) } From 8c73a6a48a799e0bc5803384b765ecc9f44d3c52 Mon Sep 17 00:00:00 2001 From: Vishal Raj Date: Thu, 16 May 2024 13:57:42 +0100 Subject: [PATCH 4/8] Remove unnecessary line of code --- exporter/elasticsearchexporter/integrationtest/datareceiver.go | 1 - 1 file changed, 1 deletion(-) diff --git a/exporter/elasticsearchexporter/integrationtest/datareceiver.go b/exporter/elasticsearchexporter/integrationtest/datareceiver.go index 4c78e4a5cc94..b4ad98724aae 100644 --- a/exporter/elasticsearchexporter/integrationtest/datareceiver.go +++ b/exporter/elasticsearchexporter/integrationtest/datareceiver.go @@ -192,7 +192,6 @@ func (es *mockESReceiver) Start(ctx context.Context, host component.Host) error emptyLogs.ResourceLogs().AppendEmpty().ScopeLogs().AppendEmpty().LogRecords().AppendEmpty() emptyTrace := ptrace.NewTraces() emptyTrace.ResourceSpans().AppendEmpty().ScopeSpans().AppendEmpty().Spans().AppendEmpty() - emptyTrace.SpanCount() r := mux.NewRouter() r.Use(func(next http.Handler) http.Handler { From 908846f68b3533230320c555d0bbad098f05a6ca Mon Sep 17 00:00:00 2001 From: Vishal Raj Date: Thu, 16 May 2024 17:51:16 +0100 Subject: [PATCH 5/8] Start the exporter --- .../integrationtest/exporter_bench_test.go | 3 +++ 1 file changed, 3 insertions(+) diff --git a/exporter/elasticsearchexporter/integrationtest/exporter_bench_test.go b/exporter/elasticsearchexporter/integrationtest/exporter_bench_test.go index 1d4e0b4fb46c..98b70b935573 100644 --- a/exporter/elasticsearchexporter/integrationtest/exporter_bench_test.go +++ b/exporter/elasticsearchexporter/integrationtest/exporter_bench_test.go @@ -11,6 +11,7 @@ import ( "time" "github.com/stretchr/testify/require" + "go.opentelemetry.io/collector/component/componenttest" "go.opentelemetry.io/collector/consumer" "go.opentelemetry.io/collector/exporter" "go.opentelemetry.io/collector/exporter/exportertest" @@ -54,6 +55,7 @@ func benchmarkLogs(b *testing.B, batchSize int) { ctx, exportertest.NewNopCreateSettings(), runnerCfg.esCfg, ) require.NoError(b, err) + require.NoError(b, exporter.Start(ctx, componenttest.NewNopHost())) b.ReportAllocs() b.ResetTimer() @@ -85,6 +87,7 @@ func benchmarkTraces(b *testing.B, batchSize int) { ctx, exportertest.NewNopCreateSettings(), runnerCfg.esCfg, ) require.NoError(b, err) + require.NoError(b, exporter.Start(ctx, componenttest.NewNopHost())) b.ReportAllocs() b.ResetTimer() From ff29acceb98057aeb88e511fb24ac75478ce9214 Mon Sep 17 00:00:00 2001 From: Vishal Raj Date: Fri, 17 May 2024 15:28:05 +0100 Subject: [PATCH 6/8] Stop decoding bulk requests for benchmarks --- .../integrationtest/datareceiver.go | 29 ++++++++++++++----- .../integrationtest/exporter_bench_test.go | 27 +++++------------ .../integrationtest/exporter_test.go | 2 +- 3 files changed, 30 
insertions(+), 28 deletions(-) diff --git a/exporter/elasticsearchexporter/integrationtest/datareceiver.go b/exporter/elasticsearchexporter/integrationtest/datareceiver.go index b4ad98724aae..a56a8044c9ce 100644 --- a/exporter/elasticsearchexporter/integrationtest/datareceiver.go +++ b/exporter/elasticsearchexporter/integrationtest/datareceiver.go @@ -44,16 +44,18 @@ const ( type esDataReceiver struct { testbed.DataReceiverBase - receiver receiver.Logs - endpoint string - t testing.TB + receiver receiver.Logs + endpoint string + decodeBulkRequest bool + t testing.TB } -func newElasticsearchDataReceiver(t testing.TB) *esDataReceiver { +func newElasticsearchDataReceiver(t testing.TB, decodeBulkRequest bool) *esDataReceiver { return &esDataReceiver{ - DataReceiverBase: testbed.DataReceiverBase{}, - endpoint: fmt.Sprintf("http://%s:%d", testbed.DefaultHost, testutil.GetAvailablePort(t)), - t: t, + DataReceiverBase: testbed.DataReceiverBase{}, + endpoint: fmt.Sprintf("http://%s:%d", testbed.DefaultHost, testutil.GetAvailablePort(t)), + decodeBulkRequest: decodeBulkRequest, + t: t, } } @@ -70,6 +72,7 @@ func (es *esDataReceiver) Start(tc consumer.Traces, _ consumer.Metrics, lc consu } cfg := factory.CreateDefaultConfig().(*config) cfg.ServerConfig.Endpoint = esURL.Host + cfg.DecodeBulkRequests = es.decodeBulkRequest set := receivertest.NewNopCreateSettings() // Use an actual logger to log errors. @@ -121,6 +124,13 @@ func (es *esDataReceiver) ProtocolName() string { type config struct { confighttp.ServerConfig + + // DecodeBulkRequests controls decoding of the bulk request in the mock + // ES receiver. Decoding requests would consumer resources and might + // pollute the benchmark results. Note that if decode bulk request is + // set to false then the consumers will not consume any events and the + // bulk request will always returh http.StatusOK. 
+ DecodeBulkRequests bool } func createDefaultConfig() component.Config { @@ -128,6 +138,7 @@ func createDefaultConfig() component.Config { ServerConfig: confighttp.ServerConfig{ Endpoint: "127.0.0.1:9200", }, + DecodeBulkRequests: true, } } @@ -204,6 +215,10 @@ func (es *mockESReceiver) Start(ctx context.Context, host component.Host) error fmt.Fprintln(w, `{"version":{"number":"1.2.3"}}`) }) r.HandleFunc("/_bulk", func(w http.ResponseWriter, r *http.Request) { + if !es.config.DecodeBulkRequests { + w.WriteHeader(http.StatusOK) + return + } _, response := docappendertest.DecodeBulkRequest(r) for _, itemMap := range response.Items { for k, item := range itemMap { diff --git a/exporter/elasticsearchexporter/integrationtest/exporter_bench_test.go b/exporter/elasticsearchexporter/integrationtest/exporter_bench_test.go index 98b70b935573..65c8abbbda1a 100644 --- a/exporter/elasticsearchexporter/integrationtest/exporter_bench_test.go +++ b/exporter/elasticsearchexporter/integrationtest/exporter_bench_test.go @@ -67,15 +67,10 @@ func benchmarkLogs(b *testing.B, batchSize int) { b.StopTimer() } b.ReportMetric( - float64(runnerCfg.observedCount.Load())/b.Elapsed().Seconds(), + float64(runnerCfg.generatedCount.Load())/b.Elapsed().Seconds(), "events/s", ) require.NoError(b, exporter.Shutdown(ctx)) - require.Equal(b, - runnerCfg.generatedCount.Load(), - runnerCfg.observedCount.Load(), - "failed to send all logs to backend", - ) } func benchmarkTraces(b *testing.B, batchSize int) { @@ -99,15 +94,10 @@ func benchmarkTraces(b *testing.B, batchSize int) { b.StopTimer() } b.ReportMetric( - float64(runnerCfg.observedCount.Load())/b.Elapsed().Seconds(), + float64(runnerCfg.generatedCount.Load())/b.Elapsed().Seconds(), "events/s", ) require.NoError(b, exporter.Shutdown(ctx)) - require.Equal(b, - runnerCfg.generatedCount.Load(), - runnerCfg.observedCount.Load(), - "failed to send all traces to backend", - ) } type benchRunnerCfg struct { @@ -116,7 +106,6 @@ type benchRunnerCfg struct { esCfg *elasticsearchexporter.Config generatedCount atomic.Uint64 - observedCount atomic.Uint64 } func prepareBenchmark( @@ -126,7 +115,8 @@ func prepareBenchmark( b.Helper() cfg := &benchRunnerCfg{} - receiver := newElasticsearchDataReceiver(b) + // Benchmarks don't decode the bulk requests to avoid allocations to pollute the results. 
+ receiver := newElasticsearchDataReceiver(b, false /* DecodeBulkRequest */) cfg.provider = testbed.NewPerfTestDataProvider(testbed.LoadOptions{ItemsPerBatch: batchSize}) cfg.provider.SetLoadGeneratorCounters(&cfg.generatedCount) @@ -138,18 +128,15 @@ func prepareBenchmark( cfg.esCfg.Flush.Interval = 10 * time.Millisecond cfg.esCfg.NumWorkers = 1 - tc, err := consumer.NewTraces(func(_ context.Context, td ptrace.Traces) error { - cfg.observedCount.Add(uint64(td.SpanCount())) + tc, err := consumer.NewTraces(func(context.Context, ptrace.Traces) error { return nil }) require.NoError(b, err) - mc, err := consumer.NewMetrics(func(_ context.Context, md pmetric.Metrics) error { - cfg.observedCount.Add(uint64(md.MetricCount())) + mc, err := consumer.NewMetrics(func(context.Context, pmetric.Metrics) error { return nil }) require.NoError(b, err) - lc, err := consumer.NewLogs(func(_ context.Context, ld plog.Logs) error { - cfg.observedCount.Add(uint64(ld.LogRecordCount())) + lc, err := consumer.NewLogs(func(context.Context, plog.Logs) error { return nil }) require.NoError(b, err) diff --git a/exporter/elasticsearchexporter/integrationtest/exporter_test.go b/exporter/elasticsearchexporter/integrationtest/exporter_test.go index 9c135a5734b5..c0df3d575308 100644 --- a/exporter/elasticsearchexporter/integrationtest/exporter_test.go +++ b/exporter/elasticsearchexporter/integrationtest/exporter_test.go @@ -57,7 +57,7 @@ func runner(t *testing.T, eventType string, restartCollector, mockESFailure bool t.Fatalf("failed to create data sender for type: %s", eventType) } - receiver := newElasticsearchDataReceiver(t) + receiver := newElasticsearchDataReceiver(t, true) loadOpts := testbed.LoadOptions{ DataItemsPerSecond: 1_000, ItemsPerBatch: 10, From 8132c0993b3ad67c04c7d15291fe17161d4bf0ed Mon Sep 17 00:00:00 2001 From: Vishal Raj Date: Fri, 17 May 2024 15:34:16 +0100 Subject: [PATCH 7/8] Fix typo --- exporter/elasticsearchexporter/integrationtest/datareceiver.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/exporter/elasticsearchexporter/integrationtest/datareceiver.go b/exporter/elasticsearchexporter/integrationtest/datareceiver.go index a56a8044c9ce..0a353b00ff9f 100644 --- a/exporter/elasticsearchexporter/integrationtest/datareceiver.go +++ b/exporter/elasticsearchexporter/integrationtest/datareceiver.go @@ -126,7 +126,7 @@ type config struct { confighttp.ServerConfig // DecodeBulkRequests controls decoding of the bulk request in the mock - // ES receiver. Decoding requests would consumer resources and might + // ES receiver. Decoding requests would consume resources and might // pollute the benchmark results. Note that if decode bulk request is // set to false then the consumers will not consume any events and the // bulk request will always returh http.StatusOK. From 4dd2b74015a0d3665694bcbb396fb17d9d6b6c73 Mon Sep 17 00:00:00 2001 From: Vishal Raj Date: Fri, 17 May 2024 15:34:55 +0100 Subject: [PATCH 8/8] Fix typo take 2 --- exporter/elasticsearchexporter/integrationtest/datareceiver.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/exporter/elasticsearchexporter/integrationtest/datareceiver.go b/exporter/elasticsearchexporter/integrationtest/datareceiver.go index 0a353b00ff9f..b0d3c7d4baa4 100644 --- a/exporter/elasticsearchexporter/integrationtest/datareceiver.go +++ b/exporter/elasticsearchexporter/integrationtest/datareceiver.go @@ -129,7 +129,7 @@ type config struct { // ES receiver. 
Decoding requests would consume resources and might
 	// pollute the benchmark results. Note that if decode bulk request is
 	// set to false then the consumers will not consume any events and the
-	// bulk request will always returh http.StatusOK.
+	// bulk request will always return http.StatusOK.
 	DecodeBulkRequests bool
 }