From 7ed4ac51a7fbac6312d7435cc6454ad96d8c27fd Mon Sep 17 00:00:00 2001 From: Sam Xie Date: Tue, 14 Jul 2020 19:22:11 +0800 Subject: [PATCH] Fix producer fail to inject tracing info into message header --- .../github.com/Shopify/sarama/producer.go | 24 ++++++++++++------- .../Shopify/sarama/producer_test.go | 20 +++++++++++----- 2 files changed, 30 insertions(+), 14 deletions(-) diff --git a/instrumentation/github.com/Shopify/sarama/producer.go b/instrumentation/github.com/Shopify/sarama/producer.go index c8dd30379af..84f968c3747 100644 --- a/instrumentation/github.com/Shopify/sarama/producer.go +++ b/instrumentation/github.com/Shopify/sarama/producer.go @@ -29,12 +29,13 @@ import ( type syncProducer struct { sarama.SyncProducer - cfg config + cfg config + saramaConfig *sarama.Config } // SendMessage calls sarama.SyncProducer.SendMessage and traces the request. func (p *syncProducer) SendMessage(msg *sarama.ProducerMessage) (partition int32, offset int64, err error) { - span := startProducerSpan(p.cfg, msg) + span := startProducerSpan(p.cfg, p.saramaConfig.Version, msg) partition, offset, err = p.SyncProducer.SendMessage(msg) finishProducerSpan(span, partition, offset, err) return partition, offset, err @@ -46,7 +47,7 @@ func (p *syncProducer) SendMessages(msgs []*sarama.ProducerMessage) error { // treated individually, so we create a span for each one spans := make([]trace.Span, len(msgs)) for i, msg := range msgs { - spans[i] = startProducerSpan(p.cfg, msg) + spans[i] = startProducerSpan(p.cfg, p.saramaConfig.Version, msg) } err := p.SyncProducer.SendMessages(msgs) for i, span := range spans { @@ -57,11 +58,16 @@ func (p *syncProducer) SendMessages(msgs []*sarama.ProducerMessage) error { // WrapSyncProducer wraps a sarama.SyncProducer so that all produced messages // are traced. -func WrapSyncProducer(serviceName string, producer sarama.SyncProducer, opts ...Option) sarama.SyncProducer { +func WrapSyncProducer(serviceName string, saramaConfig *sarama.Config, producer sarama.SyncProducer, opts ...Option) sarama.SyncProducer { cfg := newConfig(serviceName, opts...) + if saramaConfig == nil { + saramaConfig = sarama.NewConfig() + } + return &syncProducer{ SyncProducer: producer, cfg: cfg, + saramaConfig: saramaConfig, } } @@ -143,7 +149,7 @@ func WrapAsyncProducer(serviceName string, saramaConfig *sarama.Config, p sarama } case msg := <-wrapped.input: msg.Metadata = uuid.New() - span := startProducerSpan(cfg, msg) + span := startProducerSpan(cfg, saramaConfig.Version, msg) p.Input() <- msg if saramaConfig.Producer.Return.Successes { spans[msg.Metadata] = span @@ -181,7 +187,7 @@ func WrapAsyncProducer(serviceName string, saramaConfig *sarama.Config, p sarama return wrapped } -func startProducerSpan(cfg config, msg *sarama.ProducerMessage) trace.Span { +func startProducerSpan(cfg config, version sarama.KafkaVersion, msg *sarama.ProducerMessage) trace.Span { // If there's a span context in the message, use that as the parent context. carrier := NewProducerMessageCarrier(msg) ctx := propagation.ExtractHTTP(context.Background(), cfg.Propagators, carrier) @@ -199,8 +205,10 @@ func startProducerSpan(cfg config, msg *sarama.ProducerMessage) trace.Span { } ctx, span := cfg.Tracer.Start(ctx, "kafka.produce", opts...) - // Inject current span context, so consumers can use it to propagate span. - propagation.InjectHTTP(ctx, cfg.Propagators, carrier) + if version.IsAtLeast(sarama.V0_11_0_0) { + // Inject current span context, so consumers can use it to propagate span. + propagation.InjectHTTP(ctx, cfg.Propagators, carrier) + } return span } diff --git a/instrumentation/github.com/Shopify/sarama/producer_test.go b/instrumentation/github.com/Shopify/sarama/producer_test.go index e4a49cb033a..c2bc2400563 100644 --- a/instrumentation/github.com/Shopify/sarama/producer_test.go +++ b/instrumentation/github.com/Shopify/sarama/producer_test.go @@ -38,11 +38,12 @@ func TestWrapSyncProducer(t *testing.T) { // Mock tracer mt := mocktracer.NewTracer("kafka") + cfg := newSaramaConfig() // Mock sync producer - mockSyncProducer := mocks.NewSyncProducer(t, sarama.NewConfig()) + mockSyncProducer := mocks.NewSyncProducer(t, cfg) // Wrap sync producer - syncProducer := WrapSyncProducer(serviceName, mockSyncProducer, WithTracer(mt)) + syncProducer := WrapSyncProducer(serviceName, cfg, mockSyncProducer, WithTracer(mt)) // Create message with span context ctx, _ := mt.Start(context.Background(), "") @@ -157,8 +158,9 @@ func TestWrapAsyncProducer(t *testing.T) { t.Run("without successes config", func(t *testing.T) { mt := mocktracer.NewTracer("kafka") - mockAsyncProducer := mocks.NewAsyncProducer(t, nil) - ap := WrapAsyncProducer(serviceName, nil, mockAsyncProducer, WithTracer(mt)) + cfg := newSaramaConfig() + mockAsyncProducer := mocks.NewAsyncProducer(t, cfg) + ap := WrapAsyncProducer(serviceName, cfg, mockAsyncProducer, WithTracer(mt)) msgList := createMessages(mt) // Send message @@ -225,7 +227,7 @@ func TestWrapAsyncProducer(t *testing.T) { mt := mocktracer.NewTracer("kafka") // Set producer with successes config - cfg := sarama.NewConfig() + cfg := newSaramaConfig() cfg.Producer.Return.Successes = true mockAsyncProducer := mocks.NewAsyncProducer(t, cfg) @@ -298,7 +300,7 @@ func TestWrapAsyncProducer_Error(t *testing.T) { mt := mocktracer.NewTracer("kafka") // Set producer with successes config - cfg := sarama.NewConfig() + cfg := newSaramaConfig() cfg.Producer.Return.Successes = true mockAsyncProducer := mocks.NewAsyncProducer(t, cfg) @@ -320,3 +322,9 @@ func TestWrapAsyncProducer_Error(t *testing.T) { assert.Equal(t, codes.Internal, span.Status) assert.Equal(t, "test", span.StatusMessage) } + +func newSaramaConfig() *sarama.Config { + cfg := sarama.NewConfig() + cfg.Version = sarama.V0_11_0_0 + return cfg +}