Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[chore][exporter/elasticsearch] Add benchmark for traces consumer #33087

Merged
merged 11 commits into from
May 22, 2024
175 changes: 132 additions & 43 deletions exporter/elasticsearchexporter/integrationtest/datareceiver.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,52 +11,85 @@ import (
"net/http"
"net/url"
"testing"
"time"

"github.com/elastic/go-docappender/v2/docappendertest"
"github.com/gorilla/mux"
"github.com/stretchr/testify/require"
"go.opentelemetry.io/collector/component"
"go.opentelemetry.io/collector/component/componenttest"
"go.opentelemetry.io/collector/config/confighttp"
"go.opentelemetry.io/collector/consumer"
"go.opentelemetry.io/collector/pdata/plog"
"go.opentelemetry.io/collector/pdata/ptrace"
"go.opentelemetry.io/collector/receiver"
"go.opentelemetry.io/collector/receiver/receivertest"
"go.uber.org/zap"

"github.com/open-telemetry/opentelemetry-collector-contrib/internal/common/testutil"
"github.com/open-telemetry/opentelemetry-collector-contrib/internal/sharedcomponent"
"github.com/open-telemetry/opentelemetry-collector-contrib/testbed/testbed"
)

const (
// TestLogsIndex is used by the mock ES data receiver to indentify log events.
// Exporter LogsIndex configuration must be configured with TestLogsIndex for
// the data receiver to work properly
TestLogsIndex = "logs-test-idx"

// TestTracesIndex is used by the mock ES data receiver to indentify trace
// events. Exporter TracesIndex configuration must be configured with
// TestTracesIndex for the data receiver to work properly
TestTracesIndex = "traces-test-idx"
)

type esDataReceiver struct {
testbed.DataReceiverBase
receiver receiver.Logs
endpoint string
receiver receiver.Logs
endpoint string
decodeBulkRequest bool
t testing.TB
}

func newElasticsearchDataReceiver(t testing.TB) *esDataReceiver {
func newElasticsearchDataReceiver(t testing.TB, decodeBulkRequest bool) *esDataReceiver {
return &esDataReceiver{
DataReceiverBase: testbed.DataReceiverBase{},
endpoint: fmt.Sprintf("http://%s:%d", testbed.DefaultHost, testutil.GetAvailablePort(t)),
DataReceiverBase: testbed.DataReceiverBase{},
endpoint: fmt.Sprintf("http://%s:%d", testbed.DefaultHost, testutil.GetAvailablePort(t)),
decodeBulkRequest: decodeBulkRequest,
t: t,
}
}

func (es *esDataReceiver) Start(_ consumer.Traces, _ consumer.Metrics, lc consumer.Logs) error {
func (es *esDataReceiver) Start(tc consumer.Traces, _ consumer.Metrics, lc consumer.Logs) error {
factory := receiver.NewFactory(
component.MustNewType("mockelasticsearch"),
createDefaultConfig,
receiver.WithLogs(createLogsReceiver, component.StabilityLevelDevelopment),
receiver.WithTraces(createTracesReceiver, component.StabilityLevelDevelopment),
)
lahsivjar marked this conversation as resolved.
Show resolved Hide resolved
esURL, err := url.Parse(es.endpoint)
if err != nil {
return fmt.Errorf("invalid ES URL specified %s: %w", es.endpoint, err)
}
cfg := factory.CreateDefaultConfig().(*config)
cfg.ESEndpoint = es.endpoint
cfg.ServerConfig.Endpoint = esURL.Host
cfg.DecodeBulkRequests = es.decodeBulkRequest

var err error
set := receivertest.NewNopCreateSettings()
// Use an actual logger to log errors.
set.Logger = zap.Must(zap.NewDevelopment())
es.receiver, err = factory.CreateLogsReceiver(context.Background(), set, cfg, lc)
logsReceiver, err := factory.CreateLogsReceiver(context.Background(), set, cfg, lc)
if err != nil {
return err
return fmt.Errorf("failed to create logs receiver: %w", err)
}
tracesReceiver, err := factory.CreateTracesReceiver(context.Background(), set, cfg, tc)
if err != nil {
return fmt.Errorf("failed to create traces receiver: %w", err)
}

// Since we use SharedComponent both receivers should be same
require.Same(es.t, logsReceiver, tracesReceiver)
es.receiver = logsReceiver

return es.receiver.Start(context.Background(), componenttest.NewNopHost())
}

Expand All @@ -72,6 +105,8 @@ func (es *esDataReceiver) GenConfigYAMLStr() string {
cfgFormat := `
elasticsearch:
endpoints: [%s]
logs_index: %s
traces_index: %s
flush:
interval: 1s
sending_queue:
Expand All @@ -80,20 +115,30 @@ func (es *esDataReceiver) GenConfigYAMLStr() string {
enabled: true
max_requests: 10000
`
return fmt.Sprintf(cfgFormat, es.endpoint)
return fmt.Sprintf(cfgFormat, es.endpoint, TestLogsIndex, TestTracesIndex)
}

func (es *esDataReceiver) ProtocolName() string {
return "elasticsearch"
}

type config struct {
ESEndpoint string
confighttp.ServerConfig

// DecodeBulkRequests controls decoding of the bulk request in the mock
// ES receiver. Decoding requests would consume resources and might
// pollute the benchmark results. Note that if decode bulk request is
// set to false then the consumers will not consume any events and the
// bulk request will always return http.StatusOK.
DecodeBulkRequests bool
}

func createDefaultConfig() component.Config {
return &config{
ESEndpoint: "127.0.0.1:9200",
ServerConfig: confighttp.ServerConfig{
Endpoint: "127.0.0.1:9200",
},
DecodeBulkRequests: true,
}
}

Expand All @@ -103,20 +148,61 @@ func createLogsReceiver(
rawCfg component.Config,
next consumer.Logs,
) (receiver.Logs, error) {
cfg := rawCfg.(*config)
return newMockESReceiver(params, cfg, next)
receiver := receivers.GetOrAdd(rawCfg, func() component.Component {
return newMockESReceiver(params, rawCfg.(*config))
})
receiver.Unwrap().(*mockESReceiver).logsConsumer = next
return receiver, nil
}

func createTracesReceiver(
_ context.Context,
params receiver.CreateSettings,
rawCfg component.Config,
next consumer.Traces,
) (receiver.Traces, error) {
receiver := receivers.GetOrAdd(rawCfg, func() component.Component {
return newMockESReceiver(params, rawCfg.(*config))
})
receiver.Unwrap().(*mockESReceiver).tracesConsumer = next
return receiver, nil
}

type mockESReceiver struct {
server *http.Server
params receiver.CreateSettings
config *config

tracesConsumer consumer.Traces
logsConsumer consumer.Logs

server *http.Server
}

func newMockESReceiver(params receiver.CreateSettings, cfg *config) receiver.Logs {
return &mockESReceiver{
params: params,
config: cfg,
}
}

func newMockESReceiver(params receiver.CreateSettings, cfg *config, next consumer.Logs) (receiver.Logs, error) {
func (es *mockESReceiver) Start(ctx context.Context, host component.Host) error {
if es.server != nil {
return nil
}

ln, err := es.config.ToListener(ctx)
if err != nil {
return fmt.Errorf("failed to bind to address %s: %w", es.config.Endpoint, err)
}

// Ideally bulk request items should be converted to the corresponding event record
// however, since we only assert count for now there is no need to do the actual
// translation. Instead we use a pre-initialized empty logs and traces model to
// reduce allocation impact on tests and benchmarks.
emptyLogs := plog.NewLogs()
emptyLogs.ResourceLogs().AppendEmpty().
ScopeLogs().AppendEmpty().
LogRecords().AppendEmpty()
emptyLogs.ResourceLogs().AppendEmpty().ScopeLogs().AppendEmpty().LogRecords().AppendEmpty()
emptyTrace := ptrace.NewTraces()
emptyTrace.ResourceSpans().AppendEmpty().ScopeSpans().AppendEmpty().Spans().AppendEmpty()

r := mux.NewRouter()
r.Use(func(next http.Handler) http.Handler {
Expand All @@ -129,51 +215,54 @@ func newMockESReceiver(params receiver.CreateSettings, cfg *config, next consume
fmt.Fprintln(w, `{"version":{"number":"1.2.3"}}`)
})
r.HandleFunc("/_bulk", func(w http.ResponseWriter, r *http.Request) {
if !es.config.DecodeBulkRequests {
w.WriteHeader(http.StatusOK)
return
}
_, response := docappendertest.DecodeBulkRequest(r)
for _, itemMap := range response.Items {
for k, item := range itemMap {
// Ideally bulk request should be converted to log record
// however, since we only assert count for now there is no
// need to do the actual translation. We use a pre-initialized
// empty plog.Logs to reduce allocation impact on tests and
// benchmarks due to this.
if err := next.ConsumeLogs(context.Background(), emptyLogs); err != nil {
var consumeErr error
switch item.Index {
case TestLogsIndex:
consumeErr = es.logsConsumer.ConsumeLogs(context.Background(), emptyLogs)
case TestTracesIndex:
consumeErr = es.tracesConsumer.ConsumeTraces(context.Background(), emptyTrace)
}
if consumeErr != nil {
response.HasErrors = true
item.Status = http.StatusTooManyRequests
item.Error.Type = "simulated_es_error"
item.Error.Reason = err.Error()
item.Error.Reason = consumeErr.Error()
}
itemMap[k] = item
}
}
if err := json.NewEncoder(w).Encode(response); err != nil {
if jsonErr := json.NewEncoder(w).Encode(response); jsonErr != nil {
w.WriteHeader(http.StatusInternalServerError)
}
})

esURL, err := url.Parse(cfg.ESEndpoint)
es.server, err = es.config.ToServer(ctx, host, es.params.TelemetrySettings, r)
if err != nil {
return nil, fmt.Errorf("failed to parse Elasticsearch endpoint: %w", err)
return fmt.Errorf("failed to create mock ES server: %w", err)
}
return &mockESReceiver{
server: &http.Server{
Addr: esURL.Host,
Handler: r,
ReadHeaderTimeout: 20 * time.Second,
},
params: params,
}, nil
}

func (es *mockESReceiver) Start(_ context.Context, _ component.Host) error {
go func() {
if err := es.server.ListenAndServe(); err != nil && !errors.Is(err, http.ErrServerClosed) {
es.params.Logger.Error("failed while running mock ES receiver", zap.Error(err))
if err := es.server.Serve(ln); err != nil && !errors.Is(err, http.ErrServerClosed) {
es.params.ReportStatus(component.NewFatalErrorEvent(err))
}
}()
return nil
}

func (es *mockESReceiver) Shutdown(ctx context.Context) error {
if es.server == nil {
return nil
}
return es.server.Shutdown(ctx)
}

// mockESReceiver serves both, traces and logs. Shared component allows for a single
// instance of mockESReceiver to serve all supported event types.
var receivers = sharedcomponent.NewSharedComponents()
Loading