Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

refactor(pubsublite): wire subscriber delivers single messages #3326

Merged
merged 5 commits into from
Dec 2, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 7 additions & 0 deletions pubsublite/internal/wire/acks.go
Original file line number Diff line number Diff line change
Expand Up @@ -169,6 +169,13 @@ func (at *ackTracker) unsafeProcessAcks() {
}
}

// Empty returns true if there are no outstanding acks.
func (at *ackTracker) Empty() bool {
at.mu.Lock()
defer at.mu.Unlock()
return at.outstandingAcks.Front() == nil
}

// commitCursorTracker tracks pending and last successful committed offsets.
// It is only accessed by the committer.
type commitCursorTracker struct {
Expand Down
15 changes: 7 additions & 8 deletions pubsublite/internal/wire/committer.go
Original file line number Diff line number Diff line change
Expand Up @@ -90,8 +90,8 @@ func (c *committer) Start() {
}
}

// Stop initiates shutdown of the committer. The final commit offset will be
// send to the server, but acks that arrive afterward will be discarded.
// Stop initiates shutdown of the committer. It will wait for outstanding acks
// and send the final commit offset to the server.
func (c *committer) Stop() {
c.mu.Lock()
defer c.mu.Unlock()
Expand Down Expand Up @@ -194,23 +194,22 @@ func (c *committer) unsafeInitiateShutdown(targetStatus serviceStatus, err error
return
}

// Discard outstanding acks.
c.acks.Release()

// If it's a graceful shutdown, expedite sending final commits to the stream.
if targetStatus == serviceTerminating {
c.unsafeCommitOffsetToStream()
c.unsafeCheckDone()
return
}
// Otherwise immediately terminate the stream.

// Otherwise discard outstanding acks and immediately terminate the stream.
c.acks.Release()
c.unsafeTerminate()
}

func (c *committer) unsafeCheckDone() {
// The commit stream can be closed once the final commit offset has been
// confirmed.
if c.status == serviceTerminating && c.cursorTracker.UpToDate() {
// confirmed and there are no outstanding acks.
if c.status == serviceTerminating && c.cursorTracker.UpToDate() && c.acks.Empty() {
c.unsafeTerminate()
}
}
Expand Down
3 changes: 2 additions & 1 deletion pubsublite/internal/wire/committer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -102,6 +102,7 @@ func TestCommitterStopFlushesCommits(t *testing.T) {
stream := test.NewRPCVerifier(t)
stream.Push(initCommitReq(subscription), initCommitResp(), nil)
stream.Push(commitReq(34), commitResp(1), nil)
stream.Push(commitReq(56), commitResp(1), nil)
verifiers.AddCommitStream(subscription.Path, subscription.Partition, stream)

mockServer.OnTestStart(verifiers)
Expand All @@ -114,7 +115,7 @@ func TestCommitterStopFlushesCommits(t *testing.T) {

ack1.Ack()
cmt.Stop() // Stop should flush the first offset
ack2.Ack() // Acks after Stop() are discarded
ack2.Ack() // Acks after Stop() are processed
cmt.SendBatchCommit()
// Committer terminates when all acks are processed.
if gotErr := cmt.FinalError(); gotErr != nil {
Expand Down
60 changes: 23 additions & 37 deletions pubsublite/internal/wire/subscriber.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@ import (
"context"
"errors"
"reflect"
"sync"
"time"

"google.golang.org/grpc"
Expand All @@ -40,64 +39,55 @@ type ReceivedMessage struct {
Ack AckConsumer
}

// MessageReceiverFunc receives a batch of Pub/Sub messages from a topic
// partition.
type MessageReceiverFunc func([]*ReceivedMessage)
// MessageReceiverFunc receives a Pub/Sub message from a topic partition.
type MessageReceiverFunc func(*ReceivedMessage)

const maxMessagesBufferSize = 1000
const maxMessageBufferSize = 10000

// messageDeliveryQueue delivers received messages to the client-provided
// MessageReceiverFunc sequentially.
type messageDeliveryQueue struct {
receiver MessageReceiverFunc
messagesC chan []*ReceivedMessage
messagesC chan *ReceivedMessage
stopC chan struct{}

// Fields below must be guarded with mu.
mu sync.Mutex
status serviceStatus
acks *ackTracker
status serviceStatus
}

func newMessageDeliveryQueue(receiver MessageReceiverFunc, bufferSize int) *messageDeliveryQueue {
// The buffer size is based on ReceiveSettings.MaxOutstandingMessages to
// handle the worst case of single messages. But ensure there's a reasonable
// limit as channel buffer capacity is allocated on creation.
if bufferSize > maxMessagesBufferSize {
bufferSize = maxMessagesBufferSize
func newMessageDeliveryQueue(acks *ackTracker, receiver MessageReceiverFunc, bufferSize int) *messageDeliveryQueue {
// The buffer size is based on ReceiveSettings.MaxOutstandingMessages. But
// ensure there's a reasonable limit as channel buffer capacity is allocated
// on creation.
if bufferSize > maxMessageBufferSize {
bufferSize = maxMessageBufferSize
}
return &messageDeliveryQueue{
acks: acks,
receiver: receiver,
messagesC: make(chan []*ReceivedMessage, bufferSize),
messagesC: make(chan *ReceivedMessage, bufferSize),
stopC: make(chan struct{}),
}
}

func (mq *messageDeliveryQueue) Start() {
mq.mu.Lock()
defer mq.mu.Unlock()

if mq.status == serviceUninitialized {
go mq.deliverMessages()
mq.status = serviceActive
}
}

func (mq *messageDeliveryQueue) Stop() {
mq.mu.Lock()
defer mq.mu.Unlock()

if mq.status < serviceTerminated {
close(mq.stopC)
mq.status = serviceTerminated
}
}

func (mq *messageDeliveryQueue) Add(messages []*ReceivedMessage) {
mq.mu.Lock()
defer mq.mu.Unlock()

if mq.status == serviceActive {
mq.messagesC <- messages
for _, msg := range messages {
mq.messagesC <- msg
}
}
}

Expand All @@ -113,8 +103,11 @@ func (mq *messageDeliveryQueue) deliverMessages() {
select {
case <-mq.stopC:
return // Ends the goroutine.
case msgs := <-mq.messagesC:
mq.receiver(msgs)
case msg := <-mq.messagesC:
// Register outstanding acks, which are primarily handled by the
// `committer`.
mq.acks.Push(msg.Ack.(*ackConsumer))
mq.receiver(msg)
}
}
}
Expand All @@ -138,7 +131,6 @@ type subscribeStream struct {

// Fields below must be guarded with mu.
stream *retryableStream
acks *ackTracker
offsetTracker subscriberOffsetTracker
flowControl flowControlBatcher
pollFlowControl *periodicTask
Expand All @@ -162,8 +154,7 @@ func newSubscribeStream(ctx context.Context, subClient *vkit.SubscriberClient, s
},
},
},
messageQueue: newMessageDeliveryQueue(receiver, settings.MaxOutstandingMessages),
acks: acks,
messageQueue: newMessageDeliveryQueue(acks, receiver, settings.MaxOutstandingMessages),
}
s.stream = newRetryableStream(ctx, s, settings.Timeout, reflect.TypeOf(pb.SubscribeResponse{}))

Expand Down Expand Up @@ -290,12 +281,7 @@ func (s *subscribeStream) unsafeOnMessageResponse(response *pb.MessageResponse)

var receivedMsgs []*ReceivedMessage
for _, msg := range response.Messages {
// Register outstanding acks, which are primarily handled by the
// `committer`.
ack := newAckConsumer(msg.GetCursor().GetOffset(), msg.GetSizeBytes(), s.onAck)
if err := s.acks.Push(ack); err != nil {
return err
}
receivedMsgs = append(receivedMsgs, &ReceivedMessage{Msg: msg, Ack: ack})
}
s.messageQueue.Add(receivedMsgs)
Expand Down
Loading