Skip to content

Commit

Permalink
refactor(pubsublite): wire subscriber delivers single messages (#3326)
Browse files Browse the repository at this point in the history
Outstanding acks are only registered with the ackTracker for delivered messages. The committer stream will wait for outstanding acks before terminating.
  • Loading branch information
tmdiep authored Dec 2, 2020
1 parent 5344d93 commit b5e8ad5
Show file tree
Hide file tree
Showing 5 changed files with 91 additions and 87 deletions.
7 changes: 7 additions & 0 deletions pubsublite/internal/wire/acks.go
Original file line number Diff line number Diff line change
Expand Up @@ -169,6 +169,13 @@ func (at *ackTracker) unsafeProcessAcks() {
}
}

// Empty returns true if there are no outstanding acks.
func (at *ackTracker) Empty() bool {
at.mu.Lock()
defer at.mu.Unlock()
return at.outstandingAcks.Front() == nil
}

// commitCursorTracker tracks pending and last successful committed offsets.
// It is only accessed by the committer.
type commitCursorTracker struct {
Expand Down
15 changes: 7 additions & 8 deletions pubsublite/internal/wire/committer.go
Original file line number Diff line number Diff line change
Expand Up @@ -90,8 +90,8 @@ func (c *committer) Start() {
}
}

// Stop initiates shutdown of the committer. The final commit offset will be
// send to the server, but acks that arrive afterward will be discarded.
// Stop initiates shutdown of the committer. It will wait for outstanding acks
// and send the final commit offset to the server.
func (c *committer) Stop() {
c.mu.Lock()
defer c.mu.Unlock()
Expand Down Expand Up @@ -194,23 +194,22 @@ func (c *committer) unsafeInitiateShutdown(targetStatus serviceStatus, err error
return
}

// Discard outstanding acks.
c.acks.Release()

// If it's a graceful shutdown, expedite sending final commits to the stream.
if targetStatus == serviceTerminating {
c.unsafeCommitOffsetToStream()
c.unsafeCheckDone()
return
}
// Otherwise immediately terminate the stream.

// Otherwise discard outstanding acks and immediately terminate the stream.
c.acks.Release()
c.unsafeTerminate()
}

func (c *committer) unsafeCheckDone() {
// The commit stream can be closed once the final commit offset has been
// confirmed.
if c.status == serviceTerminating && c.cursorTracker.UpToDate() {
// confirmed and there are no outstanding acks.
if c.status == serviceTerminating && c.cursorTracker.UpToDate() && c.acks.Empty() {
c.unsafeTerminate()
}
}
Expand Down
3 changes: 2 additions & 1 deletion pubsublite/internal/wire/committer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -102,6 +102,7 @@ func TestCommitterStopFlushesCommits(t *testing.T) {
stream := test.NewRPCVerifier(t)
stream.Push(initCommitReq(subscription), initCommitResp(), nil)
stream.Push(commitReq(34), commitResp(1), nil)
stream.Push(commitReq(56), commitResp(1), nil)
verifiers.AddCommitStream(subscription.Path, subscription.Partition, stream)

mockServer.OnTestStart(verifiers)
Expand All @@ -114,7 +115,7 @@ func TestCommitterStopFlushesCommits(t *testing.T) {

ack1.Ack()
cmt.Stop() // Stop should flush the first offset
ack2.Ack() // Acks after Stop() are discarded
ack2.Ack() // Acks after Stop() are processed
cmt.SendBatchCommit()
// Committer terminates when all acks are processed.
if gotErr := cmt.FinalError(); gotErr != nil {
Expand Down
60 changes: 23 additions & 37 deletions pubsublite/internal/wire/subscriber.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@ import (
"context"
"errors"
"reflect"
"sync"
"time"

"google.golang.org/grpc"
Expand All @@ -40,64 +39,55 @@ type ReceivedMessage struct {
Ack AckConsumer
}

// MessageReceiverFunc receives a batch of Pub/Sub messages from a topic
// partition.
type MessageReceiverFunc func([]*ReceivedMessage)
// MessageReceiverFunc receives a Pub/Sub message from a topic partition.
type MessageReceiverFunc func(*ReceivedMessage)

const maxMessagesBufferSize = 1000
const maxMessageBufferSize = 10000

// messageDeliveryQueue delivers received messages to the client-provided
// MessageReceiverFunc sequentially.
type messageDeliveryQueue struct {
receiver MessageReceiverFunc
messagesC chan []*ReceivedMessage
messagesC chan *ReceivedMessage
stopC chan struct{}

// Fields below must be guarded with mu.
mu sync.Mutex
status serviceStatus
acks *ackTracker
status serviceStatus
}

func newMessageDeliveryQueue(receiver MessageReceiverFunc, bufferSize int) *messageDeliveryQueue {
// The buffer size is based on ReceiveSettings.MaxOutstandingMessages to
// handle the worst case of single messages. But ensure there's a reasonable
// limit as channel buffer capacity is allocated on creation.
if bufferSize > maxMessagesBufferSize {
bufferSize = maxMessagesBufferSize
func newMessageDeliveryQueue(acks *ackTracker, receiver MessageReceiverFunc, bufferSize int) *messageDeliveryQueue {
// The buffer size is based on ReceiveSettings.MaxOutstandingMessages. But
// ensure there's a reasonable limit as channel buffer capacity is allocated
// on creation.
if bufferSize > maxMessageBufferSize {
bufferSize = maxMessageBufferSize
}
return &messageDeliveryQueue{
acks: acks,
receiver: receiver,
messagesC: make(chan []*ReceivedMessage, bufferSize),
messagesC: make(chan *ReceivedMessage, bufferSize),
stopC: make(chan struct{}),
}
}

func (mq *messageDeliveryQueue) Start() {
mq.mu.Lock()
defer mq.mu.Unlock()

if mq.status == serviceUninitialized {
go mq.deliverMessages()
mq.status = serviceActive
}
}

func (mq *messageDeliveryQueue) Stop() {
mq.mu.Lock()
defer mq.mu.Unlock()

if mq.status < serviceTerminated {
close(mq.stopC)
mq.status = serviceTerminated
}
}

func (mq *messageDeliveryQueue) Add(messages []*ReceivedMessage) {
mq.mu.Lock()
defer mq.mu.Unlock()

if mq.status == serviceActive {
mq.messagesC <- messages
for _, msg := range messages {
mq.messagesC <- msg
}
}
}

Expand All @@ -113,8 +103,11 @@ func (mq *messageDeliveryQueue) deliverMessages() {
select {
case <-mq.stopC:
return // Ends the goroutine.
case msgs := <-mq.messagesC:
mq.receiver(msgs)
case msg := <-mq.messagesC:
// Register outstanding acks, which are primarily handled by the
// `committer`.
mq.acks.Push(msg.Ack.(*ackConsumer))
mq.receiver(msg)
}
}
}
Expand All @@ -138,7 +131,6 @@ type subscribeStream struct {

// Fields below must be guarded with mu.
stream *retryableStream
acks *ackTracker
offsetTracker subscriberOffsetTracker
flowControl flowControlBatcher
pollFlowControl *periodicTask
Expand All @@ -162,8 +154,7 @@ func newSubscribeStream(ctx context.Context, subClient *vkit.SubscriberClient, s
},
},
},
messageQueue: newMessageDeliveryQueue(receiver, settings.MaxOutstandingMessages),
acks: acks,
messageQueue: newMessageDeliveryQueue(acks, receiver, settings.MaxOutstandingMessages),
}
s.stream = newRetryableStream(ctx, s, settings.Timeout, reflect.TypeOf(pb.SubscribeResponse{}))

Expand Down Expand Up @@ -290,12 +281,7 @@ func (s *subscribeStream) unsafeOnMessageResponse(response *pb.MessageResponse)

var receivedMsgs []*ReceivedMessage
for _, msg := range response.Messages {
// Register outstanding acks, which are primarily handled by the
// `committer`.
ack := newAckConsumer(msg.GetCursor().GetOffset(), msg.GetSizeBytes(), s.onAck)
if err := s.acks.Push(ack); err != nil {
return err
}
receivedMsgs = append(receivedMsgs, &ReceivedMessage{Msg: msg, Ack: ack})
}
s.messageQueue.Add(receivedMsgs)
Expand Down
Loading

0 comments on commit b5e8ad5

Please sign in to comment.