diff --git a/instrumentation/github.com/Shopify/sarama/go.mod b/instrumentation/github.com/Shopify/sarama/go.mod index 768ec4de833..b0f56e9320b 100644 --- a/instrumentation/github.com/Shopify/sarama/go.mod +++ b/instrumentation/github.com/Shopify/sarama/go.mod @@ -6,7 +6,6 @@ replace go.opentelemetry.io/contrib => ../../../.. require ( github.com/Shopify/sarama v1.26.4 - github.com/google/uuid v1.1.1 github.com/stretchr/testify v1.6.1 go.opentelemetry.io/contrib v0.7.0 go.opentelemetry.io/otel v0.8.0 diff --git a/instrumentation/github.com/Shopify/sarama/go.sum b/instrumentation/github.com/Shopify/sarama/go.sum index fc4f972bafb..fd24d62a10f 100644 --- a/instrumentation/github.com/Shopify/sarama/go.sum +++ b/instrumentation/github.com/Shopify/sarama/go.sum @@ -50,8 +50,6 @@ github.com/google/go-cmp v0.5.0 h1:/QaMHBdZ26BB3SSst0Iwl10Epc+xhTquomWX0oZEB6w= github.com/google/go-cmp v0.5.0/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE= github.com/google/gofuzz v1.0.0 h1:A8PeW59pxE9IoFRqBp37U+mSNaQoZ46F1f0f863XSXw= github.com/google/gofuzz v1.0.0/go.mod h1:dBl0BpW6vV/+mYPU4Po3pmUjxk6FQPldtuIdl/M65Eg= -github.com/google/uuid v1.1.1 h1:Gkbcsh/GbpXz7lPftLA3P6TYMwjCLYm83jiFQZF/3gY= -github.com/google/uuid v1.1.1/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo= github.com/hashicorp/go-uuid v1.0.2 h1:cfejS+Tpcp13yd5nYHWDI6qVCny6wyX2Mt5SGur2IGE= github.com/hashicorp/go-uuid v1.0.2/go.mod h1:6SBZvOh/SIDV7/2o3Jml5SYk/TvGqwFJ/bN7x4byOro= github.com/jcmturner/gofork v1.0.0 h1:J7uCkflzTEhUZ64xqKnkDxq3kzc96ajM1Gli5ktUem8= diff --git a/instrumentation/github.com/Shopify/sarama/producer.go b/instrumentation/github.com/Shopify/sarama/producer.go index c604676101c..14a2d50d082 100644 --- a/instrumentation/github.com/Shopify/sarama/producer.go +++ b/instrumentation/github.com/Shopify/sarama/producer.go @@ -18,7 +18,6 @@ import ( "context" "github.com/Shopify/sarama" - "github.com/google/uuid" "google.golang.org/grpc/codes" "go.opentelemetry.io/otel/api/kv" @@ -83,7 +82,6 @@ type asyncProducer struct { input chan *sarama.ProducerMessage successes chan *sarama.ProducerMessage errors chan *sarama.ProducerError - close chan closeType closeErr chan error } @@ -104,16 +102,25 @@ func (p *asyncProducer) Errors() <-chan *sarama.ProducerError { // AsyncClose async close producer. func (p *asyncProducer) AsyncClose() { - p.close <- closeAsync + p.input <- &sarama.ProducerMessage{ + Metadata: closeAsync, + } } // Close shuts down the producer and waits for any buffered messages to be // flushed. func (p *asyncProducer) Close() error { - p.close <- closeSync + p.input <- &sarama.ProducerMessage{ + Metadata: closeSync, + } return <-p.closeErr } +type producerMessageContext struct { + span trace.Span + metadataBackup interface{} +} + // WrapAsyncProducer wraps a sarama.AsyncProducer so that all produced messages // are traced. It requires the underlying sarama Config so we can know whether // or not successes will be returned. @@ -131,30 +138,45 @@ func WrapAsyncProducer(serviceName string, saramaConfig *sarama.Config, p sarama input: make(chan *sarama.ProducerMessage), successes: make(chan *sarama.ProducerMessage), errors: make(chan *sarama.ProducerError), - close: make(chan closeType), closeErr: make(chan error), } go func() { - spans := make(map[interface{}]trace.Span) + producerMessageContexts := make(map[interface{}]producerMessageContext) defer close(wrapped.successes) defer close(wrapped.errors) for { select { - case t := <-wrapped.close: - switch t { - case closeSync: - go func() { - wrapped.closeErr <- p.Close() - }() - case closeAsync: - p.AsyncClose() - } case msg := <-wrapped.input: - msg.Metadata = uuid.New() + // Shut down if message metadata is a close type. + // Sarama will close after dispatching every message. + // So wrapper should follow this mechanism by adding a special message at + // the end of the input channel. + if ct, ok := msg.Metadata.(closeType); ok { + switch ct { + case closeSync: + go func() { + wrapped.closeErr <- p.Close() + }() + case closeAsync: + p.AsyncClose() + } + continue + } + span := startProducerSpan(cfg, saramaConfig.Version, msg) + + // Create message context, backend message metadata + mc := producerMessageContext{ + metadataBackup: msg.Metadata, + span: span, + } + + // Specific metadata with span id + msg.Metadata = span.SpanContext().SpanID + p.Input() <- msg if saramaConfig.Producer.Return.Successes { - spans[msg.Metadata] = span + producerMessageContexts[msg.Metadata] = mc } else { // If returning successes isn't enabled, we just finish the // span right away because there's no way to know when it will @@ -167,9 +189,12 @@ func WrapAsyncProducer(serviceName string, saramaConfig *sarama.Config, p sarama return } key := msg.Metadata - if span, ok := spans[key]; ok { - delete(spans, key) - finishProducerSpan(span, msg.Partition, msg.Offset, nil) + if mc, ok := producerMessageContexts[key]; ok { + delete(producerMessageContexts, key) + finishProducerSpan(mc.span, msg.Partition, msg.Offset, nil) + + // Restore message metadata + msg.Metadata = mc.metadataBackup } wrapped.successes <- msg case err, ok := <-p.Errors(): @@ -178,9 +203,9 @@ func WrapAsyncProducer(serviceName string, saramaConfig *sarama.Config, p sarama return } key := err.Msg.Metadata - if span, ok := spans[key]; ok { - delete(spans, key) - finishProducerSpan(span, err.Msg.Partition, err.Msg.Offset, err.Err) + if mc, ok := producerMessageContexts[key]; ok { + delete(producerMessageContexts, key) + finishProducerSpan(mc.span, err.Msg.Partition, err.Msg.Offset, err.Err) } wrapped.errors <- err } diff --git a/instrumentation/github.com/Shopify/sarama/producer_test.go b/instrumentation/github.com/Shopify/sarama/producer_test.go index 25707f13c29..af5877039a4 100644 --- a/instrumentation/github.com/Shopify/sarama/producer_test.go +++ b/instrumentation/github.com/Shopify/sarama/producer_test.go @@ -235,10 +235,13 @@ func TestWrapAsyncProducer(t *testing.T) { msgList := createMessages(mt) // Send message - for _, msg := range msgList { + for i, msg := range msgList { mockAsyncProducer.ExpectInputAndSucceed() + // Add metadata to msg + msg.Metadata = i ap.Input() <- msg - <-ap.Successes() + newMsg := <-ap.Successes() + assert.Equal(t, newMsg, msg) } err := ap.Close() @@ -289,6 +292,9 @@ func TestWrapAsyncProducer(t *testing.T) { assert.Equal(t, k.Value, span.Attributes[k.Key], k.Key) } + // Check metadata + assert.Equal(t, i, msg.Metadata) + // Check tracing propagation remoteSpanFromMessage := trace.RemoteSpanContextFromContext(propagation.ExtractHTTP(context.Background(), propagators, NewProducerMessageCarrier(msg))) assert.True(t, remoteSpanFromMessage.IsValid())