[awsxrayexporter] Allow sending spans in otlp format with a new config #236

Merged
merged 3 commits into from
Oct 4, 2024
Changes from 2 commits
52 changes: 50 additions & 2 deletions exporter/awsxrayexporter/awsxray.go
@@ -5,7 +5,9 @@ package awsxrayexporter // import "github.com/open-telemetry/opentelemetry-colle

import (
"context"
"encoding/base64"
"errors"
"fmt"

"github.com/amazon-contributing/opentelemetry-collector-contrib/extension/awsmiddleware"
"github.com/aws/aws-sdk-go/aws/awserr"
@@ -15,6 +17,7 @@ import (
"go.opentelemetry.io/collector/exporter"
"go.opentelemetry.io/collector/exporter/exporterhelper"
"go.opentelemetry.io/collector/pdata/ptrace"
"go.opentelemetry.io/collector/pdata/ptrace/ptraceotlp"
"go.uber.org/zap"

"github.com/open-telemetry/opentelemetry-collector-contrib/exporter/awsxrayexporter/internal/translator"
@@ -24,7 +27,10 @@ import (
)

const (
maxSegmentsPerPut = int(50) // limit imposed by PutTraceSegments API
maxSegmentsPerPut = int(50) // limit imposed by PutTraceSegments API
otlpFormatPrefix = "T1S" // the X-Ray PutTraceSegments API uses this prefix to detect the format
otlpFormatKeyIndexAllAttributes = "aws.xray.exporter.config.index_all_attributes"
otlpFormatKeyIndexAttributes = "aws.xray.exporter.config.indexed_attributes"
)

// newTracesExporter creates an exporter.Traces that converts to an X-Ray PutTraceSegments
@@ -57,7 +63,15 @@ func newTracesExporter(
var err error
logger.Debug("TracesExporter", typeLog, nameLog, zap.Int("#spans", td.SpanCount()))

documents := extractResourceSpans(cfg, logger, td)
var documents []*string
if cfg.TransitSpanInOtlpFormat {
documents, err = encodeOtlpAsBase64(td, cfg)
if err != nil {
return err
}
} else { // by default use xray format
documents = extractResourceSpans(cfg, logger, td)
}

for offset := 0; offset < len(documents); offset += maxSegmentsPerPut {
var nextOffset int
@@ -137,3 +151,37 @@ func wrapErrorIfBadRequest(err error) error {
}
return err
}

// encodeOtlpAsBase64 marshals each resource span into OTLP protobuf bytes and returns one prefixed base64 document per resource span
func encodeOtlpAsBase64(td ptrace.Traces, cfg *Config) ([]*string, error) {
var documents []*string
for i := 0; i < td.ResourceSpans().Len(); i++ {
// 1. build a new trace with one resource span
singleTrace := ptrace.NewTraces()
td.ResourceSpans().At(i).CopyTo(singleTrace.ResourceSpans().AppendEmpty())

@wangzlei (Collaborator), Oct 2, 2024:
Any reason to copy into a new trace here?

Author:
Because I couldn't find a function that takes ResourceSpans as input and produces bytes. The only function I found is MarshalTraces, which takes Traces as input.

I think you need to put them under Traces; this is the format used by the export request protobuf.
Basically, the protobuf schema has the same structure for TracesData and ExportTraceServiceRequest.

Can you try using ExportTraceServiceRequest (https://opentelemetry.io/docs/specs/otlp/#binary-protobuf-encoding)? I think it should work as well, and it is closer to the OTLP implementation, which may carry additional information in the future.

Author:
ExportTraceServiceRequest is the orig of Traces, but orig is not exposed publicly. There is a getter, shown below, but as you can see it is a private function.

func (ms Traces) getOrig() *otlpcollectortrace.ExportTraceServiceRequest {
	return internal.GetOrigTraces(internal.Traces(ms))
}


// 2. append the index configuration to the resource span as attributes, so that the X-Ray service can build indexes from them.
injectIndexConfigIntoOtlpPayload(singleTrace.ResourceSpans().At(0), cfg)

// 3. Marshal single trace into proto bytes
bytes, err := ptraceotlp.NewExportRequestFromTraces(singleTrace).MarshalProto()

nit: we can call it b instead of bytes; bytes usually refers to the standard-library package https://pkg.go.dev/bytes
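
To illustrate the nit: a local variable named `bytes` shadows the standard-library `bytes` package for the rest of its scope, so the package's helpers become unreachable there. A minimal sketch, where `marshal` and `startsWith` are hypothetical stand-ins and not code from this PR:

```go
package main

import (
	"bytes"
	"fmt"
)

// marshal is a hypothetical stand-in for a call such as MarshalProto().
func marshal() ([]byte, error) {
	return []byte("hello world"), nil
}

// startsWith names the marshaled slice b, so the bytes package stays usable.
// Had the slice been named bytes, the bytes.HasPrefix call below would not
// compile, because the identifier would refer to the []byte variable.
func startsWith(prefix string) (bool, error) {
	b, err := marshal()
	if err != nil {
		return false, err
	}
	return bytes.HasPrefix(b, []byte(prefix)), nil
}

func main() {
	ok, err := startsWith("hello")
	fmt.Println(ok, err)
}
```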

if err != nil {
return nil, fmt.Errorf("failed to marshal traces: %w", err)
}

// 4. base64-encode the bytes and prepend the protocol prefix
base64Str := otlpFormatPrefix + base64.StdEncoding.EncodeToString(bytes)

I suppose we are not checking the size of the encoded document? Does the cwagent config have a translator for the batch processor config? Though I think the number of spans in one resource span is configured by the batch span processor inside the application's instrumentation SDK: https://opentelemetry.io/docs/specs/otel/configuration/sdk-environment-variables/#batch-span-processor @mxiamxia The batch processor in the agent/collector only affects how many resource spans (which equals the number of documents) are sent, and the exporter already handles more than 50 documents.
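
For context on the "more than 50 documents" point, the exporter loop steps through the documents in chunks of `maxSegmentsPerPut`. A simplified sketch of that chunking, assuming a hypothetical helper `batchDocuments` that is not part of the PR:

```go
package main

import "fmt"

const maxSegmentsPerPut = 50 // same per-call limit the exporter uses

// batchDocuments splits documents into consecutive slices of at most
// maxSegmentsPerPut entries. Hypothetical helper for illustration only.
func batchDocuments(documents []*string) [][]*string {
	var batches [][]*string
	for offset := 0; offset < len(documents); offset += maxSegmentsPerPut {
		end := offset + maxSegmentsPerPut
		if end > len(documents) {
			end = len(documents)
		}
		batches = append(batches, documents[offset:end])
	}
	return batches
}

func main() {
	docs := make([]*string, 120)
	for i := range docs {
		s := fmt.Sprintf("doc-%d", i)
		docs[i] = &s
	}
	for i, batch := range batchDocuments(docs) {
		fmt.Printf("batch %d: %d documents\n", i, len(batch))
	}
}
```

With 120 documents this yields batches of 50, 50, and 20. Note that it bounds only the document count per call, not the size of any single document.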

Author:
Yeah, we don't check the size. Would this cause any issue? I think as long as the X-Ray service can accept such a size, we should be fine.

> Does the cwagent config have a translator for the batch processor config?

I'm not aware of that.
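
If a size check were ever wanted, the encoded length can be computed without encoding, since base64 output length depends only on the input length. A rough sketch under stated assumptions: `checkDocumentSize` is hypothetical, and `maxDocumentBytes` is a caller-supplied placeholder rather than a documented X-Ray limit.

```go
package main

import (
	"encoding/base64"
	"fmt"
)

const otlpFormatPrefix = "T1S"

// encodedDocumentLen returns the length of the prefixed base64 document that
// payloadLen raw bytes would produce, without performing the encoding.
func encodedDocumentLen(payloadLen int) int {
	return len(otlpFormatPrefix) + base64.StdEncoding.EncodedLen(payloadLen)
}

// checkDocumentSize is a hypothetical guard. maxDocumentBytes is an assumed
// limit chosen by the caller, not a value taken from the X-Ray documentation.
func checkDocumentSize(payloadLen, maxDocumentBytes int) error {
	if n := encodedDocumentLen(payloadLen); n > maxDocumentBytes {
		return fmt.Errorf("encoded document is %d bytes, limit is %d", n, maxDocumentBytes)
	}
	return nil
}

func main() {
	fmt.Println(encodedDocumentLen(300))
	fmt.Println(checkDocumentSize(300, 1024))
	fmt.Println(checkDocumentSize(3000, 1024))
}
```

Base64 expands the payload by roughly a third, so 300 raw bytes become 400 encoded characters plus the 3-character prefix.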


documents = append(documents, &base64Str)
}

return documents, nil
}

func injectIndexConfigIntoOtlpPayload(resourceSpan ptrace.ResourceSpans, cfg *Config) {
jefchien marked this conversation as resolved.
attributes := resourceSpan.Resource().Attributes()
attributes.PutBool(otlpFormatKeyIndexAllAttributes, cfg.IndexAllAttributes)
indexAttributes := attributes.PutEmptySlice(otlpFormatKeyIndexAttributes)
for _, indexAttribute := range cfg.IndexedAttributes {
indexAttributes.AppendEmpty().SetStr(indexAttribute)
}
}
87 changes: 86 additions & 1 deletion exporter/awsxrayexporter/awsxray_test.go
@@ -6,6 +6,7 @@ package awsxrayexporter
import (
"context"
"crypto/rand"
"encoding/base64"
"encoding/binary"
"fmt"
"testing"
@@ -120,6 +121,79 @@ func TestMiddleware(t *testing.T) {
handler.AssertCalled(t, "HandleResponse", mock.Anything, mock.Anything)
}

func TestTraceExportOtlpFormat(t *testing.T) {
config := generateConfig(t)
config.TransitSpanInOtlpFormat = true

// pass the config with TransitSpanInOtlpFormat enabled, not a fresh default config
traceExporter := initializeTracesExporter(t, config, telemetrytest.NewNopRegistry())
ctx := context.Background()
td := constructSpanData()
err := traceExporter.ConsumeTraces(ctx, td)
assert.Error(t, err)
err = traceExporter.Shutdown(ctx)
assert.NoError(t, err)
}

func TestEncodingOtlpFormat(t *testing.T) {
config1 := generateConfig(t)
config1.IndexAllAttributes = true
config1.IndexedAttributes = []string{"test", "test1", "test2"}
testEncodingOtlpFormatWithIndexConfiguration(t, config1)

config2 := generateConfig(t)
config2.IndexedAttributes = []string{"test", "test1", "test2"}
testEncodingOtlpFormatWithIndexConfiguration(t, config2)

config3 := generateConfig(t)
config3.IndexAllAttributes = true
testEncodingOtlpFormatWithIndexConfiguration(t, config3)

config4 := generateConfig(t)
config4.IndexedAttributes = []string{}
testEncodingOtlpFormatWithIndexConfiguration(t, config4)

config5 := generateConfig(t)
config5.IndexAllAttributes = false
testEncodingOtlpFormatWithIndexConfiguration(t, config5)

config6 := generateConfig(t)
config6.IndexAllAttributes = false
config6.IndexedAttributes = []string{}
testEncodingOtlpFormatWithIndexConfiguration(t, config6)
}

func testEncodingOtlpFormatWithIndexConfiguration(t *testing.T, config *Config) {
// 1. prepare 50 resource spans and encode them
td := constructMultiSpanData(50)
documents, err := encodeOtlpAsBase64(td, config)
assert.NoError(t, err)
assert.EqualValues(t, td.ResourceSpans().Len(), len(documents), "ensure #resourcespans same as #documents")

// 2. ensure documents can be decoded back
unmarshaler := &ptrace.ProtoUnmarshaler{}
for i, document := range documents {
assert.EqualValues(t, "T1S", (*document)[0:3], "ensure protocol prefix")
decodedBytes, err := base64.StdEncoding.DecodeString((*document)[3:])
assert.NoError(t, err)

trace, err := unmarshaler.UnmarshalTraces(decodedBytes)
assert.NoError(t, err)

trace.CopyTo(trace)

// 3. ensure index configurations are carried
injectIndexConfigIntoOtlpPayload(td.ResourceSpans().At(i), config)
assert.EqualValues(t, td.ResourceSpans().At(i), trace.ResourceSpans().At(0))
}
}

func TestEncodingOtlpFormatWithEmptySpans(t *testing.T) {
td := constructMultiSpanData(0)
documents, err := encodeOtlpAsBase64(td, generateConfig(t))
assert.NoError(t, err)
assert.EqualValues(t, 0, len(documents), "expect 0 document")
}

func BenchmarkForTracesExporter(b *testing.B) {
traceExporter := initializeTracesExporter(b, generateConfig(b), telemetrytest.NewNopRegistry())
for i := 0; i < b.N; i++ {
@@ -156,7 +230,6 @@ func generateConfig(t testing.TB) *Config {

func constructSpanData() ptrace.Traces {
resource := constructResource()

traces := ptrace.NewTraces()
rspans := traces.ResourceSpans().AppendEmpty()
resource.CopyTo(rspans.Resource())
@@ -165,6 +238,18 @@ func constructSpanData() ptrace.Traces {
return traces
}

func constructMultiSpanData(numSpans int) ptrace.Traces {
traces := ptrace.NewTraces()
for range numSpans {
resource := constructResource()
rspans := traces.ResourceSpans().AppendEmpty()
resource.CopyTo(rspans.Resource())
ispans := rspans.ScopeSpans().AppendEmpty()
constructXrayTraceSpanData(ispans)
}
return traces
}

func constructW3CSpanData() ptrace.Traces {
resource := constructResource()
traces := ptrace.NewTraces()
3 changes: 3 additions & 0 deletions exporter/awsxrayexporter/config.go
@@ -30,6 +30,9 @@ type Config struct {
// AWS client.
MiddlewareID *component.ID `mapstructure:"middleware,omitempty"`

// When this flag is on, the X-Ray exporter sends spans to the X-Ray service in their original OTLP format
TransitSpanInOtlpFormat bool `mapstructure:"transit_spans_in_otlp_format,omitempty"`

I am not sure this config name is accurate, because strictly speaking we are not using the OTLP format. The server side accepts base64-encoded protobuf binary, whereas OTLP passes the raw bytes as the request body. I suggest we call it something like encode_to_otel_binary_base64.

Author:
I agree with the intention, but I don't think exposing too many implementation details to users is helpful. All users need to know is that this flag lets them send complete OTLP-format spans to X-Ray, which contain more information than the X-Ray format. Hence, I would still prefer to keep the name.


// skipTimestampValidation if enabled, will skip timestamp validation logic on the trace ID
skipTimestampValidation bool
}