Skip to content

Commit

Permalink
Producer respects Context passed to Send() and SendAsync() when apply…
Browse files Browse the repository at this point in the history
…ing backpressure

Previously, the Producer ignored the context passed to Send() and
SendAsync().

Now, the Producer respects the context in the case where the
ProducerOptions.MaxPendingMessages limit is reached.  In this case, the
producer will block until a permit for sending the message is available or
until the context expires, whichever comes first.

Failures to send messages due to context expiration are communicated to
callers via the existing TimeoutError error code.

Signed-off-by: Daniel Ferstay <[email protected]>
  • Loading branch information
Daniel Ferstay committed Jun 24, 2021
1 parent 641cb9d commit 4b2bfbb
Show file tree
Hide file tree
Showing 4 changed files with 101 additions and 7 deletions.
16 changes: 12 additions & 4 deletions pulsar/internal/semaphore.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
package internal

import (
"context"
"sync/atomic"

log "github.com/sirupsen/logrus"
Expand All @@ -26,7 +27,7 @@ import (
type Semaphore interface {
// Acquire a permit, if one is available and returns immediately,
// reducing the number of available permits by one.
Acquire()
Acquire(ctx context.Context) bool

// Try to acquire a permit. The method will return immediately
// with a `true` if it was possible to acquire a permit and
Expand Down Expand Up @@ -63,14 +64,21 @@ func NewSemaphore(maxPermits int32) Semaphore {
}
}

func (s *semaphore) Acquire() {
func (s *semaphore) Acquire(ctx context.Context) bool {
permits := atomic.AddInt32(&s.permits, 1)
if permits <= s.maxPermits {
return
return true
}

// Block on the channel until a new permit is available
<-s.ch
// or the context expires
select {
case <-s.ch:
return true
case <-ctx.Done():
atomic.AddInt32(&s.permits, -1)
return false
}
}

func (s *semaphore) TryAcquire() bool {
Expand Down
20 changes: 18 additions & 2 deletions pulsar/internal/semaphore_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
package internal

import (
"context"
"sync"
"testing"
"time"
Expand All @@ -35,7 +36,7 @@ func TestSemaphore(t *testing.T) {

for i := 0; i < n; i++ {
go func() {
s.Acquire()
assert.True(t, s.Acquire(context.Background()))
time.Sleep(100 * time.Millisecond)
s.Release()
wg.Done()
Expand All @@ -48,7 +49,7 @@ func TestSemaphore(t *testing.T) {
func TestSemaphore_TryAcquire(t *testing.T) {
s := NewSemaphore(1)

s.Acquire()
assert.True(t, s.Acquire(context.Background()))

assert.False(t, s.TryAcquire())

Expand All @@ -58,3 +59,18 @@ func TestSemaphore_TryAcquire(t *testing.T) {
assert.False(t, s.TryAcquire())
s.Release()
}

func TestSemaphore_ContextExpire(t *testing.T) {
s := NewSemaphore(1)

assert.True(t, s.Acquire(context.Background()))

ctx, cancel := context.WithCancel(context.Background())
cancel()
assert.False(t, s.Acquire(ctx))

assert.False(t, s.TryAcquire())
s.Release()

assert.True(t, s.TryAcquire())
}
6 changes: 5 additions & 1 deletion pulsar/producer_partition.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@ var (
errFailAddToBatch = newError(AddToBatchFailed, "message add to batch failed")
errSendTimeout = newError(TimeoutError, "message send timeout")
errSendQueueIsFull = newError(ProducerQueueIsFull, "producer send queue is full")
errContextExpired = newError(TimeoutError, "message send context expired")
errMessageTooLarge = newError(MessageTooBig, "message size exceeds MaxMessageSize")

buffersPool sync.Pool
Expand Down Expand Up @@ -658,7 +659,10 @@ func (p *partitionProducer) internalSendAsync(ctx context.Context, msg *Producer
return
}
} else {
p.publishSemaphore.Acquire()
if !p.publishSemaphore.Acquire(ctx) {
callback(nil, msg, errContextExpired)
return
}
}

p.metrics.MessagesPending.Inc()
Expand Down
66 changes: 66 additions & 0 deletions pulsar/producer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -930,6 +930,72 @@ func TestSendTimeout(t *testing.T) {
makeHTTPCall(t, http.MethodDelete, quotaURL, "")
}

func TestSendContextExpired(t *testing.T) {
quotaURL := adminURL + "/admin/v2/namespaces/public/default/backlogQuota"
quotaFmt := `{"limit": "%d", "policy": "producer_request_hold"}`
makeHTTPCall(t, http.MethodPost, quotaURL, fmt.Sprintf(quotaFmt, 1024))

client, err := NewClient(ClientOptions{
URL: serviceURL,
})
assert.NoError(t, err)
defer client.Close()

topicName := newTopicName()
consumer, err := client.Subscribe(ConsumerOptions{
Topic: topicName,
SubscriptionName: "send_context_expired_sub",
})
assert.Nil(t, err)
defer consumer.Close() // subscribe but do nothing

noRetry := uint(0)
producer, err := client.CreateProducer(ProducerOptions{
Topic: topicName,
MaxPendingMessages: 1,
SendTimeout: 2 * time.Second,
MaxReconnectToBroker: &noRetry,
})
assert.Nil(t, err)
defer producer.Close()

// first send completes and fills the available backlog
id, err := producer.Send(context.Background(), &ProducerMessage{
Payload: make([]byte, 1024),
})
assert.Nil(t, err)
assert.NotNil(t, id)

// waiting for the backlog check
time.Sleep((5 + 1) * time.Second)

// next publish will not complete due to the backlog quota being full;
// this consumes the only available MaxPendingMessages permit
wg := sync.WaitGroup{}
wg.Add(1)
producer.SendAsync(context.Background(), &ProducerMessage{
Payload: make([]byte, 1024),
}, func(_ MessageID, _ *ProducerMessage, _ error) {
// we're not interested in the result of this send, but we don't
// want to exit this test case until it completes
wg.Done()
})

// final publish will block waiting for a send permit to become available
// then fail when the ctx times out
ctx, cancel := context.WithTimeout(context.Background(), 100*time.Millisecond)
defer cancel()
id, err = producer.Send(ctx, &ProducerMessage{
Payload: make([]byte, 1024),
})
assert.NotNil(t, err)
assert.Nil(t, id)

wg.Wait()

makeHTTPCall(t, http.MethodDelete, quotaURL, "")
}

type noopProduceInterceptor struct{}

func (noopProduceInterceptor) BeforeSend(producer Producer, message *ProducerMessage) {}
Expand Down

0 comments on commit 4b2bfbb

Please sign in to comment.