Skip to content

Commit

Permalink
pubsub: implement synchronous mode for Receive
Browse files Browse the repository at this point in the history
Add a boolean ReceiveSetting called Synchronous, false by default.

When true, Receive uses the ordinary Pull RPC instead of
StreamingPull, and is careful to ensure that the number of messages in
the client never exceeds MaxOutstandingMessages.

Fixes #1088.

Change-Id: I2ef9d06263d6487c14786e7ed98580e52254cd47
Reviewed-on: https://code-review.googlesource.com/33330
Reviewed-by: kokoro <[email protected]>
Reviewed-by: Michael Darakananda <[email protected]>
  • Loading branch information
jba committed Sep 26, 2018
1 parent de78205 commit afb8009
Show file tree
Hide file tree
Showing 6 changed files with 282 additions and 103 deletions.
12 changes: 12 additions & 0 deletions pubsub/flow_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,14 +15,18 @@
package pubsub

import (
"sync/atomic"

"golang.org/x/net/context"
"golang.org/x/sync/semaphore"
)

// flowController implements flow control for Subscription.Receive.
type flowController struct {
maxCount int
maxSize int // max total size of messages
semCount, semSize *semaphore.Weighted // enforces max number and size of messages
count_ int64 // acquires - releases (atomic)
}

// newFlowController creates a new flowController that ensures no more than
Expand All @@ -31,6 +35,7 @@ type flowController struct {
// respectively.
func newFlowController(maxCount, maxSize int) *flowController {
fc := &flowController{
maxCount: maxCount,
maxSize: maxSize,
semCount: nil,
semSize: nil,
Expand Down Expand Up @@ -63,6 +68,7 @@ func (f *flowController) acquire(ctx context.Context, size int) error {
return err
}
}
atomic.AddInt64(&f.count_, 1)
return nil
}

Expand All @@ -85,11 +91,13 @@ func (f *flowController) tryAcquire(size int) bool {
return false
}
}
atomic.AddInt64(&f.count_, 1)
return true
}

// release notes that one message of size bytes is no longer outstanding.
func (f *flowController) release(size int) {
atomic.AddInt64(&f.count_, -1)
if f.semCount != nil {
f.semCount.Release(1)
}
Expand All @@ -104,3 +112,7 @@ func (f *flowController) bound(size int) int64 {
}
return int64(size)
}

func (f *flowController) count() int {
return int(atomic.LoadInt64(&f.count_))
}
26 changes: 21 additions & 5 deletions pubsub/flow_controller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -122,7 +122,8 @@ func TestFlowControllerSaturation(t *testing.T) {
} {
fc := newFlowController(maxCount, maxSize)
// Atomically track flow controller state.
var curCount, curSize int64
// The flowController itself tracks count.
var curSize int64
success := errors.New("")
// Time out if wantSize or wantCount is never reached.
ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
Expand All @@ -143,7 +144,7 @@ func TestFlowControllerSaturation(t *testing.T) {
if err := fc.acquire(ctx, test.acquireSize); err != nil {
return err
}
c := atomic.AddInt64(&curCount, 1)
c := int64(fc.count())
if c > test.wantCount {
return fmt.Errorf("count %d exceeds want %d", c, test.wantCount)
}
Expand All @@ -158,9 +159,6 @@ func TestFlowControllerSaturation(t *testing.T) {
hitSize = true
}
time.Sleep(5 * time.Millisecond) // Let other goroutines make progress.
if atomic.AddInt64(&curCount, -1) < 0 {
return errors.New("negative count")
}
if atomic.AddInt64(&curSize, -int64(test.acquireSize)) < 0 {
return errors.New("negative size")
}
Expand Down Expand Up @@ -217,6 +215,24 @@ func TestFlowControllerUnboundedCount(t *testing.T) {
}
}

func TestFlowControllerUnboundedCount2(t *testing.T) {
t.Parallel()
ctx := context.Background()
fc := newFlowController(0, 0)
// Successfully acquire 4 bytes.
if err := fc.acquire(ctx, 4); err != nil {
t.Errorf("got %v, wanted no error", err)
}
fc.release(1)
fc.release(1)
fc.release(1)
wantCount := int64(-2)
c := int64(fc.count())
if c != wantCount {
t.Fatalf("got count %d, want %d", c, wantCount)
}
}

func TestFlowControllerUnboundedBytes(t *testing.T) {
t.Parallel()
ctx := context.Background()
Expand Down
109 changes: 61 additions & 48 deletions pubsub/integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -106,55 +106,11 @@ func TestIntegration_All(t *testing.T) {
t.Errorf("subscription %s should exist, but it doesn't", sub.ID())
}

var msgs []*Message
for i := 0; i < 10; i++ {
text := fmt.Sprintf("a message with an index %d", i)
attrs := make(map[string]string)
attrs["foo"] = "bar"
msgs = append(msgs, &Message{
Data: []byte(text),
Attributes: attrs,
})
}

// Publish the messages.
type pubResult struct {
m *Message
r *PublishResult
}
var rs []pubResult
for _, m := range msgs {
r := topic.Publish(ctx, m)
rs = append(rs, pubResult{m, r})
}
want := make(map[string]*messageData)
for _, res := range rs {
id, err := res.r.Get(ctx)
if err != nil {
t.Fatal(err)
for _, sync := range []bool{false, true} {
for _, maxMsgs := range []int{0, 3, -1} { // MaxOutstandingMessages = default, 3, unlimited
testPublishAndReceive(t, topic, sub, maxMsgs, sync)
}
md := extractMessageData(res.m)
md.ID = id
want[md.ID] = md
}

// Use a timeout to ensure that Pull does not block indefinitely if there are unexpectedly few messages available.
timeoutCtx, _ := context.WithTimeout(ctx, time.Minute)
gotMsgs, err := pullN(timeoutCtx, sub, len(want), func(ctx context.Context, m *Message) {
m.Ack()
})
if err != nil {
t.Fatalf("Pull: %v", err)
}
got := make(map[string]*messageData)
for _, m := range gotMsgs {
md := extractMessageData(m)
got[md.ID] = md
}
if !testutil.Equal(got, want) {
t.Errorf("messages: got: %v ; want: %v", got, want)
}

if msg, ok := testIAM(ctx, topic.IAM(), "pubsub.topics.get"); !ok {
t.Errorf("topic IAM: %s", msg)
}
Expand All @@ -167,7 +123,8 @@ func TestIntegration_All(t *testing.T) {
t.Fatalf("CreateSnapshot error: %v", err)
}

timeoutCtx, _ = context.WithTimeout(ctx, time.Minute)
timeoutCtx, cancel := context.WithTimeout(ctx, time.Minute)
defer cancel()
err = internal.Retry(timeoutCtx, gax.Backoff{}, func() (bool, error) {
snapIt := client.Snapshots(timeoutCtx)
for {
Expand Down Expand Up @@ -221,6 +178,62 @@ func TestIntegration_All(t *testing.T) {
}
}

func testPublishAndReceive(t *testing.T, topic *Topic, sub *Subscription, maxMsgs int, synchronous bool) {
ctx := context.Background()
var msgs []*Message
for i := 0; i < 10; i++ {
text := fmt.Sprintf("a message with an index %d", i)
attrs := make(map[string]string)
attrs["foo"] = "bar"
msgs = append(msgs, &Message{
Data: []byte(text),
Attributes: attrs,
})
}

// Publish some messages.
type pubResult struct {
m *Message
r *PublishResult
}
var rs []pubResult
for _, m := range msgs {
r := topic.Publish(ctx, m)
rs = append(rs, pubResult{m, r})
}
want := make(map[string]*messageData)
for _, res := range rs {
id, err := res.r.Get(ctx)
if err != nil {
t.Fatal(err)
}
md := extractMessageData(res.m)
md.ID = id
want[md.ID] = md
}

sub.ReceiveSettings.MaxOutstandingMessages = maxMsgs
sub.ReceiveSettings.Synchronous = synchronous
// Use a timeout to ensure that Pull does not block indefinitely if there are
// unexpectedly few messages available.
timeoutCtx, _ := context.WithTimeout(ctx, time.Minute)
gotMsgs, err := pullN(timeoutCtx, sub, len(want), func(ctx context.Context, m *Message) {
m.Ack()
})
if err != nil {
t.Fatalf("Pull: %v", err)
}
got := make(map[string]*messageData)
for _, m := range gotMsgs {
md := extractMessageData(m)
got[md.ID] = md
}
if !testutil.Equal(got, want) {
t.Errorf("MaxOutstandingMessages=%d, Synchronous=%t: messages: got: %v ; want: %v",
maxMsgs, synchronous, got, want)
}
}

// IAM tests.
// NOTE: for these to succeed, the test runner identity must have the Pub/Sub Admin or Owner roles.
// To set, visit https://console.developers.google.com, select "IAM & Admin" from the top-left
Expand Down
Loading

0 comments on commit afb8009

Please sign in to comment.