diff --git a/instrumentation/github.com/Shopify/sarama/consumer.go b/instrumentation/github.com/Shopify/sarama/consumer.go index b945a037c41..88ecff36e65 100644 --- a/instrumentation/github.com/Shopify/sarama/consumer.go +++ b/instrumentation/github.com/Shopify/sarama/consumer.go @@ -51,7 +51,7 @@ func WrapPartitionConsumer(serviceName string, pc sarama.PartitionConsumer, opts for msg := range msgs { // Extract a span context from message to link. carrier := NewConsumerMessageCarrier(msg) - parentSpanContext := trace.RemoteSpanContextFromContext(propagation.ExtractHTTP(context.Background(), cfg.Propagators, carrier)) + parentSpanContext := propagation.ExtractHTTP(context.Background(), cfg.Propagators, carrier) // Create a span. attrs := []kv.KeyValue{ @@ -67,10 +67,7 @@ func WrapPartitionConsumer(serviceName string, pc sarama.PartitionConsumer, opts trace.WithAttributes(attrs...), trace.WithSpanKind(trace.SpanKindConsumer), } - if parentSpanContext.IsValid() { - opts = append(opts, trace.LinkedTo(parentSpanContext)) - } - newCtx, span := cfg.Tracer.Start(context.Background(), "kafka.consume", opts...) + newCtx, span := cfg.Tracer.Start(parentSpanContext, "kafka.consume", opts...) // Inject current span context, so consumers can use it to propagate span. propagation.InjectHTTP(newCtx, cfg.Propagators, carrier) diff --git a/instrumentation/github.com/Shopify/sarama/consumer_test.go b/instrumentation/github.com/Shopify/sarama/consumer_test.go index 48d304c7167..8b1658ab3a5 100644 --- a/instrumentation/github.com/Shopify/sarama/consumer_test.go +++ b/instrumentation/github.com/Shopify/sarama/consumer_test.go @@ -99,10 +99,10 @@ func consumeAndCheck(t *testing.T, mt *mocktracer.Tracer, mockPartitionConsumer assert.Len(t, spans, 2) expectedList := []struct { - kvList []kv.KeyValue - links map[trace.SpanContext][]kv.KeyValue - kind trace.SpanKind - msgKey []byte + kvList []kv.KeyValue + parentSpanID trace.SpanID + kind trace.SpanKind + msgKey []byte }{ { kvList: []kv.KeyValue{ @@ -114,11 +114,9 @@ func consumeAndCheck(t *testing.T, mt *mocktracer.Tracer, mockPartitionConsumer standard.MessagingMessageIDKey.Int64(1), kafkaPartitionKey.Int32(0), }, - links: map[trace.SpanContext][]kv.KeyValue{ - trace.SpanFromContext(ctx).SpanContext(): nil, - }, - kind: trace.SpanKindConsumer, - msgKey: []byte("foo"), + parentSpanID: trace.SpanFromContext(ctx).SpanContext().SpanID, + kind: trace.SpanKindConsumer, + msgKey: []byte("foo"), }, { kvList: []kv.KeyValue{ @@ -130,7 +128,6 @@ func consumeAndCheck(t *testing.T, mt *mocktracer.Tracer, mockPartitionConsumer standard.MessagingMessageIDKey.Int64(2), kafkaPartitionKey.Int32(0), }, - links: make(map[trace.SpanContext][]kv.KeyValue), kind: trace.SpanKindConsumer, msgKey: []byte("foo2"), }, @@ -140,7 +137,7 @@ func consumeAndCheck(t *testing.T, mt *mocktracer.Tracer, mockPartitionConsumer t.Run(fmt.Sprint("index", i), func(t *testing.T) { span := spans[i] - assert.Equal(t, expected.links, span.Links) + assert.Equal(t, expected.parentSpanID, span.ParentSpanID) remoteSpanFromMessage := trace.RemoteSpanContextFromContext(propagation.ExtractHTTP(context.Background(), propagators, NewConsumerMessageCarrier(msgList[i]))) assert.Equal(t, span.SpanContext(), remoteSpanFromMessage,