diff --git a/exporter/elasticsearchexporter/integrationtest/datareceiver.go b/exporter/elasticsearchexporter/integrationtest/datareceiver.go index c5dcda0c3cac..b0d3c7d4baa4 100644 --- a/exporter/elasticsearchexporter/integrationtest/datareceiver.go +++ b/exporter/elasticsearchexporter/integrationtest/datareceiver.go @@ -11,52 +11,85 @@ import ( "net/http" "net/url" "testing" - "time" "github.com/elastic/go-docappender/v2/docappendertest" "github.com/gorilla/mux" + "github.com/stretchr/testify/require" "go.opentelemetry.io/collector/component" "go.opentelemetry.io/collector/component/componenttest" + "go.opentelemetry.io/collector/config/confighttp" "go.opentelemetry.io/collector/consumer" "go.opentelemetry.io/collector/pdata/plog" + "go.opentelemetry.io/collector/pdata/ptrace" "go.opentelemetry.io/collector/receiver" "go.opentelemetry.io/collector/receiver/receivertest" "go.uber.org/zap" "github.com/open-telemetry/opentelemetry-collector-contrib/internal/common/testutil" + "github.com/open-telemetry/opentelemetry-collector-contrib/internal/sharedcomponent" "github.com/open-telemetry/opentelemetry-collector-contrib/testbed/testbed" ) +const ( + // TestLogsIndex is used by the mock ES data receiver to identify log events. + // Exporter LogsIndex configuration must be configured with TestLogsIndex for + // the data receiver to work properly + TestLogsIndex = "logs-test-idx" + + // TestTracesIndex is used by the mock ES data receiver to identify trace + // events. 
Exporter TracesIndex configuration must be configured with + // TestTracesIndex for the data receiver to work properly + TestTracesIndex = "traces-test-idx" +) + type esDataReceiver struct { testbed.DataReceiverBase - receiver receiver.Logs - endpoint string + receiver receiver.Logs + endpoint string + decodeBulkRequest bool + t testing.TB } -func newElasticsearchDataReceiver(t testing.TB) *esDataReceiver { +func newElasticsearchDataReceiver(t testing.TB, decodeBulkRequest bool) *esDataReceiver { return &esDataReceiver{ - DataReceiverBase: testbed.DataReceiverBase{}, - endpoint: fmt.Sprintf("http://%s:%d", testbed.DefaultHost, testutil.GetAvailablePort(t)), + DataReceiverBase: testbed.DataReceiverBase{}, + endpoint: fmt.Sprintf("http://%s:%d", testbed.DefaultHost, testutil.GetAvailablePort(t)), + decodeBulkRequest: decodeBulkRequest, + t: t, } } -func (es *esDataReceiver) Start(_ consumer.Traces, _ consumer.Metrics, lc consumer.Logs) error { +func (es *esDataReceiver) Start(tc consumer.Traces, _ consumer.Metrics, lc consumer.Logs) error { factory := receiver.NewFactory( component.MustNewType("mockelasticsearch"), createDefaultConfig, receiver.WithLogs(createLogsReceiver, component.StabilityLevelDevelopment), + receiver.WithTraces(createTracesReceiver, component.StabilityLevelDevelopment), ) + esURL, err := url.Parse(es.endpoint) + if err != nil { + return fmt.Errorf("invalid ES URL specified %s: %w", es.endpoint, err) + } cfg := factory.CreateDefaultConfig().(*config) - cfg.ESEndpoint = es.endpoint + cfg.ServerConfig.Endpoint = esURL.Host + cfg.DecodeBulkRequests = es.decodeBulkRequest - var err error set := receivertest.NewNopCreateSettings() // Use an actual logger to log errors. 
set.Logger = zap.Must(zap.NewDevelopment()) - es.receiver, err = factory.CreateLogsReceiver(context.Background(), set, cfg, lc) + logsReceiver, err := factory.CreateLogsReceiver(context.Background(), set, cfg, lc) if err != nil { - return err + return fmt.Errorf("failed to create logs receiver: %w", err) } + tracesReceiver, err := factory.CreateTracesReceiver(context.Background(), set, cfg, tc) + if err != nil { + return fmt.Errorf("failed to create traces receiver: %w", err) + } + + // Since we use SharedComponent both receivers should be same + require.Same(es.t, logsReceiver, tracesReceiver) + es.receiver = logsReceiver + return es.receiver.Start(context.Background(), componenttest.NewNopHost()) } @@ -72,6 +105,8 @@ func (es *esDataReceiver) GenConfigYAMLStr() string { cfgFormat := ` elasticsearch: endpoints: [%s] + logs_index: %s + traces_index: %s flush: interval: 1s sending_queue: @@ -80,7 +115,7 @@ func (es *esDataReceiver) GenConfigYAMLStr() string { enabled: true max_requests: 10000 ` - return fmt.Sprintf(cfgFormat, es.endpoint) + return fmt.Sprintf(cfgFormat, es.endpoint, TestLogsIndex, TestTracesIndex) } func (es *esDataReceiver) ProtocolName() string { @@ -88,12 +123,22 @@ func (es *esDataReceiver) ProtocolName() string { } type config struct { - ESEndpoint string + confighttp.ServerConfig + + // DecodeBulkRequests controls decoding of the bulk request in the mock + // ES receiver. Decoding requests would consume resources and might + // pollute the benchmark results. Note that if decode bulk request is + // set to false then the consumers will not consume any events and the + // bulk request will always return http.StatusOK. 
+ DecodeBulkRequests bool } func createDefaultConfig() component.Config { return &config{ - ESEndpoint: "127.0.0.1:9200", + ServerConfig: confighttp.ServerConfig{ + Endpoint: "127.0.0.1:9200", + }, + DecodeBulkRequests: true, } } @@ -103,20 +148,61 @@ func createLogsReceiver( rawCfg component.Config, next consumer.Logs, ) (receiver.Logs, error) { - cfg := rawCfg.(*config) - return newMockESReceiver(params, cfg, next) + receiver := receivers.GetOrAdd(rawCfg, func() component.Component { + return newMockESReceiver(params, rawCfg.(*config)) + }) + receiver.Unwrap().(*mockESReceiver).logsConsumer = next + return receiver, nil +} + +func createTracesReceiver( + _ context.Context, + params receiver.CreateSettings, + rawCfg component.Config, + next consumer.Traces, +) (receiver.Traces, error) { + receiver := receivers.GetOrAdd(rawCfg, func() component.Component { + return newMockESReceiver(params, rawCfg.(*config)) + }) + receiver.Unwrap().(*mockESReceiver).tracesConsumer = next + return receiver, nil } type mockESReceiver struct { - server *http.Server params receiver.CreateSettings + config *config + + tracesConsumer consumer.Traces + logsConsumer consumer.Logs + + server *http.Server +} + +func newMockESReceiver(params receiver.CreateSettings, cfg *config) receiver.Logs { + return &mockESReceiver{ + params: params, + config: cfg, + } } -func newMockESReceiver(params receiver.CreateSettings, cfg *config, next consumer.Logs) (receiver.Logs, error) { +func (es *mockESReceiver) Start(ctx context.Context, host component.Host) error { + if es.server != nil { + return nil + } + + ln, err := es.config.ToListener(ctx) + if err != nil { + return fmt.Errorf("failed to bind to address %s: %w", es.config.Endpoint, err) + } + + // Ideally bulk request items should be converted to the corresponding event record + // however, since we only assert count for now there is no need to do the actual + // translation. 
Instead we use a pre-initialized empty logs and traces model to + // reduce allocation impact on tests and benchmarks. emptyLogs := plog.NewLogs() - emptyLogs.ResourceLogs().AppendEmpty(). - ScopeLogs().AppendEmpty(). - LogRecords().AppendEmpty() + emptyLogs.ResourceLogs().AppendEmpty().ScopeLogs().AppendEmpty().LogRecords().AppendEmpty() + emptyTrace := ptrace.NewTraces() + emptyTrace.ResourceSpans().AppendEmpty().ScopeSpans().AppendEmpty().Spans().AppendEmpty() r := mux.NewRouter() r.Use(func(next http.Handler) http.Handler { @@ -129,51 +215,54 @@ func newMockESReceiver(params receiver.CreateSettings, cfg *config, next consume fmt.Fprintln(w, `{"version":{"number":"1.2.3"}}`) }) r.HandleFunc("/_bulk", func(w http.ResponseWriter, r *http.Request) { + if !es.config.DecodeBulkRequests { + w.WriteHeader(http.StatusOK) + return + } _, response := docappendertest.DecodeBulkRequest(r) for _, itemMap := range response.Items { for k, item := range itemMap { - // Ideally bulk request should be converted to log record - // however, since we only assert count for now there is no - // need to do the actual translation. We use a pre-initialized - // empty plog.Logs to reduce allocation impact on tests and - // benchmarks due to this. 
- if err := next.ConsumeLogs(context.Background(), emptyLogs); err != nil { + var consumeErr error + switch item.Index { + case TestLogsIndex: + consumeErr = es.logsConsumer.ConsumeLogs(context.Background(), emptyLogs) + case TestTracesIndex: + consumeErr = es.tracesConsumer.ConsumeTraces(context.Background(), emptyTrace) + } + if consumeErr != nil { response.HasErrors = true item.Status = http.StatusTooManyRequests item.Error.Type = "simulated_es_error" - item.Error.Reason = err.Error() + item.Error.Reason = consumeErr.Error() } itemMap[k] = item } } - if err := json.NewEncoder(w).Encode(response); err != nil { + if jsonErr := json.NewEncoder(w).Encode(response); jsonErr != nil { w.WriteHeader(http.StatusInternalServerError) } }) - esURL, err := url.Parse(cfg.ESEndpoint) + es.server, err = es.config.ToServer(ctx, host, es.params.TelemetrySettings, r) if err != nil { - return nil, fmt.Errorf("failed to parse Elasticsearch endpoint: %w", err) + return fmt.Errorf("failed to create mock ES server: %w", err) } - return &mockESReceiver{ - server: &http.Server{ - Addr: esURL.Host, - Handler: r, - ReadHeaderTimeout: 20 * time.Second, - }, - params: params, - }, nil -} -func (es *mockESReceiver) Start(_ context.Context, _ component.Host) error { go func() { - if err := es.server.ListenAndServe(); err != nil && !errors.Is(err, http.ErrServerClosed) { - es.params.Logger.Error("failed while running mock ES receiver", zap.Error(err)) + if err := es.server.Serve(ln); err != nil && !errors.Is(err, http.ErrServerClosed) { + es.params.ReportStatus(component.NewFatalErrorEvent(err)) } }() return nil } func (es *mockESReceiver) Shutdown(ctx context.Context) error { + if es.server == nil { + return nil + } return es.server.Shutdown(ctx) } + +// mockESReceiver serves both, traces and logs. Shared component allows for a single +// instance of mockESReceiver to serve all supported event types. 
+var receivers = sharedcomponent.NewSharedComponents() diff --git a/exporter/elasticsearchexporter/integrationtest/exporter_bench_test.go b/exporter/elasticsearchexporter/integrationtest/exporter_bench_test.go index 7350c44add69..65c8abbbda1a 100644 --- a/exporter/elasticsearchexporter/integrationtest/exporter_bench_test.go +++ b/exporter/elasticsearchexporter/integrationtest/exporter_bench_test.go @@ -5,76 +5,144 @@ package integrationtest import ( "context" + "fmt" "sync/atomic" "testing" "time" "github.com/stretchr/testify/require" + "go.opentelemetry.io/collector/component/componenttest" "go.opentelemetry.io/collector/consumer" + "go.opentelemetry.io/collector/exporter" "go.opentelemetry.io/collector/exporter/exportertest" "go.opentelemetry.io/collector/pdata/plog" + "go.opentelemetry.io/collector/pdata/pmetric" + "go.opentelemetry.io/collector/pdata/ptrace" "github.com/open-telemetry/opentelemetry-collector-contrib/exporter/elasticsearchexporter" "github.com/open-telemetry/opentelemetry-collector-contrib/testbed/testbed" ) -func BenchmarkLogsExporter(b *testing.B) { - for _, tc := range []struct { - name string - batchSize int - }{ - {name: "small_batch", batchSize: 10}, - {name: "medium_batch", batchSize: 100}, - {name: "large_batch", batchSize: 1000}, - {name: "xlarge_batch", batchSize: 10000}, - } { - b.Run(tc.name, func(b *testing.B) { - benchmarkLogs(b, tc.batchSize) - }) +func BenchmarkExporter(b *testing.B) { + for _, eventType := range []string{"logs", "traces"} { + for _, tc := range []struct { + name string + batchSize int + }{ + {name: "small_batch", batchSize: 10}, + {name: "medium_batch", batchSize: 100}, + {name: "large_batch", batchSize: 1000}, + {name: "xlarge_batch", batchSize: 10000}, + } { + b.Run(fmt.Sprintf("%s/%s", eventType, tc.name), func(b *testing.B) { + switch eventType { + case "logs": + benchmarkLogs(b, tc.batchSize) + case "traces": + benchmarkTraces(b, tc.batchSize) + } + }) + } } } func benchmarkLogs(b *testing.B, batchSize int) 
{ - var generatedCount, observedCount atomic.Uint64 - - receiver := newElasticsearchDataReceiver(b) - factory := elasticsearchexporter.NewFactory() - - cfg := factory.CreateDefaultConfig().(*elasticsearchexporter.Config) - cfg.Endpoints = []string{receiver.endpoint} - cfg.Flush.Interval = 10 * time.Millisecond - cfg.NumWorkers = 1 + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() - exporter, err := factory.CreateLogsExporter( - context.Background(), - exportertest.NewNopCreateSettings(), - cfg, + runnerCfg := prepareBenchmark(b, batchSize) + exporter, err := runnerCfg.factory.CreateLogsExporter( + ctx, exportertest.NewNopCreateSettings(), runnerCfg.esCfg, ) require.NoError(b, err) + require.NoError(b, exporter.Start(ctx, componenttest.NewNopHost())) - provider := testbed.NewPerfTestDataProvider(testbed.LoadOptions{ItemsPerBatch: batchSize}) - provider.SetLoadGeneratorCounters(&generatedCount) + b.ReportAllocs() + b.ResetTimer() + b.StopTimer() + for i := 0; i < b.N; i++ { + logs, _ := runnerCfg.provider.GenerateLogs() + b.StartTimer() + require.NoError(b, exporter.ConsumeLogs(ctx, logs)) + b.StopTimer() + } + b.ReportMetric( + float64(runnerCfg.generatedCount.Load())/b.Elapsed().Seconds(), + "events/s", + ) + require.NoError(b, exporter.Shutdown(ctx)) +} +func benchmarkTraces(b *testing.B, batchSize int) { ctx, cancel := context.WithCancel(context.Background()) defer cancel() - logsConsumer, err := consumer.NewLogs(func(_ context.Context, ld plog.Logs) error { - observedCount.Add(uint64(ld.LogRecordCount())) - return nil - }) + runnerCfg := prepareBenchmark(b, batchSize) + exporter, err := runnerCfg.factory.CreateTracesExporter( + ctx, exportertest.NewNopCreateSettings(), runnerCfg.esCfg, + ) require.NoError(b, err) - - require.NoError(b, receiver.Start(nil, nil, logsConsumer)) - defer func() { require.NoError(b, receiver.Stop()) }() + require.NoError(b, exporter.Start(ctx, componenttest.NewNopHost())) b.ReportAllocs() b.ResetTimer() + 
b.StopTimer() for i := 0; i < b.N; i++ { - b.StopTimer() - logs, _ := provider.GenerateLogs() + traces, _ := runnerCfg.provider.GenerateTraces() b.StartTimer() - require.NoError(b, exporter.ConsumeLogs(ctx, logs)) + require.NoError(b, exporter.ConsumeTraces(ctx, traces)) + b.StopTimer() } + b.ReportMetric( + float64(runnerCfg.generatedCount.Load())/b.Elapsed().Seconds(), + "events/s", + ) require.NoError(b, exporter.Shutdown(ctx)) - require.Equal(b, generatedCount.Load(), observedCount.Load(), "failed to send all logs to backend") +} + +type benchRunnerCfg struct { + factory exporter.Factory + provider testbed.DataProvider + esCfg *elasticsearchexporter.Config + + generatedCount atomic.Uint64 +} + +func prepareBenchmark( + b *testing.B, + batchSize int, +) *benchRunnerCfg { + b.Helper() + + cfg := &benchRunnerCfg{} + // Benchmarks don't decode the bulk requests to avoid allocations to pollute the results. + receiver := newElasticsearchDataReceiver(b, false /* DecodeBulkRequest */) + cfg.provider = testbed.NewPerfTestDataProvider(testbed.LoadOptions{ItemsPerBatch: batchSize}) + cfg.provider.SetLoadGeneratorCounters(&cfg.generatedCount) + + cfg.factory = elasticsearchexporter.NewFactory() + cfg.esCfg = cfg.factory.CreateDefaultConfig().(*elasticsearchexporter.Config) + cfg.esCfg.Endpoints = []string{receiver.endpoint} + cfg.esCfg.LogsIndex = TestLogsIndex + cfg.esCfg.TracesIndex = TestTracesIndex + cfg.esCfg.Flush.Interval = 10 * time.Millisecond + cfg.esCfg.NumWorkers = 1 + + tc, err := consumer.NewTraces(func(context.Context, ptrace.Traces) error { + return nil + }) + require.NoError(b, err) + mc, err := consumer.NewMetrics(func(context.Context, pmetric.Metrics) error { + return nil + }) + require.NoError(b, err) + lc, err := consumer.NewLogs(func(context.Context, plog.Logs) error { + return nil + }) + require.NoError(b, err) + + require.NoError(b, receiver.Start(tc, mc, lc)) + b.Cleanup(func() { require.NoError(b, receiver.Stop()) }) + + return cfg } diff --git 
a/exporter/elasticsearchexporter/integrationtest/exporter_test.go b/exporter/elasticsearchexporter/integrationtest/exporter_test.go index ed52e460d692..c0df3d575308 100644 --- a/exporter/elasticsearchexporter/integrationtest/exporter_test.go +++ b/exporter/elasticsearchexporter/integrationtest/exporter_test.go @@ -17,13 +17,6 @@ import ( ) func TestExporter(t *testing.T) { - // NOTE: The data receiver/mock backend will receive and process traces - // as log document. This does not reduce the effectiveness of testing as - // the assertions can still be made without considering for data types - // in the mock backend. Adding support for traces in the mock data - // receiver is possible, however, distinguishing between traces and - // logs is not straightforward after the document has been encoded and - // doesn't add any practical benefits to the test. for _, eventType := range []string{"logs", "traces"} { for _, tc := range []struct { name string @@ -64,7 +57,7 @@ func runner(t *testing.T, eventType string, restartCollector, mockESFailure bool t.Fatalf("failed to create data sender for type: %s", eventType) } - receiver := newElasticsearchDataReceiver(t) + receiver := newElasticsearchDataReceiver(t, true) loadOpts := testbed.LoadOptions{ DataItemsPerSecond: 1_000, ItemsPerBatch: 10, @@ -85,7 +78,7 @@ func runner(t *testing.T, eventType string, restartCollector, mockESFailure bool sender, receiver, collector, - testbed.NewCorrectTestValidator(sender.ProtocolName(), receiver.ProtocolName(), provider), + newCountValidator(t, provider), &testbed.CorrectnessResults{}, testbed.WithDecisionFunc(func() error { if esFailing.Load() { diff --git a/exporter/elasticsearchexporter/integrationtest/go.mod b/exporter/elasticsearchexporter/integrationtest/go.mod index 6acce33715ee..c52d44736028 100644 --- a/exporter/elasticsearchexporter/integrationtest/go.mod +++ b/exporter/elasticsearchexporter/integrationtest/go.mod @@ -8,10 +8,12 @@ require ( 
github.com/open-telemetry/opentelemetry-collector-contrib/exporter/elasticsearchexporter v0.101.0 github.com/open-telemetry/opentelemetry-collector-contrib/extension/storage/filestorage v0.101.0 github.com/open-telemetry/opentelemetry-collector-contrib/internal/common v0.101.0 + github.com/open-telemetry/opentelemetry-collector-contrib/internal/sharedcomponent v0.101.0 github.com/open-telemetry/opentelemetry-collector-contrib/testbed v0.101.0 github.com/shirou/gopsutil/v3 v3.24.4 github.com/stretchr/testify v1.9.0 go.opentelemetry.io/collector/component v0.101.0 + go.opentelemetry.io/collector/config/confighttp v0.101.0 go.opentelemetry.io/collector/confmap v0.101.0 go.opentelemetry.io/collector/confmap/provider/fileprovider v0.101.0 go.opentelemetry.io/collector/consumer v0.101.0 @@ -84,7 +86,6 @@ require ( github.com/open-telemetry/opentelemetry-collector-contrib/exporter/syslogexporter v0.101.0 // indirect github.com/open-telemetry/opentelemetry-collector-contrib/exporter/zipkinexporter v0.101.0 // indirect github.com/open-telemetry/opentelemetry-collector-contrib/internal/coreinternal v0.101.0 // indirect - github.com/open-telemetry/opentelemetry-collector-contrib/internal/sharedcomponent v0.101.0 // indirect github.com/open-telemetry/opentelemetry-collector-contrib/pkg/golden v0.101.0 // indirect github.com/open-telemetry/opentelemetry-collector-contrib/pkg/ottl v0.101.0 // indirect github.com/open-telemetry/opentelemetry-collector-contrib/pkg/pdatautil v0.101.0 // indirect @@ -126,7 +127,6 @@ require ( go.opentelemetry.io/collector/config/configauth v0.101.0 // indirect go.opentelemetry.io/collector/config/configcompression v1.8.0 // indirect go.opentelemetry.io/collector/config/configgrpc v0.101.0 // indirect - go.opentelemetry.io/collector/config/confighttp v0.101.0 // indirect go.opentelemetry.io/collector/config/confignet v0.101.0 // indirect go.opentelemetry.io/collector/config/configopaque v1.8.0 // indirect 
go.opentelemetry.io/collector/config/configretry v0.101.0 // indirect diff --git a/exporter/elasticsearchexporter/integrationtest/validator.go b/exporter/elasticsearchexporter/integrationtest/validator.go new file mode 100644 index 000000000000..cb9b411aecd9 --- /dev/null +++ b/exporter/elasticsearchexporter/integrationtest/validator.go @@ -0,0 +1,37 @@ +// Copyright The OpenTelemetry Authors +// SPDX-License-Identifier: Apache-2.0 + +package integrationtest // import "github.com/open-telemetry/opentelemetry-collector-contrib/exporter/elasticsearchexporter/integrationtest" + +import ( + "testing" + + "github.com/stretchr/testify/assert" + + "github.com/open-telemetry/opentelemetry-collector-contrib/testbed/testbed" +) + +// countValidator provides a testbed validator that only asserts for counts. +type countValidator struct { + t testing.TB + dataProvider testbed.DataProvider +} + +// newCountValidator creates a new instance of the CountValidator. +func newCountValidator(t testing.TB, provider testbed.DataProvider) *countValidator { + return &countValidator{ + t: t, + dataProvider: provider, + } +} + +func (v *countValidator) Validate(tc *testbed.TestCase) { + itemsSent := int64(tc.LoadGenerator.DataItemsSent()) - int64(tc.LoadGenerator.PermanentErrors()) + assert.Equal(v.t, + itemsSent, + int64(tc.MockBackend.DataItemsReceived()), + "Received and sent counters do not match.", + ) +} + +func (v *countValidator) RecordResults(_ *testbed.TestCase) {}