Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[v2] Add v1 factory converter to v2 storage factory #5497

Merged
merged 21 commits into from
Jun 19, 2024
Merged
Show file tree
Hide file tree
Changes from 5 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
# Storage Factory Converter

A temporary v1 storage factory wrapper to implement v2 storage APIs.
This way, the existing v1 storage factories declared in `jaegerstorageextension`
can act as v2 storage while we migrate to v2 storage APIs.
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
// Copyright (c) 2024 The Jaeger Authors.
// SPDX-License-Identifier: Apache-2.0

package converter
yurishkuro marked this conversation as resolved.
Show resolved Hide resolved

import (
"testing"

"github.com/jaegertracing/jaeger/pkg/testutils"
)

func TestMain(m *testing.M) {
testutils.VerifyGoLeaks(m)
}
58 changes: 58 additions & 0 deletions cmd/jaeger/internal/extension/jaegerstorage/converter/factory.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,58 @@
// Copyright (c) 2024 The Jaeger Authors.
// SPDX-License-Identifier: Apache-2.0

package converter

import (
"context"
"io"

"go.uber.org/zap"

"github.com/jaegertracing/jaeger/pkg/metrics"
storage_v1 "github.com/jaegertracing/jaeger/storage"
"github.com/jaegertracing/jaeger/storage_v2/spanstore"
)

type Factory struct {
logger *zap.Logger
ss storage_v1.Factory
}

func NewFactory(logger *zap.Logger, ss storage_v1.Factory) spanstore.Factory {
return &Factory{
logger: logger,
ss: ss,

Check warning on line 25 in cmd/jaeger/internal/extension/jaegerstorage/converter/factory.go

View check run for this annotation

Codecov / codecov/patch

cmd/jaeger/internal/extension/jaegerstorage/converter/factory.go#L22-L25

Added lines #L22 - L25 were not covered by tests
}
}

// Initialize implements spanstore.Factory.
func (f *Factory) Initialize(ctx context.Context) error {
yurishkuro marked this conversation as resolved.
Show resolved Hide resolved
return f.ss.Initialize(metrics.NullFactory, f.logger)

Check warning on line 31 in cmd/jaeger/internal/extension/jaegerstorage/converter/factory.go

View check run for this annotation

Codecov / codecov/patch

cmd/jaeger/internal/extension/jaegerstorage/converter/factory.go#L30-L31

Added lines #L30 - L31 were not covered by tests
}

// Close implements spanstore.Factory.
func (f *Factory) Close(ctx context.Context) error {
if closer, ok := f.ss.(io.Closer); ok {
return closer.Close()

Check warning on line 37 in cmd/jaeger/internal/extension/jaegerstorage/converter/factory.go

View check run for this annotation

Codecov / codecov/patch

cmd/jaeger/internal/extension/jaegerstorage/converter/factory.go#L35-L37

Added lines #L35 - L37 were not covered by tests
}
return nil

Check warning on line 39 in cmd/jaeger/internal/extension/jaegerstorage/converter/factory.go

View check run for this annotation

Codecov / codecov/patch

cmd/jaeger/internal/extension/jaegerstorage/converter/factory.go#L39

Added line #L39 was not covered by tests
}

// CreateTraceReader implements spanstore.Factory.
func (f *Factory) CreateTraceReader() (spanstore.Reader, error) {
spanReader, err := f.ss.CreateSpanReader()
if err != nil {
return nil, err

Check warning on line 46 in cmd/jaeger/internal/extension/jaegerstorage/converter/factory.go

View check run for this annotation

Codecov / codecov/patch

cmd/jaeger/internal/extension/jaegerstorage/converter/factory.go#L43-L46

Added lines #L43 - L46 were not covered by tests
}
return NewTraceReader(spanReader)

Check warning on line 48 in cmd/jaeger/internal/extension/jaegerstorage/converter/factory.go

View check run for this annotation

Codecov / codecov/patch

cmd/jaeger/internal/extension/jaegerstorage/converter/factory.go#L48

Added line #L48 was not covered by tests
}

// CreateTraceWriter implements spanstore.Factory.
func (f *Factory) CreateTraceWriter() (spanstore.Writer, error) {
spanWriter, err := f.ss.CreateSpanWriter()
if err != nil {
return nil, err

Check warning on line 55 in cmd/jaeger/internal/extension/jaegerstorage/converter/factory.go

View check run for this annotation

Codecov / codecov/patch

cmd/jaeger/internal/extension/jaegerstorage/converter/factory.go#L52-L55

Added lines #L52 - L55 were not covered by tests
}
return NewTraceWriter(spanWriter)

Check warning on line 57 in cmd/jaeger/internal/extension/jaegerstorage/converter/factory.go

View check run for this annotation

Codecov / codecov/patch

cmd/jaeger/internal/extension/jaegerstorage/converter/factory.go#L57

Added line #L57 was not covered by tests
}
139 changes: 139 additions & 0 deletions cmd/jaeger/internal/extension/jaegerstorage/converter/reader.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,139 @@
// Copyright (c) 2024 The Jaeger Authors.
// SPDX-License-Identifier: Apache-2.0

package converter

import (
"context"
"encoding/binary"
"fmt"

jaeger2otlp "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/translator/jaeger"
"go.opentelemetry.io/collector/pdata/pcommon"
"go.opentelemetry.io/collector/pdata/ptrace"

"github.com/jaegertracing/jaeger/model"
spanstore_v1 "github.com/jaegertracing/jaeger/storage/spanstore"
"github.com/jaegertracing/jaeger/storage_v2/spanstore"
)

type TraceReader struct {
spanReader spanstore_v1.Reader
}

func NewTraceReader(spanReader spanstore_v1.Reader) (spanstore.Reader, error) {
return &TraceReader{
spanReader: spanReader,
}, nil

Check warning on line 27 in cmd/jaeger/internal/extension/jaegerstorage/converter/reader.go

View check run for this annotation

Codecov / codecov/patch

cmd/jaeger/internal/extension/jaegerstorage/converter/reader.go#L24-L27

Added lines #L24 - L27 were not covered by tests
}

// GetTrace implements spanstore.Reader.
func (s *TraceReader) GetTrace(ctx context.Context, traceID pcommon.TraceID) (ptrace.Traces, error) {

Check warning on line 31 in cmd/jaeger/internal/extension/jaegerstorage/converter/reader.go

View check run for this annotation

Codecov / codecov/patch

cmd/jaeger/internal/extension/jaegerstorage/converter/reader.go#L31

Added line #L31 was not covered by tests
// otelcol-contrib has the translator to jaeger proto but declared in private function
// similar to https://github.com/open-telemetry/opentelemetry-collector-contrib/blob/main/internal/coreinternal/idutils/big_endian_converter.go#L21
traceIDHigh, traceIDLow := binary.BigEndian.Uint64(traceID[:8]), binary.BigEndian.Uint64(traceID[8:])
id := model.TraceID{
yurishkuro marked this conversation as resolved.
Show resolved Hide resolved
Low: traceIDLow,
High: traceIDHigh,

Check warning on line 37 in cmd/jaeger/internal/extension/jaegerstorage/converter/reader.go

View check run for this annotation

Codecov / codecov/patch

cmd/jaeger/internal/extension/jaegerstorage/converter/reader.go#L34-L37

Added lines #L34 - L37 were not covered by tests
}
trace, err := s.spanReader.GetTrace(ctx, id)
if err != nil {
return ptrace.NewTraces(), err

Check warning on line 41 in cmd/jaeger/internal/extension/jaegerstorage/converter/reader.go

View check run for this annotation

Codecov / codecov/patch

cmd/jaeger/internal/extension/jaegerstorage/converter/reader.go#L39-L41

Added lines #L39 - L41 were not covered by tests
}

batches := []*model.Batch{{Spans: trace.Spans}}
td, err := jaeger2otlp.ProtoToTraces(batches)
if err != nil {
return ptrace.NewTraces(), fmt.Errorf("cannot transform Jaeger trace to OTLP format: %w", err)

Check warning on line 47 in cmd/jaeger/internal/extension/jaegerstorage/converter/reader.go

View check run for this annotation

Codecov / codecov/patch

cmd/jaeger/internal/extension/jaegerstorage/converter/reader.go#L44-L47

Added lines #L44 - L47 were not covered by tests
}

return td, nil

Check warning on line 50 in cmd/jaeger/internal/extension/jaegerstorage/converter/reader.go

View check run for this annotation

Codecov / codecov/patch

cmd/jaeger/internal/extension/jaegerstorage/converter/reader.go#L50

Added line #L50 was not covered by tests
}

// GetServices implements spanstore.Reader.
func (s *TraceReader) GetServices(ctx context.Context) ([]string, error) {
services, err := s.spanReader.GetServices(ctx)
if err != nil {
return []string{}, err

Check warning on line 57 in cmd/jaeger/internal/extension/jaegerstorage/converter/reader.go

View check run for this annotation

Codecov / codecov/patch

cmd/jaeger/internal/extension/jaegerstorage/converter/reader.go#L54-L57

Added lines #L54 - L57 were not covered by tests
}
return services, nil

Check warning on line 59 in cmd/jaeger/internal/extension/jaegerstorage/converter/reader.go

View check run for this annotation

Codecov / codecov/patch

cmd/jaeger/internal/extension/jaegerstorage/converter/reader.go#L59

Added line #L59 was not covered by tests
}

// GetOperations implements spanstore.Reader.
func (s *TraceReader) GetOperations(ctx context.Context, query spanstore.OperationQueryParameters) ([]spanstore.Operation, error) {
ops, err := s.spanReader.GetOperations(ctx, spanstore_v1.OperationQueryParameters{
ServiceName: query.ServiceName,
SpanKind: query.SpanKind,
})
if err != nil {
return []spanstore.Operation{}, err

Check warning on line 69 in cmd/jaeger/internal/extension/jaegerstorage/converter/reader.go

View check run for this annotation

Codecov / codecov/patch

cmd/jaeger/internal/extension/jaegerstorage/converter/reader.go#L63-L69

Added lines #L63 - L69 were not covered by tests
}

operations := []spanstore.Operation{}
for _, op := range ops {
operations = append(operations, spanstore.Operation{
Name: op.Name,
SpanKind: op.SpanKind,
})

Check warning on line 77 in cmd/jaeger/internal/extension/jaegerstorage/converter/reader.go

View check run for this annotation

Codecov / codecov/patch

cmd/jaeger/internal/extension/jaegerstorage/converter/reader.go#L72-L77

Added lines #L72 - L77 were not covered by tests
}
return operations, nil

Check warning on line 79 in cmd/jaeger/internal/extension/jaegerstorage/converter/reader.go

View check run for this annotation

Codecov / codecov/patch

cmd/jaeger/internal/extension/jaegerstorage/converter/reader.go#L79

Added line #L79 was not covered by tests
}

// FindTraces implements spanstore.Reader.
func (s *TraceReader) FindTraces(ctx context.Context, query spanstore.TraceQueryParameters) ([]ptrace.Traces, error) {
traces, err := s.spanReader.FindTraces(ctx, &spanstore_v1.TraceQueryParameters{
ServiceName: query.ServiceName,
OperationName: query.OperationName,
Tags: query.Tags,
StartTimeMin: query.StartTimeMin,
StartTimeMax: query.StartTimeMax,
DurationMin: query.DurationMin,
DurationMax: query.DurationMax,
NumTraces: query.NumTraces,
})
if err != nil {
return []ptrace.Traces{}, err

Check warning on line 95 in cmd/jaeger/internal/extension/jaegerstorage/converter/reader.go

View check run for this annotation

Codecov / codecov/patch

cmd/jaeger/internal/extension/jaegerstorage/converter/reader.go#L83-L95

Added lines #L83 - L95 were not covered by tests
}

tds := []ptrace.Traces{}
for _, trace := range traces {
batch := []*model.Batch{{Spans: trace.Spans}}
td, err := jaeger2otlp.ProtoToTraces(batch)
if err != nil {
return []ptrace.Traces{}, fmt.Errorf("cannot transform Jaeger trace to OTLP format: %w", err)

Check warning on line 103 in cmd/jaeger/internal/extension/jaegerstorage/converter/reader.go

View check run for this annotation

Codecov / codecov/patch

cmd/jaeger/internal/extension/jaegerstorage/converter/reader.go#L98-L103

Added lines #L98 - L103 were not covered by tests
}

tds = append(tds, td)

Check warning on line 106 in cmd/jaeger/internal/extension/jaegerstorage/converter/reader.go

View check run for this annotation

Codecov / codecov/patch

cmd/jaeger/internal/extension/jaegerstorage/converter/reader.go#L106

Added line #L106 was not covered by tests
}

return tds, nil

Check warning on line 109 in cmd/jaeger/internal/extension/jaegerstorage/converter/reader.go

View check run for this annotation

Codecov / codecov/patch

cmd/jaeger/internal/extension/jaegerstorage/converter/reader.go#L109

Added line #L109 was not covered by tests
}

// FindTraceIDs implements spanstore.Reader.
func (s *TraceReader) FindTraceIDs(ctx context.Context, query spanstore.TraceQueryParameters) ([]pcommon.TraceID, error) {
ids, err := s.spanReader.FindTraceIDs(ctx, &spanstore_v1.TraceQueryParameters{
ServiceName: query.ServiceName,
OperationName: query.OperationName,
Tags: query.Tags,
StartTimeMin: query.StartTimeMin,
StartTimeMax: query.StartTimeMax,
DurationMin: query.DurationMin,
DurationMax: query.DurationMax,
NumTraces: query.NumTraces,
})
if err != nil {
return []pcommon.TraceID{}, err

Check warning on line 125 in cmd/jaeger/internal/extension/jaegerstorage/converter/reader.go

View check run for this annotation

Codecov / codecov/patch

cmd/jaeger/internal/extension/jaegerstorage/converter/reader.go#L113-L125

Added lines #L113 - L125 were not covered by tests
}

traceIDs := []pcommon.TraceID{}
for _, id := range ids {

Check warning on line 129 in cmd/jaeger/internal/extension/jaegerstorage/converter/reader.go

View check run for this annotation

Codecov / codecov/patch

cmd/jaeger/internal/extension/jaegerstorage/converter/reader.go#L128-L129

Added lines #L128 - L129 were not covered by tests
// otelcol-contrib has the translator to OTLP but declared in private function
// similar to https://github.com/open-telemetry/opentelemetry-collector-contrib/blob/main/internal/coreinternal/idutils/big_endian_converter.go#L13
traceID := [16]byte{}
binary.BigEndian.PutUint64(traceID[:8], id.High)
binary.BigEndian.PutUint64(traceID[8:], id.Low)
traceIDs = append(traceIDs, traceID)

Check warning on line 135 in cmd/jaeger/internal/extension/jaegerstorage/converter/reader.go

View check run for this annotation

Codecov / codecov/patch

cmd/jaeger/internal/extension/jaegerstorage/converter/reader.go#L132-L135

Added lines #L132 - L135 were not covered by tests
}

return traceIDs, nil

Check warning on line 138 in cmd/jaeger/internal/extension/jaegerstorage/converter/reader.go

View check run for this annotation

Codecov / codecov/patch

cmd/jaeger/internal/extension/jaegerstorage/converter/reader.go#L138

Added line #L138 was not covered by tests
}
44 changes: 44 additions & 0 deletions cmd/jaeger/internal/extension/jaegerstorage/converter/writer.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
// Copyright (c) 2024 The Jaeger Authors.
// SPDX-License-Identifier: Apache-2.0

package converter

import (
"context"
"errors"
"fmt"

otlp2jaeger "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/translator/jaeger"
"go.opentelemetry.io/collector/pdata/ptrace"

spanstore_v1 "github.com/jaegertracing/jaeger/storage/spanstore"
"github.com/jaegertracing/jaeger/storage_v2/spanstore"
)

type TraceWriter struct {
spanWriter spanstore_v1.Writer
}

func NewTraceWriter(spanWriter spanstore_v1.Writer) (spanstore.Writer, error) {
yurishkuro marked this conversation as resolved.
Show resolved Hide resolved
return &TraceWriter{
spanWriter: spanWriter,
}, nil

Check warning on line 25 in cmd/jaeger/internal/extension/jaegerstorage/converter/writer.go

View check run for this annotation

Codecov / codecov/patch

cmd/jaeger/internal/extension/jaegerstorage/converter/writer.go#L22-L25

Added lines #L22 - L25 were not covered by tests
}

// WriteTraces implements spanstore.Writer.
func (t *TraceWriter) WriteTraces(ctx context.Context, td ptrace.Traces) error {
batches, err := otlp2jaeger.ProtoFromTraces(td)
if err != nil {
return fmt.Errorf("cannot transform OTLP traces to Jaeger format: %w", err)

Check warning on line 32 in cmd/jaeger/internal/extension/jaegerstorage/converter/writer.go

View check run for this annotation

Codecov / codecov/patch

cmd/jaeger/internal/extension/jaegerstorage/converter/writer.go#L29-L32

Added lines #L29 - L32 were not covered by tests
}
var errs []error
for _, batch := range batches {
for _, span := range batch.Spans {
if span.Process == nil {
span.Process = batch.Process

Check warning on line 38 in cmd/jaeger/internal/extension/jaegerstorage/converter/writer.go

View check run for this annotation

Codecov / codecov/patch

cmd/jaeger/internal/extension/jaegerstorage/converter/writer.go#L34-L38

Added lines #L34 - L38 were not covered by tests
}
errs = append(errs, t.spanWriter.WriteSpan(ctx, span))

Check warning on line 40 in cmd/jaeger/internal/extension/jaegerstorage/converter/writer.go

View check run for this annotation

Codecov / codecov/patch

cmd/jaeger/internal/extension/jaegerstorage/converter/writer.go#L40

Added line #L40 was not covered by tests
}
}
return errors.Join(errs...)

Check warning on line 43 in cmd/jaeger/internal/extension/jaegerstorage/converter/writer.go

View check run for this annotation

Codecov / codecov/patch

cmd/jaeger/internal/extension/jaegerstorage/converter/writer.go#L43

Added line #L43 was not covered by tests
}
Loading