From de3cf99129ba3ccb1d7286542c4c7d8a5c354217 Mon Sep 17 00:00:00 2001 From: Afzal Ansari Date: Mon, 31 Jul 2023 02:27:08 +0530 Subject: [PATCH] Replace es-spanstore tracing instrumentation with OpenTelemetry (#4596) ## Which problem is this PR solving? * Part of #3381 * This PR adds OTEL Provider to es-spanstore component ## Short description of the changes - Replaces OT tracer with OTEL tracer to support jtracer pkg --------- Signed-off-by: Afzal Ansari Signed-off-by: Afzal <94980910+afzalbin64@users.noreply.github.com> Co-authored-by: Afzal <94980910+afzalbin64@users.noreply.github.com> --- cmd/all-in-one/main.go | 2 + cmd/query/main.go | 2 + plugin/storage/es/factory.go | 34 ++++---- plugin/storage/es/spanstore/reader.go | 60 ++++++++------ plugin/storage/es/spanstore/reader_test.go | 81 +++++++++++++------ .../storage/integration/elasticsearch_test.go | 18 +++++ 6 files changed, 135 insertions(+), 62 deletions(-) diff --git a/cmd/all-in-one/main.go b/cmd/all-in-one/main.go index 34d86f534ce..89297fdde2e 100644 --- a/cmd/all-in-one/main.go +++ b/cmd/all-in-one/main.go @@ -24,6 +24,7 @@ import ( "github.com/spf13/cobra" "github.com/spf13/viper" + "go.opentelemetry.io/otel" _ "go.uber.org/automaxprocs" "go.uber.org/zap" @@ -104,6 +105,7 @@ by default uses only in-memory database.`, if err != nil { logger.Fatal("Failed to initialize tracer", zap.Error(err)) } + otel.SetTracerProvider(tracer.OTEL) storageFactory.InitFromViper(v, logger) if err := storageFactory.Initialize(metricsFactory, logger); err != nil { diff --git a/cmd/query/main.go b/cmd/query/main.go index aadb7915787..0442dcd06a3 100644 --- a/cmd/query/main.go +++ b/cmd/query/main.go @@ -23,6 +23,7 @@ import ( "github.com/spf13/cobra" "github.com/spf13/viper" + "go.opentelemetry.io/otel" _ "go.uber.org/automaxprocs" "go.uber.org/zap" @@ -76,6 +77,7 @@ func main() { if err != nil { logger.Fatal("Failed to create tracer:", zap.Error(err)) } + otel.SetTracerProvider(jtracer.OTEL) queryOpts, err := new(app.QueryOptions).InitFromViper(v, logger) if err != nil { logger.Fatal("Failed to configure query service", zap.Error(err)) diff --git a/plugin/storage/es/factory.go b/plugin/storage/es/factory.go index d0f78c294f3..bad23bed879 100644 --- a/plugin/storage/es/factory.go +++ b/plugin/storage/es/factory.go @@ -21,6 +21,8 @@ import ( "io" "github.com/spf13/viper" + "go.opentelemetry.io/otel" + "go.opentelemetry.io/otel/trace" "go.uber.org/zap" "github.com/jaegertracing/jaeger/pkg/es" @@ -50,6 +52,7 @@ type Factory struct { metricsFactory metrics.Factory logger *zap.Logger + tracer trace.TracerProvider newClientFn func(c *config.Configuration, logger *zap.Logger, metricsFactory metrics.Factory) (es.Client, error) @@ -64,6 +67,7 @@ func NewFactory() *Factory { return &Factory{ Options: NewOptions(primaryNamespace, archiveNamespace), newClientFn: config.NewClient, + tracer: otel.GetTracerProvider(), } } @@ -108,17 +112,17 @@ func (f *Factory) Initialize(metricsFactory metrics.Factory, logger *zap.Logger) // CreateSpanReader implements storage.Factory func (f *Factory) CreateSpanReader() (spanstore.Reader, error) { - return createSpanReader(f.metricsFactory, f.logger, f.primaryClient, f.primaryConfig, false) + return createSpanReader(f.primaryClient, f.primaryConfig, false, f.metricsFactory, f.logger, f.tracer) } // CreateSpanWriter implements storage.Factory func (f *Factory) CreateSpanWriter() (spanstore.Writer, error) { - return createSpanWriter(f.metricsFactory, f.logger, f.primaryClient, f.primaryConfig, false) + return createSpanWriter(f.primaryClient, f.primaryConfig, false, f.metricsFactory, f.logger) } // CreateDependencyReader implements storage.Factory func (f *Factory) CreateDependencyReader() (dependencystore.Reader, error) { - return createDependencyReader(f.logger, f.primaryClient, f.primaryConfig) + return createDependencyReader(f.primaryClient, f.primaryConfig, f.logger) } // CreateArchiveSpanReader implements storage.ArchiveFactory @@ -126,7 +130,7 @@ func (f *Factory) CreateArchiveSpanReader() (spanstore.Reader, error) { if !f.archiveConfig.Enabled { return nil, nil } - return createSpanReader(f.metricsFactory, f.logger, f.archiveClient, f.archiveConfig, true) + return createSpanReader(f.archiveClient, f.archiveConfig, true, f.metricsFactory, f.logger, f.tracer) } // CreateArchiveSpanWriter implements storage.ArchiveFactory @@ -134,23 +138,22 @@ func (f *Factory) CreateArchiveSpanWriter() (spanstore.Writer, error) { if !f.archiveConfig.Enabled { return nil, nil } - return createSpanWriter(f.metricsFactory, f.logger, f.archiveClient, f.archiveConfig, true) + return createSpanWriter(f.archiveClient, f.archiveConfig, true, f.metricsFactory, f.logger) } func createSpanReader( - mFactory metrics.Factory, - logger *zap.Logger, client es.Client, cfg *config.Configuration, archive bool, + mFactory metrics.Factory, + logger *zap.Logger, + tp trace.TracerProvider, ) (spanstore.Reader, error) { if cfg.UseILM && !cfg.UseReadWriteAliases { return nil, fmt.Errorf("--es.use-ilm must always be used in conjunction with --es.use-aliases to ensure ES writers and readers refer to the single index mapping") } return esSpanStore.NewSpanReader(esSpanStore.SpanReaderParams{ Client: client, - Logger: logger, - MetricsFactory: mFactory, MaxDocCount: cfg.MaxDocCount, MaxSpanAge: cfg.MaxSpanAge, IndexPrefix: cfg.IndexPrefix, @@ -162,15 +165,18 @@ func createSpanReader( UseReadWriteAliases: cfg.UseReadWriteAliases, Archive: archive, RemoteReadClusters: cfg.RemoteReadClusters, + Logger: logger, + MetricsFactory: mFactory, + Tracer: tp.Tracer("esSpanStore.SpanReader"), }), nil } func createSpanWriter( - mFactory metrics.Factory, - logger *zap.Logger, client es.Client, cfg *config.Configuration, archive bool, + mFactory metrics.Factory, + logger *zap.Logger, ) (spanstore.Writer, error) { var tags []string var err error @@ -197,8 +203,6 @@ func createSpanWriter( } writer := esSpanStore.NewSpanWriter(esSpanStore.SpanWriterParams{ Client: client, - Logger: logger, - MetricsFactory: mFactory, IndexPrefix: cfg.IndexPrefix, SpanIndexDateLayout: cfg.IndexDateLayoutSpans, ServiceIndexDateLayout: cfg.IndexDateLayoutServices, @@ -207,6 +211,8 @@ func createSpanWriter( TagDotReplacement: cfg.Tags.DotReplacement, Archive: archive, UseReadWriteAliases: cfg.UseReadWriteAliases, + Logger: logger, + MetricsFactory: mFactory, }) // Creating a template here would conflict with the one created for ILM resulting to no index rollover @@ -220,9 +226,9 @@ func createSpanWriter( } func createDependencyReader( - logger *zap.Logger, client es.Client, cfg *config.Configuration, + logger *zap.Logger, ) (dependencystore.Reader, error) { reader := esDepStore.NewDependencyStore(esDepStore.DependencyStoreParams{ Client: client, diff --git a/plugin/storage/es/spanstore/reader.go b/plugin/storage/es/spanstore/reader.go index 7165d9d1e38..d21ba25c28c 100644 --- a/plugin/storage/es/spanstore/reader.go +++ b/plugin/storage/es/spanstore/reader.go @@ -24,9 +24,9 @@ import ( "time" "github.com/olivere/elastic" - "github.com/opentracing/opentracing-go" - ottag "github.com/opentracing/opentracing-go/ext" - otlog "github.com/opentracing/opentracing-go/log" + "go.opentelemetry.io/otel/attribute" + "go.opentelemetry.io/otel/codes" + "go.opentelemetry.io/otel/trace" "go.uber.org/zap" "github.com/jaegertracing/jaeger/model" @@ -93,7 +93,6 @@ var ( // SpanReader can query for and load traces from ElasticSearch type SpanReader struct { client es.Client - logger *zap.Logger // The age of the oldest service/operation we will look for. Because indices in ElasticSearch are by day, // this will be rounded down to UTC 00:00 of that day. maxSpanAge time.Duration @@ -109,15 +108,15 @@ type SpanReader struct { sourceFn sourceFn maxDocCount int useReadWriteAliases bool + logger *zap.Logger + tracer trace.Tracer } // SpanReaderParams holds constructor params for NewSpanReader type SpanReaderParams struct { Client es.Client - Logger *zap.Logger MaxSpanAge time.Duration MaxDocCount int - MetricsFactory metrics.Factory IndexPrefix string SpanIndexDateLayout string ServiceIndexDateLayout string @@ -127,6 +126,9 @@ type SpanReaderParams struct { Archive bool UseReadWriteAliases bool RemoteReadClusters []string + MetricsFactory metrics.Factory + Logger *zap.Logger + Tracer trace.Tracer } // NewSpanReader returns a new SpanReader with a metrics. @@ -139,7 +141,6 @@ func NewSpanReader(p SpanReaderParams) *SpanReader { } return &SpanReader{ client: p.Client, - logger: p.Logger, maxSpanAge: maxSpanAge, serviceOperationStorage: NewServiceOperationStorage(p.Client, p.Logger, 0), // the decorator takes care of metrics spanIndexPrefix: indexNames(p.IndexPrefix, spanIndex), @@ -153,6 +154,8 @@ func NewSpanReader(p SpanReaderParams) *SpanReader { sourceFn: getSourceFn(p.Archive, p.MaxDocCount), maxDocCount: p.MaxDocCount, useReadWriteAliases: p.UseReadWriteAliases, + logger: p.Logger, + tracer: p.Tracer, } } @@ -238,8 +241,8 @@ func indexNames(prefix, index string) string { // GetTrace takes a traceID and returns a Trace associated with that traceID func (s *SpanReader) GetTrace(ctx context.Context, traceID model.TraceID) (*model.Trace, error) { - span, ctx := opentracing.StartSpanFromContext(ctx, "GetTrace") - defer span.Finish() + ctx, span := s.tracer.Start(ctx, "GetTrace") + defer span.End() currentTime := time.Now() traces, err := s.multiRead(ctx, []model.TraceID{traceID}, currentTime.Add(-s.maxSpanAge), currentTime) if err != nil { @@ -283,8 +286,8 @@ func (s *SpanReader) unmarshalJSONSpan(esSpanRaw *elastic.SearchHit) (*dbmodel.S // GetServices returns all services traced by Jaeger, ordered by frequency func (s *SpanReader) GetServices(ctx context.Context) ([]string, error) { - span, ctx := opentracing.StartSpanFromContext(ctx, "GetServices") - defer span.Finish() + ctx, span := s.tracer.Start(ctx, "GetService") + defer span.End() currentTime := time.Now() jaegerIndices := s.timeRangeIndices(s.serviceIndexPrefix, s.serviceIndexDateLayout, currentTime.Add(-s.maxSpanAge), currentTime, s.serviceIndexRolloverFrequency) return s.serviceOperationStorage.getServices(ctx, jaegerIndices, s.maxDocCount) @@ -295,8 +298,8 @@ func (s *SpanReader) GetOperations( ctx context.Context, query spanstore.OperationQueryParameters, ) ([]spanstore.Operation, error) { - span, ctx := opentracing.StartSpanFromContext(ctx, "GetOperations") - defer span.Finish() + ctx, span := s.tracer.Start(ctx, "GetOperations") + defer span.End() currentTime := time.Now() jaegerIndices := s.timeRangeIndices(s.serviceIndexPrefix, s.serviceIndexDateLayout, currentTime.Add(-s.maxSpanAge), currentTime, s.serviceIndexRolloverFrequency) operations, err := s.serviceOperationStorage.getOperations(ctx, jaegerIndices, query.ServiceName, s.maxDocCount) @@ -329,8 +332,8 @@ func bucketToStringArray(buckets []*elastic.AggregationBucketKeyItem) ([]string, // FindTraces retrieves traces that match the traceQuery func (s *SpanReader) FindTraces(ctx context.Context, traceQuery *spanstore.TraceQueryParameters) ([]*model.Trace, error) { - span, ctx := opentracing.StartSpanFromContext(ctx, "FindTraces") - defer span.Finish() + ctx, span := s.tracer.Start(ctx, "FindTraces") + defer span.End() uniqueTraceIDs, err := s.FindTraceIDs(ctx, traceQuery) if err != nil { @@ -341,8 +344,8 @@ func (s *SpanReader) FindTraces(ctx context.Context, traceQuery *spanstore.Trace // FindTraceIDs retrieves traces IDs that match the traceQuery func (s *SpanReader) FindTraceIDs(ctx context.Context, traceQuery *spanstore.TraceQueryParameters) ([]model.TraceID, error) { - span, ctx := opentracing.StartSpanFromContext(ctx, "FindTraceIDs") - defer span.Finish() + ctx, span := s.tracer.Start(ctx, "FindTraceIDs") + defer span.End() if err := validateQuery(traceQuery); err != nil { return nil, err @@ -360,9 +363,16 @@ func (s *SpanReader) FindTraceIDs(ctx context.Context, traceQuery *spanstore.Tra } func (s *SpanReader) multiRead(ctx context.Context, traceIDs []model.TraceID, startTime, endTime time.Time) ([]*model.Trace, error) { - childSpan, _ := opentracing.StartSpanFromContext(ctx, "multiRead") - childSpan.LogFields(otlog.Object("trace_ids", traceIDs)) - defer childSpan.Finish() + ctx, childSpan := s.tracer.Start(ctx, "multiRead") + defer childSpan.End() + + if childSpan.IsRecording() { + tracesIDs := make([]string, len(traceIDs)) + for i, traceID := range traceIDs { + tracesIDs[i] = traceID.String() + } + childSpan.SetAttributes(attribute.Key("trace_ids").StringSlice(tracesIDs)) + } if len(traceIDs) == 0 { return []*model.Trace{}, nil @@ -503,8 +513,8 @@ func validateQuery(p *spanstore.TraceQueryParameters) error { } func (s *SpanReader) findTraceIDs(ctx context.Context, traceQuery *spanstore.TraceQueryParameters) ([]string, error) { - childSpan, _ := opentracing.StartSpanFromContext(ctx, "findTraceIDs") - defer childSpan.Finish() + ctx, childSpan := s.tracer.Start(ctx, "findTraceIDs") + defer childSpan.End() // Below is the JSON body to our HTTP GET request to ElasticSearch. This function creates this. // { // "size": 0, @@ -686,7 +696,7 @@ func (s *SpanReader) buildObjectQuery(field string, k string, v string) elastic. return elastic.NewBoolQuery().Must(keyQuery) } -func logErrorToSpan(span opentracing.Span, err error) { - ottag.Error.Set(span, true) - span.LogFields(otlog.Error(err)) +func logErrorToSpan(span trace.Span, err error) { + span.RecordError(err) + span.SetStatus(codes.Error, err.Error()) } diff --git a/plugin/storage/es/spanstore/reader_test.go b/plugin/storage/es/spanstore/reader_test.go index ea7f0073f50..b13f70b23bd 100644 --- a/plugin/storage/es/spanstore/reader_test.go +++ b/plugin/storage/es/spanstore/reader_test.go @@ -29,6 +29,9 @@ import ( "github.com/stretchr/testify/assert" "github.com/stretchr/testify/mock" "github.com/stretchr/testify/require" + sdktrace "go.opentelemetry.io/otel/sdk/trace" + "go.opentelemetry.io/otel/sdk/trace/tracetest" + "go.opentelemetry.io/otel/trace" "go.uber.org/zap" "github.com/jaegertracing/jaeger/internal/metricstest" @@ -83,22 +86,39 @@ var exampleESSpan = []byte( }`) type spanReaderTest struct { - client *mocks.Client - logger *zap.Logger - logBuffer *testutils.Buffer - reader *SpanReader + client *mocks.Client + logger *zap.Logger + logBuffer *testutils.Buffer + traceBuffer *tracetest.InMemoryExporter + reader *SpanReader +} + +func tracerProvider() (trace.TracerProvider, *tracetest.InMemoryExporter, func() error) { + exporter := tracetest.NewInMemoryExporter() + tp := sdktrace.NewTracerProvider( + sdktrace.WithSampler(sdktrace.AlwaysSample()), + sdktrace.WithSyncer(exporter), + ) + closer := func() error { + return tp.Shutdown(context.Background()) + } + return tp, exporter, closer } func withSpanReader(fn func(r *spanReaderTest)) { client := &mocks.Client{} + tracer, exp, closer := tracerProvider() + defer closer() logger, logBuffer := testutils.NewLogger() r := &spanReaderTest{ - client: client, - logger: logger, - logBuffer: logBuffer, + client: client, + logger: logger, + logBuffer: logBuffer, + traceBuffer: exp, reader: NewSpanReader(SpanReaderParams{ Client: client, Logger: zap.NewNop(), + Tracer: tracer.Tracer("test"), MaxSpanAge: 0, IndexPrefix: "", TagDotReplacement: "@", @@ -110,14 +130,18 @@ func withSpanReader(fn func(r *spanReaderTest)) { func withArchiveSpanReader(readAlias bool, fn func(r *spanReaderTest)) { client := &mocks.Client{} + tracer, exp, closer := tracerProvider() + defer closer() logger, logBuffer := testutils.NewLogger() r := &spanReaderTest{ - client: client, - logger: logger, - logBuffer: logBuffer, + client: client, + logger: logger, + logBuffer: logBuffer, + traceBuffer: exp, reader: NewSpanReader(SpanReaderParams{ Client: client, Logger: zap.NewNop(), + Tracer: tracer.Tracer("test"), MaxSpanAge: 0, IndexPrefix: "", TagDotReplacement: "@", @@ -163,13 +187,15 @@ func TestNewSpanReader(t *testing.T) { func TestSpanReaderIndices(t *testing.T) { client := &mocks.Client{} - logger, _ := testutils.NewLogger() - metricsFactory := metricstest.NewFactory(0) date := time.Date(2019, 10, 10, 5, 0, 0, 0, time.UTC) spanDataLayout := "2006-01-02-15" serviceDataLayout := "2006-01-02" spanDataLayoutFormat := date.UTC().Format(spanDataLayout) serviceDataLayoutFormat := date.UTC().Format(serviceDataLayout) + metricsFactory := metricstest.NewFactory(0) + logger, _ := testutils.NewLogger() + tracer, _, closer := tracerProvider() + defer closer() testCases := []struct { indices []string @@ -177,56 +203,48 @@ func TestSpanReaderIndices(t *testing.T) { }{ { params: SpanReaderParams{ - Client: client, Logger: logger, MetricsFactory: metricsFactory, IndexPrefix: "", Archive: false, SpanIndexDateLayout: spanDataLayout, ServiceIndexDateLayout: serviceDataLayout, }, indices: []string{spanIndex + spanDataLayoutFormat, serviceIndex + serviceDataLayoutFormat}, }, { params: SpanReaderParams{ - Client: client, Logger: logger, MetricsFactory: metricsFactory, IndexPrefix: "", UseReadWriteAliases: true, }, indices: []string{spanIndex + "read", serviceIndex + "read"}, }, { params: SpanReaderParams{ - Client: client, Logger: logger, MetricsFactory: metricsFactory, IndexPrefix: "foo:", Archive: false, SpanIndexDateLayout: spanDataLayout, ServiceIndexDateLayout: serviceDataLayout, }, indices: []string{"foo:" + indexPrefixSeparator + spanIndex + spanDataLayoutFormat, "foo:" + indexPrefixSeparator + serviceIndex + serviceDataLayoutFormat}, }, { params: SpanReaderParams{ - Client: client, Logger: logger, MetricsFactory: metricsFactory, IndexPrefix: "foo:", UseReadWriteAliases: true, }, indices: []string{"foo:-" + spanIndex + "read", "foo:-" + serviceIndex + "read"}, }, { params: SpanReaderParams{ - Client: client, Logger: logger, MetricsFactory: metricsFactory, IndexPrefix: "", Archive: true, }, indices: []string{spanIndex + archiveIndexSuffix, serviceIndex + archiveIndexSuffix}, }, { params: SpanReaderParams{ - Client: client, Logger: logger, MetricsFactory: metricsFactory, IndexPrefix: "foo:", Archive: true, }, indices: []string{"foo:" + indexPrefixSeparator + spanIndex + archiveIndexSuffix, "foo:" + indexPrefixSeparator + serviceIndex + archiveIndexSuffix}, }, { params: SpanReaderParams{ - Client: client, Logger: logger, MetricsFactory: metricsFactory, IndexPrefix: "foo:", Archive: true, UseReadWriteAliases: true, }, indices: []string{"foo:" + indexPrefixSeparator + spanIndex + archiveReadIndexSuffix, "foo:" + indexPrefixSeparator + serviceIndex + archiveReadIndexSuffix}, }, { params: SpanReaderParams{ - Client: client, Logger: logger, MetricsFactory: metricsFactory, IndexPrefix: "", Archive: false, RemoteReadClusters: []string{"cluster_one", "cluster_two"}, SpanIndexDateLayout: spanDataLayout, ServiceIndexDateLayout: serviceDataLayout, }, indices: []string{ @@ -240,7 +258,6 @@ func TestSpanReaderIndices(t *testing.T) { }, { params: SpanReaderParams{ - Client: client, Logger: logger, MetricsFactory: metricsFactory, IndexPrefix: "", Archive: true, RemoteReadClusters: []string{"cluster_one", "cluster_two"}, }, indices: []string{ @@ -254,7 +271,6 @@ func TestSpanReaderIndices(t *testing.T) { }, { params: SpanReaderParams{ - Client: client, Logger: logger, MetricsFactory: metricsFactory, IndexPrefix: "", Archive: false, UseReadWriteAliases: true, RemoteReadClusters: []string{"cluster_one", "cluster_two"}, }, indices: []string{ @@ -268,7 +284,6 @@ func TestSpanReaderIndices(t *testing.T) { }, { params: SpanReaderParams{ - Client: client, Logger: logger, MetricsFactory: metricsFactory, IndexPrefix: "", Archive: true, UseReadWriteAliases: true, RemoteReadClusters: []string{"cluster_one", "cluster_two"}, }, indices: []string{ @@ -282,6 +297,10 @@ func TestSpanReaderIndices(t *testing.T) { }, } for _, testCase := range testCases { + testCase.params.Client = client + testCase.params.Logger = logger + testCase.params.MetricsFactory = metricsFactory + testCase.params.Tracer = tracer.Tracer("test") r := NewSpanReader(testCase.params) actualSpan := r.timeRangeIndices(r.spanIndexPrefix, r.spanIndexDateLayout, date, date, -1*time.Hour) @@ -307,6 +326,7 @@ func TestSpanReader_GetTrace(t *testing.T) { }, nil) trace, err := r.reader.GetTrace(context.Background(), model.NewTraceID(0, 1)) + require.NotEmpty(t, r.traceBuffer.GetSpans(), "Spans recorded") require.NoError(t, err) require.NotNil(t, trace) @@ -383,6 +403,7 @@ func TestSpanReader_multiRead_followUp_query(t *testing.T) { }, nil) traces, err := r.reader.multiRead(context.Background(), []model.TraceID{{High: 0, Low: 1}, {High: 0, Low: 2}}, date, date) + require.NotEmpty(t, r.traceBuffer.GetSpans(), "Spans recorded") require.NoError(t, err) require.NotNil(t, traces) require.Len(t, traces, 2) @@ -420,6 +441,7 @@ func TestSpanReader_SearchAfter(t *testing.T) { }, nil).Times(2) trace, err := r.reader.GetTrace(context.Background(), model.NewTraceID(0, 1)) + require.NotEmpty(t, r.traceBuffer.GetSpans(), "Spans recorded") require.NoError(t, err) require.NotNil(t, trace) @@ -439,6 +461,7 @@ func TestSpanReader_GetTraceQueryError(t *testing.T) { Responses: []*elastic.SearchResult{}, }, nil) trace, err := r.reader.GetTrace(context.Background(), model.NewTraceID(0, 1)) + require.NotEmpty(t, r.traceBuffer.GetSpans(), "Spans recorded") require.EqualError(t, err, "trace not found") require.Nil(t, trace) }) @@ -458,6 +481,7 @@ func TestSpanReader_GetTraceNilHits(t *testing.T) { }, nil) trace, err := r.reader.GetTrace(context.Background(), model.NewTraceID(0, 1)) + require.NotEmpty(t, r.traceBuffer.GetSpans(), "Spans recorded") require.EqualError(t, err, "trace not found") require.Nil(t, trace) }) @@ -481,6 +505,7 @@ func TestSpanReader_GetTraceInvalidSpanError(t *testing.T) { }, nil) trace, err := r.reader.GetTrace(context.Background(), model.NewTraceID(0, 1)) + require.NotEmpty(t, r.traceBuffer.GetSpans(), "Spans recorded") require.Error(t, err, "invalid span") require.Nil(t, trace) }) @@ -505,6 +530,7 @@ func TestSpanReader_GetTraceSpanConversionError(t *testing.T) { }, nil) trace, err := r.reader.GetTrace(context.Background(), model.NewTraceID(0, 1)) + require.NotEmpty(t, r.traceBuffer.GetSpans(), "Spans recorded") require.Error(t, err, "span conversion error, because lacks elements") require.Nil(t, trace) }) @@ -734,6 +760,7 @@ func TestSpanReader_FindTraces(t *testing.T) { } traces, err := r.reader.FindTraces(context.Background(), traceQuery) + require.NotEmpty(t, r.traceBuffer.GetSpans(), "Spans recorded") require.NoError(t, err) assert.Len(t, traces, 1) @@ -778,6 +805,7 @@ func TestSpanReader_FindTracesInvalidQuery(t *testing.T) { } traces, err := r.reader.FindTraces(context.Background(), traceQuery) + require.NotEmpty(t, r.traceBuffer.GetSpans(), "Spans recorded") require.Error(t, err) assert.Nil(t, traces) }) @@ -810,6 +838,7 @@ func TestSpanReader_FindTracesAggregationFailure(t *testing.T) { } traces, err := r.reader.FindTraces(context.Background(), traceQuery) + require.NotEmpty(t, r.traceBuffer.GetSpans(), "Spans recorded") require.Error(t, err) assert.Nil(t, traces) }) @@ -844,6 +873,7 @@ func TestSpanReader_FindTracesNoTraceIDs(t *testing.T) { } traces, err := r.reader.FindTraces(context.Background(), traceQuery) + require.NotEmpty(t, r.traceBuffer.GetSpans(), "Spans recorded") require.NoError(t, err) assert.Len(t, traces, 0) }) @@ -877,6 +907,7 @@ func TestSpanReader_FindTracesReadTraceFailure(t *testing.T) { } traces, err := r.reader.FindTraces(context.Background(), traceQuery) + require.NotEmpty(t, r.traceBuffer.GetSpans(), "Spans recorded") require.EqualError(t, err, "read error") assert.Len(t, traces, 0) }) @@ -915,6 +946,7 @@ func TestSpanReader_FindTracesSpanCollectionFailure(t *testing.T) { } traces, err := r.reader.FindTraces(context.Background(), traceQuery) + require.NotEmpty(t, r.traceBuffer.GetSpans(), "Spans recorded") require.Error(t, err) assert.Len(t, traces, 0) }) @@ -1220,6 +1252,7 @@ func TestSpanReader_GetEmptyIndex(t *testing.T) { } services, err := r.reader.FindTraces(context.Background(), traceQuery) + require.NotEmpty(t, r.traceBuffer.GetSpans(), "Spans recorded") require.NoError(t, err) assert.Empty(t, services) }) @@ -1235,6 +1268,7 @@ func TestSpanReader_ArchiveTraces(t *testing.T) { }, nil) trace, err := r.reader.GetTrace(context.Background(), model.TraceID{}) + require.NotEmpty(t, r.traceBuffer.GetSpans(), "Spans recorded") require.Nil(t, trace) assert.EqualError(t, err, "trace not found") }) @@ -1250,6 +1284,7 @@ func TestSpanReader_ArchiveTraces_ReadAlias(t *testing.T) { }, nil) trace, err := r.reader.GetTrace(context.Background(), model.TraceID{}) + require.NotEmpty(t, r.traceBuffer.GetSpans(), "Spans recorded") require.Nil(t, trace) assert.EqualError(t, err, "trace not found") }) diff --git a/plugin/storage/integration/elasticsearch_test.go b/plugin/storage/integration/elasticsearch_test.go index 9d2ec031b1f..524b05e7d24 100644 --- a/plugin/storage/integration/elasticsearch_test.go +++ b/plugin/storage/integration/elasticsearch_test.go @@ -28,6 +28,9 @@ import ( "github.com/olivere/elastic" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" + sdktrace "go.opentelemetry.io/otel/sdk/trace" + "go.opentelemetry.io/otel/sdk/trace/tracetest" + "go.opentelemetry.io/otel/trace" "go.uber.org/zap" "github.com/jaegertracing/jaeger/model" @@ -60,6 +63,18 @@ type ESStorageIntegration struct { logger *zap.Logger } +func tracerProvider() (trace.TracerProvider, *tracetest.InMemoryExporter, func() error) { + exporter := tracetest.NewInMemoryExporter() + tp := sdktrace.NewTracerProvider( + sdktrace.WithSampler(sdktrace.AlwaysSample()), + sdktrace.WithSyncer(exporter), + ) + closer := func() error { + return tp.Shutdown(context.Background()) + } + return tp, exporter, closer +} + func (s *ESStorageIntegration) getVersion() (uint, error) { pingResult, _, err := s.client.Ping(queryURL).Do(context.Background()) if err != nil { @@ -141,6 +156,8 @@ func (s *ESStorageIntegration) initSpanstore(allTagsAsFields, archive bool) erro if err != nil { return err } + tracer, _, closer := tracerProvider() + defer closer() s.SpanWriter = w s.SpanReader = spanstore.NewSpanReader(spanstore.SpanReaderParams{ Client: client, @@ -151,6 +168,7 @@ func (s *ESStorageIntegration) initSpanstore(allTagsAsFields, archive bool) erro TagDotReplacement: tagKeyDeDotChar, Archive: archive, MaxDocCount: defaultMaxDocCount, + Tracer: tracer.Tracer("test"), }) dependencyStore := dependencystore.NewDependencyStore(dependencystore.DependencyStoreParams{ Client: client,