Skip to content

Commit

Permalink
Fix producer fail to inject tracing info into message header
Browse files Browse the repository at this point in the history
  • Loading branch information
XSAM committed Jul 14, 2020
1 parent 76845d6 commit 7ed4ac5
Show file tree
Hide file tree
Showing 2 changed files with 30 additions and 14 deletions.
24 changes: 16 additions & 8 deletions instrumentation/github.com/Shopify/sarama/producer.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,12 +29,13 @@ import (

type syncProducer struct {
sarama.SyncProducer
cfg config
cfg config
saramaConfig *sarama.Config
}

// SendMessage calls sarama.SyncProducer.SendMessage and traces the request.
func (p *syncProducer) SendMessage(msg *sarama.ProducerMessage) (partition int32, offset int64, err error) {
span := startProducerSpan(p.cfg, msg)
span := startProducerSpan(p.cfg, p.saramaConfig.Version, msg)
partition, offset, err = p.SyncProducer.SendMessage(msg)
finishProducerSpan(span, partition, offset, err)
return partition, offset, err
Expand All @@ -46,7 +47,7 @@ func (p *syncProducer) SendMessages(msgs []*sarama.ProducerMessage) error {
// treated individually, so we create a span for each one
spans := make([]trace.Span, len(msgs))
for i, msg := range msgs {
spans[i] = startProducerSpan(p.cfg, msg)
spans[i] = startProducerSpan(p.cfg, p.saramaConfig.Version, msg)
}
err := p.SyncProducer.SendMessages(msgs)
for i, span := range spans {
Expand All @@ -57,11 +58,16 @@ func (p *syncProducer) SendMessages(msgs []*sarama.ProducerMessage) error {

// WrapSyncProducer wraps a sarama.SyncProducer so that all produced messages
// are traced.
func WrapSyncProducer(serviceName string, producer sarama.SyncProducer, opts ...Option) sarama.SyncProducer {
func WrapSyncProducer(serviceName string, saramaConfig *sarama.Config, producer sarama.SyncProducer, opts ...Option) sarama.SyncProducer {
cfg := newConfig(serviceName, opts...)
if saramaConfig == nil {
saramaConfig = sarama.NewConfig()
}

return &syncProducer{
SyncProducer: producer,
cfg: cfg,
saramaConfig: saramaConfig,
}
}

Expand Down Expand Up @@ -143,7 +149,7 @@ func WrapAsyncProducer(serviceName string, saramaConfig *sarama.Config, p sarama
}
case msg := <-wrapped.input:
msg.Metadata = uuid.New()
span := startProducerSpan(cfg, msg)
span := startProducerSpan(cfg, saramaConfig.Version, msg)
p.Input() <- msg
if saramaConfig.Producer.Return.Successes {
spans[msg.Metadata] = span
Expand Down Expand Up @@ -181,7 +187,7 @@ func WrapAsyncProducer(serviceName string, saramaConfig *sarama.Config, p sarama
return wrapped
}

func startProducerSpan(cfg config, msg *sarama.ProducerMessage) trace.Span {
func startProducerSpan(cfg config, version sarama.KafkaVersion, msg *sarama.ProducerMessage) trace.Span {
// If there's a span context in the message, use that as the parent context.
carrier := NewProducerMessageCarrier(msg)
ctx := propagation.ExtractHTTP(context.Background(), cfg.Propagators, carrier)
Expand All @@ -199,8 +205,10 @@ func startProducerSpan(cfg config, msg *sarama.ProducerMessage) trace.Span {
}
ctx, span := cfg.Tracer.Start(ctx, "kafka.produce", opts...)

// Inject current span context, so consumers can use it to propagate span.
propagation.InjectHTTP(ctx, cfg.Propagators, carrier)
if version.IsAtLeast(sarama.V0_11_0_0) {
// Inject current span context, so consumers can use it to propagate span.
propagation.InjectHTTP(ctx, cfg.Propagators, carrier)
}

return span
}
Expand Down
20 changes: 14 additions & 6 deletions instrumentation/github.com/Shopify/sarama/producer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,11 +38,12 @@ func TestWrapSyncProducer(t *testing.T) {
// Mock tracer
mt := mocktracer.NewTracer("kafka")

cfg := newSaramaConfig()
// Mock sync producer
mockSyncProducer := mocks.NewSyncProducer(t, sarama.NewConfig())
mockSyncProducer := mocks.NewSyncProducer(t, cfg)

// Wrap sync producer
syncProducer := WrapSyncProducer(serviceName, mockSyncProducer, WithTracer(mt))
syncProducer := WrapSyncProducer(serviceName, cfg, mockSyncProducer, WithTracer(mt))

// Create message with span context
ctx, _ := mt.Start(context.Background(), "")
Expand Down Expand Up @@ -157,8 +158,9 @@ func TestWrapAsyncProducer(t *testing.T) {

t.Run("without successes config", func(t *testing.T) {
mt := mocktracer.NewTracer("kafka")
mockAsyncProducer := mocks.NewAsyncProducer(t, nil)
ap := WrapAsyncProducer(serviceName, nil, mockAsyncProducer, WithTracer(mt))
cfg := newSaramaConfig()
mockAsyncProducer := mocks.NewAsyncProducer(t, cfg)
ap := WrapAsyncProducer(serviceName, cfg, mockAsyncProducer, WithTracer(mt))

msgList := createMessages(mt)
// Send message
Expand Down Expand Up @@ -225,7 +227,7 @@ func TestWrapAsyncProducer(t *testing.T) {
mt := mocktracer.NewTracer("kafka")

// Set producer with successes config
cfg := sarama.NewConfig()
cfg := newSaramaConfig()
cfg.Producer.Return.Successes = true

mockAsyncProducer := mocks.NewAsyncProducer(t, cfg)
Expand Down Expand Up @@ -298,7 +300,7 @@ func TestWrapAsyncProducer_Error(t *testing.T) {
mt := mocktracer.NewTracer("kafka")

// Set producer with successes config
cfg := sarama.NewConfig()
cfg := newSaramaConfig()
cfg.Producer.Return.Successes = true

mockAsyncProducer := mocks.NewAsyncProducer(t, cfg)
Expand All @@ -320,3 +322,9 @@ func TestWrapAsyncProducer_Error(t *testing.T) {
assert.Equal(t, codes.Internal, span.Status)
assert.Equal(t, "test", span.StatusMessage)
}

func newSaramaConfig() *sarama.Config {
cfg := sarama.NewConfig()
cfg.Version = sarama.V0_11_0_0
return cfg
}

0 comments on commit 7ed4ac5

Please sign in to comment.