From 0f4a9a02c8fe80057d12c72d083457150c5d4b4a Mon Sep 17 00:00:00 2001 From: Nicholas Hulston Date: Mon, 7 Oct 2024 14:57:47 -0400 Subject: [PATCH 01/11] Add skeleton for injecting trace context --- contrib/aws/aws-sdk-go-v2/aws/aws.go | 14 +++++++ .../aws/internal/eventbridge/eventbridge.go | 34 ++++++++++++++++ contrib/aws/internal/sns/sns.go | 39 +++++++++++++++++++ contrib/aws/internal/sqs/sqs.go | 39 +++++++++++++++++++ 4 files changed, 126 insertions(+) create mode 100644 contrib/aws/internal/eventbridge/eventbridge.go create mode 100644 contrib/aws/internal/sns/sns.go create mode 100644 contrib/aws/internal/sqs/sqs.go diff --git a/contrib/aws/aws-sdk-go-v2/aws/aws.go b/contrib/aws/aws-sdk-go-v2/aws/aws.go index f193914cbd..2149fa3661 100644 --- a/contrib/aws/aws-sdk-go-v2/aws/aws.go +++ b/contrib/aws/aws-sdk-go-v2/aws/aws.go @@ -31,6 +31,10 @@ import ( "github.com/aws/aws-sdk-go-v2/service/sqs" "github.com/aws/smithy-go/middleware" smithyhttp "github.com/aws/smithy-go/transport/http" + + eventBridgeTracer "gopkg.in/DataDog/dd-trace-go.v1/contrib/aws/internal/eventbridge" + snsTracer "gopkg.in/DataDog/dd-trace-go.v1/contrib/aws/internal/sns" + sqsTracer "gopkg.in/DataDog/dd-trace-go.v1/contrib/aws/internal/sqs" ) const componentName = "aws/aws-sdk-go-v2/aws" @@ -105,6 +109,16 @@ func (mw *traceMiddleware) startTraceMiddleware(stack *middleware.Stack) error { } span, spanctx := tracer.StartSpanFromContext(ctx, spanName(serviceID, operation), opts...) + // Inject trace context + switch serviceID { + case "SQS": + sqsTracer.EnrichOperation(spanctx, in, operation) + case "SNS": + snsTracer.EnrichOperation(spanctx, in, operation) + case "EventBridge": + eventBridgeTracer.EnrichOperation(spanctx, in, operation) + } + // Handle initialize and continue through the middleware chain. out, metadata, err = next.HandleInitialize(spanctx, in) if err != nil && (mw.cfg.errCheck == nil || mw.cfg.errCheck(err)) { diff --git a/contrib/aws/internal/eventbridge/eventbridge.go b/contrib/aws/internal/eventbridge/eventbridge.go new file mode 100644 index 0000000000..7dc200f7a1 --- /dev/null +++ b/contrib/aws/internal/eventbridge/eventbridge.go @@ -0,0 +1,34 @@ +package eventbridge + +import ( + "context" + "github.com/aws/aws-sdk-go-v2/service/eventbridge/types" + "github.com/aws/smithy-go/middleware" +) + +const ( + datadogKey = "_datadog" + startTimeKey = "x-datadog-start-time" + resourceNameKey = "x-datadog-resource-name" +) + +type messageCarrier map[string]string + +func (carrier messageCarrier) Set(key, val string) { + carrier[key] = val +} + +func EnrichOperation(ctx context.Context, in middleware.InitializeInput, operation string) { + switch operation { + case "PutEvents": + handlePutEvents(ctx, in) + } +} + +func handlePutEvents(ctx context.Context, in middleware.InitializeInput) { + // TODO +} + +func injectTraceContext(ctx context.Context, entry *types.PutEventsRequestEntry) { + // TODO +} diff --git a/contrib/aws/internal/sns/sns.go b/contrib/aws/internal/sns/sns.go new file mode 100644 index 0000000000..7d639c4c78 --- /dev/null +++ b/contrib/aws/internal/sns/sns.go @@ -0,0 +1,39 @@ +package sns + +import ( + "context" + "github.com/aws/aws-sdk-go-v2/service/sns/types" + "github.com/aws/smithy-go/middleware" +) + +const ( + datadogKey = "_datadog" + maxMessageAttributes = 10 +) + +type messageCarrier map[string]string + +func (carrier messageCarrier) Set(key, val string) { + carrier[key] = val +} + +func EnrichOperation(ctx context.Context, in middleware.InitializeInput, operation string) { + switch operation { + case "Publish": + handlePublish(ctx, in) + case "PublishBatch": + handlePublishBatch(ctx, in) + } +} + +func handlePublish(ctx context.Context, in middleware.InitializeInput) { + // TODO +} + +func handlePublishBatch(ctx context.Context, in middleware.InitializeInput) { + // TODO +} + +func injectTraceContext(ctx context.Context, messageAttributes map[string]types.MessageAttributeValue) { + // TODO +} diff --git a/contrib/aws/internal/sqs/sqs.go b/contrib/aws/internal/sqs/sqs.go new file mode 100644 index 0000000000..83775be2e5 --- /dev/null +++ b/contrib/aws/internal/sqs/sqs.go @@ -0,0 +1,39 @@ +package sqs + +import ( + "context" + "github.com/aws/aws-sdk-go-v2/service/sqs/types" + "github.com/aws/smithy-go/middleware" +) + +const ( + datadogKey = "_datadog" + maxMessageAttributes = 10 +) + +type messageCarrier map[string]string + +func (carrier messageCarrier) Set(key, val string) { + carrier[key] = val +} + +func EnrichOperation(ctx context.Context, in middleware.InitializeInput, operation string) { + switch operation { + case "SendMessage": + handleSendMessage(ctx, in) + case "SendMessageBatch": + handleSendMessageBatch(ctx, in) + } +} + +func handleSendMessage(ctx context.Context, in middleware.InitializeInput) { + // TODO +} + +func handleSendMessageBatch(ctx context.Context, in middleware.InitializeInput) { + // TODO +} + +func injectTraceContext(ctx context.Context, messageAttributes map[string]types.MessageAttributeValue) { + // TODO +} From 461d0045087ef4c06cdede61bde46ae81fa4f151 Mon Sep 17 00:00:00 2001 From: Nicholas Hulston Date: Mon, 7 Oct 2024 15:07:00 -0400 Subject: [PATCH 02/11] Implement trace injection for SQS and SNS --- contrib/aws/internal/sns/sns.go | 63 +++++++++++++++++++++++++++++++-- contrib/aws/internal/sqs/sqs.go | 63 +++++++++++++++++++++++++++++++-- 2 files changed, 120 insertions(+), 6 deletions(-) diff --git a/contrib/aws/internal/sns/sns.go b/contrib/aws/internal/sns/sns.go index 7d639c4c78..26d0a1c9ff 100644 --- a/contrib/aws/internal/sns/sns.go +++ b/contrib/aws/internal/sns/sns.go @@ -2,8 +2,13 @@ package sns import ( "context" + "encoding/json" + "github.com/aws/aws-sdk-go-v2/aws" + "github.com/aws/aws-sdk-go-v2/service/sns" "github.com/aws/aws-sdk-go-v2/service/sns/types" "github.com/aws/smithy-go/middleware" + "gopkg.in/DataDog/dd-trace-go.v1/ddtrace/tracer" + "gopkg.in/DataDog/dd-trace-go.v1/internal/log" ) const ( @@ -27,13 +32,65 @@ func EnrichOperation(ctx context.Context, in middleware.InitializeInput, operati } func handlePublish(ctx context.Context, in middleware.InitializeInput) { - // TODO + params, ok := in.Parameters.(*sns.PublishInput) + if !ok { + log.Debug("Unable to read PublishInput params") + return + } + + if params.MessageAttributes == nil { + params.MessageAttributes = make(map[string]types.MessageAttributeValue) + } + + injectTraceContext(ctx, params.MessageAttributes) } func handlePublishBatch(ctx context.Context, in middleware.InitializeInput) { - // TODO + params, ok := in.Parameters.(*sns.PublishBatchInput) + if !ok { + log.Debug("Unable to read PublishBatch params") + return + } + + for i := range params.PublishBatchRequestEntries { + entryPtr := ¶ms.PublishBatchRequestEntries[i] + if entryPtr.MessageAttributes == nil { + entryPtr.MessageAttributes = make(map[string]types.MessageAttributeValue) + } + injectTraceContext(ctx, entryPtr.MessageAttributes) + } } func injectTraceContext(ctx context.Context, messageAttributes map[string]types.MessageAttributeValue) { - // TODO + span, ok := tracer.SpanFromContext(ctx) + if !ok || span == nil { + log.Debug("Unable to find span from context") + return + } + + // SNS only allows a maximum of 10 message attributes. + // https://docs.aws.amazon.com/sns/latest/dg/sns-message-attributes.html + // Only inject if there's room. + if len(messageAttributes) >= maxMessageAttributes { + log.Info("Cannot inject trace context: message already has maximum allowed attributes") + return + } + + carrier := make(messageCarrier) + err := tracer.Inject(span.Context(), carrier) + if err != nil { + log.Debug("Unable to inject trace context: %s", err.Error()) + return + } + + jsonBytes, err := json.Marshal(carrier) + if err != nil { + log.Debug("Unable to marshal trace context: %s", err.Error()) + return + } + + messageAttributes[datadogKey] = types.MessageAttributeValue{ + DataType: aws.String("String"), + StringValue: aws.String(string(jsonBytes)), + } } diff --git a/contrib/aws/internal/sqs/sqs.go b/contrib/aws/internal/sqs/sqs.go index 83775be2e5..c474c9abea 100644 --- a/contrib/aws/internal/sqs/sqs.go +++ b/contrib/aws/internal/sqs/sqs.go @@ -2,8 +2,13 @@ package sqs import ( "context" + "encoding/json" + "github.com/aws/aws-sdk-go-v2/aws" + "github.com/aws/aws-sdk-go-v2/service/sqs" "github.com/aws/aws-sdk-go-v2/service/sqs/types" "github.com/aws/smithy-go/middleware" + "gopkg.in/DataDog/dd-trace-go.v1/ddtrace/tracer" + "gopkg.in/DataDog/dd-trace-go.v1/internal/log" ) const ( @@ -27,13 +32,65 @@ func EnrichOperation(ctx context.Context, in middleware.InitializeInput, operati } func handleSendMessage(ctx context.Context, in middleware.InitializeInput) { - // TODO + params, ok := in.Parameters.(*sqs.SendMessageInput) + if !ok { + log.Debug("Unable to read SendMessage params") + return + } + + if params.MessageAttributes == nil { + params.MessageAttributes = make(map[string]types.MessageAttributeValue) + } + + injectTraceContext(ctx, params.MessageAttributes) } func handleSendMessageBatch(ctx context.Context, in middleware.InitializeInput) { - // TODO + params, ok := in.Parameters.(*sqs.SendMessageBatchInput) + if !ok { + log.Debug("Unable to read SendMessageBatch params") + return + } + + for i := range params.Entries { + entryPtr := ¶ms.Entries[i] + if entryPtr.MessageAttributes == nil { + entryPtr.MessageAttributes = make(map[string]types.MessageAttributeValue) + } + injectTraceContext(ctx, entryPtr.MessageAttributes) + } } func injectTraceContext(ctx context.Context, messageAttributes map[string]types.MessageAttributeValue) { - // TODO + span, ok := tracer.SpanFromContext(ctx) + if !ok || span == nil { + log.Debug("Unable to find span from context") + return + } + + // SQS only allows a maximum of 10 message attributes. + // https://docs.aws.amazon.com/AWSSimpleQueueService/latest/SQSDeveloperGuide/sqs-message-metadata.html#sqs-message-attributes + // Only inject if there's room. + if len(messageAttributes) >= maxMessageAttributes { + log.Info("Cannot inject trace context: message already has maximum allowed attributes") + return + } + + carrier := make(messageCarrier) + err := tracer.Inject(span.Context(), carrier) + if err != nil { + log.Debug("Unable to inject trace context: %s", err.Error()) + return + } + + jsonBytes, err := json.Marshal(carrier) + if err != nil { + log.Debug("Unable to marshal trace context: %s", err.Error()) + return + } + + messageAttributes[datadogKey] = types.MessageAttributeValue{ + DataType: aws.String("String"), + StringValue: aws.String(string(jsonBytes)), + } } From 0f235a560004ee8b8486dfee5e65f3e5da3ca70a Mon Sep 17 00:00:00 2001 From: Nicholas Hulston Date: Mon, 7 Oct 2024 15:17:29 -0400 Subject: [PATCH 03/11] Implement trace injection for EventBridge --- .../aws/internal/eventbridge/eventbridge.go | 77 ++++++++++++++++++- 1 file changed, 74 insertions(+), 3 deletions(-) diff --git a/contrib/aws/internal/eventbridge/eventbridge.go b/contrib/aws/internal/eventbridge/eventbridge.go index 7dc200f7a1..f65cb4f503 100644 --- a/contrib/aws/internal/eventbridge/eventbridge.go +++ b/contrib/aws/internal/eventbridge/eventbridge.go @@ -2,14 +2,22 @@ package eventbridge import ( "context" + "encoding/json" + "github.com/aws/aws-sdk-go-v2/aws" + "github.com/aws/aws-sdk-go-v2/service/eventbridge" "github.com/aws/aws-sdk-go-v2/service/eventbridge/types" "github.com/aws/smithy-go/middleware" + "gopkg.in/DataDog/dd-trace-go.v1/ddtrace/tracer" + "gopkg.in/DataDog/dd-trace-go.v1/internal/log" + "strconv" + "time" ) const ( datadogKey = "_datadog" startTimeKey = "x-datadog-start-time" resourceNameKey = "x-datadog-resource-name" + maxSizeBytes = 256 * 1024 // 256 KB ) type messageCarrier map[string]string @@ -26,9 +34,72 @@ func EnrichOperation(ctx context.Context, in middleware.InitializeInput, operati } func handlePutEvents(ctx context.Context, in middleware.InitializeInput) { - // TODO + params, ok := in.Parameters.(*eventbridge.PutEventsInput) + if !ok { + log.Debug("Unable to read PutEvents params") + return + } + + for i := range params.Entries { + injectTraceContext(ctx, ¶ms.Entries[i]) + } } -func injectTraceContext(ctx context.Context, entry *types.PutEventsRequestEntry) { - // TODO +func injectTraceContext(ctx context.Context, entryPtr *types.PutEventsRequestEntry) { + if entryPtr == nil { + return + } + + span, ok := tracer.SpanFromContext(ctx) + if !ok || span == nil { + log.Debug("Unable to find span from context") + return + } + + carrier := make(messageCarrier) + err := tracer.Inject(span.Context(), carrier) + if err != nil { + log.Debug("Unable to inject trace context: %s", err) + return + } + + // Add start time and resource name + startTimeMillis := time.Now().UnixMilli() + carrier[startTimeKey] = strconv.FormatInt(startTimeMillis, 10) + if entryPtr.EventBusName != nil { + carrier[resourceNameKey] = *entryPtr.EventBusName + } + + var detail map[string]interface{} + if entryPtr.Detail != nil { + err = json.Unmarshal([]byte(*entryPtr.Detail), &detail) + if err != nil { + log.Debug("Unable to unmarshal event detail: %s", err) + return + } + } else { + detail = make(map[string]interface{}) + } + + jsonBytes, err := json.Marshal(carrier) + if err != nil { + log.Debug("Unable to marshal trace context: %s", err) + return + } + + detail[datadogKey] = json.RawMessage(jsonBytes) + + updatedDetail, err := json.Marshal(detail) + if err != nil { + log.Debug("Unable to marshal modified event detail: %s", err) + return + } + + // Check new detail size + if len(updatedDetail) > maxSizeBytes { + log.Info("Payload size too large to pass context") + return + } + + entryPtr.Detail = aws.String(string(updatedDetail)) } From 6a89b614e7b45ef8d1bc0d128548e8bce44147a2 Mon Sep 17 00:00:00 2001 From: Nicholas Hulston Date: Mon, 7 Oct 2024 15:25:34 -0400 Subject: [PATCH 04/11] Add tests --- contrib/aws/aws-sdk-go-v2/aws/aws_test.go | 140 +++++++++++- .../internal/eventbridge/eventbridge_test.go | 207 ++++++++++++++++++ contrib/aws/internal/sns/sns_test.go | 191 ++++++++++++++++ contrib/aws/internal/sqs/sqs_test.go | 194 ++++++++++++++++ 4 files changed, 729 insertions(+), 3 deletions(-) create mode 100644 contrib/aws/internal/eventbridge/eventbridge_test.go create mode 100644 contrib/aws/internal/sns/sns_test.go create mode 100644 contrib/aws/internal/sqs/sqs_test.go diff --git a/contrib/aws/aws-sdk-go-v2/aws/aws_test.go b/contrib/aws/aws-sdk-go-v2/aws/aws_test.go index 88fba42c5d..6a5412a175 100644 --- a/contrib/aws/aws-sdk-go-v2/aws/aws_test.go +++ b/contrib/aws/aws-sdk-go-v2/aws/aws_test.go @@ -8,6 +8,7 @@ package aws import ( "context" "encoding/base64" + "encoding/json" "net/http" "net/http/httptest" "net/url" @@ -24,12 +25,13 @@ import ( "github.com/aws/aws-sdk-go-v2/service/dynamodb" "github.com/aws/aws-sdk-go-v2/service/ec2" "github.com/aws/aws-sdk-go-v2/service/eventbridge" + eventBridgeTypes "github.com/aws/aws-sdk-go-v2/service/eventbridge/types" "github.com/aws/aws-sdk-go-v2/service/kinesis" "github.com/aws/aws-sdk-go-v2/service/s3" "github.com/aws/aws-sdk-go-v2/service/sfn" "github.com/aws/aws-sdk-go-v2/service/sns" "github.com/aws/aws-sdk-go-v2/service/sqs" - "github.com/aws/aws-sdk-go-v2/service/sqs/types" + sqsTypes "github.com/aws/aws-sdk-go-v2/service/sqs/types" "github.com/aws/smithy-go/middleware" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" @@ -281,6 +283,66 @@ func TestAppendMiddlewareSqsReceiveMessage(t *testing.T) { } } +func TestAppendMiddlewareSqsSendMessage(t *testing.T) { + mt := mocktracer.Start() + defer mt.Stop() + + expectedStatusCode := 200 + server := mockAWS(expectedStatusCode) + defer server.Close() + + resolver := aws.EndpointResolverFunc(func(service, region string) (aws.Endpoint, error) { + return aws.Endpoint{ + PartitionID: "aws", + URL: server.URL, + SigningRegion: "eu-west-1", + }, nil + }) + + awsCfg := aws.Config{ + Region: "eu-west-1", + Credentials: aws.AnonymousCredentials{}, + EndpointResolver: resolver, + } + + AppendMiddleware(&awsCfg) + + sqsClient := sqs.NewFromConfig(awsCfg) + sendMessageInput := &sqs.SendMessageInput{ + MessageBody: aws.String("test message"), + QueueUrl: aws.String("https://sqs.us-west-2.amazonaws.com/123456789012/MyQueueName"), + } + _, err := sqsClient.SendMessage(context.Background(), sendMessageInput) + require.NoError(t, err) + + spans := mt.FinishedSpans() + require.Len(t, spans, 1) + + s := spans[0] + assert.Equal(t, "SQS.request", s.OperationName()) + assert.Equal(t, "SendMessage", s.Tag("aws.operation")) + assert.Equal(t, "SQS", s.Tag("aws.service")) + assert.Equal(t, "MyQueueName", s.Tag("queuename")) + assert.Equal(t, "SQS.SendMessage", s.Tag(ext.ResourceName)) + assert.Equal(t, "aws.SQS", s.Tag(ext.ServiceName)) + + // Check for trace context injection + assert.NotNil(t, sendMessageInput.MessageAttributes) + assert.Contains(t, sendMessageInput.MessageAttributes, "_datadog") + ddAttr := sendMessageInput.MessageAttributes["_datadog"] + assert.Equal(t, "String", *ddAttr.DataType) + assert.NotEmpty(t, *ddAttr.StringValue) + + // Decode and verify the injected trace context + var traceContext map[string]string + err = json.Unmarshal([]byte(*ddAttr.StringValue), &traceContext) + assert.NoError(t, err) + assert.Contains(t, traceContext, "x-datadog-trace-id") + assert.Contains(t, traceContext, "x-datadog-parent-id") + assert.NotEmpty(t, traceContext["x-datadog-trace-id"]) + assert.NotEmpty(t, traceContext["x-datadog-parent-id"]) +} + func TestAppendMiddlewareS3ListObjects(t *testing.T) { tests := []struct { name string @@ -441,6 +503,22 @@ func TestAppendMiddlewareSnsPublish(t *testing.T) { assert.Equal(t, server.URL+"/", s.Tag(ext.HTTPURL)) assert.Equal(t, "aws/aws-sdk-go-v2/aws", s.Tag(ext.Component)) assert.Equal(t, ext.SpanKindClient, s.Tag(ext.SpanKind)) + + // Check for trace context injection + assert.NotNil(t, tt.publishInput.MessageAttributes) + assert.Contains(t, tt.publishInput.MessageAttributes, "_datadog") + ddAttr := tt.publishInput.MessageAttributes["_datadog"] + assert.Equal(t, "String", *ddAttr.DataType) + assert.NotEmpty(t, *ddAttr.StringValue) + + // Decode and verify the injected trace context + var traceContext map[string]string + err := json.Unmarshal([]byte(*ddAttr.StringValue), &traceContext) + assert.NoError(t, err) + assert.Contains(t, traceContext, "x-datadog-trace-id") + assert.Contains(t, traceContext, "x-datadog-parent-id") + assert.NotEmpty(t, traceContext["x-datadog-trace-id"]) + assert.NotEmpty(t, traceContext["x-datadog-parent-id"]) }) } } @@ -657,6 +735,62 @@ func TestAppendMiddlewareEventBridgePutRule(t *testing.T) { } } +func TestAppendMiddlewareEventBridgePutEvents(t *testing.T) { + mt := mocktracer.Start() + defer mt.Stop() + + expectedStatusCode := 200 + server := mockAWS(expectedStatusCode) + defer server.Close() + + resolver := aws.EndpointResolverFunc(func(service, region string) (aws.Endpoint, error) { + return aws.Endpoint{ + PartitionID: "aws", + URL: server.URL, + SigningRegion: "eu-west-1", + }, nil + }) + + awsCfg := aws.Config{ + Region: "eu-west-1", + Credentials: aws.AnonymousCredentials{}, + EndpointResolver: resolver, + } + + AppendMiddleware(&awsCfg) + + eventbridgeClient := eventbridge.NewFromConfig(awsCfg) + putEventsInput := &eventbridge.PutEventsInput{ + Entries: []eventBridgeTypes.PutEventsRequestEntry{ + { + EventBusName: aws.String("my-event-bus"), + Detail: aws.String(`{"key": "value"}`), + }, + }, + } + eventbridgeClient.PutEvents(context.Background(), putEventsInput) + + spans := mt.FinishedSpans() + require.Len(t, spans, 1) + + s := spans[0] + assert.Equal(t, "PutEvents", s.Tag("aws.operation")) + assert.Equal(t, "EventBridge.PutEvents", s.Tag(ext.ResourceName)) + + // Check for trace context injection + assert.Len(t, putEventsInput.Entries, 1) + entry := putEventsInput.Entries[0] + var detail map[string]interface{} + err := json.Unmarshal([]byte(*entry.Detail), &detail) + assert.NoError(t, err) + assert.Contains(t, detail, "_datadog") + ddData, ok := detail["_datadog"].(map[string]interface{}) + assert.True(t, ok) + assert.Contains(t, ddData, "x-datadog-start-time") + assert.Contains(t, ddData, "x-datadog-resource-name") + assert.Equal(t, "my-event-bus", ddData["x-datadog-resource-name"]) +} + func TestAppendMiddlewareSfnDescribeStateMachine(t *testing.T) { tests := []struct { name string @@ -971,8 +1105,8 @@ func TestMessagingNamingSchema(t *testing.T) { _, err = sqsClient.SendMessage(ctx, msg) require.NoError(t, err) - entry := types.SendMessageBatchRequestEntry{Id: aws.String("1"), MessageBody: aws.String("body")} - batchMsg := &sqs.SendMessageBatchInput{QueueUrl: sqsResp.QueueUrl, Entries: []types.SendMessageBatchRequestEntry{entry}} + entry := sqsTypes.SendMessageBatchRequestEntry{Id: aws.String("1"), MessageBody: aws.String("body")} + batchMsg := &sqs.SendMessageBatchInput{QueueUrl: sqsResp.QueueUrl, Entries: []sqsTypes.SendMessageBatchRequestEntry{entry}} _, err = sqsClient.SendMessageBatch(ctx, batchMsg) require.NoError(t, err) diff --git a/contrib/aws/internal/eventbridge/eventbridge_test.go b/contrib/aws/internal/eventbridge/eventbridge_test.go new file mode 100644 index 0000000000..0e5d6c8df8 --- /dev/null +++ b/contrib/aws/internal/eventbridge/eventbridge_test.go @@ -0,0 +1,207 @@ +package eventbridge + +import ( + "context" + "encoding/json" + "strconv" + "strings" + "testing" + + "github.com/aws/aws-sdk-go-v2/aws" + "github.com/aws/aws-sdk-go-v2/service/eventbridge" + "github.com/aws/aws-sdk-go-v2/service/eventbridge/types" + "github.com/aws/smithy-go/middleware" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + "gopkg.in/DataDog/dd-trace-go.v1/ddtrace/mocktracer" + "gopkg.in/DataDog/dd-trace-go.v1/ddtrace/tracer" +) + +type testCarrier struct { + m map[string]string +} + +func (c *testCarrier) Set(key, val string) { + c.m[key] = val +} + +func (c *testCarrier) ForeachKey(handler func(key, val string) error) error { + for k, v := range c.m { + if err := handler(k, v); err != nil { + return err + } + } + return nil +} + +func TestEnrichOperation(t *testing.T) { + mt := mocktracer.Start() + defer mt.Stop() + + ctx := context.Background() + _, ctx = tracer.StartSpanFromContext(ctx, "test-span") + + input := middleware.InitializeInput{ + Parameters: &eventbridge.PutEventsInput{ + Entries: []types.PutEventsRequestEntry{ + { + Detail: aws.String(`{"key": "value"}`), + EventBusName: aws.String("test-bus"), + }, + { + Detail: aws.String(`{"another": "data"}`), + EventBusName: aws.String("test-bus-2"), + }, + }, + }, + } + + EnrichOperation(ctx, input, "PutEvents") + + params, ok := input.Parameters.(*eventbridge.PutEventsInput) + require.True(t, ok) + require.Len(t, params.Entries, 2) + + for _, entry := range params.Entries { + var detail map[string]interface{} + err := json.Unmarshal([]byte(*entry.Detail), &detail) + require.NoError(t, err) + + assert.Contains(t, detail, datadogKey) + ddData, ok := detail[datadogKey].(map[string]interface{}) + require.True(t, ok) + + assert.Contains(t, ddData, startTimeKey) + assert.Contains(t, ddData, resourceNameKey) + assert.Equal(t, *entry.EventBusName, ddData[resourceNameKey]) + } +} + +func TestInjectTraceContext(t *testing.T) { + mt := mocktracer.Start() + defer mt.Stop() + + ctx := context.Background() + span, ctx := tracer.StartSpanFromContext(ctx, "test-span") + + tests := []struct { + name string + entry types.PutEventsRequestEntry + expected func(*testing.T, *types.PutEventsRequestEntry) + }{ + { + name: "Inject into empty detail", + entry: types.PutEventsRequestEntry{ + EventBusName: aws.String("test-bus"), + }, + expected: func(t *testing.T, entry *types.PutEventsRequestEntry) { + assert.NotNil(t, entry.Detail) + var detail map[string]interface{} + err := json.Unmarshal([]byte(*entry.Detail), &detail) + require.NoError(t, err) + assert.Contains(t, detail, datadogKey) + }, + }, + { + name: "Inject into existing detail", + entry: types.PutEventsRequestEntry{ + Detail: aws.String(`{"existing": "data"}`), + EventBusName: aws.String("test-bus"), + }, + expected: func(t *testing.T, entry *types.PutEventsRequestEntry) { + var detail map[string]interface{} + err := json.Unmarshal([]byte(*entry.Detail), &detail) + require.NoError(t, err) + assert.Contains(t, detail, "existing") + assert.Equal(t, "data", detail["existing"]) + assert.Contains(t, detail, datadogKey) + }, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + injectTraceContext(ctx, &tt.entry) + tt.expected(t, &tt.entry) + + var detail map[string]interface{} + err := json.Unmarshal([]byte(*tt.entry.Detail), &detail) + require.NoError(t, err) + + ddData := detail[datadogKey].(map[string]interface{}) + assert.Contains(t, ddData, startTimeKey) + assert.Contains(t, ddData, resourceNameKey) + assert.Equal(t, *tt.entry.EventBusName, ddData[resourceNameKey]) + + // Check that start time exists and is not empty + startTimeStr, ok := ddData[startTimeKey].(string) + assert.True(t, ok) + startTime, err := strconv.ParseInt(startTimeStr, 10, 64) + assert.NoError(t, err) + assert.Greater(t, startTime, int64(0)) + + var carrier testCarrier + carrier.m = make(map[string]string) + for k, v := range ddData { + if s, ok := v.(string); ok { + carrier.m[k] = s + } + } + + extractedSpanContext, err := tracer.Extract(&carrier) + assert.NoError(t, err) + assert.Equal(t, span.Context().TraceID(), extractedSpanContext.TraceID()) + assert.Equal(t, span.Context().SpanID(), extractedSpanContext.SpanID()) + }) + } +} + +func TestInjectTraceContextSizeLimit(t *testing.T) { + mt := mocktracer.Start() + defer mt.Stop() + + ctx := context.Background() + _, ctx = tracer.StartSpanFromContext(ctx, "test-span") + + tests := []struct { + name string + entry types.PutEventsRequestEntry + expected func(*testing.T, *types.PutEventsRequestEntry) + }{ + { + name: "Do not inject when payload is too large", + entry: types.PutEventsRequestEntry{ + Detail: aws.String(`{"large": "` + strings.Repeat("a", maxSizeBytes-15) + `"}`), + EventBusName: aws.String("test-bus"), + }, + expected: func(t *testing.T, entry *types.PutEventsRequestEntry) { + assert.GreaterOrEqual(t, len(*entry.Detail), maxSizeBytes-15) + assert.NotContains(t, *entry.Detail, datadogKey) + assert.True(t, strings.HasPrefix(*entry.Detail, `{"large": "`)) + assert.True(t, strings.HasSuffix(*entry.Detail, `"}`)) + }, + }, + { + name: "Inject when payload is just under the limit", + entry: types.PutEventsRequestEntry{ + Detail: aws.String(`{"large": "` + strings.Repeat("a", maxSizeBytes-1000) + `"}`), + EventBusName: aws.String("test-bus"), + }, + expected: func(t *testing.T, entry *types.PutEventsRequestEntry) { + assert.Less(t, len(*entry.Detail), maxSizeBytes) + var detail map[string]interface{} + err := json.Unmarshal([]byte(*entry.Detail), &detail) + require.NoError(t, err) + assert.Contains(t, detail, datadogKey) + assert.Contains(t, detail, "large") + }, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + injectTraceContext(ctx, &tt.entry) + tt.expected(t, &tt.entry) + }) + } +} diff --git a/contrib/aws/internal/sns/sns_test.go b/contrib/aws/internal/sns/sns_test.go new file mode 100644 index 0000000000..9c04097d0b --- /dev/null +++ b/contrib/aws/internal/sns/sns_test.go @@ -0,0 +1,191 @@ +package sns + +import ( + "context" + "encoding/json" + "fmt" + "testing" + + "github.com/aws/aws-sdk-go-v2/aws" + "github.com/aws/aws-sdk-go-v2/service/sns" + "github.com/aws/aws-sdk-go-v2/service/sns/types" + "github.com/aws/smithy-go/middleware" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + "gopkg.in/DataDog/dd-trace-go.v1/ddtrace/mocktracer" + "gopkg.in/DataDog/dd-trace-go.v1/ddtrace/tracer" +) + +type testCarrier struct { + m map[string]string +} + +func (c *testCarrier) Set(key, val string) { + c.m[key] = val +} + +func (c *testCarrier) ForeachKey(handler func(key, val string) error) error { + for k, v := range c.m { + if err := handler(k, v); err != nil { + return err + } + } + return nil +} + +func TestEnrichOperation(t *testing.T) { + tests := []struct { + name string + operation string + input middleware.InitializeInput + setup func(context.Context) context.Context + check func(*testing.T, middleware.InitializeInput) + }{ + { + name: "Publish", + operation: "Publish", + input: middleware.InitializeInput{ + Parameters: &sns.PublishInput{ + Message: aws.String("test message"), + TopicArn: aws.String("arn:aws:sns:us-east-1:123456789012:test-topic"), + }, + }, + setup: func(ctx context.Context) context.Context { + _, ctx = tracer.StartSpanFromContext(ctx, "test-span") + return ctx + }, + check: func(t *testing.T, in middleware.InitializeInput) { + params, ok := in.Parameters.(*sns.PublishInput) + require.True(t, ok) + require.NotNil(t, params) + require.NotNil(t, params.MessageAttributes) + assert.Contains(t, params.MessageAttributes, datadogKey) + assert.NotNil(t, params.MessageAttributes[datadogKey].DataType) + assert.Equal(t, "String", *params.MessageAttributes[datadogKey].DataType) + assert.NotNil(t, params.MessageAttributes[datadogKey].StringValue) + assert.NotEmpty(t, *params.MessageAttributes[datadogKey].StringValue) + }, + }, + { + name: "PublishBatch", + operation: "PublishBatch", + input: middleware.InitializeInput{ + Parameters: &sns.PublishBatchInput{ + TopicArn: aws.String("arn:aws:sns:us-east-1:123456789012:test-topic"), + PublishBatchRequestEntries: []types.PublishBatchRequestEntry{ + { + Id: aws.String("1"), + Message: aws.String("test message 1"), + }, + { + Id: aws.String("2"), + Message: aws.String("test message 2"), + }, + }, + }, + }, + setup: func(ctx context.Context) context.Context { + _, ctx = tracer.StartSpanFromContext(ctx, "test-span") + return ctx + }, + check: func(t *testing.T, in middleware.InitializeInput) { + params, ok := in.Parameters.(*sns.PublishBatchInput) + require.True(t, ok) + require.NotNil(t, params) + require.NotNil(t, params.PublishBatchRequestEntries) + require.Len(t, params.PublishBatchRequestEntries, 2) + + for _, entry := range params.PublishBatchRequestEntries { + require.NotNil(t, entry.MessageAttributes) + assert.Contains(t, entry.MessageAttributes, datadogKey) + assert.NotNil(t, entry.MessageAttributes[datadogKey].DataType) + assert.Equal(t, "String", *entry.MessageAttributes[datadogKey].DataType) + assert.NotNil(t, entry.MessageAttributes[datadogKey].StringValue) + assert.NotEmpty(t, *entry.MessageAttributes[datadogKey].StringValue) + } + }, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + mt := mocktracer.Start() + defer mt.Stop() + + ctx := context.Background() + if tt.setup != nil { + ctx = tt.setup(ctx) + } + + EnrichOperation(ctx, tt.input, tt.operation) + + if tt.check != nil { + tt.check(t, tt.input) + } + }) + } +} + +func TestInjectTraceContext(t *testing.T) { + tests := []struct { + name string + existingAttributes int + expectInjection bool + }{ + { + name: "Inject with no existing attributes", + existingAttributes: 0, + expectInjection: true, + }, + { + name: "Inject with some existing attributes", + existingAttributes: 5, + expectInjection: true, + }, + { + name: "No injection when at max attributes", + existingAttributes: maxMessageAttributes, + expectInjection: false, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + mt := mocktracer.Start() + defer mt.Stop() + + ctx := context.Background() + span, ctx := tracer.StartSpanFromContext(ctx, "test-span") + + messageAttributes := make(map[string]types.MessageAttributeValue) + for i := 0; i < tt.existingAttributes; i++ { + messageAttributes[fmt.Sprintf("attr%d", i)] = types.MessageAttributeValue{ + DataType: aws.String("String"), + StringValue: aws.String("value"), + } + } + + injectTraceContext(ctx, messageAttributes) + + if tt.expectInjection { + assert.Contains(t, messageAttributes, datadogKey) + assert.NotNil(t, messageAttributes[datadogKey].DataType) + assert.Equal(t, "String", *messageAttributes[datadogKey].DataType) + assert.NotNil(t, messageAttributes[datadogKey].StringValue) + assert.NotEmpty(t, *messageAttributes[datadogKey].StringValue) + + var carrier testCarrier + carrier.m = make(map[string]string) + err := json.Unmarshal([]byte(*messageAttributes[datadogKey].StringValue), &carrier.m) + assert.NoError(t, err) + + extractedSpanContext, err := tracer.Extract(&carrier) + assert.NoError(t, err) + assert.Equal(t, span.Context().TraceID(), extractedSpanContext.TraceID()) + assert.Equal(t, span.Context().SpanID(), extractedSpanContext.SpanID()) + } else { + assert.NotContains(t, messageAttributes, datadogKey) + } + }) + } +} diff --git a/contrib/aws/internal/sqs/sqs_test.go b/contrib/aws/internal/sqs/sqs_test.go new file mode 100644 index 0000000000..559a904f21 --- /dev/null +++ b/contrib/aws/internal/sqs/sqs_test.go @@ -0,0 +1,194 @@ +package sqs + +import ( + "context" + "encoding/json" + "fmt" + "github.com/aws/aws-sdk-go-v2/service/sqs/types" + "testing" + + "github.com/aws/aws-sdk-go-v2/aws" + "github.com/aws/aws-sdk-go-v2/service/sqs" + "github.com/aws/smithy-go/middleware" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + "gopkg.in/DataDog/dd-trace-go.v1/ddtrace/mocktracer" + "gopkg.in/DataDog/dd-trace-go.v1/ddtrace/tracer" +) + +type testCarrier struct { + m map[string]string +} + +func (c *testCarrier) Set(key, val string) { + c.m[key] = val +} + +func (c *testCarrier) ForeachKey(handler func(key, val string) error) error { + for k, v := range c.m { + if err := handler(k, v); err != nil { + return err + } + } + return nil +} + +func TestEnrichOperation(t *testing.T) { + tests := []struct { + name string + operation string + input middleware.InitializeInput + setup func(context.Context) context.Context + check func(*testing.T, middleware.InitializeInput) + }{ + { + name: "SendMessage", + operation: "SendMessage", + input: middleware.InitializeInput{ + Parameters: &sqs.SendMessageInput{ + MessageBody: aws.String("test message"), + QueueUrl: aws.String("https://sqs.us-east-1.amazonaws.com/1234567890/test-queue"), + }, + }, + setup: func(ctx context.Context) context.Context { + _, ctx = tracer.StartSpanFromContext(ctx, "test-span") + return ctx + }, + check: func(t *testing.T, in middleware.InitializeInput) { + params, ok := in.Parameters.(*sqs.SendMessageInput) + require.True(t, ok) + require.NotNil(t, params) + require.NotNil(t, params.MessageAttributes) + assert.Contains(t, params.MessageAttributes, datadogKey) + assert.NotNil(t, params.MessageAttributes[datadogKey].DataType) + assert.Equal(t, "String", *params.MessageAttributes[datadogKey].DataType) + assert.NotNil(t, params.MessageAttributes[datadogKey].StringValue) + assert.NotEmpty(t, *params.MessageAttributes[datadogKey].StringValue) + }, + }, + { + name: "SendMessageBatch", + operation: "SendMessageBatch", + input: middleware.InitializeInput{ + Parameters: &sqs.SendMessageBatchInput{ + QueueUrl: aws.String("https://sqs.us-east-1.amazonaws.com/1234567890/test-queue"), + Entries: []types.SendMessageBatchRequestEntry{ + { + Id: aws.String("1"), + MessageBody: aws.String("test message 1"), + }, + { + Id: aws.String("2"), + MessageBody: aws.String("test message 2"), + }, + { + Id: aws.String("3"), + MessageBody: aws.String("test message 3"), + }, + }, + }, + }, + setup: func(ctx context.Context) context.Context { + _, ctx = tracer.StartSpanFromContext(ctx, "test-span") + return ctx + }, + check: func(t *testing.T, in middleware.InitializeInput) { + params, ok := in.Parameters.(*sqs.SendMessageBatchInput) + require.True(t, ok) + require.NotNil(t, params) + require.NotNil(t, params.Entries) + require.Len(t, params.Entries, 3) + + for _, entry := range params.Entries { + require.NotNil(t, entry.MessageAttributes) + assert.Contains(t, entry.MessageAttributes, datadogKey) + assert.NotNil(t, entry.MessageAttributes[datadogKey].DataType) + assert.Equal(t, "String", *entry.MessageAttributes[datadogKey].DataType) + assert.NotNil(t, entry.MessageAttributes[datadogKey].StringValue) + assert.NotEmpty(t, *entry.MessageAttributes[datadogKey].StringValue) + } + }, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + mt := mocktracer.Start() + defer mt.Stop() + + ctx := context.Background() + if tt.setup != nil { + ctx = tt.setup(ctx) + } + + EnrichOperation(ctx, tt.input, tt.operation) + + if tt.check != nil { + tt.check(t, tt.input) + } + }) + } +} + +func TestInjectTraceContext(t *testing.T) { + tests := []struct { + name string + existingAttributes int + expectInjection bool + }{ + { + name: "Inject with no existing attributes", + existingAttributes: 0, + expectInjection: true, + }, + { + name: "Inject with some existing attributes", + existingAttributes: 5, + expectInjection: true, + }, + { + name: "No injection when at max attributes", + existingAttributes: maxMessageAttributes, + expectInjection: false, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + mt := mocktracer.Start() + defer mt.Stop() + + ctx := context.Background() + span, ctx := tracer.StartSpanFromContext(ctx, "test-span") + + messageAttributes := make(map[string]types.MessageAttributeValue) + for i := 0; i < tt.existingAttributes; i++ { + messageAttributes[fmt.Sprintf("attr%d", i)] = types.MessageAttributeValue{ + DataType: aws.String("String"), + StringValue: aws.String("value"), + } + } + + injectTraceContext(ctx, messageAttributes) + + if tt.expectInjection { + assert.Contains(t, messageAttributes, datadogKey) + assert.NotNil(t, messageAttributes[datadogKey].DataType) + assert.Equal(t, "String", *messageAttributes[datadogKey].DataType) + assert.NotNil(t, messageAttributes[datadogKey].StringValue) + assert.NotEmpty(t, *messageAttributes[datadogKey].StringValue) + + var carrier testCarrier + err := json.Unmarshal([]byte(*messageAttributes[datadogKey].StringValue), &carrier.m) + assert.NoError(t, err) + + extractedSpanContext, err := tracer.Extract(&carrier) + assert.NoError(t, err) + assert.Equal(t, span.Context().TraceID(), extractedSpanContext.TraceID()) + assert.Equal(t, span.Context().SpanID(), extractedSpanContext.SpanID()) + } else { + assert.NotContains(t, messageAttributes, datadogKey) + } + }) + } +} From 16335a7b6cb04c57296972d0bdb4955f9ce4dbd0 Mon Sep 17 00:00:00 2001 From: Nicholas Hulston Date: Mon, 7 Oct 2024 16:14:05 -0400 Subject: [PATCH 05/11] Add copyright --- contrib/aws/internal/eventbridge/eventbridge.go | 5 +++++ contrib/aws/internal/eventbridge/eventbridge_test.go | 5 +++++ contrib/aws/internal/sns/sns.go | 5 +++++ contrib/aws/internal/sns/sns_test.go | 5 +++++ contrib/aws/internal/sqs/sqs.go | 5 +++++ contrib/aws/internal/sqs/sqs_test.go | 5 +++++ 6 files changed, 30 insertions(+) diff --git a/contrib/aws/internal/eventbridge/eventbridge.go b/contrib/aws/internal/eventbridge/eventbridge.go index f65cb4f503..26172e2d00 100644 --- a/contrib/aws/internal/eventbridge/eventbridge.go +++ b/contrib/aws/internal/eventbridge/eventbridge.go @@ -1,3 +1,8 @@ +// Unless explicitly stated otherwise all files in this repository are licensed +// under the Apache License Version 2.0. +// This product includes software developed at Datadog (https://www.datadoghq.com/). +// Copyright 2016 Datadog, Inc. + package eventbridge import ( diff --git a/contrib/aws/internal/eventbridge/eventbridge_test.go b/contrib/aws/internal/eventbridge/eventbridge_test.go index 0e5d6c8df8..c4ba41f00b 100644 --- a/contrib/aws/internal/eventbridge/eventbridge_test.go +++ b/contrib/aws/internal/eventbridge/eventbridge_test.go @@ -1,3 +1,8 @@ +// Unless explicitly stated otherwise all files in this repository are licensed +// under the Apache License Version 2.0. +// This product includes software developed at Datadog (https://www.datadoghq.com/). +// Copyright 2016 Datadog, Inc. + package eventbridge import ( diff --git a/contrib/aws/internal/sns/sns.go b/contrib/aws/internal/sns/sns.go index 26d0a1c9ff..287122cbd3 100644 --- a/contrib/aws/internal/sns/sns.go +++ b/contrib/aws/internal/sns/sns.go @@ -1,3 +1,8 @@ +// Unless explicitly stated otherwise all files in this repository are licensed +// under the Apache License Version 2.0. +// This product includes software developed at Datadog (https://www.datadoghq.com/). +// Copyright 2016 Datadog, Inc. + package sns import ( diff --git a/contrib/aws/internal/sns/sns_test.go b/contrib/aws/internal/sns/sns_test.go index 9c04097d0b..a74353225a 100644 --- a/contrib/aws/internal/sns/sns_test.go +++ b/contrib/aws/internal/sns/sns_test.go @@ -1,3 +1,8 @@ +// Unless explicitly stated otherwise all files in this repository are licensed +// under the Apache License Version 2.0. +// This product includes software developed at Datadog (https://www.datadoghq.com/). +// Copyright 2016 Datadog, Inc. + package sns import ( diff --git a/contrib/aws/internal/sqs/sqs.go b/contrib/aws/internal/sqs/sqs.go index c474c9abea..225c383243 100644 --- a/contrib/aws/internal/sqs/sqs.go +++ b/contrib/aws/internal/sqs/sqs.go @@ -1,3 +1,8 @@ +// Unless explicitly stated otherwise all files in this repository are licensed +// under the Apache License Version 2.0. +// This product includes software developed at Datadog (https://www.datadoghq.com/). +// Copyright 2016 Datadog, Inc. + package sqs import ( diff --git a/contrib/aws/internal/sqs/sqs_test.go b/contrib/aws/internal/sqs/sqs_test.go index 559a904f21..e7030743b0 100644 --- a/contrib/aws/internal/sqs/sqs_test.go +++ b/contrib/aws/internal/sqs/sqs_test.go @@ -1,3 +1,8 @@ +// Unless explicitly stated otherwise all files in this repository are licensed +// under the Apache License Version 2.0. +// This product includes software developed at Datadog (https://www.datadoghq.com/). +// Copyright 2016 Datadog, Inc. + package sqs import ( From b71cdd50be3b6f4f6cc6e4ad1d932d56466c8bcd Mon Sep 17 00:00:00 2001 From: Nicholas Hulston Date: Thu, 10 Oct 2024 10:43:27 -0400 Subject: [PATCH 06/11] Use existing carrier; avoid using confusing pointers --- .../aws/internal/eventbridge/eventbridge.go | 8 +------ .../internal/eventbridge/eventbridge_test.go | 22 ++--------------- contrib/aws/internal/sns/sns.go | 15 ++++-------- contrib/aws/internal/sns/sns_test.go | 24 +++---------------- contrib/aws/internal/sqs/sqs.go | 15 ++++-------- contrib/aws/internal/sqs/sqs_test.go | 23 +++--------------- 6 files changed, 17 insertions(+), 90 deletions(-) diff --git a/contrib/aws/internal/eventbridge/eventbridge.go b/contrib/aws/internal/eventbridge/eventbridge.go index 26172e2d00..d13df2f8d7 100644 --- a/contrib/aws/internal/eventbridge/eventbridge.go +++ b/contrib/aws/internal/eventbridge/eventbridge.go @@ -25,12 +25,6 @@ const ( maxSizeBytes = 256 * 1024 // 256 KB ) -type messageCarrier map[string]string - -func (carrier messageCarrier) Set(key, val string) { - carrier[key] = val -} - func EnrichOperation(ctx context.Context, in middleware.InitializeInput, operation string) { switch operation { case "PutEvents": @@ -61,7 +55,7 @@ func injectTraceContext(ctx context.Context, entryPtr *types.PutEventsRequestEnt return } - carrier := make(messageCarrier) + carrier := tracer.TextMapCarrier{} err := tracer.Inject(span.Context(), carrier) if err != nil { log.Debug("Unable to inject trace context: %s", err) diff --git a/contrib/aws/internal/eventbridge/eventbridge_test.go b/contrib/aws/internal/eventbridge/eventbridge_test.go index c4ba41f00b..15c8eff8eb 100644 --- a/contrib/aws/internal/eventbridge/eventbridge_test.go +++ b/contrib/aws/internal/eventbridge/eventbridge_test.go @@ -22,23 +22,6 @@ import ( "gopkg.in/DataDog/dd-trace-go.v1/ddtrace/tracer" ) -type testCarrier struct { - m map[string]string -} - -func (c *testCarrier) Set(key, val string) { - c.m[key] = val -} - -func (c *testCarrier) ForeachKey(handler func(key, val string) error) error { - for k, v := range c.m { - if err := handler(k, v); err != nil { - return err - } - } - return nil -} - func TestEnrichOperation(t *testing.T) { mt := mocktracer.Start() defer mt.Stop() @@ -145,11 +128,10 @@ func TestInjectTraceContext(t *testing.T) { assert.NoError(t, err) assert.Greater(t, startTime, int64(0)) - var carrier testCarrier - carrier.m = make(map[string]string) + carrier := tracer.TextMapCarrier{} for k, v := range ddData { if s, ok := v.(string); ok { - carrier.m[k] = s + carrier[k] = s } } diff --git a/contrib/aws/internal/sns/sns.go b/contrib/aws/internal/sns/sns.go index 287122cbd3..e2be7ca43a 100644 --- a/contrib/aws/internal/sns/sns.go +++ b/contrib/aws/internal/sns/sns.go @@ -21,12 +21,6 @@ const ( maxMessageAttributes = 10 ) -type messageCarrier map[string]string - -func (carrier messageCarrier) Set(key, val string) { - carrier[key] = val -} - func EnrichOperation(ctx context.Context, in middleware.InitializeInput, operation string) { switch operation { case "Publish": @@ -58,11 +52,10 @@ func handlePublishBatch(ctx context.Context, in middleware.InitializeInput) { } for i := range params.PublishBatchRequestEntries { - entryPtr := ¶ms.PublishBatchRequestEntries[i] - if entryPtr.MessageAttributes == nil { - entryPtr.MessageAttributes = make(map[string]types.MessageAttributeValue) + if params.PublishBatchRequestEntries[i].MessageAttributes == nil { + params.PublishBatchRequestEntries[i].MessageAttributes = make(map[string]types.MessageAttributeValue) } - injectTraceContext(ctx, entryPtr.MessageAttributes) + injectTraceContext(ctx, params.PublishBatchRequestEntries[i].MessageAttributes) } } @@ -81,7 +74,7 @@ func injectTraceContext(ctx context.Context, messageAttributes map[string]types. return } - carrier := make(messageCarrier) + carrier := tracer.TextMapCarrier{} err := tracer.Inject(span.Context(), carrier) if err != nil { log.Debug("Unable to inject trace context: %s", err.Error()) diff --git a/contrib/aws/internal/sns/sns_test.go b/contrib/aws/internal/sns/sns_test.go index a74353225a..53a54f0448 100644 --- a/contrib/aws/internal/sns/sns_test.go +++ b/contrib/aws/internal/sns/sns_test.go @@ -21,23 +21,6 @@ import ( "gopkg.in/DataDog/dd-trace-go.v1/ddtrace/tracer" ) -type testCarrier struct { - m map[string]string -} - -func (c *testCarrier) Set(key, val string) { - c.m[key] = val -} - -func (c *testCarrier) ForeachKey(handler func(key, val string) error) error { - for k, v := range c.m { - if err := handler(k, v); err != nil { - return err - } - } - return nil -} - func TestEnrichOperation(t *testing.T) { tests := []struct { name string @@ -179,12 +162,11 @@ func TestInjectTraceContext(t *testing.T) { assert.NotNil(t, messageAttributes[datadogKey].StringValue) assert.NotEmpty(t, *messageAttributes[datadogKey].StringValue) - var carrier testCarrier - carrier.m = make(map[string]string) - err := json.Unmarshal([]byte(*messageAttributes[datadogKey].StringValue), &carrier.m) + carrier := tracer.TextMapCarrier{} + err := json.Unmarshal([]byte(*messageAttributes[datadogKey].StringValue), &carrier) assert.NoError(t, err) - extractedSpanContext, err := tracer.Extract(&carrier) + extractedSpanContext, err := tracer.Extract(carrier) assert.NoError(t, err) assert.Equal(t, span.Context().TraceID(), extractedSpanContext.TraceID()) assert.Equal(t, span.Context().SpanID(), extractedSpanContext.SpanID()) diff --git a/contrib/aws/internal/sqs/sqs.go b/contrib/aws/internal/sqs/sqs.go index 225c383243..d6c3f2bc74 100644 --- a/contrib/aws/internal/sqs/sqs.go +++ b/contrib/aws/internal/sqs/sqs.go @@ -21,12 +21,6 @@ const ( maxMessageAttributes = 10 ) -type messageCarrier map[string]string - -func (carrier messageCarrier) Set(key, val string) { - carrier[key] = val -} - func EnrichOperation(ctx context.Context, in middleware.InitializeInput, operation string) { switch operation { case "SendMessage": @@ -58,11 +52,10 @@ func handleSendMessageBatch(ctx context.Context, in middleware.InitializeInput) } for i := range params.Entries { - entryPtr := ¶ms.Entries[i] - if entryPtr.MessageAttributes == nil { - entryPtr.MessageAttributes = make(map[string]types.MessageAttributeValue) + if params.Entries[i].MessageAttributes == nil { + params.Entries[i].MessageAttributes = make(map[string]types.MessageAttributeValue) } - injectTraceContext(ctx, entryPtr.MessageAttributes) + injectTraceContext(ctx, params.Entries[i].MessageAttributes) } } @@ -81,7 +74,7 @@ func injectTraceContext(ctx context.Context, messageAttributes map[string]types. return } - carrier := make(messageCarrier) + carrier := tracer.TextMapCarrier{} err := tracer.Inject(span.Context(), carrier) if err != nil { log.Debug("Unable to inject trace context: %s", err.Error()) diff --git a/contrib/aws/internal/sqs/sqs_test.go b/contrib/aws/internal/sqs/sqs_test.go index e7030743b0..a5c1f186cc 100644 --- a/contrib/aws/internal/sqs/sqs_test.go +++ b/contrib/aws/internal/sqs/sqs_test.go @@ -21,23 +21,6 @@ import ( "gopkg.in/DataDog/dd-trace-go.v1/ddtrace/tracer" ) -type testCarrier struct { - m map[string]string -} - -func (c *testCarrier) Set(key, val string) { - c.m[key] = val -} - -func (c *testCarrier) ForeachKey(handler func(key, val string) error) error { - for k, v := range c.m { - if err := handler(k, v); err != nil { - return err - } - } - return nil -} - func TestEnrichOperation(t *testing.T) { tests := []struct { name string @@ -183,11 +166,11 @@ func TestInjectTraceContext(t *testing.T) { assert.NotNil(t, messageAttributes[datadogKey].StringValue) assert.NotEmpty(t, *messageAttributes[datadogKey].StringValue) - var carrier testCarrier - err := json.Unmarshal([]byte(*messageAttributes[datadogKey].StringValue), &carrier.m) + carrier := tracer.TextMapCarrier{} + err := json.Unmarshal([]byte(*messageAttributes[datadogKey].StringValue), &carrier) assert.NoError(t, err) - extractedSpanContext, err := tracer.Extract(&carrier) + extractedSpanContext, err := tracer.Extract(carrier) assert.NoError(t, err) assert.Equal(t, span.Context().TraceID(), extractedSpanContext.TraceID()) assert.Equal(t, span.Context().SpanID(), extractedSpanContext.SpanID()) From 80be4e3e7f9e87a29da5e2182f7a54fbbbe332c0 Mon Sep 17 00:00:00 2001 From: Nicholas Hulston Date: Thu, 10 Oct 2024 11:13:27 -0400 Subject: [PATCH 07/11] Inject trace context into SNS message attributes as binary --- contrib/aws/aws-sdk-go-v2/aws/aws_test.go | 6 +++--- contrib/aws/internal/sns/sns.go | 6 ++++-- contrib/aws/internal/sns/sns_test.go | 20 ++++++++++---------- 3 files changed, 17 insertions(+), 15 deletions(-) diff --git a/contrib/aws/aws-sdk-go-v2/aws/aws_test.go b/contrib/aws/aws-sdk-go-v2/aws/aws_test.go index 6a5412a175..09768a3f37 100644 --- a/contrib/aws/aws-sdk-go-v2/aws/aws_test.go +++ b/contrib/aws/aws-sdk-go-v2/aws/aws_test.go @@ -508,12 +508,12 @@ func TestAppendMiddlewareSnsPublish(t *testing.T) { assert.NotNil(t, tt.publishInput.MessageAttributes) assert.Contains(t, tt.publishInput.MessageAttributes, "_datadog") ddAttr := tt.publishInput.MessageAttributes["_datadog"] - assert.Equal(t, "String", *ddAttr.DataType) - assert.NotEmpty(t, *ddAttr.StringValue) + assert.Equal(t, "Binary", *ddAttr.DataType) + assert.NotEmpty(t, ddAttr.BinaryValue) // Decode and verify the injected trace context var traceContext map[string]string - err := json.Unmarshal([]byte(*ddAttr.StringValue), &traceContext) + err := json.Unmarshal(ddAttr.BinaryValue, &traceContext) assert.NoError(t, err) assert.Contains(t, traceContext, "x-datadog-trace-id") assert.Contains(t, traceContext, "x-datadog-parent-id") diff --git a/contrib/aws/internal/sns/sns.go b/contrib/aws/internal/sns/sns.go index e2be7ca43a..64f18b3f67 100644 --- a/contrib/aws/internal/sns/sns.go +++ b/contrib/aws/internal/sns/sns.go @@ -87,8 +87,10 @@ func injectTraceContext(ctx context.Context, messageAttributes map[string]types. return } + // Use Binary since SNS subscription filter policies fail silently with JSON + // strings. https://github.com/DataDog/datadog-lambda-js/pull/269 messageAttributes[datadogKey] = types.MessageAttributeValue{ - DataType: aws.String("String"), - StringValue: aws.String(string(jsonBytes)), + DataType: aws.String("Binary"), + BinaryValue: jsonBytes, } } diff --git a/contrib/aws/internal/sns/sns_test.go b/contrib/aws/internal/sns/sns_test.go index 53a54f0448..00e7bfd72f 100644 --- a/contrib/aws/internal/sns/sns_test.go +++ b/contrib/aws/internal/sns/sns_test.go @@ -49,9 +49,9 @@ func TestEnrichOperation(t *testing.T) { require.NotNil(t, params.MessageAttributes) assert.Contains(t, params.MessageAttributes, datadogKey) assert.NotNil(t, params.MessageAttributes[datadogKey].DataType) - assert.Equal(t, "String", *params.MessageAttributes[datadogKey].DataType) - assert.NotNil(t, params.MessageAttributes[datadogKey].StringValue) - assert.NotEmpty(t, *params.MessageAttributes[datadogKey].StringValue) + assert.Equal(t, "Binary", *params.MessageAttributes[datadogKey].DataType) + assert.NotNil(t, params.MessageAttributes[datadogKey].BinaryValue) + assert.NotEmpty(t, params.MessageAttributes[datadogKey].BinaryValue) }, }, { @@ -87,9 +87,9 @@ func TestEnrichOperation(t *testing.T) { require.NotNil(t, entry.MessageAttributes) assert.Contains(t, entry.MessageAttributes, datadogKey) assert.NotNil(t, entry.MessageAttributes[datadogKey].DataType) - assert.Equal(t, "String", *entry.MessageAttributes[datadogKey].DataType) - assert.NotNil(t, entry.MessageAttributes[datadogKey].StringValue) - assert.NotEmpty(t, *entry.MessageAttributes[datadogKey].StringValue) + assert.Equal(t, "Binary", *entry.MessageAttributes[datadogKey].DataType) + assert.NotNil(t, entry.MessageAttributes[datadogKey].BinaryValue) + assert.NotEmpty(t, entry.MessageAttributes[datadogKey].BinaryValue) } }, }, @@ -158,12 +158,12 @@ func TestInjectTraceContext(t *testing.T) { if tt.expectInjection { assert.Contains(t, messageAttributes, datadogKey) assert.NotNil(t, messageAttributes[datadogKey].DataType) - assert.Equal(t, "String", *messageAttributes[datadogKey].DataType) - assert.NotNil(t, messageAttributes[datadogKey].StringValue) - assert.NotEmpty(t, *messageAttributes[datadogKey].StringValue) + assert.Equal(t, "Binary", *messageAttributes[datadogKey].DataType) + assert.NotNil(t, messageAttributes[datadogKey].BinaryValue) + assert.NotEmpty(t, messageAttributes[datadogKey].BinaryValue) carrier := tracer.TextMapCarrier{} - err := json.Unmarshal([]byte(*messageAttributes[datadogKey].StringValue), &carrier) + err := json.Unmarshal(messageAttributes[datadogKey].BinaryValue, &carrier) assert.NoError(t, err) extractedSpanContext, err := tracer.Extract(carrier) From 9be0a75b3b05cad8d25f9199f3316f84888236d4 Mon Sep 17 00:00:00 2001 From: Nicholas Hulston Date: Thu, 10 Oct 2024 14:14:00 -0400 Subject: [PATCH 08/11] Improve efficiency by (1) passing in span directly, and (2) getting trace context bytes only once and reusing it for batch requests --- contrib/aws/aws-sdk-go-v2/aws/aws.go | 6 +- .../aws/internal/eventbridge/eventbridge.go | 17 ++---- .../internal/eventbridge/eventbridge_test.go | 18 +++--- contrib/aws/internal/sns/sns.go | 59 +++++++++++-------- contrib/aws/internal/sns/sns_test.go | 27 ++++----- contrib/aws/internal/sqs/sqs.go | 59 +++++++++++-------- contrib/aws/internal/sqs/sqs_test.go | 27 ++++----- 7 files changed, 109 insertions(+), 104 deletions(-) diff --git a/contrib/aws/aws-sdk-go-v2/aws/aws.go b/contrib/aws/aws-sdk-go-v2/aws/aws.go index 2149fa3661..2a2bbf5c38 100644 --- a/contrib/aws/aws-sdk-go-v2/aws/aws.go +++ b/contrib/aws/aws-sdk-go-v2/aws/aws.go @@ -112,11 +112,11 @@ func (mw *traceMiddleware) startTraceMiddleware(stack *middleware.Stack) error { // Inject trace context switch serviceID { case "SQS": - sqsTracer.EnrichOperation(spanctx, in, operation) + sqsTracer.EnrichOperation(span, in, operation) case "SNS": - snsTracer.EnrichOperation(spanctx, in, operation) + snsTracer.EnrichOperation(span, in, operation) case "EventBridge": - eventBridgeTracer.EnrichOperation(spanctx, in, operation) + eventBridgeTracer.EnrichOperation(span, in, operation) } // Handle initialize and continue through the middleware chain. diff --git a/contrib/aws/internal/eventbridge/eventbridge.go b/contrib/aws/internal/eventbridge/eventbridge.go index d13df2f8d7..b6497aab7c 100644 --- a/contrib/aws/internal/eventbridge/eventbridge.go +++ b/contrib/aws/internal/eventbridge/eventbridge.go @@ -6,7 +6,6 @@ package eventbridge import ( - "context" "encoding/json" "github.com/aws/aws-sdk-go-v2/aws" "github.com/aws/aws-sdk-go-v2/service/eventbridge" @@ -25,14 +24,14 @@ const ( maxSizeBytes = 256 * 1024 // 256 KB ) -func EnrichOperation(ctx context.Context, in middleware.InitializeInput, operation string) { +func EnrichOperation(span tracer.Span, in middleware.InitializeInput, operation string) { switch operation { case "PutEvents": - handlePutEvents(ctx, in) + handlePutEvents(span, in) } } -func handlePutEvents(ctx context.Context, in middleware.InitializeInput) { +func handlePutEvents(span tracer.Span, in middleware.InitializeInput) { params, ok := in.Parameters.(*eventbridge.PutEventsInput) if !ok { log.Debug("Unable to read PutEvents params") @@ -40,21 +39,15 @@ func handlePutEvents(ctx context.Context, in middleware.InitializeInput) { } for i := range params.Entries { - injectTraceContext(ctx, ¶ms.Entries[i]) + injectTraceContext(span, ¶ms.Entries[i]) } } -func injectTraceContext(ctx context.Context, entryPtr *types.PutEventsRequestEntry) { +func injectTraceContext(span tracer.Span, entryPtr *types.PutEventsRequestEntry) { if entryPtr == nil { return } - span, ok := tracer.SpanFromContext(ctx) - if !ok || span == nil { - log.Debug("Unable to find span from context") - return - } - carrier := tracer.TextMapCarrier{} err := tracer.Inject(span.Context(), carrier) if err != nil { diff --git a/contrib/aws/internal/eventbridge/eventbridge_test.go b/contrib/aws/internal/eventbridge/eventbridge_test.go index 15c8eff8eb..6902865bf3 100644 --- a/contrib/aws/internal/eventbridge/eventbridge_test.go +++ b/contrib/aws/internal/eventbridge/eventbridge_test.go @@ -26,25 +26,24 @@ func TestEnrichOperation(t *testing.T) { mt := mocktracer.Start() defer mt.Stop() - ctx := context.Background() - _, ctx = tracer.StartSpanFromContext(ctx, "test-span") + span := tracer.StartSpan("test-span") input := middleware.InitializeInput{ Parameters: &eventbridge.PutEventsInput{ Entries: []types.PutEventsRequestEntry{ { - Detail: aws.String(`{"key": "value"}`), + Detail: aws.String(`{"@123": "value", "_foo": "bar"}`), EventBusName: aws.String("test-bus"), }, { - Detail: aws.String(`{"another": "data"}`), + Detail: aws.String(`{"@123": "data", "_foo": "bar"}`), EventBusName: aws.String("test-bus-2"), }, }, }, } - EnrichOperation(ctx, input, "PutEvents") + EnrichOperation(span, input, "PutEvents") params, ok := input.Parameters.(*eventbridge.PutEventsInput) require.True(t, ok) @@ -55,6 +54,8 @@ func TestEnrichOperation(t *testing.T) { err := json.Unmarshal([]byte(*entry.Detail), &detail) require.NoError(t, err) + assert.Contains(t, detail, "@123") // make sure user data still exists + assert.Contains(t, detail, "_foo") assert.Contains(t, detail, datadogKey) ddData, ok := detail[datadogKey].(map[string]interface{}) require.True(t, ok) @@ -109,7 +110,7 @@ func TestInjectTraceContext(t *testing.T) { for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { - injectTraceContext(ctx, &tt.entry) + injectTraceContext(span, &tt.entry) tt.expected(t, &tt.entry) var detail map[string]interface{} @@ -147,8 +148,7 @@ func TestInjectTraceContextSizeLimit(t *testing.T) { mt := mocktracer.Start() defer mt.Stop() - ctx := context.Background() - _, ctx = tracer.StartSpanFromContext(ctx, "test-span") + span := tracer.StartSpan("test-span") tests := []struct { name string @@ -187,7 +187,7 @@ func TestInjectTraceContextSizeLimit(t *testing.T) { for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { - injectTraceContext(ctx, &tt.entry) + injectTraceContext(span, &tt.entry) tt.expected(t, &tt.entry) }) } diff --git a/contrib/aws/internal/sns/sns.go b/contrib/aws/internal/sns/sns.go index 64f18b3f67..17785d2359 100644 --- a/contrib/aws/internal/sns/sns.go +++ b/contrib/aws/internal/sns/sns.go @@ -6,7 +6,6 @@ package sns import ( - "context" "encoding/json" "github.com/aws/aws-sdk-go-v2/aws" "github.com/aws/aws-sdk-go-v2/service/sns" @@ -21,69 +20,77 @@ const ( maxMessageAttributes = 10 ) -func EnrichOperation(ctx context.Context, in middleware.InitializeInput, operation string) { +func EnrichOperation(span tracer.Span, in middleware.InitializeInput, operation string) { switch operation { case "Publish": - handlePublish(ctx, in) + handlePublish(span, in) case "PublishBatch": - handlePublishBatch(ctx, in) + handlePublishBatch(span, in) } } -func handlePublish(ctx context.Context, in middleware.InitializeInput) { +func handlePublish(span tracer.Span, in middleware.InitializeInput) { params, ok := in.Parameters.(*sns.PublishInput) if !ok { log.Debug("Unable to read PublishInput params") return } + traceContext, err := getTraceContextBytes(span) + if err != nil { + log.Debug("Unable to get trace context: %s", err.Error()) + return + } + if params.MessageAttributes == nil { params.MessageAttributes = make(map[string]types.MessageAttributeValue) } - injectTraceContext(ctx, params.MessageAttributes) + injectTraceContext(traceContext, params.MessageAttributes) } -func handlePublishBatch(ctx context.Context, in middleware.InitializeInput) { +func handlePublishBatch(span tracer.Span, in middleware.InitializeInput) { params, ok := in.Parameters.(*sns.PublishBatchInput) if !ok { log.Debug("Unable to read PublishBatch params") return } + traceContext, err := getTraceContextBytes(span) + if err != nil { + log.Debug("Unable to get trace context: %s", err.Error()) + return + } + for i := range params.PublishBatchRequestEntries { if params.PublishBatchRequestEntries[i].MessageAttributes == nil { params.PublishBatchRequestEntries[i].MessageAttributes = make(map[string]types.MessageAttributeValue) } - injectTraceContext(ctx, params.PublishBatchRequestEntries[i].MessageAttributes) + injectTraceContext(traceContext, params.PublishBatchRequestEntries[i].MessageAttributes) } } -func injectTraceContext(ctx context.Context, messageAttributes map[string]types.MessageAttributeValue) { - span, ok := tracer.SpanFromContext(ctx) - if !ok || span == nil { - log.Debug("Unable to find span from context") - return - } - - // SNS only allows a maximum of 10 message attributes. - // https://docs.aws.amazon.com/sns/latest/dg/sns-message-attributes.html - // Only inject if there's room. - if len(messageAttributes) >= maxMessageAttributes { - log.Info("Cannot inject trace context: message already has maximum allowed attributes") - return - } - +func getTraceContextBytes(span tracer.Span) ([]byte, error) { carrier := tracer.TextMapCarrier{} err := tracer.Inject(span.Context(), carrier) if err != nil { - log.Debug("Unable to inject trace context: %s", err.Error()) - return + return nil, err } jsonBytes, err := json.Marshal(carrier) if err != nil { - log.Debug("Unable to marshal trace context: %s", err.Error()) + return nil, err + } + + return jsonBytes, nil +} + +func injectTraceContext(jsonBytes []byte, messageAttributes map[string]types.MessageAttributeValue) { + // SNS only allows a maximum of 10 message attributes. + // https://docs.aws.amazon.com/sns/latest/dg/sns-message-attributes.html + // Only inject if there's room. + if len(messageAttributes) >= maxMessageAttributes { + log.Info("Cannot inject trace context: message already has maximum allowed attributes") return } diff --git a/contrib/aws/internal/sns/sns_test.go b/contrib/aws/internal/sns/sns_test.go index 00e7bfd72f..805e92d683 100644 --- a/contrib/aws/internal/sns/sns_test.go +++ b/contrib/aws/internal/sns/sns_test.go @@ -26,7 +26,7 @@ func TestEnrichOperation(t *testing.T) { name string operation string input middleware.InitializeInput - setup func(context.Context) context.Context + setup func(context.Context) tracer.Span check func(*testing.T, middleware.InitializeInput) }{ { @@ -38,9 +38,9 @@ func TestEnrichOperation(t *testing.T) { TopicArn: aws.String("arn:aws:sns:us-east-1:123456789012:test-topic"), }, }, - setup: func(ctx context.Context) context.Context { - _, ctx = tracer.StartSpanFromContext(ctx, "test-span") - return ctx + setup: func(ctx context.Context) tracer.Span { + span, _ := tracer.StartSpanFromContext(ctx, "test-span") + return span }, check: func(t *testing.T, in middleware.InitializeInput) { params, ok := in.Parameters.(*sns.PublishInput) @@ -72,9 +72,9 @@ func TestEnrichOperation(t *testing.T) { }, }, }, - setup: func(ctx context.Context) context.Context { - _, ctx = tracer.StartSpanFromContext(ctx, "test-span") - return ctx + setup: func(ctx context.Context) tracer.Span { + span, _ := tracer.StartSpanFromContext(ctx, "test-span") + return span }, check: func(t *testing.T, in middleware.InitializeInput) { params, ok := in.Parameters.(*sns.PublishBatchInput) @@ -101,11 +101,9 @@ func TestEnrichOperation(t *testing.T) { defer mt.Stop() ctx := context.Background() - if tt.setup != nil { - ctx = tt.setup(ctx) - } + span := tt.setup(ctx) - EnrichOperation(ctx, tt.input, tt.operation) + EnrichOperation(span, tt.input, tt.operation) if tt.check != nil { tt.check(t, tt.input) @@ -142,8 +140,7 @@ func TestInjectTraceContext(t *testing.T) { mt := mocktracer.Start() defer mt.Stop() - ctx := context.Background() - span, ctx := tracer.StartSpanFromContext(ctx, "test-span") + span := tracer.StartSpan("test-span") messageAttributes := make(map[string]types.MessageAttributeValue) for i := 0; i < tt.existingAttributes; i++ { @@ -153,7 +150,9 @@ func TestInjectTraceContext(t *testing.T) { } } - injectTraceContext(ctx, messageAttributes) + traceContext, err := getTraceContextBytes(span) + assert.NoError(t, err) + injectTraceContext(traceContext, messageAttributes) if tt.expectInjection { assert.Contains(t, messageAttributes, datadogKey) diff --git a/contrib/aws/internal/sqs/sqs.go b/contrib/aws/internal/sqs/sqs.go index d6c3f2bc74..f3fcd25e81 100644 --- a/contrib/aws/internal/sqs/sqs.go +++ b/contrib/aws/internal/sqs/sqs.go @@ -6,7 +6,6 @@ package sqs import ( - "context" "encoding/json" "github.com/aws/aws-sdk-go-v2/aws" "github.com/aws/aws-sdk-go-v2/service/sqs" @@ -21,69 +20,77 @@ const ( maxMessageAttributes = 10 ) -func EnrichOperation(ctx context.Context, in middleware.InitializeInput, operation string) { +func EnrichOperation(span tracer.Span, in middleware.InitializeInput, operation string) { switch operation { case "SendMessage": - handleSendMessage(ctx, in) + handleSendMessage(span, in) case "SendMessageBatch": - handleSendMessageBatch(ctx, in) + handleSendMessageBatch(span, in) } } -func handleSendMessage(ctx context.Context, in middleware.InitializeInput) { +func handleSendMessage(span tracer.Span, in middleware.InitializeInput) { params, ok := in.Parameters.(*sqs.SendMessageInput) if !ok { log.Debug("Unable to read SendMessage params") return } + traceContext, err := getTraceContextBytes(span) + if err != nil { + log.Debug("Unable to get trace context: %s", err.Error()) + return + } + if params.MessageAttributes == nil { params.MessageAttributes = make(map[string]types.MessageAttributeValue) } - injectTraceContext(ctx, params.MessageAttributes) + injectTraceContext(traceContext, params.MessageAttributes) } -func handleSendMessageBatch(ctx context.Context, in middleware.InitializeInput) { +func handleSendMessageBatch(span tracer.Span, in middleware.InitializeInput) { params, ok := in.Parameters.(*sqs.SendMessageBatchInput) if !ok { log.Debug("Unable to read SendMessageBatch params") return } + traceContext, err := getTraceContextBytes(span) + if err != nil { + log.Debug("Unable to get trace context: %s", err.Error()) + return + } + for i := range params.Entries { if params.Entries[i].MessageAttributes == nil { params.Entries[i].MessageAttributes = make(map[string]types.MessageAttributeValue) } - injectTraceContext(ctx, params.Entries[i].MessageAttributes) + injectTraceContext(traceContext, params.Entries[i].MessageAttributes) } } -func injectTraceContext(ctx context.Context, messageAttributes map[string]types.MessageAttributeValue) { - span, ok := tracer.SpanFromContext(ctx) - if !ok || span == nil { - log.Debug("Unable to find span from context") - return - } - - // SQS only allows a maximum of 10 message attributes. - // https://docs.aws.amazon.com/AWSSimpleQueueService/latest/SQSDeveloperGuide/sqs-message-metadata.html#sqs-message-attributes - // Only inject if there's room. - if len(messageAttributes) >= maxMessageAttributes { - log.Info("Cannot inject trace context: message already has maximum allowed attributes") - return - } - +func getTraceContextBytes(span tracer.Span) ([]byte, error) { carrier := tracer.TextMapCarrier{} err := tracer.Inject(span.Context(), carrier) if err != nil { - log.Debug("Unable to inject trace context: %s", err.Error()) - return + return nil, err } jsonBytes, err := json.Marshal(carrier) if err != nil { - log.Debug("Unable to marshal trace context: %s", err.Error()) + return nil, err + } + + return jsonBytes, nil +} + +func injectTraceContext(jsonBytes []byte, messageAttributes map[string]types.MessageAttributeValue) { + // SQS only allows a maximum of 10 message attributes. + // https://docs.aws.amazon.com/AWSSimpleQueueService/latest/SQSDeveloperGuide/sqs-message-metadata.html#sqs-message-attributes + // Only inject if there's room. + if len(messageAttributes) >= maxMessageAttributes { + log.Info("Cannot inject trace context: message already has maximum allowed attributes") return } diff --git a/contrib/aws/internal/sqs/sqs_test.go b/contrib/aws/internal/sqs/sqs_test.go index a5c1f186cc..cac46f8833 100644 --- a/contrib/aws/internal/sqs/sqs_test.go +++ b/contrib/aws/internal/sqs/sqs_test.go @@ -26,7 +26,7 @@ func TestEnrichOperation(t *testing.T) { name string operation string input middleware.InitializeInput - setup func(context.Context) context.Context + setup func(context.Context) tracer.Span check func(*testing.T, middleware.InitializeInput) }{ { @@ -38,9 +38,9 @@ func TestEnrichOperation(t *testing.T) { QueueUrl: aws.String("https://sqs.us-east-1.amazonaws.com/1234567890/test-queue"), }, }, - setup: func(ctx context.Context) context.Context { - _, ctx = tracer.StartSpanFromContext(ctx, "test-span") - return ctx + setup: func(ctx context.Context) tracer.Span { + span, _ := tracer.StartSpanFromContext(ctx, "test-span") + return span }, check: func(t *testing.T, in middleware.InitializeInput) { params, ok := in.Parameters.(*sqs.SendMessageInput) @@ -76,9 +76,9 @@ func TestEnrichOperation(t *testing.T) { }, }, }, - setup: func(ctx context.Context) context.Context { - _, ctx = tracer.StartSpanFromContext(ctx, "test-span") - return ctx + setup: func(ctx context.Context) tracer.Span { + span, _ := tracer.StartSpanFromContext(ctx, "test-span") + return span }, check: func(t *testing.T, in middleware.InitializeInput) { params, ok := in.Parameters.(*sqs.SendMessageBatchInput) @@ -105,11 +105,9 @@ func TestEnrichOperation(t *testing.T) { defer mt.Stop() ctx := context.Background() - if tt.setup != nil { - ctx = tt.setup(ctx) - } + span := tt.setup(ctx) - EnrichOperation(ctx, tt.input, tt.operation) + EnrichOperation(span, tt.input, tt.operation) if tt.check != nil { tt.check(t, tt.input) @@ -146,8 +144,7 @@ func TestInjectTraceContext(t *testing.T) { mt := mocktracer.Start() defer mt.Stop() - ctx := context.Background() - span, ctx := tracer.StartSpanFromContext(ctx, "test-span") + span := tracer.StartSpan("test-span") messageAttributes := make(map[string]types.MessageAttributeValue) for i := 0; i < tt.existingAttributes; i++ { @@ -157,7 +154,9 @@ func TestInjectTraceContext(t *testing.T) { } } - injectTraceContext(ctx, messageAttributes) + traceContext, err := getTraceContextBytes(span) + assert.NoError(t, err) + injectTraceContext(traceContext, messageAttributes) if tt.expectInjection { assert.Contains(t, messageAttributes, datadogKey) From fb1a4f8d1be75853684f40487e175e429fab80b4 Mon Sep 17 00:00:00 2001 From: Nicholas Hulston Date: Thu, 10 Oct 2024 15:06:27 -0400 Subject: [PATCH 09/11] Improve efficiency by (1) building the new attribute once and reusing it for SNS/SQS, and (2) checking new eventbridge payload size before marshaling the updated detail --- .../aws/internal/eventbridge/eventbridge.go | 19 +++++++------ .../internal/eventbridge/eventbridge_test.go | 4 +-- contrib/aws/internal/sns/sns.go | 28 ++++++++++--------- contrib/aws/internal/sns/sns_test.go | 2 +- contrib/aws/internal/sqs/sqs.go | 24 ++++++++-------- contrib/aws/internal/sqs/sqs_test.go | 2 +- 6 files changed, 43 insertions(+), 36 deletions(-) diff --git a/contrib/aws/internal/eventbridge/eventbridge.go b/contrib/aws/internal/eventbridge/eventbridge.go index b6497aab7c..e82f4ecbb1 100644 --- a/contrib/aws/internal/eventbridge/eventbridge.go +++ b/contrib/aws/internal/eventbridge/eventbridge.go @@ -79,19 +79,22 @@ func injectTraceContext(span tracer.Span, entryPtr *types.PutEventsRequestEntry) return } - detail[datadogKey] = json.RawMessage(jsonBytes) + // Check sizes + detailSize := 0 + if entryPtr.Detail != nil { + detailSize = len(*entryPtr.Detail) + } + traceSize := len(jsonBytes) + if detailSize+traceSize > maxSizeBytes { + log.Info("Payload size too large to pass context") + return + } + detail[datadogKey] = json.RawMessage(jsonBytes) updatedDetail, err := json.Marshal(detail) if err != nil { log.Debug("Unable to marshal modified event detail: %s", err) return } - - // Check new detail size - if len(updatedDetail) > maxSizeBytes { - log.Info("Payload size too large to pass context") - return - } - entryPtr.Detail = aws.String(string(updatedDetail)) } diff --git a/contrib/aws/internal/eventbridge/eventbridge_test.go b/contrib/aws/internal/eventbridge/eventbridge_test.go index 6902865bf3..fda81714b3 100644 --- a/contrib/aws/internal/eventbridge/eventbridge_test.go +++ b/contrib/aws/internal/eventbridge/eventbridge_test.go @@ -158,11 +158,11 @@ func TestInjectTraceContextSizeLimit(t *testing.T) { { name: "Do not inject when payload is too large", entry: types.PutEventsRequestEntry{ - Detail: aws.String(`{"large": "` + strings.Repeat("a", maxSizeBytes-15) + `"}`), + Detail: aws.String(`{"large": "` + strings.Repeat("a", maxSizeBytes-50) + `"}`), EventBusName: aws.String("test-bus"), }, expected: func(t *testing.T, entry *types.PutEventsRequestEntry) { - assert.GreaterOrEqual(t, len(*entry.Detail), maxSizeBytes-15) + assert.GreaterOrEqual(t, len(*entry.Detail), maxSizeBytes-50) assert.NotContains(t, *entry.Detail, datadogKey) assert.True(t, strings.HasPrefix(*entry.Detail, `{"large": "`)) assert.True(t, strings.HasSuffix(*entry.Detail, `"}`)) diff --git a/contrib/aws/internal/sns/sns.go b/contrib/aws/internal/sns/sns.go index 17785d2359..b40ca5ea85 100644 --- a/contrib/aws/internal/sns/sns.go +++ b/contrib/aws/internal/sns/sns.go @@ -36,7 +36,7 @@ func handlePublish(span tracer.Span, in middleware.InitializeInput) { return } - traceContext, err := getTraceContextBytes(span) + traceContext, err := getTraceContext(span) if err != nil { log.Debug("Unable to get trace context: %s", err.Error()) return @@ -56,7 +56,7 @@ func handlePublishBatch(span tracer.Span, in middleware.InitializeInput) { return } - traceContext, err := getTraceContextBytes(span) + traceContext, err := getTraceContext(span) if err != nil { log.Debug("Unable to get trace context: %s", err.Error()) return @@ -70,22 +70,29 @@ func handlePublishBatch(span tracer.Span, in middleware.InitializeInput) { } } -func getTraceContextBytes(span tracer.Span) ([]byte, error) { +func getTraceContext(span tracer.Span) (types.MessageAttributeValue, error) { carrier := tracer.TextMapCarrier{} err := tracer.Inject(span.Context(), carrier) if err != nil { - return nil, err + return types.MessageAttributeValue{}, err } jsonBytes, err := json.Marshal(carrier) if err != nil { - return nil, err + return types.MessageAttributeValue{}, err } - return jsonBytes, nil + // Use Binary since SNS subscription filter policies fail silently with JSON + // strings. https://github.com/DataDog/datadog-lambda-js/pull/269 + attribute := types.MessageAttributeValue{ + DataType: aws.String("Binary"), + BinaryValue: jsonBytes, + } + + return attribute, nil } -func injectTraceContext(jsonBytes []byte, messageAttributes map[string]types.MessageAttributeValue) { +func injectTraceContext(traceContext types.MessageAttributeValue, messageAttributes map[string]types.MessageAttributeValue) { // SNS only allows a maximum of 10 message attributes. // https://docs.aws.amazon.com/sns/latest/dg/sns-message-attributes.html // Only inject if there's room. @@ -94,10 +101,5 @@ func injectTraceContext(jsonBytes []byte, messageAttributes map[string]types.Mes return } - // Use Binary since SNS subscription filter policies fail silently with JSON - // strings. https://github.com/DataDog/datadog-lambda-js/pull/269 - messageAttributes[datadogKey] = types.MessageAttributeValue{ - DataType: aws.String("Binary"), - BinaryValue: jsonBytes, - } + messageAttributes[datadogKey] = traceContext } diff --git a/contrib/aws/internal/sns/sns_test.go b/contrib/aws/internal/sns/sns_test.go index 805e92d683..0f955680f0 100644 --- a/contrib/aws/internal/sns/sns_test.go +++ b/contrib/aws/internal/sns/sns_test.go @@ -150,7 +150,7 @@ func TestInjectTraceContext(t *testing.T) { } } - traceContext, err := getTraceContextBytes(span) + traceContext, err := getTraceContext(span) assert.NoError(t, err) injectTraceContext(traceContext, messageAttributes) diff --git a/contrib/aws/internal/sqs/sqs.go b/contrib/aws/internal/sqs/sqs.go index f3fcd25e81..9fbd8a9f90 100644 --- a/contrib/aws/internal/sqs/sqs.go +++ b/contrib/aws/internal/sqs/sqs.go @@ -36,7 +36,7 @@ func handleSendMessage(span tracer.Span, in middleware.InitializeInput) { return } - traceContext, err := getTraceContextBytes(span) + traceContext, err := getTraceContext(span) if err != nil { log.Debug("Unable to get trace context: %s", err.Error()) return @@ -56,7 +56,7 @@ func handleSendMessageBatch(span tracer.Span, in middleware.InitializeInput) { return } - traceContext, err := getTraceContextBytes(span) + traceContext, err := getTraceContext(span) if err != nil { log.Debug("Unable to get trace context: %s", err.Error()) return @@ -70,22 +70,27 @@ func handleSendMessageBatch(span tracer.Span, in middleware.InitializeInput) { } } -func getTraceContextBytes(span tracer.Span) ([]byte, error) { +func getTraceContext(span tracer.Span) (types.MessageAttributeValue, error) { carrier := tracer.TextMapCarrier{} err := tracer.Inject(span.Context(), carrier) if err != nil { - return nil, err + return types.MessageAttributeValue{}, err } jsonBytes, err := json.Marshal(carrier) if err != nil { - return nil, err + return types.MessageAttributeValue{}, err } - return jsonBytes, nil + attribute := types.MessageAttributeValue{ + DataType: aws.String("String"), + StringValue: aws.String(string(jsonBytes)), + } + + return attribute, nil } -func injectTraceContext(jsonBytes []byte, messageAttributes map[string]types.MessageAttributeValue) { +func injectTraceContext(traceContext types.MessageAttributeValue, messageAttributes map[string]types.MessageAttributeValue) { // SQS only allows a maximum of 10 message attributes. // https://docs.aws.amazon.com/AWSSimpleQueueService/latest/SQSDeveloperGuide/sqs-message-metadata.html#sqs-message-attributes // Only inject if there's room. @@ -94,8 +99,5 @@ func injectTraceContext(jsonBytes []byte, messageAttributes map[string]types.Mes return } - messageAttributes[datadogKey] = types.MessageAttributeValue{ - DataType: aws.String("String"), - StringValue: aws.String(string(jsonBytes)), - } + messageAttributes[datadogKey] = traceContext } diff --git a/contrib/aws/internal/sqs/sqs_test.go b/contrib/aws/internal/sqs/sqs_test.go index cac46f8833..1a66adab09 100644 --- a/contrib/aws/internal/sqs/sqs_test.go +++ b/contrib/aws/internal/sqs/sqs_test.go @@ -154,7 +154,7 @@ func TestInjectTraceContext(t *testing.T) { } } - traceContext, err := getTraceContextBytes(span) + traceContext, err := getTraceContext(span) assert.NoError(t, err) injectTraceContext(traceContext, messageAttributes) From 368537874b263c85dcbe408961914a213d534f4b Mon Sep 17 00:00:00 2001 From: Nicholas Hulston Date: Wed, 16 Oct 2024 14:44:01 -0400 Subject: [PATCH 10/11] Improve efficiency by manually parsing strings to inject custom fields and trace context. Reuse shared trace context for multiple entries --- .../aws/internal/eventbridge/eventbridge.go | 86 ++++++++++--------- .../internal/eventbridge/eventbridge_test.go | 22 +++-- 2 files changed, 57 insertions(+), 51 deletions(-) diff --git a/contrib/aws/internal/eventbridge/eventbridge.go b/contrib/aws/internal/eventbridge/eventbridge.go index e82f4ecbb1..4ef556d04b 100644 --- a/contrib/aws/internal/eventbridge/eventbridge.go +++ b/contrib/aws/internal/eventbridge/eventbridge.go @@ -7,13 +7,13 @@ package eventbridge import ( "encoding/json" + "fmt" "github.com/aws/aws-sdk-go-v2/aws" "github.com/aws/aws-sdk-go-v2/service/eventbridge" "github.com/aws/aws-sdk-go-v2/service/eventbridge/types" "github.com/aws/smithy-go/middleware" "gopkg.in/DataDog/dd-trace-go.v1/ddtrace/tracer" "gopkg.in/DataDog/dd-trace-go.v1/internal/log" - "strconv" "time" ) @@ -38,16 +38,7 @@ func handlePutEvents(span tracer.Span, in middleware.InitializeInput) { return } - for i := range params.Entries { - injectTraceContext(span, ¶ms.Entries[i]) - } -} - -func injectTraceContext(span tracer.Span, entryPtr *types.PutEventsRequestEntry) { - if entryPtr == nil { - return - } - + // Create trace context carrier := tracer.TextMapCarrier{} err := tracer.Inject(span.Context(), carrier) if err != nil { @@ -55,46 +46,63 @@ func injectTraceContext(span tracer.Span, entryPtr *types.PutEventsRequestEntry) return } - // Add start time and resource name + carrierJSON, err := json.Marshal(carrier) + if err != nil { + log.Debug("Unable to marshal trace context: %s", err) + return + } + + // Prepare the reused trace context string startTimeMillis := time.Now().UnixMilli() - carrier[startTimeKey] = strconv.FormatInt(startTimeMillis, 10) + reusedTraceContext := fmt.Sprintf(`%s,"%s":"%d"`, carrierJSON[:len(carrierJSON)-1], startTimeKey, startTimeMillis) + + for i := range params.Entries { + injectTraceContext(reusedTraceContext, ¶ms.Entries[i]) + } +} + +func injectTraceContext(baseTraceContext string, entryPtr *types.PutEventsRequestEntry) { + if entryPtr == nil { + return + } + + // Build the complete trace context + var traceContext string if entryPtr.EventBusName != nil { - carrier[resourceNameKey] = *entryPtr.EventBusName + traceContext = fmt.Sprintf(`%s,"%s":"%s"}`, baseTraceContext, resourceNameKey, *entryPtr.EventBusName) + } else { + traceContext = baseTraceContext + "}" } - var detail map[string]interface{} - if entryPtr.Detail != nil { - err = json.Unmarshal([]byte(*entryPtr.Detail), &detail) - if err != nil { - log.Debug("Unable to unmarshal event detail: %s", err) - return - } + // Get current detail string + var detail string + if entryPtr.Detail == nil || *entryPtr.Detail == "" { + detail = "{}" } else { - detail = make(map[string]interface{}) + detail = *entryPtr.Detail } - jsonBytes, err := json.Marshal(carrier) - if err != nil { - log.Debug("Unable to marshal trace context: %s", err) + // Basic JSON structure validation + if len(detail) < 2 || detail[len(detail)-1] != '}' { + log.Debug("Unable to parse detail JSON. Not injecting trace context into EventBridge payload.") return } - // Check sizes - detailSize := 0 - if entryPtr.Detail != nil { - detailSize = len(*entryPtr.Detail) - } - traceSize := len(jsonBytes) - if detailSize+traceSize > maxSizeBytes { - log.Info("Payload size too large to pass context") - return + // Create new detail string + var newDetail string + if len(detail) > 2 { + // Case where detail is not empty + newDetail = fmt.Sprintf(`%s,"%s":%s}`, detail[:len(detail)-1], datadogKey, traceContext) + } else { + // Cae where detail is empty + newDetail = fmt.Sprintf(`{"%s":%s}`, datadogKey, traceContext) } - detail[datadogKey] = json.RawMessage(jsonBytes) - updatedDetail, err := json.Marshal(detail) - if err != nil { - log.Debug("Unable to marshal modified event detail: %s", err) + // Check sizes + if len(newDetail) > maxSizeBytes { + log.Debug("Payload size too large to pass context") return } - entryPtr.Detail = aws.String(string(updatedDetail)) + + entryPtr.Detail = aws.String(newDetail) } diff --git a/contrib/aws/internal/eventbridge/eventbridge_test.go b/contrib/aws/internal/eventbridge/eventbridge_test.go index fda81714b3..77c9ab1e72 100644 --- a/contrib/aws/internal/eventbridge/eventbridge_test.go +++ b/contrib/aws/internal/eventbridge/eventbridge_test.go @@ -8,10 +8,7 @@ package eventbridge import ( "context" "encoding/json" - "strconv" - "strings" - "testing" - + "fmt" "github.com/aws/aws-sdk-go-v2/aws" "github.com/aws/aws-sdk-go-v2/service/eventbridge" "github.com/aws/aws-sdk-go-v2/service/eventbridge/types" @@ -20,6 +17,8 @@ import ( "github.com/stretchr/testify/require" "gopkg.in/DataDog/dd-trace-go.v1/ddtrace/mocktracer" "gopkg.in/DataDog/dd-trace-go.v1/ddtrace/tracer" + "strings" + "testing" ) func TestEnrichOperation(t *testing.T) { @@ -71,7 +70,8 @@ func TestInjectTraceContext(t *testing.T) { defer mt.Stop() ctx := context.Background() - span, ctx := tracer.StartSpanFromContext(ctx, "test-span") + span, _ := tracer.StartSpanFromContext(ctx, "test-span") + baseTraceContext := fmt.Sprintf(`{"x-datadog-trace-id":"%d","x-datadog-parent-id":"%d","x-datadog-start-time":"123456789"`, span.Context().TraceID(), span.Context().SpanID()) tests := []struct { name string @@ -110,7 +110,7 @@ func TestInjectTraceContext(t *testing.T) { for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { - injectTraceContext(span, &tt.entry) + injectTraceContext(baseTraceContext, &tt.entry) tt.expected(t, &tt.entry) var detail map[string]interface{} @@ -123,11 +123,9 @@ func TestInjectTraceContext(t *testing.T) { assert.Equal(t, *tt.entry.EventBusName, ddData[resourceNameKey]) // Check that start time exists and is not empty - startTimeStr, ok := ddData[startTimeKey].(string) + startTime, ok := ddData[startTimeKey] assert.True(t, ok) - startTime, err := strconv.ParseInt(startTimeStr, 10, 64) - assert.NoError(t, err) - assert.Greater(t, startTime, int64(0)) + assert.Equal(t, startTime, "123456789") carrier := tracer.TextMapCarrier{} for k, v := range ddData { @@ -148,7 +146,7 @@ func TestInjectTraceContextSizeLimit(t *testing.T) { mt := mocktracer.Start() defer mt.Stop() - span := tracer.StartSpan("test-span") + baseTraceContext := `{"x-datadog-trace-id":"12345","x-datadog-parent-id":"67890","x-datadog-start-time":"123456789"` tests := []struct { name string @@ -187,7 +185,7 @@ func TestInjectTraceContextSizeLimit(t *testing.T) { for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { - injectTraceContext(span, &tt.entry) + injectTraceContext(baseTraceContext, &tt.entry) tt.expected(t, &tt.entry) }) } From edb0584a24a8f3933e22d3871b0ddfabfed3d84a Mon Sep 17 00:00:00 2001 From: Nicholas Hulston Date: Wed, 16 Oct 2024 17:04:42 -0400 Subject: [PATCH 11/11] Set start time in carrier before unmarshaling --- contrib/aws/internal/eventbridge/eventbridge.go | 10 +++++++--- 1 file changed, 7 insertions(+), 3 deletions(-) diff --git a/contrib/aws/internal/eventbridge/eventbridge.go b/contrib/aws/internal/eventbridge/eventbridge.go index 4ef556d04b..5a2a56068e 100644 --- a/contrib/aws/internal/eventbridge/eventbridge.go +++ b/contrib/aws/internal/eventbridge/eventbridge.go @@ -14,6 +14,7 @@ import ( "github.com/aws/smithy-go/middleware" "gopkg.in/DataDog/dd-trace-go.v1/ddtrace/tracer" "gopkg.in/DataDog/dd-trace-go.v1/internal/log" + "strconv" "time" ) @@ -46,15 +47,18 @@ func handlePutEvents(span tracer.Span, in middleware.InitializeInput) { return } + // Add start time + startTimeMillis := time.Now().UnixMilli() + carrier[startTimeKey] = strconv.FormatInt(startTimeMillis, 10) + carrierJSON, err := json.Marshal(carrier) if err != nil { log.Debug("Unable to marshal trace context: %s", err) return } - // Prepare the reused trace context string - startTimeMillis := time.Now().UnixMilli() - reusedTraceContext := fmt.Sprintf(`%s,"%s":"%d"`, carrierJSON[:len(carrierJSON)-1], startTimeKey, startTimeMillis) + // Remove last '}' + reusedTraceContext := string(carrierJSON[:len(carrierJSON)-1]) for i := range params.Entries { injectTraceContext(reusedTraceContext, ¶ms.Entries[i])