Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Upgrade storage integration test to v2 Trace Reader #6388

Merged
merged 15 commits into from
Dec 26, 2024
Merged
Show file tree
Hide file tree
Changes from 8 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 6 additions & 2 deletions cmd/jaeger/internal/integration/e2e_integration.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import (
"github.com/jaegertracing/jaeger/cmd/jaeger/internal/integration/storagecleaner"
"github.com/jaegertracing/jaeger/plugin/storage/integration"
"github.com/jaegertracing/jaeger/ports"
"github.com/jaegertracing/jaeger/storage_v2/v1adapter"
)

const otlpPort = 4317
Expand Down Expand Up @@ -149,8 +150,9 @@ func (s *E2EStorageIntegration) e2eInitialize(t *testing.T, storage string) {

s.SpanWriter, err = createSpanWriter(logger, otlpPort)
require.NoError(t, err)
s.SpanReader, err = createSpanReader(logger, ports.QueryGRPC)
spanReader, err := createSpanReader(logger, ports.QueryGRPC)
require.NoError(t, err)
s.TraceReader = v1adapter.NewTraceReader(spanReader)

t.Cleanup(func() {
// Call e2eCleanUp to close the SpanReader and SpanWriter gRPC connection.
Expand Down Expand Up @@ -207,7 +209,9 @@ func (s *E2EStorageIntegration) doHealthCheck(t *testing.T) bool {
// e2eCleanUp closes the SpanReader and SpanWriter gRPC connection.
// This function should be called after all the tests are finished.
func (s *E2EStorageIntegration) e2eCleanUp(t *testing.T) {
require.NoError(t, s.SpanReader.(io.Closer).Close())
spanReader, err := v1adapter.GetV1Reader(s.TraceReader)
require.NoError(t, err)
require.NoError(t, spanReader.(io.Closer).Close())
require.NoError(t, s.SpanWriter.(io.Closer).Close())
}

Expand Down
2 changes: 1 addition & 1 deletion cmd/jaeger/internal/integration/tailsampling_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,7 @@ func (ts *TailSamplingIntegration) testTailSamplingProccessor(t *testing.T) {
var actual []string
assert.Eventually(t, func() bool {
var err error
actual, err = ts.SpanReader.GetServices(context.Background())
actual, err = ts.TraceReader.GetServices(context.Background())
require.NoError(t, err)
sort.Strings(actual)
return assert.ObjectsAreEqualValues(ts.expectedServices, actual)
Expand Down
6 changes: 4 additions & 2 deletions plugin/storage/integration/badgerstore_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ import (
"github.com/jaegertracing/jaeger/pkg/metrics"
"github.com/jaegertracing/jaeger/pkg/testutils"
"github.com/jaegertracing/jaeger/plugin/storage/badger"
"github.com/jaegertracing/jaeger/storage_v2/v1adapter"
)

type BadgerIntegrationStorage struct {
Expand All @@ -35,9 +36,10 @@ func (s *BadgerIntegrationStorage) initialize(t *testing.T) {
s.SpanWriter, err = s.factory.CreateSpanWriter()
require.NoError(t, err)

s.SpanReader, err = s.factory.CreateSpanReader()
spanReader, err := s.factory.CreateSpanReader()
require.NoError(t, err)

s.TraceReader = v1adapter.NewTraceReader(spanReader)

s.SamplingStore, err = s.factory.CreateSamplingStore(0)
require.NoError(t, err)
}
Expand Down
4 changes: 3 additions & 1 deletion plugin/storage/integration/cassandra_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ import (
"github.com/jaegertracing/jaeger/pkg/testutils"
"github.com/jaegertracing/jaeger/plugin/storage/cassandra"
"github.com/jaegertracing/jaeger/storage/dependencystore"
"github.com/jaegertracing/jaeger/storage_v2/v1adapter"
)

type CassandraStorageIntegration struct {
Expand Down Expand Up @@ -74,8 +75,9 @@ func (s *CassandraStorageIntegration) initializeCassandra(t *testing.T) {
var err error
s.SpanWriter, err = f.CreateSpanWriter()
require.NoError(t, err)
s.SpanReader, err = f.CreateSpanReader()
spanReader, err := f.CreateSpanReader()
require.NoError(t, err)
s.TraceReader = v1adapter.NewTraceReader(spanReader)
s.ArchiveSpanReader, err = f.CreateArchiveSpanReader()
require.NoError(t, err)
s.ArchiveSpanWriter, err = f.CreateArchiveSpanWriter()
Expand Down
4 changes: 3 additions & 1 deletion plugin/storage/integration/elasticsearch_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ import (
"github.com/jaegertracing/jaeger/pkg/testutils"
"github.com/jaegertracing/jaeger/plugin/storage/es"
"github.com/jaegertracing/jaeger/storage/dependencystore"
"github.com/jaegertracing/jaeger/storage_v2/v1adapter"
)

const (
Expand Down Expand Up @@ -134,8 +135,9 @@ func (s *ESStorageIntegration) initSpanstore(t *testing.T, allTagsAsFields bool)
var err error
s.SpanWriter, err = f.CreateSpanWriter()
require.NoError(t, err)
s.SpanReader, err = f.CreateSpanReader()
spanReader, err := f.CreateSpanReader()
require.NoError(t, err)
s.TraceReader = v1adapter.NewTraceReader(spanReader)
s.ArchiveSpanReader, err = f.CreateArchiveSpanReader()
require.NoError(t, err)
s.ArchiveSpanWriter, err = f.CreateArchiveSpanWriter()
Expand Down
4 changes: 3 additions & 1 deletion plugin/storage/integration/grpc_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ import (
"github.com/jaegertracing/jaeger/pkg/metrics"
"github.com/jaegertracing/jaeger/pkg/testutils"
"github.com/jaegertracing/jaeger/plugin/storage/grpc"
"github.com/jaegertracing/jaeger/storage_v2/v1adapter"
)

type GRPCStorageIntegrationTestSuite struct {
Expand All @@ -38,8 +39,9 @@ func (s *GRPCStorageIntegrationTestSuite) initialize(t *testing.T) {

s.SpanWriter, err = f.CreateSpanWriter()
require.NoError(t, err)
s.SpanReader, err = f.CreateSpanReader()
spanReader, err := f.CreateSpanReader()
require.NoError(t, err)
s.TraceReader = v1adapter.NewTraceReader(spanReader)
s.ArchiveSpanReader, err = f.CreateArchiveSpanReader()
require.NoError(t, err)
s.ArchiveSpanWriter, err = f.CreateArchiveSpanWriter()
Expand Down
53 changes: 31 additions & 22 deletions plugin/storage/integration/integration.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,8 @@ import (
"github.com/jaegertracing/jaeger/storage/dependencystore"
"github.com/jaegertracing/jaeger/storage/samplingstore"
"github.com/jaegertracing/jaeger/storage/spanstore"
"github.com/jaegertracing/jaeger/storage_v2/tracestore"
"github.com/jaegertracing/jaeger/storage_v2/v1adapter"
)

//go:embed fixtures
Expand All @@ -42,7 +44,7 @@ var fixtures embed.FS
// and RunAll() under different conditions.
type StorageIntegration struct {
SpanWriter spanstore.Writer
SpanReader spanstore.Reader
TraceReader tracestore.Reader
ArchiveSpanReader spanstore.Reader
ArchiveSpanWriter spanstore.Writer
DependencyWriter dependencystore.Writer
Expand Down Expand Up @@ -79,7 +81,7 @@ type StorageIntegration struct {
// the service name is formatted "query##-service".
type QueryFixtures struct {
Caption string
Query *spanstore.TraceQueryParameters
Query *tracestore.TraceQueryParams
ExpectedFixtures []string
}

Expand Down Expand Up @@ -143,7 +145,7 @@ func (s *StorageIntegration) testGetServices(t *testing.T) {
var actual []string
found := s.waitForCondition(t, func(t *testing.T) bool {
var err error
actual, err = s.SpanReader.GetServices(context.Background())
actual, err = s.TraceReader.GetServices(context.Background())
if err != nil {
t.Log(err)
return false
Expand All @@ -154,9 +156,10 @@ func (s *StorageIntegration) testGetServices(t *testing.T) {
// If the storage backend returns more services than expected, let's log traces for those
t.Log("🛑 Found unexpected services!")
for _, service := range actual {
traces, err := s.SpanReader.FindTraces(context.Background(), &spanstore.TraceQueryParameters{
iterTraces := s.TraceReader.FindTraces(context.Background(), tracestore.TraceQueryParams{
ServiceName: service,
})
traces, err := v1adapter.PTracesSeq2ToModel(iterTraces)
if err != nil {
t.Log(err)
continue
Expand Down Expand Up @@ -202,7 +205,7 @@ func (s *StorageIntegration) testArchiveTrace(t *testing.T) {
return err == nil && len(actual.Spans) == 1
})
require.True(t, found)
CompareTraces(t, &model.Trace{Spans: []*model.Span{expected}}, actual)
v1adapter.CompareTraces(t, &model.Trace{Spans: []*model.Span{expected}}, actual)
}

func (s *StorageIntegration) testGetLargeSpan(t *testing.T) {
Expand All @@ -216,13 +219,15 @@ func (s *StorageIntegration) testGetLargeSpan(t *testing.T) {

var actual *model.Trace
found := s.waitForCondition(t, func(_ *testing.T) bool {
var err error
actual, err = s.SpanReader.GetTrace(context.Background(), spanstore.GetTraceParameters{TraceID: expectedTraceID})
iterTraces := s.TraceReader.GetTraces(context.Background(), tracestore.GetTraceParams{TraceID: expectedTraceID.ToOTELTraceID()})
traces, err := v1adapter.PTracesSeq2ToModel(iterTraces)
require.NotEmpty(t, traces)
actual = traces[0]
return err == nil && len(actual.Spans) >= len(expected.Spans)
})

if !assert.True(t, found) {
CompareTraces(t, expected, actual)
v1adapter.CompareTraces(t, expected, actual)
return
}

Expand All @@ -242,27 +247,27 @@ func (s *StorageIntegration) testGetOperations(t *testing.T) {
s.skipIfNeeded(t)
defer s.cleanUp(t)

var expected []spanstore.Operation
var expected []tracestore.Operation
if s.GetOperationsMissingSpanKind {
expected = []spanstore.Operation{
expected = []tracestore.Operation{
{Name: "example-operation-1"},
{Name: "example-operation-3"},
{Name: "example-operation-4"},
}
} else {
expected = []spanstore.Operation{
expected = []tracestore.Operation{
{Name: "example-operation-1", SpanKind: ""},
{Name: "example-operation-3", SpanKind: "server"},
{Name: "example-operation-4", SpanKind: "client"},
}
}
s.loadParseAndWriteExampleTrace(t)

var actual []spanstore.Operation
var actual []tracestore.Operation
found := s.waitForCondition(t, func(t *testing.T) bool {
var err error
actual, err = s.SpanReader.GetOperations(context.Background(),
spanstore.OperationQueryParameters{ServiceName: "example-service-1"})
actual, err = s.TraceReader.GetOperations(context.Background(),
tracestore.OperationQueryParameters{ServiceName: "example-service-1"})
if err != nil {
t.Log(err)
return false
Expand All @@ -289,22 +294,25 @@ func (s *StorageIntegration) testGetTrace(t *testing.T) {

var actual *model.Trace
found := s.waitForCondition(t, func(t *testing.T) bool {
var err error
actual, err = s.SpanReader.GetTrace(context.Background(), spanstore.GetTraceParameters{TraceID: expectedTraceID})
iterTraces := s.TraceReader.GetTraces(context.Background(), tracestore.GetTraceParams{TraceID: expectedTraceID.ToOTELTraceID()})
traces, err := v1adapter.PTracesSeq2ToModel(iterTraces)
if err != nil {
t.Log(err)
}
require.NotEmpty(t, traces)
actual = traces[0]
return err == nil && len(actual.Spans) == len(expected.Spans)
})
if !assert.True(t, found) {
CompareTraces(t, expected, actual)
v1adapter.CompareTraces(t, expected, actual)
}

t.Run("NotFound error", func(t *testing.T) {
fakeTraceID := model.TraceID{High: 0, Low: 1}
trace, err := s.SpanReader.GetTrace(context.Background(), spanstore.GetTraceParameters{TraceID: fakeTraceID})
iterTraces := s.TraceReader.GetTraces(context.Background(), tracestore.GetTraceParams{TraceID: fakeTraceID.ToOTELTraceID()})
traces, err := v1adapter.PTracesSeq2ToModel(iterTraces)
assert.Equal(t, spanstore.ErrTraceNotFound, err)
assert.Nil(t, trace)
assert.Nil(t, traces)
})
}

Expand Down Expand Up @@ -337,16 +345,17 @@ func (s *StorageIntegration) testFindTraces(t *testing.T) {
s.skipIfNeeded(t)
expected := expectedTracesPerTestCase[i]
actual := s.findTracesByQuery(t, queryTestCase.Query, expected)
CompareSliceOfTraces(t, expected, actual)
v1adapter.CompareSliceOfTraces(t, expected, actual)
})
}
}

func (s *StorageIntegration) findTracesByQuery(t *testing.T, query *spanstore.TraceQueryParameters, expected []*model.Trace) []*model.Trace {
func (s *StorageIntegration) findTracesByQuery(t *testing.T, query *tracestore.TraceQueryParams, expected []*model.Trace) []*model.Trace {
var traces []*model.Trace
found := s.waitForCondition(t, func(t *testing.T) bool {
var err error
traces, err = s.SpanReader.FindTraces(context.Background(), query)
iterTraces := s.TraceReader.FindTraces(context.Background(), *query)
traces, err = v1adapter.PTracesSeq2ToModel(iterTraces)
if err != nil {
t.Log(err)
return false
Expand Down
4 changes: 3 additions & 1 deletion plugin/storage/integration/kafka_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ import (
"github.com/jaegertracing/jaeger/plugin/storage/kafka"
"github.com/jaegertracing/jaeger/plugin/storage/memory"
"github.com/jaegertracing/jaeger/storage/spanstore"
"github.com/jaegertracing/jaeger/storage_v2/v1adapter"
)

const defaultLocalKafkaBroker = "127.0.0.1:9092"
Expand Down Expand Up @@ -91,7 +92,8 @@ func (s *KafkaIntegrationTestSuite) initialize(t *testing.T) {
spanConsumer.Start()

s.SpanWriter = spanWriter
s.SpanReader = &ingester{traceStore}
spanReader := &ingester{traceStore}
s.TraceReader = v1adapter.NewTraceReader(spanReader)
s.CleanUp = func(_ *testing.T) {}
s.SkipArchiveTest = true
}
Expand Down
4 changes: 3 additions & 1 deletion plugin/storage/integration/memstore_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import (

"github.com/jaegertracing/jaeger/pkg/testutils"
"github.com/jaegertracing/jaeger/plugin/storage/memory"
"github.com/jaegertracing/jaeger/storage_v2/v1adapter"
)

type MemStorageIntegrationTestSuite struct {
Expand All @@ -24,7 +25,8 @@ func (s *MemStorageIntegrationTestSuite) initialize(_ *testing.T) {
store := memory.NewStore()
archiveStore := memory.NewStore()
s.SamplingStore = memory.NewSamplingStore(2)
s.SpanReader = store
spanReader := store
s.TraceReader = v1adapter.NewTraceReader(spanReader)
s.SpanWriter = store
s.ArchiveSpanReader = archiveStore
s.ArchiveSpanWriter = archiveStore
Expand Down
74 changes: 74 additions & 0 deletions storage_v2/v1adapter/ptrace2model.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,74 @@
// Copyright (c) 2024 The Jaeger Authors.
// SPDX-License-Identifier: Apache-2.0

package v1adapter

import (
"go.opentelemetry.io/collector/pdata/ptrace"

"github.com/jaegertracing/jaeger/model"
"github.com/jaegertracing/jaeger/pkg/iter"
"github.com/jaegertracing/jaeger/storage/spanstore"
otel2model "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/translator/jaeger"
)

// PTracesSeq2ToModel consumes an iterator seqTrace. When necessary,
// it groups spans from *consecutive* chunks of ptrace.Traces into a single model.Trace
//
// Returns nil, and spanstore.ErrTraceNotFound for empty iterators
func PTracesSeq2ToModel(seqTrace iter.Seq2[[]ptrace.Traces, error]) ([]*model.Trace, error) {
var (
err error
lastTraceID *model.TraceID
lastTrace *model.Trace
)
jaegerTraces := []*model.Trace{}

seqTrace(func(otelTraces []ptrace.Traces, e error) bool {
yurishkuro marked this conversation as resolved.
Show resolved Hide resolved
if e != nil {
err = e
return false
}

for _, otelTrace := range otelTraces {
spans := modelSpansFromOtelTrace(otelTrace)
if len(spans) == 0 {
continue
}
currentTraceID := spans[0].TraceID
if lastTraceID != nil && *lastTraceID == currentTraceID {
lastTrace.Spans = append(lastTrace.Spans, spans...)
} else {
newTrace := &model.Trace{Spans: spans}
lastTraceID = &currentTraceID
lastTrace = newTrace
jaegerTraces = append(jaegerTraces, lastTrace)
}
}
return true
})

if err != nil {
return nil, err
}

if len(jaegerTraces) == 0 {
return nil, spanstore.ErrTraceNotFound
}
return jaegerTraces, nil
}

// modelSpansFromOtelTrace extracts spans from otel traces
func modelSpansFromOtelTrace(otelTrace ptrace.Traces) []*model.Span {
spans := []*model.Span{}
batches := otel2model.ProtoFromTraces(otelTrace)
for _, batch := range batches {
for _, span := range batch.Spans {
span.Process = &model.Process{}
span.Process.ServiceName = batch.GetProcess().GetServiceName()
span.Process.Tags = batch.GetProcess().GetTags()
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

what we usually do in this case is

if span.Process == nil {
  span.Process = batch.Process
}

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Alright...

I tried updating the span process everytime, regardless, but the comparator would pick up a difference I couldn't notice by looking at json data. But I'll try with this.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

reusing the Process from Batch avoids unnecessary memory allocations.

Copy link
Contributor Author

@ekefan ekefan Dec 26, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Okay,

I tried again though, I don't know why yet, but for badger storage (the test I tried) it does this only for those cases.
Screenshot (83)
Screenshot (84)

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I spent an hour debugging but couldn't find the root cause. I even wrote a simple test that does a roundtrip from model -> ptrace -> model where we can see that if Process.Tags==[] at the start it becomes nil after roundtrip. However, pretty.Diff did not complain about that, so I am not sure why it was complaining in the badger test (it complained exactly about that, that actual Process.Tags == nil while expected process had [].

spans = append(spans, span)
}
}
return spans
}
Loading