Skip to content

Commit

Permalink
[awsxrayexporter] add new configuration to allow sending otlp formatt…
Browse files Browse the repository at this point in the history
…ed spans to X-Ray Service
  • Loading branch information
wyTrivail committed Oct 1, 2024
1 parent 299ef87 commit a804a73
Show file tree
Hide file tree
Showing 3 changed files with 139 additions and 3 deletions.
52 changes: 50 additions & 2 deletions exporter/awsxrayexporter/awsxray.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,9 @@ package awsxrayexporter // import "github.com/open-telemetry/opentelemetry-colle

import (
"context"
"encoding/base64"
"errors"
"fmt"

"github.com/amazon-contributing/opentelemetry-collector-contrib/extension/awsmiddleware"
"github.com/aws/aws-sdk-go/aws/awserr"
Expand All @@ -24,7 +26,10 @@ import (
)

const (
maxSegmentsPerPut = int(50) // limit imposed by PutTraceSegments API
maxSegmentsPerPut = int(50) // limit imposed by PutTraceSegments API
otlpFormatPrefix = "T1S" // X-Ray PutTraceSegment API uses this prefix to detect the format
otlpFormatKeyIndexAllAttributes = "aws.xray.exporter.config.index_all_attributes"
otlpFormatKeyIndexAttributes = "aws.xray.exporter.config.indexed_attributes"
)

// newTracesExporter creates an exporter.Traces that converts to an X-Ray PutTraceSegments
Expand Down Expand Up @@ -57,7 +62,15 @@ func newTracesExporter(
var err error
logger.Debug("TracesExporter", typeLog, nameLog, zap.Int("#spans", td.SpanCount()))

documents := extractResourceSpans(cfg, logger, td)
var documents []*string
if cfg.TransitSpanInOtlpFormat {
documents, err = encodeOtlpAsBase64(td, cfg)
if err != nil {
return err
}
} else { // by default use xray format
documents = extractResourceSpans(cfg, logger, td)
}

for offset := 0; offset < len(documents); offset += maxSegmentsPerPut {
var nextOffset int
Expand Down Expand Up @@ -137,3 +150,38 @@ func wrapErrorIfBadRequest(err error) error {
}
return err
}

// encodeOtlpAsBase64 builds bytes from traces and generate base64 value for them
func encodeOtlpAsBase64(td ptrace.Traces, cfg *Config) ([]*string, error) {
var documents []*string
marshaller := &ptrace.ProtoMarshaler{}
for i := 0; i < td.ResourceSpans().Len(); i++ {
// 1. build a new trace with one resource span
singleTrace := ptrace.NewTraces()
td.ResourceSpans().At(i).CopyTo(singleTrace.ResourceSpans().AppendEmpty())

// 2. append index configuration to resource span as attributes, such that X-Ray Service build indexes based on them.
injectIndexConfigIntoOtlpPayload(singleTrace.ResourceSpans().At(0), cfg)

// 3. Marshal single trace into proto bytes
bytes, err := marshaller.MarshalTraces(singleTrace)
if err != nil {
return nil, fmt.Errorf("failed to marshal traces: %w", err)
}

// 4. build bytes into base64 and append with PROTOCOL HEADER at the beginning
base64Str := otlpFormatPrefix + base64.StdEncoding.EncodeToString(bytes)
documents = append(documents, &base64Str)
}

return documents, nil
}

func injectIndexConfigIntoOtlpPayload(resourceSpan ptrace.ResourceSpans, cfg *Config) {
attributes := resourceSpan.Resource().Attributes()
attributes.PutBool(otlpFormatKeyIndexAllAttributes, cfg.IndexAllAttributes)
indexAttributes := attributes.PutEmptySlice(otlpFormatKeyIndexAttributes)
for _, indexAttribute := range cfg.IndexedAttributes {
indexAttributes.AppendEmpty().SetStr(indexAttribute)
}
}
87 changes: 86 additions & 1 deletion exporter/awsxrayexporter/awsxray_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ package awsxrayexporter
import (
"context"
"crypto/rand"
"encoding/base64"
"encoding/binary"
"fmt"
"testing"
Expand Down Expand Up @@ -120,6 +121,79 @@ func TestMiddleware(t *testing.T) {
handler.AssertCalled(t, "HandleResponse", mock.Anything, mock.Anything)
}

func TestTraceExportOtlpFormat(t *testing.T) {
config := generateConfig(t)
config.TransitSpanInOtlpFormat = true

traceExporter := initializeTracesExporter(t, generateConfig(t), telemetrytest.NewNopRegistry())
ctx := context.Background()
td := constructSpanData()
err := traceExporter.ConsumeTraces(ctx, td)
assert.Error(t, err)
err = traceExporter.Shutdown(ctx)
assert.NoError(t, err)
}

func TestEncodingOtlpFormat(t *testing.T) {
config1 := generateConfig(t)
config1.IndexAllAttributes = true
config1.IndexedAttributes = []string{"test", "test1", "test2"}
testEncodingOtlpFormatWithIndexConfiguration(t, config1)

config2 := generateConfig(t)
config2.IndexedAttributes = []string{"test", "test1", "test2"}
testEncodingOtlpFormatWithIndexConfiguration(t, config2)

config3 := generateConfig(t)
config3.IndexAllAttributes = true
testEncodingOtlpFormatWithIndexConfiguration(t, config3)

config4 := generateConfig(t)
config4.IndexedAttributes = []string{}
testEncodingOtlpFormatWithIndexConfiguration(t, config4)

config5 := generateConfig(t)
config5.IndexAllAttributes = false
testEncodingOtlpFormatWithIndexConfiguration(t, config5)

config6 := generateConfig(t)
config6.IndexAllAttributes = false
config6.IndexedAttributes = []string{}
testEncodingOtlpFormatWithIndexConfiguration(t, config6)
}

func testEncodingOtlpFormatWithIndexConfiguration(t *testing.T, config *Config) {
// 1. prepare 50 resource spans and encode them
td := constructMultiSpanData(50)
documents, err := encodeOtlpAsBase64(td, config)
assert.NoError(t, err)
assert.EqualValues(t, td.ResourceSpans().Len(), len(documents), "ensure #resourcespans same as #documents")

// 2. ensure documents can be decoded back
unmarshaler := &ptrace.ProtoUnmarshaler{}
for i, document := range documents {
assert.EqualValues(t, "T1S", (*document)[0:3], "ensure protocol prefix")
decodedBytes, err := base64.StdEncoding.DecodeString((*document)[3:])
assert.NoError(t, err)

trace, err := unmarshaler.UnmarshalTraces(decodedBytes)
assert.NoError(t, err)

trace.CopyTo(trace)

// 3. ensure index configurations are carried
injectIndexConfigIntoOtlpPayload(td.ResourceSpans().At(i), config)
assert.EqualValues(t, td.ResourceSpans().At(i), trace.ResourceSpans().At(0))
}
}

func TestEncodingOtlpFormatWithEmptySpans(t *testing.T) {
td := constructMultiSpanData(0)
documents, err := encodeOtlpAsBase64(td, generateConfig(t))
assert.NoError(t, err)
assert.EqualValues(t, 0, len(documents), "expect 0 document")
}

func BenchmarkForTracesExporter(b *testing.B) {
traceExporter := initializeTracesExporter(b, generateConfig(b), telemetrytest.NewNopRegistry())
for i := 0; i < b.N; i++ {
Expand Down Expand Up @@ -156,7 +230,6 @@ func generateConfig(t testing.TB) *Config {

func constructSpanData() ptrace.Traces {
resource := constructResource()

traces := ptrace.NewTraces()
rspans := traces.ResourceSpans().AppendEmpty()
resource.CopyTo(rspans.Resource())
Expand All @@ -165,6 +238,18 @@ func constructSpanData() ptrace.Traces {
return traces
}

func constructMultiSpanData(numSpans int) ptrace.Traces {
traces := ptrace.NewTraces()
for range numSpans {
resource := constructResource()
rspans := traces.ResourceSpans().AppendEmpty()
resource.CopyTo(rspans.Resource())
ispans := rspans.ScopeSpans().AppendEmpty()
constructXrayTraceSpanData(ispans)
}
return traces
}

func constructW3CSpanData() ptrace.Traces {
resource := constructResource()
traces := ptrace.NewTraces()
Expand Down
3 changes: 3 additions & 0 deletions exporter/awsxrayexporter/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,9 @@ type Config struct {
// AWS client.
MiddlewareID *component.ID `mapstructure:"middleware,omitempty"`

// X-Ray Export sends spans in its original otlp format to X-Ray Service when this flag is on
TransitSpanInOtlpFormat bool `mapstructure:"transit_spans_in_otlp_format,omitempty"`

// skipTimestampValidation if enabled, will skip timestamp validation logic on the trace ID
skipTimestampValidation bool
}

0 comments on commit a804a73

Please sign in to comment.