Skip to content

Commit

Permalink
Fix async producer may not dispatch the last message before close
Browse files Browse the repository at this point in the history
  • Loading branch information
XSAM committed Jul 17, 2020
1 parent 75d24d4 commit 253bc31
Show file tree
Hide file tree
Showing 4 changed files with 56 additions and 28 deletions.
1 change: 0 additions & 1 deletion instrumentation/github.com/Shopify/sarama/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@ replace go.opentelemetry.io/contrib => ../../../..

require (
github.com/Shopify/sarama v1.26.4
github.com/google/uuid v1.1.1
github.com/stretchr/testify v1.6.1
go.opentelemetry.io/contrib v0.7.0
go.opentelemetry.io/otel v0.8.0
Expand Down
2 changes: 0 additions & 2 deletions instrumentation/github.com/Shopify/sarama/go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -50,8 +50,6 @@ github.com/google/go-cmp v0.5.0 h1:/QaMHBdZ26BB3SSst0Iwl10Epc+xhTquomWX0oZEB6w=
github.com/google/go-cmp v0.5.0/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE=
github.com/google/gofuzz v1.0.0 h1:A8PeW59pxE9IoFRqBp37U+mSNaQoZ46F1f0f863XSXw=
github.com/google/gofuzz v1.0.0/go.mod h1:dBl0BpW6vV/+mYPU4Po3pmUjxk6FQPldtuIdl/M65Eg=
github.com/google/uuid v1.1.1 h1:Gkbcsh/GbpXz7lPftLA3P6TYMwjCLYm83jiFQZF/3gY=
github.com/google/uuid v1.1.1/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo=
github.com/hashicorp/go-uuid v1.0.2 h1:cfejS+Tpcp13yd5nYHWDI6qVCny6wyX2Mt5SGur2IGE=
github.com/hashicorp/go-uuid v1.0.2/go.mod h1:6SBZvOh/SIDV7/2o3Jml5SYk/TvGqwFJ/bN7x4byOro=
github.com/jcmturner/gofork v1.0.0 h1:J7uCkflzTEhUZ64xqKnkDxq3kzc96ajM1Gli5ktUem8=
Expand Down
71 changes: 48 additions & 23 deletions instrumentation/github.com/Shopify/sarama/producer.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@ import (
"context"

"github.com/Shopify/sarama"
"github.com/google/uuid"
"google.golang.org/grpc/codes"

"go.opentelemetry.io/otel/api/kv"
Expand Down Expand Up @@ -83,7 +82,6 @@ type asyncProducer struct {
input chan *sarama.ProducerMessage
successes chan *sarama.ProducerMessage
errors chan *sarama.ProducerError
close chan closeType
closeErr chan error
}

Expand All @@ -104,16 +102,25 @@ func (p *asyncProducer) Errors() <-chan *sarama.ProducerError {

// AsyncClose async close producer.
func (p *asyncProducer) AsyncClose() {
p.close <- closeAsync
p.input <- &sarama.ProducerMessage{
Metadata: closeAsync,
}
}

// Close shuts down the producer and waits for any buffered messages to be
// flushed.
func (p *asyncProducer) Close() error {
p.close <- closeSync
p.input <- &sarama.ProducerMessage{
Metadata: closeSync,
}
return <-p.closeErr
}

type producerMessageContext struct {
span trace.Span
metadataBackup interface{}
}

// WrapAsyncProducer wraps a sarama.AsyncProducer so that all produced messages
// are traced. It requires the underlying sarama Config so we can know whether
// or not successes will be returned.
Expand All @@ -131,30 +138,45 @@ func WrapAsyncProducer(serviceName string, saramaConfig *sarama.Config, p sarama
input: make(chan *sarama.ProducerMessage),
successes: make(chan *sarama.ProducerMessage),
errors: make(chan *sarama.ProducerError),
close: make(chan closeType),
closeErr: make(chan error),
}
go func() {
spans := make(map[interface{}]trace.Span)
producerMessageContexts := make(map[interface{}]producerMessageContext)
defer close(wrapped.successes)
defer close(wrapped.errors)
for {
select {
case t := <-wrapped.close:
switch t {
case closeSync:
go func() {
wrapped.closeErr <- p.Close()
}()
case closeAsync:
p.AsyncClose()
}
case msg := <-wrapped.input:
msg.Metadata = uuid.New()
// Shut down if message metadata is a close type.
// Sarama will close after dispatching every message.
// So wrapper should follow this mechanism by adding a special message at
// the end of the input channel.
if ct, ok := msg.Metadata.(closeType); ok {
switch ct {
case closeSync:
go func() {
wrapped.closeErr <- p.Close()
}()
case closeAsync:
p.AsyncClose()
}
continue
}

span := startProducerSpan(cfg, saramaConfig.Version, msg)

// Create message context, backend message metadata
mc := producerMessageContext{
metadataBackup: msg.Metadata,
span: span,
}

// Specific metadata with span id
msg.Metadata = span.SpanContext().SpanID

p.Input() <- msg
if saramaConfig.Producer.Return.Successes {
spans[msg.Metadata] = span
producerMessageContexts[msg.Metadata] = mc
} else {
// If returning successes isn't enabled, we just finish the
// span right away because there's no way to know when it will
Expand All @@ -167,9 +189,12 @@ func WrapAsyncProducer(serviceName string, saramaConfig *sarama.Config, p sarama
return
}
key := msg.Metadata
if span, ok := spans[key]; ok {
delete(spans, key)
finishProducerSpan(span, msg.Partition, msg.Offset, nil)
if mc, ok := producerMessageContexts[key]; ok {
delete(producerMessageContexts, key)
finishProducerSpan(mc.span, msg.Partition, msg.Offset, nil)

// Restore message metadata
msg.Metadata = mc.metadataBackup
}
wrapped.successes <- msg
case err, ok := <-p.Errors():
Expand All @@ -178,9 +203,9 @@ func WrapAsyncProducer(serviceName string, saramaConfig *sarama.Config, p sarama
return
}
key := err.Msg.Metadata
if span, ok := spans[key]; ok {
delete(spans, key)
finishProducerSpan(span, err.Msg.Partition, err.Msg.Offset, err.Err)
if mc, ok := producerMessageContexts[key]; ok {
delete(producerMessageContexts, key)
finishProducerSpan(mc.span, err.Msg.Partition, err.Msg.Offset, err.Err)
}
wrapped.errors <- err
}
Expand Down
10 changes: 8 additions & 2 deletions instrumentation/github.com/Shopify/sarama/producer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -235,10 +235,13 @@ func TestWrapAsyncProducer(t *testing.T) {

msgList := createMessages(mt)
// Send message
for _, msg := range msgList {
for i, msg := range msgList {
mockAsyncProducer.ExpectInputAndSucceed()
// Add metadata to msg
msg.Metadata = i
ap.Input() <- msg
<-ap.Successes()
newMsg := <-ap.Successes()
assert.Equal(t, newMsg, msg)
}

err := ap.Close()
Expand Down Expand Up @@ -289,6 +292,9 @@ func TestWrapAsyncProducer(t *testing.T) {
assert.Equal(t, k.Value, span.Attributes[k.Key], k.Key)
}

// Check metadata
assert.Equal(t, i, msg.Metadata)

// Check tracing propagation
remoteSpanFromMessage := trace.RemoteSpanContextFromContext(propagation.ExtractHTTP(context.Background(), propagators, NewProducerMessageCarrier(msg)))
assert.True(t, remoteSpanFromMessage.IsValid())
Expand Down

0 comments on commit 253bc31

Please sign in to comment.