Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[exporter/elasticsearch] Add reliability tests #31848

Merged
merged 13 commits into from
Apr 15, 2024
Prev Previous commit
Next Next commit
Use available port for mock ES receiver
  • Loading branch information
lahsivjar committed Apr 12, 2024
commit fc3e8cec802e1b10e8196540e41732786997360e
4 changes: 2 additions & 2 deletions exporter/elasticsearchexporter/integrationtest/collector.go
Original file line number Diff line number Diff line change
@@ -31,8 +31,8 @@ import (
"golang.org/x/sync/errgroup"
)

// CreateConfigYaml creates a yaml config for an otel collector for testing.
func CreateConfigYaml(
// createConfigYaml creates a yaml config for an otel collector for testing.
func createConfigYaml(
t testing.TB,
sender testbed.DataSender,
receiver testbed.DataReceiver,
56 changes: 45 additions & 11 deletions exporter/elasticsearchexporter/integrationtest/datareceiver.go
Original file line number Diff line number Diff line change
@@ -6,38 +6,52 @@ package integrationtest
import (
"context"
"encoding/json"
"errors"
"fmt"
"net/http"
"net/url"
"testing"

"github.com/elastic/go-docappender/docappendertest"
"github.com/gorilla/mux"
"github.com/open-telemetry/opentelemetry-collector-contrib/internal/common/testutil"
"github.com/open-telemetry/opentelemetry-collector-contrib/testbed/testbed"
"go.opentelemetry.io/collector/component"
"go.opentelemetry.io/collector/component/componenttest"
"go.opentelemetry.io/collector/consumer"
"go.opentelemetry.io/collector/pdata/plog"
"go.opentelemetry.io/collector/receiver"
"go.opentelemetry.io/collector/receiver/receivertest"
"go.uber.org/zap"
)

type esDataReceiver struct {
testbed.DataReceiverBase
receiver receiver.Logs
endpoint string
}

func NewElasticsearchDataReceiver() testbed.DataReceiver {
func NewElasticsearchDataReceiver(t testing.TB) testbed.DataReceiver {
return &esDataReceiver{
DataReceiverBase: testbed.DataReceiverBase{},
endpoint: fmt.Sprintf("http://%s:%d", testbed.DefaultHost, testutil.GetAvailablePort(t)),
}
}

func (es *esDataReceiver) Start(_ consumer.Traces, _ consumer.Metrics, lc consumer.Logs) error {
factory := receiver.NewFactory(
component.MustNewType("mockelasticsearch"),
nil,
createDefaultConfig,
receiver.WithLogs(createLogsReceiver, component.StabilityLevelDevelopment),
)
cfg := factory.CreateDefaultConfig().(*config)
cfg.ESEndpoint = es.endpoint

var err error
es.receiver, err = factory.CreateLogsReceiver(context.Background(), receiver.CreateSettings{}, nil, lc)
set := receivertest.NewNopCreateSettings()
// Use an actual logger to log errors.
set.Logger = zap.Must(zap.NewDevelopment())
es.receiver, err = factory.CreateLogsReceiver(context.Background(), set, cfg, lc)
if err != nil {
return err
}
@@ -53,9 +67,9 @@ func (es *esDataReceiver) Stop() error {

func (es *esDataReceiver) GenConfigYAMLStr() string {
// Note that this generates an exporter config for agent.
return `
cfgFormat := `
elasticsearch:
endpoints: [http://127.0.0.1:9200]
endpoints: [%s]
flush:
interval: 1s
sending_queue:
@@ -64,26 +78,39 @@ func (es *esDataReceiver) GenConfigYAMLStr() string {
enabled: true
max_requests: 10000
`
return fmt.Sprintf(cfgFormat, es.endpoint)
}

func (es *esDataReceiver) ProtocolName() string {
return "elasticsearch"
}

type config struct {
ESEndpoint string
}

func createDefaultConfig() component.Config {
return &config{
ESEndpoint: "127.0.0.1:9200",
}
}

func createLogsReceiver(
_ context.Context,
_ receiver.CreateSettings,
_ component.Config,
params receiver.CreateSettings,
rawCfg component.Config,
next consumer.Logs,
) (receiver.Logs, error) {
return newMockESReceiver(next)
cfg := rawCfg.(*config)
return newMockESReceiver(params, cfg, next)
}

type mockESReceiver struct {
server *http.Server
params receiver.CreateSettings
}

func newMockESReceiver(next consumer.Logs) (receiver.Logs, error) {
func newMockESReceiver(params receiver.CreateSettings, cfg *config, next consumer.Logs) (receiver.Logs, error) {
r := mux.NewRouter()
r.Use(mux.MiddlewareFunc(func(next http.Handler) http.Handler {
return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
@@ -118,17 +145,24 @@ func newMockESReceiver(next consumer.Logs) (receiver.Logs, error) {
json.NewEncoder(w).Encode(response)
})

esURL, err := url.Parse(cfg.ESEndpoint)
if err != nil {
return nil, fmt.Errorf("failed to parse Elasticsearch endpoint: %w", err)
}
return &mockESReceiver{
server: &http.Server{
Addr: "127.0.0.1:9200",
Addr: esURL.Host,
Handler: r,
},
params: params,
}, nil
}

func (es *mockESReceiver) Start(_ context.Context, host component.Host) error {
go func() {
es.server.ListenAndServe()
if err := es.server.ListenAndServe(); err != nil && !errors.Is(err, http.ErrServerClosed) {
es.params.Logger.Error("failed while running mock ES receiver", zap.Error(err))
}
}()
return nil
}
Original file line number Diff line number Diff line change
@@ -42,13 +42,13 @@ func runner(t *testing.T, restartCollector, mockESFailure bool) {
sender := testbed.NewOTLPLogsDataSender(
testbed.DefaultHost, testutil.GetAvailablePort(t),
)
receiver := NewElasticsearchDataReceiver()
receiver := NewElasticsearchDataReceiver(t)
provider := testbed.NewPerfTestDataProvider(testbed.LoadOptions{
DataItemsPerSecond: 10_000,
ItemsPerBatch: 10,
})

cfg := CreateConfigYaml(t, sender, receiver, nil, nil, "logs", getDebugFlag(t))
cfg := createConfigYaml(t, sender, receiver, nil, nil, "logs", getDebugFlag(t))
t.Log("test otel collector configuration:", cfg)
collector := NewRecreatableOtelCol(t)
cleanup, err := collector.PrepareConfig(cfg)
2 changes: 1 addition & 1 deletion exporter/elasticsearchexporter/integrationtest/go.mod
Original file line number Diff line number Diff line change
@@ -23,6 +23,7 @@ require (
go.opentelemetry.io/collector/processor v0.98.0
go.opentelemetry.io/collector/receiver v0.98.0
go.opentelemetry.io/collector/receiver/otlpreceiver v0.98.0
go.uber.org/zap v1.27.0
golang.org/x/sync v0.6.0
)

@@ -160,7 +161,6 @@ require (
go.opentelemetry.io/otel/trace v1.25.0 // indirect
go.opentelemetry.io/proto/otlp v1.1.0 // indirect
go.uber.org/multierr v1.11.0 // indirect
go.uber.org/zap v1.27.0 // indirect
golang.org/x/exp v0.0.0-20240119083558-1b970713d09a // indirect
golang.org/x/net v0.24.0 // indirect
golang.org/x/sys v0.19.0 // indirect