Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[chore][exporter/elasticsearch] Add benchmark for traces consumer #33087

Merged
merged 11 commits into from
May 22, 2024
141 changes: 104 additions & 37 deletions exporter/elasticsearchexporter/integrationtest/datareceiver.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,22 +11,36 @@ import (
"net/http"
"net/url"
"testing"
"time"

"github.com/elastic/go-docappender/v2/docappendertest"
"github.com/gorilla/mux"
"go.opentelemetry.io/collector/component"
"go.opentelemetry.io/collector/component/componenttest"
"go.opentelemetry.io/collector/config/confighttp"
"go.opentelemetry.io/collector/consumer"
"go.opentelemetry.io/collector/pdata/plog"
"go.opentelemetry.io/collector/pdata/ptrace"
"go.opentelemetry.io/collector/receiver"
"go.opentelemetry.io/collector/receiver/receivertest"
"go.uber.org/zap"

"github.com/open-telemetry/opentelemetry-collector-contrib/internal/common/testutil"
"github.com/open-telemetry/opentelemetry-collector-contrib/internal/sharedcomponent"
"github.com/open-telemetry/opentelemetry-collector-contrib/testbed/testbed"
)

const (
// TestLogsIndex is used by the mock ES data receiver to indentify log events.
// Exporter LogsIndex configuration must be configured with TestLogsIndex for
// the data receiver to work properly
TestLogsIndex = "logs-test-idx"

// TestTracesIndex is used by the mock ES data receiver to indentify trace
// events. Exporter TracesIndex configuration must be configured with
// TestTracesIndex for the data receiver to work properly
TestTracesIndex = "traces-test-idx"
)

type esDataReceiver struct {
testbed.DataReceiverBase
receiver receiver.Logs
Expand All @@ -40,22 +54,30 @@ func newElasticsearchDataReceiver(t testing.TB) *esDataReceiver {
}
}

func (es *esDataReceiver) Start(_ consumer.Traces, _ consumer.Metrics, lc consumer.Logs) error {
func (es *esDataReceiver) Start(tc consumer.Traces, _ consumer.Metrics, lc consumer.Logs) error {
factory := receiver.NewFactory(
component.MustNewType("mockelasticsearch"),
createDefaultConfig,
receiver.WithLogs(createLogsReceiver, component.StabilityLevelDevelopment),
receiver.WithTraces(createTracesReceiver, component.StabilityLevelDevelopment),
)
lahsivjar marked this conversation as resolved.
Show resolved Hide resolved
esURL, err := url.Parse(es.endpoint)
if err != nil {
return fmt.Errorf("invalid ES URL specified %s: %w", es.endpoint, err)
}
cfg := factory.CreateDefaultConfig().(*config)
cfg.ESEndpoint = es.endpoint
cfg.ServerConfig.Endpoint = esURL.Host

var err error
set := receivertest.NewNopCreateSettings()
// Use an actual logger to log errors.
set.Logger = zap.Must(zap.NewDevelopment())
es.receiver, err = factory.CreateLogsReceiver(context.Background(), set, cfg, lc)
if err != nil {
return err
return fmt.Errorf("failed to create logs receiver: %w", err)
}
es.receiver, err = factory.CreateTracesReceiver(context.Background(), set, cfg, tc)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is overwriting es.receiver which is a receiver.Logs. Is this intended?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah, both will be same since we use SharedComponent

Copy link
Member Author

@lahsivjar lahsivjar May 16, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It might have been a source of confusion for others so I added a comment and assertion (overkill maybe? - feedbacks welcomed) to check if they point to the same object.

if err != nil {
return fmt.Errorf("failed to create traces receiver: %w", err)
}
return es.receiver.Start(context.Background(), componenttest.NewNopHost())
}
Expand All @@ -72,6 +94,8 @@ func (es *esDataReceiver) GenConfigYAMLStr() string {
cfgFormat := `
elasticsearch:
endpoints: [%s]
logs_index: %s
traces_index: %s
flush:
interval: 1s
sending_queue:
Expand All @@ -80,20 +104,22 @@ func (es *esDataReceiver) GenConfigYAMLStr() string {
enabled: true
max_requests: 10000
`
return fmt.Sprintf(cfgFormat, es.endpoint)
return fmt.Sprintf(cfgFormat, es.endpoint, TestLogsIndex, TestTracesIndex)
}

func (es *esDataReceiver) ProtocolName() string {
return "elasticsearch"
}

type config struct {
ESEndpoint string
confighttp.ServerConfig
}

func createDefaultConfig() component.Config {
return &config{
ESEndpoint: "127.0.0.1:9200",
ServerConfig: confighttp.ServerConfig{
Endpoint: "127.0.0.1:9200",
},
}
}

Expand All @@ -103,20 +129,62 @@ func createLogsReceiver(
rawCfg component.Config,
next consumer.Logs,
) (receiver.Logs, error) {
cfg := rawCfg.(*config)
return newMockESReceiver(params, cfg, next)
receiver := receivers.GetOrAdd(rawCfg, func() component.Component {
return newMockESReceiver(params, rawCfg.(*config))
})
receiver.Unwrap().(*mockESReceiver).logsConsumer = next
return receiver, nil
}

func createTracesReceiver(
_ context.Context,
params receiver.CreateSettings,
rawCfg component.Config,
next consumer.Traces,
) (receiver.Traces, error) {
receiver := receivers.GetOrAdd(rawCfg, func() component.Component {
return newMockESReceiver(params, rawCfg.(*config))
})
receiver.Unwrap().(*mockESReceiver).tracesConsumer = next
return receiver, nil
}

type mockESReceiver struct {
server *http.Server
params receiver.CreateSettings
config *config

tracesConsumer consumer.Traces
logsConsumer consumer.Logs

server *http.Server
}

func newMockESReceiver(params receiver.CreateSettings, cfg *config, next consumer.Logs) (receiver.Logs, error) {
func newMockESReceiver(params receiver.CreateSettings, cfg *config) receiver.Logs {
return &mockESReceiver{
params: params,
config: cfg,
}
}

func (es *mockESReceiver) Start(ctx context.Context, host component.Host) error {
if es.server != nil {
return nil
}

ln, err := es.config.ToListener(ctx)
if err != nil {
return fmt.Errorf("failed to bind to address %s: %w", es.config.Endpoint, err)
}

// Ideally bulk request items should be converted to the corresponding event record
// however, since we only assert count for now there is no need to do the actual
// translation. Instead we use a pre-initialized empty logs and traces model to
// reduce allocation impact on tests and benchmarks.
emptyLogs := plog.NewLogs()
emptyLogs.ResourceLogs().AppendEmpty().
ScopeLogs().AppendEmpty().
LogRecords().AppendEmpty()
emptyLogs.ResourceLogs().AppendEmpty().ScopeLogs().AppendEmpty().LogRecords().AppendEmpty()
emptyTrace := ptrace.NewTraces()
emptyTrace.ResourceSpans().AppendEmpty().ScopeSpans().AppendEmpty().Spans().AppendEmpty()
emptyTrace.SpanCount()
lahsivjar marked this conversation as resolved.
Show resolved Hide resolved

r := mux.NewRouter()
r.Use(func(next http.Handler) http.Handler {
Expand All @@ -132,48 +200,47 @@ func newMockESReceiver(params receiver.CreateSettings, cfg *config, next consume
_, response := docappendertest.DecodeBulkRequest(r)
for _, itemMap := range response.Items {
for k, item := range itemMap {
// Ideally bulk request should be converted to log record
// however, since we only assert count for now there is no
// need to do the actual translation. We use a pre-initialized
// empty plog.Logs to reduce allocation impact on tests and
// benchmarks due to this.
if err := next.ConsumeLogs(context.Background(), emptyLogs); err != nil {
var consumeErr error
switch item.Index {
case TestLogsIndex:
consumeErr = es.logsConsumer.ConsumeLogs(context.Background(), emptyLogs)
case TestTracesIndex:
consumeErr = es.tracesConsumer.ConsumeTraces(context.Background(), emptyTrace)
}
if consumeErr != nil {
response.HasErrors = true
item.Status = http.StatusTooManyRequests
item.Error.Type = "simulated_es_error"
item.Error.Reason = err.Error()
item.Error.Reason = consumeErr.Error()
}
itemMap[k] = item
}
}
if err := json.NewEncoder(w).Encode(response); err != nil {
if jsonErr := json.NewEncoder(w).Encode(response); jsonErr != nil {
w.WriteHeader(http.StatusInternalServerError)
}
})

esURL, err := url.Parse(cfg.ESEndpoint)
es.server, err = es.config.ToServer(ctx, host, es.params.TelemetrySettings, r)
if err != nil {
return nil, fmt.Errorf("failed to parse Elasticsearch endpoint: %w", err)
return fmt.Errorf("failed to create mock ES server: %w", err)
}
return &mockESReceiver{
server: &http.Server{
Addr: esURL.Host,
Handler: r,
ReadHeaderTimeout: 20 * time.Second,
},
params: params,
}, nil
}

func (es *mockESReceiver) Start(_ context.Context, _ component.Host) error {
go func() {
if err := es.server.ListenAndServe(); err != nil && !errors.Is(err, http.ErrServerClosed) {
es.params.Logger.Error("failed while running mock ES receiver", zap.Error(err))
if err := es.server.Serve(ln); err != nil && !errors.Is(err, http.ErrServerClosed) {
es.params.ReportStatus(component.NewFatalErrorEvent(err))
}
}()
return nil
}

func (es *mockESReceiver) Shutdown(ctx context.Context) error {
if es.server == nil {
return nil
}
return es.server.Shutdown(ctx)
}

// mockESReceiver serves both, traces and logs. Shared component allows for a single
// instance of mockESReceiver to serve all supported event types.
var receivers = sharedcomponent.NewSharedComponents()
Loading